py3port: update tests

This commit is contained in:
The MMGen Project 2018-10-31 18:20:59 +00:00
commit 0eff2811e1
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
8 changed files with 233 additions and 165 deletions

View file

@ -50,7 +50,7 @@ def getrandnum_range(nbytes,rn_max):
def getrandstr(num_chars,no_space=False):
n,m = 95,32
if no_space: n,m = 94,33
return ''.join([chr(ord(i)%n+m) for i in list(os.urandom(num_chars))])
return ''.join([chr(i%n+m) for i in list(os.urandom(num_chars))])
def mk_tmpdir(d):
try: os.mkdir(d,0o755)

View file

@ -18,21 +18,22 @@ monero_addrs='3,99,2,22-24,101-104'
dfl_tests='obj sha256 alts monero eth autosign btc btc_tn btc_rt bch bch_rt ltc ltc_tn ltc_rt tool gen'
PROGNAME=$(basename $0)
while getopts hCefilnPtv OPT
while getopts hCfilnPRtvV OPT
do
case "$OPT" in
h) printf " %-16s Test MMGen release\n" "${PROGNAME}:"
echo " USAGE: $PROGNAME [options] [branch] [tests]"
echo " OPTIONS: '-h' Print this help message"
echo " '-C' Run tests in coverage mode"
echo " '-e' Run test/test.py with '--exact-output' argument"
echo " '-f' Speed up the tests by using fewer rounds"
echo " '-i' Install only; don't run tests"
echo " '-l' List the test name symbols"
echo " '-n' Don't install; test in place"
echo " '-P' Don't pause between tests"
echo " '-R' Don't remove temporary files after program has exited"
echo " '-t' Print the tests without running them"
echo " '-e' Run test/test.py with '--verbose' argument"
echo " '-v' Run test/test.py with '--exact-output' and other commands with '--verbose' switch"
echo " '-V' Run test/test.py and other commands with '--verbose' switch"
echo " AVAILABLE TESTS:"
echo " obj - data objects"
echo " sha256 - MMGen sha256 implementation"
@ -45,8 +46,6 @@ do
echo " btc_rt - bitcoin regtest"
echo " bch - bitcoin cash (BCH)"
echo " bch_rt - bitcoin cash (BCH) regtest"
# echo " b2x - bitcoin 2x (B2X)"
# echo " b2x_rt - bitcoin 2x (B2X) regtest"
echo " ltc - litecoin"
echo " ltc_tn - litecoin testnet"
echo " ltc_rt - litecoin regtest"
@ -56,27 +55,32 @@ do
exit ;;
C) mkdir -p 'test/trace'
touch 'test/trace.acc'
test_py="$test_py --coverage"
tooltest_py="$tooltest_py --coverage"
scrambletest_py="$scrambletest_py --coverage"
test_py+=" --coverage"
tooltest_py+=" --coverage"
scrambletest_py+=" --coverage"
python="python3 -m trace --count --file=test/trace.acc --coverdir=test/trace"
objtest_py="$python $objtest_py"
gentest_py="$python $gentest_py"
mmgen_tool="$python $mmgen_tool"
mmgen_keygen="$python $mmgen_keygen"
rounds=2 rounds_low=2 rounds_spec=2 gen_rounds=2 monero_addrs='3,23,105' ;;
e) test_py="$test_py --exact-output" ;;
f) rounds=2 rounds_low=2 rounds_spec=2 gen_rounds=2 monero_addrs='3,23,105' ;;
i) INSTALL_ONLY=1 ;;
l) echo $dfl_tests; exit ;;
n) NO_INSTALL=1 ;;
P) NO_PAUSE=1 ;;
R) NO_TMPFILE_REMOVAL=1 ;;
t) TESTING=1 ;;
v) test_py="$test_py --verbose" ;;
v) EXACT_OUTPUT=1 test_py+=" --exact-output" ;&
V) VERBOSE=1 [ "$EXACT_OUTPUT" ] || test_py+=" --verbose"
tooltest_py+=" --verbose" gentest_py+=" --verbose" mmgen_tool+=" --verbose"
scrambletest_py+=" --verbose" ;;
*) exit ;;
esac
done
[ "$EXACT_OUTPUT" -o "$VERBOSE" ] || objtest_py+=" -S"
shift $((OPTIND-1))
REFDIR='test/ref'
@ -128,10 +132,10 @@ do_test() {
i_obj='Data object'
s_obj='Testing data objects'
t_obj=(
"$objtest_py --coin=btc -S"
"$objtest_py --coin=btc --testnet=1 -S"
"$objtest_py --coin=ltc -S"
"$objtest_py --coin=ltc --testnet=1 -S")
"$objtest_py --coin=btc"
"$objtest_py --coin=btc --testnet=1"
"$objtest_py --coin=ltc"
"$objtest_py --coin=ltc --testnet=1")
f_obj='Data object test complete'
i_sha256='MMGen sha256 implementation'
@ -381,6 +385,7 @@ tests=$dfl_tests
check_args
echo "Running tests: $tests"
run_tests "$tests"
rm -rf /tmp/mmgen-test-release-*
[ "$NO_TMPFILE_REMOVAL" ] || rm -rf /tmp/mmgen-test-release-*
echo -e "${GREEN}All OK$RESET"

View file

