123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- #!/usr/bin/env python3
- #
- # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
- # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- """
- test/tooltest2.py: Test the 'mmgen-tool' utility
- """
- # TODO: move all non-interactive 'mmgen-tool' tests in 'cmdtest.py' here
- # TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
- import sys,os,time,importlib
- from subprocess import run,PIPE
- try:
- from include import test_init
- except ImportError:
- from test.include import test_init
- from test.include.common import set_globals,end_msg,init_coverage
- from mmgen import main_tool
- from mmgen.cfg import Config
- from mmgen.color import green,blue,purple,cyan,gray
- from mmgen.util import msg,msg_r,Msg,async_run,die
- skipped_tests = ['mn2hex_interactive']
- coin_dependent_groups = ('Coin','File')
- opts_data = {
- 'text': {
- 'desc': "Simple test suite for the 'mmgen-tool' utility",
- 'usage':'[options] [command]...',
- 'options': """
- -h, --help Print this help message
- -a, --no-altcoin Skip altcoin tests
- -A, --tool-api Test the tool_api subsystem
- -C, --coverage Produce code coverage info using trace module
- -d, --die-on-missing Abort if no test data found for given command
- --, --longhelp Print help message for long options (common options)
- -l, --list-tests List the test groups in this test suite
- -L, --list-tested-cmds Output the 'mmgen-tool' commands that are tested by this test suite
- -n, --names Print command names instead of descriptions
- -q, --quiet Produce quieter output
- -t, --type= Specify coin type
- -f, --fork Run commands via tool executable instead of importing tool module
- -v, --verbose Produce more verbose output
- """,
- 'notes': """
- If no command is given, the whole suite of tests is run.
- """
- }
- }
- sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
- cfg = Config(
- opts_data = opts_data,
- init_opts = {
- 'usr_randchars': 0,
- 'hash_preset': '1',
- 'passwd_file': 'test/ref/keyaddrfile_password',
- })
- set_globals(cfg)
- from test.tooltest2_d.data import *
- def fork_cmd(cmd_name,args,opts,stdin_input):
- cmd = (
- tool_cmd_preargs +
- tool_cmd +
- (opts or []) +
- [cmd_name] + args
- )
- vmsg('{} {}'.format(
- green('Executing'),
- cyan(' '.join(cmd)) ))
- cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
- try:
- cmd_out = cp.stdout.decode()
- except:
- cmd_out = cp.stdout
- if cp.stderr:
- vmsg(cp.stderr.strip().decode())
- if cp.returncode != 0:
- import re
- m = re.search(b'tool command returned (None|False)',cp.stdout)
- if m:
- return { b'None': None, b'False': False }[m.group(1)]
- else:
- die(2,f'Spawned program exited with error: {cp.stderr}')
- return cmd_out.strip()
- async def call_method(cls,method,cmd_name,args,mmtype,stdin_input):
- vmsg('{a}: {b}{c}'.format(
- a = purple('Running'),
- b = ' '.join([cmd_name]+[repr(e) for e in args]),
- c = ' '+mmtype if mmtype else '' ))
- aargs,kwargs = main_tool.process_args(cmd_name,args,cls)
- oq_save = bool(cfg.quiet)
- if not cfg.verbose:
- cfg.quiet = True
- if stdin_input:
- fd0,fd1 = os.pipe()
- if os.fork(): # parent
- os.close(fd1)
- stdin_save = os.dup(0)
- os.dup2(fd0,0)
- cmd_out = method(*aargs,**kwargs)
- os.dup2(stdin_save,0)
- os.wait()
- cfg.quiet = oq_save
- return cmd_out
- else: # child
- os.close(fd0)
- os.write(fd1,stdin_input)
- vmsg(f'Input: {stdin_input!r}')
- sys.exit(0)
- else:
- ret = method(*aargs,**kwargs)
- if type(ret).__name__ == 'coroutine':
- ret = await ret
- cfg.quiet = oq_save
- return ret
- def tool_api(cls,cmd_name,args,opts):
- from mmgen.tool.api import tool_api
- tool = tool_api(cfg)
- if opts:
- for o in opts:
- if o.startswith('--type='):
- tool.addrtype = o.split('=')[1]
- pargs,kwargs = main_tool.process_args(cmd_name,args,cls)
- return getattr(tool,cmd_name)(*pargs,**kwargs)
- def check_output(out,chk):
- if isinstance(chk,str):
- chk = chk.encode()
- if isinstance(out,int):
- out = str(out).encode()
- if isinstance(out,str):
- out = out.encode()
- err_fs = "Output ({!r}) doesn't match expected output ({!r})"
- try:
- outd = out.decode()
- except:
- outd = None
- if type(chk).__name__ == 'function':
- assert chk(outd), f'{chk.__name__}({outd}) failed!'
- elif isinstance(chk,dict):
- for k,v in chk.items():
- if k == 'boolfunc':
- assert v(outd), f'{v.__name__}({outd}) failed!'
- elif k == 'value':
- assert outd == v, err_fs.format(outd,v)
- else:
- outval = getattr(__builtins__,k)(out)
- if outval != v:
- die(1,f'{k}({out}) returned {outval}, not {v}!')
- elif chk is not None:
- assert out == chk, err_fs.format(out,chk)
- async def run_test(cls,gid,cmd_name):
- data = tests[gid][cmd_name]
- # behavior is like cmdtest.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
- if gid in coin_dependent_groups:
- k = '{}_{}'.format(
- ( cfg.token.lower() if proto.tokensym else proto.coin.lower() ),
- ('mainnet','testnet')[proto.testnet] )
- if k in data:
- data = data[k]
- m2 = f' ({k})'
- else:
- qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
- return
- else:
- if proto.coin != 'BTC' or proto.testnet:
- return
- m2 = ''
- m = '{} {}{}'.format(
- purple('Testing'),
- cmd_name if cfg.names else docstring_head(getattr(cls,cmd_name)),
- m2 )
- msg_r(green(m)+'\n' if cfg.verbose else m)
- skipping = False
- for n,d in enumerate(data):
- args,out,opts,mmtype = d + tuple([None] * (4-len(d)))
- if 'fmt=xmrseed' in args and cfg.no_altcoin:
- if not skipping:
- qmsg('')
- qmsg(('' if n else '\n') + gray(f'Skipping altcoin test {cmd_name} {args}'))
- skipping = True
- continue
- else:
- skipping = False
- stdin_input = None
- if args and isinstance(args[0],bytes):
- stdin_input = args[0]
- args[0] = '-'
- if cfg.tool_api:
- if args and args[0 ]== '-':
- continue
- cmd_out = tool_api(cls,cmd_name,args,opts)
- elif cfg.fork:
- cmd_out = fork_cmd(cmd_name,args,opts,stdin_input)
- else:
- if stdin_input and sys.platform == 'win32':
- msg('Skipping for MSWin - no os.fork()')
- continue
- method = getattr(cls(cfg,cmdname=cmd_name,proto=proto,mmtype=mmtype),cmd_name)
- cmd_out = await call_method(cls,method,cmd_name,args,mmtype,stdin_input)
- try:
- vmsg(f'Output:\n{cmd_out}\n')
- except:
- vmsg(f'Output:\n{cmd_out!r}\n')
- if isinstance(out,tuple) and type(out[0]).__name__ == 'function':
- func_out = out[0](cmd_out)
- assert func_out == out[1],(
- '{}({}) == {} failed!\nOutput: {}'.format(
- out[0].__name__,
- cmd_out,
- out[1],
- func_out ))
- elif isinstance(out,(list,tuple)):
- for co,o in zip(cmd_out.split(NL) if cfg.fork else cmd_out,out):
- check_output(co,o)
- else:
- check_output(cmd_out,out)
- if not cfg.verbose:
- msg_r('.')
- if not cfg.verbose:
- msg('OK')
- def docstring_head(obj):
- return obj.__doc__.strip().split('\n')[0] if obj.__doc__ else None
- async def do_group(gid):
- desc = f'command group {gid!r}'
- cls = main_tool.get_mod_cls(gid.lower())
- qmsg(blue('Testing ' +
- desc if cfg.names else
- ( docstring_head(cls) or desc )
- ))
- for cmdname in cls(cfg).user_commands:
- if cmdname in skipped_tests:
- continue
- if cmdname not in tests[gid]:
- m = f'No test for command {cmdname!r} in group {gid!r}!'
- if cfg.die_on_missing:
- die(1,m+' Aborting')
- else:
- msg(m)
- continue
- await run_test(cls,gid,cmdname)
- async def do_cmd_in_group(cmdname):
- cls = main_tool.get_cmd_cls(cmdname)
- for gid,cmds in tests.items():
- for cmd in cmds:
- if cmd == cmdname:
- await run_test(cls,gid,cmdname)
- return True
- return False
- def list_tested_cmds():
- for gid in tests:
- Msg('\n'.join(tests[gid]))
- async def main():
- if cfg._args:
- for cmd in cfg._args:
- if cmd in tests:
- await do_group(cmd)
- else:
- if not await do_cmd_in_group(cmd):
- die(1,f'{cmd!r}: not a recognized test or test group')
- else:
- for garg in tests:
- await do_group(garg)
- qmsg = cfg._util.qmsg
- vmsg = cfg._util.vmsg
- proto = cfg._proto
- if cfg.tool_api:
- del tests['Wallet']
- del tests['File']
- if cfg.list_tests:
- Msg('Available tests:')
- for modname,cmdlist in main_tool.mods.items():
- cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
- Msg(f' {modname:6} - {docstring_head(cls)}')
- sys.exit(0)
- if cfg.list_tested_cmds:
- list_tested_cmds()
- sys.exit(0)
- tool_exec = os.path.relpath(os.path.join('cmds','mmgen-tool'))
- if cfg.fork:
- passthru_args = ['coin','type','testnet','token']
- tool_cmd = [ tool_exec, '--skip-cfg-file' ] + [
- '--{}{}'.format(
- k.replace('_','-'),
- '='+getattr(cfg,k) if getattr(cfg,k) is not True else '')
- for k in passthru_args if getattr(cfg,k) ]
- if cfg.coverage:
- d,f = init_coverage()
- tool_cmd_preargs = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f]
- else:
- tool_cmd_preargs = ['python3','scripts/exec_wrapper.py']
- start_time = int(time.time())
- if __name__ == '__main__':
- async_run(main())
- end_msg(int(time.time()) - start_time)
|