message signing: support autosigning

Usage information:

    $ mmgen-autosign --help

Testing:

    $ test/test.py -e autosign_btc
This commit is contained in:
The MMGen Project 2022-03-30 15:49:45 +00:00
commit 25efac31bc
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
7 changed files with 207 additions and 46 deletions

View file

@ -1 +1 @@
13.1.dev22
13.1.dev23

View file

@ -49,6 +49,7 @@ class InvalidPasswdFormat(Exception): mmcode = 1
class CfgFileParseError(Exception): mmcode = 1
class UserOptError(Exception): mmcode = 1
class NoLEDSupport(Exception): mmcode = 1
class MsgFileFailedSID(Exception): mmcode = 1
# 2: yellow hl, message only
class InvalidTokenAddress(Exception): mmcode = 2

View file

@ -17,18 +17,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
mmgen-autosign: Auto-sign MMGen transactions
mmgen-autosign: Auto-sign MMGen transactions and message files
"""
import sys,os,asyncio,signal,shutil
from subprocess import run,PIPE,DEVNULL
from collections import namedtuple
from stat import *
from .common import *
from .color import red
mountpoint = '/mnt/tx'
tx_dir = '/mnt/tx/tx'
msg_dir = '/mnt/tx/msg'
part_label = 'MMGEN_TX'
wallet_dir = '/dev/shm/autosign'
mn_fmts = {
@ -42,7 +43,7 @@ opts.UserOpts._set_ok += ('outdir','passwd_file')
opts_data = {
'sets': [('stealth_led', True, 'led', True)],
'text': {
'desc': 'Auto-sign MMGen transactions',
'desc': 'Auto-sign MMGen transactions and message files',
'usage':'[opts] [command]',
'options': f"""
-h, --help Print this help message
@ -74,18 +75,21 @@ wait - start in loop mode: wait-mount-sign-unmount-wait
USAGE NOTES
If invoked with no command, the program mounts a removable device containing
MMGen transactions, signs any unsigned transactions, unmounts the removable
device and exits.
unsigned MMGen transactions and/or message files, signs them, unmounts the
removable device and exits.
If invoked with 'wait', the program waits in a loop, mounting, signing and
unmounting every time the removable device is inserted.
If invoked with 'wait', the program waits in a loop, mounting the removable
device, performing signing operations and unmounting the device every time it
is inserted.
On supported platforms (currently Orange Pi, Rock Pi and Raspberry Pi boards),
the status LED indicates whether the program is busy or in standby mode, i.e.
ready for device insertion or removal.
The removable device must have a partition labeled MMGEN_TX with a user-
writable directory '/tx', where unsigned MMGen transactions are placed.
writable root directory and a directory named '/tx', where unsigned MMGen
transactions are placed. Optionally, the directory '/msg' may also be created
and unsigned message files created by `mmgen-msg` placed in this directory.
On the signing machine the mount point {mountpoint!r} must exist and /etc/fstab
must contain the following entry:
@ -145,6 +149,7 @@ if opt.mountpoint:
mountpoint = opt.mountpoint
keyfile = os.path.join(mountpoint,'autosign.key')
msg_dir = os.path.join(mountpoint,'msg')
tx_dir = os.path.join(mountpoint,'tx')
opt.outdir = tx_dir
@ -184,12 +189,15 @@ def do_mount():
if not os.path.ismount(mountpoint):
if run(['mount',mountpoint],stderr=DEVNULL,stdout=DEVNULL).returncode == 0:
msg(f'Mounting {mountpoint}')
try:
ds = os.stat(tx_dir)
assert S_ISDIR(ds.st_mode), f'{tx_dir!r} is not a directory!'
assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{tx_dir!r} is not read/write for this user!'
except:
die(1,f'{tx_dir!r} missing or not read/writable by user!')
# Record whether the optional message directory exists on the mounted
# device; it is only validated below when it is present.
global have_msg_dir
have_msg_dir = os.path.isdir(msg_dir)
# Both the owner-read and the owner-write permission bits are required.
rw_mask = S_IRUSR | S_IWUSR
for cdir in [tx_dir] + ([msg_dir] if have_msg_dir else []):
    try:
        ds = os.stat(cdir)
    except OSError:
        # Missing or inaccessible path
        dir_ok = False
    else:
        # Explicit parentheses are essential: '==' binds more loosely than
        # '&' and '|', so the unparenthesized form compares
        # ((mode & S_IWUSR) | S_IRUSR) with the mask and never tests the
        # read bit.  A plain boolean is used instead of 'assert' so that the
        # check is not stripped when Python runs with -O.
        dir_ok = S_ISDIR(ds.st_mode) and (ds.st_mode & rw_mask) == rw_mask
    if not dir_ok:
        die(1,f'{cdir!r} missing or not read/writable by user!')
