mmgen-wallet/test/modtest_d/swap.py

190 lines
5.8 KiB
Python
Executable file

#!/usr/bin/env python3
"""
test.modtest_d.swap: swap unit tests for the MMGen suite
"""
from mmgen.color import cyan
from mmgen.cfg import Config
from mmgen.amt import UniAmt
from mmgen.swap.proto.thorchain import SwapCfg, SwapAsset, Memo
from mmgen.protocol import init_proto
from ..include.common import cfg, vmsg, make_burn_addr
class unit_tests:
def cfg(self, name, ut, desc='Swap configuration'):
for tl_arg, tl_chk, si_arg in (
(None, None, None),
('1', UniAmt('1'), None),
('33', UniAmt('33'), 7),
('2%', 0.98, 14),
('-2%', 1.02, 1),
('3.333%', 0.96667, 1),
('-3.333%', 1.03333, 3),
('1.2345', UniAmt('1.2345'), 10)):
cfg_data = {
'trade_limit': tl_arg,
'stream_interval': None if si_arg is None else str(si_arg)}
sc = SwapCfg(Config(cfg_data))
vmsg(f' trade_limit: {tl_arg} => {sc.trade_limit}')
vmsg(f' stream_interval: {si_arg} => {sc.stream_interval}')
assert sc.trade_limit == tl_chk
assert sc.stream_interval == sc.si.dfl if si_arg is None else si_arg
assert sc.stream_quantity == 0
vmsg('\n Testing error handling')
def bad1():
SwapCfg(Config({'trade_limit': 'x'}))
def bad2():
SwapCfg(Config({'trade_limit': '1.23x'}))
def bad3():
SwapCfg(Config({'stream_interval': 30}))
def bad4():
SwapCfg(Config({'stream_interval': 0}))
def bad5():
SwapCfg(Config({'stream_interval': 'x'}))
ut.process_bad_data((
('bad1', 'SwapCfgValueError', 'invalid parameter', bad1),
('bad2', 'SwapCfgValueError', 'invalid parameter', bad2),
('bad3', 'SwapCfgValueError', 'invalid parameter', bad3),
('bad4', 'SwapCfgValueError', 'invalid parameter', bad4),
('bad5', 'SwapCfgValueError', 'invalid parameter', bad5),
), pfx='')
return True
def asset(self, name, ut, desc='SwapAsset class'):
for name, full_name, memo_name, chain, tokensym, direction in (
('BTC', 'BTC.BTC', 'b', 'BTC', None, 'recv'),
('LTC', 'LTC.LTC', 'l', 'LTC', None, 'recv'),
('BCH', 'BCH.BCH', 'c', 'BCH', None, 'recv'),
('ETH.USDT', 'ETH.USDT', 'ETH.USDT', 'ETH', 'USDT', 'recv'),
):
a = SwapAsset(name, direction)
vmsg(f' {a.name}')
assert a.name == name
assert a.full_name == full_name
assert a.direction == direction
assert a.tokensym == tokensym
assert a.chain == chain
assert a.memo_asset_name == memo_name
return True
def memo(self, name, ut, desc='Swap transaction memo'):
for coin, addrtype, asset_name, token in (
('ltc', 'bech32', 'LTC', None),
('bch', 'compressed', 'BCH', None),
('eth', None, 'ETH', None),
('eth', None, 'ETH.USDT', 'USDT'),
):
proto = init_proto(cfg, coin, tokensym=token, need_amt=True)
addr = make_burn_addr(proto, addrtype)
asset = SwapAsset(asset_name, 'recv')
vmsg(f'\nTesting asset {cyan(asset_name)}:')
for limit, limit_chk, si, suf in (
('123.4567', 12340000000, None, '1234e7/3/0'),
('1.234567', 123400000, 1, '1234e5/1/0'),
('0.01234567', 1234000, 10, '1234e3/10/0'),
('0.00012345', 12345, 20, '12345/20/0'),
(None, 0, 3, '0/3/0'),
):
vmsg('\nTesting memo initialization:')
swap_cfg = SwapCfg(Config({'trade_limit': limit, 'stream_interval': si}))
m = Memo(
swap_cfg,
proto,
asset,
addr,
trade_limit = None if limit is None else UniAmt(limit))
vmsg(f'str(memo): {m}')
vmsg(f'repr(memo): {m!r}')
vmsg(f'limit: {limit}')
assert str(m).endswith(':' + suf), f'{m} doesn’t end with {suf}'
p = Memo.parse(m)
limit_dec = UniAmt(p.trade_limit, from_unit='satoshi')
vmsg(f'limit_dec: {limit_dec.hl()}')
vmsg('\nTesting memo parsing:')
from pprint import pformat
vmsg(pformat(p._asdict()))
assert p.proto == 'THORChain'
assert p.function == 'SWAP'
assert p.asset.chain == coin.upper()
assert p.asset.coin == token or coin.upper()
assert p.address == addr.views[addr.view_pref]
assert p.trade_limit == limit_chk
assert p.stream_interval == si or swap_cfg.si.dfl, f'{p.stream_interval} != {swap_cfg.si.dfl}'
assert p.stream_quantity == 0 # auto
vmsg('\nTesting is_partial_memo():')
for vec in (
str(m),
'SWAP:xyz',
'=:xyz',
's:xyz',
'a:xz',
'+:xz',
'WITHDRAW:xz',
'LOAN+:xz:x:x',
'TRADE-:xz:x:x',
'BOND:xz',
):
vmsg(f' pass: {vec}')
assert Memo.is_partial_memo(vec.encode('ascii')), vec
for vec in (
'=',
'swap',
'swap:',
'swap:abc',
'SWAP:a',
):
vmsg(f' fail: {vec}')
assert not Memo.is_partial_memo(vec.encode('ascii')), vec
vmsg('\nTesting error handling:')
def bad(s):
return lambda: Memo.parse(s)
def bad10():
coin = 'BTC'
proto = init_proto(cfg, coin, need_amt=True)
addr = make_burn_addr(proto, 'C')
asset = SwapAsset(coin, 'send')
Memo(swap_cfg, proto, asset, addr, trade_limit=None)
def bad11():
SwapAsset('XYZ', 'send')
def bad12():
SwapAsset('DOGE', 'send')
ut.process_bad_data((
('bad1', 'SwapMemoParseError', 'must contain', bad('x')),
('bad2', 'SwapMemoParseError', 'must contain', bad('y:z:x')),
('bad3', 'SwapMemoParseError', 'function abbrev', bad('z:l:foobar:0/3/0')),
('bad4', 'SwapAssetError', 'unrecognized', bad('=:x:foobar:0/3/0')),
('bad5', 'SwapMemoParseError', 'failed to parse', bad('=:l:foobar:n')),
('bad6', 'SwapMemoParseError', 'invalid specifier', bad('=:l:foobar:x/3/0')),
('bad7', 'SwapMemoParseError', 'extra', bad('=:l:foobar:0/3/0:x')),
('bad10', 'AssertionError', 'recv', bad10),
('bad11', 'SwapAssetError', 'unrecognized', bad11),
('bad12', 'SwapAssetError', 'unsupported', bad12),
), pfx='')
return True