test suite: use match statement where practicable (21 files)

This commit is contained in:
The MMGen Project 2025-09-23 09:20:53 +00:00
commit 4573e170ed
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
21 changed files with 397 additions and 386 deletions

View file

@ -143,29 +143,30 @@ class CmdTestAutosignBase(CmdTestBase):
raise
def _create_removable_device(self):
# Create the filesystem image used as the simulated removable device.
#   linux:  create and attach self.txdev, format it as ext2 with the label
#           self.asi.dev_label and root owner set to the current uid/gid,
#           then detach it again.
#   darwin: create a 10M exFAT image at self.fs_image_path with `hdiutil`.
# NOTE(review): this hunk appears to list the pre-change if/elif body first,
# followed by the post-change `match sys.platform` body.
# NOTE(review): neither form has a branch for other platforms, so the method
# silently does nothing there.
if sys.platform == 'linux':
self.txdev.create()
self.txdev.attach(silent=True)
args = [
'-E', 'root_owner={}:{}'.format(os.getuid(), os.getgid()),
'-L', self.asi.dev_label,
str(self.txdev.img_path)]
redir = DEVNULL
# try the absolute path first, then fall back to a PATH lookup
for cmd in ('/sbin/mkfs.ext2', 'mkfs.ext2'):
try:
run([cmd] + args, stdout=redir, stderr=redir, check=True)
break
# NOTE(review): bare except also catches KeyboardInterrupt/SystemExit; the
# error is re-raised only when the last candidate ('mkfs.ext2') fails
except:
if cmd == 'mkfs.ext2':
raise
self.txdev.detach(silent=True)
elif sys.platform == 'darwin':
cmd = [
'hdiutil', 'create', '-size', '10M', '-fs', 'exFAT',
'-volname', self.asi.dev_label,
str(self.fs_image_path)]
# hdiutil's stdout is shown unless the test runner is in quiet mode
redir = DEVNULL if self.tr.quiet else None
run(cmd, stdout=redir, check=True)
match sys.platform:
case 'linux':
self.txdev.create()
self.txdev.attach(silent=True)
args = [
'-E', 'root_owner={}:{}'.format(os.getuid(), os.getgid()),
'-L', self.asi.dev_label,
str(self.txdev.img_path)]
redir = DEVNULL
# try the absolute path first, then fall back to a PATH lookup
for cmd in ('/sbin/mkfs.ext2', 'mkfs.ext2'):
try:
run([cmd] + args, stdout=redir, stderr=redir, check=True)
break
except:
if cmd == 'mkfs.ext2':
raise
self.txdev.detach(silent=True)
case 'darwin':
cmd = [
'hdiutil', 'create', '-size', '10M', '-fs', 'exFAT',
'-volname', self.asi.dev_label,
str(self.fs_image_path)]
redir = DEVNULL if self.tr.quiet else None
run(cmd, stdout=redir, check=True)
def _macOS_mount_fs_image(self, loc):
time.sleep(0.2)
@ -261,32 +262,35 @@ class CmdTestAutosignBase(CmdTestBase):
if self.live:
return
loc = getattr(self, asi)
if sys.platform == 'linux':
self._set_e2label(loc.dev_label)
self.txdev.attach()
for _ in range(20):
if loc.device_inserted:
break
time.sleep(0.1)
else:
die(2, f'device insert timeout exceeded {loc.dev_label}')
elif sys.platform == 'darwin':
self._macOS_mount_fs_image(loc)
match sys.platform:
case 'linux':
self._set_e2label(loc.dev_label)
self.txdev.attach()
for _ in range(20):
if loc.device_inserted:
break
time.sleep(0.1)
else:
die(2, f'device insert timeout exceeded {loc.dev_label}')
case 'darwin':
self._macOS_mount_fs_image(loc)
def remove_device(self, asi='asi'):
# Simulate removal of the removable device; a no-op when self.live is set.
# `asi` is the name of the attribute on self holding the object to operate on.
#   linux:  detach self.txdev, then poll loc.device_inserted up to 20 times
#           at 0.1s intervals and die() if it never becomes false.
#   darwin: eject the disk identified by loc.dev_label.
# NOTE(review): this hunk appears to list the pre-change if/elif body first,
# followed by the post-change `match sys.platform` body.
if self.live:
return
loc = getattr(self, asi)
if sys.platform == 'linux':
self.txdev.detach()
for _ in range(20):
if not loc.device_inserted:
break
time.sleep(0.1)
# for/else: reached only when the loop finished without a break (timeout)
else:
die(2, f'device remove timeout exceeded {loc.dev_label}')
elif sys.platform == 'darwin':
self._macOS_eject_disk(loc.dev_label)
match sys.platform:
case 'linux':
self.txdev.detach()
for _ in range(20):
if not loc.device_inserted:
break
time.sleep(0.1)
# for/else: reached only when the loop finished without a break (timeout)
else:
die(2, f'device remove timeout exceeded {loc.dev_label}')
case 'darwin':
self._macOS_eject_disk(loc.dev_label)
def _mount_ops(self, loc, cmd, *args, **kwargs):
# Look up the attribute named `loc` on self, then the method named `cmd` on
# that object, and call it with the given arguments plus
# silent=self.silent_mount; the method's return value is passed through.
return getattr(getattr(self, loc), cmd)(*args, silent=self.silent_mount, **kwargs)
@ -837,18 +841,19 @@ class CmdTestAutosign(CmdTestAutosignBase):
# create or delete 2 bad tx files
self.spawn(msg_only=True)
fns = [joinpath(self.asi.tx_dir, f'bad{n}.rawtx') for n in (1, 2)]
if op == 'create':
for fn in fns:
with open(fn, 'w') as fp:
fp.write('bad tx data\n')
self.bad_tx_count = 2
elif op == 'remove':
for fn in fns:
try:
os.unlink(fn)
except:
pass
self.bad_tx_count = 0
match op:
case 'create':
for fn in fns:
with open(fn, 'w') as fp:
fp.write('bad tx data\n')
self.bad_tx_count = 2
case 'remove':
for fn in fns:
try:
os.unlink(fn)
except:
pass
self.bad_tx_count = 0
self.do_umount()
self.remove_device()
return 'ok'
@ -871,26 +876,29 @@ class CmdTestAutosign(CmdTestAutosignBase):
self.insert_device()
self.do_mount()
os.makedirs(destdir, exist_ok=True)
if op.endswith('_invalid'):
fn = os.path.join(destdir, 'DEADBE[BTC].rawmsg.json')
if op == 'create_invalid':
with open(fn, 'w') as fp:
fp.write('bad data\n')
self.bad_msg_count += 1
elif op == 'remove_invalid':
os.unlink(fn)
self.bad_msg_count -= 1
else:
for fn in self.ref_msgfiles:
if op == 'copy':
match op:
case 'create_invalid' | 'remove_invalid':
fn = os.path.join(destdir, 'DEADBE[BTC].rawmsg.json')
if op == 'create_invalid':
with open(fn, 'w') as fp:
fp.write('bad data\n')
self.bad_msg_count += 1
else:
os.unlink(fn)
self.bad_msg_count -= 1
case 'copy':
for fn in self.ref_msgfiles:
if os.path.basename(fn) == 'ED405C[BTC].rawmsg.json': # contains bad Seed ID
self.bad_msg_count += 1
else:
self.good_msg_count += 1
imsg(f'Copying: {fn} -> {destdir}')
shutil.copy2(fn, destdir)
elif op == 'remove_signed':
case 'remove_signed':
for fn in self.ref_msgfiles:
os.unlink(os.path.join(destdir, os.path.basename(fn).replace('rawmsg', 'sigmsg')))
self.do_umount()
self.remove_device()
return 'ok'

