new classes: KeyGenerator,AddrGenerator,PrivKey; read-only attrs rewrite

- OO rewrite of key/addr generation interface (KeyGenerator,AddrGenerator)
- New data objects: PrivKey,PubKey,WifKey
- rewrite of read-only attr implementation for addr/tx/tw list entries
  (MMGenImmutableAttr,MMGenListItemAttr descriptors)
- txsign: build key list of addrlist objects rather than addr,key tuples
This commit is contained in:
The MMGen Project 2017-08-07 22:02:24 +03:00
commit 52fdf29b67
Signed by: mmgen
GPG key ID: 62DBE9E5212F05BE
21 changed files with 431 additions and 391 deletions

View file

@ -23,96 +23,93 @@ addr.py: Address generation/display routines for the MMGen suite
from hashlib import sha256,sha512
from binascii import hexlify,unhexlify
from mmgen.common import *
from mmgen.bitcoin import hex2wif,wif2hex,wif_is_compressed
from mmgen.obj import *
from mmgen.tx import *
from mmgen.tw import *
pnm = g.proj_name
def _test_for_secp256k1(silent=False):
no_secp256k1_errmsg = """
secp256k1 library unavailable. Using (slow) native Python ECDSA library for address generation.
"""
try:
from mmgen.secp256k1 import priv2pub
assert priv2pub(os.urandom(32),1)
except:
if not silent: msg(no_secp256k1_errmsg.strip())
return False
return True
class AddrGenerator(MMGenObject):
def __new__(cls,atype):
d = {
'p2pkh': AddrGeneratorP2PKH,
'segwit': AddrGeneratorSegwit
}
assert atype in d
return super(cls,cls).__new__(d[atype])
def _pubhex2addr(pubhex,mmtype):
if mmtype == 'L':
class AddrGeneratorP2PKH(MMGenObject):
desc = 'p2pkh'
def to_addr(self,pubhex):
assert type(pubhex) == PubKey
from mmgen.bitcoin import hexaddr2addr,hash160
return hexaddr2addr(hash160(pubhex))
elif mmtype == 'S':
return BTCAddr(hexaddr2addr(hash160(pubhex)))
def to_segwit_redeem_script(self,pubhex):
raise NotImplemented
class AddrGeneratorSegwit(MMGenObject):
desc = 'segwit'
def to_addr(self,pubhex):
assert pubhex.compressed
from mmgen.bitcoin import pubhex2segwitaddr
return pubhex2segwitaddr(pubhex)
else:
die(2,"'{}': mmtype unrecognized".format(mmtype))
return BTCAddr(pubhex2segwitaddr(pubhex))
def _privhex2addr_python(privhex,compressed,mmtype):
assert compressed or mmtype != 'S'
from mmgen.bitcoin import privnum2pubhex
pubhex = privnum2pubhex(int(privhex,16),compressed=compressed)
return _pubhex2addr(pubhex,mmtype=mmtype)
def to_segwit_redeem_script(self,pubhex):
assert pubhex.compressed
from mmgen.bitcoin import pubhex2redeem_script
return HexStr(pubhex2redeem_script(pubhex))
def _privhex2addr_secp256k1(privhex,compressed,mmtype):
assert compressed or mmtype != 'S'
from mmgen.secp256k1 import priv2pub
pubhex = hexlify(priv2pub(unhexlify(privhex),int(compressed)))
return _pubhex2addr(pubhex,mmtype=mmtype)
class KeyGenerator(MMGenObject):
def __new__(cls,generator=None,silent=False):
if cls.test_for_secp256k1(silent=silent) and generator != 1:
if opt.key_generator != 1:
return super(cls,cls).__new__(KeyGeneratorSecp256k1)
else:
msg('Using (slow) native Python ECDSA library for address generation')
return super(cls,cls).__new__(KeyGeneratorPython)
def _wif2addr_python(wif,mmtype):
privhex = wif2hex(wif)
if not privhex: return False
return _privhex2addr_python(privhex,wif_is_compressed(wif),mmtype=mmtype)
@classmethod
def test_for_secp256k1(self,silent=False):
try:
from mmgen.secp256k1 import priv2pub
assert priv2pub(os.urandom(32),1)
return True
except:
return False
def _wif2addr_secp256k1(wif,mmtype):
privhex = wif2hex(wif)
if not privhex: return False
return _privhex2addr_secp256k1(privhex,wif_is_compressed(wif),mmtype=mmtype)
def keygen_wif2pubhex(wif,selector):
privhex = wif2hex(wif)
if not privhex: return False
if selector == 1:
from mmgen.secp256k1 import priv2pub
return hexlify(priv2pub(unhexlify(privhex),int(wif_is_compressed(wif))))
elif selector == 0:
class KeyGeneratorPython(KeyGenerator):
desc = 'python-ecdsa'
def to_pubhex(self,privhex):
assert type(privhex) == PrivKey
from mmgen.bitcoin import privnum2pubhex
return privnum2pubhex(int(privhex,16),compressed=wif_is_compressed(wif))
def keygen_selector(generator=None):
if _test_for_secp256k1() and generator != 1:
if opt.key_generator != 1:
return 1
msg('Using (slow) native Python ECDSA library for address generation')
return 0
def get_wif2addr_f(generator=None):
gen = keygen_selector(generator=generator)
return (_wif2addr_python,_wif2addr_secp256k1)[gen]
def get_privhex2addr_f(generator=None):
gen = keygen_selector(generator=generator)
return (_privhex2addr_python,_privhex2addr_secp256k1)[gen]
return PubKey(privnum2pubhex(int(privhex,16),compressed=privhex.compressed),compressed=privhex.compressed)
class KeyGeneratorSecp256k1(KeyGenerator):
desc = 'secp256k1'
def to_pubhex(self,privhex):
assert type(privhex) == PrivKey
from mmgen.secp256k1 import priv2pub
return PubKey(hexlify(priv2pub(unhexlify(privhex),int(privhex.compressed))),compressed=privhex.compressed)
class AddrListEntry(MMGenListItem):
attrs = 'idx','addr','label','wif','sec'
reassign_ok = 'label',
addr = MMGenListItemAttr('addr','BTCAddr')
idx = MMGenListItemAttr('idx','AddrIdx')
wif = MMGenListItemAttr('wif','WifKey')
label = MMGenListItemAttr('label','TwComment')
sec = MMGenImmutableAttr('sec',PrivKey)
class PasswordListEntry(MMGenListItem):
reassign_ok = 'label',
passwd = MMGenImmutableAttr('passwd',unicode) # TODO: create Password type
idx = MMGenListItemAttr('idx','AddrIdx')
label = MMGenListItemAttr('label','TwComment')
sec = MMGenImmutableAttr('sec',PrivKey)
class AddrListChksum(str,Hilite):
color = 'pink'
trunc_ok = False
def __new__(cls,addrlist):
els = ['addr','wif'] if addrlist.has_keys else ['sec'] if addrlist.gen_passwds else ['addr']
lines = [' '.join([str(e.idx)] + [getattr(e,f) for f in els]) for e in addrlist.data]
# print '[{}]'.format(' '.join(lines))
lines = [' '.join(addrlist.chksum_rec_f(e)) for e in addrlist.data]
return str.__new__(cls,make_chksum_N(' '.join(lines), nchars=16, sep=True))
class AddrListIDStr(unicode,Hilite):
@ -159,10 +156,11 @@ class AddrList(MMGenObject): # Address info for a single seed ID
Record this checksum: it will be used to verify the address file in the future
""".strip(),
'check_chksum': 'Check this value against your records',
'removed_dups': """
Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
'removed_dup_keys': """
Removed %s duplicate WIF key%s from keylist (also in {pnm} key-address file
""".strip().format(pnm=pnm)
}
entry_type = AddrListEntry
main_key = 'addr'
data_desc = 'address'
file_desc = 'addresses'
@ -175,6 +173,7 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
ext = 'addrs'
dfl_mmtype = MMGenAddrType('L')
cook_hash_rounds = 10 # not too many rounds, so hand decoding can still be feasible
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr)
def __init__(self,addrfile='',al_id='',adata=[],seed='',addr_idxs='',src='',
addrlist='',keylist='',mmtype=None,do_chksum=True,chksum_only=False):
@ -196,7 +195,7 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
adata = AddrListList([AddrListEntry(addr=a) for a in set(addrlist)])
elif keylist: # data from flat key list
self.al_id = None
adata = AddrListList([AddrListEntry(wif=k) for k in set(keylist)])
adata = AddrListList([AddrListEntry(sec=PrivKey(wif=k)) for k in set(keylist)])
elif seed or addr_idxs:
die(3,'Must specify both seed and addr indexes')
elif al_id or adata:
@ -233,15 +232,17 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
def generate(self,seed,addrnums,compressed):
assert type(addrnums) is AddrIdxList
assert compressed in (True,False,None)
assert type(compressed) is bool
seed = seed.get_data()
seed = self.cook_seed(seed)
if self.gen_addrs:
privhex2addr_f = get_privhex2addr_f() # choose internal ECDSA or secp256k1 generator
kg = KeyGenerator()
ag = AddrGenerator(('p2pkh','segwit')[self.al_id.mmtype=='S'])
t_addrs,num,pos,out = len(addrnums),0,0,AddrListList()
le = self.entry_type
while pos != t_addrs:
seed = sha512(seed).digest()
@ -254,21 +255,17 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
if not g.debug:
qmsg_r('\rGenerating %s #%s (%s of %s)' % (self.gen_desc,num,pos,t_addrs))
e = AddrListEntry(idx=num)
e = le(idx=num)
# Secret key is double sha256 of seed hash round /num/
sec = sha256(sha256(seed).digest()).hexdigest()
e.sec = PrivKey(sha256(sha256(seed).digest()).digest(),compressed)
if self.gen_addrs:
e.addr = privhex2addr_f(sec,compressed=compressed,mmtype=self.al_id.mmtype)
e.addr = ag.to_addr(kg.to_pubhex(e.sec))
if self.gen_keys:
e.wif = hex2wif(sec,compressed=compressed)
if opt.b16: e.sec = sec
if self.gen_passwds:
e.sec = self.make_passwd(sec)
dmsg('Key {:>03}: {}'.format(pos,sec))
if type(self) == PasswordList:
e.passwd = unicode(self.make_passwd(e.sec)) # TODO - own type
dmsg('Key {:>03}: {}'.format(pos,e.passwd))
out.append(e)
if g.debug: print 'generate():\n', e.pformat()
@ -347,62 +344,38 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
except: pass
return d
def flat_list(self):
class AddrListFlatEntry(AddrListEntry):
attrs = 'mmid','addr','wif'
return [AddrListFlatEntry(mmid='{}:{}'.format(self.al_id,e.idx),addr=e.addr,wif=e.wif)
for e in self.data]
def remove_dups(self,cmplist,key='wif'):
def remove_dup_keys(self,cmplist):
assert self.has_keys
pop_list = []
for n,d in enumerate(self.data):
if getattr(d,key) == None: continue
for e in cmplist.data:
if getattr(e,key) and getattr(e,key) == getattr(d,key):
if e.sec.wif == d.sec.wif:
pop_list.append(n)
for n in reversed(pop_list): self.data.pop(n)
if pop_list:
vmsg(self.msgs['removed_dups'] % (len(pop_list),suf(removed,'s')))
vmsg(self.msgs['removed_dup_keys'] % (len(pop_list),suf(removed,'s')))
def add_wifs(self,al_key):
if not al_key: return
def add_wifs(self,key_list):
if not key_list: return
for d in self.data:
for e in al_key.data:
if e.addr and e.wif and e.addr == d.addr:
d.wif = e.wif
for e in key_list.data:
if e.addr and e.sec and e.addr == d.addr:
d.sec = e.sec
def list_missing(self,key):
return [d.addr for d in self.data if not getattr(d,key)]
def get(self,key):
return [getattr(d,key) for d in self.data if getattr(d,key)]
def get_addrs(self): return self.get('addr')
def get_wifs(self): return self.get('wif')
def get_addr_wif_pairs(self):
return [(d.addr,d.wif) for d in self.data if hasattr(d,'wif')]
def generate_addrs_from_keylist(self):
wif2addr_f = get_wif2addr_f()
def generate_addrs_from_keys(self):
kg = KeyGenerator()
ag = AddrGenerator('p2pkh')
d = self.data
for n,e in enumerate(d,1):
qmsg_r('\rGenerating addresses from keylist: %s/%s' % (n,len(d)))
e.addr = wif2addr_f(e.wif,mmtype='L') # 'L' == p2pkh
e.addr = ag.to_addr(kg.to_pubhex(e.sec))
qmsg('\rGenerated addresses from keylist: %s/%s ' % (n,len(d)))
def format(self,enable_comments=False):
def check_attrs(key,desc):
for e in self.data:
if not getattr(e,key):
die(3,'missing %s in addr data' % desc)
if type(self) not in (KeyList,PasswordList): check_attrs('addr','addresses')
if self.has_keys:
if opt.b16: check_attrs('sec','hex keys')
check_attrs('wif','wif keys')
out = [self.msgs['file_header']+'\n']
if self.chksum:
out.append(u'# {} data checksum for {}: {}'.format(
@ -421,14 +394,14 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
for e in self.data:
c = ' '+e.label if enable_comments and e.label else ''
if type(self) == KeyList:
out.append(fs.format(e.idx, 'wif: '+e.wif,c))
out.append(fs.format(e.idx,'wif: {}'.format(e.sec.wif),c))
elif type(self) == PasswordList:
out.append(fs.format(e.idx, e.sec, c))
out.append(fs.format(e.idx,e.passwd,c))
else: # First line with idx
out.append(fs.format(e.idx, e.addr,c))
out.append(fs.format(e.idx,e.addr,c))
if self.has_keys:
if opt.b16: out.append(fs.format('', 'hex: '+e.sec,c))
out.append(fs.format('', 'wif: '+e.wif,c))
out.append(fs.format('', 'wif: '+e.sec.wif,c))
out.append('}')
self.fmt_data = '\n'.join([l.rstrip() for l in out]) + '\n'
@ -439,6 +412,7 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
return 'Key-address file has odd number of lines'
ret = AddrListList()
le = self.entry_type
while lines:
l = lines.pop(0)
@ -452,7 +426,7 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
if len(d) != 3: d.append('')
a = AddrListEntry(**{'idx':int(d[0]),self.main_key:d[1],'label':d[2]})
a = le(**{'idx':int(d[0]),self.main_key:d[1],'label':d[2]})
if self.has_keys:
l = lines.pop(0)
@ -463,17 +437,18 @@ Removed %s duplicate wif key%s from keylist (also in {pnm} key-address file
if not is_wif(d[1]):
return "'%s': invalid Bitcoin key" % d[1]
a.wif = d[1]
a.sec = PrivKey(wif=d[1])
ret.append(a)
if self.has_keys and keypress_confirm('Check key-to-address validity?'):
wif2addr_f = get_wif2addr_f()
kg = KeyGenerator()
ag = AddrGenerator(('p2pkh','segwit')[self.al_id.mmtype=='S'])
llen = len(ret)
for n,e in enumerate(ret):
msg_r('\rVerifying keys %s/%s' % (n+1,llen))
if e.addr != wif2addr_f(e.wif,mmtype=self.al_id.mmtype):
return "Key doesn't match address!\n %s\n %s" % (e.wif,e.addr)
if e.addr != ag.to_addr(kg.to_pubhex(e.sec)):
return "Key doesn't match address!\n %s\n %s" % (e.sec.wif,e.addr)
msg(' - done')
return ret
@ -539,6 +514,7 @@ class KeyAddrList(AddrList):
gen_keys = True
has_keys = True
ext = 'akeys'
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr, e.sec.wif)
class KeyList(AddrList):
msgs = {
@ -557,6 +533,7 @@ class KeyList(AddrList):
gen_keys = True
has_keys = True
ext = 'keys'
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr, e.sec.wif)
class PasswordList(AddrList):
msgs = {
@ -573,7 +550,8 @@ class PasswordList(AddrList):
Record this checksum: it will be used to verify the password file in the future
""".strip()
}
main_key = 'sec'
entry_type = PasswordListEntry
main_key = 'passwd'
data_desc = 'password'
file_desc = 'passwords'
gen_desc = 'password'
@ -589,6 +567,7 @@ Record this checksum: it will be used to verify the password file in the future
'b58': { 'min_len': 8 , 'max_len': 36 ,'dfl_len': 20, 'desc': 'base-58 password' },
'b32': { 'min_len': 10 ,'max_len': 42 ,'dfl_len': 24, 'desc': 'base-32 password' }
}
chksum_rec_f = lambda foo,e: (str(e.idx), e.passwd)
def __init__(self,infile=None,seed=None,pw_idxs=None,pw_id_str=None,pw_len=None,pw_fmt=None,
chksum_only=False,chk_params_only=False):
@ -605,7 +584,7 @@ Record this checksum: it will be used to verify the password file in the future
self.set_pw_len(pw_len)
if chk_params_only: return
self.al_id = AddrListID(seed.sid,MMGenPasswordType('P'))
self.data = self.generate(seed,pw_idxs,compressed=None)
self.data = self.generate(seed,pw_idxs,compressed=False)
self.num_addrs = len(self.data)
self.fmt_data = ''
@ -677,7 +656,6 @@ Record this checksum: it will be used to verify the password file in the future
dmsg('Seed: {}\nCooked seed: {}\nCooked seed len: {}'.format(hexlify(seed),hexlify(cseed),len(cseed)))
return sha256_rounds(cseed,self.cook_hash_rounds)
class AddrData(MMGenObject):
msgs = {
'too_many_acct_addresses': """

View file

@ -41,7 +41,7 @@ note_secp256k1 = """
If available, the secp256k1 library will be used for address generation.
""".strip()
def opts_data(): return {
opts_data = lambda: {
'sets': [('print_checksum',True,'quiet',True)],
'desc': """Generate a range or list of {desc} from an {pnm} wallet,
mnemonic, seed or brainwallet""".format(desc=gen_desc,pnm=g.proj_name),

View file

@ -29,7 +29,7 @@ from mmgen.obj import TwLabel
# In batch mode, bitcoind just rescans each address separately anyway, so make
# --batch and --rescan incompatible.
def opts_data(): return {
opts_data = lambda: {
'desc': """Import addresses (both {pnm} and non-{pnm}) into an {pnm}
tracking wallet""".format(pnm=g.proj_name),
'usage':'[opts] [mmgen address file]',

View file

@ -32,7 +32,7 @@ dfl_len = {
'b32': PasswordList.pw_info['b32']['dfl_len']
}
def opts_data(): return {
opts_data = lambda: {
'sets': [('print_checksum',True,'quiet',True)],
'desc': """Generate a range or list of passwords from an {pnm} wallet,
mnemonic, seed or brainwallet for the given ID string""".format(pnm=g.proj_name),

View file

@ -24,7 +24,7 @@ mmgen-tool: Perform various MMGen- and Bitcoin-related operations.
from mmgen.common import *
import mmgen.tool as tool
def opts_data(): return {
opts_data = lambda: {
'desc': 'Perform various {pnm}- and Bitcoin-related operations'.format(pnm=g.proj_name),
'usage': '[opts] <command> <command args>',
'options': """

View file

@ -24,7 +24,7 @@ mmgen-txbump: Increase the fee on a replaceable (replace-by-fee) MMGen
from mmgen.txcreate import *
from mmgen.txsign import *
def opts_data(): return {
opts_data = lambda: {
'desc': 'Increase the fee on a replaceable (RBF) {g.proj_name} transaction, creating a new transaction, and optionally sign and send the new transaction'.format(g=g),
'usage': '[opts] <{g.proj_name} TX file> [seed source] ...'.format(g=g),
'sets': ( ('yes', True, 'quiet', True), ),

View file

@ -23,7 +23,7 @@ mmgen-txcreate: Create a Bitcoin transaction to and from MMGen- or non-MMGen
from mmgen.txcreate import *
def opts_data(): return {
opts_data = lambda: {
'desc': 'Create a transaction with outputs to specified Bitcoin or {g.proj_name} addresses'.format(g=g),
'usage': '[opts] <addr,amt> ... [change addr] [addr file] ...',
'sets': ( ('yes', True, 'quiet', True), ),

View file

@ -23,7 +23,7 @@ mmgen-txdo: Create, sign and broadcast an online MMGen transaction
from mmgen.txcreate import *
from mmgen.txsign import *
def opts_data(): return {
opts_data = lambda: {
'desc': 'Create, sign and send an {g.proj_name} transaction'.format(g=g),
'usage': '[opts] <addr,amt> ... [change addr] [addr file] ... [seed source] ...',
'sets': ( ('yes', True, 'quiet', True), ),
@ -89,7 +89,7 @@ do_license_msg()
kal = get_keyaddrlist(opt)
kl = get_keylist(opt)
if kl and kal: kl.remove_dups(kal,key='wif')
if kl and kal: kl.remove_dup_keys(kal)
tx = txcreate(cmd_args,caller='txdo')
txsign(opt,c,tx,seed_files,kl,kal)

View file

@ -23,7 +23,7 @@ mmgen-txsend: Broadcast a transaction signed by 'mmgen-txsign' to the network
from mmgen.common import *
from mmgen.tx import *
def opts_data(): return {
opts_data = lambda: {
'desc': 'Send a Bitcoin transaction signed by {pnm}-txsign'.format(
pnm=g.proj_name.lower()),
'usage': '[opts] <signed transaction file>',

View file

@ -23,7 +23,7 @@ mmgen-txsign: Sign a transaction generated by 'mmgen-txcreate'
from mmgen.txsign import *
# -w, --use-wallet-dat (keys from running bitcoind) removed: use bitcoin-cli walletdump instead
def opts_data(): return {
opts_data = lambda: {
'desc': 'Sign Bitcoin transactions generated by {pnl}-txcreate'.format(pnl=pnm.lower()),
'usage': '[opts] <transaction file>... [seed source]...',
'sets': ( ('yes', True, 'quiet', True), ),
@ -88,7 +88,7 @@ seed_files = get_seed_files(opt,infiles)
kal = get_keyaddrlist(opt)
kl = get_keylist(opt)
if kl and kal: kl.remove_dups(kal,key='wif')
if kl and kal: kl.remove_dup_keys(kal)
tx_num_str = ''
for tx_num,tx_file in enumerate(tx_files,1):

View file

@ -55,7 +55,7 @@ elif invoked_as == 'passchg':
else:
die(1,"'%s': unrecognized invocation" % g.prog_name)
def opts_data(): return {
opts_data = lambda: {
# Can't use: share/Opts doesn't know anything about fmt codes
# 'sets': [('hidden_incog_output_params',bool,'out_fmt','hi')],
'desc': desc.format(pnm=g.proj_name),

View file

@ -17,21 +17,26 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
obj.py: MMGen native classes
obj.py: MMGen native classes
"""
import sys
from decimal import *
from mmgen.color import *
lvl = 0
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
def is_mmgen_id(s): return MMGenID(s,on_fail='silent')
def is_btc_addr(s): return BTCAddr(s,on_fail='silent')
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
def is_tw_label(s): return TwLabel(s,on_fail='silent')
def is_wif(s): return WifKey(s,on_fail='silent')
class MMGenObject(object):
# Pretty-print any object of type MMGenObject, recursing into sub-objects - WIP
# def pmsg(self): sys.stderr.write(self.pformat()+'\n')
# def pdie(self): sys.stderr.write(self.pformat()+'\n'); sys.exit(0)
def pmsg(self): print(self.pformat())
def pdie(self): print(self.pformat()); sys.exit(0)
# Pretty-print any object subclassed from MMGenObject, recursing into sub-objects - WIP
def pmsg(self): print(self.pformat())
def pdie(self): print(self.pformat()); sys.exit(0)
def pformat(self,lvl=0):
from decimal import Decimal
scalars = (str,unicode,int,float,Decimal)
@ -68,8 +73,8 @@ class MMGenObject(object):
# print type(self)
# print dir(self)
# print self.__dict__ # *attributes* of object
# print self.__dict__.keys() # *attributes* of object
# print self.__dict__
# print self.__dict__.keys()
# print self.keys()
out = [u'<{}>{}\n'.format(type(self).__name__,' '+repr(self) if isScalar(self) else '')]
@ -96,39 +101,73 @@ class MMGenObject(object):
class MMGenList(list,MMGenObject): pass
class MMGenDict(dict,MMGenObject): pass
# Descriptor: https://docs.python.org/2/howto/descriptor.html
class MMGenListItemAttr(object):
def __init__(self,name,dtype):
class MMGenImmutableAttr(object): # Descriptor
typeconv = False
builtin_typeconv = False
def __init__(self,name,dtype,typeconv=None,builtin_typeconv=None):
if typeconv is not None:
assert typeconv in (True,False)
self.typeconv = typeconv
if builtin_typeconv is not None:
assert builtin_typeconv
self.builtin_typeconv = builtin_typeconv
self.typeconv = False # override
self.name = name
self.dtype = dtype
def __get__(self,instance,owner):
return instance.__dict__[self.name]
# forbid all reassignment
def chk_ok_set_attr(self,instance):
if hasattr(instance,self.name):
m = "Attribute '{}' of {} instance cannot be reassigned"
raise AttributeError(m.format(self.name,type(instance)))
def __set__(self,instance,value):
# if self.name == 'mmid': print repr(instance), repr(value) # DEBUG
instance.__dict__[self.name] = globals()[self.dtype](value)
self.chk_ok_set_attr(instance)
if self.typeconv: # convert type
instance.__dict__[self.name] = globals()[self.dtype](value)
elif self.builtin_typeconv:
instance.__dict__[self.name] = self.dtype(value)
else: # check type
if type(value) != self.dtype:
m = "Attribute '{}' of {} instance must of type {}"
raise TypeError(m.format(self.name,type(instance),self.dtype))
instance.__dict__[self.name] = value
def __delete__(self,instance):
del instance.__dict__[self.name]
if self.name in instance.delete_ok:
if self.name in instance.__dict__:
del instance.__dict__[self.name]
else:
m = "Atribute '{}' of {} instance cannot be deleted"
raise AttributeError(m.format(self.name,type(instance)))
class MMGenListItemAttr(MMGenImmutableAttr):
typeconv = True
builtin_typeconv = False
# return None if attribute doesn't exist
def __get__(self,instance,owner):
try: return instance.__dict__[self.name]
except: return None
# allow reassignment if value is None or attr in reassign_ok list
def chk_ok_set_attr(self,instance):
if hasattr(instance,self.name) and not (
getattr(instance,self.name) == None or self.name in instance.reassign_ok
):
m = "Attribute '{}' of {} instance cannot be reassigned"
raise AttributeError(m.format(self.name,type(instance)))
class MMGenListItem(MMGenObject):
addr = MMGenListItemAttr('addr','BTCAddr')
amt = MMGenListItemAttr('amt','BTCAmt')
mmid = MMGenListItemAttr('mmid','MMGenID')
label = MMGenListItemAttr('label','TwComment')
attrs = ()
attrs_priv = ()
attrs_reassign = 'label',
def attr_error(self,arg):
raise AttributeError, "'{}': invalid attribute for {}".format(arg,type(self).__name__)
def set_error(self,attr,val):
raise ValueError, \
"'{}': attribute '{}' in instance of class '{}' cannot be reassigned".format(
val,attr,type(self).__name__)
attrs_base = ('attrs','attrs_priv','attrs_reassign','attrs_base','attr_error','set_error',
'__dict__','pformat','pmsg','pdie')
reassign_ok = ()
delete_ok = ()
def __init__(self,*args,**kwargs):
if args:
@ -137,40 +176,18 @@ class MMGenListItem(MMGenObject):
if kwargs[k] != None:
setattr(self,k,kwargs[k])
def __getattribute__(self,name):
ga = object.__getattribute__
if name in ga(self,'attrs') + ga(self,'attrs_priv') + ga(self,'attrs_base'):
try:
return ga(self,name)
except:
return None
else:
self.attr_error(name)
def __setattr__(self,name,val):
if name in (self.attrs + self.attrs_priv + self.attrs_base):
if getattr(self,name) == None or name in self.attrs_reassign:
object.__setattr__(self,name,val)
else:
# object.__setattr__(self,name,val) # DEBUG
self.set_error(name,val)
else:
self.attr_error(name)
def __delattr__(self,name):
if name in (self.attrs + self.attrs_priv + self.attrs_base):
try: # don't know why this is necessary
object.__delattr__(self,name)
except:
pass
else:
self.attr_error(name)
# prevent setting random attributes
def __setattr__(self,name,value):
if name not in type(self).__dict__:
m = "'{}': no such attribute in class {}"
raise AttributeError(m.format(name,type(self)))
return object.__setattr__(self,name,value)
class InitErrors(object):
@staticmethod
def arg_chk(cls,on_fail):
assert on_fail in ('die','return','silent','raise'),"arg_chk in class %s" % cls.__name__
assert on_fail in ('die','return','silent','raise'),'arg_chk in class {}'.format(cls.__name__)
@staticmethod
def init_fail(m,on_fail,silent=False):
@ -527,13 +544,83 @@ class WifKey(str,Hilite,InitErrors):
desc = 'WIF key'
def __new__(cls,s,on_fail='die',errmsg=None):
cls.arg_chk(cls,on_fail)
from mmgen.tx import is_wif
if is_wif(s):
from mmgen.bitcoin import wif2hex
if wif2hex(s):
me = str.__new__(cls,s)
return me
m = errmsg or "'{}': invalid value for {}".format(s,cls.desc)
return cls.init_fail(m,on_fail)
class HexStr(str,Hilite,InitErrors):
color = 'red'
trunc_ok = False
def __new__(cls,s,on_fail='die',case='lower'):
assert case in ('upper','lower')
cls.arg_chk(cls,on_fail)
from string import hexdigits
if set(s) <= set(getattr(hexdigits,case)()) and not len(s) % 2:
return str.__new__(cls,s)
m = "'{}': value cannot be converted to {}".format(s,cls.__name__)
return cls.init_fail(m,on_fail)
class PubKey(HexStr,MMGenObject):
def __new__(cls,s,compressed,on_fail='die'):
assert type(compressed) == bool
me = HexStr.__new__(cls,s,case='lower')
me.compressed = compressed
return me
class PrivKey(str,Hilite,InitErrors,MMGenObject):
color = 'red'
width = 64
trunc_ok = False
compressed = MMGenImmutableAttr('compressed',bool)
wif = MMGenImmutableAttr('wif',WifKey)
def __new__(*args,**kwargs): # initialize with (priv_bin,compressed), WIF or self
cls = args[0]
assert set(kwargs) <= set(['on_fail','wif'])
on_fail = kwargs['on_fail'] if 'on_fail' in kwargs else 'die'
cls.arg_chk(cls,on_fail)
if len(args) == 2:
assert type(args[1]) == cls
return args[1]
if 'wif' in kwargs:
assert len(args) == 1
try:
from mmgen.bitcoin import wif2hex,wif_is_compressed # TODO: move these here
wif = WifKey(kwargs['wif'])
me = str.__new__(cls,wif2hex(wif))
me.compressed = wif_is_compressed(wif)
me.wif = wif
return me
except:
fs = "Value '{}' cannot be converted to WIF key"
errmsg = fs.format(kwargs['wif'])
return cls.init_fail(errmsg,on_fail)
cls,s,compressed = args
try:
from binascii import hexlify
assert len(s) == cls.width / 2
me = str.__new__(cls,hexlify(s))
me.compressed = compressed
me.wif = me.towif()
return me
except:
fs = "Key={}\nCompressed={}\nValue pair cannot be converted to {}"
errmsg = fs.format(repr(s),compressed,cls.__name__)
return cls.init_fail(errmsg,on_fail)
def towif(self):
from mmgen.bitcoin import hex2wif
return WifKey(hex2wif(self,compressed=self.compressed))
class MMGenAddrType(str,Hilite,InitErrors):
width = 1
trunc_ok = False

View file

@ -24,7 +24,7 @@ import sys, getopt
# from mmgen.util import mdie,die,pdie,pmsg # DEBUG
def usage(opts_data):
print 'USAGE: %s %s' % (opts_data['prog_name'], opts_data['usage'])
print('USAGE: %s %s' % (opts_data['prog_name'], opts_data['usage']))
sys.exit(2)
def print_help_and_exit(opts_data,longhelp=False):
@ -36,9 +36,9 @@ def print_help_and_exit(opts_data,longhelp=False):
hdr = ('OPTIONS:',' LONG OPTIONS:')[longhelp]
ls = (' ','')[longhelp]
es = ('',' ')[longhelp]
out += '{ls}{}\n{ls}{es}{}\n'.format(hdr,('\n'+ls).join(od_opts),ls=ls,es=es)
out += '{ls}{}\n{ls}{es}{}'.format(hdr,('\n'+ls).join(od_opts),ls=ls,es=es)
if 'notes' in opts_data and not longhelp:
out += ' ' + '\n '.join(opts_data['notes'][1:-1].splitlines())
out += '\n ' + '\n '.join(opts_data['notes'][1:-1].splitlines())
print(out)
sys.exit(0)
@ -51,7 +51,7 @@ def process_opts(argv,opts_data,short_opts,long_opts,defer_help=False):
so_str = short_opts.replace('-:','').replace('-','')
try: cl_opts,args = getopt.getopt(argv[1:], so_str, long_opts)
except getopt.GetoptError as err:
print str(err); sys.exit(2)
print(str(err)); sys.exit(2)
sopts_list = ':_'.join(['_'.join(list(i)) for i in short_opts.split(':')]).split('_')
opts,do_help = {},False

View file

@ -38,11 +38,19 @@ class MMGenTrackingWallet(MMGenObject):
class MMGenTwOutputList(list,MMGenObject): pass
class MMGenTwOutput(MMGenListItem):
class MMGenTwUnspentOutput(MMGenListItem):
# attrs = 'txid','vout','amt','label','twmmid','addr','confs','scriptPubKey','days','skip'
reassign_ok = 'label','skip'
txid = MMGenListItemAttr('txid','BitcoinTxID')
vout = MMGenListItemAttr('vout',int,typeconv=False),
amt = MMGenListItemAttr('amt','BTCAmt'),
label = MMGenListItemAttr('label','TwComment'),
twmmid = MMGenListItemAttr('twmmid','TwMMGenID')
txid = MMGenListItemAttr('txid','BitcoinTxID')
attrs_reassign = 'label','skip'
attrs = 'txid','vout','amt','label','twmmid','addr','confs','scriptPubKey','days','skip'
addr = MMGenListItemAttr('addr','BTCAddr'),
confs = MMGenListItemAttr('confs',int,typeconv=False),
scriptPubKey = MMGenListItemAttr('scriptPubKey','HexStr')
days = MMGenListItemAttr('days',int,typeconv=False),
skip = MMGenListItemAttr('skip',bool,typeconv=False),
wmsg = {
'no_spendable_outputs': """
@ -87,12 +95,12 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
'twmmid': l.mmid,
'label': l.comment,
'days': int(o['confirmations'] * g.mins_per_block / (60*24)),
'amt': o['amount'], # TODO
'addr': o['address'],
'amt': BTCAmt(o['amount']), # TODO
'addr': BTCAddr(o['address']), # TODO
'confs': o['confirmations']
})
mm_rpc.append(o)
self.unspent = self.MMGenTwOutputList([self.MMGenTwOutput(**dict([(k,v) for k,v in o.items() if k in self.MMGenTwOutput.attrs])) for o in mm_rpc])
self.unspent = self.MMGenTwOutputList([self.MMGenTwUnspentOutput(**dict([(k,v) for k,v in o.items() if k in self.MMGenTwUnspentOutput.__dict__])) for o in mm_rpc])
for u in self.unspent:
if u.label == None: u.label = ''
if not self.unspent:

View file

@ -26,18 +26,6 @@ from binascii import unhexlify
from mmgen.common import *
from mmgen.obj import *
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
def is_mmgen_id(s): return MMGenID(s,on_fail='silent')
def is_btc_addr(s): return BTCAddr(s,on_fail='silent')
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
def is_tw_label(s): return TwLabel(s,on_fail='silent')
def is_wif(s):
if s == '': return False
from mmgen.bitcoin import wif2hex
return bool(wif2hex(s))
def segwit_is_active(exit_on_error=False):
d = bitcoin_connection().getblockchaininfo()
if d['chain'] == 'regtest':
@ -125,6 +113,19 @@ class DeserializedTX(OrderedDict,MMGenObject): # need to add MMGen types
keys = 'txid','version','lock_time','witness_size','num_txins','txins','num_txouts','txouts'
return OrderedDict.__init__(self, ((k,d[k]) for k in keys))
txio_attrs = {
'reassign_ok': ('label',),
'delete_ok': ('have_wif',),
'vout': MMGenListItemAttr('vout',int,typeconv=False),
'amt': MMGenListItemAttr('amt','BTCAmt'),
'label': MMGenListItemAttr('label','TwComment'),
'mmid': MMGenListItemAttr('mmid','MMGenID'),
'addr': MMGenListItemAttr('addr','BTCAddr'),
'confs': MMGenListItemAttr('confs',int,builtin_typeconv=True), # long confs found in the wild, so convert
'txid': MMGenListItemAttr('txid','BitcoinTxID'),
'have_wif': MMGenListItemAttr('have_wif',bool,typeconv=False)
}
class MMGenTX(MMGenObject):
ext = 'rawtx'
raw_ext = 'rawtx'
@ -133,17 +134,13 @@ class MMGenTX(MMGenObject):
desc = 'transaction'
class MMGenTxInput(MMGenListItem):
attrs = 'txid','vout','amt','label','mmid','addr','confs','scriptPubKey','have_wif','sequence'
txid = MMGenListItemAttr('txid','BitcoinTxID')
for k in txio_attrs: locals()[k] = txio_attrs[k] # in lieu of inheritance
scriptPubKey = MMGenListItemAttr('scriptPubKey','HexStr')
sequence = MMGenListItemAttr('sequence',int,typeconv=False)
class MMGenTxOutput(MMGenListItem):
attrs = 'txid','vout','amt','label','mmid','addr','have_wif','is_chg'
class MMGenTxInputOldFmt(MMGenListItem): # for converting old tx files only
tr = {'amount':'amt', 'address':'addr', 'confirmations':'confs','comment':'label'}
attrs = 'txid','vout','amt','label','mmid','addr','confs','scriptPubKey','wif'
attrs_priv = 'tr',
for k in txio_attrs: locals()[k] = txio_attrs[k]
is_chg = MMGenListItemAttr('is_chg',bool,typeconv=False)
class MMGenTxInputList(list,MMGenObject): pass
class MMGenTxOutputList(list,MMGenObject): pass
@ -204,12 +201,6 @@ class MMGenTX(MMGenObject):
e.mmid,f = d[e.addr]
if f: e.label = f
# def encode_io(self,desc):
# tr = getattr((self.MMGenTxOutput,self.MMGenTxInput)[desc=='inputs'],'tr')
# tr_rev = dict([(v,k) for k,v in tr.items()])
# return [dict([(tr_rev[e] if e in tr_rev else e,getattr(d,e)) for e in d.__dict__])
# for d in getattr(self,desc)]
#
def create_raw(self,c):
i = [{'txid':e.txid,'vout':e.vout} for e in self.inputs]
if self.inputs[0].sequence:
@ -372,11 +363,6 @@ class MMGenTX(MMGenObject):
tx_fee = my_raw_input('Enter transaction fee: ')
desc = 'User-selected'
# inputs methods
def list_wifs(self,desc,mmaddrs_only=False):
return [e.wif for e in getattr(self,desc) if e.mmid] if mmaddrs_only \
else [e.wif for e in getattr(self,desc)]
def delete_attrs(self,desc,attr):
for e in getattr(self,desc):
if hasattr(e,attr): delattr(e,attr)
@ -386,20 +372,23 @@ class MMGenTX(MMGenObject):
(self.MMGenTxOutput,self.MMGenTxOutputList),
(self.MMGenTxInput,self.MMGenTxInputList)
)[desc=='inputs']
return il([io(**dict([(k,d[k]) for k in io.attrs
return il([io(**dict([(k,d[k]) for k in io.__dict__
if k in d and d[k] not in ('',None)])) for d in data])
def decode_io_oldfmt(self,data):
io = self.MMGenTxInputOldFmt
tr_rev = dict([(v,k) for k,v in io.tr.items()])
copy_keys = [tr_rev[k] if k in tr_rev else k for k in io.attrs]
return [io(**dict([(io.tr[k] if k in io.tr else k,d[k])
for k in copy_keys if k in d and d[k] != ''])) for d in data]
tr = {'amount':'amt', 'address':'addr', 'confirmations':'confs','comment':'label'}
tr_rev = dict([(v,k) for k,v in tr.items()])
copy_keys = [tr_rev[k] if k in tr_rev else k for k in self.MMGenTxInput.__dict__]
ret = MMGenList(self.MMGenTxInput(**dict([(tr[k] if k in tr else k,d[k])
for k in copy_keys if k in d and d[k] != ''])) for d in data)
for i in ret: i.sequence = int('0xffffffff',16)
return ret
# inputs methods
def copy_inputs_from_tw(self,tw_unspent_data):
txi,self.inputs = self.MMGenTxInput,self.MMGenTxInputList()
for d in tw_unspent_data:
t = txi(**dict([(attr,getattr(d,attr)) for attr in d.__dict__ if attr in txi.attrs]))
t = txi(**dict([(attr,getattr(d,attr)) for attr in d.__dict__ if attr in txi.__dict__]))
if d.twmmid.type == 'mmgen': t.mmid = d.twmmid # twmmid -> mmid
self.inputs.append(t)
@ -443,37 +432,35 @@ class MMGenTX(MMGenObject):
def get_non_mmaddrs(self,desc):
return list(set(i.addr for i in getattr(self,desc) if not i.mmid))
# return true or false, don't exit
# return true or false; don't exit
def sign(self,c,tx_num_str,keys):
self.die_if_incorrect_chain()
if g.coin == 'BCH' and self.has_segwit_inputs():
die(2,yellow("Segwit inputs cannot be spent on BCH chain!"))
if g.coin == 'BCH' and (self.has_segwit_inputs() or self.has_segwit_outputs()):
die(2,yellow("Segwit inputs cannot be spent or spent to on the BCH chain!"))
if not keys:
msg('No keys. Cannot sign!')
return False
qmsg('Passing {} key{} to bitcoind'.format(len(keys),suf(keys,'s')))
qmsg('Passing %s key%s to bitcoind' % (len(keys),suf(keys,'s')))
if self.has_segwit_inputs():
from mmgen.addr import KeyGenerator,AddrGenerator
kg = KeyGenerator()
ag = AddrGenerator('segwit')
keydict = MMGenDict([(d.addr,d.sec) for d in keys])
sig_data = []
for d in self.inputs:
e = dict([(k,getattr(d,k)) for k in ('txid','vout','scriptPubKey','amt')])
e['amount'] = e['amt']
del e['amt']
wif = keys[d.addr]
if d.mmid and d.mmid.mmtype == 'S':
from mmgen.bitcoin import pubhex2redeem_script
from mmgen.addr import keygen_wif2pubhex,keygen_selector
pubhex = keygen_wif2pubhex(wif,keygen_selector())
e['redeemScript'] = pubhex2redeem_script(pubhex)
e['redeemScript'] = ag.to_segwit_redeem_script(kg.to_pubhex(keydict[d.addr]))
sig_data.append(e)
from mmgen.bitcoin import hash256
msg_r('Signing transaction{}...'.format(tx_num_str))
ht = ('ALL','ALL|FORKID')[g.coin=='BCH'] # sighashtype defaults to 'ALL'
ret = c.signrawtransaction(self.hex,sig_data,keys.values(),ht,on_fail='return')
wifs = [d.sec.wif for d in keys]
ret = c.signrawtransaction(self.hex,sig_data,wifs,ht,on_fail='return')
from mmgen.rpc import rpc_error,rpc_errmsg
if rpc_error(ret):
@ -586,12 +573,7 @@ class MMGenTX(MMGenObject):
confirm_or_exit(m1,m2,m3)
msg('Sending transaction')
if bogus_send:
ret = 'deadbeef' * 8
m = 'BOGUS transaction NOT sent: %s'
else:
ret = c.sendrawtransaction(self.hex,on_fail='return')
m = 'Transaction sent: %s'
ret = None if bogus_send else c.sendrawtransaction(self.hex,on_fail='return')
from mmgen.rpc import rpc_error,rpc_errmsg
if rpc_error(ret):
@ -608,10 +590,13 @@ class MMGenTX(MMGenObject):
msg(red('Send of MMGen transaction {} failed'.format(self.txid)))
return False
else:
if not bogus_send:
if bogus_send:
m = 'BOGUS transaction NOT sent: {}'
else:
assert ret == self.btc_txid, 'txid mismatch (after sending)'
m = 'Transaction sent: {}'
self.desc = 'sent transaction'
msg(m % self.btc_txid.hl())
msg(m.format(self.btc_txid.hl()))
self.add_timestamp()
self.add_blockcount(c)
return True
@ -666,7 +651,6 @@ class MMGenTX(MMGenObject):
self.inputs[0].sequence = g.max_int - 2
def format_view(self,terse=False):
# self.pdie()
try:
blockcount = bitcoin_connection().getblockcount()
except:

View file

@ -86,19 +86,20 @@ def get_seed_for_seed_id(sid,infiles,saved_seeds):
saved_seeds[ss.seed.sid] = ss.seed
if ss.seed.sid == sid: return ss.seed
def generate_keys_for_mmgen_addrs(mmgen_addrs,infiles,saved_seeds):
sids = set(i.sid for i in mmgen_addrs)
def generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds):
mmids = [e.mmid for e in need_keys]
sids = set(i.sid for i in mmids)
vmsg('Need seed%s: %s' % (suf(sids,'s'),' '.join(sids)))
d = AddrListList()
d = MMGenList()
from mmgen.addr import KeyAddrList
for sid in sids:
# Returns only if seed is found
seed = get_seed_for_seed_id(sid,infiles,saved_seeds)
for t in MMGenAddrType.mmtypes:
idx_list = [i.idx for i in mmgen_addrs if i.sid == sid and i.mmtype == t]
idx_list = [i.idx for i in mmids if i.sid == sid and i.mmtype == t]
if idx_list:
addr_idxs = AddrIdxList(idx_list=idx_list)
d += KeyAddrList(seed=seed,addr_idxs=addr_idxs,do_chksum=False,mmtype=MMGenAddrType(t)).flat_list()
d.append(KeyAddrList(seed=seed,addr_idxs=addr_idxs,do_chksum=False,mmtype=MMGenAddrType(t)))
return d
def add_keys(tx,src,infiles=None,saved_seeds=None,keyaddr_list=None):
@ -107,18 +108,20 @@ def add_keys(tx,src,infiles=None,saved_seeds=None,keyaddr_list=None):
desc,m1 = ('key-address file','From key-address file:') if keyaddr_list else \
('seed(s)','Generated from seed:')
qmsg('Checking {} -> {} address mappings for {} (from {})'.format(pnm,g.coin,src,desc))
d = keyaddr_list.flat_list() if keyaddr_list else \
generate_keys_for_mmgen_addrs([e.mmid for e in need_keys],infiles,saved_seeds)
d = MMGenList([keyaddr_list]) if keyaddr_list else \
generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds)
new_keys = []
for e in need_keys:
for f in d:
if f.mmid == e.mmid:
if f.addr == e.addr:
e.have_wif = True
if src == 'inputs':
new_keys.append((f.addr,f.wif))
else:
die(3,wmsg['mapping_error'].format(m1,f.mmid,f.addr,'tx file:',e.mmid,e.addr))
for kal in d:
for f in kal.data:
mmid = '{}:{}'.format(kal.al_id,f.idx)
if mmid == e.mmid:
if f.addr == e.addr:
e.have_wif = True
if src == 'inputs':
new_keys.append(f)
else:
die(3,wmsg['mapping_error'].format(m1,mmid,f.addr,'tx file:',e.mmid,e.addr))
if new_keys:
vmsg('Added %s wif key%s from %s' % (len(new_keys),suf(new_keys,'s'),desc))
return new_keys
@ -151,22 +154,22 @@ def get_keyaddrlist(opt):
def get_keylist(opt):
if opt.keys_from_file:
l = get_lines_from_file(opt.keys_from_file,'key-address data',trim_comments=True)
ret = KeyAddrList(keylist=[m.split()[0] for m in l]) # accept bitcoind wallet dumps
ret.generate_addrs_from_keylist()
return ret
kal = KeyAddrList(keylist=[m.split()[0] for m in l]) # accept bitcoind wallet dumps
kal.generate_addrs_from_keys()
return kal
return None
def txsign(opt,c,tx,seed_files,kl,kal,tx_num_str=''):
# Start
keys = []
# tx.pmsg()
keys = MMGenList() # list of AddrListEntry objects
non_mm_addrs = tx.get_non_mmaddrs('inputs')
if non_mm_addrs:
tmp = KeyAddrList(addrlist=non_mm_addrs,do_chksum=False)
tmp.add_wifs(kl)
m = tmp.list_missing('wif')
m = tmp.list_missing('sec')
if m: die(2,wmsg['missing_keys_error'].format(suf(m,'es'),'\n '.join(m)))
keys += tmp.get_addr_wif_pairs()
keys += tmp.data
if opt.mmgen_keys_from_file:
keys += add_keys(tx,'inputs',keyaddr_list=kal)
@ -175,6 +178,7 @@ def txsign(opt,c,tx,seed_files,kl,kal,tx_num_str=''):
keys += add_keys(tx,'inputs',seed_files,saved_seeds)
add_keys(tx,'outputs',seed_files,saved_seeds)
# this attr must not be written to file
tx.delete_attrs('inputs','have_wif')
tx.delete_attrs('outputs','have_wif')
@ -182,7 +186,7 @@ def txsign(opt,c,tx,seed_files,kl,kal,tx_num_str=''):
if extra_sids:
msg('Unused Seed ID{}: {}'.format(suf(extra_sids,'s'),' '.join(extra_sids)))
if tx.sign(c,tx_num_str,dict(keys)):
if tx.sign(c,tx_num_str,keys):
return tx
else:
die(3,red('Transaction {}could not be signed.'.format(tx_num_str)))

View file

@ -1,14 +1,18 @@
#!/usr/bin/env python
import sys,os
repo_root = os.path.split(os.path.abspath(os.path.dirname(sys.argv[0])))[0]
sys.path = [repo_root] + sys.path
from mmgen.common import *
opts_data = {
opts_data = lambda: {
'desc': 'Compute checksum for a MMGen data file',
'usage':'[opts] infile',
'options': """
-h, --help Print this help message.
-i, --include-first-line Include the first line of the file (you probably don't want this)
""".strip()
"""
}
cmd_args = opts.init(opts_data)

View file

@ -5,15 +5,9 @@ repo_root = os.path.split(os.path.abspath(os.path.dirname(sys.argv[0])))[0]
sys.path = [repo_root] + sys.path
from mmgen.common import *
from mmgen.tool import *
from mmgen.tx import *
from mmgen.bitcoin import *
from mmgen.obj import MMGenTXLabel
from mmgen.seed import *
from mmgen.term import do_pager
help_data = {
opts_data = lambda: {
'desc': "Convert MMGen transaction file from old format to new format",
'usage': "<tx file>",
'options': """
@ -22,52 +16,42 @@ help_data = {
"""
}
import mmgen.opts
cmd_args = opts.init(help_data)
cmd_args = opts.init(opts_data)
if len(cmd_args) != 1: opts.usage()
def parse_tx_file(infile):
err_str,err_fmt = '','Invalid %s in transaction file'
err_fmt = 'Invalid {} in transaction file'
tx_data = get_lines_from_file(infile)
if len(tx_data) == 5:
metadata,tx_hex,inputs_data,outputs_data,comment = tx_data
elif len(tx_data) == 4:
metadata,tx_hex,inputs_data,outputs_data = tx_data
comment = ''
else:
try:
err_str = 'number of lines'
if not err_str:
if len(metadata.split()) != 3:
err_str = 'metadata'
else:
try: unhexlify(tx_hex)
except: err_str = 'hex data'
assert len(tx_data) in (4,5)
if len(tx_data) == 5:
metadata,tx_hex,inputs,outputs,comment = tx_data
elif len(tx_data) == 4:
metadata,tx_hex,inputs,outputs = tx_data
comment = ''
err_str = 'metadata'
assert len(metadata.split()) == 3
err_str = 'hex data'
unhexlify(tx_hex)
err_str = 'inputs data'
inputs = eval(inputs)
err_str = 'btc-to-mmgen address map data'
outputs = eval(outputs)
if comment:
from mmgen.bitcoin import b58decode
comment = b58decode(comment)
if comment == False:
err_str = 'encoded comment (not base58)'
else:
try: inputs_data = eval(inputs_data)
except: err_str = 'inputs data'
else:
try: outputs_data = eval(outputs_data)
except: err_str = 'btc-to-mmgen address map data'
else:
if comment:
from mmgen.bitcoin import b58decode
comment = b58decode(comment)
if comment == False:
err_str = 'encoded comment (not base58)'
else:
try:
comment = MMGenTXLabel(comment)
except:
err_str = 'comment'
if err_str:
msg(err_fmt % err_str)
sys.exit(2)
err_str = 'comment'
comment = MMGenTXLabel(comment)
except:
die(2,err_fmt.format(err_str))
else:
return metadata.split(),tx_hex,inputs_data,outputs_data,comment
return metadata.split(),tx_hex,inputs,outputs,comment
def find_block_by_time(c,timestamp):
secs = decode_timestamp(timestamp)
@ -95,15 +79,14 @@ def find_block_by_time(c,timestamp):
tx = MMGenTX()
[tx.txid,send_amt,tx.timestamp],tx.hex,inputs,b2m_map,tx.label = parse_tx_file(cmd_args[0])
metadata,tx.hex,inputs,b2m_map,tx.label = parse_tx_file(cmd_args[0])
tx.txid,send_amt,tx.timestamp = metadata
tx.send_amt = Decimal(send_amt)
g.testnet = False
g.rpc_host = 'localhost'
c = bitcoin_connection()
# attrs = 'txid','vout','amt','comment','mmid','addr','wif'
#pp_msg(inputs)
for i in inputs:
if not 'mmid' in i and 'account' in i:
from mmgen.tw import parse_tw_acct_label
@ -112,16 +95,14 @@ for i in inputs:
i['mmid'] = a.decode('utf8')
if b: i['comment'] = b.decode('utf8')
#pp_msg(inputs)
tx.inputs = tx.decode_io_oldfmt(inputs)
if tx.check_signed(c):
if tx.marked_signed(c):
msg('Transaction is signed')
dec_tx = c.decoderawtransaction(tx.hex)
tx.outputs = [MMGenTxOutput(addr=i['scriptPubKey']['addresses'][0],amt=i['value'])
for i in dec_tx['vout']]
tx.outputs = MMGenList(MMGenTX.MMGenTxOutput(addr=i['scriptPubKey']['addresses'][0],amt=i['value'])
for i in dec_tx['vout'])
for e in tx.outputs:
if e.addr in b2m_map:
f = b2m_map[e.addr]
@ -132,9 +113,6 @@ for e in tx.outputs:
if e.addr == f.addr and f.mmid:
e.mmid = f.mmid
if f.label: e.label = f.label.decode('utf8')
#for i in tx.inputs: print i
#for i in tx.outputs: print i
#die(1,'')
tx.blockcount = find_block_by_time(c,tx.timestamp)
tx.blockcount = find_block_by_time(c,tx.timestamp)
tx.write_to_file(ask_tty=False)

View file

@ -32,7 +32,7 @@ from mmgen.common import *
from mmgen.bitcoin import hex2wif
rounds = 100
def opts_data(): return {
opts_data = lambda: {
'desc': "Test address generation in various ways",
'usage':'[options] [spec] [rounds | dump file]',
'options': """
@ -118,25 +118,27 @@ def match_error(sec,wif,a_addr,b_addr,a,b):
mmtype = ('L','S')[bool(opt.segwit)]
compressed = True
from mmgen.addr import KeyGenerator,AddrGenerator
from mmgen.obj import PrivKey
ag = AddrGenerator(('p2pkh','segwit')[bool(opt.segwit)])
if a and b:
m = "Comparing address generators '{}' and '{}'"
qmsg(green(m.format(g.key_generators[a-1],g.key_generators[b-1])))
from mmgen.addr import get_privhex2addr_f
gen_a = get_privhex2addr_f(generator=a)
gen_b = get_privhex2addr_f(generator=b)
last_t = time.time()
kg_a = KeyGenerator(a)
kg_b = KeyGenerator(b)
for i in range(rounds):
if time.time() - last_t >= 0.1:
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
last_t = time.time()
sec = hexlify(os.urandom(32))
wif = hex2wif(sec,compressed=compressed)
a_addr = gen_a(sec,compressed,mmtype=mmtype)
b_addr = gen_b(sec,compressed,mmtype=mmtype)
vmsg('\nkey: %s\naddr: %s\n' % (wif,a_addr))
sec = PrivKey(os.urandom(32),compressed)
a_addr = ag.to_addr(kg_a.to_pubhex(sec))
b_addr = ag.to_addr(kg_b.to_pubhex(sec))
vmsg('\nkey: %s\naddr: %s\n' % (sec.wif,a_addr))
if a_addr != b_addr:
match_error(sec,wif,a_addr,b_addr,a,b)
match_error(sec,sec.wif,a_addr,b_addr,a,b)
if not opt.segwit:
compressed = not compressed
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
@ -145,23 +147,21 @@ if a and b:
elif a and not fh:
m = "Testing speed of address generator '{}'"
qmsg(green(m.format(g.key_generators[a-1])))
from mmgen.addr import get_privhex2addr_f
gen = get_privhex2addr_f(generator=a)
from struct import pack,unpack
seed = os.urandom(28)
print 'Incrementing key with each round'
print 'Starting key:', hexlify(seed+pack('I',0))
import time
start = last_t = time.time()
kg = KeyGenerator(a)
for i in range(rounds):
if time.time() - last_t >= 0.1:
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
last_t = time.time()
sec = hexlify(seed+pack('I',i))
wif = hex2wif(sec,compressed=compressed)
a_addr = gen(sec,compressed,mmtype=mmtype)
vmsg('\nkey: %s\naddr: %s\n' % (wif,a_addr))
sec = PrivKey(seed+pack('I',i),compressed)
a_addr = ag.to_addr(kg.to_pubhex(sec))
vmsg('\nkey: %s\naddr: %s\n' % (sec.wif,a_addr))
if not opt.segwit:
compressed = not compressed
qmsg_r('\rRound %s/%s ' % (i+1,rounds))
@ -170,18 +170,15 @@ elif a and not fh:
elif a and dump:
m = "Comparing output of address generator '{}' against wallet dump '{}'"
qmsg(green(m.format(g.key_generators[a-1],cmd_args[1])))
if a == 2:
qmsg("NOTE: for compressed addresses, 'python-ecdsa' generator will be used")
from mmgen.addr import get_privhex2addr_f
gen_a = get_privhex2addr_f(generator=a)
from mmgen.bitcoin import wif2hex
kg = KeyGenerator(a)
for n,[wif,a_addr] in enumerate(dump,1):
qmsg_r('\rKey %s/%s ' % (n,len(dump)))
sec = wif2hex(wif)
if sec == False:
try:
sec = PrivKey(wif=wif)
except:
die(2,'\nInvalid {}net WIF address in dump file: {}'.format(('main','test')[g.testnet],wif))
compressed = wif[0] != ('5','9')[g.testnet]
b_addr = gen_a(sec,compressed,'L')
b_addr = ag.to_addr(kg.to_pubhex(sec))
if a_addr != b_addr:
match_error(sec,wif,a_addr,b_addr,1 if compressed and a==2 else a,4)
match_error(sec,wif,a_addr,b_addr,3,a)
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))

View file

@ -100,7 +100,7 @@ if not any(e in ('--skip-deps','--resume','-S','-r') for e in sys.argv+shortopts
except: pass
os.symlink(dd,data_dir)
def opts_data(): return {
opts_data = lambda: {
'desc': 'Test suite for the MMGen suite',
'usage':'[options] [command(s) or metacommand(s)]',
'options': """

View file

@ -115,7 +115,7 @@ cfg = {
'addrfile_chk': '6FEF 6FB9 7B13 5D91',
}
def opts_data(): return {
opts_data = lambda: {
'desc': "Test suite for the 'mmgen-tool' utility",
'usage':'[options] [command]',
'options': """