rpc.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. test.cmdtest_d.httpd.thornode.rpc: Thornode RPC HTTP server
  12. """
  13. import re, json
  14. from wsgiref.util import request_uri
  15. from . import ThornodeServer
  16. class ThornodeRPCServer(ThornodeServer):
  17. port = 18800
  18. name = 'thornode RPC server'
  19. def make_response_body(self, method, environ):
  20. req_str = request_uri(environ)
  21. if re.search(r'/bank/balances/(\S+)', req_str):
  22. data = {
  23. 'result': [
  24. {'denom': 'foocoin', 'amount': 321321321321},
  25. {'denom': 'rune', 'amount': 987654321321},
  26. {'denom': 'barcoin', 'amount': 123123123123},
  27. ]}
  28. elif m := re.search(r'/auth/accounts/(\S+)', req_str):
  29. data = {
  30. 'result': {
  31. 'value': {
  32. 'address': m[1],
  33. 'pub_key': 'PubKeySecp256k1{0000}',
  34. 'account_number': '1234',
  35. 'sequence': '333444'
  36. }}}
  37. elif m := re.search(r'/tx$', req_str):
  38. assert method == 'POST'
  39. txid = environ['wsgi.input'].read(71).decode().removeprefix('hash=0x').upper()
  40. data = {
  41. 'result': {
  42. 'hash': txid,
  43. 'height': '21298600',
  44. 'index': 2,
  45. 'tx_result': {
  46. 'gas_used': '173222',
  47. 'events': [],
  48. 'codespace': ''
  49. },
  50. 'tx': 'MHgwMGZvb2Jhcg=='
  51. }
  52. }
  53. elif m := re.search(r'/check_tx$', req_str):
  54. assert method == 'POST'
  55. data = {
  56. 'result': {
  57. 'code': 0,
  58. 'data': '',
  59. 'log': '',
  60. 'info': '',
  61. 'gas_wanted': '-1',
  62. 'gas_used': '53774',
  63. 'events': [],
  64. 'codespace': ''
  65. }
  66. }
  67. elif m := re.search(r'/broadcast_tx_sync$', req_str):
  68. assert method == 'POST'
  69. txhex = environ['wsgi.input'].read(24).decode().removeprefix('tx=0x').upper()
  70. if txhex.startswith('0A540A52'):
  71. data = {
  72. 'result': {
  73. 'code': 0,
  74. 'codespace': '',
  75. 'data': '',
  76. 'hash': '14463C716CF08A814868DB779156BCD85A1DF8EE49E924900A74482E9DEE132D',
  77. 'log': ''
  78. }
  79. }
  80. else:
  81. raise ValueError(f'‘{req_str}’: malformed query path')
  82. return json.dumps(data).encode()