View file

@ -95,35 +95,36 @@ amt2 = '888.111122223333444455'
def set_vbals(daemon_id):
# Set the module-level expected-balance strings vbal1..vbal7 and vbal9
# according to the daemon in use: one set for 'geth', one for 'reth', and a
# fallback set for any other daemon_id.
# NOTE(review): this hunk appears to list the pre-change if/elif body first,
# followed by the post-change `match daemon_id` body.
global vbal1, vbal2, vbal3, vbal4, vbal5, vbal6, vbal7, vbal9
if daemon_id == 'geth':
vbal1 = '1.2288334'
vbal2 = '99.996560752'
vbal3 = '1.2314176'
vbal4 = '127.0287834'
vbal5 = '999904.14775104212345678'
vbal6 = '999904.14880104212345678'
vbal7 = '999902.91891764212345678'
vbal9 = '1.2262504'
elif daemon_id == 'reth':
vbal1 = '1.2288334'
vbal2 = '99.996560752'
# NOTE(review): vbal3 is assigned twice in a row; as shown, the second value
# wins and the first line has no effect - confirm whether it is a leftover
vbal3 = '1.23142525'
vbal3 = '1.2314176'
vbal4 = '127.0287834'
vbal5 = '999904.14775104212345678'
vbal6 = '999904.14880104212345678'
vbal7 = '999902.91891764212345678'
vbal9 = '1.2262504'
else:
vbal1 = '1.2288396'
vbal2 = '99.997088092'
# NOTE(review): same double assignment of vbal3 as in the 'reth' branch
vbal3 = '1.23142525'
vbal3 = '1.2314246'
vbal4 = '127.0287896'
vbal5 = '999904.14828458212345678'
vbal6 = '999904.14933458212345678'
vbal7 = '999902.91944498212345678'
vbal9 = '1.226261'
match daemon_id:
case 'geth':
vbal1 = '1.2288334'
vbal2 = '99.996560752'
vbal3 = '1.2314176'
vbal4 = '127.0287834'
vbal5 = '999904.14775104212345678'
vbal6 = '999904.14880104212345678'
vbal7 = '999902.91891764212345678'
vbal9 = '1.2262504'
case 'reth':
vbal1 = '1.2288334'
vbal2 = '99.996560752'
# NOTE(review): vbal3 is assigned twice in a row; the second value wins
vbal3 = '1.23142525'
vbal3 = '1.2314176'
vbal4 = '127.0287834'
vbal5 = '999904.14775104212345678'
vbal6 = '999904.14880104212345678'
vbal7 = '999902.91891764212345678'
vbal9 = '1.2262504'
case _:
vbal1 = '1.2288396'
vbal2 = '99.997088092'
# NOTE(review): vbal3 is assigned twice in a row; the second value wins
vbal3 = '1.23142525'
vbal3 = '1.2314246'
vbal4 = '127.0287896'
vbal5 = '999904.14828458212345678'
vbal6 = '999904.14933458212345678'
vbal7 = '999902.91944498212345678'
vbal9 = '1.226261'
coin = cfg.coin
@ -275,16 +276,17 @@ class CmdTestEthdevMethods:
] + (['--wait'] if mmgen_cmd == 'txdo' else [])
contract_addr = self._get_contract_address(dfl_devaddr)
if key == 'Token':
self.write_to_tmpfile(f'token_addr{num}', contract_addr+'\n')
elif key == 'thorchain_router':
from mmgen.fileutil import write_data_to_file
write_data_to_file(
self.cfg,
thorchain_router_addr_file,
contract_addr + '\n',
ask_overwrite = False,
quiet = True)
match key:
case 'Token':
self.write_to_tmpfile(f'token_addr{num}', contract_addr+'\n')
case 'thorchain_router':
from mmgen.fileutil import write_data_to_file
write_data_to_file(
self.cfg,
thorchain_router_addr_file,
contract_addr + '\n',
ask_overwrite = False,
quiet = True)
if mmgen_cmd == 'txdo':
args += ['-k', keyfile]
@ -395,10 +397,11 @@ class CmdTestEthdevMethods:
silence()
usr_addrs = list(map(gen_addr, usr_mmaddrs))
if op == 'show_bals':
await show_bals(await self.rpc)
elif op == 'fund_user':
await fund_user(await self.rpc)
match op:
case 'show_bals':
await show_bals(await self.rpc)
case 'fund_user':
await fund_user(await self.rpc)
end_silence()
return 'ok'
@ -907,10 +910,11 @@ class CmdTestEthdev(CmdTestEthdevMethods, CmdTestBase, CmdTestShared):
t = self.spawn(
'mmgen-cli',
[f'--coin={self.proto.coin}', '--regtest=1', 'eth_getBalance', '0x'+dfl_devaddr, 'latest'])
if self.daemon.id == 'geth':
t.expect('0x33b2e3c91ec0e9113986000')
elif self.daemon.id == 'reth':
t.expect('0xd3c21bab45fb313f0000')
match self.daemon.id:
case 'geth':
t.expect('0x33b2e3c91ec0e9113986000')
case 'reth':
t.expect('0xd3c21bab45fb313f0000')
return t
async def _wallet_upgrade(self, src_fn, expect1, expect2=None):

