View key support for Zcash z-addresses
- view keys are included in key-address lists
- tests updated to test the new functionality
- mmgen-tool: `compressed` and `segwit` args replaced by --type option
parent 1cc64e1eb9
commit 4876ee47ed
14 changed files with 551 additions and 450 deletions
34
README.md
@@ -4,33 +4,35 @@ easier way to install MMGen, check out the prebuilt bootable USB images on the

 # MMGen = Multi-Mode GENerator

-##### a Bitcoin cold-storage solution for the command line
+##### a Bitcoin and altcoin online/offline software wallet for the command line

 ### Description

-MMGen is a Bitcoin cold-storage system implemented as a suite of Python
-command-line scripts requiring only a bare minimum of system resources. The
-scripts work in tandem with a reference Bitcoin Core daemon (bitcoind) running
-on both an online and an offline air-gapped computer to provide a robust
-solution for securely storing, tracking, sending and receiving Bitcoins. To
-track address balances without exposing keys on the online computer, MMGen
-relies on Bitcoin Core’s watch-only address support. Ordinary Bitcoin addresses
-can be tracked and spent too, creating an easy migration path from other
-wallets.
+MMGen is a wallet and cold storage solution for Bitcoin (and selected altcoins)
+implemented as a suite of lightweight Python scripts. The scripts work in
+tandem with a reference Bitcoin Core daemon (or altcoin daemon) running on both
+an online and offline computer to provide a robust solution for securely
+storing, tracking, sending and receiving Bitcoins.
+
+The online computer is used only for tracking balances and creating and sending
+transactions. **Thus it holds no private keys that can be hacked or stolen.**
+All transactions are signed offline: **your seed and private keys never touch a
+network-connected device.** The offline computer used for wallet creation,
+address generation and transaction signing is typically a low-powered device
+such as a Raspberry Pi.

 MMGen is designed for reliability by having the Bitcoin daemon itself, rather
 than less-tested third-party software, do all the “heavy lifting” of tracking
-and signing transactions. It’s also designed for privacy: unlike some other
-online/offline wallet solutions, MMGen plus Bitcoin Core is a completely
+and signing transactions. It’s also designed with privacy in mind: unlike some
+other online/offline wallet solutions, MMGen plus Bitcoin Core is a completely
 self-contained system that makes **no connections to the Internet** except for
 the Bitcoin network itself: no third parties are involved, and thus no
-information about which addresses you’re tracking is leaked to the outside
+information about the addresses you’re tracking is leaked to the outside
 world.

 Like all deterministic wallets, MMGen can generate a virtually unlimited number
 of address/key pairs from a single seed. Your wallet never changes, so you need
-back it up only once. Transactions are signed offline: your seed and private
-keys never touch an online computer.
+back it up only once.

 At the heart of the MMGen system is the seed, the “master key” providing access
 to all your Bitcoins. The seed can be stored in five different ways:

@@ -107,7 +109,7 @@ future use in an address file, which addresses may safely be made public.

 > #### [Recovering your keys without the MMGen software][r]

-> #### [Forkcoin and Altcoin support (BCH,LTC)][x]
+> #### [Forkcoin and Altcoin support (BCH,LTC,ETH,ETC,DASH,ZEC)][x]

 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

184
mmgen/addr.py
@@ -51,15 +51,15 @@ class AddrGeneratorP2PKH(AddrGenerator):
 		return CoinAddr(g.proto.pubhash2addr(hash160(pubhex),p2sh=False))

 	def to_segwit_redeem_script(self,pubhex):
-		raise NotImplementedError
+		raise NotImplementedError,'Coin/type pair incompatible with Segwit'

 class AddrGeneratorSegwit(AddrGenerator):
 	def to_addr(self,pubhex):
-		assert pubhex.compressed
+		assert pubhex.compressed,'Uncompressed public keys incompatible with Segwit'
 		return CoinAddr(g.proto.pubhex2segwitaddr(pubhex))

 	def to_segwit_redeem_script(self,pubhex):
-		assert pubhex.compressed
+		assert pubhex.compressed,'Uncompressed public keys incompatible with Segwit'
 		return HexStr(g.proto.pubhex2redeem_script(pubhex))

 class AddrGeneratorEthereum(AddrGenerator):
@@ -69,33 +69,42 @@ class AddrGeneratorEthereum(AddrGenerator):
 		return CoinAddr(sha3.keccak_256(pubhex[2:].decode('hex')).digest()[12:].encode('hex'))

 	def to_segwit_redeem_script(self,pubhex):
-		raise NotImplementedError
+		raise NotImplementedError,'Coin/type pair incompatible with Segwit'

 # github.com/FiloSottile/zcash-mini/zcash/address.go
 class AddrGeneratorZcashZ(AddrGenerator):

-	def zhash256(self,vhex,t):
-		byte0 = '{:02x}'.format(int(vhex[:2],16) | 0xc0)
-		byte32 = '{:02x}'.format(t)
-		vhex_fix = byte0 + vhex[2:64] + byte32 + '00' * 31
-		assert len(vhex_fix) == 128
+	def zhash256(self,s,t):
+		s = map(ord,s+'\0'*32)
+		s[0] |= 0xc0
+		s[32] = t
 		from mmgen.sha256 import Sha256
-		return Sha256(unhexlify(vhex_fix),preprocess=False).hexdigest()
+		return Sha256(map(chr,s),preprocess=False).digest()

 	def to_addr(self,pubhex): # pubhex is really privhex
-		key = pubhex
-		assert len(key) == 64,'{}: incorrect privkey length'.format(len(key))
-		addr1 = self.zhash256(key,0)
-		addr2 = self.zhash256(key,1)
+		key = pubhex.decode('hex')
+		assert len(key) == 32,'{}: incorrect privkey length'.format(len(key))
 		from nacl.bindings import crypto_scalarmult_base
-		addr2 = hexlify(crypto_scalarmult_base(unhexlify(addr2)))
+		p2 = crypto_scalarmult_base(self.zhash256(key,1))
 		from mmgen.protocol import _b58chk_encode
-		ret = _b58chk_encode(g.proto.addr_ver_num['zcash_z'][0] + addr1 + addr2)
-		assert len(ret) == g.proto.addr_width,'Invalid zaddr length'
+		ret = _b58chk_encode(g.proto.addr_ver_num['zcash_z'][0] + hexlify(self.zhash256(key,0)+p2))
+		assert len(ret) == g.proto.addr_width,'Invalid Zcash z-address length'
 		return CoinAddr(ret)

+	def to_viewkey(self,pubhex): # pubhex is really privhex
+		key = pubhex.decode('hex')
+		assert len(key) == 32,'{}: incorrect privkey length'.format(len(key))
+		vk = map(ord,self.zhash256(key,0)+self.zhash256(key,1))
+		vk[32] &= 0xf8
+		vk[63] &= 0x7f
+		vk[63] |= 0x40
+		from mmgen.protocol import _b58chk_encode
+		ret = _b58chk_encode(g.proto.addr_ver_num['viewkey'][0] + hexlify(''.join(map(chr,vk))))
+		assert len(ret) == g.proto.addr_width,'Invalid Zcash view key length'
+		return ZcashViewKey(ret)
+
 	def to_segwit_redeem_script(self,pubhex):
-		raise NotImplementedError
+		raise NotImplementedError,'Zcash z-addresses incompatible with Segwit'

 class KeyGenerator(MMGenObject):

|
|
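The byte-level steps in the new `zhash256()` and `to_viewkey()` can be looked at in isolation. What follows is a minimal Python 3 sketch (the commit itself is Python 2 code); the helper names `prf_input_block` and `clamp_viewkey` are hypothetical and do not appear in MMGen. The unpadded SHA-256 step (`Sha256(...,preprocess=False)`) is left out on purpose, since `hashlib` always applies padding and cannot reproduce it.

```python
def prf_input_block(key: bytes, t: int) -> bytes:
    """Build the 64-byte block that zhash256() hands to the unpadded SHA-256.

    Hypothetical helper: mirrors `s = map(ord,s+'\\0'*32)` and the two
    byte assignments that follow it in the diff.
    """
    assert len(key) == 32, '{}: incorrect privkey length'.format(len(key))
    s = bytearray(key + b'\0' * 32)
    s[0] |= 0xc0   # force the two high bits of the first byte
    s[32] = t      # selector byte; the diff uses t=0 and t=1
    return bytes(s)


def clamp_viewkey(vk: bytes) -> bytes:
    """Apply the three bit operations from to_viewkey() to a 64-byte value."""
    assert len(vk) == 64, '{}: incorrect length'.format(len(vk))
    b = bytearray(vk)
    b[32] &= 0xf8  # clear the three low bits of byte 32
    b[63] &= 0x7f  # clear the high bit of byte 63
    b[63] |= 0x40  # set the second-highest bit of byte 63
    return bytes(b)
```

In the diff, `clamp_viewkey` would correspond to the operations applied to `zhash256(key,0)+zhash256(key,1)` before the result is Base58Check-encoded with the `viewkey` version bytes.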
@ -174,10 +183,11 @@ class KeyGeneratorDummy(KeyGenerator):
|
|||
return PubKey(str(privhex),compressed=privhex.compressed)
|
||||
|
||||
class AddrListEntry(MMGenListItem):
|
||||
addr = MMGenListItemAttr('addr','CoinAddr')
|
||||
idx = MMGenListItemAttr('idx','AddrIdx') # not present in flat addrlists
|
||||
label = MMGenListItemAttr('label','TwComment',reassign_ok=True)
|
||||
sec = MMGenListItemAttr('sec',PrivKey,typeconv=False)
|
||||
addr = MMGenListItemAttr('addr','CoinAddr')
|
||||
viewkey = MMGenListItemAttr('viewkey','ZcashViewKey')
|
||||
idx = MMGenListItemAttr('idx','AddrIdx') # not present in flat addrlists
|
||||
label = MMGenListItemAttr('label','TwComment',reassign_ok=True)
|
||||
sec = MMGenListItemAttr('sec',PrivKey,typeconv=False)
|
||||
|
||||
class PasswordListEntry(MMGenListItem):
|
||||
passwd = MMGenImmutableAttr('passwd',unicode,typeconv=False) # TODO: create Password type
|
||||
|
|
@ -317,6 +327,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
|
||||
compressed = self.al_id.mmtype.compressed
|
||||
pubkey_type = self.al_id.mmtype.pubkey_type
|
||||
has_viewkey = self.al_id.mmtype.has_viewkey
|
||||
|
||||
if self.gen_addrs:
|
||||
kg = KeyGenerator(pubkey_type)
|
||||
|
|
@ -342,7 +353,10 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),compressed=compressed,pubkey_type=pubkey_type)
|
||||
|
||||
if self.gen_addrs:
|
||||
e.addr = ag.to_addr(kg.to_pubhex(e.sec))
|
||||
ph = kg.to_pubhex(e.sec)
|
||||
e.addr = ag.to_addr(ph)
|
||||
if has_viewkey:
|
||||
e.viewkey = ag.to_viewkey(ph)
|
||||
|
||||
if type(self) == PasswordList:
|
||||
e.passwd = unicode(self.make_passwd(e.sec)) # TODO - own type
|
||||
|
|
@ -480,17 +494,16 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
else: # First line with idx
|
||||
out.append(fs.format(e.idx,e.addr,c))
|
||||
if self.has_keys:
|
||||
if self.al_id.mmtype.has_viewkey:
|
||||
out.append(fs.format('','view: '+e.viewkey,c))
|
||||
if opt.b16: out.append(fs.format('', 'hex: '+e.sec,c))
|
||||
out.append(fs.format('', 'wif: '+e.sec.wif,c))
|
||||
out.append(fs.format('','wif: '+e.sec.wif,c))
|
||||
|
||||
out.append('}')
|
||||
self.fmt_data = '\n'.join([l.rstrip() for l in out]) + '\n'
|
||||
|
||||
def parse_file_body(self,lines):
|
||||
|
||||
if self.has_keys and len(lines) % 2:
|
||||
return 'Key-address file has odd number of lines'
|
||||
|
||||
ret = AddrListList()
|
||||
le = self.entry_type
|
||||
|
||||
|
|
@ -498,25 +511,19 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
l = lines.pop(0)
|
||||
d = l.split(None,2)
|
||||
|
||||
if not is_mmgen_idx(d[0]):
|
||||
return "'%s': invalid address num. in line: '%s'" % (d[0],l)
|
||||
|
||||
if not self.check_format(d[1]):
|
||||
return "'{}': invalid {}".format(d[1],self.data_desc)
|
||||
assert is_mmgen_idx(d[0]),"'%s': invalid address num. in line: '%s'" % (d[0],l)
|
||||
assert self.check_format(d[1]),"'{}': invalid {}".format(d[1],self.data_desc)
|
||||
|
||||
if len(d) != 3: d.append('')
|
||||
|
||||
a = le(**{'idx':int(d[0]),self.main_attr:d[1],'label':d[2]})
|
||||
|
||||
if self.has_keys:
|
||||
l = lines.pop(0)
|
||||
d = l.split(None,2)
|
||||
|
||||
if d[0] != 'wif:':
|
||||
return "Invalid key line in file: '{}'".format(l)
|
||||
if not is_wif(d[1]):
|
||||
return "'{}': invalid {} key".format(d[1],g.proto.name.capitalize())
|
||||
|
||||
if self.al_id.mmtype.has_viewkey:
|
||||
d = lines.pop(0).split(None,2)
|
||||
assert d[0] == 'view:',"Invalid line in file: '{}'".format(' '.join(d))
|
||||
a.viewkey = ZcashViewKey(d[1])
|
||||
d = lines.pop(0).split(None,2)
|
||||
assert d[0] == 'wif:',"Invalid line in file: '{}'".format(' '.join(d))
|
||||
a.sec = PrivKey(wif=d[1])
|
||||
|
||||
ret.append(a)
|
||||
|
|
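With view keys, an entry in a key-address file body no longer always takes two lines, which is presumably why the "odd number of lines" check was dropped in the preceding hunk. A minimal Python 3 sketch of the per-entry layout the new parser expects (address line, optional `view:` line, then `wif:` line) is given below. The function name `parse_keyaddr_body` and the sample values are hypothetical, and no address, view key or WIF validation is attempted.

```python
def parse_keyaddr_body(lines, has_viewkey):
    """Parse 'idx addr [label]' lines, each followed by its key lines.

    Hypothetical stand-in for the key-reading part of parse_file_body();
    raises ValueError instead of using assert.
    """
    lines = list(lines)  # work on a copy, since entries are popped off
    entries = []

    def pop_tagged(tag):
        # Read one 'tag: value' line and return the value
        d = lines.pop(0).split(None, 2)
        if len(d) < 2 or d[0] != tag:
            raise ValueError("Invalid line in file: '{}'".format(' '.join(d)))
        return d[1]

    while lines:
        d = lines.pop(0).split(None, 2)
        if len(d) < 2:
            raise ValueError("Invalid address line: '{}'".format(' '.join(d)))
        entry = {'idx': int(d[0]), 'addr': d[1], 'label': d[2] if len(d) == 3 else ''}
        if has_viewkey:
            entry['viewkey'] = pop_tagged('view:')
        entry['wif'] = pop_tagged('wif:')
        entries.append(entry)
    return entries
```

As in the diff, the `view:` line is read before the `wif:` line, matching the order in which the formatting code writes them.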
@ -527,36 +534,14 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
llen = len(ret)
|
||||
for n,e in enumerate(ret):
|
||||
msg_r('\rVerifying keys %s/%s' % (n+1,llen))
|
||||
if e.addr != ag.to_addr(kg.to_pubhex(e.sec)):
|
||||
return "Key doesn't match address!\n %s\n %s" % (e.sec.wif,e.addr)
|
||||
assert e.addr == ag.to_addr(kg.to_pubhex(e.sec)),(
|
||||
"Key doesn't match address!\n %s\n %s" % (e.sec.wif,e.addr))
|
||||
msg(' - done')
|
||||
|
||||
return ret
|
||||
|
||||
def parse_file(self,fn,buf=[],exit_on_error=True):
|
||||
|
||||
def do_error(msg):
|
||||
if exit_on_error: die(3,msg)
|
||||
msg(msg)
|
||||
return False
|
||||
|
||||
lines = get_lines_from_file(fn,self.data_desc+' data',trim_comments=True)
|
||||
|
||||
if len(lines) < 3:
|
||||
return do_error("Too few lines in address file (%s)" % len(lines))
|
||||
|
||||
ls = lines[0].split()
|
||||
if not 1 < len(ls) < 5:
|
||||
return do_error("Invalid first line for {} file: '{}'".format(self.gen_desc,lines[0]))
|
||||
if ls.pop() != '{':
|
||||
return do_error("'%s': invalid first line" % ls)
|
||||
if lines[-1] != '}':
|
||||
return do_error("'%s': invalid last line" % lines[-1])
|
||||
|
||||
sid = ls.pop(0)
|
||||
if not is_mmgen_seed_id(sid):
|
||||
return do_error("'%s': invalid Seed ID" % ls[0])
|
||||
|
||||
def parse_addrfile_label(lbl): # we must maintain backwards compat, so parse is tricky
|
||||
al_coin,al_mmtype = None,None
|
||||
lbl = lbl.split(':',1)
|
||||
|
|
@ -570,46 +555,59 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
|
||||
# this block fails if al_mmtype is invalid for g.coin
|
||||
if not al_mmtype:
|
||||
mmtype = MMGenAddrType('E' if al_coin in ('ETH','ETC') else 'L')
|
||||
mmtype = MMGenAddrType('E' if al_coin in ('ETH','ETC') else 'L',on_fail='raise')
|
||||
else:
|
||||
try:
|
||||
mmtype = MMGenAddrType(al_mmtype)
|
||||
mmtype = MMGenAddrType(al_mmtype,on_fail='raise')
|
||||
except:
|
||||
return do_error(u"'{}': invalid address type in address file. Must be one of: {}".format(
|
||||
raise ValueError,(
|
||||
u"'{}': invalid address type in address file. Must be one of: {}".format(
|
||||
mmtype.upper(),' '.join([i['name'].upper() for i in MMGenAddrType.mmtypes.values()])))
|
||||
|
||||
from mmgen.protocol import CoinProtocol
|
||||
base_coin = CoinProtocol(al_coin or 'BTC',testnet=False).base_coin
|
||||
if not base_coin:
|
||||
die(2,"'{}': unknown base coin in address file label!".format(al_coin))
|
||||
return base_coin,mmtype
|
||||
|
||||
def check_coin_mismatch(base_coin): # die if addrfile coin doesn't match g.coin
|
||||
if not base_coin == g.proto.base_coin:
|
||||
die(2,'{} address file format, but base coin is {}!'.format(base_coin,g.proto.base_coin))
|
||||
m = '{} address file format, but base coin is {}!'
|
||||
assert base_coin == g.proto.base_coin, m.format(base_coin,g.proto.base_coin)
|
||||
|
||||
if type(self) == PasswordList and len(ls) == 2:
|
||||
ss = ls.pop().split(':')
|
||||
if len(ss) != 2:
|
||||
return do_error("'%s': invalid password length specifier (must contain colon)" % ls[2])
|
||||
self.set_pw_fmt(ss[0])
|
||||
self.set_pw_len(ss[1])
|
||||
self.pw_id_str = MMGenPWIDString(ls.pop())
|
||||
mmtype = MMGenPasswordType('P')
|
||||
elif len(ls) == 1:
|
||||
base_coin,mmtype = parse_addrfile_label(ls[0])
|
||||
check_coin_mismatch(base_coin)
|
||||
elif len(ls) == 0:
|
||||
base_coin,mmtype = 'BTC',MMGenAddrType('L')
|
||||
check_coin_mismatch(base_coin)
|
||||
else:
|
||||
return do_error(u"Invalid first line for {} file: '{}'".format(self.gen_desc,lines[0]))
|
||||
lines = get_lines_from_file(fn,self.data_desc+' data',trim_comments=True)
|
||||
|
||||
self.al_id = AddrListID(SeedID(sid=sid),mmtype)
|
||||
try:
|
||||
assert len(lines) >= 3, 'Too few lines in address file ({})'.format(len(lines))
|
||||
ls = lines[0].split()
|
||||
assert 1 < len(ls) < 5, "Invalid first line for {} file: '{}'".format(self.gen_desc,lines[0])
|
||||
assert ls.pop() == '{', "'{}': invalid first line".format(ls)
|
||||
assert lines[-1] == '}', "'{}': invalid last line".format(lines[-1])
|
||||
sid = ls.pop(0)
|
||||
assert is_mmgen_seed_id(sid),"'{}': invalid Seed ID".format(ls[0])
|
||||
|
||||
data = self.parse_file_body(lines[1:-1])
|
||||
if not issubclass(type(data),list):
|
||||
return do_error(data)
|
||||
if type(self) == PasswordList and len(ls) == 2:
|
||||
ss = ls.pop().split(':')
|
||||
assert len(ss) == 2,"'{}': invalid password length specifier (must contain colon)".format(ls[2])
|
||||
self.set_pw_fmt(ss[0])
|
||||
self.set_pw_len(ss[1])
|
||||
self.pw_id_str = MMGenPWIDString(ls.pop())
|
||||
mmtype = MMGenPasswordType('P')
|
||||
elif len(ls) == 1:
|
||||
base_coin,mmtype = parse_addrfile_label(ls[0])
|
||||
check_coin_mismatch(base_coin)
|
||||
elif len(ls) == 0:
|
||||
base_coin,mmtype = 'BTC',MMGenAddrType('L')
|
||||
check_coin_mismatch(base_coin)
|
||||
else:
|
||||
raise ValueError,u"'{}': Invalid first line for {} file '{}'".format(lines[0],self.gen_desc,fn)
|
||||
|
||||
self.al_id = AddrListID(SeedID(sid=sid),mmtype)
|
||||
|
||||
data = self.parse_file_body(lines[1:-1])
|
||||
assert issubclass(type(data),list),'Invalid file body data'
|
||||
except Exception as e:
|
||||
m = 'Invalid address list file ({})'.format(e[0])
|
||||
if exit_on_error: die(3,m)
|
||||
msg(m)
|
||||
return False
|
||||
|
||||
return data
|
||||
|
||||
|
|
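The rewritten `parse_file()` replaces the scattered `return do_error(...)` calls with checks inside one `try` block and a single handler. A minimal Python 3 sketch of that shape is shown below. It uses an explicit exception rather than `assert`, because Python drops `assert` statements when run with `-O`; the names `AddrFileError`, `check`, `parse_header` and `parse_lines` are hypothetical, and the sample seed ID and label in the usage note are placeholders.

```python
class AddrFileError(Exception):
    """Hypothetical exception type for address-file validation failures."""


def check(cond, errmsg):
    # Explicit replacement for 'assert cond, errmsg' that survives 'python -O'
    if not cond:
        raise AddrFileError(errmsg)


def parse_header(lines):
    check(len(lines) >= 3, 'Too few lines in address file ({})'.format(len(lines)))
    ls = lines[0].split()
    check(1 < len(ls) < 5, "Invalid first line: '{}'".format(lines[0]))
    check(ls.pop() == '{', "'{}': invalid first line".format(lines[0]))
    check(lines[-1] == '}', "'{}': invalid last line".format(lines[-1]))
    sid = ls.pop(0)
    return sid, ls  # seed ID and the remaining label fields


def parse_lines(lines, exit_on_error=True):
    try:
        return parse_header(lines)
    except AddrFileError as e:
        m = 'Invalid address list file ({})'.format(e)
        if exit_on_error:
            raise SystemExit(m)
        print(m)
        return False
```

For example, `parse_lines(['98831F3A ZEC:Z {', '1 addr', '}'], exit_on_error=False)` returns the seed ID and label fields, while a one-line input prints the error and returns `False`.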
|
|||
|
|
@ -117,6 +117,7 @@ opts_data = lambda: {
|
|||
-q, --quiet Produce quieter output
|
||||
-r, --usr-randchars=n Get 'n' characters of additional randomness from
|
||||
user (min={g.min_urandchars}, max={g.max_urandchars})
|
||||
-t, --type=t Specify address type (valid options: 'compressed','segwit','zcash_z')
|
||||
-v, --verbose Produce more verbose output
|
||||
""".format(g=g),
|
||||
'notes': """
|
||||
|
|
@ -146,5 +147,9 @@ if Command not in tool.cmd_data:
|
|||
die(1,"'%s': no such command" % Command.lower())
|
||||
|
||||
args,kwargs = tool.process_args(Command,cmd_args)
|
||||
ret = tool.__dict__[Command](*args,**kwargs)
|
||||
try:
|
||||
ret = tool.__dict__[Command](*args,**kwargs)
|
||||
except Exception as e:
|
||||
die(1,'{}'.format(e))
|
||||
|
||||
sys.exit(0 if ret in (None,True) else 1) # some commands die, some return False on failure
|
||||
|
|
|
|||
22
mmgen/obj.py
|
|
@ -32,6 +32,7 @@ def is_coin_addr(s): return CoinAddr(s,on_fail='silent')
|
|||
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
|
||||
def is_tw_label(s): return TwLabel(s,on_fail='silent')
|
||||
def is_wif(s): return WifKey(s,on_fail='silent')
|
||||
def is_viewkey(s): return ZcashViewKey(s,on_fail='silent')
|
||||
|
||||
class MMGenObject(object):
|
||||
|
||||
|
|
@ -355,6 +356,7 @@ class LTCAmt(BTCAmt): max_amt = 84000000
|
|||
|
||||
class CoinAddr(str,Hilite,InitErrors,MMGenObject):
|
||||
color = 'cyan'
|
||||
hex_width = 40
|
||||
def __new__(cls,s,on_fail='die'):
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
|
|
@ -362,7 +364,7 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
|
|||
try:
|
||||
assert set(s) <= set(ascii_letters+digits),'contains non-ascii characters'
|
||||
me = str.__new__(cls,s)
|
||||
va = g.proto.verify_addr(s,return_dict=True)
|
||||
va = g.proto.verify_addr(s,hex_width=cls.hex_width,return_dict=True)
|
||||
assert va,'failed verification'
|
||||
me.addr_fmt = va['format']
|
||||
me.hex = va['hex']
|
||||
|
|
@ -402,6 +404,8 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
|
|||
d = rpc_init().validateaddress(self)
|
||||
return d['iswatchonly'] and 'account' in d
|
||||
|
||||
class ZcashViewKey(CoinAddr): hex_width = 128
|
||||
|
||||
class SeedID(str,Hilite,InitErrors):
|
||||
color = 'blue'
|
||||
width = 8
|
||||
|
|
@ -569,11 +573,11 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
|
|||
from mmgen.globalvars import g
|
||||
|
||||
if type(s) == cls: return s
|
||||
assert wif or (s and type(compressed) == bool),'Incorrect args for PrivKey()'
|
||||
cls.arg_chk(cls,on_fail)
|
||||
|
||||
if wif:
|
||||
try:
|
||||
assert s == None
|
||||
assert set(wif) <= set(ascii_letters+digits),'not an ascii string'
|
||||
w2h = g.proto.wif2hex(wif) # raises exception on error
|
||||
me = str.__new__(cls,w2h['hex'])
|
||||
|
|
@ -582,15 +586,16 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
|
|||
me.wif = str.__new__(WifKey,wif) # check has been done
|
||||
return me
|
||||
except Exception as e:
|
||||
fs = "Value {!r} cannot be converted to WIF key ({})"
|
||||
return cls.init_fail(fs.format(wif,e[0]),on_fail)
|
||||
fs = "Value {!r} cannot be converted to {} WIF key ({})"
|
||||
return cls.init_fail(fs.format(wif,g.coin,e[0]),on_fail)
|
||||
|
||||
try:
|
||||
assert s and type(compressed) == bool and pubkey_type,'Incorrect args for PrivKey()'
|
||||
assert len(s) == cls.width / 2,'Key length must be {}'.format(cls.width/2)
|
||||
me = str.__new__(cls,g.proto.preprocess_key(s.encode('hex'),pubkey_type))
|
||||
me.compressed = compressed
|
||||
me.pubkey_type = pubkey_type
|
||||
if me.pubkey_type: # skip WIF creation for passwds
|
||||
if pubkey_type != 'password': # skip WIF creation for passwds
|
||||
me.wif = WifKey(g.proto.hex2wif(me,pubkey_type,compressed),on_fail='raise')
|
||||
return me
|
||||
except Exception as e:
|
||||
|
|
@ -710,10 +715,11 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
|
|||
if s in (k,v['name']):
|
||||
if s == v['name']: s = k
|
||||
me = str.__new__(cls,s)
|
||||
assert me in g.proto.mmtypes + ('P',), (
|
||||
"'{}': invalid address type for {}".format(me,g.proto.__name__))
|
||||
for k in ('name','pubkey_type','compressed','gen_method','addr_fmt','desc'):
|
||||
setattr(me,k,v[k])
|
||||
assert me in g.proto.mmtypes + ('P',), (
|
||||
"'{}': invalid address type for {}".format(me.name,g.proto.__name__))
|
||||
me.has_viewkey = me.name == 'zcash_z'
|
||||
return me
|
||||
raise ValueError,'not found'
|
||||
except Exception as e:
|
||||
|
|
@ -728,7 +734,7 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
|
|||
class MMGenPasswordType(MMGenAddrType):
|
||||
mmtypes = {
|
||||
'P': { 'name':'password',
|
||||
'pubkey_type':None,
|
||||
'pubkey_type':'password',
|
||||
'compressed':False,
|
||||
'gen_method':None,
|
||||
'addr_fmt':None,
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@
|
|||
protocol.py: Coin protocol functions, classes and methods
|
||||
"""
|
||||
|
||||
import os,hashlib
|
||||
import sys,os,hashlib
|
||||
from binascii import unhexlify
|
||||
from mmgen.util import msg,pmsg,Msg
|
||||
from mmgen.util import msg,pmsg,Msg,pdie
|
||||
from mmgen.obj import MMGenObject,BTCAmt,LTCAmt,BCHAmt,B2XAmt
|
||||
from mmgen.globalvars import g
|
||||
|
||||
|
|
@ -48,10 +48,8 @@ def _numtob58(num):
|
|||
return ''.join(ret)[::-1]
|
||||
|
||||
def _b58tonum(b58num):
|
||||
b58num = b58num.strip()
|
||||
for i in b58num:
|
||||
if not i in _b58a:
|
||||
raise ValueError,'_b58tonum(): invalid b58 value'
|
||||
if [i for i in b58num if not i in _b58a]:
|
||||
raise ValueError,'_b58tonum(): invalid b58 value'
|
||||
return sum(_b58a.index(n) * (58**i) for i,n in enumerate(list(b58num[::-1])))
|
||||
|
||||
def _b58chk_encode(hexstr):
|
||||
|
|
@ -89,6 +87,7 @@ class BitcoinProtocol(MMGenObject):
|
|||
mmcaps = ('key','addr','rpc')
|
||||
base_coin = 'BTC'
|
||||
addr_width = 34
|
||||
addr_hex_width = 40
|
||||
|
||||
@staticmethod
|
||||
def get_protocol_by_chain(chain):
|
||||
|
|
@ -126,21 +125,7 @@ class BitcoinProtocol(MMGenObject):
|
|||
return { 'hex':key[:64], 'pubkey_type':pubkey_type, 'compressed':compressed }
|
||||
|
||||
@classmethod
|
||||
def wif2hex_old(cls,wif):
|
||||
num = _b58tonum(wif)
|
||||
if num == False: return False
|
||||
key = '{:x}'.format(num)
|
||||
if len(key) not in (74,76): return False
|
||||
compressed = len(key) == 76
|
||||
if compressed and key[66:68] != '01': return False
|
||||
klen = (66,68)[compressed]
|
||||
if (key[:2] == cls.wif_ver_num['std'] and key[klen:] == hash256(key[:klen])[:8]):
|
||||
return { 'hex':key[2:66], 'compressed':compressed }
|
||||
else:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def verify_addr(cls,addr,return_dict=False):
|
||||
def verify_addr(cls,addr,hex_width,return_dict=False):
|
||||
for addr_fmt in cls.addr_ver_num:
|
||||
ver_num,pfx = cls.addr_ver_num[addr_fmt]
|
||||
if type(pfx) == tuple:
|
||||
|
|
@ -150,12 +135,14 @@ class BitcoinProtocol(MMGenObject):
|
|||
if num == False:
|
||||
if g.debug: Msg('Address cannot be converted to base 58')
|
||||
break
|
||||
addr_hex = '{:0{}x}'.format(num,48+len(ver_num))
|
||||
addr_hex = '{:0{}x}'.format(num,len(ver_num)+hex_width+8)
|
||||
# pmsg(hex_width,len(addr_hex),addr_hex[:len(ver_num)],ver_num)
|
||||
if addr_hex[:len(ver_num)] != ver_num: continue
|
||||
if hash256(addr_hex[:-8])[:8] == addr_hex[-8:]:
|
||||
return {
|
||||
'hex': addr_hex[len(ver_num):-8],
|
||||
'format': {'p2pkh':'p2pkh','p2sh':'p2sh','p2sh2':'p2sh','zcash_z':'zcash_z'}[addr_fmt],
|
||||
'format': {'p2pkh':'p2pkh','p2sh':'p2sh','p2sh2':'p2sh',
|
||||
'zcash_z':'zcash_z','viewkey':'viewkey'}[addr_fmt],
|
||||
'width': cls.addr_width
|
||||
} if return_dict else True
|
||||
else:
|
||||
|
|
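The change from `48+len(ver_num)` to `len(ver_num)+hex_width+8` lets the same Base58Check verification handle payloads other than 20-byte hashes, such as the 128-hex-digit view keys. A minimal standalone Python 3 sketch of that check follows; `b58_to_num` and `verify_b58chk` are hypothetical names, and the sketch returns the payload hex or `None` rather than the dict built in the diff.

```python
import hashlib

B58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'


def b58_to_num(s):
    """Convert a base58 string to an integer, rejecting foreign characters."""
    if any(c not in B58_ALPHABET for c in s):
        raise ValueError('invalid b58 value')
    num = 0
    for c in s:
        num = num * 58 + B58_ALPHABET.index(c)
    return num


def verify_b58chk(addr, ver_num, hex_width):
    """Return the payload hex if addr carries ver_num and a valid checksum."""
    # version prefix + payload + 4-byte (8 hex digit) checksum
    total = len(ver_num) + hex_width + 8
    addr_hex = '{:0{}x}'.format(b58_to_num(addr), total)
    if len(addr_hex) != total or not addr_hex.startswith(ver_num):
        return None
    payload = bytes.fromhex(addr_hex[:-8])
    chk = hashlib.sha256(hashlib.sha256(payload).digest()).hexdigest()[:8]
    return addr_hex[len(ver_num):-8] if chk == addr_hex[-8:] else None
```

With `hex_width=40` and version `'00'` this accepts an ordinary Bitcoin P2PKH address; per the diff, a Zcash view key would use `hex_width=128` with the `viewkey` version bytes.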
@ -166,6 +153,7 @@ class BitcoinProtocol(MMGenObject):
|
|||
|
||||
@classmethod
|
||||
def pubhash2addr(cls,pubkey_hash,p2sh):
|
||||
assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
|
||||
s = cls.addr_ver_num[('p2pkh','p2sh')[p2sh]][0] + pubkey_hash
|
||||
lzeroes = (len(s) - len(s.lstrip('0'))) / 2 # non-zero only for ver num '00' (BTC p2pkh)
|
||||
return ('1' * lzeroes) + _b58chk_encode(s)
|
||||
|
|
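The leading-zero handling in `pubhash2addr()` is easy to miss: Base58 drops leading zero bytes, so one `'1'` is prepended per zero byte of the versioned payload. A minimal Python 3 sketch of the same logic is given below. The function names are hypothetical stand-ins, `ver_num` is passed in directly rather than looked up in `addr_ver_num`, and `//` replaces the Python 2 `/` so that the division stays an integer division.

```python
import hashlib

B58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'


def b58chk_encode(hexstr):
    """Base58-encode hexstr followed by its 4-byte double-SHA256 checksum."""
    payload = bytes.fromhex(hexstr)
    chk = hashlib.sha256(hashlib.sha256(payload).digest()).digest()[:4]
    num = int.from_bytes(payload + chk, 'big')
    out = ''
    while num:
        num, rem = divmod(num, 58)
        out = B58_ALPHABET[rem] + out
    return out


def pubhash_to_addr(pubkey_hash, ver_num='00'):
    assert len(pubkey_hash) == 40, '{}: invalid length for pubkey hash'.format(len(pubkey_hash))
    s = ver_num + pubkey_hash
    # Count whole leading zero bytes (two hex digits each)
    lzeroes = (len(s) - len(s.lstrip('0'))) // 2
    return ('1' * lzeroes) + b58chk_encode(s)
```

As the comment in the diff notes, the prefix is non-empty only when the version number itself is `'00'`, as for BTC P2PKH.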
@ -284,13 +272,19 @@ class EthereumProtocol(BitcoinProtocolAddrgen):
|
|||
return { 'hex':str(wif), 'pubkey_type':'std', 'compressed':False }
|
||||
|
||||
@classmethod
|
||||
def verify_addr(cls,addr,return_dict=False):
|
||||
def verify_addr(cls,addr,hex_width,return_dict=False):
|
||||
from mmgen.util import is_hex_str_lc
|
||||
if is_hex_str_lc(addr) and len(addr) == 40:
|
||||
return { 'hex': addr, 'format': 'ethereum', 'width': cls.addr_width } if return_dict else True
|
||||
if g.debug: Msg("Invalid address '{}'".format(addr))
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def pubhash2addr(cls,pubkey_hash,p2sh):
|
||||
assert len(pubkey_hash) == 40,'{}: invalid length for pubkey hash'.format(len(pubkey_hash))
|
||||
assert not p2sh,'Ethereum has no P2SH address format'
|
||||
return pubkey_hash
|
||||
|
||||
class EthereumTestnetProtocol(EthereumProtocol): pass
|
||||
class EthereumClassicProtocol(EthereumProtocol):
|
||||
name = 'ethereum_classic'
|
||||
|
|
@ -299,10 +293,15 @@ class EthereumClassicTestnetProtocol(EthereumClassicProtocol): pass
|
|||
class ZcashProtocol(BitcoinProtocolAddrgen):
|
||||
name = 'zcash'
|
||||
base_coin = 'ZEC'
|
||||
addr_ver_num = { 'p2pkh': ('1cb8','t1'), 'p2sh': ('1cbd','t3'), 'zcash_z': ('169a','zc') }
|
||||
addr_ver_num = {
|
||||
'p2pkh': ('1cb8','t1'),
|
||||
'p2sh': ('1cbd','t3'),
|
||||
'zcash_z': ('169a','zc'),
|
||||
'viewkey': ('0b1c','V') }
|
||||
wif_ver_num = { 'std': '80', 'zcash_z': 'ab36' }
|
||||
mmtypes = ('C','Z')
|
||||
dfl_mmtype = 'C'
|
||||
addr_hex_width = 40
|
||||
|
||||
@classmethod
|
||||
def preprocess_key(cls,hexpriv,pubkey_type): # zero the first four bits
|
||||
|
|
@ -311,9 +310,23 @@ class ZcashProtocol(BitcoinProtocolAddrgen):
|
|||
else:
|
||||
return hexpriv
|
||||
|
||||
@classmethod
|
||||
def pubhash2addr(cls,pubkey_hash,p2sh):
|
||||
hl = len(pubkey_hash)
|
||||
if hl == 40:
|
||||
return super(cls,cls).pubhash2addr(pubkey_hash,p2sh)
|
||||
elif hl == 128:
|
||||
raise NotImplementedError,'Zcash z-addresses have no pubkey hash'
|
||||
else:
|
||||
raise ValueError,'{}: incorrect pubkey_hash length'.format(hl)
|
||||
|
||||
class ZcashTestnetProtocol(ZcashProtocol):
|
||||
wif_ver_num = { 'std': '??', 'zcash_z': 'ac08' }
|
||||
addr_ver_num = { 'p2pkh': ('??','t1'), 'p2sh': ('??','t3'), 'zcash_z': ('16b6','??') }
|
||||
addr_ver_num = {
|
||||
'p2pkh': ('??','t1'),
|
||||
'p2sh': ('??','t3'),
|
||||
'zcash_z': ('16b6','??'),
|
||||
'viewkey': ('0b2a','??') }
|
||||
|
||||
class DashProtocol(BitcoinProtocolAddrgen):
|
||||
name = 'dash'
|
||||
|
|
@ -342,9 +355,8 @@ class CoinProtocol(MMGenObject):
|
|||
def __new__(cls,coin,testnet):
|
||||
coin = coin.lower()
|
||||
assert type(testnet) == bool
|
||||
if coin not in cls.coins:
|
||||
from mmgen.util import die
|
||||
die(1,"'{}': not a valid coin. Valid choices are '{}'".format(coin,"','".join(cls.coins)))
|
||||
m = "'{}': not a valid coin. Valid choices are '{}'"
|
||||
assert coin in cls.coins,m.format(coin,"','".join(cls.coins))
|
||||
return cls.coins[coin][testnet]
|
||||
|
||||
@classmethod
|
||||
|
|
|
|||
|
|
@ -41,15 +41,11 @@ class Sha256(object):
|
|||
def getFractionalBits(n):
|
||||
return int((n - int(n)) * 0x100000000)
|
||||
|
||||
def toSigned32(n): return ((n & 0xffffffff) ^ 0x80000000) - 0x80000000
|
||||
|
||||
k = [0] * 64
|
||||
n,nPrime = 2,0
|
||||
while nPrime < 64:
|
||||
if isPrime(n):
|
||||
k[nPrime] = getFractionalBits(math.pow(n, 1.0 / 3))
|
||||
# for testing against signed implementations:
|
||||
# k[nPrime] = toSigned32(getFractionalBits(math.pow(n, 1.0 / 3)))
|
||||
nPrime += 1
|
||||
n += 1
|
||||
|
||||
|
|
@ -75,7 +71,7 @@ class Sha256(object):
|
|||
return self.digest().encode('hex')
|
||||
|
||||
def bytesToWords(self):
|
||||
assert type(self.M) == str
|
||||
assert type(self.M) in (str,list)
|
||||
words = [0] * (len(self.M) / 4 + len(self.M) % 4)
|
||||
b = 0
|
||||
for i in range(len(self.M)):
|
||||
|
|
@ -84,7 +80,7 @@ class Sha256(object):
|
|||
self.M = words
|
||||
|
||||
def wordsToBytes(self):
|
||||
assert type(self.M) == list
|
||||
assert type(self.M) == list and len(self.M) == 8
|
||||
self.M = ''.join([chr((self.M[b >> 5] >> (24 - b % 32)) & 0xff) for b in range(0,len(self.M)*32,8)])
|
||||
|
||||
def preprocessBlock(self):
|
||||
|
|
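The index arithmetic in `wordsToBytes()` unpacks eight 32-bit words into 32 bytes, most significant byte first. A minimal Python 3 sketch of that conversion and its inverse is shown below; the function names are hypothetical, and the big-endian packing in `bytes_to_words` is an assumption, since the body of `bytesToWords()` is only partly visible in the diff.

```python
def words_to_bytes(words):
    """Unpack eight 32-bit words into 32 bytes, big-endian."""
    assert type(words) == list and len(words) == 8
    # Same selection as chr((M[b >> 5] >> (24 - b % 32)) & 0xff) in the diff
    return bytes((words[b >> 5] >> (24 - b % 32)) & 0xff
                 for b in range(0, len(words) * 32, 8))


def bytes_to_words(data):
    """Pack bytes into 32-bit big-endian words (assumed inverse of the above)."""
    assert len(data) % 4 == 0
    return [int.from_bytes(data[i:i + 4], 'big') for i in range(0, len(data), 4)]
```

The added `len(self.M) == 8` assertion in the diff matches the fixed 256-bit digest size: eight words of 32 bits each.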
|
|||
144
mmgen/tool.py
|
|
@ -55,17 +55,17 @@ cmd_data = OrderedDict([
|
|||
('Hexlify', ['<string> [str-]']),
|
||||
('Rand2file', ['<outfile> [str]','<nbytes> [str]','threads [int=4]','silent [bool=False]']),
|
||||
|
||||
('Randwif', ["pubkey_type [str='std']",'compressed [bool=False]']),
|
||||
('Randpair', ["pubkey_type [str='std']",'compressed [bool=False]','segwit [bool=False]']),
|
||||
('Hex2wif', ['<private key in hex format> [str-]',"pubkey_type [str='std']",'compressed [bool=False]']),
|
||||
('Randwif', []),
|
||||
('Randpair', []),
|
||||
('Hex2wif', ['<private key in hex format> [str-]']),
|
||||
('Wif2hex', ['<wif> [str-]']),
|
||||
('Wif2addr', ['<wif> [str-]','segwit [bool=False]']),
|
||||
('Wif2addr', ['<wif> [str-]']),
|
||||
('Wif2segwit_pair',['<wif> [str-]']),
|
||||
('Pubhash2addr', ['<coin address in hex format> [str-]','p2sh [bool=False]']),
|
||||
('Pubhash2addr', ['<coin address in hex format> [str-]']),
|
||||
('Addr2hexaddr', ['<coin address> [str-]']),
|
||||
('Privhex2addr', ['<private key in hex format> [str-]',"pubkey_type [str='std']",'compressed [bool=False]','segwit [bool=False]']),
|
||||
('Privhex2pubhex',['<private key in hex format> [str-]',"pubkey_type [str='std']",'compressed [bool=False]']),
|
||||
('Pubhex2addr', ['<public key in hex format> [str-]','p2sh [bool=False]']), # new
|
||||
('Privhex2addr', ['<private key in hex format> [str-]']),
|
||||
('Privhex2pubhex',['<private key in hex format> [str-]']),
|
||||
('Pubhex2addr', ['<public key in hex format> [str-]']), # new
|
||||
('Pubhex2redeem_script',['<public key in hex format> [str-]']), # new
|
||||
('Wif2redeem_script', ['<private key in WIF format> [str-]']), # new
|
||||
|
||||
|
|
@ -120,9 +120,9 @@ def usage(command):
|
|||
c,h = line.split('-',1)
|
||||
Msg('MMGEN-TOOL {}: {}'.format(c.strip().upper(),h.strip()))
|
||||
cd = cmd_data[Command]
|
||||
msg('USAGE: %s %s %s' % (g.prog_name, command, ' '.join(cd)))
|
||||
msg('USAGE: {} {} {}'.format(g.prog_name,command,' '.join(cd)))
|
||||
else:
|
||||
msg("'%s': no such tool command" % command)
|
||||
msg("'{}': no such tool command".format(command))
|
||||
sys.exit(1)
|
||||
|
||||
Help = usage
|
||||
|
|
@ -138,7 +138,7 @@ def process_args(command,cmd_args):
|
|||
for i in cmd_data[command] if '=' not in i]
|
||||
c_kwargs = dict([[
|
||||
i.split(' [')[0],
|
||||
[i.split(' [')[1].split('=')[0], i.split(' [')[1].split('=')[1][:-1]]
|
||||
[i.split(' [')[1].split('=')[0],i.split(' [')[1].split('=')[1][:-1]]
|
||||
] for i in cmd_data[command] if '=' in i])
|
||||
|
||||
if not margs:
|
||||
|
|
@ -154,8 +154,8 @@ def process_args(command,cmd_args):
|
|||
die(2,'{}: ERROR: no output from previous command in pipe'.format(command.lower()))
|
||||
|
||||
if not margs and len(u_args) < len(c_args):
|
||||
m1 = 'Command requires exactly %s non-keyword argument%s'
|
||||
msg(m1 % (len(c_args),suf(c_args,'s')))
|
||||
m1 = 'Command requires exactly {} non-keyword argument{}'
|
||||
msg(m1.format(len(c_args),suf(c_args,'s')))
|
||||
usage(command)
|
||||
|
||||
extra_args = len(cmd_args) - len(c_args)
|
||||
|
|
@ -171,19 +171,17 @@ def process_args(command,cmd_args):
|
|||
elif extra_args > 0:
|
||||
u_kwargs = dict([a.split('=') for a in cmd_args[len(c_args):] if '=' in a])
|
||||
if len(u_kwargs) != extra_args:
|
||||
msg('Command requires exactly %s non-keyword argument%s'
|
||||
% (len(c_args),suf(c_args,'s')))
|
||||
msg('Command requires exactly {} non-keyword argument{}'.format(len(c_args),suf(c_args,'s')))
|
||||
usage(command)
|
||||
if len(u_kwargs) > len(c_kwargs):
|
||||
msg('Command requires exactly %s keyword argument%s'
|
||||
% (len(c_kwargs),suf(c_kwargs,'s')))
|
||||
msg('Command requires exactly {} keyword argument{}'.format(len(c_kwargs),suf(c_kwargs,'s')))
|
||||
usage(command)
|
||||
|
||||
# mdie(c_args,c_kwargs,u_args,u_kwargs)
|
||||
|
||||
for k in u_kwargs:
|
||||
if k not in c_kwargs:
|
||||
msg("'%s': invalid keyword argument" % k)
|
||||
msg("'{}': invalid keyword argument".format(k))
|
||||
usage(command)
|
||||
|
||||
def conv_type(arg,arg_name,arg_type):
|
||||
|
|
@ -192,13 +190,12 @@ def process_args(command,cmd_args):
|
|||
if arg.lower() in ('true','yes','1','on'): arg = True
|
||||
elif arg.lower() in ('false','no','0','off'): arg = False
|
||||
else:
|
||||
msg("'%s': invalid boolean value for keyword argument" % arg)
|
||||
msg("'{}': invalid boolean value for keyword argument".format(arg))
|
||||
usage(command)
|
||||
try:
|
||||
return __builtins__[arg_type](arg)
|
||||
except:
|
||||
die(1,"'%s': Invalid argument for argument %s ('%s' required)" % \
|
||||
(arg, arg_name, arg_type))
|
||||
die(1,"'{}': Invalid argument for argument {} ('{}' required)".format(arg,arg_name,arg_type))
|
||||
|
||||
if margs:
|
||||
args = [conv_type(u_args[i],c_args[0][0],c_args[0][1]) for i in range(len(u_args))]
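Review note on the `process_args` hunks above: each `cmd_data` entry is a list of spec strings such as `'<wif> [str-]'` or `'segwit [bool=False]'`, which the function splits into positional and keyword arguments before converting the values. The following is a minimal sketch of that splitting and of the boolean conversion in `conv_type`, not the project's code. The names `split_cmd_spec` and `to_bool` are hypothetical, and the type tag (for example `str-`) is returned uninterpreted.

```python
def split_cmd_spec(spec):
    """Split spec strings into positional and keyword argument descriptions.

    Hypothetical helper mirroring the string handling in process_args().
    """
    positional, keyword = [], {}
    for item in spec:
        name, tag = item.split(' [', 1)
        tag = tag[:-1]  # drop the closing ']'
        if '=' in tag:
            # 'bool=False' -> type name 'bool', default text 'False'
            type_name, default = tag.split('=', 1)
            keyword[name] = (type_name, default)
        else:
            # no default given: positional argument, tag kept as-is (e.g. 'str-')
            positional.append((name, tag))
    return positional, keyword


def to_bool(text):
    """Accept the same spellings that conv_type() accepts for booleans."""
    lowered = text.lower()
    if lowered in ('true', 'yes', '1', 'on'):
        return True
    if lowered in ('false', 'no', '0', 'off'):
        return False
    raise ValueError("'{}': invalid boolean value for keyword argument".format(text))


positional, keyword = split_cmd_spec(['<wif> [str-]', 'segwit [bool=False]'])
```

Unlike the original, the sketch raises `ValueError` on a bad boolean, where the tool prints a message and calls `usage(command)`.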
|
||||
|
|
@ -219,16 +216,19 @@ def are_equal(a,b,dtype=''):
|
|||
def print_convert_results(indata,enc,dec,dtype):
|
||||
error = (True,False)[are_equal(indata,dec,dtype)]
|
||||
if error or opt.verbose:
|
||||
Msg('Input: %s' % repr(indata))
|
||||
Msg('Encoded data: %s' % repr(enc))
|
||||
Msg('Recoded data: %s' % repr(dec))
|
||||
Msg('Input: {}'.format(repr(indata)))
|
||||
Msg('Encoded data: {}'.format(repr(enc)))
|
||||
Msg('Recoded data: {}'.format(repr(dec)))
|
||||
else: Msg(enc)
|
||||
if error:
|
||||
die(3,"Error! Recoded data doesn't match input!")
|
||||
|
||||
kg = KeyGenerator('std')
|
||||
from mmgen.obj import MMGenAddrType
|
||||
at = MMGenAddrType((hasattr(opt,'type') and opt.type) or g.proto.dfl_mmtype)
|
||||
kg = KeyGenerator(at.pubkey_type)
|
||||
ag = AddrGenerator(at.gen_method)
|
||||
|
||||
def Hexdump(infile, cols=8, line_nums=True):
|
||||
def Hexdump(infile,cols=8,line_nums=True):
|
||||
Msg(pretty_hexdump(
|
||||
get_data_from_file(infile,dash=True,silent=True,binary=True),
|
||||
cols=cols,line_nums=line_nums))
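Review note on the hunk above: the per-command `compressed` and `segwit` arguments are replaced by a single address-type object (`at`) whose attributes select the key generator and address generator. Below is a rough sketch of that idea, assuming a simple lookup table. The attribute names `pubkey_type`, `compressed` and `gen_method` come from the diff, but the table itself, the `'uncompressed'` type name and the `resolve_addr_type` helper are illustrative assumptions and are not taken from `mmgen.obj.MMGenAddrType`.

```python
from collections import namedtuple

# Illustrative stand-in for an address-type object; values are assumptions
AddrType = namedtuple('AddrType', ['name', 'pubkey_type', 'compressed', 'gen_method'])

ADDR_TYPES = {
    t.name: t for t in (
        AddrType('uncompressed', 'std', False, 'p2pkh'),   # hypothetical default name
        AddrType('compressed', 'std', True, 'p2pkh'),
        AddrType('segwit', 'std', True, 'segwit'),         # Segwit requires compressed keys
        AddrType('zcash_z', 'zcash_z', False, 'zcash_z'),
    )
}


def resolve_addr_type(type_opt, default='uncompressed'):
    """Return the address type for a --type value, or the default if unset."""
    name = type_opt or default
    try:
        return ADDR_TYPES[name]
    except KeyError:
        raise ValueError("'{}': unknown address type".format(name))


at = resolve_addr_type('segwit')
# Callers read at.pubkey_type, at.compressed and at.gen_method
# instead of taking separate boolean arguments.
```

Keeping the constraints in one table means a rule such as "Segwit requires compressed keys" cannot be violated by a caller's argument combination, which is why the per-function checks disappear in this commit.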
|
||||
|
|
@ -249,60 +249,51 @@ def B58randenc():
|
|||
def Randhex(nbytes='32'):
|
||||
Msg(binascii.hexlify(get_random(int(nbytes))))
|
||||
|
||||
def Randwif(pubkey_type='std',compressed=False):
|
||||
Msg(PrivKey(get_random(32),compressed=compressed,pubkey_type=pubkey_type).wif)
|
||||
def Randwif():
|
||||
Msg(PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed).wif)
|
||||
|
||||
def Randpair(pubkey_type='std',compressed=False,segwit=False):
|
||||
if segwit: compressed = True
|
||||
ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)])
|
||||
privhex = PrivKey(get_random(32),compressed=compressed,pubkey_type=pubkey_type)
|
||||
def Randpair():
|
||||
privhex = PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed)
|
||||
addr = ag.to_addr(kg.to_pubhex(privhex))
|
||||
Vmsg('Key (hex): %s' % privhex)
|
||||
Vmsg('Key (hex): {}'.format(privhex))
|
||||
Vmsg_r('Key (WIF): '); Msg(privhex.wif)
|
||||
Vmsg_r('Addr: '); Msg(addr)
|
||||
|
||||
def Wif2addr(wif,segwit=False):
|
||||
def Wif2addr(wif):
|
||||
privhex = PrivKey(wif=wif)
|
||||
if segwit and not privhex.compressed:
|
||||
die(2,'Segwit addresses must use compressed public keys')
|
||||
ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)])
|
||||
addr = ag.to_addr(kg.to_pubhex(privhex))
|
||||
Vmsg_r('Addr: '); Msg(addr)
|
||||
|
||||
def Wif2segwit_pair(wif):
|
||||
privhex = PrivKey(wif=wif)
|
||||
if not privhex.compressed:
|
||||
die(1,'Segwit address cannot be generated from uncompressed WIF')
|
||||
ag = AddrGenerator('segwit')
|
||||
pubhex = kg.to_pubhex(privhex)
|
||||
pubhex = kg.to_pubhex(PrivKey(wif=wif))
|
||||
addr = ag.to_addr(pubhex)
|
||||
rs = ag.to_segwit_redeem_script(pubhex)
|
||||
Msg('{}\n{}'.format(rs,addr))
|
||||
|
||||
def Pubhash2addr(pubhash,p2sh=False): Msg(g.proto.pubhash2addr(pubhash,p2sh=p2sh))
|
||||
def Addr2hexaddr(addr): Msg(g.proto.verify_addr(addr,return_dict=True)['hex'])
|
||||
def Hash160(pubkeyhex): Msg(hash160(pubkeyhex))
|
||||
def Pubhex2addr(pubkeyhex,p2sh=False): Msg(g.proto.pubhash2addr(hash160(pubkeyhex),p2sh=p2sh))
|
||||
def Wif2hex(wif): Msg(PrivKey(wif=wif))
|
||||
def Pubhash2addr(pubhash):
|
||||
Msg(g.proto.pubhash2addr(pubhash,at.addr_fmt=='p2sh'))
|
||||
|
||||
def Hex2wif(hexpriv,pubkey_type='std',compressed=False):
|
||||
Msg(g.proto.hex2wif(hexpriv,pubkey_type=pubkey_type,compressed=compressed))
|
||||
def Privhex2addr(privhex,pubkey_type='std',compressed=False,segwit=False,output_pubhex=False):
|
||||
if segwit and not compressed:
|
||||
die(1,'Segwit address can be generated only from a compressed pubkey')
|
||||
pk = PrivKey(binascii.unhexlify(privhex),compressed=compressed,pubkey_type=pubkey_type)
|
||||
def Addr2hexaddr(addr): Msg(g.proto.verify_addr(addr,CoinAddr.hex_width,return_dict=True)['hex'])
|
||||
def Hash160(pubkeyhex): Msg(hash160(pubkeyhex))
|
||||
def Pubhex2addr(pubkeyhex): Pubhash2addr(hash160(pubkeyhex))
|
||||
def Wif2hex(wif): Msg(PrivKey(wif=wif))
|
||||
|
||||
def Hex2wif(hexpriv):
|
||||
Msg(g.proto.hex2wif(hexpriv,pubkey_type=at.pubkey_type,compressed=at.compressed))
|
||||
|
||||
def Privhex2addr(privhex,output_pubhex=False):
|
||||
pk = PrivKey(binascii.unhexlify(privhex),compressed=at.compressed,pubkey_type=at.pubkey_type)
|
||||
ph = kg.to_pubhex(pk)
|
||||
ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)])
|
||||
Msg(ph if output_pubhex else ag.to_addr(ph))
|
||||
def Privhex2pubhex(privhex,pubkey_type='std',compressed=False): # new
|
||||
return Privhex2addr(privhex,pubkey_type=pubkey_type,compressed=compressed,output_pubhex=True)
|
||||
|
||||
def Privhex2pubhex(privhex): # new
|
||||
Privhex2addr(privhex,output_pubhex=True)
|
||||
|
||||
def Pubhex2redeem_script(pubhex): # new
|
||||
Msg(g.proto.pubhex2redeem_script(pubhex))
|
||||
|
||||
def Wif2redeem_script(wif): # new
|
||||
privhex = PrivKey(wif=wif)
|
||||
if not privhex.compressed:
|
||||
die(1,'Segwit redeem script cannot be generated from uncompressed WIF')
|
||||
ag = AddrGenerator('segwit')
|
||||
Msg(ag.to_segwit_redeem_script(kg.to_pubhex(privhex)))
|
||||
|
||||
wordlists = 'electrum','tirosh'
|
||||
|
|
@ -310,10 +301,10 @@ dfl_wl_id = 'electrum'
|
|||
|
||||
def do_random_mn(nbytes,wordlist):
|
||||
hexrand = binascii.hexlify(get_random(nbytes))
|
||||
Vmsg('Seed: %s' % hexrand)
|
||||
Vmsg('Seed: {}'.format(hexrand))
|
||||
for wl_id in ([wordlist],wordlists)[wordlist=='all']:
|
||||
if wordlist == 'all':
|
||||
Msg('%s mnemonic:' % (capfirst(wl_id)))
|
||||
Msg('{} mnemonic:'.format(capfirst(wl_id)))
|
||||
mn = baseconv.fromhex(hexrand,wl_id)
|
||||
Msg(' '.join(mn))
|
||||
|
||||
|
|
@ -370,7 +361,7 @@ def Hexreverse(s):
|
|||
def Hexlify(s):
|
||||
Msg(binascii.hexlify(s))
|
||||
|
||||
def Hash256(s, file_input=False, hex_input=False):
|
||||
def Hash256(s,file_input=False,hex_input=False):
|
||||
from hashlib import sha256
|
||||
if file_input: b = get_data_from_file(s,binary=True)
|
||||
elif hex_input: b = decode_pretty_hexdump(s)
|
||||
|
|
@ -381,7 +372,7 @@ def Encrypt(infile,outfile='',hash_preset=''):
|
|||
data = get_data_from_file(infile,'data for encryption',binary=True)
|
||||
enc_d = mmgen_encrypt(data,'user data',hash_preset)
|
||||
if not outfile:
|
||||
outfile = '%s.%s' % (os.path.basename(infile),g.mmenc_ext)
|
||||
outfile = '{}.{}'.format(os.path.basename(infile),g.mmenc_ext)
|
||||
|
||||
write_data_to_file(outfile,enc_d,'encrypted data',binary=True)
|
||||
|
||||
|
|
@ -406,7 +397,7 @@ def Find_incog_data(filename,iv_id,keep_searching=False):
|
|||
f = os.open(filename,flgs)
|
||||
for ch in iv_id:
|
||||
if ch not in '0123456789ABCDEF':
|
||||
die(2,"'%s': invalid Incog ID" % iv_id)
|
||||
die(2,"'{}': invalid Incog ID".format(iv_id))
|
||||
while True:
|
||||
d = os.read(f,bsize)
|
||||
if not d: break
|
||||
|
|
@ -414,17 +405,17 @@ def Find_incog_data(filename,iv_id,keep_searching=False):
|
|||
for i in range(bsize):
|
||||
if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == iv_id:
|
||||
if n+i < ivsize: continue
|
||||
msg('\rIncog data for ID %s found at offset %s' %
|
||||
(iv_id,n+i-ivsize))
|
||||
msg('\rIncog data for ID {} found at offset {}'.format(iv_id,n+i-ivsize))
|
||||
if not keep_searching: sys.exit(0)
|
||||
carry = d[len(d)-ivsize:]
|
||||
n += bsize
|
||||
if not n % mod: msg_r('\rSearched: %s bytes' % n)
|
||||
if not n % mod:
|
||||
msg_r('\rSearched: {} bytes'.format(n))
|
||||
|
||||
msg('')
|
||||
os.close(f)
|
||||
|
||||
def Rand2file(outfile, nbytes, threads=4, silent=False):
|
||||
def Rand2file(outfile,nbytes,threads=4,silent=False):
|
||||
nbytes = parse_nbytes(nbytes)
|
||||
from Crypto import Random
|
||||
rh = Random.new()
|
||||
|
|
@ -443,8 +434,7 @@ def Rand2file(outfile, nbytes, threads=4, silent=False):
|
|||
def encrypt_worker(wid):
|
||||
while True:
|
||||
i,d = q1.get()
|
||||
c = AES.new(key, AES.MODE_CTR,
|
||||
counter=Counter.new(g.aesctr_iv_len*8,initial_value=i))
|
||||
c = AES.new(key,AES.MODE_CTR,counter=Counter.new(g.aesctr_iv_len*8,initial_value=i))
|
||||
enc_data = c.encrypt(d)
|
||||
q2.put(enc_data)
|
||||
q1.task_done()
|
||||
|
|
@ -457,7 +447,7 @@ def Rand2file(outfile, nbytes, threads=4, silent=False):
|
|||
|
||||
q1 = Queue()
|
||||
for i in range(max(1,threads-2)):
|
||||
t = Thread(target=encrypt_worker, args=(i,))
|
||||
t = Thread(target=encrypt_worker,args=(i,))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
|
||||
|
|
@ -473,11 +463,11 @@ def Rand2file(outfile, nbytes, threads=4, silent=False):
|
|||
rbytes -= bsize
|
||||
i += 1
|
||||
if not (bsize*i) % roll:
|
||||
msg_r('\rRead: %s bytes' % (bsize*i))
|
||||
msg_r('\rRead: {} bytes'.format(bsize*i))
|
||||
|
||||
if not silent:
|
||||
msg('\rRead: %s bytes' % nbytes)
|
||||
qmsg("\r%s bytes of random data written to file '%s'" % (nbytes,outfile))
|
||||
msg('\rRead: {} bytes'.format(nbytes))
|
||||
qmsg("\r{} bytes of random data written to file '{}'".format(nbytes,outfile))
|
||||
q1.join()
|
||||
q2.join()
|
||||
f.close()
|
||||
|
|
@ -567,7 +557,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Tr
|
|||
acct_labels = MMGenList([TwLabel(a,on_fail='silent') for a in acct_list])
|
||||
check_dup_mmid(acct_labels)
|
||||
acct_addrs = g.rpch.getaddressesbyaccount([[a] for a in acct_list],batch=True) # use raw list here
|
||||
assert len(acct_list) == len(acct_addrs), 'listaccounts() and getaddressesbyaccount() not equal in length'
|
||||
assert len(acct_list) == len(acct_addrs),'listaccounts() and getaddressesbyaccount() not equal in length'
|
||||
addr_pairs = zip(acct_labels,acct_addrs)
|
||||
check_addr_array_lens(addr_pairs)
|
||||
for label,addr_arr in addr_pairs:
|
||||
|
|
@ -646,7 +636,9 @@ def Getbalance(minconf=1,quiet=False,return_val=False):
|
|||
else:
|
||||
fs = '{:13} {} {} {}'
|
||||
mc,lbl = str(minconf),'confirms'
|
||||
o = [fs.format('Wallet', *[s.ljust(16) for s in ' Unconfirmed',' <%s %s'%(mc,lbl),' >=%s %s'%(mc,lbl)])]
|
||||
o = [fs.format(
|
||||
'Wallet',
|
||||
*[s.ljust(16) for s in ' Unconfirmed',' <{} {}'.format(mc,lbl),' >={} {}'.format(mc,lbl)])]
|
||||
for key in sorted(accts.keys()):
|
||||
o += [fs.format(key+':', *[a.fmt(color=True,suf=' '+g.coin) for a in accts[key]])]
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
# Tested on Linux, MinGW-64
|
||||
# MinGW's bash 3.1.17 doesn't do ${var^^}
|
||||
|
||||
dfl_tests='obj alts misc btc btc_tn btc_rt bch bch_rt ltc ltc_tn ltc_rt tool gen'
|
||||
dfl_tests='obj misc_ni alts misc btc btc_tn btc_rt bch bch_rt ltc ltc_tn ltc_rt tool gen'
|
||||
PROGNAME=$(basename $0)
|
||||
while getopts hinPt OPT
|
||||
do
|
||||
|
|
@ -15,21 +15,22 @@ do
|
|||
echo " '-P' Don't pause between tests"
|
||||
echo " '-t' Print the tests without running them"
|
||||
echo " AVAILABLE TESTS:"
|
||||
echo " obj - data objects"
|
||||
echo " alts - operations for all supported gen-only altcoins"
|
||||
echo " misc - miscellaneous operations"
|
||||
echo " btc - bitcoin"
|
||||
echo " btc_tn - bitcoin testnet"
|
||||
echo " btc_rt - bitcoin regtest"
|
||||
echo " bch - bitcoin cash (BCH)"
|
||||
echo " bch_rt - bitcoin cash (BCH) regtest"
|
||||
# echo " b2x - bitcoin 2x (B2X)"
|
||||
# echo " b2x_rt - bitcoin 2x (B2X) regtest"
|
||||
echo " ltc - litecoin"
|
||||
echo " ltc_tn - litecoin testnet"
|
||||
echo " ltc_rt - litecoin regtest"
|
||||
echo " tool - tooltest (all supported coins)"
|
||||
echo " gen - gentest (all supported coins)"
|
||||
echo " obj - data objects"
|
||||
echo " misc_ni - miscellaneous operations (non-interactive tests)"
|
||||
echo " alts - operations for all supported gen-only altcoins"
|
||||
echo " misc - miscellaneous operations (interactive tests)"
|
||||
echo " btc - bitcoin"
|
||||
echo " btc_tn - bitcoin testnet"
|
||||
echo " btc_rt - bitcoin regtest"
|
||||
echo " bch - bitcoin cash (BCH)"
|
||||
echo " bch_rt - bitcoin cash (BCH) regtest"
|
||||
# echo " b2x - bitcoin 2x (B2X)"
|
||||
# echo " b2x_rt - bitcoin 2x (B2X) regtest"
|
||||
echo " ltc - litecoin"
|
||||
echo " ltc_tn - litecoin testnet"
|
||||
echo " ltc_rt - litecoin regtest"
|
||||
echo " tool - tooltest (all supported coins)"
|
||||
echo " gen - gentest (all supported coins)"
|
||||
echo " By default, all tests are run"
|
||||
exit ;;
|
||||
i) INSTALL_ONLY=1 ;;
|
||||
|
|
@ -98,23 +99,47 @@ f_obj='Data object test complete'
|
|||
|
||||
i_alts='Gen-only altcoin'
|
||||
s_alts='The following tests will test generation operations for all supported altcoins'
|
||||
ROUNDS=100
|
||||
ROUNDS_SPEC=500
|
||||
t_alts=(
|
||||
'test/scrambletest.py'
|
||||
'test/test.py -n altcoin_ref'
|
||||
'test/gentest.py --coin=btc 2:ext 100'
|
||||
'test/gentest.py --coin=ltc 2:ext 100'
|
||||
'test/gentest.py --coin=dash 2:ext 100'
|
||||
'test/gentest.py --coin=zec 2:ext 100'
|
||||
'test/gentest.py --coin=etc 2:ext 100'
|
||||
'test/gentest.py --coin=eth 2:ext 100'
|
||||
'test/gentest.py --coin=zec --type=zcash_z 2:ext 1000')
|
||||
"test/gentest.py --coin=btc 2 $ROUNDS"
|
||||
"test/gentest.py --coin=btc --type=compressed 2 $ROUNDS"
|
||||
"test/gentest.py --coin=btc --type=segwit 2 $ROUNDS"
|
||||
"test/gentest.py --coin=ltc 2 $ROUNDS"
|
||||
"test/gentest.py --coin=ltc --type=compressed 2 $ROUNDS"
|
||||
"test/gentest.py --coin=ltc --type=segwit 2 $ROUNDS"
|
||||
"test/gentest.py --coin=dash 2 $ROUNDS"
|
||||
"test/gentest.py --coin=zec 2 $ROUNDS"
|
||||
"test/gentest.py --coin=etc 2 $ROUNDS"
|
||||
"test/gentest.py --coin=eth 2 $ROUNDS"
|
||||
"test/gentest.py --coin=zec --type=zcash_z 2 $ROUNDS_SPEC"
|
||||
|
||||
"test/gentest.py --coin=btc 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=btc --type=compressed 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=btc --type=segwit 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=ltc 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=ltc --type=compressed 2:ext $ROUNDS"
|
||||
# "test/gentest.py --coin=ltc --type=segwit 2:ext $ROUNDS" # pycoin generates old-style LTC Segwit addrs
|
||||
"test/gentest.py --coin=dash 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=zec 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=etc 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=eth 2:ext $ROUNDS"
|
||||
"test/gentest.py --coin=zec --type=zcash_z 2:ext $ROUNDS_SPEC")
|
||||
f_alts='Gen-only altcoin tests completed'
|
||||
|
||||
i_misc='Miscellaneous operations' # includes autosign!
|
||||
i_misc_ni='Miscellaneous operations (non-interactive)'
|
||||
s_misc_ni='Testing miscellaneous operations (non-interactive)'
|
||||
t_misc_ni=(
|
||||
'test/sha256test.py')
|
||||
f_misc_ni='Miscellaneous non-interactive tests complete'
|
||||
|
||||
i_misc='Miscellaneous operations (interactive)' # includes autosign!
|
||||
s_misc='The bitcoin, bitcoin-abc and litecoin (mainnet) daemons must be running for the following tests'
|
||||
t_misc=(
|
||||
'test/test.py -On misc')
|
||||
f_misc='Miscellaneous operations test complete'
|
||||
f_misc='Miscellaneous interactive tests complete'
|
||||
|
||||
i_btc='Bitcoin mainnet'
|
||||
s_btc='The bitcoin (mainnet) daemon must be running for the following tests'
|
||||
|
|
@ -193,16 +218,12 @@ t_tool=(
|
|||
'test/tooltest.py --coin=btc util'
|
||||
'test/tooltest.py --coin=btc cryptocoin'
|
||||
'test/tooltest.py --coin=btc mnemonic'
|
||||
'test/tooltest.py --coin=ltc util'
|
||||
'test/tooltest.py --coin=ltc cryptocoin'
|
||||
'test/tooltest.py --coin=ltc mnemonic'
|
||||
'test/tooltest.py --coin=zec util'
|
||||
'test/tooltest.py --coin=zec cryptocoin'
|
||||
'test/tooltest.py --coin=zec mnemonic'
|
||||
'test/tooltest.py --coin=dash util'
|
||||
'test/tooltest.py --coin=eth cryptocoin'
|
||||
'test/tooltest.py --coin=etc cryptocoin'
|
||||
'test/tooltest.py --coin=dash cryptocoin'
|
||||
'test/tooltest.py --coin=dash mnemonic'
|
||||
)
|
||||
'test/tooltest.py --coin=zec cryptocoin'
|
||||
'test/tooltest.py --coin=zec --type=zcash_z cryptocoin')
|
||||
f_tool='tooltest tests completed'
|
||||
|
||||
i_gen='Gentest'
|
||||
|
|
@ -210,16 +231,16 @@ s_gen='The following tests will run test/gentest.py on mainnet and testnet for a
|
|||
t_gen=(
|
||||
"test/gentest.py -q 2 $REFDIR/btcwallet.dump"
|
||||
'test/gentest.py -q 1:2 10'
|
||||
'test/gentest.py -q --segwit 1:2 10'
|
||||
'test/gentest.py -q --type=segwit 1:2 10'
|
||||
"test/gentest.py -q --testnet=1 2 $REFDIR/btcwallet-testnet.dump"
|
||||
'test/gentest.py -q --testnet=1 1:2 10'
|
||||
'test/gentest.py -q --testnet=1 --segwit 1:2 10'
|
||||
'test/gentest.py -q --testnet=1 --type=segwit 1:2 10'
|
||||
"test/gentest.py -q --coin=ltc 2 $REFDIR/litecoin/ltcwallet.dump"
|
||||
'test/gentest.py -q --coin=ltc 1:2 10'
|
||||
'test/gentest.py -q --coin=ltc --segwit 1:2 10'
|
||||
'test/gentest.py -q --coin=ltc --type=segwit 1:2 10'
|
||||
"test/gentest.py -q --coin=ltc --testnet=1 2 $REFDIR/litecoin/ltcwallet-testnet.dump"
|
||||
'test/gentest.py -q --coin=ltc --testnet=1 1:2 10'
|
||||
'test/gentest.py -q --coin=ltc --testnet=1 --segwit 1:2 10'
|
||||
'test/gentest.py -q --coin=ltc --testnet=1 --type=segwit 1:2 10'
|
||||
)
|
||||
f_gen='gentest tests completed'
|
||||
|
||||
|
|
|
|||
224
test/gentest.py
|
|
@ -29,6 +29,7 @@ from binascii import hexlify
|
|||
|
||||
# Import these _after_ local path's been added to sys.path
|
||||
from mmgen.common import *
|
||||
from mmgen.obj import MMGenAddrType
|
||||
|
||||
rounds = 100
|
||||
opts_data = lambda: {
|
||||
|
|
@ -38,8 +39,7 @@ opts_data = lambda: {
|
|||
-h, --help Print this help message
|
||||
--, --longhelp Print help message for long options (common options)
|
||||
-q, --quiet Produce quieter output
|
||||
-a, --type= Specify address type (options: 'std','zcash_z')
|
||||
-s, --segwit Generate Segwit (P2SH-P2WPKH) addresses
|
||||
-t, --type=t Specify address type (valid options: 'compressed','segwit','zcash_z')
|
||||
-v, --verbose Produce more verbose output
|
||||
""",
|
||||
'notes': """
|
||||
|
|
@ -73,7 +73,7 @@ cmd_args = opts.init(opts_data,add_opts=['exact_output'])
|
|||
|
||||
if not 1 <= len(cmd_args) <= 2: opts.usage()
|
||||
|
||||
addr_type = opt.type or 'std'
|
||||
addr_type = MMGenAddrType(opt.type or g.proto.dfl_mmtype)
|
||||
|
||||
def pyethereum_sec2addr(sec):
|
||||
return sec,eth.privtoaddr(sec).encode('hex')
|
||||
|
|
@ -81,7 +81,8 @@ def pyethereum_sec2addr(sec):
|
|||
def zcash_mini_sec2addr(sec):
|
||||
p = sp.Popen(['zcash-mini','-key','-simple'],stderr=sp.PIPE,stdin=sp.PIPE,stdout=sp.PIPE)
|
||||
p.stdin.write(sec.wif+'\n')
|
||||
return sec.wif,p.stdout.read().split()[0]
|
||||
o = p.stdout.read().split()
|
||||
return sec.wif,o[0],o[-1]
|
||||
|
||||
def pycoin_sec2addr(sec):
|
||||
if g.testnet: # pycoin/networks/all.py pycoin/networks/legacy_networks.py
|
||||
|
|
@ -92,11 +93,108 @@ def pycoin_sec2addr(sec):
|
|||
if key is None: die(1,"can't parse {}".format(sec))
|
||||
o = pcku.create_output(sec,key)[0]
|
||||
# pmsg(o)
|
||||
suf = ('_uncompressed','')[compressed]
|
||||
suf = ('_uncompressed','')[addr_type.compressed]
|
||||
wif = o['wif{}'.format(suf)]
|
||||
addr = o['{}_address{}'.format(coin,suf)]
|
||||
addr = o['p2sh_segwit' if addr_type.name == 'segwit' else '{}_address{}'.format(coin,suf)]
|
||||
return wif,addr
|
||||
|
||||
def init_external_prog():
|
||||
global b_desc,ext_lib,ext_sec2addr,sp,eth,pcku,PREFIX_TRANSFORMS
|
||||
if addr_type.name == 'zcash_z':
|
||||
import subprocess as sp
|
||||
ext_sec2addr = zcash_mini_sec2addr
|
||||
ext_lib = 'zcash_mini'
|
||||
elif addr_type.name == 'ethereum':
|
||||
try:
|
||||
import ethereum.utils as eth
|
||||
except:
|
||||
die(1,"Unable to import 'pyethereum' module. Is pyethereum installed?")
|
||||
ext_sec2addr = pyethereum_sec2addr
|
||||
ext_lib = 'pyethereum'
|
||||
else:
|
||||
try:
|
||||
import pycoin.cmds.ku as pcku
|
||||
except:
|
||||
die(1,"Unable to import module 'ku'. Is pycoin installed?")
|
||||
PREFIX_TRANSFORMS = pcku.prefix_transforms_for_network(g.coin)
|
||||
ext_sec2addr = pycoin_sec2addr
|
||||
ext_lib = 'pycoin'
|
||||
b_desc = ext_lib
|
||||
|
||||
def match_error(sec,wif,a_addr,b_addr,a,b):
|
||||
qmsg_r(red('\nERROR: Values do not match!'))
|
||||
die(3,"""
|
||||
sec key : {}
|
||||
WIF key : {}
|
||||
{a:10}: {}
|
||||
{b:10}: {}
|
||||
""".format(sec,wif,a_addr,b_addr,pnm=g.proj_name,a=kg_a.desc,b=b_desc).rstrip())
|
||||
|
||||
def compare_test():
|
||||
m = "Comparing address generators '{}' and '{}' for coin {}"
|
||||
last_t = time.time()
|
||||
qmsg(green(m.format(kg_a.desc,(ext_lib if b == 'ext' else kg_b.desc),g.coin)))
|
||||
|
||||
for i in range(rounds):
|
||||
if opt.verbose or time.time() - last_t >= 0.1:
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
last_t = time.time()
|
||||
sec = PrivKey(os.urandom(32),compressed=addr_type.compressed,pubkey_type=addr_type.pubkey_type)
|
||||
ph = kg_a.to_pubhex(sec)
|
||||
a_addr = ag.to_addr(ph)
|
||||
if opt.type == 'zcash_z':
|
||||
a_vk = ag.to_viewkey(ph)
|
||||
if b == 'ext':
|
||||
if opt.type == 'zcash_z':
|
||||
b_wif,b_addr,b_vk = ext_sec2addr(sec)
|
||||
if b_vk != a_vk:
|
||||
match_error(sec,sec.wif,a_vk,b_vk,a,b)
|
||||
else:
|
||||
b_wif,b_addr = ext_sec2addr(sec)
|
||||
if b_wif != sec.wif:
|
||||
match_error(sec,sec.wif,sec.wif,b_wif,a,b)
|
||||
else:
|
||||
b_addr = ag.to_addr(kg_b.to_pubhex(sec))
|
||||
vmsg('\nkey: %s\naddr: %s\n' % (sec.wif,a_addr))
|
||||
if a_addr != b_addr:
|
||||
match_error(sec,sec.wif,a_addr,b_addr,a,ext_lib if b == 'ext' else b)
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))
|
||||
|
||||
def speed_test():
|
||||
m = "Testing speed of address generator '{}' for coin {}"
|
||||
qmsg(green(m.format(kg_a.desc,g.coin)))
|
||||
from struct import pack,unpack
|
||||
seed = os.urandom(28)
|
||||
print 'Incrementing key with each round'
|
||||
print 'Starting key:', hexlify(seed+pack('I',0))
|
||||
import time
|
||||
start = last_t = time.time()
|
||||
|
||||
for i in range(rounds):
|
||||
if time.time() - last_t >= 0.1:
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
last_t = time.time()
|
||||
sec = PrivKey(seed+pack('I',i),compressed=addr_type.compressed,pubkey_type=addr_type.pubkey_type)
|
||||
a_addr = ag.to_addr(kg_a.to_pubhex(sec))
|
||||
vmsg('\nkey: %s\naddr: %s\n' % (sec.wif,a_addr))
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
qmsg('\n{} addresses generated in {:.2f} seconds'.format(rounds,time.time()-start))
|
||||
|
||||
def dump_test():
|
||||
m = "Comparing output of address generator '{}' against wallet dump '{}'"
|
||||
qmsg(green(m.format(kg_a.desc,cmd_args[1])))
|
||||
for n,[wif,a_addr] in enumerate(dump,1):
|
||||
qmsg_r('\rKey %s/%s ' % (n,len(dump)))
|
||||
try:
|
||||
sec = PrivKey(wif=wif)
|
||||
except:
|
||||
die(2,'\nInvalid {}net WIF address in dump file: {}'.format(('main','test')[g.testnet],wif))
|
||||
b_addr = ag.to_addr(kg_a.to_pubhex(sec))
|
||||
if a_addr != b_addr:
|
||||
match_error(sec,wif,a_addr,b_addr,3,a)
|
||||
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))
|
||||
|
||||
urounds,fh = None,None
|
||||
dump = []
|
||||
if len(cmd_args) == 2:
|
||||
|
|
@ -117,6 +215,7 @@ if len(cmd_args) == 2:
|
|||
if urounds: rounds = urounds
|
||||
|
||||
a,b = None,None
|
||||
b_desc = 'unknown'
|
||||
try:
|
||||
a,b = cmd_args[0].split(':')
|
||||
except:
|
||||
|
|
@ -131,25 +230,7 @@ else:
|
|||
a = int(a)
|
||||
assert 1 <= a <= len(g.key_generators)
|
||||
if b == 'ext':
|
||||
if addr_type == 'zcash_z':
|
||||
import subprocess as sp
|
||||
ext_sec2addr = zcash_mini_sec2addr
|
||||
ext_lib = 'zcash_mini'
|
||||
elif g.coin in ('ETH','ETC'):
|
||||
try:
|
||||
import ethereum.utils as eth
|
||||
except:
|
||||
die(1,"Unable to import 'pyethereum' module. Is pyethereum installed?")
|
||||
ext_sec2addr = pyethereum_sec2addr
|
||||
ext_lib = 'pyethereum'
|
||||
else:
|
||||
try:
|
||||
import pycoin.cmds.ku as pcku
|
||||
except:
|
||||
die(1,"Unable to import module 'ku'. Is pycoin installed?")
|
||||
PREFIX_TRANSFORMS = pcku.prefix_transforms_for_network(g.coin)
|
||||
ext_sec2addr = pycoin_sec2addr
|
||||
ext_lib = 'pycoin'
|
||||
init_external_prog()
|
||||
else:
|
||||
b = int(b)
|
||||
assert 1 <= b <= len(g.key_generators)
|
||||
|
|
@ -157,90 +238,21 @@ else:
|
|||
except:
|
||||
die(1,"%s: invalid generator IDs" % cmd_args[0])
|
||||
|
||||
def match_error(sec,wif,a_addr,b_addr,a,b):
|
||||
m = ['','py-ecdsa','secp256k1','dump']
|
||||
qmsg_r(red('\nERROR: Addresses do not match!'))
|
||||
die(3,"""
|
||||
sec key : {}
|
||||
WIF key : {}
|
||||
{a:10}: {}
|
||||
{b:10}: {}
|
||||
""".format(sec,wif,a_addr,b_addr,pnm=g.proj_name,a=m[a],b=m[b] if b in m else b).rstrip())
|
||||
|
||||
# Begin execution
|
||||
no_compressed = g.coin in ('ETH','ETC') or addr_type == 'zcash_z'
|
||||
no_uncompressed = opt.segwit or g.coin == 'DASH' or (g.coin=='ZEC' and addr_type == 'std')
|
||||
switch_compressed = not no_compressed and not no_uncompressed
|
||||
compressed = not no_compressed
|
||||
|
||||
from mmgen.addr import KeyGenerator,AddrGenerator
|
||||
from mmgen.obj import PrivKey
|
||||
ag = AddrGenerator(
|
||||
'ethereum' if g.coin in ('ETH','ETC')
|
||||
else 'zcash_z' if addr_type == 'zcash_z'
|
||||
else ('p2pkh','segwit')[bool(opt.segwit)])
|
||||
|
||||
kg_a = KeyGenerator(addr_type.pubkey_type,a)
|
||||
ag = AddrGenerator(addr_type.gen_method)
|
||||
|
||||
if a and b:
|
||||
m = "Comparing address generators '{}' and '{}' for coin {}"
|
||||
last_t = time.time()
|
||||
kg_a = KeyGenerator(addr_type,a)
|
||||
if b != 'ext': kg_b = KeyGenerator(addr_type,b)
|
||||
qmsg(green(m.format(kg_a.desc,(ext_lib if b == 'ext' else kg_b.desc),g.coin)))
|
||||
|
||||
for i in range(rounds):
|
||||
if opt.verbose or time.time() - last_t >= 0.1:
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
last_t = time.time()
|
||||
sec = PrivKey(os.urandom(32),compressed=compressed,pubkey_type=addr_type)
|
||||
a_addr = ag.to_addr(kg_a.to_pubhex(sec))
|
||||
if b == 'ext':
|
||||
b_wif,b_addr = ext_sec2addr(sec)
|
||||
if b_wif != sec.wif:
|
||||
match_error(sec,sec.wif,sec.wif,b_wif,a,b)
|
||||
else:
|
||||
b_addr = ag.to_addr(kg_b.to_pubhex(sec))
|
||||
vmsg('\nkey: %s\naddr: %s\n' % (sec.wif,a_addr))
|
||||
if a_addr != b_addr:
|
||||
match_error(sec,sec.wif,a_addr,b_addr,a,ext_lib if b == 'ext' else b)
|
||||
if switch_compressed:
|
||||
compressed = not compressed
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
|
||||
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))
|
||||
if b != 'ext':
|
||||
kg_b = KeyGenerator(addr_type.pubkey_type,b)
|
||||
b_desc = kg_b.desc
|
||||
compare_test()
|
||||
elif a and not fh:
|
||||
kg = KeyGenerator(addr_type,a)
|
||||
m = "Testing speed of address generator '{}' for coin {}"
|
||||
qmsg(green(m.format(kg.desc,g.coin)))
|
||||
from struct import pack,unpack
|
||||
seed = os.urandom(28)
|
||||
print 'Incrementing key with each round'
|
||||
print 'Starting key:', hexlify(seed+pack('I',0))
|
||||
import time
|
||||
start = last_t = time.time()
|
||||
|
||||
for i in range(rounds):
|
||||
if time.time() - last_t >= 0.1:
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
last_t = time.time()
|
||||
sec = PrivKey(seed+pack('I',i),compressed=compressed,pubkey_type=addr_type)
|
||||
a_addr = ag.to_addr(kg.to_pubhex(sec))
|
||||
vmsg('\nkey: %s\naddr: %s\n' % (sec.wif,a_addr))
|
||||
if switch_compressed:
|
||||
compressed = not compressed
|
||||
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
|
||||
|
||||
qmsg('\n{} addresses generated in {:.2f} seconds'.format(rounds,time.time()-start))
|
||||
speed_test()
|
||||
elif a and dump:
|
||||
kg = KeyGenerator(addr_type,a)
|
||||
m = "Comparing output of address generator '{}' against wallet dump '{}'"
|
||||
qmsg(green(m.format(kg.desc,cmd_args[1])))
|
||||
for n,[wif,a_addr] in enumerate(dump,1):
|
||||
qmsg_r('\rKey %s/%s ' % (n,len(dump)))
|
||||
try:
|
||||
sec = PrivKey(wif=wif)
|
||||
except:
|
||||
die(2,'\nInvalid {}net WIF address in dump file: {}'.format(('main','test')[g.testnet],wif))
|
||||
b_addr = ag.to_addr(kg.to_pubhex(sec))
|
||||
if a_addr != b_addr:
|
||||
match_error(sec,wif,a_addr,b_addr,3,a)
|
||||
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))
|
||||
b_desc = 'dump'
|
||||
dump_test()
|
||||
else:
|
||||
die(2,'Illegal invocation')
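Review note on the refactored `compare_test()` above: it is a differential test, in which two independent generators receive the same random key and the run stops at the first disagreement. The following is a generic sketch of that pattern under simplified assumptions. The `first_mismatch` helper and the SHA-256 stand-in generators are hypothetical and use no MMGen, pycoin or zcash-mini code.

```python
import hashlib
import os


def first_mismatch(gen_a, gen_b, rounds=100, make_input=lambda: os.urandom(32)):
    """Feed identical random inputs to two generators.

    Returns None if every round agrees, otherwise a tuple
    (round_number, input, output_a, output_b) for the first disagreement.
    """
    for round_no in range(1, rounds + 1):
        data = make_input()
        out_a, out_b = gen_a(data), gen_b(data)
        if out_a != out_b:
            return round_no, data, out_a, out_b
    return None


# Stand-in "generators": two ways of computing the same digest
def reference_gen(data):
    return hashlib.sha256(data).hexdigest()


def alternate_gen(data):
    h = hashlib.sha256()
    h.update(data)
    return h.hexdigest()


result = first_mismatch(reference_gen, alternate_gen, rounds=20)
```

Returning the failing input along with both outputs plays the same role as the `match_error()` report in the diff, which prints the secret key and both addresses so that a failure can be reproduced.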
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ def run_test(test,arg,input_data):
|
|||
if not opt.super_silent:
|
||||
msg(u'==> {}'.format(ret))
|
||||
if opt.verbose and issubclass(cls,MMGenObject):
|
||||
ret.pmsg()
|
||||
ret.pmsg() if hasattr(ret,'pmsg') else pmsg(ret)
|
||||
except SystemExit as e:
|
||||
if input_data == 'good':
|
||||
raise ValueError,'Error on good input data'
|
||||
|
|
@ -209,8 +209,8 @@ tests = OrderedDict([
|
|||
{'wif':'cSaJAXBAm9ooHpVJgoxqjDG3AcareFy29Cz8mhnNTRijjv2HLgta',
|
||||
'ret':'94fa8b90c11fea8fb907c9376b919534b0a75b9a9621edf71a78753544b4101c'})),
|
||||
}[g.coin.lower()][bool(g.testnet)],
|
||||
{'s':r32,'compressed':False,'ret':hexlify(r32)},
|
||||
{'s':r32,'compressed':True,'ret':hexlify(r32)}
|
||||
{'s':r32,'compressed':False,'pubkey_type':'std','ret':hexlify(r32)},
|
||||
{'s':r32,'compressed':True,'pubkey_type':'std','ret':hexlify(r32)}
|
||||
)
|
||||
}),
|
||||
('AddrListID', { # a rather pointless test, but do it anyway
|
||||
|
|
|
|||
Binary file not shown.
46
test/sha256test.py
Executable file
|
|
@ -0,0 +1,46 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import sys,os,hashlib
|
||||
from mmgen.sha256 import Sha256
|
||||
|
||||
def msg(s): sys.stderr.write(s)
|
||||
def green(s): return '\033[32;1m' + s + '\033[0m'
|
||||
|
||||
def compare_hashes(dlen,data):
|
||||
sha2 = hashlib.sha256(data).hexdigest()
|
||||
# msg('Dlen {:<5} {}\r'.format(dlen,sha2))
|
||||
my_sha2 = Sha256(data).hexdigest()
|
||||
assert my_sha2 == sha2,'Hashes do not match!'
|
||||
|
||||
def test_K():
|
||||
msg('Testing generated constants: ')
|
||||
K_ref = [1116352408,1899447441,-1245643825,-373957723,961987163,1508970993,-1841331548,-1424204075,-670586216,310598401,607225278,1426881987,1925078388,-2132889090,-1680079193,-1046744716,-459576895,-272742522,264347078,604807628,770255983,1249150122,1555081692,1996064986,-1740746414,-1473132947,-1341970488,-1084653625,-958395405,-710438585,113926993,338241895,666307205,773529912,1294757372,1396182291,1695183700,1986661051,-2117940946,-1838011259,-1564481375,-1474664885,-1035236496,-949202525,-778901479,-694614492,-200395387,275423344,430227734,506948616,659060556,883997877,958139571,1322822218,1537002063,1747873779,1955562222,2024104815,-2067236844,-1933114872,-1866530822,-1538233109,-1090935817,-965641998]
|
||||
def toSigned32(n): return ((n & 0xffffffff) ^ 0x80000000) - 0x80000000
|
||||
K_sig = [toSigned32(n) for n in Sha256.K]
|
||||
assert K_sig == K_ref,'Generated constants in K[] differ from reference value'
|
||||
msg('OK\n')
|
||||
|
||||
def test_ref():
|
||||
inputs = (
|
||||
'','x','xa','the','the quick','the quick brown fox',
|
||||
'\x00','\x00\x00','\x00'*256,'\x00'*512,'\x00'*511,'\x00'*513,
|
||||
'\x0f','\x0f\x0f','\x0f'*256,'\x0f'*512,'\x0f'*511,'\x0f'*513,
|
||||
'\x0f\x0d','\x0e\x0e'*256,'\x00\x0f'*512,'\x0e\x0f'*511,'\x0a\x0d'*513
|
||||
)
|
||||
for i,data in enumerate(inputs):
|
||||
msg('\rTesting reference input data: {:4}/{} '.format(i+1,len(inputs)))
|
||||
compare_hashes(len(data),data)
|
||||
msg('OK\n')
|
||||
|
||||
def test_random(rounds):
|
||||
for i in range(rounds):
|
||||
if not (i+1) % 10:
|
||||
msg('\rTesting random input data: {:4}/{} '.format(i+1,rounds))
|
||||
dlen = int(os.urandom(4).encode('hex'),16) >> 18
|
||||
compare_hashes(dlen,os.urandom(dlen))
|
||||
msg('OK\n')
|
||||
|
||||
msg(green('Testing MMGen implementation of Sha256()\n'))
|
||||
test_K()
|
||||
test_ref()
|
||||
test_random(500)
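The `test_K()` check in this file compares `Sha256.K` against signed 32-bit reference values. A minimal sketch of how such a table can be derived, assuming (as the SHA-256 specification defines them) that the constants are the first 32 bits of the fractional parts of the cube roots of the first 64 primes. The helper names `first_primes`, `icbrt` and `gen_K` are hypothetical and are not taken from `mmgen.sha256`; only `toSigned32` mirrors the test above.

```python
def first_primes(count):
    """Return the first `count` primes by trial division (fine for count=64)."""
    primes = []
    n = 2
    while len(primes) < count:
        if all(n % p for p in primes):
            primes.append(n)
        n += 1
    return primes

def icbrt(n):
    """Integer cube root: the largest r with r**3 <= n, by binary search."""
    lo, hi = 0, 1
    while hi ** 3 <= n:
        hi *= 2
    while lo < hi:
        mid = (lo + hi + 1) // 2
        if mid ** 3 <= n:
            lo = mid
        else:
            hi = mid - 1
    return lo

def gen_K():
    # floor(cbrt(p) * 2**32) equals icbrt(p * 2**96); the low 32 bits
    # are the leading 32 bits of the fractional part of cbrt(p)
    return [icbrt(p << 96) & 0xffffffff for p in first_primes(64)]

def toSigned32(n):
    # Same conversion as in test_K(): reinterpret an unsigned 32-bit value as signed
    return ((n & 0xffffffff) ^ 0x80000000) - 0x80000000

K = gen_K()
K_signed = [toSigned32(n) for n in K]
```

Working with integer cube roots avoids floating-point rounding, which would otherwise make the low bits of each constant unreliable.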
|
||||
16
test/test.py
|
|
@@ -1085,7 +1085,7 @@ def create_tx_data(sources):
|
|||
return ad,tx_data
|
||||
|
||||
def make_txcreate_cmdline(tx_data):
|
||||
privkey = PrivKey(os.urandom(32),compressed=True)
|
||||
privkey = PrivKey(os.urandom(32),compressed=True,pubkey_type='std')
|
||||
t = ('p2pkh','segwit')['S' in g.proto.mmtypes]
|
||||
coinaddr = AddrGenerator(t).to_addr(KeyGenerator('std').to_pubhex(privkey))
|
||||
|
||||
|
|
@@ -2046,9 +2046,9 @@ class MMGenTestSuite(object):
|
|||
t.passphrase(w,ref_kafile_pass)
|
||||
t.expect('Check key-to-address validity? (y/N): ','y')
|
||||
o = t.read().strip().split('\n')[-1]
|
||||
rc = cfg[ 'ref_' + ftype + 'file_chksum' +
|
||||
('_'+coin.lower() if coin else '') +
|
||||
('_'+mmtype if mmtype else '')]
|
||||
rc = cfg[ 'ref_' + ftype + 'file_chksum' +
|
||||
('_'+coin.lower() if coin else '') +
|
||||
('_'+mmtype if mmtype else '')]
|
||||
ref_chksum = rc if (ftype == 'passwd' or coin) else rc[g.proto.base_coin.lower()][g.testnet]
|
||||
cmp_or_die(ref_chksum,o)
|
||||
|
||||
|
|
@@ -2412,8 +2412,10 @@ class MMGenTestSuite(object):
|
|||
@staticmethod
|
||||
def gen_pairs(n):
|
||||
return [subprocess.check_output(
|
||||
['python',os.path.join('cmds','mmgen-tool'),'--testnet=1','-r0','randpair','compressed={}'.format((i+1)%2)]).split()
|
||||
for i in range(n)]
|
||||
['python',os.path.join('cmds','mmgen-tool'),'--testnet=1'] +
|
||||
([],['--type=compressed'])[bool((i+1)%2)] +
|
||||
['-r0','randpair']
|
||||
).split() for i in range(n)]
|
||||
|
||||
def regtest_bob_pre_import(self,name):
|
||||
pairs = self.gen_pairs(5)
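The `gen_pairs()` change in this hunk replaces the `compressed=N` tool argument with an optional `--type=compressed` option placed before the command name. A minimal sketch of the argument-list construction only, written with a conditional expression instead of tuple indexing; the function name `randpair_cmdline` is hypothetical and the subprocess call itself is left out:

```python
import os

def randpair_cmdline(i):
    """Build the argv list for the i-th 'randpair' call (i counts from 0)."""
    use_compressed = bool((i + 1) % 2)   # True for i = 0, 2, 4, ...
    return (
        ['python', os.path.join('cmds', 'mmgen-tool'), '--testnet=1'] +
        (['--type=compressed'] if use_compressed else []) +
        ['-r0', 'randpair']
    )

# Alternating compressed / uncompressed pairs, as in the test suite
cmdlines = [randpair_cmdline(i) for i in range(4)]
```

The option has to precede `randpair` because everything after the command name is treated as a command argument.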
|
||||
|
|
@@ -2615,7 +2617,7 @@ class MMGenTestSuite(object):
|
|||
sid = cfg['seed_id']
|
||||
psave = g.proto
|
||||
g.proto = CoinProtocol(g.coin,True)
|
||||
privhex = PrivKey(os.urandom(32),compressed=True)
|
||||
privhex = PrivKey(os.urandom(32),compressed=True,pubkey_type='std')
|
||||
addr = AddrGenerator('p2pkh').to_addr(KeyGenerator('std').to_pubhex(privhex))
|
||||
g.proto = psave
|
||||
outputs_cl = [sid+':{}:3,1.1234'.format(g.proto.mmtypes[-1]), sid+':C:5,5.5555',sid+':L:4',addr+',100']
|
||||
|
|
|
|||
153
test/tooltest.py
|
|
@@ -28,6 +28,29 @@ sys.path.__setitem__(0,repo_root)
|
|||
# Import this _after_ local path's been added to sys.path
|
||||
from mmgen.common import *
|
||||
|
||||
opts_data = lambda: {
|
||||
'desc': "Test suite for the 'mmgen-tool' utility",
|
||||
'usage':'[options] [command]',
|
||||
'options': """
|
||||
-h, --help Print this help message
|
||||
--, --longhelp Print help message for long options (common options)
|
||||
-l, --list-cmds List and describe the tests and commands in this test suite
|
||||
-L, --list-names List the names of all tested 'mmgen-tool' commands
|
||||
-s, --system Test scripts and modules installed on system rather than
|
||||
those in the repo root
|
||||
-t, --type=t Specify address type (valid options: 'zcash_z')
|
||||
-v, --verbose Produce more verbose output
|
||||
""",
|
||||
'notes': """
|
||||
|
||||
If no command is given, the whole suite of tests is run.
|
||||
"""
|
||||
}
|
||||
|
||||
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
|
||||
|
||||
cmd_args = opts.init(opts_data,add_opts=['exact_output','profile'])
|
||||
|
||||
from collections import OrderedDict
|
||||
cmd_data = OrderedDict([
|
||||
('util', {
|
||||
|
|
@@ -64,17 +87,19 @@ cmd_data = OrderedDict([
|
|||
|
||||
('Privhex2pubhex', ('Wif2hex','o3')),
|
||||
('Pubhex2addr', ('Privhex2pubhex','o3')),
|
||||
('Pubhex2redeem_script', ('Privhex2pubhex','o3')),
|
||||
('Wif2redeem_script', ('Randpair','o3')),
|
||||
('Wif2segwit_pair', ('Randpair','o2')),
|
||||
|
||||
('Privhex2addr', ('Wif2hex','o3')), # compare with output of Randpair
|
||||
('Hex2wif', ('Wif2hex','io2')),
|
||||
('Addr2hexaddr', ('Randpair','o2')),
|
||||
('Pubhash2addr', ('Addr2hexaddr','io2')),
|
||||
|
||||
('Pipetest', ('Randpair','o3')),
|
||||
])
|
||||
('Addr2hexaddr', ('Randpair','o2'))] + # TODO: Hexaddr2addr
|
||||
([],[
|
||||
('Pubhash2addr', ('Addr2hexaddr','io2'))
|
||||
])[opt.type != 'zcash_z'] +
|
||||
([],[
|
||||
('Pubhex2redeem_script', ('Privhex2pubhex','o3')),
|
||||
('Wif2redeem_script', ('Randpair','o3')),
|
||||
('Wif2segwit_pair', ('Randpair','o2')),
|
||||
('Privhex2addr', ('Wif2hex','o3')), # compare with output of Randpair
|
||||
('Pipetest', ('Randpair','o3'))
|
||||
])[g.coin in ('BTC','LTC')]
|
||||
)
|
||||
}
|
||||
),
|
||||
('mnemonic', {
|
||||
|
|
@@ -93,7 +118,7 @@ cmd_data = OrderedDict([
|
|||
('rpc', {
|
||||
'desc': 'Coin daemon RPC commands',
|
||||
'cmd_data': OrderedDict([
|
||||
# ('keyaddrfile_chksum', ()), # interactive
|
||||
# ('Keyaddrfile_chksum', ()), # interactive
|
||||
('Addrfile_chksum', ()),
|
||||
('Getbalance', ()),
|
||||
('Listaddresses', ()),
|
||||
|
|
@@ -122,28 +147,6 @@ cfg = {
|
|||
}
|
||||
}
|
||||
|
||||
opts_data = lambda: {
|
||||
'desc': "Test suite for the 'mmgen-tool' utility",
|
||||
'usage':'[options] [command]',
|
||||
'options': """
|
||||
-h, --help Print this help message
|
||||
--, --longhelp Print help message for long options (common options)
|
||||
-l, --list-cmds List and describe the tests and commands in this test suite
|
||||
-L, --list-names List the names of all tested 'mmgen-tool' commands
|
||||
-s, --system Test scripts and modules installed on system rather than
|
||||
those in the repo root
|
||||
-v, --verbose Produce more verbose output
|
||||
""",
|
||||
'notes': """
|
||||
|
||||
If no command is given, the whole suite of tests is run.
|
||||
"""
|
||||
}
|
||||
|
||||
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
|
||||
|
||||
cmd_args = opts.init(opts_data,add_opts=['exact_output','profile'])
|
||||
|
||||
ref_subdir = '' if g.proto.base_coin == 'BTC' else g.proto.name
|
||||
altcoin_pfx = '' if g.proto.base_coin == 'BTC' else '-'+g.proto.base_coin
|
||||
tn_ext = ('','.testnet')[g.testnet]
|
||||
|
|
@@ -158,7 +161,7 @@ spawn_cmd = ([],['python'])[g.platform == 'win'] + [mmgen_cmd]
|
|||
|
||||
add_spawn_args = ['--data-dir='+cfg['tmpdir']] + ['--{}{}'.format(
|
||||
k.replace('_','-'),'='+getattr(opt,k) if getattr(opt,k) != True else '')
|
||||
for k in ('testnet','rpc_host','regtest','coin') if getattr(opt,k)]
|
||||
for k in ('testnet','rpc_host','regtest','coin','type') if getattr(opt,k)]
|
||||
|
||||
if opt.list_cmds:
|
||||
fs = ' {:<{w}} - {}'
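The `add_spawn_args` expression in this hunk forwards selected options of the test script, now including `type`, to every spawned `mmgen-tool` process. A minimal standalone sketch of that mapping, assuming boolean options are stored as a literal `True` and valued options as strings; the function name `spawn_args_from_opts` and the sample option values are illustrative only:

```python
from types import SimpleNamespace

def spawn_args_from_opts(opt, keys=('testnet', 'rpc_host', 'regtest', 'coin', 'type')):
    """Turn set options into '--flag' or '--flag=value' strings."""
    args = []
    for k in keys:
        val = getattr(opt, k, None)
        if not val:
            continue                      # unset options are not forwarded
        flag = '--' + k.replace('_', '-')
        args.append(flag if val is True else flag + '=' + val)
    return args

# Hypothetical option values, for illustration
opt = SimpleNamespace(testnet=True, rpc_host=None, regtest=None, coin='zec', type='zcash_z')
spawn_args = spawn_args_from_opts(opt)
```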
|
||||
|
|
@@ -191,6 +194,11 @@ def test_msg(m):
|
|||
m2 = 'Testing {}'.format(m)
|
||||
msg_r(green(m2+'\n') if opt.verbose else '{:{w}}'.format(m2,w=msg_w+8))
|
||||
|
||||
maybe_compressed = ('','compressed')['C' in g.proto.mmtypes]
|
||||
maybe_segwit = ('','segwit')['S' in g.proto.mmtypes]
|
||||
maybe_type_compressed = ([],['--type=compressed'])['C' in g.proto.mmtypes]
|
||||
maybe_type_segwit = ([],['--type=segwit'])['S' in g.proto.mmtypes]
|
||||
|
||||
class MMGenToolTestSuite(object):
|
||||
|
||||
def __init__(self):
|
||||
|
|
@@ -201,13 +209,8 @@ class MMGenToolTestSuite(object):
|
|||
if cdata:
|
||||
name,code = cdata
|
||||
io,count = (code[:-1],int(code[-1])) if code[-1] in '0123456789' else (code,1)
|
||||
|
||||
for c in range(count):
|
||||
fns += ['%s%s%s' % (
|
||||
name,
|
||||
('',c+1)[count > 1],
|
||||
('.out','.in')[ch=='i']
|
||||
) for ch in io]
|
||||
fns += ['{}{}{}'.format(name,('',c+1)[count > 1],('.out','.in')[ch=='i']) for ch in io]
|
||||
return fns
|
||||
|
||||
def get_num_exts_for_cmd(self,cmd,dpy): # dpy required here
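The one-line `fns += ...` expression in this hunk expands codes such as `'io2'` or `'o3'` from `cmd_data` into temporary file names. A minimal standalone sketch of the same expansion, assuming a code is a string of `i`/`o` characters with an optional trailing single-digit count; the function name `tmpfile_names` is hypothetical:

```python
def tmpfile_names(name, code):
    """Expand a dependency code like 'io2' into a list of file names."""
    if code[-1] in '0123456789':
        io, count = code[:-1], int(code[-1])
    else:
        io, count = code, 1
    fns = []
    for c in range(count):
        # A numeric suffix is only used when more than one set of files exists
        suffix = str(c + 1) if count > 1 else ''
        fns += [name + suffix + ('.in' if ch == 'i' else '.out') for ch in io]
    return fns

names_io2 = tmpfile_names('Wif2hex', 'io2')
names_o3 = tmpfile_names('Randpair', 'o3')
```

The resulting lists are what test methods such as `Hex2wif(self,name,f1,f2,f3,f4)` receive as their positional file arguments.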
|
||||
|
|
@@ -225,16 +228,18 @@ class MMGenToolTestSuite(object):
|
|||
file_list = [os.path.join(cfg['tmpdir'],fn) for fn in fns]
|
||||
self.__class__.__dict__[cmd](*([self,cmd] + file_list))
|
||||
|
||||
def run_cmd(self,name,tool_args,kwargs='',extra_msg='',silent=False,strip=True):
|
||||
def run_cmd(self,name,tool_args,kwargs='',extra_msg='',silent=False,strip=True,add_opts=[]):
|
||||
sys_cmd = (
|
||||
spawn_cmd +
|
||||
add_spawn_args +
|
||||
['-r0','-d',cfg['tmpdir'],name.lower()] +
|
||||
['-r0','-d',cfg['tmpdir']] +
|
||||
add_opts +
|
||||
[name.lower()] +
|
||||
tool_args +
|
||||
kwargs.split()
|
||||
)
|
||||
if extra_msg: extra_msg = '({})'.format(extra_msg)
|
||||
full_name = ' '.join([name.lower()]+kwargs.split()+extra_msg.split())
|
||||
full_name = ' '.join([name.lower()]+add_opts+kwargs.split()+extra_msg.split())
|
||||
if not silent:
|
||||
if opt.verbose:
|
||||
sys.stderr.write(green('Testing {}\nExecuting '.format(full_name)))
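The `run_cmd()` change in this hunk inserts `add_opts` between the fixed options and the lowercased command name, so that `--type=...` is parsed as an option rather than as a command argument. A minimal sketch of the assembly order only; the function name `build_tool_cmd` and the sample paths are hypothetical, and `add_opts` defaults to `None` here instead of the `[]` used in the diff (which is harmless as long as the list is never mutated):

```python
def build_tool_cmd(spawn_cmd, add_spawn_args, tmpdir, name, tool_args,
                   kwargs='', add_opts=None):
    """Return the argv list in the order used by run_cmd() after this change."""
    return (
        list(spawn_cmd) +
        list(add_spawn_args) +
        ['-r0', '-d', tmpdir] +
        list(add_opts or []) +      # options such as '--type=segwit'
        [name.lower()] +            # the tool command itself
        list(tool_args) +
        kwargs.split()              # remaining 'key=value' command arguments
    )

cmd = build_tool_cmd(['mmgen-tool'], ['--testnet'], '/tmp/tooltest',
                     'Randpair', [], add_opts=['--type=segwit'])
```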
|
||||
|
|
@@ -250,10 +255,10 @@ class MMGenToolTestSuite(object):
|
|||
die(1,red('Called process returned with an error (retcode %s)' % retcode))
|
||||
return (a,a.rstrip())[bool(strip)]
|
||||
|
||||
def run_cmd_chk(self,name,f1,f2,kwargs='',extra_msg='',strip_hex=False):
|
||||
def run_cmd_chk(self,name,f1,f2,kwargs='',extra_msg='',strip_hex=False,add_opts=[]):
|
||||
idata = read_from_file(f1).rstrip()
|
||||
odata = read_from_file(f2).rstrip()
|
||||
ret = self.run_cmd(name,[odata],kwargs=kwargs,extra_msg=extra_msg)
|
||||
ret = self.run_cmd(name,[odata],kwargs=kwargs,extra_msg=extra_msg,add_opts=add_opts)
|
||||
vmsg('In: ' + repr(odata))
|
||||
vmsg('Out: ' + repr(ret))
|
||||
def cmp_equal(a,b):
|
||||
|
|
@@ -264,16 +269,16 @@ class MMGenToolTestSuite(object):
|
|||
"Error: values don't match:\nIn: %s\nOut: %s" % (repr(idata),repr(ret))))
|
||||
return ret
|
||||
|
||||
def run_cmd_nochk(self,name,f1,kwargs=''):
|
||||
def run_cmd_nochk(self,name,f1,kwargs='',add_opts=[]):
|
||||
odata = read_from_file(f1).rstrip()
|
||||
ret = self.run_cmd(name,[odata],kwargs=kwargs)
|
||||
ret = self.run_cmd(name,[odata],kwargs=kwargs,add_opts=add_opts)
|
||||
vmsg('In: ' + repr(odata))
|
||||
vmsg('Out: ' + repr(ret))
|
||||
return ret
|
||||
|
||||
def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='',literal=False,chkdata='',hush=False):
|
||||
def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='',literal=False,chkdata='',hush=False,add_opts=[]):
|
||||
if carg: write_to_tmpfile(cfg,'%s%s.in' % (name,fn_idx),carg+'\n')
|
||||
ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs,extra_msg=extra_msg)
|
||||
ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs,extra_msg=extra_msg,add_opts=add_opts)
|
||||
if carg: vmsg('In: ' + repr(carg))
|
||||
vmsg('Out: ' + (repr(ret),ret.decode('utf8'))[literal])
|
||||
if ret or ret == '':
|
||||
|
|
@@ -287,11 +292,11 @@ class MMGenToolTestSuite(object):
|
|||
else:
|
||||
die(3,red("Error for command '%s'" % name))
|
||||
|
||||
def run_cmd_randinput(self,name,strip=True):
|
||||
def run_cmd_randinput(self,name,strip=True,add_opts=[]):
|
||||
s = os.urandom(128)
|
||||
fn = name+'.in'
|
||||
write_to_tmpfile(cfg,fn,s,binary=True)
|
||||
ret = self.run_cmd(name,[get_tmpfile_fn(cfg,fn)],strip=strip)
|
||||
ret = self.run_cmd(name,[get_tmpfile_fn(cfg,fn)],strip=strip,add_opts=add_opts)
|
||||
fn = name+'.out'
|
||||
write_to_tmpfile(cfg,fn,ret+'\n')
|
||||
ok()
|
||||
|
|
@@ -337,68 +342,72 @@ class MMGenToolTestSuite(object):
|
|||
|
||||
# Cryptocoin
|
||||
def Randwif(self,name):
|
||||
for n,k in enumerate(['','compressed=1']):
|
||||
ret = self.run_cmd_out(name,kwargs=k,Return=True,fn_idx=n+1)
|
||||
for n,k in enumerate(['',maybe_compressed]):
|
||||
ao = ['--type='+k] if k else []
|
||||
ret = self.run_cmd_out(name,add_opts=ao,Return=True,fn_idx=n+1)
|
||||
ok_or_die(ret,is_wif,'WIF key')
|
||||
def Randpair(self,name):
|
||||
for n,k in enumerate(['','compressed=1','segwit=1 compressed=1']):
|
||||
wif,addr = self.run_cmd_out(name,kwargs=k,Return=True,fn_idx=n+1).split()
|
||||
for n,k in enumerate(['',maybe_compressed,maybe_segwit]):
|
||||
ao = ['--type='+k] if k else []
|
||||
wif,addr = self.run_cmd_out(name,add_opts=ao,Return=True,fn_idx=n+1).split()
|
||||
ok_or_die(wif,is_wif,'WIF key',skip_ok=True)
|
||||
ok_or_die(addr,is_coin_addr,'Coin address')
|
||||
def Wif2addr(self,name,f1,f2,f3):
|
||||
for n,f,k,m in ((1,f1,'',''),(2,f2,'','compressed'),(3,f3,'segwit=1','compressed')):
|
||||
for n,f,k,m in ((1,f1,'',''),(2,f2,'',maybe_compressed),(3,f3,maybe_segwit,maybe_compressed)):
|
||||
ao = ['--type='+k] if k else []
|
||||
wif = read_from_file(f).split()[0]
|
||||
self.run_cmd_out(name,wif,kwargs=k,fn_idx=n,extra_msg=m)
|
||||
self.run_cmd_out(name,wif,add_opts=ao,fn_idx=n,extra_msg=m)
|
||||
def Wif2hex(self,name,f1,f2,f3):
|
||||
for n,f,m in ((1,f1,''),(2,f2,'compressed'),(3,f3,'compressed for segwit')):
|
||||
for n,f,m in ((1,f1,''),(2,f2,maybe_compressed),(3,f3,'{} for {}'.format(maybe_compressed,maybe_segwit))):
|
||||
wif = read_from_file(f).split()[0]
|
||||
self.run_cmd_out(name,wif,fn_idx=n,extra_msg=m)
|
||||
def Privhex2addr(self,name,f1,f2,f3):
|
||||
keys = [read_from_file(f).rstrip() for f in (f1,f2,f3)]
|
||||
for n,k in enumerate(('','compressed=1','compressed=1 segwit=1')):
|
||||
ret = self.run_cmd(name,[keys[n]],kwargs=k).rstrip()
|
||||
for n,k in enumerate(('',maybe_compressed,maybe_segwit)):
|
||||
ao = ['--type='+k] if k else []
|
||||
ret = self.run_cmd(name,[keys[n]],add_opts=ao).rstrip()
|
||||
iaddr = read_from_tmpfile(cfg,'Randpair{}.out'.format(n+1)).split()[-1]
|
||||
cmp_or_die(iaddr,ret)
|
||||
def Hex2wif(self,name,f1,f2,f3,f4):
|
||||
for n,fi,fo,k in ((1,f1,f2,''),(2,f3,f4,'compressed=1')):
|
||||
ret = self.run_cmd_chk(name,fi,fo,kwargs=k)
|
||||
for n,fi,fo,k in ((1,f1,f2,''),(2,f3,f4,maybe_compressed)):
|
||||
ao = ['--type='+k] if k else []
|
||||
ret = self.run_cmd_chk(name,fi,fo,add_opts=ao)
|
||||
def Addr2hexaddr(self,name,f1,f2):
|
||||
for n,f,m in ((1,f1,''),(2,f2,'from compressed')):
|
||||
for n,f,m in ((1,f1,''),(2,f2,'from {}'.format(maybe_compressed))):
|
||||
addr = read_from_file(f).split()[-1]
|
||||
self.run_cmd_out(name,addr,fn_idx=n,extra_msg=m)
|
||||
def Pubhash2addr(self,name,f1,f2,f3,f4):
|
||||
for n,fi,fo,m in ((1,f1,f2,''),(2,f3,f4,'from compressed')):
|
||||
for n,fi,fo,m in ((1,f1,f2,''),(2,f3,f4,'from {}'.format(maybe_compressed))):
|
||||
self.run_cmd_chk(name,fi,fo,extra_msg=m)
|
||||
def Privhex2pubhex(self,name,f1,f2,f3): # from Hex2wif
|
||||
addr = read_from_file(f3).strip()
|
||||
self.run_cmd_out(name,addr,kwargs='compressed=1',fn_idx=3) # what about uncompressed?
|
||||
self.run_cmd_out(name,addr,add_opts=maybe_type_compressed,fn_idx=3) # what about uncompressed?
|
||||
def Pubhex2redeem_script(self,name,f1,f2,f3): # from above
|
||||
addr = read_from_file(f3).strip()
|
||||
self.run_cmd_out(name,addr,fn_idx=3)
|
||||
rs = read_from_tmpfile(cfg,name+'3.out').strip()
|
||||
self.run_cmd_out('pubhex2addr',rs,kwargs='p2sh=1',fn_idx=3,hush=True)
|
||||
self.run_cmd_out('pubhex2addr',rs,add_opts=maybe_type_segwit,fn_idx=3,hush=True)
|
||||
addr1 = read_from_tmpfile(cfg,'pubhex2addr3.out').strip()
|
||||
addr2 = read_from_tmpfile(cfg,'Randpair3.out').split()[1]
|
||||
cmp_or_die(addr1,addr2)
|
||||
def Wif2redeem_script(self,name,f1,f2,f3): # compare output with above
|
||||
wif = read_from_file(f3).split()[0]
|
||||
ret1 = self.run_cmd_out(name,wif,fn_idx=3,Return=True)
|
||||
ret1 = self.run_cmd_out(name,wif,add_opts=maybe_type_segwit,fn_idx=3,Return=True)
|
||||
ret2 = read_from_tmpfile(cfg,'Pubhex2redeem_script3.out').strip()
|
||||
cmp_or_die(ret1,ret2)
|
||||
def Wif2segwit_pair(self,name,f1,f2): # does its own checking, so just run
|
||||
wif = read_from_file(f2).split()[0]
|
||||
self.run_cmd_out(name,wif,fn_idx=2)
|
||||
|
||||
self.run_cmd_out(name,wif,add_opts=maybe_type_segwit,fn_idx=2)
|
||||
def Pubhex2addr(self,name,f1,f2,f3):
|
||||
addr = read_from_file(f3).strip()
|
||||
self.run_cmd_out(name,addr,fn_idx=3)
|
||||
self.run_cmd_out(name,addr,add_opts=maybe_type_segwit,fn_idx=3)
|
||||
|
||||
def Pipetest(self,name,f1,f2,f3):
|
||||
wif = read_from_file(f3).split()[0]
|
||||
cmd = ( '{c} {a} wif2hex {wif} | ' +
|
||||
'{c} {a} privhex2pubhex - compressed=1 | ' +
|
||||
'{c} {a} --type=compressed privhex2pubhex - | ' +
|
||||
'{c} {a} pubhex2redeem_script - | ' +
|
||||
'{c} {a} pubhex2addr - p2sh=1').format(
|
||||
'{c} {a} --type=segwit pubhex2addr -').format(
|
||||
c=' '.join(spawn_cmd),
|
||||
a=' '.join(add_spawn_args),
|
||||
wif=wif)
|
||||
|
|
|
|||