mmgen-wallet/test/unit_tests_d/ut_rpc.py

126 lines
3.1 KiB
Python
Executable file

#!/usr/bin/env python3
"""
test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
"""
from mmgen.common import *
from mmgen.exception import *
from mmgen.protocol import init_proto
from mmgen.rpc import rpc_init,MoneroWalletRPCClient
from mmgen.daemon import CoinDaemon,MoneroWalletDaemon
def auth_test(proto,d):
    """
    Check that rpc_init() authenticates with credentials read from the
    daemon's configuration file.

    Recreates the daemon's data directory, appends an rpcuser/rpcpassword
    pair to the config file inside it, adds the 'keep_cfg_file' flag to the
    daemon, starts it, and asserts that the initialized RPC client reports
    'ut_rpc' as its auth user.  The daemon is stopped afterwards, whether or
    not the check succeeded.

    Does nothing when g.platform is 'win'.

    proto: protocol object passed through to rpc_init()
    d:     daemon object; must provide cfg_file, datadir, remove_datadir(),
           add_flag(), start() and stop()

    Raises AssertionError if the RPC client's auth user is not 'ut_rpc'.
    """
    if g.platform == 'win':
        return

    qmsg(f'\n Testing authentication with credentials from {d.cfg_file}:')
    d.remove_datadir()
    os.makedirs(d.datadir)

    cf = os.path.join(d.datadir,d.cfg_file)
    # Close the file explicitly so the credentials are on disk before the
    # daemon is started, instead of relying on garbage collection.
    with open(cf,'a') as fp:
        fp.write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')

    d.add_flag('keep_cfg_file')
    d.start()

    async def do():
        rpc = await rpc_init(proto)
        assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'

    try:
        run_session(do())
    finally:
        # Don't leave the daemon running if the session or the assertion fails
        d.stop()
def do_msg(rpc):
    """Report, via qmsg(), the class name of the backend used by `rpc`."""
    backend_name = type(rpc.backend).__name__
    qmsg(' Testing backend {!r}'.format(backend_name))
class init_test:
    """
    Per-coin RPC initialization checks.

    Each coroutine is fetched from the class by coin name and called without
    an instance, which is why none of them takes a `self` parameter.
    """

    async def btc(proto, backend):
        """
        Initialize the RPC client for `proto` using `backend`, then exercise
        batch, single and gathered calls against it.
        """
        client = await rpc_init(proto, backend)
        do_msg(client)

        addrs = (
            ('bc1qvmqas4maw7lg9clqu6kqu9zq9cluvlln5hw97q', 'test address #1'), # deadbeef * 8
            ('bc1qe50rj25cldtskw5huxam335kyshtqtlrf4pt9x', 'test address #2'), # deadbeef * 7 + deadbeee
        )
        await client.batch_call('importaddress', addrs, timeout=120)

        # One single-element argument tuple per label
        label_args = [(label,) for _addr, label in addrs]
        by_label = await client.batch_call('getaddressesbylabel', label_args)

        # The first key of the first result must be the first imported address
        first_key = list(by_label[0].keys())[0]
        assert first_key == addrs[0][0]

        chain_info = await client.call('getblockchaininfo', timeout=300)
        best_hash = chain_info['bestblockhash']

        # Issue the same pair of getblock requests through both calling
        # forms of gathered_call()
        await client.gathered_call(
            'getblock',
            ((best_hash,), (best_hash, 1)),
            timeout=300)
        await client.gathered_call(
            None,
            (('getblock', (best_hash,)), ('getblock', (best_hash, 1))),
            timeout=300)

    async def bch(proto, backend):
        """Initialize the RPC client for `proto` using `backend` and report the backend."""
        client = await rpc_init(proto, backend)
        do_msg(client)

    # LTC uses the same check as BCH
    ltc = bch

    async def eth(proto, backend):
        """Initialize the RPC client, report the backend and request 'parity_versionInfo'."""
        client = await rpc_init(proto, backend)
        do_msg(client)
        await client.call('parity_versionInfo', timeout=300)
def run_test(coin,auth):
    """
    Run the RPC initialization test for `coin` with every available backend.

    Starts a fresh test-suite daemon for the coin (stopping any running one
    and removing its data directory first), runs the matching init_test
    coroutine once per entry in g.autoset_opts['rpc_backend'].choices, and
    stops the daemon again, even if one of the sessions raises.

    coin: coin symbol; must name a coroutine of init_test
    auth: when true, additionally run auth_test() after the backend tests

    Returns True; failures propagate as exceptions.
    """
    # FIXME CoinDaemon's network handling broken
    network = 'regtest' if coin == 'eth' else 'mainnet'
    proto = init_proto(coin,network=network)

    d = CoinDaemon(network_id=coin,test_suite=True)
    d.stop()
    d.remove_datadir()
    d.start()

    try:
        test_func = getattr(init_test,coin)
        for backend in g.autoset_opts['rpc_backend'].choices:
            run_session(test_func(proto,backend),backend=backend)
    finally:
        # Don't leave the daemon running if a backend test fails
        d.stop()

    if auth:
        auth_test(proto,d)

    qmsg(' OK')
    return True
class unit_tests:
    """RPC unit tests; each method runs one test and returns True on success."""

    altcoin_deps = ('ltc', 'bch', 'eth', 'xmr_wallet')

    def btc(self, name, ut):
        """Run the BTC RPC test, including the authentication check."""
        return run_test('btc', auth=True)

    def ltc(self, name, ut):
        """Run the LTC RPC test, including the authentication check."""
        return run_test('ltc', auth=True)

    def bch(self, name, ut):
        """Run the BCH RPC test, including the authentication check."""
        return run_test('bch', auth=True)

    def eth(self, name, ut):
        """Run the ETH RPC test, without the authentication check."""
        return run_test('eth', auth=False)

    def xmr_wallet(self, name, ut):
        """Start a Monero wallet daemon and request its version over RPC."""

        async def check_wallet_rpc():
            monerod = CoinDaemon('xmr', test_suite=True)
            if not opt.no_daemon_autostart:
                monerod.start()

            wallet_daemon = MoneroWalletDaemon(
                wallet_dir='test/trash',
                passwd='ut_rpc_passw0rd',
                test_suite=True,
            )
            wallet_daemon.start()

            # Connect with the parameters published by the wallet daemon
            client = MoneroWalletRPCClient(
                host=wallet_daemon.host,
                port=wallet_daemon.rpc_port,
                user=wallet_daemon.user,
                passwd=wallet_daemon.passwd,
            )
            await client.call('get_version')
            gmsg('OK')

            wallet_daemon.wait = False
            wallet_daemon.stop()

            # Leave monerod running when the user asked for that
            if opt.no_daemon_stop:
                return
            monerod.wait = False
            monerod.stop()

        run_session(check_wallet_rpc())
        return True