addresses.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tw.addresses: Tracking wallet listaddresses class for the MMGen suite
  12. """
  13. from ..util import msg,suf,is_int
  14. from ..obj import MMGenListItem,ImmutableAttr,ListItemAttr,TwComment,NonNegativeInt
  15. from ..addr import CoinAddr,MMGenID,MMGenAddrType
  16. from ..color import red,green,yellow
  17. from .view import TwView
  18. from .shared import TwMMGenID
  class TwAddresses(TwView):
      """
      Tracking wallet 'listaddresses' view (header and class-level settings)
      """

      # descriptive strings
      hdr_lbl = 'tracking wallet addresses'
      desc = 'address list'
      item_desc = 'address'
      no_data_errmsg = 'No addresses in tracking wallet!'
      mod_subpath = 'tw.addresses'

      # sorting and output settings
      sort_key = 'twmmid'
      print_output_types = ('detail',)
      update_widths_on_age_toggle = True

      # display filters and their initial states
      filters = ('showempty','showused','all_labels')
      showcoinaddrs = True
      showempty = True
      showused = 1 # tristate: 0:no, 1:yes, 2:all
      all_labels = False
  class display_type(TwView.display_type):
      # Column sets for the two layouts. Both name 'gen_display' as their
      # format method; 'detail' has 'block' and 'date_time' columns in place
      # of the single 'date' column of 'squeezed'.

      class squeezed(TwView.display_type.squeezed):
          fmt_method = 'gen_display'
          cols = ('num','mmid','used','addr','comment','amt','date')

      class detail(TwView.display_type.detail):
          fmt_method = 'gen_display'
          cols = ('num','mmid','used','addr','comment','amt','block','date_time')
  class TwAddress(MMGenListItem):
      # A single address entry of the list. 'proto' is written straight into
      # the instance dict by __init__() and is listed in invalid_attrs rather
      # than valid_attrs.
      valid_attrs = {'twmmid','addr','al_id','confs','comment','amt','recvd','date','skip'}
      invalid_attrs = {'proto'}

      # attributes that cannot be reassigned
      twmmid = ImmutableAttr(TwMMGenID,include_proto=True) # contains confs,txid(unused),date(unused),al_id
      addr = ImmutableAttr(CoinAddr,include_proto=True)
      al_id = ImmutableAttr(str) # set to '_' for non-MMGen addresses
      confs = ImmutableAttr(int,typeconv=False)
      amt = ImmutableAttr(None)
      recvd = ImmutableAttr(None)

      # attributes that may be reassigned
      comment = ListItemAttr(TwComment,reassign_ok=True)
      date = ListItemAttr(int,typeconv=False,reassign_ok=True)
      skip = ListItemAttr(str,typeconv=False,reassign_ok=True)

      def __init__(self,proto,**kwargs):
          # 'proto' must be in place before the base initializer runs: the
          # conv_funcs below read instance.proto
          vars(self)['proto'] = proto
          MMGenListItem.__init__(self,**kwargs)

      class conv_funcs:
          # converters for 'amt' and 'recvd': both wrap the value in the
          # protocol's coin amount type

          @staticmethod
          def amt(instance,value):
              coin_amt = instance.proto.coin_amt
              return coin_amt(value)

          @staticmethod
          def recvd(instance,value):
              coin_amt = instance.proto.coin_amt
              return coin_amt(value)
  @property
  def coinaddr_list(self):
      """
      Return the 'addr' attribute of every entry in self.data, in order, as a list
      """
      return [entry.addr for entry in self.data]
  async def __init__(self,cfg,proto,minconf=1,mmgen_addrs='',get_data=False):
      """
      Initialize the view.

      minconf:     stored as a NonNegativeInt in self.minconf
      mmgen_addrs: optional '<seed ID>:[<type>:]<idx list>' string; when given,
                   it is expanded into a list of MMGenIDs in self.usr_addr_list
                   (an empty list otherwise)
      get_data:    when true, self.get_data() is awaited before returning
      """
      await super().__init__(cfg,proto)

      self.minconf = NonNegativeInt(minconf)

      if mmgen_addrs:
          # split at the last colon: everything before it is the address list ID,
          # everything after it the index list
          al_id,sep,idx_spec = mmgen_addrs.rpartition(':')
          if not sep:
              from ..util import die
              die(1,
                  f'{mmgen_addrs}: invalid address list argument ' +
                  '(must be in form <seed ID>:[<type>:]<idx list>)' )
          from ..addrlist import AddrIdxList
          usr_addrs = [MMGenID(self.proto,f'{al_id}:{i}') for i in AddrIdxList(idx_spec)]
      else:
          usr_addrs = []

      self.usr_addr_list = usr_addrs

      if get_data:
          await self.get_data()
  @property
  def no_rpcdata_errmsg(self):
      """
      Error message for an empty result; mentions the confirmation count
      only when self.minconf is nonzero
      """
      qualifier = f'with {self.minconf} confirmations ' if self.minconf else ''
      return f'No addresses {qualifier}found!'
  async def gen_data(self,rpc_data,lbl_id):
      """
      Return a generator producing one TwAddress per (twmmid, data) item of rpc_data.

      'date' is initialized to 0 and 'skip' to the empty string. 'al_id' falls
      back to '_' when twmmid.obj has no 'al_id' attribute. 'lbl_id' is accepted
      but not used here.
      """
      def make_entry(twmmid,data):
          return self.TwAddress(
              self.proto,
              twmmid = twmmid,
              addr = data['addr'],
              al_id = getattr(twmmid.obj,'al_id','_'),
              confs = data['confs'],
              comment = data['lbl'].comment,
              amt = data['amt'],
              recvd = data['recvd'],
              date = 0,
              skip = '' )

      return (make_entry(twmmid,data) for twmmid,data in rpc_data.items())
  def filter_data(self):
      """
      Return a generator over the entries of self.data that are to be displayed.

      A non-empty self.usr_addr_list overrides all other filters: only entries
      whose twmmid.obj is in that list are produced. Otherwise the 'all_labels',
      'showused' and 'showempty' settings decide.
      """
      if self.usr_addr_list:
          return (d for d in self.data if d.twmmid.obj in self.usr_addr_list)

      def is_visible(d):
          if self.all_labels and d.comment:
              return True
          if self.showused == 2 and d.recvd:
              return True
          if d.recvd and not self.showused:
              # used address, and used addresses are switched off
              return False
          return d.amt or self.showempty

      return (d for d in self.data if is_visible(d))
  def get_column_widths(self,data,wide,interactive):
      """
      Collect the width parameters of all columns for 'data' and return the
      result of self.compute_column_widths() called on them.

      The 'block' and 'date_time' columns get a nonzero width only when both
      'wide' and self.has_age are true; the 'addr' column is zero-width when
      self.showcoinaddrs is false.
      """
      show_age_cols = wide and self.has_age

      fixed_cols = {
          'num': max(2,len(str(len(data)))+1),
          'mmid': max(len(d.twmmid.disp) for d in data),
          'used': 4,
          'amt': self.amt_widths['amt'],
          'date': self.age_w if self.has_age else 0,
          'block': self.age_col_params['block'][0] if show_age_cols else 0,
          'date_time': self.age_col_params['date_time'][0] if show_age_cols else 0,
          'spc': 7, # 6 spaces between cols + 1 leading space in fs
      }

      expandable_max = {
          'addr': max(len(d.addr) for d in data) if self.showcoinaddrs else 0,
          'comment': max(d.comment.screen_width for d in data),
      }

      expandable_min = {
          'addr': 12 if self.showcoinaddrs else 0,
          'comment': len('Comment'),
      }

      return self.compute_column_widths(
          widths = fixed_cols,
          maxws = expandable_max,
          minws = expandable_min,
          maxws_nice = {'addr': 18},
          wide = wide,
          interactive = interactive,
      )
  def gen_subheader(self,cw,color):
      """
      Yield a single line stating the minimum confirmation count, or nothing
      at all when self.minconf is zero. 'cw' and 'color' are not used.
      """
      if not self.minconf:
          return
      confs = self.minconf
      yield f'Displaying balances with at least {confs} confirmation{suf(confs)}'
  def squeezed_col_hdr(self,cw,fs,color):
      """
      Return the column header line of the 'squeezed' layout: the format
      string 'fs' filled in with the column titles. The title of the age
      column comes from self.age_hdr. 'cw' and 'color' are not used.
      """
      titles = {
          'n': '',
          'm': 'MMGenID',
          'u': 'Used',
          'a': 'Address',
          'c': 'Comment',
          'A': 'Balance',
          'd': self.age_hdr,
      }
      return fs.format(**titles)
  def detail_col_hdr(self,cw,fs,color):
      """
      Return the column header line of the 'detail' layout: the format
      string 'fs' filled in with the column titles. 'cw' and 'color' are
      not used.
      """
      titles = {
          'n': '',
          'm': 'MMGenID',
          'u': 'Used',
          'a': 'Address',
          'c': 'Comment',
          'A': 'Balance',
          'b': 'Block',
          'D': 'Date/Time',
      }
      return fs.format(**titles)
  def squeezed_format_line(self,n,d,cw,fs,color,yes,no):
      """
      Return one line of the 'squeezed' layout for entry 'd', numbered 'n'.

      'yes' / 'no' are the strings shown in the 'used' column, selected by the
      truth value of d.recvd. The age column is rendered by self.age_disp()
      using self.age_fmt.
      """
      fields = {
          'n': str(n) + ')',
          'm': d.twmmid.fmt( width=cw.mmid, color=color ),
          'u': yes if d.recvd else no,
          'a': d.addr.fmt( color=color, width=cw.addr ),
          'c': d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ),
          'A': d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ),
          'd': self.age_disp( d, self.age_fmt ),
      }
      return fs.format(**fields)
  def detail_format_line(self,n,d,cw,fs,color,yes,no):
      """
      Return one line of the 'detail' layout for entry 'd', numbered 'n'.

      'yes' / 'no' are the strings shown in the 'used' column, selected by the
      truth value of d.recvd. The block and date/time columns are rendered by
      self.age_disp() with the 'block' and 'date_time' formats respectively.
      """
      fields = {
          'n': str(n) + ')',
          'm': d.twmmid.fmt( width=cw.mmid, color=color ),
          'u': yes if d.recvd else no,
          'a': d.addr.fmt( color=color, width=cw.addr ),
          'c': d.comment.fmt2( width=cw.comment, color=color, nullrepl='-' ),
          'A': d.amt.fmt( color=color, iwidth=cw.iwidth, prec=self.disp_prec ),
          'b': self.age_disp( d, 'block' ),
          'D': self.age_disp( d, 'date_time' ),
      }
      return fs.format(**fields)
  def gen_display(self,data,cw,fs,color,fmt_method):
      """
      Yield the display lines for 'data', one per entry, formatted by 'fmt_method'.

      A blank line padded to self.term_width is emitted each time the al_id
      changes from one entry to the next, visually separating address lists.
      Entries are numbered from 1. Yields nothing when 'data' is empty.
      """
      if not data:
          # nothing to display: avoid indexing data[0] below
          return

      yes,no = (red('Yes '),green('No ')) if color else ('Yes ','No ')
      id_save = data[0].al_id

      for n,d in enumerate(data,1):
          if id_save != d.al_id:
              id_save = d.al_id
              yield ''.ljust(self.term_width)
          yield fmt_method(n,d,cw,fs,color,yes,no)
  async def set_dates(self,addrs):
      """
      Set the 'date' attribute of every entry in 'addrs' that has confirmations,
      using the 'time' field of the header of the block at height
      (blockcount + 1 - confs).

      Does nothing when self.dates_set is already true; sets it to True when done.
      """
      if self.dates_set:
          return

      next_height = self.rpc.blockcount + 1
      confirmed = [addr for addr in addrs if addr.confs]

      heights = [next_height - a.confs for a in confirmed]
      hashes = await self.rpc.gathered_call('getblockhash',[(n,) for n in heights])

      headers = await self.rpc.gathered_call('getblockheader',[(h,) for h in hashes])
      dates = [hdr['time'] for hdr in headers]

      for idx,addr in enumerate(confirmed):
          addr.date = dates[idx]

      self.dates_set = True
  # display names of the sort modes
  sort_disp = dict(
      age    = 'AddrListID+Age',
      amt    = 'AddrListID+Amt',
      twmmid = 'MMGenID',
  )

  # sort key functions: 'age' and 'amt' are prefixed with the al_id
  sort_funcs = dict(
      age = lambda d: (
          f'{d.al_id}_'
          # Hack, but OK for the foreseeable future:
          + (f'{1_000_000_000 - d.confs:>012}' if d.confs else '_')
          + f'_{d.twmmid.sort_key}'),
      amt = lambda d: '{}_{}'.format(d.al_id,d.amt),
      twmmid = lambda d: d.twmmid.sort_key,
  )
  @property
  def dump_fn_pfx(self):
      """
      Filename prefix: 'listaddresses', with '-minconf-<N>' appended when
      self.minconf is nonzero
      """
      pfx = 'listaddresses'
      if self.minconf:
          pfx += f'-minconf-{self.minconf}'
      return pfx
  @property
  def sid_ranges(self):
      """
      Map each Seed ID to the (bot, top) index range (inclusive) that its
      entries occupy in self.data.

      Only the leading run of entries with twmmid.type == 'mmgen' is scanned;
      scanning stops at the first entry of any other type. The data must be
      sorted by 'twmmid' and not reversed (asserted). The result is computed
      once and cached in self._sid_ranges. An empty self.data gives an empty
      dict.
      """
      def gen_sid_ranges():

          from collections import namedtuple
          sid_range = namedtuple('sid_range',['bot','top'])

          sid_save = None
          bot = None
          # Number of leading MMGen entries scanned so far. Tracked explicitly
          # instead of relying on the loop variable, which is unbound when
          # self.data is empty.
          end = 0

          for n,e in enumerate(self.data):
              if e.twmmid.type != 'mmgen':
                  break
              sid = e.twmmid.obj.sid
              if sid != sid_save:
                  if sid_save:
                      # close the range of the previous Seed ID
                      yield (sid_save, sid_range(bot, n-1))
                  sid_save = sid
                  bot = n
              end = n + 1

          if sid_save:
              # close the range of the last Seed ID
              yield (sid_save, sid_range(bot, end-1))

      assert self.sort_key == 'twmmid'
      assert self.reverse is False

      if not hasattr(self,'_sid_ranges'):
          self._sid_ranges = dict(gen_sid_ranges())

      return self._sid_ranges
  def is_used(self,coinaddr):
      """
      Return whether the first entry in self.data with address 'coinaddr' has
      a true 'recvd' value, or None if the address is not in the tracking wallet
      """
      match = next((e for e in self.data if e.addr == coinaddr), None)
      if match is None: # addr not in tracking wallet
          return None
      return bool(match.recvd)
  def get_change_address(self,al_id,bot=None,top=None):
      """
      Get lowest-indexed unused address in tracking wallet for requested AddrListID.

      'bot' and 'top' optionally restrict the search for the first entry of the
      address list to the given (inclusive) index range of self.data; by default
      all of self.data is searched. The data must be sorted by 'twmmid' and not
      reversed (asserted).

      An address with a label is skipped unless cfg.autochg_ignore_labels is
      set, in which case a warning is printed when it is chosen.

      Return values on failure:
          None:  no addresses in wallet with requested AddrListID
          False: no unused addresses in wallet with requested AddrListID
      """

      def get_start(bot,top):
          """
          bisecting algorithm to find first entry with requested al_id

          Since 'btc' > 'F' and pre_target sorts below the first twmmid of the al_id
          stringwise, we can just search on raw twmmids.
          """
          if bot > top:
              # empty search range (e.g. no data at all): nothing to find
              return None

          pre_target = al_id + ':0'

          while bot < top:
              # always probe inside [bot,top], whatever the starting value of bot
              n = (top + bot) >> 1
              if data[n].twmmid < pre_target:
                  bot = n + 1
              else:
                  top = n

          return bot if data[bot].al_id == al_id else None

      assert self.sort_key == 'twmmid'
      assert self.reverse is False

      data = self.data

      start = get_start(
          bot = 0 if bot is None else bot,
          top = len(data) - 1 if top is None else top )

      if start is None:
          return None

      for d in data[start:]:
          if d.al_id != al_id:
              # past the end of the requested address list
              break
          if not d.recvd and (self.cfg.autochg_ignore_labels or not d.comment):
              if d.comment:
                  msg('{} {} {} {}{}'.format(
                      yellow('WARNING: address'),
                      d.twmmid.hl(),
                      yellow('has a label,'),
                      d.comment.hl2(encl='‘’'),
                      yellow(',\n but allowing it for change anyway by user request')
                  ))
              return d

      return False
  def get_change_address_by_addrtype(self,mmtype):
      """
      Find the lowest-indexed change addresses in tracking wallet of given address type,
      present them in a menu and return a single change address chosen by the user.
      If only one candidate is found, it is returned without prompting.

      Return values on failure:
          None:  no addresses in wallet of requested address type
          False: no unused addresses in wallet of requested address type
      """

      def choose_address(addrs):
          # prompt repeatedly until the user enters a valid menu number

          menu = '\n'.join(
              '{a:3}) {b}{c}'.format(
                  a = n,
                  b = d.twmmid.hl(),
                  c = yellow(' <== has a label!') if d.comment else ''
              ) for n,d in enumerate(addrs,1)
          )
          prompt = '\nChoose a change address:\n\n{}\n\nEnter a number> '.format(menu)

          from ..ui import line_input
          while True:
              res = line_input( self.cfg, prompt )
              if is_int(res) and 0 < int(res) <= len(addrs):
                  return addrs[int(res)-1]
              msg(f'{res}: invalid entry')

      assert isinstance(mmtype,MMGenAddrType)

      # one lookup result per Seed ID in the wallet: an address, False or None
      candidates = [
          self.get_change_address( f'{sid}:{mmtype}', r.bot, r.top )
              for sid,r in self.sid_ranges.items()]

      usable = [c for c in candidates if c]

      if usable:
          return usable[0] if len(usable) == 1 else choose_address(usable)

      if False in candidates:
          return False

      return None
  class display_action(TwView.display_action):
      # handlers for the display filters; each one updates a setting on 'parent'

      def d_showempty(self,parent):
          # toggle the 'showempty' setting
          shown = parent.showempty
          parent.showempty = not shown

      def d_showused(self,parent):
          # cycle through the tristate values: 0 -> 1 -> 2 -> 0
          nxt = parent.showused + 1
          parent.showused = nxt % 3

      def d_all_labels(self,parent):
          # toggle the 'all_labels' setting
          shown = parent.all_labels
          parent.all_labels = not shown
  325. parent.all_labels = not parent.all_labels