swap: new SwapAsset class

This commit is contained in:
The MMGen Project 2025-04-21 14:01:15 +00:00
commit f33957d1bb
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
15 changed files with 205 additions and 71 deletions

View file

@ -71,6 +71,7 @@ class MoneroMMGenTXFileParseError(Exception): mmcode = 2
class AutosignTXError(Exception): mmcode = 2
class MMGenImportError(Exception): mmcode = 2
class SwapMemoParseError(Exception): mmcode = 2
class SwapAssetError(Exception): mmcode = 2
# 3: yellow hl, 'MMGen Error' + exception + message
class RPCFailure(Exception): mmcode = 3

View file

@ -25,6 +25,7 @@ class NewSwap(New, TxNewSwap):
parsed_memo = sp.Memo.parse(o['data'].decode())
memo = sp.Memo(
self.recv_proto,
self.recv_asset,
self.recv_proto.coin_addr(parsed_memo.address),
trade_limit = trade_limit)
o['data'] = f'data:{memo}'

View file

@ -25,6 +25,7 @@ class NewSwap(New, TxNewSwap):
parsed_memo = sp.Memo.parse(data.decode())
memo = sp.Memo(
self.recv_proto,
self.recv_asset,
self.recv_proto.coin_addr(parsed_memo.address),
trade_limit = trade_limit)
self.usr_contract_data = str(memo).encode()

66
mmgen/swap/asset.py Normal file
View file

@ -0,0 +1,66 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.asset: swap asset class the MMGen Wallet suite
"""
from collections import namedtuple
from ..util import die
class SwapAsset:
_ad = namedtuple('swap_asset_data', ['desc', 'name', 'full_name', 'abbr'])
assets_data = {}
send = ()
recv = ()
evm_chains = ()
@classmethod
def get_full_name(self, s):
for d in self.assets_data.values():
if s in (d.abbr, d.full_name):
return d.full_name or f'{d.name}.{d.name}'
die('SwapAssetError', f'{s!r}: unrecognized asset name or abbreviation')
@property
def chain(self):
return self.data.full_name.split('.', 1)[0] if self.data.full_name else self.name
@property
def asset(self):
return self.data.full_name.split('.', 1)[1] if self.data.full_name else None
@property
def full_name(self):
return self.data.full_name or f'{self.data.name}.{self.data.name}'
@property
def memo_asset_name(self):
return self.data.abbr or self.data.full_name
def __init__(self, name, direction):
if name not in self.assets_data:
die('SwapAssetError', f'{name!r}: unrecognized asset')
assert direction in ('send', 'recv'), 'direction must be ‘send’ or ‘recv’'
if direction == 'send' and name not in self.send:
die('SwapAssetError', f'{name!r} unsupported send asset')
if direction == 'recv' and name not in self.recv:
die('SwapAssetError', f'{name!r} unsupported receive asset')
self.direction = direction
self.name = name
self.data = self.assets_data[name]
self.desc = self.data.desc

View file

@ -12,34 +12,20 @@
swap.proto.thorchain: THORChain swap protocol implementation for the MMGen Wallet suite
"""
__all__ = ['Memo']
__all__ = ['SwapAsset', 'Memo']
name = 'THORChain'
class params:
exp_prec = 4
coins = {
'send': {
'BTC': 'Bitcoin',
'LTC': 'Litecoin',
'BCH': 'Bitcoin Cash',
'ETH': 'Ethereum',
},
'receive': {
'BTC': 'Bitcoin',
'LTC': 'Litecoin',
'BCH': 'Bitcoin Cash',
'ETH': 'Ethereum',
}
}
exp_prec = 4
from ....util2 import ExpInt
class ExpInt4(ExpInt):
def __new__(cls, spec):
return ExpInt.__new__(cls, spec, prec=params.exp_prec)
return ExpInt.__new__(cls, spec, prec=exp_prec)
def rpc_client(tx, amt):
from .thornode import Thornode
return Thornode(tx, amt)
from .asset import THORChainSwapAsset as SwapAsset
from .memo import THORChainMemo as Memo

