#!/usr/bin/env python3
"""
test/unit_tests_d/ut_tx_deserialize: TX deserialization unit test for the MMGen suite
"""
  5. import os,json
  6. from mmgen.common import *
  7. from ..include.common import *
  8. from mmgen.protocol import init_proto
  9. from mmgen.tx import MMGenTX,DeserializedTX
  10. from mmgen.rpc import rpc_init
  11. from mmgen.daemon import CoinDaemon
  12. class unit_test(object):
  13. def _get_core_repo_root(self):
  14. self.core_repo_root = os.getenv('CORE_REPO_ROOT')
  15. if not self.core_repo_root:
  16. die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test')
  17. def run_test(self,name,ut):
  18. async def test_tx(tx_proto,tx_hex,desc,n):
  19. def has_nonstandard_outputs(outputs):
  20. for o in outputs:
  21. t = o['scriptPubKey']['type']
  22. if t in ('nonstandard','pubkey','nulldata'):
  23. return True
  24. return False
  25. rpc = await rpc_init(proto=tx_proto)
  26. d = await rpc.call('decoderawtransaction',tx_hex)
  27. if has_nonstandard_outputs(d['vout']): return False
  28. dt = DeserializedTX(tx_proto,tx_hex)
  29. if opt.verbose:
  30. Msg('\n====================================================')
  31. Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc))
  32. if opt.verbose:
  33. Pmsg(d)
  34. Msg('----------------------------------------------------')
  35. Pmsg(dt)
  36. # metadata
  37. assert dt['txid'] == d['txid'],'TXID does not match'
  38. assert dt['lock_time'] == d['locktime'],'Locktime does not match'
  39. assert dt['version'] == d['version'],'Version does not match'
  40. # inputs
  41. a,b = d['vin'],dt['txins']
  42. for i in range(len(a)):
  43. assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i)
  44. assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i)
  45. assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
  46. 'nSeq of input {} does not match'.format(i))
  47. if 'txinwitness' in a[i]:
  48. assert a[i]['txinwitness'] == b[i]['witness'],(
  49. 'witness of input {} does not match'.format(i))
  50. # outputs
  51. a,b = d['vout'],dt['txouts']
  52. for i in range(len(a)):
  53. A = a[i]['scriptPubKey']['addresses'][0]
  54. B = b[i]['address']
  55. fs = 'address of output {} does not match\nA: {}\nB: {}'
  56. assert A == B, fs.format(i,A,B)
  57. A = a[i]['value']
  58. B = b[i]['amount']
  59. fs = 'value of output {} does not match\nA: {}\nB: {}'
  60. assert A == B, fs.format(i,A,B)
  61. A = a[i]['scriptPubKey']['hex']
  62. B = b[i]['scriptPubKey']
  63. fs = 'scriptPubKey of output {} does not match\nA: {}\nB: {}'
  64. assert A == B, fs.format(i,A,B)
  65. return True
  66. def print_info(fn,extra_desc):
  67. if opt.names:
  68. Msg_r('{} {} ({}){}'.format(
  69. purple('Testing'),
  70. cyan(name),
  71. extra_desc,
  72. '' if opt.quiet else '\n'))
  73. else:
  74. Msg_r(f'Testing {extra_desc} transactions from {fn!r}')
  75. if not opt.quiet:
  76. Msg('')
  77. async def test_core_vectors():
  78. self._get_core_repo_root()
  79. fn_b = 'src/test/data/tx_valid.json'
  80. fn = os.path.join(self.core_repo_root,fn_b)
  81. data = json.loads(open(fn).read())
  82. print_info(fn_b,'Core test vector')
  83. n = 1
  84. for e in data:
  85. if type(e[0]) == list:
  86. await test_tx(
  87. tx_proto = init_proto('btc'),
  88. tx_hex = e[1],
  89. desc = desc,
  90. n = n )
  91. n += 1
  92. else:
  93. desc = e[0]
  94. Msg('OK')
  95. async def test_mmgen_txs():
  96. fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'),
  97. ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
  98. # ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx')
  99. )
  100. print_info('test/ref/*rawtx','MMGen reference')
  101. for n,(coin,testnet,fn) in enumerate(fns):
  102. tx = MMGenTX.Unsigned(filename=fn)
  103. await test_tx(
  104. tx_proto = tx.proto,
  105. tx_hex = tx.hex,
  106. desc = fn,
  107. n = n+1 )
  108. Msg('OK')
  109. start_test_daemons('btc',remove_datadir=True)
  110. start_test_daemons('btc_tn')
  111. run_session(test_core_vectors())
  112. run_session(test_mmgen_txs())
  113. stop_test_daemons('btc','btc_tn')
  114. return True