From 7b56d5bb2d749aa3c6d7363c5c58d2827cf53aa9 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Tue, 18 Apr 2023 18:35:55 +0000 Subject: [PATCH] mmgen-autosign: reimplement using new Autosign class --- mmgen/autosign.py | 369 ++++++++++++++++++++++++++++++++++++++ mmgen/main_autosign.py | 389 +++++------------------------------------ 2 files changed, 410 insertions(+), 348 deletions(-) create mode 100755 mmgen/autosign.py diff --git a/mmgen/autosign.py b/mmgen/autosign.py new file mode 100755 index 00000000..c9bcae18 --- /dev/null +++ b/mmgen/autosign.py @@ -0,0 +1,369 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet +# Copyright (C)2013-2023 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen +# https://gitlab.com/mmgen/mmgen + +""" +autosign: Auto-sign MMGen transactions and message files +""" + +import sys,os,asyncio +from subprocess import run,PIPE,DEVNULL +from collections import namedtuple + +from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt_list +from .color import yellow,red,orange +from .wallet import Wallet + +class Autosign: + + dfl_mountpoint = os.path.join(os.sep,'mnt','tx') + wallet_dir = os.path.join(os.sep,'dev','shm','autosign') + disk_label_dir = os.path.join(os.sep,'dev','disk','by-label') + part_label = 'MMGEN_TX' + + mn_fmts = { + 'mmgen': 'words', + 'bip39': 'bip39', + } + dfl_mn_fmt = 'mmgen' + + have_msg_dir = False + + def __init__(self,cfg): + + if cfg.mnemonic_fmt: + if cfg.mnemonic_fmt not in self.mn_fmts: + die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format( + cfg.mnemonic_fmt, + fmt_list( self.mn_fmts, fmt='no_spc' ) )) + + self.cfg = cfg + + self.mountpoint = cfg.mountpoint or self.dfl_mountpoint + + self.tx_dir = os.path.join( self.mountpoint, 'tx' ) + self.msg_dir = os.path.join( self.mountpoint, 'msg' ) + self.keyfile = os.path.join( self.mountpoint, 'autosign.key' ) + + cfg.outdir = self.tx_dir + cfg.passwd_file = self.keyfile + + async def check_daemons_running(self): + + if 'coin' in self.cfg._uopts: + die(1,'--coin option not supported with this command. Use --coins instead') + + if self.cfg.coins: + coins = self.cfg.coins.upper().split(',') + else: + ymsg('Warning: no coins specified, defaulting to BTC') + coins = ['BTC'] + + from .protocol import init_proto + for coin in coins: + proto = init_proto( self.cfg, coin, testnet=self.cfg.network=='testnet', need_amt=True ) + if proto.sign_mode == 'daemon': + self.cfg._util.vmsg(f'Checking {coin} daemon') + from .rpc import rpc_init + from .exception import SocketError + try: + await rpc_init( self.cfg, proto ) + except SocketError as e: + die(2,f'{coin} daemon not running or not listening on port {proto.rpc_port}') + @property + def wallet_files(self): + + if not hasattr(self,'_wallet_files'): + + try: + dirlist = os.listdir(self.wallet_dir) + except: + die(1,f'Cannot open wallet directory {self.wallet_dir!r}. Did you run ‘mmgen-autosign setup’?') + + fns = [fn for fn in dirlist if fn.endswith('.mmdat')] + if fns: + self._wallet_files = [os.path.join(self.wallet_dir,fn) for fn in fns] + else: + die(1,'No wallet files present!') + + return self._wallet_files + + def do_mount(self): + + if not os.path.ismount(self.mountpoint): + if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0: + msg(f'Mounting {self.mountpoint}') + + self.have_msg_dir = os.path.isdir(self.msg_dir) + + from stat import S_ISDIR,S_IWUSR,S_IRUSR + for cdir in [self.tx_dir] + ([self.msg_dir] if self.have_msg_dir else []): + try: + ds = os.stat(cdir) + assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!' + assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!' + except: + die(1,f'{cdir!r} missing or not read/writable by user!') + + def do_umount(self): + if os.path.ismount(self.mountpoint): + run( ['sync'], check=True ) + msg(f'Unmounting {self.mountpoint}') + run( ['umount',self.mountpoint], check=True ) + + async def sign_object(self,d,fn): + from .tx import UnsignedTX + from .tx.sign import txsign + from .rpc import rpc_init + try: + if d.desc == 'transaction': + tx1 = UnsignedTX( cfg=self.cfg, filename=fn ) + if tx1.proto.sign_mode == 'daemon': + tx1.rpc = await rpc_init( self.cfg, tx1.proto ) + tx2 = await txsign( self.cfg, tx1, self.wallet_files[:], None, None ) + if tx2: + tx2.file.write(ask_write=False) + return tx2 + else: + return False + elif d.desc == 'message file': + from .msg import UnsignedMsg,SignedMsg + m = UnsignedMsg( self.cfg, infile=fn ) + await m.sign( wallet_files=self.wallet_files[:] ) + m = SignedMsg( self.cfg, data=m.__dict__ ) + m.write_to_file( + outdir = os.path.abspath(self.msg_dir), + ask_overwrite = False ) + if m.data.get('failed_sids'): + die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}') + return m + except Exception as e: + ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}') + return False + except: + ymsg(f'An error occurred with {d.desc} {fn!r}') + return False + + async def sign(self,target): + + _td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc']) + + d = { + 'msg': _td('message file', 'rawmsg.json', 'sigmsg.json', self.msg_dir, 'sign or signed incompletely'), + 'tx': _td('transaction', 'rawtx', 'sigtx', self.tx_dir, 'sign'), + }[target] + + raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)] + signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)] + unsigned = [os.path.join( d.dir, fn+d.rawext ) for fn in raw if fn not in signed] + + if unsigned: + ok = [] + bad = [] + for fn in unsigned: + ret = await self.sign_object(d,fn) + if ret: + ok.append(ret) + else: + bad.append(fn) + self.cfg._util.qmsg('') + await asyncio.sleep(0.3) + msg(f'{len(ok)} {d.desc}{suf(ok)} signed') + if bad: + rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}') + if ok and not self.cfg.no_summary: + self.print_summary(d,ok) + if bad: + msg('') + rmsg(f'Failed {d.desc}s:') + def gen_bad_disp(): + if d.desc == 'transaction': + for fn in sorted(bad): + yield red(fn) + elif d.desc == 'message file': + for rawfn in sorted(bad): + sigfn = rawfn[:-len(d.rawext)] + d.sigext + yield orange(sigfn) if os.path.exists(sigfn) else red(rawfn) + msg(' {}\n'.format( '\n '.join(gen_bad_disp()) )) + return False if bad else True + else: + msg(f'No unsigned {d.desc}s') + await asyncio.sleep(0.5) + return True + + def decrypt_wallets(self): + msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}') + fails = 0 + for wf in self.wallet_files: + try: + Wallet( self.cfg, wf, ignore_in_fmt=True ) + except SystemExit as e: + if e.code != 0: + fails += 1 + + return False if fails else True + + def print_summary(self,d,signed_objects): + + if d.desc == 'message file': + gmsg('\nSigned message files:') + for m in signed_objects: + gmsg(' ' + os.path.join( self.msg_dir, m.signed_filename )) + return + + if self.cfg.full_summary: + bmsg('\nAutosign summary:\n') + def gen(): + for tx in signed_objects: + yield tx.info.format(terse=True) + msg_r('\n'.join(gen())) + return + + def gen(): + for tx in signed_objects: + non_mmgen = [o for o in tx.outputs if not o.mmid] + if non_mmgen: + yield (tx,non_mmgen) + + body = list(gen()) + + if body: + bmsg('\nAutosign summary:') + fs = '{} {} {}' + t_wid,a_wid = 6,44 + + def gen(): + yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount') + yield fs.format('-'*t_wid, '-'*a_wid, '-'*7) + for tx,non_mmgen in body: + for nm in non_mmgen: + yield fs.format( + tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid, + nm.addr.fmt( width=a_wid, color=True ), + nm.amt.hl() + ' ' + yellow(tx.coin)) + + msg('\n' + '\n'.join(gen())) + else: + msg('\nNo non-MMGen outputs') + + async def do_sign(self): + if not self.cfg.stealth_led: + self.led.set('busy') + self.do_mount() + key_ok = self.decrypt_wallets() + if key_ok: + if self.cfg.stealth_led: + self.led.set('busy') + ret1 = await self.sign('tx') + ret2 = await self.sign('msg') if self.have_msg_dir else True + ret = ret1 and ret2 + self.do_umount() + self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)]) + return ret + else: + msg('Password is incorrect!') + self.do_umount() + if not self.cfg.stealth_led: + self.led.set('error') + return False + + def wipe_existing_key(self): + try: os.stat(self.keyfile) + except: pass + else: + from .fileutil import shred_file + msg(f'\nShredding existing key {self.keyfile!r}') + shred_file( self.keyfile, verbose=self.cfg.verbose ) + + def create_key(self): + kdata = os.urandom(32).hex() + desc = f'key file {self.keyfile!r}' + msg('Creating ' + desc) + try: + with open(self.keyfile,'w') as fp: + fp.write(kdata+'\n') + os.chmod(self.keyfile,0o400) + msg('Wrote ' + desc) + except: + die(2,'Unable to write ' + desc) + + def gen_key(self,no_unmount=False): + self.create_wallet_dir() + if not self.get_insert_status(): + die(1,'Removable device not present!') + self.do_mount() + self.wipe_existing_key() + self.create_key() + if not no_unmount: + self.do_umount() + + def remove_wallet_dir(self): + msg(f'Deleting {self.wallet_dir!r}') + import shutil + try: shutil.rmtree(self.wallet_dir) + except: pass + + def create_wallet_dir(self): + try: os.mkdir(self.wallet_dir) + except: pass + try: os.stat(self.wallet_dir) + except: die(2,f'Unable to create wallet directory {self.wallet_dir!r}') + + def setup(self): + self.remove_wallet_dir() + self.gen_key(no_unmount=True) + ss_in = Wallet( self.cfg, in_fmt=self.mn_fmts[self.cfg.mnemonic_fmt or self.dfl_mn_fmt] ) + ss_out = Wallet( self.cfg, ss=ss_in ) + ss_out.write_to_file( desc='autosign wallet', outdir=self.wallet_dir ) + + def get_insert_status(self): + if self.cfg.no_insert_check: + return True + try: os.stat(os.path.join( self.disk_label_dir, self.part_label )) + except: return False + else: return True + + async def do_loop(self): + n,prev_status = 0,False + if not self.cfg.stealth_led: + self.led.set('standby') + while True: + status = self.get_insert_status() + if status and not prev_status: + msg('Device insertion detected') + await self.do_sign() + prev_status = status + if not n % 10: + msg_r(f"\r{' '*17}\rWaiting") + sys.stderr.flush() + await asyncio.sleep(1) + msg_r('.') + n += 1 + + def at_exit(self,exit_val,message=None): + if message: + msg(message) + self.led.stop() + sys.exit(int(exit_val)) + + def init_exit_handler(self): + + def handler(arg1,arg2): + self.at_exit(1,'\nCleaning up...') + + import signal + signal.signal( signal.SIGTERM, handler ) + signal.signal( signal.SIGINT, handler ) + + def init_led(self): + from .led import LEDControl + self.led = LEDControl( + enabled = self.cfg.led, + simulate = os.getenv('MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE') ) + self.led.set('off') diff --git a/mmgen/main_autosign.py b/mmgen/main_autosign.py index ace29476..fca6a88b 100755 --- a/mmgen/main_autosign.py +++ b/mmgen/main_autosign.py @@ -20,40 +20,27 @@ mmgen-autosign: Auto-sign MMGen transactions and message files """ -import sys,os,asyncio,signal,shutil -from subprocess import run,PIPE,DEVNULL -from collections import namedtuple -from stat import * +import sys from .cfg import Config -from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt_list,async_run,exit_if_mswin -from .color import yellow,red,orange +from .util import die,fmt_list,exit_if_mswin,async_run -mountpoint = '/mnt/tx' -tx_dir = '/mnt/tx/tx' -msg_dir = '/mnt/tx/msg' -part_label = 'MMGEN_TX' -wallet_dir = '/dev/shm/autosign' -mn_fmts = { - 'mmgen': 'words', - 'bip39': 'bip39', -} -mn_fmt_dfl = 'mmgen' +exit_if_mswin('autosigning') opts_data = { 'sets': [('stealth_led', True, 'led', True)], 'text': { 'desc': 'Auto-sign MMGen transactions and message files', 'usage':'[opts] [command]', - 'options': f""" + 'options': """ -h, --help Print this help message --, --longhelp Print help message for long options (common options) -c, --coins=c Coins to sign for (comma-separated list) -I, --no-insert-check Don’t check for device insertion -l, --led Use status LED to signal standby, busy and error --m, --mountpoint=M Specify an alternate mountpoint 'M' (default: {mountpoint!r}) +-m, --mountpoint=M Specify an alternate mountpoint 'M' (default: {asi.dfl_mountpoint!r}) -M, --mnemonic-fmt=F During setup, prompt for mnemonic seed phrase of format - 'F' (choices: {fmt_list(mn_fmts,fmt='no_spc')}; default: {mn_fmt_dfl!r}) + 'F' (choices: {mn_fmts}; default: {asi.dfl_mn_fmt!r}) -n, --no-summary Don’t print a transaction summary -s, --stealth-led Stealth LED mode - signal busy and error only, and only after successful authorization. @@ -63,11 +50,12 @@ opts_data = { -q, --quiet Produce quieter output -v, --verbose Produce more verbose output """, - 'notes': f""" + 'notes': """ COMMANDS -gen_key - generate the wallet encryption key and copy it to {mountpoint!r} +gen_key - generate the wallet encryption key and copy it to the mountpoint + (currently configured as {asi.mountpoint!r}) setup - generate the wallet encryption key and wallet wait - start in loop mode: wait-mount-sign-unmount-wait @@ -91,14 +79,15 @@ writable root directory and a directory named '/tx', where unsigned MMGen transactions are placed. Optionally, the directory '/msg' may also be created and unsigned message files created by `mmgen-msg` placed in this directory. -On the signing machine the mount point {mountpoint!r} must exist and /etc/fstab -must contain the following entry: +On the signing machine the mount point (currently configured as {asi.mountpoint!r}) +must exist and /etc/fstab must contain the following entry: LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0 -Transactions are signed with a wallet on the signing machine (in the directory -{wallet_dir!r}) encrypted with a 64-character hexadecimal password saved -in the file `autosign.key` in the root of the removable device partition. +Transactions are signed with a wallet on the signing machine located in the wallet +directory (currently configured as {asi.wallet_dir!r}) encrypted with a 64-character +hexadecimal password saved in the file `autosign.key` in the root of the removable +device partition. The password and wallet can be created in one operation by invoking the command with 'setup' with the removable device inserted. In this case, the @@ -108,7 +97,7 @@ Alternatively, the password and wallet can be created separately by first invoking the command with 'gen_key' and then creating and encrypting the wallet using the -P (--passwd-file) option: - $ mmgen-walletconv -r0 -q -iwords -d{wallet_dir} -p1 -P/mnt/tx/autosign.key -Llabel + $ mmgen-walletconv -r0 -q -iwords -d{asi.wallet_dir} -p1 -P/mnt/tx/autosign.key -Llabel Note that the hash preset must be '1'. Multiple wallets are permissible. @@ -117,6 +106,13 @@ each signing session. This command is currently available only on Linux-based platforms. """ + }, + 'code': { + 'options': lambda s: s.format( + asi = asi, + mn_fmts = fmt_list( asi.mn_fmts, fmt='no_spc' ), + ), + 'notes': lambda s: s.format(asi=asi) } } @@ -128,347 +124,44 @@ cfg = Config( 'usr_randchars': 0, 'hash_preset': '1', 'label': 'Autosign Wallet', - }) + }, + do_post_init = True ) cmd_args = cfg._args type(cfg)._set_ok += ('outdir','passwd_file') -exit_if_mswin('autosigning') +from .autosign import Autosign +asi = Autosign(cfg) -if cfg.mnemonic_fmt: - if cfg.mnemonic_fmt not in mn_fmts: - die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format( - cfg.mnemonic_fmt, - fmt_list(mn_fmts,fmt='no_spc') )) - -from .wallet import Wallet -from .tx import UnsignedTX -from .tx.sign import txsign -from .protocol import init_proto -from .rpc import rpc_init - -if cfg.mountpoint: - mountpoint = cfg.mountpoint - -keyfile = os.path.join(mountpoint,'autosign.key') -msg_dir = os.path.join(mountpoint,'msg') -tx_dir = os.path.join(mountpoint,'tx') - -cfg.outdir = tx_dir -cfg.passwd_file = keyfile - -async def check_daemons_running(): - if cfg.coin != type(cfg).coin: - die(1,'--coin option not supported with this command. Use --coins instead') - if cfg.coins: - coins = cfg.coins.upper().split(',') - else: - ymsg('Warning: no coins specified, defaulting to BTC') - coins = ['BTC'] - - for coin in coins: - proto = init_proto( cfg, coin, testnet=cfg.network=='testnet', need_amt=True ) - if proto.sign_mode == 'daemon': - cfg._util.vmsg(f'Checking {coin} daemon') - from .exception import SocketError - try: - await rpc_init(cfg,proto) - except SocketError as e: - die(2,f'{coin} daemon not running or not listening on port {proto.rpc_port}') - -def get_wallet_files(): - try: - dlist = os.listdir(wallet_dir) - except: - die(1,f"Cannot open wallet directory {wallet_dir!r}. Did you run 'mmgen-autosign setup'?") - - fns = [x for x in dlist if x.endswith('.mmdat')] - if fns: - return [os.path.join(wallet_dir,w) for w in fns] - else: - die(1,'No wallet files present!') - -def do_mount(): - if not os.path.ismount(mountpoint): - if run(['mount',mountpoint],stderr=DEVNULL,stdout=DEVNULL).returncode == 0: - msg(f'Mounting {mountpoint}') - global have_msg_dir - have_msg_dir = os.path.isdir(msg_dir) - for cdir in [tx_dir] + ([msg_dir] if have_msg_dir else []): - try: - ds = os.stat(cdir) - assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!' - assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!' - except: - die(1,f'{cdir!r} missing or not read/writable by user!') - -def do_umount(): - if os.path.ismount(mountpoint): - run(['sync'],check=True) - msg(f'Unmounting {mountpoint}') - run(['umount',mountpoint],check=True) - -async def sign_object(d,fn): - try: - if d.desc == 'transaction': - tx1 = UnsignedTX(cfg=cfg,filename=fn) - if tx1.proto.sign_mode == 'daemon': - tx1.rpc = await rpc_init(cfg,tx1.proto) - tx2 = await txsign(cfg,tx1,wfs[:],None,None) - if tx2: - tx2.file.write(ask_write=False) - return tx2 - else: - return False - elif d.desc == 'message file': - from .msg import UnsignedMsg,SignedMsg - m = UnsignedMsg(cfg,infile=fn) - await m.sign(wallet_files=wfs[:]) - m = SignedMsg(cfg,data=m.__dict__) - m.write_to_file( - outdir = os.path.abspath(msg_dir), - ask_overwrite = False ) - if m.data.get('failed_sids'): - die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}') - return m - except Exception as e: - ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}') - return False - except: - ymsg(f'An error occurred with {d.desc} {fn!r}') - return False - -async def sign(target): - td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc']) - d = { - 'msg': td('message file', 'rawmsg.json', 'sigmsg.json', msg_dir, 'sign or signed incompletely'), - 'tx': td('transaction', 'rawtx', 'sigtx', tx_dir, 'sign'), - }[target] - - raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)] - signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)] - unsigned = [os.path.join(d.dir,fn+d.rawext) for fn in raw if fn not in signed] - - if unsigned: - ok,bad = ([],[]) - for fn in unsigned: - ret = await sign_object(d,fn) - if ret: - ok.append(ret) - else: - bad.append(fn) - cfg._util.qmsg('') - await asyncio.sleep(0.3) - msg(f'{len(ok)} {d.desc}{suf(ok)} signed') - if bad: - rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}') - if ok and not cfg.no_summary: - print_summary(d,ok) - if bad: - msg('') - rmsg(f'Failed {d.desc}s:') - def gen_bad_disp(): - if d.desc == 'transaction': - for fn in sorted(bad): - yield red(fn) - elif d.desc == 'message file': - for rawfn in sorted(bad): - sigfn = rawfn[:-len(d.rawext)] + d.sigext - yield orange(sigfn) if os.path.exists(sigfn) else red(rawfn) - msg(' {}\n'.format( '\n '.join(gen_bad_disp()) )) - return False if bad else True - else: - msg(f'No unsigned {d.desc}s') - await asyncio.sleep(0.5) - return True - -def decrypt_wallets(): - msg(f'Unlocking wallet{suf(wfs)} with key from {cfg.passwd_file!r}') - fails = 0 - for wf in wfs: - try: - Wallet(cfg,wf,ignore_in_fmt=True) - except SystemExit as e: - if e.code != 0: - fails += 1 - - return False if fails else True - -def print_summary(d,signed_objects): - - if d.desc == 'message file': - gmsg('\nSigned message files:') - for m in signed_objects: - gmsg(' ' + os.path.join(msg_dir,m.signed_filename) ) - return - - if cfg.full_summary: - bmsg('\nAutosign summary:\n') - def gen(): - for tx in signed_objects: - yield tx.info.format(terse=True) - msg_r(''.join(gen())) - return - - def gen(): - for tx in signed_objects: - non_mmgen = [o for o in tx.outputs if not o.mmid] - if non_mmgen: - yield (tx,non_mmgen) - - body = list(gen()) - - if body: - bmsg('\nAutosign summary:') - fs = '{} {} {}' - t_wid,a_wid = 6,44 - - def gen(): - yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount') - yield fs.format('-'*t_wid, '-'*a_wid, '-'*7) - for tx,non_mmgen in body: - for nm in non_mmgen: - yield fs.format( - tx.txid.fmt(width=t_wid,color=True) if nm is non_mmgen[0] else ' '*t_wid, - nm.addr.fmt(width=a_wid,color=True), - nm.amt.hl() + ' ' + yellow(tx.coin)) - - msg('\n'.join(gen())) - else: - msg('No non-MMGen outputs') - -async def do_sign(): - if not cfg.stealth_led: - led.set('busy') - do_mount() - key_ok = decrypt_wallets() - if key_ok: - if cfg.stealth_led: - led.set('busy') - ret1 = await sign('tx') - ret2 = await sign('msg') if have_msg_dir else True - ret = ret1 and ret2 - do_umount() - led.set(('standby','off','error')[(not ret)*2 or bool(cfg.stealth_led)]) - return ret - else: - msg('Password is incorrect!') - do_umount() - if not cfg.stealth_led: - led.set('error') - return False - -def wipe_existing_key(): - try: os.stat(keyfile) - except: pass - else: - from .fileutil import shred_file - msg(f'\nShredding existing key {keyfile!r}') - shred_file( keyfile, verbose=cfg.verbose ) - -def create_key(): - kdata = os.urandom(32).hex() - desc = f'key file {keyfile!r}' - msg('Creating ' + desc) - try: - with open(keyfile,'w') as fp: - fp.write(kdata+'\n') - os.chmod(keyfile,0o400) - msg('Wrote ' + desc) - except: - die(2,'Unable to write ' + desc) - -def gen_key(no_unmount=False): - create_wallet_dir() - if not get_insert_status(): - die(1,'Removable device not present!') - do_mount() - wipe_existing_key() - create_key() - if not no_unmount: - do_umount() - -def remove_wallet_dir(): - msg(f'Deleting {wallet_dir!r}') - try: shutil.rmtree(wallet_dir) - except: pass - -def create_wallet_dir(): - try: os.mkdir(wallet_dir) - except: pass - try: os.stat(wallet_dir) - except: die(2,f'Unable to create wallet directory {wallet_dir!r}') - -def setup(): - remove_wallet_dir() - gen_key(no_unmount=True) - ss_in = Wallet(cfg,in_fmt=mn_fmts[cfg.mnemonic_fmt or mn_fmt_dfl]) - ss_out = Wallet(cfg,ss=ss_in) - ss_out.write_to_file(desc='autosign wallet',outdir=wallet_dir) - -def get_insert_status(): - if cfg.no_insert_check: - return True - try: os.stat(os.path.join('/dev/disk/by-label',part_label)) - except: return False - else: return True - -async def do_loop(): - n,prev_status = 0,False - if not cfg.stealth_led: - led.set('standby') - while True: - status = get_insert_status() - if status and not prev_status: - msg('Device insertion detected') - await do_sign() - prev_status = status - if not n % 10: - msg_r(f"\r{' '*17}\rWaiting") - sys.stderr.flush() - await asyncio.sleep(1) - msg_r('.') - n += 1 +cfg._post_init() if len(cmd_args) not in (0,1): cfg._opts.usage() if len(cmd_args) == 1: cmd = cmd_args[0] - if cmd in ('gen_key','setup'): - (gen_key if cmd == 'gen_key' else setup)() + if cmd == 'gen_key': + asi.gen_key() + sys.exit(0) + elif cmd == 'setup': + asi.setup() sys.exit(0) elif cmd != 'wait': die(1,f'{cmd!r}: unrecognized command') -wfs = get_wallet_files() +asi.init_led() -def at_exit(exit_val,message='\nCleaning up...'): - if message: - msg(message) - led.stop() - sys.exit(exit_val) - -def handler(a,b): - at_exit(1) - -signal.signal(signal.SIGTERM,handler) -signal.signal(signal.SIGINT,handler) - -from .led import LEDControl -led = LEDControl( - enabled = cfg.led, - simulate = os.getenv('MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE') ) -led.set('off') +asi.init_exit_handler() async def main(): - await check_daemons_running() - if len(cmd_args) == 0: - ret = await do_sign() - at_exit(int(not ret),message='') + await asi.check_daemons_running() + + if not cmd_args: + ret = await asi.do_sign() + asi.at_exit(not ret) elif cmd_args[0] == 'wait': - await do_loop() + await asi.do_loop() async_run(main())