#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2023 The MMGen Project
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
test/tooltest2.py: Test the 'mmgen-tool' utility
"""

# TODO: move all non-interactive 'mmgen-tool' tests in 'cmdtest.py' here
# TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)

import builtins
import importlib
import inspect
import os
import re
import sys
import time
from subprocess import run, PIPE
from types import FunctionType

try:
    from include import test_init
except ImportError:
    from test.include import test_init

from test.include.common import set_globals, end_msg, init_coverage

from mmgen import main_tool
from mmgen.cfg import Config
from mmgen.color import green, blue, purple, cyan, gray
from mmgen.util import msg, msg_r, Msg, async_run, die

# Commands for which no test is run and no "missing test" warning is issued
skipped_tests = ['mn2hex_interactive']

# Test groups whose data is keyed by '<coin>_<network>' (see run_test())
coin_dependent_groups = ('Coin', 'File')

# NOTE: the option lines below must start at column 0.  The column alignment
# of the descriptions was reconstructed and is cosmetic only.
opts_data = {
    'text': {
        'desc': "Simple test suite for the 'mmgen-tool' utility",
        'usage': '[options] [command]...',
        'options': """
-h, --help           Print this help message
-a, --no-altcoin     Skip altcoin tests
-A, --tool-api       Test the tool_api subsystem
-C, --coverage       Produce code coverage info using trace module
-d, --die-on-missing Abort if no test data found for given command
--, --longhelp       Print help message for long options (common options)
-l, --list-tests     List the test groups in this test suite
-L, --list-tested-cmds Output the 'mmgen-tool' commands that are tested by this test suite
-n, --names          Print command names instead of descriptions
-q, --quiet          Produce quieter output
-t, --type=          Specify coin type
-f, --fork           Run commands via tool executable instead of importing tool module
-v, --verbose        Produce more verbose output
""",
        'notes': """

