unit_tests.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/unit_tests.py: Unit tests for the MMGen suite
  20. """
  21. import sys,os,time,importlib,platform
  22. import include.test_init
  23. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  24. # in production code:
  25. if not os.getenv('MMGEN_DEVTOOLS'):
  26. from mmgen.devinit import init_dev
  27. init_dev()
  28. from mmgen.common import *
  29. from test.include.common import set_globals,end_msg
  30. opts_data = {
  31. 'text': {
  32. 'desc': "Unit tests for the MMGen suite",
  33. 'usage':'[options] [test | test.subtest]...',
  34. 'options': """
  35. -h, --help Print this help message
  36. -a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils
  37. -A, --no-daemon-autostart Don't start and stop daemons automatically
  38. -D, --no-daemon-stop Don't stop auto-started daemons after running tests
  39. -f, --fast Speed up execution by reducing rounds on some tests
  40. -l, --list List available tests
  41. -L, --list-subtests List available tests and subtests
  42. -n, --names Print command names instead of descriptions
  43. -q, --quiet Produce quieter output
  44. -x, --exclude=T Exclude tests 'T' (comma-separated)
  45. -v, --verbose Produce more verbose output
  46. """,
  47. 'notes': """
  48. If no test is specified, all available tests are run
  49. """
  50. }
  51. }
  52. sys.argv.insert(1,'--skip-cfg-file')
  53. cfg = Config(opts_data=opts_data)
  54. type(cfg)._reset_ok += ('use_internal_keccak_module','debug_addrlist')
  55. set_globals(cfg)
  56. file_pfx = 'ut_'
  57. tests_d = os.path.join(include.test_init.repo_root,'test','unit_tests_d')
  58. all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(tests_d) if fn.startswith(file_pfx))
  59. exclude = cfg.exclude.split(',') if cfg.exclude else []
  60. for e in exclude:
  61. if e not in all_tests:
  62. die(1,f'{e!r}: invalid parameter for --exclude (no such test)')
  63. start_time = int(time.time())
  64. if cfg.list:
  65. Msg(' '.join(all_tests))
  66. sys.exit(0)
  67. if cfg.list_subtests:
  68. def gen():
  69. for test in all_tests:
  70. mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
  71. if hasattr(mod,'unit_tests'):
  72. t = getattr(mod,'unit_tests')
  73. subtests = [k for k,v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  74. yield fs.format( test, ' '.join(f'{subtest}' for subtest in subtests) )
  75. else:
  76. yield test
  77. fs = '{:%s} {}' % max(len(t) for t in all_tests)
  78. Msg( fs.format('TEST','SUBTESTS') + '\n' + '\n'.join(gen()) )
  79. sys.exit(0)
  80. class UnitTestHelpers:
  81. def __init__(self,subtest_name):
  82. self.subtest_name = subtest_name
  83. def skip_msg(self,desc):
  84. cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_","-")!r} for {desc}'))
  85. def process_bad_data(self,data):
  86. if os.getenv('PYTHONOPTIMIZE'):
  87. ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
  88. return
  89. import re
  90. desc_w = max(len(e[0]) for e in data)
  91. exc_w = max(len(e[1]) for e in data)
  92. m_exc = '{!r}: incorrect exception type (expected {!r})'
  93. m_err = '{!r}: incorrect error msg (should match {!r}'
  94. m_noraise = "\nillegal action 'bad {}' failed to raise an exception (expected {!r})"
  95. for (desc,exc_chk,emsg_chk,func) in data:
  96. try:
  97. cfg._util.vmsg_r(' bad {:{w}}'.format( desc+':', w=desc_w+1 ))
  98. func()
  99. except Exception as e:
  100. exc = type(e).__name__
  101. emsg = e.args[0]
  102. cfg._util.vmsg(' {:{w}} [{}]'.format( exc, emsg, w=exc_w ))
  103. assert exc == exc_chk, m_exc.format(exc,exc_chk)
  104. assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk)
  105. else:
  106. die(4,m_noraise.format(desc,exc_chk))
  107. tests_seen = []
  108. def run_test(test,subtest=None):
  109. mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
  110. def run_subtest(subtest):
  111. subtest_disp = subtest.replace('_','-')
  112. msg(f'Running unit subtest {test}.{subtest_disp}')
  113. t = getattr(mod,'unit_tests')()
  114. if getattr(t,'silence_output',False):
  115. t._silence()
  116. if hasattr(t,'_pre_subtest'):
  117. getattr(t,'_pre_subtest')(test,subtest,UnitTestHelpers(subtest))
  118. try:
  119. ret = getattr(t,subtest.replace('-','_'))(test,UnitTestHelpers(subtest))
  120. if type(ret).__name__ == 'coroutine':
  121. ret = async_run(ret)
  122. except:
  123. if getattr(t,'silence_output',False):
  124. t._end_silence()
  125. raise
  126. if hasattr(t,'_post_subtest'):
  127. getattr(t,'_post_subtest')(test,subtest,UnitTestHelpers(subtest))
  128. if getattr(t,'silence_output',False):
  129. t._end_silence()
  130. if not ret:
  131. die(4,f'Unit subtest {subtest_disp!r} failed')
  132. if test not in tests_seen:
  133. gmsg(f'Running unit test {test}')
  134. tests_seen.append(test)
  135. if hasattr(mod,'unit_tests'): # new class-based API
  136. t = getattr(mod,'unit_tests')()
  137. altcoin_deps = getattr(t,'altcoin_deps',())
  138. win_skip = getattr(t,'win_skip',())
  139. arm_skip = getattr(t,'arm_skip',())
  140. subtests = (
  141. [subtest] if subtest else
  142. [k for k,v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  143. )
  144. if hasattr(t,'_pre'):
  145. t._pre()
  146. for subtest in subtests:
  147. subtest_disp = subtest.replace('_','-')
  148. if cfg.no_altcoin_deps and subtest in altcoin_deps:
  149. cfg._util.qmsg(gray(f'Invoked with --no-altcoin-deps, so skipping subtest {subtest_disp!r}'))
  150. continue
  151. if gc.platform == 'win' and subtest in win_skip:
  152. cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for Windows platform'))
  153. continue
  154. elif platform.machine() == 'aarch64' and subtest in arm_skip:
  155. cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for ARM platform'))
  156. continue
  157. run_subtest(subtest)
  158. if hasattr(t,'_post'):
  159. t._post()
  160. else:
  161. assert not subtest, f'{subtest!r}: subtests not supported for this unit test'
  162. if not mod.unit_test().run_test(test,UnitTestHelpers(test)):
  163. die(4,'Unit test {test!r} failed')
  164. try:
  165. for test in (cfg._args or all_tests):
  166. if '.' in test:
  167. test,subtest = test.split('.')
  168. else:
  169. subtest = None
  170. if test not in all_tests:
  171. die(1,f'{test!r}: test not recognized')
  172. if test not in exclude:
  173. run_test(test,subtest=subtest)
  174. end_msg(int(time.time()) - start_time)
  175. except KeyboardInterrupt:
  176. die(1,green('\nExiting at user request'))