ts_autosign.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. test.test_py_d.ts_autosign: Autosign tests for the test.py test suite
  20. """
  21. import os,shutil
  22. from subprocess import run
  23. from mmgen.globalvars import g,gc
  24. from mmgen.opts import opt
  25. from ..include.common import *
  26. from .common import *
  27. from .ts_base import *
  28. from .ts_shared import *
  29. from .input import *
  30. from mmgen.led import LEDControl
  31. filedir_map = (
  32. ('btc',''),
  33. ('bch',''),
  34. ('ltc','litecoin'),
  35. ('eth','ethereum'),
  36. ('mm1','ethereum'),
  37. ('etc','ethereum_classic'),
  38. )
  39. def init_led(simulate):
  40. try:
  41. cf = LEDControl(enabled=True,simulate=simulate)
  42. except Exception as e:
  43. msg(str(e))
  44. die(2,'LEDControl initialization failed')
  45. for fn in (cf.board.status,cf.board.trigger):
  46. if fn:
  47. run(['sudo','chmod','0666',fn],check=True)
  48. def check_mountpoint(mountpoint):
  49. if not os.path.ismount(mountpoint):
  50. try:
  51. run(['mount',mountpoint],check=True)
  52. imsg(f'Mounted {mountpoint}')
  53. except:
  54. die(2,f'Could not mount {mountpoint}! Exiting')
  55. txdir = joinpath(mountpoint,'tx')
  56. if not os.path.isdir(txdir):
  57. die(2,f'Directory {txdir} does not exist! Exiting')
  58. def do_mount(mountpoint):
  59. if not os.path.ismount(mountpoint):
  60. try: run(['mount',mountpoint],check=True)
  61. except: pass
  62. def do_umount(mountpoint):
  63. if os.path.ismount(mountpoint):
  64. try: run(['umount',mountpoint],check=True)
  65. except: pass
  66. class TestSuiteAutosignBase(TestSuiteBase):
  67. networks = ('btc',)
  68. tmpdir_nums = [18]
  69. color = True
  70. def __init__(self,trunner,cfgs,spawn):
  71. super().__init__(trunner,cfgs,spawn)
  72. if trunner == None:
  73. return
  74. if gc.platform == 'win':
  75. die(1,f'Test {type(self).__name__} not supported for Windows platform')
  76. self.network_ids = [c+'_tn' for c in self.daemon_coins] + self.daemon_coins
  77. if self.simulate and not opt.exact_output:
  78. die(1,red('This command must be run with --exact-output enabled!'))
  79. if self.simulate or not self.live:
  80. os.environ['MMGEN_TEST_SUITE_AUTOSIGN_LED_SIMULATE'] = '1'
  81. LEDControl.create_dummy_control_files()
  82. if self.live:
  83. self.mountpoint = '/mnt/tx'
  84. self.opts = ['--coins='+','.join(self.coins)]
  85. check_mountpoint(self.mountpoint)
  86. init_led(self.simulate)
  87. else:
  88. self.mountpoint = self.tmpdir
  89. try:
  90. os.mkdir(joinpath(self.mountpoint,'tx'))
  91. except:
  92. pass
  93. self.opts = [
  94. '--coins='+','.join(self.coins),
  95. '--mountpoint='+self.mountpoint,
  96. '--no-insert-check' ]
  97. self.tx_file_ops('set_count') # initialize tx_count here so we can resume anywhere
  98. def gen_msg_fns():
  99. fmap = dict(filedir_map)
  100. for coin in self.coins:
  101. sdir = os.path.join('test','ref',fmap[coin])
  102. for fn in os.listdir(sdir):
  103. if fn.endswith(f'[{coin.upper()}].rawmsg.json'):
  104. yield os.path.join(sdir,fn)
  105. self.ref_msgfiles = tuple(gen_msg_fns())
  106. self.good_msg_count = 0
  107. self.bad_msg_count = 0
  108. def __del__(self):
  109. if gc.platform == 'win' or self.tr == None:
  110. return
  111. if self.simulate or not self.live:
  112. LEDControl.delete_dummy_control_files()
  113. def start_daemons(self):
  114. self.spawn('',msg_only=True)
  115. start_test_daemons(*self.network_ids)
  116. return 'ok'
  117. def stop_daemons(self):
  118. self.spawn('',msg_only=True)
  119. stop_test_daemons(*[i for i in self.network_ids if i != 'btc'])
  120. return 'ok'
  121. def gen_key(self):
  122. t = self.spawn( 'mmgen-autosign', self.opts + ['gen_key'] )
  123. t.expect_getend('Wrote key file ')
  124. return t
  125. def make_wallet_mmgen(self):
  126. return self.make_wallet(mn_type='mmgen')
  127. def make_wallet_bip39(self):
  128. return self.make_wallet(mn_type='bip39')
  129. def make_wallet(self,mn_type=None):
  130. mn_desc = mn_type or 'default'
  131. mn_type = mn_type or 'mmgen'
  132. t = self.spawn(
  133. 'mmgen-autosign',
  134. self.opts +
  135. ([] if mn_desc == 'default' else [f'--mnemonic-fmt={mn_type}']) +
  136. ['setup'] )
  137. t.expect('words: ','3')
  138. t.expect('OK? (Y/n): ','\n')
  139. mn_file = { 'mmgen': dfl_words_file, 'bip39': dfl_bip39_file }[mn_type]
  140. mn = read_from_file(mn_file).strip().split()
  141. from mmgen.mn_entry import mn_entry
  142. entry_mode = 'full'
  143. mne = mn_entry(mn_type,entry_mode)
  144. t.expect('Type a number.*: ',str(mne.entry_modes.index(entry_mode)+1),regex=True)
  145. stealth_mnemonic_entry(t,mne,mn,entry_mode)
  146. wf = t.written_to_file('Autosign wallet')
  147. return t
  148. def copy_tx_files(self):
  149. self.spawn('',msg_only=True)
  150. return self.tx_file_ops('copy')
  151. def remove_signed_txfiles(self):
  152. self.tx_file_ops('remove_signed')
  153. return 'skip'
  154. def remove_signed_txfiles_btc(self):
  155. self.tx_file_ops('remove_signed',txfile_coins=['btc'])
  156. return 'skip'
  157. def tx_file_ops(self,op,txfile_coins=[]):
  158. assert op in ('copy','set_count','remove_signed')
  159. fdata = [e for e in filedir_map if e[0] in (txfile_coins or self.txfile_coins)]
  160. from .ts_ref import TestSuiteRef
  161. tfns = [TestSuiteRef.sources['ref_tx_file'][c][1] for c,d in fdata] + \
  162. [TestSuiteRef.sources['ref_tx_file'][c][0] for c,d in fdata] + \
  163. ['25EFA3[2.34].testnet.rawtx'] # TX with 2 non-MMGen outputs
  164. self.tx_count = len([fn for fn in tfns if fn])
  165. if op == 'set_count':
  166. return
  167. tfs = [joinpath(ref_dir,d[1],fn) for d,fn in zip(fdata+fdata+[('btc','')],tfns)]
  168. for f,fn in zip(tfs,tfns):
  169. if fn: # use empty fn to skip file
  170. if g.debug_utf8:
  171. ext = '.testnet.rawtx' if fn.endswith('.testnet.rawtx') else '.rawtx'
  172. fn = fn[:-len(ext)] + '-α' + ext
  173. target = joinpath(self.mountpoint,'tx',fn)
  174. if not op == 'remove_signed':
  175. shutil.copyfile(f,target)
  176. try:
  177. os.unlink(target.replace('.rawtx','.sigtx'))
  178. except:
  179. pass
  180. return 'ok'
  181. def create_bad_txfiles(self):
  182. return self.bad_txfiles('create')
  183. def remove_bad_txfiles(self):
  184. return self.bad_txfiles('remove')
  185. def bad_txfiles(self,op):
  186. if self.live:
  187. do_mount(self.mountpoint)
  188. # create or delete 2 bad tx files
  189. self.spawn('',msg_only=True)
  190. fns = [joinpath(self.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)]
  191. if op == 'create':
  192. for fn in fns:
  193. with open(fn,'w') as fp:
  194. fp.write('bad tx data\n')
  195. self.bad_tx_count = 2
  196. elif op == 'remove':
  197. for fn in fns:
  198. try: os.unlink(fn)
  199. except: pass
  200. self.bad_tx_count = 0
  201. return 'ok'
  202. def copy_msgfiles(self):
  203. return self.msgfile_ops('copy')
  204. def remove_signed_msgfiles(self):
  205. return self.msgfile_ops('remove_signed')
  206. def create_invalid_msgfile(self):
  207. return self.msgfile_ops('create_invalid')
  208. def remove_invalid_msgfile(self):
  209. return self.msgfile_ops('remove_invalid')
  210. def msgfile_ops(self,op):
  211. self.spawn('',msg_only=True)
  212. destdir = joinpath(self.mountpoint,'msg')
  213. os.makedirs(destdir,exist_ok=True)
  214. if op.endswith('_invalid'):
  215. fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json')
  216. if op == 'create_invalid':
  217. with open(fn,'w') as fp:
  218. fp.write('bad data\n')
  219. self.bad_msg_count += 1
  220. elif op == 'remove_invalid':
  221. os.unlink(fn)
  222. self.bad_msg_count -= 1
  223. else:
  224. for fn in self.ref_msgfiles:
  225. if op == 'copy':
  226. if os.path.basename(fn) == 'ED405C[BTC].rawmsg.json': # contains bad Seed ID
  227. self.bad_msg_count += 1
  228. else:
  229. self.good_msg_count += 1
  230. imsg(f'Copying: {fn} -> {destdir}')
  231. shutil.copy2(fn,destdir)
  232. elif op == 'remove_signed':
  233. os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') ))
  234. return 'ok'
  235. class TestSuiteAutosign(TestSuiteAutosignBase):
  236. 'autosigning transactions for all supported coins'
  237. coins = ['btc','bch','ltc','eth']
  238. daemon_coins = ['btc','bch','ltc']
  239. txfile_coins = ['btc','bch','ltc','eth','mm1','etc']
  240. live = False
  241. simulate = False
  242. bad_tx_count = 0
  243. cmd_group = (
  244. ('start_daemons', 'starting daemons'),
  245. ('copy_tx_files', 'copying transaction files'),
  246. ('gen_key', 'generating key'),
  247. ('make_wallet_mmgen', 'making wallet (MMGen native)'),
  248. ('sign_quiet', 'signing transactions (--quiet)'),
  249. ('remove_signed_txfiles', 'removing signed transaction files'),
  250. ('make_wallet_bip39', 'making wallet (BIP39)'),
  251. ('create_bad_txfiles', 'creating bad transaction files'),
  252. ('sign_full_summary', 'signing transactions (--full-summary)'),
  253. ('remove_signed_txfiles_btc','removing transaction files (BTC only)'),
  254. ('remove_bad_txfiles', 'removing bad transaction files'),
  255. ('sign_led', 'signing transactions (--led - BTC files only)'),
  256. ('remove_signed_txfiles', 'removing signed transaction files'),
  257. ('sign_stealth_led', 'signing transactions (--stealth-led)'),
  258. ('remove_signed_txfiles', 'removing signed transaction files'),
  259. ('copy_msgfiles', 'copying message files'),
  260. ('sign_quiet_msg', 'signing transactions and messages (--quiet)'),
  261. ('remove_signed_txfiles', 'removing signed transaction files'),
  262. ('create_bad_txfiles', 'creating bad transaction files'),
  263. ('remove_signed_msgfiles', 'removing signed message files'),
  264. ('create_invalid_msgfile', 'creating invalid message file'),
  265. ('sign_full_summary_msg', 'signing transactions and messages (--full-summary)'),
  266. ('remove_invalid_msgfile', 'removing invalid message file'),
  267. ('remove_bad_txfiles', 'removing bad transaction files'),
  268. ('sign_no_unsigned_msg', 'signing transactions and messages (nothing to sign)'),
  269. ('stop_daemons', 'stopping daemons'),
  270. )
  271. def do_sign(self,args,have_msg=False):
  272. t = self.spawn('mmgen-autosign', self.opts + args )
  273. t.expect(
  274. f'{self.tx_count} transactions signed' if self.tx_count else
  275. 'No unsigned transactions' )
  276. if self.bad_tx_count:
  277. t.expect(f'{self.bad_tx_count} transactions failed to sign')
  278. t.req_exit_val = 1
  279. if have_msg:
  280. t.expect(
  281. f'{self.good_msg_count} message files{{0,1}} signed' if self.good_msg_count else
  282. 'No unsigned message files', regex=True )
  283. if self.bad_msg_count:
  284. t.expect(f'{self.bad_msg_count} message files{{0,1}} failed to sign', regex=True)
  285. t.req_exit_val = 1
  286. if 'wait' in args:
  287. t.expect('Waiting')
  288. t.kill(2)
  289. t.req_exit_val = 1
  290. else:
  291. t.read()
  292. imsg('')
  293. return t
  294. def sign_quiet(self):
  295. return self.do_sign(['--quiet','wait'])
  296. def sign_full_summary(self):
  297. return self.do_sign(['--full-summary','wait'])
  298. def sign_led(self):
  299. return self.do_sign(['--quiet','--led'])
  300. def sign_stealth_led(self):
  301. return self.do_sign(['--quiet','--stealth-led','wait'])
  302. def sign_quiet_msg(self):
  303. return self.do_sign(['--quiet','wait'],have_msg=True)
  304. def sign_full_summary_msg(self):
  305. return self.do_sign(['--full-summary','wait'],have_msg=True)
  306. def sign_no_unsigned_msg(self):
  307. self.tx_count = 0
  308. self.good_msg_count = 0
  309. self.bad_msg_count = 0
  310. return self.do_sign(['--quiet','wait'],have_msg=True)
  311. class TestSuiteAutosignBTC(TestSuiteAutosign):
  312. 'autosigning BTC transactions'
  313. coins = ['btc']
  314. daemon_coins = ['btc']
  315. txfile_coins = ['btc']
  316. class TestSuiteAutosignLive(TestSuiteAutosignBTC):
  317. 'live autosigning BTC transactions'
  318. live = True
  319. cmd_group = (
  320. ('start_daemons', 'starting daemons'),
  321. ('copy_tx_files', 'copying transaction files'),
  322. ('gen_key', 'generating key'),
  323. ('make_wallet_bip39', 'making wallet (BIP39)'),
  324. ('sign_live', 'signing transactions'),
  325. ('create_bad_txfiles', 'creating bad transaction files'),
  326. ('sign_live_led', 'signing transactions (--led)'),
  327. ('remove_bad_txfiles', 'removing bad transaction files'),
  328. ('sign_live_stealth_led','signing transactions (--stealth-led)'),
  329. ('stop_daemons', 'stopping daemons'),
  330. )
  331. def sign_live(self):
  332. return self.do_sign_live([])
  333. def sign_live_led(self):
  334. return self.do_sign_live(['--led'])
  335. def sign_live_stealth_led(self):
  336. return self.do_sign_live(['--stealth-led'])
  337. def do_sign_live(self,led_opts):
  338. def prompt_remove():
  339. omsg_r(blue('\nRemove removable device and then hit ENTER '))
  340. input()
  341. def prompt_insert_sign(t):
  342. omsg(blue(insert_msg))
  343. t.expect(f'{self.tx_count} transactions signed')
  344. if self.bad_tx_count:
  345. t.expect(f'{self.bad_tx_count} transactions failed to sign')
  346. t.expect('Waiting')
  347. if led_opts:
  348. opts_msg = "'" + ' '.join(led_opts) + "'"
  349. info_msg = f"Running 'mmgen-autosign wait' with {led_opts[0]}. " + {
  350. '--led': "The LED should start blinking slowly now",
  351. '--stealth-led': "You should see no LED activity now"
  352. }[led_opts[0]]
  353. insert_msg = 'Insert removable device and watch for fast LED activity during signing'
  354. else:
  355. opts_msg = 'no LED'
  356. info_msg = "Running 'mmgen-autosign wait'"
  357. insert_msg = 'Insert removable device '
  358. omsg(purple(f'Running autosign test with {opts_msg}'))
  359. do_umount(self.mountpoint)
  360. prompt_remove()
  361. omsg(green(info_msg))
  362. t = self.spawn(
  363. 'mmgen-autosign',
  364. self.opts + led_opts + ['--quiet','--no-summary','wait'])
  365. if not opt.exact_output:
  366. omsg('')
  367. prompt_insert_sign(t)
  368. do_mount(self.mountpoint) # race condition due to device insertion detection
  369. self.remove_signed_txfiles()
  370. do_umount(self.mountpoint)
  371. imsg(purple('\nKilling wait loop!'))
  372. t.kill(2) # 2 = SIGINT
  373. t.req_exit_val = 1
  374. if self.simulate and led_opts:
  375. t.expect("Stopping LED")
  376. return t
  377. class TestSuiteAutosignLiveSimulate(TestSuiteAutosignLive):
  378. 'live autosigning BTC transactions with simulated LED support'
  379. simulate = True