ts_autosign.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test.test_py_d.ts_autosign: Autosign tests for the test.py test suite
  20. """
  21. import os,shutil
  22. from subprocess import run
  23. from mmgen.cfg import gc
  24. from ..include.common import *
  25. from .common import *
  26. from .ts_base import *
  27. from .ts_shared import *
  28. from .input import *
  29. from mmgen.led import LEDControl
  30. from mmgen.autosign import Autosign,AutosignConfig
  31. filedir_map = (
  32. ('btc',''),
  33. ('bch',''),
  34. ('ltc','litecoin'),
  35. ('eth','ethereum'),
  36. ('mm1','ethereum'),
  37. ('etc','ethereum_classic'),
  38. )
  39. def init_led(simulate):
  40. try:
  41. cf = LEDControl(enabled=True,simulate=simulate)
  42. except Exception as e:
  43. msg(str(e))
  44. die(2,'LEDControl initialization failed')
  45. for fn in (cf.board.status,cf.board.trigger):
  46. if fn:
  47. run(['sudo','chmod','0666',fn],check=True)
  48. def check_mountpoint(asi):
  49. if not os.path.ismount(asi.mountpoint):
  50. try:
  51. run(['mount',asi.mountpoint],check=True)
  52. imsg(f'Mounted {asi.mountpoint}')
  53. except:
  54. die(2,f'Could not mount {asi.mountpoint}! Exiting')
  55. if not os.path.isdir(asi.tx_dir):
  56. die(2,f'Directory {asi.tx_dir} does not exist! Exiting')
  57. def do_mount(mountpoint):
  58. if not os.path.ismount(mountpoint):
  59. try: run(['mount',mountpoint],check=True)
  60. except: pass
  61. def do_umount(mountpoint):
  62. if os.path.ismount(mountpoint):
  63. try: run(['umount',mountpoint],check=True)
  64. except: pass
  65. class TestSuiteAutosignBase(TestSuiteBase):
  66. networks = ('btc',)
  67. tmpdir_nums = [18]
  68. color = True
  69. mountpoint_basename = 'mmgen_autosign'
  70. def __init__(self,trunner,cfgs,spawn):
  71. super().__init__(trunner,cfgs,spawn)
  72. if trunner == None:
  73. return
  74. if gc.platform == 'win':
  75. die(1,f'Test {type(self).__name__} not supported for Windows platform')
  76. self.network_ids = [c+'_tn' for c in self.daemon_coins] + self.daemon_coins
  77. if not self.live:
  78. self.wallet_dir = os.path.join(self.tmpdir,'dev.shm.autosign')
  79. self.asi = Autosign(
  80. AutosignConfig({
  81. 'coins': ','.join(self.coins),
  82. 'mountpoint': (
  83. None if self.live else
  84. os.path.join(self.tmpdir,self.mountpoint_basename)
  85. ),
  86. 'wallet_dir': None if self.live else self.wallet_dir,
  87. })
  88. )
  89. self.mountpoint = self.asi.mountpoint
  90. if self.simulate and not cfg.exact_output:
  91. die(1,red('This command must be run with --exact-output enabled!'))
  92. if self.simulate or not self.live:
  93. os.environ['MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE'] = '1'
  94. LEDControl.create_dummy_control_files()
  95. self.opts = ['--coins='+','.join(self.coins)]
  96. if self.live:
  97. check_mountpoint(self.asi)
  98. init_led(self.simulate)
  99. else:
  100. os.makedirs(self.asi.tx_dir,exist_ok=True) # creates mountpoint
  101. os.makedirs(self.wallet_dir,exist_ok=True)
  102. self.opts.extend([
  103. '--mountpoint=' + self.mountpoint,
  104. '--no-insert-check',
  105. '--wallet-dir=' + self.wallet_dir,
  106. ])
  107. self.tx_file_ops('set_count') # initialize tx_count here so we can resume anywhere
  108. def gen_msg_fns():
  109. fmap = dict(filedir_map)
  110. for coin in self.coins:
  111. if coin == 'xmr':
  112. continue
  113. sdir = os.path.join('test','ref',fmap[coin])
  114. for fn in os.listdir(sdir):
  115. if fn.endswith(f'[{coin.upper()}].rawmsg.json'):
  116. yield os.path.join(sdir,fn)
  117. self.ref_msgfiles = tuple(gen_msg_fns())
  118. self.good_msg_count = 0
  119. self.bad_msg_count = 0
  120. def __del__(self):
  121. if gc.platform == 'win' or self.tr == None:
  122. return
  123. if self.simulate or not self.live:
  124. LEDControl.delete_dummy_control_files()
  125. def start_daemons(self):
  126. self.spawn('',msg_only=True)
  127. start_test_daemons(*self.network_ids)
  128. return 'ok'
  129. def stop_daemons(self):
  130. self.spawn('',msg_only=True)
  131. stop_test_daemons(*[i for i in self.network_ids if i != 'btc'])
  132. return 'ok'
  133. def gen_key(self):
  134. t = self.spawn( 'mmgen-autosign', self.opts + ['gen_key'] )
  135. t.expect_getend('Wrote key file ')
  136. return t
  137. def create_dfl_wallet(self):
  138. t = self.spawn( 'mmgen-walletconv', [
  139. f'--outdir={cfg.data_dir}',
  140. '--usr-randchars=0', '--quiet', '--hash-preset=1', '--label=foo',
  141. 'test/ref/98831F3A.hex'
  142. ]
  143. )
  144. t.passphrase_new('new MMGen wallet','abc')
  145. t.written_to_file('MMGen wallet')
  146. return t
  147. def make_wallet_from_dfl_wallet(self):
  148. return self.make_wallet(mn_type='default',use_dfl_wallet=True)
  149. def make_wallet_bip39(self):
  150. return self.make_wallet(mn_type='bip39')
  151. def make_wallet(self,mn_type=None,mn_file=None,use_dfl_wallet=False):
  152. mn_desc = mn_type or 'default'
  153. mn_type = mn_type or 'mmgen'
  154. t = self.spawn(
  155. 'mmgen-autosign',
  156. self.opts +
  157. ([] if mn_desc == 'default' else [f'--mnemonic-fmt={mn_type}']) +
  158. ['setup'] )
  159. if use_dfl_wallet:
  160. t.expect( 'Use default wallet for autosigning? (Y/n): ', 'y' )
  161. t.passphrase( 'MMGen wallet', 'abc' )
  162. else:
  163. if use_dfl_wallet is not None: # None => no dfl wallet present
  164. t.expect( 'Use default wallet for autosigning? (Y/n): ', 'n' )
  165. mn_file = mn_file or { 'mmgen': dfl_words_file, 'bip39': dfl_bip39_file }[mn_type]
  166. mn = read_from_file(mn_file).strip().split()
  167. from mmgen.mn_entry import mn_entry
  168. entry_mode = 'full'
  169. mne = mn_entry( cfg, mn_type, entry_mode )
  170. t.expect('words: ',{ 12:'1', 18:'2', 24:'3' }[len(mn)])
  171. t.expect('OK? (Y/n): ','\n')
  172. t.expect('Type a number.*: ',str(mne.entry_modes.index(entry_mode)+1),regex=True)
  173. stealth_mnemonic_entry(t,mne,mn,entry_mode)
  174. wf = t.written_to_file('Autosign wallet')
  175. return t
  176. def copy_tx_files(self):
  177. self.spawn('',msg_only=True)
  178. return self.tx_file_ops('copy')
  179. def remove_signed_txfiles(self):
  180. self.tx_file_ops('remove_signed')
  181. return 'skip'
  182. def remove_signed_txfiles_btc(self):
  183. self.tx_file_ops('remove_signed',txfile_coins=['btc'])
  184. return 'skip'
  185. def tx_file_ops(self,op,txfile_coins=[]):
  186. assert op in ('copy','set_count','remove_signed')
  187. fdata = [e for e in filedir_map if e[0] in (txfile_coins or self.txfile_coins)]
  188. from .ts_ref import TestSuiteRef
  189. tfns = [TestSuiteRef.sources['ref_tx_file'][c][1] for c,d in fdata] + \
  190. [TestSuiteRef.sources['ref_tx_file'][c][0] for c,d in fdata] + \
  191. ['25EFA3[2.34].testnet.rawtx'] # TX with 2 non-MMGen outputs
  192. self.tx_count = len([fn for fn in tfns if fn])
  193. if op == 'set_count':
  194. return
  195. tfs = [joinpath(ref_dir,d[1],fn) for d,fn in zip(fdata+fdata+[('btc','')],tfns)]
  196. for f,fn in zip(tfs,tfns):
  197. if fn: # use empty fn to skip file
  198. if cfg.debug_utf8:
  199. ext = '.testnet.rawtx' if fn.endswith('.testnet.rawtx') else '.rawtx'
  200. fn = fn[:-len(ext)] + '-α' + ext
  201. target = joinpath(self.mountpoint,'tx',fn)
  202. if not op == 'remove_signed':
  203. shutil.copyfile(f,target)
  204. try:
  205. os.unlink(target.replace('.rawtx','.sigtx'))
  206. except:
  207. pass
  208. return 'ok'
  209. def create_bad_txfiles(self):
  210. return self.bad_txfiles('create')
  211. def remove_bad_txfiles(self):
  212. return self.bad_txfiles('remove')
  213. def bad_txfiles(self,op):
  214. if self.live:
  215. do_mount(self.mountpoint)
  216. # create or delete 2 bad tx files
  217. self.spawn('',msg_only=True)
  218. fns = [joinpath(self.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)]
  219. if op == 'create':
  220. for fn in fns:
  221. with open(fn,'w') as fp:
  222. fp.write('bad tx data\n')
  223. self.bad_tx_count = 2
  224. elif op == 'remove':
  225. for fn in fns:
  226. try: os.unlink(fn)
  227. except: pass
  228. self.bad_tx_count = 0
  229. return 'ok'
  230. def copy_msgfiles(self):
  231. return self.msgfile_ops('copy')
  232. def remove_signed_msgfiles(self):
  233. return self.msgfile_ops('remove_signed')
  234. def create_invalid_msgfile(self):
  235. return self.msgfile_ops('create_invalid')
  236. def remove_invalid_msgfile(self):
  237. return self.msgfile_ops('remove_invalid')
  238. def msgfile_ops(self,op):
  239. self.spawn('',msg_only=True)
  240. destdir = joinpath(self.mountpoint,'msg')
  241. os.makedirs(destdir,exist_ok=True)
  242. if op.endswith('_invalid'):
  243. fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json')
  244. if op == 'create_invalid':
  245. with open(fn,'w') as fp:
  246. fp.write('bad data\n')
  247. self.bad_msg_count += 1
  248. elif op == 'remove_invalid':
  249. os.unlink(fn)
  250. self.bad_msg_count -= 1
  251. else:
  252. for fn in self.ref_msgfiles:
  253. if op == 'copy':
  254. if os.path.basename(fn) == 'ED405C[BTC].rawmsg.json': # contains bad Seed ID
  255. self.bad_msg_count += 1
  256. else:
  257. self.good_msg_count += 1
  258. imsg(f'Copying: {fn} -> {destdir}')
  259. shutil.copy2(fn,destdir)
  260. elif op == 'remove_signed':
  261. os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') ))
  262. return 'ok'
  263. def do_sign(self,args,have_msg=False,tx_name='transaction'):
  264. t = self.spawn('mmgen-autosign', self.opts + args )
  265. t.expect(
  266. f'{self.tx_count} {tx_name}{suf(self.tx_count)} signed' if self.tx_count else
  267. 'No unsigned transactions' )
  268. if self.bad_tx_count:
  269. t.expect(f'{self.bad_tx_count} {tx_name}{suf(self.bad_tx_count)} failed to sign')
  270. t.req_exit_val = 1
  271. if have_msg:
  272. t.expect(
  273. f'{self.good_msg_count} message file{suf(self.good_msg_count)}{{0,1}} signed'
  274. if self.good_msg_count else
  275. 'No unsigned message files', regex=True )
  276. if self.bad_msg_count:
  277. t.expect(
  278. f'{self.bad_msg_count} message file{suf(self.bad_msg_count)}{{0,1}} failed to sign',
  279. regex = True )
  280. t.req_exit_val = 1
  281. if 'wait' in args:
  282. t.expect('Waiting')
  283. imsg(purple('\nKilling wait loop!'))
  284. t.kill(2)
  285. t.req_exit_val = 1
  286. else:
  287. t.read()
  288. imsg('')
  289. return t
  290. class TestSuiteAutosign(TestSuiteAutosignBase):
  291. 'autosigning transactions for all supported coins'
  292. coins = ['btc','bch','ltc','eth']
  293. daemon_coins = ['btc','bch','ltc']
  294. txfile_coins = ['btc','bch','ltc','eth','mm1','etc']
  295. live = False
  296. simulate = False
  297. bad_tx_count = 0
  298. cmd_group = (
  299. ('start_daemons', 'starting daemons'),
  300. ('copy_tx_files', 'copying transaction files'),
  301. ('gen_key', 'generating key'),
  302. ('create_dfl_wallet', 'creating default MMGen wallet'),
  303. ('make_wallet_from_dfl_wallet','making autosign wallet (from user’s default wallet)'),
  304. ('sign_quiet', 'signing transactions (--quiet)'),
  305. ('remove_signed_txfiles', 'removing signed transaction files'),
  306. ('make_wallet_bip39', 'making autosign wallet (from BIP39 mnemonic)'),
  307. ('create_bad_txfiles', 'creating bad transaction files'),
  308. ('sign_full_summary', 'signing transactions (--full-summary)'),
  309. ('remove_signed_txfiles_btc','removing transaction files (BTC only)'),
  310. ('remove_bad_txfiles', 'removing bad transaction files'),
  311. ('sign_led', 'signing transactions (--led - BTC files only)'),
  312. ('remove_signed_txfiles', 'removing signed transaction files'),
  313. ('sign_stealth_led', 'signing transactions (--stealth-led)'),
  314. ('remove_signed_txfiles', 'removing signed transaction files'),
  315. ('copy_msgfiles', 'copying message files'),
  316. ('sign_quiet_msg', 'signing transactions and messages (--quiet)'),
  317. ('remove_signed_txfiles', 'removing signed transaction files'),
  318. ('create_bad_txfiles', 'creating bad transaction files'),
  319. ('remove_signed_msgfiles', 'removing signed message files'),
  320. ('create_invalid_msgfile', 'creating invalid message file'),
  321. ('sign_full_summary_msg', 'signing transactions and messages (--full-summary)'),
  322. ('remove_invalid_msgfile', 'removing invalid message file'),
  323. ('remove_bad_txfiles', 'removing bad transaction files'),
  324. ('sign_no_unsigned_msg', 'signing transactions and messages (nothing to sign)'),
  325. ('stop_daemons', 'stopping daemons'),
  326. )
  327. def sign_quiet(self):
  328. return self.do_sign(['--quiet','wait'])
  329. def sign_full_summary(self):
  330. return self.do_sign(['--full-summary','wait'])
  331. def sign_led(self):
  332. return self.do_sign(['--quiet','--led'])
  333. def sign_stealth_led(self):
  334. return self.do_sign(['--quiet','--stealth-led','wait'])
  335. def sign_quiet_msg(self):
  336. return self.do_sign(['--quiet','wait'],have_msg=True)
  337. def sign_full_summary_msg(self):
  338. return self.do_sign(['--full-summary','wait'],have_msg=True)
  339. def sign_no_unsigned_msg(self):
  340. self.tx_count = 0
  341. self.good_msg_count = 0
  342. self.bad_msg_count = 0
  343. return self.do_sign(['--quiet','wait'],have_msg=True)
  344. class TestSuiteAutosignBTC(TestSuiteAutosign):
  345. 'autosigning BTC transactions'
  346. coins = ['btc']
  347. daemon_coins = ['btc']
  348. txfile_coins = ['btc']
  349. class TestSuiteAutosignLive(TestSuiteAutosignBTC):
  350. 'live autosigning BTC transactions'
  351. live = True
  352. cmd_group = (
  353. ('start_daemons', 'starting daemons'),
  354. ('copy_tx_files', 'copying transaction files'),
  355. ('gen_key', 'generating key'),
  356. ('make_wallet_mmgen', 'making autosign wallet (from MMGen native mnemonic)'),
  357. ('sign_live', 'signing transactions'),
  358. ('create_bad_txfiles', 'creating bad transaction files'),
  359. ('sign_live_led', 'signing transactions (--led)'),
  360. ('remove_bad_txfiles', 'removing bad transaction files'),
  361. ('sign_live_stealth_led','signing transactions (--stealth-led)'),
  362. ('stop_daemons', 'stopping daemons'),
  363. )
  364. def make_wallet_mmgen(self):
  365. return self.make_wallet(mn_type='mmgen',use_dfl_wallet=None)
  366. def sign_live(self):
  367. return self.do_sign_live([])
  368. def sign_live_led(self):
  369. return self.do_sign_live(['--led'])
  370. def sign_live_stealth_led(self):
  371. return self.do_sign_live(['--stealth-led'])
  372. def do_sign_live(self,led_opts):
  373. def prompt_remove():
  374. omsg_r(blue('\nRemove removable device and then hit ENTER '))
  375. input()
  376. def prompt_insert_sign(t):
  377. omsg(blue(insert_msg))
  378. t.expect(f'{self.tx_count} transactions signed')
  379. if self.bad_tx_count:
  380. t.expect(f'{self.bad_tx_count} transactions failed to sign')
  381. t.expect('Waiting')
  382. if led_opts:
  383. opts_msg = "'" + ' '.join(led_opts) + "'"
  384. info_msg = f"Running 'mmgen-autosign wait' with {led_opts[0]}. " + {
  385. '--led': "The LED should start blinking slowly now",
  386. '--stealth-led': "You should see no LED activity now"
  387. }[led_opts[0]]
  388. insert_msg = 'Insert removable device and watch for fast LED activity during signing'
  389. else:
  390. opts_msg = 'no LED'
  391. info_msg = "Running 'mmgen-autosign wait'"
  392. insert_msg = 'Insert removable device '
  393. omsg(purple(f'Running autosign test with {opts_msg}'))
  394. do_umount(self.mountpoint)
  395. prompt_remove()
  396. omsg(green(info_msg))
  397. t = self.spawn(
  398. 'mmgen-autosign',
  399. self.opts + led_opts + ['--quiet','--no-summary','wait'])
  400. if not cfg.exact_output:
  401. omsg('')
  402. prompt_insert_sign(t)
  403. do_mount(self.mountpoint) # race condition due to device insertion detection
  404. self.remove_signed_txfiles()
  405. do_umount(self.mountpoint)
  406. imsg(purple('\nKilling wait loop!'))
  407. t.kill(2) # 2 = SIGINT
  408. t.req_exit_val = 1
  409. if self.simulate and led_opts:
  410. t.expect("Stopping LED")
  411. return t
  412. class TestSuiteAutosignLiveSimulate(TestSuiteAutosignLive):
  413. 'live autosigning BTC transactions with simulated LED support'
  414. simulate = True