mmgen.tw.{ctl,common,txhistory,unspent}: cleanups

This commit is contained in:
The MMGen Project 2022-11-09 13:05:09 +00:00
commit b21864fd08
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
19 changed files with 247 additions and 220 deletions

View file

@ -97,4 +97,8 @@ class TwAddrData(AddrData,metaclass=AsyncInit):
vmsg(f'{i} {g.proj_name} addresses found, {len(twd)} accounts total')
for al_id in out:
self.add(AddrList(self.proto,al_id=al_id,adata=AddrListData(sorted(out[al_id],key=lambda a: a.idx))))
self.add(AddrList(
self.proto,
al_id = al_id,
adata = AddrListData(sorted( out[al_id], key=lambda a: a.idx ))
))

View file

@ -46,8 +46,8 @@ def get_obj(objname,*args,**kwargs):
ret = objname(**kwargs)
except Exception as e:
if not silent:
from .util import msg
msg(f'{e!s}')
from .util import rmsg
rmsg(f'{e!s}')
return False
else:
return True if return_bool else ret

View file

@ -22,13 +22,14 @@ class BitcoinTwGetBalance(TwGetBalance):
async def create_data(self):
# 0: unconfirmed, 1: below minconf, 2: confirmed, 3: spendable (privkey in wallet)
lbl_id = ('account','label')['label_api' in self.rpc.caps]
amt0 = self.proto.coin_amt('0')
for d in await self.rpc.call('listunspent',0):
lbl = get_tw_label(self.proto,d[lbl_id])
if lbl:
if lbl.mmid.type == 'mmgen':
key = lbl.mmid.obj.sid
if key not in self.data:
self.data[key] = [self.proto.coin_amt('0')] * 4
self.data[key] = [amt0] * 4
else:
key = 'Non-MMGen'
else:

View file

@ -9,7 +9,7 @@
# https://gitlab.com/mmgen/mmgen
"""
proto.btc.tw: Bitcoin base protocol tracking wallet dependency classes
proto.btc.tw.common: Bitcoin base protocol tracking wallet dependency classes
"""
from ....addr import CoinAddr
@ -26,7 +26,7 @@ class BitcoinTwCommon:
"""
def check_dup_mmid(acct_labels):
mmid_prev,err = None,False
for mmid in sorted(a.mmid for a in acct_labels if a):
for mmid in sorted(label.mmid for label in acct_labels if label):
if mmid == mmid_prev:
err = True
msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n')
@ -72,6 +72,7 @@ class BitcoinTwCommon:
"""
data = {}
lbl_id = ('account','label')['label_api' in self.rpc.caps]
amt0 = self.proto.coin_amt('0')
for d in await self.rpc.call('listunspent',0):
@ -99,7 +100,7 @@ class BitcoinTwCommon:
lm.vout = d['vout']
lm.date = None
data[lm] = {
'amt': self.proto.coin_amt('0'),
'amt': amt0,
'lbl': label,
'addr': CoinAddr(self.proto,d['address']) }
amt = self.proto.coin_amt(d['amount'])

View file

@ -40,10 +40,11 @@ class BitcoinTrackingWallet(TrackingWallet):
raise NotImplementedError(f'address removal not implemented for coin {self.proto.coin}')
@write_mode
async def set_comment(self,coinaddr,lbl):
async def set_label(self,coinaddr,lbl):
args = self.rpc.daemon.set_comment_args( self.rpc, coinaddr, lbl )
try:
return await self.rpc.call(*args)
await self.rpc.call(*args)
return True
except Exception as e:
rmsg(e.args[0])
return False
@ -91,7 +92,7 @@ class BitcoinTrackingWallet(TrackingWallet):
@write_mode
async def rescan_address(self,addrspec):
res = await self.resolve_address(addrspec,None)
res = await self.resolve_address(addrspec)
if not res:
return False
return await self.rescan_addresses([res.coinaddr])

View file

@ -33,11 +33,12 @@ class EthereumTwGetBalance(TwGetBalance):
async def create_data(self):
data = self.wallet.mmid_ordered_dict
amt0 = self.proto.coin_amt('0')
for d in data:
if d.type == 'mmgen':
key = d.obj.sid
if key not in self.data:
self.data[key] = [self.proto.coin_amt('0')] * 4
self.data[key] = [amt0] * 4
else:
key = 'Non-MMGen'

View file

