new modules: autosign.signable, autosign.swap_mgr

This commit is contained in:
The MMGen Project 2026-05-08 13:34:30 +00:00
commit ce40c500e1
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
8 changed files with 485 additions and 440 deletions

View file

@ -18,440 +18,14 @@ from pathlib import Path
from subprocess import run, PIPE, DEVNULL
from ..cfg import Config
from ..util import msg, msg_r, ymsg, rmsg, gmsg, bmsg, die, suf, fmt, fmt_list, is_int, have_sudo, capfirst
from ..color import yellow, red, orange, brown, blue, gray
from ..util import msg, msg_r, ymsg, rmsg, gmsg, bmsg, die, suf, fmt, fmt_list, is_int, cached_property
from ..color import yellow, brown, gray
from ..wallet import Wallet, get_wallet_cls
from ..addrlist import AddrIdxList
from ..filename import find_file_in_dir
from ..fileutil import shred_file
from ..ui import keypress_confirm
def SwapMgr(*args, **kwargs):
match sys.platform:
case 'linux':
return SwapMgrLinux(*args, **kwargs)
case 'darwin':
return SwapMgrMacOS(*args, **kwargs)
class SwapMgrBase:
def __init__(self, cfg, *, ignore_zram=False):
self.cfg = cfg
self.ignore_zram = ignore_zram
self.desc = 'disk swap' if ignore_zram else 'swap'
def enable(self, *, quiet=False):
ret = self.do_enable()
if not quiet:
self.cfg._util.qmsg(
f'{capfirst(self.desc)} successfully enabled' if ret else
f'{capfirst(self.desc)} is already enabled' if ret is None else
f'Could not enable {self.desc}')
return ret
def disable(self, *, quiet=False):
self.cfg._util.qmsg_r(f'Attempting to disable {self.desc}...')
ret = self.do_disable()
self.cfg._util.qmsg('success')
if not quiet:
self.cfg._util.qmsg(
f'{capfirst(self.desc)} successfully disabled ({fmt_list(ret, fmt="no_quotes")})'
if ret and isinstance(ret, list) else
f'{capfirst(self.desc)} successfully disabled' if ret else
f'No active {self.desc}')
return ret
def process_cmds(self, op, cmds):
if not cmds:
return
if have_sudo(silent=True) and not self.cfg.test_suite:
for cmd in cmds:
run(cmd.split(), check=True)
else:
pre = 'failure\n' if op == 'disable' else ''
fs = blue('{a} {b} manually by executing the following command{c}:\n{d}')
post = orange('[To prevent this message in the future, enable sudo without a password]')
m = pre + fs.format(
a = 'Please disable' if op == 'disable' else 'Enable',
b = self.desc,
c = suf(cmds),
d = fmt_list(cmds, indent=' ', fmt='col')) + '\n' + post
msg(m)
if not self.cfg.test_suite:
sys.exit(1)
class SwapMgrLinux(SwapMgrBase):
def get_active(self):
for cmd in ('/sbin/swapon', 'swapon'):
try:
cp = run([cmd, '--show=NAME', '--noheadings'], stdout=PIPE, text=True, check=True)
break
except Exception:
if cmd == 'swapon':
raise
res = cp.stdout.splitlines()
return [e for e in res if not e.startswith('/dev/zram')] if self.ignore_zram else res
def do_enable(self):
if ret := self.get_active():
ymsg(f'Warning: {self.desc} is already enabled: ({fmt_list(ret, fmt="no_quotes")})')
self.process_cmds('enable', ['sudo swapon --all'])
return True
def do_disable(self):
swapdevs = self.get_active()
if not swapdevs:
return None
self.process_cmds('disable', [f'sudo swapoff {swapdev}' for swapdev in swapdevs])
return swapdevs
class SwapMgrMacOS(SwapMgrBase):
def get_active(self):
cmd = 'launchctl print system/com.apple.dynamic_pager'
return run(cmd.split(), stdout=DEVNULL, stderr=DEVNULL).returncode == 0
def _do_action(self, active, op, cmd):
if self.get_active() is active:
return None
else:
cmd = f'sudo launchctl {cmd} -w /System/Library/LaunchDaemons/com.apple.dynamic_pager.plist'
self.process_cmds(op, [cmd])
return True
def do_enable(self):
return self._do_action(active=True, op='enable', cmd='load')
def do_disable(self):
return self._do_action(active=False, op='disable', cmd='unload')
class Signable:
non_xmr_signables = (
'transaction',
'automount_transaction',
'message')
xmr_signables = ( # order is important!
'xmr_wallet_outputs_file', # import XMR wallet outputs BEFORE signing transactions
'xmr_transaction')
class base:
clean_all = False
multiple_ok = True
action_desc = 'signed'
fail_msg = 'failed to sign'
def __init__(self, parent):
self.parent = parent
self.cfg = parent.cfg
self.dir = getattr(parent, self.dir_name)
self.name = type(self).__name__
@property
def unsigned(self):
return self._unprocessed('_unsigned', self.rawext, self.sigext)
def _unprocessed(self, attrname, rawext, sigext):
if not hasattr(self, attrname):
dirlist = sorted(self.dir.iterdir())
names = {f.name for f in dirlist}
setattr(
self,
attrname,
tuple(f for f in dirlist
if f.name.endswith('.' + rawext)
and f.name[:-len(rawext)] + sigext not in names))
return getattr(self, attrname)
def print_bad_list(self, bad_files):
msg('\n{a}\n{b}'.format(
a = red(f'Failed {self.desc}s:'),
b = ' {}\n'.format('\n '.join(
self.gen_bad_list(sorted(bad_files, key=lambda f: f.name))))))
def gen_bad_list(self, bad_files):
for f in bad_files:
yield red(f.name)
class transaction(base):
desc = 'non-automount transaction'
dir_name = 'tx_dir'
rawext = 'rawtx'
sigext = 'sigtx'
automount = False
async def sign(self, f):
from ..tx import UnsignedTX
tx1 = UnsignedTX(
cfg = self.cfg,
filename = f,
automount = self.automount)
if tx1.proto.coin == 'XMR':
ctx = Signable.xmr_compat_transaction(self.parent)
for k in ('desc', 'print_summary', 'print_bad_list'):
setattr(self, k, getattr(ctx, k))
return await ctx.sign(f, compat_call=True)
if tx1.proto.sign_mode == 'daemon':
from ..rpc import rpc_init
tx1.rpc = await rpc_init(self.cfg, tx1.proto, ignore_wallet=True)
from ..tx.keys import TxKeys
tx2 = await tx1.sign(
TxKeys(
self.cfg,
tx1,
seedfiles = self.parent.wallet_files[:],
keylist = self.parent.keylist,
passwdfile = str(self.parent.keyfile),
autosign = True).keys)
if tx2:
tx2.file.write(ask_write=False, outdir=self.dir)
return tx2
else:
return False
def print_summary(self, signables):
if self.cfg.full_summary:
bmsg('\nAutosign summary:\n')
msg_r('\n'.join(tx.info.format(terse=True) for tx in signables))
return
def gen():
for tx in signables:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx, non_mmgen)
body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid, a_wid = 6, 44
def gen():
yield fs.format('TX ID ', 'Non-MMGen outputs'+' '*(a_wid-17), 'Amount')
yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
for tx, non_mmgen in body:
for nm in non_mmgen:
yield fs.format(
tx.txid.fmt(t_wid, color=True) if nm is non_mmgen[0] else ' '*t_wid,
nm.addr.fmt(nm.addr.view_pref, a_wid, color=True),
nm.amt.hl() + ' ' + yellow(tx.coin))
msg('\n' + '\n'.join(gen()))
else:
msg('\nNo non-MMGen outputs')
class automount_transaction(transaction):
desc = 'automount transaction'
dir_name = 'txauto_dir'
rawext = 'arawtx'
sigext = 'asigtx'
subext = 'asubtx'
multiple_ok = False
automount = True
@property
def unsubmitted(self):
return self._unprocessed('_unsubmitted', self.sigext, self.subext)
@property
def unsubmitted_raw(self):
return self._unprocessed('_unsubmitted_raw', self.rawext, self.subext)
unsent = unsubmitted
unsent_raw = unsubmitted_raw
@property
def submitted(self):
return self._processed('_submitted', self.subext)
def _processed(self, attrname, ext):
if not hasattr(self, attrname):
setattr(self, attrname, tuple(f for f in sorted(self.dir.iterdir())
if f.name.endswith('.' + ext)))
return getattr(self, attrname)
def die_wrong_num_txs(self, tx_type, *, msg=None, desc=None, show_dir=False):
match len(getattr(self, tx_type)): # num_txs
case 0: subj, suf, pred = ('No', 's', 'present')
case 1: subj, suf, pred = ('One', '', 'already present')
case _: subj, suf, pred = ('More than one', '', 'already present')
die('AutosignTXError', '{m}{a} {b} transaction{c} {d} {e}!'.format(
m = msg + '\n' if msg else '',
a = subj,
b = desc or tx_type,
c = suf,
d = pred,
e = f'in ‘{getattr(self.parent, self.dir_name)}'
if show_dir else 'on removable device'))
def check_create_ok(self):
if len(self.unsigned):
self.die_wrong_num_txs('unsigned', msg='Cannot create transaction')
if len(self.unsent):
die('AutosignTXError', 'Cannot create transaction: you have an unsent transaction')
def get_unsubmitted(self, tx_type='unsubmitted'):
if len(self.unsubmitted) == 1:
return self.unsubmitted[0]
else:
self.die_wrong_num_txs(tx_type)
def get_unsent(self):
return self.get_unsubmitted('unsent')
def get_submitted(self):
if len(self.submitted) == 0:
self.die_wrong_num_txs('submitted')
else:
return self.submitted
def get_abortable(self):
if len(self.unsent_raw) != 1:
self.die_wrong_num_txs('unsent_raw', desc='unsent')
if len(self.unsent) > 1:
self.die_wrong_num_txs('unsent')
if self.unsent:
if self.unsent[0].stem != self.unsent_raw[0].stem:
die(1, f'{self.unsent[0]}, {self.unsent_raw[0]}: file mismatch')
return self.unsent_raw + self.unsent
def shred_abortable(self):
files = self.get_abortable() # raises AutosignTXError if no unsent TXs available
keypress_confirm(
self.cfg,
'The following file{} will be securely deleted:\n{}\nOK?'.format(
suf(files),
fmt_list(map(str, files), fmt='col', indent=' ')),
do_exit = True)
for fn in files:
msg(f'Shredding file ‘{fn}')
shred_file(self.cfg, fn, iterations=15)
sys.exit(0)
async def get_last_sent(self, *, tx_range):
return await self.get_last_created(
# compat fallback - ‘sent_timestamp’ attr is missing in some old TX files:
sort_key = lambda x: x.sent_timestamp or x.timestamp,
tx_range = tx_range)
async def get_last_created(self, *, tx_range, sort_key=lambda x: x.timestamp):
from ..tx import CompletedTX
fns = [f for f in self.dir.iterdir() if f.name.endswith(self.subext)]
files = sorted(
[await CompletedTX(cfg=self.cfg, filename=str(txfile), quiet_open=True)
for txfile in fns],
key = sort_key)
if files:
return files[len(files) - 1 - tx_range.last:len(files) - tx_range.first]
else:
die(1, 'No sent automount transactions!')
class xmr_signable: # mixin class
automount = True
summary_footer = ''
def need_daemon_restart(self, m, new_idx):
old_idx = self.parent.xmr_cur_wallet_idx
self.parent.xmr_cur_wallet_idx = new_idx
return old_idx != new_idx or m.wd.state != 'ready'
def print_summary(self, signables):
bmsg('\nAutosign summary:')
msg('\n'.join(s.get_info(indent=' ') for s in signables) + self.summary_footer)
class xmr_transaction(xmr_signable, automount_transaction):
desc = 'Monero non-compat transaction'
dir_name = 'xmr_tx_dir'
rawext = 'rawtx'
sigext = 'sigtx'
subext = 'subtx'
async def sign(self, f, compat_call=False):
from .. import xmrwallet
from ..xmrwallet.file.tx import MoneroMMGenTX
tx1 = MoneroMMGenTX.Completed(self.parent.xmrwallet_cfg, f)
m = xmrwallet.op(
'sign',
self.parent.xmrwallet_cfg,
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(tx1.src_wallet_idx),
compat_call = compat_call)
tx2 = await m.main(f, restart_daemon=self.need_daemon_restart(m, tx1.src_wallet_idx))
tx2.write(ask_write=False)
return tx2
class xmr_compat_transaction(xmr_transaction):
desc = 'Monero compat transaction'
dir_name = 'txauto_dir'
rawext = 'arawtx'
sigext = 'asigtx'
subext = 'asubtx'
class xmr_wallet_outputs_file(xmr_signable, base):
desc = 'Monero wallet outputs file'
dir_name = 'xmr_outputs_dir'
rawext = 'raw'
sigext = 'sig'
clean_all = True
summary_footer = '\n'
@property
def unsigned(self):
import json
return tuple(
f for f in super().unsigned
if not json.loads(f.read_text())['MoneroMMGenWalletOutputsFile']['data']['imported'])
async def sign(self, f):
from .. import xmrwallet
wallet_idx = xmrwallet.op_cls('wallet').get_idx_from_fn(f)
m = xmrwallet.op(
'import_outputs',
self.parent.xmrwallet_cfg,
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(wallet_idx))
obj = await m.main(f, wallet_idx, restart_daemon=self.need_daemon_restart(m, wallet_idx))
obj.write(quiet=not obj.data.sign)
self.action_desc = 'imported and signed' if obj.data.sign else 'imported'
return obj
class message(base):
desc = 'message file'
dir_name = 'msg_dir'
rawext = 'rawmsg.json'
sigext = 'sigmsg.json'
fail_msg = 'failed to sign or signed incompletely'
async def sign(self, f):
from ..msg import UnsignedMsg, SignedMsg
m = UnsignedMsg(self.cfg, infile=f)
await m.sign(wallet_files=self.parent.wallet_files[:], passwd_file=str(self.parent.keyfile))
m = SignedMsg(self.cfg, data=m.__dict__)
m.write_to_file(
outdir = self.dir.resolve(),
ask_overwrite = False)
if m.data.get('failed_sids'):
die(
'MsgFileFailedSID',
f'Failed Seed IDs: {fmt_list(m.data["failed_sids"], fmt="bare")}')
return m
def print_summary(self, signables):
gmsg('\nSigned message files:')
for message in signables:
gmsg(' ' + message.signed_filename)
def gen_bad_list(self, bad_files):
for f in bad_files:
sigfile = f.parent / (f.name[:-len(self.rawext)] + self.sigext)
yield orange(sigfile.name) if sigfile.exists() else red(f.name)
class Autosign:
dev_label = 'MMGEN_TX'
@ -490,6 +64,15 @@ class Autosign:
'xmr_tx_dir': 'xmr/tx',
'xmr_outputs_dir': 'xmr/outputs'}
non_xmr_signables = (
'transaction',
'automount_transaction',
'message')
xmr_signables = ( # order is important!
'xmr_wallet_outputs_file', # import XMR wallet outputs BEFORE signing transactions
'xmr_transaction')
have_xmr = False
xmr_only = False
@ -575,20 +158,23 @@ class Autosign:
if not self.xmr_only:
self.dirs |= self.non_xmr_dirs
self.signables += Signable.non_xmr_signables
self.signables += self.non_xmr_signables
if self.have_xmr:
self.dirs |= self.xmr_dirs | (
{'txauto_dir': 'txauto'} if cfg.xmrwallet_compat and self.xmr_only else {})
self.signables = (
Signable.xmr_signables # xmr_wallet_outputs_file must be signed before XMR TXs
self.xmr_signables # xmr_wallet_outputs_file must be signed before XMR TXs
+ (('automount_transaction',) if cfg.xmrwallet_compat and self.xmr_only else ())
+ self.signables) # self.signables could contain compat XMR TXs
for name, path in self.dirs.items():
setattr(self, name, self.mountpoint / path)
self.swap = SwapMgr(self.cfg, ignore_zram=True)
@cached_property
def swap(self):
from .swap_mgr import SwapMgr
return SwapMgr(self.cfg, ignore_zram=True)
async def check_daemons_running(self):
from ..protocol import init_proto
@ -683,6 +269,7 @@ class Autosign:
return not fails
async def sign_all(self, target_name):
from .signable import Signable
target = getattr(Signable, target_name)(self)
if target.unsigned:
good = []
@ -892,6 +479,7 @@ class Autosign:
if raw.is_file():
do_shred(raw)
from .signable import Signable
s = getattr(Signable, s_name)(self)
msg_r(f'Cleaning directory ‘{s.dir}’..')

