123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326 |
- #!/usr/bin/env python
- #
- # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
- # Copyright (C)2013-2016 Philemon <mmgen-py@yandex.com>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- """
- tw: Tracking wallet methods for the MMGen suite
- """
- from mmgen.common import *
- from mmgen.obj import *
- from mmgen.tx import is_mmgen_id
- CUR_HOME,ERASE_ALL = '\033[H','\033[0J'
- def parse_tw_acct_label(s):
- ret = s.split(None,1)
- a1,a2 = None,None
- if ret:
- a1 = ret[0] if is_mmgen_id(ret[0]) else '' if ret[0][:4] == 'btc:' else None
- a2 = ret[1] if len(ret) == 2 else None
- return a1,a2
- class MMGenTWOutput(MMGenListItem):
- attrs_reassign = 'label','skip'
- attrs = 'txid','vout','amt','label','mmid','addr','confs','scriptPubKey','days','skip'
- label = MMGenListItemAttr('label','MMGenAddrLabel')
- class MMGenTrackingWallet(MMGenObject):
- wmsg = {
- 'no_spendable_outputs': """
- No spendable outputs found! Import addresses with balances into your
- watch-only wallet using '{}-addrimport' and then re-run this program.
- """.strip().format(g.proj_name)
- }
- sort_keys = 'addr','age','amt','txid','mmid'
- def __init__(self,minconf=1):
- self.unspent = []
- self.fmt_display = ''
- self.fmt_print = ''
- self.cols = None
- self.reverse = False
- self.group = False
- self.show_days = True
- self.show_mmid = True
- self.minconf = minconf
- self.get_data()
- self.sort_key = 'age'
- self.do_sort()
- self.total = self.get_total_btc()
- def get_total_btc(self):
- return sum([i.amt for i in self.unspent])
- def get_data(self):
- if g.bogus_wallet_data: # for debugging purposes only
- us_rpc = eval(get_data_from_file(g.bogus_wallet_data))
- else:
- us_rpc = bitcoin_connection().listunspent(self.minconf)
- # write_data_to_file('bogus_unspent.json', repr(us), 'bogus unspent data')
- # sys.exit()
- if not us_rpc: die(0,self.wmsg['no_spendable_outputs'])
- for o in us_rpc:
- o['mmid'],o['label'] = parse_tw_acct_label(o['account']) if 'account' in o else ('','')
- o['days'] = int(o['confirmations'] * g.mins_per_block / (60*24))
- o['amt'] = o['amount'] # TODO
- o['addr'] = o['address']
- o['confs'] = o['confirmations']
- self.unspent = [MMGenTWOutput(**dict([(k,v) for k,v in o.items() if k in MMGenTWOutput.attrs and o[k] not in (None,'')])) for o in us_rpc]
- # die(1,''.join([pp_format(i)+'\n' for i in us_rpc]))
- # die(1,''.join([str(i)+'\n' for i in self.unspent]))
- def s_addr(self,i): return i.addr
- def s_age(self,i): return 0 - i.confs
- def s_amt(self,i): return i.amt
- def s_txid(self,i): return '%s %03s' % (i.txid,i.vout)
- def s_mmid(self,i):
- if i.mmid:
- return '{}:{:>0{w}}'.format(
- *i.mmid.split(':'), w=AddrIdx.max_digits)
- else: return 'G' + (i.label or '')
- def do_sort(self,key=None,reverse=None):
- if not key: key = self.sort_key
- assert key
- self.sort_key = key
- if key not in self.sort_keys:
- fs = "'{}': invalid sort key. Valid keys: [{}]"
- die(2,fs.format(key,' '.join(self.sort_keys)))
- if reverse == None: reverse = self.reverse
- self.unspent.sort(key=getattr(self,'s_'+key),reverse=reverse)
- def sort_info(self,include_group=True):
- ret = ([],['Reverse'])[self.reverse]
- ret.append(self.sort_key.capitalize().replace('Mmid','MMGenID'))
- if include_group and self.group and (self.sort_key in ('addr','txid','mmid')):
- ret.append('Grouped')
- return ret
- def set_term_columns(self):
- from mmgen.term import get_terminal_size
- while True:
- self.cols = get_terminal_size()[0]
- if self.cols >= g.min_screen_width: break
- m1 = 'Screen too narrow to display the tracking wallet'
- m2 = 'Please resize your screen to at least {} characters and hit ENTER '
- my_raw_input(m1+'\n'+m2.format(g.min_screen_width))
- def display(self):
- if not opt.no_blank: msg(CUR_HOME+ERASE_ALL)
- msg(self.format_for_display())
- def format_for_display(self):
- unsp = [MMGenTWOutput(**i.__dict__) for i in self.unspent]
- self.set_term_columns()
- for i in unsp:
- if i.label == None: i.label = ''
- i.skip = ''
- mmid_w = max(len(i.mmid or '') for i in unsp) or 10
- max_acct_len = max([len((i.mmid or '')+i.label)+1 for i in unsp])
- addr_w = min(34+((1+max_acct_len) if self.show_mmid else 0),self.cols-46) + 6
- acct_w = min(max_acct_len, max(24,int(addr_w-10)))
- btaddr_w = addr_w - acct_w - 1
- label_w = acct_w - mmid_w - 1
- tx_w = max(11,min(64, self.cols-addr_w-32))
- txdots = ('','...')[tx_w < 64]
- fs = ' %-4s %-' + str(tx_w) + 's %-2s %s %s %s'
- table_hdr = fs % ('Num',
- 'TX id'.ljust(tx_w - 5) + ' Vout',
- '',
- BTCAddr.fmtc('Address',width=addr_w+1),
- 'Amt(BTC) ',
- ('Conf.','Age(d)')[self.show_days])
- if self.group and (self.sort_key in ('addr','txid','mmid')):
- for a,b in [(unsp[i],unsp[i+1]) for i in range(len(unsp)-1)]:
- for k in ('addr','txid','mmid'):
- if self.sort_key == k and getattr(a,k) == getattr(b,k):
- b.skip = (k,'addr')[k=='mmid']
- hdr_fmt = 'UNSPENT OUTPUTS (sort order: %s) Total BTC: %s'
- out = [hdr_fmt % (' '.join(self.sort_info()), self.total.hl()), table_hdr]
- for n,i in enumerate(unsp):
- addr_dots = '|' + '.'*33
- mmid_disp = MMGenID.fmtc('.'*mmid_w if i.skip=='addr'
- else i.mmid or 'Non-{}'.format(g.proj_name),width=mmid_w,color=True)
- if self.show_mmid:
- addr_out = '%s %s' % (
- type(i.addr).fmtc(addr_dots,width=btaddr_w,color=True) if i.skip == 'addr' \
- else i.addr.fmt(width=btaddr_w,color=True),
- '{} {}'.format(mmid_disp,i.label.fmt(width=label_w,color=True) \
- if label_w > 0 else '')
- )
- else:
- addr_out = type(i.addr).fmtc(addr_dots,width=addr_w,color=True) \
- if i.skip=='addr' else i.addr.fmt(width=addr_w,color=True)
- tx = ' ' * (tx_w-4) + '|...' if i.skip == 'txid' \
- else i.txid[:tx_w-len(txdots)]+txdots
- out.append(fs % (str(n+1)+')',tx,i.vout,addr_out,i.amt.fmt(color=True),
- i.days if self.show_days else i.confs))
- self.fmt_display = '\n'.join(out) + '\n'
- return self.fmt_display
- def format_for_printing(self,color=False):
- fs = ' %-4s %-67s %s %s %s %-8s %-6s %s'
- out = [fs % ('Num','Tx ID,Vout','Address'.ljust(34),'MMGen ID'.ljust(15),
- 'Amount(BTC)','Conf.','Age(d)', 'Label')]
- max_lbl_len = max([len(i.label) for i in self.unspent if i.label] or [1])
- for n,i in enumerate(self.unspent):
- addr = '=' if i.skip == 'addr' and self.group else i.addr.fmt(color=color)
- tx = ' ' * 63 + '=' if i.skip == 'txid' and self.group else str(i.txid)
- s = fs % (str(n+1)+')', tx+','+str(i.vout),addr,
- MMGenID.fmtc(i.mmid or 'Non-{}'.format(g.proj_name),width=14,color=color),
- i.amt.fmt(color=color),i.confs,i.days,
- i.label.hl(color=color) if i.label else
- MMGenAddrLabel.fmtc('',color=color,nullrepl='-',width=max_lbl_len))
- out.append(s.rstrip())
- fs = 'Unspent outputs ({} UTC)\nSort order: {}\n\n{}\n\nTotal BTC: {}\n'
- self.fmt_print = fs.format(
- make_timestr(),
- ' '.join(self.sort_info(include_group=False)),
- '\n'.join(out),
- self.total.hl(color=color))
- return self.fmt_print
- def display_total(self):
- fs = '\nTotal unspent: %s BTC (%s outputs)'
- msg(fs % (self.total.hl(),len(self.unspent)))
- def get_idx_and_label_from_user(self):
- msg('')
- while True:
- ret = my_raw_input("Enter unspent output number (or 'q' to return to main menu): ")
- if ret == 'q': return None,None
- n = AddrIdx(ret,on_fail='silent') # hacky way to test and convert to integer
- if not n or n < 1 or n > len(self.unspent):
- msg('Choice must be a single number between 1 and %s' % len(self.unspent))
- # elif not self.unspent[n-1].mmid:
- # msg('Address #%s is not an %s address. No label can be added to it' %
- # (n,g.proj_name))
- else:
- while True:
- s = my_raw_input("Enter label text (or 'q' to return to main menu): ")
- if s == 'q':
- return None,None
- elif s == '':
- if keypress_confirm(
- "Removing label for address #%s. Is this what you want?" % n):
- return n,s
- elif s:
- if MMGenAddrLabel(s,on_fail='return'):
- return n,s
- def view_and_sort(self):
- prompt = """
- Sort options: [t]xid, [a]mount, a[d]dress, [A]ge, [r]everse, [M]mgen addr
- Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
- """.strip()
- self.display()
- msg(prompt)
- from mmgen.term import get_char
- p = "'q'=quit view, 'p'=print to file, 'v'=pager view, 'w'=wide view, 'l'=add label:\b"
- while True:
- reply = get_char(p, immed_chars='atDdAMrgmeqpvw')
- if reply == 'a': self.do_sort('amt')
- elif reply == 'A': self.do_sort('age')
- elif reply == 'd': self.do_sort('addr')
- elif reply == 'D': self.show_days = not self.show_days
- elif reply == 'e': msg('\n%s\n%s\n%s' % (self.fmt_display,prompt,p))
- elif reply == 'g': self.group = not self.group
- elif reply == 'l':
- idx,lbl = self.get_idx_and_label_from_user()
- if idx:
- e = self.unspent[idx-1]
- if type(self).add_label(e.mmid,lbl,addr=e.addr):
- self.get_data()
- self.do_sort()
- msg('%s\n%s\n%s' % (self.fmt_display,prompt,p))
- else:
- msg('Label could not be added\n%s\n%s' % (prompt,p))
- elif reply == 'M': self.do_sort('mmid'); self.show_mmid = True
- elif reply == 'm': self.show_mmid = not self.show_mmid
- elif reply == 'p':
- msg('')
- of = 'listunspent[%s].out' % ','.join(self.sort_info(include_group=False)).lower()
- write_data_to_file(of,self.format_for_printing(),'unspent outputs listing')
- m = yellow("Data written to '%s'" % of)
- msg('\n%s\n%s\n\n%s' % (self.fmt_display,m,prompt))
- continue
- elif reply == 'q': return self.unspent
- elif reply == 'r': self.unspent.reverse(); self.reverse = not self.reverse
- elif reply == 't': self.do_sort('txid')
- elif reply == 'v':
- do_pager(self.fmt_display)
- continue
- elif reply == 'w':
- do_pager(self.format_for_printing(color=True))
- continue
- else:
- msg('\nInvalid input')
- continue
- msg('\n')
- self.display()
- msg(prompt)
- # returns on failure
- @classmethod
- def add_label(cls,arg1,label='',addr=None):
- from mmgen.tx import is_mmgen_id,is_btc_addr
- if is_mmgen_id(arg1):
- mmaddr = MMGenID(arg1)
- elif is_btc_addr(arg1): # called from 'mmgen-tool add_label'
- addr = arg1
- mmaddr = 'btc:'+arg1
- elif not arg1 and is_btc_addr(addr): # called from view_and_sort(), non-MMGen addr
- mmaddr = 'btc:'+addr
- else:
- die(3,'{}: not a BTC address or {} ID'.format(arg1,g.proj_name))
- if addr:
- if not BTCAddr(addr,on_fail='return'): return False
- else:
- from mmgen.addr import AddrData
- addr = AddrData(source='tw').mmaddr2btcaddr(mmaddr)
- if not addr:
- msg('{} address {} not found in tracking wallet'.format(g.proj_name,mmaddr))
- return False
- label = MMGenAddrLabel(label,on_fail='return')
- if not label and label != '': return False
- acct = mmaddr + (' ' + label if label else '') # label is ASCII for now
- # return on failure - args: addr,label,rescan,p2sh
- ret = bitcoin_connection().importaddress(addr,acct,False,on_fail='return')
- from mmgen.rpc import rpc_error,rpc_errmsg
- if rpc_error(ret): msg('From bitcoind: ' + rpc_errmsg(ret))
- return not rpc_error(ret)
- @classmethod
- def remove_label(cls,mmaddr): cls.add_label(mmaddr,'')
|