From 4449c23da40ec4803c3d584eb601d04177784c53 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Wed, 13 May 2020 21:07:45 +0000 Subject: [PATCH] make g.proto a class instance --- mmgen/addr.py | 2 +- mmgen/altcoin.py | 2 +- mmgen/obj.py | 10 +- mmgen/protocol.py | 136 ++++++++++--------------- mmgen/rpc.py | 6 +- mmgen/tool.py | 9 +- test/test.py | 2 +- test/test_py_d/ts_ref.py | 4 +- test/unit_tests_d/ut_tx_deserialize.py | 12 +-- 9 files changed, 79 insertions(+), 104 deletions(-) diff --git a/mmgen/addr.py b/mmgen/addr.py index 3e22ddfb..533383d9 100755 --- a/mmgen/addr.py +++ b/mmgen/addr.py @@ -961,7 +961,7 @@ Record this checksum: it will be used to verify the password file in the future # take most significant part bytes_trunc = bytes.fromhex(hex_sec[:pw_len_hex]) from .protocol import MoneroProtocol - bytes_preproc = MoneroProtocol.preprocess_key(bytes_trunc,None) + bytes_preproc = MoneroProtocol().preprocess_key(bytes_trunc,None) return ' '.join(baseconv.frombytes(bytes_preproc,wl_id='xmrseed')) else: # take least significant part diff --git a/mmgen/altcoin.py b/mmgen/altcoin.py index 492846b3..3c28dc84 100755 --- a/mmgen/altcoin.py +++ b/mmgen/altcoin.py @@ -435,7 +435,7 @@ class CoinInfo(object): e = cls.get_entry(coin,network) if e: proto = CoinProtocol(coin,testnet=network=='testnet') - cdata = (network,coin,e,proto.__name__,verbose) + cdata = (network,coin,e,type(proto).__name__,verbose) if not quiet: msg('Verifying {} {}'.format(coin.upper(),network)) diff --git a/mmgen/obj.py b/mmgen/obj.py index 853489d9..d3d25175 100755 --- a/mmgen/obj.py +++ b/mmgen/obj.py @@ -529,7 +529,7 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject): me.hex = ap.bytes.hex() return me except Exception as e: - return cls.init_fail(e,s,objname='{} address'.format(g.proto.__name__)) + return cls.init_fail(e,s,objname='{} address'.format(type(g.proto).__name__)) @classmethod def fmtc(cls,s,**kwargs): @@ -538,7 +538,7 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject): def is_for_chain(self,chain): - if g.proto.__name__[:8] == 'Ethereum': + if type(g.proto).__name__[:8] == 'Ethereum': return True proto = g.proto.get_protocol_by_chain(chain) @@ -620,7 +620,7 @@ class MMGenID(str,Hilite,InitErrors,MMGenObject): me.sid = SeedID(sid=ss[0],on_fail='raise') me.idx = AddrIdx(ss[-1],on_fail='raise') me.mmtype = t - assert t in g.proto.mmtypes,'{}: invalid address type for {}'.format(t,g.proto.__name__) + assert t in g.proto.mmtypes,'{}: invalid address type for {}'.format(t,type(g.proto).__name__) me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done me.sort_key = '{}:{}:{:0{w}}'.format(me.sid,me.mmtype,me.idx,w=me.idx.max_digits) return me @@ -755,7 +755,7 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject): me.orig_hex = None if k.sec != g.proto.preprocess_key(k.sec,k.pubkey_type): m = '{} WIF key {!r} encodes private key with unacceptable value {}' - raise PrivateKeyError(m.format(g.proto.__name__,me.wif,me)) + raise PrivateKeyError(m.format(type(g.proto).__name__,me.wif,me)) return me except Exception as e: return cls.init_fail(e,s,objname='{} WIF key'.format(g.coin)) @@ -914,7 +914,7 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject): for k in v._fields: setattr(me,k,getattr(v,k)) assert me in g.proto.mmtypes + ('P',), ( - "'{}': invalid address type for {}".format(me.name,g.proto.__name__)) + "'{}': invalid address type for {}".format(me.name,type(g.proto).__name__)) return me raise ValueError('unrecognized address type') except Exception as e: diff --git a/mmgen/protocol.py b/mmgen/protocol.py index f862fef6..07c12ec6 100755 --- a/mmgen/protocol.py +++ b/mmgen/protocol.py @@ -107,55 +107,49 @@ class BitcoinProtocol(MMGenObject): privkey_len = 32 avg_bdi = int(9.7 * 60) # average block discovery interval (historical) - @classmethod - def addr_fmt_to_ver_bytes(cls,req_fmt,return_hex=False): - for ver_hex,fmt in cls.addr_ver_bytes.items(): + def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False): + for ver_hex,fmt in self.addr_ver_bytes.items(): if req_fmt == fmt: return ver_hex if return_hex else bytes.fromhex(ver_hex) return False - @classmethod - def is_testnet(cls): - return cls.__name__[-15:] == 'TestnetProtocol' + def is_testnet(self): + return type(self).__name__[-15:] == 'TestnetProtocol' @staticmethod def get_protocol_by_chain(chain): return CoinProtocol(g.coin,{'mainnet':False,'testnet':True,'regtest':True}[chain]) - @classmethod - def cap(cls,s): return s in cls.caps + def cap(self,s): return s in self.caps - @classmethod - def preprocess_key(cls,sec,pubkey_type): + def preprocess_key(self,sec,pubkey_type): # Key must be non-zero and less than group order of secp256k1 curve - if 0 < int.from_bytes(sec,'big') < cls.secp256k1_ge: + if 0 < int.from_bytes(sec,'big') < self.secp256k1_ge: return sec else: # chance of this is less than 1 in 2^127 pk = int.from_bytes(sec,'big') if pk == 0: # chance of this is 1 in 2^256 ydie(3,'Private key is zero!') - elif pk == cls.secp256k1_ge: # ditto + elif pk == self.secp256k1_ge: # ditto ydie(3,'Private key == secp256k1_ge!') else: if not g.test_suite: ymsg('Warning: private key is greater than secp256k1 group order!:\n {}'.format(hexpriv)) - return (pk % cls.secp256k1_ge).to_bytes(cls.privkey_len,'big') + return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big') - @classmethod - def hex2wif(cls,hexpriv,pubkey_type,compressed): # input is preprocessed hex + def hex2wif(self,hexpriv,pubkey_type,compressed): # input is preprocessed hex sec = bytes.fromhex(hexpriv) - assert len(sec) == cls.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec)) - assert pubkey_type in cls.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type) + assert len(sec) == self.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec)) + assert pubkey_type in self.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type) return _b58chk_encode( - bytes.fromhex(cls.wif_ver_num[pubkey_type]) + bytes.fromhex(self.wif_ver_num[pubkey_type]) + sec + (b'',b'\x01')[bool(compressed)]) - @classmethod - def parse_wif(cls,wif): + def parse_wif(self,wif): key = _b58chk_decode(wif) - for k,v in cls.wif_ver_num.items(): + for k,v in self.wif_ver_num.items(): v = bytes.fromhex(v) if key[:len(v)] == v: pubkey_type = k @@ -164,67 +158,60 @@ class BitcoinProtocol(MMGenObject): else: raise ValueError('invalid WIF version number') - if len(key) == cls.privkey_len + 1: + if len(key) == self.privkey_len + 1: assert key[-1] == 0x01,'{!r}: invalid compressed key suffix byte'.format(key[-1]) compressed = True - elif len(key) == cls.privkey_len: + elif len(key) == self.privkey_len: compressed = False else: raise ValueError('{}: invalid key length'.format(len(key))) - return parsed_wif(key[:cls.privkey_len], pubkey_type, compressed) + return parsed_wif(key[:self.privkey_len], pubkey_type, compressed) - @classmethod - def get_addr_len(cls,addr_fmt): - return cls.addr_len + def get_addr_len(self,addr_fmt): + return self.addr_len - @classmethod - def parse_addr_bytes(cls,addr_bytes): - for ver_hex,addr_fmt in cls.addr_ver_bytes.items(): + def parse_addr_bytes(self,addr_bytes): + for ver_hex,addr_fmt in self.addr_ver_bytes.items(): ver_bytes = bytes.fromhex(ver_hex) vlen = len(ver_bytes) if addr_bytes[:vlen] == ver_bytes: - if len(addr_bytes[vlen:]) == cls.get_addr_len(addr_fmt): + if len(addr_bytes[vlen:]) == self.get_addr_len(addr_fmt): return parsed_addr( addr_bytes[vlen:], addr_fmt ) return False - @classmethod - def parse_addr(cls,addr): + def parse_addr(self,addr): - if 'B' in cls.mmtypes and addr[:len(cls.bech32_hrp)] == cls.bech32_hrp: - ret = bech32.decode(cls.bech32_hrp,addr) + if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp: + ret = bech32.decode(self.bech32_hrp,addr) - if ret[0] != cls.witness_vernum: + if ret[0] != self.witness_vernum: msg('{}: Invalid witness version number'.format(ret[0])) return False return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False - return cls.parse_addr_bytes(_b58chk_decode(addr)) + return self.parse_addr_bytes(_b58chk_decode(addr)) - @classmethod - def pubhash2addr(cls,pubkey_hash,p2sh): + def pubhash2addr(self,pubkey_hash,p2sh): assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash)) - s = cls.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash + s = self.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash return _b58chk_encode(bytes.fromhex(s)) # Segwit: - @classmethod - def pubhex2redeem_script(cls,pubhex): + def pubhex2redeem_script(self,pubhex): # https://bitcoincore.org/en/segwit_wallet_dev/ # The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed # by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash}) - return cls.witness_vernum_hex + '14' + hash160(pubhex) + return self.witness_vernum_hex + '14' + hash160(pubhex) - @classmethod - def pubhex2segwitaddr(cls,pubhex): - return cls.pubhash2addr(hash160(cls.pubhex2redeem_script(pubhex)),p2sh=True) + def pubhex2segwitaddr(self,pubhex): + return self.pubhash2addr(hash160(self.pubhex2redeem_script(pubhex)),p2sh=True) - @classmethod - def pubhash2bech32addr(cls,pubhash): + def pubhash2bech32addr(self,pubhash): d = list(bytes.fromhex(pubhash)) - return bech32.bech32_encode(cls.bech32_hrp,[cls.witness_vernum]+bech32.convertbits(d,8,5)) + return bech32.bech32_encode(self.bech32_hrp,[self.witness_vernum]+bech32.convertbits(d,8,5)) class BitcoinTestnetProtocol(BitcoinProtocol): addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' } @@ -249,10 +236,8 @@ class BitcoinCashProtocol(BitcoinProtocol): coin_amt = BCHAmt max_tx_fee = BCHAmt('0.1') - @classmethod - def pubhex2redeem_script(cls,pubhex): raise NotImplementedError - @classmethod - def pubhex2segwitaddr(cls,pubhex): raise NotImplementedError + def pubhex2redeem_script(self,pubhex): raise NotImplementedError + def pubhex2segwitaddr(self,pubhex): raise NotImplementedError class BitcoinCashTestnetProtocol(BitcoinCashProtocol): rpc_port = 18442 @@ -311,16 +296,14 @@ class BitcoinTestnetProtocolAddrgen(BitcoinTestnetProtocol): mmcaps = ('key','ad class DummyWIF(object): - @classmethod - def hex2wif(cls,hexpriv,pubkey_type,compressed): - n = cls.name.capitalize() - assert pubkey_type == cls.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n) + def hex2wif(self,hexpriv,pubkey_type,compressed): + n = self.name.capitalize() + assert pubkey_type == self.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n) assert compressed == False,'{} does not support compressed pubkeys!'.format(n) return hexpriv - @classmethod - def parse_wif(cls,wif): - return parsed_wif(bytes.fromhex(wif), cls.pubkey_type, False) + def parse_wif(self,wif): + return parsed_wif(bytes.fromhex(wif), self.pubkey_type, False) class EthereumProtocol(DummyWIF,BitcoinProtocol): @@ -343,16 +326,14 @@ class EthereumProtocol(DummyWIF,BitcoinProtocol): caps = ('token',) base_proto = 'Ethereum' - @classmethod - def parse_addr(cls,addr): + def parse_addr(self,addr): from .util import is_hex_str_lc - if is_hex_str_lc(addr) and len(addr) == cls.addr_len * 2: + if is_hex_str_lc(addr) and len(addr) == self.addr_len * 2: return parsed_addr( bytes.fromhex(addr), 'ethereum' ) if g.debug: Msg("Invalid address '{}'".format(addr)) return False - @classmethod - def pubhash2addr(cls,pubkey_hash,p2sh): + def pubhash2addr(self,pubkey_hash,p2sh): assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash)) assert not p2sh,'Ethereum has no P2SH address format' return pubkey_hash @@ -380,22 +361,19 @@ class ZcashProtocol(BitcoinProtocolAddrgen): mmtypes = ('L','C','Z') dfl_mmtype = 'L' - @classmethod - def get_addr_len(cls,addr_fmt): + def get_addr_len(self,addr_fmt): return (20,64)[addr_fmt in ('zcash_z','viewkey')] - @classmethod - def preprocess_key(cls,sec,pubkey_type): + def preprocess_key(self,sec,pubkey_type): if pubkey_type == 'zcash_z': # zero the first four bits return bytes([sec[0] & 0x0f]) + sec[1:] else: - return super(cls,cls).preprocess_key(sec,pubkey_type) + return super().preprocess_key(sec,pubkey_type) - @classmethod - def pubhash2addr(cls,pubkey_hash,p2sh): + def pubhash2addr(self,pubkey_hash,p2sh): hl = len(pubkey_hash) if hl == 40: - return super(cls,cls).pubhash2addr(pubkey_hash,p2sh) + return super().pubhash2addr(pubkey_hash,p2sh) elif hl == 128: raise NotImplementedError('Zcash z-addresses have no pubkey hash') else: @@ -416,14 +394,12 @@ class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen): dfl_mmtype = 'M' pubkey_type = 'monero' # required by DummyWIF - @classmethod - def preprocess_key(cls,sec,pubkey_type): # reduce key + def preprocess_key(self,sec,pubkey_type): # reduce key from .ed25519 import l n = int.from_bytes(sec[::-1],'big') % l - return int.to_bytes(n,cls.privkey_len,'big')[::-1] + return int.to_bytes(n,self.privkey_len,'big')[::-1] - @classmethod - def parse_addr(cls,addr): + def parse_addr(self,addr): from .baseconv import baseconv,is_b58_str @@ -444,7 +420,7 @@ class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen): chk = keccak_256(ret[:-4]).digest()[:4] assert ret[-4:] == chk,'{}: incorrect checksum. Correct value: {}'.format(ret[-4:].hex(),chk.hex()) - return cls.parse_addr_bytes(ret) + return self.parse_addr_bytes(ret) class MoneroTestnetProtocol(MoneroProtocol): addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' } @@ -470,7 +446,7 @@ class CoinProtocol(MMGenObject): proto = cls.coins[coin][testnet] if hasattr(proto,'bech32_hrps'): proto.bech32_hrp = proto.bech32_hrps[('testnet','regtest')[g.regtest]] - return proto + return proto() @classmethod def list_coins(cls): diff --git a/mmgen/rpc.py b/mmgen/rpc.py index 6827c79c..ed6a65e0 100755 --- a/mmgen/rpc.py +++ b/mmgen/rpc.py @@ -327,7 +327,7 @@ class BitcoinRPCClient(RPCClient,metaclass=aInitMeta): async def check_chainfork_mismatch(block0): try: - assert block0 == self.proto.block0,'Incorrect Genesis block for {}'.format(self.proto.__name__) + assert block0 == self.proto.block0,'Incorrect Genesis block for {}'.format(type(self.proto).__name__) for fork in self.proto.forks: if fork.height == None or self.blockcount < fork.height: break @@ -546,10 +546,10 @@ class MoneroWalletRPCClient(RPCClient): async def rpc_init(proto=None,backend=None): proto = proto or g.proto - backend = backend or g.rpc_backend + backend = backend or opt.rpc_backend if not 'rpc' in proto.mmcaps: - die(1,'Coin daemon operations not supported for {}!'.format(proto.__name__)) + die(1,'Coin daemon operations not supported for {}!'.format(type(proto).__name__)) g.rpc = await { 'bitcoind': BitcoinRPCClient, diff --git a/mmgen/tool.py b/mmgen/tool.py index 901353e4..dda52984 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -526,11 +526,12 @@ class MMGenToolCmdMnemonic(MMGenToolCmds): @staticmethod def _xmr_reduce(bytestr): - from .protocol import MoneroProtocol as mp - if len(bytestr) != mp.privkey_len: + from .protocol import MoneroProtocol + p = MoneroProtocol() + if len(bytestr) != p.privkey_len: m = '{!r}: invalid bit length for Monero private key (must be {})' - die(1,m.format(len(bytestr*8),mp.privkey_len*8)) - return mp.preprocess_key(bytestr,None) + die(1,m.format(len(bytestr*8),p.privkey_len*8)) + return p.preprocess_key(bytestr,None) def _do_random_mn(self,nbytes:int,fmt:str): assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32' diff --git a/test/test.py b/test/test.py index 6a9c5a91..8f6c1525 100755 --- a/test/test.py +++ b/test/test.py @@ -23,7 +23,7 @@ test/test.py: Test suite for the MMGen wallet system def check_segwit_opts(): for k,m in (('segwit','S'),('segwit_random','S'),('bech32','B')): if getattr(opt,k) and m not in g.proto.mmtypes: - die(1,'--{} option incompatible with {}'.format(k.replace('_','-'),g.proto.__name__)) + die(1,'--{} option incompatible with {}'.format(k.replace('_','-'),type(g.proto).__name__)) def create_shm_dir(data_dir,trash_dir): # Laggy flash media can cause pexpect to fail, so create a temporary directory diff --git a/test/test_py_d/ts_ref.py b/test/test_py_d/ts_ref.py index a442a3e3..aab0100c 100755 --- a/test/test_py_d/ts_ref.py +++ b/test/test_py_d/ts_ref.py @@ -231,12 +231,12 @@ class TestSuiteRef(TestSuiteBase,TestSuiteShared): def ref_segwitaddrfile_chk(self): if not 'S' in g.proto.mmtypes: - return skip('not supported by {}'.format(g.proto.__name__)) + return skip('not supported by {}'.format(type(g.proto).__name__)) return self.ref_addrfile_chk(ftype='segwitaddr',pat='{}.*Segwit'.format(nw_name)) def ref_bech32addrfile_chk(self): if not 'B' in g.proto.mmtypes: - return skip('not supported by {}'.format(g.proto.__name__)) + return skip('not supported by {}'.format(type(g.proto).__name__)) return self.ref_addrfile_chk(ftype='bech32addr',pat='{}.*Bech32'.format(nw_name)) def ref_keyaddrfile_chk(self): diff --git a/test/unit_tests_d/ut_tx_deserialize.py b/test/unit_tests_d/ut_tx_deserialize.py index cc1a7210..8044b822 100755 --- a/test/unit_tests_d/ut_tx_deserialize.py +++ b/test/unit_tests_d/ut_tx_deserialize.py @@ -113,15 +113,13 @@ class unit_test(object): # ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx') ) print_info('test/ref/*rawtx','MMGen reference transactions') + g.rpc_port = None for n,(coin,tn,fn) in enumerate(fns): - init_coin(coin,tn) - g.proto.daemon_data_dir = 'test/daemons/' + g.coin.lower() - g.rpc_port = CoinDaemon(coin + ('','_tn')[tn],test_suite=True).rpc_port - await rpc_init() + proto = init_coin(coin,tn) + proto.daemon_data_dir = 'test/daemons/' + coin + proto.rpc_port = CoinDaemon(coin + ('','_tn')[tn],test_suite=True).rpc_port + await rpc_init(proto=proto) await test_tx(MMGenTX(fn).hex,fn,n+1) - init_coin('btc',False) - g.rpc_port = CoinDaemon('btc',test_suite=True).rpc_port - await rpc_init() Msg('OK') start_test_daemons('btc','btc_tn') # ,'bch')