unit_tests.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/unit_tests.py: Unit tests for the MMGen suite
  20. """
  21. import sys,os,time,importlib,platform
  22. try:
  23. from include.test_init import repo_root
  24. except ImportError:
  25. from test.include.test_init import repo_root
  26. # for the unit tests, violate MMGen Project best practices and allow use of the dev tools
  27. # in production code:
  28. if not os.getenv('MMGEN_DEVTOOLS'):
  29. from mmgen.devinit import init_dev
  30. init_dev()
  31. from mmgen.cfg import Config,gc
  32. from mmgen.color import green,gray
  33. from mmgen.util import msg,gmsg,ymsg,Msg,async_run
  34. from test.include.common import set_globals,end_msg
  35. def die(ev,s):
  36. msg(s)
  37. sys.exit(ev)
  38. opts_data = {
  39. 'text': {
  40. 'desc': "Unit tests for the MMGen suite",
  41. 'usage':'[options] [test | test.subtest]...',
  42. 'options': """
  43. -h, --help Print this help message
  44. -a, --no-altcoin-deps Skip tests requiring altcoin daemons, libs or utils
  45. -A, --no-daemon-autostart Don't start and stop daemons automatically
  46. -D, --no-daemon-stop Don't stop auto-started daemons after running tests
  47. -f, --fast Speed up execution by reducing rounds on some tests
  48. -l, --list List available tests
  49. -L, --list-subtests List available tests and subtests
  50. -n, --names Print command names instead of descriptions
  51. -q, --quiet Produce quieter output
  52. -x, --exclude=T Exclude tests 'T' (comma-separated)
  53. -v, --verbose Produce more verbose output
  54. """,
  55. 'notes': """
  56. If no test is specified, all available tests are run
  57. """
  58. }
  59. }
  60. sys.argv.insert(1,'--skip-cfg-file')
  61. cfg = Config(opts_data=opts_data)
  62. if cfg.no_altcoin_deps:
  63. ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
  64. type(cfg)._reset_ok += ('use_internal_keccak_module','debug_addrlist')
  65. set_globals(cfg)
  66. file_pfx = 'ut_'
  67. tests_d = os.path.join(repo_root,'test','unit_tests_d')
  68. all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(tests_d) if fn.startswith(file_pfx))
  69. exclude = cfg.exclude.split(',') if cfg.exclude else []
  70. for e in exclude:
  71. if e not in all_tests:
  72. die(1,f'{e!r}: invalid parameter for --exclude (no such test)')
  73. start_time = int(time.time())
  74. if cfg.list:
  75. Msg(' '.join(all_tests))
  76. sys.exit(0)
  77. if cfg.list_subtests:
  78. def gen():
  79. for test in all_tests:
  80. mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
  81. if hasattr(mod,'unit_tests'):
  82. t = getattr(mod,'unit_tests')
  83. subtests = [k for k,v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  84. yield fs.format( test, ' '.join(f'{subtest}' for subtest in subtests) )
  85. else:
  86. yield test
  87. fs = '{:%s} {}' % max(len(t) for t in all_tests)
  88. Msg( fs.format('TEST','SUBTESTS') + '\n' + '\n'.join(gen()) )
  89. sys.exit(0)
  90. class UnitTestHelpers:
  91. def __init__(self,subtest_name):
  92. self.subtest_name = subtest_name
  93. def skip_msg(self,desc):
  94. cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_","-")!r} for {desc}'))
  95. def process_bad_data(self,data):
  96. if os.getenv('PYTHONOPTIMIZE'):
  97. ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
  98. return
  99. import re
  100. desc_w = max(len(e[0]) for e in data)
  101. exc_w = max(len(e[1]) for e in data)
  102. m_exc = '{!r}: incorrect exception type (expected {!r})'
  103. m_err = '{!r}: incorrect error msg (should match {!r}'
  104. m_noraise = "\nillegal action 'bad {}' failed to raise an exception (expected {!r})"
  105. for (desc,exc_chk,emsg_chk,func) in data:
  106. try:
  107. cfg._util.vmsg_r(' bad {:{w}}'.format( desc+':', w=desc_w+1 ))
  108. func()
  109. except Exception as e:
  110. exc = type(e).__name__
  111. emsg = e.args[0]
  112. cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
  113. assert exc == exc_chk, m_exc.format(exc,exc_chk)
  114. assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk)
  115. else:
  116. die(4,m_noraise.format(desc,exc_chk))
  117. tests_seen = []
  118. def run_test(test,subtest=None):
  119. mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
  120. def run_subtest(t,subtest):
  121. subtest_disp = subtest.replace('_','-')
  122. msg(f'Running unit subtest {test}.{subtest_disp}')
  123. if getattr(t,'silence_output',False):
  124. t._silence()
  125. if hasattr(t,'_pre_subtest'):
  126. getattr(t,'_pre_subtest')(test,subtest,UnitTestHelpers(subtest))
  127. try:
  128. ret = getattr(t,subtest.replace('-','_'))(test,UnitTestHelpers(subtest))
  129. if type(ret).__name__ == 'coroutine':
  130. ret = async_run(ret)
  131. except:
  132. if getattr(t,'silence_output',False):
  133. t._end_silence()
  134. raise
  135. if hasattr(t,'_post_subtest'):
  136. getattr(t,'_post_subtest')(test,subtest,UnitTestHelpers(subtest))
  137. if getattr(t,'silence_output',False):
  138. t._end_silence()
  139. if not ret:
  140. die(4,f'Unit subtest {subtest_disp!r} failed')
  141. if test not in tests_seen:
  142. gmsg(f'Running unit test {test}')
  143. tests_seen.append(test)
  144. if hasattr(mod,'unit_tests'): # new class-based API
  145. t = getattr(mod,'unit_tests')()
  146. altcoin_deps = getattr(t,'altcoin_deps',())
  147. win_skip = getattr(t,'win_skip',())
  148. arm_skip = getattr(t,'arm_skip',())
  149. subtests = (
  150. [subtest] if subtest else
  151. [k for k,v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
  152. )
  153. if hasattr(t,'_pre'):
  154. t._pre()
  155. for subtest in subtests:
  156. subtest_disp = subtest.replace('_','-')
  157. if cfg.no_altcoin_deps and subtest in altcoin_deps:
  158. cfg._util.qmsg(gray(f'Invoked with --no-altcoin-deps, so skipping subtest {subtest_disp!r}'))
  159. continue
  160. if sys.platform == 'win32' and subtest in win_skip:
  161. cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for Windows platform'))
  162. continue
  163. elif platform.machine() == 'aarch64' and subtest in arm_skip:
  164. cfg._util.qmsg(gray(f'Skipping subtest {subtest_disp!r} for ARM platform'))
  165. continue
  166. run_subtest(t,subtest)
  167. if hasattr(t,'_post'):
  168. t._post()
  169. else:
  170. assert not subtest, f'{subtest!r}: subtests not supported for this unit test'
  171. if not mod.unit_test().run_test(test,UnitTestHelpers(test)):
  172. die(4,'Unit test {test!r} failed')
  173. if __name__ == '__main__':
  174. try:
  175. for test in (cfg._args or all_tests):
  176. if '.' in test:
  177. test,subtest = test.split('.')
  178. else:
  179. subtest = None
  180. if test not in all_tests:
  181. die(1,f'{test!r}: test not recognized')
  182. if test not in exclude:
  183. run_test(test,subtest=subtest)
  184. end_msg(int(time.time()) - start_time)
  185. except KeyboardInterrupt:
  186. die(1,green('\nExiting at user request'))