string formatting, whitespace, minor cleanups throughout

This commit is contained in:
The MMGen Project 2020-05-26 14:53:44 +00:00
commit 9489b1cb2d
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
30 changed files with 438 additions and 267 deletions

View file

@ -28,17 +28,18 @@ from .baseconv import *
pnm = g.proj_name
def dmsg_sc(desc,data):
if g.debug_addrlist: Msg('sc_debug_{}: {}'.format(desc,data))
if g.debug_addrlist:
Msg(f'sc_debug_{desc}: {data}')
class AddrGenerator(MMGenObject):
def __new__(cls,addr_type):
if type(addr_type) == str: # allow override w/o check
gen_method = addr_type
elif type(addr_type) == MMGenAddrType:
assert addr_type in g.proto.mmtypes,'{}: invalid address type for coin {}'.format(addr_type,g.coin)
assert addr_type in proto.mmtypes, f'{addr_type}: invalid address type for coin {g.coin}'
gen_method = addr_type.gen_method
else:
raise TypeError('{}: incorrect argument type for {}()'.format(type(addr_type),cls.__name__))
raise TypeError(f'{type(addr_type)}: incorrect argument type for {cls.__name__}()')
gen_methods = {
'p2pkh': AddrGeneratorP2PKH,
'segwit': AddrGeneratorSegwit,
@ -92,8 +93,6 @@ class AddrGeneratorEthereum(AddrGenerator):
from .protocol import hash256
self.hash256 = hash256
return AddrGenerator.__init__(addr_type)
def to_addr(self,pubhex):
assert type(pubhex) == PubKey
return CoinAddr(self.keccak_256(bytes.fromhex(pubhex[2:])).hexdigest()[24:])
@ -116,7 +115,7 @@ class AddrGeneratorZcashZ(AddrGenerator):
def to_addr(self,pubhex): # pubhex is really privhex
key = bytes.fromhex(pubhex)
assert len(key) == 32,'{}: incorrect privkey length'.format(len(key))
assert len(key) == 32, f'{len(key)}: incorrect privkey length'
from nacl.bindings import crypto_scalarmult_base
p2 = crypto_scalarmult_base(self.zhash256(key,1))
from .protocol import _b58chk_encode
@ -126,7 +125,7 @@ class AddrGeneratorZcashZ(AddrGenerator):
def to_viewkey(self,pubhex): # pubhex is really privhex
key = bytes.fromhex(pubhex)
assert len(key) == 32,'{}: incorrect privkey length'.format(len(key))
assert len(key) == 32, f'{len(key)}: incorrect privkey length'
vk = bytearray(self.zhash256(key,0)+self.zhash256(key,1))
vk[32] &= 0xf8
vk[63] &= 0x7f
@ -164,8 +163,6 @@ class AddrGeneratorMonero(AddrGenerator):
self.scalarmult = scalarmult
self.B = B
return AddrGenerator.__init__(addr_type)
def b58enc(self,addr_bytes):
enc = baseconv.frombytes
l = len(addr_bytes)
@ -194,14 +191,16 @@ class AddrGeneratorMonero(AddrGenerator):
pvk_str = self.encodepoint(scalarmultbase(hex2int_le(vk_hex)))
addr_p1 = g.proto.addr_fmt_to_ver_bytes('monero') + pk_str + pvk_str
return CoinAddr(self.b58enc(addr_p1 + self.keccak_256(addr_p1).digest()[:4]))
return CoinAddr(
addr = self.b58enc(addr_p1 + self.keccak_256(addr_p1).digest()[:4]) )
def to_wallet_passwd(self,sk_hex):
return WalletPassword(self.hash256(sk_hex)[:32])
def to_viewkey(self,sk_hex):
assert len(sk_hex) == 64,'{}: incorrect privkey length'.format(len(sk_hex))
return MoneroViewKey(g.proto.preprocess_key(self.keccak_256(bytes.fromhex(sk_hex)).digest(),None).hex())
assert len(sk_hex) == 64, f'{len(sk_hex)}: incorrect privkey length'
return MoneroViewKey(
g.proto.preprocess_key(self.keccak_256(bytes.fromhex(sk_hex)).digest(),None).hex() )
def to_segwit_redeem_script(self,sk_hex):
raise NotImplementedError('Monero addresses incompatible with Segwit')
@ -319,6 +318,7 @@ class AddrListChksum(str,Hilite):
class AddrListIDStr(str,Hilite):
color = 'green'
trunc_ok = False
def __new__(cls,addrlist,fmt_str=None):
idxs = [e.idx for e in addrlist.data]
prev = idxs[0]
@ -376,8 +376,16 @@ Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr)
line_ctr = 0
def __init__(self,addrfile='',al_id='',adata=[],seed='',addr_idxs='',src='',
addrlist='',keylist='',mmtype=None):
def __init__(self,
addrfile = '',
al_id = '',
adata = [],
seed = '',
addr_idxs = '',
src = '',
addrlist = '',
keylist = '',
mmtype = None ):
do_chksum = True
self.update_msgs()
@ -460,7 +468,10 @@ Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
e = le(idx=num)
# Secret key is double sha256 of seed hash round /num/
e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),compressed=compressed,pubkey_type=pubkey_type)
e.sec = PrivKey(
sha256(sha256(seed).digest()).digest(),
compressed = compressed,
pubkey_type = pubkey_type )
if self.gen_addrs:
pubhex = kg.to_pubhex(e.sec)
@ -707,17 +718,17 @@ Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
lines = get_lines_from_file(fn,self.data_desc+' data',trim_comments=True)
try:
assert len(lines) >= 3, 'Too few lines in address file ({})'.format(len(lines))
assert len(lines) >= 3, f'Too few lines in address file ({len(lines)})'
ls = lines[0].split()
assert 1 < len(ls) < 5, "Invalid first line for {} file: '{}'".format(self.gen_desc,lines[0])
assert ls.pop() == '{', "'{}': invalid first line".format(ls)
assert lines[-1] == '}', "'{}': invalid last line".format(lines[-1])
assert 1 < len(ls) < 5, f'Invalid first line for {self.gen_desc} file: {lines[0]!r}'
assert ls.pop() == '{', f'{ls!r}: invalid first line'
assert lines[-1] == '}', f'{lines[-1]!r}: invalid last line'
sid = ls.pop(0)
assert is_mmgen_seed_id(sid),"'{}': invalid Seed ID".format(ls[0])
assert is_mmgen_seed_id(sid), f'{sid!r}: invalid Seed ID'
if type(self) == PasswordList and len(ls) == 2:
ss = ls.pop().split(':')
assert len(ss) == 2,"'{}': invalid password length specifier (must contain colon)".format(ls[2])
assert len(ss) == 2, f'{ss!r}: invalid password length specifier (must contain colon)'
self.set_pw_fmt(ss[0])
self.set_pw_len(ss[1])
self.pw_id_str = MMGenPWIDString(ls.pop())
@ -740,11 +751,16 @@ Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
data = self.parse_file_body(lines[1:-1])
assert isinstance(data,list),'Invalid file body data'
except Exception as e:
lcs = ', content line {}'.format(self.line_ctr) if self.line_ctr else ''
m = 'Invalid data in {} list file {!r}{} ({})'.format(self.data_desc,self.infile,lcs,e.args[0])
if exit_on_error: die(3,m)
msg(msg)
return False
m = 'Invalid data in {} list file {!r}{} ({!s})'.format(
self.data_desc,
self.infile,
(f', content line {self.line_ctr}' if self.line_ctr else ''),
e )
if exit_on_error:
die(3,m)
else:
msg(m)
return False
return data
@ -761,12 +777,12 @@ class KeyAddrList(AddrList):
class KeyList(AddrList):
msgs = {
'file_header': """
'file_header': f"""
# {pnm} key file
#
# This file is editable.
# Everything following a hash symbol '#' is a comment and ignored by {pnm}.
""".strip().format(pnm=pnm)
""".strip()
}
data_desc = 'key'
file_desc = 'secret keys'
@ -788,22 +804,22 @@ def is_xmrseed(s):
from collections import namedtuple
class PasswordList(AddrList):
msgs = {
'file_header': """
'file_header': f"""
# {pnm} password file
#
# This file is editable.
# Everything following a hash symbol '#' is a comment and ignored by {pnm}.
# A text label of {n} screen cells or less may be added to the right of each
# A text label of {TwComment.max_screen_width} screen cells or less may be added to the right of each
# password. The label may contain any printable ASCII symbol.
#
""".strip().format(n=TwComment.max_screen_width,pnm=pnm),
'file_header_mn': """
""".strip(),
'file_header_mn': f"""
# {pnm} {{}} password file
#
# This file is editable.
# Everything following a hash symbol '#' is a comment and ignored by {pnm}.
#
""".strip().format(pnm=pnm),
""".strip(),
'record_chksum': """
Record this checksum: it will be used to verify the password file in the future
""".strip()
@ -823,20 +839,26 @@ Record this checksum: it will be used to verify the password file in the future
dfl_pw_fmt = 'b58'
pwinfo = namedtuple('passwd_info',['min_len','max_len','dfl_len','valid_lens','desc','chk_func'])
pw_info = {
'b32': pwinfo(10, 42 ,24, None, 'base32 password', is_b32_str), # 32**24 < 2**128
'b58': pwinfo(8, 36 ,20, None, 'base58 password', is_b58_str), # 58**20 < 2**128
'bip39': pwinfo(12, 24 ,24, [12,18,24], 'BIP39 mnemonic', is_bip39_str),
'xmrseed': pwinfo(25, 25, 25, [25], 'Monero new-style mnemonic', is_xmrseed),
'hex': pwinfo(32, 64 ,64, [32,48,64], 'hexadecimal password', is_hex_str),
'b32': pwinfo(10, 42 ,24, None, 'base32 password', is_b32_str), # 32**24 < 2**128
'b58': pwinfo(8, 36 ,20, None, 'base58 password', is_b58_str), # 58**20 < 2**128
'bip39': pwinfo(12, 24 ,24, [12,18,24],'BIP39 mnemonic', is_bip39_str),
'xmrseed': pwinfo(25, 25, 25, [25], 'Monero new-style mnemonic',is_xmrseed),
'hex': pwinfo(32, 64 ,64, [32,48,64],'hexadecimal password', is_hex_str),
}
chksum_rec_f = lambda foo,e: (str(e.idx), e.passwd)
feature_warn_fs = 'WARNING: {!r} is a potentially dangerous feature. Use at your own risk!'
hex2bip39 = False
def __init__( self,infile=None,seed=None,
pw_idxs=None,pw_id_str=None,pw_len=None,pw_fmt=None,
chk_params_only=False):
def __init__(self,
infile = None,
seed = None,
pw_idxs = None,
pw_id_str = None,
pw_len = None,
pw_fmt = None,
chk_params_only = False
):
self.update_msgs()
@ -879,8 +901,10 @@ Record this checksum: it will be used to verify the password file in the future
self.pw_fmt = pw_fmt
self.pw_fmt_disp = pw_fmt
if self.pw_fmt not in self.pw_info:
m = '{!r}: invalid password format. Valid formats: {}'
raise InvalidPasswdFormat(m.format(self.pw_fmt,', '.join(sorted(self.pw_info))))
raise InvalidPasswdFormat(
'{!r}: invalid password format. Valid formats: {}'.format(
self.pw_fmt,
', '.join(self.pw_info) ))
def chk_pw_len(self,passwd=None):
if passwd is None:

View file

@ -302,8 +302,8 @@ class CoinDaemon(Daemon):
'bch': cd('Bcash', 'Bitcoin', 'bitcoind-abc','bitcoin-cli', 'bitcoin.conf', 'testnet3',8442,18442,18553),
'ltc': cd('Litecoin', 'Bitcoin', 'litecoind', 'litecoin-cli','litecoin.conf','testnet4',9332,19332,19444),
'xmr': cd('Monero', 'Monero', 'monerod', 'monerod', 'bitmonero.conf',None, 18081,None,None),
'eth': cd('Ethereum', 'Ethereum','parity', 'parity', 'parity.conf', None, 8545, None,None),
'etc': cd('Ethereum Classic','Ethereum','parity', 'parity', 'parity.conf', None, 8545, None,None)
'eth': cd('Ethereum', 'Ethereum','parity', 'parity', 'parity.conf', None, 8545, 8545,8545),
'etc': cd('Ethereum Classic','Ethereum','parity', 'parity', 'parity.conf', None, 8545, 8545,8545)
}
def __new__(cls,network_id,test_suite=False,flags=None):
@ -459,13 +459,16 @@ class BitcoinDaemon(CoinDaemon):
or "does not exist" in err ):
# regtest has no cookie file, so test will always fail
if self.lockfile and os.path.exists(self.lockfile):
return 'busy'
ret = 'busy'
else:
return 'stopped'
ret = 'stopped'
elif cp.returncode == 0:
return 'ready'
ret = 'ready'
else:
return 'busy'
ret = 'busy'
if self.debug:
print(f'State: {ret!r}')
return ret
@property
def stop_cmd(self):

