123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636 |
- #!/usr/bin/env python3
- #
- # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
- # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
- #
- # This program is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <http://www.gnu.org/licenses/>.
- """
- test.cmdtest_py_d.ct_autosign: Autosign tests for the cmdtest.py test suite
- """
- import sys,os,time,shutil
- from subprocess import run,DEVNULL
- from pathlib import Path
- from mmgen.cfg import Config
- from mmgen.color import red,green,blue,yellow,cyan,orange,purple,gray
- from mmgen.util import msg,suf,die,indent
- from mmgen.led import LEDControl
- from mmgen.autosign import Autosign, Signable
- from ..include.common import (
- cfg,
- omsg,
- omsg_r,
- oqmsg,
- oqmsg_r,
- start_test_daemons,
- stop_test_daemons,
- joinpath,
- imsg,
- read_from_file
- )
- from .common import ref_dir,dfl_words_file,dfl_bip39_file
- from .ct_base import CmdTestBase
- from .input import stealth_mnemonic_entry
- class CmdTestAutosignBase(CmdTestBase):
- networks = ('btc',)
- tmpdir_nums = [18]
- color = True
- win_skip = True
- threaded = False
- daemon_coins = []
- def __init__(self,trunner,cfgs,spawn):
- CmdTestBase.__init__(self,trunner,cfgs,spawn)
- if trunner is None:
- return
- self.silent_mount = self.live or not (cfg.exact_output or cfg.verbose)
- self.network_ids = [c+'_tn' for c in self.daemon_coins] + self.daemon_coins
- self._create_autosign_instances(create_dirs=not cfg.skipping_deps)
- if not (cfg.skipping_deps or self.live):
- self._create_removable_device()
- self.opts = ['--coins='+','.join(self.coins)]
- if not self.live:
- self.spawn_env['MMGEN_TEST_SUITE_ROOT_PFX'] = self.tmpdir
- if self.threaded:
- self.spawn_env['MMGEN_TEST_SUITE_AUTOSIGN_THREADED'] = '1'
- def _create_autosign_instances(self,create_dirs):
- d = {'offline': {'name':'asi'}}
- if self.have_online:
- d['online'] = {'name':'asi_online'}
- for subdir,data in d.items():
- if create_dirs and not self.live:
- for k in ('mountpoint','wallet_dir','dev_label_dir'):
- if k == 'wallet_dir' and subdir == 'online':
- continue
- (Path(self.tmpdir) / (subdir + getattr(Autosign,'dfl_'+k))).mkdir(parents=True,exist_ok=True)
- setattr(self,data['name'],
- Autosign(
- Config({
- 'coins': ','.join(self.coins),
- 'test_suite': True,
- 'test_suite_xmr_autosign': self.name == 'CmdTestXMRAutosign',
- 'test_suite_autosign_threaded': self.threaded,
- 'test_suite_root_pfx': None if self.live else self.tmpdir,
- 'online': subdir == 'online',
- })))
- def _create_removable_device(self):
- redir = DEVNULL
- img_file = str(self.asi.fs_image_path)
- run(['truncate', '--size=10M', img_file], check=True)
- run(['/sbin/mkfs.ext2', '-E', f'root_owner={os.getuid()}:{os.getgid()}', img_file],
- stdout=redir, stderr=redir, check=True)
- self.do_mount(no_dir_chk=True)
- (self.asi.mountpoint / 'tx').mkdir()
- self.do_umount()
- def start_daemons(self):
- self.spawn('',msg_only=True)
- start_test_daemons(*self.network_ids)
- return 'ok'
- def stop_daemons(self):
- self.spawn('',msg_only=True)
- stop_test_daemons(*self.network_ids)
- return 'ok'
- def run_setup(
- self,
- mn_type = None,
- mn_file = None,
- use_dfl_wallet = False,
- passwd = 'abc'):
- mn_desc = mn_type or 'default'
- mn_type = mn_type or 'mmgen'
- t = self.spawn(
- 'mmgen-autosign',
- self.opts
- + ([] if mn_desc == 'default' else [f'--mnemonic-fmt={mn_type}'])
- + ['setup'],
- no_passthru_opts = True)
- if use_dfl_wallet:
- t.expect( 'Use default wallet for autosigning? (Y/n): ', 'y' )
- t.passphrase('MMGen wallet', passwd)
- else:
- if use_dfl_wallet is not None: # None => no dfl wallet present
- t.expect( 'Use default wallet for autosigning? (Y/n): ', 'n' )
- mn_file = mn_file or { 'mmgen': dfl_words_file, 'bip39': dfl_bip39_file }[mn_type]
- mn = read_from_file(mn_file).strip().split()
- from mmgen.mn_entry import mn_entry
- entry_mode = 'full'
- mne = mn_entry( cfg, mn_type, entry_mode )
- t.expect('words: ',{ 12:'1', 18:'2', 24:'3' }[len(mn)])
- t.expect('OK? (Y/n): ','\n')
- t.expect('Type a number.*: ',str(mne.entry_modes.index(entry_mode)+1),regex=True)
- stealth_mnemonic_entry(t,mne,mn,entry_mode)
- t.written_to_file('Autosign wallet')
- return t
- def _mount_ops(self, loc, cmd, *args, **kwargs):
- return getattr(getattr(self,loc),cmd)(*args, silent=self.silent_mount, **kwargs)
- def do_mount(self, *args, **kwargs):
- return self._mount_ops('asi', 'do_mount', *args, **kwargs)
- def do_umount(self, *args, **kwargs):
- return self._mount_ops('asi', 'do_umount', *args, **kwargs)
- def do_mount_online(self, *args, **kwargs):
- return self._mount_ops('asi_online', 'do_mount', *args, **kwargs)
- def do_umount_online(self, *args, **kwargs):
- return self._mount_ops('asi_online', 'do_umount', *args, **kwargs)
- class CmdTestAutosignThreaded(CmdTestAutosignBase):
- have_online = True
- live = False
- no_insert_check = False
- threaded = True
- def autosign_start_thread(self):
- def run():
- t = self.spawn(
- 'mmgen-autosign',
- self.opts + ['--full-summary','wait'],
- direct_exec = True,
- no_passthru_opts = True,
- spawn_env_override = self.spawn_env | {'EXEC_WRAPPER_DO_RUNTIME_MSG': ''})
- self.write_to_tmpfile('autosign_thread_pid',str(t.ep.pid))
- import threading
- threading.Thread(target=run, name='Autosign wait loop').start()
- time.sleep(0.2)
- return 'silent'
- def autosign_kill_thread(self):
- self.spawn('',msg_only=True)
- pid = int(self.read_from_tmpfile('autosign_thread_pid'))
- self.delete_tmpfile('autosign_thread_pid')
- from signal import SIGTERM
- imsg(purple(f'Killing autosign wait loop [PID {pid}]'))
- try:
- os.kill(pid,SIGTERM)
- except:
- imsg(yellow(f'{pid}: no such process'))
- return 'ok'
- def _wait_signed(self,desc):
- oqmsg_r(gray(f'→ offline wallet{"s" if desc.endswith("s") else ""} signing {desc}'))
- assert not self.device_inserted, f'‘{self.asi.dev_label_path}’ is inserted!'
- assert not self.asi.mountpoint.is_mount(), f'‘{self.asi.mountpoint}’ is mounted!'
- self.insert_device()
- while True:
- oqmsg_r(gray('.'))
- if self.asi.mountpoint.is_mount():
- oqmsg_r(gray('..working..'))
- break
- time.sleep(0.5)
- while True:
- oqmsg_r(gray('.'))
- if not self.asi.mountpoint.is_mount():
- oqmsg(gray('..done'))
- break
- time.sleep(0.5)
- imsg('')
- self.remove_device()
- @property
- def device_inserted(self):
- return self.asi.dev_label_path.exists()
- def insert_device(self):
- self.asi.dev_label_path.touch()
- def remove_device(self):
- if self.asi.dev_label_path.exists():
- self.asi.dev_label_path.unlink()
- @property
- def device_inserted_online(self):
- return self.asi_online.dev_label_path.exists()
- def insert_device_online(self):
- self.asi_online.dev_label_path.touch()
- def remove_device_online(self):
- if self.asi_online.dev_label_path.exists():
- self.asi_online.dev_label_path.unlink()
- class CmdTestAutosign(CmdTestAutosignBase):
- 'autosigning transactions for all supported coins'
- coins = ['btc','bch','ltc','eth']
- daemon_coins = ['btc','bch','ltc']
- txfile_coins = ['btc','bch','ltc','eth','mm1','etc']
- have_online = False
- live = False
- simulate_led = True
- no_insert_check = True
- filedir_map = (
- ('btc',''),
- ('bch',''),
- ('ltc','litecoin'),
- ('eth','ethereum'),
- ('mm1','ethereum'),
- ('etc','ethereum_classic'),
- )
- cmd_group = (
- ('start_daemons', 'starting daemons'),
- ('copy_tx_files', 'copying transaction files'),
- ('gen_key', 'generating key'),
- ('create_dfl_wallet', 'creating default MMGen wallet'),
- ('run_setup_dfl_wallet', 'running ‘autosign setup’ (with default wallet)'),
- ('sign_quiet', 'signing transactions (--quiet)'),
- ('remove_signed_txfiles', 'removing signed transaction files'),
- ('run_setup_bip39', 'running ‘autosign setup’ (BIP39 mnemonic)'),
- ('create_bad_txfiles', 'creating bad transaction files'),
- ('sign_full_summary', 'signing transactions (--full-summary)'),
- ('remove_signed_txfiles_btc','removing transaction files (BTC only)'),
- ('remove_bad_txfiles', 'removing bad transaction files'),
- ('sign_led', 'signing transactions (--led - BTC files only)'),
- ('remove_signed_txfiles', 'removing signed transaction files'),
- ('sign_stealth_led', 'signing transactions (--stealth-led)'),
- ('remove_signed_txfiles', 'removing signed transaction files'),
- ('copy_msgfiles', 'copying message files'),
- ('sign_quiet_msg', 'signing transactions and messages (--quiet)'),
- ('remove_signed_txfiles', 'removing signed transaction files'),
- ('create_bad_txfiles2', 'creating bad transaction files'),
- ('remove_signed_msgfiles', 'removing signed message files'),
- ('create_invalid_msgfile', 'creating invalid message file'),
- ('sign_full_summary_msg', 'signing transactions and messages (--full-summary)'),
- ('remove_invalid_msgfile', 'removing invalid message file'),
- ('remove_bad_txfiles2', 'removing bad transaction files'),
- ('sign_no_unsigned_msg', 'signing transactions and messages (nothing to sign)'),
- ('stop_daemons', 'stopping daemons'),
- )
- def __init__(self,trunner,cfgs,spawn):
- super().__init__(trunner,cfgs,spawn)
- if trunner is None:
- return
- if self.live and not cfg.exact_output:
- die(1,red('autosign_live tests must be run with --exact-output enabled!'))
- if self.no_insert_check:
- self.opts.append('--no-insert-check')
- self.tx_file_ops('set_count') # initialize self.tx_count here so we can resume anywhere
- self.bad_tx_count = 0
- def gen_msg_fns():
- fmap = dict(self.filedir_map)
- for coin in self.coins:
- if coin == 'xmr':
- continue
- sdir = os.path.join('test','ref',fmap[coin])
- for fn in os.listdir(sdir):
- if fn.endswith(f'[{coin.upper()}].rawmsg.json'):
- yield os.path.join(sdir,fn)
- self.ref_msgfiles = tuple(gen_msg_fns())
- self.good_msg_count = 0
- self.bad_msg_count = 0
- if self.simulate_led:
- LEDControl.create_dummy_control_files()
- self.have_dummy_control_files = True
- def __del__(self):
- if hasattr(self,'have_dummy_control_files'):
- LEDControl.delete_dummy_control_files()
- def gen_key(self):
- t = self.spawn( 'mmgen-autosign', self.opts + ['gen_key'] )
- t.expect_getend('Wrote key file ')
- return t
- def create_dfl_wallet(self):
- t = self.spawn( 'mmgen-walletconv', [
- f'--outdir={cfg.data_dir}',
- '--usr-randchars=0', '--quiet', '--hash-preset=1', '--label=foo',
- 'test/ref/98831F3A.hex'
- ]
- )
- t.passphrase_new('new MMGen wallet','abc')
- t.written_to_file('MMGen wallet')
- return t
- def run_setup_dfl_wallet(self):
- return self.run_setup(mn_type='default',use_dfl_wallet=True)
- def run_setup_bip39(self):
- return self.run_setup(mn_type='bip39')
- def copy_tx_files(self):
- self.spawn('',msg_only=True)
- return self.tx_file_ops('copy')
- def remove_signed_txfiles(self):
- self.tx_file_ops('remove_signed')
- return 'skip'
- def remove_signed_txfiles_btc(self):
- self.tx_file_ops('remove_signed',txfile_coins=['btc'])
- return 'skip'
- def tx_file_ops(self,op,txfile_coins=[]):
- assert op in ('copy','set_count','remove_signed')
- from .ct_ref import CmdTestRef
- def gen():
- d = CmdTestRef.sources['ref_tx_file']
- dirmap = [e for e in self.filedir_map if e[0] in (txfile_coins or self.txfile_coins)]
- for coin,coindir in dirmap:
- for network in (0,1):
- fn = d[coin][network]
- if fn:
- yield (coindir,fn)
- data = list(gen()) + [('','25EFA3[2.34].testnet.rawtx')] # TX with 2 non-MMGen outputs
- self.tx_count = len(data)
- if op == 'set_count':
- return
- self.do_mount()
- for coindir,fn in data:
- src = joinpath(ref_dir,coindir,fn)
- if cfg.debug_utf8:
- ext = '.testnet.rawtx' if fn.endswith('.testnet.rawtx') else '.rawtx'
- fn = fn[:-len(ext)] + '-α' + ext
- target = joinpath(self.asi.mountpoint,'tx',fn)
- if not op == 'remove_signed':
- shutil.copyfile(src,target)
- try:
- os.unlink(target.replace('.rawtx','.sigtx'))
- except:
- pass
- self.do_umount()
- return 'ok'
- def create_bad_txfiles(self):
- return self.bad_txfiles('create')
- def remove_bad_txfiles(self):
- return self.bad_txfiles('remove')
- create_bad_txfiles2 = create_bad_txfiles
- remove_bad_txfiles2 = remove_bad_txfiles
- def bad_txfiles(self,op):
- self.do_mount()
- # create or delete 2 bad tx files
- self.spawn('',msg_only=True)
- fns = [joinpath(self.asi.mountpoint,'tx',f'bad{n}.rawtx') for n in (1,2)]
- if op == 'create':
- for fn in fns:
- with open(fn,'w') as fp:
- fp.write('bad tx data\n')
- self.bad_tx_count = 2
- elif op == 'remove':
- for fn in fns:
- try:
- os.unlink(fn)
- except:
- pass
- self.bad_tx_count = 0
- self.do_umount()
- return 'ok'
- def copy_msgfiles(self):
- return self.msgfile_ops('copy')
- def remove_signed_msgfiles(self):
- return self.msgfile_ops('remove_signed')
- def create_invalid_msgfile(self):
- return self.msgfile_ops('create_invalid')
- def remove_invalid_msgfile(self):
- return self.msgfile_ops('remove_invalid')
- def msgfile_ops(self,op):
- self.spawn('',msg_only=True)
- destdir = joinpath(self.asi.mountpoint,'msg')
- self.do_mount()
- os.makedirs(destdir,exist_ok=True)
- if op.endswith('_invalid'):
- fn = os.path.join(destdir,'DEADBE[BTC].rawmsg.json')
- if op == 'create_invalid':
- with open(fn,'w') as fp:
- fp.write('bad data\n')
- self.bad_msg_count += 1
- elif op == 'remove_invalid':
- os.unlink(fn)
- self.bad_msg_count -= 1
- else:
- for fn in self.ref_msgfiles:
- if op == 'copy':
- if os.path.basename(fn) == 'ED405C[BTC].rawmsg.json': # contains bad Seed ID
- self.bad_msg_count += 1
- else:
- self.good_msg_count += 1
- imsg(f'Copying: {fn} -> {destdir}')
- shutil.copy2(fn,destdir)
- elif op == 'remove_signed':
- os.unlink(os.path.join( destdir, os.path.basename(fn).replace('rawmsg','sigmsg') ))
- self.do_umount()
- return 'ok'
- def do_sign(self, args, have_msg=False):
- tx_desc = Signable.transaction.desc
- t = self.spawn('mmgen-autosign', self.opts + args)
- t.expect(
- f'{self.tx_count} {tx_desc}{suf(self.tx_count)} signed' if self.tx_count else
- f'No unsigned {tx_desc}s')
- if self.bad_tx_count:
- t.expect(f'{self.bad_tx_count} {tx_desc}{suf(self.bad_tx_count)} failed to sign')
- t.req_exit_val = 1
- if have_msg:
- t.expect(
- f'{self.good_msg_count} message file{suf(self.good_msg_count)}{{0,1}} signed'
- if self.good_msg_count else
- 'No unsigned message files', regex=True)
- if self.bad_msg_count:
- t.expect(
- f'{self.bad_msg_count} message file{suf(self.bad_msg_count)}{{0,1}} failed to sign',
- regex = True)
- t.req_exit_val = 1
- if 'wait' in args:
- t.expect('Waiting')
- imsg(purple('\nKilling wait loop!'))
- t.kill(2)
- t.req_exit_val = 1
- else:
- t.read()
- imsg('')
- return t
- def sign_quiet(self):
- return self.do_sign(['--quiet','wait'])
- def sign_full_summary(self):
- return self.do_sign(['--full-summary','wait'])
- def sign_led(self):
- return self.do_sign(['--quiet','--led'])
- def sign_stealth_led(self):
- return self.do_sign(['--quiet','--stealth-led','wait'])
- def sign_quiet_msg(self):
- return self.do_sign(['--quiet','wait'],have_msg=True)
- def sign_full_summary_msg(self):
- return self.do_sign(['--full-summary','wait'],have_msg=True)
- def sign_no_unsigned_msg(self):
- self.tx_count = 0
- self.good_msg_count = 0
- self.bad_msg_count = 0
- return self.do_sign(['--quiet','wait'],have_msg=True)
- class CmdTestAutosignBTC(CmdTestAutosign):
- 'autosigning BTC transactions'
- coins = ['btc']
- daemon_coins = ['btc']
- txfile_coins = ['btc']
- class CmdTestAutosignLive(CmdTestAutosignBTC):
- 'live autosigning BTC transactions'
- live = True
- simulate_led = False
- no_insert_check = False
- cmd_group = (
- ('start_daemons', 'starting daemons'),
- ('copy_tx_files', 'copying transaction files'),
- ('gen_key', 'generating key'),
- ('run_setup_mmgen', 'running ‘autosign setup’ (MMGen native mnemonic)'),
- ('sign_live', 'signing transactions'),
- ('create_bad_txfiles', 'creating bad transaction files'),
- ('sign_live_led', 'signing transactions (--led)'),
- ('remove_bad_txfiles', 'removing bad transaction files'),
- ('sign_live_stealth_led','signing transactions (--stealth-led)'),
- ('stop_daemons', 'stopping daemons'),
- )
- def __init__(self,trunner,cfgs,spawn):
- super().__init__(trunner,cfgs,spawn)
- if trunner is None:
- return
- try:
- cf = LEDControl(enabled=True,simulate=self.simulate_led)
- except Exception as e:
- msg(str(e))
- die(2,'LEDControl initialization failed')
- for path in (cf.board.status,cf.board.trigger):
- if path:
- run(['sudo','chmod','0666',path],check=True)
- def run_setup_mmgen(self):
- return self.run_setup(mn_type='mmgen',use_dfl_wallet=None)
- def sign_live(self):
- return self.do_sign_live()
- def sign_live_led(self):
- return self.do_sign_live(['--led'], 'The LED should start blinking slowly now')
- def sign_live_stealth_led(self):
- return self.do_sign_live(['--stealth-led'], 'You should see no LED activity now')
- def do_sign_live(self,led_opts=None,led_msg=None):
- def prompt_remove():
- omsg_r(orange('\nExtract removable device and then hit ENTER '))
- input()
- def prompt_insert_sign(t):
- omsg(orange(insert_msg))
- t.expect(f'{self.tx_count} transactions signed')
- if self.bad_tx_count:
- t.expect(f'{self.bad_tx_count} transactions failed to sign')
- t.expect('Waiting')
- if led_opts:
- opts_msg = '‘' + ' '.join(led_opts) + '’'
- info_msg = 'Running ‘mmgen-autosign wait’ with {}. {}'.format(opts_msg, led_msg)
- insert_msg = 'Insert removable device and watch for fast LED activity during signing'
- else:
- opts_msg = 'no LED'
- info_msg = 'Running ‘mmgen-autosign wait’'
- insert_msg = 'Insert removable device '
- self.spawn('', msg_only=True)
- self.do_umount()
- prompt_remove()
- omsg('\n' + cyan(indent(info_msg)))
- t = self.spawn(
- 'mmgen-autosign',
- self.opts + (led_opts or []) + ['--quiet', '--no-summary', 'wait'],
- no_msg = True)
- if not cfg.exact_output:
- omsg('')
- prompt_insert_sign(t)
- self.do_mount() # race condition due to device insertion detection
- self.remove_signed_txfiles()
- self.do_umount()
- imsg(purple('\nKilling wait loop!'))
- t.kill(2) # 2 = SIGINT
- t.req_exit_val = 1
- if self.simulate_led and led_opts:
- t.expect('Stopping LED')
- return t
- class CmdTestAutosignLiveSimulate(CmdTestAutosignLive):
- 'live autosigning BTC transactions with simulated LED support'
- simulate_led = True