tx.file: cleanups; other minor cleanups

This commit is contained in:
The MMGen Project 2024-10-12 09:14:40 +00:00
commit a38f4e4fd9
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
3 changed files with 100 additions and 99 deletions

View file

@ -492,8 +492,10 @@ class Config(Lockable):
# class attribute, if it exists:
auto_opts = tuple(self._autoset_opts) + tuple(self._auto_typeset_opts)
for key,val in self._uopts.items():
assert key.isascii() and key.isidentifier() and key[0] != '_', '{key!r}: malformed configuration option'
assert key not in self._forbidden_opts, '{key!r}: forbidden configuration option'
assert key.isascii() and key.isidentifier() and key[0] != '_', (
f'{key!r}: malformed configuration option')
assert key not in self._forbidden_opts, (
f'{key!r}: forbidden configuration option')
if key not in auto_opts:
if hasattr(type(self), key):
setattr(

View file

@ -24,53 +24,66 @@ import os
from ..util import ymsg,make_chksum_6,die
from ..obj import MMGenObject,HexStr,MMGenTxID,CoinTxID,MMGenTxComment
def get_proto_from_coin_id(tx, coin_id, chain):
coin, tokensym = coin_id.split(':') if ':' in coin_id else (coin_id, None)
from ..protocol import CoinProtocol, init_proto
network = CoinProtocol.Base.chain_name_to_network(tx.cfg, coin, chain)
proto = init_proto(tx.cfg, coin, network=network, need_amt=True)
if tokensym:
proto.tokensym = tokensym
return proto
def eval_io_data(tx, data, desc):
if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs
assert len(data), f'no {desc}!'
for d in data:
d['amt'] = tx.proto.coin_amt(d['amt'])
io, io_list = {
'inputs': (tx.Input, tx.InputList),
'outputs': (tx.Output, tx.OutputList),
}[desc]
return io_list(parent=tx, data=[io(tx.proto,**d) for d in data])
class MMGenTxFile(MMGenObject):
def __init__(self,tx):
self.tx = tx
self.chksum = None
self.fmt_data = None
self.filename = None
def parse(self,infile,metadata_only=False,quiet_open=False):
def parse(self, infile, metadata_only=False, quiet_open=False):
tx = self.tx
from ..fileutil import get_data_from_file
data = get_data_from_file(tx.cfg, infile, f'{tx.desc} data', quiet=quiet_open)
if len(data) > tx.cfg.max_tx_file_size:
die('MaxFileSizeExceeded',
f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)')
return self.parse_data_legacy(data, metadata_only)
def eval_io_data(raw_data,desc):
def parse_data_legacy(self, data, metadata_only):
tx = self.tx
tx.file_format = 'legacy'
def deserialize(raw_data, desc):
from ast import literal_eval
try:
d = literal_eval(raw_data)
return literal_eval(raw_data)
except:
if desc == 'inputs' and not quiet_open:
if desc == 'inputs':
ymsg('Warning: transaction data appears to be in old format')
import re
d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
assert isinstance(d,list), f'{desc} data not a list!'
if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs
assert len(d), f'no {desc}!'
for e in d:
e['amt'] = tx.proto.coin_amt(e['amt'])
if 'label' in e:
e['comment'] = e['label']
del e['label']
io,io_list = {
'inputs': ( tx.Input, tx.InputList ),
'outputs': ( tx.Output, tx.OutputList ),
}[desc]
return io_list( parent=tx, data=[io(tx.proto,**e) for e in d] )
from ..fileutil import get_data_from_file
tx_data = get_data_from_file( tx.cfg, infile, tx.desc+' data', quiet=quiet_open )
return literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1', raw_data))
desc = 'data'
try:
if len(tx_data) > tx.cfg.max_tx_file_size:
die('MaxFileSizeExceeded',
f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)')
tx_data = tx_data.splitlines()
tx_data = data.splitlines()
assert len(tx_data) >= 5,'number of lines less than 5'
assert len(tx_data[0]) == 6,'invalid length of first line'
self.chksum = HexStr(tx_data.pop(0))
assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum'
assert HexStr(tx_data.pop(0)) == make_chksum_6(' '.join(tx_data)), 'file data does not match checksum'
if len(tx_data) == 7:
desc = 'sent timestamp'
@ -95,7 +108,8 @@ class MMGenTxFile(MMGenObject):
tx.comment = MMGenTxComment(comment)
desc = 'number of lines' # four required lines
( metadata, tx.serialized, inputs_data, outputs_data ) = tx_data
io_data = {}
(metadata, tx.serialized, io_data['inputs'], io_data['outputs']) = tx_data
assert len(metadata) < 100,'invalid metadata length' # rough check
metadata = metadata.split()
@ -104,19 +118,13 @@ class MMGenTxFile(MMGenObject):
tx.locktime = int(metadata.pop()[3:])
desc = 'coin token in metadata'
coin = metadata.pop(0) if len(metadata) == 6 else 'BTC'
coin,tokensym = coin.split(':') if ':' in coin else (coin,None)
coin_id = metadata.pop(0) if len(metadata) == 6 else 'BTC'
desc = 'chain token in metadata'
tx.chain = metadata.pop(0).lower() if len(metadata) == 5 else 'mainnet'
from ..protocol import CoinProtocol,init_proto
network = CoinProtocol.Base.chain_name_to_network(tx.cfg,coin,tx.chain)
desc = 'initialization of protocol'
tx.proto = init_proto( tx.cfg, coin, network=network, need_amt=True )
if tokensym:
tx.proto.tokensym = tokensym
desc = 'coin_id or chain'
tx.proto = get_proto_from_coin_id(tx, coin_id, tx.chain)
desc = 'metadata (4 items)'
(txid, send_amt, tx.timestamp, blockcount) = metadata
@ -133,13 +141,16 @@ class MMGenTxFile(MMGenObject):
tx.check_txfile_hex_data()
desc = 'Ethereum RLP or JSON data'
tx.parse_txfile_serialized_data()
desc = 'inputs data'
tx.inputs = eval_io_data(inputs_data,'inputs')
desc = 'outputs data'
tx.outputs = eval_io_data(outputs_data,'outputs')
for k in ('inputs', 'outputs'):
desc = f'{k} data'
res = deserialize(io_data[k], k)
for d in res:
if 'label' in d:
d['comment'] = d['label']
del d['label']
setattr(tx, k, eval_io_data(tx, res, k))
desc = 'send amount in metadata'
from decimal import Decimal
assert Decimal(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}'
assert tx.proto.coin_amt(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}'
except Exception as e:
die(2,f'Invalid {desc} in transaction file: {e!s}')
@ -162,42 +173,47 @@ class MMGenTxFile(MMGenObject):
def format(self):
tx = self.tx
coin_id = tx.coin + ('' if tx.coin == tx.dcoin else ':'+tx.dcoin)
def amt_to_str(d):
return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d}
def format_data_legacy():
coin_id = '' if tx.coin == 'BTC' else tx.coin + ('' if tx.coin == tx.dcoin else ':'+tx.dcoin)
lines = [
'{}{} {} {} {} {}{}'.format(
(coin_id+' ' if coin_id else ''),
tx.chain.upper(),
tx.txid,
tx.send_amt,
tx.timestamp,
tx.blockcount,
(f' LT={tx.locktime}' if tx.locktime else ''),
),
tx.serialized,
ascii([amt_to_str(e._asdict()) for e in tx.inputs]),
ascii([amt_to_str(e._asdict()) for e in tx.outputs])
]
def amt_to_str(d):
return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d}
if tx.comment:
from ..baseconv import baseconv
lines.append(baseconv('b58').frombytes(tx.comment.encode(),tostr=True))
lines = [
'{}{} {} {} {} {}{}'.format(
(f'{coin_id} ' if coin_id and tx.coin != 'BTC' else ''),
tx.chain.upper(),
tx.txid,
tx.send_amt,
tx.timestamp,
tx.blockcount,
(f' LT={tx.locktime}' if tx.locktime else ''),
),
tx.serialized,
ascii([amt_to_str(e._asdict()) for e in tx.inputs]),
ascii([amt_to_str(e._asdict()) for e in tx.outputs])
]
if tx.coin_txid:
if not tx.comment:
lines.append('-') # keep old tx files backwards compatible
lines.append(tx.coin_txid)
if tx.comment:
from ..baseconv import baseconv
lines.append(baseconv('b58').frombytes(tx.comment.encode(),tostr=True))
if tx.sent_timestamp:
lines.append(f'Sent {tx.sent_timestamp}')
if tx.coin_txid:
if not tx.comment:
lines.append('-') # keep old tx files backwards compatible
lines.append(tx.coin_txid)
if tx.sent_timestamp:
lines.append(f'Sent {tx.sent_timestamp}')
return '\n'.join([make_chksum_6(' '.join(lines))] + lines) + '\n'
fmt_data = format_data_legacy()
self.chksum = make_chksum_6(' '.join(lines))
fmt_data = '\n'.join([self.chksum] + lines) + '\n'
if len(fmt_data) > tx.cfg.max_tx_file_size:
die( 'MaxFileSizeExceeded', f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)' )
return fmt_data
def write(self,

View file

@ -12,16 +12,16 @@ from mmgen.tx.file import MMGenTxFile
from mmgen.daemon import CoinDaemon
from mmgen.protocol import init_proto
from ..include.common import cfg,qmsg,vmsg
from ..include.common import cfg, qmsg, vmsg
async def do_txfile_test(desc,fns):
qmsg(f' Testing CompletedTX initializer ({desc})')
for fn in fns:
qmsg(f' parsing: {os.path.basename(fn)}')
fpath = os.path.join('test','ref',fn)
tx = await CompletedTX( cfg=cfg, filename=fpath, quiet_open=True )
tx = await CompletedTX(cfg=cfg, filename=fpath, quiet_open=True)
vmsg(tx.info.format())
vmsg('\n' + tx.info.format())
f = MMGenTxFile(tx)
fn_gen = f.make_filename()
@ -33,23 +33,6 @@ async def do_txfile_test(desc,fns):
text = f.format()
continue # TODO: check disabled after label -> comment patch
with open(fpath) as fp:
chk = fp.read()
# remove Python2 'u' string prefixes from ref files:
# New in version 3.3: Support for the unicode legacy literal (u'value') was
# reintroduced to simplify the maintenance of dual Python 2.x and 3.x codebases.
# See PEP 414 for more information.
chk = chk.replace("'label':","'comment':") # TODO
chk = re.subn( r"\bu(['\"])", r'\1', chk )[0]
diff = get_ndiff(chk,text)
print(get_diff(chk,text,from_json=False))
nLines = len([i for i in diff if i.startswith('-')])
assert nLines in (0,1), f'{nLines} lines differ: only checksum line may differ'
qmsg(' OK')
return True