From ab8b5d053ee444ae2d01489e771eebb88e472fd6 Mon Sep 17 00:00:00 2001 From: MMGen Date: Tue, 14 May 2019 09:42:02 +0000 Subject: [PATCH] modularize unit_tests.py --- MANIFEST.in | 1 + test/unit_tests.py | 295 +------------------------ test/unit_tests_d/__init__.py | 0 test/unit_tests_d/ut_subseed.py | 180 +++++++++++++++ test/unit_tests_d/ut_tx_deserialize.py | 114 ++++++++++ 5 files changed, 304 insertions(+), 286 deletions(-) create mode 100755 test/unit_tests_d/__init__.py create mode 100755 test/unit_tests_d/ut_subseed.py create mode 100755 test/unit_tests_d/ut_tx_deserialize.py diff --git a/MANIFEST.in b/MANIFEST.in index 719bd3b1..1a17566e 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -3,6 +3,7 @@ include doc/wiki/using-mmgen/* include test/*.py include test/test_py_d/*.py +include test/unit_tests_d/*.py include test/ref/* include test/ref/litecoin/* include test/ref/ethereum/* diff --git a/test/unit_tests.py b/test/unit_tests.py index 76084981..780f6cbc 100755 --- a/test/unit_tests.py +++ b/test/unit_tests.py @@ -51,295 +51,12 @@ If no test is specified, all available tests are run sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:] cmd_args = opts.init(opts_data) -class UnitTests(object): - - def _get_core_repo_root(self): - self.core_repo_root = os.getenv('CORE_REPO_ROOT') - if not self.core_repo_root: - die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test') - - def tx_deserialize(self,name): - - def test_tx(txhex,desc,n): - - def has_nonstandard_outputs(outputs): - for o in outputs: - t = o['scriptPubKey']['type'] - if t in ('nonstandard','pubkey','nulldata'): - return True - return False - - d = g.rpch.decoderawtransaction(txhex) - - if has_nonstandard_outputs(d['vout']): return False - - dt = DeserializedTX(txhex) - - if opt.verbose: - Msg('\n====================================================') - Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc)) - if opt.verbose: - Pmsg(d) - Msg('----------------------------------------------------') - Pmsg(dt) - - # metadata - assert dt['txid'] == d['txid'],'TXID does not match' - assert dt['lock_time'] == d['locktime'],'Locktime does not match' - assert dt['version'] == d['version'],'Version does not match' - - # inputs - a,b = d['vin'],dt['txins'] - for i in range(len(a)): - assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i) - assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i) - assert a[i]['sequence'] == int(b[i]['nSeq'],16),( - 'nSeq of input {} does not match'.format(i)) - if 'txinwitness' in a[i]: - assert a[i]['txinwitness'] == b[i]['witness'],( - 'witness of input {} does not match'.format(i)) - - # outputs - a,b = d['vout'],dt['txouts'] - for i in range(len(a)): - assert a[i]['scriptPubKey']['addresses'][0] == b[i]['address'],( - 'address of ouput {} does not match'.format(i)) - assert a[i]['value'] == b[i]['amount'],'value of ouput {} does not match'.format(i) - assert a[i]['scriptPubKey']['hex'] == b[i]['scriptPubKey'],( - 'scriptPubKey of ouput {} does not match'.format(i)) - - return True - - def print_info(fn,extra_desc): - if opt.names: - Msg_r('{} {} ({}){}'.format( - purple('Testing'), - cyan(name), - extra_desc, - '' if opt.quiet else '\n')) - else: - Msg_r('Testing transactions from {!r}'.format(fn)) - if not opt.quiet: Msg('') - - def test_core_vectors(): - self._get_core_repo_root() - fn = os.path.join(self.core_repo_root,'src/test/data/tx_valid.json') - data = json.loads(open(fn).read()) - print_info(fn,'Core test vectors') - n = 1 - for e in data: - if type(e[0]) == list: - test_tx(e[1],desc,n) - n += 1 - else: - desc = e[0] - Msg('OK') - - def test_mmgen_txs(): - fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'), - ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'), - ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx') ) - from mmgen.protocol import init_coin - from mmgen.tx import MMGenTX - print_info('test/ref/*rawtx','MMGen reference transactions') - for n,(coin,tn,fn) in enumerate(fns): - init_coin(coin,tn) - rpc_init(reinit=True) - test_tx(MMGenTX(fn).hex,fn,n+1) - init_coin('btc',False) - rpc_init(reinit=True) - Msg('OK') - - from mmgen.tx import DeserializedTX - import json - - test_mmgen_txs() - test_core_vectors() - - return True - - def subseed(self,name): - from mmgen.seed import Seed - from mmgen.obj import SubSeedIdxRange - - def basic_ops(): - msg_r('Testing basic ops...') - for a,b,c,d,e,f,h in ( - (8,'4710FBF0','0C1B0615','803B165C','2669AC64',256,'10L'), - (6,'9D07ABBD','EBA9C33F','20787E6A','192E2AA2',192,'10L'), - (4,'43670520','04A4CCB3','B5F21D7B','C1934CFF',128,'10L'), - ): - - seed_bin = bytes.fromhex('deadbeef' * a) - seed = Seed(seed_bin) - assert seed.sid == b, seed.sid - - subseed = seed.subseed('2s') - assert subseed.sid == c, subseed.sid - - subseed = seed.subseed('3') - assert subseed.sid == d, subseed.sid - - subseed = seed.subseed_by_seed_id(e) - assert subseed.length == f, subseed.length - assert subseed.sid == e, subseed.sid - assert subseed.idx == 10, subseed.idx - assert subseed.ss_idx == h, subseed.ss_idx - - seed2 = Seed(seed_bin) - s2s = seed2.subseeds['short'] - s2l = seed2.subseeds['long'] - - seed2.gen_subseeds(1) - assert len(s2s) == 1, len(s2s) - - seed2.gen_subseeds(1) # do nothing - seed2.gen_subseeds(2) # append one item - - seed2.gen_subseeds(5) - assert len(s2s) == 5, len(s2s) - - seed2.gen_subseeds(3) # do nothing - assert len(s2l) == 5, len(s2l) - - seed2.gen_subseeds(10) - assert len(s2s) == 10, len(s2s) - - assert seed.pformat() == seed2.pformat() - - s = seed.fmt_subseeds() - s_lines = s.strip().split('\n') - assert len(s_lines) == g.subseeds + 4, s - - a = seed.subseed('2L').sid - b = [e for e in s_lines if ' 2L:' in e][0].strip().split()[1] - assert a == b, b - - c = seed.subseed('2').sid - assert c == a, c - - a = seed.subseed('5S').sid - b = [e for e in s_lines if ' 5S:' in e][0].strip().split()[3] - assert a == b, b - - s = seed.fmt_subseeds(g.subseeds+1,g.subseeds+2) - s_lines = s.strip().split('\n') - assert len(s_lines) == 6, s - - ss_idx = str(g.subseeds+2) + 'S' - a = seed.subseed(ss_idx).sid - b = [e for e in s_lines if ' {}:'.format(ss_idx) in e][0].strip().split()[3] - assert a == b, b - - s = seed.fmt_subseeds(1,2) - s_lines = s.strip().split('\n') - assert len(s_lines) == 6, s - - msg('OK') - - def defaults_and_limits(): - msg_r('Testing defaults and limits...') - - seed_bin = bytes.fromhex('deadbeef' * 8) - seed = Seed(seed_bin) - seed.gen_subseeds() - ss = seed.subseeds - assert len(ss['short']) == g.subseeds, ss['short'] - assert len(ss['long']) == g.subseeds, ss['long'] - - seed = Seed(seed_bin) - seed.subseed_by_seed_id('EEEEEEEE') - ss = seed.subseeds - assert len(ss['short']) == g.subseeds, ss['short'] - assert len(ss['long']) == g.subseeds, ss['long'] - - seed = Seed(seed_bin) - subseed = seed.subseed_by_seed_id('803B165C') - assert subseed.sid == '803B165C', subseed.sid - assert subseed.idx == 3, subseed.idx - - seed = Seed(seed_bin) - subseed = seed.subseed_by_seed_id('803B165C',last_idx=1) - assert subseed == None, subseed - - r = SubSeedIdxRange('1-5') - r2 = SubSeedIdxRange(1,5) - assert r2 == r, r2 - assert r == (r.first,r.last), r - assert r.first == 1, r.first - assert r.last == 5, r.last - assert r.items == [1,2,3,4,5], r.items - assert list(r.iterate()) == r.items, list(r.iterate()) - - r = SubSeedIdxRange('22') - r2 = SubSeedIdxRange(22,22) - assert r2 == r, r2 - assert r == (r.first,r.last), r - assert r.first == 22, r.first - assert r.last == 22, r.last - assert r.items == [22], r - assert list(r.iterate()) == r.items, list(r.iterate()) - - r = SubSeedIdxRange('3-3') - assert r.items == [3], r.items - - r = SubSeedIdxRange('{}-{}'.format(g.subseeds-1,g.subseeds)) - assert r.items == [g.subseeds-1,g.subseeds], r.items - - for n,e in enumerate(SubSeedIdxRange('1-5').iterate(),1): - assert n == e, e - - assert n == 5, n - - msg('OK') - - def collisions(): - ss_count,ltr,last_sid,collisions_chk = ( - (SubSeedIdxRange.max_idx,'S','2788F26B',470), - (49509,'L','8D1FE500',2) - )[bool(opt.fast)] - - last_idx = str(ss_count) + ltr - - msg_r('Testing Seed ID collisions ({} subseed pairs)...'.format(ss_count)) - - seed_bin = bytes.fromhex('12abcdef' * 8) - seed = Seed(seed_bin) - - seed.gen_subseeds(ss_count) - ss = seed.subseeds - - assert seed.subseed(last_idx).sid == last_sid, seed.subseed(last_idx).sid - - for sid in ss['long']: - # msg(sid) - assert sid not in ss['short'] - - collisions = 0 - for k in ('short','long'): - for sid in ss[k]: - collisions += ss[k][sid][1] - - assert collisions == collisions_chk, collisions - msg_r('({} collisions) '.format(collisions)) - msg('OK') - - basic_ops() - defaults_and_limits() - collisions() - - return True - def exit_msg(): t = int(time.time()) - start_time gmsg('All requested tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60)) -def run_unit_test(test): - gmsg('Running unit test {}'.format(test)) - t = UnitTests() - return getattr(t,test)(test) +all_tests = [fn[3:-3] for fn in os.listdir(os.path.join(repo_root,'test','unit_tests_d')) if fn[:3] == 'ut_'] -all_tests = filter(lambda s: s[0] != '_',dir(UnitTests)) start_time = int(time.time()) if opt.list: @@ -349,9 +66,15 @@ try: for test in cmd_args: if test not in all_tests: die(1,"'{}': test not recognized".format(test)) + for test in (cmd_args or all_tests): - if not run_unit_test(test): - rdie(2,'Test failed') + exec('from test.unit_tests_d.ut_{m} import {m}'.format(m=test)) + gmsg('Running unit test {}'.format(test)) + t = globals()[test]() + if not t.run_test(test): + rdie(1,'Unit test {!r} failed'.format(test)) + exec('del {}'.format(test)) + exit_msg() except KeyboardInterrupt: die(1,green('\nExiting at user request')) diff --git a/test/unit_tests_d/__init__.py b/test/unit_tests_d/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/test/unit_tests_d/ut_subseed.py b/test/unit_tests_d/ut_subseed.py new file mode 100755 index 00000000..c9ed421c --- /dev/null +++ b/test/unit_tests_d/ut_subseed.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +test/unit_tests_d/ut_subseed: subseed unit test for the MMGen suite +""" + +from mmgen.common import * + +class subseed(object): + + def run_test(self,name): + from mmgen.seed import Seed + from mmgen.obj import SubSeedIdxRange + + def basic_ops(): + msg_r('Testing basic ops...') + for a,b,c,d,e,f,h in ( + (8,'4710FBF0','0C1B0615','803B165C','2669AC64',256,'10L'), + (6,'9D07ABBD','EBA9C33F','20787E6A','192E2AA2',192,'10L'), + (4,'43670520','04A4CCB3','B5F21D7B','C1934CFF',128,'10L'), + ): + + seed_bin = bytes.fromhex('deadbeef' * a) + seed = Seed(seed_bin) + assert seed.sid == b, seed.sid + + subseed = seed.subseed('2s') + assert subseed.sid == c, subseed.sid + + subseed = seed.subseed('3') + assert subseed.sid == d, subseed.sid + + subseed = seed.subseed_by_seed_id(e) + assert subseed.length == f, subseed.length + assert subseed.sid == e, subseed.sid + assert subseed.idx == 10, subseed.idx + assert subseed.ss_idx == h, subseed.ss_idx + + seed2 = Seed(seed_bin) + s2s = seed2.subseeds['short'] + s2l = seed2.subseeds['long'] + + seed2.gen_subseeds(1) + assert len(s2s) == 1, len(s2s) + + seed2.gen_subseeds(1) # do nothing + seed2.gen_subseeds(2) # append one item + + seed2.gen_subseeds(5) + assert len(s2s) == 5, len(s2s) + + seed2.gen_subseeds(3) # do nothing + assert len(s2l) == 5, len(s2l) + + seed2.gen_subseeds(10) + assert len(s2s) == 10, len(s2s) + + assert seed.pformat() == seed2.pformat() + + s = seed.fmt_subseeds() + s_lines = s.strip().split('\n') + assert len(s_lines) == g.subseeds + 4, s + + a = seed.subseed('2L').sid + b = [e for e in s_lines if ' 2L:' in e][0].strip().split()[1] + assert a == b, b + + c = seed.subseed('2').sid + assert c == a, c + + a = seed.subseed('5S').sid + b = [e for e in s_lines if ' 5S:' in e][0].strip().split()[3] + assert a == b, b + + s = seed.fmt_subseeds(g.subseeds+1,g.subseeds+2) + s_lines = s.strip().split('\n') + assert len(s_lines) == 6, s + + ss_idx = str(g.subseeds+2) + 'S' + a = seed.subseed(ss_idx).sid + b = [e for e in s_lines if ' {}:'.format(ss_idx) in e][0].strip().split()[3] + assert a == b, b + + s = seed.fmt_subseeds(1,2) + s_lines = s.strip().split('\n') + assert len(s_lines) == 6, s + + msg('OK') + + def defaults_and_limits(): + msg_r('Testing defaults and limits...') + + seed_bin = bytes.fromhex('deadbeef' * 8) + seed = Seed(seed_bin) + seed.gen_subseeds() + ss = seed.subseeds + assert len(ss['short']) == g.subseeds, ss['short'] + assert len(ss['long']) == g.subseeds, ss['long'] + + seed = Seed(seed_bin) + seed.subseed_by_seed_id('EEEEEEEE') + ss = seed.subseeds + assert len(ss['short']) == g.subseeds, ss['short'] + assert len(ss['long']) == g.subseeds, ss['long'] + + seed = Seed(seed_bin) + subseed = seed.subseed_by_seed_id('803B165C') + assert subseed.sid == '803B165C', subseed.sid + assert subseed.idx == 3, subseed.idx + + seed = Seed(seed_bin) + subseed = seed.subseed_by_seed_id('803B165C',last_idx=1) + assert subseed == None, subseed + + r = SubSeedIdxRange('1-5') + r2 = SubSeedIdxRange(1,5) + assert r2 == r, r2 + assert r == (r.first,r.last), r + assert r.first == 1, r.first + assert r.last == 5, r.last + assert r.items == [1,2,3,4,5], r.items + assert list(r.iterate()) == r.items, list(r.iterate()) + + r = SubSeedIdxRange('22') + r2 = SubSeedIdxRange(22,22) + assert r2 == r, r2 + assert r == (r.first,r.last), r + assert r.first == 22, r.first + assert r.last == 22, r.last + assert r.items == [22], r + assert list(r.iterate()) == r.items, list(r.iterate()) + + r = SubSeedIdxRange('3-3') + assert r.items == [3], r.items + + r = SubSeedIdxRange('{}-{}'.format(g.subseeds-1,g.subseeds)) + assert r.items == [g.subseeds-1,g.subseeds], r.items + + for n,e in enumerate(SubSeedIdxRange('1-5').iterate(),1): + assert n == e, e + + assert n == 5, n + + msg('OK') + + def collisions(): + ss_count,ltr,last_sid,collisions_chk = ( + (SubSeedIdxRange.max_idx,'S','2788F26B',470), + (49509,'L','8D1FE500',2) + )[bool(opt.fast)] + + last_idx = str(ss_count) + ltr + + msg_r('Testing Seed ID collisions ({} subseed pairs)...'.format(ss_count)) + + seed_bin = bytes.fromhex('12abcdef' * 8) + seed = Seed(seed_bin) + + seed.gen_subseeds(ss_count) + ss = seed.subseeds + + assert seed.subseed(last_idx).sid == last_sid, seed.subseed(last_idx).sid + + for sid in ss['long']: + # msg(sid) + assert sid not in ss['short'] + + collisions = 0 + for k in ('short','long'): + for sid in ss[k]: + collisions += ss[k][sid][1] + + assert collisions == collisions_chk, collisions + msg_r('({} collisions) '.format(collisions)) + msg('OK') + + basic_ops() + defaults_and_limits() + collisions() + + return True diff --git a/test/unit_tests_d/ut_tx_deserialize.py b/test/unit_tests_d/ut_tx_deserialize.py new file mode 100755 index 00000000..5e1c5448 --- /dev/null +++ b/test/unit_tests_d/ut_tx_deserialize.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +test/unit_tests_d/ut_tx_deserialize: TX deserialization unit test for the MMGen suite +""" + +import os +from mmgen.common import * + +class tx_deserialize(object): + + def _get_core_repo_root(self): + self.core_repo_root = os.getenv('CORE_REPO_ROOT') + if not self.core_repo_root: + die(1,'The environmental variable CORE_REPO_ROOT must be set before running this test') + + def run_test(self,name): + + def test_tx(txhex,desc,n): + + def has_nonstandard_outputs(outputs): + for o in outputs: + t = o['scriptPubKey']['type'] + if t in ('nonstandard','pubkey','nulldata'): + return True + return False + + d = g.rpch.decoderawtransaction(txhex) + + if has_nonstandard_outputs(d['vout']): return False + + dt = DeserializedTX(txhex) + + if opt.verbose: + Msg('\n====================================================') + Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc)) + if opt.verbose: + Pmsg(d) + Msg('----------------------------------------------------') + Pmsg(dt) + + # metadata + assert dt['txid'] == d['txid'],'TXID does not match' + assert dt['lock_time'] == d['locktime'],'Locktime does not match' + assert dt['version'] == d['version'],'Version does not match' + + # inputs + a,b = d['vin'],dt['txins'] + for i in range(len(a)): + assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i) + assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i) + assert a[i]['sequence'] == int(b[i]['nSeq'],16),( + 'nSeq of input {} does not match'.format(i)) + if 'txinwitness' in a[i]: + assert a[i]['txinwitness'] == b[i]['witness'],( + 'witness of input {} does not match'.format(i)) + + # outputs + a,b = d['vout'],dt['txouts'] + for i in range(len(a)): + assert a[i]['scriptPubKey']['addresses'][0] == b[i]['address'],( + 'address of ouput {} does not match'.format(i)) + assert a[i]['value'] == b[i]['amount'],'value of ouput {} does not match'.format(i) + assert a[i]['scriptPubKey']['hex'] == b[i]['scriptPubKey'],( + 'scriptPubKey of ouput {} does not match'.format(i)) + + return True + + def print_info(fn,extra_desc): + if opt.names: + Msg_r('{} {} ({}){}'.format( + purple('Testing'), + cyan(name), + extra_desc, + '' if opt.quiet else '\n')) + else: + Msg_r('Testing transactions from {!r}'.format(fn)) + if not opt.quiet: Msg('') + + def test_core_vectors(): + self._get_core_repo_root() + fn = os.path.join(self.core_repo_root,'src/test/data/tx_valid.json') + data = json.loads(open(fn).read()) + print_info(fn,'Core test vectors') + n = 1 + for e in data: + if type(e[0]) == list: + test_tx(e[1],desc,n) + n += 1 + else: + desc = e[0] + Msg('OK') + + def test_mmgen_txs(): + fns = ( ('btc',False,'test/ref/0B8D5A[15.31789,14,tl=1320969600].rawtx'), + ('btc',True,'test/ref/0C7115[15.86255,14,tl=1320969600].testnet.rawtx'), + ('bch',False,'test/ref/460D4D-BCH[10.19764,tl=1320969600].rawtx') ) + from mmgen.protocol import init_coin + from mmgen.tx import MMGenTX + print_info('test/ref/*rawtx','MMGen reference transactions') + for n,(coin,tn,fn) in enumerate(fns): + init_coin(coin,tn) + rpc_init(reinit=True) + test_tx(MMGenTX(fn).hex,fn,n+1) + init_coin('btc',False) + rpc_init(reinit=True) + Msg('OK') + + from mmgen.tx import DeserializedTX + import json + + test_mmgen_txs() + test_core_vectors() + + return True