rpc.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. rpc: Cryptocoin RPC library for the MMGen suite
  20. """
  21. import sys,re,base64,json,asyncio,importlib
  22. from decimal import Decimal
  23. from collections import namedtuple
  24. from .util import msg,ymsg,die,fmt,fmt_list,pp_fmt,oneshot_warning
  25. from .base_obj import AsyncInit
  26. from .obj import NonNegativeInt
  27. from .objmethods import HiliteStr,InitErrors,MMGenObject
  28. auth_data = namedtuple('rpc_auth_data',['user','passwd'])
  29. def dmsg_rpc(fs,data=None,is_json=False):
  30. msg(
  31. fs if data is None else
  32. fs.format(pp_fmt(json.loads(data) if is_json else data))
  33. )
  34. def dmsg_rpc_backend(host_url,host_path,payload):
  35. msg(
  36. f'\n RPC URL: {host_url}{host_path}' +
  37. '\n RPC PAYLOAD data (httplib) ==>' +
  38. f'\n{pp_fmt(payload)}\n' )
  39. def noop(*args,**kwargs):
  40. pass
  41. class IPPort(HiliteStr,InitErrors):
  42. color = 'yellow'
  43. width = 0
  44. trunc_ok = False
  45. min_len = 9 # 0.0.0.0:0
  46. max_len = 21 # 255.255.255.255:65535
  47. def __new__(cls,s):
  48. if isinstance(s,cls):
  49. return s
  50. try:
  51. m = re.fullmatch(r'{q}\.{q}\.{q}\.{q}:(\d{{1,10}})'.format(q=r'([0-9]{1,3})'),s)
  52. assert m is not None, f'{s!r}: invalid IP:HOST specifier'
  53. for e in m.groups():
  54. if len(e) != 1 and e[0] == '0':
  55. raise ValueError(f'{e}: leading zeroes not permitted in dotted decimal element or port number')
  56. res = [int(e) for e in m.groups()]
  57. for e in res[:4]:
  58. assert e <= 255, f'{e}: dotted decimal element > 255'
  59. assert res[4] <= 65535, f'{res[4]}: port number > 65535'
  60. me = str.__new__(cls,s)
  61. me.ip = '{}.{}.{}.{}'.format(*res)
  62. me.ip_num = sum( res[i] * ( 2 ** (-(i-3)*8) ) for i in range(4) )
  63. me.port = res[4]
  64. return me
  65. except Exception as e:
  66. return cls.init_fail(e,s)
  67. class json_encoder(json.JSONEncoder):
  68. def default(self,obj):
  69. if isinstance(obj,Decimal):
  70. return str(obj)
  71. else:
  72. return json.JSONEncoder.default(self,obj)
  73. class RPCBackends:
  74. class base:
  75. def __init__(self,caller):
  76. self.cfg = caller.cfg
  77. self.host = caller.host
  78. self.port = caller.port
  79. self.proxy = caller.proxy
  80. self.host_url = caller.host_url
  81. self.timeout = caller.timeout
  82. self.http_hdrs = caller.http_hdrs
  83. self.name = type(self).__name__
  84. self.caller = caller
  85. class aiohttp(base,metaclass=AsyncInit):
  86. """
  87. Contrary to the requests library, aiohttp won’t read environment variables by
  88. default. But you can do so by passing trust_env=True into aiohttp.ClientSession
  89. constructor to honor HTTP_PROXY, HTTPS_PROXY, WS_PROXY or WSS_PROXY environment
  90. variables (all are case insensitive).
  91. """
  92. def __del__(self):
  93. self.connector.close()
  94. self.session.detach()
  95. del self.session
  96. async def __init__(self,caller):
  97. super().__init__(caller)
  98. import aiohttp
  99. self.connector = aiohttp.TCPConnector(limit_per_host=self.cfg.aiohttp_rpc_queue_len)
  100. self.session = aiohttp.ClientSession(
  101. headers = { 'Content-Type': 'application/json' },
  102. connector = self.connector,
  103. )
  104. if caller.auth_type == 'basic':
  105. self.auth = aiohttp.BasicAuth(*caller.auth,encoding='UTF-8')
  106. else:
  107. self.auth = None
  108. async def run(self,payload,timeout,host_path):
  109. dmsg_rpc_backend(self.host_url,host_path,payload)
  110. async with self.session.post(
  111. url = self.host_url + host_path,
  112. auth = self.auth,
  113. data = json.dumps(payload,cls=json_encoder),
  114. timeout = timeout or self.timeout,
  115. ) as res:
  116. return (await res.text(),res.status)
  117. class requests(base):
  118. def __del__(self):
  119. self.session.close()
  120. def __init__(self,caller):
  121. super().__init__(caller)
  122. import requests,urllib3
  123. urllib3.disable_warnings()
  124. self.session = requests.Session()
  125. self.session.trust_env = False # ignore *_PROXY environment vars
  126. self.session.headers = caller.http_hdrs
  127. if caller.auth_type:
  128. auth = 'HTTP' + caller.auth_type.capitalize() + 'Auth'
  129. self.session.auth = getattr(requests.auth,auth)(*caller.auth)
  130. if self.proxy: # used only by XMR for now: requires pysocks package
  131. self.session.proxies.update({
  132. 'http': f'socks5h://{self.proxy}',
  133. 'https': f'socks5h://{self.proxy}'
  134. })
  135. async def run(self,*args,**kwargs):
  136. return self.run_noasync(*args,**kwargs)
  137. def run_noasync(self,payload,timeout,host_path):
  138. dmsg_rpc_backend(self.host_url,host_path,payload)
  139. res = self.session.post(
  140. url = self.host_url + host_path,
  141. data = json.dumps(payload,cls=json_encoder),
  142. timeout = timeout or self.timeout,
  143. verify = False )
  144. return (res.content,res.status_code)
  145. class httplib(base):
  146. """
  147. Ignores *_PROXY environment vars
  148. """
  149. def __del__(self):
  150. self.session.close()
  151. def __init__(self,caller):
  152. super().__init__(caller)
  153. import http.client
  154. self.session = http.client.HTTPConnection(caller.host,caller.port,caller.timeout)
  155. if caller.auth_type == 'basic':
  156. auth_str = f'{caller.auth.user}:{caller.auth.passwd}'
  157. auth_str_b64 = 'Basic ' + base64.b64encode(auth_str.encode()).decode()
  158. self.http_hdrs.update({ 'Host': self.host, 'Authorization': auth_str_b64 })
  159. dmsg_rpc(f' RPC AUTHORIZATION data ==> raw: [{auth_str}]\n{"":>31}enc: [{auth_str_b64}]\n')
  160. async def run(self,payload,timeout,host_path):
  161. dmsg_rpc_backend(self.host_url,host_path,payload)
  162. if timeout:
  163. import http.client
  164. s = http.client.HTTPConnection(self.host,self.port,timeout)
  165. else:
  166. s = self.session
  167. try:
  168. s.request(
  169. method = 'POST',
  170. url = host_path,
  171. body = json.dumps(payload,cls=json_encoder),
  172. headers = self.http_hdrs )
  173. r = s.getresponse() # => http.client.HTTPResponse instance
  174. except Exception as e:
  175. die( 'RPCFailure', str(e) )
  176. if timeout:
  177. ret = ( r.read(), r.status )
  178. s.close()
  179. return ret
  180. else:
  181. return ( r.read(), r.status )
  182. class curl(base):
  183. def __init__(self,caller):
  184. def gen_opts():
  185. for k,v in caller.http_hdrs.items():
  186. for s in ('--header',f'{k}: {v}'):
  187. yield s
  188. if caller.auth_type:
  189. # Authentication with curl is insecure, as it exposes the user's credentials
  190. # via the command line. Use for testing only.
  191. for s in ('--user',f'{caller.auth.user}:{caller.auth.passwd}'):
  192. yield s
  193. if caller.auth_type == 'digest':
  194. yield '--digest'
  195. if caller.network_proto == 'https' and caller.verify_server is False:
  196. yield '--insecure'
  197. super().__init__(caller)
  198. self.exec_opts = list(gen_opts()) + ['--silent']
  199. self.arg_max = 8192 # set way below system ARG_MAX, just to be safe
  200. async def run(self,payload,timeout,host_path):
  201. data = json.dumps(payload,cls=json_encoder)
  202. if len(data) > self.arg_max:
  203. ymsg('Warning: Curl data payload length exceeded - falling back on httplib')
  204. return RPCBackends.httplib(self.caller).run(payload,timeout,host_path)
  205. dmsg_rpc_backend(self.host_url,host_path,payload)
  206. exec_cmd = [
  207. 'curl',
  208. '--proxy', f'socks5h://{self.proxy}' if self.proxy else '',
  209. '--connect-timeout', str(timeout or self.timeout),
  210. '--write-out', '%{http_code}',
  211. '--data-binary', data
  212. ] + self.exec_opts + [self.host_url + host_path]
  213. dmsg_rpc(' RPC curl exec data ==>\n{}\n',exec_cmd)
  214. from subprocess import run,PIPE
  215. from .color import set_vt100
  216. res = run(exec_cmd,stdout=PIPE,check=True,text=True).stdout
  217. set_vt100()
  218. return (res[:-3],int(res[-3:]))
  219. class RPCClient(MMGenObject):
  220. auth_type = None
  221. has_auth_cookie = False
  222. network_proto = 'http'
  223. proxy = None
  224. def __init__(self,cfg,host,port,test_connection=True):
  225. self.cfg = cfg
  226. self.name = type(self).__name__
  227. # aiohttp workaround, and may speed up RPC performance overall on some systems:
  228. if sys.platform == 'win32' and host == 'localhost':
  229. host = '127.0.0.1'
  230. global dmsg_rpc,dmsg_rpc_backend
  231. if not self.cfg.debug_rpc:
  232. dmsg_rpc = dmsg_rpc_backend = noop
  233. dmsg_rpc(f'=== {self.name}.__init__() debug ===')
  234. dmsg_rpc(f' cls [{self.name}] host [{host}] port [{port}]\n')
  235. if test_connection:
  236. import socket
  237. try:
  238. socket.create_connection((host,port),timeout=1).close()
  239. except:
  240. die( 'SocketError', f'Unable to connect to {host}:{port}' )
  241. self.http_hdrs = { 'Content-Type': 'application/json' }
  242. self.host_url = f'{self.network_proto}://{host}:{port}'
  243. self.host = host
  244. self.port = port
  245. self.timeout = self.cfg.http_timeout
  246. self.auth = None
  247. def _get_backend(self,backend):
  248. backend_id = backend or self.cfg.rpc_backend
  249. if backend_id == 'auto':
  250. return {'linux':RPCBackends.httplib,'win32':RPCBackends.requests}[sys.platform](self)
  251. else:
  252. return getattr(RPCBackends,backend_id)(self)
  253. def set_backend(self,backend=None):
  254. self.backend = self._get_backend(backend)
  255. async def set_backend_async(self,backend=None):
  256. ret = self._get_backend(backend)
  257. self.backend = (await ret) if type(ret).__name__ == 'coroutine' else ret
  258. # Call family of methods - direct-to-daemon RPC call:
  259. # - positional params are passed to the daemon, 'timeout' and 'wallet' kwargs to the backend
  260. # - 'wallet' kwarg is used only by regtest
  261. async def call(self,method,*params,timeout=None,wallet=None):
  262. """
  263. default call: call with param list unrolled, exactly as with cli
  264. """
  265. return self.process_http_resp(await self.backend.run(
  266. payload = {'id': 1, 'jsonrpc': '2.0', 'method': method, 'params': params },
  267. timeout = timeout,
  268. host_path = self.make_host_path(wallet)
  269. ))
  270. async def batch_call(self,method,param_list,timeout=None,wallet=None):
  271. """
  272. Make a single call with a list of tuples as first argument
  273. For RPC calls that return a list of results
  274. """
  275. return self.process_http_resp(await self.backend.run(
  276. payload = [{
  277. 'id': n,
  278. 'jsonrpc': '2.0',
  279. 'method': method,
  280. 'params': params } for n,params in enumerate(param_list,1) ],
  281. timeout = timeout,
  282. host_path = self.make_host_path(wallet)
  283. ),batch=True)
  284. async def gathered_call(self,method,args_list,timeout=None,wallet=None):
  285. """
  286. Perform multiple RPC calls, returning results in a list
  287. Can be called two ways:
  288. 1) method = methodname, args_list = [args_tuple1, args_tuple2,...]
  289. 2) method = None, args_list = [(methodname1,args_tuple1), (methodname2,args_tuple2), ...]
  290. """
  291. cmd_list = args_list if method is None else tuple(zip([method] * len(args_list), args_list))
  292. cur_pos = 0
  293. chunk_size = 1024
  294. ret = []
  295. while cur_pos < len(cmd_list):
  296. tasks = [self.backend.run(
  297. payload = {'id': n, 'jsonrpc': '2.0', 'method': method, 'params': params },
  298. timeout = timeout,
  299. host_path = self.make_host_path(wallet)
  300. ) for n,(method,params) in enumerate(cmd_list[cur_pos:chunk_size+cur_pos],1)]
  301. ret.extend(await asyncio.gather(*tasks))
  302. cur_pos += chunk_size
  303. return [self.process_http_resp(r) for r in ret]
  304. # Icall family of methods - indirect RPC call using CallSigs mechanism:
  305. # - 'timeout' and 'wallet' kwargs are passed to corresponding Call method
  306. # - remaining kwargs are passed to CallSigs method
  307. # - CallSigs method returns method and positional params for Call method
  308. def icall(self,method,**kwargs):
  309. timeout = kwargs.pop('timeout',None)
  310. wallet = kwargs.pop('wallet',None)
  311. return self.call(
  312. *getattr(self.call_sigs,method)(**kwargs),
  313. timeout = timeout,
  314. wallet = wallet )
  315. def gathered_icall(self,method,args_list,timeout=None,wallet=None):
  316. return self.gathered_call(
  317. method,
  318. [getattr(self.call_sigs,method)(*a)[1:] for a in args_list],
  319. timeout = timeout,
  320. wallet = wallet )
  321. def process_http_resp(self,run_ret,batch=False,json_rpc=True):
  322. text,status = run_ret
  323. if status == 200:
  324. dmsg_rpc(' RPC RESPONSE data ==>\n{}\n',text,is_json=True)
  325. m = None
  326. if batch:
  327. return [r['result'] for r in json.loads(text,parse_float=Decimal)]
  328. else:
  329. try:
  330. if json_rpc:
  331. ret = json.loads(text,parse_float=Decimal)['result']
  332. if isinstance(ret,list) and ret and type(ret[0]) == dict and 'success' in ret[0]:
  333. for res in ret:
  334. if not res['success']:
  335. m = str(res['error'])
  336. assert False
  337. return ret
  338. else:
  339. return json.loads(text,parse_float=Decimal)
  340. except:
  341. if not m:
  342. t = json.loads(text)
  343. try:
  344. m = t['error']['message']
  345. except:
  346. try:
  347. m = t['error']
  348. except:
  349. m = t
  350. die('RPCFailure', m)
  351. else:
  352. import http
  353. m,s = ( '', http.HTTPStatus(status) )
  354. if text:
  355. try:
  356. m = json.loads(text)['error']['message']
  357. except:
  358. try:
  359. m = text.decode()
  360. except:
  361. m = text
  362. die( 'RPCFailure', f'{s.value} {s.name}: {m}' )
  363. async def stop_daemon(self,quiet=False,silent=False):
  364. if self.daemon.state == 'ready':
  365. if not (quiet or silent):
  366. msg(f'Stopping {self.daemon.desc} on port {self.daemon.bind_port}')
  367. ret = await self.do_stop_daemon(silent=silent)
  368. if self.daemon.wait:
  369. self.daemon.wait_for_state('stopped')
  370. return ret
  371. else:
  372. if not (quiet or silent):
  373. msg(f'{self.daemon.desc} on port {self.daemon.bind_port} not running')
  374. return True
  375. def start_daemon(self,silent=False):
  376. return self.daemon.start(silent=silent)
  377. async def restart_daemon(self,quiet=False,silent=False):
  378. await self.stop_daemon(quiet=quiet,silent=silent)
  379. return self.daemon.start(silent=silent)
  380. def handle_unsupported_daemon_version(self,name,warn_only):
  381. class daemon_version_warning(oneshot_warning):
  382. color = 'yellow'
  383. message = 'ignoring unsupported {} daemon version at user request'
  384. if warn_only:
  385. daemon_version_warning(div=name,fmt_args=[self.daemon.coind_name])
  386. else:
  387. name = self.daemon.coind_name
  388. die(2,'\n'+fmt(f"""
  389. The running {name} daemon has version {self.daemon_version_str}.
  390. This version of MMGen is tested only on {name} v{self.daemon.coind_version_str} and below.
  391. To avoid this error, downgrade your daemon to a supported version.
  392. Alternatively, you may invoke the command with the --ignore-daemon-version
  393. option, in which case you proceed at your own risk.
  394. """,indent=' '))
  395. async def rpc_init(
  396. cfg,
  397. proto = None,
  398. backend = None,
  399. daemon = None,
  400. ignore_daemon_version = False,
  401. ignore_wallet = False ):
  402. proto = proto or cfg._proto
  403. if not 'rpc_init' in proto.mmcaps:
  404. die(1,f'rpc_init() not supported for {proto.name} protocol!')
  405. cls = getattr(
  406. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.rpc'),
  407. proto.base_proto + 'RPCClient' )
  408. from .daemon import CoinDaemon
  409. rpc = await cls(
  410. cfg = cfg,
  411. proto = proto,
  412. daemon = daemon or CoinDaemon(cfg,proto=proto,test_suite=cfg.test_suite),
  413. backend = backend or cfg.rpc_backend,
  414. ignore_wallet = ignore_wallet )
  415. if rpc.daemon_version > rpc.daemon.coind_version:
  416. rpc.handle_unsupported_daemon_version(
  417. proto.name,
  418. ignore_daemon_version or proto.ignore_daemon_version or cfg.ignore_daemon_version )
  419. if rpc.chain not in proto.chain_names:
  420. die( 'RPCChainMismatch', '\n' + fmt(f"""
  421. Protocol: {proto.cls_name}
  422. Valid chain names: {fmt_list(proto.chain_names,fmt='bare')}
  423. RPC client chain: {rpc.chain}
  424. """,indent=' ').rstrip() )
  425. rpc.blockcount = NonNegativeInt(rpc.blockcount)
  426. return rpc