mmgen.tw.common: new format(), header(), subheader() methods

This commit is contained in:
The MMGen Project 2022-11-09 13:05:09 +00:00
commit 5d3ed7d976
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
11 changed files with 129 additions and 93 deletions

View file

@ -280,6 +280,9 @@ class Int(int,Hilite,InitErrors):
def colorize(cls,n,**kwargs):
return super().colorize(repr(n),**kwargs)
class NonNegativeInt(Int):
min_val = 0
class MMGenIdx(Int):
min_val = 1

View file

@ -71,7 +71,7 @@ class Hilite:
assert trunc_ok, "If 'trunc_ok' is false, 'width' must be >= screen width of string"
s = truncate_str(s,width-add_len)
if s == '' and nullrepl:
s = nullrepl.ljust(width)
s = nullrepl
else:
s = a+s+b
if center:

View file

@ -224,7 +224,7 @@ class BitcoinTwTransaction(BitcoinTwCommon):
class BitcoinTwTxHistory(TwTxHistory,BitcoinTwCommon):
has_age = True
hdr_fmt = 'TRANSACTION HISTORY (sort order: {a})'
hdr_lbl = 'transaction history'
desc = 'transaction history'
item_desc = 'transaction'
no_data_errmsg = 'No transactions in tracking wallet!'

View file

@ -24,7 +24,7 @@ class BitcoinTwUnspentOutputs(TwUnspentOutputs):
has_age = True
can_group = True
hdr_fmt = 'UNSPENT OUTPUTS (sort order: {a}) Total {b}: {c}'
hdr_lbl = 'unspent outputs'
desc = 'unspent outputs'
item_desc = 'unspent output'
no_data_errmsg = 'No unspent outputs in tracking wallet!'

View file

@ -20,6 +20,7 @@
proto.eth.twuo: Ethereum tracking wallet unspent outputs class
"""
from ....globalvars import g
from ....tw.common import TwLabel
from ....tw.unspent import TwUnspentOutputs
@ -33,7 +34,7 @@ class EthereumTwUnspentOutputs(TwUnspentOutputs):
has_age = False
can_group = False
col_adj = 29
hdr_fmt = 'TRACKED ACCOUNTS (sort order: {a})\nTotal {b}: {c}'
hdr_lbl = 'tracked accounts'
desc = 'account balances'
item_desc = 'account'
dump_fn_pfx = 'balances'
@ -62,12 +63,13 @@ Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view,
wide_fs_fs = ' {{n:4}} {{a}} {{m}} {{A:{aw}}} {{l}}'
no_data_errmsg = 'No accounts in tracking wallet!'
async def __init__(self,proto,*args,**kwargs):
from ....globalvars import g
def subheader(self,color):
if g.cached_balances:
from ....color import yellow
self.hdr_fmt += '\n' + yellow('WARNING: Using cached balances. These may be out of date!')
await super().__init__(proto,*args,**kwargs)
from ....color import nocolor,yellow
return (nocolor,yellow)[color](
'WARNING: Using cached balances. These may be out of date!') + '\n'
else:
return ''
def do_sort(self,key=None,reverse=False):
if key == 'txid': return

View file

@ -26,6 +26,7 @@ from collections import namedtuple
from .common import *
from .base_obj import AsyncInit
from .obj import NonNegativeInt
from .objmethods import Hilite,InitErrors,MMGenObject
auth_data = namedtuple('rpc_auth_data',['user','passwd'])
@ -468,4 +469,6 @@ async def rpc_init(
RPC client chain: {rpc.chain}
""",indent=' ').rstrip() )
rpc.blockcount = NonNegativeInt(rpc.blockcount)
return rpc

View file