337
mmgen/autosign/signable.py Executable file
View file

@ -0,0 +1,337 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
autosign.signable: Signable class for MMGen Wallet autosigning
"""
import sys
from ..util import msg, msg_r, gmsg, bmsg, die, suf, fmt_list
from ..color import yellow, red, orange
from ..fileutil import shred_file
from ..ui import keypress_confirm
class Signable:
class base:
clean_all = False
multiple_ok = True
action_desc = 'signed'
fail_msg = 'failed to sign'
def __init__(self, parent):
self.parent = parent
self.cfg = parent.cfg
self.dir = getattr(parent, self.dir_name)
self.name = type(self).__name__
@property
def unsigned(self):
return self._unprocessed('_unsigned', self.rawext, self.sigext)
def _unprocessed(self, attrname, rawext, sigext):
if not hasattr(self, attrname):
dirlist = sorted(self.dir.iterdir())
names = {f.name for f in dirlist}
setattr(
self,
attrname,
tuple(f for f in dirlist
if f.name.endswith('.' + rawext)
and f.name[:-len(rawext)] + sigext not in names))
return getattr(self, attrname)
def print_bad_list(self, bad_files):
msg('\n{a}\n{b}'.format(
a = red(f'Failed {self.desc}s:'),
b = ' {}\n'.format('\n '.join(
self.gen_bad_list(sorted(bad_files, key=lambda f: f.name))))))
def gen_bad_list(self, bad_files):
for f in bad_files:
yield red(f.name)
class transaction(base):
desc = 'non-automount transaction'
dir_name = 'tx_dir'
rawext = 'rawtx'
sigext = 'sigtx'
automount = False
async def sign(self, f):
from ..tx import UnsignedTX
tx1 = UnsignedTX(
cfg = self.cfg,
filename = f,
automount = self.automount)
if tx1.proto.coin == 'XMR':
ctx = Signable.xmr_compat_transaction(self.parent)
for k in ('desc', 'print_summary', 'print_bad_list'):
setattr(self, k, getattr(ctx, k))
return await ctx.sign(f, compat_call=True)
if tx1.proto.sign_mode == 'daemon':
from ..rpc import rpc_init
tx1.rpc = await rpc_init(self.cfg, tx1.proto, ignore_wallet=True)
from ..tx.keys import TxKeys
tx2 = await tx1.sign(
TxKeys(
self.cfg,
tx1,
seedfiles = self.parent.wallet_files[:],
keylist = self.parent.keylist,
passwdfile = str(self.parent.keyfile),
autosign = True).keys)
if tx2:
tx2.file.write(ask_write=False, outdir=self.dir)
return tx2
else:
return False
def print_summary(self, signables):
if self.cfg.full_summary:
bmsg('\nAutosign summary:\n')
msg_r('\n'.join(tx.info.format(terse=True) for tx in signables))
return
def gen():
for tx in signables:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx, non_mmgen)
body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid, a_wid = 6, 44
def gen():
yield fs.format('TX ID ', 'Non-MMGen outputs'+' '*(a_wid-17), 'Amount')
yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
for tx, non_mmgen in body:
for nm in non_mmgen:
yield fs.format(
tx.txid.fmt(t_wid, color=True) if nm is non_mmgen[0] else ' '*t_wid,
nm.addr.fmt(nm.addr.view_pref, a_wid, color=True),
nm.amt.hl() + ' ' + yellow(tx.coin))
msg('\n' + '\n'.join(gen()))
else:
msg('\nNo non-MMGen outputs')
class automount_transaction(transaction):
desc = 'automount transaction'
dir_name = 'txauto_dir'
rawext = 'arawtx'
sigext = 'asigtx'
subext = 'asubtx'
multiple_ok = False
automount = True
@property
def unsubmitted(self):
return self._unprocessed('_unsubmitted', self.sigext, self.subext)
@property
def unsubmitted_raw(self):
return self._unprocessed('_unsubmitted_raw', self.rawext, self.subext)
unsent = unsubmitted
unsent_raw = unsubmitted_raw
@property
def submitted(self):
return self._processed('_submitted', self.subext)
def _processed(self, attrname, ext):
if not hasattr(self, attrname):
setattr(self, attrname, tuple(f for f in sorted(self.dir.iterdir())
if f.name.endswith('.' + ext)))
return getattr(self, attrname)
def die_wrong_num_txs(self, tx_type, *, msg=None, desc=None, show_dir=False):
match len(getattr(self, tx_type)): # num_txs
case 0: subj, suf, pred = ('No', 's', 'present')
case 1: subj, suf, pred = ('One', '', 'already present')
case _: subj, suf, pred = ('More than one', '', 'already present')
die('AutosignTXError', '{m}{a} {b} transaction{c} {d} {e}!'.format(
m = msg + '\n' if msg else '',
a = subj,
b = desc or tx_type,
c = suf,
d = pred,
e = f'in ‘{getattr(self.parent, self.dir_name)}'
if show_dir else 'on removable device'))
def check_create_ok(self):
if len(self.unsigned):
self.die_wrong_num_txs('unsigned', msg='Cannot create transaction')
if len(self.unsent):
die('AutosignTXError', 'Cannot create transaction: you have an unsent transaction')
def get_unsubmitted(self, tx_type='unsubmitted'):
if len(self.unsubmitted) == 1:
return self.unsubmitted[0]
else:
self.die_wrong_num_txs(tx_type)
def get_unsent(self):
return self.get_unsubmitted('unsent')
def get_submitted(self):
if len(self.submitted) == 0:
self.die_wrong_num_txs('submitted')
else:
return self.submitted
def get_abortable(self):
if len(self.unsent_raw) != 1:
self.die_wrong_num_txs('unsent_raw', desc='unsent')
if len(self.unsent) > 1:
self.die_wrong_num_txs('unsent')
if self.unsent:
if self.unsent[0].stem != self.unsent_raw[0].stem:
die(1, f'{self.unsent[0]}, {self.unsent_raw[0]}: file mismatch')
return self.unsent_raw + self.unsent
def shred_abortable(self):
files = self.get_abortable() # raises AutosignTXError if no unsent TXs available
keypress_confirm(
self.cfg,
'The following file{} will be securely deleted:\n{}\nOK?'.format(
suf(files),
fmt_list(map(str, files), fmt='col', indent=' ')),
do_exit = True)
for fn in files:
msg(f'Shredding file ‘{fn}')
shred_file(self.cfg, fn, iterations=15)
sys.exit(0)
async def get_last_sent(self, *, tx_range):
return await self.get_last_created(
# compat fallback - ‘sent_timestamp’ attr is missing in some old TX files:
sort_key = lambda x: x.sent_timestamp or x.timestamp,
tx_range = tx_range)
async def get_last_created(self, *, tx_range, sort_key=lambda x: x.timestamp):
from ..tx import CompletedTX
fns = [f for f in self.dir.iterdir() if f.name.endswith(self.subext)]
files = sorted(
[await CompletedTX(cfg=self.cfg, filename=str(txfile), quiet_open=True)
for txfile in fns],
key = sort_key)
if files:
return files[len(files) - 1 - tx_range.last:len(files) - tx_range.first]
else:
die(1, 'No sent automount transactions!')
class xmr_signable: # mixin class
automount = True
summary_footer = ''
def need_daemon_restart(self, m, new_idx):
old_idx = self.parent.xmr_cur_wallet_idx
self.parent.xmr_cur_wallet_idx = new_idx
return old_idx != new_idx or m.wd.state != 'ready'
def print_summary(self, signables):
bmsg('\nAutosign summary:')
msg('\n'.join(s.get_info(indent=' ') for s in signables) + self.summary_footer)
class xmr_transaction(xmr_signable, automount_transaction):
desc = 'Monero non-compat transaction'
dir_name = 'xmr_tx_dir'
rawext = 'rawtx'
sigext = 'sigtx'
subext = 'subtx'
async def sign(self, f, compat_call=False):
from .. import xmrwallet
from ..xmrwallet.file.tx import MoneroMMGenTX
tx1 = MoneroMMGenTX.Completed(self.parent.xmrwallet_cfg, f)
m = xmrwallet.op(
'sign',
self.parent.xmrwallet_cfg,
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(tx1.src_wallet_idx),
compat_call = compat_call)
tx2 = await m.main(f, restart_daemon=self.need_daemon_restart(m, tx1.src_wallet_idx))
tx2.write(ask_write=False)
return tx2
class xmr_compat_transaction(xmr_transaction):
desc = 'Monero compat transaction'
dir_name = 'txauto_dir'
rawext = 'arawtx'
sigext = 'asigtx'
subext = 'asubtx'
class xmr_wallet_outputs_file(xmr_signable, base):
desc = 'Monero wallet outputs file'
dir_name = 'xmr_outputs_dir'
rawext = 'raw'
sigext = 'sig'
clean_all = True
summary_footer = '\n'
@property
def unsigned(self):
import json
return tuple(
f for f in super().unsigned
if not json.loads(f.read_text())['MoneroMMGenWalletOutputsFile']['data']['imported'])
async def sign(self, f):
from .. import xmrwallet
wallet_idx = xmrwallet.op_cls('wallet').get_idx_from_fn(f)
m = xmrwallet.op(
'import_outputs',
self.parent.xmrwallet_cfg,
infile = str(self.parent.wallet_files[0]), # MMGen wallet file
wallets = str(wallet_idx))
obj = await m.main(f, wallet_idx, restart_daemon=self.need_daemon_restart(m, wallet_idx))
obj.write(quiet=not obj.data.sign)
self.action_desc = 'imported and signed' if obj.data.sign else 'imported'
return obj
class message(base):
desc = 'message file'
dir_name = 'msg_dir'
rawext = 'rawmsg.json'
sigext = 'sigmsg.json'
fail_msg = 'failed to sign or signed incompletely'
async def sign(self, f):
from ..msg import UnsignedMsg, SignedMsg
m = UnsignedMsg(self.cfg, infile=f)
await m.sign(wallet_files=self.parent.wallet_files[:], passwd_file=str(self.parent.keyfile))
m = SignedMsg(self.cfg, data=m.__dict__)
m.write_to_file(
outdir = self.dir.resolve(),
ask_overwrite = False)
if m.data.get('failed_sids'):
die(
'MsgFileFailedSID',
f'Failed Seed IDs: {fmt_list(m.data["failed_sids"], fmt="bare")}')
return m
def print_summary(self, signables):
gmsg('\nSigned message files:')
for message in signables:
gmsg(' ' + message.signed_filename)
def gen_bad_list(self, bad_files):
for f in bad_files:
sigfile = f.parent / (f.name[:-len(self.rawext)] + self.sigext)
yield orange(sigfile.name) if sigfile.exists() else red(f.name)

119
mmgen/autosign/swap_mgr.py Executable file
View file

@ -0,0 +1,119 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
autosign.swap_mgr: swap management for MMGen Wallet autosigning
"""
import sys
from subprocess import run, PIPE, DEVNULL
from ..util import msg, ymsg, suf, fmt_list, have_sudo, capfirst
from ..color import orange, blue
def SwapMgr(*args, **kwargs):
match sys.platform:
case 'linux':
return SwapMgrLinux(*args, **kwargs)
case 'darwin':
return SwapMgrMacOS(*args, **kwargs)
class SwapMgrBase:
def __init__(self, cfg, *, ignore_zram=False):
self.cfg = cfg
self.ignore_zram = ignore_zram
self.desc = 'disk swap' if ignore_zram else 'swap'
def enable(self, *, quiet=False):
ret = self.do_enable()
if not quiet:
self.cfg._util.qmsg(
f'{capfirst(self.desc)} successfully enabled' if ret else
f'{capfirst(self.desc)} is already enabled' if ret is None else
f'Could not enable {self.desc}')
return ret
def disable(self, *, quiet=False):
self.cfg._util.qmsg_r(f'Attempting to disable {self.desc}...')
ret = self.do_disable()
self.cfg._util.qmsg('success')
if not quiet:
self.cfg._util.qmsg(
f'{capfirst(self.desc)} successfully disabled ({fmt_list(ret, fmt="no_quotes")})'
if ret and isinstance(ret, list) else
f'{capfirst(self.desc)} successfully disabled' if ret else
f'No active {self.desc}')
return ret
def process_cmds(self, op, cmds):
if not cmds:
return
if have_sudo(silent=True) and not self.cfg.test_suite:
for cmd in cmds:
run(cmd.split(), check=True)
else:
pre = 'failure\n' if op == 'disable' else ''
fs = blue('{a} {b} manually by executing the following command{c}:\n{d}')
post = orange('[To prevent this message in the future, enable sudo without a password]')
m = pre + fs.format(
a = 'Please disable' if op == 'disable' else 'Enable',
b = self.desc,
c = suf(cmds),
d = fmt_list(cmds, indent=' ', fmt='col')) + '\n' + post
msg(m)
if not self.cfg.test_suite:
sys.exit(1)
class SwapMgrLinux(SwapMgrBase):
def get_active(self):
for cmd in ('/sbin/swapon', 'swapon'):
try:
cp = run([cmd, '--show=NAME', '--noheadings'], stdout=PIPE, text=True, check=True)
break
except Exception:
if cmd == 'swapon':
raise
res = cp.stdout.splitlines()
return [e for e in res if not e.startswith('/dev/zram')] if self.ignore_zram else res
def do_enable(self):
if ret := self.get_active():
ymsg(f'Warning: {self.desc} is already enabled: ({fmt_list(ret, fmt="no_quotes")})')
self.process_cmds('enable', ['sudo swapon --all'])
return True
def do_disable(self):
swapdevs = self.get_active()
if not swapdevs:
return None
self.process_cmds('disable', [f'sudo swapoff {swapdev}' for swapdev in swapdevs])
return swapdevs
class SwapMgrMacOS(SwapMgrBase):
def get_active(self):
cmd = 'launchctl print system/com.apple.dynamic_pager'
return run(cmd.split(), stdout=DEVNULL, stderr=DEVNULL).returncode == 0
def _do_action(self, active, op, cmd):
if self.get_active() is active:
return None
else:
cmd = f'sudo launchctl {cmd} -w /System/Library/LaunchDaemons/com.apple.dynamic_pager.plist'
self.process_cmds(op, [cmd])
return True
def do_enable(self):
return self._do_action(active=True, op='enable', cmd='load')
def do_disable(self):
return self._do_action(active=False, op='disable', cmd='unload')

