mmgen-autosign: encrypt signing wallet with session key on removable device

This commit is contained in:
philemon 2017-10-29 15:06:16 +03:00
commit 8fb3efd99c
Signed by untrusted user who does not match committer: mmgen
GPG key ID: 62DBE9E5212F05BE
12 changed files with 512 additions and 402 deletions

12
INSTALL
View file

@ -1,12 +1,8 @@
MMGen is written in Pure Python and runs on MS Windows and Linux. MMGen is written in Python and builds and runs on MS Windows/MinGW and Linux.
Instructions for installation and use reside on MMGen's Github wiki: Consult the MMGen wiki on Github for instructions on installation and use:
To install MMGen:
https://github.com/mmgen/mmgen/wiki/ https://github.com/mmgen/mmgen/wiki/
To use MMGen: Selected wiki pages in Markdown format can be found under the doc directory
https://github.com/mmgen/mmgen/wiki/Getting-Started-with-MMGen of this distribution.
The wiki pages are duplicated under this distribution's doc directory for
offline reading.

View file

@ -9,6 +9,5 @@ include scripts/compute-file-chksum.py
include scripts/deinstall.sh include scripts/deinstall.sh
include scripts/tx-old2new.py include scripts/tx-old2new.py
include scripts/test-release.sh include scripts/test-release.sh
include scripts/mmgen-autosign
prune test/ref/__db* prune test/ref/__db*

424
cmds/mmgen-autosign Executable file
View file

