unit_test.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test.include.unit_test: Unit test framework for the MMGen suite
  20. """
  21. import sys, os, time, importlib, platform, asyncio
  22. from .test_init import repo_root
  23. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  24. # in production code:
  25. if not os.getenv('MMGEN_DEVTOOLS'):
  26. from mmgen.devinit import init_dev
  27. init_dev()
  28. from mmgen.cfg import Config, gc
  29. from mmgen.color import gray, brown, orange, yellow, red
  30. from mmgen.util import msg, msg_r, gmsg, ymsg, Msg
  31. from test.include.common import set_globals, end_msg
  32. def die(ev, s):
  33. msg((red if ev > 1 else yellow)(s))
  34. sys.exit(ev)
# Description, usage, option and notes text for this test runner.
# The dict is passed to Config(opts_data=...) below.
opts_data = {
    'text': {
        'desc': "Unit tests for the MMGen suite",
        'usage':'[options] [test | test.subtest]...',
        'options': """
-h, --help Print this help message
-a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils
-A, --no-daemon-autostart Don't start and stop daemons automatically
-D, --no-daemon-stop Don't stop auto-started daemons after running tests
-f, --fast Speed up execution by reducing rounds on some tests
-l, --list List available tests
-L, --list-subtests List available tests and subtests
-n, --names Print command names instead of descriptions
-q, --quiet Produce quieter output
-x, --exclude=T Exclude tests 'T' (comma-separated)
-v, --verbose Produce more verbose output
""",
        'notes': """
