#!/usr/bin/env python3

"""
test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
"""

import sys,os,time

from mmgen.color import yellow,cyan
from mmgen.util import msg,gmsg,make_timestr,pp_fmt,die
from mmgen.protocol import init_proto
from mmgen.rpc import rpc_init
from mmgen.daemon import CoinDaemon
from mmgen.proto.xmr.rpc import MoneroRPCClient,MoneroWalletRPCClient
from mmgen.proto.xmr.daemon import MoneroWalletDaemon

from ..include.common import cfg,qmsg,vmsg


async def cfg_file_auth_test(cfg, d, bad_auth=False):
    """
    Restart daemon `d` on a fresh datadir and check RPC authentication.

    bad_auth=False: append `rpcuser`/`rpcpassword` lines to the daemon's
    config file, then assert that the client returned by rpc_init() reports
    the user 'ut_rpc'.

    bad_auth=True: write no credentials, rename the auth cookie file out of
    the way and require rpc_init() to raise; die(3, ...) is called if it
    does not.

    The cookie file is restored and the daemon is stopped even when the
    check fails, so a failing run does not leave state behind.
    """
    m = 'missing credentials' if bad_auth else f'credentials from {d.cfg_file}'
    qmsg(cyan(f'\n Testing authentication with {m}:'))
    time.sleep(0.1) # race condition
    d.remove_datadir() # removes cookie file to force authentication from cfg file
    os.makedirs(d.network_datadir)

    if not bad_auth:
        cf = os.path.join(d.datadir, d.cfg_file)
        with open(cf, 'a') as fp:
            fp.write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
        d.flag.keep_cfg_file = True

    d.start()

    try:
        if bad_auth:
            cookie_bak = d.auth_cookie_fn + '.bak'
            os.rename(d.auth_cookie_fn, cookie_bak)
            try:
                await rpc_init(cfg, d.proto)
            except Exception as e:
                # any failure to initialize counts as the expected outcome
                vmsg(yellow(str(e)))
            else:
                die(3, 'No error on missing credentials!')
            finally:
                # put the cookie back whether or not the check passed
                os.rename(cookie_bak, d.auth_cookie_fn)
        else:
            rpc = await rpc_init(cfg, d.proto)
            assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'
    finally:
        # never leave the test daemon running, even on failure
        d.stop()


async def print_daemon_info(rpc):
    """
    Print version and network details of the daemon behind `rpc` via msg().

    Monero clients get a short report; all others get a longer one, and
    Bitcoin-based clients additionally get the cached networkinfo,
    blockchaininfo and deploymentinfo dicts.
    """
    # NOTE(review): line breaks and indentation inside the report templates
    # below were reconstructed from a whitespace-collapsed source -- confirm
    # the exact layout against upstream.
    if rpc.proto.base_proto == 'Monero':
        msg(f"""
    DAEMON VERSION: {rpc.daemon_version} [{rpc.daemon_version_str}]
    NETWORK: {rpc.proto.coin} {rpc.proto.network.upper()}
        """.rstrip())
    else:
        msg(f"""
    DAEMON VERSION: {rpc.daemon_version} [{rpc.daemon_version_str}]
    CAPS: {rpc.caps}
    NETWORK: {rpc.proto.coin} {rpc.proto.network.upper()}
    CHAIN: {rpc.chain}
    BLOCKCOUNT: {rpc.blockcount}
    CUR_DATE: {rpc.cur_date} [{make_timestr(rpc.cur_date)}]
        """.rstrip())

    if rpc.proto.base_proto == 'Bitcoin':
        def fmt_dict(d):
            # pretty-print the dict with every line prefixed, framed by newlines
            return '\n ' + '\n '.join( pp_fmt(d).split('\n') ) + '\n'
        msg(f"""
    NETWORKINFO: {fmt_dict(rpc.cached["networkinfo"])}
    BLOCKCHAININFO: {fmt_dict(rpc.cached["blockchaininfo"])}
    DEPLOYMENTINFO: {fmt_dict(rpc.cached["deploymentinfo"])}
        """.rstrip())

    msg('')


def do_msg(rpc, backend):
    """
    Announce the backend under test via qmsg().

    The class name of `rpc.backend` is shown; the requested `backend` name is
    appended in brackets when it differs from that class name.
    """
    bname = type(rpc.backend).__name__
    qmsg(' Testing backend {!r}{}'.format(
        bname,
        '' if backend == bname else f' [{backend}]' ))


class init_test:
    """
    Per-coin RPC initialization checks.

    run_test() looks the check up by lowercased coin name.  Each check
    creates an RPC client for `daemon` using `backend`, announces the
    backend and returns the client.
    """

    @staticmethod
    async def btc(cfg, daemon, backend):
        rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
        do_msg(rpc, backend)

        # exercise a single call and both calling conventions of gathered_call()
        bh = (await rpc.call('getblockchaininfo', timeout=300))['bestblockhash']
        await rpc.gathered_call('getblock', ((bh,), (bh, 1)), timeout=300)
        await rpc.gathered_call(None, (('getblock', (bh,)), ('getblock', (bh, 1))), timeout=300)
        return rpc

    @staticmethod
    async def bch(cfg, daemon, backend):
        rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
        do_msg(rpc, backend)
        return rpc

    ltc = bch

    @staticmethod
    async def eth(cfg, daemon, backend):
        rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
        do_msg(rpc, backend)
        await rpc.call('eth_blockNumber', timeout=300)
        return rpc

    etc = eth