View file

@ -0,0 +1,31 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.asset: THORChain swap asset class the MMGen Wallet suite
"""
from ...asset import SwapAsset
class THORChainSwapAsset(SwapAsset):
_ad = SwapAsset._ad
assets_data = {
'BTC': _ad('Bitcoin', 'BTC', None, 'b'),
'LTC': _ad('Litecoin', 'LTC', None, 'l'),
'BCH': _ad('Bitcoin Cash', 'BCH', None, 'c'),
'ETH': _ad('Ethereum', 'ETH', None, 'e'),
'DOGE': _ad('Dogecoin', 'DOGE', None, 'd'),
'RUNE': _ad('THORChain Rune', 'RUNE', 'THOR.RUNE', 'r'),
}
send = ('BTC', 'LTC', 'BCH', 'ETH')
recv = ('BTC', 'LTC', 'BCH', 'ETH')
evm_chains = ('ETH', 'AVAX', 'BSC', 'BASE')

View file

@ -17,6 +17,8 @@ from ....amt import UniAmt
from . import name as proto_name
from . import SwapAsset
class THORChainMemo:
# The trade limit, i.e., set 100000000 to get a minimum of 1 full asset, else a refund
@ -33,17 +35,6 @@ class THORChainMemo:
max_len = 250
function = 'SWAP'
asset_abbrevs = {
'BTC.BTC': 'b',
'LTC.LTC': 'l',
'BCH.BCH': 'c',
'ETH.ETH': 'e',
'DOGE.DOGE': 'd',
'THOR.RUNE': 'r',
}
evm_chains = ('ETH', 'AVAX', 'BSC', 'BASE')
function_abbrevs = {
'SWAP': '=',
}
@ -93,11 +84,11 @@ class THORChainMemo:
function = get_id(cls.function_abbrevs, get_item('function'), 'function')
chain, asset = get_id(cls.asset_abbrevs, get_item('asset'), 'asset').split('.')
chain, asset = SwapAsset.get_full_name(get_item('asset')).split('.')
address = get_item('address')
if chain in cls.evm_chains:
if chain in SwapAsset.evm_chains:
assert address.startswith('0x'), f'{address}: address does not start with ‘0x’'
assert len(address) == 42, f'{address}: address has incorrect length ({len(address)} != 42)'
address = address.removeprefix('0x')
@ -129,9 +120,13 @@ class THORChainMemo:
return ret(proto_name, function, chain, asset, address, limit_int, int(interval), int(quantity))
def __init__(self, proto, addr, *, chain=None, trade_limit=None):
def __init__(self, proto, asset, addr, *, trade_limit=None):
self.proto = proto
self.chain = chain or proto.coin
self.asset = asset
assert asset.chain == proto.coin, f'{asset.chain} != {proto.coin}'
assert asset.asset == getattr(proto, 'tokensym', None), (
f'{asset.asset} != {getattr(proto, "tokensym", None)}')
assert asset.direction == 'recv', f'{asset.direction} != ‘recv’'
if trade_limit is None:
self.trade_limit = UniAmt('0')
else:
@ -142,7 +137,7 @@ class THORChainMemo:
self.addr = addr.views[addr.view_pref]
assert not ':' in self.addr # colon is record separator, so address mustn’t contain one
if self.chain in self.evm_chains:
if asset.chain in SwapAsset.evm_chains:
assert len(self.addr) == 40, f'{self.addr}: address has incorrect length ({len(self.addr)} != 40)'
assert is_hex_str(self.addr), f'{self.addr}: address is not a hexadecimal string'
self.addr = '0x' + self.addr
@ -154,10 +149,9 @@ class THORChainMemo:
except Exception as e:
die('SwapMemoParseError', str(e))
suf = '/'.join(str(n) for n in (tl_enc, self.stream_interval, self.stream_quantity))
asset = f'{self.chain}.{self.proto.coin}'
ret = ':'.join([
self.function_abbrevs[self.function],
self.asset_abbrevs[asset],
self.asset.memo_asset_name,
self.addr,
suf])
assert len(ret) <= self.max_len, f'{proto_name} memo exceeds maximum length of {self.max_len}'

View file

@ -60,9 +60,9 @@ class Thornode:
self.rpc = ThornodeRPCClient(tx)
def get_quote(self):
self.get_str = '/thorchain/quote/swap?from_asset={a}.{a}&to_asset={b}.{b}&amount={c}'.format(
a = self.tx.proto.coin,
b = self.tx.recv_proto.coin,
self.get_str = '/thorchain/quote/swap?from_asset={a}&to_asset={b}&amount={c}'.format(
a = self.tx.send_asset.full_name,
b = self.tx.recv_asset.full_name,
c = self.in_amt.to_unit('satoshi'))
self.result = self.rpc.get(self.get_str)
self.data = json.loads(self.result.content)
@ -78,8 +78,8 @@ class Thornode:
d = self.data
tx = self.tx
in_coin = tx.proto.coin
out_coin = tx.recv_proto.coin
in_coin = tx.send_asset.chain
out_coin = tx.recv_asset.chain
in_amt = self.in_amt
out_amt = UniAmt(int(d['expected_amount_out']), from_unit='satoshi')
gas_unit = d['gas_rate_units']

View file

@ -84,6 +84,7 @@ class Base(MMGenObject):
swap_proto = None
swap_quote_expiry = None
swap_recv_addr_mmid = None
swap_recv_asset_spec = None
file_format = 'json'
non_mmgen_inputs_msg = f"""
This transaction includes inputs with non-{gc.proj_name} addresses. When