@ -5,9 +5,12 @@ sys.path.insert(0,'.')
if 'TMUX' in os.environ: del os.environ['TMUX']
os.environ['MMGEN_TRACEBACK'] = '1'
tb_source = open(sys.argv[1])
tb_source = open(sys.argv[1]).read()
tb_file = os.path.join(os.environ['PWD'],'my.err')
try: os.unlink(os.path.join(repo_root,tb_file))
except: pass
def process_exception():
l = traceback.format_exception(*sys.exc_info())
l_save = l[:]
@ -26,6 +29,10 @@ except SystemExit as e:
if e.code != 0:
process_exception()
sys.exit(e.code)
except:
except Exception as e:
process_exception()
sys.exit(1)
sys.exit(e.mmcode if hasattr(e,'mmcode') else e.code if hasattr(e,'code') else 1)
else:
print('else: '+repr(sys.exc_info()))
finally:
print('finally: '+repr(sys.exc_info()))

View file

@ -90,29 +90,31 @@ def keyconv_sec2addr(sec):
def zcash_mini_sec2addr(sec):
p = sp.Popen(['zcash-mini','-key','-simple'],stderr=sp.PIPE,stdin=sp.PIPE,stdout=sp.PIPE)
p.stdin.write(sec.wif+'\n')
o = p.stdout.read().split()
return sec.wif,o[0],o[-1]
ret = p.communicate(sec.wif.encode()+b'\n')[0].decode().strip().split('\n')
return (sec.wif,ret[0],ret[-1])
def pycoin_sec2addr(sec):
coin = ci.external_tests['testnet']['pycoin'][g.coin] if g.testnet else g.coin
key = pcku.parse_key(sec.decode(),[network_for_netcode(coin)],secp256k1_generator)[1]
key = pcku.parse_key(sec.decode(),[network_for_netcode(coin)])[1]
if key is None: die(1,"can't parse {}".format(sec))
o = pcku.create_output(sec,key,network_for_netcode(coin))[0]
suf = ('_uncompressed','')[addr_type.compressed]
wif = o['wif{}'.format(suf)]
addr = o['p2sh_segwit' if addr_type.name == 'segwit' else '{}_address{}'.format(coin,suf)]
return wif,addr
d = {
'legacy': ('wif_uncompressed','address_uncompressed'),
'compressed': ('wif','address'),
'segwit': ('wif','p2sh_segwit'),
}[addr_type.name]
return [pcku.create_output(sec,key,network_for_netcode(coin),d[i])[0][d[i]] for i in (0,1)]
# pycoin/networks/all.py pycoin/networks/legacy_networks.py
def init_external_prog():
global b,b_desc,ext_lib,ext_sec2addr,sp,eth,addr_type
def test_support(k):
if b == k: return True
if b != 'ext' and b != k: return False
if g.coin in ci.external_tests['mainnet'][k] and not g.testnet: return True
if g.coin in ci.external_tests['testnet'][k]: return True
return False
if b == 'zcash_mini' or addr_type.name == 'zcash_z':
import subprocess as sp
from mmgen.protocol import init_coin

View file

@ -60,16 +60,14 @@ def my_expect(p,s,t='',delay=send_delay,regex=False,nonl=False,silent=False):
ret = f(s,timeout=(60,5)[bool(opt.debug_pexpect)])
except pexpect.TIMEOUT:
if opt.debug_pexpect: raise
errmsg(red('\nERROR. Expect {}{}{} timed out. Exiting'.format(quo,s,quo)))
sys.exit(1)
rdie(1,red('\nERROR. Expect {}{}{} timed out. Exiting'.format(quo,s,quo)))
debug_pexpect_msg(p)
if opt.verbose and type(s) != str:
msg_r(' ==> {} '.format(ret))
if ret == -1:
errmsg('Error. Expect returned {}'.format(ret))
sys.exit(1)
rdie(1,'Error. Expect returned {}'.format(ret))
else:
if t == '':
if not nonl and not silent: vmsg('')
@ -79,8 +77,8 @@ def my_expect(p,s,t='',delay=send_delay,regex=False,nonl=False,silent=False):
def debug_pexpect_msg(p):
if opt.debug_pexpect:
errmsg('\n{}{}{}'.format(red('BEFORE ['),p.before,red(']')))
errmsg('{}{}{}'.format(red('MATCH ['),p.after,red(']')))
msg('\n{}{}{}'.format(red('BEFORE ['),p.before,red(']')))
msg('{}{}{}'.format(red('MATCH ['),p.after,red(']')))
data_dir = os.path.join('test','data_dir'+('','')[bool(os.getenv('MMGEN_DEBUG_UTF8'))])
@ -103,12 +101,13 @@ class MMGenPexpect(object):
else: cmd,args = mmgen_cmd,cmd_args
for i in args:
if type(i) not in (str,str):
if type(i) is not str:
m1 = 'Error: missing input files in cmd line?:'
m2 = '\nName: {}\nCmd: {}\nCmd args: {}'
die(2,(m1+m2).format(name,cmd,args))
if opt.popen_spawn:
# if opt.popen_spawn:
if True:
args = ['{q}{}{q}'.format(a,q="'" if ' ' in a else '') for a in args]
cmd_str = '{} {}'.format(cmd,' '.join(args)).replace('\\','/')
@ -146,12 +145,14 @@ class MMGenPexpect(object):
cmd_str = tc + ' ' + cmd_str
# Msg('\ncmd_str: {}'.format(cmd_str))
if opt.popen_spawn:
# NOTE: the following is outdated for Python 3
# PopenSpawn() requires cmd string to be bytes. However, it autoconverts unicode
# input to bytes, though this behavior seems to be undocumented. Setting 'encoding'
# to 'UTF-8' will cause pexpect to reject non-unicode string input.
self.p = PopenSpawn(cmd_str,encoding='utf8')
else:
self.p = pexpect.spawn(cmd,args)
self.p = pexpect.spawn(cmd_str,encoding='utf8')
self.p.delaybeforesend = 0
if opt.exact_output: self.p.logfile = sys.stdout
def do_decrypt_ka_data(self,hp,pw,desc='key-address data',check=True):
@ -172,8 +173,7 @@ class MMGenPexpect(object):
def ok(self,exit_val=0):
ret = self.p.wait()
# Msg('expect: {} got: {}'.format(exit_val,ret))
if ret != exit_val and not opt.coverage:
if ret not in (exit_val,None) and not opt.coverage: # Some cmds exit with None
die(1,red('test.py: spawned program exited with value {}'.format(ret)))
if opt.profile: return
if opt.verbose or opt.exact_output:
@ -209,15 +209,11 @@ class MMGenPexpect(object):
my_send(self.p,'\n')
else:
rand_chars = list(getrandstr(num_chars,no_space=True))
my_expect(self.p,'symbols left: ','x')
try:
vmsg_r('SEND ')
while self.p.expect('left: ',0.1) == 0:
ch = rand_chars.pop(0)
msg_r(yellow(ch)+' ' if opt.verbose else '+')
self.p.send(ch)
except:
vmsg('EOT')
vmsg_r('SEND ')
while rand_chars:
ch = rand_chars.pop(0)
msg_r(yellow(ch)+' ' if opt.verbose else '+')
ret = my_expect(self.p,'left: ',ch,delay=0.005)
my_expect(self.p,'ENTER to continue: ','\n')
def passphrase_new(self,desc,passphrase):
@ -288,7 +284,8 @@ class MMGenPexpect(object):
# return [l.rstrip()+'\n' for l in self.p.readlines()]
def read(self,n=None):
return self.p.read(n)
if n: return self.p.read(n)
else: return self.p.read()
def close(self):
if not opt.popen_spawn:

