Browse Source

make g.proto a class instance

The MMGen Project 4 years ago
parent
commit
4449c23da4
9 changed files with 79 additions and 104 deletions
  1. 1 1
      mmgen/addr.py
  2. 1 1
      mmgen/altcoin.py
  3. 5 5
      mmgen/obj.py
  4. 56 80
      mmgen/protocol.py
  5. 3 3
      mmgen/rpc.py
  6. 5 4
      mmgen/tool.py
  7. 1 1
      test/test.py
  8. 2 2
      test/test_py_d/ts_ref.py
  9. 5 7
      test/unit_tests_d/ut_tx_deserialize.py

+ 1 - 1
mmgen/addr.py

@@ -961,7 +961,7 @@ Record this checksum: it will be used to verify the password file in the future
 			# take most significant part
 			bytes_trunc = bytes.fromhex(hex_sec[:pw_len_hex])
 			from .protocol import MoneroProtocol
-			bytes_preproc = MoneroProtocol.preprocess_key(bytes_trunc,None)
+			bytes_preproc = MoneroProtocol().preprocess_key(bytes_trunc,None)
 			return ' '.join(baseconv.frombytes(bytes_preproc,wl_id='xmrseed'))
 		else:
 			# take least significant part

+ 1 - 1
mmgen/altcoin.py

@@ -435,7 +435,7 @@ class CoinInfo(object):
 				e = cls.get_entry(coin,network)
 				if e:
 					proto = CoinProtocol(coin,testnet=network=='testnet')
-					cdata = (network,coin,e,proto.__name__,verbose)
+					cdata = (network,coin,e,type(proto).__name__,verbose)
 					if not quiet:
 						msg('Verifying {} {}'.format(coin.upper(),network))
 

+ 5 - 5
mmgen/obj.py

@@ -529,7 +529,7 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
 			me.hex = ap.bytes.hex()
 			return me
 		except Exception as e:
-			return cls.init_fail(e,s,objname='{} address'.format(g.proto.__name__))
+			return cls.init_fail(e,s,objname='{} address'.format(type(g.proto).__name__))
 
 	@classmethod
 	def fmtc(cls,s,**kwargs):
@@ -538,7 +538,7 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
 
 	def is_for_chain(self,chain):
 
-		if g.proto.__name__[:8] == 'Ethereum':
+		if type(g.proto).__name__[:8] == 'Ethereum':
 			return True
 
 		proto = g.proto.get_protocol_by_chain(chain)
@@ -620,7 +620,7 @@ class MMGenID(str,Hilite,InitErrors,MMGenObject):
 			me.sid = SeedID(sid=ss[0],on_fail='raise')
 			me.idx = AddrIdx(ss[-1],on_fail='raise')
 			me.mmtype = t
-			assert t in g.proto.mmtypes,'{}: invalid address type for {}'.format(t,g.proto.__name__)
+			assert t in g.proto.mmtypes,'{}: invalid address type for {}'.format(t,type(g.proto).__name__)
 			me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
 			me.sort_key = '{}:{}:{:0{w}}'.format(me.sid,me.mmtype,me.idx,w=me.idx.max_digits)
 			return me
@@ -755,7 +755,7 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
 				me.orig_hex = None
 				if k.sec != g.proto.preprocess_key(k.sec,k.pubkey_type):
 					m = '{} WIF key {!r} encodes private key with unacceptable value {}'
-					raise PrivateKeyError(m.format(g.proto.__name__,me.wif,me))
+					raise PrivateKeyError(m.format(type(g.proto).__name__,me.wif,me))
 				return me
 			except Exception as e:
 				return cls.init_fail(e,s,objname='{} WIF key'.format(g.coin))
@@ -914,7 +914,7 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
 					for k in v._fields:
 						setattr(me,k,getattr(v,k))
 					assert me in g.proto.mmtypes + ('P',), (
-						"'{}': invalid address type for {}".format(me.name,g.proto.__name__))
+						"'{}': invalid address type for {}".format(me.name,type(g.proto).__name__))
 					return me
 			raise ValueError('unrecognized address type')
 		except Exception as e:

+ 56 - 80
mmgen/protocol.py

@@ -107,55 +107,49 @@ class BitcoinProtocol(MMGenObject):
 	privkey_len        = 32
 	avg_bdi            = int(9.7 * 60) # average block discovery interval (historical)
 
