obj.py rewrite+test suite, Bob and Alice regtest mode, compressed addresses
- basic data types in obj.py rewritten
- new test suite 'test/objtest.py' for testing basic data types
- new compressed address type with the 'C' identifier
- Bob and Alice regtest mode for testing MMGen in a mock two-user environment
* All MMGen commands are available in this mode.
* Set up with 'mmgen-regtest setup'. Bob and Alice's wallets are funded with
500 BTC each. Use the --mixed switch to import mixed address types in
Bob and Alice's tracking wallets.
* Transact as Bob by adding --bob switch to MMGen commands
* Transact as Alice by adding --alice switch to MMGen commands
* After sending a transaction, mine a block to confirm it with
'mmgen-regtest generate'
The bitcoin daemon is stopped and restarted automatically when switching
between users.
This commit is contained in:
parent
33673cbb1b
commit
b23b497d77
20 changed files with 813 additions and 461 deletions
25
mmgen-regtest
Executable file
25
mmgen-regtest
Executable file
|
|
@ -0,0 +1,25 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
|
||||
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation, either version 3 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
mmgen-regtest: Bitcoind regression test mode setup and operations for the MMGen
|
||||
suite
|
||||
"""
|
||||
|
||||
from mmgen.main import launch
|
||||
launch("regtest")
|
||||
|
|
@ -61,7 +61,7 @@ class AddrGeneratorSegwit(MMGenObject):
|
|||
class KeyGenerator(MMGenObject):
|
||||
def __new__(cls,generator=None,silent=False):
|
||||
if cls.test_for_secp256k1(silent=silent) and generator != 1:
|
||||
if opt.key_generator != 1:
|
||||
if (not hasattr(opt,'key_generator')) or opt.key_generator == 2 or generator == 2:
|
||||
return super(cls,cls).__new__(KeyGeneratorSecp256k1)
|
||||
else:
|
||||
msg('Using (slow) native Python ECDSA library for address generation')
|
||||
|
|
@ -182,7 +182,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
|
||||
if seed and addr_idxs: # data from seed + idxs
|
||||
self.al_id,src = AddrListID(seed.sid,mmtype),'gen'
|
||||
adata = self.generate(seed,addr_idxs,compressed=(mmtype=='S'))
|
||||
adata = self.generate(seed,addr_idxs)
|
||||
elif addrfile: # data from MMGen address file
|
||||
adata = self.parse_file(addrfile) # sets self.al_id
|
||||
elif al_id and adata: # data from tracking wallet
|
||||
|
|
@ -210,7 +210,6 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
if self.al_id == None: return
|
||||
|
||||
self.id_str = AddrListIDStr(self)
|
||||
|
||||
if type(self) == KeyList: return
|
||||
|
||||
if do_chksum:
|
||||
|
|
@ -223,21 +222,18 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
qmsg(self.msgs[('check_chksum','record_chksum')[src=='gen']])
|
||||
|
||||
def update_msgs(self):
|
||||
if type(self).msgs and type(self) != AddrList:
|
||||
for k in AddrList.msgs:
|
||||
if k not in self.msgs:
|
||||
self.msgs[k] = AddrList.msgs[k]
|
||||
self.msgs = AddrList.msgs
|
||||
self.msgs.update(type(self).msgs)
|
||||
|
||||
def generate(self,seed,addrnums,compressed):
|
||||
def generate(self,seed,addrnums):
|
||||
assert type(addrnums) is AddrIdxList
|
||||
assert type(compressed) is bool
|
||||
|
||||
seed = seed.get_data()
|
||||
seed = self.cook_seed(seed)
|
||||
|
||||
if self.gen_addrs:
|
||||
kg = KeyGenerator()
|
||||
ag = AddrGenerator(('p2pkh','segwit')[self.al_id.mmtype=='S'])
|
||||
ag = AddrGenerator(self.al_id.mmtype.gen_method)
|
||||
|
||||
t_addrs,num,pos,out = len(addrnums),0,0,AddrListList()
|
||||
le = self.entry_type
|
||||
|
|
@ -256,7 +252,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
e = le(idx=num)
|
||||
|
||||
# Secret key is double sha256 of seed hash round /num/
|
||||
e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),compressed)
|
||||
e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),self.al_id.mmtype.compressed)
|
||||
|
||||
if self.gen_addrs:
|
||||
e.addr = ag.to_addr(kg.to_pubhex(e.sec))
|
||||
|
|
@ -278,8 +274,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
def is_for_current_chain(self):
|
||||
return self.data[0].addr.is_for_current_chain()
|
||||
|
||||
def chk_addr_or_pw(self,addr):
|
||||
return {'L':'p2pkh','S':'p2sh'}[self.al_id.mmtype] == is_btc_addr(addr).addr_fmt
|
||||
def check_format(self,addr): return True # format is checked when added to list entry object
|
||||
|
||||
def cook_seed(self,seed):
|
||||
if self.al_id.mmtype == 'L':
|
||||
|
|
@ -386,7 +381,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
elif self.al_id.mmtype == 'L':
|
||||
out.append('{} {{'.format(self.al_id.sid))
|
||||
else:
|
||||
out.append('{} {} {{'.format(self.al_id.sid,MMGenAddrType.mmtypes[self.al_id.mmtype].upper()))
|
||||
out.append('{} {} {{'.format(self.al_id.sid,self.al_id.mmtype.name.upper()))
|
||||
|
||||
fs = ' {:<%s} {:<34}{}' % len(str(self.data[-1].idx))
|
||||
for e in self.data:
|
||||
|
|
@ -419,7 +414,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
if not is_mmgen_idx(d[0]):
|
||||
return "'%s': invalid address num. in line: '%s'" % (d[0],l)
|
||||
|
||||
if not self.chk_addr_or_pw(d[1]):
|
||||
if not self.check_format(d[1]):
|
||||
return "'{}': invalid {}".format(d[1],self.data_desc)
|
||||
|
||||
if len(d) != 3: d.append('')
|
||||
|
|
@ -441,7 +436,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
|
||||
if self.has_keys and keypress_confirm('Check key-to-address validity?'):
|
||||
kg = KeyGenerator()
|
||||
ag = AddrGenerator(('p2pkh','segwit')[self.al_id.mmtype=='S'])
|
||||
ag = AddrGenerator(self.al_id.mmtype.gen_method)
|
||||
llen = len(ret)
|
||||
for n,e in enumerate(ret):
|
||||
msg_r('\rVerifying keys %s/%s' % (n+1,llen))
|
||||
|
|
@ -489,7 +484,7 @@ Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
|
|||
mmtype = MMGenAddrType(mmtype)
|
||||
except:
|
||||
return do_error(u"'{}': invalid address type in address file. Must be one of: {}".format(
|
||||
mmtype.upper(),' '.join(MMGenAddrType.mmtypes.values()).upper()))
|
||||
mmtype.upper(),' '.join([i['name'].upper() for i in MMGenAddrType.mmtypes.values()])))
|
||||
elif len(ls) == 0:
|
||||
mmtype = MMGenAddrType('L')
|
||||
else:
|
||||
|
|
@ -582,7 +577,7 @@ Record this checksum: it will be used to verify the password file in the future
|
|||
self.set_pw_len(pw_len)
|
||||
if chk_params_only: return
|
||||
self.al_id = AddrListID(seed.sid,MMGenPasswordType('P'))
|
||||
self.data = self.generate(seed,pw_idxs,compressed=False)
|
||||
self.data = self.generate(seed,pw_idxs)
|
||||
|
||||
self.num_addrs = len(self.data)
|
||||
self.fmt_data = ''
|
||||
|
|
@ -632,7 +627,7 @@ Record this checksum: it will be used to verify the password file in the future
|
|||
# we take least significant part
|
||||
return ''.join(baseconv.fromhex(hex_sec,self.pw_fmt,pad=self.pw_len))[-self.pw_len:]
|
||||
|
||||
def chk_addr_or_pw(self,pw):
|
||||
def check_format(self,pw):
|
||||
if not (is_b58_str,is_b32_str)[self.pw_fmt=='b32'](pw):
|
||||
msg('Password is not a valid {} string'.format(self.pw_fmt))
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -60,8 +60,6 @@ def _b58tonum(b58num):
|
|||
if not i in _b58a: return False
|
||||
return sum(_b58a.index(n) * (58**i) for i,n in enumerate(list(b58num[::-1])))
|
||||
|
||||
from mmgen.globalvars import g
|
||||
|
||||
def hash160(hexnum): # take hex, return hex - OP_HASH160
|
||||
return hashlib_new('ripemd160',sha256(unhexlify(hexnum)).digest()).hexdigest()
|
||||
|
||||
|
|
@ -69,56 +67,57 @@ def hash256(hexnum): # take hex, return hex - OP_HASH256
|
|||
return sha256(sha256(unhexlify(hexnum)).digest()).hexdigest()
|
||||
|
||||
# devdoc/ref_transactions.md:
|
||||
btc_ver_nums = {
|
||||
'p2pkh': (('00','1'),('6f','mn')),
|
||||
'p2sh': (('05','3'),('c4','2'))
|
||||
btc_addr_ver_nums = {
|
||||
'p2pkh': { 'mainnet': ('00','1'), 'testnet': ('6f','mn') },
|
||||
'p2sh': { 'mainnet': ('05','3'), 'testnet': ('c4','2') }
|
||||
}
|
||||
addr_pfxs = { 'mainnet': '13', 'testnet': 'mn2', 'regtest': 'mn2' }
|
||||
vnum_all = tuple([k for k,v in btc_ver_nums['p2pkh'] + btc_ver_nums['p2sh']])
|
||||
btc_addr_pfxs = { 'mainnet': '13', 'testnet': 'mn2', 'regtest': 'mn2' }
|
||||
btc_uncompressed_wif_pfxs = { 'mainnet':'5','testnet':'9' }
|
||||
btc_privkey_pfxs = { 'mainnet':'80','testnet':'ef' }
|
||||
|
||||
def hexaddr2addr(hexaddr,p2sh=False):
|
||||
s = vnum_all[g.testnet+(2*p2sh)] + hexaddr.strip()
|
||||
lzeroes = (len(s) - len(s.lstrip('0'))) / 2
|
||||
return ('1' * lzeroes) + _numtob58(int(s+hash256(s)[:8],16))
|
||||
from mmgen.globalvars import g
|
||||
|
||||
def verify_addr(addr,verbose=False,return_hex=False,return_type=False):
|
||||
addr = addr.strip()
|
||||
|
||||
for k in ('p2pkh','p2sh'):
|
||||
for ver_num,ldigit in btc_ver_nums[k]:
|
||||
def verify_addr(addr,verbose=False,return_dict=False,testnet=None):
|
||||
testnet = testnet if testnet != None else g.testnet # allow override
|
||||
for addr_fmt in ('p2pkh','p2sh'):
|
||||
for net in ('mainnet','testnet'):
|
||||
ver_num,ldigit = btc_addr_ver_nums[addr_fmt][net]
|
||||
if addr[0] not in ldigit: continue
|
||||
num = _b58tonum(addr)
|
||||
if num == False: break
|
||||
addr_hex = '{:050x}'.format(num)
|
||||
if addr_hex[:2] != ver_num: continue
|
||||
if hash256(addr_hex[:42])[:8] == addr_hex[42:]:
|
||||
return addr_hex[2:42] if return_hex else k if return_type else True
|
||||
return {'hex':addr_hex[2:42],'format':addr_fmt,'net':net} if return_dict else True
|
||||
else:
|
||||
if verbose: Msg("Invalid checksum in address '%s'" % addr)
|
||||
if verbose: Msg("Invalid checksum in address '{}'".format(addr))
|
||||
break
|
||||
|
||||
if verbose: Msg("Invalid address '%s'" % addr)
|
||||
if verbose: Msg("Invalid address '{}'".format(addr))
|
||||
return False
|
||||
|
||||
# Compressed address support:
|
||||
def hexaddr2addr(hexaddr,p2sh=False,testnet=None):
|
||||
testnet = testnet if testnet != None else g.testnet # allow override
|
||||
s = btc_addr_ver_nums[('p2pkh','p2sh')[p2sh]][('mainnet','testnet')[testnet]][0] + hexaddr
|
||||
lzeroes = (len(s) - len(s.lstrip('0'))) / 2
|
||||
return ('1' * lzeroes) + _numtob58(int(s+hash256(s)[:8],16))
|
||||
|
||||
def wif_is_compressed(wif): return wif[0] != ('5','9')[g.testnet]
|
||||
|
||||
def wif2hex(wif):
|
||||
wif = wif.strip()
|
||||
compressed = wif_is_compressed(wif)
|
||||
def wif2hex(wif,testnet=None):
|
||||
testnet = testnet if testnet != None else g.testnet # allow override
|
||||
num = _b58tonum(wif)
|
||||
if num == False: return False
|
||||
key = '{:x}'.format(num)
|
||||
compressed = wif[0] != btc_uncompressed_wif_pfxs[('mainnet','testnet')[testnet]]
|
||||
klen = (66,68)[bool(compressed)]
|
||||
if compressed and key[66:68] != '01': return False
|
||||
if (key[:2] == ('80','ef')[g.testnet] and key[klen:] == hash256(key[:klen])[:8]):
|
||||
return key[2:66]
|
||||
if (key[:2] == btc_privkey_pfxs[('mainnet','testnet')[testnet]] and key[klen:] == hash256(key[:klen])[:8]):
|
||||
return {'hex':key[2:66],'compressed':compressed,'testnet':testnet}
|
||||
else:
|
||||
return False
|
||||
|
||||
def hex2wif(hexpriv,compressed=False):
|
||||
s = ('80','ef')[g.testnet] + hexpriv.strip() + ('','01')[bool(compressed)]
|
||||
def hex2wif(hexpriv,compressed=False,testnet=None):
|
||||
testnet = testnet if testnet != None else g.testnet # allow override
|
||||
s = btc_privkey_pfxs[('mainnet','testnet')[testnet]] + hexpriv + ('','01')[bool(compressed)]
|
||||
return _numtob58(int(s+hash256(s)[:8],16))
|
||||
|
||||
# devdoc/guide_wallets.md:
|
||||
|
|
|
|||
|
|
@ -86,6 +86,9 @@ class g(object):
|
|||
rpc_password = ''
|
||||
testnet_name = 'testnet3'
|
||||
|
||||
bob = False
|
||||
alice = False
|
||||
|
||||
# test suite:
|
||||
bogus_wallet_data = ''
|
||||
traceback_cmd = 'scripts/traceback.py'
|
||||
|
|
@ -110,14 +113,15 @@ class g(object):
|
|||
# User opt sets global var:
|
||||
common_opts = (
|
||||
'color','no_license','rpc_host','rpc_port','testnet','rpc_user','rpc_password',
|
||||
'bitcoin_data_dir','force_256_color','regtest','coin'
|
||||
'bitcoin_data_dir','force_256_color','regtest','coin','bob','alice'
|
||||
)
|
||||
required_opts = (
|
||||
'quiet','verbose','debug','outdir','echo_passphrase','passwd_file','stdout',
|
||||
'show_hash_presets','label','keep_passphrase','keep_hash_preset','yes',
|
||||
'brain_params','b16','usr_randchars','coin'
|
||||
'brain_params','b16','usr_randchars','coin','bob','alice'
|
||||
)
|
||||
incompatible_opts = (
|
||||
('bob','alice'),
|
||||
('quiet','verbose'),
|
||||
('label','keep_label'),
|
||||
('tx_id','info'),
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ opts_data = lambda: {
|
|||
kgs=' '.join(['{}:{}'.format(n,k) for n,k in enumerate(g.key_generators,1)]),
|
||||
kg=g.key_generator,
|
||||
what=gen_what,g=g,
|
||||
dmat="'{}' or '{}'".format(MAT.dfl_mmtype,MAT.mmtypes[MAT.dfl_mmtype])
|
||||
dmat="'{}' or '{}'".format(MAT.dfl_mmtype,MAT.mmtypes[MAT.dfl_mmtype]['name'])
|
||||
),
|
||||
'notes': """
|
||||
|
||||
|
|
@ -95,6 +95,7 @@ range(s).
|
|||
ADDRESS TYPES:
|
||||
{n_at}
|
||||
|
||||
|
||||
NOTES FOR ALL GENERATOR COMMANDS
|
||||
|
||||
{pwn}
|
||||
|
|
@ -106,7 +107,7 @@ FMT CODES:
|
|||
""".format(
|
||||
n_secp=note_secp256k1,n_addrkey=note_addrkey,pwn=pw_note,bwn=bw_note,
|
||||
f='\n '.join(SeedSource.format_fmt_codes().splitlines()),
|
||||
n_at='\n '.join(["'{}', '{}'".format(k,v) for k,v in MAT.mmtypes.items()]),
|
||||
n_at='\n '.join(["'{}','{:<12} - {}".format(k,v['name']+"'",v['desc']) for k,v in MAT.mmtypes.items()]),
|
||||
o=opts
|
||||
)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,16 +64,7 @@ def import_mmgen_list(infile):
|
|||
return al
|
||||
|
||||
def import_flat_list(lines):
|
||||
al = AddrList(addrlist=lines)
|
||||
from mmgen.bitcoin import verify_addr
|
||||
qmsg_r('Validating addresses...')
|
||||
for e in al.data:
|
||||
if not verify_addr(e.addr,verbose=True):
|
||||
die(2,'\n%s: invalid address' % e.addr)
|
||||
if e.addr.addr_fmt == 'p2sh':
|
||||
fs = "\n'{}':\n Non-{} P2SH addresses may not be imported into the tracking wallet"
|
||||
rdie(2,fs.format(e.addr,g.proj_name))
|
||||
return al
|
||||
return AddrList(addrlist=lines)
|
||||
|
||||
if len(cmd_args) == 1:
|
||||
infile = cmd_args[0]
|
||||
|
|
|
|||
65
mmgen/main_regtest.py
Executable file
65
mmgen/main_regtest.py
Executable file
|
|
@ -0,0 +1,65 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
|
||||
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
mmgen-regtest: Bitcoind regression test mode setup and operations for the MMGen
|
||||
suite
|
||||
"""
|
||||
|
||||
from mmgen.common import *
|
||||
opts_data = lambda: {
|
||||
'desc': 'Bitcoind regression test mode setup and operations for the {} suite'.format(g.proj_name),
|
||||
'usage': '[opts] <command>',
|
||||
'sets': ( ('yes', True, 'quiet', True), ),
|
||||
'options': """
|
||||
-h, --help Print this help message
|
||||
-m, --mixed Create Bob and Alice's wallets with mixed address types
|
||||
--, --longhelp Print help message for long options (common options)
|
||||
-q, --quiet Produce quieter output
|
||||
-v, --verbose Produce more verbose output
|
||||
""",
|
||||
'notes': """
|
||||
|
||||
|
||||
AVAILABLE COMMANDS
|
||||
|
||||
setup - setup up system for regtest operation with MMGen
|
||||
stop - stop the regtest bitcoind
|
||||
bob - switch to Bob's wallet, starting daemon if necessary
|
||||
alice - switch to Alice's wallet, starting daemon if necessary
|
||||
user - show current user
|
||||
generate - mine a block
|
||||
test_daemon - test whether daemon is running
|
||||
get_balances - get balances of Bob and Alice
|
||||
"""
|
||||
}
|
||||
|
||||
cmd_args = opts.init(opts_data)
|
||||
|
||||
if len(cmd_args) != 1:
|
||||
opts.usage()
|
||||
|
||||
cmds = ('setup','stop','generate','test_daemon','create_data_dir','bob','alice','user',
|
||||
'wait_for_daemon','wait_for_exit','get_current_user','get_balances')
|
||||
|
||||
if cmd_args[0] not in cmds:
|
||||
opts.usage()
|
||||
|
||||
from mmgen.regtest import *
|
||||
|
||||
globals()[cmd_args[0]]()
|
||||
|
|
@ -48,7 +48,7 @@ if not opt.status: do_license_msg()
|
|||
|
||||
c = bitcoin_connection()
|
||||
tx = MMGenTX(infile) # sig check performed here
|
||||
qmsg("Signed transaction file '%s' is valid" % infile)
|
||||
vmsg("Signed transaction file '%s' is valid" % infile)
|
||||
|
||||
if not tx.marked_signed(c):
|
||||
die(1,'Transaction is not signed!')
|
||||
|
|
|
|||
666
mmgen/obj.py
666
mmgen/obj.py
|
|
@ -23,6 +23,7 @@ obj.py: MMGen native classes
|
|||
import sys
|
||||
from decimal import *
|
||||
from mmgen.color import *
|
||||
from string import hexdigits,ascii_letters,digits
|
||||
|
||||
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
|
||||
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
|
||||
|
|
@ -38,7 +39,6 @@ class MMGenObject(object):
|
|||
def pmsg(self): print(self.pformat())
|
||||
def pdie(self): print(self.pformat()); sys.exit(0)
|
||||
def pformat(self,lvl=0):
|
||||
from decimal import Decimal
|
||||
scalars = (str,unicode,int,float,Decimal)
|
||||
def do_list(out,e,lvl=0,is_dict=False):
|
||||
out.append('\n')
|
||||
|
|
@ -100,80 +100,7 @@ class MMGenObject(object):
|
|||
|
||||
class MMGenList(list,MMGenObject): pass
|
||||
class MMGenDict(dict,MMGenObject): pass
|
||||
|
||||
# for attrs that are always present in the data instance
|
||||
# reassignment and deletion forbidden
|
||||
class MMGenImmutableAttr(object): # Descriptor
|
||||
|
||||
def __init__(self,name,dtype,typeconv=True):
|
||||
self.typeconv = typeconv
|
||||
assert type(dtype) in (str,type)
|
||||
self.name = name
|
||||
self.dtype = dtype
|
||||
|
||||
def __get__(self,instance,owner):
|
||||
return instance.__dict__[self.name]
|
||||
|
||||
# forbid all reassignment
|
||||
def set_attr_ok(self,instance):
|
||||
return not hasattr(instance,self.name)
|
||||
|
||||
def __set__(self,instance,value):
|
||||
if not self.set_attr_ok(instance):
|
||||
m = "Attribute '{}' of {} instance cannot be reassigned"
|
||||
raise AttributeError(m.format(self.name,type(instance)))
|
||||
if self.typeconv: # convert type
|
||||
instance.__dict__[self.name] = \
|
||||
globals()[self.dtype](value) if type(self.dtype) == str else self.dtype(value)
|
||||
else: # check type
|
||||
if type(value) != self.dtype:
|
||||
m = "Attribute '{}' of {} instance must of type {}"
|
||||
raise TypeError(m.format(self.name,type(instance),self.dtype))
|
||||
instance.__dict__[self.name] = value
|
||||
|
||||
def __delete__(self,instance):
|
||||
m = "Atribute '{}' of {} instance cannot be deleted"
|
||||
raise AttributeError(m.format(self.name,type(instance)))
|
||||
|
||||
# for attrs that might not be present in the data instance
|
||||
# reassignment or deletion allowed if specified
|
||||
class MMGenListItemAttr(MMGenImmutableAttr):
|
||||
|
||||
def __init__(self,name,dtype,typeconv=True,reassign_ok=False,delete_ok=False):
|
||||
self.reassign_ok = reassign_ok
|
||||
self.delete_ok = delete_ok
|
||||
MMGenImmutableAttr.__init__(self,name,dtype,typeconv=typeconv)
|
||||
|
||||
# return None if attribute doesn't exist
|
||||
def __get__(self,instance,owner):
|
||||
try: return instance.__dict__[self.name]
|
||||
except: return None
|
||||
|
||||
def set_attr_ok(self,instance):
|
||||
return getattr(instance,self.name) == None or self.reassign_ok
|
||||
|
||||
def __delete__(self,instance):
|
||||
if self.delete_ok:
|
||||
if self.name in instance.__dict__:
|
||||
del instance.__dict__[self.name]
|
||||
else:
|
||||
MMGenImmutableAttr.__delete__(self,instance)
|
||||
|
||||
class MMGenListItem(MMGenObject):
|
||||
|
||||
def __init__(self,*args,**kwargs):
|
||||
if args:
|
||||
raise ValueError, 'Non-keyword args not allowed'
|
||||
for k in kwargs:
|
||||
if kwargs[k] != None:
|
||||
setattr(self,k,kwargs[k])
|
||||
|
||||
# prevent setting random attributes
|
||||
def __setattr__(self,name,value):
|
||||
if name not in type(self).__dict__:
|
||||
m = "'{}': no such attribute in class {}"
|
||||
raise AttributeError(m.format(name,type(self)))
|
||||
return object.__setattr__(self,name,value)
|
||||
class AddrListList(list,MMGenObject): pass
|
||||
|
||||
class InitErrors(object):
|
||||
|
||||
|
|
@ -185,69 +112,13 @@ class InitErrors(object):
|
|||
def init_fail(m,on_fail,silent=False):
|
||||
if silent: m = ''
|
||||
from mmgen.util import die,msg
|
||||
if on_fail == 'die': die(1,m)
|
||||
if on_fail == 'die': die(1,m)
|
||||
elif on_fail == 'return':
|
||||
if m: msg(m)
|
||||
return None # TODO: change to False
|
||||
elif on_fail == 'silent': return None # same here
|
||||
elif on_fail == 'raise': raise ValueError,m
|
||||
|
||||
class AddrIdx(int,InitErrors):
|
||||
|
||||
max_digits = 7
|
||||
|
||||
def __new__(cls,num,on_fail='die'):
|
||||
cls.arg_chk(cls,on_fail)
|
||||
try:
|
||||
assert type(num) is not float
|
||||
me = int.__new__(cls,num)
|
||||
except:
|
||||
m = "'%s': value cannot be converted to address index" % num
|
||||
else:
|
||||
if len(str(me)) > cls.max_digits:
|
||||
m = "'%s': too many digits in addr idx" % num
|
||||
elif me < 1:
|
||||
m = "'%s': addr idx cannot be less than one" % num
|
||||
else:
|
||||
return me
|
||||
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class AddrIdxList(list,InitErrors,MMGenObject):
|
||||
|
||||
max_len = 1000000
|
||||
|
||||
def __init__(self,fmt_str=None,idx_list=None,on_fail='die',sep=','):
|
||||
self.arg_chk(type(self),on_fail)
|
||||
assert fmt_str or idx_list
|
||||
if idx_list:
|
||||
# dies on failure
|
||||
return list.__init__(self,sorted(set(AddrIdx(i) for i in idx_list)))
|
||||
elif fmt_str:
|
||||
desc = fmt_str
|
||||
ret,fs = [],"'%s': value cannot be converted to address index"
|
||||
from mmgen.util import msg
|
||||
for i in (fmt_str.split(sep)):
|
||||
j = i.split('-')
|
||||
if len(j) == 1:
|
||||
idx = AddrIdx(i,on_fail='return')
|
||||
if not idx: break
|
||||
ret.append(idx)
|
||||
elif len(j) == 2:
|
||||
beg = AddrIdx(j[0],on_fail='return')
|
||||
if not beg: break
|
||||
end = AddrIdx(j[1],on_fail='return')
|
||||
if not beg: break
|
||||
if end < beg:
|
||||
msg(fs % "%s-%s (invalid range)" % (beg,end)); break
|
||||
ret.extend([AddrIdx(x) for x in range(beg,end+1)])
|
||||
else:
|
||||
msg((fs % i) + ' list'); break
|
||||
else:
|
||||
return list.__init__(self,sorted(set(ret))) # fell off end of loop - success
|
||||
|
||||
return self.init_fail((fs + ' list') % desc,on_fail)
|
||||
|
||||
class Hilite(object):
|
||||
|
||||
color = 'red'
|
||||
|
|
@ -292,29 +163,142 @@ class Hilite(object):
|
|||
k = color if type(color) is str else cls.color # hack: override color with str value
|
||||
return globals()[k](s) if (color or cls.color_always) else s
|
||||
|
||||
# For attrs that are always present in the data instance
|
||||
# Reassignment and deletion forbidden
|
||||
class MMGenImmutableAttr(object): # Descriptor
|
||||
|
||||
def __init__(self,name,dtype,typeconv=True):
|
||||
self.typeconv = typeconv
|
||||
assert type(dtype) in (str,type)
|
||||
self.name = name
|
||||
self.dtype = dtype
|
||||
|
||||
def __get__(self,instance,owner):
|
||||
return instance.__dict__[self.name]
|
||||
|
||||
# forbid all reassignment
|
||||
def set_attr_ok(self,instance):
|
||||
return not hasattr(instance,self.name)
|
||||
|
||||
def __set__(self,instance,value):
|
||||
if not self.set_attr_ok(instance):
|
||||
m = "Attribute '{}' of {} instance cannot be reassigned"
|
||||
raise AttributeError(m.format(self.name,type(instance)))
|
||||
if self.typeconv: # convert type
|
||||
instance.__dict__[self.name] = \
|
||||
globals()[self.dtype](value,on_fail='raise') if type(self.dtype) == str else self.dtype(value)
|
||||
else: # check type
|
||||
if type(value) != self.dtype:
|
||||
m = "Attribute '{}' of {} instance must of type {}"
|
||||
raise TypeError(m.format(self.name,type(instance),self.dtype))
|
||||
instance.__dict__[self.name] = value
|
||||
|
||||
def __delete__(self,instance):
|
||||
m = "Atribute '{}' of {} instance cannot be deleted"
|
||||
raise AttributeError(m.format(self.name,type(instance)))
|
||||
|
||||
# For attrs that might not be present in the data instance
|
||||
# Reassignment or deletion allowed if specified
|
||||
class MMGenListItemAttr(MMGenImmutableAttr): # Descriptor
|
||||
|
||||
def __init__(self,name,dtype,typeconv=True,reassign_ok=False,delete_ok=False):
|
||||
self.reassign_ok = reassign_ok
|
||||
self.delete_ok = delete_ok
|
||||
MMGenImmutableAttr.__init__(self,name,dtype,typeconv=typeconv)
|
||||
|
||||
# return None if attribute doesn't exist
|
||||
def __get__(self,instance,owner):
|
||||
try: return instance.__dict__[self.name]
|
||||
except: return None
|
||||
|
||||
def set_attr_ok(self,instance):
|
||||
return getattr(instance,self.name) == None or self.reassign_ok
|
||||
|
||||
def __delete__(self,instance):
|
||||
if self.delete_ok:
|
||||
if self.name in instance.__dict__:
|
||||
del instance.__dict__[self.name]
|
||||
else:
|
||||
MMGenImmutableAttr.__delete__(self,instance)
|
||||
|
||||
class MMGenListItem(MMGenObject):
|
||||
|
||||
def __init__(self,*args,**kwargs):
|
||||
if args:
|
||||
raise ValueError, 'Non-keyword args not allowed'
|
||||
for k in kwargs:
|
||||
if kwargs[k] != None:
|
||||
setattr(self,k,kwargs[k])
|
||||
|
||||
# prevent setting random attributes
|
||||
def __setattr__(self,name,value):
|
||||
if name not in type(self).__dict__:
|
||||
m = "'{}': no such attribute in class {}"
|
||||
raise AttributeError(m.format(name,type(self)))
|
||||
return object.__setattr__(self,name,value)
|
||||
|
||||
class AddrIdx(int,InitErrors):
|
||||
max_digits = 7
|
||||
def __new__(cls,num,on_fail='die'):
|
||||
cls.arg_chk(cls,on_fail)
|
||||
try:
|
||||
assert type(num) is not float,'is float'
|
||||
me = int.__new__(cls,num)
|
||||
assert len(str(me)) <= cls.max_digits,'is more than {} digits'.format(cls.max_digits)
|
||||
assert me > 0,'is less than one'
|
||||
return me
|
||||
except Exception as e:
|
||||
m = "{!r}: value cannot be converted to address index ({})"
|
||||
return cls.init_fail(m.format(num,e[0]),on_fail)
|
||||
|
||||
class AddrIdxList(list,InitErrors,MMGenObject):
|
||||
max_len = 1000000
|
||||
def __init__(self,fmt_str=None,idx_list=None,on_fail='die',sep=','):
|
||||
self.arg_chk(type(self),on_fail)
|
||||
try:
|
||||
if idx_list:
|
||||
return list.__init__(self,sorted(set(AddrIdx(i,on_fail='raise') for i in idx_list)))
|
||||
elif fmt_str:
|
||||
ret = []
|
||||
for i in (fmt_str.split(sep)):
|
||||
j = i.split('-')
|
||||
if len(j) == 1:
|
||||
idx = AddrIdx(i,on_fail='raise')
|
||||
if not idx: break
|
||||
ret.append(idx)
|
||||
elif len(j) == 2:
|
||||
beg = AddrIdx(j[0],on_fail='raise')
|
||||
if not beg: break
|
||||
end = AddrIdx(j[1],on_fail='raise')
|
||||
if not beg: break
|
||||
if end < beg: break
|
||||
ret.extend([AddrIdx(x,on_fail='raise') for x in range(beg,end+1)])
|
||||
else: break
|
||||
else:
|
||||
return list.__init__(self,sorted(set(ret))) # fell off end of loop - success
|
||||
raise ValueError,"{!r}: invalid range".format(i)
|
||||
except Exception as e:
|
||||
m = "{!r}: value cannot be converted to AddrIdxList ({})"
|
||||
return type(self).init_fail(m.format(idx_list or fmt_str,e[0]),on_fail)
|
||||
|
||||
class BTCAmt(Decimal,Hilite,InitErrors):
|
||||
color = 'yellow'
|
||||
max_prec = 8
|
||||
max_amt = 21000000
|
||||
|
||||
def __new__(cls,num,on_fail='die'):
|
||||
if type(num) == cls: return num
|
||||
cls.arg_chk(cls,on_fail)
|
||||
try:
|
||||
assert type(num) is not float,'number is floating-point'
|
||||
assert type(num) is not long,'number is a long integer'
|
||||
me = Decimal.__new__(cls,str(num))
|
||||
except:
|
||||
m = "'%s': value cannot be converted to decimal" % num
|
||||
else:
|
||||
if me.normalize().as_tuple()[-1] < -cls.max_prec:
|
||||
from mmgen.globalvars import g
|
||||
m = "'{}': too many decimal places in {} amount".format(num,g.coin)
|
||||
elif me > cls.max_amt:
|
||||
from mmgen.globalvars import g
|
||||
m = "'{}': {} amount too large (>{})".format(num,g.coin,cls.max_amt)
|
||||
# elif me.as_tuple()[0]:
|
||||
# m = "'%s': BTC amount cannot be negative" % num
|
||||
else:
|
||||
return me
|
||||
return cls.init_fail(m,on_fail)
|
||||
assert me.normalize().as_tuple()[-1] >= -cls.max_prec,'too many decimal places in coin amount'
|
||||
assert me <= cls.max_amt,'coin amount too large (>{})'.format(cls.max_amt)
|
||||
assert me >= 0,'coin amount cannot be negative'
|
||||
return me
|
||||
except Exception as e:
|
||||
m = "{!r}: value cannot be converted to BTCAmt ({})"
|
||||
return cls.init_fail(m.format(num,e[0]),on_fail)
|
||||
|
||||
@classmethod
|
||||
def fmtc(cls):
|
||||
|
|
@ -367,16 +351,21 @@ class BTCAddr(str,Hilite,InitErrors,MMGenObject):
|
|||
color = 'cyan'
|
||||
width = 35 # max len of testnet p2sh addr
|
||||
def __new__(cls,s,on_fail='die'):
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
m = "'%s': value is not a Bitcoin address" % s
|
||||
me = str.__new__(cls,s)
|
||||
from mmgen.bitcoin import verify_addr,addr_pfxs
|
||||
if type(s) in (str,unicode,BTCAddr):
|
||||
me.addr_fmt = verify_addr(s,return_type=True)
|
||||
me.testnet = s[0] in addr_pfxs['testnet']
|
||||
if me.addr_fmt:
|
||||
return me
|
||||
return cls.init_fail(m,on_fail)
|
||||
try:
|
||||
assert set(s) <= set(ascii_letters+digits),'contains non-ascii characters'
|
||||
me = str.__new__(cls,s)
|
||||
from mmgen.bitcoin import verify_addr
|
||||
va = verify_addr(s,return_dict=True)
|
||||
assert va,'failed verification'
|
||||
me.addr_fmt = va['format']
|
||||
me.hex = va['hex']
|
||||
me.testnet = va['net'] == 'testnet'
|
||||
return me
|
||||
except Exception as e:
|
||||
m = "{!r}: value cannot be converted to Bitcoin address ({})"
|
||||
return cls.init_fail(m.format(s,e[0]),on_fail)
|
||||
|
||||
@classmethod
|
||||
def fmtc(cls,s,**kwargs):
|
||||
|
|
@ -390,13 +379,13 @@ class BTCAddr(str,Hilite,InitErrors,MMGenObject):
|
|||
|
||||
def is_for_current_chain(self):
|
||||
from mmgen.globalvars import g
|
||||
assert g.chain, 'global chain variable unset'
|
||||
from bitcoin import addr_pfxs
|
||||
return self[0] in addr_pfxs[g.chain]
|
||||
assert g.chain,'global chain variable unset'
|
||||
from bitcoin import btc_addr_pfxs
|
||||
return self[0] in btc_addr_pfxs[g.chain]
|
||||
|
||||
def is_mainnet(self):
|
||||
from bitcoin import addr_pfxs
|
||||
return self[0] in addr_pfxs['mainnet']
|
||||
from bitcoin import btc_addr_pfxs
|
||||
return self[0] in btc_addr_pfxs['mainnet']
|
||||
|
||||
def is_in_tracking_wallet(self):
|
||||
from mmgen.rpc import bitcoin_connection
|
||||
|
|
@ -408,122 +397,118 @@ class SeedID(str,Hilite,InitErrors):
|
|||
width = 8
|
||||
trunc_ok = False
|
||||
def __new__(cls,seed=None,sid=None,on_fail='die'):
|
||||
if type(sid) == cls: return sid
|
||||
cls.arg_chk(cls,on_fail)
|
||||
assert seed or sid
|
||||
if seed:
|
||||
from mmgen.seed import Seed
|
||||
from mmgen.util import make_chksum_8
|
||||
if type(seed) == Seed:
|
||||
try:
|
||||
if seed:
|
||||
from mmgen.seed import Seed
|
||||
assert type(seed) == Seed,'not a Seed instance'
|
||||
from mmgen.util import make_chksum_8
|
||||
return str.__new__(cls,make_chksum_8(seed.get_data()))
|
||||
elif sid:
|
||||
sid = str(sid)
|
||||
from string import hexdigits
|
||||
if len(sid) == cls.width and set(sid) <= set(hexdigits.upper()):
|
||||
elif sid:
|
||||
assert set(sid) <= set(hexdigits.upper()),'not uppercase hex digits'
|
||||
assert len(sid) == cls.width,'not {} characters wide'.format(cls.width)
|
||||
return str.__new__(cls,sid)
|
||||
|
||||
m = "'%s': value cannot be converted to SeedID" % str(seed or sid)
|
||||
return cls.init_fail(m,on_fail)
|
||||
raise ValueError,'no arguments provided'
|
||||
except Exception as e:
|
||||
m = "{!r}: value cannot be converted to SeedID ({})"
|
||||
return cls.init_fail(m.format(seed or sid,e[0]),on_fail)
|
||||
|
||||
class MMGenID(str,Hilite,InitErrors,MMGenObject):
|
||||
|
||||
color = 'orange'
|
||||
width = 0
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls,s,on_fail='die'):
|
||||
cls.arg_chk(cls,on_fail)
|
||||
s = str(s)
|
||||
try:
|
||||
ss = s.split(':')
|
||||
assert len(ss) in (2,3)
|
||||
sid = SeedID(sid=ss[0],on_fail='silent')
|
||||
assert sid
|
||||
idx = AddrIdx(ss[-1],on_fail='silent')
|
||||
assert idx
|
||||
t = MMGenAddrType((MMGenAddrType.dfl_mmtype,ss[1])[len(ss) != 2],on_fail='silent')
|
||||
assert t
|
||||
me = str.__new__(cls,'{}:{}:{}'.format(sid,t,idx))
|
||||
me.sid = sid
|
||||
ss = str(s).split(':')
|
||||
assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
|
||||
t = MMGenAddrType((ss[1],MMGenAddrType.dfl_mmtype)[len(ss)==2],on_fail='raise')
|
||||
me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
|
||||
me.sid = SeedID(sid=ss[0],on_fail='raise')
|
||||
me.idx = AddrIdx(ss[-1],on_fail='raise')
|
||||
me.mmtype = t
|
||||
me.idx = idx
|
||||
me.al_id = AddrListID(sid,me.mmtype) # key with colon!
|
||||
assert me.al_id
|
||||
me.sort_key = '{}:{}:{:0{w}}'.format(sid,t,idx,w=idx.max_digits)
|
||||
me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
|
||||
me.sort_key = '{}:{}:{:0{w}}'.format(me.sid,me.mmtype,me.idx,w=me.idx.max_digits)
|
||||
return me
|
||||
except:
|
||||
m = "'%s': value cannot be converted to MMGenID" % s
|
||||
return cls.init_fail(m,on_fail)
|
||||
except Exception as e:
|
||||
m = "{}\n{!r}: value cannot be converted to MMGenID"
|
||||
return cls.init_fail(m.format(e[0],s),on_fail)
|
||||
|
||||
class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
|
||||
|
||||
color = 'orange'
|
||||
width = 0
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls,s,on_fail='die'):
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
obj,sort_key = None,None
|
||||
ret = None
|
||||
try:
|
||||
obj = MMGenID(s,on_fail='silent')
|
||||
sort_key,t = obj.sort_key,'mmgen'
|
||||
except:
|
||||
ret = MMGenID(s,on_fail='raise')
|
||||
sort_key,idtype = ret.sort_key,'mmgen'
|
||||
except Exception as e:
|
||||
try:
|
||||
assert len(s) > 4 and s[:4] == 'btc:'
|
||||
obj,sort_key,t = str(s),'z_'+s,'non-mmgen'
|
||||
except:
|
||||
pass
|
||||
assert s[:4] == 'btc:',"not a string beginning with the prefix 'btc:'"
|
||||
assert set(s[4:]) <= set(ascii_letters+digits),'contains non-ascii characters'
|
||||
assert len(s) > 4,'not more that four characters long'
|
||||
ret,sort_key,idtype = str(s),'z_'+s,'non-mmgen'
|
||||
except Exception as f:
|
||||
m = "{}\nValue is {}\n{!r}: value cannot be converted to TwMMGenID"
|
||||
return cls.init_fail(m.format(e[0],f[0],s),on_fail)
|
||||
|
||||
if obj and sort_key:
|
||||
me = str.__new__(cls,obj)
|
||||
me.obj = obj
|
||||
me.sort_key = sort_key
|
||||
me.type = t
|
||||
return me
|
||||
|
||||
m = "'{}': value cannot be converted to {}".format(s,cls.__name__)
|
||||
return cls.init_fail(m,on_fail)
|
||||
me = str.__new__(cls,ret)
|
||||
me.obj = ret
|
||||
me.sort_key = sort_key
|
||||
me.type = idtype
|
||||
return me
|
||||
|
||||
# contains TwMMGenID,TwComment. Not for display
|
||||
class TwLabel(str,InitErrors,MMGenObject):
|
||||
|
||||
def __new__(cls,s,on_fail='die'):
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
try:
|
||||
ss = s.split(None,1)
|
||||
me = str.__new__(cls,s)
|
||||
me.mmid = TwMMGenID(ss[0],on_fail='silent')
|
||||
assert me.mmid
|
||||
me.comment = TwComment(ss[1] if len(ss) == 2 else '',on_fail='silent')
|
||||
assert me.comment != None
|
||||
mmid = TwMMGenID(ss[0],on_fail='raise')
|
||||
comment = TwComment(ss[1] if len(ss) == 2 else '',on_fail='raise')
|
||||
me = str.__new__(cls,'{}{}'.format(mmid,' {}'.format(comment) if comment else ''))
|
||||
me.mmid = mmid
|
||||
me.comment = comment
|
||||
return me
|
||||
except:
|
||||
m = "'{}': value cannot be converted to {}".format(s,cls.__name__)
|
||||
return cls.init_fail(m,on_fail)
|
||||
except Exception as e:
|
||||
m = u"{}\n{!r}: value cannot be converted to TwLabel"
|
||||
return cls.init_fail(m.format(e[0],s),on_fail)
|
||||
|
||||
class HexStr(str,Hilite,InitErrors):
|
||||
color = 'red'
|
||||
trunc_ok = False
|
||||
def __new__(cls,s,on_fail='die',case='lower'):
|
||||
if type(s) == cls: return s
|
||||
assert case in ('upper','lower')
|
||||
cls.arg_chk(cls,on_fail)
|
||||
from string import hexdigits
|
||||
if set(s) <= set(getattr(hexdigits,case)()) and not len(s) % 2:
|
||||
try:
|
||||
assert type(s) in (str,unicode,bytes),'not a string'
|
||||
assert set(s) <= set(getattr(hexdigits,case)()),'not {}case hexadecimal symbols'.format(case)
|
||||
assert not len(s) % 2,'odd-length string'
|
||||
return str.__new__(cls,s)
|
||||
m = "'{}': value cannot be converted to {}".format(s,cls.__name__)
|
||||
return cls.init_fail(m,on_fail)
|
||||
except Exception as e:
|
||||
m = "{!r}: value cannot be converted to {} (value is {})"
|
||||
return cls.init_fail(m.format(s,cls.__name__,e[0]),on_fail)
|
||||
|
||||
class MMGenTxID(str,Hilite,InitErrors):
|
||||
class MMGenTxID(HexStr,Hilite,InitErrors):
|
||||
color = 'red'
|
||||
width = 6
|
||||
trunc_ok = False
|
||||
hexcase = 'upper'
|
||||
def __new__(cls,s,on_fail='die'):
|
||||
cls.arg_chk(cls,on_fail)
|
||||
from string import hexdigits
|
||||
if len(s) == cls.width and set(s) <= set(getattr(hexdigits,cls.hexcase)()):
|
||||
return str.__new__(cls,s)
|
||||
m = "'{}': value cannot be converted to {}".format(s,cls.__name__)
|
||||
return cls.init_fail(m,on_fail)
|
||||
try:
|
||||
ret = HexStr.__new__(cls,s,case=cls.hexcase,on_fail='raise')
|
||||
assert len(s) == cls.width,'Value is not {} characters wide'.format(cls.width)
|
||||
return ret
|
||||
except Exception as e:
|
||||
m = "{}\n{!r}: value cannot be converted to {}"
|
||||
return cls.init_fail(m.format(e[0],s,cls.__name__),on_fail)
|
||||
|
||||
class BitcoinTxID(MMGenTxID):
|
||||
color = 'purple'
|
||||
|
|
@ -533,34 +518,29 @@ class BitcoinTxID(MMGenTxID):
|
|||
class WifKey(str,Hilite,InitErrors):
|
||||
width = 53
|
||||
color = 'blue'
|
||||
desc = 'WIF key'
|
||||
def __new__(cls,s,on_fail='die',errmsg=None):
|
||||
def __new__(cls,s,on_fail='die',testnet=None): # fall back to g.testnet
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
from mmgen.bitcoin import wif2hex
|
||||
if wif2hex(s):
|
||||
me = str.__new__(cls,s)
|
||||
return me
|
||||
m = errmsg or "'{}': invalid value for {}".format(s,cls.desc)
|
||||
return cls.init_fail(m,on_fail)
|
||||
try:
|
||||
assert set(s) <= set(ascii_letters+digits),'not an ascii string'
|
||||
from mmgen.bitcoin import wif2hex
|
||||
if wif2hex(s,testnet=testnet):
|
||||
return str.__new__(cls,s)
|
||||
raise ValueError,'failed verification'
|
||||
except Exception as e:
|
||||
m = '{!r}: invalid value for WIF key ({})'.format(s,e[0])
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class HexStr(str,Hilite,InitErrors):
|
||||
color = 'red'
|
||||
trunc_ok = False
|
||||
def __new__(cls,s,on_fail='die',case='lower'):
|
||||
assert case in ('upper','lower')
|
||||
cls.arg_chk(cls,on_fail)
|
||||
from string import hexdigits
|
||||
if set(s) <= set(getattr(hexdigits,case)()) and not len(s) % 2:
|
||||
return str.__new__(cls,s)
|
||||
m = "'{}': value cannot be converted to {}".format(s,cls.__name__)
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class PubKey(HexStr,MMGenObject):
|
||||
class PubKey(HexStr,MMGenObject): # TODO: add some real checks
|
||||
def __new__(cls,s,compressed,on_fail='die'):
|
||||
assert type(compressed) == bool
|
||||
me = HexStr.__new__(cls,s,case='lower')
|
||||
me.compressed = compressed
|
||||
return me
|
||||
try:
|
||||
assert type(compressed) == bool,"'compressed' must be of type bool"
|
||||
me = HexStr.__new__(cls,s,case='lower',on_fail='raise')
|
||||
me.compressed = compressed
|
||||
return me
|
||||
except Exception as e:
|
||||
m = '{!r}: invalid value for pubkey ({})'.format(s,e[0])
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class PrivKey(str,Hilite,InitErrors,MMGenObject):
|
||||
|
||||
|
|
@ -571,104 +551,71 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
|
|||
compressed = MMGenImmutableAttr('compressed',bool,typeconv=False)
|
||||
wif = MMGenImmutableAttr('wif',WifKey,typeconv=False)
|
||||
|
||||
def __new__(*args,**kwargs): # initialize with (priv_bin,compressed), WIF or self
|
||||
cls = args[0]
|
||||
assert set(kwargs) <= set(['on_fail','wif'])
|
||||
on_fail = kwargs['on_fail'] if 'on_fail' in kwargs else 'die'
|
||||
# initialize with (priv_bin,compressed), WIF or self
|
||||
def __new__(cls,s=None,compressed=None,wif=None,on_fail='die',testnet=None): # default to g.testnet
|
||||
|
||||
if type(s) == cls: return s
|
||||
assert wif or (s and type(compressed) == bool),'Incorrect args for PrivKey()'
|
||||
cls.arg_chk(cls,on_fail)
|
||||
|
||||
if len(args) == 2:
|
||||
assert type(args[1]) == cls
|
||||
return args[1]
|
||||
|
||||
if 'wif' in kwargs:
|
||||
assert len(args) == 1
|
||||
if wif:
|
||||
try:
|
||||
from mmgen.bitcoin import wif2hex,wif_is_compressed # TODO: move these here
|
||||
wif = WifKey(kwargs['wif'])
|
||||
me = str.__new__(cls,wif2hex(wif))
|
||||
me.compressed = wif_is_compressed(wif)
|
||||
me.wif = wif
|
||||
assert set(wif) <= set(ascii_letters+digits),'not an ascii string'
|
||||
from mmgen.bitcoin import wif2hex
|
||||
w2h = wif2hex(wif,testnet=testnet)
|
||||
assert w2h,"wif2hex() failed for wif key {!r}".format(wif)
|
||||
me = str.__new__(cls,w2h['hex'])
|
||||
me.compressed = w2h['compressed']
|
||||
me.testnet = w2h['testnet']
|
||||
me.wif = str.__new__(WifKey,wif) # check has been done
|
||||
return me
|
||||
except:
|
||||
fs = "Value '{}' cannot be converted to WIF key"
|
||||
errmsg = fs.format(kwargs['wif'])
|
||||
return cls.init_fail(errmsg,on_fail)
|
||||
|
||||
cls,s,compressed = args
|
||||
except Exception as e:
|
||||
fs = "Value {!r} cannot be converted to WIF key ({})"
|
||||
return cls.init_fail(fs.format(wif,e[0]),on_fail)
|
||||
|
||||
try:
|
||||
from binascii import hexlify
|
||||
assert len(s) == cls.width / 2
|
||||
assert len(s) == cls.width / 2,'Key length must be {}'.format(cls.width/2)
|
||||
me = str.__new__(cls,hexlify(s))
|
||||
me.compressed = compressed
|
||||
me.wif = me.towif()
|
||||
me.wif = me.towif(testnet=testnet)
|
||||
# me.testnet = testnet # leave uninitialized for now
|
||||
return me
|
||||
except:
|
||||
fs = "Key={}\nCompressed={}\nValue pair cannot be converted to {}"
|
||||
errmsg = fs.format(repr(s),compressed,cls.__name__)
|
||||
return cls.init_fail(errmsg,on_fail)
|
||||
except Exception as e:
|
||||
fs = "Key={}\nCompressed={}\nValue pair cannot be converted to PrivKey ({})"
|
||||
return cls.init_fail(fs.format(repr(s),compressed,e[0]),on_fail)
|
||||
|
||||
def towif(self):
|
||||
def towif(self,testnet=None):
|
||||
from mmgen.bitcoin import hex2wif
|
||||
return WifKey(hex2wif(self,compressed=self.compressed))
|
||||
return WifKey(hex2wif(self,compressed=self.compressed),on_fail='raise',testnet=testnet)
|
||||
|
||||
class MMGenAddrType(str,Hilite,InitErrors):
|
||||
width = 1
|
||||
trunc_ok = False
|
||||
color = 'blue'
|
||||
mmtypes = {
|
||||
# TODO 'L' is ambiguous: For user, it means MMGen legacy uncompressed address.
|
||||
# For generator functions, 'L' means any p2pkh address, and 'S' any ps2h address
|
||||
'L': 'legacy',
|
||||
'S': 'segwit',
|
||||
# 'l': 'litecoin',
|
||||
# 'e': 'ethereum',
|
||||
# 'E': 'ethereum_classic',
|
||||
# 'm': 'monero',
|
||||
# 'z': 'zcash',
|
||||
}
|
||||
dfl_mmtype = 'L'
|
||||
def __new__(cls,s,on_fail='die',errmsg=None):
|
||||
cls.arg_chk(cls,on_fail)
|
||||
for k,v in cls.mmtypes.items():
|
||||
if s in (k,v):
|
||||
if s == v: s = k
|
||||
me = str.__new__(cls,s)
|
||||
me.name = cls.mmtypes[s]
|
||||
return me
|
||||
m = errmsg or "'{}': invalid value for {}".format(s,cls.__name__)
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class MMGenPasswordType(MMGenAddrType):
|
||||
mmtypes = { 'P': 'password' }
|
||||
|
||||
class AddrListID(str,Hilite,InitErrors):
|
||||
class AddrListID(str,Hilite,InitErrors,MMGenObject):
|
||||
width = 10
|
||||
trunc_ok = False
|
||||
color = 'yellow'
|
||||
def __new__(cls,sid,mmtype,on_fail='die'):
|
||||
cls.arg_chk(cls,on_fail)
|
||||
m = "'{}': not a SeedID. Cannot create {}".format(sid,cls.__name__)
|
||||
if type(sid) == SeedID:
|
||||
m = "'{}': not an MMGenAddrType object. Cannot create {}".format(mmtype,cls.__name__)
|
||||
if type(mmtype) in (MMGenAddrType,MMGenPasswordType):
|
||||
me = str.__new__(cls,sid+':'+mmtype) # colon in key is OK
|
||||
me.sid = sid
|
||||
me.mmtype = mmtype
|
||||
return me
|
||||
return cls.init_fail(m,on_fail)
|
||||
try:
|
||||
assert type(sid) == SeedID,"{!r} not a SeedID instance".format(sid)
|
||||
t = MMGenAddrType,MMGenPasswordType
|
||||
assert type(mmtype) in t,"{!r} not an instance of {}".format(mmtype,','.join([i.__name__ for i in t]))
|
||||
me = str.__new__(cls,sid+':'+mmtype)
|
||||
me.sid = sid
|
||||
me.mmtype = mmtype
|
||||
return me
|
||||
except Exception as e:
|
||||
m = "Cannot create AddrListID ({})".format(e[0])
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class MMGenLabel(unicode,Hilite,InitErrors):
|
||||
|
||||
color = 'pink'
|
||||
allowed = []
|
||||
forbidden = []
|
||||
max_len = 0
|
||||
min_len = 0
|
||||
desc = 'label'
|
||||
|
||||
def __new__(cls,s,on_fail='die',msg=None):
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
for k in cls.forbidden,cls.allowed:
|
||||
assert type(k) == list
|
||||
|
|
@ -677,23 +624,17 @@ class MMGenLabel(unicode,Hilite,InitErrors):
|
|||
s = s.strip()
|
||||
if type(s) != unicode:
|
||||
s = s.decode('utf8')
|
||||
except:
|
||||
m = "'%s': value is not a valid UTF-8 string" % s
|
||||
else:
|
||||
from mmgen.util import capfirst
|
||||
if len(s) > cls.max_len:
|
||||
m = u"'{}': {} too long (>{} symbols)".format(s,capfirst(cls.desc),cls.max_len)
|
||||
elif len(s) < cls.min_len:
|
||||
m = u"'{}': {} too short (<{} symbols)".format(s,capfirst(cls.desc),cls.min_len)
|
||||
elif cls.allowed and not set(list(s)).issubset(set(cls.allowed)):
|
||||
m = u"{} '{}' contains non-allowed symbols: {}".format(capfirst(cls.desc),s,
|
||||
' '.join(set(list(s)) - set(cls.allowed)))
|
||||
elif cls.forbidden and any(ch in s for ch in cls.forbidden):
|
||||
m = u"{} '{}' contains one of these forbidden symbols: '{}'".format(capfirst(cls.desc),s,
|
||||
"', '".join(cls.forbidden))
|
||||
else:
|
||||
return unicode.__new__(cls,s)
|
||||
return cls.init_fail((msg+'\n' if msg else '') + m,on_fail)
|
||||
assert len(s) <= cls.max_len, 'too long (>{} symbols)'.format(cls.max_len)
|
||||
assert len(s) >= cls.min_len, 'too short (<{} symbols)'.format(cls.min_len)
|
||||
assert not cls.allowed or set(list(s)).issubset(set(cls.allowed)),\
|
||||
u'contains non-allowed symbols: {}'.format(' '.join(set(list(s)) - set(cls.allowed)))
|
||||
assert not cls.forbidden or not any(ch in s for ch in cls.forbidden),\
|
||||
u"contains one of these forbidden symbols: '{}'".format("', '".join(cls.forbidden))
|
||||
return unicode.__new__(cls,s)
|
||||
except Exception as e:
|
||||
m = u"{!r}: value cannot be converted to {} ({})"
|
||||
return cls.init_fail(m.format(s,cls.__name__,e),on_fail)
|
||||
|
||||
class MMGenWalletLabel(MMGenLabel):
|
||||
max_len = 48
|
||||
|
|
@ -715,4 +656,41 @@ class MMGenPWIDString(MMGenLabel):
|
|||
desc = 'password ID string'
|
||||
forbidden = list(u' :/\\')
|
||||
|
||||
class AddrListList(list,MMGenObject): pass
|
||||
class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
|
||||
width = 1
|
||||
trunc_ok = False
|
||||
color = 'blue'
|
||||
mmtypes = { # since 'name' is used to cook the seed, it must never change!
|
||||
'L': {'name':'legacy','comp':False,'gen':'p2pkh', 'fmt':'p2pkh','desc':'Legacy uncompressed Bitcoin address'},
|
||||
'S': {'name':'segwit','comp':True, 'gen':'segwit','fmt':'p2sh', 'desc':'Bitcoin Segwit P2SH-P2WPK address' },
|
||||
'C': {'name':'compressed','comp':True,'gen':'p2pkh','fmt':'p2pkh','desc':'Compressed Bitcoin P2PKH address'}
|
||||
# 'l': 'litecoin',
|
||||
# 'e': 'ethereum',
|
||||
# 'E': 'ethereum_classic',
|
||||
# 'm': 'monero',
|
||||
# 'z': 'zcash',
|
||||
}
|
||||
dfl_mmtype = 'L'
|
||||
def __new__(cls,s,on_fail='die',errmsg=None):
|
||||
if type(s) == cls: return s
|
||||
cls.arg_chk(cls,on_fail)
|
||||
try:
|
||||
for k,v in cls.mmtypes.items():
|
||||
if s in (k,v['name']):
|
||||
if s == v['name']: s = k
|
||||
me = str.__new__(cls,s)
|
||||
me.name = v['name']
|
||||
me.compressed = v['comp']
|
||||
me.gen_method = v['gen']
|
||||
me.desc = v['desc']
|
||||
me.addr_fmt = v['fmt']
|
||||
return me
|
||||
raise ValueError,'not found'
|
||||
except Exception as e:
|
||||
m = errmsg or '{!r}: invalid value for {} ({})'.format(s,cls.__name__,e[0])
|
||||
return cls.init_fail(m,on_fail)
|
||||
|
||||
class MMGenPasswordType(MMGenAddrType):
|
||||
mmtypes = {
|
||||
'P': {'name':'password','comp':False,'gen':None,'fmt':None,'desc':'Password generated from MMGen seed'}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,6 +61,8 @@ common_opts_data = """
|
|||
--, --testnet=0|1 Disable or enable testnet
|
||||
--, --skip-cfg-file Skip reading the configuration file
|
||||
--, --version Print version information and exit
|
||||
--, --bob Switch to user "Bob" in MMGen regtest setup
|
||||
--, --alice Switch to user "Alice" in MMGen regtest setup
|
||||
""".format(
|
||||
pnm=g.proj_name,
|
||||
cu_dfl=g.coin,
|
||||
|
|
@ -250,6 +252,16 @@ def init(opts_f,add_opts=[],opt_filter=None):
|
|||
for k in ('prog_name','desc','usage','options','notes'):
|
||||
if k in opts_data: del opts_data[k]
|
||||
|
||||
if g.bob or g.alice:
|
||||
import regtest as rt
|
||||
rt.user(('alice','bob')[g.bob],quiet=True)
|
||||
g.testnet = True
|
||||
g.rpc_host = 'localhost'
|
||||
g.rpc_port = rt.rpc_port
|
||||
g.rpc_user = rt.rpc_user
|
||||
g.rpc_password = rt.rpc_password
|
||||
g.data_dir = os.path.join(g.home_dir,'.'+g.proj_name.lower(),'regtest')
|
||||
|
||||
if g.debug: opt_postproc_debug()
|
||||
|
||||
return args
|
||||
|
|
|
|||
110
mmgen/tool.py
110
mmgen/tool.py
|
|
@ -27,6 +27,7 @@ import mmgen.bitcoin as mmb
|
|||
from mmgen.common import *
|
||||
from mmgen.crypto import *
|
||||
from mmgen.tx import *
|
||||
from mmgen.addr import *
|
||||
|
||||
pnm = g.proj_name
|
||||
|
||||
|
|
@ -78,7 +79,7 @@ cmd_data = OrderedDict([
|
|||
|
||||
('Listaddress',['<{} address> [str]'.format(pnm),'minconf [int=1]','pager [bool=False]','showempty [bool=True]''showbtcaddr [bool=True]']),
|
||||
('Listaddresses',["addrs [str='']",'minconf [int=1]','showempty [bool=False]','pager [bool=False]','showbtcaddrs [bool=False]']),
|
||||
('Getbalance', ['minconf [int=1]']),
|
||||
('Getbalance', ['minconf [int=1]','quiet [bool=False]']),
|
||||
('Txview', ['<{} TX file(s)> [str]'.format(pnm),'pager [bool=False]','terse [bool=False]',"sort [str='mtime'] (options: 'ctime','atime')",'MARGS']),
|
||||
('Twview', ["sort [str='age']",'reverse [bool=False]','show_days [bool=True]','show_mmid [bool=True]','minconf [int=1]','wide [bool=False]','pager [bool=False]']),
|
||||
|
||||
|
|
@ -308,6 +309,8 @@ def print_convert_results(indata,enc,dec,dtype):
|
|||
if error:
|
||||
die(3,"Error! Recoded data doesn't match input!")
|
||||
|
||||
kg = KeyGenerator()
|
||||
|
||||
def Hexdump(infile, cols=8, line_nums=True):
|
||||
Msg(pretty_hexdump(
|
||||
get_data_from_file(infile,dash=True,silent=True,binary=True),
|
||||
|
|
@ -330,41 +333,37 @@ def Randhex(nbytes='32'):
|
|||
Msg(ba.hexlify(get_random(int(nbytes))))
|
||||
|
||||
def Randwif(compressed=False):
|
||||
r_hex = ba.hexlify(get_random(32))
|
||||
enc = mmb.hex2wif(r_hex,compressed)
|
||||
dec = wif2hex(enc)
|
||||
print_convert_results(r_hex,enc,dec,'hex')
|
||||
Msg(PrivKey(get_random(32),compressed).wif)
|
||||
|
||||
def Randpair(compressed=False,segwit=False):
|
||||
if segwit: compressed = True
|
||||
r_hex = ba.hexlify(get_random(32))
|
||||
wif = mmb.hex2wif(r_hex,compressed)
|
||||
addr = mmb.privnum2addr(int(r_hex,16),compressed,segwit=segwit)
|
||||
Vmsg('Key (hex): %s' % r_hex)
|
||||
Vmsg_r('Key (WIF): '); Msg(wif)
|
||||
ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)])
|
||||
privhex = PrivKey(get_random(32),compressed)
|
||||
addr = ag.to_addr(kg.to_pubhex(privhex))
|
||||
Vmsg('Key (hex): %s' % privhex)
|
||||
Vmsg_r('Key (WIF): '); Msg(privhex.wif)
|
||||
Vmsg_r('Addr: '); Msg(addr)
|
||||
|
||||
def Wif2addr(wif,segwit=False):
|
||||
compressed = mmb.wif_is_compressed(wif)
|
||||
if segwit and not compressed:
|
||||
die(1,'Segwit address cannot be generated from uncompressed WIF')
|
||||
privhex = wif2hex(wif)
|
||||
addr = mmb.privnum2addr(int(privhex,16),compressed,segwit=segwit)
|
||||
privhex = PrivKey(wif=wif)
|
||||
if segwit and not privhex.compressed:
|
||||
die(2,'Segwit addresses must use compressed public keys')
|
||||
ag = AddrGenerator(('p2pkh','segwit')[bool(segwit)])
|
||||
addr = ag.to_addr(kg.to_pubhex(privhex))
|
||||
Vmsg_r('Addr: '); Msg(addr)
|
||||
|
||||
def Wif2segwit_pair(wif):
|
||||
if not mmb.wif_is_compressed(wif):
|
||||
privhex = PrivKey(wif=wif)
|
||||
if not privhex.compressed:
|
||||
die(1,'Segwit address cannot be generated from uncompressed WIF')
|
||||
privhex = wif2hex(wif)
|
||||
pubhex = mmb.privnum2pubhex(int(privhex,16),compressed=True)
|
||||
rs = mmb.pubhex2redeem_script(pubhex)
|
||||
addr = mmb.hexaddr2addr(mmb.hash160(rs),p2sh=True)
|
||||
addr_chk = mmb.privnum2addr(int(privhex,16),compressed=True,segwit=True)
|
||||
assert addr == addr_chk
|
||||
ag = AddrGenerator('segwit')
|
||||
pubhex = kg.to_pubhex(privhex)
|
||||
addr = ag.to_addr(pubhex)
|
||||
rs = ag.to_segwit_redeem_script(pubhex)
|
||||
Msg('{}\n{}'.format(rs,addr))
|
||||
|
||||
def Hexaddr2addr(hexaddr): Msg(mmb.hexaddr2addr(hexaddr))
|
||||
def Addr2hexaddr(addr): Msg(mmb.verify_addr(addr,return_hex=True))
|
||||
def Addr2hexaddr(addr): Msg(mmb.verify_addr(addr,return_dict=True)['hex'])
|
||||
def Hash160(pubkeyhex): Msg(mmb.hash160(pubkeyhex))
|
||||
def Pubhex2addr(pubkeyhex,p2sh=False): Msg(mmb.hexaddr2addr(mmb.hash160(pubkeyhex),p2sh=p2sh))
|
||||
def Wif2hex(wif): Msg(wif2hex(wif))
|
||||
|
|
@ -379,13 +378,14 @@ def Privhex2pubhex(privhex,compressed=False): # new
|
|||
def Pubhex2redeem_script(pubhex): # new
|
||||
Msg(mmb.pubhex2redeem_script(pubhex))
|
||||
def Wif2redeem_script(wif): # new
|
||||
if not mmb.wif_is_compressed(wif):
|
||||
die(1,'Witness redeem script cannot be generated from uncompressed WIF')
|
||||
pubhex = mmb.privnum2pubhex(int(wif2hex(wif),16),compressed=True)
|
||||
Msg(mmb.pubhex2redeem_script(pubhex))
|
||||
privhex = PrivKey(wif=wif)
|
||||
if not privhex.compressed:
|
||||
die(1,'Segwit redeem script cannot be generated from uncompressed WIF')
|
||||
ag = AddrGenerator('segwit')
|
||||
Msg(ag.to_segwit_redeem_script(kg.to_pubhex(privhex)))
|
||||
|
||||
def wif2hex(wif): # wrapper
|
||||
ret = mmb.wif2hex(wif)
|
||||
ret = PrivKey(wif=wif)
|
||||
return ret or die(1,'{}: Invalid WIF'.format(wif))
|
||||
|
||||
wordlists = 'electrum','tirosh'
|
||||
|
|
@ -452,7 +452,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa
|
|||
"""
|
||||
m_prev = None
|
||||
|
||||
for m in sorted([l.mmid for l in accts]):
|
||||
for m in sorted(b.mmid for b in [a for a in accts if a]):
|
||||
if m == m_prev:
|
||||
msg('Duplicate MMGen ID ({}) discovered in tracking wallet!\n'.format(m))
|
||||
bad_accts = MMGenList([l for l in accts if l.mmid == m])
|
||||
|
|
@ -464,6 +464,18 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa
|
|||
die(3,red('Exiting on error'))
|
||||
m_prev = m
|
||||
|
||||
def check_addr_array_lens(acct_pairs):
|
||||
err = False
|
||||
for label,addrs in acct_pairs:
|
||||
if not label: continue
|
||||
if len(addrs) != 1:
|
||||
err = True
|
||||
if len(addrs) == 0:
|
||||
msg("Label '{}': has no associated address!".format(label))
|
||||
else:
|
||||
msg("'{}': more than one {} address in account!".format(addrs,g.coin))
|
||||
if err: rdie(3,'Tracking wallet is corrupted!')
|
||||
|
||||
usr_addr_list = []
|
||||
if addrs:
|
||||
a = addrs.rsplit(':',1)
|
||||
|
|
@ -494,20 +506,22 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa
|
|||
|
||||
# We use listaccounts only for empty addresses, as it shows false positive balances
|
||||
if showempty:
|
||||
# args: minconf,watchonly
|
||||
accts = MMGenList([b for b in [TwLabel(a,on_fail='silent') for a in c.listaccounts(0,True)] if b])
|
||||
check_dup_mmid(accts)
|
||||
acct_addrs = c.getaddressesbyaccount([[a] for a in accts],batch=True)
|
||||
assert len(accts) == len(acct_addrs), 'listaccounts() and getaddressesbyaccount() not of same length'
|
||||
for a in acct_addrs:
|
||||
if len(a) != 1:
|
||||
die(2,"'{}': more than one {} address in account!".format(a,g.coin))
|
||||
for label,addr in zip(accts,[b[0] for b in acct_addrs]):
|
||||
# for compatibility with old mmids, must use raw RPC rather than native data for matching
|
||||
# args: minconf,watchonly, MUST use keys() so we get list, not dict
|
||||
acct_list = c.listaccounts(0,True).keys() # raw list, no 'L'
|
||||
acct_labels = MMGenList([TwLabel(a,on_fail='silent') for a in acct_list])
|
||||
check_dup_mmid(acct_labels)
|
||||
acct_addrs = c.getaddressesbyaccount([[a] for a in acct_list],batch=True) # use raw list here
|
||||
assert len(acct_list) == len(acct_addrs), 'listaccounts() and getaddressesbyaccount() not equal in length'
|
||||
addr_pairs = zip(acct_labels,acct_addrs)
|
||||
check_addr_array_lens(addr_pairs)
|
||||
for label,addr_arr in addr_pairs:
|
||||
if not label: continue
|
||||
if usr_addr_list and (label.mmid not in usr_addr_list): continue
|
||||
if label.mmid not in addrs:
|
||||
addrs[label.mmid] = { 'amt':BTCAmt('0'), 'lbl':label, 'addr':'' }
|
||||
if showbtcaddrs:
|
||||
addrs[label.mmid]['addr'] = BTCAddr(addr)
|
||||
addrs[label.mmid]['addr'] = BTCAddr(addr_arr[0])
|
||||
|
||||
if not addrs:
|
||||
die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty])
|
||||
|
|
@ -547,7 +561,7 @@ def Listaddresses(addrs='',minconf=1,showempty=False,pager=False,showbtcaddrs=Fa
|
|||
o = '\n'.join(out)
|
||||
return do_pager(o) if pager else Msg(o)
|
||||
|
||||
def Getbalance(minconf=1):
|
||||
def Getbalance(minconf=1,quiet=False):
|
||||
accts = {}
|
||||
for d in bitcoin_connection().listunspent(0):
|
||||
ma = split2(d['account'] if 'account' in d else '')[0] # include coinbase outputs if spendable
|
||||
|
|
@ -562,12 +576,16 @@ def Getbalance(minconf=1):
|
|||
for j in ([],[0])[confs==0] + [i]:
|
||||
accts[key][j] += d['amount']
|
||||
|
||||
fs = '{:13} {} {} {}'
|
||||
mc,lbl = str(minconf),'confirms'
|
||||
Msg(fs.format('Wallet',
|
||||
*[s.ljust(16) for s in ' Unconfirmed',' <%s %s'%(mc,lbl),' >=%s %s'%(mc,lbl)]))
|
||||
for key in sorted(accts.keys()):
|
||||
Msg(fs.format(key+':', *[a.fmt(color=True,suf=' '+g.coin) for a in accts[key]]))
|
||||
if quiet:
|
||||
Msg('{}'.format(accts['TOTAL'][2]))
|
||||
else:
|
||||
fs = '{:13} {} {} {}'
|
||||
mc,lbl = str(minconf),'confirms'
|
||||
Msg(fs.format('Wallet',
|
||||
*[s.ljust(16) for s in ' Unconfirmed',' <%s %s'%(mc,lbl),' >=%s %s'%(mc,lbl)]))
|
||||
for key in sorted(accts.keys()):
|
||||
Msg(fs.format(key+':', *[a.fmt(color=True,suf=' '+g.coin) for a in accts[key]]))
|
||||
|
||||
if 'SPENDABLE' in accts:
|
||||
Msg(red('Warning: this wallet contains PRIVATE KEYS for the SPENDABLE balance!'))
|
||||
|
||||
|
|
|
|||
10
mmgen/tx.py
10
mmgen/tx.py
|
|
@ -457,6 +457,8 @@ class MMGenTX(MMGenObject):
|
|||
msg_r('Signing transaction{}...'.format(tx_num_str))
|
||||
ht = ('ALL','ALL|FORKID')[g.coin=='BCH'] # sighashtype defaults to 'ALL'
|
||||
wifs = [d.sec.wif for d in keys]
|
||||
# keys.pmsg()
|
||||
# pmsg(wifs)
|
||||
ret = c.signrawtransaction(self.hex,sig_data,wifs,ht,on_fail='return')
|
||||
|
||||
from mmgen.rpc import rpc_error,rpc_errmsg
|
||||
|
|
@ -525,7 +527,10 @@ class MMGenTX(MMGenObject):
|
|||
|
||||
def is_in_wallet(self,c):
|
||||
ret = c.gettransaction(self.btc_txid,on_fail='silent')
|
||||
return 'confirmations' in ret and ret['confirmations'] > 0
|
||||
if 'confirmations' in ret and ret['confirmations'] > 0:
|
||||
return ret['confirmations']
|
||||
else:
|
||||
return False
|
||||
|
||||
def is_replaced(self,c):
|
||||
if self.is_in_mempool(c): return False
|
||||
|
|
@ -541,7 +546,8 @@ class MMGenTX(MMGenObject):
|
|||
if self.is_in_mempool(c):
|
||||
msg(('Warning: transaction is in mempool!','Transaction is in mempool')[status])
|
||||
elif self.is_in_wallet(c):
|
||||
die(1,'Transaction has been confirmed{}'.format('' if status else '!'))
|
||||
confs = self.is_in_wallet(c)
|
||||
die(0,'Transaction has {} confirmation{}'.format(confs,suf(confs,'s')))
|
||||
elif self.is_in_utxos(c):
|
||||
die(2,red('ERROR: transaction is in the blockchain (but not in the tracking wallet)!'))
|
||||
ret = self.is_replaced(c) # 1: replacement in mempool, 2: replacement confirmed
|
||||
|
|
|
|||
|
|
@ -138,8 +138,12 @@ def get_seed_files(opt,args):
|
|||
# favor unencrypted seed sources first, as they don't require passwords
|
||||
u,e = SeedSourceUnenc,SeedSourceEnc
|
||||
ret = _pop_and_return(args,u.get_extensions())
|
||||
from mmgen.filename import find_file_in_dir
|
||||
wf = find_file_in_dir(Wallet,g.data_dir) # Make this the first encrypted ss in the list
|
||||
from mmgen.filename import find_file_in_dir,find_files_in_dir
|
||||
if g.bob or g.alice:
|
||||
import regtest as rt
|
||||
wf = rt.mmwords[('alice','bob')[g.bob]]
|
||||
else:
|
||||
wf = find_file_in_dir(Wallet,g.data_dir) # Make this the first encrypted ss in the list
|
||||
if wf: ret.append(wf)
|
||||
ret += _pop_and_return(args,e.get_extensions())
|
||||
if not (ret or opt.mmgen_keys_from_file or opt.keys_from_file): # or opt.use_wallet_dat
|
||||
|
|
|
|||
|
|
@ -31,6 +31,10 @@ def msg_r(s): sys.stderr.write(s.encode('utf8'))
|
|||
def Msg(s): sys.stdout.write(s.encode('utf8') + '\n')
|
||||
def Msg_r(s): sys.stdout.write(s.encode('utf8'))
|
||||
def msgred(s): msg(red(s))
|
||||
def ymsg(s): msg(yellow(s))
|
||||
def ymsg_r(s): msg_r(yellow(s))
|
||||
def gmsg(s): msg(green(s))
|
||||
def gmsg_r(s): msg_r(green(s))
|
||||
|
||||
def mmsg(*args):
|
||||
for d in args: Msg(repr(d))
|
||||
|
|
@ -464,7 +468,11 @@ def make_full_path(outdir,outfile):
|
|||
def get_seed_file(cmd_args,nargs,invoked_as=None):
|
||||
from mmgen.filename import find_file_in_dir
|
||||
from mmgen.seed import Wallet
|
||||
wf = find_file_in_dir(Wallet,g.data_dir)
|
||||
if g.bob or g.alice:
|
||||
import regtest as rt
|
||||
wf = rt.mmwords[('alice','bob')[g.bob]]
|
||||
else:
|
||||
wf = find_file_in_dir(Wallet,g.data_dir)
|
||||
|
||||
wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
|
||||
|
||||
|
|
@ -800,9 +808,12 @@ def get_bitcoind_auth_cookie():
|
|||
def bitcoin_connection():
|
||||
|
||||
def check_coin_mismatch(c):
|
||||
if c.getblockcount() == 0:
|
||||
msg('Warning: no blockchain, so skipping block mismatch check')
|
||||
return
|
||||
fb = '00000000000000000019f112ec0a9982926f1258cdcc558dd7c3b7e5dc7fa148'
|
||||
err = []
|
||||
if int(c.getblockchaininfo()['blocks']) <= 478558 or c.getblockhash(478559) == fb:
|
||||
if c.getblockchaininfo()['blocks'] <= 478558 or c.getblockhash(478559) == fb:
|
||||
if g.coin == 'BCH': err = 'BCH','BTC'
|
||||
elif g.coin == 'BTC': err = 'BTC','BCH'
|
||||
if err: wdie(2,"'{}' requested, but this is the {} chain!".format(*err))
|
||||
|
|
|
|||
|
|
@ -10,7 +10,10 @@ try:
|
|||
sys.argv.pop(0)
|
||||
execfile(sys.argv[0])
|
||||
except SystemExit:
|
||||
sys.exit(int(str(sys.exc_info()[1])))
|
||||
try:
|
||||
sys.exit(int(str(sys.exc_info()[1])))
|
||||
except:
|
||||
sys.exit(1)
|
||||
except:
|
||||
l = traceback.format_exception(*sys.exc_info())
|
||||
exc = l.pop()
|
||||
|
|
|
|||
3
setup.py
3
setup.py
|
|
@ -120,6 +120,7 @@ setup(
|
|||
'mmgen.mn_tirosh',
|
||||
'mmgen.obj',
|
||||
'mmgen.opts',
|
||||
'mmgen.regtest',
|
||||
'mmgen.rpc',
|
||||
'mmgen.seed',
|
||||
'mmgen.term',
|
||||
|
|
@ -134,6 +135,7 @@ setup(
|
|||
'mmgen.main_addrgen',
|
||||
'mmgen.main_passgen',
|
||||
'mmgen.main_addrimport',
|
||||
'mmgen.main_regtest',
|
||||
'mmgen.main_txcreate',
|
||||
'mmgen.main_txbump',
|
||||
'mmgen.main_txsign',
|
||||
|
|
@ -152,6 +154,7 @@ setup(
|
|||
'mmgen-passgen',
|
||||
'mmgen-addrimport',
|
||||
'mmgen-passchg',
|
||||
'mmgen-regtest',
|
||||
'mmgen-walletchk',
|
||||
'mmgen-walletconv',
|
||||
'mmgen-walletgen',
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ from binascii import hexlify
|
|||
|
||||
# Import these _after_ local path's been added to sys.path
|
||||
from mmgen.common import *
|
||||
from mmgen.bitcoin import hex2wif
|
||||
|
||||
rounds = 100
|
||||
opts_data = lambda: {
|
||||
|
|
@ -115,7 +114,6 @@ def match_error(sec,wif,a_addr,b_addr,a,b):
|
|||
""".format(sec,wif,a_addr,b_addr,pnm=g.proj_name,a=m[a],b=m[b]).rstrip())
|
||||
|
||||
# Begin execution
|
||||
mmtype = ('L','S')[bool(opt.segwit)]
|
||||
compressed = True
|
||||
|
||||
from mmgen.addr import KeyGenerator,AddrGenerator
|
||||
|
|
@ -177,7 +175,6 @@ elif a and dump:
|
|||
sec = PrivKey(wif=wif)
|
||||
except:
|
||||
die(2,'\nInvalid {}net WIF address in dump file: {}'.format(('main','test')[g.testnet],wif))
|
||||
compressed = wif[0] != ('5','9')[g.testnet]
|
||||
b_addr = ag.to_addr(kg.to_pubhex(sec))
|
||||
if a_addr != b_addr:
|
||||
match_error(sec,wif,a_addr,b_addr,3,a)
|
||||
|
|
|
|||
249
test/objtest.py
Executable file
249
test/objtest.py
Executable file
|
|
@ -0,0 +1,249 @@
|
|||
#!/usr/bin/env python
|
||||
# -*- coding: UTF-8 -*-
|
||||
#
|
||||
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
|
||||
# Copyright (C)2013-2017 Philemon <mmgen-py@yandex.com>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
test/objtest.py: Test MMGen data objects
|
||||
"""
|
||||
|
||||
import sys,os
|
||||
pn = os.path.dirname(sys.argv[0])
|
||||
os.chdir(os.path.join(pn,os.pardir))
|
||||
sys.path.__setitem__(0,os.path.abspath(os.curdir))
|
||||
|
||||
from binascii import hexlify
|
||||
|
||||
# Import these _after_ local path's been added to sys.path
|
||||
from mmgen.common import *
|
||||
from mmgen.obj import *
|
||||
from mmgen.seed import *
|
||||
|
||||
opts_data = lambda: {
|
||||
'desc': 'Test MMGen data objects',
|
||||
'sets': ( ('super_silent', True, 'silent', True), ),
|
||||
'usage':'[options] [object]',
|
||||
'options': """
|
||||
-h, --help Print this help message
|
||||
--, --longhelp Print help message for long options (common options)
|
||||
-q, --quiet Produce quieter output
|
||||
-s, --silent Silence output of tested objects
|
||||
-S, --super-silent Silence all output except for errors
|
||||
-v, --verbose Produce more verbose output
|
||||
"""
|
||||
}
|
||||
|
||||
cmd_args = opts.init(opts_data)
|
||||
|
||||
def run_test(test,arg,input_data):
|
||||
arg_copy = arg
|
||||
kwargs = {'on_fail':'silent'} if opt.silent else {}
|
||||
ret_chk = arg
|
||||
if input_data == 'good' and type(arg) == tuple: arg,ret_chk = arg
|
||||
if type(arg) == dict: # pass one arg + kwargs to constructor
|
||||
arg_copy = arg.copy()
|
||||
if 'arg' in arg:
|
||||
args = [arg['arg']]
|
||||
ret_chk = args[0]
|
||||
del arg['arg']
|
||||
else:
|
||||
args = []
|
||||
ret_chk = arg.values()[0] # assume only one key present
|
||||
if 'ret' in arg:
|
||||
ret_chk = arg['ret']
|
||||
del arg['ret']
|
||||
del arg_copy['ret']
|
||||
kwargs.update(arg)
|
||||
else:
|
||||
args = [arg]
|
||||
try:
|
||||
if not opt.super_silent:
|
||||
msg_r((orange,green)[input_data=='good']('{:<22}'.format(repr(arg_copy)+':')))
|
||||
cls = globals()[test]
|
||||
ret = cls(*args,**kwargs)
|
||||
bad_ret = list() if issubclass(cls,list) else None
|
||||
if (opt.silent and input_data=='bad' and ret!=bad_ret) or (not opt.silent and input_data=='bad'):
|
||||
raise UserWarning,"Non-'None' return value {} with bad input data".format(repr(ret))
|
||||
if opt.silent and input_data=='good' and ret==bad_ret:
|
||||
raise UserWarning,"'None' returned with good input data"
|
||||
if input_data=='good' and ret != ret_chk and repr(ret) != repr(ret_chk):
|
||||
raise UserWarning,"Return value ({!r}) doesn't match expected value ({!r})".format(ret,ret_chk)
|
||||
if not opt.super_silent:
|
||||
msg(u'==> {}'.format(ret))
|
||||
if opt.verbose and issubclass(cls,MMGenObject):
|
||||
ret.pmsg()
|
||||
except SystemExit as e:
|
||||
if input_data == 'good':
|
||||
raise ValueError,'Error on good input data'
|
||||
if opt.verbose:
|
||||
msg('exitval: {}'.format(e[0]))
|
||||
except UserWarning as e:
|
||||
msg('==> {!r}'.format(ret))
|
||||
die(2,red('{}'.format(e[0])))
|
||||
|
||||
r32,r24,r16,r17,r18 = os.urandom(32),os.urandom(24),os.urandom(16),os.urandom(17),os.urandom(18)
|
||||
|
||||
from collections import OrderedDict
|
||||
tests = OrderedDict([
|
||||
('AddrIdx', {
|
||||
'bad': ('s',1.1,12345678,-1),
|
||||
'good': (('7',7),)
|
||||
}),
|
||||
('AddrIdxList', {
|
||||
'bad': ('x','5,9,1-2-3','8,-11','66,3-2'),
|
||||
'good': (
|
||||
('3,2,2',[2,3]),
|
||||
('101,1,3,5,2-7,99',[1,2,3,4,5,6,7,99,101]),
|
||||
({'idx_list':AddrIdxList('1-5')},[1,2,3,4,5])
|
||||
)}),
|
||||
('BTCAmt', {
|
||||
'bad': ('-3.2','0.123456789',123L,'123L',22000000,20999999.12345678),
|
||||
'good': (('20999999.12345678',Decimal('20999999.12345678')),)
|
||||
}),
|
||||
('BTCAddr', {
|
||||
'bad': (1,'x','я'),
|
||||
'good': (
|
||||
'1MjjELEy6EJwk8fSNfpS8b5teFRo4X5fZr',
|
||||
'32GiSWo9zJQgkCmjAaLRrbPwXhKry2jHhj',
|
||||
'n2FgXPKwuFkCXF946EnoxWJDWF2VwQ6q8J',
|
||||
'2MspvWFjBbkv2wzQGqhxJUYPCk3Y2jMaxLN'
|
||||
)}),
|
||||
('SeedID', {
|
||||
'bad': (
|
||||
{'sid':'я'},
|
||||
{'sid':'F00F00'},
|
||||
{'sid':'xF00F00x'},
|
||||
{'sid':1},
|
||||
{'sid':'F00BAA123'},
|
||||
{'sid':'f00baa12'},
|
||||
'я',r32,'abc'),
|
||||
'good': (({'sid':'F00BAA12'},'F00BAA12'),(Seed(r16),Seed(r16).sid))
|
||||
}),
|
||||
('MMGenID', {
|
||||
'bad': ('x',1,'f00f00f','a:b','x:L:3','F00BAA12:0','F00BAA12:Z:99'),
|
||||
'good': (('F00BAA12:99','F00BAA12:L:99'),'F00BAA12:L:99','F00BAA12:S:99')
|
||||
}),
|
||||
('TwMMGenID', {
|
||||
'bad': ('x','я','я:я',1,'f00f00f','a:b','x:L:3','F00BAA12:0','F00BAA12:Z:99','btc:','btc:я'),
|
||||
'good': (('F00BAA12:99','F00BAA12:L:99'),'F00BAA12:L:99','F00BAA12:S:9999999','btc:x')
|
||||
}),
|
||||
('TwComment', {
|
||||
'bad': ('я',"comment too long for tracking wallet",),
|
||||
'good': ('OK comment',)
|
||||
}),
|
||||
('TwLabel', {
|
||||
'bad': ('x x','x я','я:я',1,'f00f00f','a:b','x:L:3','F00BAA12:0 x',
|
||||
'F00BAA12:Z:99','F00BAA12:L:99 я','btc: x','btc:я x'),
|
||||
'good': (
|
||||
('F00BAA12:99 a comment','F00BAA12:L:99 a comment'),
|
||||
'F00BAA12:L:99 comment',
|
||||
'F00BAA12:S:9999999 comment',
|
||||
'btc:x comment')
|
||||
}),
|
||||
('HexStr', {
|
||||
'bad': (1,[],'\0','\1','я','g','gg','FF','f00'),
|
||||
'good': ('deadbeef','f00baa12')
|
||||
}),
|
||||
('MMGenTxID', {
|
||||
'bad': (1,[],'\0','\1','я','g','gg','FF','f00','F00F0012'),
|
||||
'good': ('DEADBE','F00BAA')
|
||||
}),
|
||||
('BitcoinTxID',{
|
||||
'bad': (1,[],'\0','\1','я','g','gg','FF','f00','F00F0012',hexlify(r16),hexlify(r32)+'ee'),
|
||||
'good': (hexlify(r32),)
|
||||
}),
|
||||
('WifKey', {
|
||||
'bad': (1,[],'\0','\1','я','g','gg','FF','f00',hexlify(r16),'2MspvWFjBbkv2wzQGqhxJUYPCk3Y2jMaxLN'),
|
||||
'good': (
|
||||
'5KXEpVzjWreTcQoG5hX357s1969MUKNLuSfcszF6yu84kpsNZKb',
|
||||
'KwWr9rDh8KK5TtDa3HLChEvQXNYcUXpwhRFUPc5uSNnMtqNKLFhk',
|
||||
{'arg':'93HsQEpH75ibaUJYi3QwwiQxnkW4dUuYFPXZxcbcKds7XrqHkY6','testnet':True},
|
||||
{'arg':'cMsqcmDYZP1LdKgqRh9L4ZRU9br28yvdmTPwW2YQwVSN9aQiMAoR','testnet':True}
|
||||
)
|
||||
}),
|
||||
('PubKey', {
|
||||
'bad': ({'arg':1,'compressed':False},{'arg':'F00BAA12','compressed':False},),
|
||||
'good': ({'arg':'deadbeef','compressed':True},) # TODO: add real pubkeys
|
||||
}),
|
||||
('PrivKey', {
|
||||
'bad': ({'wif':1},),
|
||||
'good': (
|
||||
{'wif':'5KXEpVzjWreTcQoG5hX357s1969MUKNLuSfcszF6yu84kpsNZKb',
|
||||
'ret':'e0aef965b905a2fedf907151df8e0a6bac832aa697801c51f58bd2ecb4fd381c'},
|
||||
{'wif':'KwWr9rDh8KK5TtDa3HLChEvQXNYcUXpwhRFUPc5uSNnMtqNKLFhk',
|
||||
'ret':'08d0ed83b64b68d56fa064be48e2385060ed205be2b1e63cd56d218038c3a05f'},
|
||||
{'wif':'93HsQEpH75ibaUJYi3QwwiQxnkW4dUuYFPXZxcbcKds7XrqHkY6','testnet':True,
|
||||
'ret':'e0aef965b905a2fedf907151df8e0a6bac832aa697801c51f58bd2ecb4fd381c'},
|
||||
{'wif':'cMsqcmDYZP1LdKgqRh9L4ZRU9br28yvdmTPwW2YQwVSN9aQiMAoR','testnet':True,
|
||||
'ret':'08d0ed83b64b68d56fa064be48e2385060ed205be2b1e63cd56d218038c3a05f'},
|
||||
{'s':r32,'compressed':False,'ret':hexlify(r32)},
|
||||
{'s':r32,'compressed':True,'ret':hexlify(r32)}
|
||||
)
|
||||
}),
|
||||
('AddrListID', { # a rather pointless test, but do it anyway
|
||||
'bad': (
|
||||
{'sid':SeedID(sid='F00BAA12'),'mmtype':'Z','ret':'F00BAA12:Z'},
|
||||
),
|
||||
'good': (
|
||||
{'sid':SeedID(sid='F00BAA12'),'mmtype':MMGenAddrType('S'),'ret':'F00BAA12:S'},
|
||||
{'sid':SeedID(sid='F00BAA12'),'mmtype':MMGenAddrType('L'),'ret':'F00BAA12:L'},
|
||||
)
|
||||
}),
|
||||
('MMGenWalletLabel', {
|
||||
'bad': ('яqwerty','This text is too long to fit in an MMGen wallet label'),
|
||||
'good': ('a good label',)
|
||||
}),
|
||||
('TwComment', {
|
||||
'bad': (u'яqwerty','This text is too long for a TW comment'),
|
||||
'good': ('a good comment',)
|
||||
}),
|
||||
('MMGenTXLabel',{
|
||||
'bad': ('This text is too long for a transaction comment. '*2,),
|
||||
'good': (u'UTF-8 is OK: я','a good comment',)
|
||||
}),
|
||||
('MMGenPWIDString', { # forbidden = list(u' :/\\')
|
||||
'bad': ('foo/','foo:','foo:\\'),
|
||||
'good': (u'qwerty@яяя',)
|
||||
}),
|
||||
('MMGenAddrType', {
|
||||
'bad': ('U','z','xx',1,'dogecoin'),
|
||||
'good': (
|
||||
{'s':'segwit','ret':'S'},
|
||||
{'s':'S','ret':'S'},
|
||||
{'s':'legacy','ret':'L'},
|
||||
{'s':'L','ret':'L'},
|
||||
{'s':'compressed','ret':'C'},
|
||||
{'s':'C','ret':'C'}
|
||||
)}),
|
||||
('MMGenPasswordType', {
|
||||
'bad': ('U','z','я',1,'passw0rd'),
|
||||
'good': (
|
||||
{'s':'password','ret':'P'},
|
||||
{'s':'P','ret':'P'},
|
||||
)}),
|
||||
])
|
||||
|
||||
def do_loop():
|
||||
utests = cmd_args
|
||||
for test in tests:
|
||||
if utests and test not in utests: continue
|
||||
msg((blue,nocolor)[bool(opt.super_silent)]('Testing {}'.format(test)))
|
||||
for k in ('bad','good'):
|
||||
for arg in tests[test][k]:
|
||||
run_test(test,arg,input_data=k)
|
||||
|
||||
do_loop()
|
||||
25
test/test.py
25
test/test.py
|
|
@ -50,7 +50,8 @@ ref_wallet_brainpass = 'abc'
|
|||
ref_wallet_hash_preset = '1'
|
||||
ref_wallet_incog_offset = 123
|
||||
|
||||
from mmgen.obj import MMGenTXLabel
|
||||
from mmgen.obj import MMGenTXLabel,PrivKey,BTCAmt
|
||||
from mmgen.addr import AddrGenerator,KeyGenerator,AddrList,AddrData,AddrIdxList
|
||||
ref_tx_label = ''.join([unichr(i) for i in range(65,91) +
|
||||
range(1040,1072) + # cyrillic
|
||||
range(913,939) + # greek
|
||||
|
|
@ -709,7 +710,6 @@ def find_generated_exts(cmd):
|
|||
def get_addrfile_checksum(display=False):
|
||||
addrfile = get_file_with_ext('addrs',cfg['tmpdir'])
|
||||
silence()
|
||||
from mmgen.addr import AddrList
|
||||
chk = AddrList(addrfile).chksum
|
||||
if opt.verbose and display: msg('Checksum: %s' % cyan(chk))
|
||||
end_silence()
|
||||
|
|
@ -728,9 +728,6 @@ class MMGenExpect(MMGenPexpect):
|
|||
desc = (cmd_data[name][1],name)[bool(opt.names)] + (' ' + extra_desc).strip()
|
||||
return MMGenPexpect.__init__(self,name,mmgen_cmd,cmd_args,desc,no_output=no_output)
|
||||
|
||||
from mmgen.obj import BTCAmt
|
||||
from mmgen.bitcoin import verify_addr
|
||||
|
||||
def create_fake_unspent_entry(btcaddr,al_id=None,idx=None,lbl=None,non_mmgen=False,segwit=False):
|
||||
if lbl: lbl = ' ' + lbl
|
||||
spk1,spk2 = (('76a914','88ac'),('a914','87'))[segwit and btcaddr.addr_fmt=='p2sh']
|
||||
|
|
@ -741,7 +738,7 @@ def create_fake_unspent_entry(btcaddr,al_id=None,idx=None,lbl=None,non_mmgen=Fal
|
|||
'amount': BTCAmt('%s.%s' % (10+(getrandnum(4) % 40), getrandnum(4) % 100000000)),
|
||||
'address': btcaddr,
|
||||
'spendable': False,
|
||||
'scriptPubKey': (spk1+verify_addr(btcaddr,return_hex=True)+spk2),
|
||||
'scriptPubKey': '{}{}{}'.format(spk1,btcaddr.hex,spk2),
|
||||
'confirmations': getrandnum(4) % 50000
|
||||
}
|
||||
|
||||
|
|
@ -784,14 +781,10 @@ def create_fake_unspent_data(adata,tx_data,non_mmgen_input=''):
|
|||
out.append(create_fake_unspent_entry(btcaddr,d['al_id'],idx,lbl,segwit=d['segwit']))
|
||||
|
||||
if non_mmgen_input:
|
||||
privnum = getrandnum(32)
|
||||
from mmgen.bitcoin import privnum2addr,hex2wif
|
||||
from mmgen.obj import BTCAddr
|
||||
btcaddr = BTCAddr(privnum2addr(privnum,compressed=True))
|
||||
privkey = PrivKey(os.urandom(32),compressed=True)
|
||||
btcaddr = AddrGenerator('p2pkh').to_addr(KeyGenerator().to_pubhex(privkey))
|
||||
of = os.path.join(cfgs[non_mmgen_input]['tmpdir'],non_mmgen_fn)
|
||||
wif = hex2wif('{:064x}'.format(privnum),compressed=True)
|
||||
# Msg(yellow(wif + ' ' + btcaddr))
|
||||
write_data_to_file(of,wif+'\n','compressed bitcoin key',silent=True)
|
||||
write_data_to_file(of,privkey.wif+'\n','compressed bitcoin key',silent=True)
|
||||
out.append(create_fake_unspent_entry(btcaddr,non_mmgen=True,segwit=False))
|
||||
|
||||
# msg('\n'.join([repr(o) for o in out])); sys.exit(0)
|
||||
|
|
@ -808,7 +801,6 @@ def write_fake_data_to_file(d):
|
|||
sys.stderr.write("Fake transaction wallet data written to file '%s'\n" % unspent_data_file)
|
||||
|
||||
def create_tx_data(sources):
|
||||
from mmgen.addr import AddrList,AddrData,AddrIdxList
|
||||
tx_data,ad = {},AddrData()
|
||||
for s in sources:
|
||||
afile = get_file_with_ext('addrs',cfgs[s]['tmpdir'])
|
||||
|
|
@ -829,8 +821,8 @@ def create_tx_data(sources):
|
|||
return ad,tx_data
|
||||
|
||||
def make_txcreate_cmdline(tx_data):
|
||||
from mmgen.bitcoin import privnum2addr
|
||||
btcaddr = privnum2addr(getrandnum(32),compressed=True)
|
||||
privkey = PrivKey(os.urandom(32),compressed=True)
|
||||
btcaddr = AddrGenerator('segwit').to_addr(KeyGenerator().to_pubhex(privkey))
|
||||
|
||||
cmd_args = ['-d',cfg['tmpdir']]
|
||||
for num in tx_data:
|
||||
|
|
@ -848,7 +840,6 @@ def make_txcreate_cmdline(tx_data):
|
|||
def add_comments_to_addr_file(addrfile,outfile):
|
||||
silence()
|
||||
msg(green("Adding comments to address file '%s'" % addrfile))
|
||||
from mmgen.addr import AddrList
|
||||
a = AddrList(addrfile)
|
||||
for n,idx in enumerate(a.idxs(),1):
|
||||
if n % 2: a.set_comment(idx,'Test address %s' % n)
|
||||
|
|
|
|||
|
|
@ -354,7 +354,7 @@ class MMGenToolTestSuite(object):
|
|||
def Hexaddr2addr(self,name,f1,f2,f3,f4):
|
||||
for n,fi,fo,m in ((1,f1,f2,''),(2,f3,f4,'from compressed')):
|
||||
self.run_cmd_chk(name,fi,fo,extra_msg=m)
|
||||
def Privhex2pubhex(self,name,f1,f2,f3): # from hex2wif
|
||||
def Privhex2pubhex(self,name,f1,f2,f3): # from Hex2wif
|
||||
addr = read_from_file(f3).strip()
|
||||
self.run_cmd_out(name,addr,kwargs='compressed=1',fn_idx=3)
|
||||
def Pubhex2redeem_script(self,name,f1,f2,f3): # from above
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue