568 lines
16 KiB
Python
Executable file
568 lines
16 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# MMGen Wallet, a terminal-based cryptocurrency wallet
|
|
# Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
|
|
# Licensed under the GNU General Public License, Version 3:
|
|
# https://www.gnu.org/licenses
|
|
# Public project repositories:
|
|
# https://github.com/mmgen/mmgen-wallet
|
|
# https://gitlab.com/mmgen/mmgen-wallet
|
|
|
|
"""
|
|
test.cmdtest_d.include.runner: test runner for the MMGen Wallet cmdtest suite
|
|
"""
|
|
|
|
import sys, os, time, asyncio
|
|
from collections import namedtuple
|
|
|
|
from mmgen.cfg import gc
|
|
from mmgen.color import red, yellow, green, blue, cyan, gray, nocolor
|
|
from mmgen.util import msg, Msg, rmsg, ymsg, bmsg, die, suf, make_timestr, isAsync, capfirst
|
|
|
|
from ...include.common import (
|
|
cmdtest_py_log_fn,
|
|
iqmsg,
|
|
omsg,
|
|
omsg_r,
|
|
ok,
|
|
start_test_daemons,
|
|
init_coverage,
|
|
clean
|
|
)
|
|
|
|
from .common import get_file_with_ext, confirm_continue
|
|
from .cfg import cfgs
|
|
from .group_mgr import CmdGroupMgr
|
|
|
|
def format_args(args):
|
|
try:
|
|
return ' '.join((f"'{a}'" if ' ' in a else a) for a in args).replace('\\', '/') # for MSYS2
|
|
except Exception as e:
|
|
print(type(e), e)
|
|
print('cmdline:', args)
|
|
|
|
class CmdTestRunner:
|
|
'cmdtest.py test runner'
|
|
|
|
def __del__(self):
|
|
if self.logging:
|
|
self.log_fd.close()
|
|
|
|
def __init__(self, cfg, repo_root, data_dir, trash_dir, trash_dir2):
|
|
|
|
self.cfg = cfg
|
|
self.proto = cfg._proto
|
|
self.data_dir = data_dir
|
|
self.trash_dir = trash_dir
|
|
self.trash_dir2 = trash_dir2
|
|
self.cmd_total = 0
|
|
self.rebuild_list = {}
|
|
self.gm = CmdGroupMgr(cfg)
|
|
self.repo_root = repo_root
|
|
self.warnings = []
|
|
self.skipped_warnings = []
|
|
self.resume_cmd = None
|
|
self.deps_only = None
|
|
self.logging = self.cfg.log or os.getenv('MMGEN_EXEC_WRAPPER')
|
|
self.testing_segwit = cfg.segwit or cfg.segwit_random or cfg.bech32
|
|
self.network_id = self.proto.coin.lower() + ('_tn' if self.proto.testnet else '')
|
|
self.daemon_started = False
|
|
self.quiet = not (cfg.exact_output or cfg.verbose)
|
|
|
|
global qmsg, qmsg_r
|
|
if cfg.exact_output:
|
|
qmsg = qmsg_r = lambda s: None
|
|
else:
|
|
qmsg = cfg._util.qmsg
|
|
qmsg_r = cfg._util.qmsg_r
|
|
|
|
if self.logging:
|
|
self.log_fd = open(cmdtest_py_log_fn, 'a')
|
|
self.log_fd.write(f'\nLog started: {make_timestr()} UTC\n')
|
|
omsg(f'INFO → Logging to file {cmdtest_py_log_fn!r}')
|
|
else:
|
|
self.log_fd = None
|
|
|
|
if self.cfg.coverage:
|
|
coverdir, accfile = init_coverage()
|
|
omsg(f'INFO → Writing coverage files to {coverdir!r}')
|
|
self.pre_args = ['python3', '-m', 'trace', '--count', '--coverdir='+coverdir, '--file='+accfile]
|
|
else:
|
|
self.pre_args = ['python3'] if gc.platform == 'win32' else []
|
|
|
|
if self.cfg.pexpect_spawn:
|
|
omsg('INFO → Using pexpect.spawn() for real terminal emulation')
|
|
|
|
self.set_spawn_env()
|
|
self.start_time = time.time()
|
|
|
|
def do_between(self):
|
|
if self.cfg.pause:
|
|
confirm_continue()
|
|
elif not (self.quiet or self.cfg.skipping_deps):
|
|
sys.stderr.write('\n')
|
|
|
|
def set_spawn_env(self):
|
|
|
|
self.spawn_env = dict(os.environ)
|
|
|
|
self.spawn_env.update({
|
|
'MMGEN_NO_LICENSE': '1',
|
|
'MMGEN_BOGUS_SEND': '1',
|
|
'MMGEN_TEST_SUITE_PEXPECT': '1',
|
|
'EXEC_WRAPPER_DO_RUNTIME_MSG':'1',
|
|
# if cmdtest.py itself is running under exec_wrapper, disable writing of traceback file for spawned script
|
|
'EXEC_WRAPPER_TRACEBACK': '' if os.getenv('MMGEN_EXEC_WRAPPER') else '1'})
|
|
|
|
if self.cfg.dev_mode:
|
|
self.spawn_env.update({
|
|
'PYTHONDEVMODE': '1',
|
|
'PYTHONTRACEMALLOC': '10'})
|
|
|
|
if self.cfg.exact_output:
|
|
from mmgen.term import get_terminal_size
|
|
self.spawn_env['MMGEN_COLUMNS'] = str(get_terminal_size().width)
|
|
else:
|
|
self.spawn_env['MMGEN_COLUMNS'] = '120'
|
|
|
|
def spawn_wrapper(
|
|
self,
|
|
cmd = '',
|
|
args = [],
|
|
extra_desc = '',
|
|
no_output = False,
|
|
msg_only = False,
|
|
log_only = False,
|
|
no_msg = False,
|
|
cmd_dir = 'cmds',
|
|
no_exec_wrapper = False,
|
|
timeout = None,
|
|
pexpect_spawn = None,
|
|
direct_exec = False,
|
|
no_passthru_opts = False,
|
|
spawn_env_override = None,
|
|
exit_val = None,
|
|
silent = False,
|
|
env = {}):
|
|
|
|
self.exit_val = exit_val
|
|
|
|
desc = self.tg.test_name if self.cfg.names else self.gm.dpy_data[self.tg.test_name].desc
|
|
|
|
if extra_desc:
|
|
desc += ' ' + extra_desc
|
|
|
|
cmd_path = (
|
|
cmd if self.cfg.system # self.cfg.system is broken for main test group with overlay tree
|
|
else os.path.relpath(os.path.join(self.repo_root, cmd_dir, cmd)))
|
|
|
|
passthru_opts = (
|
|
self.passthru_opts if not no_passthru_opts else
|
|
[] if no_passthru_opts is True else
|
|
[o for o in self.passthru_opts
|
|
if o[2:].split('=')[0].replace('-','_') not in no_passthru_opts])
|
|
|
|
args = (
|
|
self.pre_args +
|
|
([] if no_exec_wrapper else ['scripts/exec_wrapper.py']) +
|
|
[cmd_path] +
|
|
passthru_opts +
|
|
args)
|
|
|
|
cmd_disp = format_args(args)
|
|
|
|
if self.logging:
|
|
self.log_fd.write('[{}][{}:{}] {}\n'.format(
|
|
self.proto.coin.lower(),
|
|
self.tg.group_name,
|
|
self.tg.test_name,
|
|
cmd_disp))
|
|
|
|
if log_only:
|
|
return
|
|
|
|
for i in args: # die only after writing log entry
|
|
if not isinstance(i, str):
|
|
die(2, 'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format(
|
|
self.tg.test_name,
|
|
args))
|
|
|
|
if not no_msg:
|
|
t_pfx = '' if self.cfg.no_timings else f'[{time.time() - self.start_time:08.2f}] '
|
|
if (not self.quiet) or self.cfg.print_cmdline:
|
|
omsg(green(f'{t_pfx}Testing: {desc}'))
|
|
if not msg_only:
|
|
clr1, clr2 = (nocolor, nocolor) if self.cfg.print_cmdline else (green, cyan)
|
|
omsg(
|
|
clr1('Executing: ') +
|
|
clr2(repr(cmd_disp) if gc.platform == 'win32' else cmd_disp)
|
|
)
|
|
else:
|
|
omsg_r('{a}Testing {b}: {c}'.format(
|
|
a = t_pfx,
|
|
b = desc,
|
|
c = 'OK\n' if direct_exec or self.cfg.direct_exec else ''))
|
|
|
|
if msg_only:
|
|
return
|
|
|
|
# NB: the `pexpect_spawn` arg enables hold_protect and send_delay while the corresponding cmdline
|
|
# option does not. For performance reasons, this is the desired behavior. For full emulation of
|
|
# the user experience with hold protect enabled, specify --buf-keypress or --demo.
|
|
send_delay = 0.4 if pexpect_spawn is True or self.cfg.buf_keypress else None
|
|
pexpect_spawn = pexpect_spawn if pexpect_spawn is not None else bool(self.cfg.pexpect_spawn)
|
|
|
|
spawn_env = dict(spawn_env_override or self.tg.spawn_env)
|
|
spawn_env.update({
|
|
'MMGEN_HOLD_PROTECT_DISABLE': '' if send_delay else '1',
|
|
'MMGEN_TEST_SUITE_POPEN_SPAWN': '' if pexpect_spawn else '1',
|
|
'EXEC_WRAPPER_EXIT_VAL': '' if exit_val is None else str(exit_val),
|
|
})
|
|
spawn_env.update(env)
|
|
|
|
from .pexpect import CmdTestPexpect
|
|
return CmdTestPexpect(
|
|
args = args,
|
|
no_output = no_output,
|
|
spawn_env = spawn_env,
|
|
pexpect_spawn = pexpect_spawn,
|
|
timeout = timeout,
|
|
send_delay = send_delay,
|
|
silent = silent,
|
|
direct_exec = direct_exec)
|
|
|
|
def end_msg(self):
|
|
t = int(time.time() - self.start_time)
|
|
sys.stderr.write(green(
|
|
f'{self.cmd_total} test{suf(self.cmd_total)} performed' +
|
|
('\n' if self.cfg.no_timings else f'. Elapsed time: {t//60:02d}:{t%60:02d}\n')
|
|
))
|
|
|
|
def init_group(self, gname, sg_name=None, cmd=None, quiet=False, do_clean=True):
|
|
|
|
from .cfg import cmd_groups_altcoin
|
|
if self.cfg.no_altcoin and gname in cmd_groups_altcoin:
|
|
omsg(gray(f'INFO → skipping test group {gname!r} (--no-altcoin)'))
|
|
return None
|
|
|
|
ct_cls = self.gm.load_mod(gname)
|
|
|
|
if gc.platform in ct_cls.platform_skip:
|
|
omsg(gray(f'INFO → skipping test {gname!r} for platform {gc.platform!r}'))
|
|
return None
|
|
|
|
for k in ('segwit', 'segwit_random', 'bech32'):
|
|
if getattr(self.cfg, k):
|
|
segwit_opt = k
|
|
break
|
|
else:
|
|
segwit_opt = None
|
|
|
|
def gen_msg():
|
|
yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname, c=cmd)
|
|
if len(ct_cls.networks) != 1:
|
|
yield f' for {self.proto.coin} {self.proto.network}'
|
|
if segwit_opt:
|
|
yield ' (--{})'.format(segwit_opt.replace('_', '-'))
|
|
|
|
m = ''.join(gen_msg())
|
|
|
|
if segwit_opt and not ct_cls.segwit_opts_ok:
|
|
iqmsg(gray(f'INFO → skipping {m}'))
|
|
return None
|
|
|
|
# 'networks = ()' means all networks allowed
|
|
nws = [(e.split('_')[0], 'testnet') if '_' in e else (e, 'mainnet') for e in ct_cls.networks]
|
|
if nws:
|
|
coin = self.proto.coin.lower()
|
|
for a, b in nws:
|
|
if a == coin and b == self.proto.network:
|
|
break
|
|
else:
|
|
iqmsg(gray(f'INFO → skipping {m} for {self.proto.coin} {self.proto.network}'))
|
|
return None
|
|
|
|
if do_clean and not self.cfg.skipping_deps:
|
|
clean(
|
|
cfgs,
|
|
tmpdir_ids = ct_cls.tmpdir_nums,
|
|
extra_dirs = [self.data_dir, self.trash_dir, self.trash_dir2])
|
|
|
|
if not quiet:
|
|
bmsg('Executing ' + m)
|
|
|
|
if (not self.daemon_started) and self.gm.get_cls_by_gname(gname).need_daemon:
|
|
start_test_daemons(self.network_id, remove_datadir=True)
|
|
self.daemon_started = True
|
|
|
|
if hasattr(self, 'tg'):
|
|
del self.tg
|
|
|
|
self.tg = self.gm.gm_init_group(self.cfg, self, gname, sg_name, self.spawn_wrapper)
|
|
self.ct_clsname = type(self.tg).__name__
|
|
|
|
# pass through opts from cmdline (po.user_opts)
|
|
self.passthru_opts = ['--{}{}'.format(
|
|
k.replace('_', '-'),
|
|
'' if self.cfg._uopts[k] is True else '=' + self.cfg._uopts[k]
|
|
) for k in self.cfg._uopts
|
|
if self.cfg._uopts[k] and k in self.tg.base_passthru_opts + self.tg.passthru_opts]
|
|
|
|
if self.cfg.resuming:
|
|
rc = self.cfg.resume or self.cfg.resume_after
|
|
offset = 1 if self.cfg.resume_after else 0
|
|
self.resume_cmd = self.gm.cmd_list[self.gm.cmd_list.index(rc)+offset]
|
|
omsg(f'INFO → Resuming at command {self.resume_cmd!r}')
|
|
if self.cfg.step:
|
|
self.cfg.exit_after = self.resume_cmd
|
|
|
|
if self.cfg.exit_after and self.cfg.exit_after not in self.gm.cmd_list:
|
|
die(1, f'{self.cfg.exit_after!r}: command not recognized')
|
|
|
|
return self.tg
|
|
|
|
def run_tests(self, cmd_args):
|
|
gname_save = None
|
|
|
|
def parse_arg(arg):
|
|
if '.' in arg:
|
|
a, b = arg.split('.')
|
|
return [a] + b.split(':') if ':' in b else [a, b, None]
|
|
elif ':' in arg:
|
|
a, b = arg.split(':')
|
|
return [a, None, b]
|
|
else:
|
|
return [self.gm.find_cmd_in_groups(arg), None, arg]
|
|
|
|
if cmd_args:
|
|
for arg in cmd_args:
|
|
if arg in self.gm.cmd_groups:
|
|
if self.init_group(arg):
|
|
for cmd in self.gm.cmd_list:
|
|
self.check_needs_rerun(cmd, build=True)
|
|
self.do_between()
|
|
else:
|
|
gname, sg_name, cmdname = parse_arg(arg)
|
|
if gname:
|
|
same_grp = gname == gname_save # same group as previous cmd: don't clean, suppress blue msg
|
|
if self.init_group(gname, sg_name, cmdname, quiet=same_grp, do_clean=not same_grp):
|
|
if cmdname:
|
|
if self.cfg.deps_only:
|
|
self.deps_only = cmdname
|
|
try:
|
|
self.check_needs_rerun(cmdname, build=True)
|
|
except Exception as e: # allow calling of functions not in cmd_group
|
|
if isinstance(e, KeyError) and e.args[0] == cmdname:
|
|
func = getattr(self.tg, cmdname)
|
|
self.process_retval(
|
|
cmdname,
|
|
asyncio.run(func()) if isAsync(func) else func())
|
|
else:
|
|
raise
|
|
self.do_between()
|
|
else:
|
|
for cmd in self.gm.cmd_list:
|
|
self.check_needs_rerun(cmd, build=True)
|
|
self.do_between()
|
|
gname_save = gname
|
|
else:
|
|
die(1, f'{arg!r}: command not recognized')
|
|
else:
|
|
for gname in CmdGroupMgr.get_cmd_groups(self.cfg):
|
|
if self.init_group(gname):
|
|
for cmd in self.gm.cmd_list:
|
|
self.check_needs_rerun(cmd, build=True)
|
|
self.do_between()
|
|
|
|
self.end_msg()
|
|
|
|
def check_needs_rerun(
|
|
self,
|
|
cmd,
|
|
build = False,
|
|
root = True,
|
|
force_delete = False,
|
|
dpy = False):
|
|
|
|
self.tg.test_name = cmd
|
|
|
|
if self.ct_clsname == 'CmdTestMain' and self.testing_segwit and cmd not in self.tg.segwit_do:
|
|
return False
|
|
|
|
rerun = root # force_delete is not passed to recursive call
|
|
|
|
fns = []
|
|
if force_delete or not root:
|
|
# does cmd produce a required dependency(ies)?
|
|
if deps := self.get_cmd_deps(cmd):
|
|
for ext in deps.exts:
|
|
if fn := get_file_with_ext(cfgs[deps.cfgnum]['tmpdir'], ext, delete=build):
|
|
if force_delete:
|
|
os.unlink(fn)
|
|
else:
|
|
fns.append(fn)
|
|
else:
|
|
rerun = True
|
|
|
|
fdeps = self.generate_file_deps(cmd)
|
|
cdeps = self.generate_cmd_deps(fdeps)
|
|
|
|
for fn in fns:
|
|
my_age = os.stat(fn).st_mtime
|
|
for num, ext in fdeps:
|
|
f = get_file_with_ext(cfgs[num]['tmpdir'], ext, delete=build)
|
|
if f and os.stat(f).st_mtime > my_age:
|
|
rerun = True
|
|
|
|
for cdep in cdeps:
|
|
if self.check_needs_rerun(cdep, build=build, root=False, dpy=cmd):
|
|
rerun = True
|
|
|
|
if build:
|
|
if rerun:
|
|
for fn in fns:
|
|
if not root:
|
|
os.unlink(fn)
|
|
if not (dpy and self.cfg.skipping_deps):
|
|
self.run_test(cmd)
|
|
if not root:
|
|
self.do_between()
|
|
elif rerun or cmd not in self.rebuild_list:
|
|
self.rebuild_list[cmd] = 'rebuild' if rerun and fns else 'build' if rerun else 'OK'
|
|
|
|
return rerun
|
|
|
|
def run_test(self, cmd, sub=False):
|
|
|
|
if self.deps_only and cmd == self.deps_only:
|
|
sys.exit(0)
|
|
|
|
if self.tg.full_data:
|
|
d = [(num, ext) for exts, num in self.gm.dpy_data[cmd].dpy_list for ext in exts]
|
|
# delete files depended on by this cmd
|
|
arg_list = [get_file_with_ext(cfgs[str(num)]['tmpdir'], ext) for num, ext in d]
|
|
|
|
# remove shared_deps from arg list
|
|
if hasattr(self.tg, 'shared_deps'):
|
|
arg_list = arg_list[:-len(self.tg.shared_deps)]
|
|
else:
|
|
arg_list = []
|
|
|
|
if self.resume_cmd:
|
|
if cmd != self.resume_cmd:
|
|
return
|
|
bmsg(f'Resuming at {self.resume_cmd!r}')
|
|
self.resume_cmd = None
|
|
self.cfg.skipping_deps = False
|
|
self.cfg.resuming = False
|
|
|
|
if self.cfg.profile:
|
|
start = time.time()
|
|
|
|
self.tg.test_name = cmd # NB: Do not remove, this needs to be set twice
|
|
|
|
if self.tg.full_data:
|
|
tmpdir_num = self.gm.dpy_data[cmd].tmpdir_num
|
|
self.tg.tmpdir_num = tmpdir_num
|
|
for k in (test_cfg := cfgs[str(tmpdir_num)]):
|
|
if k in self.gm.cfg_attrs:
|
|
setattr(self.tg, k, test_cfg[k])
|
|
|
|
func = getattr(self.tg, cmd)
|
|
ret = asyncio.run(func(*arg_list)) if isAsync(func) else func(*arg_list) # run the test
|
|
|
|
if sub:
|
|
return ret
|
|
|
|
self.process_retval(cmd, ret)
|
|
|
|
if self.cfg.profile:
|
|
omsg('\r\033[50C{:.4f}'.format(time.time() - start))
|
|
|
|
if cmd == self.cfg.exit_after:
|
|
sys.exit(0)
|
|
|
|
def print_warnings(self):
|
|
if self.skipped_warnings:
|
|
print(yellow('The following tests were skipped and may require attention:'))
|
|
r = '-' * 72 + '\n'
|
|
print(r+('\n'+r).join(self.skipped_warnings))
|
|
if self.warnings:
|
|
print(yellow('The following issues were encountered and may require attention:'))
|
|
r = '-' * 72 + '\n'
|
|
print(r+('\n'+r).join(self.warnings))
|
|
|
|
def process_retval(self, cmd, ret):
|
|
match ret:
|
|
case x if type(x).__name__ == 'CmdTestPexpect':
|
|
ret.ok(exit_val=self.exit_val)
|
|
self.cmd_total += 1
|
|
case 'ok':
|
|
ok()
|
|
self.cmd_total += 1
|
|
case 'skip':
|
|
pass
|
|
case 'skip_msg':
|
|
ok('SKIP')
|
|
case 'silent':
|
|
self.cmd_total += 1
|
|
case 'error':
|
|
die(2, red(f'\nTest {self.tg.test_name!r} failed'))
|
|
case (x, _) if x == 'skip_warn':
|
|
wmsg = 'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n')))
|
|
self.skipped_warnings.append(wmsg)
|
|
if self.logging:
|
|
self.log_fd.write(f'WARNING: {wmsg}\n')
|
|
case _:
|
|
die(2, f'{cmd!r} returned {ret}')
|
|
|
|
def warn(self, text):
|
|
ymsg(text)
|
|
wmsg = 'Test ‘{}:{}’: {}'.format(self.tg.group_name, self.tg.test_name, text)
|
|
self.warnings.append(wmsg)
|
|
if self.logging:
|
|
self.log_fd.write(f'WARNING: {wmsg}\n')
|
|
|
|
def check_deps(self, cmds): # TODO: broken, unused
|
|
if len(cmds) != 1:
|
|
die(1, f'Usage: {gc.prog_name} check_deps <command>')
|
|
|
|
cmd = cmds[0]
|
|
|
|
if cmd not in self.gm.cmd_list:
|
|
die(1, f'{cmd!r}: unrecognized command')
|
|
|
|
if not self.cfg.quiet:
|
|
omsg(f'Checking dependencies for {cmd!r}')
|
|
|
|
self.check_needs_rerun(cmd)
|
|
|
|
w = max(map(len, self.rebuild_list)) + 1
|
|
for cmd, desc in self.rebuild_list.items():
|
|
omsg('cmd {:<{w}} {}'.format(cmd+':', capfirst(desc), w=w))
|
|
|
|
def generate_file_deps(self, cmd):
|
|
return [(str(n), e) for exts, n in self.gm.dpy_data[cmd].dpy_list for e in exts]
|
|
|
|
def generate_cmd_deps(self, fdeps):
|
|
return [cfgs[str(n)]['dep_generators'][ext] for n, ext in fdeps]
|
|
|
|
def get_cmd_deps(self, cmd):
|
|
try:
|
|
self.gm.dpy_data[cmd]
|
|
except KeyError:
|
|
qmsg_r(f'Missing dependency {cmd!r}')
|
|
if gname := self.gm.find_cmd_in_groups(cmd):
|
|
kwargs = self.gm.cmd_groups[gname].params | {'add_dpy': True}
|
|
self.gm.create_group(gname, None, **kwargs)
|
|
qmsg(f' found in group {gname!r}')
|
|
else:
|
|
qmsg(' not found in any command group!')
|
|
raise
|
|
num = str(self.gm.dpy_data[cmd].tmpdir_num)
|
|
dep_gens = cfgs[num]['dep_generators']
|
|
if cmd in dep_gens.values():
|
|
cd = namedtuple('cmd_deps', ['cfgnum', 'exts'])
|
|
return cd(num, [k for k in dep_gens if dep_gens[k] == cmd])
|
|
else:
|
|
return None
|