-	@classmethod
-	def addr_fmt_to_ver_bytes(cls,req_fmt,return_hex=False):
-		for ver_hex,fmt in cls.addr_ver_bytes.items():
+	def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False):
+		for ver_hex,fmt in self.addr_ver_bytes.items():
 			if req_fmt == fmt:
 				return ver_hex if return_hex else bytes.fromhex(ver_hex)
 		return False
 
-	@classmethod
-	def is_testnet(cls):
-		return cls.__name__[-15:] == 'TestnetProtocol'
+	def is_testnet(self):
+		return type(self).__name__[-15:] == 'TestnetProtocol'
 
 	@staticmethod
 	def get_protocol_by_chain(chain):
 		return CoinProtocol(g.coin,{'mainnet':False,'testnet':True,'regtest':True}[chain])
 
-	@classmethod
-	def cap(cls,s): return s in cls.caps
+	def cap(self,s): return s in self.caps
 
-	@classmethod
-	def preprocess_key(cls,sec,pubkey_type):
+	def preprocess_key(self,sec,pubkey_type):
 		# Key must be non-zero and less than group order of secp256k1 curve
-		if 0 < int.from_bytes(sec,'big') < cls.secp256k1_ge:
+		if 0 < int.from_bytes(sec,'big') < self.secp256k1_ge:
 			return sec
 		else: # chance of this is less than 1 in 2^127
 			pk = int.from_bytes(sec,'big')
 			if pk == 0: # chance of this is 1 in 2^256
 				ydie(3,'Private key is zero!')
-			elif pk == cls.secp256k1_ge: # ditto
+			elif pk == self.secp256k1_ge: # ditto
 				ydie(3,'Private key == secp256k1_ge!')
 			else:
 				if not g.test_suite:
 					ymsg('Warning: private key is greater than secp256k1 group order!:\n  {}'.format(hexpriv))
-				return (pk % cls.secp256k1_ge).to_bytes(cls.privkey_len,'big')
+				return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big')
 
-	@classmethod
-	def hex2wif(cls,hexpriv,pubkey_type,compressed): # input is preprocessed hex
+	def hex2wif(self,hexpriv,pubkey_type,compressed): # input is preprocessed hex
 		sec = bytes.fromhex(hexpriv)
-		assert len(sec) == cls.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec))
-		assert pubkey_type in cls.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type)
+		assert len(sec) == self.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec))
+		assert pubkey_type in self.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type)
 		return _b58chk_encode(
-			bytes.fromhex(cls.wif_ver_num[pubkey_type])
+			bytes.fromhex(self.wif_ver_num[pubkey_type])
 			+ sec
 			+ (b'',b'\x01')[bool(compressed)])
 
-	@classmethod
-	def parse_wif(cls,wif):
+	def parse_wif(self,wif):
 		key = _b58chk_decode(wif)
 
-		for k,v in cls.wif_ver_num.items():
+		for k,v in self.wif_ver_num.items():
 			v = bytes.fromhex(v)
 			if key[:len(v)] == v:
 				pubkey_type = k
@@ -164,67 +158,60 @@ class BitcoinProtocol(MMGenObject):
 		else:
 			raise ValueError('invalid WIF version number')
 
-		if len(key) == cls.privkey_len + 1:
+		if len(key) == self.privkey_len + 1:
 			assert key[-1] == 0x01,'{!r}: invalid compressed key suffix byte'.format(key[-1])
 			compressed = True
-		elif len(key) == cls.privkey_len:
+		elif len(key) == self.privkey_len:
 			compressed = False
 		else:
 			raise ValueError('{}: invalid key length'.format(len(key)))
 
-		return parsed_wif(key[:cls.privkey_len], pubkey_type, compressed)
+		return parsed_wif(key[:self.privkey_len], pubkey_type, compressed)
 
-	@classmethod
-	def get_addr_len(cls,addr_fmt):
-		return cls.addr_len
+	def get_addr_len(self,addr_fmt):
+		return self.addr_len
 
-	@classmethod
-	def parse_addr_bytes(cls,addr_bytes):
-		for ver_hex,addr_fmt in cls.addr_ver_bytes.items():
+	def parse_addr_bytes(self,addr_bytes):
+		for ver_hex,addr_fmt in self.addr_ver_bytes.items():
 			ver_bytes = bytes.fromhex(ver_hex)
 			vlen = len(ver_bytes)
 			if addr_bytes[:vlen] == ver_bytes:
