autosign: improve checking/creation of dirs, add autosign_clean test

This commit is contained in:
The MMGen Project 2024-02-24 14:17:32 +00:00
commit 0129da60e5
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
5 changed files with 178 additions and 138 deletions

View file

@ -276,8 +276,15 @@ class Autosign:
}
dfl_mn_fmt = 'mmgen'
have_msg_dir = False
non_xmr_dirs = {
'tx_dir': 'tx',
'msg_dir': 'msg',
}
xmr_dirs = {
'xmr_dir': 'xmr',
'xmr_tx_dir': 'xmr/tx',
'xmr_outputs_dir': 'xmr/outputs',
}
have_xmr = False
xmr_only = False
@ -299,13 +306,8 @@ class Autosign:
self.cfg = cfg
self.init_cfg()
self.tx_dir = self.mountpoint / 'tx'
self.msg_dir = self.mountpoint / 'msg'
self.keyfile = self.mountpoint / 'autosign.key'
if any(k in cfg._uopts for k in ('help','longhelp')):
return
if 'coin' in cfg._uopts:
die(1,'--coin option not supported with this command. Use --coins instead')
@ -322,19 +324,22 @@ class Autosign:
self.have_xmr = True
if len(self.coins) == 1:
self.xmr_only = True
self.xmr_dir = self.mountpoint / 'xmr'
self.xmr_tx_dir = self.mountpoint / 'xmr' / 'tx'
self.xmr_outputs_dir = self.mountpoint / 'xmr' / 'outputs'
self.xmr_cur_wallet_idx = None
self.dirs = {}
self.to_sign = ()
if not self.xmr_only:
self.dirs |= self.non_xmr_dirs
self.to_sign += Signable.non_xmr_signables
if self.have_xmr:
self.dirs |= self.xmr_dirs
self.to_sign += Signable.xmr_signables
for name,path in self.dirs.items():
setattr(self, name, self.mountpoint / path)
async def check_daemons_running(self):
from .protocol import init_proto
for coin in self.coins:
@ -369,15 +374,20 @@ class Autosign:
return self._wallet_files
def do_mount(self,silent=False,no_dir_chk=False,no_xmr_chk=False):
def do_mount(self, silent=False):
def check_dir(cdir):
try:
ds = cdir.stat()
assert S_ISDIR(ds.st_mode), f"'{cdir}' is not a directory!"
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f"'{cdir}' is not read/write for this user!"
except:
die(1,f"'{cdir}' missing or not read/writable by user!")
def check_or_create(dirname):
path = getattr(self, dirname)
if path.is_dir():
if not path.stat().st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR:
die(1, f'{path}’ is not read/write for this user!')
elif path.exists():
die(1, f'{path}’ is not a directory!')
elif path.is_symlink():
die(1, f'{path}’ is a symlink not pointing to a directory!')
else:
msg(f'Creating ‘{path}')
path.mkdir(parents=True)
if not self.mountpoint.is_dir():
def do_die(m):
@ -397,18 +407,8 @@ class Autosign:
else:
die(1,f"Unable to mount device at '{self.mountpoint}'")
self.have_msg_dir = self.msg_dir.is_dir()
if no_dir_chk:
return
check_dir(self.tx_dir)
if self.have_msg_dir:
check_dir(self.msg_dir)
if 'XMR' in self.coins and not no_xmr_chk:
check_dir(self.xmr_tx_dir)
for dirname in self.dirs:
check_or_create(dirname)
def do_umount(self,silent=False):
if self.mountpoint.is_mount():
@ -507,7 +507,7 @@ class Autosign:
def gen_key(self,no_unmount=False):
if not self.get_insert_status():
die(1,'Removable device not present!')
self.do_mount(no_xmr_chk=True)
self.do_mount()
self.wipe_existing_key()
self.create_key()
if not no_unmount:
@ -587,10 +587,6 @@ class Autosign:
except:
pass
self.xmr_outputs_dir.mkdir(parents=True)
self.xmr_tx_dir.mkdir(exist_ok=True)
self.clean_old_files()
create_signing_wallets()

View file

