ut_tx_deserialize.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. #!/usr/bin/env python3
  2. """
  3. test/unit_tests_d/ut_tx_deserialize: TX deserialization unit tests for the MMGen suite
  4. """
  5. import os,json
  6. from mmgen.color import purple,cyan
  7. from mmgen.util import msg,Msg,Msg_r
  8. from mmgen.devtools import Pmsg
  9. from mmgen.protocol import init_proto
  10. from mmgen.tx import CompletedTX
  11. from mmgen.proto.btc.tx.base import DeserializeTX
  12. from mmgen.rpc import rpc_init
  13. from ..include.common import cfg,start_test_daemons,stop_test_daemons
  14. def print_info(name,extra_desc):
  15. if cfg.names:
  16. Msg_r('{} {} {}'.format(
  17. purple('Testing'),
  18. cyan(f'{name} ({extra_desc})'),
  19. '' if cfg.quiet else '\n'))
  20. else:
  21. Msg_r(f'Testing {extra_desc}')
  22. if not cfg.quiet:
  23. Msg('')
  24. async def test_tx(tx_proto,tx_hex,desc,n):
  25. def has_nonstandard_outputs(outputs):
  26. for o in outputs:
  27. t = o['scriptPubKey']['type']
  28. if t in ('nonstandard','pubkey','nulldata'):
  29. return True
  30. return False
  31. rpc = await rpc_init( cfg, proto=tx_proto, ignore_wallet=True )
  32. d = await rpc.call('decoderawtransaction',tx_hex)
  33. if has_nonstandard_outputs(d['vout']):
  34. return False
  35. dt = DeserializeTX(tx_proto,tx_hex)
  36. if cfg.verbose:
  37. Msg('\n\n================================ Core vector: ==================================')
  38. Msg_r('.' if cfg.quiet else f'{n:>3}) {desc}\n')
  39. if cfg.verbose:
  40. Pmsg(d)
  41. Msg('\n------------------------------ MMGen deserialized: -----------------------------')
  42. Pmsg(dt._asdict())
  43. # metadata
  44. assert dt.txid == d['txid'],'TXID does not match'
  45. assert dt.locktime == d['locktime'],'Locktime does not match'
  46. assert dt.version == d['version'],'Version does not match'
  47. # inputs
  48. a,b = d['vin'],dt.txins
  49. for i in range(len(a)):
  50. assert a[i]['txid'] == b[i]['txid'],f'TxID of input {i} does not match'
  51. assert a[i]['vout'] == b[i]['vout'],f'vout of input {i} does not match'
  52. assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
  53. f'nSeq of input {i} does not match')
  54. if 'txinwitness' in a[i]:
  55. assert a[i]['txinwitness'] == b[i]['witness'],(
  56. f'witness of input {i} does not match')
  57. # outputs
  58. a,b = d['vout'],dt.txouts
  59. for i in range(len(a)):
  60. if 'addresses' in a[i]['scriptPubKey']:
  61. A = a[i]['scriptPubKey']['addresses'][0]
  62. B = b[i]['address']
  63. fs = 'address of output {} does not match\nA: {}\nB: {}'
  64. assert A == B, fs.format(i,A,B)
  65. A = a[i]['value']
  66. B = b[i]['amount']
  67. fs = 'value of output {} does not match\nA: {}\nB: {}'
  68. assert A == B, fs.format(i,A,B)
  69. A = a[i]['scriptPubKey']['hex']
  70. B = b[i]['scriptPubKey']
  71. fs = 'scriptPubKey of output {} does not match\nA: {}\nB: {}'
  72. assert A == B, fs.format(i,A,B)
  73. async def do_mmgen_ref(daemons,fns,name,desc):
  74. # NB: remove_datadir is required here for some reason (seems to be Bitcoin Core version-dependent)
  75. start_test_daemons(*daemons,remove_datadir=True)
  76. print_info(name,desc)
  77. for n,fn in enumerate(fns):
  78. tx = await CompletedTX( cfg=cfg, filename=fn, quiet_open=True )
  79. await test_tx(
  80. tx_proto = tx.proto,
  81. tx_hex = tx.serialized,
  82. desc = fn,
  83. n = n+1 )
  84. stop_test_daemons(*daemons)
  85. Msg('OK')
  86. return True
  87. class unit_tests:
  88. altcoin_deps = ('mmgen_ref_alt',)
  89. async def core_vectors(self,name,ut):
  90. core_repo_root = os.getenv('CORE_REPO_ROOT')
  91. if not core_repo_root:
  92. msg('The environmental variable CORE_REPO_ROOT must be set before running this test')
  93. return False
  94. start_test_daemons('btc')
  95. fn_b = 'src/test/data/tx_valid.json'
  96. fn = os.path.join(core_repo_root,fn_b)
  97. with open(fn) as fp:
  98. core_data = json.loads(fp.read())
  99. print_info(name,'Bitcoin Core test vectors')
  100. n = 1
  101. for e in core_data:
  102. if isinstance(e[0],list):
  103. await test_tx(
  104. tx_proto = init_proto( cfg, 'btc', need_amt=True ),
  105. tx_hex = e[1],
  106. desc = desc,
  107. n = n )
  108. n += 1
  109. else:
  110. desc = e[0]
  111. Msg('OK')
  112. stop_test_daemons('btc')
  113. return True
  114. async def mmgen_ref(self,name,ut):
  115. return await do_mmgen_ref(
  116. ('btc','btc_tn'),
  117. (
  118. 'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx',
  119. 'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx',
  120. 'test/ref/542169[5.68152,34].sigtx',
  121. ),
  122. name,
  123. 'MMGen reference transactions [Bitcoin]' )
  124. async def mmgen_ref_alt(self,name,ut):
  125. return await do_mmgen_ref(
  126. ('ltc','ltc_tn','bch'),
  127. (
  128. 'test/ref/litecoin/AF3CDF-LTC[620.76194,1453,tl=1320969600].rawtx',
  129. 'test/ref/litecoin/A5A1E0-LTC[1454.64322,1453,tl=1320969600].testnet.rawtx',
  130. 'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx'
  131. ),
  132. name,
  133. 'MMGen reference transactions [Altcoin]' )