mmgen-autosign: reimplement using new Autosign class

This commit is contained in:
The MMGen Project 2023-04-18 18:35:55 +00:00
commit 7b56d5bb2d
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
2 changed files with 411 additions and 349 deletions

369
mmgen/autosign.py Executable file
View file

@ -0,0 +1,369 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
autosign: Auto-sign MMGen transactions and message files
"""
import sys,os,asyncio
from subprocess import run,PIPE,DEVNULL
from collections import namedtuple
from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt_list
from .color import yellow,red,orange
from .wallet import Wallet
class Autosign:
dfl_mountpoint = os.path.join(os.sep,'mnt','tx')
wallet_dir = os.path.join(os.sep,'dev','shm','autosign')
disk_label_dir = os.path.join(os.sep,'dev','disk','by-label')
part_label = 'MMGEN_TX'
mn_fmts = {
'mmgen': 'words',
'bip39': 'bip39',
}
dfl_mn_fmt = 'mmgen'
have_msg_dir = False
def __init__(self,cfg):
if cfg.mnemonic_fmt:
if cfg.mnemonic_fmt not in self.mn_fmts:
die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format(
cfg.mnemonic_fmt,
fmt_list( self.mn_fmts, fmt='no_spc' ) ))
self.cfg = cfg
self.mountpoint = cfg.mountpoint or self.dfl_mountpoint
self.tx_dir = os.path.join( self.mountpoint, 'tx' )
self.msg_dir = os.path.join( self.mountpoint, 'msg' )
self.keyfile = os.path.join( self.mountpoint, 'autosign.key' )
cfg.outdir = self.tx_dir
cfg.passwd_file = self.keyfile
async def check_daemons_running(self):
if 'coin' in self.cfg._uopts:
die(1,'--coin option not supported with this command. Use --coins instead')
if self.cfg.coins:
coins = self.cfg.coins.upper().split(',')
else:
ymsg('Warning: no coins specified, defaulting to BTC')
coins = ['BTC']
from .protocol import init_proto
for coin in coins:
proto = init_proto( self.cfg, coin, testnet=self.cfg.network=='testnet', need_amt=True )
if proto.sign_mode == 'daemon':
self.cfg._util.vmsg(f'Checking {coin} daemon')
from .rpc import rpc_init
from .exception import SocketError
try:
await rpc_init( self.cfg, proto )
except SocketError as e:
die(2,f'{coin} daemon not running or not listening on port {proto.rpc_port}')
@property
def wallet_files(self):
if not hasattr(self,'_wallet_files'):
try:
dirlist = os.listdir(self.wallet_dir)
except:
die(1,f'Cannot open wallet directory {self.wallet_dir!r}. Did you run ‘mmgen-autosign setup’?')
fns = [fn for fn in dirlist if fn.endswith('.mmdat')]
if fns:
self._wallet_files = [os.path.join(self.wallet_dir,fn) for fn in fns]
else:
die(1,'No wallet files present!')
return self._wallet_files
def do_mount(self):
if not os.path.ismount(self.mountpoint):
if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0:
msg(f'Mounting {self.mountpoint}')
self.have_msg_dir = os.path.isdir(self.msg_dir)
from stat import S_ISDIR,S_IWUSR,S_IRUSR
for cdir in [self.tx_dir] + ([self.msg_dir] if self.have_msg_dir else []):
try:
ds = os.stat(cdir)
assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!'
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!'
except:
die(1,f'{cdir!r} missing or not read/writable by user!')
def do_umount(self):
if os.path.ismount(self.mountpoint):
run( ['sync'], check=True )
msg(f'Unmounting {self.mountpoint}')
run( ['umount',self.mountpoint], check=True )
async def sign_object(self,d,fn):
from .tx import UnsignedTX
from .tx.sign import txsign
from .rpc import rpc_init
try:
if d.desc == 'transaction':
tx1 = UnsignedTX( cfg=self.cfg, filename=fn )
if tx1.proto.sign_mode == 'daemon':
tx1.rpc = await rpc_init( self.cfg, tx1.proto )
tx2 = await txsign( self.cfg, tx1, self.wallet_files[:], None, None )
if tx2:
tx2.file.write(ask_write=False)
return tx2
else:
return False
elif d.desc == 'message file':
from .msg import UnsignedMsg,SignedMsg
m = UnsignedMsg( self.cfg, infile=fn )
await m.sign( wallet_files=self.wallet_files[:] )
m = SignedMsg( self.cfg, data=m.__dict__ )
m.write_to_file(
outdir = os.path.abspath(self.msg_dir),
ask_overwrite = False )
if m.data.get('failed_sids'):
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
return m
except Exception as e:
ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}')
return False
except:
ymsg(f'An error occurred with {d.desc} {fn!r}')
return False
async def sign(self,target):
_td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc'])
d = {
'msg': _td('message file', 'rawmsg.json', 'sigmsg.json', self.msg_dir, 'sign or signed incompletely'),
'tx': _td('transaction', 'rawtx', 'sigtx', self.tx_dir, 'sign'),
}[target]
raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)]
signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)]
unsigned = [os.path.join( d.dir, fn+d.rawext ) for fn in raw if fn not in signed]
if unsigned:
ok = []
bad = []
for fn in unsigned:
ret = await self.sign_object(d,fn)
if ret:
ok.append(ret)
else:
bad.append(fn)
self.cfg._util.qmsg('')
await asyncio.sleep(0.3)
msg(f'{len(ok)} {d.desc}{suf(ok)} signed')
if bad:
rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}')
if ok and not self.cfg.no_summary:
self.print_summary(d,ok)
if bad:
msg('')
rmsg(f'Failed {d.desc}s:')
def gen_bad_disp():
if d.desc == 'transaction':
for fn in sorted(bad):
yield red(fn)
elif d.desc == 'message file':
for rawfn in sorted(bad):
sigfn = rawfn[:-len(d.rawext)] + d.sigext
yield orange(sigfn) if os.path.exists(sigfn) else red(rawfn)
msg(' {}\n'.format( '\n '.join(gen_bad_disp()) ))
return False if bad else True
else:
msg(f'No unsigned {d.desc}s')
await asyncio.sleep(0.5)
return True
def decrypt_wallets(self):
msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}')
fails = 0
for wf in self.wallet_files:
try:
Wallet( self.cfg, wf, ignore_in_fmt=True )
except SystemExit as e:
if e.code != 0:
fails += 1
return False if fails else True
def print_summary(self,d,signed_objects):
if d.desc == 'message file':
gmsg('\nSigned message files:')
for m in signed_objects:
gmsg(' ' + os.path.join( self.msg_dir, m.signed_filename ))
return
if self.cfg.full_summary:
bmsg('\nAutosign summary:\n')
def gen():
for tx in signed_objects:
yield tx.info.format(terse=True)
msg_r('\n'.join(gen()))
return
def gen():
for tx in signed_objects:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx,non_mmgen)
body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid,a_wid = 6,44
def gen():
yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
for tx,non_mmgen in body:
for nm in non_mmgen:
yield fs.format(
tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid,
nm.addr.fmt( width=a_wid, color=True ),
nm.amt.hl() + ' ' + yellow(tx.coin))
msg('\n' + '\n'.join(gen()))
else:
msg('\nNo non-MMGen outputs')
async def do_sign(self):
if not self.cfg.stealth_led:
self.led.set('busy')
self.do_mount()
key_ok = self.decrypt_wallets()
if key_ok:
if self.cfg.stealth_led:
self.led.set('busy')
ret1 = await self.sign('tx')
ret2 = await self.sign('msg') if self.have_msg_dir else True
ret = ret1 and ret2
self.do_umount()
self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)])
return ret
else:
msg('Password is incorrect!')
self.do_umount()
if not self.cfg.stealth_led:
self.led.set('error')
return False
def wipe_existing_key(self):
try: os.stat(self.keyfile)
except: pass
else:
from .fileutil import shred_file
msg(f'\nShredding existing key {self.keyfile!r}')
shred_file( self.keyfile, verbose=self.cfg.verbose )
def create_key(self):
kdata = os.urandom(32).hex()
desc = f'key file {self.keyfile!r}'
msg('Creating ' + desc)
try:
with open(self.keyfile,'w') as fp:
fp.write(kdata+'\n')
os.chmod(self.keyfile,0o400)
msg('Wrote ' + desc)
except:
die(2,'Unable to write ' + desc)
def gen_key(self,no_unmount=False):
self.create_wallet_dir()
if not self.get_insert_status():
die(1,'Removable device not present!')
self.do_mount()
self.wipe_existing_key()
self.create_key()
if not no_unmount:
self.do_umount()
def remove_wallet_dir(self):
msg(f'Deleting {self.wallet_dir!r}')
import shutil
try: shutil.rmtree(self.wallet_dir)
except: pass
def create_wallet_dir(self):
try: os.mkdir(self.wallet_dir)
except: pass
try: os.stat(self.wallet_dir)
except: die(2,f'Unable to create wallet directory {self.wallet_dir!r}')
def setup(self):
self.remove_wallet_dir()
self.gen_key(no_unmount=True)
ss_in = Wallet( self.cfg, in_fmt=self.mn_fmts[self.cfg.mnemonic_fmt or self.dfl_mn_fmt] )
ss_out = Wallet( self.cfg, ss=ss_in )
ss_out.write_to_file( desc='autosign wallet', outdir=self.wallet_dir )
def get_insert_status(self):
if self.cfg.no_insert_check:
return True
try: os.stat(os.path.join( self.disk_label_dir, self.part_label ))
except: return False
else: return True
async def do_loop(self):
n,prev_status = 0,False
if not self.cfg.stealth_led:
self.led.set('standby')
while True:
status = self.get_insert_status()
if status and not prev_status:
msg('Device insertion detected')
await self.do_sign()
prev_status = status
if not n % 10:
msg_r(f"\r{' '*17}\rWaiting")
sys.stderr.flush()
await asyncio.sleep(1)
msg_r('.')
n += 1
def at_exit(self,exit_val,message=None):
if message:
msg(message)
self.led.stop()
sys.exit(int(exit_val))
def init_exit_handler(self):
def handler(arg1,arg2):
self.at_exit(1,'\nCleaning up...')
import signal
signal.signal( signal.SIGTERM, handler )
signal.signal( signal.SIGINT, handler )
def init_led(self):
from .led import LEDControl
self.led = LEDControl(
enabled = self.cfg.led,
simulate = os.getenv('MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE') )
self.led.set('off')

View file

@ -20,40 +20,27 @@
mmgen-autosign: Auto-sign MMGen transactions and message files
"""
import sys,os,asyncio,signal,shutil
from subprocess import run,PIPE,DEVNULL
from collections import namedtuple
from stat import *
import sys
from .cfg import Config
from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt_list,async_run,exit_if_mswin
from .color import yellow,red,orange
from .util import die,fmt_list,exit_if_mswin,async_run
mountpoint = '/mnt/tx'
tx_dir = '/mnt/tx/tx'
msg_dir = '/mnt/tx/msg'
part_label = 'MMGEN_TX'
wallet_dir = '/dev/shm/autosign'
mn_fmts = {
'mmgen': 'words',
'bip39': 'bip39',
}
mn_fmt_dfl = 'mmgen'
exit_if_mswin('autosigning')
opts_data = {
'sets': [('stealth_led', True, 'led', True)],
'text': {
'desc': 'Auto-sign MMGen transactions and message files',
'usage':'[opts] [command]',
'options': f"""
'options': """
-h, --help Print this help message
--, --longhelp Print help message for long options (common options)
-c, --coins=c Coins to sign for (comma-separated list)
-I, --no-insert-check Dont check for device insertion
-l, --led Use status LED to signal standby, busy and error
-m, --mountpoint=M Specify an alternate mountpoint 'M' (default: {mountpoint!r})
-m, --mountpoint=M Specify an alternate mountpoint 'M' (default: {asi.dfl_mountpoint!r})
-M, --mnemonic-fmt=F During setup, prompt for mnemonic seed phrase of format
'F' (choices: {fmt_list(mn_fmts,fmt='no_spc')}; default: {mn_fmt_dfl!r})
'F' (choices: {mn_fmts}; default: {asi.dfl_mn_fmt!r})
-n, --no-summary Dont print a transaction summary
-s, --stealth-led Stealth LED mode - signal busy and error only, and only
after successful authorization.
@ -63,11 +50,12 @@ opts_data = {
-q, --quiet Produce quieter output
-v, --verbose Produce more verbose output
""",
'notes': f"""
'notes': """
COMMANDS
gen_key - generate the wallet encryption key and copy it to {mountpoint!r}
gen_key - generate the wallet encryption key and copy it to the mountpoint
(currently configured as {asi.mountpoint!r})
setup - generate the wallet encryption key and wallet
wait - start in loop mode: wait-mount-sign-unmount-wait
@ -91,14 +79,15 @@ writable root directory and a directory named '/tx', where unsigned MMGen
transactions are placed. Optionally, the directory '/msg' may also be created
and unsigned message files created by `mmgen-msg` placed in this directory.
On the signing machine the mount point {mountpoint!r} must exist and /etc/fstab
must contain the following entry:
On the signing machine the mount point (currently configured as {asi.mountpoint!r})
must exist and /etc/fstab must contain the following entry:
LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0
Transactions are signed with a wallet on the signing machine (in the directory
{wallet_dir!r}) encrypted with a 64-character hexadecimal password saved
in the file `autosign.key` in the root of the removable device partition.
Transactions are signed with a wallet on the signing machine located in the wallet
directory (currently configured as {asi.wallet_dir!r}) encrypted with a 64-character
hexadecimal password saved in the file `autosign.key` in the root of the removable
device partition.
The password and wallet can be created in one operation by invoking the
command with 'setup' with the removable device inserted. In this case, the
@ -108,7 +97,7 @@ Alternatively, the password and wallet can be created separately by first
invoking the command with 'gen_key' and then creating and encrypting the
wallet using the -P (--passwd-file) option:
$ mmgen-walletconv -r0 -q -iwords -d{wallet_dir} -p1 -P/mnt/tx/autosign.key -Llabel
$ mmgen-walletconv -r0 -q -iwords -d{asi.wallet_dir} -p1 -P/mnt/tx/autosign.key -Llabel
Note that the hash preset must be '1'. Multiple wallets are permissible.
@ -117,6 +106,13 @@ each signing session.
This command is currently available only on Linux-based platforms.
"""
},
'code': {
'options': lambda s: s.format(
asi = asi,
mn_fmts = fmt_list( asi.mn_fmts, fmt='no_spc' ),
),
'notes': lambda s: s.format(asi=asi)
}
}
@ -128,347 +124,44 @@ cfg = Config(
'usr_randchars': 0,
'hash_preset': '1',
'label': 'Autosign Wallet',
})
},
do_post_init = True )
cmd_args = cfg._args
type(cfg)._set_ok += ('outdir','passwd_file')
exit_if_mswin('autosigning')
from .autosign import Autosign
asi = Autosign(cfg)
if cfg.mnemonic_fmt:
if cfg.mnemonic_fmt not in mn_fmts:
die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format(
cfg.mnemonic_fmt,
fmt_list(mn_fmts,fmt='no_spc') ))
from .wallet import Wallet
from .tx import UnsignedTX
from .tx.sign import txsign
from .protocol import init_proto
from .rpc import rpc_init
if cfg.mountpoint:
mountpoint = cfg.mountpoint
keyfile = os.path.join(mountpoint,'autosign.key')
msg_dir = os.path.join(mountpoint,'msg')
tx_dir = os.path.join(mountpoint,'tx')
cfg.outdir = tx_dir
cfg.passwd_file = keyfile
async def check_daemons_running():
if cfg.coin != type(cfg).coin:
die(1,'--coin option not supported with this command. Use --coins instead')
if cfg.coins:
coins = cfg.coins.upper().split(',')
else:
ymsg('Warning: no coins specified, defaulting to BTC')
coins = ['BTC']
for coin in coins:
proto = init_proto( cfg, coin, testnet=cfg.network=='testnet', need_amt=True )
if proto.sign_mode == 'daemon':
cfg._util.vmsg(f'Checking {coin} daemon')
from .exception import SocketError
try:
await rpc_init(cfg,proto)
except SocketError as e:
die(2,f'{coin} daemon not running or not listening on port {proto.rpc_port}')
def get_wallet_files():
try:
dlist = os.listdir(wallet_dir)
except:
die(1,f"Cannot open wallet directory {wallet_dir!r}. Did you run 'mmgen-autosign setup'?")
fns = [x for x in dlist if x.endswith('.mmdat')]
if fns:
return [os.path.join(wallet_dir,w) for w in fns]
else:
die(1,'No wallet files present!')
def do_mount():
if not os.path.ismount(mountpoint):
if run(['mount',mountpoint],stderr=DEVNULL,stdout=DEVNULL).returncode == 0:
msg(f'Mounting {mountpoint}')
global have_msg_dir
have_msg_dir = os.path.isdir(msg_dir)
for cdir in [tx_dir] + ([msg_dir] if have_msg_dir else []):
try:
ds = os.stat(cdir)
assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!'
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!'
except:
die(1,f'{cdir!r} missing or not read/writable by user!')
def do_umount():
if os.path.ismount(mountpoint):
run(['sync'],check=True)
msg(f'Unmounting {mountpoint}')
run(['umount',mountpoint],check=True)
async def sign_object(d,fn):
try:
if d.desc == 'transaction':
tx1 = UnsignedTX(cfg=cfg,filename=fn)
if tx1.proto.sign_mode == 'daemon':
tx1.rpc = await rpc_init(cfg,tx1.proto)
tx2 = await txsign(cfg,tx1,wfs[:],None,None)
if tx2:
tx2.file.write(ask_write=False)
return tx2
else:
return False
elif d.desc == 'message file':
from .msg import UnsignedMsg,SignedMsg
m = UnsignedMsg(cfg,infile=fn)
await m.sign(wallet_files=wfs[:])
m = SignedMsg(cfg,data=m.__dict__)
m.write_to_file(
outdir = os.path.abspath(msg_dir),
ask_overwrite = False )
if m.data.get('failed_sids'):
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
return m
except Exception as e:
ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}')
return False
except:
ymsg(f'An error occurred with {d.desc} {fn!r}')
return False
async def sign(target):
td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc'])
d = {
'msg': td('message file', 'rawmsg.json', 'sigmsg.json', msg_dir, 'sign or signed incompletely'),
'tx': td('transaction', 'rawtx', 'sigtx', tx_dir, 'sign'),
}[target]
raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)]
signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)]
unsigned = [os.path.join(d.dir,fn+d.rawext) for fn in raw if fn not in signed]
if unsigned:
ok,bad = ([],[])
for fn in unsigned:
ret = await sign_object(d,fn)
if ret:
ok.append(ret)
else:
bad.append(fn)
cfg._util.qmsg('')
await asyncio.sleep(0.3)
msg(f'{len(ok)} {d.desc}{suf(ok)} signed')
if bad:
rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}')
if ok and not cfg.no_summary:
print_summary(d,ok)
if bad:
msg('')
rmsg(f'Failed {d.desc}s:')
def gen_bad_disp():
if d.desc == 'transaction':
for fn in sorted(bad):
yield red(fn)
elif d.desc == 'message file':
for rawfn in sorted(bad):
sigfn = rawfn[:-len(d.rawext)] + d.sigext
yield orange(sigfn) if os.path.exists(sigfn) else red(rawfn)
msg(' {}\n'.format( '\n '.join(gen_bad_disp()) ))
return False if bad else True
else:
msg(f'No unsigned {d.desc}s')
await asyncio.sleep(0.5)
return True
def decrypt_wallets():
msg(f'Unlocking wallet{suf(wfs)} with key from {cfg.passwd_file!r}')
fails = 0
for wf in wfs:
try:
Wallet(cfg,wf,ignore_in_fmt=True)
except SystemExit as e:
if e.code != 0:
fails += 1
return False if fails else True
def print_summary(d,signed_objects):
if d.desc == 'message file':
gmsg('\nSigned message files:')
for m in signed_objects:
gmsg(' ' + os.path.join(msg_dir,m.signed_filename) )
return
if cfg.full_summary:
bmsg('\nAutosign summary:\n')
def gen():
for tx in signed_objects:
yield tx.info.format(terse=True)
msg_r(''.join(gen()))
return
def gen():
for tx in signed_objects:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx,non_mmgen)
body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid,a_wid = 6,44
def gen():
yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
for tx,non_mmgen in body:
for nm in non_mmgen:
yield fs.format(
tx.txid.fmt(width=t_wid,color=True) if nm is non_mmgen[0] else ' '*t_wid,
nm.addr.fmt(width=a_wid,color=True),
nm.amt.hl() + ' ' + yellow(tx.coin))
msg('\n'.join(gen()))
else:
msg('No non-MMGen outputs')
async def do_sign():
if not cfg.stealth_led:
led.set('busy')
do_mount()
key_ok = decrypt_wallets()
if key_ok:
if cfg.stealth_led:
led.set('busy')
ret1 = await sign('tx')
ret2 = await sign('msg') if have_msg_dir else True
ret = ret1 and ret2
do_umount()
led.set(('standby','off','error')[(not ret)*2 or bool(cfg.stealth_led)])
return ret
else:
msg('Password is incorrect!')
do_umount()
if not cfg.stealth_led:
led.set('error')
return False
def wipe_existing_key():
try: os.stat(keyfile)
except: pass
else:
from .fileutil import shred_file
msg(f'\nShredding existing key {keyfile!r}')
shred_file( keyfile, verbose=cfg.verbose )
def create_key():
kdata = os.urandom(32).hex()
desc = f'key file {keyfile!r}'
msg('Creating ' + desc)
try:
with open(keyfile,'w') as fp:
fp.write(kdata+'\n')
os.chmod(keyfile,0o400)
msg('Wrote ' + desc)
except:
die(2,'Unable to write ' + desc)
def gen_key(no_unmount=False):
create_wallet_dir()
if not get_insert_status():
die(1,'Removable device not present!')
do_mount()
wipe_existing_key()
create_key()
if not no_unmount:
do_umount()
def remove_wallet_dir():
msg(f'Deleting {wallet_dir!r}')
try: shutil.rmtree(wallet_dir)
except: pass
def create_wallet_dir():
try: os.mkdir(wallet_dir)
except: pass
try: os.stat(wallet_dir)
except: die(2,f'Unable to create wallet directory {wallet_dir!r}')
def setup():
remove_wallet_dir()
gen_key(no_unmount=True)
ss_in = Wallet(cfg,in_fmt=mn_fmts[cfg.mnemonic_fmt or mn_fmt_dfl])
ss_out = Wallet(cfg,ss=ss_in)
ss_out.write_to_file(desc='autosign wallet',outdir=wallet_dir)
def get_insert_status():
if cfg.no_insert_check:
return True
try: os.stat(os.path.join('/dev/disk/by-label',part_label))
except: return False
else: return True
async def do_loop():
n,prev_status = 0,False
if not cfg.stealth_led:
led.set('standby')
while True:
status = get_insert_status()
if status and not prev_status:
msg('Device insertion detected')
await do_sign()
prev_status = status
if not n % 10:
msg_r(f"\r{' '*17}\rWaiting")
sys.stderr.flush()
await asyncio.sleep(1)
msg_r('.')
n += 1
cfg._post_init()
if len(cmd_args) not in (0,1):
cfg._opts.usage()
if len(cmd_args) == 1:
cmd = cmd_args[0]
if cmd in ('gen_key','setup'):
(gen_key if cmd == 'gen_key' else setup)()
if cmd == 'gen_key':
asi.gen_key()
sys.exit(0)
elif cmd == 'setup':
asi.setup()
sys.exit(0)
elif cmd != 'wait':
die(1,f'{cmd!r}: unrecognized command')
wfs = get_wallet_files()
asi.init_led()
def at_exit(exit_val,message='\nCleaning up...'):
if message:
msg(message)
led.stop()
sys.exit(exit_val)
def handler(a,b):
at_exit(1)
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
from .led import LEDControl
led = LEDControl(
enabled = cfg.led,
simulate = os.getenv('MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE') )
led.set('off')
asi.init_exit_handler()
async def main():
await check_daemons_running()
if len(cmd_args) == 0:
ret = await do_sign()
at_exit(int(not ret),message='')
await asi.check_daemons_running()
if not cmd_args:
ret = await asi.do_sign()
asi.at_exit(not ret)
elif cmd_args[0] == 'wait':
await do_loop()
await asi.do_loop()
async_run(main())