From 1996ecab4a06d64de31a55019288054882a2fc3d Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Tue, 18 Apr 2023 18:35:57 +0000 Subject: [PATCH] autosign.py: new Signable class --- mmgen/autosign.py | 266 +++++++++++++++++++++++++--------------------- 1 file changed, 143 insertions(+), 123 deletions(-) diff --git a/mmgen/autosign.py b/mmgen/autosign.py index fd537682..ae4e6d0e 100755 --- a/mmgen/autosign.py +++ b/mmgen/autosign.py @@ -24,6 +24,120 @@ from .wallet import Wallet class AutosignConfig(Config): _set_ok = ('usr_randchars','_proto','outdir','passwd_file') +class Signable: + + class base: + + def __init__(self,parent): + self.parent = parent + self.cfg = parent.cfg + self.dir = getattr(parent,self.dir_name) + + @property + def unsigned(self): + if not hasattr(self,'_unsigned'): + dirlist = tuple(os.scandir(self.dir)) + names = tuple(f.name for f in dirlist) + self._unsigned = tuple(f for f in dirlist + if f.name.endswith('.'+self.rawext) + and f.name[:-len(self.rawext)]+self.sigext not in names) + return self._unsigned + + def print_bad_list(self,bad_files): + msg('\n{a}\n{b}'.format( + a = red(f'Failed {self.desc}s:'), + b = ' {}\n'.format('\n '.join(self.gen_bad_list(sorted(bad_files,key=lambda f: f.name)))) + )) + + class transaction(base): + desc = 'transaction' + rawext = 'rawtx' + sigext = 'sigtx' + dir_name = 'tx_dir' + fail_msg = 'failed to sign' + + async def sign(self,f): + from .tx import UnsignedTX + tx1 = UnsignedTX( cfg=self.cfg, filename=f.path ) + if tx1.proto.sign_mode == 'daemon': + from .rpc import rpc_init + tx1.rpc = await rpc_init( self.cfg, tx1.proto ) + from .tx.sign import txsign + tx2 = await txsign( self.cfg, tx1, self.parent.wallet_files[:], None, None ) + if tx2: + tx2.file.write(ask_write=False) + return tx2 + else: + return False + + def print_summary(self,txs): + + if self.cfg.full_summary: + bmsg('\nAutosign summary:\n') + msg_r('\n'.join(tx.info.format(terse=True) for tx in txs)) + return + + def gen(): + for tx in txs: + non_mmgen = [o for o in tx.outputs if not o.mmid] + if non_mmgen: + yield (tx,non_mmgen) + + body = list(gen()) + + if body: + bmsg('\nAutosign summary:') + fs = '{} {} {}' + t_wid,a_wid = 6,44 + + def gen(): + yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount') + yield fs.format('-'*t_wid, '-'*a_wid, '-'*7) + for tx,non_mmgen in body: + for nm in non_mmgen: + yield fs.format( + tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid, + nm.addr.fmt( width=a_wid, color=True ), + nm.amt.hl() + ' ' + yellow(tx.coin)) + + msg('\n' + '\n'.join(gen())) + else: + msg('\nNo non-MMGen outputs') + + def gen_bad_list(self,bad_files): + for f in bad_files: + yield red(f.path) + + class message(base): + desc = 'message file' + rawext = 'rawmsg.json' + sigext = 'sigmsg.json' + dir_name = 'msg_dir' + fail_msg = 'failed to sign or signed incompletely' + + async def sign(self,f): + from .msg import UnsignedMsg,SignedMsg + m = UnsignedMsg( self.cfg, infile=f.path ) + await m.sign( wallet_files=self.parent.wallet_files[:] ) + m = SignedMsg( self.cfg, data=m.__dict__ ) + m.write_to_file( + outdir = os.path.abspath(self.dir), + ask_overwrite = False ) + if m.data.get('failed_sids'): + die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}') + return m + + def print_summary(self,messages): + gmsg('\nSigned message files:') + for m in messages: + gmsg(' ' + os.path.join( self.dir, m.signed_filename )) + return + + def gen_bad_list(self,bad_files): + for f in bad_files: + sigfile = f.path[:-len(self.rawext)] + self.sigext + yield orange(sigfile) if os.path.exists(sigfile) else red(f.path) + class Autosign: dfl_mountpoint = os.path.join(os.sep,'mnt','mmgen_autosign') @@ -142,86 +256,6 @@ class Autosign: msg(f'Unmounting {self.mountpoint}') run( ['umount',self.mountpoint], check=True ) - async def sign_object(self,d,fn): - from .tx import UnsignedTX - from .tx.sign import txsign - from .rpc import rpc_init - try: - if d.desc == 'transaction': - tx1 = UnsignedTX( cfg=self.cfg, filename=fn ) - if tx1.proto.sign_mode == 'daemon': - tx1.rpc = await rpc_init( self.cfg, tx1.proto ) - tx2 = await txsign( self.cfg, tx1, self.wallet_files[:], None, None ) - if tx2: - tx2.file.write(ask_write=False) - return tx2 - else: - return False - elif d.desc == 'message file': - from .msg import UnsignedMsg,SignedMsg - m = UnsignedMsg( self.cfg, infile=fn ) - await m.sign( wallet_files=self.wallet_files[:] ) - m = SignedMsg( self.cfg, data=m.__dict__ ) - m.write_to_file( - outdir = os.path.abspath(self.msg_dir), - ask_overwrite = False ) - if m.data.get('failed_sids'): - die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}') - return m - except Exception as e: - ymsg(f'An error occurred with {d.desc} {fn!r}:\n {e!s}') - return False - except: - ymsg(f'An error occurred with {d.desc} {fn!r}') - return False - - async def sign(self,target): - - _td = namedtuple('tdata',['desc','rawext','sigext','dir','fail_desc']) - - d = { - 'msg': _td('message file', 'rawmsg.json', 'sigmsg.json', self.msg_dir, 'sign or signed incompletely'), - 'tx': _td('transaction', 'rawtx', 'sigtx', self.tx_dir, 'sign'), - }[target] - - raw = [fn[:-len(d.rawext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.rawext)] - signed = [fn[:-len(d.sigext)] for fn in os.listdir(d.dir) if fn.endswith('.'+d.sigext)] - unsigned = [os.path.join( d.dir, fn+d.rawext ) for fn in raw if fn not in signed] - - if unsigned: - ok = [] - bad = [] - for fn in unsigned: - ret = await self.sign_object(d,fn) - if ret: - ok.append(ret) - else: - bad.append(fn) - self.cfg._util.qmsg('') - await asyncio.sleep(0.3) - msg(f'{len(ok)} {d.desc}{suf(ok)} signed') - if bad: - rmsg(f'{len(bad)} {d.desc}{suf(bad)} failed to {d.fail_desc}') - if ok and not self.cfg.no_summary: - self.print_summary(d,ok) - if bad: - msg('') - rmsg(f'Failed {d.desc}s:') - def gen_bad_disp(): - if d.desc == 'transaction': - for fn in sorted(bad): - yield red(fn) - elif d.desc == 'message file': - for rawfn in sorted(bad): - sigfn = rawfn[:-len(d.rawext)] + d.sigext - yield orange(sigfn) if os.path.exists(sigfn) else red(rawfn) - msg(' {}\n'.format( '\n '.join(gen_bad_disp()) )) - return False if bad else True - else: - msg(f'No unsigned {d.desc}s') - await asyncio.sleep(0.5) - return True - def decrypt_wallets(self): msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}') fails = 0 @@ -234,48 +268,34 @@ class Autosign: return False if fails else True - def print_summary(self,d,signed_objects): - - if d.desc == 'message file': - gmsg('\nSigned message files:') - for m in signed_objects: - gmsg(' ' + os.path.join( self.msg_dir, m.signed_filename )) - return - - if self.cfg.full_summary: - bmsg('\nAutosign summary:\n') - def gen(): - for tx in signed_objects: - yield tx.info.format(terse=True) - msg_r('\n'.join(gen())) - return - - def gen(): - for tx in signed_objects: - non_mmgen = [o for o in tx.outputs if not o.mmid] - if non_mmgen: - yield (tx,non_mmgen) - - body = list(gen()) - - if body: - bmsg('\nAutosign summary:') - fs = '{} {} {}' - t_wid,a_wid = 6,44 - - def gen(): - yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount') - yield fs.format('-'*t_wid, '-'*a_wid, '-'*7) - for tx,non_mmgen in body: - for nm in non_mmgen: - yield fs.format( - tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid, - nm.addr.fmt( width=a_wid, color=True ), - nm.amt.hl() + ' ' + yellow(tx.coin)) - - msg('\n' + '\n'.join(gen())) + async def sign_all(self,target_name): + target = getattr(Signable,target_name)(self) + if target.unsigned: + good = [] + bad = [] + for f in target.unsigned: + ret = None + try: + ret = await target.sign(f) + except Exception as e: + ymsg(f'An error occurred with {target.desc} {f.name!r}:\n {type(e).__name__}: {e!s}') + except: + ymsg(f'An error occurred with {target.desc} {f.name!r}') + good.append(ret) if ret else bad.append(f) + self.cfg._util.qmsg('') + await asyncio.sleep(0.3) + msg(f'{len(good)} {target.desc}{suf(good)} signed') + if bad: + rmsg(f'{len(bad)} {target.desc}{suf(bad)} {target.fail_msg}') + if good and not self.cfg.no_summary: + target.print_summary(good) + if bad: + target.print_bad_list(bad) + return not bad else: - msg('\nNo non-MMGen outputs') + msg(f'No unsigned {target.desc}s') + await asyncio.sleep(0.5) + return True async def do_sign(self): if not self.cfg.stealth_led: @@ -285,8 +305,8 @@ class Autosign: if key_ok: if self.cfg.stealth_led: self.led.set('busy') - ret1 = await self.sign('tx') - ret2 = await self.sign('msg') if self.have_msg_dir else True + ret1 = await self.sign_all('transaction') + ret2 = await self.sign_all('message') if self.have_msg_dir else True ret = ret1 and ret2 self.do_umount() self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)])