subprocess.run(): use text arg (10 files)

This commit is contained in:
The MMGen Project 2025-10-03 10:34:04 +00:00
commit 2b7080c227
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
10 changed files with 31 additions and 32 deletions

View file

@ -61,7 +61,7 @@ def get_terminfo_colors(term=None):
cmd.append(term)
try:
cmdout = run(cmd, stdout=PIPE, check=True).stdout.decode()
cmdout = run(cmd, stdout=PIPE, check=True, text=True).stdout
except:
set_vt100()
return None

View file

@ -247,14 +247,14 @@ class Daemon(Lockable):
@classmethod
def get_exec_version_str(cls):
try:
cp = run([cls.exec_fn, cls.version_info_arg], stdout=PIPE, stderr=PIPE, check=True)
cp = run([cls.exec_fn, cls.version_info_arg], stdout=PIPE, stderr=PIPE, check=True, text=True)
except Exception as e:
die(2, f'{e}\nUnable to execute {cls.exec_fn}')
if cp.returncode:
die(2, f'Unable to execute {cls.exec_fn}')
else:
res = cp.stdout.decode().splitlines()
res = cp.stdout.splitlines()
return (res[0] if len(res) == 1 else [s for s in res if 'ersion' in s][0]).strip()
class RPCDaemon(Daemon):

View file

@ -73,8 +73,9 @@ class MacOSRamDisk:
cp = run(
['hdiutil', 'attach', '-nomount', f'ram://{2048 * self.size}'],
stdout = PIPE,
text = True,
check = True)
self.dev_name = cp.stdout.decode().strip()
self.dev_name = cp.stdout.strip()
self.cfg._util.qmsg(f'Created {self.desc} {self.label.hl()} [{self.dev_name}]')
run(['diskutil', 'eraseVolume', 'APFS', self.label, self.dev_name], stdout=redir, check=True)
diskutil_size = self.get_diskutil_size()

View file

