cmdtest.py: modularize CmdTestRunner, CmdGroupMgr

This commit is contained in:
The MMGen Project 2025-03-18 11:09:51 +03:00
commit bb7deefdc5
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
3 changed files with 751 additions and 695 deletions

View file

@ -20,7 +20,7 @@
test/cmdtest.py: Command test runner for the MMGen wallet system
"""
def check_segwit_opts():
def check_segwit_opts(proto):
for k, m in (('segwit', 'S'), ('segwit_random', 'S'), ('bech32', 'B')):
if getattr(cfg, k) and m not in proto.mmtypes:
die(1, f'--{k.replace("_", "-")} option incompatible with {proto.cls_name}')
@ -73,7 +73,7 @@ def create_shm_dir(data_dir, trash_dir):
return shm_dir
import sys, os, time, asyncio
import sys, os, time
# overlay must be set up before importing mmgen mods!
try:
@ -81,24 +81,16 @@ try:
except ImportError:
from test.include.test_init import repo_root
from mmgen.cfg import Config, gc
from mmgen.color import red, yellow, green, blue, cyan, gray, nocolor, init_color
from mmgen.util import msg, Msg, rmsg, bmsg, die, suf, make_timestr
from mmgen.cfg import Config
from mmgen.color import red, yellow, green, blue, init_color
from mmgen.util import msg, Msg, rmsg, die
from test.include.common import (
set_globals,
cmdtest_py_log_fn,
cmdtest_py_error_fn,
mk_tmpdir,
iqmsg,
omsg,
omsg_r,
ok,
start_test_daemons,
stop_test_daemons,
init_coverage,
clean,
)
stop_test_daemons)
try:
os.unlink(os.path.join(repo_root, cmdtest_py_error_fn))
@ -197,11 +189,6 @@ if cfg.no_altcoin and cfg.coin != 'BTC':
set_globals(cfg)
from test.cmdtest_d.common import ( # this must be loaded after set_globals()
get_file_with_ext,
confirm_continue
)
type(cfg)._reset_ok += (
'no_daemon_autostart',
'names',
@ -210,8 +197,6 @@ type(cfg)._reset_ok += (
'resuming',
'skipping_deps')
logging = cfg.log or os.getenv('MMGEN_EXEC_WRAPPER')
cfg.resuming = any(k in po.user_opts for k in ('resume', 'resume_after'))
cfg.skipping_deps = cfg.resuming or 'skip_deps' in po.user_opts
@ -223,10 +208,6 @@ if cfg.pexpect_spawn and sys.platform == 'win32':
if cfg.daemon_id and cfg.daemon_id in cfg.blacklisted_daemons.split():
die(1, f'cmdtest.py: daemon {cfg.daemon_id!r} blacklisted, exiting')
network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '')
proto = cfg._proto
# step 3: move data_dir to /dev/shm and symlink it back to ./test:
trash_dir = os.path.join('test', 'trash')
trash_dir2 = os.path.join('test', 'trash2')
@ -234,7 +215,7 @@ trash_dir2 = os.path.join('test', 'trash2')
if not cfg.skipping_deps:
shm_dir = create_shm_dir(data_dir, trash_dir)
check_segwit_opts()
check_segwit_opts(cfg._proto)
testing_segwit = cfg.segwit or cfg.segwit_random or cfg.bech32
@ -261,7 +242,8 @@ def list_cmds():
def gen_output():
gm = CmdGroupMgr()
from test.cmdtest_d.group_mgr import CmdGroupMgr
gm = CmdGroupMgr(cfg)
cw, d = 0, []
yield green('AVAILABLE COMMANDS:')
@ -286,12 +268,6 @@ def list_cmds():
sys.exit(0)
def do_between():
if cfg.pause:
confirm_continue()
elif (cfg.verbose or cfg.exact_output) and not cfg.skipping_deps:
sys.stderr.write('\n')
def create_tmp_dirs(shm_dir):
if sys.platform in ('win32', 'darwin'):
for cfg in sorted(cfgs):
@ -317,671 +293,14 @@ def set_restore_term_at_exit():
termios.tcsetattr(fd, termios.TCSADRAIN, old)
atexit.register(at_exit)
class CmdGroupMgr:
dpy_data = None
from test.cmdtest_d.cfg import cmd_groups_dfl, cmd_groups_extra
cmd_groups = cmd_groups_dfl.copy()
cmd_groups.update(cmd_groups_extra)
@staticmethod
def create_cmd_group(cls, sg_name=None):
cmd_group_in = dict(cls.cmd_group_in)
if sg_name and 'subgroup.' + sg_name not in cmd_group_in:
die(1, f'{sg_name!r}: no such subgroup in test group {cls.__name__}')
def add_entries(key, add_deps=True, added_subgroups=[]):
if add_deps:
for dep in cmd_group_in['subgroup.'+key]:
yield from add_entries(dep)
assert isinstance(cls.cmd_subgroups[key][0], str), f'header for subgroup {key!r} missing!'
if not key in added_subgroups:
yield from cls.cmd_subgroups[key][1:]
added_subgroups.append(key)
def gen():
for name, data in cls.cmd_group_in:
if name.startswith('subgroup.'):
sg_key = name.removeprefix('subgroup.')
if sg_name in (None, sg_key):
yield from add_entries(
sg_key,
add_deps = sg_name and not cfg.skipping_deps,
added_subgroups = [sg_name] if cfg.deps_only else [])
if cfg.deps_only and sg_key == sg_name:
return
elif not cfg.skipping_deps:
yield (name, data)
return tuple(gen())
def load_mod(self, gname, modname=None):
clsname, kwargs = self.cmd_groups[gname]
if modname is None and 'modname' in kwargs:
modname = kwargs['modname']
import importlib
modpath = f'test.cmdtest_d.ct_{modname or gname}'
return getattr(importlib.import_module(modpath), clsname)
def create_group(self, gname, sg_name, full_data=False, modname=None, is3seed=False, add_dpy=False):
"""
Initializes the list 'cmd_list' and dict 'dpy_data' from module's cmd_group data.
Alternatively, if called with 'add_dpy=True', updates 'dpy_data' from module data
without touching 'cmd_list'
"""
cls = self.load_mod(gname, modname)
cdata = []
def get_shared_deps(cmdname, tmpdir_idx):
"""
shared_deps are "implied" dependencies for all cmds in cmd_group that don't appear in
the cmd_group data or cmds' argument lists. Supported only for 3seed tests at present.
"""
if not hasattr(cls, 'shared_deps'):
return []
return [k for k, v in cfgs[str(tmpdir_idx)]['dep_generators'].items()
if k in cls.shared_deps and v != cmdname]
if not hasattr(cls, 'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls, sg_name)
for a, b in cls.cmd_group:
if is3seed:
for n, (i, j) in enumerate(zip(cls.tmpdir_nums, (128, 192, 256))):
k = f'{a}_{n+1}'
if hasattr(cls, 'skip_cmds') and k in cls.skip_cmds:
continue
sdeps = get_shared_deps(k, i)
if isinstance(b, str):
cdata.append((k, (i, f'{b} ({j}-bit)', [[[]+sdeps, i]])))
else:
cdata.append((k, (i, f'{b[1]} ({j}-bit)', [[b[0]+sdeps, i]])))
else:
cdata.append((a, b if full_data else (cls.tmpdir_nums[0], b, [[[], cls.tmpdir_nums[0]]])))
if add_dpy:
self.dpy_data.update(dict(cdata))
else:
self.cmd_list = tuple(e[0] for e in cdata)
self.dpy_data = dict(cdata)
return cls
def gm_init_group(self, trunner, gname, sg_name, spawn_prog):
kwargs = self.cmd_groups[gname][1]
cls = self.create_group(gname, sg_name, **kwargs)
cls.group_name = gname
return cls(trunner, cfgs, spawn_prog)
def get_cls_by_gname(self, gname):
return self.load_mod(gname, self.cmd_groups[gname][1].get('modname'))
def list_cmd_groups(self):
ginfo = []
for gname in self.cmd_groups:
ginfo.append((gname, self.get_cls_by_gname(gname)))
if cfg.list_current_cmd_groups:
exclude = (cfg.exclude_groups or '').split(',')
ginfo = [g for g in ginfo
if network_id in g[1].networks
and not g[0] in exclude
and g[0] in tuple(self.cmd_groups_dfl) + tuple(cmd_args)]
desc = 'CONFIGURED'
else:
desc = 'AVAILABLE'
def gen_output():
yield green(f'{desc} COMMAND GROUPS AND SUBGROUPS:')
yield ''
for name, cls in ginfo:
yield ' {} - {}'.format(
yellow(name.ljust(13)),
(cls.__doc__.strip() if cls.__doc__ else cls.__name__))
if 'cmd_subgroups' in cls.__dict__:
subgroups = {k:v for k, v in cls.cmd_subgroups.items() if not k.startswith('_')}
max_w = max(len(k) for k in subgroups)
for k, v in subgroups.items():
yield ' + {} · {}'.format(cyan(k.ljust(max_w+1)), v[0])
from mmgen.ui import do_pager
do_pager('\n'.join(gen_output()))
Msg('\n' + ' '.join(e[0] for e in ginfo))
sys.exit(0)
def find_cmd_in_groups(self, cmd, group=None):
"""
Search for a test command in specified group or all configured command groups
and return it as a string. Loads modules but alters no global variables.
"""
if group:
if not group in [e[0] for e in self.cmd_groups]:
die(1, f'{group!r}: unrecognized group')
groups = [self.cmd_groups[group]]
else:
groups = self.cmd_groups
for gname in groups:
cls = self.get_cls_by_gname(gname)
if not hasattr(cls, 'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls)
if cmd in cls.cmd_group: # first search the class
return gname
if cmd in dir(cls(None, None, None)): # then a throwaway instance
return gname # cmd might exist in more than one group - we'll go with the first
return None
class CmdTestRunner:
'cmdtest.py test runner'
def __del__(self):
if logging:
self.log_fd.close()
def __init__(self, data_dir, trash_dir):
self.data_dir = data_dir
self.trash_dir = trash_dir
self.cmd_total = 0
self.rebuild_list = {}
self.gm = CmdGroupMgr()
self.repo_root = repo_root
self.skipped_warnings = []
self.resume_cmd = None
self.deps_only = None
if logging:
self.log_fd = open(cmdtest_py_log_fn, 'a')
self.log_fd.write(f'\nLog started: {make_timestr()} UTC\n')
omsg(f'INFO → Logging to file {cmdtest_py_log_fn!r}')
else:
self.log_fd = None
if cfg.coverage:
coverdir, accfile = init_coverage()
omsg(f'INFO → Writing coverage files to {coverdir!r}')
self.pre_args = ['python3', '-m', 'trace', '--count', '--coverdir='+coverdir, '--file='+accfile]
else:
self.pre_args = ['python3'] if sys.platform == 'win32' else []
if cfg.pexpect_spawn:
omsg('INFO → Using pexpect.spawn() for real terminal emulation')
self.set_spawn_env()
def set_spawn_env(self):
self.spawn_env = dict(os.environ)
self.spawn_env.update({
'MMGEN_NO_LICENSE': '1',
'MMGEN_BOGUS_SEND': '1',
'MMGEN_TEST_SUITE_PEXPECT': '1',
'EXEC_WRAPPER_DO_RUNTIME_MSG':'1',
# if cmdtest.py itself is running under exec_wrapper, disable writing of traceback file for spawned script
'EXEC_WRAPPER_TRACEBACK': '' if os.getenv('MMGEN_EXEC_WRAPPER') else '1',
})
if cfg.exact_output:
from mmgen.term import get_terminal_size
self.spawn_env['MMGEN_COLUMNS'] = str(get_terminal_size().width)
else:
self.spawn_env['MMGEN_COLUMNS'] = '120'
def spawn_wrapper(
self,
cmd = '',
args = [],
extra_desc = '',
no_output = False,
msg_only = False,
no_msg = False,
cmd_dir = 'cmds',
no_exec_wrapper = False,
timeout = None,
pexpect_spawn = None,
direct_exec = False,
no_passthru_opts = False,
spawn_env_override = None,
exit_val = None,
env = {}):
self.exit_val = exit_val
desc = self.tg.test_name if cfg.names else self.gm.dpy_data[self.tg.test_name][1]
if extra_desc:
desc += ' ' + extra_desc
cmd_path = (
cmd if cfg.system # cfg.system is broken for main test group with overlay tree
else os.path.relpath(os.path.join(repo_root, cmd_dir, cmd)))
passthru_opts = (
self.passthru_opts if not no_passthru_opts else
[] if no_passthru_opts is True else
[o for o in self.passthru_opts if o[2:].split('=')[0] not in no_passthru_opts])
args = (
self.pre_args +
([] if no_exec_wrapper else ['scripts/exec_wrapper.py']) +
[cmd_path] +
passthru_opts +
args)
try:
qargs = ['{q}{}{q}'.format(a, q = "'" if ' ' in a else '') for a in args]
except:
msg(f'args: {args}')
raise
cmd_disp = ' '.join(qargs).replace('\\', '/') # for mingw
if logging:
self.log_fd.write('[{}][{}:{}] {}\n'.format(
(proto.coin.lower() if 'coin' in self.tg.passthru_opts else 'NONE'),
self.tg.group_name,
self.tg.test_name,
cmd_disp))
for i in args: # die only after writing log entry
if not isinstance(i, str):
die(2, 'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format(
self.tg.test_name,
args))
if not no_msg:
t_pfx = '' if cfg.no_timings else f'[{time.time() - self.start_time:08.2f}] '
if cfg.verbose or cfg.print_cmdline or cfg.exact_output:
omsg(green(f'{t_pfx}Testing: {desc}'))
if not msg_only:
clr1, clr2 = (nocolor, nocolor) if cfg.print_cmdline else (green, cyan)
omsg(
clr1('Executing: ') +
clr2(repr(cmd_disp) if sys.platform == 'win32' else cmd_disp)
)
else:
omsg_r('{a}Testing {b}: {c}'.format(
a = t_pfx,
b = desc,
c = 'OK\n' if direct_exec or cfg.direct_exec else ''))
if msg_only:
return
# NB: the `pexpect_spawn` arg enables hold_protect and send_delay while the corresponding cmdline
# option does not. For performance reasons, this is the desired behavior. For full emulation of
# the user experience with hold protect enabled, specify --buf-keypress or --demo.
send_delay = 0.4 if pexpect_spawn is True or cfg.buf_keypress else None
pexpect_spawn = pexpect_spawn if pexpect_spawn is not None else bool(cfg.pexpect_spawn)
spawn_env = dict(spawn_env_override or self.tg.spawn_env)
spawn_env.update({
'MMGEN_HOLD_PROTECT_DISABLE': '' if send_delay else '1',
'MMGEN_TEST_SUITE_POPEN_SPAWN': '' if pexpect_spawn else '1',
'EXEC_WRAPPER_EXIT_VAL': '' if exit_val is None else str(exit_val),
})
spawn_env.update(env)
from test.include.pexpect import MMGenPexpect
return MMGenPexpect(
args = args,
no_output = no_output,
spawn_env = spawn_env,
pexpect_spawn = pexpect_spawn,
timeout = timeout,
send_delay = send_delay,
direct_exec = direct_exec)
def end_msg(self):
t = int(time.time() - self.start_time)
sys.stderr.write(green(
f'{self.cmd_total} test{suf(self.cmd_total)} performed' +
('\n' if cfg.no_timings else f'. Elapsed time: {t//60:02d}:{t%60:02d}\n')
))
def init_group(self, gname, sg_name=None, cmd=None, quiet=False, do_clean=True):
from test.cmdtest_d.cfg import cmd_groups_altcoin
if cfg.no_altcoin and gname in cmd_groups_altcoin:
omsg(gray(f'INFO → skipping test group {gname!r} (--no-altcoin)'))
return None
ct_cls = CmdGroupMgr().load_mod(gname)
if sys.platform in ct_cls.platform_skip:
omsg(gray(f'INFO → skipping test {gname!r} for platform {sys.platform!r}'))
return None
for k in ('segwit', 'segwit_random', 'bech32'):
if getattr(cfg, k):
segwit_opt = k
break
else:
segwit_opt = None
def gen_msg():
yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname, c=cmd)
if len(ct_cls.networks) != 1:
yield f' for {proto.coin} {proto.network}'
if segwit_opt:
yield ' (--{})'.format(segwit_opt.replace('_', '-'))
m = ''.join(gen_msg())
if segwit_opt and not ct_cls.segwit_opts_ok:
iqmsg(gray(f'INFO → skipping {m}'))
return None
# 'networks = ()' means all networks allowed
nws = [(e.split('_')[0], 'testnet') if '_' in e else (e, 'mainnet') for e in ct_cls.networks]
if nws:
coin = proto.coin.lower()
for a, b in nws:
if a == coin and b == proto.network:
break
else:
iqmsg(gray(f'INFO → skipping {m} for {proto.coin} {proto.network}'))
return None
if do_clean and not cfg.skipping_deps:
clean(cfgs, tmpdir_ids=ct_cls.tmpdir_nums, extra_dirs=[data_dir, trash_dir, trash_dir2])
if not quiet:
bmsg('Executing ' + m)
if (not self.daemon_started) and self.gm.get_cls_by_gname(gname).need_daemon:
start_test_daemons(network_id, remove_datadir=True)
self.daemon_started = True
if hasattr(self, 'tg'):
del self.tg
self.tg = self.gm.gm_init_group(self, gname, sg_name, self.spawn_wrapper)
self.ct_clsname = type(self.tg).__name__
# pass through opts from cmdline (po.user_opts)
self.passthru_opts = ['--{}{}'.format(
k.replace('_', '-'),
'' if cfg._uopts[k] is True else '=' + cfg._uopts[k]
) for k in cfg._uopts if k in self.tg.base_passthru_opts + self.tg.passthru_opts]
if cfg.resuming:
rc = cfg.resume or cfg.resume_after
offset = 1 if cfg.resume_after else 0
self.resume_cmd = self.gm.cmd_list[self.gm.cmd_list.index(rc)+offset]
omsg(f'INFO → Resuming at command {self.resume_cmd!r}')
if cfg.step:
cfg.exit_after = self.resume_cmd
if cfg.exit_after and cfg.exit_after not in self.gm.cmd_list:
die(1, f'{cfg.exit_after!r}: command not recognized')
return self.tg
def run_tests(self, cmd_args):
self.start_time = time.time()
self.daemon_started = False
gname_save = None
def parse_arg(arg):
if '.' in arg:
a, b = arg.split('.')
return [a] + b.split(':') if ':' in b else [a, b, None]
elif ':' in arg:
a, b = arg.split(':')
return [a, None, b]
else:
return [self.gm.find_cmd_in_groups(arg), None, arg]
if cmd_args:
for arg in cmd_args:
if arg in self.gm.cmd_groups:
if self.init_group(arg):
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd, build=True)
do_between()
else:
gname, sg_name, cmdname = parse_arg(arg)
if gname:
same_grp = gname == gname_save # same group as previous cmd: don't clean, suppress blue msg
if self.init_group(gname, sg_name, cmdname, quiet=same_grp, do_clean=not same_grp):
if cmdname:
if cfg.deps_only:
self.deps_only = cmdname
try:
self.check_needs_rerun(cmdname, build=True)
except Exception as e: # allow calling of functions not in cmd_group
if isinstance(e, KeyError) and e.args[0] == cmdname:
ret = getattr(self.tg, cmdname)()
if type(ret).__name__ == 'coroutine':
ret = asyncio.run(ret)
self.process_retval(cmdname, ret)
else:
raise
do_between()
else:
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd, build=True)
do_between()
gname_save = gname
else:
die(1, f'{arg!r}: command not recognized')
else:
if cfg.exclude_groups:
exclude = cfg.exclude_groups.split(',')
for e in exclude:
if e not in self.gm.cmd_groups_dfl:
die(1, f'{e!r}: group not recognized')
for gname in self.gm.cmd_groups_dfl:
if cfg.exclude_groups and gname in exclude:
continue
if self.init_group(gname):
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd, build=True)
do_between()
self.end_msg()
def check_needs_rerun(
self,
cmd,
build = False,
root = True,
force_delete = False,
dpy = False):
self.tg.test_name = cmd
if self.ct_clsname == 'CmdTestMain' and testing_segwit and cmd not in self.tg.segwit_do:
return False
rerun = root # force_delete is not passed to recursive call
fns = []
if force_delete or not root:
# does cmd produce a needed dependency(ies)?
ret = self.get_num_exts_for_cmd(cmd)
if ret:
for ext in ret[1]:
fn = get_file_with_ext(cfgs[ret[0]]['tmpdir'], ext, delete=build)
if fn:
if force_delete:
os.unlink(fn)
else: fns.append(fn)
else: rerun = True
fdeps = self.generate_file_deps(cmd)
cdeps = self.generate_cmd_deps(fdeps)
for fn in fns:
my_age = os.stat(fn).st_mtime
for num, ext in fdeps:
f = get_file_with_ext(cfgs[num]['tmpdir'], ext, delete=build)
if f and os.stat(f).st_mtime > my_age:
rerun = True
for cdep in cdeps:
if self.check_needs_rerun(cdep, build=build, root=False, dpy=cmd):
rerun = True
if build:
if rerun:
for fn in fns:
if not root:
os.unlink(fn)
if not (dpy and cfg.skipping_deps):
self.run_test(cmd)
if not root:
do_between()
else:
# If prog produces multiple files:
if cmd not in self.rebuild_list or rerun is True:
self.rebuild_list[cmd] = (rerun, fns[0] if fns else '') # FIX
return rerun
def run_test(self, cmd):
if self.deps_only and cmd == self.deps_only:
sys.exit(0)
d = [(str(num), ext) for exts, num in self.gm.dpy_data[cmd][2] for ext in exts]
# delete files depended on by this cmd
arg_list = [get_file_with_ext(cfgs[num]['tmpdir'], ext) for num, ext in d]
# remove shared_deps from arg list
if hasattr(self.tg, 'shared_deps'):
arg_list = arg_list[:-len(self.tg.shared_deps)]
if self.resume_cmd:
if cmd != self.resume_cmd:
return
bmsg(f'Resuming at {self.resume_cmd!r}')
self.resume_cmd = None
cfg.skipping_deps = False
cfg.resuming = False
if cfg.profile:
start = time.time()
self.tg.test_name = cmd # NB: Do not remove, this needs to be set twice
cdata = self.gm.dpy_data[cmd]
# self.tg.test_dpydata = cdata
self.tg.tmpdir_num = cdata[0]
# self.tg.cfg = cfgs[str(cdata[0])] # will remove this eventually
test_cfg = cfgs[str(cdata[0])]
for k in (
'seed_len', 'seed_id', 'wpasswd', 'kapasswd', 'segwit', 'hash_preset', 'bw_filename',
'bw_params', 'ref_bw_seed_id', 'addr_idx_list', 'pass_idx_list'):
if k in test_cfg:
setattr(self.tg, k, test_cfg[k])
ret = getattr(self.tg, cmd)(*arg_list) # run the test
if type(ret).__name__ == 'coroutine':
ret = asyncio.run(ret)
self.process_retval(cmd, ret)
if cfg.profile:
omsg('\r\033[50C{:.4f}'.format(time.time() - start))
if cmd == cfg.exit_after:
sys.exit(0)
def warn_skipped(self):
if self.skipped_warnings:
print(yellow('The following tests were skipped and may require attention:'))
r = '-' * 72 + '\n'
print(r+('\n'+r).join(self.skipped_warnings))
def process_retval(self, cmd, ret):
if type(ret).__name__ == 'MMGenPexpect':
ret.ok(exit_val=self.exit_val)
self.cmd_total += 1
elif ret == 'ok':
ok()
self.cmd_total += 1
elif ret == 'error':
die(2, red(f'\nTest {self.tg.test_name!r} failed'))
elif ret in ('skip', 'skip_msg', 'silent'):
if ret == 'silent':
self.cmd_total += 1
elif ret == 'skip_msg':
ok('SKIP')
elif isinstance(ret, tuple) and ret[0] == 'skip_warn':
self.skipped_warnings.append(
'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n'))))
else:
die(2, f'{cmd!r} returned {ret}')
def check_deps(self, cmds): # TODO: broken
if len(cmds) != 1:
die(1, f'Usage: {gc.prog_name} check_deps <command>')
cmd = cmds[0]
if cmd not in self.gm.cmd_list:
die(1, f'{cmd!r}: unrecognized command')
if not cfg.quiet:
omsg(f'Checking dependencies for {cmd!r}')
self.check_needs_rerun(self.tg, cmd)
w = max(map(len, self.rebuild_list)) + 1
for cmd in self.rebuild_list:
c = self.rebuild_list[cmd]
m = 'Rebuild' if (c[0] and c[1]) else 'Build' if c[0] else 'OK'
omsg('cmd {:<{w}} {}'.format(cmd+':', m, w=w))
def generate_file_deps(self, cmd):
return [(str(n), e) for exts, n in self.gm.dpy_data[cmd][2] for e in exts]
def generate_cmd_deps(self, fdeps):
return [cfgs[str(n)]['dep_generators'][ext] for n, ext in fdeps]
def get_num_exts_for_cmd(self, cmd):
try:
num = str(self.gm.dpy_data[cmd][0])
except KeyError:
qmsg_r(f'Missing dependency {cmd!r}')
gname = self.gm.find_cmd_in_groups(cmd)
if gname:
kwargs = self.gm.cmd_groups[gname][1]
kwargs.update({'add_dpy':True})
self.gm.create_group(gname, None, **kwargs)
num = str(self.gm.dpy_data[cmd][0])
qmsg(f' found in group {gname!r}')
else:
qmsg(' not found in any command group!')
raise
dgl = cfgs[num]['dep_generators']
if cmd in dgl.values():
exts = [k for k in dgl if dgl[k] == cmd]
return (num, exts)
else:
return None
if __name__ == '__main__':
if not cfg.skipping_deps: # do this before list cmds exit, so we stay in sync with shm_dir
create_tmp_dirs(shm_dir)
if cfg.list_cmd_groups:
CmdGroupMgr().list_cmd_groups()
from test.cmdtest_d.group_mgr import CmdGroupMgr
CmdGroupMgr(cfg).list_cmd_groups()
elif cfg.list_cmds:
list_cmds()
@ -989,19 +308,20 @@ if __name__ == '__main__':
set_restore_term_at_exit()
from mmgen.exception import TestSuiteSpawnedScriptException
from test.cmdtest_d.runner import CmdTestRunner
try:
tr = CmdTestRunner(data_dir, trash_dir)
tr = CmdTestRunner(cfg, cfg._proto, repo_root, data_dir, trash_dir, trash_dir2)
tr.run_tests(cmd_args)
tr.warn_skipped()
if tr.daemon_started and not cfg.no_daemon_stop:
stop_test_daemons(network_id, remove_datadir=True)
stop_test_daemons(tr.network_id, remove_datadir=True)
if hasattr(tr, 'tg'):
del tr.tg
del tr
except KeyboardInterrupt:
if tr.daemon_started and not cfg.no_daemon_stop:
stop_test_daemons(network_id, remove_datadir=True)
stop_test_daemons(tr.network_id, remove_datadir=True)
tr.warn_skipped()
if hasattr(tr, 'tg'):
del tr.tg

190
test/cmdtest_d/group_mgr.py Executable file
View file

@ -0,0 +1,190 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
test.include.group_mgr: Command group manager for the MMGen Wallet cmdtest suite
"""
import sys, os, time
from mmgen.color import yellow, green, cyan
from mmgen.util import Msg, die
from .cfg import cfgs, cmd_groups_dfl, cmd_groups_extra
class CmdGroupMgr:
dpy_data = None
cmd_groups = cmd_groups_dfl.copy()
cmd_groups.update(cmd_groups_extra)
def __init__(self, cfg):
self.cfg = cfg
self.network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '')
self.name = type(self).__name__
def create_cmd_group(self, cls, sg_name=None):
cmd_group_in = dict(cls.cmd_group_in)
if sg_name and 'subgroup.' + sg_name not in cmd_group_in:
die(1, f'{sg_name!r}: no such subgroup in test group {cls.__name__}')
def add_entries(key, add_deps=True, added_subgroups=[]):
if add_deps:
for dep in cmd_group_in['subgroup.'+key]:
yield from add_entries(dep)
assert isinstance(cls.cmd_subgroups[key][0], str), f'header for subgroup {key!r} missing!'
if not key in added_subgroups:
yield from cls.cmd_subgroups[key][1:]
added_subgroups.append(key)
def gen():
for name, data in cls.cmd_group_in:
if name.startswith('subgroup.'):
sg_key = name.removeprefix('subgroup.')
if sg_name in (None, sg_key):
yield from add_entries(
sg_key,
add_deps = sg_name and not self.cfg.skipping_deps,
added_subgroups = [sg_name] if self.cfg.deps_only else [])
if self.cfg.deps_only and sg_key == sg_name:
return
elif not self.cfg.skipping_deps:
yield (name, data)
return tuple(gen())
def load_mod(self, gname, modname=None):
clsname, kwargs = self.cmd_groups[gname]
if modname is None and 'modname' in kwargs:
modname = kwargs['modname']
import importlib
modpath = f'test.cmdtest_d.ct_{modname or gname}'
return getattr(importlib.import_module(modpath), clsname)
def create_group(self, gname, sg_name, full_data=False, modname=None, is3seed=False, add_dpy=False):
"""
Initializes the list 'cmd_list' and dict 'dpy_data' from module's cmd_group data.
Alternatively, if called with 'add_dpy=True', updates 'dpy_data' from module data
without touching 'cmd_list'
"""
cls = self.load_mod(gname, modname)
cdata = []
def get_shared_deps(cmdname, tmpdir_idx):
"""
shared_deps are "implied" dependencies for all cmds in cmd_group that don't appear in
the cmd_group data or cmds' argument lists. Supported only for 3seed tests at present.
"""
if not hasattr(cls, 'shared_deps'):
return []
return [k for k, v in cfgs[str(tmpdir_idx)]['dep_generators'].items()
if k in cls.shared_deps and v != cmdname]
if not hasattr(cls, 'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls, sg_name)
for a, b in cls.cmd_group:
if is3seed:
for n, (i, j) in enumerate(zip(cls.tmpdir_nums, (128, 192, 256))):
k = f'{a}_{n+1}'
if hasattr(cls, 'skip_cmds') and k in cls.skip_cmds:
continue
sdeps = get_shared_deps(k, i)
if isinstance(b, str):
cdata.append((k, (i, f'{b} ({j}-bit)', [[[]+sdeps, i]])))
else:
cdata.append((k, (i, f'{b[1]} ({j}-bit)', [[b[0]+sdeps, i]])))
else:
cdata.append((a, b if full_data else (cls.tmpdir_nums[0], b, [[[], cls.tmpdir_nums[0]]])))
if add_dpy:
self.dpy_data.update(dict(cdata))
else:
self.cmd_list = tuple(e[0] for e in cdata)
self.dpy_data = dict(cdata)
return cls
def gm_init_group(self, trunner, gname, sg_name, spawn_prog):
kwargs = self.cmd_groups[gname][1]
cls = self.create_group(gname, sg_name, **kwargs)
cls.group_name = gname
return cls(trunner, cfgs, spawn_prog)
def get_cls_by_gname(self, gname):
return self.load_mod(gname, self.cmd_groups[gname][1].get('modname'))
def list_cmd_groups(self):
ginfo = []
for gname in self.cmd_groups:
ginfo.append((gname, self.get_cls_by_gname(gname)))
if self.cfg.list_current_cmd_groups:
exclude = (self.cfg.exclude_groups or '').split(',')
ginfo = [g for g in ginfo
if self.network_id in g[1].networks
and not g[0] in exclude
and g[0] in tuple(self.cmd_groups_dfl) + tuple(self.cfg._args)]
desc = 'CONFIGURED'
else:
desc = 'AVAILABLE'
def gen_output():
yield green(f'{desc} COMMAND GROUPS AND SUBGROUPS:')
yield ''
for name, cls in ginfo:
yield ' {} - {}'.format(
yellow(name.ljust(13)),
(cls.__doc__.strip() if cls.__doc__ else cls.__name__))
if 'cmd_subgroups' in cls.__dict__:
subgroups = {k:v for k, v in cls.cmd_subgroups.items() if not k.startswith('_')}
max_w = max(len(k) for k in subgroups)
for k, v in subgroups.items():
yield ' + {} · {}'.format(cyan(k.ljust(max_w+1)), v[0])
from mmgen.ui import do_pager
do_pager('\n'.join(gen_output()))
Msg('\n' + ' '.join(e[0] for e in ginfo))
sys.exit(0)
def find_cmd_in_groups(self, cmd, group=None):
"""
Search for a test command in specified group or all configured command groups
and return it as a string. Loads modules but alters no global variables.
"""
if group:
if not group in [e[0] for e in self.cmd_groups]:
die(1, f'{group!r}: unrecognized group')
groups = [self.cmd_groups[group]]
else:
groups = self.cmd_groups
for gname in groups:
cls = self.get_cls_by_gname(gname)
if not hasattr(cls, 'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls)
if cmd in cls.cmd_group: # first search the class
return gname
if cmd in dir(cls(None, None, None)): # then a throwaway instance
return gname # cmd might exist in more than one group - we'll go with the first
return None

546
test/cmdtest_d/runner.py Executable file
View file

@ -0,0 +1,546 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
test.include.runner: test runner for the MMGen Wallet cmdtest suite
"""
import sys, os, time, asyncio
from mmgen.cfg import gc
from mmgen.color import red, yellow, green, blue, cyan, gray, nocolor
from mmgen.util import msg, Msg, rmsg, bmsg, die, suf, make_timestr
from test.include.common import (
cmdtest_py_log_fn,
iqmsg,
omsg,
omsg_r,
ok,
start_test_daemons,
init_coverage,
clean
)
from .common import get_file_with_ext, confirm_continue
from .cfg import cfgs, cmd_groups_dfl
from .group_mgr import CmdGroupMgr
class CmdTestRunner:
'cmdtest.py test runner'
def __del__(self):
if self.logging:
self.log_fd.close()
def __init__(self, cfg, proto, repo_root, data_dir, trash_dir, trash_dir2):
self.cfg = cfg
self.proto = proto
self.data_dir = data_dir
self.trash_dir = trash_dir
self.trash_dir2 = trash_dir2
self.cmd_total = 0
self.rebuild_list = {}
self.gm = CmdGroupMgr(cfg)
self.repo_root = repo_root
self.skipped_warnings = []
self.resume_cmd = None
self.deps_only = None
self.logging = self.cfg.log or os.getenv('MMGEN_EXEC_WRAPPER')
self.testing_segwit = cfg.segwit or cfg.segwit_random or cfg.bech32
self.network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '')
global qmsg, qmsg_r
if cfg.exact_output:
qmsg = qmsg_r = lambda s: None
else:
qmsg = cfg._util.qmsg
qmsg_r = cfg._util.qmsg_r
if self.logging:
self.log_fd = open(cmdtest_py_log_fn, 'a')
self.log_fd.write(f'\nLog started: {make_timestr()} UTC\n')
omsg(f'INFO → Logging to file {cmdtest_py_log_fn!r}')
else:
self.log_fd = None
if self.cfg.coverage:
coverdir, accfile = init_coverage()
omsg(f'INFO → Writing coverage files to {coverdir!r}')
self.pre_args = ['python3', '-m', 'trace', '--count', '--coverdir='+coverdir, '--file='+accfile]
else:
self.pre_args = ['python3'] if sys.platform == 'win32' else []
if self.cfg.pexpect_spawn:
omsg('INFO → Using pexpect.spawn() for real terminal emulation')
self.set_spawn_env()
def do_between(self):
if self.cfg.pause:
confirm_continue()
elif (self.cfg.verbose or self.cfg.exact_output) and not self.cfg.skipping_deps:
sys.stderr.write('\n')
def set_spawn_env(self):
self.spawn_env = dict(os.environ)
self.spawn_env.update({
'MMGEN_NO_LICENSE': '1',
'MMGEN_BOGUS_SEND': '1',
'MMGEN_TEST_SUITE_PEXPECT': '1',
'EXEC_WRAPPER_DO_RUNTIME_MSG':'1',
# if cmdtest.py itself is running under exec_wrapper, disable writing of traceback file for spawned script
'EXEC_WRAPPER_TRACEBACK': '' if os.getenv('MMGEN_EXEC_WRAPPER') else '1',
})
if self.cfg.exact_output:
from mmgen.term import get_terminal_size
self.spawn_env['MMGEN_COLUMNS'] = str(get_terminal_size().width)
else:
self.spawn_env['MMGEN_COLUMNS'] = '120'
def spawn_wrapper(
self,
cmd = '',
args = [],
extra_desc = '',
no_output = False,
msg_only = False,
no_msg = False,
cmd_dir = 'cmds',
no_exec_wrapper = False,
timeout = None,
pexpect_spawn = None,
direct_exec = False,
no_passthru_opts = False,
spawn_env_override = None,
exit_val = None,
env = {}):
self.exit_val = exit_val
desc = self.tg.test_name if self.cfg.names else self.gm.dpy_data[self.tg.test_name][1]
if extra_desc:
desc += ' ' + extra_desc
cmd_path = (
cmd if self.cfg.system # self.cfg.system is broken for main test group with overlay tree
else os.path.relpath(os.path.join(self.repo_root, cmd_dir, cmd)))
passthru_opts = (
self.passthru_opts if not no_passthru_opts else
[] if no_passthru_opts is True else
[o for o in self.passthru_opts if o[2:].split('=')[0] not in no_passthru_opts])
args = (
self.pre_args +
([] if no_exec_wrapper else ['scripts/exec_wrapper.py']) +
[cmd_path] +
passthru_opts +
args)
try:
qargs = ['{q}{}{q}'.format(a, q = "'" if ' ' in a else '') for a in args]
except:
msg(f'args: {args}')
raise
cmd_disp = ' '.join(qargs).replace('\\', '/') # for mingw
if self.logging:
self.log_fd.write('[{}][{}:{}] {}\n'.format(
(self.proto.coin.lower() if 'coin' in self.tg.passthru_opts else 'NONE'),
self.tg.group_name,
self.tg.test_name,
cmd_disp))
for i in args: # die only after writing log entry
if not isinstance(i, str):
die(2, 'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format(
self.tg.test_name,
args))
if not no_msg:
t_pfx = '' if self.cfg.no_timings else f'[{time.time() - self.start_time:08.2f}] '
if self.cfg.verbose or self.cfg.print_cmdline or self.cfg.exact_output:
omsg(green(f'{t_pfx}Testing: {desc}'))
if not msg_only:
clr1, clr2 = (nocolor, nocolor) if self.cfg.print_cmdline else (green, cyan)
omsg(
clr1('Executing: ') +
clr2(repr(cmd_disp) if sys.platform == 'win32' else cmd_disp)
)
else:
omsg_r('{a}Testing {b}: {c}'.format(
a = t_pfx,
b = desc,
c = 'OK\n' if direct_exec or self.cfg.direct_exec else ''))
if msg_only:
return
# NB: the `pexpect_spawn` arg enables hold_protect and send_delay while the corresponding cmdline
# option does not. For performance reasons, this is the desired behavior. For full emulation of
# the user experience with hold protect enabled, specify --buf-keypress or --demo.
send_delay = 0.4 if pexpect_spawn is True or self.cfg.buf_keypress else None
pexpect_spawn = pexpect_spawn if pexpect_spawn is not None else bool(self.cfg.pexpect_spawn)
spawn_env = dict(spawn_env_override or self.tg.spawn_env)
spawn_env.update({
'MMGEN_HOLD_PROTECT_DISABLE': '' if send_delay else '1',
'MMGEN_TEST_SUITE_POPEN_SPAWN': '' if pexpect_spawn else '1',
'EXEC_WRAPPER_EXIT_VAL': '' if exit_val is None else str(exit_val),
})
spawn_env.update(env)
from test.include.pexpect import MMGenPexpect
return MMGenPexpect(
args = args,
no_output = no_output,
spawn_env = spawn_env,
pexpect_spawn = pexpect_spawn,
timeout = timeout,
send_delay = send_delay,
direct_exec = direct_exec)
def end_msg(self):
t = int(time.time() - self.start_time)
sys.stderr.write(green(
f'{self.cmd_total} test{suf(self.cmd_total)} performed' +
('\n' if self.cfg.no_timings else f'. Elapsed time: {t//60:02d}:{t%60:02d}\n')
))
def init_group(self, gname, sg_name=None, cmd=None, quiet=False, do_clean=True):
from test.cmdtest_d.cfg import cmd_groups_altcoin
if self.cfg.no_altcoin and gname in cmd_groups_altcoin:
omsg(gray(f'INFO → skipping test group {gname!r} (--no-altcoin)'))
return None
ct_cls = self.gm.load_mod(gname)
if sys.platform in ct_cls.platform_skip:
omsg(gray(f'INFO → skipping test {gname!r} for platform {sys.platform!r}'))
return None
for k in ('segwit', 'segwit_random', 'bech32'):
if getattr(self.cfg, k):
segwit_opt = k
break
else:
segwit_opt = None
def gen_msg():
yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname, c=cmd)
if len(ct_cls.networks) != 1:
yield f' for {self.proto.coin} {self.proto.network}'
if segwit_opt:
yield ' (--{})'.format(segwit_opt.replace('_', '-'))
m = ''.join(gen_msg())
if segwit_opt and not ct_cls.segwit_opts_ok:
iqmsg(gray(f'INFO → skipping {m}'))
return None
# 'networks = ()' means all networks allowed
nws = [(e.split('_')[0], 'testnet') if '_' in e else (e, 'mainnet') for e in ct_cls.networks]
if nws:
coin = self.proto.coin.lower()
for a, b in nws:
if a == coin and b == self.proto.network:
break
else:
iqmsg(gray(f'INFO → skipping {m} for {self.proto.coin} {self.proto.network}'))
return None
if do_clean and not self.cfg.skipping_deps:
clean(
cfgs,
tmpdir_ids = ct_cls.tmpdir_nums,
extra_dirs = [self.data_dir, self.trash_dir, self.trash_dir2])
if not quiet:
bmsg('Executing ' + m)
if (not self.daemon_started) and self.gm.get_cls_by_gname(gname).need_daemon:
start_test_daemons(self.network_id, remove_datadir=True)
self.daemon_started = True
if hasattr(self, 'tg'):
del self.tg
self.tg = self.gm.gm_init_group(self, gname, sg_name, self.spawn_wrapper)
self.ct_clsname = type(self.tg).__name__
# pass through opts from cmdline (po.user_opts)
self.passthru_opts = ['--{}{}'.format(
k.replace('_', '-'),
'' if self.cfg._uopts[k] is True else '=' + self.cfg._uopts[k]
) for k in self.cfg._uopts if k in self.tg.base_passthru_opts + self.tg.passthru_opts]
if self.cfg.resuming:
rc = self.cfg.resume or self.cfg.resume_after
offset = 1 if self.cfg.resume_after else 0
self.resume_cmd = self.gm.cmd_list[self.gm.cmd_list.index(rc)+offset]
omsg(f'INFO → Resuming at command {self.resume_cmd!r}')
if self.cfg.step:
self.cfg.exit_after = self.resume_cmd
if self.cfg.exit_after and self.cfg.exit_after not in self.gm.cmd_list:
die(1, f'{self.cfg.exit_after!r}: command not recognized')
return self.tg
def run_tests(self, cmd_args):
self.start_time = time.time()
self.daemon_started = False
gname_save = None
def parse_arg(arg):
if '.' in arg:
a, b = arg.split('.')
return [a] + b.split(':') if ':' in b else [a, b, None]
elif ':' in arg:
a, b = arg.split(':')
return [a, None, b]
else:
return [self.gm.find_cmd_in_groups(arg), None, arg]
if cmd_args:
for arg in cmd_args:
if arg in self.gm.cmd_groups:
if self.init_group(arg):
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd, build=True)
self.do_between()
else:
gname, sg_name, cmdname = parse_arg(arg)
if gname:
same_grp = gname == gname_save # same group as previous cmd: don't clean, suppress blue msg
if self.init_group(gname, sg_name, cmdname, quiet=same_grp, do_clean=not same_grp):
if cmdname:
if self.cfg.deps_only:
self.deps_only = cmdname
try:
self.check_needs_rerun(cmdname, build=True)
except Exception as e: # allow calling of functions not in cmd_group
if isinstance(e, KeyError) and e.args[0] == cmdname:
ret = getattr(self.tg, cmdname)()
if type(ret).__name__ == 'coroutine':
ret = asyncio.run(ret)
self.process_retval(cmdname, ret)
else:
raise
self.do_between()
else:
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd, build=True)
self.do_between()
gname_save = gname
else:
die(1, f'{arg!r}: command not recognized')
else:
if self.cfg.exclude_groups:
exclude = self.cfg.exclude_groups.split(',')
for e in exclude:
if e not in cmd_groups_dfl:
die(1, f'{e!r}: group not recognized')
for gname in cmd_groups_dfl:
if self.cfg.exclude_groups and gname in exclude:
continue
if self.init_group(gname):
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd, build=True)
self.do_between()
self.end_msg()
def check_needs_rerun(
self,
cmd,
build = False,
root = True,
force_delete = False,
dpy = False):
self.tg.test_name = cmd
if self.ct_clsname == 'CmdTestMain' and self.testing_segwit and cmd not in self.tg.segwit_do:
return False
rerun = root # force_delete is not passed to recursive call
fns = []
if force_delete or not root:
# does cmd produce a needed dependency(ies)?
ret = self.get_num_exts_for_cmd(cmd)
if ret:
for ext in ret[1]:
fn = get_file_with_ext(cfgs[ret[0]]['tmpdir'], ext, delete=build)
if fn:
if force_delete:
os.unlink(fn)
else: fns.append(fn)
else: rerun = True
fdeps = self.generate_file_deps(cmd)
cdeps = self.generate_cmd_deps(fdeps)
for fn in fns:
my_age = os.stat(fn).st_mtime
for num, ext in fdeps:
f = get_file_with_ext(cfgs[num]['tmpdir'], ext, delete=build)
if f and os.stat(f).st_mtime > my_age:
rerun = True
for cdep in cdeps:
if self.check_needs_rerun(cdep, build=build, root=False, dpy=cmd):
rerun = True
if build:
if rerun:
for fn in fns:
if not root:
os.unlink(fn)
if not (dpy and self.cfg.skipping_deps):
self.run_test(cmd)
if not root:
self.do_between()
else:
# If prog produces multiple files:
if cmd not in self.rebuild_list or rerun is True:
self.rebuild_list[cmd] = (rerun, fns[0] if fns else '') # FIX
return rerun
def run_test(self, cmd):
if self.deps_only and cmd == self.deps_only:
sys.exit(0)
d = [(str(num), ext) for exts, num in self.gm.dpy_data[cmd][2] for ext in exts]
# delete files depended on by this cmd
arg_list = [get_file_with_ext(cfgs[num]['tmpdir'], ext) for num, ext in d]
# remove shared_deps from arg list
if hasattr(self.tg, 'shared_deps'):
arg_list = arg_list[:-len(self.tg.shared_deps)]
if self.resume_cmd:
if cmd != self.resume_cmd:
return
bmsg(f'Resuming at {self.resume_cmd!r}')
self.resume_cmd = None
self.cfg.skipping_deps = False
self.cfg.resuming = False
if self.cfg.profile:
start = time.time()
self.tg.test_name = cmd # NB: Do not remove, this needs to be set twice
cdata = self.gm.dpy_data[cmd]
# self.tg.test_dpydata = cdata
self.tg.tmpdir_num = cdata[0]
# self.tg.self.cfg = cfgs[str(cdata[0])] # will remove this eventually
test_cfg = cfgs[str(cdata[0])]
for k in (
'seed_len', 'seed_id', 'wpasswd', 'kapasswd', 'segwit', 'hash_preset', 'bw_filename',
'bw_params', 'ref_bw_seed_id', 'addr_idx_list', 'pass_idx_list'):
if k in test_cfg:
setattr(self.tg, k, test_cfg[k])
ret = getattr(self.tg, cmd)(*arg_list) # run the test
if type(ret).__name__ == 'coroutine':
ret = asyncio.run(ret)
self.process_retval(cmd, ret)
if self.cfg.profile:
omsg('\r\033[50C{:.4f}'.format(time.time() - start))
if cmd == self.cfg.exit_after:
sys.exit(0)
def warn_skipped(self):
if self.skipped_warnings:
print(yellow('The following tests were skipped and may require attention:'))
r = '-' * 72 + '\n'
print(r+('\n'+r).join(self.skipped_warnings))
def process_retval(self, cmd, ret):
if type(ret).__name__ == 'MMGenPexpect':
ret.ok(exit_val=self.exit_val)
self.cmd_total += 1
elif ret == 'ok':
ok()
self.cmd_total += 1
elif ret == 'error':
die(2, red(f'\nTest {self.tg.test_name!r} failed'))
elif ret in ('skip', 'skip_msg', 'silent'):
if ret == 'silent':
self.cmd_total += 1
elif ret == 'skip_msg':
ok('SKIP')
elif isinstance(ret, tuple) and ret[0] == 'skip_warn':
self.skipped_warnings.append(
'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n'))))
else:
die(2, f'{cmd!r} returned {ret}')
def check_deps(self, cmds): # TODO: broken
if len(cmds) != 1:
die(1, f'Usage: {gc.prog_name} check_deps <command>')
cmd = cmds[0]
if cmd not in self.gm.cmd_list:
die(1, f'{cmd!r}: unrecognized command')
if not self.cfg.quiet:
omsg(f'Checking dependencies for {cmd!r}')
self.check_needs_rerun(self.tg, cmd)
w = max(map(len, self.rebuild_list)) + 1
for cmd in self.rebuild_list:
c = self.rebuild_list[cmd]
m = 'Rebuild' if (c[0] and c[1]) else 'Build' if c[0] else 'OK'
omsg('cmd {:<{w}} {}'.format(cmd+':', m, w=w))
def generate_file_deps(self, cmd):
return [(str(n), e) for exts, n in self.gm.dpy_data[cmd][2] for e in exts]
def generate_cmd_deps(self, fdeps):
return [cfgs[str(n)]['dep_generators'][ext] for n, ext in fdeps]
def get_num_exts_for_cmd(self, cmd):
try:
num = str(self.gm.dpy_data[cmd][0])
except KeyError:
qmsg_r(f'Missing dependency {cmd!r}')
gname = self.gm.find_cmd_in_groups(cmd)
if gname:
kwargs = self.gm.cmd_groups[gname][1]
kwargs.update({'add_dpy':True})
self.gm.create_group(gname, None, **kwargs)
num = str(self.gm.dpy_data[cmd][0])
qmsg(f' found in group {gname!r}')
else:
qmsg(' not found in any command group!')
raise
dgl = cfgs[num]['dep_generators']
if cmd in dgl.values():
exts = [k for k in dgl if dgl[k] == cmd]
return (num, exts)
else:
return None