ut_rpc.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #!/usr/bin/env python3
  2. """
  3. test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
  4. """
  5. from mmgen.common import *
  6. from mmgen.exception import *
  7. from mmgen.protocol import init_coin,EthereumProtocol
  8. from mmgen.rpc import MoneroWalletRPCClient
  9. from mmgen.daemon import CoinDaemon,MoneroWalletDaemon
  10. class unit_tests:
  11. def btc(self,name,ut):
  12. async def run_test():
  13. c = g.rpc
  14. qmsg(' Testing backend {!r}'.format(type(c.backend).__name__))
  15. addrs = (
  16. ('bc1qvmqas4maw7lg9clqu6kqu9zq9cluvlln5hw97q','test address #1'), # deadbeef * 8
  17. ('bc1qe50rj25cldtskw5huxam335kyshtqtlrf4pt9x','test address #2'), # deadbeef * 7 + deadbeee
  18. )
  19. await c.batch_call('importaddress',addrs,timeout=120)
  20. ret = await c.batch_call('getaddressesbylabel',[(l,) for a,l in addrs])
  21. assert list(ret[0].keys())[0] == addrs[0][0]
  22. bh = (await c.call('getblockchaininfo',timeout=300))['bestblockhash']
  23. await c.gathered_call('getblock',((bh,),(bh,1)),timeout=300)
  24. await c.gathered_call(None,(('getblock',(bh,)),('getblock',(bh,1))),timeout=300)
  25. d = CoinDaemon('btc',test_suite=True)
  26. d.remove_datadir()
  27. d.start()
  28. g.proto.daemon_data_dir = d.datadir # used by BitcoinRPCClient.set_auth() to find the cookie
  29. g.rpc_port = d.rpc_port
  30. for backend in g.autoset_opts['rpc_backend'].choices:
  31. run_session(run_test(),backend=backend)
  32. d.stop()
  33. if g.platform != 'win':
  34. qmsg(f'\n Testing authentication with credentials from bitcoin.conf:')
  35. d.remove_datadir()
  36. os.makedirs(d.datadir)
  37. cf = os.path.join(d.datadir,'bitcoin.conf')
  38. open(cf,'a').write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
  39. d.add_flag('keep_cfg_file')
  40. d.start()
  41. async def do():
  42. assert g.rpc.auth.user == 'ut_rpc', 'user is not ut_rpc!'
  43. run_session(do())
  44. d.stop()
  45. qmsg(' OK')
  46. return True
  47. def eth(self,name,ut):
  48. ed = CoinDaemon('eth',test_suite=True)
  49. ed.start()
  50. init_coin('eth')
  51. g.rpc_port = CoinDaemon('eth',test_suite=True).rpc_port
  52. async def run_test():
  53. qmsg(' Testing backend {!r}'.format(type(g.rpc.backend).__name__))
  54. ret = await g.rpc.call('parity_versionInfo',timeout=300)
  55. #print(ret)
  56. for backend in g.autoset_opts['rpc_backend'].choices:
  57. run_session(run_test(),backend=backend)
  58. ed.stop()
  59. return True
  60. def xmr_wallet(self,name,ut):
  61. async def run():
  62. md = CoinDaemon('xmr',test_suite=True)
  63. if not opt.no_daemon_autostart:
  64. md.start()
  65. g.monero_wallet_rpc_password = 'passwOrd'
  66. mwd = MoneroWalletDaemon(wallet_dir='test/trash',test_suite=True)
  67. mwd.start()
  68. c = MoneroWalletRPCClient(
  69. host = g.monero_wallet_rpc_host,
  70. port = mwd.rpc_port,
  71. user = g.monero_wallet_rpc_user,
  72. passwd = g.monero_wallet_rpc_password)
  73. await c.call('get_version')
  74. gmsg('OK')
  75. mwd.wait = False
  76. mwd.stop()
  77. if not opt.no_daemon_stop:
  78. md.wait = False
  79. md.stop()
  80. run_session(run(),do_rpc_init=False)
  81. return True