protocol.py: make coin protocols attrs of CoinProtocol
This commit is contained in:
parent
8a4fbab48d
commit
04775afb2e
6 changed files with 396 additions and 382 deletions
|
|
@ -960,8 +960,8 @@ Record this checksum: it will be used to verify the password file in the future
|
|||
pw_len_hex = baseconv.seedlen_map_rev['xmrseed'][self.pw_len] * 2
|
||||
# take most significant part
|
||||
bytes_trunc = bytes.fromhex(hex_sec[:pw_len_hex])
|
||||
from .protocol import MoneroProtocol
|
||||
bytes_preproc = MoneroProtocol().preprocess_key(bytes_trunc,None)
|
||||
from .protocol import CoinProtocol
|
||||
bytes_preproc = CoinProtocol.Monero().preprocess_key(bytes_trunc,None)
|
||||
return ' '.join(baseconv.frombytes(bytes_preproc,wl_id='xmrseed'))
|
||||
else:
|
||||
# take least significant part
|
||||
|
|
|
|||
|
|
@ -538,10 +538,10 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
|
|||
|
||||
def is_for_chain(self,chain):
|
||||
|
||||
if type(g.proto).__name__[:8] == 'Ethereum':
|
||||
if type(g.proto).__name__.startswith('Ethereum'):
|
||||
return True
|
||||
|
||||
proto = g.proto.get_protocol_by_chain(chain)
|
||||
proto = CoinProtocol.get_protocol_by_chain(chain)
|
||||
|
||||
if self.addr_fmt == 'bech32':
|
||||
return self[:len(proto.bech32_hrp)] == proto.bech32_hrp
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ def _b58chk_decode(s):
|
|||
finfo = namedtuple('fork_info',['height','hash','name','replayable'])
|
||||
|
||||
class CoinProtocol(MMGenObject):
|
||||
|
||||
proto_info = namedtuple('proto_info',['mainnet','testnet','trust_level']) # trust levels: see altcoin.py
|
||||
coins = {
|
||||
'btc': proto_info('Bitcoin', 'BitcoinTestnet', 5),
|
||||
|
|
@ -82,361 +83,367 @@ class CoinProtocol(MMGenObject):
|
|||
}
|
||||
core_coins = tuple(coins.keys()) # coins may be added by init_genonly_altcoins(), so save
|
||||
|
||||
# chainparams.cpp
|
||||
class BitcoinProtocol(MMGenObject):
|
||||
name = 'bitcoin'
|
||||
daemon_name = 'bitcoind'
|
||||
daemon_family = 'bitcoind'
|
||||
addr_ver_bytes = { '00': 'p2pkh', '05': 'p2sh' }
|
||||
addr_len = 20
|
||||
wif_ver_num = { 'std': '80' }
|
||||
mmtypes = ('L','C','S','B')
|
||||
dfl_mmtype = 'L'
|
||||
data_subdir = ''
|
||||
rpc_port = 8332
|
||||
secs_per_block = 600
|
||||
coin_amt = BTCAmt
|
||||
max_tx_fee = BTCAmt('0.003')
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.bitcoin')
|
||||
daemon_data_subdir = ''
|
||||
sighash_type = 'ALL'
|
||||
block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
|
||||
forks = [
|
||||
finfo(478559,'00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148','BCH',False),
|
||||
finfo(None,'','B2X',True),
|
||||
]
|
||||
caps = ('rbf','segwit')
|
||||
mmcaps = ('key','addr','rpc','tx')
|
||||
base_coin = 'BTC'
|
||||
base_proto = 'Bitcoin'
|
||||
# From BIP173: witness version 'n' is stored as 'OP_n'. OP_0 is encoded as 0x00,
|
||||
# but OP_1 through OP_16 are encoded as 0x51 though 0x60 (81 to 96 in decimal).
|
||||
witness_vernum_hex = '00'
|
||||
witness_vernum = int(witness_vernum_hex,16)
|
||||
bech32_hrp = 'bc'
|
||||
sign_mode = 'daemon'
|
||||
secp256k1_ge = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
|
||||
privkey_len = 32
|
||||
avg_bdi = int(9.7 * 60) # average block discovery interval (historical)
|
||||
|
||||
def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False):
|
||||
for ver_hex,fmt in self.addr_ver_bytes.items():
|
||||
if req_fmt == fmt:
|
||||
return ver_hex if return_hex else bytes.fromhex(ver_hex)
|
||||
return False
|
||||
|
||||
def is_testnet(self):
|
||||
return type(self).__name__[-15:] == 'TestnetProtocol'
|
||||
|
||||
@staticmethod
|
||||
def get_protocol_by_chain(chain):
|
||||
return init_proto(g.coin,{'mainnet':False,'testnet':True,'regtest':True}[chain])
|
||||
|
||||
def cap(self,s): return s in self.caps
|
||||
class Common(MMGenObject):
|
||||
|
||||
def preprocess_key(self,sec,pubkey_type):
|
||||
# Key must be non-zero and less than group order of secp256k1 curve
|
||||
if 0 < int.from_bytes(sec,'big') < self.secp256k1_ge:
|
||||
return sec
|
||||
else: # chance of this is less than 1 in 2^127
|
||||
pk = int.from_bytes(sec,'big')
|
||||
if pk == 0: # chance of this is 1 in 2^256
|
||||
ydie(3,'Private key is zero!')
|
||||
elif pk == self.secp256k1_ge: # ditto
|
||||
ydie(3,'Private key == secp256k1_ge!')
|
||||
def is_testnet(self):
|
||||
return type(self).__name__.endswith('Testnet')
|
||||
|
||||
def cap(self,s):
|
||||
return s in self.caps
|
||||
|
||||
class Bitcoin(Common): # chainparams.cpp
|
||||
name = 'bitcoin'
|
||||
mod_clsname = 'bitcoin'
|
||||
daemon_name = 'bitcoind'
|
||||
daemon_family = 'bitcoind'
|
||||
addr_ver_bytes = { '00': 'p2pkh', '05': 'p2sh' }
|
||||
addr_len = 20
|
||||
wif_ver_num = { 'std': '80' }
|
||||
mmtypes = ('L','C','S','B')
|
||||
dfl_mmtype = 'L'
|
||||
data_subdir = ''
|
||||
rpc_port = 8332
|
||||
secs_per_block = 600
|
||||
coin_amt = BTCAmt
|
||||
max_tx_fee = BTCAmt('0.003')
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.bitcoin')
|
||||
daemon_data_subdir = ''
|
||||
sighash_type = 'ALL'
|
||||
block0 = '000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f'
|
||||
forks = [
|
||||
finfo(478559,'00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148','BCH',False),
|
||||
finfo(None,'','B2X',True),
|
||||
]
|
||||
caps = ('rbf','segwit')
|
||||
mmcaps = ('key','addr','rpc','tx')
|
||||
base_coin = 'BTC'
|
||||
base_proto = 'Bitcoin'
|
||||
# From BIP173: witness version 'n' is stored as 'OP_n'. OP_0 is encoded as 0x00,
|
||||
# but OP_1 through OP_16 are encoded as 0x51 though 0x60 (81 to 96 in decimal).
|
||||
witness_vernum_hex = '00'
|
||||
witness_vernum = int(witness_vernum_hex,16)
|
||||
bech32_hrp = 'bc'
|
||||
sign_mode = 'daemon'
|
||||
secp256k1_ge = 0xfffffffffffffffffffffffffffffffebaaedce6af48a03bbfd25e8cd0364141
|
||||
privkey_len = 32
|
||||
avg_bdi = int(9.7 * 60) # average block discovery interval (historical)
|
||||
|
||||
def addr_fmt_to_ver_bytes(self,req_fmt,return_hex=False):
|
||||
for ver_hex,fmt in self.addr_ver_bytes.items():
|
||||
if req_fmt == fmt:
|
||||
return ver_hex if return_hex else bytes.fromhex(ver_hex)
|
||||
return False
|
||||
|
||||
def preprocess_key(self,sec,pubkey_type):
|
||||
# Key must be non-zero and less than group order of secp256k1 curve
|
||||
if 0 < int.from_bytes(sec,'big') < self.secp256k1_ge:
|
||||
return sec
|
||||
else: # chance of this is less than 1 in 2^127
|
||||
pk = int.from_bytes(sec,'big')
|
||||
if pk == 0: # chance of this is 1 in 2^256
|
||||
ydie(3,'Private key is zero!')
|
||||
elif pk == self.secp256k1_ge: # ditto
|
||||
ydie(3,'Private key == secp256k1_ge!')
|
||||
else:
|
||||
if not g.test_suite:
|
||||
ymsg('Warning: private key is greater than secp256k1 group order!:\n {}'.format(hexpriv))
|
||||
return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big')
|
||||
|
||||
def hex2wif(self,hexpriv,pubkey_type,compressed): # input is preprocessed hex
|
||||
sec = bytes.fromhex(hexpriv)
|
||||
assert len(sec) == self.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec))
|
||||
assert pubkey_type in self.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type)
|
||||
return _b58chk_encode(
|
||||
bytes.fromhex(self.wif_ver_num[pubkey_type])
|
||||
+ sec
|
||||
+ (b'',b'\x01')[bool(compressed)])
|
||||
|
||||
def parse_wif(self,wif):
|
||||
key = _b58chk_decode(wif)
|
||||
|
||||
for k,v in self.wif_ver_num.items():
|
||||
v = bytes.fromhex(v)
|
||||
if key[:len(v)] == v:
|
||||
pubkey_type = k
|
||||
key = key[len(v):]
|
||||
break
|
||||
else:
|
||||
if not g.test_suite:
|
||||
ymsg('Warning: private key is greater than secp256k1 group order!:\n {}'.format(hexpriv))
|
||||
return (pk % self.secp256k1_ge).to_bytes(self.privkey_len,'big')
|
||||
raise ValueError('invalid WIF version number')
|
||||
|
||||
def hex2wif(self,hexpriv,pubkey_type,compressed): # input is preprocessed hex
|
||||
sec = bytes.fromhex(hexpriv)
|
||||
assert len(sec) == self.privkey_len, '{} bytes: incorrect private key length!'.format(len(sec))
|
||||
assert pubkey_type in self.wif_ver_num, '{!r}: invalid pubkey_type'.format(pubkey_type)
|
||||
return _b58chk_encode(
|
||||
bytes.fromhex(self.wif_ver_num[pubkey_type])
|
||||
+ sec
|
||||
+ (b'',b'\x01')[bool(compressed)])
|
||||
if len(key) == self.privkey_len + 1:
|
||||
assert key[-1] == 0x01,'{!r}: invalid compressed key suffix byte'.format(key[-1])
|
||||
compressed = True
|
||||
elif len(key) == self.privkey_len:
|
||||
compressed = False
|
||||
else:
|
||||
raise ValueError('{}: invalid key length'.format(len(key)))
|
||||
|
||||
def parse_wif(self,wif):
|
||||
key = _b58chk_decode(wif)
|
||||
return parsed_wif(key[:self.privkey_len], pubkey_type, compressed)
|
||||
|
||||
for k,v in self.wif_ver_num.items():
|
||||
v = bytes.fromhex(v)
|
||||
if key[:len(v)] == v:
|
||||
pubkey_type = k
|
||||
key = key[len(v):]
|
||||
break
|
||||
else:
|
||||
raise ValueError('invalid WIF version number')
|
||||
def get_addr_len(self,addr_fmt):
|
||||
return self.addr_len
|
||||
|
||||
if len(key) == self.privkey_len + 1:
|
||||
assert key[-1] == 0x01,'{!r}: invalid compressed key suffix byte'.format(key[-1])
|
||||
compressed = True
|
||||
elif len(key) == self.privkey_len:
|
||||
compressed = False
|
||||
else:
|
||||
raise ValueError('{}: invalid key length'.format(len(key)))
|
||||
def parse_addr_bytes(self,addr_bytes):
|
||||
for ver_hex,addr_fmt in self.addr_ver_bytes.items():
|
||||
ver_bytes = bytes.fromhex(ver_hex)
|
||||
vlen = len(ver_bytes)
|
||||
if addr_bytes[:vlen] == ver_bytes:
|
||||
if len(addr_bytes[vlen:]) == self.get_addr_len(addr_fmt):
|
||||
return parsed_addr( addr_bytes[vlen:], addr_fmt )
|
||||
|
||||
return parsed_wif(key[:self.privkey_len], pubkey_type, compressed)
|
||||
return False
|
||||
|
||||
def get_addr_len(self,addr_fmt):
|
||||
return self.addr_len
|
||||
def parse_addr(self,addr):
|
||||
|
||||
def parse_addr_bytes(self,addr_bytes):
|
||||
for ver_hex,addr_fmt in self.addr_ver_bytes.items():
|
||||
ver_bytes = bytes.fromhex(ver_hex)
|
||||
vlen = len(ver_bytes)
|
||||
if addr_bytes[:vlen] == ver_bytes:
|
||||
if len(addr_bytes[vlen:]) == self.get_addr_len(addr_fmt):
|
||||
return parsed_addr( addr_bytes[vlen:], addr_fmt )
|
||||
if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp:
|
||||
ret = bech32.decode(self.bech32_hrp,addr)
|
||||
|
||||
return False
|
||||
if ret[0] != self.witness_vernum:
|
||||
msg('{}: Invalid witness version number'.format(ret[0]))
|
||||
return False
|
||||
|
||||
def parse_addr(self,addr):
|
||||
return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False
|
||||
|
||||
if 'B' in self.mmtypes and addr[:len(self.bech32_hrp)] == self.bech32_hrp:
|
||||
ret = bech32.decode(self.bech32_hrp,addr)
|
||||
return self.parse_addr_bytes(_b58chk_decode(addr))
|
||||
|
||||
if ret[0] != self.witness_vernum:
|
||||
msg('{}: Invalid witness version number'.format(ret[0]))
|
||||
return False
|
||||
def pubhash2addr(self,pubkey_hash,p2sh):
|
||||
assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
|
||||
s = self.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash
|
||||
return _b58chk_encode(bytes.fromhex(s))
|
||||
|
||||
return parsed_addr( bytes(ret[1]), 'bech32' ) if ret[1] else False
|
||||
# Segwit:
|
||||
def pubhex2redeem_script(self,pubhex):
|
||||
# https://bitcoincore.org/en/segwit_wallet_dev/
|
||||
# The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed
|
||||
# by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash})
|
||||
return self.witness_vernum_hex + '14' + hash160(pubhex)
|
||||
|
||||
return self.parse_addr_bytes(_b58chk_decode(addr))
|
||||
def pubhex2segwitaddr(self,pubhex):
|
||||
return self.pubhash2addr(hash160(self.pubhex2redeem_script(pubhex)),p2sh=True)
|
||||
|
||||
def pubhash2addr(self,pubkey_hash,p2sh):
|
||||
assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
|
||||
s = self.addr_fmt_to_ver_bytes(('p2pkh','p2sh')[p2sh],return_hex=True) + pubkey_hash
|
||||
return _b58chk_encode(bytes.fromhex(s))
|
||||
def pubhash2bech32addr(self,pubhash):
|
||||
d = list(bytes.fromhex(pubhash))
|
||||
return bech32.bech32_encode(self.bech32_hrp,[self.witness_vernum]+bech32.convertbits(d,8,5))
|
||||
|
||||
# Segwit:
|
||||
def pubhex2redeem_script(self,pubhex):
|
||||
# https://bitcoincore.org/en/segwit_wallet_dev/
|
||||
# The P2SH redeemScript is always 22 bytes. It starts with a OP_0, followed
|
||||
# by a canonical push of the keyhash (i.e. 0x0014{20-byte keyhash})
|
||||
return self.witness_vernum_hex + '14' + hash160(pubhex)
|
||||
class BitcoinTestnet(Bitcoin):
|
||||
addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' }
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet3'
|
||||
rpc_port = 18332
|
||||
bech32_hrps = {'testnet':'tb','regtest':'bcrt'}
|
||||
|
||||
def pubhex2segwitaddr(self,pubhex):
|
||||
return self.pubhash2addr(hash160(self.pubhex2redeem_script(pubhex)),p2sh=True)
|
||||
class BitcoinCash(Bitcoin):
|
||||
# TODO: assumes MSWin user installs in custom dir 'Bitcoin_ABC'
|
||||
daemon_name = 'bitcoind-abc'
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_ABC') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.bitcoin-abc')
|
||||
rpc_port = 8442
|
||||
mmtypes = ('L','C')
|
||||
sighash_type = 'ALL|FORKID'
|
||||
forks = [
|
||||
finfo(478559,'000000000000000000651ef99cb9fcbe0dadde1d424bd9f15ff20136191a5eec','BTC',False)
|
||||
]
|
||||
caps = ()
|
||||
coin_amt = BCHAmt
|
||||
max_tx_fee = BCHAmt('0.1')
|
||||
|
||||
def pubhash2bech32addr(self,pubhash):
|
||||
d = list(bytes.fromhex(pubhash))
|
||||
return bech32.bech32_encode(self.bech32_hrp,[self.witness_vernum]+bech32.convertbits(d,8,5))
|
||||
def pubhex2redeem_script(self,pubhex): raise NotImplementedError
|
||||
def pubhex2segwitaddr(self,pubhex): raise NotImplementedError
|
||||
|
||||
class BitcoinTestnetProtocol(BitcoinProtocol):
|
||||
addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' }
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet3'
|
||||
rpc_port = 18332
|
||||
bech32_hrps = {'testnet':'tb','regtest':'bcrt'}
|
||||
class BitcoinCashTestnet(BitcoinCash):
|
||||
rpc_port = 18442
|
||||
addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' }
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet3'
|
||||
|
||||
class BitcoinCashProtocol(BitcoinProtocol):
|
||||
# TODO: assumes MSWin user installs in custom dir 'Bitcoin_ABC'
|
||||
daemon_name = 'bitcoind-abc'
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_ABC') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.bitcoin-abc')
|
||||
rpc_port = 8442
|
||||
mmtypes = ('L','C')
|
||||
sighash_type = 'ALL|FORKID'
|
||||
forks = [
|
||||
finfo(478559,'000000000000000000651ef99cb9fcbe0dadde1d424bd9f15ff20136191a5eec','BTC',False)
|
||||
]
|
||||
caps = ()
|
||||
coin_amt = BCHAmt
|
||||
max_tx_fee = BCHAmt('0.1')
|
||||
class B2X(Bitcoin):
|
||||
daemon_name = 'bitcoind-2x'
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_2X') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.bitcoin-2x')
|
||||
rpc_port = 8338
|
||||
coin_amt = B2XAmt
|
||||
max_tx_fee = B2XAmt('0.1')
|
||||
forks = [
|
||||
finfo(None,'','BTC',True) # activation: 494784
|
||||
]
|
||||
|
||||
def pubhex2redeem_script(self,pubhex): raise NotImplementedError
|
||||
def pubhex2segwitaddr(self,pubhex): raise NotImplementedError
|
||||
class B2XTestnet(B2X):
|
||||
addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' }
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet5'
|
||||
rpc_port = 18338
|
||||
|
||||
class BitcoinCashTestnetProtocol(BitcoinCashProtocol):
|
||||
rpc_port = 18442
|
||||
addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' }
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet3'
|
||||
class Litecoin(Bitcoin):
|
||||
block0 = '12a765e31ffd4059bada1e25190f6e98c99d9714d334efa41a195a7e7e04bfe2'
|
||||
name = 'litecoin'
|
||||
daemon_name = 'litecoind'
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Litecoin') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.litecoin')
|
||||
addr_ver_bytes = { '30': 'p2pkh', '32': 'p2sh', '05': 'p2sh' } # new p2sh ver 0x32 must come first
|
||||
wif_ver_num = { 'std': 'b0' }
|
||||
mmtypes = ('L','C','S','B')
|
||||
secs_per_block = 150
|
||||
rpc_port = 9332
|
||||
coin_amt = LTCAmt
|
||||
max_tx_fee = LTCAmt('0.3')
|
||||
base_coin = 'LTC'
|
||||
forks = []
|
||||
bech32_hrp = 'ltc'
|
||||
avg_bdi = 2 * 60
|
||||
|
||||
class B2XProtocol(BitcoinProtocol):
|
||||
daemon_name = 'bitcoind-2x'
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Bitcoin_2X') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.bitcoin-2x')
|
||||
rpc_port = 8338
|
||||
coin_amt = B2XAmt
|
||||
max_tx_fee = B2XAmt('0.1')
|
||||
forks = [
|
||||
finfo(None,'','BTC',True) # activation: 494784
|
||||
]
|
||||
class LitecoinTestnet(Litecoin):
|
||||
# addr ver nums same as Bitcoin testnet, except for 'p2sh'
|
||||
addr_ver_bytes = { '6f':'p2pkh', '3a':'p2sh', 'c4':'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' } # same as Bitcoin testnet
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet4'
|
||||
rpc_port = 19332
|
||||
bech32_hrps = {'testnet':'tltc','regtest':'rltc'}
|
||||
|
||||
class B2XTestnetProtocol(B2XProtocol):
|
||||
addr_ver_bytes = { '6f': 'p2pkh', 'c4': 'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' }
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet5'
|
||||
rpc_port = 18338
|
||||
class BitcoinAddrgen(Bitcoin):
|
||||
mmcaps = ('key','addr')
|
||||
|
||||
class LitecoinProtocol(BitcoinProtocol):
|
||||
block0 = '12a765e31ffd4059bada1e25190f6e98c99d9714d334efa41a195a7e7e04bfe2'
|
||||
name = 'litecoin'
|
||||
daemon_name = 'litecoind'
|
||||
daemon_data_dir = os.path.join(os.getenv('APPDATA'),'Litecoin') if g.platform == 'win' \
|
||||
else os.path.join(g.home_dir,'.litecoin')
|
||||
addr_ver_bytes = { '30': 'p2pkh', '32': 'p2sh', '05': 'p2sh' } # new p2sh ver 0x32 must come first
|
||||
wif_ver_num = { 'std': 'b0' }
|
||||
mmtypes = ('L','C','S','B')
|
||||
secs_per_block = 150
|
||||
rpc_port = 9332
|
||||
coin_amt = LTCAmt
|
||||
max_tx_fee = LTCAmt('0.3')
|
||||
base_coin = 'LTC'
|
||||
forks = []
|
||||
bech32_hrp = 'ltc'
|
||||
avg_bdi = 2 * 60
|
||||
class BitcoinAddrgenTestnet(BitcoinTestnet):
|
||||
mmcaps = ('key','addr')
|
||||
|
||||
class LitecoinTestnetProtocol(LitecoinProtocol):
|
||||
# addr ver nums same as Bitcoin testnet, except for 'p2sh'
|
||||
addr_ver_bytes = { '6f':'p2pkh', '3a':'p2sh', 'c4':'p2sh' }
|
||||
wif_ver_num = { 'std': 'ef' } # same as Bitcoin testnet
|
||||
data_subdir = 'testnet'
|
||||
daemon_data_subdir = 'testnet4'
|
||||
rpc_port = 19332
|
||||
bech32_hrps = {'testnet':'tltc','regtest':'rltc'}
|
||||
class DummyWIF(object):
|
||||
|
||||
class BitcoinProtocolAddrgen(BitcoinProtocol): mmcaps = ('key','addr')
|
||||
class BitcoinTestnetProtocolAddrgen(BitcoinTestnetProtocol): mmcaps = ('key','addr')
|
||||
def hex2wif(self,hexpriv,pubkey_type,compressed):
|
||||
n = self.name.capitalize()
|
||||
assert pubkey_type == self.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n)
|
||||
assert compressed == False,'{} does not support compressed pubkeys!'.format(n)
|
||||
return hexpriv
|
||||
|
||||
class DummyWIF(object):
|
||||
def parse_wif(self,wif):
|
||||
return parsed_wif(bytes.fromhex(wif), self.pubkey_type, False)
|
||||
|
||||
def hex2wif(self,hexpriv,pubkey_type,compressed):
|
||||
n = self.name.capitalize()
|
||||
assert pubkey_type == self.pubkey_type,'{}: invalid pubkey_type for {}!'.format(pubkey_type,n)
|
||||
assert compressed == False,'{} does not support compressed pubkeys!'.format(n)
|
||||
return hexpriv
|
||||
class Ethereum(DummyWIF,Bitcoin):
|
||||
|
||||
def parse_wif(self,wif):
|
||||
return parsed_wif(bytes.fromhex(wif), self.pubkey_type, False)
|
||||
addr_len = 20
|
||||
mmtypes = ('E',)
|
||||
dfl_mmtype = 'E'
|
||||
name = 'ethereum'
|
||||
mod_clsname = 'ethereum'
|
||||
base_coin = 'ETH'
|
||||
pubkey_type = 'std' # required by DummyWIF
|
||||
|
||||
class EthereumProtocol(DummyWIF,BitcoinProtocol):
|
||||
data_subdir = ''
|
||||
daemon_name = 'parity'
|
||||
daemon_family = 'parity'
|
||||
rpc_port = 8545
|
||||
mmcaps = ('key','addr','rpc')
|
||||
coin_amt = ETHAmt
|
||||
max_tx_fee = ETHAmt('0.005')
|
||||
chain_name = 'foundation'
|
||||
sign_mode = 'standalone'
|
||||
caps = ('token',)
|
||||
base_proto = 'Ethereum'
|
||||
|
||||
addr_len = 20
|
||||
mmtypes = ('E',)
|
||||
dfl_mmtype = 'E'
|
||||
name = 'ethereum'
|
||||
base_coin = 'ETH'
|
||||
pubkey_type = 'std' # required by DummyWIF
|
||||
def parse_addr(self,addr):
|
||||
from .util import is_hex_str_lc
|
||||
if is_hex_str_lc(addr) and len(addr) == self.addr_len * 2:
|
||||
return parsed_addr( bytes.fromhex(addr), 'ethereum' )
|
||||
if g.debug: Msg("Invalid address '{}'".format(addr))
|
||||
return False
|
||||
|
||||
data_subdir = ''
|
||||
daemon_name = 'parity'
|
||||
daemon_family = 'parity'
|
||||
rpc_port = 8545
|
||||
mmcaps = ('key','addr','rpc')
|
||||
coin_amt = ETHAmt
|
||||
max_tx_fee = ETHAmt('0.005')
|
||||
chain_name = 'foundation'
|
||||
sign_mode = 'standalone'
|
||||
caps = ('token',)
|
||||
base_proto = 'Ethereum'
|
||||
def pubhash2addr(self,pubkey_hash,p2sh):
|
||||
assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
|
||||
assert not p2sh,'Ethereum has no P2SH address format'
|
||||
return pubkey_hash
|
||||
|
||||
def parse_addr(self,addr):
|
||||
from .util import is_hex_str_lc
|
||||
if is_hex_str_lc(addr) and len(addr) == self.addr_len * 2:
|
||||
return parsed_addr( bytes.fromhex(addr), 'ethereum' )
|
||||
if g.debug: Msg("Invalid address '{}'".format(addr))
|
||||
return False
|
||||
class EthereumTestnet(Ethereum):
|
||||
data_subdir = 'testnet'
|
||||
rpc_port = 8547 # start Parity with --jsonrpc-port=8547 or --ports-shift=2
|
||||
chain_name = 'kovan'
|
||||
|
||||
def pubhash2addr(self,pubkey_hash,p2sh):
|
||||
assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
|
||||
assert not p2sh,'Ethereum has no P2SH address format'
|
||||
return pubkey_hash
|
||||
class EthereumClassic(Ethereum):
|
||||
name = 'ethereumClassic' # TODO
|
||||
rpc_port = 8555 # start Parity with --jsonrpc-port=8555 or --ports-shift=10
|
||||
chain_name = 'ethereum_classic' # chain_id 0x3d (61)
|
||||
|
||||
class EthereumTestnetProtocol(EthereumProtocol):
|
||||
data_subdir = 'testnet'
|
||||
rpc_port = 8547 # start Parity with --jsonrpc-port=8547 or --ports-shift=2
|
||||
chain_name = 'kovan'
|
||||
class EthereumClassicTestnet(EthereumClassic):
|
||||
rpc_port = 8557 # start Parity with --jsonrpc-port=8557 or --ports-shift=12
|
||||
chain_name = 'classic-testnet' # aka Morden, chain_id 0x3e (62) (UNTESTED)
|
||||
|
||||
class EthereumClassicProtocol(EthereumProtocol):
|
||||
name = 'ethereumClassic'
|
||||
class_pfx = 'Ethereum'
|
||||
rpc_port = 8555 # start Parity with --jsonrpc-port=8555 or --ports-shift=10
|
||||
chain_name = 'ethereum_classic' # chain_id 0x3d (61)
|
||||
class Zcash(BitcoinAddrgen):
|
||||
name = 'zcash'
|
||||
base_coin = 'ZEC'
|
||||
addr_ver_bytes = { '1cb8': 'p2pkh', '1cbd': 'p2sh', '169a': 'zcash_z', 'a8abd3': 'viewkey' }
|
||||
wif_ver_num = { 'std': '80', 'zcash_z': 'ab36' }
|
||||
mmtypes = ('L','C','Z')
|
||||
dfl_mmtype = 'L'
|
||||
|
||||
class EthereumClassicTestnetProtocol(EthereumClassicProtocol):
|
||||
rpc_port = 8557 # start Parity with --jsonrpc-port=8557 or --ports-shift=12
|
||||
chain_name = 'classic-testnet' # aka Morden, chain_id 0x3e (62) (UNTESTED)
|
||||
def get_addr_len(self,addr_fmt):
|
||||
return (20,64)[addr_fmt in ('zcash_z','viewkey')]
|
||||
|
||||
class ZcashProtocol(BitcoinProtocolAddrgen):
|
||||
name = 'zcash'
|
||||
base_coin = 'ZEC'
|
||||
addr_ver_bytes = { '1cb8': 'p2pkh', '1cbd': 'p2sh', '169a': 'zcash_z', 'a8abd3': 'viewkey' }
|
||||
wif_ver_num = { 'std': '80', 'zcash_z': 'ab36' }
|
||||
mmtypes = ('L','C','Z')
|
||||
dfl_mmtype = 'L'
|
||||
def preprocess_key(self,sec,pubkey_type):
|
||||
if pubkey_type == 'zcash_z': # zero the first four bits
|
||||
return bytes([sec[0] & 0x0f]) + sec[1:]
|
||||
else:
|
||||
return super().preprocess_key(sec,pubkey_type)
|
||||
|
||||
def get_addr_len(self,addr_fmt):
|
||||
return (20,64)[addr_fmt in ('zcash_z','viewkey')]
|
||||
def pubhash2addr(self,pubkey_hash,p2sh):
|
||||
hl = len(pubkey_hash)
|
||||
if hl == 40:
|
||||
return super().pubhash2addr(pubkey_hash,p2sh)
|
||||
elif hl == 128:
|
||||
raise NotImplementedError('Zcash z-addresses have no pubkey hash')
|
||||
else:
|
||||
raise ValueError('{}: incorrect pubkey_hash length'.format(hl))
|
||||
|
||||
def preprocess_key(self,sec,pubkey_type):
|
||||
if pubkey_type == 'zcash_z': # zero the first four bits
|
||||
return bytes([sec[0] & 0x0f]) + sec[1:]
|
||||
else:
|
||||
return super().preprocess_key(sec,pubkey_type)
|
||||
|
||||
def pubhash2addr(self,pubkey_hash,p2sh):
|
||||
hl = len(pubkey_hash)
|
||||
if hl == 40:
|
||||
return super().pubhash2addr(pubkey_hash,p2sh)
|
||||
elif hl == 128:
|
||||
raise NotImplementedError('Zcash z-addresses have no pubkey hash')
|
||||
else:
|
||||
raise ValueError('{}: incorrect pubkey_hash length'.format(hl))
|
||||
|
||||
class ZcashTestnetProtocol(ZcashProtocol):
|
||||
wif_ver_num = { 'std': 'ef', 'zcash_z': 'ac08' }
|
||||
addr_ver_bytes = { '1d25': 'p2pkh', '1cba': 'p2sh', '16b6': 'zcash_z', 'a8ac0c': 'viewkey' }
|
||||
class ZcashTestnet(Zcash):
|
||||
wif_ver_num = { 'std': 'ef', 'zcash_z': 'ac08' }
|
||||
addr_ver_bytes = { '1d25': 'p2pkh', '1cba': 'p2sh', '16b6': 'zcash_z', 'a8ac0c': 'viewkey' }
|
||||
|
||||
# https://github.com/monero-project/monero/blob/master/src/cryptonote_config.h
|
||||
class MoneroProtocol(DummyWIF,BitcoinProtocolAddrgen):
|
||||
name = 'monero'
|
||||
base_coin = 'XMR'
|
||||
addr_ver_bytes = { '12': 'monero', '2a': 'monero_sub' }
|
||||
addr_len = 68
|
||||
wif_ver_num = {}
|
||||
mmtypes = ('M',)
|
||||
dfl_mmtype = 'M'
|
||||
pubkey_type = 'monero' # required by DummyWIF
|
||||
class Monero(DummyWIF,BitcoinAddrgen):
|
||||
name = 'monero'
|
||||
base_coin = 'XMR'
|
||||
addr_ver_bytes = { '12': 'monero', '2a': 'monero_sub' }
|
||||
addr_len = 68
|
||||
wif_ver_num = {}
|
||||
mmtypes = ('M',)
|
||||
dfl_mmtype = 'M'
|
||||
pubkey_type = 'monero' # required by DummyWIF
|
||||
|
||||
def preprocess_key(self,sec,pubkey_type): # reduce key
|
||||
from .ed25519 import l
|
||||
n = int.from_bytes(sec[::-1],'big') % l
|
||||
return int.to_bytes(n,self.privkey_len,'big')[::-1]
|
||||
def preprocess_key(self,sec,pubkey_type): # reduce key
|
||||
from .ed25519 import l
|
||||
n = int.from_bytes(sec[::-1],'big') % l
|
||||
return int.to_bytes(n,self.privkey_len,'big')[::-1]
|
||||
|
||||
def parse_addr(self,addr):
|
||||
def parse_addr(self,addr):
|
||||
|
||||
from .baseconv import baseconv,is_b58_str
|
||||
from .baseconv import baseconv,is_b58_str
|
||||
|
||||
def b58dec(addr_str):
|
||||
l = len(addr_str)
|
||||
a = b''.join([baseconv.tobytes(addr_str[i*11:i*11+11],'b58',pad=8) for i in range(l//11)])
|
||||
b = baseconv.tobytes(addr_str[-(l%11):],'b58',pad=5)
|
||||
return a + b
|
||||
def b58dec(addr_str):
|
||||
l = len(addr_str)
|
||||
a = b''.join([baseconv.tobytes(addr_str[i*11:i*11+11],'b58',pad=8) for i in range(l//11)])
|
||||
b = baseconv.tobytes(addr_str[-(l%11):],'b58',pad=5)
|
||||
return a + b
|
||||
|
||||
ret = b58dec(addr)
|
||||
ret = b58dec(addr)
|
||||
|
||||
try:
|
||||
assert not g.use_internal_keccak_module
|
||||
from sha3 import keccak_256
|
||||
except:
|
||||
from .keccak import keccak_256
|
||||
try:
|
||||
assert not g.use_internal_keccak_module
|
||||
from sha3 import keccak_256
|
||||
except:
|
||||
from .keccak import keccak_256
|
||||
|
||||
chk = keccak_256(ret[:-4]).digest()[:4]
|
||||
assert ret[-4:] == chk,'{}: incorrect checksum. Correct value: {}'.format(ret[-4:].hex(),chk.hex())
|
||||
chk = keccak_256(ret[:-4]).digest()[:4]
|
||||
assert ret[-4:] == chk,'{}: incorrect checksum. Correct value: {}'.format(ret[-4:].hex(),chk.hex())
|
||||
|
||||
return self.parse_addr_bytes(ret)
|
||||
return self.parse_addr_bytes(ret)
|
||||
|
||||
class MoneroTestnetProtocol(MoneroProtocol):
|
||||
addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' }
|
||||
class MoneroTestnet(Monero):
|
||||
addr_ver_bytes = { '35': 'monero', '3f': 'monero_sub' }
|
||||
|
||||
def init_proto(coin,testnet):
|
||||
coin = coin.lower()
|
||||
|
|
@ -446,7 +453,7 @@ def init_proto(coin,testnet):
|
|||
'{}: not a valid coin for network {}\nSupported coins: {}'.format(
|
||||
coin.upper(),g.network.upper(),
|
||||
' '.join(c.upper() for c in CoinProtocol.coins) ))
|
||||
proto = globals()[CoinProtocol.coins[coin][testnet] + 'Protocol']
|
||||
proto = getattr(CoinProtocol,CoinProtocol.coins[coin][testnet])
|
||||
if hasattr(proto,'bech32_hrps'):
|
||||
proto.bech32_hrp = proto.bech32_hrps[('testnet','regtest')[g.regtest]]
|
||||
return proto()
|
||||
|
|
@ -490,42 +497,51 @@ def make_init_genonly_altcoins_str(data):
|
|||
|
||||
def make_proto(e,testnet=False):
|
||||
tn_str = 'Testnet' if testnet else ''
|
||||
proto,coin = '{}{}Protocol'.format(e.name,tn_str),e.symbol
|
||||
if proto[0] in '0123456789': proto = 'X_'+proto
|
||||
if proto in globals(): return ''
|
||||
if coin.lower() in CoinProtocol.coins: return ''
|
||||
proto = e.name + tn_str
|
||||
coin = e.symbol
|
||||
if proto[0] in '0123456789':
|
||||
proto = 'X_'+proto
|
||||
if hasattr(CoinProtocol,proto) or coin.lower() in CoinProtocol.coins:
|
||||
return ''
|
||||
|
||||
def num2hexstr(n):
|
||||
return "'{:0{}x}'".format(n,(4,2)[n < 256])
|
||||
|
||||
o = ['class {}(Bitcoin{}ProtocolAddrgen):'.format(proto,tn_str)]
|
||||
o += ["base_coin = '{}'".format(coin)]
|
||||
o += ["name = '{}'".format(e.name.lower())]
|
||||
o += ["nameCaps = '{}'".format(e.name)]
|
||||
o += ["addr_ver_bytes = {{ {}: 'p2pkh'{} }}".format(
|
||||
num2hexstr(e.p2pkh_info[0]),
|
||||
", {}: 'p2sh'".format(num2hexstr(e.p2sh_info[0])) if e.p2sh_info else ''
|
||||
)]
|
||||
o += ["wif_ver_num = {{ 'std': {} }}".format(num2hexstr(e.wif_ver_num))]
|
||||
o += ["mmtypes = ('L','C'{})".format(",'S'" if e.has_segwit else '')]
|
||||
o += ["dfl_mmtype = '{}'".format('L')]
|
||||
return '\n\t'.join(o) + '\n'
|
||||
p2sh_info = ", {}: 'p2sh'".format(num2hexstr(e.p2sh_info[0])) if e.p2sh_info else ''
|
||||
sw_mmtype = ",'S'" if e.has_segwit else ''
|
||||
|
||||
out = ''
|
||||
for e in data['mainnet']:
|
||||
out += make_proto(e)
|
||||
for e in data['testnet']:
|
||||
out += make_proto(e,testnet=True)
|
||||
return f"""
|
||||
class {proto}(CoinProtocol.BitcoinAddrgen{tn_str}):
|
||||
base_coin = {coin!r}
|
||||
name = {e.name.lower()!r}
|
||||
nameCaps = {e.name!r}
|
||||
addr_ver_bytes = {{ {num2hexstr(e.p2pkh_info[0])}: 'p2pkh'{p2sh_info} }}
|
||||
wif_ver_num = {{ 'std': {num2hexstr(e.wif_ver_num)} }}
|
||||
mmtypes = ('L','C'{sw_mmtype})
|
||||
dfl_mmtype = 'L'
|
||||
""".rstrip()
|
||||
|
||||
tn_coins = [e.symbol for e in data['testnet']]
|
||||
fs = "CoinProtocol.coins['{}'] = CoinProtocol.proto_info('{}',{},{})\n"
|
||||
for e in data['mainnet']:
|
||||
proto,coin = e.name,e.symbol
|
||||
if proto[0] in '0123456789': proto = 'X_'+proto
|
||||
if proto+'Protocol' in globals(): continue
|
||||
if coin.lower() in CoinProtocol.coins: continue
|
||||
out += fs.format(coin.lower(),proto,('None',f"'{proto}Testnet'")[coin in tn_coins],e.trust_level)
|
||||
return out
|
||||
def gen_text():
|
||||
yield "class CoinProtocol(CoinProtocol):"
|
||||
for e in data['mainnet']:
|
||||
yield make_proto(e)
|
||||
for e in data['testnet']:
|
||||
yield make_proto(e,testnet=True)
|
||||
yield ''
|
||||
|
||||
for e in data['mainnet']:
|
||||
proto,coin = e.name,e.symbol
|
||||
if proto[0] in '0123456789':
|
||||
proto = 'X_'+proto
|
||||
if hasattr(CoinProtocol,proto) or coin.lower() in CoinProtocol.coins:
|
||||
continue
|
||||
yield 'CoinProtocol.coins[{!r}] = CoinProtocol.proto_info({!r},{},{})'.format(
|
||||
coin.lower(),
|
||||
proto,
|
||||
('None',f"'{proto}Testnet'")[coin in [e.symbol for e in data['testnet']]],
|
||||
e.trust_level )
|
||||
|
||||
return '\n'.join(gen_text()) + '\n'
|
||||
|
||||
def init_coin(coin,testnet=None):
|
||||
if testnet is not None:
|
||||
|
|
|
|||
|
|
@ -526,8 +526,8 @@ class MMGenToolCmdMnemonic(MMGenToolCmds):
|
|||
|
||||
@staticmethod
|
||||
def _xmr_reduce(bytestr):
|
||||
from .protocol import MoneroProtocol
|
||||
p = MoneroProtocol()
|
||||
from .protocol import CoinProtocol
|
||||
p = CoinProtocol.Monero()
|
||||
if len(bytestr) != p.privkey_len:
|
||||
m = '{!r}: invalid bit length for Monero private key (must be {})'
|
||||
die(1,m.format(len(bytestr*8),p.privkey_len*8))
|
||||
|
|
|
|||
|
|
@ -860,11 +860,10 @@ def format_par(s,indent=0,width=80,as_list=False):
|
|||
def altcoin_subclass(cls,mod_id,cls_name):
|
||||
if cls.__name__ != cls_name: return cls
|
||||
mod_dir = g.proto.base_coin.lower()
|
||||
pname = g.proto.class_pfx if hasattr(g.proto,'class_pfx') else capfirst(g.proto.name)
|
||||
tname = 'Token' if g.token else ''
|
||||
import importlib
|
||||
modname = 'mmgen.altcoins.{}.{}'.format(mod_dir,mod_id)
|
||||
clsname = '{}{}{}'.format(pname,tname,cls_name)
|
||||
clsname = '{}{}{}'.format(capfirst(g.proto.mod_clsname),tname,cls_name)
|
||||
try:
|
||||
return getattr(importlib.import_module(modname),clsname)
|
||||
except ImportError:
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
|
|||
from mmgen.common import *
|
||||
from mmgen.exception import *
|
||||
|
||||
from mmgen.protocol import init_coin,EthereumProtocol
|
||||
from mmgen.protocol import init_coin
|
||||
from mmgen.rpc import MoneroWalletRPCClient
|
||||
from mmgen.daemon import CoinDaemon,MoneroWalletDaemon
|
||||
|
||||
|
|
@ -72,7 +72,6 @@ class unit_tests:
|
|||
async def run_test():
|
||||
qmsg(' Testing backend {!r}'.format(type(g.rpc.backend).__name__))
|
||||
ret = await g.rpc.call('parity_versionInfo',timeout=300)
|
||||
#print(ret)
|
||||
|
||||
for backend in g.autoset_opts['rpc_backend'].choices:
|
||||
run_session(run_test(),backend=backend)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue