tw/unspent.py: factor out shared from unspent-output-specific code

This commit is contained in:
The MMGen Project 2022-05-23 16:28:52 +00:00
commit 6d2a8913c9
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
2 changed files with 177 additions and 174 deletions

View file

@ -24,9 +24,9 @@ import time
from ..globalvars import g
from ..objmethods import Hilite,InitErrors,MMGenObject
from ..obj import TwComment,get_obj,MMGenIdx
from ..color import red,yellow
from ..util import msg,msg_r,die,line_input,do_pager,capfirst
from ..obj import TwComment,get_obj,MMGenIdx,MMGenList
from ..color import nocolor,red,yellow,green
from ..util import msg,msg_r,fmt,die,line_input,do_pager,capfirst,make_timestr
from ..addr import MMGenID
# mixin class for TwUnspentOutputs,TwAddrList:
@ -58,6 +58,22 @@ class TwCommon:
else:
return self.date_formatter[age_fmt](self.rpc,o.date)
async def get_data(self,sort_key=None,reverse_sort=False):
rpc_data = await self.get_rpc_data()
if not rpc_data:
die(0,fmt(self.no_rpcdata_errmsg).strip())
lbl_id = ('account','label')['label_api' in self.rpc.caps]
self.data = MMGenList(self.gen_data(rpc_data,lbl_id))
if not self.data:
die(1,self.no_data_errmsg)
self.do_sort(key=sort_key,reverse=reverse_sort)
@staticmethod
async def set_dates(rpc,us):
if us and us[0].date is None:
@ -112,6 +128,51 @@ class TwCommon:
assert type(reverse) == bool
self.data.sort(key=sort_funcs[key],reverse=reverse or self.reverse)
async def format_for_display(self):
data = self.data
if self.has_age and self.age_fmt in self.age_fmts_date_dependent:
await self.set_dates(self.rpc,data)
self.set_term_columns()
c = getattr(self,'display_constants',None)
if not c:
c = self.display_constants = self.get_display_constants()
if self.group and (self.sort_key in ('addr','txid','twmmid')):
for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]:
for k in ('addr','txid','twmmid'):
if self.sort_key == k and getattr(a,k) == getattr(b,k):
b.skip = (k,'addr')[k=='twmmid']
self.fmt_display = (
self.hdr_fmt.format(
a = ' '.join(self.sort_info()),
b = self.proto.dcoin,
c = self.total.hl() if hasattr(self,'total') else None )
+ ('\nChain: '+green(self.proto.chain_name.upper()) if self.proto.chain_name != 'mainnet' else '')
+ '\n' + '\n'.join(self.gen_display_output(c))
+ '\n'
)
return self.fmt_display
async def format_for_printing(self,color=False,show_confs=True):
if self.has_age:
await self.set_dates(self.rpc,self.data)
self.fmt_print = self.print_hdr_fs.format(
a = capfirst(self.desc),
b = self.rpc.blockcount,
c = make_timestr(self.rpc.cur_date),
d = ('' if self.proto.chain_name == 'mainnet' else
'Chain: {}\n'.format((nocolor,green)[color](self.proto.chain_name.upper())) ),
e = ' '.join(self.sort_info(include_group=False)),
f = '\n'.join(self.gen_print_output(color,show_confs)),
g = self.proto.dcoin,
h = self.total.hl(color=color) if hasattr(self,'total') else None )
return self.fmt_print
async def item_action_loop(self,action):
msg('')
while True:

View file

