ut_tx_deserialize.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. #!/usr/bin/env python3
  2. """
  3. test/unit_tests_d/ut_tx_deserialize: TX deserialization unit tests for the MMGen suite
  4. """
  5. import os,json
  6. from mmgen.color import purple,cyan
  7. from mmgen.util import msg,Msg,Msg_r
  8. from mmgen.protocol import init_proto
  9. from mmgen.tx import CompletedTX
  10. from mmgen.proto.btc.tx.base import DeserializeTX
  11. from mmgen.rpc import rpc_init
  12. from ..include.common import cfg,start_test_daemons,stop_test_daemons
  def print_info(name, extra_desc):
      """
      Print the 'Testing ...' banner that introduces a test.

      name:       name of the test, shown only when cfg.names is set
      extra_desc: human-readable description of the test

      With cfg.names set, the test name and description are printed via the
      purple() and cyan() color helpers, followed by a newline unless
      cfg.quiet is set.  Otherwise a plain 'Testing <extra_desc>' message is
      printed, followed by an empty Msg() call unless cfg.quiet is set.
      """
      if not cfg.names:
          Msg_r(f'Testing {extra_desc}')
          if not cfg.quiet:
              Msg('')
          return

      label = purple('Testing')
      subject = cyan(f'{name} ({extra_desc})')
      # The separator space before the line ending is emitted in quiet mode too
      line_end = '' if cfg.quiet else '\n'
      Msg_r(f'{label} {subject} {line_end}')
  async def test_tx(tx_proto, tx_hex, desc, n):
      """
      Compare MMGen's deserialization of a raw transaction with the daemon's.

      The hex transaction is decoded both by the daemon (via the
      'decoderawtransaction' RPC call) and by DeserializeTX, and the two
      results are compared field by field.

      tx_proto: protocol object passed to rpc_init() and DeserializeTX()
      tx_hex:   hex-encoded serialized transaction
      desc:     description printed in the progress line
      n:        vector number printed in the progress line

      Returns False, without comparing anything, if any output of the decoded
      transaction has the scriptPubKey type 'nonstandard', 'pubkey' or
      'nulldata'; otherwise returns None.

      Raises AssertionError on the first mismatch, including a mismatch in
      the number of inputs or outputs.
      """

      def has_nonstandard_outputs(outputs):
          """Return True if any output has a scriptPubKey type this test skips."""
          return any(
              o['scriptPubKey']['type'] in ('nonstandard', 'pubkey', 'nulldata')
              for o in outputs)

      rpc = await rpc_init(cfg, proto=tx_proto, ignore_wallet=True)
      d = await rpc.call('decoderawtransaction', tx_hex)

      if has_nonstandard_outputs(d['vout']):
          return False

      dt = DeserializeTX(tx_proto, tx_hex)

      if cfg.verbose:
          Msg('\n\n================================ Core vector: ==================================')
      # Progress indicator: a single dot in quiet mode, a numbered line otherwise
      Msg_r('.' if cfg.quiet else f'{n:>3}) {desc}\n')
      if cfg.verbose:
          # NOTE(review): Pmsg is not imported in the visible part of this file;
          # presumably it is made available elsewhere - verify
          Pmsg(d)
          Msg('\n------------------------------ MMGen deserialized: -----------------------------')
          Pmsg(dt._asdict())

      # metadata
      assert dt.txid == d['txid'], 'TXID does not match'
      assert dt.locktime == d['locktime'], 'Locktime does not match'
      assert dt.version == d['version'], 'Version does not match'

      # inputs
      core_ins, mmgen_ins = d['vin'], dt.txins
      # Without this check, surplus deserialized inputs would go unnoticed and
      # missing ones would surface as a bare IndexError
      assert len(core_ins) == len(mmgen_ins), (
          f'Number of inputs does not match ({len(core_ins)} != {len(mmgen_ins)})')
      for i, (a, b) in enumerate(zip(core_ins, mmgen_ins)):
          assert a['txid'] == b['txid'], f'TxID of input {i} does not match'
          assert a['vout'] == b['vout'], f'vout of input {i} does not match'
          # The deserialized nSeq is a hex string; the daemon reports an integer
          assert a['sequence'] == int(b['nSeq'], 16), (
              f'nSeq of input {i} does not match')
          if 'txinwitness' in a:
              assert a['txinwitness'] == b['witness'], (
                  f'witness of input {i} does not match')

      # outputs
      core_outs, mmgen_outs = d['vout'], dt.txouts
      assert len(core_outs) == len(mmgen_outs), (
          f'Number of outputs does not match ({len(core_outs)} != {len(mmgen_outs)})')
      for i, (a, b) in enumerate(zip(core_outs, mmgen_outs)):
          # The address is compared only when the daemon reports an 'addresses' list
          if 'addresses' in a['scriptPubKey']:
              A = a['scriptPubKey']['addresses'][0]
              B = b['address']
              fs = 'address of output {} does not match\nA: {}\nB: {}'
              assert A == B, fs.format(i, A, B)

          A = a['value']
          B = b['amount']
          fs = 'value of output {} does not match\nA: {}\nB: {}'
          assert A == B, fs.format(i, A, B)

          A = a['scriptPubKey']['hex']
          B = b['scriptPubKey']
          fs = 'scriptPubKey of output {} does not match\nA: {}\nB: {}'
          assert A == B, fs.format(i, A, B)
  async def do_mmgen_ref(daemons, fns, name, desc):
      """
      Run test_tx() on each of the given MMGen reference transaction files.

      daemons: daemon identifiers passed to start_test_daemons() and
               stop_test_daemons()
      fns:     filenames of the transaction files to load with CompletedTX
      name:    test name, forwarded to print_info()
      desc:    test description, forwarded to print_info()

      Returns True when all files have been processed.  Exceptions raised
      while loading or comparing a transaction propagate to the caller, but
      the daemons are stopped first.
      """
      start_test_daemons(*daemons)
      try:
          print_info(name, desc)
          # Vectors are numbered from 1 in the progress output
          for n, fn in enumerate(fns, 1):
              tx = await CompletedTX(cfg=cfg, filename=fn, quiet_open=True)
              await test_tx(
                  tx_proto=tx.proto,
                  tx_hex=tx.serialized,
                  desc=fn,
                  n=n)
      finally:
          # Stop the daemons even when a comparison fails, so that a failed
          # run does not leave them running
          stop_test_daemons(*daemons)
      Msg('OK')
      return True
  class unit_tests:
      """TX deserialization tests; each coroutine method is one test."""

      # NOTE(review): presumably read by the test runner to identify the tests
      # that require altcoin support - verify against the runner
      altcoin_deps = ('mmgen_ref_alt',)

      async def core_vectors(self, name, ut):
          """
          Run test_tx() on the vectors in Bitcoin Core's tx_valid.json.

          The file is read from 'src/test/data/tx_valid.json' beneath the
          directory named by the CORE_REPO_ROOT environment variable.

          Returns False, after printing a message, if CORE_REPO_ROOT is unset
          or empty; returns True when all vectors have been processed.
          Exceptions raised while reading the file or comparing a transaction
          propagate to the caller, but the 'btc' daemon is stopped first.
          """
          core_repo_root = os.getenv('CORE_REPO_ROOT')
          if not core_repo_root:
              msg('The environmental variable CORE_REPO_ROOT must be set before running this test')
              return False

          start_test_daemons('btc')
          try:
              fn_b = 'src/test/data/tx_valid.json'
              fn = os.path.join(core_repo_root, fn_b)
              with open(fn, encoding='utf-8') as fp:
                  core_data = json.load(fp)

              print_info(name, 'Bitcoin Core test vectors')

              # Entries whose first element is a list are treated as test
              # vectors, with the hex transaction at index 1; for any other
              # entry the first element becomes the description of the
              # vectors that follow.  Starting with an empty description
              # avoids an unbound name if a vector comes first.
              desc = ''
              n = 1
              for e in core_data:
                  if isinstance(e[0], list):
                      await test_tx(
                          tx_proto=init_proto(cfg, 'btc', need_amt=True),
                          tx_hex=e[1],
                          desc=desc,
                          n=n)
                      n += 1
                  else:
                      desc = e[0]

              Msg('OK')
          finally:
              # Stop the daemon even when a vector fails, so that a failed run
              # does not leave it running
              stop_test_daemons('btc')
          return True

      async def mmgen_ref(self, name, ut):
          """Run the deserialization test on the Bitcoin reference transactions."""
          return await do_mmgen_ref(
              ('btc', 'btc_tn'),
              (
                  'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx',
                  'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx',
                  'test/ref/542169[5.68152,34].sigtx',
              ),
              name,
              'MMGen reference transactions [Bitcoin]')

      async def mmgen_ref_alt(self, name, ut):
          """Run the deserialization test on the altcoin reference transactions."""
          return await do_mmgen_ref(
              ('ltc', 'ltc_tn', 'bch'),
              (
                  'test/ref/litecoin/AF3CDF-LTC[620.76194,1453,tl=1320969600].rawtx',
                  'test/ref/litecoin/A5A1E0-LTC[1454.64322,1453,tl=1320969600].testnet.rawtx',
                  'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx'
              ),
              name,
              'MMGen reference transactions [Altcoin]')