f-strings, whitespace (test files) [44 files patched]

This commit is contained in:
The MMGen Project 2021-09-29 21:17:57 +00:00
commit 6692a43d59
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
44 changed files with 431 additions and 341 deletions

View file

@ -21,18 +21,20 @@ def test_color():
ret = get_terminfo_colors(t)
if ret == None:
set_vt100()
ymsg('Warning: unable to get info for terminal {!r}'.format(t))
ymsg(f'Warning: unable to get info for terminal {t!r}')
continue
msg('{}: {}'.format(t,ret))
assert c == ret, "'colors' value for terminal {} ({}) does not match expected value of {}".format(t,ret,c)
msg(f'{t}: {ret}')
assert c == ret, f"'colors' value for terminal {t} ({ret}) does not match expected value of {c}"
ret = get_terminfo_colors()
msg('This terminal ({}): {}'.format(os.getenv('TERM'),ret))
msg(f'This terminal ({os.getenv("TERM")}): {ret}')
set_vt100()
gmsg("Terminal display:")
for desc,n in (('auto','auto'),('8-color',8),('256-color',256)):
init_color(num_colors=n)
msg('{:9}: {}'.format(desc,' '.join([globals()[c](c) for c in sorted(_colors)])))
msg('{:9}: {}'.format(
desc,
' '.join(globals()[c](c) for c in sorted(_colors)) ))
test_color()

View file

@ -179,7 +179,7 @@ class GenToolPycoin(GenTool):
network = self.nfnc(vcoin)
key = network.keys.private(secret_exponent=int(sec,16),is_compressed=addr_type.name != 'legacy')
if key is None:
die(1,"can't parse {}".format(sec))
die(1,f'can’t parse {sec}')
if addr_type.name in ('segwit','bech32'):
hash160_c = key.hash160(is_compressed=True)
if addr_type.name == 'segwit':
@ -214,7 +214,7 @@ class GenToolMoneropy(GenTool):
def get_tool(arg):
if arg not in ext_progs + ['ext']:
die(1,'{!r}: unsupported tool for network {}'.format(arg,proto.network))
die(1,f'{arg!r}: unsupported tool for network {proto.network}')
if opt.all:
if arg == 'ext':
@ -268,7 +268,7 @@ def gentool_test(kg_a,kg_b,ag,rounds):
def do_compare_test(n,trounds,in_bytes):
global last_t
if opt.verbose or time.time() - last_t >= 0.1:
qmsg_r('\rRound {}/{} '.format(i+1,trounds))
qmsg_r(f'\rRound {i+1}/{trounds} ')
last_t = time.time()
sec = PrivKey(proto,in_bytes,compressed=addr_type.compressed,pubkey_type=addr_type.pubkey_type)
a_ph = kg_a.to_pubhex(sec)
@ -286,7 +286,7 @@ def gentool_test(kg_a,kg_b,ag,rounds):
b_addr = ag.to_addr(kg_b.to_pubhex(sec))
test_equal('addresses',a_addr,b_addr,*tinfo)
vmsg(fs.format(b=in_bytes.hex(),k=sec.wif,v=a_vk,a=a_addr))
qmsg_r('\rRound {}/{} '.format(n+1,trounds))
qmsg_r(f'\rRound {n+1}/{trounds} ')
fs = ( '\ninput: {b}\n%-9s {k}\naddr: {a}\n',
'\ninput: {b}\n%-9s {k}\nviewkey: {v}\naddr: {a}\n')[
@ -317,37 +317,39 @@ def speed_test(kg,ag,rounds):
from struct import pack,unpack
seed = os.urandom(28)
qmsg('Incrementing key with each round')
qmsg('Starting key: {}'.format((seed + pack('I',0)).hex()))
qmsg('Starting key: {}'.format(
(seed + pack('I',0)).hex()
))
import time
start = last_t = time.time()
for i in range(rounds):
if time.time() - last_t >= 0.1:
qmsg_r('\rRound {}/{} '.format(i+1,rounds))
qmsg_r(f'\rRound {i+1}/{rounds} ')
last_t = time.time()
sec = PrivKey(proto,seed+pack('I',i),compressed=addr_type.compressed,pubkey_type=addr_type.pubkey_type)
addr = ag.to_addr(kg.to_pubhex(sec))
vmsg('\nkey: {}\naddr: {}\n'.format(sec.wif,addr))
qmsg_r('\rRound {}/{} '.format(i+1,rounds))
qmsg('\n{} addresses generated in {:.2f} seconds'.format(rounds,time.time()-start))
vmsg(f'\nkey: {sec.wif}\naddr: {addr}\n')
qmsg_r(f'\rRound {i+1}/{rounds} ')
qmsg(f'\n{rounds} addresses generated in {time.time()-start:.2f} seconds')
def dump_test(kg,ag,fh):
dump = [[*(e.split()[0] for e in line.split('addr='))] for line in fh.readlines() if 'addr=' in line]
if not dump:
die(1,'File {!r} appears not to be a wallet dump'.format(fh.name))
die(1,f'File {fh.name!r} appears not to be a wallet dump')
m = 'Comparing output of address generator {!r} against wallet dump {!r}'
qmsg(green(m.format(kg.desc,fh.name)))
for count,(b_wif,b_addr) in enumerate(dump,1):
qmsg_r('\rKey {}/{} '.format(count,len(dump)))
qmsg_r(f'\rKey {count}/{len(dump)} ')
try:
b_sec = PrivKey(proto,wif=b_wif)
except:
die(2,'\nInvalid {} WIF address in dump file: {}'.format(proto.network,b_wif))
die(2,f'\nInvalid {proto.network} WIF address in dump file: {b_wif}')
a_addr = ag.to_addr(kg.to_pubhex(b_sec))
vmsg('\nwif: {}\naddr: {}\n'.format(b_wif,b_addr))
vmsg(f'\nwif: {b_wif}\naddr: {b_addr}\n')
tinfo = (bytes.fromhex(b_sec),b_sec,b_wif,kg.desc,fh.name)
test_equal('addresses',a_addr,b_addr,*tinfo)
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))
@ -362,7 +364,7 @@ def parse_arg1(arg,arg_id):
def check_gen_num(n):
if not (1 <= int(n) <= len(g.key_generators)):
die(1,'{}: invalid generator ID'.format(n))
die(1,f'{n}: invalid generator ID')
return int(n)
if arg_id == 'a':
@ -432,7 +434,7 @@ elif a and b and type(arg2) == int:
init_genonly_altcoins(testnet=proto.testnet)
for coin in ci.external_tests[proto.network][b.desc]:
if coin.lower() not in CoinProtocol.coins:
# ymsg('Coin {} not configured'.format(coin))
# ymsg(f'Coin {coin} not configured')
continue
proto = init_proto(coin)
if addr_type not in proto.mmtypes:

View file

@ -67,7 +67,7 @@ class TestHashFunc(object):
'\x00\x0f'*1024,'\x0e\x0f'*1023,'\x0a\x0d'*1025 )
for i,data in enumerate([inputs[input_n]] if input_n is not None else inputs):
msg('\rTesting reference input data: {:4}/{} '.format(i+1,len(inputs)))
msg(f'\rTesting reference input data: {i+1:4}/{len(inputs)} ')
self.compare_hashes(len(data),data.encode())
msg('OK\n')
@ -75,7 +75,7 @@ class TestHashFunc(object):
def test_random(self,rounds):
for i in range(rounds):
if i+1 in (1,rounds) or not (i+1) % 10:
msg('\rTesting random input data: {:4}/{} '.format(i+1,rounds))
msg(f'\rTesting random input data: {i+1:4}/{rounds} ')
dlen = int(os.urandom(4).hex(),16) >> 18
self.compare_hashes(dlen,os.urandom(dlen))
msg('OK\n')
@ -136,7 +136,7 @@ class TestSha512(TestSha2):
0x431d67c49c100d4c, 0x4cc5d4becb3e42b6, 0x597f299cfc657e2a, 0x5fcb6fab3ad6faec, 0x6c44198c4a475817 )
t = globals()['Test'+test]()
msg('Testing internal implementation of {}\n'.format(t.desc))
msg(f'Testing internal implementation of {t.desc}\n')
t.test_constants()
t.test_ref()
t.test_random(random_rounds)

View file