View file

@ -22,15 +22,16 @@ class EtherscanServer(HTTPD):
content_type = 'text/html'
def make_response_body(self, method, environ):
if method == 'GET':
target = 'form'
elif method == 'POST':
target = 'result'
length = int(environ.get('CONTENT_LENGTH', '0'))
qs = environ['wsgi.input'].read(length).decode()
tx = [s for s in qs.split('&') if 'RawTx=' in s][0].split('=')[1]
keccak_256 = get_keccak()
txid = '0x' + keccak_256(bytes.fromhex(tx[2:])).hexdigest()
match method:
case 'GET':
target = 'form'
case 'POST':
target = 'result'
length = int(environ.get('CONTENT_LENGTH', '0'))
qs = environ['wsgi.input'].read(length).decode()
tx = [s for s in qs.split('&') if 'RawTx=' in s][0].split('=')[1]
keccak_256 = get_keccak()
txid = '0x' + keccak_256(bytes.fromhex(tx[2:])).hexdigest()
with open(f'test/ref/ethereum/etherscan-{target}.html') as fh:
text = fh.read()

View file

@ -47,20 +47,21 @@ def stealth_mnemonic_entry(t, mne, mn, entry_mode, pad_entry=False):
ret.append(w)
return ret
if entry_mode == 'fixed':
mn = ['bkr'] + mn[:5] + ['nfb'] + mn[5:]
ssl = mne.uniq_ss_len
def gen_mn():
for w in mn:
if len(w) >= ssl:
yield w[:ssl]
else:
yield w[0] + 'z\b' + '#' * (ssl-len(w)) + w[1:]
mn = list(gen_mn())
elif entry_mode in ('full', 'short'):
mn = ['fzr'] + mn[:5] + ['grd', 'grdbxm'] + mn[5:]
mn = pad_mnemonic(mn, mne.em.ss_len)
mn[10] = '@#$%*##' + mn[10]
match entry_mode:
case 'fixed':
mn = ['bkr'] + mn[:5] + ['nfb'] + mn[5:]
ssl = mne.uniq_ss_len
def gen_mn():
for w in mn:
if len(w) >= ssl:
yield w[:ssl]
else:
yield w[0] + 'z\b' + '#' * (ssl-len(w)) + w[1:]
mn = list(gen_mn())
case 'full' | 'short':
mn = ['fzr'] + mn[:5] + ['grd', 'grdbxm'] + mn[5:]
mn = pad_mnemonic(mn, mne.em.ss_len)
mn[10] = '@#$%*##' + mn[10]
wnum = 1
p_ok, p_err = mne.word_prompt

View file

