requests.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. rpc.backends.requests: requests RPC backend for the MMGen Project
  12. """
  13. import json
  14. from ..util import dmsg_rpc_backend, json_encoder
  15. from .base import base
  16. class requests(base):
  17. def __del__(self):
  18. self.session.close()
  19. def __init__(self, caller):
  20. super().__init__(caller)
  21. import requests, urllib3
  22. urllib3.disable_warnings()
  23. self.session = requests.Session()
  24. self.session.trust_env = False # ignore *_PROXY environment vars
  25. self.session.headers = caller.http_hdrs
  26. if caller.auth_type:
  27. auth = 'HTTP' + caller.auth_type.capitalize() + 'Auth'
  28. self.session.auth = getattr(requests.auth, auth)(*caller.auth)
  29. if self.proxy: # used only by XMR for now: requires pysocks package
  30. self.session.proxies.update({
  31. 'http': f'socks5h://{self.proxy}',
  32. 'https': f'socks5h://{self.proxy}'})
  33. async def run(self, *args, **kwargs):
  34. return self.run_noasync(*args, **kwargs)
  35. def run_noasync(self, payload, timeout, host_path):
  36. dmsg_rpc_backend(self.host_url, host_path, payload)
  37. res = self.session.post(
  38. url = self.host_url + host_path,
  39. data = json.dumps(payload, cls=json_encoder),
  40. timeout = timeout or self.timeout,
  41. verify = False)
  42. return (res.content, res.status_code)