@ -127,12 +127,12 @@ class EthereumTrackingWallet(TrackingWallet):
return None
@write_mode
async def set_comment(self,coinaddr,lbl):
async def set_label(self,coinaddr,lbl):
for addr,d in list(self.data_root.items()):
if addr == coinaddr:
d['comment'] = lbl.comment
self.write()
return None
return True
else:
msg(f'Address {coinaddr!r} not found in {self.data_root_desc!r} section of tracking wallet')
return False
@ -156,6 +156,19 @@ class EthereumTrackingWallet(TrackingWallet):
return self.data['tokens'][token]['params'].get(param)
return None
@property
def sorted_list(self):
return sorted(
[ { 'addr':x[0],
'mmid':x[1]['mmid'],
'comment':x[1]['comment'] }
for x in self.data_root.items() if x[0] not in ('params','coin') ],
key=lambda x: x['mmid'].sort_key+x['addr'] )
@property
def mmid_ordered_dict(self):
return dict((x['mmid'],{'addr':x['addr'],'comment':x['comment']}) for x in self.sorted_list)
class EthereumTokenTrackingWallet(EthereumTrackingWallet):
desc = 'Ethereum token tracking wallet'

View file

@ -42,7 +42,7 @@ class EthereumTwUnspentOutputs(TwUnspentOutputs):
Sort options: [a]mount, a[d]dress, [r]everse, [M]mgen addr
Display options: show [m]mgen addr, r[e]draw screen
Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view,
add [l]abel, [D]elete address, [R]efresh balance:
[D]elete address, add [l]abel, [R]efresh balance:
"""
key_mappings = {
'a':'s_amt',

View file

@ -110,7 +110,7 @@ class tool_cmd(tool_cmd_base):
await obj.get_data(sort_key=sort,reverse_sort=reverse)
if interactive:
await obj.view_and_sort()
await obj.view_filter_and_sort()
return True
else:
return await obj.format('detail' if detail else 'squeezed')
@ -151,8 +151,7 @@ class tool_cmd(tool_cmd_base):
async def add_label(self,mmgen_or_coin_addr:str,label:str):
"add descriptive label for address in tracking wallet"
from ..tw.ctl import TrackingWallet
await (await TrackingWallet(self.proto,mode='w')).add_comment( mmgen_or_coin_addr, label, on_fail='raise' )
return True
return await (await TrackingWallet(self.proto,mode='w')).set_comment(mmgen_or_coin_addr,label)
async def remove_label(self,mmgen_or_coin_addr:str):
"remove descriptive label for address in tracking wallet"
@ -176,7 +175,7 @@ class tool_cmd(tool_cmd_base):
if ret:
from ..util import Msg
from ..addr import is_coin_addr
return ret.mmaddr if is_coin_addr(self.proto,mmgen_or_coin_addr) else ret.coinaddr
return ret.twmmid if is_coin_addr(self.proto,mmgen_or_coin_addr) else ret.coinaddr
else:
return False

View file

@ -92,9 +92,9 @@ class TwCommon:
def age_disp(self,o,age_fmt):
if age_fmt == 'confs':
return o.confs
return o.confs or '-'
elif age_fmt == 'block':
return self.rpc.blockcount - (o.confs - 1)
return self.rpc.blockcount + 1 - o.confs if o.confs else '-'
else:
return self.date_formatter[age_fmt](self.rpc,o.date)
@ -109,21 +109,13 @@ class TwCommon:
res = self.gen_data(rpc_data,lbl_id)
self.data = MMGenList(await res if type(res).__name__ == 'coroutine' else res)
self.disp_data = list(self.filter_data())
if not self.data:
die(1,self.no_data_errmsg)
self.do_sort(key=sort_key,reverse=reverse_sort)
async def set_dates(self,us):
if not self.dates_set:
# 'blocktime' differs from 'time', is same as getblockheader['time']
dates = [ o.get('blocktime',0)
for o in await self.rpc.gathered_icall('gettransaction',[(o.txid,True,False) for o in us]) ]
for idx,o in enumerate(us):
o.date = dates[idx]
self.dates_set = True
@property
def age_w(self):
return self.age_col_params[self.age_fmt][0]
@ -258,10 +250,10 @@ class TwCommon:
def header(self,color):
Blue,Green = (blue,green) if color else (nocolor,nocolor)
yes,no = green('yes'),red('no') if color else ('yes','no')
Yes,No = (green('yes'),red('no')) if color else ('yes','no')
def fmt_filter(k):
return '{}:{}'.format(k,yes if getattr(self,k) else no)
return '{}:{}'.format(k,{0:No,1:Yes}[getattr(self,k)])
return '{h} (sort order: {s}){f}\nNetwork: {n}\nBlock {b} [{d}]\n{t}'.format(
h = self.hdr_lbl.upper(),
@ -277,29 +269,26 @@ class TwCommon:
return ''
def filter_data(self):
return self.data
return self.data.copy()
async def format(self,display_type,color=True,cached=False,interactive=False):
if not cached:
data = list(self.filter_data()) # method could be a generator
dt = getattr(self.display_type,display_type)
if data:
if self.has_age and (self.age_fmt in self.age_fmts_date_dependent or dt.detail):
await self.set_dates(self.data)
dt = getattr(self.display_type,display_type)
data = self.disp_data = list(self.filter_data()) # method could be a generator
cw = self.get_column_widths(data,wide=dt.detail) if dt.need_column_widths else None
cw = self.get_column_widths(data,wide=dt.detail) if data and dt.need_column_widths else None
if self.has_age and (self.age_fmt in self.age_fmts_date_dependent or dt.detail):
await self.set_dates(data)
self._display_data[display_type] = (
self.header(color) + self.subheader(color) + '\n'
+ (
dt.item_separator.join(getattr(self,dt.fmt_method)(data,cw,color=color)) + '\n'
if data else (nocolor,yellow)[color]('[no data for requested parameters]') + '\n'
)
self._display_data[display_type] = '{a}{b}\n{c}\n'.format(
a = self.header(color),
b = self.subheader(color),
c = dt.item_separator.join(getattr(self,dt.fmt_method)(data,cw,color=color))
if data else (nocolor,yellow)[color]('[no data for requested parameters]')
)
return self._display_data[display_type] + ('' if interactive else self.footer(color))
@ -310,7 +299,7 @@ class TwCommon:
self.proto.dcoin
) if hasattr(self,'total') else ''
async def view_and_sort(self):
async def view_filter_and_sort(self):
from ..opts import opt
from ..term import get_char
prompt = self.prompt.strip() + '\b'
@ -351,7 +340,7 @@ class TwCommon:
await self.item_action().run(self,action)
elif action == 'a_quit':
msg('')
return self.data
return self.disp_data
class action:
@ -421,15 +410,75 @@ class TwCommon:
msg('')
from ..ui import line_input
while True:
ret = line_input(f'Enter {parent.item_desc} number (or RETURN to return to main menu): ')
ret = line_input(f'Enter {parent.item_desc} number (or ENTER to return to main menu): ')
if ret == '':
return None
idx = get_obj(MMGenIdx,n=ret,silent=True)
if not idx or idx < 1 or idx > len(parent.data):
msg(f'Choice must be a single number between 1 and {len(parent.data)}')
if not idx or idx < 1 or idx > len(parent.disp_data):
msg(f'Choice must be a single number between 1 and {len(parent.disp_data)}')
elif (await getattr(self,action)(parent,idx)) != 'redo':
break
async def a_balance_refresh(self,parent,idx):
from ..ui import keypress_confirm
if not keypress_confirm(
f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?'):
return 'redo'
await parent.wallet.get_balance( parent.disp_data[idx-1].addr, force_rpc=True )
await parent.get_data()
parent.oneshot_msg = yellow(f'{parent.proto.dcoin} balance for account #{idx} refreshed\n\n')
async def a_addr_delete(self,parent,idx):
from ..ui import keypress_confirm
if not keypress_confirm(
'Removing {} {} from tracking wallet. Is this what you want?'.format(
parent.item_desc, red(f'#{idx}') )):
return 'redo'
if await parent.wallet.remove_address( parent.disp_data[idx-1].addr ):
await parent.get_data()
parent.oneshot_msg = yellow(f'{capfirst(parent.item_desc)} #{idx} removed\n\n')
else:
await asyncio.sleep(3)
parent.oneshot_msg = red('Address could not be removed\n\n')
async def a_comment_add(self,parent,idx):
async def do_comment_add(comment):
if await parent.wallet.set_comment( entry.twmmid, comment, entry.addr ):
await parent.get_data()
parent.oneshot_msg = yellow('Label {a} {b}{c}\n\n'.format(
a = 'for' if cur_comment and comment else 'added to' if comment else 'removed from',
b = desc,
c = ' edited' if cur_comment and comment else '' ))
return True
else:
await asyncio.sleep(3)
parent.oneshot_msg = red('Label for {desc} could not be {action}\n\n'.format(
desc = desc,
action = 'edited' if cur_comment and comment else 'added' if comment else 'removed'
))
return False
entry = parent.disp_data[idx-1]
desc = f'{parent.item_desc} #{idx}'
cur_comment = parent.disp_data[idx-1].comment
msg('Current label: {}'.format(cur_comment.hl() if cur_comment else '(none)'))
from ..ui import line_input
res = line_input(
'Enter label text for {} {}: '.format(parent.item_desc,red(f'#{idx}')),
insert_txt = cur_comment )
if res == cur_comment:
parent.oneshot_msg = green(f'Label for {desc} unchanged\n\n')
return None
elif res == '':
from ..ui import keypress_confirm
if not keypress_confirm(f'Removing label for {desc}. Is this what you want?'):
return None
return await do_comment_add(res)
class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
color = 'orange'
width = 0
@ -437,22 +486,24 @@ class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
def __new__(cls,proto,id_str):
if type(id_str) == cls:
return id_str
ret = None
try:
ret = MMGenID(proto,id_str)
sort_key,idtype = ret.sort_key,'mmgen'
ret = addr = disp = MMGenID(proto,id_str)
sort_key,idtype = (ret.sort_key,'mmgen')
except Exception as e:
try:
assert id_str.split(':',1)[0] == proto.base_coin.lower(),(
coin,addr = id_str.split(':',1)
assert coin == proto.base_coin.lower(),(
f'not a string beginning with the prefix {proto.base_coin.lower()!r}:' )
assert id_str.isascii() and id_str[4:].isalnum(), 'not an ASCII alphanumeric string'
assert len(id_str) > 4,'not more that four characters long'
ret,sort_key,idtype = str(id_str),'z_'+id_str,'non-mmgen'
assert addr.isascii() and addr.isalnum(), 'not an ASCII alphanumeric string'
ret,sort_key,idtype,disp = (id_str,'z_'+id_str,'non-mmgen','non-MMGen')
addr = proto.coin_addr(addr)
except Exception as e2:
return cls.init_fail(e,id_str,e2=e2)
me = str.__new__(cls,ret)
me.obj = ret
me.disp = disp
me.addr = addr
me.sort_key = sort_key
me.type = idtype
me.proto = proto

View file

@ -38,6 +38,8 @@ from ..addr import CoinAddr,is_mmgen_id,is_coin_addr
from ..rpc import rpc_init
from .common import TwMMGenID,TwLabel
addr_info = namedtuple('addr_info',['twmmid','coinaddr'])
# decorator for TrackingWallet
def write_mode(orig_func):
def f(self,*args,**kwargs):
@ -179,19 +181,6 @@ class TrackingWallet(MMGenObject,metaclass=AsyncInit):
self.cache_balance(addr,ret,self.cur_balances,self.data_root)
return ret
@property
def sorted_list(self):
return sorted(
[ { 'addr':x[0],
'mmid':x[1]['mmid'],
'comment':x[1]['comment'] }
for x in self.data_root.items() if x[0] not in ('params','coin') ],
key=lambda x: x['mmid'].sort_key+x['addr'] )
@property
def mmid_ordered_dict(self):
return dict((x['mmid'],{'addr':x['addr'],'comment':x['comment']}) for x in self.sorted_list)
def force_write(self):
mode_save = self.mode
self.mode = 'w'
@ -261,45 +250,49 @@ class TrackingWallet(MMGenObject,metaclass=AsyncInit):
if not mmaddr:
mmaddr = f'{self.proto.base_coin.lower()}:{coinaddr}'
return namedtuple('addr_info',['mmaddr','coinaddr'])(
TwMMGenID(self.proto,mmaddr),
coinaddr )
return addr_info( TwMMGenID(self.proto,mmaddr), coinaddr )
# returns on failure
@write_mode
async def add_comment(self,addrspec,comment='',coinaddr=None,silent=False,on_fail='return'):
assert on_fail in ('return','raise'), 'add_comment_chk1'
async def set_comment(self,addrspec,comment='',trusted_coinaddr=None,silent=False):
res = (
addr_info(addrspec,trusted_coinaddr) if trusted_coinaddr
else await self.resolve_address(addrspec) )
res = await self.resolve_address(addrspec,coinaddr)
if not res:
return False
cmt = TwComment(comment) if on_fail=='raise' else get_obj(TwComment,s=comment)
if cmt in (False,None):
comment = get_obj(TwComment,s=comment)
if comment == False:
return False
lbl_txt = res.mmaddr + (' ' + cmt if cmt else '')
lbl = (
TwLabel(self.proto,lbl_txt) if on_fail == 'raise' else
get_obj(TwLabel,proto=self.proto,text=lbl_txt) )
lbl = get_obj(
TwLabel,
proto = self.proto,
text = res.twmmid + (' ' + comment if comment else ''))
if await self.set_comment(res.coinaddr,lbl) == False:
if not silent:
msg( 'Label could not be {}'.format('added' if comment else 'removed') )
if lbl == False:
return False
else:
if await self.set_label(res.coinaddr,lbl):
desc = '{} address {} in tracking wallet'.format(
res.mmaddr.type.replace('mmgen','MMGen'),
res.mmaddr.replace(self.proto.base_coin.lower()+':','') )
res.twmmid.type.replace('mmgen','MMGen'),
res.twmmid.addr.hl() )
if comment:
msg(f'Added label {comment!r} to {desc}')
msg('Added label {} to {}'.format(comment.hl(encl="''"),desc))
else:
msg(f'Removed label from {desc}')
return True
else:
if not silent:
msg( 'Label could not be {}'.format('added' if comment else 'removed') )
return False
@write_mode
async def remove_comment(self,mmaddr):
await self.add_comment(mmaddr,'')
await self.set_comment(mmaddr,'')
async def import_address_common(self,data,batch=False,gather=False):

View file

@ -43,7 +43,6 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
async def __init__(self,proto,sinceblock=0):
self.proto = proto
self.data = MMGenList()
self.rpc = await rpc_init(proto)
self.sinceblock = Int( sinceblock if sinceblock >= 0 else self.rpc.blockcount + sinceblock )
@ -52,6 +51,9 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
return 'No transaction history {}found!'.format(
f'from block {self.sinceblock} ' if self.sinceblock else '')
def filter_data(self):
return (d for d in self.data if d.confirmations > 0 or self.show_unconfirmed)
def get_column_widths(self,data,wide=False):
# var cols: addr1 addr2 comment [txid]
@ -116,18 +118,15 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
a2 = 'Outputs',
l = 'Comment' ).rstrip()
n = 0
for d in data:
if d.confirmations > 0 or self.show_unconfirmed:
n += 1
yield fs.format(
n = str(n) + ')',
i = d.txid_disp( width=cw.txid, color=color ) if hasattr(cw,'txid') else None,
d = d.age_disp( self.age_fmt, width=self.age_w, color=color ),
a1 = d.vouts_disp( 'inputs', width=cw.addr1, color=color ),
A = d.amt_disp(self.show_total_amt).fmt( prec=self.disp_prec, color=color ),
a2 = d.vouts_disp( 'outputs', width=cw.addr2, color=color ),
l = d.comment.fmt( width=cw.comment, color=color ) ).rstrip()
for n,d in enumerate(data,1):
yield fs.format(
n = str(n) + ')',
i = d.txid_disp( width=cw.txid, color=color ) if hasattr(cw,'txid') else None,
d = d.age_disp( self.age_fmt, width=self.age_w, color=color ),
a1 = d.vouts_disp( 'inputs', width=cw.addr1, color=color ),
A = d.amt_disp(self.show_total_amt).fmt( prec=self.disp_prec, color=color ),
a2 = d.vouts_disp( 'outputs', width=cw.addr2, color=color ),
l = d.comment.fmt( width=cw.comment, color=color ) ).rstrip()
def gen_detail_display(self,data,cw,color):
@ -150,23 +149,20 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
{a2}
""",strip_char='\t').strip()
n = 0
for d in data:
if d.confirmations > 0 or self.show_unconfirmed:
n += 1
yield fs.format(
n = str(n) + ')',
d = d.age_disp( 'date_time', width=None, color=None ),
b = d.blockheight_disp(color=color),
D = d.txdate_disp( 'date_time' ),
i = d.txid_disp( width=None, color=color ),
A1 = d.amt_disp(True).hl( color=color ),
A2 = d.amt_disp(False).hl( color=color ),
f = d.fee_disp( color=color ),
a1 = d.vouts_list_disp( 'inputs', color=color, indent=' '*8 ),
oc = d.nOutputs,
a2 = d.vouts_list_disp( 'outputs', color=color, indent=' '*8 ),
)
for n,d in enumerate(data,1):
yield fs.format(
n = str(n) + ')',
d = d.age_disp( 'date_time', width=None, color=None ),
b = d.blockheight_disp(color=color),
D = d.txdate_disp( 'date_time' ),
i = d.txid_disp( width=None, color=color ),
A1 = d.amt_disp(True).hl( color=color ),
A2 = d.amt_disp(False).hl( color=color ),
f = d.fee_disp( color=color ),
a1 = d.vouts_list_disp( 'inputs', color=color, indent=' '*8 ),
oc = d.nOutputs,
a2 = d.vouts_list_disp( 'outputs', color=color, indent=' '*8 ),
)
sort_disp = {
'age': 'Age',
@ -184,7 +180,7 @@ class TwTxHistory(MMGenObject,TwCommon,metaclass=AsyncInit):
'txid': lambda i: i.txid,
}
async def set_dates(self,us):
async def set_dates(self,foo):
pass
@property

