unit_test.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test.include.unit_test: Unit test framework for the MMGen suite
  20. """
  21. import sys, os, time, importlib, platform, asyncio
  22. from .test_init import repo_root
  23. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  24. # in production code:
  25. if not os.getenv('MMGEN_DEVTOOLS'):
  26. from mmgen.devinit import init_dev
  27. init_dev()
  28. from mmgen.cfg import Config, gc, gv
  29. from mmgen.color import gray, brown, orange, yellow, red
  30. from mmgen.util import msg, msg_r, gmsg, ymsg, Msg, isAsync
  31. from test.include.common import set_globals, end_msg
  32. def die(ev, s):
  33. msg((red if ev > 1 else yellow)(s))
  34. sys.exit(ev)
  35. opts_data = {
  36. 'text': {
  37. 'desc': "Unit tests for the MMGen suite",
  38. 'usage':'[options] [test | test.subtest]...',
  39. 'options': """
  40. -h, --help Print this help message
  41. -a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils
  42. -A, --no-daemon-autostart Don't start and stop daemons automatically
  43. -D, --no-daemon-stop Don't stop auto-started daemons after running tests
  44. -f, --fast Speed up execution by reducing rounds on some tests
  45. -l, --list List available tests
  46. -L, --list-subtests List available tests and subtests
  47. -n, --names Print command names instead of descriptions
  48. -q, --quiet Produce quieter output
  49. -x, --exclude=T Exclude tests 'T' (comma-separated)
  50. -v, --verbose Produce more verbose output
  51. """,
  52. 'notes': """
  53. If no test is specified, all available tests are run
  54. """
  55. }
  56. }
  57. if os.path.islink(Config.test_datadir):
  58. os.unlink(Config.test_datadir)
  59. sys.argv.insert(1, '--skip-cfg-file')
  60. cfg = Config(opts_data=opts_data)
  61. type(cfg)._reset_ok += ('use_internal_keccak_module', 'debug_addrlist')
  62. set_globals(cfg)
  63. test_type = {
  64. 'modtest.py': 'unit',
  65. 'daemontest.py': 'daemon',
  66. }[gc.prog_name]
  67. test_subdir = gc.prog_name.removesuffix('.py') + '_d'
  68. test_dir = os.path.join(repo_root, 'test', test_subdir)
  69. all_tests = sorted(fn.removesuffix('.py') for fn in os.listdir(test_dir) if not fn.startswith('_'))
  70. exclude = cfg.exclude.split(',') if cfg.exclude else []
  71. if cfg.no_altcoin_deps:
  72. ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
  73. altcoin_tests = importlib.import_module(f'test.{test_subdir}').altcoin_tests
  74. for e in exclude:
  75. if e not in all_tests:
  76. die(1, f'{e!r}: invalid parameter for --exclude (no such test)')
  77. start_time = int(time.time())
  78. if cfg.list:
  79. Msg(' '.join(all_tests))
  80. sys.exit(0)
  81. if cfg.list_subtests:
  82. def gen():
  83. for test in all_tests:
  84. mod = importlib.import_module(f'test.{test_subdir}.{test}')
  85. if hasattr(mod, 'unit_tests'):
  86. t = getattr(mod, 'unit_tests')
  87. subtests = [k for k, v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  88. yield fs.format(test, ' '.join(f'{subtest}' for subtest in subtests))
  89. else:
  90. yield test
  91. fs = '{:%s} {}' % max(len(t) for t in all_tests)
  92. Msg(fs.format('TEST', 'SUBTESTS') + '\n' + '\n'.join(gen()))
  93. sys.exit(0)
  94. def silence():
  95. if not cfg.verbose:
  96. global stdout_save, stderr_save
  97. stdout_save = sys.stdout
  98. stderr_save = sys.stderr
  99. sys.stdout = sys.stderr = gv.stdout = gv.stderr = open(os.devnull, 'w')
  100. def end_silence():
  101. if not cfg.verbose:
  102. global stdout_save, stderr_save
  103. sys.stdout = gv.stdout = stdout_save
  104. sys.stderr = gv.stderr = stderr_save
  105. class UnitTestHelpers:
  106. def __init__(self, subtest_name):
  107. self.subtest_name = subtest_name
  108. def skip_msg(self, desc):
  109. cfg._util.qmsg(gray(
  110. f'Skipping {test_type} subtest {self.subtest_name.replace("_", "-")!r} for {desc}'
  111. ))
  112. def process_bad_data(self, data, pfx='bad '):
  113. if os.getenv('PYTHONOPTIMIZE'):
  114. ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
  115. return
  116. import re
  117. desc_w = max(len(e[0]) for e in data)
  118. exc_w = max(len(e[1]) for e in data)
  119. m_exc = '{!r}: incorrect exception type (expected {!r})'
  120. m_err = '{!r}: incorrect error msg (should match {!r}'
  121. m_noraise = "\nillegal action '{}{}' failed to raise an exception (expected {!r})"
  122. for (desc, exc_chk, emsg_chk, func) in data:
  123. try:
  124. cfg._util.vmsg_r(' {}{:{w}}'.format(pfx, desc+':', w=desc_w+1))
  125. asyncio.run(func()) if isAsync(func) else func()
  126. except Exception as e:
  127. exc = type(e).__name__
  128. emsg = e.args[0] if e.args else '(unspecified error)'
  129. cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
  130. assert exc == exc_chk, m_exc.format(exc, exc_chk)
  131. assert re.search(emsg_chk, emsg), m_err.format(emsg, emsg_chk)
  132. else:
  133. die(4, m_noraise.format(pfx, desc, exc_chk))
  134. tests_seen = []
  135. def run_test(test, subtest=None):
  136. def run_subtest(t, subtest):
  137. subtest_disp = subtest.replace('_', '-')
  138. msg(brown(f'Running {test_type} subtest ') + orange(f'{test}.{subtest_disp}'))
  139. if getattr(t, 'silence_output', False):
  140. silence()
  141. if hasattr(t, '_pre_subtest'):
  142. getattr(t, '_pre_subtest')(test, subtest, UnitTestHelpers(subtest))
  143. try:
  144. func = getattr(t, subtest.replace('-', '_'))
  145. c = func.__code__
  146. do_desc = c.co_varnames[c.co_argcount-1] == 'desc'
  147. if do_desc:
  148. if cfg.verbose:
  149. msg(f'Testing {func.__defaults__[0]}')
  150. elif not cfg.quiet:
  151. msg_r(f'Testing {func.__defaults__[0]}...')
  152. if isAsync(func):
  153. ret = asyncio.run(func(test, UnitTestHelpers(subtest)))
  154. else:
  155. ret = func(test, UnitTestHelpers(subtest))
  156. if do_desc and not cfg.quiet:
  157. msg('OK\n' if cfg.verbose else 'OK')
  158. except:
  159. if getattr(t, 'silence_output', False):
  160. end_silence()
  161. raise
  162. if hasattr(t, '_post_subtest'):
  163. getattr(t, '_post_subtest')(test, subtest, UnitTestHelpers(subtest))
  164. if getattr(t, 'silence_output', False):
  165. end_silence()
  166. if not ret:
  167. die(4, f'Unit subtest {subtest_disp!r} failed')
  168. if test not in tests_seen:
  169. gmsg(f'Running {test_type} test {test}')
  170. tests_seen.append(test)
  171. if cfg.no_altcoin_deps and test in altcoin_tests:
  172. msg(gray(f'Skipping {test_type} test {test!r} [--no-altcoin-deps]'))
  173. return
  174. mod = importlib.import_module(f'test.{test_subdir}.{test}')
  175. if hasattr(mod, 'unit_tests'): # new class-based API
  176. t = getattr(mod, 'unit_tests')()
  177. altcoin_deps = getattr(t, 'altcoin_deps', ())
  178. win_skip = getattr(t, 'win_skip', ())
  179. mac_skip = getattr(t, 'mac_skip', ())
  180. arm_skip = getattr(t, 'arm_skip', ())
  181. riscv_skip = getattr(t, 'riscv_skip', ())
  182. fast_skip = getattr(t, 'fast_skip', ())
  183. subtests = (
  184. [subtest] if subtest else
  185. [k for k, v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  186. )
  187. if hasattr(t, '_pre'):
  188. t._pre()
  189. def subtest_skip_msg(name, add_msg):
  190. cfg._util.qmsg(gray(
  191. 'Skipping {} subtest {!r} {}'.format(test_type, name.replace('_', '-'), add_msg)
  192. ))
  193. for _subtest in subtests:
  194. if cfg.no_altcoin_deps and _subtest in altcoin_deps:
  195. subtest_skip_msg(_subtest, '[--no-altcoin-deps]')
  196. continue
  197. if cfg.fast and _subtest in fast_skip:
  198. subtest_skip_msg(_subtest, '[--fast]')
  199. continue
  200. if sys.platform == 'win32' and _subtest in win_skip:
  201. subtest_skip_msg(_subtest, 'for Windows platform')
  202. continue
  203. if sys.platform == 'darwin' and _subtest in mac_skip:
  204. subtest_skip_msg(_subtest, 'for macOS platform')
  205. continue
  206. if platform.machine() == 'aarch64' and _subtest in arm_skip:
  207. subtest_skip_msg(_subtest, 'for ARM platform')
  208. continue
  209. if platform.machine() == 'riscv64' and _subtest in riscv_skip:
  210. subtest_skip_msg(_subtest, 'for RISC-V platform')
  211. continue
  212. run_subtest(t, _subtest)
  213. if hasattr(t, '_post'):
  214. t._post()
  215. else:
  216. assert not subtest, f'{subtest!r}: subtests not supported for this {test_type} test'
  217. if not mod.unit_test().run_test(test, UnitTestHelpers(test)):
  218. die(4, 'Unit test {test!r} failed')
  219. def main():
  220. for test in (cfg._args or all_tests):
  221. if '.' in test:
  222. test, subtest = test.split('.')
  223. else:
  224. subtest = None
  225. if test not in all_tests:
  226. die(1, f'{test!r}: test not recognized')
  227. if test not in exclude:
  228. run_test(test, subtest=subtest)
  229. end_msg(int(time.time()) - start_time)
  230. from mmgen.main import launch
  231. launch(func=main)