whitespace: tests, top level

This commit is contained in:
The MMGen Project 2024-10-18 10:32:12 +00:00
commit 74bc49f973
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
11 changed files with 835 additions and 801 deletions

View file

@ -19,20 +19,20 @@ try:
except ImportError:
from test.include import test_init
from mmgen.cfg import gc,Config
from mmgen.cfg import gc, Config
from mmgen.util import msg
from mmgen.altcoin.params import CoinInfo
def test_equal(desc,a,b,*cdata):
def test_equal(desc, a, b, *cdata):
if type(a) is int:
a = hex(a)
b = hex(b)
(network,coin,_,b_desc,verbose) = cdata
(network, coin, _, b_desc, verbose) = cdata
if verbose:
msg(f' {desc:20}: {a!r}')
if a != b:
raise ValueError(
f'{desc.capitalize()}s for {coin.upper()} {network} do not match:\n CoinInfo: {a}\n {b_desc}: {b}' )
f'{desc.capitalize()}s for {coin.upper()} {network} do not match:\n CoinInfo: {a}\n {b_desc}: {b}')
class TestCoinInfo(CoinInfo):
@ -40,41 +40,41 @@ class TestCoinInfo(CoinInfo):
# No check for segwit, p2sh check skipped if source doesn't support it
cross_checks = {
'2GIVE': ['wn'],
'42': ['vg','wn'],
'42': ['vg', 'wn'],
'611': ['wn'],
'AC': ['lb','vg'],
'AC': ['lb', 'vg'],
'ACOIN': ['wn'],
'ALF': ['wn'],
'ANC': ['vg','wn'],
'ANC': ['vg', 'wn'],
'APEX': ['wn'],
'ARCO': ['wn'],
'ARG': ['pc'],
'AUR': ['vg','wn'],
'AUR': ['vg', 'wn'],
'BCH': ['wn'],
'BLK': ['lb','vg','wn'],
'BQC': ['vg','wn'],
'BLK': ['lb', 'vg', 'wn'],
'BQC': ['vg', 'wn'],
'BSTY': ['wn'],
'BTC': ['lb','vg','wn'],
'BTCD': ['lb','vg','wn'],
'BTC': ['lb', 'vg', 'wn'],
'BTCD': ['lb', 'vg', 'wn'],
'BUCKS': ['wn'],
'CASH': ['wn'],
'CBX': ['wn'],
'CCN': ['lb','vg','wn'],
'CDN': ['lb','vg','wn'],
'CCN': ['lb', 'vg', 'wn'],
'CDN': ['lb', 'vg', 'wn'],
'CHC': ['wn'],
'CLAM': ['lb','vg'],
'CON': ['vg','wn'],
'CLAM': ['lb', 'vg'],
'CON': ['vg', 'wn'],
'CPC': ['wn'],
'DASH': ['lb','pc','vg','wn'],
'DASH': ['lb', 'pc', 'vg', 'wn'],
'DCR': ['pc'],
'DFC': ['pc'],
'DGB': ['lb','vg'],
'DGC': ['lb','vg','wn'],
'DOGE': ['lb','pc','vg','wn'],
'DOGED': ['lb','vg','wn'],
'DOPE': ['lb','vg'],
'DVC': ['vg','wn'],
'EFL': ['lb','vg','wn'],
'DGB': ['lb', 'vg'],
'DGC': ['lb', 'vg', 'wn'],
'DOGE': ['lb', 'pc', 'vg', 'wn'],
'DOGED': ['lb', 'vg', 'wn'],
'DOPE': ['lb', 'vg'],
'DVC': ['vg', 'wn'],
'EFL': ['lb', 'vg', 'wn'],
'EMC': ['vg'],
'EMD': ['wn'],
'ESP': ['wn'],
@ -85,162 +85,162 @@ class TestCoinInfo(CoinInfo):
'FLO': ['wn'],
'FLT': ['wn'],
'FST': ['wn'],
'FTC': ['lb','pc','vg','wn'],
'GCR': ['lb','vg'],
'FTC': ['lb', 'pc', 'vg', 'wn'],
'GCR': ['lb', 'vg'],
'GOOD': ['wn'],
'GRC': ['vg','wn'],
'GUN': ['vg','wn'],
'HAM': ['vg','wn'],
'GRC': ['vg', 'wn'],
'GUN': ['vg', 'wn'],
'HAM': ['vg', 'wn'],
'HTML5': ['wn'],
'HYP': ['wn'],
'ICASH': ['wn'],
'INFX': ['wn'],
'IPC': ['wn'],
'JBS': ['lb','pc','vg','wn'],
'JBS': ['lb', 'pc', 'vg', 'wn'],
'JUDGE': ['wn'],
'LANA': ['wn'],
'LAT': ['wn'],
'LDOGE': ['wn'],
'LMC': ['wn'],
'LTC': ['lb','vg','wn'],
'LTC': ['lb', 'vg', 'wn'],
'MARS': ['wn'],
'MEC': ['pc','wn'],
'MEC': ['pc', 'wn'],
'MINT': ['wn'],
'MOBI': ['wn'],
'MONA': ['lb','vg'],
'MONA': ['lb', 'vg'],
'MOON': ['wn'],
'MUE': ['lb','vg'],
'MUE': ['lb', 'vg'],
'MXT': ['wn'],
'MYR': ['pc'],
'MYRIAD': ['vg','wn'],
'MZC': ['lb','pc','vg','wn'],
'NEOS': ['lb','vg'],
'MYRIAD': ['vg', 'wn'],
'MZC': ['lb', 'pc', 'vg', 'wn'],
'NEOS': ['lb', 'vg'],
'NEVA': ['wn'],
'NKA': ['wn'],
'NLG': ['vg','wn'],
'NMC': ['lb','vg'],
'NVC': ['lb','vg','wn'],
'OK': ['lb','vg'],
'OMC': ['vg','wn'],
'ONION': ['vg','wn'],
'NLG': ['vg', 'wn'],
'NMC': ['lb', 'vg'],
'NVC': ['lb', 'vg', 'wn'],
'OK': ['lb', 'vg'],
'OMC': ['vg', 'wn'],
'ONION': ['vg', 'wn'],
'PART': ['wn'],
'PINK': ['vg','wn'],
'PINK': ['vg', 'wn'],
'PIVX': ['wn'],
'PKB': ['lb','vg','wn'],
'PND': ['lb','vg','wn'],
'POT': ['lb','vg','wn'],
'PPC': ['lb','vg','wn'],
'PTC': ['vg','wn'],
'PKB': ['lb', 'vg', 'wn'],
'PND': ['lb', 'vg', 'wn'],
'POT': ['lb', 'vg', 'wn'],
'PPC': ['lb', 'vg', 'wn'],
'PTC': ['vg', 'wn'],
'PXC': ['wn'],
'QRK': ['wn'],
'RAIN': ['wn'],
'RBT': ['wn'],
'RBY': ['lb','vg'],
'RDD': ['vg','wn'],
'RIC': ['pc','vg','wn'],
'SDC': ['lb','vg'],
'RBY': ['lb', 'vg'],
'RDD': ['vg', 'wn'],
'RIC': ['pc', 'vg', 'wn'],
'SDC': ['lb', 'vg'],
'SIB': ['wn'],
'SMLY': ['wn'],
'SONG': ['wn'],
'SPR': ['vg','wn'],
'START': ['lb','vg'],
'SPR': ['vg', 'wn'],
'START': ['lb', 'vg'],
'SYS': ['wn'],
'TAJ': ['wn'],
'TIT': ['wn'],
'TPC': ['lb','vg'],
'TPC': ['lb', 'vg'],
'TRC': ['wn'],
'TTC': ['wn'],
'TX': ['wn'],
'UNO': ['pc','vg','wn'],
'VIA': ['lb','pc','vg','wn'],
'VPN': ['lb','vg'],
'VTC': ['lb','vg','wn'],
'WDC': ['vg','wn'],
'UNO': ['pc', 'vg', 'wn'],
'VIA': ['lb', 'pc', 'vg', 'wn'],
'VPN': ['lb', 'vg'],
'VTC': ['lb', 'vg', 'wn'],
'WDC': ['vg', 'wn'],
'WISC': ['wn'],
'WKC': ['vg','wn'],
'WKC': ['vg', 'wn'],
'WSX': ['wn'],
'XCN': ['wn'],
'XGB': ['wn'],
'XPM': ['lb','vg','wn'],
'XPM': ['lb', 'vg', 'wn'],
'XST': ['wn'],
'XVC': ['wn'],
'ZET': ['wn'],
'ZOOM': ['lb','vg'],
'ZRC': ['lb','vg']
'ZOOM': ['lb', 'vg'],
'ZRC': ['lb', 'vg']
}
@classmethod
def verify_leading_symbols(cls,quiet=False,verbose=False):
def verify_leading_symbols(cls, quiet=False, verbose=False):
for network in ('mainnet','testnet'):
for network in ('mainnet', 'testnet'):
for coin in [e.symbol for e in cls.coin_constants[network]]:
e = cls.get_entry(coin,network)
cdata = (network,coin,e,'Computed value',verbose)
e = cls.get_entry(coin, network)
cdata = (network, coin, e, 'Computed value', verbose)
if not quiet:
msg(f'{coin} {network}')
vn_info = e.p2pkh_info
ret = cls.find_addr_leading_symbol(vn_info[0])
test_equal('P2PKH leading symbol',vn_info[1],ret,*cdata)
test_equal('P2PKH leading symbol', vn_info[1], ret, *cdata)
vn_info = e.p2sh_info
if vn_info:
ret = cls.find_addr_leading_symbol(vn_info[0])
test_equal('P2SH leading symbol',vn_info[1],ret,*cdata)
test_equal('P2SH leading symbol', vn_info[1], ret, *cdata)
@classmethod
def verify_core_coin_data(cls,cfg,quiet=False,verbose=False):
from mmgen.protocol import CoinProtocol,init_proto
def verify_core_coin_data(cls, cfg, quiet=False, verbose=False):
from mmgen.protocol import CoinProtocol, init_proto
for network in ('mainnet','testnet'):
for network in ('mainnet', 'testnet'):
for coin in gc.core_coins:
e = cls.get_entry(coin,network)
e = cls.get_entry(coin, network)
if e:
proto = init_proto( cfg, coin, network=network )
cdata = (network,coin,e,type(proto).__name__,verbose)
proto = init_proto(cfg, coin, network=network)
cdata = (network, coin, e, type(proto).__name__, verbose)
if not quiet:
msg(f'Verifying {coin.upper()} {network}')
if coin != 'bch': # TODO
test_equal('coin name',e.name,proto.name,*cdata)
test_equal('coin name', e.name, proto.name, *cdata)
if e.trust_level != -1:
test_equal('Trust level',e.trust_level,CoinProtocol.coins[coin].trust_level,*cdata)
test_equal('Trust level', e.trust_level, CoinProtocol.coins[coin].trust_level, *cdata)
test_equal(
'WIF version number',
e.wif_ver_num,
int.from_bytes(proto.wif_ver_bytes['std'],'big'),
*cdata )
int.from_bytes(proto.wif_ver_bytes['std'], 'big'),
*cdata)
test_equal(
'P2PKH version number',
e.p2pkh_info[0],
int.from_bytes(proto.addr_fmt_to_ver_bytes['p2pkh'],'big'),
*cdata )
int.from_bytes(proto.addr_fmt_to_ver_bytes['p2pkh'], 'big'),
*cdata)
test_equal(
'P2SH version number',
e.p2sh_info[0],
int.from_bytes(proto.addr_fmt_to_ver_bytes['p2sh'],'big'),
*cdata )
int.from_bytes(proto.addr_fmt_to_ver_bytes['p2sh'], 'big'),
*cdata)
# Data is one of the coin_constants lists above. Normalize ints to hex of correct width, add
# missing leading letters, set trust level from external_tests.
# Insert a coin entry from outside source, set version info leading letters to '?' and trust level
# to 0, then run TestCoinInfo.fix_table(data). 'has_segwit' field is updated manually for now.
@classmethod
def fix_table(cls,data):
def fix_table(cls, data):
import re
def myhex(n):
return '0x{:0{}x}'.format(n,2 if n < 256 else 4)
return '0x{:0{}x}'.format(n, 2 if n < 256 else 4)
def fix_ver_info(e,k):
def fix_ver_info(e, k):
e[k] = list(e[k])
e[k][0] = myhex(e[k][0])
s1 = cls.find_addr_leading_symbol(int(e[k][0][2:],16))
s1 = cls.find_addr_leading_symbol(int(e[k][0][2:], 16))
m = f'Fixing leading address letter for coin {e["symbol"]} ({e[k][1]!r} --> {s1})'
if e[k][1] != '?':
assert s1 == e[k][1], f'First letters do not match! {m}'
@ -263,17 +263,17 @@ class TestCoinInfo(CoinInfo):
for e in data:
e = e._asdict()
e['wif_ver_num'] = myhex(e['wif_ver_num'])
sym,trust = e['symbol'],e['trust_level']
sym, trust = e['symbol'], e['trust_level']
fix_ver_info(e,'p2pkh_info')
if isinstance(e['p2sh_info'],tuple):
fix_ver_info(e,'p2sh_info')
fix_ver_info(e, 'p2pkh_info')
if isinstance(e['p2sh_info'], tuple):
fix_ver_info(e, 'p2sh_info')
for k in e.keys():
e[k] = repr(e[k])
e[k] = re.sub(r"'0x(..)'",r'0x\1',e[k])
e[k] = re.sub(r"'0x(....)'",r'0x\1',e[k])
e[k] = re.sub(r' ',r'',e[k]) + ('',',')[k != 'trust_level']
e[k] = re.sub(r"'0x(..)'", r'0x\1', e[k])
e[k] = re.sub(r"'0x(....)'", r'0x\1', e[k])
e[k] = re.sub(r' ', r'', e[k]) + ('', ',')[k != 'trust_level']
if trust != -1:
if sym in tt:
@ -295,29 +295,29 @@ class TestCoinInfo(CoinInfo):
msg(f'Processed {len(data)} entries')
@classmethod
def find_addr_leading_symbol(cls,ver_num,verbose=False):
def find_addr_leading_symbol(cls, ver_num, verbose=False):
if ver_num == 0:
return '1'
def phash2addr(ver_num,pk_hash):
def phash2addr(ver_num, pk_hash):
from mmgen.proto.btc.common import b58chk_encode
bl = ver_num.bit_length()
ver_bytes = int.to_bytes(ver_num,bl//8 + bool(bl%8),'big')
ver_bytes = int.to_bytes(ver_num, bl//8 + bool(bl%8), 'big')
return b58chk_encode(ver_bytes + pk_hash)
low = phash2addr(ver_num,b'\x00'*20)
high = phash2addr(ver_num,b'\xff'*20)
low = phash2addr(ver_num, b'\x00'*20)
high = phash2addr(ver_num, b'\xff'*20)
if verbose:
print('low address: ' + low)
print('high address: ' + high)
l1,h1 = low[0],high[0]
return (l1,h1) if l1 != h1 else l1
l1, h1 = low[0], high[0]
return (l1, h1) if l1 != h1 else l1
@classmethod
def print_symbols(cls,include_names=False,reverse=False):
def print_symbols(cls, include_names=False, reverse=False):
for e in cls.coin_constants['mainnet']:
if reverse:
print(f'{e.symbol:6} {e.name}')
@ -340,10 +340,10 @@ class TestCoinInfo(CoinInfo):
tt[k] = cls.trust_override[k]
return tt
trust_override = {'BTC':3,'BCH':3,'LTC':3,'DASH':1,'EMC':2}
trust_override = {'BTC':3, 'BCH':3, 'LTC':3, 'DASH':1, 'EMC':2}
@classmethod
def get_test_support(cls,coin,addr_type,network,toolname=None,verbose=False):
def get_test_support(cls, coin, addr_type, network, toolname=None, verbose=False):
"""
If requested tool supports coin/addr_type/network triplet, return tool name.
If 'tool' is None, return tool that supports coin/addr_type/network triplet.
@ -359,11 +359,11 @@ class TestCoinInfo(CoinInfo):
if verbose:
m1 = 'Requested tool {t!r} does not support coin {c} on network {n}'
m2 = 'No test tool found for coin {c} on network {n}'
msg((m1 if toolname else m2).format(t=tool,c=coin,n=network))
msg((m1 if toolname else m2).format(t=tool, c=coin, n=network))
return None
if addr_type == 'zcash_z':
if toolname in (None,'zcash-mini'):
if toolname in (None, 'zcash-mini'):
return 'zcash-mini'
else:
if verbose:
@ -383,7 +383,7 @@ class TestCoinInfo(CoinInfo):
if toolname: # skip whitelists
return tool
if addr_type in ('segwit','bech32'):
if addr_type in ('segwit', 'bech32'):
st = cls.external_tests_segwit_whitelist
if addr_type in st and coin in st[addr_type]:
return tool
@ -391,7 +391,7 @@ class TestCoinInfo(CoinInfo):
if verbose:
m1 = 'Requested tool {t!r} does not support coin {c}, addr_type {a!r}, on network {n}'
m2 = 'No test tool found supporting coin {c}, addr_type {a!r}, on network {n}'
msg((m1 if toolname else m2).format(t=tool,c=coin,n=network,a=addr_type))
msg((m1 if toolname else m2).format(t=tool, c=coin, n=network, a=addr_type))
return None
return tool
@ -403,19 +403,18 @@ class TestCoinInfo(CoinInfo):
'pycoin': (
'DASH', # only compressed
'BCH',
'BTC','LTC','VIA','FTC','DOGE','MEC',
'JBS','MZC','RIC','DFC','FAI','ARG','ZEC','DCR'),
'keyconv': (
'BCH',
# broken: PIVX
'42','AC','AIB','ANC','ARS','ATMOS','AUR','BLK','BQC','BTC','TEST','BTCD','CCC','CCN','CDN',
'CLAM','CNC','CNOTE','CON','CRW','DEEPONION','DGB','DGC','DMD','DOGED','DOGE','DOPE',
'DVC','EFL','EMC','EXCL','FAIR','FLOZ','FTC','GAME','GAP','GCR','GRC','GRS','GUN','HAM','HODL',
'IXC','JBS','LBRY','LEAF','LTC','MMC','MONA','MUE','MYRIAD','MZC','NEOS','NLG','NMC','NVC',
'NYAN','OK','OMC','PIGGY','PINK','PKB','PND','POT','PPC','PTC','PTS','QTUM','RBY','RDD',
'RIC','SCA','SDC','SKC','SPR','START','SXC','TPC','UIS','UNO','VIA','VPN','VTC','WDC','WKC',
'WUBS', 'XC', 'XPM', 'YAC', 'ZOOM', 'ZRC'),
'ethkey': ('ETH','ETC'),
'BTC', 'LTC', 'VIA', 'FTC', 'DOGE', 'MEC',
'JBS', 'MZC', 'RIC', 'DFC', 'FAI', 'ARG', 'ZEC', 'DCR'),
'keyconv': ( # broken: PIVX
'BCH', '42', 'AC', 'AIB', 'ANC', 'ARS', 'ATMOS', 'AUR', 'BLK', 'BQC', 'BTC', 'TEST',
'BTCD', 'CCC', 'CCN', 'CDN', 'CLAM', 'CNC', 'CNOTE', 'CON', 'CRW', 'DEEPONION', 'DGB',
'DGC', 'DMD', 'DOGED', 'DOGE', 'DOPE', 'DVC', 'EFL', 'EMC', 'EXCL', 'FAIR', 'FLOZ', 'FTC',
'GAME', 'GAP', 'GCR', 'GRC', 'GRS', 'GUN', 'HAM', 'HODL', 'IXC', 'JBS', 'LBRY', 'LEAF',
'LTC', 'MMC', 'MONA', 'MUE', 'MYRIAD', 'MZC', 'NEOS', 'NLG', 'NMC', 'NVC', 'NYAN', 'OK',
'OMC', 'PIGGY', 'PINK', 'PKB', 'PND', 'POT', 'PPC', 'PTC', 'PTS', 'QTUM', 'RBY', 'RDD',
'RIC', 'SCA', 'SDC', 'SKC', 'SPR', 'START', 'SXC', 'TPC', 'UIS', 'UNO', 'VIA', 'VPN',
'VTC', 'WDC', 'WKC', 'WUBS', 'XC', 'XPM', 'YAC', 'ZOOM', 'ZRC'),
'ethkey': ('ETH', 'ETC'),
'zcash-mini': ('ZEC',),
'monero-python': ('XMR',),
},
@ -423,7 +422,7 @@ class TestCoinInfo(CoinInfo):
'pycoin': {
'DASH':'tDASH', # only compressed
'BCH':'XTN',
'BTC':'XTN','LTC':'XLT','VIA':'TVI','FTC':'FTX','DOGE':'XDT','DCR':'DCRT'
'BTC':'XTN', 'LTC':'XLT', 'VIA':'TVI', 'FTC':'FTX', 'DOGE':'XDT', 'DCR':'DCRT'
},
'ethkey': {},
'keyconv': {}
@ -433,17 +432,17 @@ class TestCoinInfo(CoinInfo):
# Whitelists apply to the *first* tool in cls.external_tests supporting the given coin/addr_type.
# They're ignored if specific tool is requested.
'segwit': ('BTC',), # LTC Segwit broken on pycoin: uses old fmt
'bech32': ('BTC','LTC'),
'bech32': ('BTC', 'LTC'),
'compressed': (
'BTC','LTC','VIA','FTC','DOGE','DASH','MEC','MYR','UNO',
'JBS','MZC','RIC','DFC','FAI','ARG','ZEC','DCR','ZEC'
'BTC', 'LTC', 'VIA', 'FTC', 'DOGE', 'DASH', 'MEC', 'MYR', 'UNO',
'JBS', 'MZC', 'RIC', 'DFC', 'FAI', 'ARG', 'ZEC', 'DCR', 'ZEC'
),
}
external_tests_blacklist = {
# Unconditionally block testing of the given coin/addr_type with given tool, or all coins if True
'legacy': {},
'segwit': { 'keyconv': True },
'bech32': { 'keyconv': True },
'segwit': {'keyconv': True},
'bech32': {'keyconv': True},
}
if __name__ == '__main__':
@ -456,10 +455,10 @@ if __name__ == '__main__':
}
}
cfg = Config( opts_data=opts_data, need_amt=False )
cfg = Config(opts_data=opts_data, need_amt=False)
msg('Checking CoinInfo WIF/P2PKH/P2SH version numbers and trust levels against protocol.py')
TestCoinInfo.verify_core_coin_data( cfg, cfg.quiet, cfg.verbose )
TestCoinInfo.verify_core_coin_data(cfg, cfg.quiet, cfg.verbose)
msg('Checking CoinInfo address leading symbols')
TestCoinInfo.verify_leading_symbols( cfg.quiet, cfg.verbose )
TestCoinInfo.verify_leading_symbols(cfg.quiet, cfg.verbose)

View file

@ -21,17 +21,17 @@ test/cmdtest.py: Command test runner for the MMGen wallet system
"""
def check_segwit_opts():
for k,m in (('segwit','S'),('segwit_random','S'),('bech32','B')):
if getattr(cfg,k) and m not in proto.mmtypes:
die(1,f'--{k.replace("_","-")} option incompatible with {proto.cls_name}')
for k, m in (('segwit', 'S'), ('segwit_random', 'S'), ('bech32', 'B')):
if getattr(cfg, k) and m not in proto.mmtypes:
die(1, f'--{k.replace("_", "-")} option incompatible with {proto.cls_name}')
def create_shm_dir(data_dir,trash_dir):
def create_shm_dir(data_dir, trash_dir):
# Laggy flash media can cause pexpect to fail, so create a temporary directory
# under '/dev/shm' and put datadir and tmpdirs here.
import shutil
from subprocess import run
if sys.platform in ('win32', 'darwin'):
for tdir in (data_dir,trash_dir):
for tdir in (data_dir, trash_dir):
try:
os.listdir(tdir)
except:
@ -41,35 +41,35 @@ def create_shm_dir(data_dir,trash_dir):
shutil.rmtree(tdir)
except: # we couldn't remove data dir - perhaps regtest daemon is running
try:
run(['python3',os.path.join('cmds','mmgen-regtest'),'stop'],check=True)
run(['python3', os.path.join('cmds', 'mmgen-regtest'), 'stop'], check=True)
except:
die(4,f'Unable to remove {tdir!r}!')
die(4, f'Unable to remove {tdir!r}!')
else:
time.sleep(2)
shutil.rmtree(tdir)
os.mkdir(tdir,0o755)
os.mkdir(tdir, 0o755)
shm_dir = 'test'
else:
tdir,pfx = '/dev/shm','mmgen-test-'
tdir, pfx = '/dev/shm', 'mmgen-test-'
try:
run(f'rm -rf {tdir}/{pfx}*',shell=True,check=True)
run(f'rm -rf {tdir}/{pfx}*', shell=True, check=True)
except Exception as e:
die(2,f'Unable to delete directory tree {tdir}/{pfx}* ({e.args[0]})')
die(2, f'Unable to delete directory tree {tdir}/{pfx}* ({e.args[0]})')
try:
import tempfile
shm_dir = str(tempfile.mkdtemp('',pfx,tdir))
shm_dir = str(tempfile.mkdtemp('', pfx, tdir))
except Exception as e:
die(2,f'Unable to create temporary directory in {tdir} ({e.args[0]})')
die(2, f'Unable to create temporary directory in {tdir} ({e.args[0]})')
dest = os.path.join(shm_dir,os.path.basename(trash_dir))
os.mkdir(dest,0o755)
dest = os.path.join(shm_dir, os.path.basename(trash_dir))
os.mkdir(dest, 0o755)
run(f'rm -rf {trash_dir}',shell=True,check=True)
os.symlink(dest,trash_dir)
run(f'rm -rf {trash_dir}', shell=True, check=True)
os.symlink(dest, trash_dir)
dest = os.path.join(shm_dir,os.path.basename(data_dir))
shutil.move(data_dir,dest) # data_dir was created by Config()
os.symlink(dest,data_dir)
dest = os.path.join(shm_dir, os.path.basename(data_dir))
shutil.move(data_dir, dest) # data_dir was created by Config()
os.symlink(dest, data_dir)
return shm_dir
@ -81,8 +81,8 @@ try:
except ImportError:
from test.include.test_init import repo_root
from mmgen.cfg import Config,gc
from mmgen.color import red,yellow,green,blue,cyan,gray,nocolor,init_color
from mmgen.cfg import Config, gc
from mmgen.color import red, yellow, green, blue, cyan, gray, nocolor, init_color
from mmgen.util import msg, Msg, rmsg, bmsg, die, suf, make_timestr
from test.include.common import (
@ -101,7 +101,7 @@ from test.include.common import (
)
try:
os.unlink(os.path.join(repo_root,cmdtest_py_error_fn))
os.unlink(os.path.join(repo_root, cmdtest_py_error_fn))
except:
pass
@ -109,10 +109,10 @@ os.environ['MMGEN_QUIET'] = '0' # for this script and spawned scripts
opts_data = {
'sets': [
('list_current_cmd_groups',True,'list_cmd_groups',True),
('demo',True,'exact_output',True),
('demo',True,'buf_keypress',True),
('demo',True,'pexpect_spawn',True),
('list_current_cmd_groups', True, 'list_cmd_groups', True),
('demo', True, 'exact_output', True),
('demo', True, 'buf_keypress', True),
('demo', True, 'pexpect_spawn', True),
],
'text': {
'desc': 'High-level tests for the MMGen Wallet suite',
@ -171,14 +171,14 @@ environment var
"""
},
'code': {
'options': lambda proto,help_notes,s: s.format(
'options': lambda proto, help_notes, s: s.format(
lf = cmdtest_py_log_fn
)
}
}
# we need some opt values before running opts.init, so parse without initializing:
po = Config(opts_data=opts_data,parse_only=True)._parsed_opts
po = Config(opts_data=opts_data, parse_only=True)._parsed_opts
data_dir = Config.test_datadir
@ -193,7 +193,7 @@ if not po.user_opts.get('skip_deps'):
cfg = Config(opts_data=opts_data)
if cfg.no_altcoin and cfg.coin != 'BTC':
die(1,f'--no-altcoin incompatible with --coin={cfg.coin}')
die(1, f'--no-altcoin incompatible with --coin={cfg.coin}')
set_globals(cfg)
@ -208,31 +208,31 @@ type(cfg)._reset_ok += (
'no_timings',
'exit_after',
'resuming',
'skipping_deps' )
'skipping_deps')
logging = cfg.log or os.getenv('MMGEN_EXEC_WRAPPER')
cfg.resuming = any(k in po.user_opts for k in ('resume','resume_after'))
cfg.resuming = any(k in po.user_opts for k in ('resume', 'resume_after'))
cfg.skipping_deps = cfg.resuming or 'skip_deps' in po.user_opts
cmd_args = cfg._args
if cfg.pexpect_spawn and sys.platform == 'win32':
die(1,'--pexpect-spawn option not supported on Windows platform, exiting')
die(1, '--pexpect-spawn option not supported on Windows platform, exiting')
if cfg.daemon_id and cfg.daemon_id in cfg.blacklisted_daemons.split():
die(1,f'cmdtest.py: daemon {cfg.daemon_id!r} blacklisted, exiting')
die(1, f'cmdtest.py: daemon {cfg.daemon_id!r} blacklisted, exiting')
network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '')
proto = cfg._proto
# step 3: move data_dir to /dev/shm and symlink it back to ./test:
trash_dir = os.path.join('test','trash')
trash_dir2 = os.path.join('test','trash2')
trash_dir = os.path.join('test', 'trash')
trash_dir2 = os.path.join('test', 'trash2')
if not cfg.skipping_deps:
shm_dir = create_shm_dir(data_dir,trash_dir)
shm_dir = create_shm_dir(data_dir, trash_dir)
check_segwit_opts()
@ -262,24 +262,24 @@ def list_cmds():
def gen_output():
gm = CmdGroupMgr()
cw,d = 0,[]
cw, d = 0, []
yield green('AVAILABLE COMMANDS:')
for gname in gm.cmd_groups:
tg = gm.gm_init_group(None,gname,None,None)
tg = gm.gm_init_group(None, gname, None, None)
desc = tg.__doc__.strip() if tg.__doc__ else type(tg).__name__
d.append( (gname,desc,gm.cmd_list,gm.dpy_data) )
cw = max(max(len(k) for k in gm.dpy_data),cw)
d.append((gname, desc, gm.cmd_list, gm.dpy_data))
cw = max(max(len(k) for k in gm.dpy_data), cw)
for gname,gdesc,clist,dpdata in d:
for gname, gdesc, clist, dpdata in d:
yield '\n'+green(f'{gname!r} - {gdesc}:')
for cmd in clist:
data = dpdata[cmd]
yield ' {:{w}} - {}'.format(
cmd,
(data if isinstance(data,str) else data[1]),
w = cw )
(data if isinstance(data, str) else data[1]),
w = cw)
from mmgen.ui import do_pager
do_pager('\n'.join(gen_output()))
@ -297,9 +297,9 @@ def create_tmp_dirs(shm_dir):
for cfg in sorted(cfgs):
mk_tmpdir(cfgs[cfg]['tmpdir'])
else:
os.makedirs( os.path.join('test','tmp'), mode=0o755, exist_ok=True )
os.makedirs(os.path.join('test', 'tmp'), mode=0o755, exist_ok=True)
for cfg in sorted(cfgs):
src = os.path.join(shm_dir,cfgs[cfg]['tmpdir'].split('/')[-1])
src = os.path.join(shm_dir, cfgs[cfg]['tmpdir'].split('/')[-1])
mk_tmpdir(src)
try:
os.unlink(cfgs[cfg]['tmpdir'])
@ -307,10 +307,10 @@ def create_tmp_dirs(shm_dir):
if e.errno != 2:
raise
finally:
os.symlink(src,cfgs[cfg]['tmpdir'])
os.symlink(src, cfgs[cfg]['tmpdir'])
def set_restore_term_at_exit():
import termios,atexit
import termios, atexit
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
def at_exit():
@ -321,36 +321,36 @@ class CmdGroupMgr:
dpy_data = None
from test.cmdtest_py_d.cfg import cmd_groups_dfl,cmd_groups_extra
from test.cmdtest_py_d.cfg import cmd_groups_dfl, cmd_groups_extra
cmd_groups = cmd_groups_dfl.copy()
cmd_groups.update(cmd_groups_extra)
@staticmethod
def create_cmd_group(cls,sg_name=None):
def create_cmd_group(cls, sg_name=None):
cmd_group_in = dict(cls.cmd_group_in)
if sg_name and 'subgroup.' + sg_name not in cmd_group_in:
die(1,f'{sg_name!r}: no such subgroup in test group {cls.__name__}')
die(1, f'{sg_name!r}: no such subgroup in test group {cls.__name__}')
def add_entries(key,add_deps=True,added_subgroups=[]):
def add_entries(key, add_deps=True, added_subgroups=[]):
if add_deps:
for dep in cmd_group_in['subgroup.'+key]:
yield from add_entries(dep)
assert isinstance(cls.cmd_subgroups[key][0],str), f'header for subgroup {key!r} missing!'
assert isinstance(cls.cmd_subgroups[key][0], str), f'header for subgroup {key!r} missing!'
if not key in added_subgroups:
yield from cls.cmd_subgroups[key][1:]
added_subgroups.append(key)
def gen():
for name,data in cls.cmd_group_in:
for name, data in cls.cmd_group_in:
if name.startswith('subgroup.'):
sg_key = name.removeprefix('subgroup.')
if sg_name in (None,sg_key):
if sg_name in (None, sg_key):
yield from add_entries(
sg_key,
add_deps = sg_name and not cfg.skipping_deps,
@ -358,55 +358,55 @@ class CmdGroupMgr:
if cfg.deps_only and sg_key == sg_name:
return
elif not cfg.skipping_deps:
yield (name,data)
yield (name, data)
return tuple(gen())
def load_mod(self,gname,modname=None):
clsname,kwargs = self.cmd_groups[gname]
def load_mod(self, gname, modname=None):
clsname, kwargs = self.cmd_groups[gname]
if modname is None and 'modname' in kwargs:
modname = kwargs['modname']
import importlib
modpath = f'test.cmdtest_py_d.ct_{modname or gname}'
return getattr(importlib.import_module(modpath),clsname)
return getattr(importlib.import_module(modpath), clsname)
def create_group(self,gname,sg_name,full_data=False,modname=None,is3seed=False,add_dpy=False):
def create_group(self, gname, sg_name, full_data=False, modname=None, is3seed=False, add_dpy=False):
"""
Initializes the list 'cmd_list' and dict 'dpy_data' from module's cmd_group data.
Alternatively, if called with 'add_dpy=True', updates 'dpy_data' from module data
without touching 'cmd_list'
"""
cls = self.load_mod(gname,modname)
cls = self.load_mod(gname, modname)
cdata = []
def get_shared_deps(cmdname,tmpdir_idx):
def get_shared_deps(cmdname, tmpdir_idx):
"""
shared_deps are "implied" dependencies for all cmds in cmd_group that don't appear in
the cmd_group data or cmds' argument lists. Supported only for 3seed tests at present.
"""
if not hasattr(cls,'shared_deps'):
if not hasattr(cls, 'shared_deps'):
return []
return [k for k,v in cfgs[str(tmpdir_idx)]['dep_generators'].items()
return [k for k, v in cfgs[str(tmpdir_idx)]['dep_generators'].items()
if k in cls.shared_deps and v != cmdname]
if not hasattr(cls,'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls,sg_name)
if not hasattr(cls, 'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls, sg_name)
for a,b in cls.cmd_group:
for a, b in cls.cmd_group:
if is3seed:
for n,(i,j) in enumerate(zip(cls.tmpdir_nums,(128,192,256))):
for n, (i, j) in enumerate(zip(cls.tmpdir_nums, (128, 192, 256))):
k = f'{a}_{n+1}'
if hasattr(cls,'skip_cmds') and k in cls.skip_cmds:
if hasattr(cls, 'skip_cmds') and k in cls.skip_cmds:
continue
sdeps = get_shared_deps(k,i)
if isinstance(b,str):
cdata.append( (k, (i,f'{b} ({j}-bit)',[[[]+sdeps,i]])) )
sdeps = get_shared_deps(k, i)
if isinstance(b, str):
cdata.append((k, (i, f'{b} ({j}-bit)', [[[]+sdeps, i]])))
else:
cdata.append( (k, (i,f'{b[1]} ({j}-bit)',[[b[0]+sdeps,i]])) )
cdata.append((k, (i, f'{b[1]} ({j}-bit)', [[b[0]+sdeps, i]])))
else:
cdata.append( (a, b if full_data else (cls.tmpdir_nums[0],b,[[[],cls.tmpdir_nums[0]]])) )
cdata.append((a, b if full_data else (cls.tmpdir_nums[0], b, [[[], cls.tmpdir_nums[0]]])))
if add_dpy:
self.dpy_data.update(dict(cdata))
@ -416,26 +416,26 @@ class CmdGroupMgr:
return cls
def gm_init_group(self,trunner,gname,sg_name,spawn_prog):
def gm_init_group(self, trunner, gname, sg_name, spawn_prog):
kwargs = self.cmd_groups[gname][1]
cls = self.create_group(gname,sg_name,**kwargs)
cls = self.create_group(gname, sg_name, **kwargs)
cls.group_name = gname
return cls(trunner,cfgs,spawn_prog)
return cls(trunner, cfgs, spawn_prog)
def get_cls_by_gname(self,gname):
return self.load_mod( gname, self.cmd_groups[gname][1].get('modname') )
def get_cls_by_gname(self, gname):
return self.load_mod(gname, self.cmd_groups[gname][1].get('modname'))
def list_cmd_groups(self):
ginfo = []
for gname in self.cmd_groups:
ginfo.append(( gname, self.get_cls_by_gname(gname) ))
ginfo.append((gname, self.get_cls_by_gname(gname)))
if cfg.list_current_cmd_groups:
exclude = (cfg.exclude_groups or '').split(',')
ginfo = [g for g in ginfo
if network_id in g[1].networks
and not g[0] in exclude
and g[0] in tuple(self.cmd_groups_dfl) + tuple(cmd_args) ]
and g[0] in tuple(self.cmd_groups_dfl) + tuple(cmd_args)]
desc = 'CONFIGURED'
else:
desc = 'AVAILABLE'
@ -443,30 +443,30 @@ class CmdGroupMgr:
def gen_output():
yield green(f'{desc} COMMAND GROUPS AND SUBGROUPS:')
yield ''
for name,cls in ginfo:
for name, cls in ginfo:
yield ' {} - {}'.format(
yellow(name.ljust(13)),
(cls.__doc__.strip() if cls.__doc__ else cls.__name__) )
if hasattr(cls,'cmd_subgroups'):
subgroups = {k:v for k,v in cls.cmd_subgroups.items() if not k.startswith('_')}
(cls.__doc__.strip() if cls.__doc__ else cls.__name__))
if hasattr(cls, 'cmd_subgroups'):
subgroups = {k:v for k, v in cls.cmd_subgroups.items() if not k.startswith('_')}
max_w = max(len(k) for k in subgroups)
for k,v in subgroups.items():
yield ' + {} · {}'.format( cyan(k.ljust(max_w+1)), v[0] )
for k, v in subgroups.items():
yield ' + {} · {}'.format(cyan(k.ljust(max_w+1)), v[0])
from mmgen.ui import do_pager
do_pager('\n'.join(gen_output()))
Msg( '\n' + ' '.join(e[0] for e in ginfo) )
Msg('\n' + ' '.join(e[0] for e in ginfo))
sys.exit(0)
def find_cmd_in_groups(self,cmd,group=None):
def find_cmd_in_groups(self, cmd, group=None):
"""
Search for a test command in specified group or all configured command groups
and return it as a string. Loads modules but alters no global variables.
"""
if group:
if not group in [e[0] for e in self.cmd_groups]:
die(1,f'{group!r}: unrecognized group')
die(1, f'{group!r}: unrecognized group')
groups = [self.cmd_groups[group]]
else:
groups = self.cmd_groups
@ -474,13 +474,13 @@ class CmdGroupMgr:
for gname in groups:
cls = self.get_cls_by_gname(gname)
if not hasattr(cls,'cmd_group'):
if not hasattr(cls, 'cmd_group'):
cls.cmd_group = self.create_cmd_group(cls)
if cmd in cls.cmd_group: # first search the class
return gname
if cmd in dir(cls(None,None,None)): # then a throwaway instance
if cmd in dir(cls(None, None, None)): # then a throwaway instance
return gname # cmd might exist in more than one group - we'll go with the first
return None
@ -492,7 +492,7 @@ class CmdTestRunner:
if logging:
self.log_fd.close()
def __init__(self,data_dir,trash_dir):
def __init__(self, data_dir, trash_dir):
self.data_dir = data_dir
self.trash_dir = trash_dir
@ -505,16 +505,16 @@ class CmdTestRunner:
self.deps_only = None
if logging:
self.log_fd = open(cmdtest_py_log_fn,'a')
self.log_fd = open(cmdtest_py_log_fn, 'a')
self.log_fd.write(f'\nLog started: {make_timestr()} UTC\n')
omsg(f'INFO → Logging to file {cmdtest_py_log_fn!r}')
else:
self.log_fd = None
if cfg.coverage:
coverdir,accfile = init_coverage()
coverdir, accfile = init_coverage()
omsg(f'INFO → Writing coverage files to {coverdir!r}')
self.pre_args = ['python3','-m','trace','--count','--coverdir='+coverdir,'--file='+accfile]
self.pre_args = ['python3', '-m', 'trace', '--count', '--coverdir='+coverdir, '--file='+accfile]
else:
self.pre_args = ['python3'] if sys.platform == 'win32' else []
@ -567,22 +567,22 @@ class CmdTestRunner:
cmd_path = (
cmd if cfg.system # cfg.system is broken for main test group with overlay tree
else os.path.relpath(os.path.join(repo_root,cmd_dir,cmd)) )
else os.path.relpath(os.path.join(repo_root, cmd_dir, cmd)))
args = (
self.pre_args +
([] if no_exec_wrapper else ['scripts/exec_wrapper.py']) +
[cmd_path] +
([] if no_passthru_opts else self.passthru_opts) +
args )
args)
try:
qargs = ['{q}{}{q}'.format( a, q = "'" if ' ' in a else '' ) for a in args]
qargs = ['{q}{}{q}'.format(a, q = "'" if ' ' in a else '') for a in args]
except:
msg(f'args: {args}')
raise
cmd_disp = ' '.join(qargs).replace('\\','/') # for mingw
cmd_disp = ' '.join(qargs).replace('\\', '/') # for mingw
if logging:
self.log_fd.write('[{}][{}:{}] {}\n'.format(
@ -592,17 +592,17 @@ class CmdTestRunner:
cmd_disp))
for i in args: # die only after writing log entry
if not isinstance(i,str):
die(2,'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format(
if not isinstance(i, str):
die(2, 'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format(
self.tg.test_name,
args ))
args))
if not no_msg:
t_pfx = '' if cfg.no_timings else f'[{time.time() - self.start_time:08.2f}] '
if cfg.verbose or cfg.print_cmdline or cfg.exact_output:
omsg(green(f'{t_pfx}Testing: {desc}'))
if not msg_only:
clr1,clr2 = (nocolor,nocolor) if cfg.print_cmdline else (green,cyan)
clr1, clr2 = (nocolor, nocolor) if cfg.print_cmdline else (green, cyan)
omsg(
clr1('Executing: ') +
clr2(repr(cmd_disp) if sys.platform == 'win32' else cmd_disp)
@ -638,7 +638,7 @@ class CmdTestRunner:
pexpect_spawn = pexpect_spawn,
timeout = timeout,
send_delay = send_delay,
direct_exec = direct_exec )
direct_exec = direct_exec)
def end_msg(self):
t = int(time.time() - self.start_time)
@ -647,7 +647,7 @@ class CmdTestRunner:
('\n' if cfg.no_timings else f'. Elapsed time: {t//60:02d}:{t%60:02d}\n')
))
def init_group(self,gname,sg_name=None,cmd=None,quiet=False,do_clean=True):
def init_group(self, gname, sg_name=None, cmd=None, quiet=False, do_clean=True):
from test.cmdtest_py_d.cfg import cmd_groups_altcoin
if cfg.no_altcoin and gname in cmd_groups_altcoin:
@ -660,19 +660,19 @@ class CmdTestRunner:
omsg(gray(f'INFO → skipping test {gname!r} for platform {sys.platform!r}'))
return None
for k in ('segwit','segwit_random','bech32'):
if getattr(cfg,k):
for k in ('segwit', 'segwit_random', 'bech32'):
if getattr(cfg, k):
segwit_opt = k
break
else:
segwit_opt = None
def gen_msg():
yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname,c=cmd)
yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname, c=cmd)
if len(ct_cls.networks) != 1:
yield f' for {proto.coin} {proto.network}'
if segwit_opt:
yield ' (--{})'.format( segwit_opt.replace('_','-') )
yield ' (--{})'.format(segwit_opt.replace('_', '-'))
m = ''.join(gen_msg())
@ -681,10 +681,10 @@ class CmdTestRunner:
return None
# 'networks = ()' means all networks allowed
nws = [(e.split('_')[0],'testnet') if '_' in e else (e,'mainnet') for e in ct_cls.networks]
nws = [(e.split('_')[0], 'testnet') if '_' in e else (e, 'mainnet') for e in ct_cls.networks]
if nws:
coin = proto.coin.lower()
for a,b in nws:
for a, b in nws:
if a == coin and b == proto.network:
break
else:
@ -698,18 +698,18 @@ class CmdTestRunner:
bmsg('Executing ' + m)
if (not self.daemon_started) and self.gm.get_cls_by_gname(gname).need_daemon:
start_test_daemons(network_id,remove_datadir=True)
start_test_daemons(network_id, remove_datadir=True)
self.daemon_started = True
if hasattr(self,'tg'):
if hasattr(self, 'tg'):
del self.tg
self.tg = self.gm.gm_init_group(self,gname,sg_name,self.spawn_wrapper)
self.tg = self.gm.gm_init_group(self, gname, sg_name, self.spawn_wrapper)
self.ct_clsname = type(self.tg).__name__
# pass through opts from cmdline (po.user_opts)
self.passthru_opts = ['--{}{}'.format(
k.replace('_','-'),
k.replace('_', '-'),
'' if cfg._uopts[k] is True else '=' + cfg._uopts[k]
) for k in cfg._uopts if k in self.tg.base_passthru_opts + self.tg.passthru_opts]
@ -722,45 +722,45 @@ class CmdTestRunner:
cfg.exit_after = self.resume_cmd
if cfg.exit_after and cfg.exit_after not in self.gm.cmd_list:
die(1,f'{cfg.exit_after!r}: command not recognized')
die(1, f'{cfg.exit_after!r}: command not recognized')
return self.tg
def run_tests(self,cmd_args):
def run_tests(self, cmd_args):
self.start_time = time.time()
self.daemon_started = False
gname_save = None
def parse_arg(arg):
if '.' in arg:
a,b = arg.split('.')
return [a] + b.split(':') if ':' in b else [a,b,None]
a, b = arg.split('.')
return [a] + b.split(':') if ':' in b else [a, b, None]
elif ':' in arg:
a,b = arg.split(':')
return [a,None,b]
a, b = arg.split(':')
return [a, None, b]
else:
return [self.gm.find_cmd_in_groups(arg),None,arg]
return [self.gm.find_cmd_in_groups(arg), None, arg]
if cmd_args:
for arg in cmd_args:
if arg in self.gm.cmd_groups:
if self.init_group(arg):
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd,build=True)
self.check_needs_rerun(cmd, build=True)
do_between()
else:
gname,sg_name,cmdname = parse_arg(arg)
gname, sg_name, cmdname = parse_arg(arg)
if gname:
same_grp = gname == gname_save # same group as previous cmd: don't clean, suppress blue msg
if self.init_group(gname,sg_name,cmdname,quiet=same_grp,do_clean=not same_grp):
if self.init_group(gname, sg_name, cmdname, quiet=same_grp, do_clean=not same_grp):
if cmdname:
if cfg.deps_only:
self.deps_only = cmdname
try:
self.check_needs_rerun(cmdname,build=True)
self.check_needs_rerun(cmdname, build=True)
except Exception as e: # allow calling of functions not in cmd_group
if isinstance(e,KeyError) and e.args[0] == cmdname:
ret = getattr(self.tg,cmdname)()
if isinstance(e, KeyError) and e.args[0] == cmdname:
ret = getattr(self.tg, cmdname)()
if type(ret).__name__ == 'coroutine':
asyncio.run(ret)
else:
@ -768,23 +768,23 @@ class CmdTestRunner:
do_between()
else:
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd,build=True)
self.check_needs_rerun(cmd, build=True)
do_between()
gname_save = gname
else:
die(1,f'{arg!r}: command not recognized')
die(1, f'{arg!r}: command not recognized')
else:
if cfg.exclude_groups:
exclude = cfg.exclude_groups.split(',')
for e in exclude:
if e not in self.gm.cmd_groups_dfl:
die(1,f'{e!r}: group not recognized')
die(1, f'{e!r}: group not recognized')
for gname in self.gm.cmd_groups_dfl:
if cfg.exclude_groups and gname in exclude:
continue
if self.init_group(gname):
for cmd in self.gm.cmd_list:
self.check_needs_rerun(cmd,build=True)
self.check_needs_rerun(cmd, build=True)
do_between()
self.end_msg()
@ -810,7 +810,7 @@ class CmdTestRunner:
ret = self.get_num_exts_for_cmd(cmd)
if ret:
for ext in ret[1]:
fn = get_file_with_ext(cfgs[ret[0]]['tmpdir'],ext,delete=build)
fn = get_file_with_ext(cfgs[ret[0]]['tmpdir'], ext, delete=build)
if fn:
if force_delete:
os.unlink(fn)
@ -822,13 +822,13 @@ class CmdTestRunner:
for fn in fns:
my_age = os.stat(fn).st_mtime
for num,ext in fdeps:
f = get_file_with_ext(cfgs[num]['tmpdir'],ext,delete=build)
for num, ext in fdeps:
f = get_file_with_ext(cfgs[num]['tmpdir'], ext, delete=build)
if f and os.stat(f).st_mtime > my_age:
rerun = True
for cdep in cdeps:
if self.check_needs_rerun(cdep,build=build,root=False,dpy=cmd):
if self.check_needs_rerun(cdep, build=build, root=False, dpy=cmd):
rerun = True
if build:
@ -843,22 +843,22 @@ class CmdTestRunner:
else:
# If prog produces multiple files:
if cmd not in self.rebuild_list or rerun is True:
self.rebuild_list[cmd] = (rerun,fns[0] if fns else '') # FIX
self.rebuild_list[cmd] = (rerun, fns[0] if fns else '') # FIX
return rerun
def run_test(self,cmd):
def run_test(self, cmd):
if self.deps_only and cmd == self.deps_only:
sys.exit(0)
d = [(str(num),ext) for exts,num in self.gm.dpy_data[cmd][2] for ext in exts]
d = [(str(num), ext) for exts, num in self.gm.dpy_data[cmd][2] for ext in exts]
# delete files depended on by this cmd
arg_list = [get_file_with_ext(cfgs[num]['tmpdir'],ext) for num,ext in d]
arg_list = [get_file_with_ext(cfgs[num]['tmpdir'], ext) for num, ext in d]
# remove shared_deps from arg list
if hasattr(self.tg,'shared_deps'):
if hasattr(self.tg, 'shared_deps'):
arg_list = arg_list[:-len(self.tg.shared_deps)]
if self.resume_cmd:
@ -878,21 +878,19 @@ class CmdTestRunner:
self.tg.tmpdir_num = cdata[0]
# self.tg.cfg = cfgs[str(cdata[0])] # will remove this eventually
test_cfg = cfgs[str(cdata[0])]
for k in ( 'seed_len', 'seed_id',
'wpasswd', 'kapasswd',
'segwit', 'hash_preset',
'bw_filename', 'bw_params', 'ref_bw_seed_id',
'addr_idx_list', 'pass_idx_list' ):
for k in (
'seed_len', 'seed_id', 'wpasswd', 'kapasswd', 'segwit', 'hash_preset', 'bw_filename',
'bw_params', 'ref_bw_seed_id', 'addr_idx_list', 'pass_idx_list'):
if k in test_cfg:
setattr(self.tg,k,test_cfg[k])
setattr(self.tg, k, test_cfg[k])
ret = getattr(self.tg,cmd)(*arg_list) # run the test
ret = getattr(self.tg, cmd)(*arg_list) # run the test
if type(ret).__name__ == 'coroutine':
ret = asyncio.run(ret)
self.process_retval(cmd,ret)
self.process_retval(cmd, ret)
if cfg.profile:
omsg('\r\033[50C{:.4f}'.format( time.time() - start ))
omsg('\r\033[50C{:.4f}'.format(time.time() - start))
if cmd == cfg.exit_after:
sys.exit(0)
@ -903,7 +901,7 @@ class CmdTestRunner:
r = '-' * 72 + '\n'
print(r+('\n'+r).join(self.skipped_warnings))
def process_retval(self,cmd,ret):
def process_retval(self, cmd, ret):
if type(ret).__name__ == 'MMGenPexpect':
ret.ok(exit_val=self.exit_val)
self.cmd_total += 1
@ -911,45 +909,45 @@ class CmdTestRunner:
ok()
self.cmd_total += 1
elif ret == 'error':
die(2,red(f'\nTest {self.tg.test_name!r} failed'))
elif ret in ('skip','skip_msg','silent'):
die(2, red(f'\nTest {self.tg.test_name!r} failed'))
elif ret in ('skip', 'skip_msg', 'silent'):
if ret == 'silent':
self.cmd_total += 1
elif ret == 'skip_msg':
ok('SKIP')
elif isinstance(ret,tuple) and ret[0] == 'skip_warn':
elif isinstance(ret, tuple) and ret[0] == 'skip_warn':
self.skipped_warnings.append(
'Test {!r} was skipped:\n {}'.format(cmd,'\n '.join(ret[1].split('\n'))))
'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n'))))
else:
die(2,f'{cmd!r} returned {ret}')
die(2, f'{cmd!r} returned {ret}')
def check_deps(self,cmds): # TODO: broken
def check_deps(self, cmds): # TODO: broken
if len(cmds) != 1:
die(1,f'Usage: {gc.prog_name} check_deps <command>')
die(1, f'Usage: {gc.prog_name} check_deps <command>')
cmd = cmds[0]
if cmd not in self.gm.cmd_list:
die(1,f'{cmd!r}: unrecognized command')
die(1, f'{cmd!r}: unrecognized command')
if not cfg.quiet:
omsg(f'Checking dependencies for {cmd!r}')
self.check_needs_rerun(self.tg,cmd)
self.check_needs_rerun(self.tg, cmd)
w = max(map(len,self.rebuild_list)) + 1
w = max(map(len, self.rebuild_list)) + 1
for cmd in self.rebuild_list:
c = self.rebuild_list[cmd]
m = 'Rebuild' if (c[0] and c[1]) else 'Build' if c[0] else 'OK'
omsg('cmd {:<{w}} {}'.format( cmd+':', m, w=w ))
omsg('cmd {:<{w}} {}'.format(cmd+':', m, w=w))
def generate_file_deps(self,cmd):
return [(str(n),e) for exts,n in self.gm.dpy_data[cmd][2] for e in exts]
def generate_file_deps(self, cmd):
return [(str(n), e) for exts, n in self.gm.dpy_data[cmd][2] for e in exts]
def generate_cmd_deps(self,fdeps):
return [cfgs[str(n)]['dep_generators'][ext] for n,ext in fdeps]
def generate_cmd_deps(self, fdeps):
return [cfgs[str(n)]['dep_generators'][ext] for n, ext in fdeps]
def get_num_exts_for_cmd(self,cmd):
def get_num_exts_for_cmd(self, cmd):
try:
num = str(self.gm.dpy_data[cmd][0])
except KeyError:
@ -958,7 +956,7 @@ class CmdTestRunner:
if gname:
kwargs = self.gm.cmd_groups[gname][1]
kwargs.update({'add_dpy':True})
self.gm.create_group(gname,None,**kwargs)
self.gm.create_group(gname, None, **kwargs)
num = str(self.gm.dpy_data[cmd][0])
qmsg(f' found in group {gname!r}')
else:
@ -967,7 +965,7 @@ class CmdTestRunner:
dgl = cfgs[num]['dep_generators']
if cmd in dgl.values():
exts = [k for k in dgl if dgl[k] == cmd]
return (num,exts)
return (num, exts)
else:
return None
@ -987,7 +985,7 @@ if __name__ == '__main__':
from mmgen.exception import TestSuiteSpawnedScriptException
try:
tr = CmdTestRunner(data_dir,trash_dir)
tr = CmdTestRunner(data_dir, trash_dir)
tr.run_tests(cmd_args)
tr.warn_skipped()
if tr.daemon_started and not cfg.no_daemon_stop:
@ -1002,7 +1000,7 @@ if __name__ == '__main__':
if hasattr(tr, 'tg'):
del tr.tg
del tr
die(1,yellow('\ntest.py exiting at user request'))
die(1, yellow('\ntest.py exiting at user request'))
except TestSuiteSpawnedScriptException as e:
# if spawned script is not running under exec_wrapper, output brief error msg:
if os.getenv('MMGEN_EXEC_WRAPPER'):
@ -1021,4 +1019,4 @@ if __name__ == '__main__':
# if cmdtest.py itself is running under exec_wrapper, re-raise so exec_wrapper can handle exception:
if os.getenv('MMGEN_EXEC_WRAPPER') or not os.getenv('MMGEN_IGNORE_TEST_PY_EXCEPTION'):
raise
die(1,red('Test script exited with error'))
die(1, red('Test script exited with error'))

View file

@ -15,25 +15,35 @@ except ImportError:
from test.include import test_init
from mmgen.color import *
from mmgen.util import msg,ymsg,gmsg
from mmgen.util import msg, ymsg, gmsg
import mmgen.color as color_mod
def test_color():
ymsg("Terminal display:") # init_color() not called yet, so no yellow here
for desc,nc in (('pre-init',None),('auto','auto'),('8-color',8),('256-color',256),('disabled',0)):
for desc, nc in (
('pre-init', None),
('auto', 'auto'),
('8-color', 8),
('256-color', 256),
('disabled', 0)):
if nc is not None:
init_color(num_colors=nc)
msg('{:9}: {}'.format(
desc,
' '.join( getattr(color_mod,c)(c) for c in sorted(color_mod._colors) ) ))
' '.join(getattr(color_mod, c)(c) for c in sorted(color_mod._colors))))
init_color()
gmsg("\nParsed terminfo 'colors' values:")
from mmgen.color import orange
for t,c in (('rxvt',8),('xterm',8),('rxvt-unicode',88),('screen-256color',256),('xterm-256color',256)):
for t, c in (
('rxvt', 8),
('xterm', 8),
('rxvt-unicode', 88),
('screen-256color', 256),
('xterm-256color', 256)):
ret = get_terminfo_colors(t)
if ret is None:
ymsg(f'Warning: unable to get info for terminal {t!r}')

View file

@ -20,7 +20,7 @@
test/gentest.py: Cryptocoin key/address generation tests for the MMGen suite
"""
import sys,os,time
import sys, os, time
try:
from include import test_init
@ -28,9 +28,9 @@ except ImportError:
from test.include import test_init
# Import these _after_ local path's been added to sys.path
from mmgen.cfg import gc,Config
from mmgen.color import green,red,purple
from mmgen.util import msg,ymsg,capfirst,is_int,die
from mmgen.cfg import gc, Config
from mmgen.color import green, red, purple
from mmgen.util import msg, ymsg, capfirst, is_int, die
results_file = 'gentest.out.json'
@ -47,8 +47,8 @@ opts_data = {
-q, --quiet Produce quieter output
-s, --save-results Save output of external tool in Compare test to
{rf!r}
-t, --type=t Specify address type (e.g. 'compressed','segwit',
'zcash_z','bech32')
-t, --type=t Specify address type (e.g. 'compressed', 'segwit',
'zcash_z', 'bech32')
-v, --verbose Produce more verbose output
""",
'notes': """
@ -106,7 +106,7 @@ EXAMPLES:
SUPPORTED EXTERNAL TOOLS:
+ ethkey (for ETH,ETC)
+ ethkey (for ETH, ETC)
https://github.com/openethereum/openethereum
(build with 'cargo build -p ethkey-cli --release')
@ -126,23 +126,23 @@ SUPPORTED EXTERNAL TOOLS:
},
'code': {
'options': lambda s: s.format(
rf=results_file,
rf = results_file,
),
'notes': lambda s: s.format(
prog='test/gentest.py',
pnm=gc.proj_name,
snum=rounds )
prog = 'test/gentest.py',
pnm = gc.proj_name,
snum = rounds)
}
}
def get_cmd_output(cmd,input=None):
return run(cmd,input=input,stdout=PIPE,stderr=DEVNULL).stdout.decode().splitlines()
def get_cmd_output(cmd, input=None):
return run(cmd, input=input, stdout=PIPE, stderr=DEVNULL).stdout.decode().splitlines()
saved_results = {}
class GenTool:
def __init__(self,proto,addr_type):
def __init__(self, proto, addr_type):
self.proto = proto
self.addr_type = addr_type
self.data = {}
@ -150,55 +150,55 @@ class GenTool:
def __del__(self):
if cfg.save_results:
key = f'{self.proto.coin}-{self.proto.network}-{self.addr_type.name}-{self.desc}'.lower()
saved_results[key] = {k.hex():v._asdict() for k,v in self.data.items()}
saved_results[key] = {k.hex():v._asdict() for k, v in self.data.items()}
def run_tool(self,sec,cache_data):
def run_tool(self, sec, cache_data):
vcoin = 'BTC' if self.proto.coin == 'BCH' else self.proto.coin
key = sec.orig_bytes
if key in self.data:
return self.data[key]
else:
ret = self.run(sec,vcoin)
ret = self.run(sec, vcoin)
if cache_data:
self.data[key] = sd( **{'reduced':sec.hex()}, **ret._asdict() )
self.data[key] = sd(**{'reduced':sec.hex()}, **ret._asdict())
return ret
class GenToolEthkey(GenTool):
desc = 'ethkey'
def __init__(self,*args,**kwargs):
def __init__(self, *args, **kwargs):
self.cmdname = get_ethkey()
super().__init__(*args,**kwargs)
super().__init__(*args, **kwargs)
def run(self,sec,vcoin):
o = get_cmd_output([self.cmdname,'info',sec.hex()])
def run(self, sec, vcoin):
o = get_cmd_output([self.cmdname, 'info', sec.hex()])
return gtr(
o[0].split()[1],
o[-1].split()[1],
None )
None)
class GenToolKeyconv(GenTool):
desc = 'keyconv'
def run(self,sec,vcoin):
o = get_cmd_output(['keyconv','-C',vcoin,sec.wif])
def run(self, sec, vcoin):
o = get_cmd_output(['keyconv', '-C', vcoin, sec.wif])
return gtr(
o[1].split()[1],
o[0].split()[1],
None )
None)
class GenToolZcash_mini(GenTool):
desc = 'zcash-mini'
def run(self,sec,vcoin):
o = get_cmd_output(['zcash-mini','-key','-simple'],input=(sec.wif+'\n').encode())
return gtr( o[1], o[0], o[-1] )
def run(self, sec, vcoin):
o = get_cmd_output(['zcash-mini', '-key', '-simple'], input=(sec.wif+'\n').encode())
return gtr(o[1], o[0], o[-1])
class GenToolPycoin(GenTool):
"""
pycoin/networks/all.py pycoin/networks/legacy_networks.py
"""
desc = 'pycoin'
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
try:
from pycoin.networks.registry import network_for_netcode
except ImportError as e:
@ -206,16 +206,16 @@ class GenToolPycoin(GenTool):
f'{e}\nUnable to import pycoin.networks.registry. Is pycoin installed on your system?') from e
self.nfnc = network_for_netcode
def run(self,sec,vcoin):
def run(self, sec, vcoin):
if self.proto.testnet:
vcoin = cinfo.external_tests['testnet']['pycoin'][vcoin]
network = self.nfnc(vcoin)
key = network.keys.private(
secret_exponent = int(sec.hex(),16),
is_compressed = self.addr_type.name != 'legacy' )
secret_exponent = int(sec.hex(), 16),
is_compressed = self.addr_type.name != 'legacy')
if key is None:
die(1,f'can’t parse {sec.hex()}')
if self.addr_type.name in ('segwit','bech32'):
die(1, f'can’t parse {sec.hex()}')
if self.addr_type.name in ('segwit', 'bech32'):
hash160_c = key.hash160(is_compressed=True)
if self.addr_type.name == 'segwit':
p2sh_script = network.contract.for_p2pkh_wit(hash160_c)
@ -224,13 +224,13 @@ class GenToolPycoin(GenTool):
addr = network.address.for_p2pkh_wit(hash160_c)
else:
addr = key.address()
return gtr( key.wif(), addr, None )
return gtr(key.wif(), addr, None)
class GenToolMonero_python(GenTool):
desc = 'monero-python'
def __init__(self,*args,**kwargs):
super().__init__(*args,**kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
try:
from monero.seed import Seed
except ImportError as e:
@ -238,36 +238,36 @@ class GenToolMonero_python(GenTool):
f'{e}\nUnable to import monero-python. Is monero-python installed on your system?') from e
self.Seed = Seed
def run(self,sec,vcoin):
seed = self.Seed( sec.orig_bytes.hex() )
def run(self, sec, vcoin):
seed = self.Seed(sec.orig_bytes.hex())
sk = seed.secret_spend_key()
vk = seed.secret_view_key()
addr = seed.public_address()
return gtr( sk, addr, vk )
return gtr(sk, addr, vk)
def find_or_check_tool(proto,addr_type,toolname):
def find_or_check_tool(proto, addr_type, toolname):
ext_progs = list(cinfo.external_tests[proto.network])
if toolname not in ext_progs + ['ext']:
die(1,f'{toolname!r}: unsupported tool for network {proto.network}')
die(1, f'{toolname!r}: unsupported tool for network {proto.network}')
if cfg.all_coins and toolname == 'ext':
die(1,"'--all-coins' must be combined with a specific external testing tool")
die(1, "'--all-coins' must be combined with a specific external testing tool")
else:
tool = cinfo.get_test_support(
proto.coin,
addr_type.name,
proto.network,
verbose = not cfg.quiet,
toolname = toolname if toolname != 'ext' else None )
toolname = toolname if toolname != 'ext' else None)
if tool and toolname in ext_progs and toolname != tool:
sys.exit(3)
if tool is None:
return None
return tool
def test_equal(desc,a_val,b_val,in_bytes,sec,wif,a_desc,b_desc):
def test_equal(desc, a_val, b_val, in_bytes, sec, wif, a_desc, b_desc):
if a_val != b_val:
fs = """
{i:{w}}: {}
@ -281,33 +281,33 @@ def test_equal(desc,a_val,b_val,in_bytes,sec,wif,a_desc,b_desc):
+ fs.format(
in_bytes.hex(), sec, wif, a_val, b_val,
i='input', s='sec key', W='WIF key', a=a_desc, b=b_desc,
w=max(len(e) for e in (a_desc,b_desc)) + 1
w=max(len(e) for e in (a_desc, b_desc)) + 1
).rstrip())
def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data):
def do_ab_test(proto, scfg, addr_type, gen1, kg2, ag, tool, cache_data):
def do_ab_inner(n,trounds,in_bytes):
def do_ab_inner(n, trounds, in_bytes):
global last_t
if cfg.verbose or time.time() - last_t >= 0.1:
qmsg_r(f'\rRound {i+1}/{trounds} ')
last_t = time.time()
sec = PrivKey(proto,in_bytes,compressed=addr_type.compressed,pubkey_type=addr_type.pubkey_type)
sec = PrivKey(proto, in_bytes, compressed=addr_type.compressed, pubkey_type=addr_type.pubkey_type)
data = kg1.gen_data(sec)
addr1 = ag.to_addr(data)
view_pref = 1 if proto.coin == 'BCH' else 0
tinfo = ( in_bytes, sec, sec.wif, type(kg1).__name__, type(kg2).__name__ if kg2 else tool.desc )
tinfo = (in_bytes, sec, sec.wif, type(kg1).__name__, type(kg2).__name__ if kg2 else tool.desc)
def do_msg():
if cfg.verbose:
msg( fs.format( b=in_bytes.hex(), r=sec.hex(), k=sec.wif, v=vk2, a=addr1 ))
msg(fs.format(b=in_bytes.hex(), r=sec.hex(), k=sec.wif, v=vk2, a=addr1))
if tool:
def run_tool():
o = tool.run_tool(sec,cache_data)
test_equal( 'WIF keys', sec.wif, o.wif, *tinfo )
o = tool.run_tool(sec, cache_data)
test_equal('WIF keys', sec.wif, o.wif, *tinfo)
test_equal('addresses', addr1.views[view_pref], o.addr, *tinfo)
if o.viewkey:
test_equal( 'view keys', ag.to_viewkey(data), o.viewkey, *tinfo )
test_equal('view keys', ag.to_viewkey(data), o.viewkey, *tinfo)
return o.viewkey
vk2 = run_tool()
do_msg()
@ -325,18 +325,18 @@ def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data):
for _ in range(scfg.rounds):
yield getrand(32)
kg1 = KeyGenerator( cfg, proto, addr_type.pubkey_type, gen1 )
kg1 = KeyGenerator(cfg, proto, addr_type.pubkey_type, gen1)
if type(kg1) == type(kg2):
die(4,'Key generators are the same!')
die(4, 'Key generators are the same!')
e = cinfo.get_entry(proto.coin,proto.network)
e = cinfo.get_entry(proto.coin, proto.network)
qmsg(green("Comparing address generators '{A}' and '{B}' for {N} {c} ({n}), addrtype {a!r}".format(
A = type(kg1).__name__.replace('_','-'),
B = type(kg2).__name__.replace('_','-') if kg2 else tool.desc,
A = type(kg1).__name__.replace('_', '-'),
B = type(kg2).__name__.replace('_', '-') if kg2 else tool.desc,
N = proto.network,
c = proto.coin,
n = e.name if e else '---',
a = addr_type.name )))
a = addr_type.name)))
global last_t
last_t = time.time()
@ -346,7 +346,7 @@ def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data):
'\nreduced: {r}' +
'\n{:9} {{k}}'.format(addr_type.wif_label+':') +
('\nviewkey: {v}' if 'viewkey' in addr_type.extra_attrs else '') +
'\naddr: {a}\n' )
'\naddr: {a}\n')
group_order = CoinProtocol.Secp256k1.secp256k1_group_order
@ -363,35 +363,35 @@ def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data):
)
qmsg(purple('edge cases:'))
for i,privbytes in enumerate(edgecase_sks):
do_ab_inner(i,len(edgecase_sks),privbytes)
for i, privbytes in enumerate(edgecase_sks):
do_ab_inner(i, len(edgecase_sks), privbytes)
qmsg(green('\rOK ' if cfg.verbose else 'OK'))
qmsg(purple('random input:'))
for i,privbytes in enumerate(get_randbytes()):
do_ab_inner(i,scfg.rounds,privbytes)
for i, privbytes in enumerate(get_randbytes()):
do_ab_inner(i, scfg.rounds, privbytes)
qmsg(green('\rOK ' if cfg.verbose else 'OK'))
def init_tool(proto,addr_type,toolname):
return globals()['GenTool'+capfirst(toolname.replace('-','_'))](proto,addr_type)
def init_tool(proto, addr_type, toolname):
return globals()['GenTool'+capfirst(toolname.replace('-', '_'))](proto, addr_type)
def ab_test(proto,scfg):
def ab_test(proto, scfg):
addr_type = MMGenAddrType( proto=proto, id_str=cfg.type or proto.dfl_mmtype )
addr_type = MMGenAddrType(proto=proto, id_str=cfg.type or proto.dfl_mmtype)
if scfg.gen2:
assert scfg.gen1 != 'all', "'all' must be used only with external tool"
kg2 = KeyGenerator( cfg, proto, addr_type.pubkey_type, scfg.gen2 )
kg2 = KeyGenerator(cfg, proto, addr_type.pubkey_type, scfg.gen2)
tool = None
else:
toolname = find_or_check_tool( proto, addr_type, scfg.tool )
toolname = find_or_check_tool(proto, addr_type, scfg.tool)
if toolname is None:
ymsg(f'Warning: skipping tool {scfg.tool!r} for {proto.coin} {addr_type.name}')
return
tool = init_tool( proto, addr_type, toolname )
tool = init_tool(proto, addr_type, toolname)
kg2 = None
ag = AddrGenerator( cfg, proto, addr_type )
ag = AddrGenerator(cfg, proto, addr_type)
if scfg.all_backends: # check all backends against external tool
for n in range(len(get_backends(addr_type.pubkey_type))):
@ -415,21 +415,21 @@ def ab_test(proto,scfg):
tool = tool,
cache_data = False)
def speed_test(proto,kg,ag,rounds):
def speed_test(proto, kg, ag, rounds):
qmsg(green('Testing speed of address generator {!r} for coin {}'.format(
type(kg).__name__,
proto.coin )))
proto.coin)))
from struct import pack
seed = getrand(28)
qmsg('Incrementing key with each round')
qmsg('Starting key: {}'.format( (seed + pack('I',0)).hex() ))
qmsg('Starting key: {}'.format((seed + pack('I', 0)).hex()))
start = last_t = time.time()
for i in range(rounds):
if time.time() - last_t >= 0.1:
qmsg_r(f'\rRound {i+1}/{rounds} ')
last_t = time.time()
sec = PrivKey( proto, seed+pack('I', i), compressed=ag.compressed, pubkey_type=ag.pubkey_type )
sec = PrivKey(proto, seed+pack('I', i), compressed=ag.compressed, pubkey_type=ag.pubkey_type)
addr = ag.to_addr(kg.gen_data(sec))
vmsg(f'\nkey: {sec.wif}\naddr: {addr}\n')
qmsg(
@ -438,12 +438,12 @@ def speed_test(proto,kg,ag,rounds):
('' if cfg.test_suite_deterministic else f' in {time.time()-start:.2f} seconds')
)
def dump_test(proto,kg,ag,filename):
def dump_test(proto, kg, ag, filename):
with open(filename) as fp:
dump = [[*(e.split()[0] for e in line.split('addr='))] for line in fp.readlines() if 'addr=' in line]
if not dump:
die(1,f'File {filename!r} appears not to be a wallet dump')
die(1, f'File {filename!r} appears not to be a wallet dump')
qmsg(green(
"A: generator pair '{}:{}'\nB: wallet dump {!r}".format(
@ -451,27 +451,27 @@ def dump_test(proto,kg,ag,filename):
type(ag).__name__,
filename)))
for count,(b_wif,b_addr) in enumerate(dump,1):
for count, (b_wif, b_addr) in enumerate(dump, 1):
qmsg_r(f'\rKey {count}/{len(dump)} ')
try:
b_sec = PrivKey(proto,wif=b_wif)
b_sec = PrivKey(proto, wif=b_wif)
except:
die(2,f'\nInvalid {proto.network} WIF address in dump file: {b_wif}')
die(2, f'\nInvalid {proto.network} WIF address in dump file: {b_wif}')
a_addr = ag.to_addr(kg.gen_data(b_sec))
vmsg(f'\nwif: {b_wif}\naddr: {b_addr}\n')
tinfo = (b_sec,b_sec.hex(),b_wif,type(kg).__name__,filename)
test_equal('addresses',a_addr,b_addr,*tinfo)
tinfo = (b_sec, b_sec.hex(), b_wif, type(kg).__name__, filename)
test_equal('addresses', a_addr, b_addr, *tinfo)
qmsg(green(('\n','')[bool(cfg.verbose)] + 'OK'))
qmsg(green(('\n', '')[bool(cfg.verbose)] + 'OK'))
def get_protos(proto,addr_type,toolname):
def get_protos(proto, addr_type, toolname):
init_genonly_altcoins(testnet=proto.testnet)
for coin in cinfo.external_tests[proto.network][toolname]:
if coin.lower() not in CoinProtocol.coins:
continue
ret = init_proto( cfg, coin, testnet=proto.testnet )
ret = init_proto(cfg, coin, testnet=proto.testnet)
if addr_type not in ret.mmtypes:
continue
yield ret
@ -481,15 +481,15 @@ def parse_args():
if len(cfg._args) != 2:
cfg._usage()
arg1,arg2 = cfg._args
gen1,gen2,rounds = (0,0,0)
tool,all_backends,dumpfile = (None,None,None)
arg1, arg2 = cfg._args
gen1, gen2, rounds = (0, 0, 0)
tool, all_backends, dumpfile = (None, None, None)
if is_int(arg1) and is_int(arg2):
test = 'speed'
gen1 = arg1
rounds = arg2
elif is_int(arg1) and os.access(arg2,os.R_OK):
elif is_int(arg1) and os.access(arg2, os.R_OK):
test = 'dump'
gen1 = arg1
dumpfile = arg2
@ -498,12 +498,12 @@ def parse_args():
rounds = arg2
if not is_int(arg2):
die(1,'Second argument must be dump filename or integer rounds specification')
die(1, 'Second argument must be dump filename or integer rounds specification')
try:
a,b = arg1.split(':')
a, b = arg1.split(':')
except:
die(1,'First argument must be a generator backend number or two colon-separated arguments')
die(1, 'First argument must be a generator backend number or two colon-separated arguments')
if is_int(a):
gen1 = a
@ -511,64 +511,65 @@ def parse_args():
if a == 'all':
all_backends = True
else:
die(1,"First part of first argument must be a generator backend number or 'all'")
die(1, "First part of first argument must be a generator backend number or 'all'")
if is_int(b):
if cfg.all_coins:
die(1,'--all-coins must be used with external tool only')
die(1, '--all-coins must be used with external tool only')
gen2 = b
else:
tool = b
ext_progs = list(cinfo.external_tests[cfg._proto.network]) + ['ext']
if b not in ext_progs:
die(1,f'Second part of first argument must be a generator backend number or one of {ext_progs}')
die(1, f'Second part of first argument must be a generator backend number or one of {ext_progs}')
return namedtuple('parsed_args',['test','gen1','gen2','rounds','tool','all_backends','dumpfile'])(
return namedtuple('parsed_args',
['test', 'gen1', 'gen2', 'rounds', 'tool', 'all_backends', 'dumpfile'])(
test,
int(gen1) or None,
int(gen2) or None,
int(rounds) or None,
tool,
all_backends,
dumpfile )
dumpfile)
def main():
scfg = parse_args()
addr_type = MMGenAddrType( proto=proto, id_str=cfg.type or proto.dfl_mmtype )
addr_type = MMGenAddrType(proto=proto, id_str=cfg.type or proto.dfl_mmtype)
if scfg.test == 'ab':
protos = get_protos(proto,addr_type,scfg.tool) if cfg.all_coins else [proto]
protos = get_protos(proto, addr_type, scfg.tool) if cfg.all_coins else [proto]
for p in protos:
ab_test( p, scfg )
ab_test(p, scfg)
else:
kg = KeyGenerator( cfg, proto, addr_type.pubkey_type, scfg.gen1 )
ag = AddrGenerator( cfg, proto, addr_type )
kg = KeyGenerator(cfg, proto, addr_type.pubkey_type, scfg.gen1)
ag = AddrGenerator(cfg, proto, addr_type)
if scfg.test == 'speed':
speed_test( proto, kg, ag, scfg.rounds )
speed_test(proto, kg, ag, scfg.rounds)
elif scfg.test == 'dump':
dump_test( proto, kg, ag, scfg.dumpfile )
dump_test(proto, kg, ag, scfg.dumpfile)
if saved_results:
import json
with open(results_file,'w') as fp:
fp.write(json.dumps( saved_results, indent=4 ))
with open(results_file, 'w') as fp:
fp.write(json.dumps(saved_results, indent=4))
from subprocess import run,PIPE,DEVNULL
from subprocess import run, PIPE, DEVNULL
from collections import namedtuple
from mmgen.protocol import init_proto,CoinProtocol
from mmgen.protocol import init_proto, CoinProtocol
from mmgen.altcoin.params import init_genonly_altcoins
from test.altcointest import TestCoinInfo as cinfo
from mmgen.key import PrivKey
from mmgen.addr import MMGenAddrType
from mmgen.addrgen import KeyGenerator,AddrGenerator
from mmgen.addrgen import KeyGenerator, AddrGenerator
from mmgen.keygen import get_backends
from mmgen.util2 import load_cryptodomex
from test.include.common import getrand,get_ethkey,set_globals
from test.include.common import getrand, get_ethkey, set_globals
gtr = namedtuple('gen_tool_result',['wif','addr','viewkey'])
sd = namedtuple('saved_data_item',['reduced','wif','addr','viewkey'])
gtr = namedtuple('gen_tool_result', ['wif', 'addr', 'viewkey'])
sd = namedtuple('saved_data_item', ['reduced', 'wif', 'addr', 'viewkey'])
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]

View file

@ -27,7 +27,7 @@ try:
except ImportError:
from test.include import test_init
from mmgen.util import msg,msg_r,die
from mmgen.util import msg, msg_r, die
def green(s):
return '\033[32;1m' + s + '\033[0m'
@ -39,33 +39,33 @@ class TestHashFunc:
h = self.t_cls(b'foo')
if h.H_init != self.H_ref:
m = 'Generated constants H[] differ from reference value:\nReference:\n{}\nGenerated:\n{}'
die(3,m.format([hex(n) for n in self.H_ref],[hex(n) for n in h.H_init]))
die(3, m.format([hex(n) for n in self.H_ref], [hex(n) for n in h.H_init]))
if h.K != self.K_ref:
m = 'Generated constants K[] differ from reference value:\nReference:\n{}\nGenerated:\n{}'
die(3,m.format([hex(n) for n in self.K_ref],[hex(n) for n in h.K]))
die(3, m.format([hex(n) for n in self.K_ref], [hex(n) for n in h.K]))
msg('OK')
def compare_hashes(self,data,chk=None):
def compare_hashes(self, data, chk=None):
if chk is None:
chk = getattr(self.hashlib,self.desc)(data).hexdigest()
chk = getattr(self.hashlib, self.desc)(data).hexdigest()
res = self.t_cls(data).hexdigest()
if res != chk:
m ='\nHashes do not match!\nReference {d}: {}\nMMGen {d}: {}'
die(3,m.format(chk,res,d=self.desc.upper()))
die(3, m.format(chk, res, d=self.desc.upper()))
def test_ref(self):
for i,data in enumerate(self.vectors):
for i, data in enumerate(self.vectors):
msg_r(f'\rTesting reference input data: {i+1:4}/{len(self.vectors)} ')
self.compare_hashes(data.encode(), chk=self.vectors[data])
msg('OK')
def test_random(self,rounds):
def test_random(self, rounds):
if not self.hashlib:
return
for i in range(rounds):
if i+1 in (1,rounds) or not (i+1) % 10:
if i+1 in (1, rounds) or not (i+1) % 10:
msg_r(f'\rTesting random input data: {i+1:4}/{rounds} ')
dlen = int(getrand(4).hex(),16) >> 18
dlen = int(getrand(4).hex(), 16) >> 18
self.compare_hashes(getrand(dlen))
msg('OK')
@ -120,16 +120,16 @@ class TestSha2(TestHashFunc):
desc = 'sha2'
def __init__(self):
from mmgen.sha2 import Sha256,Sha512
from mmgen.sha2 import Sha256, Sha512
import hashlib
self.t_cls = { 'sha256':Sha256, 'sha512':Sha512 }[self.desc]
self.t_cls = {'sha256':Sha256, 'sha512':Sha512}[self.desc]
self.hashlib = hashlib
self.vectors = {k:None for k in TestKeccak.vectors}
class TestSha256(TestSha2):
desc = 'sha256'
H_ref = (
0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19 )
0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19)
K_ref = (
0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5,
0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174,
@ -138,13 +138,13 @@ class TestSha256(TestSha2):
0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85,
0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070,
0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3,
0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2 )
0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2)
class TestSha512(TestSha2):
desc = 'sha512'
H_ref = (
0x6a09e667f3bcc908, 0xbb67ae8584caa73b, 0x3c6ef372fe94f82b, 0xa54ff53a5f1d36f1,
0x510e527fade682d1, 0x9b05688c2b3e6c1f, 0x1f83d9abfb41bd6b, 0x5be0cd19137e2179 )
0x510e527fade682d1, 0x9b05688c2b3e6c1f, 0x1f83d9abfb41bd6b, 0x5be0cd19137e2179)
K_ref = (
0x428a2f98d728ae22, 0x7137449123ef65cd, 0xb5c0fbcfec4d3b2f, 0xe9b5dba58189dbbc, 0x3956c25bf348b538,
0x59f111f1b605d019, 0x923f82a4af194f9b, 0xab1c5ed5da6d8118, 0xd807aa98a3030242, 0x12835b0145706fbe,
@ -161,21 +161,21 @@ class TestSha512(TestSha2):
0x90befffa23631e28, 0xa4506cebde82bde9, 0xbef9a3f7b2c67915, 0xc67178f2e372532b, 0xca273eceea26619c,
0xd186b8c721c0c207, 0xeada7dd6cde0eb1e, 0xf57d4f7fee6ed178, 0x06f067aa72176fba, 0x0a637dc5a2c898a6,
0x113f9804bef90dae, 0x1b710b35131c471b, 0x28db77f523047d84, 0x32caab7b40c72493, 0x3c9ebe0a15c9bebc,
0x431d67c49c100d4c, 0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817 )
0x431d67c49c100d4c, 0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817)
from test.include.common import getrand,set_globals
from test.include.common import getrand, set_globals
from mmgen.cfg import Config
from mmgen.main import launch
def main():
if len(sys.argv) not in (2,3):
die(1,'Test takes 1 or 2 arguments: test name, plus optional rounds count')
if len(sys.argv) not in (2, 3):
die(1, 'Test takes 1 or 2 arguments: test name, plus optional rounds count')
test = sys.argv[1].capitalize()
if test not in ('Sha256','Sha512','Keccak'):
die(1, "Valid choices for test are 'sha256','sha512' or 'keccak'")
if test not in ('Sha256', 'Sha512', 'Keccak'):
die(1, "Valid choices for test are 'sha256', 'sha512' or 'keccak'")
random_rounds = int(sys.argv[2]) if len(sys.argv) == 3 else 500

View file

@ -30,8 +30,8 @@ except ImportError:
from test.include import test_init
from mmgen.cfg import Config
from mmgen.util import msg,msg_r,gmsg,die
from mmgen.color import red,yellow,green,blue,purple,nocolor
from mmgen.util import msg, msg_r, gmsg, die
from mmgen.color import red, yellow, green, blue, purple, nocolor
from mmgen.obj import ListItemAttr
opts_data = {
@ -57,8 +57,8 @@ set_globals(cfg)
from test.objattrtest_py_d.oat_common import sample_objs
pd = namedtuple('attr_bits', ['read_ok','delete_ok','reassign_ok','typeconv','set_none_ok'])
perm_bits = ('read_ok','delete_ok','reassign_ok')
pd = namedtuple('attr_bits', ['read_ok', 'delete_ok', 'reassign_ok', 'typeconv', 'set_none_ok'])
perm_bits = ('read_ok', 'delete_ok', 'reassign_ok')
attr_dfls = {
'reassign_ok': False,
'delete_ok': False,
@ -75,54 +75,54 @@ def parse_attrbits(bits):
bool(0b10000 & bits), # set_none
)
def get_descriptor_obj(objclass,attrname):
for o in (objclass,objclass.__bases__[0]): # assume there's only one base class
def get_descriptor_obj(objclass, attrname):
for o in (objclass, objclass.__bases__[0]): # assume there's only one base class
if attrname in o.__dict__:
return o.__dict__[attrname]
die(4,f'unable to find descriptor {objclass.__name__}.{attrname}')
die(4, f'unable to find descriptor {objclass.__name__}.{attrname}')
def test_attr_perm(obj,attrname,perm_name,perm_value,dobj,attrval_type):
def test_attr_perm(obj, attrname, perm_name, perm_value, dobj, attrval_type):
class SampleObjError(Exception):
pass
pname = perm_name.replace('_ok','')
pname = perm_name.replace('_ok', '')
pstem = pname.rstrip('e')
try:
if perm_name == 'read_ok': # non-existent perm
getattr(obj,attrname)
getattr(obj, attrname)
elif perm_name == 'reassign_ok':
try:
so = sample_objs[attrval_type.__name__]
except Exception as e:
raise SampleObjError(f'unable to find sample object of type {attrval_type.__name__!r}') from e
# ListItemAttr allows setting an attribute if its value is None
if type(dobj) is ListItemAttr and getattr(obj,attrname) is None:
setattr(obj,attrname,so)
setattr(obj,attrname,so)
if type(dobj) is ListItemAttr and getattr(obj, attrname) is None:
setattr(obj, attrname, so)
setattr(obj, attrname, so)
elif perm_name == 'delete_ok':
delattr(obj,attrname)
delattr(obj, attrname)
except SampleObjError as e:
die(4,f'Test script error ({e})')
die(4, f'Test script error ({e})')
except Exception as e:
if perm_value is True:
fs = '{!r}: unable to {} attribute {!r}, though {}ing is allowed ({})'
die(4,fs.format(type(obj).__name__,pname,attrname,pstem,e))
die(4, fs.format(type(obj).__name__, pname, attrname, pstem, e))
else:
if perm_value is False:
fs = '{!r}: attribute {!r} is {n}able, though {n}ing is forbidden'
die(4,fs.format(type(obj).__name__,attrname,n=pstem))
die(4, fs.format(type(obj).__name__, attrname, n=pstem))
def test_attr(data,obj,attrname,dobj,bits,attrval_type):
if hasattr(obj,attrname): # TODO
def test_attr(data, obj, attrname, dobj, bits, attrval_type):
if hasattr(obj, attrname): # TODO
td_attrval_type = data.attrs[attrname][1]
if attrval_type not in (td_attrval_type,type(None)):
if attrval_type not in (td_attrval_type, type(None)):
fs = '\nattribute {!r} of {!r} instance has incorrect type {!r} (should be {!r})'
die(4,fs.format(attrname,type(obj).__name__,attrval_type.__name__,td_attrval_type.__name__))
die(4, fs.format(attrname, type(obj).__name__, attrval_type.__name__, td_attrval_type.__name__))
if hasattr(dobj,'__dict__'):
if hasattr(dobj, '__dict__'):
d = dobj.__dict__
bits = bits._asdict()
colors = {
@ -135,38 +135,38 @@ def test_attr(data,obj,attrname,dobj,bits,attrval_type):
if k in d:
if d[k] != bits[k]:
fs = 'init value {iv}={a} for attr {n!r} does not match test data ({iv}={b})'
die(4,fs.format(iv=k,n=attrname,a=d[k],b=bits[k]))
die(4, fs.format(iv=k, n=attrname, a=d[k], b=bits[k]))
if cfg.verbose and d[k] != attr_dfls[k]:
msg_r(colors[k](f' {k}={d[k]!r}'))
def test_object(mod,test_data,objname):
def test_object(mod, test_data, objname):
if '.' in objname:
on1,on2 = objname.split('.')
cls = getattr(getattr(mod,on1),on2)
on1, on2 = objname.split('.')
cls = getattr(getattr(mod, on1), on2)
else:
cls = getattr(mod,objname)
cls = getattr(mod, objname)
fs = 'Testing attribute ' + ('{!r:<15}{dt:13}' if cfg.show_descriptor_type else '{!r}')
data = test_data[objname]
obj = cls(*data.args,**data.kwargs)
obj = cls(*data.args, **data.kwargs)
for attrname,adata in data.attrs.items():
dobj = get_descriptor_obj(type(obj),attrname)
for attrname, adata in data.attrs.items():
dobj = get_descriptor_obj(type(obj), attrname)
if cfg.verbose:
msg_r(fs.format(attrname,dt=type(dobj).__name__.replace('MMGen','')))
msg_r(fs.format(attrname, dt=type(dobj).__name__.replace('MMGen', '')))
bits = parse_attrbits(adata[0])
test_attr(data,obj,attrname,dobj,bits,adata[1])
for bit_name,bit_value in bits._asdict().items():
test_attr(data, obj, attrname, dobj, bits, adata[1])
for bit_name, bit_value in bits._asdict().items():
if bit_name in perm_bits:
test_attr_perm(obj,attrname,bit_name,bit_value,dobj,adata[1])
test_attr_perm(obj, attrname, bit_name, bit_value, dobj, adata[1])
cfg._util.vmsg('')
def do_loop():
import importlib
modname = f'test.objattrtest_py_d.oat_{proto.coin.lower()}_{proto.network}'
mod = importlib.import_module(modname)
test_data = getattr(mod,'tests')
test_data = getattr(mod, 'tests')
gmsg(f'Running immutable attribute tests for {proto.coin} {proto.network}')
utests = cfg._args
@ -174,7 +174,7 @@ def do_loop():
if utests and obj not in utests:
continue
msg((blue if cfg.verbose else nocolor)(f'Testing {obj}'))
test_object(mod,test_data,obj)
test_object(mod, test_data, obj)
proto = cfg._proto

View file

@ -20,7 +20,7 @@
test/objtest.py: Test MMGen data objects
"""
import os,re
import os, re
try:
from include import test_init
@ -35,8 +35,8 @@ if not os.getenv('MMGEN_DEVTOOLS'):
init_dev()
from mmgen.cfg import Config
from mmgen.util import msg,msg_r,gmsg,capfirst,die
from mmgen.color import red,yellow,blue,green,orange,purple,gray,nocolor
from mmgen.util import msg, msg_r, gmsg, capfirst, die
from mmgen.color import red, yellow, blue, green, orange, purple, gray, nocolor
from mmgen.obj import get_obj
opts_data = {
@ -64,14 +64,14 @@ if cfg.verbose:
from test.include.common import set_globals
set_globals(cfg)
def run_test(mod,test,arg,input_data,arg1,exc_name):
def run_test(mod, test, arg, input_data, arg1, exc_name):
arg_copy = arg
kwargs = {}
ret_chk = arg
ret_idx = None
if input_data == 'good' and isinstance(arg,tuple):
arg,ret_chk = arg
if isinstance(arg,dict): # pass one arg + kwargs to constructor
if input_data == 'good' and isinstance(arg, tuple):
arg, ret_chk = arg
if isinstance(arg, dict): # pass one arg + kwargs to constructor
arg_copy = arg.copy()
if 'arg' in arg:
args = [arg['arg']]
@ -93,7 +93,7 @@ def run_test(mod,test,arg,input_data,arg1,exc_name):
del arg['ret_idx']
del arg_copy['ret_idx']
kwargs.update(arg)
elif isinstance(arg,tuple):
elif isinstance(arg, tuple):
args = arg
else:
args = [arg]
@ -107,23 +107,23 @@ def run_test(mod,test,arg,input_data,arg1,exc_name):
try:
if not cfg.super_silent:
arg_disp = repr(arg_copy[0] if isinstance(arg_copy,tuple) else arg_copy)
if cfg.test_suite_deterministic and isinstance(arg_copy,dict):
arg_disp = re.sub(r'object at 0x[0-9a-f]+','object at [SCRUBBED]',arg_disp)
arg_disp = repr(arg_copy[0] if isinstance(arg_copy, tuple) else arg_copy)
if cfg.test_suite_deterministic and isinstance(arg_copy, dict):
arg_disp = re.sub(r'object at 0x[0-9a-f]+', 'object at [SCRUBBED]', arg_disp)
msg_r((green if input_data=='good' else orange)(f'{arg_disp+":":<22}'))
cls = getattr(mod,test)
cls = getattr(mod, test)
if cfg.getobj:
ret = get_obj(getattr(mod,test),**kwargs)
ret = get_obj(getattr(mod, test), **kwargs)
else:
ret = cls(*args,**kwargs)
ret = cls(*args, **kwargs)
bad_ret = [] if issubclass(cls,list) else None
bad_ret = [] if issubclass(cls, list) else None
if isinstance(ret_chk,str):
if isinstance(ret_chk, str):
ret_chk = ret_chk.encode()
if isinstance(ret,str):
if isinstance(ret, str):
ret = ret.encode()
if cfg.getobj:
@ -154,12 +154,12 @@ def run_test(mod,test,arg,input_data,arg1,exc_name):
ret_disp = ret
msg(f'==> {ret_disp!r}')
if cfg.verbose and issubclass(cls,MMGenObject):
ret.pmsg() if hasattr(ret,'pmsg') else pmsg(ret)
if cfg.verbose and issubclass(cls, MMGenObject):
ret.pmsg() if hasattr(ret, 'pmsg') else pmsg(ret)
except UserWarning as e:
msg(f'==> {ret!r}')
die(2,red(str(e)))
die(2, red(str(e)))
except Exception as e:
if input_data == 'good':
raise ValueError(f'Error on good input data: {e}') from e
@ -182,7 +182,7 @@ def do_loop():
import importlib
modname = f'test.objtest_py_d.ot_{proto.coin.lower()}_{proto.network}'
mod = importlib.import_module(modname)
test_data = getattr(mod,'tests')
test_data = getattr(mod, 'tests')
gmsg(f'Running data object tests for {proto.coin} {proto.network}')
clr = None
@ -191,8 +191,8 @@ def do_loop():
arg1 = test_data[test].get('arg1')
if utests and test not in utests:
continue
nl = ('\n','')[bool(cfg.super_silent) or clr is None]
clr = (blue,nocolor)[bool(cfg.super_silent)]
nl = ('\n', '')[bool(cfg.super_silent) or clr is None]
clr = (blue, nocolor)[bool(cfg.super_silent)]
if cfg.getobj and arg1 is None:
msg(gray(f'{nl}Skipping {test}'))
@ -200,7 +200,7 @@ def do_loop():
msg(clr(f'{nl}Testing {test}'))
for k in ('bad','good'):
for k in ('bad', 'good'):
if not cfg.super_silent:
msg(purple(capfirst(k)+' input:'))
for arg in test_data[test][k]:
@ -210,8 +210,7 @@ def do_loop():
arg,
input_data = k,
arg1 = arg1,
exc_name = test_data[test].get('exc_name') or ('ObjectInitError','None')[k=='good'],
)
exc_name = test_data[test].get('exc_name') or ('ObjectInitError', 'None')[k=='good'])
proto = cfg._proto

