#!/usr/bin/env python
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2017 Philemon
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
mmgen-autosign: Auto-sign MMGen transactions
"""

import sys
import os
import subprocess
import time
import signal
import threading
from stat import *

# Fixed locations used throughout the script.
mountpoint = '/mnt/tx'                    # where the USB stick is mounted
tx_dir = os.path.join(mountpoint,'tx')    # transaction directory on the stick
part_label = 'MMGEN_TX'                   # partition label looked up in /dev/disk/by-label
shm_dir = '/dev/shm'                      # holds wallet files and the local secret copy
secret_fn = 'txsign-secret'               # file name of the shared secret

from mmgen.common import *

prog_name = os.path.basename(sys.argv[0])

opts_data = lambda: {
    'desc': 'Auto-sign MMGen transactions',
    'usage':'[opts] [command]',
    'options': """
-h, --help          Print this help message
-c, --coins=c       Coins to sign for (comma-separated list)
-l, --led           Use status LED to signal standby, busy and error
-s, --stealth-led   Stealth LED mode - signal busy and error only, and only
                    after successful authorization.
-q, --quiet         Produce quieter output
-v, --verbose       Produce more verbose output
""",
    'notes': """

                              COMMANDS

gen_secret - generate the shared secret and copy to /dev/shm and USB stick
wait       - start in loop mode: wait - mount - sign - unmount - wait

                             USAGE NOTES

If invoked with no command, the program mounts the USB stick, signs any
unsigned transactions, unmounts the USB stick and exits.

If invoked with 'wait', the program waits in a loop, mounting, signing and
unmounting every time the USB stick is inserted.

On supported platforms, the status LED indicates whether the program is busy
or in standby mode, i.e. ready for USB stick insertion or removal.

The USB stick must have a partition labeled MMGEN_TX and a user-writable
directory '/tx', where unsigned MMGen transactions are placed.

On the signing machine the directory /mnt/tx must exist and /etc/fstab must
contain the following entry:

    LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0

The signing wallet or wallets must be in MMGen mnemonic format and present in
/dev/shm.  The wallet(s) can be created interactively with the following
command:

    $ mmgen-walletconv -i words -o words -d /dev/shm

{} checks that a shared secret is present on the USB stick before signing
transactions.  The shared secret is generated by invoking the command with
'gen_secret' with the USB stick inserted.  For good security, it's advisable
to re-generate a new shared secret before each signing session.

Status LED functionality is supported on Orange Pi and Raspberry Pi boards.

