autosign, test suite autosign: cleanups

This commit is contained in:
The MMGen Project 2024-08-26 13:47:02 +00:00
commit ca343e2097
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
7 changed files with 147 additions and 93 deletions

View file

@ -323,22 +323,9 @@ class Signable:
class Autosign:
dfl_mountpoint = '/mnt/mmgen_autosign'
dfl_wallet_dir = '/dev/shm/autosign'
old_dfl_mountpoint = '/mnt/tx'
dfl_dev_label_dir = '/dev/disk/by-label'
dev_label = 'MMGEN_TX'
old_dfl_mountpoint_errmsg = f"""
Mountpoint '{old_dfl_mountpoint}' is no longer supported!
Please rename '{old_dfl_mountpoint}' to '{dfl_mountpoint}'
and update your fstab accordingly.
"""
mountpoint_errmsg_fs = """
Mountpoint '{}' does not exist or does not point
to a directory! Please create the mountpoint and add an entry
to your fstab as described in this scripts help text.
"""
dev_label = 'MMGEN_TX'
linux_mount_subdir = 'mmgen_autosign'
wallet_subdir = 'autosign'
mn_fmts = {
'mmgen': 'words',
@ -359,12 +346,8 @@ class Autosign:
have_xmr = False
xmr_only = False
def init_cfg(self): # see test/overlay/fakemods/mmgen/autosign.py
self.mountpoint = Path(self.cfg.mountpoint or self.dfl_mountpoint)
self.wallet_dir = Path(self.cfg.wallet_dir or self.dfl_wallet_dir)
self.dev_label_path = Path(self.dfl_dev_label_dir) / self.dev_label
self.mount_cmd = 'mount'
self.umount_cmd = 'umount'
def init_fixup(self): # see test/overlay/fakemods/mmgen/autosign.py
pass
def __init__(self,cfg,cmd=None):
@ -374,8 +357,40 @@ class Autosign:
cfg.mnemonic_fmt,
fmt_list( self.mn_fmts, fmt='no_spc' ) ))
if sys.platform == 'linux':
self.dfl_mountpoint = f'/mnt/{self.linux_mount_subdir}'
self.dfl_shm_dir = '/dev/shm'
# linux-only attrs:
self.dev_label_dir = Path('/dev/disk/by-label')
self.old_dfl_mountpoint = '/mnt/tx'
self.old_dfl_mountpoint_errmsg = f"""
Mountpoint {self.old_dfl_mountpoint} is no longer supported!
Please rename {self.old_dfl_mountpoint} to {self.dfl_mountpoint}
and update your fstab accordingly.
"""
self.mountpoint_errmsg_fs = """
Mountpoint {} does not exist or does not point
to a directory! Please create the mountpoint and add an entry
to your fstab as described in this scripts help text.
"""
self.cfg = cfg
self.init_cfg()
self.dfl_wallet_dir = f'{self.dfl_shm_dir}/{self.wallet_subdir}'
self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint)
self.shm_dir = Path(self.dfl_shm_dir)
self.wallet_dir = Path(cfg.wallet_dir or self.dfl_wallet_dir)
if sys.platform == 'linux':
self.mount_cmd = f'mount {self.mountpoint}'
self.umount_cmd = f'umount {self.mountpoint}'
self.init_fixup()
# these use the ‘fixed-up’ values:
if sys.platform == 'linux':
self.dev_label_path = self.dev_label_dir / self.dev_label
self.keyfile = self.mountpoint / 'autosign.key'
@ -448,7 +463,7 @@ class Autosign:
return self._wallet_files
def do_mount(self, silent=False):
def do_mount(self, silent=False, verbose=False):
def check_or_create(dirname):
path = getattr(self, dirname)
@ -463,7 +478,7 @@ class Autosign:
msg(f'Creating ‘{path}')
path.mkdir(parents=True)
if not self.mountpoint.is_dir():
if sys.platform == 'linux' and not self.mountpoint.is_dir():
def do_die(m):
die(1,'\n' + yellow(fmt(m.strip(),indent=' ')))
if Path(self.old_dfl_mountpoint).is_dir():
@ -472,10 +487,8 @@ class Autosign:
do_die(self.mountpoint_errmsg_fs.format(self.mountpoint))
if not self.mountpoint.is_mount():
if run(
self.mount_cmd.split() + [str(self.mountpoint)],
stderr = DEVNULL,
stdout = DEVNULL).returncode == 0:
redir = None if verbose else DEVNULL
if run(self.mount_cmd.split(), stderr=redir, stdout=redir).returncode == 0:
if not silent:
msg(f"Mounting '{self.mountpoint}'")
else:
@ -484,12 +497,13 @@ class Autosign:
for dirname in self.dirs:
check_or_create(dirname)
def do_umount(self,silent=False):
def do_umount(self, silent=False, verbose=False):
if self.mountpoint.is_mount():
run( ['sync'], check=True )
if not silent:
msg(f"Unmounting '{self.mountpoint}'")
run(self.umount_cmd.split() + [str(self.mountpoint)], check=True)
redir = None if verbose else DEVNULL
run(self.umount_cmd.split(), stdout=redir, check=True)
if not silent:
bmsg('It is now safe to extract the removable device')
@ -701,7 +715,8 @@ class Autosign:
def device_inserted(self):
if self.cfg.no_insert_check:
return True
return self.dev_label_path.exists()
if sys.platform == 'linux':
return self.dev_label_path.exists()
async def main_loop(self):
if not self.cfg.stealth_led:

