cmdtest.py regtest: mempool, RBF status cleanups

This commit is contained in:
The MMGen Project 2025-02-16 14:42:32 +00:00
commit 967fad0b14
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2

View file

@ -1197,77 +1197,65 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
).read().strip()
return json.loads(ret) if decode_json else ret
def get_mempool1(self):
return self._get_mempool_compare_txid(None, 'rbf_txid1')
def get_mempool2(self):
return self._get_mempool_compare_txid('rbf_txid1', 'rbf_txid2')
def _get_mempool(self, do_msg=False):
if do_msg:
self.spawn('', msg_only=True)
return self._do_mmgen_regtest(['mempool'], decode_json=True)
def get_mempool1(self):
def _get_mempool_compare_txid(self, txid1, txid2):
if not self.proto.cap('rbf'):
return 'skip'
mp = self._get_mempool(do_msg=True)
if len(mp) != 1:
die(4, 'Mempool has more or less than one TX!')
self.write_to_tmpfile('rbf_txid', mp[0]+'\n')
if txid1:
chk = self.read_from_tmpfile(txid1)
if chk.strip() == mp[0]:
die(4, 'TX in mempool has not changed! RBF bump failed')
self.write_to_tmpfile(txid2, mp[0]+'\n')
return 'ok'
def bob_rbf_status(self, fee, exp1, exp2='', exit_val=None):
def _bob_rbf_status(self, fee, exit_val=None, txid=None, confirmations=0):
if not self.proto.cap('rbf'):
return 'skip'
if txid:
txid = self.read_from_tmpfile(txid).strip()
if confirmations:
r1 = f'Replacement transaction has {confirmations} confirmation'
r2 = rf'Replacing transactions:.*{txid}'
else:
r1, r2 = ('Transaction has been replaced', f'{txid} in mempool')
elif confirmations:
r1, r2 = (f'Transaction has {confirmations} confirmation', '')
else:
r1, r2 = ('in mempool, replaceable', '')
ext = ',{}]{x}.regtest.sigtx'.format(fee[:-1], x='' if cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, delete=False, no_dot=True)
return self.user_txsend_status('bob', txfile, exp1, exp2, exit_val=exit_val)
return self.user_txsend_status('bob', txfile, r1, r2, exit_val=exit_val)
def bob_rbf_status1(self):
if not self.proto.cap('rbf'):
return 'skip'
return self.bob_rbf_status(rtFee[1], 'in mempool, replaceable')
def get_mempool2(self):
if not self.proto.cap('rbf'):
return 'skip'
mp = self._get_mempool(do_msg=True)
if len(mp) != 1:
die(4, 'Mempool has more or less than one TX!')
chk = self.read_from_tmpfile('rbf_txid')
if chk.strip() == mp[0]:
die(4, 'TX in mempool has not changed! RBF bump failed')
self.write_to_tmpfile('rbf_txid2', mp[0]+'\n')
return 'ok'
return self._bob_rbf_status(rtFee[1])
def bob_rbf_status2(self):
if not self.proto.cap('rbf'):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(
rtFee[1],
'Transaction has been replaced',
f'{new_txid} in mempool',
exit_val = 0)
return self._bob_rbf_status(rtFee[1], txid='rbf_txid2')
def bob_rbf_status3(self):
if not self.proto.cap('rbf'):
return 'skip'
return self.bob_rbf_status(rtFee[2], 'status: in mempool, replaceable')
return self._bob_rbf_status(rtFee[2])
def bob_rbf_status4(self):
if not self.proto.cap('rbf'):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 1 confirmation',
rf'Replacing transactions:\s+{new_txid}')
return self._bob_rbf_status(rtFee[1], txid='rbf_txid2', confirmations=1, exit_val=0)
def bob_rbf_status5(self):
if not self.proto.cap('rbf'):
return 'skip'
return self.bob_rbf_status(rtFee[2], 'Transaction has 1 confirmation')
return self._bob_rbf_status(rtFee[2], confirmations=1, exit_val=0)
def bob_rbf_status6(self):
if not self.proto.cap('rbf'):
return 'skip'
new_txid = self.read_from_tmpfile('rbf_txid2').strip()
return self.bob_rbf_status(rtFee[1],
'Replacement transaction has 2 confirmations',
rf'Replacing transactions:\s+{new_txid}')
return self._bob_rbf_status(rtFee[1], txid='rbf_txid2', confirmations=2, exit_val=0)
def _gen_pairs(self, n):
from mmgen.tool.api import tool_api