tooltest2.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/tooltest2.py: Test the 'mmgen-tool' utility
  20. """
  21. # TODO: move all non-interactive 'mmgen-tool' tests in 'cmdtest.py' here
  22. # TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
  23. import sys,os,time,importlib
  24. from subprocess import run,PIPE
  25. try:
  26. from include import test_init
  27. except ImportError:
  28. from test.include import test_init
  29. from test.include.common import set_globals,end_msg,init_coverage
  30. from mmgen import main_tool
  31. from mmgen.cfg import Config
  32. from mmgen.color import green,blue,purple,cyan,gray
  33. from mmgen.util import msg,msg_r,Msg,async_run,die
# Commands that do_group() passes over without running or reporting as missing
skipped_tests = ['mn2hex_interactive']

# Groups whose test data is keyed by '<coin>_<network>'; see run_test()
coin_dependent_groups = ('Coin','File')

# Help/usage text handed to Config() below.
# NOTE(review): column alignment inside the 'options' text could not be
# recovered from the flattened listing - confirm against the upstream file.
opts_data = {
    'text': {
        'desc': "Simple test suite for the 'mmgen-tool' utility",
        'usage':'[options] [command]...',
        'options': """
-h, --help Print this help message
-a, --no-altcoin Skip altcoin tests
-A, --tool-api Test the tool_api subsystem
-C, --coverage Produce code coverage info using trace module
-d, --die-on-missing Abort if no test data found for given command
--, --longhelp Print help message for long options (common options)
-l, --list-tests List the test groups in this test suite
-L, --list-tested-cmds Output the 'mmgen-tool' commands that are tested by this test suite
-n, --names Print command names instead of descriptions
-q, --quiet Produce quieter output
-t, --type= Specify coin type
-f, --fork Run commands via tool executable instead of importing tool module
-v, --verbose Produce more verbose output
""",
        'notes': """
