addresses.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tw.addresses: Tracking wallet listaddresses class for the MMGen suite
  12. """
  13. from ..util import msg, is_int, die
  14. from ..obj import MMGenListItem, ImmutableAttr, ListItemAttr, TwComment, NonNegativeInt
  15. from ..addr import CoinAddr, MMGenID, MMGenAddrType
  16. from ..amt import CoinAmtChk
  17. from ..color import red, green, yellow
  18. from .view import TwView
  19. from .shared import TwMMGenID
  20. class TwAddresses(TwView):
  21. hdr_lbl = 'tracking wallet addresses'
  22. desc = 'address list'
  23. item_desc = 'address'
  24. item_desc_pl = 'addresses'
  25. sort_key = 'twmmid'
  26. update_widths_on_age_toggle = True
  27. print_output_types = ('detail',)
  28. filters = ('showempty', 'showused', 'all_labels')
  29. showcoinaddrs = True
  30. showempty = True
  31. showused = 1 # tristate: 0: no, 1: yes, 2: all
  32. all_labels = False
  33. mod_subpath = 'tw.addresses'
  34. has_age = False
  35. has_used = False
  36. prompt_fs_in = [
  37. 'Sort options: [a]mt, [M]mgen addr, [r]everse',
  38. 'Filters: show [E]mpty addrs, show all [L]abels',
  39. 'View/Print: pager [v]iew, [w]ide pager view, [p]rint, r[e]draw{s}',
  40. 'Actions: [q]uit menu, [D]elete addr, add [l]abel, [R]efresh balance:']
  41. key_mappings = {
  42. 'a':'s_amt',
  43. 'M':'s_twmmid',
  44. 'r':'s_reverse',
  45. 'e':'d_redraw',
  46. 'E':'d_showempty',
  47. 'L':'d_all_labels',
  48. 'l':'i_comment_add',
  49. 'D':'i_addr_delete',
  50. 'v':'a_view',
  51. 'w':'a_view_detail',
  52. 'p':'a_print_detail'}
  53. extra_key_mappings = {
  54. 'R':'i_balance_refresh'}
  55. class display_type(TwView.display_type):
  56. class squeezed(TwView.display_type.squeezed):
  57. cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'date')
  58. fmt_method = 'gen_display'
  59. class detail(TwView.display_type.detail):
  60. cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'block', 'date_time')
  61. fmt_method = 'gen_display'
  62. class TwAddress(MMGenListItem):
  63. valid_attrs = {
  64. 'twmmid',
  65. 'addr',
  66. 'al_id',
  67. 'confs',
  68. 'comment',
  69. 'amt',
  70. 'recvd',
  71. 'is_used',
  72. 'date',
  73. 'skip'}
  74. invalid_attrs = {'proto'}
  75. twmmid = ImmutableAttr(TwMMGenID, include_proto=True) # contains confs,txid(unused),date(unused),al_id
  76. addr = ImmutableAttr(CoinAddr, include_proto=True)
  77. al_id = ImmutableAttr(str) # set to '_' for non-MMGen addresses
  78. confs = ImmutableAttr(int, typeconv=False)
  79. comment = ListItemAttr(TwComment, reassign_ok=True)
  80. amt = ImmutableAttr(CoinAmtChk, include_proto=True)
  81. recvd = ImmutableAttr(CoinAmtChk, include_proto=True)
  82. is_used = ImmutableAttr(bool)
  83. date = ListItemAttr(int, typeconv=False, reassign_ok=True)
  84. skip = ListItemAttr(str, typeconv=False, reassign_ok=True)
  85. def __init__(self, proto, **kwargs):
  86. self.__dict__['proto'] = proto
  87. MMGenListItem.__init__(self, **kwargs)
  88. @property
  89. def coinaddr_list(self):
  90. return [d.addr for d in self.data]
  91. async def __init__(self, cfg, proto, *, minconf=1, mmgen_addrs='', get_data=False):
  92. await super().__init__(cfg, proto)
  93. self.minconf = NonNegativeInt(minconf)
  94. self.spc_w = 5 + self.has_age + self.has_used # 1 space between cols + 1 leading space in fs
  95. self.used_w = 4 if self.has_used else 0
  96. if mmgen_addrs:
  97. match mmgen_addrs.rsplit(':', 1):
  98. case [mmid, fmt_str]:
  99. from ..addrlist import AddrIdxList
  100. self.usr_addr_list = [
  101. MMGenID(self.proto, f'{mmid}:{i}') for i in AddrIdxList(fmt_str=fmt_str)]
  102. case _:
  103. die(1,
  104. f'{mmgen_addrs}: invalid address list argument ' +
  105. '(must be in form <seed ID>:[<type>:]<idx list>)')
  106. else:
  107. self.usr_addr_list = []
  108. if get_data:
  109. await self.get_data()
  110. @property
  111. def no_rpcdata_errmsg(self):
  112. return 'No addresses {}found!'.format(
  113. f'with {self.minconf} confirmations ' if self.minconf else '')
  114. async def get_rpc_data(self):
  115. self.total = self.proto.coin_amt('0')
  116. addrs = {}
  117. used_addrs = self.twctl.used_addrs
  118. minconf = int(self.minconf)
  119. block = self.twctl.rpc.get_block_from_minconf(minconf)
  120. for e in await self.twctl.get_label_addr_pairs():
  121. bal = await self.twctl.get_balance(e.coinaddr, block=block)
  122. addrs[e.label.mmid] = {
  123. 'addr': e.coinaddr,
  124. 'amt': bal,
  125. 'recvd': bal, # current bal only, CF btc.tw.addresses.get_rpc_data()
  126. 'is_used': bool(bal) or e.coinaddr in used_addrs,
  127. 'confs': minconf,
  128. 'lbl': e.label}
  129. self.total += bal
  130. return addrs
  131. async def gen_data(self, rpc_data, lbl_id):
  132. return (
  133. self.TwAddress(
  134. self.proto,
  135. twmmid = twmmid,
  136. addr = data['addr'],
  137. al_id = getattr(twmmid.obj, 'al_id', '_'),
  138. confs = data['confs'],
  139. comment = data['lbl'].comment,
  140. amt = data['amt'],
  141. recvd = data['recvd'],
  142. is_used = data['is_used'],
  143. date = 0,
  144. skip = '')
  145. for twmmid, data in rpc_data.items()
  146. )
  147. def filter_data(self):
  148. if self.usr_addr_list:
  149. return (d for d in self.data if d.twmmid.obj in self.usr_addr_list)
  150. else:
  151. return (d for d in self.data if
  152. (self.all_labels and d.comment) or
  153. (self.showused == 2 and d.is_used) or
  154. (not (d.is_used and not self.showused) and (d.amt or self.showempty))
  155. )
  156. def get_column_widths(self, data, *, wide, interactive):
  157. return self.compute_column_widths(
  158. widths = { # fixed cols
  159. 'num': max(2, len(str(len(data)))+1),
  160. 'mmid': max(len(d.twmmid.disp) for d in data),
  161. 'used': self.used_w,
  162. 'amt': self.amt_widths['amt'],
  163. 'date': self.age_w if self.has_age else 0,
  164. 'block': self.age_col_params['block'][0] if wide and self.has_age else 0,
  165. 'date_time': self.age_col_params['date_time'][0] if wide and self.has_age else 0,
  166. 'spc': self.spc_w},
  167. maxws = { # expandable cols
  168. 'addr': max(len(d.addr) for d in data) if self.showcoinaddrs else 0,
  169. 'comment': max(d.comment.screen_width for d in data)},
  170. minws = {
  171. 'addr': 12 if self.showcoinaddrs else 0,
  172. 'comment': len('Comment')},
  173. maxws_nice = {'addr': 18},
  174. wide = wide,
  175. interactive = interactive)
  176. def squeezed_col_hdr(self, cw, fs, color):
  177. return fs.format(
  178. n = '',
  179. m = 'MMGenID',
  180. u = 'Used',
  181. a = 'Address',
  182. c = 'Comment',
  183. A = 'Balance',
  184. d = self.age_hdr)
  185. def detail_col_hdr(self, cw, fs, color):
  186. return fs.format(
  187. n = '',
  188. m = 'MMGenID',
  189. u = 'Used',
  190. a = 'Address',
  191. c = 'Comment',
  192. A = 'Balance',
  193. b = 'Block',
  194. D = 'Date/Time')
  195. def squeezed_format_line(self, n, d, cw, fs, color, yes, no):
  196. return fs.format(
  197. n = str(n) + ')',
  198. m = d.twmmid.fmt(cw.mmid, color=color),
  199. u = yes if d.is_used else no,
  200. a = d.addr.fmt(self.addr_view_pref, cw.addr, color=color),
  201. c = d.comment.fmt2(cw.comment, color=color, nullrepl='-'),
  202. A = d.amt.fmt(cw.iwidth, color=color, prec=self.disp_prec),
  203. d = self.age_disp(d, self.age_fmt)
  204. )
  205. def detail_format_line(self, n, d, cw, fs, color, yes, no):
  206. return fs.format(
  207. n = str(n) + ')',
  208. m = d.twmmid.fmt(cw.mmid, color=color),
  209. u = yes if d.is_used else no,
  210. a = d.addr.fmt(self.addr_view_pref, cw.addr, color=color),
  211. c = d.comment.fmt2(cw.comment, color=color, nullrepl='-'),
  212. A = d.amt.fmt(cw.iwidth, color=color, prec=self.disp_prec),
  213. b = self.age_disp(d, 'block'),
  214. D = self.age_disp(d, 'date_time'))
  215. def gen_display(self, data, cw, fs, color, fmt_method):
  216. yes, no = (red('Yes '), green('No ')) if color else ('Yes ', 'No ')
  217. id_save = data[0].al_id
  218. for n, d in enumerate(data, 1):
  219. if id_save != d.al_id:
  220. id_save = d.al_id
  221. yield ''.ljust(self.term_width)
  222. yield fmt_method(n, d, cw, fs, color, yes, no)
  223. async def set_dates(self, addrs):
  224. if not self.dates_set:
  225. bc = self.rpc.blockcount + 1
  226. caddrs = [addr for addr in addrs if addr.confs]
  227. hashes = await self.rpc.gathered_call('getblockhash', [(n,) for n in [bc - a.confs for a in caddrs]])
  228. dates = [d['time'] for d in await self.rpc.gathered_call('getblockheader', [(h,) for h in hashes])]
  229. for idx, addr in enumerate(caddrs):
  230. addr.date = dates[idx]
  231. self.dates_set = True
  232. sort_disp = {
  233. 'age': 'AddrListID+Age',
  234. 'amt': 'AddrListID+Amt',
  235. 'twmmid': 'MMGenID'}
  236. sort_funcs = {
  237. 'age': lambda d: '{}_{}_{}'.format(
  238. d.al_id,
  239. # Hack, but OK for the foreseeable future:
  240. ('{:>012}'.format(1_000_000_000 - d.confs) if d.confs else '_'),
  241. d.twmmid.sort_key),
  242. 'amt': lambda d: f'{d.al_id}_{d.amt}',
  243. 'twmmid': lambda d: d.twmmid.sort_key}
  244. @property
  245. def dump_fn_pfx(self):
  246. return 'listaddresses' + (f'-minconf-{self.minconf}' if self.minconf else '')
  247. @property
  248. def sid_ranges(self):
  249. def gen_sid_ranges():
  250. from collections import namedtuple
  251. sid_range = namedtuple('sid_range', ['bot', 'top'])
  252. sid_save = None
  253. bot = None
  254. for n, e in enumerate(self.data):
  255. if e.twmmid.type == 'mmgen':
  256. if e.twmmid.obj.sid != sid_save:
  257. if sid_save:
  258. yield (sid_save, sid_range(bot, n-1))
  259. sid_save = e.twmmid.obj.sid
  260. bot = n
  261. else:
  262. break
  263. else:
  264. n += 1
  265. if sid_save:
  266. yield (sid_save, sid_range(bot, n-1))
  267. assert self.sort_key == 'twmmid'
  268. assert self.reverse is False
  269. if not hasattr(self, '_sid_ranges'):
  270. self._sid_ranges = dict(gen_sid_ranges())
  271. return self._sid_ranges
  272. def is_used(self, coinaddr):
  273. for e in self.data:
  274. if e.addr == coinaddr:
  275. return e.is_used
  276. return None # addr not in tracking wallet
  277. def get_change_address(self, al_id, *, bot=None, top=None, exclude=None, desc=None):
  278. """
  279. Get lowest-indexed unused address in tracking wallet for requested AddrListID.
  280. Return values on failure:
  281. None: no addresses in wallet with requested AddrListID
  282. False: no unused addresses in wallet with requested AddrListID
  283. """
  284. def get_start(bot, top):
  285. """
  286. bisecting algorithm to find first entry with requested al_id
  287. Since 'btc' > 'F' and pre_target sorts below the first twmmid of the al_id
  288. stringwise, we can just search on raw twmmids.
  289. """
  290. pre_target = al_id + ':0'
  291. n = top >> 1
  292. while True:
  293. if bot == top:
  294. return bot if data[bot].al_id == al_id else None
  295. if data[n].twmmid < pre_target:
  296. bot = n + 1
  297. else:
  298. top = n
  299. n = (top + bot) >> 1
  300. assert self.sort_key == 'twmmid'
  301. assert self.reverse is False
  302. data = self.data
  303. start = get_start(
  304. bot = 0 if bot is None else bot,
  305. top = len(data) - 1 if top is None else top)
  306. if start is not None:
  307. for d in data[start:]:
  308. if d.al_id == al_id:
  309. if (
  310. not d.is_used
  311. and not d.twmmid in exclude
  312. and (self.cfg.autochg_ignore_labels or not d.comment)
  313. ):
  314. if d.comment:
  315. msg('{} {} {} {}{}'.format(
  316. yellow('WARNING: address'),
  317. d.twmmid.hl(),
  318. yellow('has a label,'),
  319. d.comment.hl2(encl='‘’'),
  320. yellow(f',\n but allowing it for {desc} anyway by user request')
  321. ))
  322. return d
  323. else:
  324. break
  325. return False
  326. def get_change_address_by_addrtype(self, mmtype, *, exclude, desc):
  327. """
  328. Find the lowest-indexed change addresses in tracking wallet of given address type,
  329. present them in a menu and return a single change address chosen by the user.
  330. If mmtype is None, search all preferred_mmtypes in tracking wallet
  331. Return values on failure:
  332. None: no addresses in wallet of requested address type
  333. False: no unused addresses in wallet of requested address type
  334. """
  335. def choose_address(addrs):
  336. def format_line(n, d):
  337. return '{a:3}) {b}{c}'.format(
  338. a = n,
  339. b = d.twmmid.hl(),
  340. c = yellow(' <== has a label!') if d.comment else ''
  341. )
  342. prompt = '\nChoose a {desc}:\n\n{items}\n\nEnter a number> '.format(
  343. desc = desc,
  344. items = '\n'.join(format_line(n, d) for n, d in enumerate(addrs, 1)))
  345. from ..ui import line_input
  346. while True:
  347. res = line_input(self.cfg, prompt)
  348. if is_int(res) and 0 < int(res) <= len(addrs):
  349. return addrs[int(res)-1]
  350. msg(f'{res}: invalid entry')
  351. def get_addr(mmtype):
  352. return [self.get_change_address(
  353. f'{sid}:{mmtype}', bot=r.bot, top=r.top, exclude=exclude, desc=desc)
  354. for sid, r in self.sid_ranges.items()]
  355. assert isinstance(mmtype, type(None) | MMGenAddrType)
  356. if mmtype:
  357. res = get_addr(mmtype)
  358. else:
  359. have_used = False
  360. for mmtype in self.proto.preferred_mmtypes:
  361. res = get_addr(mmtype)
  362. if any(res):
  363. break
  364. if False in res:
  365. have_used = True
  366. else:
  367. return False if have_used else None
  368. if any(res):
  369. res = list(filter(None, res))
  370. if len(res) == 1:
  371. return res[0]
  372. else:
  373. return choose_address(res)
  374. elif False in res:
  375. return False
  376. class display_action(TwView.display_action):
  377. def d_showempty(self, parent):
  378. parent.showempty = not parent.showempty
  379. def d_showused(self, parent):
  380. parent.showused = (parent.showused + 1) % 3
  381. def d_all_labels(self, parent):
  382. parent.all_labels = not parent.all_labels