twaddrs.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. twaddrs: Tracking wallet listaddresses class for the MMGen suite
  20. """
  21. from .color import green
  22. from .exception import BadAgeFormat
  23. from .util import msg,die,altcoin_subclass
  24. from .base_obj import AsyncInit
  25. from .obj import MMGenList,MMGenDict,TwComment
  26. from .addr import CoinAddr,MMGenID
  27. from .rpc import rpc_init
  28. from .tw import TwCommon,get_tw_label
  29. class TwAddrList(MMGenDict,TwCommon,metaclass=AsyncInit):
  30. has_age = True
  31. def __new__(cls,proto,*args,**kwargs):
  32. return MMGenDict.__new__(altcoin_subclass(cls,proto,'twaddrs'),*args,**kwargs)
  33. async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None):
  34. def check_dup_mmid(acct_labels):
  35. mmid_prev,err = None,False
  36. for mmid in sorted(a.mmid for a in acct_labels if a):
  37. if mmid == mmid_prev:
  38. err = True
  39. msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n')
  40. mmid_prev = mmid
  41. if err: rdie(3,'Tracking wallet is corrupted!')
  42. def check_addr_array_lens(acct_pairs):
  43. err = False
  44. for label,addrs in acct_pairs:
  45. if not label: continue
  46. if len(addrs) != 1:
  47. err = True
  48. if len(addrs) == 0:
  49. msg(f'Label {label!r}: has no associated address!')
  50. else:
  51. msg(f'{addrs!r}: more than one {proto.coin} address in account!')
  52. if err: rdie(3,'Tracking wallet is corrupted!')
  53. self.rpc = await rpc_init(proto)
  54. self.total = proto.coin_amt('0')
  55. self.proto = proto
  56. lbl_id = ('account','label')['label_api' in self.rpc.caps]
  57. for d in await self.rpc.call('listunspent',0):
  58. if not lbl_id in d: continue # skip coinbase outputs with missing account
  59. if d['confirmations'] < minconf: continue
  60. label = get_tw_label(proto,d[lbl_id])
  61. if label:
  62. lm = label.mmid
  63. if usr_addr_list and (lm not in usr_addr_list):
  64. continue
  65. if lm in self:
  66. if self[lm]['addr'] != d['address']:
  67. die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format(
  68. proto.coin,
  69. d['address'],
  70. self[lm]['addr'] ))
  71. else:
  72. lm.confs = d['confirmations']
  73. lm.txid = d['txid']
  74. lm.date = None
  75. self[lm] = {
  76. 'amt': proto.coin_amt('0'),
  77. 'lbl': label,
  78. 'addr': CoinAddr(proto,d['address']) }
  79. amt = proto.coin_amt(d['amount'])
  80. self[lm]['amt'] += amt
  81. self.total += amt
  82. # We use listaccounts only for empty addresses, as it shows false positive balances
  83. if showempty or all_labels:
  84. # for compatibility with old mmids, must use raw RPC rather than native data for matching
  85. # args: minconf,watchonly, MUST use keys() so we get list, not dict
  86. if 'label_api' in self.rpc.caps:
  87. acct_list = await self.rpc.call('listlabels')
  88. aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list])
  89. acct_addrs = [list(a.keys()) for a in aa]
  90. else:
  91. acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L'
  92. acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) # use raw list here
  93. acct_labels = MMGenList([get_tw_label(proto,a) for a in acct_list])
  94. check_dup_mmid(acct_labels)
  95. assert len(acct_list) == len(acct_addrs),(
  96. 'listaccounts() and getaddressesbyaccount() not equal in length')
  97. addr_pairs = list(zip(acct_labels,acct_addrs))
  98. check_addr_array_lens(addr_pairs)
  99. for label,addr_arr in addr_pairs:
  100. if not label: continue
  101. if all_labels and not showempty and not label.comment: continue
  102. if usr_addr_list and (label.mmid not in usr_addr_list): continue
  103. if label.mmid not in self:
  104. self[label.mmid] = { 'amt':proto.coin_amt('0'), 'lbl':label, 'addr':'' }
  105. if showbtcaddrs:
  106. self[label.mmid]['addr'] = CoinAddr(proto,addr_arr[0])
  107. def raw_list(self):
  108. return [((k if k.type == 'mmgen' else 'Non-MMGen'),self[k]['addr'],self[k]['amt']) for k in self]
  109. def coinaddr_list(self):
  110. return [self[k]['addr'] for k in self]
  111. async def format(self,showbtcaddrs,sort,show_age,age_fmt):
  112. if not self.has_age:
  113. show_age = False
  114. if age_fmt not in self.age_fmts:
  115. raise BadAgeFormat(f'{age_fmt!r}: invalid age format (must be one of {self.age_fmts!r})')
  116. fs = '{mid}' + ('',' {addr}')[showbtcaddrs] + ' {cmt} {amt}' + ('',' {age}')[show_age]
  117. mmaddrs = [k for k in self.keys() if k.type == 'mmgen']
  118. max_mmid_len = max(len(k) for k in mmaddrs) + 2 if mmaddrs else 10
  119. max_cmt_width = max(max(v['lbl'].comment.screen_width for v in self.values()),7)
  120. addr_width = max(len(self[mmid]['addr']) for mmid in self)
  121. max_fp_len = max([len(a.split('.')[1]) for a in [str(v['amt']) for v in self.values()] if '.' in a] or [1])
  122. def sort_algo(j):
  123. if sort and 'age' in sort:
  124. return '{}_{:>012}_{}'.format(
  125. j.obj.rsplit(':',1)[0],
  126. # Hack, but OK for the foreseeable future:
  127. (1000000000-(j.confs or 0) if hasattr(j,'confs') else 0),
  128. j.sort_key)
  129. else:
  130. return j.sort_key
  131. mmids = sorted(self,key=sort_algo,reverse=bool(sort and 'reverse' in sort))
  132. if show_age:
  133. await self.set_dates(
  134. self.rpc,
  135. [o for o in mmids if hasattr(o,'confs')] )
  136. def gen_output():
  137. if self.proto.chain_name != 'mainnet':
  138. yield 'Chain: '+green(self.proto.chain_name.upper())
  139. yield fs.format(
  140. mid=MMGenID.fmtc('MMGenID',width=max_mmid_len),
  141. addr=(CoinAddr.fmtc('ADDRESS',width=addr_width) if showbtcaddrs else None),
  142. cmt=TwComment.fmtc('COMMENT',width=max_cmt_width+1),
  143. amt='BALANCE'.ljust(max_fp_len+4),
  144. age=age_fmt.upper(),
  145. ).rstrip()
  146. al_id_save = None
  147. for mmid in mmids:
  148. if mmid.type == 'mmgen':
  149. if al_id_save and al_id_save != mmid.obj.al_id:
  150. yield ''
  151. al_id_save = mmid.obj.al_id
  152. mmid_disp = mmid
  153. else:
  154. if al_id_save:
  155. yield ''
  156. al_id_save = None
  157. mmid_disp = 'Non-MMGen'
  158. e = self[mmid]
  159. yield fs.format(
  160. mid=MMGenID.fmtc(mmid_disp,width=max_mmid_len,color=True),
  161. addr=(e['addr'].fmt(color=True,width=addr_width) if showbtcaddrs else None),
  162. cmt=e['lbl'].comment.fmt(width=max_cmt_width,color=True,nullrepl='-'),
  163. amt=e['amt'].fmt('4.{}'.format(max(max_fp_len,3)),color=True),
  164. age=self.age_disp(mmid,age_fmt) if show_age and hasattr(mmid,'confs') else '-'
  165. ).rstrip()
  166. yield '\nTOTAL: {} {}'.format(
  167. self.total.hl(color=True),
  168. self.proto.dcoin )
  169. return '\n'.join(gen_output())