|
@@ -3,6 +3,8 @@
|
|
test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
|
|
test.unit_tests_d.ut_rpc: RPC unit test for the MMGen suite
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
+import time
|
|
|
|
+
|
|
from mmgen.common import *
|
|
from mmgen.common import *
|
|
|
|
|
|
from mmgen.protocol import init_proto
|
|
from mmgen.protocol import init_proto
|
|
@@ -11,23 +13,32 @@ from mmgen.daemon import CoinDaemon
|
|
from mmgen.proto.xmr.rpc import MoneroRPCClient,MoneroRPCClientRaw,MoneroWalletRPCClient
|
|
from mmgen.proto.xmr.rpc import MoneroRPCClient,MoneroRPCClientRaw,MoneroWalletRPCClient
|
|
from mmgen.proto.xmr.daemon import MoneroWalletDaemon
|
|
from mmgen.proto.xmr.daemon import MoneroWalletDaemon
|
|
|
|
|
|
-def cfg_file_auth_test(proto,d):
|
|
|
|
- qmsg(cyan(f'\n Testing authentication with credentials from {d.cfg_file}:'))
|
|
|
|
|
|
+def cfg_file_auth_test(proto,d,bad_auth=False):
|
|
|
|
+ m = 'missing credentials' if bad_auth else f'credentials from {d.cfg_file}'
|
|
|
|
+ qmsg(cyan(f'\n Testing authentication with {m}:'))
|
|
|
|
+ time.sleep(0.1) # race condition
|
|
d.remove_datadir() # removes cookie file to force authentication from cfg file
|
|
d.remove_datadir() # removes cookie file to force authentication from cfg file
|
|
os.makedirs(d.network_datadir)
|
|
os.makedirs(d.network_datadir)
|
|
|
|
|
|
- cf = os.path.join(d.datadir,d.cfg_file)
|
|
|
|
- with open(cf,'a') as fp:
|
|
|
|
- fp.write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
|
|
|
|
|
|
+ if not bad_auth:
|
|
|
|
+ cf = os.path.join(d.datadir,d.cfg_file)
|
|
|
|
+ with open(cf,'a') as fp:
|
|
|
|
+ fp.write('\nrpcuser = ut_rpc\nrpcpassword = ut_rpc_passw0rd\n')
|
|
|
|
+ d.flag.keep_cfg_file = True
|
|
|
|
|
|
- d.flag.keep_cfg_file = True
|
|
|
|
d.start()
|
|
d.start()
|
|
|
|
|
|
- async def do():
|
|
|
|
- rpc = await rpc_init(proto)
|
|
|
|
|
|
+ if bad_auth:
|
|
|
|
+ os.rename(d.auth_cookie_fn,d.auth_cookie_fn+'.bak')
|
|
|
|
+ try: async_run(rpc_init(proto))
|
|
|
|
+ except Exception as e:
|
|
|
|
+ vmsg(yellow(str(e)))
|
|
|
|
+ else: die(3,'No error on missing credentials!')
|
|
|
|
+ os.rename(d.auth_cookie_fn+'.bak',d.auth_cookie_fn)
|
|
|
|
+ else:
|
|
|
|
+ rpc = async_run(rpc_init(proto))
|
|
assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'
|
|
assert rpc.auth.user == 'ut_rpc', f'{rpc.auth.user}: user is not ut_rpc!'
|
|
|
|
|
|
- async_run(do())
|
|
|
|
d.stop()
|
|
d.stop()
|
|
|
|
|
|
def print_daemon_info(rpc):
|
|
def print_daemon_info(rpc):
|
|
@@ -105,6 +116,7 @@ def run_test(network_ids,test_cf_auth=False,daemon_ids=None):
|
|
|
|
|
|
if test_cf_auth and g.platform != 'win':
|
|
if test_cf_auth and g.platform != 'win':
|
|
cfg_file_auth_test(d.proto,d)
|
|
cfg_file_auth_test(d.proto,d)
|
|
|
|
+ cfg_file_auth_test(d.proto,d,bad_auth=True)
|
|
|
|
|
|
qmsg('')
|
|
qmsg('')
|
|
|
|
|