autosign.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen
  9. # https://gitlab.com/mmgen/mmgen
  10. """
  11. autosign: Auto-sign MMGen transactions, message files and XMR wallet output files
  12. """
  13. import sys,os,asyncio
  14. from pathlib import Path
  15. from subprocess import run,DEVNULL
  16. from .cfg import Config
  17. from .util import msg,msg_r,ymsg,rmsg,gmsg,bmsg,die,suf,fmt,fmt_list,async_run
  18. from .color import yellow,red,orange
  19. from .wallet import Wallet,get_wallet_cls
  20. from .filename import find_file_in_dir
  21. from .ui import keypress_confirm
  22. class AutosignConfig(Config):
  23. _set_ok = ('usr_randchars','_proto','outdir','passwd_file')
  24. class Signable:
  25. signables = ('transaction','message','xmr_transaction','xmr_wallet_outputs_file')
  26. class base:
  27. clean_all = False
  28. multiple_ok = True
  29. def __init__(self,parent):
  30. self.parent = parent
  31. self.cfg = parent.cfg
  32. self.dir = getattr(parent,self.dir_name)
  33. @property
  34. def unsigned(self):
  35. return self._unprocessed( '_unsigned', self.rawext, self.sigext )
  36. @property
  37. def unsubmitted(self):
  38. return self._unprocessed( '_unsubmitted', self.sigext, self.subext )
  39. def _unprocessed(self,attrname,rawext,sigext):
  40. if not hasattr(self,attrname):
  41. dirlist = sorted(self.dir.iterdir())
  42. names = {f.name for f in dirlist}
  43. setattr(
  44. self,
  45. attrname,
  46. tuple(f for f in dirlist
  47. if f.name.endswith('.' + rawext)
  48. and f.name[:-len(rawext)] + sigext not in names) )
  49. return getattr(self,attrname)
  50. def print_bad_list(self,bad_files):
  51. msg('\n{a}\n{b}'.format(
  52. a = red(f'Failed {self.desc}s:'),
  53. b = ' {}\n'.format('\n '.join(self.gen_bad_list(sorted(bad_files,key=lambda f: f.name))))
  54. ))
  55. class transaction(base):
  56. desc = 'transaction'
  57. rawext = 'rawtx'
  58. sigext = 'sigtx'
  59. dir_name = 'tx_dir'
  60. fail_msg = 'failed to sign'
  61. async def sign(self,f):
  62. from .tx import UnsignedTX
  63. tx1 = UnsignedTX( cfg=self.cfg, filename=f )
  64. if tx1.proto.sign_mode == 'daemon':
  65. from .rpc import rpc_init
  66. tx1.rpc = await rpc_init( self.cfg, tx1.proto, ignore_wallet=True )
  67. from .tx.sign import txsign
  68. tx2 = await txsign( self.cfg, tx1, self.parent.wallet_files[:], None, None )
  69. if tx2:
  70. tx2.file.write(ask_write=False)
  71. return tx2
  72. else:
  73. return False
  74. def print_summary(self,signables):
  75. if self.cfg.full_summary:
  76. bmsg('\nAutosign summary:\n')
  77. msg_r('\n'.join(tx.info.format(terse=True) for tx in signables))
  78. return
  79. def gen():
  80. for tx in signables:
  81. non_mmgen = [o for o in tx.outputs if not o.mmid]
  82. if non_mmgen:
  83. yield (tx,non_mmgen)
  84. body = list(gen())
  85. if body:
  86. bmsg('\nAutosign summary:')
  87. fs = '{} {} {}'
  88. t_wid,a_wid = 6,44
  89. def gen():
  90. yield fs.format('TX ID ','Non-MMGen outputs'+' '*(a_wid-17),'Amount')
  91. yield fs.format('-'*t_wid, '-'*a_wid, '-'*7)
  92. for tx,non_mmgen in body:
  93. for nm in non_mmgen:
  94. yield fs.format(
  95. tx.txid.fmt( width=t_wid, color=True ) if nm is non_mmgen[0] else ' '*t_wid,
  96. nm.addr.fmt( width=a_wid, color=True ),
  97. nm.amt.hl() + ' ' + yellow(tx.coin))
  98. msg('\n' + '\n'.join(gen()))
  99. else:
  100. msg('\nNo non-MMGen outputs')
  101. def gen_bad_list(self,bad_files):
  102. for f in bad_files:
  103. yield red(f.name)
  104. class xmr_signable(transaction): # virtual class
  105. def need_daemon_restart(self,m,new_idx):
  106. old_idx = self.parent.xmr_cur_wallet_idx
  107. self.parent.xmr_cur_wallet_idx = new_idx
  108. return old_idx != new_idx or m.wd.state != 'ready'
  109. def print_summary(self,signables):
  110. bmsg('\nAutosign summary:')
  111. msg('\n'.join(s.get_info(indent=' ') for s in signables) + self.summary_footer)
  112. class xmr_transaction(xmr_signable):
  113. dir_name = 'xmr_tx_dir'
  114. desc = 'Monero transaction'
  115. subext = 'subtx'
  116. multiple_ok = False
  117. summary_footer = ''
  118. async def sign(self,f):
  119. from .xmrwallet import MoneroMMGenTX,MoneroWalletOps,xmrwallet_uargs
  120. tx1 = MoneroMMGenTX.Completed( self.parent.xmrwallet_cfg, f )
  121. m = MoneroWalletOps.sign(
  122. self.parent.xmrwallet_cfg,
  123. xmrwallet_uargs(
  124. infile = str(self.parent.wallet_files[0]), # MMGen wallet file
  125. wallets = str(tx1.src_wallet_idx),
  126. spec = None ),
  127. )
  128. tx2 = await m.main( f, restart_daemon=self.need_daemon_restart(m,tx1.src_wallet_idx) )
  129. tx2.write(ask_write=False)
  130. return tx2
  131. class xmr_wallet_outputs_file(xmr_signable):
  132. desc = 'Monero wallet outputs file'
  133. rawext = 'raw'
  134. sigext = 'sig'
  135. dir_name = 'xmr_outputs_dir'
  136. clean_all = True
  137. summary_footer = '\n'
  138. async def sign(self,f):
  139. from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
  140. wallet_idx = MoneroWalletOps.wallet.get_idx_from_fn(f)
  141. m = MoneroWalletOps.export_key_images(
  142. self.parent.xmrwallet_cfg,
  143. xmrwallet_uargs(
  144. infile = str(self.parent.wallet_files[0]), # MMGen wallet file
  145. wallets = str(wallet_idx),
  146. spec = None ),
  147. )
  148. obj = await m.main( f, wallet_idx, restart_daemon=self.need_daemon_restart(m,wallet_idx) )
  149. obj.write()
  150. return obj
  151. class message(base):
  152. desc = 'message file'
  153. rawext = 'rawmsg.json'
  154. sigext = 'sigmsg.json'
  155. dir_name = 'msg_dir'
  156. fail_msg = 'failed to sign or signed incompletely'
  157. async def sign(self,f):
  158. from .msg import UnsignedMsg,SignedMsg
  159. m = UnsignedMsg( self.cfg, infile=f )
  160. await m.sign( wallet_files=self.parent.wallet_files[:] )
  161. m = SignedMsg( self.cfg, data=m.__dict__ )
  162. m.write_to_file(
  163. outdir = self.dir.resolve(),
  164. ask_overwrite = False )
  165. if m.data.get('failed_sids'):
  166. die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
  167. return m
  168. def print_summary(self,signables):
  169. gmsg('\nSigned message files:')
  170. for message in signables:
  171. gmsg(' ' + message.signed_filename)
  172. def gen_bad_list(self,bad_files):
  173. for f in bad_files:
  174. sigfile = f.parent / ( f.name[:-len(self.rawext)] + self.sigext )
  175. yield orange(sigfile.name) if sigfile.exists() else red(f.name)
  176. class Autosign:
  177. dfl_mountpoint = '/mnt/mmgen_autosign'
  178. dfl_wallet_dir = '/dev/shm/autosign'
  179. old_dfl_mountpoint = '/mnt/tx'
  180. dfl_dev_disk_path = '/dev/disk/by-label/MMGEN_TX'
  181. fake_dev_disk_path = '/tmp/mmgen-test-suite-dev.disk.by-label.MMGEN_TX'
  182. old_dfl_mountpoint_errmsg = f"""
  183. Mountpoint '{old_dfl_mountpoint}' is no longer supported!
  184. Please rename '{old_dfl_mountpoint}' to '{dfl_mountpoint}'
  185. and update your fstab accordingly.
  186. """
  187. mountpoint_errmsg_fs = """
  188. Mountpoint '{}' does not exist or does not point
  189. to a directory! Please create the mountpoint and add an entry
  190. to your fstab as described in this script’s help text.
  191. """
  192. mn_fmts = {
  193. 'mmgen': 'words',
  194. 'bip39': 'bip39',
  195. }
  196. dfl_mn_fmt = 'mmgen'
  197. have_msg_dir = False
  198. def __init__(self,cfg):
  199. self.cfg = cfg
  200. if cfg.mnemonic_fmt:
  201. if cfg.mnemonic_fmt not in self.mn_fmts:
  202. die(1,'{!r}: invalid mnemonic format (must be one of: {})'.format(
  203. cfg.mnemonic_fmt,
  204. fmt_list( self.mn_fmts, fmt='no_spc' ) ))
  205. self.dev_disk_path = Path(
  206. self.fake_dev_disk_path if cfg.test_suite_xmr_autosign else
  207. self.dfl_dev_disk_path )
  208. self.mountpoint = Path(cfg.mountpoint or self.dfl_mountpoint)
  209. self.wallet_dir = Path(cfg.wallet_dir or self.dfl_wallet_dir)
  210. self.tx_dir = self.mountpoint / 'tx'
  211. self.msg_dir = self.mountpoint / 'msg'
  212. self.keyfile = self.mountpoint / 'autosign.key'
  213. cfg.outdir = str(self.tx_dir)
  214. cfg.passwd_file = str(self.keyfile)
  215. if any(k in cfg._uopts for k in ('help','longhelp')):
  216. return
  217. if 'coin' in cfg._uopts:
  218. die(1,'--coin option not supported with this command. Use --coins instead')
  219. self.coins = cfg.coins.upper().split(',') if cfg.coins else []
  220. if cfg._args and cfg._args[0] == 'clean':
  221. return
  222. if cfg.xmrwallets and not 'XMR' in self.coins:
  223. self.coins.append('XMR')
  224. if not self.coins:
  225. ymsg('Warning: no coins specified, defaulting to BTC')
  226. self.coins = ['BTC']
  227. if 'XMR' in self.coins:
  228. self.xmr_dir = self.mountpoint / 'xmr'
  229. self.xmr_tx_dir = self.mountpoint / 'xmr' / 'tx'
  230. self.xmr_outputs_dir = self.mountpoint / 'xmr' / 'outputs'
  231. self.xmr_cur_wallet_idx = None
  232. async def check_daemons_running(self):
  233. from .protocol import init_proto
  234. for coin in self.coins:
  235. proto = init_proto( self.cfg, coin, testnet=self.cfg.network=='testnet', need_amt=True )
  236. if proto.sign_mode == 'daemon':
  237. self.cfg._util.vmsg(f'Checking {coin} daemon')
  238. from .rpc import rpc_init
  239. from .exception import SocketError
  240. try:
  241. await rpc_init( self.cfg, proto, ignore_wallet=True )
  242. except SocketError as e:
  243. from .daemon import CoinDaemon
  244. d = CoinDaemon( self.cfg, proto=proto, test_suite=self.cfg.test_suite )
  245. die(2,
  246. f'\n{e}\nIs the {d.coind_name} daemon ({d.exec_fn}) running '
  247. + 'and listening on the correct port?' )
  248. @property
  249. def wallet_files(self):
  250. if not hasattr(self,'_wallet_files'):
  251. try:
  252. dirlist = self.wallet_dir.iterdir()
  253. except:
  254. die(1,f"Cannot open wallet directory '{self.wallet_dir}'. Did you run ‘mmgen-autosign setup’?")
  255. self._wallet_files = [f for f in dirlist if f.suffix == '.mmdat']
  256. if not self._wallet_files:
  257. die(1,'No wallet files present!')
  258. return self._wallet_files
  259. def do_mount(self,no_xmr_chk=False):
  260. from stat import S_ISDIR,S_IWUSR,S_IRUSR
  261. def check_dir(cdir):
  262. try:
  263. ds = cdir.stat()
  264. assert S_ISDIR(ds.st_mode), f"'{cdir}' is not a directory!"
  265. assert ds.st_mode & S_IWUSR|S_IRUSR == S_IWUSR|S_IRUSR, f"'{cdir}' is not read/write for this user!"
  266. except:
  267. die(1,f"'{cdir}' missing or not read/writable by user!")
  268. if not self.mountpoint.is_dir():
  269. def do_die(m):
  270. die(1,'\n' + yellow(fmt(m.strip(),indent=' ')))
  271. if Path(self.old_dfl_mountpoint).is_dir():
  272. do_die(self.old_dfl_mountpoint_errmsg)
  273. else:
  274. do_die(self.mountpoint_errmsg_fs.format(self.mountpoint))
  275. if not self.mountpoint.is_mount():
  276. if run( ['mount',self.mountpoint], stderr=DEVNULL, stdout=DEVNULL ).returncode == 0:
  277. msg(f"Mounting '{self.mountpoint}'")
  278. elif not self.cfg.test_suite:
  279. die(1,f"Unable to mount device at '{self.mountpoint}'")
  280. self.have_msg_dir = self.msg_dir.is_dir()
  281. check_dir(self.tx_dir)
  282. if self.have_msg_dir:
  283. check_dir(self.msg_dir)
  284. if 'XMR' in self.coins and not no_xmr_chk:
  285. check_dir(self.xmr_tx_dir)
  286. def do_umount(self):
  287. if self.mountpoint.is_mount():
  288. run( ['sync'], check=True )
  289. msg(f"Unmounting '{self.mountpoint}'")
  290. run( ['umount',self.mountpoint], check=True )
  291. bmsg('It is now safe to extract the removable device')
  292. def decrypt_wallets(self):
  293. msg(f"Unlocking wallet{suf(self.wallet_files)} with key from '{self.cfg.passwd_file}'")
  294. fails = 0
  295. for wf in self.wallet_files:
  296. try:
  297. Wallet( self.cfg, wf, ignore_in_fmt=True )
  298. except SystemExit as e:
  299. if e.code != 0:
  300. fails += 1
  301. return not fails
  302. async def sign_all(self,target_name):
  303. target = getattr(Signable,target_name)(self)
  304. if target.unsigned:
  305. good = []
  306. bad = []
  307. if len(target.unsigned) > 1 and not target.multiple_ok:
  308. ymsg(f'Autosign error: only one unsigned {target.desc} transaction allowed at a time!')
  309. target.print_bad_list(target.unsigned)
  310. return False
  311. for f in target.unsigned:
  312. ret = None
  313. try:
  314. ret = await target.sign(f)
  315. except Exception as e:
  316. ymsg(f"An error occurred with {target.desc} '{f.name}':\n {type(e).__name__}: {e!s}")
  317. except:
  318. ymsg(f"An error occurred with {target.desc} '{f.name}'")
  319. good.append(ret) if ret else bad.append(f)
  320. self.cfg._util.qmsg('')
  321. await asyncio.sleep(0.3)
  322. msg(f'{len(good)} {target.desc}{suf(good)} signed')
  323. if bad:
  324. rmsg(f'{len(bad)} {target.desc}{suf(bad)} {target.fail_msg}')
  325. if good and not self.cfg.no_summary:
  326. target.print_summary(good)
  327. if bad:
  328. target.print_bad_list(bad)
  329. return not bad
  330. else:
  331. msg(f'No unsigned {target.desc}s')
  332. await asyncio.sleep(0.5)
  333. return True
  334. async def do_sign(self):
  335. if not self.cfg.stealth_led:
  336. self.led.set('busy')
  337. self.do_mount()
  338. key_ok = self.decrypt_wallets()
  339. if key_ok:
  340. if self.cfg.stealth_led:
  341. self.led.set('busy')
  342. ret1 = await self.sign_all('transaction')
  343. ret2 = await self.sign_all('message') if self.have_msg_dir else True
  344. # import XMR wallet outputs BEFORE signing transactions:
  345. ret3 = await self.sign_all('xmr_wallet_outputs_file') if 'XMR' in self.coins else True
  346. ret4 = await self.sign_all('xmr_transaction') if 'XMR' in self.coins else True
  347. ret = ret1 and ret2 and ret3 and ret4
  348. self.do_umount()
  349. self.led.set(('standby','off','error')[(not ret)*2 or bool(self.cfg.stealth_led)])
  350. return ret
  351. else:
  352. msg('Password is incorrect!')
  353. self.do_umount()
  354. if not self.cfg.stealth_led:
  355. self.led.set('error')
  356. return False
  357. def wipe_existing_key(self):
  358. try:
  359. self.keyfile.stat()
  360. except:
  361. pass
  362. else:
  363. from .fileutil import shred_file
  364. msg(f"\nShredding existing key '{self.keyfile}'")
  365. shred_file( self.keyfile, verbose=self.cfg.verbose )
  366. def create_key(self):
  367. desc = f"key file '{self.keyfile}'"
  368. msg('Creating ' + desc)
  369. try:
  370. self.keyfile.write_text( os.urandom(32).hex() )
  371. self.keyfile.chmod(0o400)
  372. except:
  373. die(2,'Unable to write ' + desc)
  374. msg('Wrote ' + desc)
  375. def gen_key(self,no_unmount=False):
  376. if not self.get_insert_status():
  377. die(1,'Removable device not present!')
  378. self.do_mount(no_xmr_chk=True)
  379. self.wipe_existing_key()
  380. self.create_key()
  381. if not no_unmount:
  382. self.do_umount()
  383. def setup(self):
  384. def remove_wallet_dir():
  385. msg(f"Deleting '{self.wallet_dir}'")
  386. import shutil
  387. try:
  388. shutil.rmtree(self.wallet_dir)
  389. except:
  390. pass
  391. def create_wallet_dir():
  392. try:
  393. self.wallet_dir.mkdir(parents=True)
  394. except:
  395. pass
  396. try:
  397. self.wallet_dir.stat()
  398. except:
  399. die(2,f"Unable to create wallet directory '{self.wallet_dir}'")
  400. remove_wallet_dir()
  401. create_wallet_dir()
  402. self.gen_key(no_unmount=True)
  403. wf = find_file_in_dir( get_wallet_cls('mmgen'), self.cfg.data_dir )
  404. if wf and keypress_confirm(
  405. cfg = self.cfg,
  406. prompt = f"Default wallet '{wf}' found.\nUse default wallet for autosigning?",
  407. default_yes = True ):
  408. ss_in = Wallet( Config(), wf )
  409. else:
  410. ss_in = Wallet( self.cfg, in_fmt=self.mn_fmts[self.cfg.mnemonic_fmt or self.dfl_mn_fmt] )
  411. ss_out = Wallet( self.cfg, ss=ss_in )
  412. ss_out.write_to_file( desc='autosign wallet', outdir=self.wallet_dir )
  413. @property
  414. def xmrwallet_cfg(self):
  415. if not hasattr(self,'_xmrwallet_cfg'):
  416. self._xmrwallet_cfg = Config({
  417. '_clone': self.cfg,
  418. 'coin': 'xmr',
  419. 'wallet_rpc_user': 'autosign',
  420. 'wallet_rpc_password': 'autosign password',
  421. 'wallet_rpc_port': 23232 if self.cfg.test_suite_xmr_autosign else None,
  422. 'wallet_dir': str(self.wallet_dir),
  423. 'autosign': True,
  424. 'autosign_mountpoint': str(self.mountpoint),
  425. 'outdir': str(self.xmr_dir), # required by vkal.write()
  426. })
  427. return self._xmrwallet_cfg
  428. def xmr_setup(self):
  429. def create_signing_wallets():
  430. from .xmrwallet import MoneroWalletOps,xmrwallet_uargs
  431. if len(self.wallet_files) > 1:
  432. ymsg(f'Warning: more than one wallet file, using the first ({self.wallet_files[0]}) for xmrwallet generation')
  433. m = MoneroWalletOps.create_offline(
  434. self.xmrwallet_cfg,
  435. xmrwallet_uargs(
  436. infile = str(self.wallet_files[0]), # MMGen wallet file
  437. wallets = self.cfg.xmrwallets, # XMR wallet idxs
  438. spec = None ),
  439. )
  440. async_run(m.main())
  441. async_run(m.stop_wallet_daemon())
  442. import shutil
  443. try:
  444. shutil.rmtree(self.xmr_outputs_dir)
  445. except:
  446. pass
  447. self.xmr_outputs_dir.mkdir(parents=True)
  448. self.xmr_tx_dir.mkdir(exist_ok=True)
  449. self.clean_old_files()
  450. create_signing_wallets()
  451. def clean_old_files(self):
  452. def do_shred(f):
  453. nonlocal count
  454. msg_r('.')
  455. from .fileutil import shred_file
  456. shred_file( f, verbose=self.cfg.verbose )
  457. count += 1
  458. def clean_dir(s_name):
  459. def clean_files(rawext,sigext):
  460. for f in s.dir.iterdir():
  461. if s.clean_all and (f.name.endswith(f'.{rawext}') or f.name.endswith(f'.{sigext}')):
  462. do_shred(f)
  463. elif f.name.endswith(f'.{sigext}'):
  464. raw = f.parent / ( f.name[:-len(sigext)] + rawext )
  465. if raw.is_file():
  466. do_shred(raw)
  467. s = getattr(Signable,s_name)(asi)
  468. msg_r(f"Cleaning directory '{s.dir}'..")
  469. if s.dir.is_dir():
  470. clean_files( s.rawext, s.sigext )
  471. if hasattr(s,'subext'):
  472. clean_files( s.rawext, s.subext )
  473. clean_files( s.sigext, s.subext )
  474. msg('done' if s.dir.is_dir() else 'skipped (no dir)')
  475. asi = get_autosign_obj( self.cfg, 'btc,xmr' )
  476. count = 0
  477. for s_name in Signable.signables:
  478. clean_dir(s_name)
  479. bmsg(f'{count} file{suf(count)} shredded')
  480. def get_insert_status(self):
  481. if self.cfg.no_insert_check:
  482. return True
  483. try:
  484. self.dev_disk_path.stat()
  485. except:
  486. return False
  487. else:
  488. return True
  489. async def do_loop(self):
  490. if not self.cfg.stealth_led:
  491. self.led.set('standby')
  492. testing_xmr = self.cfg.test_suite_xmr_autosign
  493. if testing_xmr:
  494. msg('Waiting for fake device insertion')
  495. n = 1 if testing_xmr else 0
  496. prev_status = False
  497. while True:
  498. status = self.get_insert_status()
  499. if status and not prev_status:
  500. msg('Device insertion detected')
  501. await self.do_sign()
  502. if testing_xmr:
  503. if self.dev_disk_path.exists():
  504. self.dev_disk_path.unlink()
  505. prev_status = status
  506. if not n % 10:
  507. msg_r(f"\r{' '*17}\rWaiting")
  508. await asyncio.sleep(1)
  509. if not testing_xmr:
  510. msg_r('.')
  511. n += 1
  512. def at_exit(self,exit_val,message=None):
  513. if message:
  514. msg(message)
  515. self.led.stop()
  516. sys.exit(0 if self.cfg.test_suite_xmr_autosign else int(exit_val))
  517. def init_exit_handler(self):
  518. def handler(arg1,arg2):
  519. self.at_exit(1,'\nCleaning up...')
  520. import signal
  521. signal.signal( signal.SIGTERM, handler )
  522. signal.signal( signal.SIGINT, handler )
  523. def init_led(self):
  524. from .led import LEDControl
  525. self.led = LEDControl(
  526. enabled = self.cfg.led,
  527. simulate = self.cfg.test_suite_autosign_led_simulate )
  528. self.led.set('off')
  529. def get_autosign_obj(cfg,coins=None):
  530. return Autosign(
  531. AutosignConfig({
  532. 'mountpoint': cfg.autosign_mountpoint or cfg.mountpoint,
  533. 'test_suite': cfg.test_suite,
  534. 'coins': coins if isinstance(coins,str) else ','.join(coins) if coins else 'btc',
  535. })
  536. )