348 lines
9.3 KiB
Python
Executable file
348 lines
9.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#
|
|
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
|
|
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
test/tooltest2.py: Test the 'mmgen-tool' utility
|
|
"""
|
|
|
|
# TODO: move all non-interactive 'mmgen-tool' tests in 'cmdtest.py' here
|
|
# TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
|
|
|
|
import sys, os, time, importlib, asyncio
|
|
from subprocess import run,PIPE
|
|
|
|
try:
|
|
from include import test_init
|
|
except ImportError:
|
|
from test.include import test_init
|
|
|
|
from test.include.common import set_globals,end_msg,init_coverage
|
|
|
|
from mmgen import main_tool
|
|
from mmgen.cfg import Config
|
|
from mmgen.color import green,blue,purple,cyan,gray
|
|
from mmgen.util import msg, msg_r, Msg, die
|
|
|
|
skipped_tests = ['mn2hex_interactive']
|
|
coin_dependent_groups = ('Coin','File')
|
|
|
|
opts_data = {
|
|
'text': {
|
|
'desc': "Simple test suite for the 'mmgen-tool' utility",
|
|
'usage':'[options] [command]...',
|
|
'options': """
|
|
-h, --help Print this help message
|
|
-a, --no-altcoin Skip altcoin tests
|
|
-A, --tool-api Test the tool_api subsystem
|
|
-C, --coverage Produce code coverage info using trace module
|
|
-d, --die-on-missing Abort if no test data found for given command
|
|
--, --longhelp Print help message for long (global) options
|
|
-l, --list-tests List the test groups in this test suite
|
|
-L, --list-tested-cmds Output the 'mmgen-tool' commands that are tested by this test suite
|
|
-n, --names Print command names instead of descriptions
|
|
-q, --quiet Produce quieter output
|
|
-t, --type= Specify coin type
|
|
-f, --fork Run commands via tool executable instead of importing tool module
|
|
-v, --verbose Produce more verbose output
|
|
""",
|
|
'notes': """
|
|
|
|
If no command is given, the whole suite of tests is run.
|
|
"""
|
|
}
|
|
}
|
|
|
|
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
|
|
|
|
cfg = Config(
|
|
opts_data = opts_data,
|
|
init_opts = {
|
|
'usr_randchars': 0,
|
|
'hash_preset': '1',
|
|
'passwd_file': 'test/ref/keyaddrfile_password',
|
|
})
|
|
|
|
set_globals(cfg)
|
|
|
|
from test.tooltest2_d.data import *
|
|
|
|
def fork_cmd(cmd_name,args,opts,stdin_input):
|
|
cmd = (
|
|
tool_cmd_preargs +
|
|
tool_cmd +
|
|
(opts or []) +
|
|
[cmd_name] + args
|
|
)
|
|
vmsg('{} {}'.format(
|
|
green('Executing'),
|
|
cyan(' '.join(cmd)) ))
|
|
cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
|
|
try:
|
|
cmd_out = cp.stdout.decode()
|
|
except:
|
|
cmd_out = cp.stdout
|
|
if cp.stderr:
|
|
vmsg(cp.stderr.strip().decode())
|
|
if cp.returncode != 0:
|
|
import re
|
|
m = re.search(b'tool command returned (None|False)',cp.stderr)
|
|
if m:
|
|
return eval(m.group(1))
|
|
else:
|
|
die(2,f'Spawned program exited with error: {cp.stderr}')
|
|
|
|
return cmd_out.strip()
|
|
|
|
def call_method(cls, method, cmd_name, args, mmtype, stdin_input):
|
|
vmsg('{a}: {b}{c}'.format(
|
|
a = purple('Running'),
|
|
b = ' '.join([cmd_name]+[repr(e) for e in args]),
|
|
c = ' '+mmtype if mmtype else '' ))
|
|
aargs,kwargs = main_tool.process_args(cmd_name,args,cls)
|
|
oq_save = bool(cfg.quiet)
|
|
if not cfg.verbose:
|
|
cfg._set_quiet(True)
|
|
if stdin_input:
|
|
fd0,fd1 = os.pipe()
|
|
if os.fork(): # parent
|
|
os.close(fd1)
|
|
stdin_save = os.dup(0)
|
|
os.dup2(fd0,0)
|
|
cmd_out = method(*aargs,**kwargs)
|
|
os.dup2(stdin_save,0)
|
|
os.wait()
|
|
cfg._set_quiet(oq_save)
|
|
return cmd_out
|
|
else: # child
|
|
os.close(fd0)
|
|
os.write(fd1,stdin_input)
|
|
vmsg(f'Input: {stdin_input!r}')
|
|
sys.exit(0)
|
|
else:
|
|
ret = method(*aargs,**kwargs)
|
|
if type(ret).__name__ == 'coroutine':
|
|
ret = asyncio.run(ret)
|
|
cfg._set_quiet(oq_save)
|
|
return ret
|
|
|
|
def tool_api(cls,cmd_name,args,opts):
|
|
from mmgen.tool.api import tool_api
|
|
tool = tool_api(cfg)
|
|
if opts:
|
|
for o in opts:
|
|
if o.startswith('--type='):
|
|
tool.addrtype = o.split('=')[1]
|
|
pargs,kwargs = main_tool.process_args(cmd_name,args,cls)
|
|
return getattr(tool,cmd_name)(*pargs,**kwargs)
|
|
|
|
def check_output(out,chk):
|
|
if isinstance(chk,str):
|
|
chk = chk.encode()
|
|
if isinstance(out,int):
|
|
out = str(out).encode()
|
|
if isinstance(out,str):
|
|
out = out.encode()
|
|
err_fs = "Output ({!r}) doesn't match expected output ({!r})"
|
|
try:
|
|
outd = out.decode()
|
|
except:
|
|
outd = None
|
|
|
|
if type(chk).__name__ == 'function':
|
|
assert chk(outd), f'{chk.__name__}({outd}) failed!'
|
|
elif isinstance(chk,dict):
|
|
for k,v in chk.items():
|
|
if k == 'boolfunc':
|
|
assert v(outd), f'{v.__name__}({outd}) failed!'
|
|
elif k == 'value':
|
|
assert outd == v, err_fs.format(outd,v)
|
|
else:
|
|
outval = getattr(__builtins__,k)(out)
|
|
if outval != v:
|
|
die(1,f'{k}({out}) returned {outval}, not {v}!')
|
|
elif chk is not None:
|
|
assert out == chk, err_fs.format(out,chk)
|
|
|
|
def run_test(cls, gid, cmd_name):
|
|
data = tests[gid][cmd_name]
|
|
|
|
# behavior is like cmdtest.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
|
|
if gid in coin_dependent_groups:
|
|
k = '{}_{}'.format(
|
|
(cfg.token.lower() if proto.tokensym else proto.coin.lower()),
|
|
proto.network)
|
|
if k in data:
|
|
data = data[k]
|
|
m2 = f' ({k})'
|
|
else:
|
|
qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
|
|
return
|
|
else:
|
|
if proto.coin != 'BTC' or proto.testnet:
|
|
return
|
|
m2 = ''
|
|
|
|
m = '{} {}{}'.format(
|
|
purple('Testing'),
|
|
cmd_name if cfg.names else docstring_head(getattr(cls,cmd_name)),
|
|
m2 )
|
|
|
|
msg_r(green(m)+'\n' if cfg.verbose else m)
|
|
skipping = False
|
|
|
|
for n,d in enumerate(data):
|
|
args,out,opts,mmtype = d + tuple([None] * (4-len(d)))
|
|
if 'fmt=xmrseed' in args and cfg.no_altcoin:
|
|
if not skipping:
|
|
qmsg('')
|
|
skip_msg = f'Skipping altcoin test {cmd_name} {args}'
|
|
qmsg(('' if n else '\n') + gray(skip_msg if len(skip_msg) <= 100 else skip_msg[:97] + '...'))
|
|
skipping = True
|
|
continue
|
|
skipping = False
|
|
stdin_input = None
|
|
if args and isinstance(args[0],bytes):
|
|
stdin_input = args[0]
|
|
args[0] = '-'
|
|
|
|
if cfg.tool_api:
|
|
if args and args[0 ]== '-':
|
|
continue
|
|
cmd_out = tool_api(cls,cmd_name,args,opts)
|
|
elif cfg.fork:
|
|
cmd_out = fork_cmd(cmd_name,args,opts,stdin_input)
|
|
else:
|
|
if stdin_input and sys.platform == 'win32':
|
|
msg(gray('Skipping for MSWin - no os.fork()'))
|
|
continue
|
|
method = getattr(cls(cfg,cmdname=cmd_name,proto=proto,mmtype=mmtype),cmd_name)
|
|
cmd_out = call_method(cls, method, cmd_name, args, mmtype, stdin_input)
|
|
|
|
try:
|
|
vmsg(f'Output:\n{cmd_out}\n')
|
|
except:
|
|
vmsg(f'Output:\n{cmd_out!r}\n')
|
|
|
|
if isinstance(out,tuple) and type(out[0]).__name__ == 'function':
|
|
func_out = out[0](cmd_out)
|
|
assert func_out == out[1],(
|
|
'{}({}) == {} failed!\nOutput: {}'.format(
|
|
out[0].__name__,
|
|
cmd_out,
|
|
out[1],
|
|
func_out ))
|
|
elif isinstance(out,(list,tuple)):
|
|
for co,o in zip(cmd_out.split(NL) if cfg.fork else cmd_out,out):
|
|
check_output(co,o)
|
|
else:
|
|
check_output(cmd_out,out)
|
|
|
|
if not cfg.verbose:
|
|
msg_r('.')
|
|
|
|
if not cfg.verbose:
|
|
msg('OK')
|
|
|
|
def docstring_head(obj):
|
|
return obj.__doc__.strip().split('\n')[0] if obj.__doc__ else None
|
|
|
|
def do_group(gid):
|
|
desc = f'command group {gid!r}'
|
|
cls = main_tool.get_mod_cls(gid.lower())
|
|
qmsg(blue('Testing ' +
|
|
desc if cfg.names else
|
|
( docstring_head(cls) or desc )
|
|
))
|
|
|
|
for cmdname in cls(cfg).user_commands:
|
|
if cmdname in skipped_tests:
|
|
continue
|
|
if cmdname not in tests[gid]:
|
|
m = f'No test for command {cmdname!r} in group {gid!r}!'
|
|
if cfg.die_on_missing:
|
|
die(1,m+' Aborting')
|
|
else:
|
|
msg(m)
|
|
continue
|
|
run_test(cls,gid,cmdname)
|
|
|
|
def do_cmd_in_group(cmdname):
|
|
cls = main_tool.get_cmd_cls(cmdname)
|
|
for gid,cmds in tests.items():
|
|
for cmd in cmds:
|
|
if cmd == cmdname:
|
|
run_test(cls,gid,cmdname)
|
|
return True
|
|
return False
|
|
|
|
def list_tested_cmds():
|
|
for gid in tests:
|
|
Msg('\n'.join(tests[gid]))
|
|
|
|
def main():
|
|
if cfg._args:
|
|
for cmd in cfg._args:
|
|
if cmd in tests:
|
|
do_group(cmd)
|
|
else:
|
|
if not do_cmd_in_group(cmd):
|
|
die(1,f'{cmd!r}: not a recognized test or test group')
|
|
else:
|
|
for garg in tests:
|
|
do_group(garg)
|
|
|
|
qmsg = cfg._util.qmsg
|
|
vmsg = cfg._util.vmsg
|
|
|
|
proto = cfg._proto
|
|
|
|
if cfg.tool_api:
|
|
del tests['Wallet']
|
|
del tests['File']
|
|
|
|
if cfg.list_tests:
|
|
Msg('Available tests:')
|
|
for modname,cmdlist in main_tool.mods.items():
|
|
cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
|
|
Msg(f' {modname:6} - {docstring_head(cls)}')
|
|
sys.exit(0)
|
|
|
|
if cfg.list_tested_cmds:
|
|
list_tested_cmds()
|
|
sys.exit(0)
|
|
|
|
tool_exec = os.path.relpath(os.path.join('cmds','mmgen-tool'))
|
|
|
|
if cfg.fork:
|
|
passthru_args = ['coin','type','testnet','token']
|
|
tool_cmd = [ tool_exec, '--skip-cfg-file' ] + [
|
|
'--{}{}'.format(
|
|
k.replace('_','-'),
|
|
'='+getattr(cfg,k) if getattr(cfg,k) is not True else '')
|
|
for k in passthru_args if getattr(cfg,k) ]
|
|
|
|
if cfg.coverage:
|
|
d,f = init_coverage()
|
|
tool_cmd_preargs = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f]
|
|
else:
|
|
tool_cmd_preargs = ['python3','scripts/exec_wrapper.py']
|
|
|
|
from mmgen.main import launch
|
|
start_time = int(time.time())
|
|
launch(func=main)
|
|
end_msg(int(time.time()) - start_time)
|