From 04775afb2e29accd041b813f89dc0d7f5ff582eb Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Thu, 14 May 2020 20:08:50 +0000 Subject: [PATCH] protocol.py: make coin protocols attrs of CoinProtocol --- mmgen/addr.py | 4 +- mmgen/obj.py | 4 +- mmgen/protocol.py | 674 ++++++++++++++++++------------------ mmgen/tool.py | 4 +- mmgen/util.py | 3 +- test/unit_tests_d/ut_rpc.py | 3 +- 6 files changed, 353 insertions(+), 339 deletions(-) diff --git a/mmgen/addr.py b/mmgen/addr.py index f7227f94..58e74d2a 100755 --- a/mmgen/addr.py +++ b/mmgen/addr.py @@ -960,8 +960,8 @@ Record this checksum: it will be used to verify the password file in the future pw_len_hex = baseconv.seedlen_map_rev['xmrseed'][self.pw_len] * 2 # take most significant part bytes_trunc = bytes.fromhex(hex_sec[:pw_len_hex]) - from .protocol import MoneroProtocol - bytes_preproc = MoneroProtocol().preprocess_key(bytes_trunc,None) + from .protocol import CoinProtocol + bytes_preproc = CoinProtocol.Monero().preprocess_key(bytes_trunc,None) return ' '.join(baseconv.frombytes(bytes_preproc,wl_id='xmrseed')) else: # take least significant part diff --git a/mmgen/obj.py b/mmgen/obj.py index d3d25175..4d91bb11 100755 --- a/mmgen/obj.py +++ b/mmgen/obj.py @@ -538,10 +538,10 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject): def is_for_chain(self,chain): - if type(g.proto).__name__[:8] == 'Ethereum': + if type(g.proto).__name__.startswith('Ethereum'): return True - proto = g.proto.get_protocol_by_chain(chain) + proto = CoinProtocol.get_protocol_by_chain(chain) if self.addr_fmt == 'bech32': return self[:len(proto.bech32_hrp)] == proto.bech32_hrp diff --git a/mmgen/protocol.py b/mmgen/protocol.py index 16b904ab..7887a85a 100755 --- a/mmgen/protocol.py +++ b/mmgen/protocol.py @@ -70,6 +70,7 @@ def _b58chk_decode(s): finfo = namedtuple('fork_info',['height','hash','name','replayable']) class CoinProtocol(MMGenObject): + proto_info = namedtuple('proto_info',['mainnet','testnet','trust_level']) # trust levels: see altcoin.py coins = { 'btc': proto_info('Bitcoin', 'BitcoinTestnet', 5), @@ -82,361 +83,367 @@ class CoinProtocol(MMGenObject): } core_coins = tuple(coins.keys()) # coins may be added by init_genonly_altcoins(), so save -# chainparams.cpp -class BitcoinProtocol(MMGenObject): - name = 'bitcoin' - daemon_name = 'bitcoind' - daemon_family = 'bitcoind' - addr_ver_bytes = { '00': 'p2pkh', '05': 'p2sh' } - addr_len = 20 - wif_ver_num = { 'std': '80' } - mmtypes = ('L','C','S','B') - dfl_mmtype = 'L' - data_subdir = '' - rpc_port = 8332 - secs_per_block = 600 - coin_amt = BTCAmt - max_tx_fee = BTCAmt('0.003') - daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin') if g.platform == 'win' \ - else os.path.join(g.home_dir,'.bitcoin') - daemon_data_subdir = '' - sighash_type = 'ALL' - block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' - forks = [ - finfo(478559,'00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148','BCH',False), - finfo(None,'','B2X',True), - ] - caps = ('rbf','segwit') - mmcaps = ('key','addr','rpc','tx') - base_coin = 'BTC' - base_proto = 'Bitcoin' - # From BIP173: witness version 'n' is stored as 'OP_n'. OP_0 is encoded as 0x00, - # but OP_1 through OP_16 are encoded as 0x51 though 0x60 (81 to 96 in decimal). - witness_vernum_hex = '00' - witness_vernum = int(witness_vernum_hex,16) - bech32_hrp = 'bc' - sign_mode = 'daemon' - secp256k1_ge = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141 - privkey_len = 32 - avg_bdi = int(9.7 * 60) # average block discovery interval (historical) - - def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False): - for ver_hex,fmt in self.addr_ver_bytes.items(): - if req_fmt == fmt: - return ver_hex if return_hex else bytes.fromhex(ver_hex) - return False - - def is_testnet(self): - return type(self).__name__[-15:] == 'TestnetProtocol' - @staticmethod def get_protocol_by_chain(chain): return init_proto(g.coin,{'mainnet':False,'testnet':True,'regtest':True}[chain]) - def cap(self,s): return s in self.caps + class Common(MMGenObject): - def preprocess_key(self,sec,pubkey_type): - # Key must be non-zero and less than group order of secp256k1 curve - if 0 < int.from_bytes(sec,'big') < self.secp256k1_ge: - return sec - else: # chance of this is less than 1 in 2^127 - pk = int.from_bytes(sec,'big') - if pk == 0: # chance of this is 1 in 2^256 - ydie(3,'Private key is zero!') - elif pk == self.secp256k1_ge: # ditto - ydie(3,'Private key == secp256k1_ge!') + def is_testnet(self): + return type(self).__name__.endswith('Testnet') + + def cap(self,s): + return s in self.caps + + class Bitcoin(Common): # chainparams.cpp + name = 'bitcoin' + mod_clsname = 'bitcoin' + daemon_name = 'bitcoind' + daemon_family = 'bitcoind' + addr_ver_bytes = { '00': 'p2pkh', '05': 'p2sh' } + addr_len = 20 + wif_ver_num = { 'std': '80' } + mmtypes = ('L','C','S','B') + dfl_mmtype = 'L' + data_subdir = '' + rpc_port = 8332 + secs_per_block = 600 + coin_amt = BTCAmt + max_tx_fee = BTCAmt('0.003') + daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin') if g.platform == 'win' \ + else os.path.join(g.home_dir,'.bitcoin') + daemon_data_subdir = '' + sighash_type = 'ALL' + block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f' + forks = [ + finfo(478559,'00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148','BCH',False), + finfo(None,'','B2X',True), + ] + caps = ('rbf','segwit') + mmcaps = ('key','addr','rpc','tx') + base_coin = 'BTC' + base_proto = 'Bitcoin' + # From BIP173: witness version 'n' is stored as 'OP_n'. OP_0 is encoded as 0x00, + # but OP_1 through OP_16 are encoded as 0x51 though 0x60 (81 to 96 in decimal). + witness_vernum_hex = '00' + witness_vernum = int(witness_vernum_hex,16) + bech32_hrp = 'bc' + sign_mode = 'daemon' + secp256k1_ge = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141 + privkey_len = 32 + avg_bdi = int(9.7 * 60) # average block discovery interval (historical) + + def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False): + for ver_hex,fmt in self.addr_ver_bytes.items(): + if req_fmt == fmt: + return ver_hex if return_hex else bytes.fromhex(ver_hex) + return False + + def preprocess_key(self,sec,pubkey_type): + # Key must be non-zero and less than group order of secp256k1 curve + if 0 < int.from_bytes(sec,'big') < self.secp256k1_ge: + return sec + else: # chance of this is less than 1 in 2^127 + pk = int.from_bytes(sec,'big') + if pk == 0: # chance of this is 1 in 2^256 + ydie(3,'Private key is zero!') + elif pk == self.secp256k1_ge: # ditto + ydie(3,'Private key == secp256k1_ge!') + else: + if not g.test_suite: + ymsg('Warning: private key is greater than secp256k1 group order!:\n {}'.format(hexpriv)) + return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big') + + def hex2wif(self,hexpriv,pubkey_type,compressed): # input is preprocessed hex + sec = bytes.fromhex(hexpriv) + assert len(sec) == self.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec)) + assert pubkey_type in self.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type) + return _b58chk_encode( + bytes.fromhex(self.wif_ver_num[pubkey_type]) + + sec + + (b'',b'\x01')[bool(compressed)]) + + def parse_wif(self,wif): + key = _b58chk_decode(wif) + + for k,v in self.wif_ver_num.items(): + v = bytes.fromhex(v) + if key[:len(v)] == v: + pubkey_type = k + key = key[len(v):] + break else: - if not g.test_suite: - ymsg('Warning: private key is greater than secp256k1 group order!:\n {}'.format(hexpriv)) - return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big') + raise ValueError('invalid WIF version number') - def hex2wif(self,hexpriv,pubkey_type,compressed): # input is preprocessed hex - sec = bytes.fromhex(hexpriv) - assert len(sec) == self.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec)) - assert pubkey_type in self.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type) - return _b58chk_encode( - bytes.fromhex(self.wif_ver_num[pubkey_type]) - + sec - + (b'',b'\x01')[bool(compressed)]) + if len(key) == self.privkey_len + 1: + assert key[-1] == 0x01,'{!r}: invalid compressed key suffix byte'.format(key[-1]) + compressed = True + elif len(key) == self.privkey_len: + compressed = False + else: + raise ValueError('{}: invalid key length'.format(len(key))) - def parse_wif(self,wif): - key = _b58chk_decode(wif) + return parsed_wif(key[:self.privkey_len], pubkey_type, compressed) - for k,v in self.wif_ver_num.items(): - v = bytes.fromhex(v) - if key[:len(v)] == v: - pubkey_type = k - key = key[len(v):] - break - else: - raise ValueError('invalid WIF version number') + def get_addr_len(self,addr_fmt): + return self.addr_len - if len(key) == self.privkey_len + 1: - assert key[-1] == 0x01,'{!r}: invalid compressed key suffix byte'.format(key[-1]) - compressed = True - elif len(key) == self.privkey_len: - compressed = False - else: - raise ValueError('{}: invalid key length'.format(len(key))) + def parse_addr_bytes(self,addr_bytes): + for ver_hex,addr_fmt in self.addr_ver_bytes.items(): + ver_bytes = bytes.fromhex(ver_hex) + vlen = len(ver_bytes) + if addr_bytes[:vlen] == ver_bytes: + if len(addr_bytes[vlen:]) == self.get_addr_len(addr_fmt): + return parsed_addr( addr_bytes[vlen:], addr_fmt ) - return parsed_wif(key[:self.privkey_len], pubkey_type, compressed) + return False - def get_addr_len(self,addr_fmt): - return self.addr_len + def parse_addr(self,addr): - def parse_addr_bytes(self,addr_bytes): - for ver_hex,addr_fmt in self.addr_ver_bytes.items(): - ver_bytes = bytes.fromhex(ver_hex) - vlen = len(ver_bytes) - if addr_bytes[:vlen] == ver_bytes: - if len(addr_bytes[vlen:]) == self.get_addr_len(addr_fmt): - return parsed_addr( addr_bytes[vlen:], addr_fmt ) + if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp: + ret = bech32.decode(self.bech32_hrp,addr) - return False + if ret[0] != self.witness_vernum: + msg('{}: Invalid witness version number'.format(ret[0])) + return False - def parse_addr(self,addr): + return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False - if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp: - ret = bech32.decode(self.bech32_hrp,addr) + return self.parse_addr_bytes(_b58chk_decode(addr)) - if ret[0] != self.witness_vernum: - msg('{}: Invalid witness version number'.format(ret[0])) - return False + def pubhash2addr(self,pubkey_hash,p2sh): + assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash)) + s = self.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash + return _b58chk_encode(bytes.fromhex(s)) - return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False + # Segwit: + def pubhex2redeem_script(self,pubhex): + # https://bitcoincore.org/en/segwit_wallet_dev/ + # The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed + # by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash}) + return self.witness_vernum_hex + '14' + hash160(pubhex) - return self.parse_addr_bytes(_b58chk_decode(addr)) + def pubhex2segwitaddr(self,pubhex): + return self.pubhash2addr(hash160(self.pubhex2redeem_script(pubhex)),p2sh=True) - def pubhash2addr(self,pubkey_hash,p2sh): - assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash)) - s = self.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash - return _b58chk_encode(bytes.fromhex(s)) + def pubhash2bech32addr(self,pubhash): + d = list(bytes.fromhex(pubhash)) + return bech32.bech32_encode(self.bech32_hrp,[self.witness_vernum]+bech32.convertbits(d,8,5)) - # Segwit: - def pubhex2redeem_script(self,pubhex): - # https://bitcoincore.org/en/segwit_wallet_dev/ - # The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed - # by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash}) - return self.witness_vernum_hex + '14' + hash160(pubhex) + class BitcoinTestnet(Bitcoin): + addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } + wif_ver_num = { 'std': 'ef' } + data_subdir = 'testnet' + daemon_data_subdir = 'testnet3' + rpc_port = 18332 + bech32_hrps = {'testnet':'tb','regtest':'bcrt'} - def pubhex2segwitaddr(self,pubhex): - return self.pubhash2addr(hash160(self.pubhex2redeem_script(pubhex)),p2sh=True) + class BitcoinCash(Bitcoin): + # TODO: assumes MSWin user installs in custom dir 'Bitcoin_ABC' + daemon_name = 'bitcoind-abc' + daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_ABC') if g.platform == 'win' \ + else os.path.join(g.home_dir,'.bitcoin-abc') + rpc_port = 8442 + mmtypes = ('L','C') + sighash_type = 'ALL|FORKID' + forks = [ + finfo(478559,'000000000000000000651ef99cb9fcbe0dadde1d424bd9f15ff20136191a5eec','BTC',False) + ] + caps = () + coin_amt = BCHAmt + max_tx_fee = BCHAmt('0.1') - def pubhash2bech32addr(self,pubhash): - d = list(bytes.fromhex(pubhash)) - return bech32.bech32_encode(self.bech32_hrp,[self.witness_vernum]+bech32.convertbits(d,8,5)) + def pubhex2redeem_script(self,pubhex): raise NotImplementedError + def pubhex2segwitaddr(self,pubhex): raise NotImplementedError -class BitcoinTestnetProtocol(BitcoinProtocol): - addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } - wif_ver_num = { 'std': 'ef' } - data_subdir = 'testnet' - daemon_data_subdir = 'testnet3' - rpc_port = 18332 - bech32_hrps = {'testnet':'tb','regtest':'bcrt'} + class BitcoinCashTestnet(BitcoinCash): + rpc_port = 18442 + addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } + wif_ver_num = { 'std': 'ef' } + data_subdir = 'testnet' + daemon_data_subdir = 'testnet3' -class BitcoinCashProtocol(BitcoinProtocol): - # TODO: assumes MSWin user installs in custom dir 'Bitcoin_ABC' - daemon_name = 'bitcoind-abc' - daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_ABC') if g.platform == 'win' \ - else os.path.join(g.home_dir,'.bitcoin-abc') - rpc_port = 8442 - mmtypes = ('L','C') - sighash_type = 'ALL|FORKID' - forks = [ - finfo(478559,'000000000000000000651ef99cb9fcbe0dadde1d424bd9f15ff20136191a5eec','BTC',False) - ] - caps = () - coin_amt = BCHAmt - max_tx_fee = BCHAmt('0.1') + class B2X(Bitcoin): + daemon_name = 'bitcoind-2x' + daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_2X') if g.platform == 'win' \ + else os.path.join(g.home_dir,'.bitcoin-2x') + rpc_port = 8338 + coin_amt = B2XAmt + max_tx_fee = B2XAmt('0.1') + forks = [ + finfo(None,'','BTC',True) # activation: 494784 + ] - def pubhex2redeem_script(self,pubhex): raise NotImplementedError - def pubhex2segwitaddr(self,pubhex): raise NotImplementedError + class B2XTestnet(B2X): + addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } + wif_ver_num = { 'std': 'ef' } + data_subdir = 'testnet' + daemon_data_subdir = 'testnet5' + rpc_port = 18338 -class BitcoinCashTestnetProtocol(BitcoinCashProtocol): - rpc_port = 18442 - addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } - wif_ver_num = { 'std': 'ef' } - data_subdir = 'testnet' - daemon_data_subdir = 'testnet3' + class Litecoin(Bitcoin): + block0 = '12a765e31ffd4059bada1e25190f6e98c99d9714d334efa41a195a7e7e04bfe2' + name = 'litecoin' + daemon_name = 'litecoind' + daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Litecoin') if g.platform == 'win' \ + else os.path.join(g.home_dir,'.litecoin') + addr_ver_bytes = { '30': 'p2pkh', '32': 'p2sh', '05': 'p2sh' } # new p2sh ver 0x32 must come first + wif_ver_num = { 'std': 'b0' } + mmtypes = ('L','C','S','B') + secs_per_block = 150 + rpc_port = 9332 + coin_amt = LTCAmt + max_tx_fee = LTCAmt('0.3') + base_coin = 'LTC' + forks = [] + bech32_hrp = 'ltc' + avg_bdi = 2 * 60 -class B2XProtocol(BitcoinProtocol): - daemon_name = 'bitcoind-2x' - daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_2X') if g.platform == 'win' \ - else os.path.join(g.home_dir,'.bitcoin-2x') - rpc_port = 8338 - coin_amt = B2XAmt - max_tx_fee = B2XAmt('0.1') - forks = [ - finfo(None,'','BTC',True) # activation: 494784 - ] + class LitecoinTestnet(Litecoin): + # addr ver nums same as Bitcoin testnet, except for 'p2sh' + addr_ver_bytes = { '6f':'p2pkh', '3a':'p2sh', 'c4':'p2sh' } + wif_ver_num = { 'std': 'ef' } # same as Bitcoin testnet + data_subdir = 'testnet' + daemon_data_subdir = 'testnet4' + rpc_port = 19332 + bech32_hrps = {'testnet':'tltc','regtest':'rltc'} -class B2XTestnetProtocol(B2XProtocol): - addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } - wif_ver_num = { 'std': 'ef' } - data_subdir = 'testnet' - daemon_data_subdir = 'testnet5' - rpc_port = 18338 + class BitcoinAddrgen(Bitcoin): + mmcaps = ('key','addr') -class LitecoinProtocol(BitcoinProtocol): - block0 = '12a765e31ffd4059bada1e25190f6e98c99d9714d334efa41a195a7e7e04bfe2' - name = 'litecoin' - daemon_name = 'litecoind' - daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Litecoin') if g.platform == 'win' \ - else os.path.join(g.home_dir,'.litecoin') - addr_ver_bytes = { '30': 'p2pkh', '32': 'p2sh', '05': 'p2sh' } # new p2sh ver 0x32 must come first - wif_ver_num = { 'std': 'b0' } - mmtypes = ('L','C','S','B') - secs_per_block = 150 - rpc_port = 9332 - coin_amt = LTCAmt - max_tx_fee = LTCAmt('0.3') - base_coin = 'LTC' - forks = [] - bech32_hrp = 'ltc' - avg_bdi = 2 * 60 + class BitcoinAddrgenTestnet(BitcoinTestnet): + mmcaps = ('key','addr') -class LitecoinTestnetProtocol(LitecoinProtocol): - # addr ver nums same as Bitcoin testnet, except for 'p2sh' - addr_ver_bytes = { '6f':'p2pkh', '3a':'p2sh', 'c4':'p2sh' } - wif_ver_num = { 'std': 'ef' } # same as Bitcoin testnet - data_subdir = 'testnet' - daemon_data_subdir = 'testnet4' - rpc_port = 19332 - bech32_hrps = {'testnet':'tltc','regtest':'rltc'} + class DummyWIF(object): -class BitcoinProtocolAddrgen(BitcoinProtocol): mmcaps = ('key','addr') -class BitcoinTestnetProtocolAddrgen(BitcoinTestnetProtocol): mmcaps = ('key','addr') + def hex2wif(self,hexpriv,pubkey_type,compressed): + n = self.name.capitalize() + assert pubkey_type == self.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n) + assert compressed == False,'{} does not support compressed pubkeys!'.format(n) + return hexpriv -class DummyWIF(object): + def parse_wif(self,wif): + return parsed_wif(bytes.fromhex(wif), self.pubkey_type, False) - def hex2wif(self,hexpriv,pubkey_type,compressed): - n = self.name.capitalize() - assert pubkey_type == self.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n) - assert compressed == False,'{} does not support compressed pubkeys!'.format(n) - return hexpriv + class Ethereum(DummyWIF,Bitcoin): - def parse_wif(self,wif): - return parsed_wif(bytes.fromhex(wif), self.pubkey_type, False) + addr_len = 20 + mmtypes = ('E',) + dfl_mmtype = 'E' + name = 'ethereum' + mod_clsname = 'ethereum' + base_coin = 'ETH' + pubkey_type = 'std' # required by DummyWIF -class EthereumProtocol(DummyWIF,BitcoinProtocol): + data_subdir = '' + daemon_name = 'parity' + daemon_family = 'parity' + rpc_port = 8545 + mmcaps = ('key','addr','rpc') + coin_amt = ETHAmt + max_tx_fee = ETHAmt('0.005') + chain_name = 'foundation' + sign_mode = 'standalone' + caps = ('token',) + base_proto = 'Ethereum' - addr_len = 20 - mmtypes = ('E',) - dfl_mmtype = 'E' - name = 'ethereum' - base_coin = 'ETH' - pubkey_type = 'std' # required by DummyWIF + def parse_addr(self,addr): + from .util import is_hex_str_lc + if is_hex_str_lc(addr) and len(addr) == self.addr_len * 2: + return parsed_addr( bytes.fromhex(addr), 'ethereum' ) + if g.debug: Msg("Invalid address '{}'".format(addr)) + return False - data_subdir = '' - daemon_name = 'parity' - daemon_family = 'parity' - rpc_port = 8545 - mmcaps = ('key','addr','rpc') - coin_amt = ETHAmt - max_tx_fee = ETHAmt('0.005') - chain_name = 'foundation' - sign_mode = 'standalone' - caps = ('token',) - base_proto = 'Ethereum' + def pubhash2addr(self,pubkey_hash,p2sh): + assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash)) + assert not p2sh,'Ethereum has no P2SH address format' + return pubkey_hash - def parse_addr(self,addr): - from .util import is_hex_str_lc - if is_hex_str_lc(addr) and len(addr) == self.addr_len * 2: - return parsed_addr( bytes.fromhex(addr), 'ethereum' ) - if g.debug: Msg("Invalid address '{}'".format(addr)) - return False + class EthereumTestnet(Ethereum): + data_subdir = 'testnet' + rpc_port = 8547 # start Parity with --jsonrpc-port=8547 or --ports-shift=2 + chain_name = 'kovan' - def pubhash2addr(self,pubkey_hash,p2sh): - assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash)) - assert not p2sh,'Ethereum has no P2SH address format' - return pubkey_hash + class EthereumClassic(Ethereum): + name = 'ethereumClassic' # TODO + rpc_port = 8555 # start Parity with --jsonrpc-port=8555 or --ports-shift=10 + chain_name = 'ethereum_classic' # chain_id 0x3d (61) -class EthereumTestnetProtocol(EthereumProtocol): - data_subdir = 'testnet' - rpc_port = 8547 # start Parity with --jsonrpc-port=8547 or --ports-shift=2 - chain_name = 'kovan' + class EthereumClassicTestnet(EthereumClassic): + rpc_port = 8557 # start Parity with --jsonrpc-port=8557 or --ports-shift=12 + chain_name = 'classic-testnet' # aka Morden, chain_id 0x3e (62) (UNTESTED) -class EthereumClassicProtocol(EthereumProtocol): - name = 'ethereumClassic' - class_pfx = 'Ethereum' - rpc_port = 8555 # start Parity with --jsonrpc-port=8555 or --ports-shift=10 - chain_name = 'ethereum_classic' # chain_id 0x3d (61) + class Zcash(BitcoinAddrgen): + name = 'zcash' + base_coin = 'ZEC' + addr_ver_bytes = { '1cb8': 'p2pkh', '1cbd': 'p2sh', '169a': 'zcash_z', 'a8abd3': 'viewkey' } + wif_ver_num = { 'std': '80', 'zcash_z': 'ab36' } + mmtypes = ('L','C','Z') + dfl_mmtype = 'L' -class EthereumClassicTestnetProtocol(EthereumClassicProtocol): - rpc_port = 8557 # start Parity with --jsonrpc-port=8557 or --ports-shift=12 - chain_name = 'classic-testnet' # aka Morden, chain_id 0x3e (62) (UNTESTED) + def get_addr_len(self,addr_fmt): + return (20,64)[addr_fmt in ('zcash_z','viewkey')] -class ZcashProtocol(BitcoinProtocolAddrgen): - name = 'zcash' - base_coin = 'ZEC' - addr_ver_bytes = { '1cb8': 'p2pkh', '1cbd': 'p2sh', '169a': 'zcash_z', 'a8abd3': 'viewkey' } - wif_ver_num = { 'std': '80', 'zcash_z': 'ab36' } - mmtypes = ('L','C','Z') - dfl_mmtype = 'L' + def preprocess_key(self,sec,pubkey_type): + if pubkey_type == 'zcash_z': # zero the first four bits + return bytes([sec[0] & 0x0f]) + sec[1:] + else: + return super().preprocess_key(sec,pubkey_type) - def get_addr_len(self,addr_fmt): - return (20,64)[addr_fmt in ('zcash_z','viewkey')] + def pubhash2addr(self,pubkey_hash,p2sh): + hl = len(pubkey_hash) + if hl == 40: + return super().pubhash2addr(pubkey_hash,p2sh) + elif hl == 128: + raise NotImplementedError('Zcash z-addresses have no pubkey hash') + else: + raise ValueError('{}: incorrect pubkey_hash length'.format(hl)) - def preprocess_key(self,sec,pubkey_type): - if pubkey_type == 'zcash_z': # zero the first four bits - return bytes([sec[0] & 0x0f]) + sec[1:] - else: - return super().preprocess_key(sec,pubkey_type) - - def pubhash2addr(self,pubkey_hash,p2sh): - hl = len(pubkey_hash) - if hl == 40: - return super().pubhash2addr(pubkey_hash,p2sh) - elif hl == 128: - raise NotImplementedError('Zcash z-addresses have no pubkey hash') - else: - raise ValueError('{}: incorrect pubkey_hash length'.format(hl)) - -class ZcashTestnetProtocol(ZcashProtocol): - wif_ver_num = { 'std': 'ef', 'zcash_z': 'ac08' } - addr_ver_bytes = { '1d25': 'p2pkh', '1cba': 'p2sh', '16b6': 'zcash_z', 'a8ac0c': 'viewkey' } + class ZcashTestnet(Zcash): + wif_ver_num = { 'std': 'ef', 'zcash_z': 'ac08' } + addr_ver_bytes = { '1d25': 'p2pkh', '1cba': 'p2sh', '16b6': 'zcash_z', 'a8ac0c': 'viewkey' } # https://github.com/monero-project/monero/blob/master/src/cryptonote_config.h -class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen): - name = 'monero' - base_coin = 'XMR' - addr_ver_bytes = { '12': 'monero', '2a': 'monero_sub' } - addr_len = 68 - wif_ver_num = {} - mmtypes = ('M',) - dfl_mmtype = 'M' - pubkey_type = 'monero' # required by DummyWIF + class Monero(DummyWIF,BitcoinAddrgen): + name = 'monero' + base_coin = 'XMR' + addr_ver_bytes = { '12': 'monero', '2a': 'monero_sub' } + addr_len = 68 + wif_ver_num = {} + mmtypes = ('M',) + dfl_mmtype = 'M' + pubkey_type = 'monero' # required by DummyWIF - def preprocess_key(self,sec,pubkey_type): # reduce key - from .ed25519 import l - n = int.from_bytes(sec[::-1],'big') % l - return int.to_bytes(n,self.privkey_len,'big')[::-1] + def preprocess_key(self,sec,pubkey_type): # reduce key + from .ed25519 import l + n = int.from_bytes(sec[::-1],'big') % l + return int.to_bytes(n,self.privkey_len,'big')[::-1] - def parse_addr(self,addr): + def parse_addr(self,addr): - from .baseconv import baseconv,is_b58_str + from .baseconv import baseconv,is_b58_str - def b58dec(addr_str): - l = len(addr_str) - a = b''.join([baseconv.tobytes(addr_str[i*11:i*11+11],'b58',pad=8) for i in range(l//11)]) - b = baseconv.tobytes(addr_str[-(l%11):],'b58',pad=5) - return a + b + def b58dec(addr_str): + l = len(addr_str) + a = b''.join([baseconv.tobytes(addr_str[i*11:i*11+11],'b58',pad=8) for i in range(l//11)]) + b = baseconv.tobytes(addr_str[-(l%11):],'b58',pad=5) + return a + b - ret = b58dec(addr) + ret = b58dec(addr) - try: - assert not g.use_internal_keccak_module - from sha3 import keccak_256 - except: - from .keccak import keccak_256 + try: + assert not g.use_internal_keccak_module + from sha3 import keccak_256 + except: + from .keccak import keccak_256 - chk = keccak_256(ret[:-4]).digest()[:4] - assert ret[-4:] == chk,'{}: incorrect checksum. Correct value: {}'.format(ret[-4:].hex(),chk.hex()) + chk = keccak_256(ret[:-4]).digest()[:4] + assert ret[-4:] == chk,'{}: incorrect checksum. Correct value: {}'.format(ret[-4:].hex(),chk.hex()) - return self.parse_addr_bytes(ret) + return self.parse_addr_bytes(ret) -class MoneroTestnetProtocol(MoneroProtocol): - addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' } + class MoneroTestnet(Monero): + addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' } def init_proto(coin,testnet): coin = coin.lower() @@ -446,7 +453,7 @@ def init_proto(coin,testnet): '{}: not a valid coin for network {}\nSupported coins: {}'.format( coin.upper(),g.network.upper(), ' '.join(c.upper() for c in CoinProtocol.coins) )) - proto = globals()[CoinProtocol.coins[coin][testnet] + 'Protocol'] + proto = getattr(CoinProtocol,CoinProtocol.coins[coin][testnet]) if hasattr(proto,'bech32_hrps'): proto.bech32_hrp = proto.bech32_hrps[('testnet','regtest')[g.regtest]] return proto() @@ -490,42 +497,51 @@ def make_init_genonly_altcoins_str(data): def make_proto(e,testnet=False): tn_str = 'Testnet' if testnet else '' - proto,coin = '{}{}Protocol'.format(e.name,tn_str),e.symbol - if proto[0] in '0123456789': proto = 'X_'+proto - if proto in globals(): return '' - if coin.lower() in CoinProtocol.coins: return '' + proto = e.name + tn_str + coin = e.symbol + if proto[0] in '0123456789': + proto = 'X_'+proto + if hasattr(CoinProtocol,proto) or coin.lower() in CoinProtocol.coins: + return '' def num2hexstr(n): return "'{:0{}x}'".format(n,(4,2)[n < 256]) - o = ['class {}(Bitcoin{}ProtocolAddrgen):'.format(proto,tn_str)] - o += ["base_coin = '{}'".format(coin)] - o += ["name = '{}'".format(e.name.lower())] - o += ["nameCaps = '{}'".format(e.name)] - o += ["addr_ver_bytes = {{ {}: 'p2pkh'{} }}".format( - num2hexstr(e.p2pkh_info[0]), - ", {}: 'p2sh'".format(num2hexstr(e.p2sh_info[0])) if e.p2sh_info else '' - )] - o += ["wif_ver_num = {{ 'std': {} }}".format(num2hexstr(e.wif_ver_num))] - o += ["mmtypes = ('L','C'{})".format(",'S'" if e.has_segwit else '')] - o += ["dfl_mmtype = '{}'".format('L')] - return '\n\t'.join(o) + '\n' + p2sh_info = ", {}: 'p2sh'".format(num2hexstr(e.p2sh_info[0])) if e.p2sh_info else '' + sw_mmtype = ",'S'" if e.has_segwit else '' - out = '' - for e in data['mainnet']: - out += make_proto(e) - for e in data['testnet']: - out += make_proto(e,testnet=True) + return f""" + class {proto}(CoinProtocol.BitcoinAddrgen{tn_str}): + base_coin = {coin!r} + name = {e.name.lower()!r} + nameCaps = {e.name!r} + addr_ver_bytes = {{ {num2hexstr(e.p2pkh_info[0])}: 'p2pkh'{p2sh_info} }} + wif_ver_num = {{ 'std': {num2hexstr(e.wif_ver_num)} }} + mmtypes = ('L','C'{sw_mmtype}) + dfl_mmtype = 'L' + """.rstrip() - tn_coins = [e.symbol for e in data['testnet']] - fs = "CoinProtocol.coins['{}'] = CoinProtocol.proto_info('{}',{},{})\n" - for e in data['mainnet']: - proto,coin = e.name,e.symbol - if proto[0] in '0123456789': proto = 'X_'+proto - if proto+'Protocol' in globals(): continue - if coin.lower() in CoinProtocol.coins: continue - out += fs.format(coin.lower(),proto,('None',f"'{proto}Testnet'")[coin in tn_coins],e.trust_level) - return out + def gen_text(): + yield "class CoinProtocol(CoinProtocol):" + for e in data['mainnet']: + yield make_proto(e) + for e in data['testnet']: + yield make_proto(e,testnet=True) + yield '' + + for e in data['mainnet']: + proto,coin = e.name,e.symbol + if proto[0] in '0123456789': + proto = 'X_'+proto + if hasattr(CoinProtocol,proto) or coin.lower() in CoinProtocol.coins: + continue + yield 'CoinProtocol.coins[{!r}] = CoinProtocol.proto_info({!r},{},{})'.format( + coin.lower(), + proto, + ('None',f"'{proto}Testnet'")[coin in [e.symbol for e in data['testnet']]], + e.trust_level ) + + return '\n'.join(gen_text()) + '\n' def init_coin(coin,testnet=None): if testnet is not None: diff --git a/mmgen/tool.py b/mmgen/tool.py index ffcee2e5..ec533243 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -526,8 +526,8 @@ class MMGenToolCmdMnemonic(MMGenToolCmds): @staticmethod def _xmr_reduce(bytestr): - from .protocol import MoneroProtocol - p = MoneroProtocol() + from .protocol import CoinProtocol + p = CoinProtocol.Monero() if len(bytestr) != p.privkey_len: m = '{!r}: invalid bit length for Monero private key (must be {})' die(1,m.format(len(bytestr*8),p.privkey_len*8)) diff --git a/mmgen/util.py b/mmgen/util.py index 7a7909f7..079ca916 100755 --- a/mmgen/util.py +++ b/mmgen/util.py @@ -860,11 +860,10 @@ def format_par(s,indent=0,width=80,as_list=False): def altcoin_subclass(cls,mod_id,cls_name): if cls.__name__ != cls_name: return cls mod_dir = g.proto.base_coin.lower() - pname = g.proto.class_pfx if hasattr(g.proto,'class_pfx') else capfirst(g.proto.name) tname = 'Token' if g.token else '' import importlib modname = 'mmgen.altcoins.{}.{}'.format(mod_dir,mod_id) - clsname = '{}{}{}'.format(pname,tname,cls_name) + clsname = '{}{}{}'.format(capfirst(g.proto.mod_clsname),tname,cls_name) try: return getattr(importlib.import_module(modname),clsname) except ImportError: diff --git a/test/unit_tests_d/ut_rpc.py b/test/unit_tests_d/ut_rpc.py index a73d9980..1fa33789 100755 --- a/test/unit_tests_d/ut_rpc.py +++ b/test/unit_tests_d/ut_rpc.py @@ -6,7 +6,7 @@ test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite from mmgen.common import * from mmgen.exception import * -from mmgen.protocol import init_coin,EthereumProtocol +from mmgen.protocol import init_coin from mmgen.rpc import MoneroWalletRPCClient from mmgen.daemon import CoinDaemon,MoneroWalletDaemon @@ -72,7 +72,6 @@ class unit_tests: async def run_test(): qmsg(' Testing backend {!r}'.format(type(g.rpc.backend).__name__)) ret = await g.rpc.call('parity_versionInfo',timeout=300) - #print(ret) for backend in g.autoset_opts['rpc_backend'].choices: run_session(run_test(),backend=backend)