@ -8,7 +8,7 @@ action = g.prog_name.split('-')[0]
opts_data = {
'sets': [('debug',True,'verbose',True)],
'text': {
'desc': '{} coin daemons for the MMGen test suite'.format(action.capitalize()),
'desc': f'{action.capitalize()} coin daemons for the MMGen test suite',
'usage':'[opts] <network IDs>',
'options': """
-h, --help Print this help message

View file

@ -98,7 +98,8 @@ def cleandir(d,do_msg=False):
except: return
from shutil import rmtree
if do_msg: gmsg("Cleaning directory '{}'".format(d))
if do_msg:
gmsg(f'Cleaning directory {d!r}')
for f in files:
try:
os.unlink(os.path.join(d_enc,f))
@ -110,7 +111,7 @@ def mk_tmpdir(d):
except OSError as e:
if e.errno != 17: raise
else:
vmsg("Created directory '{}'".format(d))
vmsg(f'Created directory {d!r}')
def get_tmpfile(cfg,fn):
return os.path.join(cfg['tmpdir'],fn)
@ -143,9 +144,10 @@ def ok():
def cmp_or_die(s,t,desc=None):
if s != t:
m = 'ERROR: recoded data:\n{!r}\ndiffers from original data:\n{!r}'
if desc: m = 'For {}:\n{}'.format(desc,m)
raise TestSuiteFatalException(m.format(t,s))
raise TestSuiteFatalException(
(f'For {desc}:\n' if desc else '') +
f'ERROR: recoded data:\n{t!r}\ndiffers from original data:\n{s!r}'
)
def init_coverage():
coverdir = os.path.join('test','trace')

View file

@ -30,12 +30,12 @@ try:
import pexpect
from pexpect.popen_spawn import PopenSpawn
except ImportError as e:
die(2,red('Pexpect module is missing. Cannnot run test suite ({!r})'.format(e)))
die(2,red(f'Pexpect module is missing. Cannnot run test suite ({e!r})'))
def debug_pexpect_msg(p):
if opt.debug_pexpect:
msg('\n{}{}{}'.format(red('BEFORE ['),p.before,red(']')))
msg('{}{}{}'.format(red('MATCH ['),p.after,red(']')))
msg('\n{}{}{}'.format( red('BEFORE ['), p.before, red(']') ))
msg('{}{}{}'.format( red('MATCH ['), p.after, red(']') ))
NL = '\n'
@ -75,14 +75,14 @@ class MMGenPexpect(object):
def do_comment(self,add_comment,has_label=False):
p = ('Add a comment to transaction','Edit transaction comment')[has_label]
self.expect('{}? (y/N): '.format(p),('n','y')[bool(add_comment)])
self.expect(f'{p}? (y/N): ',('n','y')[bool(add_comment)])
if add_comment:
self.expect('Comment: ',add_comment+'\n')
def ok(self):
ret = self.p.wait()
if ret != self.req_exit_val and not opt.coverage:
die(1,red('test.py: spawned program exited with value {}'.format(ret)))
die(1,red(f'test.py: spawned program exited with value {ret}'))
if opt.profile:
return
if not self.skip_ok:
@ -110,19 +110,19 @@ class MMGenPexpect(object):
self.expect('ENTER to continue: ','\n')
def passphrase_new(self,desc,passphrase):
self.expect('Enter passphrase for {}: '.format(desc),passphrase+'\n')
self.expect(f'Enter passphrase for {desc}: ',passphrase+'\n')
self.expect('Repeat passphrase: ',passphrase+'\n')
def passphrase(self,desc,passphrase,pwtype=''):
if pwtype: pwtype += ' '
self.expect('Enter {}passphrase for {}.*?: '.format(pwtype,desc),passphrase+'\n',regex=True)
self.expect(f'Enter {pwtype}passphrase for {desc}.*?: ',passphrase+'\n',regex=True)
def hash_preset(self,desc,preset=''):
self.expect('Enter hash preset for {}'.format(desc))
self.expect(f'Enter hash preset for {desc}')
self.expect('or hit ENTER .*?:',str(preset)+'\n',regex=True)
def written_to_file(self,desc,overwrite_unlikely=False,query='Overwrite? ',oo=False):
s1 = '{} written to file '.format(desc)
s1 = f'{desc} written to file '
s2 = query + "Type uppercase 'YES' to confirm: "
ret = self.expect(([s1,s2],s1)[overwrite_unlikely])
if ret == 1:
@ -131,8 +131,8 @@ class MMGenPexpect(object):
self.expect(NL,nonl=True)
outfile = self.p.before.strip().strip("'")
if opt.debug_pexpect:
rmsg('Outfile [{}]'.format(outfile))
vmsg('{} file: {}'.format(desc,cyan(outfile.replace("'",''))))
rmsg(f'Outfile [{outfile}]')
vmsg('{} file: {}'.format( desc, cyan(outfile.replace('"',"")) ))
return outfile
def hincog_create(self,hincog_bytes):
@ -156,7 +156,7 @@ class MMGenPexpect(object):
debug_pexpect_msg(self.p)
end = self.p.before.rstrip()
if not g.debug:
vmsg(' ==> {}'.format(cyan(end)))
vmsg(f' ==> {cyan(end)}')
return end
def interactive(self):
@ -181,18 +181,18 @@ class MMGenPexpect(object):
ret = f(s)
except pexpect.TIMEOUT:
if opt.debug_pexpect: raise
m1 = red('\nERROR. Expect {!r} timed out. Exiting\n'.format(s))
m2 = 'before: [{}]\n'.format(self.p.before)
m3 = 'sent value: [{}]'.format(self.sent_value) if self.sent_value != None else ''
m1 = red(f'\nERROR. Expect {s!r} timed out. Exiting\n')
m2 = f'before: [{self.p.before}]\n'
m3 = f'sent value: [{self.sent_value}]' if self.sent_value != None else ''
rdie(1,m1+m2+m3)
debug_pexpect_msg(self.p)
if opt.verbose and type(s) != str:
msg_r(' ==> {} '.format(ret))
msg_r(f' ==> {ret} ')
if ret == -1:
rdie(1,'Error. Expect returned {}'.format(ret))
rdie(1,f'Error. Expect returned {ret}')
else:
if t == '':
if not nonl and not silent: vmsg('')
@ -209,9 +209,10 @@ class MMGenPexpect(object):
self.sent_value = t
if delay: time.sleep(delay)
if opt.verbose:
ls = (' ','')[bool(opt.debug or not s)]
es = (' ','')[bool(s)]
msg('{}SEND {}{}'.format(ls,es,yellow("'{}'".format(t.replace('\n',r'\n')))))
ls = '' if opt.debug or not s else ' '
es = '' if s else ' '
yt = yellow('{!r}'.format( t.replace('\n',r'\n') ))
msg(f'{ls}SEND {es}{yt}')
return ret
def read(self,n=-1):

View file

@ -11,24 +11,24 @@ cf_usr = cfg_file('usr')
cf_sys = cfg_file('sys')
cf_sample = cfg_file('sample')
msg('Usr cfg file: {}'.format(cf_usr.fn))
msg('Sys cfg file: {}'.format(cf_sys.fn))
msg('Sample cfg file: {}'.format(cf_sample.fn))
msg(f'Usr cfg file: {cf_usr.fn}')
msg(f'Sys cfg file: {cf_sys.fn}')
msg(f'Sample cfg file: {cf_sample.fn}')
if cmd_args:
if cmd_args[0] == 'parse_test':
ps = cf_sample.get_lines()
msg('parsed chunks: {}'.format(len(ps)))
msg(f'parsed chunks: {len(ps)}')
pu = cf_usr.get_lines()
msg('usr cfg: {}'.format(' '.join(['{}={}'.format(i.name,i.value) for i in pu])))
msg('usr cfg: {}'.format( ' '.join(f'{i.name}={i.value}' for i in pu) ))
elif cmd_args[0] == 'coin_specific_vars':
from mmgen.protocol import init_proto_from_opts
proto = init_proto_from_opts()
for varname in cmd_args[1:]:
print('{}.{}: {}'.format(
msg('{}.{}: {}'.format(
type(proto).__name__,
varname,
getattr(proto,varname)
))
elif cmd_args[0] == 'mnemonic_entry_modes':
print( 'mnemonic_entry_modes: {}'.format(g.mnemonic_entry_modes) )
msg( 'mnemonic_entry_modes: {}'.format(g.mnemonic_entry_modes) )

View file

@ -35,7 +35,7 @@ opts_data = {
},
'code': {
'options': lambda s: s.format(
kgs=' '.join(['{}:{}'.format(n,k) for n,k in enumerate(g.key_generators,1)]),
kgs=' '.join([f'{n}:{k}' for n,k in enumerate(g.key_generators,1)]),
kg=g.key_generator,
g=g,
),
@ -61,11 +61,11 @@ for k in (
'key_generator', # global_sets_opt
'hidden_incog_input_params',
):
msg('{:30} {}'.format('opt.'+k+':',getattr(opt,k)))
msg('{:30} {}'.format( f'opt.{k}:', getattr(opt,k) ))
msg('')
for k in (
'subseeds', # opt_sets_global
'key_generator', # global_sets_opt
):
msg('{:30} {}'.format('g.'+k+':',getattr(opt,k)))
msg('{:30} {}'.format( f'g.{k}:', getattr(opt,k) ))

View file

@ -65,7 +65,7 @@ def tt_my_raw_input():
"""))
get_char_raw('Ready? ',num_chars=1)
reply = my_raw_input('\nEnter text: ')
confirm('Did you enter the text {!r}?'.format(reply))
confirm(f'Did you enter the text {reply!r}?')
def tt_get_char(raw=False,one_char=False,sleep=0,immed_chars=''):
fname = ('get_char','get_char_raw')[raw]
@ -81,11 +81,11 @@ def tt_get_char(raw=False,one_char=False,sleep=0,immed_chars=''):
)[raw]
m2 = (
'',
'\nA delay of {} seconds will added before each prompt'.format(sleep)
f'\nA delay of {sleep} seconds will added before each prompt'
)[bool(sleep)]
m3 = (
'',
'\nThe characters {!r} will be repeated immediately, the others with delay.'.format(immed_chars)
f'\nThe characters {immed_chars!r} will be repeated immediately, the others with delay.'
)[bool(immed_chars)]
m4 = 'The F1-F12 keys will be ' + (
'blocked entirely.'
@ -104,13 +104,16 @@ def tt_get_char(raw=False,one_char=False,sleep=0,immed_chars=''):
if immed_chars:
kwargs.update({'immed_chars':immed_chars})
cmsg('Testing {}({}):'.format(fname,','.join(['{}={!r}'.format(*i) for i in kwargs.items()])))
msg(fs.format(m1,yellow(m2),yellow(m3),yellow(m4)))
cmsg('Testing {}({}):'.format(
fname,
','.join(f'{a}={b!r}' for a,b in kwargs.items())
))
msg(fs.format( m1, yellow(m2), yellow(m3), yellow(m4) ))
try:
while True:
ret = globals()[fname]('Enter a letter: ',**kwargs)
msg('You typed {!r}'.format(ret))
msg(f'You typed {ret!r}')
except KeyboardInterrupt:
msg('\nDone')
@ -118,14 +121,13 @@ def tt_urand():
cmsg('Testing _get_random_data_from_user():')
from mmgen.crypto import _get_random_data_from_user
ret = _get_random_data_from_user(10,desc='data').decode()
msg('USER ENTROPY (user input + keystroke timings):\n\n{}'.format(fmt(ret,' ')))
msg(f'USER ENTROPY (user input + keystroke timings):\n\n{fmt(ret," ")}')
times = ret.splitlines()[1:]
avg_prec = sum(len(t.split('.')[1]) for t in times) // len(times)
if avg_prec < g.min_time_precision:
m = 'WARNING: Avg. time precision of only {} decimal points. User entropy quality is degraded!'
ymsg(m.format(avg_prec))
ymsg(f'WARNING: Avg. time precision of only {avg_prec} decimal points. User entropy quality is degraded!')
else:
msg('Average time precision: {} decimal points - OK'.format(avg_prec))
msg(f'Average time precision: {avg_prec} decimal points - OK')
my_raw_input('Press ENTER to continue: ')
def tt_txview():

View file

@ -65,7 +65,7 @@ def get_descriptor_obj(objclass,attrname):
for o in (objclass,objclass.__bases__[0]): # assume there's only one base class
if attrname in o.__dict__:
return o.__dict__[attrname]
rdie(3,'unable to find descriptor {}.{}'.format(objclass.__name__,attrname))
rdie(3,f'unable to find descriptor {objclass.__name__}.{attrname}')
def test_attr_perm(obj,attrname,perm_name,perm_value,dobj,attrval_type):
@ -81,7 +81,7 @@ def test_attr_perm(obj,attrname,perm_name,perm_value,dobj,attrval_type):
try:
so = sample_objs[attrval_type.__name__]
except:
raise SampleObjError('unable to find sample object of type {!r}'.format(attrval_type.__name__))
raise SampleObjError(f'unable to find sample object of type {attrval_type.__name__!r}')
# ListItemAttr allows setting an attribute if its value is None
if type(dobj) == ListItemAttr and getattr(obj,attrname) == None:
setattr(obj,attrname,so)
@ -89,7 +89,7 @@ def test_attr_perm(obj,attrname,perm_name,perm_value,dobj,attrval_type):
elif perm_name == 'delete_ok':
delattr(obj,attrname)
except SampleObjError as e:
rdie(2,'Test script error ({})'.format(e))
rdie(2,f'Test script error ({e})')
except Exception as e:
if perm_value == True:
fs = '{!r}: unable to {} attribute {!r}, though {}ing is allowed ({})'
@ -116,12 +116,12 @@ def test_attr(data,obj,attrname,dobj,bits,attrval_type):
fs = 'init value {iv}={a} for attr {n!r} does not match test data ({iv}={b})'
rdie(2,fs.format(iv=k,n=attrname,a=d[k],b=bits[k]))
if opt.verbose and d[k] == True:
msg_r(' {}={!r}'.format(k,d[k]))
msg_r(f' {k}={d[k]!r}')
if opt.show_nonstandard_init:
for k,v in (('typeconv',False),('set_none_ok',True)):
if d[k] == v:
msg_r(' {}={}'.format(k,v))
msg_r(f' {k}={v}')
def test_object(test_data,objname):

View file

@ -96,7 +96,7 @@ def run_test(test,arg,input_data,arg1,exc_name):
try:
if not opt.super_silent:
arg_disp = repr(arg_copy[0] if type(arg_copy) == tuple else arg_copy)
msg_r((orange,green)[input_data=='good']('{:<22}'.format(arg_disp+':')))
msg_r((green if input_data=='good' else orange)(f'{arg_disp+":":<22}'))
cls = globals()[test]
if opt.getobj:

View file

@ -91,17 +91,17 @@ passwd_data = {
'xmrseed_dfl_αω':td('62f5b72a5ca89cab', 'xmrseed:25:αω','-αω-xmrseed-25','αω xmrseed:25','tequila eden skulls giving jester hospital dreams bakery adjust nanny cactus inwardly films amply nanny soggy vials muppet yellow woken ashtray organs exhale foes eden'),
}
cvr_opts = ' -m trace --count --coverdir={} --file={}'.format(*init_coverage()) if opt.coverage else ''
cmd_base = 'python3{} cmds/mmgen-{{}}gen -qS'.format(cvr_opts)
cvr_opts = ' -m trace --count --coverdir={} --file={}'.format( *init_coverage() ) if opt.coverage else ''
cmd_base = f'python3{cvr_opts} cmds/mmgen-{{}}gen -qS'
def get_cmd_output(cmd):
cp = run(cmd.split(),stdout=PIPE,stderr=PIPE)
if cp.returncode != 0:
ydie(2,'\nSpawned program exited with error code {}:\n{}'.format(cp.returncode,cp.stderr.decode()))
ydie(2,f'\nSpawned program exited with error code {cp.returncode}:\n{cp.stderr.decode()}')
return cp.stdout.decode().splitlines()
def do_test(cmd,tdata,msg_str,addr_desc):
vmsg(green('Executing: {}'.format(cmd)))
vmsg(green(f'Executing: {cmd}'))
msg_r('Testing: ' + msg_str)
lines = get_cmd_output(cmd)
@ -113,9 +113,9 @@ def do_test(cmd,tdata,msg_str,addr_desc):
for k in ref_data:
if cmd_out[k] == ref_data[k]:
s = k.replace('seed','seed[:8]').replace('addr',addr_desc)
vmsg(' {:9}: {}'.format(s,cmd_out[k]))
vmsg(f' {s:9}: {cmd_out[k]}')
else:
rdie(1,'\nError: sc_{} value {} does not match reference value {}'.format(k,cmd_out[k],ref_data[k]))
rdie(1,f'\nError: sc_{k} value {cmd_out[k]} does not match reference value {ref_data[k]}')
msg('OK')
def do_coin_tests():
@ -126,15 +126,15 @@ def do_coin_tests():
continue
coin,mmtype = tname.split('_',1) if '_' in tname else (tname,None)
type_arg = ' --type='+mmtype if mmtype else ''
cmd = cmd_base.format('addr') + ' --coin={}{} test/ref/98831F3A.mmwords 1'.format(coin,type_arg)
do_test(cmd,tdata,'--coin {:4} {:22}'.format(coin.upper(),type_arg),'address')
cmd = cmd_base.format('addr') + f' --coin={coin}{type_arg} test/ref/98831F3A.mmwords 1'
do_test(cmd,tdata,f'--coin {coin.upper():4} {type_arg:22}','address')
def do_passwd_tests():
bmsg('Testing password scramble strings and list IDs')
for tname,tdata in passwd_data.items():
a,b,pwid = tname.split('_')
fmt_arg = '' if a == 'dfl' else '--passwd-fmt={} '.format(a)
len_arg = '' if b == 'dfl' else '--passwd-len={} '.format(b)
fmt_arg = '' if a == 'dfl' else f'--passwd-fmt={a} '
len_arg = '' if b == 'dfl' else f'--passwd-len={b} '
fs = '{}' + fmt_arg + len_arg + '{}' + pwid + ' 1'
cmd = cmd_base.format('pass') + ' ' + fs.format('--accept-defaults ','test/ref/98831F3A.mmwords ')
s = fs.format('','')

View file

@ -40,7 +40,7 @@ def create_shm_dir(data_dir,trash_dir):
try:
run(['python3',os.path.join('cmds','mmgen-regtest'),'stop'],check=True)
except:
rdie(1,"Unable to remove {!r}!".format(tdir))
rdie(1,f'Unable to remove {tdir!r}!')
else:
time.sleep(2)
shutil.rmtree(tdir)
@ -49,14 +49,14 @@ def create_shm_dir(data_dir,trash_dir):
else:
tdir,pfx = '/dev/shm','mmgen-test-'
try:
run('rm -rf {}/{}*'.format(tdir,pfx),shell=True,check=True)
run(f'rm -rf {tdir}/{pfx}*',shell=True,check=True)
except Exception as e:
die(2,'Unable to delete directory tree {}/{}* ({})'.format(tdir,pfx,e.args[0]))
die(2,f'Unable to delete directory tree {tdir}/{pfx}* ({e.args[0]})')
try:
import tempfile
shm_dir = str(tempfile.mkdtemp('',pfx,tdir))
except Exception as e:
die(2,'Unable to create temporary directory in {} ({})'.format(tdir,e.args[0]))
die(2,f'Unable to create temporary directory in {tdir} ({e.args[0]})')
dest = os.path.join(shm_dir,os.path.basename(trash_dir))
os.mkdir(dest,0o755)
@ -164,7 +164,9 @@ def add_cmdline_opts():
sys.argv.insert(1,'--data-dir=' + data_dir)
sys.argv.insert(1,'--daemon-data-dir=test/daemons/' + get_coin())
from mmgen.daemon import CoinDaemon
sys.argv.insert(1,'--rpc-port={}'.format(CoinDaemon(network_id,test_suite=True).rpc_port))
sys.argv.insert(1,'--rpc-port={}'.format(
CoinDaemon(network_id,test_suite=True).rpc_port
))
# add_cmdline_opts()
@ -374,7 +376,7 @@ def fixup_cfgs():
cfgs['2'+k].update(cfgs[k])
for k in cfgs:
cfgs[k]['tmpdir'] = os.path.join('test','tmp{}'.format(k))
cfgs[k]['tmpdir'] = os.path.join('test',f'tmp{k}')
cfgs[k]['segwit'] = randbool() if opt.segwit_random else bool(opt.segwit or opt.bech32)
from copy import deepcopy
@ -402,15 +404,18 @@ def list_cmds():
cw = max(max(len(k) for k in gm.dpy_data),cw)
for gname,gdesc,clist,dpdata in d:
Msg('\n'+green('{!r} - {}:'.format(gname,gdesc)))
Msg('\n'+green(f'{gname!r} - {gdesc}:'))
for cmd in clist:
data = dpdata[cmd]
Msg(' {:{w}} - {}'.format(cmd,data if type(data) == str else data[1],w=cw))
Msg(' {:{w}} - {}'.format(
cmd,
(data if type(data) == str else data[1]),
w = cw ))
w = max(map(len,utils))
Msg('\n'+green('AVAILABLE UTILITIES:'))
for cmd in sorted(utils):
Msg(' {:{w}} - {}'.format(cmd,utils[cmd],w=w))
Msg(' {:{w}} - {}'.format( cmd, utils[cmd], w=w ))
sys.exit(0)
@ -433,12 +438,15 @@ def clean(usr_dirs=None):
if d in all_dirs:
cleandir(all_dirs[d])
else:
die(1,'{}: invalid directory number'.format(d))
die(1,f'{d}: invalid directory number')
if dirlist:
iqmsg(green('Cleaned tmp director{} {}'.format(suf(dirlist,'ies'),' '.join(dirlist))))
iqmsg(green('Cleaned tmp director{} {}'.format(
suf(dirlist,'ies'),
' '.join(dirlist))
))
cleandir(data_dir)
cleandir(trash_dir)
iqmsg(green("Cleaned directories '{}'".format("' '".join([data_dir,trash_dir]))))
iqmsg(green(f'Cleaned directories {data_dir!r} {trash_dir!r}'))
def create_tmp_dirs(shm_dir):
if g.platform == 'win':
@ -530,7 +538,7 @@ class CmdGroupMgr(object):
if modname == None and 'modname' in kwargs:
modname = kwargs['modname']
import importlib
modpath = 'test.test_py_d.ts_{}'.format(modname or gname)
modpath = f'test.test_py_d.ts_{modname or gname}'
return getattr(importlib.import_module(modpath),clsname)
def create_group(self,gname,full_data=False,modname=None,is3seed=False,add_dpy=False):
@ -557,14 +565,14 @@ class CmdGroupMgr(object):
for a,b in cls.cmd_group:
if is3seed:
for n,(i,j) in enumerate(zip(cls.tmpdir_nums,(128,192,256))):
k = '{}_{}'.format(a,n+1)
k = f'{a}_{n+1}'
if hasattr(cls,'skip_cmds') and k in cls.skip_cmds:
continue
sdeps = get_shared_deps(k,i)
if type(b) == str:
cdata.append( (k, (i,'{} ({}-bit)'.format(b,j),[[[]+sdeps,i]])) )
cdata.append( (k, (i,f'{b} ({j}-bit)',[[[]+sdeps,i]])) )
else:
cdata.append( (k, (i,'{} ({}-bit)'.format(b[1],j),[[b[0]+sdeps,i]])) )
cdata.append( (k, (i,f'{b[1]} ({j}-bit)',[[b[0]+sdeps,i]])) )
else:
cdata.append( (a, b if full_data else (cls.tmpdir_nums[0],b,[[[],cls.tmpdir_nums[0]]])) )
@ -597,7 +605,7 @@ class CmdGroupMgr(object):
and g[0] in tuple(self.cmd_groups_dfl) + tuple(usr_args) ]
for name,cls in ginfo:
msg('{:17} - {}'.format(name,cls.__doc__))
msg(f'{name:17} - {cls.__doc__}')
Die(0,'\n'+' '.join(e[0] for e in ginfo))
@ -608,7 +616,7 @@ class CmdGroupMgr(object):
"""
if group:
if not group in [e[0] for e in self.cmd_groups]:
die(1,'{!r}: unrecognized group'.format(group))
die(1,f'{group!r}: unrecognized group')
groups = [self.cmd_groups[group]]
else:
groups = self.cmd_groups
@ -626,6 +634,7 @@ class TestSuiteRunner(object):
'test suite runner'
def __init__(self,data_dir,trash_dir):
self.data_dir = data_dir
self.trash_dir = trash_dir
self.cmd_total = 0
@ -636,23 +645,23 @@ class TestSuiteRunner(object):
if opt.log:
self.log_fd = open(log_file,'a')
self.log_fd.write('\nLog started: {} UTC\n'.format(make_timestr()))
omsg('INFO → Logging to file {!r}'.format(log_file))
self.log_fd.write(f'\nLog started: {make_timestr()} UTC\n')
omsg(f'INFO → Logging to file {log_file!r}')
else:
self.log_fd = None
if opt.coverage:
self.coverdir,self.accfile = init_coverage()
omsg('INFO → Writing coverage files to {!r}'.format(self.coverdir))
omsg(f'INFO → Writing coverage files to {self.coverdir!r}')
def spawn_wrapper( self, cmd,
args = [],
extra_desc = '',
no_output = False,
msg_only = False,
no_msg = False,
cmd_dir = 'cmds',
no_traceback = False ):
def spawn_wrapper(self,cmd,
args = [],
extra_desc = '',
no_output = False,
msg_only = False,
no_msg = False,
cmd_dir = 'cmds',
no_traceback = False ):
desc = self.ts.test_name if opt.names else self.gm.dpy_data[self.ts.test_name][1]
if extra_desc: desc += ' ' + extra_desc
@ -662,9 +671,10 @@ class TestSuiteRunner(object):
elif g.platform == 'win':
cmd = os.path.join('/mingw64','opt','bin',cmd)
passthru_opts = ['--{}{}'.format(k.replace('_','-'),
'=' + getattr(opt,k) if getattr(opt,k) != True else '')
for k in self.ts.base_passthru_opts + self.ts.passthru_opts if getattr(opt,k)]
passthru_opts = ['--{}{}'.format(
k.replace('_','-'),
'=' + getattr(opt,k) if getattr(opt,k) != True else ''
) for k in self.ts.base_passthru_opts + self.ts.passthru_opts if getattr(opt,k)]
args = [cmd] + passthru_opts + self.ts.extra_spawn_args + args
@ -682,7 +692,10 @@ class TestSuiteRunner(object):
if opt.coverage:
args = ['python3','-m','trace','--count','--coverdir='+self.coverdir,'--file='+self.accfile] + args
qargs = ['{q}{}{q}'.format(a,q=('',"'")[' ' in a]) for a in args]
qargs = ['{q}{}{q}'.format(
a,
q = "'" if ' ' in a else ''
) for a in args]
cmd_disp = ' '.join(qargs).replace('\\','/') # for mingw
if not no_msg:
@ -735,9 +748,9 @@ class TestSuiteRunner(object):
def gen_msg():
yield ('{g}:{c}' if cmd else 'test group {g!r}').format(g=gname,c=cmd)
if len(ts_cls.networks) != 1:
yield ' for {} {}'.format(proto.coin,proto.network)
yield f' for {proto.coin} {proto.network}'
if segwit_opt:
yield ' (--{})'.format(segwit_opt.replace('_','-'))
yield ' (--{})'.format( segwit_opt.replace('_','-') )
m = ''.join(gen_msg())
@ -769,7 +782,7 @@ class TestSuiteRunner(object):
omsg(f'INFO → Resuming at command {resume!r}')
if opt.exit_after and opt.exit_after not in self.gm.cmd_list:
die(1,'{!r}: command not recognized'.format(opt.exit_after))
die(1,f'{opt.exit_after!r}: command not recognized')
return True
@ -809,13 +822,13 @@ class TestSuiteRunner(object):
do_between()
gname_save = gname
else:
die(1,'{!r}: command not recognized'.format(arg))
die(1,f'{arg!r}: command not recognized')
else:
if opt.exclude_groups:
exclude = opt.exclude_groups.split(',')
for e in exclude:
if e not in self.gm.cmd_groups_dfl:
die(1,'{!r}: group not recognized'.format(e))
die(1,f'{e!r}: group not recognized')
for gname in self.gm.cmd_groups_dfl:
if opt.exclude_groups and gname in exclude:
continue
@ -827,13 +840,11 @@ class TestSuiteRunner(object):
self.end_msg()
def check_needs_rerun(self,
cmd,
build=False,
root=True,
force_delete=False,
dpy=False
):
def check_needs_rerun(self,cmd,
build = False,
root = True,
force_delete = False,
dpy = False ):
self.ts.test_name = cmd
rerun = root # force_delete is not passed to recursive call
@ -893,13 +904,14 @@ class TestSuiteRunner(object):
if resume:
if cmd != resume:
return
bmsg('Resuming at {!r}'.format(cmd))
bmsg(f'Resuming at {cmd!r}')
resume = False
opt.skip_deps = False
if opt.profile: start = time.time()
if opt.profile:
start = time.time()
self.ts.test_name = cmd # NB: Do not remove, this needs to set twice
self.ts.test_name = cmd # NB: Do not remove, this needs to be set twice
cdata = self.gm.dpy_data[cmd]
# self.ts.test_dpydata = cdata
self.ts.tmpdir_num = cdata[0]
@ -919,7 +931,7 @@ class TestSuiteRunner(object):
self.process_retval(cmd,ret)
if opt.profile:
omsg('\r\033[50C{:.4f}'.format(time.time() - start))
omsg('\r\033[50C{:.4f}'.format( time.time() - start ))
if cmd == opt.exit_after:
sys.exit(0)
@ -943,19 +955,19 @@ class TestSuiteRunner(object):
self.skipped_warnings.append(
'Test {!r} was skipped:\n {}'.format(cmd,'\n '.join(ret[1].split('\n'))))
else:
rdie(1,'{!r} returned {}'.format(cmd,ret))
rdie(1,f'{cmd!r} returned {ret}')
def check_deps(self,cmds): # TODO: broken
if len(cmds) != 1:
die(1,'Usage: {} check_deps <command>'.format(g.prog_name))
die(1,f'Usage: {g.prog_name} check_deps <command>')
cmd = cmds[0]
if cmd not in self.gm.cmd_list:
die(1,'{!r}: unrecognized command'.format(cmd))
die(1,f'{cmd!r}: unrecognized command')
if not opt.quiet:
omsg('Checking dependencies for {!r}'.format(cmd))
omsg(f'Checking dependencies for {cmd!r}')
self.check_needs_rerun(self.ts,cmd,build=False)
@ -963,7 +975,7 @@ class TestSuiteRunner(object):
for cmd in self.rebuild_list:
c = self.rebuild_list[cmd]
m = 'Rebuild' if (c[0] and c[1]) else 'Build' if c[0] else 'OK'
omsg('cmd {:<{w}} {}'.format(cmd+':', m, w=w))
omsg('cmd {:<{w}} {}'.format( cmd+':', m, w=w ))
def generate_file_deps(self,cmd):
return [(str(n),e) for exts,n in self.gm.dpy_data[cmd][2] for e in exts]
@ -975,14 +987,14 @@ class TestSuiteRunner(object):
try:
num = str(self.gm.dpy_data[cmd][0])
except KeyError:
qmsg_r('Missing dependency {!r}'.format(cmd))
qmsg_r(f'Missing dependency {cmd!r}')
gname = self.gm.find_cmd_in_groups(cmd)
if gname:
kwargs = self.gm.cmd_groups[gname][1]
kwargs.update({'add_dpy':True})
self.gm.create_group(gname,**kwargs)
num = str(self.gm.dpy_data[cmd][0])
qmsg(' found in group {!r}'.format(gname))
qmsg(f' found in group {gname!r}')
else:
qmsg(' not found in any command group!')
raise

View file

@ -72,7 +72,7 @@ def ok_msg():
sys.stderr.write(green('\nOK\n') if opt.exact_output or opt.verbose else ' OK\n')
def skip(name,reason=None):
msg('Skipping {}{}'.format(name,' ({})'.format(reason) if reason else ''))
msg('Skipping {}{}'.format( name, f' ({reason})' if reason else '' ))
return 'skip'
def confirm_continue():

View file

@ -136,9 +136,10 @@ class TestSuiteAutosign(TestSuiteBase):
def do_autosign_live(opts,mountpoint,led_opts=[],gen_wallet=True):
omsg(purple('Running autosign test with {}'.format(
f"'{' '.join(led_opts)}'" if led_opts else 'no LED'
)))
omsg(purple(
'Running autosign test with ' +
(f"'{' '.join(led_opts)}'" if led_opts else 'no LED')
))
def do_mount():
try: run(['mount',mountpoint],check=True)
@ -173,7 +174,7 @@ class TestSuiteAutosign(TestSuiteBase):
def do_loop():
omsg(blue(m2))
t.expect('{} transactions signed'.format(txcount))
t.expect(f'{txcount} transactions signed')
if not led_opts:
t.expect('2 transactions failed to sign')
t.expect('Waiting')
@ -197,7 +198,7 @@ class TestSuiteAutosign(TestSuiteBase):
def do_autosign(opts,mountpoint,mn_type=None,short=False):
def autosign_expect(t,txcount):
t.expect('{} transactions signed'.format(txcount))
t.expect(f'{txcount} transactions signed')
t.expect('2 transactions failed to sign')
t.expect('Waiting')
t.kill(2)

View file

@ -43,7 +43,7 @@ class TestSuiteBase(object):
self.spawn = spawn
self.have_dfl_wallet = False
self.usr_rand_chars = (5,30)[bool(opt.usr_random)]
self.usr_rand_arg = '-r{}'.format(self.usr_rand_chars)
self.usr_rand_arg = f'-r{self.usr_rand_chars}'
self.altcoin_pfx = '' if self.proto.base_coin == 'BTC' else '-'+self.proto.base_coin
self.tn_ext = ('','.testnet')[self.proto.testnet]
d = {'bch':'btc','btc':'btc','ltc':'ltc'}
@ -74,7 +74,7 @@ class TestSuiteBase(object):
def skip_for_win(self):
if g.platform == 'win':
msg("Skipping test '{}': not supported on MSys2 platform".format(self.test_name))
msg(f'Skipping test {self.test_name!r}: not supported on MSys2 platform')
return True
else:
return False

View file

@ -200,7 +200,7 @@ class TestSuiteCfg(TestSuiteBase):
txt = 'mnemonic_entry_modes mmgen:full bip39:short'
write_to_file(self.path('usr'),txt+'\n')
imsg(yellow(f'Wrote cfg file: "{txt}"'))
imsg(yellow(f'Wrote cfg file: {txt!r}'))
return run("{'mmgen': 'full', 'bip39': 'short'}")
def chain_names(self):
@ -209,7 +209,7 @@ class TestSuiteCfg(TestSuiteBase):
for coin,chain_chk in (('ETH',chk),('ETC',None)):
t = self.spawn_test(
args = [f'--coin={coin}',f'--testnet={(0,1)[testnet]}','coin_specific_vars','chain_names'],
extra_desc = f'({coin} testnet={testnet} chain_names={chain_chk})' )
extra_desc = f'({coin} testnet={testnet!r:5} chain_names={chain_chk})' )
chain = t.expect_getend('chain_names: ')
if chain_chk:
assert chain == chain_chk, f'{chain} != {chain_chk}'
@ -221,13 +221,13 @@ class TestSuiteCfg(TestSuiteBase):
txt = 'eth_mainnet_chain_names istanbul constantinople'
write_to_file(self.path('usr'),txt+'\n')
imsg(yellow(f'Wrote cfg file: "{txt}"'))
imsg(yellow(f'Wrote cfg file: {txt!r}'))
t = run("['istanbul', 'constantinople']",False)
t = run(None,True)
txt = 'eth_testnet_chain_names rinkeby'
write_to_file(self.path('usr'),txt+'\n')
imsg(yellow(f'Wrote cfg file: "{txt}"'))
imsg(yellow(f'Wrote cfg file: {txt!r}'))
t = run(None,False)
t = run("['rinkeby']",True)

View file

@ -348,7 +348,12 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
@property
def eth_args(self):
return ['--outdir={}'.format(self.tmpdir),'--coin='+self.proto.coin,'--rpc-port={}'.format(self.rpc_port),'--quiet']
return [
f'--outdir={self.tmpdir}',
f'--coin={self.proto.coin}',
f'--rpc-port={self.rpc_port}',
'--quiet'
]
async def setup(self):
self.spawn('',msg_only=True)
@ -368,7 +373,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
return False
from mmgen.rpc import rpc_init
rpc = await rpc_init(self.proto)
imsg('Daemon: {} v{}'.format(rpc.daemon.coind_name,rpc.daemon_version_str))
imsg(f'Daemon: {rpc.daemon.coind_name} v{rpc.daemon_version_str}')
return 'ok'
@ -443,7 +448,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
def wallet_upgrade(self,src_file):
if self.proto.coin == 'ETC':
msg('skipping test {!r} for ETC'.format(self.test_name))
msg(f'skipping test {self.test_name!r} for ETC')
return 'skip'
src_dir = joinpath(ref_dir,'ethereum')
dest_dir = joinpath(self.tr.data_dir,'altcoins',self.proto.coin.lower())
@ -510,7 +515,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
fee_desc = 'gas price',
no_read = False,
tweaks = [] ):
fee_res = r'\D{}\D.*{c} .*\D{}\D.*gas price in Gwei'.format(*fee_res_data,c=self.proto.coin)
fee_res = r'\D{}\D.*{c} .*\D{}\D.*gas price in Gwei'.format( *fee_res_data, c=self.proto.coin )
t = self.spawn('mmgen-'+caller, self.eth_args + ['-B'] + args)
t.expect(r'add \[l\]abel, .*?:.','p', regex=True)
t.written_to_file('Account balances listing')
@ -536,7 +541,9 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
keyfile = joinpath(self.tmpdir,parity_devkey_fn)
txfile = self.get_file_with_ext(ext,no_dot=True)
t = self.spawn( 'mmgen-txsign',
['--outdir={}'.format(self.tmpdir),'--coin='+self.proto.coin,'--quiet']
[f'--outdir={self.tmpdir}']
+ [f'--coin={self.proto.coin}']
+ ['--quiet']
+ ['--rpc-host=bad_host'] # ETH signing must work without RPC
+ add_args
+ ([],['--yes'])[ni]
@ -672,7 +679,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
addr,amt,adj = b if len(b) == 3 else b + (False,)
if adj and self.proto.coin == 'ETC':
amt = str(Decimal(amt) + Decimal(adj[1]) * self.bal_corr)
pat = r'\D{}\D.*\D{}\D'.format(addr,amt.replace('.',r'\.'))
pat = r'\D{}\D.*\D{}\D'.format( addr, amt.replace('.',r'\.') )
assert re.search(pat,text), pat
ss = f'Total {self.proto.coin}:'
assert re.search(ss,text),ss
@ -685,7 +692,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
addr,_amt1,_amt2,adj = b if len(b) == 4 else b + (False,)
if adj and self.proto.coin == 'ETC':
_amt2 = str(Decimal(_amt2) + Decimal(adj[1]) * self.bal_corr)
pat = r'{}\b.*\D{}\D.*\b{}\D'.format(addr,_amt1,_amt2)
pat = rf'{addr}\b.*\D{_amt1}\D.*\b{_amt2}\D'
assert re.search(pat,text), pat
ss = 'Total MM1:'
assert re.search(ss,text),ss
@ -712,7 +719,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
def chk_label(self,lbl_pat,addr='98831F3A:E:3'):
t = self.spawn('mmgen-tool', self.eth_args + ['listaddresses','all_labels=1'])
t.expect(r'{}\b.*\S{{30}}\b.*{}\b'.format(addr,lbl_pat),regex=True)
t.expect(rf'{addr}\b.*\S{{30}}\b.*{lbl_pat}\b',regex=True)
return t
def add_label1(self): return self.add_label(lbl=tw_label_zh)
@ -731,8 +738,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
imsg(f'Using precompiled contract data in {odir}')
return 'skip' if os.path.exists(odir) else False
self.spawn('',msg_only=True)
cmd_args = ['--{}={}'.format(k,v) for k,v in list(token_data.items())]
imsg("Compiling solidity token contract '{}' with 'solc'".format(token_data['symbol']))
cmd_args = [f'--{k}={v}' for k,v in list(token_data.items())]
imsg("Compiling solidity token contract '{}' with 'solc'".format( token_data['symbol'] ))
try: os.mkdir(odir)
except: pass
cmd = [
@ -741,12 +748,12 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
'--coin=' + self.proto.coin,
'--outdir=' + odir
] + cmd_args + [CoinProtocol.Ethereum.checksummed_addr(dfl_devaddr)]
imsg("Executing: {}".format(' '.join(cmd)))
imsg('Executing: {}'.format( ' '.join(cmd) ))
cp = run(cmd,stdout=DEVNULL,stderr=PIPE)
if cp.returncode != 0:
rmsg('solc failed with the following output:')
ydie(2,cp.stderr.decode())
imsg("ERC20 token '{}' compiled".format(token_data['symbol']))
imsg('ERC20 token {!r} compiled'.format( token_data['symbol'] ))
return 'ok'
def token_compile1(self):
@ -773,12 +780,14 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
async def token_deploy(self,num,key,gas,mmgen_cmd='txdo',tx_fee='8G'):
keyfile = joinpath(self.tmpdir,parity_devkey_fn)
fn = joinpath(self.tmpdir,'mm'+str(num),key+'.bin')
args = ['-B',
'--tx-fee='+tx_fee,
'--tx-gas={}'.format(gas),
'--contract-data='+fn,
'--inputs='+dfl_devaddr,
'--yes' ]
args = [
'-B',
f'--tx-fee={tx_fee}',
f'--tx-gas={gas}',
f'--contract-data={fn}',
f'--inputs={dfl_devaddr}',
'--yes',
]
if mmgen_cmd == 'txdo':
args += ['-k',keyfile]
t = self.spawn( 'mmgen-'+mmgen_cmd, self.eth_args + args)
@ -823,7 +832,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
self.spawn('',msg_only=True)
sid = dfl_sid
from mmgen.tool import MMGenToolCmdWallet
usr_mmaddrs = ['{}:E:{}'.format(sid,i) for i in (11,21)]
usr_mmaddrs = [f'{sid}:E:{i}' for i in (11,21)]
usr_addrs = [MMGenToolCmdWallet(proto=self.proto).gen_addr(addr,dfl_words_file) for addr in usr_mmaddrs]
from mmgen.altcoins.eth.contract import TokenResolve
@ -835,8 +844,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
rpc,
self.read_from_tmpfile(f'token_addr{i+1}').strip() )
imsg_r( '\n' + await tk.info() )
imsg('dev token balance (pre-send): {}'.format(await tk.get_balance(dfl_devaddr)))
imsg('Sending {} {} to address {} ({})'.format(amt,self.proto.dcoin,usr_addrs[i],usr_mmaddrs[i]))
imsg('dev token balance (pre-send): {}'.format( await tk.get_balance(dfl_devaddr) ))
imsg(f'Sending {amt} {self.proto.dcoin} to address {usr_addrs[i]} ({usr_mmaddrs[i]})')
txid = await tk.transfer(
dfl_devaddr,
usr_addrs[i],
@ -853,10 +862,12 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
self.proto,
rpc,
self.read_from_tmpfile(f'token_addr{i+1}').strip() )
imsg('Token: {}'.format(await tk.get_symbol()))
imsg('dev token balance: {}'.format(await tk.get_balance(dfl_devaddr)))
imsg('Token: {}'.format( await tk.get_symbol() ))
imsg(f'dev token balance: {await tk.get_balance(dfl_devaddr)}')
imsg('usr token balance: {} ({} {})'.format(
await tk.get_balance(usr_addrs[i]),usr_mmaddrs[i],usr_addrs[i]))
await tk.get_balance(usr_addrs[i]),
usr_mmaddrs[i],
usr_addrs[i] ))
from mmgen.rpc import rpc_init
rpc = await rpc_init(self.proto)
@ -1068,7 +1079,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
t.expect(' main menu): ',n)
t.expect('Is this what you want? (y/N): ','y')
t.expect('[R]efresh balance:\b','q')
t.expect('Total unspent:.*\D{}\D.*{}'.format(total,total_coin),regex=True)
t.expect(f'Total unspent:.*\D{total}\D.*{total_coin}',regex=True)
t.read()
return t

View file

@ -160,7 +160,7 @@ class TestSuiteInput(TestSuiteBase):
pw = 'abc-α'
t.expect(prompt,pw)
ret = t.expect_getend('Entered: ')
assert ret == pw,'Password mismatch! {} != {}'.format(ret,pw)
assert ret == pw, f'Password mismatch! {ret} != {pw}'
return t
def password_entry_noecho(self):
@ -190,7 +190,7 @@ class TestSuiteInput(TestSuiteBase):
regex = True )
t.expect('Using (.+) entry mode',regex=True)
mode = strip_ansi_escapes(t.p.match.group(1)).lower()
assert mode == mne.em.name.lower(), '{} != {}'.format(mode,mne.em.name.lower())
assert mode == mne.em.name.lower(), f'{mode} != {mne.em.name.lower()}'
stealth_mnemonic_entry(t,mne,mn,entry_mode=entry_mode,pad_entry=pad_entry)
t.expect(sample_mn[fmt]['hex'])
t.read()
@ -198,7 +198,7 @@ class TestSuiteInput(TestSuiteBase):
def _user_seed_entry(self,fmt,usr_rand=False,out_fmt=None,entry_mode='full',mn=None):
wcls = Wallet.fmt_code_to_type(fmt)
wf = os.path.join(ref_dir,'FE3C6545.{}'.format(wcls.ext))
wf = os.path.join(ref_dir,f'FE3C6545.{wcls.ext}')
if wcls.wclass == 'mnemonic':
mn = mn or read_from_file(wf).strip().split()
elif wcls.wclass == 'dieroll':
@ -206,7 +206,7 @@ class TestSuiteInput(TestSuiteBase):
for idx,val in ((5,'x'),(18,'0'),(30,'7'),(44,'9')):
mn.insert(idx,val)
t = self.spawn('mmgen-walletconv',['-r10','-S','-i',fmt,'-o',out_fmt or fmt])
t.expect('{} type:.*{}'.format(capfirst(wcls.wclass),wcls.mn_type),regex=True)
t.expect(f'{capfirst(wcls.wclass)} type:.*{wcls.mn_type}',regex=True)
t.expect(wcls.choose_seedlen_prompt,'1')
t.expect('(Y/n): ','y')
if wcls.wclass == 'mnemonic':
@ -217,7 +217,7 @@ class TestSuiteInput(TestSuiteBase):
t.expect('Type a number.*: ',str(mne.entry_modes.index(entry_mode)+1),regex=True)
t.expect('Using (.+) entry mode',regex=True)
mode = strip_ansi_escapes(t.p.match.group(1)).lower()
assert mode == mne.em.name.lower(), '{} != {}'.format(mode,mne.em.name.lower())
assert mode == mne.em.name.lower(), f'{mode} != {mne.em.name.lower()}'
stealth_mnemonic_entry(t,mne,mn,entry_mode=entry_mode)
elif wcls.wclass == 'dieroll':
user_dieroll_entry(t,mn)
@ -228,9 +228,9 @@ class TestSuiteInput(TestSuiteBase):
t.expect(wcls.user_entropy_prompt,'n')
if not usr_rand:
sid_chk = 'FE3C6545'
sid = t.expect_getend('Valid {} for Seed ID '.format(wcls.desc))
sid = t.expect_getend(f'Valid {wcls.desc} for Seed ID ')
sid = strip_ansi_escapes(sid.split(',')[0])
assert sid == sid_chk,'Seed ID mismatch! {} != {}'.format(sid,sid_chk)
assert sid == sid_chk, f'Seed ID mismatch! {sid} != {sid_chk}'
t.expect('to confirm: ','YES\n')
t.read()
return t

View file

@ -313,7 +313,9 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
else f'{al_id}:{idx}{lbl}' ),
'vout': int(getrandnum(4) % 8),
'txid': os.urandom(32).hex(),
'amount': self.proto.coin_amt('{}.{}'.format(amt1 + getrandnum(4) % amt2, getrandnum(4) % 100000000)),
'amount': self.proto.coin_amt('{}.{}'.format(
amt1 + getrandnum(4) % amt2,
getrandnum(4) % 100000000 )),
'address': coinaddr,
'spendable': False,
'scriptPubKey': f'{s_beg}{coinaddr.hex}{s_end}',
@ -387,18 +389,28 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
for k in self.cfgs:
self.cfgs[k]['amts'] = [None,None]
for idx,mod in enumerate(mods):
self.cfgs[k]['amts'][idx] = '{}.{}'.format(getrandnum(4) % mod, str(getrandnum(4))[:5])
self.cfgs[k]['amts'][idx] = '{}.{}'.format(
getrandnum(4) % mod,
str(getrandnum(4))[:5] )
cmd_args = ['--outdir='+self.tmpdir]
for num in tx_data:
s = tx_data[num]
cmd_args += [
'{}:{},{}'.format(s['al_id'],s['addr_idxs'][0],self.cfgs[num]['amts'][0]),
]
'{}:{},{}'.format(
s['al_id'],
s['addr_idxs'][0],
self.cfgs[num]['amts'][0] )]
# + one change address and one BTC address
if num is list(tx_data.keys())[-1]:
cmd_args += ['{}:{}'.format(s['al_id'],s['addr_idxs'][1])]
cmd_args += ['{},{}'.format(rand_coinaddr,self.cfgs[num]['amts'][1])]
cmd_args += [
'{}:{}'.format(
s['al_id'],
s['addr_idxs'][1] )]
cmd_args += [
'{},{}'.format(
rand_coinaddr,
self.cfgs[num]['amts'][1] )]
return cmd_args + [tx_data[num]['addrfile'] for num in tx_data]
@ -427,11 +439,14 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
if cmdline_inputs:
from mmgen.tx import TwLabel
cmd_args = ['--inputs={},{},{},{},{},{}'.format(
TwLabel(self.proto,dfake[0][self.lbl_id]).mmid,dfake[1]['address'],
TwLabel(self.proto,dfake[2][self.lbl_id]).mmid,dfake[3]['address'],
TwLabel(self.proto,dfake[4][self.lbl_id]).mmid,dfake[5]['address']
),'--outdir='+self.tr.trash_dir] + cmd_args[1:]
cmd_args = [
'--inputs={},{},{},{},{},{}'.format(
TwLabel(self.proto,dfake[0][self.lbl_id]).mmid,dfake[1]['address'],
TwLabel(self.proto,dfake[2][self.lbl_id]).mmid,dfake[3]['address'],
TwLabel(self.proto,dfake[4][self.lbl_id]).mmid,dfake[5]['address']
),
f'--outdir={self.tr.trash_dir}'
] + cmd_args[1:]
end_silence()
@ -552,7 +567,9 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
f,t = self._walletconv_export(wf,out_fmt=out_fmt,pf=pf)
silence()
wcls = Wallet.fmt_code_to_type(out_fmt)
msg('==> {}: {}'.format(wcls.desc,cyan(get_data_from_file(f,wcls.desc))))
msg('==> {}: {}'.format(
wcls.desc,
cyan(get_data_from_file(f,wcls.desc)) ))
end_silence()
return t
@ -609,7 +626,7 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
t.expect_getend('Incog Wallet ID: ')
wcls = Wallet.fmt_code_to_type(in_fmt)
t.hash_preset(wcls.desc,'1')
t.passphrase('{} \w{{8}}'.format(wcls.desc),self.wpasswd)
t.passphrase(f'{wcls.desc} \w{{8}}',self.wpasswd)
vmsg('Comparing generated checksum with checksum from address file')
chk = t.expect_getend(r'Checksum for address data .*?: ',regex=True)
verify_checksum_or_exit(self._get_addrfile_checksum(),chk)

View file

@ -34,7 +34,7 @@ class TestSuiteOpts(TestSuiteBase):
def check_vals(self,args,vals):
t = self.spawn_prog(args)
for k,v in vals:
t.expect(r'{}:\s+{}'.format(k,v),regex=True)
t.expect(rf'{k}:\s+{v}',regex=True)
t.read()
return t
@ -82,7 +82,7 @@ class TestSuiteOpts(TestSuiteBase):
'--passwd-file='+pf,
'--outdir='+self.tmpdir,
'--subseeds=200',
'--hidden-incog-input-params={},123'.format(pf),
f'--hidden-incog-input-params={pf},123',
],
(
('opt.print_checksum', 'True'),

View file

@ -137,7 +137,9 @@ class TestSuiteRef(TestSuiteBase,TestSuiteShared):
@property
def nw_desc(self):
return '{} {}'.format(self.proto.coin,('Mainnet','Testnet')[self.proto.testnet])
return '{} {}'.format(
self.proto.coin,
('Mainnet','Testnet')[self.proto.testnet] )
def _get_ref_subdir_by_coin(self,coin):
return {'btc': '',
@ -165,30 +167,30 @@ class TestSuiteRef(TestSuiteBase,TestSuiteShared):
args = ['-d',self.tr.trash_dir,'-o',ocls.fmt_codes[-1],wf,ss_idx]
t = self.spawn('mmgen-subwalletgen',args,extra_desc='(generate subwallet)')
t.expect('Generating subseed {}'.format(ss_idx))
chk_sid = self.chk_data['ref_subwallet_sid']['98831F3A:{}'.format(ss_idx)]
t.expect(f'Generating subseed {ss_idx}')
chk_sid = self.chk_data['ref_subwallet_sid'][f'98831F3A:{ss_idx}']
fn = t.written_to_file(capfirst(ocls.desc))
assert chk_sid in fn,'incorrect filename: {} (does not contain {})'.format(fn,chk_sid)
assert chk_sid in fn,f'incorrect filename: {fn} (does not contain {chk_sid})'
ok()
t = self.spawn('mmgen-walletchk',[fn],extra_desc='(check subwallet)')
t.expect(r'Valid MMGen native mnemonic data for Seed ID ([0-9A-F]*)\b',regex=True)
sid = t.p.match.group(1)
assert sid == chk_sid,'subseed ID {} does not match expected value {}'.format(sid,chk_sid)
assert sid == chk_sid,f'subseed ID {sid} does not match expected value {chk_sid}'
t.read()
return t
def ref_subwallet_addrgen(self,ss_idx,target='addr'):
wf = dfl_words_file
args = ['-d',self.tr.trash_dir,'--subwallet='+ss_idx,wf,'1-10']
t = self.spawn('mmgen-{}gen'.format(target),args)
t.expect('Generating subseed {}'.format(ss_idx))
chk_sid = self.chk_data['ref_subwallet_sid']['98831F3A:{}'.format(ss_idx)]
t = self.spawn(f'mmgen-{target}gen',args)
t.expect(f'Generating subseed {ss_idx}')
chk_sid = self.chk_data['ref_subwallet_sid'][f'98831F3A:{ss_idx}']
assert chk_sid == t.expect_getend('Checksum for .* data ',regex=True)[:8]
if target == 'key':
t.expect('Encrypt key list? (y/N): ','n')
fn = t.written_to_file(('Addresses','Secret keys')[target=='key'])
assert chk_sid in fn,'incorrect filename: {} (does not contain {})'.format(fn,chk_sid)
assert chk_sid in fn,f'incorrect filename: {fn} (does not contain {chk_sid})'
return t
def ref_subwallet_addrgen1(self):
@ -215,7 +217,7 @@ class TestSuiteRef(TestSuiteBase,TestSuiteShared):
pat = None ):
pat = pat or f'{self.nw_desc}.*Legacy'
af_key = 'ref_{}file'.format(ftype) + ('_' + id_key if id_key else '')
af_key = f'ref_{ftype}file' + ('_' + id_key if id_key else '')
af_fn = TestSuiteRef.sources[af_key].format(pfx or self.altcoin_pfx,'' if coin else self.tn_ext)
af = joinpath(ref_dir,(subdir or self.ref_subdir,'')[ftype=='passwd'],af_fn)
coin_arg = [] if coin == None else ['--coin='+coin]
@ -237,12 +239,12 @@ class TestSuiteRef(TestSuiteBase,TestSuiteShared):
def ref_segwitaddrfile_chk(self):
if not 'S' in self.proto.mmtypes:
return skip(f'not supported by {self.proto.cls_name} protocol')
return self.ref_addrfile_chk(ftype='segwitaddr',pat='{}.*Segwit'.format(self.nw_desc))
return self.ref_addrfile_chk(ftype='segwitaddr',pat=f'{self.nw_desc}.*Segwit')
def ref_bech32addrfile_chk(self):
if not 'B' in self.proto.mmtypes:
return skip(f'not supported by {self.proto.cls_name} protocol')
return self.ref_addrfile_chk(ftype='bech32addr',pat='{}.*Bech32'.format(self.nw_desc))
return self.ref_addrfile_chk(ftype='bech32addr',pat=f'{self.nw_desc}.*Bech32')
def ref_keyaddrfile_chk(self):
return self.ref_addrfile_chk(ftype='keyaddr')

View file

@ -73,7 +73,7 @@ class TestSuiteRef3Seed(TestSuiteBase,TestSuiteShared):
def __init__(self,trunner,cfgs,spawn):
for k,j in self.cmd_group:
for n in (1,2,3): # 128,192,256 bits
setattr(self,'{}_{}'.format(k,n),getattr(self,k))
setattr(self,f'{k}_{n}',getattr(self,k))
if cfgs:
for n in self.tmpdir_nums:
cfgs[str(n)]['addr_idx_list'] = self.addr_idx_list_in
@ -85,7 +85,7 @@ class TestSuiteRef3Seed(TestSuiteBase,TestSuiteShared):
return self.walletchk(wf,pf=None,sid=self.seed_id)
def ref_ss_chk(self,ss=None):
wf = joinpath(ref_dir,'{}.{}'.format(self.seed_id,ss.ext))
wf = joinpath(ref_dir,f'{self.seed_id}.{ss.ext}')
return self.walletchk(wf,pf=None,wcls=ss,sid=self.seed_id)
def ref_seed_chk(self):
@ -110,8 +110,11 @@ class TestSuiteRef3Seed(TestSuiteBase,TestSuiteShared):
source = TestSuiteWalletConv.sources[str(self.seed_len)]
for wtype,edesc,of_arg in ('hic_wallet','',[]), \
('hic_wallet_old','(old format)',['-O']):
ic_arg = ['-H{},{}'.format(joinpath(ref_dir,source[wtype]),ref_wallet_incog_offset)]
slarg = ['-l{} '.format(self.seed_len)]
ic_arg = ['-H{},{}'.format(
joinpath(ref_dir,source[wtype]),
ref_wallet_incog_offset )
]
slarg = [f'-l{self.seed_len} ']
hparg = ['-p1']
if wtype == 'hic_wallet_old' and opt.profile: msg('')
t = self.spawn('mmgen-walletchk',
@ -128,9 +131,9 @@ class TestSuiteRef3Seed(TestSuiteBase,TestSuiteShared):
return t
def ref_walletgen_brain(self):
sl_arg = '-l{}'.format(self.seed_len)
hp_arg = '-p{}'.format(ref_wallet_hash_preset)
label = "test.py ref. wallet (pw '{}', seed len {}) α".format(ref_wallet_brainpass,self.seed_len)
sl_arg = f'-l{self.seed_len}'
hp_arg = f'-p{ref_wallet_hash_preset}'
label = f'test.py ref. wallet (pw {ref_wallet_brainpass!r}, seed len {self.seed_len}) α'
bf = 'ref.mmbrain'
args = ['-d',self.tmpdir,hp_arg,sl_arg,'-ib','-L',label]
self.write_to_tmpfile(bf,ref_wallet_brainpass)
@ -148,7 +151,7 @@ class TestSuiteRef3Seed(TestSuiteBase,TestSuiteShared):
self.chk_data['sids'][idx],
self.chk_data['lens'][idx],
'' if g.debug_utf8 else '')
assert re.match(pat,fn),'{} != {}'.format(pat,fn)
assert re.match(pat,fn), f'{pat} != {fn}'
sid = os.path.basename(fn.split('-')[0])
cmp_or_die(sid,self.seed_id,desc='Seed ID')
return t
@ -165,9 +168,14 @@ class TestSuiteRef3Seed(TestSuiteBase,TestSuiteShared):
if re_pat:
import re
pat = re_pat.format(sid,slen)
assert re.match(pat,fn),'{} != {}'.format(pat,fn)
assert re.match(pat,fn), f'{pat} != {fn}'
else:
cmp_or_die('{}[{}]{}.{}'.format(sid,slen,'' if g.debug_utf8 else '',wcls.ext),fn)
cmp_or_die('{}[{}]{}.{}'.format(
sid,
slen,
'' if g.debug_utf8 else '',
wcls.ext),
fn )
return t
def ref_walletconv_words(self): return self.ref_walletconv(ofmt='mn')

View file

@ -116,7 +116,7 @@ class TestSuiteRefAltcoin(TestSuiteRef,TestSuiteBase):
def ref_altcoin_addrgen(self,coin,mmtype,gen_what='addr',coin_suf='',add_args=[]):
wf = dfl_words_file
t = self.spawn('mmgen-{}gen'.format(gen_what),
t = self.spawn(f'mmgen-{gen_what}gen',
['-Sq','--coin='+coin] +
(['--type='+mmtype] if mmtype else []) +
add_args +
@ -124,9 +124,14 @@ class TestSuiteRefAltcoin(TestSuiteRef,TestSuiteBase):
if gen_what == 'key':
t.expect('Encrypt key list? (y/N): ','N')
chk = t.expect_getend(r'.* data checksum for \S*: ',regex=True)
chk_ref = self.chk_data['ref_{}addrfile_chksum_{}{}'.format(('','key')[gen_what=='key'],coin.lower(),coin_suf)]
chk_ref = self.chk_data[
'ref_{}addrfile_chksum_{}{}'.format(
('key' if gen_what == 'key' else ''),
coin.lower(),
coin_suf )
]
t.read()
cmp_or_die(chk,chk_ref,desc='{}list data checksum'.format(gen_what))
cmp_or_die(chk,chk_ref,desc=f'{gen_what}list data checksum')
return t
def ref_addrfile_gen_eth(self):

View file

@ -264,13 +264,13 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
def _add_comments_to_addr_file(self,addrfile,outfile,use_labels=False):
silence()
gmsg("Adding comments to address file '{}'".format(addrfile))
gmsg(f'Adding comments to address file {addrfile!r}')
a = AddrList(self.proto,addrfile)
for n,idx in enumerate(a.idxs(),1):
if use_labels:
a.set_comment(idx,get_label())
else:
if n % 2: a.set_comment(idx,'Test address {}'.format(n))
if n % 2: a.set_comment(idx,f'Test address {n}')
a.format(add_comments=True)
write_data_to_file(outfile,a.fmt_data,quiet=True,ignore_opt_outdir=True)
end_silence()
@ -330,11 +330,11 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
from mmgen.addr import MMGenAddrType
for mmtype in mmtypes or self.proto.mmtypes:
t = self.spawn('mmgen-addrgen',
['--quiet','--'+user,'--type='+mmtype,'--outdir={}'.format(self._user_dir(user))] +
['--quiet','--'+user,'--type='+mmtype,f'--outdir={self._user_dir(user)}'] +
([wf] if wf else []) +
(['--subwallet='+subseed_idx] if subseed_idx else []) +
[addr_range],
extra_desc='({})'.format(MMGenAddrType.mmtypes[mmtype].name))
extra_desc='({})'.format( MMGenAddrType.mmtypes[mmtype].name ))
t.passphrase(dfl_wcls.desc,rt_pw)
t.written_to_file('Addresses')
ok_msg()
@ -356,13 +356,14 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
x='' if g.debug_utf8 else ''))
if mmtype == self.proto.mmtypes[0] and user == 'bob':
self._add_comments_to_addr_file(addrfile,addrfile,use_labels=True)
t = self.spawn( 'mmgen-addrimport',
['--quiet', '--'+user, '--batch', addrfile],
extra_desc='({})'.format(desc))
t = self.spawn(
'mmgen-addrimport',
['--quiet', '--'+user, '--batch', addrfile],
extra_desc=f'({desc})' )
if g.debug:
t.expect("Type uppercase 'YES' to confirm: ",'YES\n')
t.expect('Importing')
t.expect('{} addresses imported'.format(num_addrs))
t.expect(f'{num_addrs} addresses imported')
ok_msg()
t.skip_ok = True
@ -507,7 +508,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
def bob_getbalance(self,bals,confs=1):
for i in (0,1,2):
assert Decimal(bals['mmgen'][i]) + Decimal(bals['nonmm'][i]) == Decimal(bals['total'][i])
t = self.spawn('mmgen-tool',['--bob','getbalance','minconf={}'.format(confs)])
t = self.spawn('mmgen-tool',['--bob','getbalance',f'minconf={confs}'])
t.expect('Wallet')
for k in ('mmgen','nonmm','total'):
ret = strip_ansi_escapes(t.expect_getend(r'\S+: ',regex=True))
@ -655,7 +656,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
def generate(self,coin=None,num_blocks=1):
int(num_blocks)
t = self.spawn('mmgen-regtest',['generate',str(num_blocks)])
t.expect('Mined {} block'.format(num_blocks))
t.expect(f'Mined {num_blocks} block')
return t
def _get_mempool(self):
@ -703,7 +704,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Transaction has been replaced','{} in mempool'.format(new_txid))
'Transaction has been replaced',f'{new_txid} in mempool')
def bob_rbf_status3(self):
if not self.proto.cap('rbf'):
@ -716,7 +717,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 1 confirmation',
'Replacing transactions:\s+{}'.format(new_txid))
f'Replacing transactions:\s+{new_txid}' )
def bob_rbf_status5(self):
if not self.proto.cap('rbf'):
@ -729,7 +730,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 2 confirmations',
'Replacing transactions:\s+{}'.format(new_txid))
f'Replacing transactions:\s+{new_txid}' )
def _gen_pairs(self,n):
from mmgen.tool import tool_api
@ -947,7 +948,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
( '\d+\s+\d+\s+'+pat_date_time,'q'),
):
t.expect(
r'\D{}\D.*\b{}\b'.format(rtAmts[0],d),
r'\D{}\D.*\b{}\b'.format( rtAmts[0], d ),
s,
regex=True )
t.read()

View file

@ -99,7 +99,7 @@ class TestSuiteSeedSplit(TestSuiteBase):
['-q','-d',self.get_tmp_subdir(tdir),'-r0','-o',ofmt]
+ (['-L',(spec or 'label')] if ofmt == 'w' else [])
+ add_args
+ (['--master-share={}'.format(master)] if master else [])
+ ([f'--master-share={master}'] if master else [])
+ ([wf] if wf else [])
+ ([spec] if spec else []))
if not wf:
@ -107,9 +107,9 @@ class TestSuiteSeedSplit(TestSuiteBase):
if spec:
from mmgen.obj import SeedSplitSpecifier
sss = SeedSplitSpecifier(spec)
pat = r"Processing .*\b{}\b of \b{}\b of .* id .*'{}'".format(sss.idx,sss.count,sss.id)
pat = rf'Processing .*\b{sss.idx}\b of \b{sss.count}\b of .* id .*{sss.id!r}'
else:
pat = "master share #{}".format(master)
pat = f'master share #{master}'
t.expect(pat,regex=True)
ocls = Wallet.fmt_code_to_type(ofmt)
pw = issubclass(ocls,WalletEnc)
@ -128,8 +128,8 @@ class TestSuiteSeedSplit(TestSuiteBase):
sid = self.read_from_tmpfile('dfl.sid')
t = self.spawn('mmgen-seedjoin',
add_args
+ (['--master-share={}'.format(master)] if master else [])
+ (['--id-str={}'.format(id_str)] if id_str else [])
+ ([f'--master-share={master}'] if master else [])
+ ([f'--id-str={id_str}'] if id_str else [])
+ ['-d',td,'-o',ofmt]
+ (['--label','Joined Wallet Label','-r0'] if ofmt == 'w' else [])
+ shares)

View file

@ -141,7 +141,7 @@ class TestSuiteShared(object):
t.expect('BOGUS transaction NOT sent')
else:
txid = strip_ansi_escapes(t.expect_getend('Transaction sent: '))
assert len(txid) == 64,"'{}': Incorrect txid length!".format(txid)
assert len(txid) == 64, f'{txid!r}: Incorrect txid length!'
t.written_to_file(file_desc)
@ -181,7 +181,7 @@ class TestSuiteShared(object):
def ref_brain_chk(self,bw_file=ref_bw_file):
wf = joinpath(ref_dir,bw_file)
add_args = ['-l{}'.format(self.seed_len), '-p'+ref_bw_hash_preset]
add_args = [f'-l{self.seed_len}', f'-p{ref_bw_hash_preset}']
return self.walletchk(wf,pf=None,add_args=add_args,sid=self.ref_bw_seed_id)
def walletchk(self,wf,pf,wcls=None,add_args=[],sid=None,extra_desc='',dfl_wallet=False):
@ -193,12 +193,12 @@ class TestSuiteShared(object):
+ ([wf] if wf else []),
extra_desc=extra_desc)
if wcls != IncogWalletHidden:
t.expect("Getting {} from file '".format(wcls.desc))
t.expect(f"Getting {wcls.desc} from file '")
pw = issubclass(wcls,WalletEnc) and wcls != Brainwallet
if pw:
t.passphrase(wcls.desc,self.wpasswd)
t.expect(['Passphrase is OK', 'Passphrase.* are correct'],regex=True)
chk = t.expect_getend('Valid {} for Seed ID '.format(wcls.desc))[:8]
chk = t.expect_getend(f'Valid {wcls.desc} for Seed ID ')[:8]
if sid: cmp_or_die(chk,sid)
return t
@ -215,27 +215,27 @@ class TestSuiteShared(object):
if not mmtype and not passgen:
mmtype = self.segwit_mmtype
cmd_pfx = (ftype,'pass')[passgen]
t = self.spawn('mmgen-{}gen'.format(cmd_pfx),
t = self.spawn(f'mmgen-{cmd_pfx}gen',
['-d',self.tmpdir] + extra_args +
([],['--type='+str(mmtype)])[bool(mmtype)] +
([],['--stdout'])[stdout] +
([],[wf])[bool(wf)] +
([],[id_str])[bool(id_str)] +
[getattr(self,'{}_idx_list'.format(cmd_pfx))],
extra_desc='({})'.format(mmtype) if mmtype in ('segwit','bech32') else '')
[getattr(self,f'{cmd_pfx}_idx_list')],
extra_desc=f'({mmtype})' if mmtype in ('segwit','bech32') else '')
t.license()
wcls = MMGenWallet if dfl_wallet else Wallet.ext_to_type(get_extension(wf))
t.passphrase(wcls.desc,self.wpasswd)
t.expect('Passphrase is OK')
desc = ('address','password')[passgen]
chk = t.expect_getend(r'Checksum for {} data .*?: '.format(desc),regex=True)
chk = t.expect_getend(rf'Checksum for {desc} data .*?: ',regex=True)
if passgen:
t.expect('Encrypt password list? (y/N): ','N')
t.read() if stdout else t.written_to_file(('Addresses','Password list')[passgen])
if check_ref:
chk_ref = (self.chk_data[self.test_name] if passgen else
self.chk_data[self.test_name][self.fork][self.proto.testnet])
cmp_or_die(chk,chk_ref,desc='{}list data checksum'.format(ftype))
cmp_or_die(chk,chk_ref,desc=f'{ftype}list data checksum')
return t
def keyaddrgen(self,wf,pf=None,check_ref=False,mmtype=None):
@ -244,7 +244,7 @@ class TestSuiteShared(object):
args = ['-d',self.tmpdir,self.usr_rand_arg,wf,self.addr_idx_list]
t = self.spawn('mmgen-keygen',
([],['--type='+str(mmtype)])[bool(mmtype)] + args,
extra_desc='({})'.format(mmtype) if mmtype in ('segwit','bech32') else '')
extra_desc=f'({mmtype})' if mmtype in ('segwit','bech32') else '')
t.license()
wcls = Wallet.ext_to_type(get_extension(wf))
t.passphrase(wcls.desc,self.wpasswd)
@ -263,4 +263,4 @@ class TestSuiteShared(object):
if sure:
t.expect('Are you sure you want to broadcast this')
m = ('YES, I REALLY WANT TO DO THIS','YES')[quiet]
t.expect("'{}' to confirm: ".format(m),('',m)[confirm_send]+'\n')
t.expect(f'{m!r} to confirm: ',('',m)[confirm_send]+'\n')

View file

@ -34,10 +34,13 @@ class TestSuiteTool(TestSuiteMain,TestSuiteBase):
outfile = os.path.join(self.tmpdir,'rand2file.out')
from mmgen.tool import MMGenToolCmdUtil
for nbytes in ('1','1023','1K','1048575','1M','1048577','123M'):
t = self.spawn( 'mmgen-tool',
['-d',self.tmpdir,'-r0','rand2file','rand2file.out',nbytes],
extra_desc='({} byte{})'.format(nbytes,suf(MMGenToolCmdUtil().bytespec(nbytes)))
)
t = self.spawn(
'mmgen-tool',
['-d',self.tmpdir,'-r0','rand2file','rand2file.out',nbytes],
extra_desc='({} byte{})'.format(
nbytes,
suf(MMGenToolCmdUtil().bytespec(nbytes)) )
)
t.expect('random data written to file')
t.read()
t.p.wait()
@ -67,9 +70,9 @@ class TestSuiteTool(TestSuiteMain,TestSuiteBase):
def tool_find_incog_data(self,f1,f2):
i_id = read_from_file(f2).rstrip()
vmsg('Incog ID: {}'.format(cyan(i_id)))
vmsg(f'Incog ID: {cyan(i_id)}')
t = self.spawn('mmgen-tool',['-d',self.tmpdir,'find_incog_data',f1,i_id])
o = t.expect_getend('Incog data for ID {} found at offset '.format(i_id))
o = t.expect_getend(f'Incog data for ID {i_id} found at offset ')
if not g.platform == 'win':
os.unlink(f1) # causes problems with MSYS2
cmp_or_die(hincog_offset,int(o))

View file

@ -88,7 +88,7 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
def __init__(self,trunner,cfgs,spawn):
for k,j in self.cmd_group:
for n in (1,2,3):
setattr(self,'{}_{}'.format(k,n),getattr(self,k))
setattr(self,f'{k}_{n}',getattr(self,k))
return TestSuiteBase.__init__(self,trunner,cfgs,spawn)
def ref_wallet_conv(self):
@ -120,7 +120,7 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
def ref_hincog_conv(self,wfk='hic_wallet',add_uopts=[]):
ic_f = joinpath(ref_dir,self.sources[str(self.seed_len)][wfk])
uopts = ['-i','hi','-p','1','-l',str(self.seed_len)] + add_uopts
hi_opt = ['-H','{},{}'.format(ic_f,ref_wallet_incog_offset)]
hi_opt = ['-H',f'{ic_f},{ref_wallet_incog_offset}']
return self.walletconv_in(None,uopts+hi_opt,oo=True,icls=IncogWalletHidden)
def ref_hincog_conv_old(self):
@ -139,7 +139,7 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
def ref_hincog_conv_out(self,ic_f=None):
if not ic_f:
ic_f = joinpath(self.tmpdir,hincog_fn)
hi_parms = '{},{}'.format(ic_f,ref_wallet_incog_offset)
hi_parms = f'{ic_f},{ref_wallet_incog_offset}'
sl_parm = '-l' + str(self.seed_len)
return self.walletconv_out('hi',
uopts = ['-J',hi_parms,sl_parm],
@ -154,16 +154,16 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
ic_img = joinpath(self.tmpdir,'hincog_blkdev_img')
do_run(['dd','if=/dev/zero','of='+ic_img,'bs=1K','count=1'])
ic_dev = do_run(['sudo','/sbin/losetup','-f']).stdout.strip().decode()
ic_dev_mode_orig = '{:o}'.format(os.stat(ic_dev).st_mode & 0xfff)
ic_dev_mode_orig = '{:o}'.format( os.stat(ic_dev).st_mode & 0xfff )
ic_dev_mode = '0666'
imsg("Changing permissions on loop device to '{}'".format(ic_dev_mode))
imsg(f'Changing permissions on loop device to {ic_dev_mode!r}')
do_run(['sudo','chmod',ic_dev_mode,ic_dev])
imsg("Attaching loop device '{}'".format(ic_dev))
imsg(f'Attaching loop device {ic_dev!r}')
do_run(['sudo','/sbin/losetup',ic_dev,ic_img])
self.ref_hincog_conv_out(ic_f=ic_dev)
imsg("Detaching loop device '{}'".format(ic_dev))
imsg(f'Detaching loop device {ic_dev!r}')
do_run(['sudo','/sbin/losetup','-d',ic_dev])
imsg("Resetting permissions on loop device to '{}'".format(ic_dev_mode_orig))
imsg(f'Resetting permissions on loop device to {ic_dev_mode_orig!r}')
do_run(['sudo','chmod',ic_dev_mode_orig,ic_dev])
return 'ok'
@ -200,7 +200,7 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
infile = joinpath(ref_dir,self.seed_id+'.mmwords')
t = self.spawn('mmgen-walletconv',[self.usr_rand_arg]+opts+[infile],extra_desc='(convert)')
add_args = ['-l{}'.format(self.seed_len)]
add_args = [f'-l{self.seed_len}']
t.license()
pw = issubclass(wcls,WalletEnc) and wcls != Brainwallet
if pw:

View file

@ -82,7 +82,7 @@ class TestSuiteXMRWallet(TestSuiteBase):
self.init_proxy()
self.tx_relay_daemon_parm = 'localhost:{}'.format(self.users['bob'].md.rpc_port)
self.tx_relay_daemon_parm = 'localhost:{}'.format( self.users['bob'].md.rpc_port )
self.tx_relay_daemon_proxy_parm = (
self.tx_relay_daemon_parm + f':127.0.0.1:{self.socks_port}' # proxy must be IP, not 'localhost'
if self.use_proxy else None )
@ -291,7 +291,10 @@ class TestSuiteXMRWallet(TestSuiteBase):
def create_wallets(self,user,wallet=None):
assert wallet is None or is_int(wallet), 'wallet arg'
data = self.users[user]
run('rm -f {}*'.format( data.walletfile_fs.format(wallet or '*') ),shell=True)
run(
'rm -f {}*'.format( data.walletfile_fs.format(wallet or '*') ),
shell = True
)
dir_opt = [f'--wallet-dir={data.udir}']
t = self.spawn(
'mmgen-xmrwallet',
@ -626,7 +629,7 @@ class TestSuiteXMRWallet(TestSuiteBase):
def get_status(self,ret):
if ret['status'] != 'OK':
imsg( 'RPC status: {}'.format(ret['status']) )
imsg( 'RPC status: {}'.format( ret['status'] ))
return ret['status']
def do_msg(self,extra_desc=None):

View file

@ -189,8 +189,8 @@ def is_coin_addr_loc(s):
msg_w = 35
def test_msg(m):
m2 = 'Testing {}'.format(m)
msg_r(green(m2+'\n') if opt.verbose else '{:{w}}'.format(m2,w=msg_w+8))
m2 = f'Testing {m}'
msg_r(green(m2+'\n') if opt.verbose else '{:{w}}'.format( m2, w=msg_w+8 ))
compressed = ('','compressed')['C' in proto.mmtypes]
segwit = ('','segwit')['S' in proto.mmtypes]
@ -211,14 +211,14 @@ class MMGenToolTestUtils(object):
tool_args +
kwargs.split()
)
if extra_msg: extra_msg = '({})'.format(extra_msg)
if extra_msg: extra_msg = f'({extra_msg})'
full_name = ' '.join([name.lower()]+add_opts+kwargs.split()+extra_msg.split())
if not silent:
if opt.verbose:
sys.stderr.write(green('Testing {}\nExecuting '.format(full_name)))
sys.stderr.write(green(f'Testing {full_name}\nExecuting '))
sys.stderr.write(cyan(' '.join(sys_cmd)+'\n'))
else:
msg_r('Testing {:{w}}'.format(full_name+':',w=msg_w))
msg_r('Testing {:{w}}'.format( full_name+':', w=msg_w ))
cp = run(sys_cmd,stdout=PIPE,stderr=PIPE)
out = cp.stdout
@ -229,8 +229,11 @@ class MMGenToolTestUtils(object):
if not binary:
out = out.decode()
if cp.returncode != 0:
msg('{}\n{}\n{}'.format(red('FAILED'),yellow('Command stderr output:'),err.decode()))
rdie(1,'Called process returned with an error (retcode {})'.format(cp.returncode))
msg('{}\n{}\n{}'.format(
red('FAILED'),
yellow('Command stderr output:'),
err.decode() ))
rdie(1,f'Called process returned with an error (retcode {cp.returncode})')
return (out,out.rstrip())[bool(strip)]
def run_cmd_chk(self,name,f1,f2,kwargs='',extra_msg='',strip_hex=False,add_opts=[]):
@ -256,13 +259,13 @@ class MMGenToolTestUtils(object):
def run_cmd_out(self,name,carg=None,Return=False,kwargs='',fn_idx='',extra_msg='',
literal=False,chkdata='',hush=False,add_opts=[]):
if carg: write_to_tmpfile(cfg,'{}{}.in'.format(name,fn_idx),carg+'\n')
if carg: write_to_tmpfile(cfg,f'{name}{fn_idx}.in',carg+'\n')
ret = self.run_cmd(name,([],[carg])[bool(carg)],kwargs=kwargs,
extra_msg=extra_msg,add_opts=add_opts)
if carg: vmsg('In: ' + repr(carg))
vmsg('Out: ' + (repr(ret),ret)[literal])
if ret or ret == '':
write_to_tmpfile(cfg,'{}{}.out'.format(name,fn_idx),ret+'\n')
write_to_tmpfile(cfg,f'{name}{fn_idx}.out',ret+'\n')
if chkdata:
cmp_or_die(ret,chkdata)
return
@ -270,7 +273,7 @@ class MMGenToolTestUtils(object):
else:
if not hush: ok()
else:
rdie(3,"Error for command '{}'".format(name))
rdie(3,f'Error for command {name!r}')
def run_cmd_randinput(self,name,strip=True,add_opts=[]):
s = os.urandom(128)
@ -280,7 +283,7 @@ class MMGenToolTestUtils(object):
fn = name+'.out'
write_to_tmpfile(cfg,fn,ret+'\n')
ok()
vmsg('Returned: {}'.format(ret))
vmsg(f'Returned: {ret}')
tu = MMGenToolTestUtils()
@ -290,7 +293,7 @@ def ok_or_die(val,chk_func,s,skip_ok=False):
if ret:
if not skip_ok: ok()
else:
rdie(3,"Returned value '{}' is not a {}".format((val,s)))
rdie(3,f'Returned value {val!r} is not a {s}')
class MMGenToolTestCmds(object):
@ -320,8 +323,8 @@ class MMGenToolTestCmds(object):
for n,f,m in (
(1,f1,''),
(2,f2,compressed),
(3,f3,'{} for {}'.format(compressed or 'uncompressed',segwit or 'p2pkh')),
(4,f4,'{} for {}'.format(compressed or 'uncompressed',bech32 or 'p2pkh'))
(3,f3,'{} for {}'.format( compressed or 'uncompressed', segwit or 'p2pkh' )),
(4,f4,'{} for {}'.format( compressed or 'uncompressed', bech32 or 'p2pkh' ))
):
wif = read_from_file(f).split()[0]
tu.run_cmd_out(name,wif,fn_idx=n,extra_msg=m)
@ -330,8 +333,8 @@ class MMGenToolTestCmds(object):
for n,k in enumerate(('',compressed,segwit,bech32)):
ao = ['--type='+k] if k else []
ret = tu.run_cmd(name,[keys[n]],add_opts=ao).rstrip()
iaddr = read_from_tmpfile(cfg,'randpair{}.out'.format(n+1)).split()[-1]
vmsg('Out: {}'.format(ret))
iaddr = read_from_tmpfile(cfg,f'randpair{n+1}.out').split()[-1]
vmsg(f'Out: {ret}')
cmp_or_die(iaddr,ret)
ok()
def hex2wif(self,name,f1,f2,f3,f4):
@ -341,7 +344,7 @@ class MMGenToolTestCmds(object):
def addr2pubhash(self,name,f1,f2,f3,f4):
for n,f,m,ao in (
(1,f1,'',[]),
(2,f2,'from {}'.format(compressed or 'uncompressed'),[]),
(2,f2,'from {}'.format( compressed or 'uncompressed' ),[]),
(4,f4,'',type_bech32_arg),
):
addr = read_from_file(f).split()[-1]
@ -349,7 +352,7 @@ class MMGenToolTestCmds(object):
def pubhash2addr(self,name,f1,f2,f3,f4,f5,f6,f7,f8):
for n,fi,fo,m,ao in (
(1,f1,f2,'',[]),
(2,f3,f4,'from {}'.format(compressed or 'uncompressed'),[]),
(2,f3,f4,'from {}'.format( compressed or 'uncompressed' ),[]),
(4,f7,f8,'',type_bech32_arg)
):
tu.run_cmd_chk(name,fi,fo,add_opts=ao,extra_msg=m)
@ -423,7 +426,11 @@ def gen_deps_for_cmd(cmd,cdata):
name,code = cdata
io,count = (code[:-1],int(code[-1])) if code[-1] in '0123456789' else (code,1)
for c in range(count):
fns += ['{}{}{}'.format(name,('',c+1)[count > 1],('.out','.in')[ch=='i']) for ch in io]
fns += ['{}{}{}'.format(
name,
(c+1 if count > 1 else ''),
('.in' if ch == 'i' else '.out'),
) for ch in io]
return fns
def do_cmds(cmd_group):
@ -441,21 +448,21 @@ try:
cmd = cmd_args[0]
if cmd in cmd_data:
cleandir(cfg['tmpdir'],do_msg=True)
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
do_cmds(cmd)
elif cmd == 'clean':
cleandir(cfg['tmpdir'],do_msg=True)
sys.exit(0)
else:
die(1,"'{}': unrecognized command".format(cmd))
die(1,f'{cmd!r}: unrecognized command')
else:
cleandir(cfg['tmpdir'],do_msg=True)
for cmd in cmd_data:
msg('Running tests for {}:'.format(cmd_data[cmd]['desc']))
msg('Running tests for {}:'.format( cmd_data[cmd]['desc'] ))
do_cmds(cmd)
if cmd is not list(cmd_data.keys())[-1]: msg('')
except KeyboardInterrupt:
die(1,green('\nExiting at user request'))
t = int(time.time()) - start_time
gmsg('All requested tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60))
gmsg(f'All requested tests finished OK, elapsed time: {t//60:02}:{t%60:02}')

View file

@ -802,7 +802,7 @@ async def run_test(gid,cmd_name):
('mainnet','testnet')[proto.testnet] )
if k in data:
data = data[k]
m2 = ' ({})'.format(k)
m2 = f' ({k})'
else:
qmsg(f'-- no data for {cmd_name} ({k}) - skipping')
return
@ -810,16 +810,23 @@ async def run_test(gid,cmd_name):
if proto.coin != 'BTC' or proto.testnet:
return
m2 = ''
m = '{} {}{}'.format(purple('Testing'), cmd_name if opt.names else docstring_head(tc[cmd_name]),m2)
m = '{} {}{}'.format(
purple('Testing'),
cmd_name if opt.names else docstring_head(tc[cmd_name]),
m2 )
msg_r(green(m)+'\n' if opt.verbose else m)
def fork_cmd(cmd_name,args,out,opts):
cmd = list(tool_cmd) + (opts or []) + [cmd_name] + args
vmsg('{} {}'.format(green('Executing'),cyan(' '.join(cmd))))
vmsg('{} {}'.format(
green('Executing'),
cyan(' '.join(cmd)) ))
cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
try: cmd_out = cp.stdout.decode()
except: cmd_out = cp.stdout
try:
cmd_out = cp.stdout.decode()
except:
cmd_out = cp.stdout
if cp.stderr:
vmsg(cp.stderr.strip().decode())
if cp.returncode != 0:
@ -858,7 +865,7 @@ async def run_test(gid,cmd_name):
else: # child
os.close(fd0)
os.write(fd1,stdin_input)
vmsg('Input: {!r}'.format(stdin_input))
vmsg(f'Input: {stdin_input!r}')
sys.exit(0)
else:
ret = method(*aargs,**kwargs)
@ -896,8 +903,8 @@ async def run_test(gid,cmd_name):
continue
cmd_out = await run_func(cmd_name,args,out,opts,mmtype)
try: vmsg('Output:\n{}\n'.format(cmd_out))
except: vmsg('Output:\n{}\n'.format(repr(cmd_out)))
try: vmsg(f'Output:\n{cmd_out}\n')
except: vmsg(f'Output:\n{cmd_out!r}\n')
def check_output(out,chk):
if isinstance(chk,str):
@ -1051,4 +1058,4 @@ async def main():
run_session(main())
t = int(time.time()) - start_time
gmsg('All requested tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60))
gmsg(f'All requested tests finished OK, elapsed time: {t//60:02}:{t%60:02}')

View file

@ -55,7 +55,7 @@ file_pfx = 'nt_' if opt.node_tools else 'ut_'
def exit_msg():
t = int(time.time()) - start_time
gmsg('All requested unit tests finished OK, elapsed time: {:02}:{:02}'.format(t//60,t%60))
gmsg(f'All requested unit tests finished OK, elapsed time: {t//60:02}:{t%60:02}')
all_tests = sorted(
[fn[3:-3] for fn in os.listdir(os.path.join(repo_root,'test','unit_tests_d')) if fn[:3] == file_pfx])
@ -83,12 +83,12 @@ class UnitTestHelpers(object):
m_noraise = "\nillegal action 'bad {}' failed to raise exception {!r}"
for (desc,exc_chk,emsg_chk,func) in data:
try:
vmsg_r(' bad {:{w}}'.format(desc+':',w=desc_w+1))
vmsg_r(' bad {:{w}}'.format( desc+':', w=desc_w+1 ))
func()
except Exception as e:
exc = type(e).__name__
emsg = e.args[0]
vmsg(' {:{w}} [{}]'.format(exc,emsg,w=exc_w))
vmsg(' {:{w}} [{}]'.format( exc, emsg, w=exc_w ))
assert exc == exc_chk, m_exc.format(exc,exc_chk)
assert re.search(emsg_chk,emsg), m_err.format(emsg,emsg_chk)
else:
@ -97,7 +97,7 @@ class UnitTestHelpers(object):
tests_seen = []
def run_test(test,subtest=None):
modname = 'test.unit_tests_d.{}{}'.format(file_pfx,test)
modname = f'test.unit_tests_d.{file_pfx}{test}'
mod = importlib.import_module(modname)
def run_subtest(subtest):

View file

@ -157,8 +157,9 @@ class unit_test(object):
qmsg_r('\nChecking hex-to-base conversion:')
for base,data in self.vectors.items():
fs = " {h:%s} {p:<6} {r}" % max(len(d[0][0]) for d in data)
if not opt.verbose: qmsg_r(' {}'.format(base))
vmsg('\nBase: {}'.format(base))
if not opt.verbose:
qmsg_r(f' {base}')
vmsg(f'\nBase: {base}')
vmsg(fs.format(h='Input',p='Pad',r='Output'))
for (hexstr,pad),ret_chk in data:
ret = baseconv.fromhex(hexstr,wl_id=base,pad=pad,tostr=True)
@ -172,8 +173,9 @@ class unit_test(object):
qmsg_r('\nChecking base-to-hex conversion:')
for base,data in self.vectors.items():
fs = " {h:%s} {p:<6} {r}" % max(len(d[1]) for d in data)
if not opt.verbose: qmsg_r(' {}'.format(base))
vmsg('\nBase: {}'.format(base))
if not opt.verbose:
qmsg_r(f' {base}')
vmsg(f'\nBase: {base}')
vmsg(fs.format(h='Input',p='Pad',r='Output'))
for (hexstr,pad),ret_chk in data:
if type(pad) == int:

View file

@ -99,7 +99,7 @@ class unit_test(object):
chk = tuple(v[1].split())
vmsg(' '+v[1])
res = bip39.fromhex(v[0],'bip39')
assert res == chk, 'mismatch:\nres: {}\nchk: {}'.format(res,chk)
assert res == chk, f'mismatch:\nres: {res}\nchk: {chk}'
vmsg('')
qmsg('Checking mnemonic to seed conversion:')
@ -107,7 +107,7 @@ class unit_test(object):
chk = v[0]
vmsg(' '+chk)
res = bip39.tohex(v[1].split(),'bip39')
assert res == chk, 'mismatch:\nres: {}\nchk: {}'.format(res,chk)
assert res == chk, f'mismatch:\nres: {res}\nchk: {chk}'
vmsg('')
qmsg('Checking error handling:')

View file

@ -76,7 +76,9 @@ def test_cmds(op):
if cp.returncode:
ydie(1,f'Unable to execute {d.exec_fn}')
else:
vmsg('{:16} {}'.format(d.exec_fn+':',cp.stdout.decode().splitlines()[0]))
vmsg('{:16} {}'.format(
d.exec_fn+':',
cp.stdout.decode().splitlines()[0] ))
else:
if opt.quiet:
msg_r('.')

View file

@ -38,7 +38,7 @@ class unit_test(object):
)):
vmsg(f'Cfg {n+1}:')
for k in ('opt','flag'):
vmsg(' {}s: {}'.format( k, getattr(cls,k) ))
vmsg(f' {k}s: {getattr(cls,k)}')
yield cls
return list(gen())

View file

@ -21,7 +21,7 @@ class unit_test(object):
def bad4(): d.clear()
def bad5(): d.update(d)
def odie(n): rdie(3,"\nillegal action '{}' failed to raise exception".format(bad_msg[n]))
def odie(n): rdie(3,f'\nillegal action {bad_msg[n]!r} failed to raise exception')
def omsg(e): vmsg(' - ' + e.args[0])
msg_r('Testing class IndexedDict...')

View file

@ -50,7 +50,7 @@ class unit_test(object):
for j,k in (('uniq_ss_len','usl'),('shortest_word','sw'),('longest_word','lw')):
a = getattr(mn_entry(wl_id),j)
b = self.vectors[wl_id][k]
assert a == b, '{}:{} {} != {}'.format(wl_id,j,a,b)
assert a == b, f'{wl_id}:{j} {a} != {b}'
msg('OK')
msg_r('Testing idx()...')
@ -62,11 +62,11 @@ class unit_test(object):
for entry_mode in ('full','short'):
for a,word in enumerate(m.wl):
b = m.idx(word,entry_mode)
assert a == b, '{} != {} ({!r} - entry mode: {!r})'.format(a,b,word,entry_mode)
assert a == b, f'{a} != {b} ({word!r} - entry mode: {entry_mode!r})'
a = None
for word in junk.split():
b = m.idx(word,entry_mode)
assert a == b, '{} != {} ({!r} - entry mode: {!r})'.format(a,b,word,entry_mode)
assert a == b, f'{a} != {b} ({word!r} - entry mode: {entry_mode!r})'
if 'idx_minimal' in self.vectors[wl_id]:
for vec in self.vectors[wl_id]['idx_minimal']:
chk = vec[1]

View file

@ -29,7 +29,7 @@ def cfg_file_auth_test(proto,d):
d.stop()
def do_msg(rpc):
qmsg(' Testing backend {!r}'.format(type(rpc.backend).__name__))
qmsg(' Testing backend {!r}'.format( type(rpc.backend).__name__ ))
class init_test:

View file

@ -35,7 +35,7 @@ class unit_test(object):
if opt.quiet:
omsg_r('.')
else:
msg_r('\n password {:9} '.format(pw_disp))
msg_r(f'\n password {pw_disp:9} ')
ret = scrypt_hash_passphrase(pw,salt,'1').hex()
assert ret == res, ret
@ -48,11 +48,11 @@ class unit_test(object):
if opt.quiet:
omsg_r('.')
else:
msg_r('\n {!r:3}: {!r:12} '.format(hp,g.hash_presets[hp]))
msg_r(f'\n {hp!r:3}: {g.hash_presets[hp]!r:12} ')
st = time.time()
ret = scrypt_hash_passphrase(pw,salt,hp).hex()
t = time.time() - st
vmsg(' {:0.4f} secs'.format(t))
vmsg(f' {t:0.4f} secs')
assert ret == res, ret
if opt.quiet:

View file

@ -50,7 +50,7 @@ class unit_test(object):
test_data = test_data_master[str(master_idx)]
for id_str in (None,'default','φυβαρ'):
msg_r('Testing basic ops (id_str={!r}, master_idx={})...'.format(id_str,master_idx))
msg_r(f'Testing basic ops (id_str={id_str!r}, master_idx={master_idx})...')
vmsg('')
for a,b,c,d,e,f,h,i,p in test_data[id_str if id_str is not None else 'default']:
@ -65,7 +65,7 @@ class unit_test(object):
assert A == share_count, A
s = shares.format()
vmsg_r('\n{}'.format(s))
vmsg_r(f'\n{s}')
assert len(s.strip().split('\n')) == share_count+6, s
if master_idx:
@ -103,7 +103,7 @@ class unit_test(object):
shares = seed.split(SeedShareIdx.max_val)
s = shares.format()
# vmsg_r('\n{}'.format(s))
# vmsg_r(f'\n{s}')
assert len(s.strip().split('\n')) == 1030, s
A = shares.get_share_by_idx(1024).sid
@ -119,7 +119,7 @@ class unit_test(object):
def collisions(seed_hex,ss_count,last_sid,collisions_chk,master_idx):
msg_r('Testing Seed ID collisions ({} seed shares, master_idx={})...'.format(ss_count,master_idx))
msg_r(f'Testing Seed ID collisions ({ss_count} seed shares, master_idx={master_idx})...')
vmsg('')
seed_bin = bytes.fromhex(seed_hex)
@ -139,7 +139,7 @@ class unit_test(object):
collisions += shares.data['long'][sid][1]
assert collisions == collisions_chk, collisions
vmsg_r('{} collisions, last_sid {}'.format(collisions,last_sid))
vmsg_r(f'{collisions} collisions, last_sid {last_sid}')
msg('OK')
def last_share_collisions():
@ -155,7 +155,7 @@ class unit_test(object):
assert collisions == 2, collisions
assert lsid == 'B5B8AD09', lsid
SeedShareIdx.max_val = ssm_save
vmsg_r('{} collisions, last_share sid {}'.format(collisions,lsid))
vmsg_r(f'{collisions} collisions, last_share sid {lsid}')
msg('..OK')
basic_ops(master_idx=None)

View file

@ -76,14 +76,14 @@ class unit_test(object):
ss_idx = str(g.subseeds+2) + 'S'
a = seed.subseed(ss_idx).sid
b = [e for e in s_lines if ' {}:'.format(ss_idx) in e][0].strip().split()[3]
b = [e for e in s_lines if f' {ss_idx}:' in e][0].strip().split()[3]
assert a == b, b
s = seed.subseeds.format(1,10)
s_lines = s.strip().split('\n')
assert len(s_lines) == 14, s
vmsg_r('\n{}'.format(s))
vmsg_r(f'\n{s}')
msg('OK')
@ -135,7 +135,7 @@ class unit_test(object):
r = SubSeedIdxRange('3-3')
assert r.items == [3], r.items
r = SubSeedIdxRange('{}-{}'.format(g.subseeds-1,g.subseeds))
r = SubSeedIdxRange(f'{g.subseeds-1}-{g.subseeds}')
assert r.items == [g.subseeds-1,g.subseeds], r.items
for n,e in enumerate(SubSeedIdxRange('1-5').iterate(),1):
@ -153,7 +153,7 @@ class unit_test(object):
last_idx = str(ss_count) + ltr
msg_r('Testing Seed ID collisions ({} subseed pairs)...'.format(ss_count))
msg_r(f'Testing Seed ID collisions ({ss_count} subseed pairs)...')
seed_bin = bytes.fromhex('12abcdef' * 8) # 95B3D78D
seed = Seed(seed_bin)
@ -173,7 +173,7 @@ class unit_test(object):
collisions += ss.data[k][sid][1]
assert collisions == collisions_chk, collisions
vmsg_r('\n{} collisions, last_sid {}'.format(collisions,last_sid))
vmsg_r(f'\n{collisions} collisions, last_sid {last_sid}')
msg('OK')
basic_ops()

View file

@ -39,7 +39,7 @@ class unit_test(object):
if opt.verbose:
Msg('\n====================================================')
Msg_r('.' if opt.quiet else '{:>3}) {}\n'.format(n,desc))
Msg_r('.' if opt.quiet else f'{n:>3}) {desc}\n')
if opt.verbose:
Pmsg(d)
Msg('----------------------------------------------------')
@ -53,13 +53,13 @@ class unit_test(object):
# inputs
a,b = d['vin'],dt['txins']
for i in range(len(a)):
assert a[i]['txid'] == b[i]['txid'],'TxID of input {} does not match'.format(i)
assert a[i]['vout'] == b[i]['vout'],'vout of input {} does not match'.format(i)
assert a[i]['txid'] == b[i]['txid'],f'TxID of input {i} does not match'
assert a[i]['vout'] == b[i]['vout'],f'vout of input {i} does not match'
assert a[i]['sequence'] == int(b[i]['nSeq'],16),(
'nSeq of input {} does not match'.format(i))
f'nSeq of input {i} does not match')
if 'txinwitness' in a[i]:
assert a[i]['txinwitness'] == b[i]['witness'],(
'witness of input {} does not match'.format(i))
f'witness of input {i} does not match')
# outputs
a,b = d['vout'],dt['txouts']