daemon.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. daemon: Daemon control interface for the MMGen suite
  20. """
  21. import sys, os, time, importlib
  22. from subprocess import run, PIPE, CompletedProcess
  23. from collections import namedtuple
  24. from .base_obj import Lockable
  25. from .color import set_vt100
  26. from .util import msg, Msg_r, die, remove_dups, oneshot_warning, fmt_list
  27. from .flags import ClassFlags, ClassOpts
  28. _dd = namedtuple('daemon_data', ['coind_name', 'coind_version', 'coind_version_str']) # latest tested version
  29. _nw = namedtuple('coin_networks', ['mainnet', 'testnet', 'regtest'])
  30. class Daemon(Lockable):
  31. desc = 'daemon'
  32. debug = False
  33. wait = True
  34. use_pidfile = True
  35. force_kill = False
  36. pids = ()
  37. use_threads = False
  38. cfg_file = None
  39. new_console_mswin = False
  40. lockfile = None
  41. private_port = None
  42. avail_opts = ()
  43. avail_flags = () # like opts, but can be set or unset after instantiation
  44. _reset_ok = ('debug', 'wait', 'pids')
  45. version_info_arg = '--version'
  46. def __init__(self, cfg, opts=None, flags=None):
  47. self.cfg = cfg
  48. self.platform = sys.platform
  49. if self.platform == 'win32':
  50. self.use_pidfile = False
  51. self.use_threads = True
  52. self.opt = ClassOpts(self, opts)
  53. self.flag = ClassFlags(self, flags)
  54. self.debug = self.debug or cfg.debug_daemon
  55. def exec_cmd_thread(self, cmd):
  56. import threading
  57. tname = ('exec_cmd', 'exec_cmd_win_console')[self.platform == 'win32' and self.new_console_mswin]
  58. t = threading.Thread(target=getattr(self, tname), args=(cmd,))
  59. t.daemon = True
  60. t.start()
  61. if self.platform == 'win32':
  62. Msg_r(' \b') # blocks w/o this...crazy
  63. return True
  64. def exec_cmd_win_console(self, cmd):
  65. from subprocess import Popen, CREATE_NEW_CONSOLE, STARTUPINFO, STARTF_USESHOWWINDOW, SW_HIDE
  66. si = STARTUPINFO(dwFlags=STARTF_USESHOWWINDOW, wShowWindow=SW_HIDE)
  67. p = Popen(cmd, creationflags=CREATE_NEW_CONSOLE, startupinfo=si)
  68. p.wait()
  69. def exec_cmd(self, cmd, is_daemon=False, check_retcode=False):
  70. out = (PIPE, None)[is_daemon and self.opt.no_daemonize]
  71. try:
  72. cp = run(cmd, check=False, stdout=out, stderr=out)
  73. except OSError as e:
  74. die('MMGenCalledProcessError', f'Error starting executable: {type(e).__name__} [Errno {e.errno}]')
  75. set_vt100()
  76. if check_retcode and cp.returncode:
  77. die(1, str(cp))
  78. if self.debug:
  79. print(cp)
  80. return cp
  81. def run_cmd(self, cmd, silent=False, is_daemon=False, check_retcode=False):
  82. if self.debug:
  83. msg('\n\n')
  84. if self.debug or (is_daemon and not silent):
  85. msg(f'Starting {self.desc} on port {self.bind_port}')
  86. if self.debug:
  87. msg(f'\nExecuting:\n{fmt_list(cmd, fmt="col", indent=" ")}\n')
  88. if self.use_threads and is_daemon and not self.opt.no_daemonize:
  89. ret = self.exec_cmd_thread(cmd)
  90. else:
  91. ret = self.exec_cmd(cmd, is_daemon, check_retcode)
  92. if isinstance(ret, CompletedProcess):
  93. if ret.stdout and (self.debug or not silent):
  94. msg(ret.stdout.decode().rstrip())
  95. if ret.stderr and (self.debug or (ret.returncode and not silent)):
  96. msg(ret.stderr.decode().rstrip())
  97. return ret
  98. @property
  99. def pid(self):
  100. if self.use_pidfile:
  101. with open(self.pidfile) as fp:
  102. return fp.read().strip()
  103. elif self.platform == 'win32':
  104. # Assumes only one running instance of given daemon. If multiple daemons are running,
  105. # the first PID in the list is returned and self.pids is set to the PID list.
  106. ss = f'{self.exec_fn}.exe'
  107. cp = self.run_cmd(['ps', '-Wl'], silent=True)
  108. self.pids = ()
  109. # use Windows, not Cygwin, PID
  110. pids = tuple(line.split()[3] for line in cp.stdout.decode().splitlines() if ss in line)
  111. if pids:
  112. if len(pids) > 1:
  113. self.pids = pids
  114. return pids[0]
  115. elif self.platform in ('linux', 'darwin'):
  116. ss = ' '.join(self.start_cmd)
  117. cp = self.run_cmd(['pgrep', '-f', ss], silent=True)
  118. if cp.stdout:
  119. return cp.stdout.strip().decode()
  120. die(2, f'{ss!r} not found in process list, cannot determine PID')
  121. @property
  122. def bind_port(self):
  123. return self.private_port or self.rpc_port
  124. @property
  125. def state(self):
  126. if self.debug:
  127. msg(f'Testing port {self.bind_port}')
  128. return 'ready' if self.test_socket('localhost', self.bind_port) else 'stopped'
  129. @property
  130. def start_cmds(self):
  131. return [self.start_cmd]
  132. @property
  133. def stop_cmd(self):
  134. return (
  135. ['kill', '-Wf', self.pid] if self.platform == 'win32' else
  136. ['kill', '-9', self.pid] if self.force_kill else
  137. ['kill', self.pid])
  138. def cmd(self, action, *args, **kwargs):
  139. return getattr(self, action)(*args, **kwargs)
  140. def cli(self, *cmds, silent=False):
  141. return self.run_cmd(self.cli_cmd(*cmds), silent=silent)
  142. def state_msg(self, extra_text=None):
  143. try:
  144. pid = self.pid
  145. except:
  146. pid = None
  147. extra_text = 'not ' if self.state == 'stopped' else f'{extra_text} ' if extra_text else ''
  148. return '{:{w}} {:10} {}'.format(
  149. f'{self.desc} {extra_text}running',
  150. 'pid N/A' if pid is None or self.pids or self.state == 'stopped' else f'pid {pid}',
  151. f'port {self.bind_port}',
  152. w = 60)
  153. def pre_start(self):
  154. pass
  155. def start(self, quiet=False, silent=False):
  156. if self.state == 'ready':
  157. if not (quiet or silent):
  158. msg(self.state_msg(extra_text='already'))
  159. return True
  160. self.wait_for_state('stopped')
  161. self.pre_start()
  162. if not silent:
  163. msg(f'Starting {self.desc} on port {self.bind_port}')
  164. ret = self.run_cmd(self.start_cmd, silent=True, is_daemon=True, check_retcode=True)
  165. if self.wait:
  166. self.wait_for_state('ready')
  167. return ret
  168. def stop(self, quiet=False, silent=False):
  169. if self.state == 'ready':
  170. if not silent:
  171. msg(f'Stopping {self.desc} on port {self.bind_port}')
  172. if self.force_kill:
  173. run(['sync'])
  174. ret = self.run_cmd(self.stop_cmd, silent=True)
  175. if self.pids:
  176. msg('Warning: multiple PIDs [{}] -- we may be stopping the wrong instance'.format(
  177. fmt_list(self.pids, fmt='bare')
  178. ))
  179. if self.wait:
  180. self.wait_for_state('stopped')
  181. time.sleep(0.3) # race condition
  182. return ret
  183. else:
  184. if not (quiet or silent):
  185. msg(f'{self.desc} on port {self.bind_port} not running')
  186. return True
  187. def restart(self, silent=False):
  188. self.stop(silent=silent)
  189. return self.start(silent=silent)
  190. def test_socket(self, host, port, timeout=10):
  191. import socket
  192. try:
  193. socket.create_connection((host, port), timeout=timeout).close()
  194. except:
  195. return False
  196. else:
  197. return True
  198. def wait_for_state(self, req_state):
  199. for _ in range(300):
  200. if self.state == req_state:
  201. return True
  202. time.sleep(0.2)
  203. die(2, f'Wait for state {req_state!r} timeout exceeded for {self.desc} (port {self.bind_port})')
  204. @classmethod
  205. def get_exec_version_str(cls):
  206. try:
  207. cp = run([cls.exec_fn, cls.version_info_arg], stdout=PIPE, stderr=PIPE, check=True)
  208. except Exception as e:
  209. die(2, f'{e}\nUnable to execute {cls.exec_fn}')
  210. if cp.returncode:
  211. die(2, f'Unable to execute {cls.exec_fn}')
  212. else:
  213. res = cp.stdout.decode().splitlines()
  214. return (res[0] if len(res) == 1 else [s for s in res if 'ersion' in s][0]).strip()
  215. class RPCDaemon(Daemon):
  216. avail_opts = ('no_daemonize',)
  217. def __init__(self, cfg, opts=None, flags=None):
  218. super().__init__(cfg, opts=opts, flags=flags)
  219. self.desc = '{} {} {}RPC daemon'.format(
  220. self.rpc_type,
  221. getattr(self.proto.network_names, self.proto.network),
  222. 'test suite ' if self.test_suite else '')
  223. self._set_ok += ('usr_daemon_args',)
  224. self.usr_daemon_args = []
  225. @property
  226. def start_cmd(self):
  227. return [self.exec_fn] + self.daemon_args + self.usr_daemon_args
  228. class CoinDaemon(Daemon):
  229. networks = ('mainnet', 'testnet', 'regtest')
  230. cfg_file_hdr = ''
  231. avail_flags = ('keep_cfg_file',)
  232. avail_opts = ('no_daemonize', 'online')
  233. testnet_dir = None
  234. test_suite_port_shift = 1237
  235. rpc_user = None
  236. rpc_password = None
  237. _cd = namedtuple('coins_data', ['daemon_ids'])
  238. coins = {
  239. 'BTC': _cd(['bitcoin_core']),
  240. 'BCH': _cd(['bitcoin_cash_node']),
  241. 'LTC': _cd(['litecoin_core']),
  242. 'XMR': _cd(['monero']),
  243. 'ETH': _cd(['geth', 'erigon', 'openethereum']),
  244. 'ETC': _cd(['parity']),
  245. }
  246. @classmethod
  247. def all_daemon_ids(cls):
  248. return [i for coin in cls.coins for i in cls.coins[coin].daemon_ids]
  249. class warn_blacklisted(oneshot_warning):
  250. color = 'yellow'
  251. message = 'blacklisted daemon: {!r}'
  252. @classmethod
  253. def get_daemon_ids(cls, cfg, coin):
  254. ret = cls.coins[coin].daemon_ids
  255. if 'erigon' in ret and not cfg.enable_erigon:
  256. ret.remove('erigon')
  257. if cfg.blacklisted_daemons:
  258. blacklist = cfg.blacklisted_daemons.split()
  259. def gen():
  260. for daemon_id in ret:
  261. if daemon_id in blacklist:
  262. cls.warn_blacklisted(div=daemon_id, fmt_args=[daemon_id])
  263. else:
  264. yield daemon_id
  265. ret = list(gen())
  266. return ret
  267. @classmethod
  268. def get_daemon(cls, cfg, coin, daemon_id, proto=None):
  269. if proto:
  270. proto_cls = type(proto)
  271. else:
  272. from .protocol import init_proto
  273. proto_cls = init_proto(cfg, coin, return_cls=True)
  274. return getattr(
  275. importlib.import_module(f'mmgen.proto.{proto_cls.base_proto_coin.lower()}.daemon'),
  276. daemon_id+'_daemon')
  277. @classmethod
  278. def get_network_ids(cls, cfg):
  279. from .protocol import CoinProtocol
  280. def gen():
  281. for coin in cls.coins:
  282. for daemon_id in cls.get_daemon_ids(cfg, coin):
  283. for network in cls.get_daemon(cfg, coin, daemon_id).networks:
  284. yield CoinProtocol.Base.create_network_id(coin, network)
  285. return remove_dups(list(gen()), quiet=True)
  286. def __new__(cls,
  287. cfg,
  288. network_id = None,
  289. proto = None,
  290. opts = None,
  291. flags = None,
  292. test_suite = False,
  293. port_shift = None,
  294. p2p_port = None,
  295. datadir = None,
  296. daemon_id = None):
  297. assert network_id or proto, 'CoinDaemon_chk1'
  298. assert not (network_id and proto), 'CoinDaemon_chk2'
  299. if proto:
  300. network_id = proto.network_id
  301. network = proto.network
  302. coin = proto.coin
  303. else:
  304. network_id = network_id.lower()
  305. from .protocol import CoinProtocol, init_proto
  306. proto = init_proto(cfg, network_id=network_id)
  307. coin, network = CoinProtocol.Base.parse_network_id(network_id)
  308. coin = coin.upper()
  309. daemon_ids = cls.get_daemon_ids(cfg, coin)
  310. if not daemon_ids:
  311. die(1, f'No configured daemons for coin {coin}!')
  312. daemon_id = daemon_id or cfg.daemon_id or daemon_ids[0]
  313. if daemon_id not in daemon_ids:
  314. die(1, f'{daemon_id!r}: invalid daemon_id - valid choices: {fmt_list(daemon_ids)}')
  315. me = Daemon.__new__(cls.get_daemon(cfg, None, daemon_id, proto=proto))
  316. assert network in me.networks, f'{network!r}: unsupported network for daemon {daemon_id}'
  317. me.network_id = network_id
  318. me.network = network
  319. me.coin = coin
  320. me.id = daemon_id
  321. me.proto = proto
  322. return me
  323. def __init__(self,
  324. cfg,
  325. network_id = None,
  326. proto = None,
  327. opts = None,
  328. flags = None,
  329. test_suite = False,
  330. port_shift = None,
  331. p2p_port = None,
  332. datadir = None,
  333. daemon_id = None):
  334. self.test_suite = test_suite
  335. super().__init__(cfg=cfg, opts=opts, flags=flags)
  336. self._set_ok += ('shared_args', 'usr_coind_args')
  337. self.shared_args = []
  338. self.usr_coind_args = []
  339. for k, v in self.daemon_data._asdict().items():
  340. setattr(self, k, v)
  341. self.desc = '{} {} {}daemon'.format(
  342. self.coind_name,
  343. getattr(self.proto.network_names, self.network),
  344. 'test suite ' if test_suite else '')
  345. # user-set values take precedence
  346. self.datadir = os.path.abspath(datadir or cfg.daemon_data_dir or self.init_datadir())
  347. self.non_dfl_datadir = bool(datadir or cfg.daemon_data_dir or test_suite or self.network == 'regtest')
  348. # init_datadir() may have already initialized logdir
  349. self.logdir = os.path.abspath(getattr(self, 'logdir', self.datadir))
  350. ps_adj = (port_shift or 0) + (self.test_suite_port_shift if test_suite else 0)
  351. # user-set values take precedence
  352. self.rpc_port = (cfg.rpc_port or 0) + (port_shift or 0) if cfg.rpc_port else ps_adj + self.get_rpc_port()
  353. self.p2p_port = (
  354. p2p_port or (
  355. self.get_p2p_port() + ps_adj if self.get_p2p_port() and (test_suite or ps_adj) else None
  356. ) if self.network != 'regtest' else None)
  357. if hasattr(self, 'private_ports'):
  358. self.private_port = getattr(self.private_ports, self.network)
  359. # bind_port == self.private_port or self.rpc_port
  360. self.pidfile = f'{self.logdir}/{self.id}-{self.network}-daemon-{self.bind_port}.pid'
  361. self.logfile = f'{self.logdir}/{self.id}-{self.network}-daemon-{self.bind_port}.log'
  362. self.init_subclass()
  363. def init_datadir(self):
  364. if self.test_suite:
  365. return os.path.join('test', 'daemons', self.network_id)
  366. else:
  367. return os.path.join(*self.datadirs[self.platform])
  368. @property
  369. def network_datadir(self):
  370. return self.datadir
  371. def get_rpc_port(self):
  372. return getattr(self.rpc_ports, self.network)
  373. def get_p2p_port(self):
  374. return None
  375. @property
  376. def start_cmd(self):
  377. return ([self.exec_fn]
  378. + self.coind_args
  379. + self.shared_args
  380. + self.usr_coind_args)
  381. def cli_cmd(self, *cmds):
  382. return ([self.cli_fn]
  383. + self.shared_args
  384. + list(cmds))
  385. def start(self, *args, **kwargs):
  386. assert self.test_suite or self.network == 'regtest', 'start() restricted to test suite and regtest'
  387. return super().start(*args, **kwargs)
  388. def stop(self, *args, **kwargs):
  389. assert self.test_suite or self.network == 'regtest', 'stop() restricted to test suite and regtest'
  390. return super().stop(*args, **kwargs)
  391. def pre_start(self):
  392. os.makedirs(self.datadir, exist_ok=True)
  393. if self.test_suite or self.network == 'regtest':
  394. if self.cfg_file and not self.flag.keep_cfg_file:
  395. with open(f'{self.datadir}/{self.cfg_file}', 'w') as fp:
  396. fp.write(self.cfg_file_hdr)
  397. if self.use_pidfile and os.path.exists(self.pidfile):
  398. # Parity overwrites the data in the existing pidfile without zeroing it first, leading
  399. # to interesting consequences when the new PID has fewer digits than the previous one.
  400. os.unlink(self.pidfile)
  401. def remove_datadir(self):
  402. "remove the network's datadir"
  403. assert self.test_suite, 'datadir removal restricted to test suite'
  404. if self.state == 'stopped':
  405. run([
  406. ('rm' if self.platform == 'win32' else '/bin/rm'),
  407. '-rf',
  408. self.datadir])
  409. set_vt100()
  410. else:
  411. msg(f'Cannot remove {self.network_datadir!r} - daemon is not stopped')