View file

@ -12,7 +12,7 @@
tx.bump: transaction bump class
"""
from .new_swap import NewSwap
from .new_swap import NewSwap, get_swap_proto_mod
from .completed import Completed
from ..util import msg, ymsg, is_int, die
from ..color import pink
@ -26,7 +26,8 @@ class Bump(Completed, NewSwap):
'is_swap',
'swap_proto',
'swap_quote_expiry',
'swap_recv_addr_mmid')
'swap_recv_addr_mmid',
'swap_recv_asset_spec')
def __init__(self, *, check_sent, new_outputs, **kwargs):
@ -35,11 +36,16 @@ class Bump(Completed, NewSwap):
self.new_outputs = new_outputs
self.orig_rel_fee = self.get_orig_rel_fee()
if new_outputs:
from .base import Base
if self.is_swap:
if self.is_swap:
if new_outputs:
from .base import Base
for attr in self.swap_attrs:
setattr(self, attr, getattr(Base, attr))
else:
sp = get_swap_proto_mod(self.swap_proto)
self.recv_asset = sp.SwapAsset(self.swap_recv_asset_spec, 'recv')
if new_outputs:
self.outputs = self.OutputList(self)
self.cfg = kwargs['cfg'] # must use current cfg opts, not those from orig_tx

View file

@ -73,7 +73,8 @@ class MMGenTxFile(MMGenObject):
'is_swap': None,
'swap_proto': None,
'swap_quote_expiry': None,
'swap_recv_addr_mmid': None}
'swap_recv_addr_mmid': None,
'swap_recv_asset_spec': None}
def __init__(self, tx):
self.tx = tx

View file

@ -13,7 +13,6 @@ tx.new_swap: new swap transaction class
"""
from collections import namedtuple
from ..cfg import gc
from .new import New
from ..amt import UniAmt
@ -22,18 +21,25 @@ def get_swap_proto_mod(swap_proto_name):
import importlib
return importlib.import_module(f'mmgen.swap.proto.{swap_proto_name}')
def init_proto_from_coin(cfg, sp, coin, desc):
if coin not in sp.params.coins[desc]:
raise ValueError(f'{coin!r}: unsupported {desc} coin for {gc.proj_name} {sp.name} swap')
def init_swap_proto(cfg, asset):
from ..protocol import init_proto
return init_proto(cfg, coin, network=cfg._proto.network, need_amt=True)
return init_proto(
cfg,
asset.chain,
network = cfg._proto.network,
tokensym = asset.asset,
need_amt = True)
def get_send_proto(cfg):
try:
arg = cfg._args.pop(0)
except:
cfg._usage()
return init_proto_from_coin(cfg, get_swap_proto_mod(cfg.swap_proto), arg, 'send')
global send_asset
send_asset = get_swap_proto_mod(cfg.swap_proto).SwapAsset(arg, 'send')
return init_swap_proto(cfg, send_asset)
class NewSwap(New):
desc = 'swap transaction'
@ -98,12 +104,14 @@ class NewSwap(New):
# arg 3: chg_spec (change address spec)
if args.send_amt and not self.proto.is_evm:
if not arg in sp.params.coins['receive']: # is change arg
if not arg in sp.SwapAsset.recv: # is change arg
args.chg_spec = arg
arg = get_arg()
# arg 4: recv_coin
self.recv_proto = init_proto_from_coin(self.cfg, sp, arg, 'receive')
self.swap_recv_asset_spec = arg # this goes into the transaction file
self.recv_asset = sp.SwapAsset(arg, 'recv')
self.recv_proto = init_swap_proto(self.cfg, self.recv_asset)
# arg 5: recv_spec (receive address spec)
if args_in:
@ -139,7 +147,10 @@ class NewSwap(New):
'To sign this transaction, autosign or txsign must be invoked'
' with --allow-non-wallet-swap'))
memo = sp.Memo(self.recv_proto, recv_output.addr)
memo = sp.Memo(self.recv_proto, self.recv_asset, recv_output.addr)
self.is_token_swap = self.proto.tokensym or self.recv_asset.asset
self.send_asset = send_asset
# this goes into the transaction file:
self.swap_recv_addr_mmid = recv_output.mmid

