From a38f4e4fd97ead02db0f84ec20846a951e279c85 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Sat, 12 Oct 2024 09:14:40 +0000 Subject: [PATCH] tx.file: cleanups; other minor cleanups --- mmgen/cfg.py | 6 +- mmgen/tx/file.py | 160 ++++++++++++++++++++----------------- test/unit_tests_d/ut_tx.py | 23 +----- 3 files changed, 95 insertions(+), 94 deletions(-) diff --git a/mmgen/cfg.py b/mmgen/cfg.py index dd3c4994..34286af1 100755 --- a/mmgen/cfg.py +++ b/mmgen/cfg.py @@ -492,8 +492,10 @@ class Config(Lockable): # class attribute, if it exists: auto_opts = tuple(self._autoset_opts) + tuple(self._auto_typeset_opts) for key,val in self._uopts.items(): - assert key.isascii() and key.isidentifier() and key[0] != '_', '{key!r}: malformed configuration option' - assert key not in self._forbidden_opts, '{key!r}: forbidden configuration option' + assert key.isascii() and key.isidentifier() and key[0] != '_', ( + f'{key!r}: malformed configuration option') + assert key not in self._forbidden_opts, ( + f'{key!r}: forbidden configuration option') if key not in auto_opts: if hasattr(type(self), key): setattr( diff --git a/mmgen/tx/file.py b/mmgen/tx/file.py index fac6e339..53bc0ca0 100755 --- a/mmgen/tx/file.py +++ b/mmgen/tx/file.py @@ -24,53 +24,66 @@ import os from ..util import ymsg,make_chksum_6,die from ..obj import MMGenObject,HexStr,MMGenTxID,CoinTxID,MMGenTxComment +def get_proto_from_coin_id(tx, coin_id, chain): + coin, tokensym = coin_id.split(':') if ':' in coin_id else (coin_id, None) + + from ..protocol import CoinProtocol, init_proto + network = CoinProtocol.Base.chain_name_to_network(tx.cfg, coin, chain) + + proto = init_proto(tx.cfg, coin, network=network, need_amt=True) + + if tokensym: + proto.tokensym = tokensym + + return proto + +def eval_io_data(tx, data, desc): + if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs + assert len(data), f'no {desc}!' + for d in data: + d['amt'] = tx.proto.coin_amt(d['amt']) + io, io_list = { + 'inputs': (tx.Input, tx.InputList), + 'outputs': (tx.Output, tx.OutputList), + }[desc] + return io_list(parent=tx, data=[io(tx.proto,**d) for d in data]) + class MMGenTxFile(MMGenObject): def __init__(self,tx): self.tx = tx - self.chksum = None self.fmt_data = None self.filename = None - def parse(self,infile,metadata_only=False,quiet_open=False): + def parse(self, infile, metadata_only=False, quiet_open=False): tx = self.tx + from ..fileutil import get_data_from_file + data = get_data_from_file(tx.cfg, infile, f'{tx.desc} data', quiet=quiet_open) + if len(data) > tx.cfg.max_tx_file_size: + die('MaxFileSizeExceeded', + f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)') + return self.parse_data_legacy(data, metadata_only) - def eval_io_data(raw_data,desc): + def parse_data_legacy(self, data, metadata_only): + tx = self.tx + tx.file_format = 'legacy' + + def deserialize(raw_data, desc): from ast import literal_eval try: - d = literal_eval(raw_data) + return literal_eval(raw_data) except: - if desc == 'inputs' and not quiet_open: + if desc == 'inputs': ymsg('Warning: transaction data appears to be in old format') import re - d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data)) - assert isinstance(d,list), f'{desc} data not a list!' - if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs - assert len(d), f'no {desc}!' - for e in d: - e['amt'] = tx.proto.coin_amt(e['amt']) - if 'label' in e: - e['comment'] = e['label'] - del e['label'] - io,io_list = { - 'inputs': ( tx.Input, tx.InputList ), - 'outputs': ( tx.Output, tx.OutputList ), - }[desc] - return io_list( parent=tx, data=[io(tx.proto,**e) for e in d] ) - - from ..fileutil import get_data_from_file - tx_data = get_data_from_file( tx.cfg, infile, tx.desc+' data', quiet=quiet_open ) + return literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1', raw_data)) desc = 'data' try: - if len(tx_data) > tx.cfg.max_tx_file_size: - die('MaxFileSizeExceeded', - f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)') - tx_data = tx_data.splitlines() + tx_data = data.splitlines() assert len(tx_data) >= 5,'number of lines less than 5' assert len(tx_data[0]) == 6,'invalid length of first line' - self.chksum = HexStr(tx_data.pop(0)) - assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum' + assert HexStr(tx_data.pop(0)) == make_chksum_6(' '.join(tx_data)), 'file data does not match checksum' if len(tx_data) == 7: desc = 'sent timestamp' @@ -95,7 +108,8 @@ class MMGenTxFile(MMGenObject): tx.comment = MMGenTxComment(comment) desc = 'number of lines' # four required lines - ( metadata, tx.serialized, inputs_data, outputs_data ) = tx_data + io_data = {} + (metadata, tx.serialized, io_data['inputs'], io_data['outputs']) = tx_data assert len(metadata) < 100,'invalid metadata length' # rough check metadata = metadata.split() @@ -104,19 +118,13 @@ class MMGenTxFile(MMGenObject): tx.locktime = int(metadata.pop()[3:]) desc = 'coin token in metadata' - coin = metadata.pop(0) if len(metadata) == 6 else 'BTC' - coin,tokensym = coin.split(':') if ':' in coin else (coin,None) + coin_id = metadata.pop(0) if len(metadata) == 6 else 'BTC' desc = 'chain token in metadata' tx.chain = metadata.pop(0).lower() if len(metadata) == 5 else 'mainnet' - from ..protocol import CoinProtocol,init_proto - network = CoinProtocol.Base.chain_name_to_network(tx.cfg,coin,tx.chain) - - desc = 'initialization of protocol' - tx.proto = init_proto( tx.cfg, coin, network=network, need_amt=True ) - if tokensym: - tx.proto.tokensym = tokensym + desc = 'coin_id or chain' + tx.proto = get_proto_from_coin_id(tx, coin_id, tx.chain) desc = 'metadata (4 items)' (txid, send_amt, tx.timestamp, blockcount) = metadata @@ -133,13 +141,16 @@ class MMGenTxFile(MMGenObject): tx.check_txfile_hex_data() desc = 'Ethereum RLP or JSON data' tx.parse_txfile_serialized_data() - desc = 'inputs data' - tx.inputs = eval_io_data(inputs_data,'inputs') - desc = 'outputs data' - tx.outputs = eval_io_data(outputs_data,'outputs') + for k in ('inputs', 'outputs'): + desc = f'{k} data' + res = deserialize(io_data[k], k) + for d in res: + if 'label' in d: + d['comment'] = d['label'] + del d['label'] + setattr(tx, k, eval_io_data(tx, res, k)) desc = 'send amount in metadata' - from decimal import Decimal - assert Decimal(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}' + assert tx.proto.coin_amt(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}' except Exception as e: die(2,f'Invalid {desc} in transaction file: {e!s}') @@ -162,42 +173,47 @@ class MMGenTxFile(MMGenObject): def format(self): tx = self.tx + coin_id = tx.coin + ('' if tx.coin == tx.dcoin else ':'+tx.dcoin) - def amt_to_str(d): - return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d} + def format_data_legacy(): - coin_id = '' if tx.coin == 'BTC' else tx.coin + ('' if tx.coin == tx.dcoin else ':'+tx.dcoin) - lines = [ - '{}{} {} {} {} {}{}'.format( - (coin_id+' ' if coin_id else ''), - tx.chain.upper(), - tx.txid, - tx.send_amt, - tx.timestamp, - tx.blockcount, - (f' LT={tx.locktime}' if tx.locktime else ''), - ), - tx.serialized, - ascii([amt_to_str(e._asdict()) for e in tx.inputs]), - ascii([amt_to_str(e._asdict()) for e in tx.outputs]) - ] + def amt_to_str(d): + return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d} - if tx.comment: - from ..baseconv import baseconv - lines.append(baseconv('b58').frombytes(tx.comment.encode(),tostr=True)) + lines = [ + '{}{} {} {} {} {}{}'.format( + (f'{coin_id} ' if coin_id and tx.coin != 'BTC' else ''), + tx.chain.upper(), + tx.txid, + tx.send_amt, + tx.timestamp, + tx.blockcount, + (f' LT={tx.locktime}' if tx.locktime else ''), + ), + tx.serialized, + ascii([amt_to_str(e._asdict()) for e in tx.inputs]), + ascii([amt_to_str(e._asdict()) for e in tx.outputs]) + ] - if tx.coin_txid: - if not tx.comment: - lines.append('-') # keep old tx files backwards compatible - lines.append(tx.coin_txid) + if tx.comment: + from ..baseconv import baseconv + lines.append(baseconv('b58').frombytes(tx.comment.encode(),tostr=True)) - if tx.sent_timestamp: - lines.append(f'Sent {tx.sent_timestamp}') + if tx.coin_txid: + if not tx.comment: + lines.append('-') # keep old tx files backwards compatible + lines.append(tx.coin_txid) + + if tx.sent_timestamp: + lines.append(f'Sent {tx.sent_timestamp}') + + return '\n'.join([make_chksum_6(' '.join(lines))] + lines) + '\n' + + fmt_data = format_data_legacy() - self.chksum = make_chksum_6(' '.join(lines)) - fmt_data = '\n'.join([self.chksum] + lines) + '\n' if len(fmt_data) > tx.cfg.max_tx_file_size: die( 'MaxFileSizeExceeded', f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)' ) + return fmt_data def write(self, diff --git a/test/unit_tests_d/ut_tx.py b/test/unit_tests_d/ut_tx.py index 454b5d0b..0ab811a7 100755 --- a/test/unit_tests_d/ut_tx.py +++ b/test/unit_tests_d/ut_tx.py @@ -12,16 +12,16 @@ from mmgen.tx.file import MMGenTxFile from mmgen.daemon import CoinDaemon from mmgen.protocol import init_proto -from ..include.common import cfg,qmsg,vmsg +from ..include.common import cfg, qmsg, vmsg async def do_txfile_test(desc,fns): qmsg(f' Testing CompletedTX initializer ({desc})') for fn in fns: qmsg(f' parsing: {os.path.basename(fn)}') fpath = os.path.join('test','ref',fn) - tx = await CompletedTX( cfg=cfg, filename=fpath, quiet_open=True ) + tx = await CompletedTX(cfg=cfg, filename=fpath, quiet_open=True) - vmsg(tx.info.format()) + vmsg('\n' + tx.info.format()) f = MMGenTxFile(tx) fn_gen = f.make_filename() @@ -33,23 +33,6 @@ async def do_txfile_test(desc,fns): text = f.format() - continue # TODO: check disabled after label -> comment patch - - with open(fpath) as fp: - chk = fp.read() - - # remove Python2 'u' string prefixes from ref files: - # New in version 3.3: Support for the unicode legacy literal (u'value') was - # reintroduced to simplify the maintenance of dual Python 2.x and 3.x codebases. - # See PEP 414 for more information. - chk = chk.replace("'label':","'comment':") # TODO - chk = re.subn( r"\bu(['\"])", r'\1', chk )[0] - - diff = get_ndiff(chk,text) - print(get_diff(chk,text,from_json=False)) - nLines = len([i for i in diff if i.startswith('-')]) - assert nLines in (0,1), f'{nLines} lines differ: only checksum line may differ' - qmsg(' OK') return True