ut_tx_deserialize.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. #!/usr/bin/env python3
  2. """
  3. test/unit_tests_d/ut_tx_deserialize: TX deserialization unit test for the MMGen suite
  4. """
  5. import os,json
  6. from mmgen.common import *
  7. from ..include.common import *
  8. from mmgen.protocol import init_proto
  9. from mmgen.tx import UnsignedTX
  10. from mmgen.base_proto.bitcoin.tx.base import DeserializeTX
  11. from mmgen.rpc import rpc_init
  12. from mmgen.daemon import CoinDaemon
  13. class unit_test(object):
  14. def _get_core_repo_root(self):
  15. self.core_repo_root = os.getenv('CORE_REPO_ROOT')
  16. if not self.core_repo_root:
  17. die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test')
  18. def run_test(self,name,ut):
  19. async def test_tx(tx_proto,tx_hex,desc,n):
  20. def has_nonstandard_outputs(outputs):
  21. for o in outputs:
  22. t = o['scriptPubKey']['type']
  23. if t in ('nonstandard','pubkey','nulldata'):
  24. return True
  25. return False
  26. rpc = await rpc_init(proto=tx_proto)
  27. d = await rpc.call('decoderawtransaction',tx_hex)
  28. if has_nonstandard_outputs(d['vout']): return False
  29. dt = DeserializeTX(tx_proto,tx_hex)
  30. if opt.verbose:
  31. Msg('\n====================================================')
  32. Msg_r('.' if opt.quiet else f'{n:>3}) {desc}\n')
  33. if opt.verbose:
  34. Pmsg(d)
  35. Msg('----------------------------------------------------')
  36. Pmsg(dt)
  37. # metadata
  38. assert dt.txid == d['txid'],'TXID does not match'
  39. assert dt.locktime == d['locktime'],'Locktime does not match'
  40. assert dt.version == d['version'],'Version does not match'
  41. # inputs
  42. a,b = d['vin'],dt.txins
  43. for i in range(len(a)):
  44. assert a[i]['txid'] == b[i]['txid'],f'TxID of input {i} does not match'
  45. assert a[i]['vout'] == b[i]['vout'],f'vout of input {i} does not match'
  46. assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
  47. f'nSeq of input {i} does not match')
  48. if 'txinwitness' in a[i]:
  49. assert a[i]['txinwitness'] == b[i]['witness'],(
  50. f'witness of input {i} does not match')
  51. # outputs
  52. a,b = d['vout'],dt.txouts
  53. for i in range(len(a)):
  54. if 'addresses' in a[i]['scriptPubKey']:
  55. A = a[i]['scriptPubKey']['addresses'][0]
  56. B = b[i]['address']
  57. fs = 'address of output {} does not match\nA: {}\nB: {}'
  58. assert A == B, fs.format(i,A,B)
  59. A = a[i]['value']
  60. B = b[i]['amount']
  61. fs = 'value of output {} does not match\nA: {}\nB: {}'
  62. assert A == B, fs.format(i,A,B)
  63. A = a[i]['scriptPubKey']['hex']
  64. B = b[i]['scriptPubKey']
  65. fs = 'scriptPubKey of output {} does not match\nA: {}\nB: {}'
  66. assert A == B, fs.format(i,A,B)
  67. return True
  68. def print_info(fn,extra_desc):
  69. if opt.names:
  70. Msg_r('{} {} ({}){}'.format(
  71. purple('Testing'),
  72. cyan(name),
  73. extra_desc,
  74. '' if opt.quiet else '\n'))
  75. else:
  76. Msg_r(f'Testing {extra_desc} transactions from {fn!r}')
  77. if not opt.quiet:
  78. Msg('')
  79. async def test_core_vectors():
  80. self._get_core_repo_root()
  81. fn_b = 'src/test/data/tx_valid.json'
  82. fn = os.path.join(self.core_repo_root,fn_b)
  83. with open(fn) as fp:
  84. data = json.loads(fp.read())
  85. print_info(fn_b,'Core test vector')
  86. n = 1
  87. for e in data:
  88. if type(e[0]) == list:
  89. await test_tx(
  90. tx_proto = init_proto('btc',need_amt=True),
  91. tx_hex = e[1],
  92. desc = desc,
  93. n = n )
  94. n += 1
  95. else:
  96. desc = e[0]
  97. Msg('OK')
  98. async def test_mmgen_txs():
  99. fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'),
  100. ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
  101. # ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx')
  102. )
  103. print_info('test/ref/*rawtx','MMGen reference')
  104. for n,(coin,testnet,fn) in enumerate(fns):
  105. tx = UnsignedTX(filename=fn)
  106. await test_tx(
  107. tx_proto = tx.proto,
  108. tx_hex = tx.serialized,
  109. desc = fn,
  110. n = n+1 )
  111. Msg('OK')
  112. start_test_daemons('btc',remove_datadir=True)
  113. start_test_daemons('btc_tn')
  114. run_session(test_core_vectors())
  115. run_session(test_mmgen_txs())
  116. stop_test_daemons('btc','btc_tn')
  117. return True