From 01fbc328b884db3da3cf9621d53be36a5ebbb701 Mon Sep 17 00:00:00 2001 From: philemon Date: Fri, 9 Jan 2015 21:02:16 +0300 Subject: [PATCH] New test suite for mmgen-tool: 'test/tooltest.py' Additional tests for the 'test/test.py' suite --- MANIFEST | 7 +- mmgen/addr.py | 4 +- mmgen/crypto.py | 7 +- mmgen/main_passchg.py | 2 +- mmgen/main_tool.py | 11 +- mmgen/main_walletchk.py | 4 +- mmgen/main_walletgen.py | 4 +- mmgen/mnemonic.py | 28 +- mmgen/test.py | 91 +++++++ mmgen/tool.py | 237 +++++++++-------- mmgen/tx.py | 2 +- mmgen/util.py | 8 + setup.py | 8 +- test/__init__.py | 0 test/test.py | 577 +++++++++++++++++----------------------- test/tooltest.py | 351 ++++++++++++++++++++++++ 16 files changed, 866 insertions(+), 475 deletions(-) create mode 100755 mmgen/test.py create mode 100644 test/__init__.py create mode 100755 test/tooltest.py diff --git a/MANIFEST b/MANIFEST index e6e4a38d..e3fe6d56 100644 --- a/MANIFEST +++ b/MANIFEST @@ -33,6 +33,7 @@ mmgen/mn_electrum.py mmgen/mn_tirosh.py mmgen/mnemonic.py mmgen/term.py +mmgen/test.py mmgen/tool.py mmgen/tx.py mmgen/util.py @@ -45,8 +46,6 @@ mmgen/rpc/data.py mmgen/rpc/exceptions.py mmgen/rpc/proxy.py mmgen/rpc/util.py -mmgen/tests/__init__.py -mmgen/tests/bitcoin.py -mmgen/tests/mnemonic.py -mmgen/tests/test.py +test/__init__.py test/test.py +test/tooltest.py diff --git a/mmgen/addr.py b/mmgen/addr.py index 543185d1..8e54b24c 100755 --- a/mmgen/addr.py +++ b/mmgen/addr.py @@ -28,7 +28,7 @@ from binascii import hexlify, unhexlify from mmgen.bitcoin import numtowif # from mmgen.util import msg,qmsg,qmsg_r,make_chksum_N,get_lines_from_file,get_data_from_file,get_extension from mmgen.util import * -from mmgen.tx import is_mmgen_idx,is_mmgen_seed_id,is_btc_addr,is_wip_key,get_wif2addr_f +from mmgen.tx import is_mmgen_idx,is_mmgen_seed_id,is_btc_addr,is_wif,get_wif2addr_f import mmgen.config as g addrmsgs = { @@ -147,7 +147,7 @@ def _parse_addrfile_body(lines,has_keys=False,check=False): if d[0] != "wif:": return "Invalid key line in file: '%s'" % l - if not is_wip_key(d[1]): + if not is_wif(d[1]): return "'%s': invalid Bitcoin key" % d[1] a.wif = unicode(d[1]) diff --git a/mmgen/crypto.py b/mmgen/crypto.py index 576bbed2..7dea0739 100755 --- a/mmgen/crypto.py +++ b/mmgen/crypto.py @@ -199,7 +199,7 @@ def get_random_data_from_user(uchars): return key_data+"".join(fmt_time_data) -def get_random(length,opts): +def get_random(length): from Crypto import Random os_rand = Random.new().read(length) if g.use_urandchars: @@ -430,8 +430,9 @@ def _get_seed_from_brain_passphrase(words,opts): salt_len,sha256_len,nonce_len = 32,32,32 def mmgen_encrypt(data,what="data",hash_preset='',opts={}): - salt,iv,nonce = get_random(salt_len,opts),\ - get_random(g.aesctr_iv_len,opts), get_random(nonce_len,opts) + salt,iv,nonce = get_random(salt_len),\ + get_random(g.aesctr_iv_len), \ + get_random(nonce_len) hp = hash_preset or get_hash_preset_from_user('3',what) m = "default" if hp == '3' else "user-requested" vmsg("Encrypting %s" % what) diff --git a/mmgen/main_passchg.py b/mmgen/main_passchg.py index 42f20e6f..f4870452 100755 --- a/mmgen/main_passchg.py +++ b/mmgen/main_passchg.py @@ -114,7 +114,7 @@ if 'preset' in changed or 'passwd' in changed: # Update key ID, salt from hashlib import sha256 - salt = sha256(salt + get_random(128,opts)).digest()[:g.salt_len] + salt = sha256(salt + get_random(128)).digest()[:g.salt_len] key = make_key(passwd, salt, opts['hash_preset']) new_key_id = make_chksum_8(key) qmsg("Key ID changed: %s -> %s" % (key_id,new_key_id)) diff --git a/mmgen/main_tool.py b/mmgen/main_tool.py index 70f3a1e8..1837ff00 100755 --- a/mmgen/main_tool.py +++ b/mmgen/main_tool.py @@ -17,7 +17,7 @@ # along with this program. If not, see . """ -mmgen-tool: Perform various Bitcoin-related operations. +mmgen-tool: Perform various MMGen- and Bitcoin-related operations. Part of the MMGen suite """ @@ -28,7 +28,7 @@ from mmgen.Opts import * help_data = { 'prog_name': g.prog_name, - 'desc': "Perform various BTC-related operations", + 'desc': "Perform various MMGen- and Bitcoin-related operations", 'usage': "[opts] ", 'options': """ -d, --outdir= d Specify an alternate directory 'd' for output @@ -41,9 +41,9 @@ help_data = { 'notes': """ COMMANDS:{} -Type '{} --help for usage information on a particular +Type '{} usage for usage information on a particular command -""".format(tool.command_help,g.prog_name) +""".format(tool.cmd_help,g.prog_name) } opts,cmd_args = parse_opts(sys.argv,help_data) @@ -54,7 +54,7 @@ if len(cmd_args) < 1: command = cmd_args.pop(0) -if command not in tool.commands.keys(): +if command not in tool.cmd_data: msg("'%s': No such command" % command) sys.exit(1) @@ -64,7 +64,6 @@ if cmd_args and cmd_args[0] == '--help': args,kwargs = tool.process_args(g.prog_name, command, cmd_args) -#msgrepr(args,kwargs) tool.opts = opts tool.__dict__[command](*args,**kwargs) diff --git a/mmgen/main_walletchk.py b/mmgen/main_walletchk.py index 36eaf048..418df74a 100755 --- a/mmgen/main_walletchk.py +++ b/mmgen/main_walletchk.py @@ -73,12 +73,12 @@ def wallet_to_incog_data(infile,opts): seed = decrypt_seed(enc_seed, key, seed_id, key_id) if seed: break - iv = get_random(g.aesctr_iv_len,opts) + iv = get_random(g.aesctr_iv_len) iv_id = make_iv_chksum(iv) msg("Incog ID: %s" % iv_id) if not 'old_incog_fmt' in opts: - salt = get_random(g.salt_len,opts) + salt = get_random(g.salt_len) key = make_key(passwd, salt, preset, "incog wallet key") key_id = make_chksum_8(key) from hashlib import sha256 diff --git a/mmgen/main_walletgen.py b/mmgen/main_walletgen.py index 7268a4ee..004b77e8 100755 --- a/mmgen/main_walletgen.py +++ b/mmgen/main_walletgen.py @@ -163,9 +163,9 @@ for i in 'from_mnemonic','from_brain','from_seed','from_incog': break else: # Truncate random data for smaller seed lengths - seed = sha256(get_random(128,opts)).digest()[:opts['seed_len']/8] + seed = sha256(get_random(128)).digest()[:opts['seed_len']/8] -salt = sha256(get_random(128,opts)).digest()[:g.salt_len] +salt = sha256(get_random(128)).digest()[:g.salt_len] qmsg(wmsg['choose_wallet_passphrase'] % opts['hash_preset']) diff --git a/mmgen/mnemonic.py b/mmgen/mnemonic.py index eb6347ef..659a51a4 100755 --- a/mmgen/mnemonic.py +++ b/mmgen/mnemonic.py @@ -21,7 +21,9 @@ mnemonic.py: Mnemonic routines for the MMGen suite """ import sys -from mmgen.util import msg,msg_r,make_chksum_8 +from binascii import hexlify +from mmgen.util import msg,msg_r,make_chksum_8,Vmsg +from mmgen.crypto import get_random import mmgen.config as g wl_checksums = { @@ -38,7 +40,8 @@ def hex2mn_pad(hexnum): return len(hexnum) * 3 / 8 def baseNtohex(base,words,wordlist,pad=0): deconv = \ [wordlist.index(words[::-1][i])*(base**i) for i in range(len(words))] - return ("{:0%sx}"%pad).format(sum(deconv)) + ret = ("{:0%sx}" % pad).format(sum(deconv)) + return "%s%s" % (('0' if len(ret) % 2 else ''), ret) def hextobaseN(base,hexnum,wordlist,pad=0): num,ret = int(hexnum,16),[] @@ -122,3 +125,24 @@ def check_wordlist(wl,label): else: print "ERROR: List is not sorted!" sys.exit(3) + +from mmgen.mn_electrum import electrum_words as el +from mmgen.mn_tirosh import tirosh_words as tl +wordlists = sorted(wl_checksums) + +def get_wordlist(wordlist): + wordlist = wordlist.lower() + if wordlist not in wordlists: + Msg('"%s": invalid wordlist. Valid choices: %s' % + (wordlist,'"'+'" "'.join(wordlists)+'"')) + sys.exit(1) + return (el if wordlist == "electrum" else tl).strip().split("\n") + +def do_random_mn(nbytes,wordlist): + r = get_random(nbytes) + Vmsg("Seed: %s" % hexlify(r)) + for wlname in (wordlists if wordlist == "all" else [wordlist]): + wl = get_wordlist(wlname) + mn = get_mnemonic_from_seed(r,wl,wordlist) + Vmsg("%s wordlist mnemonic:" % (wlname.capitalize())) + print " ".join(mn) diff --git a/mmgen/test.py b/mmgen/test.py new file mode 100755 index 00000000..fb720b16 --- /dev/null +++ b/mmgen/test.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2015 Philemon +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +addr.py: Shared routines for the test suites +""" + +import sys,os +from binascii import hexlify +from mmgen.util import msg,write_to_file +import mmgen.config as g + +_red,_grn,_yel,_cya,_reset = ( + ["\033[%sm" % c for c in "31;1","32;1","33;1","36;1","0"] +) +def red(s): return _red+s+_reset +def green(s): return _grn+s+_reset +def yellow(s): return _yel+s+_reset +def cyan(s): return _cya+s+_reset + +def cleandir(d): + try: files = os.listdir(d) + except: return + + msg(green("Cleaning directory '%s'" % d)) + for f in files: + os.unlink(os.path.join(d,f)) + +def getrandnum(n): return int(hexlify(os.urandom(n)),16) +def getrandhex(n): return hexlify(os.urandom(n)) +def getrandstr(num_chars,no_space=False): + n,m = 95,32 + if no_space: n,m = 94,33 + return "".join([chr(ord(i)%n+m) for i in list(os.urandom(num_chars))]) + +def mk_tmpdir(cfg): + try: os.mkdir(cfg['tmpdir'],0755) + except OSError as e: + if e.errno != 17: raise + else: msg("Created directory '%s'" % cfg['tmpdir']) + +def get_tmpfile_fn(cfg,fn): + return os.path.join(cfg['tmpdir'],fn) + +def write_to_tmpfile(cfg,fn,data): + write_to_file(os.path.join(cfg['tmpdir'],fn),data,{},silent=True) + +def read_from_tmpfile(cfg,fn): + from mmgen.util import get_data_from_file + return get_data_from_file(os.path.join(cfg['tmpdir'],fn),silent=True) + +def read_from_file(fn): + from mmgen.util import get_data_from_file + return get_data_from_file(fn,silent=True) + +def ok(): + if g.verbose or g.exact_output: + sys.stderr.write(green("OK\n")) + else: msg(" OK") + +def ok_or_die(val,chk_func,s,skip_ok=False): + try: ret = chk_func(val) + except: ret = False + if ret: + if not skip_ok: ok() + else: + msg(red("Returned value '%s' is not a %s" % (val,s))) + sys.exit(3) + +def cmp_or_die(s,t,skip_ok=False): + if s == t: + if not skip_ok: ok() + else: + msg(red("Recoded data:\n%s\ndiffers from original data:\n%s\n" % + (repr(t),repr(s)))) + sys.exit(3) diff --git a/mmgen/tool.py b/mmgen/tool.py index ea5d96a2..c72de070 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -29,63 +29,64 @@ from mmgen.crypto import * from mmgen.util import * from mmgen.tx import * -def Msg(s): sys.stdout.write(s + "\n") -def Msg_r(s): sys.stdout.write(s) -def Vmsg(s): - if g.verbose: sys.stdout.write(s + "\n") -def Vmsg_r(s): - if g.verbose: sys.stdout.write(s) - opts = {} -commands = { - "help": [], - "strtob58": [' [str]'], - "b58tostr": [' [str]'], - "hextob58": [' [str]'], - "b58tohex": [' [str]'], - "b58randenc": [], - "randhex": ['nbytes [int=32]'], - "randwif": ['compressed [bool=False]'], - "randpair": ['compressed [bool=False]'], - "wif2hex": [' [str]', 'compressed [bool=False]'], - "wif2addr": [' [str]', 'compressed [bool=False]'], - "hex2wif": [' [str]', 'compressed [bool=False]'], - "hexdump": [' [str]', 'cols [int=8]', 'line_nums [bool=True]'], - "unhexdump": [' [str]'], - "hex2mn": [' [str]','wordlist [str="electrum"]'], - "mn2hex": [' [str]', 'wordlist [str="electrum"]'], - "b32tohex": [' [str]'], - "hextob32": [' [str]'], - "mn_rand128": ['wordlist [str="electrum"]'], - "mn_rand192": ['wordlist [str="electrum"]'], - "mn_rand256": ['wordlist [str="electrum"]'], - "mn_stats": ['wordlist [str="electrum"]'], - "mn_printlist": ['wordlist [str="electrum"]'], - "id8": [' [str]'], - "id6": [' [str]'], - "str2id6": [' [str]'], - "listaddresses":['minconf [int=1]','showempty [bool=False]','pager [bool=False]'], - "getbalance": ['minconf [int=1]'], - "txview": [' [str]','pager [bool=False]','terse [bool=False]'], - "addrfile_chksum": [' [str]'], - "keyaddrfile_chksum": [' [str]'], - "find_incog_data": [' [str]',' [str]','keep_searching [bool=False]'], - "hexreverse": [' [str]'], - "sha256x2": [' [str]', - 'hex_input [bool=False]','file_input [bool=False]'], - "hexlify": [' [str]'], - "hexaddr2addr": [' [str]'], - "addr2hexaddr": [' [str]'], - "pubkey2addr": [' [str]'], - "pubkey2hexaddr": [' [str]'], - "privhex2addr": [' [str]','compressed [bool=False]'], - "encrypt": [' [str]','outfile [str=""]','hash_preset [str=""]'], - "decrypt": [' [str]','outfile [str=""]','hash_preset [str=""]'], - "rand2file": [' [str]',' [str]','threads [int=4]'], - "bytespec": [' [str]'], -} +from collections import OrderedDict +cmd_data = OrderedDict([ + ("help", []), + ("usage", [' [str]']), + ("strtob58", [' [str]']), + ("b58tostr", [' [str]']), + ("hextob58", [' [str]']), + ("b58tohex", [' [str]']), + ("b58randenc", []), + ("b32tohex", [' [str]']), + ("hextob32", [' [str]']), + ("randhex", ['nbytes [int=32]']), + ("id8", [' [str]']), + ("id6", [' [str]']), + ("sha256x2", [' [str]', + 'hex_input [bool=False]','file_input [bool=False]']), + ("str2id6", [' [str]']), + ("hexdump", [' [str]', 'cols [int=8]', 'line_nums [bool=True]']), + ("unhexdump", [' [str]']), + ("hexreverse", [' [str]']), + ("hexlify", [' [str]']), + ("rand2file", [' [str]',' [str]','threads [int=4]','silent [bool=False']), -command_help = """ + ("randwif", ['compressed [bool=False]']), + ("randpair", ['compressed [bool=False]']), + ("hex2wif", [' [str]', 'compressed [bool=False]']), + ("wif2hex", [' [str]', 'compressed [bool=False]']), + ("wif2addr", [' [str]', 'compressed [bool=False]']), + ("hexaddr2addr", [' [str]']), + ("addr2hexaddr", [' [str]']), + ("pubkey2addr", [' [str]']), + ("pubkey2hexaddr", [' [str]']), + ("privhex2addr", [' [str]','compressed [bool=False]']), + + ("hex2mn", [' [str]','wordlist [str="electrum"]']), + ("mn2hex", [' [str]', 'wordlist [str="electrum"]']), + ("mn_rand128", ['wordlist [str="electrum"]']), + ("mn_rand192", ['wordlist [str="electrum"]']), + ("mn_rand256", ['wordlist [str="electrum"]']), + ("mn_stats", ['wordlist [str="electrum"]']), + ("mn_printlist", ['wordlist [str="electrum"]']), + + + ("listaddresses",['minconf [int=1]','showempty [bool=False]','pager [bool=False]']), + ("getbalance", ['minconf [int=1]']), + ("txview", [' [str]','pager [bool=False]','terse [bool=False]']), + + ("addrfile_chksum", [' [str]']), + ("keyaddrfile_chksum", [' [str]']), + ("find_incog_data", [' [str]',' [str]','keep_searching [bool=False]']), + + ("encrypt", [' [str]','outfile [str=""]','hash_preset [str=""]']), + ("decrypt", [' [str]','outfile [str=""]','hash_preset [str=""]']), + ("bytespec", [' [str]']), +]) + +cmd_help = """ Bitcoin address/key operations (compressed public keys supported): addr2hexaddr - convert Bitcoin address from base58 to hex format hex2wif - convert a private key from hex to WIF format @@ -153,24 +154,33 @@ command_help = """ def tool_usage(prog_name, command): print "USAGE: '%s %s%s'" % (prog_name, command, - (" "+" ".join(commands[command]) if commands[command] else "")) + (" "+" ".join(cmd_data[command]) if cmd_data[command] else "")) def process_args(prog_name, command, cmd_args): c_args = [[i.split(" [")[0],i.split(" [")[1][:-1]] - for i in commands[command] if "=" not in i] + for i in cmd_data[command] if "=" not in i] c_kwargs = dict([[ i.split(" [")[0], [i.split(" [")[1].split("=")[0], i.split(" [")[1].split("=")[1][:-1]] - ] for i in commands[command] if "=" in i]) + ] for i in cmd_data[command] if "=" in i]) + u_args = cmd_args[:len(c_args)] - u_kwargs = dict([i.split("=") for i in cmd_args[len(c_args):]]) + u_kwargs = cmd_args[len(c_args):] -# print c_args; print c_kwargs; print u_args; print u_kwargs; sys.exit() - - if len(u_args) != len(c_args): + if len(u_args) < len(c_args): + msg("%s args required" % len(c_args)) tool_usage(prog_name, command) sys.exit(1) + if len(u_kwargs) > len(c_kwargs): + msg("Too many arguments") + tool_usage(prog_name, command) + sys.exit(1) + + u_kwargs = dict([a.split("=") for a in u_kwargs]) + +# print c_args; print c_kwargs; print u_args; print u_kwargs; sys.exit() + if set(u_kwargs) > set(c_kwargs): print "Invalid named argument" sys.exit(1) @@ -203,67 +213,84 @@ def process_args(prog_name, command, cmd_args): return args,kwargs -# Individual commands +# Individual cmd_data def help(): Msg("Available commands:") - for k in commands.keys(): - Msg("%-16s %s" % (k," ".join(commands[k]))) + for k in cmd_data.keys(): + Msg("%-16s %s" % (k," ".join(cmd_data[k]))) -def print_convert_results(indata,enc,dec,no_recode=False): - Vmsg("Input: [%s]" % indata) - Vmsg_r("Encoded data: ["); Msg_r(enc); Vmsg_r("]"); Msg("") - if not no_recode: - Vmsg("Recoded data: [%s]" % dec) - if indata != dec: - Msg("WARNING! Recoded number doesn't match input stringwise!") +def are_equal(a,b,dtype=""): + if dtype == "str": return a.lstrip("\0") == b.lstrip("\0") + if dtype == "hex": return a.lstrip("0") == b.lstrip("0") + if dtype == "b58": return a.lstrip("1") == b.lstrip("1") + else: return a == b + +def print_convert_results(indata,enc,dec,dtype): + + error = False if are_equal(indata,dec,dtype) else True + + if error or g.verbose: + Msg("Input: %s" % repr(indata)) + Msg("Encoded data: %s" % repr(enc)) + Msg("Recoded data: %s" % repr(dec)) + else: Msg(enc) + + if error: + Msg("Error! Recoded data doesn't match input!") + sys.exit(3) + +def usage(cmd): + tool_usage(g.prog_name, cmd) def hexdump(infile, cols=8, line_nums=True): - print pretty_hexdump(get_data_from_file(infile,dash=True), + print pretty_hexdump(get_data_from_file(infile,dash=True,silent=True), cols=cols, line_nums=line_nums) def unhexdump(infile): - sys.stdout.write(decode_pretty_hexdump(get_data_from_file(infile,dash=True))) + sys.stdout.write(decode_pretty_hexdump( + get_data_from_file(infile,dash=True,silent=True))) def strtob58(s): enc = bitcoin.b58encode(s) dec = bitcoin.b58decode(enc) - print_convert_results(s,enc,dec) + print_convert_results(s,enc,dec,"str") def hextob58(s,f_enc=bitcoin.b58encode, f_dec=bitcoin.b58decode): enc = f_enc(ba.unhexlify(s)) dec = ba.hexlify(f_dec(enc)) - print_convert_results(s,enc,dec) + print_convert_results(s,enc,dec,"hex") def b58tohex(s,f_enc=bitcoin.b58decode, f_dec=bitcoin.b58encode): tmp = f_enc(s) if tmp == False: sys.exit(1) enc = ba.hexlify(tmp) dec = f_dec(ba.unhexlify(enc)) - print_convert_results(s,enc,dec) + print_convert_results(s,enc,dec,"b58") def b58tostr(s,f_enc=bitcoin.b58decode, f_dec=bitcoin.b58encode): enc = f_enc(s) if enc == False: sys.exit(1) dec = f_dec(enc) - print_convert_results(s,enc,dec) + print_convert_results(s,enc,dec,"b58") def b58randenc(): - r = get_random(32,opts) + r = get_random(32) enc = bitcoin.b58encode(r) dec = bitcoin.b58decode(enc) - print_convert_results(ba.hexlify(r),enc,ba.hexlify(dec)) + print_convert_results(r,enc,dec,"str") def randhex(nbytes='32'): - print ba.hexlify(get_random(int(nbytes),opts)) + print ba.hexlify(get_random(int(nbytes))) def randwif(compressed=False): - r_hex = ba.hexlify(get_random(32,opts)) + r_hex = ba.hexlify(get_random(32)) enc = bitcoin.hextowif(r_hex,compressed) - print_convert_results(r_hex,enc,"",no_recode=True) + dec = bitcoin.wiftohex(enc,compressed) + print_convert_results(r_hex,enc,dec,"hex") def randpair(compressed=False): - r_hex = ba.hexlify(get_random(32,opts)) + r_hex = ba.hexlify(get_random(32)) wif = bitcoin.hextowif(r_hex,compressed) addr = bitcoin.privnum2addr(int(r_hex,16),compressed) Vmsg("Key (hex): %s" % r_hex) @@ -279,27 +306,6 @@ def wif2addr(wif,compressed=False): Vmsg_r("Addr: "); Msg(addr) from mmgen.mnemonic import * -from mmgen.mn_electrum import electrum_words as el -from mmgen.mn_tirosh import tirosh_words as tl - -wordlists = sorted(wl_checksums.keys()) - -def get_wordlist(wordlist): - wordlist = wordlist.lower() - if wordlist not in wordlists: - Msg('"%s": invalid wordlist. Valid choices: %s' % - (wordlist,'"'+'" "'.join(wordlists)+'"')) - sys.exit(1) - return (el if wordlist == "electrum" else tl).strip().split("\n") - -def do_random_mn(nbytes,wordlist): - r = get_random(nbytes,opts) - Vmsg("Seed: %s" % ba.hexlify(r)) - for wlname in (wordlists if wordlist == "all" else [wordlist]): - wl = get_wordlist(wlname) - mn = get_mnemonic_from_seed(r,wl,wordlist) - Vmsg("%s wordlist mnemonic:" % (wlname.capitalize())) - print " ".join(mn) def mn_rand128(wordlist="electrum"): do_random_mn(16,wordlist) def mn_rand192(wordlist="electrum"): do_random_mn(24,wordlist) @@ -333,8 +339,8 @@ def mn_printlist(wordlist="electrum"): wl = get_wordlist(wordlist) print "\n".join(wl) -def id8(infile): print make_chksum_8(get_data_from_file(infile,dash=True)) -def id6(infile): print make_chksum_6(get_data_from_file(infile,dash=True)) +def id8(infile): print make_chksum_8(get_data_from_file(infile,dash=True,silent=True)) +def id6(infile): print make_chksum_6(get_data_from_file(infile,dash=True,silent=True)) def str2id6(s): print make_chksum_6("".join(s.split())) # List MMGen addresses and their balances: @@ -462,9 +468,9 @@ def hex2wif(hexpriv,compressed=False): print bitcoin.hextowif(hexpriv,compressed) -def encrypt(infile,outfile="",hash_preset=''): +def encrypt(infile,outfile="",hash_preset=""): data = get_data_from_file(infile,"data for encryption") - enc_d = mmgen_encrypt(data,"user data","",opts) + enc_d = mmgen_encrypt(data,"user data",hash_preset,opts) if outfile == '-': write_to_stdout(enc_d,"encrypted data",confirm=True) else: @@ -473,10 +479,10 @@ def encrypt(infile,outfile="",hash_preset=''): write_to_file(outfile, enc_d, opts,"encrypted data",True,True) -def decrypt(infile,outfile="",hash_preset=''): +def decrypt(infile,outfile="",hash_preset=""): enc_d = get_data_from_file(infile,"encrypted data") while True: - dec_d = mmgen_decrypt(enc_d,"user data") + dec_d = mmgen_decrypt(enc_d,"user data",hash_preset) if dec_d: break msg("Trying again...") if outfile == '-': @@ -539,7 +545,7 @@ def parse_nbytes(nbytes): sys.exit(1) -def rand2file(outfile, nbytes, threads=4): +def rand2file(outfile, nbytes, threads=4, silent=False): nbytes = parse_nbytes(nbytes) from Crypto import Random rh = Random.new() @@ -553,7 +559,7 @@ def rand2file(outfile, nbytes, threads=4): from Crypto.Cipher import AES from Crypto.Util import Counter - key = get_random(32,opts) + key = get_random(32) def encrypt_worker(wid): while True: @@ -590,8 +596,9 @@ def rand2file(outfile, nbytes, threads=4): if not (bsize*i) % roll: msg_r("\rRead: %s bytes" % (bsize*i)) - msg("\rRead: %s bytes" % nbytes) - qmsg("\r%s bytes written to file '%s'" % (nbytes,outfile)) + if not silent: + msg("\rRead: %s bytes" % nbytes) + qmsg("\r%s bytes written to file '%s'" % (nbytes,outfile)) q1.join() q2.join() f.close() diff --git a/mmgen/tx.py b/mmgen/tx.py index f71ea7a7..eed43ee5 100755 --- a/mmgen/tx.py +++ b/mmgen/tx.py @@ -85,7 +85,7 @@ def is_b58_str(s): from mmgen.bitcoin import b58a return set(list(s)) <= set(b58a) -def is_wip_key(s): +def is_wif(s): if s == "": return False compressed = not s[0] == '5' from mmgen.bitcoin import wiftohex diff --git a/mmgen/util.py b/mmgen/util.py index 927876c0..1c0fac8b 100755 --- a/mmgen/util.py +++ b/mmgen/util.py @@ -41,6 +41,14 @@ def vmsg(s): def vmsg_r(s): if g.verbose: sys.stderr.write(s) +def Msg(s): sys.stdout.write(s + "\n") +def Msg_r(s): sys.stdout.write(s) +def Vmsg(s): + if g.verbose: sys.stdout.write(s + "\n") +def Vmsg_r(s): + if g.verbose: sys.stdout.write(s) + + def msgrepr(*args): for d in args: sys.stdout.write(repr(d)+"\n") diff --git a/setup.py b/setup.py index e684cefa..a1081522 100755 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ setup( 'mmgen.mn_tirosh', 'mmgen.Opts', 'mmgen.term', + 'mmgen.test', 'mmgen.tool', 'mmgen.tx', 'mmgen.util', @@ -46,10 +47,9 @@ setup( 'mmgen.rpc.proxy', 'mmgen.rpc.util', - 'mmgen.tests.__init__', - 'mmgen.tests.bitcoin', - 'mmgen.tests.mnemonic', - 'mmgen.tests.test', + 'test.__init__', + 'test.test', + 'test.tooltest', ], scripts=[ 'mmgen-addrgen', diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/test.py b/test/test.py index 7fb751ba..62b42c5c 100755 --- a/test/test.py +++ b/test/test.py @@ -8,109 +8,14 @@ pn = os.path.dirname(sys.argv[0]) os.chdir(os.path.join(pn,os.pardir)) sys.path.__setitem__(0,os.path.abspath(os.curdir)) -from mmgen.util import msgrepr, msgrepr_exit +import mmgen.config as g +from mmgen.util import msgrepr,msgrepr_exit,Msg +from mmgen.test import * hincog_fn = "rand_data" non_mmgen_fn = "btckey" -from collections import OrderedDict -cmd_data = OrderedDict([ -# test description depends - ['refwalletgen', (6,'reference wallet seed ID', [[[],6]])], - ['refaddrgen', (6,'reference wallet address checksum', [[["mmdat"],6]])], - ['refkeyaddrgen', (6,'reference wallet key-address checksum', [[["mmdat"],6]])], - - ['walletgen', (1,'wallet generation', [[[],1]])], - ['walletchk', (1,'wallet check', [[["mmdat"],1]])], - ['passchg', (5,'password, label and hash preset change',[[["mmdat"],1]])], - ['walletchk_newpass',(5,'wallet check with new pw, label and hash preset',[[["mmdat"],5]])], - ['addrgen', (1,'address generation', [[["mmdat"],1]])], - ['addrimport', (1,'address import', [[["addrs"],1]])], - ['txcreate', (1,'transaction creation', [[["addrs"],1]])], - ['txsign', (1,'transaction signing', [[["mmdat","raw"],1]])], - ['txsend', (1,'transaction sending', [[["sig"],1]])], - - ['export_seed', (1,'seed export to mmseed format', [[["mmdat"],1]])], - ['export_mnemonic', (1,'seed export to mmwords format', [[["mmdat"],1]])], - ['export_incog', (1,'seed export to mmincog format', [[["mmdat"],1]])], - ['export_incog_hex',(1,'seed export to mmincog hex format', [[["mmdat"],1]])], - ['export_incog_hidden',(1,'seed export to hidden mmincog format', [[["mmdat"],1]])], - - ['addrgen_seed', (1,'address generation from mmseed file', [[["mmseed","addrs"],1]])], - ['addrgen_mnemonic',(1,'address generation from mmwords file',[[["mmwords","addrs"],1]])], - ['addrgen_incog', (1,'address generation from mmincog file',[[["mmincog","addrs"],1]])], - ['addrgen_incog_hex',(1,'address generation from mmincog hex file',[[["mmincox","addrs"],1]])], - ['addrgen_incog_hidden',(1,'address generation from hidden mmincog file', [[[hincog_fn,"addrs"],1]])], - - ['keyaddrgen', (1,'key-address file generation', [[["mmdat"],1]])], - ['txsign_keyaddr',(1,'transaction signing with key-address file', [[["akeys.mmenc","raw"],1]])], - - ['walletgen2',(2,'wallet generation (2)', [])], -# ['walletgen2',(2,'wallet generation (2), 128-bit seed (WIP)', [])], - ['addrgen2', (2,'address generation (2)', [[["mmdat"],2]])], - ['txcreate2', (2,'transaction creation (2)', [[["addrs"],2]])], - ['txsign2', (2,'transaction signing, two transactions',[[["mmdat","raw"],1],[["mmdat","raw"],2]])], - ['export_mnemonic2', (2,'seed export to mmwords format (2)',[[["mmdat"],2]])], -# ['export_mnemonic2', (2,'seed export to mmwords format (2), 128-bit seed (WIP)',[[["mmdat"],2]])], - - ['walletgen3',(3,'wallet generation (3)', [])], - ['addrgen3', (3,'address generation (3)', [[["mmdat"],3]])], - ['txcreate3', (3,'tx creation with inputs and outputs from two wallets', [[["addrs"],1],[["addrs"],3]])], - ['txsign3', (3,'tx signing with inputs and outputs from two wallets',[[["mmdat"],1],[["mmdat","raw"],3]])], - - ['walletgen4',(4,'wallet generation (4) (brainwallet)', [])], -# ['walletgen4',(4,'wallet generation (4) (brainwallet, 192-bit seed (WIP))', [])], - ['addrgen4', (4,'address generation (4)', [[["mmdat"],4]])], - ['txcreate4', (4,'tx creation with inputs and outputs from four seed sources, plus non-MMGen inputs and outputs', [[["addrs"],1],[["addrs"],2],[["addrs"],3],[["addrs"],4]])], - ['txsign4', (4,'tx signing with inputs and outputs from incog file, mnemonic file, wallet and brainwallet, plus non-MMGen inputs and outputs', [[["mmincog"],1],[["mmwords"],2],[["mmdat"],3],[["mmbrain","raw"],4]])], -]) - -tool_cmd_data = OrderedDict([ - ['strtob58', (10, '', [])], - ['b58tostr', (10, '', [[["strtob58.in","strtob58.out"],10]])], - ['hextob58', (10, '', [])], - ['b58tohex', (10, '', [[["hextob58.in","hextob58.out"],10]])], -# "b58randenc": [], -# "randhex": ['nbytes [int=32]'], -# "randwif": ['compressed [bool=False]'], -# "randpair": ['compressed [bool=False]'], -# "wif2hex": [' [str]', 'compressed [bool=False]'], -# "wif2addr": [' [str]', 'compressed [bool=False]'], -# "hex2wif": [' [str]', 'compressed [bool=False]'], -# "hexdump": [' [str]', 'cols [int=8]', 'line_nums [bool=True]'], -# "unhexdump": [' [str]'], -# "hex2mn": [' [str]','wordlist [str="electrum"]'], -# "mn2hex": [' [str]', 'wordlist [str="electrum"]'], -# "b32tohex": [' [str]'], -# "hextob32": [' [str]'], -# "mn_rand128": ['wordlist [str="electrum"]'], -# "mn_rand192": ['wordlist [str="electrum"]'], -# "mn_rand256": ['wordlist [str="electrum"]'], -# "mn_stats": ['wordlist [str="electrum"]'], -# "mn_printlist": ['wordlist [str="electrum"]'], -# "id8": [' [str]'], -# "id6": [' [str]'], -# "str2id6": [' [str]'], -]) - - -utils = { - 'check_deps': 'check dependencies for specified command', - 'clean': 'clean specified tmp dir(s) 1, 2, 3 or 4 (no arg = all dirs)', -} - -addrs_per_wallet = 8 cfgs = { - '10': { - 'name': "test the tool utility", - 'enc_passwd': "Ten Satoshis", - 'tmpdir': "test/tmp10", - 'dep_generators': { - 'strtob58.out': "strtob58", - 'strtob58.in': "strtob58", - }, - - }, '6': { 'name': "reference wallet check", 'bw_passwd': "abc", @@ -194,17 +99,94 @@ cfgs = { 'mmdat': "passchg", }, }, + '9': { + 'tmpdir': "test/tmp9", + 'tool_enc_passwd': "Scrypt it, don't hash it!", + 'tool_enc_reftext': + "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks\n", + 'tool_enc_infn': "tool_encrypt.in", + 'tool_enc_ref_infn': "tool_encrypt_ref.in", + 'dep_generators': { + 'tool_encrypt.in': "tool_encrypt", + 'tool_encrypt.in.mmenc': "tool_encrypt", + 'tool_encrypt_ref.in': "tool_encrypt_ref", + 'tool_encrypt_ref.in.mmenc': "tool_encrypt_ref", + }, + }, } -from binascii import hexlify -def getrand(n): return int(hexlify(os.urandom(n)),16) -def getrandhex(n): return hexlify(os.urandom(n)) +from collections import OrderedDict +cmd_data = OrderedDict([ +# test description depends + ['refwalletgen', (6,'reference wallet seed ID', [[[],6]])], + ['refaddrgen', (6,'reference wallet address checksum', [[["mmdat"],6]])], + ['refkeyaddrgen', (6,'reference wallet key-address checksum', [[["mmdat"],6]])], + + ['walletgen', (1,'wallet generation', [[[],1]])], + ['walletchk', (1,'wallet check', [[["mmdat"],1]])], + ['passchg', (5,'password, label and hash preset change',[[["mmdat"],1]])], + ['walletchk_newpass',(5,'wallet check with new pw, label and hash preset',[[["mmdat"],5]])], + ['addrgen', (1,'address generation', [[["mmdat"],1]])], + ['addrimport', (1,'address import', [[["addrs"],1]])], + ['txcreate', (1,'transaction creation', [[["addrs"],1]])], + ['txsign', (1,'transaction signing', [[["mmdat","raw"],1]])], + ['txsend', (1,'transaction sending', [[["sig"],1]])], + + ['export_seed', (1,'seed export to mmseed format', [[["mmdat"],1]])], + ['export_mnemonic', (1,'seed export to mmwords format', [[["mmdat"],1]])], + ['export_incog', (1,'seed export to mmincog format', [[["mmdat"],1]])], + ['export_incog_hex',(1,'seed export to mmincog hex format', [[["mmdat"],1]])], + ['export_incog_hidden',(1,'seed export to hidden mmincog format', [[["mmdat"],1]])], + + ['addrgen_seed', (1,'address generation from mmseed file', [[["mmseed","addrs"],1]])], + ['addrgen_mnemonic',(1,'address generation from mmwords file',[[["mmwords","addrs"],1]])], + ['addrgen_incog', (1,'address generation from mmincog file',[[["mmincog","addrs"],1]])], + ['addrgen_incog_hex',(1,'address generation from mmincog hex file',[[["mmincox","addrs"],1]])], + ['addrgen_incog_hidden',(1,'address generation from hidden mmincog file', [[[hincog_fn,"addrs"],1]])], + + ['keyaddrgen', (1,'key-address file generation', [[["mmdat"],1]])], + ['txsign_keyaddr',(1,'transaction signing with key-address file', [[["akeys.mmenc","raw"],1]])], + + ['walletgen2',(2,'wallet generation (2)', [])], +# ['walletgen2',(2,'wallet generation (2), 128-bit seed (WIP)', [])], + ['addrgen2', (2,'address generation (2)', [[["mmdat"],2]])], + ['txcreate2', (2,'transaction creation (2)', [[["addrs"],2]])], + ['txsign2', (2,'transaction signing, two transactions',[[["mmdat","raw"],1],[["mmdat","raw"],2]])], + ['export_mnemonic2', (2,'seed export to mmwords format (2)',[[["mmdat"],2]])], +# ['export_mnemonic2', (2,'seed export to mmwords format (2), 128-bit seed (WIP)',[[["mmdat"],2]])], + + ['walletgen3',(3,'wallet generation (3)', [])], + ['addrgen3', (3,'address generation (3)', [[["mmdat"],3]])], + ['txcreate3', (3,'tx creation with inputs and outputs from two wallets', [[["addrs"],1],[["addrs"],3]])], + ['txsign3', (3,'tx signing with inputs and outputs from two wallets',[[["mmdat"],1],[["mmdat","raw"],3]])], + + ['walletgen4',(4,'wallet generation (4) (brainwallet)', [])], +# ['walletgen4',(4,'wallet generation (4) (brainwallet, 192-bit seed (WIP))', [])], + ['addrgen4', (4,'address generation (4)', [[["mmdat"],4]])], + ['txcreate4', (4,'tx creation with inputs and outputs from four seed sources, plus non-MMGen inputs and outputs', [[["addrs"],1],[["addrs"],2],[["addrs"],3],[["addrs"],4]])], + ['txsign4', (4,'tx signing with inputs and outputs from incog file, mnemonic file, wallet and brainwallet, plus non-MMGen inputs and outputs', [[["mmincog"],1],[["mmwords"],2],[["mmdat"],3],[["mmbrain","raw"],4]])], + ['tool_encrypt', (9,"'mmgen-tool encrypt' (random data)", [])], + ['tool_decrypt', (9,"'mmgen-tool decrypt' (random data)", + [[[cfgs['9']['tool_enc_infn'], + cfgs['9']['tool_enc_infn']+".mmenc"],9]])], + ['tool_encrypt_ref', (9,"'mmgen-tool encrypt' (reference text)", [])], + ['tool_decrypt_ref', (9,"'mmgen-tool decrypt' (reference text)", + [[[cfgs['9']['tool_enc_ref_infn'], + cfgs['9']['tool_enc_ref_infn']+".mmenc"],9]])], +]) + +utils = { + 'check_deps': 'check dependencies for specified command', + 'clean': 'clean specified tmp dir(s) 1,2,3,4,5 or 6 (no arg = all dirs)', +} + +addrs_per_wallet = 8 # total of two outputs must be < 10 BTC for k in cfgs.keys(): cfgs[k]['amts'] = [0,0] for idx,mod in (0,6),(1,4): - cfgs[k]['amts'][idx] = "%s.%s" % ((getrand(2) % mod), str(getrand(4))[:5]) + cfgs[k]['amts'][idx] = "%s.%s" % ((getrandnum(2) % mod), str(getrandnum(4))[:5]) meta_cmds = OrderedDict([ ['ref', (6,("refwalletgen","refaddrgen","refkeyaddrgen"))], @@ -217,11 +199,12 @@ meta_cmds = OrderedDict([ ['2', (2,[k for k in cmd_data if cmd_data[k][0] == 2])], ['3', (3,[k for k in cmd_data if cmd_data[k][0] == 3])], ['4', (4,[k for k in cmd_data if cmd_data[k][0] == 4])], + ['tool', (9,("tool_encrypt","tool_decrypt","tool_encrypt_ref","tool_decrypt_ref"))], ]) from mmgen.Opts import * help_data = { - 'prog_name': "test.py", + 'prog_name': g.prog_name, 'desc': "Test suite for the MMGen suite", 'usage':"[options] [command or metacommand]", 'options': """ @@ -253,69 +236,60 @@ else: env["MMGEN_DISABLE_HOLD_PROTECT"] = "1" for k in 'debug','verbose','exact_output','pause','quiet': - globals()[k] = True if k in opts else False + g.__dict__[k] = True if k in opts else False -if debug: verbose = True +if g.debug: g.verbose = True -if exact_output: +if g.exact_output: def msg(s): pass vmsg = vmsg_r = msg_r = msg else: def msg(s): sys.stderr.write(s+"\n") def vmsg(s): - if verbose: sys.stderr.write(s+"\n") + if g.verbose: sys.stderr.write(s+"\n") def msg_r(s): sys.stderr.write(s) def vmsg_r(s): - if verbose: sys.stderr.write(s) + if g.verbose: sys.stderr.write(s) stderr_save = sys.stderr def silence(): - if not (verbose or exact_output): + if not (g.verbose or g.exact_output): sys.stderr = open("/dev/null","a") def end_silence(): - if not (verbose or exact_output): + if not (g.verbose or g.exact_output): sys.stderr = stderr_save def errmsg(s): stderr_save.write(s+"\n") def errmsg_r(s): stderr_save.write(s) -def Msg(s): sys.stdout.write(s+"\n") - if "list_cmds" in opts: + fs = " {:<{w}} - {}" Msg("Available commands:") w = max([len(i) for i in cmd_data]) for cmd in cmd_data: - Msg(" {:<{w}} - {}".format(cmd,cmd_data[cmd][1],w=w)) + Msg(fs.format(cmd,cmd_data[cmd][1],w=w)) Msg("\nAvailable metacommands:") w = max([len(i) for i in meta_cmds]) for cmd in meta_cmds: - Msg(" {:<{w}} - {}".format(cmd," + ".join(meta_cmds[cmd][1]),w=w)) + Msg(fs.format(cmd," + ".join(meta_cmds[cmd][1]),w=w)) Msg("\nAvailable utilities:") w = max([len(i) for i in utils]) for cmd in sorted(utils): - Msg(" {:<{w}} - {}".format(cmd,utils[cmd],w=w)) + Msg(fs.format(cmd,utils[cmd],w=w)) sys.exit() import pexpect,time,re import mmgen.config as g from mmgen.util import get_data_from_file,write_to_file,get_lines_from_file -redc,grnc,yelc,cyac,reset = ( - ["\033[%sm" % c for c in "31;1","32;1","33;1","36;1","0"] -) -def red(s): return redc+s+reset -def green(s): return grnc+s+reset -def yellow(s): return yelc+s+reset -def cyan(s): return cyac+s+reset - def my_send(p,t,delay=send_delay,s=False): if delay: time.sleep(delay) ret = p.send(t) # returns num bytes written if delay: time.sleep(delay) - if verbose: - ls = "" if debug or not s else " " + if g.verbose: + ls = "" if g.debug or not s else " " es = "" if s else " " msg("%sSEND %s%s" % (ls,es,yellow("'%s'"%t.replace('\n',r'\n')))) return ret @@ -323,7 +297,7 @@ def my_send(p,t,delay=send_delay,s=False): def my_expect(p,s,t='',delay=send_delay,regex=False,nonl=False): quo = "'" if type(s) == str else "" - if verbose: msg_r("EXPECT %s" % yellow(quo+str(s)+quo)) + if g.verbose: msg_r("EXPECT %s" % yellow(quo+str(s)+quo)) else: msg_r("+") try: @@ -335,7 +309,7 @@ def my_expect(p,s,t='',delay=send_delay,regex=False,nonl=False): errmsg(red("\nERROR. Expect %s%s%s timed out. Exiting" % (quo,s,quo))) sys.exit(1) - if debug or (verbose and type(s) != str): msg_r(" ==> %s " % ret) + if g.debug or (g.verbose and type(s) != str): msg_r(" ==> %s " % ret) if ret == -1: errmsg("Error. Expect returned %s" % ret) @@ -346,14 +320,6 @@ def my_expect(p,s,t='',delay=send_delay,regex=False,nonl=False): else: ret = my_send(p,t,delay,s) return ret -def cleandir(d): - try: files = os.listdir(d) - except: return - - msg(green("Cleaning directory '%s'" % d)) - for f in files: - os.unlink(os.path.join(d,f)) - def get_file_with_ext(ext,mydir,delete=True): flist = [os.path.join(mydir,f) for f in os.listdir(mydir) @@ -363,7 +329,7 @@ def get_file_with_ext(ext,mydir,delete=True): if len(flist) > 1: if delete: - if not quiet: + if not g.quiet: msg("Multiple *.%s files in '%s' - deleting" % (ext,mydir)) for f in flist: os.unlink(f) return False @@ -375,7 +341,7 @@ def get_addrfile_checksum(display=False): silence() from mmgen.addr import AddrInfo chk = AddrInfo(addrfile).checksum - if verbose and display: msg("Checksum: %s" % cyan(chk)) + if g.verbose and display: msg("Checksum: %s" % cyan(chk)) end_silence() return chk @@ -385,9 +351,6 @@ def verify_checksum_or_exit(checksum,chk): sys.exit(1) vmsg(green("Checksums match: %s") % (cyan(chk))) -def get_rand_printable_chars(num_chars,no_punc=False): - return [chr(ord(i)%94+33) for i in list(os.urandom(num_chars))] - class MMGenExpect(object): @@ -395,8 +358,7 @@ class MMGenExpect(object): if not 'system' in opts: mmgen_cmd = os.path.join(os.curdir,mmgen_cmd) desc = cmd_data[name][1] - if not desc: desc = name - if verbose or exact_output: + if g.verbose or g.exact_output: sys.stderr.write( green("Testing %s\nExecuting " % desc) + cyan("'%s %s'\n" % (mmgen_cmd," ".join(cmd_args))) @@ -406,27 +368,27 @@ class MMGenExpect(object): # msgrepr(mmgen_cmd,cmd_args); msg("") if env: self.p = pexpect.spawn(mmgen_cmd,cmd_args,env=env) else: self.p = pexpect.spawn(mmgen_cmd,cmd_args) - if exact_output: self.p.logfile = sys.stdout + if g.exact_output: self.p.logfile = sys.stdout def license(self): p = "'w' for conditions and warranty info, or 'c' to continue: " my_expect(self.p,p,'c') def usr_rand(self,num_chars): - rand_chars = get_rand_printable_chars(num_chars) + rand_chars = list(getrandstr(num_chars,no_space=True)) my_expect(self.p,'symbols left: ','x') try: vmsg_r("SEND ") while self.p.expect('left: ',0.1) == 0: ch = rand_chars.pop(0) - msg_r(yellow(ch)+" " if verbose else "+") + msg_r(yellow(ch)+" " if g.verbose else "+") self.p.send(ch) except: vmsg("EOT") my_expect(self.p,"ENTER to continue: ",'\n') def passphrase_new(self,what,passphrase): - my_expect(self.p,("Enter passphrase for new %s: " % what), passphrase+"\n") + my_expect(self.p,("Enter passphrase for %s: " % what), passphrase+"\n") my_expect(self.p,"Repeat passphrase: ", passphrase+"\n") def passphrase(self,what,passphrase,pwtype=""): @@ -483,7 +445,6 @@ class MMGenExpect(object): def read(self,n=None): return self.p.read(n) - from mmgen.rpc.data import TransactionInfo from decimal import Decimal from mmgen.bitcoin import verify_addr @@ -491,13 +452,13 @@ from mmgen.bitcoin import verify_addr def add_fake_unspent_entry(out,address,comment): out.append(TransactionInfo( account = unicode(comment), - vout = int(getrand(4) % 8), + vout = int(getrandnum(4) % 8), txid = unicode(hexlify(os.urandom(32))), - amount = Decimal("%s.%s" % (10+(getrand(4) % 40), getrand(4) % 100000000)), + amount = Decimal("%s.%s" % (10+(getrandnum(4) % 40), getrandnum(4) % 100000000)), address = address, spendable = False, scriptPubKey = ("76a914"+verify_addr(address,return_hex=True)+"88ac"), - confirmations = getrand(4) % 500 + confirmations = getrandnum(4) % 500 )) def create_fake_unspent_data(adata,unspent_data_file,tx_data,non_mmgen_input=''): @@ -511,7 +472,7 @@ def create_fake_unspent_data(adata,unspent_data_file,tx_data,non_mmgen_input='') if non_mmgen_input: from mmgen.bitcoin import privnum2addr,hextowif - privnum = getrand(32) + privnum = getrandnum(32) btcaddr = privnum2addr(privnum,compressed=True) of = os.path.join(cfgs[non_mmgen_input]['tmpdir'],non_mmgen_fn) write_to_file(of, hextowif("{:064x}".format(privnum), @@ -538,34 +499,24 @@ def make_brainwallet_file(fn): wl = tirosh_words.split("\n") nwords,ws_list,max_spaces = 10," \n",5 def rand_ws_seq(): - nchars = getrand(1) % max_spaces + 1 - return "".join([ws_list[getrand(1)%len(ws_list)] for i in range(nchars)]) - rand_pairs = [wl[getrand(4) % len(wl)] + rand_ws_seq() for i in range(nwords)] + nchars = getrandnum(1) % max_spaces + 1 + return "".join([ws_list[getrandnum(1)%len(ws_list)] for i in range(nchars)]) + rand_pairs = [wl[getrandnum(4) % len(wl)] + rand_ws_seq() for i in range(nwords)] d = "".join(rand_pairs).rstrip() + "\n" - if verbose: msg_r("Brainwallet password:\n%s" % cyan(d)) + if g.verbose: msg_r("Brainwallet password:\n%s" % cyan(d)) write_to_file(fn,d,{},"brainwallet password") def do_between(): - if pause: + if g.pause: from mmgen.util import keypress_confirm if keypress_confirm(green("Continue?"),default_yes=True): - if verbose or exact_output: sys.stderr.write("\n") + if g.verbose or g.exact_output: sys.stderr.write("\n") else: errmsg("Exiting at user request") sys.exit() - elif verbose or exact_output: + elif g.verbose or g.exact_output: sys.stderr.write("\n") -def do_cmd(ts,cmd): - - d = [(str(num),ext) for exts,num in cmd_data[cmd][2] for ext in exts] - al = [get_file_with_ext(ext,cfgs[num]['tmpdir']) for num,ext in d] - - global cfg - cfg = cfgs[str(cmd_data[cmd][0])] - - ts.__class__.__dict__[cmd](*([ts,cmd] + al)) - hincog_bytes = 1024*1024 hincog_offset = 98765 @@ -573,115 +524,129 @@ hincog_seedlen = 256 rebuild_list = OrderedDict() -def get_num_ext_for_cmd(cmd): - num = str(cmd_data[cmd][0]) - dgl = cfgs[num]['dep_generators'] -# msgrepr(num,cmd,dgl) - if cmd in dgl.values(): - ext = [k for k in dgl if dgl[k] == cmd][0] - return (num,ext) - else: - return ('','') +def check_needs_rerun(ts,cmd,build=False,root=True,force_delete=False,dpy=False): -def check_needs_rerun(cmd,build=False,root=True,force_delete=False): + rerun = True if root else False # force_delete is not passed to recursive call - rerun = True if root else False + fns = [] + if force_delete or not root: + ret = ts.get_num_exts_for_cmd(cmd,dpy) #does cmd produce a needed dependency? + if ret: + for ext in ret[1]: + fn = get_file_with_ext(ext,cfgs[ret[0]]['tmpdir'],delete=build) + if fn: + if force_delete: os.unlink(fn) + else: fns.append(fn) + else: rerun = True - num,ext = get_num_ext_for_cmd(cmd) # does cmd produce a needed dependency? - if num and (force_delete or not root): - fn = get_file_with_ext(ext,cfgs[num]['tmpdir'],delete=build) - if not fn: rerun = True - if fn and force_delete: - os.unlink(fn); fn = "" - else: fn = "" + fdeps = ts.generate_file_deps(cmd) + cdeps = ts.generate_cmd_deps(fdeps) - fdeps = [(str(n),e) for exts,n in cmd_data[cmd][2] for e in exts] - cdeps = [cfgs[str(n)]['dep_generators'][e] for n,e in fdeps] - - if fn: + for fn in fns: my_age = os.stat(fn).st_mtime for num,ext in fdeps: f = get_file_with_ext(ext,cfgs[num]['tmpdir'],delete=build) if f and os.stat(f).st_mtime > my_age: rerun = True for cdep in cdeps: - if check_needs_rerun(cdep,build=build,root=False): rerun = True + if check_needs_rerun(ts,cdep,build=build,root=False,dpy=cmd): rerun = True if build: if rerun: - if fn and not root: - os.unlink(fn) - do_cmd(ts,cmd) + for fn in fns: + if not root: os.unlink(fn) + ts.do_cmd(cmd) if not root: do_between() else: # If prog produces multiple files: if cmd not in rebuild_list or rerun == True: - rebuild_list[cmd] = (rerun,fn) + rebuild_list[cmd] = (rerun,fns[0] if fns else "") # FIX return rerun -def mk_tmpdir(cfg): - try: os.mkdir(cfg['tmpdir'],0755) - except OSError as e: - if e.errno != 17: raise - else: msg("Created directory '%s'" % cfg['tmpdir']) - def refcheck(what,chk,refchk): vmsg("Comparing %s '%s' to stored reference" % (what,chk)) if chk == refchk: ok() else: - if not verbose: errmsg("") + if not g.verbose: errmsg("") errmsg(red(""" Fatal error - %s '%s' does not match reference value '%s'. Aborting test """.strip() % (what,chk,refchk))) sys.exit(3) -def ok(): - if verbose or exact_output: - sys.stderr.write(green("OK\n")) - else: msg(" OK") +def check_deps(ts,name,cmds): + if len(cmds) != 1: + msg("Usage: %s check_deps " % g.prog_name) + sys.exit(1) + + cmd = cmds[0] + + if cmd not in cmd_data: + msg("'%s': unrecognized command" % cmd) + sys.exit(1) + + if not g.quiet: + msg("Checking dependencies for '%s'" % (cmd)) + + check_needs_rerun(ts,cmd,build=False) + + w = max(len(i) for i in rebuild_list) + 1 + for cmd in rebuild_list: + c = rebuild_list[cmd] + m = "Rebuild" if (c[0] and c[1]) else "Build" if c[0] else "OK" + msg("cmd {:<{w}} {}".format(cmd+":", m, w=w)) +# msgrepr(cmd,c) + + +def clean(dirs=[]): + ts = MMGenTestSuite() + dirlist = ts.list_tmp_dirs() + if not dirs: dirs = dirlist.keys() + for d in dirs: + if d in sorted(dirlist): + cleandir(dirlist[d]) + else: + msg("%s: invalid directory number" % d) + sys.exit(1) class MMGenTestSuite(object): def __init__(self): pass - def check_deps(self,name,cmds): - if len(cmds) != 1: - msg("Usage: %s check_deps " % g.prog_name) - sys.exit(1) + def list_tmp_dirs(self): + d = {} + for k in cfgs: d[k] = cfgs[k]['tmpdir'] + return d - cmd = cmds[0] + def get_num_exts_for_cmd(self,cmd,dpy=False): # dpy ignored here + num = str(cmd_data[cmd][0]) + dgl = cfgs[num]['dep_generators'] +# msgrepr(num,cmd,dgl) + if cmd in dgl.values(): + ext = [k for k in dgl if dgl[k] == cmd][0] + return (num,[ext]) + else: + return None - if cmd not in cmd_data: - msg("'%s': unrecognized command" % cmd) - sys.exit(1) + def do_cmd(self,cmd): - if not quiet: - msg("Checking dependencies for '%s'" % (cmd)) + d = [(str(num),ext) for exts,num in cmd_data[cmd][2] for ext in exts] + al = [get_file_with_ext(ext,cfgs[num]['tmpdir']) for num,ext in d] - check_needs_rerun(cmd,build=False) + global cfg + cfg = cfgs[str(cmd_data[cmd][0])] - w = max(len(i) for i in rebuild_list) + 1 - for cmd in rebuild_list: - c = rebuild_list[cmd] - m = "Rebuild" if (c[0] and c[1]) else "Build" if c[0] else "OK" - msg("cmd {:<{w}} {}".format(cmd+":", m, w=w)) -# msgrepr(cmd,c) + self.__class__.__dict__[cmd](*([self,cmd] + al)) + def generate_file_deps(self,cmd): + return [(str(n),e) for exts,n in cmd_data[cmd][2] for e in exts] - def clean(self,name,dirs=[]): - dirlist = dirs if dirs else sorted(cfgs.keys()) - for k in dirlist: - if k in cfgs: - cleandir(cfgs[k]['tmpdir']) - else: - msg("%s: invalid directory index" % k) - sys.exit(1) + def generate_cmd_deps(self,fdeps): + return [cfgs[str(n)]['dep_generators'][ext] for n,ext in fdeps] def walletgen(self,name,brain=False): - mk_tmpdir(cfg) args = ["-d",cfg['tmpdir'],"-p1","-r10"] # if 'seed_len' in cfg: args += ["-l",cfg['seed_len']] @@ -703,22 +668,20 @@ class MMGenTestSuite(object): t.expect("Generating encryption key from OS random data plus %s" % s) if brain: break - t.passphrase_new("MMGen wallet",cfg['wpasswd']) + t.passphrase_new("new MMGen wallet",cfg['wpasswd']) t.written_to_file("Wallet") ok() def refwalletgen(self,name): - mk_tmpdir(cfg) args = ["-q","-d",cfg['tmpdir'],"-p1","-r10","-b"+cfg['bw_hashparams']] t = MMGenExpect(name,"mmgen-walletgen", args) t.expect("passphrase: ",cfg['bw_passwd']+"\n") t.usr_rand(10) - t.passphrase_new("MMGen wallet",cfg['wpasswd']) + t.passphrase_new("new MMGen wallet",cfg['wpasswd']) key_id = t.written_to_file("Wallet").split("-")[0].split("/")[-1] refcheck("key id",key_id,cfg['key_id']) def passchg(self,name,walletfile): - mk_tmpdir(cfg) t = MMGenExpect(name,"mmgen-passchg", ["-d",cfg['tmpdir'],"-p","2","-L","New Label","-r","16",walletfile]) @@ -778,7 +741,7 @@ class MMGenTestSuite(object): self.txcreate_common(name,sources=['1']) def txcreate_common(self,name,sources=['1'],non_mmgen_input=''): - if verbose or exact_output: + if g.verbose or g.exact_output: sys.stderr.write(green("Generating fake transaction info\n")) silence() from mmgen.addr import AddrInfo,AddrInfoList @@ -805,7 +768,7 @@ class MMGenTestSuite(object): # make the command line from mmgen.bitcoin import privnum2addr - btcaddr = privnum2addr(getrand(32),compressed=True) + btcaddr = privnum2addr(getrandnum(32),compressed=True) cmd_args = ["-d",cfg['tmpdir']] for num in tx_data.keys(): @@ -823,7 +786,7 @@ class MMGenTestSuite(object): env["MMGEN_BOGUS_WALLET_DATA"] = unspent_data_file end_silence() - if verbose or exact_output: sys.stderr.write("\n") + if g.verbose or g.exact_output: sys.stderr.write("\n") t = MMGenExpect(name,"mmgen-txcreate",cmd_args,env) t.license() @@ -908,7 +871,7 @@ class MMGenTestSuite(object): def export_incog_hidden(self,name,walletfile): rf,rd = os.path.join(cfg['tmpdir'],hincog_fn),os.urandom(hincog_bytes) vmsg(green("Writing %s bytes of data to file '%s'" % (hincog_bytes,rf))) - write_to_file(rf,rd,{},verbose=verbose) + write_to_file(rf,rd,{},verbose=g.verbose) t = self.export_incog(name,walletfile,args=["-G","%s,%s"%(rf,hincog_offset)]) t.written_to_file("Data",query="") ok() @@ -960,7 +923,7 @@ class MMGenTestSuite(object): return t.expect("Encrypt key list? (y/N): ","y") t.hash_preset("new key list",'1') - t.passphrase_new("key list",cfg['kapasswd']) + t.passphrase_new("new key list",cfg['kapasswd']) t.written_to_file("Keys") ok() @@ -1054,86 +1017,41 @@ class MMGenTestSuite(object): t.written_to_file("Signed transaction") ok() + def tool_encrypt(self,name,infile=""): + if infile: + infn = infile + else: + d = os.urandom(1033) + tmp_fn = cfg['tool_enc_infn'] + write_to_tmpfile(cfg,tmp_fn,d) + infn = get_tmpfile_fn(cfg,tmp_fn) + t = MMGenExpect(name,"mmgen-tool",["-d",cfg['tmpdir'],"encrypt",infn]) + t.hash_preset("user data",'1') + t.passphrase_new("user data",cfg['tool_enc_passwd']) + t.written_to_file("Encrypted data") + ok() -def write_to_tmpfile(fn,data): - write_to_file(os.path.join(cfg['tmpdir'],fn),data,{},silent=True) + def tool_encrypt_ref(self,name): + infn = get_tmpfile_fn(cfg,cfg['tool_enc_ref_infn']) + write_to_file(infn,cfg['tool_enc_reftext'],{},silent=True) + self.tool_encrypt(name,infn) -def read_from_tmpfile(fn): - from mmgen.util import get_data_from_file - return get_data_from_file(os.path.join(cfg['tmpdir'],fn),silent=True) + # Two deps produced by one prog is broken - TODO + def tool_decrypt(self,name,f1,f2): + of = name + ".out" + t = MMGenExpect(name,"mmgen-tool", + ["-d",cfg['tmpdir'],"decrypt",f2,"outfile="+of,"hash_preset=1"]) + t.passphrase("user data",cfg['tool_enc_passwd']) + t.written_to_file("Decrypted data") + d1 = read_from_file(f1) + d2 = read_from_file(get_tmpfile_fn(cfg,of)) + cmp_or_die(d1,d2) -def read_from_file(fn): - from mmgen.util import get_data_from_file - return get_data_from_file(fn,silent=True) - -class MMGenToolTestSuite(object): - - def __init__(self): - global cmd_data,tool_cmd_data - cmd_data = tool_cmd_data - pass - - def clean(self,name): - cleandir(cfgs['10']['tmpdir']) - - def cmd(self,name,tool_args): - mk_tmpdir(cfg) - t = MMGenExpect(name,"mmgen-tool", ["-d",cfg['tmpdir']] + tool_args) - return t.read() - - def cmd_to_tmpfile(self,name,tool_args,tmpfile): - ret = self.cmd(name,tool_args) - if ret: - write_to_tmpfile(tmpfile,ret) - ok() - - def strtob58(self,name): - s = "".join(get_rand_printable_chars(15)) - write_to_tmpfile('strtob58.in',s) - self.cmd_to_tmpfile(name,["strtob58",s],'strtob58.out') - - def b58tostr(self,name,f1,f2): - idata = read_from_file(f1) - odata = read_from_file(f2)[:-2] - res = self.cmd(name,["b58tostr",odata])[:-2] - if res == idata: ok() - else: errmsg(red("Error")) - - def hextob58(self,name): - hexnum = getrandhex(32) - write_to_tmpfile('hextob58.in',hexnum) - self.cmd_to_tmpfile(name,["hextob58",hexnum],'hextob58.out') - - def b58tohex(self,name,f1,f2): - idata = read_from_file(f1) - odata = read_from_file(f2)[:-2] - res = self.cmd(name,["b58tohex",odata])[:-2] - if res == idata: ok() - else: errmsg(red("Error")) -# "b58randenc": [], -# "randhex": ['nbytes [int=32]'], -# "randwif": ['compressed [bool=False]'], -# "randpair": ['compressed [bool=False]'], -# "wif2hex": [' [str]', 'compressed [bool=False]'], -# "wif2addr": [' [str]', 'compressed [bool=False]'], -# "hex2wif": [' [str]', 'compressed [bool=False]'], -# "hexdump": [' [str]', 'cols [int=8]', 'line_nums [bool=True]'], -# "unhexdump": [' [str]'], -# "hex2mn": [' [str]','wordlist [str="electrum"]'], -# "mn2hex": [' [str]', 'wordlist [str="electrum"]'], -# "b32tohex": [' [str]'], -# "hextob32": [' [str]'], -# "mn_rand128": ['wordlist [str="electrum"]'], -# "mn_rand192": ['wordlist [str="electrum"]'], -# "mn_rand256": ['wordlist [str="electrum"]'], -# "mn_stats": ['wordlist [str="electrum"]'], -# "mn_printlist": ['wordlist [str="electrum"]'], -# "id8": [' [str]'], -# "id6": [' [str]'], -# "str2id6": [' [str]'], + def tool_decrypt_ref(self,name,f1,f2): + self.tool_decrypt(name,f1,f2) # main() -if pause: +if g.pause: import termios,atexit fd = sys.stdin.fileno() old = termios.tcgetattr(fd) @@ -1142,25 +1060,26 @@ if pause: atexit.register(at_exit) start_time = int(time.time()) +ts = MMGenTestSuite() + +for cfg in sorted(cfgs): mk_tmpdir(cfgs[cfg]) try: - if cmd_args and cmd_args[0] != "tool": + if cmd_args: arg1 = cmd_args[0] if arg1 in utils: - MMGenTestSuite.__dict__[arg1](ts,arg1,cmd_args[1:]) + globals()[arg1](cmd_args[1:]) sys.exit() elif arg1 in meta_cmds: - ts = MMGenTestSuite() if len(cmd_args) == 1: for cmd in meta_cmds[arg1][1]: - check_needs_rerun(cmd,build=True,force_delete=True) + check_needs_rerun(ts,cmd,build=True,force_delete=True) else: msg("Only one meta command may be specified") sys.exit(1) - elif arg1 in cmd_data.keys() + tool_cmd_data.keys(): - ts = MMGenTestSuite() if arg1 in cmd_data else MMGenToolTestSuite() + elif arg1 in cmd_data.keys(): if len(cmd_args) == 1: - check_needs_rerun(arg1,build=True) + check_needs_rerun(ts,arg1,build=True) else: msg("Only one command may be specified") sys.exit(1) @@ -1168,17 +1087,9 @@ try: errmsg("%s: unrecognized command" % arg1) sys.exit(1) else: - if cmd_args: # tool - if len(cmd_args) != 1: - msg("Only one command may be specified") - sys.exit(1) - ts = MMGenToolTestSuite() - else: - ts = MMGenTestSuite() - - ts.clean("clean") + clean() for cmd in cmd_data: - do_cmd(ts,cmd) + ts.do_cmd(cmd) if cmd is not cmd_data.keys()[-1]: do_between() except: sys.stderr = stderr_save diff --git a/test/tooltest.py b/test/tooltest.py new file mode 100755 index 00000000..4be4a4cd --- /dev/null +++ b/test/tooltest.py @@ -0,0 +1,351 @@ +#!/usr/bin/python + +# Chdir to repo root. +# Since script is not in repo root, fix sys.path so that modules are +# imported from repo, not system. +import sys,os +pn = os.path.dirname(sys.argv[0]) +os.chdir(os.path.join(pn,os.pardir)) +sys.path.__setitem__(0,os.path.abspath(os.curdir)) + +import mmgen.config as g +from mmgen.util import msg,msg_r,vmsg,vmsg_r,Msg,msgrepr, msgrepr_exit +from collections import OrderedDict + +cmd_data = OrderedDict([ + ('util', { + 'desc': "base conversion, hashing and file utilities", + 'cmd_data': OrderedDict([ + ('strtob58', ()), + ('b58tostr', ("strtob58","io")), + ('hextob58', ()), + ('b58tohex', ("hextob58","io")), + ('b58randenc', ()), + ('hextob32', ()), + ('b32tohex', ("hextob32","io")), + ('randhex', ()), + ('id8', ()), + ('id6', ()), + ('str2id6', ()), + ("sha256x2", ()), + ("hexreverse", ()), + ("hexlify", ()), + ('hexdump', ()), + ('unhexdump', ("hexdump","io")), + ('rand2file', ()), + ]) + } + ), + ('bitcoin', { + 'desc': "Bitcoin address/key commands", + 'cmd_data': OrderedDict([ + ('randwif', ()), + ('randpair', ()), + ('wif2addr', ("randpair","o2")), + ('wif2hex', ("randpair","o2")), + ('privhex2addr', ("wif2hex","o2")), # wif from randpair o2 + ('hex2wif', ("wif2hex","io2")), + ('addr2hexaddr', ("randpair","o2")), + ('hexaddr2addr', ("addr2hexaddr","io2")), +# ("pubkey2addr", [' [str]']), +# ("pubkey2hexaddr", [' [str]']), + ]) + } + ), + ('mnemonic', { + 'desc': "mnemonic commands", + 'cmd_data': OrderedDict([ + ('hex2mn', ()), + ('mn2hex', ("hex2mn","io3")), + ('mn_rand128', ()), + ('mn_rand192', ()), + ('mn_rand256', ()), + ('mn_stats', ()), + ('mn_printlist', ()), + ]) + } + ) +]) + +cfg = { + 'name': "the tool utility", + 'enc_passwd': "Ten Satoshis", + 'tmpdir': "test/tmp10", + 'tmpdir_num': 10, +} + +from mmgen.Opts import * +help_data = { + 'prog_name': g.prog_name, + 'desc': "Test suite for the 'mmgen-tool' utility", + 'usage':"[options] [command]", + 'options': """ +-h, --help Print this help message +-d, --debug Produce debugging output +-l, --list-cmds List and describe the tests and commands in the test suite +-s, --system Test scripts and modules installed on system rather than those in the repo root +-v, --verbose Produce more verbose output +""", + 'notes': """ + +If no command is given, the whole suite of tests is run. +""" +} + +opts,cmd_args = parse_opts(sys.argv,help_data) + +if 'system' in opts: sys.path.pop(0) + +env = os.environ + +for k in 'debug','verbose','quiet','exact_output': + g.__dict__[k] = True if k in opts else False + +if g.debug: g.verbose = True + +if "list_cmds" in opts: + fs = " {:<{w}} - {}" + Msg("Available commands:") + w = max([len(i) for i in cmd_data]) + for cmd in cmd_data: + Msg(fs.format(cmd,cmd_data[cmd]['desc'],w=w)) + Msg("\nAvailable utilities:") + Msg(fs.format("clean","Clean the tmp directory",w=w)) + sys.exit() + +import binascii +import mmgen.config as g +from mmgen.test import * +from mmgen.util import get_data_from_file,write_to_file,get_lines_from_file +from mmgen.tx import is_wif,is_btc_addr,is_b58_str +from mmgen.mnemonic import get_seed_from_mnemonic + +class MMGenToolTestSuite(object): + + def __init__(self): + pass + + def gen_deps_for_cmd(self,cmd,cdata): + fns = [] + if cdata: + name,code = cdata + io,count = code,1 + if code[-1] in "0123456789": + io,count = code[:-1],int(code[-1]) + + for c in range(count): + fns += ["%s%s%s" % ( + name, + (c+1 if count > 1 else ""), + ('.in' if ch=='i' else '.out') + ) for ch in io] + return fns + + def get_num_exts_for_cmd(self,cmd,dpy): # dpy required here + num = str(tool_cfgs['tmpdir_num']) + # return only first file - a hack + exts = gen_deps_for_cmd(dpy) + return num,exts + + def do_cmds(self,cmd_group): + cdata = cmd_data[cmd_group]['cmd_data'] + for cmd in cdata: self.do_cmd(cmd,cdata[cmd]) + + def do_cmd(self,cmd,cdata): + + fns = self.gen_deps_for_cmd(cmd,cdata) + + file_list = [os.path.join(cfg['tmpdir'],fn) for fn in fns] + + self.__class__.__dict__[cmd](*([self,cmd] + file_list)) + + + def run_cmd(self,name,tool_args,kwargs="",extra_msg="",silent=False): + mmgen_tool = "mmgen-tool" + if not 'system' in opts: + mmgen_tool = os.path.join(os.curdir,mmgen_tool) + + sys_cmd = [mmgen_tool, "-d",cfg['tmpdir'], name] + tool_args + kwargs.split() + if extra_msg: extra_msg = "(%s)" % extra_msg + full_name = " ".join([name]+kwargs.split()+extra_msg.split()) + if not silent: + if g.verbose: + sys.stderr.write(green("Testing %s\nExecuting " % full_name)) + sys.stderr.write("%s\n" % cyan(repr(sys_cmd))) + else: + msg_r("Testing %-31s%s" % (full_name+":","")) + + import subprocess + return subprocess.check_output(sys_cmd) + + def run_cmd_chk(self,name,f1,f2,kwargs="",extra_msg=""): + idata = read_from_file(f1)[:-1] + odata = read_from_file(f2)[:-1] + ret = self.run_cmd(name,[odata],kwargs=kwargs,extra_msg=extra_msg)[:-1] + vmsg("In: " + repr(odata)) + vmsg("Out: " + repr(ret)) + if ret == idata: ok() + else: + msg(red( + "Error: values don't match:\nIn: %s\nOut: %s" % (repr(idata),repr(ret)))) + sys.exit(3) + return ret + + def run_cmd_nochk(self,name,f1,kwargs=""): + odata = read_from_file(f1)[:-1] + ret = self.run_cmd(name,[odata],kwargs=kwargs)[:-1] + vmsg("In: " + repr(odata)) + vmsg("Out: " + repr(ret)) + return ret + + def run_cmd_out(self,name,carg=None,Return=False,kwargs="",fn_idx="",extra_msg=""): + if carg: write_to_tmpfile(cfg,"%s%s.in" % (name,fn_idx),carg+"\n") + ret = self.run_cmd(name,[carg] if carg else [],kwargs=kwargs,extra_msg=extra_msg) + if carg: vmsg("In: " + repr(carg)) + vmsg("Out: " + repr(ret[:-1])) + if ret: + write_to_tmpfile(cfg,"%s%s.out" % (name,fn_idx),ret) + if Return: return ret + else: ok() + else: + msg(red("Error for command '%s'" % name)) + sys.exit(3) + + def run_cmd_randfileinput(self,name): + s = os.urandom(128) + fn = name+".in" + write_to_tmpfile(cfg,fn,s) + ret = self.run_cmd(name,[get_tmpfile_fn(cfg,fn)]) + fn = name+".out" + write_to_tmpfile(cfg,fn,ret) + ok() + vmsg("Returned: %s" % ret) + + def str2id6(self,name): + s = getrandstr(120,no_space=True) + s2 = " %s %s %s %s %s " % (s[:3],s[3:9],s[9:29],s[29:50],s[50:120]) + ret1 = self.run_cmd(name,[s],extra_msg="unspaced input"); ok() + ret2 = self.run_cmd(name,[s2],extra_msg="spaced input") + cmp_or_die(ret1,ret2) + vmsg("Returned: %s" % ret1) + + def mn_rand128(self,name): + self.run_cmd_out(name) + + def mn_rand192(self,name): + self.run_cmd_out(name) + + def mn_rand256(self,name): + self.run_cmd_out(name) + + def mn_stats(self,name): + self.run_cmd_out(name) + + def mn_printlist(self,name): + self.run_cmd(name,[]) + ok() + + def id6(self,name): self.run_cmd_randfileinput(name) + def id8(self,name): self.run_cmd_randfileinput(name) + def hexdump(self,name): self.run_cmd_randfileinput(name) + + def unhexdump(self,name,fn1,fn2): + ret = self.run_cmd(name,[fn2]) + orig = read_from_file(fn1) + cmp_or_die(orig,ret) + + def rand2file(self,name): + of = name + ".out" + dlen = 1024 + self.run_cmd(name,[of,str(1024),"threads=4","silent=1"]) + d = read_from_tmpfile(cfg,of) + cmp_or_die(dlen,len(d)) + + def strtob58(self,name): self.run_cmd_out(name,getrandstr(16)) + def sha256x2(self,name): self.run_cmd_out(name,getrandstr(16)) + def hexreverse(self,name): self.run_cmd_out(name,getrandhex(24)) + def hexlify(self,name): self.run_cmd_out(name,getrandstr(24)) + def b58tostr(self,name,f1,f2): self.run_cmd_chk(name,f1,f2) + def hextob58(self,name): self.run_cmd_out(name,getrandhex(32)) + def b58tohex(self,name,f1,f2): self.run_cmd_chk(name,f1,f2) + def hextob32(self,name): self.run_cmd_out(name,getrandhex(24)) + def b32tohex(self,name,f1,f2): self.run_cmd_chk(name,f1,f2) + def b58randenc(self,name): + ret = self.run_cmd_out(name,Return=True) + ok_or_die(ret[:-1],is_b58_str,"base 58 string") + def randhex(self,name): + ret = self.run_cmd_out(name,Return=True) + ok_or_die(ret[:-1],binascii.unhexlify,"hex string") + def randwif(self,name): + for n,k in enumerate(["","compressed=1"]): + ret = self.run_cmd_out(name,kwargs=k,Return=True,fn_idx=n+1) + ok_or_die(ret[:-1],is_wif,"WIF key") + def randpair(self,name): + for n,k in enumerate(["","compressed=1"]): + wif,addr = self.run_cmd_out(name,kwargs=k,Return=True,fn_idx=n+1).split() + ok_or_die(wif,is_wif,"WIF key",skip_ok=True) + ok_or_die(addr,is_btc_addr,"Bitcoin address") + def hex2wif(self,name,f1,f2,f3,f4): + for n,fi,fo,k in (1,f1,f2,""),(2,f3,f4,"compressed=1"): + ret = self.run_cmd_chk(name,fi,fo,kwargs=k) + def wif2hex(self,name,f1,f2): + for n,f,k in (1,f1,""),(2,f2,"compressed=1"): + wif = read_from_file(f).split()[0] + self.run_cmd_out(name,wif,kwargs=k,fn_idx=n) + def wif2addr(self,name,f1,f2): + for n,f,k in (1,f1,""),(2,f2,"compressed=1"): + wif = read_from_file(f).split()[0] + self.run_cmd_out(name,wif,kwargs=k,fn_idx=n) + def addr2hexaddr(self,name,f1,f2): + for n,f,m in (1,f1,""),(2,f2,"from compressed"): + addr = read_from_file(f).split()[-1] + self.run_cmd_out(name,addr,fn_idx=n,extra_msg=m) + def hexaddr2addr(self,name,f1,f2,f3,f4): + for n,fi,fo,m in (1,f1,f2,""),(2,f3,f4,"from compressed"): + self.run_cmd_chk(name,fi,fo,extra_msg=m) + def privhex2addr(self,name,f1,f2): + key1 = read_from_file(f1) + key2 = read_from_file(f2) + for n,args in enumerate([[key1],[key2,"compressed=1"]]): + ret = self.run_cmd(name,args).rstrip() + iaddr = read_from_tmpfile(cfg,"randpair%s.out" % (n+1)).split()[-1] + cmp_or_die(iaddr,ret) + def hex2mn(self,name): + for n,size,m in(1,16,"128-bit"),(2,24,"192-bit"),(3,32,"256-bit"): + hexnum = getrandhex(size) + self.run_cmd_out(name,hexnum,fn_idx=n,extra_msg=m) + def mn2hex(self,name,f1,f2,f3,f4,f5,f6): + for f_i,f_o,m in (f1,f2,"128-bit"),(f3,f4,"192-bit"),(f5,f6,"256-bit"): + self.run_cmd_chk(name,f_i,f_o,extra_msg=m) + +# main() +import time +start_time = int(time.time()) +ts = MMGenToolTestSuite() +mk_tmpdir(cfg) + +if cmd_args: + if len(cmd_args) != 1: + msg("Only one command may be specified") + sys.exit(1) + + cmd = cmd_args[0] + if cmd in cmd_data: + msg("Running tests for %s:" % cmd_data[cmd]['desc']) + ts.do_cmds(cmd) + elif cmd == "clean": + cleandir(cfg['tmpdir']) + sys.exit(0) + else: + msg("'%s': unrecognized command" % cmd) + sys.exit(1) +else: + cleandir(cfg['tmpdir']) + for cmd in cmd_data: + msg("Running tests for %s:" % cmd_data[cmd]['desc']) + ts.do_cmds(cmd) + if cmd is not cmd_data.keys()[-1]: msg("") + +t = int(time.time()) - start_time +msg(green( + "All requested tests finished OK, elapsed time: %02i:%02i" % (t/60,t%60)))