ut_tx_deserialize.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. #!/usr/bin/env python3
  2. """
  3. test/unit_tests_d/ut_tx_deserialize: TX deserialization unit test for the MMGen suite
  4. """
  5. import os,json
  6. from mmgen.common import *
  7. from ..include.common import *
  8. from mmgen.protocol import init_proto
  9. from mmgen.tx import MMGenTX,DeserializedTX
  10. from mmgen.rpc import rpc_init
  11. from mmgen.daemon import CoinDaemon
  class unit_test(object):
      """
      TX deserialization unit test.

      Compares the output of DeserializedTX against the daemon's
      'decoderawtransaction' RPC result, for Bitcoin Core's valid-transaction
      test vectors and for MMGen reference transaction files.
      """

      def _get_core_repo_root(self):
          """
          Read the CORE_REPO_ROOT environment variable into self.core_repo_root.

          Calls die() with exit code 1 if the variable is unset or empty.
          """
          self.core_repo_root = os.getenv('CORE_REPO_ROOT')
          if not self.core_repo_root:
              die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test')

      def run_test(self,name,ut):
          """
          Run the deserialization checks against the 'btc' and 'btc_tn' test daemons.

          name -- test name, shown in the progress message when opt.names is set
          ut   -- not used by this method

          Returns True on success.  A mismatch raises AssertionError; the test
          daemons are stopped in either case.
          """

          async def test_tx(tx_proto,tx_hex,desc,n):
              """
              Check one transaction.  Returns False, without checking anything, if
              the transaction has an output of type 'nonstandard', 'pubkey' or
              'nulldata'; otherwise returns True after all assertions have passed.
              """
              rpc = await rpc_init(proto=tx_proto)
              d = await rpc.call('decoderawtransaction',tx_hex)

              # NOTE(review): these output types are skipped, presumably because
              # DeserializedTX does not handle them -- confirm
              skipped_types = ('nonstandard','pubkey','nulldata')
              if any(o['scriptPubKey']['type'] in skipped_types for o in d['vout']):
                  return False

              dt = DeserializedTX(tx_proto,tx_hex)

              if opt.verbose:
                  Msg('\n====================================================')
              Msg_r('.' if opt.quiet else f'{n:>3}) {desc}\n')
              if opt.verbose:
                  Pmsg(d)
                  Msg('----------------------------------------------------')
                  Pmsg(dt)

              # metadata
              assert dt['txid'] == d['txid'],'TXID does not match'
              assert dt['lock_time'] == d['locktime'],'Locktime does not match'
              assert dt['version'] == d['version'],'Version does not match'

              # inputs
              vin,txins = d['vin'],dt['txins']
              # without this check, surplus deserialized inputs would go unnoticed
              assert len(vin) == len(txins),(
                  f'Input count does not match: {len(vin)} != {len(txins)}')
              for i,(a,b) in enumerate(zip(vin,txins)):
                  assert a['txid'] == b['txid'],f'TxID of input {i} does not match'
                  assert a['vout'] == b['vout'],f'vout of input {i} does not match'
                  # nSeq is parsed as a base-16 string for comparison with the RPC value
                  assert a['sequence'] == int(b['nSeq'],16),(
                      f'nSeq of input {i} does not match')
                  if 'txinwitness' in a:
                      assert a['txinwitness'] == b['witness'],(
                          f'witness of input {i} does not match')

              # outputs
              vout,txouts = d['vout'],dt['txouts']
              # without this check, surplus deserialized outputs would go unnoticed
              assert len(vout) == len(txouts),(
                  f'Output count does not match: {len(vout)} != {len(txouts)}')
              for i,(a,b) in enumerate(zip(vout,txouts)):
                  if 'addresses' in a['scriptPubKey']:
                      A = a['scriptPubKey']['addresses'][0]
                      B = b['address']
                      fs = 'address of output {} does not match\nA: {}\nB: {}'
                      assert A == B, fs.format(i,A,B)

                  A = a['value']
                  B = b['amount']
                  fs = 'value of output {} does not match\nA: {}\nB: {}'
                  assert A == B, fs.format(i,A,B)

                  A = a['scriptPubKey']['hex']
                  B = b['scriptPubKey']
                  fs = 'scriptPubKey of output {} does not match\nA: {}\nB: {}'
                  assert A == B, fs.format(i,A,B)

              return True

          def print_info(fn,extra_desc):
              """Print the header line for a group of test transactions."""
              if opt.names:
                  Msg_r('{} {} ({}){}'.format(
                      purple('Testing'),
                      cyan(name),
                      extra_desc,
                      '' if opt.quiet else '\n'))
              else:
                  Msg_r(f'Testing {extra_desc} transactions from {fn!r}')
                  if not opt.quiet:
                      Msg('')

          async def test_core_vectors():
              """Check every vector in the Core repository's tx_valid.json file."""
              self._get_core_repo_root()
              fn_b = 'src/test/data/tx_valid.json'
              fn = os.path.join(self.core_repo_root,fn_b)
              with open(fn,encoding='utf-8') as fp:
                  data = json.load(fp)
              print_info(fn_b,'Core test vector')
              n = 1
              # Entries whose first element is a list are test vectors (hex TX at
              # index 1); for any other entry, the first element is stored as the
              # description for the vectors that follow.
              desc = '' # avoids an unbound name if a vector precedes any description
              for e in data:
                  if isinstance(e[0],list):
                      await test_tx(
                          tx_proto = init_proto('btc'),
                          tx_hex   = e[1],
                          desc     = desc,
                          n        = n )
                      n += 1
                  else:
                      desc = e[0]
              Msg('OK')

          async def test_mmgen_txs():
              """Check the MMGen reference raw transaction files."""
              fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'),
                      ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
                  #   ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx')
                  )
              print_info('test/ref/*rawtx','MMGen reference')
              # only the filename is needed: the protocol is taken from the loaded TX
              for n,(_coin,_testnet,fn) in enumerate(fns,1):
                  tx = MMGenTX.Unsigned(filename=fn)
                  await test_tx(
                      tx_proto = tx.proto,
                      tx_hex   = tx.hex,
                      desc     = fn,
                      n        = n )
              Msg('OK')

          start_test_daemons('btc',remove_datadir=True)
          start_test_daemons('btc_tn')
          try:
              run_session(test_core_vectors())
              run_session(test_mmgen_txs())
          finally:
              # stop the daemons even when a check fails, so none are left running
              stop_test_daemons('btc','btc_tn')

          return True