protocol.py 12 KB


  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. protocol: Coin protocol base classes and initializer
  20. """
  21. from collections import namedtuple
  22. from .cfg import gc
  23. from .base_obj import Lockable
  24. from .objmethods import MMGenObject
  25. decoded_wif = namedtuple('decoded_wif', ['sec', 'pubkey_type', 'compressed'])
  26. decoded_addr = namedtuple('decoded_addr', ['bytes', 'ver_bytes', 'fmt'])
  27. decoded_addr_multiview = namedtuple('mv_decoded_addr', ['bytes', 'ver_bytes', 'fmt', 'addr', 'views', 'view_pref'])
  28. parsed_addr = namedtuple('parsed_addr', ['ver_bytes', 'data'])
  29. _finfo = namedtuple('fork_info', ['height', 'hash', 'name', 'replayable'])
  30. _nw = namedtuple('coin_networks', ['mainnet', 'testnet', 'regtest'])
  31. class CoinProtocol(MMGenObject):
  32. proto_info = namedtuple('proto_info', ['name', 'trust_level']) # trust levels: see altcoin/params.py
  33. # keys are mirrored in gc.core_coins:
  34. coins = {
  35. 'btc': proto_info('Bitcoin', 5),
  36. 'bch': proto_info('BitcoinCash', 5),
  37. 'ltc': proto_info('Litecoin', 5),
  38. 'eth': proto_info('Ethereum', 4),
  39. 'etc': proto_info('EthereumClassic', 4),
  40. 'zec': proto_info('Zcash', 2),
  41. 'xmr': proto_info('Monero', 5),
  42. 'rune': proto_info('THORChain', 4)}
  43. class Base(Lockable):
  44. base_proto = None
  45. base_proto_coin = None
  46. base_coin = None
  47. is_fork_of = None
  48. chain_names = None
  49. is_vm = False
  50. is_evm = False
  51. has_usr_fee = True
  52. rpc_type = 'local'
  53. networks = ('mainnet', 'testnet', 'regtest')
  54. decimal_prec = 28
  55. _set_ok = ('tokensym',)
  56. def __init__(self, cfg, coin, *, name, network, tokensym=None, need_amt=False):
  57. self.cfg = cfg
  58. self.coin = coin.upper()
  59. self.coin_id = self.coin
  60. self.name = name
  61. self.network = network
  62. self.tokensym = tokensym
  63. self.cls_name = type(self).__name__
  64. self.testnet = network in ('testnet', 'regtest')
  65. self.regtest = network == 'regtest'
  66. self.networks = tuple(k for k, v in self.network_names._asdict().items() if v)
  67. self.network_id = coin.lower() + {
  68. 'mainnet': '',
  69. 'testnet': '_tn',
  70. 'regtest': '_rt',
  71. }[network]
  72. if hasattr(self, 'wif_ver_num'):
  73. self.wif_ver_bytes = {k: bytes.fromhex(v) for k, v in self.wif_ver_num.items()}
  74. self.wif_ver_bytes_to_pubkey_type = {v: k for k, v in self.wif_ver_bytes.items()}
  75. vbs = list(self.wif_ver_bytes.values())
  76. self.wif_ver_bytes_len = len(vbs[0]) if len(set(len(b) for b in vbs)) == 1 else None
  77. if hasattr(self, 'addr_ver_info'):
  78. self.addr_ver_bytes = {bytes.fromhex(k): v for k, v in self.addr_ver_info.items()}
  79. self.addr_fmt_to_ver_bytes = {v: k for k, v in self.addr_ver_bytes.items()}
  80. self.addr_ver_bytes_len = len(list(self.addr_ver_bytes)[0])
  81. if gc.cmd_caps:
  82. for cap in gc.cmd_caps.caps:
  83. if cap not in self.mmcaps:
  84. from .util import die
  85. die(2, f'Command {gc.prog_name!r} not supported for coin {self.coin}')
  86. if self.chain_names:
  87. self.chain_name = self.chain_names[0] # first chain name is default
  88. else:
  89. self.chain_names = [self.network]
  90. self.chain_name = self.network
  91. if self.tokensym:
  92. assert self.name.startswith('Ethereum'), 'CoinProtocol.Base_chk1'
  93. if self.base_coin in ('ETH', 'XMR'):
  94. from .util2 import get_keccak
  95. self.keccak_256 = get_keccak(cfg)
  96. if need_amt:
  97. from . import amt
  98. from decimal import getcontext
  99. self.coin_amt = getattr(amt, self.coin_amt)
  100. self.max_tx_fee = self.coin_amt(str(self.max_tx_fee)) if hasattr(self, 'max_tx_fee') else None
  101. getcontext().prec = self.decimal_prec
  102. else:
  103. self.coin_amt = None
  104. self.max_tx_fee = None
  105. self.set_cfg_opts()
  106. def set_cfg_opts(self):
  107. pass
  108. @property
  109. def dcoin(self):
  110. return self.coin
  111. @classmethod
  112. def chain_name_to_network(cls, cfg, coin, chain_name):
  113. """
  114. The generic networks 'mainnet', 'testnet' and 'regtest' are required for all coins
  115. that support transaction operations.
  116. For protocols that have specific names for chains corresponding to these networks,
  117. the attribute 'chain_name' is used, while 'network' retains the generic name.
  118. For Bitcoin and Bitcoin forks, 'network' and 'chain_name' are equivalent.
  119. """
  120. for network in ('mainnet', 'testnet', 'regtest'):
  121. proto = init_proto(cfg, coin, network=network)
  122. for proto_chain_name in proto.chain_names:
  123. if chain_name == proto_chain_name:
  124. return network
  125. raise ValueError(f'{chain_name}: unrecognized chain name for coin {coin}')
  126. @staticmethod
  127. def parse_network_id(network_id):
  128. match network_id.rsplit('_', 1):
  129. case (coin, netcode) if netcode in ('tn', 'rt'):
  130. network = {'tn': 'testnet', 'rt': 'regtest'}[netcode]
  131. case _:
  132. coin = network_id
  133. network = 'mainnet'
  134. return namedtuple('parsed_network_id', ['coin', 'network'])(coin, network)
  135. @staticmethod
  136. def create_network_id(coin, network):
  137. return coin.lower() + {'mainnet':'', 'testnet':'_tn', 'regtest':'_rt'}[network]
  138. def cap(self, s):
  139. return s in self.caps
  140. def get_addr_len(self, addr_fmt):
  141. return self.addr_len
  142. def decode_addr_bytes(self, addr_bytes):
  143. vlen = self.addr_ver_bytes_len
  144. return decoded_addr(
  145. addr_bytes[vlen:],
  146. addr_bytes[:vlen],
  147. self.addr_ver_bytes[addr_bytes[:vlen]])
  148. def coin_addr(self, addr):
  149. from .addr import CoinAddr
  150. return CoinAddr(proto=self, addr=addr)
  151. def addr_type(self, id_str):
  152. from .addr import MMGenAddrType
  153. return MMGenAddrType(proto=self, id_str=id_str)
  154. def viewkey(self, viewkey_str):
  155. raise NotImplementedError(f'{self.name} protocol does not support view keys')
  156. def base_proto_subclass(self, cls, modname, *, sub_clsname=None, is_token=False):
  157. """
  158. magic module loading and class selection
  159. """
  160. modpath = f'mmgen.proto.{self.base_proto_coin.lower()}.{modname}'
  161. clsname = (
  162. self.mod_clsname
  163. + ('Token' if self.tokensym or is_token else '')
  164. + cls.__name__)
  165. import importlib
  166. if sub_clsname:
  167. return getattr(getattr(importlib.import_module(modpath), clsname), sub_clsname)
  168. else:
  169. return getattr(importlib.import_module(modpath), clsname)
  170. class RPC:
  171. # prefixed with coin, e.g. ‘ltc_rpc_host’: refvals taken from proto class
  172. coin_cfg_opts = ()
  173. # prefixed with coin + network, e.g. ‘eth_mainnet_chain_names’: refvals taken from proto class
  174. proto_cfg_opts = ()
  175. # default vals (refvals): bool(val) must be False (val = None -> option takes no parameter)
  176. ignore_daemon_version = None
  177. rpc_host = ''
  178. rpc_port = 0
  179. rpc_user = ''
  180. rpc_password = ''
  181. tw_name = ''
  182. daemon_id = ''
  183. @classmethod
  184. def get_opt_clsval(cls, cfg, opt):
  185. coin, *rem = opt.split('_', 2)
  186. network = rem[0] if rem[0] in init_proto(cfg, coin, return_cls=True).network_names else None
  187. opt_name = '_'.join(rem[bool(network):])
  188. if ((network is None and opt_name in cls.coin_cfg_opts) or
  189. (network and opt_name in cls.proto_cfg_opts)):
  190. # raises AttributeError on failure:
  191. return getattr(init_proto(cfg, coin, network=network, return_cls=True), opt_name)
  192. else:
  193. raise AttributeError(f'{opt_name}: unrecognized attribute')
  194. def set_cfg_opts(self):
  195. for opt in self.cfg.__dict__:
  196. if opt.startswith(self.coin.lower() + '_'):
  197. res = opt.split('_', 2)[1:]
  198. network = res[0] if res[0] in self.network_names else None
  199. if network is None or network == self.network:
  200. setattr(self, '_'.join(res[bool(network):]), getattr(self.cfg, opt))
  201. class Secp256k1(RPC, Base):
  202. """
  203. Bitcoin and Ethereum protocols inherit from this class
  204. """
  205. secp256k1_group_order = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
  206. privkey_len = 32
  207. pubkey_types = ('std',)
  208. def parse_addr(self, ver_bytes, addr_bytes, fmt):
  209. return parsed_addr(
  210. ver_bytes = ver_bytes,
  211. data = addr_bytes,
  212. )
  213. def preprocess_key(self, sec, pubkey_type):
  214. # Key must be non-zero and less than group order of secp256k1 curve
  215. if 0 < int.from_bytes(sec, 'big') < self.secp256k1_group_order:
  216. return sec
  217. # less than 1 in 2^127 probability that we get here
  218. from .util import die, ymsg
  219. match int.from_bytes(sec, 'big'):
  220. case 0:
  221. die(4, 'Private key is zero!')
  222. case self.secp256k1_group_order:
  223. die(4, 'Private key == secp256k1_group_order!')
  224. case pk: # return key (mod group order) as the key
  225. if not self.cfg.test_suite:
  226. ymsg(f'Warning: private key is greater than secp256k1 group order!:\n {sec.hex()}')
  227. return (pk % self.secp256k1_group_order).to_bytes(self.privkey_len, 'big')
  228. class DummyWIF:
  229. """
  230. Ethereum and Monero protocols inherit from this class
  231. """
  232. def encode_wif(self, privbytes, pubkey_type, compressed):
  233. assert pubkey_type == self.pubkey_type, f'{pubkey_type}: invalid pubkey_type for {self.name} protocol!'
  234. assert compressed is False, f'{self.name} protocol does not support compressed pubkeys!'
  235. return privbytes.hex()
  236. def decode_wif(self, wif):
  237. return decoded_wif(
  238. sec = bytes.fromhex(wif),
  239. pubkey_type = self.pubkey_type,
  240. compressed = False)
  241. def init_proto(
  242. cfg,
  243. coin = None,
  244. *,
  245. testnet = False,
  246. regtest = False,
  247. network = None,
  248. network_id = None,
  249. tokensym = None,
  250. need_amt = False,
  251. return_cls = False):
  252. assert type(testnet) is bool, 'init_proto_chk1'
  253. assert type(regtest) is bool, 'init_proto_chk2'
  254. assert coin or network_id, 'init_proto_chk3'
  255. assert not (coin and network_id), 'init_proto_chk4'
  256. if network_id:
  257. coin, network = CoinProtocol.Base.parse_network_id(network_id)
  258. elif network:
  259. assert network in CoinProtocol.Base.networks, f'init_proto_chk5 - {network!r}: invalid network'
  260. assert testnet is False, 'init_proto_chk6'
  261. assert regtest is False, 'init_proto_chk7'
  262. else:
  263. network = 'regtest' if regtest else 'testnet' if testnet else 'mainnet'
  264. coin = coin.lower()
  265. if coin not in CoinProtocol.coins:
  266. from .altcoin.params import init_genonly_altcoins
  267. init_genonly_altcoins(coin, testnet=testnet) # raises exception on failure
  268. name = CoinProtocol.coins[coin].name
  269. proto_name = name + ('' if network == 'mainnet' else network.capitalize())
  270. if not hasattr(CoinProtocol, proto_name):
  271. import importlib
  272. setattr(
  273. CoinProtocol,
  274. proto_name,
  275. getattr(importlib.import_module(f'mmgen.proto.{coin}.params'), network)
  276. )
  277. if return_cls:
  278. return getattr(CoinProtocol, proto_name)
  279. return getattr(CoinProtocol, proto_name)(
  280. cfg = cfg,
  281. coin = coin,
  282. name = name,
  283. network = network,
  284. tokensym = tokensym,
  285. need_amt = need_amt)
  286. def init_proto_from_cfg(cfg, need_amt):
  287. return init_proto(
  288. cfg = cfg,
  289. coin = cfg.coin,
  290. network = cfg.network,
  291. tokensym = cfg.token,
  292. need_amt = need_amt)
  293. def warn_trustlevel(cfg):
  294. coinsym = cfg.coin
  295. if coinsym.lower() in CoinProtocol.coins:
  296. trust_level = CoinProtocol.coins[coinsym.lower()].trust_level
  297. else:
  298. from .altcoin.params import CoinInfo
  299. e = CoinInfo.get_entry(coinsym, 'mainnet')
  300. trust_level = e.trust_level if e else None
  301. if trust_level in (None, -1):
  302. from .util import die
  303. die(1, f'Coin {coinsym} is not supported by {gc.proj_name}')
  304. if trust_level > 3:
  305. return
  306. m = """
  307. Support for coin {c!r} is EXPERIMENTAL. {a}
  308. assumes no responsibility for any loss of funds you may incur.
  309. This coin’s testing status: {t}
  310. Are you sure you want to continue?
  311. """
  312. from .util import fmt
  313. from .color import red, yellow, green
  314. warning = fmt(m).strip().format(
  315. c = coinsym.upper(),
  316. t = {
  317. 0: red('COMPLETELY UNTESTED'),
  318. 1: red('LOW'),
  319. 2: yellow('MEDIUM'),
  320. 3: green('OK'),
  321. }[trust_level],
  322. a = gc.author)
  323. if cfg.test_suite:
  324. cfg._util.qmsg(warning)
  325. return
  326. from .ui import keypress_confirm
  327. if not keypress_confirm(cfg, warning, default_yes=True):
  328. import sys
  329. sys.exit(0)