tx.py: string formatting, whitespace; minor code rewrites

This commit is contained in:
The MMGen Project 2020-05-18 08:34:36 +00:00
commit 7a0d4bf298
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
4 changed files with 212 additions and 181 deletions

View file

@ -22,6 +22,7 @@ altcoins.eth.tx: Ethereum transaction classes for the MMGen suite
import json
from mmgen.common import *
from mmgen.exception import TransactionChainMismatch
from mmgen.obj import *
from mmgen.tx import MMGenTX,MMGenBumpTX,MMGenSplitTX,MMGenTxForSigning
@ -208,7 +209,7 @@ class EthereumMMGenTX(MMGenTX):
return ret
def convert_and_check_fee(self,tx_fee,desc='Missing description'):
abs_fee = self.process_fee_spec(tx_fee,None,on_fail='return')
abs_fee = self.process_fee_spec(tx_fee,None)
if abs_fee == False:
return False
elif not self.disable_fee_check and (abs_fee > g.proto.max_tx_fee):
@ -302,7 +303,7 @@ class EthereumMMGenTX(MMGenTX):
if not self.marked_signed():
die(1,'Transaction is not signed!')
self.check_correct_chain(on_fail='die')
self.check_correct_chain()
fee = self.fee_rel2abs(self.txobj['gasPrice'].toWei())
@ -420,7 +421,9 @@ class EthereumMMGenTxForSigning(EthereumMMGenTX,MMGenTxForSigning):
msg('Transaction is already signed!')
return False
if not self.check_correct_chain(on_fail='return'):
try:
self.check_correct_chain()
except TransactionChainMismatch:
return False
msg_r('Signing transaction{}...'.format(tx_num_str))
@ -503,12 +506,11 @@ class EthereumTokenMMGenTxForSigning(EthereumTokenMMGenTX,EthereumMMGenTxForSign
if g.token.upper() == self.dcoin:
g.token = d['token_addr']
elif g.token != d['token_addr']:
die(1,"""
{p!r}: invalid --token parameter for {t} {n} token transaction file\nPlease use '--token={t}'
""").strip().format(
p = g.token,
t = self.dcoin,
n = g.proto.name )
die(1,
"{!r}: invalid --token parameter for {t} {} token transaction file\nPlease use '--token={t}'".format(
g.token,
g.proto.name,
t = self.dcoin ))
def parse_txfile_hex_data(self):
d = EthereumMMGenTxForSigning.parse_txfile_hex_data(self)

View file

@ -43,6 +43,7 @@ class TokenNotInWallet(Exception): mmcode = 2
class BadTwComment(Exception): mmcode = 2
class BaseConversionError(Exception): mmcode = 2
class BaseConversionPadError(Exception): mmcode = 2
class TransactionChainMismatch(Exception):mmcode = 2
# 3: yellow hl, 'MMGen Error' + exception + message
class RPCFailure(Exception): mmcode = 3

View file

@ -371,7 +371,7 @@ def opt_is_tx_fee(key,val,desc): # 'key' must remain a placeholder
tx = MMGenTX()
# Size of 224 is just a ball-park figure to eliminate the most extreme cases at startup
# This check will be performed again once we know the true size
ret = tx.process_fee_spec(val,224,on_fail='return')
ret = tx.process_fee_spec(val,224)
if ret == False:
raise UserOptError('{!r}: invalid {}\n(not a {} amount or {} specification)'.format(

View file

@ -147,7 +147,8 @@ class DeserializedTX(dict,MMGenObject):
def readVInt(skip=False):
s = tx[self.idx]
self.idx += 1
if not skip: self.raw_tx.append(s)
if not skip:
self.raw_tx.append(s)
vbytes_len = 1 if s < 0xfd else 2 if s == 0xfd else 4 if s == 0xfe else 8
@ -156,7 +157,8 @@ class DeserializedTX(dict,MMGenObject):
else:
vbytes = tx[self.idx:self.idx+vbytes_len]
self.idx += vbytes_len
if not skip: self.raw_tx += vbytes
if not skip:
self.raw_tx += vbytes
return int(vbytes[::-1].hex(),16)
def make_txid(tx_bytes):
@ -245,15 +247,14 @@ class MMGenTxInputList(list,MMGenObject):
def convert_coin(self,verbose=False):
if verbose:
msg('{}:'.format(self.desc.capitalize()))
msg(f'{self.desc}:')
for i in self:
d = i.__dict__
d['amt'] = g.proto.coin_amt(d['amt'])
setattr(i,'amt',g.proto.coin_amt(i.amt))
def check_coin_mismatch(self):
for i in self:
if type(i.amt) != g.proto.coin_amt:
die(2,'Coin mismatch in transaction: amount {} not of type {}!'.format(i.amt,g.proto.coin_amt))
die(2,f'Coin mismatch in transaction: amount {i.amt} not of type {g.proto.coin_amt}!')
# Lexicographical Indexing of Transaction Inputs and Outputs
# https://github.com/bitcoin/bips/blob/master/bip-0069.mediawiki
@ -297,16 +298,17 @@ class MMGenTX(MMGenObject):
msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'
msg_low_coin = 'Selected outputs insufficient to fund this transaction ({} {} needed)'
msg_no_change_output = """
ERROR: No change address specified. If you wish to create a transaction with
only one output, specify a single output address with no {} amount
""".strip()
msg_non_mmgen_inputs = """
NOTE: This transaction includes non-{pnm} inputs, which makes the signing
process more complicated. When signing the transaction, keys for non-{pnm}
inputs must be supplied to '{pnl}-txsign' in a file with the '--keys-from-file'
option.
Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_name.lower())
msg_no_change_output = fmt("""
ERROR: No change address specified. If you wish to create a transaction with
only one output, specify a single output address with no {} amount
""").strip()
msg_non_mmgen_inputs = fmt(f"""
NOTE: This transaction includes non-{g.proj_name} inputs, which makes the signing
process more complicated. When signing the transaction, keys for non-{g.proj_name}
inputs must be supplied to '{g.proj_name.lower()}-txsign' in a file with the '--keys-from-file'
option.
Selected non-{g.proj_name} inputs: {{}}
""").strip()
def __init__(self,filename=None,metadata_only=False,caller=None,quiet_open=False,data=None,tw=None):
if data:
@ -341,17 +343,14 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
self.check_sigs() # marks the tx as signed
# repeat with sign and send, because coin daemon could be restarted
self.check_correct_chain(on_fail='die')
self.check_correct_chain()
def check_correct_chain(self,on_fail='return'):
assert on_fail in ('return','die'),"'{}': invalid value for 'on_fail'".format(on_fail)
m = 'Transaction is for {}, but current chain is {}!'.format(self.chain,g.chain)
def check_correct_chain(self):
bad = self.chain and g.chain and self.chain != g.chain
if bad and hasattr(g.proto,'chain_name'):
bad = self.chain != g.proto.chain_name
if bad:
msg(m) if on_fail == 'return' else die(2,m)
return not bad
raise TransactionChainMismatch(f'Transaction is for {self.chain}, but current chain is {g.chain}!')
def add_output(self,coinaddr,amt,is_chg=None):
self.outputs.append(MMGenTxOutput(addr=coinaddr,amt=amt,is_chg=is_chg))
@ -380,8 +379,10 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
self.outputs.pop(idx)
def sum_outputs(self,exclude=None):
olist = self.outputs if exclude == None else \
self.outputs[:exclude] + self.outputs[exclude+1:]
if exclude == None:
olist = self.outputs
else:
olist = self.outputs[:exclude] + self.outputs[exclude+1:]
if not olist:
return g.proto.coin_amt('0')
return g.proto.coin_amt(sum(e.amt for e in olist))
@ -394,13 +395,14 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
for e in self.outputs:
if e.addr and e.addr in d:
e.mmid,f = d[e.addr]
if f: e.label = f
if f:
e.label = f
def check_dup_addrs(self,io_str):
assert io_str in ('inputs','outputs')
addrs = [e.addr for e in getattr(self,io_str)]
if len(addrs) != len(set(addrs)):
die(2,'{}: duplicate address in transaction {}'.format(attr,io_str))
die(2,f'{addrs}: duplicate address in transaction {io_str}')
def update_txid(self):
self.txid = MMGenTxID(make_chksum_6(bytes.fromhex(self.hex)).upper())
@ -413,7 +415,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
self.hex = HexStr(await g.rpc.call('createrawtransaction',i,o))
self.update_txid()
def print_contract_addr(self): pass
def print_contract_addr(self):
pass
# returns true if comment added or changed
def add_comment(self,infile=None):
@ -445,19 +448,20 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
est_vsize = self.estimate_size()
d = tx_decoded
vsize = d['vsize'] if 'vsize' in d else d['size']
vmsg('\nVsize: {} (true) {} (estimated)'.format(vsize,est_vsize))
m1 = 'Estimated transaction vsize is {:1.2f} times the true vsize\n'
m2 = 'Your transaction fee estimates will be inaccurate\n'
m3 = 'Please re-create and re-sign the transaction using the option --vsize-adj={:1.2f}'
# allow for 5% error
vmsg(f'\nVsize: {vsize} (true) {est_vsize} (estimated)')
ratio = float(est_vsize) / vsize
if not (0.95 < ratio < 1.05):
raise BadTxSizeEstimate((m1+m2+m3).format(ratio,1/ratio))
if not (0.95 < ratio < 1.05): # allow for 5% error
raise BadTxSizeEstimate(fmt(f"""
Estimated transaction vsize is {ratio:1.2f} times the true vsize
Your transaction fee estimates will be inaccurate
Please re-create and re-sign the transaction using the option --vsize-adj={1/ratio:1.2f}
""").strip())
# https://bitcoin.stackexchange.com/questions/1195/how-to-calculate-transaction-size-before-sending
# 180: uncompressed, 148: compressed
def estimate_size_old(self):
if not self.inputs or not self.outputs: return None
if not self.inputs or not self.outputs:
return None
return len(self.inputs)*180 + len(self.outputs)*34 + 10
# https://bitcoincore.org/en/segwit_wallet_dev/
@ -466,7 +470,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
# TODO: results differ slightly from actual transaction size
def estimate_size(self):
if not self.inputs or not self.outputs: return None
if not self.inputs or not self.outputs:
return None
sig_size = 72 # sig in DER format
pubkey_size_uncompressed = 65
@ -502,7 +507,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
# A non-witness program txin MUST be associated with an empty witness field, represented
# by a 0x00. If all txins are not witness program, a transaction's wtxid is equal to its txid.
def get_witness_size():
if not self.has_segwit_inputs(): return 0
if not self.has_segwit_inputs():
return 0
wf_size = 1 + 1 + sig_size + 1 + pubkey_size_compressed # vInt vInt sig vInt pubkey = 108
return sum((1,wf_size)[bool(i.mmid) and i.mmid.mmtype in ('S','B')] for i in self.inputs)
@ -563,29 +569,27 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
f = fee_per_kb * opt.tx_fee_adj * tx_size / 1024
ret = g.proto.coin_amt(f,from_decimal=True)
if opt.verbose:
msg('{} fee for {} confirmations: {} {}/kB'.format(fe_type.upper(),opt.tx_confs,fee_per_kb,g.coin))
msg('TX size (estimated): {} bytes'.format(tx_size))
msg('Fee adjustment factor: {}'.format(opt.tx_fee_adj))
msg('Absolute fee (fee_per_kb * adj_factor * tx_size / 1024): {} {}'.format(ret,g.coin))
msg(fmt(f"""
{fe_type.upper()} fee for {opt.tx_confs} confirmations: {fee_per_kb} {g.coin}/kB
TX size (estimated): {tx_size} bytes
Fee adjustment factor: {opt.tx_fee_adj}
Absolute fee (fee_per_kb * adj_factor * tx_size / 1024): {ret} {g.coin}
""").strip())
return ret
def convert_and_check_fee(self,tx_fee,desc='Missing description'):
abs_fee = self.process_fee_spec(tx_fee,self.estimate_size(),on_fail='return')
if abs_fee == None:
# we shouldn't be calling this if tx size is unknown
m = "'{}': cannot convert {} to {} because transaction size is unknown"
assert False, m.format(tx_fee,self.rel_fee_desc,g.coin)
abs_fee = self.process_fee_spec(tx_fee,self.estimate_size())
if abs_fee == None: # we shouldn't be calling this method if tx size is unknown
raise ValueError(
f'{tx_fee}: cannot convert {self.rel_fee_desc} to {g.coin} because transaction size is unknown')
elif abs_fee == False:
m = "'{}': invalid TX fee (not a {} amount or {} specification)"
msg(m.format(tx_fee,g.coin,self.rel_fee_desc))
msg(f'{tx_fee!r}: invalid TX fee (not a {g.coin} amount or {self.rel_fee_desc} specification)')
return False
elif abs_fee > g.proto.max_tx_fee:
m = '{} {c}: {} fee too large (maximum fee: {} {c})'
msg(m.format(abs_fee,desc,g.proto.max_tx_fee,c=g.coin))
msg(f'{abs_fee} {g.coin}: {desc} fee too large (maximum fee: {g.proto.max_tx_fee} {g.coin})')
return False
elif abs_fee < self.relay_fee:
m = '{} {c}: {} fee too small (below relay fee of {} {c})'
msg(m.format(str(abs_fee),desc,str(self.relay_fee),c=g.coin))
msg(f'{abs_fee} {g.coin}: {desc} fee too small (less than relay fee of {self.relay_fee} {g.coin})')
return False
else:
return abs_fee
@ -594,8 +598,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
# given tx size and absolute fee or fee spec, return absolute fee
# relative fee is N+<first letter of unit name>
def process_fee_spec(self,tx_fee,tx_size,on_fail='throw'):
def process_fee_spec(self,tx_fee,tx_size):
if g.proto.coin_amt(tx_fee,on_fail='silent'):
return g.proto.coin_amt(tx_fee)
else:
@ -605,11 +608,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
if pat.match(tx_fee):
amt,unit = pat.match(tx_fee).groups()
return self.convert_fee_spec(tx_size,units,amt,unit)
if on_fail == 'return':
return False
elif on_fail == 'throw':
assert False, "'{}': invalid tx-fee argument".format(tx_fee)
return False
def get_usr_fee_interactive(self,tx_fee=None,desc='Starting'):
abs_fee = None
@ -617,17 +616,19 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
if tx_fee:
abs_fee = self.convert_and_check_fee(tx_fee,desc)
if abs_fee:
p = '{} TX fee{}: {}{} {} ({} {})\n'.format(
prompt = '{} TX fee{}: {}{} {} ({} {})\n'.format(
desc,
('',' (after {}X adjustment)'.format(opt.tx_fee_adj))[
opt.tx_fee_adj != 1 and desc.startswith('Network-estimated')],
(f' (after {opt.tx_fee_adj}X adjustment)'
if opt.tx_fee_adj != 1 and desc.startswith('Network-estimated')
else ''),
('','')[self.fee_is_approximate],
abs_fee.hl(),
g.coin,
pink(str(self.fee_abs2rel(abs_fee))),
self.rel_fee_disp)
if opt.yes or keypress_confirm(p+'OK?',default_yes=True):
if opt.yes: msg(p)
if opt.yes or keypress_confirm(prompt+'OK?',default_yes=True):
if opt.yes:
msg(prompt)
return abs_fee
tx_fee = my_raw_input(self.usr_fee_prompt)
desc = 'User-selected'
@ -638,7 +639,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
desc = 'User-selected'
start_fee = opt.tx_fee
else:
desc = 'Network-estimated (mode: {})'.format(opt.fee_estimate_mode.upper())
desc = f'Network-estimated (mode: {opt.fee_estimate_mode.upper()})'
fee_per_kb,fe_type = await self.get_rel_fee_from_network()
if fee_per_kb < 0:
@ -653,7 +654,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
def delete_attrs(self,desc,attr):
for e in getattr(self,desc):
if hasattr(e,attr): delattr(e,attr)
if hasattr(e,attr):
delattr(e,attr)
# inputs methods
def copy_inputs_from_tw(self,tw_unspent_data):
@ -710,7 +712,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
from .baseconv import baseconv
lines.append(baseconv.frombytes(self.label.encode(),'b58',tostr=True))
if self.coin_txid:
if not self.label: lines.append('-') # keep old tx files backwards compatible
if not self.label:
lines.append('-') # keep old tx files backwards compatible
lines.append(self.coin_txid)
self.chksum = make_chksum_6(' '.join(lines))
self.fmt_data = '\n'.join([self.chksum] + lines)+'\n'
@ -810,7 +813,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
async def get_status(self,status=False):
class r(object): pass
class r(object):
pass
async def is_in_wallet():
try: ret = await g.rpc.call('gettransaction',self.coin_txid)
@ -888,7 +892,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
if not self.marked_signed():
die(1,'Transaction is not signed!')
self.check_correct_chain(on_fail='die')
self.check_correct_chain()
self.check_pubkey_scripts()
@ -948,12 +952,6 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
self.add_blockcount()
return True
def write_txid_to_file(self,ask_write=False,ask_write_default_yes=True):
fn = '{}[{}].{}'.format(self.txid,self.send_amt,self.txid_ext)
write_data_to_file(fn,self.coin_txid+'\n','transaction ID',
ask_write=ask_write,
ask_write_default_yes=ask_write_default_yes)
def create_fn(self):
tl = self.get_hex_locktime()
tn = ('','.testnet')[g.proto.testnet]
@ -975,9 +973,14 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
ask_tty=True,
ask_overwrite=True):
if ask_write == False: ask_write_default_yes = True
if not self.fmt_data: self.format()
if not self.fn: self.create_fn()
if ask_write == False:
ask_write_default_yes = True
if not self.fmt_data:
self.format()
if not self.fn:
self.create_fn()
write_data_to_file(self.fn,self.fmt_data,self.desc+add_desc,
ask_overwrite=ask_overwrite,
@ -1002,7 +1005,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
def view(self,pager=False,pause=True,terse=False):
o = self.format_view(terse=terse)
if pager: do_pager(o)
if pager:
do_pager(o)
else:
msg_r(o)
from .term import get_char
@ -1085,11 +1089,11 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
return g.proto.coin_amt(self.get_fee_from_tx()).hl()
def format_view_verbose_footer(self):
ts = len(self.hex)//2 if self.hex else 'unknown'
out = 'Transaction size: Vsize {} (estimated), Total {}'.format(self.estimate_size(),ts)
tsize = len(self.hex)//2 if self.hex else 'unknown'
out = f'Transaction size: Vsize {self.estimate_size()} (estimated), Total {tsize}'
if self.marked_signed():
ws = DeserializedTX(self.hex)['witness_size']
out += ', Base {}, Witness {}'.format(ts-ws,ws)
wsize = DeserializedTX(self.hex)['witness_size']
out += f', Base {tsize-wsize}, Witness {wsize}'
return out + '\n'
def format_view(self,terse=False,sort=dfl_view_sort_order):
@ -1107,39 +1111,45 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
sel_f = lambda o: len(o.mmid) + (2,8)[bool(o.is_chg)] # + len(' (chg)')
return max(max([sel_f(o) for o in io if o.mmid] or [0]),len(nonmm_str))
nonmm_str = '(non-{} address)'.format(g.proj_name)
nonmm_str = f'(non-{g.proj_name} address)'
max_mmwid = max(get_max_mmwid(self.inputs),get_max_mmwid(self.outputs))
out = (self.txview_hdr_fs,self.txview_hdr_fs_short)[bool(terse)].format(
i=self.txid.hl(),
a=self.send_amt.hl(),
c=g.dcoin,
t=self.timestamp,
r=(red('False'),green('True'))[self.is_replaceable()],
s=self.marked_signed(color=True),
l=(green('None'),orange(strfmt_locktime(self.locktime,terse=True)))[bool(self.locktime)])
def gen_view():
yield (self.txview_hdr_fs_short if terse else self.txview_hdr_fs).format(
i = self.txid.hl(),
a = self.send_amt.hl(),
c = g.dcoin,
t = self.timestamp,
r = (red('False'),green('True'))[self.is_replaceable()],
s = self.marked_signed(color=True),
l = (green('None'),orange(strfmt_locktime(self.locktime,terse=True)))[bool(self.locktime)] )
if self.chain != 'mainnet':
out += green('Chain: {}\n'.format(self.chain.upper()))
if self.coin_txid:
out += '{} TxID: {}\n'.format(g.coin,self.coin_txid.hl())
enl = ('\n','')[bool(terse)]
out += enl
if self.label:
out += 'Comment: {}\n{}'.format(self.label.hl(),enl)
if self.chain != 'mainnet':
yield green(f'Chain: {self.chain.upper()}') + '\n'
out += self.format_view_body(blockcount,nonmm_str,max_mmwid,enl,terse=terse,sort=sort)
if self.coin_txid:
yield f'{g.coin} TxID: {self.coin_txid.hl()}\n'
out += (self.txview_ftr_fs,self.txview_ftr_fs_short)[bool(terse)].format(
i=self.sum_inputs().hl(),
o=self.sum_outputs().hl(),
a=self.format_view_abs_fee(),
r=self.format_view_rel_fee(terse),
d=g.dcoin,c=g.coin)
enl = ('\n','')[bool(terse)]
yield enl
if opt.verbose: out += self.format_view_verbose_footer()
if self.label:
yield f'Comment: {self.label.hl()}\n{enl}'
return out # TX label might contain non-ascii chars
yield self.format_view_body(blockcount,nonmm_str,max_mmwid,enl,terse=terse,sort=sort)
yield (self.txview_ftr_fs_short if terse else self.txview_ftr_fs).format(
i = self.sum_inputs().hl(),
o = self.sum_outputs().hl(),
a = self.format_view_abs_fee(),
r = self.format_view_rel_fee(terse),
d = g.dcoin,
c = g.coin )
if opt.verbose:
yield self.format_view_verbose_footer()
return ''.join(gen_view()) # TX label might contain non-ascii chars
def check_txfile_hex_data(self):
self.hex = HexStr(self.hex,on_fail='raise')
@ -1161,7 +1171,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
assert type(d) == list,'{} data not a list!'.format(desc)
if not (desc == 'outputs' and g.proto.base_coin == 'ETH'): # ETH txs can have no outputs
assert len(d),'no {}!'.format(desc)
for e in d: e['amt'] = g.proto.coin_amt(e['amt'])
for e in d:
e['amt'] = g.proto.coin_amt(e['amt'])
io,io_list = (
(MMGenTxOutput,MMGenTxOutputList),
(MMGenTxInput,MMGenTxInputList)
@ -1224,7 +1235,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
desc = 'block count in metadata'
self.blockcount = int(blockcount)
if metadata_only: return
if metadata_only:
return
desc = 'send amount in metadata'
self.send_amt = g.proto.coin_amt(send_amt,on_fail='raise')
@ -1241,7 +1253,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
desc = 'outputs data'
self.outputs = eval_io_data(outputs_data,'outputs')
except Exception as e:
die(2,'Invalid {} in transaction file: {}'.format(desc,e.args[0]))
die(2,f'Invalid {desc} in transaction file: {e.args[0]}')
# test doesn't work for Ethereum: test and mainnet addrs have same format
if not self.chain and not self.inputs[0].addr.is_for_chain('testnet'):
@ -1260,7 +1272,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
coin_addr = mmaddr2coinaddr(addr,ad_w,ad_f) if is_mmgen_id(addr) else CoinAddr(addr)
self.add_output(coin_addr,g.proto.coin_amt(amt or '0'),is_chg=not amt)
else:
die(2,"{}: invalid {} '{}'".format(addr,err_desc,','.join((addr,amt)) if amt else addr))
die(2,f'{addr}: invalid {err_desc} {{!r}}'.format(f'{addr},{amt}' if amt else addr))
if ',' in arg:
addr,amt = arg.split(',',1)
@ -1270,15 +1282,16 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
def process_cmd_args(self,cmd_args,ad_f,ad_w):
for a in cmd_args: self.process_cmd_arg(a,ad_f,ad_w)
for a in cmd_args:
self.process_cmd_arg(a,ad_f,ad_w)
if self.get_chg_output_idx() == None:
die(2,( 'ERROR: No change output specified',
self.msg_no_change_output.format(g.dcoin))[len(self.outputs) == 1])
if not segwit_is_active() and self.has_segwit_outputs():
fs = '{} Segwit address requested on the command line, but Segwit is not active on this chain'
rdie(2,fs.format(g.proj_name))
rdie(2,f'{g.proj_name} Segwit address requested on the command line, '
+ 'but Segwit is not active on this chain')
if not self.outputs:
die(2,'At least one output must be specified on the command line')
@ -1328,34 +1341,35 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
msg(self.msg_low_coin.format(g.proto.coin_amt(-change_amt).hl(),g.coin))
def final_inputs_ok_msg(self,change_amt):
m = 'Transaction produces {} {} in change'
return m.format(g.proto.coin_amt(change_amt).hl(),g.coin)
return f'Transaction produces {g.proto.coin_amt(change_amt).hl()} {g.coin} in change'
def select_unspent_cmdline(self,unspent):
sel_nums = []
for i in opt.inputs.split(','):
ls = len(sel_nums)
if is_mmgen_id(i):
for j in range(len(unspent)):
if unspent[j].twmmid == i:
sel_nums.append(j+1)
elif is_coin_addr(i):
for j in range(len(unspent)):
if unspent[j].addr == i:
sel_nums.append(j+1)
else:
die(1,"'{}': not an MMGen ID or coin address".format(i))
ldiff = len(sel_nums) - ls
if ldiff:
sel_inputs = ','.join([str(i) for i in sel_nums[-ldiff:]])
ul = unspent[sel_nums[-1]-1]
mmid_disp = ' (' + ul.twmmid + ')' if ul.twmmid.type == 'mmgen' else ''
msg('Adding input{}: {} {}{}'.format(suf(ldiff),sel_inputs,ul.addr,mmid_disp))
else:
die(1,"'{}': address not found in tracking wallet".format(i))
def idx2num(idx):
uo = unspent[idx]
mmid_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
msg(f'Adding input: {idx + 1} {uo.addr}{mmid_disp}')
return idx + 1
return set(sel_nums) # silently discard duplicates
def get_uo_nums():
for addr in opt.inputs.split(','):
if is_mmgen_id(addr):
attr = 'twmmid'
elif is_coin_addr(addr):
attr = 'addr'
else:
die(1,f'{addr!r}: not an MMGen ID or {g.coin} address')
found = False
for idx in range(len(unspent)):
if getattr(unspent[idx],attr) == addr:
yield idx2num(idx)
found = True
if not found:
die(1,f'{addr!r}: address not found in tracking wallet')
return set(get_uo_nums()) # silently discard duplicates
async def get_cmdline_input_addrs(self):
# Bitcoin full node, call doesn't go to the network, so just call listunspent with addrs=[]
@ -1367,7 +1381,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
us_f = self.select_unspent_cmdline if opt.inputs else self.select_unspent
sel_nums = us_f(self.twuo.unspent)
msg('Selected output{}: {}'.format(suf(sel_nums),' '.join(map(str,sel_nums))))
msg(f'Selected output{suf(sel_nums)}: {{}}'.format(' '.join(str(n) for n in sel_nums)))
sel_unspent = self.twuo.MMGenTwOutputList([self.twuo.unspent[i-1] for i in sel_nums])
inputs_sum = sum(s.amt for s in sel_unspent)
@ -1390,7 +1404,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
if change_amt >= 0:
p = self.final_inputs_ok_msg(change_amt)
if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
if opt.yes: msg(p)
if opt.yes:
msg(p)
return change_amt
else:
self.warn_insufficient_chg(change_amt)
@ -1434,9 +1449,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
self.send_amt = self.sum_outputs()
msg('Total amount to spend: {}'.format(
('Unknown','{} {}'.format(self.send_amt.hl(),g.dcoin))[bool(self.send_amt)]
))
msg_r('Total amount to spend: ')
msg(f'{self.send_amt.hl()} {g.dcoin}' if self.send_amt else 'Unknown')
change_amt = await self.get_inputs_from_user()
@ -1447,8 +1461,10 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
self.inputs.sort_bip69()
self.outputs.sort_bip69()
# do this only after inputs are sorted
if opt.rbf: self.inputs[0].sequence = g.max_int - 2 # handles the locktime case too
elif locktime: self.inputs[0].sequence = g.max_int - 1
if opt.rbf:
self.inputs[0].sequence = g.max_int - 2 # handles the locktime case too
elif locktime:
self.inputs[0].sequence = g.max_int - 1
if not opt.yes:
self.add_comment() # edits an existing comment
@ -1456,7 +1472,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam
await self.create_raw() # creates self.hex, self.txid
if g.proto.base_proto == 'Bitcoin' and locktime:
msg('Setting nlocktime to {}!'.format(strfmt_locktime(locktime)))
msg(f'Setting nlocktime to {strfmt_locktime(locktime)}!')
self.set_hex_locktime(locktime)
self.update_txid()
self.locktime = locktime
@ -1487,16 +1503,18 @@ class MMGenTxForSigning(MMGenTX):
msg('Transaction is already signed!')
return False
if not self.check_correct_chain(on_fail='return'):
try:
self.check_correct_chain()
except TransactionChainMismatch:
return False
if (self.has_segwit_inputs() or self.has_segwit_outputs()) and not g.proto.cap('segwit'):
ymsg("TX has Segwit inputs or outputs, but {} doesn't support Segwit!".format(g.coin))
ymsg(f"TX has Segwit inputs or outputs, but {g.coin} doesn't support Segwit!")
return False
self.check_pubkey_scripts()
qmsg('Passing {} key{} to {}'.format(len(keys),suf(keys),g.proto.daemon_name))
qmsg(f'Passing {len(keys)} key{suf(keys)} to {g.proto.daemon_name}')
if self.has_segwit_inputs():
from .addr import KeyGenerator,AddrGenerator
@ -1513,12 +1531,12 @@ class MMGenTxForSigning(MMGenTX):
e['redeemScript'] = ag.to_segwit_redeem_script(kg.to_pubhex(keydict[d.addr]))
sig_data.append(e)
msg_r('Signing transaction{}...'.format(tx_num_str))
msg_r(f'Signing transaction{tx_num_str}...')
wifs = [d.sec.wif for d in keys]
try:
args = (
('signrawtransaction',self.hex,sig_data,wifs,g.proto.sighash_type),
('signrawtransaction', self.hex,sig_data,wifs,g.proto.sighash_type),
('signrawtransactionwithkey',self.hex,wifs,sig_data,g.proto.sighash_type)
)['sign_with_key' in g.rpc.caps]
ret = await g.rpc.call(*args)
@ -1562,22 +1580,23 @@ class MMGenBumpTX(MMGenTxForSigning):
super().__init__(filename,tw=tw)
if not self.is_replaceable():
die(1,"Transaction '{}' is not replaceable".format(self.txid))
die(1,f'Transaction {self.txid} is not replaceable')
# If sending, require tx to be signed
if send:
if not self.marked_signed():
die(1,"File '{}' is not a signed {} transaction file".format(filename,g.proj_name))
die(1,'File {filename!r} is not a signed {g.proj_name} transaction file')
if not self.coin_txid:
die(1,"Transaction '{}' was not broadcast to the network".format(self.txid))
die(1,'Transaction {self.txid!r} was not broadcast to the network')
self.coin_txid = ''
self.mark_raw()
def check_bumpable(self):
if not [o.amt for o in self.outputs if o.amt >= self.min_fee]:
die(1,'Transaction cannot be bumped.' +
'\nAll outputs have less than the minimum fee ({} {})'.format(self.min_fee,g.coin))
die(1,
'Transaction cannot be bumped.\n' +
f'All outputs contain less than the minimum fee ({self.min_fee} {g.coin})')
def choose_output(self):
chg_idx = self.get_chg_output_idx()
@ -1585,8 +1604,7 @@ class MMGenBumpTX(MMGenTxForSigning):
def check_sufficient_funds(o_amt):
if o_amt < self.min_fee:
msg('Minimum fee ({} {c}) is greater than output amount ({} {c})'.format(
self.min_fee,o_amt,c=g.coin))
msg(f'Minimum fee ({self.min_fee} {g.coin}) is greater than output amount ({o_amt} {g.coin})')
return False
return True
@ -1604,21 +1622,21 @@ class MMGenBumpTX(MMGenTxForSigning):
else:
reply,init_reply = init_reply,None
if chg_idx == None and not is_int(reply):
msg("Output must be an integer")
msg('Output must be an integer')
elif chg_idx != None and not is_int(reply) and reply != 'c':
msg("Output must be an integer, or 'c' for the change output")
else:
idx = chg_idx if reply == 'c' else (int(reply) - 1)
if idx < 0 or idx >= len(self.outputs):
msg('Output must be in the range 1-{}'.format(len(self.outputs)))
msg(f'Output must be in the range 1-{len(self.outputs)}')
else:
o_amt = self.outputs[idx].amt
cs = ('',' (change output)')[chg_idx == idx]
p = 'Fee will be deducted from output {}{} ({} {})'.format(idx+1,cs,o_amt,g.coin)
cm = ' (change output)' if chg_idx == idx else ''
prompt = f'Fee will be deducted from output {idx+1}{cm} ({o_amt} {g.coin})'
if check_sufficient_funds(o_amt):
if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
if opt.yes or keypress_confirm(prompt+'. OK?',default_yes=True):
if opt.yes:
msg(p)
msg(prompt)
self.bump_output_idx = idx
return idx
@ -1634,11 +1652,20 @@ class MMGenBumpTX(MMGenTxForSigning):
ret = super().convert_and_check_fee(tx_fee,desc)
if ret < self.min_fee:
msg('{} {c}: {} fee too small. Minimum fee: {} {c} ({} {})'.format(
ret,desc,self.min_fee,self.fee_abs2rel(self.min_fee.hl()),self.rel_fee_desc,c=g.coin))
ret.hl(),
desc,
self.min_fee,
self.fee_abs2rel(self.min_fee.hl()),
self.rel_fee_desc,
c = g.coin ))
return False
output_amt = self.outputs[self.bump_output_idx].amt
if ret >= output_amt:
msg('{} {c}: {} fee too large. Maximum fee: <{} {c}'.format(ret.hl(),desc,output_amt.hl(),c=g.coin))
msg('{} {c}: {} fee too large. Maximum fee: <{} {c}'.format(
ret.hl(),
desc,
output_amt.hl(),
c = g.coin ))
return False
return ret
@ -1679,7 +1706,8 @@ class MMGenSplitTX(MMGenTX):
if change_amt >= 0:
p = 'Transaction produces {} {} in change'.format(change_amt.hl(),g.coin)
if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
if opt.yes: msg(p)
if opt.yes:
msg(p)
break
else:
self.warn_insufficient_chg(change_amt)