View file

@ -76,6 +76,10 @@ def run_test(test,arg,input_data):
cls = globals()[test]
ret = cls(*args,**kwargs)
bad_ret = list() if issubclass(cls,list) else None
if issubclass(type(ret_chk),str): ret_chk = ret_chk.encode()
if issubclass(type(ret),str): ret = ret.encode()
if (opt.silent and input_data=='bad' and ret!=bad_ret) or (not opt.silent and input_data=='bad'):
raise UserWarning("Non-'None' return value {} with bad input data".format(repr(ret)))
if opt.silent and input_data=='good' and ret==bad_ret:

View file

@ -27,11 +27,17 @@ repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.ar
os.chdir(repo_root)
sys.path.__setitem__(0,repo_root)
try: os.unlink(os.path.join(repo_root,'my.err'))
except: pass
# Import these _after_ local path's been added to sys.path
from mmgen.common import *
from mmgen.test import *
from mmgen.protocol import CoinProtocol,init_coin
class TestSuiteException(Exception): pass
class TestSuiteFatalException(Exception): pass
set_debug_all()
g.quiet = False # if 'quiet' was set in config file, disable here
@ -77,6 +83,8 @@ tool_enc_passwd = "Scrypt it, don't hash it!"
sample_text = \
'The Times 03/Jan/2009 Chancellor on brink of second bailout for banks\n'
chksum_pat = r'\b[A-F0-9]{4} [A-F0-9]{4} [A-F0-9]{4} [A-F0-9]{4}\b'
# Laggy flash media cause pexpect to crash, so create a temporary directory
# under '/dev/shm' and put datadir and temp files here.
shortopts = ''.join([e[1:] for e in sys.argv if len(e) > 1 and e[0] == '-' and e[1] != '-'])
@ -137,7 +145,7 @@ opts_data = lambda: {
-l, --list-cmds List and describe the commands in the test suite
-L, --log Log commands to file {lf}
-n, --names Display command names instead of descriptions
-O, --popen-spawn Use pexpect's popen_spawn instead of popen (always true, so ignored)
-O, --popen-spawn Use pexpect's popen_spawn instead of popen
-p, --pause Pause between tests, resuming on keypress
-P, --profile Record the execution time of each script
-q, --quiet Produce minimal output. Suppress dependency info
@ -160,7 +168,8 @@ If no command is given, the whole suite of tests is run.
sys.argv = [sys.argv[0]] + ['--data-dir',data_dir] + sys.argv[1:]
cmd_args = opts.init(opts_data)
opt.popen_spawn = True # popen has issues, so use popen_spawn always
# Under python3, with PopenSpawn we can no longer imitate cbreak mode with sys.stdin.read(1)
# opt.popen_spawn = True # popen has issues, so use popen_spawn always
if not opt.system: os.environ['PYTHONPATH'] = repo_root
@ -676,10 +685,10 @@ eth_token_bals = {
}
def eth_args():
assert g.coin in ('ETH','ETC'),'for ethdev tests, --coin must be set to either ETH or ETC'
if g.coin not in ('ETH','ETC'):
raise TestSuiteException('for ethdev tests, --coin must be set to either ETH or ETC')
return ['--outdir={}'.format(cfgs['22']['tmpdir']),'--rpc-port=8549','--quiet']
from copy import deepcopy
for a,b in (('6','11'),('7','12'),('8','13')):
cfgs[b] = deepcopy(cfgs[a])
@ -983,7 +992,8 @@ cmd_group['ethdev'] = (
('ethdev_contract_deploy', 'deploying contract (create,sign,send)'),
('ethdev_token_transfer_funds','transferring token funds from dev to user'),
('ethdev_token_fund_users', 'transferring token funds from dev to user'),
('ethdev_token_user_bals', 'show balances after transfer'),
('ethdev_token_addrgen', 'generating token addresses'),
('ethdev_token_addrimport_badaddr1','importing token addresses (no token address)'),
('ethdev_token_addrimport_badaddr2','importing token addresses (bad token address)'),
@ -1223,15 +1233,11 @@ usr_rand_chars = (5,30)[bool(opt.usr_random)]
usr_rand_arg = '-r{}'.format(usr_rand_chars)
cmd_total = 0
# Disable color in spawned scripts so we can parse their output
# Disable color in spawned scripts so pexpect can parse their output
os.environ['MMGEN_DISABLE_COLOR'] = '1'
os.environ['MMGEN_NO_LICENSE'] = '1'
os.environ['MMGEN_MIN_URANDCHARS'] = '3'
os.environ['MMGEN_BOGUS_SEND'] = '1'
def get_segwit_arg(cfg):
return ['--type='+('segwit','bech32')[bool(opt.bech32)]] if cfg['segwit'] else []
# Tell spawned programs they're running in the test suite
os.environ['MMGEN_TEST_SUITE'] = '1'
@ -1288,7 +1294,7 @@ if opt.list_cmds:
NL = ('\r\n','\n')[g.platform=='linux' and bool(opt.popen_spawn)]
def get_file_with_ext(ext,mydir,delete=True,no_dot=False,return_list=False):
def get_file_with_ext(ext,mydir,delete=True,no_dot=False,return_list=False,delete_all=False):
dot = ('.','')[bool(no_dot)]
flist = [os.path.join(mydir,f) for f in os.listdir(mydir) if f == ext or f[-len(dot+ext):] == dot+ext]
@ -1296,12 +1302,11 @@ def get_file_with_ext(ext,mydir,delete=True,no_dot=False,return_list=False):
if not flist: return False
if return_list: return flist
if len(flist) > 1:
if delete:
if len(flist) > 1 or delete_all:
if delete or delete_all:
if not opt.quiet:
msg("Multiple *.{} files in '{}' - deleting".format(ext,mydir))
for f in flist:
msg(f)
os.unlink(f)
return False
else:
@ -1325,11 +1330,10 @@ def get_addrfile_checksum(display=False):
def verify_checksum_or_exit(checksum,chk):
if checksum != chk:
errmsg(red('Checksum error: {}'.format(chk)))
sys.exit(1)
raise TestSuiteFatalException('Checksum error: {}'.format(chk))
vmsg(green('Checksums match: ') + cyan(chk))
from .test.mmgen_pexpect import MMGenPexpect
from test.mmgen_pexpect import MMGenPexpect
class MMGenExpect(MMGenPexpect):
def __init__(self,name,mmgen_cmd,cmd_args=[],extra_desc='',no_output=False,msg_only=False,no_msg=False):
@ -1449,8 +1453,8 @@ def create_tx_data(sources,addrs_per_wallet=addrs_per_wallet):
ad.add(al)
aix = AddrIdxList(fmt_str=cfgs[s]['addr_idx_list'])
if len(aix) != addrs_per_wallet:
errmsg(red('Address index list length != {}: {}'.format(addrs_per_wallet,repr(aix))))
sys.exit(0)
raise TestSuiteFatalException(
'Address index list length != {}: {}'.format(addrs_per_wallet,repr(aix)))
tx_data[s] = {
'addrfile': afile,
'chk': al.chksum,
@ -1526,12 +1530,10 @@ def do_between():
if keypress_confirm(green('Continue?'),default_yes=True):
if opt.verbose or opt.exact_output: sys.stderr.write('\n')
else:
errmsg('Exiting at user request')
sys.exit(0)
raise KeyboardInterrupt('Exiting at user request')
elif opt.verbose or opt.exact_output:
sys.stderr.write('\n')
rebuild_list = OrderedDict()
def check_needs_rerun(
@ -1591,10 +1593,8 @@ def refcheck(desc,chk,refchk):
if chk == refchk:
ok()
else:
if not opt.verbose: errmsg('')
m = "Fatal error - {} '{}' does not match reference value '{}'. Aborting test"
errmsg(red(m.format(desc,chk,refchk)))
sys.exit(3)
m = "\nFatal error - {} '{}' does not match reference value '{}'. Aborting test"
raise TestSuiteFatalException(m.format(desc,chk,refchk))
def check_deps(cmds):
if len(cmds) != 1:
@ -1703,7 +1703,6 @@ class MMGenTestSuite(object):
'addrgen','addrimport','keygen','passchg','tool','passgen','regtest','autosign')
for s in scripts:
t = MMGenExpect(name,('mmgen-'+s),[arg],extra_desc='(mmgen-{})'.format(s),no_output=True)
t.read()
t.ok()
def longhelpscreens(self,name): self.helpscreens(name,arg='--longhelp')
@ -1716,6 +1715,7 @@ class MMGenTestSuite(object):
t = MMGenExpect(name,'mmgen-walletgen', args + [usr_rand_arg])
t.license()
t.usr_rand(usr_rand_chars)
t.expect('Generating')
t.passphrase_new('new MMGen wallet',cfg['wpasswd'])
t.label()
global have_dfl_wallet
@ -1866,7 +1866,7 @@ class MMGenTestSuite(object):
input_sels_prompt='to spend',
bad_input_sels=False,non_mmgen_inputs=0,
interactive_fee='',
fee_desc='transaction fee',fee_res=None,
fee_desc='transaction fee',fee_res=None,eth_fee_res=None,
add_comment='',view='t',save=True,no_ok=False):
for choice in menu + ['q']:
t.expect(r'\[q\]uit view, .*?:.',choice,regex=True)
@ -1884,7 +1884,10 @@ class MMGenTestSuite(object):
t.send('y')
else:
if have_est_fee: t.send('n')
t.send(interactive_fee+'\n')
if eth_fee_res:
t.expect('or gas price: ',interactive_fee+'\n')
else:
t.send(interactive_fee+'\n')
if fee_res: t.expect(fee_res)
t.expect('OK? (Y/n): ','y')
@ -1977,8 +1980,11 @@ class MMGenTestSuite(object):
([],['--rbf'])[g.proto.cap('rbf')] +
['-f',tx_fee,'-B'] + add_args + cmd_args + txdo_args)
if t.expect([('Get','Transac')[cmdline_inputs],'Unable to connect to \S+'],regex=True) == 1:
raise TestSuiteException('\n'+t.p.after)
if cmdline_inputs:
t.written_to_file('Transaction')
t.written_to_file('tion')
t.ok()
return
@ -1988,19 +1994,12 @@ class MMGenTestSuite(object):
t.do_decrypt_ka_data(hp='1',pw=cfgs['14']['kapasswd'])
for num in tx_data:
t.expect_getend('Getting address data from file ')
t.expect_getend('ting address data from file ')
chk=t.expect_getend(r'Checksum for address data .*?: ',regex=True)
verify_checksum_or_exit(tx_data[num]['chk'],chk)
# not in tracking wallet warning, (1 + num sources) times
if t.expect(['Continue anyway? (y/N): ',
'Unable to connect to {}'.format(g.proto.daemon_name)]) == 0:
t.send('y')
else:
errmsg(red('Error: unable to connect to {}. Exiting'.format(g.proto.daemon_name)))
sys.exit(1)
for num in tx_data:
for num in range(len(tx_data) + 1):
t.expect('Continue anyway? (y/N): ','y')
outputs_list = [(addrs_per_wallet+1)*i + 1 for i in range(len(tx_data))]
@ -2297,9 +2296,9 @@ class MMGenTestSuite(object):
def txdo4(self,name,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12):
non_mm_fn = os.path.join(cfg['tmpdir'],non_mmgen_fn)
add_args = ['-d',cfg['tmpdir'],'-i','brain','-b'+cfg['bw_params'],'-p1','-k',non_mm_fn,'-M',f12]
get_file_with_ext('sigtx',cfg['tmpdir'],delete_all=True) # delete tx signed by txsign4
t = self.txcreate_common(name,sources=['1','2','3','4','14'],
non_mmgen_input='4',do_label=True,txdo_args=[f7,f8,f9,f10],add_args=add_args)
os.system('rm -f {}/*.sigtx'.format(cfg['tmpdir'].encode('utf8')))
for cnum,desc in (('1','incognito data'),('3','MMGen wallet')):
t.passphrase('{}'.format(desc),cfgs[cnum]['wpasswd'])
@ -2311,10 +2310,6 @@ class MMGenTestSuite(object):
os.system(cmd.encode())
t.ok()
def txbump4(self,name,f1,f2,f3,f4,f5,f6,f7,f8,f9): # f7:txfile,f9:'txdo'
non_mm_fn = os.path.join(cfg['tmpdir'],non_mmgen_fn)
self.txbump(name,f7,prepend_args=['-p1','-k',non_mm_fn,'-M',f1],seed_args=[f2,f3,f4,f5,f6,f8])
def txsign4(self,name,f1,f2,f3,f4,f5,f6):
non_mm_fn = os.path.join(cfg['tmpdir'],non_mmgen_fn)
a = ['-d',cfg['tmpdir'],'-i','brain','-b'+cfg['bw_params'],'-p1','-k',non_mm_fn,'-M',f6,f1,f2,f3,f4,f5]
@ -2329,6 +2324,10 @@ class MMGenTestSuite(object):
self.txsign_end(t,has_label=True)
t.ok()
def txbump4(self,name,f1,f2,f3,f4,f5,f6,f7,f8,f9): # f7:txfile,f9:'txdo'
non_mm_fn = os.path.join(cfg['tmpdir'],non_mmgen_fn)
self.txbump(name,f7,prepend_args=['-p1','-k',non_mm_fn,'-M',f1],seed_args=[f2,f3,f4,f5,f6,f8])
def walletgen5(self,name,del_dw_run='dummy'):
self.walletgen(name)
@ -2411,46 +2410,67 @@ class MMGenTestSuite(object):
cmp_or_die(hincog_offset,int(o))
def autosign(self,name): # tests everything except device detection, mount/unmount
if skip_for_win(): return
fdata = ( ('btc',''),
('bch',''),
('ltc','litecoin'),
('eth','ethereum'),
('erc20','ethereum'),
('etc','ethereum_classic'))
tfns = [cfgs['8']['ref_tx_file'][c][1] for c,d in fdata] + \
[cfgs['8']['ref_tx_file'][c][0] for c,d in fdata]
tfs = [os.path.join(ref_dir,d[1],fn) for d,fn in zip(fdata+fdata,tfns)]
try: os.mkdir(os.path.join(cfg['tmpdir'],'tx'))
except: pass
for f,fn in zip(tfs,tfns):
if fn: # use empty fn to skip file
shutil.copyfile(f,os.path.join(cfg['tmpdir'],'tx',fn))
# make a bad tx file
with open(os.path.join(cfg['tmpdir'],'tx','bad.rawtx'),'w') as f:
f.write('bad tx data')
opts = ['--mountpoint='+cfg['tmpdir'],'--coins=btc,bch,ltc,eth']
mn_fn = os.path.join(ref_dir,cfgs['8']['seed_id']+'.mmwords')
mn = read_from_file(mn_fn).strip().split()
t = MMGenExpect(name,'mmgen-autosign',opts+['gen_key'],extra_desc='(gen_key)')
t.expect_getend('Wrote key file ')
t.ok()
def copy_files_and_make_key():
fdata = ( ('btc',''),
('bch',''),
('ltc','litecoin'),
('eth','ethereum'),
('erc20','ethereum'),
('etc','ethereum_classic'))
tfns = [cfgs['8']['ref_tx_file'][c][1] for c,d in fdata] + \
[cfgs['8']['ref_tx_file'][c][0] for c,d in fdata]
tfs = [os.path.join(ref_dir,d[1],fn) for d,fn in zip(fdata+fdata,tfns)]
t = MMGenExpect(name,'mmgen-autosign',opts+['setup'],extra_desc='(setup)')
t.expect('words: ','3')
t.expect('OK? (Y/n): ','\n')
for i in range(24):
t.expect('word #{}: '.format(i+1),mn[i]+'\n')
wf = t.written_to_file('Autosign wallet')
t.ok()
try: os.mkdir(os.path.join(cfg['tmpdir'],'tx'))
except: pass
t = MMGenExpect(name,'mmgen-autosign',opts+['wait'],extra_desc='(sign)')
t.expect('11 transactions signed')
t.expect('1 transaction failed to sign')
t.expect('Waiting.')
t.kill(2)
t.ok(exit_val=1)
for f,fn in zip(tfs,tfns):
if fn: # use empty fn to skip file
shutil.copyfile(f,os.path.join(cfg['tmpdir'],'tx',fn))
# make a bad tx file
with open(os.path.join(cfg['tmpdir'],'tx','bad.rawtx'),'w') as f:
f.write('bad tx data')
t = MMGenExpect(name,'mmgen-autosign',opts+['gen_key'],extra_desc='(gen_key)')
t.expect_getend('Wrote key file ')
t.ok()
def get_mnemonic():
t = MMGenExpect(name,'mmgen-autosign',opts+['setup'],extra_desc='(setup)')
t.expect('words: ','3')
t.expect('OK? (Y/n): ','\n')
mn_fn = os.path.join(ref_dir,cfgs['8']['seed_id']+'.mmwords')
mn = read_from_file(mn_fn).strip().split()
mn = ['foo'] + mn[:5] + ['realiz','realized'] + mn[5:]
wnum = 1
for i in range(len(mn)):
em,rm = 'Enter word #{}: ','Repeat word #{}: '
ret = t.expect((em.format(wnum),rm.format(wnum-1)))
if ret == 0: wnum += 1
for j in range(len(mn[i])):
t.send(mn[i][j])
time.sleep(0.005)
t.send('\n')
wf = t.written_to_file('Autosign wallet')
t.ok()
def run_autosign():
t = MMGenExpect(name,'mmgen-autosign',opts+['wait'],extra_desc='(sign)')
t.expect('11 transactions signed')
t.expect('1 transaction failed to sign')
t.expect('Waiting')
t.kill(2)
t.ok(exit_val=1)
copy_files_and_make_key()
get_mnemonic()
run_autosign()
# Saved reference file tests
def ref_wallet_conv(self,name):
@ -2572,18 +2592,21 @@ class MMGenTestSuite(object):
t = MMGenExpect(name,'mmgen-tool',coin_arg+[tool_cmd,af]+add_args)
if ftype == 'keyaddr':
t.do_decrypt_ka_data(hp=ref_kafile_hash_preset,pw=ref_kafile_pass)
o = t.read().strip().split('\n')[-1]
rc = cfg[ 'ref_' + ftype + 'file_chksum' +
('_'+coin.lower() if coin else '') +
('_'+mmtype if mmtype else '')]
ref_chksum = rc if (ftype == 'passwd' or coin) else rc[g.proto.base_coin.lower()][g.testnet]
cmp_or_die(ref_chksum,o)
t.expect(chksum_pat,regex=True)
m = t.p.match.group(0)
t.read()
cmp_or_die(ref_chksum,m)
def ref_altcoin_addrgen(self,name,coin,mmtype,gen_what='addr',coin_suf=''):
def ref_altcoin_addrgen(self,name,coin,mmtype,gen_what='addr',coin_suf='',add_args=[]):
wf = os.path.join(ref_dir,cfg['seed_id']+'.mmwords')
t = MMGenExpect(name,'mmgen-{}gen'.format(gen_what),
['-Sq','--coin='+coin] +
(['--type='+mmtype] if mmtype else []) +
add_args +
[wf,cfg['addr_idx_list']])
if gen_what == 'key':
t.expect('Encrypt key list? (y/N): ','N')
@ -2592,7 +2615,6 @@ class MMGenTestSuite(object):
t.read()
refcheck('{}list data checksum'.format(gen_what),chk,chk_ref)
def ref_addrfile_gen_eth(self,name):
self.ref_altcoin_addrgen(name,coin='ETH',mmtype='ethereum')
@ -2712,8 +2734,11 @@ class MMGenTestSuite(object):
restore_debug()
t.passphrase('user data',tool_enc_passwd)
t.expect(NL,nonl=True)
t.expect('to confirm: ','YES\n')
import re
o = re.sub('\r\n','\n',t.read())
o = t.read()
o = re.sub('YES\r\n','',o).split('\n')[0]
o = re.sub('\r','\n',o)
cmp_or_die(sample_text,o)
# wallet conversion tests
@ -2970,14 +2995,14 @@ class MMGenTestSuite(object):
ext = '{}{}{}[{}]{x}.testnet.addrs'.format(
sid,altcoin_pfx,id_str,addr_range,x='' if g.debug_utf8 else '')
fn = get_file_with_ext(ext,self.regtest_user_dir(user),no_dot=True)
silence()
psave = g.proto
g.proto = CoinProtocol(g.coin,True)
if hasattr(g.proto,'bech32_hrp_rt'):
g.proto.bech32_hrp = g.proto.bech32_hrp_rt
silence()
addr = AddrList(fn).data[idx].addr
g.proto = psave
end_silence()
g.proto = psave
return addr
def create_tx_outputs(self,user,data):
@ -3036,7 +3061,7 @@ class MMGenTestSuite(object):
ret = MMGenExpect(name,'mmgen-regtest',['show_mempool']).read()
restore_debug()
from ast import literal_eval
return literal_eval(ret)
return literal_eval(ret.split('\n')[0]) # allow for extra output by handler at end
def regtest_get_mempool1(self,name):
mp = self.regtest_get_mempool(name)
@ -3352,6 +3377,7 @@ class MMGenTestSuite(object):
def ethdev_txcreate(self,name,args=[],menu=[],acct='1',non_mmgen_inputs=0,
interactive_fee='50G',
fee_res='0.00105 {} (50 gas price in Gwei)'.format(g.coin),
eth_fee_res=None,
fee_desc = 'gas price'):
t = MMGenExpect(name,'mmgen-txcreate', eth_args() + ['-B'] + args)
t.expect(r'add \[l\]abel, .*?:.','p', regex=True)
@ -3361,7 +3387,8 @@ class MMGenTestSuite(object):
input_sels_prompt='to spend from',
inputs=acct,file_desc='Ethereum transaction',
bad_input_sels=True,non_mmgen_inputs=non_mmgen_inputs,
interactive_fee=interactive_fee,fee_res=fee_res,fee_desc=fee_desc,
interactive_fee=interactive_fee,fee_res=fee_res,
fee_desc=fee_desc,eth_fee_res=eth_fee_res,
add_comment=ref_tx_label_jp)
def ethdev_txsign(self,name,ni=False,ext='{}.rawtx',add_args=[]):
@ -3421,7 +3448,7 @@ class MMGenTestSuite(object):
interactive_fee='40G'
fee_res='0.00084 {} (40 gas price in Gwei)'.format(g.coin)
return self.ethdev_txcreate(name,args=args,acct='1',non_mmgen_inputs=0,
interactive_fee=interactive_fee,fee_res=fee_res)
interactive_fee=interactive_fee,fee_res=fee_res,eth_fee_res=True)
def ethdev_txbump(self,name,ext=',40000]{}.rawtx',fee='50G',add_args=[]):
ext = ext.format('' if g.debug_utf8 else '')
@ -3487,9 +3514,7 @@ class MMGenTestSuite(object):
t.expect('Removed label.*in tracking wallet',regex=True)
t.ok()
def init_ethdev_common(self):
g.testnet = True
def ethdev_rpc_init(self):
init_coin(g.coin)
g.proto.rpc_port = 8549
rpc_init()
@ -3497,13 +3522,11 @@ class MMGenTestSuite(object):
def ethdev_token_compile(self,name,token_data={}):
MMGenExpect(name,'',msg_only=True)
cmd_args = ['--{}={}'.format(k,v) for k,v in list(token_data.items())]
silence()
imsg("Compiling solidity token contract '{}' with 'solc'".format(token_data['symbol']))
cmd = ['scripts/create-token.py','--coin='+g.coin,'--outdir='+cfg['tmpdir']] + cmd_args + [eth_addr]
imsg("Executing: {}".format(' '.join(cmd)))
subprocess.check_output(cmd)
subprocess.check_output(cmd,stderr=subprocess.STDOUT)
imsg("ERC20 token '{}' compiled".format(token_data['symbol']))
end_silence()
ok()
def ethdev_token_compile1(self,name):
@ -3515,7 +3538,7 @@ class MMGenTestSuite(object):
self.ethdev_token_compile(name,token_data)
def ethdev_token_deploy(self,name,num,key,gas,mmgen_cmd='txdo',tx_fee='8G'):
self.init_ethdev_common()
self.ethdev_rpc_init()
key_fn = get_tmpfile_fn(cfg,cfg['parity_keyfile'])
fn = os.path.join(cfg['tmpdir'],key+'.bin')
os.environ['MMGEN_BOGUS_SEND'] = ''
@ -3539,9 +3562,7 @@ class MMGenTestSuite(object):
"Contract '{}:{}' failed to execute. Aborting".format(num,key))
if key == 'Token':
write_to_tmpfile(cfg,'token_addr{}'.format(num),addr+'\n')
silence()
imsg('\nToken #{} ({}) deployed!'.format(num,addr))
end_silence()
t.ok()
def ethdev_token_deploy1a(self,name): self.ethdev_token_deploy(name,num=1,key='SafeMath',gas=200000)
@ -3560,26 +3581,46 @@ class MMGenTestSuite(object):
def ethdev_contract_deploy(self,name): # test create,sign,send
self.ethdev_token_deploy(name,num=2,key='SafeMath',gas=1100000,mmgen_cmd='txcreate')
def ethdev_token_transfer_funds(self,name):
def ethdev_token_transfer_ops(self,name,op,amt=1000):
MMGenExpect(name,'',msg_only=True)
sid = cfgs['8']['seed_id']
cmd = lambda i: ['mmgen-tool','--coin='+g.coin,'gen_addr','{}:E:{}'.format(sid,i),'wallet='+dfl_words]
silence()
usr_addrs = [subprocess.check_output(cmd(i),stderr=sys.stderr).strip() for i in (11,21)]
self.init_ethdev_common()
from mmgen.tool import Gen_addr
usr_mmaddrs = ['{}:E:{}'.format(sid,i) for i in (11,21)]
usr_addrs = [Gen_addr(addr,dfl_words,return_result=True) for addr in usr_mmaddrs]
self.ethdev_rpc_init()
from mmgen.altcoins.eth.contract import Token
from mmgen.altcoins.eth.tx import EthereumMMGenTX as etx
for i in range(2):
tk = Token(read_from_tmpfile(cfg,'token_addr{}'.format(i+1)).strip())
imsg('\n'+tk.info())
txid = tk.transfer(eth_addr,usr_addrs[i],1000,eth_key,
start_gas=ETHAmt(60000,'wei'),gasPrice=ETHAmt(8,'Gwei'))
assert etx.get_exec_status(txid,True) != 0,'Transfer of token funds failed. Aborting'
imsg('dev token balance: {}'.format(tk.balance(eth_addr)))
imsg('usr{} token balance: {}'.format(i+1,tk.balance(usr_addrs[i])))
def do_transfer():
for i in range(2):
tk = Token(read_from_tmpfile(cfg,'token_addr{}'.format(i+1)).strip())
imsg_r('\n'+tk.info())
imsg('dev token balance (pre-send): {}'.format(tk.balance(eth_addr)))
imsg('Sending {} {} to address {} ({})'.format(amt,g.coin,usr_addrs[i],usr_mmaddrs[i]))
txid = tk.transfer(eth_addr,usr_addrs[i],amt,eth_key,
start_gas=ETHAmt(60000,'wei'),gasPrice=ETHAmt(8,'Gwei'))
assert etx.get_exec_status(txid,True) != 0,'Transfer of token funds failed. Aborting'
def show_bals():
for i in range(2):
tk = Token(read_from_tmpfile(cfg,'token_addr{}'.format(i+1)).strip())
imsg('Token: {}'.format(tk.symbol()))
imsg('dev token balance: {}'.format(tk.balance(eth_addr)))
imsg('usr token balance: {} ({} {})'.format(
tk.balance(usr_addrs[i]),usr_mmaddrs[i],usr_addrs[i]))
silence()
if op == 'show_bals': show_bals()
elif op == 'do_transfer': do_transfer()
end_silence()
ok()
def ethdev_token_fund_users(self,name):
return self.ethdev_token_transfer_ops(name,op='do_transfer')
def ethdev_token_user_bals(self,name):
return self.ethdev_token_transfer_ops(name,op='show_bals')
def ethdev_token_addrgen(self,name):
self.ethdev_addrgen(name,addrs='11-13')
self.ethdev_addrgen(name,addrs='21-23')
@ -3658,7 +3699,7 @@ class MMGenTestSuite(object):
t_non_mmgen='888.111122223333444455',t_mmgen='111.888877776666555545',extra_args=['--token=mm1'])
def ethdev_txcreate_noamt(self,name):
return self.ethdev_txcreate(name,args=['98831F3A:E:12'])
return self.ethdev_txcreate(name,args=['98831F3A:E:12'],eth_fee_res=True)
def ethdev_txsign_noamt(self,name):
self.ethdev_txsign(name,ext='99.99895,50000]{}.rawtx')
def ethdev_txsend_noamt(self,name):
@ -3849,6 +3890,10 @@ if cmd_args and cmd_args[0] == 'admin':
cmd_data = cmd_data_admin
cmd_list = cmd_list_admin
if opt.exit_after:
if opt.exit_after not in cmd_data.keys():
die(1,"'{}': command not recognized".format(opt.exit_after))
try:
if cmd_args:
for arg in cmd_args:
@ -3878,7 +3923,13 @@ try:
if cmd is not list(cmd_data.keys())[-1]: do_between()
except KeyboardInterrupt:
die(1,'\nExiting at user request')
except TestSuiteException as e:
ydie(1,e.args[0])
except TestSuiteFatalException as e:
rdie(1,e.args[0])
except opt.traceback and Exception:
import traceback
print(''.join(traceback.format_exception(*sys.exc_info())))
try:
os.stat('my.err')
with open('my.err') as f:

