Browse Source

protocol.py: parse_addr() rewrite

- rename verify_addr() to parse_addr()
- implement using bytes instead of hex strings
- remove unnecessary first-letter and encoded length checks
- perform decoded length check
The MMGen Project 5 years ago
parent
commit
b4c528f92a
4 changed files with 72 additions and 84 deletions
  1. 3 8
      mmgen/addr.py
  2. 5 14
      mmgen/obj.py
  3. 61 61
      mmgen/protocol.py
  4. 3 1
      mmgen/tx.py

+ 3 - 8
mmgen/addr.py

@@ -107,9 +107,6 @@ class AddrGeneratorEthereum(AddrGenerator):
 # github.com/FiloSottile/zcash-mini/zcash/address.go
 class AddrGeneratorZcashZ(AddrGenerator):
 
-	addr_width = 95
-	vk_width = 97
-
 	def zhash256(self,s,t):
 		s = bytearray(s + bytes(32))
 		s[0] |= 0xc0
@@ -123,9 +120,8 @@ class AddrGeneratorZcashZ(AddrGenerator):
 		from nacl.bindings import crypto_scalarmult_base
 		p2 = crypto_scalarmult_base(self.zhash256(key,1))
 		from mmgen.protocol import _b58chk_encode
-		ver_bytes = bytes.fromhex(g.proto.addr_ver_num['zcash_z'][0])
+		ver_bytes = g.proto.addr_fmt_to_ver_bytes('zcash_z')
 		ret = _b58chk_encode(ver_bytes + self.zhash256(key,0) + p2)
-		assert len(ret) == self.addr_width,'Invalid Zcash z-address length'
 		return CoinAddr(ret)
 
 	def to_viewkey(self,pubhex): # pubhex is really privhex
@@ -136,9 +132,8 @@ class AddrGeneratorZcashZ(AddrGenerator):
 		vk[63] &= 0x7f
 		vk[63] |= 0x40
 		from mmgen.protocol import _b58chk_encode
-		ver_bytes = bytes.fromhex(g.proto.addr_ver_num['viewkey'][0])
+		ver_bytes = g.proto.addr_fmt_to_ver_bytes('viewkey')
 		ret = _b58chk_encode(ver_bytes + vk)
-		assert len(ret) == self.vk_width,'Invalid Zcash view key length'
 		return ZcashViewKey(ret)
 
 	def to_segwit_redeem_script(self,pubhex):
@@ -197,7 +192,7 @@ class AddrGeneratorMonero(AddrGenerator):
 		vk_hex = self.to_viewkey(sk_hex)
 		pk_str  = self.encodepoint(scalarmultbase(hex2int_le(sk_hex)))
 		pvk_str = self.encodepoint(scalarmultbase(hex2int_le(vk_hex)))
-		addr_p1 = bytes.fromhex(g.proto.addr_ver_num['monero'][0]) + pk_str + pvk_str
+		addr_p1 = g.proto.addr_fmt_to_ver_bytes('monero') + pk_str + pvk_str
 
 		return CoinAddr(self.b58enc(addr_p1 + self.keccak_256(addr_p1).digest()[:4]))
 

+ 5 - 14
mmgen/obj.py

@@ -497,10 +497,10 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
 		try:
 			assert set(s) <= set(ascii_letters+digits),'contains non-alphanumeric characters'
 			me = str.__new__(cls,s)
-			va = g.proto.verify_addr(s,hex_width=cls.hex_width,return_dict=True)
-			assert va,'coin address {!r} failed verification'.format(s)
-			me.addr_fmt = va['format']
-			me.hex = va['hex']
+			ap = g.proto.parse_addr(s)
+			assert ap,'coin address {!r} could not be parsed'.format(s)
+			me.addr_fmt = ap.fmt
+			me.hex = ap.bytes.hex()
 			return me
 		except Exception as e:
 			return cls.init_fail(e,s,objname='{} address'.format(g.proto.__name__))
@@ -521,21 +521,12 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
 		if g.proto.__name__[:8] == 'Ethereum':
 			return True
 
