|
@@ -16,7 +16,7 @@
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
"""
|
|
|
-util.py: Shared routines for the mmgen suite
|
|
|
+util.py: Low-level routines imported by other modules for the MMGen suite
|
|
|
"""
|
|
|
|
|
|
import sys
|
|
@@ -24,8 +24,6 @@ from hashlib import sha256
|
|
|
from binascii import hexlify,unhexlify
|
|
|
|
|
|
import mmgen.config as g
|
|
|
-from mmgen.bitcoin import b58decode_pad,b58encode_pad
|
|
|
-from mmgen.term import *
|
|
|
|
|
|
def msg(s): sys.stderr.write(s + "\n")
|
|
|
def msg_r(s): sys.stderr.write(s)
|
|
@@ -42,97 +40,6 @@ def vmsg(s):
|
|
|
def vmsg_r(s):
|
|
|
if g.verbose: sys.stderr.write(s)
|
|
|
|
|
|
-def bail(): sys.exit(9)
|
|
|
-
|
|
|
-def get_extension(f):
|
|
|
- import os
|
|
|
- return os.path.splitext(f)[1][1:]
|
|
|
-
|
|
|
-def get_random_data_from_user(uchars):
|
|
|
-
|
|
|
- if g.quiet: msg("Enter %s random symbols" % uchars)
|
|
|
- else: msg(cmessages['usr_rand_notice'] % uchars)
|
|
|
-
|
|
|
- prompt = "You may begin typing. %s symbols left: "
|
|
|
- msg_r(prompt % uchars)
|
|
|
-
|
|
|
- import time
|
|
|
- # time.clock() always returns zero, so we'll use time.time()
|
|
|
- saved_time = time.time()
|
|
|
-
|
|
|
- key_data,time_data = "",[]
|
|
|
-
|
|
|
- for i in range(uchars):
|
|
|
- key_data += get_char(immed_chars="ALL")
|
|
|
- msg_r("\r" + prompt % (uchars - i - 1))
|
|
|
- now = time.time()
|
|
|
- time_data.append(now - saved_time)
|
|
|
- saved_time = now
|
|
|
-
|
|
|
- if g.quiet: msg_r("\r")
|
|
|
- else: msg_r("\rThank you. That's enough.%s\n\n" % (" "*18))
|
|
|
-
|
|
|
- fmt_time_data = ["{:.22f}".format(i) for i in time_data]
|
|
|
-
|
|
|
- if g.debug:
|
|
|
- msg("\nUser input:\n%s\nKeystroke time intervals:\n%s\n" %
|
|
|
- (key_data,"\n".join(fmt_time_data)))
|
|
|
-
|
|
|
- prompt = "User random data successfully acquired. Press ENTER to continue"
|
|
|
- prompt_and_get_char(prompt,"",enter_ok=True)
|
|
|
-
|
|
|
- return key_data+"".join(fmt_time_data)
|
|
|
-
|
|
|
-
|
|
|
-def get_random(length,opts):
|
|
|
- from Crypto import Random
|
|
|
- os_rand = Random.new().read(length)
|
|
|
- if 'usr_randchars' in opts and opts['usr_randchars'] not in (0,-1):
|
|
|
- kwhat = "a key from random data with "
|
|
|
- if not g.user_entropy:
|
|
|
- g.user_entropy = sha256(
|
|
|
- get_random_data_from_user(opts['usr_randchars'])).digest()
|
|
|
- kwhat += "user entropy"
|
|
|
- else:
|
|
|
- kwhat += "saved user entropy"
|
|
|
- key = make_key(g.user_entropy, "", '2', what=kwhat)
|
|
|
- return encrypt_data(os_rand,key,what="random data",verify=False)
|
|
|
- else:
|
|
|
- return os_rand
|
|
|
-
|
|
|
-def my_raw_input(prompt,echo=True):
|
|
|
- try:
|
|
|
- if echo:
|
|
|
- reply = raw_input(prompt)
|
|
|
- else:
|
|
|
- from getpass import getpass
|
|
|
- reply = getpass(prompt)
|
|
|
- except KeyboardInterrupt:
|
|
|
- msg("\nUser interrupt")
|
|
|
- sys.exit(1)
|
|
|
-
|
|
|
- kb_hold_protect()
|
|
|
- return reply
|
|
|
-
|
|
|
-
|
|
|
-def _get_hash_params(hash_preset):
|
|
|
- if hash_preset in g.hash_presets:
|
|
|
- return g.hash_presets[hash_preset] # N,p,r,buflen
|
|
|
- else: # Shouldn't be here
|
|
|
- msg("%s: invalid 'hash_preset' value" % hash_preset)
|
|
|
- sys.exit(3)
|
|
|
-
|
|
|
-
|
|
|
-def show_hash_presets():
|
|
|
- fs = " {:<7} {:<6} {:<3} {}"
|
|
|
- msg("Available parameters for scrypt.hash():")
|
|
|
- msg(fs.format("Preset","N","r","p"))
|
|
|
- for i in sorted(g.hash_presets.keys()):
|
|
|
- msg(fs.format("'%s'" % i, *g.hash_presets[i]))
|
|
|
- msg("N = memory usage (power of two), p = iterations (rounds)")
|
|
|
- sys.exit(0)
|
|
|
-
|
|
|
-
|
|
|
cmessages = {
|
|
|
'null': "",
|
|
|
'incog_iv_id': """
|
|
@@ -189,71 +96,113 @@ just hit ENTER twice.
|
|
|
""".strip()
|
|
|
}
|
|
|
|
|
|
+def get_extension(f):
|
|
|
+ import os
|
|
|
+ return os.path.splitext(f)[1][1:]
|
|
|
|
|
|
-def confirm_or_exit(message, question, expect="YES"):
|
|
|
-
|
|
|
- vmsg("")
|
|
|
-
|
|
|
- m = message.strip()
|
|
|
- if m: msg(m)
|
|
|
-
|
|
|
- conf_msg = "Type uppercase '%s' to confirm: " % expect
|
|
|
-
|
|
|
- p = question+" "+conf_msg if question[0].isupper() else \
|
|
|
- "Are you sure you want to %s?\n%s" % (question,conf_msg)
|
|
|
-
|
|
|
- if my_raw_input(p).strip() != expect:
|
|
|
- msg("Exiting at user request")
|
|
|
- sys.exit(2)
|
|
|
-
|
|
|
- vmsg("")
|
|
|
+def make_chksum_N(s,n,sep=False):
|
|
|
+ if n%4 or not (4 <= n <= 64): return False
|
|
|
+ s = sha256(sha256(s).digest()).hexdigest().upper()
|
|
|
+ sep = " " if sep else ""
|
|
|
+ return sep.join([s[i*4:i*4+4] for i in range(n/4)])
|
|
|
+def make_chksum_8(s,sep=False):
|
|
|
+ s = sha256(sha256(s).digest()).hexdigest()[:8].upper()
|
|
|
+ return "{} {}".format(s[:4],s[4:]) if sep else s
|
|
|
+def make_chksum_6(s): return sha256(s).hexdigest()[:6]
|
|
|
+def make_iv_chksum(s): return sha256(s).hexdigest()[:8].upper()
|
|
|
|
|
|
+def splitN(s,n,sep=None): # always return an n-element list
|
|
|
+ ret = s.split(sep,n-1)
|
|
|
+ return ret + ["" for i in range(n-len(ret))]
|
|
|
+def split2(s,sep=None): return splitN(s,2,sep) # always return a 2-element list
|
|
|
+def split3(s,sep=None): return splitN(s,3,sep) # always return a 3-element list
|
|
|
|
|
|
-def user_confirm(prompt,default_yes=False,verbose=False):
|
|
|
+def col4(s):
|
|
|
+ nondiv = 1 if len(s) % 4 else 0
|
|
|
+ return " ".join([s[4*i:4*i+4] for i in range(len(s)/4 + nondiv)])
|
|
|
|
|
|
- q = "(Y/n)" if default_yes else "(y/N)"
|
|
|
+def make_timestamp():
|
|
|
+ import time
|
|
|
+ tv = time.gmtime(time.time())[:6]
|
|
|
+ return "{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}".format(*tv)
|
|
|
+def make_timestr():
|
|
|
+ import time
|
|
|
+ tv = time.gmtime(time.time())[:6]
|
|
|
+ return "{:04d}/{:02d}/{:02d} {:02d}:{:02d}:{:02d}".format(*tv)
|
|
|
+def secs_to_hms(secs):
|
|
|
+ return "{:02d}:{:02d}:{:02d}".format(secs/3600, (secs/60) % 60, secs % 60)
|
|
|
|
|
|
- while True:
|
|
|
- reply = get_char("%s %s: " % (prompt, q)).strip("\n\r")
|
|
|
+def _is_hex(s):
|
|
|
+ try: int(s,16)
|
|
|
+ except: return False
|
|
|
+ else: return True
|
|
|
|
|
|
- if not reply:
|
|
|
- if default_yes: msg(""); return True
|
|
|
- else: msg(""); return False
|
|
|
- elif reply in 'yY': msg(""); return True
|
|
|
- elif reply in 'nN': msg(""); return False
|
|
|
- else:
|
|
|
- if verbose: msg("\nInvalid reply")
|
|
|
- else: msg_r("\r")
|
|
|
+def match_ext(addr,ext):
|
|
|
+ return addr.split(".")[-1] == ext
|
|
|
|
|
|
+def get_from_brain_opt_params(opts):
|
|
|
+ l,p = opts['from_brain'].split(",")
|
|
|
+ return(int(l),p)
|
|
|
|
|
|
-def prompt_and_get_char(prompt,chars,enter_ok=False,verbose=False):
|
|
|
+def pretty_hexdump(data,gw=2,cols=8,line_nums=False):
|
|
|
+ r = 1 if len(data) % gw else 0
|
|
|
+ return "".join(
|
|
|
+ [
|
|
|
+ ("" if (line_nums == False or i % cols) else "%03i: " % (i/cols)) +
|
|
|
+ hexlify(data[i*gw:i*gw+gw]) +
|
|
|
+ (" " if (i+1) % cols else "\n")
|
|
|
+ for i in range(len(data)/gw + r)
|
|
|
+ ]
|
|
|
+ ).rstrip()
|
|
|
|
|
|
- while True:
|
|
|
- reply = get_char("%s: " % prompt).strip("\n\r")
|
|
|
+def decode_pretty_hexdump(data):
|
|
|
+ import re
|
|
|
+ lines = [re.sub('^\d+:\s+','',l) for l in data.split("\n")]
|
|
|
+ return unhexlify("".join(("".join(lines).split())))
|
|
|
|
|
|
- if reply in chars or (enter_ok and not reply):
|
|
|
- msg("")
|
|
|
- return reply
|
|
|
+def get_hash_params(hash_preset):
|
|
|
+ if hash_preset in g.hash_presets:
|
|
|
+ return g.hash_presets[hash_preset] # N,p,r,buflen
|
|
|
+ else: # Shouldn't be here
|
|
|
+ msg("%s: invalid 'hash_preset' value" % hash_preset)
|
|
|
+ sys.exit(3)
|
|
|
|
|
|
- if verbose: msg("\nInvalid reply")
|
|
|
- else: msg_r("\r")
|
|
|
+def show_hash_presets():
|
|
|
+ fs = " {:<7} {:<6} {:<3} {}"
|
|
|
+ msg("Available parameters for scrypt.hash():")
|
|
|
+ msg(fs.format("Preset","N","r","p"))
|
|
|
+ for i in sorted(g.hash_presets.keys()):
|
|
|
+ msg(fs.format("'%s'" % i, *g.hash_presets[i]))
|
|
|
+ msg("N = memory usage (power of two), p = iterations (rounds)")
|
|
|
+ sys.exit(0)
|
|
|
|
|
|
+def compare_checksums(chksum1, desc1, chksum2, desc2):
|
|
|
|
|
|
-def make_chksum_N(s,n,sep=False):
|
|
|
- if n%4 or not (4 <= n <= 64): return False
|
|
|
- s = sha256(sha256(s).digest()).hexdigest().upper()
|
|
|
- sep = " " if sep else ""
|
|
|
- return sep.join([s[i*4:i*4+4] for i in range(n/4)])
|
|
|
+ if chksum1.lower() == chksum2.lower():
|
|
|
+ vmsg("OK (%s)" % chksum1.upper())
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ if g.debug:
|
|
|
+ print \
|
|
|
+ "ERROR!\nComputed checksum %s (%s) doesn't match checksum %s (%s)" \
|
|
|
+ % (desc1,chksum1,desc2,chksum2)
|
|
|
+ return False
|
|
|
|
|
|
-def make_chksum_8(s,sep=False):
|
|
|
- s = sha256(sha256(s).digest()).hexdigest()[:8].upper()
|
|
|
- return "{} {}".format(s[:4],s[4:]) if sep else s
|
|
|
+def get_default_wordlist():
|
|
|
|
|
|
-def make_chksum_6(s):
|
|
|
- return sha256(s).hexdigest()[:6]
|
|
|
+ wl_id = g.default_wl
|
|
|
+ if wl_id == "electrum": from mmgen.mn_electrum import electrum_words as wl
|
|
|
+ elif wl_id == "tirosh": from mmgen.mn_tirosh import tirosh_words as wl
|
|
|
+ return wl.strip().split("\n")
|
|
|
|
|
|
-def make_iv_chksum(s):
|
|
|
- return sha256(s).hexdigest()[:8].upper()
|
|
|
+def open_file_or_exit(filename,mode):
|
|
|
+ try:
|
|
|
+ f = open(filename, mode)
|
|
|
+ except:
|
|
|
+ what = "reading" if mode == 'r' else "writing"
|
|
|
+ msg("Unable to open file '%s' for %s" % (filename,what))
|
|
|
+ sys.exit(2)
|
|
|
+ return f
|
|
|
|
|
|
|
|
|
def check_file_type_and_access(fname,ftype):
|
|
@@ -287,6 +236,7 @@ def check_infile(f): return check_file_type_and_access(f,"input file")
|
|
|
def check_outfile(f): return check_file_type_and_access(f,"output file")
|
|
|
def check_outdir(f): return check_file_type_and_access(f,"directory")
|
|
|
|
|
|
+
|
|
|
def _validate_addr_num(n):
|
|
|
|
|
|
try: n = int(n)
|
|
@@ -301,6 +251,12 @@ def _validate_addr_num(n):
|
|
|
return n
|
|
|
|
|
|
|
|
|
+def make_full_path(outdir,outfile):
|
|
|
+ import os
|
|
|
+ return os.path.normpath(os.sep.join([outdir, os.path.basename(outfile)]))
|
|
|
+# os.path.join() doesn't work?
|
|
|
+
|
|
|
+
|
|
|
def parse_address_list(arg,sep=","):
|
|
|
|
|
|
ret = []
|
|
@@ -354,65 +310,23 @@ def get_new_passphrase(what, opts):
|
|
|
return pw
|
|
|
|
|
|
|
|
|
-def _scrypt_hash_passphrase(passwd, salt, hash_preset, buflen=32):
|
|
|
-
|
|
|
- # Buflen arg is for brainwallets only, which use this function to generate
|
|
|
- # the seed directly.
|
|
|
-
|
|
|
- N,r,p = _get_hash_params(hash_preset)
|
|
|
-
|
|
|
- import scrypt
|
|
|
- return scrypt.hash(passwd, salt, 2**N, r, p, buflen=buflen)
|
|
|
-
|
|
|
-
|
|
|
-def get_from_brain_opt_params(opts):
|
|
|
- l,p = opts['from_brain'].split(",")
|
|
|
- return(int(l),p)
|
|
|
-
|
|
|
-
|
|
|
-def _get_seed_from_brain_passphrase(words,opts):
|
|
|
- bp = " ".join(words)
|
|
|
- if g.debug: print "Sanitized brain passphrase: %s" % bp
|
|
|
- seed_len,hash_preset = get_from_brain_opt_params(opts)
|
|
|
- if g.debug: print "Brainwallet l = %s, p = %s" % (seed_len,hash_preset)
|
|
|
- vmsg_r("Hashing brainwallet data. Please wait...")
|
|
|
- # Use buflen arg of scrypt.hash() to get seed of desired length
|
|
|
- seed = _scrypt_hash_passphrase(bp, "", hash_preset, buflen=seed_len/8)
|
|
|
- vmsg("Done")
|
|
|
- return seed
|
|
|
-
|
|
|
-
|
|
|
-def encrypt_seed(seed, key):
|
|
|
- return encrypt_data(seed, key, iv=1, what="seed")
|
|
|
-
|
|
|
-def encrypt_data(data, key, iv=1, what="data", verify=True):
|
|
|
- """
|
|
|
- Encrypt arbitrary data using AES256 in counter mode
|
|
|
- """
|
|
|
-
|
|
|
- # 192-bit seed is 24 bytes -> not multiple of 16. Must use MODE_CTR
|
|
|
- from Crypto.Cipher import AES
|
|
|
- from Crypto.Util import Counter
|
|
|
+def confirm_or_exit(message, question, expect="YES"):
|
|
|
|
|
|
- vmsg("Encrypting %s" % what)
|
|
|
+ vmsg("")
|
|
|
|
|
|
- c = AES.new(key, AES.MODE_CTR,
|
|
|
- counter=Counter.new(g.aesctr_iv_len*8,initial_value=iv))
|
|
|
- enc_data = c.encrypt(data)
|
|
|
+ m = message.strip()
|
|
|
+ if m: msg(m)
|
|
|
|
|
|
- if verify:
|
|
|
- vmsg_r("Performing a test decryption of the %s..." % what)
|
|
|
+ conf_msg = "Type uppercase '%s' to confirm: " % expect
|
|
|
|
|
|
- c = AES.new(key, AES.MODE_CTR,
|
|
|
- counter=Counter.new(g.aesctr_iv_len*8,initial_value=iv))
|
|
|
- dec_data = c.decrypt(enc_data)
|
|
|
+ p = question+" "+conf_msg if question[0].isupper() else \
|
|
|
+ "Are you sure you want to %s?\n%s" % (question,conf_msg)
|
|
|
|
|
|
- if dec_data == data: vmsg("done\n")
|
|
|
- else:
|
|
|
- msg("ERROR.\nDecrypted %s doesn't match original %s" % (what,what))
|
|
|
- sys.exit(2)
|
|
|
+ if my_raw_input(p).strip() != expect:
|
|
|
+ msg("Exiting at user request")
|
|
|
+ sys.exit(2)
|
|
|
|
|
|
- return enc_data
|
|
|
+ vmsg("")
|
|
|
|
|
|
|
|
|
def write_to_stdout(data, what, confirm=True):
|
|
@@ -422,36 +336,12 @@ def write_to_stdout(data, what, confirm=True):
|
|
|
try:
|
|
|
import os
|
|
|
of = os.readlink("/proc/%d/fd/1" % os.getpid())
|
|
|
- msg("Redirecting output to file '%s'" % of)
|
|
|
+ msg("Redirecting output to file '%s'" % os.path.relpath(of))
|
|
|
except:
|
|
|
msg("Redirecting output to file")
|
|
|
sys.stdout.write(data)
|
|
|
|
|
|
|
|
|
-def get_default_wordlist():
|
|
|
-
|
|
|
- wl_id = g.default_wl
|
|
|
- if wl_id == "electrum": from mmgen.mn_electrum import electrum_words as wl
|
|
|
- elif wl_id == "tirosh": from mmgen.mn_tirosh import tirosh_words as wl
|
|
|
- return wl.strip().split("\n")
|
|
|
-
|
|
|
-
|
|
|
-def open_file_or_exit(filename,mode):
|
|
|
- try:
|
|
|
- f = open(filename, mode)
|
|
|
- except:
|
|
|
- what = "reading" if mode == 'r' else "writing"
|
|
|
- msg("Unable to open file '%s' for %s" % (filename,what))
|
|
|
- sys.exit(2)
|
|
|
- return f
|
|
|
-
|
|
|
-
|
|
|
-def make_full_path(outdir,outfile):
|
|
|
- import os
|
|
|
- return os.path.normpath(os.sep.join([outdir, os.path.basename(outfile)]))
|
|
|
-# os.path.join() doesn't work?
|
|
|
-
|
|
|
-
|
|
|
def write_to_file(outfile,data,opts,what="data",confirm=False,verbose=False):
|
|
|
|
|
|
if 'outdir' in opts: outfile = make_full_path(opts['outdir'],outfile)
|
|
@@ -487,11 +377,12 @@ def export_to_file(outfile, data, opts, what="data"):
|
|
|
write_to_file(outfile,data,opts,what,c,True)
|
|
|
|
|
|
|
|
|
-def _display_control_data(label,metadata,hash_preset,salt,enc_seed):
|
|
|
+from mmgen.bitcoin import b58decode_pad,b58encode_pad
|
|
|
+
|
|
|
+def display_control_data(label,metadata,hash_preset,salt,enc_seed):
|
|
|
msg("WALLET DATA")
|
|
|
fs = " {:18} {}"
|
|
|
pw_empty = "yes" if metadata[3] == "E" else "no"
|
|
|
- from mmgen.bitcoin import b58encode_pad
|
|
|
for i in (
|
|
|
("Label:", label),
|
|
|
("Seed ID:", metadata[0].upper()),
|
|
@@ -499,7 +390,7 @@ def _display_control_data(label,metadata,hash_preset,salt,enc_seed):
|
|
|
("Seed length:", "%s bits (%s bytes)" %
|
|
|
(metadata[2],int(metadata[2])/8)),
|
|
|
("Scrypt params:", "Preset '%s' (%s)" % (hash_preset,
|
|
|
- " ".join([str(i) for i in _get_hash_params(hash_preset)]))),
|
|
|
+ " ".join([str(i) for i in get_hash_params(hash_preset)]))),
|
|
|
("Passphrase empty?", pw_empty.capitalize()),
|
|
|
("Timestamp:", "%s UTC" % metadata[4]),
|
|
|
): msg(fs.format(*i))
|
|
@@ -515,31 +406,6 @@ def _display_control_data(label,metadata,hash_preset,salt,enc_seed):
|
|
|
): msg(fs.format(*i))
|
|
|
|
|
|
|
|
|
-def splitN(s,n,sep=None): # always return an n-element list
|
|
|
- ret = s.split(sep,n-1)
|
|
|
- return ret + ["" for i in range(n-len(ret))]
|
|
|
-
|
|
|
-def split2(s,sep=None): return splitN(s,2,sep) # always return a 2-element list
|
|
|
-def split3(s,sep=None): return splitN(s,3,sep) # always return a 3-element list
|
|
|
-
|
|
|
-def col4(s):
|
|
|
- nondiv = 1 if len(s) % 4 else 0
|
|
|
- return " ".join([s[4*i:4*i+4] for i in range(len(s)/4 + nondiv)])
|
|
|
-
|
|
|
-def make_timestamp():
|
|
|
- import time
|
|
|
- tv = time.gmtime(time.time())[:6]
|
|
|
- return "{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}".format(*tv)
|
|
|
-
|
|
|
-def make_timestr():
|
|
|
- import time
|
|
|
- tv = time.gmtime(time.time())[:6]
|
|
|
- return "{:04d}/{:02d}/{:02d} {:02d}:{:02d}:{:02d}".format(*tv)
|
|
|
-
|
|
|
-def secs_to_hms(secs):
|
|
|
- return "{:02d}:{:02d}:{:02d}".format(secs/3600, (secs/60) % 60, secs % 60)
|
|
|
-
|
|
|
-
|
|
|
def write_wallet_to_file(seed, passwd, key_id, salt, enc_seed, opts):
|
|
|
|
|
|
seed_id = make_chksum_8(seed)
|
|
@@ -555,7 +421,7 @@ def write_wallet_to_file(seed, passwd, key_id, salt, enc_seed, opts):
|
|
|
lines = (
|
|
|
label,
|
|
|
"{} {} {} {} {}".format(*metadata),
|
|
|
- "{}: {} {} {}".format(hash_preset,*_get_hash_params(hash_preset)),
|
|
|
+ "{}: {} {} {}".format(hash_preset,*get_hash_params(hash_preset)),
|
|
|
"{} {}".format(make_chksum_6(sf), col4(sf)),
|
|
|
"{} {}".format(make_chksum_6(esf), col4(esf))
|
|
|
)
|
|
@@ -569,28 +435,8 @@ def write_wallet_to_file(seed, passwd, key_id, salt, enc_seed, opts):
|
|
|
write_to_file(outfile,d,opts,"wallet",c,True)
|
|
|
|
|
|
if g.verbose:
|
|
|
- _display_control_data(label,metadata,hash_preset,salt,enc_seed)
|
|
|
-
|
|
|
+ display_control_data(label,metadata,hash_preset,salt,enc_seed)
|
|
|
|
|
|
-def _compare_checksums(chksum1, desc1, chksum2, desc2):
|
|
|
-
|
|
|
- if chksum1.lower() == chksum2.lower():
|
|
|
- vmsg("OK (%s)" % chksum1.upper())
|
|
|
- return True
|
|
|
- else:
|
|
|
- if g.debug:
|
|
|
- print \
|
|
|
- "ERROR!\nComputed checksum %s (%s) doesn't match checksum %s (%s)" \
|
|
|
- % (desc1,chksum1,desc2,chksum2)
|
|
|
- return False
|
|
|
-
|
|
|
-def _is_hex(s):
|
|
|
- try: int(s,16)
|
|
|
- except: return False
|
|
|
- else: return True
|
|
|
-
|
|
|
-def match_ext(addr,ext):
|
|
|
- return addr.split(".")[-1] == ext
|
|
|
|
|
|
def _check_mmseed_format(words):
|
|
|
|
|
@@ -664,7 +510,7 @@ def get_data_from_wallet(infile,silent=False):
|
|
|
hash_preset = hd[0][:-1]
|
|
|
hash_params = [int(i) for i in hd[1:]]
|
|
|
|
|
|
- if hash_params != _get_hash_params(hash_preset):
|
|
|
+ if hash_params != get_hash_params(hash_preset):
|
|
|
msg("Hash parameters '%s' don't match hash preset '%s'" %
|
|
|
(" ".join(hash_params), hash_preset))
|
|
|
sys.exit(9)
|
|
@@ -702,22 +548,29 @@ def _get_words_from_file(infile,what):
|
|
|
return words
|
|
|
|
|
|
|
|
|
-def get_lines_from_file(infile,what="",remove_comments=False):
|
|
|
+def get_words(infile,what,prompt,opts):
|
|
|
+ if infile:
|
|
|
+ return _get_words_from_file(infile,what)
|
|
|
+ else:
|
|
|
+ return _get_words_from_user(prompt,opts)
|
|
|
+
|
|
|
+def remove_comments(lines):
|
|
|
+ import re
|
|
|
+ # re.sub(pattern, repl, string, count=0, flags=0)
|
|
|
+ ret = []
|
|
|
+ for i in lines:
|
|
|
+ i = re.sub('#.*','',i,1)
|
|
|
+ i = re.sub('\s+$','',i)
|
|
|
+ if i: ret.append(i)
|
|
|
+ return ret
|
|
|
+
|
|
|
+def get_lines_from_file(infile,what="",trim_comments=False):
|
|
|
if what != "":
|
|
|
qmsg("Getting %s from file '%s'" % (what,infile))
|
|
|
f = open_file_or_exit(infile,'r')
|
|
|
- lines = f.read().splitlines(); f.close()
|
|
|
- if remove_comments:
|
|
|
- import re
|
|
|
- # re.sub(pattern, repl, string, count=0, flags=0)
|
|
|
- ret = []
|
|
|
- for i in lines:
|
|
|
- i = re.sub('#.*','',i,1)
|
|
|
- i = re.sub('\s+$','',i)
|
|
|
- if i: ret.append(i)
|
|
|
- return ret
|
|
|
- else:
|
|
|
- return lines
|
|
|
+ lines = f.read().splitlines()
|
|
|
+ f.close()
|
|
|
+ return remove_comments(lines) if trim_comments else lines
|
|
|
|
|
|
|
|
|
def get_data_from_file(infile,what="data",dash=False):
|
|
@@ -729,7 +582,7 @@ def get_data_from_file(infile,what="data",dash=False):
|
|
|
return data
|
|
|
|
|
|
|
|
|
-def _get_seed_from_seed_data(words):
|
|
|
+def get_seed_from_seed_data(words):
|
|
|
|
|
|
if not _check_mmseed_format(words):
|
|
|
msg("Invalid %s data" % g.seed_ext)
|
|
@@ -741,7 +594,7 @@ def _get_seed_from_seed_data(words):
|
|
|
chk = make_chksum_6(seed_b58)
|
|
|
vmsg_r("Validating %s checksum..." % g.seed_ext)
|
|
|
|
|
|
- if _compare_checksums(chk, "from seed", stored_chk, "from input"):
|
|
|
+ if compare_checksums(chk, "from seed", stored_chk, "from input"):
|
|
|
seed = b58decode_pad(seed_b58)
|
|
|
if seed == False:
|
|
|
msg("Invalid b58 number: %s" % val)
|
|
@@ -782,25 +635,6 @@ def get_bitcoind_passphrase(prompt,opts):
|
|
|
echo=True if 'echo_passphrase' in opts else False)
|
|
|
|
|
|
|
|
|
-def get_seed_from_wallet(
|
|
|
- infile,
|
|
|
- opts,
|
|
|
- prompt="Enter {} wallet passphrase: ".format(g.proj_name),
|
|
|
- silent=False
|
|
|
- ):
|
|
|
-
|
|
|
- wdata = get_data_from_wallet(infile,silent=silent)
|
|
|
- label,metadata,hash_preset,salt,enc_seed = wdata
|
|
|
-
|
|
|
- if g.verbose: _display_control_data(*wdata)
|
|
|
-
|
|
|
- passwd = get_mmgen_passphrase(prompt,opts)
|
|
|
-
|
|
|
- key = make_key(passwd, salt, hash_preset)
|
|
|
-
|
|
|
- return decrypt_seed(enc_seed, key, metadata[0], metadata[1])
|
|
|
-
|
|
|
-
|
|
|
def check_data_fits_file_at_offset(fname,offset,dlen,action):
|
|
|
# TODO: Check for Windows
|
|
|
import os, stat
|
|
@@ -835,236 +669,6 @@ def get_hidden_incog_data(opts):
|
|
|
"Data read from file")
|
|
|
return data
|
|
|
|
|
|
-def get_seed_from_incog_wallet(
|
|
|
- infile,
|
|
|
- opts,
|
|
|
- prompt="Enter {} wallet passphrase: ".format(g.proj_name),
|
|
|
- silent=False,
|
|
|
- hex_input=False
|
|
|
- ):
|
|
|
-
|
|
|
- what = "incognito wallet data"
|
|
|
-
|
|
|
- if "from_incog_hidden" in opts:
|
|
|
- d = get_hidden_incog_data(opts)
|
|
|
- else:
|
|
|
- d = get_data_from_file(infile,what)
|
|
|
- if hex_input:
|
|
|
- try:
|
|
|
- d = unhexlify("".join(d.split()).strip())
|
|
|
- except:
|
|
|
- msg("Data in file '%s' is not in hexadecimal format" % infile)
|
|
|
- sys.exit(2)
|
|
|
- # File could be of invalid length, so check:
|
|
|
- valid_dlens = [i/8 + g.aesctr_iv_len + g.salt_len for i in g.seed_lens]
|
|
|
- if len(d) not in valid_dlens:
|
|
|
- qmsg("Invalid incognito file size: %s. Valid sizes (in bytes): %s" %
|
|
|
- (len(d), " ".join([str(i) for i in valid_dlens]))
|
|
|
- )
|
|
|
- return False
|
|
|
-
|
|
|
- iv, enc_incog_data = d[0:g.aesctr_iv_len], d[g.aesctr_iv_len:]
|
|
|
-
|
|
|
- msg("Incog ID: %s (IV ID: %s)" % (make_iv_chksum(iv),make_chksum_8(iv)))
|
|
|
- qmsg("Check the applicable value against your records.")
|
|
|
- vmsg(cmessages['incog_iv_id_hidden' if "from_incog_hidden" in opts
|
|
|
- else 'incog_iv_id'])
|
|
|
-
|
|
|
- passwd = get_mmgen_passphrase(prompt,opts)
|
|
|
-
|
|
|
- qmsg("Configured hash presets: %s" % " ".join(sorted(g.hash_presets)))
|
|
|
- while True:
|
|
|
- p = "Enter hash preset for %s wallet (default='%s'): "
|
|
|
- hp = my_raw_input(p % (g.proj_name, g.hash_preset))
|
|
|
- if not hp:
|
|
|
- hp = g.hash_preset; break
|
|
|
- elif hp in g.hash_presets:
|
|
|
- break
|
|
|
- msg("%s: Invalid hash preset" % hp)
|
|
|
-
|
|
|
- # IV is used BOTH to initialize counter and to salt password!
|
|
|
- key = make_key(passwd, iv, hp, "wrapper key")
|
|
|
- d = decrypt_data(enc_incog_data, key, int(hexlify(iv),16), "incog data")
|
|
|
- if d == False: sys.exit(2)
|
|
|
-
|
|
|
- salt,enc_seed = d[0:g.salt_len], d[g.salt_len:]
|
|
|
-
|
|
|
- key = make_key(passwd, salt, hp, "main key")
|
|
|
- vmsg("Key ID: %s" % make_chksum_8(key))
|
|
|
-
|
|
|
- seed = decrypt_seed(enc_seed, key, "", "")
|
|
|
- qmsg("Seed ID: %s. Check that this value is correct." % make_chksum_8(seed))
|
|
|
- vmsg(cmessages['incog_key_id_hidden' if "from_incog_hidden" in opts
|
|
|
- else 'incog_key_id'])
|
|
|
-
|
|
|
- return seed
|
|
|
-
|
|
|
-
|
|
|
-def make_key(passwd, salt, hash_preset, what="key"):
|
|
|
-
|
|
|
- vmsg_r("Generating %s. Please wait..." % what)
|
|
|
- key = _scrypt_hash_passphrase(passwd, salt, hash_preset)
|
|
|
- vmsg("done")
|
|
|
- if g.debug: print "Key: %s" % hexlify(key)
|
|
|
- return key
|
|
|
-
|
|
|
-
|
|
|
-def decrypt_seed(enc_seed, key, seed_id, key_id):
|
|
|
-
|
|
|
- vmsg("Checking key...")
|
|
|
- chk1 = make_chksum_8(key)
|
|
|
- if key_id:
|
|
|
- if not _compare_checksums(chk1, "of key", key_id, "in header"):
|
|
|
- msg("Incorrect passphrase")
|
|
|
- return False
|
|
|
-
|
|
|
- dec_seed = decrypt_data(enc_seed, key, iv=1, what="seed")
|
|
|
-
|
|
|
- chk2 = make_chksum_8(dec_seed)
|
|
|
-
|
|
|
- if seed_id:
|
|
|
- if _compare_checksums(chk2,"of decrypted seed",seed_id,"in header"):
|
|
|
- qmsg("Passphrase is OK")
|
|
|
- else:
|
|
|
- if not g.debug:
|
|
|
- msg_r("Checking key ID...")
|
|
|
- if _compare_checksums(chk1, "of key", key_id, "in header"):
|
|
|
- msg("Key ID is correct but decryption of seed failed")
|
|
|
- else:
|
|
|
- msg("Incorrect passphrase")
|
|
|
-
|
|
|
- return False
|
|
|
-# else:
|
|
|
-# qmsg("Generated IDs (Seed/Key): %s/%s" % (chk2,chk1))
|
|
|
-
|
|
|
- if g.debug: print "Decrypted seed: %s" % hexlify(dec_seed)
|
|
|
-
|
|
|
- return dec_seed
|
|
|
-
|
|
|
-
|
|
|
-def decrypt_data(enc_data, key, iv=1, what="data"):
|
|
|
-
|
|
|
- vmsg("Decrypting %s with key..." % what)
|
|
|
-
|
|
|
- from Crypto.Cipher import AES
|
|
|
- from Crypto.Util import Counter
|
|
|
-
|
|
|
- c = AES.new(key, AES.MODE_CTR,
|
|
|
- counter=Counter.new(g.aesctr_iv_len*8,initial_value=iv))
|
|
|
-
|
|
|
- return c.decrypt(enc_data)
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-def _get_words(infile,what,prompt,opts):
|
|
|
- if infile:
|
|
|
- return _get_words_from_file(infile,what)
|
|
|
- else:
|
|
|
- return _get_words_from_user(prompt,opts)
|
|
|
-
|
|
|
-
|
|
|
-def get_seed(infile,opts,silent=False):
|
|
|
-
|
|
|
- ext = get_extension(infile)
|
|
|
-
|
|
|
- if ext == g.mn_ext: source = "mnemonic"
|
|
|
- elif ext == g.brain_ext: source = "brainwallet"
|
|
|
- elif ext == g.seed_ext: source = "seed"
|
|
|
- elif ext == g.wallet_ext: source = "wallet"
|
|
|
- elif ext == g.incog_ext: source = "incognito wallet"
|
|
|
- elif ext == g.incog_hex_ext: source = "incognito wallet"
|
|
|
- elif 'from_mnemonic' in opts: source = "mnemonic"
|
|
|
- elif 'from_brain' in opts: source = "brainwallet"
|
|
|
- elif 'from_seed' in opts: source = "seed"
|
|
|
- elif 'from_incog' in opts: source = "incognito wallet"
|
|
|
- else:
|
|
|
- if infile: msg(
|
|
|
- "Invalid file extension for file: %s\nValid extensions: '.%s'" %
|
|
|
- (infile, "', '.".join(g.seedfile_exts)))
|
|
|
- else: msg("No seed source type specified and no file supplied")
|
|
|
- sys.exit(2)
|
|
|
-
|
|
|
- if source == "mnemonic":
|
|
|
- prompt = "Enter mnemonic: "
|
|
|
- words = _get_words(infile,"mnemonic data",prompt,opts)
|
|
|
- wl = get_default_wordlist()
|
|
|
- from mmgen.mnemonic import get_seed_from_mnemonic
|
|
|
- seed = get_seed_from_mnemonic(words,wl)
|
|
|
- elif source == "brainwallet":
|
|
|
- if 'from_brain' not in opts:
|
|
|
- msg("'--from-brain' parameters must be specified for brainwallet file")
|
|
|
- sys.exit(2)
|
|
|
- prompt = "Enter brainwallet passphrase: "
|
|
|
- words = _get_words(infile,"brainwallet data",prompt,opts)
|
|
|
- seed = _get_seed_from_brain_passphrase(words,opts)
|
|
|
- elif source == "seed":
|
|
|
- prompt = "Enter seed in %s format: " % g.seed_ext
|
|
|
- words = _get_words(infile,"seed data",prompt,opts)
|
|
|
- seed = _get_seed_from_seed_data(words)
|
|
|
- elif source == "wallet":
|
|
|
- seed = get_seed_from_wallet(infile, opts, silent=silent)
|
|
|
- elif source == "incognito wallet":
|
|
|
- h = True if ext == g.incog_hex_ext or 'from_incog_hex' in opts else False
|
|
|
- seed = get_seed_from_incog_wallet(infile, opts, silent=silent, hex_input=h)
|
|
|
-
|
|
|
-
|
|
|
- if infile and not seed and (
|
|
|
- source == "seed" or source == "mnemonic" or source == "incognito wallet"):
|
|
|
- msg("Invalid %s file '%s'" % (source,infile))
|
|
|
- sys.exit(2)
|
|
|
-
|
|
|
- if g.debug: print "Seed: %s" % hexlify(seed)
|
|
|
-
|
|
|
- return seed
|
|
|
-
|
|
|
-# Repeat if entered data is invalid
|
|
|
-def get_seed_retry(infile,opts):
|
|
|
- silent = False
|
|
|
- while True:
|
|
|
- seed = get_seed(infile,opts,silent=silent)
|
|
|
- silent = True
|
|
|
- if seed: return seed
|
|
|
-
|
|
|
-
|
|
|
-def do_pager(text):
|
|
|
-
|
|
|
- pagers = ["less","more"]
|
|
|
- shell = False
|
|
|
-
|
|
|
- from os import environ
|
|
|
-
|
|
|
-# Hack for MS Windows command line (i.e. non CygWin) environment
|
|
|
-# When 'shell' is true, Windows aborts the calling program if executable
|
|
|
-# not found.
|
|
|
-# When 'shell' is false, an exception is raised, invoking the fallback
|
|
|
-# 'print' instead of the pager.
|
|
|
-# We risk assuming that "more" will always be available on a stock
|
|
|
-# Windows installation.
|
|
|
- if sys.platform.startswith("win") and 'HOME' not in environ:
|
|
|
- shell = True
|
|
|
- pagers = ["more"]
|
|
|
-
|
|
|
- if 'PAGER' in environ and environ['PAGER'] != pagers[0]:
|
|
|
- pagers = [environ['PAGER']] + pagers
|
|
|
-
|
|
|
- for pager in pagers:
|
|
|
- end = "" if pager == "less" else "\n(end of text)\n"
|
|
|
- try:
|
|
|
- from subprocess import Popen, PIPE, STDOUT
|
|
|
- p = Popen([pager], stdin=PIPE, shell=shell)
|
|
|
- except: pass
|
|
|
- else:
|
|
|
- try:
|
|
|
- p.communicate(text+end+"\n")
|
|
|
- except KeyboardInterrupt:
|
|
|
- # Has no effect. Why?
|
|
|
- if pager != "less":
|
|
|
- msg("\n(User interrupt)\n")
|
|
|
- finally:
|
|
|
- msg_r("\r")
|
|
|
- break
|
|
|
- else: print text+end
|
|
|
-
|
|
|
|
|
|
def export_to_hidden_incog(incog_enc,opts):
|
|
|
outfile,offset = opts['export_incog_hidden'].split(",") #Already sanity-checked
|
|
@@ -1081,46 +685,50 @@ def export_to_hidden_incog(incog_enc,opts):
|
|
|
(os.path.relpath(outfile),offset))
|
|
|
|
|
|
|
|
|
-def pretty_hexdump(data,gw=2,cols=8,line_nums=False):
|
|
|
- r = 1 if len(data) % gw else 0
|
|
|
- return "".join(
|
|
|
- [
|
|
|
- ("" if (line_nums == False or i % cols) else "%03i: " % (i/cols)) +
|
|
|
- hexlify(data[i*gw:i*gw+gw]) +
|
|
|
- (" " if (i+1) % cols else "\n")
|
|
|
- for i in range(len(data)/gw + r)
|
|
|
- ]
|
|
|
- ).rstrip()
|
|
|
+from mmgen.term import kb_hold_protect,get_char
|
|
|
|
|
|
-def decode_pretty_hexdump(data):
|
|
|
- import re
|
|
|
- lines = [re.sub('^\d+:\s+','',l) for l in data.split("\n")]
|
|
|
- return unhexlify("".join(("".join(lines).split())))
|
|
|
+def my_raw_input(prompt,echo=True):
|
|
|
+ try:
|
|
|
+ if echo:
|
|
|
+ reply = raw_input(prompt)
|
|
|
+ else:
|
|
|
+ from getpass import getpass
|
|
|
+ reply = getpass(prompt)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ msg("\nUser interrupt")
|
|
|
+ sys.exit(1)
|
|
|
|
|
|
+ kb_hold_protect()
|
|
|
+ return reply
|
|
|
|
|
|
-def wallet_to_incog_data(infile,opts):
|
|
|
|
|
|
- d = get_data_from_wallet(infile,silent=True)
|
|
|
- seed_id,key_id,preset,salt,enc_seed = \
|
|
|
- d[1][0], d[1][1], d[2].split(":")[0], d[3], d[4]
|
|
|
+def user_confirm(prompt,default_yes=False,verbose=False):
|
|
|
|
|
|
- passwd = get_mmgen_passphrase("Enter mmgen passphrase: ",opts)
|
|
|
- key = make_key(passwd, salt, preset, "main key")
|
|
|
- # We don't need the seed; just do this to verify password.
|
|
|
- if decrypt_seed(enc_seed, key, seed_id, key_id) == False:
|
|
|
- sys.exit(2)
|
|
|
+ q = "(Y/n)" if default_yes else "(y/N)"
|
|
|
+
|
|
|
+ while True:
|
|
|
+ reply = get_char("%s %s: " % (prompt, q)).strip("\n\r")
|
|
|
+
|
|
|
+ if not reply:
|
|
|
+ if default_yes: msg(""); return True
|
|
|
+ else: msg(""); return False
|
|
|
+ elif reply in 'yY': msg(""); return True
|
|
|
+ elif reply in 'nN': msg(""); return False
|
|
|
+ else:
|
|
|
+ if verbose: msg("\nInvalid reply")
|
|
|
+ else: msg_r("\r")
|
|
|
|
|
|
- iv = get_random(g.aesctr_iv_len,opts)
|
|
|
- iv_id = make_iv_chksum(iv)
|
|
|
- msg("Incog ID: %s" % iv_id)
|
|
|
|
|
|
- # IV is used BOTH to initialize counter and to salt password!
|
|
|
- key = make_key(passwd, iv, preset, "wrapper key")
|
|
|
- m = "incog data"
|
|
|
- wrap_enc = encrypt_data(salt + enc_seed, key, int(hexlify(iv),16), m)
|
|
|
+def prompt_and_get_char(prompt,chars,enter_ok=False,verbose=False):
|
|
|
+
|
|
|
+ while True:
|
|
|
+ reply = get_char("%s: " % prompt).strip("\n\r")
|
|
|
|
|
|
- return iv+wrap_enc,seed_id,key_id,iv_id,preset
|
|
|
+ if reply in chars or (enter_ok and not reply):
|
|
|
+ msg("")
|
|
|
+ return reply
|
|
|
+
|
|
|
+ if verbose: msg("\nInvalid reply")
|
|
|
+ else: msg_r("\r")
|
|
|
|
|
|
|
|
|
-if __name__ == "__main__":
|
|
|
- print "util.py"
|