From 4ea4e5e4729ac4a3b7621e997bb089ac685de699 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Fri, 18 Oct 2024 10:32:10 +0000 Subject: [PATCH] whitespace: tw --- mmgen/tw/addresses.py | 132 +++++++++--------- mmgen/tw/bal.py | 24 ++-- mmgen/tw/ctl.py | 112 +++++++-------- mmgen/tw/json.py | 74 +++++----- mmgen/tw/prune.py | 60 ++++---- mmgen/tw/rpc.py | 6 +- mmgen/tw/shared.py | 48 +++---- mmgen/tw/txhistory.py | 82 ++++++----- mmgen/tw/unspent.py | 116 ++++++++-------- mmgen/tw/view.py | 317 +++++++++++++++++++++--------------------- 10 files changed, 488 insertions(+), 483 deletions(-) diff --git a/mmgen/tw/addresses.py b/mmgen/tw/addresses.py index 71458fa0..e1b15ec8 100755 --- a/mmgen/tw/addresses.py +++ b/mmgen/tw/addresses.py @@ -12,11 +12,11 @@ tw.addresses: Tracking wallet listaddresses class for the MMGen suite """ -from ..util import msg,suf,is_int -from ..obj import MMGenListItem,ImmutableAttr,ListItemAttr,TwComment,NonNegativeInt -from ..addr import CoinAddr,MMGenID,MMGenAddrType +from ..util import msg, suf, is_int +from ..obj import MMGenListItem, ImmutableAttr, ListItemAttr, TwComment, NonNegativeInt +from ..addr import CoinAddr, MMGenID, MMGenAddrType from ..amt import CoinAmtChk -from ..color import red,green,yellow +from ..color import red, green, yellow from .view import TwView from .shared import TwMMGenID @@ -28,7 +28,7 @@ class TwAddresses(TwView): sort_key = 'twmmid' update_widths_on_age_toggle = True print_output_types = ('detail',) - filters = ('showempty','showused','all_labels') + filters = ('showempty', 'showused', 'all_labels') showcoinaddrs = True showempty = True showused = 1 # tristate: 0:no, 1:yes, 2:all @@ -39,50 +39,50 @@ class TwAddresses(TwView): class display_type(TwView.display_type): class squeezed(TwView.display_type.squeezed): - cols = ('num','mmid','used','addr','comment','amt','date') + cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'date') fmt_method = 'gen_display' class detail(TwView.display_type.detail): - cols = ('num','mmid','used','addr','comment','amt','block','date_time') + cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'block', 'date_time') fmt_method = 'gen_display' class TwAddress(MMGenListItem): - valid_attrs = {'twmmid','addr','al_id','confs','comment','amt','recvd','date','skip'} + valid_attrs = {'twmmid', 'addr', 'al_id', 'confs', 'comment', 'amt', 'recvd', 'date', 'skip'} invalid_attrs = {'proto'} - twmmid = ImmutableAttr(TwMMGenID,include_proto=True) # contains confs,txid(unused),date(unused),al_id - addr = ImmutableAttr(CoinAddr,include_proto=True) + twmmid = ImmutableAttr(TwMMGenID, include_proto=True) # contains confs,txid(unused),date(unused),al_id + addr = ImmutableAttr(CoinAddr, include_proto=True) al_id = ImmutableAttr(str) # set to '_' for non-MMGen addresses - confs = ImmutableAttr(int,typeconv=False) - comment = ListItemAttr(TwComment,reassign_ok=True) + confs = ImmutableAttr(int, typeconv=False) + comment = ListItemAttr(TwComment, reassign_ok=True) amt = ImmutableAttr(CoinAmtChk, include_proto=True) recvd = ImmutableAttr(CoinAmtChk, include_proto=True) - date = ListItemAttr(int,typeconv=False,reassign_ok=True) - skip = ListItemAttr(str,typeconv=False,reassign_ok=True) + date = ListItemAttr(int, typeconv=False, reassign_ok=True) + skip = ListItemAttr(str, typeconv=False, reassign_ok=True) - def __init__(self,proto,**kwargs): + def __init__(self, proto, **kwargs): self.__dict__['proto'] = proto - MMGenListItem.__init__(self,**kwargs) + MMGenListItem.__init__(self, **kwargs) @property def coinaddr_list(self): return [d.addr for d in self.data] - async def __init__(self,cfg,proto,minconf=1,mmgen_addrs='',get_data=False): + async def __init__(self, cfg, proto, minconf=1, mmgen_addrs='', get_data=False): - await super().__init__(cfg,proto) + await super().__init__(cfg, proto) self.minconf = NonNegativeInt(minconf) if mmgen_addrs: - a = mmgen_addrs.rsplit(':',1) + a = mmgen_addrs.rsplit(':', 1) if len(a) != 2: from ..util import die die(1, f'{mmgen_addrs}: invalid address list argument ' + - '(must be in form :[:])' ) + '(must be in form :[:])') from ..addrlist import AddrIdxList - self.usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])] + self.usr_addr_list = [MMGenID(self.proto, f'{a[0]}:{i}') for i in AddrIdxList(a[1])] else: self.usr_addr_list = [] @@ -94,20 +94,20 @@ class TwAddresses(TwView): return 'No addresses {}found!'.format( f'with {self.minconf} confirmations ' if self.minconf else '') - async def gen_data(self,rpc_data,lbl_id): + async def gen_data(self, rpc_data, lbl_id): return ( self.TwAddress( self.proto, twmmid = twmmid, addr = data['addr'], - al_id = getattr(twmmid.obj,'al_id','_'), + al_id = getattr(twmmid.obj, 'al_id', '_'), confs = data['confs'], comment = data['lbl'].comment, amt = data['amt'], recvd = data['recvd'], date = 0, - skip = '' ) - for twmmid,data in rpc_data.items() + skip = '') + for twmmid, data in rpc_data.items() ) def filter_data(self): @@ -120,11 +120,11 @@ class TwAddresses(TwView): (not (d.recvd and not self.showused) and (d.amt or self.showempty)) ) - def get_column_widths(self,data,wide,interactive): + def get_column_widths(self, data, wide, interactive): return self.compute_column_widths( widths = { # fixed cols - 'num': max(2,len(str(len(data)))+1), + 'num': max(2, len(str(len(data)))+1), 'mmid': max(len(d.twmmid.disp) for d in data), 'used': 4, 'amt': self.amt_widths['amt'], @@ -146,11 +146,11 @@ class TwAddresses(TwView): interactive = interactive, ) - def gen_subheader(self,cw,color): + def gen_subheader(self, cw, color): if self.minconf: yield f'Displaying balances with at least {self.minconf} confirmation{suf(self.minconf)}' - def squeezed_col_hdr(self,cw,fs,color): + def squeezed_col_hdr(self, cw, fs, color): return fs.format( n = '', m = 'MMGenID', @@ -158,9 +158,9 @@ class TwAddresses(TwView): a = 'Address', c = 'Comment', A = 'Balance', - d = self.age_hdr ) + d = self.age_hdr) - def detail_col_hdr(self,cw,fs,color): + def detail_col_hdr(self, cw, fs, color): return fs.format( n = '', m = 'MMGenID', @@ -169,48 +169,48 @@ class TwAddresses(TwView): c = 'Comment', A = 'Balance', b = 'Block', - D = 'Date/Time' ) + D = 'Date/Time') - def squeezed_format_line(self,n,d,cw,fs,color,yes,no): + def squeezed_format_line(self, n, d, cw, fs, color, yes, no): return fs.format( n = str(n) + ')', - m = d.twmmid.fmt( width=cw.mmid, color=color ), + m = d.twmmid.fmt(width=cw.mmid, color=color), u = yes if d.recvd else no, a = d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color), - c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ), - A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ), - d = self.age_disp( d, self.age_fmt ) + c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-'), + A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec), + d = self.age_disp(d, self.age_fmt) ) - def detail_format_line(self,n,d,cw,fs,color,yes,no): + def detail_format_line(self, n, d, cw, fs, color, yes, no): return fs.format( n = str(n) + ')', - m = d.twmmid.fmt( width=cw.mmid, color=color ), + m = d.twmmid.fmt(width=cw.mmid, color=color), u = yes if d.recvd else no, a = d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color), - c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ), - A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ), - b = self.age_disp( d, 'block' ), - D = self.age_disp( d, 'date_time' )) + c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-'), + A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec), + b = self.age_disp(d, 'block'), + D = self.age_disp(d, 'date_time')) - def gen_display(self,data,cw,fs,color,fmt_method): + def gen_display(self, data, cw, fs, color, fmt_method): - yes,no = (red('Yes '),green('No ')) if color else ('Yes ','No ') + yes, no = (red('Yes '), green('No ')) if color else ('Yes ', 'No ') id_save = data[0].al_id - for n,d in enumerate(data,1): + for n, d in enumerate(data, 1): if id_save != d.al_id: id_save = d.al_id yield ''.ljust(self.term_width) - yield fmt_method(n,d,cw,fs,color,yes,no) + yield fmt_method(n, d, cw, fs, color, yes, no) - async def set_dates(self,addrs): + async def set_dates(self, addrs): if not self.dates_set: bc = self.rpc.blockcount + 1 caddrs = [addr for addr in addrs if addr.confs] - hashes = await self.rpc.gathered_call('getblockhash',[(n,) for n in [bc - a.confs for a in caddrs]]) - dates = [d['time'] for d in await self.rpc.gathered_call('getblockheader',[(h,) for h in hashes])] - for idx,addr in enumerate(caddrs): + hashes = await self.rpc.gathered_call('getblockhash', [(n,) for n in [bc - a.confs for a in caddrs]]) + dates = [d['time'] for d in await self.rpc.gathered_call('getblockheader', [(h,) for h in hashes])] + for idx, addr in enumerate(caddrs): addr.date = dates[idx] self.dates_set = True @@ -240,12 +240,12 @@ class TwAddresses(TwView): def gen_sid_ranges(): from collections import namedtuple - sid_range = namedtuple('sid_range',['bot','top']) + sid_range = namedtuple('sid_range', ['bot', 'top']) sid_save = None bot = None - for n,e in enumerate(self.data): + for n, e in enumerate(self.data): if e.twmmid.type == 'mmgen': if e.twmmid.obj.sid != sid_save: if sid_save: @@ -263,12 +263,12 @@ class TwAddresses(TwView): assert self.sort_key == 'twmmid' assert self.reverse is False - if not hasattr(self,'_sid_ranges'): + if not hasattr(self, '_sid_ranges'): self._sid_ranges = dict(gen_sid_ranges()) return self._sid_ranges - def is_used(self,coinaddr): + def is_used(self, coinaddr): for e in self.data: if e.addr == coinaddr: return bool(e.recvd) @@ -282,7 +282,7 @@ class TwAddresses(TwView): False: no unused addresses in wallet with requested AddrListID """ - def get_start(bot,top): + def get_start(bot, top): """ bisecting algorithm to find first entry with requested al_id @@ -310,7 +310,7 @@ class TwAddresses(TwView): data = self.data start = get_start( bot = 0 if bot is None else bot, - top = len(data) - 1 if top is None else top ) + top = len(data) - 1 if top is None else top) if start is not None: for d in data[start:]: @@ -345,7 +345,7 @@ class TwAddresses(TwView): def choose_address(addrs): - def format_line(n,d): + def format_line(n, d): return '{a:3}) {b}{c}'.format( a = n, b = d.twmmid.hl(), @@ -353,23 +353,23 @@ class TwAddresses(TwView): ) prompt = '\nChoose a change address:\n\n{}\n\nEnter a number> '.format( - '\n'.join(format_line(n,d) for n,d in enumerate(addrs,1)) + '\n'.join(format_line(n, d) for n, d in enumerate(addrs, 1)) ) from ..ui import line_input while True: - res = line_input( self.cfg, prompt ) + res = line_input(self.cfg, prompt) if is_int(res) and 0 < int(res) <= len(addrs): return addrs[int(res)-1] msg(f'{res}: invalid entry') - assert isinstance(mmtype,MMGenAddrType) + assert isinstance(mmtype, MMGenAddrType) res = [self.get_change_address(f'{sid}:{mmtype}', r.bot, r.top, exclude) - for sid,r in self.sid_ranges.items()] + for sid, r in self.sid_ranges.items()] if any(res): - res = list(filter(None,res)) + res = list(filter(None, res)) if len(res) == 1: return res[0] else: @@ -379,11 +379,11 @@ class TwAddresses(TwView): class display_action(TwView.display_action): - def d_showempty(self,parent): + def d_showempty(self, parent): parent.showempty = not parent.showempty - def d_showused(self,parent): + def d_showused(self, parent): parent.showused = (parent.showused + 1) % 3 - def d_all_labels(self,parent): + def d_all_labels(self, parent): parent.all_labels = not parent.all_labels diff --git a/mmgen/tw/bal.py b/mmgen/tw/bal.py index 36bb389b..b16fad99 100755 --- a/mmgen/tw/bal.py +++ b/mmgen/tw/bal.py @@ -24,12 +24,12 @@ from ..base_obj import AsyncInit from ..objmethods import MMGenObject from ..obj import NonNegativeInt -class TwGetBalance(MMGenObject,metaclass=AsyncInit): +class TwGetBalance(MMGenObject, metaclass=AsyncInit): - def __new__(cls,cfg,proto,*args,**kwargs): - return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw.bal')) + def __new__(cls, cfg, proto, *args, **kwargs): + return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.bal')) - async def __init__(self,cfg,proto,minconf,quiet): + async def __init__(self, cfg, proto, minconf, quiet): class BalanceInfo(dict): def __init__(self): @@ -40,7 +40,7 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit): 'ge_minconf': amt0, 'spendable': amt0, } - dict.__init__(self,**data) + dict.__init__(self, **data) self.minconf = NonNegativeInt(minconf) self.balance_info = BalanceInfo @@ -53,7 +53,7 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit): await self.create_data() - def format(self,color): + def format(self, color): def gen_output(): @@ -64,11 +64,11 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit): def get_col_iwidth(colname): return len(str(int(max(v[colname] for v in self.data.values())))) + iwidth_adj - def make_col(label,col): - return self.data[label][col].fmt( iwidth=iwidths[col], color=color ) + def make_col(label, col): + return self.data[label][col].fmt(iwidth=iwidths[col], color=color) if color: - from ..color import green,yellow + from ..color import green, yellow else: from ..color import nocolor green = yellow = nocolor @@ -87,14 +87,14 @@ class TwGetBalance(MMGenObject,metaclass=AsyncInit): lbl = 'Wallet', w = col1_w + iwidth_adj, cols = ' '.join(v.format(minconf=self.minconf).ljust(iwidths[k]+add_w) - for k,v in self.conf_cols.items()) ).rstrip() + for k, v in self.conf_cols.items())).rstrip() from ..addr import MMGenID for label in sorted(self.data.keys()): yield '{lbl} {cols}'.format( lbl = yellow((label + ' ' + self.proto.coin).ljust(col1_w)) if label == 'TOTAL' - else MMGenID.hlc( (label+':').ljust(col1_w), color=color ), - cols = ' '.join(make_col(label,col) for col in self.conf_cols) + else MMGenID.hlc((label+':').ljust(col1_w), color=color), + cols = ' '.join(make_col(label, col) for col in self.conf_cols) ).rstrip() return '\n'.join(gen_output()) diff --git a/mmgen/tw/ctl.py b/mmgen/tw/ctl.py index 55ab78a7..f244ddf2 100755 --- a/mmgen/tw/ctl.py +++ b/mmgen/tw/ctl.py @@ -24,39 +24,39 @@ import json from collections import namedtuple from pathlib import Path -from ..util import msg,msg_r,suf,die +from ..util import msg, msg_r, suf, die from ..base_obj import AsyncInit from ..objmethods import MMGenObject -from ..obj import TwComment,get_obj -from ..addr import CoinAddr,is_mmgen_id,is_coin_addr +from ..obj import TwComment, get_obj +from ..addr import CoinAddr, is_mmgen_id, is_coin_addr from ..rpc import rpc_init -from .shared import TwMMGenID,TwLabel +from .shared import TwMMGenID, TwLabel -twmmid_addr_pair = namedtuple('addr_info',['twmmid','coinaddr']) -label_addr_pair = namedtuple('label_addr_pair',['label','coinaddr']) +twmmid_addr_pair = namedtuple('addr_info', ['twmmid', 'coinaddr']) +label_addr_pair = namedtuple('label_addr_pair', ['label', 'coinaddr']) # decorator for TwCtl def write_mode(orig_func): - def f(self,*args,**kwargs): + def f(self, *args, **kwargs): if self.mode != 'w': - die(1,'{} opened in read-only mode: cannot execute method {}()'.format( + die(1, '{} opened in read-only mode: cannot execute method {}()'.format( type(self).__name__, locals()['orig_func'].__name__ )) - return orig_func(self,*args,**kwargs) + return orig_func(self, *args, **kwargs) return f -class TwCtl(MMGenObject,metaclass=AsyncInit): +class TwCtl(MMGenObject, metaclass=AsyncInit): - caps = ('rescan','batch') + caps = ('rescan', 'batch') data_key = 'addresses' use_tw_file = False aggressive_sync = False importing = False tw_fn = 'tracking-wallet.json' - def __new__(cls,cfg,proto,*args,**kwargs): - return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw.ctl')) + def __new__(cls, cfg, proto, *args, **kwargs): + return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.ctl')) async def __init__( self, @@ -68,7 +68,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): no_wallet_init = False, rpc_ignore_wallet = False): - assert mode in ('r','w','i'), f"{mode!r}: wallet mode must be 'r','w' or 'i'" + assert mode in ('r', 'w', 'i'), f"{mode!r}: wallet mode must be 'r', 'w' or 'i'" if mode == 'i': self.importing = True mode = 'w' @@ -103,13 +103,13 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): self.init_empty() if self.data['coin'] != self.proto.coin: # TODO remove? - die( 'WalletFileError', + die('WalletFileError', f'Tracking wallet coin ({self.data["coin"]}) does not match current coin ({self.proto.coin})!') self.conv_types(self.data[self.data_key]) def init_from_wallet_file(self): - from ..fileutil import check_or_create_dir,get_data_from_file + from ..fileutil import check_or_create_dir, get_data_from_file check_or_create_dir(self.tw_dir) try: self.orig_data = get_data_from_file(self.cfg, self.tw_path, quiet=True) @@ -132,7 +132,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): def del_twctl(twctl): self.cfg._util.dmsg(f'Running exit handler del_twctl() for {twctl!r}') del twctl - atexit.register(del_twctl,self) + atexit.register(del_twctl, self) def __del__(self): """ @@ -147,15 +147,15 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): Since no exceptions are raised, errors will not be caught by the test suite. """ - if getattr(self,'mode',None) == 'w': # mode attr might not exist in this state + if getattr(self, 'mode', None) == 'w': # mode attr might not exist in this state self.write() elif self.cfg.debug: msg('read-only wallet, doing nothing') - def conv_types(self,ad): - for k,v in ad.items(): - if k not in ('params','coin'): - v['mmid'] = TwMMGenID(self.proto,v['mmid']) + def conv_types(self, ad): + for k, v in ad.items(): + if k not in ('params', 'coin'): + v['mmid'] = TwMMGenID(self.proto, v['mmid']) v['comment'] = TwComment(v['comment']) @property @@ -166,7 +166,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): def data_root_desc(self): return self.data_key - def cache_balance(self,addr,bal,session_cache,data_root,force=False): + def cache_balance(self, addr, bal, session_cache, data_root, force=False): if force or addr not in session_cache: session_cache[addr] = str(bal) if addr in data_root: @@ -174,7 +174,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): if self.aggressive_sync: self.write() - def get_cached_balance(self,addr,session_cache,data_root): + def get_cached_balance(self, addr, session_cache, data_root): if addr in session_cache: return self.proto.coin_amt(session_cache[addr]) if not self.cfg.cached_balances: @@ -182,11 +182,11 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): if addr in data_root and 'balance' in data_root[addr]: return self.proto.coin_amt(data_root[addr]['balance']) - async def get_balance(self,addr,force_rpc=False): - ret = None if force_rpc else self.get_cached_balance(addr,self.cur_balances,self.data_root) + async def get_balance(self, addr, force_rpc=False): + ret = None if force_rpc else self.get_cached_balance(addr, self.cur_balances, self.data_root) if ret is None: ret = await self.rpc_get_balance(addr) - self.cache_balance(addr,ret,self.cur_balances,self.data_root) + self.cache_balance(addr, ret, self.cur_balances, self.data_root) return ret def force_write(self): @@ -196,7 +196,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): self.mode = mode_save @write_mode - def write_changed(self,data,quiet): + def write_changed(self, data, quiet): from ..fileutil import write_data_to_file write_data_to_file( self.cfg, @@ -207,11 +207,11 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): ignore_opt_outdir = True, quiet = quiet, check_data = True, # die if wallet has been altered by another program - cmp_data = self.orig_data ) + cmp_data = self.orig_data) self.orig_data = data - def write(self,quiet=True): + def write(self, quiet=True): if not self.use_tw_file: self.cfg._util.dmsg("'use_tw_file' is False, doing nothing") return @@ -219,21 +219,21 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): wdata = json.dumps(self.data) if self.orig_data != wdata: - self.write_changed(wdata,quiet=quiet) + self.write_changed(wdata, quiet=quiet) elif self.cfg.debug: msg('Data is unchanged\n') - async def resolve_address(self,addrspec): + async def resolve_address(self, addrspec): - twmmid,coinaddr = (None,None) + twmmid, coinaddr = (None, None) pairs = await self.get_label_addr_pairs() - if is_coin_addr(self.proto,addrspec): - coinaddr = get_obj(CoinAddr,proto=self.proto,addr=addrspec) + if is_coin_addr(self.proto, addrspec): + coinaddr = get_obj(CoinAddr, proto=self.proto, addr=addrspec) pair_data = [e for e in pairs if e.coinaddr == coinaddr] - elif is_mmgen_id(self.proto,addrspec): - twmmid = TwMMGenID(self.proto,addrspec) + elif is_mmgen_id(self.proto, addrspec): + twmmid = TwMMGenID(self.proto, addrspec) pair_data = [e for e in pairs if e.label.mmid == twmmid] else: msg(f'{addrspec!r}: invalid address for this network') @@ -263,7 +263,7 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): if not res: return False - comment = get_obj(TwComment,s=comment) + comment = get_obj(TwComment, s=comment) if comment is False: return False @@ -276,38 +276,38 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): if lbl is False: return False - if await self.set_label(res.coinaddr,lbl): + if await self.set_label(res.coinaddr, lbl): if not silent: desc = '{t} address {a} in tracking wallet'.format( - t = res.twmmid.type.replace('mmgen','MMGen'), + t = res.twmmid.type.replace('mmgen', 'MMGen'), a = res.twmmid.addr.hl() if res.twmmid.type == 'mmgen' else res.twmmid.addr.hl(res.twmmid.addr.view_pref)) msg( - 'Added label {} to {}'.format(comment.hl2(encl='‘’'),desc) if comment else - 'Removed label from {}'.format(desc) ) + 'Added label {} to {}'.format(comment.hl2(encl='‘’'), desc) if comment else + 'Removed label from {}'.format(desc)) return True else: if not silent: - msg( 'Label could not be {}'.format('added' if comment else 'removed') ) + msg('Label could not be {}'.format('added' if comment else 'removed')) return False @write_mode - async def remove_comment(self,mmaddr): - await self.set_comment(mmaddr,'') + async def remove_comment(self, mmaddr): + await self.set_comment(mmaddr, '') - async def import_address_common(self,data,batch=False,gather=False): + async def import_address_common(self, data, batch=False, gather=False): - async def do_import(address,comment,message): + async def do_import(address, comment, message): try: - res = await self.import_address( address, comment ) + res = await self.import_address(address, comment) self.cfg._util.qmsg(message) return res except Exception as e: - die(2,f'\nImport of address {address!r} failed: {e.args[0]!r}') + die(2, f'\nImport of address {address!r} failed: {e.args[0]!r}') - _d = namedtuple( 'formatted_import_data', data[0]._fields + ('mmid_disp',)) + _d = namedtuple('formatted_import_data', data[0]._fields + ('mmid_disp',)) pfx = self.proto.base_coin.lower() + ':' - fdata = [ _d(*d, 'non-MMGen' if d.twmmid.startswith(pfx) else d.twmmid ) for d in data ] + fdata = [_d(*d, 'non-MMGen' if d.twmmid.startswith(pfx) else d.twmmid) for d in data] fs = '{:%s}: {:%s} {:%s} - OK' % ( len(str(len(fdata))) * 2 + 1, @@ -317,13 +317,13 @@ class TwCtl(MMGenObject,metaclass=AsyncInit): nAddrs = len(data) out = [( # create list, not generator, so we know data is valid before starting import - CoinAddr( self.proto, d.addr ), - TwLabel( self.proto, d.twmmid + (f' {d.comment}' if d.comment else '') ), - fs.format( f'{n}/{nAddrs}', d.addr, f'({d.mmid_disp})' ) - ) for n,d in enumerate(fdata,1)] + CoinAddr(self.proto, d.addr), + TwLabel(self.proto, d.twmmid + (f' {d.comment}' if d.comment else '')), + fs.format(f'{n}/{nAddrs}', d.addr, f'({d.mmid_disp})') + ) for n, d in enumerate(fdata, 1)] if batch: - msg_r(f'Batch importing {len(out)} address{suf(data,"es")}...') + msg_r(f'Batch importing {len(out)} address{suf(data, "es")}...') ret = await self.batch_import_address((a, b, False) for a, b, c in out) msg(f'done\n{len(ret)} addresses imported') else: diff --git a/mmgen/tw/json.py b/mmgen/tw/json.py index d6afa321..ae5e6cf0 100755 --- a/mmgen/tw/json.py +++ b/mmgen/tw/json.py @@ -12,10 +12,10 @@ tw.json: export and import tracking wallet to JSON format """ -import sys,os,json +import sys, os, json from collections import namedtuple -from ..util import msg,ymsg,fmt,suf,die,make_timestamp,make_chksum_8 +from ..util import msg, ymsg, fmt, suf, die, make_timestamp, make_chksum_8 from ..base_obj import AsyncInit from ..objmethods import MMGenObject from ..rpc import json_encoder @@ -29,16 +29,16 @@ class TwJSON: pruned = None fn_pfx = 'mmgen-tracking-wallet-dump' - def __new__(cls,cfg,proto,*args,**kwargs): - return MMGenObject.__new__(proto.base_proto_subclass(TwJSON,'tw.json',cls.__name__)) + def __new__(cls, cfg, proto, *args, **kwargs): + return MMGenObject.__new__(proto.base_proto_subclass(TwJSON, 'tw.json', cls.__name__)) - def __init__(self,cfg,proto): + def __init__(self, cfg, proto): self.cfg = cfg self.proto = proto self.coin = proto.coin_id.lower() self.network = proto.network - self.keys = ['mmgen_id','address','amount','comment'] - self.entry_tuple = namedtuple('tw_entry',self.keys) + self.keys = ['mmgen_id', 'address', 'amount', 'comment'] + self.entry_tuple = namedtuple('tw_entry', self.keys) @property def dump_fn(self): @@ -48,7 +48,7 @@ class TwJSON: a = self.fn_pfx, b = f'-pruned[{prune_id}]' if prune_id else '', c = self.coin, - d = self.network ) + d = self.network) if self.pruned: from ..addrlist import AddrIdxList @@ -62,16 +62,16 @@ class TwJSON: return fn - def json_dump(self,data,pretty=False): + def json_dump(self, data, pretty=False): return json.dumps( data, cls = json_encoder, sort_keys = True, separators = None if pretty else (',', ':'), - indent = 4 if pretty else None ) + ('\n' if pretty else '') + indent = 4 if pretty else None) + ('\n' if pretty else '') - def make_chksum(self,data): - return make_chksum_8( self.json_dump(data).encode() ).lower() + def make_chksum(self, data): + return make_chksum_8(self.json_dump(data).encode()).lower() @property def mappings_chksum(self): @@ -79,9 +79,9 @@ class TwJSON: @property def entry_tuple_in(self): - return namedtuple('entry_tuple_in',self.keys) + return namedtuple('entry_tuple_in', self.keys) - class Import(Base,metaclass=AsyncInit): + class Import(Base, metaclass=AsyncInit): blockchain_rescan_warning = None @@ -91,18 +91,18 @@ class TwJSON: proto, filename, ignore_checksum = False, - batch = False ): + batch = False): - super().__init__(cfg,proto) + super().__init__(cfg, proto) - self.twctl = await TwCtl( cfg, proto, mode='i', rpc_ignore_wallet=True ) + self.twctl = await TwCtl(cfg, proto, mode='i', rpc_ignore_wallet=True) def check_network(data): - coin,network = data['network'].split('_') + coin, network = data['network'].split('_') if coin != self.coin: - die(2,f'Coin in wallet dump is {coin.upper()}, but configured coin is {self.coin.upper()}') + die(2, f'Coin in wallet dump is {coin.upper()}, but configured coin is {self.coin.upper()}') if network != self.network: - die(2,f'Network in wallet dump is {network}, but configured network is {self.network}') + die(2, f'Network in wallet dump is {network}, but configured network is {self.network}') def check_chksum(d): chksum = self.make_chksum(d['data']) @@ -110,7 +110,7 @@ class TwJSON: if ignore_checksum: ymsg(f'Warning: ignoring incorrect checksum {chksum}') else: - die(3,f'File checksum incorrect! ({chksum} != {d["checksum"]})') + die(3, f'File checksum incorrect! ({chksum} != {d["checksum"]})') def verify_data(d): check_network(d['data']) @@ -119,13 +119,13 @@ class TwJSON: val1 = self.mappings_chksum, val2 = d['data']['mappings_checksum'], desc1 = 'computed mappings checksum', - desc2 = 'saved checksum' ) + desc2 = 'saved checksum') if not await self.check_and_create_wallet(): return from ..fileutil import get_data_from_file - self.data = json.loads(get_data_from_file( self.cfg, filename, quiet=True )) + self.data = json.loads(get_data_from_file(self.cfg, filename, quiet=True)) self.keys = self.data['data']['entries_keys'] self.entries = await self.get_entries() @@ -136,28 +136,28 @@ class TwJSON: await self.twctl.rescan_addresses(addrs) if self.blockchain_rescan_warning: - ymsg('\n' + fmt(self.blockchain_rescan_warning.strip(),indent=' ')) + ymsg('\n' + fmt(self.blockchain_rescan_warning.strip(), indent=' ')) async def check_and_create_wallet(self): if await self.tracking_wallet_exists: die(3, f'Existing {self.twctl.rpc.daemon.desc} wallet detected!\n' + - 'It must be moved, or backed up and securely deleted, before running this command' ) + 'It must be moved, or backed up and securely deleted, before running this command') - msg('\n'+fmt(self.info_msg.strip(),indent=' ')) + msg('\n'+fmt(self.info_msg.strip(), indent=' ')) from ..ui import keypress_confirm - if not keypress_confirm( self.cfg, 'Continue?' ): + if not keypress_confirm(self.cfg, 'Continue?'): msg('Exiting at user request') return False if not await self.create_tracking_wallet(): - die(3,'Wallet could not be created') + die(3, 'Wallet could not be created') return True - class Export(Base,metaclass=AsyncInit): + class Export(Base, metaclass=AsyncInit): async def __init__( self, @@ -167,27 +167,27 @@ class TwJSON: pretty = False, prune = False, warn_used = False, - force_overwrite = False ): + force_overwrite = False): if prune and not self.can_prune: - die(1,f'Pruning not supported for {proto.name} protocol') + die(1, f'Pruning not supported for {proto.name} protocol') self.prune = prune self.warn_used = warn_used - super().__init__(cfg,proto) + super().__init__(cfg, proto) if not include_amts: self.keys.remove('amount') - self.twctl = await TwCtl( cfg, proto ) + self.twctl = await TwCtl(cfg, proto) self.entries = await self.get_entries() if self.prune: - msg('Pruned {} address{}'.format( len(self.pruned), suf(self.pruned,'es') )) + msg('Pruned {} address{}'.format(len(self.pruned), suf(self.pruned, 'es'))) - msg('Exporting {} address{}'.format( self.num_entries, suf(self.num_entries,'es') )) + msg('Exporting {} address{}'.format(self.num_entries, suf(self.num_entries, 'es'))) data = { 'id': 'mmgen_tracking_wallet', @@ -212,6 +212,6 @@ class TwJSON: 'checksum': self.make_chksum(data), 'data': data }, - pretty = pretty ), + pretty = pretty), desc = 'tracking wallet JSON data', - ask_overwrite = not force_overwrite ) + ask_overwrite = not force_overwrite) diff --git a/mmgen/tw/prune.py b/mmgen/tw/prune.py index f13697f1..76eb8ed5 100755 --- a/mmgen/tw/prune.py +++ b/mmgen/tw/prune.py @@ -12,11 +12,11 @@ tw.prune: Tracking wallet pruned listaddresses class for the MMGen suite """ -from ..util import msg,msg_r,rmsg,ymsg -from ..color import red,green,gray,yellow +from ..util import msg, msg_r, rmsg, ymsg +from ..color import red, green, gray, yellow from ..obj import ListItemAttr from .addresses import TwAddresses -from .view import CUR_HOME,ERASE_ALL +from .view import CUR_HOME, ERASE_ALL class TwAddressesPrune(TwAddresses): @@ -24,29 +24,29 @@ class TwAddressesPrune(TwAddresses): class TwAddress(TwAddresses.TwAddress): valid_attrs = TwAddresses.TwAddress.valid_attrs | {'tag'} - tag = ListItemAttr(bool,typeconv=False,reassign_ok=True) + tag = ListItemAttr(bool, typeconv=False, reassign_ok=True) - async def __init__(self,*args,warn_used=False,**kwargs): + async def __init__(self, *args, warn_used=False, **kwargs): self.warn_used = warn_used - await super().__init__(*args,**kwargs) + await super().__init__(*args, **kwargs) - def gen_display(self,data,cw,fs,color,fmt_method): + def gen_display(self, data, cw, fs, color, fmt_method): id_save = data[0].al_id - yes,no = red('Yes '),green('No ') + yes, no = red('Yes '), green('No ') - for n,d in enumerate(data,1): + for n, d in enumerate(data, 1): if id_save != d.al_id: id_save = d.al_id yield ''.ljust(self.term_width) yield ( - gray(fmt_method(n,d,cw,fs,False,'Yes ','No ')) if d.tag else - fmt_method(n,d,cw,fs,True,yes,no) ) + gray(fmt_method(n, d, cw, fs, False, 'Yes ', 'No ')) if d.tag else + fmt_method(n, d, cw, fs, True, yes, no)) def do_prune(self): def gen(): - for n,d in enumerate(self.data,1): + for n, d in enumerate(self.data, 1): if d.tag: pruned.append(n) if d.amt: @@ -65,16 +65,16 @@ class TwAddressesPrune(TwAddresses): class action(TwAddresses.action): - def get_addrnums(self,parent,desc): + def get_addrnums(self, parent, desc): prompt = f'Enter a range or space-separated list of addresses to {desc}: ' from ..ui import line_input msg('') while True: - reply = line_input( parent.cfg, prompt ).strip() + reply = line_input(parent.cfg, prompt).strip() if reply: from ..addrlist import AddrIdxList from ..obj import get_obj - selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) ) + selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split())) if selected: if selected[-1] <= len(parent.disp_data): return selected @@ -82,23 +82,23 @@ class TwAddressesPrune(TwAddresses): else: return [] - def query_user(self,desc,addrnum,e): + def query_user(self, desc, addrnum, e): from collections import namedtuple - md = namedtuple('mdata',['wmsg','prompt']) + md = namedtuple('mdata', ['wmsg', 'prompt']) m = { 'amt': md( red('Address #{a} ({b}) has a balance of {c}!'.format( a = addrnum, b = e.twmmid.addr, - c = e.amt.hl2(color=False,unit=True) )), + c = e.amt.hl2(color=False, unit=True))), '[p]rune anyway, [P]rune all with balance, [s]kip, [S]kip all with balance: ', ), 'used': md( yellow('Address #{a} ({b}) is used!'.format( a = addrnum, - b = e.twmmid.addr )), + b = e.twmmid.addr)), '[p]rune anyway, [P]rune all used, [s]kip, [S]kip all used: ', ), } @@ -108,7 +108,7 @@ class TwAddressesPrune(TwAddresses): msg(m[desc].wmsg) while True: - res = get_char( m[desc].prompt, immed_chars=valid_res ) + res = get_char(m[desc].prompt, immed_chars=valid_res) if res in valid_res: msg('') return { @@ -121,13 +121,13 @@ class TwAddressesPrune(TwAddresses): else: msg('\nInvalid keypress') - async def a_prune(self,parent): + async def a_prune(self, parent): - def do_entry(desc,n,addrnum,e): + def do_entry(desc, n, addrnum, e): if auto[desc]: return False else: - auto[desc],prune = self.query_user(desc,addrnum,e) + auto[desc], prune = self.query_user(desc, addrnum, e) dfl[desc] = auto[desc] and prune skip_all_used = auto['used'] and not dfl['used'] if auto[desc]: # we’ve switched to auto mode, so go back and fix up all previous entries @@ -145,30 +145,30 @@ class TwAddressesPrune(TwAddresses): dfl['amt'] = False return prune - addrnums = self.get_addrnums(parent,'prune') + addrnums = self.get_addrnums(parent, 'prune') dfl = {'amt': False, 'used': False} # default prune policy for given property (has amt, is used) auto = {'amt': False, 'used': False} # whether to ask the user, or apply default policy automatically - for n,addrnum in enumerate(addrnums): + for n, addrnum in enumerate(addrnums): e = parent.disp_data[addrnum-1] if e.amt and not dfl['amt']: - e.tag = do_entry('amt',n,addrnum,e) + e.tag = do_entry('amt', n, addrnum, e) elif parent.warn_used and (e.recvd and not e.amt) and not dfl['used']: - e.tag = do_entry('used',n,addrnum,e) + e.tag = do_entry('used', n, addrnum, e) else: e.tag = True if parent.scroll: msg_r(CUR_HOME + ERASE_ALL) - async def a_unprune(self,parent): - for addrnum in self.get_addrnums(parent,'unprune'): + async def a_unprune(self, parent): + for addrnum in self.get_addrnums(parent, 'unprune'): parent.disp_data[addrnum-1].tag = False if parent.scroll: msg_r(CUR_HOME + ERASE_ALL) - async def a_clear_prune_list(self,parent): + async def a_clear_prune_list(self, parent): for d in parent.data: d.tag = False diff --git a/mmgen/tw/rpc.py b/mmgen/tw/rpc.py index 1d425989..24273051 100755 --- a/mmgen/tw/rpc.py +++ b/mmgen/tw/rpc.py @@ -16,10 +16,10 @@ from ..objmethods import MMGenObject class TwRPC: - def __new__(cls,proto,*args,**kwargs): - return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw.rpc')) + def __new__(cls, proto, *args, **kwargs): + return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.rpc')) - def __init__(self,proto,rpc,twctl): + def __init__(self, proto, rpc, twctl): self.proto = proto self.rpc = rpc self.twctl = twctl diff --git a/mmgen/tw/shared.py b/mmgen/tw/shared.py index 38b799e0..16eab905 100755 --- a/mmgen/tw/shared.py +++ b/mmgen/tw/shared.py @@ -12,31 +12,31 @@ tw.shared: classes and functions shared by all tracking wallet classes """ -from ..objmethods import HiliteStr,InitErrors,MMGenObject +from ..objmethods import HiliteStr, InitErrors, MMGenObject from ..obj import TwComment from ..addr import MMGenID -class TwMMGenID(HiliteStr,InitErrors,MMGenObject): +class TwMMGenID(HiliteStr, InitErrors, MMGenObject): color = 'orange' width = 0 trunc_ok = False - def __new__(cls,proto,id_str): - if isinstance(id_str,cls): + def __new__(cls, proto, id_str): + if isinstance(id_str, cls): return id_str try: - ret = addr = disp = MMGenID(proto,id_str) - sort_key,idtype = (ret.sort_key,'mmgen') + ret = addr = disp = MMGenID(proto, id_str) + sort_key, idtype = (ret.sort_key, 'mmgen') except Exception as e: try: - coin,addr = id_str.split(':',1) - assert coin == proto.base_coin.lower(),( - f'not a string beginning with the prefix {proto.base_coin.lower()!r}:' ) - ret,sort_key,idtype,disp = (id_str,'z_'+id_str,'non-mmgen','non-MMGen') + coin, addr = id_str.split(':', 1) + assert coin == proto.base_coin.lower(), ( + f'not a string beginning with the prefix {proto.base_coin.lower()!r}:') + ret, sort_key, idtype, disp = (id_str, 'z_'+id_str, 'non-mmgen', 'non-MMGen') addr = proto.coin_addr(addr) except Exception as e2: - return cls.init_fail(e,id_str,e2=e2) + return cls.init_fail(e, id_str, e2=e2) - me = str.__new__(cls,ret) + me = str.__new__(cls, ret) me.obj = ret me.disp = disp me.addr = addr @@ -45,34 +45,34 @@ class TwMMGenID(HiliteStr,InitErrors,MMGenObject): me.proto = proto return me - def fmt(self,**kwargs): - return super().fmtc(self.disp,**kwargs) + def fmt(self, **kwargs): + return super().fmtc(self.disp, **kwargs) -# non-displaying container for TwMMGenID,TwComment -class TwLabel(str,InitErrors,MMGenObject): +# non-displaying container for TwMMGenID, TwComment +class TwLabel(str, InitErrors, MMGenObject): exc = 'BadTwLabel' passthru_excs = ('BadTwComment',) - def __new__(cls,proto,text): - if isinstance(text,cls): + def __new__(cls, proto, text): + if isinstance(text, cls): return text try: - ts = text.split(None,1) - mmid = TwMMGenID(proto,ts[0]) + ts = text.split(None, 1) + mmid = TwMMGenID(proto, ts[0]) comment = TwComment(ts[1] if len(ts) == 2 else '') - me = str.__new__( cls, mmid + (' ' + comment if comment else '') ) + me = str.__new__(cls, mmid + (' ' + comment if comment else '')) me.mmid = mmid me.comment = comment me.proto = proto return me except Exception as e: - return cls.init_fail(e,text) + return cls.init_fail(e, text) -def get_tw_label(proto,s): +def get_tw_label(proto, s): """ raise an exception on a malformed comment, return None on an empty or invalid label """ try: - return TwLabel(proto,s) + return TwLabel(proto, s) except Exception as e: if type(e).__name__ == 'BadTwComment': # do it this way to avoid importing .exception raise diff --git a/mmgen/tw/txhistory.py b/mmgen/tw/txhistory.py index a4e2f181..ed051367 100755 --- a/mmgen/tw/txhistory.py +++ b/mmgen/tw/txhistory.py @@ -23,7 +23,7 @@ class TwTxHistory(TwView): class display_type(TwView.display_type): class squeezed(TwView.display_type.squeezed): - cols = ('num','txid','date','inputs','amt','outputs','comment') + cols = ('num', 'txid', 'date', 'inputs', 'amt', 'outputs', 'comment') subhdr_fmt_method = 'gen_squeezed_subheader' class detail(TwView.display_type.detail): @@ -37,13 +37,13 @@ class TwTxHistory(TwView): show_unconfirmed = False show_total_amt = False update_widths_on_age_toggle = True - print_output_types = ('squeezed','detail') + print_output_types = ('squeezed', 'detail') filters = ('show_unconfirmed',) mod_subpath = 'tw.txhistory' - async def __init__(self,cfg,proto,sinceblock=0): - await super().__init__(cfg,proto) - self.sinceblock = NonNegativeInt( sinceblock if sinceblock >= 0 else self.rpc.blockcount + sinceblock ) + async def __init__(self, cfg, proto, sinceblock=0): + await super().__init__(cfg, proto) + self.sinceblock = NonNegativeInt(sinceblock if sinceblock >= 0 else self.rpc.blockcount + sinceblock) @property def no_rpcdata_errmsg(self): @@ -53,14 +53,14 @@ class TwTxHistory(TwView): def filter_data(self): return (d for d in self.data if d.confirmations > 0 or self.show_unconfirmed) - def set_amt_widths(self,data): - amts_tuple = namedtuple('amts_data',['amt']) + def set_amt_widths(self, data): + amts_tuple = namedtuple('amts_data', ['amt']) return super().set_amt_widths([amts_tuple(d.amt_disp(self.show_total_amt)) for d in data]) - def get_column_widths(self,data,wide,interactive): + def get_column_widths(self, data, wide, interactive): # var cols: inputs outputs comment [txid] - if not hasattr(self,'varcol_maxwidths'): + if not hasattr(self, 'varcol_maxwidths'): self.varcol_maxwidths = { 'inputs': max(len(d.vouts_disp( 'inputs', width=None, color=False, addr_view_pref=self.addr_view_pref)) for d in data), @@ -85,30 +85,36 @@ class TwTxHistory(TwView): maxws_nice = {} widths = { # fixed cols - 'num': max(2,len(str(len(data)))+1), + 'num': max(2, len(str(len(data)))+1), 'date': self.age_w, 'amt': self.amt_widths['amt'], 'spc': 6 + self.show_txid, # 5(6) spaces between cols + 1 leading space in fs } - return self.compute_column_widths(widths,maxws,minws,maxws_nice,wide=wide,interactive=interactive) + return self.compute_column_widths( + widths, + maxws, + minws, + maxws_nice, + wide = wide, + interactive = interactive) - def gen_squeezed_subheader(self,cw,color): + def gen_squeezed_subheader(self, cw, color): # keep these shorter than min screen width (currently prompt width, or 65 chars) if self.sinceblock: yield f'Displaying transactions since block {self.sinceblock.hl(color=color)}' yield 'Only wallet-related outputs are shown' yield 'Comment is from first wallet address in outputs or inputs' if (cw.inputs < self.varcol_maxwidths['inputs'] or - cw.outputs < self.varcol_maxwidths['outputs'] ): + cw.outputs < self.varcol_maxwidths['outputs']): yield 'Note: screen is too narrow to display all inputs and outputs' - def gen_detail_subheader(self,cw,color): + def gen_detail_subheader(self, cw, color): if self.sinceblock: yield f'Displaying transactions since block {self.sinceblock.hl(color=color)}' yield 'Only wallet-related outputs are shown' - def squeezed_col_hdr(self,cw,fs,color): + def squeezed_col_hdr(self, cw, fs, color): return fs.format( n = '', t = 'TxID', @@ -116,21 +122,21 @@ class TwTxHistory(TwView): i = 'Inputs', A = 'Amt({})'.format('TX' if self.show_total_amt else 'Wallet'), o = 'Outputs', - c = 'Comment' ) + c = 'Comment') - def gen_squeezed_display(self,data,cw,fs,color,fmt_method): + def gen_squeezed_display(self, data, cw, fs, color, fmt_method): - for n,d in enumerate(data,1): + for n, d in enumerate(data, 1): yield fs.format( n = str(n) + ')', - t = d.txid_disp( width=cw.txid, color=color ) if hasattr(cw,'txid') else None, - d = d.age_disp( self.age_fmt, width=self.age_w, color=color ), + t = d.txid_disp(width=cw.txid, color=color) if hasattr(cw, 'txid') else None, + d = d.age_disp(self.age_fmt, width=self.age_w, color=color), i = d.vouts_disp('inputs', width=cw.inputs, color=color, addr_view_pref=self.addr_view_pref), - A = d.amt_disp(self.show_total_amt).fmt( iwidth=cw.iwidth, prec=self.disp_prec, color=color ), + A = d.amt_disp(self.show_total_amt).fmt(iwidth=cw.iwidth, prec=self.disp_prec, color=color), o = d.vouts_disp('outputs', width=cw.outputs, color=color, addr_view_pref=self.addr_view_pref), - c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ) ) + c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-')) - def gen_detail_display(self,data,cw,fs,color,fmt_method): + def gen_detail_display(self, data, cw, fs, color, fmt_method): fs = fmt(""" {n} @@ -143,18 +149,18 @@ class TwTxHistory(TwView): {i} Outputs ({N}): {o} - """,strip_char='\t').strip() + """, strip_char='\t').strip() - for n,d in enumerate(data,1): + for n, d in enumerate(data, 1): yield fs.format( n = str(n) + ')', - d = d.age_disp( 'date_time', width=None, color=None ), + d = d.age_disp('date_time', width=None, color=None), b = d.blockheight_disp(color=color), - D = d.txdate_disp( 'date_time' ), - t = d.txid_disp( color=color ), - A = d.amt_disp(show_total_amt=True).hl( color=color ), - B = d.amt_disp(show_total_amt=False).hl( color=color ), - f = d.fee_disp( color=color ), + D = d.txdate_disp('date_time'), + t = d.txid_disp(color=color), + A = d.amt_disp(show_total_amt=True).hl(color=color), + B = d.amt_disp(show_total_amt=False).hl(color=color), + f = d.fee_disp(color=color), i = d.vouts_list_disp('inputs', color=color, indent=' '*8, addr_view_pref=self.addr_view_pref), N = d.nOutputs, o = d.vouts_list_disp('outputs', color=color, indent=' '*8, addr_view_pref=self.addr_view_pref), @@ -176,7 +182,7 @@ class TwTxHistory(TwView): 'txid': lambda i: i.txid, } - async def set_dates(self,_): + async def set_dates(self, _): pass @property @@ -185,24 +191,24 @@ class TwTxHistory(TwView): class sort_action(TwView.sort_action): - def s_blockheight(self,parent): + def s_blockheight(self, parent): parent.do_sort('blockheight') - def s_amt(self,parent): + def s_amt(self, parent): parent.do_sort('amt') parent.show_total_amt = False - def s_total_amt(self,parent): + def s_total_amt(self, parent): parent.do_sort('total_amt') parent.show_total_amt = True class display_action(TwView.display_action): - def d_show_txid(self,parent): + def d_show_txid(self, parent): parent.show_txid = not parent.show_txid - def d_show_unconfirmed(self,parent): + def d_show_unconfirmed(self, parent): parent.show_unconfirmed = not parent.show_unconfirmed - def d_show_total_amt(self,parent): + def d_show_total_amt(self, parent): parent.show_total_amt = not parent.show_total_amt diff --git a/mmgen/tw/unspent.py b/mmgen/tw/unspent.py index 9442a1fa..7e75f6c9 100755 --- a/mmgen/tw/unspent.py +++ b/mmgen/tw/unspent.py @@ -28,10 +28,10 @@ from ..obj import ( TwComment, HexStr, CoinTxID, - NonNegativeInt ) + NonNegativeInt) from ..addr import CoinAddr from ..amt import CoinAmtChk -from .shared import TwMMGenID,get_tw_label +from .shared import TwMMGenID, get_tw_label from .view import TwView class TwUnspentOutputs(TwView): @@ -39,10 +39,10 @@ class TwUnspentOutputs(TwView): class display_type(TwView.display_type): class squeezed(TwView.display_type.squeezed): - cols = ('num','txid','vout','addr','mmid','comment','amt','amt2','date') + cols = ('num', 'txid', 'vout', 'addr', 'mmid', 'comment', 'amt', 'amt2', 'date') class detail(TwView.display_type.detail): - cols = ('num','txid','vout','addr','mmid','amt','amt2','block','date_time','comment') + cols = ('num', 'txid', 'vout', 'addr', 'mmid', 'amt', 'amt2', 'block', 'date_time', 'comment') show_mmid = True no_rpcdata_errmsg = """ @@ -58,20 +58,20 @@ class TwUnspentOutputs(TwView): vout = ListItemAttr(NonNegativeInt) amt = ImmutableAttr(CoinAmtChk, include_proto=True) amt2 = ListItemAttr(CoinAmtChk, include_proto=True) # the ETH balance for token account - comment = ListItemAttr(TwComment,reassign_ok=True) - twmmid = ImmutableAttr(TwMMGenID,include_proto=True) - addr = ImmutableAttr(CoinAddr,include_proto=True) - confs = ImmutableAttr(int,typeconv=False) - date = ListItemAttr(int,typeconv=False,reassign_ok=True) + comment = ListItemAttr(TwComment, reassign_ok=True) + twmmid = ImmutableAttr(TwMMGenID, include_proto=True) + addr = ImmutableAttr(CoinAddr, include_proto=True) + confs = ImmutableAttr(int, typeconv=False) + date = ListItemAttr(int, typeconv=False, reassign_ok=True) scriptPubKey = ImmutableAttr(HexStr) - skip = ListItemAttr(str,typeconv=False,reassign_ok=True) + skip = ListItemAttr(str, typeconv=False, reassign_ok=True) - def __init__(self,proto,**kwargs): + def __init__(self, proto, **kwargs): self.__dict__['proto'] = proto - MMGenListItem.__init__(self,**kwargs) + MMGenListItem.__init__(self, **kwargs) - async def __init__(self,cfg,proto,minconf=1,addrs=[]): - await super().__init__(cfg,proto) + async def __init__(self, cfg, proto, minconf=1, addrs=[]): + await super().__init__(cfg, proto) self.minconf = minconf self.addrs = addrs from ..cfg import gc @@ -81,23 +81,23 @@ class TwUnspentOutputs(TwView): def total(self): return sum(i.amt for i in self.data) - def gen_data(self,rpc_data,lbl_id): + def gen_data(self, rpc_data, lbl_id): for o in rpc_data: if not lbl_id in o: continue # coinbase outputs have no account field - l = get_tw_label(self.proto,o[lbl_id]) + l = get_tw_label(self.proto, o[lbl_id]) if l: if not 'amt' in o: o['amt'] = self.proto.coin_amt(o['amount']) o.update({ 'twmmid': l.mmid, 'comment': l.comment or '', - 'addr': CoinAddr(self.proto,o['address']), + 'addr': CoinAddr(self.proto, o['address']), 'confs': o['confirmations'] }) yield self.MMGenTwUnspentOutput( self.proto, - **{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } ) + **{k:v for k, v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs}) def filter_data(self): @@ -106,27 +106,27 @@ class TwUnspentOutputs(TwView): for d in data: d.skip = '' - gkeys = {'addr':'addr','twmmid':'addr','txid':'txid'} + gkeys = {'addr': 'addr', 'twmmid': 'addr', 'txid': 'txid'} if self.group and self.sort_key in gkeys: - for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]: + for a, b in [(data[i], data[i+1]) for i in range(len(data)-1)]: for k in gkeys: - if self.sort_key == k and getattr(a,k) == getattr(b,k): + if self.sort_key == k and getattr(a, k) == getattr(b, k): b.skip = gkeys[k] return data - def get_column_widths(self,data,wide,interactive): + def get_column_widths(self, data, wide, interactive): show_mmid = self.show_mmid or wide # num txid vout addr [mmid] [comment] amt [amt2] date return self.compute_column_widths( widths = { # fixed cols - 'num': max(2,len(str(len(data)))+1), + 'num': max(2, len(str(len(data)))+1), 'vout': 4, 'mmid': max(len(d.twmmid.disp) for d in data) if show_mmid else 0, 'amt': self.amt_widths['amt'], - 'amt2': self.amt_widths.get('amt2',0), + 'amt2': self.amt_widths.get('amt2', 0), 'block': self.age_col_params['block'][0] if wide else 0, 'date_time': self.age_col_params['date_time'][0] if wide else 0, 'date': self.age_w, @@ -147,7 +147,7 @@ class TwUnspentOutputs(TwView): interactive = interactive, ) - def squeezed_col_hdr(self,cw,fs,color): + def squeezed_col_hdr(self, cw, fs, color): return fs.format( n = '', t = 'TxID', @@ -157,9 +157,9 @@ class TwUnspentOutputs(TwView): c = 'Comment', A = 'Amt({})'.format(self.proto.dcoin), B = 'Amt({})'.format(self.proto.coin), - d = self.age_hdr ) + d = self.age_hdr) - def detail_col_hdr(self,cw,fs,color): + def detail_col_hdr(self, cw, fs, color): return fs.format( n = '', t = 'TxID', @@ -170,68 +170,68 @@ class TwUnspentOutputs(TwView): B = 'Amt({})'.format(self.proto.coin), b = 'Block', D = 'Date/Time', - c = 'Comment' ) + c = 'Comment') - def gen_squeezed_display(self,data,cw,fs,color,fmt_method): + def gen_squeezed_display(self, data, cw, fs, color, fmt_method): - for n,d in enumerate(data): + for n, d in enumerate(data): yield fs.format( n = str(n+1) + ')', - t = (d.txid.fmtc( '|' + '.'*(cw.txid-1), width=cw.txid, color=color ) if d.skip == 'txid' - else d.txid.truncate( width=cw.txid, color=color )) if cw.txid else None, - v = ' ' + d.vout.fmt( width=cw.vout-1, color=color ) if cw.vout else None, - a = d.addr.fmtc( '|' + '.'*(cw.addr-1), width=cw.addr, color=color ) if d.skip == 'addr' + t = (d.txid.fmtc('|' + '.'*(cw.txid-1), width=cw.txid, color=color) if d.skip == 'txid' + else d.txid.truncate(width=cw.txid, color=color)) if cw.txid else None, + v = ' ' + d.vout.fmt(width=cw.vout-1, color=color) if cw.vout else None, + a = d.addr.fmtc('|' + '.'*(cw.addr-1), width=cw.addr, color=color) if d.skip == 'addr' else d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color), - m = (d.twmmid.fmtc( '.'*cw.mmid, width=cw.mmid, color=color ) if d.skip == 'addr' - else d.twmmid.fmt( width=cw.mmid, color=color )) if cw.mmid else None, - c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ) if cw.comment else None, - A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ), - B = d.amt2.fmt( color=color, iwidth=cw.iwidth2, prec=self.disp_prec ) if cw.amt2 else None, - d = self.age_disp(d,self.age_fmt), + m = (d.twmmid.fmtc('.'*cw.mmid, width=cw.mmid, color=color) if d.skip == 'addr' + else d.twmmid.fmt(width=cw.mmid, color=color)) if cw.mmid else None, + c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-') if cw.comment else None, + A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec), + B = d.amt2.fmt(color=color, iwidth=cw.iwidth2, prec=self.disp_prec) if cw.amt2 else None, + d = self.age_disp(d, self.age_fmt), ) - def gen_detail_display(self,data,cw,fs,color,fmt_method): + def gen_detail_display(self, data, cw, fs, color, fmt_method): - for n,d in enumerate(data): + for n, d in enumerate(data): yield fs.format( n = str(n+1) + ')', - t = d.txid.fmt( width=cw.txid, color=color ) if cw.txid else None, - v = ' ' + d.vout.fmt( width=cw.vout-1, color=color ) if cw.vout else None, + t = d.txid.fmt(width=cw.txid, color=color) if cw.txid else None, + v = ' ' + d.vout.fmt(width=cw.vout-1, color=color) if cw.vout else None, a = d.addr.fmt(self.addr_view_pref, width=cw.addr, color=color), - m = d.twmmid.fmt( width=cw.mmid, color=color ), - A = d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ), - B = d.amt2.fmt( color=color, iwidth=cw.iwidth2, prec=self.disp_prec ) if cw.amt2 else None, - b = self.age_disp(d,'block'), - D = self.age_disp(d,'date_time'), - c = d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' )) + m = d.twmmid.fmt(width=cw.mmid, color=color), + A = d.amt.fmt(color=color, iwidth=cw.iwidth, prec=self.disp_prec), + B = d.amt2.fmt(color=color, iwidth=cw.iwidth2, prec=self.disp_prec) if cw.amt2 else None, + b = self.age_disp(d, 'block'), + D = self.age_disp(d, 'date_time'), + c = d.comment.fmt2(width=cw.comment, color=color, nullrepl='-')) def display_total(self): msg('\nTotal unspent: {} {} ({} output{})'.format( self.total.hl(), self.proto.dcoin, len(self.data), - suf(self.data) )) + suf(self.data))) - async def set_dates(self,us): + async def set_dates(self, us): if not self.dates_set: # 'blocktime' differs from 'time', is same as getblockheader['time'] - dates = [ o.get('blocktime',0) - for o in await self.rpc.gathered_icall('gettransaction',[(o.txid,True,False) for o in us]) ] - for idx,o in enumerate(us): + dates = [o.get('blocktime', 0) + for o in await self.rpc.gathered_icall('gettransaction', [(o.txid, True, False) for o in us])] + for idx, o in enumerate(us): o.date = dates[idx] self.dates_set = True class sort_action(TwView.sort_action): - def s_twmmid(self,parent): + def s_twmmid(self, parent): parent.do_sort('twmmid') parent.show_mmid = True class display_action(TwView.display_action): - def d_mmid(self,parent): + def d_mmid(self, parent): parent.show_mmid = not parent.show_mmid - def d_group(self,parent): + def d_group(self, parent): if parent.can_group: parent.group = not parent.group diff --git a/mmgen/tw/view.py b/mmgen/tw/view.py index 43e7e4bc..1d4118b1 100755 --- a/mmgen/tw/view.py +++ b/mmgen/tw/view.py @@ -20,14 +20,14 @@ tw.view: base class for tracking wallet view classes """ -import sys,time,asyncio +import sys, time, asyncio from collections import namedtuple from ..cfg import gv from ..objmethods import MMGenObject -from ..obj import get_obj,MMGenIdx,MMGenList -from ..color import nocolor,yellow,green,red,blue -from ..util import msg,msg_r,fmt,die,capfirst,make_timestr +from ..obj import get_obj, MMGenIdx, MMGenList +from ..color import nocolor, yellow, green, red, blue +from ..util import msg, msg_r, fmt, die, capfirst, make_timestr from ..rpc import rpc_init from ..base_obj import AsyncInit @@ -39,17 +39,17 @@ ERASE_ALL = '\033[0J' # decorator for action.run(): def enable_echo(orig_func): - async def f(self,parent,action_method): + async def f(self, parent, action_method): if parent.scroll: parent.term.set('echo') - ret = await orig_func(self,parent,action_method) + ret = await orig_func(self, parent, action_method) if parent.scroll: parent.term.set('noecho') return ret return f -# base class for TwUnspentOutputs,TwAddresses,TwTxHistory: -class TwView(MMGenObject,metaclass=AsyncInit): +# base class for TwUnspentOutputs, TwAddresses, TwTxHistory: +class TwView(MMGenObject, metaclass=AsyncInit): class display_type: @@ -77,8 +77,8 @@ class TwView(MMGenObject,metaclass=AsyncInit): class print: @staticmethod - def do(method,data,cw,fs,color,fmt_method): - return [l.rstrip() for l in method(data,cw,fs,color,fmt_method)] + def do(method, data, cw, fs, color, fmt_method): + return [l.rstrip() for l in method(data, cw, fs, color, fmt_method)] has_wallet = True has_amt2 = False @@ -101,7 +101,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): pos = 0 filters = () - fp = namedtuple('fs_params',['fs_key','hdr_fs_repl','fs_repl','hdr_fs','fs']) + fp = namedtuple('fs_params', ['fs_key', 'hdr_fs_repl', 'fs_repl', 'hdr_fs', 'fs']) fs_params = { 'num': fp('n', True, True, ' {n:>%s}', ' {n:>%s}'), 'txid': fp('t', True, False, ' {t:%s}', ' {t}'), @@ -119,8 +119,8 @@ class TwView(MMGenObject,metaclass=AsyncInit): 'outputs': fp('o', True, False, ' {o:%s}', ' {o}'), } - age_fmts = ('confs','block','days','date','date_time') - age_fmts_date_dependent = ('days','date','date_time') + age_fmts = ('confs', 'block', 'days', 'date', 'date_time') + age_fmts_date_dependent = ('days', 'date', 'date_time') _age_fmt = 'confs' bch_addr_fmts = ('cashaddr', 'legacy') @@ -134,12 +134,12 @@ class TwView(MMGenObject,metaclass=AsyncInit): } date_formatter = { - 'days': lambda rpc,secs: (rpc.cur_date - secs) // 86400 if secs else 0, + 'days': lambda rpc, secs: (rpc.cur_date - secs) // 86400 if secs else 0, 'date': ( - lambda rpc,secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:] + lambda rpc, secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:] if secs else '- '), 'date_time': ( - lambda rpc,secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5]) + lambda rpc, secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5]) if secs else '- '), } @@ -187,17 +187,17 @@ class TwView(MMGenObject,metaclass=AsyncInit): } scroll_keys['darwin'] = scroll_keys['linux'] - def __new__(cls,cfg,proto,*args,**kwargs): - return MMGenObject.__new__(proto.base_proto_subclass(cls,cls.mod_subpath)) + def __new__(cls, cfg, proto, *args, **kwargs): + return MMGenObject.__new__(proto.base_proto_subclass(cls, cls.mod_subpath)) - async def __init__(self,cfg,proto): + async def __init__(self, cfg, proto): self.cfg = cfg self.proto = proto - self.rpc = await rpc_init(cfg,proto) + self.rpc = await rpc_init(cfg, proto) if self.has_wallet: from .ctl import TwCtl - self.twctl = await TwCtl(cfg,proto,mode='w') - self.amt_keys = {'amt':'iwidth','amt2':'iwidth2'} if self.has_amt2 else {'amt':'iwidth'} + self.twctl = await TwCtl(cfg, proto, mode='w') + self.amt_keys = {'amt':'iwidth', 'amt2':'iwidth2'} if self.has_amt2 else {'amt':'iwidth'} if repl := self.prompt_fs_repl.get(self.proto.coin): self.prompt_fs_in[repl[0]] = repl[1] self.prompt_fs = '\n'.join(self.prompt_fs_in) @@ -218,20 +218,20 @@ class TwView(MMGenObject,metaclass=AsyncInit): return self._age_fmt @age_fmt.setter - def age_fmt(self,val): + def age_fmt(self, val): if val not in self.age_fmts: - die( 'BadAgeFormat', f'{val!r}: invalid age format (must be one of {self.age_fmts!r})' ) + die('BadAgeFormat', f'{val!r}: invalid age format (must be one of {self.age_fmts!r})') self._age_fmt = val - def age_disp(self,o,age_fmt): + def age_disp(self, o, age_fmt): if age_fmt == 'confs': return o.confs or '-' elif age_fmt == 'block': return self.rpc.blockcount + 1 - o.confs if o.confs else '-' else: - return self.date_formatter[age_fmt](self.rpc,o.date) + return self.date_formatter[age_fmt](self.rpc, o.date) - def get_disp_prec(self,wide): + def get_disp_prec(self, wide): return self.proto.coin_amt.max_prec sort_disp = { @@ -250,48 +250,48 @@ class TwView(MMGenObject,metaclass=AsyncInit): 'twmmid': lambda i: '{} {:010} {:024.12f}'.format(i.twmmid.sort_key, 0xffffffff - abs(i.confs), i.amt) } - def sort_info(self,include_group=True): - ret = ([],['Reverse'])[self.reverse] + def sort_info(self, include_group=True): + ret = ([], ['Reverse'])[self.reverse] ret.append(self.sort_disp[self.sort_key]) - if include_group and self.group and (self.sort_key in ('addr','txid','twmmid')): + if include_group and self.group and (self.sort_key in ('addr', 'txid', 'twmmid')): ret.append('Grouped') return ret - def do_sort(self,key=None,reverse=False): + def do_sort(self, key=None, reverse=False): key = key or self.sort_key if key not in self.sort_funcs: - die(1,f'{key!r}: invalid sort key. Valid options: {" ".join(self.sort_funcs)}') + die(1, f'{key!r}: invalid sort key. Valid options: {" ".join(self.sort_funcs)}') self.sort_key = key - assert isinstance(reverse,bool) + assert isinstance(reverse, bool) save = self.data.copy() - self.data.sort(key=self.sort_funcs[key],reverse=reverse or self.reverse) + self.data.sort(key=self.sort_funcs[key], reverse=reverse or self.reverse) if self.data != save: self.pos = 0 - async def get_data(self,sort_key=None,reverse_sort=False): + async def get_data(self, sort_key=None, reverse_sort=False): rpc_data = await self.get_rpc_data() if not rpc_data: - die(1,fmt(self.no_rpcdata_errmsg).strip()) + die(1, fmt(self.no_rpcdata_errmsg).strip()) - lbl_id = ('account','label')['label_api' in self.rpc.caps] + lbl_id = ('account', 'label')['label_api' in self.rpc.caps] - res = self.gen_data(rpc_data,lbl_id) + res = self.gen_data(rpc_data, lbl_id) self.data = MMGenList(await res if type(res).__name__ == 'coroutine' else res) self.disp_data = list(self.filter_data()) if not self.data: - die(1,self.no_data_errmsg) + die(1, self.no_data_errmsg) - self.do_sort(key=sort_key,reverse=reverse_sort) + self.do_sort(key=sort_key, reverse=reverse_sort) # get_data() is immediately followed by display header, and get_rpc_data() produces output, # so add NL here (' ' required because CUR_HOME erases preceding blank lines) msg(' ') - def get_term_dimensions(self,min_cols,min_lines=None): - from ..term import get_terminal_size,get_char_raw,_term_dimensions + def get_term_dimensions(self, min_cols, min_lines=None): + from ..term import get_terminal_size, get_char_raw, _term_dimensions user_resized = False while True: ts = get_terminal_size() @@ -300,30 +300,30 @@ class TwView(MMGenObject,metaclass=AsyncInit): if cols >= min_cols and (min_lines is None or lines >= min_lines): if user_resized: msg_r(CUR_HOME + ERASE_ALL) - return _term_dimensions(cols,ts.height) + return _term_dimensions(cols, ts.height) if sys.stdout.isatty(): if self.cfg.columns and cols < min_cols: - die(1,'\n'+fmt(self.twidth_diemsg.format(self.cfg.columns,self.desc,min_cols),indent=' ')) + die(1, '\n'+fmt(self.twidth_diemsg.format(self.cfg.columns, self.desc, min_cols), indent=' ')) else: - m,dim = (self.twidth_errmsg,min_cols) if cols < min_cols else (self.theight_errmsg,min_lines) - get_char_raw( CUR_HOME + ERASE_ALL + fmt( m.format(self.desc,dim), append='' )) + m, dim = (self.twidth_errmsg, min_cols) if cols < min_cols else (self.theight_errmsg, min_lines) + get_char_raw(CUR_HOME + ERASE_ALL + fmt(m.format(self.desc, dim), append='')) user_resized = True else: - return _term_dimensions(min_cols,ts.height) + return _term_dimensions(min_cols, ts.height) - def compute_column_widths(self,widths,maxws,minws,maxws_nice,wide,interactive): + def compute_column_widths(self, widths, maxws, minws, maxws_nice, wide, interactive): def do_ret(freews): - widths.update({k:minws[k] + freews.get(k,0) for k in minws}) - widths.update({ikey: widths[key] - self.disp_prec - 1 for key,ikey in self.amt_keys.items()}) - return namedtuple('column_widths',widths.keys())(*widths.values()) + widths.update({k:minws[k] + freews.get(k, 0) for k in minws}) + widths.update({ikey: widths[key] - self.disp_prec - 1 for key, ikey in self.amt_keys.items()}) + return namedtuple('column_widths', widths.keys())(*widths.values()) def do_ret_max(): - widths.update({k:max(minws[k],maxws[k]) for k in minws}) - widths.update({ikey: widths[key] - self.disp_prec - 1 for key,ikey in self.amt_keys.items()}) - return namedtuple('column_widths',widths.keys())(*widths.values()) + widths.update({k:max(minws[k], maxws[k]) for k in minws}) + widths.update({ikey: widths[key] - self.disp_prec - 1 for key, ikey in self.amt_keys.items()}) + return namedtuple('column_widths', widths.keys())(*widths.values()) - def get_freews(cols,varws,varw,minw): + def get_freews(cols, varws, varw, minw): freew = cols - minw if freew and varw: x = freew / varw @@ -343,12 +343,12 @@ class TwView(MMGenObject,metaclass=AsyncInit): minw = sum(widths.values()) + sum(minws.values()) varw = sum(varws.values()) - self.min_term_width = 40 if wide else max(self.prompt_width,minw) if interactive else minw + self.min_term_width = 40 if wide else max(self.prompt_width, minw) if interactive else minw td = self.get_term_dimensions(self.min_term_width) self.term_height = td.height self.term_width = td.width - self.cols = min(self.term_width,minw + varw) + self.cols = min(self.term_width, minw + varw) if wide or self.cols == minw + varw: return do_ret_max() @@ -358,33 +358,33 @@ class TwView(MMGenObject,metaclass=AsyncInit): varws_hp = {k: maxws_nice[k] - minws[k] if k in maxws_nice else varws[k] for k in varws} varw_hp = sum(varws_hp.values()) widths_hp = get_freews( - min(self.term_width,minw + varw_hp), + min(self.term_width, minw + varw_hp), varws_hp, varw_hp, - minw ) + minw) # compute low-priority (nice) widths: varws_lp = {k: varws[k] - varws_hp[k] for k in maxws_nice if k in varws} widths_lp = get_freews( self.cols, varws_lp, sum(varws_lp.values()), - minw + sum(widths_hp.values()) ) + minw + sum(widths_hp.values())) # sum the two for each field: - return do_ret({k:widths_hp[k] + widths_lp.get(k,0) for k in varws}) + return do_ret({k:widths_hp[k] + widths_lp.get(k, 0) for k in varws}) else: - return do_ret(get_freews(self.cols,varws,varw,minw)) + return do_ret(get_freews(self.cols, varws, varw, minw)) - def gen_subheader(self,cw,color): + def gen_subheader(self, cw, color): return () - def gen_footer(self,color): - if hasattr(self,'total'): - yield 'TOTAL: {} {}'.format( self.total.hl(color=color), self.proto.dcoin ) + def gen_footer(self, color): + if hasattr(self, 'total'): + yield 'TOTAL: {} {}'.format(self.total.hl(color=color), self.proto.dcoin) - def set_amt_widths(self,data): - # width of amts column: min(7,width of integer part) + len('.') + width of fractional part + def set_amt_widths(self, data): + # width of amts column: min(7, width of integer part) + len('.') + width of fractional part self.amt_widths = { - k:min(7,max(len(str(getattr(d,k).to_integral_value())) for d in data)) + 1 + self.disp_prec + k:min(7, max(len(str(getattr(d, k).to_integral_value())) for d in data)) + 1 + self.disp_prec for k in self.amt_keys} async def format( @@ -399,57 +399,57 @@ class TwView(MMGenObject,metaclass=AsyncInit): def gen_hdr(): - Blue,Green = (blue,green) if color else (nocolor,nocolor) - Yes,No,All = (green('yes'),red('no'),yellow('all')) if color else ('yes','no','all') + Blue, Green = (blue, green) if color else (nocolor, nocolor) + Yes, No, All = (green('yes'), red('no'), yellow('all')) if color else ('yes', 'no', 'all') sort_info = ' '.join(self.sort_info()) def fmt_filter(k): - return '{}:{}'.format(k,{0:No,1:Yes,2:All}[getattr(self,k)]) + return '{}:{}'.format(k, {0:No, 1:Yes, 2:All}[getattr(self, k)]) yield '{} (sort order: {}){}'.format( self.hdr_lbl.upper(), Blue(sort_info), - ' ' * (self.cols - len(f'{self.hdr_lbl} (sort order: {sort_info})')) ) + ' ' * (self.cols - len(f'{self.hdr_lbl} (sort order: {sort_info})'))) if self.filters: yield 'Filters: {}{}'.format( - ' '.join(map(fmt_filter,self.filters)), - ' ' * len(self.filters) ) + ' '.join(map(fmt_filter, self.filters)), + ' ' * len(self.filters)) yield 'Network: {}'.format(Green( - self.proto.coin + ' ' + self.proto.chain_name.upper() )) + self.proto.coin + ' ' + self.proto.chain_name.upper())) yield 'Block {} [{}]'.format( self.rpc.blockcount.hl(color=color), - make_timestr(self.rpc.cur_date) ) + make_timestr(self.rpc.cur_date)) - if hasattr(self,'total'): - yield 'Total {}: {}'.format( self.proto.dcoin, self.total.hl(color=color) ) + if hasattr(self, 'total'): + yield 'Total {}: {}'.format(self.proto.dcoin, self.total.hl(color=color)) - yield from getattr(self,dt.subhdr_fmt_method)(cw,color) + yield from getattr(self, dt.subhdr_fmt_method)(cw, color) yield ' ' * self.term_width if data and dt.colhdr_fmt_method: - col_hdr = getattr(self,dt.colhdr_fmt_method)(cw,hdr_fs,color) + col_hdr = getattr(self, dt.colhdr_fmt_method)(cw, hdr_fs, color) yield col_hdr.rstrip() if line_processing == 'print' else col_hdr def get_body(method): if line_processing: - return getattr(self.line_processing,line_processing).do( - method,data,cw,fs,color,getattr(self,dt.line_fmt_method)) + return getattr(self.line_processing, line_processing).do( + method, data, cw, fs, color, getattr(self, dt.line_fmt_method)) else: - return method(data,cw,fs,color,getattr(self,dt.line_fmt_method)) + return method(data, cw, fs, color, getattr(self, dt.line_fmt_method)) if data and dt.need_column_widths: self.set_amt_widths(data) - cw = self.get_column_widths(data,wide=dt.detail,interactive=interactive) + cw = self.get_column_widths(data, wide=dt.detail, interactive=interactive) cwh = cw._asdict() fp = self.fs_params rfill = ' ' * (self.term_width - self.cols) if scroll else '' - hdr_fs = ''.join(fp[name].hdr_fs % ((),cwh[name])[fp[name].hdr_fs_repl] + hdr_fs = ''.join(fp[name].hdr_fs % ((), cwh[name])[fp[name].hdr_fs_repl] for name in dt.cols if cwh[name]) + rfill - fs = ''.join(fp[name].fs % ((),cwh[name])[fp[name].fs_repl] + fs = ''.join(fp[name].fs % ((), cwh[name])[fp[name].fs_repl] for name in dt.cols if cwh[name]) + rfill else: cw = hdr_fs = fs = None @@ -457,14 +457,14 @@ class TwView(MMGenObject,metaclass=AsyncInit): return ( tuple(gen_hdr()), tuple( - get_body(getattr(self,dt.fmt_method)) if data else - [(nocolor,yellow)[color](self.nodata_msg.ljust(self.term_width))] ) + get_body(getattr(self, dt.fmt_method)) if data else + [(nocolor, yellow)[color](self.nodata_msg.ljust(self.term_width))]) ) if not gv.stdout.isatty(): line_processing = 'print' - dt = getattr(self.display_type,display_type) + dt = getattr(self.display_type, display_type) if self.use_cached: self.use_cached = False @@ -482,7 +482,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): if data != dsave: self.pos = 0 - display_hdr,display_body = make_display() + display_hdr, display_body = make_display() if scroll: fixed_height = len(display_hdr) + self.prompt_height + 1 @@ -490,14 +490,14 @@ class TwView(MMGenObject,metaclass=AsyncInit): if self.term_height - fixed_height < self.min_scrollable_height: td = self.get_term_dimensions( self.min_term_width, - min_lines = self.min_scrollable_height + fixed_height ) + min_lines = self.min_scrollable_height + fixed_height) self.term_height = td.height self.term_width = td.width - display_hdr,display_body = make_display() + display_hdr, display_body = make_display() self.scrollable_height = self.term_height - fixed_height self.max_pos = max(0, len(display_body) - self.scrollable_height) - self.pos = min(self.pos,self.max_pos) + self.pos = min(self.pos, self.max_pos) if not dt.detail: self.display_hdr = display_hdr @@ -508,7 +508,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): bot = self.pos + self.scrollable_height fill = ('\n' + ''.ljust(self.term_width)) * (self.scrollable_height - len(display_body)) else: - top,bot,fill = (None,None,'') + top, bot, fill = (None, None, '') if interactive: footer = '' @@ -544,10 +544,10 @@ class TwView(MMGenObject,metaclass=AsyncInit): scroll = self.scroll = self.cfg.scroll key_mappings = make_key_mappings(scroll) - action_classes = { k: getattr(self,action_map[v[:2]])() for k,v in key_mappings.items() } - action_methods = { k: getattr(v,key_mappings[k]) for k,v in action_classes.items() } + action_classes = {k: getattr(self, action_map[v[:2]])() for k, v in key_mappings.items()} + action_methods = {k: getattr(v, key_mappings[k]) for k, v in action_classes.items()} prompt = self.prompt_fs.strip().format( - s='\nScrolling: k=up, j=down, b=pgup, f=pgdown, g=top, G=bottom' if scroll else '' ) + s='\nScrolling: k=up, j=down, b=pgup, f=pgdown, g=top, G=bottom' if scroll else '') self.prompt_width = max(len(l) for l in prompt.split('\n')) self.prompt_height = len(prompt.split('\n')) @@ -556,7 +556,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): clear_screen = '\n\n' if self.cfg.no_blank else CUR_HOME + ('' if scroll else ERASE_ALL) - from ..term import get_term,get_char,get_char_raw + from ..term import get_term, get_char, get_char_raw if scroll: self.term = get_term() @@ -574,16 +574,16 @@ class TwView(MMGenObject,metaclass=AsyncInit): reply = get_char( clear_screen - + await self.format('squeezed',interactive=True,scroll=scroll) + + await self.format('squeezed', interactive=True, scroll=scroll) + '\n\n' + (self.oneshot_msg + '\n\n' if self.oneshot_msg and not scroll else '') + prompt, - immed_chars = key_mappings ) + immed_chars = key_mappings) self.oneshot_msg = '' if reply in key_mappings: - ret = action_classes[reply].run(self,action_methods[reply]) + ret = action_classes[reply].run(self, action_methods[reply]) if type(ret).__name__ == 'coroutine': await ret elif reply == 'q': @@ -600,9 +600,9 @@ class TwView(MMGenObject,metaclass=AsyncInit): def blank_prompt(self): return CUR_HOME + CUR_DOWN(self.term_height - self.prompt_height) + ERASE_ALL - def keypress_confirm(self,*args,**kwargs): + def keypress_confirm(self, *args, **kwargs): from ..ui import keypress_confirm - if keypress_confirm( self.cfg, *args, no_nl=self.scroll, **kwargs ): + if keypress_confirm(self.cfg, *args, no_nl=self.scroll, **kwargs): return True else: if self.scroll: @@ -612,16 +612,16 @@ class TwView(MMGenObject,metaclass=AsyncInit): class action: @enable_echo - async def run(self,parent,action_method): + async def run(self, parent, action_method): return await action_method(parent) - async def a_print_detail(self,parent): - return await self._print(parent,output_type='detail') + async def a_print_detail(self, parent): + return await self._print(parent, output_type='detail') - async def a_print_squeezed(self,parent): - return await self._print(parent,output_type='squeezed') + async def a_print_squeezed(self, parent): + return await self._print(parent, output_type='squeezed') - async def _print(self,parent,output_type): + async def _print(self, parent, output_type): if not parent.disp_data: return None @@ -631,9 +631,9 @@ class TwView(MMGenObject,metaclass=AsyncInit): b = f'-{output_type}' if len(parent.print_output_types) > 1 else '', c = parent.proto.dcoin, d = ('' if parent.proto.network == 'mainnet' else '-'+parent.proto.network.upper()), - e = ','.join(parent.sort_info(include_group=False)).replace(' ','') ) + e = ','.join(parent.sort_info(include_group=False)).replace(' ', '')) - print_hdr = getattr(parent.display_type,output_type).print_header.format(parent.cols) + print_hdr = getattr(parent.display_type, output_type).print_header.format(parent.cols) msg_r(parent.blank_prompt if parent.scroll else '\n') @@ -646,28 +646,28 @@ class TwView(MMGenObject,metaclass=AsyncInit): data = print_hdr + await parent.format( display_type = output_type, line_processing = 'print', - color = False ), - desc = f'{parent.desc} listing' ) + color = False), + desc = f'{parent.desc} listing') except UserNonConfirmation: parent.oneshot_msg = yellow(f'File {outfile!r} not overwritten by user request') else: parent.oneshot_msg = green(f'Data written to {outfile!r}') - async def a_view(self,parent): + async def a_view(self, parent): from ..ui import do_pager parent.use_cached = True msg_r(CUR_HOME) - do_pager( await parent.format('squeezed',color=True) ) + do_pager(await parent.format('squeezed', color=True)) - async def a_view_detail(self,parent): + async def a_view_detail(self, parent): from ..ui import do_pager msg_r(CUR_HOME) - do_pager( await parent.format('detail',color=True) ) + do_pager(await parent.format('detail', color=True)) class item_action: @enable_echo - async def run(self,parent,action_method): + async def run(self, parent, action_method): if not parent.disp_data: return @@ -677,17 +677,17 @@ class TwView(MMGenObject,metaclass=AsyncInit): msg_r(parent.blank_prompt if parent.scroll else '\n') ret = line_input( parent.cfg, - f'Enter {parent.item_desc} number (or ENTER to return to main menu): ' ) + f'Enter {parent.item_desc} number (or ENTER to return to main menu): ') if ret == '': if parent.scroll: - msg_r( CUR_UP(1) + '\r' + ''.ljust(parent.term_width) ) + msg_r(CUR_UP(1) + '\r' + ''.ljust(parent.term_width)) return - idx = get_obj(MMGenIdx,n=ret,silent=True) + idx = get_obj(MMGenIdx, n=ret, silent=True) if not idx or idx < 1 or idx > len(parent.disp_data): msg_r( 'Choice must be a single number between 1 and {n}{s}'.format( n = len(parent.disp_data), - s = ' ' if parent.scroll else '' )) + s = ' ' if parent.scroll else '')) if parent.scroll: await asyncio.sleep(1.5) msg_r(CUR_UP(1) + '\r' + ERASE_ALL) @@ -697,7 +697,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): # None: action aborted by user or no action performed # False: an error occurred # 'redo': user will be re-prompted for item number - ret = await action_method(parent,idx) + ret = await action_method(parent, idx) if ret != 'redo': break await asyncio.sleep(0.5) @@ -706,22 +706,22 @@ class TwView(MMGenObject,metaclass=AsyncInit): # error messages could leave screen in messy state, so do complete redraw: msg_r( CUR_HOME + ERASE_ALL + - await parent.format(display_type='squeezed',interactive=True,scroll=True) ) + await parent.format(display_type='squeezed', interactive=True, scroll=True)) - async def i_balance_refresh(self,parent,idx): + async def i_balance_refresh(self, parent, idx): if not parent.keypress_confirm( - f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?' ): + f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?'): return 'redo' - await parent.twctl.get_balance( parent.disp_data[idx-1].addr, force_rpc=True ) + await parent.twctl.get_balance(parent.disp_data[idx-1].addr, force_rpc=True) await parent.get_data() parent.oneshot_msg = yellow(f'{parent.proto.dcoin} balance for account #{idx} refreshed') - async def i_addr_delete(self,parent,idx): + async def i_addr_delete(self, parent, idx): if not parent.keypress_confirm( 'Removing {} {} from tracking wallet. Is this what you want?'.format( - parent.item_desc, red(f'#{idx}') )): + parent.item_desc, red(f'#{idx}'))): return 'redo' - if await parent.twctl.remove_address( parent.disp_data[idx-1].addr ): + if await parent.twctl.remove_address(parent.disp_data[idx-1].addr): await parent.get_data() parent.oneshot_msg = yellow(f'{capfirst(parent.item_desc)} #{idx} removed') return True @@ -730,7 +730,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): parent.oneshot_msg = red('Address could not be removed') return False - async def i_comment_add(self,parent,idx): + async def i_comment_add(self, parent, idx): async def do_comment_add(comment): @@ -744,7 +744,7 @@ class TwView(MMGenObject,metaclass=AsyncInit): parent.oneshot_msg = (green if comment else yellow)('Label {a} {b}{c}'.format( a = 'for' if edited else 'added to' if comment else 'removed from', b = desc, - c = ' edited' if edited else '' )) + c = ' edited' if edited else '')) return True else: await asyncio.sleep(3) @@ -762,80 +762,79 @@ class TwView(MMGenObject,metaclass=AsyncInit): from ..ui import line_input res = line_input( parent.cfg, - 'Enter label text for {} {}: '.format(parent.item_desc,red(f'#{idx}')), - insert_txt = cur_comment ) + 'Enter label text for {} {}: '.format(parent.item_desc, red(f'#{idx}')), + insert_txt = cur_comment) if res == cur_comment: parent.oneshot_msg = yellow(f'Label for {desc} unchanged') return None elif res == '': - if not parent.keypress_confirm( - f'Removing label for {desc}. Is this what you want?' ): + if not parent.keypress_confirm(f'Removing label for {desc}. Is this what you want?'): return 'redo' return await do_comment_add(res) class scroll_action: - def run(self,parent,action_method): + def run(self, parent, action_method): self.use_cached = True return action_method(parent) - def m_cursor_up(self,parent): - parent.pos -= min( parent.pos - 0, 1 ) + def m_cursor_up(self, parent): + parent.pos -= min(parent.pos - 0, 1) - def m_cursor_down(self,parent): - parent.pos += min( parent.max_pos - parent.pos, 1 ) + def m_cursor_down(self, parent): + parent.pos += min(parent.max_pos - parent.pos, 1) - def m_pg_up(self,parent): - parent.pos -= min( parent.scrollable_height, parent.pos - 0 ) + def m_pg_up(self, parent): + parent.pos -= min(parent.scrollable_height, parent.pos - 0) - def m_pg_down(self,parent): - parent.pos += min( parent.scrollable_height, parent.max_pos - parent.pos ) + def m_pg_down(self, parent): + parent.pos += min(parent.scrollable_height, parent.max_pos - parent.pos) - def m_top(self,parent): + def m_top(self, parent): parent.pos = 0 - def m_bot(self,parent): + def m_bot(self, parent): parent.pos = parent.max_pos class sort_action: - def run(self,parent,action_method): + def run(self, parent, action_method): return action_method(parent) - def s_addr(self,parent): + def s_addr(self, parent): parent.do_sort('addr') - def s_age(self,parent): + def s_age(self, parent): parent.do_sort('age') - def s_amt(self,parent): + def s_amt(self, parent): parent.do_sort('amt') - def s_txid(self,parent): + def s_txid(self, parent): parent.do_sort('txid') - def s_twmmid(self,parent): + def s_twmmid(self, parent): parent.do_sort('twmmid') - def s_reverse(self,parent): + def s_reverse(self, parent): parent.data.reverse() parent.reverse = not parent.reverse class display_action: - def run(self,parent,action_method): + def run(self, parent, action_method): return action_method(parent) - def d_days(self,parent): + def d_days(self, parent): af = parent.age_fmts parent.age_fmt = af[(af.index(parent.age_fmt) + 1) % len(af)] if parent.update_widths_on_age_toggle: # TODO pass - def d_redraw(self,parent): + def d_redraw(self, parent): msg_r(CUR_HOME + ERASE_ALL) - def d_addr_view_pref(self,parent): + def d_addr_view_pref(self, parent): parent.addr_view_pref = (parent.addr_view_pref + 1) % len(parent.bch_addr_fmts)