ctl.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. proto.eth.tw.ctl: Ethereum tracking wallet control class
  20. """
  21. from ....util import msg, ymsg, die
  22. from ....tw.ctl import TwCtl, write_mode, label_addr_pair
  23. from ....tw.shared import TwLabel
  24. from ....addr import is_coin_addr, is_mmgen_id, CoinAddr
  25. from ..contract import Token, ResolvedToken
  26. class EthereumTwCtl(TwCtl):
  27. caps = ('batch',)
  28. data_key = 'accounts'
  29. use_tw_file = True
  30. def init_empty(self):
  31. self.data = {
  32. 'coin': self.proto.coin,
  33. 'network': self.proto.network.upper(),
  34. 'accounts': {},
  35. 'tokens': {},
  36. }
  37. def upgrade_wallet_maybe(self):
  38. upgraded = False
  39. if not 'accounts' in self.data or not 'coin' in self.data:
  40. ymsg(f'Upgrading {self.desc} (v1->v2: accounts field added)')
  41. if not 'accounts' in self.data:
  42. self.data = {}
  43. import json
  44. self.data['accounts'] = json.loads(self.orig_data)
  45. if not 'coin' in self.data:
  46. self.data['coin'] = self.proto.coin
  47. upgraded = True
  48. def have_token_params_fields():
  49. for k in self.data['tokens']:
  50. if 'params' in self.data['tokens'][k]:
  51. return True
  52. def add_token_params_fields():
  53. for k in self.data['tokens']:
  54. self.data['tokens'][k]['params'] = {}
  55. if not 'tokens' in self.data:
  56. self.data['tokens'] = {}
  57. upgraded = True
  58. if self.data['tokens'] and not have_token_params_fields():
  59. ymsg(f'Upgrading {self.desc} (v2->v3: token params fields added)')
  60. add_token_params_fields()
  61. upgraded = True
  62. if not 'network' in self.data:
  63. ymsg(f'Upgrading {self.desc} (v3->v4: network field added)')
  64. self.data['network'] = self.proto.network.upper()
  65. upgraded = True
  66. if upgraded:
  67. self.force_write()
  68. msg(f'{self.desc} upgraded successfully!')
  69. async def rpc_get_balance(self, addr):
  70. return self.proto.coin_amt(
  71. int(await self.rpc.call('eth_getBalance', '0x' + addr, 'latest'), 16),
  72. from_unit = 'wei')
  73. @write_mode
  74. async def batch_import_address(self, args_list):
  75. return [await self.import_address(*a) for a in args_list]
  76. async def rescan_addresses(self, coin_addrs):
  77. pass
  78. @write_mode
  79. async def import_address(self, addr, label, rescan=False):
  80. r = self.data_root
  81. if addr in r:
  82. if not r[addr]['mmid'] and label.mmid:
  83. msg(f'Warning: MMGen ID {label.mmid!r} was missing in tracking wallet!')
  84. elif r[addr]['mmid'] != label.mmid:
  85. die(3, 'MMGen ID {label.mmid!r} does not match tracking wallet!')
  86. r[addr] = {'mmid': label.mmid, 'comment': label.comment}
  87. @write_mode
  88. async def remove_address(self, addr):
  89. r = self.data_root
  90. if is_coin_addr(self.proto, addr):
  91. have_match = lambda k: k == addr
  92. elif is_mmgen_id(self.proto, addr):
  93. have_match = lambda k: r[k]['mmid'] == addr
  94. else:
  95. die(1, f'{addr!r} is not an Ethereum address or MMGen ID')
  96. for k in r:
  97. if have_match(k):
  98. # return the addr resolved to mmid if possible
  99. ret = r[k]['mmid'] if is_mmgen_id(self.proto, r[k]['mmid']) else addr
  100. del r[k]
  101. self.write()
  102. return ret
  103. msg(f'Address {addr!r} not found in {self.data_root_desc!r} section of tracking wallet')
  104. return None
  105. @write_mode
  106. async def set_label(self, coinaddr, lbl):
  107. for addr, d in list(self.data_root.items()):
  108. if addr == coinaddr:
  109. d['comment'] = lbl.comment
  110. self.write()
  111. return True
  112. msg(f'Address {coinaddr!r} not found in {self.data_root_desc!r} section of tracking wallet')
  113. return False
  114. async def addr2sym(self, req_addr):
  115. for addr in self.data['tokens']:
  116. if addr == req_addr:
  117. return self.data['tokens'][addr]['params']['symbol']
  118. async def sym2addr(self, sym):
  119. for addr in self.data['tokens']:
  120. if self.data['tokens'][addr]['params']['symbol'] == sym.upper():
  121. return addr
  122. def get_token_param(self, token, param):
  123. if token in self.data['tokens']:
  124. return self.data['tokens'][token]['params'].get(param)
  125. @property
  126. def sorted_list(self):
  127. return sorted([{
  128. 'addr': x[0],
  129. 'mmid': x[1]['mmid'],
  130. 'comment': x[1]['comment']
  131. } for x in self.data_root.items() if x[0] not in ('params', 'coin')],
  132. key = lambda x: x['mmid'].sort_key + x['addr'])
  133. @property
  134. def mmid_ordered_dict(self):
  135. return dict((x['mmid'], {'addr': x['addr'], 'comment': x['comment']}) for x in self.sorted_list)
  136. async def get_label_addr_pairs(self):
  137. return [label_addr_pair(
  138. TwLabel(self.proto, f"{mmid} {d['comment']}"),
  139. CoinAddr(self.proto, d['addr'])
  140. ) for mmid, d in self.mmid_ordered_dict.items()]
  141. class EthereumTokenTwCtl(EthereumTwCtl):
  142. desc = 'Ethereum token tracking wallet'
  143. decimals = None
  144. symbol = None
  145. cur_eth_balances = {}
  146. async def __init__(self, cfg, proto, mode='r', token_addr=None, no_rpc=False):
  147. await super().__init__(cfg, proto, mode=mode, no_rpc=no_rpc)
  148. for v in self.data['tokens'].values():
  149. self.conv_types(v)
  150. if self.importing and token_addr:
  151. if not is_coin_addr(proto, token_addr):
  152. die('InvalidTokenAddress', f'{token_addr!r}: invalid token address')
  153. else:
  154. assert token_addr is None, 'EthereumTokenTwCtl_chk1'
  155. token_addr = await self.sym2addr(proto.tokensym) # returns None on failure
  156. if not is_coin_addr(proto, token_addr):
  157. die('UnrecognizedTokenSymbol', f'Specified token {proto.tokensym!r} could not be resolved!')
  158. from ....addr import TokenAddr
  159. self.token = TokenAddr(proto, token_addr)
  160. if self.token not in self.data['tokens']:
  161. if self.importing:
  162. await self.import_token(self.token)
  163. else:
  164. die('TokenNotInWallet', f'Specified token {self.token!r} not in wallet!')
  165. self.decimals = self.get_param('decimals')
  166. self.symbol = self.get_param('symbol')
  167. if mode == 'i' and not proto.tokensym:
  168. proto.tokensym = self.symbol
  169. @property
  170. def data_root(self):
  171. return self.data['tokens'][self.token]
  172. @property
  173. def data_root_desc(self):
  174. return 'token ' + self.get_param('symbol')
  175. async def rpc_get_balance(self, addr):
  176. return await Token(self.cfg, self.proto, self.token, self.decimals, self.rpc).get_balance(addr)
  177. async def get_eth_balance(self, addr, force_rpc=False):
  178. cache = self.cur_eth_balances
  179. r = self.data['accounts']
  180. ret = None if force_rpc else self.get_cached_balance(addr, cache, r)
  181. if ret is None:
  182. ret = await super().rpc_get_balance(addr)
  183. self.cache_balance(addr, ret, cache, r)
  184. return ret
  185. def get_param(self, param):
  186. return self.data['tokens'][self.token]['params'][param]
  187. @write_mode
  188. async def import_token(self, tokenaddr):
  189. """
  190. Token 'symbol' and 'decimals' values are resolved from the network by the system just
  191. once, upon token import. Thereafter, token address, symbol and decimals are resolved
  192. either from the tracking wallet (online operations) or transaction file (when signing).
  193. """
  194. t = await ResolvedToken(self.cfg, self.proto, self.rpc, tokenaddr)
  195. self.data['tokens'][tokenaddr] = {
  196. 'params': {
  197. 'symbol': await t.get_symbol(),
  198. 'decimals': t.decimals
  199. }
  200. }