addr.py: move AddrList and related classes to addrlist.py
- also move PasswordList to passwdlist.py
- add a minimal unit test
Testing:
$ test/unit_tests.py addrlist
This commit is contained in:
parent
5961d1c36d
commit
e0352568db
20 changed files with 722 additions and 585 deletions
539
mmgen/addr.py
539
mmgen/addr.py
|
|
@ -22,18 +22,10 @@ addr.py: Address generation/display routines for the MMGen suite
|
|||
|
||||
from hashlib import sha256,sha512
|
||||
from .common import *
|
||||
from .base_obj import AsyncInit
|
||||
from .objmethods import Hilite,InitErrors,MMGenObject
|
||||
from .objmethods import MMGenObject
|
||||
from .obj import *
|
||||
from .baseconv import *
|
||||
from .protocol import init_proto,hash160
|
||||
from .seed import SeedID,is_seed_id
|
||||
|
||||
pnm = g.proj_name
|
||||
|
||||
def dmsg_sc(desc,data):
|
||||
if g.debug_addrlist:
|
||||
Msg(f'sc_debug_{desc}: {data}')
|
||||
from .protocol import hash160
|
||||
|
||||
class AddrGenerator(MMGenObject):
|
||||
def __new__(cls,proto,addr_type):
|
||||
|
|
@ -295,536 +287,9 @@ class KeyGeneratorDummy(KeyGenerator):
|
|||
s = privhex,
|
||||
privkey = privhex )
|
||||
|
||||
class AddrListEntryBase(MMGenListItem):
|
||||
invalid_attrs = {'proto'}
|
||||
def __init__(self,proto,**kwargs):
|
||||
self.__dict__['proto'] = proto
|
||||
MMGenListItem.__init__(self,**kwargs)
|
||||
|
||||
class AddrListEntry(AddrListEntryBase):
|
||||
addr = ListItemAttr('CoinAddr',include_proto=True)
|
||||
idx = ListItemAttr('AddrIdx') # not present in flat addrlists
|
||||
label = ListItemAttr('TwComment',reassign_ok=True)
|
||||
sec = ListItemAttr('PrivKey',include_proto=True)
|
||||
viewkey = ListItemAttr('ViewKey',include_proto=True)
|
||||
wallet_passwd = ListItemAttr('WalletPassword')
|
||||
|
||||
class PasswordListEntry(AddrListEntryBase):
|
||||
passwd = ListItemAttr(str,typeconv=False) # TODO: create Password type
|
||||
idx = ImmutableAttr('AddrIdx')
|
||||
label = ListItemAttr('TwComment',reassign_ok=True)
|
||||
sec = ListItemAttr('PrivKey',include_proto=True)
|
||||
|
||||
class AddrListChksum(str,Hilite):
|
||||
color = 'pink'
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls,addrlist):
|
||||
ea = addrlist.al_id.mmtype.extra_attrs # add viewkey and passwd to the mix, if present
|
||||
if ea == None: ea = ()
|
||||
lines = [' '.join(
|
||||
addrlist.chksum_rec_f(e) +
|
||||
tuple(getattr(e,a) for a in ea if getattr(e,a))
|
||||
) for e in addrlist.data]
|
||||
return str.__new__(cls,make_chksum_N(' '.join(lines), nchars=16, sep=True))
|
||||
|
||||
class AddrListIDStr(str,Hilite):
|
||||
color = 'green'
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls,addrlist,fmt_str=None):
|
||||
idxs = [e.idx for e in addrlist.data]
|
||||
prev = idxs[0]
|
||||
ret = prev,
|
||||
for i in idxs[1:]:
|
||||
if i == prev + 1:
|
||||
if i == idxs[-1]: ret += '-', i
|
||||
else:
|
||||
if prev != ret[-1]: ret += '-', prev
|
||||
ret += ',', i
|
||||
prev = i
|
||||
s = ''.join(map(str,ret))
|
||||
|
||||
if fmt_str:
|
||||
ret = fmt_str.format(s)
|
||||
else:
|
||||
bc = (addrlist.proto.base_coin,addrlist.proto.coin)[addrlist.proto.base_coin=='ETH']
|
||||
mt = addrlist.al_id.mmtype
|
||||
ret = '{}{}{}[{}]'.format(
|
||||
addrlist.al_id.sid,
|
||||
('-'+bc,'')[bc == 'BTC'],
|
||||
('-'+mt,'')[mt in ('L','E')],
|
||||
s )
|
||||
|
||||
dmsg_sc('id_str',ret[8:].split('[')[0])
|
||||
|
||||
return str.__new__(cls,ret)
|
||||
|
||||
class AddrList(MMGenObject): # Address info for a single seed ID
|
||||
msgs = {
|
||||
'record_chksum': """
|
||||
Record this checksum: it will be used to verify the address file in the future
|
||||
""".strip(),
|
||||
'check_chksum': 'Check this value against your records',
|
||||
'removed_dup_keys': f"""
|
||||
Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
|
||||
""".strip(),
|
||||
}
|
||||
entry_type = AddrListEntry
|
||||
main_attr = 'addr'
|
||||
desc = 'address'
|
||||
gen_desc = 'address'
|
||||
gen_desc_pl = 'es'
|
||||
gen_addrs = True
|
||||
gen_passwds = False
|
||||
gen_keys = False
|
||||
has_keys = False
|
||||
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr)
|
||||
|
||||
def __init__(self,proto,
|
||||
addrfile = '',
|
||||
al_id = '',
|
||||
adata = [],
|
||||
seed = '',
|
||||
addr_idxs = '',
|
||||
src = '',
|
||||
addrlist = '',
|
||||
keylist = '',
|
||||
mmtype = None,
|
||||
skip_key_address_validity_check = False,
|
||||
skip_chksum = False ):
|
||||
|
||||
self.skip_ka_check = skip_key_address_validity_check
|
||||
self.update_msgs()
|
||||
mmtype = mmtype or proto.dfl_mmtype
|
||||
assert mmtype in MMGenAddrType.mmtypes, f'{mmtype}: mmtype not in {MMGenAddrType.mmtypes!r}'
|
||||
|
||||
from .protocol import CoinProtocol
|
||||
self.bitcoin_addrtypes = tuple(
|
||||
MMGenAddrType(CoinProtocol.Bitcoin,key).name for key in CoinProtocol.Bitcoin.mmtypes)
|
||||
|
||||
self.proto = proto
|
||||
|
||||
do_chksum = False
|
||||
if seed and addr_idxs: # data from seed + idxs
|
||||
self.al_id,src = AddrListID(seed.sid,mmtype),'gen'
|
||||
adata = self.generate(seed,addr_idxs)
|
||||
do_chksum = True
|
||||
elif addrfile: # data from MMGen address file
|
||||
self.infile = addrfile
|
||||
adata = self.get_file().parse_file(addrfile) # sets self.al_id
|
||||
do_chksum = True
|
||||
elif al_id and adata: # data from tracking wallet
|
||||
self.al_id = al_id
|
||||
elif addrlist: # data from flat address list
|
||||
self.al_id = None
|
||||
addrlist = remove_dups(addrlist,edesc='address',desc='address list')
|
||||
adata = AddrListData([AddrListEntry(proto=proto,addr=a) for a in addrlist])
|
||||
elif keylist: # data from flat key list
|
||||
self.al_id = None
|
||||
keylist = remove_dups(keylist,edesc='key',desc='key list',hide=True)
|
||||
adata = AddrListData([AddrListEntry(proto=proto,sec=PrivKey(proto=proto,wif=k)) for k in keylist])
|
||||
elif seed or addr_idxs:
|
||||
die(3,'Must specify both seed and addr indexes')
|
||||
elif al_id or adata:
|
||||
die(3,'Must specify both al_id and adata')
|
||||
else:
|
||||
die(3,f'Incorrect arguments for {type(self).__name__}')
|
||||
|
||||
# al_id,adata now set
|
||||
self.data = adata
|
||||
self.num_addrs = len(adata)
|
||||
self.fmt_data = ''
|
||||
self.chksum = None
|
||||
|
||||
if self.al_id == None: return
|
||||
|
||||
self.id_str = AddrListIDStr(self)
|
||||
if type(self) == KeyList: return
|
||||
|
||||
if do_chksum and not skip_chksum:
|
||||
self.chksum = AddrListChksum(self)
|
||||
qmsg(
|
||||
f'Checksum for {self.desc} data {self.id_str.hl()}: {self.chksum.hl()}\n' +
|
||||
self.msgs[('check_chksum','record_chksum')[src=='gen']] )
|
||||
|
||||
def update_msgs(self):
|
||||
self.msgs = AddrList.msgs
|
||||
self.msgs.update(type(self).msgs)
|
||||
|
||||
def generate(self,seed,addrnums):
|
||||
assert type(addrnums) is AddrIdxList
|
||||
|
||||
seed = self.scramble_seed(seed.data)
|
||||
dmsg_sc('seed',seed[:8].hex())
|
||||
|
||||
compressed = self.al_id.mmtype.compressed
|
||||
pubkey_type = self.al_id.mmtype.pubkey_type
|
||||
|
||||
gen_wallet_passwd = type(self) == KeyAddrList and 'wallet_passwd' in self.al_id.mmtype.extra_attrs
|
||||
gen_viewkey = type(self) == KeyAddrList and 'viewkey' in self.al_id.mmtype.extra_attrs
|
||||
|
||||
if self.gen_addrs:
|
||||
kg = KeyGenerator(self.proto,self.al_id.mmtype)
|
||||
ag = AddrGenerator(self.proto,self.al_id.mmtype)
|
||||
|
||||
t_addrs,num,pos,out = len(addrnums),0,0,AddrListData()
|
||||
le = self.entry_type
|
||||
|
||||
while pos != t_addrs:
|
||||
seed = sha512(seed).digest()
|
||||
num += 1 # round
|
||||
|
||||
if num != addrnums[pos]: continue
|
||||
|
||||
pos += 1
|
||||
|
||||
if not g.debug:
|
||||
qmsg_r(f'\rGenerating {self.gen_desc} #{num} ({pos} of {t_addrs})')
|
||||
|
||||
e = le(proto=self.proto,idx=num)
|
||||
|
||||
# Secret key is double sha256 of seed hash round /num/
|
||||
e.sec = PrivKey(
|
||||
self.proto,
|
||||
sha256(sha256(seed).digest()).digest(),
|
||||
compressed = compressed,
|
||||
pubkey_type = pubkey_type )
|
||||
|
||||
if self.gen_addrs:
|
||||
pubhex = kg.to_pubhex(e.sec)
|
||||
e.addr = ag.to_addr(pubhex)
|
||||
if gen_viewkey:
|
||||
e.viewkey = ag.to_viewkey(pubhex)
|
||||
if gen_wallet_passwd:
|
||||
e.wallet_passwd = ag.to_wallet_passwd(e.sec)
|
||||
|
||||
if type(self) == PasswordList:
|
||||
e.passwd = str(self.make_passwd(e.sec)) # TODO - own type
|
||||
dmsg(f'Key {pos:>03}: {e.passwd}')
|
||||
|
||||
out.append(e)
|
||||
if g.debug_addrlist:
|
||||
Msg(f'generate():\n{e.pfmt()}')
|
||||
|
||||
qmsg('\r{}: {} {}{} generated{}'.format(
|
||||
self.al_id.hl(),
|
||||
t_addrs,
|
||||
self.gen_desc,
|
||||
suf(t_addrs,self.gen_desc_pl),
|
||||
' ' * 15 ))
|
||||
|
||||
return out
|
||||
|
||||
def check_format(self,addr):
|
||||
return True # format is checked when added to list entry object
|
||||
|
||||
def scramble_seed(self,seed):
|
||||
is_btcfork = self.proto.base_coin == 'BTC'
|
||||
if is_btcfork and self.al_id.mmtype == 'L' and not self.proto.testnet:
|
||||
dmsg_sc('str','(none)')
|
||||
return seed
|
||||
if self.proto.base_coin == 'ETH':
|
||||
scramble_key = self.proto.coin.lower()
|
||||
else:
|
||||
scramble_key = (self.proto.coin.lower()+':','')[is_btcfork] + self.al_id.mmtype.name
|
||||
from .crypto import scramble_seed
|
||||
if self.proto.testnet:
|
||||
scramble_key += ':' + self.proto.network
|
||||
dmsg_sc('str',scramble_key)
|
||||
return scramble_seed(seed,scramble_key.encode())
|
||||
|
||||
def idxs(self):
|
||||
return [e.idx for e in self.data]
|
||||
|
||||
def addrs(self):
|
||||
return [f'{self.al_id.sid}:{e.idx}' for e in self.data]
|
||||
|
||||
def addrpairs(self):
|
||||
return [(e.idx,e.addr) for e in self.data]
|
||||
|
||||
def coinaddrs(self):
|
||||
return [e.addr for e in self.data]
|
||||
|
||||
def comments(self):
|
||||
return [e.label for e in self.data]
|
||||
|
||||
def entry(self,idx):
|
||||
for e in self.data:
|
||||
if idx == e.idx: return e
|
||||
|
||||
def coinaddr(self,idx):
|
||||
for e in self.data:
|
||||
if idx == e.idx: return e.addr
|
||||
|
||||
def comment(self,idx):
|
||||
for e in self.data:
|
||||
if idx == e.idx: return e.label
|
||||
|
||||
def set_comment(self,idx,comment):
|
||||
for e in self.data:
|
||||
if idx == e.idx:
|
||||
e.label = comment
|
||||
|
||||
def make_reverse_dict_addrlist(self,coinaddrs):
|
||||
d = MMGenDict()
|
||||
b = coinaddrs
|
||||
for e in self.data:
|
||||
try:
|
||||
d[b[b.index(e.addr)]] = ( MMGenID(self.proto, f'{self.al_id}:{e.idx}'), e.label )
|
||||
except ValueError:
|
||||
pass
|
||||
return d
|
||||
|
||||
def add_wifs(self,key_list):
|
||||
"""
|
||||
Match WIF keys in a flat list to addresses in self by generating all
|
||||
possible addresses for each key.
|
||||
"""
|
||||
def gen_addr(pk,t):
|
||||
at = self.proto.addr_type(t)
|
||||
kg = KeyGenerator(self.proto,at.pubkey_type)
|
||||
ag = AddrGenerator(self.proto,at)
|
||||
return ag.to_addr(kg.to_pubhex(pk))
|
||||
|
||||
compressed_types = set(self.proto.mmtypes) - {'L','E'}
|
||||
uncompressed_types = set(self.proto.mmtypes) & {'L','E'}
|
||||
|
||||
def gen():
|
||||
for wif in key_list:
|
||||
pk = PrivKey(proto=self.proto,wif=wif)
|
||||
for t in (compressed_types if pk.compressed else uncompressed_types):
|
||||
yield ( gen_addr(pk,t), pk )
|
||||
|
||||
addrs4keys = dict(gen())
|
||||
|
||||
for d in self.data:
|
||||
if d.addr in addrs4keys:
|
||||
d.sec = addrs4keys[d.addr]
|
||||
|
||||
def list_missing(self,attr):
|
||||
return [d.addr for d in self.data if not getattr(d,attr)]
|
||||
|
||||
def get_file(self):
|
||||
import mmgen.addrfile as mod
|
||||
return getattr( mod, type(self).__name__.replace('List','File') )(self)
|
||||
|
||||
class KeyAddrList(AddrList):
|
||||
desc = 'key-address'
|
||||
gen_desc = 'key/address pair'
|
||||
gen_desc_pl = 's'
|
||||
gen_addrs = True
|
||||
gen_keys = True
|
||||
has_keys = True
|
||||
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr, e.sec.wif)
|
||||
|
||||
class KeyList(AddrList):
|
||||
desc = 'key'
|
||||
gen_desc = 'key'
|
||||
gen_desc_pl = 's'
|
||||
gen_addrs = False
|
||||
gen_keys = True
|
||||
has_keys = True
|
||||
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr, e.sec.wif)
|
||||
|
||||
def is_bip39_str(s):
|
||||
from .bip39 import bip39
|
||||
return bool(bip39.tohex(s.split(),wl_id='bip39'))
|
||||
|
||||
def is_xmrseed(s):
|
||||
return bool(baseconv.tobytes(s.split(),wl_id='xmrseed'))
|
||||
|
||||
from collections import namedtuple
|
||||
class PasswordList(AddrList):
|
||||
msgs = {
|
||||
'record_chksum': """
|
||||
Record this checksum: it will be used to verify the password file in the future
|
||||
""".strip()
|
||||
}
|
||||
entry_type = PasswordListEntry
|
||||
main_attr = 'passwd'
|
||||
desc = 'password'
|
||||
gen_desc = 'password'
|
||||
gen_desc_pl = 's'
|
||||
gen_addrs = False
|
||||
gen_keys = False
|
||||
gen_passwds = True
|
||||
has_keys = False
|
||||
pw_len = None
|
||||
dfl_pw_fmt = 'b58'
|
||||
pwinfo = namedtuple('passwd_info',['min_len','max_len','dfl_len','valid_lens','desc','chk_func'])
|
||||
pw_info = {
|
||||
'b32': pwinfo(10, 42 ,24, None, 'base32 password', is_b32_str), # 32**24 < 2**128
|
||||
'b58': pwinfo(8, 36 ,20, None, 'base58 password', is_b58_str), # 58**20 < 2**128
|
||||
'bip39': pwinfo(12, 24 ,24, [12,18,24],'BIP39 mnemonic', is_bip39_str),
|
||||
'xmrseed': pwinfo(25, 25, 25, [25], 'Monero new-style mnemonic',is_xmrseed),
|
||||
'hex': pwinfo(32, 64 ,64, [32,48,64],'hexadecimal password', is_hex_str),
|
||||
}
|
||||
chksum_rec_f = lambda foo,e: (str(e.idx), e.passwd)
|
||||
|
||||
feature_warn_fs = 'WARNING: {!r} is a potentially dangerous feature. Use at your own risk!'
|
||||
hex2bip39 = False
|
||||
|
||||
def __init__(self,proto,
|
||||
infile = None,
|
||||
seed = None,
|
||||
pw_idxs = None,
|
||||
pw_id_str = None,
|
||||
pw_len = None,
|
||||
pw_fmt = None,
|
||||
chk_params_only = False
|
||||
):
|
||||
|
||||
self.proto = proto # proto is ignored
|
||||
self.update_msgs()
|
||||
|
||||
if infile:
|
||||
self.infile = infile
|
||||
self.data = self.get_file().parse_file(infile) # sets self.pw_id_str,self.pw_fmt,self.pw_len
|
||||
else:
|
||||
if not chk_params_only:
|
||||
for k in (seed,pw_idxs):
|
||||
assert k
|
||||
self.pw_id_str = MMGenPWIDString(pw_id_str)
|
||||
self.set_pw_fmt(pw_fmt)
|
||||
self.set_pw_len(pw_len)
|
||||
if chk_params_only:
|
||||
return
|
||||
if self.hex2bip39:
|
||||
ymsg(self.feature_warn_fs.format(pw_fmt))
|
||||
self.set_pw_len_vs_seed_len(pw_len,seed)
|
||||
self.al_id = AddrListID(seed.sid,MMGenPasswordType(self.proto,'P'))
|
||||
self.data = self.generate(seed,pw_idxs)
|
||||
|
||||
self.num_addrs = len(self.data)
|
||||
self.fmt_data = ''
|
||||
self.chksum = AddrListChksum(self)
|
||||
|
||||
fs = f'{self.al_id.sid}-{self.pw_id_str}-{self.pw_fmt_disp}-{self.pw_len}[{{}}]'
|
||||
self.id_str = AddrListIDStr(self,fs)
|
||||
qmsg(
|
||||
f'Checksum for {self.desc} data {self.id_str.hl()}: {self.chksum.hl()}\n' +
|
||||
self.msgs[('record_chksum','check_chksum')[bool(infile)]] )
|
||||
|
||||
def set_pw_fmt(self,pw_fmt):
|
||||
if pw_fmt == 'hex2bip39':
|
||||
self.hex2bip39 = True
|
||||
self.pw_fmt = 'bip39'
|
||||
self.pw_fmt_disp = 'hex2bip39'
|
||||
else:
|
||||
self.pw_fmt = pw_fmt
|
||||
self.pw_fmt_disp = pw_fmt
|
||||
if self.pw_fmt not in self.pw_info:
|
||||
raise InvalidPasswdFormat(
|
||||
'{!r}: invalid password format. Valid formats: {}'.format(
|
||||
self.pw_fmt,
|
||||
', '.join(self.pw_info) ))
|
||||
|
||||
def chk_pw_len(self,passwd=None):
|
||||
if passwd is None:
|
||||
assert self.pw_len,'either passwd or pw_len must be set'
|
||||
pw_len = self.pw_len
|
||||
fs = '{l}: invalid user-requested length for {b} ({c}{m})'
|
||||
else:
|
||||
pw_len = len(passwd)
|
||||
fs = '{pw}: {b} has invalid length {l} ({c}{m} characters)'
|
||||
d = self.pw_info[self.pw_fmt]
|
||||
if d.valid_lens:
|
||||
if pw_len not in d.valid_lens:
|
||||
die(2, fs.format( l=pw_len, b=d.desc, c='not one of ', m=d.valid_lens, pw=passwd ))
|
||||
elif pw_len > d.max_len:
|
||||
die(2, fs.format( l=pw_len, b=d.desc, c='>', m=d.max_len, pw=passwd ))
|
||||
elif pw_len < d.min_len:
|
||||
die(2, fs.format( l=pw_len, b=d.desc, c='<', m=d.min_len, pw=passwd ))
|
||||
|
||||
def set_pw_len(self,pw_len):
|
||||
d = self.pw_info[self.pw_fmt]
|
||||
|
||||
if pw_len is None:
|
||||
self.pw_len = d.dfl_len
|
||||
return
|
||||
|
||||
if not is_int(pw_len):
|
||||
die(2,f'{pw_len!r}: invalid user-requested password length (not an integer)')
|
||||
self.pw_len = int(pw_len)
|
||||
self.chk_pw_len()
|
||||
|
||||
def set_pw_len_vs_seed_len(self,pw_len,seed):
|
||||
pf = self.pw_fmt
|
||||
if pf == 'hex':
|
||||
pw_bytes = self.pw_len // 2
|
||||
good_pw_len = seed.byte_len * 2
|
||||
elif pf == 'bip39':
|
||||
from .bip39 import bip39
|
||||
pw_bytes = bip39.nwords2seedlen(self.pw_len,in_bytes=True)
|
||||
good_pw_len = bip39.seedlen2nwords(seed.byte_len,in_bytes=True)
|
||||
elif pf == 'xmrseed':
|
||||
pw_bytes = baseconv.seedlen_map_rev['xmrseed'][self.pw_len]
|
||||
try:
|
||||
good_pw_len = baseconv.seedlen_map['xmrseed'][seed.byte_len]
|
||||
except:
|
||||
die(1,f'{seed.byte_len*8}: unsupported seed length for Monero new-style mnemonic')
|
||||
elif pf in ('b32','b58'):
|
||||
pw_int = (32 if pf == 'b32' else 58) ** self.pw_len
|
||||
pw_bytes = pw_int.bit_length() // 8
|
||||
good_pw_len = len(baseconv.frombytes(b'\xff'*seed.byte_len,wl_id=pf))
|
||||
else:
|
||||
raise NotImplementedError(f'{pf!r}: unknown password format')
|
||||
|
||||
if pw_bytes > seed.byte_len:
|
||||
die(1,
|
||||
'Cannot generate passwords with more entropy than underlying seed! ({} bits)\n'.format(
|
||||
len(seed.data) * 8 ) + (
|
||||
'Re-run the command with --passwd-len={}' if pf in ('bip39','hex') else
|
||||
'Re-run the command, specifying a password length of {} or less'
|
||||
).format(good_pw_len) )
|
||||
|
||||
if pf in ('bip39','hex') and pw_bytes < seed.byte_len:
|
||||
if not keypress_confirm(
|
||||
f'WARNING: requested {self.pw_info[pf].desc} length has less entropy ' +
|
||||
'than underlying seed!\nIs this what you want?',
|
||||
default_yes = True ):
|
||||
die(1,'Exiting at user request')
|
||||
|
||||
def make_passwd(self,hex_sec):
|
||||
assert self.pw_fmt in self.pw_info
|
||||
if self.pw_fmt == 'hex':
|
||||
# take most significant part
|
||||
return hex_sec[:self.pw_len]
|
||||
elif self.pw_fmt == 'bip39':
|
||||
from .bip39 import bip39
|
||||
pw_len_hex = bip39.nwords2seedlen(self.pw_len,in_hex=True)
|
||||
# take most significant part
|
||||
return ' '.join(bip39.fromhex(hex_sec[:pw_len_hex],wl_id='bip39'))
|
||||
elif self.pw_fmt == 'xmrseed':
|
||||
pw_len_hex = baseconv.seedlen_map_rev['xmrseed'][self.pw_len] * 2
|
||||
# take most significant part
|
||||
bytes_trunc = bytes.fromhex(hex_sec[:pw_len_hex])
|
||||
bytes_preproc = init_proto('xmr').preprocess_key(bytes_trunc,None)
|
||||
return ' '.join(baseconv.frombytes(bytes_preproc,wl_id='xmrseed'))
|
||||
else:
|
||||
# take least significant part
|
||||
return baseconv.fromhex(hex_sec,self.pw_fmt,pad=self.pw_len,tostr=True)[-self.pw_len:]
|
||||
|
||||
def check_format(self,pw):
|
||||
if not self.pw_info[self.pw_fmt].chk_func(pw):
|
||||
raise ValueError(f'Password is not valid {self.pw_info[self.pw_fmt].desc} data')
|
||||
pwlen = len(pw.split()) if self.pw_fmt in ('bip39','xmrseed') else len(pw)
|
||||
if pwlen != self.pw_len:
|
||||
raise ValueError(f'Password has incorrect length ({pwlen} != {self.pw_len})')
|
||||
return True
|
||||
|
||||
def scramble_seed(self,seed):
|
||||
# Changing either pw_fmt or pw_len will cause a different, unrelated
|
||||
# set of passwords to be generated: this is what we want.
|
||||
# NB: In original implementation, pw_id_str was 'baseN', not 'bN'
|
||||
scramble_key = f'{self.pw_fmt}:{self.pw_len}:{self.pw_id_str}'
|
||||
|
||||
if self.hex2bip39:
|
||||
from .bip39 import bip39
|
||||
pwlen = bip39.nwords2seedlen(self.pw_len,in_hex=True)
|
||||
scramble_key = f'hex:{pwlen}:{self.pw_id_str}'
|
||||
|
||||
from .crypto import scramble_seed
|
||||
dmsg_sc('str',scramble_key)
|
||||
return scramble_seed(seed,scramble_key.encode())
|
||||
|
|
|
|||
|
|
@ -22,8 +22,9 @@ addrdata.py: MMGen AddrData and related classes
|
|||
|
||||
from .util import vmsg,altcoin_subclass
|
||||
from .base_obj import AsyncInit
|
||||
from .obj import MMGenObject,MMGenDict,get_obj,AddrListID,AddrListData
|
||||
from .addr import MMGenID,AddrListEntry,AddrList
|
||||
from .obj import MMGenObject,MMGenDict,get_obj,AddrListID
|
||||
from .addr import MMGenID
|
||||
from .addrlist import AddrListEntry,AddrListData,AddrList
|
||||
|
||||
class AddrData(MMGenObject):
|
||||
msgs = {
|
||||
|
|
@ -63,6 +64,7 @@ re-import your addresses.
|
|||
return (list(d.values())[0][0]) if d else None
|
||||
|
||||
def add(self,addrlist):
|
||||
from .addrlist import AddrList
|
||||
if type(addrlist) == AddrList:
|
||||
self.al_ids[addrlist.al_id] = addrlist
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -33,7 +33,8 @@ from .util import (
|
|||
from .protocol import init_proto
|
||||
from .obj import *
|
||||
from .seed import SeedID,is_seed_id
|
||||
from .addr import KeyList,PasswordList,dmsg_sc
|
||||
from .addrlist import KeyList,AddrListData,dmsg_sc
|
||||
from .passwdlist import PasswordList
|
||||
|
||||
class AddrFile(MMGenObject):
|
||||
desc = 'addresses'
|
||||
|
|
@ -116,7 +117,7 @@ class AddrFile(MMGenObject):
|
|||
if p.has_keys:
|
||||
from .opts import opt
|
||||
if opt.b16:
|
||||
out.append(fs.format( '', f'orig_hex: {e.sec.orig_bytes.hex()}', c ))
|
||||
out.append(fs.format( '', f'orig_hex: {e.sec.orig_hex()}', c ))
|
||||
out.append(fs.format( '', f'{p.al_id.mmtype.wif_label}: {e.sec.wif}', c ))
|
||||
for k in ('viewkey','wallet_passwd'):
|
||||
v = getattr(e,k)
|
||||
|
|
|
|||
395
mmgen/addrlist.py
Executable file
395
mmgen/addrlist.py
Executable file
|
|
@ -0,0 +1,395 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
|
||||
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
addrlist.py: Address list classes for the MMGen suite
|
||||
"""
|
||||
|
||||
from hashlib import sha256,sha512
|
||||
from .util import qmsg,qmsg_r,suf,make_chksum_N,Msg
|
||||
from .objmethods import MMGenObject,Hilite,InitErrors
|
||||
from .obj import MMGenListItem,ListItemAttr,MMGenDict,WalletPassword,PrivKey
|
||||
from .seed import SeedID
|
||||
from .obj import MMGenID,MMGenAddrType,CoinAddr,AddrIdx,AddrListID,ViewKey
|
||||
|
||||
def dmsg_sc(desc,data):
|
||||
from .globalvars import g
|
||||
if g.debug_addrlist:
|
||||
Msg(f'sc_debug_{desc}: {data}')
|
||||
|
||||
class AddrIdxList(list,InitErrors,MMGenObject):
|
||||
max_len = 1000000
|
||||
def __init__(self,fmt_str=None,idx_list=None,sep=','):
|
||||
try:
|
||||
if idx_list:
|
||||
return list.__init__(self,sorted({AddrIdx(i) for i in idx_list}))
|
||||
elif fmt_str:
|
||||
ret = []
|
||||
for i in (fmt_str.split(sep)):
|
||||
j = i.split('-')
|
||||
if len(j) == 1:
|
||||
idx = AddrIdx(i)
|
||||
if not idx:
|
||||
break
|
||||
ret.append(idx)
|
||||
elif len(j) == 2:
|
||||
beg = AddrIdx(j[0])
|
||||
if not beg:
|
||||
break
|
||||
end = AddrIdx(j[1])
|
||||
if not beg or (end < beg):
|
||||
break
|
||||
ret.extend([AddrIdx(x) for x in range(beg,end+1)])
|
||||
else: break
|
||||
else:
|
||||
return list.__init__(self,sorted(set(ret))) # fell off end of loop - success
|
||||
raise ValueError(f'{i!r}: invalid range')
|
||||
except Exception as e:
|
||||
return type(self).init_fail(e,idx_list or fmt_str)
|
||||
|
||||
class AddrListEntryBase(MMGenListItem):
|
||||
invalid_attrs = {'proto'}
|
||||
def __init__(self,proto,**kwargs):
|
||||
self.__dict__['proto'] = proto
|
||||
MMGenListItem.__init__(self,**kwargs)
|
||||
|
||||
class AddrListEntry(AddrListEntryBase):
|
||||
addr = ListItemAttr(CoinAddr,include_proto=True)
|
||||
idx = ListItemAttr(AddrIdx) # not present in flat addrlists
|
||||
label = ListItemAttr('TwComment',reassign_ok=True)
|
||||
sec = ListItemAttr(PrivKey,include_proto=True)
|
||||
viewkey = ListItemAttr(ViewKey,include_proto=True)
|
||||
wallet_passwd = ListItemAttr('WalletPassword')
|
||||
|
||||
class AddrListChksum(str,Hilite):
|
||||
color = 'pink'
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls,addrlist):
|
||||
ea = addrlist.al_id.mmtype.extra_attrs or () # add viewkey and passwd to the mix, if present
|
||||
lines = [' '.join(
|
||||
addrlist.chksum_rec_f(e) +
|
||||
tuple(getattr(e,a) for a in ea if getattr(e,a))
|
||||
) for e in addrlist.data]
|
||||
return str.__new__(cls,make_chksum_N(' '.join(lines), nchars=16, sep=True))
|
||||
|
||||
class AddrListIDStr(str,Hilite):
|
||||
color = 'green'
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls,addrlist,fmt_str=None):
|
||||
idxs = [e.idx for e in addrlist.data]
|
||||
prev = idxs[0]
|
||||
ret = prev,
|
||||
for i in idxs[1:]:
|
||||
if i == prev + 1:
|
||||
if i == idxs[-1]:
|
||||
ret += '-', i
|
||||
else:
|
||||
if prev != ret[-1]:
|
||||
ret += '-', prev
|
||||
ret += ',', i
|
||||
prev = i
|
||||
s = ''.join(map(str,ret))
|
||||
|
||||
if fmt_str:
|
||||
ret = fmt_str.format(s)
|
||||
else:
|
||||
bc = (addrlist.proto.base_coin,addrlist.proto.coin)[addrlist.proto.base_coin=='ETH']
|
||||
mt = addrlist.al_id.mmtype
|
||||
ret = '{}{}{}[{}]'.format(
|
||||
addrlist.al_id.sid,
|
||||
('-'+bc,'')[bc == 'BTC'],
|
||||
('-'+mt,'')[mt in ('L','E')],
|
||||
s )
|
||||
|
||||
dmsg_sc('id_str',ret[8:].split('[')[0])
|
||||
|
||||
return str.__new__(cls,ret)
|
||||
|
||||
class AddrListData(list,MMGenObject):
|
||||
pass
|
||||
|
||||
class AddrList(MMGenObject): # Address info for a single seed ID
|
||||
entry_type = AddrListEntry
|
||||
main_attr = 'addr'
|
||||
desc = 'address'
|
||||
gen_desc = 'address'
|
||||
gen_desc_pl = 'es'
|
||||
gen_addrs = True
|
||||
gen_passwds = False
|
||||
gen_keys = False
|
||||
has_keys = False
|
||||
chksum_rec_f = lambda foo,e: ( str(e.idx), e.addr )
|
||||
|
||||
def __init__(self,proto,
|
||||
addrfile = '',
|
||||
al_id = '',
|
||||
adata = [],
|
||||
seed = '',
|
||||
addr_idxs = '',
|
||||
src = '',
|
||||
addrlist = '',
|
||||
keylist = '',
|
||||
mmtype = None,
|
||||
skip_key_address_validity_check = False,
|
||||
skip_chksum = False,
|
||||
):
|
||||
|
||||
self.skip_ka_check = skip_key_address_validity_check
|
||||
self.proto = proto
|
||||
do_chksum = False
|
||||
|
||||
mmtype = mmtype or proto.dfl_mmtype
|
||||
assert mmtype in MMGenAddrType.mmtypes, f'{mmtype}: mmtype not in {MMGenAddrType.mmtypes!r}'
|
||||
|
||||
from .protocol import CoinProtocol
|
||||
self.bitcoin_addrtypes = tuple(
|
||||
MMGenAddrType(CoinProtocol.Bitcoin,key).name for key in CoinProtocol.Bitcoin.mmtypes)
|
||||
|
||||
if seed and addr_idxs and mmtype: # data from seed + idxs
|
||||
self.al_id,src = AddrListID(seed.sid,mmtype),'gen'
|
||||
adata = self.generate(seed,addr_idxs)
|
||||
do_chksum = True
|
||||
elif addrfile: # data from MMGen address file
|
||||
self.infile = addrfile
|
||||
adata = self.get_file().parse_file(addrfile) # sets self.al_id
|
||||
do_chksum = True
|
||||
elif al_id and adata: # data from tracking wallet
|
||||
self.al_id = al_id
|
||||
elif addrlist: # data from flat address list
|
||||
self.al_id = None
|
||||
from .util import remove_dups
|
||||
addrlist = remove_dups(addrlist,edesc='address',desc='address list')
|
||||
adata = AddrListData([AddrListEntry(proto=proto,addr=a) for a in addrlist])
|
||||
elif keylist: # data from flat key list
|
||||
self.al_id = None
|
||||
keylist = remove_dups(keylist,edesc='key',desc='key list',hide=True)
|
||||
adata = AddrListData([AddrListEntry(proto=proto,sec=PrivKey(proto=proto,wif=k)) for k in keylist])
|
||||
elif seed or addr_idxs:
|
||||
die(3,'Must specify both seed and addr indexes')
|
||||
elif al_id or adata:
|
||||
die(3,'Must specify both al_id and adata')
|
||||
else:
|
||||
die(3,f'Incorrect arguments for {type(self).__name__}')
|
||||
|
||||
# al_id,adata now set
|
||||
self.data = adata
|
||||
self.num_addrs = len(adata)
|
||||
self.fmt_data = ''
|
||||
self.chksum = None
|
||||
|
||||
if self.al_id == None:
|
||||
return
|
||||
|
||||
self.id_str = AddrListIDStr(self)
|
||||
|
||||
if type(self) == KeyList:
|
||||
return
|
||||
|
||||
if do_chksum and not skip_chksum:
|
||||
self.chksum = AddrListChksum(self)
|
||||
self.do_chksum_msg(record=src=='gen')
|
||||
|
||||
def do_chksum_msg(self,record):
|
||||
chk = 'Check this value against your records'
|
||||
rec = f'Record this checksum: it will be used to verify the {self.desc} file in the future'
|
||||
qmsg(
|
||||
f'Checksum for {self.desc} data {self.id_str.hl()}: {self.chksum.hl()}\n' +
|
||||
(chk,rec)[record] )
|
||||
|
||||
def generate(self,seed,addr_idxs):
|
||||
assert type(addr_idxs) is AddrIdxList
|
||||
|
||||
seed = self.scramble_seed(seed.data)
|
||||
dmsg_sc('seed',seed[:8].hex())
|
||||
|
||||
mmtype = self.al_id.mmtype
|
||||
|
||||
gen_wallet_passwd = type(self) == KeyAddrList and 'wallet_passwd' in mmtype.extra_attrs
|
||||
gen_viewkey = type(self) == KeyAddrList and 'viewkey' in mmtype.extra_attrs
|
||||
|
||||
if self.gen_addrs:
|
||||
from .addr import KeyGenerator,AddrGenerator
|
||||
kg = KeyGenerator( self.proto, mmtype )
|
||||
ag = AddrGenerator( self.proto, mmtype )
|
||||
|
||||
t_addrs,out = ( len(addr_idxs), AddrListData() )
|
||||
le = self.entry_type
|
||||
num,pos = (0,0)
|
||||
|
||||
from .globalvars import g
|
||||
|
||||
while pos != t_addrs:
|
||||
seed = sha512(seed).digest()
|
||||
num += 1 # round
|
||||
|
||||
if num != addr_idxs[pos]:
|
||||
continue
|
||||
|
||||
pos += 1
|
||||
|
||||
if not g.debug:
|
||||
qmsg_r(f'\rGenerating {self.gen_desc} #{num} ({pos} of {t_addrs})')
|
||||
|
||||
e = le(proto=self.proto,idx=num)
|
||||
|
||||
# Secret key is double sha256 of seed hash round /num/
|
||||
e.sec = PrivKey(
|
||||
self.proto,
|
||||
sha256(sha256(seed).digest()).digest(),
|
||||
compressed = mmtype.compressed,
|
||||
pubkey_type = mmtype.pubkey_type )
|
||||
|
||||
if self.gen_addrs:
|
||||
pubhex = kg.to_pubhex(e.sec)
|
||||
e.addr = ag.to_addr(pubhex)
|
||||
if gen_viewkey:
|
||||
e.viewkey = ag.to_viewkey(pubhex)
|
||||
if gen_wallet_passwd:
|
||||
e.wallet_passwd = ag.to_wallet_passwd(e.sec)
|
||||
elif self.gen_passwds:
|
||||
e.passwd = self.gen_passwd(e.sec) # TODO - own type
|
||||
|
||||
out.append(e)
|
||||
|
||||
if g.debug_addrlist:
|
||||
Msg(f'generate():\n{e.pfmt()}')
|
||||
|
||||
qmsg('\r{}: {} {}{} generated{}'.format(
|
||||
self.al_id.hl(),
|
||||
t_addrs,
|
||||
self.gen_desc,
|
||||
suf(t_addrs,self.gen_desc_pl),
|
||||
' ' * 15 ))
|
||||
|
||||
return out
|
||||
|
||||
def gen_wallet_passwd(self,privbytes):
|
||||
from .protocol import hash256
|
||||
return WalletPassword( hash256(privbytes)[:16].hex() )
|
||||
|
||||
def check_format(self,addr):
|
||||
return True # format is checked when added to list entry object
|
||||
|
||||
def scramble_seed(self,seed):
|
||||
is_btcfork = self.proto.base_coin == 'BTC'
|
||||
if is_btcfork and self.al_id.mmtype == 'L' and not self.proto.testnet:
|
||||
dmsg_sc('str','(none)')
|
||||
return seed
|
||||
if self.proto.base_coin == 'ETH':
|
||||
scramble_key = self.proto.coin.lower()
|
||||
else:
|
||||
scramble_key = (self.proto.coin.lower()+':','')[is_btcfork] + self.al_id.mmtype.name
|
||||
from .crypto import scramble_seed
|
||||
if self.proto.testnet:
|
||||
scramble_key += ':' + self.proto.network
|
||||
dmsg_sc('str',scramble_key)
|
||||
return scramble_seed(seed,scramble_key.encode())
|
||||
|
||||
def idxs(self):
|
||||
return [e.idx for e in self.data]
|
||||
|
||||
def addrs(self):
|
||||
return [f'{self.al_id.sid}:{e.idx}' for e in self.data]
|
||||
|
||||
def addrpairs(self):
|
||||
return [(e.idx,e.addr) for e in self.data]
|
||||
|
||||
def coinaddrs(self):
|
||||
return [e.addr for e in self.data]
|
||||
|
||||
def comments(self):
|
||||
return [e.label for e in self.data]
|
||||
|
||||
def entry(self,idx):
|
||||
for e in self.data:
|
||||
if idx == e.idx:
|
||||
return e
|
||||
|
||||
def coinaddr(self,idx):
|
||||
for e in self.data:
|
||||
if idx == e.idx:
|
||||
return e.addr
|
||||
|
||||
def comment(self,idx):
|
||||
for e in self.data:
|
||||
if idx == e.idx:
|
||||
return e.label
|
||||
|
||||
def set_comment(self,idx,comment):
|
||||
for e in self.data:
|
||||
if idx == e.idx:
|
||||
e.label = comment
|
||||
|
||||
def make_reverse_dict_addrlist(self,coinaddrs):
|
||||
d = MMGenDict()
|
||||
b = coinaddrs
|
||||
for e in self.data:
|
||||
try:
|
||||
d[b[b.index(e.addr)]] = ( MMGenID(self.proto, f'{self.al_id}:{e.idx}'), e.label )
|
||||
except ValueError:
|
||||
pass
|
||||
return d
|
||||
|
||||
def add_wifs(self,key_list):
|
||||
"""
|
||||
Match WIF keys in a flat list to addresses in self by generating all
|
||||
possible addresses for each key.
|
||||
"""
|
||||
def gen_addr(pk,t):
|
||||
at = self.proto.addr_type(t)
|
||||
from .addr import KeyGenerator,AddrGenerator
|
||||
kg = KeyGenerator(self.proto,at)
|
||||
ag = AddrGenerator(self.proto,at)
|
||||
return ag.to_addr(kg.to_pubhex(pk))
|
||||
|
||||
compressed_types = set(self.proto.mmtypes) - {'L','E'}
|
||||
uncompressed_types = set(self.proto.mmtypes) & {'L','E'}
|
||||
|
||||
def gen():
|
||||
for wif in key_list:
|
||||
pk = PrivKey(proto=self.proto,wif=wif)
|
||||
for t in (compressed_types if pk.compressed else uncompressed_types):
|
||||
yield ( gen_addr(pk,t), pk )
|
||||
|
||||
addrs4keys = dict(gen())
|
||||
|
||||
for d in self.data:
|
||||
if d.addr in addrs4keys:
|
||||
d.sec = addrs4keys[d.addr]
|
||||
|
||||
def list_missing(self,attr):
|
||||
return [d.addr for d in self.data if not getattr(d,attr)]
|
||||
|
||||
def get_file(self):
|
||||
import mmgen.addrfile as mod
|
||||
return getattr( mod, type(self).__name__.replace('List','File') )(self)
|
||||
|
||||
class KeyAddrList(AddrList):
|
||||
desc = 'key-address'
|
||||
gen_desc = 'key/address pair'
|
||||
gen_desc_pl = 's'
|
||||
gen_keys = True
|
||||
has_keys = True
|
||||
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr, e.sec.wif)
|
||||
|
||||
class KeyList(KeyAddrList):
|
||||
desc = 'key'
|
||||
gen_desc = 'key'
|
||||
gen_addrs = False
|
||||
|
|
@ -23,7 +23,8 @@ mmgen-addrgen: Generate a series or range of addresses from an MMGen
|
|||
|
||||
from .common import *
|
||||
from .crypto import *
|
||||
from .addr import AddrList,KeyAddrList,KeyList,MMGenAddrType,AddrIdxList
|
||||
from .addr import MMGenAddrType
|
||||
from .addrlist import AddrList,KeyAddrList,KeyList,AddrIdxList
|
||||
from .addrfile import AddrFile
|
||||
from .wallet import Wallet
|
||||
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ mmgen-addrimport: Import addresses into a MMGen coin daemon tracking wallet
|
|||
import time
|
||||
|
||||
from .common import *
|
||||
from .addr import AddrList,KeyAddrList
|
||||
from .addrlist import AddrList,KeyAddrList
|
||||
from .obj import TwLabel
|
||||
|
||||
ai_msgs = lambda k: {
|
||||
|
|
|
|||
|
|
@ -23,7 +23,8 @@ mmgen-passgen: Generate a series or range of passwords from an MMGen
|
|||
|
||||
from .common import *
|
||||
from .crypto import *
|
||||
from .addr import PasswordList,AddrIdxList
|
||||
from .addrlist import AddrIdxList
|
||||
from .passwdlist import PasswordList
|
||||
from .wallet import Wallet
|
||||
from .obj import MMGenPWIDString
|
||||
|
||||
|
|
|
|||
36
mmgen/obj.py
36
mmgen/obj.py
|
|
@ -96,8 +96,6 @@ class IndexedDict(dict):
|
|||
|
||||
class MMGenList(list,MMGenObject): pass
|
||||
class MMGenDict(dict,MMGenObject): pass
|
||||
class AddrListData(list,MMGenObject): pass
|
||||
|
||||
class Str(str,Hilite): pass
|
||||
|
||||
class Int(int,Hilite,InitErrors):
|
||||
|
|
@ -139,7 +137,7 @@ class ImmutableAttr: # Descriptor
|
|||
def __init__(self,dtype,typeconv=True,set_none_ok=False,include_proto=False):
|
||||
assert isinstance(dtype,self.ok_dtypes), 'ImmutableAttr_check1'
|
||||
if include_proto:
|
||||
assert typeconv and type(dtype) == str, 'ImmutableAttr_check2'
|
||||
assert typeconv, 'ImmutableAttr_check2'
|
||||
if set_none_ok:
|
||||
assert typeconv and type(dtype) != str, 'ImmutableAttr_check3'
|
||||
|
||||
|
|
@ -156,6 +154,8 @@ class ImmutableAttr: # Descriptor
|
|||
else:
|
||||
if set_none_ok:
|
||||
self.conv = lambda instance,value: None if value is None else dtype(value)
|
||||
elif include_proto:
|
||||
self.conv = lambda instance,value: dtype(instance.proto,value)
|
||||
else:
|
||||
self.conv = lambda instance,value: dtype(value)
|
||||
else:
|
||||
|
|
@ -258,36 +258,6 @@ class MMGenListItem(MMGenObject):
|
|||
class MMGenIdx(Int): min_val = 1
|
||||
class AddrIdx(MMGenIdx): max_digits = 7
|
||||
|
||||
class AddrIdxList(list,InitErrors,MMGenObject):
|
||||
max_len = 1000000
|
||||
def __init__(self,fmt_str=None,idx_list=None,sep=','):
|
||||
try:
|
||||
if idx_list:
|
||||
return list.__init__(self,sorted({AddrIdx(i) for i in idx_list}))
|
||||
elif fmt_str:
|
||||
ret = []
|
||||
for i in (fmt_str.split(sep)):
|
||||
j = i.split('-')
|
||||
if len(j) == 1:
|
||||
idx = AddrIdx(i)
|
||||
if not idx:
|
||||
break
|
||||
ret.append(idx)
|
||||
elif len(j) == 2:
|
||||
beg = AddrIdx(j[0])
|
||||
if not beg:
|
||||
break
|
||||
end = AddrIdx(j[1])
|
||||
if not beg or (end < beg):
|
||||
break
|
||||
ret.extend([AddrIdx(x) for x in range(beg,end+1)])
|
||||
else: break
|
||||
else:
|
||||
return list.__init__(self,sorted(set(ret))) # fell off end of loop - success
|
||||
raise ValueError(f'{i!r}: invalid range')
|
||||
except Exception as e:
|
||||
return type(self).init_fail(e,idx_list or fmt_str)
|
||||
|
||||
class MMGenRange(tuple,InitErrors,MMGenObject):
|
||||
|
||||
min_idx = None
|
||||
|
|
|
|||
228
mmgen/passwdlist.py
Executable file
228
mmgen/passwdlist.py
Executable file
|
|
@ -0,0 +1,228 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
|
||||
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
passwdlist.py: Password list class for the MMGen suite
|
||||
"""
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from .exception import InvalidPasswdFormat
|
||||
from .util import ymsg,is_hex_str,is_int,keypress_confirm
|
||||
from .obj import ImmutableAttr,ListItemAttr,MMGenPWIDString,PrivKey
|
||||
from .baseconv import baseconv,is_b32_str,is_b58_str
|
||||
from .addr import MMGenPasswordType,AddrIdx,AddrListID,is_xmrseed,is_bip39_str
|
||||
from .addrlist import (
|
||||
AddrListChksum,
|
||||
AddrListIDStr,
|
||||
AddrListEntryBase,
|
||||
AddrList,
|
||||
dmsg_sc,
|
||||
)
|
||||
|
||||
class PasswordListEntry(AddrListEntryBase):
|
||||
passwd = ListItemAttr(str,typeconv=False) # TODO: create Password type
|
||||
idx = ImmutableAttr(AddrIdx)
|
||||
label = ListItemAttr('TwComment',reassign_ok=True)
|
||||
sec = ListItemAttr(PrivKey,include_proto=True)
|
||||
|
||||
class PasswordList(AddrList):
|
||||
entry_type = PasswordListEntry
|
||||
main_attr = 'passwd'
|
||||
desc = 'password'
|
||||
gen_desc = 'password'
|
||||
gen_desc_pl = 's'
|
||||
gen_addrs = False
|
||||
gen_keys = False
|
||||
gen_passwds = True
|
||||
pw_len = None
|
||||
dfl_pw_fmt = 'b58'
|
||||
pwinfo = namedtuple('passwd_info',['min_len','max_len','dfl_len','valid_lens','desc','chk_func'])
|
||||
pw_info = {
|
||||
'b32': pwinfo(10, 42 ,24, None, 'base32 password', is_b32_str), # 32**24 < 2**128
|
||||
'b58': pwinfo(8, 36 ,20, None, 'base58 password', is_b58_str), # 58**20 < 2**128
|
||||
'bip39': pwinfo(12, 24 ,24, [12,18,24],'BIP39 mnemonic', is_bip39_str),
|
||||
'xmrseed': pwinfo(25, 25, 25, [25], 'Monero new-style mnemonic',is_xmrseed),
|
||||
'hex': pwinfo(32, 64 ,64, [32,48,64],'hexadecimal password', is_hex_str),
|
||||
}
|
||||
chksum_rec_f = lambda foo,e: (str(e.idx), e.passwd)
|
||||
|
||||
feature_warn_fs = 'WARNING: {!r} is a potentially dangerous feature. Use at your own risk!'
|
||||
hex2bip39 = False
|
||||
|
||||
def __init__(self,proto,
|
||||
infile = None,
|
||||
seed = None,
|
||||
pw_idxs = None,
|
||||
pw_id_str = None,
|
||||
pw_len = None,
|
||||
pw_fmt = None,
|
||||
chk_params_only = False,
|
||||
):
|
||||
|
||||
self.proto = proto # proto is ignored
|
||||
|
||||
if infile:
|
||||
self.infile = infile
|
||||
self.data = self.get_file().parse_file(infile) # sets self.pw_id_str,self.pw_fmt,self.pw_len
|
||||
else:
|
||||
if not chk_params_only:
|
||||
for k in (seed,pw_idxs):
|
||||
assert k
|
||||
self.pw_id_str = MMGenPWIDString(pw_id_str)
|
||||
self.set_pw_fmt(pw_fmt)
|
||||
self.set_pw_len(pw_len)
|
||||
if chk_params_only:
|
||||
return
|
||||
if self.hex2bip39:
|
||||
ymsg(self.feature_warn_fs.format(pw_fmt))
|
||||
self.set_pw_len_vs_seed_len(pw_len,seed)
|
||||
self.al_id = AddrListID(seed.sid,MMGenPasswordType(self.proto,'P'))
|
||||
self.data = self.generate(seed,pw_idxs)
|
||||
|
||||
self.num_addrs = len(self.data)
|
||||
self.fmt_data = ''
|
||||
self.chksum = AddrListChksum(self)
|
||||
|
||||
fs = f'{self.al_id.sid}-{self.pw_id_str}-{self.pw_fmt_disp}-{self.pw_len}[{{}}]'
|
||||
self.id_str = AddrListIDStr(self,fs)
|
||||
self.do_chksum_msg(record=not infile)
|
||||
|
||||
def set_pw_fmt(self,pw_fmt):
|
||||
if pw_fmt == 'hex2bip39':
|
||||
self.hex2bip39 = True
|
||||
self.pw_fmt = 'bip39'
|
||||
self.pw_fmt_disp = 'hex2bip39'
|
||||
else:
|
||||
self.pw_fmt = pw_fmt
|
||||
self.pw_fmt_disp = pw_fmt
|
||||
if self.pw_fmt not in self.pw_info:
|
||||
raise InvalidPasswdFormat(
|
||||
'{!r}: invalid password format. Valid formats: {}'.format(
|
||||
self.pw_fmt,
|
||||
', '.join(self.pw_info) ))
|
||||
|
||||
def chk_pw_len(self,passwd=None):
|
||||
if passwd is None:
|
||||
assert self.pw_len,'either passwd or pw_len must be set'
|
||||
pw_len = self.pw_len
|
||||
fs = '{l}: invalid user-requested length for {b} ({c}{m})'
|
||||
else:
|
||||
pw_len = len(passwd)
|
||||
fs = '{pw}: {b} has invalid length {l} ({c}{m} characters)'
|
||||
d = self.pw_info[self.pw_fmt]
|
||||
if d.valid_lens:
|
||||
if pw_len not in d.valid_lens:
|
||||
die(2, fs.format( l=pw_len, b=d.desc, c='not one of ', m=d.valid_lens, pw=passwd ))
|
||||
elif pw_len > d.max_len:
|
||||
die(2, fs.format( l=pw_len, b=d.desc, c='>', m=d.max_len, pw=passwd ))
|
||||
elif pw_len < d.min_len:
|
||||
die(2, fs.format( l=pw_len, b=d.desc, c='<', m=d.min_len, pw=passwd ))
|
||||
|
||||
def set_pw_len(self,pw_len):
|
||||
d = self.pw_info[self.pw_fmt]
|
||||
|
||||
if pw_len is None:
|
||||
self.pw_len = d.dfl_len
|
||||
return
|
||||
|
||||
if not is_int(pw_len):
|
||||
die(2,f'{pw_len!r}: invalid user-requested password length (not an integer)')
|
||||
self.pw_len = int(pw_len)
|
||||
self.chk_pw_len()
|
||||
|
||||
def set_pw_len_vs_seed_len(self,pw_len,seed):
|
||||
pf = self.pw_fmt
|
||||
if pf == 'hex':
|
||||
pw_bytes = self.pw_len // 2
|
||||
good_pw_len = seed.byte_len * 2
|
||||
elif pf == 'bip39':
|
||||
from .bip39 import bip39
|
||||
pw_bytes = bip39.nwords2seedlen(self.pw_len,in_bytes=True)
|
||||
good_pw_len = bip39.seedlen2nwords(seed.byte_len,in_bytes=True)
|
||||
elif pf == 'xmrseed':
|
||||
pw_bytes = baseconv.seedlen_map_rev['xmrseed'][self.pw_len]
|
||||
try:
|
||||
good_pw_len = baseconv.seedlen_map['xmrseed'][seed.byte_len]
|
||||
except:
|
||||
die(1,f'{seed.byte_len*8}: unsupported seed length for Monero new-style mnemonic')
|
||||
elif pf in ('b32','b58'):
|
||||
pw_int = (32 if pf == 'b32' else 58) ** self.pw_len
|
||||
pw_bytes = pw_int.bit_length() // 8
|
||||
good_pw_len = len(baseconv.frombytes(b'\xff'*seed.byte_len,wl_id=pf))
|
||||
else:
|
||||
raise NotImplementedError(f'{pf!r}: unknown password format')
|
||||
|
||||
if pw_bytes > seed.byte_len:
|
||||
die(1,
|
||||
'Cannot generate passwords with more entropy than underlying seed! ({} bits)\n'.format(
|
||||
len(seed.data) * 8 ) + (
|
||||
'Re-run the command with --passwd-len={}' if pf in ('bip39','hex') else
|
||||
'Re-run the command, specifying a password length of {} or less'
|
||||
).format(good_pw_len) )
|
||||
|
||||
if pf in ('bip39','hex') and pw_bytes < seed.byte_len:
|
||||
if not keypress_confirm(
|
||||
f'WARNING: requested {self.pw_info[pf].desc} length has less entropy ' +
|
||||
'than underlying seed!\nIs this what you want?',
|
||||
default_yes = True ):
|
||||
die(1,'Exiting at user request')
|
||||
|
||||
def gen_passwd(self,hex_sec):
|
||||
assert self.pw_fmt in self.pw_info
|
||||
if self.pw_fmt == 'hex':
|
||||
# take most significant part
|
||||
return hex_sec[:self.pw_len]
|
||||
elif self.pw_fmt == 'bip39':
|
||||
from .bip39 import bip39
|
||||
pw_len_hex = bip39.nwords2seedlen(self.pw_len,in_hex=True)
|
||||
# take most significant part
|
||||
return ' '.join(bip39.fromhex(hex_sec[:pw_len_hex],wl_id='bip39'))
|
||||
elif self.pw_fmt == 'xmrseed':
|
||||
pw_len_hex = baseconv.seedlen_map_rev['xmrseed'][self.pw_len] * 2
|
||||
# take most significant part
|
||||
bytes_trunc = bytes.fromhex(hex_sec[:pw_len_hex])
|
||||
from .protocol import init_proto
|
||||
bytes_preproc = init_proto('xmr').preprocess_key(bytes_trunc,None)
|
||||
return ' '.join(baseconv.frombytes(bytes_preproc,wl_id='xmrseed'))
|
||||
else:
|
||||
# take least significant part
|
||||
return baseconv.fromhex(hex_sec,self.pw_fmt,pad=self.pw_len,tostr=True)[-self.pw_len:]
|
||||
|
||||
def check_format(self,pw):
|
||||
if not self.pw_info[self.pw_fmt].chk_func(pw):
|
||||
raise ValueError(f'Password is not valid {self.pw_info[self.pw_fmt].desc} data')
|
||||
pwlen = len(pw.split()) if self.pw_fmt in ('bip39','xmrseed') else len(pw)
|
||||
if pwlen != self.pw_len:
|
||||
raise ValueError(f'Password has incorrect length ({pwlen} != {self.pw_len})')
|
||||
return True
|
||||
|
||||
def scramble_seed(self,seed):
|
||||
# Changing either pw_fmt or pw_len will cause a different, unrelated
|
||||
# set of passwords to be generated: this is what we want.
|
||||
# NB: In original implementation, pw_id_str was 'baseN', not 'bN'
|
||||
scramble_key = f'{self.pw_fmt}:{self.pw_len}:{self.pw_id_str}'
|
||||
|
||||
if self.hex2bip39:
|
||||
from .bip39 import bip39
|
||||
pwlen = bip39.nwords2seedlen(self.pw_len,in_hex=True)
|
||||
scramble_key = f'hex:{pwlen}:{self.pw_id_str}'
|
||||
|
||||
from .crypto import scramble_seed
|
||||
dmsg_sc('str',scramble_key)
|
||||
return scramble_seed(seed,scramble_key.encode())
|
||||
|
|
@ -25,6 +25,9 @@ from .common import *
|
|||
from .crypto import *
|
||||
from .seedsplit import MasterShareIdx
|
||||
from .addr import *
|
||||
from .addrlist import AddrList,KeyAddrList
|
||||
from .passwdlist import PasswordList
|
||||
from .baseconv import baseconv
|
||||
|
||||
NL = ('\n','\r\n')[g.platform=='win']
|
||||
|
||||
|
|
@ -900,6 +903,7 @@ class MMGenToolCmdWallet(MMGenToolCmds):
|
|||
ss = Wallet(sf)
|
||||
if ss.seed.sid != addr.sid:
|
||||
die(1,f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})')
|
||||
from .addrlist import AddrList,AddrIdxList
|
||||
al = AddrList(
|
||||
proto = self.proto,
|
||||
seed = ss.seed,
|
||||
|
|
@ -968,6 +972,7 @@ class MMGenToolCmdRPC(MMGenToolCmds):
|
|||
die(1,
|
||||
f'{mmgen_addrs}: invalid address list argument ' +
|
||||
'(must be in form <seed ID>:[<type>:]<idx list>)' )
|
||||
from .addrlist import AddrIdxList
|
||||
usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])]
|
||||
|
||||
al = await TwAddrList(self.proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels)
|
||||
|
|
|
|||
|
|
@ -723,8 +723,8 @@ class MMGenTX:
|
|||
die(2,'At least one output must be specified on the command line')
|
||||
|
||||
async def get_outputs_from_cmdline(self,cmd_args):
|
||||
from .addr import AddrList
|
||||
from .addrdata import AddrData,TwAddrData
|
||||
from .addrlist import AddrList
|
||||
from .addrfile import AddrFile
|
||||
addrfiles = remove_dups(
|
||||
tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
|
||||
|
|
@ -755,6 +755,7 @@ class MMGenTX:
|
|||
while True:
|
||||
reply = line_input(prompt).strip()
|
||||
if reply:
|
||||
from .addrlist import AddrIdxList
|
||||
selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
|
||||
if selected:
|
||||
if selected[-1] <= len(unspent):
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ txsign: Sign a transaction generated by 'mmgen-txcreate'
|
|||
"""
|
||||
|
||||
from .common import *
|
||||
from .addr import AddrIdxList,KeyAddrList
|
||||
from .addrlist import AddrIdxList,KeyAddrList
|
||||
from .obj import MMGenAddrType,MMGenList
|
||||
from .wallet import Wallet,WalletUnenc,WalletEnc,MMGenWallet
|
||||
from .tx import MMGenTX
|
||||
|
|
|
|||
|
|
@ -24,11 +24,11 @@ import os,re,time,json
|
|||
from collections import namedtuple
|
||||
from .common import *
|
||||
from .objmethods import Hilite,InitErrors
|
||||
from .addr import KeyAddrList,AddrIdxList
|
||||
from .rpc import MoneroRPCClientRaw,MoneroWalletRPCClient,json_encoder
|
||||
from .seed import SeedID
|
||||
from .daemon import MoneroWalletDaemon
|
||||
from .protocol import _b58a,init_proto
|
||||
from .addrlist import KeyAddrList,AddrIdxList
|
||||
from .obj import CoinAddr,CoinTxID,AddrIdx
|
||||
|
||||
xmrwallet_uarg_info = (
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ os.environ['MMGEN_TEST_SUITE'] = '1'
|
|||
|
||||
# Import these _after_ local path's been added to sys.path
|
||||
from test.objattrtest_py_d.oat_common import *
|
||||
from mmgen.addrlist import *
|
||||
from mmgen.passwdlist import *
|
||||
|
||||
opts_data = {
|
||||
'sets': [
|
||||
|
|
|
|||
|
|
@ -33,6 +33,9 @@ from mmgen.common import *
|
|||
from mmgen.obj import *
|
||||
from mmgen.altcoins.eth.obj import *
|
||||
from mmgen.seedsplit import *
|
||||
from mmgen.addr import *
|
||||
from mmgen.addrlist import *
|
||||
from mmgen.addrdata import *
|
||||
from mmgen.amt import *
|
||||
|
||||
opts_data = {
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ test.objtest_py_d.ot_btc_mainnet: BTC mainnet test vectors for MMGen data object
|
|||
"""
|
||||
|
||||
from mmgen.obj import *
|
||||
from mmgen.addrlist import AddrIdxList
|
||||
from mmgen.seedsplit import *
|
||||
from .ot_common import *
|
||||
|
||||
|
|
|
|||
|
|
@ -199,7 +199,7 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
|
|||
def _get_addrfile_checksum(self,display=False):
|
||||
addrfile = self.get_file_with_ext('addrs')
|
||||
silence()
|
||||
from mmgen.addr import AddrList
|
||||
from mmgen.addrlist import AddrList
|
||||
chk = AddrList(self.proto,addrfile).chksum
|
||||
if opt.verbose and display:
|
||||
msg(f'Checksum: {cyan(chk)}')
|
||||
|
|
@ -397,8 +397,7 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
|
|||
return out
|
||||
|
||||
def _create_tx_data(self,sources,addrs_per_wallet=addrs_per_wallet):
|
||||
from mmgen.addr import AddrList
|
||||
from mmgen.obj import AddrIdxList
|
||||
from mmgen.addrlist import AddrList,AddrIdxList
|
||||
from mmgen.addrdata import AddrData
|
||||
tx_data,ad = {},AddrData(self.proto)
|
||||
for s in sources:
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ from mmgen.globalvars import g
|
|||
from mmgen.opts import opt
|
||||
from mmgen.util import die,gmsg,write_data_to_file
|
||||
from mmgen.protocol import init_proto
|
||||
from mmgen.addr import AddrList
|
||||
from mmgen.addrlist import AddrList
|
||||
from mmgen.wallet import MMGenWallet
|
||||
from ..include.common import *
|
||||
from .common import *
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from mmgen.globalvars import g
|
|||
from mmgen.opts import opt
|
||||
from mmgen.obj import MMGenRange
|
||||
from mmgen.amt import XMRAmt
|
||||
from mmgen.addr import KeyAddrList,AddrIdxList
|
||||
from mmgen.addrlist import KeyAddrList,AddrIdxList
|
||||
from ..include.common import *
|
||||
from .common import *
|
||||
|
||||
|
|
|
|||
63
test/unit_tests_d/ut_addrlist.py
Executable file
63
test/unit_tests_d/ut_addrlist.py
Executable file
|
|
@ -0,0 +1,63 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
test.unit_tests_d.ut_addrlist: address list unit tests for the MMGen suite
|
||||
"""
|
||||
|
||||
from mmgen.common import *
|
||||
from mmgen.seed import Seed
|
||||
from mmgen.addr import MMGenAddrType
|
||||
from mmgen.addrlist import AddrIdxList,AddrList,KeyList,KeyAddrList
|
||||
from mmgen.passwdlist import PasswordList
|
||||
from mmgen.protocol import init_proto
|
||||
|
||||
def do_test(list_type,chksum,pw_id_str=None,add_kwargs=None):
|
||||
qmsg(blue(f'Testing {list_type.__name__}'))
|
||||
proto = init_proto('btc')
|
||||
seed = Seed(seed_bin=bytes.fromhex('feedbead'*8))
|
||||
mmtype = MMGenAddrType(proto,'C')
|
||||
idxs = AddrIdxList('1-3')
|
||||
|
||||
kwargs = {
|
||||
'seed': seed,
|
||||
'pw_idxs': idxs,
|
||||
'pw_id_str': pw_id_str,
|
||||
'pw_fmt': 'b58',
|
||||
} if pw_id_str else {
|
||||
'seed': seed,
|
||||
'addr_idxs': idxs,
|
||||
'mmtype': mmtype,
|
||||
}
|
||||
|
||||
if add_kwargs:
|
||||
kwargs.update(add_kwargs)
|
||||
|
||||
al = list_type( proto, **kwargs )
|
||||
|
||||
af = al.get_file()
|
||||
af.format()
|
||||
|
||||
qmsg(f'Filename: {af.filename}\n')
|
||||
# af.write('-')
|
||||
vmsg(f'------------\n{af.fmt_data}\n------------')
|
||||
|
||||
if chksum:
|
||||
assert al.chksum == chksum, f'{al.chksum} != {chksum}'
|
||||
|
||||
return True
|
||||
|
||||
class unit_tests:
|
||||
|
||||
def addr(self,name,ut):
|
||||
return do_test(AddrList,'BCE8 082C 0973 A525')
|
||||
|
||||
def key(self,name,ut):
|
||||
return do_test(KeyList,None)
|
||||
|
||||
def keyaddr(self,name,ut):
|
||||
return do_test(KeyAddrList,'4A36 AA65 8C2B 7C35')
|
||||
|
||||
def passwd(self,name,ut):
|
||||
return do_test(PasswordList,'FF4A B716 4513 8F8F',pw_id_str='foo')
|
||||
|
||||
def passwd_bip39(self,name,ut):
|
||||
return do_test(PasswordList,'C3A8 B2B2 1AA1 FB40',pw_id_str='foo',add_kwargs={'pw_fmt':'bip39'})
|
||||
Loading…
Add table
Add a link
Reference in a new issue