whitespace: test/include

This commit is contained in:
The MMGen Project 2024-10-18 10:32:12 +00:00
commit 6130efcf0c
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
5 changed files with 143 additions and 141 deletions

View file

@ -17,8 +17,8 @@ from pathlib import PurePath
sys.path[0] = str(PurePath(*PurePath(__file__).parts[:-3]))
from mmgen.cfg import Config,gc
from mmgen.util import msg,die,oneshot_warning,async_run
from mmgen.cfg import Config, gc
from mmgen.util import msg, die, oneshot_warning, async_run
from mmgen.protocol import init_proto
from mmgen.daemon import CoinDaemon
@ -30,7 +30,7 @@ xmr_wallet_network_ids = {
action = gc.prog_name.split('-')[0]
opts_data = {
'sets': [('debug',True,'verbose',True)],
'sets': [('debug', True, 'verbose', True)],
'text': {
'desc': f'{action.capitalize()} coin or wallet daemons for the MMGen test suite',
'usage':'[opts] <network IDs>',
@ -56,8 +56,10 @@ Valid network IDs: {nid}, {xmrw_nid}, all, no_xmr
"""
},
'code': {
'options': lambda s: s.format(a=action.capitalize(),pn=gc.prog_name),
'notes': lambda s,help_notes: s.format(
'options': lambda s: s.format(
a = action.capitalize(),
pn = gc.prog_name),
'notes': lambda s, help_notes: s.format(
nid = help_notes('coin_daemon_network_ids'),
xmrw_nid = ', '.join(xmr_wallet_network_ids),
)
@ -68,20 +70,20 @@ class warn_missing_exec(oneshot_warning):
color = 'nocolor'
message = 'daemon executable {!r} not found on this system!'
def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False):
def run(network_id=None, proto=None, daemon_id=None, missing_exec_ok=False):
if network_id in xmr_wallet_network_ids:
from mmgen.proto.xmr.daemon import MoneroWalletDaemon
d = MoneroWalletDaemon(
cfg = cfg,
proto = init_proto( cfg, coin='XMR', network=xmr_wallet_network_ids[network_id] ),
proto = init_proto(cfg, coin='XMR', network=xmr_wallet_network_ids[network_id]),
user = 'test',
passwd = 'test passwd',
test_suite = True,
monerod_addr = None,
trust_monerod = True,
test_monerod = False,
opts = ['no_daemonize'] if cfg.no_daemonize else None )
opts = ['no_daemonize'] if cfg.no_daemonize else None)
else:
d = CoinDaemon(
cfg,
@ -91,7 +93,7 @@ def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False):
opts = ['no_daemonize'] if cfg.no_daemonize else None,
port_shift = int(cfg.port_shift or 0),
datadir = cfg.datadir,
daemon_id = daemon_id )
daemon_id = daemon_id)
if cfg.mainnet_only and d.network != 'mainnet':
return
@ -105,20 +107,20 @@ def run(network_id=None,proto=None,daemon_id=None,missing_exec_ok=False):
except Exception as e:
if not cfg.quiet:
msg(str(e))
warn_missing_exec( div=d.exec_fn, fmt_args=(d.exec_fn,) )
warn_missing_exec(div=d.exec_fn, fmt_args=(d.exec_fn,))
return
if cfg.print_version:
msg('{:16} {}'.format( d.exec_fn+':', d.get_exec_version_str() ))
msg('{:16} {}'.format(d.exec_fn+':', d.get_exec_version_str()))
elif cfg.get_state:
print(d.state_msg())
elif cfg.testing:
for cmd in d.start_cmds if action == 'start' else [d.stop_cmd]:
print(' '.join(cmd))
else:
if action == 'stop' and hasattr(d,'rpc'):
if action == 'stop' and hasattr(d, 'rpc'):
async_run(d.rpc.stop_daemon(quiet=cfg.quiet))
else:
d.cmd(action,quiet=cfg.quiet)
d.cmd(action, quiet=cfg.quiet)
def main():
@ -126,16 +128,16 @@ def main():
print('\n'.join(CoinDaemon.all_daemon_ids()))
elif 'all' in cfg._args or 'no_xmr' in cfg._args:
if len(cfg._args) != 1:
die(1,"'all' or 'no_xmr' must be the sole argument")
die(1, "'all' or 'no_xmr' must be the sole argument")
for coin in CoinDaemon.coins:
if coin == 'XMR' and cfg._args[0] == 'no_xmr':
continue
for daemon_id in CoinDaemon.get_daemon_ids(cfg,coin):
for network in CoinDaemon.get_daemon(cfg,coin,daemon_id).networks:
for daemon_id in CoinDaemon.get_daemon_ids(cfg, coin):
for network in CoinDaemon.get_daemon(cfg, coin, daemon_id).networks:
run(
proto = init_proto( cfg, coin=coin, network=network ),
proto = init_proto(cfg, coin=coin, network=network),
daemon_id = daemon_id,
missing_exec_ok = True )
missing_exec_ok = True)
else:
ids = cfg._args
network_ids = CoinDaemon.get_network_ids(cfg)
@ -143,7 +145,7 @@ def main():
cfg._usage()
for i in ids:
if i not in network_ids + list(xmr_wallet_network_ids):
die(1,f'{i!r}: invalid network ID')
die(1, f'{i!r}: invalid network ID')
for network_id in ids:
run(network_id=network_id.lower())

View file

@ -25,11 +25,11 @@ from subprocess import run, PIPE, DEVNULL
from pathlib import Path
from mmgen.cfg import gv
from mmgen.color import yellow,green,orange
from mmgen.color import yellow, green, orange
from mmgen.util import msg, msg_r, Msg, Msg_r, gmsg, die, suf, fmt_list
from mmgen.fileutil import write_data_to_file,get_data_from_file
from mmgen.fileutil import write_data_to_file, get_data_from_file
def noop(*args,**kwargs):
def noop(*args, **kwargs):
pass
def set_globals(cfg):
@ -57,23 +57,23 @@ def set_globals(cfg):
this.dmsg = msg if cfg.debug else noop
def strip_ansi_escapes(s):
return re.sub('\x1b' + r'\[[;0-9]+?m','',s)
return re.sub('\x1b' + r'\[[;0-9]+?m', '', s)
cmdtest_py_log_fn = 'cmdtest.py.log'
cmdtest_py_error_fn = 'cmdtest.py.err'
parity_dev_amt = 1606938044258990275541962092341162602522202993782792835301376
ascii_uc = ''.join(map(chr,list(range(65,91)))) # 26 chars
ascii_lc = ''.join(map(chr,list(range(97,123)))) # 26 chars
lat_accent = ''.join(map(chr,list(range(192,383)))) # 191 chars, L,S
ru_uc = ''.join(map(chr,list(range(1040,1072)))) # 32 chars
gr_uc = ''.join(map(chr,list(range(913,930)) + list(range(931,940)))) # 26 chars (930 is ctrl char)
gr_uc_w_ctrl = ''.join(map(chr,list(range(913,940)))) # 27 chars, L,C
ascii_uc = ''.join(map(chr, list(range(65, 91)))) # 26 chars
ascii_lc = ''.join(map(chr, list(range(97, 123)))) # 26 chars
lat_accent = ''.join(map(chr, list(range(192, 383)))) # 191 chars, L, S
ru_uc = ''.join(map(chr, list(range(1040, 1072)))) # 32 chars
gr_uc = ''.join(map(chr, list(range(913, 930)) + list(range(931, 940)))) # 26 chars (930 is ctrl char)
gr_uc_w_ctrl = ''.join(map(chr, list(range(913, 940)))) # 27 chars, L, C
lat_cyr_gr = lat_accent[:130:5] + ru_uc + gr_uc # 84 chars
ascii_cyr_gr = ascii_uc + ru_uc + gr_uc # 84 chars
utf8_text = '[α-$ample UTF-8 text-ω]' * 10 # 230 chars, L,N,P,S,Z
utf8_combining = '[α-$ámple UTF-8 téxt-ω]' * 10 # L,N,P,S,Z,M
utf8_ctrl = '[α-$ample\nUTF-8\ntext-ω]' * 10 # L,N,P,S,Z,C
utf8_text = '[α-$ample UTF-8 text-ω]' * 10 # 230 chars, L, N, P, S, Z
utf8_combining = '[α-$ámple UTF-8 téxt-ω]' * 10 # L, N, P, S, Z, M
utf8_ctrl = '[α-$ample\nUTF-8\ntext-ω]' * 10 # L, N, P, S, Z, C
text_jp = '必要なのは、信用ではなく暗号化された証明に基づく電子取引システムであり、これにより希望する二者が信用できる第三者機関を介さずに直接取引できるよう' # 72 chars ('W'ide)
text_zh = '所以,我們非常需要這樣一種電子支付系統,它基於密碼學原理而不基於信用,使得任何達成一致的雙方,能夠直接進行支付,從而不需要協力廠商仲介的參與。。' # 72 chars ('F'ull + 'W'ide)
@ -119,23 +119,23 @@ def getrand(n):
return os.urandom(n)
def getrandnum(n):
return int(getrand(n).hex(),16)
return int(getrand(n).hex(), 16)
def getrandhex(n):
return getrand(n).hex()
def getrandnum_range(nbytes,rn_max):
def getrandnum_range(nbytes, rn_max):
while True:
rn = int(getrand(nbytes).hex(),16)
rn = int(getrand(nbytes).hex(), 16)
if rn < rn_max:
return rn
def getrandstr(num_chars,no_space=False):
n,m = (94,33) if no_space else (95,32)
return ''.join( chr(i % n + m) for i in list(getrand(num_chars)) )
def getrandstr(num_chars, no_space=False):
n, m = (94, 33) if no_space else (95, 32)
return ''.join(chr(i % n + m) for i in list(getrand(num_chars)))
# Windows uses non-UTF8 encodings in filesystem, so use raw bytes here
def cleandir(d,do_msg=False):
def cleandir(d, do_msg=False):
d_enc = d.encode()
try:
@ -151,13 +151,13 @@ def cleandir(d,do_msg=False):
try:
os.unlink(os.path.join(d_enc, f))
except:
rmtree(os.path.join(d_enc,f), ignore_errors=True)
rmtree(os.path.join(d_enc, f), ignore_errors=True)
return files
def mk_tmpdir(d):
try:
os.makedirs( d, mode=0o755, exist_ok=True )
os.makedirs(d, mode=0o755, exist_ok=True)
except OSError as e:
if e.errno != 17:
raise
@ -196,29 +196,29 @@ def clean(cfgs, tmpdir_ids=None, extra_dirs=[]):
print(f'Removing non-directory ‘{d}')
os.unlink(d)
def get_tmpfile(cfg,fn):
return os.path.join(cfg['tmpdir'],fn)
def get_tmpfile(cfg, fn):
return os.path.join(cfg['tmpdir'], fn)
def write_to_file(fn,data,binary=False):
def write_to_file(fn, data, binary=False):
write_data_to_file(
cfg,
fn,
data,
quiet = True,
binary = binary,
ignore_opt_outdir = True )
ignore_opt_outdir = True)
def write_to_tmpfile(cfg,fn,data,binary=False):
write_to_file( os.path.join(cfg['tmpdir'],fn), data=data, binary=binary )
def write_to_tmpfile(cfg, fn, data, binary=False):
write_to_file(os.path.join(cfg['tmpdir'], fn), data=data, binary=binary)
def read_from_file(fn,binary=False):
return get_data_from_file( cfg, fn, quiet=True, binary=binary )
def read_from_file(fn, binary=False):
return get_data_from_file(cfg, fn, quiet=True, binary=binary)
def read_from_tmpfile(cfg,fn,binary=False):
return read_from_file(os.path.join(cfg['tmpdir'],fn),binary=binary)
def read_from_tmpfile(cfg, fn, binary=False):
return read_from_file(os.path.join(cfg['tmpdir'], fn), binary=binary)
def joinpath(*args,**kwargs):
return os.path.join(*args,**kwargs)
def joinpath(*args, **kwargs):
return os.path.join(*args, **kwargs)
def ok(text='OK'):
if cfg.profile:
@ -228,25 +228,25 @@ def ok(text='OK'):
else:
msg(f' {text}')
def cmp_or_die(s,t,desc=None):
def cmp_or_die(s, t, desc=None):
if s != t:
die( 'TestSuiteFatalException',
die('TestSuiteFatalException',
(f'For {desc}:\n' if desc else '') +
f'ERROR: recoded data:\n{t!r}\ndiffers from original data:\n{s!r}'
)
def init_coverage():
coverdir = os.path.join('test','trace')
acc_file = os.path.join('test','trace.acc')
coverdir = os.path.join('test', 'trace')
acc_file = os.path.join('test', 'trace.acc')
try:
os.mkdir(coverdir,0o755)
os.mkdir(coverdir, 0o755)
except:
pass
return coverdir,acc_file
return coverdir, acc_file
def silence():
if not (cfg.verbose or cfg.exact_output):
gv.stdout = gv.stderr = open(os.devnull,'w')
gv.stdout = gv.stderr = open(os.devnull, 'w')
def end_silence():
if not (cfg.verbose or cfg.exact_output):
@ -287,31 +287,31 @@ def end_msg(t):
('' if cfg.test_suite_deterministic else f', elapsed time: {t//60:02d}:{t%60:02d}')
))
def start_test_daemons(*network_ids,remove_datadir=False):
def start_test_daemons(*network_ids, remove_datadir=False):
if not cfg.no_daemon_autostart:
return test_daemons_ops(*network_ids,op='start',remove_datadir=remove_datadir)
return test_daemons_ops(*network_ids, op='start', remove_datadir=remove_datadir)
def stop_test_daemons(*network_ids,force=False,remove_datadir=False):
def stop_test_daemons(*network_ids, force=False, remove_datadir=False):
if force or not cfg.no_daemon_stop:
return test_daemons_ops(*network_ids,op='stop',remove_datadir=remove_datadir)
return test_daemons_ops(*network_ids, op='stop', remove_datadir=remove_datadir)
def restart_test_daemons(*network_ids,remove_datadir=False):
def restart_test_daemons(*network_ids, remove_datadir=False):
if not stop_test_daemons(*network_ids):
return False
return start_test_daemons(*network_ids,remove_datadir=remove_datadir)
return start_test_daemons(*network_ids, remove_datadir=remove_datadir)
def test_daemons_ops(*network_ids,op,remove_datadir=False):
def test_daemons_ops(*network_ids, op, remove_datadir=False):
if not cfg.no_daemon_autostart:
from mmgen.daemon import CoinDaemon
silent = not (cfg.verbose or cfg.exact_output)
ret = False
for network_id in network_ids:
d = CoinDaemon(cfg,network_id,test_suite=True)
d = CoinDaemon(cfg, network_id, test_suite=True)
if remove_datadir:
d.wait = True
d.stop(silent=True)
d.remove_datadir()
ret = d.cmd(op,silent=silent)
ret = d.cmd(op, silent=silent)
return ret
tested_solc_ver = '0.8.26'
@ -319,9 +319,9 @@ tested_solc_ver = '0.8.26'
def check_solc_ver():
cmd = 'python3 scripts/create-token.py --check-solc-version'
try:
cp = run(cmd.split(),check=False,stdout=PIPE)
cp = run(cmd.split(), check=False, stdout=PIPE)
except Exception as e:
die(4,f'Unable to execute {cmd!r}: {e}')
die(4, f'Unable to execute {cmd!r}: {e}')
res = cp.stdout.decode().strip()
if cp.returncode == 0:
omsg(
@ -335,15 +335,15 @@ def check_solc_ver():
return False
def get_ethkey():
cmdnames = ('ethkey','openethereum-ethkey')
cmdnames = ('ethkey', 'openethereum-ethkey')
for cmdname in cmdnames:
try:
run([cmdname,'--help'],stdout=PIPE)
run([cmdname, '--help'], stdout=PIPE)
except:
pass
else:
return cmdname
die(1,f'ethkey executable not found (tried {cmdnames})')
die(1, f'ethkey executable not found (tried {cmdnames})')
def do_run(cmd, check=True):
return run(cmd, stdout=PIPE, stderr=DEVNULL, check=check)
@ -386,7 +386,7 @@ class VirtBlockDeviceBase:
def attach(self, dev_mode=None, silent=False):
if res := self._get_associations():
die(2, f'Device{suf(res)} {fmt_list(res,fmt="barest")} already associated with {self.img_path}')
die(2, f'Device{suf(res)} {fmt_list(res, fmt="barest")} already associated with {self.img_path}')
dev = self.get_new_dev()
if dev_mode:
self.dev_mode_orig = '0{:o}'.format(os.stat(dev).st_mode & 0xfff)

View file

@ -21,7 +21,7 @@ def _pubkey_to_pub_point(vk_bytes):
except Exception as e:
raise ValueError(f'invalid pubkey {vk_bytes.hex()}\n {type(e).__name__}: {e}')
def _check_pub_point(pub_point,vk_bytes,addend_bytes=None):
def _check_pub_point(pub_point, vk_bytes, addend_bytes=None):
if pub_point is ecdsa.ellipticcurve.INFINITY:
raise ValueError(
'pubkey {}{} produced key with point at infinity!'.format(
@ -31,7 +31,7 @@ def _check_pub_point(pub_point,vk_bytes,addend_bytes=None):
def pubkey_check_pyecdsa(vk_bytes):
_check_pub_point(_pubkey_to_pub_point(vk_bytes), vk_bytes)
def pubkey_tweak_add_pyecdsa(vk_bytes,pk_addend_bytes):
def pubkey_tweak_add_pyecdsa(vk_bytes, pk_addend_bytes):
pk_addend = int.from_bytes(pk_addend_bytes, byteorder='big')
point_sum = (
_pubkey_to_pub_point(vk_bytes) +

View file

@ -20,20 +20,20 @@
test.include.pexpect: pexpect implementation for MMGen test suites
"""
import sys,time
from mmgen.color import red,yellow,green,cyan
from mmgen.util import msg,msg_r,rmsg,die
from .common import cfg,vmsg,vmsg_r,getrandstr,strip_ansi_escapes
import sys, time
from mmgen.color import red, yellow, green, cyan
from mmgen.util import msg, msg_r, rmsg, die
from .common import cfg, vmsg, vmsg_r, getrandstr, strip_ansi_escapes
try:
import pexpect
from pexpect.popen_spawn import PopenSpawn
except ImportError as e:
die(2,red(f'Pexpect module is missing. Cannnot run test suite ({e!r})'))
die(2, red(f'Pexpect module is missing. Cannnot run test suite ({e!r})'))
def debug_pexpect_msg(p):
msg('\n{}{}{}'.format( red('BEFORE ['), p.before, red(']') ))
msg('{}{}{}'.format( red('MATCH ['), p.after, red(']') ))
msg('\n{}{}{}'.format(red('BEFORE ['), p.before, red(']')))
msg('{}{}{}'.format(red('MATCH ['), p.after, red(']')))
NL = '\n'
@ -47,7 +47,7 @@ class MMGenPexpect:
pexpect_spawn = False,
send_delay = None,
timeout = None,
direct_exec = False ):
direct_exec = False):
self.pexpect_spawn = pexpect_spawn
self.send_delay = send_delay
@ -57,15 +57,15 @@ class MMGenPexpect:
self.exit_val = None
if direct_exec or cfg.direct_exec:
from subprocess import Popen,DEVNULL
from subprocess import Popen, DEVNULL
redir = DEVNULL if (no_output or not cfg.exact_output) else None
self.ep = Popen([args[0]] + args[1:], stderr=redir, env=spawn_env )
self.ep = Popen([args[0]] + args[1:], stderr=redir, env=spawn_env)
else:
timeout = int(timeout or cfg.pexpect_timeout or 0) or (60,5)[bool(cfg.debug_pexpect)]
timeout = int(timeout or cfg.pexpect_timeout or 0) or (60, 5)[bool(cfg.debug_pexpect)]
if pexpect_spawn:
self.p = pexpect.spawn(args[0],args[1:],encoding='utf8',timeout=timeout,env=spawn_env)
self.p = pexpect.spawn(args[0], args[1:], encoding='utf8', timeout=timeout, env=spawn_env)
else:
self.p = PopenSpawn(args,encoding='utf8',timeout=timeout,env=spawn_env)
self.p = PopenSpawn(args, encoding='utf8', timeout=timeout, env=spawn_env)
if cfg.exact_output:
self.p.logfile = sys.stdout
@ -76,20 +76,20 @@ class MMGenPexpect:
desc = 'key-address data',
check = True,
have_yes_opt = False):
self.passphrase(desc,pw)
self.passphrase(desc, pw)
if not have_yes_opt:
self.expect('Check key-to-address validity? (y/N): ',('n','y')[check])
self.expect('Check key-to-address validity? (y/N): ', ('n', 'y')[check])
def view_tx(self,view):
self.expect(r'View.* transaction.*\? .*: ',view,regex=True)
def view_tx(self, view):
self.expect(r'View.* transaction.*\? .*: ', view, regex=True)
if view not in 'n\n':
self.expect('to continue: ','\n')
self.expect('to continue: ', '\n')
def do_comment(self,add_comment,has_label=False):
p = ('Add a comment to transaction','Edit transaction comment')[has_label]
self.expect(f'{p}? (y/N): ',('n','y')[bool(add_comment)])
def do_comment(self, add_comment, has_label=False):
p = ('Add a comment to transaction', 'Edit transaction comment')[has_label]
self.expect(f'{p}? (y/N): ', ('n', 'y')[bool(add_comment)])
if add_comment:
self.expect('Comment: ',add_comment+'\n')
self.expect('Comment: ', add_comment+'\n')
def ok(self, exit_val=None):
if not self.pexpect_spawn:
@ -97,81 +97,81 @@ class MMGenPexpect:
self.p.read()
ret = self.p.wait()
if ret != (self.exit_val or exit_val or 0) and not cfg.coverage:
die( 'TestSuiteSpawnedScriptException', f'Spawned script exited with value {ret}' )
die('TestSuiteSpawnedScriptException', f'Spawned script exited with value {ret}')
if cfg.profile:
return
if not self.skip_ok:
m = 'OK\n' if ret == 0 else f'OK[{ret}]\n'
sys.stderr.write( green(m) if cfg.exact_output or cfg.verbose else ' '+m )
sys.stderr.write(green(m) if cfg.exact_output or cfg.verbose else ' '+m)
return self
def license(self):
if self.spawn_env.get('MMGEN_NO_LICENSE'):
return
self.expect("'w' for conditions and warranty info, or 'c' to continue: ",'c')
self.expect("'w' for conditions and warranty info, or 'c' to continue: ", 'c')
def label(self,label='Test Label (UTF-8) α'):
self.expect('Enter a wallet label, or hit ENTER for no label: ',label+'\n')
def label(self, label='Test Label (UTF-8) α'):
self.expect('Enter a wallet label, or hit ENTER for no label: ', label+'\n')
def usr_rand(self,num_chars):
def usr_rand(self, num_chars):
if cfg.usr_random:
self.interactive()
self.send('\n')
else:
rand_chars = list(getrandstr(num_chars,no_space=True))
rand_chars = list(getrandstr(num_chars, no_space=True))
vmsg_r('SEND ')
while rand_chars:
ch = rand_chars.pop(0)
msg_r(yellow(ch)+' ' if cfg.verbose else '+')
self.expect('left: ',ch,delay=0.005)
self.expect('ENTER to continue: ','\n')
self.expect('left: ', ch, delay=0.005)
self.expect('ENTER to continue: ', '\n')
def passphrase_new(self,desc,passphrase):
self.expect(f'Enter passphrase for {desc}: ',passphrase+'\n')
self.expect('Repeat passphrase: ',passphrase+'\n')
def passphrase_new(self, desc, passphrase):
self.expect(f'Enter passphrase for {desc}: ', passphrase+'\n')
self.expect('Repeat passphrase: ', passphrase+'\n')
def passphrase(self,desc,passphrase,pwtype=''):
def passphrase(self, desc, passphrase, pwtype=''):
if pwtype:
pwtype += ' '
self.expect(f'Enter {pwtype}passphrase for {desc}.*?: ',passphrase+'\n',regex=True)
self.expect(f'Enter {pwtype}passphrase for {desc}.*?: ', passphrase+'\n', regex=True)
def hash_preset(self,desc,preset=''):
def hash_preset(self, desc, preset=''):
self.expect(f'Enter hash preset for {desc}')
self.expect('or hit ENTER .*?:',str(preset)+'\n',regex=True)
self.expect('or hit ENTER .*?:', str(preset)+'\n', regex=True)
def written_to_file(self,desc,overwrite_unlikely=False,query='Overwrite? '):
def written_to_file(self, desc, overwrite_unlikely=False, query='Overwrite? '):
s1 = f'{desc} written to file '
s2 = query + "Type uppercase 'YES' to confirm: "
ret = self.expect(([s1,s2],s1)[overwrite_unlikely])
ret = self.expect(([s1, s2], s1)[overwrite_unlikely])
if ret == 1:
self.send('YES\n')
return self.expect_getend("Overwriting file '").rstrip("'")
self.expect(NL,nonl=True)
self.expect(NL, nonl=True)
outfile = self.p.before.strip().strip("'")
if cfg.debug_pexpect:
rmsg(f'Outfile [{outfile}]')
vmsg('{} file: {}'.format( desc, cyan(outfile.replace('"',"")) ))
vmsg('{} file: {}'.format(desc, cyan(outfile.replace('"', ""))))
return outfile
def hincog_create(self,hincog_bytes):
ret = self.expect(['Create? (Y/n): ',"'YES' to confirm: "])
def hincog_create(self, hincog_bytes):
ret = self.expect(['Create? (Y/n): ', "'YES' to confirm: "])
if ret == 0:
self.send('\n')
self.expect('Enter file size: ',str(hincog_bytes)+'\n')
self.expect('Enter file size: ', str(hincog_bytes)+'\n')
else:
self.send('YES\n')
return ret
def no_overwrite(self):
self.expect("Overwrite? Type uppercase 'YES' to confirm: ",'\n')
self.expect("Overwrite? Type uppercase 'YES' to confirm: ", '\n')
self.expect('Exiting at user request')
def expect_getend(self,s,regex=False):
self.expect(s,regex=regex,nonl=True)
def expect_getend(self, s, regex=False):
self.expect(s, regex=regex, nonl=True)
if cfg.debug_pexpect:
debug_pexpect_msg(self.p)
# readline() of partial lines doesn't work with PopenSpawn, so do this instead:
self.expect(NL,nonl=True,silent=True)
self.expect(NL, nonl=True, silent=True)
if cfg.debug_pexpect:
debug_pexpect_msg(self.p)
end = self.p.before.rstrip()
@ -182,21 +182,21 @@ class MMGenPexpect:
def interactive(self):
return self.p.interact() # interact() not available with popen_spawn
def kill(self,signal):
def kill(self, signal):
return self.p.kill(signal)
def match_expect_list(self,expect_list,greedy=False):
def match_expect_list(self, expect_list, greedy=False):
allrep = '.*' if greedy else '.*?'
expect = (
r'(\b|\s)' +
fr'\s{allrep}\s'.join(s.replace(r'.',r'\.').replace(' ',r'\s+') for s in expect_list) +
r'(\b|\s)' )
fr'\s{allrep}\s'.join(s.replace(r'.', r'\.').replace(' ', r'\s+') for s in expect_list) +
r'(\b|\s)')
import re
m = re.search( expect, self.read(strip_color=True), re.DOTALL )
m = re.search(expect, self.read(strip_color=True), re.DOTALL)
assert m, f'No match found for regular expression {expect!r}'
return m
def expect(self,s,t='',delay=None,regex=False,nonl=False,silent=False):
def expect(self, s, t='', delay=None, regex=False, nonl=False, silent=False):
if not silent:
if cfg.verbose:
@ -205,7 +205,7 @@ class MMGenPexpect:
msg_r('+')
try:
ret = (self.p.expect_exact,self.p.expect)[bool(regex)](s) if s else 0
ret = (self.p.expect_exact, self.p.expect)[bool(regex)](s) if s else 0
except pexpect.TIMEOUT as e:
if cfg.debug_pexpect:
raise
@ -217,20 +217,20 @@ class MMGenPexpect:
if cfg.debug_pexpect:
debug_pexpect_msg(self.p)
if cfg.verbose and not isinstance(s,str):
if cfg.verbose and not isinstance(s, str):
msg_r(f' ==> {ret} ')
if ret == -1:
die(4,f'Error. Expect returned {ret}')
die(4, f'Error. Expect returned {ret}')
else:
if t:
self.send(t,delay,s)
self.send(t, delay, s)
else:
if not nonl and not silent:
vmsg('')
return ret
def send(self,t,delay=None,s=False):
def send(self, t, delay=None, s=False):
delay = delay or self.send_delay
if delay:
time.sleep(delay)
@ -241,12 +241,12 @@ class MMGenPexpect:
if cfg.verbose:
ls = '' if cfg.debug or not s else ' '
es = '' if s else ' '
yt = yellow('{!r}'.format( t.replace('\n',r'\n') ))
yt = yellow('{!r}'.format(t.replace('\n', r'\n')))
msg(f'{ls}SEND {es}{yt}')
return ret
def read(self,n=-1,strip_color=False):
return strip_ansi_escapes(self.p.read(n)).replace('\r','') if strip_color else self.p.read(n)
def read(self, n=-1, strip_color=False):
return strip_ansi_escapes(self.p.read(n)).replace('\r', '') if strip_color else self.p.read(n)
def close(self):
if self.pexpect_spawn:

View file

@ -12,7 +12,7 @@
test.include.test_init: Initialization module for test scripts
"""
import sys,os
import sys, os
from pathlib import PurePath
os.environ['MMGEN_TEST_SUITE'] = '1'
repo_root = str(PurePath(*PurePath(__file__).parts[:-3]))