test suite: ThornodeRPCServer: refactor

This commit is contained in:
The MMGen Project 2025-09-23 09:20:54 +00:00
commit 954ff01b41
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2

View file

@ -22,55 +22,70 @@ class ThornodeRPCServer(ThornodeServer):
name = 'thornode RPC server'
def make_response_body(self, method, environ):
req_str = request_uri(environ)
if method == 'POST':
length = int(environ.get('CONTENT_LENGTH', '0'))
if re.search(r'/bank/balances/(\S+)', req_str):
res = [
{'denom': 'foocoin', 'amount': 321321321321},
{'denom': 'rune', 'amount': 987654321321},
{'denom': 'barcoin', 'amount': 123123123123}]
elif m := re.search(r'/auth/accounts/(\S+)', req_str):
res = {
'value': {
'address': m[1],
'pub_key': 'PubKeySecp256k1{0000}',
'account_number': '1234',
'sequence': '333444'}}
elif m := re.search(r'/tx$', req_str):
assert method == 'POST'
txid = environ['wsgi.input'].read(length).decode().removeprefix('hash=0x').upper()
res = {
'hash': txid,
'height': '21298600',
'index': 2,
'tx_result': {
'gas_used': '173222',
class responses:
def get_balance(m, length):
return [
{'denom': 'foocoin', 'amount': 321321321321},
{'denom': 'rune', 'amount': 987654321321},
{'denom': 'barcoin', 'amount': 123123123123}]
def get_account_info(m, length):
return {
'value': {
'address': m[1],
'pub_key': 'PubKeySecp256k1{0000}',
'account_number': '1234',
'sequence': '333444'}}
def get_tx_info(m, length):
txid = environ['wsgi.input'].read(length).decode().removeprefix('hash=0x').upper()
return {
'hash': txid,
'height': '21298600',
'index': 2,
'tx_result': {
'gas_used': '173222',
'events': [],
'codespace': ''
},
'tx': 'MHgwMGZvb2Jhcg=='}
def check_tx(m, length):
return {
'code': 0,
'data': '',
'log': '',
'info': '',
'gas_wanted': '-1',
'gas_used': '53774',
'events': [],
'codespace': ''
},
'tx': 'MHgwMGZvb2Jhcg=='}
elif m := re.search(r'/check_tx$', req_str):
assert method == 'POST'
res = {
'code': 0,
'data': '',
'log': '',
'info': '',
'gas_wanted': '-1',
'gas_used': '53774',
'events': [],
'codespace': ''}
elif m := re.search(r'/broadcast_tx_sync$', req_str):
assert method == 'POST'
txhex = environ['wsgi.input'].read(length).decode().removeprefix('tx=0x').upper()
res = {'code': 0, 'codespace': '', 'data': '', 'log': ''}
if txhex.startswith('0A540A52'):
res.update({'hash': '14463C716CF08A814868DB779156BCD85A1DF8EE49E924900A74482E9DEE132D'})
elif txhex.startswith('0AC1010A'):
res.update({'hash': '17F9411E48542C0DCA4D40A0DD4A1795DE6D5791A873A27CBBDC1031FE8D1BC5'})
else:
raise ValueError(f'{req_str}’: malformed query path')
'codespace': ''}
return json.dumps({'result': res}).encode()
def broadcast_tx_sync(m, length):
txhex = environ['wsgi.input'].read(length).decode().removeprefix('tx=0x').upper()
res = {'code': 0, 'codespace': '', 'data': '', 'log': ''}
if txhex.startswith('0A540A52'):
res.update({'hash': '14463C716CF08A814868DB779156BCD85A1DF8EE49E924900A74482E9DEE132D'})
elif txhex.startswith('0AC1010A'):
res.update({'hash': '17F9411E48542C0DCA4D40A0DD4A1795DE6D5791A873A27CBBDC1031FE8D1BC5'})
return res
pat_info = (
('get_balance', 'GET', r'/bank/balances/(\S+)'),
('get_account_info', 'GET', r'/auth/accounts/(\S+)'),
('get_tx_info', 'POST', r'/tx$'),
('check_tx', 'POST', r'/check_tx$'),
('broadcast_tx_sync', 'POST', r'/broadcast_tx_sync$'))
req_str = request_uri(environ)
for name, method_chk, pat in pat_info:
if m := re.search(pat, req_str):
assert method == method_chk
length = int(environ.get('CONTENT_LENGTH', '0')) if method == 'POST' else None
res = getattr(responses, name)(m, length)
return json.dumps({'result': res}).encode()
raise ValueError(f'{req_str}’: malformed query path')