ut_tx_deserialize.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #!/usr/bin/env python3
  2. """
  3. test/unit_tests_d/ut_tx_deserialize: TX deserialization unit test for the MMGen suite
  4. """
  5. import os
  6. from mmgen.common import *
  7. class tx_deserialize(object):
  8. def _get_core_repo_root(self):
  9. self.core_repo_root = os.getenv('CORE_REPO_ROOT')
  10. if not self.core_repo_root:
  11. die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test')
  12. def run_test(self,name):
  13. def test_tx(txhex,desc,n):
  14. def has_nonstandard_outputs(outputs):
  15. for o in outputs:
  16. t = o['scriptPubKey']['type']
  17. if t in ('nonstandard','pubkey','nulldata'):
  18. return True
  19. return False
  20. d = g.rpch.decoderawtransaction(txhex)
  21. if has_nonstandard_outputs(d['vout']): return False
  22. dt = DeserializedTX(txhex)
  23. if opt.verbose:
  24. Msg('\n====================================================')
  25. Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc))
  26. if opt.verbose:
  27. Pmsg(d)
  28. Msg('----------------------------------------------------')
  29. Pmsg(dt)
  30. # metadata
  31. assert dt['txid'] == d['txid'],'TXID does not match'
  32. assert dt['lock_time'] == d['locktime'],'Locktime does not match'
  33. assert dt['version'] == d['version'],'Version does not match'
  34. # inputs
  35. a,b = d['vin'],dt['txins']
  36. for i in range(len(a)):
  37. assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i)
  38. assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i)
  39. assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
  40. 'nSeq of input {} does not match'.format(i))
  41. if 'txinwitness' in a[i]:
  42. assert a[i]['txinwitness'] == b[i]['witness'],(
  43. 'witness of input {} does not match'.format(i))
  44. # outputs
  45. a,b = d['vout'],dt['txouts']
  46. for i in range(len(a)):
  47. assert a[i]['scriptPubKey']['addresses'][0] == b[i]['address'],(
  48. 'address of ouput {} does not match'.format(i))
  49. assert a[i]['value'] == b[i]['amount'],'value of ouput {} does not match'.format(i)
  50. assert a[i]['scriptPubKey']['hex'] == b[i]['scriptPubKey'],(
  51. 'scriptPubKey of ouput {} does not match'.format(i))
  52. return True
  53. def print_info(fn,extra_desc):
  54. if opt.names:
  55. Msg_r('{} {} ({}){}'.format(
  56. purple('Testing'),
  57. cyan(name),
  58. extra_desc,
  59. '' if opt.quiet else '\n'))
  60. else:
  61. Msg_r('Testing transactions from {!r}'.format(fn))
  62. if not opt.quiet: Msg('')
  63. def test_core_vectors():
  64. self._get_core_repo_root()
  65. fn = os.path.join(self.core_repo_root,'src/test/data/tx_valid.json')
  66. data = json.loads(open(fn).read())
  67. print_info(fn,'Core test vectors')
  68. n = 1
  69. for e in data:
  70. if type(e[0]) == list:
  71. test_tx(e[1],desc,n)
  72. n += 1
  73. else:
  74. desc = e[0]
  75. Msg('OK')
  76. def test_mmgen_txs():
  77. fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'),
  78. ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
  79. ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx') )
  80. from mmgen.protocol import init_coin
  81. from mmgen.tx import MMGenTX
  82. print_info('test/ref/*rawtx','MMGen reference transactions')
  83. for n,(coin,tn,fn) in enumerate(fns):
  84. init_coin(coin,tn)
  85. rpc_init(reinit=True)
  86. test_tx(MMGenTX(fn).hex,fn,n+1)
  87. init_coin('btc',False)
  88. rpc_init(reinit=True)
  89. Msg('OK')
  90. from mmgen.tx import DeserializedTX
  91. import json
  92. test_mmgen_txs()
  93. test_core_vectors()
  94. return True