123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- #!/usr/bin/env python3
- #
- # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
- # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
- # Licensed under the GNU General Public License, Version 3:
- # https://www.gnu.org/licenses
- # Public project repositories:
- # https://github.com/mmgen/mmgen
- # https://gitlab.com/mmgen/mmgen
- """
- autosign: Auto-sign MMGen transactions, message files and XMR wallet output files
- """
- import sys,os,asyncio
- from pathlib import Path
- from subprocess import run,PIPE,DEVNULL
- from collections import namedtuple
- from .cfg import Config
- from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt,fmt_list,async_run
- from .color import yellow,red,orange
- from .wallet import Wallet,get_wallet_cls
- from .filename import find_file_in_dir
- from .ui import keypress_confirm
- class AutosignConfig(Config):
- _set_ok = ('usr_randchars','_proto','outdir','passwd_file')
- class Signable:
- signables = ('transaction','message','xmr_transaction','xmr_wallet_outputs_file')
- class base:
- clean_all = False
- multiple_ok = True
- def __init__(self,parent):
- self.parent = parent
- self.cfg = parent.cfg
- self.dir = getattr(parent,self.dir_name)
- @property
- def unsigned(self):
- return self._unprocessed( '_unsigned', self.rawext, self.sigext )
- @property
- def unsubmitted(self):
- return self._unprocessed( '_unsubmitted', self.sigext, self.subext )
- def _unprocessed(self,attrname,rawext,sigext):
- if not hasattr(self,attrname):
- dirlist = tuple(self.dir.iterdir())
- names = tuple(f.name for f in dirlist)
- setattr(
- self,
- attrname,
- tuple(f for f in dirlist
- if f.name.endswith('.' + rawext)
- and f.name[:-len(rawext)] + sigext not in names) )
- return getattr(self,attrname)
- def print_bad_list(self,bad_files):
- msg('\n{a}\n{b}'.format(
- a = red(f'Failed {self.desc}s:'),
- b = ' {}\n'.format('\n '.join(self.gen_bad_list(sorted(bad_files,key=lambda f: f.name))))
- ))
- class transaction(base):
- desc = 'transaction'
- rawext = 'rawtx'
- sigext = 'sigtx'
- dir_name = 'tx_dir'
- fail_msg = 'failed to sign'
- async def sign(self,f):
- from .tx import UnsignedTX
- tx1 = UnsignedTX( cfg=self.cfg, filename=f )
- if tx1.proto.sign_mode == 'daemon':
- from .rpc import rpc_init
- tx1.rpc = await rpc_init( self.cfg, tx1.proto )
- from .tx.sign import txsign
- tx2 = await txsign( self.cfg, tx1, self.parent.wallet_files[:], None, None )
- if tx2:
- tx2.file.write(ask_write=False)
- return tx2
- else:
- return False
- def print_summary(self,txs):
- if self.cfg.full_summary:
- bmsg('\nAutosign summary:\n')
- msg_r('\n'.join(tx.info.format(terse=True) for tx in txs))
- return
- def gen():
- for tx in txs:
- non_mmgen = [o for o in tx.outputs if not o.mmid]
- if non_mmgen:
- yield (tx,non_mmgen)
- body = list(gen())
- if body:
- bmsg('\nAutosign summary:')
- fs = '{} {} {}'
- t_wid,a_wid = 6,44
- def gen():
- yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
- yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
- for tx,non_mmgen in body:
- for nm in non_mmgen:
- yield fs.format(
- tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid,
- nm.addr.fmt( width=a_wid, color=True ),
- nm.amt.hl() + ' ' + yellow(tx.coin))
- msg('\n' + '\n'.join(gen()))
- else:
- msg('\nNo non-MMGen outputs')
- def gen_bad_list(self,bad_files):
- for f in bad_files:
- yield red(f.name)
- class xmr_transaction(transaction):
- dir_name = 'xmr_tx_dir'
- desc = 'Monero transaction'
- subext = 'subtx'
- multiple_ok = False
- async def sign(self,f):
- from .xmrwallet import MoneroMMGenTX,MoneroWalletOps,xmrwallet_uargs
- tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f )
- m = MoneroWalletOps.sign(
- self.parent.xmrwallet_cfg,
- xmrwallet_uargs(
- infile = str(self.parent.wallet_files[0]), # MMGen wallet file
- wallets = str(tx1.src_wallet_idx),
- spec = None ),
- )
- tx2 = await m.main(f) # TODO: stop wallet daemon?
- tx2.write(ask_write=False)
- return tx2
- def print_summary(self,txs):
- bmsg('\nAutosign summary:\n')
- msg_r('\n'.join(tx.get_info() for tx in txs))
- class xmr_wallet_outputs_file(transaction):
- desc = 'Monero wallet outputs file'
- rawext = 'raw'
- sigext = 'sig'
- dir_name = 'xmr_outputs_dir'
- clean_all = True
- async def sign(self,f):
- from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
- wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f)
- m = MoneroWalletOps.export_key_images(
- self.parent.xmrwallet_cfg,
- xmrwallet_uargs(
- infile = str(self.parent.wallet_files[0]), # MMGen wallet file
- wallets = str(wallet_idx),
- spec = None ),
- )
- obj = await m.main( f, wallet_idx )
- obj.write()
- return obj
- def print_summary(self,txs):
- bmsg('\nAutosign summary:')
- msg(' ' + '\n '.join(tx.get_info() for tx in txs) + '\n')
- class message(base):
- desc = 'message file'
- rawext = 'rawmsg.json'
- sigext = 'sigmsg.json'
- dir_name = 'msg_dir'
- fail_msg = 'failed to sign or signed incompletely'
- async def sign(self,f):
- from .msg import UnsignedMsg,SignedMsg
- m = UnsignedMsg( self.cfg, infile=f )
- await m.sign( wallet_files=self.parent.wallet_files[:] )
- m = SignedMsg( self.cfg, data=m.__dict__ )
- m.write_to_file(
- outdir = self.dir.resolve(),
- ask_overwrite = False )
- if m.data.get('failed_sids'):
- die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
- return m
- def print_summary(self,messages):
- gmsg('\nSigned message files:')
- for m in messages:
- gmsg(' ' + m.signed_filename)
- def gen_bad_list(self,bad_files):
- for f in bad_files:
- sigfile = f.parent / ( f.name[:-len(self.rawext)] + self.sigext )
- yield orange(sigfile.name) if sigfile.exists() else red(f.name)
- class Autosign:
- dfl_mountpoint = '/mnt/mmgen_autosign'
- dfl_wallet_dir = '/dev/shm/autosign'
- old_dfl_mountpoint = '/mnt/tx'
- dev_disk_path = Path('/dev/disk/by-label/MMGEN_TX')
- old_dfl_mountpoint_errmsg = f"""
- Mountpoint '{old_dfl_mountpoint}' is no longer supported!
- Please rename '{old_dfl_mountpoint}' to '{dfl_mountpoint}'
- and update your fstab accordingly.
- """
- mountpoint_errmsg_fs = """
- Mountpoint '{}' does not exist or does not point
- to a directory! Please create the mountpoint and add an entry
- to your fstab as described in this script’s help text.
- """
- mn_fmts = {
- 'mmgen': 'words',
- 'bip39': 'bip39',
- }
- dfl_mn_fmt = 'mmgen'
- have_msg_dir = False
- def __init__(self,cfg):
- self.cfg = cfg
- if cfg.mnemonic_fmt:
- if cfg.mnemonic_fmt not in self.mn_fmts:
- die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format(
- cfg.mnemonic_fmt,
- fmt_list( self.mn_fmts, fmt='no_spc' ) ))
- self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint)
- self.wallet_dir = Path(cfg.wallet_dir or self.dfl_wallet_dir)
- self.tx_dir = self.mountpoint / 'tx'
- self.msg_dir = self.mountpoint / 'msg'
- self.keyfile = self.mountpoint / 'autosign.key'
- cfg.outdir = str(self.tx_dir)
- cfg.passwd_file = str(self.keyfile)
- if any(k in cfg._uopts for k in ('help','longhelp')):
- return
- if 'coin' in cfg._uopts:
- die(1,'--coin option not supported with this command. Use --coins instead')
- self.coins = cfg.coins.upper().split(',') if cfg.coins else []
- if cfg._args and cfg._args[0] == 'clean':
- return
- if cfg.xmrwallets and not 'XMR' in self.coins:
- self.coins.append('XMR')
- if not self.coins:
- ymsg('Warning: no coins specified, defaulting to BTC')
- self.coins = ['BTC']
- if 'XMR' in self.coins:
- self.xmr_dir = self.mountpoint / 'xmr'
- self.xmr_tx_dir = self.mountpoint / 'xmr' / 'tx'
- self.xmr_outputs_dir = self.mountpoint / 'xmr' / 'outputs'
- async def check_daemons_running(self):
- from .protocol import init_proto
- for coin in self.coins:
- proto = init_proto( self.cfg, coin, testnet=self.cfg.network=='testnet', need_amt=True )
- if proto.sign_mode == 'daemon':
- self.cfg._util.vmsg(f'Checking {coin} daemon')
- from .rpc import rpc_init
- from .exception import SocketError
- try:
- await rpc_init( self.cfg, proto )
- except SocketError as e:
- from .daemon import CoinDaemon
- d = CoinDaemon( self.cfg, proto=proto, test_suite=self.cfg.test_suite )
- die(2,
- f'\n{e}\nIs the {d.coind_name} daemon ({d.exec_fn}) running '
- + 'and listening on the correct port?' )
- @property
- def wallet_files(self):
- if not hasattr(self,'_wallet_files'):
- try:
- dirlist = self.wallet_dir.iterdir()
- except:
- die(1,f"Cannot open wallet directory '{self.wallet_dir}'. Did you run ‘mmgen-autosign setup’?")
- self._wallet_files = [f for f in dirlist if f.suffix == '.mmdat']
- if not self._wallet_files:
- die(1,'No wallet files present!')
- return self._wallet_files
- def do_mount(self,no_xmr_chk=False):
- from stat import S_ISDIR,S_IWUSR,S_IRUSR
- def check_dir(cdir):
- try:
- ds = cdir.stat()
- assert S_ISDIR(ds.st_mode), f"'{cdir}' is not a directory!"
- assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f"'{cdir}' is not read/write for this user!"
- except:
- die(1,f"'{cdir}' missing or not read/writable by user!")
- if not self.mountpoint.is_dir():
- def do_die(m):
- die(1,'\n' + yellow(fmt(m.strip(),indent=' ')))
- if Path(self.old_dfl_mountpoint).is_dir():
- do_die(self.old_dfl_mountpoint_errmsg)
- else:
- do_die(self.mountpoint_errmsg_fs.format(self.mountpoint))
- if not self.mountpoint.is_mount():
- if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0:
- msg(f"Mounting '{self.mountpoint}'")
- elif not self.cfg.test_suite:
- die(1,f"Unable to mount device at '{self.mountpoint}'")
- self.have_msg_dir = self.msg_dir.is_dir()
- check_dir(self.tx_dir)
- if self.have_msg_dir:
- check_dir(self.msg_dir)
- if 'XMR' in self.coins and not no_xmr_chk:
- check_dir(self.xmr_tx_dir)
- def do_umount(self):
- if self.mountpoint.is_mount():
- run( ['sync'], check=True )
- msg(f"Unmounting '{self.mountpoint}'")
- run( ['umount',self.mountpoint], check=True )
- bmsg('It is now safe to extract the removable device')
- def decrypt_wallets(self):
- msg(f"Unlocking wallet{suf(self.wallet_files)} with key from '{self.cfg.passwd_file}'")
- fails = 0
- for wf in self.wallet_files:
- try:
- Wallet( self.cfg, wf, ignore_in_fmt=True )
- except SystemExit as e:
- if e.code != 0:
- fails += 1
- return False if fails else True
- async def sign_all(self,target_name):
- target = getattr(Signable,target_name)(self)
- if target.unsigned:
- if len(target.unsigned) > 1 and not target.multiple_ok:
- die(f'AutosignTXError', 'Only one unsigned {target.desc} transaction allowed at a time!')
- good = []
- bad = []
- for f in target.unsigned:
- ret = None
- try:
- ret = await target.sign(f)
- except Exception as e:
- ymsg(f"An error occurred with {target.desc} '{f.name}':\n {type(e).__name__}: {e!s}")
- except:
- ymsg(f"An error occurred with {target.desc} '{f.name}'")
- good.append(ret) if ret else bad.append(f)
- self.cfg._util.qmsg('')
- await asyncio.sleep(0.3)
- msg(f'{len(good)} {target.desc}{suf(good)} signed')
- if bad:
- rmsg(f'{len(bad)} {target.desc}{suf(bad)} {target.fail_msg}')
- if good and not self.cfg.no_summary:
- target.print_summary(good)
- if bad:
- target.print_bad_list(bad)
- return not bad
- else:
- msg(f'No unsigned {target.desc}s')
- await asyncio.sleep(0.5)
- return True
- async def do_sign(self):
- if not self.cfg.stealth_led:
- self.led.set('busy')
- self.do_mount()
- key_ok = self.decrypt_wallets()
- if key_ok:
- if self.cfg.stealth_led:
- self.led.set('busy')
- ret1 = await self.sign_all('transaction')
- ret2 = await self.sign_all('message') if self.have_msg_dir else True
- # import XMR wallet outputs BEFORE signing transactions:
- ret3 = await self.sign_all('xmr_wallet_outputs_file') if 'XMR' in self.coins else True
- ret4 = await self.sign_all('xmr_transaction') if 'XMR' in self.coins else True
- ret = ret1 and ret2 and ret3 and ret4
- self.do_umount()
- self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)])
- return ret
- else:
- msg('Password is incorrect!')
- self.do_umount()
- if not self.cfg.stealth_led:
- self.led.set('error')
- return False
- def wipe_existing_key(self):
- try: self.keyfile.stat()
- except: pass
- else:
- from .fileutil import shred_file
- msg(f"\nShredding existing key '{self.keyfile}'")
- shred_file( self.keyfile, verbose=self.cfg.verbose )
- def create_key(self):
- desc = f"key file '{self.keyfile}'"
- msg('Creating ' + desc)
- try:
- self.keyfile.write_text( os.urandom(32).hex() )
- self.keyfile.chmod(0o400)
- except:
- die(2,'Unable to write ' + desc)
- msg('Wrote ' + desc)
- def gen_key(self,no_unmount=False):
- if not self.get_insert_status():
- die(1,'Removable device not present!')
- self.do_mount(no_xmr_chk=True)
- self.wipe_existing_key()
- self.create_key()
- if not no_unmount:
- self.do_umount()
- def setup(self):
- def remove_wallet_dir():
- msg(f"Deleting '{self.wallet_dir}'")
- import shutil
- try: shutil.rmtree(self.wallet_dir)
- except: pass
- def create_wallet_dir():
- try: self.wallet_dir.mkdir(parents=True)
- except: pass
- try: self.wallet_dir.stat()
- except: die(2,f"Unable to create wallet directory '{self.wallet_dir}'")
- remove_wallet_dir()
- create_wallet_dir()
- self.gen_key(no_unmount=True)
- wf = find_file_in_dir( get_wallet_cls('mmgen'), self.cfg.data_dir )
- if wf and keypress_confirm(
- cfg = self.cfg,
- prompt = f"Default wallet '{wf}' found.\nUse default wallet for autosigning?",
- default_yes = True ):
- from .cfg import Config
- ss_in = Wallet( Config(), wf )
- else:
- ss_in = Wallet( self.cfg, in_fmt=self.mn_fmts[self.cfg.mnemonic_fmt or self.dfl_mn_fmt] )
- ss_out = Wallet( self.cfg, ss=ss_in )
- ss_out.write_to_file( desc='autosign wallet', outdir=self.wallet_dir )
- @property
- def xmrwallet_cfg(self):
- if not hasattr(self,'_xmrwallet_cfg'):
- from .cfg import Config
- self._xmrwallet_cfg = Config({
- 'coin': 'xmr',
- 'wallet_rpc_user': 'autosigner',
- 'wallet_rpc_password': 'my very secret password',
- 'passwd_file': self.cfg.passwd_file,
- 'wallet_dir': str(self.wallet_dir),
- 'autosign': True,
- 'autosign_mountpoint': str(self.mountpoint),
- 'outdir': str(self.xmr_dir), # required by vkal.write()
- })
- return self._xmrwallet_cfg
- def xmr_setup(self):
- def create_signing_wallets():
- from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
- if len(self.wallet_files) > 1:
- ymsg(f'Warning: more than one wallet file, using the first ({self.wallet_files[0]}) for xmrwallet generation')
- m = MoneroWalletOps.create_offline(
- self.xmrwallet_cfg,
- xmrwallet_uargs(
- infile = str(self.wallet_files[0]), # MMGen wallet file
- wallets = self.cfg.xmrwallets, # XMR wallet idxs
- spec = None ),
- )
- async_run(m.main())
- async_run(m.stop_wallet_daemon())
- import shutil
- try: shutil.rmtree(self.xmr_outputs_dir)
- except: pass
- self.xmr_outputs_dir.mkdir(parents=True)
- self.xmr_tx_dir.mkdir(exist_ok=True)
- self.clean_old_files()
- create_signing_wallets()
- def clean_old_files(self):
- def do_shred(f):
- nonlocal count
- msg_r('.')
- shred_file( f, verbose=self.cfg.verbose )
- count += 1
- def clean_dir(s_name):
- def clean_files(rawext,sigext):
- for f in s.dir.iterdir():
- if s.clean_all and (f.name.endswith(f'.{rawext}') or f.name.endswith(f'.{sigext}')):
- do_shred(f)
- elif f.name.endswith(f'.{sigext}'):
- raw = f.parent / ( f.name[:-len(sigext)] + rawext )
- if raw.is_file():
- do_shred(raw)
- s = getattr(Signable,s_name)(asi)
- msg_r(f"Cleaning directory '{s.dir}'..")
- if s.dir.is_dir():
- clean_files( s.rawext, s.sigext )
- if hasattr(s,'subext'):
- clean_files( s.rawext, s.subext )
- clean_files( s.sigext, s.subext )
- msg('done' if s.dir.is_dir() else 'skipped (no dir)')
- asi = get_autosign_obj( self.cfg, 'btc,xmr' )
- count = 0
- from .fileutil import shred_file
- for s_name in Signable.signables:
- clean_dir(s_name)
- bmsg(f'{count} file{suf(count)} shredded')
- def get_insert_status(self):
- if self.cfg.no_insert_check:
- return True
- try: self.dev_disk_path.stat()
- except: return False
- else: return True
- async def do_loop(self):
- n,prev_status = 0,False
- if not self.cfg.stealth_led:
- self.led.set('standby')
- while True:
- status = self.get_insert_status()
- if status and not prev_status:
- msg('Device insertion detected')
- await self.do_sign()
- prev_status = status
- if not n % 10:
- msg_r(f"\r{' '*17}\rWaiting")
- sys.stderr.flush()
- await asyncio.sleep(1)
- msg_r('.')
- n += 1
- def at_exit(self,exit_val,message=None):
- if message:
- msg(message)
- self.led.stop()
- sys.exit(int(exit_val))
- def init_exit_handler(self):
- def handler(arg1,arg2):
- self.at_exit(1,'\nCleaning up...')
- import signal
- signal.signal( signal.SIGTERM, handler )
- signal.signal( signal.SIGINT, handler )
- def init_led(self):
- from .led import LEDControl
- self.led = LEDControl(
- enabled = self.cfg.led,
- simulate = os.getenv('MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE') )
- self.led.set('off')
- def get_autosign_obj(cfg,coins=None):
- return Autosign(
- AutosignConfig({
- 'mountpoint': cfg.autosign_mountpoint or cfg.mountpoint,
- 'test_suite': cfg.test_suite,
- 'coins': coins if isinstance(coins,str) else ','.join(coins) if coins else 'btc',
- })
- )
|