autosign,xmrwallet: use pathlib wherever possible

This commit is contained in:
The MMGen Project 2023-04-28 11:23:23 +00:00
commit ab28caa75b
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
5 changed files with 163 additions and 160 deletions

View file

@ -13,6 +13,7 @@ autosign: Auto-sign MMGen transactions, message files and XMR wallet output file
"""
import sys,os,asyncio
from pathlib import Path
from subprocess import run,PIPE,DEVNULL
from collections import namedtuple
@ -45,7 +46,7 @@ class Signable:
def _unprocessed(self,attrname,rawext,sigext):
if not hasattr(self,attrname):
dirlist = tuple(os.scandir(self.dir))
dirlist = tuple(self.dir.iterdir())
names = tuple(f.name for f in dirlist)
setattr(
self,
@ -70,7 +71,7 @@ class Signable:
async def sign(self,f):
from .tx import UnsignedTX
tx1 = UnsignedTX( cfg=self.cfg, filename=f.path )
tx1 = UnsignedTX( cfg=self.cfg, filename=f )
if tx1.proto.sign_mode == 'daemon':
from .rpc import rpc_init
tx1.rpc = await rpc_init( self.cfg, tx1.proto )
@ -118,7 +119,7 @@ class Signable:
def gen_bad_list(self,bad_files):
for f in bad_files:
yield red(f.path)
yield red(f.name)
class xmr_transaction(transaction):
dir_name = 'xmr_tx_dir'
@ -132,15 +133,15 @@ class Signable:
async def sign(self,f):
from .xmrwallet import MoneroMMGenTX,MoneroWalletOps,xmrwallet_uargs
tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f.path )
tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f )
m = MoneroWalletOps.sign(
self.parent.xmrwallet_cfg,
xmrwallet_uargs(
infile = self.parent.wallet_files[0], # MMGen wallet file
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(tx1.src_wallet_idx),
spec = None ),
)
tx2 = await m.main(f.path) # TODO: stop wallet daemon?
tx2 = await m.main(f) # TODO: stop wallet daemon?
tx2.write(ask_write=False)
return tx2
@ -156,11 +157,11 @@ class Signable:
async def sign(self,f):
from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f.name)
wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f)
m = MoneroWalletOps.export_key_images(
self.parent.xmrwallet_cfg,
xmrwallet_uargs(
infile = self.parent.wallet_files[0], # MMGen wallet file
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(wallet_idx),
spec = None ),
)
@ -181,11 +182,11 @@ class Signable:
async def sign(self,f):
from .msg import UnsignedMsg,SignedMsg
m = UnsignedMsg( self.cfg, infile=f.path )
m = UnsignedMsg( self.cfg, infile=f )
await m.sign( wallet_files=self.parent.wallet_files[:] )
m = SignedMsg( self.cfg, data=m.__dict__ )
m.write_to_file(
outdir = os.path.abspath(self.dir),
outdir = self.dir.resolve(),
ask_overwrite = False )
if m.data.get('failed_sids'):
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
@ -194,28 +195,28 @@ class Signable:
def print_summary(self,messages):
gmsg('\nSigned message files:')
for m in messages:
gmsg(' ' + os.path.join( self.dir, m.signed_filename ))
gmsg(' ' + m.signed_filename)
def gen_bad_list(self,bad_files):
for f in bad_files:
sigfile = f.path[:-len(self.rawext)] + self.sigext
yield orange(sigfile) if os.path.exists(sigfile) else red(f.path)
sigfile = f.parent / ( f.name[:-len(self.rawext)] + self.sigext )
yield orange(sigfile.name) if sigfile.exists() else red(f.name)
class Autosign:
dfl_mountpoint = os.path.join(os.sep,'mnt','mmgen_autosign')
dfl_wallet_dir = os.path.join(os.sep,'dev','shm','autosign')
disk_label_dir = os.path.join(os.sep,'dev','disk','by-label')
part_label = 'MMGEN_TX'
dfl_mountpoint = '/mnt/mmgen_autosign'
dfl_wallet_dir = '/dev/shm/autosign'
old_dfl_mountpoint = '/mnt/tx'
dev_disk_path = Path('/dev/disk/by-label/MMGEN_TX')
old_dfl_mountpoint = os.path.join(os.sep,'mnt','tx')
old_dfl_mountpoint_errmsg = f"""
Mountpoint {old_dfl_mountpoint!r} is no longer supported!
Please rename {old_dfl_mountpoint!r} to {dfl_mountpoint!r}
Mountpoint '{old_dfl_mountpoint}' is no longer supported!
Please rename '{old_dfl_mountpoint}' to '{dfl_mountpoint}'
and update your fstab accordingly.
"""
mountpoint_errmsg_fs = """
Mountpoint {!r} does not exist or does not point
Mountpoint '{}' does not exist or does not point
to a directory! Please create the mountpoint and add an entry
to your fstab as described in this scripts help text.
"""
@ -230,23 +231,23 @@ class Autosign:
def __init__(self,cfg):
self.cfg = cfg
if cfg.mnemonic_fmt:
if cfg.mnemonic_fmt not in self.mn_fmts:
die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format(
cfg.mnemonic_fmt,
fmt_list( self.mn_fmts, fmt='no_spc' ) ))
self.cfg = cfg
self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint)
self.wallet_dir = Path(cfg.wallet_dir or self.dfl_wallet_dir)
self.mountpoint = cfg.mountpoint or self.dfl_mountpoint
self.wallet_dir = cfg.wallet_dir or self.dfl_wallet_dir
self.tx_dir = self.mountpoint / 'tx'
self.msg_dir = self.mountpoint / 'msg'
self.keyfile = self.mountpoint / 'autosign.key'
self.tx_dir = os.path.join( self.mountpoint, 'tx' )
self.msg_dir = os.path.join( self.mountpoint, 'msg' )
self.keyfile = os.path.join( self.mountpoint, 'autosign.key' )
cfg.outdir = self.tx_dir
cfg.passwd_file = self.keyfile
cfg.outdir = str(self.tx_dir)
cfg.passwd_file = str(self.keyfile)
if any(k in cfg._uopts for k in ('help','longhelp')):
return
@ -264,9 +265,9 @@ class Autosign:
self.coins = ['BTC']
if 'XMR' in self.coins:
self.xmr_dir = os.path.join( self.mountpoint, 'xmr' )
self.xmr_tx_dir = os.path.join( self.mountpoint, 'xmr', 'tx' )
self.xmr_outputs_dir = os.path.join( self.mountpoint, 'xmr', 'outputs' )
self.xmr_dir = self.mountpoint / 'xmr'
self.xmr_tx_dir = self.mountpoint / 'xmr' / 'tx'
self.xmr_outputs_dir = self.mountpoint / 'xmr' / 'outputs'
async def check_daemons_running(self):
from .protocol import init_proto
@ -291,14 +292,13 @@ class Autosign:
if not hasattr(self,'_wallet_files'):
try:
dirlist = os.listdir(self.wallet_dir)
dirlist = self.wallet_dir.iterdir()
except:
die(1,f'Cannot open wallet directory {self.wallet_dir!r}. Did you run ‘mmgen-autosign setup’?')
die(1,f"Cannot open wallet directory '{self.wallet_dir}'. Did you run ‘mmgen-autosign setup’?")
fns = [fn for fn in dirlist if fn.endswith('.mmdat')]
if fns:
self._wallet_files = [os.path.join(self.wallet_dir,fn) for fn in fns]
else:
self._wallet_files = [f for f in dirlist if f.suffix == '.mmdat']
if not self._wallet_files:
die(1,'No wallet files present!')
return self._wallet_files
@ -309,27 +309,27 @@ class Autosign:
def check_dir(cdir):
try:
ds = os.stat(cdir)
assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!'
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!'
ds = cdir.stat()
assert S_ISDIR(ds.st_mode), f"'{cdir}' is not a directory!"
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f"'{cdir}' is not read/write for this user!"
except:
die(1,f'{cdir!r} missing or not read/writable by user!')
die(1,f"'{cdir}' missing or not read/writable by user!")
if not os.path.isdir(self.mountpoint):
if not self.mountpoint.is_dir():
def do_die(m):
die(1,'\n' + yellow(fmt(m.strip(),indent=' ')))
if os.path.isdir(self.old_dfl_mountpoint):
if Path(self.old_dfl_mountpoint).is_dir():
do_die(self.old_dfl_mountpoint_errmsg)
else:
do_die(self.mountpoint_errmsg_fs.format(self.mountpoint))
if not os.path.ismount(self.mountpoint):
if not self.mountpoint.is_mount():
if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0:
msg(f'Mounting {self.mountpoint!r}')
msg(f"Mounting '{self.mountpoint}'")
elif not self.cfg.test_suite:
die(1,f'Unable to mount device at {self.mountpoint!r}')
die(1,f"Unable to mount device at '{self.mountpoint}'")
self.have_msg_dir = os.path.isdir(self.msg_dir)
self.have_msg_dir = self.msg_dir.is_dir()
check_dir(self.tx_dir)
@ -340,14 +340,14 @@ class Autosign:
check_dir(self.xmr_tx_dir)
def do_umount(self):
if os.path.ismount(self.mountpoint):
if self.mountpoint.is_mount():
run( ['sync'], check=True )
msg(f'Unmounting {self.mountpoint!r}')
msg(f"Unmounting '{self.mountpoint}'")
run( ['umount',self.mountpoint], check=True )
bmsg('It is now safe to extract the removable device')
def decrypt_wallets(self):
msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}')
msg(f"Unlocking wallet{suf(self.wallet_files)} with key from '{self.cfg.passwd_file}'")
fails = 0
for wf in self.wallet_files:
try:
@ -368,9 +368,9 @@ class Autosign:
try:
ret = await target.sign(f)
except Exception as e:
ymsg(f'An error occurred with {target.desc} {f.name!r}:\n {type(e).__name__}: {e!s}')
ymsg(f"An error occurred with {target.desc} '{f.name}':\n {type(e).__name__}: {e!s}")
except:
ymsg(f'An error occurred with {target.desc} {f.name!r}')
ymsg(f"An error occurred with {target.desc} '{f.name}'")
good.append(ret) if ret else bad.append(f)
self.cfg._util.qmsg('')
await asyncio.sleep(0.3)
@ -412,24 +412,22 @@ class Autosign:
return False
def wipe_existing_key(self):
try: os.stat(self.keyfile)
try: self.keyfile.stat()
except: pass
else:
from .fileutil import shred_file
msg(f'\nShredding existing key {self.keyfile!r}')
msg(f"\nShredding existing key '{self.keyfile}'")
shred_file( self.keyfile, verbose=self.cfg.verbose )
def create_key(self):
kdata = os.urandom(32).hex()
desc = f'key file {self.keyfile!r}'
desc = f"key file '{self.keyfile}'"
msg('Creating ' + desc)
try:
with open(self.keyfile,'w') as fp:
fp.write(kdata+'\n')
os.chmod(self.keyfile,0o400)
msg('Wrote ' + desc)
self.keyfile.write_text( os.urandom(32).hex() )
self.keyfile.chmod(0o400)
except:
die(2,'Unable to write ' + desc)
msg('Wrote ' + desc)
def gen_key(self,no_unmount=False):
self.create_wallet_dir()
@ -448,10 +446,10 @@ class Autosign:
except: pass
def create_wallet_dir(self):
try: os.mkdir(self.wallet_dir)
try: self.wallet_dir.mkdir(parents=True)
except: pass
try: os.stat(self.wallet_dir)
except: die(2,f'Unable to create wallet directory {self.wallet_dir!r}')
try: self.wallet_dir.stat()
except: die(2,f"Unable to create wallet directory '{self.wallet_dir}'")
def setup(self):
self.remove_wallet_dir()
@ -459,7 +457,7 @@ class Autosign:
wf = find_file_in_dir( get_wallet_cls('mmgen'), self.cfg.data_dir )
if wf and keypress_confirm(
cfg = self.cfg,
prompt = f'Default wallet {wf!r} found.\nUse default wallet for autosigning?',
prompt = f"Default wallet '{wf}' found.\nUse default wallet for autosigning?",
default_yes = True ):
from .cfg import Config
ss_in = Wallet( Config(), wf )
@ -477,10 +475,10 @@ class Autosign:
'wallet_rpc_user': 'autosigner',
'wallet_rpc_password': 'my very secret password',
'passwd_file': self.cfg.passwd_file,
'wallet_dir': self.wallet_dir,
'wallet_dir': str(self.wallet_dir),
'autosign': True,
'autosign_mountpoint': self.mountpoint,
'outdir': self.xmr_dir, # required by vkal.write()
'autosign_mountpoint': str(self.mountpoint),
'outdir': str(self.xmr_dir), # required by vkal.write()
})
return self._xmrwallet_cfg
@ -490,16 +488,16 @@ class Autosign:
try: shutil.rmtree(self.xmr_outputs_dir)
except: pass
os.makedirs(self.xmr_outputs_dir)
self.xmr_outputs_dir.mkdir(parents=True)
os.makedirs(self.xmr_tx_dir,exist_ok=True)
self.xmr_tx_dir.mkdir(exist_ok=True)
from .addrfile import ViewKeyAddrFile
from .fileutil import shred_file
for f in os.scandir(self.xmr_dir):
for f in self.xmr_dir.iterdir():
if f.name.endswith(ViewKeyAddrFile.ext):
msg(f'Shredding old viewkey-address file {f.name!r}')
shred_file(f.path)
shred_file(f)
if len(self.wallet_files) > 1:
ymsg(f'Warning: more that one wallet file, using the first ({self.wallet_files[0]}) for xmrwallet generation')
@ -518,7 +516,7 @@ class Autosign:
def get_insert_status(self):
if self.cfg.no_insert_check:
return True
try: os.stat(os.path.join( self.disk_label_dir, self.part_label ))
try: self.dev_disk_path.stat()
except: return False
else: return True

View file

@ -147,8 +147,8 @@ class MoneroWalletDaemon(RPCDaemon):
['--trusted-daemon', trust_daemon],
['--untrusted-daemon', not trust_daemon],
[f'--rpc-bind-port={self.rpc_port}'],
['--wallet-dir='+self.wallet_dir],
['--log-file='+self.logfile],
[f'--wallet-dir={self.wallet_dir}'],
[f'--log-file={self.logfile}'],
[f'--rpc-login={self.user}:{self.passwd}'],
[f'--daemon-address={self.daemon_addr}', self.daemon_addr],
[f'--daemon-port={self.daemon_port}', not self.daemon_addr],

View file

@ -20,8 +20,10 @@
xmrwallet.py - MoneroWalletOps class
"""
import os,re,time,json
import re,time,json
from collections import namedtuple
from pathlib import PosixPath as Path
from .objmethods import MMGenObject,Hilite,InitErrors
from .obj import CoinTxID,Int
from .color import red,yellow,green,blue,cyan,pink,orange
@ -164,7 +166,7 @@ class MoneroMMGenFile:
)
def extract_data_from_file(self,cfg,fn):
return json.loads( get_data_from_file( cfg, fn, self.desc ))[self.data_label]
return json.loads( get_data_from_file( cfg, str(fn), self.desc ))[self.data_label]
class MoneroMMGenTX:
@ -265,12 +267,12 @@ class MoneroMMGenTX:
)
if self.cfg.autosign:
fn = os.path.join( get_autosign_obj(self.cfg).xmr_tx_dir, fn )
fn = get_autosign_obj(self.cfg).xmr_tx_dir / fn
from .fileutil import write_data_to_file
write_data_to_file(
cfg = self.cfg,
outfile = fn,
outfile = str(fn),
data = self.make_wrapped_data(dict_data),
desc = self.desc,
ask_write = ask_write,
@ -363,7 +365,7 @@ class MoneroMMGenTX:
d = self.xmrwallet_tx_data(**d_wrap['data'])
if self.name != 'Completed':
assert fn.endswith('.'+self.ext), 'TX filename {fn!r} has incorrect extension (not {self.ext!r})'
assert fn.name.endswith('.'+self.ext), 'TX filename {fn} has incorrect extension (not {self.ext!r})'
assert getattr(d,self.req_field), f'{self.name} TX missing required field {self.req_field!r}'
assert bool(d.sign_time)==self.signed,'{} has {}sign time!'.format(self.desc,'no 'if self.signed else'')
for f in self.forbidden_fields:
@ -440,34 +442,33 @@ class MoneroWalletOutputsFile:
from .fileutil import write_data_to_file
write_data_to_file(
cfg = self.cfg,
outfile = self.get_outfile( self.cfg, self.wallet_fn ) + add_suf,
outfile = str(self.get_outfile( self.cfg, self.wallet_fn )) + add_suf,
data = self.make_wrapped_data(self.data._asdict()),
desc = self.desc,
ask_overwrite = False,
ignore_opt_outdir = True )
def get_outfile(self,cfg,wallet_fn):
fn = self.fn_fs.format(
a = wallet_fn,
b = self.base_chksum,
c = self.ext,
)
return os.path.join(
get_autosign_obj(cfg).xmr_outputs_dir,
os.path.basename(fn) ) if cfg.autosign else fn
return (
get_autosign_obj(cfg).xmr_outputs_dir if cfg.autosign else
wallet_fn.parent ) / self.fn_fs.format(
a = wallet_fn.name,
b = self.base_chksum,
c = self.ext,
)
def get_wallet_fn(self,fn):
assert fn.endswith(f'.{self.ext}'), (
assert fn.name.endswith(f'.{self.ext}'), (
f'{type(self).__name__}: filename does not end with {"."+self.ext!r}'
)
return fn[:-(len(self.ext)+self.ext_offset+1)]
return fn.parent / fn.name[:-(len(self.ext)+self.ext_offset+1)]
def get_info(self,indent=''):
if self.data.signed_key_images is not None:
data = self.data.signed_key_images or []
return f'{self.wallet_fn}: {len(data)} signed key image{suf(data)}'
return f'{self.wallet_fn.name}: {len(data)} signed key image{suf(data)}'
else:
return f'{self.wallet_fn}: no key images'
return f'{self.wallet_fn.name}: no key images'
class New(Base):
ext = 'raw'
@ -478,7 +479,7 @@ class MoneroWalletOutputsFile:
init_data = dict.fromkeys(self.data_tuple._fields)
init_data.update({
'seed_id': parent.kal.al_id.sid,
'wallet_index': wallet_idx or parent.get_idx_from_fn(os.path.basename(wallet_fn)),
'wallet_index': wallet_idx or parent.get_idx_from_fn(wallet_fn),
})
init_data.update({k:v for k,v in data.items() if k in init_data})
self.data = self.data_tuple(**init_data)
@ -493,7 +494,7 @@ class MoneroWalletOutputsFile:
d_wrap = self.extract_data_from_file( parent.cfg, fn )
data = d_wrap['data']
check_equal( 'Seed ID', data['seed_id'], parent.kal.al_id.sid )
wallet_idx = parent.get_idx_from_fn(os.path.basename(wallet_fn))
wallet_idx = parent.get_idx_from_fn(wallet_fn)
check_equal( 'Wallet index', data['wallet_index'], wallet_idx )
super().__init__(
parent = parent,
@ -505,23 +506,22 @@ class MoneroWalletOutputsFile:
@classmethod
def find_fn_from_wallet_fn(cls,cfg,wallet_fn,ret_on_no_match=False):
path = get_autosign_obj(cfg).xmr_outputs_dir or os.curdir
fn = os.path.basename(wallet_fn)
path = get_autosign_obj(cfg).xmr_outputs_dir or Path()
pat = cls.fn_fs.format(
a = fn,
a = wallet_fn.name,
b = f'[0-9a-f]{{{cls.chksum_nchars}}}\\',
c = cls.ext,
)
matches = [f for f in os.scandir(path) if re.match(pat,f.name)]
matches = [f for f in path.iterdir() if re.match(pat,f.name)]
if not matches and ret_on_no_match:
return None
if not matches or len(matches) > 1:
die(2,'{a} matching pattern {b!r} found in {c}!'.format(
die(2,"{a} matching pattern {b!r} found in '{c}'!".format(
a = 'No files' if not matches else 'More than one file',
b = pat,
c = path
))
return matches[0].path
return matches[0]
class Unsigned(Completed):
pass
@ -548,7 +548,7 @@ class MoneroWalletDumpFile:
'wallet_metadata',
])
def get_outfile(self,cfg,wallet_fn):
return f'{wallet_fn}.{self.ext}'
return wallet_fn.parent / f'{wallet_fn.name}.{self.ext}'
class New(Base,MoneroWalletOutputsFile.New):
pass
@ -702,7 +702,7 @@ class MoneroWalletOps:
def __init__(self,cfg,uarg_tuple):
def wallet_exists(fn):
try: os.stat(fn)
try: fn.stat()
except: return False
else: return True
@ -711,9 +711,9 @@ class MoneroWalletOps:
fn = self.get_wallet_fn(d)
exists = wallet_exists(fn)
if exists and not self.wallet_exists:
die(1,f'Wallet {fn!r} already exists!')
die(1,f"Wallet '{fn}' already exists!")
elif not exists and self.wallet_exists:
die(1,f'Wallet {fn!r} not found!')
die(1,f"Wallet '{fn}' not found!")
super().__init__(cfg,uarg_tuple)
@ -738,13 +738,13 @@ class MoneroWalletOps:
self.kal = (ViewKeyAddrList if (self.cfg.watch_only and first_try) else KeyAddrList)(
cfg = cfg,
proto = self.proto,
addrfile = self.autosign_viewkey_addr_file if self.cfg.autosign else uarg.infile,
addrfile = str(self.autosign_viewkey_addr_file) if self.cfg.autosign else uarg.infile,
key_address_validity_check = True,
skip_chksum_msg = True )
break
except:
if first_try:
msg(f'Attempting to open {uarg.infile} as key-address list')
msg(f"Attempting to open '{uarg.infile}' as key-address list")
continue
raise
@ -786,7 +786,7 @@ class MoneroWalletOps:
@classmethod
def get_idx_from_fn(cls,fn):
return int( re.match(r'[0-9a-fA-F]{8}-(\d+)-Monero(WatchOnly)?Wallet.*',fn)[1] )
return int( re.match(r'[0-9a-fA-F]{8}-(\d+)-Monero(WatchOnly)?Wallet.*',fn.name)[1] )
def get_coin_daemon_rpc(self):
@ -806,17 +806,17 @@ class MoneroWalletOps:
def autosign_viewkey_addr_file(self):
from .addrfile import ViewKeyAddrFile
mpdir = get_autosign_obj(self.cfg).xmr_dir
fnlist = [f for f in os.listdir(mpdir) if f.endswith(ViewKeyAddrFile.ext)]
if len(fnlist) != 1:
flist = [f for f in mpdir.iterdir() if f.name.endswith(ViewKeyAddrFile.ext)]
if len(flist) != 1:
die(2,
'{a} viewkey-address files found in autosign mountpoint directory {b!r}!\n'.format(
a = 'Multiple' if fnlist else 'No',
"{a} viewkey-address files found in autosign mountpoint directory '{b}'!\n".format(
a = 'Multiple' if flist else 'No',
b = mpdir
)
+ 'Have you run ‘mmgen-autosign setup’ on your offline machine with the --xmrwallets option?'
)
else:
return os.path.join( mpdir, fnlist[0] )
return flist[0]
def create_addr_data(self):
if uarg.wallets:
@ -834,12 +834,14 @@ class MoneroWalletOps:
def get_wallet_fn(self,data,watch_only=None):
if watch_only is None:
watch_only = self.cfg.watch_only
return os.path.join(
self.cfg.wallet_dir or '.','{a}-{b}-Monero{c}Wallet{d}'.format(
return Path(
(self.cfg.wallet_dir or '.'),
'{a}-{b}-Monero{c}Wallet{d}'.format(
a = self.kal.al_id.sid,
b = data.idx,
c = 'WatchOnly' if watch_only else '',
d = f'.{self.cfg.network}' if self.cfg.network != 'mainnet' else ''))
d = f'.{self.cfg.network}' if self.cfg.network != 'mainnet' else '')
)
async def main(self):
gmsg('\n{a}ing {b} {c}wallet{d}'.format(
@ -854,7 +856,7 @@ class MoneroWalletOps:
self.stem.capitalize(),
n+1,
len(self.addr_data),
os.path.basename(fn),
fn.name,
))
processed += await self.process_wallet(
d,
@ -868,7 +870,7 @@ class MoneroWalletOps:
self.action.capitalize(),
self.wallet_desc,
wallet_idx,
os.path.basename(fn)
fn.name
))
class rpc:
@ -887,7 +889,7 @@ class MoneroWalletOps:
gmsg_r(f'\n Opening {desc} wallet...')
self.c.call( # returns {}
'open_wallet',
filename = os.path.basename(self.fn),
filename = self.fn.name,
password = self.d.wallet_passwd )
gmsg('done')
@ -912,7 +914,7 @@ class MoneroWalletOps:
def print_accts(self,data,addrs_data,indent=' '):
d = data['subaddress_accounts']
msg('\n' + indent + f'Accounts of wallet {os.path.basename(self.fn)}:')
msg('\n' + indent + f'Accounts of wallet {self.fn.name}:')
fs = indent + ' {:6} {:18} {:<6} {:%s} {}' % max(len(e['label']) for e in d)
msg(fs.format('Index ','Base Address','nAddrs','Label','Unlocked Balance'))
for i,e in enumerate(d):
@ -1087,7 +1089,7 @@ class MoneroWalletOps:
if self.cfg.watch_only:
ret = self.c.call(
'generate_from_keys',
filename = os.path.basename(fn),
filename = fn.name,
password = d.wallet_passwd,
address = d.addr,
viewkey = d.viewkey,
@ -1096,7 +1098,7 @@ class MoneroWalletOps:
from .xmrseed import xmrseed
ret = self.c.call(
'restore_deterministic_wallet',
filename = os.path.basename(fn),
filename = fn.name,
password = d.wallet_passwd,
seed = xmrseed().fromhex(d.sec.wif,tostr=True),
restore_height = restore_height,
@ -1123,9 +1125,9 @@ class MoneroWalletOps:
vkf = vkal.file
# before writing viewkey-address file, delete any old ones in the directory:
for fn in os.listdir(self.cfg.outdir):
if fn.endswith(vkf.ext):
os.unlink(os.path.join(self.cfg.outdir,fn))
for f in Path(self.cfg.outdir or '.').iterdir():
if f.name.endswith(vkf.ext):
f.unlink()
vkf.write() # write file to self.cfg.outdir
@ -1138,16 +1140,19 @@ class MoneroWalletOps:
async def process_wallet(self,d,fn,last):
def get_dump_data():
fns = [fn for fn in
[self.get_wallet_fn(d,watch_only=wo) + '.dump' for wo in (True,False)]
if os.path.exists(fn)]
if not fns:
die(1,f'No suitable dump file found for {fn!r}')
elif len(fns) > 1:
ymsg(f'Warning: more than one dump file found for {fn!r} - using the first!')
def gen():
for fn in [self.get_wallet_fn(d,watch_only=wo) for wo in (True,False)]:
ret = fn.parent / (fn.name + '.dump')
if ret.exists():
yield ret
dump_fns = tuple(gen())
if not dump_fns:
die(1,f"No suitable dump file found for '{fn}'")
elif len(dump_fns) > 1:
ymsg(f"Warning: more than one dump file found for '{fn}' - using the first!")
return MoneroWalletDumpFile.Completed(
parent = self,
fn = fns[0] ).data._asdict()['wallet_metadata']
fn = dump_fns[0] ).data._asdict()['wallet_metadata']
def restore_accounts():
bmsg(' Restoring accounts:')
@ -1230,7 +1235,7 @@ class MoneroWalletOps:
msg_r(' Opening wallet...')
self.c.call(
'open_wallet',
filename = os.path.basename(fn),
filename = fn.name,
password = d.wallet_passwd )
msg('done')
@ -1265,8 +1270,6 @@ class MoneroWalletOps:
t_elapsed = int(time.time() - t_start)
bn = os.path.basename(fn)
a,b = self.rpc(self,d).get_accts(print=False)
msg(' Balance: {} Unlocked balance: {}'.format(
@ -1274,7 +1277,7 @@ class MoneroWalletOps:
hl_amt(a['total_unlocked_balance']),
))
self.accts_data[bn] = { 'accts': a, 'addrs': b }
self.accts_data[fn.name] = { 'accts': a, 'addrs': b }
msg(f' Wallet height: {wallet_height}')
msg(' Sync time: {:02}:{:02}'.format(
@ -1436,16 +1439,16 @@ class MoneroWalletOps:
h.print_addrs(accts_data,self.account)
else:
h.close_wallet('source')
bn = os.path.basename(self.get_wallet_fn(self.dest))
wf = self.get_wallet_fn(self.dest)
h2 = self.rpc(self,self.dest)
h2.open_wallet('destination')
accts_data = h2.get_accts()[0]
if keypress_confirm( self.cfg, f'\nCreate new account for wallet {bn!r}?' ):
if keypress_confirm( self.cfg, f'\nCreate new account for wallet {wf.name!r}?' ):
dest_acct,dest_addr = h2.create_acct()
dest_addr_idx = 0
h2.get_accts()
elif keypress_confirm( self.cfg, f'Sweep to last existing account of wallet {bn!r}?' ):
elif keypress_confirm( self.cfg, f'Sweep to last existing account of wallet {wf.name!r}?' ):
dest_acct,dest_addr_chk = h2.get_last_acct(accts_data)
dest_addr,dest_addr_idx = h2.get_last_addr(dest_acct,display=False)
assert dest_addr_chk == dest_addr, 'dest_addr_chk2'
@ -1612,21 +1615,22 @@ class MoneroWalletOps:
if self.cfg.daemon:
die(1,f'--daemon is not supported for the ‘{self.name}’ operation. Use --tx-relay-daemon instead')
def get_unsubmitted_tx_fn(self):
@property
def unsubmitted_tx_path(self):
from .autosign import Signable
t = Signable.xmr_transaction( get_autosign_obj(self.cfg) )
if len(t.unsubmitted) != 1:
die('AutosignTXError', '{a} unsubmitted transaction{b} in {c!r}!'.format(
die('AutosignTXError', "{a} unsubmitted transaction{b} in '{c}'!".format(
a = 'More than one' if t.unsubmitted else 'No',
b = suf(t.unsubmitted),
c = t.parent.xmr_tx_dir,
))
return t.unsubmitted[0].path
return t.unsubmitted[0]
async def main(self):
tx = MoneroMMGenTX.ColdSigned(
cfg = self.cfg,
fn = uarg.infile or self.get_unsubmitted_tx_fn() )
fn = Path(uarg.infile) if uarg.infile else self.unsubmitted_tx_path )
h = self.rpc( self, self.kal.entry(tx.src_wallet_idx) )
self.head_msg(tx.src_wallet_idx,h.fn)
h.open_wallet(self.wallet_desc)
@ -1685,7 +1689,7 @@ class MoneroWalletOps:
wallet_fn = fn,
ret_on_no_match = True )
if old_fn:
os.unlink(old_fn)
old_fn.unlink()
m = MoneroWalletOutputsFile.New(
parent = self,
wallet_fn = fn,
@ -1700,14 +1704,14 @@ class MoneroWalletOps:
start_daemon = False
offline = True
async def main(self,f,wallet_idx):
async def main(self,fn,wallet_idx):
await self.c.restart_daemon()
h = self.rpc(self,self.addr_data[0])
self.head_msg(wallet_idx,f.name)
self.head_msg(wallet_idx,fn)
h.open_wallet('offline signing')
m = MoneroWalletOutputsFile.Unsigned(
parent = self,
fn = f.path )
fn = fn )
res = self.c.call(
'import_outputs',
outputs_data_hex = m.data.outputs_data_hex )
@ -1717,7 +1721,7 @@ class MoneroWalletOps:
data.update(self.c.call('export_key_images')) # for testing: all = True
m = MoneroWalletOutputsFile.SignedNew(
parent = self,
wallet_fn = m.get_wallet_fn(f.name),
wallet_fn = m.get_wallet_fn(fn),
data = data )
idata = m.data.signed_key_images or []
bmsg(' {} key image{} signed'.format( len(idata), suf(idata) ))
@ -1754,7 +1758,7 @@ class MoneroWalletOps:
super().__init__(cfg,uarg_tuple)
self.tx = MoneroMMGenTX.Signed( self.cfg, uarg.infile )
self.tx = MoneroMMGenTX.Signed( self.cfg, Path(uarg.infile) )
if self.cfg.tx_relay_daemon:
m = self.parse_tx_relay_opt()
@ -1806,6 +1810,6 @@ class MoneroWalletOps:
'\n'.join(
tx.get_info() for tx in
sorted(
(MoneroMMGenTX.Completed( self.cfg, fn ) for fn in uarg.infile),
(MoneroMMGenTX.Completed( self.cfg, Path(fn) ) for fn in uarg.infile),
key = lambda x: x.data.sign_time or x.data.create_time )
))

View file

@ -22,6 +22,7 @@ test.test_py_d.ts_autosign: Autosign tests for the test.py test suite
import os,shutil
from subprocess import run
from pathlib import Path
from mmgen.cfg import gc
@ -54,14 +55,14 @@ def init_led(simulate):
run(['sudo','chmod','0666',fn],check=True)
def check_mountpoint(asi):
if not os.path.ismount(asi.mountpoint):
if not asi.mountpoint.is_mount():
try:
run(['mount',asi.mountpoint],check=True)
imsg(f'Mounted {asi.mountpoint}')
except:
die(2,f'Could not mount {asi.mountpoint}! Exiting')
if not os.path.isdir(asi.tx_dir):
if not asi.tx_dir.is_dir():
die(2,f'Directory {asi.tx_dir} does not exist! Exiting')
def do_mount(mountpoint):
@ -93,7 +94,7 @@ class TestSuiteAutosignBase(TestSuiteBase):
self.network_ids = [c+'_tn' for c in self.daemon_coins] + self.daemon_coins
if not self.live:
self.wallet_dir = os.path.join(self.tmpdir,'dev.shm.autosign')
self.wallet_dir = Path( self.tmpdir, 'dev.shm.autosign' )
self.asi = Autosign(
AutosignConfig({
@ -120,12 +121,12 @@ class TestSuiteAutosignBase(TestSuiteBase):
check_mountpoint(self.asi)
init_led(self.simulate)
else:
os.makedirs(self.asi.tx_dir,exist_ok=True) # creates mountpoint
os.makedirs(self.wallet_dir,exist_ok=True)
self.asi.tx_dir.mkdir(parents=True,exist_ok=True) # creates mountpoint
self.wallet_dir.mkdir(parents=True,exist_ok=True)
self.opts.extend([
'--mountpoint=' + self.mountpoint,
f'--mountpoint={self.mountpoint}',
f'--wallet-dir={self.wallet_dir}',
'--no-insert-check',
'--wallet-dir=' + self.wallet_dir,
])
self.tx_file_ops('set_count') # initialize tx_count here so we can resume anywhere

View file

@ -14,6 +14,7 @@ test.test_py_d.ts_xmr_autosign: xmr autosigning tests for the test.py test suite
"""
from .ts_xmrwallet import *
from pathlib import Path
from .ts_autosign import TestSuiteAutosignBase
def make_burn_addr():
@ -183,7 +184,6 @@ class TestSuiteXMRAutosign(TestSuiteXMRWallet,TestSuiteAutosignBase):
return self._delete_files( '.dump' )
def autosign_setup(self):
from pathlib import Path
Path(self.autosign_xmr_dir).mkdir(parents=True,exist_ok=True)
Path(self.autosign_xmr_dir,'old.vkeys').touch()
t = self.run_setup(