test suite: make test scripts importable as modules

This commit is contained in:
The MMGen Project 2023-10-13 09:51:14 +00:00
commit a7e816f98e
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
13 changed files with 153 additions and 144 deletions

View file

@ -1012,46 +1012,46 @@ class CmdTestRunner:
else:
return None
# main()
if __name__ == '__main__':
if not cfg.skipping_deps: # do this before list cmds exit, so we stay in sync with shm_dir
create_tmp_dirs(shm_dir)
if not cfg.skipping_deps: # do this before list cmds exit, so we stay in sync with shm_dir
create_tmp_dirs(shm_dir)
if cfg.list_cmd_groups:
CmdGroupMgr().list_cmd_groups()
elif cfg.list_cmds:
list_cmds()
elif cmd_args and cmd_args[0] in utils:
globals()[cmd_args[0]](*cmd_args[1:])
sys.exit(0)
if cfg.list_cmd_groups:
CmdGroupMgr().list_cmd_groups()
elif cfg.list_cmds:
list_cmds()
elif cmd_args and cmd_args[0] in utils:
globals()[cmd_args[0]](*cmd_args[1:])
sys.exit(0)
if cfg.pause:
set_restore_term_at_exit()
if cfg.pause:
set_restore_term_at_exit()
from mmgen.exception import TestSuiteException,TestSuiteFatalException,TestSuiteSpawnedScriptException
from mmgen.exception import TestSuiteException,TestSuiteFatalException,TestSuiteSpawnedScriptException
try:
tr = CmdTestRunner(data_dir,trash_dir)
tr.run_tests(cmd_args)
tr.warn_skipped()
if tr.daemon_started:
stop_test_daemons(network_id)
except KeyboardInterrupt:
if tr.daemon_started:
stop_test_daemons(network_id)
tr.warn_skipped()
die(1,'\ntest.py exiting at user request')
except TestSuiteException as e:
die(2,e.args[0])
except TestSuiteFatalException as e:
die(4,e.args[0])
except TestSuiteSpawnedScriptException as e:
# if spawned script is not running under exec_wrapper, output brief error msg:
if os.getenv('MMGEN_EXEC_WRAPPER'):
Msg(red(str(e)))
Msg(blue('cmdtest.py: spawned script exited with error'))
except Exception:
# if cmdtest.py itself is running under exec_wrapper, re-raise so exec_wrapper can handle exception:
if os.getenv('MMGEN_EXEC_WRAPPER') or not os.getenv('MMGEN_IGNORE_TEST_PY_EXCEPTION'):
raise
die(1,red('Test script exited with error'))
try:
tr = CmdTestRunner(data_dir,trash_dir)
tr.run_tests(cmd_args)
tr.warn_skipped()
if tr.daemon_started:
stop_test_daemons(network_id)
except KeyboardInterrupt:
if tr.daemon_started:
stop_test_daemons(network_id)
tr.warn_skipped()
die(1,'\ntest.py exiting at user request')
except TestSuiteException as e:
die(2,e.args[0])
except TestSuiteFatalException as e:
die(4,e.args[0])
except TestSuiteSpawnedScriptException as e:
# if spawned script is not running under exec_wrapper, output brief error msg:
if os.getenv('MMGEN_EXEC_WRAPPER'):
Msg(red(str(e)))
Msg(blue('cmdtest.py: spawned script exited with error'))
except Exception:
# if cmdtest.py itself is running under exec_wrapper, re-raise so exec_wrapper can handle exception:
if os.getenv('MMGEN_EXEC_WRAPPER') or not os.getenv('MMGEN_IGNORE_TEST_PY_EXCEPTION'):
raise
die(1,red('Test script exited with error'))

View file

@ -44,4 +44,5 @@ def test_color():
ret = get_terminfo_colors()
msg(f'{os.getenv("TERM")} (this terminal): {orange(str(ret))}')
test_color()
if __name__ == '__main__':
test_color()

View file

@ -582,4 +582,5 @@ vmsg = cfg._util.vmsg
proto = cfg._proto
main()
if __name__ == '__main__':
main()

View file