View file

@ -49,7 +49,7 @@ opts_data = {
-v, --verbose Produce more verbose output
""",
'notes': """
Valid commands: 'coin','pw'
Valid commands: 'coin', 'pw'
If no command is given, the whole suite of tests is run.
"""
}
@ -109,7 +109,7 @@ def make_cmd(progname, opts, add_opts, args):
def run_cmd(cmd):
cp = run(cmd, stdout=PIPE, stderr=PIPE, text=True, env=run_env)
if cp.returncode != 0:
die(2,f'\nSpawned program exited with error code {cp.returncode}:\n{cp.stderr}')
die(2, f'\nSpawned program exited with error code {cp.returncode}:\n{cp.stderr}')
return cp.stdout.splitlines()
def run_test(progname, opts, add_opts, args, test_data, addr_desc, opts_w):
@ -121,15 +121,15 @@ def run_test(progname, opts, add_opts, args, test_data, addr_desc, opts_w):
lines = run_cmd(cmd)
cmd_out = dict([e[9:].split(': ') for e in lines if e.startswith('sc_debug_')])
cmd_out['addr'] = lines[-2].split(None,1)[-1]
cmd_out['addr'] = lines[-2].split(None, 1)[-1]
ref_data = test_data._asdict()
for k in ref_data:
if cmd_out[k] == ref_data[k]:
s = k.replace('seed','seed[:8]').replace('addr',addr_desc)
s = k.replace('seed', 'seed[:8]').replace('addr', addr_desc)
cfg._util.vmsg(f' {s:9}: {cmd_out[k]}')
else:
die(4,f'\nError: sc_{k} value {cmd_out[k]} does not match reference value {ref_data[k]}')
die(4, f'\nError: sc_{k} value {cmd_out[k]} does not match reference value {ref_data[k]}')
msg(green('OK') if cfg.verbose else 'OK')
def make_coin_test_data():
@ -143,7 +143,7 @@ def make_coin_test_data():
opts = list_gen(
[f'--coin={coin}'],
[f'--type={mmtype}', mmtype],
[ '--cashaddr=0', coin == 'bch']
['--cashaddr=0', coin == 'bch']
)
yield ('mmgen-addrgen', opts, [], [], test_data, 'address')

View file

@ -20,8 +20,8 @@
test/tooltest.py: Tests for the 'mmgen-tool' utility
"""
import sys,os,time
from subprocess import run,PIPE
import sys, os, time
from subprocess import run, PIPE
try:
from include.test_init import repo_root
@ -29,8 +29,8 @@ except ImportError:
from test.include.test_init import repo_root
from mmgen.cfg import Config
from mmgen.color import red,yellow,green,blue,cyan
from mmgen.util import msg,msg_r,Msg,die
from mmgen.color import red, yellow, green, blue, cyan
from mmgen.util import msg, msg_r, Msg, die
opts_data = {
'text': {
@ -78,28 +78,28 @@ vmsg = cfg._util.vmsg
proto = cfg._proto
assert cfg.type in (None,'zcash_z'), 'Only zcash-z permitted for --type argument'
assert cfg.type in (None, 'zcash_z'), 'Only zcash-z permitted for --type argument'
cmd_data = {
'cryptocoin': {
'desc': 'Cryptocoin address/key commands',
'cmd_data': {
'randwif': (),
'randpair': (), # create 4 pairs: uncomp,comp,segwit,bech32
'wif2addr': ('randpair','o4'),
'wif2hex': ('randpair','o4'),
'privhex2pubhex': ('wif2hex','o3'), # segwit only
'pubhex2addr': ('privhex2pubhex','o3'), # segwit only
'hex2wif': ('wif2hex','io2'), # uncomp, comp
'addr2pubhash': ('randpair','o4'), # uncomp, comp, bech32
'pubhash2addr': ('addr2pubhash','io4'), # uncomp, comp, bech32
'randpair': (), # create 4 pairs: uncomp, comp, segwit, bech32
'wif2addr': ('randpair', 'o4'),
'wif2hex': ('randpair', 'o4'),
'privhex2pubhex': ('wif2hex', 'o3'), # segwit only
'pubhex2addr': ('privhex2pubhex', 'o3'), # segwit only
'hex2wif': ('wif2hex', 'io2'), # uncomp, comp
'addr2pubhash': ('randpair', 'o4'), # uncomp, comp, bech32
'pubhash2addr': ('addr2pubhash', 'io4'), # uncomp, comp, bech32
},
},
'mnemonic': {
'desc': 'mnemonic commands',
'cmd_data': {
'hex2mn': (),
'mn2hex': ('hex2mn','io3'),
'mn2hex': ('hex2mn', 'io3'),
'mn_rand128': (),
'mn_rand192': (),
'mn_rand256': (),
@ -109,13 +109,13 @@ cmd_data = {
},
}
if proto.coin in ('BTC','LTC'):
if proto.coin in ('BTC', 'LTC'):
cmd_data['cryptocoin']['cmd_data'].update({
'pubhex2redeem_script': ('privhex2pubhex','o3'),
'wif2redeem_script': ('randpair','o3'),
'wif2segwit_pair': ('randpair','o2'),
'privhex2addr': ('wif2hex','o4'), # compare with output of randpair
'pipetest': ('randpair','o3')
'pubhex2redeem_script': ('privhex2pubhex', 'o3'),
'wif2redeem_script': ('randpair', 'o3'),
'wif2segwit_pair': ('randpair', 'o2'),
'privhex2addr': ('wif2hex', 'o4'), # compare with output of randpair
'pipetest': ('randpair', 'o3')
})
if proto.coin == 'XMR' or cfg.type == 'zcash_z':
@ -146,47 +146,48 @@ tcfg = {
ref_subdir = '' if proto.base_coin == 'BTC' else proto.name.lower()
altcoin_pfx = '' if proto.base_coin == 'BTC' else '-'+proto.base_coin
tn_ext = ('','.testnet')[proto.testnet]
tn_ext = ('', '.testnet')[proto.testnet]
spawn_cmd = [
'scripts/exec_wrapper.py',
os.path.relpath(os.path.join(repo_root,'cmds','mmgen-tool')) ]
os.path.relpath(os.path.join(repo_root, 'cmds', 'mmgen-tool'))]
if cfg.coverage:
d,f = init_coverage()
spawn_cmd = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f] + spawn_cmd
d, f = init_coverage()
spawn_cmd = ['python3', '-m', 'trace', '--count', '--coverdir='+d, '--file='+f] + spawn_cmd
elif sys.platform == 'win32':
spawn_cmd = ['python3'] + spawn_cmd
add_spawn_args = ['--data-dir='+tcfg['tmpdir']] + ['--{}{}'.format(
k.replace('_','-'),
'='+getattr(cfg,k) if getattr(cfg,k) is not True else '')
for k in ('testnet','rpc_host','regtest','coin','type') if getattr(cfg,k)]
k.replace('_', '-'),
'='+getattr(cfg, k) if getattr(cfg, k) is not True else '')
for k in ('testnet', 'rpc_host', 'regtest', 'coin', 'type') if getattr(cfg, k)]
if cfg.list_cmds:
fs = ' {:<{w}} - {}'
Msg('Available commands:')
w = max(map(len,cmd_data))
w = max(map(len, cmd_data))
for cmd in cmd_data:
Msg(fs.format(cmd,cmd_data[cmd]['desc'],w=w))
Msg(fs.format(cmd, cmd_data[cmd]['desc'], w=w))
Msg('\nAvailable utilities:')
Msg(fs.format('clean','Clean the tmp directory',w=w))
Msg(fs.format('clean', 'Clean the tmp directory', w=w))
sys.exit(0)
if cfg.testing_status:
tested_in = {
'tooltest.py': [],
'cmdtest.py': (
'encrypt','decrypt','find_incog_data',
'addrfile_chksum','keyaddrfile_chksum','passwdfile_chksum',
'add_label','remove_label','remove_address','twview',
'getbalance','listaddresses','listaddress',
'daemon_version','decrypt_keystore','decrypt_geth_keystore',
'mn2hex_interactive','rand2file',
'rescan_address','rescan_blockchain','resolve_address',
'twexport','twimport','txhist'
'encrypt', 'decrypt', 'find_incog_data',
'addrfile_chksum', 'keyaddrfile_chksum', 'passwdfile_chksum',
'add_label', 'remove_label', 'remove_address', 'twview',
'getbalance', 'listaddresses', 'listaddress',
'daemon_version', 'decrypt_keystore', 'decrypt_geth_keystore',
'mn2hex_interactive', 'rand2file',
'rescan_address', 'rescan_blockchain', 'resolve_address',
'twexport', 'twimport', 'txhist'
),
'tooltest2.py': run(
['python3','test/tooltest2.py','--list-tested-cmds'],
['python3', 'test/tooltest2.py', '--list-tested-cmds'],
stdout = PIPE,
check = True
).stdout.decode().split()
@ -195,7 +196,7 @@ if cfg.testing_status:
tested_in['tooltest.py'] += list(v['cmd_data'].keys())
Msg(green("Testing status of 'mmgen-tool' commands:"))
for l in ('tooltest.py','tooltest2.py','cmdtest.py'):
for l in ('tooltest.py', 'tooltest2.py', 'cmdtest.py'):
Msg('\n ' + blue(l+':'))
Msg(' '+'\n '.join(sorted(tested_in[l])))
@ -209,34 +210,46 @@ if cfg.testing_status:
set(tested_in['cmdtest.py'])
)
if uc:
Msg(yellow('\n {}\n {}'.format('Untested commands:','\n '.join(uc))))
Msg(yellow('\n {}\n {}'.format('Untested commands:', '\n '.join(uc))))
sys.exit(0)
from mmgen.key import is_wif
from mmgen.addr import is_coin_addr
def is_wif_loc(s):
return is_wif(proto,s)
return is_wif(proto, s)
def is_coin_addr_loc(s):
return is_coin_addr(proto,s)
return is_coin_addr(proto, s)
msg_w = 35
def test_msg(m):
msg_r(green(f'Testing {m}\n') if cfg.verbose else '{:{w}}'.format(f'Testing {m}', w=msg_w+8))
compressed = cfg.type or ('','compressed')['C' in proto.mmtypes]
segwit = ('','segwit')['S' in proto.mmtypes]
bech32 = ('','bech32')['B' in proto.mmtypes]
type_compressed_arg = ([],['--type=' + (cfg.type or 'compressed')])[bool(cfg.type) or 'C' in proto.mmtypes]
type_segwit_arg = ([],['--type=segwit'])['S' in proto.mmtypes]
type_bech32_arg = ([],['--type=bech32'])['B' in proto.mmtypes]
compressed = cfg.type or ('', 'compressed')['C' in proto.mmtypes]
segwit = ('', 'segwit')['S' in proto.mmtypes]
bech32 = ('', 'bech32')['B' in proto.mmtypes]
type_compressed_arg = ([], ['--type=' + (cfg.type or 'compressed')])[bool(cfg.type) or 'C' in proto.mmtypes]
type_segwit_arg = ([], ['--type=segwit'])['S' in proto.mmtypes]
type_bech32_arg = ([], ['--type=bech32'])['B' in proto.mmtypes]
class MMGenToolTestUtils:
def run_cmd(self,name,tool_args,kwargs='',extra_msg='',silent=False,strip=True,add_opts=[],binary=False):
def run_cmd(
self,
name,
tool_args,
kwargs = '',
extra_msg = '',
silent = False,
strip = True,
add_opts = [],
binary = False):
sys_cmd = (
spawn_cmd +
add_spawn_args +
['-r0','-d',tcfg['tmpdir']] +
['-r0', '-d', tcfg['tmpdir']] +
add_opts +
[name.lower()] +
tool_args +
@ -250,9 +263,9 @@ class MMGenToolTestUtils:
sys.stderr.write(green(f'Testing {full_name}\nExecuting '))
sys.stderr.write(cyan(' '.join(sys_cmd)+'\n'))
else:
msg_r('Testing {:{w}}'.format( full_name+':', w=msg_w ))
msg_r('Testing {:{w}}'.format(full_name+':', w=msg_w))
cp = run(sys_cmd,stdout=PIPE,stderr=PIPE)
cp = run(sys_cmd, stdout=PIPE, stderr=PIPE)
out = cp.stdout
err = cp.stderr
if cfg.debug:
@ -267,65 +280,79 @@ class MMGenToolTestUtils:
msg('{}\n{}\n{}'.format(
red('FAILED'),
yellow('Command stderr output:'),
err.decode() ))
die(2,f'Called process returned with an error (retcode {cp.returncode})')
return (out,out.rstrip())[bool(strip)]
err.decode()))
die(2, f'Called process returned with an error (retcode {cp.returncode})')
return (out, out.rstrip())[bool(strip)]
def run_cmd_chk(self,name,f1,f2,kwargs='',extra_msg='',strip_hex=False,add_opts=[]):
def run_cmd_chk(self, name, f1, f2, kwargs='', extra_msg='', strip_hex=False, add_opts=[]):
idata = read_from_file(f1).rstrip()
odata = read_from_file(f2).rstrip()
ret = self.run_cmd(name,[odata],kwargs=kwargs,extra_msg=extra_msg,add_opts=add_opts)
ret = self.run_cmd(name, [odata], kwargs=kwargs, extra_msg=extra_msg, add_opts=add_opts)
vmsg('In: ' + repr(odata))
vmsg('Out: ' + repr(ret))
def cmp_equal(a,b):
def cmp_equal(a, b):
return (a.lstrip('0') == b.lstrip('0')) if strip_hex else (a == b)
if cmp_equal(ret,idata):
if cmp_equal(ret, idata):
ok()
else:
die(4, f"Error: values don't match:\nIn: {idata!r}\nOut: {ret!r}")
return ret
def run_cmd_nochk(self,name,f1,kwargs='',add_opts=[]):
def run_cmd_nochk(self, name, f1, kwargs='', add_opts=[]):
odata = read_from_file(f1).rstrip()
ret = self.run_cmd(name,[odata],kwargs=kwargs,add_opts=add_opts)
ret = self.run_cmd(name, [odata], kwargs=kwargs, add_opts=add_opts)
vmsg('In: ' + repr(odata))
vmsg('Out: ' + repr(ret))
return ret
def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='',
literal=False,chkdata='',hush=False,add_opts=[]):
def run_cmd_out(
self,
name,
carg = None,
Return = False,
kwargs = '',
fn_idx = '',
extra_msg = '',
literal = False,
chkdata = '',
hush = False,
add_opts = []):
if carg:
write_to_tmpfile(tcfg,f'{name}{fn_idx}.in',carg+'\n')
ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs,
extra_msg=extra_msg,add_opts=add_opts)
write_to_tmpfile(tcfg, f'{name}{fn_idx}.in', carg+'\n')
ret = self.run_cmd(
name,
([], [carg])[bool(carg)],
kwargs = kwargs,
extra_msg = extra_msg,
add_opts = add_opts)
if carg:
vmsg('In: ' + repr(carg))
vmsg('Out: ' + (repr(ret),ret)[literal])
vmsg('Out: ' + (repr(ret), ret)[literal])
if ret or ret == '':
write_to_tmpfile(tcfg,f'{name}{fn_idx}.out',ret+'\n')
write_to_tmpfile(tcfg, f'{name}{fn_idx}.out', ret+'\n')
if chkdata:
cmp_or_die(ret,chkdata)
cmp_or_die(ret, chkdata)
return
if Return:
return ret
elif not hush:
ok()
else:
die(4,f'Error for command {name!r}')
die(4, f'Error for command {name!r}')
def run_cmd_randinput(self,name,strip=True,add_opts=[]):
def run_cmd_randinput(self, name, strip=True, add_opts=[]):
s = getrand(128)
fn = name+'.in'
write_to_tmpfile(tcfg,fn,s,binary=True)
ret = self.run_cmd(name,[get_tmpfile(tcfg,fn)],strip=strip,add_opts=add_opts)
write_to_tmpfile(tcfg, fn, s, binary=True)
ret = self.run_cmd(name, [get_tmpfile(tcfg, fn)], strip=strip, add_opts=add_opts)
fn = name+'.out'
write_to_tmpfile(tcfg,fn,ret+'\n')
write_to_tmpfile(tcfg, fn, ret+'\n')
ok()
vmsg(f'Returned: {ret}')
tu = MMGenToolTestUtils()
def ok_or_die(val,chk_func,s,skip_ok=False):
def ok_or_die(val, chk_func, s, skip_ok=False):
try:
ret = chk_func(val)
except:
@ -334,131 +361,131 @@ def ok_or_die(val,chk_func,s,skip_ok=False):
if not skip_ok:
ok()
else:
die(4,f'Returned value {val!r} is not a {s}')
die(4, f'Returned value {val!r} is not a {s}')
class MMGenToolTestCmds:
# Cryptocoin
def randwif(self,name):
for n,k in enumerate(['',compressed]):
def randwif(self, name):
for n, k in enumerate(['', compressed]):
ao = ['--type='+k] if k else []
ret = tu.run_cmd_out(name,add_opts=ao,Return=True,fn_idx=n+1)
ok_or_die(ret,is_wif_loc,'WIF key')
def randpair(self,name):
for n,k in enumerate(['',compressed,segwit,bech32]):
ret = tu.run_cmd_out(name, add_opts=ao, Return=True, fn_idx=n+1)
ok_or_die(ret, is_wif_loc, 'WIF key')
def randpair(self, name):
for n, k in enumerate(['', compressed, segwit, bech32]):
ao = ['--type='+k] if k else []
wif,addr = tu.run_cmd_out(name,add_opts=ao,Return=True,fn_idx=n+1,literal=True).split()
ok_or_die(wif,is_wif_loc,'WIF key',skip_ok=True)
ok_or_die(addr,is_coin_addr_loc,'Coin address')
def wif2addr(self,name,f1,f2,f3,f4):
for n,f,k in (
(1,f1,''),
(2,f2,compressed),
(3,f3,segwit),
(4,f4,bech32)
):
wif, addr = tu.run_cmd_out(name, add_opts=ao, Return=True, fn_idx=n+1, literal=True).split()
ok_or_die(wif, is_wif_loc, 'WIF key', skip_ok=True)
ok_or_die(addr, is_coin_addr_loc, 'Coin address')
def wif2addr(self, name, f1, f2, f3, f4):
for n, f, k in (
(1, f1, ''),
(2, f2, compressed),
(3, f3, segwit),
(4, f4, bech32)):
ao = ['--type='+k] if k else []
wif = read_from_file(f).split()[0]
tu.run_cmd_out(name,wif,add_opts=ao,fn_idx=n)
def wif2hex(self,name,f1,f2,f3,f4):
for n,f,m in (
(1,f1,''),
(2,f2,compressed),
(3,f3,'{} for {}'.format( compressed or 'uncompressed', segwit or 'p2pkh' )),
(4,f4,'{} for {}'.format( compressed or 'uncompressed', bech32 or 'p2pkh' ))
):
tu.run_cmd_out(name, wif, add_opts=ao, fn_idx=n)
def wif2hex(self, name, f1, f2, f3, f4):
for n, f, m in (
(1, f1, ''),
(2, f2, compressed),
(3, f3, '{} for {}'.format(compressed or 'uncompressed', segwit or 'p2pkh')),
(4, f4, '{} for {}'.format(compressed or 'uncompressed', bech32 or 'p2pkh'))):
wif = read_from_file(f).split()[0]
tu.run_cmd_out(name,wif,fn_idx=n,extra_msg=m)
def privhex2addr(self,name,f1,f2,f3,f4):
keys = [read_from_file(f).rstrip() for f in (f1,f2,f3,f4)]
for n,k in enumerate(('',compressed,segwit,bech32)):
tu.run_cmd_out(name, wif, fn_idx=n, extra_msg=m)
def privhex2addr(self, name, f1, f2, f3, f4):
keys = [read_from_file(f).rstrip() for f in (f1, f2, f3, f4)]
for n, k in enumerate(('', compressed, segwit, bech32)):
ao = ['--type='+k] if k else []
ret = tu.run_cmd(name,[keys[n]],add_opts=ao).rstrip()
iaddr = read_from_tmpfile(tcfg,f'randpair{n+1}.out').split()[-1]
ret = tu.run_cmd(name, [keys[n]], add_opts=ao).rstrip()
iaddr = read_from_tmpfile(tcfg, f'randpair{n+1}.out').split()[-1]
vmsg(f'Out: {ret}')
cmp_or_die(iaddr,ret)
cmp_or_die(iaddr, ret)
ok()
def hex2wif(self,name,f1,f2,f3,f4):
for fi,fo,k in (
(f1,f2,''),
(f3,f4,compressed)):
def hex2wif(self, name, f1, f2, f3, f4):
for fi, fo, k in (
(f1, f2, ''),
(f3, f4, compressed)):
ao = ['--type='+k] if k else []
tu.run_cmd_chk(name,fi,fo,add_opts=ao)
def addr2pubhash(self,name,f1,f2,f3,f4):
for n,f,m,ao in (
(1,f1,'',[]),
(2,f2,'from {}'.format( compressed or 'uncompressed' ),[]),
(4,f4,'',type_bech32_arg),
):
tu.run_cmd_chk(name, fi, fo, add_opts=ao)
def addr2pubhash(self, name, f1, f2, f3, f4):
for n, f, m, ao in (
(1, f1, '', []),
(2, f2, 'from {}'.format(compressed or 'uncompressed'), []),
(4, f4, '', type_bech32_arg)):
addr = read_from_file(f).split()[-1]
tu.run_cmd_out(name,addr,fn_idx=n,add_opts=ao,extra_msg=m)
def pubhash2addr(self,name,f1,f2,f3,f4,f5,f6,f7,f8):
for _,fi,fo,m,ao in (
(1,f1,f2,'',[]),
(2,f3,f4,'from {}'.format( compressed or 'uncompressed' ),[]),
(4,f7,f8,'',type_bech32_arg)
):
tu.run_cmd_chk(name,fi,fo,add_opts=ao,extra_msg=m)
def privhex2pubhex(self,name,f1,f2,f3): # from Hex2wif
tu.run_cmd_out(name, addr, fn_idx=n, add_opts=ao, extra_msg=m)
def pubhash2addr(self, name, f1, f2, f3, f4, f5, f6, f7, f8):
for _, fi, fo, m, ao in (
(1, f1, f2, '', []),
(2, f3, f4, 'from {}'.format(compressed or 'uncompressed'), []),
(4, f7, f8, '', type_bech32_arg)):
tu.run_cmd_chk(name, fi, fo, add_opts=ao, extra_msg=m)
def privhex2pubhex(self, name, f1, f2, f3): # from Hex2wif
addr = read_from_file(f3).strip()
tu.run_cmd_out(name,addr,add_opts=type_compressed_arg,fn_idx=3) # what about uncompressed?
def pubhex2redeem_script(self,name,f1,f2,f3): # from above
tu.run_cmd_out(name, addr, add_opts=type_compressed_arg, fn_idx=3) # what about uncompressed?
def pubhex2redeem_script(self, name, f1, f2, f3): # from above
addr = read_from_file(f3).strip()
tu.run_cmd_out(name,addr,add_opts=type_segwit_arg,fn_idx=3)
rs = read_from_tmpfile(tcfg,'privhex2pubhex3.out').strip()
tu.run_cmd_out('pubhex2addr',rs,add_opts=type_segwit_arg,fn_idx=3,hush=True)
addr1 = read_from_tmpfile(tcfg,'pubhex2addr3.out').strip()
addr2 = read_from_tmpfile(tcfg,'randpair3.out').split()[1]
cmp_or_die(addr1,addr2)
tu.run_cmd_out(name, addr, add_opts=type_segwit_arg, fn_idx=3)
rs = read_from_tmpfile(tcfg, 'privhex2pubhex3.out').strip()
tu.run_cmd_out('pubhex2addr', rs, add_opts=type_segwit_arg, fn_idx=3, hush=True)
addr1 = read_from_tmpfile(tcfg, 'pubhex2addr3.out').strip()
addr2 = read_from_tmpfile(tcfg, 'randpair3.out').split()[1]
cmp_or_die(addr1, addr2)
ok()
def wif2redeem_script(self,name,f1,f2,f3): # compare output with above
def wif2redeem_script(self, name, f1, f2, f3): # compare output with above
wif = read_from_file(f3).split()[0]
ret1 = tu.run_cmd_out(name,wif,add_opts=type_segwit_arg,fn_idx=3,Return=True)
ret2 = read_from_tmpfile(tcfg,'pubhex2redeem_script3.out').strip()
cmp_or_die(ret1,ret2)
ret1 = tu.run_cmd_out(name, wif, add_opts=type_segwit_arg, fn_idx=3, Return=True)
ret2 = read_from_tmpfile(tcfg, 'pubhex2redeem_script3.out').strip()
cmp_or_die(ret1, ret2)
ok()
def wif2segwit_pair(self,name,f1,f2): # does its own checking, so just run
def wif2segwit_pair(self, name, f1, f2): # does its own checking, so just run
wif = read_from_file(f2).split()[0]
tu.run_cmd_out(name,wif,add_opts=type_segwit_arg,fn_idx=2)
def pubhex2addr(self,name,f1,f2,f3):
tu.run_cmd_out(name, wif, add_opts=type_segwit_arg, fn_idx=2)
def pubhex2addr(self, name, f1, f2, f3):
addr = read_from_file(f3).strip()
tu.run_cmd_out(name,addr,add_opts=type_segwit_arg,fn_idx=3)
tu.run_cmd_out(name, addr, add_opts=type_segwit_arg, fn_idx=3)
def pipetest(self,name,f1,f2,f3):
def pipetest(self, name, f1, f2, f3):
wif = read_from_file(f3).split()[0]
cmd = ( '{c} {a} wif2hex {wif}' +
' | {c} {a} --type=compressed privhex2pubhex -' +
' | {c} {a} --type=segwit pubhex2redeem_script -' +
' | {c} {a} --type=segwit redeem_script2addr -').format(
c=' '.join(spawn_cmd),
a=' '.join(add_spawn_args),
wif=wif)
cmd = (
'{c} {a} wif2hex {wif}' +
' | {c} {a} --type=compressed privhex2pubhex -' +
' | {c} {a} --type=segwit pubhex2redeem_script -' +
' | {c} {a} --type=segwit redeem_script2addr -').format(
c = ' '.join(spawn_cmd),
a = ' '.join(add_spawn_args),
wif = wif)
test_msg('command piping')
if cfg.verbose:
sys.stderr.write(green('Executing ') + cyan(cmd) + '\n')
res = run(cmd,stdout=PIPE,shell=True).stdout.decode().strip()
addr = read_from_tmpfile(tcfg,'wif2addr3.out').strip()
cmp_or_die(addr,res)
res = run(cmd, stdout=PIPE, shell=True).stdout.decode().strip()
addr = read_from_tmpfile(tcfg, 'wif2addr3.out').strip()
cmp_or_die(addr, res)
ok()
# Mnemonic
def hex2mn(self,name):
for n,size,m in ((1,16,'128-bit'),(2,24,'192-bit'),(3,32,'256-bit')):
def hex2mn(self, name):
for n, size, m in (
(1, 16, '128-bit'),
(2, 24, '192-bit'),
(3, 32, '256-bit')):
hexnum = getrandhex(size)
tu.run_cmd_out(name,hexnum,fn_idx=n,extra_msg=m)
def mn2hex(self,name,f1,f2,f3,f4,f5,f6):
for f_i,f_o,m in ((f1,f2,'128-bit'),(f3,f4,'192-bit'),(f5,f6,'256-bit')):
tu.run_cmd_chk(name,f_i,f_o,extra_msg=m,strip_hex=True)
def mn_rand128(self,name):
tu.run_cmd_out(name, hexnum, fn_idx=n, extra_msg=m)
def mn2hex(self, name, f1, f2, f3, f4, f5, f6):
for f_i, f_o, m in ((f1, f2, '128-bit'), (f3, f4, '192-bit'), (f5, f6, '256-bit')):
tu.run_cmd_chk(name, f_i, f_o, extra_msg=m, strip_hex=True)
def mn_rand128(self, name):
tu.run_cmd_out(name)
def mn_rand192(self,name):
def mn_rand192(self, name):
tu.run_cmd_out(name)
def mn_rand256(self,name):
def mn_rand256(self, name):
tu.run_cmd_out(name)
def mn_stats(self,name):
def mn_stats(self, name):
tu.run_cmd_out(name)
def mn_printlist(self,name):
tu.run_cmd(name,[])
def mn_printlist(self, name):
tu.run_cmd(name, [])
ok()
# main()
@ -468,8 +495,8 @@ mk_tmpdir(tcfg['tmpdir'])
def gen_deps_for_cmd(cdata):
fns = []
if cdata:
name,code = cdata
io,count = (code[:-1],int(code[-1])) if code[-1] in '0123456789' else (code,1)
name, code = cdata
io, count = (code[:-1], int(code[-1])) if code[-1] in '0123456789' else (code, 1)
for c in range(count):
fns += ['{}{}{}'.format(
name,
@ -483,27 +510,27 @@ def do_cmds(cmd_group):
gdata = cmd_data[cmd_group]['cmd_data']
for cmd in gdata:
fns = gen_deps_for_cmd(gdata[cmd])
cmdline = [cmd] + [os.path.join(tcfg['tmpdir'],fn) for fn in fns]
getattr(tc,cmd)(*cmdline)
cmdline = [cmd] + [os.path.join(tcfg['tmpdir'], fn) for fn in fns]
getattr(tc, cmd)(*cmdline)
def main():
if cfg._args:
if len(cfg._args) != 1:
die(1,'Only one command may be specified')
die(1, 'Only one command may be specified')
cmd = cfg._args[0]
if cmd in cmd_data:
cleandir(tcfg['tmpdir'],do_msg=True)
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
cleandir(tcfg['tmpdir'], do_msg=True)
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
do_cmds(cmd)
elif cmd == 'clean':
cleandir(tcfg['tmpdir'],do_msg=True)
cleandir(tcfg['tmpdir'], do_msg=True)
sys.exit(0)
else:
die(1,f'{cmd!r}: unrecognized command')
die(1, f'{cmd!r}: unrecognized command')
else:
cleandir(tcfg['tmpdir'],do_msg=True)
cleandir(tcfg['tmpdir'], do_msg=True)
for cmd in cmd_data:
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
do_cmds(cmd)
if cmd is not list(cmd_data.keys())[-1]:
msg('')

View file

@ -24,22 +24,22 @@ test/tooltest2.py: Test the 'mmgen-tool' utility
# TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
import sys, os, time, importlib, asyncio
from subprocess import run,PIPE
from subprocess import run, PIPE
try:
from include import test_init
except ImportError:
from test.include import test_init
from test.include.common import set_globals,end_msg,init_coverage
from test.include.common import set_globals, end_msg, init_coverage
from mmgen import main_tool
from mmgen.cfg import Config
from mmgen.color import green,blue,purple,cyan,gray
from mmgen.color import green, blue, purple, cyan, gray
from mmgen.util import msg, msg_r, Msg, die
skipped_tests = ['mn2hex_interactive']
coin_dependent_groups = ('Coin','File')
coin_dependent_groups = ('Coin', 'File')
opts_data = {
'text': {
@ -81,7 +81,7 @@ set_globals(cfg)
from test.tooltest2_d.data import *
def fork_cmd(cmd_name,args,opts,stdin_input):
def fork_cmd(cmd_name, args, opts, stdin_input):
cmd = (
tool_cmd_preargs +
tool_cmd +
@ -90,8 +90,8 @@ def fork_cmd(cmd_name,args,opts,stdin_input):
)
vmsg('{} {}'.format(
green('Executing'),
cyan(' '.join(cmd)) ))
cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
cyan(' '.join(cmd))))
cp = run(cmd, input=stdin_input or None, stdout=PIPE, stderr=PIPE)
try:
cmd_out = cp.stdout.decode()
except:
@ -100,11 +100,11 @@ def fork_cmd(cmd_name,args,opts,stdin_input):
vmsg(cp.stderr.strip().decode())
if cp.returncode != 0:
import re
m = re.search(b'tool command returned (None|False)',cp.stderr)
m = re.search(b'tool command returned (None|False)', cp.stderr)
if m:
return eval(m.group(1))
else:
die(2,f'Spawned program exited with error: {cp.stderr}')
die(2, f'Spawned program exited with error: {cp.stderr}')
return cmd_out.strip()
@ -112,50 +112,50 @@ def call_method(cls, method, cmd_name, args, mmtype, stdin_input):
vmsg('{a}: {b}{c}'.format(
a = purple('Running'),
b = ' '.join([cmd_name]+[repr(e) for e in args]),
c = ' '+mmtype if mmtype else '' ))
aargs,kwargs = main_tool.process_args(cmd_name,args,cls)
c = ' '+mmtype if mmtype else ''))
aargs, kwargs = main_tool.process_args(cmd_name, args, cls)
oq_save = bool(cfg.quiet)
if not cfg.verbose:
cfg._set_quiet(True)
if stdin_input:
fd0,fd1 = os.pipe()
fd0, fd1 = os.pipe()
if os.fork(): # parent
os.close(fd1)
stdin_save = os.dup(0)
os.dup2(fd0,0)
cmd_out = method(*aargs,**kwargs)
os.dup2(stdin_save,0)
os.dup2(fd0, 0)
cmd_out = method(*aargs, **kwargs)
os.dup2(stdin_save, 0)
os.wait()
cfg._set_quiet(oq_save)
return cmd_out
else: # child
os.close(fd0)
os.write(fd1,stdin_input)
os.write(fd1, stdin_input)
vmsg(f'Input: {stdin_input!r}')
sys.exit(0)
else:
ret = method(*aargs,**kwargs)
ret = method(*aargs, **kwargs)
if type(ret).__name__ == 'coroutine':
ret = asyncio.run(ret)
cfg._set_quiet(oq_save)
return ret
def tool_api(cls,cmd_name,args,opts):
def tool_api(cls, cmd_name, args, opts):
from mmgen.tool.api import tool_api
tool = tool_api(cfg)
if opts:
for o in opts:
if o.startswith('--type='):
tool.addrtype = o.split('=')[1]
pargs,kwargs = main_tool.process_args(cmd_name,args,cls)
return getattr(tool,cmd_name)(*pargs,**kwargs)
pargs, kwargs = main_tool.process_args(cmd_name, args, cls)
return getattr(tool, cmd_name)(*pargs, **kwargs)
def check_output(out,chk):
if isinstance(chk,str):
def check_output(out, chk):
if isinstance(chk, str):
chk = chk.encode()
if isinstance(out,int):
if isinstance(out, int):
out = str(out).encode()
if isinstance(out,str):
if isinstance(out, str):
out = out.encode()
err_fs = "Output ({!r}) doesn't match expected output ({!r})"
try:
@ -165,18 +165,18 @@ def check_output(out,chk):
if type(chk).__name__ == 'function':
assert chk(outd), f'{chk.__name__}({outd}) failed!'
elif isinstance(chk,dict):
for k,v in chk.items():
elif isinstance(chk, dict):
for k, v in chk.items():
if k == 'boolfunc':
assert v(outd), f'{v.__name__}({outd}) failed!'
elif k == 'value':
assert outd == v, err_fs.format(outd,v)
assert outd == v, err_fs.format(outd, v)
else:
outval = getattr(__builtins__,k)(out)
outval = getattr(__builtins__, k)(out)
if outval != v:
die(1,f'{k}({out}) returned {outval}, not {v}!')
die(1, f'{k}({out}) returned {outval}, not {v}!')
elif chk is not None:
assert out == chk, err_fs.format(out,chk)
assert out == chk, err_fs.format(out, chk)
def run_test(cls, gid, cmd_name):
data = tests[gid][cmd_name]
@ -199,14 +199,14 @@ def run_test(cls, gid, cmd_name):
m = '{} {}{}'.format(
purple('Testing'),
cmd_name if cfg.names else docstring_head(getattr(cls,cmd_name)),
m2 )
cmd_name if cfg.names else docstring_head(getattr(cls, cmd_name)),
m2)
msg_r(green(m)+'\n' if cfg.verbose else m)
skipping = False
for n,d in enumerate(data):
args,out,opts,mmtype = d + tuple([None] * (4-len(d)))
for n, d in enumerate(data):
args, out, opts, mmtype = d + tuple([None] * (4-len(d)))
if 'fmt=xmrseed' in args and cfg.no_altcoin:
if not skipping:
qmsg('')
@ -216,21 +216,21 @@ def run_test(cls, gid, cmd_name):
continue
skipping = False
stdin_input = None
if args and isinstance(args[0],bytes):
if args and isinstance(args[0], bytes):
stdin_input = args[0]
args[0] = '-'
if cfg.tool_api:
if args and args[0 ]== '-':
if args and args[0]== '-':
continue
cmd_out = tool_api(cls,cmd_name,args,opts)
cmd_out = tool_api(cls, cmd_name, args, opts)
elif cfg.fork:
cmd_out = fork_cmd(cmd_name,args,opts,stdin_input)
cmd_out = fork_cmd(cmd_name, args, opts, stdin_input)
else:
if stdin_input and sys.platform == 'win32':
msg(gray('Skipping for MSWin - no os.fork()'))
continue
method = getattr(cls(cfg,cmdname=cmd_name,proto=proto,mmtype=mmtype),cmd_name)
method = getattr(cls(cfg, cmdname=cmd_name, proto=proto, mmtype=mmtype), cmd_name)
cmd_out = call_method(cls, method, cmd_name, args, mmtype, stdin_input)
try:
@ -238,19 +238,19 @@ def run_test(cls, gid, cmd_name):
except:
vmsg(f'Output:\n{cmd_out!r}\n')
if isinstance(out,tuple) and type(out[0]).__name__ == 'function':
if isinstance(out, tuple) and type(out[0]).__name__ == 'function':
func_out = out[0](cmd_out)
assert func_out == out[1],(
assert func_out == out[1], (
'{}({}) == {} failed!\nOutput: {}'.format(
out[0].__name__,
cmd_out,
out[1],
func_out ))
elif isinstance(out,(list,tuple)):
for co,o in zip(cmd_out.split(NL) if cfg.fork else cmd_out,out):
check_output(co,o)
func_out))
elif isinstance(out, (list, tuple)):
for co, o in zip(cmd_out.split(NL) if cfg.fork else cmd_out, out):
check_output(co, o)
else:
check_output(cmd_out,out)
check_output(cmd_out, out)
if not cfg.verbose:
msg_r('.')
@ -266,7 +266,7 @@ def do_group(gid):
cls = main_tool.get_mod_cls(gid.lower())
qmsg(blue('Testing ' +
desc if cfg.names else
( docstring_head(cls) or desc )
(docstring_head(cls) or desc)
))
for cmdname in cls(cfg).user_commands:
@ -275,18 +275,18 @@ def do_group(gid):
if cmdname not in tests[gid]:
m = f'No test for command {cmdname!r} in group {gid!r}!'
if cfg.die_on_missing:
die(1,m+' Aborting')
die(1, m+' Aborting')
else:
msg(m)
continue
run_test(cls,gid,cmdname)
run_test(cls, gid, cmdname)
def do_cmd_in_group(cmdname):
cls = main_tool.get_cmd_cls(cmdname)
for gid,cmds in tests.items():
for gid, cmds in tests.items():
for cmd in cmds:
if cmd == cmdname:
run_test(cls,gid,cmdname)
run_test(cls, gid, cmdname)
return True
return False
@ -301,7 +301,7 @@ def main():
do_group(cmd)
else:
if not do_cmd_in_group(cmd):
die(1,f'{cmd!r}: not a recognized test or test group')
die(1, f'{cmd!r}: not a recognized test or test group')
else:
for garg in tests:
do_group(garg)
@ -317,8 +317,8 @@ if cfg.tool_api:
if cfg.list_tests:
Msg('Available tests:')
for modname,cmdlist in main_tool.mods.items():
cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
for modname, cmdlist in main_tool.mods.items():
cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd')
Msg(f' {modname:6} - {docstring_head(cls)}')
sys.exit(0)
@ -326,21 +326,21 @@ if cfg.list_tested_cmds:
list_tested_cmds()
sys.exit(0)
tool_exec = os.path.relpath(os.path.join('cmds','mmgen-tool'))
tool_exec = os.path.relpath(os.path.join('cmds', 'mmgen-tool'))
if cfg.fork:
passthru_args = ['coin','type','testnet','token']
tool_cmd = [ tool_exec, '--skip-cfg-file' ] + [
passthru_args = ['coin', 'type', 'testnet', 'token']
tool_cmd = [tool_exec, '--skip-cfg-file'] + [
'--{}{}'.format(
k.replace('_','-'),
'='+getattr(cfg,k) if getattr(cfg,k) is not True else '')
for k in passthru_args if getattr(cfg,k) ]
k.replace('_', '-'),
'='+getattr(cfg, k) if getattr(cfg, k) is not True else '')
for k in passthru_args if getattr(cfg, k)]
if cfg.coverage:
d,f = init_coverage()
tool_cmd_preargs = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f]
d, f = init_coverage()
tool_cmd_preargs = ['python3', '-m', 'trace', '--count', '--coverdir='+d, '--file='+f]
else:
tool_cmd_preargs = ['python3','scripts/exec_wrapper.py']
tool_cmd_preargs = ['python3', 'scripts/exec_wrapper.py']
from mmgen.main import launch
start_time = int(time.time())

