#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
- """
- twuo: Tracking wallet unspent outputs class for the MMGen suite
- """
- import time
- from collections import namedtuple
- from .globalvars import g
- from .color import red,yellow,green
- from .util import (
- msg,
- msg_r,
- die,
- capfirst,
- suf,
- fmt,
- make_timestr,
- keypress_confirm,
- line_input,
- do_pager,
- base_proto_subclass
- )
- from .base_obj import AsyncInit
- from .objmethods import MMGenObject
- from .obj import ImmutableAttr,ListItemAttr,MMGenListItem,TwComment,get_obj,HexStr,CoinTxID
- from .addr import CoinAddr,MMGenID,AddrIdx
- from .rpc import rpc_init
- from .tw import TwCommon,TwMMGenID,get_tw_label
class TwUnspentOutputs(MMGenObject, TwCommon, metaclass=AsyncInit):
    """
    Tracking wallet unspent outputs: retrieval, sorting, formatting and an
    interactive viewer.

    Instances are created through __new__(), which substitutes the class
    returned by base_proto_subclass(cls, proto, 'twuo').  The constructor is a
    coroutine (see the AsyncInit metaclass).

    A number of attributes and methods used below are not defined in this
    class and are expected to come from TwCommon or from the substituted
    subclass (not visible here): age_fmts, has_age, item_desc, desc, col_adj,
    hdr_fmt, display_fs_fs, display_hdr_fs_fs, print_fs_fs, prompt,
    key_mappings, can_group, dump_fn_pfx, get_unspent_rpc(), set_dates() and
    age_disp().
    """

    def __new__(cls, proto, *args, **kwargs):
        # Only 'proto' is used to choose the concrete class; the remaining
        # arguments are accepted and ignored here.
        return MMGenObject.__new__(base_proto_subclass(cls, proto, 'twuo'))

    txid_w = 64  # upper bound for the displayed TxID column; also used by format_for_printing()
    age_fmts_date_dependent = ('days', 'date', 'date_time')  # formats for which set_dates() is awaited before display
    age_fmts_interactive = ('confs', 'block', 'days', 'date')  # cycled through by the 'd_days' action
    _age_fmt = 'confs'  # backing value for the age_fmt property

    class MMGenTwOutputList(list, MMGenObject):
        pass

    class MMGenTwUnspentOutput(MMGenListItem):
        """One unspent output of the tracking wallet."""
        txid = ListItemAttr(CoinTxID)
        vout = ListItemAttr(int, typeconv=False)
        amt = ImmutableAttr(None)
        amt2 = ListItemAttr(None)
        label = ListItemAttr(TwComment, reassign_ok=True)
        twmmid = ImmutableAttr(TwMMGenID, include_proto=True)
        addr = ImmutableAttr(CoinAddr, include_proto=True)
        confs = ImmutableAttr(int, typeconv=False)
        date = ListItemAttr(int, typeconv=False, reassign_ok=True)
        scriptPubKey = ImmutableAttr(HexStr)
        skip = ListItemAttr(str, typeconv=False, reassign_ok=True)

        def __init__(self, proto, **kwargs):
            # 'proto' is written straight into __dict__, bypassing normal
            # attribute assignment, before the base initializer processes kwargs.
            self.__dict__['proto'] = proto
            MMGenListItem.__init__(self, **kwargs)

        class conv_funcs:
            # Both amount fields are converted with the protocol's coin_amt type.
            def amt(self, value):
                return self.proto.coin_amt(value)

            def amt2(self, value):
                return self.proto.coin_amt(value)

    async def __init__(self, proto, minconf=1, addrs=None):
        """
        Set display defaults, then open the RPC connection and the tracking
        wallet (in mode 'w').

        proto   -- protocol object
        minconf -- stored as self.minconf (not otherwise used in this class)
        addrs   -- stored as self.addrs (not otherwise used in this class);
                   when omitted, each instance gets its own empty list
        """
        self.proto = proto
        self.unspent = self.MMGenTwOutputList()
        self.fmt_display = ''
        self.fmt_print = ''
        self.cols = None
        self.reverse = False
        self.group = False
        self.show_mmid = True
        self.minconf = minconf
        # A fresh list per instance: a literal [] default would be one object
        # shared by every instance created without 'addrs'.
        self.addrs = [] if addrs is None else addrs
        self.sort_key = 'age'
        self.disp_prec = self.get_display_precision()

        self.rpc = await rpc_init(proto)

        from .twctl import TrackingWallet
        self.wallet = await TrackingWallet(proto, mode='w')

    @property
    def age_fmt(self):
        """Currently selected age format."""
        return self._age_fmt

    @age_fmt.setter
    def age_fmt(self, val):
        # die() is called with the name 'BadAgeFormat' for values outside self.age_fmts
        if val not in self.age_fmts:
            die('BadAgeFormat', f'{val!r}: invalid age format (must be one of {self.age_fmts!r})')
        self._age_fmt = val

    def get_display_precision(self):
        """Return the precision used for displayed amounts (the coin amount type's max_prec)."""
        return self.proto.coin_amt.max_prec

    @property
    def total(self):
        """Sum of the 'amt' fields of all outputs in self.unspent."""
        return sum(i.amt for i in self.unspent)

    async def get_unspent_data(self, sort_key=None, reverse_sort=False):
        """
        Fetch the raw unspent outputs via get_unspent_rpc(), rebuild
        self.unspent from those that carry a tracking wallet label, and sort
        the result.

        die() is called when the RPC call returns nothing, and again when no
        output yields a label object.
        """
        us_raw = await self.get_unspent_rpc()

        if not us_raw:
            # The literal's lines are kept flush-left so that its text is
            # exactly the text passed to fmt() by the original code.
            die(0, fmt(f"""
No spendable outputs found! Import addresses with balances into your
watch-only wallet using '{g.proj_name.lower()}-addrimport' and then re-run this program.
""").strip())

        lbl_id = 'label' if 'label_api' in self.rpc.caps else 'account'

        def gen_unspent():
            for o in us_raw:
                if lbl_id not in o:
                    continue  # coinbase outputs have no account field
                l = get_tw_label(self.proto, o[lbl_id])
                if l:
                    # The raw RPC dict is extended in place with the converted fields
                    o.update({
                        'twmmid': l.mmid,
                        'label': l.comment or '',
                        'amt': self.proto.coin_amt(o['amount']),
                        'addr': CoinAddr(self.proto, o['address']),
                        'confs': o['confirmations'],
                    })
                    # Keys that are not attributes of the list item are dropped
                    yield self.MMGenTwUnspentOutput(
                        self.proto,
                        **{k: v for k, v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs})

        self.unspent = self.MMGenTwOutputList(gen_unspent())

        if not self.unspent:
            die(1, f'No tracked {self.item_desc}s in tracking wallet!')

        self.do_sort(key=sort_key, reverse=reverse_sort)

    def do_sort(self, key=None, reverse=False):
        """
        Sort self.unspent in place and remember the key in self.sort_key.

        key     -- one of 'addr', 'age', 'amt', 'txid', 'twmmid'; falls back
                   to the current self.sort_key when empty
        reverse -- must be a bool; the order is reversed when either this
                   argument or self.reverse is true
        """
        sort_funcs = {
            'addr': lambda i: i.addr,
            'age': lambda i: 0 - i.confs,
            'amt': lambda i: i.amt,
            'txid': lambda i: f'{i.txid} {i.vout:04}',
            'twmmid': lambda i: i.twmmid.sort_key,
        }
        key = key or self.sort_key
        if key not in sort_funcs:
            die(1, f'{key!r}: invalid sort key. Valid options: {" ".join(sort_funcs)}')
        self.sort_key = key
        assert isinstance(reverse, bool)
        self.unspent.sort(key=sort_funcs[key], reverse=reverse or self.reverse)

    def sort_info(self, include_group=True):
        """
        Return a list of words describing the current sort state: 'Reverse'
        when reversed, the capitalized sort key ('Twmmid' shown as 'MMGenID'),
        and 'Grouped' when grouping is on and applies to the sort key.
        """
        ret = ['Reverse'] if self.reverse else []
        ret.append(capfirst(self.sort_key).replace('Twmmid', 'MMGenID'))
        if include_group and self.group and self.sort_key in ('addr', 'txid', 'twmmid'):
            ret.append('Grouped')
        return ret

    def set_term_columns(self):
        """
        Set self.cols from g.terminal_width, or from the detected terminal
        size when that is unset, prompting the user to resize until the width
        is at least g.min_screen_width.
        """
        from .term import get_terminal_size
        while True:
            self.cols = g.terminal_width or get_terminal_size().width
            if self.cols >= g.min_screen_width:
                break
            line_input(
                'Screen too narrow to display the tracking wallet\n'
                + f'Please resize your screen to at least {g.min_screen_width} characters and hit ENTER ')

    def get_display_constants(self):
        """
        Compute the column widths for the interactive display from the current
        outputs and self.cols, returning them as a 'display_constants' named
        tuple.

        Side effect: the 'skip' marker of every output is reset to ''.
        """
        unsp = self.unspent
        for i in unsp:
            i.skip = ''

        # allow for 7-digit confirmation nums
        col1_w = max(3, len(str(len(unsp))) + 1)  # num + ')'
        # Only ids of type 'mmgen' contribute their length; if none does, use 12
        mmid_w = max((len(i.twmmid) if i.twmmid.type == 'mmgen' else 0) for i in unsp) or 12  # DEADBEEF:S:1
        max_acct_w = max(i.label.screen_width for i in unsp) + mmid_w + 1
        max_btcaddr_w = max(len(i.addr) for i in unsp)
        min_addr_w = self.cols - self.col_adj
        addr_w = min(max_btcaddr_w + ((1 + max_acct_w) if self.show_mmid else 0), min_addr_w)
        acct_w = min(max_acct_w, max(24, addr_w - 10))
        btaddr_w = addr_w - acct_w - 1
        label_w = acct_w - mmid_w - 1
        tx_w = min(self.txid_w, self.cols - addr_w - 29 - col1_w)  # min=6 TODO
        txdots = '..' if tx_w < self.txid_w else ''

        dc = namedtuple(
            'display_constants',
            ['col1_w', 'mmid_w', 'addr_w', 'btaddr_w', 'label_w', 'tx_w', 'txdots'])
        return dc(col1_w, mmid_w, addr_w, btaddr_w, label_w, tx_w, txdots)

    async def format_for_display(self):
        """
        Build the screen listing, store it in self.fmt_display and return it.

        The display constants are computed once and cached in
        self.display_constants; callers that change the data reassign that
        attribute (see view_and_sort()).
        """
        unsp = self.unspent
        if self.has_age and self.age_fmt in self.age_fmts_date_dependent:
            await self.set_dates(self.rpc, unsp)
        self.set_term_columns()

        c = getattr(self, 'display_constants', None)
        if not c:
            c = self.display_constants = self.get_display_constants()

        # Clear the markers on every pass.  They are otherwise reset only in
        # get_display_constants(), whose result is cached, so markers from an
        # earlier grouped view would persist after the sort key changes or
        # grouping is switched off.
        for i in unsp:
            i.skip = ''

        if self.group and self.sort_key in ('addr', 'txid', 'twmmid'):
            k = self.sort_key
            marker = 'addr' if k == 'twmmid' else k
            # Mark each output that repeats its predecessor's value for the sort key
            for a, b in zip(unsp, unsp[1:]):
                if getattr(a, k) == getattr(b, k):
                    b.skip = marker

        def gen_output():
            yield self.hdr_fmt.format(' '.join(self.sort_info()), self.proto.dcoin, self.total.hl())
            if self.proto.chain_name != 'mainnet':
                yield 'Chain: ' + green(self.proto.chain_name.upper())
            fs = self.display_fs_fs.format(cw=c.col1_w, tw=c.tx_w)
            hdr_fs = self.display_hdr_fs_fs.format(cw=c.col1_w, tw=c.tx_w)
            yield hdr_fs.format(
                n='Num',
                t='TXid'.ljust(c.tx_w - 2) + ' Vout',
                a='Address'.ljust(c.addr_w),
                A=f'Amt({self.proto.dcoin})'.ljust(self.disp_prec + 5),
                A2=f' Amt({self.proto.coin})'.ljust(self.disp_prec + 4),
                c={
                    'confs': 'Confs',
                    'block': 'Block',
                    'days': 'Age(d)',
                    'date': 'Date',
                    'date_time': 'Date',
                }[self.age_fmt],
            ).rstrip()

            addr_dots = '|' + '.' * (c.addr_w - 1)  # placeholder for a repeated address

            for n, i in enumerate(unsp):
                mmid_disp = MMGenID.fmtc(
                    (
                        '.' * c.mmid_w if i.skip == 'addr' else
                        i.twmmid if i.twmmid.type == 'mmgen' else
                        f'Non-{g.proj_name}'
                    ),
                    width=c.mmid_w,
                    color=True)

                if self.show_mmid:
                    addr_out = '{} {}{}'.format(
                        (
                            type(i.addr).fmtc(addr_dots, width=c.btaddr_w, color=True) if i.skip == 'addr' else
                            i.addr.fmt(width=c.btaddr_w, color=True)
                        ),
                        mmid_disp,
                        (' ' + i.label.fmt(width=c.label_w, color=True)) if c.label_w > 0 else '')
                else:
                    addr_out = (
                        type(i.addr).fmtc(addr_dots, width=c.addr_w, color=True) if i.skip == 'addr' else
                        i.addr.fmt(width=c.addr_w, color=True))

                if not i.txid:
                    txid_out = ''
                elif i.skip == 'txid':
                    txid_out = ' ' * (c.tx_w - 4) + '|...'
                else:
                    txid_out = i.txid[:c.tx_w - len(c.txdots)] + c.txdots

                yield fs.format(
                    n=str(n + 1) + ')',
                    t=txid_out,
                    v=i.vout,
                    a=addr_out,
                    A=i.amt.fmt(color=True, prec=self.disp_prec),
                    A2=(i.amt2.fmt(color=True, prec=self.disp_prec) if i.amt2 is not None else ''),
                    c=self.age_disp(i, self.age_fmt),
                ).rstrip()

        self.fmt_display = '\n'.join(gen_output()) + '\n'
        return self.fmt_display

    async def format_for_printing(self, color=False, show_confs=True):
        """
        Build the full-width listing used for files and the wide pager view,
        store it in self.fmt_print and return it.

        color      -- passed through to the field formatters
        show_confs -- include the confirmations column in the format string
        """
        if self.has_age:
            await self.set_dates(self.rpc, self.unspent)

        addr_w = max(len(i.addr) for i in self.unspent)
        # Only ids of type 'mmgen' contribute their length; if none does, use 12
        mmid_w = max(
            (len(i.twmmid) if i.twmmid.type == 'mmgen' else 0) for i in self.unspent) or 12  # DEADBEEF:S:1
        fs = self.print_fs_fs.format(
            tw=self.txid_w + 3,
            cf='{c:<8} ' if show_confs else '',
            aw=self.proto.coin_amt.max_prec + 5)

        def gen_output():
            yield fs.format(
                n='Num',
                t='Tx ID,Vout',
                a='Address'.ljust(addr_w),
                m='MMGen ID'.ljust(mmid_w),
                A=f'Amount({self.proto.dcoin})',
                A2=f'Amount({self.proto.coin})',
                c='Confs',  # skipped for eth
                b='Block',  # skipped for eth
                D='Date',
                l='Label')

            max_lbl_len = max([len(i.label) for i in self.unspent if i.label] or [2])

            for n, i in enumerate(self.unspent):
                # The 'skip' markers are whatever the last display pass left on
                # the outputs; they are honoured here only while grouping is on.
                yield fs.format(
                    n=str(n + 1) + ')',
                    t='{},{}'.format(
                        ('|' + '.' * 63 if i.skip == 'txid' and self.group else i.txid),
                        i.vout),
                    a=(
                        '|' + '.' * addr_w if i.skip == 'addr' and self.group else
                        i.addr.fmt(color=color, width=addr_w)),
                    m=MMGenID.fmtc(
                        (i.twmmid if i.twmmid.type == 'mmgen' else f'Non-{g.proj_name}'),
                        width=mmid_w,
                        color=color),
                    A=i.amt.fmt(color=color),
                    A2=(i.amt2.fmt(color=color) if i.amt2 is not None else ''),
                    c=i.confs,
                    b=self.rpc.blockcount - (i.confs - 1),
                    D=self.age_disp(i, 'date_time'),
                    l=i.label.hl(color=color) if i.label else
                        TwComment.fmtc(
                            s='',
                            color=color,
                            nullrepl='-',
                            width=max_lbl_len)
                ).rstrip()

        fs2 = '{} (block #{}, {} UTC)\n{}Sort order: {}\n{}\n\nTotal {}: {}\n'
        self.fmt_print = fs2.format(
            capfirst(self.desc),
            self.rpc.blockcount,
            make_timestr(self.rpc.cur_date),
            ('' if self.proto.chain_name == 'mainnet' else
                'Chain: {}\n'.format(green(self.proto.chain_name.upper()))),
            ' '.join(self.sort_info(include_group=False)),
            '\n'.join(gen_output()),
            self.proto.dcoin,
            self.total.hl(color=color))
        return self.fmt_print

    def display_total(self):
        """Print the total unspent amount and the number of outputs."""
        msg('\nTotal unspent: {} {} ({} output{})'.format(
            self.total.hl(),
            self.proto.dcoin,
            len(self.unspent),
            suf(self.unspent)))

    def get_idx_from_user(self, action):
        """
        Prompt for the 1-based number of an entry in self.unspent and, for
        'a_lbl_add', for the new label text.

        action -- 'a_lbl_add', 'a_addr_delete' or 'a_balance_refresh'

        Returns (n, label) for 'a_lbl_add', or (None, None) when the user
        backs out; for the other two actions returns n, or None when the user
        backs out at the number prompt.  Declining a confirmation returns to
        the prompt it belongs to.  Raises ValueError for any other action once
        a valid number has been entered.
        """
        msg('')
        while True:
            ret = line_input(f'Enter {self.item_desc} number (or RETURN to return to main menu): ')
            if ret == '':
                return (None, None) if action == 'a_lbl_add' else None

            n = get_obj(AddrIdx, n=ret, silent=True)
            if not n or n < 1 or n > len(self.unspent):
                msg(f'Choice must be a single number between 1 and {len(self.unspent)}')
                continue

            if action == 'a_lbl_add':
                cur_lbl = self.unspent[n - 1].label
                msg('Current label: {}'.format(cur_lbl.hl() if cur_lbl else '(none)'))
                while True:
                    s = line_input(
                        "Enter label text (or 'q' to return to main menu): ",
                        insert_txt=cur_lbl)
                    if s == 'q':
                        return None, None
                    elif s == '':
                        # Empty input means "remove the label", so ask first
                        if keypress_confirm(
                                f'Removing label for {self.item_desc} #{n}. Is this what you want?'):
                            return n, s
                    elif s:
                        # Accept the text only if get_obj() returns a truthy TwComment for it
                        if get_obj(TwComment, s=s):
                            return n, s
            else:
                if action == 'a_addr_delete':
                    fs = 'Removing {} #{} from tracking wallet. Is this what you want?'
                elif action == 'a_balance_refresh':
                    fs = 'Refreshing tracking wallet {} #{}. Is this what you want?'
                else:
                    # Previously this fell through to an unbound 'fs'
                    raise ValueError(f'{action!r}: unsupported action')
                if keypress_confirm(fs.format(self.item_desc, n)):
                    return n

    async def view_and_sort(self, tx):
        """
        Interactive viewer: redraw the listing, read one key and perform the
        action self.key_mappings assigns to it, until the quit action is
        chosen.

        tx -- not used by this implementation

        Returns self.unspent.
        """
        from .term import get_char
        from .opts import opt

        prompt = self.prompt.strip() + '\b'
        no_output, oneshot_msg = False, None
        CUR_HOME, ERASE_ALL = '\033[H', '\033[0J'

        def cur_right(n):
            # escape sequence parameterized by n (named CUR_RIGHT in the original code)
            return f'\033[{n}C'

        while True:
            msg_r('' if no_output else '\n\n' if opt.no_blank else CUR_HOME + ERASE_ALL)
            reply = get_char(
                '' if no_output else await self.format_for_display() + '\n' + (oneshot_msg or '') + prompt,
                immed_chars=''.join(self.key_mappings.keys()))
            no_output = False
            oneshot_msg = '' if oneshot_msg else None  # tristate, saves previous state

            if reply not in self.key_mappings:
                msg_r('\ninvalid keypress ')
                time.sleep(0.5)
                continue

            action = self.key_mappings[reply]

            if action.startswith('s_'):
                self.do_sort(action[2:])
                if action == 's_twmmid':
                    self.show_mmid = True
            elif action == 'd_days':
                af = self.age_fmts_interactive
                self.age_fmt = af[(af.index(self.age_fmt) + 1) % len(af)]
            elif action == 'd_mmid':
                self.show_mmid = not self.show_mmid
            elif action == 'd_group':
                if self.can_group:
                    self.group = not self.group
            elif action == 'd_redraw':
                pass
            elif action == 'd_reverse':
                self.unspent.reverse()
                self.reverse = not self.reverse
            elif action == 'a_quit':
                msg('')
                return self.unspent
            elif action == 'a_balance_refresh':
                idx = self.get_idx_from_user(action)
                if idx:
                    e = self.unspent[idx - 1]
                    # The returned balance is not needed: the list is reloaded below
                    await self.wallet.get_balance(e.addr, force_rpc=True)
                    await self.get_unspent_data()
                    oneshot_msg = yellow(f'{self.proto.dcoin} balance for account #{idx} refreshed\n\n')
                self.display_constants = self.get_display_constants()
            elif action == 'a_lbl_add':
                idx, lbl = self.get_idx_from_user(action)
                if idx:
                    e = self.unspent[idx - 1]
                    if await self.wallet.add_label(e.twmmid, lbl, addr=e.addr):
                        await self.get_unspent_data()
                        oneshot_msg = yellow('Label {} {} #{}\n\n'.format(
                            ('added to' if lbl else 'removed from'),
                            self.item_desc,
                            idx))
                    else:
                        oneshot_msg = red('Label could not be added\n\n')
                self.display_constants = self.get_display_constants()
            elif action == 'a_addr_delete':
                idx = self.get_idx_from_user(action)
                if idx:
                    e = self.unspent[idx - 1]
                    if await self.wallet.remove_address(e.addr):
                        await self.get_unspent_data()
                        oneshot_msg = yellow(f'{capfirst(self.item_desc)} #{idx} removed\n\n')
                    else:
                        oneshot_msg = red('Address could not be removed\n\n')
                self.display_constants = self.get_display_constants()
            elif action == 'a_print':
                of = '{}-{}[{}].out'.format(
                    self.dump_fn_pfx,
                    self.proto.dcoin,
                    ','.join(self.sort_info(include_group=False)).lower())
                msg('')
                from .fileutil import write_data_to_file
                # NOTE(review): UserNonConfirmation was referenced without being
                # imported anywhere in this module.  It is assumed to live in the
                # package's 'exception' module -- confirm.
                from .exception import UserNonConfirmation
                try:
                    write_data_to_file(
                        of,
                        await self.format_for_printing(),
                        desc=f'{self.desc} listing')
                except UserNonConfirmation:
                    oneshot_msg = red(f'File {of!r} not overwritten by user request\n\n')
                else:
                    oneshot_msg = yellow(f'Data written to {of!r}\n\n')
            elif action in ('a_view', 'a_view_wide'):
                do_pager(
                    self.fmt_display if action == 'a_view' else
                    await self.format_for_printing(color=True))
                if g.platform == 'linux' and oneshot_msg is None:
                    # Move the cursor right by the prompt's last-line length
                    # minus 2 and skip the redraw on the next pass
                    msg_r(cur_right(len(prompt.split('\n')[-1]) - 2))
                    no_output = True
|