autosign.py: new Signable class

This commit is contained in:
The MMGen Project 2023-04-18 18:35:57 +00:00
commit 1996ecab4a
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2

View file

@ -24,6 +24,120 @@ from .wallet import Wallet
class AutosignConfig(Config):
_set_ok = ('usr_randchars','_proto','outdir','passwd_file')
class Signable:
class base:
def __init__(self,parent):
self.parent = parent
self.cfg = parent.cfg
self.dir = getattr(parent,self.dir_name)
@property
def unsigned(self):
if not hasattr(self,'_unsigned'):
dirlist = tuple(os.scandir(self.dir))
names = tuple(f.name for f in dirlist)
self._unsigned = tuple(f for f in dirlist
if f.name.endswith('.'+self.rawext)
and f.name[:-len(self.rawext)]+self.sigext not in names)
return self._unsigned
def print_bad_list(self,bad_files):
msg('\n{a}\n{b}'.format(
a = red(f'Failed {self.desc}s:'),
b = ' {}\n'.format('\n '.join(self.gen_bad_list(sorted(bad_files,key=lambda f: f.name))))
))
class transaction(base):
desc = 'transaction'
rawext = 'rawtx'
sigext = 'sigtx'
dir_name = 'tx_dir'
fail_msg = 'failed to sign'
async def sign(self,f):
from .tx import UnsignedTX
tx1 = UnsignedTX( cfg=self.cfg, filename=f.path )
if tx1.proto.sign_mode == 'daemon':
from .rpc import rpc_init
tx1.rpc = await rpc_init( self.cfg, tx1.proto )
from .tx.sign import txsign
tx2 = await txsign( self.cfg, tx1, self.parent.wallet_files[:], None, None )
if tx2:
tx2.file.write(ask_write=False)
return tx2
else:
return False
def print_summary(self,txs):
if self.cfg.full_summary:
bmsg('\nAutosign summary:\n')
msg_r('\n'.join(tx.info.format(terse=True) for tx in txs))
return
def gen():
for tx in txs:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx,non_mmgen)
body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid,a_wid = 6,44
def gen():
yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
for tx,non_mmgen in body:
for nm in non_mmgen:
yield fs.format(
tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid,
nm.addr.fmt( width=a_wid, color=True ),
nm.amt.hl() + ' ' + yellow(tx.coin))
msg('\n' + '\n'.join(gen()))
else:
msg('\nNo non-MMGen outputs')
def gen_bad_list(self,bad_files):
for f in bad_files:
yield red(f.path)
class message(base):
desc = 'message file'
rawext = 'rawmsg.json'
sigext = 'sigmsg.json'
dir_name = 'msg_dir'
fail_msg = 'failed to sign or signed incompletely'
async def sign(self,f):
from .msg import UnsignedMsg,SignedMsg
m = UnsignedMsg( self.cfg, infile=f.path )
await m.sign( wallet_files=self.parent.wallet_files[:] )
m = SignedMsg( self.cfg, data=m.__dict__ )
m.write_to_file(
outdir = os.path.abspath(self.dir),
ask_overwrite = False )
if m.data.get('failed_sids'):
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
return m
def print_summary(self,messages):
gmsg('\nSigned message files:')
for m in messages:
gmsg(' ' + os.path.join( self.dir, m.signed_filename ))
return
def gen_bad_list(self,bad_files):
for f in bad_files:
sigfile = f.path[:-len(self.rawext)] + self.sigext
yield orange(sigfile) if os.path.exists(sigfile) else red(f.path)
class Autosign:
dfl_mountpoint = os.path.join(os.sep,'mnt','mmgen_autosign')
@ -142,86 +256,6 @@ class Autosign:
msg(f'Unmounting {self.mountpoint}')
run( ['umount',self.mountpoint], check=True )
async def sign_object(self,d,fn):
from .tx import UnsignedTX
from .tx.sign import txsign
from .rpc import rpc_init
try:
if d.desc == 'transaction':
tx1 = UnsignedTX( cfg=self.cfg, filename=fn )
if tx1.proto.sign_mode == 'daemon':
tx1.rpc = await rpc_init( self.cfg, tx1.proto )
tx2 = await txsign( self.cfg, tx1, self.wallet_files[:], None, None )
if tx2:
tx2.file.write(ask_write=False)
return tx2
else:
return False
elif d.desc == 'message file':
from .msg import UnsignedMsg,SignedMsg
m = UnsignedMsg( self.cfg, infile=fn )
await m.sign( wallet_files=self.wallet_files[:] )
m = SignedMsg( self.cfg, data=m.__dict__ )
m.write_to_file(
outdir = os.path.abspath(self.msg_dir),
ask_overwrite = False )
if m.data.get('failed_sids'):
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
return m
except Exception as e:
ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}')
return False
except:
ymsg(f'An error occurred with {d.desc} {fn!r}')
return False
async def sign(self,target):
_td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc'])
d = {
'msg': _td('message file', 'rawmsg.json', 'sigmsg.json', self.msg_dir, 'sign or signed incompletely'),
'tx': _td('transaction', 'rawtx', 'sigtx', self.tx_dir, 'sign'),
}[target]
raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)]
signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)]
unsigned = [os.path.join( d.dir, fn+d.rawext ) for fn in raw if fn not in signed]
if unsigned:
ok = []
bad = []
for fn in unsigned:
ret = await self.sign_object(d,fn)
if ret:
ok.append(ret)
else:
bad.append(fn)
self.cfg._util.qmsg('')
await asyncio.sleep(0.3)
msg(f'{len(ok)} {d.desc}{suf(ok)} signed')
if bad:
rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}')
if ok and not self.cfg.no_summary:
self.print_summary(d,ok)
if bad:
msg('')
rmsg(f'Failed {d.desc}s:')
def gen_bad_disp():
if d.desc == 'transaction':
for fn in sorted(bad):
yield red(fn)
elif d.desc == 'message file':
for rawfn in sorted(bad):
sigfn = rawfn[:-len(d.rawext)] + d.sigext
yield orange(sigfn) if os.path.exists(sigfn) else red(rawfn)
msg(' {}\n'.format( '\n '.join(gen_bad_disp()) ))
return False if bad else True
else:
msg(f'No unsigned {d.desc}s')
await asyncio.sleep(0.5)
return True
def decrypt_wallets(self):
msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}')
fails = 0
@ -234,48 +268,34 @@ class Autosign:
return False if fails else True
def print_summary(self,d,signed_objects):
if d.desc == 'message file':
gmsg('\nSigned message files:')
for m in signed_objects:
gmsg(' ' + os.path.join( self.msg_dir, m.signed_filename ))
return
if self.cfg.full_summary:
bmsg('\nAutosign summary:\n')
def gen():
for tx in signed_objects:
yield tx.info.format(terse=True)
msg_r('\n'.join(gen()))
return
def gen():
for tx in signed_objects:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx,non_mmgen)
body = list(gen())
if body:
bmsg('\nAutosign summary:')
fs = '{} {} {}'
t_wid,a_wid = 6,44
def gen():
yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
for tx,non_mmgen in body:
for nm in non_mmgen:
yield fs.format(
tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid,
nm.addr.fmt( width=a_wid, color=True ),
nm.amt.hl() + ' ' + yellow(tx.coin))
msg('\n' + '\n'.join(gen()))
async def sign_all(self,target_name):
target = getattr(Signable,target_name)(self)
if target.unsigned:
good = []
bad = []
for f in target.unsigned:
ret = None
try:
ret = await target.sign(f)
except Exception as e:
ymsg(f'An error occurred with {target.desc} {f.name!r}:\n {type(e).__name__}: {e!s}')
except:
ymsg(f'An error occurred with {target.desc} {f.name!r}')
good.append(ret) if ret else bad.append(f)
self.cfg._util.qmsg('')
await asyncio.sleep(0.3)
msg(f'{len(good)} {target.desc}{suf(good)} signed')
if bad:
rmsg(f'{len(bad)} {target.desc}{suf(bad)} {target.fail_msg}')
if good and not self.cfg.no_summary:
target.print_summary(good)
if bad:
target.print_bad_list(bad)
return not bad
else:
msg('\nNo non-MMGen outputs')
msg(f'No unsigned {target.desc}s')
await asyncio.sleep(0.5)
return True
async def do_sign(self):
if not self.cfg.stealth_led:
@ -285,8 +305,8 @@ class Autosign:
if key_ok:
if self.cfg.stealth_led:
self.led.set('busy')
ret1 = await self.sign('tx')
ret2 = await self.sign('msg') if self.have_msg_dir else True
ret1 = await self.sign_all('transaction')
ret2 = await self.sign_all('message') if self.have_msg_dir else True
ret = ret1 and ret2
self.do_umount()
self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)])