memo.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. swap.proto.thorchain.memo: THORChain swap protocol memo class
  12. """
  13. from ....util import die, is_hex_str
  14. from . import name as proto_name
  15. from . import SwapAsset
  16. class THORChainMemo:
  17. max_len = 250
  18. function = 'SWAP'
  19. function_abbrevs = {
  20. 'SWAP': '='}
  21. @classmethod
  22. def is_partial_memo(cls, bytes_data):
  23. import re
  24. ops = {
  25. 'swap': ('SWAP', 's', '='),
  26. 'add': ('ADD', 'a', r'\+'),
  27. 'withdraw': ('WITHDRAW', 'wd', '-'),
  28. 'loan': (r'LOAN(\+|-)', r'\$(\+|-)'), # open/repay
  29. 'pool': (r'POOL(\+|-)',),
  30. 'trade': (r'TRADE(\+|-)',),
  31. 'secure': (r'SECURE(\+|-)',),
  32. 'misc': ('BOND', 'UNBOND', 'LEAVE', 'MIGRATE', 'NOOP', 'DONATE', 'RESERVE')}
  33. pat = r'^(' + '|'.join('|'.join(pats) for pats in ops.values()) + r'):\S\S+'
  34. return bool(re.search(pat.encode(), bytes_data))
  35. @classmethod
  36. def parse(cls, s):
  37. """
  38. All fields are validated, excluding address (cannot validate, since network is unknown)
  39. """
  40. from collections import namedtuple
  41. from ....util import is_int
  42. def get_item(desc):
  43. try:
  44. return fields.pop(0)
  45. except IndexError:
  46. die('SwapMemoParseError', f'malformed {proto_name} memo (missing {desc} field)')
  47. def get_id(data, item, desc):
  48. if item in data:
  49. return item
  50. rev_data = {v: k for k,v in data.items()}
  51. if item in rev_data:
  52. return rev_data[item]
  53. die('SwapMemoParseError', f'{item!r}: unrecognized {proto_name} {desc} abbreviation')
  54. fields = str(s).split(':')
  55. if len(fields) < 4:
  56. die('SwapMemoParseError', 'memo must contain at least 4 comma-separated fields')
  57. function = get_id(cls.function_abbrevs, get_item('function'), 'function')
  58. asset = SwapAsset.init_from_memo(get_item('asset'))
  59. address = get_item('address')
  60. if asset.chain in SwapAsset.evm_chains:
  61. assert address.startswith('0x'), f'{address}: address does not start with ‘0x’'
  62. assert len(address) == 42, f'{address}: address has incorrect length ({len(address)} != 42)'
  63. address = address.removeprefix('0x')
  64. desc = 'trade_limit/stream_interval/stream_quantity'
  65. lsq = get_item(desc)
  66. try:
  67. limit, interval, quantity = lsq.split('/')
  68. except ValueError:
  69. die('SwapMemoParseError', f'malformed memo (failed to parse {desc} field) [{lsq}]')
  70. from . import ExpInt4
  71. try:
  72. limit_int = ExpInt4(limit)
  73. except Exception as e:
  74. die('SwapMemoParseError', str(e))
  75. for n in (interval, quantity):
  76. if not is_int(n):
  77. die('SwapMemoParseError', f'malformed memo (non-integer in {desc} field [{lsq}])')
  78. if fields:
  79. die('SwapMemoParseError', 'malformed memo (unrecognized extra data)')
  80. ret = namedtuple(
  81. 'parsed_memo',
  82. ['proto', 'function', 'asset', 'address', 'trade_limit', 'stream_interval', 'stream_quantity'])
  83. return ret(proto_name, function, asset, address, limit_int, int(interval), int(quantity))
  84. def __init__(self, swap_cfg, proto, asset, addr, *, trade_limit):
  85. from ....amt import UniAmt
  86. from ....addr import is_coin_addr
  87. assert trade_limit is None or isinstance(trade_limit, UniAmt), f'{type(trade_limit)} != {UniAmt}'
  88. assert is_coin_addr(proto, addr)
  89. assert asset.coin == proto.coin, f'{asset.coin} != {proto.coin}'
  90. assert asset.tokensym == getattr(proto, 'tokensym', None), (
  91. f'{asset.tokensym} != {getattr(proto, "tokensym", None)}')
  92. assert asset.direction == 'recv', f'{asset.direction} != ‘recv’'
  93. self.addr = addr.views[addr.view_pref]
  94. assert not ':' in self.addr # colon is record separator, so address mustn’t contain one
  95. if asset.chain in SwapAsset.evm_chains:
  96. assert len(self.addr) == 40, f'{self.addr}: address has incorrect length ({len(self.addr)} != 40)'
  97. assert is_hex_str(self.addr), f'{self.addr}: address is not a hexadecimal string'
  98. self.addr = '0x' + self.addr
  99. self.proto = proto
  100. self.asset = asset
  101. self.swap_cfg = swap_cfg
  102. self.trade_limit = trade_limit
  103. def __str__(self):
  104. from . import ExpInt4
  105. try:
  106. tl_enc = (
  107. 0 if self.trade_limit is None else
  108. ExpInt4(self.trade_limit.to_unit('satoshi')).enc)
  109. except Exception as e:
  110. die('SwapMemoParseError', str(e))
  111. suf = '/'.join(str(n) for n in (
  112. tl_enc,
  113. self.swap_cfg.stream_interval,
  114. self.swap_cfg.stream_quantity))
  115. ret = ':'.join([
  116. self.function_abbrevs[self.function],
  117. self.asset.memo_asset_name,
  118. self.addr,
  119. suf])
  120. assert len(ret) <= self.max_len, f'{proto_name} memo exceeds maximum length of {self.max_len}'
  121. return ret