whitespace: tw

This commit is contained in:
The MMGen Project 2024-10-18 10:32:10 +00:00
commit 4ea4e5e472
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
10 changed files with 488 additions and 483 deletions

View file

@ -12,11 +12,11 @@
tw.addresses: Tracking wallet listaddresses class for the MMGen suite
"""
from ..util import msg,suf,is_int
from ..obj import MMGenListItem,ImmutableAttr,ListItemAttr,TwComment,NonNegativeInt
from ..addr import CoinAddr,MMGenID,MMGenAddrType
from ..util import msg, suf, is_int
from ..obj import MMGenListItem, ImmutableAttr, ListItemAttr, TwComment, NonNegativeInt
from ..addr import CoinAddr, MMGenID, MMGenAddrType
from ..amt import CoinAmtChk
from ..color import red,green,yellow
from ..color import red, green, yellow
from .view import TwView
from .shared import TwMMGenID
@ -28,7 +28,7 @@ class TwAddresses(TwView):
sort_key = 'twmmid'
update_widths_on_age_toggle = True
print_output_types = ('detail',)
filters = ('showempty','showused','all_labels')
filters = ('showempty', 'showused', 'all_labels')
showcoinaddrs = True
showempty = True
showused = 1 # tristate: 0:no, 1:yes, 2:all
@ -39,50 +39,50 @@ class TwAddresses(TwView):
class display_type(TwView.display_type):
class squeezed(TwView.display_type.squeezed):
cols = ('num','mmid','used','addr','comment','amt','date')
cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'date')
fmt_method = 'gen_display'
class detail(TwView.display_type.detail):
cols = ('num','mmid','used','addr','comment','amt','block','date_time')
cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'block', 'date_time')
fmt_method = 'gen_display'
class TwAddress(MMGenListItem):
valid_attrs = {'twmmid','addr','al_id','confs','comment','amt','recvd','date','skip'}
valid_attrs = {'twmmid', 'addr', 'al_id', 'confs', 'comment', 'amt', 'recvd', 'date', 'skip'}
invalid_attrs = {'proto'}
twmmid = ImmutableAttr(TwMMGenID,include_proto=True) # contains confs,txid(unused),date(unused),al_id
addr = ImmutableAttr(CoinAddr,include_proto=True)
twmmid = ImmutableAttr(TwMMGenID, include_proto=True) # contains confs,txid(unused),date(unused),al_id
addr = ImmutableAttr(CoinAddr, include_proto=True)
al_id = ImmutableAttr(str) # set to '_' for non-MMGen addresses
confs = ImmutableAttr(int,typeconv=False)
comment = ListItemAttr(TwComment,reassign_ok=True)
confs = ImmutableAttr(int, typeconv=False)
comment = ListItemAttr(TwComment, reassign_ok=True)
amt = ImmutableAttr(CoinAmtChk, include_proto=True)
recvd = ImmutableAttr(CoinAmtChk, include_proto=True)
date = ListItemAttr(int,typeconv=False,reassign_ok=True)
skip = ListItemAttr(str,typeconv=False,reassign_ok=True)
date = ListItemAttr(int, typeconv=False, reassign_ok=True)
skip = ListItemAttr(str, typeconv=False, reassign_ok=True)
def __init__(self,proto,**kwargs):
def __init__(self, proto, **kwargs):
self.__dict__['proto'] = proto
MMGenListItem.__init__(self,**kwargs)
MMGenListItem.__init__(self, **kwargs)
@property
def coinaddr_list(self):
return [d.addr for d in self.data]
async def __init__(self,cfg,proto,minconf=1,mmgen_addrs='',get_data=False):
async def __init__(self, cfg, proto, minconf=1, mmgen_addrs='', get_data=False):
await super().__init__(cfg,proto)
await super().__init__(cfg, proto)
self.minconf = NonNegativeInt(minconf)
if mmgen_addrs:
a = mmgen_addrs.rsplit(':',1)
a = mmgen_addrs.rsplit(':', 1)
if len(a) != 2:
from ..util import die
die(1,
f'{mmgen_addrs}: invalid address list argument ' +
'(must be in form <seed ID>:[<type>:]<idx list>)' )
'(must be in form <seed ID>:[<type>:]<idx list>)')
from ..addrlist import AddrIdxList
self.usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])]
self.usr_addr_list = [MMGenID(self.proto, f'{a[0]}:{i}') for i in AddrIdxList(a[1])]
else:
self.usr_addr_list = []
@ -94,20 +94,20 @@ class TwAddresses(TwView):
return 'No addresses {}found!'.format(
f'with {self.minconf} confirmations ' if self.minconf else '')
async def gen_data(self,rpc_data,lbl_id):
async def gen_data(self, rpc_data, lbl_id):
return (
self.TwAddress(
self.proto,
twmmid = twmmid,
addr = data['addr'],
al_id = getattr(twmmid.obj,'al_id','_'),
al_id = getattr(twmmid.obj, 'al_id', '_'),
confs = data['confs'],
comment = data['lbl'].comment,
amt = data['amt'],
recvd = data['recvd'],
date = 0,
skip = '' )
for twmmid,data in rpc_data.items()
skip = '')
for twmmid, data in rpc_data.items()
)
def filter_data(self):
@ -120,11 +120,11 @@ class TwAddresses(TwView):
(not (d.recvd and not self.showused) and (d.amt or self.showempty))
)
def get_column_widths(self,data,wide,interactive):
def get_column_widths(self, data, wide, interactive):
return self.compute_column_widths(
widths = { # fixed cols
'num': max(2,len(str(len(data)))+1),
'num': max(2, len(str(len(data)))+1),
'mmid': max(len(d.twmmid.disp) for d in data),
'used': 4,
'amt': self.amt_widths['amt'],
@ -146,11 +146,11 @@ class TwAddresses(TwView):
interactive = interactive,
)
def gen_subheader(self,cw,color):
def gen_subheader(self, cw, color):
if self.minconf:
yield f'Displaying balances with at least {self.minconf} confirmation{suf(self.minconf)}'
def squeezed_col_hdr(self,cw,fs,color):
def squeezed_col_hdr(self, cw, fs, color):
return fs.format(
n = '',
m = 'MMGenID',
@ -158,9 +158,9 @@ class TwAddresses(TwView):
a = 'Address',
c = 'Comment',
A = 'Balance',
d = self.age_hdr )
d = self.age_hdr)
def detail_col_hdr(self,cw,fs,color):
def detail_col_hdr(self, cw, fs, color):
return fs.format(
n = '',
m = 'MMGenID',
@ -169,48 +169,48 @@ class TwAddresses(TwView):
c = 'Comment',
A = 'Balance',
b = 'Block',
D = 'Date/Time' )
D = 'Date/Time')
def squeezed_format_line(self,n,d,cw,fs,color,yes,no):
def squeezed_format_line(self, n, d, cw, fs, color, yes, no):
return fs.format(
n = str(n) + ')',
m = d.twmmid.fmt( width=cw.mmid, color=color ),
m = d.twmmid.fmt(width=cw.mmid, color=color),
u = yes if d.recvd else no,
a = d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color),
c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ),
A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ),
d = self.age_disp( d, self.age_fmt )
c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-'),
A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec),
d = self.age_disp(d, self.age_fmt)
)
def detail_format_line(self,n,d,cw,fs,color,yes,no):
def detail_format_line(self, n, d, cw, fs, color, yes, no):
return fs.format(
n = str(n) + ')',
m = d.twmmid.fmt( width=cw.mmid, color=color ),
m = d.twmmid.fmt(width=cw.mmid, color=color),
u = yes if d.recvd else no,
a = d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color),
c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ),
A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ),
b = self.age_disp( d, 'block' ),
D = self.age_disp( d, 'date_time' ))
c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-'),
A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec),
b = self.age_disp(d, 'block'),
D = self.age_disp(d, 'date_time'))
def gen_display(self,data,cw,fs,color,fmt_method):
def gen_display(self, data, cw, fs, color, fmt_method):
yes,no = (red('Yes '),green('No ')) if color else ('Yes ','No ')
yes, no = (red('Yes '), green('No ')) if color else ('Yes ', 'No ')
id_save = data[0].al_id
for n,d in enumerate(data,1):
for n, d in enumerate(data, 1):
if id_save != d.al_id:
id_save = d.al_id
yield ''.ljust(self.term_width)
yield fmt_method(n,d,cw,fs,color,yes,no)
yield fmt_method(n, d, cw, fs, color, yes, no)
async def set_dates(self,addrs):
async def set_dates(self, addrs):
if not self.dates_set:
bc = self.rpc.blockcount + 1
caddrs = [addr for addr in addrs if addr.confs]
hashes = await self.rpc.gathered_call('getblockhash',[(n,) for n in [bc - a.confs for a in caddrs]])
dates = [d['time'] for d in await self.rpc.gathered_call('getblockheader',[(h,) for h in hashes])]
for idx,addr in enumerate(caddrs):
hashes = await self.rpc.gathered_call('getblockhash', [(n,) for n in [bc - a.confs for a in caddrs]])
dates = [d['time'] for d in await self.rpc.gathered_call('getblockheader', [(h,) for h in hashes])]
for idx, addr in enumerate(caddrs):
addr.date = dates[idx]
self.dates_set = True
@ -240,12 +240,12 @@ class TwAddresses(TwView):
def gen_sid_ranges():
from collections import namedtuple
sid_range = namedtuple('sid_range',['bot','top'])
sid_range = namedtuple('sid_range', ['bot', 'top'])
sid_save = None
bot = None
for n,e in enumerate(self.data):
for n, e in enumerate(self.data):
if e.twmmid.type == 'mmgen':
if e.twmmid.obj.sid != sid_save:
if sid_save:
@ -263,12 +263,12 @@ class TwAddresses(TwView):
assert self.sort_key == 'twmmid'
assert self.reverse is False
if not hasattr(self,'_sid_ranges'):
if not hasattr(self, '_sid_ranges'):
self._sid_ranges = dict(gen_sid_ranges())
return self._sid_ranges
def is_used(self,coinaddr):
def is_used(self, coinaddr):
for e in self.data:
if e.addr == coinaddr:
return bool(e.recvd)
@ -282,7 +282,7 @@ class TwAddresses(TwView):
False: no unused addresses in wallet with requested AddrListID
"""
def get_start(bot,top):
def get_start(bot, top):
"""
bisecting algorithm to find first entry with requested al_id
@ -310,7 +310,7 @@ class TwAddresses(TwView):
data = self.data
start = get_start(
bot = 0 if bot is None else bot,
top = len(data) - 1 if top is None else top )
top = len(data) - 1 if top is None else top)
if start is not None:
for d in data[start:]:
@ -345,7 +345,7 @@ class TwAddresses(TwView):
def choose_address(addrs):
def format_line(n,d):
def format_line(n, d):
return '{a:3}) {b}{c}'.format(
a = n,
b = d.twmmid.hl(),
@ -353,23 +353,23 @@ class TwAddresses(TwView):
)
prompt = '\nChoose a change address:\n\n{}\n\nEnter a number> '.format(
'\n'.join(format_line(n,d) for n,d in enumerate(addrs,1))
'\n'.join(format_line(n, d) for n, d in enumerate(addrs, 1))
)
from ..ui import line_input
while True:
res = line_input( self.cfg, prompt )
res = line_input(self.cfg, prompt)
if is_int(res) and 0 < int(res) <= len(addrs):
return addrs[int(res)-1]
msg(f'{res}: invalid entry')
assert isinstance(mmtype,MMGenAddrType)
assert isinstance(mmtype, MMGenAddrType)
res = [self.get_change_address(f'{sid}:{mmtype}', r.bot, r.top, exclude)
for sid,r in self.sid_ranges.items()]
for sid, r in self.sid_ranges.items()]
if any(res):
res = list(filter(None,res))
res = list(filter(None, res))
if len(res) == 1:
return res[0]
else:
@ -379,11 +379,11 @@ class TwAddresses(TwView):
class display_action(TwView.display_action):
def d_showempty(self,parent):
def d_showempty(self, parent):
parent.showempty = not parent.showempty
def d_showused(self,parent):
def d_showused(self, parent):
parent.showused = (parent.showused + 1) % 3
def d_all_labels(self,parent):
def d_all_labels(self, parent):
parent.all_labels = not parent.all_labels

View file

@ -24,12 +24,12 @@ from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..obj import NonNegativeInt
class TwGetBalance(MMGenObject,metaclass=AsyncInit):
class TwGetBalance(MMGenObject, metaclass=AsyncInit):
def __new__(cls,cfg,proto,*args,**kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw.bal'))
def __new__(cls, cfg, proto, *args, **kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.bal'))
async def __init__(self,cfg,proto,minconf,quiet):
async def __init__(self, cfg, proto, minconf, quiet):
class BalanceInfo(dict):
def __init__(self):
@ -40,7 +40,7 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit):
'ge_minconf': amt0,
'spendable': amt0,
}
dict.__init__(self,**data)
dict.__init__(self, **data)
self.minconf = NonNegativeInt(minconf)
self.balance_info = BalanceInfo
@ -53,7 +53,7 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit):
await self.create_data()
def format(self,color):
def format(self, color):
def gen_output():
@ -64,11 +64,11 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit):
def get_col_iwidth(colname):
return len(str(int(max(v[colname] for v in self.data.values())))) + iwidth_adj
def make_col(label,col):
return self.data[label][col].fmt( iwidth=iwidths[col], color=color )
def make_col(label, col):
return self.data[label][col].fmt(iwidth=iwidths[col], color=color)
if color:
from ..color import green,yellow
from ..color import green, yellow
else:
from ..color import nocolor
green = yellow = nocolor
@ -87,14 +87,14 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit):
lbl = 'Wallet',
w = col1_w + iwidth_adj,
cols = ' '.join(v.format(minconf=self.minconf).ljust(iwidths[k]+add_w)
for k,v in self.conf_cols.items()) ).rstrip()
for k, v in self.conf_cols.items())).rstrip()
from ..addr import MMGenID
for label in sorted(self.data.keys()):
yield '{lbl} {cols}'.format(
lbl = yellow((label + ' ' + self.proto.coin).ljust(col1_w)) if label == 'TOTAL'
else MMGenID.hlc( (label+':').ljust(col1_w), color=color ),
cols = ' '.join(make_col(label,col) for col in self.conf_cols)
else MMGenID.hlc((label+':').ljust(col1_w), color=color),
cols = ' '.join(make_col(label, col) for col in self.conf_cols)
).rstrip()
return '\n'.join(gen_output())

View file

@ -24,39 +24,39 @@ import json
from collections import namedtuple
from pathlib import Path
from ..util import msg,msg_r,suf,die
from ..util import msg, msg_r, suf, die
from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..obj import TwComment,get_obj
from ..addr import CoinAddr,is_mmgen_id,is_coin_addr
from ..obj import TwComment, get_obj
from ..addr import CoinAddr, is_mmgen_id, is_coin_addr
from ..rpc import rpc_init
from .shared import TwMMGenID,TwLabel
from .shared import TwMMGenID, TwLabel
twmmid_addr_pair = namedtuple('addr_info',['twmmid','coinaddr'])
label_addr_pair = namedtuple('label_addr_pair',['label','coinaddr'])
twmmid_addr_pair = namedtuple('addr_info', ['twmmid', 'coinaddr'])
label_addr_pair = namedtuple('label_addr_pair', ['label', 'coinaddr'])
# decorator for TwCtl
def write_mode(orig_func):
def f(self,*args,**kwargs):
def f(self, *args, **kwargs):
if self.mode != 'w':
die(1,'{} opened in read-only mode: cannot execute method {}()'.format(
die(1, '{} opened in read-only mode: cannot execute method {}()'.format(
type(self).__name__,
locals()['orig_func'].__name__
))
return orig_func(self,*args,**kwargs)
return orig_func(self, *args, **kwargs)
return f
class TwCtl(MMGenObject,metaclass=AsyncInit):
class TwCtl(MMGenObject, metaclass=AsyncInit):
caps = ('rescan','batch')
caps = ('rescan', 'batch')
data_key = 'addresses'
use_tw_file = False
aggressive_sync = False
importing = False
tw_fn = 'tracking-wallet.json'
def __new__(cls,cfg,proto,*args,**kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw.ctl'))
def __new__(cls, cfg, proto, *args, **kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.ctl'))
async def __init__(
self,
@ -68,7 +68,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
no_wallet_init = False,
rpc_ignore_wallet = False):
assert mode in ('r','w','i'), f"{mode!r}: wallet mode must be 'r','w' or 'i'"
assert mode in ('r', 'w', 'i'), f"{mode!r}: wallet mode must be 'r', 'w' or 'i'"
if mode == 'i':
self.importing = True
mode = 'w'
@ -103,13 +103,13 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
self.init_empty()
if self.data['coin'] != self.proto.coin: # TODO remove?
die( 'WalletFileError',
die('WalletFileError',
f'Tracking wallet coin ({self.data["coin"]}) does not match current coin ({self.proto.coin})!')
self.conv_types(self.data[self.data_key])
def init_from_wallet_file(self):
from ..fileutil import check_or_create_dir,get_data_from_file
from ..fileutil import check_or_create_dir, get_data_from_file
check_or_create_dir(self.tw_dir)
try:
self.orig_data = get_data_from_file(self.cfg, self.tw_path, quiet=True)
@ -132,7 +132,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
def del_twctl(twctl):
self.cfg._util.dmsg(f'Running exit handler del_twctl() for {twctl!r}')
del twctl
atexit.register(del_twctl,self)
atexit.register(del_twctl, self)
def __del__(self):
"""
@ -147,15 +147,15 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
Since no exceptions are raised, errors will not be caught by the test suite.
"""
if getattr(self,'mode',None) == 'w': # mode attr might not exist in this state
if getattr(self, 'mode', None) == 'w': # mode attr might not exist in this state
self.write()
elif self.cfg.debug:
msg('read-only wallet, doing nothing')
def conv_types(self,ad):
for k,v in ad.items():
if k not in ('params','coin'):
v['mmid'] = TwMMGenID(self.proto,v['mmid'])
def conv_types(self, ad):
for k, v in ad.items():
if k not in ('params', 'coin'):
v['mmid'] = TwMMGenID(self.proto, v['mmid'])
v['comment'] = TwComment(v['comment'])
@property
@ -166,7 +166,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
def data_root_desc(self):
return self.data_key
def cache_balance(self,addr,bal,session_cache,data_root,force=False):
def cache_balance(self, addr, bal, session_cache, data_root, force=False):
if force or addr not in session_cache:
session_cache[addr] = str(bal)
if addr in data_root:
@ -174,7 +174,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
if self.aggressive_sync:
self.write()
def get_cached_balance(self,addr,session_cache,data_root):
def get_cached_balance(self, addr, session_cache, data_root):
if addr in session_cache:
return self.proto.coin_amt(session_cache[addr])
if not self.cfg.cached_balances:
@ -182,11 +182,11 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
if addr in data_root and 'balance' in data_root[addr]:
return self.proto.coin_amt(data_root[addr]['balance'])
async def get_balance(self,addr,force_rpc=False):
ret = None if force_rpc else self.get_cached_balance(addr,self.cur_balances,self.data_root)
async def get_balance(self, addr, force_rpc=False):
ret = None if force_rpc else self.get_cached_balance(addr, self.cur_balances, self.data_root)
if ret is None:
ret = await self.rpc_get_balance(addr)
self.cache_balance(addr,ret,self.cur_balances,self.data_root)
self.cache_balance(addr, ret, self.cur_balances, self.data_root)
return ret
def force_write(self):
@ -196,7 +196,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
self.mode = mode_save
@write_mode
def write_changed(self,data,quiet):
def write_changed(self, data, quiet):
from ..fileutil import write_data_to_file
write_data_to_file(
self.cfg,
@ -207,11 +207,11 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
ignore_opt_outdir = True,
quiet = quiet,
check_data = True, # die if wallet has been altered by another program
cmp_data = self.orig_data )
cmp_data = self.orig_data)
self.orig_data = data
def write(self,quiet=True):
def write(self, quiet=True):
if not self.use_tw_file:
self.cfg._util.dmsg("'use_tw_file' is False, doing nothing")
return
@ -219,21 +219,21 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
wdata = json.dumps(self.data)
if self.orig_data != wdata:
self.write_changed(wdata,quiet=quiet)
self.write_changed(wdata, quiet=quiet)
elif self.cfg.debug:
msg('Data is unchanged\n')
async def resolve_address(self,addrspec):
async def resolve_address(self, addrspec):
twmmid,coinaddr = (None,None)
twmmid, coinaddr = (None, None)
pairs = await self.get_label_addr_pairs()
if is_coin_addr(self.proto,addrspec):
coinaddr = get_obj(CoinAddr,proto=self.proto,addr=addrspec)
if is_coin_addr(self.proto, addrspec):
coinaddr = get_obj(CoinAddr, proto=self.proto, addr=addrspec)
pair_data = [e for e in pairs if e.coinaddr == coinaddr]
elif is_mmgen_id(self.proto,addrspec):
twmmid = TwMMGenID(self.proto,addrspec)
elif is_mmgen_id(self.proto, addrspec):
twmmid = TwMMGenID(self.proto, addrspec)
pair_data = [e for e in pairs if e.label.mmid == twmmid]
else:
msg(f'{addrspec!r}: invalid address for this network')
@ -263,7 +263,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
if not res:
return False
comment = get_obj(TwComment,s=comment)
comment = get_obj(TwComment, s=comment)
if comment is False:
return False
@ -276,38 +276,38 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
if lbl is False:
return False
if await self.set_label(res.coinaddr,lbl):
if await self.set_label(res.coinaddr, lbl):
if not silent:
desc = '{t} address {a} in tracking wallet'.format(
t = res.twmmid.type.replace('mmgen','MMGen'),
t = res.twmmid.type.replace('mmgen', 'MMGen'),
a = res.twmmid.addr.hl() if res.twmmid.type == 'mmgen' else
res.twmmid.addr.hl(res.twmmid.addr.view_pref))
msg(
'Added label {} to {}'.format(comment.hl2(encl='‘’'),desc) if comment else
'Removed label from {}'.format(desc) )
'Added label {} to {}'.format(comment.hl2(encl='‘’'), desc) if comment else
'Removed label from {}'.format(desc))
return True
else:
if not silent:
msg( 'Label could not be {}'.format('added' if comment else 'removed') )
msg('Label could not be {}'.format('added' if comment else 'removed'))
return False
@write_mode
async def remove_comment(self,mmaddr):
await self.set_comment(mmaddr,'')
async def remove_comment(self, mmaddr):
await self.set_comment(mmaddr, '')
async def import_address_common(self,data,batch=False,gather=False):
async def import_address_common(self, data, batch=False, gather=False):
async def do_import(address,comment,message):
async def do_import(address, comment, message):
try:
res = await self.import_address( address, comment )
res = await self.import_address(address, comment)
self.cfg._util.qmsg(message)
return res
except Exception as e:
die(2,f'\nImport of address {address!r} failed: {e.args[0]!r}')
die(2, f'\nImport of address {address!r} failed: {e.args[0]!r}')
_d = namedtuple( 'formatted_import_data', data[0]._fields + ('mmid_disp',))
_d = namedtuple('formatted_import_data', data[0]._fields + ('mmid_disp',))
pfx = self.proto.base_coin.lower() + ':'
fdata = [ _d(*d, 'non-MMGen' if d.twmmid.startswith(pfx) else d.twmmid ) for d in data ]
fdata = [_d(*d, 'non-MMGen' if d.twmmid.startswith(pfx) else d.twmmid) for d in data]
fs = '{:%s}: {:%s} {:%s} - OK' % (
len(str(len(fdata))) * 2 + 1,
@ -317,13 +317,13 @@ class TwCtl(MMGenObject,metaclass=AsyncInit):
nAddrs = len(data)
out = [( # create list, not generator, so we know data is valid before starting import
CoinAddr( self.proto, d.addr ),
TwLabel( self.proto, d.twmmid + (f' {d.comment}' if d.comment else '') ),
fs.format( f'{n}/{nAddrs}', d.addr, f'({d.mmid_disp})' )
) for n,d in enumerate(fdata,1)]
CoinAddr(self.proto, d.addr),
TwLabel(self.proto, d.twmmid + (f' {d.comment}' if d.comment else '')),
fs.format(f'{n}/{nAddrs}', d.addr, f'({d.mmid_disp})')
) for n, d in enumerate(fdata, 1)]
if batch:
msg_r(f'Batch importing {len(out)} address{suf(data,"es")}...')
msg_r(f'Batch importing {len(out)} address{suf(data, "es")}...')
ret = await self.batch_import_address((a, b, False) for a, b, c in out)
msg(f'done\n{len(ret)} addresses imported')
else:

View file

@ -12,10 +12,10 @@
tw.json: export and import tracking wallet to JSON format
"""
import sys,os,json
import sys, os, json
from collections import namedtuple
from ..util import msg,ymsg,fmt,suf,die,make_timestamp,make_chksum_8
from ..util import msg, ymsg, fmt, suf, die, make_timestamp, make_chksum_8
from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..rpc import json_encoder
@ -29,16 +29,16 @@ class TwJSON:
pruned = None
fn_pfx = 'mmgen-tracking-wallet-dump'
def __new__(cls,cfg,proto,*args,**kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(TwJSON,'tw.json',cls.__name__))
def __new__(cls, cfg, proto, *args, **kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(TwJSON, 'tw.json', cls.__name__))
def __init__(self,cfg,proto):
def __init__(self, cfg, proto):
self.cfg = cfg
self.proto = proto
self.coin = proto.coin_id.lower()
self.network = proto.network
self.keys = ['mmgen_id','address','amount','comment']
self.entry_tuple = namedtuple('tw_entry',self.keys)
self.keys = ['mmgen_id', 'address', 'amount', 'comment']
self.entry_tuple = namedtuple('tw_entry', self.keys)
@property
def dump_fn(self):
@ -48,7 +48,7 @@ class TwJSON:
a = self.fn_pfx,
b = f'-pruned[{prune_id}]' if prune_id else '',
c = self.coin,
d = self.network )
d = self.network)
if self.pruned:
from ..addrlist import AddrIdxList
@ -62,16 +62,16 @@ class TwJSON:
return fn
def json_dump(self,data,pretty=False):
def json_dump(self, data, pretty=False):
return json.dumps(
data,
cls = json_encoder,
sort_keys = True,
separators = None if pretty else (',', ':'),
indent = 4 if pretty else None ) + ('\n' if pretty else '')
indent = 4 if pretty else None) + ('\n' if pretty else '')
def make_chksum(self,data):
return make_chksum_8( self.json_dump(data).encode() ).lower()
def make_chksum(self, data):
return make_chksum_8(self.json_dump(data).encode()).lower()
@property
def mappings_chksum(self):
@ -79,9 +79,9 @@ class TwJSON:
@property
def entry_tuple_in(self):
return namedtuple('entry_tuple_in',self.keys)
return namedtuple('entry_tuple_in', self.keys)
class Import(Base,metaclass=AsyncInit):
class Import(Base, metaclass=AsyncInit):
blockchain_rescan_warning = None
@ -91,18 +91,18 @@ class TwJSON:
proto,
filename,
ignore_checksum = False,
batch = False ):
batch = False):
super().__init__(cfg,proto)
super().__init__(cfg, proto)
self.twctl = await TwCtl( cfg, proto, mode='i', rpc_ignore_wallet=True )
self.twctl = await TwCtl(cfg, proto, mode='i', rpc_ignore_wallet=True)
def check_network(data):
coin,network = data['network'].split('_')
coin, network = data['network'].split('_')
if coin != self.coin:
die(2,f'Coin in wallet dump is {coin.upper()}, but configured coin is {self.coin.upper()}')
die(2, f'Coin in wallet dump is {coin.upper()}, but configured coin is {self.coin.upper()}')
if network != self.network:
die(2,f'Network in wallet dump is {network}, but configured network is {self.network}')
die(2, f'Network in wallet dump is {network}, but configured network is {self.network}')
def check_chksum(d):
chksum = self.make_chksum(d['data'])
@ -110,7 +110,7 @@ class TwJSON:
if ignore_checksum:
ymsg(f'Warning: ignoring incorrect checksum {chksum}')
else:
die(3,f'File checksum incorrect! ({chksum} != {d["checksum"]})')
die(3, f'File checksum incorrect! ({chksum} != {d["checksum"]})')
def verify_data(d):
check_network(d['data'])
@ -119,13 +119,13 @@ class TwJSON:
val1 = self.mappings_chksum,
val2 = d['data']['mappings_checksum'],
desc1 = 'computed mappings checksum',
desc2 = 'saved checksum' )
desc2 = 'saved checksum')
if not await self.check_and_create_wallet():
return
from ..fileutil import get_data_from_file
self.data = json.loads(get_data_from_file( self.cfg, filename, quiet=True ))
self.data = json.loads(get_data_from_file(self.cfg, filename, quiet=True))
self.keys = self.data['data']['entries_keys']
self.entries = await self.get_entries()
@ -136,28 +136,28 @@ class TwJSON:
await self.twctl.rescan_addresses(addrs)
if self.blockchain_rescan_warning:
ymsg('\n' + fmt(self.blockchain_rescan_warning.strip(),indent=' '))
ymsg('\n' + fmt(self.blockchain_rescan_warning.strip(), indent=' '))
async def check_and_create_wallet(self):
if await self.tracking_wallet_exists:
die(3,
f'Existing {self.twctl.rpc.daemon.desc} wallet detected!\n' +
'It must be moved, or backed up and securely deleted, before running this command' )
'It must be moved, or backed up and securely deleted, before running this command')
msg('\n'+fmt(self.info_msg.strip(),indent=' '))
msg('\n'+fmt(self.info_msg.strip(), indent=' '))
from ..ui import keypress_confirm
if not keypress_confirm( self.cfg, 'Continue?' ):
if not keypress_confirm(self.cfg, 'Continue?'):
msg('Exiting at user request')
return False
if not await self.create_tracking_wallet():
die(3,'Wallet could not be created')
die(3, 'Wallet could not be created')
return True
class Export(Base,metaclass=AsyncInit):
class Export(Base, metaclass=AsyncInit):
async def __init__(
self,
@ -167,27 +167,27 @@ class TwJSON:
pretty = False,
prune = False,
warn_used = False,
force_overwrite = False ):
force_overwrite = False):
if prune and not self.can_prune:
die(1,f'Pruning not supported for {proto.name} protocol')
die(1, f'Pruning not supported for {proto.name} protocol')
self.prune = prune
self.warn_used = warn_used
super().__init__(cfg,proto)
super().__init__(cfg, proto)
if not include_amts:
self.keys.remove('amount')
self.twctl = await TwCtl( cfg, proto )
self.twctl = await TwCtl(cfg, proto)
self.entries = await self.get_entries()
if self.prune:
msg('Pruned {} address{}'.format( len(self.pruned), suf(self.pruned,'es') ))
msg('Pruned {} address{}'.format(len(self.pruned), suf(self.pruned, 'es')))
msg('Exporting {} address{}'.format( self.num_entries, suf(self.num_entries,'es') ))
msg('Exporting {} address{}'.format(self.num_entries, suf(self.num_entries, 'es')))
data = {
'id': 'mmgen_tracking_wallet',
@ -212,6 +212,6 @@ class TwJSON:
'checksum': self.make_chksum(data),
'data': data
},
pretty = pretty ),
pretty = pretty),
desc = 'tracking wallet JSON data',
ask_overwrite = not force_overwrite )
ask_overwrite = not force_overwrite)

