#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
#   https://www.gnu.org/licenses
# Public project repositories:
#   https://github.com/mmgen/mmgen
#   https://gitlab.com/mmgen/mmgen

"""
autosign: Auto-sign MMGen transactions and message files
"""
  13. import sys,os,asyncio
  14. from subprocess import run,PIPE,DEVNULL
  15. from collections import namedtuple
  16. from .cfg import Config
  17. from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt,fmt_list
  18. from .color import yellow,red,orange
  19. from .wallet import Wallet,get_wallet_cls
  20. from .filename import find_file_in_dir
  21. from .ui import keypress_confirm
  22. class AutosignConfig(Config):
  23. _set_ok = ('usr_randchars','_proto','outdir','passwd_file')
  24. class Signable:
  25. class base:
  26. def __init__(self,parent):
  27. self.parent = parent
  28. self.cfg = parent.cfg
  29. self.dir = getattr(parent,self.dir_name)
  30. @property
  31. def unsigned(self):
  32. return self._unprocessed( '_unsigned', self.rawext, self.sigext )
  33. def _unprocessed(self,attrname,rawext,sigext):
  34. if not hasattr(self,attrname):
  35. dirlist = tuple(os.scandir(self.dir))
  36. names = tuple(f.name for f in dirlist)
  37. setattr(
  38. self,
  39. attrname,
  40. tuple(f for f in dirlist
  41. if f.name.endswith('.' + rawext)
  42. and f.name[:-len(rawext)] + sigext not in names) )
  43. return getattr(self,attrname)
  44. def print_bad_list(self,bad_files):
  45. msg('\n{a}\n{b}'.format(
  46. a = red(f'Failed {self.desc}s:'),
  47. b = ' {}\n'.format('\n '.join(self.gen_bad_list(sorted(bad_files,key=lambda f: f.name))))
  48. ))
  49. class transaction(base):
  50. desc = 'transaction'
  51. rawext = 'rawtx'
  52. sigext = 'sigtx'
  53. dir_name = 'tx_dir'
  54. fail_msg = 'failed to sign'
  55. async def sign(self,f):
  56. from .tx import UnsignedTX
  57. tx1 = UnsignedTX( cfg=self.cfg, filename=f.path )
  58. if tx1.proto.sign_mode == 'daemon':
  59. from .rpc import rpc_init
  60. tx1.rpc = await rpc_init( self.cfg, tx1.proto )
  61. from .tx.sign import txsign
  62. tx2 = await txsign( self.cfg, tx1, self.parent.wallet_files[:], None, None )
  63. if tx2:
  64. tx2.file.write(ask_write=False)
  65. return tx2
  66. else:
  67. return False
  68. def print_summary(self,txs):
  69. if self.cfg.full_summary:
  70. bmsg('\nAutosign summary:\n')
  71. msg_r('\n'.join(tx.info.format(terse=True) for tx in txs))
  72. return
  73. def gen():
  74. for tx in txs:
  75. non_mmgen = [o for o in tx.outputs if not o.mmid]
  76. if non_mmgen:
  77. yield (tx,non_mmgen)
  78. body = list(gen())
  79. if body:
  80. bmsg('\nAutosign summary:')
  81. fs = '{} {} {}'
  82. t_wid,a_wid = 6,44
  83. def gen():
  84. yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
  85. yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
  86. for tx,non_mmgen in body:
  87. for nm in non_mmgen:
  88. yield fs.format(
  89. tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid,
  90. nm.addr.fmt( width=a_wid, color=True ),
  91. nm.amt.hl() + ' ' + yellow(tx.coin))
  92. msg('\n' + '\n'.join(gen()))
  93. else:
  94. msg('\nNo non-MMGen outputs')
  95. def gen_bad_list(self,bad_files):
  96. for f in bad_files:
  97. yield red(f.path)
  98. class message(base):
  99. desc = 'message file'
  100. rawext = 'rawmsg.json'
  101. sigext = 'sigmsg.json'
  102. dir_name = 'msg_dir'
  103. fail_msg = 'failed to sign or signed incompletely'
  104. async def sign(self,f):
  105. from .msg import UnsignedMsg,SignedMsg
  106. m = UnsignedMsg( self.cfg, infile=f.path )
  107. await m.sign( wallet_files=self.parent.wallet_files[:] )
  108. m = SignedMsg( self.cfg, data=m.__dict__ )
  109. m.write_to_file(
  110. outdir = os.path.abspath(self.dir),
  111. ask_overwrite = False )
  112. if m.data.get('failed_sids'):
  113. die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
  114. return m
  115. def print_summary(self,messages):
  116. gmsg('\nSigned message files:')
  117. for m in messages:
  118. gmsg(' ' + os.path.join( self.dir, m.signed_filename ))
  119. def gen_bad_list(self,bad_files):
  120. for f in bad_files:
  121. sigfile = f.path[:-len(self.rawext)] + self.sigext
  122. yield orange(sigfile) if os.path.exists(sigfile) else red(f.path)
  123. class Autosign:
  124. dfl_mountpoint = os.path.join(os.sep,'mnt','mmgen_autosign')
  125. dfl_wallet_dir = os.path.join(os.sep,'dev','shm','autosign')
  126. disk_label_dir = os.path.join(os.sep,'dev','disk','by-label')
  127. part_label = 'MMGEN_TX'
  128. old_dfl_mountpoint = os.path.join(os.sep,'mnt','tx')
  129. old_dfl_mountpoint_errmsg = f"""
  130. Mountpoint {old_dfl_mountpoint!r} is no longer supported!
  131. Please rename {old_dfl_mountpoint!r} to {dfl_mountpoint!r}
  132. and update your fstab accordingly.
  133. """
  134. mountpoint_errmsg_fs = """
  135. Mountpoint {!r} does not exist or does not point
  136. to a directory! Please create the mountpoint and add an entry
  137. to your fstab as described in this script’s help text.
  138. """
  139. mn_fmts = {
  140. 'mmgen': 'words',
  141. 'bip39': 'bip39',
  142. }
  143. dfl_mn_fmt = 'mmgen'
  144. have_msg_dir = False
  145. def __init__(self,cfg):
  146. if cfg.mnemonic_fmt:
  147. if cfg.mnemonic_fmt not in self.mn_fmts:
  148. die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format(
  149. cfg.mnemonic_fmt,
  150. fmt_list( self.mn_fmts, fmt='no_spc' ) ))
  151. self.cfg = cfg
  152. self.mountpoint = cfg.mountpoint or self.dfl_mountpoint
  153. self.wallet_dir = cfg.wallet_dir or self.dfl_wallet_dir
  154. self.tx_dir = os.path.join( self.mountpoint, 'tx' )
  155. self.msg_dir = os.path.join( self.mountpoint, 'msg' )
  156. self.keyfile = os.path.join( self.mountpoint, 'autosign.key' )
  157. cfg.outdir = self.tx_dir
  158. cfg.passwd_file = self.keyfile
  159. if any(k in cfg._uopts for k in ('help','longhelp')):
  160. return
  161. if 'coin' in cfg._uopts:
  162. die(1,'--coin option not supported with this command. Use --coins instead')
  163. self.coins = cfg.coins.upper().split(',') if cfg.coins else []
  164. if not self.coins:
  165. ymsg('Warning: no coins specified, defaulting to BTC')
  166. self.coins = ['BTC']
  167. async def check_daemons_running(self):
  168. from .protocol import init_proto
  169. for coin in self.coins:
  170. proto = init_proto( self.cfg, coin, testnet=self.cfg.network=='testnet', need_amt=True )
  171. if proto.sign_mode == 'daemon':
  172. self.cfg._util.vmsg(f'Checking {coin} daemon')
  173. from .rpc import rpc_init
  174. from .exception import SocketError
  175. try:
  176. await rpc_init( self.cfg, proto )
  177. except SocketError as e:
  178. from .daemon import CoinDaemon
  179. d = CoinDaemon( self.cfg, proto=proto, test_suite=self.cfg.test_suite )
  180. die(2,
  181. f'\n{e}\nIs the {d.coind_name} daemon ({d.exec_fn}) running '
  182. + 'and listening on the correct port?' )
  183. @property
  184. def wallet_files(self):
  185. if not hasattr(self,'_wallet_files'):
  186. try:
  187. dirlist = os.listdir(self.wallet_dir)
  188. except:
  189. die(1,f'Cannot open wallet directory {self.wallet_dir!r}. Did you run ‘mmgen-autosign setup’?')
  190. fns = [fn for fn in dirlist if fn.endswith('.mmdat')]
  191. if fns:
  192. self._wallet_files = [os.path.join(self.wallet_dir,fn) for fn in fns]
  193. else:
  194. die(1,'No wallet files present!')
  195. return self._wallet_files
  196. def do_mount(self):
  197. from stat import S_ISDIR,S_IWUSR,S_IRUSR
  198. def check_dir(cdir):
  199. try:
  200. ds = os.stat(cdir)
  201. assert S_ISDIR(ds.st_mode), f'{cdir!r} is not a directory!'
  202. assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f'{cdir!r} is not read/write for this user!'
  203. except:
  204. die(1,f'{cdir!r} missing or not read/writable by user!')
  205. if not os.path.isdir(self.mountpoint):
  206. def do_die(m):
  207. die(1,'\n' + yellow(fmt(m.strip(),indent=' ')))
  208. if os.path.isdir(self.old_dfl_mountpoint):
  209. do_die(self.old_dfl_mountpoint_errmsg)
  210. else:
  211. do_die(self.mountpoint_errmsg_fs.format(self.mountpoint))
  212. if not os.path.ismount(self.mountpoint):
  213. if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0:
  214. msg(f'Mounting {self.mountpoint!r}')
  215. elif not self.cfg.test_suite:
  216. die(1,f'Unable to mount device at {self.mountpoint!r}')
  217. self.have_msg_dir = os.path.isdir(self.msg_dir)
  218. check_dir(self.tx_dir)
  219. if self.have_msg_dir:
  220. check_dir(self.msg_dir)
  221. def do_umount(self):
  222. if os.path.ismount(self.mountpoint):
  223. run( ['sync'], check=True )
  224. msg(f'Unmounting {self.mountpoint!r}')
  225. run( ['umount',self.mountpoint], check=True )
  226. bmsg('It is now safe to extract the removable device')
  227. def decrypt_wallets(self):
  228. msg(f'Unlocking wallet{suf(self.wallet_files)} with key from {self.cfg.passwd_file!r}')
  229. fails = 0
  230. for wf in self.wallet_files:
  231. try:
  232. Wallet( self.cfg, wf, ignore_in_fmt=True )
  233. except SystemExit as e:
  234. if e.code != 0:
  235. fails += 1
  236. return False if fails else True
  237. async def sign_all(self,target_name):
  238. target = getattr(Signable,target_name)(self)
  239. if target.unsigned:
  240. good = []
  241. bad = []
  242. for f in target.unsigned:
  243. ret = None
  244. try:
  245. ret = await target.sign(f)
  246. except Exception as e:
  247. ymsg(f'An error occurred with {target.desc} {f.name!r}:\n {type(e).__name__}: {e!s}')
  248. except:
  249. ymsg(f'An error occurred with {target.desc} {f.name!r}')
  250. good.append(ret) if ret else bad.append(f)
  251. self.cfg._util.qmsg('')
  252. await asyncio.sleep(0.3)
  253. msg(f'{len(good)} {target.desc}{suf(good)} signed')
  254. if bad:
  255. rmsg(f'{len(bad)} {target.desc}{suf(bad)} {target.fail_msg}')
  256. if good and not self.cfg.no_summary:
  257. target.print_summary(good)
  258. if bad:
  259. target.print_bad_list(bad)
  260. return not bad
  261. else:
  262. msg(f'No unsigned {target.desc}s')
  263. await asyncio.sleep(0.5)
  264. return True
  265. async def do_sign(self):
  266. if not self.cfg.stealth_led:
  267. self.led.set('busy')
  268. self.do_mount()
  269. key_ok = self.decrypt_wallets()
  270. if key_ok:
  271. if self.cfg.stealth_led:
  272. self.led.set('busy')
  273. ret1 = await self.sign_all('transaction')
  274. ret2 = await self.sign_all('message') if self.have_msg_dir else True
  275. ret = ret1 and ret2
  276. self.do_umount()
  277. self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)])
  278. return ret
  279. else:
  280. msg('Password is incorrect!')
  281. self.do_umount()
  282. if not self.cfg.stealth_led:
  283. self.led.set('error')
  284. return False
  285. def wipe_existing_key(self):
  286. try: os.stat(self.keyfile)
  287. except: pass
  288. else:
  289. from .fileutil import shred_file
  290. msg(f'\nShredding existing key {self.keyfile!r}')
  291. shred_file( self.keyfile, verbose=self.cfg.verbose )
  292. def create_key(self):
  293. kdata = os.urandom(32).hex()
  294. desc = f'key file {self.keyfile!r}'
  295. msg('Creating ' + desc)
  296. try:
  297. with open(self.keyfile,'w') as fp:
  298. fp.write(kdata+'\n')
  299. os.chmod(self.keyfile,0o400)
  300. msg('Wrote ' + desc)
  301. except:
  302. die(2,'Unable to write ' + desc)
  303. def gen_key(self,no_unmount=False):
  304. self.create_wallet_dir()
  305. if not self.get_insert_status():
  306. die(1,'Removable device not present!')
  307. self.do_mount()
  308. self.wipe_existing_key()
  309. self.create_key()
  310. if not no_unmount:
  311. self.do_umount()
  312. def remove_wallet_dir(self):
  313. msg(f'Deleting {self.wallet_dir!r}')
  314. import shutil
  315. try: shutil.rmtree(self.wallet_dir)
  316. except: pass
  317. def create_wallet_dir(self):
  318. try: os.mkdir(self.wallet_dir)
  319. except: pass
  320. try: os.stat(self.wallet_dir)
  321. except: die(2,f'Unable to create wallet directory {self.wallet_dir!r}')
  322. def setup(self):
  323. self.remove_wallet_dir()
  324. self.gen_key(no_unmount=True)
  325. wf = find_file_in_dir( get_wallet_cls('mmgen'), self.cfg.data_dir )
  326. if wf and keypress_confirm(
  327. cfg = self.cfg,
  328. prompt = f'Default wallet {wf!r} found.\nUse default wallet for autosigning?',
  329. default_yes = True ):
  330. from .cfg import Config
  331. ss_in = Wallet( Config(), wf )
  332. else:
  333. ss_in = Wallet( self.cfg, in_fmt=self.mn_fmts[self.cfg.mnemonic_fmt or self.dfl_mn_fmt] )
  334. ss_out = Wallet( self.cfg, ss=ss_in )
  335. ss_out.write_to_file( desc='autosign wallet', outdir=self.wallet_dir )
  336. def get_insert_status(self):
  337. if self.cfg.no_insert_check:
  338. return True
  339. try: os.stat(os.path.join( self.disk_label_dir, self.part_label ))
  340. except: return False
  341. else: return True
  342. async def do_loop(self):
  343. n,prev_status = 0,False
  344. if not self.cfg.stealth_led:
  345. self.led.set('standby')
  346. while True:
  347. status = self.get_insert_status()
  348. if status and not prev_status:
  349. msg('Device insertion detected')
  350. await self.do_sign()
  351. prev_status = status
  352. if not n % 10:
  353. msg_r(f"\r{' '*17}\rWaiting")
  354. sys.stderr.flush()
  355. await asyncio.sleep(1)
  356. msg_r('.')
  357. n += 1
  358. def at_exit(self,exit_val,message=None):
  359. if message:
  360. msg(message)
  361. self.led.stop()
  362. sys.exit(int(exit_val))
  363. def init_exit_handler(self):
  364. def handler(arg1,arg2):
  365. self.at_exit(1,'\nCleaning up...')
  366. import signal
  367. signal.signal( signal.SIGTERM, handler )
  368. signal.signal( signal.SIGINT, handler )
  369. def init_led(self):
  370. from .led import LEDControl
  371. self.led = LEDControl(
  372. enabled = self.cfg.led,
  373. simulate = os.getenv('MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE') )
  374. self.led.set('off')