new class: MMGenTxFile

This commit is contained in:
The MMGen Project 2020-05-18 13:25:00 +00:00
commit 9f1d647f07
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
5 changed files with 274 additions and 179 deletions

View file

@ -49,6 +49,7 @@ class TransactionChainMismatch(Exception):mmcode = 2
class RPCFailure(Exception): mmcode = 3
class BadTxSizeEstimate(Exception): mmcode = 3
class MaxInputSizeExceeded(Exception): mmcode = 3
class MaxFileSizeExceeded(Exception): mmcode = 3
class WalletFileError(Exception): mmcode = 3
class HexadecimalStringError(Exception): mmcode = 3
class SeedLengthError(Exception): mmcode = 3

View file

@ -324,9 +324,6 @@ class MMGenTX(MMGenObject):
self.txid = ''
self.coin_txid = ''
self.timestamp = ''
self.chksum = ''
self.fmt_data = ''
self.fn = ''
self.blockcount = 0
self.chain = None
self.coin = None
@ -336,7 +333,8 @@ class MMGenTX(MMGenObject):
self.tw = tw
if filename:
self.parse_tx_file(filename,metadata_only=metadata_only,quiet_open=quiet_open)
from .txfile import MMGenTxFile
MMGenTxFile(self).parse(filename,metadata_only=metadata_only,quiet_open=quiet_open)
if metadata_only:
return
self.check_pubkey_scripts()
@ -345,6 +343,10 @@ class MMGenTX(MMGenObject):
# repeat with sign and send, because coin daemon could be restarted
self.check_correct_chain()
def write_to_file(self,*args,**kwargs):
from .txfile import MMGenTxFile
MMGenTxFile(self).write(*args,**kwargs)
def check_correct_chain(self):
bad = self.chain and g.chain and self.chain != g.chain
if bad and hasattr(g.proto,'chain_name'):
@ -688,39 +690,6 @@ class MMGenTX(MMGenObject):
def add_blockcount(self):
self.blockcount = g.rpc.blockcount
def format(self):
self.inputs.check_coin_mismatch()
self.outputs.check_coin_mismatch()
def amt_to_str(d):
return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d}
coin_id = '' if g.coin == 'BTC' else g.coin + ('' if g.coin == g.dcoin else ':'+g.dcoin)
lines = [
'{}{} {} {} {} {}{}'.format(
(coin_id+' ' if coin_id else ''),
self.chain.upper() if self.chain else 'Unknown',
self.txid,
self.send_amt,
self.timestamp,
self.blockcount,
('',' LT={}'.format(self.locktime))[bool(self.locktime)]
),
self.hex,
repr([amt_to_str(e.__dict__) for e in self.inputs]),
repr([amt_to_str(e.__dict__) for e in self.outputs])
]
if self.label:
from .baseconv import baseconv
lines.append(baseconv.frombytes(self.label.encode(),'b58',tostr=True))
if self.coin_txid:
if not self.label:
lines.append('-') # keep old tx files backwards compatible
lines.append(self.coin_txid)
self.chksum = make_chksum_6(' '.join(lines))
self.fmt_data = '\n'.join([self.chksum] + lines)+'\n'
assert len(self.fmt_data) <= g.max_tx_file_size,(
'Transaction file size exceeds limit ({} bytes)'.format(g.max_tx_file_size))
def get_non_mmaddrs(self,desc):
return {i.addr for i in getattr(self,desc) if not i.mmid}
@ -952,42 +921,6 @@ class MMGenTX(MMGenObject):
self.add_blockcount()
return True
def create_fn(self):
tl = self.get_hex_locktime()
tn = ('','.testnet')[g.proto.testnet]
self.fn = '{}{}[{!s}{}{}]{x}{}.{}'.format(
self.txid,
('-'+g.dcoin,'')[g.coin=='BTC'],
self.send_amt,
('',',{}'.format(self.fee_abs2rel(
self.get_fee(),to_unit=self.fn_fee_unit))
)[self.is_replaceable()],
('',',tl={}'.format(tl))[bool(tl)],
tn,self.ext,
x='' if g.debug_utf8 else '')
def write_to_file( self,
add_desc='',
ask_write=True,
ask_write_default_yes=False,
ask_tty=True,
ask_overwrite=True):
if ask_write == False:
ask_write_default_yes = True
if not self.fmt_data:
self.format()
if not self.fn:
self.create_fn()
write_data_to_file(self.fn,self.fmt_data,self.desc+add_desc,
ask_overwrite=ask_overwrite,
ask_write=ask_write,
ask_tty=ask_tty,
ask_write_default_yes=ask_write_default_yes)
def view_with_prompt(self,prompt='',pause=True):
prompt += ' (y)es, (N)o, pager (v)iew, (t)erse view: '
from .term import get_char
@ -1157,112 +1090,6 @@ class MMGenTX(MMGenObject):
def parse_txfile_hex_data(self):
pass
def parse_tx_file(self,infile,metadata_only=False,quiet_open=False):
def eval_io_data(raw_data,desc):
from ast import literal_eval
try:
d = literal_eval(raw_data)
except:
if desc == 'inputs' and not quiet_open:
ymsg('Warning: transaction data appears to be in old format')
import re
d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
assert type(d) == list,'{} data not a list!'.format(desc)
if not (desc == 'outputs' and g.proto.base_coin == 'ETH'): # ETH txs can have no outputs
assert len(d),'no {}!'.format(desc)
for e in d:
e['amt'] = g.proto.coin_amt(e['amt'])
io,io_list = (
(MMGenTxOutput,MMGenTxOutputList),
(MMGenTxInput,MMGenTxInputList)
)[desc=='inputs']
return io_list([io(**e) for e in d])
tx_data = get_data_from_file(infile,self.desc+' data',quiet=quiet_open)
try:
desc = 'data'
assert len(tx_data) <= g.max_tx_file_size,(
'Transaction file size exceeds limit ({} bytes)'.format(g.max_tx_file_size))
tx_data = tx_data.splitlines()
assert len(tx_data) >= 5,'number of lines less than 5'
assert len(tx_data[0]) == 6,'invalid length of first line'
self.chksum = HexStr(tx_data.pop(0),on_fail='raise')
assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum'
if len(tx_data) == 6:
assert len(tx_data[-1]) == 64,'invalid coin TxID length'
desc = f'{g.proto.name} TxID'
self.coin_txid = CoinTxID(tx_data.pop(-1),on_fail='raise')
if len(tx_data) == 5:
# rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up))
assert len(tx_data[-1]) < MMGenTxLabel.max_len*6,'invalid comment length'
c = tx_data.pop(-1)
if c != '-':
desc = 'encoded comment (not base58)'
from .baseconv import baseconv
comment = baseconv.tobytes(c,'b58').decode()
assert comment != False,'invalid comment'
desc = 'comment'
self.label = MMGenTxLabel(comment,on_fail='raise')
desc = 'number of lines' # four required lines
metadata,self.hex,inputs_data,outputs_data = tx_data
assert len(metadata) < 100,'invalid metadata length' # rough check
metadata = metadata.split()
if metadata[-1].find('LT=') == 0:
desc = 'locktime'
self.locktime = int(metadata.pop()[3:])
self.coin = metadata.pop(0) if len(metadata) == 6 else 'BTC'
if ':' in self.coin:
self.coin,self.dcoin = self.coin.split(':')
if len(metadata) == 5:
t = metadata.pop(0)
self.chain = (t.lower(),None)[t=='Unknown']
desc = 'metadata (4 items minimum required)'
txid,send_amt,self.timestamp,blockcount = metadata
desc = 'txid in metadata'
self.txid = MMGenTxID(txid,on_fail='raise')
desc = 'send amount in metadata'
self.send_amt = UnknownCoinAmt(send_amt) # temporary, for 'metadata_only'
desc = 'block count in metadata'
self.blockcount = int(blockcount)
if metadata_only:
return
desc = 'send amount in metadata'
self.send_amt = g.proto.coin_amt(send_amt,on_fail='raise')
desc = 'transaction file hex data'
self.check_txfile_hex_data()
desc = f'transaction file {self.hexdata_type} data'
self.parse_txfile_hex_data()
# the following ops will all fail if g.coin doesn't match self.coin
desc = 'coin type in metadata'
assert self.coin == g.coin,self.coin
desc = 'inputs data'
self.inputs = eval_io_data(inputs_data,'inputs')
desc = 'outputs data'
self.outputs = eval_io_data(outputs_data,'outputs')
except Exception as e:
die(2,f'Invalid {desc} in transaction file: {e.args[0]}')
# test doesn't work for Ethereum: test and mainnet addrs have same format
if not self.chain and not self.inputs[0].addr.is_for_chain('testnet'):
self.chain = 'mainnet'
if self.dcoin:
self.resolve_g_token_from_txfile()
g.proto.dcoin = self.dcoin
def process_cmd_arg(self,arg,ad_f,ad_w):
def add_output_chk(addr,amt,err_desc):