@ -27,15 +27,7 @@ try:
except ImportError:
from test.include import test_init
from mmgen.util import die
assert len(sys.argv) in (2,3),"Test takes 1 or 2 arguments: test name, plus optional rounds count"
test = sys.argv[1].capitalize()
assert test in ('Sha256','Sha512','Keccak'), "Valid choices for test are 'sha256','sha512' or 'keccak'"
random_rounds = int(sys.argv[2]) if len(sys.argv) == 3 else 500
def msg(s):
sys.stderr.write(s)
from mmgen.util import msg,msg_r,die
def green(s):
return '\033[32;1m' + s + '\033[0m'
@ -43,7 +35,7 @@ def green(s):
class TestHashFunc:
def test_constants(self):
msg('Testing generated constants: ')
msg_r('Testing generated constants: ')
h = self.t_cls(b'foo')
if h.H_init != self.H_ref:
m = 'Generated constants H[] differ from reference value:\nReference:\n{}\nGenerated:\n{}'
@ -51,7 +43,7 @@ class TestHashFunc:
if h.K != self.K_ref:
m = 'Generated constants K[] differ from reference value:\nReference:\n{}\nGenerated:\n{}'
die(3,m.format([hex(n) for n in self.K_ref],[hex(n) for n in h.K]))
msg('OK\n')
msg('OK')
def compare_hashes(self,data,chk=None):
if chk is None:
@ -63,19 +55,19 @@ class TestHashFunc:
def test_ref(self):
for i,data in enumerate(self.vectors):
msg(f'\rTesting reference input data: {i+1:4}/{len(self.vectors)} ')
msg_r(f'\rTesting reference input data: {i+1:4}/{len(self.vectors)} ')
self.compare_hashes(data.encode(), chk=self.vectors[data])
msg('OK\n')
msg('OK')
def test_random(self,rounds):
if not self.hashlib:
return
for i in range(rounds):
if i+1 in (1,rounds) or not (i+1) % 10:
msg(f'\rTesting random input data: {i+1:4}/{rounds} ')
msg_r(f'\rTesting random input data: {i+1:4}/{rounds} ')
dlen = int(getrand(4).hex(),16) >> 18
self.compare_hashes(getrand(dlen))
msg('OK\n')
msg('OK')
class TestKeccak(TestHashFunc):
desc = 'keccak_256'
@ -180,13 +172,18 @@ class TestSha512(TestSha2):
0x113f9804bef90dae, 0x1b710b35131c471b, 0x28db77f523047d84, 0x32caab7b40c72493, 0x3c9ebe0a15c9bebc,
0x431d67c49c100d4c, 0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817 )
from test.include.common import getrand,set_globals
from mmgen.cfg import Config
if __name__ == '__main__':
from test.include.common import getrand,set_globals
from mmgen.cfg import Config
set_globals(Config())
assert len(sys.argv) in (2,3),"Test takes 1 or 2 arguments: test name, plus optional rounds count"
test = sys.argv[1].capitalize()
assert test in ('Sha256','Sha512','Keccak'), "Valid choices for test are 'sha256','sha512' or 'keccak'"
random_rounds = int(sys.argv[2]) if len(sys.argv) == 3 else 500
t = globals()['Test'+test]()
msg(f'Testing internal implementation of {t.desc}\n')
t.test_constants()
t.test_ref()
t.test_random(random_rounds)
set_globals(Config())
t = globals()['Test'+test]()
msg(f'Testing internal implementation of {t.desc}')
t.test_constants()
t.test_ref()
t.test_random(random_rounds)

View file

@ -15,6 +15,7 @@ test.include.coin_daemon_control: Start and stop daemons for the MMGen test suit
from mmgen.cfg import Config,gc
from mmgen.util import msg,die,oneshot_warning,async_run
from mmgen.protocol import init_proto
from mmgen.daemon import CoinDaemon
xmr_wallet_network_ids = {
'xmrw': 'mainnet',
@ -58,10 +59,6 @@ Valid network IDs: {nid}, {xmrw_nid}, all, no_xmr
}
}
cfg = Config(opts_data=opts_data)
from mmgen.daemon import CoinDaemon
class warn_missing_exec(oneshot_warning):
color = 'nocolor'
message = 'daemon executable {!r} not found on this system!'
@ -118,27 +115,31 @@ def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False):
else:
d.cmd(action,quiet=cfg.quiet)
if cfg.daemon_ids:
print('\n'.join(CoinDaemon.all_daemon_ids()))
elif 'all' in cfg._args or 'no_xmr' in cfg._args:
if len(cfg._args) != 1:
die(1,"'all' or 'no_xmr' must be the sole argument")
for coin in CoinDaemon.coins:
if coin == 'XMR' and cfg._args[0] == 'no_xmr':
continue
for daemon_id in CoinDaemon.get_daemon_ids(cfg,coin):
for network in CoinDaemon.get_daemon(cfg,coin,daemon_id).networks:
run(
proto = init_proto( cfg, coin=coin, network=network ),
daemon_id = daemon_id,
missing_exec_ok = True )
else:
ids = cfg._args
network_ids = CoinDaemon.get_network_ids(cfg)
if not ids:
cfg._opts.usage()
for i in ids:
if i not in network_ids + list(xmr_wallet_network_ids):
die(1,f'{i!r}: invalid network ID')
for network_id in ids:
run(network_id=network_id.lower())
def main():
if cfg.daemon_ids:
print('\n'.join(CoinDaemon.all_daemon_ids()))
elif 'all' in cfg._args or 'no_xmr' in cfg._args:
if len(cfg._args) != 1:
die(1,"'all' or 'no_xmr' must be the sole argument")
for coin in CoinDaemon.coins:
if coin == 'XMR' and cfg._args[0] == 'no_xmr':
continue
for daemon_id in CoinDaemon.get_daemon_ids(cfg,coin):
for network in CoinDaemon.get_daemon(cfg,coin,daemon_id).networks:
run(
proto = init_proto( cfg, coin=coin, network=network ),
daemon_id = daemon_id,
missing_exec_ok = True )
else:
ids = cfg._args
network_ids = CoinDaemon.get_network_ids(cfg)
if not ids:
cfg._opts.usage()
for i in ids:
if i not in network_ids + list(xmr_wallet_network_ids):
die(1,f'{i!r}: invalid network ID')
for network_id in ids:
run(network_id=network_id.lower())
cfg = Config(opts_data=opts_data)

