Use subprocess.run() throughout, related cleanups
This commit is contained in:
parent
d8252c56ac
commit
6cf4fcc7e4
17 changed files with 168 additions and 161 deletions
|
|
@ -20,7 +20,8 @@
|
|||
mmgen-autosign: Auto-sign MMGen transactions
|
||||
"""
|
||||
|
||||
import sys,os,subprocess,time,signal,shutil
|
||||
import sys,os,time,signal,shutil
|
||||
from subprocess import run,PIPE,DEVNULL
|
||||
from stat import *
|
||||
|
||||
mountpoint = '/mnt/tx'
|
||||
|
|
@ -151,7 +152,7 @@ def get_wallet_files():
|
|||
|
||||
def do_mount():
|
||||
if not os.path.ismount(mountpoint):
|
||||
if subprocess.Popen(['mount',mountpoint],stderr=subprocess.PIPE,stdout=subprocess.PIPE).wait() == 0:
|
||||
if run(['mount',mountpoint],stderr=DEVNULL,stdout=DEVNULL).returncode == 0:
|
||||
msg('Mounting '+mountpoint)
|
||||
try:
|
||||
ds = os.stat(tx_dir)
|
||||
|
|
@ -164,9 +165,9 @@ def do_mount():
|
|||
|
||||
def do_umount():
|
||||
if os.path.ismount(mountpoint):
|
||||
subprocess.call(['sync'])
|
||||
run(['sync'],check=True)
|
||||
msg('Unmounting '+mountpoint)
|
||||
subprocess.call(['umount',mountpoint])
|
||||
run(['umount',mountpoint],check=True)
|
||||
|
||||
def sign_tx_file(txfile,signed_txs):
|
||||
try:
|
||||
|
|
@ -304,7 +305,7 @@ def wipe_existing_key():
|
|||
except: pass
|
||||
else:
|
||||
msg('\nWiping existing key {}'.format(fn))
|
||||
subprocess.call(['wipe','-cf',fn])
|
||||
run(['wipe','-cf',fn],check=True)
|
||||
|
||||
def create_key():
|
||||
kdata = os.urandom(32).hex()
|
||||
|
|
@ -422,7 +423,7 @@ def check_access(fn,desc='status LED control',init_val=None):
|
|||
|
||||
def check_wipe_present():
|
||||
try:
|
||||
subprocess.Popen(['wipe','-v'],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||||
run(['wipe','-v'],stdout=DEVNULL,stderr=DEVNULL,check=True)
|
||||
except:
|
||||
die(2,"The 'wipe' utility must be installed before running this program")
|
||||
|
||||
|
|
|
|||
|
|
@ -221,12 +221,12 @@ if invoked_as == 'passchg' and ss_in.infile.dirname == g.data_dir:
|
|||
confirm_or_raise(m1,m2,exit_msg='Password not changed')
|
||||
ss_out.write_to_file(desc='New wallet',outdir=g.data_dir)
|
||||
msg('Securely deleting old wallet')
|
||||
from subprocess import check_output,CalledProcessError
|
||||
sd_cmd = (['wipe','-sf'],['sdelete','-p','20'])[g.platform=='win']
|
||||
from subprocess import run
|
||||
wipe_cmd = ['sdelete','-p','20'] if g.platform=='win' else ['wipe','-sf']
|
||||
try:
|
||||
check_output(sd_cmd + [ss_in.infile.name])
|
||||
run(wipe_cmd + [ss_in.infile.name],check=True)
|
||||
except:
|
||||
ymsg("WARNING: '{}' command failed, using regular file delete instead".format(sd_cmd[0]))
|
||||
ymsg("WARNING: '{}' command failed, using regular file delete instead".format(wipe_cmd[0]))
|
||||
os.unlink(ss_in.infile.name)
|
||||
else:
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@
|
|||
regtest: Coin daemon regression test mode setup and operations for the MMGen suite
|
||||
"""
|
||||
|
||||
import os,subprocess,time,shutil
|
||||
import os,time,shutil
|
||||
from subprocess import run,PIPE
|
||||
from mmgen.common import *
|
||||
PIPE = subprocess.PIPE
|
||||
|
||||
data_dir = os.path.join(g.data_dir_root,'regtest',g.coin.lower())
|
||||
daemon_dir = os.path.join(data_dir,'regtest')
|
||||
|
|
@ -54,8 +54,8 @@ def start_daemon(user,quiet=False,daemon=True,reindex=False):
|
|||
if daemon: cmd += ('--daemon',)
|
||||
if reindex: cmd += ('--reindex',)
|
||||
if not g.debug or quiet: vmsg('{}'.format(' '.join(cmd)))
|
||||
p = subprocess.Popen(cmd,stdout=PIPE,stderr=PIPE)
|
||||
err = process_output(p,silent=False)[1]
|
||||
cp = run(cmd,stdout=PIPE,stderr=PIPE)
|
||||
err = process_output(cp,silent=False)[1]
|
||||
if err:
|
||||
rdie(1,'Error starting the {} daemon:\n{}'.format(g.proto.name.capitalize(),err))
|
||||
|
||||
|
|
@ -74,16 +74,17 @@ def start_cmd(*args,**kwargs):
|
|||
vmsg('{}'.format(' '.join(cmd)))
|
||||
ip = op = ep = (PIPE,None)['no_pipe' in kwargs and kwargs['no_pipe']]
|
||||
if 'pipe_stdout_only' in kwargs and kwargs['pipe_stdout_only']: ip = ep = None
|
||||
return subprocess.Popen(cmd,stdin=ip,stdout=op,stderr=ep)
|
||||
return run(cmd,stdin=ip,stdout=op,stderr=ep)
|
||||
|
||||
def test_daemon():
|
||||
p = start_cmd('cli','getblockcount',quiet=True)
|
||||
err = process_output(p,silent=True)[1]
|
||||
ret,state = p.wait(),None
|
||||
cp = start_cmd('cli','getblockcount',quiet=True)
|
||||
err = process_output(cp,silent=True)[1]
|
||||
if "error: couldn't connect" in err or "error: Could not connect" in err:
|
||||
state = 'stopped'
|
||||
if not state: state = ('busy','ready')[ret==0]
|
||||
return state
|
||||
return 'stopped'
|
||||
elif cp.returncode == 0:
|
||||
return 'ready'
|
||||
else:
|
||||
return 'busy'
|
||||
|
||||
def wait_for_daemon(state,silent=False,nonl=False):
|
||||
for i in range(200):
|
||||
|
|
@ -92,7 +93,8 @@ def wait_for_daemon(state,silent=False,nonl=False):
|
|||
if opt.verbose: msg('returning state '+ret)
|
||||
else: gmsg_r('.')
|
||||
if ret == state and not nonl: msg('')
|
||||
if ret == state: return True
|
||||
if ret == state:
|
||||
return True
|
||||
time.sleep(1)
|
||||
else:
|
||||
die(1,'timeout exceeded')
|
||||
|
|
@ -132,10 +134,11 @@ def create_data_dir():
|
|||
try: os.makedirs(data_dir)
|
||||
except: pass
|
||||
|
||||
def process_output(p,silent=False):
|
||||
out = p.stdout.read().decode()
|
||||
if g.platform == 'win' and not opt.verbose: Msg_r(' \b')
|
||||
err = p.stderr.read().decode()
|
||||
def process_output(cp,silent=False):
|
||||
out = cp.stdout.decode()
|
||||
if g.platform == 'win' and not opt.verbose:
|
||||
Msg_r(' \b')
|
||||
err = cp.stderr.decode()
|
||||
if g.debug or not silent:
|
||||
vmsg('stdout: [{}]'.format(out.strip()))
|
||||
vmsg('stderr: [{}]'.format(err.strip()))
|
||||
|
|
@ -153,22 +156,19 @@ def stop_and_wait(silent=False,nonl=False,stop_silent=False,ignore_noconnect_err
|
|||
def send(addr,amt):
|
||||
user('miner')
|
||||
gmsg('Sending {} {} to address {}'.format(amt,g.coin,addr))
|
||||
p = start_cmd('cli','sendtoaddress',addr,str(amt))
|
||||
process_output(p)
|
||||
p.wait()
|
||||
cp = start_cmd('cli','sendtoaddress',addr,str(amt))
|
||||
process_output(cp)
|
||||
generate(1)
|
||||
|
||||
def show_mempool():
|
||||
p = start_cmd('cli','getrawmempool')
|
||||
cp = start_cmd('cli','getrawmempool')
|
||||
from ast import literal_eval
|
||||
pp_msg(literal_eval(p.stdout.read().decode()))
|
||||
p.wait()
|
||||
pp_msg(literal_eval(cp.stdout.decode()))
|
||||
|
||||
def cli(*args):
|
||||
p = start_cmd(*(('cli',) + args))
|
||||
Msg_r(p.stdout.read().decode())
|
||||
msg_r(p.stderr.read().decode())
|
||||
p.wait()
|
||||
cp = start_cmd(*(('cli',) + args))
|
||||
Msg_r(cp.stdout.decode())
|
||||
msg_r(cp.stderr.decode())
|
||||
|
||||
def fork(coin):
|
||||
coin = coin.upper()
|
||||
|
|
@ -235,7 +235,7 @@ def get_current_user_win(quiet=False):
|
|||
if test_daemon() == 'stopped': return None
|
||||
logfile = os.path.join(daemon_dir,'debug.log')
|
||||
for ss in ('Wallet completed loading in','Using wallet wallet'):
|
||||
o = start_cmd('grep',ss,logfile,quiet=True).stdout.readlines()
|
||||
o = start_cmd('grep',ss,logfile,quiet=True).stdout.splitlines()
|
||||
if o:
|
||||
last_line = o[-1].decode()
|
||||
break
|
||||
|
|
@ -256,9 +256,10 @@ def get_current_user_win(quiet=False):
|
|||
return None
|
||||
|
||||
def get_current_user_unix(quiet=False):
|
||||
p = start_cmd('pgrep','-af','{}.*--rpcport={}.*'.format(g.proto.daemon_name,rpc_port),quiet=True)
|
||||
cmdline = p.stdout.read().decode()
|
||||
if not cmdline: return None
|
||||
cp = start_cmd('pgrep','-af','{}.*--rpcport={}.*'.format(g.proto.daemon_name,rpc_port),quiet=True)
|
||||
cmdline = cp.stdout.decode()
|
||||
if not cmdline:
|
||||
return None
|
||||
for k in ('miner','bob','alice'):
|
||||
if 'wallet.dat.'+k in cmdline:
|
||||
if not quiet: msg('Current user is {}'.format(k.capitalize()))
|
||||
|
|
@ -292,24 +293,23 @@ def user(user=None,quiet=False):
|
|||
def stop(silent=False,ignore_noconnect_error=True):
|
||||
if test_daemon() != 'stopped' and not silent:
|
||||
gmsg('Stopping {} regtest daemon for coin {}'.format(g.proto.name,g.coin))
|
||||
p = start_cmd('cli','stop')
|
||||
err = process_output(p)[1]
|
||||
cp = start_cmd('cli','stop')
|
||||
err = process_output(cp)[1]
|
||||
if err:
|
||||
if "couldn't connect to server" in err and not ignore_noconnect_error:
|
||||
rdie(1,'Error stopping the {} daemon:\n{}'.format(g.proto.name.capitalize(),err))
|
||||
msg(err)
|
||||
return p.wait()
|
||||
|
||||
def generate(blocks=1,silent=False):
|
||||
|
||||
def have_generatetoaddress():
|
||||
p = start_cmd('cli','help','generatetoaddress')
|
||||
out,err = process_output(p,silent=True)
|
||||
cp = start_cmd('cli','help','generatetoaddress')
|
||||
out = process_output(cp,silent=True)[0]
|
||||
return not 'unknown command' in out
|
||||
|
||||
def get_miner_address():
|
||||
p = start_cmd('cli','getnewaddress')
|
||||
out,err = process_output(p,silent=True)
|
||||
cp = start_cmd('cli','getnewaddress')
|
||||
out,err = process_output(cp,silent=True)
|
||||
if not err:
|
||||
return out.strip()
|
||||
else:
|
||||
|
|
@ -321,14 +321,14 @@ def generate(blocks=1,silent=False):
|
|||
wait_for_daemon('ready',silent=True)
|
||||
|
||||
if have_generatetoaddress():
|
||||
p = start_cmd('cli','generatetoaddress',str(blocks),get_miner_address())
|
||||
cp = start_cmd('cli','generatetoaddress',str(blocks),get_miner_address())
|
||||
else:
|
||||
p = start_cmd('cli','generate',str(blocks))
|
||||
cp = start_cmd('cli','generate',str(blocks))
|
||||
|
||||
out,err = process_output(p,silent=silent)
|
||||
out,err = process_output(cp,silent=silent)
|
||||
|
||||
from ast import literal_eval
|
||||
if not out or len(literal_eval(out)) != blocks:
|
||||
rdie(1,'Error generating blocks')
|
||||
p.wait()
|
||||
|
||||
gmsg('Mined {} block{}'.format(blocks,suf(blocks)))
|
||||
|
|
|
|||
|
|
@ -882,17 +882,16 @@ class MMGenToolCmdMonero(MMGenToolCmdBase):
|
|||
exit_if_mswin('Monero wallet operations')
|
||||
|
||||
def run_cmd(cmd):
|
||||
import subprocess as sp
|
||||
p = sp.Popen(cmd,stdin=sp.PIPE,stdout=sp.PIPE,stderr=sp.PIPE)
|
||||
return p
|
||||
from subprocess import run,PIPE,DEVNULL
|
||||
return run(cmd,stdout=PIPE,stderr=DEVNULL,check=True)
|
||||
|
||||
def test_rpc():
|
||||
p = run_cmd(['monero-wallet-cli','--version'])
|
||||
if not b'Monero' in p.stdout.read():
|
||||
cp = run_cmd(['monero-wallet-cli','--version'])
|
||||
if not b'Monero' in cp.stdout:
|
||||
die(1,"Unable to run 'monero-wallet-cli'!")
|
||||
p = run_cmd(['monerod','status'])
|
||||
cp = run_cmd(['monerod','status'])
|
||||
import re
|
||||
m = re.search(r'Height: (\d+)/\d+ ',p.stdout.read().decode())
|
||||
m = re.search(r'Height: (\d+)/\d+ ',cp.stdout.decode())
|
||||
if not m:
|
||||
die(1,'Unable to connect to monerod!')
|
||||
return int(m.group(1))
|
||||
|
|
|
|||
|
|
@ -848,14 +848,16 @@ def do_pager(text):
|
|||
|
||||
for pager in pagers:
|
||||
try:
|
||||
from subprocess import Popen,PIPE
|
||||
p = Popen([pager],stdin=PIPE,shell=False)
|
||||
except: pass
|
||||
else:
|
||||
p.communicate((text+(end_msg,'')[pager=='less']).encode())
|
||||
from subprocess import run
|
||||
m = text + ('' if pager == 'less' else end_msg)
|
||||
p = run([pager],input=m.encode(),check=True)
|
||||
msg_r('\r')
|
||||
except:
|
||||
pass
|
||||
else:
|
||||
break
|
||||
else: Msg(text+end_msg)
|
||||
else:
|
||||
Msg(text+end_msg)
|
||||
|
||||
def do_license_msg(immed=False):
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@
|
|||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import sys,os,json,re
|
||||
from subprocess import Popen,PIPE
|
||||
from subprocess import run,PIPE
|
||||
from mmgen.common import *
|
||||
from mmgen.obj import CoinAddr,is_coin_addr
|
||||
|
||||
|
|
@ -185,8 +185,7 @@ def create_src(code):
|
|||
return code
|
||||
|
||||
def check_version():
|
||||
p = Popen(['solc','--version'],stdout=PIPE)
|
||||
res = p.stdout.read().decode()
|
||||
res = run(['solc','--version'],stdout=PIPE).stdout.decode()
|
||||
ver = re.search(r'Version:\s*(.*)',res).group(1)
|
||||
msg("Installed solc version: {}".format(ver))
|
||||
if not re.search(r'{}\b'.format(solc_version_pat),ver):
|
||||
|
|
@ -195,18 +194,17 @@ def check_version():
|
|||
def compile_code(code):
|
||||
check_version()
|
||||
cmd = ['solc','--optimize','--bin','--overwrite']
|
||||
if not opt.stdout: cmd += ['--output-dir', opt.outdir or '.']
|
||||
if not opt.stdout:
|
||||
cmd += ['--output-dir', opt.outdir or '.']
|
||||
cmd += ['-']
|
||||
msg('Executing: {}'.format(' '.join(cmd)))
|
||||
p = Popen(cmd,stdin=PIPE,stdout=PIPE,stderr=PIPE)
|
||||
res = p.communicate(code.encode())
|
||||
out = res[0].decode().replace('\r','')
|
||||
err = res[1].decode().replace('\r','').strip()
|
||||
rc = p.wait()
|
||||
if rc != 0:
|
||||
cp = run(cmd,input=code.encode(),stdout=PIPE,stderr=PIPE)
|
||||
out = cp.stdout.decode().replace('\r','')
|
||||
err = cp.stderr.decode().replace('\r','').strip()
|
||||
if cp.returncode != 0:
|
||||
rmsg('Solidity compiler produced the following error:')
|
||||
msg(err)
|
||||
rdie(2,'Solidity compiler exited with error (return val: {})'.format(rc))
|
||||
rdie(2,'Solidity compiler exited with error (return val: {})'.format(cp.returncode))
|
||||
if err:
|
||||
ymsg('Solidity compiler produced the following warning:')
|
||||
msg(err)
|
||||
|
|
|
|||
|
|
@ -88,20 +88,21 @@ if not 1 <= len(cmd_args) <= 2: opts.usage()
|
|||
|
||||
addr_type = MMGenAddrType(opt.type or g.proto.dfl_mmtype)
|
||||
|
||||
from subprocess import run,PIPE,DEVNULL
|
||||
def get_cmd_output(cmd,input=None):
|
||||
return run(cmd,input=input,stdout=PIPE,stderr=DEVNULL).stdout.decode().splitlines()
|
||||
|
||||
def ethkey_sec2addr(sec):
|
||||
p = Popen(['ethkey','info',sec],stdout=PIPE)
|
||||
o = p.stdout.read().decode().splitlines()
|
||||
return sec,o[-1].split()[1]
|
||||
o = get_cmd_output(['ethkey','info',sec])
|
||||
return (sec,o[-1].split()[1])
|
||||
|
||||
def keyconv_sec2addr(sec):
|
||||
p = Popen(['keyconv','-C',g.coin,sec.wif],stderr=PIPE,stdout=PIPE)
|
||||
o = p.stdout.read().decode().splitlines()
|
||||
o = get_cmd_output(['keyconv','-C',g.coin,sec.wif])
|
||||
return (o[1].split()[1],o[0].split()[1])
|
||||
|
||||
def zcash_mini_sec2addr(sec):
|
||||
p = Popen(['zcash-mini','-key','-simple'],stderr=PIPE,stdin=PIPE,stdout=PIPE)
|
||||
ret = p.communicate(sec.wif.encode()+b'\n')[0].decode().strip().split('\n')
|
||||
return (sec.wif,ret[0],ret[-1])
|
||||
o = get_cmd_output(['zcash-mini','-key','-simple'],input=(sec.wif+'\n').encode())
|
||||
return (sec.wif,o[0],o[-1])
|
||||
|
||||
def pycoin_sec2addr(sec):
|
||||
coin = ci.external_tests['testnet']['pycoin'][g.coin] if g.testnet else g.coin
|
||||
|
|
@ -247,7 +248,6 @@ def dump_test():
|
|||
qmsg(green(('\n','')[bool(opt.verbose)] + 'OK'))
|
||||
|
||||
# begin execution
|
||||
from subprocess import Popen,PIPE
|
||||
from mmgen.protocol import init_coin
|
||||
from mmgen.altcoin import CoinInfo as ci
|
||||
|
||||
|
|
|
|||
|
|
@ -45,11 +45,8 @@ class MMGenPexpect(object):
|
|||
|
||||
if opt.direct_exec:
|
||||
msg('')
|
||||
from subprocess import call,check_output
|
||||
f = (call,check_output)[bool(no_output)]
|
||||
ret = f([args[0]] + args[1:])
|
||||
if f == call and ret != 0:
|
||||
die(1,red('ERROR: process returned a non-zero exit status ({})'.format(ret)))
|
||||
from subprocess import run,DEVNULL
|
||||
run([args[0]] + args[1:],check=True,stdout=DEVNULL if no_output else None)
|
||||
else:
|
||||
if opt.pexpect_spawn:
|
||||
self.p = pexpect.spawn(args[0],args[1:],encoding='utf8')
|
||||
|
|
|
|||
|
|
@ -21,7 +21,8 @@ test/scrambletest.py: seed scrambling and addrlist data generation tests for all
|
|||
supported coins + passwords
|
||||
"""
|
||||
|
||||
import sys,os,subprocess
|
||||
import sys,os
|
||||
from subprocess import run,PIPE
|
||||
repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),os.pardir)))
|
||||
os.chdir(repo_root)
|
||||
sys.path.__setitem__(0,repo_root)
|
||||
|
|
@ -97,13 +98,10 @@ cvr_opts = ' -m trace --count --coverdir={} --file={}'.format(*init_coverage())
|
|||
cmd_base = 'python3{} cmds/mmgen-{{}}gen -qS'.format(cvr_opts)
|
||||
|
||||
def get_cmd_output(cmd):
|
||||
p = subprocess.Popen(cmd.split(),stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||||
o = p.stdout.read().decode()
|
||||
err = p.stderr.read().decode()
|
||||
exit_val = p.wait()
|
||||
if exit_val != 0:
|
||||
ydie(2,'\nSpawned program exited with error code {}:\n{}'.format(exit_val,err))
|
||||
return o.splitlines()
|
||||
cp = run(cmd.split(),stdout=PIPE,stderr=PIPE)
|
||||
if cp.returncode != 0:
|
||||
ydie(2,'\nSpawned program exited with error code {}:\n{}'.format(cp.returncode,cp.stderr.decode()))
|
||||
return cp.stdout.decode().splitlines()
|
||||
|
||||
def do_test(cmd,tdata,msg_str,addr_desc):
|
||||
vmsg(green('Executing: {}'.format(cmd)))
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ def create_shm_dir(data_dir,trash_dir):
|
|||
# Laggy flash media can cause pexpect to fail, so create a temporary directory
|
||||
# under '/dev/shm' and put datadir and tmpdirs here.
|
||||
import shutil
|
||||
from subprocess import run
|
||||
if g.platform == 'win':
|
||||
for tdir in (data_dir,trash_dir):
|
||||
try: os.listdir(tdir)
|
||||
|
|
@ -36,8 +37,10 @@ def create_shm_dir(data_dir,trash_dir):
|
|||
else:
|
||||
try: shutil.rmtree(tdir)
|
||||
except: # we couldn't remove data dir - perhaps regtest daemon is running
|
||||
try: subprocess.call(['python3',os.path.join('cmds','mmgen-regtest'),'stop'])
|
||||
except: rdie(1,"Unable to remove {!r}!".format(tdir))
|
||||
try:
|
||||
run(['python3',os.path.join('cmds','mmgen-regtest'),'stop'],check=True)
|
||||
except:
|
||||
rdie(1,"Unable to remove {!r}!".format(tdir))
|
||||
else:
|
||||
time.sleep(2)
|
||||
shutil.rmtree(tdir)
|
||||
|
|
@ -46,7 +49,7 @@ def create_shm_dir(data_dir,trash_dir):
|
|||
else:
|
||||
tdir,pfx = '/dev/shm','mmgen-test-'
|
||||
try:
|
||||
subprocess.call('rm -rf {}/{}*'.format(tdir,pfx),shell=True)
|
||||
run('rm -rf {}/{}*'.format(tdir,pfx),shell=True,check=True)
|
||||
except Exception as e:
|
||||
die(2,'Unable to delete directory tree {}/{}* ({})'.format(tdir,pfx,e.args[0]))
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@
|
|||
common.py: Shared routines and data for the test.py test suite
|
||||
"""
|
||||
|
||||
import os,time,subprocess
|
||||
import os,time
|
||||
from mmgen.common import *
|
||||
from ..common import *
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ ts_autosign.py: Autosign tests for the test.py test suite
|
|||
"""
|
||||
|
||||
import os,shutil
|
||||
from subprocess import run
|
||||
|
||||
from mmgen.globalvars import g
|
||||
from mmgen.opts import opt
|
||||
from test.common import *
|
||||
|
|
@ -107,11 +109,11 @@ class TestSuiteAutosign(TestSuiteBase):
|
|||
def do_autosign_live(opts,mountpoint,led_opts=[],gen_wallet=True):
|
||||
|
||||
def do_mount():
|
||||
try: subprocess.check_call(['mount',mountpoint])
|
||||
try: run(['mount',mountpoint],check=True)
|
||||
except: pass
|
||||
|
||||
def do_unmount():
|
||||
try: subprocess.check_call(['umount',mountpoint])
|
||||
try: run(['umount',mountpoint],check=True)
|
||||
except: pass
|
||||
omsg_r(blue('\nRemove removable device and then hit ENTER '))
|
||||
input()
|
||||
|
|
@ -180,7 +182,7 @@ class TestSuiteAutosign(TestSuiteBase):
|
|||
mountpoint = '/mnt/tx'
|
||||
if not os.path.ismount(mountpoint):
|
||||
try:
|
||||
subprocess.check_call(['mount',mountpoint])
|
||||
run(['mount',mountpoint],check=True)
|
||||
imsg("Mounted '{}'".format(mountpoint))
|
||||
except:
|
||||
ydie(1,"Could not mount '{}'! Exiting".format(mountpoint))
|
||||
|
|
@ -201,7 +203,7 @@ class TestSuiteAutosign(TestSuiteBase):
|
|||
|
||||
if led_support:
|
||||
for fn in (led_files[led_support]):
|
||||
subprocess.check_call(['sudo','chmod','0666',fn])
|
||||
run(['sudo','chmod','0666',fn],check=True)
|
||||
omsg(purple('Running autosign test with no LED'))
|
||||
do_autosign_live(opts,mountpoint)
|
||||
omsg(purple("Running autosign test with '--led'"))
|
||||
|
|
|
|||
|
|
@ -20,8 +20,10 @@
|
|||
ts_ethdev.py: Ethdev tests for the test.py test suite
|
||||
"""
|
||||
|
||||
import sys,os,subprocess,re,shutil
|
||||
import sys,os,re,shutil
|
||||
from decimal import Decimal
|
||||
from subprocess import run,PIPE,DEVNULL
|
||||
|
||||
from mmgen.globalvars import g
|
||||
from mmgen.opts import opt
|
||||
from mmgen.util import die
|
||||
|
|
@ -45,9 +47,8 @@ parity_key_fn = 'parity.devkey'
|
|||
|
||||
# Token sends require varying amounts of gas, depending on compiler version
|
||||
try:
|
||||
solc_ver = re.search(r'Version:\s*(.*)',
|
||||
subprocess.Popen(['solc','--version'],stdout=subprocess.PIPE
|
||||
).stdout.read().decode()).group(1)
|
||||
cmd_out = run(['solc','--version'],stdout=PIPE).stdout.decode()
|
||||
solc_ver = re.search(r'Version:\s*(.*)',cmd_out).group(1)
|
||||
except:
|
||||
solc_ver = '' # no solc on system - prompt for precompiled v0.5.3 contract files
|
||||
|
||||
|
|
@ -306,10 +307,10 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
|
|||
m3 = ['parity',lf_arg] + opts
|
||||
m4 = '\nPress ENTER to continue: '
|
||||
my_raw_input(m1 + m2 + ' '.join(m3) + m4)
|
||||
elif subprocess.call(['which','parity'],stdout=subprocess.PIPE) == 0:
|
||||
elif run(['which','parity'],stdout=DEVNULL).returncode == 0:
|
||||
ss = 'parity.*--log-file=test/data_dir.*/parity.log' # allow for UTF8_DEBUG
|
||||
try:
|
||||
pid = subprocess.check_output(['pgrep','-af',ss]).split()[0]
|
||||
pid = run(['pgrep','-af',ss],stdout=PIPE).stdout.split()[0]
|
||||
os.kill(int(pid),9)
|
||||
except: pass
|
||||
# '--base-path' doesn't work together with daemon mode, so we have to clobber the main dev chain
|
||||
|
|
@ -318,12 +319,12 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
|
|||
bdir = joinpath(self.tr.data_dir,'parity')
|
||||
try: os.mkdir(bdir)
|
||||
except: pass
|
||||
redir = None if opt.exact_output else subprocess.PIPE
|
||||
redir = None if opt.exact_output else PIPE
|
||||
pidfile = joinpath(self.tmpdir,parity_pid_fn)
|
||||
subprocess.check_call(['parity',lf_arg] + opts + ['daemon',pidfile],stderr=redir,stdout=redir)
|
||||
run(['parity',lf_arg] + opts + ['daemon',pidfile],stderr=redir,stdout=redir,check=True)
|
||||
time.sleep(3) # race condition
|
||||
pid = self.read_from_tmpfile(parity_pid_fn)
|
||||
elif subprocess.call('netstat -tnl | grep -q 127.0.0.1:8549',shell=True) == 0:
|
||||
elif run('netstat -tnl | grep -q 127.0.0.1:8549',shell=True).returncode == 0:
|
||||
m1 = 'No parity executable found on system, but port 8549 is active!'
|
||||
m2 = 'Before continuing, you should probably run the command'
|
||||
m3 = 'test/test.py -X setup ethdev'
|
||||
|
|
@ -574,9 +575,16 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
|
|||
imsg("Compiling solidity token contract '{}' with 'solc'".format(token_data['symbol']))
|
||||
try: os.mkdir(odir)
|
||||
except: pass
|
||||
cmd = ['scripts/traceback_run.py','scripts/create-token.py','--coin='+g.coin,'--outdir='+odir] + cmd_args + [dfl_addr_chk]
|
||||
cmd = [
|
||||
'scripts/traceback_run.py',
|
||||
'scripts/create-token.py',
|
||||
'--coin=' + g.coin,
|
||||
'--outdir=' + odir
|
||||
] + cmd_args + [dfl_addr_chk]
|
||||
imsg("Executing: {}".format(' '.join(cmd)))
|
||||
subprocess.check_output(cmd,stderr=subprocess.STDOUT)
|
||||
cp = run(cmd,stdout=DEVNULL,stderr=PIPE)
|
||||
if cp.returncode != 0:
|
||||
rdie(2,'solc failed with the following output: {}'.format(cp.stderr))
|
||||
imsg("ERC20 token '{}' compiled".format(token_data['symbol']))
|
||||
return 'ok'
|
||||
|
||||
|
|
@ -937,12 +945,12 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
|
|||
self.spawn('',msg_only=True)
|
||||
if g.platform == 'win':
|
||||
my_raw_input('Please stop parity and Press ENTER to continue: ')
|
||||
elif subprocess.call(['which','parity'],stdout=subprocess.PIPE) == 0:
|
||||
elif run(['which','parity'],stdout=DEVNULL).returncode == 0:
|
||||
pid = self.read_from_tmpfile(parity_pid_fn)
|
||||
if opt.no_daemon_stop:
|
||||
msg_r('(leaving daemon running by user request)')
|
||||
else:
|
||||
subprocess.check_call(['kill',pid])
|
||||
run(['kill',pid],check=True)
|
||||
else:
|
||||
imsg('No parity executable found on system. Ignoring')
|
||||
return 'ok'
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@
|
|||
ts_regtest.py: Regtest tests for the test.py test suite
|
||||
"""
|
||||
|
||||
import os,subprocess
|
||||
import os
|
||||
from decimal import Decimal
|
||||
from ast import literal_eval
|
||||
from mmgen.globalvars import g
|
||||
|
|
@ -680,11 +680,12 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
|
|||
@staticmethod
|
||||
def _gen_pairs(n):
|
||||
disable_debug()
|
||||
ret = [subprocess.check_output(
|
||||
['python3',joinpath('cmds','mmgen-tool'),'--testnet=1'] +
|
||||
(['--type=compressed'],[])[i==0] +
|
||||
['-r0','randpair']
|
||||
).decode().split() for i in range(n)]
|
||||
from subprocess import run,PIPE
|
||||
ret = [run(['python3',joinpath('cmds','mmgen-tool'),'--testnet=1'] +
|
||||
(['--type=compressed'],[])[i==0] +
|
||||
['-r0','randpair'],
|
||||
stdout=PIPE,check=True
|
||||
).stdout.decode().split() for i in range(n)]
|
||||
restore_debug()
|
||||
return ret
|
||||
|
||||
|
|
|
|||
|
|
@ -155,22 +155,25 @@ class TestSuiteWalletConv(TestSuiteBase,TestSuiteShared):
|
|||
pw = True )
|
||||
|
||||
def ref_hincog_blkdev_conv_out(self):
|
||||
def do_run(cmd):
|
||||
from subprocess import run,PIPE,DEVNULL
|
||||
return run(cmd,stdout=PIPE,stderr=DEVNULL,check=True)
|
||||
if self.skip_for_win(): return 'skip'
|
||||
imsg('Creating block device image file')
|
||||
ic_img = joinpath(self.tmpdir,'hincog_blkdev_img')
|
||||
subprocess.check_output(['dd','if=/dev/zero','of='+ic_img,'bs=1K','count=1'],stderr=subprocess.PIPE)
|
||||
ic_dev = subprocess.check_output(['sudo','/sbin/losetup','-f']).strip().decode()
|
||||
do_run(['dd','if=/dev/zero','of='+ic_img,'bs=1K','count=1'])
|
||||
ic_dev = do_run(['sudo','/sbin/losetup','-f']).stdout.strip().decode()
|
||||
ic_dev_mode_orig = '{:o}'.format(os.stat(ic_dev).st_mode & 0xfff)
|
||||
ic_dev_mode = '0666'
|
||||
imsg("Changing permissions on loop device to '{}'".format(ic_dev_mode))
|
||||
subprocess.check_output(['sudo','chmod',ic_dev_mode,ic_dev],stderr=subprocess.PIPE)
|
||||
do_run(['sudo','chmod',ic_dev_mode,ic_dev])
|
||||
imsg("Attaching loop device '{}'".format(ic_dev))
|
||||
subprocess.check_output(['sudo','/sbin/losetup',ic_dev,ic_img])
|
||||
do_run(['sudo','/sbin/losetup',ic_dev,ic_img])
|
||||
self.ref_hincog_conv_out(ic_f=ic_dev)
|
||||
imsg("Detaching loop device '{}'".format(ic_dev))
|
||||
subprocess.check_output(['sudo','/sbin/losetup','-d',ic_dev])
|
||||
do_run(['sudo','/sbin/losetup','-d',ic_dev])
|
||||
imsg("Resetting permissions on loop device to '{}'".format(ic_dev_mode_orig))
|
||||
subprocess.check_output(['sudo','chmod',ic_dev_mode_orig,ic_dev],stderr=subprocess.PIPE)
|
||||
do_run(['sudo','chmod',ic_dev_mode_orig,ic_dev])
|
||||
return 'ok'
|
||||
|
||||
# wallet conversion tests
|
||||
|
|
|
|||
|
|
@ -20,7 +20,8 @@
|
|||
test/tooltest.py: Tests for the 'mmgen-tool' utility
|
||||
"""
|
||||
|
||||
import sys,os,subprocess,binascii
|
||||
import sys,os,binascii
|
||||
from subprocess import run,PIPE
|
||||
|
||||
repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),os.pardir)))
|
||||
os.chdir(repo_root)
|
||||
|
|
@ -153,6 +154,7 @@ if opt.list_cmds:
|
|||
Msg(fs.format('clean','Clean the tmp directory',w=w))
|
||||
sys.exit(0)
|
||||
if opt.list_names:
|
||||
tcmd = ['python3','test/tooltest2.py','--list-tested-cmds']
|
||||
tested_in = {
|
||||
'tooltest.py': [],
|
||||
'test.py': (
|
||||
|
|
@ -161,7 +163,7 @@ if opt.list_names:
|
|||
'add_label','remove_label','remove_address','twview',
|
||||
'getbalance','listaddresses','listaddress'),
|
||||
'test-release.sh': ('keyaddrlist2monerowallets','syncmonerowallets'),
|
||||
'tooltest2.py': subprocess.check_output(['test/tooltest2.py','--list-tested-cmds']).decode().split()
|
||||
'tooltest2.py': run(tcmd,stdout=PIPE,check=True).stdout.decode().split()
|
||||
}
|
||||
for v in cmd_data.values():
|
||||
tested_in['tooltest.py'] += list(v['cmd_data'].keys())
|
||||
|
|
@ -218,17 +220,18 @@ class MMGenToolTestUtils(object):
|
|||
else:
|
||||
msg_r('Testing {:{w}}'.format(full_name+':',w=msg_w))
|
||||
|
||||
p = subprocess.Popen(sys_cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
|
||||
a,b = p.communicate()
|
||||
cp = run(sys_cmd,stdout=PIPE,stderr=PIPE)
|
||||
out = cp.stdout
|
||||
err = cp.stderr
|
||||
if opt.debug:
|
||||
try: dmsg(b.decode())
|
||||
except: dmsg(repr(b))
|
||||
if not binary: a = a.decode()
|
||||
retcode = p.wait()
|
||||
if retcode != 0:
|
||||
msg('{}\n{}\n{}'.format(red('FAILED'),yellow('Command stderr output:'),b.decode()))
|
||||
rdie(1,'Called process returned with an error (retcode {})'.format(retcode))
|
||||
return (a,a.rstrip())[bool(strip)]
|
||||
try: dmsg(err.decode())
|
||||
except: dmsg(repr(err))
|
||||
if not binary:
|
||||
out = out.decode()
|
||||
if cp.returncode != 0:
|
||||
msg('{}\n{}\n{}'.format(red('FAILED'),yellow('Command stderr output:'),err.decode()))
|
||||
rdie(1,'Called process returned with an error (retcode {})'.format(cp.returncode))
|
||||
return (out,out.rstrip())[bool(strip)]
|
||||
|
||||
def run_cmd_chk(self,name,f1,f2,kwargs='',extra_msg='',strip_hex=False,add_opts=[]):
|
||||
idata = read_from_file(f1).rstrip()
|
||||
|
|
@ -391,9 +394,7 @@ class MMGenToolTestCmds(object):
|
|||
test_msg('command piping')
|
||||
if opt.verbose:
|
||||
sys.stderr.write(green('Executing ') + cyan(cmd) + '\n')
|
||||
p = subprocess.Popen(cmd,stdout=subprocess.PIPE,shell=True)
|
||||
res = p.stdout.read().decode().strip()
|
||||
p.wait()
|
||||
res = run(cmd,stdout=PIPE,shell=True).stdout.decode().strip()
|
||||
addr = read_from_tmpfile(cfg,'wif2addr3.out').strip()
|
||||
cmp_or_die(addr,res)
|
||||
ok()
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ test/tooltest2.py: Simple tests for the 'mmgen-tool' utility
|
|||
# TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
|
||||
|
||||
import sys,os,time
|
||||
from subprocess import Popen,PIPE
|
||||
from subprocess import run,PIPE
|
||||
from decimal import Decimal
|
||||
|
||||
repo_root = os.path.normpath(os.path.abspath(os.path.join(os.path.dirname(sys.argv[0]),os.pardir)))
|
||||
|
|
@ -649,26 +649,20 @@ def run_test(gid,cmd_name):
|
|||
def fork_cmd(cmd_name,args,out,opts,exec_code):
|
||||
cmd = list(tool_cmd) + (opts or []) + [cmd_name] + args
|
||||
vmsg('{} {}'.format(green('Executing'),cyan(' '.join(cmd))))
|
||||
p = Popen(cmd,stdin=(PIPE if stdin_input else None),stdout=PIPE,stderr=PIPE)
|
||||
if stdin_input:
|
||||
p.stdin.write(stdin_input)
|
||||
p.stdin.close()
|
||||
cmd_out = p.stdout.read()
|
||||
try:
|
||||
cmd_out = cmd_out.decode().strip()
|
||||
except:
|
||||
pass
|
||||
cmd_err = p.stderr.read()
|
||||
if cmd_err: vmsg(cmd_err.strip().decode())
|
||||
if p.wait() != 0:
|
||||
cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
|
||||
try: cmd_out = cp.stdout.decode()
|
||||
except: cmd_out = cp.stdout
|
||||
if cp.stderr:
|
||||
vmsg(cp.stderr.strip().decode())
|
||||
if cp.returncode != 0:
|
||||
import re
|
||||
m = re.match(b"tool command returned '(None|False)'"+NL.encode(),cmd_err)
|
||||
m = re.match(b"tool command returned '(None|False)'"+NL.encode(),cp.stderr)
|
||||
if m:
|
||||
return { b'None': None, b'False': False }[m.group(1)]
|
||||
else:
|
||||
ydie(1,'Spawned program exited with error: {}'.format(cmd_err))
|
||||
ydie(1,'Spawned program exited with error: {}'.format(cp.stderr))
|
||||
|
||||
return cmd_out
|
||||
return cmd_out.strip()
|
||||
|
||||
def run_func(cmd_name,args,out,opts,exec_code):
|
||||
vmsg('{}: {}{}'.format(purple('Running'),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue