123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- #!/usr/bin/env python3
- """
- test.daemontest_d.ut_rpc: RPC unit test for the MMGen suite
- """
- import sys, os
- from mmgen.cfg import Config
- from mmgen.color import yellow, cyan
- from mmgen.util import msg, gmsg, make_timestr, pp_fmt, die
- from mmgen.protocol import init_proto
- from mmgen.rpc import rpc_init
- from mmgen.daemon import CoinDaemon
- from mmgen.proto.xmr.rpc import MoneroRPCClient, MoneroWalletRPCClient
- from mmgen.proto.xmr.daemon import MoneroWalletDaemon
- from ..include.common import cfg, qmsg, vmsg, in_nix_environment, test_exec
async def cfg_file_auth_test(cfg, d, bad_auth=False):
    """
    Check RPC authentication for daemon ``d`` without relying on the auth cookie.

    The daemon is stopped, its data directory is removed and recreated, and
    the daemon is started again.

    * ``bad_auth=False``: an ``rpcuser``/``rpcpassword`` pair is appended to
      the daemon's config file before start, and the resulting RPC client
      must report ``ut_rpc`` as its authenticated user.
    * ``bad_auth=True``: no credentials are written and the auth cookie is
      moved aside while connecting, so ``rpc_init()`` is expected to raise.
      If it does not, the test aborts via ``die()``.

    Unless ``cfg.no_daemon_stop`` is set, the daemon is stopped and its data
    directory removed again on completion.
    """
    cred_desc = 'missing credentials' if bad_auth else f'credentials from {d.cfg_file}'
    qmsg(cyan(f'\n Testing authentication with {cred_desc}:'))

    d.stop()
    d.remove_datadir() # removes cookie file to force authentication from cfg file
    os.makedirs(d.network_datadir)

    if not bad_auth:
        cf = os.path.join(d.datadir, d.cfg_file)
        with open(cf, 'a') as fp:
            fp.write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
        d.flag.keep_cfg_file = True

    d.start()

    if bad_auth:
        cookie_fn = d.auth_cookie_fn
        backup_fn = cookie_fn + '.bak'
        # Hide the cookie so that no credentials at all are available
        os.rename(cookie_fn, backup_fn)
        try:
            await rpc_init(cfg, d.proto)
        except Exception as e:
            # Any failure counts as the expected outcome here
            vmsg(yellow(str(e)))
        else:
            die(3, 'No error on missing credentials!')
        finally:
            # Restore the cookie even when die() fires or an unexpected
            # error escapes, so the still-running daemon is left usable
            os.rename(backup_fn, cookie_fn)
    else:
        rpc = await rpc_init(cfg, d.proto)
        assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'

    if not cfg.no_daemon_stop:
        d.stop()
        d.remove_datadir()
async def print_daemon_info(rpc):
    """
    Print a summary of the daemon behind ``rpc``.

    Monero daemons get a short version/network summary; all others also get
    capabilities, chain, block count and current date.  For Bitcoin-based
    protocols the cached network, blockchain and deployment info and the
    wallet info are printed as well.  Output ends with an empty line.
    """
    # NOTE(review): the copy of this file under review had its whitespace
    # collapsed, so any label alignment or indentation inside these messages
    # may have been lost -- confirm the exact layout against upstream.

    def emit(lines):
        # Leading newline, one entry per line, trailing whitespace stripped
        msg(('\n' + '\n'.join(lines)).rstrip())

    if rpc.proto.base_proto == 'Monero':
        emit([
            f'DAEMON VERSION: {rpc.daemon_version} [{rpc.daemon_version_str}]',
            f'NETWORK: {rpc.proto.coin} {rpc.proto.network.upper()}',
        ])
    else:
        emit([
            f'DAEMON VERSION: {rpc.daemon_version} [{rpc.daemon_version_str}]',
            f'CAPS: {rpc.caps}',
            f'NETWORK: {rpc.proto.coin} {rpc.proto.network.upper()}',
            f'CHAIN: {rpc.chain}',
            f'BLOCKCOUNT: {rpc.blockcount}',
            f'CUR_DATE: {rpc.cur_date} [{make_timestr(rpc.cur_date)}]',
        ])

    if rpc.proto.base_proto == 'Bitcoin':

        def fmt_dict(d):
            # Pretty-printed dict on its own lines, each prefixed by a space
            return '\n ' + pp_fmt(d).replace('\n', '\n ') + '\n'

        emit([
            f'NETWORKINFO: {fmt_dict(rpc.cached["networkinfo"])}',
            f'BLOCKCHAININFO: {fmt_dict(rpc.cached["blockchaininfo"])}',
            f'DEPLOYMENTINFO: {fmt_dict(rpc.cached["deploymentinfo"])}',
            f'WALLETINFO: {fmt_dict(await rpc.walletinfo)}',
        ])

    msg('')
