rpc.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. rpc: Cryptocoin RPC library for the MMGen suite
  20. """
  21. import sys,re,base64,json,asyncio,importlib
  22. from decimal import Decimal
  23. from collections import namedtuple
  24. from .util import msg,ymsg,die,fmt,fmt_list,pp_fmt,oneshot_warning
  25. from .base_obj import AsyncInit
  26. from .obj import NonNegativeInt
  27. from .objmethods import HiliteStr,InitErrors,MMGenObject
  28. auth_data = namedtuple('rpc_auth_data',['user','passwd'])
  29. def dmsg_rpc(fs,data=None,is_json=False):
  30. msg(
  31. fs if data is None else
  32. fs.format(pp_fmt(json.loads(data) if is_json else data))
  33. )
  34. def dmsg_rpc_backend(host_url,host_path,payload):
  35. msg(
  36. f'\n RPC URL: {host_url}{host_path}' +
  37. '\n RPC PAYLOAD data (httplib) ==>' +
  38. f'\n{pp_fmt(payload)}\n' )
  39. def noop(*args,**kwargs):
  40. pass
  41. class IPPort(HiliteStr,InitErrors):
  42. color = 'yellow'
  43. width = 0
  44. trunc_ok = False
  45. min_len = 9 # 0.0.0.0:0
  46. max_len = 21 # 255.255.255.255:65535
  47. def __new__(cls,s):
  48. if isinstance(s,cls):
  49. return s
  50. try:
  51. m = re.fullmatch(r'{q}\.{q}\.{q}\.{q}:(\d{{1,10}})'.format(q=r'([0-9]{1,3})'),s)
  52. assert m is not None, f'{s!r}: invalid IP:HOST specifier'
  53. for e in m.groups():
  54. if len(e) != 1 and e[0] == '0':
  55. raise ValueError(f'{e}: leading zeroes not permitted in dotted decimal element or port number')
  56. res = [int(e) for e in m.groups()]
  57. for e in res[:4]:
  58. assert e <= 255, f'{e}: dotted decimal element > 255'
  59. assert res[4] <= 65535, f'{res[4]}: port number > 65535'
  60. me = str.__new__(cls,s)
  61. me.ip = '{}.{}.{}.{}'.format(*res)
  62. me.ip_num = sum( res[i] * ( 2 ** (-(i-3)*8) ) for i in range(4) )
  63. me.port = res[4]
  64. return me
  65. except Exception as e:
  66. return cls.init_fail(e,s)
  67. class json_encoder(json.JSONEncoder):
  68. def default(self,obj):
  69. if isinstance(obj,Decimal):
  70. return str(obj)
  71. else:
  72. return json.JSONEncoder.default(self,obj)
  73. class RPCBackends:
  74. class base:
  75. def __init__(self,caller):
  76. self.cfg = caller.cfg
  77. self.host = caller.host
  78. self.port = caller.port
  79. self.proxy = caller.proxy
  80. self.host_url = caller.host_url
  81. self.timeout = caller.timeout
  82. self.http_hdrs = caller.http_hdrs
  83. self.name = type(self).__name__
  84. self.caller = caller
  85. class aiohttp(base,metaclass=AsyncInit):
  86. """
  87. Contrary to the requests library, aiohttp won’t read environment variables by
  88. default. But you can do so by passing trust_env=True into aiohttp.ClientSession
  89. constructor to honor HTTP_PROXY, HTTPS_PROXY, WS_PROXY or WSS_PROXY environment
  90. variables (all are case insensitive).
  91. """
  92. def __del__(self):
  93. self.connector.close()
  94. self.session.detach()
  95. del self.session
  96. async def __init__(self,caller):
  97. super().__init__(caller)
  98. import aiohttp
  99. self.connector = aiohttp.TCPConnector(limit_per_host=self.cfg.aiohttp_rpc_queue_len)
  100. self.session = aiohttp.ClientSession(
  101. headers = { 'Content-Type': 'application/json' },
  102. connector = self.connector,
  103. )
  104. if caller.auth_type == 'basic':
  105. self.auth = aiohttp.BasicAuth(*caller.auth,encoding='UTF-8')
  106. else:
  107. self.auth = None
  108. async def run(self,payload,timeout,host_path):
  109. dmsg_rpc_backend(self.host_url,host_path,payload)
  110. async with self.session.post(
  111. url = self.host_url + host_path,
  112. auth = self.auth,
  113. data = json.dumps(payload,cls=json_encoder),
  114. timeout = timeout or self.timeout,
  115. ) as res:
  116. return (await res.text(),res.status)
  117. class requests(base):
  118. def __del__(self):
  119. self.session.close()
  120. def __init__(self,caller):
  121. super().__init__(caller)
  122. import requests,urllib3
  123. urllib3.disable_warnings()
  124. self.session = requests.Session()
  125. self.session.trust_env = False # ignore *_PROXY environment vars
  126. self.session.headers = caller.http_hdrs
  127. if caller.auth_type:
  128. auth = 'HTTP' + caller.auth_type.capitalize() + 'Auth'
  129. self.session.auth = getattr(requests.auth,auth)(*caller.auth)
  130. if self.proxy: # used only by XMR for now: requires pysocks package
  131. self.session.proxies.update({
  132. 'http': f'socks5h://{self.proxy}',
  133. 'https': f'socks5h://{self.proxy}'
  134. })
  135. async def run(self,*args,**kwargs):
  136. return self.run_noasync(*args,**kwargs)
  137. def run_noasync(self,payload,timeout,host_path):
  138. dmsg_rpc_backend(self.host_url,host_path,payload)
  139. res = self.session.post(
  140. url = self.host_url + host_path,
  141. data = json.dumps(payload,cls=json_encoder),
  142. timeout = timeout or self.timeout,
  143. verify = False )
  144. return (res.content,res.status_code)
  145. class httplib(base):
  146. """
  147. Ignores *_PROXY environment vars
  148. """
  149. def __del__(self):
  150. self.session.close()
  151. def __init__(self,caller):
  152. super().__init__(caller)
  153. import http.client
  154. self.session = http.client.HTTPConnection(caller.host,caller.port,caller.timeout)
  155. if caller.auth_type == 'basic':
  156. auth_str = f'{caller.auth.user}:{caller.auth.passwd}'
  157. auth_str_b64 = 'Basic ' + base64.b64encode(auth_str.encode()).decode()
  158. self.http_hdrs.update({ 'Host': self.host, 'Authorization': auth_str_b64 })
  159. dmsg_rpc(f' RPC AUTHORIZATION data ==> raw: [{auth_str}]\n{"":>31}enc: [{auth_str_b64}]\n')
  160. async def run(self,payload,timeout,host_path):
  161. dmsg_rpc_backend(self.host_url,host_path,payload)
  162. if timeout:
  163. import http.client
  164. s = http.client.HTTPConnection(self.host,self.port,timeout)
  165. else:
  166. s = self.session
  167. try:
  168. s.request(
  169. method = 'POST',
  170. url = host_path,
  171. body = json.dumps(payload,cls=json_encoder),
  172. headers = self.http_hdrs )
  173. r = s.getresponse() # => http.client.HTTPResponse instance
  174. except Exception as e:
  175. die( 'RPCFailure', str(e) )
  176. if timeout:
  177. ret = ( r.read(), r.status )
  178. s.close()
  179. return ret
  180. else:
  181. return ( r.read(), r.status )
  182. class curl(base):
  183. def __init__(self,caller):
  184. def gen_opts():
  185. for k,v in caller.http_hdrs.items():
  186. for s in ('--header',f'{k}: {v}'):
  187. yield s
  188. if caller.auth_type:
  189. # Authentication with curl is insecure, as it exposes the user's credentials
  190. # via the command line. Use for testing only.
  191. for s in ('--user',f'{caller.auth.user}:{caller.auth.passwd}'):
  192. yield s
  193. if caller.auth_type == 'digest':
  194. yield '--digest'
  195. if caller.network_proto == 'https' and caller.verify_server is False:
  196. yield '--insecure'
  197. super().__init__(caller)
  198. self.exec_opts = list(gen_opts()) + ['--silent']
  199. self.arg_max = 8192 # set way below system ARG_MAX, just to be safe
  200. async def run(self,payload,timeout,host_path):
  201. data = json.dumps(payload,cls=json_encoder)
  202. if len(data) > self.arg_max:
  203. ymsg('Warning: Curl data payload length exceeded - falling back on httplib')
  204. return RPCBackends.httplib(self.caller).run(payload,timeout,host_path)
  205. dmsg_rpc_backend(self.host_url,host_path,payload)
  206. exec_cmd = [
  207. 'curl',
  208. '--proxy', f'socks5h://{self.proxy}' if self.proxy else '',
  209. '--connect-timeout', str(timeout or self.timeout),
  210. '--write-out', '%{http_code}',
  211. '--data-binary', data
  212. ] + self.exec_opts + [self.host_url + host_path]
  213. dmsg_rpc(' RPC curl exec data ==>\n{}\n',exec_cmd)
  214. from subprocess import run,PIPE
  215. from .color import set_vt100
  216. res = run(exec_cmd,stdout=PIPE,check=True,text=True).stdout
  217. set_vt100()
  218. return (res[:-3],int(res[-3:]))
  219. class RPCClient(MMGenObject):
  220. auth_type = None
  221. has_auth_cookie = False
  222. network_proto = 'http'
  223. proxy = None
  224. def __init__(self,cfg,host,port,test_connection=True):
  225. self.cfg = cfg
  226. self.name = type(self).__name__
  227. # aiohttp workaround, and may speed up RPC performance overall on some systems:
  228. if sys.platform == 'win32' and host == 'localhost':
  229. host = '127.0.0.1'
  230. global dmsg_rpc,dmsg_rpc_backend
  231. if not self.cfg.debug_rpc:
  232. dmsg_rpc = dmsg_rpc_backend = noop
  233. dmsg_rpc(f'=== {self.name}.__init__() debug ===')
  234. dmsg_rpc(f' cls [{self.name}] host [{host}] port [{port}]\n')
  235. if test_connection:
  236. import socket
  237. try:
  238. socket.create_connection((host,port),timeout=1).close()
  239. except:
  240. die( 'SocketError', f'Unable to connect to {host}:{port}' )
  241. self.http_hdrs = { 'Content-Type': 'application/json' }
  242. self.host_url = f'{self.network_proto}://{host}:{port}'
  243. self.host = host
  244. self.port = port
  245. self.timeout = self.cfg.http_timeout
  246. self.auth = None
  247. def _get_backend(self,backend):
  248. backend_id = backend or self.cfg.rpc_backend
  249. if backend_id == 'auto':
  250. return {
  251. 'linux': RPCBackends.httplib,
  252. 'darwin': RPCBackends.httplib,
  253. 'win32': RPCBackends.requests
  254. }[sys.platform](self)
  255. else:
  256. return getattr(RPCBackends,backend_id)(self)
  257. def set_backend(self,backend=None):
  258. self.backend = self._get_backend(backend)
  259. async def set_backend_async(self,backend=None):
  260. ret = self._get_backend(backend)
  261. self.backend = (await ret) if type(ret).__name__ == 'coroutine' else ret
  262. # Call family of methods - direct-to-daemon RPC call:
  263. # - positional params are passed to the daemon, 'timeout' and 'wallet' kwargs to the backend
  264. # - 'wallet' kwarg is used only by regtest
  265. async def call(self,method,*params,timeout=None,wallet=None):
  266. """
  267. default call: call with param list unrolled, exactly as with cli
  268. """
  269. return self.process_http_resp(await self.backend.run(
  270. payload = {'id': 1, 'jsonrpc': '2.0', 'method': method, 'params': params },
  271. timeout = timeout,
  272. host_path = self.make_host_path(wallet)
  273. ))
  274. async def batch_call(self,method,param_list,timeout=None,wallet=None):
  275. """
  276. Make a single call with a list of tuples as first argument
  277. For RPC calls that return a list of results
  278. """
  279. return self.process_http_resp(await self.backend.run(
  280. payload = [{
  281. 'id': n,
  282. 'jsonrpc': '2.0',
  283. 'method': method,
  284. 'params': params } for n,params in enumerate(param_list,1) ],
  285. timeout = timeout,
  286. host_path = self.make_host_path(wallet)
  287. ),batch=True)
  288. async def gathered_call(self,method,args_list,timeout=None,wallet=None):
  289. """
  290. Perform multiple RPC calls, returning results in a list
  291. Can be called two ways:
  292. 1) method = methodname, args_list = [args_tuple1, args_tuple2,...]
  293. 2) method = None, args_list = [(methodname1,args_tuple1), (methodname2,args_tuple2), ...]
  294. """
  295. cmd_list = args_list if method is None else tuple(zip([method] * len(args_list), args_list))
  296. cur_pos = 0
  297. chunk_size = 1024
  298. ret = []
  299. while cur_pos < len(cmd_list):
  300. tasks = [self.backend.run(
  301. payload = {'id': n, 'jsonrpc': '2.0', 'method': method, 'params': params },
  302. timeout = timeout,
  303. host_path = self.make_host_path(wallet)
  304. ) for n,(method,params) in enumerate(cmd_list[cur_pos:chunk_size+cur_pos],1)]
  305. ret.extend(await asyncio.gather(*tasks))
  306. cur_pos += chunk_size
  307. return [self.process_http_resp(r) for r in ret]
  308. # Icall family of methods - indirect RPC call using CallSigs mechanism:
  309. # - 'timeout' and 'wallet' kwargs are passed to corresponding Call method
  310. # - remaining kwargs are passed to CallSigs method
  311. # - CallSigs method returns method and positional params for Call method
  312. def icall(self,method,**kwargs):
  313. timeout = kwargs.pop('timeout',None)
  314. wallet = kwargs.pop('wallet',None)
  315. return self.call(
  316. *getattr(self.call_sigs,method)(**kwargs),
  317. timeout = timeout,
  318. wallet = wallet )
  319. def gathered_icall(self,method,args_list,timeout=None,wallet=None):
  320. return self.gathered_call(
  321. method,
  322. [getattr(self.call_sigs,method)(*a)[1:] for a in args_list],
  323. timeout = timeout,
  324. wallet = wallet )
  325. def process_http_resp(self,run_ret,batch=False,json_rpc=True):
  326. text,status = run_ret
  327. if status == 200:
  328. dmsg_rpc(' RPC RESPONSE data ==>\n{}\n',text,is_json=True)
  329. m = None
  330. if batch:
  331. return [r['result'] for r in json.loads(text,parse_float=Decimal)]
  332. else:
  333. try:
  334. if json_rpc:
  335. ret = json.loads(text,parse_float=Decimal)['result']
  336. if isinstance(ret,list) and ret and type(ret[0]) == dict and 'success' in ret[0]:
  337. for res in ret:
  338. if not res['success']:
  339. m = str(res['error'])
  340. assert False
  341. return ret
  342. else:
  343. return json.loads(text,parse_float=Decimal)
  344. except:
  345. if not m:
  346. t = json.loads(text)
  347. try:
  348. m = t['error']['message']
  349. except:
  350. try:
  351. m = t['error']
  352. except:
  353. m = t
  354. die('RPCFailure', m)
  355. else:
  356. import http
  357. m,s = ( '', http.HTTPStatus(status) )
  358. if text:
  359. try:
  360. m = json.loads(text)['error']['message']
  361. except:
  362. try:
  363. m = text.decode()
  364. except:
  365. m = text
  366. die( 'RPCFailure', f'{s.value} {s.name}: {m}' )
  367. async def stop_daemon(self,quiet=False,silent=False):
  368. if self.daemon.state == 'ready':
  369. if not (quiet or silent):
  370. msg(f'Stopping {self.daemon.desc} on port {self.daemon.bind_port}')
  371. ret = await self.do_stop_daemon(silent=silent)
  372. if self.daemon.wait:
  373. self.daemon.wait_for_state('stopped')
  374. return ret
  375. else:
  376. if not (quiet or silent):
  377. msg(f'{self.daemon.desc} on port {self.daemon.bind_port} not running')
  378. return True
  379. def start_daemon(self,silent=False):
  380. return self.daemon.start(silent=silent)
  381. async def restart_daemon(self,quiet=False,silent=False):
  382. await self.stop_daemon(quiet=quiet,silent=silent)
  383. return self.daemon.start(silent=silent)
  384. def handle_unsupported_daemon_version(self,name,warn_only):
  385. class daemon_version_warning(oneshot_warning):
  386. color = 'yellow'
  387. message = 'ignoring unsupported {} daemon version at user request'
  388. if warn_only:
  389. daemon_version_warning(div=name,fmt_args=[self.daemon.coind_name])
  390. else:
  391. name = self.daemon.coind_name
  392. die(2,'\n'+fmt(f"""
  393. The running {name} daemon has version {self.daemon_version_str}.
  394. This version of MMGen is tested only on {name} v{self.daemon.coind_version_str} and below.
  395. To avoid this error, downgrade your daemon to a supported version.
  396. Alternatively, you may invoke the command with the --ignore-daemon-version
  397. option, in which case you proceed at your own risk.
  398. """,indent=' '))
  399. async def rpc_init(
  400. cfg,
  401. proto = None,
  402. backend = None,
  403. daemon = None,
  404. ignore_daemon_version = False,
  405. ignore_wallet = False ):
  406. proto = proto or cfg._proto
  407. if not 'rpc_init' in proto.mmcaps:
  408. die(1,f'rpc_init() not supported for {proto.name} protocol!')
  409. cls = getattr(
  410. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.rpc'),
  411. proto.base_proto + 'RPCClient' )
  412. from .daemon import CoinDaemon
  413. rpc = await cls(
  414. cfg = cfg,
  415. proto = proto,
  416. daemon = daemon or CoinDaemon(cfg,proto=proto,test_suite=cfg.test_suite),
  417. backend = backend or cfg.rpc_backend,
  418. ignore_wallet = ignore_wallet )
  419. if rpc.daemon_version > rpc.daemon.coind_version:
  420. rpc.handle_unsupported_daemon_version(
  421. proto.name,
  422. ignore_daemon_version or proto.ignore_daemon_version or cfg.ignore_daemon_version )
  423. if rpc.chain not in proto.chain_names:
  424. die( 'RPCChainMismatch', '\n' + fmt(f"""
  425. Protocol: {proto.cls_name}
  426. Valid chain names: {fmt_list(proto.chain_names,fmt='bare')}
  427. RPC client chain: {rpc.chain}
  428. """,indent=' ').rstrip() )
  429. rpc.blockcount = NonNegativeInt(rpc.blockcount)
  430. return rpc