224
mmgen/txfile.py Executable file
View file

@ -0,0 +1,224 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2020 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
txfile.py: Transaction file operations for the MMGen suite
"""
from .common import *
from .obj import HexStr,MMGenTxID,UnknownCoinAmt,CoinTxID,MMGenTxLabel
from .tx import MMGenTxOutput,MMGenTxOutputList,MMGenTxInput,MMGenTxInputList
from .exception import MaxFileSizeExceeded
class MMGenTxFile:
def __init__(self,tx):
self.tx = tx
self.chksum = None
self.fmt_data = None
self.filename = None
def parse(self,infile,metadata_only=False,quiet_open=False):
tx = self.tx
def eval_io_data(raw_data,desc):
from ast import literal_eval
try:
d = literal_eval(raw_data)
except:
if desc == 'inputs' and not quiet_open:
ymsg('Warning: transaction data appears to be in old format')
import re
d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
assert type(d) == list,'{} data not a list!'.format(desc)
if not (desc == 'outputs' and g.proto.base_coin == 'ETH'): # ETH txs can have no outputs
assert len(d),'no {}!'.format(desc)
for e in d:
e['amt'] = g.proto.coin_amt(e['amt'])
io,io_list = (
(MMGenTxOutput,MMGenTxOutputList),
(MMGenTxInput,MMGenTxInputList)
)[desc=='inputs']
return io_list(io(**e) for e in d)
tx_data = get_data_from_file(infile,tx.desc+' data',quiet=quiet_open)
try:
desc = 'data'
if len(tx_data) > g.max_tx_file_size:
raise MaxFileSizeExceeded(f'Transaction file size exceeds limit ({g.max_tx_file_size} bytes)')
tx_data = tx_data.splitlines()
assert len(tx_data) >= 5,'number of lines less than 5'
assert len(tx_data[0]) == 6,'invalid length of first line'
self.chksum = HexStr(tx_data.pop(0),on_fail='raise')
assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum'
if len(tx_data) == 6:
assert len(tx_data[-1]) == 64,'invalid coin TxID length'
desc = f'{g.proto.name} TxID'
tx.coin_txid = CoinTxID(tx_data.pop(-1),on_fail='raise')
if len(tx_data) == 5:
# rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up))
assert len(tx_data[-1]) < MMGenTxLabel.max_len*6,'invalid comment length'
c = tx_data.pop(-1)
if c != '-':
desc = 'encoded comment (not base58)'
from .baseconv import baseconv
comment = baseconv.tobytes(c,'b58').decode()
assert comment != False,'invalid comment'
desc = 'comment'
tx.label = MMGenTxLabel(comment,on_fail='raise')
desc = 'number of lines' # four required lines
metadata,tx.hex,inputs_data,outputs_data = tx_data
assert len(metadata) < 100,'invalid metadata length' # rough check
metadata = metadata.split()
if metadata[-1].startswith('LT='):
desc = 'locktime'
tx.locktime = int(metadata.pop()[3:])
tx.coin = metadata.pop(0) if len(metadata) == 6 else 'BTC'
if ':' in tx.coin:
tx.coin,tx.dcoin = tx.coin.split(':')
if len(metadata) == 5:
t = metadata.pop(0)
tx.chain = (t.lower(),None)[t=='Unknown']
desc = 'metadata (4 items minimum required)'
txid,send_amt,tx.timestamp,blockcount = metadata
desc = 'txid in metadata'
tx.txid = MMGenTxID(txid,on_fail='raise')
desc = 'send amount in metadata'
tx.send_amt = UnknownCoinAmt(send_amt) # temporary, for 'metadata_only'
desc = 'block count in metadata'
tx.blockcount = int(blockcount)
if metadata_only:
return
desc = 'send amount in metadata'
tx.send_amt = g.proto.coin_amt(send_amt,on_fail='raise')
desc = 'transaction file hex data'
tx.check_txfile_hex_data()
desc = f'transaction file {tx.hexdata_type} data'
tx.parse_txfile_hex_data()
# the following ops will all fail if g.coin doesn't match tx.coin
desc = 'coin type in metadata'
assert tx.coin == g.coin, tx.coin
desc = 'inputs data'
tx.inputs = eval_io_data(inputs_data,'inputs')
desc = 'outputs data'
tx.outputs = eval_io_data(outputs_data,'outputs')
except Exception as e:
die(2,f'Invalid {desc} in transaction file: {e.args[0]}')
# is_for_chain() is no-op for Ethereum: test and mainnet addrs have same format
if not tx.chain and not tx.inputs[0].addr.is_for_chain('testnet'):
tx.chain = 'mainnet'
if tx.dcoin:
tx.resolve_g_token_from_txfile()
g.proto.dcoin = tx.dcoin
def make_filename(self):
tx = self.tx
def gen_filename():
yield tx.txid
if g.coin != 'BTC':
yield '-' + g.dcoin
yield f'[{tx.send_amt!s}'
if tx.is_replaceable():
yield ',{}'.format(tx.fee_abs2rel(tx.get_fee(),to_unit=tx.fn_fee_unit))
if tx.get_hex_locktime():
yield ',tl={}'.format(tx.get_hex_locktime())
yield ']'
if g.debug_utf8:
yield ''
if g.proto.testnet:
yield '.testnet'
yield '.' + tx.ext
return ''.join(gen_filename())
def format(self):
tx = self.tx
tx.inputs.check_coin_mismatch()
tx.outputs.check_coin_mismatch()
def amt_to_str(d):
return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d}
coin_id = '' if g.coin == 'BTC' else g.coin + ('' if g.coin == g.dcoin else ':'+g.dcoin)
lines = [
'{}{} {} {} {} {}{}'.format(
(coin_id+' ' if coin_id else ''),
tx.chain.upper() if tx.chain else 'Unknown',
tx.txid,
tx.send_amt,
tx.timestamp,
tx.blockcount,
('',' LT={}'.format(tx.locktime))[bool(tx.locktime)]
),
tx.hex,
ascii([amt_to_str(e.__dict__) for e in tx.inputs]),
ascii([amt_to_str(e.__dict__) for e in tx.outputs])
]
if tx.label:
from .baseconv import baseconv
lines.append(baseconv.frombytes(tx.label.encode(),'b58',tostr=True))
if tx.coin_txid:
if not tx.label:
lines.append('-') # keep old tx files backwards compatible
lines.append(tx.coin_txid)
self.chksum = make_chksum_6(' '.join(lines))
fmt_data = '\n'.join([self.chksum] + lines) + '\n'
if len(fmt_data) > g.max_tx_file_size:
raise MaxFileSizeExceeded(f'Transaction file size exceeds limit ({g.max_tx_file_size} bytes)')
return fmt_data
def write(self,
add_desc = '',
ask_write = True,
ask_write_default_yes = False,
ask_tty = True,
ask_overwrite = True ):
if ask_write == False:
ask_write_default_yes = True
if not self.filename:
self.filename = self.make_filename()
if not self.fmt_data:
self.fmt_data = self.format()
write_data_to_file(
outfile = self.filename,
data = self.fmt_data,
desc = self.tx.desc + add_desc,
ask_overwrite = ask_overwrite,
ask_write = ask_write,
ask_tty = ask_tty,
ask_write_default_yes = ask_write_default_yes )

View file

@ -131,6 +131,7 @@ setup(
'mmgen.tool',
'mmgen.tw',
'mmgen.tx',
'mmgen.txfile',
'mmgen.txsign',
'mmgen.util',
'mmgen.wallet',

42
test/unit_tests_d/ut_tx.py Executable file
View file

@ -0,0 +1,42 @@
#!/usr/bin/env python3
"""
test.unit_tests_d.ut_tx: TX unit test for the MMGen suite
"""
import re
from mmgen.common import *
from mmgen.tx import MMGenTX
from mmgen.txfile import MMGenTxFile
class unit_tests:
def txfile(self,name,ut):
qmsg(' Testing TX file operations')
fns = ( # TODO: add altcoin TX files
'0B8D5A[15.31789,14,tl=1320969600].rawtx',
'0C7115[15.86255,14,tl=1320969600].testnet.rawtx',
'460D4D-BCH[10.19764,tl=1320969600].rawtx',
'25EFA3[2.34].testnet.rawtx',
)
for fn in fns:
fpath = os.path.join('test','ref',fn)
tx = MMGenTX(filename=fpath,quiet_open=True)
f = MMGenTxFile(tx)
fn_gen = f.make_filename()
vmsg(f' parsed: {fn_gen}')
assert fn_gen == fn, f'{fn_gen} != {fn}'
text = f.format()
# New in version 3.3: Support for the unicode legacy literal (u'value') was
# reintroduced to simplify the maintenance of dual Python 2.x and 3.x codebases.
# See PEP 414 for more information.
chk = re.subn(r"\bu'",r"'",open(fpath).read())[0] # remove Python2 'u' string prefixes from ref files
nLines = len([i for i in get_ndiff(chk,text) if i.startswith('-')])
assert nLines == 1, f'{nLines} lines differ: only checksum line should differ'
break # FIXME - test BCH, testnet
qmsg(' OK')
return True