View file

@ -71,7 +71,6 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
async def __init__(self,proto,minconf=1,addrs=[]):
self.proto = proto
self.data = MMGenList()
self.show_mmid = True
self.minconf = minconf
self.addrs = addrs
@ -104,7 +103,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
def filter_data(self):
data = self.data
data = self.data.copy()
for d in data:
d.skip = ''
@ -220,10 +219,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
a = (
'|'+'.' * addr_w if i.skip == 'addr' and self.group else
i.addr.fmt(color=color,width=addr_w) ),
m = MMGenID.fmtc(
(i.twmmid if i.twmmid.type == 'mmgen' else f'Non-{g.proj_name}'),
width = mmid_w,
color = color ),
m = MMGenID.fmtc( i.twmmid.disp, width=mmid_w, color=color ),
A = i.amt.fmt(color=color),
A2 = ( i.amt2.fmt(color=color) if i.amt2 is not None else '' ),
c = i.confs,
@ -244,6 +240,15 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
len(self.data),
suf(self.data) ))
async def set_dates(self,us):
if not self.dates_set:
# 'blocktime' differs from 'time', is same as getblockheader['time']
dates = [ o.get('blocktime',0)
for o in await self.rpc.gathered_icall('gettransaction',[(o.txid,True,False) for o in us]) ]
for idx,o in enumerate(us):
o.date = dates[idx]
self.dates_set = True
class action(TwCommon.action):
def s_twmmid(self,parent):
@ -256,61 +261,3 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
def d_group(self,parent):
if parent.can_group:
parent.group = not parent.group
class item_action(TwCommon.item_action):
async def a_balance_refresh(self,uo,idx):
from ..ui import keypress_confirm
if not keypress_confirm(
f'Refreshing tracking wallet {uo.item_desc} #{idx}. Is this what you want?'):
return 'redo'
await uo.wallet.get_balance( uo.data[idx-1].addr, force_rpc=True )
await uo.get_data()
uo.oneshot_msg = yellow(f'{uo.proto.dcoin} balance for account #{idx} refreshed\n\n')
async def a_addr_delete(self,uo,idx):
from ..ui import keypress_confirm
if not keypress_confirm(
f'Removing {uo.item_desc} #{idx} from tracking wallet. Is this what you want?'):
return 'redo'
if await uo.wallet.remove_address( uo.data[idx-1].addr ):
await uo.get_data()
uo.oneshot_msg = yellow(f'{capfirst(uo.item_desc)} #{idx} removed\n\n')
else:
await asyncio.sleep(3)
uo.oneshot_msg = red('Address could not be removed\n\n')
async def a_comment_add(self,uo,idx):
async def do_comment_add(comment):
e = uo.data[idx-1]
if await uo.wallet.add_comment( e.twmmid, comment, coinaddr=e.addr ):
await uo.get_data()
uo.oneshot_msg = yellow('Label {a} {b}{c}\n\n'.format(
a = 'to' if cur_comment and comment else 'added to' if comment else 'removed from',
b = desc,
c = ' edited' if cur_comment and comment else '' ))
else:
await asyncio.sleep(3)
uo.oneshot_msg = red('Label could not be {}\n\n'.format(
'edited' if cur_comment and comment else
'added' if comment else
'removed' ))
desc = f'{uo.item_desc} #{idx}'
cur_comment = uo.data[idx-1].comment
msg('Current label: {}'.format(cur_comment.hl() if cur_comment else '(none)'))
from ..ui import line_input
res = line_input(
"Enter label text (or ENTER to return to main menu): ",
insert_txt = cur_comment )
if res == cur_comment:
return None
elif res == '':
from ..ui import keypress_confirm
return (await do_comment_add('')) if keypress_confirm(
f'Removing label for {desc}. Is this what you want?') else 'redo'
else:
return (await do_comment_add(res)) if get_obj(TwComment,s=res) else 'redo'