def do_umount():
if os.path.ismount(mountpoint):
@ -197,52 +205,77 @@ def do_umount():
msg(f'Unmounting {mountpoint}')
run(['umount',mountpoint],check=True)
async def sign_tx_file(txfile):
async def sign_object(d,fn):
try:
tx1 = UnsignedTX(filename=txfile)
if tx1.proto.sign_mode == 'daemon':
tx1.rpc = await rpc_init(tx1.proto)
tx2 = await txsign(tx1,wfs,None,None)
if tx2:
tx2.file.write(ask_write=False)
return tx2
else:
return False
if d.desc == 'transaction':
tx1 = UnsignedTX(filename=fn)
if tx1.proto.sign_mode == 'daemon':
tx1.rpc = await rpc_init(tx1.proto)
tx2 = await txsign(tx1,wfs[:],None,None)
if tx2:
tx2.file.write(ask_write=False)
return tx2
else:
return False
elif d.desc == 'message file':
from .msg import UnsignedMsg,SignedMsg
m = UnsignedMsg(infile=fn)
await m.sign(wallet_files=wfs[:])
m = SignedMsg(data=m.__dict__)
m.write_to_file(
outdir = os.path.abspath(msg_dir),
ask_overwrite = False )
if getattr(m,'failed_sids',None):
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.failed_sids,fmt="bare")}')
return m
except Exception as e:
ymsg(f'An error occurred with transaction {txfile!r}:\n {e!s}')
ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}')
return False
except:
ymsg(f'An error occurred with transaction {txfile!r}')
ymsg(f'An error occurred with {d.desc} {fn!r}')
return False
async def sign():
async def sign(target):
td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc'])
d = {
'msg': td('message file', 'rawmsg.json', 'sigmsg.json', msg_dir, 'sign or signed incompletely'),
'tx': td('transaction', 'rawtx', 'sigtx', tx_dir, 'sign'),
}[target]
raw = [fn[:-len('rawtx')] for fn in os.listdir(tx_dir) if fn.endswith('.rawtx')]
signed = [fn[:-len('sigtx')] for fn in os.listdir(tx_dir) if fn.endswith('.sigtx')]
unsigned = [os.path.join(tx_dir,fn+'rawtx') for fn in raw if fn not in signed]
raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)]
signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)]
unsigned = [os.path.join(d.dir,fn+d.rawext) for fn in raw if fn not in signed]
if unsigned:
ok,bad = ([],[])
for txfile in unsigned:
ret = await sign_tx_file(txfile)
for fn in unsigned:
ret = await sign_object(d,fn)
if ret:
ok.append(ret)
else:
bad.append(txfile)
bad.append(fn)
qmsg('')
await asyncio.sleep(0.3)
msg(f'{len(ok)} transaction{suf(ok)} signed')
msg(f'{len(ok)} {d.desc}{suf(ok)} signed')
if bad:
rmsg(f'{len(bad)} transaction{suf(bad)} failed to sign')
rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}')
if ok and not opt.no_summary:
print_summary(ok)
print_summary(d,ok)
if bad:
msg('')
rmsg('Failed transactions:')
msg(' ' + '\n '.join(red(s) for s in sorted(bad)) + '\n') # avoid the 'less' NL color bug
rmsg(f'Failed {d.desc}s:')
def gen_bad_disp():
    """Yield one colorized display string per failed file, in sorted order."""
    failed = sorted(bad)
    if d.desc == 'transaction':
        yield from map(red,failed)
    elif d.desc == 'message file':
        for rawfn in failed:
            # Name the signed file would have: raw extension swapped for the
            # signed one.  If that file exists, it is shown in orange in
            # place of the raw file name; otherwise the raw name is shown
            # in red.
            sigfn = rawfn[:-len(d.rawext)] + d.sigext
            if os.path.exists(sigfn):
                yield orange(sigfn)
            else:
                yield red(rawfn)
