|
@@ -16,9 +16,9 @@ from collections import namedtuple
|
|
|
|
|
|
from .base import Base
|
|
|
from ..cfg import gc
|
|
|
-from ..color import pink,yellow
|
|
|
-from ..obj import get_obj,MMGenList
|
|
|
-from ..util import msg,fmt,die,suf,remove_dups,get_extension
|
|
|
+from ..color import pink, yellow
|
|
|
+from ..obj import get_obj, MMGenList
|
|
|
+from ..util import msg, fmt, die, suf, remove_dups, get_extension
|
|
|
from ..addr import (
|
|
|
is_mmgen_id,
|
|
|
MMGenAddrType,
|
|
@@ -29,7 +29,7 @@ from ..addr import (
|
|
|
is_addrlist_id
|
|
|
)
|
|
|
|
|
|
-def mmaddr2coinaddr(cfg,mmaddr,ad_w,ad_f,proto):
|
|
|
+def mmaddr2coinaddr(cfg, mmaddr, ad_w, ad_f, proto):
|
|
|
|
|
|
def wmsg(k):
|
|
|
messages = {
|
|
@@ -49,7 +49,7 @@ def mmaddr2coinaddr(cfg,mmaddr,ad_w,ad_f,proto):
|
|
|
address file for it on the command line.
|
|
|
"""
|
|
|
}
|
|
|
- return '\n' + fmt(messages[k],indent=' ')
|
|
|
+ return '\n' + fmt(messages[k], indent=' ')
|
|
|
|
|
|
# assume mmaddr has already been checked
|
|
|
coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
|
|
@@ -60,15 +60,15 @@ def mmaddr2coinaddr(cfg,mmaddr,ad_w,ad_f,proto):
|
|
|
if coin_addr:
|
|
|
msg(wmsg('addr_in_addrfile_only'))
|
|
|
from ..ui import keypress_confirm
|
|
|
- if not (cfg.yes or keypress_confirm( cfg, 'Continue anyway?' )):
|
|
|
+ if not (cfg.yes or keypress_confirm(cfg, 'Continue anyway?')):
|
|
|
import sys
|
|
|
sys.exit(1)
|
|
|
else:
|
|
|
- die(2,wmsg('addr_not_found'))
|
|
|
+ die(2, wmsg('addr_not_found'))
|
|
|
else:
|
|
|
- die(2,wmsg('addr_not_found_no_addrfile'))
|
|
|
+ die(2, wmsg('addr_not_found_no_addrfile'))
|
|
|
|
|
|
- return CoinAddr(proto,coin_addr)
|
|
|
+ return CoinAddr(proto, coin_addr)
|
|
|
|
|
|
class New(Base):
|
|
|
|
|
@@ -81,32 +81,32 @@ class New(Base):
|
|
|
"""
|
|
|
chg_autoselected = False
|
|
|
|
|
|
- def update_output_amt(self,idx,amt):
|
|
|
+ def update_output_amt(self, idx, amt):
|
|
|
o = self.outputs[idx]._asdict()
|
|
|
o['amt'] = amt
|
|
|
- self.outputs[idx] = self.Output(self.proto,**o)
|
|
|
+ self.outputs[idx] = self.Output(self.proto, **o)
|
|
|
|
|
|
- def add_mmaddrs_to_outputs(self,ad_w,ad_f):
|
|
|
+ def add_mmaddrs_to_outputs(self, ad_w, ad_f):
|
|
|
a = [e.addr for e in self.outputs]
|
|
|
d = ad_w.make_reverse_dict(a)
|
|
|
if ad_f:
|
|
|
d.update(ad_f.make_reverse_dict(a))
|
|
|
for e in self.outputs:
|
|
|
if e.addr and e.addr in d:
|
|
|
- e.mmid,f = d[e.addr]
|
|
|
+ e.mmid, f = d[e.addr]
|
|
|
if f:
|
|
|
e.comment = f
|
|
|
|
|
|
- def check_dup_addrs(self,io_str):
|
|
|
- assert io_str in ('inputs','outputs')
|
|
|
- addrs = [e.addr for e in getattr(self,io_str)]
|
|
|
+ def check_dup_addrs(self, io_str):
|
|
|
+ assert io_str in ('inputs', 'outputs')
|
|
|
+ addrs = [e.addr for e in getattr(self, io_str)]
|
|
|
if len(addrs) != len(set(addrs)):
|
|
|
- die(2,f'{addrs}: duplicate address in transaction {io_str}')
|
|
|
+ die(2, f'{addrs}: duplicate address in transaction {io_str}')
|
|
|
|
|
|
# given tx size and absolute fee or fee spec, return absolute fee
|
|
|
# relative fee is N+<first letter of unit name>
|
|
|
- def feespec2abs(self,fee_arg,tx_size):
|
|
|
- fee = get_obj(self.proto.coin_amt,num=fee_arg,silent=True)
|
|
|
+ def feespec2abs(self, fee_arg, tx_size):
|
|
|
+ fee = get_obj(self.proto.coin_amt, num=fee_arg, silent=True)
|
|
|
if fee:
|
|
|
return fee
|
|
|
else:
|
|
@@ -114,66 +114,66 @@ class New(Base):
|
|
|
units = {u[0]:u for u in self.proto.coin_amt.units}
|
|
|
pat = re.compile(r'([1-9][0-9]*)({})'.format('|'.join(units)))
|
|
|
if pat.match(fee_arg):
|
|
|
- amt,unit = pat.match(fee_arg).groups()
|
|
|
- return self.fee_rel2abs(tx_size,units,int(amt),unit)
|
|
|
+ amt, unit = pat.match(fee_arg).groups()
|
|
|
+ return self.fee_rel2abs(tx_size, units, int(amt), unit)
|
|
|
return False
|
|
|
|
|
|
- def get_usr_fee_interactive(self,fee=None,desc='Starting'):
|
|
|
+ def get_usr_fee_interactive(self, fee=None, desc='Starting'):
|
|
|
abs_fee = None
|
|
|
from ..ui import line_input
|
|
|
while True:
|
|
|
if fee:
|
|
|
- abs_fee = self.convert_and_check_fee(fee,desc)
|
|
|
+ abs_fee = self.convert_and_check_fee(fee, desc)
|
|
|
if abs_fee:
|
|
|
prompt = '{a} TX fee{b}: {c}{d} {e} ({f} {g})\n'.format(
|
|
|
- a = desc,
|
|
|
- b = (f' (after {self.cfg.fee_adjust:.2f}X adjustment)'
|
|
|
+ a = desc,
|
|
|
+ b = (f' (after {self.cfg.fee_adjust:.2f}X adjustment)'
|
|
|
if self.cfg.fee_adjust != 1 and desc.startswith('Network-estimated')
|
|
|
else ''),
|
|
|
- c = ('','≈')[self.fee_is_approximate],
|
|
|
- d = abs_fee.hl(),
|
|
|
- e = self.coin,
|
|
|
- f = pink(str(self.fee_abs2rel(abs_fee))),
|
|
|
- g = self.rel_fee_disp)
|
|
|
+ c = ('', '≈')[self.fee_is_approximate],
|
|
|
+ d = abs_fee.hl(),
|
|
|
+ e = self.coin,
|
|
|
+ f = pink(str(self.fee_abs2rel(abs_fee))),
|
|
|
+ g = self.rel_fee_disp)
|
|
|
from ..ui import keypress_confirm
|
|
|
- if self.cfg.yes or keypress_confirm( self.cfg, prompt+'OK?', default_yes=True ):
|
|
|
+ if self.cfg.yes or keypress_confirm(self.cfg, prompt+'OK?', default_yes=True):
|
|
|
if self.cfg.yes:
|
|
|
msg(prompt)
|
|
|
return abs_fee
|
|
|
- fee = line_input( self.cfg, self.usr_fee_prompt )
|
|
|
+ fee = line_input(self.cfg, self.usr_fee_prompt)
|
|
|
desc = 'User-selected'
|
|
|
|
|
|
# we don't know fee yet, so perform preliminary check with fee == 0
|
|
|
- async def precheck_sufficient_funds(self,inputs_sum,sel_unspent,outputs_sum):
|
|
|
+ async def precheck_sufficient_funds(self, inputs_sum, sel_unspent, outputs_sum):
|
|
|
if self.twuo.total < outputs_sum:
|
|
|
- msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
|
|
|
+ msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum, self.dcoin))
|
|
|
return False
|
|
|
if inputs_sum < outputs_sum:
|
|
|
- msg(self.msg_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
|
|
|
+ msg(self.msg_low_coin.format(outputs_sum-inputs_sum, self.dcoin))
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
- async def get_fee_from_user(self,have_estimate_fail=[]):
|
|
|
+ async def get_fee_from_user(self, have_estimate_fail=[]):
|
|
|
|
|
|
if self.cfg.fee:
|
|
|
desc = 'User-selected'
|
|
|
start_fee = self.cfg.fee
|
|
|
else:
|
|
|
desc = self.network_estimated_fee_label
|
|
|
- fee_per_kb,fe_type = await self.get_rel_fee_from_network()
|
|
|
+ fee_per_kb, fe_type = await self.get_rel_fee_from_network()
|
|
|
|
|
|
if fee_per_kb < 0:
|
|
|
if not have_estimate_fail:
|
|
|
- msg(self.fee_fail_fs.format(c=self.cfg.fee_estimate_confs,t=fe_type))
|
|
|
+ msg(self.fee_fail_fs.format(c=self.cfg.fee_estimate_confs, t=fe_type))
|
|
|
have_estimate_fail.append(True)
|
|
|
start_fee = None
|
|
|
else:
|
|
|
- start_fee = self.fee_est2abs(fee_per_kb,fe_type)
|
|
|
+ start_fee = self.fee_est2abs(fee_per_kb, fe_type)
|
|
|
|
|
|
- return self.get_usr_fee_interactive(start_fee,desc=desc)
|
|
|
+ return self.get_usr_fee_interactive(start_fee, desc=desc)
|
|
|
|
|
|
- def add_output(self,coinaddr,amt,is_chg=None):
|
|
|
- self.outputs.append(self.Output(self.proto,addr=coinaddr,amt=amt,is_chg=is_chg))
|
|
|
+ def add_output(self, coinaddr, amt, is_chg=None):
|
|
|
+ self.outputs.append(self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg))
|
|
|
|
|
|
def parse_cmd_arg(self, arg_in, ad_f, ad_w):
|
|
|
|
|
@@ -195,7 +195,7 @@ class New(Base):
|
|
|
|
|
|
return _pa(arg, mmid, coin_addr, amt)
|
|
|
|
|
|
- async def process_cmd_args(self,cmd_args,ad_f,ad_w):
|
|
|
+ async def process_cmd_args(self, cmd_args, ad_f, ad_w):
|
|
|
|
|
|
async def get_autochg_addr(arg, parsed_args):
|
|
|
from ..tw.addresses import TwAddresses
|
|
@@ -212,7 +212,7 @@ class New(Base):
|
|
|
if res:
|
|
|
return res
|
|
|
|
|
|
- die(2,'Tracking wallet contains no {t}addresses {d} {a!r}'.format(
|
|
|
+ die(2, 'Tracking wallet contains no {t}addresses {d} {a!r}'.format(
|
|
|
t = '' if res is None else 'unused ',
|
|
|
d = desc,
|
|
|
a = arg))
|
|
@@ -245,8 +245,8 @@ class New(Base):
|
|
|
if not self.outputs:
|
|
|
die(2, 'At least one output must be specified on the command line')
|
|
|
|
|
|
- async def get_outputs_from_cmdline(self,cmd_args):
|
|
|
- from ..addrdata import AddrData,TwAddrData
|
|
|
+ async def get_outputs_from_cmdline(self, cmd_args):
|
|
|
+ from ..addrdata import AddrData, TwAddrData
|
|
|
from ..addrlist import AddrList
|
|
|
from ..addrfile import AddrFile
|
|
|
addrfiles = remove_dups(
|
|
@@ -264,13 +264,13 @@ class New(Base):
|
|
|
from ..fileutil import check_infile
|
|
|
for addrfile in addrfiles:
|
|
|
check_infile(addrfile)
|
|
|
- ad_f.add(AddrList( self.cfg, self.proto, addrfile ))
|
|
|
+ ad_f.add(AddrList(self.cfg, self.proto, addrfile))
|
|
|
|
|
|
- ad_w = await TwAddrData(self.cfg,self.proto,twctl=self.twctl)
|
|
|
+ ad_w = await TwAddrData(self.cfg, self.proto, twctl=self.twctl)
|
|
|
|
|
|
- await self.process_cmd_args(cmd_args,ad_f,ad_w)
|
|
|
+ await self.process_cmd_args(cmd_args, ad_f, ad_w)
|
|
|
|
|
|
- self.add_mmaddrs_to_outputs(ad_w,ad_f)
|
|
|
+ self.add_mmaddrs_to_outputs(ad_w, ad_f)
|
|
|
self.check_dup_addrs('outputs')
|
|
|
|
|
|
if self.chg_output is not None:
|
|
@@ -279,19 +279,19 @@ class New(Base):
|
|
|
elif len(self.outputs) > 1:
|
|
|
await self.warn_chg_addr_used(self.chg_output)
|
|
|
|
|
|
- def confirm_autoselected_addr(self,chg):
|
|
|
+ def confirm_autoselected_addr(self, chg):
|
|
|
from ..ui import keypress_confirm
|
|
|
if not keypress_confirm(
|
|
|
self.cfg,
|
|
|
'Using {a} as {b} address. OK?'.format(
|
|
|
a = chg.mmid.hl(),
|
|
|
- b = 'single output' if len(self.outputs) == 1 else 'change' ),
|
|
|
- default_yes = True ):
|
|
|
- die(1,'Exiting at user request')
|
|
|
+ b = 'single output' if len(self.outputs) == 1 else 'change'),
|
|
|
+ default_yes = True):
|
|
|
+ die(1, 'Exiting at user request')
|
|
|
|
|
|
- async def warn_chg_addr_used(self,chg):
|
|
|
+ async def warn_chg_addr_used(self, chg):
|
|
|
from ..tw.addresses import TwAddresses
|
|
|
- if (await TwAddresses(self.cfg,self.proto,get_data=True)).is_used(chg.addr):
|
|
|
+ if (await TwAddresses(self.cfg, self.proto, get_data=True)).is_used(chg.addr):
|
|
|
from ..ui import keypress_confirm
|
|
|
if not keypress_confirm(
|
|
|
self.cfg,
|
|
@@ -302,24 +302,24 @@ class New(Base):
|
|
|
d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
|
|
|
),
|
|
|
complete_prompt = True,
|
|
|
- default_yes = False ):
|
|
|
- die(1,'Exiting at user request')
|
|
|
+ default_yes = False):
|
|
|
+ die(1, 'Exiting at user request')
|
|
|
|
|
|
# inputs methods
|
|
|
- def select_unspent(self,unspent):
|
|
|
+ def select_unspent(self, unspent):
|
|
|
prompt = 'Enter a range or space-separated list of outputs to spend: '
|
|
|
from ..ui import line_input
|
|
|
while True:
|
|
|
- reply = line_input( self.cfg, prompt ).strip()
|
|
|
+ reply = line_input(self.cfg, prompt).strip()
|
|
|
if reply:
|
|
|
from ..addrlist import AddrIdxList
|
|
|
- selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
|
|
|
+ selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()))
|
|
|
if selected:
|
|
|
if selected[-1] <= len(unspent):
|
|
|
return selected
|
|
|
msg(f'Unspent output number must be <= {len(unspent)}')
|
|
|
|
|
|
- def select_unspent_cmdline(self,unspent):
|
|
|
+ def select_unspent_cmdline(self, unspent):
|
|
|
|
|
|
def idx2num(idx):
|
|
|
uo = unspent[idx]
|
|
@@ -329,42 +329,42 @@ class New(Base):
|
|
|
|
|
|
def get_uo_nums():
|
|
|
for addr in self.cfg.inputs.split(','):
|
|
|
- if is_mmgen_id(self.proto,addr):
|
|
|
+ if is_mmgen_id(self.proto, addr):
|
|
|
attr = 'twmmid'
|
|
|
- elif is_coin_addr(self.proto,addr):
|
|
|
+ elif is_coin_addr(self.proto, addr):
|
|
|
attr = 'addr'
|
|
|
else:
|
|
|
- die(1,f'{addr!r}: not an MMGen ID or {self.coin} address')
|
|
|
+ die(1, f'{addr!r}: not an MMGen ID or {self.coin} address')
|
|
|
|
|
|
found = False
|
|
|
- for idx,us in enumerate(unspent):
|
|
|
- if getattr(us,attr) == addr:
|
|
|
+ for idx, us in enumerate(unspent):
|
|
|
+ if getattr(us, attr) == addr:
|
|
|
yield idx2num(idx)
|
|
|
found = True
|
|
|
|
|
|
if not found:
|
|
|
- die(1,f'{addr!r}: address not found in tracking wallet')
|
|
|
+ die(1, f'{addr!r}: address not found in tracking wallet')
|
|
|
|
|
|
return set(get_uo_nums()) # silently discard duplicates
|
|
|
|
|
|
- def copy_inputs_from_tw(self,tw_unspent_data):
|
|
|
+ def copy_inputs_from_tw(self, tw_unspent_data):
|
|
|
def gen_inputs():
|
|
|
for d in tw_unspent_data:
|
|
|
i = self.Input(
|
|
|
self.proto,
|
|
|
- **{attr:getattr(d,attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs} )
|
|
|
+ **{attr:getattr(d, attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs})
|
|
|
if d.twmmid.type == 'mmgen':
|
|
|
i.mmid = d.twmmid # twmmid -> mmid
|
|
|
yield i
|
|
|
- self.inputs = type(self.inputs)(self,list(gen_inputs()))
|
|
|
+ self.inputs = type(self.inputs)(self, list(gen_inputs()))
|
|
|
|
|
|
- def warn_insufficient_funds(self,funds_left):
|
|
|
- msg(self.msg_low_coin.format(self.proto.coin_amt(-funds_left).hl(),self.coin))
|
|
|
+ def warn_insufficient_funds(self, funds_left):
|
|
|
+ msg(self.msg_low_coin.format(self.proto.coin_amt(-funds_left).hl(), self.coin))
|
|
|
|
|
|
- async def get_funds_left(self,fee,outputs_sum):
|
|
|
+ async def get_funds_available(self, fee, outputs_sum):
|
|
|
return self.sum_inputs() - outputs_sum - fee
|
|
|
|
|
|
- async def get_inputs_from_user(self,outputs_sum):
|
|
|
+ async def get_inputs_from_user(self, outputs_sum):
|
|
|
|
|
|
while True:
|
|
|
us_f = self.select_unspent_cmdline if self.cfg.inputs else self.select_unspent
|
|
@@ -374,28 +374,28 @@ class New(Base):
|
|
|
sel_unspent = MMGenList(self.twuo.data[i-1] for i in sel_nums)
|
|
|
|
|
|
inputs_sum = sum(s.amt for s in sel_unspent)
|
|
|
- if not await self.precheck_sufficient_funds(inputs_sum,sel_unspent,outputs_sum):
|
|
|
+ if not await self.precheck_sufficient_funds(inputs_sum, sel_unspent, outputs_sum):
|
|
|
continue
|
|
|
|
|
|
self.copy_inputs_from_tw(sel_unspent) # makes self.inputs
|
|
|
|
|
|
self.usr_fee = await self.get_fee_from_user()
|
|
|
|
|
|
- funds_left = await self.get_funds_left(self.usr_fee,outputs_sum)
|
|
|
+ funds_left = await self.get_funds_available(self.usr_fee,outputs_sum)
|
|
|
|
|
|
if funds_left >= 0:
|
|
|
p = self.final_inputs_ok_msg(funds_left)
|
|
|
from ..ui import keypress_confirm
|
|
|
- if self.cfg.yes or keypress_confirm( self.cfg, p+'. OK?', default_yes=True ):
|
|
|
+ if self.cfg.yes or keypress_confirm(self.cfg, p+'. OK?', default_yes=True):
|
|
|
if self.cfg.yes:
|
|
|
msg(p)
|
|
|
return funds_left
|
|
|
else:
|
|
|
self.warn_insufficient_funds(funds_left)
|
|
|
|
|
|
- async def create(self,cmd_args,locktime=None,do_info=False,caller='txcreate'):
|
|
|
+ async def create(self, cmd_args, locktime=None, do_info=False, caller='txcreate'):
|
|
|
|
|
|
- assert isinstance( locktime, (int,type(None)) ), 'locktime must be of type int'
|
|
|
+ assert isinstance(locktime, (int, type(None))), 'locktime must be of type int'
|
|
|
|
|
|
from ..tw.unspent import TwUnspentOutputs
|
|
|
|
|
@@ -404,7 +404,7 @@ class New(Base):
|
|
|
|
|
|
twuo_addrs = await self.get_input_addrs_from_cmdline()
|
|
|
|
|
|
- self.twuo = await TwUnspentOutputs(self.cfg,self.proto,minconf=self.cfg.minconf,addrs=twuo_addrs)
|
|
|
+ self.twuo = await TwUnspentOutputs(self.cfg, self.proto, minconf=self.cfg.minconf, addrs=twuo_addrs)
|
|
|
await self.twuo.get_data()
|
|
|
|
|
|
if not do_info:
|