def do_msg(rpc, backend):
    """
    Announce the backend under test.

    Prints the class name of ``rpc.backend``; the requested ``backend`` name
    is appended in brackets when it differs from that class name.
    """
    backend_cls = type(rpc.backend).__name__
    suffix = f' [{backend}]' if backend != backend_cls else ''
    qmsg(f' Testing backend {backend_cls!r}{suffix}')
class init_test:
    """
    Per-coin RPC initialization tests.

    Each static method initializes an RPC client for ``daemon`` with the
    given ``backend``, reports the backend via ``do_msg()``, optionally makes
    coin-specific calls, and returns the client.  Methods are named after the
    lowercase coin symbol.
    """

    @staticmethod
    async def btc(cfg, daemon, backend):
        """Initialize a Bitcoin RPC client and exercise single and gathered calls."""
        rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
        do_msg(rpc, backend)

        if cfg.tw_name:
            # The daemon must have loaded the configured tracking wallet
            wi = await rpc.walletinfo
            loaded, wanted = wi['walletname'], rpc.cfg.tw_name
            assert loaded == wanted, f'{loaded!r} != {wanted!r}'

        chain_info = await rpc.call('getblockchaininfo', timeout=300)
        best_hash = chain_info['bestblockhash']
        getblock_args = ((best_hash,), (best_hash, 1))

        # Same method with several argument lists ...
        await rpc.gathered_call('getblock', getblock_args, timeout=300)
        # ... and the form taking (method, args) pairs
        await rpc.gathered_call(
            None,
            tuple(('getblock', args) for args in getblock_args),
            timeout=300)

        return rpc

    @staticmethod
    async def bch(cfg, daemon, backend):
        """Initialize an RPC client; no further calls are made."""
        rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
        do_msg(rpc, backend)
        return rpc

    ltc = bch

    @staticmethod
    async def eth(cfg, daemon, backend):
        """Initialize an Ethereum RPC client and request the block number."""
        rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
        do_msg(rpc, backend)
        await rpc.call('eth_blockNumber', timeout=300)
        return rpc

    etc = eth
async def run_test(network_ids, test_cf_auth=False, daemon_ids=None, cfg_in=None):
    """
    Run the RPC initialization tests for every daemon of every listed network.

    network_ids  -- network IDs passed to ``init_proto()``
    test_cf_auth -- additionally run the config-file authentication tests
                    (skipped on Windows)
    daemon_ids   -- if given, restrict testing to these daemon IDs
    cfg_in       -- configuration to use instead of the module-level ``cfg``

    Returns True.
    """

    async def do_test(d, cfg):

        def stop_and_clean():
            if not cfg.no_daemon_stop:
                d.stop()
                d.remove_datadir()

        d.wait = True

        stop_and_clean()

        if not cfg.no_daemon_autostart:
            d.remove_datadir()
            d.start()

        backends = cfg._autoset_opts['rpc_backend'].choices
        for idx, backend in enumerate(backends):
            coin_test = getattr(init_test, d.proto.coin.lower())
            rpc = await coin_test(cfg, d, backend)
            # Daemon info is shown once, for the first backend only
            if idx == 0 and cfg.verbose:
                await print_daemon_info(rpc)

        stop_and_clean()

        if test_cf_auth and sys.platform != 'win32':
            for bad_auth in (False, True):
                await cfg_file_auth_test(cfg, d, bad_auth=bad_auth)

        qmsg('')

    cfg_arg = cfg_in or cfg

    for network_id in network_ids:
        proto = init_proto(cfg_arg, network_id=network_id)
        all_ids = CoinDaemon.get_daemon_ids(cfg_arg, proto.coin)
        if daemon_ids:
            ids = set(daemon_ids) & set(all_ids)
        else:
            ids = all_ids
        for daemon_id in ids:
            daemon = CoinDaemon(cfg_arg, proto=proto, test_suite=True, daemon_id=daemon_id)
            await do_test(daemon, cfg_arg)

    return True