msg(' {}\n'.format( '\n '.join(gen_bad_disp()) ))
return False if bad else True
else:
msg('No unsigned transactions')
msg(f'No unsigned {d.desc}s')
await asyncio.sleep(0.5)
return True
@ -258,18 +291,24 @@ def decrypt_wallets():
return False if fails else True
def print_summary(signed_txs):
def print_summary(d,signed_objects):
if d.desc == 'message file':
gmsg('\nSigned message files:')
for m in signed_objects:
gmsg(' ' + os.path.join(msg_dir,m.signed_filename) )
return
if opt.full_summary:
bmsg('\nAutosign summary:\n')
def gen():
for tx in signed_txs:
for tx in signed_objects:
yield tx.info.format(terse=True)
msg_r(''.join(gen()))
return
def gen():
for tx in signed_txs:
for tx in signed_objects:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx,non_mmgen)
@ -303,7 +342,9 @@ async def do_sign():
if key_ok:
if opt.stealth_led:
led.set('busy')
ret = await sign()
ret1 = await sign('tx')
ret2 = await sign('msg') if have_msg_dir else True
ret = ret1 and ret2
do_umount()
led.set(('standby','off','error')[(not ret)*2 or bool(opt.stealth_led)])
return ret

View file

@ -0,0 +1,12 @@
{
"id": "MMGen unsigned message data",
"metadata": {
"addrlists": [
"98831F3A:B:9,3-5",
"99E0FC61:S:102"
],
"message": "24/3/2022 Russia announces it will sell gas for bitcoin",
"network": "btc_mainnet"
},
"signatures": {}
}

View file

@ -0,0 +1,12 @@
{
"id": "MMGen unsigned message data",
"metadata": {
"addrlists": [
"C3B3290F:L:33",
"DEADBEEF:C:1"
],
"message": "08/Jun/2021 Bitcoin Law Enacted by El Salvador Legislative Assembly",
"network": "btc_mainnet"
},
"signatures": {}
}

View file

@ -0,0 +1,11 @@
{
"id": "MMGen unsigned message data",
"metadata": {
"addrlists": [
"98831F3A:B:1"
],
"message": "24/3/2022 Russia announces it will sell gas for bitcoin",
"network": "ltc_mainnet"
},
"signatures": {}
}

View file

@ -111,6 +111,18 @@ class TestSuiteAutosignBase(TestSuiteBase):
self.tx_file_ops('set_count') # initialize tx_count here so we can resume anywhere
def gen_msg_fns():
    """Yield the path of each raw message file under test/ref for every coin in self.coins."""
    coin_dirs = dict(filedir_map)
    for coin in self.coins:
        refdir = os.path.join('test','ref',coin_dirs[coin])
        # Only files tagged with this coin's upper-case symbol are selected
        suffix = f'[{coin.upper()}].rawmsg.json'
        yield from (
            os.path.join(refdir,name)
            for name in os.listdir(refdir)
            if name.endswith(suffix) )
self.ref_msgfiles = tuple(gen_msg_fns())
self.good_msg_count = 0
self.bad_msg_count = 0
def __del__(self):
if self.simulate or not self.live:
LEDControl.delete_dummy_control_files()
@ -224,6 +236,44 @@ class TestSuiteAutosignBase(TestSuiteBase):
self.bad_tx_count = 0
return 'ok'
def copy_msgfiles(self):
    """Run the 'copy' message-file operation and return its result."""
    outcome = self.msgfile_ops('copy')
    return outcome
def remove_signed_msgfiles(self):
    """Run the 'remove_signed' message-file operation and return its result."""
    outcome = self.msgfile_ops('remove_signed')
    return outcome
def create_invalid_msgfile(self):
    """Run the 'create_invalid' message-file operation and return its result."""
    outcome = self.msgfile_ops('create_invalid')
    return outcome
def remove_invalid_msgfile(self):
    """Run the 'remove_invalid' message-file operation and return its result."""
    outcome = self.msgfile_ops('remove_invalid')
    return outcome
