protocol.py 10 KB


  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. protocol: Coin protocol base classes and initializer
  20. """
  21. from collections import namedtuple
  22. from .cfg import gc
  23. from .objmethods import MMGenObject
  24. decoded_wif = namedtuple('decoded_wif',['sec','pubkey_type','compressed'])
  25. decoded_addr = namedtuple('decoded_addr',['bytes','ver_bytes','fmt'])
  26. parsed_addr = namedtuple('parsed_addr',['ver_bytes','data'])
  27. _finfo = namedtuple('fork_info',['height','hash','name','replayable'])
  28. _nw = namedtuple('coin_networks',['mainnet','testnet','regtest'])
  29. class CoinProtocol(MMGenObject):
  30. proto_info = namedtuple('proto_info',['name','trust_level']) # trust levels: see altcoin/params.py
  31. # keys are mirrored in gc.core_coins:
  32. coins = {
  33. 'btc': proto_info('Bitcoin', 5),
  34. 'bch': proto_info('BitcoinCash', 5),
  35. 'ltc': proto_info('Litecoin', 5),
  36. 'eth': proto_info('Ethereum', 4),
  37. 'etc': proto_info('EthereumClassic', 4),
  38. 'zec': proto_info('Zcash', 2),
  39. 'xmr': proto_info('Monero', 4)
  40. }
  41. class Base(MMGenObject):
  42. base_proto = None
  43. base_proto_coin = None
  44. base_coin = None
  45. is_fork_of = None
  46. chain_names = None
  47. networks = ('mainnet','testnet','regtest')
  48. def __init__(self,cfg,coin,name,network,tokensym=None,need_amt=False):
  49. self.cfg = cfg
  50. self.coin = coin.upper()
  51. self.coin_id = self.coin
  52. self.name = name
  53. self.network = network
  54. self.tokensym = tokensym
  55. self.cls_name = type(self).__name__
  56. self.testnet = network in ('testnet','regtest')
  57. self.regtest = network == 'regtest'
  58. self.networks = tuple(k for k,v in self.network_names._asdict().items() if v)
  59. self.network_id = coin.lower() + {
  60. 'mainnet': '',
  61. 'testnet': '_tn',
  62. 'regtest': '_rt',
  63. }[network]
  64. if hasattr(self,'wif_ver_num'):
  65. self.wif_ver_bytes = {k:bytes.fromhex(v) for k,v in self.wif_ver_num.items()}
  66. self.wif_ver_bytes_to_pubkey_type = {v:k for k,v in self.wif_ver_bytes.items()}
  67. vbs = list(self.wif_ver_bytes.values())
  68. self.wif_ver_bytes_len = len(vbs[0]) if len(set(len(b) for b in vbs)) == 1 else None
  69. if hasattr(self,'addr_ver_info'):
  70. self.addr_ver_bytes = {bytes.fromhex(k):v for k,v in self.addr_ver_info.items()}
  71. self.addr_fmt_to_ver_bytes = {v:k for k,v in self.addr_ver_bytes.items()}
  72. self.addr_ver_bytes_len = len(list(self.addr_ver_bytes)[0])
  73. if 'tx' not in self.mmcaps and gc.is_txprog:
  74. from .util import die
  75. die(2,f'Command {gc.prog_name!r} not supported for coin {self.coin}')
  76. if self.chain_names:
  77. self.chain_name = self.chain_names[0] # first chain name is default
  78. else:
  79. self.chain_names = [self.network]
  80. self.chain_name = self.network
  81. if self.tokensym:
  82. assert self.name.startswith('Ethereum'), 'CoinProtocol.Base_chk1'
  83. if self.base_coin in ('ETH','XMR'):
  84. from .util2 import get_keccak
  85. self.keccak_256 = get_keccak(cfg)
  86. if need_amt:
  87. from . import amt
  88. self.coin_amt = getattr(amt,self.coin_amt)
  89. self.max_tx_fee = self.coin_amt(self.max_tx_fee) if hasattr(self,'max_tx_fee') else None
  90. else:
  91. self.coin_amt = None
  92. self.max_tx_fee = None
  93. @property
  94. def dcoin(self):
  95. return self.coin
  96. @classmethod
  97. def chain_name_to_network(cls,cfg,coin,chain_name):
  98. """
  99. The generic networks 'mainnet', 'testnet' and 'regtest' are required for all coins
  100. that support transaction operations.
  101. For protocols that have specific names for chains corresponding to these networks,
  102. the attribute 'chain_name' is used, while 'network' retains the generic name.
  103. For Bitcoin and Bitcoin forks, 'network' and 'chain_name' are equivalent.
  104. """
  105. for network in ('mainnet','testnet','regtest'):
  106. proto = init_proto( cfg, coin, network=network )
  107. for proto_chain_name in proto.chain_names:
  108. if chain_name == proto_chain_name:
  109. return network
  110. raise ValueError(f'{chain_name}: unrecognized chain name for coin {coin}')
  111. @staticmethod
  112. def parse_network_id(network_id):
  113. nid = namedtuple('parsed_network_id',['coin','network'])
  114. if network_id.endswith('_tn'):
  115. return nid(network_id[:-3],'testnet')
  116. elif network_id.endswith('_rt'):
  117. return nid(network_id[:-3],'regtest')
  118. else:
  119. return nid(network_id,'mainnet')
  120. @staticmethod
  121. def create_network_id(coin,network):
  122. return coin.lower() + { 'mainnet':'', 'testnet':'_tn', 'regtest':'_rt' }[network]
  123. def cap(self,s):
  124. return s in self.caps
  125. def get_addr_len(self,addr_fmt):
  126. return self.addr_len
  127. def decode_addr_bytes(self,addr_bytes):
  128. vlen = self.addr_ver_bytes_len
  129. return decoded_addr(
  130. addr_bytes[vlen:],
  131. addr_bytes[:vlen],
  132. self.addr_ver_bytes[addr_bytes[:vlen]] )
  133. def coin_addr(self,addr):
  134. from .addr import CoinAddr
  135. return CoinAddr( proto=self, addr=addr )
  136. def addr_type(self,id_str):
  137. from .addr import MMGenAddrType
  138. return MMGenAddrType( proto=self, id_str=id_str )
  139. def viewkey(self,viewkey_str):
  140. raise NotImplementedError(f'{self.name} protocol does not support view keys')
  141. def base_proto_subclass(self,cls,modname,sub_clsname=None):
  142. """
  143. magic module loading and class selection
  144. """
  145. modpath = f'mmgen.proto.{self.base_proto_coin.lower()}.{modname}'
  146. clsname = (
  147. self.mod_clsname
  148. + ('Token' if self.tokensym else '')
  149. + cls.__name__ )
  150. import importlib
  151. if sub_clsname:
  152. return getattr(getattr(importlib.import_module(modpath),clsname),sub_clsname)
  153. else:
  154. return getattr(importlib.import_module(modpath),clsname)
  155. class Secp256k1(Base):
  156. """
  157. Bitcoin and Ethereum protocols inherit from this class
  158. """
  159. secp256k1_group_order = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
  160. privkey_len = 32
  161. pubkey_types = ('std',)
  162. def parse_addr(self,ver_bytes,addr_bytes,fmt):
  163. return parsed_addr(
  164. ver_bytes = ver_bytes,
  165. data = addr_bytes,
  166. )
  167. def preprocess_key(self,sec,pubkey_type):
  168. # Key must be non-zero and less than group order of secp256k1 curve
  169. if 0 < int.from_bytes(sec,'big') < self.secp256k1_group_order:
  170. return sec
  171. else: # chance of this is less than 1 in 2^127
  172. from .util import die,ymsg
  173. pk = int.from_bytes(sec,'big')
  174. if pk == 0: # chance of this is 1 in 2^256
  175. die(4,'Private key is zero!')
  176. elif pk == self.secp256k1_group_order: # ditto
  177. die(4,'Private key == secp256k1_group_order!')
  178. else: # return key mod group order as the key
  179. if not self.cfg.test_suite:
  180. ymsg(f'Warning: private key is greater than secp256k1 group order!:\n {sec.hex()}')
  181. return (pk % self.secp256k1_group_order).to_bytes(self.privkey_len,'big')
  182. class DummyWIF:
  183. """
  184. Ethereum and Monero protocols inherit from this class
  185. """
  186. def encode_wif(self,privbytes,pubkey_type,compressed):
  187. assert pubkey_type == self.pubkey_type, f'{pubkey_type}: invalid pubkey_type for {self.name} protocol!'
  188. assert compressed is False, f'{self.name} protocol does not support compressed pubkeys!'
  189. return privbytes.hex()
  190. def decode_wif(self,wif):
  191. return decoded_wif(
  192. sec = bytes.fromhex(wif),
  193. pubkey_type = self.pubkey_type,
  194. compressed = False )
  195. def init_proto(
  196. cfg,
  197. coin = None,
  198. testnet = False,
  199. regtest = False,
  200. network = None,
  201. network_id = None,
  202. tokensym = None,
  203. need_amt = False,
  204. return_cls = False):
  205. assert type(testnet) is bool, 'init_proto_chk1'
  206. assert type(regtest) is bool, 'init_proto_chk2'
  207. assert coin or network_id, 'init_proto_chk3'
  208. assert not (coin and network_id), 'init_proto_chk4'
  209. if network_id:
  210. coin,network = CoinProtocol.Base.parse_network_id(network_id)
  211. elif network:
  212. assert network in CoinProtocol.Base.networks, f'init_proto_chk5 - {network!r}: invalid network'
  213. assert testnet is False, 'init_proto_chk6'
  214. assert regtest is False, 'init_proto_chk7'
  215. else:
  216. network = 'regtest' if regtest else 'testnet' if testnet else 'mainnet'
  217. coin = coin.lower()
  218. if coin not in CoinProtocol.coins:
  219. from .altcoin.params import init_genonly_altcoins
  220. init_genonly_altcoins( coin, testnet=testnet ) # raises exception on failure
  221. name = CoinProtocol.coins[coin].name
  222. proto_name = name + ('' if network == 'mainnet' else network.capitalize())
  223. if not hasattr(CoinProtocol,proto_name):
  224. import importlib
  225. setattr(
  226. CoinProtocol,
  227. proto_name,
  228. getattr(importlib.import_module(f'mmgen.proto.{coin}.params'),network)
  229. )
  230. if return_cls:
  231. return getattr(CoinProtocol,proto_name)
  232. return getattr(CoinProtocol,proto_name)(
  233. cfg = cfg,
  234. coin = coin,
  235. name = name,
  236. network = network,
  237. tokensym = tokensym,
  238. need_amt = need_amt )
  239. def init_proto_from_cfg(cfg,need_amt):
  240. return init_proto(
  241. cfg = cfg,
  242. coin = cfg.coin,
  243. network = cfg.network,
  244. tokensym = cfg.token,
  245. need_amt = need_amt )
  246. def warn_trustlevel(cfg):
  247. coinsym = cfg.coin
  248. if coinsym.lower() in CoinProtocol.coins:
  249. trust_level = CoinProtocol.coins[coinsym.lower()].trust_level
  250. else:
  251. from .altcoin.params import CoinInfo
  252. e = CoinInfo.get_entry(coinsym,'mainnet')
  253. trust_level = e.trust_level if e else None
  254. if trust_level in (None,-1):
  255. from .util import die
  256. die(1,f'Coin {coinsym} is not supported by {gc.proj_name}')
  257. if trust_level > 3:
  258. return
  259. m = """
  260. Support for coin {c!r} is EXPERIMENTAL. The {p} project
  261. assumes no responsibility for any loss of funds you may incur.
  262. This coin’s {p} testing status: {t}
  263. Are you sure you want to continue?
  264. """
  265. from .util import fmt
  266. from .color import red,yellow,green
  267. warning = fmt(m).strip().format(
  268. c = coinsym.upper(),
  269. t = {
  270. 0: red('COMPLETELY UNTESTED'),
  271. 1: red('LOW'),
  272. 2: yellow('MEDIUM'),
  273. 3: green('OK'),
  274. }[trust_level],
  275. p = gc.proj_name )
  276. if cfg.test_suite:
  277. cfg._util.qmsg(warning)
  278. return
  279. from .ui import keypress_confirm
  280. if not keypress_confirm( cfg, warning, default_yes=True ):
  281. import sys
  282. sys.exit(0)