class unit_tests:
    """
    RPC unit tests, one coroutine method per coin or daemon.

    Every method takes ``name`` and the test runner object ``ut`` and returns
    True (directly or via ``run_test()``).
    """

    altcoin_deps = ('ltc', 'bch', 'geth', 'erigon', 'parity', 'xmrwallet')
    arm_skip = ('parity',) # no prebuilt binaries for ARM

    async def btc(self, name, ut):
        """Test Bitcoin mainnet and testnet with an alternate tracking wallet name."""
        btc_cfg = Config({'_clone': cfg, 'tw_name': 'alternate-tracking-wallet'})
        return await run_test(['btc', 'btc_tn'], test_cf_auth=True, cfg_in=btc_cfg)

    async def ltc(self, name, ut):
        """Test Litecoin mainnet and testnet, including config-file auth."""
        return await run_test(['ltc', 'ltc_tn'], test_cf_auth=True)

    async def bch(self, name, ut):
        """Test Bitcoin Cash mainnet and testnet, including config-file auth."""
        return await run_test(['bch', 'bch_tn'], test_cf_auth=True)

    async def geth(self, name, ut):
        """Test the geth daemon on Ethereum testnet and regtest."""
        # mainnet returns EIP-155 error on empty blockchain:
        return await run_test(['eth_tn', 'eth_rt'], daemon_ids=['geth'])

    async def erigon(self, name, ut):
        """Test the erigon daemon on Ethereum mainnet, testnet and regtest."""
        return await run_test(['eth', 'eth_tn', 'eth_rt'], daemon_ids=['erigon'])

    async def parity(self, name, ut):
        """Test Ethereum Classic; skipped under Nix when ``parity --help`` fails."""
        if in_nix_environment() and not test_exec('parity --help'):
            ut.skip_msg('Nix environment')
            return True
        return await run_test(['etc'])

    async def xmrwallet(self, name, ut):
        """
        Test the Monero daemon and wallet RPC daemon on every Monero network.

        For each network: start the daemons, query monerod, restore a junk
        wallet from a fixed seed, reopen it, and shut the daemons down again.
        """
        from mmgen.xmrseed import xmrseed
        import shutil

        async def test_monerod_rpc(md):
            rpc = MoneroRPCClient(
                cfg=cfg,
                proto=md.proto,
                host='localhost',
                port=md.rpc_port,
                user=None,
                passwd=None,
                daemon=md)

            if cfg.verbose:
                await print_daemon_info(rpc)

            rpc.call_raw('get_height')
            rpc.call('get_last_block_header')

        async def run():
            # Build one (monerod, wallet daemon) pair per network
            daemons = []
            for network in init_proto(cfg, 'xmr').networks:
                proto = init_proto(cfg, 'xmr', network=network)
                coin_daemon = CoinDaemon(cfg, proto=proto, test_suite=True)
                wallet_daemon = MoneroWalletDaemon(
                    cfg=cfg,
                    proto=proto,
                    test_suite=True,
                    wallet_dir=os.path.join('test', 'trash2'),
                    datadir=os.path.join('test', 'trash2', 'wallet_rpc'),
                    passwd='ut_rpc_passw0rd')
                daemons.append((coin_daemon, wallet_daemon))

            for md, wd in daemons:
                if not cfg.no_daemon_autostart:
                    md.start()
                wd.start()

                await test_monerod_rpc(md)

                client = MoneroWalletRPCClient(cfg=cfg, daemon=wd)
                wallet_fn = f'monero-{wd.network}-junk-wallet'

                qmsg(f'Creating {wd.network} wallet')
                seed = xmrseed().fromhex('beadface'*8, tostr=True)
                client.call(
                    'restore_deterministic_wallet',
                    filename=wallet_fn,
                    password='foo',
                    seed=seed)

                # On Windows the wallet daemon is restarted before reopening
                if sys.platform == 'win32':
                    wd.stop()
                    wd.start()

                qmsg(f'Opening {wd.network} wallet')
                client.call('open_wallet', filename=wallet_fn, password='foo')

                await client.stop_daemon()

                if not cfg.no_daemon_stop:
                    md.stop()

            gmsg('OK')

        # Start from an empty scratch directory
        shutil.rmtree('test/trash2', ignore_errors=True)
        os.makedirs('test/trash2/wallet_rpc')

        await run()
        return True
|