Browse Source

aiohttp: initialize session in backend

The MMGen Project 2 years ago
parent
commit
24a612fca7

+ 2 - 4
mmgen/globalvars.py

@@ -40,7 +40,7 @@ class GlobalContext(Lockable):
 	  3 - command line
 	  3 - command line
 	"""
 	"""
 	_autolock = False
 	_autolock = False
-	_set_ok = ('session',)
+	_set_ok = ()
 	_reset_ok = ('stdout','stderr','accept_defaults')
 	_reset_ok = ('stdout','stderr','accept_defaults')
 	_use_class_attr = True
 	_use_class_attr = True
 
 
@@ -99,7 +99,6 @@ class GlobalContext(Lockable):
 	monero_wallet_rpc_user = 'monero'
 	monero_wallet_rpc_user = 'monero'
 	monero_wallet_rpc_password = ''
 	monero_wallet_rpc_password = ''
 	aiohttp_rpc_queue_len = 16
 	aiohttp_rpc_queue_len = 16
-	session              = None
 	cached_balances      = False
 	cached_balances      = False
 
 
 	# regtest:
 	# regtest:
@@ -181,7 +180,6 @@ class GlobalContext(Lockable):
 		'ignore_daemon_version',
 		'ignore_daemon_version',
 		'no_license',
 		'no_license',
 		'regtest',
 		'regtest',
-		'rpc_backend',
 		'rpc_host',
 		'rpc_host',
 		'rpc_password',
 		'rpc_password',
 		'rpc_port',
 		'rpc_port',
@@ -313,7 +311,7 @@ class GlobalContext(Lockable):
 			stdin_tty = True
 			stdin_tty = True
 		if prog_name == 'unit_tests.py':
 		if prog_name == 'unit_tests.py':
 			_set_ok += ('debug_subseed',)
 			_set_ok += ('debug_subseed',)
-			_reset_ok += ('force_standalone_scrypt_module','session')
+			_reset_ok += ('force_standalone_scrypt_module',)
 
 
 	if os.getenv('MMGEN_DEBUG_ALL'):
 	if os.getenv('MMGEN_DEBUG_ALL'):
 		for name in env_opts:
 		for name in env_opts:

+ 1 - 1
mmgen/proto/btc/rpc.py

@@ -90,7 +90,7 @@ class BitcoinRPCClient(RPCClient,metaclass=AsyncInit):
 			port = daemon.rpc_port )
 			port = daemon.rpc_port )
 
 
 		self.set_auth() # set_auth() requires cookie, so must be called after __init__() tests daemon is listening
 		self.set_auth() # set_auth() requires cookie, so must be called after __init__() tests daemon is listening
-		self.set_backend(backend) # backend requires self.auth
+		await self.set_backend_async(backend) # backend requires self.auth
 
 
 		self.cached = {}
 		self.cached = {}
 
 

+ 1 - 1
mmgen/proto/eth/rpc.py

@@ -44,7 +44,7 @@ class EthereumRPCClient(RPCClient,metaclass=AsyncInit):
 			host = 'localhost' if g.test_suite else (g.rpc_host or 'localhost'),
 			host = 'localhost' if g.test_suite else (g.rpc_host or 'localhost'),
 			port = daemon.rpc_port )
 			port = daemon.rpc_port )
 
 
