rpc.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. rpc: Cryptocoin RPC library for the MMGen suite
  20. """
  21. import sys, re, base64, json, asyncio, importlib
  22. from decimal import Decimal
  23. from collections import namedtuple
  24. from .util import msg, ymsg, die, fmt, fmt_list, pp_fmt, oneshot_warning
  25. from .base_obj import AsyncInit
  26. from .obj import NonNegativeInt
  27. from .objmethods import HiliteStr, InitErrors, MMGenObject
  28. auth_data = namedtuple('rpc_auth_data', ['user', 'passwd'])
  29. def dmsg_rpc(fs, data=None, is_json=False):
  30. msg(
  31. fs if data is None else
  32. fs.format(pp_fmt(json.loads(data) if is_json else data))
  33. )
  34. def dmsg_rpc_backend(host_url, host_path, payload):
  35. msg(
  36. f'\n RPC URL: {host_url}{host_path}' +
  37. '\n RPC PAYLOAD data (httplib) ==>' +
  38. f'\n{pp_fmt(payload)}\n')
  39. def noop(*args, **kwargs):
  40. pass
  41. class IPPort(HiliteStr, InitErrors):
  42. color = 'yellow'
  43. width = 0
  44. trunc_ok = False
  45. min_len = 9 # 0.0.0.0:0
  46. max_len = 21 # 255.255.255.255:65535
  47. def __new__(cls, s):
  48. if isinstance(s, cls):
  49. return s
  50. try:
  51. m = re.fullmatch(r'{q}\.{q}\.{q}\.{q}:(\d{{1,10}})'.format(q=r'([0-9]{1,3})'), s)
  52. assert m is not None, f'{s!r}: invalid IP:HOST specifier'
  53. for e in m.groups():
  54. if len(e) != 1 and e[0] == '0':
  55. raise ValueError(f'{e}: leading zeroes not permitted in dotted decimal element or port number')
  56. res = [int(e) for e in m.groups()]
  57. for e in res[:4]:
  58. assert e <= 255, f'{e}: dotted decimal element > 255'
  59. assert res[4] <= 65535, f'{res[4]}: port number > 65535'
  60. me = str.__new__(cls, s)
  61. me.ip = '{}.{}.{}.{}'.format(*res)
  62. me.ip_num = sum(res[i] * (2 ** (-(i-3)*8)) for i in range(4))
  63. me.port = res[4]
  64. return me
  65. except Exception as e:
  66. return cls.init_fail(e, s)
  67. class json_encoder(json.JSONEncoder):
  68. def default(self, o):
  69. if isinstance(o, Decimal):
  70. return str(o)
  71. else:
  72. return json.JSONEncoder.default(self, o)
  73. class RPCBackends:
  74. class base:
  75. def __init__(self, caller):
  76. self.cfg = caller.cfg
  77. self.host = caller.host
  78. self.port = caller.port
  79. self.proxy = caller.proxy
  80. self.host_url = caller.host_url
  81. self.timeout = caller.timeout
  82. self.http_hdrs = caller.http_hdrs
  83. self.name = type(self).__name__
  84. self.caller = caller
  85. class aiohttp(base, metaclass=AsyncInit):
  86. """
  87. Contrary to the requests library, aiohttp won’t read environment variables by
  88. default. But you can do so by passing trust_env=True into aiohttp.ClientSession
  89. constructor to honor HTTP_PROXY, HTTPS_PROXY, WS_PROXY or WSS_PROXY environment
  90. variables (all are case insensitive).
  91. """
  92. def __del__(self):
  93. self.connector.close()
  94. self.session.detach()
  95. del self.session
  96. async def __init__(self, caller):
  97. super().__init__(caller)
  98. import aiohttp
  99. self.connector = aiohttp.TCPConnector(limit_per_host=self.cfg.aiohttp_rpc_queue_len)
  100. self.session = aiohttp.ClientSession(
  101. headers = {'Content-Type': 'application/json'},
  102. connector = self.connector,
  103. )
  104. if caller.auth_type == 'basic':
  105. self.auth = aiohttp.BasicAuth(*caller.auth, encoding='UTF-8')
  106. else:
  107. self.auth = None
  108. async def run(self, payload, timeout, host_path):
  109. dmsg_rpc_backend(self.host_url, host_path, payload)
  110. async with self.session.post(
  111. url = self.host_url + host_path,
  112. auth = self.auth,
  113. data = json.dumps(payload, cls=json_encoder),
  114. timeout = timeout or self.timeout,
  115. ) as res:
  116. return (await res.text(), res.status)
  117. class requests(base):
  118. def __del__(self):
  119. self.session.close()
  120. def __init__(self, caller):
  121. super().__init__(caller)
  122. import requests, urllib3
  123. urllib3.disable_warnings()
  124. self.session = requests.Session()
  125. self.session.trust_env = False # ignore *_PROXY environment vars
  126. self.session.headers = caller.http_hdrs
  127. if caller.auth_type:
  128. auth = 'HTTP' + caller.auth_type.capitalize() + 'Auth'
  129. self.session.auth = getattr(requests.auth, auth)(*caller.auth)
  130. if self.proxy: # used only by XMR for now: requires pysocks package
  131. self.session.proxies.update({
  132. 'http': f'socks5h://{self.proxy}',
  133. 'https': f'socks5h://{self.proxy}'
  134. })
  135. async def run(self, *args, **kwargs):
  136. return self.run_noasync(*args, **kwargs)
  137. def run_noasync(self, payload, timeout, host_path):
  138. dmsg_rpc_backend(self.host_url, host_path, payload)
  139. res = self.session.post(
  140. url = self.host_url + host_path,
  141. data = json.dumps(payload, cls=json_encoder),
  142. timeout = timeout or self.timeout,
  143. verify = False)
  144. return (res.content, res.status_code)
  145. class httplib(base):
  146. """
  147. Ignores *_PROXY environment vars
  148. """
  149. def __del__(self):
  150. self.session.close()
  151. def __init__(self, caller):
  152. super().__init__(caller)
  153. import http.client
  154. self.session = http.client.HTTPConnection(caller.host, caller.port, caller.timeout)
  155. if caller.auth_type == 'basic':
  156. auth_str = f'{caller.auth.user}:{caller.auth.passwd}'
  157. auth_str_b64 = 'Basic ' + base64.b64encode(auth_str.encode()).decode()
  158. self.http_hdrs.update({'Host': self.host, 'Authorization': auth_str_b64})
  159. dmsg_rpc(f' RPC AUTHORIZATION data ==> raw: [{auth_str}]\n{"":>31}enc: [{auth_str_b64}]\n')
  160. async def run(self, payload, timeout, host_path):
  161. dmsg_rpc_backend(self.host_url, host_path, payload)
  162. if timeout:
  163. import http.client
  164. s = http.client.HTTPConnection(self.host, self.port, timeout)
  165. else:
  166. s = self.session
  167. try:
  168. s.request(
  169. method = 'POST',
  170. url = host_path,
  171. body = json.dumps(payload, cls=json_encoder),
  172. headers = self.http_hdrs)
  173. r = s.getresponse() # => http.client.HTTPResponse instance
  174. except Exception as e:
  175. die('RPCFailure', str(e))
  176. if timeout:
  177. ret = (r.read(), r.status)
  178. s.close()
  179. return ret
  180. else:
  181. return (r.read(), r.status)
  182. class curl(base):
  183. def __init__(self, caller):
  184. def gen_opts():
  185. for k, v in caller.http_hdrs.items():
  186. yield from ('--header', f'{k}: {v}')
  187. if caller.auth_type:
  188. # Authentication with curl is insecure, as it exposes the user's credentials
  189. # via the command line. Use for testing only.
  190. yield from ('--user', f'{caller.auth.user}:{caller.auth.passwd}')
  191. if caller.auth_type == 'digest':
  192. yield '--digest'
  193. if caller.network_proto == 'https' and caller.verify_server is False:
  194. yield '--insecure'
  195. super().__init__(caller)
  196. self.exec_opts = list(gen_opts()) + ['--silent']
  197. self.arg_max = 8192 # set way below system ARG_MAX, just to be safe
  198. async def run(self, payload, timeout, host_path):
  199. data = json.dumps(payload, cls=json_encoder)
  200. if len(data) > self.arg_max:
  201. ymsg('Warning: Curl data payload length exceeded - falling back on httplib')
  202. return RPCBackends.httplib(self.caller).run(payload, timeout, host_path)
  203. dmsg_rpc_backend(self.host_url, host_path, payload)
  204. exec_cmd = [
  205. 'curl',
  206. '--proxy', f'socks5h://{self.proxy}' if self.proxy else '',
  207. '--connect-timeout', str(timeout or self.timeout),
  208. '--write-out', '%{http_code}',
  209. '--data-binary', data
  210. ] + self.exec_opts + [self.host_url + host_path]
  211. dmsg_rpc(' RPC curl exec data ==>\n{}\n', exec_cmd)
  212. from subprocess import run, PIPE
  213. from .color import set_vt100
  214. res = run(exec_cmd, stdout=PIPE, check=True, text=True).stdout
  215. set_vt100()
  216. return (res[:-3], int(res[-3:]))
  217. class RPCClient(MMGenObject):
  218. auth_type = None
  219. has_auth_cookie = False
  220. network_proto = 'http'
  221. proxy = None
  222. def __init__(self, cfg, host, port, test_connection=True):
  223. self.cfg = cfg
  224. self.name = type(self).__name__
  225. # aiohttp workaround, and may speed up RPC performance overall on some systems:
  226. if sys.platform == 'win32' and host == 'localhost':
  227. host = '127.0.0.1'
  228. global dmsg_rpc, dmsg_rpc_backend
  229. if not self.cfg.debug_rpc:
  230. dmsg_rpc = dmsg_rpc_backend = noop
  231. dmsg_rpc(f'=== {self.name}.__init__() debug ===')
  232. dmsg_rpc(f' cls [{self.name}] host [{host}] port [{port}]\n')
  233. if test_connection:
  234. import socket
  235. try:
  236. socket.create_connection((host, port), timeout=1).close()
  237. except:
  238. die('SocketError', f'Unable to connect to {host}:{port}')
  239. self.http_hdrs = {'Content-Type': 'application/json'}
  240. self.host_url = f'{self.network_proto}://{host}:{port}'
  241. self.host = host
  242. self.port = port
  243. self.timeout = self.cfg.http_timeout
  244. self.auth = None
  245. def _get_backend(self, backend):
  246. backend_id = backend or self.cfg.rpc_backend
  247. if backend_id == 'auto':
  248. return {
  249. 'linux': RPCBackends.httplib,
  250. 'darwin': RPCBackends.httplib,
  251. 'win32': RPCBackends.requests
  252. }[sys.platform](self)
  253. else:
  254. return getattr(RPCBackends, backend_id)(self)
  255. def set_backend(self, backend=None):
  256. self.backend = self._get_backend(backend)
  257. async def set_backend_async(self, backend=None):
  258. ret = self._get_backend(backend)
  259. self.backend = (await ret) if type(ret).__name__ == 'coroutine' else ret
  260. # Call family of methods - direct-to-daemon RPC call:
  261. # - positional params are passed to the daemon, 'timeout' and 'wallet' kwargs to the backend
  262. # - 'wallet' kwarg is used only by regtest
  263. async def call(self, method, *params, timeout=None, wallet=None):
  264. """
  265. default call: call with param list unrolled, exactly as with cli
  266. """
  267. return self.process_http_resp(await self.backend.run(
  268. payload = {'id': 1, 'jsonrpc': '2.0', 'method': method, 'params': params},
  269. timeout = timeout,
  270. host_path = self.make_host_path(wallet)
  271. ))
  272. async def batch_call(self, method, param_list, timeout=None, wallet=None):
  273. """
  274. Make a single call with a list of tuples as first argument
  275. For RPC calls that return a list of results
  276. """
  277. return self.process_http_resp(await self.backend.run(
  278. payload = [{
  279. 'id': n,
  280. 'jsonrpc': '2.0',
  281. 'method': method,
  282. 'params': params} for n, params in enumerate(param_list, 1)],
  283. timeout = timeout,
  284. host_path = self.make_host_path(wallet)
  285. ), batch=True)
  286. async def gathered_call(self, method, args_list, timeout=None, wallet=None):
  287. """
  288. Perform multiple RPC calls, returning results in a list
  289. Can be called two ways:
  290. 1) method = methodname, args_list = [args_tuple1, args_tuple2,...]
  291. 2) method = None, args_list = [(methodname1, args_tuple1), (methodname2, args_tuple2), ...]
  292. """
  293. cmd_list = args_list if method is None else tuple(zip([method] * len(args_list), args_list))
  294. cur_pos = 0
  295. chunk_size = 1024
  296. ret = []
  297. while cur_pos < len(cmd_list):
  298. tasks = [self.backend.run(
  299. payload = {'id': n, 'jsonrpc': '2.0', 'method': method, 'params': params},
  300. timeout = timeout,
  301. host_path = self.make_host_path(wallet)
  302. ) for n, (method, params) in enumerate(cmd_list[cur_pos:chunk_size+cur_pos], 1)]
  303. ret.extend(await asyncio.gather(*tasks))
  304. cur_pos += chunk_size
  305. return [self.process_http_resp(r) for r in ret]
  306. # Icall family of methods - indirect RPC call using CallSigs mechanism:
  307. # - 'timeout' and 'wallet' kwargs are passed to corresponding Call method
  308. # - remaining kwargs are passed to CallSigs method
  309. # - CallSigs method returns method and positional params for Call method
  310. def icall(self, method, **kwargs):
  311. timeout = kwargs.pop('timeout', None)
  312. wallet = kwargs.pop('wallet', None)
  313. return self.call(
  314. *getattr(self.call_sigs, method)(**kwargs),
  315. timeout = timeout,
  316. wallet = wallet)
  317. def gathered_icall(self, method, args_list, timeout=None, wallet=None):
  318. return self.gathered_call(
  319. method,
  320. [getattr(self.call_sigs, method)(*a)[1:] for a in args_list],
  321. timeout = timeout,
  322. wallet = wallet)
  323. def process_http_resp(self, run_ret, batch=False, json_rpc=True):
  324. def float_parser(n):
  325. return n
  326. text, status = run_ret
  327. if status == 200:
  328. dmsg_rpc(' RPC RESPONSE data ==>\n{}\n', text, is_json=True)
  329. m = None
  330. if batch:
  331. return [r['result'] for r in json.loads(text, parse_float=float_parser)]
  332. else:
  333. try:
  334. if json_rpc:
  335. ret = json.loads(text, parse_float=float_parser)['result']
  336. if isinstance(ret, list) and ret and type(ret[0]) == dict and 'success' in ret[0]:
  337. for res in ret:
  338. if not res['success']:
  339. m = str(res['error'])
  340. assert False
  341. return ret
  342. else:
  343. return json.loads(text, parse_float=float_parser)
  344. except:
  345. if not m:
  346. t = json.loads(text)
  347. try:
  348. m = t['error']['message']
  349. except:
  350. try:
  351. m = t['error']
  352. except:
  353. m = t
  354. die('RPCFailure', m)
  355. else:
  356. import http
  357. m, s = ('', http.HTTPStatus(status))
  358. if text:
  359. try:
  360. m = json.loads(text)['error']['message']
  361. except:
  362. try:
  363. m = text.decode()
  364. except:
  365. m = text
  366. die('RPCFailure', f'{s.value} {s.name}: {m}')
  367. async def stop_daemon(self, quiet=False, silent=False):
  368. if self.daemon.state == 'ready':
  369. if not (quiet or silent):
  370. msg(f'Stopping {self.daemon.desc} on port {self.daemon.bind_port}')
  371. ret = await self.do_stop_daemon(silent=silent)
  372. if self.daemon.wait:
  373. self.daemon.wait_for_state('stopped')
  374. return ret
  375. else:
  376. if not (quiet or silent):
  377. msg(f'{self.daemon.desc} on port {self.daemon.bind_port} not running')
  378. return True
  379. def start_daemon(self, silent=False):
  380. return self.daemon.start(silent=silent)
  381. async def restart_daemon(self, quiet=False, silent=False):
  382. await self.stop_daemon(quiet=quiet, silent=silent)
  383. return self.daemon.start(silent=silent)
  384. def handle_unsupported_daemon_version(self, name, warn_only):
  385. class daemon_version_warning(oneshot_warning):
  386. color = 'yellow'
  387. message = 'ignoring unsupported {} daemon version at user request'
  388. if warn_only:
  389. daemon_version_warning(div=name, fmt_args=[self.daemon.coind_name])
  390. else:
  391. name = self.daemon.coind_name
  392. die(2, '\n'+fmt(f"""
  393. The running {name} daemon has version {self.daemon_version_str}.
  394. This version of MMGen is tested only on {name} v{self.daemon.coind_version_str} and below.
  395. To avoid this error, downgrade your daemon to a supported version.
  396. Alternatively, you may invoke the command with the --ignore-daemon-version
  397. option, in which case you proceed at your own risk.
  398. """, indent=' '))
  399. async def rpc_init(
  400. cfg,
  401. proto = None,
  402. backend = None,
  403. daemon = None,
  404. ignore_daemon_version = False,
  405. ignore_wallet = False):
  406. proto = proto or cfg._proto
  407. if not 'rpc_init' in proto.mmcaps:
  408. die(1, f'rpc_init() not supported for {proto.name} protocol!')
  409. cls = getattr(
  410. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.rpc'),
  411. proto.base_proto + 'RPCClient')
  412. from .daemon import CoinDaemon
  413. rpc = await cls(
  414. cfg = cfg,
  415. proto = proto,
  416. daemon = daemon or CoinDaemon(cfg, proto=proto, test_suite=cfg.test_suite),
  417. backend = backend or cfg.rpc_backend,
  418. ignore_wallet = ignore_wallet)
  419. if rpc.daemon_version > rpc.daemon.coind_version:
  420. rpc.handle_unsupported_daemon_version(
  421. proto.name,
  422. ignore_daemon_version or proto.ignore_daemon_version or cfg.ignore_daemon_version)
  423. if rpc.chain not in proto.chain_names:
  424. die('RPCChainMismatch', '\n' + fmt(f"""
  425. Protocol: {proto.cls_name}
  426. Valid chain names: {fmt_list(proto.chain_names, fmt='bare')}
  427. RPC client chain: {rpc.chain}
  428. """, indent=' ').rstrip())
  429. rpc.blockcount = NonNegativeInt(rpc.blockcount)
  430. return rpc