123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- #!/usr/bin/env python3
- #
- # MMGen Wallet, a terminal-based cryptocurrency wallet
- # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- """
- test/tooltest2.py: Test the 'mmgen-tool' utility
- """
- # TODO: move all non-interactive 'mmgen-tool' tests in 'cmdtest.py' here
- # TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
- import sys, os, time, importlib, asyncio
- from subprocess import run, PIPE
- try:
- from include import test_init
- except ImportError:
- from test.include import test_init
- from test.include.common import set_globals, end_msg, init_coverage
- from mmgen import main_tool
- from mmgen.cfg import Config
- from mmgen.color import green, blue, purple, cyan, gray
- from mmgen.util import msg, msg_r, Msg, die
- skipped_tests = ['mn2hex_interactive']
- coin_dependent_groups = ('Coin', 'File')
- opts_data = {
- 'text': {
- 'desc': "Simple test suite for the 'mmgen-tool' utility",
- 'usage':'[options] [command]...',
- 'options': """
- -h, --help Print this help message
- -a, --no-altcoin Skip altcoin tests
- -A, --tool-api Test the tool_api subsystem
- -C, --coverage Produce code coverage info using trace module
- -d, --die-on-missing Abort if no test data found for given command
- --, --longhelp Print help message for long (global) options
- -l, --list-tests List the test groups in this test suite
- -L, --list-tested-cmds Output the 'mmgen-tool' commands that are tested by this test suite
- -n, --names Print command names instead of descriptions
- -q, --quiet Produce quieter output
- -t, --type= Specify coin type
- -f, --fork Run commands via tool executable instead of importing tool module
- -v, --verbose Produce more verbose output
- """,
- 'notes': """
- If no command is given, the whole suite of tests is run.
- """
- }
- }
- sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
- cfg = Config(
- opts_data = opts_data,
- init_opts = {
- 'usr_randchars': 0,
- 'hash_preset': '1',
- 'passwd_file': 'test/ref/keyaddrfile_password',
- })
- set_globals(cfg)
- from test.tooltest2_d.data import *
- def fork_cmd(cmd_name, args, opts, stdin_input):
- cmd = (
- tool_cmd_preargs +
- tool_cmd +
- (opts or []) +
- [cmd_name] + args
- )
- vmsg('{} {}'.format(
- green('Executing'),
- cyan(' '.join(cmd))))
- cp = run(cmd, input=stdin_input or None, stdout=PIPE, stderr=PIPE)
- try:
- cmd_out = cp.stdout.decode()
- except:
- cmd_out = cp.stdout
- if cp.stderr:
- vmsg(cp.stderr.strip().decode())
- if cp.returncode != 0:
- import re
- m = re.search(b'tool command returned (None|False)', cp.stderr)
- if m:
- return eval(m.group(1))
- else:
- die(2, f'Spawned program exited with error: {cp.stderr}')
- return cmd_out.strip()
- def call_method(cls, method, cmd_name, args, mmtype, stdin_input):
- vmsg('{a}: {b}{c}'.format(
- a = purple('Running'),
- b = ' '.join([cmd_name]+[repr(e) for e in args]),
- c = ' '+mmtype if mmtype else ''))
- aargs, kwargs = main_tool.process_args(cmd_name, args, cls)
- oq_save = bool(cfg.quiet)
- if not cfg.verbose:
- cfg._set_quiet(True)
- if stdin_input:
- fd0, fd1 = os.pipe()
- if os.fork(): # parent
- os.close(fd1)
- stdin_save = os.dup(0)
- os.dup2(fd0, 0)
- cmd_out = method(*aargs, **kwargs)
- os.dup2(stdin_save, 0)
- os.wait()
- cfg._set_quiet(oq_save)
- return cmd_out
- else: # child
- os.close(fd0)
- os.write(fd1, stdin_input)
- vmsg(f'Input: {stdin_input!r}')
- sys.exit(0)
- else:
- ret = method(*aargs, **kwargs)
- if type(ret).__name__ == 'coroutine':
- ret = asyncio.run(ret)
- cfg._set_quiet(oq_save)
- return ret
- def tool_api(cls, cmd_name, args, opts):
- from mmgen.tool.api import tool_api
- tool = tool_api(cfg)
- if opts:
- for o in opts:
- if o.startswith('--type='):
- tool.addrtype = o.split('=')[1]
- pargs, kwargs = main_tool.process_args(cmd_name, args, cls)
- return getattr(tool, cmd_name)(*pargs, **kwargs)
- def check_output(out, chk):
- if isinstance(chk, str):
- chk = chk.encode()
- if isinstance(out, int):
- out = str(out).encode()
- if isinstance(out, str):
- out = out.encode()
- err_fs = "Output ({!r}) doesn't match expected output ({!r})"
- try:
- outd = out.decode()
- except:
- outd = None
- if type(chk).__name__ == 'function':
- assert chk(outd), f'{chk.__name__}({outd}) failed!'
- elif isinstance(chk, dict):
- for k, v in chk.items():
- if k == 'boolfunc':
- assert v(outd), f'{v.__name__}({outd}) failed!'
- elif k == 'value':
- assert outd == v, err_fs.format(outd, v)
- else:
- outval = getattr(__builtins__, k)(out)
- if outval != v:
- die(1, f'{k}({out}) returned {outval}, not {v}!')
- elif chk is not None:
- assert out == chk, err_fs.format(out, chk)
- def run_test(cls, gid, cmd_name):
- data = tests[gid][cmd_name]
- # behavior is like cmdtest.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
- if gid in coin_dependent_groups:
- k = '{}_{}'.format(
- (cfg.token.lower() if proto.tokensym else proto.coin.lower()),
- proto.network)
- if k in data:
- data = data[k]
- m2 = f' ({k})'
- else:
- qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
- return
- else:
- if proto.coin != 'BTC' or proto.testnet:
- return
- m2 = ''
- m = '{} {}{}'.format(
- purple('Testing'),
- cmd_name if cfg.names else docstring_head(getattr(cls, cmd_name)),
- m2)
- msg_r(green(m)+'\n' if cfg.verbose else m)
- skipping = False
- for n, d in enumerate(data):
- args, out, opts, mmtype = d + tuple([None] * (4-len(d)))
- if 'fmt=xmrseed' in args and cfg.no_altcoin:
- if not skipping:
- qmsg('')
- skip_msg = f'Skipping altcoin test {cmd_name} {args}'
- qmsg(('' if n else '\n') + gray(skip_msg if len(skip_msg) <= 100 else skip_msg[:97] + '...'))
- skipping = True
- continue
- skipping = False
- stdin_input = None
- if args and isinstance(args[0], bytes):
- stdin_input = args[0]
- args[0] = '-'
- if cfg.tool_api:
- if args and args[0]== '-':
- continue
- cmd_out = tool_api(cls, cmd_name, args, opts)
- elif cfg.fork:
- cmd_out = fork_cmd(cmd_name, args, opts, stdin_input)
- else:
- if stdin_input and sys.platform == 'win32':
- msg(gray('Skipping for MSWin - no os.fork()'))
- continue
- method = getattr(cls(cfg, cmdname=cmd_name, proto=proto, mmtype=mmtype), cmd_name)
- cmd_out = call_method(cls, method, cmd_name, args, mmtype, stdin_input)
- try:
- vmsg(f'Output:\n{cmd_out}\n')
- except:
- vmsg(f'Output:\n{cmd_out!r}\n')
- if isinstance(out, tuple) and type(out[0]).__name__ == 'function':
- func_out = out[0](cmd_out)
- assert func_out == out[1], (
- '{}({}) == {} failed!\nOutput: {}'.format(
- out[0].__name__,
- cmd_out,
- out[1],
- func_out))
- elif isinstance(out, (list, tuple)):
- for co, o in zip(cmd_out.split(NL) if cfg.fork else cmd_out, out):
- check_output(co, o)
- else:
- check_output(cmd_out, out)
- if not cfg.verbose:
- msg_r('.')
- if not cfg.verbose:
- msg('OK')
- def docstring_head(obj):
- return obj.__doc__.strip().split('\n')[0] if obj.__doc__ else None
- def do_group(gid):
- desc = f'command group {gid!r}'
- cls = main_tool.get_mod_cls(gid.lower())
- qmsg(blue('Testing ' +
- desc if cfg.names else
- (docstring_head(cls) or desc)
- ))
- for cmdname in cls(cfg).user_commands:
- if cmdname in skipped_tests:
- continue
- if cmdname not in tests[gid]:
- m = f'No test for command {cmdname!r} in group {gid!r}!'
- if cfg.die_on_missing:
- die(1, m+' Aborting')
- else:
- msg(m)
- continue
- run_test(cls, gid, cmdname)
- def do_cmd_in_group(cmdname):
- cls = main_tool.get_cmd_cls(cmdname)
- for gid, cmds in tests.items():
- for cmd in cmds:
- if cmd == cmdname:
- run_test(cls, gid, cmdname)
- return True
- return False
- def list_tested_cmds():
- for gid in tests:
- Msg('\n'.join(tests[gid]))
- def main():
- if cfg._args:
- for cmd in cfg._args:
- if cmd in tests:
- do_group(cmd)
- else:
- if not do_cmd_in_group(cmd):
- die(1, f'{cmd!r}: not a recognized test or test group')
- else:
- for garg in tests:
- do_group(garg)
- qmsg = cfg._util.qmsg
- vmsg = cfg._util.vmsg
- proto = cfg._proto
- if cfg.tool_api:
- del tests['Wallet']
- del tests['File']
- if cfg.list_tests:
- Msg('Available tests:')
- for modname, cmdlist in main_tool.mods.items():
- cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd')
- Msg(f' {modname:6} - {docstring_head(cls)}')
- sys.exit(0)
- if cfg.list_tested_cmds:
- list_tested_cmds()
- sys.exit(0)
- tool_exec = os.path.relpath(os.path.join('cmds', 'mmgen-tool'))
- if cfg.fork:
- passthru_args = ['coin', 'type', 'testnet', 'token']
- tool_cmd = [tool_exec, '--skip-cfg-file'] + [
- '--{}{}'.format(
- k.replace('_', '-'),
- '='+getattr(cfg, k) if getattr(cfg, k) is not True else '')
- for k in passthru_args if getattr(cfg, k)]
- if cfg.coverage:
- d, f = init_coverage()
- tool_cmd_preargs = ['python3', '-m', 'trace', '--count', '--coverdir='+d, '--file='+f]
- else:
- tool_cmd_preargs = ['python3', 'scripts/exec_wrapper.py']
- from mmgen.main import launch
- start_time = int(time.time())
- launch(func=main)
- end_msg(int(time.time()) - start_time)
|