cmdtest xmr_autosign: use virtual removable device

This commit is contained in:
The MMGen Project 2024-02-15 09:28:09 +00:00
commit 27bd25a733
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
6 changed files with 125 additions and 33 deletions

View file

@ -246,12 +246,13 @@ class Autosign:
fmt_list( self.mn_fmts, fmt='no_spc' ) ))
if pfx := cfg.test_suite_root_pfx:
subdir = 'online'
subdir = 'online' if cfg.online else 'offline'
self.mountpoint = Path(f'{pfx}/{subdir}/{self.dfl_mountpoint}')
self.wallet_dir = Path(f'{pfx}/{subdir}/{self.dfl_wallet_dir}')
self.dev_label_path = Path(f'{pfx}/{subdir}/{self.dfl_dev_label_dir}') / self.dev_label
# mount --type=fuse-ext2 --options=rw+ ### current fuse-ext2 (0.4 29) is buggy - can’t use
self.mount_cmd = f'sudo mount {pfx}/removable_device'
self.fs_image_path = Path(f'{pfx}/removable_device_image')
self.mount_cmd = f'sudo mount {self.fs_image_path}'
self.umount_cmd = 'sudo umount'
else:
self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint)
@ -346,7 +347,7 @@ class Autosign:
stdout = DEVNULL).returncode == 0:
if not silent:
msg(f"Mounting '{self.mountpoint}'")
elif not self.cfg.test_suite:
else:
die(1,f"Unable to mount device at '{self.mountpoint}'")
self.have_msg_dir = self.msg_dir.is_dir()
@ -523,7 +524,7 @@ class Autosign:
'autosign': True,
'autosign_mountpoint': str(self.mountpoint),
'outdir': str(self.xmr_dir), # required by vkal.write()
'offline': False,
'offline': True,
})
return self._xmrwallet_cfg
@ -604,24 +605,19 @@ class Autosign:
async def main_loop(self):
if not self.cfg.stealth_led:
self.led.set('standby')
testing_xmr = self.cfg.test_suite_xmr_autosign
if testing_xmr:
msg('Waiting for fake device insertion')
n = 1 if testing_xmr else 0
silent = self.cfg.test_suite_xmr_autosign
n = 1 if silent else 0
prev_status = False
while True:
status = self.get_insert_status()
if status and not prev_status:
msg('Device insertion detected')
await self.do_sign()
if testing_xmr:
if self.dev_label_path.exists():
self.dev_label_path.unlink()
prev_status = status
if not n % 10:
msg_r(f"\r{' '*17}\rWaiting")
await asyncio.sleep(1)
if not testing_xmr:
if not silent:
msg_r('.')
n += 1

View file

@ -1 +1 @@
14.1.dev9
14.1.dev10

View file

@ -84,7 +84,7 @@ def get_autosign_obj(cfg):
'test_suite': cfg.test_suite,
'test_suite_root_pfx': cfg.test_suite_root_pfx,
'coins': 'xmr',
'online': True,
'online': not cfg.offline,
})
)
@ -759,7 +759,7 @@ class MoneroWalletOps:
pass
def mount_removable_device(self):
if self.cfg.autosign and not self.cfg.test_suite:
if self.cfg.autosign:
if not self.asi.get_insert_status():
die(1,'Removable device not present!')
if self.do_umount:

View file

