tw.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. altcoins.eth.tw: Ethereum tracking wallet and related classes for the MMGen suite
  20. """
  21. import os
  22. from mmgen.util import msg,vmsg,ymsg,write_mode
  23. from mmgen.obj import ListItemAttr,ImmutableAttr
  24. from mmgen.tw import TrackingWallet,TwAddrList,TwUnspentOutputs,TwGetBalance,TwLabel
  25. from mmgen.addr import is_coin_addr,is_mmgen_id
  26. from mmgen.addrdata import AddrData,TwAddrData
  27. from .contract import Token,TokenResolve
  28. from .obj import ETHAmt
  29. class EthereumTrackingWallet(TrackingWallet):
  30. caps = ('batch',)
  31. data_key = 'accounts'
  32. use_tw_file = True
  33. async def is_in_wallet(self,addr):
  34. return addr in self.data_root
  35. def init_empty(self):
  36. self.data = { 'coin': self.proto.coin, 'accounts': {}, 'tokens': {} }
  37. def upgrade_wallet_maybe(self):
  38. upgraded = False
  39. if not 'accounts' in self.data or not 'coin' in self.data:
  40. ymsg(f'Upgrading {self.desc} (v1->v2: accounts field added)')
  41. if not 'accounts' in self.data:
  42. self.data = {}
  43. import json
  44. self.data['accounts'] = json.loads(self.orig_data)
  45. if not 'coin' in self.data:
  46. self.data['coin'] = self.proto.coin
  47. upgraded = True
  48. def have_token_params_fields():
  49. for k in self.data['tokens']:
  50. if 'params' in self.data['tokens'][k]:
  51. return True
  52. def add_token_params_fields():
  53. for k in self.data['tokens']:
  54. self.data['tokens'][k]['params'] = {}
  55. if not 'tokens' in self.data:
  56. self.data['tokens'] = {}
  57. upgraded = True
  58. if self.data['tokens'] and not have_token_params_fields():
  59. ymsg(f'Upgrading {self.desc} (v2->v3: token params fields added)')
  60. add_token_params_fields()
  61. upgraded = True
  62. if upgraded:
  63. self.force_write()
  64. msg(f'{self.desc} upgraded successfully!')
  65. async def rpc_get_balance(self,addr):
  66. return ETHAmt(int(await self.rpc.call('eth_getBalance','0x'+addr,'latest'),16),'wei')
  67. @write_mode
  68. async def batch_import_address(self,args_list):
  69. for arg_list in args_list:
  70. await self.import_address(*arg_list)
  71. return args_list
  72. @write_mode
  73. async def import_address(self,addr,label,foo):
  74. r = self.data_root
  75. if addr in r:
  76. if not r[addr]['mmid'] and label.mmid:
  77. msg(f'Warning: MMGen ID {label.mmid!r} was missing in tracking wallet!')
  78. elif r[addr]['mmid'] != label.mmid:
  79. die(3,'MMGen ID {label.mmid!r} does not match tracking wallet!')
  80. r[addr] = { 'mmid': label.mmid, 'comment': label.comment }
  81. @write_mode
  82. async def remove_address(self,addr):
  83. r = self.data_root
  84. if is_coin_addr(self.proto,addr):
  85. have_match = lambda k: k == addr
  86. elif is_mmgen_id(self.proto,addr):
  87. have_match = lambda k: r[k]['mmid'] == addr
  88. else:
  89. die(1,f'{addr!r} is not an Ethereum address or MMGen ID')
  90. for k in r:
  91. if have_match(k):
  92. # return the addr resolved to mmid if possible
  93. ret = r[k]['mmid'] if is_mmgen_id(self.proto,r[k]['mmid']) else addr
  94. del r[k]
  95. self.write()
  96. return ret
  97. else:
  98. msg(f'Address {addr!r} not found in {self.data_root_desc!r} section of tracking wallet')
  99. return None
  100. @write_mode
  101. async def set_label(self,coinaddr,lbl):
  102. for addr,d in list(self.data_root.items()):
  103. if addr == coinaddr:
  104. d['comment'] = lbl.comment
  105. self.write()
  106. return None
  107. else:
  108. msg(f'Address {coinaddr!r} not found in {self.data_root_desc!r} section of tracking wallet')
  109. return False
  110. async def addr2sym(self,req_addr):
  111. for addr in self.data['tokens']:
  112. if addr == req_addr:
  113. return self.data['tokens'][addr]['params']['symbol']
  114. else:
  115. return None
  116. async def sym2addr(self,sym):
  117. for addr in self.data['tokens']:
  118. if self.data['tokens'][addr]['params']['symbol'] == sym.upper():
  119. return addr
  120. else:
  121. return None
  122. def get_token_param(self,token,param):
  123. if token in self.data['tokens']:
  124. return self.data['tokens'][token]['params'].get(param)
  125. return None
  126. class EthereumTokenTrackingWallet(EthereumTrackingWallet):
  127. desc = 'Ethereum token tracking wallet'
  128. decimals = None
  129. symbol = None
  130. cur_eth_balances = {}
  131. async def __init__(self,proto,mode='r',token_addr=None):
  132. await super().__init__(proto,mode=mode)
  133. for v in self.data['tokens'].values():
  134. self.conv_types(v)
  135. if self.importing and token_addr:
  136. if not is_coin_addr(proto,token_addr):
  137. raise InvalidTokenAddress(f'{token_addr!r}: invalid token address')
  138. else:
  139. assert token_addr == None,'EthereumTokenTrackingWallet_chk1'
  140. token_addr = await self.sym2addr(proto.tokensym) # returns None on failure
  141. if not is_coin_addr(proto,token_addr):
  142. from mmgen.exception import UnrecognizedTokenSymbol
  143. raise UnrecognizedTokenSymbol(f'Specified token {proto.tokensym!r} could not be resolved!')
  144. from mmgen.addr import TokenAddr
  145. self.token = TokenAddr(proto,token_addr)
  146. if self.token not in self.data['tokens']:
  147. if self.importing:
  148. await self.import_token(self.token)
  149. else:
  150. raise TokenNotInWallet(f'Specified token {self.token!r} not in wallet!')
  151. self.decimals = self.get_param('decimals')
  152. self.symbol = self.get_param('symbol')
  153. proto.tokensym = self.symbol
  154. async def is_in_wallet(self,addr):
  155. return addr in self.data['tokens'][self.token]
  156. @property
  157. def data_root(self):
  158. return self.data['tokens'][self.token]
  159. @property
  160. def data_root_desc(self):
  161. return 'token ' + self.get_param('symbol')
  162. async def rpc_get_balance(self,addr):
  163. return await Token(self.proto,self.token,self.decimals,self.rpc).get_balance(addr)
  164. async def get_eth_balance(self,addr,force_rpc=False):
  165. cache = self.cur_eth_balances
  166. r = self.data['accounts']
  167. ret = None if force_rpc else self.get_cached_balance(addr,cache,r)
  168. if ret == None:
  169. ret = await super().rpc_get_balance(addr)
  170. self.cache_balance(addr,ret,cache,r)
  171. return ret
  172. def get_param(self,param):
  173. return self.data['tokens'][self.token]['params'][param]
  174. @write_mode
  175. async def import_token(self,tokenaddr):
  176. """
  177. Token 'symbol' and 'decimals' values are resolved from the network by the system just
  178. once, upon token import. Thereafter, token address, symbol and decimals are resolved
  179. either from the tracking wallet (online operations) or transaction file (when signing).
  180. """
  181. t = await TokenResolve(self.proto,self.rpc,tokenaddr)
  182. self.data['tokens'][tokenaddr] = {
  183. 'params': {
  184. 'symbol': await t.get_symbol(),
  185. 'decimals': t.decimals
  186. }
  187. }
  188. # No unspent outputs with Ethereum, but naming must be consistent
  189. class EthereumTwUnspentOutputs(TwUnspentOutputs):
  190. disp_type = 'eth'
  191. can_group = False
  192. col_adj = 29
  193. hdr_fmt = 'TRACKED ACCOUNTS (sort order: {})\nTotal {}: {}'
  194. desc = 'account balances'
  195. item_desc = 'account'
  196. dump_fn_pfx = 'balances'
  197. prompt = """
  198. Sort options: [a]mount, a[d]dress, [r]everse, [M]mgen addr
  199. Display options: show [m]mgen addr, r[e]draw screen
  200. Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view,
  201. add [l]abel, [D]elete address, [R]efresh balance:
  202. """
  203. key_mappings = {
  204. 'a':'s_amt','d':'s_addr','r':'d_reverse','M':'s_twmmid',
  205. 'm':'d_mmid','e':'d_redraw',
  206. 'q':'a_quit','p':'a_print','v':'a_view','w':'a_view_wide',
  207. 'l':'a_lbl_add','D':'a_addr_delete','R':'a_balance_refresh' }
  208. async def __init__(self,proto,*args,**kwargs):
  209. from mmgen.globalvars import g
  210. if g.cached_balances:
  211. from mmgen.color import yellow
  212. self.hdr_fmt += '\n' + yellow('WARNING: Using cached balances. These may be out of date!')
  213. await TwUnspentOutputs.__init__(self,proto,*args,**kwargs)
  214. def do_sort(self,key=None,reverse=False):
  215. if key == 'txid': return
  216. super().do_sort(key=key,reverse=reverse)
  217. async def get_unspent_rpc(self):
  218. wl = self.wallet.sorted_list
  219. if self.addrs:
  220. wl = [d for d in wl if d['addr'] in self.addrs]
  221. return [{
  222. 'account': TwLabel(self.proto,d['mmid']+' '+d['comment']),
  223. 'address': d['addr'],
  224. 'amount': await self.wallet.get_balance(d['addr']),
  225. 'confirmations': 0, # TODO
  226. } for d in wl]
  227. class MMGenTwUnspentOutput(TwUnspentOutputs.MMGenTwUnspentOutput):
  228. valid_attrs = {'txid','vout','amt','amt2','label','twmmid','addr','confs','skip'}
  229. invalid_attrs = {'proto'}
  230. def age_disp(self,o,age_fmt): # TODO
  231. return None
  232. class EthereumTokenTwUnspentOutputs(EthereumTwUnspentOutputs):
  233. disp_type = 'token'
  234. prompt_fs = 'Total to spend: {} {}\n\n'
  235. col_adj = 37
  236. def get_display_precision(self):
  237. return 10 # truncate precision for narrow display
  238. async def get_unspent_data(self,*args,**kwargs):
  239. await super().get_unspent_data(*args,**kwargs)
  240. for e in self.unspent:
  241. e.amt2 = await self.wallet.get_eth_balance(e.addr)
  242. class EthereumTwAddrList(TwAddrList):
  243. has_age = False
  244. async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None):
  245. self.proto = proto
  246. self.wallet = wallet or await TrackingWallet(self.proto,mode='w')
  247. tw_dict = self.wallet.mmid_ordered_dict
  248. self.total = self.proto.coin_amt('0')
  249. from mmgen.addr import CoinAddr
  250. for mmid,d in list(tw_dict.items()):
  251. # if d['confirmations'] < minconf: continue # cannot get confirmations for eth account
  252. label = TwLabel(self.proto,mmid+' '+d['comment'])
  253. if usr_addr_list and (label.mmid not in usr_addr_list):
  254. continue
  255. bal = await self.wallet.get_balance(d['addr'])
  256. if bal == 0 and not showempty:
  257. if not label.comment or not all_labels:
  258. continue
  259. self[label.mmid] = {'amt': self.proto.coin_amt('0'), 'lbl': label }
  260. if showbtcaddrs:
  261. self[label.mmid]['addr'] = CoinAddr(self.proto,d['addr'])
  262. self[label.mmid]['lbl'].mmid.confs = None
  263. self[label.mmid]['amt'] += bal
  264. self.total += bal
  265. del self.wallet
  266. class EthereumTokenTwAddrList(EthereumTwAddrList):
  267. pass
  268. class EthereumTwGetBalance(TwGetBalance):
  269. fs = '{w:13} {c}\n' # TODO - for now, just suppress display of meaningless data
  270. async def __init__(self,proto,*args,**kwargs):
  271. self.wallet = await TrackingWallet(proto,mode='w')
  272. await TwGetBalance.__init__(self,proto,*args,**kwargs)
  273. async def create_data(self):
  274. data = self.wallet.mmid_ordered_dict
  275. for d in data:
  276. if d.type == 'mmgen':
  277. key = d.obj.sid
  278. if key not in self.data:
  279. self.data[key] = [self.proto.coin_amt('0')] * 4
  280. else:
  281. key = 'Non-MMGen'
  282. conf_level = 2 # TODO
  283. amt = await self.wallet.get_balance(data[d]['addr'])
  284. self.data['TOTAL'][conf_level] += amt
  285. self.data[key][conf_level] += amt
  286. del self.wallet
  287. class EthereumTwAddrData(TwAddrData):
  288. async def get_tw_data(self,wallet=None):
  289. vmsg('Getting address data from tracking wallet')
  290. tw = (wallet or await TrackingWallet(self.proto)).mmid_ordered_dict
  291. # emulate the output of RPC 'listaccounts' and 'getaddressesbyaccount'
  292. return [(mmid+' '+d['comment'],[d['addr']]) for mmid,d in list(tw.items())]
  293. class EthereumTokenTwGetBalance(EthereumTwGetBalance): pass
  294. class EthereumTokenTwAddrData(EthereumTwAddrData): pass
  295. class EthereumAddrData(AddrData): pass
  296. class EthereumTokenAddrData(EthereumAddrData): pass