-		def pfx_ok(pfx):
-			if type(pfx) == tuple:
-				if self[0] in pfx: return True
-			elif self[:len(pfx)] == pfx: return True
-			return False
-
 		proto = g.proto.get_protocol_by_chain(chain)
-		vn = proto.addr_ver_num
 
 		if self.addr_fmt == 'bech32':
 			return self[:len(proto.bech32_hrp)] == proto.bech32_hrp
-		elif self.addr_fmt == 'p2sh' and 'p2sh2' in vn:
-			return pfx_ok(vn['p2sh'][1]) or pfx_ok(vn['p2sh2'][1])
 		else:
-			return pfx_ok(vn[self.addr_fmt][1])
+			return bool(proto.parse_addr(self))
 
 class TokenAddr(CoinAddr):
 	color = 'blue'

+ 61 - 61
mmgen/protocol.py

@@ -21,7 +21,7 @@ protocol.py: Coin protocol functions, classes and methods
 """
 
 import sys,os,hashlib
-from collections import namedtuple
+from collections import namedtuple,OrderedDict
 
 from mmgen.util import msg,ymsg,Msg,ydie
 from mmgen.devtools import *
@@ -30,6 +30,7 @@ from mmgen.globalvars import g
 import mmgen.bech32 as bech32
 
 parsed_wif = namedtuple('parsed_wif',['sec','pubkey_type','compressed'])
+parsed_addr = namedtuple('parsed_addr',['bytes','fmt'])
 
 def hash160(hexnum): # take hex, return hex - OP_HASH160
 	return hashlib.new('ripemd160',hashlib.sha256(bytes.fromhex(hexnum)).digest()).hexdigest()
@@ -71,7 +72,8 @@ class BitcoinProtocol(MMGenObject):
 	name            = 'bitcoin'
 	daemon_name     = 'bitcoind'
 	daemon_family   = 'bitcoind'
-	addr_ver_num    = { 'p2pkh': ('00','1'), 'p2sh':  ('05','3') }
+	addr_ver_bytes  = { '00': 'p2pkh', '05': 'p2sh' }
+	addr_len        = 20
 	wif_ver_num     = { 'std': '80' }
 	mmtypes         = ('L','C','S','B')
 	dfl_mmtype      = 'L'
@@ -102,6 +104,13 @@ class BitcoinProtocol(MMGenObject):
 	secp256k1_ge       = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
 	privkey_len        = 32
 
+	@classmethod
+	def addr_fmt_to_ver_bytes(cls,req_fmt,return_hex=False):
+		for ver_hex,fmt in cls.addr_ver_bytes.items():
+			if req_fmt == fmt:
+				return ver_hex if return_hex else bytes.fromhex(ver_hex)
+		return False
+
 	@classmethod
 	def is_testnet(cls):
 		return cls.__name__[-15:] == 'TestnetProtocol'
@@ -163,41 +172,38 @@ class BitcoinProtocol(MMGenObject):
 		return parsed_wif(key[:cls.privkey_len], pubkey_type, compressed)
 
 	@classmethod
-	def verify_addr(cls,addr,hex_width,return_dict=False):
+	def get_addr_len(cls,addr_fmt):
+		return cls.addr_len
+
+	@classmethod
+	def parse_addr_bytes(cls,addr_bytes):
+		for ver_hex,addr_fmt in cls.addr_ver_bytes.items():
+			ver_bytes = bytes.fromhex(ver_hex)
+			vlen = len(ver_bytes)
+			if addr_bytes[:vlen] == ver_bytes:
+				if len(addr_bytes[vlen:]) == cls.get_addr_len(addr_fmt):
+					return parsed_addr( addr_bytes[vlen:], addr_fmt )
+
+		return False
+
+	@classmethod
+	def parse_addr(cls,addr):
 
 		if 'B' in cls.mmtypes and addr[:len(cls.bech32_hrp)] == cls.bech32_hrp:
 			ret = bech32.decode(cls.bech32_hrp,addr)
+
 			if ret[0] != cls.witness_vernum:
 				msg('{}: Invalid witness version number'.format(ret[0]))
-			elif ret[1]:
-				return {
-					'hex': bytes(ret[1]).hex(),
-					'format': 'bech32'
-				} if return_dict else True
-			return False
-
-		for addr_fmt in cls.addr_ver_num:
-			ver_num,pfx = cls.addr_ver_num[addr_fmt]
-			if type(pfx) == tuple:
-				if addr[0] not in pfx: continue
-			elif addr[:len(pfx)] != pfx: continue
-			addr_hex = _b58chk_decode(addr).hex()
-			if addr_hex[:len(ver_num)] != ver_num: continue
-			return {
-				'hex': addr_hex[len(ver_num):],
-				'format': { 'p2pkh':'p2pkh',
-							'p2sh':'p2sh',
-							'p2sh2':'p2sh',
-							'zcash_z':'zcash_z',
-							'viewkey':'viewkey'}[addr_fmt]
-			} if return_dict else True
+				return False
 
-		return False
+			return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False
+
+		return cls.parse_addr_bytes(_b58chk_decode(addr))
 
 	@classmethod
 	def pubhash2addr(cls,pubkey_hash,p2sh):
 		assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
-		s = cls.addr_ver_num[('p2pkh','p2sh')[p2sh]][0] + pubkey_hash
+		s = cls.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash
 		return _b58chk_encode(bytes.fromhex(s))
 
 	# Segwit:
@@ -218,7 +224,7 @@ class BitcoinProtocol(MMGenObject):
 		return bech32.bech32_encode(cls.bech32_hrp,[cls.witness_vernum]+bech32.convertbits(d,8,5))
 
 class BitcoinTestnetProtocol(BitcoinProtocol):
-	addr_ver_num         = { 'p2pkh': ('6f',('m','n')), 'p2sh':  ('c4','2') }
+	addr_ver_bytes       = { '6f': 'p2pkh', 'c4': 'p2sh' }
 	wif_ver_num          = { 'std': 'ef' }
 	data_subdir          = 'testnet'
 	daemon_data_subdir   = 'testnet3'
@@ -248,7 +254,7 @@ class BitcoinCashProtocol(BitcoinProtocol):
 
 class BitcoinCashTestnetProtocol(BitcoinCashProtocol):
 	rpc_port      = 18442
-	addr_ver_num  = { 'p2pkh': ('6f',('m','n')), 'p2sh':  ('c4','2') }
+	addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
 	wif_ver_num   = { 'std': 'ef' }
 	data_subdir   = 'testnet'
 	daemon_data_subdir = 'testnet3'
@@ -265,7 +271,7 @@ class B2XProtocol(BitcoinProtocol):
 	]
 
 class B2XTestnetProtocol(B2XProtocol):
-	addr_ver_num       = { 'p2pkh': ('6f',('m','n')), 'p2sh':  ('c4','2') }
+	addr_ver_bytes     = { '6f': 'p2pkh', 'c4': 'p2sh' }
 	wif_ver_num        = { 'std': 'ef' }
 	data_subdir        = 'testnet'
 	daemon_data_subdir = 'testnet5'
@@ -277,7 +283,7 @@ class LitecoinProtocol(BitcoinProtocol):
 	daemon_name    = 'litecoind'
 	daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Litecoin') if g.platform == 'win' \
 						else os.path.join(g.home_dir,'.litecoin')
-	addr_ver_num   = { 'p2pkh': ('30','L'), 'p2sh':  ('32','M'), 'p2sh2':  ('05','3') } # 'p2sh' is new fmt
+	addr_ver_bytes = OrderedDict((('30','p2pkh'), ('32','p2sh'), ('05','p2sh'))) # new p2sh ver 0x32 must come first
 	wif_ver_num    = { 'std': 'b0' }
 	mmtypes         = ('L','C','S','B')
 	secs_per_block = 150
@@ -290,7 +296,7 @@ class LitecoinProtocol(BitcoinProtocol):
 
 class LitecoinTestnetProtocol(LitecoinProtocol):
 	# addr ver nums same as Bitcoin testnet, except for 'p2sh'
-	addr_ver_num   = { 'p2pkh': ('6f',('m','n')), 'p2sh':  ('3a','Q'), 'p2sh2':  ('c4','2') }
+	addr_ver_bytes = OrderedDict((('6f','p2pkh'), ('3a','p2sh'), ('c4','p2sh')))
 	wif_ver_num    = { 'std': 'ef' } # same as Bitcoin testnet
 	data_subdir    = 'testnet'
 	daemon_data_subdir = 'testnet4'
@@ -316,7 +322,7 @@ class DummyWIF(object):
 
 class EthereumProtocol(DummyWIF,BitcoinProtocol):
 
-	addr_width = 40
+	addr_len   = 20
 	mmtypes    = ('E',)
 	dfl_mmtype = 'E'
 	name = 'ethereum'
@@ -336,10 +342,10 @@ class EthereumProtocol(DummyWIF,BitcoinProtocol):
 	base_proto  = 'Ethereum'
 
 	@classmethod
-	def verify_addr(cls,addr,hex_width,return_dict=False):
+	def parse_addr(cls,addr):
 		from mmgen.util import is_hex_str_lc
-		if is_hex_str_lc(addr) and len(addr) == cls.addr_width:
-			return { 'hex': addr, 'format': 'ethereum' } if return_dict else True
+		if is_hex_str_lc(addr) and len(addr) == cls.addr_len * 2:
+			return parsed_addr( bytes.fromhex(addr), 'ethereum' )
 		if g.debug: Msg("Invalid address '{}'".format(addr))
 		return False
 
@@ -367,15 +373,15 @@ class EthereumClassicTestnetProtocol(EthereumClassicProtocol):
 class ZcashProtocol(BitcoinProtocolAddrgen):
 	name         = 'zcash'
 	base_coin    = 'ZEC'
-	addr_ver_num = {
-		'p2pkh':   ('1cb8','t1'),
-		'p2sh':    ('1cbd','t3'),
-		'zcash_z': ('169a','zc'),
-		'viewkey': ('a8abd3','ZiVK') }
+	addr_ver_bytes = { '1cb8': 'p2pkh', '1cbd': 'p2sh', '169a': 'zcash_z', 'a8abd3': 'viewkey' }
 	wif_ver_num  = { 'std': '80', 'zcash_z': 'ab36' }
 	mmtypes      = ('L','C','Z')
 	dfl_mmtype   = 'L'
 
+	@classmethod
+	def get_addr_len(cls,addr_fmt):
+		return (20,64)[addr_fmt in ('zcash_z','viewkey')]
+
 	@classmethod
 	def preprocess_key(cls,sec,pubkey_type):
 		if pubkey_type == 'zcash_z': # zero the first four bits
@@ -395,21 +401,17 @@ class ZcashProtocol(BitcoinProtocolAddrgen):
 
 class ZcashTestnetProtocol(ZcashProtocol):
 	wif_ver_num  = { 'std': 'ef', 'zcash_z': 'ac08' }
-	addr_ver_num = {
-		'p2pkh':   ('1d25','tm'),
-		'p2sh':    ('1cba','t2'),
-		'zcash_z': ('16b6','zt'),
-		'viewkey': ('a8ac0c','ZiVt') }
+	addr_ver_bytes = { '1d25': 'p2pkh', '1cba': 'p2sh', '16b6': 'zcash_z', 'a8ac0c': 'viewkey' }
 
 # https://github.com/monero-project/monero/blob/master/src/cryptonote_config.h
 class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen):
 	name         = 'monero'
 	base_coin    = 'XMR'
-	addr_ver_num = { 'monero': ('12','4'), 'monero_sub': ('2a','8') } # 18,42
+	addr_ver_bytes = { '12': 'monero', '2a': 'monero_sub' }
+	addr_len     = 68
 	wif_ver_num  = {}
 	mmtypes      = ('M',)
 	dfl_mmtype   = 'M'
-	addr_width   = 95
 	pubkey_type = 'monero' # required by DummyWIF
 
 	@classmethod
@@ -419,19 +421,16 @@ class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen):
 		return int.to_bytes(n,cls.privkey_len,'big')[::-1]
 
 	@classmethod
-	def verify_addr(cls,addr,hex_width,return_dict=False):
+	def parse_addr(cls,addr):
 
 		from mmgen.baseconv import baseconv,is_b58_str
 
 		def b58dec(addr_str):
 			l = len(addr_str)
-			a = ''.join([baseconv.tohex(addr_str[i*11:i*11+11],'b58',pad=16) for i in range(l//11)])
-			b = baseconv.tohex(addr_str[-(l%11):],'b58',pad=10)
+			a = b''.join([baseconv.tobytes(addr_str[i*11:i*11+11],'b58',pad=8) for i in range(l//11)])
+			b = baseconv.tobytes(addr_str[-(l%11):],'b58',pad=5)
 			return a + b
 
-		assert is_b58_str(addr),'Not valid base-58 string'
-		assert len(addr) == cls.addr_width,'Incorrect width'
-
 		ret = b58dec(addr)
 
 		try:
@@ -440,13 +439,13 @@ class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen):
 		except:
 			from mmgen.keccak import keccak_256
 
-		chk = keccak_256(bytes.fromhex(ret)[:-4]).hexdigest()[:8]
-		assert chk == ret[-8:],'{}: incorrect checksum.  Correct value: {}'.format(ret[-8:],chk)
+		chk = keccak_256(ret[:-4]).digest()[:4]
+		assert ret[-4:] == chk,'{}: incorrect checksum.  Correct value: {}'.format(ret[-4:].hex(),chk.hex())
 
-		return { 'hex': ret, 'format': 'monero' } if return_dict else True
+		return cls.parse_addr_bytes(ret)
 
 class MoneroTestnetProtocol(MoneroProtocol):
-	addr_ver_num = { 'monero': ('35','4'), 'monero_sub': ('3f','8') } # 53,63
+	addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' }
 
 class CoinProtocol(MMGenObject):
 	pi = namedtuple('proto_info',['main_cls','test_cls','trust_level']) # trust levels: see altcoin.py
@@ -526,9 +525,10 @@ def make_init_genonly_altcoins_str(data):
 		o += ["base_coin = '{}'".format(coin)]
 		o += ["name = '{}'".format(e.name.lower())]
 		o += ["nameCaps = '{}'".format(e.name)]
-		a = "addr_ver_num = {{ 'p2pkh': ({},{!r})".format(num2hexstr(e.p2pkh_info[0]),e.p2pkh_info[1])
-		b = ", 'p2sh':  ({},{!r})".format(num2hexstr(e.p2sh_info[0]),e.p2sh_info[1]) if e.p2sh_info else ''
-		o += [a+b+' }']
+		o += ["addr_ver_bytes = {{ {}: 'p2pkh'{} }}".format(
+			num2hexstr(e.p2pkh_info[0]),
+			", {}: 'p2sh'".format(num2hexstr(e.p2sh_info[0])) if e.p2sh_info else ''
+		)]
 		o += ["wif_ver_num = {{ 'std': {} }}".format(num2hexstr(e.wif_ver_num))]
 		o += ["mmtypes = ('L','C'{})".format(",'S'" if e.has_segwit else '')]
 		o += ["dfl_mmtype = '{}'".format('L')]

+ 3 - 1
mmgen/tx.py

@@ -98,7 +98,9 @@ def segwit_is_active(exit_on_error=False):
 		return False
 
 def addr2pubhash(addr):
-	return g.proto.verify_addr(addr,addr.hex_width,return_dict=True)['hex']
+	ap = g.proto.parse_addr(addr)
+	assert ap,'coin address {!r} could not be parsed'.format(addr)
+	return ap.bytes.hex()
 
 def addr2scriptPubKey(addr):
 	return {