#!/usr/bin/env python3 """ test.modtest_d.ut_tx: TX unit tests for the MMGen suite """ import os from mmgen.tx import CompletedTX, UnsignedTX from mmgen.tx.file import MMGenTxFile from mmgen.cfg import Config from ..include.common import cfg, qmsg, vmsg, gr_uc, make_burn_addr async def do_txfile_test(desc, fns, cfg=cfg, check=False): qmsg(f'\n Testing CompletedTX initializer ({desc})') for fn in fns: qmsg(f' parsing: {os.path.basename(fn)}') fpath = os.path.join('test', 'ref', fn) tx = await CompletedTX(cfg=cfg, filename=fpath, quiet_open=True) vmsg('\n' + tx.info.format()) f = MMGenTxFile(tx) fn_gen = f.make_filename() if cfg.debug_utf8: fn_gen = fn_gen.replace('-α', '') assert fn_gen == os.path.basename(fn), f'{fn_gen} != {fn}' if check: import json from mmgen.tx.file import json_dumps from mmgen.util import make_chksum_6 text = f.format() with open(fpath) as fh: text_chk = fh.read() data_chk = json.loads(text_chk) outputs = data_chk['MMGenTransaction']['outputs'] for n, o in enumerate(outputs): outputs[n] = {k:v for k,v in o.items() if not (type(v) is bool and v is False)} data_chk['chksum'] = make_chksum_6(json_dumps(data_chk['MMGenTransaction'])) text_chk_fixed = json_dumps(data_chk) assert text == text_chk_fixed, f'\nformatted text:\n{text}\n !=\noriginal file:\n{text_chk_fixed}' qmsg(' OK') return True class unit_tests: altcoin_deps = ('txfile_alt', 'txfile_alt_legacy') async def txfile(self, name, ut, desc='displaying transaction files (BTC)'): return await do_txfile_test( 'Bitcoin', ( 'tx/7A8157[6.65227,34].rawtx', 'tx/BB3FD2[7.57134314,123].sigtx', 'tx/0A869F[1.23456,32].regtest.asubtx', ), check = True ) async def txfile_alt(self, name, ut, desc='displaying transaction files (LTC, BCH, ETH)'): return await do_txfile_test( 'altcoins', ( 'tx/C09D73-LTC[981.73747,2000].testnet.rawtx', 'tx/91060A-BCH[1.23456].regtest.arawtx', 'tx/D850C6-MM1[43.21,50000].subtx', # token tx ), # token resolved by tracking wallet under data_dir: cfg = Config({'data_dir': 'test/ref/data_dir'}), check = True ) async def txfile_legacy(self, name, ut, desc='displaying transaction files (legacy format, BTC)'): return await do_txfile_test( 'Bitcoin - legacy file format', ( '0B8D5A[15.31789,14,tl=1320969600].rawtx', '542169[5.68152,34].sigtx', '0C7115[15.86255,14,tl=1320969600].testnet.rawtx', '25EFA3[2.34].testnet.rawtx', ) ) async def txfile_alt_legacy(self, name, ut, desc='displaying transaction files (legacy format, LTC, BCH, ETH)'): return await do_txfile_test( 'altcoins - legacy file format', ( '460D4D-BCH[10.19764,tl=1320969600].rawtx', 'ethereum/5881D2-MM1[1.23456,50000].rawtx', 'ethereum/6BDB25-MM1[1.23456,50000].testnet.rawtx', 'ethereum/88FEFD-ETH[23.45495,40000].rawtx', 'ethereum/B472BD-ETH[23.45495,40000].testnet.rawtx', 'ethereum/B472BD-ETH[23.45495,40000].testnet.sigtx', 'litecoin/A5A1E0-LTC[1454.64322,1453,tl=1320969600].testnet.rawtx', 'litecoin/AF3CDF-LTC[620.76194,1453,tl=1320969600].rawtx', ) ) def errors(self, name, ut, desc='reading transaction files (error handling)'): async def bad1(): await CompletedTX(cfg, filename='foo') def bad2(): UnsignedTX(cfg, filename='foo') bad_data = ( ('forbidden positional args', 'TypeError', 'positional arguments', bad1), ('forbidden positional args', 'TypeError', 'positional arguments', bad2), ) ut.process_bad_data(bad_data) return True def op_return_data(self, name, ut, desc='OpReturnData class'): from mmgen.proto.btc.tx.op_return_data import OpReturnData vecs = [ 'data:=:ETH.ETH:0x86d526d6624AbC0178cF7296cD538Ecc080A95F1:0/1/0', 'hexdata:3d3a4554482e4554483a30783836643532366436363234416243303137' '38634637323936634435333845636330383041393546313a302f312f30', 'hexdata:00010203040506', 'data:a\n', 'data:a\tb', 'data:' + gr_uc[:24], ] assert OpReturnData(cfg._proto, vecs[0]) == OpReturnData(cfg._proto, vecs[1]) for vec in vecs: d = OpReturnData(cfg._proto, vec) assert d == OpReturnData(cfg._proto, repr(d)) # repr() must return a valid initializer assert isinstance(d, bytes) assert isinstance(str(d), str) vmsg('-' * 80) vmsg(vec) vmsg(repr(d)) vmsg(d.hl()) vmsg(d.hl(add_label=True)) bad_data = [ 'data:', 'hexdata:', 'data:' + ('x' * 81), 'hexdata:' + ('deadbeef' * 20) + 'ee', 'hex:0abc', 'da:xyz', 'hexdata:xyz', 'hexdata:abcde', b'data:abc', ] def bad(n): return lambda: OpReturnData(cfg._proto, bad_data[n]) vmsg('-' * 80) vmsg('Testing error handling:') ut.process_bad_data(( ('bad1', 'AssertionError', 'not in range', bad(0)), ('bad2', 'AssertionError', 'not in range', bad(1)), ('bad3', 'AssertionError', 'not in range', bad(2)), ('bad4', 'AssertionError', 'not in range', bad(3)), ('bad5', 'ValueError', 'must start', bad(4)), ('bad6', 'ValueError', 'must start', bad(5)), ('bad7', 'AssertionError', 'not in hex', bad(6)), ('bad8', 'AssertionError', 'even', bad(7)), ('bad9', 'AssertionError', 'a string', bad(8)), ), pfx='') return True def memo(self, name, ut, desc='Swap transaction memo'): from mmgen.protocol import init_proto from mmgen.swap.proto.thorchain.memo import Memo for coin, addrtype in ( ('ltc', 'bech32'), ('bch', 'compressed'), ): proto = init_proto(cfg, coin, need_amt=True) addr = make_burn_addr(proto, addrtype) for limit, limit_chk in ( ('123.4567', 12340000000), ('1.234567', 123400000), ('0.01234567', 1234000), ('0.00012345', 12345), (None, 0), ): vmsg('\nTesting memo initialization:') m = Memo(proto, addr, trade_limit=proto.coin_amt(limit) if limit else None) vmsg(f'str(memo): {m}') vmsg(f'repr(memo): {m!r}') vmsg(f'limit: {limit}') p = Memo.parse(m) limit_dec = proto.coin_amt(p.trade_limit, from_unit='satoshi') vmsg(f'limit_dec: {limit_dec.hl()}') vmsg('\nTesting memo parsing:') from pprint import pformat vmsg(pformat(p._asdict())) assert p.proto == 'THORChain' assert p.function == 'SWAP' assert p.chain == coin.upper() assert p.asset == coin.upper() assert p.address == addr.views[addr.view_pref] assert p.trade_limit == limit_chk assert p.stream_interval == 1 assert p.stream_quantity == 0 # auto vmsg('\nTesting is_partial_memo():') for vec in ( str(m), 'SWAP:xyz', '=:xyz', 's:xyz', 'a:xz', '+:xz', 'WITHDRAW:xz', 'LOAN+:xz:x:x', 'TRADE-:xz:x:x', 'BOND:xz', ): vmsg(f' pass: {vec}') assert Memo.is_partial_memo(vec), vec for vec in ( '=', 'swap', 'swap:', 'swap:abc', 'SWAP:a', ): vmsg(f' fail: {vec}') assert not Memo.is_partial_memo(vec), vec vmsg('\nTesting error handling:') def bad(s): return lambda: Memo.parse(s) ut.process_bad_data(( ('bad1', 'SwapMemoParseError', 'must contain', bad('x')), ('bad2', 'SwapMemoParseError', 'must contain', bad('y:z:x')), ('bad3', 'SwapMemoParseError', 'function abbrev', bad('z:l:foobar:0/1/0')), ('bad4', 'SwapMemoParseError', 'asset abbrev', bad('=:x:foobar:0/1/0')), ('bad5', 'SwapMemoParseError', 'failed to parse', bad('=:l:foobar:n')), ('bad6', 'SwapMemoParseError', 'invalid specifier', bad('=:l:foobar:x/1/0')), ('bad7', 'SwapMemoParseError', 'extra', bad('=:l:foobar:0/1/0:x')), ), pfx='') return True