From 7a0d4bf298fcf32041649bae23f66ec5b76d4cdf Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Mon, 18 May 2020 08:34:36 +0000 Subject: [PATCH] tx.py: string formatting, whitespace; minor code rewrites --- mmgen/altcoins/eth/tx.py | 20 ++- mmgen/exception.py | 1 + mmgen/opts.py | 2 +- mmgen/tx.py | 366 +++++++++++++++++++++------------------ 4 files changed, 210 insertions(+), 179 deletions(-) diff --git a/mmgen/altcoins/eth/tx.py b/mmgen/altcoins/eth/tx.py index 9bd26aa6..156eda62 100755 --- a/mmgen/altcoins/eth/tx.py +++ b/mmgen/altcoins/eth/tx.py @@ -22,6 +22,7 @@ altcoins.eth.tx: Ethereum transaction classes for the MMGen suite import json from mmgen.common import * +from mmgen.exception import TransactionChainMismatch from mmgen.obj import * from mmgen.tx import MMGenTX,MMGenBumpTX,MMGenSplitTX,MMGenTxForSigning @@ -208,7 +209,7 @@ class EthereumMMGenTX(MMGenTX): return ret def convert_and_check_fee(self,tx_fee,desc='Missing description'): - abs_fee = self.process_fee_spec(tx_fee,None,on_fail='return') + abs_fee = self.process_fee_spec(tx_fee,None) if abs_fee == False: return False elif not self.disable_fee_check and (abs_fee > g.proto.max_tx_fee): @@ -302,7 +303,7 @@ class EthereumMMGenTX(MMGenTX): if not self.marked_signed(): die(1,'Transaction is not signed!') - self.check_correct_chain(on_fail='die') + self.check_correct_chain() fee = self.fee_rel2abs(self.txobj['gasPrice'].toWei()) @@ -420,7 +421,9 @@ class EthereumMMGenTxForSigning(EthereumMMGenTX,MMGenTxForSigning): msg('Transaction is already signed!') return False - if not self.check_correct_chain(on_fail='return'): + try: + self.check_correct_chain() + except TransactionChainMismatch: return False msg_r('Signing transaction{}...'.format(tx_num_str)) @@ -503,12 +506,11 @@ class EthereumTokenMMGenTxForSigning(EthereumTokenMMGenTX,EthereumMMGenTxForSign if g.token.upper() == self.dcoin: g.token = d['token_addr'] elif g.token != d['token_addr']: - die(1,""" - {p!r}: invalid --token parameter for {t} {n} token transaction file\nPlease use '--token={t}' - """).strip().format( - p = g.token, - t = self.dcoin, - n = g.proto.name ) + die(1, + "{!r}: invalid --token parameter for {t} {} token transaction file\nPlease use '--token={t}'".format( + g.token, + g.proto.name, + t = self.dcoin )) def parse_txfile_hex_data(self): d = EthereumMMGenTxForSigning.parse_txfile_hex_data(self) diff --git a/mmgen/exception.py b/mmgen/exception.py index d6fb4a5c..575570d7 100755 --- a/mmgen/exception.py +++ b/mmgen/exception.py @@ -43,6 +43,7 @@ class TokenNotInWallet(Exception): mmcode = 2 class BadTwComment(Exception): mmcode = 2 class BaseConversionError(Exception): mmcode = 2 class BaseConversionPadError(Exception): mmcode = 2 +class TransactionChainMismatch(Exception):mmcode = 2 # 3: yellow hl, 'MMGen Error' + exception + message class RPCFailure(Exception): mmcode = 3 diff --git a/mmgen/opts.py b/mmgen/opts.py index 66a990ef..67aa5d26 100755 --- a/mmgen/opts.py +++ b/mmgen/opts.py @@ -371,7 +371,7 @@ def opt_is_tx_fee(key,val,desc): # 'key' must remain a placeholder tx = MMGenTX() # Size of 224 is just a ball-park figure to eliminate the most extreme cases at startup # This check will be performed again once we know the true size - ret = tx.process_fee_spec(val,224,on_fail='return') + ret = tx.process_fee_spec(val,224) if ret == False: raise UserOptError('{!r}: invalid {}\n(not a {} amount or {} specification)'.format( diff --git a/mmgen/tx.py b/mmgen/tx.py index 389c59ba..487202e7 100755 --- a/mmgen/tx.py +++ b/mmgen/tx.py @@ -147,7 +147,8 @@ class DeserializedTX(dict,MMGenObject): def readVInt(skip=False): s = tx[self.idx] self.idx += 1 - if not skip: self.raw_tx.append(s) + if not skip: + self.raw_tx.append(s) vbytes_len = 1 if s < 0xfd else 2 if s == 0xfd else 4 if s == 0xfe else 8 @@ -156,7 +157,8 @@ class DeserializedTX(dict,MMGenObject): else: vbytes = tx[self.idx:self.idx+vbytes_len] self.idx += vbytes_len - if not skip: self.raw_tx += vbytes + if not skip: + self.raw_tx += vbytes return int(vbytes[::-1].hex(),16) def make_txid(tx_bytes): @@ -245,15 +247,14 @@ class MMGenTxInputList(list,MMGenObject): def convert_coin(self,verbose=False): if verbose: - msg('{}:'.format(self.desc.capitalize())) + msg(f'{self.desc}:') for i in self: - d = i.__dict__ - d['amt'] = g.proto.coin_amt(d['amt']) + setattr(i,'amt',g.proto.coin_amt(i.amt)) def check_coin_mismatch(self): for i in self: if type(i.amt) != g.proto.coin_amt: - die(2,'Coin mismatch in transaction: amount {} not of type {}!'.format(i.amt,g.proto.coin_amt)) + die(2,f'Coin mismatch in transaction: amount {i.amt} not of type {g.proto.coin_amt}!') # Lexicographical Indexing of Transaction Inputs and Outputs # https://github.com/bitcoin/bips/blob/master/bip-0069.mediawiki @@ -297,16 +298,17 @@ class MMGenTX(MMGenObject): msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)' msg_low_coin = 'Selected outputs insufficient to fund this transaction ({} {} needed)' - msg_no_change_output = """ -ERROR: No change address specified. If you wish to create a transaction with -only one output, specify a single output address with no {} amount -""".strip() - msg_non_mmgen_inputs = """ -NOTE: This transaction includes non-{pnm} inputs, which makes the signing -process more complicated. When signing the transaction, keys for non-{pnm} -inputs must be supplied to '{pnl}-txsign' in a file with the '--keys-from-file' -option. -Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_name.lower()) + msg_no_change_output = fmt(""" + ERROR: No change address specified. If you wish to create a transaction with + only one output, specify a single output address with no {} amount + """).strip() + msg_non_mmgen_inputs = fmt(f""" + NOTE: This transaction includes non-{g.proj_name} inputs, which makes the signing + process more complicated. When signing the transaction, keys for non-{g.proj_name} + inputs must be supplied to '{g.proj_name.lower()}-txsign' in a file with the '--keys-from-file' + option. + Selected non-{g.proj_name} inputs: {{}} + """).strip() def __init__(self,filename=None,metadata_only=False,caller=None,quiet_open=False,data=None,tw=None): if data: @@ -341,17 +343,14 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam self.check_sigs() # marks the tx as signed # repeat with sign and send, because coin daemon could be restarted - self.check_correct_chain(on_fail='die') + self.check_correct_chain() - def check_correct_chain(self,on_fail='return'): - assert on_fail in ('return','die'),"'{}': invalid value for 'on_fail'".format(on_fail) - m = 'Transaction is for {}, but current chain is {}!'.format(self.chain,g.chain) + def check_correct_chain(self): bad = self.chain and g.chain and self.chain != g.chain if bad and hasattr(g.proto,'chain_name'): bad = self.chain != g.proto.chain_name if bad: - msg(m) if on_fail == 'return' else die(2,m) - return not bad + raise TransactionChainMismatch(f'Transaction is for {self.chain}, but current chain is {g.chain}!') def add_output(self,coinaddr,amt,is_chg=None): self.outputs.append(MMGenTxOutput(addr=coinaddr,amt=amt,is_chg=is_chg)) @@ -380,8 +379,10 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam self.outputs.pop(idx) def sum_outputs(self,exclude=None): - olist = self.outputs if exclude == None else \ - self.outputs[:exclude] + self.outputs[exclude+1:] + if exclude == None: + olist = self.outputs + else: + olist = self.outputs[:exclude] + self.outputs[exclude+1:] if not olist: return g.proto.coin_amt('0') return g.proto.coin_amt(sum(e.amt for e in olist)) @@ -394,13 +395,14 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam for e in self.outputs: if e.addr and e.addr in d: e.mmid,f = d[e.addr] - if f: e.label = f + if f: + e.label = f def check_dup_addrs(self,io_str): assert io_str in ('inputs','outputs') addrs = [e.addr for e in getattr(self,io_str)] if len(addrs) != len(set(addrs)): - die(2,'{}: duplicate address in transaction {}'.format(attr,io_str)) + die(2,f'{addrs}: duplicate address in transaction {io_str}') def update_txid(self): self.txid = MMGenTxID(make_chksum_6(bytes.fromhex(self.hex)).upper()) @@ -413,7 +415,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam self.hex = HexStr(await g.rpc.call('createrawtransaction',i,o)) self.update_txid() - def print_contract_addr(self): pass + def print_contract_addr(self): + pass # returns true if comment added or changed def add_comment(self,infile=None): @@ -445,19 +448,20 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam est_vsize = self.estimate_size() d = tx_decoded vsize = d['vsize'] if 'vsize' in d else d['size'] - vmsg('\nVsize: {} (true) {} (estimated)'.format(vsize,est_vsize)) - m1 = 'Estimated transaction vsize is {:1.2f} times the true vsize\n' - m2 = 'Your transaction fee estimates will be inaccurate\n' - m3 = 'Please re-create and re-sign the transaction using the option --vsize-adj={:1.2f}' - # allow for 5% error + vmsg(f'\nVsize: {vsize} (true) {est_vsize} (estimated)') ratio = float(est_vsize) / vsize - if not (0.95 < ratio < 1.05): - raise BadTxSizeEstimate((m1+m2+m3).format(ratio,1/ratio)) + if not (0.95 < ratio < 1.05): # allow for 5% error + raise BadTxSizeEstimate(fmt(f""" + Estimated transaction vsize is {ratio:1.2f} times the true vsize + Your transaction fee estimates will be inaccurate + Please re-create and re-sign the transaction using the option --vsize-adj={1/ratio:1.2f} + """).strip()) # https://bitcoin.stackexchange.com/questions/1195/how-to-calculate-transaction-size-before-sending # 180: uncompressed, 148: compressed def estimate_size_old(self): - if not self.inputs or not self.outputs: return None + if not self.inputs or not self.outputs: + return None return len(self.inputs)*180 + len(self.outputs)*34 + 10 # https://bitcoincore.org/en/segwit_wallet_dev/ @@ -466,7 +470,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam # TODO: results differ slightly from actual transaction size def estimate_size(self): - if not self.inputs or not self.outputs: return None + if not self.inputs or not self.outputs: + return None sig_size = 72 # sig in DER format pubkey_size_uncompressed = 65 @@ -502,7 +507,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam # A non-witness program txin MUST be associated with an empty witness field, represented # by a 0x00. If all txins are not witness program, a transaction's wtxid is equal to its txid. def get_witness_size(): - if not self.has_segwit_inputs(): return 0 + if not self.has_segwit_inputs(): + return 0 wf_size = 1 + 1 + sig_size + 1 + pubkey_size_compressed # vInt vInt sig vInt pubkey = 108 return sum((1,wf_size)[bool(i.mmid) and i.mmid.mmtype in ('S','B')] for i in self.inputs) @@ -563,29 +569,27 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam f = fee_per_kb * opt.tx_fee_adj * tx_size / 1024 ret = g.proto.coin_amt(f,from_decimal=True) if opt.verbose: - msg('{} fee for {} confirmations: {} {}/kB'.format(fe_type.upper(),opt.tx_confs,fee_per_kb,g.coin)) - msg('TX size (estimated): {} bytes'.format(tx_size)) - msg('Fee adjustment factor: {}'.format(opt.tx_fee_adj)) - msg('Absolute fee (fee_per_kb * adj_factor * tx_size / 1024): {} {}'.format(ret,g.coin)) + msg(fmt(f""" + {fe_type.upper()} fee for {opt.tx_confs} confirmations: {fee_per_kb} {g.coin}/kB + TX size (estimated): {tx_size} bytes + Fee adjustment factor: {opt.tx_fee_adj} + Absolute fee (fee_per_kb * adj_factor * tx_size / 1024): {ret} {g.coin} + """).strip()) return ret def convert_and_check_fee(self,tx_fee,desc='Missing description'): - abs_fee = self.process_fee_spec(tx_fee,self.estimate_size(),on_fail='return') - if abs_fee == None: - # we shouldn't be calling this if tx size is unknown - m = "'{}': cannot convert {} to {} because transaction size is unknown" - assert False, m.format(tx_fee,self.rel_fee_desc,g.coin) + abs_fee = self.process_fee_spec(tx_fee,self.estimate_size()) + if abs_fee == None: # we shouldn't be calling this method if tx size is unknown + raise ValueError( + f'{tx_fee}: cannot convert {self.rel_fee_desc} to {g.coin} because transaction size is unknown') elif abs_fee == False: - m = "'{}': invalid TX fee (not a {} amount or {} specification)" - msg(m.format(tx_fee,g.coin,self.rel_fee_desc)) + msg(f'{tx_fee!r}: invalid TX fee (not a {g.coin} amount or {self.rel_fee_desc} specification)') return False elif abs_fee > g.proto.max_tx_fee: - m = '{} {c}: {} fee too large (maximum fee: {} {c})' - msg(m.format(abs_fee,desc,g.proto.max_tx_fee,c=g.coin)) + msg(f'{abs_fee} {g.coin}: {desc} fee too large (maximum fee: {g.proto.max_tx_fee} {g.coin})') return False elif abs_fee < self.relay_fee: - m = '{} {c}: {} fee too small (below relay fee of {} {c})' - msg(m.format(str(abs_fee),desc,str(self.relay_fee),c=g.coin)) + msg(f'{abs_fee} {g.coin}: {desc} fee too small (less than relay fee of {self.relay_fee} {g.coin})') return False else: return abs_fee @@ -594,8 +598,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam # given tx size and absolute fee or fee spec, return absolute fee # relative fee is N+ - def process_fee_spec(self,tx_fee,tx_size,on_fail='throw'): - + def process_fee_spec(self,tx_fee,tx_size): if g.proto.coin_amt(tx_fee,on_fail='silent'): return g.proto.coin_amt(tx_fee) else: @@ -605,11 +608,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam if pat.match(tx_fee): amt,unit = pat.match(tx_fee).groups() return self.convert_fee_spec(tx_size,units,amt,unit) - - if on_fail == 'return': - return False - elif on_fail == 'throw': - assert False, "'{}': invalid tx-fee argument".format(tx_fee) + return False def get_usr_fee_interactive(self,tx_fee=None,desc='Starting'): abs_fee = None @@ -617,17 +616,19 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam if tx_fee: abs_fee = self.convert_and_check_fee(tx_fee,desc) if abs_fee: - p = '{} TX fee{}: {}{} {} ({} {})\n'.format( + prompt = '{} TX fee{}: {}{} {} ({} {})\n'.format( desc, - ('',' (after {}X adjustment)'.format(opt.tx_fee_adj))[ - opt.tx_fee_adj != 1 and desc.startswith('Network-estimated')], + (f' (after {opt.tx_fee_adj}X adjustment)' + if opt.tx_fee_adj != 1 and desc.startswith('Network-estimated') + else ''), ('','≈')[self.fee_is_approximate], abs_fee.hl(), g.coin, pink(str(self.fee_abs2rel(abs_fee))), self.rel_fee_disp) - if opt.yes or keypress_confirm(p+'OK?',default_yes=True): - if opt.yes: msg(p) + if opt.yes or keypress_confirm(prompt+'OK?',default_yes=True): + if opt.yes: + msg(prompt) return abs_fee tx_fee = my_raw_input(self.usr_fee_prompt) desc = 'User-selected' @@ -638,7 +639,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam desc = 'User-selected' start_fee = opt.tx_fee else: - desc = 'Network-estimated (mode: {})'.format(opt.fee_estimate_mode.upper()) + desc = f'Network-estimated (mode: {opt.fee_estimate_mode.upper()})' fee_per_kb,fe_type = await self.get_rel_fee_from_network() if fee_per_kb < 0: @@ -653,7 +654,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam def delete_attrs(self,desc,attr): for e in getattr(self,desc): - if hasattr(e,attr): delattr(e,attr) + if hasattr(e,attr): + delattr(e,attr) # inputs methods def copy_inputs_from_tw(self,tw_unspent_data): @@ -710,7 +712,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam from .baseconv import baseconv lines.append(baseconv.frombytes(self.label.encode(),'b58',tostr=True)) if self.coin_txid: - if not self.label: lines.append('-') # keep old tx files backwards compatible + if not self.label: + lines.append('-') # keep old tx files backwards compatible lines.append(self.coin_txid) self.chksum = make_chksum_6(' '.join(lines)) self.fmt_data = '\n'.join([self.chksum] + lines)+'\n' @@ -810,7 +813,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam async def get_status(self,status=False): - class r(object): pass + class r(object): + pass async def is_in_wallet(): try: ret = await g.rpc.call('gettransaction',self.coin_txid) @@ -888,7 +892,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam if not self.marked_signed(): die(1,'Transaction is not signed!') - self.check_correct_chain(on_fail='die') + self.check_correct_chain() self.check_pubkey_scripts() @@ -948,12 +952,6 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam self.add_blockcount() return True - def write_txid_to_file(self,ask_write=False,ask_write_default_yes=True): - fn = '{}[{}].{}'.format(self.txid,self.send_amt,self.txid_ext) - write_data_to_file(fn,self.coin_txid+'\n','transaction ID', - ask_write=ask_write, - ask_write_default_yes=ask_write_default_yes) - def create_fn(self): tl = self.get_hex_locktime() tn = ('','.testnet')[g.proto.testnet] @@ -975,9 +973,14 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam ask_tty=True, ask_overwrite=True): - if ask_write == False: ask_write_default_yes = True - if not self.fmt_data: self.format() - if not self.fn: self.create_fn() + if ask_write == False: + ask_write_default_yes = True + + if not self.fmt_data: + self.format() + + if not self.fn: + self.create_fn() write_data_to_file(self.fn,self.fmt_data,self.desc+add_desc, ask_overwrite=ask_overwrite, @@ -1002,7 +1005,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam def view(self,pager=False,pause=True,terse=False): o = self.format_view(terse=terse) - if pager: do_pager(o) + if pager: + do_pager(o) else: msg_r(o) from .term import get_char @@ -1085,11 +1089,11 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam return g.proto.coin_amt(self.get_fee_from_tx()).hl() def format_view_verbose_footer(self): - ts = len(self.hex)//2 if self.hex else 'unknown' - out = 'Transaction size: Vsize {} (estimated), Total {}'.format(self.estimate_size(),ts) + tsize = len(self.hex)//2 if self.hex else 'unknown' + out = f'Transaction size: Vsize {self.estimate_size()} (estimated), Total {tsize}' if self.marked_signed(): - ws = DeserializedTX(self.hex)['witness_size'] - out += ', Base {}, Witness {}'.format(ts-ws,ws) + wsize = DeserializedTX(self.hex)['witness_size'] + out += f', Base {tsize-wsize}, Witness {wsize}' return out + '\n' def format_view(self,terse=False,sort=dfl_view_sort_order): @@ -1107,39 +1111,45 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam sel_f = lambda o: len(o.mmid) + (2,8)[bool(o.is_chg)] # + len(' (chg)') return max(max([sel_f(o) for o in io if o.mmid] or [0]),len(nonmm_str)) - nonmm_str = '(non-{} address)'.format(g.proj_name) + nonmm_str = f'(non-{g.proj_name} address)' max_mmwid = max(get_max_mmwid(self.inputs),get_max_mmwid(self.outputs)) - out = (self.txview_hdr_fs,self.txview_hdr_fs_short)[bool(terse)].format( - i=self.txid.hl(), - a=self.send_amt.hl(), - c=g.dcoin, - t=self.timestamp, - r=(red('False'),green('True'))[self.is_replaceable()], - s=self.marked_signed(color=True), - l=(green('None'),orange(strfmt_locktime(self.locktime,terse=True)))[bool(self.locktime)]) + def gen_view(): + yield (self.txview_hdr_fs_short if terse else self.txview_hdr_fs).format( + i = self.txid.hl(), + a = self.send_amt.hl(), + c = g.dcoin, + t = self.timestamp, + r = (red('False'),green('True'))[self.is_replaceable()], + s = self.marked_signed(color=True), + l = (green('None'),orange(strfmt_locktime(self.locktime,terse=True)))[bool(self.locktime)] ) - if self.chain != 'mainnet': - out += green('Chain: {}\n'.format(self.chain.upper())) - if self.coin_txid: - out += '{} TxID: {}\n'.format(g.coin,self.coin_txid.hl()) - enl = ('\n','')[bool(terse)] - out += enl - if self.label: - out += 'Comment: {}\n{}'.format(self.label.hl(),enl) + if self.chain != 'mainnet': + yield green(f'Chain: {self.chain.upper()}') + '\n' - out += self.format_view_body(blockcount,nonmm_str,max_mmwid,enl,terse=terse,sort=sort) + if self.coin_txid: + yield f'{g.coin} TxID: {self.coin_txid.hl()}\n' - out += (self.txview_ftr_fs,self.txview_ftr_fs_short)[bool(terse)].format( - i=self.sum_inputs().hl(), - o=self.sum_outputs().hl(), - a=self.format_view_abs_fee(), - r=self.format_view_rel_fee(terse), - d=g.dcoin,c=g.coin) + enl = ('\n','')[bool(terse)] + yield enl - if opt.verbose: out += self.format_view_verbose_footer() + if self.label: + yield f'Comment: {self.label.hl()}\n{enl}' - return out # TX label might contain non-ascii chars + yield self.format_view_body(blockcount,nonmm_str,max_mmwid,enl,terse=terse,sort=sort) + + yield (self.txview_ftr_fs_short if terse else self.txview_ftr_fs).format( + i = self.sum_inputs().hl(), + o = self.sum_outputs().hl(), + a = self.format_view_abs_fee(), + r = self.format_view_rel_fee(terse), + d = g.dcoin, + c = g.coin ) + + if opt.verbose: + yield self.format_view_verbose_footer() + + return ''.join(gen_view()) # TX label might contain non-ascii chars def check_txfile_hex_data(self): self.hex = HexStr(self.hex,on_fail='raise') @@ -1161,7 +1171,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam assert type(d) == list,'{} data not a list!'.format(desc) if not (desc == 'outputs' and g.proto.base_coin == 'ETH'): # ETH txs can have no outputs assert len(d),'no {}!'.format(desc) - for e in d: e['amt'] = g.proto.coin_amt(e['amt']) + for e in d: + e['amt'] = g.proto.coin_amt(e['amt']) io,io_list = ( (MMGenTxOutput,MMGenTxOutputList), (MMGenTxInput,MMGenTxInputList) @@ -1224,7 +1235,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam desc = 'block count in metadata' self.blockcount = int(blockcount) - if metadata_only: return + if metadata_only: + return desc = 'send amount in metadata' self.send_amt = g.proto.coin_amt(send_amt,on_fail='raise') @@ -1241,7 +1253,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam desc = 'outputs data' self.outputs = eval_io_data(outputs_data,'outputs') except Exception as e: - die(2,'Invalid {} in transaction file: {}'.format(desc,e.args[0])) + die(2,f'Invalid {desc} in transaction file: {e.args[0]}') # test doesn't work for Ethereum: test and mainnet addrs have same format if not self.chain and not self.inputs[0].addr.is_for_chain('testnet'): @@ -1260,7 +1272,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam coin_addr = mmaddr2coinaddr(addr,ad_w,ad_f) if is_mmgen_id(addr) else CoinAddr(addr) self.add_output(coin_addr,g.proto.coin_amt(amt or '0'),is_chg=not amt) else: - die(2,"{}: invalid {} '{}'".format(addr,err_desc,','.join((addr,amt)) if amt else addr)) + die(2,f'{addr}: invalid {err_desc} {{!r}}'.format(f'{addr},{amt}' if amt else addr)) if ',' in arg: addr,amt = arg.split(',',1) @@ -1270,15 +1282,16 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam def process_cmd_args(self,cmd_args,ad_f,ad_w): - for a in cmd_args: self.process_cmd_arg(a,ad_f,ad_w) + for a in cmd_args: + self.process_cmd_arg(a,ad_f,ad_w) if self.get_chg_output_idx() == None: die(2,( 'ERROR: No change output specified', self.msg_no_change_output.format(g.dcoin))[len(self.outputs) == 1]) if not segwit_is_active() and self.has_segwit_outputs(): - fs = '{} Segwit address requested on the command line, but Segwit is not active on this chain' - rdie(2,fs.format(g.proj_name)) + rdie(2,f'{g.proj_name} Segwit address requested on the command line, ' + + 'but Segwit is not active on this chain') if not self.outputs: die(2,'At least one output must be specified on the command line') @@ -1328,34 +1341,35 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam msg(self.msg_low_coin.format(g.proto.coin_amt(-change_amt).hl(),g.coin)) def final_inputs_ok_msg(self,change_amt): - m = 'Transaction produces {} {} in change' - return m.format(g.proto.coin_amt(change_amt).hl(),g.coin) + return f'Transaction produces {g.proto.coin_amt(change_amt).hl()} {g.coin} in change' def select_unspent_cmdline(self,unspent): - sel_nums = [] - for i in opt.inputs.split(','): - ls = len(sel_nums) - if is_mmgen_id(i): - for j in range(len(unspent)): - if unspent[j].twmmid == i: - sel_nums.append(j+1) - elif is_coin_addr(i): - for j in range(len(unspent)): - if unspent[j].addr == i: - sel_nums.append(j+1) - else: - die(1,"'{}': not an MMGen ID or coin address".format(i)) - ldiff = len(sel_nums) - ls - if ldiff: - sel_inputs = ','.join([str(i) for i in sel_nums[-ldiff:]]) - ul = unspent[sel_nums[-1]-1] - mmid_disp = ' (' + ul.twmmid + ')' if ul.twmmid.type == 'mmgen' else '' - msg('Adding input{}: {} {}{}'.format(suf(ldiff),sel_inputs,ul.addr,mmid_disp)) - else: - die(1,"'{}': address not found in tracking wallet".format(i)) + def idx2num(idx): + uo = unspent[idx] + mmid_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else '' + msg(f'Adding input: {idx + 1} {uo.addr}{mmid_disp}') + return idx + 1 - return set(sel_nums) # silently discard duplicates + def get_uo_nums(): + for addr in opt.inputs.split(','): + if is_mmgen_id(addr): + attr = 'twmmid' + elif is_coin_addr(addr): + attr = 'addr' + else: + die(1,f'{addr!r}: not an MMGen ID or {g.coin} address') + + found = False + for idx in range(len(unspent)): + if getattr(unspent[idx],attr) == addr: + yield idx2num(idx) + found = True + + if not found: + die(1,f'{addr!r}: address not found in tracking wallet') + + return set(get_uo_nums()) # silently discard duplicates async def get_cmdline_input_addrs(self): # Bitcoin full node, call doesn't go to the network, so just call listunspent with addrs=[] @@ -1367,7 +1381,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam us_f = self.select_unspent_cmdline if opt.inputs else self.select_unspent sel_nums = us_f(self.twuo.unspent) - msg('Selected output{}: {}'.format(suf(sel_nums),' '.join(map(str,sel_nums)))) + msg(f'Selected output{suf(sel_nums)}: {{}}'.format(' '.join(str(n) for n in sel_nums))) sel_unspent = self.twuo.MMGenTwOutputList([self.twuo.unspent[i-1] for i in sel_nums]) inputs_sum = sum(s.amt for s in sel_unspent) @@ -1390,7 +1404,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam if change_amt >= 0: p = self.final_inputs_ok_msg(change_amt) if opt.yes or keypress_confirm(p+'. OK?',default_yes=True): - if opt.yes: msg(p) + if opt.yes: + msg(p) return change_amt else: self.warn_insufficient_chg(change_amt) @@ -1434,9 +1449,8 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam self.send_amt = self.sum_outputs() - msg('Total amount to spend: {}'.format( - ('Unknown','{} {}'.format(self.send_amt.hl(),g.dcoin))[bool(self.send_amt)] - )) + msg_r('Total amount to spend: ') + msg(f'{self.send_amt.hl()} {g.dcoin}' if self.send_amt else 'Unknown') change_amt = await self.get_inputs_from_user() @@ -1447,8 +1461,10 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam self.inputs.sort_bip69() self.outputs.sort_bip69() # do this only after inputs are sorted - if opt.rbf: self.inputs[0].sequence = g.max_int - 2 # handles the locktime case too - elif locktime: self.inputs[0].sequence = g.max_int - 1 + if opt.rbf: + self.inputs[0].sequence = g.max_int - 2 # handles the locktime case too + elif locktime: + self.inputs[0].sequence = g.max_int - 1 if not opt.yes: self.add_comment() # edits an existing comment @@ -1456,7 +1472,7 @@ Selected non-{pnm} inputs: {{}}""".strip().format(pnm=g.proj_name,pnl=g.proj_nam await self.create_raw() # creates self.hex, self.txid if g.proto.base_proto == 'Bitcoin' and locktime: - msg('Setting nlocktime to {}!'.format(strfmt_locktime(locktime))) + msg(f'Setting nlocktime to {strfmt_locktime(locktime)}!') self.set_hex_locktime(locktime) self.update_txid() self.locktime = locktime @@ -1487,16 +1503,18 @@ class MMGenTxForSigning(MMGenTX): msg('Transaction is already signed!') return False - if not self.check_correct_chain(on_fail='return'): + try: + self.check_correct_chain() + except TransactionChainMismatch: return False if (self.has_segwit_inputs() or self.has_segwit_outputs()) and not g.proto.cap('segwit'): - ymsg("TX has Segwit inputs or outputs, but {} doesn't support Segwit!".format(g.coin)) + ymsg(f"TX has Segwit inputs or outputs, but {g.coin} doesn't support Segwit!") return False self.check_pubkey_scripts() - qmsg('Passing {} key{} to {}'.format(len(keys),suf(keys),g.proto.daemon_name)) + qmsg(f'Passing {len(keys)} key{suf(keys)} to {g.proto.daemon_name}') if self.has_segwit_inputs(): from .addr import KeyGenerator,AddrGenerator @@ -1513,12 +1531,12 @@ class MMGenTxForSigning(MMGenTX): e['redeemScript'] = ag.to_segwit_redeem_script(kg.to_pubhex(keydict[d.addr])) sig_data.append(e) - msg_r('Signing transaction{}...'.format(tx_num_str)) + msg_r(f'Signing transaction{tx_num_str}...') wifs = [d.sec.wif for d in keys] try: args = ( - ('signrawtransaction',self.hex,sig_data,wifs,g.proto.sighash_type), + ('signrawtransaction', self.hex,sig_data,wifs,g.proto.sighash_type), ('signrawtransactionwithkey',self.hex,wifs,sig_data,g.proto.sighash_type) )['sign_with_key' in g.rpc.caps] ret = await g.rpc.call(*args) @@ -1562,22 +1580,23 @@ class MMGenBumpTX(MMGenTxForSigning): super().__init__(filename,tw=tw) if not self.is_replaceable(): - die(1,"Transaction '{}' is not replaceable".format(self.txid)) + die(1,f'Transaction {self.txid} is not replaceable') # If sending, require tx to be signed if send: if not self.marked_signed(): - die(1,"File '{}' is not a signed {} transaction file".format(filename,g.proj_name)) + die(1,'File {filename!r} is not a signed {g.proj_name} transaction file') if not self.coin_txid: - die(1,"Transaction '{}' was not broadcast to the network".format(self.txid)) + die(1,'Transaction {self.txid!r} was not broadcast to the network') self.coin_txid = '' self.mark_raw() def check_bumpable(self): if not [o.amt for o in self.outputs if o.amt >= self.min_fee]: - die(1,'Transaction cannot be bumped.' + - '\nAll outputs have less than the minimum fee ({} {})'.format(self.min_fee,g.coin)) + die(1, + 'Transaction cannot be bumped.\n' + + f'All outputs contain less than the minimum fee ({self.min_fee} {g.coin})') def choose_output(self): chg_idx = self.get_chg_output_idx() @@ -1585,8 +1604,7 @@ class MMGenBumpTX(MMGenTxForSigning): def check_sufficient_funds(o_amt): if o_amt < self.min_fee: - msg('Minimum fee ({} {c}) is greater than output amount ({} {c})'.format( - self.min_fee,o_amt,c=g.coin)) + msg(f'Minimum fee ({self.min_fee} {g.coin}) is greater than output amount ({o_amt} {g.coin})') return False return True @@ -1604,21 +1622,21 @@ class MMGenBumpTX(MMGenTxForSigning): else: reply,init_reply = init_reply,None if chg_idx == None and not is_int(reply): - msg("Output must be an integer") + msg('Output must be an integer') elif chg_idx != None and not is_int(reply) and reply != 'c': msg("Output must be an integer, or 'c' for the change output") else: idx = chg_idx if reply == 'c' else (int(reply) - 1) if idx < 0 or idx >= len(self.outputs): - msg('Output must be in the range 1-{}'.format(len(self.outputs))) + msg(f'Output must be in the range 1-{len(self.outputs)}') else: o_amt = self.outputs[idx].amt - cs = ('',' (change output)')[chg_idx == idx] - p = 'Fee will be deducted from output {}{} ({} {})'.format(idx+1,cs,o_amt,g.coin) + cm = ' (change output)' if chg_idx == idx else '' + prompt = f'Fee will be deducted from output {idx+1}{cm} ({o_amt} {g.coin})' if check_sufficient_funds(o_amt): - if opt.yes or keypress_confirm(p+'. OK?',default_yes=True): + if opt.yes or keypress_confirm(prompt+'. OK?',default_yes=True): if opt.yes: - msg(p) + msg(prompt) self.bump_output_idx = idx return idx @@ -1634,11 +1652,20 @@ class MMGenBumpTX(MMGenTxForSigning): ret = super().convert_and_check_fee(tx_fee,desc) if ret < self.min_fee: msg('{} {c}: {} fee too small. Minimum fee: {} {c} ({} {})'.format( - ret,desc,self.min_fee,self.fee_abs2rel(self.min_fee.hl()),self.rel_fee_desc,c=g.coin)) + ret.hl(), + desc, + self.min_fee, + self.fee_abs2rel(self.min_fee.hl()), + self.rel_fee_desc, + c = g.coin )) return False output_amt = self.outputs[self.bump_output_idx].amt if ret >= output_amt: - msg('{} {c}: {} fee too large. Maximum fee: <{} {c}'.format(ret.hl(),desc,output_amt.hl(),c=g.coin)) + msg('{} {c}: {} fee too large. Maximum fee: <{} {c}'.format( + ret.hl(), + desc, + output_amt.hl(), + c = g.coin )) return False return ret @@ -1679,7 +1706,8 @@ class MMGenSplitTX(MMGenTX): if change_amt >= 0: p = 'Transaction produces {} {} in change'.format(change_amt.hl(),g.coin) if opt.yes or keypress_confirm(p+'. OK?',default_yes=True): - if opt.yes: msg(p) + if opt.yes: + msg(p) break else: self.warn_insufficient_chg(change_amt)