View file

@ -135,6 +135,7 @@ class CmdTestAutosignAutomount(CmdTestAutosignThreaded, CmdTestRegtestBDBWallet)
t.expect(expect)
for pat in shred_expect:
t.expect(pat, regex=True)
t.read()
self.remove_device_online()
return t
@ -173,11 +174,7 @@ class CmdTestAutosignAutomount(CmdTestAutosignThreaded, CmdTestRegtestBDBWallet)
return 'ok'
def alice_run_autosign_setup(self):
self.insert_device()
t = self.run_setup(mn_type='default', use_dfl_wallet=True, passwd=rt_pw)
t.read()
self.remove_device()
return t
return self.run_setup(mn_type='default', use_dfl_wallet=True, passwd=rt_pw)
def alice_txsend1(self):
return self._alice_txsend('This one’s worth a comment', no_wait=True)
@ -199,6 +196,7 @@ class CmdTestAutosignAutomount(CmdTestAutosignThreaded, CmdTestRegtestBDBWallet)
['--alice', '--autosign', '--status', '--verbose'],
exit_val = exit_val)
t.expect(expect)
t.read()
self.remove_device_online()
return t

View file

@ -90,11 +90,7 @@ class CmdTestAutosignETH(CmdTestAutosignThreaded, CmdTestEthdev):
return t
def run_autosign_setup(self):
self.insert_device()
t = self.run_setup(mn_type='bip39', mn_file='test/ref/98831F3A.bip39', use_dfl_wallet=None)
t.read()
self.remove_device()
return t
return self.run_setup(mn_type='bip39', mn_file='test/ref/98831F3A.bip39', use_dfl_wallet=None)
def send_tx(self, add_args=[]):
self._wait_signed('transaction')

View file