@ -112,10 +112,8 @@ class tool_cmd(tool_cmd_base):
if interactive:
await obj.view_and_sort()
return True
elif detail:
return await obj.format_detail( color=True )
else:
return await obj.format_squeezed()
return await obj.format('detail' if detail else 'squeezed')
async def twview(self,
pager: 'send output to pager' = False,

View file

@ -26,7 +26,7 @@ from collections import namedtuple
from ..globalvars import g
from ..objmethods import Hilite,InitErrors,MMGenObject
from ..obj import TwComment,get_obj,MMGenIdx,MMGenList
from ..color import nocolor,yellow,green
from ..color import nocolor,yellow,green,red,blue
from ..util import msg,msg_r,fmt,die,capfirst,make_timestr
from ..addr import MMGenID
@ -39,6 +39,8 @@ class TwCommon:
group = False
sort_key = 'age'
interactive = False
_display_data = {}
filters = ()
age_fmts = ('confs','block','days','date','date_time')
age_fmts_date_dependent = ('days','date','date_time')
@ -72,6 +74,22 @@ class TwCommon:
Please resize your screen to at least {} characters and hit any key:
"""
class display_type:
class squeezed:
detail = False
fmt_method = 'gen_squeezed_display'
need_column_widths = True
item_separator = '\n'
print_header = '[screen print truncated to width {}]\n'
class detail:
detail = True
fmt_method = 'gen_detail_display'
need_column_widths = True
item_separator = '\n'
print_header = ''
def age_disp(self,o,age_fmt):
if age_fmt == 'confs':
return o.confs
@ -237,53 +255,60 @@ class TwCommon:
else:
return do_ret(get_freews(self.cols,varws,varw,minw))
async def format_squeezed(self,color=True,cached=False):
def header(self,color):
Blue,Green = (blue,green) if color else (nocolor,nocolor)
yes,no = green('yes'),red('no') if color else ('yes','no')
def fmt_filter(k):
return '{}:{}'.format(k,yes if getattr(self,k) else no)
return '{h} (sort order: {s}){f}\nNetwork: {n}\nBlock {b} [{d}]\n{t}'.format(
h = self.hdr_lbl.upper(),
f = '\nFilters: '+' '.join(fmt_filter(k) for k in self.filters) if self.filters else '',
s = Blue(' '.join(self.sort_info())),
n = Green(self.proto.coin + ' ' + self.proto.chain_name.upper()),
b = self.rpc.blockcount.hl(color=color),
d = make_timestr(self.rpc.cur_date),
t = f'Total {self.proto.dcoin}: {self.total.hl(color=color)}\n' if hasattr(self,'total') else '',
)
def subheader(self,color):
return ''
def filter_data(self):
return self.data
async def format(self,display_type,color=True,cached=False,interactive=False):
if not cached:
data = self.data
if self.has_age and self.age_fmt in self.age_fmts_date_dependent:
await self.set_dates(data)
if not getattr(self,'column_widths',None):
self.set_column_params()
data = list(self.filter_data()) # method could be a generator
if self.group and (self.sort_key in ('addr','txid','twmmid')):
for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]:
for k in ('addr','txid','twmmid'):
if self.sort_key == k and getattr(a,k) == getattr(b,k):
b.skip = (k,'addr')[k=='twmmid']
if data:
self._format_squeezed_display_data = (
self.hdr_fmt.format(
a = ' '.join(self.sort_info()),
b = self.proto.dcoin,
c = self.total.hl() if hasattr(self,'total') else None )
+ '\nNetwork: {}'.format((nocolor,green)[color](
self.proto.coin + ' ' +
self.proto.chain_name.upper() ))
+ '\n' + '\n'.join(self.gen_squeezed_display(self.column_widths,color=color))
+ '\n'
dt = getattr(self.display_type,display_type)
cw = self.get_column_widths(data,wide=dt.detail) if dt.need_column_widths else None
if self.has_age and (self.age_fmt in self.age_fmts_date_dependent or dt.detail):
await self.set_dates(data)
self._display_data[display_type] = (
self.header(color) + self.subheader(color) + '\n'
+ (
dt.item_separator.join(getattr(self,dt.fmt_method)(data,cw,color=color)) + '\n'
if data else (nocolor,yellow)[color]('[no data for requested parameters]') + '\n'
)
)
return self._format_squeezed_display_data
return self._display_data[display_type] + ('' if interactive else self.footer(color))
async def format_detail(self,color):
if self.has_age:
await self.set_dates(self.data)
sep = self.detail_display_separator
return self.print_hdr_fs.format(
a = capfirst(self.desc),
b = self.rpc.blockcount,
c = make_timestr(self.rpc.cur_date),
d = 'Network: {}\n'.format((nocolor,green)[color](
self.proto.coin + ' ' +
self.proto.chain_name.upper() )),
e = ' '.join(self.sort_info(include_group=False)),
f = sep.join(self.gen_detail_display(color)),
g = self.proto.dcoin,
h = self.total.hl(color=color) if hasattr(self,'total') else None )
def footer(self,color):
return '\nTOTAL: {} {}\n'.format(
self.total.hl(color=color) if hasattr(self,'total') else None,
self.proto.dcoin
) if hasattr(self,'total') else ''
async def view_and_sort(self):
from ..opts import opt
@ -304,7 +329,7 @@ class TwCommon:
reply = get_char(
'' if self.no_output else (
clear_screen
+ await self.format_squeezed()
+ await self.format('squeezed',interactive=True)
+ '\n'
+ (self.oneshot_msg or '')
+ prompt
@ -324,7 +349,6 @@ class TwCommon:
self.do_sort(action[2:])
elif hasattr(self.item_action,action):
await self.item_action().run(self,action)
self.set_column_params()
elif action == 'a_quit':
msg('')
return self.data
@ -339,11 +363,11 @@ class TwCommon:
def d_days(self,parent):
af = parent.age_fmts_interactive
parent.age_fmt = af[(af.index(parent.age_fmt) + 1) % len(af)]
if parent.update_widths_on_age_toggle:
parent.set_column_params()
if parent.update_widths_on_age_toggle: # TODO
pass
def d_redraw(self,parent):
parent.set_column_params()
pass
def d_reverse(self,parent):
parent.data.reverse()
@ -365,14 +389,11 @@ class TwCommon:
msg('')
from ..fileutil import write_data_to_file
from ..exception import UserNonConfirmation
hdr = {
'squeezed': f'[screen print truncated to width {parent.cols}]\n',
'detail': '',
}[output_type]
hdr = getattr(parent.display_type,output_type).print_header.format(parent.cols)
try:
write_data_to_file(
outfile = outfile,
data = hdr + await getattr(parent,f'format_{output_type}')(color=False),
data = hdr + await parent.format(display_type=output_type,color=False),
desc = f'{parent.desc} listing' )
except UserNonConfirmation as e:
parent.oneshot_msg = yellow(f'File {outfile!r} not overwritten by user request\n\n')
@ -381,12 +402,12 @@ class TwCommon:
async def a_view(self,parent):
from ..ui import do_pager
do_pager( await parent.format_squeezed(color=True,cached=True) )
do_pager( await parent.format('squeezed',color=True,cached=True) )
self.post_view(parent)
async def a_view_detail(self,parent):
from ..ui import do_pager
do_pager( await parent.format_detail(color=True) )
do_pager( await parent.format('detail',color=True) )
self.post_view(parent)
def post_view(self,parent):

View file

@ -23,6 +23,12 @@ from .common import TwCommon
class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
class display_type(TwCommon.display_type):
class detail(TwCommon.display_type.detail):
need_column_widths = False
item_separator = '\n\n'
def __new__(cls,proto,*args,**kwargs):
return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw','txhistory'))
@ -30,11 +36,10 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
show_txid = False
show_unconfirmed = False
show_total_amt = False
print_hdr_fs = '{a} (block #{b}, {c} UTC)\n{d}Sort order: {e}\n{f}\n'
age_fmts_interactive = ('confs','block','days','date','date_time')
update_widths_on_age_toggle = True
detail_display_separator = '\n\n'
print_output_types = ('squeezed','detail')
filters = ('show_unconfirmed',)
async def __init__(self,proto,sinceblock=0):
self.proto = proto
@ -47,11 +52,7 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
return 'No transaction history {}found!'.format(
f'from block {self.sinceblock} ' if self.sinceblock else '')
def set_column_params(self):
data = self.data
show_txid = self.show_txid
for d in data:
d.skip = ''
def get_column_widths(self,data,wide=False):
# var cols: addr1 addr2 comment [txid]
if not hasattr(self,'varcol_maxwidths'):
@ -81,9 +82,9 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
'spc': 6 + self.show_txid, # 5(6) spaces between cols + 1 leading space in fs
}
self.column_widths = self.compute_column_widths(widths,maxws,minws,maxws_nice)
return self.compute_column_widths(widths,maxws,minws,maxws_nice,wide=wide)
def gen_squeezed_display(self,cw,color):
def gen_squeezed_display(self,data,cw,color):
if self.sinceblock:
yield f'Displaying transactions since block {self.sinceblock.hl(color=color)}'
@ -116,7 +117,7 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
l = 'Comment' ).rstrip()
n = 0
for d in self.data:
for d in data:
if d.confirmations > 0 or self.show_unconfirmed:
n += 1
yield fs.format(
@ -128,7 +129,7 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
a2 = d.vouts_disp( 'outputs', width=cw.addr2, color=color ),
l = d.comment.fmt( width=cw.comment, color=color ) ).rstrip()
def gen_detail_display(self,color):
def gen_detail_display(self,data,cw,color):
yield (
(f'Displaying transactions since block {self.sinceblock.hl(color=color)}\n'
@ -150,7 +151,7 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
""",strip_char='\t').strip()
n = 0
for d in self.data:
for d in data:
if d.confirmations > 0 or self.show_unconfirmed:
n += 1
yield fs.format(
@ -202,7 +203,6 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
def d_show_txid(self,parent):
parent.show_txid = not parent.show_txid
parent.set_column_params()
def d_show_unconfirmed(self,parent):
parent.show_unconfirmed = not parent.show_unconfirmed

View file

@ -39,13 +39,11 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
return MMGenObject.__new__(proto.base_proto_subclass(cls,'tw','unspent'))
txid_w = 64
print_hdr_fs = '{a} (block #{b}, {c} UTC)\n{d}Sort order: {e}\n{f}\n\nTotal {g}: {h}\n'
no_rpcdata_errmsg = f"""
no_rpcdata_errmsg = """
No spendable outputs found! Import addresses with balances into your
watch-only wallet using 'mmgen-addrimport' and then re-run this program.
"""
update_widths_on_age_toggle = False
detail_display_separator = '\n'
print_output_types = ('detail',)
class MMGenTwUnspentOutput(MMGenListItem):
@ -104,10 +102,23 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
self.proto,
**{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } )
def set_column_params(self):
def filter_data(self):
data = self.data
for i in data:
i.skip = ''
for d in data:
d.skip = ''
gkeys = {'addr':'addr','twmmid':'addr','txid':'txid'}
if self.group and self.sort_key in gkeys:
for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]:
for k in gkeys:
if self.sort_key == k and getattr(a,k) == getattr(b,k):
b.skip = gkeys[k]
return data
def get_column_widths(self,data,wide=False):
self.cols = self.get_term_columns(g.min_screen_width)
@ -122,14 +133,13 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
btaddr_w = addr_w - acct_w - 1
comment_w = acct_w - mmid_w - 1
tx_w = min(self.txid_w,self.cols-addr_w-29-col1_w) # min=6 TODO
txdots = ('','..')[tx_w < self.txid_w]
self.column_widths = namedtuple(
return namedtuple(
'column_widths',
['num','mmid','addr','btaddr','comment','tx']
)(col1_w, mmid_w, addr_w, btaddr_w, comment_w, tx_w)
def gen_squeezed_display(self,cw,color):
def gen_squeezed_display(self,data,cw,color):
fs = self.squeezed_fs_fs.format( cw=cw.num, tw=cw.tx )
hdr_fs = self.squeezed_hdr_fs_fs.format( cw=cw.num, tw=cw.tx )
yield hdr_fs.format(
@ -140,7 +150,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
A2 = f' Amt({self.proto.coin})'.ljust(self.disp_prec+4),
c = self.age_hdr ).rstrip()
for n,i in enumerate(self.data):
for n,i in enumerate(data):
addr_dots = '|' + '.'*(cw.addr-1)
mmid_disp = MMGenID.fmtc(
(
@ -177,9 +187,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
c = self.age_disp(i,self.age_fmt),
).rstrip()
def gen_detail_display(self,color):
data = self.data
def gen_detail_display(self,data,cw,color):
addr_w = max(len(i.addr) for i in data)
mmid_w = max(len(('',i.twmmid)[i.twmmid.type=='mmgen']) for i in data) or 12 # DEADBEEF:S:1

View file

@ -7,6 +7,7 @@ if overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA'):
rpc_init = rpc_init
async def rpc_init(*args,**kwargs):
from .obj import NonNegativeInt
ret = await overlay_fake_data.rpc_init(*args,**kwargs)
ret.blockcount = 1000000
ret.blockcount = NonNegativeInt(1000000)
return ret