unit_tests.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2019 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test/unit_tests.py: Unit tests for the MMGen suite
  20. """
import sys,os,time

# The repository root is taken to be the parent of the directory containing
# this script (as named by sys.argv[0]).  Work from there and make it the
# first entry of the import path.
repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),os.pardir)))
os.chdir(repo_root)
sys.path[0] = repo_root
os.environ['MMGEN_TEST_SUITE'] = '1'

# Import these _after_ setting sys.path[0] to repo_root
from mmgen.common import *

# Description, usage, option and notes text handed to opts.init() below.
# NOTE(review): the layout of the triple-quoted text is runtime data; if this
# copy of the file has had its whitespace flattened, restore the text from the
# repository version rather than re-aligning it by hand.
opts_data = {
'text': {
'desc': "Unit tests for the MMGen suite",
'usage':'[options] [tests]',
'options': """
-h, --help Print this help message
-l, --list List available tests
-n, --names Print command names instead of descriptions
-q, --quiet Produce quieter output
-v, --verbose Produce more verbose output
""",
'notes': """
If no test is specified, all available tests are run
"""
}
}

# Insert '--skip-cfg-file' ahead of the user's own arguments
# (presumably so that a user config file cannot affect the tests - confirm)
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]

# cmd_args is iterated further down as the list of requested test names
cmd_args = opts.init(opts_data)
  46. class UnitTests(object):
  47. def _get_core_repo_root(self):
  48. self.core_repo_root = os.getenv('CORE_REPO_ROOT')
  49. if not self.core_repo_root:
  50. die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test')
  51. def tx_deserialize(self,name):
  52. def test_tx(txhex,desc,n):
  53. def has_nonstandard_outputs(outputs):
  54. for o in outputs:
  55. t = o['scriptPubKey']['type']
  56. if t in ('nonstandard','pubkey','nulldata'):
  57. return True
  58. return False
  59. d = g.rpch.decoderawtransaction(txhex)
  60. if has_nonstandard_outputs(d['vout']): return False
  61. dt = DeserializedTX(txhex)
  62. if opt.verbose:
  63. Msg('\n====================================================')
  64. Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc))
  65. if opt.verbose:
  66. Pmsg(d)
  67. Msg('----------------------------------------------------')
  68. Pmsg(dt)
  69. # metadata
  70. assert dt['txid'] == d['txid'],'TXID does not match'
  71. assert dt['lock_time'] == d['locktime'],'Locktime does not match'
  72. assert dt['version'] == d['version'],'Version does not match'
  73. # inputs
  74. a,b = d['vin'],dt['txins']
  75. for i in range(len(a)):
  76. assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i)
  77. assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i)
  78. assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
  79. 'nSeq of input {} does not match'.format(i))
  80. if 'txinwitness' in a[i]:
  81. assert a[i]['txinwitness'] == b[i]['witness'],(
  82. 'witness of input {} does not match'.format(i))
  83. # outputs
  84. a,b = d['vout'],dt['txouts']
  85. for i in range(len(a)):
  86. assert a[i]['scriptPubKey']['addresses'][0] == b[i]['address'],(
  87. 'address of ouput {} does not match'.format(i))
  88. assert a[i]['value'] == b[i]['amount'],'value of ouput {} does not match'.format(i)
  89. assert a[i]['scriptPubKey']['hex'] == b[i]['scriptPubKey'],(
  90. 'scriptPubKey of ouput {} does not match'.format(i))
  91. return True
  92. def print_info(fn,extra_desc):
  93. if opt.names:
  94. Msg_r('{} {} ({}){}'.format(
  95. purple('Testing'),
  96. cyan(name),
  97. extra_desc,
  98. '' if opt.quiet else '\n'))
  99. else:
  100. Msg_r('Testing transactions from {!r}'.format(fn))
  101. if not opt.quiet: Msg('')
  102. def test_core_vectors():
  103. self._get_core_repo_root()
  104. fn = os.path.join(self.core_repo_root,'src/test/data/tx_valid.json')
  105. data = json.loads(open(fn).read())
  106. print_info(fn,'Core test vectors')
  107. n = 1
  108. for e in data:
  109. if type(e[0]) == list:
  110. test_tx(e[1],desc,n)
  111. n += 1
  112. else:
  113. desc = e[0]
  114. Msg('OK')
  115. def test_mmgen_txs():
  116. fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'),
  117. ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
  118. ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx') )
  119. from mmgen.protocol import init_coin
  120. from mmgen.tx import MMGenTX
  121. print_info('test/ref/*rawtx','MMGen reference transactions')
  122. for n,(coin,tn,fn) in enumerate(fns):
  123. init_coin(coin,tn)
  124. rpc_init(reinit=True)
  125. test_tx(MMGenTX(fn).hex,fn,n+1)
  126. init_coin('btc',False)
  127. rpc_init(reinit=True)
  128. Msg('OK')
  129. from mmgen.tx import DeserializedTX
  130. import json
  131. test_mmgen_txs()
  132. test_core_vectors()
  133. return True
  134. def subseed(self,name):
  135. from mmgen.seed import Seed
  136. from mmgen.obj import SubSeedIdxRange
  137. def basic_ops():
  138. msg_r('Testing basic ops...')
  139. for a,b,c,d,e,f,h in (
  140. (8,'4710FBF0','0C1B0615','803B165C','2669AC64',256,'10L'),
  141. (6,'9D07ABBD','EBA9C33F','20787E6A','192E2AA2',192,'10L'),
  142. (4,'43670520','04A4CCB3','B5F21D7B','C1934CFF',128,'10L'),
  143. ):
  144. seed_bin = bytes.fromhex('deadbeef' * a)
  145. seed = Seed(seed_bin)
  146. assert seed.sid == b, seed.sid
  147. subseed = seed.subseed('2s')
  148. assert subseed.sid == c, subseed.sid
  149. subseed = seed.subseed('3')
  150. assert subseed.sid == d, subseed.sid
  151. subseed = seed.subseed_by_seed_id(e)
  152. assert subseed.length == f, subseed.length
  153. assert subseed.sid == e, subseed.sid
  154. assert subseed.idx == 10, subseed.idx
  155. assert subseed.ss_idx == h, subseed.ss_idx
  156. seed2 = Seed(seed_bin)
  157. s2s = seed2.subseeds['short']
  158. s2l = seed2.subseeds['long']
  159. seed2.gen_subseeds(1)
  160. assert len(s2s) == 1, len(s2s)
  161. seed2.gen_subseeds(1) # do nothing
  162. seed2.gen_subseeds(2) # append one item
  163. seed2.gen_subseeds(5)
  164. assert len(s2s) == 5, len(s2s)
  165. seed2.gen_subseeds(3) # do nothing
  166. assert len(s2l) == 5, len(s2l)
  167. seed2.gen_subseeds(10)
  168. assert len(s2s) == 10, len(s2s)
  169. assert seed.pformat() == seed2.pformat()
  170. s = seed.fmt_subseeds()
  171. s_lines = s.strip().split('\n')
  172. assert len(s_lines) == g.subseeds + 4, s
  173. a = seed.subseed('2L').sid
  174. b = [e for e in s_lines if ' 2L:' in e][0].strip().split()[1]
  175. assert a == b, b
  176. c = seed.subseed('2').sid
  177. assert c == a, c
  178. a = seed.subseed('5S').sid
  179. b = [e for e in s_lines if ' 5S:' in e][0].strip().split()[3]
  180. assert a == b, b
  181. s = seed.fmt_subseeds(g.subseeds+1,g.subseeds+2)
  182. s_lines = s.strip().split('\n')
  183. assert len(s_lines) == 6, s
  184. ss_idx = str(g.subseeds+2) + 'S'
  185. a = seed.subseed(ss_idx).sid
  186. b = [e for e in s_lines if ' {}:'.format(ss_idx) in e][0].strip().split()[3]
  187. assert a == b, b
  188. s = seed.fmt_subseeds(1,2)
  189. s_lines = s.strip().split('\n')
  190. assert len(s_lines) == 6, s
  191. msg('OK')
  192. def defaults_and_limits():
  193. msg_r('Testing defaults and limits...')
  194. seed_bin = bytes.fromhex('deadbeef' * 8)
  195. seed = Seed(seed_bin)
  196. seed.gen_subseeds()
  197. ss = seed.subseeds
  198. assert len(ss['short']) == g.subseeds, ss['short']
  199. assert len(ss['long']) == g.subseeds, ss['long']
  200. seed = Seed(seed_bin)
  201. seed.subseed_by_seed_id('EEEEEEEE')
  202. ss = seed.subseeds
  203. assert len(ss['short']) == g.subseeds, ss['short']
  204. assert len(ss['long']) == g.subseeds, ss['long']
  205. seed = Seed(seed_bin)
  206. subseed = seed.subseed_by_seed_id('803B165C')
  207. assert subseed.sid == '803B165C', subseed.sid
  208. assert subseed.idx == 3, subseed.idx
  209. seed = Seed(seed_bin)
  210. subseed = seed.subseed_by_seed_id('803B165C',last_idx=1)
  211. assert subseed == None, subseed
  212. r = SubSeedIdxRange('1-5')
  213. r2 = SubSeedIdxRange(1,5)
  214. assert r2 == r, r2
  215. assert r == (r.first,r.last), r
  216. assert r.first == 1, r.first
  217. assert r.last == 5, r.last
  218. assert r.items == [1,2,3,4,5], r.items
  219. assert list(r.iterate()) == r.items, list(r.iterate())
  220. r = SubSeedIdxRange('22')
  221. r2 = SubSeedIdxRange(22,22)
  222. assert r2 == r, r2
  223. assert r == (r.first,r.last), r
  224. assert r.first == 22, r.first
  225. assert r.last == 22, r.last
  226. assert r.items == [22], r
  227. assert list(r.iterate()) == r.items, list(r.iterate())
  228. r = SubSeedIdxRange('3-3')
  229. assert r.items == [3], r.items
  230. r = SubSeedIdxRange('{}-{}'.format(g.subseeds-1,g.subseeds))
  231. assert r.items == [g.subseeds-1,g.subseeds], r.items
  232. for n,e in enumerate(SubSeedIdxRange('1-5').iterate(),1):
  233. assert n == e, e
  234. assert n == 5, n
  235. msg('OK')
  236. def collisions():
  237. msg_r('Testing Seed ID collisions ({} subseed pairs)...'.format(59344))
  238. seed_bin = bytes.fromhex('deadbeef' * 8)
  239. seed = Seed(seed_bin)
  240. subseed = seed.subseed('29429s')
  241. assert subseed.sid == 'AE4C5E39', subseed.sid
  242. assert subseed.nonce == 1, subseed.nonce
  243. subseed = seed.subseed('59344')
  244. assert subseed.sid == 'FC4AD16F', subseed.sid
  245. assert subseed.nonce == 1, subseed.nonce
  246. subseed2 = seed.subseed_by_seed_id('FC4AD16F')
  247. assert subseed.pformat() == subseed2.pformat()
  248. msg('OK')
  249. def count_collisions():
  250. msg_r('Counting Seed ID collisions ({} subseed pairs)...'.format(SubSeedIdxRange.max_idx))
  251. seed_bin = bytes.fromhex('12abcdef' * 8)
  252. seed = Seed(seed_bin)
  253. seed.gen_subseeds(SubSeedIdxRange.max_idx)
  254. ss = seed.subseeds
  255. for sid in ss['long']:
  256. # msg(sid)
  257. assert sid not in ss['short']
  258. collisions = 0
  259. for k in ('short','long'):
  260. for sid in ss[k]:
  261. collisions += ss[k][sid][1]
  262. assert collisions == 470, collisions
  263. msg_r('({} collisions) '.format(collisions))
  264. msg('OK')
  265. basic_ops()
  266. defaults_and_limits()
  267. collisions()
  268. count_collisions()
  269. return True
  270. def exit_msg():
  271. t = int(time.time()) - start_time
  272. gmsg('All requested tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60))
  273. def run_unit_test(test):
  274. gmsg('Running unit test {}'.format(test))
  275. t = UnitTests()
  276. return getattr(t,test)(test)
  277. all_tests = filter(lambda s: s[0] != '_',dir(UnitTests))
  278. start_time = int(time.time())
  279. if opt.list:
  280. Die(0,' '.join(all_tests))
  281. try:
  282. for test in cmd_args:
  283. if test not in all_tests:
  284. die(1,"'{}': test not recognized".format(test))
  285. for test in (cmd_args or all_tests):
  286. if not run_unit_test(test):
  287. rdie(2,'Test failed')
  288. exit_msg()
  289. except KeyboardInterrupt:
  290. die(1,green('\nExiting at user request'))