-		self.set_backend(backend)
+		await self.set_backend_async(backend)
 
 
 		vi,bh,ci = await self.gathered_call(None, (
 		vi,bh,ci = await self.gathered_call(None, (
 				('web3_clientVersion',()),
 				('web3_clientVersion',()),

+ 27 - 9
mmgen/rpc.py

@@ -25,6 +25,7 @@ from decimal import Decimal
 from collections import namedtuple
 from collections import namedtuple
 
 
 from .common import *
 from .common import *
+from .base_obj import AsyncInit
 from .objmethods import Hilite,InitErrors
 from .objmethods import Hilite,InitErrors
 
 
 auth_data = namedtuple('rpc_auth_data',['user','passwd'])
 auth_data = namedtuple('rpc_auth_data',['user','passwd'])
@@ -114,18 +115,28 @@ class RPCBackends:
 			self.http_hdrs      = caller.http_hdrs
 			self.http_hdrs      = caller.http_hdrs
 			self.name           = type(self).__name__
 			self.name           = type(self).__name__
 
 
-	class aiohttp(base):
+	class aiohttp(base,metaclass=AsyncInit):
 		"""
 		"""
 		Contrary to the requests library, aiohttp won’t read environment variables by
 		Contrary to the requests library, aiohttp won’t read environment variables by
 		default.  But you can do so by passing trust_env=True into aiohttp.ClientSession
 		default.  But you can do so by passing trust_env=True into aiohttp.ClientSession
 		constructor to honor HTTP_PROXY, HTTPS_PROXY, WS_PROXY or WSS_PROXY environment
 		constructor to honor HTTP_PROXY, HTTPS_PROXY, WS_PROXY or WSS_PROXY environment
 		variables (all are case insensitive).
 		variables (all are case insensitive).
 		"""
 		"""
-		def __init__(self,caller):
+
+		def __del__(self):
+			self.connector.close()
+			self.session.detach()
+			del self.session
+
+		async def __init__(self,caller):
 			super().__init__(caller)
 			super().__init__(caller)
-			self.session = g.session
+			import aiohttp
+			self.connector = aiohttp.TCPConnector(limit_per_host=g.aiohttp_rpc_queue_len)
+			self.session = aiohttp.ClientSession(
+				headers = { 'Content-Type': 'application/json' },
+				connector = self.connector,
+			)
 			if caller.auth_type == 'basic':
 			if caller.auth_type == 'basic':
-				import aiohttp
 				self.auth = aiohttp.BasicAuth(*caller.auth,encoding='UTF-8')
 				self.auth = aiohttp.BasicAuth(*caller.auth,encoding='UTF-8')
 			else:
 			else:
 				self.auth = None
 				self.auth = None
@@ -293,12 +304,19 @@ class RPCClient(MMGenObject):
 		self.timeout = g.http_timeout
 		self.timeout = g.http_timeout
 		self.auth = None
 		self.auth = None
 
 
-	def set_backend(self,backend=None):
-		bn = backend or opt.rpc_backend
-		if bn == 'auto':
-			self.backend = {'linux':RPCBackends.httplib,'win':RPCBackends.requests}[g.platform](self)
+	def _get_backend(self,backend):
+		backend_id = backend or opt.rpc_backend
+		if backend_id == 'auto':
+			return {'linux':RPCBackends.httplib,'win':RPCBackends.requests}[g.platform](self)
 		else:
 		else:
-			self.backend = getattr(RPCBackends,bn)(self)
+			return getattr(RPCBackends,backend_id)(self)
+
+	def set_backend(self,backend=None):
+		self.backend = self._get_backend(backend)
+
+	async def set_backend_async(self,backend=None):
+		ret = self._get_backend(backend)
+		self.backend = (await ret) if type(ret).__name__ == 'coroutine' else ret
 
 
 	def set_auth(self):
 	def set_auth(self):
 		"""
 		"""

+ 2 - 14
mmgen/util.py

@@ -383,21 +383,9 @@ def get_subclasses(cls,names=False):
 				yield j
 				yield j
 	return tuple((c.__name__ for c in gen(cls)) if names else gen(cls))
 	return tuple((c.__name__ for c in gen(cls)) if names else gen(cls))
 
 
-def run_session(callback,backend=None):
-
-	async def do():
-		if (backend or opt.rpc_backend) == 'aiohttp':
-			import aiohttp
-			async with aiohttp.ClientSession(
-				headers = { 'Content-Type': 'application/json' },
-				connector = aiohttp.TCPConnector(limit_per_host=g.aiohttp_rpc_queue_len),
-			) as g.session:
-				return await callback
-		else:
-			return await callback
-
+def run_session(coro):
 	import asyncio
 	import asyncio
-	return asyncio.run(do())
+	return asyncio.run(coro)
 
 
 def wrap_ripemd160(called=[]):
 def wrap_ripemd160(called=[]):
 	if not called:
 	if not called:

+ 3 - 5
test/test_py_d/ts_ethdev.py

@@ -127,7 +127,7 @@ coin = g.coin
 class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
 class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
 	'Ethereum transacting, token deployment and tracking wallet operations'
 	'Ethereum transacting, token deployment and tracking wallet operations'
 	networks = ('eth','etc')
 	networks = ('eth','etc')
-	passthru_opts = ('coin','daemon_id','http_timeout')
+	passthru_opts = ('coin','daemon_id','http_timeout','rpc_backend')
 	extra_spawn_args = ['--regtest=1']
 	extra_spawn_args = ['--regtest=1']
 	tmpdir_nums = [22]
 	tmpdir_nums = [22]
 	color = True
 	color = True
@@ -407,10 +407,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
 
 
 	@property
 	@property
 	async def rpc(self):
 	async def rpc(self):
-		if not hasattr(self,'_rpc'):
-			from mmgen.rpc import rpc_init
-			self._rpc = await rpc_init(self.proto)
-		return self._rpc
+		from mmgen.rpc import rpc_init
+		return await rpc_init(self.proto)
 
 
 	async def setup(self):
 	async def setup(self):
 		self.spawn('',msg_only=True)
 		self.spawn('',msg_only=True)

+ 1 - 1
test/test_py_d/ts_main.py

@@ -69,7 +69,7 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
 	'basic operations with emulated tracking wallet'
 	'basic operations with emulated tracking wallet'
 	tmpdir_nums = [1,2,3,4,5,14,15,16,20,21]
 	tmpdir_nums = [1,2,3,4,5,14,15,16,20,21]
 	networks = ('btc','btc_tn','ltc','ltc_tn','bch','bch_tn')
 	networks = ('btc','btc_tn','ltc','ltc_tn','bch','bch_tn')
-	passthru_opts = ('daemon_data_dir','rpc_port','coin','testnet')
+	passthru_opts = ('daemon_data_dir','rpc_port','coin','testnet','rpc_backend')
 	segwit_opts_ok = True
 	segwit_opts_ok = True
 	color = True
 	color = True
 	need_daemon = True
 	need_daemon = True

+ 1 - 1
test/test_py_d/ts_regtest.py

@@ -145,7 +145,7 @@ from .ts_shared import *
 class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
 class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
 	'transacting and tracking wallet operations via regtest mode'
 	'transacting and tracking wallet operations via regtest mode'
 	networks = ('btc','ltc','bch')
 	networks = ('btc','ltc','bch')
-	passthru_opts = ('coin',)
+	passthru_opts = ('coin','rpc_backend')
 	extra_spawn_args = ['--regtest=1']
 	extra_spawn_args = ['--regtest=1']
 	tmpdir_nums = [17]
 	tmpdir_nums = [17]
 	color = True
 	color = True

+ 1 - 1
test/unit_tests_d/ut_rpc.py

@@ -96,7 +96,7 @@ def run_test(network_ids,test_cf_auth=False,daemon_ids=None):
 
 
 		for n,backend in enumerate(g.autoset_opts['rpc_backend'].choices):
 		for n,backend in enumerate(g.autoset_opts['rpc_backend'].choices):
 			test = getattr(init_test,d.proto.coin.lower())
 			test = getattr(init_test,d.proto.coin.lower())
-			rpc = run_session(test(d.proto,backend,d),backend=backend)
+			rpc = run_session(test(d.proto,backend,d))
 			if not n and opt.verbose:
 			if not n and opt.verbose:
 				print_daemon_info(rpc)
 				print_daemon_info(rpc)