View file

@ -340,7 +340,7 @@ class New(Base):
do_license_msg()
if not opt.inputs:
await self.twuo.view_and_sort()
await self.twuo.view_filter_and_sort()
self.twuo.display_total()

View file

@ -18,6 +18,7 @@ from .ot_common import *
from mmgen.protocol import init_proto
proto = init_proto('btc',need_amt=True)
tw_pfx = proto.base_coin.lower() + ':'
zero_addr = '1111111111111111111114oLvT2'
ssm = str(SeedShareCount.max_val)
privkey = PrivKey(proto=proto,s=bytes.fromhex('deadbeef'*8),compressed=True,pubkey_type='std')
@ -162,9 +163,10 @@ tests = {
{'id_str':'F00BAA12:Z:99', 'proto':proto},
{'id_str':tw_pfx, 'proto':proto},
{'id_str':tw_pfx+'я', 'proto':proto},
{'id_str':tw_pfx+'x', 'proto':proto},
),
'good': (
{'id_str':tw_pfx+'x', 'proto':proto},
{'id_str':tw_pfx+zero_addr, 'proto':proto},
{'id_str':'F00BAA12:99', 'proto':proto, 'ret':'F00BAA12:L:99'},
{'id_str':'F00BAA12:L:99', 'proto':proto},
{'id_str':'F00BAA12:S:9999999', 'proto':proto},
@ -188,13 +190,14 @@ tests = {
{'text':tw_pfx+'я x', 'proto':proto},
{'text':utf8_ctrl[:40], 'proto':proto},
{'text':'F00BAA12:S:1 ' + utf8_ctrl[:40], 'proto':proto, 'exc_name': 'BadTwComment'},
{'text':tw_pfx+'x comment','proto':proto},
),
'good': (
{'text':'F00BAA12:99 a comment', 'proto':proto, 'ret':'F00BAA12:L:99 a comment'},
{'text':'F00BAA12:L:99 a comment', 'proto':proto},
{'text': 'F00BAA12:L:99 comment (UTF-8) α', 'proto':proto},
{'text':'F00BAA12:S:9999999 comment', 'proto':proto},
{'text':tw_pfx+'x comment', 'proto':proto},
{'text':tw_pfx+zero_addr+' comment', 'proto':proto},
),
},
'MMGenTxID': {

View file

@ -15,13 +15,5 @@ if overlay_fake_os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC'):
if overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA'):
class overlay_fake_data2:
async def set_dates(foo,us):
for o in us:
o.date = 1831006505 - int(9.7 * 60 * (o.confs - 1))
TwCommon.set_dates = overlay_fake_data2.set_dates
# 1831006505 (09 Jan 2028) = projected time of block 1000000
TwCommon.date_formatter['days'] = lambda rpc,secs: (1831006505 - secs) // 86400

View file

@ -0,0 +1,12 @@
import os as overlay_fake_os
from .unspent_orig import *
if overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA'):
class overlay_fake_data:
async def set_dates(foo,us):
for o in us:
o.date = 1831006505 - int(9.7 * 60 * (o.confs - 1))
TwUnspentOutputs.set_dates = overlay_fake_data.set_dates

View file

@ -71,26 +71,26 @@ bals = {
('98831F3A:E:2','23.45495'),
('98831F3A:E:11','1.234'),
('98831F3A:E:21','2.345'),
(burn_addr + r'\s+Non-MMGen',amt1)],
(burn_addr + r'\s+non-MMGen',amt1)],
'8': [ ('98831F3A:E:1','0'),
('98831F3A:E:2','23.45495'),
('98831F3A:E:11',vbal1,'a1'),
('98831F3A:E:12','99.99895'),
('98831F3A:E:21','2.345'),
(burn_addr + r'\s+Non-MMGen',amt1)],
(burn_addr + r'\s+non-MMGen',amt1)],
'9': [ ('98831F3A:E:1','0'),
('98831F3A:E:2','23.45495'),
('98831F3A:E:11',vbal1,'a1'),
('98831F3A:E:12',vbal2),
('98831F3A:E:21','2.345'),
(burn_addr + r'\s+Non-MMGen',amt1)],
(burn_addr + r'\s+non-MMGen',amt1)],
'10': [ ('98831F3A:E:1','0'),
('98831F3A:E:2','23.0218'),
('98831F3A:E:3','0.4321'),
('98831F3A:E:11',vbal1,'a1'),
('98831F3A:E:12',vbal2),
('98831F3A:E:21','2.345'),
(burn_addr + r'\s+Non-MMGen',amt1)]
(burn_addr + r'\s+non-MMGen',amt1)]
}
token_bals = {
@ -101,18 +101,18 @@ token_bals = {
('98831F3A:E:12','1.23456','0')],
'4': [ ('98831F3A:E:11','110.654317776666555545',vbal1,'a1'),
('98831F3A:E:12','1.23456','0'),
(burn_addr + r'\s+Non-MMGen',amt2,amt1)],
(burn_addr + r'\s+non-MMGen',amt2,amt1)],
'5': [ ('98831F3A:E:11','110.654317776666555545',vbal1,'a1'),
('98831F3A:E:12','1.23456','99.99895'),
(burn_addr + r'\s+Non-MMGen',amt2,amt1)],
(burn_addr + r'\s+non-MMGen',amt2,amt1)],
'6': [ ('98831F3A:E:11','110.654317776666555545',vbal1,'a1'),
('98831F3A:E:12','0',vbal2),
('98831F3A:E:13','1.23456','0'),
(burn_addr + r'\s+Non-MMGen',amt2,amt1)],
(burn_addr + r'\s+non-MMGen',amt2,amt1)],
'7': [ ('98831F3A:E:11','67.444317776666555545',vbal9,'a2'),
('98831F3A:E:12','43.21',vbal2),
('98831F3A:E:13','1.23456','0'),
(burn_addr + r'\s+Non-MMGen',amt2,amt1)]
(burn_addr + r'\s+non-MMGen',amt2,amt1)]
}
token_bals_getbalance = {
'1': (vbal4,'999999.12345689012345678'),
@ -1260,20 +1260,32 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
def edit_comment(self,out_num,args=[],action='l',comment_text=None,changed=False,pexpect_spawn=None):
t = self.spawn('mmgen-txcreate', self.eth_args + args + ['-B','-i'],pexpect_spawn=pexpect_spawn)
p1,p2 = ('efresh balance:\b','return to main menu): ')
p3,r3 = (p2,comment_text+'\n') if comment_text is not None else ('(y/N): ','y')
p4,r4 = (('(y/N): ',),('y',)) if comment_text == Ctrl_U else ((),())
for p,r in zip((p1,p1,p2,p3)+p4,('M',action,out_num+'\n',r3)+r4):
t.expect(p,r)
menu_prompt = 'efresh balance:\b'
t.expect(menu_prompt,'M')
t.expect(menu_prompt,action)
t.expect(r'return to main menu): ',out_num+'\n')
for p,r in (
('Enter label text.*: ',comment_text+'\n') if comment_text is not None else (r'\(y/N\): ','y'),
(r'\(y/N\): ','y') if comment_text == Ctrl_U else (None,None),
):
if p:
t.expect(p,r,regex=True)
m = (
'Label to account #{} edited' if changed else
'Label for account #{} edited' if changed else
'Account #{} removed' if action == 'D' else
'Label added to account #{}' if comment_text and comment_text != Ctrl_U else
'Label removed from account #{}' )
t.expect(m.format(out_num))
for p,r in zip((p1,p1),('M','q')):
t.expect(p,r)
t.expect(menu_prompt,'M')
t.expect(menu_prompt,'q')
t.expect('Total unspent:')
return t
def edit_comment1(self):

View file

@ -683,7 +683,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
t = self.spawn('mmgen-tool',['--'+user,'txhist'] + args)
res = strip_ansi_escapes(t.read()).replace('\r','')
m = re.search(expect,res,re.DOTALL)
assert m, m
assert m, f'Expected: {expect}'
return t
def bob_txhist1(self):
@ -699,7 +699,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
def bob_txhist3(self):
return self.user_txhist('bob',
args = ['sort=blockheight','sinceblock=-7','age_fmt=block'],
expect = fr'Displaying transactions since block 399.*\s6\)\s+405.*:C:3\s.*\s{rtBals[9]}\s.*:L:5.*\s7\)'
expect = fr'Displaying transactions since block 399.*\s6\)\s+405\s.*\s{rtBals[9]}\s.*:L:5.*\s7\)'
)
def bob_txhist4(self):
@ -1147,28 +1147,29 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
mmid = self._user_sid('alice') + (':S:1',':L:1')[self.proto.coin=='BCH']
return self._user_chk_comment('alice',mmid,'Label added using coin address of MMGen address')
def alice_add_comment_badaddr(self,addr,reply):
def alice_add_comment_badaddr(self,addr,reply,exit_val):
if os.getenv('PYTHONOPTIMIZE'):
omsg(yellow(f'PYTHONOPTIMIZE set, skipping test {self.test_name!r}'))
return 'skip'
t = self.spawn('mmgen-tool',['--alice','add_label',addr,'(none)'])
t.expect(reply,regex=True)
t.req_exit_val = exit_val
return t
def alice_add_comment_badaddr1(self):
return self.alice_add_comment_badaddr( rt_pw,'Invalid coin address for this chain: ')
return self.alice_add_comment_badaddr( rt_pw,'Invalid coin address for this chain: ', 2)
def alice_add_comment_badaddr2(self):
addr = init_proto(self.proto.coin,network='mainnet').pubhash2addr(bytes(20),False) # mainnet zero address
return self.alice_add_comment_badaddr( addr, f'Invalid coin address for this chain: {addr}' )
return self.alice_add_comment_badaddr( addr, f'Invalid coin address for this chain: {addr}', 2 )
def alice_add_comment_badaddr3(self):
addr = self._user_sid('alice') + ':C:123'
return self.alice_add_comment_badaddr( addr, f'MMGen address {addr!r} not found in tracking wallet' )
return self.alice_add_comment_badaddr( addr, f'MMGen address {addr!r} not found in tracking wallet', 2 )
def alice_add_comment_badaddr4(self):
addr = self.proto.pubhash2addr(bytes(20),False) # regtest (testnet) zero address
return self.alice_add_comment_badaddr( addr, f'Address {addr!r} not found in tracking wallet' )
return self.alice_add_comment_badaddr( addr, f'Address {addr!r} not found in tracking wallet', 2 )
def alice_remove_comment1(self):
sid = self._user_sid('alice')
@ -1201,7 +1202,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
t.expect(r'add \[l\]abel:.','M',regex=True)
t.expect(r'add \[l\]abel:.','l',regex=True)
t.expect(r"Enter unspent.*return to main menu\):.",output+'\n',regex=True)
t.expect(r"Enter label text.*return to main menu\):.",comment+'\n',regex=True)
t.expect(r"Enter label text.*:.",comment+'\n',regex=True)
t.expect(r'\[q\]uit view, .*?:.','q',regex=True)
return t