async def run_test(network_ids, test_cf_auth=False, daemon_ids=None, cfg_in=None):
    """
    Run the RPC initialization checks for every daemon of every network.

    network_ids:  network IDs passed to init_proto()
    test_cf_auth: also run cfg_file_auth_test() (skipped on win32)
    daemon_ids:   if given, only daemons with these IDs are tested
    cfg_in:       configuration to use instead of the module-level `cfg`

    Returns True.
    """

    async def do_test(d, cfg):
        # start from a known state: stopped daemon, fresh datadir
        if not cfg.no_daemon_stop:
            d.stop()

        if not cfg.no_daemon_autostart:
            d.remove_datadir()
            d.start()

        for n, backend in enumerate(cfg._autoset_opts['rpc_backend'].choices):
            test = getattr(init_test, d.proto.coin.lower())
            rpc = await test(cfg, d, backend)
            # daemon info is printed once, for the first backend only
            if not n and cfg.verbose:
                await print_daemon_info(rpc)

        if not cfg.no_daemon_stop:
            d.stop()

        if test_cf_auth and sys.platform != 'win32':
            await cfg_file_auth_test(cfg, d)
            await cfg_file_auth_test(cfg, d, bad_auth=True)

        qmsg('')

    cfg_arg = cfg_in or cfg

    for network_id in network_ids:
        proto = init_proto(cfg_arg, network_id=network_id)
        all_ids = CoinDaemon.get_daemon_ids(cfg_arg, proto.coin)
        # filter in all_ids order (rather than via a set intersection) so that
        # daemons are always tested in the same order
        ids = [i for i in all_ids if i in daemon_ids] if daemon_ids else all_ids
        for daemon_id in ids:
            await do_test(
                CoinDaemon(cfg_arg, proto=proto, test_suite=True, daemon_id=daemon_id),
                cfg_arg )

    return True


class unit_tests:
    """
    Test entry points, one coroutine per daemon or coin.  Each takes
    (name, ut), neither of which is used here, and returns True.
    """

    altcoin_deps = ('ltc','bch','geth','erigon','parity','xmrwallet')
    arm_skip = ('parity',) # no prebuilt binaries for ARM

    async def btc(self, name, ut):
        return await run_test(['btc','btc_tn'], test_cf_auth=True)

    async def ltc(self, name, ut):
        return await run_test(['ltc','ltc_tn'], test_cf_auth=True)

    async def bch(self, name, ut):
        return await run_test(['bch','bch_tn'], test_cf_auth=True)

    async def geth(self, name, ut):
        return await run_test(['eth_tn','eth_rt'], daemon_ids=['geth']) # mainnet returns EIP-155 error on empty blockchain

    async def erigon(self, name, ut):
        return await run_test(['eth','eth_tn','eth_rt'], daemon_ids=['erigon'])

    async def parity(self, name, ut):
        return await run_test(['etc'])

    async def xmrwallet(self, name, ut):
        """
        For each Monero network: start the coin daemon and a wallet RPC
        daemon, query the coin daemon, then create and reopen a junk wallet
        through the wallet RPC client.
        """

        # one definition of the scratch paths, shared by the daemon setup
        # and the directory reset at the bottom
        trash_dir = os.path.join('test', 'trash2')
        wallet_rpc_dir = os.path.join(trash_dir, 'wallet_rpc')

        async def test_monerod_rpc(md):
            rpc = MoneroRPCClient(
                cfg    = cfg,
                proto  = md.proto,
                host   = 'localhost',
                port   = md.rpc_port,
                user   = None,
                passwd = None,
                daemon = md,
            )
            if cfg.verbose:
                await print_daemon_info(rpc)
            rpc.call_raw('get_height')
            rpc.call('get_last_block_header')

        from mmgen.xmrseed import xmrseed

        async def run():
            networks = init_proto( cfg, 'xmr' ).networks
            protos = [init_proto( cfg, 'xmr', network=network ) for network in networks]
            # one (coin daemon, wallet daemon) pair per network
            daemons = [(
                    CoinDaemon( cfg, proto=proto, test_suite=True ),
                    MoneroWalletDaemon(
                        cfg        = cfg,
                        proto      = proto,
                        test_suite = True,
                        wallet_dir = trash_dir,
                        datadir    = wallet_rpc_dir,
                        passwd     = 'ut_rpc_passw0rd' )
                ) for proto in protos ]

            for md, wd in daemons:
                if not cfg.no_daemon_autostart:
                    md.start()
                wd.start()

                await test_monerod_rpc(md)

                c = MoneroWalletRPCClient( cfg=cfg, daemon=wd )
                fn = f'monero-{wd.network}-junk-wallet'
                qmsg(f'Creating {wd.network} wallet')
                c.call(
                    'restore_deterministic_wallet',
                    filename = fn,
                    password = 'foo',
                    seed     = xmrseed().fromhex('beadface'*8, tostr=True) )

                # on win32 the wallet daemon is restarted before the wallet is reopened
                if sys.platform == 'win32':
                    wd.stop()
                    wd.start()

                qmsg(f'Opening {wd.network} wallet')
                c.call( 'open_wallet', filename=fn, password='foo' )

                await c.stop_daemon()

                if not cfg.no_daemon_stop:
                    await md.rpc.stop_daemon()

            gmsg('OK')

        import shutil
        # begin with an empty scratch tree
        shutil.rmtree(trash_dir, ignore_errors=True)
        os.makedirs(wallet_rpc_dir)
        await run()
        return True