swap: new SwapCfg class

This commit is contained in:
The MMGen Project 2025-05-19 09:23:55 +00:00
commit fe7a2a204b
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
12 changed files with 125 additions and 38 deletions

View file

@ -72,6 +72,7 @@ class AutosignTXError(Exception): mmcode = 2
class MMGenImportError(Exception): mmcode = 2
class SwapMemoParseError(Exception): mmcode = 2
class SwapAssetError(Exception): mmcode = 2
class SwapCfgValueError(Exception): mmcode = 2
# 3: yellow hl, 'MMGen Error' + exception + message
class RPCFailure(Exception): mmcode = 3

View file

@ -22,6 +22,7 @@ class NewSwap(New, TxNewSwap):
o = self.data_output._asdict()
parsed_memo = self.swap_proto_mod.Memo.parse(o['data'].decode())
memo = self.swap_proto_mod.Memo(
self.swap_cfg,
self.recv_proto,
self.recv_asset,
self.recv_proto.coin_addr(parsed_memo.address),

View file

@ -21,6 +21,7 @@ class NewSwap(New, TxNewSwap):
def update_data_output(self, trade_limit):
parsed_memo = self.swap_proto_mod.Memo.parse(self.swap_memo)
self.swap_memo = str(self.swap_proto_mod.Memo(
self.swap_cfg,
self.recv_proto,
self.recv_asset,
self.recv_proto.coin_addr(parsed_memo.address),

46
mmgen/swap/cfg.py Normal file
View file

@ -0,0 +1,46 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.cfg: swap configuration class the MMGen Wallet suite
"""
import re
from ..amt import UniAmt
from ..util import die
class SwapCfg:
# The trade limit, i.e., set 100000000 to get a minimum of 1 full asset, else a refund
# Optional. 1e8 or scientific notation
trade_limit = None
# Swap interval for streaming swap in blocks. Optional. If 0, do not stream
stream_interval = 3
# Swap quantity for streaming swap.
# The interval value determines the frequency of swaps in blocks
# Optional. If 0, network will determine the number of swaps
stream_quantity = 0
def __init__(self, cfg):
self.cfg = cfg
if cfg.trade_limit is not None:
self.set_trade_limit(desc='parameter for --trade-limit')
def set_trade_limit(self, *, desc):
s = self.cfg.trade_limit
if re.match(r'-*[0-9]+(\.[0-9]+)*%*$', s):
self.trade_limit = 1 - float(s[:-1]) / 100 if s.endswith('%') else UniAmt(s)
else:
die('SwapCfgValueError', f'{s}: invalid {desc}')

View file

@ -12,7 +12,7 @@
swap.proto.thorchain: THORChain swap protocol implementation for the MMGen Wallet suite
"""
__all__ = ['SwapAsset', 'Memo']
__all__ = ['SwapCfg', 'SwapAsset', 'Memo']
name = 'THORChain'
exp_prec = 4
@ -26,6 +26,8 @@ def rpc_client(tx, amt):
from .thornode import Thornode
return Thornode(tx, amt)
from .cfg import THORChainSwapCfg as SwapCfg
from .asset import THORChainSwapAsset as SwapAsset
from .memo import THORChainMemo as Memo

View file

@ -0,0 +1,18 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.proto.thorchain.cfg: THORChain swap configuration class the MMGen Wallet suite
"""
from ...cfg import SwapCfg
class THORChainSwapCfg(SwapCfg):
pass

View file

@ -13,7 +13,6 @@ swap.proto.thorchain.memo: THORChain swap protocol memo class
"""
from ....util import die, is_hex_str
from ....amt import UniAmt
from . import name as proto_name
@ -21,17 +20,6 @@ from . import SwapAsset
class THORChainMemo:
# The trade limit, i.e., set 100000000 to get a minimum of 1 full asset, else a refund
# Optional. 1e8 or scientific notation
trade_limit = None
# Swap interval in blocks. Optional. If 0, do not stream
stream_interval = 3
# Swap quantity. The interval value determines the frequency of swaps in blocks
# Optional. If 0, network will determine the number of swaps
stream_quantity = 0
max_len = 250
function = 'SWAP'
@ -120,7 +108,7 @@ class THORChainMemo:
return ret(proto_name, function, chain, asset, address, limit_int, int(interval), int(quantity))
def __init__(self, proto, asset, addr, *, trade_limit=None):
def __init__(self, swap_cfg, proto, asset, addr, *, trade_limit):
from ....amt import UniAmt
from ....addr import is_coin_addr
@ -143,6 +131,7 @@ class THORChainMemo:
self.proto = proto
self.asset = asset
self.swap_cfg = swap_cfg
self.trade_limit = trade_limit
def __str__(self):
@ -155,8 +144,8 @@ class THORChainMemo:
die('SwapMemoParseError', str(e))
suf = '/'.join(str(n) for n in (
tl_enc,
self.stream_interval,
self.stream_quantity))
self.swap_cfg.stream_interval,
self.swap_cfg.stream_quantity))
ret = ':'.join([
self.function_abbrevs[self.function],
self.asset.memo_asset_name,

View file

@ -17,8 +17,6 @@ from collections import namedtuple
from ....amt import UniAmt
from . import Memo
_gd = namedtuple('gas_unit_data', ['code', 'disp'])
gas_unit_data = {
'satsperbyte': _gd('s', 'sat/byte'),
@ -64,7 +62,7 @@ class Thornode:
self.in_amt = UniAmt(f'{amt:.8f}')
self.rpc = ThornodeRPCClient(tx)
def get_quote(self):
def get_quote(self, swap_cfg):
def get_data(send, recv, amt):
get_str = (
@ -72,7 +70,7 @@ class Thornode:
f'from_asset={send}&'
f'to_asset={recv}&'
f'amount={amt}&'
f'streaming_interval={Memo.stream_interval}')
f'streaming_interval={swap_cfg.stream_interval}')
data = json.loads(self.rpc.get(get_str).content)
if not 'expiry' in data:
from ....util import pp_fmt, die

View file

@ -87,7 +87,7 @@ class Bump(Completed, NewSwap):
if self.is_swap:
self.recv_proto = self.check_swap_memo().proto
self.init_swap_cfg()
self.swap_cfg = self.swap_proto_mod.SwapCfg(self.cfg)
fee_hint = await self.update_vault_output(self.send_amt)
else:
fee_hint = None

View file

@ -453,7 +453,6 @@ class New(Base):
cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
if self.is_swap:
cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
self.init_swap_cfg()
from ..rpc import rpc_init
self.rpc = await rpc_init(self.cfg, self.proto)
from ..addrdata import TwAddrData

View file

@ -156,10 +156,15 @@ class NewSwap(New):
'To sign this transaction, autosign or txsign must be invoked'
' with --allow-non-wallet-swap'))
sc = self.swap_cfg = self.swap_proto_mod.SwapCfg(self.cfg)
memo = sp.Memo(
self.swap_cfg,
self.recv_proto,
self.recv_asset,
recv_output.addr)
recv_output.addr,
# sc.trade_limit could be a float:
trade_limit = sc.trade_limit if isinstance(sc.trade_limit, UniAmt) else None)
# this goes into the transaction file:
self.swap_recv_addr_mmid = recv_output.mmid
@ -169,14 +174,6 @@ class NewSwap(New):
[f'vault,{args.send_amt}', chg_output.mmid, f'data:{memo}'] if args.send_amt else
['vault', f'data:{memo}'])
def init_swap_cfg(self):
if s := self.cfg.trade_limit:
self.usr_trade_limit = (
1 - float(s[:-1]) / 100 if s.endswith('%') else
UniAmt(self.cfg.trade_limit))
else:
self.usr_trade_limit = None
def update_vault_addr(self, c, *, addr='inbound_address'):
vault_idx = self.vault_idx
assert vault_idx == 0, f'{vault_idx}: vault index is not zero!'
@ -192,16 +189,16 @@ class NewSwap(New):
from ..term import get_char
def get_trade_limit():
if type(self.usr_trade_limit) is UniAmt:
return self.usr_trade_limit
elif type(self.usr_trade_limit) is float:
if type(self.swap_cfg.trade_limit) is UniAmt:
return self.swap_cfg.trade_limit
elif type(self.swap_cfg.trade_limit) is float:
return (
UniAmt(int(c.data['expected_amount_out']), from_unit='satoshi')
* self.usr_trade_limit)
* self.swap_cfg.trade_limit)
while True:
self.cfg._util.qmsg(f'Retrieving data from {c.rpc.host}...')
c.get_quote()
c.get_quote(self.swap_cfg)
self.cfg._util.qmsg('OK')
self.swap_quote_refresh_time = time.time()
await self.set_gas(to_addr=c.router if self.is_token else None)