@ -24,21 +24,20 @@ import time
from collections import namedtuple
from ..globalvars import g
from ..color import red,yellow,green
from ..color import red,yellow
from ..util import (
msg,
die,
capfirst,
suf,
fmt,
make_timestr,
keypress_confirm,
line_input,
base_proto_tw_subclass
)
from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..obj import ImmutableAttr,ListItemAttr,MMGenListItem,TwComment,get_obj,HexStr,CoinTxID,MMGenIdx,MMGenList
from ..obj import ImmutableAttr,ListItemAttr,MMGenListItem,TwComment,get_obj,HexStr,CoinTxID,MMGenList
from ..addr import CoinAddr,MMGenID
from ..rpc import rpc_init
from .common import TwCommon,TwMMGenID,get_tw_label
@ -49,6 +48,11 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
return MMGenObject.__new__(base_proto_tw_subclass(cls,proto,'unspent'))
txid_w = 64
print_hdr_fs = '{a} (block #{b}, {c} UTC)\n{d}Sort order: {e}\n{f}\n\nTotal {g}: {h}\n'
no_rpcdata_errmsg = f"""
No spendable outputs found! Import addresses with balances into your
watch-only wallet using 'mmgen-addrimport' and then re-run this program.
"""
class MMGenTwUnspentOutput(MMGenListItem):
txid = ListItemAttr(CoinTxID)
@ -88,41 +92,22 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
def total(self):
return sum(i.amt for i in self.data)
async def get_data(self,sort_key=None,reverse_sort=False):
us_raw = await self.get_rpc_data()
if not us_raw:
die(0,fmt(f"""
No spendable outputs found! Import addresses with balances into your
watch-only wallet using '{g.proj_name.lower()}-addrimport' and then re-run this program.
""").strip())
lbl_id = ('account','label')['label_api' in self.rpc.caps]
def gen_unspent():
for o in us_raw:
if not lbl_id in o:
continue # coinbase outputs have no account field
l = get_tw_label(self.proto,o[lbl_id])
if l:
o.update({
'twmmid': l.mmid,
'label': l.comment or '',
'amt': self.proto.coin_amt(o['amount']),
'addr': CoinAddr(self.proto,o['address']),
'confs': o['confirmations']
})
yield self.MMGenTwUnspentOutput(
self.proto,
**{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } )
self.data = MMGenList(gen_unspent())
if not self.data:
die(1,self.no_data_errmsg)
self.do_sort(key=sort_key,reverse=reverse_sort)
def gen_data(self,rpc_data,lbl_id):
for o in rpc_data:
if not lbl_id in o:
continue # coinbase outputs have no account field
l = get_tw_label(self.proto,o[lbl_id])
if l:
o.update({
'twmmid': l.mmid,
'label': l.comment or '',
'amt': self.proto.coin_amt(o['amount']),
'addr': CoinAddr(self.proto,o['address']),
'confs': o['confirmations']
})
yield self.MMGenTwUnspentOutput(
self.proto,
**{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } )
def get_display_constants(self):
data = self.data
@ -145,149 +130,106 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
dc = namedtuple('display_constants',['col1_w','mmid_w','addr_w','btaddr_w','label_w','tx_w','txdots'])
return dc(col1_w,mmid_w,addr_w,btaddr_w,label_w,tx_w,txdots)
async def format_for_display(self):
data = self.data
if self.has_age and self.age_fmt in self.age_fmts_date_dependent:
await self.set_dates(self.rpc,data)
self.set_term_columns()
def gen_display_output(self,c):
fs = self.display_fs_fs.format( cw=c.col1_w, tw=c.tx_w )
hdr_fs = self.display_hdr_fs_fs.format( cw=c.col1_w, tw=c.tx_w )
yield hdr_fs.format(
n = 'Num',
t = 'TXid'.ljust(c.tx_w - 2) + ' Vout',
a = 'Address'.ljust(c.addr_w),
A = f'Amt({self.proto.dcoin})'.ljust(self.disp_prec+5),
A2 = f' Amt({self.proto.coin})'.ljust(self.disp_prec+4),
c = {
'confs': 'Confs',
'block': 'Block',
'days': 'Age(d)',
'date': 'Date',
'date_time': 'Date',
}[self.age_fmt],
).rstrip()
c = getattr(self,'display_constants',None)
if not c:
c = self.display_constants = self.get_display_constants()
for n,i in enumerate(self.data):
addr_dots = '|' + '.'*(c.addr_w-1)
mmid_disp = MMGenID.fmtc(
(
'.'*c.mmid_w if i.skip == 'addr' else
i.twmmid if i.twmmid.type == 'mmgen' else
f'Non-{g.proj_name}'
),
width = c.mmid_w,
color = True )
if self.group and (self.sort_key in ('addr','txid','twmmid')):
for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]:
for k in ('addr','txid','twmmid'):
if self.sort_key == k and getattr(a,k) == getattr(b,k):
b.skip = (k,'addr')[k=='twmmid']
if self.show_mmid:
addr_out = '{} {}{}'.format((
type(i.addr).fmtc(addr_dots,width=c.btaddr_w,color=True) if i.skip == 'addr' else
i.addr.fmt(width=c.btaddr_w,color=True)
),
mmid_disp,
(' ' + i.label.fmt(width=c.label_w,color=True)) if c.label_w > 0 else ''
)
else:
addr_out = (
type(i.addr).fmtc(addr_dots,width=c.addr_w,color=True) if i.skip=='addr' else
i.addr.fmt(width=c.addr_w,color=True) )
def gen_output():
yield self.hdr_fmt.format(
a = ' '.join(self.sort_info()),
b = self.proto.dcoin,
c = self.total.hl() if hasattr(self,'total') else None )
if self.proto.chain_name != 'mainnet':
yield 'Chain: '+green(self.proto.chain_name.upper())
fs = self.display_fs_fs.format( cw=c.col1_w, tw=c.tx_w )
hdr_fs = self.display_hdr_fs_fs.format( cw=c.col1_w, tw=c.tx_w )
yield hdr_fs.format(
n = 'Num',
t = 'TXid'.ljust(c.tx_w - 2) + ' Vout',
a = 'Address'.ljust(c.addr_w),
A = f'Amt({self.proto.dcoin})'.ljust(self.disp_prec+5),
A2 = f' Amt({self.proto.coin})'.ljust(self.disp_prec+4),
c = {
'confs': 'Confs',
'block': 'Block',
'days': 'Age(d)',
'date': 'Date',
'date_time': 'Date',
}[self.age_fmt],
yield fs.format(
n = str(n+1)+')',
t = (
'' if not i.txid else
' ' * (c.tx_w-4) + '|...' if i.skip == 'txid' else
i.txid[:c.tx_w-len(c.txdots)] + c.txdots ),
v = i.vout,
a = addr_out,
A = i.amt.fmt(color=True,prec=self.disp_prec),
A2 = (i.amt2.fmt(color=True,prec=self.disp_prec) if i.amt2 is not None else ''),
c = self.age_disp(i,self.age_fmt),
).rstrip()
for n,i in enumerate(data):
addr_dots = '|' + '.'*(c.addr_w-1)
mmid_disp = MMGenID.fmtc(
(
'.'*c.mmid_w if i.skip == 'addr' else
i.twmmid if i.twmmid.type == 'mmgen' else
f'Non-{g.proj_name}'
),
width = c.mmid_w,
color = True )
if self.show_mmid:
addr_out = '{} {}{}'.format((
type(i.addr).fmtc(addr_dots,width=c.btaddr_w,color=True) if i.skip == 'addr' else
i.addr.fmt(width=c.btaddr_w,color=True)
),
mmid_disp,
(' ' + i.label.fmt(width=c.label_w,color=True)) if c.label_w > 0 else ''
)
else:
addr_out = (
type(i.addr).fmtc(addr_dots,width=c.addr_w,color=True) if i.skip=='addr' else
i.addr.fmt(width=c.addr_w,color=True) )
yield fs.format(
n = str(n+1)+')',
t = (
'' if not i.txid else
' ' * (c.tx_w-4) + '|...' if i.skip == 'txid' else
i.txid[:c.tx_w-len(c.txdots)] + c.txdots ),
v = i.vout,
a = addr_out,
A = i.amt.fmt(color=True,prec=self.disp_prec),
A2 = (i.amt2.fmt(color=True,prec=self.disp_prec) if i.amt2 is not None else ''),
c = self.age_disp(i,self.age_fmt),
).rstrip()
self.fmt_display = '\n'.join(gen_output()) + '\n'
return self.fmt_display
async def format_for_printing(self,color=False,show_confs=True):
if self.has_age:
await self.set_dates(self.rpc,self.data)
def gen_print_output(self,color,show_confs):
addr_w = max(len(i.addr) for i in self.data)
mmid_w = max(len(('',i.twmmid)[i.twmmid.type=='mmgen']) for i in self.data) or 12 # DEADBEEF:S:1
fs = self.print_fs_fs.format(
tw = self.txid_w + 3,
cf = '{c:<8} ' if show_confs else '',
aw = self.proto.coin_amt.max_prec + 5 )
yield fs.format(
n = 'Num',
t = 'Tx ID,Vout',
a = 'Address'.ljust(addr_w),
m = 'MMGen ID'.ljust(mmid_w),
A = f'Amount({self.proto.dcoin})',
A2 = f'Amount({self.proto.coin})',
c = 'Confs', # skipped for eth
b = 'Block', # skipped for eth
D = 'Date',
l = 'Label' )
def gen_output():
max_lbl_len = max([len(i.label) for i in self.data if i.label] or [2])
for n,i in enumerate(self.data):
yield fs.format(
n = 'Num',
t = 'Tx ID,Vout',
a = 'Address'.ljust(addr_w),
m = 'MMGen ID'.ljust(mmid_w),
A = f'Amount({self.proto.dcoin})',
A2 = f'Amount({self.proto.coin})',
c = 'Confs', # skipped for eth
b = 'Block', # skipped for eth
D = 'Date',
l = 'Label' )
max_lbl_len = max([len(i.label) for i in self.data if i.label] or [2])
for n,i in enumerate(self.data):
yield fs.format(
n = str(n+1) + ')',
t = '{},{}'.format(
('|'+'.'*63 if i.skip == 'txid' and self.group else i.txid),
i.vout ),
a = (
'|'+'.' * addr_w if i.skip == 'addr' and self.group else
i.addr.fmt(color=color,width=addr_w) ),
m = MMGenID.fmtc(
(i.twmmid if i.twmmid.type == 'mmgen' else f'Non-{g.proj_name}'),
width = mmid_w,
color = color ),
A = i.amt.fmt(color=color),
A2 = ( i.amt2.fmt(color=color) if i.amt2 is not None else '' ),
c = i.confs,
b = self.rpc.blockcount - (i.confs - 1),
D = self.age_disp(i,'date_time'),
l = i.label.hl(color=color) if i.label else
TwComment.fmtc(
s = '',
color = color,
nullrepl = '-',
width = max_lbl_len )
).rstrip()
fs2 = '{} (block #{}, {} UTC)\n{}Sort order: {}\n{}\n\nTotal {}: {}\n'
self.fmt_print = fs2.format(
capfirst(self.desc),
self.rpc.blockcount,
make_timestr(self.rpc.cur_date),
('' if self.proto.chain_name == 'mainnet' else
'Chain: {}\n'.format(green(self.proto.chain_name.upper())) ),
' '.join(self.sort_info(include_group=False)),
'\n'.join(gen_output()),
self.proto.dcoin,
self.total.hl(color=color) )
return self.fmt_print
n = str(n+1) + ')',
t = '{},{}'.format(
('|'+'.'*63 if i.skip == 'txid' and self.group else i.txid),
i.vout ),
a = (
'|'+'.' * addr_w if i.skip == 'addr' and self.group else
i.addr.fmt(color=color,width=addr_w) ),
m = MMGenID.fmtc(
(i.twmmid if i.twmmid.type == 'mmgen' else f'Non-{g.proj_name}'),
width = mmid_w,
color = color ),
A = i.amt.fmt(color=color),
A2 = ( i.amt2.fmt(color=color) if i.amt2 is not None else '' ),
c = i.confs,
b = self.rpc.blockcount - (i.confs - 1),
D = self.age_disp(i,'date_time'),
l = i.label.hl(color=color) if i.label else
TwComment.fmtc(
s = '',
color = color,
nullrepl = '-',
width = max_lbl_len )
).rstrip()
def display_total(self):
msg('\nTotal unspent: {} {} ({} output{})'.format(