rpc.py,daemon.py: fixes and cleanups

This commit is contained in:
The MMGen Project 2021-08-01 20:54:51 +00:00
commit 508069dc48
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
7 changed files with 104 additions and 84 deletions

View file

@ -21,7 +21,7 @@ daemon.py: Daemon control interface for the MMGen suite
"""
import shutil
from subprocess import run,PIPE
from subprocess import run,PIPE,CompletedProcess
from collections import namedtuple
from .exception import *
from .common import *
@ -36,10 +36,12 @@ class Daemon(MMGenObject):
debug = False
wait = True
use_pidfile = True
use_threads = False
cfg_file = None
new_console_mswin = False
ps_pid_mswin = False
lockfile = None
private_port = None
avail_opts = ()
avail_flags = () # like opts, but can be added or removed after instantiation
@ -49,67 +51,76 @@ class Daemon(MMGenObject):
def subclass_init(self): pass
def exec_cmd_thread(self,cmd,check):
def exec_cmd_thread(self,cmd):
import threading
tname = ('exec_cmd','exec_cmd_win_console')[self.platform == 'win' and self.new_console_mswin]
t = threading.Thread(target=getattr(self,tname),args=(cmd,check))
t = threading.Thread(target=getattr(self,tname),args=(cmd,))
t.daemon = True
t.start()
Msg_r(' \b') # blocks w/o this...crazy
if self.platform == 'win':
Msg_r(' \b') # blocks w/o this...crazy
return True
def exec_cmd_win_console(self,cmd,check):
def exec_cmd_win_console(self,cmd):
from subprocess import Popen,CREATE_NEW_CONSOLE,STARTUPINFO,STARTF_USESHOWWINDOW,SW_HIDE
si = STARTUPINFO(dwFlags=STARTF_USESHOWWINDOW,wShowWindow=SW_HIDE)
p = Popen(cmd,creationflags=CREATE_NEW_CONSOLE,startupinfo=si)
p.wait()
def exec_cmd(self,cmd,check):
cp = run(cmd,check=False,stdout=PIPE,stderr=PIPE)
def exec_cmd(self,cmd):
try:
cp = run(cmd,check=False,stdout=PIPE,stderr=PIPE)
except Exception as e:
ymsg(f'Error starting executable: {type(e).__name__} [Errno {e.errno}]')
raise
if self.debug:
print(cp)
if check and cp.returncode != 0:
raise MMGenCalledProcessError(cp)
return cp
def run_cmd(self,cmd,silent=False,check=True,is_daemon=False):
def run_cmd(self,cmd,silent=False,is_daemon=False):
if is_daemon and not silent:
msg('Starting {} {} on port {}'.format(self.net_desc,self.desc,self.rpc_port))
msg(f'Starting {self.desc} on port {self.bind_port}')
if self.debug:
msg('\nExecuting: {}'.format(' '.join(cmd)))
if self.platform == 'win' and is_daemon:
cp = self.exec_cmd_thread(cmd,check)
if (self.platform == 'win' or self.use_threads) and is_daemon:
ret = self.exec_cmd_thread(cmd)
else:
cp = self.exec_cmd(cmd,check)
ret = self.exec_cmd(cmd)
if cp:
out = cp.stdout.decode().rstrip()
err = cp.stderr.decode().rstrip()
if out and (self.debug or not silent):
msg(out)
if err and (self.debug or (cp.returncode and not silent)):
msg(err)
if isinstance(ret,CompletedProcess):
if ret.stdout and (self.debug or not silent):
msg(ret.stdout.decode().rstrip())
if ret.stderr and (self.debug or (ret.returncode and not silent)):
msg(ret.stderr.decode().rstrip())
return cp
return ret
@property
def pid(self):
if self.ps_pid_mswin and self.platform == 'win':
# TODO: assumes only one running instance of given daemon
cp = self.run_cmd(['ps','-Wl'],silent=True,check=False)
cp = self.run_cmd(['ps','-Wl'],silent=True)
for line in cp.stdout.decode().splitlines():
if f'{self.exec_fn}.exe' in line:
return line.split()[3] # use Windows, not Cygwin, PID
die(2,'PID for {!r} not found in ps output'.format(ss))
die(2,f'PID for {ss!r} not found in ps output')
elif self.use_pidfile:
return open(self.pidfile).read().strip()
else:
return '(unknown)'
return '(N/A)'
@property
def bind_port(self):
return self.private_port or self.rpc_port
@property
def state(self):
return 'ready' if self.test_socket('localhost',self.rpc_port) else 'stopped'
if self.debug:
msg(f'Testing port {self.bind_port}')
return 'ready' if self.test_socket('localhost',self.bind_port) else 'stopped'
@property
def stop_cmd(self):
@ -120,29 +131,37 @@ class Daemon(MMGenObject):
def do_start(self,silent=False):
if not silent:
msg('Starting {} {} on port {}'.format(self.net_desc,self.desc,self.rpc_port))
msg(f'Starting {self.desc} on port {self.bind_port}')
return self.run_cmd(self.start_cmd,silent=True,is_daemon=True)
def do_stop(self,silent=False):
if not silent:
msg('Stopping {} {} on port {}'.format(self.net_desc,self.desc,self.rpc_port))
msg(f'Stopping {self.desc} on port {self.bind_port}')
return self.run_cmd(self.stop_cmd,silent=True)
def cli(self,*cmds,silent=False,check=True):
return self.run_cmd(self.cli_cmd(*cmds),silent=silent,check=check)
def cli(self,*cmds,silent=False):
return self.run_cmd(self.cli_cmd(*cmds),silent=silent)
def state_msg(self,extra_text=None):
extra_text = f'{extra_text} ' if extra_text else ''
return '{:{w}} {:10} {}'.format(
f'{self.desc} {extra_text}running',
f'pid {self.pid}',
f'port {self.bind_port}',
w = 52 + len(extra_text) )
def start(self,silent=False):
if self.state == 'ready':
if not silent:
m = '{} {} already running with pid {}'
msg(m.format(self.net_desc,self.desc,self.pid))
msg(self.state_msg(extra_text='already'))
return True
self.wait_for_state('stopped')
os.makedirs(self.datadir,exist_ok=True)
if self.cfg_file and not 'keep_cfg_file' in self.flags:
open('{}/{}'.format(self.datadir,self.cfg_file),'w').write(self.cfg_file_hdr)
if self.datadir:
os.makedirs(self.datadir,exist_ok=True)
if self.cfg_file and not 'keep_cfg_file' in self.flags:
open('{}/{}'.format(self.datadir,self.cfg_file),'w').write(self.cfg_file_hdr)
if self.use_pidfile and os.path.exists(self.pidfile):
# OpenEthereum just overwrites the data in the existing pidfile without zeroing it first,
@ -150,11 +169,12 @@ class Daemon(MMGenObject):
os.unlink(self.pidfile)
for i in range(20):
try: ret = self.do_start(silent=silent)
except FileNotFoundError as e:
die(e.errno,e.strerror)
except: pass
else: break
try:
ret = self.do_start(silent=silent)
except Exception as e:
ymsg(str(e))
else:
break
time.sleep(1)
else:
die(2,'Unable to start daemon')
@ -172,7 +192,7 @@ class Daemon(MMGenObject):
return ret
else:
if not silent:
msg('{} {} on port {} not running'.format(self.net_desc,self.desc,self.rpc_port))
msg(f'{self.desc} on port {self.bind_port} not running')
return True
def restart(self,silent=False):
@ -191,8 +211,7 @@ class Daemon(MMGenObject):
return True
time.sleep(0.2)
else:
m = 'Wait for state {!r} timeout exceeded for daemon {} {} (port {})'
die(2,m.format(req_state,self.coin,self.network,self.rpc_port))
die(2,f'Wait for state {req_state!r} timeout exceeded for {self.desc} (port {self.bind_port})')
@property
def flags(self):
@ -215,20 +234,19 @@ class Daemon(MMGenObject):
self._flags.remove(val)
def remove_datadir(self):
if self.state == 'stopped':
try: # exception handling required for MSWin/MSYS2
run(['/bin/rm','-rf',self.datadir])
except:
pass
else:
msg(f'Cannot remove {self.datadir!r} - daemon is not stopped')
if self.datadir:
if self.state == 'stopped':
try: # exception handling required for MSWin/MSYS2
run(['/bin/rm','-rf',self.datadir])
except:
pass
else:
msg(f'Cannot remove {self.datadir!r} - daemon is not stopped')
class MoneroWalletDaemon(Daemon):
desc = 'RPC daemon'
net_desc = 'Monero wallet'
exec_fn = 'monero-wallet-rpc'
coin = 'XMR'
network = 'wallet RPC'
new_console_mswin = True
ps_pid_mswin = True
@ -250,9 +268,12 @@ class MoneroWalletDaemon(Daemon):
if port_shift:
self.rpc_port += port_shift
bn = 'monero-wallet-rpc'
id_str = f'{bn}-{self.rpc_port}'
self.datadir = os.path.join(datadir or ('','test')[test_suite], bn)
self.desc = 'Monero wallet {} {}RPC daemon'.format(
'testnet' if testnet else 'mainnet',
'test suite ' if test_suite else '' )
id_str = f'{self.exec_fn}-{self.bind_port}'
self.datadir = os.path.join(datadir or ('','test')[test_suite], self.exec_fn)
self.pidfile = os.path.join(self.datadir,id_str+'.pid')
self.logfile = os.path.join(self.datadir,id_str+'.log')
@ -294,7 +315,7 @@ class MoneroWalletDaemon(Daemon):
@property
def start_cmd(self):
return (['monero-wallet-rpc'] + self.daemon_args + self.usr_daemon_args )
return ([self.exec_fn] + self.daemon_args + self.usr_daemon_args )
class CoinDaemon(Daemon):
networks = ('mainnet','testnet','regtest')
@ -329,7 +350,8 @@ class CoinDaemon(Daemon):
proto = None,
opts = None,
port_shift = None,
datadir = None ):
datadir = None,
daemon_id = None ):
assert network_id or proto, 'CoinDaemon_chk1'
assert not (network_id and proto), 'CoinDaemon_chk2'
@ -346,7 +368,7 @@ class CoinDaemon(Daemon):
coin = coin.upper()
daemon_ids = cls.coins[coin].daemon_ids
daemon_id = g.daemon_id or daemon_ids[0]
daemon_id = daemon_id or g.daemon_id or daemon_ids[0]
if daemon_id not in daemon_ids:
die(1,f'{daemon_id!r}: invalid daemon_id - valid choices: {fmt_list(daemon_ids)}')
@ -369,7 +391,8 @@ class CoinDaemon(Daemon):
proto = None,
opts = None,
port_shift = None,
datadir = None ):
datadir = None,
daemon_id = None ):
self.test_suite = test_suite
@ -406,7 +429,6 @@ class CoinDaemon(Daemon):
else:
dfl_datadir = os.path.join(g.data_dir_root,'regtest',self.coin.lower())
elif test_suite:
self.desc = 'test suite daemon'
rel_datadir = os.path.join('test','daemons',self.coin.lower())
else:
dfl_datadir = os.path.join(*self.datadirs[g.platform])
@ -424,16 +446,19 @@ class CoinDaemon(Daemon):
if self.datadir_is_subdir:
self.datadir = os.path.join(self.datadir,self.testnet_dir)
self.init_rpc_port(test_suite,port_shift)
self.pidfile = '{}/{}-daemon-{}.pid'.format(self.datadir,self.network,self.rpc_port)
self.desc = '{} {} {}daemon'.format(self.coind_name,self.network,'test suite ' if test_suite else '')
self.subclass_init()
def init_rpc_port(self,test_suite,port_shift):
self.port_shift = (1237 if test_suite else 0) + (port_shift or 0)
self.rpc_port = getattr(self.rpc_ports,self.network) + self.port_shift
if g.rpc_port: # user-set global overrides everything else
self.rpc_port = g.rpc_port
self.pidfile = '{}/{}-daemon-{}.pid'.format(self.datadir,self.network,self.rpc_port)
self.net_desc = '{} {}'.format(self.coin_name,self.network)
self.subclass_init()
@property
def start_cmd(self):
return ([self.exec_fn]
@ -495,7 +520,7 @@ class bitcoin_core_daemon(CoinDaemon):
@property
def state(self):
cp = self.cli('getblockcount',silent=True,check=False)
cp = self.cli('getblockcount',silent=True)
err = cp.stderr.decode()
if ("error: couldn't connect" in err
or "error: Could not connect" in err
@ -554,7 +579,7 @@ class monero_daemon(CoinDaemon):
def subclass_init(self):
if self.network == 'testnet':
self.net_desc = f'{self.coin_name} stagenet'
self.desc = 'Monero stagenet {}daemon'.format('test suite ' if self.test_suite else '')
self.p2p_port = self.rpc_port - 1
self.zmq_port = self.rpc_port + 1
@ -588,7 +613,7 @@ class openethereum_daemon(CoinDaemon):
version_pat = r'OpenEthereum//v(\d+)\.(\d+)\.(\d+)'
exec_fn = 'openethereum'
ps_pid_mswin = True
ports_shift = _nw(0,20,40)
ports_shift = _nw(0,10,20)
rpc_ports = _nw(*[8545 + n for n in ports_shift]) # testnet and regtest are non-standard
cfg_file = 'parity.conf'
datadirs = {
@ -621,7 +646,6 @@ class openethereum_daemon(CoinDaemon):
class parity_daemon(openethereum_daemon):
daemon_data = _dd('Parity', 2007002, '2.7.2')
version_pat = r'Parity-Ethereum//v(\d+)\.(\d+)\.(\d+)'
exec_fn = 'parity'
ports_shift = _nw(100,120,140)
ports_shift = _nw(100,110,120)
rpc_ports = _nw(*[8545 + n for n in ports_shift]) # non-standard

View file

@ -94,9 +94,6 @@ If network fee estimation fails, the user will be prompted for a fee.
Network-estimated fees will be multiplied by the value of '--tx-fee-adj',
if specified.
Ages of transactions are approximate based on an average block discovery
interval of one per {proto.avg_bdi} seconds.
All addresses on the command line can be either {proto.name} addresses or {g.proj_name}
addresses of the form <seed ID>:<index>.

View file

@ -117,7 +117,7 @@ class MMGenRegtest(MMGenObject):
await self.rpc_call('stop')
def init_daemon(self,reindex=False):
self.d.net_desc = self.coin.upper()
self.d.desc = f'{self.d.coind_name} regtest daemon'
if reindex:
self.d.usr_coind_args.append('--reindex')

View file

@ -234,11 +234,7 @@ class CallSigs:
class bitcoin_cash_node(litecoin_core): pass
class Ethereum:
class openethereum: pass
class parity: pass
class Ethereum: pass
class RPCClient(MMGenObject):
@ -413,7 +409,7 @@ class BitcoinRPCClient(RPCClient,metaclass=aInitMeta):
self.proto = proto
self.daemon = daemon
self.call_sigs = getattr(getattr(CallSigs,proto.base_proto),daemon.id)
self.call_sigs = getattr(getattr(CallSigs,proto.base_proto),daemon.id,None)
super().__init__(
host = 'localhost' if g.test_suite else (g.rpc_host or 'localhost'),
@ -591,7 +587,7 @@ class EthereumRPCClient(RPCClient,metaclass=aInitMeta):
async def __ainit__(self,proto,daemon,backend):
self.proto = proto
self.daemon = daemon
self.call_sigs = getattr(getattr(CallSigs,proto.base_proto),daemon.id)
self.call_sigs = getattr(getattr(CallSigs,proto.base_proto),daemon.id,None)
super().__init__(
host = 'localhost' if g.test_suite else (g.rpc_host or 'localhost'),

View file

@ -201,7 +201,7 @@ def test_daemons_ops(*network_ids,op,remove_datadir=False):
silent = not opt.verbose and not getattr(opt,'exact_output',False)
ret = False
for network_id in network_ids:
d = CoinDaemon(network_id,test_suite=True)
d = CoinDaemon(network_id,test_suite=True,daemon_id=g.daemon_id)
if remove_datadir:
d.stop(silent=True)
d.remove_datadir()

View file

@ -64,7 +64,7 @@ for network_id in ids:
d.debug = d.debug or opt.debug
d.wait = not opt.no_wait
if opt.get_state:
print('{} {} (port {}) is {}'.format(d.net_desc,d.desc,d.rpc_port,d.state))
print(d.state_msg())
elif opt.testing:
print(' '.join(getattr(d,action+'_cmd')))
else:

View file

@ -319,7 +319,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
def eth_args(self):
return ['--outdir={}'.format(self.tmpdir),'--coin='+self.proto.coin,'--rpc-port={}'.format(self.rpc_port),'--quiet']
def setup(self):
async def setup(self):
self.spawn('',msg_only=True)
if solc_ver in self.solc_vers:
imsg('Found solc version {}'.format(solc_ver))
@ -334,6 +334,9 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
if not opt.no_daemon_autostart:
if not start_test_daemons(self.proto.coin+'_rt',remove_datadir=True):
return False
from mmgen.rpc import rpc_init
rpc = await rpc_init(self.proto)
imsg('Daemon: {} v{}'.format(rpc.daemon.coind_name,rpc.daemon_version_str))
return 'ok'
def wallet_upgrade(self,src_file):