@ -88,10 +88,13 @@ class CmdTestPexpect:
def view_tx(self, view):
# Answer the "View ... transaction?" prompt with `view`, then handle the
# follow-up: when `view` is 'v' and cfg.pexpect_spawn is set, wait for 'END'
# and send 'q'; for any reply other than 'v', 'n' or newline, wait for the
# 'to continue: ' prompt and send a newline.
# NOTE(review): this hunk appears to list the pre-change if/elif body first,
# followed by the post-change `match view` body.
self.expect(r'View.* transaction.*\? .*: ', view, regex=True)
if cfg.pexpect_spawn and view == 'v':
self.expect('END', 'q')
# NOTE(review): `not in 'vn\n'` is a substring test, so '' (and 'vn', 'n\n')
# also skip the continue prompt here, whereas the match form below accepts
# only the exact strings 'v', 'n' and '\n' - an empty `view` behaves
# differently between the two forms
elif view not in 'vn\n':
self.expect('to continue: ', '\n')
match view:
case 'v' if cfg.pexpect_spawn:
self.expect('END', 'q')
case 'v' | 'n' | '\n':
pass
case _:
self.expect('to continue: ', '\n')
def do_comment(self, add_comment, has_label=False):
p = ('Add a comment to transaction', 'Edit transaction comment')[has_label]

View file

@ -496,26 +496,28 @@ class CmdTestRunner:
print(r+('\n'+r).join(self.warnings))
def process_retval(self, cmd, ret):
# Act on the value returned by a test command:
#   CmdTestPexpect object -> call its ok() with self.exit_val, count the command
#   'ok'                  -> print OK, count the command
#   'skip'                -> nothing
#   'skip_msg'            -> print 'SKIP'
#   'silent'              -> count the command without output
#   'error'               -> die with a failure message
#   ('skip_warn', text)   -> record a warning (also written to the log if enabled)
#   anything else         -> die, reporting the unexpected return value
# NOTE(review): in this hunk the pre-change if/elif lines and the post-change
# `match ret` lines are interleaved rather than listed one after the other.
if type(ret).__name__ == 'CmdTestPexpect':
ret.ok(exit_val=self.exit_val)
self.cmd_total += 1
elif ret == 'ok':
ok()
self.cmd_total += 1
elif ret in ('skip', 'skip_msg', 'silent'):
if ret == 'silent':
match ret:
# the class is compared by name rather than with a class pattern
case x if type(x).__name__ == 'CmdTestPexpect':
ret.ok(exit_val=self.exit_val)
self.cmd_total += 1
elif ret == 'skip_msg':
case 'ok':
ok()
self.cmd_total += 1
case 'skip':
pass
case 'skip_msg':
ok('SKIP')
elif ret == 'error':
die(2, red(f'\nTest {self.tg.test_name!r} failed'))
elif isinstance(ret, tuple) and ret[0] == 'skip_warn':
wmsg = 'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n')))
self.skipped_warnings.append(wmsg)
if self.logging:
self.log_fd.write(f'WARNING: {wmsg}\n')
else:
die(2, f'{cmd!r} returned {ret}')
case 'silent':
self.cmd_total += 1
case 'error':
die(2, red(f'\nTest {self.tg.test_name!r} failed'))
# NOTE(review): this sequence pattern matches any two-element sequence (lists
# too) and nothing longer, whereas the isinstance() test above accepted only
# tuples, of any length
case (x, _) if x == 'skip_warn':
wmsg = 'Test {!r} was skipped:\n {}'.format(cmd, '\n '.join(ret[1].split('\n')))
self.skipped_warnings.append(wmsg)
if self.logging:
self.log_fd.write(f'WARNING: {wmsg}\n')
case _:
die(2, f'{cmd!r} returned {ret}')
def warn(self, text):
ymsg(text)

View file

@ -823,22 +823,22 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
ocls = get_wallet_cls(fmt_code=out_fmt)
if ocls.enc and ocls.type != 'brain':
t.passphrase_new('new '+ocls.desc, self.wpasswd)
t.usr_rand(self.usr_rand_chars)
if ocls.type.startswith('incog'):
m = 'Encrypting random data from your operating system with ephemeral key'
t.expect(m)
t.expect(m)
incog_id = t.expect_getend('New Incog Wallet ID: ')
t.expect(m)
if ocls.type == 'incog_hidden':
self.write_to_tmpfile(incog_id_fn, incog_id)
t.hincog_create(hincog_bytes)
elif ocls.type == 'mmgen':
t.label()
if ocls.enc:
if ocls.type != 'brain':
t.passphrase_new('new '+ocls.desc, self.wpasswd)
t.usr_rand(self.usr_rand_chars)
match ocls.type:
case 'incog' | 'incog_hex' | 'incog_hidden':
m = 'Encrypting random data from your operating system with ephemeral key'
t.expect(m)
t.expect(m)
incog_id = t.expect_getend('New Incog Wallet ID: ')
t.expect(m)
if ocls.type == 'incog_hidden':
self.write_to_tmpfile(incog_id_fn, incog_id)
t.hincog_create(hincog_bytes)
case 'mmgen':
t.label()
return t.written_to_file(capfirst(ocls.desc)), t

View file

