From 74bc49f9730d95b32a3fe5aaf957b4298a887f45 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Fri, 18 Oct 2024 10:32:12 +0000 Subject: [PATCH] whitespace: tests, top level --- test/altcointest.py | 263 +++++++++++++++-------------- test/cmdtest.py | 344 +++++++++++++++++++------------------- test/colortest.py | 18 +- test/gentest.py | 233 +++++++++++++------------- test/hashfunc.py | 42 ++--- test/objattrtest.py | 72 ++++---- test/objtest.py | 51 +++--- test/scrambletest.py | 12 +- test/tooltest.py | 383 +++++++++++++++++++++++-------------------- test/tooltest2.py | 126 +++++++------- test/unit_tests.py | 92 +++++------ 11 files changed, 835 insertions(+), 801 deletions(-) diff --git a/test/altcointest.py b/test/altcointest.py index e16083c8..b9ed0c90 100755 --- a/test/altcointest.py +++ b/test/altcointest.py @@ -19,20 +19,20 @@ try: except ImportError: from test.include import test_init -from mmgen.cfg import gc,Config +from mmgen.cfg import gc, Config from mmgen.util import msg from mmgen.altcoin.params import CoinInfo -def test_equal(desc,a,b,*cdata): +def test_equal(desc, a, b, *cdata): if type(a) is int: a = hex(a) b = hex(b) - (network,coin,_,b_desc,verbose) = cdata + (network, coin, _, b_desc, verbose) = cdata if verbose: msg(f' {desc:20}: {a!r}') if a != b: raise ValueError( - f'{desc.capitalize()}s for {coin.upper()} {network} do not match:\n CoinInfo: {a}\n {b_desc}: {b}' ) + f'{desc.capitalize()}s for {coin.upper()} {network} do not match:\n CoinInfo: {a}\n {b_desc}: {b}') class TestCoinInfo(CoinInfo): @@ -40,41 +40,41 @@ class TestCoinInfo(CoinInfo): # No check for segwit, p2sh check skipped if source doesn't support it cross_checks = { '2GIVE': ['wn'], - '42': ['vg','wn'], + '42': ['vg', 'wn'], '611': ['wn'], - 'AC': ['lb','vg'], + 'AC': ['lb', 'vg'], 'ACOIN': ['wn'], 'ALF': ['wn'], - 'ANC': ['vg','wn'], + 'ANC': ['vg', 'wn'], 'APEX': ['wn'], 'ARCO': ['wn'], 'ARG': ['pc'], - 'AUR': ['vg','wn'], + 'AUR': ['vg', 'wn'], 'BCH': ['wn'], - 'BLK': ['lb','vg','wn'], - 'BQC': ['vg','wn'], + 'BLK': ['lb', 'vg', 'wn'], + 'BQC': ['vg', 'wn'], 'BSTY': ['wn'], - 'BTC': ['lb','vg','wn'], - 'BTCD': ['lb','vg','wn'], + 'BTC': ['lb', 'vg', 'wn'], + 'BTCD': ['lb', 'vg', 'wn'], 'BUCKS': ['wn'], 'CASH': ['wn'], 'CBX': ['wn'], - 'CCN': ['lb','vg','wn'], - 'CDN': ['lb','vg','wn'], + 'CCN': ['lb', 'vg', 'wn'], + 'CDN': ['lb', 'vg', 'wn'], 'CHC': ['wn'], - 'CLAM': ['lb','vg'], - 'CON': ['vg','wn'], + 'CLAM': ['lb', 'vg'], + 'CON': ['vg', 'wn'], 'CPC': ['wn'], - 'DASH': ['lb','pc','vg','wn'], + 'DASH': ['lb', 'pc', 'vg', 'wn'], 'DCR': ['pc'], 'DFC': ['pc'], - 'DGB': ['lb','vg'], - 'DGC': ['lb','vg','wn'], - 'DOGE': ['lb','pc','vg','wn'], - 'DOGED': ['lb','vg','wn'], - 'DOPE': ['lb','vg'], - 'DVC': ['vg','wn'], - 'EFL': ['lb','vg','wn'], + 'DGB': ['lb', 'vg'], + 'DGC': ['lb', 'vg', 'wn'], + 'DOGE': ['lb', 'pc', 'vg', 'wn'], + 'DOGED': ['lb', 'vg', 'wn'], + 'DOPE': ['lb', 'vg'], + 'DVC': ['vg', 'wn'], + 'EFL': ['lb', 'vg', 'wn'], 'EMC': ['vg'], 'EMD': ['wn'], 'ESP': ['wn'], @@ -85,162 +85,162 @@ class TestCoinInfo(CoinInfo): 'FLO': ['wn'], 'FLT': ['wn'], 'FST': ['wn'], - 'FTC': ['lb','pc','vg','wn'], - 'GCR': ['lb','vg'], + 'FTC': ['lb', 'pc', 'vg', 'wn'], + 'GCR': ['lb', 'vg'], 'GOOD': ['wn'], - 'GRC': ['vg','wn'], - 'GUN': ['vg','wn'], - 'HAM': ['vg','wn'], + 'GRC': ['vg', 'wn'], + 'GUN': ['vg', 'wn'], + 'HAM': ['vg', 'wn'], 'HTML5': ['wn'], 'HYP': ['wn'], 'ICASH': ['wn'], 'INFX': ['wn'], 'IPC': ['wn'], - 'JBS': ['lb','pc','vg','wn'], + 'JBS': ['lb', 'pc', 'vg', 'wn'], 'JUDGE': ['wn'], 'LANA': ['wn'], 'LAT': ['wn'], 'LDOGE': ['wn'], 'LMC': ['wn'], - 'LTC': ['lb','vg','wn'], + 'LTC': ['lb', 'vg', 'wn'], 'MARS': ['wn'], - 'MEC': ['pc','wn'], + 'MEC': ['pc', 'wn'], 'MINT': ['wn'], 'MOBI': ['wn'], - 'MONA': ['lb','vg'], + 'MONA': ['lb', 'vg'], 'MOON': ['wn'], - 'MUE': ['lb','vg'], + 'MUE': ['lb', 'vg'], 'MXT': ['wn'], 'MYR': ['pc'], - 'MYRIAD': ['vg','wn'], - 'MZC': ['lb','pc','vg','wn'], - 'NEOS': ['lb','vg'], + 'MYRIAD': ['vg', 'wn'], + 'MZC': ['lb', 'pc', 'vg', 'wn'], + 'NEOS': ['lb', 'vg'], 'NEVA': ['wn'], 'NKA': ['wn'], - 'NLG': ['vg','wn'], - 'NMC': ['lb','vg'], - 'NVC': ['lb','vg','wn'], - 'OK': ['lb','vg'], - 'OMC': ['vg','wn'], - 'ONION': ['vg','wn'], + 'NLG': ['vg', 'wn'], + 'NMC': ['lb', 'vg'], + 'NVC': ['lb', 'vg', 'wn'], + 'OK': ['lb', 'vg'], + 'OMC': ['vg', 'wn'], + 'ONION': ['vg', 'wn'], 'PART': ['wn'], - 'PINK': ['vg','wn'], + 'PINK': ['vg', 'wn'], 'PIVX': ['wn'], - 'PKB': ['lb','vg','wn'], - 'PND': ['lb','vg','wn'], - 'POT': ['lb','vg','wn'], - 'PPC': ['lb','vg','wn'], - 'PTC': ['vg','wn'], + 'PKB': ['lb', 'vg', 'wn'], + 'PND': ['lb', 'vg', 'wn'], + 'POT': ['lb', 'vg', 'wn'], + 'PPC': ['lb', 'vg', 'wn'], + 'PTC': ['vg', 'wn'], 'PXC': ['wn'], 'QRK': ['wn'], 'RAIN': ['wn'], 'RBT': ['wn'], - 'RBY': ['lb','vg'], - 'RDD': ['vg','wn'], - 'RIC': ['pc','vg','wn'], - 'SDC': ['lb','vg'], + 'RBY': ['lb', 'vg'], + 'RDD': ['vg', 'wn'], + 'RIC': ['pc', 'vg', 'wn'], + 'SDC': ['lb', 'vg'], 'SIB': ['wn'], 'SMLY': ['wn'], 'SONG': ['wn'], - 'SPR': ['vg','wn'], - 'START': ['lb','vg'], + 'SPR': ['vg', 'wn'], + 'START': ['lb', 'vg'], 'SYS': ['wn'], 'TAJ': ['wn'], 'TIT': ['wn'], - 'TPC': ['lb','vg'], + 'TPC': ['lb', 'vg'], 'TRC': ['wn'], 'TTC': ['wn'], 'TX': ['wn'], - 'UNO': ['pc','vg','wn'], - 'VIA': ['lb','pc','vg','wn'], - 'VPN': ['lb','vg'], - 'VTC': ['lb','vg','wn'], - 'WDC': ['vg','wn'], + 'UNO': ['pc', 'vg', 'wn'], + 'VIA': ['lb', 'pc', 'vg', 'wn'], + 'VPN': ['lb', 'vg'], + 'VTC': ['lb', 'vg', 'wn'], + 'WDC': ['vg', 'wn'], 'WISC': ['wn'], - 'WKC': ['vg','wn'], + 'WKC': ['vg', 'wn'], 'WSX': ['wn'], 'XCN': ['wn'], 'XGB': ['wn'], - 'XPM': ['lb','vg','wn'], + 'XPM': ['lb', 'vg', 'wn'], 'XST': ['wn'], 'XVC': ['wn'], 'ZET': ['wn'], - 'ZOOM': ['lb','vg'], - 'ZRC': ['lb','vg'] + 'ZOOM': ['lb', 'vg'], + 'ZRC': ['lb', 'vg'] } @classmethod - def verify_leading_symbols(cls,quiet=False,verbose=False): + def verify_leading_symbols(cls, quiet=False, verbose=False): - for network in ('mainnet','testnet'): + for network in ('mainnet', 'testnet'): for coin in [e.symbol for e in cls.coin_constants[network]]: - e = cls.get_entry(coin,network) - cdata = (network,coin,e,'Computed value',verbose) + e = cls.get_entry(coin, network) + cdata = (network, coin, e, 'Computed value', verbose) if not quiet: msg(f'{coin} {network}') vn_info = e.p2pkh_info ret = cls.find_addr_leading_symbol(vn_info[0]) - test_equal('P2PKH leading symbol',vn_info[1],ret,*cdata) + test_equal('P2PKH leading symbol', vn_info[1], ret, *cdata) vn_info = e.p2sh_info if vn_info: ret = cls.find_addr_leading_symbol(vn_info[0]) - test_equal('P2SH leading symbol',vn_info[1],ret,*cdata) + test_equal('P2SH leading symbol', vn_info[1], ret, *cdata) @classmethod - def verify_core_coin_data(cls,cfg,quiet=False,verbose=False): - from mmgen.protocol import CoinProtocol,init_proto + def verify_core_coin_data(cls, cfg, quiet=False, verbose=False): + from mmgen.protocol import CoinProtocol, init_proto - for network in ('mainnet','testnet'): + for network in ('mainnet', 'testnet'): for coin in gc.core_coins: - e = cls.get_entry(coin,network) + e = cls.get_entry(coin, network) if e: - proto = init_proto( cfg, coin, network=network ) - cdata = (network,coin,e,type(proto).__name__,verbose) + proto = init_proto(cfg, coin, network=network) + cdata = (network, coin, e, type(proto).__name__, verbose) if not quiet: msg(f'Verifying {coin.upper()} {network}') if coin != 'bch': # TODO - test_equal('coin name',e.name,proto.name,*cdata) + test_equal('coin name', e.name, proto.name, *cdata) if e.trust_level != -1: - test_equal('Trust level',e.trust_level,CoinProtocol.coins[coin].trust_level,*cdata) + test_equal('Trust level', e.trust_level, CoinProtocol.coins[coin].trust_level, *cdata) test_equal( 'WIF version number', e.wif_ver_num, - int.from_bytes(proto.wif_ver_bytes['std'],'big'), - *cdata ) + int.from_bytes(proto.wif_ver_bytes['std'], 'big'), + *cdata) test_equal( 'P2PKH version number', e.p2pkh_info[0], - int.from_bytes(proto.addr_fmt_to_ver_bytes['p2pkh'],'big'), - *cdata ) + int.from_bytes(proto.addr_fmt_to_ver_bytes['p2pkh'], 'big'), + *cdata) test_equal( 'P2SH version number', e.p2sh_info[0], - int.from_bytes(proto.addr_fmt_to_ver_bytes['p2sh'],'big'), - *cdata ) + int.from_bytes(proto.addr_fmt_to_ver_bytes['p2sh'], 'big'), + *cdata) # Data is one of the coin_constants lists above. Normalize ints to hex of correct width, add # missing leading letters, set trust level from external_tests. # Insert a coin entry from outside source, set version info leading letters to '?' and trust level # to 0, then run TestCoinInfo.fix_table(data). 'has_segwit' field is updated manually for now. @classmethod - def fix_table(cls,data): + def fix_table(cls, data): import re def myhex(n): - return '0x{:0{}x}'.format(n,2 if n < 256 else 4) + return '0x{:0{}x}'.format(n, 2 if n < 256 else 4) - def fix_ver_info(e,k): + def fix_ver_info(e, k): e[k] = list(e[k]) e[k][0] = myhex(e[k][0]) - s1 = cls.find_addr_leading_symbol(int(e[k][0][2:],16)) + s1 = cls.find_addr_leading_symbol(int(e[k][0][2:], 16)) m = f'Fixing leading address letter for coin {e["symbol"]} ({e[k][1]!r} --> {s1})' if e[k][1] != '?': assert s1 == e[k][1], f'First letters do not match! {m}' @@ -263,17 +263,17 @@ class TestCoinInfo(CoinInfo): for e in data: e = e._asdict() e['wif_ver_num'] = myhex(e['wif_ver_num']) - sym,trust = e['symbol'],e['trust_level'] + sym, trust = e['symbol'], e['trust_level'] - fix_ver_info(e,'p2pkh_info') - if isinstance(e['p2sh_info'],tuple): - fix_ver_info(e,'p2sh_info') + fix_ver_info(e, 'p2pkh_info') + if isinstance(e['p2sh_info'], tuple): + fix_ver_info(e, 'p2sh_info') for k in e.keys(): e[k] = repr(e[k]) - e[k] = re.sub(r"'0x(..)'",r'0x\1',e[k]) - e[k] = re.sub(r"'0x(....)'",r'0x\1',e[k]) - e[k] = re.sub(r' ',r'',e[k]) + ('',',')[k != 'trust_level'] + e[k] = re.sub(r"'0x(..)'", r'0x\1', e[k]) + e[k] = re.sub(r"'0x(....)'", r'0x\1', e[k]) + e[k] = re.sub(r' ', r'', e[k]) + ('', ',')[k != 'trust_level'] if trust != -1: if sym in tt: @@ -295,29 +295,29 @@ class TestCoinInfo(CoinInfo): msg(f'Processed {len(data)} entries') @classmethod - def find_addr_leading_symbol(cls,ver_num,verbose=False): + def find_addr_leading_symbol(cls, ver_num, verbose=False): if ver_num == 0: return '1' - def phash2addr(ver_num,pk_hash): + def phash2addr(ver_num, pk_hash): from mmgen.proto.btc.common import b58chk_encode bl = ver_num.bit_length() - ver_bytes = int.to_bytes(ver_num,bl//8 + bool(bl%8),'big') + ver_bytes = int.to_bytes(ver_num, bl//8 + bool(bl%8), 'big') return b58chk_encode(ver_bytes + pk_hash) - low = phash2addr(ver_num,b'\x00'*20) - high = phash2addr(ver_num,b'\xff'*20) + low = phash2addr(ver_num, b'\x00'*20) + high = phash2addr(ver_num, b'\xff'*20) if verbose: print('low address: ' + low) print('high address: ' + high) - l1,h1 = low[0],high[0] - return (l1,h1) if l1 != h1 else l1 + l1, h1 = low[0], high[0] + return (l1, h1) if l1 != h1 else l1 @classmethod - def print_symbols(cls,include_names=False,reverse=False): + def print_symbols(cls, include_names=False, reverse=False): for e in cls.coin_constants['mainnet']: if reverse: print(f'{e.symbol:6} {e.name}') @@ -340,10 +340,10 @@ class TestCoinInfo(CoinInfo): tt[k] = cls.trust_override[k] return tt - trust_override = {'BTC':3,'BCH':3,'LTC':3,'DASH':1,'EMC':2} + trust_override = {'BTC':3, 'BCH':3, 'LTC':3, 'DASH':1, 'EMC':2} @classmethod - def get_test_support(cls,coin,addr_type,network,toolname=None,verbose=False): + def get_test_support(cls, coin, addr_type, network, toolname=None, verbose=False): """ If requested tool supports coin/addr_type/network triplet, return tool name. If 'tool' is None, return tool that supports coin/addr_type/network triplet. @@ -359,11 +359,11 @@ class TestCoinInfo(CoinInfo): if verbose: m1 = 'Requested tool {t!r} does not support coin {c} on network {n}' m2 = 'No test tool found for coin {c} on network {n}' - msg((m1 if toolname else m2).format(t=tool,c=coin,n=network)) + msg((m1 if toolname else m2).format(t=tool, c=coin, n=network)) return None if addr_type == 'zcash_z': - if toolname in (None,'zcash-mini'): + if toolname in (None, 'zcash-mini'): return 'zcash-mini' else: if verbose: @@ -383,7 +383,7 @@ class TestCoinInfo(CoinInfo): if toolname: # skip whitelists return tool - if addr_type in ('segwit','bech32'): + if addr_type in ('segwit', 'bech32'): st = cls.external_tests_segwit_whitelist if addr_type in st and coin in st[addr_type]: return tool @@ -391,7 +391,7 @@ class TestCoinInfo(CoinInfo): if verbose: m1 = 'Requested tool {t!r} does not support coin {c}, addr_type {a!r}, on network {n}' m2 = 'No test tool found supporting coin {c}, addr_type {a!r}, on network {n}' - msg((m1 if toolname else m2).format(t=tool,c=coin,n=network,a=addr_type)) + msg((m1 if toolname else m2).format(t=tool, c=coin, n=network, a=addr_type)) return None return tool @@ -403,19 +403,18 @@ class TestCoinInfo(CoinInfo): 'pycoin': ( 'DASH', # only compressed 'BCH', - 'BTC','LTC','VIA','FTC','DOGE','MEC', - 'JBS','MZC','RIC','DFC','FAI','ARG','ZEC','DCR'), - 'keyconv': ( - 'BCH', - # broken: PIVX - '42','AC','AIB','ANC','ARS','ATMOS','AUR','BLK','BQC','BTC','TEST','BTCD','CCC','CCN','CDN', - 'CLAM','CNC','CNOTE','CON','CRW','DEEPONION','DGB','DGC','DMD','DOGED','DOGE','DOPE', - 'DVC','EFL','EMC','EXCL','FAIR','FLOZ','FTC','GAME','GAP','GCR','GRC','GRS','GUN','HAM','HODL', - 'IXC','JBS','LBRY','LEAF','LTC','MMC','MONA','MUE','MYRIAD','MZC','NEOS','NLG','NMC','NVC', - 'NYAN','OK','OMC','PIGGY','PINK','PKB','PND','POT','PPC','PTC','PTS','QTUM','RBY','RDD', - 'RIC','SCA','SDC','SKC','SPR','START','SXC','TPC','UIS','UNO','VIA','VPN','VTC','WDC','WKC', - 'WUBS', 'XC', 'XPM', 'YAC', 'ZOOM', 'ZRC'), - 'ethkey': ('ETH','ETC'), + 'BTC', 'LTC', 'VIA', 'FTC', 'DOGE', 'MEC', + 'JBS', 'MZC', 'RIC', 'DFC', 'FAI', 'ARG', 'ZEC', 'DCR'), + 'keyconv': ( # broken: PIVX + 'BCH', '42', 'AC', 'AIB', 'ANC', 'ARS', 'ATMOS', 'AUR', 'BLK', 'BQC', 'BTC', 'TEST', + 'BTCD', 'CCC', 'CCN', 'CDN', 'CLAM', 'CNC', 'CNOTE', 'CON', 'CRW', 'DEEPONION', 'DGB', + 'DGC', 'DMD', 'DOGED', 'DOGE', 'DOPE', 'DVC', 'EFL', 'EMC', 'EXCL', 'FAIR', 'FLOZ', 'FTC', + 'GAME', 'GAP', 'GCR', 'GRC', 'GRS', 'GUN', 'HAM', 'HODL', 'IXC', 'JBS', 'LBRY', 'LEAF', + 'LTC', 'MMC', 'MONA', 'MUE', 'MYRIAD', 'MZC', 'NEOS', 'NLG', 'NMC', 'NVC', 'NYAN', 'OK', + 'OMC', 'PIGGY', 'PINK', 'PKB', 'PND', 'POT', 'PPC', 'PTC', 'PTS', 'QTUM', 'RBY', 'RDD', + 'RIC', 'SCA', 'SDC', 'SKC', 'SPR', 'START', 'SXC', 'TPC', 'UIS', 'UNO', 'VIA', 'VPN', + 'VTC', 'WDC', 'WKC', 'WUBS', 'XC', 'XPM', 'YAC', 'ZOOM', 'ZRC'), + 'ethkey': ('ETH', 'ETC'), 'zcash-mini': ('ZEC',), 'monero-python': ('XMR',), }, @@ -423,7 +422,7 @@ class TestCoinInfo(CoinInfo): 'pycoin': { 'DASH':'tDASH', # only compressed 'BCH':'XTN', - 'BTC':'XTN','LTC':'XLT','VIA':'TVI','FTC':'FTX','DOGE':'XDT','DCR':'DCRT' + 'BTC':'XTN', 'LTC':'XLT', 'VIA':'TVI', 'FTC':'FTX', 'DOGE':'XDT', 'DCR':'DCRT' }, 'ethkey': {}, 'keyconv': {} @@ -433,17 +432,17 @@ class TestCoinInfo(CoinInfo): # Whitelists apply to the *first* tool in cls.external_tests supporting the given coin/addr_type. # They're ignored if specific tool is requested. 'segwit': ('BTC',), # LTC Segwit broken on pycoin: uses old fmt - 'bech32': ('BTC','LTC'), + 'bech32': ('BTC', 'LTC'), 'compressed': ( - 'BTC','LTC','VIA','FTC','DOGE','DASH','MEC','MYR','UNO', - 'JBS','MZC','RIC','DFC','FAI','ARG','ZEC','DCR','ZEC' + 'BTC', 'LTC', 'VIA', 'FTC', 'DOGE', 'DASH', 'MEC', 'MYR', 'UNO', + 'JBS', 'MZC', 'RIC', 'DFC', 'FAI', 'ARG', 'ZEC', 'DCR', 'ZEC' ), } external_tests_blacklist = { # Unconditionally block testing of the given coin/addr_type with given tool, or all coins if True 'legacy': {}, - 'segwit': { 'keyconv': True }, - 'bech32': { 'keyconv': True }, + 'segwit': {'keyconv': True}, + 'bech32': {'keyconv': True}, } if __name__ == '__main__': @@ -456,10 +455,10 @@ if __name__ == '__main__': } } - cfg = Config( opts_data=opts_data, need_amt=False ) + cfg = Config(opts_data=opts_data, need_amt=False) msg('Checking CoinInfo WIF/P2PKH/P2SH version numbers and trust levels against protocol.py') - TestCoinInfo.verify_core_coin_data( cfg, cfg.quiet, cfg.verbose ) + TestCoinInfo.verify_core_coin_data(cfg, cfg.quiet, cfg.verbose) msg('Checking CoinInfo address leading symbols') - TestCoinInfo.verify_leading_symbols( cfg.quiet, cfg.verbose ) + TestCoinInfo.verify_leading_symbols(cfg.quiet, cfg.verbose) diff --git a/test/cmdtest.py b/test/cmdtest.py index 5ddf8ad3..92b45f0f 100755 --- a/test/cmdtest.py +++ b/test/cmdtest.py @@ -21,17 +21,17 @@ test/cmdtest.py: Command test runner for the MMGen wallet system """ def check_segwit_opts(): - for k,m in (('segwit','S'),('segwit_random','S'),('bech32','B')): - if getattr(cfg,k) and m not in proto.mmtypes: - die(1,f'--{k.replace("_","-")} option incompatible with {proto.cls_name}') + for k, m in (('segwit', 'S'), ('segwit_random', 'S'), ('bech32', 'B')): + if getattr(cfg, k) and m not in proto.mmtypes: + die(1, f'--{k.replace("_", "-")} option incompatible with {proto.cls_name}') -def create_shm_dir(data_dir,trash_dir): +def create_shm_dir(data_dir, trash_dir): # Laggy flash media can cause pexpect to fail, so create a temporary directory # under '/dev/shm' and put datadir and tmpdirs here. import shutil from subprocess import run if sys.platform in ('win32', 'darwin'): - for tdir in (data_dir,trash_dir): + for tdir in (data_dir, trash_dir): try: os.listdir(tdir) except: @@ -41,35 +41,35 @@ def create_shm_dir(data_dir,trash_dir): shutil.rmtree(tdir) except: # we couldn't remove data dir - perhaps regtest daemon is running try: - run(['python3',os.path.join('cmds','mmgen-regtest'),'stop'],check=True) + run(['python3', os.path.join('cmds', 'mmgen-regtest'), 'stop'], check=True) except: - die(4,f'Unable to remove {tdir!r}!') + die(4, f'Unable to remove {tdir!r}!') else: time.sleep(2) shutil.rmtree(tdir) - os.mkdir(tdir,0o755) + os.mkdir(tdir, 0o755) shm_dir = 'test' else: - tdir,pfx = '/dev/shm','mmgen-test-' + tdir, pfx = '/dev/shm', 'mmgen-test-' try: - run(f'rm -rf {tdir}/{pfx}*',shell=True,check=True) + run(f'rm -rf {tdir}/{pfx}*', shell=True, check=True) except Exception as e: - die(2,f'Unable to delete directory tree {tdir}/{pfx}* ({e.args[0]})') + die(2, f'Unable to delete directory tree {tdir}/{pfx}* ({e.args[0]})') try: import tempfile - shm_dir = str(tempfile.mkdtemp('',pfx,tdir)) + shm_dir = str(tempfile.mkdtemp('', pfx, tdir)) except Exception as e: - die(2,f'Unable to create temporary directory in {tdir} ({e.args[0]})') + die(2, f'Unable to create temporary directory in {tdir} ({e.args[0]})') - dest = os.path.join(shm_dir,os.path.basename(trash_dir)) - os.mkdir(dest,0o755) + dest = os.path.join(shm_dir, os.path.basename(trash_dir)) + os.mkdir(dest, 0o755) - run(f'rm -rf {trash_dir}',shell=True,check=True) - os.symlink(dest,trash_dir) + run(f'rm -rf {trash_dir}', shell=True, check=True) + os.symlink(dest, trash_dir) - dest = os.path.join(shm_dir,os.path.basename(data_dir)) - shutil.move(data_dir,dest) # data_dir was created by Config() - os.symlink(dest,data_dir) + dest = os.path.join(shm_dir, os.path.basename(data_dir)) + shutil.move(data_dir, dest) # data_dir was created by Config() + os.symlink(dest, data_dir) return shm_dir @@ -81,8 +81,8 @@ try: except ImportError: from test.include.test_init import repo_root -from mmgen.cfg import Config,gc -from mmgen.color import red,yellow,green,blue,cyan,gray,nocolor,init_color +from mmgen.cfg import Config, gc +from mmgen.color import red, yellow, green, blue, cyan, gray, nocolor, init_color from mmgen.util import msg, Msg, rmsg, bmsg, die, suf, make_timestr from test.include.common import ( @@ -101,7 +101,7 @@ from test.include.common import ( ) try: - os.unlink(os.path.join(repo_root,cmdtest_py_error_fn)) + os.unlink(os.path.join(repo_root, cmdtest_py_error_fn)) except: pass @@ -109,10 +109,10 @@ os.environ['MMGEN_QUIET'] = '0' # for this script and spawned scripts opts_data = { 'sets': [ - ('list_current_cmd_groups',True,'list_cmd_groups',True), - ('demo',True,'exact_output',True), - ('demo',True,'buf_keypress',True), - ('demo',True,'pexpect_spawn',True), + ('list_current_cmd_groups', True, 'list_cmd_groups', True), + ('demo', True, 'exact_output', True), + ('demo', True, 'buf_keypress', True), + ('demo', True, 'pexpect_spawn', True), ], 'text': { 'desc': 'High-level tests for the MMGen Wallet suite', @@ -171,14 +171,14 @@ environment var """ }, 'code': { - 'options': lambda proto,help_notes,s: s.format( + 'options': lambda proto, help_notes, s: s.format( lf = cmdtest_py_log_fn ) } } # we need some opt values before running opts.init, so parse without initializing: -po = Config(opts_data=opts_data,parse_only=True)._parsed_opts +po = Config(opts_data=opts_data, parse_only=True)._parsed_opts data_dir = Config.test_datadir @@ -193,7 +193,7 @@ if not po.user_opts.get('skip_deps'): cfg = Config(opts_data=opts_data) if cfg.no_altcoin and cfg.coin != 'BTC': - die(1,f'--no-altcoin incompatible with --coin={cfg.coin}') + die(1, f'--no-altcoin incompatible with --coin={cfg.coin}') set_globals(cfg) @@ -208,31 +208,31 @@ type(cfg)._reset_ok += ( 'no_timings', 'exit_after', 'resuming', - 'skipping_deps' ) + 'skipping_deps') logging = cfg.log or os.getenv('MMGEN_EXEC_WRAPPER') -cfg.resuming = any(k in po.user_opts for k in ('resume','resume_after')) +cfg.resuming = any(k in po.user_opts for k in ('resume', 'resume_after')) cfg.skipping_deps = cfg.resuming or 'skip_deps' in po.user_opts cmd_args = cfg._args if cfg.pexpect_spawn and sys.platform == 'win32': - die(1,'--pexpect-spawn option not supported on Windows platform, exiting') + die(1, '--pexpect-spawn option not supported on Windows platform, exiting') if cfg.daemon_id and cfg.daemon_id in cfg.blacklisted_daemons.split(): - die(1,f'cmdtest.py: daemon {cfg.daemon_id!r} blacklisted, exiting') + die(1, f'cmdtest.py: daemon {cfg.daemon_id!r} blacklisted, exiting') network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '') proto = cfg._proto # step 3: move data_dir to /dev/shm and symlink it back to ./test: -trash_dir = os.path.join('test','trash') -trash_dir2 = os.path.join('test','trash2') +trash_dir = os.path.join('test', 'trash') +trash_dir2 = os.path.join('test', 'trash2') if not cfg.skipping_deps: - shm_dir = create_shm_dir(data_dir,trash_dir) + shm_dir = create_shm_dir(data_dir, trash_dir) check_segwit_opts() @@ -262,24 +262,24 @@ def list_cmds(): def gen_output(): gm = CmdGroupMgr() - cw,d = 0,[] + cw, d = 0, [] yield green('AVAILABLE COMMANDS:') for gname in gm.cmd_groups: - tg = gm.gm_init_group(None,gname,None,None) + tg = gm.gm_init_group(None, gname, None, None) desc = tg.__doc__.strip() if tg.__doc__ else type(tg).__name__ - d.append( (gname,desc,gm.cmd_list,gm.dpy_data) ) - cw = max(max(len(k) for k in gm.dpy_data),cw) + d.append((gname, desc, gm.cmd_list, gm.dpy_data)) + cw = max(max(len(k) for k in gm.dpy_data), cw) - for gname,gdesc,clist,dpdata in d: + for gname, gdesc, clist, dpdata in d: yield '\n'+green(f'{gname!r} - {gdesc}:') for cmd in clist: data = dpdata[cmd] yield ' {:{w}} - {}'.format( cmd, - (data if isinstance(data,str) else data[1]), - w = cw ) + (data if isinstance(data, str) else data[1]), + w = cw) from mmgen.ui import do_pager do_pager('\n'.join(gen_output())) @@ -297,9 +297,9 @@ def create_tmp_dirs(shm_dir): for cfg in sorted(cfgs): mk_tmpdir(cfgs[cfg]['tmpdir']) else: - os.makedirs( os.path.join('test','tmp'), mode=0o755, exist_ok=True ) + os.makedirs(os.path.join('test', 'tmp'), mode=0o755, exist_ok=True) for cfg in sorted(cfgs): - src = os.path.join(shm_dir,cfgs[cfg]['tmpdir'].split('/')[-1]) + src = os.path.join(shm_dir, cfgs[cfg]['tmpdir'].split('/')[-1]) mk_tmpdir(src) try: os.unlink(cfgs[cfg]['tmpdir']) @@ -307,10 +307,10 @@ def create_tmp_dirs(shm_dir): if e.errno != 2: raise finally: - os.symlink(src,cfgs[cfg]['tmpdir']) + os.symlink(src, cfgs[cfg]['tmpdir']) def set_restore_term_at_exit(): - import termios,atexit + import termios, atexit fd = sys.stdin.fileno() old = termios.tcgetattr(fd) def at_exit(): @@ -321,36 +321,36 @@ class CmdGroupMgr: dpy_data = None - from test.cmdtest_py_d.cfg import cmd_groups_dfl,cmd_groups_extra + from test.cmdtest_py_d.cfg import cmd_groups_dfl, cmd_groups_extra cmd_groups = cmd_groups_dfl.copy() cmd_groups.update(cmd_groups_extra) @staticmethod - def create_cmd_group(cls,sg_name=None): + def create_cmd_group(cls, sg_name=None): cmd_group_in = dict(cls.cmd_group_in) if sg_name and 'subgroup.' + sg_name not in cmd_group_in: - die(1,f'{sg_name!r}: no such subgroup in test group {cls.__name__}') + die(1, f'{sg_name!r}: no such subgroup in test group {cls.__name__}') - def add_entries(key,add_deps=True,added_subgroups=[]): + def add_entries(key, add_deps=True, added_subgroups=[]): if add_deps: for dep in cmd_group_in['subgroup.'+key]: yield from add_entries(dep) - assert isinstance(cls.cmd_subgroups[key][0],str), f'header for subgroup {key!r} missing!' + assert isinstance(cls.cmd_subgroups[key][0], str), f'header for subgroup {key!r} missing!' if not key in added_subgroups: yield from cls.cmd_subgroups[key][1:] added_subgroups.append(key) def gen(): - for name,data in cls.cmd_group_in: + for name, data in cls.cmd_group_in: if name.startswith('subgroup.'): sg_key = name.removeprefix('subgroup.') - if sg_name in (None,sg_key): + if sg_name in (None, sg_key): yield from add_entries( sg_key, add_deps = sg_name and not cfg.skipping_deps, @@ -358,55 +358,55 @@ class CmdGroupMgr: if cfg.deps_only and sg_key == sg_name: return elif not cfg.skipping_deps: - yield (name,data) + yield (name, data) return tuple(gen()) - def load_mod(self,gname,modname=None): - clsname,kwargs = self.cmd_groups[gname] + def load_mod(self, gname, modname=None): + clsname, kwargs = self.cmd_groups[gname] if modname is None and 'modname' in kwargs: modname = kwargs['modname'] import importlib modpath = f'test.cmdtest_py_d.ct_{modname or gname}' - return getattr(importlib.import_module(modpath),clsname) + return getattr(importlib.import_module(modpath), clsname) - def create_group(self,gname,sg_name,full_data=False,modname=None,is3seed=False,add_dpy=False): + def create_group(self, gname, sg_name, full_data=False, modname=None, is3seed=False, add_dpy=False): """ Initializes the list 'cmd_list' and dict 'dpy_data' from module's cmd_group data. Alternatively, if called with 'add_dpy=True', updates 'dpy_data' from module data without touching 'cmd_list' """ - cls = self.load_mod(gname,modname) + cls = self.load_mod(gname, modname) cdata = [] - def get_shared_deps(cmdname,tmpdir_idx): + def get_shared_deps(cmdname, tmpdir_idx): """ shared_deps are "implied" dependencies for all cmds in cmd_group that don't appear in the cmd_group data or cmds' argument lists. Supported only for 3seed tests at present. """ - if not hasattr(cls,'shared_deps'): + if not hasattr(cls, 'shared_deps'): return [] - return [k for k,v in cfgs[str(tmpdir_idx)]['dep_generators'].items() + return [k for k, v in cfgs[str(tmpdir_idx)]['dep_generators'].items() if k in cls.shared_deps and v != cmdname] - if not hasattr(cls,'cmd_group'): - cls.cmd_group = self.create_cmd_group(cls,sg_name) + if not hasattr(cls, 'cmd_group'): + cls.cmd_group = self.create_cmd_group(cls, sg_name) - for a,b in cls.cmd_group: + for a, b in cls.cmd_group: if is3seed: - for n,(i,j) in enumerate(zip(cls.tmpdir_nums,(128,192,256))): + for n, (i, j) in enumerate(zip(cls.tmpdir_nums, (128, 192, 256))): k = f'{a}_{n+1}' - if hasattr(cls,'skip_cmds') and k in cls.skip_cmds: + if hasattr(cls, 'skip_cmds') and k in cls.skip_cmds: continue - sdeps = get_shared_deps(k,i) - if isinstance(b,str): - cdata.append( (k, (i,f'{b} ({j}-bit)',[[[]+sdeps,i]])) ) + sdeps = get_shared_deps(k, i) + if isinstance(b, str): + cdata.append((k, (i, f'{b} ({j}-bit)', [[[]+sdeps, i]]))) else: - cdata.append( (k, (i,f'{b[1]} ({j}-bit)',[[b[0]+sdeps,i]])) ) + cdata.append((k, (i, f'{b[1]} ({j}-bit)', [[b[0]+sdeps, i]]))) else: - cdata.append( (a, b if full_data else (cls.tmpdir_nums[0],b,[[[],cls.tmpdir_nums[0]]])) ) + cdata.append((a, b if full_data else (cls.tmpdir_nums[0], b, [[[], cls.tmpdir_nums[0]]]))) if add_dpy: self.dpy_data.update(dict(cdata)) @@ -416,26 +416,26 @@ class CmdGroupMgr: return cls - def gm_init_group(self,trunner,gname,sg_name,spawn_prog): + def gm_init_group(self, trunner, gname, sg_name, spawn_prog): kwargs = self.cmd_groups[gname][1] - cls = self.create_group(gname,sg_name,**kwargs) + cls = self.create_group(gname, sg_name, **kwargs) cls.group_name = gname - return cls(trunner,cfgs,spawn_prog) + return cls(trunner, cfgs, spawn_prog) - def get_cls_by_gname(self,gname): - return self.load_mod( gname, self.cmd_groups[gname][1].get('modname') ) + def get_cls_by_gname(self, gname): + return self.load_mod(gname, self.cmd_groups[gname][1].get('modname')) def list_cmd_groups(self): ginfo = [] for gname in self.cmd_groups: - ginfo.append(( gname, self.get_cls_by_gname(gname) )) + ginfo.append((gname, self.get_cls_by_gname(gname))) if cfg.list_current_cmd_groups: exclude = (cfg.exclude_groups or '').split(',') ginfo = [g for g in ginfo if network_id in g[1].networks and not g[0] in exclude - and g[0] in tuple(self.cmd_groups_dfl) + tuple(cmd_args) ] + and g[0] in tuple(self.cmd_groups_dfl) + tuple(cmd_args)] desc = 'CONFIGURED' else: desc = 'AVAILABLE' @@ -443,30 +443,30 @@ class CmdGroupMgr: def gen_output(): yield green(f'{desc} COMMAND GROUPS AND SUBGROUPS:') yield '' - for name,cls in ginfo: + for name, cls in ginfo: yield ' {} - {}'.format( yellow(name.ljust(13)), - (cls.__doc__.strip() if cls.__doc__ else cls.__name__) ) - if hasattr(cls,'cmd_subgroups'): - subgroups = {k:v for k,v in cls.cmd_subgroups.items() if not k.startswith('_')} + (cls.__doc__.strip() if cls.__doc__ else cls.__name__)) + if hasattr(cls, 'cmd_subgroups'): + subgroups = {k:v for k, v in cls.cmd_subgroups.items() if not k.startswith('_')} max_w = max(len(k) for k in subgroups) - for k,v in subgroups.items(): - yield ' + {} · {}'.format( cyan(k.ljust(max_w+1)), v[0] ) + for k, v in subgroups.items(): + yield ' + {} · {}'.format(cyan(k.ljust(max_w+1)), v[0]) from mmgen.ui import do_pager do_pager('\n'.join(gen_output())) - Msg( '\n' + ' '.join(e[0] for e in ginfo) ) + Msg('\n' + ' '.join(e[0] for e in ginfo)) sys.exit(0) - def find_cmd_in_groups(self,cmd,group=None): + def find_cmd_in_groups(self, cmd, group=None): """ Search for a test command in specified group or all configured command groups and return it as a string. Loads modules but alters no global variables. """ if group: if not group in [e[0] for e in self.cmd_groups]: - die(1,f'{group!r}: unrecognized group') + die(1, f'{group!r}: unrecognized group') groups = [self.cmd_groups[group]] else: groups = self.cmd_groups @@ -474,13 +474,13 @@ class CmdGroupMgr: for gname in groups: cls = self.get_cls_by_gname(gname) - if not hasattr(cls,'cmd_group'): + if not hasattr(cls, 'cmd_group'): cls.cmd_group = self.create_cmd_group(cls) if cmd in cls.cmd_group: # first search the class return gname - if cmd in dir(cls(None,None,None)): # then a throwaway instance + if cmd in dir(cls(None, None, None)): # then a throwaway instance return gname # cmd might exist in more than one group - we'll go with the first return None @@ -492,7 +492,7 @@ class CmdTestRunner: if logging: self.log_fd.close() - def __init__(self,data_dir,trash_dir): + def __init__(self, data_dir, trash_dir): self.data_dir = data_dir self.trash_dir = trash_dir @@ -505,16 +505,16 @@ class CmdTestRunner: self.deps_only = None if logging: - self.log_fd = open(cmdtest_py_log_fn,'a') + self.log_fd = open(cmdtest_py_log_fn, 'a') self.log_fd.write(f'\nLog started: {make_timestr()} UTC\n') omsg(f'INFO → Logging to file {cmdtest_py_log_fn!r}') else: self.log_fd = None if cfg.coverage: - coverdir,accfile = init_coverage() + coverdir, accfile = init_coverage() omsg(f'INFO → Writing coverage files to {coverdir!r}') - self.pre_args = ['python3','-m','trace','--count','--coverdir='+coverdir,'--file='+accfile] + self.pre_args = ['python3', '-m', 'trace', '--count', '--coverdir='+coverdir, '--file='+accfile] else: self.pre_args = ['python3'] if sys.platform == 'win32' else [] @@ -567,22 +567,22 @@ class CmdTestRunner: cmd_path = ( cmd if cfg.system # cfg.system is broken for main test group with overlay tree - else os.path.relpath(os.path.join(repo_root,cmd_dir,cmd)) ) + else os.path.relpath(os.path.join(repo_root, cmd_dir, cmd))) args = ( self.pre_args + ([] if no_exec_wrapper else ['scripts/exec_wrapper.py']) + [cmd_path] + ([] if no_passthru_opts else self.passthru_opts) + - args ) + args) try: - qargs = ['{q}{}{q}'.format( a, q = "'" if ' ' in a else '' ) for a in args] + qargs = ['{q}{}{q}'.format(a, q = "'" if ' ' in a else '') for a in args] except: msg(f'args: {args}') raise - cmd_disp = ' '.join(qargs).replace('\\','/') # for mingw + cmd_disp = ' '.join(qargs).replace('\\', '/') # for mingw if logging: self.log_fd.write('[{}][{}:{}] {}\n'.format( @@ -592,17 +592,17 @@ class CmdTestRunner: cmd_disp)) for i in args: # die only after writing log entry - if not isinstance(i,str): - die(2,'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format( + if not isinstance(i, str): + die(2, 'Error: missing input files in cmd line?:\nName: {}\nCmdline: {!r}'.format( self.tg.test_name, - args )) + args)) if not no_msg: t_pfx = '' if cfg.no_timings else f'[{time.time() - self.start_time:08.2f}] ' if cfg.verbose or cfg.print_cmdline or cfg.exact_output: omsg(green(f'{t_pfx}Testing: {desc}')) if not msg_only: - clr1,clr2 = (nocolor,nocolor) if cfg.print_cmdline else (green,cyan) + clr1, clr2 = (nocolor, nocolor) if cfg.print_cmdline else (green, cyan) omsg( clr1('Executing: ') + clr2(repr(cmd_disp) if sys.platform == 'win32' else cmd_disp) @@ -638,7 +638,7 @@ class CmdTestRunner: pexpect_spawn = pexpect_spawn, timeout = timeout, send_delay = send_delay, - direct_exec = direct_exec ) + direct_exec = direct_exec) def end_msg(self): t = int(time.time() - self.start_time) @@ -647,7 +647,7 @@ class CmdTestRunner: ('\n' if cfg.no_timings else f'. Elapsed time: {t//60:02d}:{t%60:02d}\n') )) - def init_group(self,gname,sg_name=None,cmd=None,quiet=False,do_clean=True): + def init_group(self, gname, sg_name=None, cmd=None, quiet=False, do_clean=True): from test.cmdtest_py_d.cfg import cmd_groups_altcoin if cfg.no_altcoin and gname in cmd_groups_altcoin: @@ -660,19 +660,19 @@ class CmdTestRunner: omsg(gray(f'INFO → skipping test {gname!r} for platform {sys.platform!r}')) return None - for k in ('segwit','segwit_random','bech32'): - if getattr(cfg,k): + for k in ('segwit', 'segwit_random', 'bech32'): + if getattr(cfg, k): segwit_opt = k break else: segwit_opt = None def gen_msg(): - yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname,c=cmd) + yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname, c=cmd) if len(ct_cls.networks) != 1: yield f' for {proto.coin} {proto.network}' if segwit_opt: - yield ' (--{})'.format( segwit_opt.replace('_','-') ) + yield ' (--{})'.format(segwit_opt.replace('_', '-')) m = ''.join(gen_msg()) @@ -681,10 +681,10 @@ class CmdTestRunner: return None # 'networks = ()' means all networks allowed - nws = [(e.split('_')[0],'testnet') if '_' in e else (e,'mainnet') for e in ct_cls.networks] + nws = [(e.split('_')[0], 'testnet') if '_' in e else (e, 'mainnet') for e in ct_cls.networks] if nws: coin = proto.coin.lower() - for a,b in nws: + for a, b in nws: if a == coin and b == proto.network: break else: @@ -698,18 +698,18 @@ class CmdTestRunner: bmsg('Executing ' + m) if (not self.daemon_started) and self.gm.get_cls_by_gname(gname).need_daemon: - start_test_daemons(network_id,remove_datadir=True) + start_test_daemons(network_id, remove_datadir=True) self.daemon_started = True - if hasattr(self,'tg'): + if hasattr(self, 'tg'): del self.tg - self.tg = self.gm.gm_init_group(self,gname,sg_name,self.spawn_wrapper) + self.tg = self.gm.gm_init_group(self, gname, sg_name, self.spawn_wrapper) self.ct_clsname = type(self.tg).__name__ # pass through opts from cmdline (po.user_opts) self.passthru_opts = ['--{}{}'.format( - k.replace('_','-'), + k.replace('_', '-'), '' if cfg._uopts[k] is True else '=' + cfg._uopts[k] ) for k in cfg._uopts if k in self.tg.base_passthru_opts + self.tg.passthru_opts] @@ -722,45 +722,45 @@ class CmdTestRunner: cfg.exit_after = self.resume_cmd if cfg.exit_after and cfg.exit_after not in self.gm.cmd_list: - die(1,f'{cfg.exit_after!r}: command not recognized') + die(1, f'{cfg.exit_after!r}: command not recognized') return self.tg - def run_tests(self,cmd_args): + def run_tests(self, cmd_args): self.start_time = time.time() self.daemon_started = False gname_save = None def parse_arg(arg): if '.' in arg: - a,b = arg.split('.') - return [a] + b.split(':') if ':' in b else [a,b,None] + a, b = arg.split('.') + return [a] + b.split(':') if ':' in b else [a, b, None] elif ':' in arg: - a,b = arg.split(':') - return [a,None,b] + a, b = arg.split(':') + return [a, None, b] else: - return [self.gm.find_cmd_in_groups(arg),None,arg] + return [self.gm.find_cmd_in_groups(arg), None, arg] if cmd_args: for arg in cmd_args: if arg in self.gm.cmd_groups: if self.init_group(arg): for cmd in self.gm.cmd_list: - self.check_needs_rerun(cmd,build=True) + self.check_needs_rerun(cmd, build=True) do_between() else: - gname,sg_name,cmdname = parse_arg(arg) + gname, sg_name, cmdname = parse_arg(arg) if gname: same_grp = gname == gname_save # same group as previous cmd: don't clean, suppress blue msg - if self.init_group(gname,sg_name,cmdname,quiet=same_grp,do_clean=not same_grp): + if self.init_group(gname, sg_name, cmdname, quiet=same_grp, do_clean=not same_grp): if cmdname: if cfg.deps_only: self.deps_only = cmdname try: - self.check_needs_rerun(cmdname,build=True) + self.check_needs_rerun(cmdname, build=True) except Exception as e: # allow calling of functions not in cmd_group - if isinstance(e,KeyError) and e.args[0] == cmdname: - ret = getattr(self.tg,cmdname)() + if isinstance(e, KeyError) and e.args[0] == cmdname: + ret = getattr(self.tg, cmdname)() if type(ret).__name__ == 'coroutine': asyncio.run(ret) else: @@ -768,23 +768,23 @@ class CmdTestRunner: do_between() else: for cmd in self.gm.cmd_list: - self.check_needs_rerun(cmd,build=True) + self.check_needs_rerun(cmd, build=True) do_between() gname_save = gname else: - die(1,f'{arg!r}: command not recognized') + die(1, f'{arg!r}: command not recognized') else: if cfg.exclude_groups: exclude = cfg.exclude_groups.split(',') for e in exclude: if e not in self.gm.cmd_groups_dfl: - die(1,f'{e!r}: group not recognized') + die(1, f'{e!r}: group not recognized') for gname in self.gm.cmd_groups_dfl: if cfg.exclude_groups and gname in exclude: continue if self.init_group(gname): for cmd in self.gm.cmd_list: - self.check_needs_rerun(cmd,build=True) + self.check_needs_rerun(cmd, build=True) do_between() self.end_msg() @@ -810,7 +810,7 @@ class CmdTestRunner: ret = self.get_num_exts_for_cmd(cmd) if ret: for ext in ret[1]: - fn = get_file_with_ext(cfgs[ret[0]]['tmpdir'],ext,delete=build) + fn = get_file_with_ext(cfgs[ret[0]]['tmpdir'], ext, delete=build) if fn: if force_delete: os.unlink(fn) @@ -822,13 +822,13 @@ class CmdTestRunner: for fn in fns: my_age = os.stat(fn).st_mtime - for num,ext in fdeps: - f = get_file_with_ext(cfgs[num]['tmpdir'],ext,delete=build) + for num, ext in fdeps: + f = get_file_with_ext(cfgs[num]['tmpdir'], ext, delete=build) if f and os.stat(f).st_mtime > my_age: rerun = True for cdep in cdeps: - if self.check_needs_rerun(cdep,build=build,root=False,dpy=cmd): + if self.check_needs_rerun(cdep, build=build, root=False, dpy=cmd): rerun = True if build: @@ -843,22 +843,22 @@ class CmdTestRunner: else: # If prog produces multiple files: if cmd not in self.rebuild_list or rerun is True: - self.rebuild_list[cmd] = (rerun,fns[0] if fns else '') # FIX + self.rebuild_list[cmd] = (rerun, fns[0] if fns else '') # FIX return rerun - def run_test(self,cmd): + def run_test(self, cmd): if self.deps_only and cmd == self.deps_only: sys.exit(0) - d = [(str(num),ext) for exts,num in self.gm.dpy_data[cmd][2] for ext in exts] + d = [(str(num), ext) for exts, num in self.gm.dpy_data[cmd][2] for ext in exts] # delete files depended on by this cmd - arg_list = [get_file_with_ext(cfgs[num]['tmpdir'],ext) for num,ext in d] + arg_list = [get_file_with_ext(cfgs[num]['tmpdir'], ext) for num, ext in d] # remove shared_deps from arg list - if hasattr(self.tg,'shared_deps'): + if hasattr(self.tg, 'shared_deps'): arg_list = arg_list[:-len(self.tg.shared_deps)] if self.resume_cmd: @@ -878,21 +878,19 @@ class CmdTestRunner: self.tg.tmpdir_num = cdata[0] # self.tg.cfg = cfgs[str(cdata[0])] # will remove this eventually test_cfg = cfgs[str(cdata[0])] - for k in ( 'seed_len', 'seed_id', - 'wpasswd', 'kapasswd', - 'segwit', 'hash_preset', - 'bw_filename', 'bw_params', 'ref_bw_seed_id', - 'addr_idx_list', 'pass_idx_list' ): + for k in ( + 'seed_len', 'seed_id', 'wpasswd', 'kapasswd', 'segwit', 'hash_preset', 'bw_filename', + 'bw_params', 'ref_bw_seed_id', 'addr_idx_list', 'pass_idx_list'): if k in test_cfg: - setattr(self.tg,k,test_cfg[k]) + setattr(self.tg, k, test_cfg[k]) - ret = getattr(self.tg,cmd)(*arg_list) # run the test + ret = getattr(self.tg, cmd)(*arg_list) # run the test if type(ret).__name__ == 'coroutine': ret = asyncio.run(ret) - self.process_retval(cmd,ret) + self.process_retval(cmd, ret) if cfg.profile: - omsg('\r\033[50C{:.4f}'.format( time.time() - start )) + omsg('\r\033[50C{:.4f}'.format(time.time() - start)) if cmd == cfg.exit_after: sys.exit(0) @@ -903,7 +901,7 @@ class CmdTestRunner: r = '-' * 72 + '\n' print(r+('\n'+r).join(self.skipped_warnings)) - def process_retval(self,cmd,ret): + def process_retval(self, cmd, ret): if type(ret).__name__ == 'MMGenPexpect': ret.ok(exit_val=self.exit_val) self.cmd_total += 1 @@ -911,45 +909,45 @@ class CmdTestRunner: ok() self.cmd_total += 1 elif ret == 'error': - die(2,red(f'\nTest {self.tg.test_name!r} failed')) - elif ret in ('skip','skip_msg','silent'): + die(2, red(f'\nTest {self.tg.test_name!r} failed')) + elif ret in ('skip', 'skip_msg', 'silent'): if ret == 'silent': self.cmd_total += 1 elif ret == 'skip_msg': ok('SKIP') - elif isinstance(ret,tuple) and ret[0] == 'skip_warn': + elif isinstance(ret, tuple) and ret[0] == 'skip_warn': self.skipped_warnings.append( - 'Test {!r} was skipped:\n {}'.format(cmd,'\n '.join(ret[1].split('\n')))) + 'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n')))) else: - die(2,f'{cmd!r} returned {ret}') + die(2, f'{cmd!r} returned {ret}') - def check_deps(self,cmds): # TODO: broken + def check_deps(self, cmds): # TODO: broken if len(cmds) != 1: - die(1,f'Usage: {gc.prog_name} check_deps ') + die(1, f'Usage: {gc.prog_name} check_deps ') cmd = cmds[0] if cmd not in self.gm.cmd_list: - die(1,f'{cmd!r}: unrecognized command') + die(1, f'{cmd!r}: unrecognized command') if not cfg.quiet: omsg(f'Checking dependencies for {cmd!r}') - self.check_needs_rerun(self.tg,cmd) + self.check_needs_rerun(self.tg, cmd) - w = max(map(len,self.rebuild_list)) + 1 + w = max(map(len, self.rebuild_list)) + 1 for cmd in self.rebuild_list: c = self.rebuild_list[cmd] m = 'Rebuild' if (c[0] and c[1]) else 'Build' if c[0] else 'OK' - omsg('cmd {:<{w}} {}'.format( cmd+':', m, w=w )) + omsg('cmd {:<{w}} {}'.format(cmd+':', m, w=w)) - def generate_file_deps(self,cmd): - return [(str(n),e) for exts,n in self.gm.dpy_data[cmd][2] for e in exts] + def generate_file_deps(self, cmd): + return [(str(n), e) for exts, n in self.gm.dpy_data[cmd][2] for e in exts] - def generate_cmd_deps(self,fdeps): - return [cfgs[str(n)]['dep_generators'][ext] for n,ext in fdeps] + def generate_cmd_deps(self, fdeps): + return [cfgs[str(n)]['dep_generators'][ext] for n, ext in fdeps] - def get_num_exts_for_cmd(self,cmd): + def get_num_exts_for_cmd(self, cmd): try: num = str(self.gm.dpy_data[cmd][0]) except KeyError: @@ -958,7 +956,7 @@ class CmdTestRunner: if gname: kwargs = self.gm.cmd_groups[gname][1] kwargs.update({'add_dpy':True}) - self.gm.create_group(gname,None,**kwargs) + self.gm.create_group(gname, None, **kwargs) num = str(self.gm.dpy_data[cmd][0]) qmsg(f' found in group {gname!r}') else: @@ -967,7 +965,7 @@ class CmdTestRunner: dgl = cfgs[num]['dep_generators'] if cmd in dgl.values(): exts = [k for k in dgl if dgl[k] == cmd] - return (num,exts) + return (num, exts) else: return None @@ -987,7 +985,7 @@ if __name__ == '__main__': from mmgen.exception import TestSuiteSpawnedScriptException try: - tr = CmdTestRunner(data_dir,trash_dir) + tr = CmdTestRunner(data_dir, trash_dir) tr.run_tests(cmd_args) tr.warn_skipped() if tr.daemon_started and not cfg.no_daemon_stop: @@ -1002,7 +1000,7 @@ if __name__ == '__main__': if hasattr(tr, 'tg'): del tr.tg del tr - die(1,yellow('\ntest.py exiting at user request')) + die(1, yellow('\ntest.py exiting at user request')) except TestSuiteSpawnedScriptException as e: # if spawned script is not running under exec_wrapper, output brief error msg: if os.getenv('MMGEN_EXEC_WRAPPER'): @@ -1021,4 +1019,4 @@ if __name__ == '__main__': # if cmdtest.py itself is running under exec_wrapper, re-raise so exec_wrapper can handle exception: if os.getenv('MMGEN_EXEC_WRAPPER') or not os.getenv('MMGEN_IGNORE_TEST_PY_EXCEPTION'): raise - die(1,red('Test script exited with error')) + die(1, red('Test script exited with error')) diff --git a/test/colortest.py b/test/colortest.py index f18e864a..0c5d7e20 100755 --- a/test/colortest.py +++ b/test/colortest.py @@ -15,25 +15,35 @@ except ImportError: from test.include import test_init from mmgen.color import * -from mmgen.util import msg,ymsg,gmsg +from mmgen.util import msg, ymsg, gmsg import mmgen.color as color_mod def test_color(): ymsg("Terminal display:") # init_color() not called yet, so no yellow here - for desc,nc in (('pre-init',None),('auto','auto'),('8-color',8),('256-color',256),('disabled',0)): + for desc, nc in ( + ('pre-init', None), + ('auto', 'auto'), + ('8-color', 8), + ('256-color', 256), + ('disabled', 0)): if nc is not None: init_color(num_colors=nc) msg('{:9}: {}'.format( desc, - ' '.join( getattr(color_mod,c)(c) for c in sorted(color_mod._colors) ) )) + ' '.join(getattr(color_mod, c)(c) for c in sorted(color_mod._colors)))) init_color() gmsg("\nParsed terminfo 'colors' values:") from mmgen.color import orange - for t,c in (('rxvt',8),('xterm',8),('rxvt-unicode',88),('screen-256color',256),('xterm-256color',256)): + for t, c in ( + ('rxvt', 8), + ('xterm', 8), + ('rxvt-unicode', 88), + ('screen-256color', 256), + ('xterm-256color', 256)): ret = get_terminfo_colors(t) if ret is None: ymsg(f'Warning: unable to get info for terminal {t!r}') diff --git a/test/gentest.py b/test/gentest.py index 9585924c..0cb9c852 100755 --- a/test/gentest.py +++ b/test/gentest.py @@ -20,7 +20,7 @@ test/gentest.py: Cryptocoin key/address generation tests for the MMGen suite """ -import sys,os,time +import sys, os, time try: from include import test_init @@ -28,9 +28,9 @@ except ImportError: from test.include import test_init # Import these _after_ local path's been added to sys.path -from mmgen.cfg import gc,Config -from mmgen.color import green,red,purple -from mmgen.util import msg,ymsg,capfirst,is_int,die +from mmgen.cfg import gc, Config +from mmgen.color import green, red, purple +from mmgen.util import msg, ymsg, capfirst, is_int, die results_file = 'gentest.out.json' @@ -47,8 +47,8 @@ opts_data = { -q, --quiet Produce quieter output -s, --save-results Save output of external tool in Compare test to {rf!r} --t, --type=t Specify address type (e.g. 'compressed','segwit', - 'zcash_z','bech32') +-t, --type=t Specify address type (e.g. 'compressed', 'segwit', + 'zcash_z', 'bech32') -v, --verbose Produce more verbose output """, 'notes': """ @@ -106,7 +106,7 @@ EXAMPLES: SUPPORTED EXTERNAL TOOLS: - + ethkey (for ETH,ETC) + + ethkey (for ETH, ETC) https://github.com/openethereum/openethereum (build with 'cargo build -p ethkey-cli --release') @@ -126,23 +126,23 @@ SUPPORTED EXTERNAL TOOLS: }, 'code': { 'options': lambda s: s.format( - rf=results_file, + rf = results_file, ), 'notes': lambda s: s.format( - prog='test/gentest.py', - pnm=gc.proj_name, - snum=rounds ) + prog = 'test/gentest.py', + pnm = gc.proj_name, + snum = rounds) } } -def get_cmd_output(cmd,input=None): - return run(cmd,input=input,stdout=PIPE,stderr=DEVNULL).stdout.decode().splitlines() +def get_cmd_output(cmd, input=None): + return run(cmd, input=input, stdout=PIPE, stderr=DEVNULL).stdout.decode().splitlines() saved_results = {} class GenTool: - def __init__(self,proto,addr_type): + def __init__(self, proto, addr_type): self.proto = proto self.addr_type = addr_type self.data = {} @@ -150,55 +150,55 @@ class GenTool: def __del__(self): if cfg.save_results: key = f'{self.proto.coin}-{self.proto.network}-{self.addr_type.name}-{self.desc}'.lower() - saved_results[key] = {k.hex():v._asdict() for k,v in self.data.items()} + saved_results[key] = {k.hex():v._asdict() for k, v in self.data.items()} - def run_tool(self,sec,cache_data): + def run_tool(self, sec, cache_data): vcoin = 'BTC' if self.proto.coin == 'BCH' else self.proto.coin key = sec.orig_bytes if key in self.data: return self.data[key] else: - ret = self.run(sec,vcoin) + ret = self.run(sec, vcoin) if cache_data: - self.data[key] = sd( **{'reduced':sec.hex()}, **ret._asdict() ) + self.data[key] = sd(**{'reduced':sec.hex()}, **ret._asdict()) return ret class GenToolEthkey(GenTool): desc = 'ethkey' - def __init__(self,*args,**kwargs): + def __init__(self, *args, **kwargs): self.cmdname = get_ethkey() - super().__init__(*args,**kwargs) + super().__init__(*args, **kwargs) - def run(self,sec,vcoin): - o = get_cmd_output([self.cmdname,'info',sec.hex()]) + def run(self, sec, vcoin): + o = get_cmd_output([self.cmdname, 'info', sec.hex()]) return gtr( o[0].split()[1], o[-1].split()[1], - None ) + None) class GenToolKeyconv(GenTool): desc = 'keyconv' - def run(self,sec,vcoin): - o = get_cmd_output(['keyconv','-C',vcoin,sec.wif]) + def run(self, sec, vcoin): + o = get_cmd_output(['keyconv', '-C', vcoin, sec.wif]) return gtr( o[1].split()[1], o[0].split()[1], - None ) + None) class GenToolZcash_mini(GenTool): desc = 'zcash-mini' - def run(self,sec,vcoin): - o = get_cmd_output(['zcash-mini','-key','-simple'],input=(sec.wif+'\n').encode()) - return gtr( o[1], o[0], o[-1] ) + def run(self, sec, vcoin): + o = get_cmd_output(['zcash-mini', '-key', '-simple'], input=(sec.wif+'\n').encode()) + return gtr(o[1], o[0], o[-1]) class GenToolPycoin(GenTool): """ pycoin/networks/all.py pycoin/networks/legacy_networks.py """ desc = 'pycoin' - def __init__(self,*args,**kwargs): - super().__init__(*args,**kwargs) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) try: from pycoin.networks.registry import network_for_netcode except ImportError as e: @@ -206,16 +206,16 @@ class GenToolPycoin(GenTool): f'{e}\nUnable to import pycoin.networks.registry. Is pycoin installed on your system?') from e self.nfnc = network_for_netcode - def run(self,sec,vcoin): + def run(self, sec, vcoin): if self.proto.testnet: vcoin = cinfo.external_tests['testnet']['pycoin'][vcoin] network = self.nfnc(vcoin) key = network.keys.private( - secret_exponent = int(sec.hex(),16), - is_compressed = self.addr_type.name != 'legacy' ) + secret_exponent = int(sec.hex(), 16), + is_compressed = self.addr_type.name != 'legacy') if key is None: - die(1,f'can’t parse {sec.hex()}') - if self.addr_type.name in ('segwit','bech32'): + die(1, f'can’t parse {sec.hex()}') + if self.addr_type.name in ('segwit', 'bech32'): hash160_c = key.hash160(is_compressed=True) if self.addr_type.name == 'segwit': p2sh_script = network.contract.for_p2pkh_wit(hash160_c) @@ -224,13 +224,13 @@ class GenToolPycoin(GenTool): addr = network.address.for_p2pkh_wit(hash160_c) else: addr = key.address() - return gtr( key.wif(), addr, None ) + return gtr(key.wif(), addr, None) class GenToolMonero_python(GenTool): desc = 'monero-python' - def __init__(self,*args,**kwargs): - super().__init__(*args,**kwargs) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) try: from monero.seed import Seed except ImportError as e: @@ -238,36 +238,36 @@ class GenToolMonero_python(GenTool): f'{e}\nUnable to import monero-python. Is monero-python installed on your system?') from e self.Seed = Seed - def run(self,sec,vcoin): - seed = self.Seed( sec.orig_bytes.hex() ) + def run(self, sec, vcoin): + seed = self.Seed(sec.orig_bytes.hex()) sk = seed.secret_spend_key() vk = seed.secret_view_key() addr = seed.public_address() - return gtr( sk, addr, vk ) + return gtr(sk, addr, vk) -def find_or_check_tool(proto,addr_type,toolname): +def find_or_check_tool(proto, addr_type, toolname): ext_progs = list(cinfo.external_tests[proto.network]) if toolname not in ext_progs + ['ext']: - die(1,f'{toolname!r}: unsupported tool for network {proto.network}') + die(1, f'{toolname!r}: unsupported tool for network {proto.network}') if cfg.all_coins and toolname == 'ext': - die(1,"'--all-coins' must be combined with a specific external testing tool") + die(1, "'--all-coins' must be combined with a specific external testing tool") else: tool = cinfo.get_test_support( proto.coin, addr_type.name, proto.network, verbose = not cfg.quiet, - toolname = toolname if toolname != 'ext' else None ) + toolname = toolname if toolname != 'ext' else None) if tool and toolname in ext_progs and toolname != tool: sys.exit(3) if tool is None: return None return tool -def test_equal(desc,a_val,b_val,in_bytes,sec,wif,a_desc,b_desc): +def test_equal(desc, a_val, b_val, in_bytes, sec, wif, a_desc, b_desc): if a_val != b_val: fs = """ {i:{w}}: {} @@ -281,33 +281,33 @@ def test_equal(desc,a_val,b_val,in_bytes,sec,wif,a_desc,b_desc): + fs.format( in_bytes.hex(), sec, wif, a_val, b_val, i='input', s='sec key', W='WIF key', a=a_desc, b=b_desc, - w=max(len(e) for e in (a_desc,b_desc)) + 1 + w=max(len(e) for e in (a_desc, b_desc)) + 1 ).rstrip()) -def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data): +def do_ab_test(proto, scfg, addr_type, gen1, kg2, ag, tool, cache_data): - def do_ab_inner(n,trounds,in_bytes): + def do_ab_inner(n, trounds, in_bytes): global last_t if cfg.verbose or time.time() - last_t >= 0.1: qmsg_r(f'\rRound {i+1}/{trounds} ') last_t = time.time() - sec = PrivKey(proto,in_bytes,compressed=addr_type.compressed,pubkey_type=addr_type.pubkey_type) + sec = PrivKey(proto, in_bytes, compressed=addr_type.compressed, pubkey_type=addr_type.pubkey_type) data = kg1.gen_data(sec) addr1 = ag.to_addr(data) view_pref = 1 if proto.coin == 'BCH' else 0 - tinfo = ( in_bytes, sec, sec.wif, type(kg1).__name__, type(kg2).__name__ if kg2 else tool.desc ) + tinfo = (in_bytes, sec, sec.wif, type(kg1).__name__, type(kg2).__name__ if kg2 else tool.desc) def do_msg(): if cfg.verbose: - msg( fs.format( b=in_bytes.hex(), r=sec.hex(), k=sec.wif, v=vk2, a=addr1 )) + msg(fs.format(b=in_bytes.hex(), r=sec.hex(), k=sec.wif, v=vk2, a=addr1)) if tool: def run_tool(): - o = tool.run_tool(sec,cache_data) - test_equal( 'WIF keys', sec.wif, o.wif, *tinfo ) + o = tool.run_tool(sec, cache_data) + test_equal('WIF keys', sec.wif, o.wif, *tinfo) test_equal('addresses', addr1.views[view_pref], o.addr, *tinfo) if o.viewkey: - test_equal( 'view keys', ag.to_viewkey(data), o.viewkey, *tinfo ) + test_equal('view keys', ag.to_viewkey(data), o.viewkey, *tinfo) return o.viewkey vk2 = run_tool() do_msg() @@ -325,18 +325,18 @@ def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data): for _ in range(scfg.rounds): yield getrand(32) - kg1 = KeyGenerator( cfg, proto, addr_type.pubkey_type, gen1 ) + kg1 = KeyGenerator(cfg, proto, addr_type.pubkey_type, gen1) if type(kg1) == type(kg2): - die(4,'Key generators are the same!') + die(4, 'Key generators are the same!') - e = cinfo.get_entry(proto.coin,proto.network) + e = cinfo.get_entry(proto.coin, proto.network) qmsg(green("Comparing address generators '{A}' and '{B}' for {N} {c} ({n}), addrtype {a!r}".format( - A = type(kg1).__name__.replace('_','-'), - B = type(kg2).__name__.replace('_','-') if kg2 else tool.desc, + A = type(kg1).__name__.replace('_', '-'), + B = type(kg2).__name__.replace('_', '-') if kg2 else tool.desc, N = proto.network, c = proto.coin, n = e.name if e else '---', - a = addr_type.name ))) + a = addr_type.name))) global last_t last_t = time.time() @@ -346,7 +346,7 @@ def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data): '\nreduced: {r}' + '\n{:9} {{k}}'.format(addr_type.wif_label+':') + ('\nviewkey: {v}' if 'viewkey' in addr_type.extra_attrs else '') + - '\naddr: {a}\n' ) + '\naddr: {a}\n') group_order = CoinProtocol.Secp256k1.secp256k1_group_order @@ -363,35 +363,35 @@ def do_ab_test(proto,scfg,addr_type,gen1,kg2,ag,tool,cache_data): ) qmsg(purple('edge cases:')) - for i,privbytes in enumerate(edgecase_sks): - do_ab_inner(i,len(edgecase_sks),privbytes) + for i, privbytes in enumerate(edgecase_sks): + do_ab_inner(i, len(edgecase_sks), privbytes) qmsg(green('\rOK ' if cfg.verbose else 'OK')) qmsg(purple('random input:')) - for i,privbytes in enumerate(get_randbytes()): - do_ab_inner(i,scfg.rounds,privbytes) + for i, privbytes in enumerate(get_randbytes()): + do_ab_inner(i, scfg.rounds, privbytes) qmsg(green('\rOK ' if cfg.verbose else 'OK')) -def init_tool(proto,addr_type,toolname): - return globals()['GenTool'+capfirst(toolname.replace('-','_'))](proto,addr_type) +def init_tool(proto, addr_type, toolname): + return globals()['GenTool'+capfirst(toolname.replace('-', '_'))](proto, addr_type) -def ab_test(proto,scfg): +def ab_test(proto, scfg): - addr_type = MMGenAddrType( proto=proto, id_str=cfg.type or proto.dfl_mmtype ) + addr_type = MMGenAddrType(proto=proto, id_str=cfg.type or proto.dfl_mmtype) if scfg.gen2: assert scfg.gen1 != 'all', "'all' must be used only with external tool" - kg2 = KeyGenerator( cfg, proto, addr_type.pubkey_type, scfg.gen2 ) + kg2 = KeyGenerator(cfg, proto, addr_type.pubkey_type, scfg.gen2) tool = None else: - toolname = find_or_check_tool( proto, addr_type, scfg.tool ) + toolname = find_or_check_tool(proto, addr_type, scfg.tool) if toolname is None: ymsg(f'Warning: skipping tool {scfg.tool!r} for {proto.coin} {addr_type.name}') return - tool = init_tool( proto, addr_type, toolname ) + tool = init_tool(proto, addr_type, toolname) kg2 = None - ag = AddrGenerator( cfg, proto, addr_type ) + ag = AddrGenerator(cfg, proto, addr_type) if scfg.all_backends: # check all backends against external tool for n in range(len(get_backends(addr_type.pubkey_type))): @@ -415,21 +415,21 @@ def ab_test(proto,scfg): tool = tool, cache_data = False) -def speed_test(proto,kg,ag,rounds): +def speed_test(proto, kg, ag, rounds): qmsg(green('Testing speed of address generator {!r} for coin {}'.format( type(kg).__name__, - proto.coin ))) + proto.coin))) from struct import pack seed = getrand(28) qmsg('Incrementing key with each round') - qmsg('Starting key: {}'.format( (seed + pack('I',0)).hex() )) + qmsg('Starting key: {}'.format((seed + pack('I', 0)).hex())) start = last_t = time.time() for i in range(rounds): if time.time() - last_t >= 0.1: qmsg_r(f'\rRound {i+1}/{rounds} ') last_t = time.time() - sec = PrivKey( proto, seed+pack('I', i), compressed=ag.compressed, pubkey_type=ag.pubkey_type ) + sec = PrivKey(proto, seed+pack('I', i), compressed=ag.compressed, pubkey_type=ag.pubkey_type) addr = ag.to_addr(kg.gen_data(sec)) vmsg(f'\nkey: {sec.wif}\naddr: {addr}\n') qmsg( @@ -438,12 +438,12 @@ def speed_test(proto,kg,ag,rounds): ('' if cfg.test_suite_deterministic else f' in {time.time()-start:.2f} seconds') ) -def dump_test(proto,kg,ag,filename): +def dump_test(proto, kg, ag, filename): with open(filename) as fp: dump = [[*(e.split()[0] for e in line.split('addr='))] for line in fp.readlines() if 'addr=' in line] if not dump: - die(1,f'File {filename!r} appears not to be a wallet dump') + die(1, f'File {filename!r} appears not to be a wallet dump') qmsg(green( "A: generator pair '{}:{}'\nB: wallet dump {!r}".format( @@ -451,27 +451,27 @@ def dump_test(proto,kg,ag,filename): type(ag).__name__, filename))) - for count,(b_wif,b_addr) in enumerate(dump,1): + for count, (b_wif, b_addr) in enumerate(dump, 1): qmsg_r(f'\rKey {count}/{len(dump)} ') try: - b_sec = PrivKey(proto,wif=b_wif) + b_sec = PrivKey(proto, wif=b_wif) except: - die(2,f'\nInvalid {proto.network} WIF address in dump file: {b_wif}') + die(2, f'\nInvalid {proto.network} WIF address in dump file: {b_wif}') a_addr = ag.to_addr(kg.gen_data(b_sec)) vmsg(f'\nwif: {b_wif}\naddr: {b_addr}\n') - tinfo = (b_sec,b_sec.hex(),b_wif,type(kg).__name__,filename) - test_equal('addresses',a_addr,b_addr,*tinfo) + tinfo = (b_sec, b_sec.hex(), b_wif, type(kg).__name__, filename) + test_equal('addresses', a_addr, b_addr, *tinfo) - qmsg(green(('\n','')[bool(cfg.verbose)] + 'OK')) + qmsg(green(('\n', '')[bool(cfg.verbose)] + 'OK')) -def get_protos(proto,addr_type,toolname): +def get_protos(proto, addr_type, toolname): init_genonly_altcoins(testnet=proto.testnet) for coin in cinfo.external_tests[proto.network][toolname]: if coin.lower() not in CoinProtocol.coins: continue - ret = init_proto( cfg, coin, testnet=proto.testnet ) + ret = init_proto(cfg, coin, testnet=proto.testnet) if addr_type not in ret.mmtypes: continue yield ret @@ -481,15 +481,15 @@ def parse_args(): if len(cfg._args) != 2: cfg._usage() - arg1,arg2 = cfg._args - gen1,gen2,rounds = (0,0,0) - tool,all_backends,dumpfile = (None,None,None) + arg1, arg2 = cfg._args + gen1, gen2, rounds = (0, 0, 0) + tool, all_backends, dumpfile = (None, None, None) if is_int(arg1) and is_int(arg2): test = 'speed' gen1 = arg1 rounds = arg2 - elif is_int(arg1) and os.access(arg2,os.R_OK): + elif is_int(arg1) and os.access(arg2, os.R_OK): test = 'dump' gen1 = arg1 dumpfile = arg2 @@ -498,12 +498,12 @@ def parse_args(): rounds = arg2 if not is_int(arg2): - die(1,'Second argument must be dump filename or integer rounds specification') + die(1, 'Second argument must be dump filename or integer rounds specification') try: - a,b = arg1.split(':') + a, b = arg1.split(':') except: - die(1,'First argument must be a generator backend number or two colon-separated arguments') + die(1, 'First argument must be a generator backend number or two colon-separated arguments') if is_int(a): gen1 = a @@ -511,64 +511,65 @@ def parse_args(): if a == 'all': all_backends = True else: - die(1,"First part of first argument must be a generator backend number or 'all'") + die(1, "First part of first argument must be a generator backend number or 'all'") if is_int(b): if cfg.all_coins: - die(1,'--all-coins must be used with external tool only') + die(1, '--all-coins must be used with external tool only') gen2 = b else: tool = b ext_progs = list(cinfo.external_tests[cfg._proto.network]) + ['ext'] if b not in ext_progs: - die(1,f'Second part of first argument must be a generator backend number or one of {ext_progs}') + die(1, f'Second part of first argument must be a generator backend number or one of {ext_progs}') - return namedtuple('parsed_args',['test','gen1','gen2','rounds','tool','all_backends','dumpfile'])( + return namedtuple('parsed_args', + ['test', 'gen1', 'gen2', 'rounds', 'tool', 'all_backends', 'dumpfile'])( test, int(gen1) or None, int(gen2) or None, int(rounds) or None, tool, all_backends, - dumpfile ) + dumpfile) def main(): scfg = parse_args() - addr_type = MMGenAddrType( proto=proto, id_str=cfg.type or proto.dfl_mmtype ) + addr_type = MMGenAddrType(proto=proto, id_str=cfg.type or proto.dfl_mmtype) if scfg.test == 'ab': - protos = get_protos(proto,addr_type,scfg.tool) if cfg.all_coins else [proto] + protos = get_protos(proto, addr_type, scfg.tool) if cfg.all_coins else [proto] for p in protos: - ab_test( p, scfg ) + ab_test(p, scfg) else: - kg = KeyGenerator( cfg, proto, addr_type.pubkey_type, scfg.gen1 ) - ag = AddrGenerator( cfg, proto, addr_type ) + kg = KeyGenerator(cfg, proto, addr_type.pubkey_type, scfg.gen1) + ag = AddrGenerator(cfg, proto, addr_type) if scfg.test == 'speed': - speed_test( proto, kg, ag, scfg.rounds ) + speed_test(proto, kg, ag, scfg.rounds) elif scfg.test == 'dump': - dump_test( proto, kg, ag, scfg.dumpfile ) + dump_test(proto, kg, ag, scfg.dumpfile) if saved_results: import json - with open(results_file,'w') as fp: - fp.write(json.dumps( saved_results, indent=4 )) + with open(results_file, 'w') as fp: + fp.write(json.dumps(saved_results, indent=4)) -from subprocess import run,PIPE,DEVNULL +from subprocess import run, PIPE, DEVNULL from collections import namedtuple -from mmgen.protocol import init_proto,CoinProtocol +from mmgen.protocol import init_proto, CoinProtocol from mmgen.altcoin.params import init_genonly_altcoins from test.altcointest import TestCoinInfo as cinfo from mmgen.key import PrivKey from mmgen.addr import MMGenAddrType -from mmgen.addrgen import KeyGenerator,AddrGenerator +from mmgen.addrgen import KeyGenerator, AddrGenerator from mmgen.keygen import get_backends from mmgen.util2 import load_cryptodomex -from test.include.common import getrand,get_ethkey,set_globals +from test.include.common import getrand, get_ethkey, set_globals -gtr = namedtuple('gen_tool_result',['wif','addr','viewkey']) -sd = namedtuple('saved_data_item',['reduced','wif','addr','viewkey']) +gtr = namedtuple('gen_tool_result', ['wif', 'addr', 'viewkey']) +sd = namedtuple('saved_data_item', ['reduced', 'wif', 'addr', 'viewkey']) sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:] diff --git a/test/hashfunc.py b/test/hashfunc.py index 993d465e..01703053 100755 --- a/test/hashfunc.py +++ b/test/hashfunc.py @@ -27,7 +27,7 @@ try: except ImportError: from test.include import test_init -from mmgen.util import msg,msg_r,die +from mmgen.util import msg, msg_r, die def green(s): return '\033[32;1m' + s + '\033[0m' @@ -39,33 +39,33 @@ class TestHashFunc: h = self.t_cls(b'foo') if h.H_init != self.H_ref: m = 'Generated constants H[] differ from reference value:\nReference:\n{}\nGenerated:\n{}' - die(3,m.format([hex(n) for n in self.H_ref],[hex(n) for n in h.H_init])) + die(3, m.format([hex(n) for n in self.H_ref], [hex(n) for n in h.H_init])) if h.K != self.K_ref: m = 'Generated constants K[] differ from reference value:\nReference:\n{}\nGenerated:\n{}' - die(3,m.format([hex(n) for n in self.K_ref],[hex(n) for n in h.K])) + die(3, m.format([hex(n) for n in self.K_ref], [hex(n) for n in h.K])) msg('OK') - def compare_hashes(self,data,chk=None): + def compare_hashes(self, data, chk=None): if chk is None: - chk = getattr(self.hashlib,self.desc)(data).hexdigest() + chk = getattr(self.hashlib, self.desc)(data).hexdigest() res = self.t_cls(data).hexdigest() if res != chk: m ='\nHashes do not match!\nReference {d}: {}\nMMGen {d}: {}' - die(3,m.format(chk,res,d=self.desc.upper())) + die(3, m.format(chk, res, d=self.desc.upper())) def test_ref(self): - for i,data in enumerate(self.vectors): + for i, data in enumerate(self.vectors): msg_r(f'\rTesting reference input data: {i+1:4}/{len(self.vectors)} ') self.compare_hashes(data.encode(), chk=self.vectors[data]) msg('OK') - def test_random(self,rounds): + def test_random(self, rounds): if not self.hashlib: return for i in range(rounds): - if i+1 in (1,rounds) or not (i+1) % 10: + if i+1 in (1, rounds) or not (i+1) % 10: msg_r(f'\rTesting random input data: {i+1:4}/{rounds} ') - dlen = int(getrand(4).hex(),16) >> 18 + dlen = int(getrand(4).hex(), 16) >> 18 self.compare_hashes(getrand(dlen)) msg('OK') @@ -120,16 +120,16 @@ class TestSha2(TestHashFunc): desc = 'sha2' def __init__(self): - from mmgen.sha2 import Sha256,Sha512 + from mmgen.sha2 import Sha256, Sha512 import hashlib - self.t_cls = { 'sha256':Sha256, 'sha512':Sha512 }[self.desc] + self.t_cls = {'sha256':Sha256, 'sha512':Sha512}[self.desc] self.hashlib = hashlib self.vectors = {k:None for k in TestKeccak.vectors} class TestSha256(TestSha2): desc = 'sha256' H_ref = ( - 0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19 ) + 0x6a09e667, 0xbb67ae85, 0x3c6ef372, 0xa54ff53a, 0x510e527f, 0x9b05688c, 0x1f83d9ab, 0x5be0cd19) K_ref = ( 0x428a2f98, 0x71374491, 0xb5c0fbcf, 0xe9b5dba5, 0x3956c25b, 0x59f111f1, 0x923f82a4, 0xab1c5ed5, 0xd807aa98, 0x12835b01, 0x243185be, 0x550c7dc3, 0x72be5d74, 0x80deb1fe, 0x9bdc06a7, 0xc19bf174, @@ -138,13 +138,13 @@ class TestSha256(TestSha2): 0x27b70a85, 0x2e1b2138, 0x4d2c6dfc, 0x53380d13, 0x650a7354, 0x766a0abb, 0x81c2c92e, 0x92722c85, 0xa2bfe8a1, 0xa81a664b, 0xc24b8b70, 0xc76c51a3, 0xd192e819, 0xd6990624, 0xf40e3585, 0x106aa070, 0x19a4c116, 0x1e376c08, 0x2748774c, 0x34b0bcb5, 0x391c0cb3, 0x4ed8aa4a, 0x5b9cca4f, 0x682e6ff3, - 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2 ) + 0x748f82ee, 0x78a5636f, 0x84c87814, 0x8cc70208, 0x90befffa, 0xa4506ceb, 0xbef9a3f7, 0xc67178f2) class TestSha512(TestSha2): desc = 'sha512' H_ref = ( 0x6a09e667f3bcc908, 0xbb67ae8584caa73b, 0x3c6ef372fe94f82b, 0xa54ff53a5f1d36f1, - 0x510e527fade682d1, 0x9b05688c2b3e6c1f, 0x1f83d9abfb41bd6b, 0x5be0cd19137e2179 ) + 0x510e527fade682d1, 0x9b05688c2b3e6c1f, 0x1f83d9abfb41bd6b, 0x5be0cd19137e2179) K_ref = ( 0x428a2f98d728ae22, 0x7137449123ef65cd, 0xb5c0fbcfec4d3b2f, 0xe9b5dba58189dbbc, 0x3956c25bf348b538, 0x59f111f1b605d019, 0x923f82a4af194f9b, 0xab1c5ed5da6d8118, 0xd807aa98a3030242, 0x12835b0145706fbe, @@ -161,21 +161,21 @@ class TestSha512(TestSha2): 0x90befffa23631e28, 0xa4506cebde82bde9, 0xbef9a3f7b2c67915, 0xc67178f2e372532b, 0xca273eceea26619c, 0xd186b8c721c0c207, 0xeada7dd6cde0eb1e, 0xf57d4f7fee6ed178, 0x06f067aa72176fba, 0x0a637dc5a2c898a6, 0x113f9804bef90dae, 0x1b710b35131c471b, 0x28db77f523047d84, 0x32caab7b40c72493, 0x3c9ebe0a15c9bebc, - 0x431d67c49c100d4c, 0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817 ) + 0x431d67c49c100d4c, 0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817) -from test.include.common import getrand,set_globals +from test.include.common import getrand, set_globals from mmgen.cfg import Config from mmgen.main import launch def main(): - if len(sys.argv) not in (2,3): - die(1,'Test takes 1 or 2 arguments: test name, plus optional rounds count') + if len(sys.argv) not in (2, 3): + die(1, 'Test takes 1 or 2 arguments: test name, plus optional rounds count') test = sys.argv[1].capitalize() - if test not in ('Sha256','Sha512','Keccak'): - die(1, "Valid choices for test are 'sha256','sha512' or 'keccak'") + if test not in ('Sha256', 'Sha512', 'Keccak'): + die(1, "Valid choices for test are 'sha256', 'sha512' or 'keccak'") random_rounds = int(sys.argv[2]) if len(sys.argv) == 3 else 500 diff --git a/test/objattrtest.py b/test/objattrtest.py index 41f3852f..b9cf6667 100755 --- a/test/objattrtest.py +++ b/test/objattrtest.py @@ -30,8 +30,8 @@ except ImportError: from test.include import test_init from mmgen.cfg import Config -from mmgen.util import msg,msg_r,gmsg,die -from mmgen.color import red,yellow,green,blue,purple,nocolor +from mmgen.util import msg, msg_r, gmsg, die +from mmgen.color import red, yellow, green, blue, purple, nocolor from mmgen.obj import ListItemAttr opts_data = { @@ -57,8 +57,8 @@ set_globals(cfg) from test.objattrtest_py_d.oat_common import sample_objs -pd = namedtuple('attr_bits', ['read_ok','delete_ok','reassign_ok','typeconv','set_none_ok']) -perm_bits = ('read_ok','delete_ok','reassign_ok') +pd = namedtuple('attr_bits', ['read_ok', 'delete_ok', 'reassign_ok', 'typeconv', 'set_none_ok']) +perm_bits = ('read_ok', 'delete_ok', 'reassign_ok') attr_dfls = { 'reassign_ok': False, 'delete_ok': False, @@ -75,54 +75,54 @@ def parse_attrbits(bits): bool(0b10000 & bits), # set_none ) -def get_descriptor_obj(objclass,attrname): - for o in (objclass,objclass.__bases__[0]): # assume there's only one base class +def get_descriptor_obj(objclass, attrname): + for o in (objclass, objclass.__bases__[0]): # assume there's only one base class if attrname in o.__dict__: return o.__dict__[attrname] - die(4,f'unable to find descriptor {objclass.__name__}.{attrname}') + die(4, f'unable to find descriptor {objclass.__name__}.{attrname}') -def test_attr_perm(obj,attrname,perm_name,perm_value,dobj,attrval_type): +def test_attr_perm(obj, attrname, perm_name, perm_value, dobj, attrval_type): class SampleObjError(Exception): pass - pname = perm_name.replace('_ok','') + pname = perm_name.replace('_ok', '') pstem = pname.rstrip('e') try: if perm_name == 'read_ok': # non-existent perm - getattr(obj,attrname) + getattr(obj, attrname) elif perm_name == 'reassign_ok': try: so = sample_objs[attrval_type.__name__] except Exception as e: raise SampleObjError(f'unable to find sample object of type {attrval_type.__name__!r}') from e # ListItemAttr allows setting an attribute if its value is None - if type(dobj) is ListItemAttr and getattr(obj,attrname) is None: - setattr(obj,attrname,so) - setattr(obj,attrname,so) + if type(dobj) is ListItemAttr and getattr(obj, attrname) is None: + setattr(obj, attrname, so) + setattr(obj, attrname, so) elif perm_name == 'delete_ok': - delattr(obj,attrname) + delattr(obj, attrname) except SampleObjError as e: - die(4,f'Test script error ({e})') + die(4, f'Test script error ({e})') except Exception as e: if perm_value is True: fs = '{!r}: unable to {} attribute {!r}, though {}ing is allowed ({})' - die(4,fs.format(type(obj).__name__,pname,attrname,pstem,e)) + die(4, fs.format(type(obj).__name__, pname, attrname, pstem, e)) else: if perm_value is False: fs = '{!r}: attribute {!r} is {n}able, though {n}ing is forbidden' - die(4,fs.format(type(obj).__name__,attrname,n=pstem)) + die(4, fs.format(type(obj).__name__, attrname, n=pstem)) -def test_attr(data,obj,attrname,dobj,bits,attrval_type): - if hasattr(obj,attrname): # TODO +def test_attr(data, obj, attrname, dobj, bits, attrval_type): + if hasattr(obj, attrname): # TODO td_attrval_type = data.attrs[attrname][1] - if attrval_type not in (td_attrval_type,type(None)): + if attrval_type not in (td_attrval_type, type(None)): fs = '\nattribute {!r} of {!r} instance has incorrect type {!r} (should be {!r})' - die(4,fs.format(attrname,type(obj).__name__,attrval_type.__name__,td_attrval_type.__name__)) + die(4, fs.format(attrname, type(obj).__name__, attrval_type.__name__, td_attrval_type.__name__)) - if hasattr(dobj,'__dict__'): + if hasattr(dobj, '__dict__'): d = dobj.__dict__ bits = bits._asdict() colors = { @@ -135,38 +135,38 @@ def test_attr(data,obj,attrname,dobj,bits,attrval_type): if k in d: if d[k] != bits[k]: fs = 'init value {iv}={a} for attr {n!r} does not match test data ({iv}={b})' - die(4,fs.format(iv=k,n=attrname,a=d[k],b=bits[k])) + die(4, fs.format(iv=k, n=attrname, a=d[k], b=bits[k])) if cfg.verbose and d[k] != attr_dfls[k]: msg_r(colors[k](f' {k}={d[k]!r}')) -def test_object(mod,test_data,objname): +def test_object(mod, test_data, objname): if '.' in objname: - on1,on2 = objname.split('.') - cls = getattr(getattr(mod,on1),on2) + on1, on2 = objname.split('.') + cls = getattr(getattr(mod, on1), on2) else: - cls = getattr(mod,objname) + cls = getattr(mod, objname) fs = 'Testing attribute ' + ('{!r:<15}{dt:13}' if cfg.show_descriptor_type else '{!r}') data = test_data[objname] - obj = cls(*data.args,**data.kwargs) + obj = cls(*data.args, **data.kwargs) - for attrname,adata in data.attrs.items(): - dobj = get_descriptor_obj(type(obj),attrname) + for attrname, adata in data.attrs.items(): + dobj = get_descriptor_obj(type(obj), attrname) if cfg.verbose: - msg_r(fs.format(attrname,dt=type(dobj).__name__.replace('MMGen',''))) + msg_r(fs.format(attrname, dt=type(dobj).__name__.replace('MMGen', ''))) bits = parse_attrbits(adata[0]) - test_attr(data,obj,attrname,dobj,bits,adata[1]) - for bit_name,bit_value in bits._asdict().items(): + test_attr(data, obj, attrname, dobj, bits, adata[1]) + for bit_name, bit_value in bits._asdict().items(): if bit_name in perm_bits: - test_attr_perm(obj,attrname,bit_name,bit_value,dobj,adata[1]) + test_attr_perm(obj, attrname, bit_name, bit_value, dobj, adata[1]) cfg._util.vmsg('') def do_loop(): import importlib modname = f'test.objattrtest_py_d.oat_{proto.coin.lower()}_{proto.network}' mod = importlib.import_module(modname) - test_data = getattr(mod,'tests') + test_data = getattr(mod, 'tests') gmsg(f'Running immutable attribute tests for {proto.coin} {proto.network}') utests = cfg._args @@ -174,7 +174,7 @@ def do_loop(): if utests and obj not in utests: continue msg((blue if cfg.verbose else nocolor)(f'Testing {obj}')) - test_object(mod,test_data,obj) + test_object(mod, test_data, obj) proto = cfg._proto diff --git a/test/objtest.py b/test/objtest.py index 48e21250..7b4b2038 100755 --- a/test/objtest.py +++ b/test/objtest.py @@ -20,7 +20,7 @@ test/objtest.py: Test MMGen data objects """ -import os,re +import os, re try: from include import test_init @@ -35,8 +35,8 @@ if not os.getenv('MMGEN_DEVTOOLS'): init_dev() from mmgen.cfg import Config -from mmgen.util import msg,msg_r,gmsg,capfirst,die -from mmgen.color import red,yellow,blue,green,orange,purple,gray,nocolor +from mmgen.util import msg, msg_r, gmsg, capfirst, die +from mmgen.color import red, yellow, blue, green, orange, purple, gray, nocolor from mmgen.obj import get_obj opts_data = { @@ -64,14 +64,14 @@ if cfg.verbose: from test.include.common import set_globals set_globals(cfg) -def run_test(mod,test,arg,input_data,arg1,exc_name): +def run_test(mod, test, arg, input_data, arg1, exc_name): arg_copy = arg kwargs = {} ret_chk = arg ret_idx = None - if input_data == 'good' and isinstance(arg,tuple): - arg,ret_chk = arg - if isinstance(arg,dict): # pass one arg + kwargs to constructor + if input_data == 'good' and isinstance(arg, tuple): + arg, ret_chk = arg + if isinstance(arg, dict): # pass one arg + kwargs to constructor arg_copy = arg.copy() if 'arg' in arg: args = [arg['arg']] @@ -93,7 +93,7 @@ def run_test(mod,test,arg,input_data,arg1,exc_name): del arg['ret_idx'] del arg_copy['ret_idx'] kwargs.update(arg) - elif isinstance(arg,tuple): + elif isinstance(arg, tuple): args = arg else: args = [arg] @@ -107,23 +107,23 @@ def run_test(mod,test,arg,input_data,arg1,exc_name): try: if not cfg.super_silent: - arg_disp = repr(arg_copy[0] if isinstance(arg_copy,tuple) else arg_copy) - if cfg.test_suite_deterministic and isinstance(arg_copy,dict): - arg_disp = re.sub(r'object at 0x[0-9a-f]+','object at [SCRUBBED]',arg_disp) + arg_disp = repr(arg_copy[0] if isinstance(arg_copy, tuple) else arg_copy) + if cfg.test_suite_deterministic and isinstance(arg_copy, dict): + arg_disp = re.sub(r'object at 0x[0-9a-f]+', 'object at [SCRUBBED]', arg_disp) msg_r((green if input_data=='good' else orange)(f'{arg_disp+":":<22}')) - cls = getattr(mod,test) + cls = getattr(mod, test) if cfg.getobj: - ret = get_obj(getattr(mod,test),**kwargs) + ret = get_obj(getattr(mod, test), **kwargs) else: - ret = cls(*args,**kwargs) + ret = cls(*args, **kwargs) - bad_ret = [] if issubclass(cls,list) else None + bad_ret = [] if issubclass(cls, list) else None - if isinstance(ret_chk,str): + if isinstance(ret_chk, str): ret_chk = ret_chk.encode() - if isinstance(ret,str): + if isinstance(ret, str): ret = ret.encode() if cfg.getobj: @@ -154,12 +154,12 @@ def run_test(mod,test,arg,input_data,arg1,exc_name): ret_disp = ret msg(f'==> {ret_disp!r}') - if cfg.verbose and issubclass(cls,MMGenObject): - ret.pmsg() if hasattr(ret,'pmsg') else pmsg(ret) + if cfg.verbose and issubclass(cls, MMGenObject): + ret.pmsg() if hasattr(ret, 'pmsg') else pmsg(ret) except UserWarning as e: msg(f'==> {ret!r}') - die(2,red(str(e))) + die(2, red(str(e))) except Exception as e: if input_data == 'good': raise ValueError(f'Error on good input data: {e}') from e @@ -182,7 +182,7 @@ def do_loop(): import importlib modname = f'test.objtest_py_d.ot_{proto.coin.lower()}_{proto.network}' mod = importlib.import_module(modname) - test_data = getattr(mod,'tests') + test_data = getattr(mod, 'tests') gmsg(f'Running data object tests for {proto.coin} {proto.network}') clr = None @@ -191,8 +191,8 @@ def do_loop(): arg1 = test_data[test].get('arg1') if utests and test not in utests: continue - nl = ('\n','')[bool(cfg.super_silent) or clr is None] - clr = (blue,nocolor)[bool(cfg.super_silent)] + nl = ('\n', '')[bool(cfg.super_silent) or clr is None] + clr = (blue, nocolor)[bool(cfg.super_silent)] if cfg.getobj and arg1 is None: msg(gray(f'{nl}Skipping {test}')) @@ -200,7 +200,7 @@ def do_loop(): msg(clr(f'{nl}Testing {test}')) - for k in ('bad','good'): + for k in ('bad', 'good'): if not cfg.super_silent: msg(purple(capfirst(k)+' input:')) for arg in test_data[test][k]: @@ -210,8 +210,7 @@ def do_loop(): arg, input_data = k, arg1 = arg1, - exc_name = test_data[test].get('exc_name') or ('ObjectInitError','None')[k=='good'], - ) + exc_name = test_data[test].get('exc_name') or ('ObjectInitError', 'None')[k=='good']) proto = cfg._proto diff --git a/test/scrambletest.py b/test/scrambletest.py index 44d1c0c8..78f8f917 100755 --- a/test/scrambletest.py +++ b/test/scrambletest.py @@ -49,7 +49,7 @@ opts_data = { -v, --verbose Produce more verbose output """, 'notes': """ -Valid commands: 'coin','pw' +Valid commands: 'coin', 'pw' If no command is given, the whole suite of tests is run. """ } @@ -109,7 +109,7 @@ def make_cmd(progname, opts, add_opts, args): def run_cmd(cmd): cp = run(cmd, stdout=PIPE, stderr=PIPE, text=True, env=run_env) if cp.returncode != 0: - die(2,f'\nSpawned program exited with error code {cp.returncode}:\n{cp.stderr}') + die(2, f'\nSpawned program exited with error code {cp.returncode}:\n{cp.stderr}') return cp.stdout.splitlines() def run_test(progname, opts, add_opts, args, test_data, addr_desc, opts_w): @@ -121,15 +121,15 @@ def run_test(progname, opts, add_opts, args, test_data, addr_desc, opts_w): lines = run_cmd(cmd) cmd_out = dict([e[9:].split(': ') for e in lines if e.startswith('sc_debug_')]) - cmd_out['addr'] = lines[-2].split(None,1)[-1] + cmd_out['addr'] = lines[-2].split(None, 1)[-1] ref_data = test_data._asdict() for k in ref_data: if cmd_out[k] == ref_data[k]: - s = k.replace('seed','seed[:8]').replace('addr',addr_desc) + s = k.replace('seed', 'seed[:8]').replace('addr', addr_desc) cfg._util.vmsg(f' {s:9}: {cmd_out[k]}') else: - die(4,f'\nError: sc_{k} value {cmd_out[k]} does not match reference value {ref_data[k]}') + die(4, f'\nError: sc_{k} value {cmd_out[k]} does not match reference value {ref_data[k]}') msg(green('OK') if cfg.verbose else 'OK') def make_coin_test_data(): @@ -143,7 +143,7 @@ def make_coin_test_data(): opts = list_gen( [f'--coin={coin}'], [f'--type={mmtype}', mmtype], - [ '--cashaddr=0', coin == 'bch'] + ['--cashaddr=0', coin == 'bch'] ) yield ('mmgen-addrgen', opts, [], [], test_data, 'address') diff --git a/test/tooltest.py b/test/tooltest.py index 87ec8f25..50d7941b 100755 --- a/test/tooltest.py +++ b/test/tooltest.py @@ -20,8 +20,8 @@ test/tooltest.py: Tests for the 'mmgen-tool' utility """ -import sys,os,time -from subprocess import run,PIPE +import sys, os, time +from subprocess import run, PIPE try: from include.test_init import repo_root @@ -29,8 +29,8 @@ except ImportError: from test.include.test_init import repo_root from mmgen.cfg import Config -from mmgen.color import red,yellow,green,blue,cyan -from mmgen.util import msg,msg_r,Msg,die +from mmgen.color import red, yellow, green, blue, cyan +from mmgen.util import msg, msg_r, Msg, die opts_data = { 'text': { @@ -78,28 +78,28 @@ vmsg = cfg._util.vmsg proto = cfg._proto -assert cfg.type in (None,'zcash_z'), 'Only zcash-z permitted for --type argument' +assert cfg.type in (None, 'zcash_z'), 'Only zcash-z permitted for --type argument' cmd_data = { 'cryptocoin': { 'desc': 'Cryptocoin address/key commands', 'cmd_data': { 'randwif': (), - 'randpair': (), # create 4 pairs: uncomp,comp,segwit,bech32 - 'wif2addr': ('randpair','o4'), - 'wif2hex': ('randpair','o4'), - 'privhex2pubhex': ('wif2hex','o3'), # segwit only - 'pubhex2addr': ('privhex2pubhex','o3'), # segwit only - 'hex2wif': ('wif2hex','io2'), # uncomp, comp - 'addr2pubhash': ('randpair','o4'), # uncomp, comp, bech32 - 'pubhash2addr': ('addr2pubhash','io4'), # uncomp, comp, bech32 + 'randpair': (), # create 4 pairs: uncomp, comp, segwit, bech32 + 'wif2addr': ('randpair', 'o4'), + 'wif2hex': ('randpair', 'o4'), + 'privhex2pubhex': ('wif2hex', 'o3'), # segwit only + 'pubhex2addr': ('privhex2pubhex', 'o3'), # segwit only + 'hex2wif': ('wif2hex', 'io2'), # uncomp, comp + 'addr2pubhash': ('randpair', 'o4'), # uncomp, comp, bech32 + 'pubhash2addr': ('addr2pubhash', 'io4'), # uncomp, comp, bech32 }, }, 'mnemonic': { 'desc': 'mnemonic commands', 'cmd_data': { 'hex2mn': (), - 'mn2hex': ('hex2mn','io3'), + 'mn2hex': ('hex2mn', 'io3'), 'mn_rand128': (), 'mn_rand192': (), 'mn_rand256': (), @@ -109,13 +109,13 @@ cmd_data = { }, } -if proto.coin in ('BTC','LTC'): +if proto.coin in ('BTC', 'LTC'): cmd_data['cryptocoin']['cmd_data'].update({ - 'pubhex2redeem_script': ('privhex2pubhex','o3'), - 'wif2redeem_script': ('randpair','o3'), - 'wif2segwit_pair': ('randpair','o2'), - 'privhex2addr': ('wif2hex','o4'), # compare with output of randpair - 'pipetest': ('randpair','o3') + 'pubhex2redeem_script': ('privhex2pubhex', 'o3'), + 'wif2redeem_script': ('randpair', 'o3'), + 'wif2segwit_pair': ('randpair', 'o2'), + 'privhex2addr': ('wif2hex', 'o4'), # compare with output of randpair + 'pipetest': ('randpair', 'o3') }) if proto.coin == 'XMR' or cfg.type == 'zcash_z': @@ -146,47 +146,48 @@ tcfg = { ref_subdir = '' if proto.base_coin == 'BTC' else proto.name.lower() altcoin_pfx = '' if proto.base_coin == 'BTC' else '-'+proto.base_coin -tn_ext = ('','.testnet')[proto.testnet] +tn_ext = ('', '.testnet')[proto.testnet] spawn_cmd = [ 'scripts/exec_wrapper.py', - os.path.relpath(os.path.join(repo_root,'cmds','mmgen-tool')) ] + os.path.relpath(os.path.join(repo_root, 'cmds', 'mmgen-tool'))] if cfg.coverage: - d,f = init_coverage() - spawn_cmd = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f] + spawn_cmd + d, f = init_coverage() + spawn_cmd = ['python3', '-m', 'trace', '--count', '--coverdir='+d, '--file='+f] + spawn_cmd elif sys.platform == 'win32': spawn_cmd = ['python3'] + spawn_cmd add_spawn_args = ['--data-dir='+tcfg['tmpdir']] + ['--{}{}'.format( - k.replace('_','-'), - '='+getattr(cfg,k) if getattr(cfg,k) is not True else '') - for k in ('testnet','rpc_host','regtest','coin','type') if getattr(cfg,k)] + k.replace('_', '-'), + '='+getattr(cfg, k) if getattr(cfg, k) is not True else '') + for k in ('testnet', 'rpc_host', 'regtest', 'coin', 'type') if getattr(cfg, k)] if cfg.list_cmds: fs = ' {:<{w}} - {}' Msg('Available commands:') - w = max(map(len,cmd_data)) + w = max(map(len, cmd_data)) for cmd in cmd_data: - Msg(fs.format(cmd,cmd_data[cmd]['desc'],w=w)) + Msg(fs.format(cmd, cmd_data[cmd]['desc'], w=w)) Msg('\nAvailable utilities:') - Msg(fs.format('clean','Clean the tmp directory',w=w)) + Msg(fs.format('clean', 'Clean the tmp directory', w=w)) sys.exit(0) + if cfg.testing_status: tested_in = { 'tooltest.py': [], 'cmdtest.py': ( - 'encrypt','decrypt','find_incog_data', - 'addrfile_chksum','keyaddrfile_chksum','passwdfile_chksum', - 'add_label','remove_label','remove_address','twview', - 'getbalance','listaddresses','listaddress', - 'daemon_version','decrypt_keystore','decrypt_geth_keystore', - 'mn2hex_interactive','rand2file', - 'rescan_address','rescan_blockchain','resolve_address', - 'twexport','twimport','txhist' + 'encrypt', 'decrypt', 'find_incog_data', + 'addrfile_chksum', 'keyaddrfile_chksum', 'passwdfile_chksum', + 'add_label', 'remove_label', 'remove_address', 'twview', + 'getbalance', 'listaddresses', 'listaddress', + 'daemon_version', 'decrypt_keystore', 'decrypt_geth_keystore', + 'mn2hex_interactive', 'rand2file', + 'rescan_address', 'rescan_blockchain', 'resolve_address', + 'twexport', 'twimport', 'txhist' ), 'tooltest2.py': run( - ['python3','test/tooltest2.py','--list-tested-cmds'], + ['python3', 'test/tooltest2.py', '--list-tested-cmds'], stdout = PIPE, check = True ).stdout.decode().split() @@ -195,7 +196,7 @@ if cfg.testing_status: tested_in['tooltest.py'] += list(v['cmd_data'].keys()) Msg(green("Testing status of 'mmgen-tool' commands:")) - for l in ('tooltest.py','tooltest2.py','cmdtest.py'): + for l in ('tooltest.py', 'tooltest2.py', 'cmdtest.py'): Msg('\n ' + blue(l+':')) Msg(' '+'\n '.join(sorted(tested_in[l]))) @@ -209,34 +210,46 @@ if cfg.testing_status: set(tested_in['cmdtest.py']) ) if uc: - Msg(yellow('\n {}\n {}'.format('Untested commands:','\n '.join(uc)))) + Msg(yellow('\n {}\n {}'.format('Untested commands:', '\n '.join(uc)))) sys.exit(0) from mmgen.key import is_wif from mmgen.addr import is_coin_addr + def is_wif_loc(s): - return is_wif(proto,s) + return is_wif(proto, s) + def is_coin_addr_loc(s): - return is_coin_addr(proto,s) + return is_coin_addr(proto, s) msg_w = 35 + def test_msg(m): msg_r(green(f'Testing {m}\n') if cfg.verbose else '{:{w}}'.format(f'Testing {m}', w=msg_w+8)) -compressed = cfg.type or ('','compressed')['C' in proto.mmtypes] -segwit = ('','segwit')['S' in proto.mmtypes] -bech32 = ('','bech32')['B' in proto.mmtypes] -type_compressed_arg = ([],['--type=' + (cfg.type or 'compressed')])[bool(cfg.type) or 'C' in proto.mmtypes] -type_segwit_arg = ([],['--type=segwit'])['S' in proto.mmtypes] -type_bech32_arg = ([],['--type=bech32'])['B' in proto.mmtypes] +compressed = cfg.type or ('', 'compressed')['C' in proto.mmtypes] +segwit = ('', 'segwit')['S' in proto.mmtypes] +bech32 = ('', 'bech32')['B' in proto.mmtypes] +type_compressed_arg = ([], ['--type=' + (cfg.type or 'compressed')])[bool(cfg.type) or 'C' in proto.mmtypes] +type_segwit_arg = ([], ['--type=segwit'])['S' in proto.mmtypes] +type_bech32_arg = ([], ['--type=bech32'])['B' in proto.mmtypes] class MMGenToolTestUtils: - def run_cmd(self,name,tool_args,kwargs='',extra_msg='',silent=False,strip=True,add_opts=[],binary=False): + def run_cmd( + self, + name, + tool_args, + kwargs = '', + extra_msg = '', + silent = False, + strip = True, + add_opts = [], + binary = False): sys_cmd = ( spawn_cmd + add_spawn_args + - ['-r0','-d',tcfg['tmpdir']] + + ['-r0', '-d', tcfg['tmpdir']] + add_opts + [name.lower()] + tool_args + @@ -250,9 +263,9 @@ class MMGenToolTestUtils: sys.stderr.write(green(f'Testing {full_name}\nExecuting ')) sys.stderr.write(cyan(' '.join(sys_cmd)+'\n')) else: - msg_r('Testing {:{w}}'.format( full_name+':', w=msg_w )) + msg_r('Testing {:{w}}'.format(full_name+':', w=msg_w)) - cp = run(sys_cmd,stdout=PIPE,stderr=PIPE) + cp = run(sys_cmd, stdout=PIPE, stderr=PIPE) out = cp.stdout err = cp.stderr if cfg.debug: @@ -267,65 +280,79 @@ class MMGenToolTestUtils: msg('{}\n{}\n{}'.format( red('FAILED'), yellow('Command stderr output:'), - err.decode() )) - die(2,f'Called process returned with an error (retcode {cp.returncode})') - return (out,out.rstrip())[bool(strip)] + err.decode())) + die(2, f'Called process returned with an error (retcode {cp.returncode})') + return (out, out.rstrip())[bool(strip)] - def run_cmd_chk(self,name,f1,f2,kwargs='',extra_msg='',strip_hex=False,add_opts=[]): + def run_cmd_chk(self, name, f1, f2, kwargs='', extra_msg='', strip_hex=False, add_opts=[]): idata = read_from_file(f1).rstrip() odata = read_from_file(f2).rstrip() - ret = self.run_cmd(name,[odata],kwargs=kwargs,extra_msg=extra_msg,add_opts=add_opts) + ret = self.run_cmd(name, [odata], kwargs=kwargs, extra_msg=extra_msg, add_opts=add_opts) vmsg('In: ' + repr(odata)) vmsg('Out: ' + repr(ret)) - def cmp_equal(a,b): + def cmp_equal(a, b): return (a.lstrip('0') == b.lstrip('0')) if strip_hex else (a == b) - if cmp_equal(ret,idata): + if cmp_equal(ret, idata): ok() else: die(4, f"Error: values don't match:\nIn: {idata!r}\nOut: {ret!r}") return ret - def run_cmd_nochk(self,name,f1,kwargs='',add_opts=[]): + def run_cmd_nochk(self, name, f1, kwargs='', add_opts=[]): odata = read_from_file(f1).rstrip() - ret = self.run_cmd(name,[odata],kwargs=kwargs,add_opts=add_opts) + ret = self.run_cmd(name, [odata], kwargs=kwargs, add_opts=add_opts) vmsg('In: ' + repr(odata)) vmsg('Out: ' + repr(ret)) return ret - def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='', - literal=False,chkdata='',hush=False,add_opts=[]): + def run_cmd_out( + self, + name, + carg = None, + Return = False, + kwargs = '', + fn_idx = '', + extra_msg = '', + literal = False, + chkdata = '', + hush = False, + add_opts = []): if carg: - write_to_tmpfile(tcfg,f'{name}{fn_idx}.in',carg+'\n') - ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs, - extra_msg=extra_msg,add_opts=add_opts) + write_to_tmpfile(tcfg, f'{name}{fn_idx}.in', carg+'\n') + ret = self.run_cmd( + name, + ([], [carg])[bool(carg)], + kwargs = kwargs, + extra_msg = extra_msg, + add_opts = add_opts) if carg: vmsg('In: ' + repr(carg)) - vmsg('Out: ' + (repr(ret),ret)[literal]) + vmsg('Out: ' + (repr(ret), ret)[literal]) if ret or ret == '': - write_to_tmpfile(tcfg,f'{name}{fn_idx}.out',ret+'\n') + write_to_tmpfile(tcfg, f'{name}{fn_idx}.out', ret+'\n') if chkdata: - cmp_or_die(ret,chkdata) + cmp_or_die(ret, chkdata) return if Return: return ret elif not hush: ok() else: - die(4,f'Error for command {name!r}') + die(4, f'Error for command {name!r}') - def run_cmd_randinput(self,name,strip=True,add_opts=[]): + def run_cmd_randinput(self, name, strip=True, add_opts=[]): s = getrand(128) fn = name+'.in' - write_to_tmpfile(tcfg,fn,s,binary=True) - ret = self.run_cmd(name,[get_tmpfile(tcfg,fn)],strip=strip,add_opts=add_opts) + write_to_tmpfile(tcfg, fn, s, binary=True) + ret = self.run_cmd(name, [get_tmpfile(tcfg, fn)], strip=strip, add_opts=add_opts) fn = name+'.out' - write_to_tmpfile(tcfg,fn,ret+'\n') + write_to_tmpfile(tcfg, fn, ret+'\n') ok() vmsg(f'Returned: {ret}') tu = MMGenToolTestUtils() -def ok_or_die(val,chk_func,s,skip_ok=False): +def ok_or_die(val, chk_func, s, skip_ok=False): try: ret = chk_func(val) except: @@ -334,131 +361,131 @@ def ok_or_die(val,chk_func,s,skip_ok=False): if not skip_ok: ok() else: - die(4,f'Returned value {val!r} is not a {s}') + die(4, f'Returned value {val!r} is not a {s}') class MMGenToolTestCmds: # Cryptocoin - def randwif(self,name): - for n,k in enumerate(['',compressed]): + def randwif(self, name): + for n, k in enumerate(['', compressed]): ao = ['--type='+k] if k else [] - ret = tu.run_cmd_out(name,add_opts=ao,Return=True,fn_idx=n+1) - ok_or_die(ret,is_wif_loc,'WIF key') - def randpair(self,name): - for n,k in enumerate(['',compressed,segwit,bech32]): + ret = tu.run_cmd_out(name, add_opts=ao, Return=True, fn_idx=n+1) + ok_or_die(ret, is_wif_loc, 'WIF key') + def randpair(self, name): + for n, k in enumerate(['', compressed, segwit, bech32]): ao = ['--type='+k] if k else [] - wif,addr = tu.run_cmd_out(name,add_opts=ao,Return=True,fn_idx=n+1,literal=True).split() - ok_or_die(wif,is_wif_loc,'WIF key',skip_ok=True) - ok_or_die(addr,is_coin_addr_loc,'Coin address') - def wif2addr(self,name,f1,f2,f3,f4): - for n,f,k in ( - (1,f1,''), - (2,f2,compressed), - (3,f3,segwit), - (4,f4,bech32) - ): + wif, addr = tu.run_cmd_out(name, add_opts=ao, Return=True, fn_idx=n+1, literal=True).split() + ok_or_die(wif, is_wif_loc, 'WIF key', skip_ok=True) + ok_or_die(addr, is_coin_addr_loc, 'Coin address') + def wif2addr(self, name, f1, f2, f3, f4): + for n, f, k in ( + (1, f1, ''), + (2, f2, compressed), + (3, f3, segwit), + (4, f4, bech32)): ao = ['--type='+k] if k else [] wif = read_from_file(f).split()[0] - tu.run_cmd_out(name,wif,add_opts=ao,fn_idx=n) - def wif2hex(self,name,f1,f2,f3,f4): - for n,f,m in ( - (1,f1,''), - (2,f2,compressed), - (3,f3,'{} for {}'.format( compressed or 'uncompressed', segwit or 'p2pkh' )), - (4,f4,'{} for {}'.format( compressed or 'uncompressed', bech32 or 'p2pkh' )) - ): + tu.run_cmd_out(name, wif, add_opts=ao, fn_idx=n) + def wif2hex(self, name, f1, f2, f3, f4): + for n, f, m in ( + (1, f1, ''), + (2, f2, compressed), + (3, f3, '{} for {}'.format(compressed or 'uncompressed', segwit or 'p2pkh')), + (4, f4, '{} for {}'.format(compressed or 'uncompressed', bech32 or 'p2pkh'))): wif = read_from_file(f).split()[0] - tu.run_cmd_out(name,wif,fn_idx=n,extra_msg=m) - def privhex2addr(self,name,f1,f2,f3,f4): - keys = [read_from_file(f).rstrip() for f in (f1,f2,f3,f4)] - for n,k in enumerate(('',compressed,segwit,bech32)): + tu.run_cmd_out(name, wif, fn_idx=n, extra_msg=m) + def privhex2addr(self, name, f1, f2, f3, f4): + keys = [read_from_file(f).rstrip() for f in (f1, f2, f3, f4)] + for n, k in enumerate(('', compressed, segwit, bech32)): ao = ['--type='+k] if k else [] - ret = tu.run_cmd(name,[keys[n]],add_opts=ao).rstrip() - iaddr = read_from_tmpfile(tcfg,f'randpair{n+1}.out').split()[-1] + ret = tu.run_cmd(name, [keys[n]], add_opts=ao).rstrip() + iaddr = read_from_tmpfile(tcfg, f'randpair{n+1}.out').split()[-1] vmsg(f'Out: {ret}') - cmp_or_die(iaddr,ret) + cmp_or_die(iaddr, ret) ok() - def hex2wif(self,name,f1,f2,f3,f4): - for fi,fo,k in ( - (f1,f2,''), - (f3,f4,compressed)): + def hex2wif(self, name, f1, f2, f3, f4): + for fi, fo, k in ( + (f1, f2, ''), + (f3, f4, compressed)): ao = ['--type='+k] if k else [] - tu.run_cmd_chk(name,fi,fo,add_opts=ao) - def addr2pubhash(self,name,f1,f2,f3,f4): - for n,f,m,ao in ( - (1,f1,'',[]), - (2,f2,'from {}'.format( compressed or 'uncompressed' ),[]), - (4,f4,'',type_bech32_arg), - ): + tu.run_cmd_chk(name, fi, fo, add_opts=ao) + def addr2pubhash(self, name, f1, f2, f3, f4): + for n, f, m, ao in ( + (1, f1, '', []), + (2, f2, 'from {}'.format(compressed or 'uncompressed'), []), + (4, f4, '', type_bech32_arg)): addr = read_from_file(f).split()[-1] - tu.run_cmd_out(name,addr,fn_idx=n,add_opts=ao,extra_msg=m) - def pubhash2addr(self,name,f1,f2,f3,f4,f5,f6,f7,f8): - for _,fi,fo,m,ao in ( - (1,f1,f2,'',[]), - (2,f3,f4,'from {}'.format( compressed or 'uncompressed' ),[]), - (4,f7,f8,'',type_bech32_arg) - ): - tu.run_cmd_chk(name,fi,fo,add_opts=ao,extra_msg=m) - def privhex2pubhex(self,name,f1,f2,f3): # from Hex2wif + tu.run_cmd_out(name, addr, fn_idx=n, add_opts=ao, extra_msg=m) + def pubhash2addr(self, name, f1, f2, f3, f4, f5, f6, f7, f8): + for _, fi, fo, m, ao in ( + (1, f1, f2, '', []), + (2, f3, f4, 'from {}'.format(compressed or 'uncompressed'), []), + (4, f7, f8, '', type_bech32_arg)): + tu.run_cmd_chk(name, fi, fo, add_opts=ao, extra_msg=m) + def privhex2pubhex(self, name, f1, f2, f3): # from Hex2wif addr = read_from_file(f3).strip() - tu.run_cmd_out(name,addr,add_opts=type_compressed_arg,fn_idx=3) # what about uncompressed? - def pubhex2redeem_script(self,name,f1,f2,f3): # from above + tu.run_cmd_out(name, addr, add_opts=type_compressed_arg, fn_idx=3) # what about uncompressed? + def pubhex2redeem_script(self, name, f1, f2, f3): # from above addr = read_from_file(f3).strip() - tu.run_cmd_out(name,addr,add_opts=type_segwit_arg,fn_idx=3) - rs = read_from_tmpfile(tcfg,'privhex2pubhex3.out').strip() - tu.run_cmd_out('pubhex2addr',rs,add_opts=type_segwit_arg,fn_idx=3,hush=True) - addr1 = read_from_tmpfile(tcfg,'pubhex2addr3.out').strip() - addr2 = read_from_tmpfile(tcfg,'randpair3.out').split()[1] - cmp_or_die(addr1,addr2) + tu.run_cmd_out(name, addr, add_opts=type_segwit_arg, fn_idx=3) + rs = read_from_tmpfile(tcfg, 'privhex2pubhex3.out').strip() + tu.run_cmd_out('pubhex2addr', rs, add_opts=type_segwit_arg, fn_idx=3, hush=True) + addr1 = read_from_tmpfile(tcfg, 'pubhex2addr3.out').strip() + addr2 = read_from_tmpfile(tcfg, 'randpair3.out').split()[1] + cmp_or_die(addr1, addr2) ok() - def wif2redeem_script(self,name,f1,f2,f3): # compare output with above + def wif2redeem_script(self, name, f1, f2, f3): # compare output with above wif = read_from_file(f3).split()[0] - ret1 = tu.run_cmd_out(name,wif,add_opts=type_segwit_arg,fn_idx=3,Return=True) - ret2 = read_from_tmpfile(tcfg,'pubhex2redeem_script3.out').strip() - cmp_or_die(ret1,ret2) + ret1 = tu.run_cmd_out(name, wif, add_opts=type_segwit_arg, fn_idx=3, Return=True) + ret2 = read_from_tmpfile(tcfg, 'pubhex2redeem_script3.out').strip() + cmp_or_die(ret1, ret2) ok() - def wif2segwit_pair(self,name,f1,f2): # does its own checking, so just run + def wif2segwit_pair(self, name, f1, f2): # does its own checking, so just run wif = read_from_file(f2).split()[0] - tu.run_cmd_out(name,wif,add_opts=type_segwit_arg,fn_idx=2) - def pubhex2addr(self,name,f1,f2,f3): + tu.run_cmd_out(name, wif, add_opts=type_segwit_arg, fn_idx=2) + def pubhex2addr(self, name, f1, f2, f3): addr = read_from_file(f3).strip() - tu.run_cmd_out(name,addr,add_opts=type_segwit_arg,fn_idx=3) + tu.run_cmd_out(name, addr, add_opts=type_segwit_arg, fn_idx=3) - def pipetest(self,name,f1,f2,f3): + def pipetest(self, name, f1, f2, f3): wif = read_from_file(f3).split()[0] - cmd = ( '{c} {a} wif2hex {wif}' + - ' | {c} {a} --type=compressed privhex2pubhex -' + - ' | {c} {a} --type=segwit pubhex2redeem_script -' + - ' | {c} {a} --type=segwit redeem_script2addr -').format( - c=' '.join(spawn_cmd), - a=' '.join(add_spawn_args), - wif=wif) + cmd = ( + '{c} {a} wif2hex {wif}' + + ' | {c} {a} --type=compressed privhex2pubhex -' + + ' | {c} {a} --type=segwit pubhex2redeem_script -' + + ' | {c} {a} --type=segwit redeem_script2addr -').format( + c = ' '.join(spawn_cmd), + a = ' '.join(add_spawn_args), + wif = wif) test_msg('command piping') if cfg.verbose: sys.stderr.write(green('Executing ') + cyan(cmd) + '\n') - res = run(cmd,stdout=PIPE,shell=True).stdout.decode().strip() - addr = read_from_tmpfile(tcfg,'wif2addr3.out').strip() - cmp_or_die(addr,res) + res = run(cmd, stdout=PIPE, shell=True).stdout.decode().strip() + addr = read_from_tmpfile(tcfg, 'wif2addr3.out').strip() + cmp_or_die(addr, res) ok() # Mnemonic - def hex2mn(self,name): - for n,size,m in ((1,16,'128-bit'),(2,24,'192-bit'),(3,32,'256-bit')): + def hex2mn(self, name): + for n, size, m in ( + (1, 16, '128-bit'), + (2, 24, '192-bit'), + (3, 32, '256-bit')): hexnum = getrandhex(size) - tu.run_cmd_out(name,hexnum,fn_idx=n,extra_msg=m) - def mn2hex(self,name,f1,f2,f3,f4,f5,f6): - for f_i,f_o,m in ((f1,f2,'128-bit'),(f3,f4,'192-bit'),(f5,f6,'256-bit')): - tu.run_cmd_chk(name,f_i,f_o,extra_msg=m,strip_hex=True) - def mn_rand128(self,name): + tu.run_cmd_out(name, hexnum, fn_idx=n, extra_msg=m) + def mn2hex(self, name, f1, f2, f3, f4, f5, f6): + for f_i, f_o, m in ((f1, f2, '128-bit'), (f3, f4, '192-bit'), (f5, f6, '256-bit')): + tu.run_cmd_chk(name, f_i, f_o, extra_msg=m, strip_hex=True) + def mn_rand128(self, name): tu.run_cmd_out(name) - def mn_rand192(self,name): + def mn_rand192(self, name): tu.run_cmd_out(name) - def mn_rand256(self,name): + def mn_rand256(self, name): tu.run_cmd_out(name) - def mn_stats(self,name): + def mn_stats(self, name): tu.run_cmd_out(name) - def mn_printlist(self,name): - tu.run_cmd(name,[]) + def mn_printlist(self, name): + tu.run_cmd(name, []) ok() # main() @@ -468,8 +495,8 @@ mk_tmpdir(tcfg['tmpdir']) def gen_deps_for_cmd(cdata): fns = [] if cdata: - name,code = cdata - io,count = (code[:-1],int(code[-1])) if code[-1] in '0123456789' else (code,1) + name, code = cdata + io, count = (code[:-1], int(code[-1])) if code[-1] in '0123456789' else (code, 1) for c in range(count): fns += ['{}{}{}'.format( name, @@ -483,27 +510,27 @@ def do_cmds(cmd_group): gdata = cmd_data[cmd_group]['cmd_data'] for cmd in gdata: fns = gen_deps_for_cmd(gdata[cmd]) - cmdline = [cmd] + [os.path.join(tcfg['tmpdir'],fn) for fn in fns] - getattr(tc,cmd)(*cmdline) + cmdline = [cmd] + [os.path.join(tcfg['tmpdir'], fn) for fn in fns] + getattr(tc, cmd)(*cmdline) def main(): if cfg._args: if len(cfg._args) != 1: - die(1,'Only one command may be specified') + die(1, 'Only one command may be specified') cmd = cfg._args[0] if cmd in cmd_data: - cleandir(tcfg['tmpdir'],do_msg=True) - msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] )) + cleandir(tcfg['tmpdir'], do_msg=True) + msg('Running tests for {}:'.format(cmd_data[cmd]['desc'])) do_cmds(cmd) elif cmd == 'clean': - cleandir(tcfg['tmpdir'],do_msg=True) + cleandir(tcfg['tmpdir'], do_msg=True) sys.exit(0) else: - die(1,f'{cmd!r}: unrecognized command') + die(1, f'{cmd!r}: unrecognized command') else: - cleandir(tcfg['tmpdir'],do_msg=True) + cleandir(tcfg['tmpdir'], do_msg=True) for cmd in cmd_data: - msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] )) + msg('Running tests for {}:'.format(cmd_data[cmd]['desc'])) do_cmds(cmd) if cmd is not list(cmd_data.keys())[-1]: msg('') diff --git a/test/tooltest2.py b/test/tooltest2.py index 35b4e376..b3af9732 100755 --- a/test/tooltest2.py +++ b/test/tooltest2.py @@ -24,22 +24,22 @@ test/tooltest2.py: Test the 'mmgen-tool' utility # TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?) import sys, os, time, importlib, asyncio -from subprocess import run,PIPE +from subprocess import run, PIPE try: from include import test_init except ImportError: from test.include import test_init -from test.include.common import set_globals,end_msg,init_coverage +from test.include.common import set_globals, end_msg, init_coverage from mmgen import main_tool from mmgen.cfg import Config -from mmgen.color import green,blue,purple,cyan,gray +from mmgen.color import green, blue, purple, cyan, gray from mmgen.util import msg, msg_r, Msg, die skipped_tests = ['mn2hex_interactive'] -coin_dependent_groups = ('Coin','File') +coin_dependent_groups = ('Coin', 'File') opts_data = { 'text': { @@ -81,7 +81,7 @@ set_globals(cfg) from test.tooltest2_d.data import * -def fork_cmd(cmd_name,args,opts,stdin_input): +def fork_cmd(cmd_name, args, opts, stdin_input): cmd = ( tool_cmd_preargs + tool_cmd + @@ -90,8 +90,8 @@ def fork_cmd(cmd_name,args,opts,stdin_input): ) vmsg('{} {}'.format( green('Executing'), - cyan(' '.join(cmd)) )) - cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE) + cyan(' '.join(cmd)))) + cp = run(cmd, input=stdin_input or None, stdout=PIPE, stderr=PIPE) try: cmd_out = cp.stdout.decode() except: @@ -100,11 +100,11 @@ def fork_cmd(cmd_name,args,opts,stdin_input): vmsg(cp.stderr.strip().decode()) if cp.returncode != 0: import re - m = re.search(b'tool command returned (None|False)',cp.stderr) + m = re.search(b'tool command returned (None|False)', cp.stderr) if m: return eval(m.group(1)) else: - die(2,f'Spawned program exited with error: {cp.stderr}') + die(2, f'Spawned program exited with error: {cp.stderr}') return cmd_out.strip() @@ -112,50 +112,50 @@ def call_method(cls, method, cmd_name, args, mmtype, stdin_input): vmsg('{a}: {b}{c}'.format( a = purple('Running'), b = ' '.join([cmd_name]+[repr(e) for e in args]), - c = ' '+mmtype if mmtype else '' )) - aargs,kwargs = main_tool.process_args(cmd_name,args,cls) + c = ' '+mmtype if mmtype else '')) + aargs, kwargs = main_tool.process_args(cmd_name, args, cls) oq_save = bool(cfg.quiet) if not cfg.verbose: cfg._set_quiet(True) if stdin_input: - fd0,fd1 = os.pipe() + fd0, fd1 = os.pipe() if os.fork(): # parent os.close(fd1) stdin_save = os.dup(0) - os.dup2(fd0,0) - cmd_out = method(*aargs,**kwargs) - os.dup2(stdin_save,0) + os.dup2(fd0, 0) + cmd_out = method(*aargs, **kwargs) + os.dup2(stdin_save, 0) os.wait() cfg._set_quiet(oq_save) return cmd_out else: # child os.close(fd0) - os.write(fd1,stdin_input) + os.write(fd1, stdin_input) vmsg(f'Input: {stdin_input!r}') sys.exit(0) else: - ret = method(*aargs,**kwargs) + ret = method(*aargs, **kwargs) if type(ret).__name__ == 'coroutine': ret = asyncio.run(ret) cfg._set_quiet(oq_save) return ret -def tool_api(cls,cmd_name,args,opts): +def tool_api(cls, cmd_name, args, opts): from mmgen.tool.api import tool_api tool = tool_api(cfg) if opts: for o in opts: if o.startswith('--type='): tool.addrtype = o.split('=')[1] - pargs,kwargs = main_tool.process_args(cmd_name,args,cls) - return getattr(tool,cmd_name)(*pargs,**kwargs) + pargs, kwargs = main_tool.process_args(cmd_name, args, cls) + return getattr(tool, cmd_name)(*pargs, **kwargs) -def check_output(out,chk): - if isinstance(chk,str): +def check_output(out, chk): + if isinstance(chk, str): chk = chk.encode() - if isinstance(out,int): + if isinstance(out, int): out = str(out).encode() - if isinstance(out,str): + if isinstance(out, str): out = out.encode() err_fs = "Output ({!r}) doesn't match expected output ({!r})" try: @@ -165,18 +165,18 @@ def check_output(out,chk): if type(chk).__name__ == 'function': assert chk(outd), f'{chk.__name__}({outd}) failed!' - elif isinstance(chk,dict): - for k,v in chk.items(): + elif isinstance(chk, dict): + for k, v in chk.items(): if k == 'boolfunc': assert v(outd), f'{v.__name__}({outd}) failed!' elif k == 'value': - assert outd == v, err_fs.format(outd,v) + assert outd == v, err_fs.format(outd, v) else: - outval = getattr(__builtins__,k)(out) + outval = getattr(__builtins__, k)(out) if outval != v: - die(1,f'{k}({out}) returned {outval}, not {v}!') + die(1, f'{k}({out}) returned {outval}, not {v}!') elif chk is not None: - assert out == chk, err_fs.format(out,chk) + assert out == chk, err_fs.format(out, chk) def run_test(cls, gid, cmd_name): data = tests[gid][cmd_name] @@ -199,14 +199,14 @@ def run_test(cls, gid, cmd_name): m = '{} {}{}'.format( purple('Testing'), - cmd_name if cfg.names else docstring_head(getattr(cls,cmd_name)), - m2 ) + cmd_name if cfg.names else docstring_head(getattr(cls, cmd_name)), + m2) msg_r(green(m)+'\n' if cfg.verbose else m) skipping = False - for n,d in enumerate(data): - args,out,opts,mmtype = d + tuple([None] * (4-len(d))) + for n, d in enumerate(data): + args, out, opts, mmtype = d + tuple([None] * (4-len(d))) if 'fmt=xmrseed' in args and cfg.no_altcoin: if not skipping: qmsg('') @@ -216,21 +216,21 @@ def run_test(cls, gid, cmd_name): continue skipping = False stdin_input = None - if args and isinstance(args[0],bytes): + if args and isinstance(args[0], bytes): stdin_input = args[0] args[0] = '-' if cfg.tool_api: - if args and args[0 ]== '-': + if args and args[0]== '-': continue - cmd_out = tool_api(cls,cmd_name,args,opts) + cmd_out = tool_api(cls, cmd_name, args, opts) elif cfg.fork: - cmd_out = fork_cmd(cmd_name,args,opts,stdin_input) + cmd_out = fork_cmd(cmd_name, args, opts, stdin_input) else: if stdin_input and sys.platform == 'win32': msg(gray('Skipping for MSWin - no os.fork()')) continue - method = getattr(cls(cfg,cmdname=cmd_name,proto=proto,mmtype=mmtype),cmd_name) + method = getattr(cls(cfg, cmdname=cmd_name, proto=proto, mmtype=mmtype), cmd_name) cmd_out = call_method(cls, method, cmd_name, args, mmtype, stdin_input) try: @@ -238,19 +238,19 @@ def run_test(cls, gid, cmd_name): except: vmsg(f'Output:\n{cmd_out!r}\n') - if isinstance(out,tuple) and type(out[0]).__name__ == 'function': + if isinstance(out, tuple) and type(out[0]).__name__ == 'function': func_out = out[0](cmd_out) - assert func_out == out[1],( + assert func_out == out[1], ( '{}({}) == {} failed!\nOutput: {}'.format( out[0].__name__, cmd_out, out[1], - func_out )) - elif isinstance(out,(list,tuple)): - for co,o in zip(cmd_out.split(NL) if cfg.fork else cmd_out,out): - check_output(co,o) + func_out)) + elif isinstance(out, (list, tuple)): + for co, o in zip(cmd_out.split(NL) if cfg.fork else cmd_out, out): + check_output(co, o) else: - check_output(cmd_out,out) + check_output(cmd_out, out) if not cfg.verbose: msg_r('.') @@ -266,7 +266,7 @@ def do_group(gid): cls = main_tool.get_mod_cls(gid.lower()) qmsg(blue('Testing ' + desc if cfg.names else - ( docstring_head(cls) or desc ) + (docstring_head(cls) or desc) )) for cmdname in cls(cfg).user_commands: @@ -275,18 +275,18 @@ def do_group(gid): if cmdname not in tests[gid]: m = f'No test for command {cmdname!r} in group {gid!r}!' if cfg.die_on_missing: - die(1,m+' Aborting') + die(1, m+' Aborting') else: msg(m) continue - run_test(cls,gid,cmdname) + run_test(cls, gid, cmdname) def do_cmd_in_group(cmdname): cls = main_tool.get_cmd_cls(cmdname) - for gid,cmds in tests.items(): + for gid, cmds in tests.items(): for cmd in cmds: if cmd == cmdname: - run_test(cls,gid,cmdname) + run_test(cls, gid, cmdname) return True return False @@ -301,7 +301,7 @@ def main(): do_group(cmd) else: if not do_cmd_in_group(cmd): - die(1,f'{cmd!r}: not a recognized test or test group') + die(1, f'{cmd!r}: not a recognized test or test group') else: for garg in tests: do_group(garg) @@ -317,8 +317,8 @@ if cfg.tool_api: if cfg.list_tests: Msg('Available tests:') - for modname,cmdlist in main_tool.mods.items(): - cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd') + for modname, cmdlist in main_tool.mods.items(): + cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd') Msg(f' {modname:6} - {docstring_head(cls)}') sys.exit(0) @@ -326,21 +326,21 @@ if cfg.list_tested_cmds: list_tested_cmds() sys.exit(0) -tool_exec = os.path.relpath(os.path.join('cmds','mmgen-tool')) +tool_exec = os.path.relpath(os.path.join('cmds', 'mmgen-tool')) if cfg.fork: - passthru_args = ['coin','type','testnet','token'] - tool_cmd = [ tool_exec, '--skip-cfg-file' ] + [ + passthru_args = ['coin', 'type', 'testnet', 'token'] + tool_cmd = [tool_exec, '--skip-cfg-file'] + [ '--{}{}'.format( - k.replace('_','-'), - '='+getattr(cfg,k) if getattr(cfg,k) is not True else '') - for k in passthru_args if getattr(cfg,k) ] + k.replace('_', '-'), + '='+getattr(cfg, k) if getattr(cfg, k) is not True else '') + for k in passthru_args if getattr(cfg, k)] if cfg.coverage: - d,f = init_coverage() - tool_cmd_preargs = ['python3','-m','trace','--count','--coverdir='+d,'--file='+f] + d, f = init_coverage() + tool_cmd_preargs = ['python3', '-m', 'trace', '--count', '--coverdir='+d, '--file='+f] else: - tool_cmd_preargs = ['python3','scripts/exec_wrapper.py'] + tool_cmd_preargs = ['python3', 'scripts/exec_wrapper.py'] from mmgen.main import launch start_time = int(time.time()) diff --git a/test/unit_tests.py b/test/unit_tests.py index 81b36de3..b7b366e1 100755 --- a/test/unit_tests.py +++ b/test/unit_tests.py @@ -20,7 +20,7 @@ test/unit_tests.py: Unit tests for the MMGen suite """ -import sys,os,time,importlib,platform,asyncio +import sys, os, time, importlib, platform, asyncio try: from include.test_init import repo_root @@ -33,13 +33,13 @@ if not os.getenv('MMGEN_DEVTOOLS'): from mmgen.devinit import init_dev init_dev() -from mmgen.cfg import Config,gc +from mmgen.cfg import Config, gc from mmgen.color import gray, brown, orange, yellow, red from mmgen.util import msg, msg_r, gmsg, ymsg, Msg -from test.include.common import set_globals,end_msg +from test.include.common import set_globals, end_msg -def die(ev,s): +def die(ev, s): msg((red if ev > 1 else yellow)(s)) sys.exit(ev) @@ -69,20 +69,20 @@ If no test is specified, all available tests are run if os.path.islink(Config.test_datadir): os.unlink(Config.test_datadir) -sys.argv.insert(1,'--skip-cfg-file') +sys.argv.insert(1, '--skip-cfg-file') cfg = Config(opts_data=opts_data) if cfg.no_altcoin_deps: ymsg(f'{gc.prog_name}: skipping altcoin tests by user request') -type(cfg)._reset_ok += ('use_internal_keccak_module','debug_addrlist') +type(cfg)._reset_ok += ('use_internal_keccak_module', 'debug_addrlist') set_globals(cfg) file_pfx = 'ut_' -tests_d = os.path.join(repo_root,'test','unit_tests_d') +tests_d = os.path.join(repo_root, 'test', 'unit_tests_d') all_tests = sorted(fn[len(file_pfx):-len('.py')] for fn in os.listdir(tests_d) if fn.startswith(file_pfx)) @@ -90,7 +90,7 @@ exclude = cfg.exclude.split(',') if cfg.exclude else [] for e in exclude: if e not in all_tests: - die(1,f'{e!r}: invalid parameter for --exclude (no such test)') + die(1, f'{e!r}: invalid parameter for --exclude (no such test)') start_time = int(time.time()) @@ -102,25 +102,25 @@ if cfg.list_subtests: def gen(): for test in all_tests: mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}') - if hasattr(mod,'unit_tests'): - t = getattr(mod,'unit_tests') - subtests = [k for k,v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_'] - yield fs.format( test, ' '.join(f'{subtest}' for subtest in subtests) ) + if hasattr(mod, 'unit_tests'): + t = getattr(mod, 'unit_tests') + subtests = [k for k, v in t.__dict__.items() if type(v).__name__ == 'function' and k[0] != '_'] + yield fs.format(test, ' '.join(f'{subtest}' for subtest in subtests)) else: yield test fs = '{:%s} {}' % max(len(t) for t in all_tests) - Msg( fs.format('TEST','SUBTESTS') + '\n' + '\n'.join(gen()) ) + Msg(fs.format('TEST', 'SUBTESTS') + '\n' + '\n'.join(gen())) sys.exit(0) class UnitTestHelpers: - def __init__(self,subtest_name): + def __init__(self, subtest_name): self.subtest_name = subtest_name - def skip_msg(self,desc): - cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_","-")!r} for {desc}')) + def skip_msg(self, desc): + cfg._util.qmsg(gray(f'Skipping subtest {self.subtest_name.replace("_", "-")!r} for {desc}')) - def process_bad_data(self,data,pfx='bad '): + def process_bad_data(self, data, pfx='bad '): if os.getenv('PYTHONOPTIMIZE'): ymsg('PYTHONOPTIMIZE set, skipping error handling tests') return @@ -130,7 +130,7 @@ class UnitTestHelpers: m_exc = '{!r}: incorrect exception type (expected {!r})' m_err = '{!r}: incorrect error msg (should match {!r}' m_noraise = "\nillegal action '{}{}' failed to raise an exception (expected {!r})" - for (desc,exc_chk,emsg_chk,func) in data: + for (desc, exc_chk, emsg_chk, func) in data: try: cfg._util.vmsg_r(' {}{:{w}}'.format(pfx, desc+':', w=desc_w+1)) ret = func() @@ -140,28 +140,28 @@ class UnitTestHelpers: exc = type(e).__name__ emsg = e.args[0] cfg._util.vmsg(f' {exc:{exc_w}} [{emsg}]') - assert exc == exc_chk, m_exc.format(exc,exc_chk) - assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk) + assert exc == exc_chk, m_exc.format(exc, exc_chk) + assert re.search(emsg_chk, emsg), m_err.format(emsg, emsg_chk) else: - die(4,m_noraise.format(pfx,desc,exc_chk)) + die(4, m_noraise.format(pfx, desc, exc_chk)) tests_seen = [] -def run_test(test,subtest=None): +def run_test(test, subtest=None): mod = importlib.import_module(f'test.unit_tests_d.{file_pfx}{test}') - def run_subtest(t,subtest): - subtest_disp = subtest.replace('_','-') + def run_subtest(t, subtest): + subtest_disp = subtest.replace('_', '-') msg(brown('Running unit subtest ') + orange(f'{test}.{subtest_disp}')) - if getattr(t,'silence_output',False): + if getattr(t, 'silence_output', False): t._silence() - if hasattr(t,'_pre_subtest'): - getattr(t,'_pre_subtest')(test,subtest,UnitTestHelpers(subtest)) + if hasattr(t, '_pre_subtest'): + getattr(t, '_pre_subtest')(test, subtest, UnitTestHelpers(subtest)) try: - func = getattr(t,subtest.replace('-','_')) + func = getattr(t, subtest.replace('-', '_')) c = func.__code__ do_desc = c.co_varnames[c.co_argcount-1] == 'desc' if do_desc: @@ -176,41 +176,41 @@ def run_test(test,subtest=None): if do_desc and not cfg.quiet: msg('OK\n' if cfg.verbose else 'OK') except: - if getattr(t,'silence_output',False): + if getattr(t, 'silence_output', False): t._end_silence() raise - if hasattr(t,'_post_subtest'): - getattr(t,'_post_subtest')(test,subtest,UnitTestHelpers(subtest)) + if hasattr(t, '_post_subtest'): + getattr(t, '_post_subtest')(test, subtest, UnitTestHelpers(subtest)) - if getattr(t,'silence_output',False): + if getattr(t, 'silence_output', False): t._end_silence() if not ret: - die(4,f'Unit subtest {subtest_disp!r} failed') + die(4, f'Unit subtest {subtest_disp!r} failed') if test not in tests_seen: gmsg(f'Running unit test {test}') tests_seen.append(test) - if cfg.no_altcoin_deps and getattr(mod,'altcoin_dep',None): + if cfg.no_altcoin_deps and getattr(mod, 'altcoin_dep', None): cfg._util.qmsg(gray(f'Skipping unit test {test!r} [--no-altcoin-deps]')) return - if hasattr(mod,'unit_tests'): # new class-based API - t = getattr(mod,'unit_tests')() - altcoin_deps = getattr(t,'altcoin_deps',()) + if hasattr(mod, 'unit_tests'): # new class-based API + t = getattr(mod, 'unit_tests')() + altcoin_deps = getattr(t, 'altcoin_deps', ()) win_skip = getattr(t, 'win_skip', ()) mac_skip = getattr(t, 'mac_skip', ()) arm_skip = getattr(t, 'arm_skip', ()) subtests = ( [subtest] if subtest else - [k for k,v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_'] + [k for k, v in type(t).__dict__.items() if type(v).__name__ == 'function' and k[0] != '_'] ) - if hasattr(t,'_pre'): + if hasattr(t, '_pre'): t._pre() for _subtest in subtests: - subtest_disp = _subtest.replace('_','-') + subtest_disp = _subtest.replace('_', '-') if cfg.no_altcoin_deps and _subtest in altcoin_deps: cfg._util.qmsg(gray(f'Skipping unit subtest {subtest_disp!r} [--no-altcoin-deps]')) continue @@ -224,23 +224,23 @@ def run_test(test,subtest=None): cfg._util.qmsg(gray(f'Skipping unit subtest {subtest_disp!r} for ARM platform')) continue run_subtest(t, _subtest) - if hasattr(t,'_post'): + if hasattr(t, '_post'): t._post() else: assert not subtest, f'{subtest!r}: subtests not supported for this unit test' - if not mod.unit_test().run_test(test,UnitTestHelpers(test)): - die(4,'Unit test {test!r} failed') + if not mod.unit_test().run_test(test, UnitTestHelpers(test)): + die(4, 'Unit test {test!r} failed') def main(): for test in (cfg._args or all_tests): if '.' in test: - test,subtest = test.split('.') + test, subtest = test.split('.') else: subtest = None if test not in all_tests: - die(1,f'{test!r}: test not recognized') + die(1, f'{test!r}: test not recognized') if test not in exclude: - run_test(test,subtest=subtest) + run_test(test, subtest=subtest) end_msg(int(time.time()) - start_time) from mmgen.main import launch