unit_test.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test.include.unit_test: Unit test framework for the MMGen suite
  20. """
  21. import sys, os, time, importlib, platform, asyncio
  22. from .test_init import repo_root
  23. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  24. # in production code:
  25. if not os.getenv('MMGEN_DEVTOOLS'):
  26. from mmgen.devinit import init_dev
  27. init_dev()
  28. from mmgen.cfg import Config, gc
  29. from mmgen.color import gray, brown, orange, yellow, red
  30. from mmgen.util import msg, msg_r, gmsg, ymsg, Msg
  31. from test.include.common import set_globals, end_msg
  32. def die(ev, s):
  33. msg((red if ev > 1 else yellow)(s))
  34. sys.exit(ev)
  35. opts_data = {
  36. 'text': {
  37. 'desc': "Unit tests for the MMGen suite",
  38. 'usage':'[options] [test | test.subtest]...',
  39. 'options': """
  40. -h, --help Print this help message
  41. -a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils
  42. -A, --no-daemon-autostart Don't start and stop daemons automatically
  43. -D, --no-daemon-stop Don't stop auto-started daemons after running tests
  44. -f, --fast Speed up execution by reducing rounds on some tests
  45. -l, --list List available tests
  46. -L, --list-subtests List available tests and subtests
  47. -n, --names Print command names instead of descriptions
  48. -q, --quiet Produce quieter output
  49. -x, --exclude=T Exclude tests 'T' (comma-separated)
  50. -v, --verbose Produce more verbose output
  51. """,
  52. 'notes': """
  53. If no test is specified, all available tests are run
  54. """
  55. }
  56. }
  57. if os.path.islink(Config.test_datadir):
  58. os.unlink(Config.test_datadir)
  59. sys.argv.insert(1, '--skip-cfg-file')
  60. cfg = Config(opts_data=opts_data)
  61. if cfg.no_altcoin_deps:
  62. ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
  63. type(cfg)._reset_ok += ('use_internal_keccak_module', 'debug_addrlist')
  64. set_globals(cfg)
  65. file_pfx = 'ut_'
  66. test_type = {
  67. 'modtest.py': 'unit',
  68. 'daemontest.py': 'daemon',
  69. }[gc.prog_name]
  70. test_subdir = gc.prog_name.removesuffix('.py') + '_d'
  71. test_dir = os.path.join(repo_root, 'test', test_subdir)
  72. all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(test_dir) if fn.startswith(file_pfx))
  73. exclude = cfg.exclude.split(',') if cfg.exclude else []
  74. for e in exclude:
  75. if e not in all_tests:
  76. die(1, f'{e!r}: invalid parameter for --exclude (no such test)')
  77. start_time = int(time.time())
  78. if cfg.list:
  79. Msg(' '.join(all_tests))
  80. sys.exit(0)
  81. if cfg.list_subtests:
  82. def gen():
  83. for test in all_tests:
  84. mod = importlib.import_module(f'test.{test_subdir}.{file_pfx}{test}')
  85. if hasattr(mod, 'unit_tests'):
  86. t = getattr(mod, 'unit_tests')
  87. subtests = [k for k, v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  88. yield fs.format(test, ' '.join(f'{subtest}' for subtest in subtests))
  89. else:
  90. yield test
  91. fs = '{:%s} {}' % max(len(t) for t in all_tests)
  92. Msg(fs.format('TEST', 'SUBTESTS') + '\n' + '\n'.join(gen()))
  93. sys.exit(0)
  94. class UnitTestHelpers:
  95. def __init__(self, subtest_name):
  96. self.subtest_name = subtest_name
  97. def skip_msg(self, desc):
  98. cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_", "-")!r} for {desc}'))
  99. def process_bad_data(self, data, pfx='bad '):
  100. if os.getenv('PYTHONOPTIMIZE'):
  101. ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
  102. return
  103. import re
  104. desc_w = max(len(e[0]) for e in data)
  105. exc_w = max(len(e[1]) for e in data)
  106. m_exc = '{!r}: incorrect exception type (expected {!r})'
  107. m_err = '{!r}: incorrect error msg (should match {!r}'
  108. m_noraise = "\nillegal action '{}{}' failed to raise an exception (expected {!r})"
  109. for (desc, exc_chk, emsg_chk, func) in data:
  110. try:
  111. cfg._util.vmsg_r(' {}{:{w}}'.format(pfx, desc+':', w=desc_w+1))
  112. ret = func()
  113. if type(ret).__name__ == 'coroutine':
  114. asyncio.run(ret)
  115. except Exception as e:
  116. exc = type(e).__name__
  117. emsg = e.args[0]
  118. cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
  119. assert exc == exc_chk, m_exc.format(exc, exc_chk)
  120. assert re.search(emsg_chk, emsg), m_err.format(emsg, emsg_chk)
  121. else:
  122. die(4, m_noraise.format(pfx, desc, exc_chk))
  123. tests_seen = []
  124. def run_test(test, subtest=None):
  125. mod = importlib.import_module(f'test.{test_subdir}.{file_pfx}{test}')
  126. def run_subtest(t, subtest):
  127. subtest_disp = subtest.replace('_', '-')
  128. msg(brown(f'Running {test_type} subtest ') + orange(f'{test}.{subtest_disp}'))
  129. if getattr(t, 'silence_output', False):
  130. t._silence()
  131. if hasattr(t, '_pre_subtest'):
  132. getattr(t, '_pre_subtest')(test, subtest, UnitTestHelpers(subtest))
  133. try:
  134. func = getattr(t, subtest.replace('-', '_'))
  135. c = func.__code__
  136. do_desc = c.co_varnames[c.co_argcount-1] == 'desc'
  137. if do_desc:
  138. if cfg.verbose:
  139. msg(f'Testing {func.__defaults__[0]}')
  140. elif not cfg.quiet:
  141. msg_r(f'Testing {func.__defaults__[0]}...')
  142. ret = func(test, UnitTestHelpers(subtest))
  143. if type(ret).__name__ == 'coroutine':
  144. ret = asyncio.run(ret)
  145. if do_desc and not cfg.quiet:
  146. msg('OK\n' if cfg.verbose else 'OK')
  147. except:
  148. if getattr(t, 'silence_output', False):
  149. t._end_silence()
  150. raise
  151. if hasattr(t, '_post_subtest'):
  152. getattr(t, '_post_subtest')(test, subtest, UnitTestHelpers(subtest))
  153. if getattr(t, 'silence_output', False):
  154. t._end_silence()
  155. if not ret:
  156. die(4, f'Unit subtest {subtest_disp!r} failed')
  157. if test not in tests_seen:
  158. gmsg(f'Running {test_type} test {test}')
  159. tests_seen.append(test)
  160. if cfg.no_altcoin_deps and getattr(mod, 'altcoin_dep', None):
  161. cfg._util.qmsg(gray(f'Skipping {test_type} test {test!r} [--no-altcoin-deps]'))
  162. return
  163. if hasattr(mod, 'unit_tests'): # new class-based API
  164. t = getattr(mod, 'unit_tests')()
  165. altcoin_deps = getattr(t, 'altcoin_deps', ())
  166. win_skip = getattr(t, 'win_skip', ())
  167. mac_skip = getattr(t, 'mac_skip', ())
  168. arm_skip = getattr(t, 'arm_skip', ())
  169. subtests = (
  170. [subtest] if subtest else
  171. [k for k, v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  172. )
  173. if hasattr(t, '_pre'):
  174. t._pre()
  175. for _subtest in subtests:
  176. subtest_disp = _subtest.replace('_', '-')
  177. if cfg.no_altcoin_deps and _subtest in altcoin_deps:
  178. cfg._util.qmsg(gray(f'Skipping {test_type} subtest {subtest_disp!r} [--no-altcoin-deps]'))
  179. continue
  180. if sys.platform == 'win32' and _subtest in win_skip:
  181. cfg._util.qmsg(gray(f'Skipping {test_type} subtest {subtest_disp!r} for Windows platform'))
  182. continue
  183. if sys.platform == 'darwin' and _subtest in mac_skip:
  184. cfg._util.qmsg(gray(f'Skipping {test_type} subtest {subtest_disp!r} for macOS platform'))
  185. continue
  186. if platform.machine() == 'aarch64' and _subtest in arm_skip:
  187. cfg._util.qmsg(gray(f'Skipping {test_type} subtest {subtest_disp!r} for ARM platform'))
  188. continue
  189. run_subtest(t, _subtest)
  190. if hasattr(t, '_post'):
  191. t._post()
  192. else:
  193. assert not subtest, f'{subtest!r}: subtests not supported for this {test_type} test'
  194. if not mod.unit_test().run_test(test, UnitTestHelpers(test)):
  195. die(4, 'Unit test {test!r} failed')
  196. def main():
  197. for test in (cfg._args or all_tests):
  198. if '.' in test:
  199. test, subtest = test.split('.')
  200. else:
  201. subtest = None
  202. if test not in all_tests:
  203. die(1, f'{test!r}: test not recognized')
  204. if test not in exclude:
  205. run_test(test, subtest=subtest)
  206. end_msg(int(time.time()) - start_time)
  207. from mmgen.main import launch
  208. launch(func=main)