View file

@ -131,4 +131,4 @@ if os.getenv('MMGEN_DEBUG') or os.getenv('MMGEN_TEST_SUITE') or os.getenv('MMGEN
def get_ndiff(a,b):
a = a.split('\n')
b = b.split('\n')
return ndiff(a,b)
return list(ndiff(a,b))

View file

@ -141,7 +141,7 @@ class GlobalContext:
proj_name.lower()
)
data_dir_root,data_dir,cfg_file = None,None,None
daemon_data_dir = '' # set by user or protocol
daemon_data_dir = '' # set by user
# global var sets user opt:
global_sets_opt = ( 'minconf','seed_len','hash_preset','usr_randchars','debug',

View file

@ -130,7 +130,7 @@ async def check_daemons_running():
if opt.coins:
coins = opt.coins.upper().split(',')
else:
ymsg('Warning: no coins specified, so defaulting to BTC only')
ymsg('Warning: no coins specified, defaulting to BTC')
coins = ['BTC']
for coin in coins:
@ -165,9 +165,9 @@ def do_mount():
try:
ds = os.stat(tx_dir)
assert S_ISDIR(ds.st_mode), f'{tx_dir!r} is not a directory!'
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR,f'{tx_dir!r} is not read/write for this user!'
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{tx_dir!r} is not read/write for this user!'
except:
die(1,'{tx_dir!r} missing, or not read/writable by user!')
die(1,f'{tx_dir!r} missing or not read/writable by user!')
def do_umount():
if os.path.ismount(mountpoint):
@ -229,9 +229,9 @@ async def sign():
fails.append(txfile)
qmsg('')
time.sleep(0.3)
msg('{} transaction{} signed'.format(len(signed_txs),suf(signed_txs)))
msg(f'{len(signed_txs)} transaction{suf(signed_txs)} signed')
if fails:
rmsg('{} transaction{} failed to sign'.format(len(fails),suf(fails)))
rmsg(f'{len(fails)} transaction{suf(fails)} failed to sign')
if signed_txs and not opt.no_summary:
print_summary(signed_txs)
if fails:
@ -248,7 +248,7 @@ def decrypt_wallets():
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
from .wallet import Wallet
msg("Unlocking wallet{} with key from '{}'".format(suf(wfs),opt.passwd_file))
msg(f'Unlocking wallet{suf(wfs)} with key from {opt.passwd_file!r}')
fails = 0
for wf in wfs:
try:
@ -321,13 +321,13 @@ def wipe_existing_key():
try: os.stat(fn)
except: pass
else:
msg('\nWiping existing key {}'.format(fn))
msg(f'\nWiping existing key {fn!r}')
run(['wipe','-cf',fn],check=True)
def create_key():
kdata = os.urandom(32).hex()
fn = os.path.join(tx_dir,key_fn)
desc = 'key file {}'.format(fn)
desc = f'key file {fn!r}'
msg('Creating ' + desc)
try:
open(fn,'w').write(kdata+'\n')
@ -347,7 +347,7 @@ def gen_key(no_unmount=False):
do_umount()
def remove_wallet_dir():
msg("Deleting '{}'".format(wallet_dir))
msg(f'Deleting {wallet_dir!r}')
try: shutil.rmtree(wallet_dir)
except: pass
@ -355,7 +355,7 @@ def create_wallet_dir():
try: os.mkdir(wallet_dir)
except: pass
try: os.stat(wallet_dir)
except: die(2,"Unable to create wallet directory '{}'".format(wallet_dir))
except: die(2,f'Unable to create wallet directory {wallet_dir!r}')
def setup():
remove_wallet_dir()
@ -399,7 +399,7 @@ async def do_loop():
await do_sign()
prev_status = status
if not n % 10:
msg_r('\r{}\rWaiting'.format(' '*17))
msg_r(f"\r{' '*17}\rWaiting")
sys.stderr.flush()
time.sleep(1)
msg_r('.')

View file

@ -147,12 +147,22 @@ sf = get_seed_file(cmd_args,1)
pw_fmt = opt.passwd_fmt or PasswordList.dfl_pw_fmt
pw_len = pwi[pw_fmt].dfl_len // 2 if opt.passwd_len in ('h','H') else opt.passwd_len
PasswordList(pw_id_str=pw_id_str,pw_len=pw_len,pw_fmt=pw_fmt,chk_params_only=True)
PasswordList(
pw_id_str = pw_id_str,
pw_len = pw_len,
pw_fmt = pw_fmt,
chk_params_only = True )
do_license_msg()
ss = Wallet(sf)
al = PasswordList(seed=ss.seed,pw_idxs=pw_idxs,pw_id_str=pw_id_str,pw_len=pw_len,pw_fmt=pw_fmt)
al = PasswordList(
seed = ss.seed,
pw_idxs = pw_idxs,
pw_id_str = pw_id_str,
pw_len = pw_len,
pw_fmt = pw_fmt )
al.format()

View file

@ -79,4 +79,5 @@ elif cmd_args[0] not in MMGenRegtest.usr_cmds:
die(1,'{!r}: invalid command'.format(cmd_args[0]))
elif cmd_args[0] not in ('cli','balances'):
check_num_args()
MMGenRegtest(g.coin).cmd(cmd_args)

View file

@ -82,7 +82,10 @@ column below:
},
'code': {
'options': lambda s: s.format(
g=g,pnm=g.proj_name,pnl=g.proj_name.lower(),dn=g.proto.daemon_name,
g=g,
pnm=g.proj_name,
pnl=g.proj_name.lower(),
dn=proto.daemon_name,
fu=help_notes('rel_fee_desc'),fl=help_notes('fee_spec_letters'),
kgs=' '.join(['{}:{}'.format(n,k) for n,k in enumerate(g.key_generators,1)]),
kg=g.key_generator,

View file

@ -98,7 +98,8 @@ column below:
kgs=' '.join(['{}:{}'.format(n,k) for n,k in enumerate(g.key_generators,1)]),
fu=help_notes('rel_fee_desc'),
fl=help_notes('fee_spec_letters'),
ss=g.subseeds,ss_max=SubSeedIdxRange.max_idx,
ss=g.subseeds,
ss_max=SubSeedIdxRange.max_idx,
fe_all=fmt_list(g.autoset_opts['fee_estimate_mode'].choices,fmt='no_spc'),
fe_dfl=g.autoset_opts['fee_estimate_mode'].choices[0],
kg=g.key_generator,

View file

@ -78,10 +78,14 @@ column below:
},
'code': {
'options': lambda s: s.format(
g=g,pnm=g.proj_name,pnl=g.proj_name.lower(),dn=g.proto.daemon_name,
g=g,
pnm=g.proj_name,
pnl=g.proj_name.lower(),
dn=g.proto.daemon_name,
kgs=' '.join(['{}:{}'.format(n,k) for n,k in enumerate(g.key_generators,1)]),
kg=g.key_generator,
ss=g.subseeds,ss_max=SubSeedIdxRange.max_idx,
ss=g.subseeds,
ss_max=SubSeedIdxRange.max_idx,
cu=g.coin),
'notes': lambda s: s.format(
help_notes('txsign'),

View file

@ -34,14 +34,14 @@ class aInitMeta(type):
await instance.__ainit__(*args,**kwargs)
return instance
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
def is_mmgen_id(s): return MMGenID(s,on_fail='silent')
def is_coin_addr(s): return CoinAddr(s,on_fail='silent')
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
def is_tw_label(s): return TwLabel(s,on_fail='silent')
def is_wif(s): return WifKey(s,on_fail='silent')
def is_viewkey(s): return ViewKey(s,on_fail='silent')
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
def is_mmgen_id(s): return MMGenID(s,on_fail='silent')
def is_coin_addr(s): return CoinAddr(s,on_fail='silent')
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
def is_tw_label(s): return TwLabel(s,on_fail='silent')
def is_wif(s): return WifKey(s,on_fail='silent')
def is_viewkey(s): return ViewKey(s,on_fail='silent')
def is_seed_split_specifier(s): return SeedSplitSpecifier(s,on_fail='silent')
def truncate_str(s,width): # width = screen width
@ -641,22 +641,23 @@ class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
color = 'orange'
width = 0
trunc_ok = False
def __new__(cls,s,on_fail='die'):
if type(s) == cls: return s
def __new__(cls,id_str,on_fail='die'):
if type(id_str) == cls:
return id_str
cls.arg_chk(on_fail)
ret = None
try:
ret = MMGenID(s,on_fail='raise')
ret = MMGenID(id_str,on_fail='raise')
sort_key,idtype = ret.sort_key,'mmgen'
except Exception as e:
try:
assert s.split(':',1)[0] == g.proto.base_coin.lower(),(
assert id_str.split(':',1)[0] == g.proto.base_coin.lower(),(
"not a string beginning with the prefix '{}:'".format(g.proto.base_coin.lower()))
assert set(s[4:]) <= set(ascii_letters+digits),'contains non-alphanumeric characters'
assert len(s) > 4,'not more that four characters long'
ret,sort_key,idtype = str(s),'z_'+s,'non-mmgen'
assert set(id_str[4:]) <= set(ascii_letters+digits),'contains non-alphanumeric characters'
assert len(id_str) > 4,'not more that four characters long'
ret,sort_key,idtype = str(id_str),'z_'+id_str,'non-mmgen'
except Exception as e2:
return cls.init_fail(e,s,e2=e2)
return cls.init_fail(e,id_str,e2=e2)
me = str.__new__(cls,ret)
me.obj = ret
@ -666,19 +667,20 @@ class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
# non-displaying container for TwMMGenID,TwComment
class TwLabel(str,InitErrors,MMGenObject):
def __new__(cls,s,on_fail='die'):
if type(s) == cls: return s
def __new__(cls,text,on_fail='die'):
if type(text) == cls:
return text
cls.arg_chk(on_fail)
try:
ss = s.split(None,1)
mmid = TwMMGenID(ss[0],on_fail='raise')
comment = TwComment(ss[1] if len(ss) == 2 else '',on_fail='raise')
ts = text.split(None,1)
mmid = TwMMGenID(ts[0],on_fail='raise')
comment = TwComment(ts[1] if len(ts) == 2 else '',on_fail='raise')
me = str.__new__(cls,'{}{}'.format(mmid,' {}'.format(comment) if comment else ''))
me.mmid = mmid
me.comment = comment
return me
except Exception as e:
return cls.init_fail(e,s)
return cls.init_fail(e,text)
class HexStr(str,Hilite,InitErrors):
color = 'red'
@ -712,15 +714,16 @@ class WifKey(str,Hilite,InitErrors):
"""
width = 53
color = 'blue'
def __new__(cls,s,on_fail='die'):
if type(s) == cls: return s
def __new__(cls,wif,on_fail='die'):
if type(wif) == cls:
return wif
cls.arg_chk(on_fail)
try:
assert set(s) <= set(ascii_letters+digits),'not an ascii alphanumeric string'
g.proto.parse_wif(s) # raises exception on error
return str.__new__(cls,s)
assert set(wif) <= set(ascii_letters+digits),'not an ascii alphanumeric string'
g.proto.parse_wif(wif) # raises exception on error
return str.__new__(cls,wif)
except Exception as e:
return cls.init_fail(e,s)
return cls.init_fail(e,wif)
class PubKey(HexStr,MMGenObject): # TODO: add some real checks
def __new__(cls,s,compressed,on_fail='die'):
@ -797,8 +800,7 @@ class AddrListID(str,Hilite,InitErrors,MMGenObject):
try:
assert type(sid) == SeedID,"{!r} not a SeedID instance".format(sid)
if not isinstance(mmtype,(MMGenAddrType,MMGenPasswordType)):
m = '{!r}: not an instance of MMGenAddrType or MMGenPasswordType'.format(mmtype)
raise ValueError(m.format(mmtype))
raise ValueError(f'{mmtype!r}: not an instance of MMGenAddrType or MMGenPasswordType')
me = str.__new__(cls,sid+':'+mmtype)
me.sid = sid
me.mmtype = mmtype
@ -913,24 +915,26 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
'Z': ati('zcash_z','zcash_z',False,'zcash_z', 'zcash_z', 'wif', ('viewkey',), 'Zcash z-address'),
'M': ati('monero', 'monero', False,'monero', 'monero', 'spendkey',('viewkey','wallet_passwd'),'Monero address'),
}
def __new__(cls,s,on_fail='die',errmsg=None):
if type(s) == cls: return s
def __new__(cls,id_str,on_fail='die',errmsg=None):
if type(id_str) == cls:
return id_str
cls.arg_chk(on_fail)
try:
for k,v in list(cls.mmtypes.items()):
if s in (k,v.name):
if s == v.name: s = k
me = str.__new__(cls,s)
if id_str in (k,v.name):
if id_str == v.name:
id_str = k
me = str.__new__(cls,id_str)
for k in v._fields:
setattr(me,k,getattr(v,k))
if me not in g.proto.mmtypes + ('P',):
raise ValueError(f'{me.name!r}: invalid address type for {g.proto.cls_name}')
return me
raise ValueError('unrecognized address type')
raise ValueError(f'{id_str}: unrecognized address type for protocol {proto.name}')
except Exception as e:
emsg = '{!r}\n'.format(errmsg) if errmsg else ''
m = '{}{!r}: invalid value for {} ({})'.format(emsg,s,cls.__name__,e.args[0])
return cls.init_fail(e,m,preformat=True)
return cls.init_fail( e,
f"{errmsg or ''}{id_str!r}: invalid value for {cls.__name__} ({e!s})",
preformat = True )
@classmethod
def get_names(cls):

View file

@ -55,7 +55,10 @@ def print_help(po,opts_data,opt_filter):
opts_data['text']['long_options'] = d
remove_unneeded_long_opts()
mmgen.share.Opts.print_help(po,opts_data,opt_filter) # exits
mmgen.share.Opts.print_help( # exits
po,
opts_data,
opt_filter )
def fmt_opt(o):
return '--' + o.replace('_','-')

View file

@ -457,8 +457,8 @@ class CoinProtocol(MMGenObject):
def init_proto(coin,testnet=False,regtest=False,network=None):
assert type(testnet) == bool
assert type(regtest) == bool
assert type(testnet) == bool, 'init_proto_chk1'
assert type(regtest) == bool, 'init_proto_chk2'
if network is None:
network = 'regtest' if regtest else 'testnet' if testnet else 'mainnet'

View file

@ -205,7 +205,7 @@ class RPCClient(MMGenObject):
try:
socket.create_connection((host,port),timeout=1).close()
except:
raise SocketError('Unable to connect to {}:{}'.format(host,port))
raise SocketError(f'Unable to connect to {host}:{port}')
self.http_hdrs = { 'Content-Type': 'application/json' }
self.url = f'{self.network_proto}://{host}:{port}{self.host_path}'

View file

@ -409,28 +409,39 @@ class MMGenToolCmdCoin(MMGenToolCmds):
def randwif(self):
"generate a random private key in WIF format"
init_generators('at')
return PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed).wif
return PrivKey(
get_random(32),
pubkey_type = at.pubkey_type,
compressed = at.compressed ).wif
def randpair(self):
"generate a random private key/address pair"
init_generators()
privhex = PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed)
privhex = PrivKey(
get_random(32),
pubkey_type = at.pubkey_type,
compressed = at.compressed )
addr = ag.to_addr(kg.to_pubhex(privhex))
return (privhex.wif,addr)
def wif2hex(self,wifkey:'sstr'):
"convert a private key from WIF to hex format"
return PrivKey(wif=wifkey)
return PrivKey(
wif = wifkey )
def hex2wif(self,privhex:'sstr'):
"convert a private key from hex to WIF format"
init_generators('at')
return PrivKey(bytes.fromhex(privhex),pubkey_type=at.pubkey_type,compressed=at.compressed).wif
return PrivKey(
bytes.fromhex(privhex),
pubkey_type = at.pubkey_type,
compressed = at.compressed ).wif
def wif2addr(self,wifkey:'sstr'):
"generate a coin address from a key in WIF format"
init_generators()
privhex = PrivKey(wif=wifkey)
privhex = PrivKey(
wif = wifkey )
addr = ag.to_addr(kg.to_pubhex(privhex))
return addr
@ -438,14 +449,16 @@ class MMGenToolCmdCoin(MMGenToolCmds):
"convert a WIF private key to a Segwit P2SH-P2WPKH redeem script"
assert opt.type == 'segwit','This command is meaningful only for --type=segwit'
init_generators()
privhex = PrivKey(wif=wifkey)
privhex = PrivKey(
wif = wifkey )
return ag.to_segwit_redeem_script(kg.to_pubhex(privhex))
def wif2segwit_pair(self,wifkey:'sstr'):
"generate both a Segwit P2SH-P2WPKH redeem script and address from WIF"
assert opt.type == 'segwit','This command is meaningful only for --type=segwit'
init_generators()
pubhex = kg.to_pubhex(PrivKey(wif=wifkey))
pubhex = kg.to_pubhex(PrivKey(
wif = wifkey ))
addr = ag.to_addr(pubhex)
rs = ag.to_segwit_redeem_script(pubhex)
return (rs,addr)
@ -453,7 +466,10 @@ class MMGenToolCmdCoin(MMGenToolCmds):
def privhex2addr(self,privhex:'sstr',output_pubhex=False):
"generate coin address from raw private key data in hexadecimal format"
init_generators()
pk = PrivKey(bytes.fromhex(privhex),compressed=at.compressed,pubkey_type=at.pubkey_type)
pk = PrivKey(
bytes.fromhex(privhex),
compressed = at.compressed,
pubkey_type = at.pubkey_type )
ph = kg.to_pubhex(pk)
return ph if output_pubhex else ag.to_addr(ph)
@ -833,7 +849,10 @@ class MMGenToolCmdWallet(MMGenToolCmds):
if ss.seed.sid != addr.sid:
m = 'Seed ID of requested address ({}) does not match wallet ({})'
die(1,m.format(addr.sid,ss.seed.sid))
al = AddrList(seed=ss.seed,addr_idxs=AddrIdxList(str(addr.idx)),mmtype=addr.mmtype)
al = AddrList(
seed = ss.seed,
addr_idxs = AddrIdxList(str(addr.idx)),
mmtype = addr.mmtype )
d = al.data[0]
ret = d.sec.wif if target=='wif' else d.addr
return ret

View file

@ -401,13 +401,19 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
elif action == 'd_days':
af = self.age_fmts_interactive
self.age_fmt = af[(af.index(self.age_fmt) + 1) % len(af)]
elif action == 'd_mmid': self.show_mmid = not self.show_mmid
elif action == 'd_mmid':
self.show_mmid = not self.show_mmid
elif action == 'd_group':
if self.can_group:
self.group = not self.group
elif action == 'd_redraw': pass
elif action == 'd_reverse': self.unspent.reverse(); self.reverse = not self.reverse
elif action == 'a_quit': msg(''); return self.unspent
elif action == 'd_redraw':
pass
elif action == 'd_reverse':
self.unspent.reverse()
self.reverse = not self.reverse
elif action == 'a_quit':
msg('')
return self.unspent
elif action == 'a_balance_refresh':
idx = self.get_idx_from_user(action)
if idx:
@ -546,7 +552,8 @@ class TwAddrList(MMGenDict,metaclass=aInitMeta):
def raw_list(self):
return [((k if k.type == 'mmgen' else 'Non-MMGen'),self[k]['addr'],self[k]['amt']) for k in self]
def coinaddr_list(self): return [self[k]['addr'] for k in self]
def coinaddr_list(self):
return [self[k]['addr'] for k in self]
async def format(self,showbtcaddrs,sort,show_age,age_fmt):
if not self.has_age:
@ -649,7 +656,11 @@ class TrackingWallet(MMGenObject,metaclass=aInitMeta):
tw_dir = (
os.path.join(g.data_dir,g.proto.data_subdir) if g.coin == 'BTC' else
os.path.join(g.data_dir_root,'altcoins',g.coin.lower(),g.proto.data_subdir) )
os.path.join(
g.data_dir_root,
'altcoins',
g.coin.lower(),
g.proto.data_subdir) )
self.tw_fn = os.path.join(tw_dir,'tracking-wallet.json')
check_or_create_dir(tw_dir)

View file

@ -70,7 +70,7 @@ class MMGenTxFile:
if len(tx_data) == 6:
assert len(tx_data[-1]) == 64,'invalid coin TxID length'
desc = f'{g.proto.name} TxID'
desc = f'coin TxID'
tx.coin_txid = CoinTxID(tx_data.pop(-1),on_fail='raise')
if len(tx_data) == 5:

View file

@ -54,17 +54,18 @@ def get_seed_for_seed_id(sid,infiles,saved_seeds):
subseeds_checked = True
if not seed: continue
elif opt.in_fmt:
qmsg('Need seed data for Seed ID {}'.format(sid))
qmsg(f'Need seed data for Seed ID {sid}')
seed = Wallet().seed
msg('User input produced Seed ID {}'.format(seed.sid))
msg(f'User input produced Seed ID {seed.sid}')
if not seed.sid == sid: # TODO: add test
seed = seed.subseed_by_seed_id(sid,print_msg=True)
if seed:
saved_seeds[seed.sid] = seed
if seed.sid == sid: return seed
if seed.sid == sid:
return seed
else:
die(2,'ERROR: No seed source found for Seed ID: {}'.format(sid))
die(2,f'ERROR: No seed source found for Seed ID: {sid}')
def generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds):
mmids = [e.mmid for e in need_keys]
@ -84,12 +85,15 @@ def generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds):
def add_keys(tx,src,infiles=None,saved_seeds=None,keyaddr_list=None):
need_keys = [e for e in getattr(tx,src) if e.mmid and not e.have_wif]
if not need_keys: return []
desc,m1 = ('key-address file','From key-address file:') if keyaddr_list else \
('seed(s)','Generated from seed:')
qmsg('Checking {} -> {} address mappings for {} (from {})'.format(pnm,g.coin,src,desc))
d = MMGenList([keyaddr_list]) if keyaddr_list else \
generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds)
if not need_keys:
return []
desc,src_desc = (
('key-address file','From key-address file:') if keyaddr_list else
('seed(s)','Generated from seed:') )
qmsg(f'Checking {g.proj_name} -> {g.coin} address mappings for {src} (from {desc})')
d = (
MMGenList([keyaddr_list]) if keyaddr_list else
generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds) )
new_keys = []
for e in need_keys:
for kal in d:
@ -101,9 +105,9 @@ def add_keys(tx,src,infiles=None,saved_seeds=None,keyaddr_list=None):
if src == 'inputs':
new_keys.append(f)
else:
die(3,wmsg['mapping_error'].format(m1,mmid,f.addr,'tx file:',e.mmid,e.addr))
die(3,wmsg['mapping_error'].format(src_desc,mmid,f.addr,'tx file:',e.mmid,e.addr))
if new_keys:
vmsg('Added {} wif key{} from {}'.format(len(new_keys),suf(new_keys),desc))
vmsg(f'Added {len(new_keys)} wif key{suf(new_keys)} from {desc}')
return new_keys
def _pop_and_return(args,cmplist): # strips found args
@ -111,7 +115,8 @@ def _pop_and_return(args,cmplist): # strips found args
def get_tx_files(opt,args):
ret = _pop_and_return(args,[MMGenTX.raw_ext])
if not ret: die(1,'You must specify a raw transaction file!')
if not ret:
die(1,'You must specify a raw transaction file!')
return ret
def get_seed_files(opt,args):
@ -142,12 +147,13 @@ def get_keylist(opt):
async def txsign(tx,seed_files,kl,kal,tx_num_str=''):
keys = MMGenList() # list of AddrListEntry objects
non_mm_addrs = tx.get_non_mmaddrs('inputs')
non_mmaddrs = tx.get_non_mmaddrs('inputs')
if non_mm_addrs:
if non_mmaddrs:
if not kl:
die(2,'Transaction has non-{} inputs, but no flat key list is present'.format(g.proj_name))
tmp = KeyAddrList(addrlist=non_mm_addrs)
tmp = KeyAddrList(
addrlist = non_mmaddrs )
tmp.add_wifs(kl)
m = tmp.list_missing('sec')
if m:
@ -161,12 +167,12 @@ async def txsign(tx,seed_files,kl,kal,tx_num_str=''):
keys += add_keys(tx,'inputs',seed_files,saved_seeds)
add_keys(tx,'outputs',seed_files,saved_seeds)
# this attr must not be written to file
# this (boolean) attr isn't needed in transaction file
tx.delete_attrs('inputs','have_wif')
tx.delete_attrs('outputs','have_wif')
extra_sids = set(saved_seeds) - tx.get_input_sids() - tx.get_output_sids()
if extra_sids:
msg('Unused Seed ID{}: {}'.format(suf(extra_sids),' '.join(extra_sids)))
msg(f"Unused Seed ID{suf(extra_sids)}: {' '.join(extra_sids)}")
return await tx.sign(tx_num_str,keys) # returns True or False

View file

@ -201,7 +201,7 @@ def check_or_create_dir(path):
try:
os.makedirs(path,0o700)
except:
die(2,"ERROR: unable to read or create path '{}'".format(path))
die(2,f'ERROR: unable to read or create path {path!r}')
from .opts import opt
@ -250,13 +250,12 @@ def suf(arg,suf_type='s',verb='none'):
die(2,'{}: invalid parameter for suf()'.format(arg))
return suf_types[verb][suf_type][n == 1]
def get_extension(f):
a,b = os.path.splitext(f)
return ('',b[1:])[len(b) > 1]
def get_extension(fn):
return os.path.splitext(fn)[1][1:]
def remove_extension(f,e):
a,b = os.path.splitext(f)
return (f,a)[len(b)>1 and b[1:]==e]
def remove_extension(fn,ext):
a,b = os.path.splitext(fn)
return a if b[1:] == ext else fn
def make_chksum_N(s,nchars,sep=False):
if isinstance(s,str): s = s.encode()

View file

@ -187,9 +187,9 @@ def create_src(code):
def check_version():
res = run(['solc','--version'],stdout=PIPE).stdout.decode()
ver = re.search(r'Version:\s*(.*)',res).group(1)
msg("Installed solc version: {}".format(ver))
msg(f'Installed solc version: {ver}')
if not re.search(r'{}\b'.format(solc_version_pat),ver):
ydie(1,'Incorrect Solidity compiler version (need version {})'.format(solc_version_pat))
ydie(1,f'Incorrect Solidity compiler version (need version {solc_version_pat})')
def compile_code(code):
check_version()
@ -197,14 +197,14 @@ def compile_code(code):
if not opt.stdout:
cmd += ['--output-dir', opt.outdir or '.']
cmd += ['-']
msg('Executing: {}'.format(' '.join(cmd)))
msg(f"Executing: {' '.join(cmd)}")
cp = run(cmd,input=code.encode(),stdout=PIPE,stderr=PIPE)
out = cp.stdout.decode().replace('\r','')
err = cp.stderr.decode().replace('\r','').strip()
if cp.returncode != 0:
rmsg('Solidity compiler produced the following error:')
msg(err)
rdie(2,'Solidity compiler exited with error (return val: {})'.format(cp.returncode))
rdie(2,f'Solidity compiler exited with error (return val: {cp.returncode})')
if err:
ymsg('Solidity compiler produced the following warning:')
msg(err)

View file

@ -45,7 +45,7 @@ import mmgen.tx
tx = mmgen.tx.MMGenTX(cmd_args[0])
if opt.verbose:
gmsg('Original transaction is in {} format'.format(g.coin))
gmsg(f'Original transaction is in {g.coin} format')
from mmgen.protocol import init_proto
g.proto = init_proto('bch')

View file

@ -83,7 +83,8 @@ class MMGenPexpect(object):
ret = self.p.wait()
if ret != self.req_exit_val and not opt.coverage:
die(1,red('test.py: spawned program exited with value {}'.format(ret)))
if opt.profile: return
if opt.profile:
return
if not self.skip_ok:
sys.stderr.write(green('OK\n') if opt.exact_output or opt.verbose else (' OK\n'))
return self

View file

@ -142,7 +142,7 @@ If no command is given, the whole test suite is run.
data_dir = os.path.join('test','data_dir' + ('','')[bool(os.getenv('MMGEN_DEBUG_UTF8'))])
# we need the values of two opts before running opts.init, so parse without initializing:
# we need some opt values before running opts.init, so parse without initializing:
_uopts = opts.init(opts_data,parse_only=True).user_opts
# step 1: delete data_dir symlink in ./test;

View file

@ -64,7 +64,8 @@ class TestSuiteAutosign(TestSuiteBase):
live=False,
simulate=False):
if self.skip_for_win(): return 'skip'
if self.skip_for_win():
return 'skip'
def make_wallet(opts):
t = self.spawn('mmgen-autosign',opts+['gen_key'],extra_desc='(gen_key)')

View file

@ -135,6 +135,8 @@ token_bals_getbalance = {
from .ts_base import *
from .ts_shared import *
coin = g.coin
class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
'Ethereum transacting, token deployment and tracking wallet operations'
networks = ('eth','etc')
@ -142,7 +144,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
tmpdir_nums = [22]
solc_vers = ('0.5.1','0.5.3') # 0.5.1: Raspbian Stretch, 0.5.3: Ubuntu Bionic
cmd_group = (
('setup', 'Ethereum Parity dev mode tests for coin {} (start parity)'.format(g.coin)),
('setup', 'Ethereum Parity dev mode tests for coin {} (start parity)'.format(coin)),
('wallet_upgrade1', 'upgrading the tracking wallet (v1 -> v2)'),
('wallet_upgrade2', 'upgrading the tracking wallet (v2 -> v3)'),
('addrgen', 'generating addresses'),
@ -154,17 +156,17 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('tx_status0_bad', 'getting the transaction status'),
('txsign1_ni', 'signing the transaction (non-interactive)'),
('txsend1', 'sending the transaction'),
('bal1', 'the {} balance'.format(g.coin)),
('bal1', 'the {} balance'.format(coin)),
('txcreate2', 'creating a transaction (spend from dev address to address :11)'),
('txsign2', 'signing the transaction'),
('txsend2', 'sending the transaction'),
('bal2', 'the {} balance'.format(g.coin)),
('bal2', 'the {} balance'.format(coin)),
('txcreate3', 'creating a transaction (spend from dev address to address :21)'),
('txsign3', 'signing the transaction'),
('txsend3', 'sending the transaction'),
('bal3', 'the {} balance'.format(g.coin)),
('bal3', 'the {} balance'.format(coin)),
('tx_status1', 'getting the transaction status'),
@ -174,14 +176,14 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('txsign4', 'signing the transaction'),
('txsend4', 'sending the transaction'),
('tx_status1a', 'getting the transaction status'),
('bal4', 'the {} balance'.format(g.coin)),
('bal4', 'the {} balance'.format(coin)),
('txcreate5', 'creating a transaction (fund burn address)'),
('txsign5', 'signing the transaction'),
('txsend5', 'sending the transaction'),
('addrimport_burn_addr',"importing burn address"),
('bal5', 'the {} balance'.format(g.coin)),
('bal5', 'the {} balance'.format(coin)),
('add_label1', 'adding a UTF-8 label (zh)'),
('chk_label1', 'the label'),
@ -196,7 +198,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('token_deploy1c', 'deploying ERC20 token #1 (Token)'),
('tx_status2', 'getting the transaction status'),
('bal6', 'the {} balance'.format(g.coin)),
('bal6', 'the {} balance'.format(coin)),
('token_compile2', 'compiling ERC20 token #2'),
@ -214,43 +216,43 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('token_addrimport', 'importing token addresses'),
('token_addrimport_batch','importing token addresses (dummy batch mode)'),
('bal7', 'the {} balance'.format(g.coin)),
('token_bal1', 'the {} balance and token balance'.format(g.coin)),
('bal7', 'the {} balance'.format(coin)),
('token_bal1', 'the {} balance and token balance'.format(coin)),
('token_txcreate1', 'creating a token transaction'),
('token_txsign1', 'signing the transaction'),
('token_txsend1', 'sending the transaction'),
('tx_status3', 'getting the transaction status'),
('token_bal2', 'the {} balance and token balance'.format(g.coin)),
('token_bal2', 'the {} balance and token balance'.format(coin)),
('token_txcreate2', 'creating a token transaction (to burn address)'),
('token_txbump', 'bumping the transaction fee'),
('token_txsign2', 'signing the transaction'),
('token_txsend2', 'sending the transaction'),
('token_bal3', 'the {} balance and token balance'.format(g.coin)),
('token_bal3', 'the {} balance and token balance'.format(coin)),
('del_dev_addr', "deleting the dev address"),
('bal1_getbalance', 'the {} balance (getbalance)'.format(g.coin)),
('bal1_getbalance', 'the {} balance (getbalance)'.format(coin)),
('addrimport_token_burn_addr',"importing the token burn address"),
('token_bal4', 'the {} balance and token balance'.format(g.coin)),
('token_bal4', 'the {} balance and token balance'.format(coin)),
('token_bal_getbalance','the token balance (getbalance)'),
('txcreate_noamt', 'creating a transaction (full amount send)'),
('txsign_noamt', 'signing the transaction'),
('txsend_noamt', 'sending the transaction'),
('bal8', 'the {} balance'.format(g.coin)),
('bal8', 'the {} balance'.format(coin)),
('token_bal5', 'the token balance'),
('token_txcreate_noamt', 'creating a token transaction (full amount send)'),
('token_txsign_noamt', 'signing the transaction'),
('token_txsend_noamt', 'sending the transaction'),
('bal9', 'the {} balance'.format(g.coin)),
('bal9', 'the {} balance'.format(coin)),
('token_bal6', 'the token balance'),
('listaddresses1', 'listaddresses'),
@ -268,7 +270,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('txdo_cached_balances', 'txdo (cached balances)'),
('txcreate_refresh_balances','refreshing balances'),
('bal10', 'the {} balance'.format(g.coin)),
('bal10', 'the {} balance'.format(coin)),
('token_txdo_cached_balances', 'token txdo (cached balances)'),
('token_txcreate_refresh_balances','refreshing token balances'),
@ -284,16 +286,16 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('token_twview2','twview --token=mm1 wide=1'),
('token_twview3','twview --token=mm1 wide=1 sort=age (ignored)'),
('edit_label1','adding label to addr #{} in {} tracking wallet (zh)'.format(del_addrs[0],g.coin)),
('edit_label2','adding label to addr #{} in {} tracking wallet (lat+cyr+gr)'.format(del_addrs[1],g.coin)),
('edit_label3','removing label from addr #{} in {} tracking wallet'.format(del_addrs[0],g.coin)),
('edit_label1','adding label to addr #{} in {} tracking wallet (zh)'.format(del_addrs[0],coin)),
('edit_label2','adding label to addr #{} in {} tracking wallet (lat+cyr+gr)'.format(del_addrs[1],coin)),
('edit_label3','removing label from addr #{} in {} tracking wallet'.format(del_addrs[0],coin)),
('token_edit_label1','adding label to addr #{} in {} token tracking wallet'.format(del_addrs[0],g.coin)),
('token_edit_label1','adding label to addr #{} in {} token tracking wallet'.format(del_addrs[0],coin)),
('remove_addr1','removing addr #{} from {} tracking wallet'.format(del_addrs[0],g.coin)),
('remove_addr2','removing addr #{} from {} tracking wallet'.format(del_addrs[1],g.coin)),
('token_remove_addr1','removing addr #{} from {} token tracking wallet'.format(del_addrs[0],g.coin)),
('token_remove_addr2','removing addr #{} from {} token tracking wallet'.format(del_addrs[1],g.coin)),
('remove_addr1','removing addr #{} from {} tracking wallet'.format(del_addrs[0],coin)),
('remove_addr2','removing addr #{} from {} tracking wallet'.format(del_addrs[1],coin)),
('token_remove_addr1','removing addr #{} from {} token tracking wallet'.format(del_addrs[0],coin)),
('token_remove_addr2','removing addr #{} from {} token tracking wallet'.format(del_addrs[1],coin)),
('stop', 'stopping parity'),
)
@ -472,7 +474,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
non_mmgen_inputs = 0,
interactive_fee = interactive_fee,
fee_res_fs = fee_res_fs,
eth_fee_res = True)
eth_fee_res = True )
def txbump(self,ext=',40000]{}.rawtx',fee='50G',add_args=[]):
ext = ext.format('' if g.debug_utf8 else '')
@ -520,7 +522,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
bal1 = token_bals_getbalance[idx][0]
bal2 = token_bals_getbalance[idx][1]
bal1 = Decimal(bal1)
if etc_adj and g.coin == 'ETC': bal1 += self.bal_corr
if etc_adj and g.coin == 'ETC':
bal1 += self.bal_corr
t = self.spawn('mmgen-tool', self.eth_args + extra_args + ['getbalance'])
t.expect(r'\n[0-9A-F]{8}: .* '+str(bal1),regex=True)
t.expect(r'\nNon-MMGen: .* '+bal2,regex=True)
@ -641,19 +644,25 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
from mmgen.altcoins.eth.tx import EthereumMMGenTX as etx
async def do_transfer():
for i in range(2):
tk = await TokenResolve(self.read_from_tmpfile('token_addr{}'.format(i+1)).strip())
imsg_r('\n' + await tk.info())
tk = await TokenResolve(
self.read_from_tmpfile(f'token_addr{i+1}').strip() )
imsg_r( '\n' + await tk.info() )
imsg('dev token balance (pre-send): {}'.format(await tk.get_balance(dfl_addr)))
imsg('Sending {} {} to address {} ({})'.format(amt,g.coin,usr_addrs[i],usr_mmaddrs[i]))
from mmgen.obj import ETHAmt
txid = await tk.transfer( dfl_addr, usr_addrs[i], amt, dfl_privkey,
start_gas = ETHAmt(60000,'wei'),
gasPrice = ETHAmt(8,'Gwei') )
txid = await tk.transfer(
dfl_addr,
usr_addrs[i],
amt,
dfl_privkey,
start_gas = ETHAmt(60000,'wei'),
gasPrice = ETHAmt(8,'Gwei') )
assert (await etx.get_exec_status(txid,True)) != 0,'Transfer of token funds failed. Aborting'
async def show_bals():
for i in range(2):
tk = await TokenResolve(self.read_from_tmpfile(f'token_addr{i+1}').strip())
tk = await TokenResolve(
self.read_from_tmpfile(f'token_addr{i+1}').strip() )
imsg('Token: {}'.format(await tk.get_symbol()))
imsg('dev token balance: {}'.format(await tk.get_balance(dfl_addr)))
imsg('usr token balance: {} ({} {})'.format(
@ -840,7 +849,9 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
def txcreate_refresh_balances(self,
bals=['2','3'],
args=['-B','--cached-balances','-i'],
total= '1000126.14829832312345678',adj_total=True,total_coin=g.coin):
total= '1000126.14829832312345678',
adj_total=True,
total_coin=g.coin):
if g.coin == 'ETC' and adj_total:
total = str(Decimal(total) + self.bal_corr)
t = self.spawn('mmgen-txcreate', self.eth_args + args)

View file

@ -287,8 +287,10 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
write_data_to_file(unspent_data_file,d,'Unspent outputs',quiet=True,ignore_opt_outdir=True)
os.environ['MMGEN_BOGUS_WALLET_DATA'] = unspent_data_file
bwd_msg = 'MMGEN_BOGUS_WALLET_DATA={}'.format(unspent_data_file)
if opt.print_cmdline: msg(bwd_msg)
if opt.log: self.tr.log_fd.write(bwd_msg + ' ')
if opt.print_cmdline:
msg(bwd_msg)
if opt.log:
self.tr.log_fd.write(bwd_msg + ' ')
if opt.verbose or opt.exact_output:
sys.stderr.write("Fake transaction wallet data written to file {!r}\n".format(unspent_data_file))
@ -327,9 +329,14 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
if non_mmgen_input:
from mmgen.obj import PrivKey
privkey = PrivKey(os.urandom(32),compressed=non_mmgen_input_compressed,pubkey_type='std')
privkey = PrivKey(
os.urandom(32),
compressed = non_mmgen_input_compressed,
pubkey_type = 'std' )
from mmgen.addr import AddrGenerator,KeyGenerator
rand_coinaddr = AddrGenerator('p2pkh').to_addr(KeyGenerator('std').to_pubhex(privkey))
rand_coinaddr = AddrGenerator(
'p2pkh'
).to_addr(KeyGenerator(g.proto,'std').to_pubhex(privkey))
of = joinpath(self.cfgs[non_mmgen_input]['tmpdir'],non_mmgen_fn)
write_data_to_file(
outfile = of,
@ -390,15 +397,15 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
return cmd_args + [tx_data[num]['addrfile'] for num in tx_data]
def txcreate_common(self,
sources=['1'],
non_mmgen_input='',
do_label=False,
txdo_args=[],
add_args=[],
view='n',
addrs_per_wallet=addrs_per_wallet,
non_mmgen_input_compressed=True,
cmdline_inputs=False):
sources = ['1'],
non_mmgen_input = '',
do_label = False,
txdo_args = [],
add_args = [],
view = 'n',
addrs_per_wallet = addrs_per_wallet,
non_mmgen_input_compressed = True,
cmdline_inputs = False )
if opt.verbose or opt.exact_output:
sys.stderr.write(green('Generating fake tracking wallet info\n'))
@ -417,7 +424,8 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
),'--outdir='+self.tr.trash_dir] + cmd_args[1:]
end_silence()
if opt.verbose or opt.exact_output: sys.stderr.write('\n')
if opt.verbose or opt.exact_output:
sys.stderr.write('\n')
t = self.spawn(
'mmgen-'+('txcreate','txdo')[bool(txdo_args)],
@ -446,14 +454,15 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
t.expect('Continue anyway? (y/N): ','y')
outputs_list = [(addrs_per_wallet+1)*i + 1 for i in range(len(tx_data))]
if non_mmgen_input: outputs_list.append(len(tx_data)*(addrs_per_wallet+1) + 1)
if non_mmgen_input:
outputs_list.append(len(tx_data)*(addrs_per_wallet+1) + 1)
self.txcreate_ui_common(t,
menu=(['M'],['M','D','m','g'])[self.test_name=='txcreate'],
inputs=' '.join(map(str,outputs_list)),
add_comment=('',tx_label_lat_cyr_gr)[do_label],
non_mmgen_inputs=(0,1)[bool(non_mmgen_input and not txdo_args)],
view=view)
menu = (['M'],['M','D','m','g'])[self.test_name=='txcreate'],
inputs = ' '.join(map(str,outputs_list)),
add_comment = ('',tx_label_lat_cyr_gr)[do_label],
non_mmgen_inputs = (0,1)[bool(non_mmgen_input and not txdo_args)],
view = view )
return t
@ -681,12 +690,23 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
return self.addrgen(wf,pf='')
def txcreate4(self,f1,f2,f3,f4,f5,f6):
return self.txcreate_common(sources=['1','2','3','4','14'],non_mmgen_input='4',do_label=True,view='y')
return self.txcreate_common(
sources = ['1', '2', '3', '4', '14'],
non_mmgen_input = '4',
do_label = True,
view = 'y' )
def txsign4(self,f1,f2,f3,f4,f5,f6):
non_mm_file = joinpath(self.tmpdir,non_mmgen_fn)
a = ['-d',self.tmpdir,'-i','brain','-b'+self.bw_params,'-p1','-k',non_mm_file,'-M',f6,f1,f2,f3,f4,f5]
t = self.spawn('mmgen-txsign',a)
add_args = [
'-d', self.tmpdir,
'-i', 'brain',
'-b' + self.bw_params,
'-p1',
'--keys-from-file=' + non_mm_file,
'--mmgen-keys-from-file=' + f6,
f1, f2, f3, f4, f5 ]
t = self.spawn('mmgen-txsign',add_args)
t.license()
t.do_decrypt_ka_data(hp='1',pw=self.cfgs['14']['kapasswd'])
t.view_tx('t')
@ -699,7 +719,13 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
def txdo4(self,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12):
non_mm_file = joinpath(self.tmpdir,non_mmgen_fn)
add_args = ['-d',self.tmpdir,'-i','brain','-b'+self.bw_params,'-p1','-k',non_mm_file,'-M',f12]
add_args = [
'-d', self.tmpdir,
'-i', 'brain',
'-b'+self.bw_params,
'-p1',
'--keys-from-file=' + non_mm_file,
'--mmgen-keys-from-file=' + f12 ]
self.get_file_with_ext('sigtx',delete_all=True) # delete tx signed by txsign4
t = self.txcreate_common(sources=['1','2','3','4','14'],
non_mmgen_input='4',do_label=True,txdo_args=[f7,f8,f9,f10],add_args=add_args)
@ -725,7 +751,10 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
return self.addrgen(wf,pf='')
def txcreate5(self,addrfile):
return self.txcreate_common(sources=['20'],non_mmgen_input='20',non_mmgen_input_compressed=False)
return self.txcreate_common(
sources = ['20'],
non_mmgen_input = '20',
non_mmgen_input_compressed = False )
def txsign5(self,wf,txf,bad_vsize=True,add_args=[]):
non_mm_file = joinpath(self.tmpdir,non_mmgen_fn)
@ -754,7 +783,10 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
def txcreate6(self,addrfile):
return self.txcreate_common(
sources=['21'],non_mmgen_input='21',non_mmgen_input_compressed=False,add_args=['--vsize-adj=1.08'])
sources = ['21'],
non_mmgen_input = '21',
non_mmgen_input_compressed = False,
add_args = ['--vsize-adj=1.08'] )
def txsign6(self,txf,wf):
return self.txsign5(txf,wf,bad_vsize=False,add_args=['--vsize-adj=1.08'])

View file

@ -137,7 +137,8 @@ class TestSuiteRefTX(TestSuiteMain,TestSuiteBase):
return TestSuiteMain.__init__(self,trunner,cfgs,spawn)
def ref_tx_addrgen(self,atype):
if atype not in g.proto.mmtypes: return
if atype not in g.proto.mmtypes:
return
t = self.spawn('mmgen-addrgen',['--outdir='+self.tmpdir,'--type='+atype,dfl_words_file,'1-2'])
t.read()
return t

View file

@ -369,8 +369,11 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
t.expect('Mined 1 block')
return t
def fund_bob(self): return self.fund_wallet('bob','C',rtFundAmt)
def fund_alice(self): return self.fund_wallet('alice',('L','S')[g.proto.cap('segwit')],rtFundAmt)
def fund_bob(self):
return self.fund_wallet('bob','C',rtFundAmt)
def fund_alice(self):
return self.fund_wallet('alice',('L','S')[g.proto.cap('segwit')],rtFundAmt)
def user_twview(self,user,chk=None,sort='age'):
t = self.spawn('mmgen-tool',['--'+user,'twview','sort='+sort])
@ -579,16 +582,24 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return [self.get_addr_from_addrlist(user,sid,mmtype,idx-1)+amt_str for mmtype,idx,amt_str in data]
def bob_rbf_1output_create(self):
if g.coin != 'BTC': return 'skip' # non-coin-dependent test, so run just once for BTC
if g.coin != 'BTC':
return 'skip' # non-coin-dependent test, so run just once for BTC
out_addr = self._create_tx_outputs('alice',(('B',5,''),))
t = self.spawn('mmgen-txcreate',['-d',self.tr.trash_dir,'-B','--bob','--rbf'] + out_addr)
return self.txcreate_ui_common(t,menu=[],inputs='3',interactive_fee='3s') # out amt: 199.99999343
def bob_rbf_1output_bump(self):
if g.coin != 'BTC': return 'skip'
if g.coin != 'BTC':
return 'skip'
ext = '9343,3]{x}.testnet.rawtx'.format(x='' if g.debug_utf8 else '')
txfile = get_file_with_ext(self.tr.trash_dir,ext,delete=False,no_dot=True)
return self.user_txbump('bob',self.tr.trash_dir,txfile,'8s',has_label=False,signed_tx=False,one_output=True)
return self.user_txbump('bob',
self.tr.trash_dir,
txfile,
'8s',
has_label = False,
signed_tx = False,
one_output = True )
def bob_send_maybe_rbf(self):
outputs_cl = self._create_tx_outputs('alice',(('L',1,',60'),('C',1,',40'))) # alice_sid:L:1, alice_sid:C:1
@ -610,7 +621,8 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return self.user_txdo('alice',None,outputs_cl,'1') # fee=None
def user_txbump(self,user,outdir,txfile,fee,add_args=[],has_label=True,signed_tx=True,one_output=False):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
os.environ['MMGEN_BOGUS_SEND'] = ''
t = self.spawn('mmgen-txbump',
['-d',outdir,'--'+user,'--tx-fee='+fee,'--output-to-reduce=c'] + add_args + [txfile])
@ -658,17 +670,20 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return 'ok'
def bob_rbf_status(self,fee,exp1,exp2=''):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
ext = ',{}]{x}.testnet.sigtx'.format(fee[:-1],x='' if g.debug_utf8 else '')
txfile = self.get_file_with_ext(ext,delete=False,no_dot=True)
return self.user_txsend_status('bob',txfile,exp1,exp2)
def bob_rbf_status1(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
return self.bob_rbf_status(rtFee[1],'in mempool, replaceable')
def get_mempool2(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
mp = self._get_mempool()
if len(mp) != 1:
rdie(2,'Mempool has more or less than one TX!')
@ -679,28 +694,33 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return 'ok'
def bob_rbf_status2(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Transaction has been replaced','{} in mempool'.format(new_txid))
def bob_rbf_status3(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
return self.bob_rbf_status(rtFee[2],'status: in mempool, replaceable')
def bob_rbf_status4(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 1 confirmation',
'Replacing transactions:\s+{}'.format(new_txid))
def bob_rbf_status5(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
return self.bob_rbf_status(rtFee[2],'Transaction has 1 confirmation')
def bob_rbf_status6(self):
if not g.proto.cap('rbf'): return 'skip'
if not g.proto.cap('rbf'):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 2 confirmations',
@ -750,7 +770,10 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
amts = (1.12345678,2.87654321,3.33443344,4.00990099,5.43214321)
outputs1 = list(map('{},{}'.format,addrs,amts))
sid = self._user_sid('bob')
l1,l2 = (':S',':B') if 'B' in g.proto.mmtypes else (':S',':S') if g.proto.cap('segwit') else (':L',':L')
l1,l2 = (
(':S',':B') if 'B' in g.proto.mmtypes else
(':S',':S') if g.proto.cap('segwit') else
(':L',':L') )
outputs2 = [sid+':C:2,6.333', sid+':L:3,6.667',sid+l1+':4,0.123',sid+l2+':5']
return self.user_txdo('bob',rtFee[5],outputs1+outputs2,'1-2')

View file

@ -37,7 +37,8 @@ from mmgen.baseconv import *
NL = ('\n','\r\n')[g.platform=='win']
def is_str(s): return type(s) == str
def is_str(s):
return type(s) == str
def md5_hash(s):
from hashlib import md5
@ -781,7 +782,7 @@ def run_test(gid,cmd_name):
data = data[k]
m2 = ' ({})'.format(k)
else:
qmsg("-- no data for {} ({}) - skipping".format(cmd_name,k))
qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
return
else:
if g.coin != 'BTC' or g.proto.testnet:
@ -805,7 +806,7 @@ def run_test(gid,cmd_name):
if m:
return { b'None': None, b'False': False }[m.group(1)]
else:
ydie(1,'Spawned program exited with error: {}'.format(cp.stderr))
ydie(1,f'Spawned program exited with error: {cp.stderr}')
return cmd_out.strip()
@ -878,32 +879,39 @@ def run_test(gid,cmd_name):
except: vmsg('Output:\n{}\n'.format(repr(cmd_out)))
def check_output(out,chk):
if isinstance(chk,str): chk = chk.encode()
if isinstance(out,int): out = str(out).encode()
if isinstance(out,str): out = out.encode()
if isinstance(chk,str):
chk = chk.encode()
if isinstance(out,int):
out = str(out).encode()
if isinstance(out,str):
out = out.encode()
err_fs = "Output ({!r}) doesn't match expected output ({!r})"
try: outd = out.decode()
except: outd = None
if type(chk).__name__ == 'function':
assert chk(outd),"{}({}) failed!".format(chk.__name__,outd)
assert chk(outd), f'{chk.__name__}({outd}) failed!'
elif type(chk) == dict:
for k,v in chk.items():
if k == 'boolfunc':
assert v(outd),"{}({}) failed!".format(v.__name__,outd)
assert v(outd), f'{v.__name__}({outd}) failed!'
elif k == 'value':
assert outd == v, err_fs.format(outd,v)
else:
outval = getattr(__builtins__,k)(out)
if outval != v:
die(1,"{}({}) returned {}, not {}!".format(k,out,outval,v))
die(1,f'{k}({out}) returned {outval}, not {v}!')
elif chk is not None:
assert out == chk, err_fs.format(out,chk)
if type(out) == tuple and type(out[0]).__name__ == 'function':
func_out = out[0](cmd_out)
assert func_out == out[1],(
"{}({}) == {} failed!\nOutput: {}".format(out[0].__name__,cmd_out,out[1],func_out))
'{}({}) == {} failed!\nOutput: {}'.format(
out[0].__name__,
cmd_out,
out[1],
func_out ))
elif isinstance(out,(list,tuple)):
for co,o in zip(cmd_out.split(NL) if opt.fork else cmd_out,out):
check_output(co,o)
@ -918,13 +926,13 @@ def docstring_head(obj):
return obj.__doc__.strip().split('\n')[0]
def do_group(gid):
qmsg(blue("Testing {}".format(
"command group '{}'".format(gid) if opt.names
else docstring_head(tc.classes['MMGenToolCmd'+gid]))))
qmsg(blue('Testing ' +
f'command group {gid!r}' if opt.names else
docstring_head(tc.classes['MMGenToolCmd'+gid]) ))
for cname in tc.classes['MMGenToolCmd'+gid].user_commands:
if cname not in tests[gid]:
m = 'No test for command {!r} in group {!r}!'.format(cname,gid)
m = f'No test for command {cname!r} in group {gid!r}!'
if opt.die_on_missing:
die(1,m+' Aborting')
else:
@ -958,7 +966,9 @@ tc = tool.MMGenToolCmds
if opt.list_tests:
Msg('Available tests:')
for gid in tests:
Msg(' {:6} - {}'.format(gid,docstring_head(tc.classes['MMGenToolCmd'+gid])))
Msg(' {:6} - {}'.format(
gid,
docstring_head(tc.classes['MMGenToolCmd'+gid]) ))
sys.exit(0)
if opt.list_tested_cmds:
@ -993,19 +1003,22 @@ else:
start_time = int(time.time())
try:
if cmd_args:
for cmd in cmd_args:
if cmd in tests:
do_group(cmd)
else:
if not do_cmd_in_group(cmd):
die(1,"'{}': not a recognized test or test group".format(cmd))
else:
for garg in tests:
do_group(garg)
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
def main():
try:
if cmd_args:
for cmd in cmd_args:
if cmd in tests:
await do_group(cmd)
else:
if not do_cmd_in_group(cmd):
die(1,f'{cmd!r}: not a recognized test or test group')
else:
for garg in tests:
await do_group(garg)
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
main()
t = int(time.time()) - start_time
gmsg('All requested tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60))

View file

@ -88,15 +88,16 @@ class unit_test(object):
extra_desc,
'' if opt.quiet else '\n'))
else:
Msg_r('Testing transactions from {!r}'.format(fn))
if not opt.quiet: Msg('')
Msg_r(f'Testing {extra_desc} transactions from {fn!r}')
if not opt.quiet:
Msg('')
async def test_core_vectors():
self._get_core_repo_root()
fn_b = 'src/test/data/tx_valid.json'
fn = os.path.join(self.core_repo_root,fn_b)
data = json.loads(open(fn).read())
print_info(fn_b,'Core test vectors')
print_info(fn_b,'Core test vector')
n = 1
for e in data:
if type(e[0]) == list:
@ -112,7 +113,7 @@ class unit_test(object):
('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'),
# ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx')
)
print_info('test/ref/*rawtx','MMGen reference transactions')
print_info('test/ref/*rawtx','MMGen reference')
g.rpc_port = None
for n,(coin,testnet,fn) in enumerate(fns):
g.proto = init_proto(coin,testnet=testnet)