various changes and fixes

This commit is contained in:
The MMGen Project 2019-10-10 19:53:42 +00:00
commit d49159a92b
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
15 changed files with 98 additions and 70 deletions

View file

@ -27,6 +27,7 @@ class BadFilename(Exception): mmcode = 1
class SocketError(Exception): mmcode = 1
class UserAddressNotInWallet(Exception): mmcode = 1
class MnemonicError(Exception): mmcode = 1
class RangeError(Exception): mmcode = 1
# 2: yellow hl, message only
class InvalidTokenAddress(Exception): mmcode = 2

View file

@ -27,7 +27,7 @@ from mmgen.globalvars import g
import mmgen.share.Opts
from mmgen.util import *
def usage(): Die(2,'USAGE: {} {}'.format(g.prog_name,usage_txt))
def usage(): Die(1,'USAGE: {} {}'.format(g.prog_name,usage_txt))
def die_on_incompatible_opts(incompat_list):
for group in incompat_list:

View file

@ -323,7 +323,9 @@ class SeedShareList(SubSeedList):
assert A == B,'Data mismatch!\noriginal seed: {!r}\nrejoined seed: {!r}'.format(A,B)
def get_share_by_idx(self,idx,base_seed=False):
if idx == self.count:
if idx < 1 or idx > self.count:
raise RangeError('{}: share index out of range'.format(idx))
elif idx == self.count:
return self.last_share
elif self.master_share and idx == 1:
return self.master_share if base_seed else self.master_share.derived_seed
@ -348,7 +350,7 @@ class SeedShareList(SubSeedList):
fs2 = '{i:>5}: {}\n'
mfs1,mfs2,midx,msid = ('','','','')
if self.master_share:
mfs1,mfs2 = (' with master share #{} ({})',' master share #{}')
mfs1,mfs2 = (' with master share #{} ({})',' (master share #{})')
midx,msid = (self.master_share.idx,self.master_share.sid)
hdr = ' {} {} ({} bits)\n'.format('Seed:',self.parent_seed.sid.hl(),self.parent_seed.bitlen)
@ -458,9 +460,14 @@ class SeedSource(MMGenObject):
sstype.__name__, 'input file format'
)
if ss:
if seed or seed_bin:
sstype = cls.fmt_code_to_type(opt.out_fmt)
me = super(cls,cls).__new__(sstype or Wallet) # default to Wallet
me.seed = seed or Seed(seed_bin=seed_bin)
me.op = 'new'
elif ss:
sstype = ss.__class__ if passchg else cls.fmt_code_to_type(opt.out_fmt)
me = super(cls,cls).__new__(sstype or Wallet) # default: Wallet
me = super(cls,cls).__new__(sstype or Wallet)
me.seed = ss.seed
me.ss_in = ss
me.op = ('conv','pwchg_new')[bool(passchg)]
@ -477,16 +484,15 @@ class SeedSource(MMGenObject):
me = super(cls,cls).__new__(f.ftype)
me.infile = f
me.op = ('old','pwchg_old')[bool(passchg)]
elif in_fmt: # Input format
elif in_fmt:
sstype = cls.fmt_code_to_type(in_fmt)
me = super(cls,cls).__new__(sstype)
me.op = ('old','pwchg_old')[bool(passchg)]
else: # Called with no args, 'seed' or 'seed_bin' - initialize with random or supplied seed
else: # called with no arguments: initialize with random seed
sstype = cls.fmt_code_to_type(opt.out_fmt)
me = super(cls,cls).__new__(sstype or Wallet) # default: Wallet
me.seed = seed or Seed(seed_bin=seed_bin or None)
me = super(cls,cls).__new__(sstype or Wallet)
me.seed = Seed(None)
me.op = 'new'
# die(1,me.seed.sid.hl()) # DEBUG
return me
@ -1194,6 +1200,12 @@ class Brainwallet (SeedSourceEnc):
qmsg('Check this value against your records')
return True
def _format(self):
raise NotImplementedError('Brainwallet not supported as an output format')
def _encrypt(self):
raise NotImplementedError('Brainwallet not supported as an output format')
class IncogWallet (SeedSourceEnc):
file_mode = 'binary'

View file