@ -20,7 +20,7 @@
test.cmdtest_py_d.ct_autosign: Autosign tests for the cmdtest.py test suite
"""
import sys,os,time,shutil
import sys, os, time, shutil
from subprocess import run,DEVNULL
from pathlib import Path
@ -80,6 +80,10 @@ class CmdTestAutosignBase(CmdTestBase):
if self.threaded:
self.spawn_env['MMGEN_TEST_SUITE_AUTOSIGN_THREADED'] = '1'
def __del__(self):
if hasattr(self,'have_dummy_control_files'):
LEDControl.delete_dummy_control_files()
def _create_autosign_instances(self,create_dirs):
d = {'offline': {'name':'asi'}}
@ -98,19 +102,20 @@ class CmdTestAutosignBase(CmdTestBase):
}))
if create_dirs and not self.live:
for k in ('mountpoint', 'wallet_dir', 'dev_label_dir'):
if k == 'wallet_dir' and subdir == 'online':
for k in ('mountpoint', 'shm_dir', 'wallet_dir', 'dev_label_dir'):
if subdir == 'online' and k in ('shm_dir', 'wallet_dir'):
continue
(Path(self.tmpdir) / (subdir + getattr(Autosign,'dfl_'+k))).mkdir(parents=True,exist_ok=True)
getattr(asi, k).mkdir(parents=True, exist_ok=True)
setattr(self, data['name'], asi)
def _create_removable_device(self):
redir = DEVNULL
img_file = str(self.asi.fs_image_path)
run(['truncate', '--size=10M', img_file], check=True)
run(['/sbin/mkfs.ext2', '-E', f'root_owner={os.getuid()}:{os.getgid()}', img_file],
stdout=redir, stderr=redir, check=True)
img = self.asi.fs_image_path
if sys.platform == 'linux':
redir = DEVNULL
run(f'truncate --size=10M {img}'.split(), check=True)
run(f'/sbin/mkfs.ext2 -E root_owner={os.getuid()}:{os.getgid()} {img}'.split(),
stdout=redir, stderr=redir, check=True)
def start_daemons(self):
self.spawn('',msg_only=True)
@ -129,11 +134,14 @@ class CmdTestAutosignBase(CmdTestBase):
use_dfl_wallet = False,
seed_len = None,
usr_entry_modes = False,
passwd = 'abc'):
passwd = 'abc',
expect_args = []):
mn_desc = mn_type or 'default'
mn_type = mn_type or 'mmgen'
self.insert_device()
t = self.spawn(
'mmgen-autosign',
self.opts
@ -166,14 +174,29 @@ class CmdTestAutosignBase(CmdTestBase):
stealth_mnemonic_entry(t,mne,mn,entry_mode)
t.written_to_file('Autosign wallet')
if expect_args:
t.expect(*expect_args)
t.read()
self.remove_device()
return t
def insert_device(self):
self.asi.dev_label_path.touch()
def insert_device(self, asi='asi'):
if self.live:
return
loc = getattr(self, asi)
if sys.platform == 'linux':
loc.dev_label_path.touch()
def remove_device(self):
if self.asi.dev_label_path.exists():
self.asi.dev_label_path.unlink()
def remove_device(self, asi='asi'):
if self.live:
return
loc = getattr(self, asi)
if sys.platform == 'linux':
if loc.dev_label_path.exists():
loc.dev_label_path.unlink()
def _mount_ops(self, loc, cmd, *args, **kwargs):
return getattr(getattr(self,loc),cmd)(*args, silent=self.silent_mount, **kwargs)
@ -283,8 +306,10 @@ class CmdTestAutosignClean(CmdTestAutosignBase):
t = self.spawn('mmgen-autosign', [f'--coins={coins}','clean'], no_msg=True)
out = t.read()
self.insert_device()
silence()
self.do_mount()
self.remove_device()
end_silence()
after = '\n'.join(self._gen_listing())
@ -313,6 +338,7 @@ class CmdTestAutosignClean(CmdTestAutosignBase):
shred_count += 9
self.do_umount()
self.remove_device()
imsg(f'\nBefore cleaning:\n{before}')
imsg(f'\nAfter cleaning:\n{after}')
@ -340,6 +366,7 @@ class CmdTestAutosignThreaded(CmdTestAutosignBase):
import threading
threading.Thread(target=run, name='Autosign wait loop').start()
time.sleep(0.2)
return 'silent'
def autosign_kill_thread(self):
@ -376,11 +403,10 @@ class CmdTestAutosignThreaded(CmdTestAutosignBase):
return 'ok'
def insert_device_online(self):
self.asi_online.dev_label_path.touch()
return self.insert_device(asi='asi_online')
def remove_device_online(self):
if self.asi_online.dev_label_path.exists():
self.asi_online.dev_label_path.unlink()
return self.remove_device(asi='asi_online')
def do_mount_online(self, *args, **kwargs):
return self._mount_ops('asi_online', 'do_mount', *args, **kwargs)
@ -390,6 +416,7 @@ class CmdTestAutosignThreaded(CmdTestAutosignBase):
async def txview(self):
self.spawn('', msg_only=True)
self.insert_device()
self.do_mount()
src = Path(self.asi.txauto_dir)
from mmgen.tx import CompletedTX
@ -401,6 +428,7 @@ class CmdTestAutosignThreaded(CmdTestAutosignBase):
out = tx.info.format(terse=True)
imsg(indent(out, indent=' '))
self.do_umount()
self.remove_device()
return 'ok'
class CmdTestAutosign(CmdTestAutosignBase):
@ -494,13 +522,12 @@ class CmdTestAutosign(CmdTestAutosignBase):
self.have_dummy_control_files = True
self.spawn_env['MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE'] = '1'
def __del__(self):
if hasattr(self,'have_dummy_control_files'):
LEDControl.delete_dummy_control_files()
def gen_key(self):
self.insert_device()
t = self.spawn( 'mmgen-autosign', self.opts + ['gen_key'] )
t.expect_getend('Wrote key file ')
t.read()
self.remove_device()
return t
def create_dfl_wallet(self):
@ -579,8 +606,10 @@ class CmdTestAutosign(CmdTestAutosignBase):
if op == 'set_count':
return
self.insert_device()
silence()
self.do_mount()
self.do_mount(verbose=cfg.verbose or cfg.exact_output)
end_silence()
for coindir,fn in data:
@ -597,6 +626,7 @@ class CmdTestAutosign(CmdTestAutosignBase):
pass
self.do_umount()
self.remove_device()
return 'ok'
@ -610,6 +640,7 @@ class CmdTestAutosign(CmdTestAutosignBase):
remove_bad_txfiles2 = remove_bad_txfiles
def bad_txfiles(self,op):
self.insert_device()
self.do_mount()
# create or delete 2 bad tx files
self.spawn('',msg_only=True)
@ -627,6 +658,7 @@ class CmdTestAutosign(CmdTestAutosignBase):
pass
self.bad_tx_count = 0
self.do_umount()
self.remove_device()
return 'ok'
def copy_msgfiles(self):
@ -644,6 +676,7 @@ class CmdTestAutosign(CmdTestAutosignBase):
def msgfile_ops(self,op):
self.spawn('',msg_only=True)
destdir = joinpath(self.asi.mountpoint,'msg')
self.insert_device()
self.do_mount()
os.makedirs(destdir,exist_ok=True)
if op.endswith('_invalid'):
@ -667,10 +700,14 @@ class CmdTestAutosign(CmdTestAutosignBase):
elif op == 'remove_signed':
os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') ))
self.do_umount()
self.remove_device()
return 'ok'
def do_sign(self, args=[], have_msg=False, exc_exit_val=None):
tx_desc = Signable.transaction.desc
self.insert_device()
t = self.spawn(
'mmgen-autosign',
self.opts + args,
@ -698,6 +735,7 @@ class CmdTestAutosign(CmdTestAutosignBase):
regex = True)
t.read()
self.remove_device()
imsg('')
return t
@ -732,19 +770,25 @@ class CmdTestAutosign(CmdTestAutosignBase):
absent = ['xmr_signables'])
def sign_no_unsigned_xmr(self):
if self.coins == ['btc']:
return 'skip'
return self._sign_no_unsigned(
coins = 'XMR,BTC',
present = ['xmr_signables','non_xmr_signables'])
def sign_no_unsigned_xmronly(self):
if self.coins == ['btc']:
return 'skip'
return self._sign_no_unsigned(
coins = 'XMR',
present = ['xmr_signables'],
absent = ['non_xmr_signables'])
def _sign_no_unsigned(self,coins,present=[],absent=[]):
self.insert_device()
t = self.spawn('mmgen-autosign', ['--quiet', '--no-insert-check', f'--coins={coins}'])
res = t.read()
self.remove_device()
for signable_list in present:
for signable_clsname in getattr(Signable,signable_list):
desc = getattr(Signable, signable_clsname).desc
@ -756,8 +800,11 @@ class CmdTestAutosign(CmdTestAutosignBase):
return t
def wipe_key(self):
self.insert_device()
t = self.spawn('mmgen-autosign', ['--quiet', '--no-insert-check', 'wipe_key'])
t.expect('Shredding')
t.read()
self.remove_device()
return t
class CmdTestAutosignBTC(CmdTestAutosign):

View file

@ -214,14 +214,11 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignThreaded):
return self.fund_alice(wallet=2)
def autosign_setup(self):
self.insert_device()
t = self.run_setup(
return self.run_setup(
mn_type = 'mmgen',
mn_file = self.users['alice'].mmwords,
use_dfl_wallet = None )
t.expect('Continue with Monero setup? (Y/n): ','n')
self.remove_device()
return t
use_dfl_wallet = None,
expect_args = ['Continue with Monero setup? (Y/n): ', 'n'])
def autosign_xmr_setup(self):
self.do_mount_online()

View file

@ -2,18 +2,20 @@ from .autosign_orig import *
class overlay_fake_Autosign:
def init_cfg(self):
def init_fixup(self):
if pfx := self.cfg.test_suite_root_pfx:
subdir = 'online' if self.cfg.online else 'offline'
self.mountpoint = Path(f'{pfx}/{subdir}/{self.dfl_mountpoint}')
self.wallet_dir = Path(f'{pfx}/{subdir}/{self.dfl_wallet_dir}')
self.dev_label_path = Path(f'{pfx}/{subdir}/{self.dfl_dev_label_dir}') / self.dev_label
subdir = pfx + '/' + ('online' if self.cfg.online else 'offline')
for k in ('mountpoint', 'shm_dir', 'wallet_dir', 'dev_label_dir'):
if hasattr(self, k):
orig_path = str(getattr(self, k))
setattr(self, k, Path(subdir + orig_path.removeprefix(subdir)))
# mount --type=fuse-ext2 --options=rw+ ### current fuse-ext2 (0.4 29) is buggy - can’t use
self.fs_image_path = Path(f'{pfx}/removable_device_image')
self.mount_cmd = f'sudo mount {self.fs_image_path}'
self.umount_cmd = 'sudo umount'
else:
self.init_cfg_orig()
self.fs_image_path = Path(f'{pfx}/removable_device_image').absolute()
import sys
if sys.platform == 'linux':
self.mount_cmd = f'sudo mount {self.fs_image_path} {self.mountpoint}'
self.umount_cmd = f'sudo umount {self.mountpoint}'
Autosign.init_cfg_orig = Autosign.init_cfg
Autosign.init_cfg = overlay_fake_Autosign.init_cfg
Autosign.dev_label = 'MMGEN_TS_TX'
Autosign.linux_mount_subdir = 'mmgen_ts_autosign'
Autosign.init_fixup = overlay_fake_Autosign.init_fixup

View file

@ -20,13 +20,13 @@ groups_desc="
init_groups() {
dfl_tests='dep alt obj color unit hash ref tool tool2 gen autosign btc btc_tn btc_rt altref altgen bch bch_rt ltc ltc_rt eth etc xmr'
extra_tests='dep dev lint autosign_btc autosign_live ltc_tn bch_tn'
noalt_tests='dep alt obj color unit hash ref tool tool2 gen autosign_btc btc btc_tn btc_rt'
extra_tests='dep dev lint autosign_live ltc_tn bch_tn'
noalt_tests='dep alt obj color unit hash ref tool tool2 gen autosign btc btc_tn btc_rt'
quick_tests='dep alt obj color unit hash ref tool tool2 gen autosign btc btc_rt altref altgen eth etc xmr'
qskip_tests='lint btc_tn bch bch_rt ltc ltc_rt'
noalt_ok_tests='lint'
[ "$MSYS2" ] && SKIP_LIST='autosign autosign_btc autosign_live'
[ "$MSYS2" ] && SKIP_LIST='autosign autosign_live'
[ "$ARM32" -o "$ARM64" ] && SKIP_LIST+=' etc'
true
@ -162,18 +162,17 @@ init_tests() {
d_autosign="transaction autosigning with automount"
t_autosign="
- $cmdtest_py autosign autosign_clean autosign_automount
- $cmdtest_py autosign_clean autosign_automount autosign
b $cmdtest_py autosign_clean autosign_automount autosign_btc
- $cmdtest_py --coin=bch autosign_automount
s $cmdtest_py --coin=ltc autosign_automount
- $cmdtest_py --coin=eth autosign_eth
s $cmdtest_py --coin=etc autosign_eth
"
[ "$FAST" ] && t_autosign_skip='s'
if [ "$SKIP_ALT_DEP" ]; then t_autosign_skip='- s'; else t_autosign_skip='b'; fi
[ "$FAST" ] && t_autosign_skip+=' s'
d_autosign_btc="transaction and message autosigning (Bitcoin only)"
t_autosign_btc="- $cmdtest_py autosign_btc"
d_autosign_btc="transaction and message autosigning (interactive)"
d_autosign_live="transaction and message autosigning (interactive)"
t_autosign_live="- $cmdtest_py autosign_live"
d_btc="overall operations with emulated RPC data (Bitcoin)"