View file

@ -178,4 +178,5 @@ def do_loop():
proto = cfg._proto
do_loop()
if __name__ == '__main__':
do_loop()

View file

@ -215,4 +215,5 @@ def do_loop():
proto = cfg._proto
do_loop()
if __name__ == '__main__':
do_loop()

View file

@ -151,10 +151,11 @@ def do_passwd_tests():
s = fs.format('','')
do_test(cmd,tdata,s+' '*(40-len(s)),'password')
start_time = int(time.time())
if __name__ == '__main__':
start_time = int(time.time())
cmds = cfg._args or ('coin','pw')
for cmd in cmds:
{'coin': do_coin_tests, 'pw': do_passwd_tests }[cmd]()
cmds = cfg._args or ('coin','pw')
for cmd in cmds:
{'coin': do_coin_tests, 'pw': do_passwd_tests }[cmd]()
end_msg(int(time.time()) - start_time)
end_msg(int(time.time()) - start_time)

View file

@ -5,6 +5,8 @@ test/start-coin-daemons.py: Start daemons for the MMGen test suite
"""
try:
import include.coin_daemon_control
from include.coin_daemon_control import main
except ImportError:
import test.include.coin_daemon_control
from test.include.coin_daemon_control import main
main()

View file

@ -5,6 +5,8 @@ test/stop-coin-daemons.py: Stop daemons for the MMGen test suite
"""
try:
import include.coin_daemon_control
from include.coin_daemon_control import main
except ImportError:
import test.include.coin_daemon_control
from test.include.coin_daemon_control import main
main()

View file

@ -486,28 +486,29 @@ def do_cmds(cmd_group):
cmdline = [cmd] + [os.path.join(tcfg['tmpdir'],fn) for fn in fns]
getattr(tc,cmd)(*cmdline)
try:
if cfg._args:
if len(cfg._args) != 1:
die(1,'Only one command may be specified')
cmd = cfg._args[0]
if cmd in cmd_data:
cleandir(tcfg['tmpdir'],do_msg=True)
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
do_cmds(cmd)
elif cmd == 'clean':
cleandir(tcfg['tmpdir'],do_msg=True)
sys.exit(0)
if __name__ == '__main__':
try:
if cfg._args:
if len(cfg._args) != 1:
die(1,'Only one command may be specified')
cmd = cfg._args[0]
if cmd in cmd_data:
cleandir(tcfg['tmpdir'],do_msg=True)
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
do_cmds(cmd)
elif cmd == 'clean':
cleandir(tcfg['tmpdir'],do_msg=True)
sys.exit(0)
else:
die(1,f'{cmd!r}: unrecognized command')
else:
die(1,f'{cmd!r}: unrecognized command')
else:
cleandir(tcfg['tmpdir'],do_msg=True)
for cmd in cmd_data:
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
do_cmds(cmd)
if cmd is not list(cmd_data.keys())[-1]:
msg('')
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
cleandir(tcfg['tmpdir'],do_msg=True)
for cmd in cmd_data:
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
do_cmds(cmd)
if cmd is not list(cmd_data.keys())[-1]:
msg('')
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
end_msg(int(time.time()) - start_time)
end_msg(int(time.time()) - start_time)

View file

@ -1013,6 +1013,6 @@ async def main():
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
async_run(main())
end_msg(int(time.time()) - start_time)
if __name__ == '__main__':
async_run(main())
end_msg(int(time.time()) - start_time)

View file

@ -203,16 +203,17 @@ def run_test(test,subtest=None):
if not mod.unit_test().run_test(test,UnitTestHelpers(test)):
die(4,'Unit test {test!r} failed')
try:
for test in (cfg._args or all_tests):
if '.' in test:
test,subtest = test.split('.')
else:
subtest = None
if test not in all_tests:
die(1,f'{test!r}: test not recognized')
if test not in exclude:
run_test(test,subtest=subtest)
end_msg(int(time.time()) - start_time)
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
if __name__ == '__main__':
try:
for test in (cfg._args or all_tests):
if '.' in test:
test,subtest = test.split('.')
else:
subtest = None
if test not in all_tests:
die(1,f'{test!r}: test not recognized')
if test not in exclude:
run_test(test,subtest=subtest)
end_msg(int(time.time()) - start_time)
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))