daemon.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. daemon: Daemon control interface for the MMGen suite
  20. """
  21. import os,time,importlib
  22. from subprocess import run,PIPE,CompletedProcess
  23. from collections import namedtuple
  24. from .cfg import gc
  25. from .color import set_vt100
  26. from .util import msg,Msg_r,ymsg,die,remove_dups,oneshot_warning
  27. from .flags import *
  28. _dd = namedtuple('daemon_data',['coind_name','coind_version','coind_version_str']) # latest tested version
  29. _nw = namedtuple('coin_networks',['mainnet','testnet','regtest'])
  30. class Daemon(Lockable):
  31. desc = 'daemon'
  32. debug = False
  33. wait = True
  34. use_pidfile = True
  35. force_kill = False
  36. pids = ()
  37. use_threads = False
  38. cfg_file = None
  39. new_console_mswin = False
  40. lockfile = None
  41. private_port = None
  42. avail_opts = ()
  43. avail_flags = () # like opts, but can be set or unset after instantiation
  44. _reset_ok = ('debug','wait','pids')
  45. def __init__(self,cfg,opts=None,flags=None):
  46. self.cfg = cfg
  47. self.platform = gc.platform
  48. if self.platform == 'win':
  49. self.use_pidfile = False
  50. self.use_threads = True
  51. self.opt = ClassOpts(self,opts)
  52. self.flag = ClassFlags(self,flags)
  53. def exec_cmd_thread(self,cmd):
  54. import threading
  55. tname = ('exec_cmd','exec_cmd_win_console')[self.platform == 'win' and self.new_console_mswin]
  56. t = threading.Thread(target=getattr(self,tname),args=(cmd,))
  57. t.daemon = True
  58. t.start()
  59. if self.platform == 'win':
  60. Msg_r(' \b') # blocks w/o this...crazy
  61. return True
  62. def exec_cmd_win_console(self,cmd):
  63. from subprocess import Popen,CREATE_NEW_CONSOLE,STARTUPINFO,STARTF_USESHOWWINDOW,SW_HIDE
  64. si = STARTUPINFO(dwFlags=STARTF_USESHOWWINDOW,wShowWindow=SW_HIDE)
  65. p = Popen(cmd,creationflags=CREATE_NEW_CONSOLE,startupinfo=si)
  66. p.wait()
  67. def exec_cmd(self,cmd,is_daemon=False):
  68. out = (PIPE,None)[is_daemon and self.opt.no_daemonize]
  69. try:
  70. cp = run(cmd,check=False,stdout=out,stderr=out)
  71. except Exception as e:
  72. die( 'MMGenCalledProcessError', f'Error starting executable: {type(e).__name__} [Errno {e.errno}]' )
  73. set_vt100()
  74. if self.debug:
  75. print(cp)
  76. return cp
  77. def run_cmd(self,cmd,silent=False,is_daemon=False):
  78. if self.debug:
  79. msg('\n\n')
  80. if self.debug or (is_daemon and not silent):
  81. msg(f'Starting {self.desc} on port {self.bind_port}')
  82. if self.use_threads and is_daemon and not self.opt.no_daemonize:
  83. ret = self.exec_cmd_thread(cmd)
  84. else:
  85. ret = self.exec_cmd(cmd,is_daemon)
  86. if isinstance(ret,CompletedProcess):
  87. if ret.stdout and (self.debug or not silent):
  88. msg(ret.stdout.decode().rstrip())
  89. if ret.stderr and (self.debug or (ret.returncode and not silent)):
  90. msg(ret.stderr.decode().rstrip())
  91. return ret
  92. @property
  93. def pid(self):
  94. if self.use_pidfile:
  95. with open(self.pidfile) as fp:
  96. return fp.read().strip()
  97. elif self.platform == 'win':
  98. """
  99. Assumes only one running instance of given daemon. If multiple daemons are running,
  100. the first PID in the list is returned and self.pids is set to the PID list.
  101. """
  102. ss = f'{self.exec_fn}.exe'
  103. cp = self.run_cmd(['ps','-Wl'],silent=True)
  104. self.pids = ()
  105. # use Windows, not Cygwin, PID
  106. pids = tuple(line.split()[3] for line in cp.stdout.decode().splitlines() if ss in line)
  107. if pids:
  108. if len(pids) > 1:
  109. self.pids = pids
  110. return pids[0]
  111. elif self.platform == 'linux':
  112. ss = ' '.join(self.start_cmd)
  113. cp = self.run_cmd(['pgrep','-f',ss],silent=True)
  114. if cp.stdout:
  115. return cp.stdout.strip().decode()
  116. die(2,f'{ss!r} not found in process list, cannot determine PID')
  117. @property
  118. def bind_port(self):
  119. return self.private_port or self.rpc_port
  120. @property
  121. def state(self):
  122. if self.debug:
  123. msg(f'Testing port {self.bind_port}')
  124. return 'ready' if self.test_socket('localhost',self.bind_port) else 'stopped'
  125. @property
  126. def start_cmds(self):
  127. return [self.start_cmd]
  128. @property
  129. def stop_cmd(self):
  130. return (
  131. ['kill','-Wf',self.pid] if self.platform == 'win' else
  132. ['kill','-9',self.pid] if self.force_kill else
  133. ['kill',self.pid] )
  134. def cmd(self,action,*args,**kwargs):
  135. return getattr(self,action)(*args,**kwargs)
  136. def cli(self,*cmds,silent=False):
  137. return self.run_cmd(self.cli_cmd(*cmds),silent=silent)
  138. def state_msg(self,extra_text=None):
  139. extra_text = f'{extra_text} ' if extra_text else ''
  140. return '{:{w}} {:10} {}'.format(
  141. f'{self.desc} {extra_text}running',
  142. 'pid N/A' if self.pid is None or self.pids else f'pid {self.pid}',
  143. f'port {self.bind_port}',
  144. w = 52 + len(extra_text) )
  145. def pre_start(self): pass
  146. def start(self,quiet=False,silent=False):
  147. if self.state == 'ready':
  148. if not (quiet or silent):
  149. msg(self.state_msg(extra_text='already'))
  150. return True
  151. self.wait_for_state('stopped')
  152. self.pre_start()
  153. if not silent:
  154. msg(f'Starting {self.desc} on port {self.bind_port}')
  155. ret = self.run_cmd(self.start_cmd,silent=True,is_daemon=True)
  156. if self.wait:
  157. self.wait_for_state('ready')
  158. return ret
  159. def stop(self,quiet=False,silent=False):
  160. if self.state == 'ready':
  161. if not silent:
  162. msg(f'Stopping {self.desc} on port {self.bind_port}')
  163. if self.force_kill:
  164. run(['sync'])
  165. ret = self.run_cmd(self.stop_cmd,silent=True)
  166. if self.pids:
  167. msg('Warning: multiple PIDs [{}] -- we may be stopping the wrong instance'.format(
  168. fmt_list(self.pids,fmt='bare')
  169. ))
  170. if self.wait:
  171. self.wait_for_state('stopped')
  172. return ret
  173. else:
  174. if not (quiet or silent):
  175. msg(f'{self.desc} on port {self.bind_port} not running')
  176. return True
  177. def restart(self,silent=False):
  178. self.stop(silent=silent)
  179. return self.start(silent=silent)
  180. def test_socket(self,host,port,timeout=10):
  181. import socket
  182. try:
  183. socket.create_connection((host,port),timeout=timeout).close()
  184. except:
  185. return False
  186. else:
  187. return True
  188. def wait_for_state(self,req_state):
  189. for i in range(300):
  190. if self.state == req_state:
  191. return True
  192. time.sleep(0.2)
  193. else:
  194. die(2,f'Wait for state {req_state!r} timeout exceeded for {self.desc} (port {self.bind_port})')
  195. class RPCDaemon(Daemon):
  196. avail_opts = ('no_daemonize',)
  197. def __init__(self,cfg):
  198. super().__init__(cfg)
  199. self.desc = '{} {} {}RPC daemon'.format(
  200. self.rpc_type,
  201. getattr(self.proto.network_names,self.proto.network),
  202. 'test suite ' if self.test_suite else '' )
  203. self._set_ok += ('usr_daemon_args',)
  204. self.usr_daemon_args = []
  205. @property
  206. def start_cmd(self):
  207. return ([self.exec_fn] + self.daemon_args + self.usr_daemon_args)
  208. class CoinDaemon(Daemon):
  209. networks = ('mainnet','testnet','regtest')
  210. cfg_file_hdr = ''
  211. avail_flags = ('keep_cfg_file',)
  212. avail_opts = ('no_daemonize','online')
  213. testnet_dir = None
  214. test_suite_port_shift = 1237
  215. rpc_user = None
  216. rpc_password = None
  217. version_info_arg = '--version'
  218. _cd = namedtuple('coins_data',['daemon_ids'])
  219. coins = {
  220. 'BTC': _cd(['bitcoin_core']),
  221. 'BCH': _cd(['bitcoin_cash_node']),
  222. 'LTC': _cd(['litecoin_core']),
  223. 'XMR': _cd(['monero']),
  224. 'ETH': _cd(['geth','erigon','openethereum']),
  225. 'ETC': _cd(['parity']),
  226. }
  227. @classmethod
  228. def all_daemon_ids(cls):
  229. return [i for coin in cls.coins for i in cls.coins[coin].daemon_ids]
  230. class warn_blacklisted(oneshot_warning):
  231. color = 'yellow'
  232. message = 'blacklisted daemon: {!r}'
  233. @classmethod
  234. def get_daemon_ids(cls,cfg,coin):
  235. ret = cls.coins[coin].daemon_ids
  236. if 'erigon' in ret and not cfg.enable_erigon:
  237. ret.remove('erigon')
  238. if cfg.blacklisted_daemons:
  239. blacklist = cfg.blacklisted_daemons.split()
  240. def gen():
  241. for daemon_id in ret:
  242. if daemon_id in blacklist:
  243. cls.warn_blacklisted(div=daemon_id,fmt_args=[daemon_id])
  244. else:
  245. yield daemon_id
  246. ret = list(gen())
  247. return ret
  248. @classmethod
  249. def get_daemon(cls,cfg,coin,daemon_id,proto=None):
  250. if not proto:
  251. from .protocol import init_proto
  252. proto = init_proto( cfg, coin )
  253. return getattr(
  254. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.daemon'),
  255. daemon_id+'_daemon' )
  256. @classmethod
  257. def get_network_ids(cls,cfg):
  258. from .protocol import CoinProtocol
  259. def gen():
  260. for coin in cls.coins:
  261. for daemon_id in cls.get_daemon_ids(cfg,coin):
  262. for network in cls.get_daemon( cfg, coin, daemon_id ).networks:
  263. yield CoinProtocol.Base.create_network_id(coin,network)
  264. return remove_dups(list(gen()),quiet=True)
  265. @classmethod
  266. def get_exec_version_str(cls):
  267. try:
  268. cp = run([cls.exec_fn,cls.version_info_arg],stdout=PIPE,stderr=PIPE,check=True)
  269. except:
  270. die(2,f'Unable to execute {cls.exec_fn}')
  271. if cp.returncode:
  272. die(2,f'Unable to execute {cls.exec_fn}')
  273. else:
  274. res = cp.stdout.decode().splitlines()
  275. return ( res[0] if len(res) == 1 else [s for s in res if 'ersion' in s][0] ).strip()
  276. def __new__(cls,
  277. cfg,
  278. network_id = None,
  279. proto = None,
  280. opts = None,
  281. flags = None,
  282. test_suite = False,
  283. port_shift = None,
  284. p2p_port = None,
  285. datadir = None,
  286. daemon_id = None ):
  287. assert network_id or proto, 'CoinDaemon_chk1'
  288. assert not (network_id and proto), 'CoinDaemon_chk2'
  289. if proto:
  290. network_id = proto.network_id
  291. network = proto.network
  292. coin = proto.coin
  293. else:
  294. network_id = network_id.lower()
  295. from .protocol import CoinProtocol,init_proto
  296. proto = init_proto( cfg, network_id=network_id )
  297. coin,network = CoinProtocol.Base.parse_network_id(network_id)
  298. coin = coin.upper()
  299. daemon_ids = cls.get_daemon_ids(cfg,coin)
  300. if not daemon_ids:
  301. die(1,f'No configured daemons for coin {coin}!')
  302. daemon_id = daemon_id or cfg.daemon_id or daemon_ids[0]
  303. if daemon_id not in daemon_ids:
  304. die(1,f'{daemon_id!r}: invalid daemon_id - valid choices: {fmt_list(daemon_ids)}')
  305. me = Daemon.__new__(cls.get_daemon( cfg, None, daemon_id, proto=proto ))
  306. assert network in me.networks, f'{network!r}: unsupported network for daemon {daemon_id}'
  307. me.network_id = network_id
  308. me.network = network
  309. me.coin = coin
  310. me.id = daemon_id
  311. me.proto = proto
  312. return me
  313. def __init__(self,
  314. cfg,
  315. network_id = None,
  316. proto = None,
  317. opts = None,
  318. flags = None,
  319. test_suite = False,
  320. port_shift = None,
  321. p2p_port = None,
  322. datadir = None,
  323. daemon_id = None ):
  324. self.test_suite = test_suite
  325. super().__init__(cfg=cfg,opts=opts,flags=flags)
  326. self._set_ok += ('shared_args','usr_coind_args')
  327. self.shared_args = []
  328. self.usr_coind_args = []
  329. for k,v in self.daemon_data._asdict().items():
  330. setattr(self,k,v)
  331. self.desc = '{} {} {}daemon'.format(
  332. self.coind_name,
  333. getattr(self.proto.network_names,self.network),
  334. 'test suite ' if test_suite else '' )
  335. # user-set values take precedence
  336. self.datadir = os.path.abspath(datadir or cfg.daemon_data_dir or self.init_datadir())
  337. self.non_dfl_datadir = bool(datadir or cfg.daemon_data_dir or test_suite or self.network == 'regtest')
  338. # init_datadir() may have already initialized logdir
  339. self.logdir = os.path.abspath(getattr(self,'logdir',self.datadir))
  340. ps_adj = (port_shift or 0) + (self.test_suite_port_shift if test_suite else 0)
  341. # user-set values take precedence
  342. self.rpc_port = (cfg.rpc_port or 0) + (port_shift or 0) if cfg.rpc_port else ps_adj + self.get_rpc_port()
  343. self.p2p_port = (
  344. p2p_port or (
  345. self.get_p2p_port() + ps_adj if self.get_p2p_port() and (test_suite or ps_adj) else None
  346. ) if self.network != 'regtest' else None )
  347. if hasattr(self,'private_ports'):
  348. self.private_port = getattr(self.private_ports,self.network)
  349. # bind_port == self.private_port or self.rpc_port
  350. self.pidfile = '{}/{}-{}-daemon-{}.pid'.format(self.logdir,self.id,self.network,self.bind_port)
  351. self.logfile = '{}/{}-{}-daemon-{}.log'.format(self.logdir,self.id,self.network,self.bind_port)
  352. self.init_subclass()
  353. def init_datadir(self):
  354. if self.test_suite:
  355. return os.path.join('test','daemons',self.network_id)
  356. else:
  357. return os.path.join(*self.datadirs[self.platform])
  358. @property
  359. def network_datadir(self):
  360. return self.datadir
  361. def get_rpc_port(self):
  362. return getattr(self.rpc_ports,self.network)
  363. def get_p2p_port(self):
  364. return None
  365. @property
  366. def start_cmd(self):
  367. return ([self.exec_fn]
  368. + self.coind_args
  369. + self.shared_args
  370. + self.usr_coind_args )
  371. def cli_cmd(self,*cmds):
  372. return ([self.cli_fn]
  373. + self.shared_args
  374. + list(cmds) )
  375. def start(self,*args,**kwargs):
  376. assert self.test_suite or self.network == 'regtest', 'start() restricted to test suite and regtest'
  377. return super().start(*args,**kwargs)
  378. def stop(self,*args,**kwargs):
  379. assert self.test_suite or self.network == 'regtest', 'stop() restricted to test suite and regtest'
  380. return super().stop(*args,**kwargs)
  381. def pre_start(self):
  382. os.makedirs(self.datadir,exist_ok=True)
  383. if self.test_suite or self.network == 'regtest':
  384. if self.cfg_file and not self.flag.keep_cfg_file:
  385. with open(f'{self.datadir}/{self.cfg_file}','w') as fp:
  386. fp.write(self.cfg_file_hdr)
  387. if self.use_pidfile and os.path.exists(self.pidfile):
  388. # Parity overwrites the data in the existing pidfile without zeroing it first, leading
  389. # to interesting consequences when the new PID has fewer digits than the previous one.
  390. os.unlink(self.pidfile)
  391. def remove_datadir(self):
  392. "remove the network's datadir"
  393. assert self.test_suite, 'datadir removal restricted to test suite'
  394. if self.state == 'stopped':
  395. run([
  396. ('rm' if gc.platform == 'win' else '/bin/rm'),
  397. '-rf',
  398. self.datadir ])
  399. set_vt100()
  400. else:
  401. msg(f'Cannot remove {self.network_datadir!r} - daemon is not stopped')