|
|
@@ -9,7 +9,7 @@
|
|
|
# https://gitlab.com/mmgen/mmgen-wallet
|
|
|
|
|
|
"""
|
|
|
-autosign: Auto-sign MMGen transactions, message files and XMR wallet output files
|
|
|
+autosign: Autosign MMGen transactions, message files and XMR wallet output files
|
|
|
"""
|
|
|
|
|
|
import sys, os, asyncio
|
|
|
@@ -155,7 +155,8 @@ class Signable:
|
|
|
|
|
|
def _processed(self, attrname, ext):
|
|
|
if not hasattr(self, attrname):
|
|
|
- setattr(self, attrname, tuple(f for f in sorted(self.dir.iterdir()) if f.name.endswith('.'+ext)))
|
|
|
+ setattr(self, attrname, tuple(f for f in sorted(self.dir.iterdir())
|
|
|
+ if f.name.endswith('.' + ext)))
|
|
|
return getattr(self, attrname)
|
|
|
|
|
|
@property
|
|
|
@@ -193,13 +194,14 @@ class Signable:
|
|
|
|
|
|
def die_wrong_num_txs(self, tx_type, *, msg=None, desc=None, show_dir=False):
|
|
|
num_txs = len(getattr(self, tx_type))
|
|
|
- die('AutosignTXError', "{m}{a} {b} transaction{c} {d} {e}!".format(
|
|
|
+ die('AutosignTXError', '{m}{a} {b} transaction{c} {d} {e}!'.format(
|
|
|
m = msg + '\n' if msg else '',
|
|
|
a = 'One' if num_txs == 1 else 'More than one' if num_txs else 'No',
|
|
|
b = desc or tx_type,
|
|
|
c = suf(num_txs),
|
|
|
d = 'already present' if num_txs else 'present',
|
|
|
- e = f'in ‘{getattr(self.parent, self.dir_name)}’' if show_dir else 'on removable device'))
|
|
|
+ e = f'in ‘{getattr(self.parent, self.dir_name)}’'
|
|
|
+ if show_dir else 'on removable device'))
|
|
|
|
|
|
def check_create_ok(self):
|
|
|
if len(self.unsigned):
|
|
|
@@ -250,7 +252,8 @@ class Signable:
|
|
|
ext = '.' + Signable.automount_transaction.subext
|
|
|
files = [f for f in self.dir.iterdir() if f.name.endswith(ext)]
|
|
|
return sorted(
|
|
|
- [await CompletedTX(cfg=self.cfg, filename=str(txfile), quiet_open=True) for txfile in files],
|
|
|
+ [await CompletedTX(cfg=self.cfg, filename=str(txfile), quiet_open=True)
|
|
|
+ for txfile in files],
|
|
|
key = lambda x: x.timestamp)[-1]
|
|
|
|
|
|
class transaction(base):
|
|
|
@@ -405,7 +408,9 @@ class Signable:
|
|
|
outdir = self.dir.resolve(),
|
|
|
ask_overwrite = False)
|
|
|
if m.data.get('failed_sids'):
|
|
|
- die('MsgFileFailedSID', f'Failed Seed IDs: {fmt_list(m.data["failed_sids"], fmt="bare")}')
|
|
|
+ die(
|
|
|
+ 'MsgFileFailedSID',
|
|
|
+ f'Failed Seed IDs: {fmt_list(m.data["failed_sids"], fmt="bare")}')
|
|
|
return m
|
|
|
|
|
|
def print_summary(self, signables):
|
|
|
@@ -575,7 +580,9 @@ class Autosign:
|
|
|
try:
|
|
|
dirlist = self.wallet_dir.iterdir()
|
|
|
except:
|
|
|
- die(1, f"Cannot open wallet directory '{self.wallet_dir}'. Did you run ‘mmgen-autosign setup’?")
|
|
|
+ die(1,
|
|
|
+ f'Cannot open wallet directory ‘{self.wallet_dir}’. '
|
|
|
+ 'Did you run ‘mmgen-autosign setup’?')
|
|
|
|
|
|
self._wallet_files = [f for f in dirlist if f.suffix == '.mmdat']
|
|
|
|
|
|
@@ -611,9 +618,9 @@ class Autosign:
|
|
|
redir = None if verbose else DEVNULL
|
|
|
if run(self.mount_cmd.split(), stderr=redir, stdout=redir).returncode == 0:
|
|
|
if not silent:
|
|
|
- msg(f"Mounting '{self.mountpoint}'")
|
|
|
+ msg(f'Mounting ‘{self.mountpoint}’')
|
|
|
else:
|
|
|
- die(1, f'Unable to mount device {self.dev_label} at {self.mountpoint}')
|
|
|
+ die(1, f'Unable to mount device ‘{self.dev_label}’ at ‘{self.mountpoint}’')
|
|
|
|
|
|
for dirname in self.dirs:
|
|
|
check_or_create(dirname)
|
|
|
@@ -622,14 +629,14 @@ class Autosign:
|
|
|
if self.mountpoint.is_mount():
|
|
|
run(['sync'], check=True)
|
|
|
if not silent:
|
|
|
- msg(f"Unmounting '{self.mountpoint}'")
|
|
|
+ msg(f'Unmounting ‘{self.mountpoint}’')
|
|
|
redir = None if verbose else DEVNULL
|
|
|
run(self.umount_cmd.split(), stdout=redir, check=True)
|
|
|
if not silent:
|
|
|
bmsg('It is now safe to extract the removable device')
|
|
|
|
|
|
def decrypt_wallets(self):
|
|
|
- msg(f"Unlocking wallet{suf(self.wallet_files)} with key from ‘{self.keyfile}’")
|
|
|
+ msg(f'Unlocking wallet{suf(self.wallet_files)} with key from ‘{self.keyfile}’')
|
|
|
fails = 0
|
|
|
for wf in self.wallet_files:
|
|
|
try:
|
|
|
@@ -654,9 +661,10 @@ class Autosign:
|
|
|
try:
|
|
|
ret = await target.sign(f)
|
|
|
except Exception as e:
|
|
|
- ymsg(f"An error occurred with {target.desc} '{f.name}':\n {type(e).__name__}: {e!s}")
|
|
|
+ ymsg('An error occurred with {} ‘{}’:\n {}: ‘{}’'.format(
|
|
|
+ target.desc, f.name, type(e).__name__, e))
|
|
|
except:
|
|
|
- ymsg(f"An error occurred with {target.desc} '{f.name}'")
|
|
|
+ ymsg('An error occurred with {} ‘{}’'.format(target.desc, f.name))
|
|
|
good.append(ret) if ret else bad.append(f)
|
|
|
self.cfg._util.qmsg('')
|
|
|
await asyncio.sleep(0.3)
|
|
|
@@ -704,7 +712,7 @@ class Autosign:
|
|
|
gmsg('No wallet encryption key on removable device')
|
|
|
|
|
|
def create_key(self):
|
|
|
- desc = f"key file '{self.keyfile}'"
|
|
|
+ desc = f'key file ‘{self.keyfile}’'
|
|
|
msg('Creating ' + desc)
|
|
|
try:
|
|
|
self.keyfile.write_text(os.urandom(32).hex())
|
|
|
@@ -747,7 +755,7 @@ class Autosign:
|
|
|
def setup(self):
|
|
|
|
|
|
def remove_wallet_dir():
|
|
|
- msg(f"Deleting '{self.wallet_dir}'")
|
|
|
+ msg(f'Deleting ‘{self.wallet_dir}’')
|
|
|
import shutil
|
|
|
try:
|
|
|
shutil.rmtree(self.wallet_dir)
|
|
|
@@ -762,7 +770,7 @@ class Autosign:
|
|
|
try:
|
|
|
self.wallet_dir.stat()
|
|
|
except:
|
|
|
- die(2, f"Unable to create wallet directory '{self.wallet_dir}'")
|
|
|
+ die(2, f'Unable to create wallet directory ‘{self.wallet_dir}’')
|
|
|
|
|
|
self.gen_key(no_unmount=True)
|
|
|
|
|
|
@@ -777,7 +785,7 @@ class Autosign:
|
|
|
wf = find_file_in_dir(get_wallet_cls('mmgen'), self.cfg.data_dir)
|
|
|
if wf and keypress_confirm(
|
|
|
cfg = self.cfg,
|
|
|
- prompt = f"Default wallet '{wf}' found.\nUse default wallet for autosigning?",
|
|
|
+ prompt = f'Default wallet ‘{wf}’ found.\nUse default wallet for autosigning?',
|
|
|
default_yes = True):
|
|
|
ss_in = Wallet(Config(), fn=wf)
|
|
|
else:
|
|
|
@@ -810,7 +818,9 @@ class Autosign:
|
|
|
def create_signing_wallets():
|
|
|
from . import xmrwallet
|
|
|
if len(self.wallet_files) > 1:
|
|
|
- ymsg(f'Warning: more than one wallet file, using the first ({self.wallet_files[0]}) for xmrwallet generation')
|
|
|
+ ymsg(
|
|
|
+ 'Warning: more than one wallet file, using the first '
|
|
|
+ f'({self.wallet_files[0]}) for xmrwallet generation')
|
|
|
m = xmrwallet.op(
|
|
|
'create_offline',
|
|
|
self.xmrwallet_cfg,
|
|
|
@@ -844,7 +854,7 @@ class Autosign:
|
|
|
|
|
|
s = getattr(Signable, s_name)(self)
|
|
|
|
|
|
- msg_r(f"Cleaning directory '{s.dir}'..")
|
|
|
+ msg_r(f'Cleaning directory ‘{s.dir}’..')
|
|
|
|
|
|
if s.dir.is_dir():
|
|
|
clean_files(s.rawext, s.sigext)
|
|
|
@@ -875,7 +885,9 @@ class Autosign:
|
|
|
if self.cfg.test_suite_root_pfx:
|
|
|
return self.mountpoint.exists()
|
|
|
else:
|
|
|
- return run(['diskutil', 'info', self.dev_label], stdout=DEVNULL, stderr=DEVNULL).returncode == 0
|
|
|
+ return run(
|
|
|
+ ['diskutil', 'info', self.dev_label],
|
|
|
+ stdout=DEVNULL, stderr=DEVNULL).returncode == 0
|
|
|
|
|
|
async def main_loop(self):
|
|
|
if not self.cfg.stealth_led:
|
|
|
@@ -890,7 +902,7 @@ class Autosign:
|
|
|
await self.do_sign()
|
|
|
prev_status = status
|
|
|
if not n % 10:
|
|
|
- msg_r(f"\r{' '*17}\rWaiting")
|
|
|
+ msg_r(f'\r{" "*17}\rWaiting')
|
|
|
await asyncio.sleep(0.2 if threaded else 1)
|
|
|
if not threaded:
|
|
|
msg_r('.')
|