unit_tests.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2019 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/unit_tests.py: Unit tests for the MMGen suite
  20. """
  21. import sys,os,time
  22. repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),os.pardir)))
  23. os.chdir(repo_root)
  24. sys.path[0] = repo_root
  25. os.environ['MMGEN_TEST_SUITE'] = '1'
  26. # Import these _after_ prepending repo_root to sys.path
  27. from mmgen.common import *
  28. opts_data = lambda: {
  29. 'desc': "Unit tests for the MMGen suite",
  30. 'usage':'[options] [tests]',
  31. 'options': """
  32. -h, --help Print this help message
  33. -l, --list List available tests
  34. -n, --names Print command names instead of descriptions
  35. -q, --quiet Produce quieter output
  36. -v, --verbose Produce more verbose output
  37. """,
  38. 'notes': """
  39. If no test is specified, all available tests are run
  40. """
  41. }
  42. sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
  43. cmd_args = opts.init(opts_data)
  44. class UnitTests(object):
  45. def _get_core_repo_root(self):
  46. self.core_repo_root = os.getenv('CORE_REPO_ROOT')
  47. if not self.core_repo_root:
  48. die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test')
  49. def tx_deserialize(self,name):
  50. def test_tx(txhex,desc,n):
  51. def has_nonstandard_outputs(outputs):
  52. for o in outputs:
  53. t = o['scriptPubKey']['type']
  54. if t in ('nonstandard','pubkey','nulldata'):
  55. return True
  56. return False
  57. d = g.rpch.decoderawtransaction(txhex)
  58. if has_nonstandard_outputs(d['vout']): return False
  59. dt = DeserializedTX(txhex)
  60. if opt.verbose:
  61. Msg('\n====================================================')
  62. Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc))
  63. if opt.verbose:
  64. Pmsg(d)
  65. Msg('----------------------------------------------------')
  66. Pmsg(dt)
  67. # metadata
  68. assert dt['txid'] == d['txid'],'TXID does not match'
  69. assert dt['lock_time'] == d['locktime'],'Locktime does not match'
  70. assert dt['version'] == d['version'],'Version does not match'
  71. # inputs
  72. a,b = d['vin'],dt['txins']
  73. for i in range(len(a)):
  74. assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i)
  75. assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i)
  76. assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
  77. 'nSeq of input {} does not match'.format(i))
  78. if 'txinwitness' in a[i]:
  79. assert a[i]['txinwitness'] == b[i]['witness'],(
  80. 'witness of input {} does not match'.format(i))
  81. # outputs
  82. a,b = d['vout'],dt['txouts']
  83. for i in range(len(a)):
  84. assert a[i]['scriptPubKey']['addresses'][0] == b[i]['address'],(
  85. 'address of ouput {} does not match'.format(i))
  86. assert a[i]['value'] == b[i]['amount'],'value of ouput {} does not match'.format(i)
  87. assert a[i]['scriptPubKey']['hex'] == b[i]['scriptPubKey'],(
  88. 'scriptPubKey of ouput {} does not match'.format(i))
  89. return True
  90. def print_info(fn,extra_desc):
  91. if opt.names:
  92. Msg_r('{} {} ({}){}'.format(
  93. purple('Testing'),
  94. cyan(name),
  95. extra_desc,
  96. '' if opt.quiet else '\n'))
  97. else:
  98. Msg('Testing transactions from {!r}'.format(fn))
  99. def test_core_vectors():
  100. self._get_core_repo_root()
  101. fn = os.path.join(self.core_repo_root,'src/test/data/tx_valid.json')
  102. data = json.loads(open(fn).read())
  103. print_info(fn,'Core test vectors')
  104. n = 1
  105. for e in data:
  106. if type(e[0]) == list:
  107. test_tx(e[1],desc,n)
  108. n += 1
  109. else:
  110. desc = e[0]
  111. Msg('OK')
  112. def test_mmgen_txs():
  113. fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'),
  114. ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
  115. ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx') )
  116. from mmgen.protocol import init_coin
  117. from mmgen.tx import MMGenTX
  118. from importlib import reload
  119. print_info('test/ref/*rawtx','MMGen reference transactions')
  120. for n,(coin,tn,fn) in enumerate(fns):
  121. init_coin(coin,tn)
  122. rpc_init(reinit=True)
  123. reload(sys.modules['mmgen.tx'])
  124. test_tx(MMGenTX(fn).hex,fn,n+1)
  125. init_coin('btc',False)
  126. rpc_init(reinit=True)
  127. reload(sys.modules['mmgen.tx'])
  128. Msg('OK')
  129. from mmgen.tx import DeserializedTX
  130. import json
  131. test_mmgen_txs()
  132. test_core_vectors()
  133. return True
  134. def exit_msg():
  135. t = int(time.time()) - start_time
  136. gmsg('All requested tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60))
  137. def run_unit_test(test):
  138. gmsg('Running unit test {}'.format(test))
  139. t = UnitTests()
  140. return getattr(t,test)(test)
  141. all_tests = filter(lambda s: s[0] != '_',dir(UnitTests))
  142. start_time = int(time.time())
  143. if opt.list:
  144. Die(0,' '.join(all_tests))
  145. try:
  146. for test in cmd_args:
  147. if test not in all_tests:
  148. die(1,"'{}': test not recognized".format(test))
  149. for test in (cmd_args or all_tests):
  150. if not run_unit_test(test):
  151. rdie(2,'Test failed')
  152. exit_msg()
  153. except KeyboardInterrupt:
  154. die(1,green('\nExiting at user request'))