@ -22,6 +22,11 @@ sha2.py: A non-optimized but very compact implementation of the SHA2 hash
SHA256Compress (unpadded SHA256, required for Zcash addresses)
"""
# IMPORTANT NOTE: Since GMP precision is platform-dependent, generated constants
# for SHA512 are not guaranteed to be correct! Therefore, the SHA512
# implementation must not be used for anything but testing and study. Test with
# the test/hashfunc.py script in the MMGen repository.
from struct import pack,unpack
class Sha2(object):
@ -52,7 +57,8 @@ class Sha2(object):
if cls.use_gmp:
from gmpy2 import context,set_context,sqrt,cbrt
set_context(context(precision=75))
# context() parameters are platform-dependent!
set_context(context(precision=75,round=1)) # OK for gmp 6.1.2 / gmpy 2.1.0
else:
cbrt = lambda n: pow(n, 1 / 3)

View file

@ -516,14 +516,13 @@ class MMGenToolCmdMnemonic(MMGenToolCmdBase):
def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
"show stats for mnemonic wordlist"
conv_cls = mnemonic_fmts[fmt]['conv_cls']()
fmt in conv_cls.digits or die(1,"'{}': not a valid format".format(fmt))
conv_cls.check_wordlist(fmt)
return True
def mn_printlist( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, enum=False, pager=False ):
"print mnemonic wordlist"
self._get_mnemonic_fmt(fmt) # perform check
ret = mnemonic_fmts[fmt]['conv_cls']().digits[fmt]
conv_cls = mnemonic_fmts[fmt]['conv_cls']()
ret = conv_cls.get_wordlist(fmt)
if enum:
ret = ['{:>4} {}'.format(n,e) for n,e in enumerate(ret)]
return '\n'.join(ret)

View file

@ -96,9 +96,9 @@ def pformat(d):
def pmsg(*args):
msg(pformat(args if len(args) > 1 else args[0]))
def Pmsg(*args):
sys.stdout.write(ppformat(args if len(args) > 1 else args[0]) + '\n')
sys.stdout.write(pformat(args if len(args) > 1 else args[0]) + '\n')
def pdie(*args,exit_val=1):
sys.stderr.write(ppformat(args if len(args) > 1 else args[0]))
sys.stderr.write(pformat(args if len(args) > 1 else args[0]))
sys.exit(exit_val)
def set_for_type(val,refval,desc,invert_bool=False,src=None):
@ -340,6 +340,11 @@ class baseconv(object):
else:
return None
@classmethod
def get_wordlist(cls,wl_id):
cls.init_mn(wl_id)
return cls.digits[wl_id]
@classmethod
def get_wordlist_chksum(cls,wl_id):
cls.init_mn(wl_id)
@ -353,6 +358,7 @@ class baseconv(object):
@classmethod
def check_wordlist(cls,wl_id):
cls.init_mn(wl_id)
wl = cls.digits[wl_id]
qmsg('Wordlist: {}\nLength: {} words'.format(wl_id,len(wl)))

View file

@ -94,7 +94,7 @@ def pyethereum_sec2addr(sec):
def keyconv_sec2addr(sec):
p = sp.Popen(['keyconv','-C',g.coin,sec.wif],stderr=sp.PIPE,stdout=sp.PIPE)
o = p.stdout.read().decode().splitlines()
return o[1].split()[1],o[0].split()[1]
return (o[1].split()[1],o[0].split()[1])
def zcash_mini_sec2addr(sec):
p = sp.Popen(['zcash-mini','-key','-simple'],stderr=sp.PIPE,stdin=sp.PIPE,stdout=sp.PIPE)
@ -103,14 +103,20 @@ def zcash_mini_sec2addr(sec):
def pycoin_sec2addr(sec):
coin = ci.external_tests['testnet']['pycoin'][g.coin] if g.testnet else g.coin
key = pcku.parse_key(sec,[network_for_netcode(coin)])[1]
if key is None: die(1,"can't parse {}".format(sec))
d = {
'legacy': ('wif_uncompressed','address_uncompressed'),
'compressed': ('wif','address'),
'segwit': ('wif','p2sh_segwit'),
}[addr_type.name]
return [pcku.create_output(sec,key,network_for_netcode(coin),d[i])[0][d[i]] for i in (0,1)]
network = network_for_netcode(coin)
key = network.keys.private(secret_exponent=int(sec,16),is_compressed=addr_type.name != 'legacy')
if key is None:
die(1,"can't parse {}".format(sec))
if addr_type.name in ('segwit','bech32'):
hash160_c = key.hash160(is_compressed=True)
if addr_type.name == 'segwit':
p2sh_script = network.contract.for_p2pkh_wit(hash160_c)
addr = network.address.for_p2s(p2sh_script)
else:
addr = network.address.for_p2pkh_wit(hash160_c)
else:
addr = key.address()
return (key.wif(),addr)
# pycoin/networks/all.py pycoin/networks/legacy_networks.py
def init_external_prog():
@ -138,13 +144,11 @@ def init_external_prog():
ext_sec2addr = pyethereum_sec2addr
ext_lib = 'pyethereum'
elif test_support('pycoin'):
global network_for_netcode
try:
global pcku,secp256k1_generator,network_for_netcode
import pycoin.cmds.ku as pcku
from pycoin.ecdsa.secp256k1 import secp256k1_generator
from pycoin.networks.registry import network_for_netcode
except:
raise ImportError("Unable to import pycoin modules. Is pycoin installed and up-to-date?")
raise ImportError("Unable to import pycoin.networks.registry Is pycoin installed and up-to-date?")
ext_sec2addr = pycoin_sec2addr
ext_lib = 'pycoin'
elif test_support('keyconv'):
@ -176,14 +180,14 @@ def compare_test():
if g.coin not in ci.external_tests[('mainnet','testnet')[g.testnet]][ext_lib]:
msg("Coin '{}' incompatible with external generator '{}'".format(g.coin,ext_lib))
return
m = "Comparing address generators '{}' and '{}' for coin {}"
last_t = time.time()
A = kg_a.desc
B = ext_lib if b == 'ext' else kg_b.desc
if A == B:
msg('skipping - generation methods A and B are the same ({})'.format(A))
return
qmsg(green(m.format(A,B,g.coin)))
m = "Comparing address generators '{}' and '{}' for coin {}, addrtype {!r}"
qmsg(green(m.format(A,B,g.coin,addr_type.name)))
for i in range(rounds):
if opt.verbose or time.time() - last_t >= 0.1:

View file

@ -20,6 +20,11 @@ test/hashfunc.py: Test internal implementations of SHA256, SHA512 and Keccak256
"""
import sys,os
repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),os.pardir)))
os.chdir(repo_root)
sys.path = [repo_root] + sys.path
from mmgen.util import die
assert len(sys.argv) in (2,3),"Test takes 1 or 2 arguments: test name, plus optional rounds count"