If no command is given, the whole suite of tests is run.
"""
    }
}

# Insert '--skip-cfg-file' ahead of the user's arguments before Config() parses sys.argv
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]

# Build the configuration object with fixed values for these three options
cfg = Config(
    opts_data = opts_data,
    init_opts = {
        'usr_randchars': 0,
        'hash_preset': '1',
        'passwd_file': 'test/ref/keyaddrfile_password',
    })

set_globals(cfg)

# Imported only after cfg exists and set_globals() has run.
# NOTE(review): presumably supplies 'tests' and 'NL', which are used below but
# not defined in this file - confirm.
from test.tooltest2_d.data import *
  70. def fork_cmd(cmd_name,args,opts,stdin_input):
  71. cmd = (
  72. tool_cmd_preargs +
  73. tool_cmd +
  74. (opts or []) +
  75. [cmd_name] + args
  76. )
  77. vmsg('{} {}'.format(
  78. green('Executing'),
  79. cyan(' '.join(cmd)) ))
  80. cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
  81. try:
  82. cmd_out = cp.stdout.decode()
  83. except:
  84. cmd_out = cp.stdout
  85. if cp.stderr:
  86. vmsg(cp.stderr.strip().decode())
  87. if cp.returncode != 0:
  88. import re
  89. m = re.search(b'tool command returned (None|False)',cp.stdout)
  90. if m:
  91. return { b'None': None, b'False': False }[m.group(1)]
  92. else:
  93. die(2,f'Spawned program exited with error: {cp.stderr}')
  94. return cmd_out.strip()
  95. async def call_method(cls,method,cmd_name,args,mmtype,stdin_input):
  96. vmsg('{a}: {b}{c}'.format(
  97. a = purple('Running'),
  98. b = ' '.join([cmd_name]+[repr(e) for e in args]),
  99. c = ' '+mmtype if mmtype else '' ))
  100. aargs,kwargs = main_tool.process_args(cmd_name,args,cls)
  101. oq_save = bool(cfg.quiet)
  102. if not cfg.verbose:
  103. cfg.quiet = True
  104. if stdin_input:
  105. fd0,fd1 = os.pipe()
  106. if os.fork(): # parent
  107. os.close(fd1)
  108. stdin_save = os.dup(0)
  109. os.dup2(fd0,0)
  110. cmd_out = method(*aargs,**kwargs)
  111. os.dup2(stdin_save,0)
  112. os.wait()
  113. cfg.quiet = oq_save
  114. return cmd_out
  115. else: # child
  116. os.close(fd0)
  117. os.write(fd1,stdin_input)
  118. vmsg(f'Input: {stdin_input!r}')
  119. sys.exit(0)
  120. else:
  121. ret = method(*aargs,**kwargs)
  122. if type(ret).__name__ == 'coroutine':
  123. ret = await ret
  124. cfg.quiet = oq_save
  125. return ret
  126. def tool_api(cls,cmd_name,args,opts):
  127. from mmgen.tool.api import tool_api
  128. tool = tool_api(cfg)
  129. if opts:
  130. for o in opts:
  131. if o.startswith('--type='):
  132. tool.addrtype = o.split('=')[1]
  133. pargs,kwargs = main_tool.process_args(cmd_name,args,cls)
  134. return getattr(tool,cmd_name)(*pargs,**kwargs)
  135. def check_output(out,chk):
  136. if isinstance(chk,str):
  137. chk = chk.encode()
  138. if isinstance(out,int):
  139. out = str(out).encode()
  140. if isinstance(out,str):
  141. out = out.encode()
  142. err_fs = "Output ({!r}) doesn't match expected output ({!r})"
  143. try:
  144. outd = out.decode()
  145. except:
  146. outd = None
  147. if type(chk).__name__ == 'function':
  148. assert chk(outd), f'{chk.__name__}({outd}) failed!'
  149. elif isinstance(chk,dict):
  150. for k,v in chk.items():
  151. if k == 'boolfunc':
  152. assert v(outd), f'{v.__name__}({outd}) failed!'
  153. elif k == 'value':
  154. assert outd == v, err_fs.format(outd,v)
  155. else:
  156. outval = getattr(__builtins__,k)(out)
  157. if outval != v:
  158. die(1,f'{k}({out}) returned {outval}, not {v}!')
  159. elif chk is not None:
  160. assert out == chk, err_fs.format(out,chk)
async def run_test(cls,gid,cmd_name):
    """
    Run every test vector defined for one tool command.

    cls:      tool command class that provides 'cmd_name'
    gid:      name of the test group, a key of 'tests'
    cmd_name: name of the command, a key of tests[gid]

    Each test vector is a tuple of up to four items - args, expected output,
    options, mmtype - with missing trailing items taken as None.

    The command is run via tool_api(), fork_cmd() or call_method() depending
    on cfg.tool_api and cfg.fork, and its output is checked against the
    expected output: a (function, value) tuple, a list/tuple of per-item
    expectations, or a single expectation for check_output().

    NOTE(review): block nesting was reconstructed from a flattened listing -
    confirm against the upstream file.
    """
    data = tests[gid][cmd_name]
    # behavior is like cmdtest.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
    if gid in coin_dependent_groups:
        # Data for these groups is keyed by '<coin or token>_<mainnet|testnet>'
        k = '{}_{}'.format(
            ( cfg.token.lower() if proto.tokensym else proto.coin.lower() ),
            ('mainnet','testnet')[proto.testnet] )
        if k in data:
            data = data[k]
            m2 = f' ({k})'
        else:
            qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
            return
    else:
        # All other groups run only for BTC mainnet, and are skipped silently otherwise
        if proto.coin != 'BTC' or proto.testnet:
            return
        m2 = ''
    m = '{} {}{}'.format(
        purple('Testing'),
        cmd_name if cfg.names else docstring_head(getattr(cls,cmd_name)),
        m2 )
    msg_r(green(m)+'\n' if cfg.verbose else m)
    # True while the preceding vector was skipped as an altcoin test
    skipping = False
    for n,d in enumerate(data):
        # Pad the vector with None so that it always unpacks into four names
        args,out,opts,mmtype = d + tuple([None] * (4-len(d)))
        if 'fmt=xmrseed' in args and cfg.no_altcoin:
            if not skipping:
                qmsg('')
            qmsg(('' if n else '\n') + gray(f'Skipping altcoin test {cmd_name} {args}'))
            skipping = True
            continue
        else:
            skipping = False
        stdin_input = None
        # A bytes first argument is data for the command's stdin; it is replaced
        # in place by '-' (note that this modifies the list held in the test data)
        if args and isinstance(args[0],bytes):
            stdin_input = args[0]
            args[0] = '-'
        if cfg.tool_api:
            # Vectors whose first argument is '-' are not run in tool API mode
            if args and args[0 ]== '-':
                continue
            cmd_out = tool_api(cls,cmd_name,args,opts)
        elif cfg.fork:
            cmd_out = fork_cmd(cmd_name,args,opts,stdin_input)
        else:
            if stdin_input and sys.platform == 'win32':
                msg('Skipping for MSWin - no os.fork()')
                continue
            method = getattr(cls(cfg,cmdname=cmd_name,proto=proto,mmtype=mmtype),cmd_name)
            cmd_out = await call_method(cls,method,cmd_name,args,mmtype,stdin_input)
        # Fall back to repr() if formatting the output raises
        try:
            vmsg(f'Output:\n{cmd_out}\n')
        except:
            vmsg(f'Output:\n{cmd_out!r}\n')
        if isinstance(out,tuple) and type(out[0]).__name__ == 'function':
            # (function, value): the function applied to the output must equal the value
            func_out = out[0](cmd_out)
            assert func_out == out[1],(
                '{}({}) == {} failed!\nOutput: {}'.format(
                    out[0].__name__,
                    cmd_out,
                    out[1],
                    func_out ))
        elif isinstance(out,(list,tuple)):
            # One expectation per output item; forked output is split on NL first
            for co,o in zip(cmd_out.split(NL) if cfg.fork else cmd_out,out):
                check_output(co,o)
        else:
            check_output(cmd_out,out)
        # Progress indicator: one dot per completed vector
        if not cfg.verbose:
            msg_r('.')
    if not cfg.verbose:
        msg('OK')
  231. def docstring_head(obj):
  232. return obj.__doc__.strip().split('\n')[0] if obj.__doc__ else None
  233. async def do_group(gid):
  234. desc = f'command group {gid!r}'
  235. cls = main_tool.get_mod_cls(gid.lower())
  236. qmsg(blue('Testing ' +
  237. desc if cfg.names else
  238. ( docstring_head(cls) or desc )
  239. ))
  240. for cmdname in cls(cfg).user_commands:
  241. if cmdname in skipped_tests:
  242. continue
  243. if cmdname not in tests[gid]:
  244. m = f'No test for command {cmdname!r} in group {gid!r}!'
  245. if cfg.die_on_missing:
  246. die(1,m+' Aborting')
  247. else:
  248. msg(m)
  249. continue
  250. await run_test(cls,gid,cmdname)
  251. async def do_cmd_in_group(cmdname):
  252. cls = main_tool.get_cmd_cls(cmdname)
  253. for gid,cmds in tests.items():
  254. for cmd in cmds:
  255. if cmd == cmdname:
  256. await run_test(cls,gid,cmdname)
  257. return True
  258. return False
  259. def list_tested_cmds():
  260. for gid in tests:
  261. Msg('\n'.join(tests[gid]))
  262. async def main():
  263. if cfg._args:
  264. for cmd in cfg._args:
  265. if cmd in tests:
  266. await do_group(cmd)
  267. else:
  268. if not await do_cmd_in_group(cmd):
  269. die(1,f'{cmd!r}: not a recognized test or test group')
  270. else:
  271. for garg in tests:
  272. await do_group(garg)
# Module-level shortcuts used by the functions above
qmsg = cfg._util.qmsg
vmsg = cfg._util.vmsg
proto = cfg._proto

# The 'Wallet' and 'File' groups are removed from the suite in tool API mode
if cfg.tool_api:
    del tests['Wallet']
    del tests['File']

# --list-tests: print each tool module with the first line of its command
# class's docstring, then exit
if cfg.list_tests:
    Msg('Available tests:')
    for modname,cmdlist in main_tool.mods.items():
        cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
        Msg(f' {modname:6} - {docstring_head(cls)}')
    sys.exit(0)

# --list-tested-cmds: print the tested command names, then exit
if cfg.list_tested_cmds:
    list_tested_cmds()
    sys.exit(0)

tool_exec = os.path.relpath(os.path.join('cmds','mmgen-tool'))

# Command line pieces consumed by fork_cmd()
if cfg.fork:
    # Options of this script that are forwarded to the spawned tool when set;
    # an option whose value is True is passed as a bare flag, any other as
    # '--name=value'
    passthru_args = ['coin','type','testnet','token']
    tool_cmd = [ tool_exec, '--skip-cfg-file' ] + [
        '--{}{}'.format(
            k.replace('_','-'),
            '='+getattr(cfg,k) if getattr(cfg,k) is not True else '')
        for k in passthru_args if getattr(cfg,k) ]

    # NOTE(review): nesting of this branch under 'if cfg.fork' is assumed;
    # the flattened listing does not show it - confirm against upstream
    if cfg.coverage:
        d,f = init_coverage()
        tool_cmd_preargs = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f]
    else:
        tool_cmd_preargs = ['python3','scripts/exec_wrapper.py']

from mmgen.main import launch

# Run the suite and report the elapsed time in whole seconds
start_time = int(time.time())
launch(func = lambda: async_run(main()))
end_msg(int(time.time()) - start_time)