ut_rpc.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. #!/usr/bin/env python3
  2. """
  3. test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
  4. """
  5. from mmgen.common import *
  6. from mmgen.exception import *
  7. from mmgen.protocol import init_proto
  8. from mmgen.rpc import MoneroWalletRPCClient
  9. from mmgen.daemon import CoinDaemon,MoneroWalletDaemon
  10. def auth_test(d):
  11. d.stop()
  12. if g.platform != 'win':
  13. qmsg(f'\n Testing authentication with credentials from bitcoin.conf:')
  14. d.remove_datadir()
  15. os.makedirs(d.datadir)
  16. cf = os.path.join(d.datadir,'bitcoin.conf')
  17. open(cf,'a').write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
  18. d.add_flag('keep_cfg_file')
  19. d.start()
  20. async def do():
  21. assert g.rpc.auth.user == 'ut_rpc', 'user is not ut_rpc!'
  22. run_session(do())
  23. d.stop()
  24. class unit_tests:
  25. def bch(self,name,ut):
  26. async def run_test():
  27. qmsg(' Testing backend {!r}'.format(type(g.rpc.backend).__name__))
  28. d = CoinDaemon('bch',test_suite=True)
  29. d.remove_datadir()
  30. d.start()
  31. g.proto.daemon_data_dir = d.datadir # location of cookie file
  32. g.rpc_port = d.rpc_port
  33. for backend in g.autoset_opts['rpc_backend'].choices:
  34. run_session(run_test(),backend=backend)
  35. auth_test(d)
  36. qmsg(' OK')
  37. return True
  38. def btc(self,name,ut):
  39. async def run_test():
  40. c = g.rpc
  41. qmsg(' Testing backend {!r}'.format(type(c.backend).__name__))
  42. addrs = (
  43. ('bc1qvmqas4maw7lg9clqu6kqu9zq9cluvlln5hw97q','test address #1'), # deadbeef * 8
  44. ('bc1qe50rj25cldtskw5huxam335kyshtqtlrf4pt9x','test address #2'), # deadbeef * 7 + deadbeee
  45. )
  46. await c.batch_call('importaddress',addrs,timeout=120)
  47. ret = await c.batch_call('getaddressesbylabel',[(l,) for a,l in addrs])
  48. assert list(ret[0].keys())[0] == addrs[0][0]
  49. bh = (await c.call('getblockchaininfo',timeout=300))['bestblockhash']
  50. await c.gathered_call('getblock',((bh,),(bh,1)),timeout=300)
  51. await c.gathered_call(None,(('getblock',(bh,)),('getblock',(bh,1))),timeout=300)
  52. d = CoinDaemon('btc',test_suite=True)
  53. d.remove_datadir()
  54. d.start()
  55. g.proto.daemon_data_dir = d.datadir # used by BitcoinRPCClient.set_auth() to find the cookie
  56. g.rpc_port = d.rpc_port
  57. for backend in g.autoset_opts['rpc_backend'].choices:
  58. run_session(run_test(),backend=backend)
  59. auth_test(d)
  60. qmsg(' OK')
  61. return True
  62. def eth(self,name,ut):
  63. ed = CoinDaemon('eth',test_suite=True)
  64. ed.start()
  65. g.rpc_port = CoinDaemon('eth',test_suite=True).rpc_port
  66. async def run_test():
  67. qmsg(' Testing backend {!r}'.format(type(g.rpc.backend).__name__))
  68. ret = await g.rpc.call('parity_versionInfo',timeout=300)
  69. for backend in g.autoset_opts['rpc_backend'].choices:
  70. run_session(run_test(),proto=init_proto('eth'),backend=backend)
  71. ed.stop()
  72. return True
  73. def xmr_wallet(self,name,ut):
  74. async def run():
  75. md = CoinDaemon('xmr',test_suite=True)
  76. if not opt.no_daemon_autostart:
  77. md.start()
  78. g.monero_wallet_rpc_password = 'passwOrd'
  79. mwd = MoneroWalletDaemon(wallet_dir='test/trash',test_suite=True)
  80. mwd.start()
  81. c = MoneroWalletRPCClient(
  82. host = g.monero_wallet_rpc_host,
  83. port = mwd.rpc_port,
  84. user = g.monero_wallet_rpc_user,
  85. passwd = g.monero_wallet_rpc_password)
  86. await c.call('get_version')
  87. gmsg('OK')
  88. mwd.wait = False
  89. mwd.stop()
  90. if not opt.no_daemon_stop:
  91. md.wait = False
  92. md.stop()
  93. run_session(run(),do_rpc_init=False)
  94. return True