View file

@ -20,7 +20,7 @@
test/unit_tests.py: Unit tests for the MMGen suite
"""
import sys,os,time,importlib,platform,asyncio
import sys, os, time, importlib, platform, asyncio
try:
from include.test_init import repo_root
@ -33,13 +33,13 @@ if not os.getenv('MMGEN_DEVTOOLS'):
from mmgen.devinit import init_dev
init_dev()
from mmgen.cfg import Config,gc
from mmgen.cfg import Config, gc
from mmgen.color import gray, brown, orange, yellow, red
from mmgen.util import msg, msg_r, gmsg, ymsg, Msg
from test.include.common import set_globals,end_msg
from test.include.common import set_globals, end_msg
def die(ev,s):
def die(ev, s):
msg((red if ev > 1 else yellow)(s))
sys.exit(ev)
@ -69,20 +69,20 @@ If no test is specified, all available tests are run
if os.path.islink(Config.test_datadir):
os.unlink(Config.test_datadir)
sys.argv.insert(1,'--skip-cfg-file')
sys.argv.insert(1, '--skip-cfg-file')
cfg = Config(opts_data=opts_data)
if cfg.no_altcoin_deps:
ymsg(f'{gc.prog_name}: skipping altcoin tests by user request')
type(cfg)._reset_ok += ('use_internal_keccak_module','debug_addrlist')
type(cfg)._reset_ok += ('use_internal_keccak_module', 'debug_addrlist')
set_globals(cfg)
file_pfx = 'ut_'
tests_d = os.path.join(repo_root,'test','unit_tests_d')
tests_d = os.path.join(repo_root, 'test', 'unit_tests_d')
all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(tests_d) if fn.startswith(file_pfx))
@ -90,7 +90,7 @@ exclude = cfg.exclude.split(',') if cfg.exclude else []
for e in exclude:
if e not in all_tests:
die(1,f'{e!r}: invalid parameter for --exclude (no such test)')
die(1, f'{e!r}: invalid parameter for --exclude (no such test)')
start_time = int(time.time())
@ -102,25 +102,25 @@ if cfg.list_subtests:
def gen():
for test in all_tests:
mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
if hasattr(mod,'unit_tests'):
t = getattr(mod,'unit_tests')
subtests = [k for k,v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
yield fs.format( test, ' '.join(f'{subtest}' for subtest in subtests) )
if hasattr(mod, 'unit_tests'):
t = getattr(mod, 'unit_tests')
subtests = [k for k, v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
yield fs.format(test, ' '.join(f'{subtest}' for subtest in subtests))
else:
yield test
fs = '{:%s} {}' % max(len(t) for t in all_tests)
Msg( fs.format('TEST','SUBTESTS') + '\n' + '\n'.join(gen()) )
Msg(fs.format('TEST', 'SUBTESTS') + '\n' + '\n'.join(gen()))
sys.exit(0)
class UnitTestHelpers:
def __init__(self,subtest_name):
def __init__(self, subtest_name):
self.subtest_name = subtest_name
def skip_msg(self,desc):
cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_","-")!r} for {desc}'))
def skip_msg(self, desc):
cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_", "-")!r} for {desc}'))
def process_bad_data(self,data,pfx='bad '):
def process_bad_data(self, data, pfx='bad '):
if os.getenv('PYTHONOPTIMIZE'):
ymsg('PYTHONOPTIMIZE set, skipping error handling tests')
return
@ -130,7 +130,7 @@ class UnitTestHelpers:
m_exc = '{!r}: incorrect exception type (expected {!r})'
m_err = '{!r}: incorrect error msg (should match {!r}'
m_noraise = "\nillegal action '{}{}' failed to raise an exception (expected {!r})"
for (desc,exc_chk,emsg_chk,func) in data:
for (desc, exc_chk, emsg_chk, func) in data:
try:
cfg._util.vmsg_r(' {}{:{w}}'.format(pfx, desc+':', w=desc_w+1))
ret = func()
@ -140,28 +140,28 @@ class UnitTestHelpers:
exc = type(e).__name__
emsg = e.args[0]
cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]')
assert exc == exc_chk, m_exc.format(exc,exc_chk)
assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk)
assert exc == exc_chk, m_exc.format(exc, exc_chk)
assert re.search(emsg_chk, emsg), m_err.format(emsg, emsg_chk)
else:
die(4,m_noraise.format(pfx,desc,exc_chk))
die(4, m_noraise.format(pfx, desc, exc_chk))
tests_seen = []
def run_test(test,subtest=None):
def run_test(test, subtest=None):
mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}')
def run_subtest(t,subtest):
subtest_disp = subtest.replace('_','-')
def run_subtest(t, subtest):
subtest_disp = subtest.replace('_', '-')
msg(brown('Running unit subtest ') + orange(f'{test}.{subtest_disp}'))
if getattr(t,'silence_output',False):
if getattr(t, 'silence_output', False):
t._silence()
if hasattr(t,'_pre_subtest'):
getattr(t,'_pre_subtest')(test,subtest,UnitTestHelpers(subtest))
if hasattr(t, '_pre_subtest'):
getattr(t, '_pre_subtest')(test, subtest, UnitTestHelpers(subtest))
try:
func = getattr(t,subtest.replace('-','_'))
func = getattr(t, subtest.replace('-', '_'))
c = func.__code__
do_desc = c.co_varnames[c.co_argcount-1] == 'desc'
if do_desc:
@ -176,41 +176,41 @@ def run_test(test,subtest=None):
if do_desc and not cfg.quiet:
msg('OK\n' if cfg.verbose else 'OK')
except:
if getattr(t,'silence_output',False):
if getattr(t, 'silence_output', False):
t._end_silence()
raise
if hasattr(t,'_post_subtest'):
getattr(t,'_post_subtest')(test,subtest,UnitTestHelpers(subtest))
if hasattr(t, '_post_subtest'):
getattr(t, '_post_subtest')(test, subtest, UnitTestHelpers(subtest))
if getattr(t,'silence_output',False):
if getattr(t, 'silence_output', False):
t._end_silence()
if not ret:
die(4,f'Unit subtest {subtest_disp!r} failed')
die(4, f'Unit subtest {subtest_disp!r} failed')
if test not in tests_seen:
gmsg(f'Running unit test {test}')
tests_seen.append(test)
if cfg.no_altcoin_deps and getattr(mod,'altcoin_dep',None):
if cfg.no_altcoin_deps and getattr(mod, 'altcoin_dep', None):
cfg._util.qmsg(gray(f'Skipping unit test {test!r} [--no-altcoin-deps]'))
return
if hasattr(mod,'unit_tests'): # new class-based API
t = getattr(mod,'unit_tests')()
altcoin_deps = getattr(t,'altcoin_deps',())
if hasattr(mod, 'unit_tests'): # new class-based API
t = getattr(mod, 'unit_tests')()
altcoin_deps = getattr(t, 'altcoin_deps', ())
win_skip = getattr(t, 'win_skip', ())
mac_skip = getattr(t, 'mac_skip', ())
arm_skip = getattr(t, 'arm_skip', ())
subtests = (
[subtest] if subtest else
[k for k,v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
[k for k, v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_']
)
if hasattr(t,'_pre'):
if hasattr(t, '_pre'):
t._pre()
for _subtest in subtests:
subtest_disp = _subtest.replace('_','-')
subtest_disp = _subtest.replace('_', '-')
if cfg.no_altcoin_deps and _subtest in altcoin_deps:
cfg._util.qmsg(gray(f'Skipping unit subtest {subtest_disp!r} [--no-altcoin-deps]'))
continue
@ -224,23 +224,23 @@ def run_test(test,subtest=None):
cfg._util.qmsg(gray(f'Skipping unit subtest {subtest_disp!r} for ARM platform'))
continue
run_subtest(t, _subtest)
if hasattr(t,'_post'):
if hasattr(t, '_post'):
t._post()
else:
assert not subtest, f'{subtest!r}: subtests not supported for this unit test'
if not mod.unit_test().run_test(test,UnitTestHelpers(test)):
die(4,'Unit test {test!r} failed')
if not mod.unit_test().run_test(test, UnitTestHelpers(test)):
die(4, 'Unit test {test!r} failed')
def main():
for test in (cfg._args or all_tests):
if '.' in test:
test,subtest = test.split('.')
test, subtest = test.split('.')
else:
subtest = None
if test not in all_tests:
die(1,f'{test!r}: test not recognized')
die(1, f'{test!r}: test not recognized')
if test not in exclude:
run_test(test,subtest=subtest)
run_test(test, subtest=subtest)
end_msg(int(time.time()) - start_time)
from mmgen.main import launch