View file

@ -141,6 +141,15 @@ class MMGenPexpect(object):
vmsg('{} file: {}'.format(desc,cyan(outfile.replace("'",''))))
return outfile
def hincog_create(self,hincog_bytes):
ret = self.expect(['Create? (Y/n): ',"'YES' to confirm: "])
if ret == 0:
self.send('\n')
self.expect('Enter file size: ',str(hincog_bytes)+'\n')
else:
self.send('YES\n')
return ret
def no_overwrite(self):
self.expect("Overwrite? Type uppercase 'YES' to confirm: ",'\n')
self.expect('Exiting at user request')

View file

@ -256,6 +256,7 @@ t_alts="
$gentest_py --coin=btc 2:ext $rounds
$gentest_py --coin=btc --type=compressed 2:ext $rounds
$gentest_py --coin=btc --type=segwit 2:ext $rounds
$gentest_py --coin=btc --type=bech32 2:ext $rounds
$gentest_py --coin=ltc 2:ext $rounds
$gentest_py --coin=ltc --type=compressed 2:ext $rounds
$gentest_py --coin=zec 2:ext $rounds
@ -389,8 +390,8 @@ f_ltc_rt='Regtest (Bob and Alice) mode tests for LTC completed'
i_tool2='Tooltest2'
s_tool2="The following tests will run '$tooltest2_py' for all supported coins"
t_tool2="
$tooltest2_py --quiet
$tooltest2_py --quiet --coin=btc
$tooltest2_py --quiet --fork # run once with --fork so commands are actually executed
$tooltest2_py --quiet --coin=btc --fork
$tooltest2_py --quiet --coin=btc --testnet=1
$tooltest2_py --quiet --coin=ltc
$tooltest2_py --quiet --coin=ltc --testnet=1

View file

@ -366,9 +366,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
t = self.spawn('mmgen-addrimport', self.eth_args[1:] + add_args + [fn])
if bad_input:
t.read()
t.req_exit_val = 2
return t
# if g.debug: t.expect("Type uppercase 'YES' to confirm: ",'YES\n')
t.expect('Importing')
t.expect(expect)
t.read()

