daemon.py 14 KB


  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. daemon: Daemon control interface for the MMGen suite
  20. """
  21. import os,time,importlib
  22. from subprocess import run,PIPE,CompletedProcess
  23. from collections import namedtuple
  24. from .globalvars import g
  25. from .color import set_vt100
  26. from .util import msg,Msg_r,ymsg,die,remove_dups,oneshot_warning
  27. from .flags import *
  28. _dd = namedtuple('daemon_data',['coind_name','coind_version','coind_version_str']) # latest tested version
  29. _nw = namedtuple('coin_networks',['mainnet','testnet','regtest'])
  30. class Daemon(Lockable):
  31. desc = 'daemon'
  32. debug = False
  33. wait = True
  34. use_pidfile = True
  35. force_kill = False
  36. pids = ()
  37. use_threads = False
  38. cfg_file = None
  39. new_console_mswin = False
  40. lockfile = None
  41. private_port = None
  42. avail_opts = ()
  43. avail_flags = () # like opts, but can be set or unset after instantiation
  44. _reset_ok = ('debug','wait','pids')
  45. def __init__(self,opts=None,flags=None):
  46. self.platform = g.platform
  47. if self.platform == 'win':
  48. self.use_pidfile = False
  49. self.use_threads = True
  50. self.opt = ClassOpts(self,opts)
  51. self.flag = ClassFlags(self,flags)
  52. def exec_cmd_thread(self,cmd):
  53. import threading
  54. tname = ('exec_cmd','exec_cmd_win_console')[self.platform == 'win' and self.new_console_mswin]
  55. t = threading.Thread(target=getattr(self,tname),args=(cmd,))
  56. t.daemon = True
  57. t.start()
  58. if self.platform == 'win':
  59. Msg_r(' \b') # blocks w/o this...crazy
  60. return True
  61. def exec_cmd_win_console(self,cmd):
  62. from subprocess import Popen,CREATE_NEW_CONSOLE,STARTUPINFO,STARTF_USESHOWWINDOW,SW_HIDE
  63. si = STARTUPINFO(dwFlags=STARTF_USESHOWWINDOW,wShowWindow=SW_HIDE)
  64. p = Popen(cmd,creationflags=CREATE_NEW_CONSOLE,startupinfo=si)
  65. p.wait()
  66. def exec_cmd(self,cmd,is_daemon=False):
  67. out = (PIPE,None)[is_daemon and self.opt.no_daemonize]
  68. try:
  69. cp = run(cmd,check=False,stdout=out,stderr=out)
  70. except Exception as e:
  71. die( 'MMGenCalledProcessError', f'Error starting executable: {type(e).__name__} [Errno {e.errno}]' )
  72. set_vt100()
  73. if self.debug:
  74. print(cp)
  75. return cp
  76. def run_cmd(self,cmd,silent=False,is_daemon=False):
  77. if self.debug:
  78. msg('\n\n')
  79. if self.debug or (is_daemon and not silent):
  80. msg(f'Starting {self.desc} on port {self.bind_port}')
  81. if self.use_threads and is_daemon and not self.opt.no_daemonize:
  82. ret = self.exec_cmd_thread(cmd)
  83. else:
  84. ret = self.exec_cmd(cmd,is_daemon)
  85. if isinstance(ret,CompletedProcess):
  86. if ret.stdout and (self.debug or not silent):
  87. msg(ret.stdout.decode().rstrip())
  88. if ret.stderr and (self.debug or (ret.returncode and not silent)):
  89. msg(ret.stderr.decode().rstrip())
  90. return ret
  91. @property
  92. def pid(self):
  93. if self.use_pidfile:
  94. with open(self.pidfile) as fp:
  95. return fp.read().strip()
  96. elif self.platform == 'win':
  97. """
  98. Assumes only one running instance of given daemon. If multiple daemons are running,
  99. the first PID in the list is returned and self.pids is set to the PID list.
  100. """
  101. ss = f'{self.exec_fn}.exe'
  102. cp = self.run_cmd(['ps','-Wl'],silent=True)
  103. self.pids = ()
  104. # use Windows, not Cygwin, PID
  105. pids = tuple(line.split()[3] for line in cp.stdout.decode().splitlines() if ss in line)
  106. if pids:
  107. if len(pids) > 1:
  108. self.pids = pids
  109. return pids[0]
  110. elif self.platform == 'linux':
  111. ss = ' '.join(self.start_cmd)
  112. cp = self.run_cmd(['pgrep','-f',ss],silent=True)
  113. if cp.stdout:
  114. return cp.stdout.strip().decode()
  115. die(2,f'{ss!r} not found in process list, cannot determine PID')
  116. @property
  117. def bind_port(self):
  118. return self.private_port or self.rpc_port
  119. @property
  120. def state(self):
  121. if self.debug:
  122. msg(f'Testing port {self.bind_port}')
  123. return 'ready' if self.test_socket('localhost',self.bind_port) else 'stopped'
  124. @property
  125. def start_cmds(self):
  126. return [self.start_cmd]
  127. @property
  128. def stop_cmd(self):
  129. return (
  130. ['kill','-Wf',self.pid] if self.platform == 'win' else
  131. ['kill','-9',self.pid] if self.force_kill else
  132. ['kill',self.pid] )
  133. def cmd(self,action,*args,**kwargs):
  134. return getattr(self,action)(*args,**kwargs)
  135. def do_start(self,silent=False):
  136. if not silent:
  137. msg(f'Starting {self.desc} on port {self.bind_port}')
  138. return self.run_cmd(self.start_cmd,silent=True,is_daemon=True)
  139. def do_stop(self,silent=False):
  140. if not silent:
  141. msg(f'Stopping {self.desc} on port {self.bind_port}')
  142. return self.run_cmd(self.stop_cmd,silent=True)
  143. def cli(self,*cmds,silent=False):
  144. return self.run_cmd(self.cli_cmd(*cmds),silent=silent)
  145. def state_msg(self,extra_text=None):
  146. extra_text = f'{extra_text} ' if extra_text else ''
  147. return '{:{w}} {:10} {}'.format(
  148. f'{self.desc} {extra_text}running',
  149. 'pid N/A' if self.pid is None or self.pids else f'pid {self.pid}',
  150. f'port {self.bind_port}',
  151. w = 52 + len(extra_text) )
  152. def pre_start(self): pass
  153. def start(self,quiet=False,silent=False):
  154. if self.state == 'ready':
  155. if not (quiet or silent):
  156. msg(self.state_msg(extra_text='already'))
  157. return True
  158. self.wait_for_state('stopped')
  159. self.pre_start()
  160. ret = self.do_start(silent=silent)
  161. if self.wait:
  162. self.wait_for_state('ready')
  163. return ret
  164. def stop(self,quiet=False,silent=False):
  165. if self.state == 'ready':
  166. ret = self.do_stop(silent=silent)
  167. if self.pids:
  168. msg('Warning: multiple PIDs [{}] -- we may be stopping the wrong instance'.format(
  169. fmt_list(self.pids,fmt='bare')
  170. ))
  171. if self.wait:
  172. self.wait_for_state('stopped')
  173. return ret
  174. else:
  175. if not (quiet or silent):
  176. msg(f'{self.desc} on port {self.bind_port} not running')
  177. return True
  178. def restart(self,silent=False):
  179. self.stop(silent=silent)
  180. return self.start(silent=silent)
  181. def test_socket(self,host,port,timeout=10):
  182. import socket
  183. try:
  184. socket.create_connection((host,port),timeout=timeout).close()
  185. except:
  186. return False
  187. else:
  188. return True
  189. def wait_for_state(self,req_state):
  190. for i in range(300):
  191. if self.state == req_state:
  192. return True
  193. time.sleep(0.2)
  194. else:
  195. die(2,f'Wait for state {req_state!r} timeout exceeded for {self.desc} (port {self.bind_port})')
  196. class RPCDaemon(Daemon):
  197. avail_opts = ('no_daemonize',)
  198. def __init__(self):
  199. super().__init__()
  200. self.desc = '{} {} {}RPC daemon'.format(
  201. self.rpc_type,
  202. getattr(self.proto.network_names,self.proto.network),
  203. 'test suite ' if self.test_suite else '' )
  204. self._set_ok += ('usr_daemon_args',)
  205. self.usr_daemon_args = []
  206. @property
  207. def start_cmd(self):
  208. return ([self.exec_fn] + self.daemon_args + self.usr_daemon_args)
  209. class CoinDaemon(Daemon):
  210. networks = ('mainnet','testnet','regtest')
  211. cfg_file_hdr = ''
  212. avail_flags = ('keep_cfg_file',)
  213. avail_opts = ('no_daemonize','online')
  214. testnet_dir = None
  215. test_suite_port_shift = 1237
  216. rpc_user = None
  217. rpc_password = None
  218. version_info_arg = '--version'
  219. _cd = namedtuple('coins_data',['daemon_ids'])
  220. coins = {
  221. 'BTC': _cd(['bitcoin_core']),
  222. 'BCH': _cd(['bitcoin_cash_node']),
  223. 'LTC': _cd(['litecoin_core']),
  224. 'XMR': _cd(['monero']),
  225. 'ETH': _cd(['openethereum','geth','erigon']),
  226. 'ETC': _cd(['parity']),
  227. }
  228. @classmethod
  229. def all_daemon_ids(cls):
  230. return [i for coin in cls.coins for i in cls.coins[coin].daemon_ids]
  231. class warn_blacklisted(oneshot_warning):
  232. color = 'yellow'
  233. message = 'blacklisted daemon: {!r}'
  234. @classmethod
  235. def get_daemon_ids(cls,coin):
  236. ret = cls.coins[coin].daemon_ids
  237. if 'erigon' in ret and not g.enable_erigon:
  238. ret.remove('erigon')
  239. if g.blacklist_daemons:
  240. blacklist = g.blacklist_daemons.split()
  241. def gen():
  242. for daemon_id in ret:
  243. if daemon_id in blacklist:
  244. cls.warn_blacklisted(div=daemon_id,fmt_args=[daemon_id])
  245. else:
  246. yield daemon_id
  247. ret = list(gen())
  248. return ret
  249. @classmethod
  250. def get_daemon(cls,coin,daemon_id,proto=None):
  251. if not proto:
  252. from .protocol import init_proto
  253. proto = init_proto(coin)
  254. return getattr(
  255. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.daemon'),
  256. daemon_id+'_daemon' )
  257. @classmethod
  258. def get_network_ids(cls):
  259. from .protocol import CoinProtocol
  260. def gen():
  261. for coin in cls.coins:
  262. for daemon_id in cls.get_daemon_ids(coin):
  263. for network in cls.get_daemon(coin,daemon_id).networks:
  264. yield CoinProtocol.Base.create_network_id(coin,network)
  265. return remove_dups(list(gen()),quiet=True)
  266. @classmethod
  267. def get_exec_version_str(cls):
  268. try:
  269. cp = run([cls.exec_fn,cls.version_info_arg],stdout=PIPE,stderr=PIPE,check=True)
  270. except:
  271. die(2,f'Unable to execute {cls.exec_fn}')
  272. if cp.returncode:
  273. die(2,f'Unable to execute {cls.exec_fn}')
  274. else:
  275. res = cp.stdout.decode().splitlines()
  276. return ( res[0] if len(res) == 1 else [s for s in res if 'ersion' in s][0] ).strip()
  277. def __new__(cls,
  278. network_id = None,
  279. proto = None,
  280. opts = None,
  281. flags = None,
  282. test_suite = False,
  283. port_shift = None,
  284. p2p_port = None,
  285. datadir = None,
  286. daemon_id = None ):
  287. assert network_id or proto, 'CoinDaemon_chk1'
  288. assert not (network_id and proto), 'CoinDaemon_chk2'
  289. if proto:
  290. network_id = proto.network_id
  291. network = proto.network
  292. coin = proto.coin
  293. else:
  294. network_id = network_id.lower()
  295. from .protocol import CoinProtocol,init_proto
  296. proto = init_proto(network_id=network_id)
  297. coin,network = CoinProtocol.Base.parse_network_id(network_id)
  298. coin = coin.upper()
  299. daemon_ids = cls.get_daemon_ids(coin)
  300. if not daemon_ids:
  301. die(1,f'No configured daemons for coin {coin}!')
  302. daemon_id = daemon_id or g.daemon_id or daemon_ids[0]
  303. if daemon_id not in daemon_ids:
  304. die(1,f'{daemon_id!r}: invalid daemon_id - valid choices: {fmt_list(daemon_ids)}')
  305. me = Daemon.__new__(cls.get_daemon( None, daemon_id, proto=proto ))
  306. assert network in me.networks, f'{network!r}: unsupported network for daemon {daemon_id}'
  307. me.network_id = network_id
  308. me.network = network
  309. me.coin = coin
  310. me.id = daemon_id
  311. me.proto = proto
  312. return me
  313. def __init__(self,
  314. network_id = None,
  315. proto = None,
  316. opts = None,
  317. flags = None,
  318. test_suite = False,
  319. port_shift = None,
  320. p2p_port = None,
  321. datadir = None,
  322. daemon_id = None ):
  323. self.test_suite = test_suite
  324. super().__init__(opts=opts,flags=flags)
  325. self._set_ok += ('shared_args','usr_coind_args')
  326. self.shared_args = []
  327. self.usr_coind_args = []
  328. for k,v in self.daemon_data._asdict().items():
  329. setattr(self,k,v)
  330. self.desc = '{} {} {}daemon'.format(
  331. self.coind_name,
  332. getattr(self.proto.network_names,self.network),
  333. 'test suite ' if test_suite else '' )
  334. # user-set values take precedence
  335. self.datadir = os.path.abspath(datadir or g.daemon_data_dir or self.init_datadir())
  336. self.non_dfl_datadir = bool(datadir or g.daemon_data_dir or test_suite or self.network == 'regtest')
  337. # init_datadir() may have already initialized logdir
  338. self.logdir = os.path.abspath(getattr(self,'logdir',self.datadir))
  339. ps_adj = (port_shift or 0) + (self.test_suite_port_shift if test_suite else 0)
  340. # user-set values take precedence
  341. self.rpc_port = (g.rpc_port or 0) + (port_shift or 0) if g.rpc_port else ps_adj + self.get_rpc_port()
  342. self.p2p_port = (
  343. p2p_port or (
  344. self.get_p2p_port() + ps_adj if self.get_p2p_port() and (test_suite or ps_adj) else None
  345. ) if self.network != 'regtest' else None )
  346. if hasattr(self,'private_ports'):
  347. self.private_port = getattr(self.private_ports,self.network)
  348. # bind_port == self.private_port or self.rpc_port
  349. self.pidfile = '{}/{}-{}-daemon-{}.pid'.format(self.logdir,self.id,self.network,self.bind_port)
  350. self.logfile = '{}/{}-{}-daemon-{}.log'.format(self.logdir,self.id,self.network,self.bind_port)
  351. self.init_subclass()
  352. def init_datadir(self):
  353. if self.test_suite:
  354. return os.path.join('test','daemons',self.network_id)
  355. else:
  356. return os.path.join(*self.datadirs[self.platform])
  357. @property
  358. def network_datadir(self):
  359. return self.datadir
  360. def get_rpc_port(self):
  361. return getattr(self.rpc_ports,self.network)
  362. def get_p2p_port(self):
  363. return None
  364. @property
  365. def start_cmd(self):
  366. return ([self.exec_fn]
  367. + self.coind_args
  368. + self.shared_args
  369. + self.usr_coind_args )
  370. def cli_cmd(self,*cmds):
  371. return ([self.cli_fn]
  372. + self.shared_args
  373. + list(cmds) )
  374. def start(self,*args,**kwargs):
  375. assert self.test_suite or self.network == 'regtest', 'start() restricted to test suite and regtest'
  376. return super().start(*args,**kwargs)
  377. def stop(self,*args,**kwargs):
  378. assert self.test_suite or self.network == 'regtest', 'stop() restricted to test suite and regtest'
  379. return super().stop(*args,**kwargs)
  380. def pre_start(self):
  381. os.makedirs(self.datadir,exist_ok=True)
  382. if self.test_suite or self.network == 'regtest':
  383. if self.cfg_file and not self.flag.keep_cfg_file:
  384. with open(f'{self.datadir}/{self.cfg_file}','w') as fp:
  385. fp.write(self.cfg_file_hdr)
  386. if self.use_pidfile and os.path.exists(self.pidfile):
  387. # Parity overwrites the data in the existing pidfile without zeroing it first, leading
  388. # to interesting consequences when the new PID has fewer digits than the previous one.
  389. os.unlink(self.pidfile)
  390. def remove_datadir(self):
  391. "remove the network's datadir"
  392. assert self.test_suite, 'datadir removal restricted to test suite'
  393. if self.state == 'stopped':
  394. run([
  395. ('rm' if g.platform == 'win' else '/bin/rm'),
  396. '-rf',
  397. self.datadir ])
  398. set_vt100()
  399. else:
  400. msg(f'Cannot remove {self.network_datadir!r} - daemon is not stopped')