View file

@ -12,11 +12,11 @@
tw.prune: Tracking wallet pruned listaddresses class for the MMGen suite
"""
from ..util import msg,msg_r,rmsg,ymsg
from ..color import red,green,gray,yellow
from ..util import msg, msg_r, rmsg, ymsg
from ..color import red, green, gray, yellow
from ..obj import ListItemAttr
from .addresses import TwAddresses
from .view import CUR_HOME,ERASE_ALL
from .view import CUR_HOME, ERASE_ALL
class TwAddressesPrune(TwAddresses):
@ -24,29 +24,29 @@ class TwAddressesPrune(TwAddresses):
class TwAddress(TwAddresses.TwAddress):
valid_attrs = TwAddresses.TwAddress.valid_attrs | {'tag'}
tag = ListItemAttr(bool,typeconv=False,reassign_ok=True)
tag = ListItemAttr(bool, typeconv=False, reassign_ok=True)
async def __init__(self,*args,warn_used=False,**kwargs):
async def __init__(self, *args, warn_used=False, **kwargs):
self.warn_used = warn_used
await super().__init__(*args,**kwargs)
await super().__init__(*args, **kwargs)
def gen_display(self,data,cw,fs,color,fmt_method):
def gen_display(self, data, cw, fs, color, fmt_method):
id_save = data[0].al_id
yes,no = red('Yes '),green('No ')
yes, no = red('Yes '), green('No ')
for n,d in enumerate(data,1):
for n, d in enumerate(data, 1):
if id_save != d.al_id:
id_save = d.al_id
yield ''.ljust(self.term_width)
yield (
gray(fmt_method(n,d,cw,fs,False,'Yes ','No ')) if d.tag else
fmt_method(n,d,cw,fs,True,yes,no) )
gray(fmt_method(n, d, cw, fs, False, 'Yes ', 'No ')) if d.tag else
fmt_method(n, d, cw, fs, True, yes, no))
def do_prune(self):
def gen():
for n,d in enumerate(self.data,1):
for n, d in enumerate(self.data, 1):
if d.tag:
pruned.append(n)
if d.amt:
@ -65,16 +65,16 @@ class TwAddressesPrune(TwAddresses):
class action(TwAddresses.action):
def get_addrnums(self,parent,desc):
def get_addrnums(self, parent, desc):
prompt = f'Enter a range or space-separated list of addresses to {desc}: '
from ..ui import line_input
msg('')
while True:
reply = line_input( parent.cfg, prompt ).strip()
reply = line_input(parent.cfg, prompt).strip()
if reply:
from ..addrlist import AddrIdxList
from ..obj import get_obj
selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()))
if selected:
if selected[-1] <= len(parent.disp_data):
return selected
@ -82,23 +82,23 @@ class TwAddressesPrune(TwAddresses):
else:
return []
def query_user(self,desc,addrnum,e):
def query_user(self, desc, addrnum, e):
from collections import namedtuple
md = namedtuple('mdata',['wmsg','prompt'])
md = namedtuple('mdata', ['wmsg', 'prompt'])
m = {
'amt': md(
red('Address #{a} ({b}) has a balance of {c}!'.format(
a = addrnum,
b = e.twmmid.addr,
c = e.amt.hl2(color=False,unit=True) )),
c = e.amt.hl2(color=False, unit=True))),
'[p]rune anyway, [P]rune all with balance, [s]kip, [S]kip all with balance: ',
),
'used': md(
yellow('Address #{a} ({b}) is used!'.format(
a = addrnum,
b = e.twmmid.addr )),
b = e.twmmid.addr)),
'[p]rune anyway, [P]rune all used, [s]kip, [S]kip all used: ',
),
}
@ -108,7 +108,7 @@ class TwAddressesPrune(TwAddresses):
msg(m[desc].wmsg)
while True:
res = get_char( m[desc].prompt, immed_chars=valid_res )
res = get_char(m[desc].prompt, immed_chars=valid_res)
if res in valid_res:
msg('')
return {
@ -121,13 +121,13 @@ class TwAddressesPrune(TwAddresses):
else:
msg('\nInvalid keypress')
async def a_prune(self,parent):
async def a_prune(self, parent):
def do_entry(desc,n,addrnum,e):
def do_entry(desc, n, addrnum, e):
if auto[desc]:
return False
else:
auto[desc],prune = self.query_user(desc,addrnum,e)
auto[desc], prune = self.query_user(desc, addrnum, e)
dfl[desc] = auto[desc] and prune
skip_all_used = auto['used'] and not dfl['used']
if auto[desc]: # we’ve switched to auto mode, so go back and fix up all previous entries
@ -145,30 +145,30 @@ class TwAddressesPrune(TwAddresses):
dfl['amt'] = False
return prune
addrnums = self.get_addrnums(parent,'prune')
addrnums = self.get_addrnums(parent, 'prune')
dfl = {'amt': False, 'used': False} # default prune policy for given property (has amt, is used)
auto = {'amt': False, 'used': False} # whether to ask the user, or apply default policy automatically
for n,addrnum in enumerate(addrnums):
for n, addrnum in enumerate(addrnums):
e = parent.disp_data[addrnum-1]
if e.amt and not dfl['amt']:
e.tag = do_entry('amt',n,addrnum,e)
e.tag = do_entry('amt', n, addrnum, e)
elif parent.warn_used and (e.recvd and not e.amt) and not dfl['used']:
e.tag = do_entry('used',n,addrnum,e)
e.tag = do_entry('used', n, addrnum, e)
else:
e.tag = True
if parent.scroll:
msg_r(CUR_HOME + ERASE_ALL)
async def a_unprune(self,parent):
for addrnum in self.get_addrnums(parent,'unprune'):
async def a_unprune(self, parent):
for addrnum in self.get_addrnums(parent, 'unprune'):
parent.disp_data[addrnum-1].tag = False
if parent.scroll:
msg_r(CUR_HOME + ERASE_ALL)
async def a_clear_prune_list(self,parent):
async def a_clear_prune_list(self, parent):
for d in parent.data:
d.tag = False

View file

@ -16,10 +16,10 @@ from ..objmethods import MMGenObject
class TwRPC:
def __new__(cls,proto,*args,**kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw.rpc'))
def __new__(cls, proto, *args, **kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.rpc'))
def __init__(self,proto,rpc,twctl):
def __init__(self, proto, rpc, twctl):
self.proto = proto
self.rpc = rpc
self.twctl = twctl

View file

@ -12,31 +12,31 @@
tw.shared: classes and functions shared by all tracking wallet classes
"""
from ..objmethods import HiliteStr,InitErrors,MMGenObject
from ..objmethods import HiliteStr, InitErrors, MMGenObject
from ..obj import TwComment
from ..addr import MMGenID
class TwMMGenID(HiliteStr,InitErrors,MMGenObject):
class TwMMGenID(HiliteStr, InitErrors, MMGenObject):
color = 'orange'
width = 0
trunc_ok = False
def __new__(cls,proto,id_str):
if isinstance(id_str,cls):
def __new__(cls, proto, id_str):
if isinstance(id_str, cls):
return id_str
try:
ret = addr = disp = MMGenID(proto,id_str)
sort_key,idtype = (ret.sort_key,'mmgen')
ret = addr = disp = MMGenID(proto, id_str)
sort_key, idtype = (ret.sort_key, 'mmgen')
except Exception as e:
try:
coin,addr = id_str.split(':',1)
assert coin == proto.base_coin.lower(),(
f'not a string beginning with the prefix {proto.base_coin.lower()!r}:' )
ret,sort_key,idtype,disp = (id_str,'z_'+id_str,'non-mmgen','non-MMGen')
coin, addr = id_str.split(':', 1)
assert coin == proto.base_coin.lower(), (
f'not a string beginning with the prefix {proto.base_coin.lower()!r}:')
ret, sort_key, idtype, disp = (id_str, 'z_'+id_str, 'non-mmgen', 'non-MMGen')
addr = proto.coin_addr(addr)
except Exception as e2:
return cls.init_fail(e,id_str,e2=e2)
return cls.init_fail(e, id_str, e2=e2)
me = str.__new__(cls,ret)
me = str.__new__(cls, ret)
me.obj = ret
me.disp = disp
me.addr = addr
@ -45,34 +45,34 @@ class TwMMGenID(HiliteStr,InitErrors,MMGenObject):
me.proto = proto
return me
def fmt(self,**kwargs):
return super().fmtc(self.disp,**kwargs)
def fmt(self, **kwargs):
return super().fmtc(self.disp, **kwargs)
# non-displaying container for TwMMGenID,TwComment
class TwLabel(str,InitErrors,MMGenObject):
# non-displaying container for TwMMGenID, TwComment
class TwLabel(str, InitErrors, MMGenObject):
exc = 'BadTwLabel'
passthru_excs = ('BadTwComment',)
def __new__(cls,proto,text):
if isinstance(text,cls):
def __new__(cls, proto, text):
if isinstance(text, cls):
return text
try:
ts = text.split(None,1)
mmid = TwMMGenID(proto,ts[0])
ts = text.split(None, 1)
mmid = TwMMGenID(proto, ts[0])
comment = TwComment(ts[1] if len(ts) == 2 else '')
me = str.__new__( cls, mmid + (' ' + comment if comment else '') )
me = str.__new__(cls, mmid + (' ' + comment if comment else ''))
me.mmid = mmid
me.comment = comment
me.proto = proto
return me
except Exception as e:
return cls.init_fail(e,text)
return cls.init_fail(e, text)
def get_tw_label(proto,s):
def get_tw_label(proto, s):
"""
raise an exception on a malformed comment, return None on an empty or invalid label
"""
try:
return TwLabel(proto,s)
return TwLabel(proto, s)
except Exception as e:
if type(e).__name__ == 'BadTwComment': # do it this way to avoid importing .exception
raise