@ -0,0 +1,424 @@
#!/usr/bin/env python
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
mmgen-autosign: Auto-sign MMGen transactions
"""
import sys,os,subprocess,time,signal,shutil
from stat import *
mountpoint = '/mnt/tx'
tx_dir = '/mnt/tx/tx'
part_label = 'MMGEN_TX'
wallet_dir = '/dev/shm/autosign'
key_fn = 'autosign.key'
from mmgen.common import *
prog_name = os.path.basename(sys.argv[0])
opts_data = lambda: {
'desc': 'Auto-sign MMGen transactions',
'usage':'[opts] [command]',
'options': """
-h, --help Print this help message
--, --longhelp Print help message for long options (common options)
-c, --coins=c Coins to sign for (comma-separated list)
-l, --led Use status LED to signal standby, busy and error
-m, --mountpoint=m Specify an alternate mountpoint (default: '{mp}')
-s, --stealth-led Stealth LED mode - signal busy and error only, and only
after successful authorization.
-q, --quiet Produce quieter output
-v, --verbose Produce more verbose output
""".format(mp=mountpoint),
'notes': """
COMMANDS
gen_key - generate the wallet encryption key and copy it to '{td}'
setup - generate the wallet encryption key and wallet
wait - start in loop mode: wait-mount-sign-unmount-wait
USAGE NOTES
If invoked with no command, the program mounts a removable device containing
MMGen transactions, signs any unsigned transactions, unmounts the removable
device and exits.
If invoked with 'wait', the program waits in a loop, mounting, signing and
unmounting every time the removable device is inserted.
On supported platforms (currently Orange Pi and Raspberry Pi boards), the
status LED indicates whether the program is busy or in standby mode, i.e.
ready for device insertion or removal.
The removable device must have a partition labeled MMGEN_TX and a user-
writable directory '/tx', where unsigned MMGen transactions are placed.
On the signing machine the mount point '{mp}' must exist and /etc/fstab
must contain the following entry:
LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0
Transactions are signed with a wallet on the signing machine (in the directory
'{wd}') encrypted with a 64-character hexadecimal password on the
removable device.
The password and wallet can be created in one operation by invoking the
command with 'setup' with the removable device inserted. The user will be
prompted for a seed mnemonic.
Alternatively, the password and wallet can be created separately by first
invoking the command with 'gen_key' and then creating and encrypting the
wallet using the -P (--passwd-file) option:
$ mmgen-walletconv -r0 -q -iwords -d{wd} -p1 -P{td}/{kf} -Llabel
Note that the hash preset must be '1'. Multiple wallets are permissible.
For good security, it's advisable to re-generate a new wallet and key for
each signing session.
This command is currently available only on Linux-based platforms.
""".format(pnm=prog_name,wd=wallet_dir,td=tx_dir,kf=key_fn,mp=mountpoint)
}
cmd_args = opts.init(opts_data,add_opts=['mmgen_keys_from_file','in_fmt'])
import mmgen.tx
from mmgen.txsign import txsign
from mmgen.protocol import CoinProtocol
if opt.stealth_led: opt.led = True
if opt.mountpoint: mountpoint = opt.mountpoint # TODO: make global
opt.outdir = tx_dir = os.path.join(mountpoint,'tx')
def check_daemons_running():
if opt.coin:
die(1,'--coin option not supported with this command. Use --coins instead')
if opt.coins:
coins = opt.coins.upper().split(',')
else:
ymsg('Warning: no coins specified, so defaulting to BTC only')
coins = ['BTC']
for coin in coins:
g.proto = CoinProtocol(coin,g.testnet)
vmsg('Checking {} daemon'.format(coin))
try:
rpc_init(reinit=True)
g.rpch.getbalance()
except SystemExit as e:
if e[0] != 0:
ydie(1,'{} daemon not running or not listening on port {}'.format(coin,g.proto.rpc_port))
def get_wallet_files():
wfs = [f for f in os.listdir(wallet_dir) if f[-6:] == '.mmdat']
if not wfs:
die(1,'No wallet files present!')
return [os.path.join(wallet_dir,w) for w in wfs]
def do_mount():
if not os.path.ismount(mountpoint):
if subprocess.Popen(['mount',mountpoint],stderr=subprocess.PIPE,stdout=subprocess.PIPE).wait() == 0:
msg('Mounting '+mountpoint)
try:
ds = os.stat(tx_dir)
assert S_ISDIR(ds.st_mode)
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR
except:
die(1,'{} missing, or not read/writable by user!'.format(tx_dir))
def do_umount():
if os.path.ismount(mountpoint):
subprocess.call(['sync'])
msg('Unmounting '+mountpoint)
subprocess.call(['umount',mountpoint])
def sign_tx_file(txfile):
try:
g.coin = mmgen.tx.MMGenTX(txfile,md_only=True).coin
g.proto = CoinProtocol(g.coin,g.testnet)
reload(sys.modules['mmgen.tx'])
tx = mmgen.tx.MMGenTX(txfile)
rpc_init(reinit=True)
txsign(tx,wfs,None,None)
tx.write_to_file(ask_write=False)
return True
except:
return False
def sign():
dirlist = os.listdir(tx_dir)
raw = [f for f in dirlist if f[-6:] == '.rawtx']
signed = [f[:-6] for f in dirlist if f[-6:] == '.sigtx']
unsigned = [os.path.join(tx_dir,f) for f in raw if f[:-6] not in signed]
if unsigned:
fails = 0
for txfile in unsigned:
ret = sign_tx_file(txfile)
if not ret:
fails += 1
qmsg('')
time.sleep(0.3)
n_ok = len(unsigned) - fails
msg('{} transaction{} signed'.format(n_ok,suf(n_ok)))
if fails:
ymsg('{} transaction{} failed to sign'.format(fails,suf(fails)))
return False if fails else True
else:
msg('No unsigned transactions')
time.sleep(1)
return True
def decrypt_wallets():
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
# opt.passwd_file = '/tmp/key'
from mmgen.seed import SeedSource
msg("Trying to unlock wallet{} with key from '{}'".format(suf(wfs),opt.passwd_file))
fails = 0
for wf in wfs:
try:
SeedSource(wf)
except SystemExit as e:
if e[0] != 0:
fails += 1
return False if fails else True
def do_sign():
if not opt.stealth_led: set_led('busy')
do_mount()
key_ok = decrypt_wallets()
if key_ok:
if opt.stealth_led: set_led('busy')
ret = sign()
do_umount()
set_led(('standby','off','error')[(not ret)*2 or bool(opt.stealth_led)])
return ret
else:
msg('Password is incorrect!')
do_umount()
if not opt.stealth_led: set_led('error')
return False
def wipe_existing_key():
fn = os.path.join(tx_dir,key_fn)
try: os.stat(fn)
except: pass
else:
msg('\nWiping existing key {}'.format(fn))
subprocess.call(['wipe','-cf',fn])
def create_key():
from binascii import hexlify
kdata = hexlify(os.urandom(32))
fn = os.path.join(tx_dir,key_fn)
desc = 'key file {}'.format(fn)
msg('Creating ' + desc)
try:
with open(fn,'w') as f: f.write(kdata+'\n')
os.chmod(fn,0400)
msg('Wrote ' + desc)
except:
die(2,'Unable to write ' + desc)
def gen_key(no_unmount=False):
if not get_insert_status():
die(2,'Removable device not present!')
do_mount()
wipe_existing_key()
create_key()
if not no_unmount:
do_umount()
def create_wallet_dir():
msg("Deleting '{}'".format(wallet_dir))
try: shutil.rmtree(wallet_dir)
except: pass
try: os.mkdir(wallet_dir)
except: pass
try: os.stat(wallet_dir)
except: die(2,"Unable to create wallet directory '{}'".format(wallet_dir))
def setup():
create_wallet_dir()
gen_key(no_unmount=True)
from mmgen.seed import SeedSource
opt.hidden_incog_input_params = None
opt.quiet = True
opt.in_fmt = 'words'
ss_in = SeedSource()
opt.out_fmt = 'mmdat'
opt.usr_randchars = 0
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
from mmgen.obj import MMGenWalletLabel
opt.label = MMGenWalletLabel('Autosign Wallet')
ss_out = SeedSource(ss=ss_in)
ss_out.write_to_file(desc='autosign wallet',outdir=wallet_dir)
def ev_sleep(secs):
ev.wait(secs)
return (False,True)[ev.isSet()]
def do_led(on,off):
if not on:
with open(status_ctl,'w') as f: f.write('0\n')
while True:
if ev_sleep(3600): return
while True:
for s_time,val in ((on,255),(off,0)):
with open(status_ctl,'w') as f: f.write('{}\n'.format(val))
if ev_sleep(s_time): return
def set_led(cmd):
if not opt.led: return
vmsg("Setting LED state to '{}'".format(cmd))
timings = {
'off': ( 0, 0 ),
'standby': ( 2.2, 0.2 ),
'busy': ( 0.06, 0.06 ),
'error': ( 0.5, 0.5 )}[cmd]
global led_thread
if led_thread:
ev.set(); led_thread.join(); ev.clear()
led_thread = threading.Thread(target=do_led,name='LED loop',args=timings)
led_thread.start()
def get_insert_status():
try: os.stat(os.path.join('/dev/disk/by-label/',part_label))
except: return False
else: return True
def do_loop():
n,prev_status = 0,False
if not opt.stealth_led:
set_led('standby')
while True:
status = get_insert_status()
if status and not prev_status:
msg('Device insert detected')
do_sign()
prev_status = status
if not n % 10:
msg_r('\r{}\rWaiting'.format(' '*17))
time.sleep(1)
msg_r('.')
n += 1
def check_access(fn,desc='status LED control',init_val=None):
try:
with open(fn) as f: b = f.read().strip()
with open(fn,'w') as f:
f.write('{}\n'.format(init_val or b))
return True
except:
m1 = "You do not have access to the {} file\n".format(desc)
m2 = "To allow access, run 'sudo chmod 0666 {}'".format(fn)
msg(m1+m2)
return False
def check_wipe_present():
try:
subprocess.Popen(['wipe','-v'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
except:
die(2,"The 'wipe' utility must be installed before running this program")
def init_led():
sc = {
'opi': '/sys/class/leds/orangepi:red:status/brightness',
'rpi': '/sys/class/leds/led0/brightness'
}
tc = {
'rpi': '/sys/class/leds/led0/trigger', # mmc,none
}
for k in ('opi','rpi'):
try: os.stat(sc[k])
except: pass
else:
board = k
break
else:
die(2,'Control files not found! LED option not supported')
status_ctl = sc[board]
trigger_ctl = tc[board] if board in tc else None
if not check_access(status_ctl) or (
trigger_ctl and not check_access(trigger_ctl,desc='LED trigger',init_val='none')
):
sys.exit(1)
if trigger_ctl:
with open(trigger_ctl,'w') as f: f.write('none\n')
return status_ctl,trigger_ctl
def at_exit(exit_val,nl=True):
if opt.led:
set_led('off')
ev.set()
led_thread.join()
if trigger_ctl:
with open(trigger_ctl,'w') as f: f.write('mmc0\n')
if nl: msg('')
sys.exit(exit_val)
def handler(a,b): at_exit(1)
# main()
if len(cmd_args) == 1 and cmd_args[0] in ('gen_key','setup'):
globals()[cmd_args[0]]()
sys.exit(0)
check_wipe_present()
wfs = get_wallet_files()
check_daemons_running()
#sign()
#sys.exit()
if opt.led:
import threading
status_ctl,trigger_ctl = init_led()
ev = threading.Event()
led_thread = None
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
try:
if len(cmd_args) == 1 and cmd_args[0] == 'wait':
do_loop()
elif len(cmd_args) == 0:
ret = do_sign()
at_exit((1,0)[ret],nl=False)
else:
msg('Invalid invocation')
except IOError:
at_exit(2)
except KeyboardInterrupt:
at_exit(1)

View file

@ -33,6 +33,12 @@ class CoinDaemonRPCConnection(object):
dmsg(' host [{}] port [{}] user [{}] passwd [{}] auth_cookie [{}]\n'.format( dmsg(' host [{}] port [{}] user [{}] passwd [{}] auth_cookie [{}]\n'.format(
host,port,user,passwd,auth_cookie)) host,port,user,passwd,auth_cookie))
import socket
try:
socket.create_connection((host,port)).close()
except:
die(1,'Unable to connect to {}:{}'.format(host,port))
if user and passwd: if user and passwd:
self.auth_str = '{}:{}'.format(user,passwd) self.auth_str = '{}:{}'.format(user,passwd)
elif auth_cookie: elif auth_cookie:

View file

@ -54,14 +54,11 @@ def _kb_hold_protect_unix():
def _kb_hold_protect_unix_raw(): pass def _kb_hold_protect_unix_raw(): pass
def _get_keypress_unix(prompt='',immed_chars='',prehold_protect=True): def _get_keypress_unix(prompt='',immed_chars='',prehold_protect=True):
msg_r(prompt) msg_r(prompt)
timeout = float(0.3) timeout = float(0.3)
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
old = termios.tcgetattr(fd) old = termios.tcgetattr(fd)
tty.setcbreak(fd) tty.setcbreak(fd)
while True: while True:
# Protect against held-down key before read() # Protect against held-down key before read()
key = select([sys.stdin], [], [], timeout)[0] key = select([sys.stdin], [], [], timeout)[0]
@ -73,7 +70,6 @@ def _get_keypress_unix(prompt='',immed_chars='',prehold_protect=True):
# Protect against long keypress # Protect against long keypress
key = select([sys.stdin], [], [], timeout)[0] key = select([sys.stdin], [], [], timeout)[0]
if not key: break if not key: break
termios.tcsetattr(fd, termios.TCSADRAIN, old) termios.tcsetattr(fd, termios.TCSADRAIN, old)
return ch return ch
@ -82,17 +78,12 @@ def _get_keypress_unix_stub(prompt='',immed_chars='',prehold_protect=None):
return sys.stdin.read(1) return sys.stdin.read(1)
def _get_keypress_unix_raw(prompt='',immed_chars='',prehold_protect=None): def _get_keypress_unix_raw(prompt='',immed_chars='',prehold_protect=None):
msg_r(prompt) msg_r(prompt)
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
old = termios.tcgetattr(fd) old = termios.tcgetattr(fd)
tty.setcbreak(fd) tty.setcbreak(fd)
ch = sys.stdin.read(1) ch = sys.stdin.read(1)
termios.tcsetattr(fd, termios.TCSADRAIN, old) termios.tcsetattr(fd, termios.TCSADRAIN, old)
return ch return ch
def _kb_hold_protect_mswin(): def _kb_hold_protect_mswin():
@ -203,11 +194,12 @@ def set_terminal_vars():
kb_hold_protect = (_kb_hold_protect_unix_raw,_kb_hold_protect_unix)[g.hold_protect] kb_hold_protect = (_kb_hold_protect_unix_raw,_kb_hold_protect_unix)[g.hold_protect]
if not sys.stdin.isatty(): if not sys.stdin.isatty():
get_char,kb_hold_protect = _get_keypress_unix_stub,_kb_hold_protect_unix_raw get_char,kb_hold_protect = _get_keypress_unix_stub,_kb_hold_protect_unix_raw
get_char_raw = get_char
get_terminal_size = _get_terminal_size_linux get_terminal_size = _get_terminal_size_linux
else: else:
get_char_raw = _get_keypress_mswin_raw get_char_raw = _get_keypress_mswin_raw
get_char = (_get_keypress_mswin_raw,_get_keypress_mswin)[g.hold_protect] get_char = (_get_keypress_mswin_raw,_get_keypress_mswin)[g.hold_protect]
kb_hold_protect = (_kb_hold_protect_mswin_raw,_kb_hold_protect_mswin)[g.hold_protect] kb_hold_protect = (_kb_hold_protect_mswin_raw,_kb_hold_protect_mswin)[g.hold_protect]
if not sys.stdin.isatty(): if not sys.stdin.isatty():
get_char = _get_keypress_mswin_stub get_char = get_char_raw = _get_keypress_mswin_stub
get_terminal_size = _get_terminal_size_mswin get_terminal_size = _get_terminal_size_mswin

View file

@ -143,7 +143,7 @@ def Vmsg_r(s,force=False):
def dmsg(s): def dmsg(s):
if opt.debug: msg(s) if opt.debug: msg(s)
def suf(arg,suf_type): def suf(arg,suf_type='s'):
suf_types = { 's': ('s',''), 'es': ('es','') } suf_types = { 's': ('s',''), 'es': ('es','') }
assert suf_type in suf_types assert suf_type in suf_types
t = type(arg) t = type(arg)

View file

@ -1,373 +0,0 @@
#!/usr/bin/env python
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
mmgen-autosign: Auto-sign MMGen transactions
"""
import sys,os,subprocess,time,signal
from stat import *
mountpoint = '/mnt/tx'
tx_dir = os.path.join(mountpoint,'tx')
part_label = 'MMGEN_TX'
shm_dir = '/dev/shm'
secret_fn = 'txsign-secret'
from mmgen.common import *
prog_name = os.path.basename(sys.argv[0])
opts_data = lambda: {
'desc': 'Auto-sign MMGen transactions',
'usage':'[opts] [command]',
'options': """
-h, --help Print this help message
-c, --coins=c Coins to sign for (comma-separated list)
-l, --led Use status LED to signal standby, busy and error
-s, --stealth-led Stealth LED mode - signal busy and error only, and only
after successful authorization.
-q, --quiet Produce quieter output
-v, --verbose Produce more verbose output
""",
'notes': """
COMMANDS
gen_secret - generate the shared secret and copy to /dev/shm and USB stick
wait - start in loop mode: wait - mount - sign - unmount - wait
USAGE NOTES
If invoked with no command, the program mounts the USB stick, signs any
unsigned transactions, unmounts the USB stick and exits.
If invoked with 'wait', the program waits in a loop, mounting, signing and
unmounting every time the USB stick is inserted. On supported platforms,
the status LED indicates whether the program is busy or in standby mode,
i.e. ready for USB stick insertion or removal.
The USB stick must have a partition labeled MMGEN_TX and a user-writable
directory '/tx', where unsigned MMGen transactions are placed.
On the signing machine the directory /mnt/tx must exist and /etc/fstab must
contain the following entry:
LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0
The signing wallet or wallets must be in MMGen mnemonic format and
present in /dev/shm. The wallet(s) can be created interactively with
the following command:
$ mmgen-walletconv -i words -o words -d /dev/shm
{} checks that a shared secret is present on the USB stick
before signing transactions. The shared secret is generated by invoking
the command with 'gen_secret' with the USB stick inserted. For good
security, it's advisable to re-generate a new shared secret before each
signing session.
Status LED functionality is supported on Orange Pi and Raspberry Pi boards.
This program is a helper script and is not installed by default. You
may copy it to your executable path if you wish, or just run it in place
in the scripts directory of the MMGen repository root where it resides.
""".format(prog_name)
}
cmd_args = opts.init(opts_data,add_opts=['mmgen_keys_from_file','in_fmt'])
import mmgen.tx
from mmgen.txsign import txsign
from mmgen.protocol import CoinProtocol
if opt.stealth_led: opt.led = True
opt.outdir = tx_dir
def check_daemons_running():
if opt.coin:
die(1,'--coin option not supported with this command. Use --coins instead')
if opt.coins:
coins = opt.coins.split(',')
else:
ymsg('Warning: no coins specified, so defaulting to BTC only')
coins = ['btc']
for coin in coins:
cmd = [ 'mmgen-tool',
'--coin={}'.format(coin),
'--testnet={}'.format(opt.testnet or 0)
] + ([],['--quiet'])[bool(opt.quiet)] + ['getbalance']
vmsg('Executing: {}'.format(' '.join(cmd)))
try: subprocess.check_output(cmd)
except: die(1,'{} daemon not running'.format(coin.upper()))
def get_wallet_files():
wfs = [f for f in os.listdir(shm_dir) if f[-8:] == '.mmwords']
if not wfs:
die(1,'No mnemonic files present!')
return [os.path.join(shm_dir,w) for w in wfs]
def get_secret_in_dir(d,on_fail='die'):
fn = os.path.join(d,secret_fn)
try:
with open(fn) as f: ret = f.read().rstrip()
assert is_hex_str(ret) and len(ret) == 32
return ret
except:
msg("Secret file '{}' non-existent, unreadable or in incorrect format!".format(fn))
if on_fail == 'die': sys.exit(1)
else: return None
def do_mount():
if not os.path.ismount(mountpoint):
msg('Mounting '+mountpoint)
subprocess.call(['mount',mountpoint])
try:
ds = os.stat(tx_dir)
assert S_ISDIR(ds.st_mode)
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR
except:
die(1,'{} missing, or not read/writable by user!'.format(tx_dir))
def do_umount():
if os.path.ismount(mountpoint):
subprocess.call(['sync'])
msg('Unmounting '+mountpoint)
subprocess.call(['umount',mountpoint])
def sign_tx_file(txfile):
try:
g.coin = mmgen.tx.MMGenTX(txfile,md_only=True).coin
g.proto = CoinProtocol(g.coin,g.testnet)
reload(sys.modules['mmgen.tx'])
tx = mmgen.tx.MMGenTX(txfile)
rpc_init(reinit=True)
txsign(tx,wfs,None,None)
tx.write_to_file(ask_write=False)
return True
except:
return False
def sign():
dirlist = os.listdir(tx_dir)
raw = [f for f in dirlist if f[-6:] == '.rawtx']
signed = [f[:-6] for f in dirlist if f[-6:] == '.sigtx']
unsigned = [os.path.join(tx_dir,f) for f in raw if f[:-6] not in signed]
if unsigned:
fails = 0
for txfile in unsigned:
ret = sign_tx_file(txfile)
if not ret:
fails += 1
qmsg('')
if fails: ymsg('{} failed signs'.format(fails))
time.sleep(0.3)
return False if fails else True
else:
msg('No unsigned transactions')
time.sleep(1)
return True
def do_sign():
if not opt.stealth_led: set_led('busy')
do_mount()
ret = get_secret_in_dir(tx_dir,on_fail='return')
if ret == secret:
if opt.stealth_led: set_led('busy')
ret = sign()
do_umount()
set_led(('standby','off','error')[(not ret)*2 or bool(opt.stealth_led)])
else:
if ret != None:
msg('Secret is incorrect!')
do_umount()
if not opt.stealth_led: set_led('error')
def wipe_existing_secret_files():
for d in (tx_dir,shm_dir):
fn = os.path.join(d,secret_fn)
try:
os.stat(fn)
except:
pass
else:
msg('\nWiping existing key {}'.format(fn))
subprocess.call(['wipe','-c',fn])
def create_secret_files():
from binascii import hexlify
secret = hexlify(os.urandom(16))
for d in (tx_dir,shm_dir):
fn = os.path.join(d,secret_fn)
desc = 'secret file in {}'.format(d)
msg('Creating ' + desc)
try:
with open(fn,'w') as f: f.write(secret+'\n')
os.chmod(fn,0400)
msg('Wrote ' + desc)
except:
die(2,'Unable to write ' + desc)
def do_create_secret_files():
if not get_insert_status():
die(2,'USB stick not present!')
do_mount()
wipe_existing_secret_files()
create_secret_files()
do_umount()
def ev_sleep(secs):
ev.wait(secs)
return (False,True)[ev.isSet()]
def do_led(on,off):
if not on:
with open(status_ctl,'w') as f: f.write('0\n')
while True:
if ev_sleep(3600): return
while True:
with open(status_ctl,'w') as f: f.write('255\n')
if ev_sleep(on): return
with open(status_ctl,'w') as f: f.write('0\n')
if ev_sleep(off): return
def set_led(cmd):
if not opt.led: return
vmsg("Setting LED state to '{}'".format(cmd))
timings = {
'off': ( 0, 0 ),
'standby': ( 2.2, 0.2 ),
'busy': ( 0.06, 0.06 ),
'error': ( 0.5, 0.5 )}[cmd]
global led_thread
if led_thread:
ev.set(); led_thread.join(); ev.clear()
led_thread = threading.Thread(target=do_led,name='LED loop',args=timings)
led_thread.start()
def get_insert_status():
try: os.stat(os.path.join('/dev/disk/by-label/',part_label))
except: return False
else: return True
def do_loop():
n,prev_status = 0,False
if not opt.stealth_led:
set_led('standby')
while True:
status = get_insert_status()
if status and not prev_status:
msg('Running command...')
do_sign()
prev_status = status
if not n % 10:
msg_r('\r{}\rWaiting'.format(' '*17))
time.sleep(1)
msg_r('.')
n += 1
def check_access(fn,desc='status LED control',init_val=None):
try:
with open(fn) as f: b = f.read().strip()
with open(fn,'w') as f:
f.write('{}\n'.format(init_val or b))
return True
except:
m1 = "You do not have access to the {} file\n".format(desc)
m2 = "To allow access, run 'sudo chmod 0666 {}'".format(fn)
msg(m1+m2)
return False
def check_wipe_present():
try:
subprocess.Popen(['wipe','-v'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
except:
die(2,"The 'wipe' utility must be installed before running this program")
def at_exit(nl=True):
if opt.led:
set_led('off')
ev.set()
led_thread.join()
if board == 'rpi':
with open(trigger[board],'w') as f: f.write('mmc0\n')
if nl: msg('')
raise SystemExit
def handler(a,b): at_exit()
# main()
if len(cmd_args) == 1 and cmd_args[0] == 'gen_secret':
do_create_secret_files()
sys.exit()
if opt.led:
import threading
status = {
'opi': '/sys/class/leds/orangepi:red:status/brightness',
'rpi': '/sys/class/leds/led0/brightness'
}
trigger = {
'rpi': '/sys/class/leds/led0/trigger', # mmc,none
}
for k in ('opi','rpi'):
try: os.stat(status[k])
except: pass
else:
board = k
status_ctl = status[board]
break
else:
die(2,'Control files not found! LED option not supported')
if not check_access(status_ctl) or (
board == 'rpi' and not check_access(trigger[board],desc='LED trigger',init_val='none')):
sys.exit(1)
ev = threading.Event()
led_thread = None
if board == 'rpi':
with open(trigger[board],'w') as f: f.write('none\n')
check_wipe_present()
wfs = get_wallet_files()
secret = get_secret_in_dir(shm_dir,on_fail='die')
check_daemons_running()
#sign(); sys.exit()
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
try:
if len(cmd_args) == 1 and cmd_args[0] == 'wait':
do_loop()
elif len(cmd_args) == 0:
do_sign()
at_exit(nl=False)
else:
msg('Invalid invocation')
except IOError:
at_exit()
except KeyboardInterrupt:
at_exit()

View file

@ -2,7 +2,7 @@
# Tested on Linux, MinGW-64 # Tested on Linux, MinGW-64
# MinGW's bash 3.1.17 doesn't do ${var^^} # MinGW's bash 3.1.17 doesn't do ${var^^}
dfl_tests='obj btc btc_tn btc_rt bch bch_rt ltc ltc_tn ltc_rt tool gen' dfl_tests='obj misc btc btc_tn btc_rt bch bch_rt ltc ltc_tn ltc_rt tool gen'
PROGNAME=$(basename $0) PROGNAME=$(basename $0)
while getopts hinPt OPT while getopts hinPt OPT
do do
@ -16,6 +16,7 @@ do
echo " '-t' Print the tests without running them" echo " '-t' Print the tests without running them"
echo " AVAILABLE TESTS:" echo " AVAILABLE TESTS:"
echo " obj - data objects" echo " obj - data objects"
echo " misc - miscellaneous operations"
echo " btc - bitcoin" echo " btc - bitcoin"
echo " btc_tn - bitcoin testnet" echo " btc_tn - bitcoin testnet"
echo " btc_rt - bitcoin regtest" echo " btc_rt - bitcoin regtest"
@ -92,6 +93,12 @@ t_obj=(
'test/objtest.py --coin=ltc --testnet=1 -S') 'test/objtest.py --coin=ltc --testnet=1 -S')
f_obj='Data object test complete' f_obj='Data object test complete'
i_misc='Miscellaneous operations'
s_misc='The bitcoin, bitcoin-abc and litecoin (mainnet) daemons must be running for the following tests'
t_misc=(
'test/test.py -On misc')
f_misc='Miscellaneous operations test complete'
i_btc='Bitcoin mainnet' i_btc='Bitcoin mainnet'
s_btc='The bitcoin (mainnet) daemon must both be running for the following tests' s_btc='The bitcoin (mainnet) daemon must both be running for the following tests'
t_btc=( t_btc=(

View file

@ -148,7 +148,7 @@ setup(
'mmgen.share.__init__', 'mmgen.share.__init__',
'mmgen.share.Opts', 'mmgen.share.Opts',
], ],
scripts=[ scripts = [
'cmds/mmgen-addrgen', 'cmds/mmgen-addrgen',
'cmds/mmgen-keygen', 'cmds/mmgen-keygen',
'cmds/mmgen-passgen', 'cmds/mmgen-passgen',
@ -163,6 +163,7 @@ setup(
'cmds/mmgen-txsign', 'cmds/mmgen-txsign',
'cmds/mmgen-txsend', 'cmds/mmgen-txsend',
'cmds/mmgen-txdo', 'cmds/mmgen-txdo',
'cmds/mmgen-tool' 'cmds/mmgen-tool',
'cmds/mmgen-autosign'
] ]
) )

View file

@ -248,6 +248,9 @@ class MMGenPexpect(object):
def interactive(self): def interactive(self):
return self.p.interact() # interact() not available with popen_spawn return self.p.interact() # interact() not available with popen_spawn
def kill(self,signal):
return self.p.kill(signal)
def logfile(self,arg): def logfile(self,arg):
self.p.logfile = arg self.p.logfile = arg

View file

@ -112,9 +112,13 @@ tests = OrderedDict([
({'idx_list':AddrIdxList('1-5')},[1,2,3,4,5]) ({'idx_list':AddrIdxList('1-5')},[1,2,3,4,5])
)}), )}),
('BTCAmt', { ('BTCAmt', {
'bad': ('-3.2','0.123456789',123L,'123L',22000000,20999999.12345678), 'bad': ('-3.2','0.123456789',123L,'123L','22000000',20999999.12345678),
'good': (('20999999.12345678',Decimal('20999999.12345678')),) 'good': (('20999999.12345678',Decimal('20999999.12345678')),)
}), }),
('LTCAmt', {
'bad': ('-3.2','0.123456789',123L,'123L','88000000',80999999.12345678),
'good': (('80999999.12345678',Decimal('80999999.12345678')),)
}),
('CoinAddr', { ('CoinAddr', {
'bad': (1,'x','я'), 'bad': (1,'x','я'),
'good': { 'good': {

View file

@ -204,6 +204,9 @@ cfgs = {
'17': { '17': {
'tmpdir': os.path.join('test','tmp17'), 'tmpdir': os.path.join('test','tmp17'),
}, },
'18': {
'tmpdir': os.path.join('test','tmp18'),
},
'1': { '1': {
'tmpdir': os.path.join('test','tmp1'), 'tmpdir': os.path.join('test','tmp1'),
'wpasswd': 'Dorian', 'wpasswd': 'Dorian',
@ -668,6 +671,10 @@ cmd_group['regtest'] = (
('regtest_stop', 'stopping regtest daemon'), ('regtest_stop', 'stopping regtest daemon'),
) )
cmd_group['misc'] = (
('autosign', 'transaction autosigning (BTC,BCH,LTC)'),
)
# undocumented admin cmds # undocumented admin cmds
cmd_group_admin = OrderedDict() cmd_group_admin = OrderedDict()
cmd_group_admin['create_ref_tx'] = ( cmd_group_admin['create_ref_tx'] = (
@ -736,6 +743,11 @@ for a,b in cmd_group['regtest']:
cmd_list['regtest'].append(a) cmd_list['regtest'].append(a)
cmd_data[a] = (17,b,[[[],17]]) cmd_data[a] = (17,b,[[[],17]])
cmd_data['info_misc'] = 'miscellaneous operations',[18]
for a,b in cmd_group['misc']:
cmd_list['misc'].append(a)
cmd_data[a] = (18,b,[[[],18]])
utils = { utils = {
'check_deps': 'check dependencies for specified command', 'check_deps': 'check dependencies for specified command',
'clean': 'clean specified tmp dir(s) 1,2,3,4,5 or 6 (no arg = all dirs)', 'clean': 'clean specified tmp dir(s) 1,2,3,4,5 or 6 (no arg = all dirs)',
@ -900,7 +912,7 @@ class MMGenExpect(MMGenPexpect):
def __init__(self,name,mmgen_cmd,cmd_args=[],extra_desc='',no_output=False,msg_only=False): def __init__(self,name,mmgen_cmd,cmd_args=[],extra_desc='',no_output=False,msg_only=False):
desc = (cmd_data[name][1],name)[bool(opt.names)] + (' ' + extra_desc).strip() desc = ((cmd_data[name][1],name)[bool(opt.names)] + (' ' + extra_desc)).strip()
passthru_args = ['testnet','rpc_host','rpc_port','regtest','coin'] passthru_args = ['testnet','rpc_host','rpc_port','regtest','coin']
if not opt.system: if not opt.system:
@ -1230,7 +1242,7 @@ class MMGenTestSuite(object):
def helpscreens(self,name,arg='--help'): def helpscreens(self,name,arg='--help'):
scripts = ( scripts = (
'walletgen','walletconv','walletchk','txcreate','txsign','txsend','txdo','txbump', 'walletgen','walletconv','walletchk','txcreate','txsign','txsend','txdo','txbump',
'addrgen','addrimport','keygen','passchg','tool','passgen','regtest') 'addrgen','addrimport','keygen','passchg','tool','passgen','regtest','autosign')
for s in scripts: for s in scripts:
t = MMGenExpect(name,('mmgen-'+s),[arg],extra_desc='(mmgen-%s)'%s,no_output=True) t = MMGenExpect(name,('mmgen-'+s),[arg],extra_desc='(mmgen-%s)'%s,no_output=True)
t.read() t.read()
@ -1805,6 +1817,45 @@ class MMGenTestSuite(object):
os.unlink(f1) os.unlink(f1)
cmp_or_die(hincog_offset,int(o)) cmp_or_die(hincog_offset,int(o))
# Miscellaneous tests
def autosign(self,name):
if g.platform == 'win':
msg('Skipping {} (not supported)'.format(name)); return
fdata = (('btc',''),('bch',''),('ltc','litecoin'))
tfns = [cfgs['8']['ref_tx_file'][c].format('') for c,d in fdata]
tfs = [os.path.join(ref_dir,d[1],fn) for d,fn in zip(fdata,tfns)]
try: os.mkdir(os.path.join(cfg['tmpdir'],'tx'))
except: pass
for f,fn in zip(tfs,tfns):
shutil.copyfile(f,os.path.join(cfg['tmpdir'],'tx',fn))
# make a bad tx file
with open(os.path.join(cfg['tmpdir'],'tx','bad.rawtx'),'w') as f:
f.write('bad tx data')
ls = os.listdir(cfg['tmpdir'])
opts = ['--mountpoint='+cfg['tmpdir'],'--coins=btc,bch,ltc']
# opts += ['--quiet']
mn_fn = os.path.join(ref_dir,cfgs['8']['seed_id']+'.mmwords')
mn = read_from_file(mn_fn).strip().split()
t = MMGenExpect(name,'mmgen-autosign',opts+['gen_key'],extra_desc='(gen_key)')
t.expect_getend('Wrote key file ')
t.ok()
t = MMGenExpect(name,'mmgen-autosign',opts+['setup'],extra_desc='(setup)')
t.expect('words: ','3')
t.expect('OK? (Y/n): ','\n')
for i in range(24):
t.expect('word #{}: '.format(i+1),mn[i]+'\n')
wf = t.written_to_file('Autosign wallet')
t.ok()
t = MMGenExpect(name,'mmgen-autosign',opts+['wait'],extra_desc='(sign)')
t.expect('3 transactions signed')
t.expect('1 transaction failed to sign')
t.expect('Waiting.')
t.kill(2)
t.ok(exit_val=1)
# Saved reference file tests # Saved reference file tests
def ref_wallet_conv(self,name): def ref_wallet_conv(self,name):
wf = os.path.join(ref_dir,cfg['ref_wallet']) wf = os.path.join(ref_dir,cfg['ref_wallet'])
@ -2493,8 +2544,8 @@ start_time = int(time.time())
def end_msg(): def end_msg():
t = int(time.time()) - start_time t = int(time.time()) - start_time
m = '{} tests performed. Elapsed time: {:02d}:{:02d}\n' m = '{} test{} performed. Elapsed time: {:02d}:{:02d}\n'
sys.stderr.write(green(m.format(cmd_total,t/60,t%60))) sys.stderr.write(green(m.format(cmd_total,suf(cmd_total),t/60,t%60)))
ts = MMGenTestSuite() ts = MMGenTestSuite()
@ -2524,7 +2575,7 @@ try:
else: else:
clean() clean()
for cmd in cmd_data: for cmd in cmd_data:
if cmd == 'info_regtest': break # don't run these by default if cmd == 'info_regtest': break # don't run everything after this by default
if cmd[:5] == 'info_': if cmd[:5] == 'info_':
msg(green('%sTesting %s' % (('\n','')[bool(opt.resume)],cmd_data[cmd][0]))) msg(green('%sTesting %s' % (('\n','')[bool(opt.resume)],cmd_data[cmd][0])))
continue continue