From 27bd25a733d246873259300dc7dc7c6ec767513f Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Thu, 15 Feb 2024 09:28:09 +0000 Subject: [PATCH] cmdtest xmr_autosign: use virtual removable device --- mmgen/autosign.py | 20 +++----- mmgen/data/version | 2 +- mmgen/xmrwallet.py | 4 +- test/cmdtest_py_d/ct_autosign.py | 46 +++++++++++++++-- test/cmdtest_py_d/ct_xmr_autosign.py | 77 +++++++++++++++++++++++----- test/cmdtest_py_d/ct_xmrwallet.py | 9 ++++ 6 files changed, 125 insertions(+), 33 deletions(-) diff --git a/mmgen/autosign.py b/mmgen/autosign.py index 27b4114c..9654caf1 100755 --- a/mmgen/autosign.py +++ b/mmgen/autosign.py @@ -246,12 +246,13 @@ class Autosign: fmt_list( self.mn_fmts, fmt='no_spc' ) )) if pfx := cfg.test_suite_root_pfx: - subdir = 'online' + subdir = 'online' if cfg.online else 'offline' self.mountpoint = Path(f'{pfx}/{subdir}/{self.dfl_mountpoint}') self.wallet_dir = Path(f'{pfx}/{subdir}/{self.dfl_wallet_dir}') self.dev_label_path = Path(f'{pfx}/{subdir}/{self.dfl_dev_label_dir}') / self.dev_label # mount --type=fuse-ext2 --options=rw+ ### current fuse-ext2 (0.4 29) is buggy - can’t use - self.mount_cmd = f'sudo mount {pfx}/removable_device' + self.fs_image_path = Path(f'{pfx}/removable_device_image') + self.mount_cmd = f'sudo mount {self.fs_image_path}' self.umount_cmd = 'sudo umount' else: self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint) @@ -346,7 +347,7 @@ class Autosign: stdout = DEVNULL).returncode == 0: if not silent: msg(f"Mounting '{self.mountpoint}'") - elif not self.cfg.test_suite: + else: die(1,f"Unable to mount device at '{self.mountpoint}'") self.have_msg_dir = self.msg_dir.is_dir() @@ -523,7 +524,7 @@ class Autosign: 'autosign': True, 'autosign_mountpoint': str(self.mountpoint), 'outdir': str(self.xmr_dir), # required by vkal.write() - 'offline': False, + 'offline': True, }) return self._xmrwallet_cfg @@ -604,24 +605,19 @@ class Autosign: async def main_loop(self): if not self.cfg.stealth_led: self.led.set('standby') - testing_xmr = self.cfg.test_suite_xmr_autosign - if testing_xmr: - msg('Waiting for fake device insertion') - n = 1 if testing_xmr else 0 + silent = self.cfg.test_suite_xmr_autosign + n = 1 if silent else 0 prev_status = False while True: status = self.get_insert_status() if status and not prev_status: msg('Device insertion detected') await self.do_sign() - if testing_xmr: - if self.dev_label_path.exists(): - self.dev_label_path.unlink() prev_status = status if not n % 10: msg_r(f"\r{' '*17}\rWaiting") await asyncio.sleep(1) - if not testing_xmr: + if not silent: msg_r('.') n += 1 diff --git a/mmgen/data/version b/mmgen/data/version index 42ba33dc..dd68960e 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -14.1.dev9 +14.1.dev10 diff --git a/mmgen/xmrwallet.py b/mmgen/xmrwallet.py index a15e9282..22d8269c 100755 --- a/mmgen/xmrwallet.py +++ b/mmgen/xmrwallet.py @@ -84,7 +84,7 @@ def get_autosign_obj(cfg): 'test_suite': cfg.test_suite, 'test_suite_root_pfx': cfg.test_suite_root_pfx, 'coins': 'xmr', - 'online': True, + 'online': not cfg.offline, }) ) @@ -759,7 +759,7 @@ class MoneroWalletOps: pass def mount_removable_device(self): - if self.cfg.autosign and not self.cfg.test_suite: + if self.cfg.autosign: if not self.asi.get_insert_status(): die(1,'Removable device not present!') if self.do_umount: diff --git a/test/cmdtest_py_d/ct_autosign.py b/test/cmdtest_py_d/ct_autosign.py index a3d1b9b9..f4525cba 100755 --- a/test/cmdtest_py_d/ct_autosign.py +++ b/test/cmdtest_py_d/ct_autosign.py @@ -21,7 +21,7 @@ test.cmdtest_py_d.ct_autosign: Autosign tests for the cmdtest.py test suite """ import sys,os,shutil -from subprocess import run +from subprocess import run,DEVNULL from pathlib import Path from mmgen.cfg import Config @@ -83,7 +83,8 @@ class CmdTestAutosignBase(CmdTestBase): self._create_autosign_instances(create_dirs=not cfg.skipping_deps) - (self.asi_ts.mountpoint / 'tx').mkdir() + if not (cfg.skipping_deps or self.live): + self._create_removable_device() if self.simulate_led and not cfg.exact_output: die(1,red('This command must be run with --exact-output enabled!')) @@ -136,9 +137,19 @@ class CmdTestAutosignBase(CmdTestBase): 'test_suite': True, 'test_suite_xmr_autosign': self.name == 'CmdTestXMRAutosign', 'test_suite_root_pfx': None if self.live else self.tmpdir, - 'online': True, + 'online': subdir == 'online', }))) + def _create_removable_device(self): + redir = DEVNULL + img_file = str(self.asi_ts.fs_image_path) + run(['truncate', '--size=10M', img_file], check=True) + run(['/sbin/mkfs.ext2', '-E', f'root_owner={os.getuid()}:{os.getgid()}', img_file], + stdout=redir, stderr=redir, check=True) + run(self.asi_ts.mount_cmd.split() + [str(self.asi_ts.mountpoint)], stdout=redir, check=True) + (self.asi_ts.mountpoint / 'tx').mkdir() + run(self.asi_ts.umount_cmd.split() + [str(self.asi_ts.mountpoint)], check=True) + def __del__(self): if sys.platform == 'win32' or self.tr is None: return @@ -239,6 +250,8 @@ class CmdTestAutosignBase(CmdTestBase): if op == 'set_count': return + self.asi_ts.do_mount(self.silent) + for coindir,fn in data: src = joinpath(ref_dir,coindir,fn) if cfg.debug_utf8: @@ -252,6 +265,8 @@ class CmdTestAutosignBase(CmdTestBase): except: pass + self.asi_ts.do_umount(self.silent) + return 'ok' def create_bad_txfiles(self): @@ -264,8 +279,7 @@ class CmdTestAutosignBase(CmdTestBase): remove_bad_txfiles2 = remove_bad_txfiles def bad_txfiles(self,op): - if self.live: - self.asi.do_mount(self.silent) + self.asi_ts.do_mount(self.silent) # create or delete 2 bad tx files self.spawn('',msg_only=True) fns = [joinpath(self.asi_ts.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)] @@ -281,6 +295,7 @@ class CmdTestAutosignBase(CmdTestBase): except: pass self.bad_tx_count = 0 + self.asi_ts.do_umount(self.silent) return 'ok' def copy_msgfiles(self): @@ -298,6 +313,7 @@ class CmdTestAutosignBase(CmdTestBase): def msgfile_ops(self,op): self.spawn('',msg_only=True) destdir = joinpath(self.asi_ts.mountpoint,'msg') + self.asi_ts.do_mount(self.silent) os.makedirs(destdir,exist_ok=True) if op.endswith('_invalid'): fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json') @@ -319,6 +335,7 @@ class CmdTestAutosignBase(CmdTestBase): shutil.copy2(fn,destdir) elif op == 'remove_signed': os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') )) + self.asi_ts.do_umount(self.silent) return 'ok' def do_sign(self,args,have_msg=False,tx_name='transaction'): @@ -354,9 +371,28 @@ class CmdTestAutosignBase(CmdTestBase): imsg('') return t + @property + def device_inserted(self): + return self.asi.dev_label_path.exists() + + @property + def device_inserted_ts(self): + return self.asi_ts.dev_label_path.exists() + def insert_device(self): self.asi.dev_label_path.touch() + def insert_device_ts(self): + self.asi_ts.dev_label_path.touch() + + def remove_device(self): + if self.asi.dev_label_path.exists(): + self.asi.dev_label_path.unlink() + + def remove_device_ts(self): + if self.asi_ts.dev_label_path.exists(): + self.asi_ts.dev_label_path.unlink() + class CmdTestAutosign(CmdTestAutosignBase): 'autosigning transactions for all supported coins' coins = ['btc','bch','ltc','eth'] diff --git a/test/cmdtest_py_d/ct_xmr_autosign.py b/test/cmdtest_py_d/ct_xmr_autosign.py index 154a029d..de2713f2 100755 --- a/test/cmdtest_py_d/ct_xmr_autosign.py +++ b/test/cmdtest_py_d/ct_xmr_autosign.py @@ -186,6 +186,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): def _dump_wallets(self,autosign): data = self.users['alice'] + self.insert_device_ts() t = self.spawn( 'mmgen-xmrwallet', self.extra_opts @@ -194,6 +195,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): + ['dump'] + ([] if autosign else [get_file_with_ext(data.udir,'akeys')]) ) t.expect('2 wallets dumped') + self.remove_device_ts() return t def _delete_files(self,*ext_list): @@ -222,20 +224,28 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): return self.fund_alice(wallet=2) def autosign_setup(self): + + self.asi_ts.do_mount(self.silent,no_xmr_chk=True) + + self.asi_ts.xmr_dir.mkdir(exist_ok=True) + (self.asi_ts.xmr_dir / 'old.vkeys').touch() + + self.asi_ts.do_umount(self.silent) + self.insert_device() - Path(self.asi_ts.xmr_dir).mkdir(parents=True,exist_ok=True) - Path(self.asi_ts.xmr_dir,'old.vkeys').touch() + t = self.run_setup( mn_type = 'mmgen', mn_file = self.users['alice'].mmwords, use_dfl_wallet = None ) t.expect('Continue with Monero setup? (Y/n): ','y') t.written_to_file('View keys') + + self.remove_device() + return t def autosign_start_thread(self): - if self.asi.dev_label_path.exists(): - self.asi.dev_label_path.unlink() def run(): t = self.spawn('mmgen-autosign', self.opts + ['wait'], direct_exec=True) self.write_to_tmpfile('autosign_thread_pid',str(t.ep.pid)) @@ -257,33 +267,50 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): return 'ok' def create_watchonly_wallets(self): - return self.create_wallets( 'alice', op='restore' ) + self.insert_device_ts() + t = self.create_wallets('alice', op='restore') + t.read() # required! + self.remove_device_ts() + return t def restore_wallets(self): return self.create_watchonly_wallets() def _create_transfer_tx(self,amt): + self.insert_device_ts() t = self.do_op('transfer','alice',f'1:0:{self.burn_addr},{amt}',no_relay=True,do_ret=True) t.read() # required! - self.insert_device() + self.remove_device_ts() return t def create_transfer_tx1(self): return self._create_transfer_tx('0.124') def create_transfer_tx2(self): + self.asi_ts.do_mount(self.silent) get_file_with_ext(self.asi_ts.xmr_tx_dir,'rawtx',delete_all=True) get_file_with_ext(self.asi_ts.xmr_tx_dir,'sigtx',delete_all=True) + self.asi_ts.do_umount(self.silent) return self._create_transfer_tx('0.257') def _wait_signed(self,dtype): oqmsg_r(gray(f'→ offline wallet{"s" if dtype.endswith("s") else ""} signing {dtype}')) + assert not self.device_inserted, f'‘{self.asi.dev_label_path}’ is inserted!' + assert not self.asi.mountpoint.is_mount(), f'‘{self.asi.mountpoint}’ is mounted!' + self.insert_device() while True: oqmsg_r(gray('.')) - if not self.asi.dev_label_path.exists(): + if self.asi.mountpoint.is_mount(): + oqmsg_r(gray('..working..')) break time.sleep(0.5) - oqmsg(gray('done')) + while True: + oqmsg_r(gray('.')) + if not self.asi.mountpoint.is_mount(): + oqmsg(gray('..done')) + break + time.sleep(0.5) + self.remove_device() def _xmr_autosign_op( self, @@ -349,6 +376,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): return self._submit_transfer_tx( relay_parm=self.tx_relay_daemon_parm ) def _submit_transfer_tx(self,relay_parm=None,ext=None,op='submit',check_bal=True): + self.insert_device_ts() t = self._xmr_autosign_op( op = op, add_opts = [f'--tx-relay-daemon={relay_parm}'] if relay_parm else [], @@ -357,6 +385,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): wait_signed = op == 'submit' ) t.expect( f'{op.capitalize()} transaction? (y/N): ', 'y' ) t.written_to_file('Submitted transaction') + self.remove_device_ts() if check_bal: t.ok() return self._mine_chk('unlocked') @@ -364,13 +393,13 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): return t def _export_outputs(self,wallet_arg,add_opts=[]): + self.insert_device_ts() t = self._xmr_autosign_op( op = 'export-outputs', wallet_arg = wallet_arg, add_opts = add_opts ) t.written_to_file('Wallet outputs') - t.read() # required! - self.insert_device() + self.remove_device_ts() return t def export_outputs1(self): @@ -380,11 +409,14 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): return self._export_outputs('1-2') def _import_key_images(self,wallet_arg): + self.insert_device_ts() t = self._xmr_autosign_op( op = 'import-key-images', wallet_arg = wallet_arg, dtype = 'wallet outputs', wait_signed = True ) + t.read() + self.remove_device_ts() return t def import_key_images1(self): @@ -446,13 +478,25 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): def autosign_clean(self): + self.asi_ts.do_mount(self.silent,no_xmr_chk=True) + self.create_fake_tx_files() before = '\n'.join(self._gen_listing()) t = self.spawn('mmgen-autosign', self.opts + ['clean']) out = t.read() + self.asi_ts.do_mount(self.silent,no_xmr_chk=True) + after = '\n'.join(self._gen_listing()) + + for k in ('tx','msg','xmr'): + shutil.rmtree(self.asi_ts.mountpoint / k) + + self.asi_ts.tx_dir.mkdir() + + self.asi_ts.do_umount(self.silent) + chk = """ tx: a.sigtx b.sigtx c.rawtx d.sigtx msg: a.sigmsg.json b.rawmsg.json c.sigmsg.json d.sigmsg.json @@ -460,9 +504,6 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): xmr/outputs: """ - shutil.rmtree(self.asi_ts.mountpoint) - self.asi_ts.tx_dir.mkdir(parents=True) - imsg(f'\nBefore cleaning:\n{before}') imsg(f'\nAfter cleaning:\n{after}') @@ -472,6 +513,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): return t def txlist(self): + self.insert_device_ts() t = self.spawn( 'mmgen-xmrwallet', self.autosign_opts + ['txlist'] ) t.match_expect_list([ 'SUBMITTED', @@ -479,13 +521,22 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignBase): 'Transfer 1:0','-> ext', 'Transfer 1:0','-> ext' ]) + self.remove_device_ts() return t def check_tx_dirs(self): + + self.asi_ts.do_mount(self.silent) before = '\n'.join(self._gen_listing()) + self.asi_ts.do_umount(self.silent) + t = self.spawn('mmgen-autosign', self.opts + ['clean']) t.read() + + self.asi_ts.do_mount(self.silent) after = '\n'.join(self._gen_listing()) + self.asi_ts.do_umount(self.silent) + imsg(f'\nBefore cleaning:\n{before}') imsg(f'\nAfter cleaning:\n{after}') pat = r'xmr/tx: \s*\S+\.subtx \S+\.subtx\s+xmr/outputs:\s*$' diff --git a/test/cmdtest_py_d/ct_xmrwallet.py b/test/cmdtest_py_d/ct_xmrwallet.py index 887e7743..a2bb5f7d 100755 --- a/test/cmdtest_py_d/ct_xmrwallet.py +++ b/test/cmdtest_py_d/ct_xmrwallet.py @@ -481,6 +481,8 @@ class CmdTestXMRWallet(CmdTestBase): def sync_wallets(self,user,op='sync',wallets=None,add_opts=[],bal_chk_func=None): data = self.users[user] + if data.autosign: + self.insert_device_ts() cmd_opts = list_gen( [f'--wallet-dir={data.udir}'], [f'--daemon=localhost:{data.md.rpc_port}'], @@ -512,6 +514,9 @@ class CmdTestXMRWallet(CmdTestBase): m = re.match( r'(\S+) Unlocked balance: (\S+)', res, re.DOTALL ) amts = [XMRAmt(amt) for amt in m.groups()] assert bal_chk_func(n,*amts), f'balance check for wallet {n} failed!' + if data.autosign: + t.read() + self.remove_device_ts() return t def do_op( @@ -667,6 +672,8 @@ class CmdTestXMRWallet(CmdTestBase): async def open_wallet_user(self,user,wnum): data = self.users[user] + if data.autosign: + self.asi_ts.do_mount(self.silent) silence() kal = (ViewKeyAddrList if data.autosign else KeyAddrList)( cfg = cfg, @@ -675,6 +682,8 @@ class CmdTestXMRWallet(CmdTestBase): skip_chksum_msg = True, key_address_validity_check = False ) end_silence() + if data.autosign: + self.asi_ts.do_umount(self.silent) self.users[user].wd.start(silent=not (cfg.exact_output or cfg.verbose)) return data.wd_rpc.call( 'open_wallet',