ut_rpc.py 7.1 KB


  1. #!/usr/bin/env python3
  2. """
  3. test.daemontest_d.ut_rpc: RPC unit test for the MMGen suite
  4. """
  5. import sys, os
  6. from mmgen.cfg import Config
  7. from mmgen.color import yellow, cyan
  8. from mmgen.util import msg, gmsg, make_timestr, pp_fmt, die
  9. from mmgen.protocol import init_proto
  10. from mmgen.rpc import rpc_init
  11. from mmgen.daemon import CoinDaemon
  12. from mmgen.proto.xmr.rpc import MoneroRPCClient, MoneroWalletRPCClient
  13. from mmgen.proto.xmr.daemon import MoneroWalletDaemon
  14. from ..include.common import cfg, qmsg, vmsg, in_nix_environment, test_exec
  async def cfg_file_auth_test(cfg, d, bad_auth=False):
      """
      Exercise RPC authentication against daemon `d` after wiping its datadir.

      The daemon is stopped, its datadir removed and `d.network_datadir`
      recreated before the daemon is started again.

      With `bad_auth` false, `rpcuser`/`rpcpassword` lines are appended to the
      daemon's config file before startup, and the user name reported by the
      client returned from `rpc_init()` must be 'ut_rpc'.

      With `bad_auth` true, nothing is written to the config file; the auth
      cookie is renamed out of the way and `rpc_init()` is required to raise.
      If it does not, `die(3, ...)` is called.  The cookie file is renamed back
      in all cases.

      Unless `cfg.no_daemon_stop` is set, the daemon is stopped and its datadir
      removed again at the end.
      """
      desc = 'missing credentials' if bad_auth else f'credentials from {d.cfg_file}'
      qmsg(cyan(f'\n Testing authentication with {desc}:'))

      d.stop()
      d.remove_datadir() # removes cookie file to force authentication from cfg file
      os.makedirs(d.network_datadir)

      if not bad_auth:
          cf = os.path.join(d.datadir, d.cfg_file)
          with open(cf, 'a') as fp:
              fp.write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
          d.flag.keep_cfg_file = True

      d.start()

      if bad_auth:
          cookie_fn = d.auth_cookie_fn
          backup_fn = cookie_fn + '.bak'
          os.rename(cookie_fn, backup_fn)
          try:
              await rpc_init(cfg, d.proto)
          except Exception as e:
              # any failure counts as the expected outcome; it is only reported
              vmsg(yellow(str(e)))
          else:
              die(3, 'No error on missing credentials!')
          finally:
              # put the cookie back even when die() or vmsg() raises
              os.rename(backup_fn, cookie_fn)
      else:
          rpc = await rpc_init(cfg, d.proto)
          assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'

      if not cfg.no_daemon_stop:
          d.stop()
          d.remove_datadir()
  async def print_daemon_info(rpc):
      """
      Print a summary of the daemon behind RPC client `rpc` using msg().

      For a 'Monero' base protocol only the daemon version and network are
      shown; for every other base protocol the capabilities, chain, block count
      and current date are shown as well.  For 'Bitcoin' the cached network,
      blockchain and deployment info plus the awaited wallet info follow.
      An empty line is printed last.
      """
      def as_block(rows):
          # A leading newline, one row per line, trailing whitespace stripped
          return ('\n' + ''.join(f'{row}\n' for row in rows)).rstrip()

      def fmt_dict(d):
          # Put every line of the pretty-printed value on its own indented line
          return ''.join(f'\n {line}' for line in pp_fmt(d).split('\n')) + '\n'

      base_proto = rpc.proto.base_proto
      version_row = f'DAEMON VERSION: {rpc.daemon_version} [{rpc.daemon_version_str}]'

      if base_proto == 'Monero':
          msg(as_block([
              version_row,
              f'NETWORK: {rpc.proto.coin} {rpc.proto.network.upper()}',
          ]))
      else:
          msg(as_block([
              version_row,
              f'CAPS: {rpc.caps}',
              f'NETWORK: {rpc.proto.coin} {rpc.proto.network.upper()}',
              f'CHAIN: {rpc.chain}',
              f'BLOCKCOUNT: {rpc.blockcount}',
              f'CUR_DATE: {rpc.cur_date} [{make_timestr(rpc.cur_date)}]',
          ]))

      if base_proto == 'Bitcoin':
          msg(as_block([
              f'NETWORKINFO: {fmt_dict(rpc.cached["networkinfo"])}',
              f'BLOCKCHAININFO: {fmt_dict(rpc.cached["blockchaininfo"])}',
              f'DEPLOYMENTINFO: {fmt_dict(rpc.cached["deploymentinfo"])}',
              f'WALLETINFO: {fmt_dict(await rpc.walletinfo)}',
          ]))

      msg('')
  def do_msg(rpc, backend):
      """
      Report via qmsg() the class name of `rpc.backend`.

      The requested `backend` name is appended in square brackets when it
      differs from that class name.
      """
      bname = type(rpc.backend).__name__
      suffix = '' if backend == bname else f' [{backend}]'
      qmsg(f' Testing backend {bname!r}{suffix}')
  class init_test:
      """
      Per-coin RPC initialization tests.

      Each public coroutine takes `(cfg, daemon, backend)`, initializes an RPC
      client with `rpc_init()`, announces the backend with `do_msg()` and
      returns the client.  `ltc` shares the `bch` test and `etc` shares the
      `eth` test.
      """

      @staticmethod
      async def _connect(cfg, daemon, backend):
          # Shared first step of every test: create the client, announce the backend
          client = await rpc_init(cfg, daemon.proto, backend, daemon)
          do_msg(client, backend)
          return client

      @staticmethod
      async def btc(cfg, daemon, backend):
          client = await init_test._connect(cfg, daemon, backend)

          if cfg.tw_name:
              info = await client.walletinfo
              assert info['walletname'] == client.cfg.tw_name, f'{info["walletname"]!r} != {client.cfg.tw_name!r}'

          chain_info = await client.call('getblockchaininfo', timeout=300)
          bh = chain_info['bestblockhash']
          # the same pair of 'getblock' requests, issued in both gathered_call() forms
          await client.gathered_call('getblock', ((bh,), (bh, 1)), timeout=300)
          await client.gathered_call(None, (('getblock', (bh,)), ('getblock', (bh, 1))), timeout=300)
          return client

      @staticmethod
      async def bch(cfg, daemon, backend):
          return await init_test._connect(cfg, daemon, backend)

      ltc = bch

      @staticmethod
      async def eth(cfg, daemon, backend):
          client = await init_test._connect(cfg, daemon, backend)
          await client.call('eth_blockNumber', timeout=300)
          return client

      etc = eth
  async def run_test(network_ids, test_cf_auth=False, daemon_ids=None, cfg_in=None):
      """
      Run the RPC initialization tests for every daemon of every given network.

      network_ids:  iterable of network IDs passed to `init_proto()`
      test_cf_auth: if true (and the platform is not 'win32'), also run
                    `cfg_file_auth_test()` with good and then missing credentials
      daemon_ids:   optional collection of daemon IDs restricting which of the
                    IDs returned by `CoinDaemon.get_daemon_ids()` are tested
      cfg_in:       optional config object used in place of the module-level `cfg`

      Daemons are tested in the order `CoinDaemon.get_daemon_ids()` returns
      them, each ID once.  Returns True.
      """
      async def do_test(d, cfg):
          d.wait = True

          if not cfg.no_daemon_stop:
              d.stop()
              d.remove_datadir()

          if not cfg.no_daemon_autostart:
              d.remove_datadir()
              d.start()

          # the test coroutine depends only on the coin, so look it up once
          test = getattr(init_test, d.proto.coin.lower())
          for n, backend in enumerate(cfg._autoset_opts['rpc_backend'].choices):
              rpc = await test(cfg, d, backend)
              # daemon info is printed for the first backend only
              if not n and cfg.verbose:
                  await print_daemon_info(rpc)

          if not cfg.no_daemon_stop:
              d.stop()
              d.remove_datadir()

          if test_cf_auth and sys.platform != 'win32':
              await cfg_file_auth_test(cfg, d)
              await cfg_file_auth_test(cfg, d, bad_auth=True)

          qmsg('')

      cfg_arg = cfg_in or cfg
      wanted = set(daemon_ids) if daemon_ids else None

      for network_id in network_ids:
          proto = init_proto(cfg_arg, network_id=network_id)
          all_ids = CoinDaemon.get_daemon_ids(cfg_arg, proto.coin)
          if wanted is None:
              ids = all_ids
          else:
              # Filter in the daemon's own order, dropping duplicates, so the
              # test sequence does not depend on set iteration order
              ids = [i for i in dict.fromkeys(all_ids) if i in wanted]
          for daemon_id in ids:
              await do_test(CoinDaemon(cfg_arg, proto=proto, test_suite=True, daemon_id=daemon_id), cfg_arg)

      return True
  class unit_tests:
      """
      RPC unit tests, one coroutine per coin or daemon.

      Every test coroutine takes `(name, ut)` and returns True.  All but
      `xmrwallet` delegate to `run_test()`.
      """

      # presumably read by the test runner; neither tuple is used in this file
      altcoin_deps = ('ltc', 'bch', 'geth', 'erigon', 'parity', 'xmrwallet')
      arm_skip = ('parity',) # no prebuilt binaries for ARM

      async def btc(self, name, ut):
          # BTC runs against a clone of the config with a non-default tracking wallet name
          alt_cfg = Config({'_clone': cfg, 'tw_name': 'alternate-tracking-wallet'})
          return await run_test(['btc', 'btc_tn'], test_cf_auth=True, cfg_in=alt_cfg)

      async def ltc(self, name, ut):
          return await run_test(['ltc', 'ltc_tn'], test_cf_auth=True)

      async def bch(self, name, ut):
          return await run_test(['bch', 'bch_tn'], test_cf_auth=True)

      async def geth(self, name, ut):
          # mainnet returns EIP-155 error on empty blockchain:
          return await run_test(['eth_tn', 'eth_rt'], daemon_ids=['geth'])

      async def erigon(self, name, ut):
          return await run_test(['eth', 'eth_tn', 'eth_rt'], daemon_ids=['erigon'])

      async def parity(self, name, ut):
          # skipped in a Nix environment when 'parity --help' cannot be executed
          parity_missing = in_nix_environment() and not test_exec('parity --help')
          if not parity_missing:
              return await run_test(['etc'])
          ut.skip_msg('Nix environment')
          return True

      async def xmrwallet(self, name, ut):
          """
          Test the Monero daemon RPC and the Monero wallet RPC on each network.

          The 'test/trash2' directory is recreated from scratch.  For each
          network a wallet is restored from a fixed seed, opened again, and the
          wallet daemon is then stopped.
          """
          from mmgen.xmrseed import xmrseed
          import shutil

          async def check_monerod(monerod):
              client = MoneroRPCClient(
                  cfg=cfg,
                  proto=monerod.proto,
                  host='localhost',
                  port=monerod.rpc_port,
                  user=None,
                  passwd=None,
                  daemon=monerod,
              )
              if cfg.verbose:
                  await print_daemon_info(client)
              # called without await, as in the original; presumably synchronous - confirm
              client.call_raw('get_height')
              client.call('get_last_block_header')

          def daemon_pair(proto):
              # the coin daemon and its wallet daemon for one network
              monerod = CoinDaemon(cfg, proto=proto, test_suite=True)
              wallet_daemon = MoneroWalletDaemon(
                  cfg=cfg,
                  proto=proto,
                  test_suite=True,
                  wallet_dir=os.path.join('test', 'trash2'),
                  datadir=os.path.join('test', 'trash2', 'wallet_rpc'),
                  passwd='ut_rpc_passw0rd')
              return (monerod, wallet_daemon)

          async def check_wallet(monerod, wallet_daemon):
              if not cfg.no_daemon_autostart:
                  monerod.start()
              wallet_daemon.start()

              await check_monerod(monerod)

              wallet_rpc = MoneroWalletRPCClient(cfg=cfg, daemon=wallet_daemon)
              wallet_fn = f'monero-{wallet_daemon.network}-junk-wallet'

              qmsg(f'Creating {wallet_daemon.network} wallet')
              wallet_rpc.call(
                  'restore_deterministic_wallet',
                  filename=wallet_fn,
                  password='foo',
                  seed=xmrseed().fromhex('beadface'*8, tostr=True))

              # on win32 the wallet daemon is restarted before the wallet is reopened
              if sys.platform == 'win32':
                  wallet_daemon.stop()
                  wallet_daemon.start()

              qmsg(f'Opening {wallet_daemon.network} wallet')
              wallet_rpc.call('open_wallet', filename=wallet_fn, password='foo')

              await wallet_rpc.stop_daemon()

              if not cfg.no_daemon_stop:
                  monerod.stop()

          shutil.rmtree('test/trash2', ignore_errors=True)
          os.makedirs('test/trash2/wallet_rpc')

          # every daemon pair is created before any daemon is started
          pairs = [
              daemon_pair(init_proto(cfg, 'xmr', network=network))
              for network in init_proto(cfg, 'xmr').networks
          ]
          for monerod, wallet_daemon in pairs:
              await check_wallet(monerod, wallet_daemon)

          gmsg('OK')
          return True