cmdtest autosign: rename mount,umount methods

This commit is contained in:
The MMGen Project 2024-02-17 09:45:08 +00:00
commit 40724b3c81
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
4 changed files with 54 additions and 35 deletions

View file

@ -320,7 +320,7 @@ class Autosign:
return self._wallet_files
def do_mount(self,silent=False,no_xmr_chk=False):
def do_mount(self,silent=False,no_dir_chk=False,no_xmr_chk=False):
from stat import S_ISDIR,S_IWUSR,S_IRUSR
@ -352,6 +352,9 @@ class Autosign:
self.have_msg_dir = self.msg_dir.is_dir()
if no_dir_chk:
return
check_dir(self.tx_dir)
if self.have_msg_dir:

View file

@ -70,6 +70,7 @@ class CmdTestAutosignBase(CmdTestBase):
color = True
no_insert_check = True
win_skip = True
have_online = False
def __init__(self,trunner,cfgs,spawn):
@ -78,7 +79,7 @@ class CmdTestAutosignBase(CmdTestBase):
if trunner is None:
return
self.silent = self.live or not (cfg.exact_output or cfg.verbose)
self.silent_mount = self.live or not (cfg.exact_output or cfg.verbose)
self.network_ids = [c+'_tn' for c in self.daemon_coins] + self.daemon_coins
self._create_autosign_instances(create_dirs=not cfg.skipping_deps)
@ -120,10 +121,9 @@ class CmdTestAutosignBase(CmdTestBase):
self.spawn_env['MMGEN_TEST_SUITE_ROOT_PFX'] = self.tmpdir
def _create_autosign_instances(self,create_dirs):
d = {
'offline': {'name':'asi'},
'online': {'name':'asi_ts'}
}
d = {'offline': {'name':'asi'}}
if self.have_online:
d['online'] = {'name':'asi_ts'}
for subdir,data in d.items():
if create_dirs and not self.live:
for k in ('mountpoint','wallet_dir','dev_label_dir'):
@ -142,13 +142,13 @@ class CmdTestAutosignBase(CmdTestBase):
def _create_removable_device(self):
redir = DEVNULL
img_file = str(self.asi_ts.fs_image_path)
img_file = str(self.asi.fs_image_path)
run(['truncate', '--size=10M', img_file], check=True)
run(['/sbin/mkfs.ext2', '-E', f'root_owner={os.getuid()}:{os.getgid()}', img_file],
stdout=redir, stderr=redir, check=True)
run(self.asi_ts.mount_cmd.split() + [str(self.asi_ts.mountpoint)], stdout=redir, check=True)
(self.asi_ts.mountpoint / 'tx').mkdir()
run(self.asi_ts.umount_cmd.split() + [str(self.asi_ts.mountpoint)], check=True)
self.do_mount(no_dir_chk=True)
(self.asi.mountpoint / 'tx').mkdir()
self.do_umount()
def __del__(self):
if sys.platform == 'win32' or self.tr is None:
@ -250,14 +250,14 @@ class CmdTestAutosignBase(CmdTestBase):
if op == 'set_count':
return
self.asi_ts.do_mount(self.silent)
self.do_mount()
for coindir,fn in data:
src = joinpath(ref_dir,coindir,fn)
if cfg.debug_utf8:
ext = '.testnet.rawtx' if fn.endswith('.testnet.rawtx') else '.rawtx'
fn = fn[:-len(ext)] + '' + ext
target = joinpath(self.asi_ts.mountpoint,'tx',fn)
target = joinpath(self.asi.mountpoint,'tx',fn)
if not op == 'remove_signed':
shutil.copyfile(src,target)
try:
@ -265,7 +265,7 @@ class CmdTestAutosignBase(CmdTestBase):
except:
pass
self.asi_ts.do_umount(self.silent)
self.do_umount()
return 'ok'
@ -279,10 +279,10 @@ class CmdTestAutosignBase(CmdTestBase):
remove_bad_txfiles2 = remove_bad_txfiles
def bad_txfiles(self,op):
self.asi_ts.do_mount(self.silent)
self.do_mount()
# create or delete 2 bad tx files
self.spawn('',msg_only=True)
fns = [joinpath(self.asi_ts.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)]
fns = [joinpath(self.asi.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)]
if op == 'create':
for fn in fns:
with open(fn,'w') as fp:
@ -295,7 +295,7 @@ class CmdTestAutosignBase(CmdTestBase):
except:
pass
self.bad_tx_count = 0
self.asi_ts.do_umount(self.silent)
self.do_umount()
return 'ok'
def copy_msgfiles(self):
@ -312,8 +312,8 @@ class CmdTestAutosignBase(CmdTestBase):
def msgfile_ops(self,op):
self.spawn('',msg_only=True)
destdir = joinpath(self.asi_ts.mountpoint,'msg')
self.asi_ts.do_mount(self.silent)
destdir = joinpath(self.asi.mountpoint,'msg')
self.do_mount()
os.makedirs(destdir,exist_ok=True)
if op.endswith('_invalid'):
fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json')
@ -335,7 +335,7 @@ class CmdTestAutosignBase(CmdTestBase):
shutil.copy2(fn,destdir)
elif op == 'remove_signed':
os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') ))
self.asi_ts.do_umount(self.silent)
self.do_umount()
return 'ok'
def do_sign(self,args,have_msg=False,tx_name='transaction'):
@ -393,6 +393,21 @@ class CmdTestAutosignBase(CmdTestBase):
if self.asi_ts.dev_label_path.exists():
self.asi_ts.dev_label_path.unlink()
def _mount_ops(self, loc, cmd, *args, **kwargs):
return getattr(getattr(self,loc),cmd)(*args, silent=self.silent_mount, **kwargs)
def do_mount(self, *args, **kwargs):
return self._mount_ops('asi', 'do_mount', *args, **kwargs)
def do_umount(self, *args, **kwargs):
return self._mount_ops('asi', 'do_umount', *args, **kwargs)
def do_mount_online(self, *args, **kwargs):
return self._mount_ops('asi_ts', 'do_mount', *args, **kwargs)
def do_umount_online(self, *args, **kwargs):
return self._mount_ops('asi_ts', 'do_umount', *args, **kwargs)
class CmdTestAutosign(CmdTestAutosignBase):
'autosigning transactions for all supported coins'
coins = ['btc','bch','ltc','eth']
@ -516,7 +531,7 @@ class CmdTestAutosignLive(CmdTestAutosignBTC):
omsg(purple(f'Running autosign test with {opts_msg}'))
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
prompt_remove()
omsg(green(info_msg))
t = self.spawn(
@ -526,9 +541,9 @@ class CmdTestAutosignLive(CmdTestAutosignBTC):
omsg('')
prompt_insert_sign(t)
self.asi_ts.do_mount(self.silent) # race condition due to device insertion detection
self.do_mount_online() # race condition due to device insertion detection
self.remove_signed_txfiles()
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
imsg(purple('\nKilling wait loop!'))
t.kill(2) # 2 = SIGINT

View file

@ -63,6 +63,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
tx_relay_user = 'miner'
no_insert_check = False
win_skip = True
have_online = True
cmd_group = (
('daemon_version', 'checking daemon version'),
@ -225,12 +226,12 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
def autosign_setup(self):
self.asi_ts.do_mount(self.silent,no_xmr_chk=True)
self.do_mount_online(no_xmr_chk=True)
self.asi_ts.xmr_dir.mkdir(exist_ok=True)
(self.asi_ts.xmr_dir / 'old.vkeys').touch()
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
self.insert_device()
@ -287,10 +288,10 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return self._create_transfer_tx('0.124')
def create_transfer_tx2(self):
self.asi_ts.do_mount(self.silent)
self.do_mount_online()
get_file_with_ext(self.asi_ts.xmr_tx_dir,'rawtx',delete_all=True)
get_file_with_ext(self.asi_ts.xmr_tx_dir,'sigtx',delete_all=True)
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
return self._create_transfer_tx('0.257')
def _wait_signed(self,dtype):
@ -478,7 +479,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
def autosign_clean(self):
self.asi_ts.do_mount(self.silent,no_xmr_chk=True)
self.do_mount_online(no_xmr_chk=True)
self.create_fake_tx_files()
before = '\n'.join(self._gen_listing())
@ -486,7 +487,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
t = self.spawn('mmgen-autosign', self.opts + ['clean'])
out = t.read()
self.asi_ts.do_mount(self.silent,no_xmr_chk=True)
self.do_mount_online(no_xmr_chk=True)
after = '\n'.join(self._gen_listing())
@ -495,7 +496,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
self.asi_ts.tx_dir.mkdir()
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
chk = """
tx: a.sigtx b.sigtx c.rawtx d.sigtx
@ -526,16 +527,16 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
def check_tx_dirs(self):
self.asi_ts.do_mount(self.silent)
self.do_mount_online()
before = '\n'.join(self._gen_listing())
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
t = self.spawn('mmgen-autosign', self.opts + ['clean'])
t.read()
self.asi_ts.do_mount(self.silent)
self.do_mount_online()
after = '\n'.join(self._gen_listing())
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
imsg(f'\nBefore cleaning:\n{before}')
imsg(f'\nAfter cleaning:\n{after}')

View file

@ -673,7 +673,7 @@ class CmdTestXMRWallet(CmdTestBase):
async def open_wallet_user(self,user,wnum):
data = self.users[user]
if data.autosign:
self.asi_ts.do_mount(self.silent)
self.do_mount_online()
silence()
kal = (ViewKeyAddrList if data.autosign else KeyAddrList)(
cfg = cfg,
@ -683,7 +683,7 @@ class CmdTestXMRWallet(CmdTestBase):
key_address_validity_check = False )
end_silence()
if data.autosign:
self.asi_ts.do_umount(self.silent)
self.do_umount_online()
self.users[user].wd.start(silent=not (cfg.exact_output or cfg.verbose))
return data.wd_rpc.call(
'open_wallet',