-				if len(addr_bytes[vlen:]) == cls.get_addr_len(addr_fmt):
+				if len(addr_bytes[vlen:]) == self.get_addr_len(addr_fmt):
 					return parsed_addr( addr_bytes[vlen:], addr_fmt )
 
 		return False
 
-	@classmethod
-	def parse_addr(cls,addr):
+	def parse_addr(self,addr):
 
-		if 'B' in cls.mmtypes and addr[:len(cls.bech32_hrp)] == cls.bech32_hrp:
-			ret = bech32.decode(cls.bech32_hrp,addr)
+		if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp:
+			ret = bech32.decode(self.bech32_hrp,addr)
 
-			if ret[0] != cls.witness_vernum:
+			if ret[0] != self.witness_vernum:
 				msg('{}: Invalid witness version number'.format(ret[0]))
 				return False
 
 			return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False
 
-		return cls.parse_addr_bytes(_b58chk_decode(addr))
+		return self.parse_addr_bytes(_b58chk_decode(addr))
 
-	@classmethod
-	def pubhash2addr(cls,pubkey_hash,p2sh):
+	def pubhash2addr(self,pubkey_hash,p2sh):
 		assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
-		s = cls.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash
+		s = self.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash
 		return _b58chk_encode(bytes.fromhex(s))
 
 	# Segwit:
-	@classmethod
-	def pubhex2redeem_script(cls,pubhex):
+	def pubhex2redeem_script(self,pubhex):
 		# https://bitcoincore.org/en/segwit_wallet_dev/
 		# The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed
 		# by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash})
-		return cls.witness_vernum_hex + '14' + hash160(pubhex)
+		return self.witness_vernum_hex + '14' + hash160(pubhex)
 
-	@classmethod
-	def pubhex2segwitaddr(cls,pubhex):
-		return cls.pubhash2addr(hash160(cls.pubhex2redeem_script(pubhex)),p2sh=True)
+	def pubhex2segwitaddr(self,pubhex):
+		return self.pubhash2addr(hash160(self.pubhex2redeem_script(pubhex)),p2sh=True)
 
-	@classmethod
-	def pubhash2bech32addr(cls,pubhash):
+	def pubhash2bech32addr(self,pubhash):
 		d = list(bytes.fromhex(pubhash))
-		return bech32.bech32_encode(cls.bech32_hrp,[cls.witness_vernum]+bech32.convertbits(d,8,5))
+		return bech32.bech32_encode(self.bech32_hrp,[self.witness_vernum]+bech32.convertbits(d,8,5))
 
 class BitcoinTestnetProtocol(BitcoinProtocol):
 	addr_ver_bytes       = { '6f': 'p2pkh', 'c4': 'p2sh' }
@@ -249,10 +236,8 @@ class BitcoinCashProtocol(BitcoinProtocol):
 	coin_amt        = BCHAmt
 	max_tx_fee      = BCHAmt('0.1')
 
-	@classmethod
-	def pubhex2redeem_script(cls,pubhex): raise NotImplementedError
-	@classmethod
-	def pubhex2segwitaddr(cls,pubhex):    raise NotImplementedError
+	def pubhex2redeem_script(self,pubhex): raise NotImplementedError
+	def pubhex2segwitaddr(self,pubhex):    raise NotImplementedError
 
 class BitcoinCashTestnetProtocol(BitcoinCashProtocol):
 	rpc_port      = 18442
@@ -311,16 +296,14 @@ class BitcoinTestnetProtocolAddrgen(BitcoinTestnetProtocol): mmcaps = ('key','ad
 
 class DummyWIF(object):
 
-	@classmethod
-	def hex2wif(cls,hexpriv,pubkey_type,compressed):
-		n = cls.name.capitalize()
-		assert pubkey_type == cls.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n)
+	def hex2wif(self,hexpriv,pubkey_type,compressed):
+		n = self.name.capitalize()
+		assert pubkey_type == self.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n)
 		assert compressed == False,'{} does not support compressed pubkeys!'.format(n)
 		return hexpriv
 
-	@classmethod
-	def parse_wif(cls,wif):
-		return parsed_wif(bytes.fromhex(wif), cls.pubkey_type, False)
+	def parse_wif(self,wif):
+		return parsed_wif(bytes.fromhex(wif), self.pubkey_type, False)
 
 class EthereumProtocol(DummyWIF,BitcoinProtocol):
 