View file

@ -240,7 +240,7 @@ class MMGenToolTestSuite(object):
file_list = [os.path.join(cfg['tmpdir'],fn) for fn in fns]
self.__class__.__dict__[cmd](*([self,cmd] + file_list))
def run_cmd(self,name,tool_args,kwargs='',extra_msg='',silent=False,strip=True,add_opts=[]):
def run_cmd(self,name,tool_args,kwargs='',extra_msg='',silent=False,strip=True,add_opts=[],binary=False):
sys_cmd = (
spawn_cmd +
add_spawn_args +
@ -289,9 +289,11 @@ class MMGenToolTestSuite(object):
vmsg('Out: ' + repr(ret))
return ret
def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='',literal=False,chkdata='',hush=False,add_opts=[]):
def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='',
literal=False,chkdata='',hush=False,add_opts=[]):
if carg: write_to_tmpfile(cfg,'{}{}.in'.format(name,fn_idx),carg+'\n')
ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs,extra_msg=extra_msg,add_opts=add_opts)
ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs,
extra_msg=extra_msg,add_opts=add_opts)
if carg: vmsg('In: ' + repr(carg))
vmsg('Out: ' + (repr(ret),ret)[literal])
if ret or ret == '':
@ -343,7 +345,7 @@ class MMGenToolTestSuite(object):
def Hexlify(self,name): self.run_cmd_out(name,getrandstr(24))
def Hexdump(self,name): self.run_cmd_randinput(name,strip=False)
def Unhexdump(self,name,fn1,fn2):
ret = self.run_cmd(name,[fn2],strip=False)
ret = self.run_cmd(name,[fn2],strip=False,binary=True)
orig = read_from_file(fn1,binary=True)
cmp_or_die(orig,ret)
def Rand2file(self,name):