unit_tests rpc: async/await cleanups

This commit is contained in:
The MMGen Project 2024-03-12 13:21:56 +00:00
commit d3d4b11c7f
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2

View file

@ -7,7 +7,7 @@ test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
import sys,os,time
from mmgen.color import yellow,cyan
from mmgen.util import msg,gmsg,async_run,make_timestr,pp_fmt,die
from mmgen.util import msg,gmsg,make_timestr,pp_fmt,die
from mmgen.protocol import init_proto
from mmgen.rpc import rpc_init
from mmgen.daemon import CoinDaemon
@ -16,7 +16,7 @@ from mmgen.proto.xmr.daemon import MoneroWalletDaemon
from ..include.common import cfg,qmsg,vmsg
def cfg_file_auth_test(proto,d,bad_auth=False):
async def cfg_file_auth_test(cfg, d, bad_auth=False):
m = 'missing credentials' if bad_auth else f'credentials from {d.cfg_file}'
qmsg(cyan(f'\n Testing authentication with {m}:'))
time.sleep(0.1) # race condition
@ -34,19 +34,19 @@ def cfg_file_auth_test(proto,d,bad_auth=False):
if bad_auth:
os.rename(d.auth_cookie_fn,d.auth_cookie_fn+'.bak')
try:
async_run(rpc_init(cfg,proto))
await rpc_init(cfg, d.proto)
except Exception as e:
vmsg(yellow(str(e)))
else:
die(3,'No error on missing credentials!')
os.rename(d.auth_cookie_fn+'.bak',d.auth_cookie_fn)
else:
rpc = async_run(rpc_init(cfg,proto))
rpc = await rpc_init(cfg, d.proto)
assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'
d.stop()
def print_daemon_info(rpc):
async def print_daemon_info(rpc):
if rpc.proto.base_proto == 'Monero':
msg(f"""
@ -81,8 +81,8 @@ def do_msg(rpc,backend):
class init_test:
@staticmethod
async def btc(proto,backend,daemon):
rpc = await rpc_init(cfg,proto,backend,daemon)
async def btc(cfg, daemon, backend):
rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
do_msg(rpc,backend)
bh = (await rpc.call('getblockchaininfo',timeout=300))['bestblockhash']
@ -91,25 +91,25 @@ class init_test:
return rpc
@staticmethod
async def bch(proto,backend,daemon):
rpc = await rpc_init(cfg,proto,backend,daemon)
async def bch(cfg, daemon, backend):
rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
do_msg(rpc,backend)
return rpc
ltc = bch
@staticmethod
async def eth(proto,backend,daemon):
rpc = await rpc_init(cfg,proto,backend,daemon)
async def eth(cfg, daemon, backend):
rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
do_msg(rpc,backend)
await rpc.call('eth_blockNumber',timeout=300)
return rpc
etc = eth
def run_test(network_ids,test_cf_auth=False,daemon_ids=None):
async def run_test(network_ids, test_cf_auth=False, daemon_ids=None, cfg_in=None):
def do_test(d):
async def do_test(d, cfg):
if not cfg.no_daemon_stop:
d.stop()
@ -118,27 +118,29 @@ def run_test(network_ids,test_cf_auth=False,daemon_ids=None):
d.remove_datadir()
d.start()
for n,backend in enumerate(cfg._autoset_opts['rpc_backend'].choices):
test = getattr(init_test,d.proto.coin.lower())
rpc = async_run(test(d.proto,backend,d))
for n, backend in enumerate(cfg._autoset_opts['rpc_backend'].choices):
test = getattr(init_test, d.proto.coin.lower())
rpc = await test(cfg, d, backend)
if not n and cfg.verbose:
print_daemon_info(rpc)
await print_daemon_info(rpc)
if not cfg.no_daemon_stop:
d.stop()
if test_cf_auth and sys.platform != 'win32':
cfg_file_auth_test(d.proto,d)
cfg_file_auth_test(d.proto,d,bad_auth=True)
await cfg_file_auth_test(cfg, d)
await cfg_file_auth_test(cfg, d, bad_auth=True)
qmsg('')
cfg_arg = cfg_in or cfg
for network_id in network_ids:
proto = init_proto( cfg, network_id=network_id )
all_ids = CoinDaemon.get_daemon_ids(cfg,proto.coin)
proto = init_proto(cfg_arg, network_id=network_id)
all_ids = CoinDaemon.get_daemon_ids(cfg_arg, proto.coin)
ids = set(daemon_ids) & set(all_ids) if daemon_ids else all_ids
for daemon_id in ids:
do_test( CoinDaemon(cfg, proto=proto,test_suite=True,daemon_id=daemon_id) )
await do_test(CoinDaemon(cfg_arg, proto=proto,test_suite=True,daemon_id=daemon_id), cfg_arg)
return True
@ -147,27 +149,27 @@ class unit_tests:
altcoin_deps = ('ltc','bch','geth','erigon','parity','xmrwallet')
arm_skip = ('parity',) # no prebuilt binaries for ARM
def btc(self,name,ut):
return run_test(['btc','btc_tn'],test_cf_auth=True)
async def btc(self, name, ut):
return await run_test(['btc','btc_tn'], test_cf_auth=True)
def ltc(self,name,ut):
return run_test(['ltc','ltc_tn'],test_cf_auth=True)
async def ltc(self, name, ut):
return await run_test(['ltc','ltc_tn'], test_cf_auth=True)
def bch(self,name,ut):
return run_test(['bch','bch_tn'],test_cf_auth=True)
async def bch(self, name, ut):
return await run_test(['bch','bch_tn'], test_cf_auth=True)
def geth(self,name,ut):
return run_test(['eth_tn','eth_rt'],daemon_ids=['geth']) # mainnet returns EIP-155 error on empty blockchain
async def geth(self, name, ut):
return await run_test(['eth_tn','eth_rt'], daemon_ids=['geth']) # mainnet returns EIP-155 error on empty blockchain
def erigon(self,name,ut):
return run_test(['eth','eth_tn','eth_rt'],daemon_ids=['erigon'])
async def erigon(self, name, ut):
return await run_test(['eth','eth_tn','eth_rt'], daemon_ids=['erigon'])
def parity(self,name,ut):
return run_test(['etc'])
async def parity(self, name, ut):
return await run_test(['etc'])
def xmrwallet(self,name,ut):
async def xmrwallet(self, name, ut):
def test_monerod_rpc(md):
async def test_monerod_rpc(md):
rpc = MoneroRPCClient(
cfg = cfg,
proto = md.proto,
@ -178,7 +180,7 @@ class unit_tests:
daemon = md,
)
if cfg.verbose:
print_daemon_info(rpc)
await print_daemon_info(rpc)
rpc.call_raw('get_height')
rpc.call('get_last_block_header')
@ -202,7 +204,7 @@ class unit_tests:
md.start()
wd.start()
test_monerod_rpc(md)
await test_monerod_rpc(md)
c = MoneroWalletRPCClient( cfg=cfg, daemon=wd )
fn = f'monero-{wd.network}-junk-wallet'
@ -230,5 +232,5 @@ class unit_tests:
import shutil
shutil.rmtree('test/trash2',ignore_errors=True)
os.makedirs('test/trash2/wallet_rpc')
async_run(run())
await run()
return True