def msgfile_ops(self,op):
    """
    Perform one operation on the 'msg' directory under self.mountpoint.

    op values handled:
      'copy'           - copy every file in self.ref_msgfiles into the
                         directory, counting each as good or bad
      'remove_signed'  - delete the 'sigmsg' counterpart of every reference file
      'create_invalid' - write a file of junk data and bump bad_msg_count
      'remove_invalid' - delete that file and decrement bad_msg_count

    Any other value leaves the directory untouched.  Always returns 'ok'.
    """
    self.spawn('',msg_only=True)
    destdir = joinpath(self.mountpoint,'msg')
    os.makedirs(destdir,exist_ok=True)

    if op.endswith('_invalid'):
        invalid_fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json')
        if op == 'create_invalid':
            with open(invalid_fn,'w') as fp:
                fp.write('bad data\n')
            self.bad_msg_count += 1
        elif op == 'remove_invalid':
            os.unlink(invalid_fn)
            self.bad_msg_count -= 1
        return 'ok'

    for src in self.ref_msgfiles:
        base = os.path.basename(src)
        if op == 'copy':
            if base == '9DA060[BTC].rawmsg.json': # contains bad Seed ID
                self.bad_msg_count += 1
            else:
                self.good_msg_count += 1
            imsg(f'Copying: {src} -> {destdir}')
            shutil.copy2(src,destdir)
        elif op == 'remove_signed':
            signed_name = base.replace('rawmsg','sigmsg')
            os.unlink(os.path.join(destdir,signed_name))

    return 'ok'
class TestSuiteAutosign(TestSuiteAutosignBase):
'autosigning transactions for all supported coins'
coins = ['btc','bch','ltc','eth']
@ -247,17 +297,39 @@ class TestSuiteAutosign(TestSuiteAutosignBase):
('sign_led', 'signing transactions (--led - BTC files only)'),
('remove_signed_txfiles', 'removing signed transaction files'),
('sign_stealth_led', 'signing transactions (--stealth-led)'),
('remove_signed_txfiles', 'removing signed transaction files'),
('copy_msgfiles', 'copying message files'),
('sign_quiet_msg', 'signing transactions and messages (--quiet)'),
('remove_signed_txfiles', 'removing signed transaction files'),
('create_bad_txfiles', 'creating bad transaction files'),
('remove_signed_msgfiles', 'removing signed message files'),
('create_invalid_msgfile', 'creating invalid message file'),
('sign_full_summary_msg', 'signing transactions and messages (--full-summary)'),
('remove_invalid_msgfile', 'removing invalid message file'),
('remove_bad_txfiles', 'removing bad transaction files'),
('sign_no_unsigned_msg', 'signing transactions and messages (nothing to sign)'),
('stop_daemons', 'stopping daemons'),
)
def do_sign(self,args):
def do_sign(self,args,have_msg=False):
t = self.spawn('mmgen-autosign', self.opts + args )
t.expect(f'{self.tx_count} transactions signed')
t.expect(
f'{self.tx_count} transactions signed' if self.tx_count else
'No unsigned transactions' )
if self.bad_tx_count:
t.expect(f'{self.bad_tx_count} transactions failed to sign')
t.req_exit_val = 1
if have_msg:
t.expect(
f'{self.good_msg_count} message files{{0,1}} signed' if self.good_msg_count else
'No unsigned message files', regex=True )
if self.bad_msg_count:
t.expect(f'{self.bad_msg_count} message files{{0,1}} failed to sign', regex=True)
t.req_exit_val = 1
if 'wait' in args:
t.expect('Waiting')
t.kill(2)
@ -280,6 +352,18 @@ class TestSuiteAutosign(TestSuiteAutosignBase):
def sign_stealth_led(self):
    """Run do_sign() in 'wait' mode with the --quiet and --stealth-led options."""
    autosign_args = ['--quiet','--stealth-led','wait']
    return self.do_sign(autosign_args)
def sign_quiet_msg(self):
    """Run do_sign() in 'wait' mode with --quiet, with message-file checks enabled."""
    autosign_args = ['--quiet','wait']
    return self.do_sign(autosign_args,have_msg=True)
def sign_full_summary_msg(self):
    """Run do_sign() in 'wait' mode with --full-summary, with message-file checks enabled."""
    autosign_args = ['--full-summary','wait']
    return self.do_sign(autosign_args,have_msg=True)
def sign_no_unsigned_msg(self):
    """Zero the transaction and message counters, then run do_sign() with message-file checks."""
    # With all counters at zero, do_sign() is called expecting nothing to sign
    self.tx_count = self.good_msg_count = self.bad_msg_count = 0
    autosign_args = ['--quiet','wait']
    return self.do_sign(autosign_args,have_msg=True)
class TestSuiteAutosignBTC(TestSuiteAutosign):
'autosigning BTC transactions'
coins = ['btc']