unit_tests.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/unit_tests.py: Unit tests for the MMGen suite
  20. """
  21. import sys,os,time,importlib,platform
  22. try:
  23. from include.test_init import repo_root
  24. except ImportError:
  25. from test.include.test_init import repo_root
  26. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  27. # in production code:
  28. if not os.getenv('MMGEN_DEVTOOLS'):
  29. from mmgen.devinit import init_dev
  30. init_dev()
  31. from mmgen.cfg import Config,gc
  32. from mmgen.color import green,gray
  33. from mmgen.util import msg,gmsg,ymsg,Msg,die,async_run
  34. from test.include.common import set_globals,end_msg
# Usage/help definition for this script; passed to Config(opts_data=...) below.
# NOTE(review): the help text is reproduced as it appears in this copy of the
# file, where column alignment looks collapsed -- confirm against the
# repository version before relying on exact whitespace.
opts_data = {
    'text': {
        'desc': "Unit tests for the MMGen suite",
        'usage':'[options] [test | test.subtest]...',
        'options': """
-h, --help Print this help message
-a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils
-A, --no-daemon-autostart Don't start and stop daemons automatically
-D, --no-daemon-stop Don't stop auto-started daemons after running tests
-f, --fast Speed up execution by reducing rounds on some tests
-l, --list List available tests
-L, --list-subtests List available tests and subtests
-n, --names Print command names instead of descriptions
-q, --quiet Produce quieter output
-x, --exclude=T Exclude tests 'T' (comma-separated)
-v, --verbose Produce more verbose output
""",
    'notes': """
If no test is specified, all available tests are run
"""
    }
}
  57. sys.argv.insert(1,'--skip-cfg-file')
  58. cfg = Config(opts_data=opts_data)
  59. if cfg.no_altcoin_deps:
  60. ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
  61. type(cfg)._reset_ok += ('use_internal_keccak_module','debug_addrlist')
  62. set_globals(cfg)
  63. file_pfx = 'ut_'
  64. tests_d = os.path.join(repo_root,'test','unit_tests_d')
  65. all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(tests_d) if fn.startswith(file_pfx))
  66. exclude = cfg.exclude.split(',') if cfg.exclude else []
  67. for e in exclude:
  68. if e not in all_tests:
  69. die(1,f'{e!r}: invalid parameter for --exclude (no such test)')
  70. start_time = int(time.time())
  71. if cfg.list:
  72. Msg(' '.join(all_tests))
  73. sys.exit(0)
  74. if cfg.list_subtests:
  75. def gen():
  76. for test in all_tests:
  77. mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
  78. if hasattr(mod,'unit_tests'):
  79. t = getattr(mod,'unit_tests')
  80. subtests = [k for k,v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  81. yield fs.format( test, ' '.join(f'{subtest}' for subtest in subtests) )
  82. else:
  83. yield test
  84. fs = '{:%s} {}' % max(len(t) for t in all_tests)
  85. Msg( fs.format('TEST','SUBTESTS') + '\n' + '\n'.join(gen()) )
  86. sys.exit(0)
  87. class UnitTestHelpers:
  88. def __init__(self,subtest_name):
  89. self.subtest_name = subtest_name
  90. def skip_msg(self,desc):
  91. cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_","-")!r} for {desc}'))
  92. def process_bad_data(self,data):
  93. if os.getenv('PYTHONOPTIMIZE'):
  94. ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
  95. return
  96. import re
  97. desc_w = max(len(e[0]) for e in data)
  98. exc_w = max(len(e[1]) for e in data)
  99. m_exc = '{!r}: incorrect exception type (expected {!r})'
  100. m_err = '{!r}: incorrect error msg (should match {!r}'
  101. m_noraise = "\nillegal action 'bad {}' failed to raise an exception (expected {!r})"
  102. for (desc,exc_chk,emsg_chk,func) in data:
  103. try:
  104. cfg._util.vmsg_r(' bad {:{w}}'.format( desc+':', w=desc_w+1 ))
  105. func()
  106. except Exception as e:
  107. exc = type(e).__name__
  108. emsg = e.args[0]
  109. cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
  110. assert exc == exc_chk, m_exc.format(exc,exc_chk)
  111. assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk)
  112. else:
  113. die(4,m_noraise.format(desc,exc_chk))
  114. tests_seen = []
  115. def run_test(test,subtest=None):
  116. mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
  117. def run_subtest(t,subtest):
  118. subtest_disp = subtest.replace('_','-')
  119. msg(f'Running unit subtest {test}.{subtest_disp}')
  120. if getattr(t,'silence_output',False):
  121. t._silence()
  122. if hasattr(t,'_pre_subtest'):
  123. getattr(t,'_pre_subtest')(test,subtest,UnitTestHelpers(subtest))
  124. try:
  125. ret = getattr(t,subtest.replace('-','_'))(test,UnitTestHelpers(subtest))
  126. if type(ret).__name__ == 'coroutine':
  127. ret = async_run(ret)
  128. except:
  129. if getattr(t,'silence_output',False):
  130. t._end_silence()
  131. raise
  132. if hasattr(t,'_post_subtest'):
  133. getattr(t,'_post_subtest')(test,subtest,UnitTestHelpers(subtest))
  134. if getattr(t,'silence_output',False):
  135. t._end_silence()
  136. if not ret:
  137. die(4,f'Unit subtest {subtest_disp!r} failed')
  138. if test not in tests_seen:
  139. gmsg(f'Running unit test {test}')
  140. tests_seen.append(test)
  141. if hasattr(mod,'unit_tests'): # new class-based API
  142. t = getattr(mod,'unit_tests')()
  143. altcoin_deps = getattr(t,'altcoin_deps',())
  144. win_skip = getattr(t,'win_skip',())
  145. arm_skip = getattr(t,'arm_skip',())
  146. subtests = (
  147. [subtest] if subtest else
  148. [k for k,v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  149. )
  150. if hasattr(t,'_pre'):
  151. t._pre()
  152. for subtest in subtests:
  153. subtest_disp = subtest.replace('_','-')
  154. if cfg.no_altcoin_deps and subtest in altcoin_deps:
  155. cfg._util.qmsg(gray(f'Invoked with --no-altcoin-deps, so skipping subtest {subtest_disp!r}'))
  156. continue
  157. if sys.platform == 'win32' and subtest in win_skip:
  158. cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for Windows platform'))
  159. continue
  160. elif platform.machine() == 'aarch64' and subtest in arm_skip:
  161. cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for ARM platform'))
  162. continue
  163. run_subtest(t,subtest)
  164. if hasattr(t,'_post'):
  165. t._post()
  166. else:
  167. assert not subtest, f'{subtest!r}: subtests not supported for this unit test'
  168. if not mod.unit_test().run_test(test,UnitTestHelpers(test)):
  169. die(4,'Unit test {test!r} failed')
  170. try:
  171. for test in (cfg._args or all_tests):
  172. if '.' in test:
  173. test,subtest = test.split('.')
  174. else:
  175. subtest = None
  176. if test not in all_tests:
  177. die(1,f'{test!r}: test not recognized')
  178. if test not in exclude:
  179. run_test(test,subtest=subtest)
  180. end_msg(int(time.time()) - start_time)
  181. except KeyboardInterrupt:
  182. die(1,green('\nExiting at user request'))