This program is a helper script and is not installed by default.  You may
copy it to your executable path if you wish, or just run it in place in the
scripts directory of the MMGen repository root where it resides.
""".format(prog_name)
}

cmd_args = opts.init(opts_data,add_opts=['mmgen_keys_from_file','in_fmt'])

import mmgen.tx
from mmgen.txsign import txsign
from mmgen.protocol import CoinProtocol

# Stealth mode is a restricted form of LED mode, so it implies --led.
if opt.stealth_led:
    opt.led = True

opt.outdir = tx_dir


def check_daemons_running():
    """Run 'mmgen-tool getbalance' once per requested coin; die if any call fails.

    Coins come from --coins (comma-separated); with no --coins a warning is
    printed and BTC alone is checked.  The --coin option is rejected.
    """
    if opt.coin:
        die(1,'--coin option not supported with this command. Use --coins instead')
    if opt.coins:
        coins = opt.coins.split(',')
    else:
        ymsg('Warning: no coins specified, so defaulting to BTC only')
        coins = ['btc']
    for coin in coins:
        cmd = [
            'mmgen-tool',
            '--coin={}'.format(coin),
            '--testnet={}'.format(opt.testnet or 0),
        ]
        if opt.quiet:
            cmd.append('--quiet')
        cmd.append('getbalance')
        vmsg('Executing: {}'.format(' '.join(cmd)))
        try:
            subprocess.check_output(cmd)
        except (OSError, subprocess.CalledProcessError):
            # OSError: mmgen-tool could not be executed; CalledProcessError:
            # it ran but returned a non-zero exit status.
            die(1,'{} daemon not running'.format(coin.upper()))


def get_wallet_files():
    """Return the full paths of all '*.mmwords' files in shm_dir; die if none."""
    wfs = [f for f in os.listdir(shm_dir) if f.endswith('.mmwords')]
    if not wfs:
        die(1,'No mnemonic files present!')
    return [os.path.join(shm_dir,w) for w in wfs]


def get_secret_in_dir(d,on_fail='die'):
    """Read and validate the shared secret stored in directory `d`.

    A valid secret is a 32-character hex string (trailing whitespace is
    stripped before checking).

    Returns the secret on success.  On failure a message is printed and the
    program exits with status 1 if on_fail == 'die'; for any other value of
    on_fail, None is returned.
    """
    fn = os.path.join(d,secret_fn)
    try:
        with open(fn) as f:
            ret = f.read().rstrip()
    except (IOError, OSError):
        ret = None
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if ret is not None and len(ret) == 32 and is_hex_str(ret):
        return ret
    msg("Secret file '{}' non-existent, unreadable or in incorrect format!".format(fn))
    if on_fail == 'die':
        sys.exit(1)
    return None


def do_mount():
    """Mount the USB stick if necessary and verify that tx_dir is usable.

    Dies if tx_dir is missing, is not a directory, or lacks owner read or
    write permission.
    """
    if not os.path.ismount(mountpoint):
        msg('Mounting '+mountpoint)
        subprocess.call(['mount',mountpoint])
    try:
        ds = os.stat(tx_dir)
    except OSError:
        ds = None
    # Both owner bits are required.  The mask must be parenthesized: '|' and
    # '&' bind tighter than '==', so an unparenthesized comparison would OR
    # the read bit into the left-hand side and never actually test it.
    rw = S_IWUSR | S_IRUSR
    if ds is None or not S_ISDIR(ds.st_mode) or (ds.st_mode & rw) != rw:
        die(1,'{} missing, or not read/writable by user!'.format(tx_dir))


def do_umount():
    """Sync and unmount the USB stick if it is currently mounted."""
    if os.path.ismount(mountpoint):
        subprocess.call(['sync'])
        msg('Unmounting '+mountpoint)
        subprocess.call(['umount',mountpoint])


def sign_tx_file(txfile):
    """Sign one raw transaction file using the wallets in the global `wfs`.

    Returns True if the transaction was signed and written out, False if any
    step failed.
    """
    try:
        # Read only the metadata first to learn which coin the transaction
        # is for, then switch the global coin/protocol to match.
        g.coin = mmgen.tx.MMGenTX(txfile,md_only=True).coin
        g.proto = CoinProtocol(g.coin,g.testnet)
        # NOTE(review): the module is reloaded after g.proto changes,
        # presumably because mmgen.tx binds protocol-dependent state at
        # import time - confirm.
        reload(sys.modules['mmgen.tx'])
        tx = mmgen.tx.MMGenTX(txfile)
        rpc_init(reinit=True)
        txsign(tx,wfs,None,None)
        tx.write_to_file(ask_write=False)
        return True
    except:
        # Deliberately broad: every failure is counted as a failed sign so
        # that the remaining transactions are still processed.
        # NOTE(review): this also traps SystemExit, which the signing code
        # presumably raises through die() - confirm before narrowing.
        return False


def sign():
    """Sign every '.rawtx' file in tx_dir that has no matching '.sigtx' file.

    Returns True if there was nothing to sign or every file was signed
    successfully, False if at least one signing attempt failed.
    """
    dirlist = os.listdir(tx_dir)
    signed = set(f[:-6] for f in dirlist if f.endswith('.sigtx'))
    unsigned = [
        os.path.join(tx_dir,f) for f in dirlist
        if f.endswith('.rawtx') and f[:-6] not in signed
    ]

    if not unsigned:
        msg('No unsigned transactions')
        time.sleep(1)
        return True

    fails = 0
    for txfile in unsigned:
        if not sign_tx_file(txfile):
            fails += 1
        qmsg('')
    if fails:
        ymsg('{} failed signs'.format(fails))
    time.sleep(0.3)
    return not fails


def do_sign():
    """Mount the stick, check the shared secret, sign, unmount, update the LED.

    Signing only happens when the secret on the stick equals the global
    `secret` read from shm_dir.  In stealth mode the LED is left untouched
    until the secret has been verified.
    """
    if not opt.stealth_led:
        set_led('busy')
    do_mount()
    ret = get_secret_in_dir(tx_dir,on_fail='return')
    if ret == secret:
        if opt.stealth_led:
            set_led('busy')
        ok = sign()
        do_umount()
        if not ok:
            set_led('error')
        elif opt.stealth_led:
            set_led('off')
        else:
            set_led('standby')
    else:
        # None means the secret file was missing or malformed, in which case
        # get_secret_in_dir() has already printed a message.
        if ret is not None:
            msg('Secret is incorrect!')
        do_umount()
        if not opt.stealth_led:
            set_led('error')


def wipe_existing_secret_files():
    """Wipe any existing secret file in tx_dir and shm_dir with 'wipe'."""
    for d in (tx_dir,shm_dir):
        fn = os.path.join(d,secret_fn)
        if not os.path.exists(fn):
            continue
        # The 'gen_secret' command runs before the start-up check for 'wipe',
        # so verify here that it is installed instead of failing with an
        # uncaught OSError from subprocess.call().
        check_wipe_present()
        msg('\nWiping existing key {}'.format(fn))
        subprocess.call(['wipe','-c',fn])


def create_secret_files():
    """Generate a new random secret and write it to tx_dir and shm_dir.

    The secret is 16 random bytes, hex-encoded.  Each file is made read-only
    for its owner (mode 0400).  Dies with status 2 if a file can't be written.
    """
    from binascii import hexlify
    secret = hexlify(os.urandom(16))
    for d in (tx_dir,shm_dir):
        fn = os.path.join(d,secret_fn)
        desc = 'secret file in {}'.format(d)
        msg('Creating ' + desc)
        try:
            with open(fn,'w') as f:
                f.write(secret+'\n')
            os.chmod(fn,0o400)
        except (IOError, OSError):
            die(2,'Unable to write ' + desc)
        else:
            msg('Wrote ' + desc)


def do_create_secret_files():
    """Implement the 'gen_secret' command: mount, wipe old secrets, write new."""
    if not get_insert_status():
        die(2,'USB stick not present!')
    do_mount()
    wipe_existing_secret_files()
    create_secret_files()
    do_umount()


def ev_sleep(secs):
    """Wait up to `secs` seconds on the global event `ev`.

    Returns True if the event is set when the wait ends, otherwise False.
    """
    ev.wait(secs)
    return ev.is_set()


def do_led(on,off):
    """LED thread body: blink with `on`/`off` second periods until `ev` is set.

    A false `on` value switches the LED off and then simply idles until the
    event is set.
    """
    if not on:
        with open(status_ctl,'w') as f:
            f.write('0\n')
        while True:
            if ev_sleep(3600):
                return

    while True:
        with open(status_ctl,'w') as f:
            f.write('255\n')
        if ev_sleep(on):
            return
        with open(status_ctl,'w') as f:
            f.write('0\n')
        if ev_sleep(off):
            return


def set_led(cmd):
    """Switch the LED to state `cmd`: 'off', 'standby', 'busy' or 'error'.

    Does nothing unless the LED option is enabled.  Any running LED thread is
    stopped and joined before a new one is started with the new timings.
    """
    global led_thread
    if not opt.led:
        return
    vmsg("Setting LED state to '{}'".format(cmd))
    # (seconds on, seconds off) for each state
    timings = {
        'off':     ( 0,    0    ),
        'standby': ( 2.2,  0.2  ),
        'busy':    ( 0.06, 0.06 ),
        'error':   ( 0.5,  0.5  ),
    }[cmd]
    if led_thread:
        ev.set()
        led_thread.join()
        ev.clear()
    led_thread = threading.Thread(target=do_led,name='LED loop',args=timings)
    led_thread.start()


def get_insert_status():
    """Return True if a partition labeled `part_label` is currently present."""
    return os.path.exists(os.path.join('/dev/disk/by-label/',part_label))


def do_loop():
    """Implement the 'wait' command: sign each time the stick is inserted.

    Polls once per second and calls do_sign() on every transition from
    'absent' to 'present'.  Never returns.
    """
    n,prev_status = 0,False
    if not opt.stealth_led:
        set_led('standby')
    while True:
        status = get_insert_status()
        if status and not prev_status:
            msg('Running command...')
            do_sign()
        prev_status = status
        # Every tenth pass, blank out the progress line and start it again.
        if not n % 10:
            msg_r('\r{}\rWaiting'.format(' '*17))
        time.sleep(1)
        msg_r('.')
        n += 1


def check_access(fn,desc='status LED control',init_val=None):
    """Check that control file `fn` can be both read and written.

    The file's current contents are read and then written back, or
    `init_val` is written instead if it is given and truthy.

    Returns True on success.  On failure a hint is printed and False is
    returned.
    """
    try:
        with open(fn) as f:
            b = f.read().strip()
        with open(fn,'w') as f:
            f.write('{}\n'.format(init_val or b))
        return True
    except (IOError, OSError):
        m1 = "You do not have access to the {} file\n".format(desc)
        m2 = "To allow access, run 'sudo chmod 0666 {}'".format(fn)
        msg(m1+m2)
        return False


def check_wipe_present():
    """Die with status 2 if the 'wipe' utility cannot be executed."""
    try:
        p = subprocess.Popen(['wipe','-v'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        # Drain the pipes and reap the child so it isn't left behind.
        p.communicate()
    except OSError:
        die(2,"The 'wipe' utility must be installed before running this program")


def at_exit(nl=True):
    """Turn the LED off, restore the Raspberry Pi LED trigger and exit.

    With `nl` true an empty message is printed first.  Always raises
    SystemExit.
    """
    if opt.led:
        set_led('off')
        ev.set()
        led_thread.join()
        if board == 'rpi':
            with open(trigger[board],'w') as f:
                f.write('mmc0\n')
    if nl:
        msg('')
    raise SystemExit


def handler(a,b):
    """Signal handler for SIGTERM and SIGINT; both arguments are ignored."""
    at_exit()


# main()
if len(cmd_args) == 1 and cmd_args[0] == 'gen_secret':
    do_create_secret_files()
    sys.exit()

if opt.led:
    status = {
        'opi': '/sys/class/leds/orangepi:red:status/brightness',
        'rpi': '/sys/class/leds/led0/brightness'
    }
    trigger = {
        'rpi': '/sys/class/leds/led0/trigger', # mmc,none
    }
    # Identify the board by whichever LED brightness control file exists.
    for k in ('opi','rpi'):
        if os.path.exists(status[k]):
            board = k
            status_ctl = status[board]
            break
    else:
        die(2,'Control files not found! LED option not supported')

    if not check_access(status_ctl) or (
            board == 'rpi'
            and not check_access(trigger[board],desc='LED trigger',init_val='none')):
        sys.exit(1)

    ev = threading.Event()
    led_thread = None

    if board == 'rpi':
        with open(trigger[board],'w') as f:
            f.write('none\n')

check_wipe_present()
wfs = get_wallet_files()
secret = get_secret_in_dir(shm_dir,on_fail='die')
check_daemons_running()

signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)

try:
    if len(cmd_args) == 1 and cmd_args[0] == 'wait':
        do_loop()
    elif len(cmd_args) == 0:
        do_sign()
        at_exit(nl=False)
    else:
        msg('Invalid invocation')
except (IOError, KeyboardInterrupt):
    at_exit()