@ -111,14 +111,15 @@ class CmdTestMisc(CmdTestBase):
files = get_file_with_ext('test/ref/monero', 'tx', no_dot=True, delete=False, return_list=True)
t = self.spawn('mmgen-xmrwallet', [op] + files)
res = t.read(strip_color=True)
if op == 'txview':
for s in (
'Amount: 0.74 XMR',
'Dest: 56VQ9M6k',
):
assert s in res, f'{s} not in {res}'
elif op == 'txlist':
assert re.search('3EBD06-.*D94583-.*8BFA29-', res, re.DOTALL)
match op:
case 'txview':
for s in (
'Amount: 0.74 XMR',
'Dest: 56VQ9M6k',
):
assert s in res, f'{s} not in {res}'
case 'txlist':
assert re.search('3EBD06-.*D94583-.*8BFA29-', res, re.DOTALL)
return t
def xmrwallet_txlist(self):

View file

@ -674,14 +674,14 @@ class CmdTestXMRWallet(CmdTestBase):
ignore_battery = True, # ignore battery state (on laptop)
miner_address = addr, # account address to mine to
threads_count = 1) # number of mining threads to run
status = self.get_status(ret)
if status == 'OK':
return True
elif status == 'BUSY':
await asyncio.sleep(5)
omsg('Daemon busy. Attempting to start mining...')
else:
die(2, f'Monerod returned status {status}')
match self.get_status(ret):
case 'OK':
return True
case 'BUSY':
await asyncio.sleep(5)
omsg('Daemon busy. Attempting to start mining...')
case status:
die(2, f'Monerod returned status {status}')
die(2, 'Max retries exceeded')
async def stop_mining(self):

View file

@ -15,20 +15,21 @@ from ..include.common import cfg, silence, end_silence, restart_test_daemons, st
def get_obj(coin, network, msghash_type):
# Build a NewMsg object for the given coin, network and message hash type,
# using a fixed message text and a coin-specific address list string
# ('bch', 'eth', or a default list for every other coin).
# NOTE(review): in this hunk the pre-change lines (`addrlists` computed with
# if/elif) and the post-change lines (nested get_addrlists() helper using
# `match coin`) are interleaved.
if coin == 'bch':
addrlists = 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1'
elif coin == 'eth':
addrlists = 'DEADBEEF:E:1-20 98831F3A:E:8,2 A091ABAA:E:111'
else:
# A091ABAA = 98831F3A:5S
addrlists = 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1'
def get_addrlists():
match coin:
case 'bch':
return 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1'
case 'eth':
return 'DEADBEEF:E:1-20 98831F3A:E:8,2 A091ABAA:E:111'
case _:
return 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1'
return NewMsg(
cfg = cfg,
coin = coin,
network = network,
message = '08/Jun/2021 Bitcoin Law Enacted by El Salvador Legislative Assembly',
addrlists = addrlists,
addrlists = get_addrlists(),
msghash_type = msghash_type)
def print_total(n):

View file

