| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- #!/usr/bin/env python3
- """
- test.modtest_d.swap: swap unit tests for the MMGen suite
- """
- from mmgen.color import cyan
- from mmgen.cfg import Config
- from mmgen.amt import UniAmt
- from mmgen.swap.proto.thorchain import SwapCfg, SwapAsset, Memo
- from mmgen.protocol import init_proto
- from ..include.common import cfg, vmsg, make_burn_addr
class unit_tests:
    """Unit tests for the THORChain swap configuration, asset and memo classes.

    Every test method takes the test ``name``, a ``ut`` helper object (only
    its ``process_bad_data()`` method is used here) and a description string
    ``desc``, and returns ``True`` once all of its assertions have passed.
    """

    def cfg(self, name, ut, desc='Swap configuration'):
        """Check SwapCfg parsing of ``trade_limit`` and ``stream_interval``.

        Valid vectors are checked against their expected parsed values;
        malformed values are then passed to ``ut.process_bad_data()``.
        """
        # (trade_limit argument, expected trade_limit, stream_interval argument)
        vectors = (
            (None,      None,              None),
            ('1',       UniAmt('1'),       None),
            ('33',      UniAmt('33'),      7),
            ('2%',      0.98,              14),
            ('-2%',     1.02,              1),
            ('3.333%',  0.96667,           1),
            ('-3.333%', 1.03333,           3),
            ('1.2345',  UniAmt('1.2345'),  10),
        )

        for tl_arg, tl_chk, si_arg in vectors:
            cfg_data = {
                'trade_limit': tl_arg,
                'stream_interval': None if si_arg is None else str(si_arg)}
            sc = SwapCfg(Config(cfg_data))
            vmsg(f' trade_limit: {tl_arg} => {sc.trade_limit}')
            vmsg(f' stream_interval: {si_arg} => {sc.stream_interval}')
            assert sc.trade_limit == tl_chk
            # Parenthesize the expected value: without the parentheses the
            # conditional expression swallows the comparison and the assert
            # only tests the truthiness of si_arg.
            si_chk = sc.si.dfl if si_arg is None else si_arg
            assert sc.stream_interval == si_chk, f'{sc.stream_interval} != {si_chk}'
            assert sc.stream_quantity == 0

        vmsg('\n Testing error handling')

        def bad1():
            SwapCfg(Config({'trade_limit': 'x'}))

        def bad2():
            SwapCfg(Config({'trade_limit': '1.23x'}))

        def bad3():
            SwapCfg(Config({'stream_interval': 30}))

        def bad4():
            SwapCfg(Config({'stream_interval': 0}))

        def bad5():
            SwapCfg(Config({'stream_interval': 'x'}))

        # Each entry presumably is (label, expected exception name, expected
        # message fragment, callable) -- confirm against process_bad_data().
        ut.process_bad_data((
            ('bad1', 'SwapCfgValueError', 'invalid parameter', bad1),
            ('bad2', 'SwapCfgValueError', 'invalid parameter', bad2),
            ('bad3', 'SwapCfgValueError', 'invalid parameter', bad3),
            ('bad4', 'SwapCfgValueError', 'invalid parameter', bad4),
            ('bad5', 'SwapCfgValueError', 'invalid parameter', bad5),
        ), pfx='')

        return True

    def asset(self, name, ut, desc='SwapAsset class'):
        """Check the attributes of SwapAsset instances for several assets."""
        # (name, full name, memo asset name, chain, token symbol, direction)
        vectors = (
            ('BTC',      'BTC.BTC',  'b',        'BTC', None,   'recv'),
            ('LTC',      'LTC.LTC',  'l',        'LTC', None,   'recv'),
            ('BCH',      'BCH.BCH',  'c',        'BCH', None,   'recv'),
            ('ETH.USDT', 'ETH.USDT', 'ETH.USDT', 'ETH', 'USDT', 'recv'),
        )

        # The loop variable is named asset_name so that it does not shadow
        # the method's `name` parameter.
        for asset_name, full_name, memo_name, chain, tokensym, direction in vectors:
            a = SwapAsset(asset_name, direction)
            vmsg(f' {a.name}')
            assert a.name == asset_name
            assert a.full_name == full_name
            assert a.direction == direction
            assert a.tokensym == tokensym
            assert a.chain == chain
            assert a.memo_asset_name == memo_name

        return True

    def memo(self, name, ut, desc='Swap transaction memo'):
        """Check Memo creation, parsing, partial-memo detection and errors.

        For each coin/asset pair a memo is built for several trade limits and
        stream intervals, its string form is checked against the expected
        suffix, and the result of ``Memo.parse()`` is checked field by field.
        """
        from pprint import pformat  # hoisted out of the innermost loop

        for coin, addrtype, asset_name, token in (
                ('ltc', 'bech32',     'LTC',      None),
                ('bch', 'compressed', 'BCH',      None),
                ('eth', None,         'ETH',      None),
                ('eth', None,         'ETH.USDT', 'USDT'),
        ):
            proto = init_proto(cfg, coin, tokensym=token, need_amt=True)
            addr = make_burn_addr(proto, addrtype)
            asset = SwapAsset(asset_name, 'recv')

            vmsg(f'\nTesting asset {cyan(asset_name)}:')

            # (trade limit, expected parsed limit, stream interval, memo suffix)
            for limit, limit_chk, si, suf in (
                    ('123.4567',   12340000000, None, '1234e7/3/0'),
                    ('1.234567',   123400000,   1,    '1234e5/1/0'),
                    ('0.01234567', 1234000,     10,   '1234e3/10/0'),
                    ('0.00012345', 12345,       20,   '12345/20/0'),
                    (None,         0,           3,    '0/3/0'),
            ):
                vmsg('\nTesting memo initialization:')
                swap_cfg = SwapCfg(Config({'trade_limit': limit, 'stream_interval': si}))
                m = Memo(
                    swap_cfg,
                    proto,
                    asset,
                    addr,
                    trade_limit = None if limit is None else UniAmt(limit))
                vmsg(f'str(memo): {m}')
                vmsg(f'repr(memo): {m!r}')
                vmsg(f'limit: {limit}')

                assert str(m).endswith(':' + suf), f'{m} doesn’t end with {suf}'

                p = Memo.parse(m)
                limit_dec = UniAmt(p.trade_limit, from_unit='satoshi')
                vmsg(f'limit_dec: {limit_dec.hl()}')

                vmsg('\nTesting memo parsing:')
                vmsg(pformat(p._asdict()))

                # `==` binds tighter than `or`, so the expected values must be
                # computed (or parenthesized) before comparing; otherwise the
                # assertions can never fail.
                # NOTE(review): assumes asset.coin is the token symbol for
                # token assets -- confirm against SwapAsset.
                coin_chk = token or coin.upper()
                si_chk = si or swap_cfg.si.dfl

                assert p.proto == 'THORChain'
                assert p.function == 'SWAP'
                assert p.asset.chain == coin.upper()
                assert p.asset.coin == coin_chk, f'{p.asset.coin} != {coin_chk}'
                assert p.address == addr.views[addr.view_pref]
                assert p.trade_limit == limit_chk
                assert p.stream_interval == si_chk, f'{p.stream_interval} != {si_chk}'
                assert p.stream_quantity == 0 # auto

        vmsg('\nTesting is_partial_memo():')
        # `m` is the last memo created by the loops above.
        for vec in (
                str(m),
                'SWAP:xyz',
                '=:xyz',
                's:xyz',
                'a:xz',
                '+:xz',
                'WITHDRAW:xz',
                'LOAN+:xz:x:x',
                'TRADE-:xz:x:x',
                'BOND:xz',
        ):
            vmsg(f' pass: {vec}')
            assert Memo.is_partial_memo(vec.encode('ascii')), vec

        for vec in (
                '=',
                'swap',
                'swap:',
                'swap:abc',
                'SWAP:a',
        ):
            vmsg(f' fail: {vec}')
            assert not Memo.is_partial_memo(vec.encode('ascii')), vec

        vmsg('\nTesting error handling:')

        def bad(s):
            return lambda: Memo.parse(s)

        def bad10():
            # Uses the last `swap_cfg` created by the loops above; the asset
            # is created with direction 'send' instead of 'recv'.
            coin = 'BTC'
            proto = init_proto(cfg, coin, need_amt=True)
            addr = make_burn_addr(proto, 'C')
            asset = SwapAsset(coin, 'send')
            Memo(swap_cfg, proto, asset, addr, trade_limit=None)

        def bad11():
            SwapAsset('XYZ', 'send')

        def bad12():
            SwapAsset('DOGE', 'send')

        ut.process_bad_data((
            ('bad1',  'SwapMemoParseError', 'must contain',      bad('x')),
            ('bad2',  'SwapMemoParseError', 'must contain',      bad('y:z:x')),
            ('bad3',  'SwapMemoParseError', 'function abbrev',   bad('z:l:foobar:0/3/0')),
            ('bad4',  'SwapAssetError',     'unrecognized',      bad('=:x:foobar:0/3/0')),
            ('bad5',  'SwapMemoParseError', 'failed to parse',   bad('=:l:foobar:n')),
            ('bad6',  'SwapMemoParseError', 'invalid specifier', bad('=:l:foobar:x/3/0')),
            ('bad7',  'SwapMemoParseError', 'extra',             bad('=:l:foobar:0/3/0:x')),
            ('bad10', 'AssertionError',     'recv',              bad10),
            ('bad11', 'SwapAssetError',     'unrecognized',      bad11),
            ('bad12', 'SwapAssetError',     'unsupported',       bad12),
        ), pfx='')

        return True
|