@ -32,6 +32,7 @@ cmd_groups_dfl = {
'tool': ('CmdTestTool',{'full_data':True}),
'input': ('CmdTestInput',{}),
'output': ('CmdTestOutput',{'modname':'misc','full_data':True}),
'autosign_clean': ('CmdTestAutosignClean', {'modname':'autosign'}),
'autosign': ('CmdTestAutosign',{}),
'regtest': ('CmdTestRegtest',{}),
# 'chainsplit': ('CmdTestChainsplit',{}),
@ -216,6 +217,7 @@ cfgs = { # addr_idx_lists (except 31,32,33,34) must contain exactly 8 addresses
'32': {}, # ref_tx
'33': {}, # ref_tx
'34': {}, # ref_tx
'38': {}, # autosign_clean
'39': {}, # xmr_autosign
'40': {}, # cfgfile
'41': {}, # opts

View file

@ -26,7 +26,7 @@ from pathlib import Path
from mmgen.cfg import Config
from mmgen.color import red,green,blue,yellow,cyan,orange,purple,gray
from mmgen.util import msg,suf,die,indent
from mmgen.util import msg,suf,die,indent,fmt
from mmgen.led import LEDControl
from mmgen.autosign import Autosign, Signable
@ -107,10 +107,6 @@ class CmdTestAutosignBase(CmdTestBase):
run(['truncate', '--size=10M', img_file], check=True)
run(['/sbin/mkfs.ext2', '-E', f'root_owner={os.getuid()}:{os.getgid()}', img_file],
stdout=redir, stderr=redir, check=True)
self.do_mount(no_dir_chk=True)
self.asi.tx_dir.mkdir()
self.asi.msg_dir.mkdir()
self.do_umount()
def start_daemons(self):
self.spawn('',msg_only=True)
@ -178,11 +174,132 @@ class CmdTestAutosignBase(CmdTestBase):
def do_umount(self, *args, **kwargs):
return self._mount_ops('asi', 'do_umount', *args, **kwargs)
def do_mount_online(self, *args, **kwargs):
return self._mount_ops('asi_online', 'do_mount', *args, **kwargs)
def _gen_listing(self):
for k in self.asi.dirs:
d = getattr(self.asi,k)
if d.is_dir():
yield '{:12} {}'.format(
str(Path(*d.parts[6:])) + ':',
' '.join(sorted(i.name for i in d.iterdir()))).strip()
def do_umount_online(self, *args, **kwargs):
return self._mount_ops('asi_online', 'do_umount', *args, **kwargs)
class CmdTestAutosignClean(CmdTestAutosignBase):
have_online = False
live = False
simulate_led = True
no_insert_check = False
coins = ['btc']
tmpdir_nums = [38]
cmd_group = (
('clean_no_xmr', 'cleaning signable file directories (no XMR)'),
('clean_xmr_only', 'cleaning signable file directories (XMR-only)'),
('clean_all', 'cleaning signable file directories (with XMR)'),
)
def create_fake_tx_files(self):
imsg('Creating fake transaction files')
if not self.asi.xmr_only:
for fn in (
'a.rawtx', 'a.sigtx',
'b.rawtx', 'b.sigtx',
'c.rawtx',
'd.sigtx',
):
(self.asi.tx_dir / fn).touch()
for fn in (
'a.rawmsg.json', 'a.sigmsg.json',
'b.rawmsg.json',
'c.sigmsg.json',
'd.rawmsg.json', 'd.sigmsg.json',
):
(self.asi.msg_dir / fn).touch()
if self.asi.have_xmr:
for fn in (
'a.rawtx', 'a.sigtx', 'a.subtx',
'b.rawtx', 'b.sigtx',
'c.subtx',
'd.rawtx', 'd.subtx',
'e.rawtx',
'f.sigtx', 'f.subtx',
):
(self.asi.xmr_tx_dir / fn).touch()
for fn in (
'a.raw', 'a.sig',
'b.raw',
'c.sig',
):
(self.asi.xmr_outputs_dir / fn).touch()
return 'ok'
def clean_no_xmr(self):
return self._clean('btc,ltc,eth')
def clean_xmr_only(self):
self.asi = Autosign(Config({'_clone': self.asi.cfg, 'coins': 'xmr'}))
return self._clean('xmr')
def clean_all(self):
self.asi = Autosign(Config({'_clone': self.asi.cfg, 'coins': 'xmr,btc,bch,eth'}))
return self._clean('xmr,btc,bch,eth')
def _clean(self,coins):
self.spawn('', msg_only=True)
self.insert_device()
silence()
self.do_mount()
end_silence()
self.create_fake_tx_files()
before = '\n'.join(self._gen_listing())
t = self.spawn('mmgen-autosign', [f'--coins={coins}','clean'], no_msg=True)
out = t.read()
self.do_mount()
self.remove_device()
after = '\n'.join(self._gen_listing())
chk_non_xmr = """
tx: a.sigtx b.sigtx c.rawtx d.sigtx
msg: a.sigmsg.json b.rawmsg.json c.sigmsg.json d.sigmsg.json
"""
chk_xmr = """
xmr: outputs tx
xmr/tx: a.subtx b.sigtx c.subtx d.subtx e.rawtx f.subtx
xmr/outputs:
"""
chk = ''
shred_count = 0
if not self.asi.xmr_only:
for k in ('tx_dir','msg_dir'):
shutil.rmtree(getattr(self.asi, k))
chk += chk_non_xmr.rstrip()
shred_count += 4
if self.asi.have_xmr:
shutil.rmtree(self.asi.xmr_dir)
chk += chk_xmr.rstrip()
shred_count += 9
self.do_umount()
imsg(f'\nBefore cleaning:\n{before}')
imsg(f'\nAfter cleaning:\n{after}')
assert f'{shred_count} files shredded' in out
assert after + '\n' == fmt(chk), f'\n{after}\n!=\n{fmt(chk)}'
return t
class CmdTestAutosignThreaded(CmdTestAutosignBase):
have_online = True
@ -294,6 +411,8 @@ class CmdTestAutosign(CmdTestAutosignBase):
('remove_invalid_msgfile', 'removing invalid message file'),
('remove_bad_txfiles2', 'removing bad transaction files'),
('sign_no_unsigned', 'signing transactions and messages (nothing to sign)'),
('sign_no_unsigned_xmr', 'signing transactions and messages (nothing to sign, with XMR)'),
('sign_no_unsigned_xmronly', 'signing transactions and messages (nothing to sign, XMR-only)'),
('stop_daemons', 'stopping daemons'),
)
@ -538,17 +657,28 @@ class CmdTestAutosign(CmdTestAutosignBase):
present = ['non_xmr_signables'],
absent = ['xmr_signables'])
def sign_no_unsigned_xmr(self):
return self._sign_no_unsigned(
coins = 'XMR,BTC',
present = ['xmr_signables','non_xmr_signables'])
def sign_no_unsigned_xmronly(self):
return self._sign_no_unsigned(
coins = 'XMR',
present = ['xmr_signables'],
absent = ['non_xmr_signables'])
def _sign_no_unsigned(self,coins,present=[],absent=[]):
t = self.spawn('mmgen-autosign', ['--quiet', '--no-insert-check', f'--coins={coins}'])
res = t.read()
for signable_list in present:
for signable_clsname in getattr(Signable,signable_list):
desc = getattr(Signable, signable_clsname).desc
assert f'No unsigned {desc}' in res, f'{desc!r} missing in output'
assert f'No unsigned {desc}s' in res, f'‘No unsigned {desc}s’ missing in output'
for signable_list in absent:
for signable_clsname in getattr(Signable,signable_list):
desc = getattr(Signable, signable_clsname).desc
assert desc not in res, f'{desc!r} should be absent in output'
assert not f'No unsigned {desc}s' in res, f'‘No unsigned {desc}s’ should be absent in output'
return t
class CmdTestAutosignBTC(CmdTestAutosign):