@@ -343,16 +326,14 @@ class EthereumProtocol(DummyWIF,BitcoinProtocol):
 	caps        = ('token',)
 	base_proto  = 'Ethereum'
 
-	@classmethod
-	def parse_addr(cls,addr):
+	def parse_addr(self,addr):
 		from .util import is_hex_str_lc
-		if is_hex_str_lc(addr) and len(addr) == cls.addr_len * 2:
+		if is_hex_str_lc(addr) and len(addr) == self.addr_len * 2:
 			return parsed_addr( bytes.fromhex(addr), 'ethereum' )
 		if g.debug: Msg("Invalid address '{}'".format(addr))
 		return False
 
-	@classmethod
-	def pubhash2addr(cls,pubkey_hash,p2sh):
+	def pubhash2addr(self,pubkey_hash,p2sh):
 		assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
 		assert not p2sh,'Ethereum has no P2SH address format'
 		return pubkey_hash
@@ -380,22 +361,19 @@ class ZcashProtocol(BitcoinProtocolAddrgen):
 	mmtypes      = ('L','C','Z')
 	dfl_mmtype   = 'L'
 
-	@classmethod
-	def get_addr_len(cls,addr_fmt):
+	def get_addr_len(self,addr_fmt):
 		return (20,64)[addr_fmt in ('zcash_z','viewkey')]
 
-	@classmethod
-	def preprocess_key(cls,sec,pubkey_type):
+	def preprocess_key(self,sec,pubkey_type):
 		if pubkey_type == 'zcash_z': # zero the first four bits
 			return bytes([sec[0] & 0x0f]) + sec[1:]
 		else:
-			return super(cls,cls).preprocess_key(sec,pubkey_type)
+			return super().preprocess_key(sec,pubkey_type)
 
-	@classmethod
-	def pubhash2addr(cls,pubkey_hash,p2sh):
+	def pubhash2addr(self,pubkey_hash,p2sh):
 		hl = len(pubkey_hash)
 		if hl == 40:
-			return super(cls,cls).pubhash2addr(pubkey_hash,p2sh)
+			return super().pubhash2addr(pubkey_hash,p2sh)
 		elif hl == 128:
 			raise NotImplementedError('Zcash z-addresses have no pubkey hash')
 		else:
@@ -416,14 +394,12 @@ class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen):
 	dfl_mmtype   = 'M'
 	pubkey_type = 'monero' # required by DummyWIF
 
-	@classmethod
-	def preprocess_key(cls,sec,pubkey_type): # reduce key
+	def preprocess_key(self,sec,pubkey_type): # reduce key
 		from .ed25519 import l
 		n = int.from_bytes(sec[::-1],'big') % l
-		return int.to_bytes(n,cls.privkey_len,'big')[::-1]
+		return int.to_bytes(n,self.privkey_len,'big')[::-1]
 
-	@classmethod
-	def parse_addr(cls,addr):
+	def parse_addr(self,addr):
 
 		from .baseconv import baseconv,is_b58_str
 
@@ -444,7 +420,7 @@ class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen):
 		chk = keccak_256(ret[:-4]).digest()[:4]
 		assert ret[-4:] == chk,'{}: incorrect checksum.  Correct value: {}'.format(ret[-4:].hex(),chk.hex())
 
-		return cls.parse_addr_bytes(ret)
+		return self.parse_addr_bytes(ret)
 
 class MoneroTestnetProtocol(MoneroProtocol):
 	addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' }
@@ -470,7 +446,7 @@ class CoinProtocol(MMGenObject):
 		proto = cls.coins[coin][testnet]
 		if hasattr(proto,'bech32_hrps'):
 			proto.bech32_hrp = proto.bech32_hrps[('testnet','regtest')[g.regtest]]
-		return proto
+		return proto()
 
 	@classmethod
 	def list_coins(cls):

+ 3 - 3
mmgen/rpc.py

@@ -327,7 +327,7 @@ class BitcoinRPCClient(RPCClient,metaclass=aInitMeta):
 
 		async def check_chainfork_mismatch(block0):
 			try:
-				assert block0 == self.proto.block0,'Incorrect Genesis block for {}'.format(self.proto.__name__)
+				assert block0 == self.proto.block0,'Incorrect Genesis block for {}'.format(type(self.proto).__name__)
 				for fork in self.proto.forks:
 					if fork.height == None or self.blockcount < fork.height:
 						break