View file

@ -168,7 +168,7 @@ async def main():
if cfg.autosign:
from .tx.util import mount_removable_device
from .autosign import Signable
from .autosign.signable import Signable
asi = mount_removable_device(cfg)
si = Signable.automount_transaction(asi)
if si.unsigned or si.unsent:

View file

@ -138,7 +138,7 @@ async def main():
if cfg.autosign:
from .tx.util import mount_removable_device
from .autosign import Signable
from .autosign.signable import Signable
asi = mount_removable_device(cfg, add_cfg={'xmrwallet_compat': True})
Signable.automount_transaction(asi).check_create_ok()

View file

@ -109,7 +109,7 @@ def init_autosign(arg):
global asi, si, infile, tx_range
from .tx.util import mount_removable_device
from .tx.online import SentTXRange
from .autosign import Signable
from .autosign.signable import Signable
asi = mount_removable_device(cfg)
si = Signable.automount_transaction(asi)
if cfg.abort:

View file

@ -42,7 +42,7 @@ class OpSubmit(OpWallet):
if self.uargs.infile:
fn = Path(self.uargs.infile)
else:
from ...autosign import Signable
from ...autosign.signable import Signable
fn = Signable.xmr_transaction(self.asi).get_unsubmitted()
return self.get_tx_cls('ColdSigned')(cfg=self.cfg, fn=fn)
@ -115,7 +115,7 @@ class OpResubmit(OpSubmit):
die(1, '--autosign is required for this operation')
def get_tx(self):
from ...autosign import Signable
from ...autosign.signable import Signable
fns = Signable.xmr_transaction(self.asi).get_submitted()
cls = self.get_tx_cls('Submitted')
return sorted((cls(self.cfg, Path(fn)) for fn in fns),
@ -127,5 +127,5 @@ class OpAbort(OpBase):
def __init__(self, cfg, uarg_tuple):
super().__init__(cfg, uarg_tuple)
self.mount_removable_device()
from ...autosign import Signable
from ...autosign.signable import Signable
Signable.xmr_transaction(self.asi).shred_abortable() # prompts user, then raises exception or exits

View file

@ -28,7 +28,8 @@ from mmgen.cfg import Config
from mmgen.color import red, blue, yellow, cyan, orange, purple, gray
from mmgen.util import msg, suf, die, indent, fmt
from mmgen.led import LEDControl
from mmgen.autosign import Autosign, Signable
from mmgen.autosign import Autosign
from mmgen.autosign.signable import Signable
from ..include.common import (
omsg,
@ -998,11 +999,11 @@ class CmdTestAutosign(CmdTestAutosignBase):
res = t.read()
self.remove_device()
for signable_list in present:
for signable_clsname in getattr(Signable, signable_list):
for signable_clsname in getattr(Autosign, signable_list):
desc = getattr(Signable, signable_clsname).desc
assert f'No unsigned {desc}s' in res, f'‘No unsigned {desc}s’ missing in output'
for signable_list in absent:
for signable_clsname in getattr(Signable, signable_list):
for signable_clsname in getattr(Autosign, signable_list):
desc = getattr(Signable, signable_clsname).desc
assert not f'No unsigned {desc}s' in res, f'‘No unsigned {desc}s’ should be absent in output'
return t