@ -100,7 +100,7 @@ def check_solc_version():
The output is used by other programs, so write to stdout only
"""
try:
cp = run(['solc', '--version'], check=True, stdout=PIPE)
cp = run(['solc', '--version'], check=True, text=True, stdout=PIPE)
except:
msg('solc missing or could not be executed') # this must go to stderr
return False
@ -109,7 +109,7 @@ def check_solc_version():
Msg('solc exited with error')
return False
line = cp.stdout.decode().splitlines()[1]
line = cp.stdout.splitlines()[1]
version_str = re.sub(r'Version:\s*', '', line)
m = re.match(r'(\d+)\.(\d+)\.(\d+)', version_str)

View file

@ -829,9 +829,8 @@ class CmdTestEthdev(CmdTestEthdevMethods, CmdTestBase, CmdTestShared):
write_to_file(pwfile, '')
run(['rm', '-rf', self.keystore_dir])
cmd = f'geth account new --password={pwfile} --lightkdf --keystore {self.keystore_dir}'
cp = run(cmd.split(), stdout=PIPE, stderr=PIPE)
if cp.returncode:
die(1, cp.stderr.decode())
if (cp := run(cmd.split(), stdout=PIPE, stderr=PIPE, text=True)).returncode:
die(1, cp.stderr)
def make_genesis(signer_addr, prealloc_addr):
return {
@ -875,9 +874,8 @@ class CmdTestEthdev(CmdTestEthdevMethods, CmdTestBase, CmdTestShared):
def init_genesis(fn):
cmd = f'{d.exec_fn} init --datadir {d.datadir} {fn}'
cp = run(cmd.split(), stdout=PIPE, stderr=PIPE)
if cp.returncode:
die(1, cp.stderr.decode())
if (cp := run(cmd.split(), stdout=PIPE, stderr=PIPE, text=True)).returncode:
die(1, cp.stderr)
d.stop(quiet=True)
d.remove_datadir()
@ -1349,10 +1347,9 @@ class CmdTestEthdev(CmdTestEthdevMethods, CmdTestBase, CmdTestShared):
'--outdir=' + odir
] + cmd_args + [self.proto.checksummed_addr(dfl_devaddr)]
imsg('Executing: {}'.format(' '.join(cmd)))
cp = run(cmd, stdout=DEVNULL, stderr=PIPE)
if cp.returncode != 0:
if (cp := run(cmd, stdout=DEVNULL, stderr=PIPE, text=True)).returncode:
rmsg('solc failed with the following output:')
die(2, cp.stderr.decode())
die(2, cp.stderr)
imsg('ERC20 token {!r} compiled'.format(token_data['symbol']))
return 'ok'

View file

@ -76,14 +76,14 @@ class TestProxy:
if port_in_use(self.port):
omsg(f'Port {self.port} already in use. Assuming SSH SOCKS server is running')
else:
cp = run(a + b0 + b1, stdout=PIPE, stderr=PIPE)
if err := cp.stderr.decode():
omsg(err)
cp = run(a + b0 + b1, stdout=PIPE, stderr=PIPE, text=True)
if cp.stderr:
omsg(cp.stderr)
if cp.returncode == 0:
start_proxy()
elif 'onnection refused' in err:
elif 'onnection refused' in cp.stderr:
die(2, fmt(self.no_ssh_errmsg, indent=' '))
elif 'ermission denied' in err:
elif 'ermission denied' in cp.stderr:
msg(fmt(self.bad_perm_errmsg.format(' '.join(a + b2)), indent=' ', strip_char='\t'))
from mmgen.ui import keypress_confirm
keypress_confirm(cfg, 'Continue?', do_exit=True)

View file

@ -138,7 +138,7 @@ SUPPORTED EXTERNAL TOOLS:
}
def get_cmd_output(cmd, input=None):
return run(cmd, input=input, stdout=PIPE, stderr=DEVNULL).stdout.decode().splitlines()
return run(cmd, input=input, stdout=PIPE, stderr=DEVNULL, text=True).stdout.splitlines()
saved_results = {}
@ -201,7 +201,7 @@ class GenToolKeyconv(GenTool):
class GenToolZcash_mini(GenTool):
desc = 'zcash-mini'
def run(self, sec, vcoin):
o = get_cmd_output(['zcash-mini', '-key', '-simple'], input=(sec.wif+'\n').encode())
o = get_cmd_output(['zcash-mini', '-key', '-simple'], input=sec.wif+'\n')
return gtr(o[1], o[0], o[-1])
class GenToolPycoin(GenTool):

View file

@ -324,10 +324,10 @@ tested_solc_ver = '0.8.26'
def check_solc_ver():
cmd = 'python3 scripts/create-token.py --check-solc-version'
try:
cp = run(cmd.split(), check=False, stdout=PIPE)
cp = run(cmd.split(), check=False, stdout=PIPE, text=True)
except Exception as e:
die(4, f'Unable to execute {cmd!r}: {e}')
res = cp.stdout.decode().strip()
res = cp.stdout.strip()
if cp.returncode == 0:
omsg(
orange(f'Found supported solc version {res}') if res == tested_solc_ver else
@ -352,7 +352,7 @@ def get_ethkey():
return None
def do_run(cmd, check=True):
return run(cmd, stdout=PIPE, stderr=DEVNULL, check=check)
return run(cmd, stdout=PIPE, stderr=DEVNULL, check=check, text=True)
def test_exec(cmd):
try:
@ -459,10 +459,10 @@ class VirtBlockDeviceLinux(VirtBlockDeviceBase):
def _get_associations(self):
cmd = ['sudo', 'losetup', '-n', '-O', 'NAME', '-j', str(self.img_path)]
return do_run(cmd).stdout.decode().splitlines()
return do_run(cmd).stdout.splitlines()
def get_new_dev(self):
return do_run(['sudo', 'losetup', '-f']).stdout.decode().strip()
return do_run(['sudo', 'losetup', '-f']).stdout.strip()
def do_create(self, size, path):
do_run(['truncate', f'--size={size}', str(path)])

View file

@ -151,9 +151,9 @@ class unit_tests:
'--stdout',
init_proto(cfg, 'eth').checksummed_addr('deadbeef'*5),
]
cp = run(cmd, stdout=PIPE, stderr=PIPE)
vmsg(cp.stderr.decode())
cp = run(cmd, stdout=PIPE, stderr=PIPE, text=True)
vmsg(cp.stderr)
if cp.returncode:
msg(cp.stderr.decode())
msg(cp.stderr)
return False
return True

View file

@ -189,8 +189,9 @@ if cfg.testing_status:
'tooltest2.py': run(
['python3', 'test/tooltest2.py', '--list-tested-cmds'],
stdout = PIPE,
text = True,
check = True
).stdout.decode().split()
).stdout.split()
}
for v in cmd_data.values():
tested_in['tooltest.py'] += list(v['cmd_data'].keys())
@ -460,7 +461,7 @@ class MMGenToolTestCmds:
test_msg('command piping')
if cfg.verbose:
sys.stderr.write(green('Executing ') + cyan(cmd) + '\n')
res = run(cmd, stdout=PIPE, shell=True).stdout.decode().strip()
res = run(cmd, stdout=PIPE, shell=True, text=True).stdout.strip()
addr = read_from_tmpfile(tcfg, 'wif2addr3.out').strip()
cmp_or_die(addr, res)
ok()