@@ -546,10 +546,10 @@ class MoneroWalletRPCClient(RPCClient):
 async def rpc_init(proto=None,backend=None):
 
 	proto = proto or g.proto
-	backend = backend or g.rpc_backend
+	backend = backend or opt.rpc_backend
 
 	if not 'rpc' in proto.mmcaps:
-		die(1,'Coin daemon operations not supported for {}!'.format(proto.__name__))
+		die(1,'Coin daemon operations not supported for {}!'.format(type(proto).__name__))
 
 	g.rpc = await {
 		'bitcoind': BitcoinRPCClient,

+ 5 - 4
mmgen/tool.py

@@ -526,11 +526,12 @@ class MMGenToolCmdMnemonic(MMGenToolCmds):
 
 	@staticmethod
 	def _xmr_reduce(bytestr):
-		from .protocol import MoneroProtocol as mp
-		if len(bytestr) != mp.privkey_len:
+		from .protocol import MoneroProtocol
+		p = MoneroProtocol()
+		if len(bytestr) != p.privkey_len:
 			m = '{!r}: invalid bit length for Monero private key (must be {})'
-			die(1,m.format(len(bytestr*8),mp.privkey_len*8))
-		return mp.preprocess_key(bytestr,None)
+			die(1,m.format(len(bytestr*8),p.privkey_len*8))
+		return p.preprocess_key(bytestr,None)
 
 	def _do_random_mn(self,nbytes:int,fmt:str):
 		assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32'

+ 1 - 1
test/test.py

@@ -23,7 +23,7 @@ test/test.py: Test suite for the MMGen wallet system
 def check_segwit_opts():
 	for k,m in (('segwit','S'),('segwit_random','S'),('bech32','B')):
 		if getattr(opt,k) and m not in g.proto.mmtypes:
-			die(1,'--{} option incompatible with {}'.format(k.replace('_','-'),g.proto.__name__))
+			die(1,'--{} option incompatible with {}'.format(k.replace('_','-'),type(g.proto).__name__))
 
 def create_shm_dir(data_dir,trash_dir):
 	# Laggy flash media can cause pexpect to fail, so create a temporary directory

+ 2 - 2
test/test_py_d/ts_ref.py

@@ -231,12 +231,12 @@ class TestSuiteRef(TestSuiteBase,TestSuiteShared):
 
 	def ref_segwitaddrfile_chk(self):
 		if not 'S' in g.proto.mmtypes:
-			return skip('not supported by {}'.format(g.proto.__name__))
+			return skip('not supported by {}'.format(type(g.proto).__name__))
 		return self.ref_addrfile_chk(ftype='segwitaddr',pat='{}.*Segwit'.format(nw_name))
 
 	def ref_bech32addrfile_chk(self):
 		if not 'B' in g.proto.mmtypes:
-			return skip('not supported by {}'.format(g.proto.__name__))
+			return skip('not supported by {}'.format(type(g.proto).__name__))
 		return self.ref_addrfile_chk(ftype='bech32addr',pat='{}.*Bech32'.format(nw_name))
 
 	def ref_keyaddrfile_chk(self):

+ 5 - 7
test/unit_tests_d/ut_tx_deserialize.py

@@ -113,15 +113,13 @@ class unit_test(object):
 				#	('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx')
 				)
 			print_info('test/ref/*rawtx','MMGen reference transactions')
+			g.rpc_port = None
 			for n,(coin,tn,fn) in enumerate(fns):
-				init_coin(coin,tn)
-				g.proto.daemon_data_dir = 'test/daemons/' + g.coin.lower()
-				g.rpc_port = CoinDaemon(coin + ('','_tn')[tn],test_suite=True).rpc_port
-				await rpc_init()
+				proto = init_coin(coin,tn)
+				proto.daemon_data_dir = 'test/daemons/' + coin
+				proto.rpc_port = CoinDaemon(coin + ('','_tn')[tn],test_suite=True).rpc_port
+				await rpc_init(proto=proto)
 				await test_tx(MMGenTX(fn).hex,fn,n+1)
-			init_coin('btc',False)
-			g.rpc_port = CoinDaemon('btc',test_suite=True).rpc_port
-			await rpc_init()
 			Msg('OK')
 
 		start_test_daemons('btc','btc_tn') # ,'bch')