@ -490,57 +490,44 @@ def get_protos(proto, addr_type, toolname):
def parse_args():
# Parse the two positional arguments in cfg._args into a 'parsed_args'
# namedtuple with fields test, gen1, gen2, rounds, tool, all_backends, dumpfile.
# Three invocation forms are recognized:
#   <int> <int>          -> test 'speed' (backend number, rounds)
#   <int> <readable file>-> test 'dump'  (backend number, dump file)
#   <a>:<b> <int>        -> test 'ab'    (a: backend number or 'all';
#                                          b: backend number or external tool)
# NOTE(review): in this hunk the pre-change if/elif lines and the post-change
# `match cfg._args` lines are interleaved.
if len(cfg._args) != 2:
cfg._usage()
all_backends, gen2, tool = (False, None, None)
arg1, arg2 = cfg._args
gen1, gen2, rounds = (0, 0, 0)
tool, all_backends, dumpfile = (None, None, None)
match cfg._args:
case (gen1, rounds) if is_int(gen1) and is_int(rounds):
test, dumpfile = ('speed', None)
case (gen1, dumpfile) if is_int(gen1) and os.access(dumpfile, os.R_OK):
test, rounds = ('dump', None)
# NOTE(review): str.split() never returns an empty list, so the walrus
# operand is always truthy; it is used here only to rebind `ab` to the
# list of colon-separated parts before the case body runs
case (ab, rounds) if (ab := ab.split(':')) and is_int(rounds):
test, dumpfile = ('ab', None)
if is_int(arg1) and is_int(arg2):
test = 'speed'
gen1 = arg1
rounds = arg2
elif is_int(arg1) and os.access(arg2, os.R_OK):
test = 'dump'
gen1 = arg1
dumpfile = arg2
else:
test = 'ab'
rounds = arg2
match ab[0]:
case x if is_int(x):
gen1 = x
case 'all':
all_backends = True
gen1 = None
case _:
die(1, "First part of first argument must be a generator backend number or 'all'")
if not is_int(arg2):
die(1, 'Second argument must be dump filename or integer rounds specification')
# pre-change form: a first argument without exactly one colon is rejected
# here with an explanatory message
try:
a, b = arg1.split(':')
except:
die(1, 'First argument must be a generator backend number or two colon-separated arguments')
if is_int(a):
gen1 = a
else:
if a == 'all':
all_backends = True
else:
die(1, "First part of first argument must be a generator backend number or 'all'")
if is_int(b):
if cfg.all_coins:
die(1, '--all-coins must be used with external tool only')
gen2 = b
else:
tool = b
ext_progs = list(cinfo.external_tests[cfg._proto.network]) + ['ext']
if b not in ext_progs:
die(1, f'Second part of first argument must be a generator backend number or one of {ext_progs}')
# NOTE(review): when the first argument contains no colon, `ab` has a single
# element and ab[1] raises IndexError instead of producing the die() message
# of the pre-change form; parts after a second colon are silently ignored
match ab[1]:
case x if is_int(x):
if cfg.all_coins:
die(1, '--all-coins must be used with external tool only')
gen2 = x
case x:
tool = x
ext_progs = list(cinfo.external_tests[cfg._proto.network]) + ['ext']
if tool not in ext_progs:
die(1, f'Second part of first argument must be a generator backend number or one of {ext_progs}')
case _:
cfg._usage()
return namedtuple('parsed_args',
['test', 'gen1', 'gen2', 'rounds', 'tool', 'all_backends', 'dumpfile'])(
test,
# NOTE(review): the `int(...) or None` lines map a value of 0 to None; the
# `None if ... is None else int(...)` lines keep 0 as 0
int(gen1) or None,
int(gen2) or None,
int(rounds) or None,
None if gen1 is None else int(gen1),
None if gen2 is None else int(gen2),
None if rounds is None else int(rounds),
tool,
all_backends,
dumpfile)
@ -551,17 +538,19 @@ def main():
addr_type = MMGenAddrType(proto=proto, id_str=cfg.type or proto.dfl_mmtype)
if scfg.test == 'ab':
protos = get_protos(proto, addr_type, scfg.tool) if cfg.all_coins else [proto]
for p in protos:
ab_test(p, scfg)
else:
kg = KeyGenerator(cfg, proto, addr_type.pubkey_type, backend=scfg.gen1)
ag = AddrGenerator(cfg, proto, addr_type)
if scfg.test == 'speed':
speed_test(proto, kg, ag, scfg.rounds)
elif scfg.test == 'dump':
dump_test(proto, kg, ag, scfg.dumpfile)
match scfg.test:
case 'ab':
protos = get_protos(proto, addr_type, scfg.tool) if cfg.all_coins else [proto]
for p in protos:
ab_test(p, scfg)
case 'speed' | 'dump':
kg = KeyGenerator(cfg, proto, addr_type.pubkey_type, backend=scfg.gen1)
ag = AddrGenerator(cfg, proto, addr_type)
match scfg.test:
case 'speed':
speed_test(proto, kg, ag, scfg.rounds)
case 'dump':
dump_test(proto, kg, ag, scfg.dumpfile)
if saved_results:
import json

View file

@ -387,21 +387,23 @@ def create_addrpairs(proto, mmtype, num):
for m in range(num)]
def VirtBlockDevice(img_path, size):
# Factory: return the platform-specific virtual block device object for
# `img_path` and `size` (VirtBlockDeviceLinux on 'linux',
# VirtBlockDeviceMacOS on 'darwin').
# NOTE(review): this hunk appears to list the pre-change if/elif body first,
# followed by the post-change `match sys.platform` body.
# NOTE(review): neither form handles other platforms, so the function
# returns None implicitly there.
if sys.platform == 'linux':
return VirtBlockDeviceLinux(img_path, size)
elif sys.platform == 'darwin':
return VirtBlockDeviceMacOS(img_path, size)
match sys.platform:
case 'linux':
return VirtBlockDeviceLinux(img_path, size)
case 'darwin':
return VirtBlockDeviceMacOS(img_path, size)
class VirtBlockDeviceBase:
@property
def dev(self):
# Return the single device associated with self.img_path, as reported by
# self._get_associations(); die() if there is none or more than one.
# NOTE(review): this hunk appears to list the pre-change len()-based body
# first, followed by the post-change `match` body.
res = self._get_associations()
if len(res) < 1:
die(2, f'No device associated with {self.img_path}')
elif len(res) > 1:
die(2, f'More than one device associated with {self.img_path}')
return res[0]
# NOTE(review): sequence patterns match only sequence types such as list or
# tuple; presumably _get_associations() returns a list - if it returned any
# other kind of object, `case _` would report "More than one device"
match self._get_associations():
case [x]:
return x
case []:
die(2, f'No device associated with {self.img_path}')
case _:
die(2, f'More than one device associated with {self.img_path}')
def try_detach(self):
try:

View file

