unit_tests.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/unit_tests.py: Unit tests for the MMGen suite
  20. """
  21. import sys,os,time,importlib,platform,asyncio
  22. try:
  23. from include.test_init import repo_root
  24. except ImportError:
  25. from test.include.test_init import repo_root
  26. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  27. # in production code:
  28. if not os.getenv('MMGEN_DEVTOOLS'):
  29. from mmgen.devinit import init_dev
  30. init_dev()
  31. from mmgen.cfg import Config,gc
  32. from mmgen.color import green,gray,brown,orange
  33. from mmgen.util import msg,gmsg,ymsg,Msg
  34. from test.include.common import set_globals,end_msg
  def die(ev,s):
      """Print message `s` with msg(), then terminate the process with exit status `ev`."""
      msg(s)
      raise SystemExit(ev)
  # Command-line help data handed to Config() when the script's options are parsed.
  # The multi-line help texts are spelled out as concatenated literals, one option per line.
  opts_data = {
      'text': {
          'desc': "Unit tests for the MMGen suite",
          'usage': '[options] [test | test.subtest]...',
          'options': (
              "\n"
              "-h, --help Print this help message\n"
              "-a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils\n"
              "-A, --no-daemon-autostart Don't start and stop daemons automatically\n"
              "-D, --no-daemon-stop Don't stop auto-started daemons after running tests\n"
              "-f, --fast Speed up execution by reducing rounds on some tests\n"
              "-l, --list List available tests\n"
              "-L, --list-subtests List available tests and subtests\n"
              "-n, --names Print command names instead of descriptions\n"
              "-q, --quiet Produce quieter output\n"
              "-x, --exclude=T Exclude tests 'T' (comma-separated)\n"
              "-v, --verbose Produce more verbose output\n"
          ),
          'notes': (
              "\n"
              "If no test is specified, all available tests are run\n"
          ),
      },
  }
  60. sys.argv.insert(1,'--skip-cfg-file')
  61. cfg = Config(opts_data=opts_data)
  62. if cfg.no_altcoin_deps:
  63. ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
  64. type(cfg)._reset_ok += ('use_internal_keccak_module','debug_addrlist')
  65. set_globals(cfg)
  66. file_pfx = 'ut_'
  67. tests_d = os.path.join(repo_root,'test','unit_tests_d')
  68. all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(tests_d) if fn.startswith(file_pfx))
  69. exclude = cfg.exclude.split(',') if cfg.exclude else []
  70. for e in exclude:
  71. if e not in all_tests:
  72. die(1,f'{e!r}: invalid parameter for --exclude (no such test)')
  73. start_time = int(time.time())
  74. if cfg.list:
  75. Msg(' '.join(all_tests))
  76. sys.exit(0)
  # --list-subtests: print a two-column table of tests and their subtests, then exit
  if cfg.list_subtests:
      # First column is padded to the width of the longest test name
      name_w = max(len(name) for name in all_tests)
      row_fs = '{:%s} {}' % name_w
      rows = [row_fs.format('TEST','SUBTESTS')]

      for name in all_tests:
          module = importlib.import_module(f'test.unit_tests_d.{file_pfx}{name}')
          if not hasattr(module,'unit_tests'):
              # Module has no 'unit_tests' class: list the bare test name only
              rows.append(name)
              continue
          # Subtests are the plain functions in the class namespace whose names
          # don't begin with an underscore
          subtest_names = []
          for attr,val in module.unit_tests.__dict__.items():
              if type(val).__name__ == 'function' and attr[0] != '_':
                  subtest_names.append(attr)
          rows.append(row_fs.format(name,' '.join(subtest_names)))

      Msg('\n'.join(rows))
      sys.exit(0)
  class UnitTestHelpers:
      """
      Helper object passed by the test runner to unit tests and subtests.

      Stores the name of the running (sub)test and provides helpers for printing
      skip notices and for checking that illegal operations raise the expected
      exception.
      """

      def __init__(self,subtest_name):
          self.subtest_name = subtest_name

      def skip_msg(self,desc):
          """Print, via cfg._util.qmsg(), a gray notice that this subtest is being skipped for `desc`."""
          cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_","-")!r} for {desc}'))

      def process_bad_data(self,data,pfx='bad '):
          """
          Check that each illegal action in `data` raises the expected exception.

          `data` is a sequence of 4-tuples (desc, exc_chk, emsg_chk, func):
            desc     - description of the illegal action, used in messages
            exc_chk  - expected exception class name, as a string
            emsg_chk - regular expression that the error message must match
            func     - zero-argument callable performing the action; if it returns
                       a coroutine, the coroutine is run with asyncio.run()

          `pfx` is prepended to each description in the output.

          A wrong exception type or a non-matching message raises AssertionError.
          An action that raises nothing terminates the program with exit status 4.
          The checks are skipped entirely if PYTHONOPTIMIZE is set in the environment.
          An empty `data` sequence is accepted and checks nothing.
          """
          if os.getenv('PYTHONOPTIMIZE'):
              ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
              return

          import re

          # default=0 keeps an empty `data` sequence from raising ValueError in max()
          desc_w = max((len(e[0]) for e in data), default=0)
          exc_w = max((len(e[1]) for e in data), default=0)

          m_exc = '{!r}: incorrect exception type (expected {!r})'
          m_err = '{!r}: incorrect error msg (should match {!r})'
          m_noraise = "\nillegal action '{}{}' failed to raise an exception (expected {!r})"

          for (desc,exc_chk,emsg_chk,func) in data:
              # Printed outside the try block so that an error while printing is not
              # mistaken for the exception the action is expected to raise
              cfg._util.vmsg_r(' {}{:{w}}'.format(pfx, desc+':', w=desc_w+1))
              try:
                  ret = func()
                  if type(ret).__name__ == 'coroutine':
                      asyncio.run(ret)
              except Exception as e:
                  exc = type(e).__name__
                  # Exceptions raised without arguments have empty args, and args[0]
                  # need not be a string: normalize so that re.search() always gets a str
                  emsg = str(e.args[0]) if e.args else ''
                  cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
                  assert exc == exc_chk, m_exc.format(exc,exc_chk)
                  assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk)
              else:
                  die(4,m_noraise.format(pfx,desc,exc_chk))
  # Names of the tests for which the 'Running unit test' banner has already been printed
  tests_seen = []

  def run_test(test,subtest=None):
      """
      Import unit test module `test` and run it, or run only `subtest` of it if given.

      Modules defining a `unit_tests` class use the class-based API: the class is
      instantiated, and each subtest (a plain function in the class namespace whose
      name doesn't begin with an underscore) is called with the test name and a
      UnitTestHelpers instance.  Optional `_pre`/`_post` methods run before and after
      all subtests, and optional `_pre_subtest`/`_post_subtest` methods around each one.
      Subtests named in the instance's `altcoin_deps`, `win_skip` and `arm_skip`
      attributes are skipped under --no-altcoin-deps, on Windows and on aarch64
      machines respectively.

      Other modules must define a `unit_test` class with a run_test() method; subtests
      are not supported for them.

      A test or subtest returning a false value terminates the program with exit
      status 4.
      """
      mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')

      def run_subtest(t,subtest):
          """Run subtest `subtest` of test instance `t`, exiting with status 4 if its result is false."""
          subtest_disp = subtest.replace('_','-')
          msg(brown('Running unit subtest ') + orange(f'{test}.{subtest_disp}'))

          if getattr(t,'silence_output',False):
              t._silence()

          # try/finally ensures that silenced output is restored on every exit path,
          # including an exception raised by the _pre_subtest or _post_subtest hook
          try:
              if hasattr(t,'_pre_subtest'):
                  t._pre_subtest(test,subtest,UnitTestHelpers(subtest))
              # Subtest names may be given with hyphens in place of underscores
              ret = getattr(t,subtest.replace('-','_'))(test,UnitTestHelpers(subtest))
              if type(ret).__name__ == 'coroutine':
                  ret = asyncio.run(ret)
              if hasattr(t,'_post_subtest'):
                  t._post_subtest(test,subtest,UnitTestHelpers(subtest))
          finally:
              if getattr(t,'silence_output',False):
                  t._end_silence()

          if not ret:
              die(4,f'Unit subtest {subtest_disp!r} failed')

      # Print the banner only once per test, even if several of its subtests are
      # requested separately
      if test not in tests_seen:
          gmsg(f'Running unit test {test}')
          tests_seen.append(test)

      if hasattr(mod,'unit_tests'): # new class-based API
          t = getattr(mod,'unit_tests')()
          altcoin_deps = getattr(t,'altcoin_deps',())
          win_skip = getattr(t,'win_skip',())
          arm_skip = getattr(t,'arm_skip',())
          subtests = (
              [subtest] if subtest else
              [k for k,v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
          )
          if hasattr(t,'_pre'):
              t._pre()
          for subtest in subtests:
              subtest_disp = subtest.replace('_','-')
              if cfg.no_altcoin_deps and subtest in altcoin_deps:
                  cfg._util.qmsg(gray(f'Invoked with --no-altcoin-deps, so skipping subtest {subtest_disp!r}'))
                  continue
              if sys.platform == 'win32' and subtest in win_skip:
                  cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for Windows platform'))
                  continue
              elif platform.machine() == 'aarch64' and subtest in arm_skip:
                  cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for ARM platform'))
                  continue
              run_subtest(t,subtest)
          if hasattr(t,'_post'):
              t._post()
      else:
          assert not subtest, f'{subtest!r}: subtests not supported for this unit test'
          if not mod.unit_test().run_test(test,UnitTestHelpers(test)):
              # f-string prefix was missing here, so the test name was never interpolated
              die(4,f'Unit test {test!r} failed')
  def main():
      """
      Run the tests named on the command line, or all available tests if none are named.

      Each argument is either 'test' or 'test.subtest'.  An unrecognized test name
      terminates the program with exit status 1; tests listed in `exclude` are skipped.
      When all tests have run, the elapsed time in seconds is passed to end_msg().
      """
      for spec in (cfg._args or all_tests):
          if '.' in spec:
              name,subtest = spec.split('.')
          else:
              name,subtest = spec,None
          if name not in all_tests:
              die(1,f'{name!r}: test not recognized')
          if name in exclude:
              continue
          run_test(name,subtest=subtest)
      end_msg(int(time.time()) - start_time)

  # Script entry point: main() is handed to the MMGen launcher instead of being called directly
  from mmgen.main import launch
  launch(func=main)