View file

@ -23,7 +23,7 @@ class TwTxHistory(TwView):
class display_type(TwView.display_type):
class squeezed(TwView.display_type.squeezed):
cols = ('num','txid','date','inputs','amt','outputs','comment')
cols = ('num', 'txid', 'date', 'inputs', 'amt', 'outputs', 'comment')
subhdr_fmt_method = 'gen_squeezed_subheader'
class detail(TwView.display_type.detail):
@ -37,13 +37,13 @@ class TwTxHistory(TwView):
show_unconfirmed = False
show_total_amt = False
update_widths_on_age_toggle = True
print_output_types = ('squeezed','detail')
print_output_types = ('squeezed', 'detail')
filters = ('show_unconfirmed',)
mod_subpath = 'tw.txhistory'
async def __init__(self,cfg,proto,sinceblock=0):
await super().__init__(cfg,proto)
self.sinceblock = NonNegativeInt( sinceblock if sinceblock >= 0 else self.rpc.blockcount + sinceblock )
async def __init__(self, cfg, proto, sinceblock=0):
await super().__init__(cfg, proto)
self.sinceblock = NonNegativeInt(sinceblock if sinceblock >= 0 else self.rpc.blockcount + sinceblock)
@property
def no_rpcdata_errmsg(self):
@ -53,14 +53,14 @@ class TwTxHistory(TwView):
def filter_data(self):
return (d for d in self.data if d.confirmations > 0 or self.show_unconfirmed)
def set_amt_widths(self,data):
amts_tuple = namedtuple('amts_data',['amt'])
def set_amt_widths(self, data):
amts_tuple = namedtuple('amts_data', ['amt'])
return super().set_amt_widths([amts_tuple(d.amt_disp(self.show_total_amt)) for d in data])
def get_column_widths(self,data,wide,interactive):
def get_column_widths(self, data, wide, interactive):
# var cols: inputs outputs comment [txid]
if not hasattr(self,'varcol_maxwidths'):
if not hasattr(self, 'varcol_maxwidths'):
self.varcol_maxwidths = {
'inputs': max(len(d.vouts_disp(
'inputs', width=None, color=False, addr_view_pref=self.addr_view_pref)) for d in data),
@ -85,30 +85,36 @@ class TwTxHistory(TwView):
maxws_nice = {}
widths = { # fixed cols
'num': max(2,len(str(len(data)))+1),
'num': max(2, len(str(len(data)))+1),
'date': self.age_w,
'amt': self.amt_widths['amt'],
'spc': 6 + self.show_txid, # 5(6) spaces between cols + 1 leading space in fs
}
return self.compute_column_widths(widths,maxws,minws,maxws_nice,wide=wide,interactive=interactive)
return self.compute_column_widths(
widths,
maxws,
minws,
maxws_nice,
wide = wide,
interactive = interactive)
def gen_squeezed_subheader(self,cw,color):
def gen_squeezed_subheader(self, cw, color):
# keep these shorter than min screen width (currently prompt width, or 65 chars)
if self.sinceblock:
yield f'Displaying transactions since block {self.sinceblock.hl(color=color)}'
yield 'Only wallet-related outputs are shown'
yield 'Comment is from first wallet address in outputs or inputs'
if (cw.inputs < self.varcol_maxwidths['inputs'] or
cw.outputs < self.varcol_maxwidths['outputs'] ):
cw.outputs < self.varcol_maxwidths['outputs']):
yield 'Note: screen is too narrow to display all inputs and outputs'
def gen_detail_subheader(self,cw,color):
def gen_detail_subheader(self, cw, color):
if self.sinceblock:
yield f'Displaying transactions since block {self.sinceblock.hl(color=color)}'
yield 'Only wallet-related outputs are shown'
def squeezed_col_hdr(self,cw,fs,color):
def squeezed_col_hdr(self, cw, fs, color):
return fs.format(
n = '',
t = 'TxID',
@ -116,21 +122,21 @@ class TwTxHistory(TwView):
i = 'Inputs',
A = 'Amt({})'.format('TX' if self.show_total_amt else 'Wallet'),
o = 'Outputs',
c = 'Comment' )
c = 'Comment')
def gen_squeezed_display(self,data,cw,fs,color,fmt_method):
def gen_squeezed_display(self, data, cw, fs, color, fmt_method):
for n,d in enumerate(data,1):
for n, d in enumerate(data, 1):
yield fs.format(
n = str(n) + ')',
t = d.txid_disp( width=cw.txid, color=color ) if hasattr(cw,'txid') else None,
d = d.age_disp( self.age_fmt, width=self.age_w, color=color ),
t = d.txid_disp(width=cw.txid, color=color) if hasattr(cw, 'txid') else None,
d = d.age_disp(self.age_fmt, width=self.age_w, color=color),
i = d.vouts_disp('inputs', width=cw.inputs, color=color, addr_view_pref=self.addr_view_pref),
A = d.amt_disp(self.show_total_amt).fmt( iwidth=cw.iwidth, prec=self.disp_prec, color=color ),
A = d.amt_disp(self.show_total_amt).fmt(iwidth=cw.iwidth, prec=self.disp_prec, color=color),
o = d.vouts_disp('outputs', width=cw.outputs, color=color, addr_view_pref=self.addr_view_pref),
c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ) )
c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-'))
def gen_detail_display(self,data,cw,fs,color,fmt_method):
def gen_detail_display(self, data, cw, fs, color, fmt_method):
fs = fmt("""
{n}
@ -143,18 +149,18 @@ class TwTxHistory(TwView):
{i}
Outputs ({N}):
{o}
""",strip_char='\t').strip()
""", strip_char='\t').strip()
for n,d in enumerate(data,1):
for n, d in enumerate(data, 1):
yield fs.format(
n = str(n) + ')',
d = d.age_disp( 'date_time', width=None, color=None ),
d = d.age_disp('date_time', width=None, color=None),
b = d.blockheight_disp(color=color),
D = d.txdate_disp( 'date_time' ),
t = d.txid_disp( color=color ),
A = d.amt_disp(show_total_amt=True).hl( color=color ),
B = d.amt_disp(show_total_amt=False).hl( color=color ),
f = d.fee_disp( color=color ),
D = d.txdate_disp('date_time'),
t = d.txid_disp(color=color),
A = d.amt_disp(show_total_amt=True).hl(color=color),
B = d.amt_disp(show_total_amt=False).hl(color=color),
f = d.fee_disp(color=color),
i = d.vouts_list_disp('inputs', color=color, indent=' '*8, addr_view_pref=self.addr_view_pref),
N = d.nOutputs,
o = d.vouts_list_disp('outputs', color=color, indent=' '*8, addr_view_pref=self.addr_view_pref),
@ -176,7 +182,7 @@ class TwTxHistory(TwView):
'txid': lambda i: i.txid,
}
async def set_dates(self,_):
async def set_dates(self, _):
pass
@property
@ -185,24 +191,24 @@ class TwTxHistory(TwView):
class sort_action(TwView.sort_action):
def s_blockheight(self,parent):
def s_blockheight(self, parent):
parent.do_sort('blockheight')
def s_amt(self,parent):
def s_amt(self, parent):
parent.do_sort('amt')
parent.show_total_amt = False
def s_total_amt(self,parent):
def s_total_amt(self, parent):
parent.do_sort('total_amt')
parent.show_total_amt = True
class display_action(TwView.display_action):
def d_show_txid(self,parent):
def d_show_txid(self, parent):
parent.show_txid = not parent.show_txid
def d_show_unconfirmed(self,parent):
def d_show_unconfirmed(self, parent):
parent.show_unconfirmed = not parent.show_unconfirmed
def d_show_total_amt(self,parent):
def d_show_total_amt(self, parent):
parent.show_total_amt = not parent.show_total_amt

View file

@ -28,10 +28,10 @@ from ..obj import (
TwComment,
HexStr,
CoinTxID,
NonNegativeInt )
NonNegativeInt)
from ..addr import CoinAddr
from ..amt import CoinAmtChk
from .shared import TwMMGenID,get_tw_label
from .shared import TwMMGenID, get_tw_label
from .view import TwView
class TwUnspentOutputs(TwView):
@ -39,10 +39,10 @@ class TwUnspentOutputs(TwView):
class display_type(TwView.display_type):
class squeezed(TwView.display_type.squeezed):
cols = ('num','txid','vout','addr','mmid','comment','amt','amt2','date')
cols = ('num', 'txid', 'vout', 'addr', 'mmid', 'comment', 'amt', 'amt2', 'date')
class detail(TwView.display_type.detail):
cols = ('num','txid','vout','addr','mmid','amt','amt2','block','date_time','comment')
cols = ('num', 'txid', 'vout', 'addr', 'mmid', 'amt', 'amt2', 'block', 'date_time', 'comment')
show_mmid = True
no_rpcdata_errmsg = """
@ -58,20 +58,20 @@ class TwUnspentOutputs(TwView):
vout = ListItemAttr(NonNegativeInt)
amt = ImmutableAttr(CoinAmtChk, include_proto=True)
amt2 = ListItemAttr(CoinAmtChk, include_proto=True) # the ETH balance for token account
comment = ListItemAttr(TwComment,reassign_ok=True)
twmmid = ImmutableAttr(TwMMGenID,include_proto=True)
addr = ImmutableAttr(CoinAddr,include_proto=True)
confs = ImmutableAttr(int,typeconv=False)
date = ListItemAttr(int,typeconv=False,reassign_ok=True)
comment = ListItemAttr(TwComment, reassign_ok=True)
twmmid = ImmutableAttr(TwMMGenID, include_proto=True)
addr = ImmutableAttr(CoinAddr, include_proto=True)
confs = ImmutableAttr(int, typeconv=False)
date = ListItemAttr(int, typeconv=False, reassign_ok=True)
scriptPubKey = ImmutableAttr(HexStr)
skip = ListItemAttr(str,typeconv=False,reassign_ok=True)
skip = ListItemAttr(str, typeconv=False, reassign_ok=True)
def __init__(self,proto,**kwargs):
def __init__(self, proto, **kwargs):
self.__dict__['proto'] = proto
MMGenListItem.__init__(self,**kwargs)
MMGenListItem.__init__(self, **kwargs)
async def __init__(self,cfg,proto,minconf=1,addrs=[]):
await super().__init__(cfg,proto)
async def __init__(self, cfg, proto, minconf=1, addrs=[]):
await super().__init__(cfg, proto)
self.minconf = minconf
self.addrs = addrs
from ..cfg import gc
@ -81,23 +81,23 @@ class TwUnspentOutputs(TwView):
def total(self):
return sum(i.amt for i in self.data)
def gen_data(self,rpc_data,lbl_id):
def gen_data(self, rpc_data, lbl_id):
for o in rpc_data:
if not lbl_id in o:
continue # coinbase outputs have no account field
l = get_tw_label(self.proto,o[lbl_id])
l = get_tw_label(self.proto, o[lbl_id])
if l:
if not 'amt' in o:
o['amt'] = self.proto.coin_amt(o['amount'])
o.update({
'twmmid': l.mmid,
'comment': l.comment or '',
'addr': CoinAddr(self.proto,o['address']),
'addr': CoinAddr(self.proto, o['address']),
'confs': o['confirmations']
})
yield self.MMGenTwUnspentOutput(
self.proto,
**{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } )
**{k:v for k, v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs})
def filter_data(self):
@ -106,27 +106,27 @@ class TwUnspentOutputs(TwView):
for d in data:
d.skip = ''
gkeys = {'addr':'addr','twmmid':'addr','txid':'txid'}
gkeys = {'addr': 'addr', 'twmmid': 'addr', 'txid': 'txid'}
if self.group and self.sort_key in gkeys:
for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]:
for a, b in [(data[i], data[i+1]) for i in range(len(data)-1)]:
for k in gkeys:
if self.sort_key == k and getattr(a,k) == getattr(b,k):
if self.sort_key == k and getattr(a, k) == getattr(b, k):
b.skip = gkeys[k]
return data
def get_column_widths(self,data,wide,interactive):
def get_column_widths(self, data, wide, interactive):
show_mmid = self.show_mmid or wide
# num txid vout addr [mmid] [comment] amt [amt2] date
return self.compute_column_widths(
widths = { # fixed cols
'num': max(2,len(str(len(data)))+1),
'num': max(2, len(str(len(data)))+1),
'vout': 4,
'mmid': max(len(d.twmmid.disp) for d in data) if show_mmid else 0,
'amt': self.amt_widths['amt'],
'amt2': self.amt_widths.get('amt2',0),
'amt2': self.amt_widths.get('amt2', 0),
'block': self.age_col_params['block'][0] if wide else 0,
'date_time': self.age_col_params['date_time'][0] if wide else 0,
'date': self.age_w,
@ -147,7 +147,7 @@ class TwUnspentOutputs(TwView):
interactive = interactive,
)
def squeezed_col_hdr(self,cw,fs,color):
def squeezed_col_hdr(self, cw, fs, color):
return fs.format(
n = '',
t = 'TxID',
@ -157,9 +157,9 @@ class TwUnspentOutputs(TwView):
c = 'Comment',
A = 'Amt({})'.format(self.proto.dcoin),
B = 'Amt({})'.format(self.proto.coin),
d = self.age_hdr )
d = self.age_hdr)
def detail_col_hdr(self,cw,fs,color):
def detail_col_hdr(self, cw, fs, color):
return fs.format(
n = '',
t = 'TxID',
@ -170,68 +170,68 @@ class TwUnspentOutputs(TwView):
B = 'Amt({})'.format(self.proto.coin),
b = 'Block',
D = 'Date/Time',
c = 'Comment' )
c = 'Comment')
def gen_squeezed_display(self,data,cw,fs,color,fmt_method):
def gen_squeezed_display(self, data, cw, fs, color, fmt_method):
for n,d in enumerate(data):
for n, d in enumerate(data):
yield fs.format(
n = str(n+1) + ')',
t = (d.txid.fmtc( '|' + '.'*(cw.txid-1), width=cw.txid, color=color ) if d.skip == 'txid'
else d.txid.truncate( width=cw.txid, color=color )) if cw.txid else None,
v = ' ' + d.vout.fmt( width=cw.vout-1, color=color ) if cw.vout else None,
a = d.addr.fmtc( '|' + '.'*(cw.addr-1), width=cw.addr, color=color ) if d.skip == 'addr'
t = (d.txid.fmtc('|' + '.'*(cw.txid-1), width=cw.txid, color=color) if d.skip == 'txid'
else d.txid.truncate(width=cw.txid, color=color)) if cw.txid else None,
v = ' ' + d.vout.fmt(width=cw.vout-1, color=color) if cw.vout else None,
a = d.addr.fmtc('|' + '.'*(cw.addr-1), width=cw.addr, color=color) if d.skip == 'addr'
else d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color),
m = (d.twmmid.fmtc( '.'*cw.mmid, width=cw.mmid, color=color ) if d.skip == 'addr'
else d.twmmid.fmt( width=cw.mmid, color=color )) if cw.mmid else None,
c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ) if cw.comment else None,
A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ),
B = d.amt2.fmt( color=color, iwidth=cw.iwidth2, prec=self.disp_prec ) if cw.amt2 else None,
d = self.age_disp(d,self.age_fmt),
m = (d.twmmid.fmtc('.'*cw.mmid, width=cw.mmid, color=color) if d.skip == 'addr'
else d.twmmid.fmt(width=cw.mmid, color=color)) if cw.mmid else None,
c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-') if cw.comment else None,
A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec),
B = d.amt2.fmt(color=color, iwidth=cw.iwidth2, prec=self.disp_prec) if cw.amt2 else None,
d = self.age_disp(d, self.age_fmt),
)
def gen_detail_display(self,data,cw,fs,color,fmt_method):
def gen_detail_display(self, data, cw, fs, color, fmt_method):
for n,d in enumerate(data):
for n, d in enumerate(data):
yield fs.format(
n = str(n+1) + ')',
t = d.txid.fmt( width=cw.txid, color=color ) if cw.txid else None,
v = ' ' + d.vout.fmt( width=cw.vout-1, color=color ) if cw.vout else None,
t = d.txid.fmt(width=cw.txid, color=color) if cw.txid else None,
v = ' ' + d.vout.fmt(width=cw.vout-1, color=color) if cw.vout else None,
a = d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color),
m = d.twmmid.fmt( width=cw.mmid, color=color ),
A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ),
B = d.amt2.fmt( color=color, iwidth=cw.iwidth2, prec=self.disp_prec ) if cw.amt2 else None,
b = self.age_disp(d,'block'),
D = self.age_disp(d,'date_time'),
c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ))
m = d.twmmid.fmt(width=cw.mmid, color=color),
A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec),
B = d.amt2.fmt(color=color, iwidth=cw.iwidth2, prec=self.disp_prec) if cw.amt2 else None,
b = self.age_disp(d, 'block'),
D = self.age_disp(d, 'date_time'),
c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-'))
def display_total(self):
msg('\nTotal unspent: {} {} ({} output{})'.format(
self.total.hl(),
self.proto.dcoin,
len(self.data),
suf(self.data) ))
suf(self.data)))
async def set_dates(self,us):
async def set_dates(self, us):
if not self.dates_set:
# 'blocktime' differs from 'time', is same as getblockheader['time']
dates = [ o.get('blocktime',0)
for o in await self.rpc.gathered_icall('gettransaction',[(o.txid,True,False) for o in us]) ]
for idx,o in enumerate(us):
dates = [o.get('blocktime', 0)
for o in await self.rpc.gathered_icall('gettransaction', [(o.txid, True, False) for o in us])]
for idx, o in enumerate(us):
o.date = dates[idx]
self.dates_set = True
class sort_action(TwView.sort_action):
def s_twmmid(self,parent):
def s_twmmid(self, parent):
parent.do_sort('twmmid')
parent.show_mmid = True
class display_action(TwView.display_action):
def d_mmid(self,parent):
def d_mmid(self, parent):
parent.show_mmid = not parent.show_mmid
def d_group(self,parent):
def d_group(self, parent):
if parent.can_group:
parent.group = not parent.group

