From 6130efcf0c9e16cb163bb3cdc99faa26189cd6c9 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Fri, 18 Oct 2024 10:32:12 +0000 Subject: [PATCH] whitespace: test/include --- test/include/coin_daemon_control.py | 40 ++++----- test/include/common.py | 112 ++++++++++++------------- test/include/ecc.py | 4 +- test/include/pexpect.py | 126 ++++++++++++++-------------- test/include/test_init.py | 2 +- 5 files changed, 143 insertions(+), 141 deletions(-) diff --git a/test/include/coin_daemon_control.py b/test/include/coin_daemon_control.py index ce9c84b4..33fbd504 100755 --- a/test/include/coin_daemon_control.py +++ b/test/include/coin_daemon_control.py @@ -17,8 +17,8 @@ from pathlib import PurePath sys.path[0] = str(PurePath(*PurePath(__file__).parts[:-3])) -from mmgen.cfg import Config,gc -from mmgen.util import msg,die,oneshot_warning,async_run +from mmgen.cfg import Config, gc +from mmgen.util import msg, die, oneshot_warning, async_run from mmgen.protocol import init_proto from mmgen.daemon import CoinDaemon @@ -30,7 +30,7 @@ xmr_wallet_network_ids = { action = gc.prog_name.split('-')[0] opts_data = { - 'sets': [('debug',True,'verbose',True)], + 'sets': [('debug', True, 'verbose', True)], 'text': { 'desc': f'{action.capitalize()} coin or wallet daemons for the MMGen test suite', 'usage':'[opts] ', @@ -56,8 +56,10 @@ Valid network IDs: {nid}, {xmrw_nid}, all, no_xmr """ }, 'code': { - 'options': lambda s: s.format(a=action.capitalize(),pn=gc.prog_name), - 'notes': lambda s,help_notes: s.format( + 'options': lambda s: s.format( + a = action.capitalize(), + pn = gc.prog_name), + 'notes': lambda s, help_notes: s.format( nid = help_notes('coin_daemon_network_ids'), xmrw_nid = ', '.join(xmr_wallet_network_ids), ) @@ -68,20 +70,20 @@ class warn_missing_exec(oneshot_warning): color = 'nocolor' message = 'daemon executable {!r} not found on this system!' -def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False): +def run(network_id=None, proto=None, daemon_id=None, missing_exec_ok=False): if network_id in xmr_wallet_network_ids: from mmgen.proto.xmr.daemon import MoneroWalletDaemon d = MoneroWalletDaemon( cfg = cfg, - proto = init_proto( cfg, coin='XMR', network=xmr_wallet_network_ids[network_id] ), + proto = init_proto(cfg, coin='XMR', network=xmr_wallet_network_ids[network_id]), user = 'test', passwd = 'test passwd', test_suite = True, monerod_addr = None, trust_monerod = True, test_monerod = False, - opts = ['no_daemonize'] if cfg.no_daemonize else None ) + opts = ['no_daemonize'] if cfg.no_daemonize else None) else: d = CoinDaemon( cfg, @@ -91,7 +93,7 @@ def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False): opts = ['no_daemonize'] if cfg.no_daemonize else None, port_shift = int(cfg.port_shift or 0), datadir = cfg.datadir, - daemon_id = daemon_id ) + daemon_id = daemon_id) if cfg.mainnet_only and d.network != 'mainnet': return @@ -105,20 +107,20 @@ def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False): except Exception as e: if not cfg.quiet: msg(str(e)) - warn_missing_exec( div=d.exec_fn, fmt_args=(d.exec_fn,) ) + warn_missing_exec(div=d.exec_fn, fmt_args=(d.exec_fn,)) return if cfg.print_version: - msg('{:16} {}'.format( d.exec_fn+':', d.get_exec_version_str() )) + msg('{:16} {}'.format(d.exec_fn+':', d.get_exec_version_str())) elif cfg.get_state: print(d.state_msg()) elif cfg.testing: for cmd in d.start_cmds if action == 'start' else [d.stop_cmd]: print(' '.join(cmd)) else: - if action == 'stop' and hasattr(d,'rpc'): + if action == 'stop' and hasattr(d, 'rpc'): async_run(d.rpc.stop_daemon(quiet=cfg.quiet)) else: - d.cmd(action,quiet=cfg.quiet) + d.cmd(action, quiet=cfg.quiet) def main(): @@ -126,16 +128,16 @@ def main(): print('\n'.join(CoinDaemon.all_daemon_ids())) elif 'all' in cfg._args or 'no_xmr' in cfg._args: if len(cfg._args) != 1: - die(1,"'all' or 'no_xmr' must be the sole argument") + die(1, "'all' or 'no_xmr' must be the sole argument") for coin in CoinDaemon.coins: if coin == 'XMR' and cfg._args[0] == 'no_xmr': continue - for daemon_id in CoinDaemon.get_daemon_ids(cfg,coin): - for network in CoinDaemon.get_daemon(cfg,coin,daemon_id).networks: + for daemon_id in CoinDaemon.get_daemon_ids(cfg, coin): + for network in CoinDaemon.get_daemon(cfg, coin, daemon_id).networks: run( - proto = init_proto( cfg, coin=coin, network=network ), + proto = init_proto(cfg, coin=coin, network=network), daemon_id = daemon_id, - missing_exec_ok = True ) + missing_exec_ok = True) else: ids = cfg._args network_ids = CoinDaemon.get_network_ids(cfg) @@ -143,7 +145,7 @@ def main(): cfg._usage() for i in ids: if i not in network_ids + list(xmr_wallet_network_ids): - die(1,f'{i!r}: invalid network ID') + die(1, f'{i!r}: invalid network ID') for network_id in ids: run(network_id=network_id.lower()) diff --git a/test/include/common.py b/test/include/common.py index d4858e84..4a6e9dc6 100755 --- a/test/include/common.py +++ b/test/include/common.py @@ -25,11 +25,11 @@ from subprocess import run, PIPE, DEVNULL from pathlib import Path from mmgen.cfg import gv -from mmgen.color import yellow,green,orange +from mmgen.color import yellow, green, orange from mmgen.util import msg, msg_r, Msg, Msg_r, gmsg, die, suf, fmt_list -from mmgen.fileutil import write_data_to_file,get_data_from_file +from mmgen.fileutil import write_data_to_file, get_data_from_file -def noop(*args,**kwargs): +def noop(*args, **kwargs): pass def set_globals(cfg): @@ -57,23 +57,23 @@ def set_globals(cfg): this.dmsg = msg if cfg.debug else noop def strip_ansi_escapes(s): - return re.sub('\x1b' + r'\[[;0-9]+?m','',s) + return re.sub('\x1b' + r'\[[;0-9]+?m', '', s) cmdtest_py_log_fn = 'cmdtest.py.log' cmdtest_py_error_fn = 'cmdtest.py.err' parity_dev_amt = 1606938044258990275541962092341162602522202993782792835301376 -ascii_uc = ''.join(map(chr,list(range(65,91)))) # 26 chars -ascii_lc = ''.join(map(chr,list(range(97,123)))) # 26 chars -lat_accent = ''.join(map(chr,list(range(192,383)))) # 191 chars, L,S -ru_uc = ''.join(map(chr,list(range(1040,1072)))) # 32 chars -gr_uc = ''.join(map(chr,list(range(913,930)) + list(range(931,940)))) # 26 chars (930 is ctrl char) -gr_uc_w_ctrl = ''.join(map(chr,list(range(913,940)))) # 27 chars, L,C +ascii_uc = ''.join(map(chr, list(range(65, 91)))) # 26 chars +ascii_lc = ''.join(map(chr, list(range(97, 123)))) # 26 chars +lat_accent = ''.join(map(chr, list(range(192, 383)))) # 191 chars, L, S +ru_uc = ''.join(map(chr, list(range(1040, 1072)))) # 32 chars +gr_uc = ''.join(map(chr, list(range(913, 930)) + list(range(931, 940)))) # 26 chars (930 is ctrl char) +gr_uc_w_ctrl = ''.join(map(chr, list(range(913, 940)))) # 27 chars, L, C lat_cyr_gr = lat_accent[:130:5] + ru_uc + gr_uc # 84 chars ascii_cyr_gr = ascii_uc + ru_uc + gr_uc # 84 chars -utf8_text = '[α-$ample UTF-8 text-ω]' * 10 # 230 chars, L,N,P,S,Z -utf8_combining = '[α-$ámple UTF-8 téxt-ω]' * 10 # L,N,P,S,Z,M -utf8_ctrl = '[α-$ample\nUTF-8\ntext-ω]' * 10 # L,N,P,S,Z,C +utf8_text = '[α-$ample UTF-8 text-ω]' * 10 # 230 chars, L, N, P, S, Z +utf8_combining = '[α-$ámple UTF-8 téxt-ω]' * 10 # L, N, P, S, Z, M +utf8_ctrl = '[α-$ample\nUTF-8\ntext-ω]' * 10 # L, N, P, S, Z, C text_jp = '必要なのは、信用ではなく暗号化された証明に基づく電子取引システムであり、これにより希望する二者が信用できる第三者機関を介さずに直接取引できるよう' # 72 chars ('W'ide) text_zh = '所以,我們非常需要這樣一種電子支付系統,它基於密碼學原理而不基於信用,使得任何達成一致的雙方,能夠直接進行支付,從而不需要協力廠商仲介的參與。。' # 72 chars ('F'ull + 'W'ide) @@ -119,23 +119,23 @@ def getrand(n): return os.urandom(n) def getrandnum(n): - return int(getrand(n).hex(),16) + return int(getrand(n).hex(), 16) def getrandhex(n): return getrand(n).hex() -def getrandnum_range(nbytes,rn_max): +def getrandnum_range(nbytes, rn_max): while True: - rn = int(getrand(nbytes).hex(),16) + rn = int(getrand(nbytes).hex(), 16) if rn < rn_max: return rn -def getrandstr(num_chars,no_space=False): - n,m = (94,33) if no_space else (95,32) - return ''.join( chr(i % n + m) for i in list(getrand(num_chars)) ) +def getrandstr(num_chars, no_space=False): + n, m = (94, 33) if no_space else (95, 32) + return ''.join(chr(i % n + m) for i in list(getrand(num_chars))) # Windows uses non-UTF8 encodings in filesystem, so use raw bytes here -def cleandir(d,do_msg=False): +def cleandir(d, do_msg=False): d_enc = d.encode() try: @@ -151,13 +151,13 @@ def cleandir(d,do_msg=False): try: os.unlink(os.path.join(d_enc, f)) except: - rmtree(os.path.join(d_enc,f), ignore_errors=True) + rmtree(os.path.join(d_enc, f), ignore_errors=True) return files def mk_tmpdir(d): try: - os.makedirs( d, mode=0o755, exist_ok=True ) + os.makedirs(d, mode=0o755, exist_ok=True) except OSError as e: if e.errno != 17: raise @@ -196,29 +196,29 @@ def clean(cfgs, tmpdir_ids=None, extra_dirs=[]): print(f'Removing non-directory ‘{d}’') os.unlink(d) -def get_tmpfile(cfg,fn): - return os.path.join(cfg['tmpdir'],fn) +def get_tmpfile(cfg, fn): + return os.path.join(cfg['tmpdir'], fn) -def write_to_file(fn,data,binary=False): +def write_to_file(fn, data, binary=False): write_data_to_file( cfg, fn, data, quiet = True, binary = binary, - ignore_opt_outdir = True ) + ignore_opt_outdir = True) -def write_to_tmpfile(cfg,fn,data,binary=False): - write_to_file( os.path.join(cfg['tmpdir'],fn), data=data, binary=binary ) +def write_to_tmpfile(cfg, fn, data, binary=False): + write_to_file(os.path.join(cfg['tmpdir'], fn), data=data, binary=binary) -def read_from_file(fn,binary=False): - return get_data_from_file( cfg, fn, quiet=True, binary=binary ) +def read_from_file(fn, binary=False): + return get_data_from_file(cfg, fn, quiet=True, binary=binary) -def read_from_tmpfile(cfg,fn,binary=False): - return read_from_file(os.path.join(cfg['tmpdir'],fn),binary=binary) +def read_from_tmpfile(cfg, fn, binary=False): + return read_from_file(os.path.join(cfg['tmpdir'], fn), binary=binary) -def joinpath(*args,**kwargs): - return os.path.join(*args,**kwargs) +def joinpath(*args, **kwargs): + return os.path.join(*args, **kwargs) def ok(text='OK'): if cfg.profile: @@ -228,25 +228,25 @@ def ok(text='OK'): else: msg(f' {text}') -def cmp_or_die(s,t,desc=None): +def cmp_or_die(s, t, desc=None): if s != t: - die( 'TestSuiteFatalException', + die('TestSuiteFatalException', (f'For {desc}:\n' if desc else '') + f'ERROR: recoded data:\n{t!r}\ndiffers from original data:\n{s!r}' ) def init_coverage(): - coverdir = os.path.join('test','trace') - acc_file = os.path.join('test','trace.acc') + coverdir = os.path.join('test', 'trace') + acc_file = os.path.join('test', 'trace.acc') try: - os.mkdir(coverdir,0o755) + os.mkdir(coverdir, 0o755) except: pass - return coverdir,acc_file + return coverdir, acc_file def silence(): if not (cfg.verbose or cfg.exact_output): - gv.stdout = gv.stderr = open(os.devnull,'w') + gv.stdout = gv.stderr = open(os.devnull, 'w') def end_silence(): if not (cfg.verbose or cfg.exact_output): @@ -287,31 +287,31 @@ def end_msg(t): ('' if cfg.test_suite_deterministic else f', elapsed time: {t//60:02d}:{t%60:02d}') )) -def start_test_daemons(*network_ids,remove_datadir=False): +def start_test_daemons(*network_ids, remove_datadir=False): if not cfg.no_daemon_autostart: - return test_daemons_ops(*network_ids,op='start',remove_datadir=remove_datadir) + return test_daemons_ops(*network_ids, op='start', remove_datadir=remove_datadir) -def stop_test_daemons(*network_ids,force=False,remove_datadir=False): +def stop_test_daemons(*network_ids, force=False, remove_datadir=False): if force or not cfg.no_daemon_stop: - return test_daemons_ops(*network_ids,op='stop',remove_datadir=remove_datadir) + return test_daemons_ops(*network_ids, op='stop', remove_datadir=remove_datadir) -def restart_test_daemons(*network_ids,remove_datadir=False): +def restart_test_daemons(*network_ids, remove_datadir=False): if not stop_test_daemons(*network_ids): return False - return start_test_daemons(*network_ids,remove_datadir=remove_datadir) + return start_test_daemons(*network_ids, remove_datadir=remove_datadir) -def test_daemons_ops(*network_ids,op,remove_datadir=False): +def test_daemons_ops(*network_ids, op, remove_datadir=False): if not cfg.no_daemon_autostart: from mmgen.daemon import CoinDaemon silent = not (cfg.verbose or cfg.exact_output) ret = False for network_id in network_ids: - d = CoinDaemon(cfg,network_id,test_suite=True) + d = CoinDaemon(cfg, network_id, test_suite=True) if remove_datadir: d.wait = True d.stop(silent=True) d.remove_datadir() - ret = d.cmd(op,silent=silent) + ret = d.cmd(op, silent=silent) return ret tested_solc_ver = '0.8.26' @@ -319,9 +319,9 @@ tested_solc_ver = '0.8.26' def check_solc_ver(): cmd = 'python3 scripts/create-token.py --check-solc-version' try: - cp = run(cmd.split(),check=False,stdout=PIPE) + cp = run(cmd.split(), check=False, stdout=PIPE) except Exception as e: - die(4,f'Unable to execute {cmd!r}: {e}') + die(4, f'Unable to execute {cmd!r}: {e}') res = cp.stdout.decode().strip() if cp.returncode == 0: omsg( @@ -335,15 +335,15 @@ def check_solc_ver(): return False def get_ethkey(): - cmdnames = ('ethkey','openethereum-ethkey') + cmdnames = ('ethkey', 'openethereum-ethkey') for cmdname in cmdnames: try: - run([cmdname,'--help'],stdout=PIPE) + run([cmdname, '--help'], stdout=PIPE) except: pass else: return cmdname - die(1,f'ethkey executable not found (tried {cmdnames})') + die(1, f'ethkey executable not found (tried {cmdnames})') def do_run(cmd, check=True): return run(cmd, stdout=PIPE, stderr=DEVNULL, check=check) @@ -386,7 +386,7 @@ class VirtBlockDeviceBase: def attach(self, dev_mode=None, silent=False): if res := self._get_associations(): - die(2, f'Device{suf(res)} {fmt_list(res,fmt="barest")} already associated with {self.img_path}') + die(2, f'Device{suf(res)} {fmt_list(res, fmt="barest")} already associated with {self.img_path}') dev = self.get_new_dev() if dev_mode: self.dev_mode_orig = '0{:o}'.format(os.stat(dev).st_mode & 0xfff) diff --git a/test/include/ecc.py b/test/include/ecc.py index 8f327ed0..4b1186bf 100755 --- a/test/include/ecc.py +++ b/test/include/ecc.py @@ -21,7 +21,7 @@ def _pubkey_to_pub_point(vk_bytes): except Exception as e: raise ValueError(f'invalid pubkey {vk_bytes.hex()}\n {type(e).__name__}: {e}') -def _check_pub_point(pub_point,vk_bytes,addend_bytes=None): +def _check_pub_point(pub_point, vk_bytes, addend_bytes=None): if pub_point is ecdsa.ellipticcurve.INFINITY: raise ValueError( 'pubkey {}{} produced key with point at infinity!'.format( @@ -31,7 +31,7 @@ def _check_pub_point(pub_point,vk_bytes,addend_bytes=None): def pubkey_check_pyecdsa(vk_bytes): _check_pub_point(_pubkey_to_pub_point(vk_bytes), vk_bytes) -def pubkey_tweak_add_pyecdsa(vk_bytes,pk_addend_bytes): +def pubkey_tweak_add_pyecdsa(vk_bytes, pk_addend_bytes): pk_addend = int.from_bytes(pk_addend_bytes, byteorder='big') point_sum = ( _pubkey_to_pub_point(vk_bytes) + diff --git a/test/include/pexpect.py b/test/include/pexpect.py index 2962dd0e..10114a47 100755 --- a/test/include/pexpect.py +++ b/test/include/pexpect.py @@ -20,20 +20,20 @@ test.include.pexpect: pexpect implementation for MMGen test suites """ -import sys,time -from mmgen.color import red,yellow,green,cyan -from mmgen.util import msg,msg_r,rmsg,die -from .common import cfg,vmsg,vmsg_r,getrandstr,strip_ansi_escapes +import sys, time +from mmgen.color import red, yellow, green, cyan +from mmgen.util import msg, msg_r, rmsg, die +from .common import cfg, vmsg, vmsg_r, getrandstr, strip_ansi_escapes try: import pexpect from pexpect.popen_spawn import PopenSpawn except ImportError as e: - die(2,red(f'Pexpect module is missing. Cannnot run test suite ({e!r})')) + die(2, red(f'Pexpect module is missing. Cannnot run test suite ({e!r})')) def debug_pexpect_msg(p): - msg('\n{}{}{}'.format( red('BEFORE ['), p.before, red(']') )) - msg('{}{}{}'.format( red('MATCH ['), p.after, red(']') )) + msg('\n{}{}{}'.format(red('BEFORE ['), p.before, red(']'))) + msg('{}{}{}'.format(red('MATCH ['), p.after, red(']'))) NL = '\n' @@ -47,7 +47,7 @@ class MMGenPexpect: pexpect_spawn = False, send_delay = None, timeout = None, - direct_exec = False ): + direct_exec = False): self.pexpect_spawn = pexpect_spawn self.send_delay = send_delay @@ -57,15 +57,15 @@ class MMGenPexpect: self.exit_val = None if direct_exec or cfg.direct_exec: - from subprocess import Popen,DEVNULL + from subprocess import Popen, DEVNULL redir = DEVNULL if (no_output or not cfg.exact_output) else None - self.ep = Popen([args[0]] + args[1:], stderr=redir, env=spawn_env ) + self.ep = Popen([args[0]] + args[1:], stderr=redir, env=spawn_env) else: - timeout = int(timeout or cfg.pexpect_timeout or 0) or (60,5)[bool(cfg.debug_pexpect)] + timeout = int(timeout or cfg.pexpect_timeout or 0) or (60, 5)[bool(cfg.debug_pexpect)] if pexpect_spawn: - self.p = pexpect.spawn(args[0],args[1:],encoding='utf8',timeout=timeout,env=spawn_env) + self.p = pexpect.spawn(args[0], args[1:], encoding='utf8', timeout=timeout, env=spawn_env) else: - self.p = PopenSpawn(args,encoding='utf8',timeout=timeout,env=spawn_env) + self.p = PopenSpawn(args, encoding='utf8', timeout=timeout, env=spawn_env) if cfg.exact_output: self.p.logfile = sys.stdout @@ -76,20 +76,20 @@ class MMGenPexpect: desc = 'key-address data', check = True, have_yes_opt = False): - self.passphrase(desc,pw) + self.passphrase(desc, pw) if not have_yes_opt: - self.expect('Check key-to-address validity? (y/N): ',('n','y')[check]) + self.expect('Check key-to-address validity? (y/N): ', ('n', 'y')[check]) - def view_tx(self,view): - self.expect(r'View.* transaction.*\? .*: ',view,regex=True) + def view_tx(self, view): + self.expect(r'View.* transaction.*\? .*: ', view, regex=True) if view not in 'n\n': - self.expect('to continue: ','\n') + self.expect('to continue: ', '\n') - def do_comment(self,add_comment,has_label=False): - p = ('Add a comment to transaction','Edit transaction comment')[has_label] - self.expect(f'{p}? (y/N): ',('n','y')[bool(add_comment)]) + def do_comment(self, add_comment, has_label=False): + p = ('Add a comment to transaction', 'Edit transaction comment')[has_label] + self.expect(f'{p}? (y/N): ', ('n', 'y')[bool(add_comment)]) if add_comment: - self.expect('Comment: ',add_comment+'\n') + self.expect('Comment: ', add_comment+'\n') def ok(self, exit_val=None): if not self.pexpect_spawn: @@ -97,81 +97,81 @@ class MMGenPexpect: self.p.read() ret = self.p.wait() if ret != (self.exit_val or exit_val or 0) and not cfg.coverage: - die( 'TestSuiteSpawnedScriptException', f'Spawned script exited with value {ret}' ) + die('TestSuiteSpawnedScriptException', f'Spawned script exited with value {ret}') if cfg.profile: return if not self.skip_ok: m = 'OK\n' if ret == 0 else f'OK[{ret}]\n' - sys.stderr.write( green(m) if cfg.exact_output or cfg.verbose else ' '+m ) + sys.stderr.write(green(m) if cfg.exact_output or cfg.verbose else ' '+m) return self def license(self): if self.spawn_env.get('MMGEN_NO_LICENSE'): return - self.expect("'w' for conditions and warranty info, or 'c' to continue: ",'c') + self.expect("'w' for conditions and warranty info, or 'c' to continue: ", 'c') - def label(self,label='Test Label (UTF-8) α'): - self.expect('Enter a wallet label, or hit ENTER for no label: ',label+'\n') + def label(self, label='Test Label (UTF-8) α'): + self.expect('Enter a wallet label, or hit ENTER for no label: ', label+'\n') - def usr_rand(self,num_chars): + def usr_rand(self, num_chars): if cfg.usr_random: self.interactive() self.send('\n') else: - rand_chars = list(getrandstr(num_chars,no_space=True)) + rand_chars = list(getrandstr(num_chars, no_space=True)) vmsg_r('SEND ') while rand_chars: ch = rand_chars.pop(0) msg_r(yellow(ch)+' ' if cfg.verbose else '+') - self.expect('left: ',ch,delay=0.005) - self.expect('ENTER to continue: ','\n') + self.expect('left: ', ch, delay=0.005) + self.expect('ENTER to continue: ', '\n') - def passphrase_new(self,desc,passphrase): - self.expect(f'Enter passphrase for {desc}: ',passphrase+'\n') - self.expect('Repeat passphrase: ',passphrase+'\n') + def passphrase_new(self, desc, passphrase): + self.expect(f'Enter passphrase for {desc}: ', passphrase+'\n') + self.expect('Repeat passphrase: ', passphrase+'\n') - def passphrase(self,desc,passphrase,pwtype=''): + def passphrase(self, desc, passphrase, pwtype=''): if pwtype: pwtype += ' ' - self.expect(f'Enter {pwtype}passphrase for {desc}.*?: ',passphrase+'\n',regex=True) + self.expect(f'Enter {pwtype}passphrase for {desc}.*?: ', passphrase+'\n', regex=True) - def hash_preset(self,desc,preset=''): + def hash_preset(self, desc, preset=''): self.expect(f'Enter hash preset for {desc}') - self.expect('or hit ENTER .*?:',str(preset)+'\n',regex=True) + self.expect('or hit ENTER .*?:', str(preset)+'\n', regex=True) - def written_to_file(self,desc,overwrite_unlikely=False,query='Overwrite? '): + def written_to_file(self, desc, overwrite_unlikely=False, query='Overwrite? '): s1 = f'{desc} written to file ' s2 = query + "Type uppercase 'YES' to confirm: " - ret = self.expect(([s1,s2],s1)[overwrite_unlikely]) + ret = self.expect(([s1, s2], s1)[overwrite_unlikely]) if ret == 1: self.send('YES\n') return self.expect_getend("Overwriting file '").rstrip("'") - self.expect(NL,nonl=True) + self.expect(NL, nonl=True) outfile = self.p.before.strip().strip("'") if cfg.debug_pexpect: rmsg(f'Outfile [{outfile}]') - vmsg('{} file: {}'.format( desc, cyan(outfile.replace('"',"")) )) + vmsg('{} file: {}'.format(desc, cyan(outfile.replace('"', "")))) return outfile - def hincog_create(self,hincog_bytes): - ret = self.expect(['Create? (Y/n): ',"'YES' to confirm: "]) + def hincog_create(self, hincog_bytes): + ret = self.expect(['Create? (Y/n): ', "'YES' to confirm: "]) if ret == 0: self.send('\n') - self.expect('Enter file size: ',str(hincog_bytes)+'\n') + self.expect('Enter file size: ', str(hincog_bytes)+'\n') else: self.send('YES\n') return ret def no_overwrite(self): - self.expect("Overwrite? Type uppercase 'YES' to confirm: ",'\n') + self.expect("Overwrite? Type uppercase 'YES' to confirm: ", '\n') self.expect('Exiting at user request') - def expect_getend(self,s,regex=False): - self.expect(s,regex=regex,nonl=True) + def expect_getend(self, s, regex=False): + self.expect(s, regex=regex, nonl=True) if cfg.debug_pexpect: debug_pexpect_msg(self.p) # readline() of partial lines doesn't work with PopenSpawn, so do this instead: - self.expect(NL,nonl=True,silent=True) + self.expect(NL, nonl=True, silent=True) if cfg.debug_pexpect: debug_pexpect_msg(self.p) end = self.p.before.rstrip() @@ -182,21 +182,21 @@ class MMGenPexpect: def interactive(self): return self.p.interact() # interact() not available with popen_spawn - def kill(self,signal): + def kill(self, signal): return self.p.kill(signal) - def match_expect_list(self,expect_list,greedy=False): + def match_expect_list(self, expect_list, greedy=False): allrep = '.*' if greedy else '.*?' expect = ( r'(\b|\s)' + - fr'\s{allrep}\s'.join(s.replace(r'.',r'\.').replace(' ',r'\s+') for s in expect_list) + - r'(\b|\s)' ) + fr'\s{allrep}\s'.join(s.replace(r'.', r'\.').replace(' ', r'\s+') for s in expect_list) + + r'(\b|\s)') import re - m = re.search( expect, self.read(strip_color=True), re.DOTALL ) + m = re.search(expect, self.read(strip_color=True), re.DOTALL) assert m, f'No match found for regular expression {expect!r}' return m - def expect(self,s,t='',delay=None,regex=False,nonl=False,silent=False): + def expect(self, s, t='', delay=None, regex=False, nonl=False, silent=False): if not silent: if cfg.verbose: @@ -205,7 +205,7 @@ class MMGenPexpect: msg_r('+') try: - ret = (self.p.expect_exact,self.p.expect)[bool(regex)](s) if s else 0 + ret = (self.p.expect_exact, self.p.expect)[bool(regex)](s) if s else 0 except pexpect.TIMEOUT as e: if cfg.debug_pexpect: raise @@ -217,20 +217,20 @@ class MMGenPexpect: if cfg.debug_pexpect: debug_pexpect_msg(self.p) - if cfg.verbose and not isinstance(s,str): + if cfg.verbose and not isinstance(s, str): msg_r(f' ==> {ret} ') if ret == -1: - die(4,f'Error. Expect returned {ret}') + die(4, f'Error. Expect returned {ret}') else: if t: - self.send(t,delay,s) + self.send(t, delay, s) else: if not nonl and not silent: vmsg('') return ret - def send(self,t,delay=None,s=False): + def send(self, t, delay=None, s=False): delay = delay or self.send_delay if delay: time.sleep(delay) @@ -241,12 +241,12 @@ class MMGenPexpect: if cfg.verbose: ls = '' if cfg.debug or not s else ' ' es = '' if s else ' ' - yt = yellow('{!r}'.format( t.replace('\n',r'\n') )) + yt = yellow('{!r}'.format(t.replace('\n', r'\n'))) msg(f'{ls}SEND {es}{yt}') return ret - def read(self,n=-1,strip_color=False): - return strip_ansi_escapes(self.p.read(n)).replace('\r','') if strip_color else self.p.read(n) + def read(self, n=-1, strip_color=False): + return strip_ansi_escapes(self.p.read(n)).replace('\r', '') if strip_color else self.p.read(n) def close(self): if self.pexpect_spawn: diff --git a/test/include/test_init.py b/test/include/test_init.py index 17c41cee..ca967bb7 100755 --- a/test/include/test_init.py +++ b/test/include/test_init.py @@ -12,7 +12,7 @@ test.include.test_init: Initialization module for test scripts """ -import sys,os +import sys, os from pathlib import PurePath os.environ['MMGEN_TEST_SUITE'] = '1' repo_root = str(PurePath(*PurePath(__file__).parts[:-3]))