@@ -33,6 +33,7 @@ key_fn = 'autosign.key'
from .common import *
prog_name = os.path.basename(sys.argv[0])
opts_data = {
+ 'sets': [('stealth_led', True, 'led', True)],
'text': {
'desc': 'Auto-sign MMGen transactions',
'usage':'[opts] [command]',
@@ -43,6 +44,7 @@ opts_data = {
-I, --no-insert-check Don't check for device insertion
-l, --led Use status LED to signal standby, busy and error
-m, --mountpoint=m Specify an alternate mountpoint (default: '{mp}')
+-n, --no-summary Don't print a transaction summary
-s, --stealth-led Stealth LED mode - signal busy and error only, and only
after successful authorization.
-S, --full-summary Print a full summary of each signed transaction after
@@ -116,9 +118,9 @@ from .protocol import CoinProtocol,init_coin
if g.test_suite:
from .daemon import CoinDaemon
-if opt.stealth_led: opt.led = True
+if opt.mountpoint:
+ mountpoint = opt.mountpoint # TODO: make global
-if opt.mountpoint: mountpoint = opt.mountpoint # TODO: make global
opt.outdir = tx_dir = os.path.join(mountpoint,'tx')
def check_daemons_running():
@@ -132,47 +134,44 @@ def check_daemons_running():
for coin in coins:
g.proto = CoinProtocol(coin,g.testnet)
- if g.proto.sign_mode != 'daemon':
- continue
- if g.test_suite:
- g.proto.daemon_data_dir = 'test/daemons/' + coin.lower()
- g.rpc_port = CoinDaemon(get_network_id(coin,g.testnet),test_suite=True).rpc_port
- vmsg('Checking {} daemon'.format(coin))
- try:
- rpc_init(reinit=True)
- g.rpch.getblockcount()
- except SystemExit as e:
- if e.code != 0:
- fs = '{} daemon not running or not listening on port {}'
- ydie(1,fs.format(coin,g.proto.rpc_port))
+ if g.proto.sign_mode == 'daemon':
+ if g.test_suite:
+ g.proto.daemon_data_dir = 'test/daemons/' + coin.lower()
+ g.rpc_port = CoinDaemon(get_network_id(coin,g.testnet),test_suite=True).rpc_port
+ vmsg(f'Checking {coin} daemon')
+ try:
+ rpc_init(reinit=True)
+ except SystemExit as e:
+ if e.code != 0:
+ ydie(1,f'{coin} daemon not running or not listening on port {g.proto.rpc_port}')
def get_wallet_files():
- m = "Cannot open wallet directory '{}'. Did you run 'mmgen-autosign setup'?"
- try: dlist = os.listdir(wallet_dir)
- except: die(1,m.format(wallet_dir))
+ try:
+ dlist = os.listdir(wallet_dir)
+ except:
+ die(1,f"Cannot open wallet directory {wallet_dir!r}. Did you run 'mmgen-autosign setup'?")
- wfs = [x for x in dlist if x[-6:] == '.mmdat']
- if not wfs:
+ fns = [x for x in dlist if x.endswith('.mmdat')]
+ if fns:
+ return [os.path.join(wallet_dir,w) for w in fns]
+ else:
die(1,'No wallet files present!')
- return [os.path.join(wallet_dir,w) for w in wfs]
def do_mount():
if not os.path.ismount(mountpoint):
if run(['mount',mountpoint],stderr=DEVNULL,stdout=DEVNULL).returncode == 0:
- msg('Mounting '+mountpoint)
+ msg(f'Mounting {mountpoint}')
ds = os.stat(tx_dir)
- m1 = "'{}' is not a directory!"
- m2 = "'{}' is not read/write for this user!"
- assert S_ISDIR(ds.st_mode),m1.format(tx_dir)
- assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR,m2.format(tx_dir)
+ assert S_ISDIR(ds.st_mode), f'{tx_dir!r} is not a directory!'
+ assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR,f'{tx_dir!r} is not read/write for this user!'
- die(1,'{} missing, or not read/writable by user!'.format(tx_dir))
+ die(1,'{tx_dir!r} missing, or not read/writable by user!')
def do_umount():
if os.path.ismount(mountpoint):
- msg('Unmounting '+mountpoint)
+ msg(f'Unmounting {mountpoint}')
def sign_tx_file(txfile,signed_txs):
@@ -187,8 +186,8 @@ def sign_tx_file(txfile,signed_txs):
if hasattr(g.proto,'chain_name'):
- m = 'Chains do not match! tx file: {}, proto: {}'
- assert tmp_tx.chain == g.proto.chain_name,m.format(tmp_tx.chain,g.proto.chain_name)
+ if tmp_tx.chain != g.proto.chain_name:
+ die(2, f'Chains do not match! tx file: {tmp_tx.chain}, proto: {g.proto.chain_name}')
g.chain = tmp_tx.chain
g.token = tmp_tx.dcoin
@@ -209,18 +208,17 @@ def sign_tx_file(txfile,signed_txs):
return False
except Exception as e:
- msg('An error occurred: {}'.format(e.args[0]))
+ msg(f'An error occurred: {e.args[0]}')
if g.debug or g.traceback:
- print_stack_trace('AUTOSIGN {}'.format(txfile))
+ print_stack_trace(f'AUTOSIGN {txfile}')
return False
return False
def sign():
dirlist = os.listdir(tx_dir)
- raw = [f for f in dirlist if f[-6:] == '.rawtx']
- signed = [f[:-6] for f in dirlist if f[-6:] == '.sigtx']
- unsigned = [os.path.join(tx_dir,f) for f in raw if f[:-6] not in signed]
+ raw,signed = [set(f[:-6] for f in dirlist if f.endswith(ext)) for ext in ('.rawtx','.sigtx')]
+ unsigned = [os.path.join(tx_dir,f+'.rawtx') for f in raw - signed]
if unsigned:
signed_txs,fails = [],[]
@@ -233,10 +231,10 @@ def sign():
msg('{} transaction{} signed'.format(len(signed_txs),suf(signed_txs)))
if fails:
rmsg('{} transaction{} failed to sign'.format(len(fails),suf(fails)))
- if signed_txs:
+ if signed_txs and not opt.no_summary:
if fails:
- rmsg('{}Failed transactions:'.format('' if opt.full_summary else '\n'))
+ rmsg('\nFailed transactions:')
rmsg(' ' + '\n '.join(sorted(fails)) + '\n')
return False if fails else True
@@ -248,7 +246,6 @@ def decrypt_wallets():
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
-# opt.passwd_file = '/tmp/key'
from .wallet import Wallet
msg("Unlocking wallet{} with key from '{}'".format(suf(wfs),opt.passwd_file))
fails = 0
@@ -261,43 +258,52 @@ def decrypt_wallets():
return False if fails else True
def print_summary(signed_txs):
if opt.full_summary:
bmsg('\nAutosign summary:\n')
- for tx in signed_txs:
- init_coin(tx.coin,tx.chain == 'testnet')
- msg_r(tx.format_view(terse=True))
+ def gen():
+ for tx in signed_txs:
+ init_coin(tx.coin,tx.chain == 'testnet')
+ yield tx.format_view(terse=True)
+ msg_r(''.join(gen()))
- body = []
- for tx in signed_txs:
- non_mmgen = [o for o in tx.outputs if not o.mmid]
- if non_mmgen:
- body.append((tx,non_mmgen))
+ def gen():
+ for tx in signed_txs:
+ non_mmgen = [o for o in tx.outputs if not o.mmid]
+ if non_mmgen:
+ yield (tx,non_mmgen)
+ body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid,a_wid = 6,44
- msg(fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount'))
- msg(fs.format('-'*t_wid, '-'*a_wid, '-'*7))
- for tx,non_mmgen in body:
- for nm in non_mmgen:
- msg(fs.format(
- tx.txid.fmt(width=t_wid,color=True) if nm is non_mmgen[0] else ' '*t_wid,
- nm.addr.fmt(width=a_wid,color=True),
- nm.amt.hl() + ' ' + yellow(tx.coin)))
+ def gen():
+ yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
+ yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
+ for tx,non_mmgen in body:
+ for nm in non_mmgen:
+ yield fs.format(
+ tx.txid.fmt(width=t_wid,color=True) if nm is non_mmgen[0] else ' '*t_wid,
+ nm.addr.fmt(width=a_wid,color=True),
+ nm.amt.hl() + ' ' + yellow(tx.coin))
+ msg('\n'.join(gen()))
msg('No non-MMGen outputs')
def do_sign():
- if not opt.stealth_led: set_led('busy')
+ if not opt.stealth_led:
+ set_led('busy')
key_ok = decrypt_wallets()
if key_ok:
- if opt.stealth_led: set_led('busy')
+ if opt.stealth_led:
+ set_led('busy')
ret = sign()
set_led(('standby','off','error')[(not ret)*2 or bool(opt.stealth_led)])
@@ -305,7 +311,8 @@ def do_sign():
msg('Password is incorrect!')
- if not opt.stealth_led: set_led('error')
+ if not opt.stealth_led:
+ set_led('error')
return False
def wipe_existing_key():
@@ -397,7 +404,8 @@ def set_led(cmd):
def get_insert_status():
- if opt.no_insert_check: return True
+ if opt.no_insert_check:
+ return True
try: os.stat(os.path.join('/dev/disk/by-label',part_label))
except: return False
else: return True
@@ -471,11 +479,12 @@ if len(cmd_args) not in (0,1):
if len(cmd_args) == 1:
- if cmd_args[0] in ('gen_key','setup'):
- globals()[cmd_args[0]]()
+ cmd = cmd_args[0]
+ if cmd in ('gen_key','setup'):
+ globals()[cmd]()
- elif cmd_args[0] != 'wait':
- die(1,"'{}': unrecognized command".format(cmd_args[0]))
+ elif cmd != 'wait':
+ die(1,f'{cmd!r}: unrecognized command')
wfs = get_wallet_files()