View file

@ -75,12 +75,12 @@ class ThornodeServer(HTTPD):
from wsgiref.util import request_uri
m = re.search(request_pat, request_uri(environ))
_, send_coin, _, recv_coin, amt_atomic = m.groups()
send_chain, send_asset, recv_chain, recv_asset, amt_atomic = m.groups()
from mmgen.protocol import init_proto
send_proto = init_proto(cfg, send_coin, network='regtest', need_amt=True)
send_proto = init_proto(cfg, send_chain, network='regtest', need_amt=True)
in_amt = UniAmt(int(amt_atomic), from_unit='satoshi')
out_amt = in_amt * (prices[send_coin] / prices[recv_coin])
out_amt = in_amt * (prices[send_asset] / prices[recv_asset])
addr = make_inbound_addr(send_proto, send_proto.preferred_mmtypes[0])
data = data_template | {

View file

@ -594,10 +594,10 @@ class CmdTestSwap(CmdTestSwapMethods, CmdTestRegtest, CmdTestAutosignThreaded):
return t
def swaptxcreate_bad3(self):
return self._swaptxcreate_bad(['RTC', 'LTC'], expect1='unsupported send coin')
return self._swaptxcreate_bad(['RTC', 'LTC'], exit_val=2, expect1='unrecognized asset')
def swaptxcreate_bad4(self):
return self._swaptxcreate_bad(['LTC', 'XTC'], expect1='unsupported receive coin')
return self._swaptxcreate_bad(['LTC', 'XTC'], exit_val=2, expect1='unrecognized asset')
def swaptxcreate_bad5(self):
return self._swaptxcreate_bad(['LTC'], expect1='USAGE:')

View file

@ -8,21 +8,40 @@ from mmgen.color import cyan
from ..include.common import cfg, vmsg, make_burn_addr
from mmgen.swap.proto.thorchain import SwapAsset
class unit_tests:
def asset(self, name, ut, desc='SwapAsset class'):
for name, full_name, memo_name, chain, asset, direction in (
('BTC', 'BTC.BTC', 'b', 'BTC', None, 'recv'),
('LTC', 'LTC.LTC', 'l', 'LTC', None, 'recv'),
('BCH', 'BCH.BCH', 'c', 'BCH', None, 'recv'),
):
a = SwapAsset(name, direction)
vmsg(f' {a.name}')
assert a.name == name
assert a.full_name == full_name
assert a.direction == direction
assert a.asset == asset
assert a.chain == chain
assert a.memo_asset_name == memo_name
return True
def memo(self, name, ut, desc='Swap transaction memo'):
from mmgen.protocol import init_proto
from mmgen.amt import UniAmt
from mmgen.swap.proto.thorchain import Memo
for coin, addrtype in (
('ltc', 'bech32'),
('bch', 'compressed'),
('eth', None),
for coin, addrtype, asset_name, token in (
('ltc', 'bech32', 'LTC', None),
('bch', 'compressed', 'BCH', None),
('eth', None, 'ETH', None),
):
proto = init_proto(cfg, coin, need_amt=True)
proto = init_proto(cfg, coin, tokensym=token, need_amt=True)
addr = make_burn_addr(proto, addrtype)
asset = SwapAsset(asset_name, 'recv')
vmsg(f'\nTesting coin {cyan(coin.upper())}:')
vmsg(f'\nTesting asset {cyan(asset_name)}:')
for limit, limit_chk in (
('123.4567', 12340000000),
@ -32,7 +51,7 @@ class unit_tests:
(None, 0),
):
vmsg('\nTesting memo initialization:')
m = Memo(proto, addr, trade_limit=UniAmt(limit) if limit else None)
m = Memo(proto, asset, addr, trade_limit=UniAmt(limit) if limit else None)
vmsg(f'str(memo): {m}')
vmsg(f'repr(memo): {m!r}')
vmsg(f'limit: {limit}')
@ -47,7 +66,7 @@ class unit_tests:
assert p.proto == 'THORChain'
assert p.function == 'SWAP'
assert p.chain == coin.upper()
assert p.asset == coin.upper()
assert p.asset == token or coin.upper()
assert p.address == addr.views[addr.view_pref]
assert p.trade_limit == limit_chk
assert p.stream_interval == 1
@ -84,14 +103,30 @@ class unit_tests:
def bad(s):
return lambda: Memo.parse(s)
def bad10():
coin = 'BTC'
proto = init_proto(cfg, coin, need_amt=True)
addr = make_burn_addr(proto, 'C')
asset = SwapAsset(coin, 'send')
Memo(proto, asset, addr)
def bad11():
SwapAsset('XYZ', 'send')
def bad12():
SwapAsset('DOGE', 'send')
ut.process_bad_data((
('bad1', 'SwapMemoParseError', 'must contain', bad('x')),
('bad2', 'SwapMemoParseError', 'must contain', bad('y:z:x')),
('bad3', 'SwapMemoParseError', 'function abbrev', bad('z:l:foobar:0/1/0')),
('bad4', 'SwapMemoParseError', 'asset abbrev', bad('=:x:foobar:0/1/0')),
('bad4', 'SwapAssetError', 'unrecognized', bad('=:x:foobar:0/1/0')),
('bad5', 'SwapMemoParseError', 'failed to parse', bad('=:l:foobar:n')),
('bad6', 'SwapMemoParseError', 'invalid specifier', bad('=:l:foobar:x/1/0')),
('bad7', 'SwapMemoParseError', 'extra', bad('=:l:foobar:0/1/0:x')),
('bad10', 'AssertionError', 'recv', bad10),
('bad11', 'SwapAssetError', 'unrecognized', bad11),
('bad12', 'SwapAssetError', 'unsupported', bad12),
), pfx='')
return True