Monero,autosign: minor improvements

- Monero (create,sync): specify addr range; display total balance for all wallets
- autosign: improve exception handling, launch the standard way
This commit is contained in:
The MMGen Project 2018-02-10 19:54:17 +03:00
commit 65e2db0512
Signed by: mmgen
GPG key ID: 62DBE9E5212F05BE
8 changed files with 499 additions and 446 deletions

View file

@ -116,7 +116,7 @@ future use in an address file, which addresses may safely be made public.
[**Forum**][4] |
[Reddit][0] |
[PGP Public Key][5] |
Donate: 15TLdmi5NYLdqmtCqczUs5pBPkJDXRs83w
Donate (BTC,BCH): 15TLdmi5NYLdqmtCqczUs5pBPkJDXRs83w
[0]: https://www.reddit.com/user/mmgen-py
[1]: https://github.com/mmgen/mmgen/wiki/Install-MMGen-on-Microsoft-Windows

View file

@ -1,432 +1,24 @@
#!/usr/bin/env python
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2018 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
"""
mmgen-autosign: Auto-sign MMGen transactions
"""
import sys,os,subprocess,time,signal,shutil
from stat import *
mountpoint = '/mnt/tx'
tx_dir = '/mnt/tx/tx'
part_label = 'MMGEN_TX'
wallet_dir = '/dev/shm/autosign'
key_fn = 'autosign.key'
from mmgen.common import *
prog_name = os.path.basename(sys.argv[0])
opts_data = lambda: {
'desc': 'Auto-sign MMGen transactions',
'usage':'[opts] [command]',
'options': """
-h, --help Print this help message
--, --longhelp Print help message for long options (common options)
-c, --coins=c Coins to sign for (comma-separated list)
-l, --led Use status LED to signal standby, busy and error
-m, --mountpoint=m Specify an alternate mountpoint (default: '{mp}')
-s, --stealth-led Stealth LED mode - signal busy and error only, and only
after successful authorization.
-q, --quiet Produce quieter output
-v, --verbose Produce more verbose output
""".format(mp=mountpoint),
'notes': """
COMMANDS
gen_key - generate the wallet encryption key and copy it to '{td}'
setup - generate the wallet encryption key and wallet
wait - start in loop mode: wait-mount-sign-unmount-wait
USAGE NOTES
If invoked with no command, the program mounts a removable device containing
MMGen transactions, signs any unsigned transactions, unmounts the removable
device and exits.
If invoked with 'wait', the program waits in a loop, mounting, signing and
unmounting every time the removable device is inserted.
On supported platforms (currently Orange Pi and Raspberry Pi boards), the
status LED indicates whether the program is busy or in standby mode, i.e.
ready for device insertion or removal.
The removable device must have a partition labeled MMGEN_TX and a user-
writable directory '/tx', where unsigned MMGen transactions are placed.
On the signing machine the mount point '{mp}' must exist and /etc/fstab
must contain the following entry:
LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0
Transactions are signed with a wallet on the signing machine (in the directory
'{wd}') encrypted with a 64-character hexadecimal password on the
removable device.
The password and wallet can be created in one operation by invoking the
command with 'setup' with the removable device inserted. The user will be
prompted for a seed mnemonic.
Alternatively, the password and wallet can be created separately by first
invoking the command with 'gen_key' and then creating and encrypting the
wallet using the -P (--passwd-file) option:
$ mmgen-walletconv -r0 -q -iwords -d{wd} -p1 -P{td}/{kf} -Llabel
Note that the hash preset must be '1'. Multiple wallets are permissible.
For good security, it's advisable to re-generate a new wallet and key for
each signing session.
This command is currently available only on Linux-based platforms.
""".format(pnm=prog_name,wd=wallet_dir,td=tx_dir,kf=key_fn,mp=mountpoint)
}
cmd_args = opts.init(opts_data,add_opts=['mmgen_keys_from_file','in_fmt'])
import mmgen.tx
from mmgen.txsign import txsign
from mmgen.protocol import CoinProtocol,init_coin
if opt.stealth_led: opt.led = True
if opt.mountpoint: mountpoint = opt.mountpoint # TODO: make global
opt.outdir = tx_dir = os.path.join(mountpoint,'tx')
def check_daemons_running():
if opt.coin:
die(1,'--coin option not supported with this command. Use --coins instead')
if opt.coins:
coins = opt.coins.upper().split(',')
else:
ymsg('Warning: no coins specified, so defaulting to BTC only')
coins = ['BTC']
for coin in coins:
g.proto = CoinProtocol(coin,g.testnet)
vmsg('Checking {} daemon'.format(coin))
try:
rpc_init(reinit=True)
g.rpch.getbalance()
except SystemExit as e:
if e[0] != 0:
ydie(1,'{} daemon not running or not listening on port {}'.format(coin,g.proto.rpc_port))
def get_wallet_files():
wfs = [f for f in os.listdir(wallet_dir) if f[-6:] == '.mmdat']
if not wfs:
die(1,'No wallet files present!')
return [os.path.join(wallet_dir,w) for w in wfs]
def do_mount():
if not os.path.ismount(mountpoint):
if subprocess.Popen(['mount',mountpoint],stderr=subprocess.PIPE,stdout=subprocess.PIPE).wait() == 0:
msg('Mounting '+mountpoint)
try:
ds = os.stat(tx_dir)
assert S_ISDIR(ds.st_mode)
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR
except:
die(1,'{} missing, or not read/writable by user!'.format(tx_dir))
def do_umount():
if os.path.ismount(mountpoint):
subprocess.call(['sync'])
msg('Unmounting '+mountpoint)
subprocess.call(['umount',mountpoint])
def sign_tx_file(txfile):
try:
init_coin(mmgen.tx.MMGenTX(txfile,md_only=True).coin)
reload(sys.modules['mmgen.tx'])
tx = mmgen.tx.MMGenTX(txfile)
rpc_init(reinit=True)
txsign(tx,wfs,None,None)
tx.write_to_file(ask_write=False)
return True
except:
return False
def sign():
dirlist = os.listdir(tx_dir)
raw = [f for f in dirlist if f[-6:] == '.rawtx']
signed = [f[:-6] for f in dirlist if f[-6:] == '.sigtx']
unsigned = [os.path.join(tx_dir,f) for f in raw if f[:-6] not in signed]
if unsigned:
fails = 0
for txfile in unsigned:
ret = sign_tx_file(txfile)
if not ret:
fails += 1
qmsg('')
time.sleep(0.3)
n_ok = len(unsigned) - fails
msg('{} transaction{} signed'.format(n_ok,suf(n_ok)))
if fails:
ymsg('{} transaction{} failed to sign'.format(fails,suf(fails)))
return False if fails else True
else:
msg('No unsigned transactions')
time.sleep(1)
return True
def decrypt_wallets():
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
# opt.passwd_file = '/tmp/key'
from mmgen.seed import SeedSource
msg("Unlocking wallet{} with key from '{}'".format(suf(wfs),opt.passwd_file))
fails = 0
for wf in wfs:
try:
SeedSource(wf)
except SystemExit as e:
if e[0] != 0:
fails += 1
return False if fails else True
def do_sign():
if not opt.stealth_led: set_led('busy')
do_mount()
key_ok = decrypt_wallets()
if key_ok:
if opt.stealth_led: set_led('busy')
ret = sign()
do_umount()
set_led(('standby','off','error')[(not ret)*2 or bool(opt.stealth_led)])
return ret
else:
msg('Password is incorrect!')
do_umount()
if not opt.stealth_led: set_led('error')
return False
def wipe_existing_key():
fn = os.path.join(tx_dir,key_fn)
try: os.stat(fn)
except: pass
else:
msg('\nWiping existing key {}'.format(fn))
subprocess.call(['wipe','-cf',fn])
def create_key():
from binascii import hexlify
kdata = hexlify(os.urandom(32))
fn = os.path.join(tx_dir,key_fn)
desc = 'key file {}'.format(fn)
msg('Creating ' + desc)
try:
with open(fn,'w') as f: f.write(kdata+'\n')
os.chmod(fn,0400)
msg('Wrote ' + desc)
except:
die(2,'Unable to write ' + desc)
def gen_key(no_unmount=False):
create_wallet_dir()
if not get_insert_status():
die(2,'Removable device not present!')
do_mount()
wipe_existing_key()
create_key()
if not no_unmount:
do_umount()
def remove_wallet_dir():
msg("Deleting '{}'".format(wallet_dir))
try: shutil.rmtree(wallet_dir)
except: pass
def create_wallet_dir():
try: os.mkdir(wallet_dir)
except: pass
try: os.stat(wallet_dir)
except: die(2,"Unable to create wallet directory '{}'".format(wallet_dir))
def setup():
remove_wallet_dir()
gen_key(no_unmount=True)
from mmgen.seed import SeedSource
opt.hidden_incog_input_params = None
opt.quiet = True
opt.in_fmt = 'words'
ss_in = SeedSource()
opt.out_fmt = 'mmdat'
opt.usr_randchars = 0
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
from mmgen.obj import MMGenWalletLabel
opt.label = MMGenWalletLabel('Autosign Wallet')
ss_out = SeedSource(ss=ss_in)
ss_out.write_to_file(desc='autosign wallet',outdir=wallet_dir)
def ev_sleep(secs):
ev.wait(secs)
return (False,True)[ev.isSet()]
def do_led(on,off):
if not on:
with open(status_ctl,'w') as f: f.write('0\n')
while True:
if ev_sleep(3600): return
while True:
for s_time,val in ((on,255),(off,0)):
with open(status_ctl,'w') as f: f.write('{}\n'.format(val))
if ev_sleep(s_time): return
def set_led(cmd):
if not opt.led: return
vmsg("Setting LED state to '{}'".format(cmd))
timings = {
'off': ( 0, 0 ),
'standby': ( 2.2, 0.2 ),
'busy': ( 0.06, 0.06 ),
'error': ( 0.5, 0.5 )}[cmd]
global led_thread
if led_thread:
ev.set(); led_thread.join(); ev.clear()
led_thread = threading.Thread(target=do_led,name='LED loop',args=timings)
led_thread.start()
def get_insert_status():
if os.getenv('MMGEN_TEST_SUITE'): return True
try: os.stat(os.path.join('/dev/disk/by-label/',part_label))
except: return False
else: return True
def do_loop():
n,prev_status = 0,False
if not opt.stealth_led:
set_led('standby')
while True:
status = get_insert_status()
if status and not prev_status:
msg('Device insert detected')
do_sign()
prev_status = status
if not n % 10:
msg_r('\r{}\rWaiting'.format(' '*17))
time.sleep(1)
msg_r('.')
n += 1
def check_access(fn,desc='status LED control',init_val=None):
try:
with open(fn) as f: b = f.read().strip()
with open(fn,'w') as f:
f.write('{}\n'.format(init_val or b))
return True
except:
m1 = "You do not have access to the {} file\n".format(desc)
m2 = "To allow access, run 'sudo chmod 0666 {}'".format(fn)
msg(m1+m2)
return False
def check_wipe_present():
try:
subprocess.Popen(['wipe','-v'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
except:
die(2,"The 'wipe' utility must be installed before running this program")
def init_led():
sc = {
'opi': '/sys/class/leds/orangepi:red:status/brightness',
'rpi': '/sys/class/leds/led0/brightness'
}
tc = {
'rpi': '/sys/class/leds/led0/trigger', # mmc,none
}
for k in ('opi','rpi'):
try: os.stat(sc[k])
except: pass
else:
board = k
break
else:
die(2,'Control files not found! LED option not supported')
status_ctl = sc[board]
trigger_ctl = tc[board] if board in tc else None
if not check_access(status_ctl) or (
trigger_ctl and not check_access(trigger_ctl,desc='LED trigger',init_val='none')
):
sys.exit(1)
if trigger_ctl:
with open(trigger_ctl,'w') as f: f.write('none\n')
return status_ctl,trigger_ctl
def at_exit(exit_val,nl=True):
if opt.led:
set_led('off')
ev.set()
led_thread.join()
if trigger_ctl:
with open(trigger_ctl,'w') as f: f.write('mmc0\n')
if nl: msg('')
sys.exit(exit_val)
def handler(a,b): at_exit(1)
# main()
if len(cmd_args) == 1:
if cmd_args[0] in ('gen_key','setup'):
globals()[cmd_args[0]]()
sys.exit(0)
elif cmd_args[0] == 'wait':
pass
else:
die(2,"'{}': unrecognized command".format(cmd_args[0]))
check_wipe_present()
wfs = get_wallet_files()
check_daemons_running()
#sign()
#sys.exit()
if opt.led:
import threading
status_ctl,trigger_ctl = init_led()
ev = threading.Event()
led_thread = None
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
try:
if len(cmd_args) == 1 and cmd_args[0] == 'wait':
do_loop()
elif len(cmd_args) == 0:
ret = do_sign()
at_exit((1,0)[ret],nl=False)
else:
msg('Invalid invocation')
except IOError:
at_exit(2)
except KeyboardInterrupt:
at_exit(1)
from mmgen.main import launch
launch("autosign")

425
mmgen/main_autosign.py Executable file
View file

@ -0,0 +1,425 @@
#!/usr/bin/env python
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2018 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
mmgen-autosign: Auto-sign MMGen transactions
"""
import sys,os,subprocess,time,signal,shutil
from stat import *
mountpoint = '/mnt/tx'
tx_dir = '/mnt/tx/tx'
part_label = 'MMGEN_TX'
wallet_dir = '/dev/shm/autosign'
key_fn = 'autosign.key'
from mmgen.common import *
prog_name = os.path.basename(sys.argv[0])
opts_data = lambda: {
'desc': 'Auto-sign MMGen transactions',
'usage':'[opts] [command]',
'options': """
-h, --help Print this help message
--, --longhelp Print help message for long options (common options)
-c, --coins=c Coins to sign for (comma-separated list)
-l, --led Use status LED to signal standby, busy and error
-m, --mountpoint=m Specify an alternate mountpoint (default: '{mp}')
-s, --stealth-led Stealth LED mode - signal busy and error only, and only
after successful authorization.
-q, --quiet Produce quieter output
-v, --verbose Produce more verbose output
""".format(mp=mountpoint),
'notes': """
COMMANDS
gen_key - generate the wallet encryption key and copy it to '{td}'
setup - generate the wallet encryption key and wallet
wait - start in loop mode: wait-mount-sign-unmount-wait
USAGE NOTES
If invoked with no command, the program mounts a removable device containing
MMGen transactions, signs any unsigned transactions, unmounts the removable
device and exits.
If invoked with 'wait', the program waits in a loop, mounting, signing and
unmounting every time the removable device is inserted.
On supported platforms (currently Orange Pi and Raspberry Pi boards), the
status LED indicates whether the program is busy or in standby mode, i.e.
ready for device insertion or removal.
The removable device must have a partition labeled MMGEN_TX and a user-
writable directory '/tx', where unsigned MMGen transactions are placed.
On the signing machine the mount point '{mp}' must exist and /etc/fstab
must contain the following entry:
LABEL='MMGEN_TX' /mnt/tx auto noauto,user 0 0
Transactions are signed with a wallet on the signing machine (in the directory
'{wd}') encrypted with a 64-character hexadecimal password on the
removable device.
The password and wallet can be created in one operation by invoking the
command with 'setup' with the removable device inserted. The user will be
prompted for a seed mnemonic.
Alternatively, the password and wallet can be created separately by first
invoking the command with 'gen_key' and then creating and encrypting the
wallet using the -P (--passwd-file) option:
$ mmgen-walletconv -r0 -q -iwords -d{wd} -p1 -P{td}/{kf} -Llabel
Note that the hash preset must be '1'. Multiple wallets are permissible.
For good security, it's advisable to re-generate a new wallet and key for
each signing session.
This command is currently available only on Linux-based platforms.
""".format(pnm=prog_name,wd=wallet_dir,td=tx_dir,kf=key_fn,mp=mountpoint)
}
cmd_args = opts.init(opts_data,add_opts=['mmgen_keys_from_file','in_fmt'])
import mmgen.tx
from mmgen.txsign import txsign
from mmgen.protocol import CoinProtocol,init_coin
if opt.stealth_led: opt.led = True
if opt.mountpoint: mountpoint = opt.mountpoint # TODO: make global
opt.outdir = tx_dir = os.path.join(mountpoint,'tx')
def check_daemons_running():
if opt.coin:
die(1,'--coin option not supported with this command. Use --coins instead')
if opt.coins:
coins = opt.coins.upper().split(',')
else:
ymsg('Warning: no coins specified, so defaulting to BTC only')
coins = ['BTC']
for coin in coins:
g.proto = CoinProtocol(coin,g.testnet)
vmsg('Checking {} daemon'.format(coin))
try:
rpc_init(reinit=True)
g.rpch.getbalance()
except SystemExit as e:
if e[0] != 0:
ydie(1,'{} daemon not running or not listening on port {}'.format(coin,g.proto.rpc_port))
def get_wallet_files():
wfs = [f for f in os.listdir(wallet_dir) if f[-6:] == '.mmdat']
if not wfs:
die(1,'No wallet files present!')
return [os.path.join(wallet_dir,w) for w in wfs]
def do_mount():
if not os.path.ismount(mountpoint):
if subprocess.Popen(['mount',mountpoint],stderr=subprocess.PIPE,stdout=subprocess.PIPE).wait() == 0:
msg('Mounting '+mountpoint)
try:
ds = os.stat(tx_dir)
assert S_ISDIR(ds.st_mode)
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR
except:
die(1,'{} missing, or not read/writable by user!'.format(tx_dir))
def do_umount():
if os.path.ismount(mountpoint):
subprocess.call(['sync'])
msg('Unmounting '+mountpoint)
subprocess.call(['umount',mountpoint])
def sign_tx_file(txfile):
try:
init_coin(mmgen.tx.MMGenTX(txfile,md_only=True).coin)
reload(sys.modules['mmgen.tx'])
tx = mmgen.tx.MMGenTX(txfile)
rpc_init(reinit=True)
txsign(tx,wfs,None,None)
tx.write_to_file(ask_write=False)
return True
except:
return False
def sign():
dirlist = os.listdir(tx_dir)
raw = [f for f in dirlist if f[-6:] == '.rawtx']
signed = [f[:-6] for f in dirlist if f[-6:] == '.sigtx']
unsigned = [os.path.join(tx_dir,f) for f in raw if f[:-6] not in signed]
if unsigned:
fails = 0
for txfile in unsigned:
ret = sign_tx_file(txfile)
if not ret:
fails += 1
qmsg('')
time.sleep(0.3)
n_ok = len(unsigned) - fails
msg('{} transaction{} signed'.format(n_ok,suf(n_ok)))
if fails:
ymsg('{} transaction{} failed to sign'.format(fails,suf(fails)))
return False if fails else True
else:
msg('No unsigned transactions')
time.sleep(1)
return True
def decrypt_wallets():
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
# opt.passwd_file = '/tmp/key'
from mmgen.seed import SeedSource
msg("Unlocking wallet{} with key from '{}'".format(suf(wfs),opt.passwd_file))
fails = 0
for wf in wfs:
try:
SeedSource(wf)
except SystemExit as e:
if e[0] != 0:
fails += 1
return False if fails else True
def do_sign():
if not opt.stealth_led: set_led('busy')
do_mount()
key_ok = decrypt_wallets()
if key_ok:
if opt.stealth_led: set_led('busy')
ret = sign()
do_umount()
set_led(('standby','off','error')[(not ret)*2 or bool(opt.stealth_led)])
return ret
else:
msg('Password is incorrect!')
do_umount()
if not opt.stealth_led: set_led('error')
return False
def wipe_existing_key():
fn = os.path.join(tx_dir,key_fn)
try: os.stat(fn)
except: pass
else:
msg('\nWiping existing key {}'.format(fn))
subprocess.call(['wipe','-cf',fn])
def create_key():
from binascii import hexlify
kdata = hexlify(os.urandom(32))
fn = os.path.join(tx_dir,key_fn)
desc = 'key file {}'.format(fn)
msg('Creating ' + desc)
try:
with open(fn,'w') as f: f.write(kdata+'\n')
os.chmod(fn,0400)
msg('Wrote ' + desc)
except:
die(2,'Unable to write ' + desc)
def gen_key(no_unmount=False):
create_wallet_dir()
if not get_insert_status():
die(1,'Removable device not present!')
do_mount()
wipe_existing_key()
create_key()
if not no_unmount:
do_umount()
def remove_wallet_dir():
msg("Deleting '{}'".format(wallet_dir))
try: shutil.rmtree(wallet_dir)
except: pass
def create_wallet_dir():
try: os.mkdir(wallet_dir)
except: pass
try: os.stat(wallet_dir)
except: die(2,"Unable to create wallet directory '{}'".format(wallet_dir))
def setup():
remove_wallet_dir()
gen_key(no_unmount=True)
from mmgen.seed import SeedSource
opt.hidden_incog_input_params = None
opt.quiet = True
opt.in_fmt = 'words'
ss_in = SeedSource()
opt.out_fmt = 'mmdat'
opt.usr_randchars = 0
opt.hash_preset = '1'
opt.set_by_user = ['hash_preset']
opt.passwd_file = os.path.join(tx_dir,key_fn)
from mmgen.obj import MMGenWalletLabel
opt.label = MMGenWalletLabel('Autosign Wallet')
ss_out = SeedSource(ss=ss_in)
ss_out.write_to_file(desc='autosign wallet',outdir=wallet_dir)
def ev_sleep(secs):
ev.wait(secs)
return (False,True)[ev.isSet()]
def do_led(on,off):
if not on:
with open(status_ctl,'w') as f: f.write('0\n')
while True:
if ev_sleep(3600): return
while True:
for s_time,val in ((on,255),(off,0)):
with open(status_ctl,'w') as f: f.write('{}\n'.format(val))
if ev_sleep(s_time): return
def set_led(cmd):
if not opt.led: return
vmsg("Setting LED state to '{}'".format(cmd))
timings = {
'off': ( 0, 0 ),
'standby': ( 2.2, 0.2 ),
'busy': ( 0.06, 0.06 ),
'error': ( 0.5, 0.5 )}[cmd]
global led_thread
if led_thread:
ev.set(); led_thread.join(); ev.clear()
led_thread = threading.Thread(target=do_led,name='LED loop',args=timings)
led_thread.start()
def get_insert_status():
if os.getenv('MMGEN_TEST_SUITE'): return True
try: os.stat(os.path.join('/dev/disk/by-label/',part_label))
except: return False
else: return True
def do_loop():
n,prev_status = 0,False
if not opt.stealth_led:
set_led('standby')
while True:
status = get_insert_status()
if status and not prev_status:
msg('Device insert detected')
do_sign()
prev_status = status
if not n % 10:
msg_r('\r{}\rWaiting'.format(' '*17))
time.sleep(1)
msg_r('.')
n += 1
def check_access(fn,desc='status LED control',init_val=None):
try:
with open(fn) as f: b = f.read().strip()
with open(fn,'w') as f:
f.write('{}\n'.format(init_val or b))
return True
except:
m1 = "You do not have access to the {} file\n".format(desc)
m2 = "To allow access, run 'sudo chmod 0666 {}'".format(fn)
msg(m1+m2)
return False
def check_wipe_present():
try:
subprocess.Popen(['wipe','-v'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
except:
die(2,"The 'wipe' utility must be installed before running this program")
def init_led():
sc = {
'opi': '/sys/class/leds/orangepi:red:status/brightness',
'rpi': '/sys/class/leds/led0/brightness'
}
tc = {
'rpi': '/sys/class/leds/led0/trigger', # mmc,none
}
for k in ('opi','rpi'):
try: os.stat(sc[k])
except: pass
else:
board = k
break
else:
die(2,'Control files not found! LED option not supported')
status_ctl = sc[board]
trigger_ctl = tc[board] if board in tc else None
if not check_access(status_ctl) or (
trigger_ctl and not check_access(trigger_ctl,desc='LED trigger',init_val='none')
):
sys.exit(1)
if trigger_ctl:
with open(trigger_ctl,'w') as f: f.write('none\n')
return status_ctl,trigger_ctl
# main()
if len(cmd_args) not in (0,1):
opts.usage()
if len(cmd_args) == 1:
if cmd_args[0] in ('gen_key','setup'):
globals()[cmd_args[0]]()
sys.exit(0)
elif cmd_args[0] != 'wait':
die(1,"'{}': unrecognized command".format(cmd_args[0]))
check_wipe_present()
wfs = get_wallet_files()
check_daemons_running()
def at_exit(exit_val,nl=False):
if nl: msg('')
msg('Cleaning up...')
if opt.led:
set_led('off')
ev.set()
led_thread.join()
if trigger_ctl:
with open(trigger_ctl,'w') as f: f.write('mmc0\n')
sys.exit(exit_val)
def handler(a,b): at_exit(1,nl=True)
signal.signal(signal.SIGTERM,handler)
signal.signal(signal.SIGINT,handler)
if opt.led:
import threading
status_ctl,trigger_ctl = init_led()
ev = threading.Event()
led_thread = None
if len(cmd_args) == 0:
ret = do_sign()
at_exit(int(not ret))
elif cmd_args[0] == 'wait':
do_loop()

View file

@ -94,8 +94,8 @@ cmd_data = OrderedDict([
('Decrypt', ['<infile> [str]',"outfile [str='']","hash_preset [str='']"]),
('Bytespec', ['<bytespec> [str]']),
('Keyaddrlist2monerowallets',['<{} XMR key-address file> [str]'.format(pnm),'blockheight [int=(current height)]']),
('Syncmonerowallets', ['<{} XMR key-address file> [str]'.format(pnm)]),
('Keyaddrlist2monerowallets',['<{} XMR key-address file> [str]'.format(pnm),'blockheight [int=(current height)]',"addrs [str=''] (addr idx list or range)"]),
('Syncmonerowallets', ['<{} XMR key-address file> [str]'.format(pnm),"addrs [str=''] (addr idx list or range)"]),
])
def usage(command):
@ -476,12 +476,13 @@ def Rand2file(outfile,nbytes,threads=4,silent=False):
def Bytespec(s): Msg(str(parse_nbytes(s)))
def Syncmonerowallets(infile): monero_wallet_ops(infile=infile,op='sync')
def Keyaddrlist2monerowallets(infile,blockheight=None,addrs=None):
monero_wallet_ops(infile=infile,op='create',blockheight=blockheight,addrs=addrs)
def Keyaddrlist2monerowallets(infile,blockheight=None):
monero_wallet_ops(infile=infile,op='create',blockheight=blockheight)
def Syncmonerowallets(infile,addrs=None):
monero_wallet_ops(infile=infile,op='sync',addrs=addrs)
def monero_wallet_ops(infile,op,blockheight=None):
def monero_wallet_ops(infile,op,blockheight=None,addrs=None):
def run_cmd(cmd):
import subprocess as sp
@ -570,7 +571,10 @@ def monero_wallet_ops(infile,op,blockheight=None):
msg('\r Block {h} / {h}'.format(h=height))
else:
msg(' Wallet in sync')
msg(' '+[l for l in p.before.splitlines() if l[:8] == 'Balance:'][0])
b = [l for l in p.before.splitlines() if l[:8] == 'Balance:'][0].split()
msg(' Balance: {} Unlocked balance: {}'.format(b[1],b[4]))
bals[0] += float(b[1][0:-1])
bals[1] += float(b[4])
my_sendline(p,'Exiting','exit',5)
p.read()
break
@ -585,9 +589,11 @@ def monero_wallet_ops(infile,op,blockheight=None):
init_coin('xmr')
from mmgen.addr import AddrList
al = KeyAddrList(infile)
dl = len(al.data)
data = [d for d in al.data if addrs == None or d.idx in AddrIdxList(addrs)]
dl = len(data)
assert dl,"No addresses in addrfile within range '{}'".format(addrs)
gmsg('\n{}ing {} wallet{}'.format(m[op][0],dl,suf(dl)))
for n,d in enumerate(al.data): # [d.sec,d.wallet_passwd,d.viewkey,d.addr]
for n,d in enumerate(data): # [d.sec,d.wallet_passwd,d.viewkey,d.addr]
fn = '{}{}-{}-MoneroWallet'.format(
(opt.outdir+'/' if opt.outdir else ''),
al.al_id.sid,
@ -595,11 +601,15 @@ def monero_wallet_ops(infile,op,blockheight=None):
gmsg('\n{}ing wallet {}/{} ({})'.format(m[op][1],n+1,dl,fn))
m[op][2](n,d,fn)
gmsg('\n{} wallet{} {}ed'.format(dl,suf(dl),m[op][0].lower()))
if op == 'sync':
msg('Balance: {:.12f}, Unlocked balance: {:.12f}'.format(*bals))
os.environ['LANG'] = 'C'
import pexpect
if blockheight != None and int(blockheight) < 0: blockheight = 0 # TODO: non-zero coverage
if blockheight != None and int(blockheight) < 0:
blockheight = 0 # TODO: non-zero coverage
cur_height = test_rpc()
bals = [0.0,0.0] # locked,unlocked
try:
process_wallets()
@ -608,7 +618,10 @@ def monero_wallet_ops(infile,op,blockheight=None):
except EOFError:
rdie(2,'\nEnd of file\n')
except Exception as e:
rdie(1,'Program died: {!r}'.format(e))
try:
die(1,'Error: {}'.format(e[0]))
except:
rdie(1,'Error: {!r}'.format(e))
# ================ RPC commands ================== #

View file

@ -859,3 +859,14 @@ def rpc_init(reinit=False):
g.rpch = conn
return conn
def format_text(s,indent=0,width=80):
words,lines = s.split(),[]
assert width >= indent + 4,'width must be >= indent + 4'
while words:
line = ''
while len(line) <= (width-indent) and words:
if len(line) + len(words[0]) + 1 > width-indent: break
line += ('',' ')[bool(line)] + words.pop(0)
lines.append(' '*indent + line)
return '\n'.join(lines) + '\n'

View file

@ -161,8 +161,13 @@ s_monero='The monerod (mainnet) daemon must be running for the following tests'
ROUNDS=1000
t_monero=(
'python cmds/mmgen-keygen --accept-defaults --outdir $TMPDIR --coin=xmr test/ref/98831F3A.mmwords 3,99,2,22-24,101-104'
'python cmds/mmgen-tool -q --accept-defaults --outdir $TMPDIR keyaddrlist2monerowallets $TMPDIR/988*XMR*akeys addrs=23'
'python cmds/mmgen-tool -q --accept-defaults --outdir $TMPDIR keyaddrlist2monerowallets $TMPDIR/988*XMR*akeys addrs=103-200'
'rm $TMPDIR/*-MoneroWallet*'
'python cmds/mmgen-tool -q --accept-defaults --outdir $TMPDIR keyaddrlist2monerowallets $TMPDIR/988*XMR*akeys'
'python cmds/mmgen-tool -q --outdir $TMPDIR syncmonerowallets $TMPDIR/988*XMR*akeys'
'python cmds/mmgen-tool -q --accept-defaults --outdir $TMPDIR syncmonerowallets $TMPDIR/988*XMR*akeys addrs=3'
'python cmds/mmgen-tool -q --accept-defaults --outdir $TMPDIR syncmonerowallets $TMPDIR/988*XMR*akeys addrs=23-29'
'python cmds/mmgen-tool -q --accept-defaults --outdir $TMPDIR syncmonerowallets $TMPDIR/988*XMR*akeys'
)
[ "$MINGW" ] && t_monero=("$t_monero")
f_monero='Monero tests completed'

View file

@ -134,19 +134,20 @@ setup(
'mmgen.util',
'mmgen.main',
'mmgen.main_wallet',
'mmgen.main_addrgen',
'mmgen.main_passgen',
'mmgen.main_addrimport',
'mmgen.main_autosign',
'mmgen.main_passgen',
'mmgen.main_regtest',
'mmgen.main_split',
'mmgen.main_txcreate',
'mmgen.main_txbump',
'mmgen.main_txsign',
'mmgen.main_txsend',
'mmgen.main_txdo',
'mmgen.txsign',
'mmgen.main_tool',
'mmgen.main_txbump',
'mmgen.main_txcreate',
'mmgen.main_txdo',
'mmgen.main_txsend',
'mmgen.main_txsign',
'mmgen.main_wallet',
'mmgen.txsign',
'mmgen.share.__init__',
'mmgen.share.Opts',

View file

@ -904,6 +904,8 @@ def errmsg(s): stderr_save.write(s+'\n')
def errmsg_r(s): stderr_save.write(s)
if opt.list_cmds:
from mmgen.term import get_terminal_size
tw = get_terminal_size()[0]
fs = ' {:<{w}} - {}'
Msg(green('AVAILABLE COMMANDS:'))
w = max([len(i) for i in cmd_data])
@ -915,16 +917,20 @@ if opt.list_cmds:
Msg(' '+fs.format(cmd,cmd_data[cmd][1],w=w))
w = max([len(i) for i in meta_cmds])
Msg(green('\nAVAILABLE METACOMMANDS:'))
Msg('\n'+green('AVAILABLE METACOMMANDS:'))
for cmd in meta_cmds:
Msg(fs.format(cmd,' '.join(meta_cmds[cmd]),w=w))
ft = format_text(' '.join(meta_cmds[cmd]),width=tw,indent=4).lstrip()
sep = '\n' if not ft else ' ' if len(ft.splitlines()[0]) + len(cmd) < tw - 4 else '\n '
Msg_r(' {}{}{}'.format(yellow(cmd+':'),sep,ft))
w = max([len(i) for i in cmd_list])
Msg(green('\nAVAILABLE COMMAND GROUPS:'))
for g in cmd_list:
Msg(fs.format(g,' '.join(cmd_list[g]),w=w))
Msg('\n'+green('AVAILABLE COMMAND GROUPS:'))
for cmd in cmd_list:
ft = format_text(' '.join(cmd_list[cmd]),width=tw,indent=4).lstrip()
sep = '\n' if not ft else ' ' if len(ft.splitlines()[0]) + len(cmd) < tw - 4 else '\n '
Msg_r(' {}{}{}'.format(yellow(cmd+':'),sep,ft))
Msg(green('\nAVAILABLE UTILITIES:'))
Msg('\n'+green('AVAILABLE UTILITIES:'))
w = max([len(i) for i in utils])
for cmd in sorted(utils):
Msg(fs.format(cmd,utils[cmd],w=w))