@ -35,16 +35,16 @@ msg(f'Usr cfg file: {os.path.relpath(cf_usr.fn)}')
msg(f'Sys cfg file: {os.path.relpath(cf_sys.fn)}')
msg(f'Sample cfg file: {os.path.relpath(cf_sample.fn)}')
if op:
if op == 'print_cfg':
match op:
case 'print_cfg':
for name in args:
msg('{} {}'.format(name+':', getattr(cfg, name)))
elif op == 'parse_test':
case 'parse_test':
ps = cf_sample.get_lines()
msg(f'parsed chunks: {len(ps)}')
pu = cf_usr.get_lines()
msg('usr cfg: {}'.format(' '.join(f'{i.name}={i.value}' for i in pu)))
elif op == 'coin_specific_vars':
case 'coin_specific_vars':
for varname in args:
msg('{}.{}.{}: {}'.format(
cfg._proto.coin.lower(),
@ -52,11 +52,11 @@ if op:
varname,
getattr(cfg._proto, varname)
))
elif op == 'autoset_opts':
case 'autoset_opts':
assert cfg.rpc_backend == 'aiohttp', "cfg.rpc_backend != 'aiohttp'"
elif op == 'autoset_opts_cmdline':
case 'autoset_opts_cmdline':
assert cfg.rpc_backend == 'curl', "cfg.rpc_backend != 'curl'"
elif op == 'mnemonic_entry_modes':
case 'mnemonic_entry_modes':
from mmgen.mn_entry import mn_entry
msg('mnemonic_entry_modes: {}\nmmgen: {}\nbip39: {}'.format(
cfg.mnemonic_entry_modes,

View file

@ -18,28 +18,25 @@ opts_data = {
cfg = Config(opts_data=opts_data)
cmd_args = cfg._args
cmd = cmd_args[0]
if cmd == 'passphrase':
from mmgen.ui import get_words_from_user
pw = get_words_from_user(
cfg,
('Enter passphrase: ', 'Enter passphrase (echoed): ')[bool(cfg.echo_passphrase)] )
msg('Entered: {}'.format(' '.join(pw)))
elif cmd in ('get_char', 'line_input'):
from mmgen.term import get_char
from mmgen.ui import line_input
from ast import literal_eval
func_args = literal_eval(cmd_args[1])
msg(f'\n term: {get_char.__self__.__name__}')
msg(f' cfg.hold_protect_disable: {cfg.hold_protect_disable}')
if cmd == 'line_input':
func_args.update({'cfg':cfg})
msg(' Calling {name}({args})'.format(
name = cmd,
args = ', '.join(f'{k}={v!r}' for k, v in func_args.items())
))
ret = locals()[cmd](**func_args)
msg(f' ==> {ret!r}')
match cfg._args:
case ['passphrase']:
from mmgen.ui import get_words_from_user
pw = get_words_from_user(
cfg,
('Enter passphrase: ', 'Enter passphrase (echoed): ')[bool(cfg.echo_passphrase)] )
msg('Entered: {}'.format(' '.join(pw)))
case 'get_char' | 'line_input' as cmd, args:
from mmgen.term import get_char
from mmgen.ui import line_input
from ast import literal_eval
func_args = literal_eval(args)
msg(f'\n term: {get_char.__self__.__name__}')
msg(f' cfg.hold_protect_disable: {cfg.hold_protect_disable}')
if cmd == 'line_input':
func_args.update({'cfg':cfg})
msg(' Calling {name}({args})'.format(
name = cmd,
args = ', '.join(f'{k}={v!r}' for k, v in func_args.items())
))
ret = locals()[cmd](**func_args)
msg(f' ==> {ret!r}')

View file

@ -21,16 +21,18 @@ commands = [
'get_char_one',
'get_char_one_raw',
]
if sys.platform in ('linux', 'darwin'):
commands.extend([
'get_char',
'get_char_immed_chars',
'get_char_raw',
])
elif sys.platform == 'win32':
commands.extend([
'get_char_one_char_immed_chars',
])
match sys.platform:
case 'linux' | 'darwin':
commands.extend([
'get_char',
'get_char_immed_chars',
'get_char_raw',
])
case 'win32':
commands.extend([
'get_char_one_char_immed_chars',
])
opts_data = {
'text': {

View file

@ -12,29 +12,27 @@ from mmgen.term import init_term, get_term
init_term(cfg)
term = get_term()
if sys.argv[1] == 'echo':
match sys.argv[1]:
case 'echo':
from mmgen.ui import line_input
from mmgen.term import get_char_raw
from mmgen.ui import line_input
from mmgen.term import get_char_raw
def test_noecho():
term.init(noecho=True)
ret = line_input(cfg, 'noecho> ')
msg(f'==> [{ret.upper()}]')
get_char_raw()
def test_noecho():
term.init(noecho=True)
ret = line_input(cfg, 'noecho> ')
msg(f'==> [{ret.upper()}]')
get_char_raw()
def test_echo():
term.set('echo')
ret = line_input(cfg, 'echo> ')
msg(f'==> [{ret.upper()}]')
def test_echo():
term.set('echo')
ret = line_input(cfg, 'echo> ')
msg(f'==> [{ret.upper()}]')
test_noecho()
test_echo()
test_noecho()
test_noecho()
test_echo()
test_noecho()
elif sys.argv[1] == 'cleanup':
term.register_cleanup()
import tty
tty.setcbreak(term.stdin_fd)
case 'cleanup':
term.register_cleanup()
import tty
tty.setcbreak(term.stdin_fd)

View file

@ -68,10 +68,11 @@ class unit_tests:
for vec in self.vectors[wl_id]['idx_minimal']:
chk = vec[1]
b = m.idx(vec[0], 'minimal')
if chk is False:
assert b is None, (b, None)
elif chk is None:
assert type(b) is tuple, (type(b), tuple)
elif type(chk) is int:
assert b == chk, (b, chk)
match chk:
case False:
assert b is None, (b, None)
case None:
assert type(b) is tuple, (type(b), tuple)
case int(x):
assert b == x, (b, x)
return True

View file

@ -143,16 +143,17 @@ def test_tx(src, cfg, vec):
assert src in ('parse', 'build', 'swapbuild')
if src == 'parse':
tx_in = open(os.path.join('test/ref/thorchain', vec.fn), 'br').read()
tx = RuneTx.loads(tx_in)
if not parms.from_addr:
ymsg(f'Warning: missing test vector data for {vec.fn}')
assert bytes(tx) == tx_in
elif src == 'build':
tx = build_tx(cfg, proto, parms, null_fee=vec.null_fee)
elif src == 'swapbuild':
tx = build_swap_tx(cfg, proto, parms)
match src:
case 'parse':
tx_in = open(os.path.join('test/ref/thorchain', vec.fn), 'br').read()
tx = RuneTx.loads(tx_in)
if not parms.from_addr:
ymsg(f'Warning: missing test vector data for {vec.fn}')
assert bytes(tx) == tx_in
case 'build':
tx = build_tx(cfg, proto, parms, null_fee=vec.null_fee)
case 'swapbuild':
tx = build_swap_tx(cfg, proto, parms)
vmsg(pp_fmt(tx))

View file

@ -90,19 +90,20 @@ def test_attr_perm(obj, attrname, perm_name, perm_value, dobj, attrval_type):
pstem = pname.rstrip('e')
try:
if perm_name == 'read_ok': # non-existent perm
getattr(obj, attrname)
elif perm_name == 'reassign_ok':
try:
so = sample_objs[attrval_type.__name__]
except Exception as e:
raise SampleObjError(f'unable to find sample object of type {attrval_type.__name__!r}') from e
# ListItemAttr allows setting an attribute if its value is None
if type(dobj) is ListItemAttr and getattr(obj, attrname) is None:
match perm_name:
case 'read_ok': # non-existent perm
getattr(obj, attrname)
case 'reassign_ok':
try:
so = sample_objs[attrval_type.__name__]
except Exception as e:
raise SampleObjError(f'unable to find sample object of type {attrval_type.__name__!r}') from e
# ListItemAttr allows setting an attribute if its value is None
if type(dobj) is ListItemAttr and getattr(obj, attrname) is None:
setattr(obj, attrname, so)
setattr(obj, attrname, so)
setattr(obj, attrname, so)
elif perm_name == 'delete_ok':
delattr(obj, attrname)
case 'delete_ok':
delattr(obj, attrname)
except SampleObjError as e:
die(4, f'Test script error ({e})')
except Exception as e:

View file

@ -514,26 +514,25 @@ def do_cmds(cmd_group):
getattr(tc, cmd)(*cmdline)
def main():
# Entry point: with no arguments, clean the temp dir and run every command
# group in cmd_data; with one argument, run that command group, or just
# clean the temp dir and exit for 'clean'; otherwise die with an error.
# Finishes by calling end_msg() with the elapsed time in seconds.
# NOTE(review): in this hunk the pre-change if/else lines and the post-change
# `match cfg._args` lines are interleaved.
if cfg._args:
if len(cfg._args) != 1:
match cfg._args:
case []:
cleandir(tcfg['tmpdir'], do_msg=True)
for cmd in cmd_data:
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
do_cmds(cmd)
# print a blank separator line after every group except the last
if cmd is not list(cmd_data.keys())[-1]:
msg('')
# NOTE(review): `[_, _]` matches exactly two arguments, whereas the
# pre-change test `len(cfg._args) != 1` rejected any count other than one
case [_, _]:
die(1, 'Only one command may be specified')
cmd = cfg._args[0]
if cmd in cmd_data:
case [cmd] if cmd in cmd_data:
cleandir(tcfg['tmpdir'], do_msg=True)
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
do_cmds(cmd)
elif cmd == 'clean':
case ['clean']:
cleandir(tcfg['tmpdir'], do_msg=True)
sys.exit(0)
else:
# NOTE(review): with three or more arguments no earlier pattern binds `cmd`,
# so the f-string below would raise UnboundLocalError instead of printing a
# message; with a single unrecognized argument `cmd` is already bound by the
# `[cmd]` pattern above, whose guard failed
case _:
die(1, f'{cmd!r}: unrecognized command')
else:
cleandir(tcfg['tmpdir'], do_msg=True)
for cmd in cmd_data:
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
do_cmds(cmd)
if cmd is not list(cmd_data.keys())[-1]:
msg('')
end_msg(int(time.time()) - start_time)
from mmgen.main import launch

View file

@ -167,14 +167,14 @@ def check_output(out, chk):
assert chk(outd), f'{chk.__name__}({outd}) failed!'
elif isinstance(chk, dict):
for k, v in chk.items():
if k == 'boolfunc':
assert v(outd), f'{v.__name__}({outd}) failed!'
elif k == 'value':
assert outd == v, err_fs.format(outd, v)
else:
outval = getattr(__builtins__, k)(out)
if outval != v:
die(1, f'{k}({out}) returned {outval}, not {v}!')
match k:
case 'boolfunc':
assert v(outd), f'{v.__name__}({outd}) failed!'
case 'value':
assert outd == v, err_fs.format(outd, v)
case _:
if (outval := getattr(__builtins__, k)(out)) != v:
die(1, f'{k}({out}) returned {outval}, not {v}!')
elif chk is not None:
assert out == chk, err_fs.format(out, chk)