From 9f1d647f07a365d78d1fef10cd83389f29cd6816 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Mon, 18 May 2020 13:25:00 +0000 Subject: [PATCH] new class: MMGenTxFile --- mmgen/exception.py | 1 + mmgen/tx.py | 185 +----------------------------- mmgen/txfile.py | 224 +++++++++++++++++++++++++++++++++++++ setup.py | 1 + test/unit_tests_d/ut_tx.py | 42 +++++++ 5 files changed, 274 insertions(+), 179 deletions(-) create mode 100755 mmgen/txfile.py create mode 100755 test/unit_tests_d/ut_tx.py diff --git a/mmgen/exception.py b/mmgen/exception.py index 575570d7..a6777542 100755 --- a/mmgen/exception.py +++ b/mmgen/exception.py @@ -49,6 +49,7 @@ class TransactionChainMismatch(Exception):mmcode = 2 class RPCFailure(Exception): mmcode = 3 class BadTxSizeEstimate(Exception): mmcode = 3 class MaxInputSizeExceeded(Exception): mmcode = 3 +class MaxFileSizeExceeded(Exception): mmcode = 3 class WalletFileError(Exception): mmcode = 3 class HexadecimalStringError(Exception): mmcode = 3 class SeedLengthError(Exception): mmcode = 3 diff --git a/mmgen/tx.py b/mmgen/tx.py index 4429e68f..dc4fb117 100755 --- a/mmgen/tx.py +++ b/mmgen/tx.py @@ -324,9 +324,6 @@ class MMGenTX(MMGenObject): self.txid = '' self.coin_txid = '' self.timestamp = '' - self.chksum = '' - self.fmt_data = '' - self.fn = '' self.blockcount = 0 self.chain = None self.coin = None @@ -336,7 +333,8 @@ class MMGenTX(MMGenObject): self.tw = tw if filename: - self.parse_tx_file(filename,metadata_only=metadata_only,quiet_open=quiet_open) + from .txfile import MMGenTxFile + MMGenTxFile(self).parse(filename,metadata_only=metadata_only,quiet_open=quiet_open) if metadata_only: return self.check_pubkey_scripts() @@ -345,6 +343,10 @@ class MMGenTX(MMGenObject): # repeat with sign and send, because coin daemon could be restarted self.check_correct_chain() + def write_to_file(self,*args,**kwargs): + from .txfile import MMGenTxFile + MMGenTxFile(self).write(*args,**kwargs) + def check_correct_chain(self): bad = self.chain and g.chain and self.chain != g.chain if bad and hasattr(g.proto,'chain_name'): @@ -688,39 +690,6 @@ class MMGenTX(MMGenObject): def add_blockcount(self): self.blockcount = g.rpc.blockcount - def format(self): - self.inputs.check_coin_mismatch() - self.outputs.check_coin_mismatch() - def amt_to_str(d): - return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d} - coin_id = '' if g.coin == 'BTC' else g.coin + ('' if g.coin == g.dcoin else ':'+g.dcoin) - lines = [ - '{}{} {} {} {} {}{}'.format( - (coin_id+' ' if coin_id else ''), - self.chain.upper() if self.chain else 'Unknown', - self.txid, - self.send_amt, - self.timestamp, - self.blockcount, - ('',' LT={}'.format(self.locktime))[bool(self.locktime)] - ), - self.hex, - repr([amt_to_str(e.__dict__) for e in self.inputs]), - repr([amt_to_str(e.__dict__) for e in self.outputs]) - ] - if self.label: - from .baseconv import baseconv - lines.append(baseconv.frombytes(self.label.encode(),'b58',tostr=True)) - if self.coin_txid: - if not self.label: - lines.append('-') # keep old tx files backwards compatible - lines.append(self.coin_txid) - self.chksum = make_chksum_6(' '.join(lines)) - self.fmt_data = '\n'.join([self.chksum] + lines)+'\n' - - assert len(self.fmt_data) <= g.max_tx_file_size,( - 'Transaction file size exceeds limit ({} bytes)'.format(g.max_tx_file_size)) - def get_non_mmaddrs(self,desc): return {i.addr for i in getattr(self,desc) if not i.mmid} @@ -952,42 +921,6 @@ class MMGenTX(MMGenObject): self.add_blockcount() return True - def create_fn(self): - tl = self.get_hex_locktime() - tn = ('','.testnet')[g.proto.testnet] - self.fn = '{}{}[{!s}{}{}]{x}{}.{}'.format( - self.txid, - ('-'+g.dcoin,'')[g.coin=='BTC'], - self.send_amt, - ('',',{}'.format(self.fee_abs2rel( - self.get_fee(),to_unit=self.fn_fee_unit)) - )[self.is_replaceable()], - ('',',tl={}'.format(tl))[bool(tl)], - tn,self.ext, - x='-α' if g.debug_utf8 else '') - - def write_to_file( self, - add_desc='', - ask_write=True, - ask_write_default_yes=False, - ask_tty=True, - ask_overwrite=True): - - if ask_write == False: - ask_write_default_yes = True - - if not self.fmt_data: - self.format() - - if not self.fn: - self.create_fn() - - write_data_to_file(self.fn,self.fmt_data,self.desc+add_desc, - ask_overwrite=ask_overwrite, - ask_write=ask_write, - ask_tty=ask_tty, - ask_write_default_yes=ask_write_default_yes) - def view_with_prompt(self,prompt='',pause=True): prompt += ' (y)es, (N)o, pager (v)iew, (t)erse view: ' from .term import get_char @@ -1157,112 +1090,6 @@ class MMGenTX(MMGenObject): def parse_txfile_hex_data(self): pass - def parse_tx_file(self,infile,metadata_only=False,quiet_open=False): - - def eval_io_data(raw_data,desc): - from ast import literal_eval - try: - d = literal_eval(raw_data) - except: - if desc == 'inputs' and not quiet_open: - ymsg('Warning: transaction data appears to be in old format') - import re - d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data)) - assert type(d) == list,'{} data not a list!'.format(desc) - if not (desc == 'outputs' and g.proto.base_coin == 'ETH'): # ETH txs can have no outputs - assert len(d),'no {}!'.format(desc) - for e in d: - e['amt'] = g.proto.coin_amt(e['amt']) - io,io_list = ( - (MMGenTxOutput,MMGenTxOutputList), - (MMGenTxInput,MMGenTxInputList) - )[desc=='inputs'] - return io_list([io(**e) for e in d]) - - tx_data = get_data_from_file(infile,self.desc+' data',quiet=quiet_open) - - try: - desc = 'data' - assert len(tx_data) <= g.max_tx_file_size,( - 'Transaction file size exceeds limit ({} bytes)'.format(g.max_tx_file_size)) - tx_data = tx_data.splitlines() - assert len(tx_data) >= 5,'number of lines less than 5' - assert len(tx_data[0]) == 6,'invalid length of first line' - self.chksum = HexStr(tx_data.pop(0),on_fail='raise') - assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum' - - if len(tx_data) == 6: - assert len(tx_data[-1]) == 64,'invalid coin TxID length' - desc = f'{g.proto.name} TxID' - self.coin_txid = CoinTxID(tx_data.pop(-1),on_fail='raise') - - if len(tx_data) == 5: - # rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up)) - assert len(tx_data[-1]) < MMGenTxLabel.max_len*6,'invalid comment length' - c = tx_data.pop(-1) - if c != '-': - desc = 'encoded comment (not base58)' - from .baseconv import baseconv - comment = baseconv.tobytes(c,'b58').decode() - assert comment != False,'invalid comment' - desc = 'comment' - self.label = MMGenTxLabel(comment,on_fail='raise') - - desc = 'number of lines' # four required lines - metadata,self.hex,inputs_data,outputs_data = tx_data - assert len(metadata) < 100,'invalid metadata length' # rough check - metadata = metadata.split() - - if metadata[-1].find('LT=') == 0: - desc = 'locktime' - self.locktime = int(metadata.pop()[3:]) - - self.coin = metadata.pop(0) if len(metadata) == 6 else 'BTC' - if ':' in self.coin: - self.coin,self.dcoin = self.coin.split(':') - - if len(metadata) == 5: - t = metadata.pop(0) - self.chain = (t.lower(),None)[t=='Unknown'] - - desc = 'metadata (4 items minimum required)' - txid,send_amt,self.timestamp,blockcount = metadata - - desc = 'txid in metadata' - self.txid = MMGenTxID(txid,on_fail='raise') - desc = 'send amount in metadata' - self.send_amt = UnknownCoinAmt(send_amt) # temporary, for 'metadata_only' - desc = 'block count in metadata' - self.blockcount = int(blockcount) - - if metadata_only: - return - - desc = 'send amount in metadata' - self.send_amt = g.proto.coin_amt(send_amt,on_fail='raise') - - desc = 'transaction file hex data' - self.check_txfile_hex_data() - desc = f'transaction file {self.hexdata_type} data' - self.parse_txfile_hex_data() - # the following ops will all fail if g.coin doesn't match self.coin - desc = 'coin type in metadata' - assert self.coin == g.coin,self.coin - desc = 'inputs data' - self.inputs = eval_io_data(inputs_data,'inputs') - desc = 'outputs data' - self.outputs = eval_io_data(outputs_data,'outputs') - except Exception as e: - die(2,f'Invalid {desc} in transaction file: {e.args[0]}') - - # test doesn't work for Ethereum: test and mainnet addrs have same format - if not self.chain and not self.inputs[0].addr.is_for_chain('testnet'): - self.chain = 'mainnet' - - if self.dcoin: - self.resolve_g_token_from_txfile() - g.proto.dcoin = self.dcoin - def process_cmd_arg(self,arg,ad_f,ad_w): def add_output_chk(addr,amt,err_desc): diff --git a/mmgen/txfile.py b/mmgen/txfile.py new file mode 100755 index 00000000..c33c3648 --- /dev/null +++ b/mmgen/txfile.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2020 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +txfile.py: Transaction file operations for the MMGen suite +""" + +from .common import * +from .obj import HexStr,MMGenTxID,UnknownCoinAmt,CoinTxID,MMGenTxLabel +from .tx import MMGenTxOutput,MMGenTxOutputList,MMGenTxInput,MMGenTxInputList +from .exception import MaxFileSizeExceeded + +class MMGenTxFile: + + def __init__(self,tx): + self.tx = tx + self.chksum = None + self.fmt_data = None + self.filename = None + + def parse(self,infile,metadata_only=False,quiet_open=False): + tx = self.tx + + def eval_io_data(raw_data,desc): + from ast import literal_eval + try: + d = literal_eval(raw_data) + except: + if desc == 'inputs' and not quiet_open: + ymsg('Warning: transaction data appears to be in old format') + import re + d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data)) + assert type(d) == list,'{} data not a list!'.format(desc) + if not (desc == 'outputs' and g.proto.base_coin == 'ETH'): # ETH txs can have no outputs + assert len(d),'no {}!'.format(desc) + for e in d: + e['amt'] = g.proto.coin_amt(e['amt']) + io,io_list = ( + (MMGenTxOutput,MMGenTxOutputList), + (MMGenTxInput,MMGenTxInputList) + )[desc=='inputs'] + return io_list(io(**e) for e in d) + + tx_data = get_data_from_file(infile,tx.desc+' data',quiet=quiet_open) + + try: + desc = 'data' + if len(tx_data) > g.max_tx_file_size: + raise MaxFileSizeExceeded(f'Transaction file size exceeds limit ({g.max_tx_file_size} bytes)') + tx_data = tx_data.splitlines() + assert len(tx_data) >= 5,'number of lines less than 5' + assert len(tx_data[0]) == 6,'invalid length of first line' + self.chksum = HexStr(tx_data.pop(0),on_fail='raise') + assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum' + + if len(tx_data) == 6: + assert len(tx_data[-1]) == 64,'invalid coin TxID length' + desc = f'{g.proto.name} TxID' + tx.coin_txid = CoinTxID(tx_data.pop(-1),on_fail='raise') + + if len(tx_data) == 5: + # rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up)) + assert len(tx_data[-1]) < MMGenTxLabel.max_len*6,'invalid comment length' + c = tx_data.pop(-1) + if c != '-': + desc = 'encoded comment (not base58)' + from .baseconv import baseconv + comment = baseconv.tobytes(c,'b58').decode() + assert comment != False,'invalid comment' + desc = 'comment' + tx.label = MMGenTxLabel(comment,on_fail='raise') + + desc = 'number of lines' # four required lines + metadata,tx.hex,inputs_data,outputs_data = tx_data + assert len(metadata) < 100,'invalid metadata length' # rough check + metadata = metadata.split() + + if metadata[-1].startswith('LT='): + desc = 'locktime' + tx.locktime = int(metadata.pop()[3:]) + + tx.coin = metadata.pop(0) if len(metadata) == 6 else 'BTC' + if ':' in tx.coin: + tx.coin,tx.dcoin = tx.coin.split(':') + + if len(metadata) == 5: + t = metadata.pop(0) + tx.chain = (t.lower(),None)[t=='Unknown'] + + desc = 'metadata (4 items minimum required)' + txid,send_amt,tx.timestamp,blockcount = metadata + + desc = 'txid in metadata' + tx.txid = MMGenTxID(txid,on_fail='raise') + desc = 'send amount in metadata' + tx.send_amt = UnknownCoinAmt(send_amt) # temporary, for 'metadata_only' + desc = 'block count in metadata' + tx.blockcount = int(blockcount) + + if metadata_only: + return + + desc = 'send amount in metadata' + tx.send_amt = g.proto.coin_amt(send_amt,on_fail='raise') + + desc = 'transaction file hex data' + tx.check_txfile_hex_data() + desc = f'transaction file {tx.hexdata_type} data' + tx.parse_txfile_hex_data() + # the following ops will all fail if g.coin doesn't match tx.coin + desc = 'coin type in metadata' + assert tx.coin == g.coin, tx.coin + desc = 'inputs data' + tx.inputs = eval_io_data(inputs_data,'inputs') + desc = 'outputs data' + tx.outputs = eval_io_data(outputs_data,'outputs') + except Exception as e: + die(2,f'Invalid {desc} in transaction file: {e.args[0]}') + + # is_for_chain() is no-op for Ethereum: test and mainnet addrs have same format + if not tx.chain and not tx.inputs[0].addr.is_for_chain('testnet'): + tx.chain = 'mainnet' + + if tx.dcoin: + tx.resolve_g_token_from_txfile() + g.proto.dcoin = tx.dcoin + + def make_filename(self): + tx = self.tx + def gen_filename(): + yield tx.txid + if g.coin != 'BTC': + yield '-' + g.dcoin + yield f'[{tx.send_amt!s}' + if tx.is_replaceable(): + yield ',{}'.format(tx.fee_abs2rel(tx.get_fee(),to_unit=tx.fn_fee_unit)) + if tx.get_hex_locktime(): + yield ',tl={}'.format(tx.get_hex_locktime()) + yield ']' + if g.debug_utf8: + yield '-α' + if g.proto.testnet: + yield '.testnet' + yield '.' + tx.ext + return ''.join(gen_filename()) + + def format(self): + tx = self.tx + tx.inputs.check_coin_mismatch() + tx.outputs.check_coin_mismatch() + + def amt_to_str(d): + return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d} + + coin_id = '' if g.coin == 'BTC' else g.coin + ('' if g.coin == g.dcoin else ':'+g.dcoin) + lines = [ + '{}{} {} {} {} {}{}'.format( + (coin_id+' ' if coin_id else ''), + tx.chain.upper() if tx.chain else 'Unknown', + tx.txid, + tx.send_amt, + tx.timestamp, + tx.blockcount, + ('',' LT={}'.format(tx.locktime))[bool(tx.locktime)] + ), + tx.hex, + ascii([amt_to_str(e.__dict__) for e in tx.inputs]), + ascii([amt_to_str(e.__dict__) for e in tx.outputs]) + ] + + if tx.label: + from .baseconv import baseconv + lines.append(baseconv.frombytes(tx.label.encode(),'b58',tostr=True)) + + if tx.coin_txid: + if not tx.label: + lines.append('-') # keep old tx files backwards compatible + lines.append(tx.coin_txid) + + self.chksum = make_chksum_6(' '.join(lines)) + fmt_data = '\n'.join([self.chksum] + lines) + '\n' + if len(fmt_data) > g.max_tx_file_size: + raise MaxFileSizeExceeded(f'Transaction file size exceeds limit ({g.max_tx_file_size} bytes)') + return fmt_data + + def write(self, + add_desc = '', + ask_write = True, + ask_write_default_yes = False, + ask_tty = True, + ask_overwrite = True ): + + if ask_write == False: + ask_write_default_yes = True + + if not self.filename: + self.filename = self.make_filename() + + if not self.fmt_data: + self.fmt_data = self.format() + + write_data_to_file( + outfile = self.filename, + data = self.fmt_data, + desc = self.tx.desc + add_desc, + ask_overwrite = ask_overwrite, + ask_write = ask_write, + ask_tty = ask_tty, + ask_write_default_yes = ask_write_default_yes ) diff --git a/setup.py b/setup.py index beb5c2f8..7884aca4 100755 --- a/setup.py +++ b/setup.py @@ -131,6 +131,7 @@ setup( 'mmgen.tool', 'mmgen.tw', 'mmgen.tx', + 'mmgen.txfile', 'mmgen.txsign', 'mmgen.util', 'mmgen.wallet', diff --git a/test/unit_tests_d/ut_tx.py b/test/unit_tests_d/ut_tx.py new file mode 100755 index 00000000..88c4a791 --- /dev/null +++ b/test/unit_tests_d/ut_tx.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +""" +test.unit_tests_d.ut_tx: TX unit test for the MMGen suite +""" + +import re +from mmgen.common import * +from mmgen.tx import MMGenTX +from mmgen.txfile import MMGenTxFile + +class unit_tests: + + def txfile(self,name,ut): + + qmsg(' Testing TX file operations') + + fns = ( # TODO: add altcoin TX files + '0B8D5A[15.31789,14,tl=1320969600].rawtx', + '0C7115[15.86255,14,tl=1320969600].testnet.rawtx', + '460D4D-BCH[10.19764,tl=1320969600].rawtx', + '25EFA3[2.34].testnet.rawtx', + ) + for fn in fns: + fpath = os.path.join('test','ref',fn) + tx = MMGenTX(filename=fpath,quiet_open=True) + f = MMGenTxFile(tx) + + fn_gen = f.make_filename() + vmsg(f' parsed: {fn_gen}') + assert fn_gen == fn, f'{fn_gen} != {fn}' + + text = f.format() + # New in version 3.3: Support for the unicode legacy literal (u'value') was + # reintroduced to simplify the maintenance of dual Python 2.x and 3.x codebases. + # See PEP 414 for more information. + chk = re.subn(r"\bu'",r"'",open(fpath).read())[0] # remove Python2 'u' string prefixes from ref files + nLines = len([i for i in get_ndiff(chk,text) if i.startswith('-')]) + assert nLines == 1, f'{nLines} lines differ: only checksum line should differ' + break # FIXME - test BCH, testnet + + qmsg(' OK') + return True