View file

@ -504,12 +504,7 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
t.expect(m)
if desc == 'hidden incognito data':
self.write_to_tmpfile(incog_id_fn,incog_id)
ret = t.expect(['Create? (Y/n): ',"'YES' to confirm: "])
if ret == 0:
t.send('\n')
t.expect('Enter file size: ',str(hincog_bytes)+'\n')
else:
t.send('YES\n')
t.hincog_create(hincog_bytes)
if out_fmt == 'w': t.label()
return t.written_to_file(capfirst(desc),oo=True),t

View file

@ -159,7 +159,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
('bob_bal2d', "Bob's balance (minconf=2)"),
('bob_bal2e', "Bob's balance (showempty=1 sort=age)"),
('bob_bal2f', "Bob's balance (showempty=1 sort=age,reverse)"),
('bob_rbf_send', 'sending funds to Alice (RBF)'),
('bob_send_maybe_rbf', 'sending funds to Alice (RBF, if supported)'),
('get_mempool1', 'mempool (before RBF bump)'),
('bob_rbf_status1', 'getting status of transaction'),
('bob_rbf_bump', 'bumping RBF transaction'),
@ -557,16 +557,18 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return [self.get_addr_from_addrlist(user,sid,mmtype,idx-1)+amt_str for mmtype,idx,amt_str in data]
def bob_rbf_1output_create(self):
if g.coin != 'BTC': return 'skip' # non-coin-dependent test, so run just once for BTC
out_addr = self._create_tx_outputs('alice',(('B',5,''),))
t = self.spawn('mmgen-txcreate',['-d',self.tr.trash_dir,'-B','--bob','--rbf'] + out_addr)
return self.txcreate_ui_common(t,menu=[],inputs='3',interactive_fee='3s') # out amt: 199.99999343
def bob_rbf_1output_bump(self):
if g.coin != 'BTC': return 'skip'
ext = '9343,3]{x}.testnet.rawtx'.format(x='' if g.debug_utf8 else '')
txfile = get_file_with_ext(self.tr.trash_dir,ext,delete=False,no_dot=True)
return self.user_txbump('bob',self.tr.trash_dir,txfile,'8s',has_label=False,signed_tx=False)
def bob_rbf_send(self):
def bob_send_maybe_rbf(self):
outputs_cl = self._create_tx_outputs('alice',(('L',1,',60'),('C',1,',40'))) # alice_sid:L:1, alice_sid:C:1
outputs_cl += [self._user_sid('bob')+':'+rtBobOp3]
return self.user_txdo('bob',rtFee[1],outputs_cl,'3',
@ -586,8 +588,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return self.user_txdo('alice',None,outputs_cl,'1') # fee=None
def user_txbump(self,user,outdir,txfile,fee,add_args=[],has_label=True,signed_tx=True):
if not g.proto.cap('rbf'):
msg('Skipping RBF'); return 'skip'
if not g.proto.cap('rbf'): return 'skip'
os.environ['MMGEN_BOGUS_SEND'] = ''
t = self.spawn('mmgen-txbump',
['-d',outdir,'--'+user,'--tx-fee='+fee,'--output-to-reduce=c'] + add_args + [txfile])
@ -631,20 +632,18 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
self.write_to_tmpfile('rbf_txid',mp[0]+'\n')
return 'ok'
def bob_rbf_status(self,fee,exp1,exp2='',skip_bch=False):
if skip_bch and not g.proto.cap('rbf'):
msg('skipping test {} for BCH'.format(self.test_name))
return 'skip'
def bob_rbf_status(self,fee,exp1,exp2=''):
if not g.proto.cap('rbf'): return 'skip'
ext = ',{}]{x}.testnet.sigtx'.format(fee[:-1],x='' if g.debug_utf8 else '')
txfile = self.get_file_with_ext(ext,delete=False,no_dot=True)
return self.user_txsend_status('bob',txfile,exp1,exp2)
def bob_rbf_status1(self):
return self.bob_rbf_status(rtFee[1],'in mempool, replaceable',skip_bch=True)
if not g.proto.cap('rbf'): return 'skip'
return self.bob_rbf_status(rtFee[1],'in mempool, replaceable')
def get_mempool2(self):
if not g.proto.cap('rbf'):
msg('Skipping post-RBF mempool check'); return 'skip'
if not g.proto.cap('rbf'): return 'skip'
mp = self._get_mempool()
if len(mp) != 1:
rdie(2,'Mempool has more or less than one TX!')
@ -656,30 +655,27 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
def bob_rbf_status2(self):
if not g.proto.cap('rbf'): return 'skip'
return self.bob_rbf_status(rtFee[1],
'Transaction has been replaced','{} in mempool'.format(self.mempool[0]),
skip_bch=True)
'Transaction has been replaced','{} in mempool'.format(self.mempool[0]))
def bob_rbf_status3(self):
if not g.proto.cap('rbf'): return 'skip'
return self.bob_rbf_status(rtFee[2],'status: in mempool, replaceable',skip_bch=True)
return self.bob_rbf_status(rtFee[2],'status: in mempool, replaceable')
def bob_rbf_status4(self):
if not g.proto.cap('rbf'): return 'skip'
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 1 confirmation',
'Replacing transactions:\n {}'.format(self.mempool[0]),
skip_bch=True)
'Replacing transactions:\n {}'.format(self.mempool[0]))
def bob_rbf_status5(self):
if not g.proto.cap('rbf'): return 'skip'
return self.bob_rbf_status(rtFee[2],'Transaction has 1 confirmation',skip_bch=True)
return self.bob_rbf_status(rtFee[2],'Transaction has 1 confirmation')
def bob_rbf_status6(self):
if not g.proto.cap('rbf'): return 'skip'
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 2 confirmations',
'Replacing transactions:\n {}'.format(self.mempool[0]),
skip_bch=True)
'Replacing transactions:\n {}'.format(self.mempool[0]))
@staticmethod
def _gen_pairs(n):