If no command is given, the whole suite of tests is run.
"""
    }
}

# Always run with '--skip-cfg-file', regardless of what the user passed
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]

cfg = Config(
    opts_data = opts_data,
    init_opts = {
        'usr_randchars': 0,
        'hash_preset': '1',
        'passwd_file': 'test/ref/keyaddrfile_password',
    })

set_globals(cfg)

# Provides (at least) the 'tests' mapping and the 'NL' constant used below.
# Imported only after the configuration has been initialized.
from test.tooltest2_d.data import *


def fork_cmd(cmd_name, args, opts, stdin_input):
    """
    Run tool command 'cmd_name' in a subprocess and return its output.

    The command line is built from the module-level 'tool_cmd_preargs' and
    'tool_cmd' lists (defined only when the --fork option is in effect),
    followed by 'opts', the command name and 'args'.  'stdin_input', if
    truthy, is fed to the subprocess on stdin.

    Returns the stripped stdout, decoded to str when possible and left as
    bytes otherwise.  If the process exits with a non-zero status and its
    stdout contains 'tool command returned None' or '... False', the
    corresponding Python value is returned; any other non-zero exit aborts
    via die().
    """
    cmd = tool_cmd_preargs + tool_cmd + (opts or []) + [cmd_name] + args
    vmsg('{} {}'.format(green('Executing'), cyan(' '.join(cmd))))
    cp = run(cmd, input=stdin_input or None, stdout=PIPE, stderr=PIPE)

    try:
        cmd_out = cp.stdout.decode()
    except UnicodeDecodeError:
        # binary output: hand the raw bytes to the caller
        cmd_out = cp.stdout

    if cp.stderr:
        vmsg(cp.stderr.strip().decode())

    if cp.returncode != 0:
        m = re.search(b'tool command returned (None|False)', cp.stdout)
        if m:
            return {b'None': None, b'False': False}[m.group(1)]
        die(2, f'Spawned program exited with error: {cp.stderr}')

    return cmd_out.strip()


async def _await_if_coroutine(ret):
    """Return 'ret', awaiting it first if it is a coroutine object."""
    if inspect.iscoroutine(ret):
        return await ret
    return ret


async def call_method(cls, method, cmd_name, args, mmtype, stdin_input):
    """
    Call the bound tool method 'method' in-process and return its result.

    'args' are converted to positional and keyword arguments with
    main_tool.process_args().  Unless --verbose is in effect, cfg.quiet is
    forced on for the duration of the call; its previous value is restored
    afterwards, including when the call raises.

    If 'stdin_input' (bytes) is truthy, a child process is forked to write
    it into a pipe, and the pipe's read end temporarily replaces file
    descriptor 0 while the method runs.  This requires os.fork().

    A coroutine returned by the method is awaited in both cases.
    """
    vmsg('{a}: {b}{c}'.format(
        a = purple('Running'),
        b = ' '.join([cmd_name] + [repr(e) for e in args]),
        c = ' ' + mmtype if mmtype else ''))

    aargs, kwargs = main_tool.process_args(cmd_name, args, cls)

    oq_save = bool(cfg.quiet)
    if not cfg.verbose:
        cfg.quiet = True

    try:
        if not stdin_input:
            return await _await_if_coroutine(method(*aargs, **kwargs))

        fd0, fd1 = os.pipe()
        pid = os.fork()

        if pid == 0:  # child: feed the input into the pipe, then exit
            os.close(fd0)
            os.write(fd1, stdin_input)
            vmsg(f'Input: {stdin_input!r}')
            sys.exit(0)

        # parent
        os.close(fd1)
        stdin_save = os.dup(0)
        os.dup2(fd0, 0)
        # fd 0 now refers to the pipe, so the original descriptor is redundant
        os.close(fd0)
        try:
            return await _await_if_coroutine(method(*aargs, **kwargs))
        finally:
            # Restoring stdin also closes the last read end of the pipe, so a
            # child still blocked in write() terminates instead of hanging.
            os.dup2(stdin_save, 0)
            os.close(stdin_save)
            os.waitpid(pid, 0)
    finally:
        cfg.quiet = oq_save


def tool_api(cls, cmd_name, args, opts):
    """
    Run 'cmd_name' via the mmgen.tool.api interface and return its result.

    Of the options in 'opts', only '--type=<value>' is honored: its value is
    assigned to the API object's 'addrtype' attribute.
    """
    from mmgen.tool.api import tool_api as api_cls  # avoid shadowing this function's name

    tool = api_cls(cfg)

    for o in opts or ():
        if o.startswith('--type='):
            # split once only, so that a value containing '=' is kept whole
            tool.addrtype = o.split('=', 1)[1]

    pargs, kwargs = main_tool.process_args(cmd_name, args, cls)
    return getattr(tool, cmd_name)(*pargs, **kwargs)


def check_output(out, chk):
    """
    Check command output 'out' against the expected result 'chk'.

    'out' of type int or str is converted to bytes; 'chk' of type str is
    converted to bytes.  'chk' may be:

      - a function:  called with the decoded output; must return a true value
      - a dict, whose items are checked in turn:
          'boolfunc': value is a function as above
          'value':    value must equal the decoded output
          other key:  name of a builtin function, which is called on the
                      output; its result must equal the value
      - None:        no check is performed
      - anything else: must equal the output

    The decoded output is None if the output cannot be decoded.  Failures
    raise AssertionError, except for the builtin-function check, which
    aborts via die().
    """
    if isinstance(chk, str):
        chk = chk.encode()
    if isinstance(out, int):
        out = str(out).encode()
    if isinstance(out, str):
        out = out.encode()

    err_fs = "Output ({!r}) doesn't match expected output ({!r})"

    try:
        outd = out.decode()
    except (UnicodeDecodeError, AttributeError):
        # undecodable bytes, or an object with no decode() method (e.g. None)
        outd = None

    if isinstance(chk, FunctionType):
        assert chk(outd), f'{chk.__name__}({outd}) failed!'
    elif isinstance(chk, dict):
        for k, v in chk.items():
            if k == 'boolfunc':
                assert v(outd), f'{v.__name__}({outd}) failed!'
            elif k == 'value':
                assert outd == v, err_fs.format(outd, v)
            else:
                # Use the 'builtins' module: '__builtins__' may be either a
                # module or a dict, depending on how this script was loaded.
                outval = getattr(builtins, k)(out)
                if outval != v:
                    die(1, f'{k}({out}) returned {outval}, not {v}!')
    elif chk is not None:
        assert out == chk, err_fs.format(out, chk)


async def run_test(cls, gid, cmd_name):
    """
    Run all test vectors for command 'cmd_name' of test group 'gid'.

    Each vector is a tuple of up to four items: (args, expected output,
    opts, mmtype); missing trailing items default to None.  If the first
    argument is a bytes object, it is used as the command's stdin input and
    replaced by '-' in the argument list.

    Depending on the configuration, the command is run via the tool API
    (--tool-api), in a subprocess (--fork), or by calling the tool method
    directly.
    """
    data = tests[gid][cmd_name]

    # behavior is like cmdtest.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
    if gid in coin_dependent_groups:
        k = '{}_{}'.format(
            (cfg.token.lower() if proto.tokensym else proto.coin.lower()),
            ('mainnet', 'testnet')[proto.testnet])
        if k in data:
            data = data[k]
            m2 = f' ({k})'
        else:
            qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
            return
    else:
        if proto.coin != 'BTC' or proto.testnet:
            return
        m2 = ''

    m = '{} {}{}'.format(
        purple('Testing'),
        cmd_name if cfg.names else docstring_head(getattr(cls, cmd_name)),
        m2)

    msg_r(green(m) + '\n' if cfg.verbose else m)

    skipping = False

    for n, d in enumerate(data):
        args, out, opts, mmtype = d + tuple([None] * (4 - len(d)))

        if 'fmt=xmrseed' in args and cfg.no_altcoin:
            if not skipping:
                qmsg('')
            qmsg(('' if n else '\n') + gray(f'Skipping altcoin test {cmd_name} {args}'))
            skipping = True
            continue
        skipping = False

        stdin_input = None
        if args and isinstance(args[0], bytes):
            stdin_input = args[0]
            # Work on a copy: overwriting args[0] in place would destroy the
            # stdin data in the shared test vector for any later run.
            args = ['-'] + list(args[1:])

        if cfg.tool_api:
            if args and args[0] == '-':
                # tests taking stdin input are not run via the tool API
                continue
            cmd_out = tool_api(cls, cmd_name, args, opts)
        elif cfg.fork:
            cmd_out = fork_cmd(cmd_name, args, opts, stdin_input)
        else:
            if stdin_input and sys.platform == 'win32':
                msg('Skipping for MSWin - no os.fork()')
                continue
            method = getattr(cls(cfg, cmdname=cmd_name, proto=proto, mmtype=mmtype), cmd_name)
            cmd_out = await call_method(cls, method, cmd_name, args, mmtype, stdin_input)

        try:
            vmsg(f'Output:\n{cmd_out}\n')
        except Exception:
            # fall back to the repr if the output cannot be displayed as is
            vmsg(f'Output:\n{cmd_out!r}\n')

        if isinstance(out, tuple) and isinstance(out[0], FunctionType):
            # (function, expected value): apply the function to the output
            func_out = out[0](cmd_out)
            assert func_out == out[1], (
                '{}({}) == {} failed!\nOutput: {}'.format(
                    out[0].__name__,
                    cmd_out,
                    out[1],
                    func_out))
        elif isinstance(out, (list, tuple)):
            # a sequence of expected values, one per output item (or, when
            # forking, per NL-separated chunk of the output)
            for co, o in zip(cmd_out.split(NL) if cfg.fork else cmd_out, out):
                check_output(co, o)
        else:
            check_output(cmd_out, out)

        if not cfg.verbose:
            msg_r('.')

    if not cfg.verbose:
        msg('OK')


def docstring_head(obj):
    """Return the first line of obj's docstring, or None if it has none."""
    return obj.__doc__.strip().split('\n')[0] if obj.__doc__ else None


