From b23b497d773e583d5742ab13c8a24d440673fbb9 Mon Sep 17 00:00:00 2001 From: philemon Date: Mon, 14 Aug 2017 13:04:25 +0300 Subject: [PATCH] obj.py rewrite+test suite, Bob and Alice regtest mode, compressed addresses - basic data types in obj.py rewritten - new test suite 'test/objtest.py' for testing basic data types - new compressed address type with the 'C' identifier - Bob and Alice regtest mode for testing MMGen in a mock two-user environment * All MMGen commands are available in this mode. * Set up with 'mmgen-regtest setup'. Bob and Alice's wallets are funded with 500 BTC each. Use the --mixed switch to import mixed address types in Bob and Alice's tracking wallets. * Transact as Bob by adding --bob switch to MMGen commands * Transact as Alice by adding --alice switch to MMGen commands * After sending a transaction, mine a block to confirm it with 'mmgen-regtest generate' The bitcoin daemon is stopped and restarted automatically when switching between users. --- mmgen-regtest | 25 ++ mmgen/addr.py | 33 +- mmgen/bitcoin.py | 57 ++-- mmgen/globalvars.py | 8 +- mmgen/main_addrgen.py | 5 +- mmgen/main_addrimport.py | 11 +- mmgen/main_regtest.py | 65 ++++ mmgen/main_txsend.py | 2 +- mmgen/obj.py | 666 +++++++++++++++++++-------------------- mmgen/opts.py | 12 + mmgen/tool.py | 110 ++++--- mmgen/tx.py | 10 +- mmgen/txsign.py | 8 +- mmgen/util.py | 15 +- scripts/traceback.py | 5 +- setup.py | 3 + test/gentest.py | 3 - test/objtest.py | 249 +++++++++++++++ test/test.py | 25 +- test/tooltest.py | 2 +- 20 files changed, 833 insertions(+), 481 deletions(-) create mode 100755 mmgen-regtest create mode 100755 mmgen/main_regtest.py create mode 100755 test/objtest.py diff --git a/mmgen-regtest b/mmgen-regtest new file mode 100755 index 00000000..9766b5a9 --- /dev/null +++ b/mmgen-regtest @@ -0,0 +1,25 @@ +#!/usr/bin/env python + +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2017 Philemon +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program. If not, see . + +""" +mmgen-regtest: Bitcoind regression test mode setup and operations for the MMGen + suite +""" + +from mmgen.main import launch +launch("regtest") diff --git a/mmgen/addr.py b/mmgen/addr.py index bfc7d040..0fa0f861 100755 --- a/mmgen/addr.py +++ b/mmgen/addr.py @@ -61,7 +61,7 @@ class AddrGeneratorSegwit(MMGenObject): class KeyGenerator(MMGenObject): def __new__(cls,generator=None,silent=False): if cls.test_for_secp256k1(silent=silent) and generator != 1: - if opt.key_generator != 1: + if (not hasattr(opt,'key_generator')) or opt.key_generator == 2 or generator == 2: return super(cls,cls).__new__(KeyGeneratorSecp256k1) else: msg('Using (slow) native Python ECDSA library for address generation') @@ -182,7 +182,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file if seed and addr_idxs: # data from seed + idxs self.al_id,src = AddrListID(seed.sid,mmtype),'gen' - adata = self.generate(seed,addr_idxs,compressed=(mmtype=='S')) + adata = self.generate(seed,addr_idxs) elif addrfile: # data from MMGen address file adata = self.parse_file(addrfile) # sets self.al_id elif al_id and adata: # data from tracking wallet @@ -210,7 +210,6 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file if self.al_id == None: return self.id_str = AddrListIDStr(self) - if type(self) == KeyList: return if do_chksum: @@ -223,21 +222,18 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file qmsg(self.msgs[('check_chksum','record_chksum')[src=='gen']]) def update_msgs(self): - if type(self).msgs and type(self) != AddrList: - for k in AddrList.msgs: - if k not in self.msgs: - self.msgs[k] = AddrList.msgs[k] + self.msgs = AddrList.msgs + self.msgs.update(type(self).msgs) - def generate(self,seed,addrnums,compressed): + def generate(self,seed,addrnums): assert type(addrnums) is AddrIdxList - assert type(compressed) is bool seed = seed.get_data() seed = self.cook_seed(seed) if self.gen_addrs: kg = KeyGenerator() - ag = AddrGenerator(('p2pkh','segwit')[self.al_id.mmtype=='S']) + ag = AddrGenerator(self.al_id.mmtype.gen_method) t_addrs,num,pos,out = len(addrnums),0,0,AddrListList() le = self.entry_type @@ -256,7 +252,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file e = le(idx=num) # Secret key is double sha256 of seed hash round /num/ - e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),compressed) + e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),self.al_id.mmtype.compressed) if self.gen_addrs: e.addr = ag.to_addr(kg.to_pubhex(e.sec)) @@ -278,8 +274,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file def is_for_current_chain(self): return self.data[0].addr.is_for_current_chain() - def chk_addr_or_pw(self,addr): - return {'L':'p2pkh','S':'p2sh'}[self.al_id.mmtype] == is_btc_addr(addr).addr_fmt + def check_format(self,addr): return True # format is checked when added to list entry object def cook_seed(self,seed): if self.al_id.mmtype == 'L': @@ -386,7 +381,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file elif self.al_id.mmtype == 'L': out.append('{} {{'.format(self.al_id.sid)) else: - out.append('{} {} {{'.format(self.al_id.sid,MMGenAddrType.mmtypes[self.al_id.mmtype].upper())) + out.append('{} {} {{'.format(self.al_id.sid,self.al_id.mmtype.name.upper())) fs = ' {:<%s} {:<34}{}' % len(str(self.data[-1].idx)) for e in self.data: @@ -419,7 +414,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file if not is_mmgen_idx(d[0]): return "'%s': invalid address num. in line: '%s'" % (d[0],l) - if not self.chk_addr_or_pw(d[1]): + if not self.check_format(d[1]): return "'{}': invalid {}".format(d[1],self.data_desc) if len(d) != 3: d.append('') @@ -441,7 +436,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file if self.has_keys and keypress_confirm('Check key-to-address validity?'): kg = KeyGenerator() - ag = AddrGenerator(('p2pkh','segwit')[self.al_id.mmtype=='S']) + ag = AddrGenerator(self.al_id.mmtype.gen_method) llen = len(ret) for n,e in enumerate(ret): msg_r('\rVerifying keys %s/%s' % (n+1,llen)) @@ -489,7 +484,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file mmtype = MMGenAddrType(mmtype) except: return do_error(u"'{}': invalid address type in address file. Must be one of: {}".format( - mmtype.upper(),' '.join(MMGenAddrType.mmtypes.values()).upper())) + mmtype.upper(),' '.join([i['name'].upper() for i in MMGenAddrType.mmtypes.values()]))) elif len(ls) == 0: mmtype = MMGenAddrType('L') else: @@ -582,7 +577,7 @@ Record this checksum: it will be used to verify the password file in the future self.set_pw_len(pw_len) if chk_params_only: return self.al_id = AddrListID(seed.sid,MMGenPasswordType('P')) - self.data = self.generate(seed,pw_idxs,compressed=False) + self.data = self.generate(seed,pw_idxs) self.num_addrs = len(self.data) self.fmt_data = '' @@ -632,7 +627,7 @@ Record this checksum: it will be used to verify the password file in the future # we take least significant part return ''.join(baseconv.fromhex(hex_sec,self.pw_fmt,pad=self.pw_len))[-self.pw_len:] - def chk_addr_or_pw(self,pw): + def check_format(self,pw): if not (is_b58_str,is_b32_str)[self.pw_fmt=='b32'](pw): msg('Password is not a valid {} string'.format(self.pw_fmt)) return False diff --git a/mmgen/bitcoin.py b/mmgen/bitcoin.py index e05f6ef7..39fcdad2 100755 --- a/mmgen/bitcoin.py +++ b/mmgen/bitcoin.py @@ -60,8 +60,6 @@ def _b58tonum(b58num): if not i in _b58a: return False return sum(_b58a.index(n) * (58**i) for i,n in enumerate(list(b58num[::-1]))) -from mmgen.globalvars import g - def hash160(hexnum): # take hex, return hex - OP_HASH160 return hashlib_new('ripemd160',sha256(unhexlify(hexnum)).digest()).hexdigest() @@ -69,56 +67,57 @@ def hash256(hexnum): # take hex, return hex - OP_HASH256 return sha256(sha256(unhexlify(hexnum)).digest()).hexdigest() # devdoc/ref_transactions.md: -btc_ver_nums = { - 'p2pkh': (('00','1'),('6f','mn')), - 'p2sh': (('05','3'),('c4','2')) +btc_addr_ver_nums = { + 'p2pkh': { 'mainnet': ('00','1'), 'testnet': ('6f','mn') }, + 'p2sh': { 'mainnet': ('05','3'), 'testnet': ('c4','2') } } -addr_pfxs = { 'mainnet': '13', 'testnet': 'mn2', 'regtest': 'mn2' } -vnum_all = tuple([k for k,v in btc_ver_nums['p2pkh'] + btc_ver_nums['p2sh']]) +btc_addr_pfxs = { 'mainnet': '13', 'testnet': 'mn2', 'regtest': 'mn2' } +btc_uncompressed_wif_pfxs = { 'mainnet':'5','testnet':'9' } +btc_privkey_pfxs = { 'mainnet':'80','testnet':'ef' } -def hexaddr2addr(hexaddr,p2sh=False): - s = vnum_all[g.testnet+(2*p2sh)] + hexaddr.strip() - lzeroes = (len(s) - len(s.lstrip('0'))) / 2 - return ('1' * lzeroes) + _numtob58(int(s+hash256(s)[:8],16)) +from mmgen.globalvars import g -def verify_addr(addr,verbose=False,return_hex=False,return_type=False): - addr = addr.strip() - - for k in ('p2pkh','p2sh'): - for ver_num,ldigit in btc_ver_nums[k]: +def verify_addr(addr,verbose=False,return_dict=False,testnet=None): + testnet = testnet if testnet != None else g.testnet # allow override + for addr_fmt in ('p2pkh','p2sh'): + for net in ('mainnet','testnet'): + ver_num,ldigit = btc_addr_ver_nums[addr_fmt][net] if addr[0] not in ldigit: continue num = _b58tonum(addr) if num == False: break addr_hex = '{:050x}'.format(num) if addr_hex[:2] != ver_num: continue if hash256(addr_hex[:42])[:8] == addr_hex[42:]: - return addr_hex[2:42] if return_hex else k if return_type else True + return {'hex':addr_hex[2:42],'format':addr_fmt,'net':net} if return_dict else True else: - if verbose: Msg("Invalid checksum in address '%s'" % addr) + if verbose: Msg("Invalid checksum in address '{}'".format(addr)) break - if verbose: Msg("Invalid address '%s'" % addr) + if verbose: Msg("Invalid address '{}'".format(addr)) return False -# Compressed address support: +def hexaddr2addr(hexaddr,p2sh=False,testnet=None): + testnet = testnet if testnet != None else g.testnet # allow override + s = btc_addr_ver_nums[('p2pkh','p2sh')[p2sh]][('mainnet','testnet')[testnet]][0] + hexaddr + lzeroes = (len(s) - len(s.lstrip('0'))) / 2 + return ('1' * lzeroes) + _numtob58(int(s+hash256(s)[:8],16)) -def wif_is_compressed(wif): return wif[0] != ('5','9')[g.testnet] - -def wif2hex(wif): - wif = wif.strip() - compressed = wif_is_compressed(wif) +def wif2hex(wif,testnet=None): + testnet = testnet if testnet != None else g.testnet # allow override num = _b58tonum(wif) if num == False: return False key = '{:x}'.format(num) + compressed = wif[0] != btc_uncompressed_wif_pfxs[('mainnet','testnet')[testnet]] klen = (66,68)[bool(compressed)] if compressed and key[66:68] != '01': return False - if (key[:2] == ('80','ef')[g.testnet] and key[klen:] == hash256(key[:klen])[:8]): - return key[2:66] + if (key[:2] == btc_privkey_pfxs[('mainnet','testnet')[testnet]] and key[klen:] == hash256(key[:klen])[:8]): + return {'hex':key[2:66],'compressed':compressed,'testnet':testnet} else: return False -def hex2wif(hexpriv,compressed=False): - s = ('80','ef')[g.testnet] + hexpriv.strip() + ('','01')[bool(compressed)] +def hex2wif(hexpriv,compressed=False,testnet=None): + testnet = testnet if testnet != None else g.testnet # allow override + s = btc_privkey_pfxs[('mainnet','testnet')[testnet]] + hexpriv + ('','01')[bool(compressed)] return _numtob58(int(s+hash256(s)[:8],16)) # devdoc/guide_wallets.md: diff --git a/mmgen/globalvars.py b/mmgen/globalvars.py index 359853b1..f11d08f4 100755 --- a/mmgen/globalvars.py +++ b/mmgen/globalvars.py @@ -86,6 +86,9 @@ class g(object): rpc_password = '' testnet_name = 'testnet3' + bob = False + alice = False + # test suite: bogus_wallet_data = '' traceback_cmd = 'scripts/traceback.py' @@ -110,14 +113,15 @@ class g(object): # User opt sets global var: common_opts = ( 'color','no_license','rpc_host','rpc_port','testnet','rpc_user','rpc_password', - 'bitcoin_data_dir','force_256_color','regtest','coin' + 'bitcoin_data_dir','force_256_color','regtest','coin','bob','alice' ) required_opts = ( 'quiet','verbose','debug','outdir','echo_passphrase','passwd_file','stdout', 'show_hash_presets','label','keep_passphrase','keep_hash_preset','yes', - 'brain_params','b16','usr_randchars','coin' + 'brain_params','b16','usr_randchars','coin','bob','alice' ) incompatible_opts = ( + ('bob','alice'), ('quiet','verbose'), ('label','keep_label'), ('tx_id','info'), diff --git a/mmgen/main_addrgen.py b/mmgen/main_addrgen.py index 76e4cf68..13be495c 100755 --- a/mmgen/main_addrgen.py +++ b/mmgen/main_addrgen.py @@ -80,7 +80,7 @@ opts_data = lambda: { kgs=' '.join(['{}:{}'.format(n,k) for n,k in enumerate(g.key_generators,1)]), kg=g.key_generator, what=gen_what,g=g, - dmat="'{}' or '{}'".format(MAT.dfl_mmtype,MAT.mmtypes[MAT.dfl_mmtype]) + dmat="'{}' or '{}'".format(MAT.dfl_mmtype,MAT.mmtypes[MAT.dfl_mmtype]['name']) ), 'notes': """ @@ -95,6 +95,7 @@ range(s). ADDRESS TYPES: {n_at} + NOTES FOR ALL GENERATOR COMMANDS {pwn} @@ -106,7 +107,7 @@ FMT CODES: """.format( n_secp=note_secp256k1,n_addrkey=note_addrkey,pwn=pw_note,bwn=bw_note, f='\n '.join(SeedSource.format_fmt_codes().splitlines()), - n_at='\n '.join(["'{}', '{}'".format(k,v) for k,v in MAT.mmtypes.items()]), + n_at='\n '.join(["'{}','{:<12} - {}".format(k,v['name']+"'",v['desc']) for k,v in MAT.mmtypes.items()]), o=opts ) } diff --git a/mmgen/main_addrimport.py b/mmgen/main_addrimport.py index 81381347..cfca5915 100755 --- a/mmgen/main_addrimport.py +++ b/mmgen/main_addrimport.py @@ -64,16 +64,7 @@ def import_mmgen_list(infile): return al def import_flat_list(lines): - al = AddrList(addrlist=lines) - from mmgen.bitcoin import verify_addr - qmsg_r('Validating addresses...') - for e in al.data: - if not verify_addr(e.addr,verbose=True): - die(2,'\n%s: invalid address' % e.addr) - if e.addr.addr_fmt == 'p2sh': - fs = "\n'{}':\n Non-{} P2SH addresses may not be imported into the tracking wallet" - rdie(2,fs.format(e.addr,g.proj_name)) - return al + return AddrList(addrlist=lines) if len(cmd_args) == 1: infile = cmd_args[0] diff --git a/mmgen/main_regtest.py b/mmgen/main_regtest.py new file mode 100755 index 00000000..b7b2b72f --- /dev/null +++ b/mmgen/main_regtest.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2017 Philemon +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +mmgen-regtest: Bitcoind regression test mode setup and operations for the MMGen + suite +""" + +from mmgen.common import * +opts_data = lambda: { + 'desc': 'Bitcoind regression test mode setup and operations for the {} suite'.format(g.proj_name), + 'usage': '[opts] ', + 'sets': ( ('yes', True, 'quiet', True), ), + 'options': """ +-h, --help Print this help message +-m, --mixed Create Bob and Alice's wallets with mixed address types +--, --longhelp Print help message for long options (common options) +-q, --quiet Produce quieter output +-v, --verbose Produce more verbose output +""", + 'notes': """ + + + AVAILABLE COMMANDS + + setup - setup up system for regtest operation with MMGen + stop - stop the regtest bitcoind + bob - switch to Bob's wallet, starting daemon if necessary + alice - switch to Alice's wallet, starting daemon if necessary + user - show current user + generate - mine a block + test_daemon - test whether daemon is running + get_balances - get balances of Bob and Alice + """ +} + +cmd_args = opts.init(opts_data) + +if len(cmd_args) != 1: + opts.usage() + +cmds = ('setup','stop','generate','test_daemon','create_data_dir','bob','alice','user', + 'wait_for_daemon','wait_for_exit','get_current_user','get_balances') + +if cmd_args[0] not in cmds: + opts.usage() + +from mmgen.regtest import * + +globals()[cmd_args[0]]() diff --git a/mmgen/main_txsend.py b/mmgen/main_txsend.py index 35a1778a..d6c1bdbc 100755 --- a/mmgen/main_txsend.py +++ b/mmgen/main_txsend.py @@ -48,7 +48,7 @@ if not opt.status: do_license_msg() c = bitcoin_connection() tx = MMGenTX(infile) # sig check performed here -qmsg("Signed transaction file '%s' is valid" % infile) +vmsg("Signed transaction file '%s' is valid" % infile) if not tx.marked_signed(c): die(1,'Transaction is not signed!') diff --git a/mmgen/obj.py b/mmgen/obj.py index 524a8da0..7ab547c5 100755 --- a/mmgen/obj.py +++ b/mmgen/obj.py @@ -23,6 +23,7 @@ obj.py: MMGen native classes import sys from decimal import * from mmgen.color import * +from string import hexdigits,ascii_letters,digits def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent') def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent') @@ -38,7 +39,6 @@ class MMGenObject(object): def pmsg(self): print(self.pformat()) def pdie(self): print(self.pformat()); sys.exit(0) def pformat(self,lvl=0): - from decimal import Decimal scalars = (str,unicode,int,float,Decimal) def do_list(out,e,lvl=0,is_dict=False): out.append('\n') @@ -100,80 +100,7 @@ class MMGenObject(object): class MMGenList(list,MMGenObject): pass class MMGenDict(dict,MMGenObject): pass - -# for attrs that are always present in the data instance -# reassignment and deletion forbidden -class MMGenImmutableAttr(object): # Descriptor - - def __init__(self,name,dtype,typeconv=True): - self.typeconv = typeconv - assert type(dtype) in (str,type) - self.name = name - self.dtype = dtype - - def __get__(self,instance,owner): - return instance.__dict__[self.name] - - # forbid all reassignment - def set_attr_ok(self,instance): - return not hasattr(instance,self.name) - - def __set__(self,instance,value): - if not self.set_attr_ok(instance): - m = "Attribute '{}' of {} instance cannot be reassigned" - raise AttributeError(m.format(self.name,type(instance))) - if self.typeconv: # convert type - instance.__dict__[self.name] = \ - globals()[self.dtype](value) if type(self.dtype) == str else self.dtype(value) - else: # check type - if type(value) != self.dtype: - m = "Attribute '{}' of {} instance must of type {}" - raise TypeError(m.format(self.name,type(instance),self.dtype)) - instance.__dict__[self.name] = value - - def __delete__(self,instance): - m = "Atribute '{}' of {} instance cannot be deleted" - raise AttributeError(m.format(self.name,type(instance))) - -# for attrs that might not be present in the data instance -# reassignment or deletion allowed if specified -class MMGenListItemAttr(MMGenImmutableAttr): - - def __init__(self,name,dtype,typeconv=True,reassign_ok=False,delete_ok=False): - self.reassign_ok = reassign_ok - self.delete_ok = delete_ok - MMGenImmutableAttr.__init__(self,name,dtype,typeconv=typeconv) - - # return None if attribute doesn't exist - def __get__(self,instance,owner): - try: return instance.__dict__[self.name] - except: return None - - def set_attr_ok(self,instance): - return getattr(instance,self.name) == None or self.reassign_ok - - def __delete__(self,instance): - if self.delete_ok: - if self.name in instance.__dict__: - del instance.__dict__[self.name] - else: - MMGenImmutableAttr.__delete__(self,instance) - -class MMGenListItem(MMGenObject): - - def __init__(self,*args,**kwargs): - if args: - raise ValueError, 'Non-keyword args not allowed' - for k in kwargs: - if kwargs[k] != None: - setattr(self,k,kwargs[k]) - - # prevent setting random attributes - def __setattr__(self,name,value): - if name not in type(self).__dict__: - m = "'{}': no such attribute in class {}" - raise AttributeError(m.format(name,type(self))) - return object.__setattr__(self,name,value) +class AddrListList(list,MMGenObject): pass class InitErrors(object): @@ -185,69 +112,13 @@ class InitErrors(object): def init_fail(m,on_fail,silent=False): if silent: m = '' from mmgen.util import die,msg - if on_fail == 'die': die(1,m) + if on_fail == 'die': die(1,m) elif on_fail == 'return': if m: msg(m) return None # TODO: change to False elif on_fail == 'silent': return None # same here elif on_fail == 'raise': raise ValueError,m -class AddrIdx(int,InitErrors): - - max_digits = 7 - - def __new__(cls,num,on_fail='die'): - cls.arg_chk(cls,on_fail) - try: - assert type(num) is not float - me = int.__new__(cls,num) - except: - m = "'%s': value cannot be converted to address index" % num - else: - if len(str(me)) > cls.max_digits: - m = "'%s': too many digits in addr idx" % num - elif me < 1: - m = "'%s': addr idx cannot be less than one" % num - else: - return me - - return cls.init_fail(m,on_fail) - -class AddrIdxList(list,InitErrors,MMGenObject): - - max_len = 1000000 - - def __init__(self,fmt_str=None,idx_list=None,on_fail='die',sep=','): - self.arg_chk(type(self),on_fail) - assert fmt_str or idx_list - if idx_list: - # dies on failure - return list.__init__(self,sorted(set(AddrIdx(i) for i in idx_list))) - elif fmt_str: - desc = fmt_str - ret,fs = [],"'%s': value cannot be converted to address index" - from mmgen.util import msg - for i in (fmt_str.split(sep)): - j = i.split('-') - if len(j) == 1: - idx = AddrIdx(i,on_fail='return') - if not idx: break - ret.append(idx) - elif len(j) == 2: - beg = AddrIdx(j[0],on_fail='return') - if not beg: break - end = AddrIdx(j[1],on_fail='return') - if not beg: break - if end < beg: - msg(fs % "%s-%s (invalid range)" % (beg,end)); break - ret.extend([AddrIdx(x) for x in range(beg,end+1)]) - else: - msg((fs % i) + ' list'); break - else: - return list.__init__(self,sorted(set(ret))) # fell off end of loop - success - - return self.init_fail((fs + ' list') % desc,on_fail) - class Hilite(object): color = 'red' @@ -292,29 +163,142 @@ class Hilite(object): k = color if type(color) is str else cls.color # hack: override color with str value return globals()[k](s) if (color or cls.color_always) else s +# For attrs that are always present in the data instance +# Reassignment and deletion forbidden +class MMGenImmutableAttr(object): # Descriptor + + def __init__(self,name,dtype,typeconv=True): + self.typeconv = typeconv + assert type(dtype) in (str,type) + self.name = name + self.dtype = dtype + + def __get__(self,instance,owner): + return instance.__dict__[self.name] + + # forbid all reassignment + def set_attr_ok(self,instance): + return not hasattr(instance,self.name) + + def __set__(self,instance,value): + if not self.set_attr_ok(instance): + m = "Attribute '{}' of {} instance cannot be reassigned" + raise AttributeError(m.format(self.name,type(instance))) + if self.typeconv: # convert type + instance.__dict__[self.name] = \ + globals()[self.dtype](value,on_fail='raise') if type(self.dtype) == str else self.dtype(value) + else: # check type + if type(value) != self.dtype: + m = "Attribute '{}' of {} instance must of type {}" + raise TypeError(m.format(self.name,type(instance),self.dtype)) + instance.__dict__[self.name] = value + + def __delete__(self,instance): + m = "Atribute '{}' of {} instance cannot be deleted" + raise AttributeError(m.format(self.name,type(instance))) + +# For attrs that might not be present in the data instance +# Reassignment or deletion allowed if specified +class MMGenListItemAttr(MMGenImmutableAttr): # Descriptor + + def __init__(self,name,dtype,typeconv=True,reassign_ok=False,delete_ok=False): + self.reassign_ok = reassign_ok + self.delete_ok = delete_ok + MMGenImmutableAttr.__init__(self,name,dtype,typeconv=typeconv) + + # return None if attribute doesn't exist + def __get__(self,instance,owner): + try: return instance.__dict__[self.name] + except: return None + + def set_attr_ok(self,instance): + return getattr(instance,self.name) == None or self.reassign_ok + + def __delete__(self,instance): + if self.delete_ok: + if self.name in instance.__dict__: + del instance.__dict__[self.name] + else: + MMGenImmutableAttr.__delete__(self,instance) + +class MMGenListItem(MMGenObject): + + def __init__(self,*args,**kwargs): + if args: + raise ValueError, 'Non-keyword args not allowed' + for k in kwargs: + if kwargs[k] != None: + setattr(self,k,kwargs[k]) + + # prevent setting random attributes + def __setattr__(self,name,value): + if name not in type(self).__dict__: + m = "'{}': no such attribute in class {}" + raise AttributeError(m.format(name,type(self))) + return object.__setattr__(self,name,value) + +class AddrIdx(int,InitErrors): + max_digits = 7 + def __new__(cls,num,on_fail='die'): + cls.arg_chk(cls,on_fail) + try: + assert type(num) is not float,'is float' + me = int.__new__(cls,num) + assert len(str(me)) <= cls.max_digits,'is more than {} digits'.format(cls.max_digits) + assert me > 0,'is less than one' + return me + except Exception as e: + m = "{!r}: value cannot be converted to address index ({})" + return cls.init_fail(m.format(num,e[0]),on_fail) + +class AddrIdxList(list,InitErrors,MMGenObject): + max_len = 1000000 + def __init__(self,fmt_str=None,idx_list=None,on_fail='die',sep=','): + self.arg_chk(type(self),on_fail) + try: + if idx_list: + return list.__init__(self,sorted(set(AddrIdx(i,on_fail='raise') for i in idx_list))) + elif fmt_str: + ret = [] + for i in (fmt_str.split(sep)): + j = i.split('-') + if len(j) == 1: + idx = AddrIdx(i,on_fail='raise') + if not idx: break + ret.append(idx) + elif len(j) == 2: + beg = AddrIdx(j[0],on_fail='raise') + if not beg: break + end = AddrIdx(j[1],on_fail='raise') + if not beg: break + if end < beg: break + ret.extend([AddrIdx(x,on_fail='raise') for x in range(beg,end+1)]) + else: break + else: + return list.__init__(self,sorted(set(ret))) # fell off end of loop - success + raise ValueError,"{!r}: invalid range".format(i) + except Exception as e: + m = "{!r}: value cannot be converted to AddrIdxList ({})" + return type(self).init_fail(m.format(idx_list or fmt_str,e[0]),on_fail) + class BTCAmt(Decimal,Hilite,InitErrors): color = 'yellow' max_prec = 8 max_amt = 21000000 - def __new__(cls,num,on_fail='die'): + if type(num) == cls: return num cls.arg_chk(cls,on_fail) try: + assert type(num) is not float,'number is floating-point' + assert type(num) is not long,'number is a long integer' me = Decimal.__new__(cls,str(num)) - except: - m = "'%s': value cannot be converted to decimal" % num - else: - if me.normalize().as_tuple()[-1] < -cls.max_prec: - from mmgen.globalvars import g - m = "'{}': too many decimal places in {} amount".format(num,g.coin) - elif me > cls.max_amt: - from mmgen.globalvars import g - m = "'{}': {} amount too large (>{})".format(num,g.coin,cls.max_amt) -# elif me.as_tuple()[0]: -# m = "'%s': BTC amount cannot be negative" % num - else: - return me - return cls.init_fail(m,on_fail) + assert me.normalize().as_tuple()[-1] >= -cls.max_prec,'too many decimal places in coin amount' + assert me <= cls.max_amt,'coin amount too large (>{})'.format(cls.max_amt) + assert me >= 0,'coin amount cannot be negative' + return me + except Exception as e: + m = "{!r}: value cannot be converted to BTCAmt ({})" + return cls.init_fail(m.format(num,e[0]),on_fail) @classmethod def fmtc(cls): @@ -367,16 +351,21 @@ class BTCAddr(str,Hilite,InitErrors,MMGenObject): color = 'cyan' width = 35 # max len of testnet p2sh addr def __new__(cls,s,on_fail='die'): + if type(s) == cls: return s cls.arg_chk(cls,on_fail) - m = "'%s': value is not a Bitcoin address" % s - me = str.__new__(cls,s) - from mmgen.bitcoin import verify_addr,addr_pfxs - if type(s) in (str,unicode,BTCAddr): - me.addr_fmt = verify_addr(s,return_type=True) - me.testnet = s[0] in addr_pfxs['testnet'] - if me.addr_fmt: - return me - return cls.init_fail(m,on_fail) + try: + assert set(s) <= set(ascii_letters+digits),'contains non-ascii characters' + me = str.__new__(cls,s) + from mmgen.bitcoin import verify_addr + va = verify_addr(s,return_dict=True) + assert va,'failed verification' + me.addr_fmt = va['format'] + me.hex = va['hex'] + me.testnet = va['net'] == 'testnet' + return me + except Exception as e: + m = "{!r}: value cannot be converted to Bitcoin address ({})" + return cls.init_fail(m.format(s,e[0]),on_fail) @classmethod def fmtc(cls,s,**kwargs): @@ -390,13 +379,13 @@ class BTCAddr(str,Hilite,InitErrors,MMGenObject): def is_for_current_chain(self): from mmgen.globalvars import g - assert g.chain, 'global chain variable unset' - from bitcoin import addr_pfxs - return self[0] in addr_pfxs[g.chain] + assert g.chain,'global chain variable unset' + from bitcoin import btc_addr_pfxs + return self[0] in btc_addr_pfxs[g.chain] def is_mainnet(self): - from bitcoin import addr_pfxs - return self[0] in addr_pfxs['mainnet'] + from bitcoin import btc_addr_pfxs + return self[0] in btc_addr_pfxs['mainnet'] def is_in_tracking_wallet(self): from mmgen.rpc import bitcoin_connection @@ -408,122 +397,118 @@ class SeedID(str,Hilite,InitErrors): width = 8 trunc_ok = False def __new__(cls,seed=None,sid=None,on_fail='die'): + if type(sid) == cls: return sid cls.arg_chk(cls,on_fail) - assert seed or sid - if seed: - from mmgen.seed import Seed - from mmgen.util import make_chksum_8 - if type(seed) == Seed: + try: + if seed: + from mmgen.seed import Seed + assert type(seed) == Seed,'not a Seed instance' + from mmgen.util import make_chksum_8 return str.__new__(cls,make_chksum_8(seed.get_data())) - elif sid: - sid = str(sid) - from string import hexdigits - if len(sid) == cls.width and set(sid) <= set(hexdigits.upper()): + elif sid: + assert set(sid) <= set(hexdigits.upper()),'not uppercase hex digits' + assert len(sid) == cls.width,'not {} characters wide'.format(cls.width) return str.__new__(cls,sid) - - m = "'%s': value cannot be converted to SeedID" % str(seed or sid) - return cls.init_fail(m,on_fail) + raise ValueError,'no arguments provided' + except Exception as e: + m = "{!r}: value cannot be converted to SeedID ({})" + return cls.init_fail(m.format(seed or sid,e[0]),on_fail) class MMGenID(str,Hilite,InitErrors,MMGenObject): - color = 'orange' width = 0 trunc_ok = False - def __new__(cls,s,on_fail='die'): cls.arg_chk(cls,on_fail) - s = str(s) try: - ss = s.split(':') - assert len(ss) in (2,3) - sid = SeedID(sid=ss[0],on_fail='silent') - assert sid - idx = AddrIdx(ss[-1],on_fail='silent') - assert idx - t = MMGenAddrType((MMGenAddrType.dfl_mmtype,ss[1])[len(ss) != 2],on_fail='silent') - assert t - me = str.__new__(cls,'{}:{}:{}'.format(sid,t,idx)) - me.sid = sid + ss = str(s).split(':') + assert len(ss) in (2,3),'not 2 or 3 colon-separated items' + t = MMGenAddrType((ss[1],MMGenAddrType.dfl_mmtype)[len(ss)==2],on_fail='raise') + me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1])) + me.sid = SeedID(sid=ss[0],on_fail='raise') + me.idx = AddrIdx(ss[-1],on_fail='raise') me.mmtype = t - me.idx = idx - me.al_id = AddrListID(sid,me.mmtype) # key with colon! - assert me.al_id - me.sort_key = '{}:{}:{:0{w}}'.format(sid,t,idx,w=idx.max_digits) + me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done + me.sort_key = '{}:{}:{:0{w}}'.format(me.sid,me.mmtype,me.idx,w=me.idx.max_digits) return me - except: - m = "'%s': value cannot be converted to MMGenID" % s - return cls.init_fail(m,on_fail) + except Exception as e: + m = "{}\n{!r}: value cannot be converted to MMGenID" + return cls.init_fail(m.format(e[0],s),on_fail) class TwMMGenID(str,Hilite,InitErrors,MMGenObject): - color = 'orange' width = 0 trunc_ok = False - def __new__(cls,s,on_fail='die'): + if type(s) == cls: return s cls.arg_chk(cls,on_fail) - obj,sort_key = None,None + ret = None try: - obj = MMGenID(s,on_fail='silent') - sort_key,t = obj.sort_key,'mmgen' - except: + ret = MMGenID(s,on_fail='raise') + sort_key,idtype = ret.sort_key,'mmgen' + except Exception as e: try: - assert len(s) > 4 and s[:4] == 'btc:' - obj,sort_key,t = str(s),'z_'+s,'non-mmgen' - except: - pass + assert s[:4] == 'btc:',"not a string beginning with the prefix 'btc:'" + assert set(s[4:]) <= set(ascii_letters+digits),'contains non-ascii characters' + assert len(s) > 4,'not more that four characters long' + ret,sort_key,idtype = str(s),'z_'+s,'non-mmgen' + except Exception as f: + m = "{}\nValue is {}\n{!r}: value cannot be converted to TwMMGenID" + return cls.init_fail(m.format(e[0],f[0],s),on_fail) - if obj and sort_key: - me = str.__new__(cls,obj) - me.obj = obj - me.sort_key = sort_key - me.type = t - return me - - m = "'{}': value cannot be converted to {}".format(s,cls.__name__) - return cls.init_fail(m,on_fail) + me = str.__new__(cls,ret) + me.obj = ret + me.sort_key = sort_key + me.type = idtype + return me # contains TwMMGenID,TwComment. Not for display class TwLabel(str,InitErrors,MMGenObject): - def __new__(cls,s,on_fail='die'): + if type(s) == cls: return s cls.arg_chk(cls,on_fail) try: ss = s.split(None,1) - me = str.__new__(cls,s) - me.mmid = TwMMGenID(ss[0],on_fail='silent') - assert me.mmid - me.comment = TwComment(ss[1] if len(ss) == 2 else '',on_fail='silent') - assert me.comment != None + mmid = TwMMGenID(ss[0],on_fail='raise') + comment = TwComment(ss[1] if len(ss) == 2 else '',on_fail='raise') + me = str.__new__(cls,'{}{}'.format(mmid,' {}'.format(comment) if comment else '')) + me.mmid = mmid + me.comment = comment return me - except: - m = "'{}': value cannot be converted to {}".format(s,cls.__name__) - return cls.init_fail(m,on_fail) + except Exception as e: + m = u"{}\n{!r}: value cannot be converted to TwLabel" + return cls.init_fail(m.format(e[0],s),on_fail) class HexStr(str,Hilite,InitErrors): color = 'red' trunc_ok = False def __new__(cls,s,on_fail='die',case='lower'): + if type(s) == cls: return s assert case in ('upper','lower') cls.arg_chk(cls,on_fail) - from string import hexdigits - if set(s) <= set(getattr(hexdigits,case)()) and not len(s) % 2: + try: + assert type(s) in (str,unicode,bytes),'not a string' + assert set(s) <= set(getattr(hexdigits,case)()),'not {}case hexadecimal symbols'.format(case) + assert not len(s) % 2,'odd-length string' return str.__new__(cls,s) - m = "'{}': value cannot be converted to {}".format(s,cls.__name__) - return cls.init_fail(m,on_fail) + except Exception as e: + m = "{!r}: value cannot be converted to {} (value is {})" + return cls.init_fail(m.format(s,cls.__name__,e[0]),on_fail) -class MMGenTxID(str,Hilite,InitErrors): +class MMGenTxID(HexStr,Hilite,InitErrors): color = 'red' width = 6 trunc_ok = False hexcase = 'upper' def __new__(cls,s,on_fail='die'): cls.arg_chk(cls,on_fail) - from string import hexdigits - if len(s) == cls.width and set(s) <= set(getattr(hexdigits,cls.hexcase)()): - return str.__new__(cls,s) - m = "'{}': value cannot be converted to {}".format(s,cls.__name__) - return cls.init_fail(m,on_fail) + try: + ret = HexStr.__new__(cls,s,case=cls.hexcase,on_fail='raise') + assert len(s) == cls.width,'Value is not {} characters wide'.format(cls.width) + return ret + except Exception as e: + m = "{}\n{!r}: value cannot be converted to {}" + return cls.init_fail(m.format(e[0],s,cls.__name__),on_fail) class BitcoinTxID(MMGenTxID): color = 'purple' @@ -533,34 +518,29 @@ class BitcoinTxID(MMGenTxID): class WifKey(str,Hilite,InitErrors): width = 53 color = 'blue' - desc = 'WIF key' - def __new__(cls,s,on_fail='die',errmsg=None): + def __new__(cls,s,on_fail='die',testnet=None): # fall back to g.testnet + if type(s) == cls: return s cls.arg_chk(cls,on_fail) - from mmgen.bitcoin import wif2hex - if wif2hex(s): - me = str.__new__(cls,s) - return me - m = errmsg or "'{}': invalid value for {}".format(s,cls.desc) - return cls.init_fail(m,on_fail) + try: + assert set(s) <= set(ascii_letters+digits),'not an ascii string' + from mmgen.bitcoin import wif2hex + if wif2hex(s,testnet=testnet): + return str.__new__(cls,s) + raise ValueError,'failed verification' + except Exception as e: + m = '{!r}: invalid value for WIF key ({})'.format(s,e[0]) + return cls.init_fail(m,on_fail) -class HexStr(str,Hilite,InitErrors): - color = 'red' - trunc_ok = False - def __new__(cls,s,on_fail='die',case='lower'): - assert case in ('upper','lower') - cls.arg_chk(cls,on_fail) - from string import hexdigits - if set(s) <= set(getattr(hexdigits,case)()) and not len(s) % 2: - return str.__new__(cls,s) - m = "'{}': value cannot be converted to {}".format(s,cls.__name__) - return cls.init_fail(m,on_fail) - -class PubKey(HexStr,MMGenObject): +class PubKey(HexStr,MMGenObject): # TODO: add some real checks def __new__(cls,s,compressed,on_fail='die'): - assert type(compressed) == bool - me = HexStr.__new__(cls,s,case='lower') - me.compressed = compressed - return me + try: + assert type(compressed) == bool,"'compressed' must be of type bool" + me = HexStr.__new__(cls,s,case='lower',on_fail='raise') + me.compressed = compressed + return me + except Exception as e: + m = '{!r}: invalid value for pubkey ({})'.format(s,e[0]) + return cls.init_fail(m,on_fail) class PrivKey(str,Hilite,InitErrors,MMGenObject): @@ -571,104 +551,71 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject): compressed = MMGenImmutableAttr('compressed',bool,typeconv=False) wif = MMGenImmutableAttr('wif',WifKey,typeconv=False) - def __new__(*args,**kwargs): # initialize with (priv_bin,compressed), WIF or self - cls = args[0] - assert set(kwargs) <= set(['on_fail','wif']) - on_fail = kwargs['on_fail'] if 'on_fail' in kwargs else 'die' + # initialize with (priv_bin,compressed), WIF or self + def __new__(cls,s=None,compressed=None,wif=None,on_fail='die',testnet=None): # default to g.testnet + + if type(s) == cls: return s + assert wif or (s and type(compressed) == bool),'Incorrect args for PrivKey()' cls.arg_chk(cls,on_fail) - if len(args) == 2: - assert type(args[1]) == cls - return args[1] - - if 'wif' in kwargs: - assert len(args) == 1 + if wif: try: - from mmgen.bitcoin import wif2hex,wif_is_compressed # TODO: move these here - wif = WifKey(kwargs['wif']) - me = str.__new__(cls,wif2hex(wif)) - me.compressed = wif_is_compressed(wif) - me.wif = wif + assert set(wif) <= set(ascii_letters+digits),'not an ascii string' + from mmgen.bitcoin import wif2hex + w2h = wif2hex(wif,testnet=testnet) + assert w2h,"wif2hex() failed for wif key {!r}".format(wif) + me = str.__new__(cls,w2h['hex']) + me.compressed = w2h['compressed'] + me.testnet = w2h['testnet'] + me.wif = str.__new__(WifKey,wif) # check has been done return me - except: - fs = "Value '{}' cannot be converted to WIF key" - errmsg = fs.format(kwargs['wif']) - return cls.init_fail(errmsg,on_fail) - - cls,s,compressed = args + except Exception as e: + fs = "Value {!r} cannot be converted to WIF key ({})" + return cls.init_fail(fs.format(wif,e[0]),on_fail) try: from binascii import hexlify - assert len(s) == cls.width / 2 + assert len(s) == cls.width / 2,'Key length must be {}'.format(cls.width/2) me = str.__new__(cls,hexlify(s)) me.compressed = compressed - me.wif = me.towif() + me.wif = me.towif(testnet=testnet) +# me.testnet = testnet # leave uninitialized for now return me - except: - fs = "Key={}\nCompressed={}\nValue pair cannot be converted to {}" - errmsg = fs.format(repr(s),compressed,cls.__name__) - return cls.init_fail(errmsg,on_fail) + except Exception as e: + fs = "Key={}\nCompressed={}\nValue pair cannot be converted to PrivKey ({})" + return cls.init_fail(fs.format(repr(s),compressed,e[0]),on_fail) - def towif(self): + def towif(self,testnet=None): from mmgen.bitcoin import hex2wif - return WifKey(hex2wif(self,compressed=self.compressed)) + return WifKey(hex2wif(self,compressed=self.compressed),on_fail='raise',testnet=testnet) -class MMGenAddrType(str,Hilite,InitErrors): - width = 1 - trunc_ok = False - color = 'blue' - mmtypes = { - # TODO 'L' is ambiguous: For user, it means MMGen legacy uncompressed address. - # For generator functions, 'L' means any p2pkh address, and 'S' any ps2h address - 'L': 'legacy', - 'S': 'segwit', -# 'l': 'litecoin', -# 'e': 'ethereum', -# 'E': 'ethereum_classic', -# 'm': 'monero', -# 'z': 'zcash', - } - dfl_mmtype = 'L' - def __new__(cls,s,on_fail='die',errmsg=None): - cls.arg_chk(cls,on_fail) - for k,v in cls.mmtypes.items(): - if s in (k,v): - if s == v: s = k - me = str.__new__(cls,s) - me.name = cls.mmtypes[s] - return me - m = errmsg or "'{}': invalid value for {}".format(s,cls.__name__) - return cls.init_fail(m,on_fail) - -class MMGenPasswordType(MMGenAddrType): - mmtypes = { 'P': 'password' } - -class AddrListID(str,Hilite,InitErrors): +class AddrListID(str,Hilite,InitErrors,MMGenObject): width = 10 trunc_ok = False color = 'yellow' def __new__(cls,sid,mmtype,on_fail='die'): cls.arg_chk(cls,on_fail) - m = "'{}': not a SeedID. Cannot create {}".format(sid,cls.__name__) - if type(sid) == SeedID: - m = "'{}': not an MMGenAddrType object. Cannot create {}".format(mmtype,cls.__name__) - if type(mmtype) in (MMGenAddrType,MMGenPasswordType): - me = str.__new__(cls,sid+':'+mmtype) # colon in key is OK - me.sid = sid - me.mmtype = mmtype - return me - return cls.init_fail(m,on_fail) + try: + assert type(sid) == SeedID,"{!r} not a SeedID instance".format(sid) + t = MMGenAddrType,MMGenPasswordType + assert type(mmtype) in t,"{!r} not an instance of {}".format(mmtype,','.join([i.__name__ for i in t])) + me = str.__new__(cls,sid+':'+mmtype) + me.sid = sid + me.mmtype = mmtype + return me + except Exception as e: + m = "Cannot create AddrListID ({})".format(e[0]) + return cls.init_fail(m,on_fail) class MMGenLabel(unicode,Hilite,InitErrors): - color = 'pink' allowed = [] forbidden = [] max_len = 0 min_len = 0 desc = 'label' - def __new__(cls,s,on_fail='die',msg=None): + if type(s) == cls: return s cls.arg_chk(cls,on_fail) for k in cls.forbidden,cls.allowed: assert type(k) == list @@ -677,23 +624,17 @@ class MMGenLabel(unicode,Hilite,InitErrors): s = s.strip() if type(s) != unicode: s = s.decode('utf8') - except: - m = "'%s': value is not a valid UTF-8 string" % s - else: from mmgen.util import capfirst - if len(s) > cls.max_len: - m = u"'{}': {} too long (>{} symbols)".format(s,capfirst(cls.desc),cls.max_len) - elif len(s) < cls.min_len: - m = u"'{}': {} too short (<{} symbols)".format(s,capfirst(cls.desc),cls.min_len) - elif cls.allowed and not set(list(s)).issubset(set(cls.allowed)): - m = u"{} '{}' contains non-allowed symbols: {}".format(capfirst(cls.desc),s, - ' '.join(set(list(s)) - set(cls.allowed))) - elif cls.forbidden and any(ch in s for ch in cls.forbidden): - m = u"{} '{}' contains one of these forbidden symbols: '{}'".format(capfirst(cls.desc),s, - "', '".join(cls.forbidden)) - else: - return unicode.__new__(cls,s) - return cls.init_fail((msg+'\n' if msg else '') + m,on_fail) + assert len(s) <= cls.max_len, 'too long (>{} symbols)'.format(cls.max_len) + assert len(s) >= cls.min_len, 'too short (<{} symbols)'.format(cls.min_len) + assert not cls.allowed or set(list(s)).issubset(set(cls.allowed)),\ + u'contains non-allowed symbols: {}'.format(' '.join(set(list(s)) - set(cls.allowed))) + assert not cls.forbidden or not any(ch in s for ch in cls.forbidden),\ + u"contains one of these forbidden symbols: '{}'".format("', '".join(cls.forbidden)) + return unicode.__new__(cls,s) + except Exception as e: + m = u"{!r}: value cannot be converted to {} ({})" + return cls.init_fail(m.format(s,cls.__name__,e),on_fail) class MMGenWalletLabel(MMGenLabel): max_len = 48 @@ -715,4 +656,41 @@ class MMGenPWIDString(MMGenLabel): desc = 'password ID string' forbidden = list(u' :/\\') -class AddrListList(list,MMGenObject): pass +class MMGenAddrType(str,Hilite,InitErrors,MMGenObject): + width = 1 + trunc_ok = False + color = 'blue' + mmtypes = { # since 'name' is used to cook the seed, it must never change! +'L': {'name':'legacy','comp':False,'gen':'p2pkh', 'fmt':'p2pkh','desc':'Legacy uncompressed Bitcoin address'}, +'S': {'name':'segwit','comp':True, 'gen':'segwit','fmt':'p2sh', 'desc':'Bitcoin Segwit P2SH-P2WPK address' }, +'C': {'name':'compressed','comp':True,'gen':'p2pkh','fmt':'p2pkh','desc':'Compressed Bitcoin P2PKH address'} +# 'l': 'litecoin', +# 'e': 'ethereum', +# 'E': 'ethereum_classic', +# 'm': 'monero', +# 'z': 'zcash', + } + dfl_mmtype = 'L' + def __new__(cls,s,on_fail='die',errmsg=None): + if type(s) == cls: return s + cls.arg_chk(cls,on_fail) + try: + for k,v in cls.mmtypes.items(): + if s in (k,v['name']): + if s == v['name']: s = k + me = str.__new__(cls,s) + me.name = v['name'] + me.compressed = v['comp'] + me.gen_method = v['gen'] + me.desc = v['desc'] + me.addr_fmt = v['fmt'] + return me + raise ValueError,'not found' + except Exception as e: + m = errmsg or '{!r}: invalid value for {} ({})'.format(s,cls.__name__,e[0]) + return cls.init_fail(m,on_fail) + +class MMGenPasswordType(MMGenAddrType): + mmtypes = { + 'P': {'name':'password','comp':False,'gen':None,'fmt':None,'desc':'Password generated from MMGen seed'} + } diff --git a/mmgen/opts.py b/mmgen/opts.py index aca8fd74..7b1c0223 100755 --- a/mmgen/opts.py +++ b/mmgen/opts.py @@ -61,6 +61,8 @@ common_opts_data = """ --, --testnet=0|1 Disable or enable testnet --, --skip-cfg-file Skip reading the configuration file --, --version Print version information and exit +--, --bob Switch to user "Bob" in MMGen regtest setup +--, --alice Switch to user "Alice" in MMGen regtest setup """.format( pnm=g.proj_name, cu_dfl=g.coin, @@ -250,6 +252,16 @@ def init(opts_f,add_opts=[],opt_filter=None): for k in ('prog_name','desc','usage','options','notes'): if k in opts_data: del opts_data[k] + if g.bob or g.alice: + import regtest as rt + rt.user(('alice','bob')[g.bob],quiet=True) + g.testnet = True + g.rpc_host = 'localhost' + g.rpc_port = rt.rpc_port + g.rpc_user = rt.rpc_user + g.rpc_password = rt.rpc_password + g.data_dir = os.path.join(g.home_dir,'.'+g.proj_name.lower(),'regtest') + if g.debug: opt_postproc_debug() return args diff --git a/mmgen/tool.py b/mmgen/tool.py index ad65f13c..382d6374 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -27,6 +27,7 @@ import mmgen.bitcoin as mmb from mmgen.common import * from mmgen.crypto import * from mmgen.tx import * +from mmgen.addr import * pnm = g.proj_name @@ -78,7 +79,7 @@ cmd_data = OrderedDict([ ('Listaddress',['<{} address> [str]'.format(pnm),'minconf [int=1]','pager [bool=False]','showempty [bool=True]''showbtcaddr [bool=True]']), ('Listaddresses',["addrs [str='']",'minconf [int=1]','showempty [bool=False]','pager [bool=False]','showbtcaddrs [bool=False]']), - ('Getbalance', ['minconf [int=1]']), + ('Getbalance', ['minconf [int=1]','quiet [bool=False]']), ('Txview', ['<{} TX file(s)> [str]'.format(pnm),'pager [bool=False]','terse [bool=False]',"sort [str='mtime'] (options: 'ctime','atime')",'MARGS']), ('Twview', ["sort [str='age']",'reverse [bool=False]','show_days [bool=True]','show_mmid [bool=True]','minconf [int=1]','wide [bool=False]','pager [bool=False]']), @@ -308,6 +309,8 @@ def print_convert_results(indata,enc,dec,dtype): if error: die(3,"Error! Recoded data doesn't match input!") +kg = KeyGenerator() + def Hexdump(infile, cols=8, line_nums=True): Msg(pretty_hexdump( get_data_from_file(infile,dash=True,silent=True,binary=True), @@ -330,41 +333,37 @@ def Randhex(nbytes='32'): Msg(ba.hexlify(get_random(int(nbytes)))) def Randwif(compressed=False): - r_hex = ba.hexlify(get_random(32)) - enc = mmb.hex2wif(r_hex,compressed) - dec = wif2hex(enc) - print_convert_results(r_hex,enc,dec,'hex') + Msg(PrivKey(get_random(32),compressed).wif) def Randpair(compressed=False,segwit=False): if segwit: compressed = True - r_hex = ba.hexlify(get_random(32)) - wif = mmb.hex2wif(r_hex,compressed) - addr = mmb.privnum2addr(int(r_hex,16),compressed,segwit=segwit) - Vmsg('Key (hex): %s' % r_hex) - Vmsg_r('Key (WIF): '); Msg(wif) + ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)]) + privhex = PrivKey(get_random(32),compressed) + addr = ag.to_addr(kg.to_pubhex(privhex)) + Vmsg('Key (hex): %s' % privhex) + Vmsg_r('Key (WIF): '); Msg(privhex.wif) Vmsg_r('Addr: '); Msg(addr) def Wif2addr(wif,segwit=False): - compressed = mmb.wif_is_compressed(wif) - if segwit and not compressed: - die(1,'Segwit address cannot be generated from uncompressed WIF') - privhex = wif2hex(wif) - addr = mmb.privnum2addr(int(privhex,16),compressed,segwit=segwit) + privhex = PrivKey(wif=wif) + if segwit and not privhex.compressed: + die(2,'Segwit addresses must use compressed public keys') + ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)]) + addr = ag.to_addr(kg.to_pubhex(privhex)) Vmsg_r('Addr: '); Msg(addr) def Wif2segwit_pair(wif): - if not mmb.wif_is_compressed(wif): + privhex = PrivKey(wif=wif) + if not privhex.compressed: die(1,'Segwit address cannot be generated from uncompressed WIF') - privhex = wif2hex(wif) - pubhex = mmb.privnum2pubhex(int(privhex,16),compressed=True) - rs = mmb.pubhex2redeem_script(pubhex) - addr = mmb.hexaddr2addr(mmb.hash160(rs),p2sh=True) - addr_chk = mmb.privnum2addr(int(privhex,16),compressed=True,segwit=True) - assert addr == addr_chk + ag = AddrGenerator('segwit') + pubhex = kg.to_pubhex(privhex) + addr = ag.to_addr(pubhex) + rs = ag.to_segwit_redeem_script(pubhex) Msg('{}\n{}'.format(rs,addr)) def Hexaddr2addr(hexaddr): Msg(mmb.hexaddr2addr(hexaddr)) -def Addr2hexaddr(addr): Msg(mmb.verify_addr(addr,return_hex=True)) +def Addr2hexaddr(addr): Msg(mmb.verify_addr(addr,return_dict=True)['hex']) def Hash160(pubkeyhex): Msg(mmb.hash160(pubkeyhex)) def Pubhex2addr(pubkeyhex,p2sh=False): Msg(mmb.hexaddr2addr(mmb.hash160(pubkeyhex),p2sh=p2sh)) def Wif2hex(wif): Msg(wif2hex(wif)) @@ -379,13 +378,14 @@ def Privhex2pubhex(privhex,compressed=False): # new def Pubhex2redeem_script(pubhex): # new Msg(mmb.pubhex2redeem_script(pubhex)) def Wif2redeem_script(wif): # new - if not mmb.wif_is_compressed(wif): - die(1,'Witness redeem script cannot be generated from uncompressed WIF') - pubhex = mmb.privnum2pubhex(int(wif2hex(wif),16),compressed=True) - Msg(mmb.pubhex2redeem_script(pubhex)) + privhex = PrivKey(wif=wif) + if not privhex.compressed: + die(1,'Segwit redeem script cannot be generated from uncompressed WIF') + ag = AddrGenerator('segwit') + Msg(ag.to_segwit_redeem_script(kg.to_pubhex(privhex))) def wif2hex(wif): # wrapper - ret = mmb.wif2hex(wif) + ret = PrivKey(wif=wif) return ret or die(1,'{}: Invalid WIF'.format(wif)) wordlists = 'electrum','tirosh' @@ -452,7 +452,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa """ m_prev = None - for m in sorted([l.mmid for l in accts]): + for m in sorted(b.mmid for b in [a for a in accts if a]): if m == m_prev: msg('Duplicate MMGen ID ({}) discovered in tracking wallet!\n'.format(m)) bad_accts = MMGenList([l for l in accts if l.mmid == m]) @@ -464,6 +464,18 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa die(3,red('Exiting on error')) m_prev = m + def check_addr_array_lens(acct_pairs): + err = False + for label,addrs in acct_pairs: + if not label: continue + if len(addrs) != 1: + err = True + if len(addrs) == 0: + msg("Label '{}': has no associated address!".format(label)) + else: + msg("'{}': more than one {} address in account!".format(addrs,g.coin)) + if err: rdie(3,'Tracking wallet is corrupted!') + usr_addr_list = [] if addrs: a = addrs.rsplit(':',1) @@ -494,20 +506,22 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa # We use listaccounts only for empty addresses, as it shows false positive balances if showempty: - # args: minconf,watchonly - accts = MMGenList([b for b in [TwLabel(a,on_fail='silent') for a in c.listaccounts(0,True)] if b]) - check_dup_mmid(accts) - acct_addrs = c.getaddressesbyaccount([[a] for a in accts],batch=True) - assert len(accts) == len(acct_addrs), 'listaccounts() and getaddressesbyaccount() not of same length' - for a in acct_addrs: - if len(a) != 1: - die(2,"'{}': more than one {} address in account!".format(a,g.coin)) - for label,addr in zip(accts,[b[0] for b in acct_addrs]): + # for compatibility with old mmids, must use raw RPC rather than native data for matching + # args: minconf,watchonly, MUST use keys() so we get list, not dict + acct_list = c.listaccounts(0,True).keys() # raw list, no 'L' + acct_labels = MMGenList([TwLabel(a,on_fail='silent') for a in acct_list]) + check_dup_mmid(acct_labels) + acct_addrs = c.getaddressesbyaccount([[a] for a in acct_list],batch=True) # use raw list here + assert len(acct_list) == len(acct_addrs), 'listaccounts() and getaddressesbyaccount() not equal in length' + addr_pairs = zip(acct_labels,acct_addrs) + check_addr_array_lens(addr_pairs) + for label,addr_arr in addr_pairs: + if not label: continue if usr_addr_list and (label.mmid not in usr_addr_list): continue if label.mmid not in addrs: addrs[label.mmid] = { 'amt':BTCAmt('0'), 'lbl':label, 'addr':'' } if showbtcaddrs: - addrs[label.mmid]['addr'] = BTCAddr(addr) + addrs[label.mmid]['addr'] = BTCAddr(addr_arr[0]) if not addrs: die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty]) @@ -547,7 +561,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa o = '\n'.join(out) return do_pager(o) if pager else Msg(o) -def Getbalance(minconf=1): +def Getbalance(minconf=1,quiet=False): accts = {} for d in bitcoin_connection().listunspent(0): ma = split2(d['account'] if 'account' in d else '')[0] # include coinbase outputs if spendable @@ -562,12 +576,16 @@ def Getbalance(minconf=1): for j in ([],[0])[confs==0] + [i]: accts[key][j] += d['amount'] - fs = '{:13} {} {} {}' - mc,lbl = str(minconf),'confirms' - Msg(fs.format('Wallet', - *[s.ljust(16) for s in ' Unconfirmed',' <%s %s'%(mc,lbl),' >=%s %s'%(mc,lbl)])) - for key in sorted(accts.keys()): - Msg(fs.format(key+':', *[a.fmt(color=True,suf=' '+g.coin) for a in accts[key]])) + if quiet: + Msg('{}'.format(accts['TOTAL'][2])) + else: + fs = '{:13} {} {} {}' + mc,lbl = str(minconf),'confirms' + Msg(fs.format('Wallet', + *[s.ljust(16) for s in ' Unconfirmed',' <%s %s'%(mc,lbl),' >=%s %s'%(mc,lbl)])) + for key in sorted(accts.keys()): + Msg(fs.format(key+':', *[a.fmt(color=True,suf=' '+g.coin) for a in accts[key]])) + if 'SPENDABLE' in accts: Msg(red('Warning: this wallet contains PRIVATE KEYS for the SPENDABLE balance!')) diff --git a/mmgen/tx.py b/mmgen/tx.py index 48296add..8d290cc2 100755 --- a/mmgen/tx.py +++ b/mmgen/tx.py @@ -457,6 +457,8 @@ class MMGenTX(MMGenObject): msg_r('Signing transaction{}...'.format(tx_num_str)) ht = ('ALL','ALL|FORKID')[g.coin=='BCH'] # sighashtype defaults to 'ALL' wifs = [d.sec.wif for d in keys] +# keys.pmsg() +# pmsg(wifs) ret = c.signrawtransaction(self.hex,sig_data,wifs,ht,on_fail='return') from mmgen.rpc import rpc_error,rpc_errmsg @@ -525,7 +527,10 @@ class MMGenTX(MMGenObject): def is_in_wallet(self,c): ret = c.gettransaction(self.btc_txid,on_fail='silent') - return 'confirmations' in ret and ret['confirmations'] > 0 + if 'confirmations' in ret and ret['confirmations'] > 0: + return ret['confirmations'] + else: + return False def is_replaced(self,c): if self.is_in_mempool(c): return False @@ -541,7 +546,8 @@ class MMGenTX(MMGenObject): if self.is_in_mempool(c): msg(('Warning: transaction is in mempool!','Transaction is in mempool')[status]) elif self.is_in_wallet(c): - die(1,'Transaction has been confirmed{}'.format('' if status else '!')) + confs = self.is_in_wallet(c) + die(0,'Transaction has {} confirmation{}'.format(confs,suf(confs,'s'))) elif self.is_in_utxos(c): die(2,red('ERROR: transaction is in the blockchain (but not in the tracking wallet)!')) ret = self.is_replaced(c) # 1: replacement in mempool, 2: replacement confirmed diff --git a/mmgen/txsign.py b/mmgen/txsign.py index be7327cc..4e4caf4f 100755 --- a/mmgen/txsign.py +++ b/mmgen/txsign.py @@ -138,8 +138,12 @@ def get_seed_files(opt,args): # favor unencrypted seed sources first, as they don't require passwords u,e = SeedSourceUnenc,SeedSourceEnc ret = _pop_and_return(args,u.get_extensions()) - from mmgen.filename import find_file_in_dir - wf = find_file_in_dir(Wallet,g.data_dir) # Make this the first encrypted ss in the list + from mmgen.filename import find_file_in_dir,find_files_in_dir + if g.bob or g.alice: + import regtest as rt + wf = rt.mmwords[('alice','bob')[g.bob]] + else: + wf = find_file_in_dir(Wallet,g.data_dir) # Make this the first encrypted ss in the list if wf: ret.append(wf) ret += _pop_and_return(args,e.get_extensions()) if not (ret or opt.mmgen_keys_from_file or opt.keys_from_file): # or opt.use_wallet_dat diff --git a/mmgen/util.py b/mmgen/util.py index 49e6f9d9..279146d0 100755 --- a/mmgen/util.py +++ b/mmgen/util.py @@ -31,6 +31,10 @@ def msg_r(s): sys.stderr.write(s.encode('utf8')) def Msg(s): sys.stdout.write(s.encode('utf8') + '\n') def Msg_r(s): sys.stdout.write(s.encode('utf8')) def msgred(s): msg(red(s)) +def ymsg(s): msg(yellow(s)) +def ymsg_r(s): msg_r(yellow(s)) +def gmsg(s): msg(green(s)) +def gmsg_r(s): msg_r(green(s)) def mmsg(*args): for d in args: Msg(repr(d)) @@ -464,7 +468,11 @@ def make_full_path(outdir,outfile): def get_seed_file(cmd_args,nargs,invoked_as=None): from mmgen.filename import find_file_in_dir from mmgen.seed import Wallet - wf = find_file_in_dir(Wallet,g.data_dir) + if g.bob or g.alice: + import regtest as rt + wf = rt.mmwords[('alice','bob')[g.bob]] + else: + wf = find_file_in_dir(Wallet,g.data_dir) wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt? @@ -800,9 +808,12 @@ def get_bitcoind_auth_cookie(): def bitcoin_connection(): def check_coin_mismatch(c): + if c.getblockcount() == 0: + msg('Warning: no blockchain, so skipping block mismatch check') + return fb = '00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148' err = [] - if int(c.getblockchaininfo()['blocks']) <= 478558 or c.getblockhash(478559) == fb: + if c.getblockchaininfo()['blocks'] <= 478558 or c.getblockhash(478559) == fb: if g.coin == 'BCH': err = 'BCH','BTC' elif g.coin == 'BTC': err = 'BTC','BCH' if err: wdie(2,"'{}' requested, but this is the {} chain!".format(*err)) diff --git a/scripts/traceback.py b/scripts/traceback.py index b751eae5..ee078d6c 100755 --- a/scripts/traceback.py +++ b/scripts/traceback.py @@ -10,7 +10,10 @@ try: sys.argv.pop(0) execfile(sys.argv[0]) except SystemExit: - sys.exit(int(str(sys.exc_info()[1]))) + try: + sys.exit(int(str(sys.exc_info()[1]))) + except: + sys.exit(1) except: l = traceback.format_exception(*sys.exc_info()) exc = l.pop() diff --git a/setup.py b/setup.py index 62e189be..d2b699eb 100755 --- a/setup.py +++ b/setup.py @@ -120,6 +120,7 @@ setup( 'mmgen.mn_tirosh', 'mmgen.obj', 'mmgen.opts', + 'mmgen.regtest', 'mmgen.rpc', 'mmgen.seed', 'mmgen.term', @@ -134,6 +135,7 @@ setup( 'mmgen.main_addrgen', 'mmgen.main_passgen', 'mmgen.main_addrimport', + 'mmgen.main_regtest', 'mmgen.main_txcreate', 'mmgen.main_txbump', 'mmgen.main_txsign', @@ -152,6 +154,7 @@ setup( 'mmgen-passgen', 'mmgen-addrimport', 'mmgen-passchg', + 'mmgen-regtest', 'mmgen-walletchk', 'mmgen-walletconv', 'mmgen-walletgen', diff --git a/test/gentest.py b/test/gentest.py index e731afc7..82e45a53 100755 --- a/test/gentest.py +++ b/test/gentest.py @@ -29,7 +29,6 @@ from binascii import hexlify # Import these _after_ local path's been added to sys.path from mmgen.common import * -from mmgen.bitcoin import hex2wif rounds = 100 opts_data = lambda: { @@ -115,7 +114,6 @@ def match_error(sec,wif,a_addr,b_addr,a,b): """.format(sec,wif,a_addr,b_addr,pnm=g.proj_name,a=m[a],b=m[b]).rstrip()) # Begin execution -mmtype = ('L','S')[bool(opt.segwit)] compressed = True from mmgen.addr import KeyGenerator,AddrGenerator @@ -177,7 +175,6 @@ elif a and dump: sec = PrivKey(wif=wif) except: die(2,'\nInvalid {}net WIF address in dump file: {}'.format(('main','test')[g.testnet],wif)) - compressed = wif[0] != ('5','9')[g.testnet] b_addr = ag.to_addr(kg.to_pubhex(sec)) if a_addr != b_addr: match_error(sec,wif,a_addr,b_addr,3,a) diff --git a/test/objtest.py b/test/objtest.py new file mode 100755 index 00000000..1030e4c3 --- /dev/null +++ b/test/objtest.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 -*- +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2017 Philemon +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +test/objtest.py: Test MMGen data objects +""" + +import sys,os +pn = os.path.dirname(sys.argv[0]) +os.chdir(os.path.join(pn,os.pardir)) +sys.path.__setitem__(0,os.path.abspath(os.curdir)) + +from binascii import hexlify + +# Import these _after_ local path's been added to sys.path +from mmgen.common import * +from mmgen.obj import * +from mmgen.seed import * + +opts_data = lambda: { + 'desc': 'Test MMGen data objects', + 'sets': ( ('super_silent', True, 'silent', True), ), + 'usage':'[options] [object]', + 'options': """ +-h, --help Print this help message +--, --longhelp Print help message for long options (common options) +-q, --quiet Produce quieter output +-s, --silent Silence output of tested objects +-S, --super-silent Silence all output except for errors +-v, --verbose Produce more verbose output +""" +} + +cmd_args = opts.init(opts_data) + +def run_test(test,arg,input_data): + arg_copy = arg + kwargs = {'on_fail':'silent'} if opt.silent else {} + ret_chk = arg + if input_data == 'good' and type(arg) == tuple: arg,ret_chk = arg + if type(arg) == dict: # pass one arg + kwargs to constructor + arg_copy = arg.copy() + if 'arg' in arg: + args = [arg['arg']] + ret_chk = args[0] + del arg['arg'] + else: + args = [] + ret_chk = arg.values()[0] # assume only one key present + if 'ret' in arg: + ret_chk = arg['ret'] + del arg['ret'] + del arg_copy['ret'] + kwargs.update(arg) + else: + args = [arg] + try: + if not opt.super_silent: + msg_r((orange,green)[input_data=='good']('{:<22}'.format(repr(arg_copy)+':'))) + cls = globals()[test] + ret = cls(*args,**kwargs) + bad_ret = list() if issubclass(cls,list) else None + if (opt.silent and input_data=='bad' and ret!=bad_ret) or (not opt.silent and input_data=='bad'): + raise UserWarning,"Non-'None' return value {} with bad input data".format(repr(ret)) + if opt.silent and input_data=='good' and ret==bad_ret: + raise UserWarning,"'None' returned with good input data" + if input_data=='good' and ret != ret_chk and repr(ret) != repr(ret_chk): + raise UserWarning,"Return value ({!r}) doesn't match expected value ({!r})".format(ret,ret_chk) + if not opt.super_silent: + msg(u'==> {}'.format(ret)) + if opt.verbose and issubclass(cls,MMGenObject): + ret.pmsg() + except SystemExit as e: + if input_data == 'good': + raise ValueError,'Error on good input data' + if opt.verbose: + msg('exitval: {}'.format(e[0])) + except UserWarning as e: + msg('==> {!r}'.format(ret)) + die(2,red('{}'.format(e[0]))) + +r32,r24,r16,r17,r18 = os.urandom(32),os.urandom(24),os.urandom(16),os.urandom(17),os.urandom(18) + +from collections import OrderedDict +tests = OrderedDict([ + ('AddrIdx', { + 'bad': ('s',1.1,12345678,-1), + 'good': (('7',7),) + }), + ('AddrIdxList', { + 'bad': ('x','5,9,1-2-3','8,-11','66,3-2'), + 'good': ( + ('3,2,2',[2,3]), + ('101,1,3,5,2-7,99',[1,2,3,4,5,6,7,99,101]), + ({'idx_list':AddrIdxList('1-5')},[1,2,3,4,5]) + )}), + ('BTCAmt', { + 'bad': ('-3.2','0.123456789',123L,'123L',22000000,20999999.12345678), + 'good': (('20999999.12345678',Decimal('20999999.12345678')),) + }), + ('BTCAddr', { + 'bad': (1,'x','я'), + 'good': ( + '1MjjELEy6EJwk8fSNfpS8b5teFRo4X5fZr', + '32GiSWo9zJQgkCmjAaLRrbPwXhKry2jHhj', + 'n2FgXPKwuFkCXF946EnoxWJDWF2VwQ6q8J', + '2MspvWFjBbkv2wzQGqhxJUYPCk3Y2jMaxLN' + )}), + ('SeedID', { + 'bad': ( + {'sid':'я'}, + {'sid':'F00F00'}, + {'sid':'xF00F00x'}, + {'sid':1}, + {'sid':'F00BAA123'}, + {'sid':'f00baa12'}, + 'я',r32,'abc'), + 'good': (({'sid':'F00BAA12'},'F00BAA12'),(Seed(r16),Seed(r16).sid)) + }), + ('MMGenID', { + 'bad': ('x',1,'f00f00f','a:b','x:L:3','F00BAA12:0','F00BAA12:Z:99'), + 'good': (('F00BAA12:99','F00BAA12:L:99'),'F00BAA12:L:99','F00BAA12:S:99') + }), + ('TwMMGenID', { + 'bad': ('x','я','я:я',1,'f00f00f','a:b','x:L:3','F00BAA12:0','F00BAA12:Z:99','btc:','btc:я'), + 'good': (('F00BAA12:99','F00BAA12:L:99'),'F00BAA12:L:99','F00BAA12:S:9999999','btc:x') + }), + ('TwComment', { + 'bad': ('я',"comment too long for tracking wallet",), + 'good': ('OK comment',) + }), + ('TwLabel', { + 'bad': ('x x','x я','я:я',1,'f00f00f','a:b','x:L:3','F00BAA12:0 x', + 'F00BAA12:Z:99','F00BAA12:L:99 я','btc: x','btc:я x'), + 'good': ( + ('F00BAA12:99 a comment','F00BAA12:L:99 a comment'), + 'F00BAA12:L:99 comment', + 'F00BAA12:S:9999999 comment', + 'btc:x comment') + }), + ('HexStr', { + 'bad': (1,[],'\0','\1','я','g','gg','FF','f00'), + 'good': ('deadbeef','f00baa12') + }), + ('MMGenTxID', { + 'bad': (1,[],'\0','\1','я','g','gg','FF','f00','F00F0012'), + 'good': ('DEADBE','F00BAA') + }), + ('BitcoinTxID',{ + 'bad': (1,[],'\0','\1','я','g','gg','FF','f00','F00F0012',hexlify(r16),hexlify(r32)+'ee'), + 'good': (hexlify(r32),) + }), + ('WifKey', { + 'bad': (1,[],'\0','\1','я','g','gg','FF','f00',hexlify(r16),'2MspvWFjBbkv2wzQGqhxJUYPCk3Y2jMaxLN'), + 'good': ( + '5KXEpVzjWreTcQoG5hX357s1969MUKNLuSfcszF6yu84kpsNZKb', + 'KwWr9rDh8KK5TtDa3HLChEvQXNYcUXpwhRFUPc5uSNnMtqNKLFhk', + {'arg':'93HsQEpH75ibaUJYi3QwwiQxnkW4dUuYFPXZxcbcKds7XrqHkY6','testnet':True}, + {'arg':'cMsqcmDYZP1LdKgqRh9L4ZRU9br28yvdmTPwW2YQwVSN9aQiMAoR','testnet':True} + ) + }), + ('PubKey', { + 'bad': ({'arg':1,'compressed':False},{'arg':'F00BAA12','compressed':False},), + 'good': ({'arg':'deadbeef','compressed':True},) # TODO: add real pubkeys + }), + ('PrivKey', { + 'bad': ({'wif':1},), + 'good': ( + {'wif':'5KXEpVzjWreTcQoG5hX357s1969MUKNLuSfcszF6yu84kpsNZKb', + 'ret':'e0aef965b905a2fedf907151df8e0a6bac832aa697801c51f58bd2ecb4fd381c'}, + {'wif':'KwWr9rDh8KK5TtDa3HLChEvQXNYcUXpwhRFUPc5uSNnMtqNKLFhk', + 'ret':'08d0ed83b64b68d56fa064be48e2385060ed205be2b1e63cd56d218038c3a05f'}, + {'wif':'93HsQEpH75ibaUJYi3QwwiQxnkW4dUuYFPXZxcbcKds7XrqHkY6','testnet':True, + 'ret':'e0aef965b905a2fedf907151df8e0a6bac832aa697801c51f58bd2ecb4fd381c'}, + {'wif':'cMsqcmDYZP1LdKgqRh9L4ZRU9br28yvdmTPwW2YQwVSN9aQiMAoR','testnet':True, + 'ret':'08d0ed83b64b68d56fa064be48e2385060ed205be2b1e63cd56d218038c3a05f'}, + {'s':r32,'compressed':False,'ret':hexlify(r32)}, + {'s':r32,'compressed':True,'ret':hexlify(r32)} + ) + }), + ('AddrListID', { # a rather pointless test, but do it anyway + 'bad': ( + {'sid':SeedID(sid='F00BAA12'),'mmtype':'Z','ret':'F00BAA12:Z'}, + ), + 'good': ( + {'sid':SeedID(sid='F00BAA12'),'mmtype':MMGenAddrType('S'),'ret':'F00BAA12:S'}, + {'sid':SeedID(sid='F00BAA12'),'mmtype':MMGenAddrType('L'),'ret':'F00BAA12:L'}, + ) + }), + ('MMGenWalletLabel', { + 'bad': ('яqwerty','This text is too long to fit in an MMGen wallet label'), + 'good': ('a good label',) + }), + ('TwComment', { + 'bad': (u'яqwerty','This text is too long for a TW comment'), + 'good': ('a good comment',) + }), + ('MMGenTXLabel',{ + 'bad': ('This text is too long for a transaction comment. '*2,), + 'good': (u'UTF-8 is OK: я','a good comment',) + }), + ('MMGenPWIDString', { # forbidden = list(u' :/\\') + 'bad': ('foo/','foo:','foo:\\'), + 'good': (u'qwerty@яяя',) + }), + ('MMGenAddrType', { + 'bad': ('U','z','xx',1,'dogecoin'), + 'good': ( + {'s':'segwit','ret':'S'}, + {'s':'S','ret':'S'}, + {'s':'legacy','ret':'L'}, + {'s':'L','ret':'L'}, + {'s':'compressed','ret':'C'}, + {'s':'C','ret':'C'} + )}), + ('MMGenPasswordType', { + 'bad': ('U','z','я',1,'passw0rd'), + 'good': ( + {'s':'password','ret':'P'}, + {'s':'P','ret':'P'}, + )}), +]) + +def do_loop(): + utests = cmd_args + for test in tests: + if utests and test not in utests: continue + msg((blue,nocolor)[bool(opt.super_silent)]('Testing {}'.format(test))) + for k in ('bad','good'): + for arg in tests[test][k]: + run_test(test,arg,input_data=k) + +do_loop() diff --git a/test/test.py b/test/test.py index 7dd12f49..35578573 100755 --- a/test/test.py +++ b/test/test.py @@ -50,7 +50,8 @@ ref_wallet_brainpass = 'abc' ref_wallet_hash_preset = '1' ref_wallet_incog_offset = 123 -from mmgen.obj import MMGenTXLabel +from mmgen.obj import MMGenTXLabel,PrivKey,BTCAmt +from mmgen.addr import AddrGenerator,KeyGenerator,AddrList,AddrData,AddrIdxList ref_tx_label = ''.join([unichr(i) for i in range(65,91) + range(1040,1072) + # cyrillic range(913,939) + # greek @@ -709,7 +710,6 @@ def find_generated_exts(cmd): def get_addrfile_checksum(display=False): addrfile = get_file_with_ext('addrs',cfg['tmpdir']) silence() - from mmgen.addr import AddrList chk = AddrList(addrfile).chksum if opt.verbose and display: msg('Checksum: %s' % cyan(chk)) end_silence() @@ -728,9 +728,6 @@ class MMGenExpect(MMGenPexpect): desc = (cmd_data[name][1],name)[bool(opt.names)] + (' ' + extra_desc).strip() return MMGenPexpect.__init__(self,name,mmgen_cmd,cmd_args,desc,no_output=no_output) -from mmgen.obj import BTCAmt -from mmgen.bitcoin import verify_addr - def create_fake_unspent_entry(btcaddr,al_id=None,idx=None,lbl=None,non_mmgen=False,segwit=False): if lbl: lbl = ' ' + lbl spk1,spk2 = (('76a914','88ac'),('a914','87'))[segwit and btcaddr.addr_fmt=='p2sh'] @@ -741,7 +738,7 @@ def create_fake_unspent_entry(btcaddr,al_id=None,idx=None,lbl=None,non_mmgen=Fal 'amount': BTCAmt('%s.%s' % (10+(getrandnum(4) % 40), getrandnum(4) % 100000000)), 'address': btcaddr, 'spendable': False, - 'scriptPubKey': (spk1+verify_addr(btcaddr,return_hex=True)+spk2), + 'scriptPubKey': '{}{}{}'.format(spk1,btcaddr.hex,spk2), 'confirmations': getrandnum(4) % 50000 } @@ -784,14 +781,10 @@ def create_fake_unspent_data(adata,tx_data,non_mmgen_input=''): out.append(create_fake_unspent_entry(btcaddr,d['al_id'],idx,lbl,segwit=d['segwit'])) if non_mmgen_input: - privnum = getrandnum(32) - from mmgen.bitcoin import privnum2addr,hex2wif - from mmgen.obj import BTCAddr - btcaddr = BTCAddr(privnum2addr(privnum,compressed=True)) + privkey = PrivKey(os.urandom(32),compressed=True) + btcaddr = AddrGenerator('p2pkh').to_addr(KeyGenerator().to_pubhex(privkey)) of = os.path.join(cfgs[non_mmgen_input]['tmpdir'],non_mmgen_fn) - wif = hex2wif('{:064x}'.format(privnum),compressed=True) -# Msg(yellow(wif + ' ' + btcaddr)) - write_data_to_file(of,wif+'\n','compressed bitcoin key',silent=True) + write_data_to_file(of,privkey.wif+'\n','compressed bitcoin key',silent=True) out.append(create_fake_unspent_entry(btcaddr,non_mmgen=True,segwit=False)) # msg('\n'.join([repr(o) for o in out])); sys.exit(0) @@ -808,7 +801,6 @@ def write_fake_data_to_file(d): sys.stderr.write("Fake transaction wallet data written to file '%s'\n" % unspent_data_file) def create_tx_data(sources): - from mmgen.addr import AddrList,AddrData,AddrIdxList tx_data,ad = {},AddrData() for s in sources: afile = get_file_with_ext('addrs',cfgs[s]['tmpdir']) @@ -829,8 +821,8 @@ def create_tx_data(sources): return ad,tx_data def make_txcreate_cmdline(tx_data): - from mmgen.bitcoin import privnum2addr - btcaddr = privnum2addr(getrandnum(32),compressed=True) + privkey = PrivKey(os.urandom(32),compressed=True) + btcaddr = AddrGenerator('segwit').to_addr(KeyGenerator().to_pubhex(privkey)) cmd_args = ['-d',cfg['tmpdir']] for num in tx_data: @@ -848,7 +840,6 @@ def make_txcreate_cmdline(tx_data): def add_comments_to_addr_file(addrfile,outfile): silence() msg(green("Adding comments to address file '%s'" % addrfile)) - from mmgen.addr import AddrList a = AddrList(addrfile) for n,idx in enumerate(a.idxs(),1): if n % 2: a.set_comment(idx,'Test address %s' % n) diff --git a/test/tooltest.py b/test/tooltest.py index afe2fb94..7d763a47 100755 --- a/test/tooltest.py +++ b/test/tooltest.py @@ -354,7 +354,7 @@ class MMGenToolTestSuite(object): def Hexaddr2addr(self,name,f1,f2,f3,f4): for n,fi,fo,m in ((1,f1,f2,''),(2,f3,f4,'from compressed')): self.run_cmd_chk(name,fi,fo,extra_msg=m) - def Privhex2pubhex(self,name,f1,f2,f3): # from hex2wif + def Privhex2pubhex(self,name,f1,f2,f3): # from Hex2wif addr = read_from_file(f3).strip() self.run_cmd_out(name,addr,kwargs='compressed=1',fn_idx=3) def Pubhex2redeem_script(self,name,f1,f2,f3): # from above