contract.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. proto.eth.contract: Ethereum contract and ERC20 token classes
  20. """
  21. from decimal import Decimal
  22. from collections import namedtuple
  23. from . import rlp
  24. from . import erigon_sleep
  25. from ...util import msg, pp_msg, die
  26. from ...base_obj import AsyncInit
  27. from ...obj import CoinTxID
  28. from ...addr import CoinAddr, TokenAddr
  29. def parse_abi(s):
  30. return [s[:8]] + [s[8+x*64:8+(x+1)*64] for x in range(len(s[8:])//64)]
  31. class Contract:
  32. def strip(self, s):
  33. return ''.join([chr(b) for b in s if 32 <= b <= 127]).strip()
  34. def create_method_id(self, sig):
  35. return self.keccak_256(sig.encode()).hexdigest()[:8]
  36. async def code(self):
  37. return (await self.rpc.call('eth_getCode', '0x'+self.addr))[2:]
  38. async def do_call(self, method_sig, method_args='', *, toUnit=False):
  39. data = self.create_method_id(method_sig) + method_args
  40. if self.cfg.debug:
  41. msg('ETH_CALL {}: {}'.format(
  42. method_sig,
  43. '\n '.join(parse_abi(data))))
  44. ret = await self.rpc.call('eth_call', {'to': '0x'+self.addr, 'data': '0x'+data}, 'pending')
  45. await erigon_sleep(self)
  46. if toUnit:
  47. return int(ret, 16) * self.base_unit
  48. else:
  49. return ret
  50. def make_tx_in(self, *, gas, gasPrice, nonce, data):
  51. return {
  52. 'to': bytes.fromhex(self.addr),
  53. 'startgas': gas.toWei(),
  54. 'gasprice': gasPrice.toWei(),
  55. 'value': 0,
  56. 'nonce': nonce,
  57. 'data': bytes.fromhex(data)}
  58. async def txsign(self, tx_in, key, from_addr, *, chain_id=None):
  59. from .pyethereum.transactions import Transaction
  60. if chain_id is None:
  61. res = await self.rpc.call('eth_chainId')
  62. chain_id = None if res is None else int(res, 16)
  63. etx = Transaction(**tx_in).sign(key, chain_id)
  64. if etx.sender.hex() != from_addr:
  65. die(3, f'Sender address {from_addr!r} does not match address of key {etx.sender.hex()!r}!')
  66. if self.cfg.debug:
  67. msg('TOKEN DATA:')
  68. pp_msg(etx.to_dict())
  69. msg('PARSED ABI DATA:\n {}'.format(
  70. '\n '.join(parse_abi(etx.data.hex()))))
  71. return namedtuple('signed_contract_transaction', ['etx', 'txhex', 'txid'])(
  72. etx,
  73. rlp.encode(etx).hex(),
  74. CoinTxID(etx.hash.hex()))
  75. async def txsend(self, txhex):
  76. return (await self.rpc.call('eth_sendRawTransaction', '0x'+txhex)).replace('0x', '', 1)
  77. class Token(Contract):
  78. def __init__(self, cfg, proto, addr, decimals, *, rpc=None):
  79. if type(self).__name__ == 'Token':
  80. from ...util2 import get_keccak
  81. self.keccak_256 = get_keccak(cfg)
  82. self.cfg = cfg
  83. self.proto = proto
  84. self.addr = TokenAddr(proto, addr)
  85. assert isinstance(decimals, int), f'decimals param must be int instance, not {type(decimals)}'
  86. self.decimals = decimals
  87. self.base_unit = Decimal('10') ** -self.decimals
  88. self.rpc = rpc
  89. async def get_balance(self, acct_addr):
  90. return self.proto.coin_amt(
  91. await self.do_call('balanceOf(address)', acct_addr.rjust(64, '0'), toUnit=True),
  92. from_decimal = True)
  93. async def get_name(self):
  94. return self.strip(bytes.fromhex((await self.do_call('name()'))[2:]))
  95. async def get_symbol(self):
  96. return self.strip(bytes.fromhex((await self.do_call('symbol()'))[2:]))
  97. async def get_decimals(self):
  98. ret = await self.do_call('decimals()')
  99. try:
  100. assert ret[:2] == '0x'
  101. return int(ret, 16)
  102. except:
  103. msg(f'RPC call to decimals() failed (returned {ret!r})')
  104. async def get_total_supply(self):
  105. return await self.do_call('totalSupply()', toUnit=True)
  106. async def info(self):
  107. return ('{:15}{}\n' * 5).format(
  108. 'token address:', self.addr,
  109. 'token symbol:', await self.get_symbol(),
  110. 'token name:', await self.get_name(),
  111. 'decimals:', self.decimals,
  112. 'total supply:', await self.get_total_supply())
  113. def transferdata2sendaddr(self, data): # online
  114. return CoinAddr(self.proto, parse_abi(data)[1][-40:])
  115. def transferdata2amt(self, data): # online
  116. return self.proto.coin_amt(
  117. int(parse_abi(data)[-1], 16) * self.base_unit,
  118. from_decimal = True)
  119. def create_token_data(self, to_addr, amt, *, op):
  120. assert op in ('transfer', 'approve'), f'{op}: invalid operation (not ‘transfer’ or ‘approve’)'
  121. return (
  122. self.create_method_id(f'{op}(address,uint256)')
  123. + to_addr.rjust(64, '0')
  124. + '{:064x}'.format(int(amt / self.base_unit)))
  125. # used for testing only:
  126. async def transfer(self, *, from_addr, to_addr, amt, key, gas, gasPrice):
  127. nonce = await self.rpc.call('eth_getTransactionCount', '0x'+from_addr, 'pending')
  128. tx_in = self.make_tx_in(
  129. gas = gas,
  130. gasPrice = gasPrice,
  131. nonce = int(nonce, 16),
  132. data = self.create_token_data(to_addr, amt, op='transfer'))
  133. res = await self.txsign(tx_in, key, from_addr)
  134. return await self.txsend(res.txhex)
  135. class ResolvedToken(Token, metaclass=AsyncInit):
  136. async def __init__(self, cfg, proto, rpc, addr):
  137. from ...util2 import get_keccak
  138. self.keccak_256 = get_keccak(cfg)
  139. self.cfg = cfg
  140. self.proto = proto
  141. self.rpc = rpc
  142. self.addr = TokenAddr(proto, addr)
  143. decimals = await self.get_decimals() # requires self.addr!
  144. if not decimals:
  145. die('TokenNotInBlockchain', f'Token {addr!r} not in blockchain')
  146. Token.__init__(self, cfg, proto, addr, decimals, rpc=rpc)