View file

@ -57,7 +57,6 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignThreaded):
('new_address_alice_label', 'adding an address to Alice’s tmp wallet (with label)'),
('dump_tmp_wallets', 'dumping Alice’s tmp wallets'),
('delete_tmp_wallets', 'deleting Alice’s tmp wallets'),
('autosign_clean', 'cleaning signable file directories'),
('autosign_setup', 'autosign setup with Alice’s seed'),
('create_watchonly_wallets', 'creating online (watch-only) wallets for Alice'),
('delete_tmp_dump_files', 'deleting Alice’s dump files'),
@ -209,7 +208,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignThreaded):
def autosign_setup(self):
self.do_mount_online(no_xmr_chk=True)
self.do_mount_online()
self.asi_online.xmr_dir.mkdir(exist_ok=True)
(self.asi_online.xmr_dir / 'old.vkeys').touch()
@ -369,93 +368,6 @@ class CmdTestXMRAutosign(CmdTestXMRWallet,CmdTestAutosignThreaded):
def import_key_images2(self):
return self._import_key_images(None)
def create_fake_tx_files(self):
imsg('Creating fake transaction files')
self.asi_online.msg_dir.mkdir(exist_ok=True)
self.asi_online.xmr_dir.mkdir(exist_ok=True)
self.asi_online.xmr_tx_dir.mkdir(exist_ok=True)
self.asi_online.xmr_outputs_dir.mkdir(exist_ok=True)
for fn in (
'a.rawtx', 'a.sigtx',
'b.rawtx', 'b.sigtx',
'c.rawtx',
'd.sigtx',
):
(self.asi_online.tx_dir / fn).touch()
for fn in (
'a.rawmsg.json', 'a.sigmsg.json',
'b.rawmsg.json',
'c.sigmsg.json',
'd.rawmsg.json', 'd.sigmsg.json',
):
(self.asi_online.msg_dir / fn).touch()
for fn in (
'a.rawtx', 'a.sigtx', 'a.subtx',
'b.rawtx', 'b.sigtx',
'c.subtx',
'd.rawtx', 'd.subtx',
'e.rawtx',
'f.sigtx','f.subtx',
):
(self.asi_online.xmr_tx_dir / fn).touch()
for fn in (
'a.raw', 'a.sig',
'b.raw',
'c.sig',
):
(self.asi_online.xmr_outputs_dir / fn).touch()
return 'ok'
def _gen_listing(self):
for k in ('tx_dir','msg_dir','xmr_tx_dir','xmr_outputs_dir'):
d = getattr(self.asi_online,k)
if d.is_dir():
yield '{:12} {}'.format(
str(Path(*d.parts[6:])) + ':',
' '.join(sorted(i.name for i in d.iterdir()))).strip()
def autosign_clean(self):
self.do_mount_online(no_xmr_chk=True)
self.create_fake_tx_files()
before = '\n'.join(self._gen_listing())
t = self.spawn('mmgen-autosign', self.opts + ['clean'])
out = t.read()
self.do_mount_online(no_xmr_chk=True)
after = '\n'.join(self._gen_listing())
for k in ('tx','msg','xmr'):
shutil.rmtree(self.asi_online.mountpoint / k)
self.asi_online.tx_dir.mkdir()
self.do_umount_online()
chk = """
tx: a.sigtx b.sigtx c.rawtx d.sigtx
msg: a.sigmsg.json b.rawmsg.json c.sigmsg.json d.sigmsg.json
xmr/tx: a.subtx b.sigtx c.subtx d.subtx e.rawtx f.subtx
xmr/outputs:
"""
imsg(f'\nBefore cleaning:\n{before}')
imsg(f'\nAfter cleaning:\n{after}')
assert '13 files shredded' in out
assert after + '\n' == fmt(chk), f'\n{after}\n!=\n{fmt(chk)}'
return t
def txlist(self):
self.insert_device_online()
t = self.spawn( 'mmgen-xmrwallet', self.autosign_opts + ['txlist'] )

View file

@ -154,7 +154,7 @@ init_tests() {
t_etc="parity $cmdtest_py --coin=etc ethdev"
d_autosign="transaction and message autosigning"
t_autosign="- $cmdtest_py autosign"
t_autosign="- $cmdtest_py autosign autosign_clean"
d_autosign_btc="transaction and message autosigning (Bitcoin only)"
t_autosign_btc="- $cmdtest_py autosign_btc"
@ -164,7 +164,7 @@ init_tests() {
d_btc="overall operations with emulated RPC data (Bitcoin)"
t_btc="
- $cmdtest_py --exclude regtest,autosign,ref_altcoin
- $cmdtest_py --exclude regtest,autosign,autosign_clean,ref_altcoin
- $cmdtest_py --segwit
- $cmdtest_py --segwit-random
- $cmdtest_py --bech32