From ab28caa75b3bbbfb01467a077e813148df14668e Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Fri, 28 Apr 2023 11:23:23 +0000 Subject: [PATCH] autosign,xmrwallet: use pathlib wherever possible --- mmgen/autosign.py | 146 ++++++++++++++-------------- mmgen/proto/xmr/daemon.py | 4 +- mmgen/xmrwallet.py | 156 +++++++++++++++--------------- test/test_py_d/ts_autosign.py | 15 +-- test/test_py_d/ts_xmr_autosign.py | 2 +- 5 files changed, 163 insertions(+), 160 deletions(-) diff --git a/mmgen/autosign.py b/mmgen/autosign.py index 694174fd..d7bb4d24 100755 --- a/mmgen/autosign.py +++ b/mmgen/autosign.py @@ -13,6 +13,7 @@ autosign: Auto-sign MMGen transactions, message files and XMR wallet output file """ import sys,os,asyncio +from pathlib import Path from subprocess import run,PIPE,DEVNULL from collections import namedtuple @@ -45,7 +46,7 @@ class Signable: def _unprocessed(self,attrname,rawext,sigext): if not hasattr(self,attrname): - dirlist = tuple(os.scandir(self.dir)) + dirlist = tuple(self.dir.iterdir()) names = tuple(f.name for f in dirlist) setattr( self, @@ -70,7 +71,7 @@ class Signable: async def sign(self,f): from .tx import UnsignedTX - tx1 = UnsignedTX( cfg=self.cfg, filename=f.path ) + tx1 = UnsignedTX( cfg=self.cfg, filename=f ) if tx1.proto.sign_mode == 'daemon': from .rpc import rpc_init tx1.rpc = await rpc_init( self.cfg, tx1.proto ) @@ -118,7 +119,7 @@ class Signable: def gen_bad_list(self,bad_files): for f in bad_files: - yield red(f.path) + yield red(f.name) class xmr_transaction(transaction): dir_name = 'xmr_tx_dir' @@ -132,15 +133,15 @@ class Signable: async def sign(self,f): from .xmrwallet import MoneroMMGenTX,MoneroWalletOps,xmrwallet_uargs - tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f.path ) + tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f ) m = MoneroWalletOps.sign( self.parent.xmrwallet_cfg, xmrwallet_uargs( - infile = self.parent.wallet_files[0], # MMGen wallet file + infile = str(self.parent.wallet_files[0]), # MMGen wallet file wallets = str(tx1.src_wallet_idx), spec = None ), ) - tx2 = await m.main(f.path) # TODO: stop wallet daemon? + tx2 = await m.main(f) # TODO: stop wallet daemon? tx2.write(ask_write=False) return tx2 @@ -156,11 +157,11 @@ class Signable: async def sign(self,f): from .xmrwallet import MoneroWalletOps,xmrwallet_uargs - wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f.name) + wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f) m = MoneroWalletOps.export_key_images( self.parent.xmrwallet_cfg, xmrwallet_uargs( - infile = self.parent.wallet_files[0], # MMGen wallet file + infile = str(self.parent.wallet_files[0]), # MMGen wallet file wallets = str(wallet_idx), spec = None ), ) @@ -181,11 +182,11 @@ class Signable: async def sign(self,f): from .msg import UnsignedMsg,SignedMsg - m = UnsignedMsg( self.cfg, infile=f.path ) + m = UnsignedMsg( self.cfg, infile=f ) await m.sign( wallet_files=self.parent.wallet_files[:] ) m = SignedMsg( self.cfg, data=m.__dict__ ) m.write_to_file( - outdir = os.path.abspath(self.dir), + outdir = self.dir.resolve(), ask_overwrite = False ) if m.data.get('failed_sids'): die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}') @@ -194,28 +195,28 @@ class Signable: def print_summary(self,messages): gmsg('\nSigned message files:') for m in messages: - gmsg(' ' + os.path.join( self.dir, m.signed_filename )) + gmsg(' ' + m.signed_filename) def gen_bad_list(self,bad_files): for f in bad_files: - sigfile = f.path[:-len(self.rawext)] + self.sigext - yield orange(sigfile) if os.path.exists(sigfile) else red(f.path) + sigfile = f.parent / ( f.name[:-len(self.rawext)] + self.sigext ) + yield orange(sigfile.name) if sigfile.exists() else red(f.name) class Autosign: - dfl_mountpoint = os.path.join(os.sep,'mnt','mmgen_autosign') - dfl_wallet_dir = os.path.join(os.sep,'dev','shm','autosign') - disk_label_dir = os.path.join(os.sep,'dev','disk','by-label') - part_label = 'MMGEN_TX' + dfl_mountpoint = '/mnt/mmgen_autosign' + dfl_wallet_dir = '/dev/shm/autosign' + old_dfl_mountpoint = '/mnt/tx' + + dev_disk_path = Path('/dev/disk/by-label/MMGEN_TX') - old_dfl_mountpoint = os.path.join(os.sep,'mnt','tx') old_dfl_mountpoint_errmsg = f""" - Mountpoint {old_dfl_mountpoint!r} is no longer supported! - Please rename {old_dfl_mountpoint!r} to {dfl_mountpoint!r} + Mountpoint '{old_dfl_mountpoint}' is no longer supported! + Please rename '{old_dfl_mountpoint}' to '{dfl_mountpoint}' and update your fstab accordingly. """ mountpoint_errmsg_fs = """ - Mountpoint {!r} does not exist or does not point + Mountpoint '{}' does not exist or does not point to a directory! Please create the mountpoint and add an entry to your fstab as described in this script’s help text. """ @@ -230,23 +231,23 @@ class Autosign: def __init__(self,cfg): + self.cfg = cfg + if cfg.mnemonic_fmt: if cfg.mnemonic_fmt not in self.mn_fmts: die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format( cfg.mnemonic_fmt, fmt_list( self.mn_fmts, fmt='no_spc' ) )) - self.cfg = cfg + self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint) + self.wallet_dir = Path(cfg.wallet_dir or self.dfl_wallet_dir) - self.mountpoint = cfg.mountpoint or self.dfl_mountpoint - self.wallet_dir = cfg.wallet_dir or self.dfl_wallet_dir + self.tx_dir = self.mountpoint / 'tx' + self.msg_dir = self.mountpoint / 'msg' + self.keyfile = self.mountpoint / 'autosign.key' - self.tx_dir = os.path.join( self.mountpoint, 'tx' ) - self.msg_dir = os.path.join( self.mountpoint, 'msg' ) - self.keyfile = os.path.join( self.mountpoint, 'autosign.key' ) - - cfg.outdir = self.tx_dir - cfg.passwd_file = self.keyfile + cfg.outdir = str(self.tx_dir) + cfg.passwd_file = str(self.keyfile) if any(k in cfg._uopts for k in ('help','longhelp')): return @@ -264,9 +265,9 @@ class Autosign: self.coins = ['BTC'] if 'XMR' in self.coins: - self.xmr_dir = os.path.join( self.mountpoint, 'xmr' ) - self.xmr_tx_dir = os.path.join( self.mountpoint, 'xmr', 'tx' ) - self.xmr_outputs_dir = os.path.join( self.mountpoint, 'xmr', 'outputs' ) + self.xmr_dir = self.mountpoint / 'xmr' + self.xmr_tx_dir = self.mountpoint / 'xmr' / 'tx' + self.xmr_outputs_dir = self.mountpoint / 'xmr' / 'outputs' async def check_daemons_running(self): from .protocol import init_proto @@ -291,14 +292,13 @@ class Autosign: if not hasattr(self,'_wallet_files'): try: - dirlist = os.listdir(self.wallet_dir) + dirlist = self.wallet_dir.iterdir() except: - die(1,f'Cannot open wallet directory {self.wallet_dir!r}. Did you run ‘mmgen-autosign setup’?') + die(1,f"Cannot open wallet directory '{self.wallet_dir}'. Did you run ‘mmgen-autosign setup’?") - fns = [fn for fn in dirlist if fn.endswith('.mmdat')] - if fns: - self._wallet_files = [os.path.join(self.wallet_dir,fn) for fn in fns] - else: + self._wallet_files = [f for f in dirlist if f.suffix == '.mmdat'] + + if not self._wallet_files: die(1,'No wallet files present!') return self._wallet_files @@ -309,27 +309,27 @@ class Autosign: def check_dir(cdir): try: - ds = os.stat(cdir) - assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!' - assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!' + ds = cdir.stat() + assert S_ISDIR(ds.st_mode), f"'{cdir}' is not a directory!" + assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f"'{cdir}' is not read/write for this user!" except: - die(1,f'{cdir!r} missing or not read/writable by user!') + die(1,f"'{cdir}' missing or not read/writable by user!") - if not os.path.isdir(self.mountpoint): + if not self.mountpoint.is_dir(): def do_die(m): die(1,'\n' + yellow(fmt(m.strip(),indent=' '))) - if os.path.isdir(self.old_dfl_mountpoint): + if Path(self.old_dfl_mountpoint).is_dir(): do_die(self.old_dfl_mountpoint_errmsg) else: do_die(self.mountpoint_errmsg_fs.format(self.mountpoint)) - if not os.path.ismount(self.mountpoint): + if not self.mountpoint.is_mount(): if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0: - msg(f'Mounting {self.mountpoint!r}') + msg(f"Mounting '{self.mountpoint}'") elif not self.cfg.test_suite: - die(1,f'Unable to mount device at {self.mountpoint!r}') + die(1,f"Unable to mount device at '{self.mountpoint}'") - self.have_msg_dir = os.path.isdir(self.msg_dir) + self.have_msg_dir = self.msg_dir.is_dir() check_dir(self.tx_dir) @@ -340,14 +340,14 @@ class Autosign: check_dir(self.xmr_tx_dir) def do_umount(self): - if os.path.ismount(self.mountpoint): + if self.mountpoint.is_mount(): run( ['sync'], check=True ) - msg(f'Unmounting {self.mountpoint!r}') + msg(f"Unmounting '{self.mountpoint}'") run( ['umount',self.mountpoint], check=True ) bmsg('It is now safe to extract the removable device') def decrypt_wallets(self): - msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}') + msg(f"Unlocking wallet{suf(self.wallet_files)} with key from '{self.cfg.passwd_file}'") fails = 0 for wf in self.wallet_files: try: @@ -368,9 +368,9 @@ class Autosign: try: ret = await target.sign(f) except Exception as e: - ymsg(f'An error occurred with {target.desc} {f.name!r}:\n {type(e).__name__}: {e!s}') + ymsg(f"An error occurred with {target.desc} '{f.name}':\n {type(e).__name__}: {e!s}") except: - ymsg(f'An error occurred with {target.desc} {f.name!r}') + ymsg(f"An error occurred with {target.desc} '{f.name}'") good.append(ret) if ret else bad.append(f) self.cfg._util.qmsg('') await asyncio.sleep(0.3) @@ -412,24 +412,22 @@ class Autosign: return False def wipe_existing_key(self): - try: os.stat(self.keyfile) + try: self.keyfile.stat() except: pass else: from .fileutil import shred_file - msg(f'\nShredding existing key {self.keyfile!r}') + msg(f"\nShredding existing key '{self.keyfile}'") shred_file( self.keyfile, verbose=self.cfg.verbose ) def create_key(self): - kdata = os.urandom(32).hex() - desc = f'key file {self.keyfile!r}' + desc = f"key file '{self.keyfile}'" msg('Creating ' + desc) try: - with open(self.keyfile,'w') as fp: - fp.write(kdata+'\n') - os.chmod(self.keyfile,0o400) - msg('Wrote ' + desc) + self.keyfile.write_text( os.urandom(32).hex() ) + self.keyfile.chmod(0o400) except: die(2,'Unable to write ' + desc) + msg('Wrote ' + desc) def gen_key(self,no_unmount=False): self.create_wallet_dir() @@ -448,10 +446,10 @@ class Autosign: except: pass def create_wallet_dir(self): - try: os.mkdir(self.wallet_dir) + try: self.wallet_dir.mkdir(parents=True) except: pass - try: os.stat(self.wallet_dir) - except: die(2,f'Unable to create wallet directory {self.wallet_dir!r}') + try: self.wallet_dir.stat() + except: die(2,f"Unable to create wallet directory '{self.wallet_dir}'") def setup(self): self.remove_wallet_dir() @@ -459,7 +457,7 @@ class Autosign: wf = find_file_in_dir( get_wallet_cls('mmgen'), self.cfg.data_dir ) if wf and keypress_confirm( cfg = self.cfg, - prompt = f'Default wallet {wf!r} found.\nUse default wallet for autosigning?', + prompt = f"Default wallet '{wf}' found.\nUse default wallet for autosigning?", default_yes = True ): from .cfg import Config ss_in = Wallet( Config(), wf ) @@ -477,10 +475,10 @@ class Autosign: 'wallet_rpc_user': 'autosigner', 'wallet_rpc_password': 'my very secret password', 'passwd_file': self.cfg.passwd_file, - 'wallet_dir': self.wallet_dir, + 'wallet_dir': str(self.wallet_dir), 'autosign': True, - 'autosign_mountpoint': self.mountpoint, - 'outdir': self.xmr_dir, # required by vkal.write() + 'autosign_mountpoint': str(self.mountpoint), + 'outdir': str(self.xmr_dir), # required by vkal.write() }) return self._xmrwallet_cfg @@ -490,16 +488,16 @@ class Autosign: try: shutil.rmtree(self.xmr_outputs_dir) except: pass - os.makedirs(self.xmr_outputs_dir) + self.xmr_outputs_dir.mkdir(parents=True) - os.makedirs(self.xmr_tx_dir,exist_ok=True) + self.xmr_tx_dir.mkdir(exist_ok=True) from .addrfile import ViewKeyAddrFile from .fileutil import shred_file - for f in os.scandir(self.xmr_dir): + for f in self.xmr_dir.iterdir(): if f.name.endswith(ViewKeyAddrFile.ext): msg(f'Shredding old viewkey-address file {f.name!r}') - shred_file(f.path) + shred_file(f) if len(self.wallet_files) > 1: ymsg(f'Warning: more that one wallet file, using the first ({self.wallet_files[0]}) for xmrwallet generation') @@ -518,7 +516,7 @@ class Autosign: def get_insert_status(self): if self.cfg.no_insert_check: return True - try: os.stat(os.path.join( self.disk_label_dir, self.part_label )) + try: self.dev_disk_path.stat() except: return False else: return True diff --git a/mmgen/proto/xmr/daemon.py b/mmgen/proto/xmr/daemon.py index 2e248638..3565db03 100755 --- a/mmgen/proto/xmr/daemon.py +++ b/mmgen/proto/xmr/daemon.py @@ -147,8 +147,8 @@ class MoneroWalletDaemon(RPCDaemon): ['--trusted-daemon', trust_daemon], ['--untrusted-daemon', not trust_daemon], [f'--rpc-bind-port={self.rpc_port}'], - ['--wallet-dir='+self.wallet_dir], - ['--log-file='+self.logfile], + [f'--wallet-dir={self.wallet_dir}'], + [f'--log-file={self.logfile}'], [f'--rpc-login={self.user}:{self.passwd}'], [f'--daemon-address={self.daemon_addr}', self.daemon_addr], [f'--daemon-port={self.daemon_port}', not self.daemon_addr], diff --git a/mmgen/xmrwallet.py b/mmgen/xmrwallet.py index f958a75c..36f3d003 100755 --- a/mmgen/xmrwallet.py +++ b/mmgen/xmrwallet.py @@ -20,8 +20,10 @@ xmrwallet.py - MoneroWalletOps class """ -import os,re,time,json +import re,time,json from collections import namedtuple +from pathlib import PosixPath as Path + from .objmethods import MMGenObject,Hilite,InitErrors from .obj import CoinTxID,Int from .color import red,yellow,green,blue,cyan,pink,orange @@ -164,7 +166,7 @@ class MoneroMMGenFile: ) def extract_data_from_file(self,cfg,fn): - return json.loads( get_data_from_file( cfg, fn, self.desc ))[self.data_label] + return json.loads( get_data_from_file( cfg, str(fn), self.desc ))[self.data_label] class MoneroMMGenTX: @@ -265,12 +267,12 @@ class MoneroMMGenTX: ) if self.cfg.autosign: - fn = os.path.join( get_autosign_obj(self.cfg).xmr_tx_dir, fn ) + fn = get_autosign_obj(self.cfg).xmr_tx_dir / fn from .fileutil import write_data_to_file write_data_to_file( cfg = self.cfg, - outfile = fn, + outfile = str(fn), data = self.make_wrapped_data(dict_data), desc = self.desc, ask_write = ask_write, @@ -363,7 +365,7 @@ class MoneroMMGenTX: d = self.xmrwallet_tx_data(**d_wrap['data']) if self.name != 'Completed': - assert fn.endswith('.'+self.ext), 'TX filename {fn!r} has incorrect extension (not {self.ext!r})' + assert fn.name.endswith('.'+self.ext), 'TX filename {fn} has incorrect extension (not {self.ext!r})' assert getattr(d,self.req_field), f'{self.name} TX missing required field {self.req_field!r}' assert bool(d.sign_time)==self.signed,'{} has {}sign time!'.format(self.desc,'no 'if self.signed else'') for f in self.forbidden_fields: @@ -440,34 +442,33 @@ class MoneroWalletOutputsFile: from .fileutil import write_data_to_file write_data_to_file( cfg = self.cfg, - outfile = self.get_outfile( self.cfg, self.wallet_fn ) + add_suf, + outfile = str(self.get_outfile( self.cfg, self.wallet_fn )) + add_suf, data = self.make_wrapped_data(self.data._asdict()), desc = self.desc, ask_overwrite = False, ignore_opt_outdir = True ) def get_outfile(self,cfg,wallet_fn): - fn = self.fn_fs.format( - a = wallet_fn, - b = self.base_chksum, - c = self.ext, - ) - return os.path.join( - get_autosign_obj(cfg).xmr_outputs_dir, - os.path.basename(fn) ) if cfg.autosign else fn + return ( + get_autosign_obj(cfg).xmr_outputs_dir if cfg.autosign else + wallet_fn.parent ) / self.fn_fs.format( + a = wallet_fn.name, + b = self.base_chksum, + c = self.ext, + ) def get_wallet_fn(self,fn): - assert fn.endswith(f'.{self.ext}'), ( + assert fn.name.endswith(f'.{self.ext}'), ( f'{type(self).__name__}: filename does not end with {"."+self.ext!r}' ) - return fn[:-(len(self.ext)+self.ext_offset+1)] + return fn.parent / fn.name[:-(len(self.ext)+self.ext_offset+1)] def get_info(self,indent=''): if self.data.signed_key_images is not None: data = self.data.signed_key_images or [] - return f'{self.wallet_fn}: {len(data)} signed key image{suf(data)}' + return f'{self.wallet_fn.name}: {len(data)} signed key image{suf(data)}' else: - return f'{self.wallet_fn}: no key images' + return f'{self.wallet_fn.name}: no key images' class New(Base): ext = 'raw' @@ -478,7 +479,7 @@ class MoneroWalletOutputsFile: init_data = dict.fromkeys(self.data_tuple._fields) init_data.update({ 'seed_id': parent.kal.al_id.sid, - 'wallet_index': wallet_idx or parent.get_idx_from_fn(os.path.basename(wallet_fn)), + 'wallet_index': wallet_idx or parent.get_idx_from_fn(wallet_fn), }) init_data.update({k:v for k,v in data.items() if k in init_data}) self.data = self.data_tuple(**init_data) @@ -493,7 +494,7 @@ class MoneroWalletOutputsFile: d_wrap = self.extract_data_from_file( parent.cfg, fn ) data = d_wrap['data'] check_equal( 'Seed ID', data['seed_id'], parent.kal.al_id.sid ) - wallet_idx = parent.get_idx_from_fn(os.path.basename(wallet_fn)) + wallet_idx = parent.get_idx_from_fn(wallet_fn) check_equal( 'Wallet index', data['wallet_index'], wallet_idx ) super().__init__( parent = parent, @@ -505,23 +506,22 @@ class MoneroWalletOutputsFile: @classmethod def find_fn_from_wallet_fn(cls,cfg,wallet_fn,ret_on_no_match=False): - path = get_autosign_obj(cfg).xmr_outputs_dir or os.curdir - fn = os.path.basename(wallet_fn) + path = get_autosign_obj(cfg).xmr_outputs_dir or Path() pat = cls.fn_fs.format( - a = fn, + a = wallet_fn.name, b = f'[0-9a-f]{{{cls.chksum_nchars}}}\\', c = cls.ext, ) - matches = [f for f in os.scandir(path) if re.match(pat,f.name)] + matches = [f for f in path.iterdir() if re.match(pat,f.name)] if not matches and ret_on_no_match: return None if not matches or len(matches) > 1: - die(2,'{a} matching pattern {b!r} found in {c}!'.format( + die(2,"{a} matching pattern {b!r} found in '{c}'!".format( a = 'No files' if not matches else 'More than one file', b = pat, c = path )) - return matches[0].path + return matches[0] class Unsigned(Completed): pass @@ -548,7 +548,7 @@ class MoneroWalletDumpFile: 'wallet_metadata', ]) def get_outfile(self,cfg,wallet_fn): - return f'{wallet_fn}.{self.ext}' + return wallet_fn.parent / f'{wallet_fn.name}.{self.ext}' class New(Base,MoneroWalletOutputsFile.New): pass @@ -702,7 +702,7 @@ class MoneroWalletOps: def __init__(self,cfg,uarg_tuple): def wallet_exists(fn): - try: os.stat(fn) + try: fn.stat() except: return False else: return True @@ -711,9 +711,9 @@ class MoneroWalletOps: fn = self.get_wallet_fn(d) exists = wallet_exists(fn) if exists and not self.wallet_exists: - die(1,f'Wallet {fn!r} already exists!') + die(1,f"Wallet '{fn}' already exists!") elif not exists and self.wallet_exists: - die(1,f'Wallet {fn!r} not found!') + die(1,f"Wallet '{fn}' not found!") super().__init__(cfg,uarg_tuple) @@ -738,13 +738,13 @@ class MoneroWalletOps: self.kal = (ViewKeyAddrList if (self.cfg.watch_only and first_try) else KeyAddrList)( cfg = cfg, proto = self.proto, - addrfile = self.autosign_viewkey_addr_file if self.cfg.autosign else uarg.infile, + addrfile = str(self.autosign_viewkey_addr_file) if self.cfg.autosign else uarg.infile, key_address_validity_check = True, skip_chksum_msg = True ) break except: if first_try: - msg(f'Attempting to open {uarg.infile} as key-address list') + msg(f"Attempting to open '{uarg.infile}' as key-address list") continue raise @@ -786,7 +786,7 @@ class MoneroWalletOps: @classmethod def get_idx_from_fn(cls,fn): - return int( re.match(r'[0-9a-fA-F]{8}-(\d+)-Monero(WatchOnly)?Wallet.*',fn)[1] ) + return int( re.match(r'[0-9a-fA-F]{8}-(\d+)-Monero(WatchOnly)?Wallet.*',fn.name)[1] ) def get_coin_daemon_rpc(self): @@ -806,17 +806,17 @@ class MoneroWalletOps: def autosign_viewkey_addr_file(self): from .addrfile import ViewKeyAddrFile mpdir = get_autosign_obj(self.cfg).xmr_dir - fnlist = [f for f in os.listdir(mpdir) if f.endswith(ViewKeyAddrFile.ext)] - if len(fnlist) != 1: + flist = [f for f in mpdir.iterdir() if f.name.endswith(ViewKeyAddrFile.ext)] + if len(flist) != 1: die(2, - '{a} viewkey-address files found in autosign mountpoint directory {b!r}!\n'.format( - a = 'Multiple' if fnlist else 'No', + "{a} viewkey-address files found in autosign mountpoint directory '{b}'!\n".format( + a = 'Multiple' if flist else 'No', b = mpdir ) + 'Have you run ‘mmgen-autosign setup’ on your offline machine with the --xmrwallets option?' ) else: - return os.path.join( mpdir, fnlist[0] ) + return flist[0] def create_addr_data(self): if uarg.wallets: @@ -834,12 +834,14 @@ class MoneroWalletOps: def get_wallet_fn(self,data,watch_only=None): if watch_only is None: watch_only = self.cfg.watch_only - return os.path.join( - self.cfg.wallet_dir or '.','{a}-{b}-Monero{c}Wallet{d}'.format( + return Path( + (self.cfg.wallet_dir or '.'), + '{a}-{b}-Monero{c}Wallet{d}'.format( a = self.kal.al_id.sid, b = data.idx, c = 'WatchOnly' if watch_only else '', - d = f'.{self.cfg.network}' if self.cfg.network != 'mainnet' else '')) + d = f'.{self.cfg.network}' if self.cfg.network != 'mainnet' else '') + ) async def main(self): gmsg('\n{a}ing {b} {c}wallet{d}'.format( @@ -854,7 +856,7 @@ class MoneroWalletOps: self.stem.capitalize(), n+1, len(self.addr_data), - os.path.basename(fn), + fn.name, )) processed += await self.process_wallet( d, @@ -868,7 +870,7 @@ class MoneroWalletOps: self.action.capitalize(), self.wallet_desc, wallet_idx, - os.path.basename(fn) + fn.name )) class rpc: @@ -887,7 +889,7 @@ class MoneroWalletOps: gmsg_r(f'\n Opening {desc} wallet...') self.c.call( # returns {} 'open_wallet', - filename = os.path.basename(self.fn), + filename = self.fn.name, password = self.d.wallet_passwd ) gmsg('done') @@ -912,7 +914,7 @@ class MoneroWalletOps: def print_accts(self,data,addrs_data,indent=' '): d = data['subaddress_accounts'] - msg('\n' + indent + f'Accounts of wallet {os.path.basename(self.fn)}:') + msg('\n' + indent + f'Accounts of wallet {self.fn.name}:') fs = indent + ' {:6} {:18} {:<6} {:%s} {}' % max(len(e['label']) for e in d) msg(fs.format('Index ','Base Address','nAddrs','Label','Unlocked Balance')) for i,e in enumerate(d): @@ -1087,7 +1089,7 @@ class MoneroWalletOps: if self.cfg.watch_only: ret = self.c.call( 'generate_from_keys', - filename = os.path.basename(fn), + filename = fn.name, password = d.wallet_passwd, address = d.addr, viewkey = d.viewkey, @@ -1096,7 +1098,7 @@ class MoneroWalletOps: from .xmrseed import xmrseed ret = self.c.call( 'restore_deterministic_wallet', - filename = os.path.basename(fn), + filename = fn.name, password = d.wallet_passwd, seed = xmrseed().fromhex(d.sec.wif,tostr=True), restore_height = restore_height, @@ -1123,9 +1125,9 @@ class MoneroWalletOps: vkf = vkal.file # before writing viewkey-address file, delete any old ones in the directory: - for fn in os.listdir(self.cfg.outdir): - if fn.endswith(vkf.ext): - os.unlink(os.path.join(self.cfg.outdir,fn)) + for f in Path(self.cfg.outdir or '.').iterdir(): + if f.name.endswith(vkf.ext): + f.unlink() vkf.write() # write file to self.cfg.outdir @@ -1138,16 +1140,19 @@ class MoneroWalletOps: async def process_wallet(self,d,fn,last): def get_dump_data(): - fns = [fn for fn in - [self.get_wallet_fn(d,watch_only=wo) + '.dump' for wo in (True,False)] - if os.path.exists(fn)] - if not fns: - die(1,f'No suitable dump file found for {fn!r}') - elif len(fns) > 1: - ymsg(f'Warning: more than one dump file found for {fn!r} - using the first!') + def gen(): + for fn in [self.get_wallet_fn(d,watch_only=wo) for wo in (True,False)]: + ret = fn.parent / (fn.name + '.dump') + if ret.exists(): + yield ret + dump_fns = tuple(gen()) + if not dump_fns: + die(1,f"No suitable dump file found for '{fn}'") + elif len(dump_fns) > 1: + ymsg(f"Warning: more than one dump file found for '{fn}' - using the first!") return MoneroWalletDumpFile.Completed( parent = self, - fn = fns[0] ).data._asdict()['wallet_metadata'] + fn = dump_fns[0] ).data._asdict()['wallet_metadata'] def restore_accounts(): bmsg(' Restoring accounts:') @@ -1230,7 +1235,7 @@ class MoneroWalletOps: msg_r(' Opening wallet...') self.c.call( 'open_wallet', - filename = os.path.basename(fn), + filename = fn.name, password = d.wallet_passwd ) msg('done') @@ -1265,8 +1270,6 @@ class MoneroWalletOps: t_elapsed = int(time.time() - t_start) - bn = os.path.basename(fn) - a,b = self.rpc(self,d).get_accts(print=False) msg(' Balance: {} Unlocked balance: {}'.format( @@ -1274,7 +1277,7 @@ class MoneroWalletOps: hl_amt(a['total_unlocked_balance']), )) - self.accts_data[bn] = { 'accts': a, 'addrs': b } + self.accts_data[fn.name] = { 'accts': a, 'addrs': b } msg(f' Wallet height: {wallet_height}') msg(' Sync time: {:02}:{:02}'.format( @@ -1436,16 +1439,16 @@ class MoneroWalletOps: h.print_addrs(accts_data,self.account) else: h.close_wallet('source') - bn = os.path.basename(self.get_wallet_fn(self.dest)) + wf = self.get_wallet_fn(self.dest) h2 = self.rpc(self,self.dest) h2.open_wallet('destination') accts_data = h2.get_accts()[0] - if keypress_confirm( self.cfg, f'\nCreate new account for wallet {bn!r}?' ): + if keypress_confirm( self.cfg, f'\nCreate new account for wallet {wf.name!r}?' ): dest_acct,dest_addr = h2.create_acct() dest_addr_idx = 0 h2.get_accts() - elif keypress_confirm( self.cfg, f'Sweep to last existing account of wallet {bn!r}?' ): + elif keypress_confirm( self.cfg, f'Sweep to last existing account of wallet {wf.name!r}?' ): dest_acct,dest_addr_chk = h2.get_last_acct(accts_data) dest_addr,dest_addr_idx = h2.get_last_addr(dest_acct,display=False) assert dest_addr_chk == dest_addr, 'dest_addr_chk2' @@ -1612,21 +1615,22 @@ class MoneroWalletOps: if self.cfg.daemon: die(1,f'--daemon is not supported for the ‘{self.name}’ operation. Use --tx-relay-daemon instead') - def get_unsubmitted_tx_fn(self): + @property + def unsubmitted_tx_path(self): from .autosign import Signable t = Signable.xmr_transaction( get_autosign_obj(self.cfg) ) if len(t.unsubmitted) != 1: - die('AutosignTXError', '{a} unsubmitted transaction{b} in {c!r}!'.format( + die('AutosignTXError', "{a} unsubmitted transaction{b} in '{c}'!".format( a = 'More than one' if t.unsubmitted else 'No', b = suf(t.unsubmitted), c = t.parent.xmr_tx_dir, )) - return t.unsubmitted[0].path + return t.unsubmitted[0] async def main(self): tx = MoneroMMGenTX.ColdSigned( cfg = self.cfg, - fn = uarg.infile or self.get_unsubmitted_tx_fn() ) + fn = Path(uarg.infile) if uarg.infile else self.unsubmitted_tx_path ) h = self.rpc( self, self.kal.entry(tx.src_wallet_idx) ) self.head_msg(tx.src_wallet_idx,h.fn) h.open_wallet(self.wallet_desc) @@ -1685,7 +1689,7 @@ class MoneroWalletOps: wallet_fn = fn, ret_on_no_match = True ) if old_fn: - os.unlink(old_fn) + old_fn.unlink() m = MoneroWalletOutputsFile.New( parent = self, wallet_fn = fn, @@ -1700,14 +1704,14 @@ class MoneroWalletOps: start_daemon = False offline = True - async def main(self,f,wallet_idx): + async def main(self,fn,wallet_idx): await self.c.restart_daemon() h = self.rpc(self,self.addr_data[0]) - self.head_msg(wallet_idx,f.name) + self.head_msg(wallet_idx,fn) h.open_wallet('offline signing') m = MoneroWalletOutputsFile.Unsigned( parent = self, - fn = f.path ) + fn = fn ) res = self.c.call( 'import_outputs', outputs_data_hex = m.data.outputs_data_hex ) @@ -1717,7 +1721,7 @@ class MoneroWalletOps: data.update(self.c.call('export_key_images')) # for testing: all = True m = MoneroWalletOutputsFile.SignedNew( parent = self, - wallet_fn = m.get_wallet_fn(f.name), + wallet_fn = m.get_wallet_fn(fn), data = data ) idata = m.data.signed_key_images or [] bmsg(' {} key image{} signed'.format( len(idata), suf(idata) )) @@ -1754,7 +1758,7 @@ class MoneroWalletOps: super().__init__(cfg,uarg_tuple) - self.tx = MoneroMMGenTX.Signed( self.cfg, uarg.infile ) + self.tx = MoneroMMGenTX.Signed( self.cfg, Path(uarg.infile) ) if self.cfg.tx_relay_daemon: m = self.parse_tx_relay_opt() @@ -1806,6 +1810,6 @@ class MoneroWalletOps: '\n'.join( tx.get_info() for tx in sorted( - (MoneroMMGenTX.Completed( self.cfg, fn ) for fn in uarg.infile), + (MoneroMMGenTX.Completed( self.cfg, Path(fn) ) for fn in uarg.infile), key = lambda x: x.data.sign_time or x.data.create_time ) )) diff --git a/test/test_py_d/ts_autosign.py b/test/test_py_d/ts_autosign.py index e88cea26..79f1ca60 100755 --- a/test/test_py_d/ts_autosign.py +++ b/test/test_py_d/ts_autosign.py @@ -22,6 +22,7 @@ test.test_py_d.ts_autosign: Autosign tests for the test.py test suite import os,shutil from subprocess import run +from pathlib import Path from mmgen.cfg import gc @@ -54,14 +55,14 @@ def init_led(simulate): run(['sudo','chmod','0666',fn],check=True) def check_mountpoint(asi): - if not os.path.ismount(asi.mountpoint): + if not asi.mountpoint.is_mount(): try: run(['mount',asi.mountpoint],check=True) imsg(f'Mounted {asi.mountpoint}') except: die(2,f'Could not mount {asi.mountpoint}! Exiting') - if not os.path.isdir(asi.tx_dir): + if not asi.tx_dir.is_dir(): die(2,f'Directory {asi.tx_dir} does not exist! Exiting') def do_mount(mountpoint): @@ -93,7 +94,7 @@ class TestSuiteAutosignBase(TestSuiteBase): self.network_ids = [c+'_tn' for c in self.daemon_coins] + self.daemon_coins if not self.live: - self.wallet_dir = os.path.join(self.tmpdir,'dev.shm.autosign') + self.wallet_dir = Path( self.tmpdir, 'dev.shm.autosign' ) self.asi = Autosign( AutosignConfig({ @@ -120,12 +121,12 @@ class TestSuiteAutosignBase(TestSuiteBase): check_mountpoint(self.asi) init_led(self.simulate) else: - os.makedirs(self.asi.tx_dir,exist_ok=True) # creates mountpoint - os.makedirs(self.wallet_dir,exist_ok=True) + self.asi.tx_dir.mkdir(parents=True,exist_ok=True) # creates mountpoint + self.wallet_dir.mkdir(parents=True,exist_ok=True) self.opts.extend([ - '--mountpoint=' + self.mountpoint, + f'--mountpoint={self.mountpoint}', + f'--wallet-dir={self.wallet_dir}', '--no-insert-check', - '--wallet-dir=' + self.wallet_dir, ]) self.tx_file_ops('set_count') # initialize tx_count here so we can resume anywhere diff --git a/test/test_py_d/ts_xmr_autosign.py b/test/test_py_d/ts_xmr_autosign.py index 4b03c0fa..f40db0f4 100755 --- a/test/test_py_d/ts_xmr_autosign.py +++ b/test/test_py_d/ts_xmr_autosign.py @@ -14,6 +14,7 @@ test.test_py_d.ts_xmr_autosign: xmr autosigning tests for the test.py test suite """ from .ts_xmrwallet import * +from pathlib import Path from .ts_autosign import TestSuiteAutosignBase def make_burn_addr(): @@ -183,7 +184,6 @@ class TestSuiteXMRAutosign(TestSuiteXMRWallet,TestSuiteAutosignBase): return self._delete_files( '.dump' ) def autosign_setup(self): - from pathlib import Path Path(self.autosign_xmr_dir).mkdir(parents=True,exist_ok=True) Path(self.autosign_xmr_dir,'old.vkeys').touch() t = self.run_setup(