twaddrs.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. twaddrs: Tracking wallet listaddresses class for the MMGen suite
  20. """
  21. from .color import green
  22. from .util import msg,die,base_proto_subclass
  23. from .base_obj import AsyncInit
  24. from .obj import MMGenList,MMGenDict,TwComment
  25. from .addr import CoinAddr,MMGenID
  26. from .rpc import rpc_init
  27. from .tw import TwCommon,get_tw_label
  28. class TwAddrList(MMGenDict,TwCommon,metaclass=AsyncInit):
  29. has_age = True
  30. def __new__(cls,proto,*args,**kwargs):
  31. return MMGenDict.__new__(base_proto_subclass(cls,proto,'twaddrs'),*args,**kwargs)
  32. async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None):
  33. def check_dup_mmid(acct_labels):
  34. mmid_prev,err = None,False
  35. for mmid in sorted(a.mmid for a in acct_labels if a):
  36. if mmid == mmid_prev:
  37. err = True
  38. msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n')
  39. mmid_prev = mmid
  40. if err:
  41. die(4,'Tracking wallet is corrupted!')
  42. def check_addr_array_lens(acct_pairs):
  43. err = False
  44. for label,addrs in acct_pairs:
  45. if not label: continue
  46. if len(addrs) != 1:
  47. err = True
  48. if len(addrs) == 0:
  49. msg(f'Label {label!r}: has no associated address!')
  50. else:
  51. msg(f'{addrs!r}: more than one {proto.coin} address in account!')
  52. if err:
  53. die(4,'Tracking wallet is corrupted!')
  54. self.rpc = await rpc_init(proto)
  55. self.total = proto.coin_amt('0')
  56. self.proto = proto
  57. lbl_id = ('account','label')['label_api' in self.rpc.caps]
  58. for d in await self.rpc.call('listunspent',0):
  59. if not lbl_id in d: continue # skip coinbase outputs with missing account
  60. if d['confirmations'] < minconf: continue
  61. label = get_tw_label(proto,d[lbl_id])
  62. if label:
  63. lm = label.mmid
  64. if usr_addr_list and (lm not in usr_addr_list):
  65. continue
  66. if lm in self:
  67. if self[lm]['addr'] != d['address']:
  68. die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format(
  69. proto.coin,
  70. d['address'],
  71. self[lm]['addr'] ))
  72. else:
  73. lm.confs = d['confirmations']
  74. lm.txid = d['txid']
  75. lm.date = None
  76. self[lm] = {
  77. 'amt': proto.coin_amt('0'),
  78. 'lbl': label,
  79. 'addr': CoinAddr(proto,d['address']) }
  80. amt = proto.coin_amt(d['amount'])
  81. self[lm]['amt'] += amt
  82. self.total += amt
  83. # We use listaccounts only for empty addresses, as it shows false positive balances
  84. if showempty or all_labels:
  85. # for compatibility with old mmids, must use raw RPC rather than native data for matching
  86. # args: minconf,watchonly, MUST use keys() so we get list, not dict
  87. if 'label_api' in self.rpc.caps:
  88. acct_list = await self.rpc.call('listlabels')
  89. aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list])
  90. acct_addrs = [list(a.keys()) for a in aa]
  91. else:
  92. acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L'
  93. acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) # use raw list here
  94. acct_labels = MMGenList([get_tw_label(proto,a) for a in acct_list])
  95. check_dup_mmid(acct_labels)
  96. assert len(acct_list) == len(acct_addrs),(
  97. 'listaccounts() and getaddressesbyaccount() not equal in length')
  98. addr_pairs = list(zip(acct_labels,acct_addrs))
  99. check_addr_array_lens(addr_pairs)
  100. for label,addr_arr in addr_pairs:
  101. if not label: continue
  102. if all_labels and not showempty and not label.comment: continue
  103. if usr_addr_list and (label.mmid not in usr_addr_list): continue
  104. if label.mmid not in self:
  105. self[label.mmid] = { 'amt':proto.coin_amt('0'), 'lbl':label, 'addr':'' }
  106. if showbtcaddrs:
  107. self[label.mmid]['addr'] = CoinAddr(proto,addr_arr[0])
  108. def raw_list(self):
  109. return [((k if k.type == 'mmgen' else 'Non-MMGen'),self[k]['addr'],self[k]['amt']) for k in self]
  110. def coinaddr_list(self):
  111. return [self[k]['addr'] for k in self]
  112. async def format(self,showbtcaddrs,sort,show_age,age_fmt):
  113. if not self.has_age:
  114. show_age = False
  115. if age_fmt not in self.age_fmts:
  116. die( 'BadAgeFormat', f'{age_fmt!r}: invalid age format (must be one of {self.age_fmts!r})' )
  117. fs = '{mid}' + ('',' {addr}')[showbtcaddrs] + ' {cmt} {amt}' + ('',' {age}')[show_age]
  118. mmaddrs = [k for k in self.keys() if k.type == 'mmgen']
  119. max_mmid_len = max(len(k) for k in mmaddrs) + 2 if mmaddrs else 10
  120. max_cmt_width = max(max(v['lbl'].comment.screen_width for v in self.values()),7)
  121. addr_width = max(len(self[mmid]['addr']) for mmid in self)
  122. max_fp_len = max([len(a.split('.')[1]) for a in [str(v['amt']) for v in self.values()] if '.' in a] or [1])
  123. def sort_algo(j):
  124. if sort and 'age' in sort:
  125. return '{}_{:>012}_{}'.format(
  126. j.obj.rsplit(':',1)[0],
  127. # Hack, but OK for the foreseeable future:
  128. (1000000000-(j.confs or 0) if hasattr(j,'confs') else 0),
  129. j.sort_key)
  130. else:
  131. return j.sort_key
  132. mmids = sorted(self,key=sort_algo,reverse=bool(sort and 'reverse' in sort))
  133. if show_age:
  134. await self.set_dates(
  135. self.rpc,
  136. [o for o in mmids if hasattr(o,'confs')] )
  137. def gen_output():
  138. if self.proto.chain_name != 'mainnet':
  139. yield 'Chain: '+green(self.proto.chain_name.upper())
  140. yield fs.format(
  141. mid=MMGenID.fmtc('MMGenID',width=max_mmid_len),
  142. addr=(CoinAddr.fmtc('ADDRESS',width=addr_width) if showbtcaddrs else None),
  143. cmt=TwComment.fmtc('COMMENT',width=max_cmt_width+1),
  144. amt='BALANCE'.ljust(max_fp_len+4),
  145. age=age_fmt.upper(),
  146. ).rstrip()
  147. al_id_save = None
  148. for mmid in mmids:
  149. if mmid.type == 'mmgen':
  150. if al_id_save and al_id_save != mmid.obj.al_id:
  151. yield ''
  152. al_id_save = mmid.obj.al_id
  153. mmid_disp = mmid
  154. else:
  155. if al_id_save:
  156. yield ''
  157. al_id_save = None
  158. mmid_disp = 'Non-MMGen'
  159. e = self[mmid]
  160. yield fs.format(
  161. mid=MMGenID.fmtc(mmid_disp,width=max_mmid_len,color=True),
  162. addr=(e['addr'].fmt(color=True,width=addr_width) if showbtcaddrs else None),
  163. cmt=e['lbl'].comment.fmt(width=max_cmt_width,color=True,nullrepl='-'),
  164. amt=e['amt'].fmt('4.{}'.format(max(max_fp_len,3)),color=True),
  165. age=self.age_disp(mmid,age_fmt) if show_age and hasattr(mmid,'confs') else '-'
  166. ).rstrip()
  167. yield '\nTOTAL: {} {}'.format(
  168. self.total.hl(color=True),
  169. self.proto.dcoin )
  170. return '\n'.join(gen_output())