@ -21,7 +21,7 @@ test.cmdtest_py_d.ct_autosign: Autosign tests for the cmdtest.py test suite
"""
import sys,os,shutil
from subprocess import run
from subprocess import run,DEVNULL
from pathlib import Path
from mmgen.cfg import Config
@ -83,7 +83,8 @@ class CmdTestAutosignBase(CmdTestBase):
self._create_autosign_instances(create_dirs=not cfg.skipping_deps)
(self.asi_ts.mountpoint / 'tx').mkdir()
if not (cfg.skipping_deps or self.live):
self._create_removable_device()
if self.simulate_led and not cfg.exact_output:
die(1,red('This command must be run with --exact-output enabled!'))
@ -136,9 +137,19 @@ class CmdTestAutosignBase(CmdTestBase):
'test_suite': True,
'test_suite_xmr_autosign': self.name == 'CmdTestXMRAutosign',
'test_suite_root_pfx': None if self.live else self.tmpdir,
'online': True,
'online': subdir == 'online',
})))
def _create_removable_device(self):
redir = DEVNULL
img_file = str(self.asi_ts.fs_image_path)
run(['truncate', '--size=10M', img_file], check=True)
run(['/sbin/mkfs.ext2', '-E', f'root_owner={os.getuid()}:{os.getgid()}', img_file],
stdout=redir, stderr=redir, check=True)
run(self.asi_ts.mount_cmd.split() + [str(self.asi_ts.mountpoint)], stdout=redir, check=True)
(self.asi_ts.mountpoint / 'tx').mkdir()
run(self.asi_ts.umount_cmd.split() + [str(self.asi_ts.mountpoint)], check=True)
def __del__(self):
if sys.platform == 'win32' or self.tr is None:
return
@ -239,6 +250,8 @@ class CmdTestAutosignBase(CmdTestBase):
if op == 'set_count':
return
self.asi_ts.do_mount(self.silent)
for coindir,fn in data:
src = joinpath(ref_dir,coindir,fn)
if cfg.debug_utf8:
@ -252,6 +265,8 @@ class CmdTestAutosignBase(CmdTestBase):
except:
pass
self.asi_ts.do_umount(self.silent)
return 'ok'
def create_bad_txfiles(self):
@ -264,8 +279,7 @@ class CmdTestAutosignBase(CmdTestBase):
remove_bad_txfiles2 = remove_bad_txfiles
def bad_txfiles(self,op):
if self.live:
self.asi.do_mount(self.silent)
self.asi_ts.do_mount(self.silent)
# create or delete 2 bad tx files
self.spawn('',msg_only=True)
fns = [joinpath(self.asi_ts.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)]
@ -281,6 +295,7 @@ class CmdTestAutosignBase(CmdTestBase):
except:
pass
self.bad_tx_count = 0
self.asi_ts.do_umount(self.silent)
return 'ok'
def copy_msgfiles(self):
@ -298,6 +313,7 @@ class CmdTestAutosignBase(CmdTestBase):
def msgfile_ops(self,op):
self.spawn('',msg_only=True)
destdir = joinpath(self.asi_ts.mountpoint,'msg')
self.asi_ts.do_mount(self.silent)
os.makedirs(destdir,exist_ok=True)
if op.endswith('_invalid'):
fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json')
@ -319,6 +335,7 @@ class CmdTestAutosignBase(CmdTestBase):
shutil.copy2(fn,destdir)
elif op == 'remove_signed':
os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') ))
self.asi_ts.do_umount(self.silent)
return 'ok'
def do_sign(self,args,have_msg=False,tx_name='transaction'):
@ -354,9 +371,28 @@ class CmdTestAutosignBase(CmdTestBase):
imsg('')
return t
@property
def device_inserted(self):
return self.asi.dev_label_path.exists()
@property
def device_inserted_ts(self):
return self.asi_ts.dev_label_path.exists()
def insert_device(self):
self.asi.dev_label_path.touch()
def insert_device_ts(self):
self.asi_ts.dev_label_path.touch()
def remove_device(self):
if self.asi.dev_label_path.exists():
self.asi.dev_label_path.unlink()
def remove_device_ts(self):
if self.asi_ts.dev_label_path.exists():
self.asi_ts.dev_label_path.unlink()
class CmdTestAutosign(CmdTestAutosignBase):
'autosigning transactions for all supported coins'
coins = ['btc','bch','ltc','eth']

View file

@ -186,6 +186,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
def _dump_wallets(self,autosign):
data = self.users['alice']
self.insert_device_ts()
t = self.spawn(
'mmgen-xmrwallet',
self.extra_opts
@ -194,6 +195,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
+ ['dump']
+ ([] if autosign else [get_file_with_ext(data.udir,'akeys')]) )
t.expect('2 wallets dumped')
self.remove_device_ts()
return t
def _delete_files(self,*ext_list):
@ -222,20 +224,28 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return self.fund_alice(wallet=2)
def autosign_setup(self):
self.asi_ts.do_mount(self.silent,no_xmr_chk=True)
self.asi_ts.xmr_dir.mkdir(exist_ok=True)
(self.asi_ts.xmr_dir / 'old.vkeys').touch()
self.asi_ts.do_umount(self.silent)
self.insert_device()
Path(self.asi_ts.xmr_dir).mkdir(parents=True,exist_ok=True)
Path(self.asi_ts.xmr_dir,'old.vkeys').touch()
t = self.run_setup(
mn_type = 'mmgen',
mn_file = self.users['alice'].mmwords,
use_dfl_wallet = None )
t.expect('Continue with Monero setup? (Y/n): ','y')
t.written_to_file('View keys')
self.remove_device()
return t
def autosign_start_thread(self):
if self.asi.dev_label_path.exists():
self.asi.dev_label_path.unlink()
def run():
t = self.spawn('mmgen-autosign', self.opts + ['wait'], direct_exec=True)
self.write_to_tmpfile('autosign_thread_pid',str(t.ep.pid))
@ -257,33 +267,50 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return 'ok'
def create_watchonly_wallets(self):
return self.create_wallets( 'alice', op='restore' )
self.insert_device_ts()
t = self.create_wallets('alice', op='restore')
t.read() # required!
self.remove_device_ts()
return t
def restore_wallets(self):
return self.create_watchonly_wallets()
def _create_transfer_tx(self,amt):
self.insert_device_ts()
t = self.do_op('transfer','alice',f'1:0:{self.burn_addr},{amt}',no_relay=True,do_ret=True)
t.read() # required!
self.insert_device()
self.remove_device_ts()
return t
def create_transfer_tx1(self):
return self._create_transfer_tx('0.124')
def create_transfer_tx2(self):
self.asi_ts.do_mount(self.silent)
get_file_with_ext(self.asi_ts.xmr_tx_dir,'rawtx',delete_all=True)
get_file_with_ext(self.asi_ts.xmr_tx_dir,'sigtx',delete_all=True)
self.asi_ts.do_umount(self.silent)
return self._create_transfer_tx('0.257')
def _wait_signed(self,dtype):
oqmsg_r(gray(f'→ offline wallet{"s" if dtype.endswith("s") else ""} signing {dtype}'))
assert not self.device_inserted, f'{self.asi.dev_label_path}’ is inserted!'
assert not self.asi.mountpoint.is_mount(), f'{self.asi.mountpoint}’ is mounted!'
self.insert_device()
while True:
oqmsg_r(gray('.'))
if not self.asi.dev_label_path.exists():
if self.asi.mountpoint.is_mount():
oqmsg_r(gray('..working..'))
break
time.sleep(0.5)
oqmsg(gray('done'))
while True:
oqmsg_r(gray('.'))
if not self.asi.mountpoint.is_mount():
oqmsg(gray('..done'))
break
time.sleep(0.5)
self.remove_device()
def _xmr_autosign_op(
self,
@ -349,6 +376,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return self._submit_transfer_tx( relay_parm=self.tx_relay_daemon_parm )
def _submit_transfer_tx(self,relay_parm=None,ext=None,op='submit',check_bal=True):
self.insert_device_ts()
t = self._xmr_autosign_op(
op = op,
add_opts = [f'--tx-relay-daemon={relay_parm}'] if relay_parm else [],
@ -357,6 +385,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
wait_signed = op == 'submit' )
t.expect( f'{op.capitalize()} transaction? (y/N): ', 'y' )
t.written_to_file('Submitted transaction')
self.remove_device_ts()
if check_bal:
t.ok()
return self._mine_chk('unlocked')
@ -364,13 +393,13 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return t
def _export_outputs(self,wallet_arg,add_opts=[]):
self.insert_device_ts()
t = self._xmr_autosign_op(
op = 'export-outputs',
wallet_arg = wallet_arg,
add_opts = add_opts )
t.written_to_file('Wallet outputs')
t.read() # required!
self.insert_device()
self.remove_device_ts()
return t
def export_outputs1(self):
@ -380,11 +409,14 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return self._export_outputs('1-2')
def _import_key_images(self,wallet_arg):
self.insert_device_ts()
t = self._xmr_autosign_op(
op = 'import-key-images',
wallet_arg = wallet_arg,
dtype = 'wallet outputs',
wait_signed = True )
t.read()
self.remove_device_ts()
return t
def import_key_images1(self):
@ -446,13 +478,25 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
def autosign_clean(self):
self.asi_ts.do_mount(self.silent,no_xmr_chk=True)
self.create_fake_tx_files()
before = '\n'.join(self._gen_listing())
t = self.spawn('mmgen-autosign', self.opts + ['clean'])
out = t.read()
self.asi_ts.do_mount(self.silent,no_xmr_chk=True)
after = '\n'.join(self._gen_listing())
for k in ('tx','msg','xmr'):
shutil.rmtree(self.asi_ts.mountpoint / k)
self.asi_ts.tx_dir.mkdir()
self.asi_ts.do_umount(self.silent)
chk = """
tx: a.sigtx b.sigtx c.rawtx d.sigtx
msg: a.sigmsg.json b.rawmsg.json c.sigmsg.json d.sigmsg.json
@ -460,9 +504,6 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
xmr/outputs:
"""
shutil.rmtree(self.asi_ts.mountpoint)
self.asi_ts.tx_dir.mkdir(parents=True)
imsg(f'\nBefore cleaning:\n{before}')
imsg(f'\nAfter cleaning:\n{after}')
@ -472,6 +513,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
return t
def txlist(self):
self.insert_device_ts()
t = self.spawn( 'mmgen-xmrwallet', self.autosign_opts + ['txlist'] )
t.match_expect_list([
'SUBMITTED',
@ -479,13 +521,22 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase):
'Transfer 1:0','-> ext',
'Transfer 1:0','-> ext'
])
self.remove_device_ts()
return t
def check_tx_dirs(self):
self.asi_ts.do_mount(self.silent)
before = '\n'.join(self._gen_listing())
self.asi_ts.do_umount(self.silent)
t = self.spawn('mmgen-autosign', self.opts + ['clean'])
t.read()
self.asi_ts.do_mount(self.silent)
after = '\n'.join(self._gen_listing())
self.asi_ts.do_umount(self.silent)
imsg(f'\nBefore cleaning:\n{before}')
imsg(f'\nAfter cleaning:\n{after}')
pat = r'xmr/tx: \s*\S+\.subtx \S+\.subtx\s+xmr/outputs:\s*$'

View file

@ -481,6 +481,8 @@ class CmdTestXMRWallet(CmdTestBase):
def sync_wallets(self,user,op='sync',wallets=None,add_opts=[],bal_chk_func=None):
data = self.users[user]
if data.autosign:
self.insert_device_ts()
cmd_opts = list_gen(
[f'--wallet-dir={data.udir}'],
[f'--daemon=localhost:{data.md.rpc_port}'],
@ -512,6 +514,9 @@ class CmdTestXMRWallet(CmdTestBase):
m = re.match( r'(\S+) Unlocked balance: (\S+)', res, re.DOTALL )
amts = [XMRAmt(amt) for amt in m.groups()]
assert bal_chk_func(n,*amts), f'balance check for wallet {n} failed!'
if data.autosign:
t.read()
self.remove_device_ts()
return t
def do_op(
@ -667,6 +672,8 @@ class CmdTestXMRWallet(CmdTestBase):
async def open_wallet_user(self,user,wnum):
data = self.users[user]
if data.autosign:
self.asi_ts.do_mount(self.silent)
silence()
kal = (ViewKeyAddrList if data.autosign else KeyAddrList)(
cfg = cfg,
@ -675,6 +682,8 @@ class CmdTestXMRWallet(CmdTestBase):
skip_chksum_msg = True,
key_address_validity_check = False )
end_silence()
if data.autosign:
self.asi_ts.do_umount(self.silent)
self.users[user].wd.start(silent=not (cfg.exact_output or cfg.verbose))
return data.wd_rpc.call(
'open_wallet',