ctl.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. tw.ctl: Tracking wallet control class for the MMGen suite
  20. """
  21. from collections import namedtuple
  22. from ..util import msg, msg_r, ymsg, suf, die
  23. from ..base_obj import AsyncInit
  24. from ..objmethods import MMGenObject
  25. from ..obj import TwComment, get_obj
  26. from ..addr import CoinAddr, is_mmgen_id, is_coin_addr
  27. from ..rpc import rpc_init
  28. from .shared import TwMMGenID, TwLabel
  29. twmmid_addr_pair = namedtuple('addr_info', ['twmmid', 'coinaddr'])
  30. label_addr_pair = namedtuple('label_addr_pair', ['label', 'coinaddr'])
  31. # decorator for TwCtl
  32. def write_mode(orig_func):
  33. def f(self, *args, **kwargs):
  34. if self.mode != 'w':
  35. die(1, '{} opened in read-only mode: cannot execute method {}()'.format(
  36. type(self).__name__,
  37. locals()['orig_func'].__name__
  38. ))
  39. return orig_func(self, *args, **kwargs)
  40. return f
  41. class TwCtl(MMGenObject, metaclass=AsyncInit):
  42. caps = ('rescan', 'batch')
  43. data_key = 'addresses'
  44. importing = False
  45. use_cached_balances = False
  46. def __new__(cls, cfg, proto, *args, **kwargs):
  47. return MMGenObject.__new__(
  48. proto.base_proto_subclass(cls, 'tw.ctl', is_token=kwargs.get('token_addr')))
  49. async def __init__(
  50. self,
  51. cfg,
  52. proto,
  53. *,
  54. mode = 'r',
  55. token_addr = None,
  56. no_rpc = False,
  57. rpc_ignore_wallet = False):
  58. assert mode in ('r', 'w', 'i'), f"{mode!r}: wallet mode must be 'r', 'w' or 'i'"
  59. if mode == 'i':
  60. self.importing = True
  61. mode = 'w'
  62. self.cfg = cfg
  63. self.proto = proto
  64. self.mode = mode
  65. self.desc = self.base_desc = f'{self.proto.name} tracking wallet'
  66. if not no_rpc:
  67. self.rpc = await rpc_init(cfg, proto, ignore_wallet=rpc_ignore_wallet)
  68. async def resolve_address(self, addrspec):
  69. twmmid, coinaddr = (None, None)
  70. pairs = await self.get_label_addr_pairs()
  71. if is_coin_addr(self.proto, addrspec):
  72. coinaddr = get_obj(CoinAddr, proto=self.proto, addr=addrspec)
  73. pair_data = [e for e in pairs if e.coinaddr == coinaddr]
  74. elif is_mmgen_id(self.proto, addrspec):
  75. twmmid = TwMMGenID(self.proto, addrspec)
  76. pair_data = [e for e in pairs if e.label.mmid == twmmid]
  77. else:
  78. msg(f'{addrspec!r}: invalid address for this network')
  79. return None
  80. if not pair_data:
  81. msg('{a} address {b!r} not found in tracking wallet'.format(
  82. a = 'MMGen' if twmmid else 'Coin',
  83. b = twmmid or coinaddr))
  84. return None
  85. return twmmid_addr_pair(
  86. twmmid or pair_data[0].label.mmid,
  87. coinaddr or pair_data[0].coinaddr)
  88. # returns on failure
  89. @write_mode
  90. async def set_comment(
  91. self,
  92. addrspec,
  93. comment = '',
  94. *,
  95. trusted_pair = None,
  96. silent = False):
  97. res = twmmid_addr_pair(*trusted_pair) if trusted_pair else await self.resolve_address(addrspec)
  98. if not res:
  99. return False
  100. comment = get_obj(TwComment, s=comment)
  101. if comment is False:
  102. return False
  103. lbl = get_obj(
  104. TwLabel,
  105. proto = self.proto,
  106. text = res.twmmid + (' ' + comment if comment else ''))
  107. if lbl is False:
  108. return False
  109. if await self.set_label(res.coinaddr, lbl):
  110. if not silent:
  111. desc = '{t} address {a} in tracking wallet'.format(
  112. t = res.twmmid.type.replace('mmgen', 'MMGen'),
  113. a = res.twmmid.addr.hl() if res.twmmid.type == 'mmgen' else
  114. res.twmmid.addr.hl(res.twmmid.addr.view_pref))
  115. msg(
  116. 'Added label {} to {}'.format(comment.hl2(encl='‘’'), desc) if comment else
  117. 'Removed label from {}'.format(desc))
  118. return True
  119. else:
  120. if not silent:
  121. msg('Label could not be {}'.format('added' if comment else 'removed'))
  122. return False
  123. @write_mode
  124. async def remove_comment(self, mmaddr):
  125. await self.set_comment(mmaddr, '')
  126. def check_import_mmid(self, addr, old_mmid, new_mmid):
  127. 'returns True if mmid needs update, None otherwise'
  128. if new_mmid != old_mmid:
  129. if old_mmid.endswith(':' + addr):
  130. ymsg(f'Warning: address {new_mmid} was previously imported as non-MMGen!')
  131. return True
  132. else:
  133. fs = (
  134. 'attempting to import MMGen address {a!r} ({b}) as non-MMGen!'
  135. if new_mmid.endswith(':' + addr) else
  136. 'imported MMGen ID {b!r} does not match tracking wallet MMGen ID {a!r}!')
  137. die(2, fs.format(a=old_mmid, b=new_mmid))
  138. async def import_address_common(self, data, *, batch=False, gather=False):
  139. async def do_import(address, comment, message):
  140. try:
  141. res = await self.import_address(address, label=comment)
  142. self.cfg._util.qmsg(message)
  143. return res
  144. except Exception as e:
  145. die(2, f'\nImport of address {address!r} failed: {e.args[0]!r}')
  146. _d = namedtuple('formatted_import_data', data[0]._fields + ('mmid_disp',))
  147. pfx = self.proto.base_coin.lower() + ':'
  148. fdata = [_d(*d, 'non-MMGen' if d.twmmid.startswith(pfx) else d.twmmid) for d in data]
  149. fs = '{:%s}: {:%s} {:%s} - OK' % (
  150. len(str(len(fdata))) * 2 + 1,
  151. max(len(d.addr) for d in fdata),
  152. max(len(d.mmid_disp) for d in fdata) + 2
  153. )
  154. nAddrs = len(data)
  155. out = [( # create list, not generator, so we know data is valid before starting import
  156. CoinAddr(self.proto, d.addr),
  157. TwLabel(self.proto, d.twmmid + (f' {d.comment}' if d.comment else '')),
  158. fs.format(f'{n}/{nAddrs}', d.addr, f'({d.mmid_disp})')
  159. ) for n, d in enumerate(fdata, 1)]
  160. if batch:
  161. msg_r(f'Batch importing {len(out)} address{suf(data, "es")}...')
  162. ret = await self.batch_import_address((a, b, False) for a, b, c in out)
  163. msg(f'done\n{len(ret)} addresses imported')
  164. else:
  165. if gather: # this seems to provide little performance benefit
  166. import asyncio
  167. await asyncio.gather(*(do_import(*d) for d in out))
  168. else:
  169. for d in out:
  170. await do_import(*d)
  171. msg('Address import completed OK')