View file

@ -159,16 +159,16 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
imsg('Creating block device image file')
ic_img = joinpath(self.tmpdir,'hincog_blkdev_img')
subprocess.check_output(['dd','if=/dev/zero','of='+ic_img,'bs=1K','count=1'],stderr=subprocess.PIPE)
ic_dev = subprocess.check_output(['/sbin/losetup','-f']).strip().decode()
ic_dev = subprocess.check_output(['sudo','/sbin/losetup','-f']).strip().decode()
ic_dev_mode_orig = '{:o}'.format(os.stat(ic_dev).st_mode & 0xfff)
ic_dev_mode = '0666'
imsg("Changing permissions on loop device to '{}'".format(ic_dev_mode))
subprocess.check_output(['sudo','chmod',ic_dev_mode,ic_dev],stderr=subprocess.PIPE)
imsg("Attaching loop device '{}'".format(ic_dev))
subprocess.check_output(['/sbin/losetup',ic_dev,ic_img])
subprocess.check_output(['sudo','/sbin/losetup',ic_dev,ic_img])
self.ref_hincog_conv_out(ic_f=ic_dev)
imsg("Detaching loop device '{}'".format(ic_dev))
subprocess.check_output(['/sbin/losetup','-d',ic_dev])
subprocess.check_output(['sudo','/sbin/losetup','-d',ic_dev])
imsg("Resetting permissions on loop device to '{}'".format(ic_dev_mode_orig))
subprocess.check_output(['sudo','chmod',ic_dev_mode_orig,ic_dev],stderr=subprocess.PIPE)
return 'ok'
@ -213,12 +213,7 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
for i in (1,2,3):
t.expect('Generating encryption key from OS random data ')
if desc == 'hidden incognito data':
ret = t.expect(['Create? (Y/n): ',"'YES' to confirm: "])
if ret == 0:
t.send('\n')
t.expect('Enter file size: ',str(hincog_bytes)+'\n')
else:
t.send('YES\n')
t.hincog_create(hincog_bytes)
if out_fmt == 'w': t.label()
wf = t.written_to_file(capfirst(desc),oo=True)
pf = None

View file

@ -690,7 +690,8 @@ def run_test(gid,cmd_name):
continue
cmd_out = run_func(cmd_name,args,out,opts,exec_code)
vmsg('Output: {}\n'.format(cmd_out if isinstance(out,str) else repr(cmd_out)))
try: vmsg('Output:\n{}\n'.format(cmd_out))
except: vmsg('Output:\n{}\n'.format(repr(cmd_out)))
def check_output(cmd_out,out):
if isinstance(out,str): out = out.encode()