View file

@ -7,13 +7,46 @@ test.modtest_d.swap: swap unit tests for the MMGen suite
from mmgen.color import cyan
from mmgen.cfg import Config
from mmgen.amt import UniAmt
from mmgen.swap.proto.thorchain import SwapAsset, Memo
from mmgen.swap.proto.thorchain import SwapCfg, SwapAsset, Memo
from mmgen.protocol import init_proto
from ..include.common import cfg, vmsg, make_burn_addr
class unit_tests:
def cfg(self, name, ut, desc='Swap configuration'):
for tl_arg, tl_chk in (
(None, None),
('1', UniAmt('1')),
('33', UniAmt('33')),
('2%', 0.98),
('-2%', 1.02),
('3.333%', 0.96667),
('-3.333%', 1.03333),
('1.2345', UniAmt('1.2345'))):
cfg_data = {'trade_limit': tl_arg}
sc = SwapCfg(Config(cfg_data))
vmsg(f' trade_limit: {tl_arg} => {sc.trade_limit}')
assert sc.trade_limit == tl_chk
assert sc.stream_interval == 3
assert sc.stream_quantity == 0
vmsg('\n Testing error handling')
def bad1():
SwapCfg(Config({'trade_limit': 'x'}))
def bad2():
SwapCfg(Config({'trade_limit': '1.23x'}))
ut.process_bad_data((
('bad1', 'SwapCfgValueError', 'invalid parameter', bad1),
('bad2', 'SwapCfgValueError', 'invalid parameter', bad2),
), pfx='')
return True
def asset(self, name, ut, desc='SwapAsset class'):
for name, full_name, memo_name, chain, asset, direction in (
('BTC', 'BTC.BTC', 'b', 'BTC', None, 'recv'),
@ -53,7 +86,9 @@ class unit_tests:
(None, 0, '0/3/0'),
):
vmsg('\nTesting memo initialization:')
swap_cfg = SwapCfg(Config({'trade_limit': limit}))
m = Memo(
swap_cfg,
proto,
asset,
addr,
@ -116,7 +151,7 @@ class unit_tests:
proto = init_proto(cfg, coin, need_amt=True)
addr = make_burn_addr(proto, 'C')
asset = SwapAsset(coin, 'send')
Memo(proto, asset, addr, trade_limit=None)
Memo(swap_cfg, proto, asset, addr, trade_limit=None)
def bad11():
SwapAsset('XYZ', 'send')