If no test is specified, all available tests are run
"""
    }
}
  57. if os.path.islink(Config.test_datadir):
  58. os.unlink(Config.test_datadir)
  59. sys.argv.insert(1, '--skip-cfg-file')
  60. cfg = Config(opts_data=opts_data)
  61. if cfg.no_altcoin_deps:
  62. ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
  63. type(cfg)._reset_ok += ('use_internal_keccak_module', 'debug_addrlist')
  64. set_globals(cfg)
  65. file_pfx = 'ut_'
  66. test_type = {
  67. 'modtest.py': 'unit',
  68. 'daemontest.py': 'daemon',
  69. }[gc.prog_name]
  70. test_subdir = gc.prog_name.removesuffix('.py') + '_d'
  71. test_dir = os.path.join(repo_root, 'test', test_subdir)
  72. all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(test_dir) if fn.startswith(file_pfx))
  73. exclude = cfg.exclude.split(',') if cfg.exclude else []
  74. for e in exclude:
  75. if e not in all_tests:
  76. die(1, f'{e!r}: invalid parameter for --exclude (no such test)')
  77. start_time = int(time.time())
  78. if cfg.list:
  79. Msg(' '.join(all_tests))
  80. sys.exit(0)
  81. if cfg.list_subtests:
  82. def gen():
  83. for test in all_tests:
  84. mod = importlib.import_module(f'test.{test_subdir}.{file_pfx}{test}')
  85. if hasattr(mod, 'unit_tests'):
  86. t = getattr(mod, 'unit_tests')
  87. subtests = [k for k, v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  88. yield fs.format(test, ' '.join(f'{subtest}' for subtest in subtests))
  89. else:
  90. yield test
  91. fs = '{:%s} {}' % max(len(t) for t in all_tests)
  92. Msg(fs.format('TEST', 'SUBTESTS') + '\n' + '\n'.join(gen()))
  93. sys.exit(0)
  94. class UnitTestHelpers:
  95. def __init__(self, subtest_name):
  96. self.subtest_name = subtest_name
  97. def skip_msg(self, desc):
  98. cfg._util.qmsg(gray(
  99. f'Skipping {test_type} subtest {self.subtest_name.replace("_", "-")!r} for {desc}'
  100. ))
  101. def process_bad_data(self, data, pfx='bad '):
  102. if os.getenv('PYTHONOPTIMIZE'):
  103. ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
  104. return
  105. import re
  106. desc_w = max(len(e[0]) for e in data)
  107. exc_w = max(len(e[1]) for e in data)
  108. m_exc = '{!r}: incorrect exception type (expected {!r})'
  109. m_err = '{!r}: incorrect error msg (should match {!r}'
  110. m_noraise = "\nillegal action '{}{}' failed to raise an exception (expected {!r})"
  111. for (desc, exc_chk, emsg_chk, func) in data:
  112. try:
  113. cfg._util.vmsg_r(' {}{:{w}}'.format(pfx, desc+':', w=desc_w+1))
  114. ret = func()
  115. if type(ret).__name__ == 'coroutine':
  116. asyncio.run(ret)
  117. except Exception as e:
  118. exc = type(e).__name__
  119. emsg = e.args[0]
  120. cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
  121. assert exc == exc_chk, m_exc.format(exc, exc_chk)
  122. assert re.search(emsg_chk, emsg), m_err.format(emsg, emsg_chk)
  123. else:
  124. die(4, m_noraise.format(pfx, desc, exc_chk))
  125. tests_seen = []
  126. def run_test(test, subtest=None):
  127. mod = importlib.import_module(f'test.{test_subdir}.{file_pfx}{test}')
  128. def run_subtest(t, subtest):
  129. subtest_disp = subtest.replace('_', '-')
  130. msg(brown(f'Running {test_type} subtest ') + orange(f'{test}.{subtest_disp}'))
  131. if getattr(t, 'silence_output', False):
  132. t._silence()
  133. if hasattr(t, '_pre_subtest'):
  134. getattr(t, '_pre_subtest')(test, subtest, UnitTestHelpers(subtest))
  135. try:
  136. func = getattr(t, subtest.replace('-', '_'))
  137. c = func.__code__
  138. do_desc = c.co_varnames[c.co_argcount-1] == 'desc'
  139. if do_desc:
  140. if cfg.verbose:
  141. msg(f'Testing {func.__defaults__[0]}')
  142. elif not cfg.quiet:
  143. msg_r(f'Testing {func.__defaults__[0]}...')
  144. ret = func(test, UnitTestHelpers(subtest))
  145. if type(ret).__name__ == 'coroutine':
  146. ret = asyncio.run(ret)
  147. if do_desc and not cfg.quiet:
  148. msg('OK\n' if cfg.verbose else 'OK')
  149. except:
  150. if getattr(t, 'silence_output', False):
  151. t._end_silence()
  152. raise
  153. if hasattr(t, '_post_subtest'):
  154. getattr(t, '_post_subtest')(test, subtest, UnitTestHelpers(subtest))
  155. if getattr(t, 'silence_output', False):
  156. t._end_silence()
  157. if not ret:
  158. die(4, f'Unit subtest {subtest_disp!r} failed')
  159. if test not in tests_seen:
  160. gmsg(f'Running {test_type} test {test}')
  161. tests_seen.append(test)
  162. if cfg.no_altcoin_deps and getattr(mod, 'altcoin_dep', None):
  163. cfg._util.qmsg(gray(f'Skipping {test_type} test {test!r} [--no-altcoin-deps]'))
  164. return
  165. if hasattr(mod, 'unit_tests'): # new class-based API
  166. t = getattr(mod, 'unit_tests')()
  167. altcoin_deps = getattr(t, 'altcoin_deps', ())
  168. win_skip = getattr(t, 'win_skip', ())
  169. mac_skip = getattr(t, 'mac_skip', ())
  170. arm_skip = getattr(t, 'arm_skip', ())
  171. subtests = (
  172. [subtest] if subtest else
  173. [k for k, v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  174. )
  175. if hasattr(t, '_pre'):
  176. t._pre()
  177. def subtest_skip_msg(name, add_msg):
  178. cfg._util.qmsg(gray(
  179. 'Skipping {} subtest {!r} {}'.format(test_type, name.replace('_', '-'), add_msg)
  180. ))
  181. for _subtest in subtests:
  182. if cfg.no_altcoin_deps and _subtest in altcoin_deps:
  183. subtest_skip_msg(_subtest, '[--no-altcoin-deps]')
  184. continue
  185. if sys.platform == 'win32' and _subtest in win_skip:
  186. subtest_skip_msg(_subtest, 'for Windows platform')
  187. continue
  188. if sys.platform == 'darwin' and _subtest in mac_skip:
  189. subtest_skip_msg(_subtest, 'for macOS platform')
  190. continue
  191. if platform.machine() == 'aarch64' and _subtest in arm_skip:
  192. subtest_skip_msg(_subtest, 'for ARM platform')
  193. continue
  194. run_subtest(t, _subtest)
  195. if hasattr(t, '_post'):
  196. t._post()
  197. else:
  198. assert not subtest, f'{subtest!r}: subtests not supported for this {test_type} test'
  199. if not mod.unit_test().run_test(test, UnitTestHelpers(test)):
  200. die(4, 'Unit test {test!r} failed')
  201. def main():
  202. for test in (cfg._args or all_tests):
  203. if '.' in test:
  204. test, subtest = test.split('.')
  205. else:
  206. subtest = None
  207. if test not in all_tests:
  208. die(1, f'{test!r}: test not recognized')
  209. if test not in exclude:
  210. run_test(test, subtest=subtest)
  211. end_msg(int(time.time()) - start_time)
# Script entry point: pass main() to mmgen.main.launch() for execution
from mmgen.main import launch
launch(func=main)