tooltest2.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/tooltest2.py: Test the 'mmgen-tool' utility
  20. """
  21. # TODO: move all non-interactive 'mmgen-tool' tests in 'cmdtest.py' here
  22. # TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
  23. import sys, os, time, importlib, asyncio
  24. from subprocess import run, PIPE
  25. try:
  26. from include import test_init
  27. except ImportError:
  28. from test.include import test_init
  29. from test.include.common import set_globals, end_msg, init_coverage
  30. from mmgen import main_tool
  31. from mmgen.cfg import Config
  32. from mmgen.color import green, blue, purple, cyan, gray
  33. from mmgen.util import msg, msg_r, Msg, die, isAsync
  34. skipped_tests = ['mn2hex_interactive']
  35. coin_dependent_groups = ('Coin', 'File')
  36. opts_data = {
  37. 'text': {
  38. 'desc': "Simple test suite for the 'mmgen-tool' utility",
  39. 'usage':'[options] [command]...',
  40. 'options': """
  41. -h, --help Print this help message
  42. -a, --no-altcoin Skip altcoin tests
  43. -A, --tool-api Test the tool_api subsystem
  44. -C, --coverage Produce code coverage info using trace module
  45. -d, --die-on-missing Abort if no test data found for given command
  46. --, --longhelp Print help message for long (global) options
  47. -l, --list-tests List the test groups in this test suite
  48. -L, --list-tested-cmds Output the 'mmgen-tool' commands that are tested by this test suite
  49. -n, --names Print command names instead of descriptions
  50. -q, --quiet Produce quieter output
  51. -t, --type= Specify coin type
  52. -f, --fork Run commands via tool executable instead of importing tool module
  53. -v, --verbose Produce more verbose output
  54. """,
  55. 'notes': """
  56. If no command is given, the whole suite of tests is run.
  57. """
  58. }
  59. }
  60. sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
  61. cfg = Config(
  62. opts_data = opts_data,
  63. init_opts = {
  64. 'usr_randchars': 0,
  65. 'hash_preset': '1',
  66. 'passwd_file': 'test/ref/keyaddrfile_password',
  67. })
  68. set_globals(cfg)
  69. from test.tooltest2_d.data import *
  70. def fork_cmd(cmd_name, args, opts, stdin_input):
  71. cmd = (
  72. tool_cmd_preargs +
  73. tool_cmd +
  74. (opts or []) +
  75. [cmd_name] + args
  76. )
  77. vmsg('{} {}'.format(
  78. green('Executing'),
  79. cyan(' '.join(cmd))))
  80. cp = run(cmd, input=stdin_input or None, stdout=PIPE, stderr=PIPE)
  81. try:
  82. cmd_out = cp.stdout.decode()
  83. except:
  84. cmd_out = cp.stdout
  85. if cp.stderr:
  86. vmsg(cp.stderr.strip().decode())
  87. if cp.returncode != 0:
  88. import re
  89. m = re.search(b'tool command returned (None|False)', cp.stderr)
  90. if m:
  91. return eval(m.group(1))
  92. else:
  93. die(2, f'Spawned program exited with error: {cp.stderr}')
  94. return cmd_out.strip()
  95. def call_method(cls, method, cmd_name, args, mmtype, stdin_input):
  96. vmsg('{a}: {b}{c}'.format(
  97. a = purple('Running'),
  98. b = ' '.join([cmd_name]+[repr(e) for e in args]),
  99. c = ' '+mmtype if mmtype else ''))
  100. aargs, kwargs = main_tool.process_args(cmd_name, args, cls)
  101. oq_save = bool(cfg.quiet)
  102. if not cfg.verbose:
  103. cfg._set_quiet(True)
  104. if stdin_input:
  105. fd0, fd1 = os.pipe()
  106. if os.fork(): # parent
  107. os.close(fd1)
  108. stdin_save = os.dup(0)
  109. os.dup2(fd0, 0)
  110. cmd_out = method(*aargs, **kwargs)
  111. os.dup2(stdin_save, 0)
  112. os.wait()
  113. cfg._set_quiet(oq_save)
  114. return cmd_out
  115. else: # child
  116. os.close(fd0)
  117. os.write(fd1, stdin_input)
  118. vmsg(f'Input: {stdin_input!r}')
  119. sys.exit(0)
  120. else:
  121. ret = asyncio.run(method(*aargs, **kwargs)) if isAsync(method) else method(*aargs, **kwargs)
  122. cfg._set_quiet(oq_save)
  123. return ret
  124. def tool_api(cls, cmd_name, args, opts):
  125. from mmgen.tool.api import tool_api
  126. tool = tool_api(cfg)
  127. if opts:
  128. for o in opts:
  129. if o.startswith('--type='):
  130. tool.addrtype = o.split('=')[1]
  131. pargs, kwargs = main_tool.process_args(cmd_name, args, cls)
  132. return getattr(tool, cmd_name)(*pargs, **kwargs)
  133. def check_output(out, chk):
  134. match chk:
  135. case str():
  136. chk = chk.encode()
  137. match out:
  138. case int():
  139. out = str(out).encode()
  140. case str():
  141. out = out.encode()
  142. try:
  143. outd = out.decode()
  144. except:
  145. outd = None
  146. err_fs = "Output ({!r}) doesn't match expected output ({!r})"
  147. match type(chk).__name__:
  148. case 'NoneType':
  149. pass
  150. case 'function':
  151. assert chk(outd), f'{chk.__name__}({outd}) failed!'
  152. case 'dict':
  153. for k, v in chk.items():
  154. match k:
  155. case 'boolfunc':
  156. assert v(outd), f'{v.__name__}({outd}) failed!'
  157. case 'value':
  158. assert outd == v, err_fs.format(outd, v)
  159. case _:
  160. if (outval := getattr(__builtins__, k)(out)) != v:
  161. die(1, f'{k}({out}) returned {outval}, not {v}!')
  162. case _:
  163. assert out == chk, err_fs.format(out, chk)
  164. def run_test(cls, gid, cmd_name):
  165. data = tests[gid][cmd_name]
  166. # behavior is like cmdtest.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
  167. if gid in coin_dependent_groups:
  168. k = '{}_{}'.format(
  169. (cfg.token.lower() if proto.tokensym else proto.coin.lower()),
  170. proto.network)
  171. if k in data:
  172. data = data[k]
  173. m2 = f' ({k})'
  174. else:
  175. qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
  176. return
  177. else:
  178. if proto.coin != 'BTC' or proto.testnet:
  179. return
  180. m2 = ''
  181. m = '{} {}{}'.format(
  182. purple('Testing'),
  183. cmd_name if cfg.names else docstring_head(getattr(cls, cmd_name)),
  184. m2)
  185. msg_r(green(m)+'\n' if cfg.verbose else m)
  186. skipping = False
  187. for n, d in enumerate(data):
  188. args, out, opts, mmtype = d + tuple([None] * (4-len(d)))
  189. if 'fmt=xmrseed' in args and cfg.no_altcoin:
  190. if not skipping:
  191. qmsg('')
  192. skip_msg = f'Skipping altcoin test {cmd_name} {args}'
  193. qmsg(('' if n else '\n') + gray(skip_msg if len(skip_msg) <= 100 else skip_msg[:97] + '...'))
  194. skipping = True
  195. continue
  196. skipping = False
  197. stdin_input = None
  198. if args and isinstance(args[0], bytes):
  199. stdin_input = args[0]
  200. args[0] = '-'
  201. if cfg.tool_api:
  202. if args and args[0]== '-':
  203. continue
  204. cmd_out = tool_api(cls, cmd_name, args, opts)
  205. elif cfg.fork:
  206. cmd_out = fork_cmd(cmd_name, args, opts, stdin_input)
  207. else:
  208. if stdin_input and sys.platform == 'win32':
  209. msg(gray('Skipping for MSWin - no os.fork()'))
  210. continue
  211. method = getattr(cls(cfg, cmdname=cmd_name, proto=proto, mmtype=mmtype), cmd_name)
  212. cmd_out = call_method(cls, method, cmd_name, args, mmtype, stdin_input)
  213. try:
  214. vmsg(f'Output:\n{cmd_out}\n')
  215. except:
  216. vmsg(f'Output:\n{cmd_out!r}\n')
  217. if isinstance(out, tuple) and type(out[0]).__name__ == 'function':
  218. func_out = out[0](cmd_out)
  219. assert func_out == out[1], (
  220. '{}({}) == {} failed!\nOutput: {}'.format(
  221. out[0].__name__,
  222. cmd_out,
  223. out[1],
  224. func_out))
  225. elif isinstance(out, list | tuple):
  226. for co, o in zip(cmd_out.split(NL) if cfg.fork else cmd_out, out):
  227. check_output(co, o)
  228. else:
  229. check_output(cmd_out, out)
  230. if not cfg.verbose:
  231. msg_r('.')
  232. if not cfg.verbose:
  233. msg('OK')
  234. def docstring_head(obj):
  235. return obj.__doc__.strip().split('\n')[0] if obj.__doc__ else None
  236. def do_group(gid):
  237. desc = f'command group {gid!r}'
  238. cls = main_tool.get_mod_cls(gid.lower())
  239. qmsg(blue('Testing ' +
  240. desc if cfg.names else
  241. (docstring_head(cls) or desc)
  242. ))
  243. for cmdname in cls(cfg).user_commands:
  244. if cmdname in skipped_tests:
  245. continue
  246. if cmdname not in tests[gid]:
  247. m = f'No test for command {cmdname!r} in group {gid!r}!'
  248. if cfg.die_on_missing:
  249. die(1, m+' Aborting')
  250. else:
  251. msg(m)
  252. continue
  253. run_test(cls, gid, cmdname)
  254. def do_cmd_in_group(cmdname):
  255. cls = main_tool.get_cmd_cls(cmdname)
  256. for gid, cmds in tests.items():
  257. for cmd in cmds:
  258. if cmd == cmdname:
  259. run_test(cls, gid, cmdname)
  260. return True
  261. return False
  262. def list_tested_cmds():
  263. for gid in tests:
  264. Msg('\n'.join(tests[gid]))
  265. def main():
  266. if cfg._args:
  267. for cmd in cfg._args:
  268. if cmd in tests:
  269. do_group(cmd)
  270. else:
  271. if not do_cmd_in_group(cmd):
  272. die(1, f'{cmd!r}: not a recognized test or test group')
  273. else:
  274. for garg in tests:
  275. do_group(garg)
  276. qmsg = cfg._util.qmsg
  277. vmsg = cfg._util.vmsg
  278. proto = cfg._proto
  279. if cfg.tool_api:
  280. del tests['Wallet']
  281. del tests['File']
  282. if cfg.list_tests:
  283. Msg('Available tests:')
  284. for modname, cmdlist in main_tool.mods.items():
  285. cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd')
  286. Msg(f' {modname:6} - {docstring_head(cls)}')
  287. sys.exit(0)
  288. if cfg.list_tested_cmds:
  289. list_tested_cmds()
  290. sys.exit(0)
  291. tool_exec = os.path.relpath(os.path.join('cmds', 'mmgen-tool'))
  292. if cfg.fork:
  293. passthru_args = ['coin', 'type', 'testnet', 'token']
  294. tool_cmd = [tool_exec, '--skip-cfg-file'] + [
  295. '--{}{}'.format(
  296. k.replace('_', '-'),
  297. '='+getattr(cfg, k) if getattr(cfg, k) is not True else '')
  298. for k in passthru_args if getattr(cfg, k)]
  299. if cfg.coverage:
  300. d, f = init_coverage()
  301. tool_cmd_preargs = ['python3', '-m', 'trace', '--count', '--coverdir='+d, '--file='+f]
  302. else:
  303. tool_cmd_preargs = ['python3', 'scripts/exec_wrapper.py']
  304. from mmgen.main import launch
  305. start_time = int(time.time())
  306. launch(func=main)
  307. end_msg(int(time.time()) - start_time)