diff --git a/mmgen/autosign/__init__.py b/mmgen/autosign/__init__.py index 3eed51b0..7fd872d8 100755 --- a/mmgen/autosign/__init__.py +++ b/mmgen/autosign/__init__.py @@ -18,440 +18,14 @@ from pathlib import Path from subprocess import run, PIPE, DEVNULL from ..cfg import Config -from ..util import msg, msg_r, ymsg, rmsg, gmsg, bmsg, die, suf, fmt, fmt_list, is_int, have_sudo, capfirst -from ..color import yellow, red, orange, brown, blue, gray +from ..util import msg, msg_r, ymsg, rmsg, gmsg, bmsg, die, suf, fmt, fmt_list, is_int, cached_property +from ..color import yellow, brown, gray from ..wallet import Wallet, get_wallet_cls from ..addrlist import AddrIdxList from ..filename import find_file_in_dir from ..fileutil import shred_file from ..ui import keypress_confirm -def SwapMgr(*args, **kwargs): - match sys.platform: - case 'linux': - return SwapMgrLinux(*args, **kwargs) - case 'darwin': - return SwapMgrMacOS(*args, **kwargs) - -class SwapMgrBase: - - def __init__(self, cfg, *, ignore_zram=False): - self.cfg = cfg - self.ignore_zram = ignore_zram - self.desc = 'disk swap' if ignore_zram else 'swap' - - def enable(self, *, quiet=False): - ret = self.do_enable() - if not quiet: - self.cfg._util.qmsg( - f'{capfirst(self.desc)} successfully enabled' if ret else - f'{capfirst(self.desc)} is already enabled' if ret is None else - f'Could not enable {self.desc}') - return ret - - def disable(self, *, quiet=False): - self.cfg._util.qmsg_r(f'Attempting to disable {self.desc}...') - ret = self.do_disable() - self.cfg._util.qmsg('success') - if not quiet: - self.cfg._util.qmsg( - f'{capfirst(self.desc)} successfully disabled ({fmt_list(ret, fmt="no_quotes")})' - if ret and isinstance(ret, list) else - f'{capfirst(self.desc)} successfully disabled' if ret else - f'No active {self.desc}') - return ret - - def process_cmds(self, op, cmds): - if not cmds: - return - if have_sudo(silent=True) and not self.cfg.test_suite: - for cmd in cmds: - run(cmd.split(), check=True) - else: - pre = 'failure\n' if op == 'disable' else '' - fs = blue('{a} {b} manually by executing the following command{c}:\n{d}') - post = orange('[To prevent this message in the future, enable sudo without a password]') - m = pre + fs.format( - a = 'Please disable' if op == 'disable' else 'Enable', - b = self.desc, - c = suf(cmds), - d = fmt_list(cmds, indent=' ', fmt='col')) + '\n' + post - msg(m) - if not self.cfg.test_suite: - sys.exit(1) - -class SwapMgrLinux(SwapMgrBase): - - def get_active(self): - for cmd in ('/sbin/swapon', 'swapon'): - try: - cp = run([cmd, '--show=NAME', '--noheadings'], stdout=PIPE, text=True, check=True) - break - except Exception: - if cmd == 'swapon': - raise - res = cp.stdout.splitlines() - return [e for e in res if not e.startswith('/dev/zram')] if self.ignore_zram else res - - def do_enable(self): - if ret := self.get_active(): - ymsg(f'Warning: {self.desc} is already enabled: ({fmt_list(ret, fmt="no_quotes")})') - self.process_cmds('enable', ['sudo swapon --all']) - return True - - def do_disable(self): - swapdevs = self.get_active() - if not swapdevs: - return None - self.process_cmds('disable', [f'sudo swapoff {swapdev}' for swapdev in swapdevs]) - return swapdevs - -class SwapMgrMacOS(SwapMgrBase): - - def get_active(self): - cmd = 'launchctl print system/com.apple.dynamic_pager' - return run(cmd.split(), stdout=DEVNULL, stderr=DEVNULL).returncode == 0 - - def _do_action(self, active, op, cmd): - if self.get_active() is active: - return None - else: - cmd = f'sudo launchctl {cmd} -w /System/Library/LaunchDaemons/com.apple.dynamic_pager.plist' - self.process_cmds(op, [cmd]) - return True - - def do_enable(self): - return self._do_action(active=True, op='enable', cmd='load') - - def do_disable(self): - return self._do_action(active=False, op='disable', cmd='unload') - -class Signable: - - non_xmr_signables = ( - 'transaction', - 'automount_transaction', - 'message') - - xmr_signables = ( # order is important! - 'xmr_wallet_outputs_file', # import XMR wallet outputs BEFORE signing transactions - 'xmr_transaction') - - class base: - - clean_all = False - multiple_ok = True - action_desc = 'signed' - fail_msg = 'failed to sign' - - def __init__(self, parent): - self.parent = parent - self.cfg = parent.cfg - self.dir = getattr(parent, self.dir_name) - self.name = type(self).__name__ - - @property - def unsigned(self): - return self._unprocessed('_unsigned', self.rawext, self.sigext) - - def _unprocessed(self, attrname, rawext, sigext): - if not hasattr(self, attrname): - dirlist = sorted(self.dir.iterdir()) - names = {f.name for f in dirlist} - setattr( - self, - attrname, - tuple(f for f in dirlist - if f.name.endswith('.' + rawext) - and f.name[:-len(rawext)] + sigext not in names)) - return getattr(self, attrname) - - def print_bad_list(self, bad_files): - msg('\n{a}\n{b}'.format( - a = red(f'Failed {self.desc}s:'), - b = ' {}\n'.format('\n '.join( - self.gen_bad_list(sorted(bad_files, key=lambda f: f.name)))))) - - def gen_bad_list(self, bad_files): - for f in bad_files: - yield red(f.name) - - class transaction(base): - desc = 'non-automount transaction' - dir_name = 'tx_dir' - rawext = 'rawtx' - sigext = 'sigtx' - automount = False - - async def sign(self, f): - from ..tx import UnsignedTX - tx1 = UnsignedTX( - cfg = self.cfg, - filename = f, - automount = self.automount) - if tx1.proto.coin == 'XMR': - ctx = Signable.xmr_compat_transaction(self.parent) - for k in ('desc', 'print_summary', 'print_bad_list'): - setattr(self, k, getattr(ctx, k)) - return await ctx.sign(f, compat_call=True) - if tx1.proto.sign_mode == 'daemon': - from ..rpc import rpc_init - tx1.rpc = await rpc_init(self.cfg, tx1.proto, ignore_wallet=True) - from ..tx.keys import TxKeys - tx2 = await tx1.sign( - TxKeys( - self.cfg, - tx1, - seedfiles = self.parent.wallet_files[:], - keylist = self.parent.keylist, - passwdfile = str(self.parent.keyfile), - autosign = True).keys) - if tx2: - tx2.file.write(ask_write=False, outdir=self.dir) - return tx2 - else: - return False - - def print_summary(self, signables): - - if self.cfg.full_summary: - bmsg('\nAutosign summary:\n') - msg_r('\n'.join(tx.info.format(terse=True) for tx in signables)) - return - - def gen(): - for tx in signables: - non_mmgen = [o for o in tx.outputs if not o.mmid] - if non_mmgen: - yield (tx, non_mmgen) - - body = list(gen()) - - if body: - bmsg('\nAutosign summary:') - fs = '{} {} {}' - t_wid, a_wid = 6, 44 - - def gen(): - yield fs.format('TX ID ', 'Non-MMGen outputs'+' '*(a_wid-17), 'Amount') - yield fs.format('-'*t_wid, '-'*a_wid, '-'*7) - for tx, non_mmgen in body: - for nm in non_mmgen: - yield fs.format( - tx.txid.fmt(t_wid, color=True) if nm is non_mmgen[0] else ' '*t_wid, - nm.addr.fmt(nm.addr.view_pref, a_wid, color=True), - nm.amt.hl() + ' ' + yellow(tx.coin)) - - msg('\n' + '\n'.join(gen())) - else: - msg('\nNo non-MMGen outputs') - - class automount_transaction(transaction): - desc = 'automount transaction' - dir_name = 'txauto_dir' - rawext = 'arawtx' - sigext = 'asigtx' - subext = 'asubtx' - multiple_ok = False - automount = True - - @property - def unsubmitted(self): - return self._unprocessed('_unsubmitted', self.sigext, self.subext) - - @property - def unsubmitted_raw(self): - return self._unprocessed('_unsubmitted_raw', self.rawext, self.subext) - - unsent = unsubmitted - unsent_raw = unsubmitted_raw - - @property - def submitted(self): - return self._processed('_submitted', self.subext) - - def _processed(self, attrname, ext): - if not hasattr(self, attrname): - setattr(self, attrname, tuple(f for f in sorted(self.dir.iterdir()) - if f.name.endswith('.' + ext))) - return getattr(self, attrname) - - def die_wrong_num_txs(self, tx_type, *, msg=None, desc=None, show_dir=False): - match len(getattr(self, tx_type)): # num_txs - case 0: subj, suf, pred = ('No', 's', 'present') - case 1: subj, suf, pred = ('One', '', 'already present') - case _: subj, suf, pred = ('More than one', '', 'already present') - die('AutosignTXError', '{m}{a} {b} transaction{c} {d} {e}!'.format( - m = msg + '\n' if msg else '', - a = subj, - b = desc or tx_type, - c = suf, - d = pred, - e = f'in ‘{getattr(self.parent, self.dir_name)}’' - if show_dir else 'on removable device')) - - def check_create_ok(self): - if len(self.unsigned): - self.die_wrong_num_txs('unsigned', msg='Cannot create transaction') - if len(self.unsent): - die('AutosignTXError', 'Cannot create transaction: you have an unsent transaction') - - def get_unsubmitted(self, tx_type='unsubmitted'): - if len(self.unsubmitted) == 1: - return self.unsubmitted[0] - else: - self.die_wrong_num_txs(tx_type) - - def get_unsent(self): - return self.get_unsubmitted('unsent') - - def get_submitted(self): - if len(self.submitted) == 0: - self.die_wrong_num_txs('submitted') - else: - return self.submitted - - def get_abortable(self): - if len(self.unsent_raw) != 1: - self.die_wrong_num_txs('unsent_raw', desc='unsent') - if len(self.unsent) > 1: - self.die_wrong_num_txs('unsent') - if self.unsent: - if self.unsent[0].stem != self.unsent_raw[0].stem: - die(1, f'{self.unsent[0]}, {self.unsent_raw[0]}: file mismatch') - return self.unsent_raw + self.unsent - - def shred_abortable(self): - files = self.get_abortable() # raises AutosignTXError if no unsent TXs available - keypress_confirm( - self.cfg, - 'The following file{} will be securely deleted:\n{}\nOK?'.format( - suf(files), - fmt_list(map(str, files), fmt='col', indent=' ')), - do_exit = True) - for fn in files: - msg(f'Shredding file ‘{fn}’') - shred_file(self.cfg, fn, iterations=15) - sys.exit(0) - - async def get_last_sent(self, *, tx_range): - return await self.get_last_created( - # compat fallback - ‘sent_timestamp’ attr is missing in some old TX files: - sort_key = lambda x: x.sent_timestamp or x.timestamp, - tx_range = tx_range) - - async def get_last_created(self, *, tx_range, sort_key=lambda x: x.timestamp): - from ..tx import CompletedTX - fns = [f for f in self.dir.iterdir() if f.name.endswith(self.subext)] - files = sorted( - [await CompletedTX(cfg=self.cfg, filename=str(txfile), quiet_open=True) - for txfile in fns], - key = sort_key) - if files: - return files[len(files) - 1 - tx_range.last:len(files) - tx_range.first] - else: - die(1, 'No sent automount transactions!') - - class xmr_signable: # mixin class - automount = True - summary_footer = '' - - def need_daemon_restart(self, m, new_idx): - old_idx = self.parent.xmr_cur_wallet_idx - self.parent.xmr_cur_wallet_idx = new_idx - return old_idx != new_idx or m.wd.state != 'ready' - - def print_summary(self, signables): - bmsg('\nAutosign summary:') - msg('\n'.join(s.get_info(indent=' ') for s in signables) + self.summary_footer) - - class xmr_transaction(xmr_signable, automount_transaction): - desc = 'Monero non-compat transaction' - dir_name = 'xmr_tx_dir' - rawext = 'rawtx' - sigext = 'sigtx' - subext = 'subtx' - - async def sign(self, f, compat_call=False): - from .. import xmrwallet - from ..xmrwallet.file.tx import MoneroMMGenTX - tx1 = MoneroMMGenTX.Completed(self.parent.xmrwallet_cfg, f) - m = xmrwallet.op( - 'sign', - self.parent.xmrwallet_cfg, - infile = str(self.parent.wallet_files[0]), # MMGen wallet file - wallets = str(tx1.src_wallet_idx), - compat_call = compat_call) - tx2 = await m.main(f, restart_daemon=self.need_daemon_restart(m, tx1.src_wallet_idx)) - tx2.write(ask_write=False) - return tx2 - - class xmr_compat_transaction(xmr_transaction): - desc = 'Monero compat transaction' - dir_name = 'txauto_dir' - rawext = 'arawtx' - sigext = 'asigtx' - subext = 'asubtx' - - class xmr_wallet_outputs_file(xmr_signable, base): - desc = 'Monero wallet outputs file' - dir_name = 'xmr_outputs_dir' - rawext = 'raw' - sigext = 'sig' - clean_all = True - summary_footer = '\n' - - @property - def unsigned(self): - import json - return tuple( - f for f in super().unsigned - if not json.loads(f.read_text())['MoneroMMGenWalletOutputsFile']['data']['imported']) - - async def sign(self, f): - from .. import xmrwallet - wallet_idx = xmrwallet.op_cls('wallet').get_idx_from_fn(f) - m = xmrwallet.op( - 'import_outputs', - self.parent.xmrwallet_cfg, - infile = str(self.parent.wallet_files[0]), # MMGen wallet file - wallets = str(wallet_idx)) - obj = await m.main(f, wallet_idx, restart_daemon=self.need_daemon_restart(m, wallet_idx)) - obj.write(quiet=not obj.data.sign) - self.action_desc = 'imported and signed' if obj.data.sign else 'imported' - return obj - - class message(base): - desc = 'message file' - dir_name = 'msg_dir' - rawext = 'rawmsg.json' - sigext = 'sigmsg.json' - fail_msg = 'failed to sign or signed incompletely' - - async def sign(self, f): - from ..msg import UnsignedMsg, SignedMsg - m = UnsignedMsg(self.cfg, infile=f) - await m.sign(wallet_files=self.parent.wallet_files[:], passwd_file=str(self.parent.keyfile)) - m = SignedMsg(self.cfg, data=m.__dict__) - m.write_to_file( - outdir = self.dir.resolve(), - ask_overwrite = False) - if m.data.get('failed_sids'): - die( - 'MsgFileFailedSID', - f'Failed Seed IDs: {fmt_list(m.data["failed_sids"], fmt="bare")}') - return m - - def print_summary(self, signables): - gmsg('\nSigned message files:') - for message in signables: - gmsg(' ' + message.signed_filename) - - def gen_bad_list(self, bad_files): - for f in bad_files: - sigfile = f.parent / (f.name[:-len(self.rawext)] + self.sigext) - yield orange(sigfile.name) if sigfile.exists() else red(f.name) - class Autosign: dev_label = 'MMGEN_TX' @@ -490,6 +64,15 @@ class Autosign: 'xmr_tx_dir': 'xmr/tx', 'xmr_outputs_dir': 'xmr/outputs'} + non_xmr_signables = ( + 'transaction', + 'automount_transaction', + 'message') + + xmr_signables = ( # order is important! + 'xmr_wallet_outputs_file', # import XMR wallet outputs BEFORE signing transactions + 'xmr_transaction') + have_xmr = False xmr_only = False @@ -575,20 +158,23 @@ class Autosign: if not self.xmr_only: self.dirs |= self.non_xmr_dirs - self.signables += Signable.non_xmr_signables + self.signables += self.non_xmr_signables if self.have_xmr: self.dirs |= self.xmr_dirs | ( {'txauto_dir': 'txauto'} if cfg.xmrwallet_compat and self.xmr_only else {}) self.signables = ( - Signable.xmr_signables # xmr_wallet_outputs_file must be signed before XMR TXs + self.xmr_signables # xmr_wallet_outputs_file must be signed before XMR TXs + (('automount_transaction',) if cfg.xmrwallet_compat and self.xmr_only else ()) + self.signables) # self.signables could contain compat XMR TXs for name, path in self.dirs.items(): setattr(self, name, self.mountpoint / path) - self.swap = SwapMgr(self.cfg, ignore_zram=True) + @cached_property + def swap(self): + from .swap_mgr import SwapMgr + return SwapMgr(self.cfg, ignore_zram=True) async def check_daemons_running(self): from ..protocol import init_proto @@ -683,6 +269,7 @@ class Autosign: return not fails async def sign_all(self, target_name): + from .signable import Signable target = getattr(Signable, target_name)(self) if target.unsigned: good = [] @@ -892,6 +479,7 @@ class Autosign: if raw.is_file(): do_shred(raw) + from .signable import Signable s = getattr(Signable, s_name)(self) msg_r(f'Cleaning directory ‘{s.dir}’..') diff --git a/mmgen/autosign/signable.py b/mmgen/autosign/signable.py new file mode 100755 index 00000000..3c05bcc0 --- /dev/null +++ b/mmgen/autosign/signable.py @@ -0,0 +1,337 @@ +#!/usr/bin/env python3 +# +# MMGen Wallet, a terminal-based cryptocurrency wallet +# Copyright (C)2013-2026 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen-wallet +# https://gitlab.com/mmgen/mmgen-wallet + +""" +autosign.signable: Signable class for MMGen Wallet autosigning +""" + +import sys + +from ..util import msg, msg_r, gmsg, bmsg, die, suf, fmt_list +from ..color import yellow, red, orange +from ..fileutil import shred_file +from ..ui import keypress_confirm + +class Signable: + + class base: + + clean_all = False + multiple_ok = True + action_desc = 'signed' + fail_msg = 'failed to sign' + + def __init__(self, parent): + self.parent = parent + self.cfg = parent.cfg + self.dir = getattr(parent, self.dir_name) + self.name = type(self).__name__ + + @property + def unsigned(self): + return self._unprocessed('_unsigned', self.rawext, self.sigext) + + def _unprocessed(self, attrname, rawext, sigext): + if not hasattr(self, attrname): + dirlist = sorted(self.dir.iterdir()) + names = {f.name for f in dirlist} + setattr( + self, + attrname, + tuple(f for f in dirlist + if f.name.endswith('.' + rawext) + and f.name[:-len(rawext)] + sigext not in names)) + return getattr(self, attrname) + + def print_bad_list(self, bad_files): + msg('\n{a}\n{b}'.format( + a = red(f'Failed {self.desc}s:'), + b = ' {}\n'.format('\n '.join( + self.gen_bad_list(sorted(bad_files, key=lambda f: f.name)))))) + + def gen_bad_list(self, bad_files): + for f in bad_files: + yield red(f.name) + + class transaction(base): + desc = 'non-automount transaction' + dir_name = 'tx_dir' + rawext = 'rawtx' + sigext = 'sigtx' + automount = False + + async def sign(self, f): + from ..tx import UnsignedTX + tx1 = UnsignedTX( + cfg = self.cfg, + filename = f, + automount = self.automount) + if tx1.proto.coin == 'XMR': + ctx = Signable.xmr_compat_transaction(self.parent) + for k in ('desc', 'print_summary', 'print_bad_list'): + setattr(self, k, getattr(ctx, k)) + return await ctx.sign(f, compat_call=True) + if tx1.proto.sign_mode == 'daemon': + from ..rpc import rpc_init + tx1.rpc = await rpc_init(self.cfg, tx1.proto, ignore_wallet=True) + from ..tx.keys import TxKeys + tx2 = await tx1.sign( + TxKeys( + self.cfg, + tx1, + seedfiles = self.parent.wallet_files[:], + keylist = self.parent.keylist, + passwdfile = str(self.parent.keyfile), + autosign = True).keys) + if tx2: + tx2.file.write(ask_write=False, outdir=self.dir) + return tx2 + else: + return False + + def print_summary(self, signables): + + if self.cfg.full_summary: + bmsg('\nAutosign summary:\n') + msg_r('\n'.join(tx.info.format(terse=True) for tx in signables)) + return + + def gen(): + for tx in signables: + non_mmgen = [o for o in tx.outputs if not o.mmid] + if non_mmgen: + yield (tx, non_mmgen) + + body = list(gen()) + + if body: + bmsg('\nAutosign summary:') + fs = '{} {} {}' + t_wid, a_wid = 6, 44 + + def gen(): + yield fs.format('TX ID ', 'Non-MMGen outputs'+' '*(a_wid-17), 'Amount') + yield fs.format('-'*t_wid, '-'*a_wid, '-'*7) + for tx, non_mmgen in body: + for nm in non_mmgen: + yield fs.format( + tx.txid.fmt(t_wid, color=True) if nm is non_mmgen[0] else ' '*t_wid, + nm.addr.fmt(nm.addr.view_pref, a_wid, color=True), + nm.amt.hl() + ' ' + yellow(tx.coin)) + + msg('\n' + '\n'.join(gen())) + else: + msg('\nNo non-MMGen outputs') + + class automount_transaction(transaction): + desc = 'automount transaction' + dir_name = 'txauto_dir' + rawext = 'arawtx' + sigext = 'asigtx' + subext = 'asubtx' + multiple_ok = False + automount = True + + @property + def unsubmitted(self): + return self._unprocessed('_unsubmitted', self.sigext, self.subext) + + @property + def unsubmitted_raw(self): + return self._unprocessed('_unsubmitted_raw', self.rawext, self.subext) + + unsent = unsubmitted + unsent_raw = unsubmitted_raw + + @property + def submitted(self): + return self._processed('_submitted', self.subext) + + def _processed(self, attrname, ext): + if not hasattr(self, attrname): + setattr(self, attrname, tuple(f for f in sorted(self.dir.iterdir()) + if f.name.endswith('.' + ext))) + return getattr(self, attrname) + + def die_wrong_num_txs(self, tx_type, *, msg=None, desc=None, show_dir=False): + match len(getattr(self, tx_type)): # num_txs + case 0: subj, suf, pred = ('No', 's', 'present') + case 1: subj, suf, pred = ('One', '', 'already present') + case _: subj, suf, pred = ('More than one', '', 'already present') + die('AutosignTXError', '{m}{a} {b} transaction{c} {d} {e}!'.format( + m = msg + '\n' if msg else '', + a = subj, + b = desc or tx_type, + c = suf, + d = pred, + e = f'in ‘{getattr(self.parent, self.dir_name)}’' + if show_dir else 'on removable device')) + + def check_create_ok(self): + if len(self.unsigned): + self.die_wrong_num_txs('unsigned', msg='Cannot create transaction') + if len(self.unsent): + die('AutosignTXError', 'Cannot create transaction: you have an unsent transaction') + + def get_unsubmitted(self, tx_type='unsubmitted'): + if len(self.unsubmitted) == 1: + return self.unsubmitted[0] + else: + self.die_wrong_num_txs(tx_type) + + def get_unsent(self): + return self.get_unsubmitted('unsent') + + def get_submitted(self): + if len(self.submitted) == 0: + self.die_wrong_num_txs('submitted') + else: + return self.submitted + + def get_abortable(self): + if len(self.unsent_raw) != 1: + self.die_wrong_num_txs('unsent_raw', desc='unsent') + if len(self.unsent) > 1: + self.die_wrong_num_txs('unsent') + if self.unsent: + if self.unsent[0].stem != self.unsent_raw[0].stem: + die(1, f'{self.unsent[0]}, {self.unsent_raw[0]}: file mismatch') + return self.unsent_raw + self.unsent + + def shred_abortable(self): + files = self.get_abortable() # raises AutosignTXError if no unsent TXs available + keypress_confirm( + self.cfg, + 'The following file{} will be securely deleted:\n{}\nOK?'.format( + suf(files), + fmt_list(map(str, files), fmt='col', indent=' ')), + do_exit = True) + for fn in files: + msg(f'Shredding file ‘{fn}’') + shred_file(self.cfg, fn, iterations=15) + sys.exit(0) + + async def get_last_sent(self, *, tx_range): + return await self.get_last_created( + # compat fallback - ‘sent_timestamp’ attr is missing in some old TX files: + sort_key = lambda x: x.sent_timestamp or x.timestamp, + tx_range = tx_range) + + async def get_last_created(self, *, tx_range, sort_key=lambda x: x.timestamp): + from ..tx import CompletedTX + fns = [f for f in self.dir.iterdir() if f.name.endswith(self.subext)] + files = sorted( + [await CompletedTX(cfg=self.cfg, filename=str(txfile), quiet_open=True) + for txfile in fns], + key = sort_key) + if files: + return files[len(files) - 1 - tx_range.last:len(files) - tx_range.first] + else: + die(1, 'No sent automount transactions!') + + class xmr_signable: # mixin class + automount = True + summary_footer = '' + + def need_daemon_restart(self, m, new_idx): + old_idx = self.parent.xmr_cur_wallet_idx + self.parent.xmr_cur_wallet_idx = new_idx + return old_idx != new_idx or m.wd.state != 'ready' + + def print_summary(self, signables): + bmsg('\nAutosign summary:') + msg('\n'.join(s.get_info(indent=' ') for s in signables) + self.summary_footer) + + class xmr_transaction(xmr_signable, automount_transaction): + desc = 'Monero non-compat transaction' + dir_name = 'xmr_tx_dir' + rawext = 'rawtx' + sigext = 'sigtx' + subext = 'subtx' + + async def sign(self, f, compat_call=False): + from .. import xmrwallet + from ..xmrwallet.file.tx import MoneroMMGenTX + tx1 = MoneroMMGenTX.Completed(self.parent.xmrwallet_cfg, f) + m = xmrwallet.op( + 'sign', + self.parent.xmrwallet_cfg, + infile = str(self.parent.wallet_files[0]), # MMGen wallet file + wallets = str(tx1.src_wallet_idx), + compat_call = compat_call) + tx2 = await m.main(f, restart_daemon=self.need_daemon_restart(m, tx1.src_wallet_idx)) + tx2.write(ask_write=False) + return tx2 + + class xmr_compat_transaction(xmr_transaction): + desc = 'Monero compat transaction' + dir_name = 'txauto_dir' + rawext = 'arawtx' + sigext = 'asigtx' + subext = 'asubtx' + + class xmr_wallet_outputs_file(xmr_signable, base): + desc = 'Monero wallet outputs file' + dir_name = 'xmr_outputs_dir' + rawext = 'raw' + sigext = 'sig' + clean_all = True + summary_footer = '\n' + + @property + def unsigned(self): + import json + return tuple( + f for f in super().unsigned + if not json.loads(f.read_text())['MoneroMMGenWalletOutputsFile']['data']['imported']) + + async def sign(self, f): + from .. import xmrwallet + wallet_idx = xmrwallet.op_cls('wallet').get_idx_from_fn(f) + m = xmrwallet.op( + 'import_outputs', + self.parent.xmrwallet_cfg, + infile = str(self.parent.wallet_files[0]), # MMGen wallet file + wallets = str(wallet_idx)) + obj = await m.main(f, wallet_idx, restart_daemon=self.need_daemon_restart(m, wallet_idx)) + obj.write(quiet=not obj.data.sign) + self.action_desc = 'imported and signed' if obj.data.sign else 'imported' + return obj + + class message(base): + desc = 'message file' + dir_name = 'msg_dir' + rawext = 'rawmsg.json' + sigext = 'sigmsg.json' + fail_msg = 'failed to sign or signed incompletely' + + async def sign(self, f): + from ..msg import UnsignedMsg, SignedMsg + m = UnsignedMsg(self.cfg, infile=f) + await m.sign(wallet_files=self.parent.wallet_files[:], passwd_file=str(self.parent.keyfile)) + m = SignedMsg(self.cfg, data=m.__dict__) + m.write_to_file( + outdir = self.dir.resolve(), + ask_overwrite = False) + if m.data.get('failed_sids'): + die( + 'MsgFileFailedSID', + f'Failed Seed IDs: {fmt_list(m.data["failed_sids"], fmt="bare")}') + return m + + def print_summary(self, signables): + gmsg('\nSigned message files:') + for message in signables: + gmsg(' ' + message.signed_filename) + + def gen_bad_list(self, bad_files): + for f in bad_files: + sigfile = f.parent / (f.name[:-len(self.rawext)] + self.sigext) + yield orange(sigfile.name) if sigfile.exists() else red(f.name) diff --git a/mmgen/autosign/swap_mgr.py b/mmgen/autosign/swap_mgr.py new file mode 100755 index 00000000..cfc0fde1 --- /dev/null +++ b/mmgen/autosign/swap_mgr.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +# +# MMGen Wallet, a terminal-based cryptocurrency wallet +# Copyright (C)2013-2026 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen-wallet +# https://gitlab.com/mmgen/mmgen-wallet + +""" +autosign.swap_mgr: swap management for MMGen Wallet autosigning +""" + +import sys +from subprocess import run, PIPE, DEVNULL + +from ..util import msg, ymsg, suf, fmt_list, have_sudo, capfirst +from ..color import orange, blue + +def SwapMgr(*args, **kwargs): + match sys.platform: + case 'linux': + return SwapMgrLinux(*args, **kwargs) + case 'darwin': + return SwapMgrMacOS(*args, **kwargs) + +class SwapMgrBase: + + def __init__(self, cfg, *, ignore_zram=False): + self.cfg = cfg + self.ignore_zram = ignore_zram + self.desc = 'disk swap' if ignore_zram else 'swap' + + def enable(self, *, quiet=False): + ret = self.do_enable() + if not quiet: + self.cfg._util.qmsg( + f'{capfirst(self.desc)} successfully enabled' if ret else + f'{capfirst(self.desc)} is already enabled' if ret is None else + f'Could not enable {self.desc}') + return ret + + def disable(self, *, quiet=False): + self.cfg._util.qmsg_r(f'Attempting to disable {self.desc}...') + ret = self.do_disable() + self.cfg._util.qmsg('success') + if not quiet: + self.cfg._util.qmsg( + f'{capfirst(self.desc)} successfully disabled ({fmt_list(ret, fmt="no_quotes")})' + if ret and isinstance(ret, list) else + f'{capfirst(self.desc)} successfully disabled' if ret else + f'No active {self.desc}') + return ret + + def process_cmds(self, op, cmds): + if not cmds: + return + if have_sudo(silent=True) and not self.cfg.test_suite: + for cmd in cmds: + run(cmd.split(), check=True) + else: + pre = 'failure\n' if op == 'disable' else '' + fs = blue('{a} {b} manually by executing the following command{c}:\n{d}') + post = orange('[To prevent this message in the future, enable sudo without a password]') + m = pre + fs.format( + a = 'Please disable' if op == 'disable' else 'Enable', + b = self.desc, + c = suf(cmds), + d = fmt_list(cmds, indent=' ', fmt='col')) + '\n' + post + msg(m) + if not self.cfg.test_suite: + sys.exit(1) + +class SwapMgrLinux(SwapMgrBase): + + def get_active(self): + for cmd in ('/sbin/swapon', 'swapon'): + try: + cp = run([cmd, '--show=NAME', '--noheadings'], stdout=PIPE, text=True, check=True) + break + except Exception: + if cmd == 'swapon': + raise + res = cp.stdout.splitlines() + return [e for e in res if not e.startswith('/dev/zram')] if self.ignore_zram else res + + def do_enable(self): + if ret := self.get_active(): + ymsg(f'Warning: {self.desc} is already enabled: ({fmt_list(ret, fmt="no_quotes")})') + self.process_cmds('enable', ['sudo swapon --all']) + return True + + def do_disable(self): + swapdevs = self.get_active() + if not swapdevs: + return None + self.process_cmds('disable', [f'sudo swapoff {swapdev}' for swapdev in swapdevs]) + return swapdevs + +class SwapMgrMacOS(SwapMgrBase): + + def get_active(self): + cmd = 'launchctl print system/com.apple.dynamic_pager' + return run(cmd.split(), stdout=DEVNULL, stderr=DEVNULL).returncode == 0 + + def _do_action(self, active, op, cmd): + if self.get_active() is active: + return None + else: + cmd = f'sudo launchctl {cmd} -w /System/Library/LaunchDaemons/com.apple.dynamic_pager.plist' + self.process_cmds(op, [cmd]) + return True + + def do_enable(self): + return self._do_action(active=True, op='enable', cmd='load') + + def do_disable(self): + return self._do_action(active=False, op='disable', cmd='unload') diff --git a/mmgen/main_txbump.py b/mmgen/main_txbump.py index 2b0ce7d6..3bdaced9 100755 --- a/mmgen/main_txbump.py +++ b/mmgen/main_txbump.py @@ -168,7 +168,7 @@ async def main(): if cfg.autosign: from .tx.util import mount_removable_device - from .autosign import Signable + from .autosign.signable import Signable asi = mount_removable_device(cfg) si = Signable.automount_transaction(asi) if si.unsigned or si.unsent: diff --git a/mmgen/main_txcreate.py b/mmgen/main_txcreate.py index aa00f968..21e4e167 100755 --- a/mmgen/main_txcreate.py +++ b/mmgen/main_txcreate.py @@ -138,7 +138,7 @@ async def main(): if cfg.autosign: from .tx.util import mount_removable_device - from .autosign import Signable + from .autosign.signable import Signable asi = mount_removable_device(cfg, add_cfg={'xmrwallet_compat': True}) Signable.automount_transaction(asi).check_create_ok() diff --git a/mmgen/main_txsend.py b/mmgen/main_txsend.py index 791b742b..2df93583 100755 --- a/mmgen/main_txsend.py +++ b/mmgen/main_txsend.py @@ -109,7 +109,7 @@ def init_autosign(arg): global asi, si, infile, tx_range from .tx.util import mount_removable_device from .tx.online import SentTXRange - from .autosign import Signable + from .autosign.signable import Signable asi = mount_removable_device(cfg) si = Signable.automount_transaction(asi) if cfg.abort: diff --git a/mmgen/xmrwallet/ops/submit.py b/mmgen/xmrwallet/ops/submit.py index e1d0c657..6bfe9501 100755 --- a/mmgen/xmrwallet/ops/submit.py +++ b/mmgen/xmrwallet/ops/submit.py @@ -42,7 +42,7 @@ class OpSubmit(OpWallet): if self.uargs.infile: fn = Path(self.uargs.infile) else: - from ...autosign import Signable + from ...autosign.signable import Signable fn = Signable.xmr_transaction(self.asi).get_unsubmitted() return self.get_tx_cls('ColdSigned')(cfg=self.cfg, fn=fn) @@ -115,7 +115,7 @@ class OpResubmit(OpSubmit): die(1, '--autosign is required for this operation') def get_tx(self): - from ...autosign import Signable + from ...autosign.signable import Signable fns = Signable.xmr_transaction(self.asi).get_submitted() cls = self.get_tx_cls('Submitted') return sorted((cls(self.cfg, Path(fn)) for fn in fns), @@ -127,5 +127,5 @@ class OpAbort(OpBase): def __init__(self, cfg, uarg_tuple): super().__init__(cfg, uarg_tuple) self.mount_removable_device() - from ...autosign import Signable + from ...autosign.signable import Signable Signable.xmr_transaction(self.asi).shred_abortable() # prompts user, then raises exception or exits diff --git a/test/cmdtest_d/autosign.py b/test/cmdtest_d/autosign.py index 8519c5ac..df3354b5 100755 --- a/test/cmdtest_d/autosign.py +++ b/test/cmdtest_d/autosign.py @@ -28,7 +28,8 @@ from mmgen.cfg import Config from mmgen.color import red, blue, yellow, cyan, orange, purple, gray from mmgen.util import msg, suf, die, indent, fmt from mmgen.led import LEDControl -from mmgen.autosign import Autosign, Signable +from mmgen.autosign import Autosign +from mmgen.autosign.signable import Signable from ..include.common import ( omsg, @@ -998,11 +999,11 @@ class CmdTestAutosign(CmdTestAutosignBase): res = t.read() self.remove_device() for signable_list in present: - for signable_clsname in getattr(Signable, signable_list): + for signable_clsname in getattr(Autosign, signable_list): desc = getattr(Signable, signable_clsname).desc assert f'No unsigned {desc}s' in res, f'‘No unsigned {desc}s’ missing in output' for signable_list in absent: - for signable_clsname in getattr(Signable, signable_list): + for signable_clsname in getattr(Autosign, signable_list): desc = getattr(Signable, signable_clsname).desc assert not f'No unsigned {desc}s' in res, f'‘No unsigned {desc}s’ should be absent in output' return t