async def do_group(gid):
    """
    Run the tests for every user command of test group 'gid'.

    Commands listed in 'skipped_tests' are ignored.  A command with no test
    data produces a message, or aborts the run if --die-on-missing is in
    effect.
    """
    desc = f'command group {gid!r}'
    cls = main_tool.get_mod_cls(gid.lower())
    qmsg(blue(('Testing ' + desc) if cfg.names else (docstring_head(cls) or desc)))

    for cmdname in cls(cfg).user_commands:
        if cmdname in skipped_tests:
            continue
        if cmdname not in tests[gid]:
            m = f'No test for command {cmdname!r} in group {gid!r}!'
            if cfg.die_on_missing:
                die(1, m + ' Aborting')
            else:
                msg(m)
                continue
        await run_test(cls, gid, cmdname)


async def do_cmd_in_group(cmdname):
    """
    Run the tests for the single command 'cmdname'.

    Returns True if the command was found in one of the test groups (only
    the first match is run), False otherwise.
    """
    cls = main_tool.get_cmd_cls(cmdname)
    for gid, cmds in tests.items():
        if cmdname in cmds:
            await run_test(cls, gid, cmdname)
            return True
    return False


def list_tested_cmds():
    """Print the names of all commands that have test data, one per line."""
    for gid in tests:
        Msg('\n'.join(tests[gid]))


async def main():
    """
    Run the tests named on the command line, or the whole suite if none.

    Each argument may be the name of a test group or of a single command;
    an unrecognized name aborts the run.
    """
    if cfg._args:
        for cmd in cfg._args:
            if cmd in tests:
                await do_group(cmd)
            elif not await do_cmd_in_group(cmd):
                die(1, f'{cmd!r}: not a recognized test or test group')
    else:
        for garg in tests:
            await do_group(garg)


qmsg = cfg._util.qmsg
vmsg = cfg._util.vmsg

proto = cfg._proto

if cfg.tool_api:
    # these groups are not tested via the tool API
    del tests['Wallet']
    del tests['File']

if cfg.list_tests:
    Msg('Available tests:')
    for modname in main_tool.mods:
        cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd')
        Msg(f' {modname:6} - {docstring_head(cls)}')
    sys.exit(0)

if cfg.list_tested_cmds:
    list_tested_cmds()
    sys.exit(0)

tool_exec = os.path.relpath(os.path.join('cmds', 'mmgen-tool'))

if cfg.fork:
    # options passed through to the spawned 'mmgen-tool' process
    passthru_args = ['coin', 'type', 'testnet', 'token']
    tool_cmd = [tool_exec, '--skip-cfg-file'] + [
        '--{}{}'.format(
            k.replace('_', '-'),
            '=' + getattr(cfg, k) if getattr(cfg, k) is not True else '')
        for k in passthru_args if getattr(cfg, k)]

    if cfg.coverage:
        d, f = init_coverage()
        tool_cmd_preargs = ['python3', '-m', 'trace', '--count', '--coverdir=' + d, '--file=' + f]
    else:
        tool_cmd_preargs = ['python3', 'scripts/exec_wrapper.py']

from mmgen.main import launch

start_time = int(time.time())
launch(func = lambda: async_run(main()))
end_msg(int(time.time()) - start_time)