View file

@ -20,14 +20,14 @@
tw.view: base class for tracking wallet view classes
"""
import sys,time,asyncio
import sys, time, asyncio
from collections import namedtuple
from ..cfg import gv
from ..objmethods import MMGenObject
from ..obj import get_obj,MMGenIdx,MMGenList
from ..color import nocolor,yellow,green,red,blue
from ..util import msg,msg_r,fmt,die,capfirst,make_timestr
from ..obj import get_obj, MMGenIdx, MMGenList
from ..color import nocolor, yellow, green, red, blue
from ..util import msg, msg_r, fmt, die, capfirst, make_timestr
from ..rpc import rpc_init
from ..base_obj import AsyncInit
@ -39,17 +39,17 @@ ERASE_ALL = '\033[0J'
# decorator for action.run():
def enable_echo(orig_func):
async def f(self,parent,action_method):
async def f(self, parent, action_method):
if parent.scroll:
parent.term.set('echo')
ret = await orig_func(self,parent,action_method)
ret = await orig_func(self, parent, action_method)
if parent.scroll:
parent.term.set('noecho')
return ret
return f
# base class for TwUnspentOutputs,TwAddresses,TwTxHistory:
class TwView(MMGenObject,metaclass=AsyncInit):
# base class for TwUnspentOutputs, TwAddresses, TwTxHistory:
class TwView(MMGenObject, metaclass=AsyncInit):
class display_type:
@ -77,8 +77,8 @@ class TwView(MMGenObject,metaclass=AsyncInit):
class print:
@staticmethod
def do(method,data,cw,fs,color,fmt_method):
return [l.rstrip() for l in method(data,cw,fs,color,fmt_method)]
def do(method, data, cw, fs, color, fmt_method):
return [l.rstrip() for l in method(data, cw, fs, color, fmt_method)]
has_wallet = True
has_amt2 = False
@ -101,7 +101,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
pos = 0
filters = ()
fp = namedtuple('fs_params',['fs_key','hdr_fs_repl','fs_repl','hdr_fs','fs'])
fp = namedtuple('fs_params', ['fs_key', 'hdr_fs_repl', 'fs_repl', 'hdr_fs', 'fs'])
fs_params = {
'num': fp('n', True, True, ' {n:>%s}', ' {n:>%s}'),
'txid': fp('t', True, False, ' {t:%s}', ' {t}'),
@ -119,8 +119,8 @@ class TwView(MMGenObject,metaclass=AsyncInit):
'outputs': fp('o', True, False, ' {o:%s}', ' {o}'),
}
age_fmts = ('confs','block','days','date','date_time')
age_fmts_date_dependent = ('days','date','date_time')
age_fmts = ('confs', 'block', 'days', 'date', 'date_time')
age_fmts_date_dependent = ('days', 'date', 'date_time')
_age_fmt = 'confs'
bch_addr_fmts = ('cashaddr', 'legacy')
@ -134,12 +134,12 @@ class TwView(MMGenObject,metaclass=AsyncInit):
}
date_formatter = {
'days': lambda rpc,secs: (rpc.cur_date - secs) // 86400 if secs else 0,
'days': lambda rpc, secs: (rpc.cur_date - secs) // 86400 if secs else 0,
'date': (
lambda rpc,secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:]
lambda rpc, secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:]
if secs else '- '),
'date_time': (
lambda rpc,secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5])
lambda rpc, secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5])
if secs else '- '),
}
@ -187,17 +187,17 @@ class TwView(MMGenObject,metaclass=AsyncInit):
}
scroll_keys['darwin'] = scroll_keys['linux']
def __new__(cls,cfg,proto,*args,**kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls,cls.mod_subpath))
def __new__(cls, cfg, proto, *args, **kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls, cls.mod_subpath))
async def __init__(self,cfg,proto):
async def __init__(self, cfg, proto):
self.cfg = cfg
self.proto = proto
self.rpc = await rpc_init(cfg,proto)
self.rpc = await rpc_init(cfg, proto)
if self.has_wallet:
from .ctl import TwCtl
self.twctl = await TwCtl(cfg,proto,mode='w')
self.amt_keys = {'amt':'iwidth','amt2':'iwidth2'} if self.has_amt2 else {'amt':'iwidth'}
self.twctl = await TwCtl(cfg, proto, mode='w')
self.amt_keys = {'amt':'iwidth', 'amt2':'iwidth2'} if self.has_amt2 else {'amt':'iwidth'}
if repl := self.prompt_fs_repl.get(self.proto.coin):
self.prompt_fs_in[repl[0]] = repl[1]
self.prompt_fs = '\n'.join(self.prompt_fs_in)
@ -218,20 +218,20 @@ class TwView(MMGenObject,metaclass=AsyncInit):
return self._age_fmt
@age_fmt.setter
def age_fmt(self,val):
def age_fmt(self, val):
if val not in self.age_fmts:
die( 'BadAgeFormat', f'{val!r}: invalid age format (must be one of {self.age_fmts!r})' )
die('BadAgeFormat', f'{val!r}: invalid age format (must be one of {self.age_fmts!r})')
self._age_fmt = val
def age_disp(self,o,age_fmt):
def age_disp(self, o, age_fmt):
if age_fmt == 'confs':
return o.confs or '-'
elif age_fmt == 'block':
return self.rpc.blockcount + 1 - o.confs if o.confs else '-'
else:
return self.date_formatter[age_fmt](self.rpc,o.date)
return self.date_formatter[age_fmt](self.rpc, o.date)
def get_disp_prec(self,wide):
def get_disp_prec(self, wide):
return self.proto.coin_amt.max_prec
sort_disp = {
@ -250,48 +250,48 @@ class TwView(MMGenObject,metaclass=AsyncInit):
'twmmid': lambda i: '{} {:010} {:024.12f}'.format(i.twmmid.sort_key, 0xffffffff - abs(i.confs), i.amt)
}
def sort_info(self,include_group=True):
ret = ([],['Reverse'])[self.reverse]
def sort_info(self, include_group=True):
ret = ([], ['Reverse'])[self.reverse]
ret.append(self.sort_disp[self.sort_key])
if include_group and self.group and (self.sort_key in ('addr','txid','twmmid')):
if include_group and self.group and (self.sort_key in ('addr', 'txid', 'twmmid')):
ret.append('Grouped')
return ret
def do_sort(self,key=None,reverse=False):
def do_sort(self, key=None, reverse=False):
key = key or self.sort_key
if key not in self.sort_funcs:
die(1,f'{key!r}: invalid sort key. Valid options: {" ".join(self.sort_funcs)}')
die(1, f'{key!r}: invalid sort key. Valid options: {" ".join(self.sort_funcs)}')
self.sort_key = key
assert isinstance(reverse,bool)
assert isinstance(reverse, bool)
save = self.data.copy()
self.data.sort(key=self.sort_funcs[key],reverse=reverse or self.reverse)
self.data.sort(key=self.sort_funcs[key], reverse=reverse or self.reverse)
if self.data != save:
self.pos = 0
async def get_data(self,sort_key=None,reverse_sort=False):
async def get_data(self, sort_key=None, reverse_sort=False):
rpc_data = await self.get_rpc_data()
if not rpc_data:
die(1,fmt(self.no_rpcdata_errmsg).strip())
die(1, fmt(self.no_rpcdata_errmsg).strip())
lbl_id = ('account','label')['label_api' in self.rpc.caps]
lbl_id = ('account', 'label')['label_api' in self.rpc.caps]
res = self.gen_data(rpc_data,lbl_id)
res = self.gen_data(rpc_data, lbl_id)
self.data = MMGenList(await res if type(res).__name__ == 'coroutine' else res)
self.disp_data = list(self.filter_data())
if not self.data:
die(1,self.no_data_errmsg)
die(1, self.no_data_errmsg)
self.do_sort(key=sort_key,reverse=reverse_sort)
self.do_sort(key=sort_key, reverse=reverse_sort)
# get_data() is immediately followed by display header, and get_rpc_data() produces output,
# so add NL here (' ' required because CUR_HOME erases preceding blank lines)
msg(' ')
def get_term_dimensions(self,min_cols,min_lines=None):
from ..term import get_terminal_size,get_char_raw,_term_dimensions
def get_term_dimensions(self, min_cols, min_lines=None):
from ..term import get_terminal_size, get_char_raw, _term_dimensions
user_resized = False
while True:
ts = get_terminal_size()
@ -300,30 +300,30 @@ class TwView(MMGenObject,metaclass=AsyncInit):
if cols >= min_cols and (min_lines is None or lines >= min_lines):
if user_resized:
msg_r(CUR_HOME + ERASE_ALL)
return _term_dimensions(cols,ts.height)
return _term_dimensions(cols, ts.height)
if sys.stdout.isatty():
if self.cfg.columns and cols < min_cols:
die(1,'\n'+fmt(self.twidth_diemsg.format(self.cfg.columns,self.desc,min_cols),indent=' '))
die(1, '\n'+fmt(self.twidth_diemsg.format(self.cfg.columns, self.desc, min_cols), indent=' '))
else:
m,dim = (self.twidth_errmsg,min_cols) if cols < min_cols else (self.theight_errmsg,min_lines)
get_char_raw( CUR_HOME + ERASE_ALL + fmt( m.format(self.desc,dim), append='' ))
m, dim = (self.twidth_errmsg, min_cols) if cols < min_cols else (self.theight_errmsg, min_lines)
get_char_raw(CUR_HOME + ERASE_ALL + fmt(m.format(self.desc, dim), append=''))
user_resized = True
else:
return _term_dimensions(min_cols,ts.height)
return _term_dimensions(min_cols, ts.height)
def compute_column_widths(self,widths,maxws,minws,maxws_nice,wide,interactive):
def compute_column_widths(self, widths, maxws, minws, maxws_nice, wide, interactive):
def do_ret(freews):
widths.update({k:minws[k] + freews.get(k,0) for k in minws})
widths.update({ikey: widths[key] - self.disp_prec - 1 for key,ikey in self.amt_keys.items()})
return namedtuple('column_widths',widths.keys())(*widths.values())
widths.update({k:minws[k] + freews.get(k, 0) for k in minws})
widths.update({ikey: widths[key] - self.disp_prec - 1 for key, ikey in self.amt_keys.items()})
return namedtuple('column_widths', widths.keys())(*widths.values())
def do_ret_max():
widths.update({k:max(minws[k],maxws[k]) for k in minws})
widths.update({ikey: widths[key] - self.disp_prec - 1 for key,ikey in self.amt_keys.items()})
return namedtuple('column_widths',widths.keys())(*widths.values())
widths.update({k:max(minws[k], maxws[k]) for k in minws})
widths.update({ikey: widths[key] - self.disp_prec - 1 for key, ikey in self.amt_keys.items()})
return namedtuple('column_widths', widths.keys())(*widths.values())
def get_freews(cols,varws,varw,minw):
def get_freews(cols, varws, varw, minw):
freew = cols - minw
if freew and varw:
x = freew / varw
@ -343,12 +343,12 @@ class TwView(MMGenObject,metaclass=AsyncInit):
minw = sum(widths.values()) + sum(minws.values())
varw = sum(varws.values())
self.min_term_width = 40 if wide else max(self.prompt_width,minw) if interactive else minw
self.min_term_width = 40 if wide else max(self.prompt_width, minw) if interactive else minw
td = self.get_term_dimensions(self.min_term_width)
self.term_height = td.height
self.term_width = td.width
self.cols = min(self.term_width,minw + varw)
self.cols = min(self.term_width, minw + varw)
if wide or self.cols == minw + varw:
return do_ret_max()
@ -358,33 +358,33 @@ class TwView(MMGenObject,metaclass=AsyncInit):
varws_hp = {k: maxws_nice[k] - minws[k] if k in maxws_nice else varws[k] for k in varws}
varw_hp = sum(varws_hp.values())
widths_hp = get_freews(
min(self.term_width,minw + varw_hp),
min(self.term_width, minw + varw_hp),
varws_hp,
varw_hp,
minw )
minw)
# compute low-priority (nice) widths:
varws_lp = {k: varws[k] - varws_hp[k] for k in maxws_nice if k in varws}
widths_lp = get_freews(
self.cols,
varws_lp,
sum(varws_lp.values()),
minw + sum(widths_hp.values()) )
minw + sum(widths_hp.values()))
# sum the two for each field:
return do_ret({k:widths_hp[k] + widths_lp.get(k,0) for k in varws})
return do_ret({k:widths_hp[k] + widths_lp.get(k, 0) for k in varws})
else:
return do_ret(get_freews(self.cols,varws,varw,minw))
return do_ret(get_freews(self.cols, varws, varw, minw))
def gen_subheader(self,cw,color):
def gen_subheader(self, cw, color):
return ()
def gen_footer(self,color):
if hasattr(self,'total'):
yield 'TOTAL: {} {}'.format( self.total.hl(color=color), self.proto.dcoin )
def gen_footer(self, color):
if hasattr(self, 'total'):
yield 'TOTAL: {} {}'.format(self.total.hl(color=color), self.proto.dcoin)
def set_amt_widths(self,data):
# width of amts column: min(7,width of integer part) + len('.') + width of fractional part
def set_amt_widths(self, data):
# width of amts column: min(7, width of integer part) + len('.') + width of fractional part
self.amt_widths = {
k:min(7,max(len(str(getattr(d,k).to_integral_value())) for d in data)) + 1 + self.disp_prec
k:min(7, max(len(str(getattr(d, k).to_integral_value())) for d in data)) + 1 + self.disp_prec
for k in self.amt_keys}
async def format(
@ -399,57 +399,57 @@ class TwView(MMGenObject,metaclass=AsyncInit):
def gen_hdr():
Blue,Green = (blue,green) if color else (nocolor,nocolor)
Yes,No,All = (green('yes'),red('no'),yellow('all')) if color else ('yes','no','all')
Blue, Green = (blue, green) if color else (nocolor, nocolor)
Yes, No, All = (green('yes'), red('no'), yellow('all')) if color else ('yes', 'no', 'all')
sort_info = ' '.join(self.sort_info())
def fmt_filter(k):
return '{}:{}'.format(k,{0:No,1:Yes,2:All}[getattr(self,k)])
return '{}:{}'.format(k, {0:No, 1:Yes, 2:All}[getattr(self, k)])
yield '{} (sort order: {}){}'.format(
self.hdr_lbl.upper(),
Blue(sort_info),
' ' * (self.cols - len(f'{self.hdr_lbl} (sort order: {sort_info})')) )
' ' * (self.cols - len(f'{self.hdr_lbl} (sort order: {sort_info})')))
if self.filters:
yield 'Filters: {}{}'.format(
' '.join(map(fmt_filter,self.filters)),
' ' * len(self.filters) )
' '.join(map(fmt_filter, self.filters)),
' ' * len(self.filters))
yield 'Network: {}'.format(Green(
self.proto.coin + ' ' + self.proto.chain_name.upper() ))
self.proto.coin + ' ' + self.proto.chain_name.upper()))
yield 'Block {} [{}]'.format(
self.rpc.blockcount.hl(color=color),
make_timestr(self.rpc.cur_date) )
make_timestr(self.rpc.cur_date))
if hasattr(self,'total'):
yield 'Total {}: {}'.format( self.proto.dcoin, self.total.hl(color=color) )
if hasattr(self, 'total'):
yield 'Total {}: {}'.format(self.proto.dcoin, self.total.hl(color=color))
yield from getattr(self,dt.subhdr_fmt_method)(cw,color)
yield from getattr(self, dt.subhdr_fmt_method)(cw, color)
yield ' ' * self.term_width
if data and dt.colhdr_fmt_method:
col_hdr = getattr(self,dt.colhdr_fmt_method)(cw,hdr_fs,color)
col_hdr = getattr(self, dt.colhdr_fmt_method)(cw, hdr_fs, color)
yield col_hdr.rstrip() if line_processing == 'print' else col_hdr
def get_body(method):
if line_processing:
return getattr(self.line_processing,line_processing).do(
method,data,cw,fs,color,getattr(self,dt.line_fmt_method))
return getattr(self.line_processing, line_processing).do(
method, data, cw, fs, color, getattr(self, dt.line_fmt_method))
else:
return method(data,cw,fs,color,getattr(self,dt.line_fmt_method))
return method(data, cw, fs, color, getattr(self, dt.line_fmt_method))
if data and dt.need_column_widths:
self.set_amt_widths(data)
cw = self.get_column_widths(data,wide=dt.detail,interactive=interactive)
cw = self.get_column_widths(data, wide=dt.detail, interactive=interactive)
cwh = cw._asdict()
fp = self.fs_params
rfill = ' ' * (self.term_width - self.cols) if scroll else ''
hdr_fs = ''.join(fp[name].hdr_fs % ((),cwh[name])[fp[name].hdr_fs_repl]
hdr_fs = ''.join(fp[name].hdr_fs % ((), cwh[name])[fp[name].hdr_fs_repl]
for name in dt.cols if cwh[name]) + rfill
fs = ''.join(fp[name].fs % ((),cwh[name])[fp[name].fs_repl]
fs = ''.join(fp[name].fs % ((), cwh[name])[fp[name].fs_repl]
for name in dt.cols if cwh[name]) + rfill
else:
cw = hdr_fs = fs = None
@ -457,14 +457,14 @@ class TwView(MMGenObject,metaclass=AsyncInit):
return (
tuple(gen_hdr()),
tuple(
get_body(getattr(self,dt.fmt_method)) if data else
[(nocolor,yellow)[color](self.nodata_msg.ljust(self.term_width))] )
get_body(getattr(self, dt.fmt_method)) if data else
[(nocolor, yellow)[color](self.nodata_msg.ljust(self.term_width))])
)
if not gv.stdout.isatty():
line_processing = 'print'
dt = getattr(self.display_type,display_type)
dt = getattr(self.display_type, display_type)
if self.use_cached:
self.use_cached = False
@ -482,7 +482,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
if data != dsave:
self.pos = 0
display_hdr,display_body = make_display()
display_hdr, display_body = make_display()
if scroll:
fixed_height = len(display_hdr) + self.prompt_height + 1
@ -490,14 +490,14 @@ class TwView(MMGenObject,metaclass=AsyncInit):
if self.term_height - fixed_height < self.min_scrollable_height:
td = self.get_term_dimensions(
self.min_term_width,
min_lines = self.min_scrollable_height + fixed_height )
min_lines = self.min_scrollable_height + fixed_height)
self.term_height = td.height
self.term_width = td.width
display_hdr,display_body = make_display()
display_hdr, display_body = make_display()
self.scrollable_height = self.term_height - fixed_height
self.max_pos = max(0, len(display_body) - self.scrollable_height)
self.pos = min(self.pos,self.max_pos)
self.pos = min(self.pos, self.max_pos)
if not dt.detail:
self.display_hdr = display_hdr
@ -508,7 +508,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
bot = self.pos + self.scrollable_height
fill = ('\n' + ''.ljust(self.term_width)) * (self.scrollable_height - len(display_body))
else:
top,bot,fill = (None,None,'')
top, bot, fill = (None, None, '')
if interactive:
footer = ''
@ -544,10 +544,10 @@ class TwView(MMGenObject,metaclass=AsyncInit):
scroll = self.scroll = self.cfg.scroll
key_mappings = make_key_mappings(scroll)
action_classes = { k: getattr(self,action_map[v[:2]])() for k,v in key_mappings.items() }
action_methods = { k: getattr(v,key_mappings[k]) for k,v in action_classes.items() }
action_classes = {k: getattr(self, action_map[v[:2]])() for k, v in key_mappings.items()}
action_methods = {k: getattr(v, key_mappings[k]) for k, v in action_classes.items()}
prompt = self.prompt_fs.strip().format(
s='\nScrolling: k=up, j=down, b=pgup, f=pgdown, g=top, G=bottom' if scroll else '' )
s='\nScrolling: k=up, j=down, b=pgup, f=pgdown, g=top, G=bottom' if scroll else '')
self.prompt_width = max(len(l) for l in prompt.split('\n'))
self.prompt_height = len(prompt.split('\n'))
@ -556,7 +556,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
clear_screen = '\n\n' if self.cfg.no_blank else CUR_HOME + ('' if scroll else ERASE_ALL)
from ..term import get_term,get_char,get_char_raw
from ..term import get_term, get_char, get_char_raw
if scroll:
self.term = get_term()
@ -574,16 +574,16 @@ class TwView(MMGenObject,metaclass=AsyncInit):
reply = get_char(
clear_screen
+ await self.format('squeezed',interactive=True,scroll=scroll)
+ await self.format('squeezed', interactive=True, scroll=scroll)
+ '\n\n'
+ (self.oneshot_msg + '\n\n' if self.oneshot_msg and not scroll else '')
+ prompt,
immed_chars = key_mappings )
immed_chars = key_mappings)
self.oneshot_msg = ''
if reply in key_mappings:
ret = action_classes[reply].run(self,action_methods[reply])
ret = action_classes[reply].run(self, action_methods[reply])
if type(ret).__name__ == 'coroutine':
await ret
elif reply == 'q':
@ -600,9 +600,9 @@ class TwView(MMGenObject,metaclass=AsyncInit):
def blank_prompt(self):
return CUR_HOME + CUR_DOWN(self.term_height - self.prompt_height) + ERASE_ALL
def keypress_confirm(self,*args,**kwargs):
def keypress_confirm(self, *args, **kwargs):
from ..ui import keypress_confirm
if keypress_confirm( self.cfg, *args, no_nl=self.scroll, **kwargs ):
if keypress_confirm(self.cfg, *args, no_nl=self.scroll, **kwargs):
return True
else:
if self.scroll:
@ -612,16 +612,16 @@ class TwView(MMGenObject,metaclass=AsyncInit):
class action:
@enable_echo
async def run(self,parent,action_method):
async def run(self, parent, action_method):
return await action_method(parent)
async def a_print_detail(self,parent):
return await self._print(parent,output_type='detail')
async def a_print_detail(self, parent):
return await self._print(parent, output_type='detail')
async def a_print_squeezed(self,parent):
return await self._print(parent,output_type='squeezed')
async def a_print_squeezed(self, parent):
return await self._print(parent, output_type='squeezed')
async def _print(self,parent,output_type):
async def _print(self, parent, output_type):
if not parent.disp_data:
return None
@ -631,9 +631,9 @@ class TwView(MMGenObject,metaclass=AsyncInit):
b = f'-{output_type}' if len(parent.print_output_types) > 1 else '',
c = parent.proto.dcoin,
d = ('' if parent.proto.network == 'mainnet' else '-'+parent.proto.network.upper()),
e = ','.join(parent.sort_info(include_group=False)).replace(' ','') )
e = ','.join(parent.sort_info(include_group=False)).replace(' ', ''))
print_hdr = getattr(parent.display_type,output_type).print_header.format(parent.cols)
print_hdr = getattr(parent.display_type, output_type).print_header.format(parent.cols)
msg_r(parent.blank_prompt if parent.scroll else '\n')
@ -646,28 +646,28 @@ class TwView(MMGenObject,metaclass=AsyncInit):
data = print_hdr + await parent.format(
display_type = output_type,
line_processing = 'print',
color = False ),
desc = f'{parent.desc} listing' )
color = False),
desc = f'{parent.desc} listing')
except UserNonConfirmation:
parent.oneshot_msg = yellow(f'File {outfile!r} not overwritten by user request')
else:
parent.oneshot_msg = green(f'Data written to {outfile!r}')
async def a_view(self,parent):
async def a_view(self, parent):
from ..ui import do_pager
parent.use_cached = True
msg_r(CUR_HOME)
do_pager( await parent.format('squeezed',color=True) )
do_pager(await parent.format('squeezed', color=True))
async def a_view_detail(self,parent):
async def a_view_detail(self, parent):
from ..ui import do_pager
msg_r(CUR_HOME)
do_pager( await parent.format('detail',color=True) )
do_pager(await parent.format('detail', color=True))
class item_action:
@enable_echo
async def run(self,parent,action_method):
async def run(self, parent, action_method):
if not parent.disp_data:
return
@ -677,17 +677,17 @@ class TwView(MMGenObject,metaclass=AsyncInit):
msg_r(parent.blank_prompt if parent.scroll else '\n')
ret = line_input(
parent.cfg,
f'Enter {parent.item_desc} number (or ENTER to return to main menu): ' )
f'Enter {parent.item_desc} number (or ENTER to return to main menu): ')
if ret == '':
if parent.scroll:
msg_r( CUR_UP(1) + '\r' + ''.ljust(parent.term_width) )
msg_r(CUR_UP(1) + '\r' + ''.ljust(parent.term_width))
return
idx = get_obj(MMGenIdx,n=ret,silent=True)
idx = get_obj(MMGenIdx, n=ret, silent=True)
if not idx or idx < 1 or idx > len(parent.disp_data):
msg_r(
'Choice must be a single number between 1 and {n}{s}'.format(
n = len(parent.disp_data),
s = ' ' if parent.scroll else '' ))
s = ' ' if parent.scroll else ''))
if parent.scroll:
await asyncio.sleep(1.5)
msg_r(CUR_UP(1) + '\r' + ERASE_ALL)
@ -697,7 +697,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
# None: action aborted by user or no action performed
# False: an error occurred
# 'redo': user will be re-prompted for item number
ret = await action_method(parent,idx)
ret = await action_method(parent, idx)
if ret != 'redo':
break
await asyncio.sleep(0.5)
@ -706,22 +706,22 @@ class TwView(MMGenObject,metaclass=AsyncInit):
# error messages could leave screen in messy state, so do complete redraw:
msg_r(
CUR_HOME + ERASE_ALL +
await parent.format(display_type='squeezed',interactive=True,scroll=True) )
await parent.format(display_type='squeezed', interactive=True, scroll=True))
async def i_balance_refresh(self,parent,idx):
async def i_balance_refresh(self, parent, idx):
if not parent.keypress_confirm(
f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?' ):
f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?'):
return 'redo'
await parent.twctl.get_balance( parent.disp_data[idx-1].addr, force_rpc=True )
await parent.twctl.get_balance(parent.disp_data[idx-1].addr, force_rpc=True)
await parent.get_data()
parent.oneshot_msg = yellow(f'{parent.proto.dcoin} balance for account #{idx} refreshed')
async def i_addr_delete(self,parent,idx):
async def i_addr_delete(self, parent, idx):
if not parent.keypress_confirm(
'Removing {} {} from tracking wallet. Is this what you want?'.format(
parent.item_desc, red(f'#{idx}') )):
parent.item_desc, red(f'#{idx}'))):
return 'redo'
if await parent.twctl.remove_address( parent.disp_data[idx-1].addr ):
if await parent.twctl.remove_address(parent.disp_data[idx-1].addr):
await parent.get_data()
parent.oneshot_msg = yellow(f'{capfirst(parent.item_desc)} #{idx} removed')
return True
@ -730,7 +730,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
parent.oneshot_msg = red('Address could not be removed')
return False
async def i_comment_add(self,parent,idx):
async def i_comment_add(self, parent, idx):
async def do_comment_add(comment):
@ -744,7 +744,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
parent.oneshot_msg = (green if comment else yellow)('Label {a} {b}{c}'.format(
a = 'for' if edited else 'added to' if comment else 'removed from',
b = desc,
c = ' edited' if edited else '' ))
c = ' edited' if edited else ''))
return True
else:
await asyncio.sleep(3)
@ -762,80 +762,79 @@ class TwView(MMGenObject,metaclass=AsyncInit):
from ..ui import line_input
res = line_input(
parent.cfg,
'Enter label text for {} {}: '.format(parent.item_desc,red(f'#{idx}')),
insert_txt = cur_comment )
'Enter label text for {} {}: '.format(parent.item_desc, red(f'#{idx}')),
insert_txt = cur_comment)
if res == cur_comment:
parent.oneshot_msg = yellow(f'Label for {desc} unchanged')
return None
elif res == '':
if not parent.keypress_confirm(
f'Removing label for {desc}. Is this what you want?' ):
if not parent.keypress_confirm(f'Removing label for {desc}. Is this what you want?'):
return 'redo'
return await do_comment_add(res)
class scroll_action:
def run(self,parent,action_method):
def run(self, parent, action_method):
self.use_cached = True
return action_method(parent)
def m_cursor_up(self,parent):
parent.pos -= min( parent.pos - 0, 1 )
def m_cursor_up(self, parent):
parent.pos -= min(parent.pos - 0, 1)
def m_cursor_down(self,parent):
parent.pos += min( parent.max_pos - parent.pos, 1 )
def m_cursor_down(self, parent):
parent.pos += min(parent.max_pos - parent.pos, 1)
def m_pg_up(self,parent):
parent.pos -= min( parent.scrollable_height, parent.pos - 0 )
def m_pg_up(self, parent):
parent.pos -= min(parent.scrollable_height, parent.pos - 0)
def m_pg_down(self,parent):
parent.pos += min( parent.scrollable_height, parent.max_pos - parent.pos )
def m_pg_down(self, parent):
parent.pos += min(parent.scrollable_height, parent.max_pos - parent.pos)
def m_top(self,parent):
def m_top(self, parent):
parent.pos = 0
def m_bot(self,parent):
def m_bot(self, parent):
parent.pos = parent.max_pos
class sort_action:
def run(self,parent,action_method):
def run(self, parent, action_method):
return action_method(parent)
def s_addr(self,parent):
def s_addr(self, parent):
parent.do_sort('addr')
def s_age(self,parent):
def s_age(self, parent):
parent.do_sort('age')
def s_amt(self,parent):
def s_amt(self, parent):
parent.do_sort('amt')
def s_txid(self,parent):
def s_txid(self, parent):
parent.do_sort('txid')
def s_twmmid(self,parent):
def s_twmmid(self, parent):
parent.do_sort('twmmid')
def s_reverse(self,parent):
def s_reverse(self, parent):
parent.data.reverse()
parent.reverse = not parent.reverse
class display_action:
def run(self,parent,action_method):
def run(self, parent, action_method):
return action_method(parent)
def d_days(self,parent):
def d_days(self, parent):
af = parent.age_fmts
parent.age_fmt = af[(af.index(parent.age_fmt) + 1) % len(af)]
if parent.update_widths_on_age_toggle: # TODO
pass
def d_redraw(self,parent):
def d_redraw(self, parent):
msg_r(CUR_HOME + ERASE_ALL)
def d_addr_view_pref(self,parent):
def d_addr_view_pref(self, parent):
parent.addr_view_pref = (parent.addr_view_pref + 1) % len(parent.bch_addr_fmts)