rpc.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. rpc: Cryptocoin RPC library for the MMGen suite
  20. """
  21. import re,base64,json,asyncio,importlib
  22. from decimal import Decimal
  23. from collections import namedtuple
  24. from .cfg import gc
  25. from .util import msg,die,fmt,fmt_list,pp_fmt,oneshot_warning
  26. from .base_obj import AsyncInit
  27. from .obj import NonNegativeInt
  28. from .objmethods import Hilite,InitErrors,MMGenObject
  29. auth_data = namedtuple('rpc_auth_data',['user','passwd'])
  30. def dmsg_rpc(fs,data=None,is_json=False):
  31. msg(
  32. fs if data == None else
  33. fs.format(pp_fmt(json.loads(data) if is_json else data))
  34. )
  35. def dmsg_rpc_backend(host_url,host_path,payload):
  36. msg(
  37. f'\n RPC URL: {host_url}{host_path}' +
  38. f'\n RPC PAYLOAD data (httplib) ==>' +
  39. f'\n{pp_fmt(payload)}\n' )
  40. def noop(*args,**kwargs):
  41. pass
  42. class IPPort(str,Hilite,InitErrors):
  43. color = 'yellow'
  44. width = 0
  45. trunc_ok = False
  46. min_len = 9 # 0.0.0.0:0
  47. max_len = 21 # 255.255.255.255:65535
  48. def __new__(cls,s):
  49. if type(s) == cls:
  50. return s
  51. try:
  52. m = re.fullmatch(r'{q}\.{q}\.{q}\.{q}:(\d{{1,10}})'.format(q=r'([0-9]{1,3})'),s)
  53. assert m is not None, f'{s!r}: invalid IP:HOST specifier'
  54. for e in m.groups():
  55. if len(e) != 1 and e[0] == '0':
  56. raise ValueError(f'{e}: leading zeroes not permitted in dotted decimal element or port number')
  57. res = [int(e) for e in m.groups()]
  58. for e in res[:4]:
  59. assert e <= 255, f'{e}: dotted decimal element > 255'
  60. assert res[4] <= 65535, f'{res[4]}: port number > 65535'
  61. me = str.__new__(cls,s)
  62. me.ip = '{}.{}.{}.{}'.format(*res)
  63. me.ip_num = sum( res[i] * ( 2 ** (-(i-3)*8) ) for i in range(4) )
  64. me.port = res[4]
  65. return me
  66. except Exception as e:
  67. return cls.init_fail(e,s)
  68. class json_encoder(json.JSONEncoder):
  69. def default(self,obj):
  70. if isinstance(obj,Decimal):
  71. return str(obj)
  72. else:
  73. return json.JSONEncoder.default(self,obj)
  74. class RPCBackends:
  75. class base:
  76. def __init__(self,caller):
  77. self.cfg = caller.cfg
  78. self.host = caller.host
  79. self.port = caller.port
  80. self.proxy = caller.proxy
  81. self.host_url = caller.host_url
  82. self.timeout = caller.timeout
  83. self.http_hdrs = caller.http_hdrs
  84. self.name = type(self).__name__
  85. class aiohttp(base,metaclass=AsyncInit):
  86. """
  87. Contrary to the requests library, aiohttp won’t read environment variables by
  88. default. But you can do so by passing trust_env=True into aiohttp.ClientSession
  89. constructor to honor HTTP_PROXY, HTTPS_PROXY, WS_PROXY or WSS_PROXY environment
  90. variables (all are case insensitive).
  91. """
  92. def __del__(self):
  93. self.connector.close()
  94. self.session.detach()
  95. del self.session
  96. async def __init__(self,caller):
  97. super().__init__(caller)
  98. import aiohttp
  99. self.connector = aiohttp.TCPConnector(limit_per_host=self.cfg.aiohttp_rpc_queue_len)
  100. self.session = aiohttp.ClientSession(
  101. headers = { 'Content-Type': 'application/json' },
  102. connector = self.connector,
  103. )
  104. if caller.auth_type == 'basic':
  105. self.auth = aiohttp.BasicAuth(*caller.auth,encoding='UTF-8')
  106. else:
  107. self.auth = None
  108. async def run(self,payload,timeout,host_path):
  109. dmsg_rpc_backend(self.host_url,host_path,payload)
  110. async with self.session.post(
  111. url = self.host_url + host_path,
  112. auth = self.auth,
  113. data = json.dumps(payload,cls=json_encoder),
  114. timeout = timeout or self.timeout,
  115. ) as res:
  116. return (await res.text(),res.status)
  117. class requests(base):
  118. def __del__(self):
  119. self.session.close()
  120. def __init__(self,caller):
  121. super().__init__(caller)
  122. import requests,urllib3
  123. urllib3.disable_warnings()
  124. self.session = requests.Session()
  125. self.session.trust_env = False # ignore *_PROXY environment vars
  126. self.session.headers = caller.http_hdrs
  127. if caller.auth_type:
  128. auth = 'HTTP' + caller.auth_type.capitalize() + 'Auth'
  129. self.session.auth = getattr(requests.auth,auth)(*caller.auth)
  130. if self.proxy:
  131. self.session.proxies.update({
  132. 'http': f'socks5h://{self.proxy}',
  133. 'https': f'socks5h://{self.proxy}'
  134. })
  135. async def run(self,*args,**kwargs):
  136. return self.run_noasync(*args,**kwargs)
  137. def run_noasync(self,payload,timeout,host_path):
  138. dmsg_rpc_backend(self.host_url,host_path,payload)
  139. res = self.session.post(
  140. url = self.host_url + host_path,
  141. data = json.dumps(payload,cls=json_encoder),
  142. timeout = timeout or self.timeout,
  143. verify = False )
  144. return (res.content,res.status_code)
  145. class httplib(base):
  146. """
  147. Ignores *_PROXY environment vars
  148. """
  149. def __del__(self):
  150. self.session.close()
  151. def __init__(self,caller):
  152. super().__init__(caller)
  153. import http.client
  154. self.session = http.client.HTTPConnection(caller.host,caller.port,caller.timeout)
  155. if caller.auth_type == 'basic':
  156. auth_str = f'{caller.auth.user}:{caller.auth.passwd}'
  157. auth_str_b64 = 'Basic ' + base64.b64encode(auth_str.encode()).decode()
  158. self.http_hdrs.update({ 'Host': self.host, 'Authorization': auth_str_b64 })
  159. dmsg_rpc(' RPC AUTHORIZATION data ==> raw: [{}]\n{:>31}enc: [{}]\n'.format(
  160. auth_str,
  161. '',
  162. auth_str_b64 ))
  163. async def run(self,payload,timeout,host_path):
  164. dmsg_rpc_backend(self.host_url,host_path,payload)
  165. if timeout:
  166. import http.client
  167. s = http.client.HTTPConnection(self.host,self.port,timeout)
  168. else:
  169. s = self.session
  170. try:
  171. s.request(
  172. method = 'POST',
  173. url = host_path,
  174. body = json.dumps(payload,cls=json_encoder),
  175. headers = self.http_hdrs )
  176. r = s.getresponse() # => http.client.HTTPResponse instance
  177. except Exception as e:
  178. die( 'RPCFailure', str(e) )
  179. if timeout:
  180. ret = ( r.read(), r.status )
  181. s.close()
  182. return ret
  183. else:
  184. return ( r.read(), r.status )
  185. class curl(base):
  186. def __init__(self,caller):
  187. def gen_opts():
  188. for k,v in caller.http_hdrs.items():
  189. for s in ('--header',f'{k}: {v}'):
  190. yield s
  191. if caller.auth_type:
  192. """
  193. Authentication with curl is insecure, as it exposes the user's credentials
  194. via the command line. Use for testing only.
  195. """
  196. for s in ('--user',f'{caller.auth.user}:{caller.auth.passwd}'):
  197. yield s
  198. if caller.auth_type == 'digest':
  199. yield '--digest'
  200. if caller.network_proto == 'https' and caller.verify_server == False:
  201. yield '--insecure'
  202. super().__init__(caller)
  203. self.exec_opts = list(gen_opts()) + ['--silent']
  204. self.arg_max = 8192 # set way below system ARG_MAX, just to be safe
  205. async def run(self,payload,timeout,host_path):
  206. data = json.dumps(payload,cls=json_encoder)
  207. if len(data) > self.arg_max:
  208. return self.httplib(payload,timeout=timeout)
  209. dmsg_rpc_backend(self.host_url,host_path,payload)
  210. exec_cmd = [
  211. 'curl',
  212. '--proxy', f'socks5h://{self.proxy}' if self.proxy else '',
  213. '--connect-timeout', str(timeout or self.timeout),
  214. '--write-out', '%{http_code}',
  215. '--data-binary', data
  216. ] + self.exec_opts + [self.host_url + host_path]
  217. dmsg_rpc(' RPC curl exec data ==>\n{}\n',exec_cmd)
  218. from subprocess import run,PIPE
  219. from .color import set_vt100
  220. res = run(exec_cmd,stdout=PIPE,check=True).stdout.decode()
  221. set_vt100()
  222. # res = run(exec_cmd,stdout=PIPE,check=True,text='UTF-8').stdout # Python 3.7+
  223. return (res[:-3],int(res[-3:]))
  224. class RPCClient(MMGenObject):
  225. auth_type = None
  226. has_auth_cookie = False
  227. network_proto = 'http'
  228. proxy = None
  229. def __init__(self,cfg,host,port,test_connection=True):
  230. self.cfg = cfg
  231. # aiohttp workaround, and may speed up RPC performance overall on some systems:
  232. if gc.platform == 'win' and host == 'localhost':
  233. host = '127.0.0.1'
  234. global dmsg_rpc,dmsg_rpc_backend
  235. if not self.cfg.debug_rpc:
  236. dmsg_rpc = dmsg_rpc_backend = noop
  237. dmsg_rpc(f'=== {type(self).__name__}.__init__() debug ===')
  238. dmsg_rpc(f' cls [{type(self).__name__}] host [{host}] port [{port}]\n')
  239. if test_connection:
  240. import socket
  241. try:
  242. socket.create_connection((host,port),timeout=1).close()
  243. except:
  244. die( 'SocketError', f'Unable to connect to {host}:{port}' )
  245. self.http_hdrs = { 'Content-Type': 'application/json' }
  246. self.host_url = f'{self.network_proto}://{host}:{port}'
  247. self.host = host
  248. self.port = port
  249. self.timeout = self.cfg.http_timeout
  250. self.auth = None
  251. def _get_backend(self,backend):
  252. backend_id = backend or self.cfg.rpc_backend
  253. if backend_id == 'auto':
  254. return {'linux':RPCBackends.httplib,'win':RPCBackends.requests}[gc.platform](self)
  255. else:
  256. return getattr(RPCBackends,backend_id)(self)
  257. def set_backend(self,backend=None):
  258. self.backend = self._get_backend(backend)
  259. async def set_backend_async(self,backend=None):
  260. ret = self._get_backend(backend)
  261. self.backend = (await ret) if type(ret).__name__ == 'coroutine' else ret
  262. # Call family of methods - direct-to-daemon RPC call:
  263. # positional params are passed to the daemon, 'timeout' and 'wallet' kwargs to the backend
  264. async def call(self,method,*params,timeout=None,wallet=None):
  265. """
  266. default call: call with param list unrolled, exactly as with cli
  267. """
  268. return self.process_http_resp(await self.backend.run(
  269. payload = {'id': 1, 'jsonrpc': '2.0', 'method': method, 'params': params },
  270. timeout = timeout,
  271. host_path = self.make_host_path(wallet)
  272. ))
  273. async def batch_call(self,method,param_list,timeout=None,wallet=None):
  274. """
  275. Make a single call with a list of tuples as first argument
  276. For RPC calls that return a list of results
  277. """
  278. return self.process_http_resp(await self.backend.run(
  279. payload = [{
  280. 'id': n,
  281. 'jsonrpc': '2.0',
  282. 'method': method,
  283. 'params': params } for n,params in enumerate(param_list,1) ],
  284. timeout = timeout,
  285. host_path = self.make_host_path(wallet)
  286. ),batch=True)
  287. async def gathered_call(self,method,args_list,timeout=None,wallet=None):
  288. """
  289. Perform multiple RPC calls, returning results in a list
  290. Can be called two ways:
  291. 1) method = methodname, args_list = [args_tuple1, args_tuple2,...]
  292. 2) method = None, args_list = [(methodname1,args_tuple1), (methodname2,args_tuple2), ...]
  293. """
  294. cmd_list = args_list if method == None else tuple(zip([method] * len(args_list), args_list))
  295. cur_pos = 0
  296. chunk_size = 1024
  297. ret = []
  298. while cur_pos < len(cmd_list):
  299. tasks = [self.backend.run(
  300. payload = {'id': n, 'jsonrpc': '2.0', 'method': method, 'params': params },
  301. timeout = timeout,
  302. host_path = self.make_host_path(wallet)
  303. ) for n,(method,params) in enumerate(cmd_list[cur_pos:chunk_size+cur_pos],1)]
  304. ret.extend(await asyncio.gather(*tasks))
  305. cur_pos += chunk_size
  306. return [self.process_http_resp(r) for r in ret]
  307. # Icall family of methods - indirect RPC call using CallSigs mechanism:
  308. # - 'timeout' and 'wallet' kwargs are passed to corresponding Call method
  309. # - remaining kwargs are passed to CallSigs method
  310. # - CallSigs method returns method and positional params for Call method
  311. def icall(self,method,**kwargs):
  312. timeout = kwargs.pop('timeout',None)
  313. wallet = kwargs.pop('wallet',None)
  314. return self.call(
  315. *getattr(self.call_sigs,method)(**kwargs),
  316. timeout = timeout,
  317. wallet = wallet )
  318. def gathered_icall(self,method,args_list,timeout=None,wallet=None):
  319. return self.gathered_call(
  320. method,
  321. [getattr(self.call_sigs,method)(*a)[1:] for a in args_list],
  322. timeout = timeout,
  323. wallet = wallet )
  324. def process_http_resp(self,run_ret,batch=False,json_rpc=True):
  325. text,status = run_ret
  326. if status == 200:
  327. dmsg_rpc(' RPC RESPONSE data ==>\n{}\n',text,is_json=True)
  328. if batch:
  329. return [r['result'] for r in json.loads(text,parse_float=Decimal)]
  330. else:
  331. try:
  332. if json_rpc:
  333. return json.loads(text,parse_float=Decimal)['result']
  334. else:
  335. return json.loads(text,parse_float=Decimal)
  336. except:
  337. t = json.loads(text)
  338. try:
  339. m = t['error']['message']
  340. except:
  341. try: m = t['error']
  342. except: m = t
  343. die( 'RPCFailure', m )
  344. else:
  345. import http
  346. m,s = ( '', http.HTTPStatus(status) )
  347. if text:
  348. try:
  349. m = json.loads(text)['error']['message']
  350. except:
  351. try: m = text.decode()
  352. except: m = text
  353. die( 'RPCFailure', f'{s.value} {s.name}: {m}' )
  354. async def stop_daemon(self,quiet=False,silent=False):
  355. if self.daemon.state == 'ready':
  356. if not (quiet or silent):
  357. msg(f'Stopping {self.daemon.desc} on port {self.daemon.bind_port}')
  358. ret = await self.do_stop_daemon(silent=silent)
  359. if self.daemon.wait:
  360. self.daemon.wait_for_state('stopped')
  361. return ret
  362. else:
  363. if not (quiet or silent):
  364. msg(f'{self.daemon.desc} on port {self.daemon.bind_port} not running')
  365. return True
  366. async def restart_daemon(self,quiet=False,silent=False):
  367. await self.stop_daemon(quiet=quiet,silent=silent)
  368. return self.daemon.start(silent=silent)
  369. def handle_unsupported_daemon_version(self,name,warn_only):
  370. class daemon_version_warning(oneshot_warning):
  371. color = 'yellow'
  372. message = 'ignoring unsupported {} daemon version at user request'
  373. if warn_only:
  374. daemon_version_warning(div=name,fmt_args=[self.daemon.coind_name])
  375. else:
  376. name = self.daemon.coind_name
  377. die(2,'\n'+fmt(f"""
  378. The running {name} daemon has version {self.daemon_version_str}.
  379. This version of MMGen is tested only on {name} v{self.daemon.coind_version_str} and below.
  380. To avoid this error, downgrade your daemon to a supported version.
  381. Alternatively, you may invoke the command with the --ignore-daemon-version
  382. option, in which case you proceed at your own risk.
  383. """,indent=' '))
  384. async def rpc_init(
  385. cfg,
  386. proto = None,
  387. backend = None,
  388. daemon = None,
  389. ignore_daemon_version = False,
  390. ignore_wallet = False ):
  391. proto = proto or cfg._proto
  392. if not 'rpc_init' in proto.mmcaps:
  393. die(1,f'rpc_init() not supported for {proto.name} protocol!')
  394. cls = getattr(
  395. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.rpc'),
  396. proto.base_proto + 'RPCClient' )
  397. from .daemon import CoinDaemon
  398. rpc = await cls(
  399. cfg = cfg,
  400. proto = proto,
  401. daemon = daemon or CoinDaemon(cfg,proto=proto,test_suite=cfg.test_suite),
  402. backend = backend or cfg.rpc_backend,
  403. ignore_wallet = ignore_wallet )
  404. if rpc.daemon_version > rpc.daemon.coind_version:
  405. rpc.handle_unsupported_daemon_version(
  406. proto.name,
  407. ignore_daemon_version or proto.ignore_daemon_version or cfg.ignore_daemon_version )
  408. if rpc.chain not in proto.chain_names:
  409. die( 'RPCChainMismatch', '\n' + fmt(f"""
  410. Protocol: {proto.cls_name}
  411. Valid chain names: {fmt_list(proto.chain_names,fmt='bare')}
  412. RPC client chain: {rpc.chain}
  413. """,indent=' ').rstrip() )
  414. rpc.blockcount = NonNegativeInt(rpc.blockcount)
  415. return rpc