protocol.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. protocol.py: Coin protocol base classes and initializer
  20. """
  21. from collections import namedtuple
  22. from .devtools import *
  23. from .globalvars import g
  # Result of parsing a WIF key: secret bytes, public key type and compression flag
  parsed_wif = namedtuple('parsed_wif','sec pubkey_type compressed')

  # Result of parsing an address: payload bytes (version prefix removed) and address format
  parsed_addr = namedtuple('parsed_addr','bytes fmt')

  # Fork description record (type name is 'fork_info')
  _finfo = namedtuple('fork_info','height hash name replayable')

  # One value per generic network (type name is 'coin_networks')
  _nw = namedtuple('coin_networks','mainnet testnet regtest')

  # Base58 alphabet, shared by Bitcoin and Monero
  _b58a = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
  class CoinProtocol(MMGenObject):
      """
      Namespace for the coin protocol base classes and the table of core coins.

      Concrete per-network protocol classes are attached to this class as
      attributes at runtime by init_proto().
      """

      proto_info = namedtuple('proto_info',['name','trust_level']) # trust levels: see altcoin.py

      # Core coins, keyed by lowercase coin symbol.
      # keys are mirrored in g.core_coins:
      coins = {
          'btc': proto_info('Bitcoin',         5),
          'bch': proto_info('BitcoinCash',     5),
          'ltc': proto_info('Litecoin',        5),
          'eth': proto_info('Ethereum',        4),
          'etc': proto_info('EthereumClassic', 4),
          'zec': proto_info('Zcash',           2),
          'xmr': proto_info('Monero',          4)
      }

      class Base(MMGenObject):
          """
          Attributes and helpers common to all coin protocols.

          Several attributes read by the methods below (network_names, base_coin,
          caps, addr_ver_bytes, addr_len and, optionally, chain_names, coin_amt
          and max_tx_fee) are not defined in this class and must be supplied by
          subclasses.
          """
          base_proto = None
          is_fork_of = None
          networks = ('mainnet','testnet','regtest')

          def __init__(self,coin,name,network,tokensym=None,need_amt=False):
              """
              Set the protocol's identity and network attributes.

              coin     - coin symbol (stored uppercased in self.coin and self.coin_id)
              name     - protocol name
              network  - 'mainnet', 'testnet' or 'regtest' (anything else raises KeyError)
              tokensym - token symbol; asserted to be used only with protocols whose
                         name begins with 'Ethereum'
              need_amt - if true, replace the name held in self.coin_amt with the
                         attribute of that name from mmgen.amt and convert max_tx_fee
                         with it; otherwise set both attributes to None
              """
              self.coin = coin.upper()
              self.coin_id = self.coin
              self.name = name
              self.network = network
              self.tokensym = tokensym
              self.cls_name = type(self).__name__
              self.testnet = network in ('testnet','regtest')
              self.regtest = network == 'regtest'
              # instance-level override: only the networks with a truthy entry in network_names
              self.networks = tuple(k for k,v in self.network_names._asdict().items() if v)
              self.network_id = self.create_network_id(coin,network)

              if hasattr(self,'chain_names'):
                  self.chain_name = self.chain_names[0] # first chain name is default
              else:
                  # no protocol-specific chain names: fall back to the generic network name
                  self.chain_name = self.network
                  self.chain_names = [self.network]

              if self.tokensym:
                  assert self.name.startswith('Ethereum'), 'CoinProtocol.Base_chk1'

              if self.base_coin in ('ETH','XMR'):
                  from .util import get_keccak
                  self.keccak_256 = get_keccak()

              if need_amt:
                  import mmgen.amt
                  # coin_amt holds an attribute name of mmgen.amt until resolved here
                  self.coin_amt = getattr(mmgen.amt,self.coin_amt)
                  fee = getattr(self,'max_tx_fee',None)
                  self.max_tx_fee = self.coin_amt(fee) if fee else None
              else:
                  self.coin_amt = None
                  self.max_tx_fee = None

          @property
          def dcoin(self):
              """Return the coin symbol (same value as self.coin in this base class)."""
              return self.coin

          @classmethod
          def chain_name_to_network(cls,coin,chain_name):
              """
              Return the generic network name whose protocol lists 'chain_name'
              among its chain names.  Raises ValueError if no network matches.

              The generic networks 'mainnet', 'testnet' and 'regtest' are required for all coins
              that support transaction operations.

              For protocols that have specific names for chains corresponding to these networks,
              the attribute 'chain_name' is used, while 'network' retains the generic name.
              For Bitcoin and Bitcoin forks, 'network' and 'chain_name' are equivalent.
              """
              for network in ('mainnet','testnet','regtest'):
                  # a protocol instance is created for each network in turn
                  proto = init_proto(coin,network=network)
                  if chain_name in proto.chain_names:
                      return network
              raise ValueError(f'{chain_name}: unrecognized chain name for coin {coin}')

          @staticmethod
          def parse_network_id(network_id):
              """
              Split a network ID into a (coin,network) named tuple.

              A trailing '_tn' selects 'testnet', a trailing '_rt' selects 'regtest';
              an ID with neither suffix is returned unchanged with network 'mainnet'.
              """
              nid = namedtuple('parsed_network_id',['coin','network'])
              for suffix,network in (('_tn','testnet'),('_rt','regtest')):
                  if network_id.endswith(suffix):
                      return nid(network_id[:-len(suffix)],network)
              return nid(network_id,'mainnet')

          @staticmethod
          def create_network_id(coin,network):
              """
              Return the network ID for a coin and network: the lowercased coin
              symbol plus '' (mainnet), '_tn' (testnet) or '_rt' (regtest).
              Raises KeyError for any other network name.
              """
              return coin.lower() + { 'mainnet':'', 'testnet':'_tn', 'regtest':'_rt' }[network]

          def cap(self,s):
              """Return True if 's' is contained in the protocol's 'caps' attribute."""
              return s in self.caps

          def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False):
              """
              Return the version prefix registered in addr_ver_bytes for address
              format 'req_fmt': as a hex string if return_hex is true, otherwise
              as bytes.  Returns False if the format is not found.
              """
              for ver_hex,fmt in self.addr_ver_bytes.items():
                  if req_fmt == fmt:
                      return ver_hex if return_hex else bytes.fromhex(ver_hex)
              return False

          def get_addr_len(self,addr_fmt):
              """Return the address payload length; 'addr_fmt' is unused in this base implementation."""
              return self.addr_len

          def parse_addr_bytes(self,addr_bytes):
              """
              Match 'addr_bytes' against the known version prefixes.

              Returns parsed_addr(payload,fmt) for the first prefix in addr_ver_bytes
              that both matches the start of 'addr_bytes' and leaves a payload of the
              length expected for that format.  Returns False if none matches.
              """
              for ver_hex,addr_fmt in self.addr_ver_bytes.items():
                  ver_bytes = bytes.fromhex(ver_hex)
                  vlen = len(ver_bytes)
                  payload = addr_bytes[vlen:]
                  if addr_bytes[:vlen] == ver_bytes and len(payload) == self.get_addr_len(addr_fmt):
                      return parsed_addr( payload, addr_fmt )
              return False

          def coin_addr(self,addr):
              """Return a CoinAddr for 'addr' bound to this protocol."""
              from .addr import CoinAddr
              return CoinAddr( proto=self, addr=addr )

          def addr_type(self,id_str):
              """Return an MMGenAddrType for 'id_str' bound to this protocol."""
              from .addr import MMGenAddrType
              return MMGenAddrType( proto=self, id_str=id_str )

      class Secp256k1(Base):
          """
          Bitcoin and Ethereum protocols inherit from this class
          """
          # value the private key is checked against and reduced by (secp256k1 group order)
          secp256k1_ge = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
          privkey_len = 32
          pubkey_types = ('std',)

          def preprocess_key(self,sec,pubkey_type):
              """
              Validate private key bytes 'sec' and return a usable key.

              A key in the range 0 < key < secp256k1_ge is returned unchanged.
              A key of zero, or one equal to secp256k1_ge, is reported via ydie().
              A larger key is reduced modulo secp256k1_ge and returned as
              privkey_len big-endian bytes, with a warning unless running under
              the test suite.  'pubkey_type' is unused here.
              """
              pk = int.from_bytes(sec,'big')

              # Key must be non-zero and less than group order of secp256k1 curve
              if 0 < pk < self.secp256k1_ge:
                  return sec

              # chance of getting here is less than 1 in 2^127
              from .util import ydie
              if pk == 0: # chance of this is 1 in 2^256
                  ydie(3,'Private key is zero!')
              elif pk == self.secp256k1_ge: # ditto
                  ydie(3,'Private key == secp256k1_ge!')
              else:
                  if not g.test_suite:
                      from .util import ymsg
                      # display the offending key as hex (the name formerly used here was undefined)
                      ymsg(f'Warning: private key is greater than secp256k1 group order!:\n {sec.hex()}')
                  return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big')

      class DummyWIF:
          """
          Ethereum and Monero protocols inherit from this class

          The "WIF" format implemented here is simply the hex encoding of the
          private key bytes.
          """
          def bytes2wif(self,privbytes,pubkey_type,compressed):
              """
              Return 'privbytes' as a hex string, after asserting that 'pubkey_type'
              equals the protocol's pubkey_type and that 'compressed' is false.
              """
              assert pubkey_type == self.pubkey_type, f'{pubkey_type}: invalid pubkey_type for {self.name} protocol!'
              assert compressed == False, f'{self.name} protocol does not support compressed pubkeys!'
              return privbytes.hex()

          def parse_wif(self,wif):
              """
              Decode hex string 'wif' and return a parsed_wif tuple carrying the
              protocol's pubkey_type, with 'compressed' always False.
              """
              return parsed_wif(
                  sec         = bytes.fromhex(wif),
                  pubkey_type = self.pubkey_type,
                  compressed  = False )
  def init_proto(coin=None,testnet=False,regtest=False,network=None,network_id=None,tokensym=None,need_amt=False):
      """
      Create and return a protocol instance for a coin and network.

      The coin and network are given in exactly one of three ways:
        - network_id alone (parsed with CoinProtocol.Base.parse_network_id)
        - coin plus network (generic network name; testnet and regtest must be False)
        - coin plus the testnet/regtest flags (regtest takes precedence)

      The protocol class is looked up as an attribute of CoinProtocol and, when
      not yet present, loaded from the attribute named after the network in
      module mmgen.proto.<coin> and stored on CoinProtocol for later calls.

      tokensym and need_amt are passed through to the protocol constructor.
      """
      assert isinstance(testnet,bool), 'init_proto_chk1'
      assert isinstance(regtest,bool), 'init_proto_chk2'
      assert coin or network_id, 'init_proto_chk3'
      assert not (coin and network_id), 'init_proto_chk4'

      if network_id:
          coin,network = CoinProtocol.Base.parse_network_id(network_id)
      elif network:
          assert network in CoinProtocol.Base.networks, f'init_proto_chk5 - {network!r}: invalid network'
          assert not testnet, 'init_proto_chk6'
          assert not regtest, 'init_proto_chk7'
      elif regtest:
          network = 'regtest'
      elif testnet:
          network = 'testnet'
      else:
          network = 'mainnet'

      coin = coin.lower()

      if coin not in CoinProtocol.coins:
          from .altcoin import init_genonly_altcoins
          # NOTE(review): 'testnet' is forwarded as given by the caller, so it stays False
          # when a test network was selected via 'network' or 'network_id' - confirm intended
          init_genonly_altcoins( coin, testnet=testnet ) # raises exception on failure

      # presumably the call above registers the coin in CoinProtocol.coins; otherwise KeyError here
      name = CoinProtocol.coins[coin].name
      net_suffix = '' if network == 'mainnet' else network.capitalize()
      proto_name = name + net_suffix

      if not hasattr(CoinProtocol,proto_name):
          # first use of this coin/network: load the class and cache it on CoinProtocol
          import importlib
          proto_module = importlib.import_module(f'mmgen.proto.{coin}')
          setattr( CoinProtocol, proto_name, getattr(proto_module,network) )

      proto_cls = getattr(CoinProtocol,proto_name)

      return proto_cls(
          coin     = coin,
          name     = name,
          network  = network,
          tokensym = tokensym,
          need_amt = need_amt )
  def init_proto_from_opts(need_amt=False):
      """
      Create a protocol instance from the global settings in 'g'.

      g.coin, g.testnet, g.regtest and g.token are forwarded to init_proto() as
      coin, testnet, regtest and tokensym respectively, together with need_amt.
      """
      proto_args = {
          'coin':     g.coin,
          'testnet':  g.testnet,
          'regtest':  g.regtest,
          'tokensym': g.token,
          'need_amt': need_amt,
      }
      return init_proto(**proto_args)
  def warn_trustlevel(coinsym):
      """
      Warn the user when support for coin 'coinsym' is experimental.

      The trust level is taken from CoinProtocol.coins for core coins, otherwise
      from the coin's 'mainnet' entry in altcoin.CoinInfo.

        - trust level None or -1: die(1,...) is called with a 'not supported' message
        - trust level above 3: return without any output
        - trust level 0-3: a warning is shown; under the test suite it is printed
          with qmsg(), otherwise the user is asked to confirm and the program
          exits with status 0 if confirmation is refused
      """
      sym_lc = coinsym.lower()

      if sym_lc in CoinProtocol.coins:
          trust_level = CoinProtocol.coins[sym_lc].trust_level
      else:
          from .altcoin import CoinInfo
          entry = CoinInfo.get_entry(coinsym,'mainnet')
          trust_level = entry.trust_level if entry else None

      if trust_level in (None,-1):
          from .util import die
          die(1,f'Coin {coinsym} is not supported by {g.proj_name}')

      if trust_level > 3:
          return

      # NOTE(review): the literal's original leading whitespace is not recoverable from the
      # source; fmt() is assumed to normalize per-line indentation - confirm
      template = """
          Support for coin {c!r} is EXPERIMENTAL. The {p} project
          assumes no responsibility for any loss of funds you may incur.
          This coin’s {p} testing status: {t}
          Are you sure you want to continue?
      """

      from .util import qmsg,fmt,keypress_confirm
      from .color import red,yellow,green

      text = fmt(template).strip()
      coin_label = coinsym.upper()
      # colorized testing status for each trust level that triggers the warning
      status_by_level = {
          0: red('COMPLETELY UNTESTED'),
          1: red('LOW'),
          2: yellow('MEDIUM'),
          3: green('OK'),
      }
      warning = text.format(
          c = coin_label,
          t = status_by_level[trust_level],
          p = g.proj_name )

      if g.test_suite:
          # no interactive prompt when running under the test suite
          qmsg(warning)
          return

      confirmed = keypress_confirm(warning,default_yes=True)
      if not confirmed:
          import sys
          sys.exit(0)