addresses.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tw.addresses: Tracking wallet listaddresses class for the MMGen suite
  12. """
  13. from ..util import msg, is_int, die
  14. from ..obj import MMGenListItem, ImmutableAttr, ListItemAttr, TwComment, NonNegativeInt
  15. from ..addr import CoinAddr, MMGenID, MMGenAddrType
  16. from ..amt import CoinAmtChk
  17. from ..color import red, green, yellow
  18. from .view import TwView
  19. from .shared import TwMMGenID
class TwAddresses(TwView):
    """
    Tracking wallet address list view.

    The class-level attributes below configure the view's labels, its default
    sort and filter state, the menu prompt and the key bindings.
    """

    # Descriptive strings for the view and for the items it lists
    hdr_lbl = 'tracking wallet addresses'
    desc = 'address list'
    item_desc = 'address'
    item_desc_pl = 'addresses'

    sort_key = 'twmmid'  # initial sort key; the available keys are those of sort_funcs
    update_widths_on_age_toggle = True
    print_output_types = ('detail',)

    # Names of the filter attributes, followed by the default display/filter state
    filters = ('showempty', 'showused', 'all_labels')
    showcoinaddrs = True
    showempty = True
    showused = 1 # tristate: 0: no, 1: yes, 2: all
    all_labels = False

    mod_subpath = 'tw.addresses'

    # When false, the corresponding columns get zero width (see __init__ and get_column_widths)
    has_age = False
    has_used = False

    # Menu prompt lines.  Bracketed letters are the keys bound in key_mappings and
    # extra_key_mappings.  NOTE(review): the '{s}' placeholder is presumably filled
    # in by the parent class - confirm against TwView.
    prompt_fs_in = [
        'Sort options: [a]mt, [M]mgen addr, [r]everse',
        'Filters: show [E]mpty addrs, show all [L]abels',
        'View/Print: pager [v]iew, [w]ide pager view, [p]rint, r[e]draw{s}',
        'Actions: [q]uit menu, [D]elete addr, add [l]abel, [R]efresh balance:']

    # Key -> action name.  NOTE(review): judging by the names, the prefix selects the
    # action group (s_: sort, d_: display, i_: item, a_: view/print) - confirm against TwView.
    key_mappings = {
        'a':'s_amt',
        'M':'s_twmmid',
        'r':'s_reverse',
        'e':'d_redraw',
        'E':'d_showempty',
        'L':'d_all_labels',
        'l':'i_comment_add',
        'D':'i_addr_delete',
        'v':'a_view',
        'w':'a_view_detail',
        'p':'a_print_detail'}

    # Additional binding kept separate from key_mappings
    # (presumably enabled conditionally by the parent class - TODO confirm)
    extra_key_mappings = {
        'R':'i_balance_refresh'}
    class display_type(TwView.display_type):
        # Per-display-type settings: the columns shown, and the name of the method
        # that generates the display lines (gen_display for both types).

        class squeezed(TwView.display_type.squeezed):
            # compact view: single 'date' column for the age
            cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'date')
            fmt_method = 'gen_display'

        class detail(TwView.display_type.detail):
            # detailed view: separate 'block' and 'date_time' columns
            cols = ('num', 'mmid', 'used', 'addr', 'comment', 'amt', 'block', 'date_time')
            fmt_method = 'gen_display'
    class TwAddress(MMGenListItem):
        """
        A single entry of the address list.

        Attribute types are enforced by the ImmutableAttr / ListItemAttr
        descriptors declared below.
        """

        # Attributes that may be passed as keyword arguments to the constructor
        valid_attrs = {
            'twmmid',
            'addr',
            'al_id',
            'confs',
            'comment',
            'amt',
            'recvd',
            'is_used',
            'date',
            'skip'}

        # 'proto' is not a regular attribute: it is stored directly by __init__
        invalid_attrs = {'proto'}

        twmmid = ImmutableAttr(TwMMGenID, include_proto=True) # contains confs,txid(unused),date(unused),al_id
        addr = ImmutableAttr(CoinAddr, include_proto=True)
        al_id = ImmutableAttr(str) # set to '_' for non-MMGen addresses
        confs = ImmutableAttr(int, typeconv=False)
        comment = ListItemAttr(TwComment, reassign_ok=True)
        amt = ImmutableAttr(CoinAmtChk, include_proto=True)
        recvd = ImmutableAttr(CoinAmtChk, include_proto=True)
        is_used = ImmutableAttr(bool)
        date = ListItemAttr(int, typeconv=False, reassign_ok=True)
        skip = ListItemAttr(str, typeconv=False, reassign_ok=True)

        def __init__(self, proto, **kwargs):
            """Store proto, then hand the remaining keyword arguments to MMGenListItem."""
            # Written straight into the instance dict, before the base initializer runs
            # (presumably because the include_proto descriptors need it - TODO confirm)
            self.__dict__['proto'] = proto
            MMGenListItem.__init__(self, **kwargs)
  88. @property
  89. def coinaddr_list(self):
  90. return [d.addr for d in self.data]
  91. async def __init__(self, cfg, proto, *, minconf=1, mmgen_addrs='', get_data=False):
  92. await super().__init__(cfg, proto)
  93. self.minconf = NonNegativeInt(minconf)
  94. self.spc_w = 5 + self.has_age + self.has_used # 1 space between cols + 1 leading space in fs
  95. self.used_w = 4 if self.has_used else 0
  96. if mmgen_addrs:
  97. match mmgen_addrs.rsplit(':', 1):
  98. case [mmid, fmt_str]:
  99. from ..addrlist import AddrIdxList
  100. self.usr_addr_list = [
  101. MMGenID(self.proto, f'{mmid}:{i}') for i in AddrIdxList(fmt_str=fmt_str)]
  102. case _:
  103. die(1,
  104. f'{mmgen_addrs}: invalid address list argument ' +
  105. '(must be in form <seed ID>:[<type>:]<idx list>)')
  106. else:
  107. self.usr_addr_list = []
  108. if get_data:
  109. await self.get_data()
  110. @property
  111. def no_rpcdata_errmsg(self):
  112. return 'No addresses {}found!'.format(
  113. f'with {self.minconf} confirmations ' if self.minconf else '')
  114. async def get_rpc_data(self):
  115. self.total = self.proto.coin_amt('0')
  116. addrs = {}
  117. used_addrs = self.twctl.used_addrs
  118. minconf = int(self.minconf)
  119. block = self.twctl.rpc.get_block_from_minconf(minconf)
  120. for e in await self.twctl.get_label_addr_pairs():
  121. bal = await self.twctl.get_balance(e.coinaddr, block=block)
  122. addrs[e.label.mmid] = {
  123. 'addr': e.coinaddr,
  124. 'amt': bal,
  125. 'recvd': bal, # current bal only, CF btc.tw.addresses.get_rpc_data()
  126. 'is_used': bool(bal) or e.coinaddr in used_addrs,
  127. 'confs': minconf,
  128. 'lbl': e.label}
  129. self.total += bal
  130. return addrs
  131. def gen_data(self, rpc_data, lbl_id):
  132. return (
  133. self.TwAddress(
  134. self.proto,
  135. twmmid = twmmid,
  136. addr = data['addr'],
  137. al_id = getattr(twmmid.obj, 'al_id', '_'),
  138. confs = data['confs'],
  139. comment = data['lbl'].comment,
  140. amt = data['amt'],
  141. recvd = data['recvd'],
  142. is_used = data['is_used'],
  143. date = 0,
  144. skip = '')
  145. for twmmid, data in rpc_data.items())
  146. def filter_data(self):
  147. if self.usr_addr_list:
  148. return (d for d in self.data if d.twmmid.obj in self.usr_addr_list)
  149. else:
  150. return (d for d in self.data if
  151. (self.all_labels and d.comment) or
  152. (self.showused == 2 and d.is_used) or
  153. (not (d.is_used and not self.showused) and (d.amt or self.showempty)))
  154. def get_column_widths(self, data, *, wide, interactive):
  155. return self.compute_column_widths(
  156. widths = { # fixed cols
  157. 'num': max(2, len(str(len(data)))+1),
  158. 'mmid': max(len(d.twmmid.disp) for d in data),
  159. 'used': self.used_w,
  160. 'amt': self.amt_widths['amt'],
  161. 'date': self.age_w if self.has_age else 0,
  162. 'block': self.age_col_params['block'][0] if wide and self.has_age else 0,
  163. 'date_time': self.age_col_params['date_time'][0] if wide and self.has_age else 0,
  164. 'spc': self.spc_w},
  165. maxws = { # expandable cols
  166. 'addr': max(len(d.addr) for d in data) if self.showcoinaddrs else 0,
  167. 'comment': max(d.comment.screen_width for d in data)},
  168. minws = {
  169. 'addr': 12 if self.showcoinaddrs else 0,
  170. 'comment': len('Comment')},
  171. maxws_nice = {'addr': 18},
  172. wide = wide,
  173. interactive = interactive)
  174. def squeezed_col_hdr(self, cw, fs, color):
  175. return fs.format(
  176. n = '',
  177. m = 'MMGenID',
  178. u = 'Used',
  179. a = 'Address',
  180. c = 'Comment',
  181. A = 'Balance',
  182. d = self.age_hdr)
  183. def detail_col_hdr(self, cw, fs, color):
  184. return fs.format(
  185. n = '',
  186. m = 'MMGenID',
  187. u = 'Used',
  188. a = 'Address',
  189. c = 'Comment',
  190. A = 'Balance',
  191. b = 'Block',
  192. D = 'Date/Time')
  193. def squeezed_format_line(self, n, d, cw, fs, color, yes, no):
  194. return fs.format(
  195. n = str(n) + ')',
  196. m = d.twmmid.fmt(cw.mmid, color=color),
  197. u = yes if d.is_used else no,
  198. a = d.addr.fmt(self.addr_view_pref, cw.addr, color=color),
  199. c = d.comment.fmt2(cw.comment, color=color, nullrepl='-'),
  200. A = d.amt.fmt(cw.iwidth, color=color, prec=self.disp_prec),
  201. d = self.age_disp(d, self.age_fmt)
  202. )
  203. def detail_format_line(self, n, d, cw, fs, color, yes, no):
  204. return fs.format(
  205. n = str(n) + ')',
  206. m = d.twmmid.fmt(cw.mmid, color=color),
  207. u = yes if d.is_used else no,
  208. a = d.addr.fmt(self.addr_view_pref, cw.addr, color=color),
  209. c = d.comment.fmt2(cw.comment, color=color, nullrepl='-'),
  210. A = d.amt.fmt(cw.iwidth, color=color, prec=self.disp_prec),
  211. b = self.age_disp(d, 'block'),
  212. D = self.age_disp(d, 'date_time'))
  213. def gen_display(self, data, cw, fs, color, fmt_method):
  214. yes, no = (red('Yes '), green('No ')) if color else ('Yes ', 'No ')
  215. id_save = data[0].al_id
  216. for n, d in enumerate(data, 1):
  217. if id_save != d.al_id:
  218. id_save = d.al_id
  219. yield ''.ljust(self.term_width)
  220. yield fmt_method(n, d, cw, fs, color, yes, no)
  221. async def set_dates(self, addrs):
  222. if not self.dates_set:
  223. bc = self.rpc.blockcount + 1
  224. caddrs = [addr for addr in addrs if addr.confs]
  225. hashes = await self.rpc.gathered_call(
  226. 'getblockhash',
  227. [(n,) for n in [bc - a.confs for a in caddrs]])
  228. dates = [d['time']
  229. for d in await self.rpc.gathered_call(
  230. 'getblockheader',
  231. [(h,) for h in hashes])]
  232. for idx, addr in enumerate(caddrs):
  233. addr.date = dates[idx]
  234. self.dates_set = True
  235. sort_disp = {
  236. 'age': 'AddrListID+Age',
  237. 'amt': 'AddrListID+Amt',
  238. 'twmmid': 'MMGenID'}
  239. sort_funcs = {
  240. 'age': lambda d: '{}_{}_{}'.format(
  241. d.al_id,
  242. # Hack, but OK for the foreseeable future:
  243. ('{:>012}'.format(1_000_000_000 - d.confs) if d.confs else '_'),
  244. d.twmmid.sort_key),
  245. 'amt': lambda d: f'{d.al_id}_{d.amt}',
  246. 'twmmid': lambda d: d.twmmid.sort_key}
  247. @property
  248. def dump_fn_pfx(self):
  249. return 'listaddresses' + (f'-minconf-{self.minconf}' if self.minconf else '')
  250. @property
  251. def sid_ranges(self):
  252. def gen_sid_ranges():
  253. from collections import namedtuple
  254. sid_range = namedtuple('sid_range', ['bot', 'top'])
  255. sid_save = None
  256. bot = None
  257. for n, e in enumerate(self.data):
  258. if e.twmmid.type == 'mmgen':
  259. if e.twmmid.obj.sid != sid_save:
  260. if sid_save:
  261. yield (sid_save, sid_range(bot, n-1))
  262. sid_save = e.twmmid.obj.sid
  263. bot = n
  264. else:
  265. break
  266. else:
  267. n += 1
  268. if sid_save:
  269. yield (sid_save, sid_range(bot, n-1))
  270. assert self.sort_key == 'twmmid'
  271. assert self.reverse is False
  272. if not hasattr(self, '_sid_ranges'):
  273. self._sid_ranges = dict(gen_sid_ranges())
  274. return self._sid_ranges
  275. def is_used(self, coinaddr):
  276. for e in self.data:
  277. if e.addr == coinaddr:
  278. return e.is_used
  279. return None # addr not in tracking wallet
  280. def get_change_address(self, al_id, *, bot=None, top=None, exclude=None, desc=None):
  281. """
  282. Get lowest-indexed unused address in tracking wallet for requested AddrListID.
  283. Return values on failure:
  284. None: no addresses in wallet with requested AddrListID
  285. False: no unused addresses in wallet with requested AddrListID
  286. """
  287. def get_start(bot, top):
  288. """
  289. bisecting algorithm to find first entry with requested al_id
  290. Since 'btc' > 'F' and pre_target sorts below the first twmmid of the al_id
  291. stringwise, we can just search on raw twmmids.
  292. """
  293. pre_target = al_id + ':0'
  294. n = top >> 1
  295. while True:
  296. if bot == top:
  297. return bot if data[bot].al_id == al_id else None
  298. if data[n].twmmid < pre_target:
  299. bot = n + 1
  300. else:
  301. top = n
  302. n = (top + bot) >> 1
  303. assert self.sort_key == 'twmmid'
  304. assert self.reverse is False
  305. data = self.data
  306. start = get_start(
  307. bot = 0 if bot is None else bot,
  308. top = len(data) - 1 if top is None else top)
  309. if start is not None:
  310. for d in data[start:]:
  311. if d.al_id == al_id:
  312. if (
  313. not d.is_used
  314. and not d.twmmid in exclude
  315. and (self.cfg.autochg_ignore_labels or not d.comment)
  316. ):
  317. if d.comment:
  318. msg('{} {} {} {}{}'.format(
  319. yellow('WARNING: address'),
  320. d.twmmid.hl(),
  321. yellow('has a label,'),
  322. d.comment.hl2(encl='‘’'),
  323. yellow(f',\n but allowing it for {desc} anyway by user request')
  324. ))
  325. return d
  326. else:
  327. break
  328. return False
  329. def get_change_address_by_addrtype(self, mmtype, *, exclude, desc):
  330. """
  331. Find the lowest-indexed change addresses in tracking wallet of given address type,
  332. present them in a menu and return a single change address chosen by the user.
  333. If mmtype is None, search all preferred_mmtypes in tracking wallet
  334. Return values on failure:
  335. None: no addresses in wallet of requested address type
  336. False: no unused addresses in wallet of requested address type
  337. """
  338. def choose_address(addrs):
  339. def format_line(n, d):
  340. return '{a:3}) {b}{c}'.format(
  341. a = n,
  342. b = d.twmmid.hl(),
  343. c = yellow(' <== has a label!') if d.comment else ''
  344. )
  345. prompt = '\nChoose a {desc}:\n\n{items}\n\nEnter a number> '.format(
  346. desc = desc,
  347. items = '\n'.join(format_line(n, d) for n, d in enumerate(addrs, 1)))
  348. from ..ui import line_input
  349. while True:
  350. res = line_input(self.cfg, prompt)
  351. if is_int(res) and 0 < int(res) <= len(addrs):
  352. return addrs[int(res)-1]
  353. msg(f'{res}: invalid entry')
  354. def get_addr(mmtype):
  355. return [self.get_change_address(
  356. f'{sid}:{mmtype}', bot=r.bot, top=r.top, exclude=exclude, desc=desc)
  357. for sid, r in self.sid_ranges.items()]
  358. assert isinstance(mmtype, type(None) | MMGenAddrType)
  359. if mmtype:
  360. res = get_addr(mmtype)
  361. else:
  362. have_used = False
  363. for mmtype in self.proto.preferred_mmtypes:
  364. res = get_addr(mmtype)
  365. if any(res):
  366. break
  367. if False in res:
  368. have_used = True
  369. else:
  370. return False if have_used else None
  371. if any(res):
  372. res = list(filter(None, res))
  373. if len(res) == 1:
  374. return res[0]
  375. else:
  376. return choose_address(res)
  377. elif False in res:
  378. return False
  379. class display_action(TwView.display_action):
  380. def d_showempty(self, parent):
  381. parent.showempty = not parent.showempty
  382. def d_showused(self, parent):
  383. parent.showused = (parent.showused + 1) % 3
  384. def d_all_labels(self, parent):
  385. parent.all_labels = not parent.all_labels