THORChain DEX integration

Cross-chain native asset swaps directly from MMGen Wallet!

Currently supported coins: BTC, LTC, BCH.  Work on ETH support is underway.

All supported asset pairs have undergone thorough testing on mainnet.

Sample workflow for a BTC->LTC swap (assumes offline autosigning is set up and
the removable device inserted on the online machine):

    $ mmgen-swaptxcreate --autosign BTC LTC

    remove device - insert - wait for signing - remove - insert

    $ mmgen-txsend --autosign

Note that other command-line options and arguments will likely be required. For
further information, see:

    $ mmgen-swaptxcreate --help

Be aware that transactions stuck for a long time in the mempool can potentially
lead to loss of funds, so users should first learn how to create replacement
transactions with ‘mmgen-txbump’ before attempting a swap.  In all cases, it’s
advisable to begin with small amounts.  Double-checking the vault address on a
block explorer such as thorchain.net or runescan.io before sending the
transaction is also recommended.

Testing:

    $ test/modtest.py tx.memo
    $ test/cmdtest.py regtest_legacy.main autosign_automount swap
This commit is contained in:
The MMGen Project 2025-02-24 11:27:49 +00:00
commit 85cec5655d
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
40 changed files with 1510 additions and 72 deletions

View file

@ -22,6 +22,7 @@ amt: MMGen CoinAmt and related classes
from decimal import Decimal
from .objmethods import Hilite, InitErrors
from .obj import get_obj
class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
"""
@ -155,6 +156,9 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
def __mod__(self, *args, **kwargs):
self.method_not_implemented()
def is_coin_amt(proto, num, from_unit=None, from_decimal=False):
return get_obj(proto.coin_amt, num=num, from_unit=from_unit, from_decimal=from_decimal, silent=True, return_bool=True)
class BTCAmt(CoinAmt):
coin = 'BTC'
max_prec = 8

View file

@ -1 +1 @@
15.1.dev16
15.1.dev17

View file

@ -70,6 +70,7 @@ class ExtensionModuleError(Exception): mmcode = 2
class MoneroMMGenTXFileParseError(Exception): mmcode = 2
class AutosignTXError(Exception): mmcode = 2
class MMGenImportError(Exception): mmcode = 2
class SwapMemoParseError(Exception): mmcode = 2
# 3: yellow hl, 'MMGen Error' + exception + message
class RPCFailure(Exception): mmcode = 3

82
mmgen/help/swaptxcreate.py Executable file
View file

@ -0,0 +1,82 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
help.swaptxcreate: swaptxcreate and swaptxdo help notes for the MMGen Wallet suite
"""
def help(proto, cfg):
return """
This script is similar in operation to mmgen-txcreate, only with additional
steps. Users are advised to first familiarize themselves with the use of that
script before attempting to perform a swap with this one.
The tracking wallets of both the send and receive coins must be available when
the script is invoked. If the two coin daemons are running on different hosts
than the script, or with non-standard ports, coin-specific RPC options may be
required (see EXAMPLES below).
The swap protocols quote server on the Internet must be reachable either
directly or via the SOCKS5 proxy specified with the --proxy option. To improve
privacy, its recommended to proxy requests to the quote server via Tor or
some other anonymity network.
The resulting transaction file is saved, signed, sent, and optionally bumped,
exactly the same way as one created with mmgen-txcreate. Autosign with
automount is likewise supported via the --autosign option.
The command line must contain at minimum a send coin (COIN1) and receive coin
(COIN2) symbol. Currently supported coins are BTC, LTC and BCH. All other
arguments are optional. If AMT is specified, the specified value of send coin
will be swapped and the rest returned to a change address in the originating
tracking wallet. Otherwise, the entire value of the interactively selected
inputs will be swapped.
By default, the change and destination addresses are chosen automatically by
finding the lowest-indexed unused addresses of the preferred address types in
the send and receive tracking wallets. Types B, S and C (see ADDRESS
TYPES below) are searched in that order for unused addresses.
If the wallet contains eligible unused addresses with multiple Seed IDs, the
user will be presented with a list of the lowest-indexed addresses of
preferred type for each Seed ID and prompted to choose from among them.
Change and destination addresses may also be specified manually with the
CHG_ADDR and ADDR arguments. These may be given as full MMGen IDs or in the
form ADDRTYPE_CODE or SEED_ID:ADDRTYPE_CODE (see EXAMPLES below and the
mmgen-txcreate help screen for details).
While discouraged, sending change or swapping to non-wallet addresses is also
supported, in which case the signing script (mmgen-txsign or mmgen-
autosign, as applicable) must be invoked with the --allow-non-wallet-swap
option.
Rather than specifying a transaction fee on the command line, its advisable
to start with the fee suggested by the swap protocol quote server (the script
does this automatically) and then adjust the fee interactively if desired.
When choosing a fee, bear in mind that the longer the transaction remains
unconfirmed, the greater the risk that the vault address will expire, leading
to loss of funds. Its therefore advisable to learn how to create, sign and
send replacement transactions with mmgen-txbump before performing a swap
with this script. When bumping a stuck swap transaction, the safest option
is to create a replacement transaction with one output that returns funds back
to the originating tracking wallet, thus aborting the swap, rather than one
that merely increases the fee (see EXAMPLES below).
Before broadcasting the transaction, its advisable to double-check the vault
address on a block explorer such as thorchain.net or runescan.io.
The MMGen Node Tools suite contains two useful tools to help with fine-tuning
transaction fees, mmnode-feeview and mmnode-blocks-info, in addition to
mmnode-ticker, which can be used to calculate the current cross-rate between
the asset pair of a swap, as well as the total receive value in terms of the
send value.
"""

View file

@ -0,0 +1,72 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
help.swaptxcreate_examples: swaptxcreate and swaptxdo help examples for the MMGen Wallet suite
"""
from ..cfg import gc
def help(proto, cfg):
return f"""
EXAMPLES:
Create a BTC-to-LTC swap transaction, prompting the user for transaction
inputs. The full value of the inputs, minus miner fees, will be swapped
and sent to an unused address in the users LTC tracking wallet:
$ {gc.prog_name} BTC LTC
Same as above, but swap 0.123 BTC, minus miner fees, and send the change to
an unused address in the BTC tracking wallet:
$ {gc.prog_name} BTC 0.123 LTC
Same as above, but specify that the change address be a Segwit P2SH (S)
address:
$ {gc.prog_name} BTC 0.123 S LTC
Same as above, but additionally specify that the destination LTC address be
a compressed P2PKH (C) address:
$ {gc.prog_name} BTC 0.123 S LTC C
Same as above, but specify the BTC change address explicitly and the
destination LTC address by Seed ID and address type:
$ {gc.prog_name} BTC 0.123 BEADCAFE:S:6 LTC BEADCAFE:C
Abort the above swap by creating a replacement transaction that returns the
funds to the originating tracking wallet (omit the transaction filename if
using --autosign):
$ mmgen-txbump BEADCAFE:S:6 [raw transaction file]
Swap 0.123 BTC to a non-wallet address (not recommended):
$ {gc.prog_name} BTC 0.123 LTC ltc1qaq8t3pakcftpk095tnqfv5cmmczysls0xx9388
Create an LTC-to-BCH swap transaction, with the Litecoin daemon running on
host orion and Bitcoin Cash Node daemon on host gemini with non-standard
RPC port 8332. Communicate with the swap quote server via Tor.
$ {gc.prog_name} --ltc-rpc-host=orion --bch-rpc-host=gemini --bch-rpc-port=8332 --proxy=localhost:9050 LTC BCH
After sending, check the status of the above swaps LTC deposit transaction
(omit the transaction filename if using --autosign):
$ mmgen-txsend --ltc-rpc-host=orion --status [transaction file]
Check whether the funds have arrived in the BCH destination wallet:
$ mmgen-tool --coin=bch --bch-rpc-host=gemini twview minconf=0
"""

View file

@ -57,6 +57,8 @@ opts_data = {
-v, --verbose Produce more verbose output
-w, --wallet-dir=D Specify an alternate wallet dir
(default: {asi.dfl_wallet_dir!r})
-W, --allow-non-wallet-swap Allow signing of swap transactions that send funds
to non-wallet addresses
-x, --xmrwallets=L Range or list of wallets to be used for XMR autosigning
""",
'notes': """

View file

@ -79,6 +79,9 @@ opts_data = {
-- -s, --send Sign and send the transaction (the default if seed
+ data is provided)
-- -v, --verbose Produce more verbose output
-- -W, --allow-non-wallet-swap Allow signing of swap transactions that send funds
+ to non-wallet addresses
-- -x, --proxy=P Fetch the swap quote via SOCKS5 proxy P (host:port)
-- -y, --yes Answer 'yes' to prompts, suppress non-essential output
-- -z, --show-hash-presets Show information on available hash presets
""",
@ -95,6 +98,11 @@ identical to that of ‘mmgen-txcreate’.
The user should take care to select a fee sufficient to ensure the original
transaction is replaced in the mempool.
When bumping a swap transaction, the swap protocols quote server on the
Internet must be reachable either directly or via the SOCKS5 proxy specified
with the --proxy option. To improve privacy, its recommended to proxy
requests to the quote server via Tor or some other anonymity network.
{e}
{s}
Seed source files must have the canonical extensions listed in the 'FileExt'

View file

@ -73,6 +73,7 @@ opts_data = {
+ Choices: {x_all})
-- -v, --verbose Produce more verbose output
b- -V, --vsize-adj= f Adjust transaction's estimated vsize by factor 'f'
-s -x, --proxy=P Fetch the swap quote via SOCKS5 proxy P (host:port)
-- -y, --yes Answer 'yes' to prompts, suppress non-essential output
e- -X, --cached-balances Use cached balances
""",
@ -80,7 +81,7 @@ opts_data = {
},
'code': {
'usage': lambda cfg, proto, help_notes, s: s.format(
u_args = help_notes('txcreate_args')),
u_args = help_notes(f'{target}create_args')),
'options': lambda cfg, proto, help_notes, s: s.format(
cfg = cfg,
cu = proto.coin,
@ -92,10 +93,10 @@ opts_data = {
x_all = fmt_list(cfg._autoset_opts['swap_proto'].choices, fmt='no_spc'),
x_dfl = cfg._autoset_opts['swap_proto'].choices[0]),
'notes': lambda cfg, help_mod, help_notes, s: s.format(
c = help_mod('txcreate'),
c = help_mod(f'{target}create'),
F = help_notes('fee'),
n_at = help_notes('address_types'),
x = help_mod('txcreate_examples'))
x = help_mod(f'{target}create_examples'))
}
}

View file

@ -93,6 +93,7 @@ opts_data = {
+ wallet is scanned for subseeds.
-- -v, --verbose Produce more verbose output
b- -V, --vsize-adj= f Adjust transaction's estimated vsize by factor 'f'
-s -x, --proxy=P Fetch the swap quote via SOCKS5 proxy P (host:port)
e- -X, --cached-balances Use cached balances
-- -y, --yes Answer 'yes' to prompts, suppress non-essential output
-- -z, --show-hash-presets Show information on available hash presets
@ -114,7 +115,7 @@ column below:
},
'code': {
'usage': lambda cfg, proto, help_notes, s: s.format(
u_args = help_notes('txcreate_args')),
u_args = help_notes(f'{target}create_args')),
'options': lambda cfg, proto, help_notes, s: s.format(
gc = gc,
cfg = cfg,
@ -134,12 +135,12 @@ column below:
x_all = fmt_list(cfg._autoset_opts['swap_proto'].choices, fmt='no_spc'),
x_dfl = cfg._autoset_opts['swap_proto'].choices[0]),
'notes': lambda cfg, help_mod, help_notes, s: s.format(
c = help_mod('txcreate'),
c = help_mod(f'{target}create'),
F = help_notes('fee'),
n_at = help_notes('address_types'),
f = help_notes('fmt_codes'),
s = help_mod('txsign'),
x = help_mod('txcreate_examples'))
x = help_mod(f'{target}create_examples'))
}
}

View file

@ -110,6 +110,9 @@ async def main():
tx.info.view_with_prompt('View transaction details?', pause=False)
sys.exit(retval)
if tx.is_swap:
tx.check_swap_expiry()
if not cfg.yes:
tx.info.view_with_prompt('View transaction details?')
if tx.add_comment(): # edits an existing comment, returns true if changed

View file

@ -67,6 +67,8 @@ opts_data = {
wallet is scanned for subseeds.
-v, --verbose Produce more verbose output
-V, --vsize-adj= f Adjust transaction's estimated vsize by factor 'f'
-W, --allow-non-wallet-swap Allow signing of swap transactions that send funds
to non-wallet addresses
-y, --yes Answer 'yes' to prompts, suppress non-essential output
""",
'notes': """

View file

@ -21,6 +21,7 @@ from .cashaddr import cashaddr_decode_addr, cashaddr_encode_addr, cashaddr_addr_
class mainnet(mainnet):
is_fork_of = 'Bitcoin'
mmtypes = ('L', 'C')
preferred_mmtypes = ('C',)
sighash_type = 'ALL|FORKID'
forks = [
_finfo(478559, '000000000000000000651ef99cb9fcbe0dadde1d424bd9f15ff20136191a5eec', 'BTC', False)

View file

@ -26,6 +26,7 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
addr_len = 20
wif_ver_num = {'std': '80'}
mmtypes = ('L', 'C', 'S', 'B')
preferred_mmtypes = ('B', 'S', 'C')
dfl_mmtype = 'L'
coin_amt = 'BTCAmt'
max_tx_fee = 0.003

View file

@ -250,7 +250,10 @@ class Base(TxBase):
# DATA: opcode_byte ('6a') + push_byte + nulldata_bytes
return sum(
{'p2pkh':34, 'p2sh':32, 'bech32':31}[o.addr.addr_fmt] if o.addr else
(11 + len(o.data))
(11 + len(o.data)) if o.data else
# guess value if o.addr is missing (probably a vault address):
34 if self.proto.coin == 'BCH' else
31
for o in self.outputs)
# https://github.com/bitcoin/bips/blob/master/bip-0141.mediawiki
@ -295,6 +298,17 @@ class Base(TxBase):
getattr(self.proto.coin_amt, to_unit) /
self.estimate_size()))
@property
def data_output(self):
res = self.data_outputs
if len(res) > 1:
raise ValueError(f'{res}: too many data outputs in transaction (only one allowed)')
return res[0] if len(res) == 1 else None
@property
def data_outputs(self):
return [o for o in self.outputs if o.data]
@property
def nondata_outputs(self):
return [o for o in self.outputs if not o.data]

View file

@ -14,11 +14,11 @@ proto.btc.tx.bump: Bitcoin transaction bump class
from ....tx import bump as TxBase
from ....util import msg
from .new import New
from .new_swap import NewSwap
from .completed import Completed
from .unsigned import AutomountUnsigned
class Bump(Completed, New, TxBase.Bump):
class Bump(Completed, NewSwap, TxBase.Bump):
desc = 'fee-bumped transaction'
def get_orig_rel_fee(self):

View file

@ -14,7 +14,7 @@ proto.btc.tx.completed: Bitcoin completed transaction class
from ....tx import completed as TxBase
from ....obj import HexStr
from ....util import msg, die
from ....util import msg, ymsg, die
from .base import Base, decodeScriptPubKey
class Completed(Base, TxBase.Completed):
@ -43,6 +43,24 @@ class Completed(Base, TxBase.Completed):
assert (200 < len(ti['scriptSig']) < 300), 'malformed scriptSig' # VERY rough check
return True
def check_swap_memo(self):
if o := self.data_output:
from ....swap.proto.thorchain.memo import Memo
if Memo.is_partial_memo(o.data):
from ....protocol import init_proto
p = Memo.parse(o.data)
assert p.function == 'SWAP', f'{p.function}’: unsupported function in swap memo ‘{o.data}'
assert p.chain == p.asset, f'{p.chain} != {p.asset}: chain/asset mismatch in swap memo ‘{o.data}'
proto = init_proto(self.cfg, p.asset, network=self.cfg.network, need_amt=True)
if self.swap_recv_addr_mmid:
mmid = self.swap_recv_addr_mmid
elif self.cfg.allow_non_wallet_swap:
ymsg('Warning: allowing swap to non-wallet address (--allow-non-wallet-swap)')
mmid = None
else:
raise ValueError('Swap to non-wallet address forbidden (override with --allow-non-wallet-swap)')
return self.Output(proto, addr=p.address, mmid=mmid, amt=proto.coin_amt('0'))
def check_pubkey_scripts(self):
for n, i in enumerate(self.inputs, 1):
ds = decodeScriptPubKey(self.proto, i.scriptPubKey)

View file

@ -65,7 +65,7 @@ class TxInfo(TxInfo):
append_color='green')
else:
return MMGenID.fmtc(
nonmm_str,
'[vault address]' if not is_input and e.is_vault else nonmm_str,
width = max_mmwid,
color = True)

View file

@ -125,14 +125,17 @@ class New(Base, TxNew):
def final_inputs_ok_msg(self, funds_left):
return 'Transaction produces {} {} in change'.format(funds_left.hl(), self.coin)
def check_chg_addr_is_wallet_addr(self, message='Change address is not an MMGen wallet address!'):
def check_chg_addr_is_wallet_addr(self, output=None, message='Change address is not an MMGen wallet address!'):
def do_err():
from ....ui import confirm_or_raise
confirm_or_raise(
cfg = self.cfg,
message = yellow(message),
action = 'Are you sure this is what you want?')
if len(self.nondata_outputs) > 1 and not self.chg_output.mmid:
if output:
if not output.mmid:
do_err()
elif len(self.nondata_outputs) > 1 and not self.chg_output.mmid:
do_err()
async def create_serialized(self, locktime=None):

View file

@ -12,12 +12,135 @@
proto.btc.tx.new_swap: Bitcoin new swap transaction class
"""
from collections import namedtuple
from ....cfg import gc
from ....tx.new_swap import NewSwap as TxNewSwap
from .new import New
class NewSwap(New, TxNewSwap):
desc = 'Bitcoin swap transaction'
async def process_swap_cmdline_args(self, cmd_args, addrfile_args):
import sys
sys.exit(0)
async def get_swap_output(self, proto, arg, addrfiles, desc):
ret = namedtuple('swap_output', ['coin', 'network', 'addr', 'mmid'])
if arg:
from ..addrdata import TwAddrData
pa = self.parse_cmdline_arg(
proto,
arg,
self.get_addrdata_from_files(proto, addrfiles),
await TwAddrData(self.cfg, proto, twctl=None)) # TODO: twctl required for Ethereum
if pa.addr:
await self.warn_addr_used(proto, pa, desc)
return ret(proto.coin, proto.network, pa.addr, pa.mmid)
full_desc = '{} on the {} {} network'.format(desc, proto.coin, proto.network)
res = await self.get_autochg_addr(proto, arg, exclude=[], desc=full_desc, all_addrtypes=not arg)
self.confirm_autoselected_addr(res.twmmid, full_desc)
return ret(proto.coin, proto.network, res.addr, res.twmmid)
async def process_swap_cmdline_args(self, cmd_args, addrfiles):
from ....protocol import init_proto
import importlib
sp = importlib.import_module(f'mmgen.swap.proto.{self.swap_proto}')
class CmdlineArgs: # listed in command-line order
# send_coin # required: uppercase coin symbol
send_amt = None # optional: Omit to skip change addr and send value of all inputs minus fees to vault
chg_spec = None # optional: change address spec, e.g. ‘B’ ‘DEADBEEF:B’ ‘DEADBEEF:B:1’ or coin address.
# Omit for autoselected change address. Use of non-wallet change address
# will emit warning and prompt user for confirmation
# recv_coin # required: uppercase coin symbol
recv_spec = None # optional: destination address spec. Same rules as for chg_spec
def check_coin_arg(coin, desc):
if coin not in sp.params.coins[desc]:
raise ValueError(f'{coin!r}: unsupported {desc} coin for {gc.proj_name} {sp.name} swap')
return coin
def get_arg():
try:
return args_in.pop(0)
except:
self.cfg._usage()
def init_proto_from_coin(coinsym, desc):
return init_proto(
self.cfg,
check_coin_arg(coinsym, desc),
network = self.proto.network,
need_amt = True)
def parse():
from ....amt import is_coin_amt
arg = get_arg()
# arg 1: send_coin
self.send_proto = init_proto_from_coin(arg, 'send')
arg = get_arg()
# arg 2: amt
if is_coin_amt(self.send_proto, arg):
args.send_amt = self.send_proto.coin_amt(arg)
arg = get_arg()
# arg 3: chg_spec (change address spec)
if args.send_amt:
if not arg in sp.params.coins['receive']: # is change arg
args.chg_spec = arg
arg = get_arg()
# arg 4: recv_coin
self.recv_proto = init_proto_from_coin(arg, 'receive')
# arg 5: recv_spec (receive address spec)
if args_in:
args.recv_spec = get_arg()
if args_in: # done parsing, all args consumed
self.cfg._usage()
args_in = list(cmd_args)
args = CmdlineArgs()
parse()
chg_output = (
await self.get_swap_output(self.send_proto, args.chg_spec, addrfiles, 'change address')
if args.send_amt else None)
if chg_output:
self.check_chg_addr_is_wallet_addr(chg_output)
recv_output = await self.get_swap_output(self.recv_proto, args.recv_spec, addrfiles, 'destination address')
self.check_chg_addr_is_wallet_addr(
recv_output,
message = (
'Swap destination address is not an MMGen wallet address!\n'
'To sign this transaction, autosign or txsign must be invoked with --allow-non-wallet-swap'))
memo = sp.data(self.recv_proto, recv_output.addr)
# this goes into the transaction file:
self.swap_recv_addr_mmid = recv_output.mmid
return (
[f'vault,{args.send_amt}', chg_output.mmid, f'data:{memo}'] if args.send_amt else
['vault', f'data:{memo}'])
def update_vault_addr(self, addr):
vault_idx = self.vault_idx
assert vault_idx == 0, f'{vault_idx}: vault index is not zero!'
o = self.outputs[vault_idx]._asdict()
o['addr'] = addr
self.outputs[vault_idx] = self.Output(self.proto, **o)
@property
def vault_idx(self):
return self._chg_output_ops('idx', 'is_vault')
@property
def vault_output(self):
return self._chg_output_ops('output', 'is_vault')

View file

@ -21,6 +21,7 @@ class mainnet(CoinProtocol.DummyWIF, CoinProtocol.Secp256k1):
network_names = _nw('mainnet', 'testnet', 'devnet')
addr_len = 20
mmtypes = ('E',)
preferred_mmtypes = ('E',)
dfl_mmtype = 'E'
mod_clsname = 'Ethereum'
pubkey_type = 'std' # required by DummyWIF

View file

@ -27,6 +27,9 @@ class Completed(Base, TxBase.Completed):
self.gas = self.proto.coin_amt(self.dfl_gas, from_unit='wei')
self.start_gas = self.proto.coin_amt(self.dfl_start_gas, from_unit='wei')
def check_swap_memo(self):
pass
@property
def send_amt(self):
return self.outputs[0].amt if self.outputs else self.proto.coin_amt('0')

View file

@ -0,0 +1,25 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.proto.thorchain: THORChain swap protocol implementation for the MMGen Wallet suite
"""
__all__ = ['params', 'data']
name = 'THORChain'
from .params import params
from .memo import Memo as data
def rpc_client(tx, amt):
from .midgard import Midgard
return Midgard(tx, amt)

View file

@ -0,0 +1,134 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.proto.thorchain.memo: THORChain swap protocol memo class
"""
from . import name as proto_name
class Memo:
# The trade limit, i.e., set 100000000 to get a minimum of 1 full asset, else a refund
# Optional. 1e8 or scientific notation
trade_limit = 0
# Swap interval in blocks. Optional. If 0, do not stream
stream_interval = 1
# Swap quantity. The interval value determines the frequency of swaps in blocks
# Optional. If 0, network will determine the number of swaps
stream_quantity = 0
max_len = 250
function = 'SWAP'
asset_abbrevs = {
'BTC.BTC': 'b',
'LTC.LTC': 'l',
'BCH.BCH': 'c',
'ETH.ETH': 'e',
'DOGE.DOGE': 'd',
'THOR.RUNE': 'r',
}
function_abbrevs = {
'SWAP': '=',
}
@classmethod
def is_partial_memo(cls, s):
import re
ops = {
'swap': ('SWAP', 's', '='),
'add': ('ADD', 'a', r'\+'),
'withdraw': ('WITHDRAW', 'wd', '-'),
'loan': (r'LOAN(\+|-)', r'\$(\+|-)'), # open/repay
'pool': (r'POOL(\+|-)',),
'trade': (r'TRADE(\+|-)',),
'secure': (r'SECURE(\+|-)',),
'misc': ('BOND', 'UNBOND', 'LEAVE', 'MIGRATE', 'NOOP', 'DONATE', 'RESERVE'),
}
pat = r'^(' + '|'.join('|'.join(pats) for pats in ops.values()) + r'):\S\S+'
return bool(re.search(pat, str(s)))
@classmethod
def parse(cls, s):
"""
All fields are validated, excluding address (cannot validate, since network is unknown)
"""
from collections import namedtuple
from ....exception import SwapMemoParseError
from ....util import is_int
def get_item(desc):
try:
return fields.pop(0)
except IndexError as e:
raise SwapMemoParseError(f'malformed {proto_name} memo (missing {desc} field)') from e
def get_id(data, item, desc):
if item in data:
return item
rev_data = {v:k for k,v in data.items()}
if item in rev_data:
return rev_data[item]
raise SwapMemoParseError(f'{item!r}: unrecognized {proto_name} {desc} abbreviation')
fields = str(s).split(':')
if len(fields) < 4:
raise SwapMemoParseError('memo must contain at least 4 comma-separated fields')
function = get_id(cls.function_abbrevs, get_item('function'), 'function')
chain, asset = get_id(cls.asset_abbrevs, get_item('asset'), 'asset').split('.')
address = get_item('address')
desc = 'trade_limit/stream_interval/stream_quantity'
lsq = get_item(desc)
try:
limit, interval, quantity = lsq.split('/')
except ValueError as e:
raise SwapMemoParseError(f'malformed memo (failed to parse {desc} field) [{lsq}]') from e
for n in (limit, interval, quantity):
if not is_int(n):
raise SwapMemoParseError(f'malformed memo (non-integer in {desc} field [{lsq}])')
if fields:
raise SwapMemoParseError('malformed memo (unrecognized extra data)')
ret = namedtuple(
'parsed_memo',
['proto', 'function', 'chain', 'asset', 'address', 'trade_limit', 'stream_interval', 'stream_quantity'])
return ret(proto_name, function, chain, asset, address, int(limit), int(interval), int(quantity))
def __init__(self, proto, addr, chain=None):
self.proto = proto
self.chain = chain or proto.coin
from ....addr import CoinAddr
assert isinstance(addr, CoinAddr)
self.addr = addr.views[addr.view_pref]
assert not ':' in self.addr # colon is record separator, so address mustn’t contain one
def __str__(self):
suf = '/'.join(str(n) for n in (self.trade_limit, self.stream_interval, self.stream_quantity))
asset = f'{self.chain}.{self.proto.coin}'
ret = ':'.join([
self.function_abbrevs[self.function],
self.asset_abbrevs[asset],
self.addr,
suf])
assert len(ret) <= self.max_len, f'{proto_name} memo exceeds maximum length of {self.max_len}'
return ret

View file

@ -0,0 +1,113 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.proto.thorchain.midgard: THORChain swap protocol network query ops
"""
import json
class MidgardRPCClient:
http_hdrs = {'Content-Type': 'application/json'}
proto = 'https'
host = 'thornode.ninerealms.com'
verify = True
timeout = 5
def __init__(self, tx, proto=None, host=None):
self.cfg = tx.cfg
if proto:
self.proto = proto
if host:
self.host = host
import requests
self.session = requests.Session()
self.session.trust_env = False # ignore *_PROXY environment vars
self.session.headers = self.http_hdrs
if self.cfg.proxy:
self.session.proxies.update({
'http': f'socks5h://{self.cfg.proxy}',
'https': f'socks5h://{self.cfg.proxy}'
})
def get(self, path, timeout=None):
return self.session.get(
url = self.proto + '://' + self.host + path,
timeout = timeout or self.timeout,
verify = self.verify)
class Midgard:
def __init__(self, tx, amt):
self.tx = tx
self.in_amt = amt
self.rpc = MidgardRPCClient(tx)
def get_quote(self):
self.get_str = '/thorchain/quote/swap?from_asset={a}.{a}&to_asset={b}.{b}&amount={c}'.format(
a = self.tx.send_proto.coin,
b = self.tx.recv_proto.coin,
c = self.in_amt.to_unit('satoshi'))
self.result = self.rpc.get(self.get_str)
self.data = json.loads(self.result.content)
def format_quote(self):
from ....util import make_timestr, pp_fmt, die
from ....util2 import format_elapsed_hr
from ....color import blue, cyan, pink, orange
from . import name
d = self.data
if not 'expiry' in d:
die(2, pp_fmt(d))
tx = self.tx
in_coin = tx.send_proto.coin
out_coin = tx.recv_proto.coin
out_amt = tx.recv_proto.coin_amt(int(d['expected_amount_out']), from_unit='satoshi')
min_in_amt = tx.send_proto.coin_amt(int(d['recommended_min_amount_in']), from_unit='satoshi')
gas_unit = {
'satsperbyte': 'sat/byte',
}.get(d['gas_rate_units'], d['gas_rate_units'])
elapsed_disp = format_elapsed_hr(d['expiry'], future_msg='from now')
fees = d['fees']
fees_t = tx.recv_proto.coin_amt(int(fees['total']), from_unit='satoshi')
fees_pct_disp = str(fees['total_bps'] / 100) + '%'
slip_pct_disp = str(fees['slippage_bps'] / 100) + '%'
hdr = f'SWAP QUOTE (source: {self.rpc.host})'
return f"""
{cyan(hdr)}
Protocol: {blue(name)}
Direction: {orange(f'{in_coin} => {out_coin}')}
Vault address: {cyan(d['inbound_address'])}
Quote expires: {pink(elapsed_disp)} [{make_timestr(d['expiry'])}]
Amount in: {self.in_amt.hl()} {in_coin}
Expected amount out: {out_amt.hl()} {out_coin}
Rate: {(out_amt / self.in_amt).hl()} {out_coin}/{in_coin}
Reverse rate: {(self.in_amt / out_amt).hl()} {in_coin}/{out_coin}
Recommended minimum in amount: {min_in_amt.hl()} {in_coin}
Recommended fee: {pink(d['recommended_gas_rate'])} {pink(gas_unit)}
Fees:
Total: {fees_t.hl()} {out_coin} ({pink(fees_pct_disp)})
Slippage: {pink(slip_pct_disp)}
"""
@property
def inbound_address(self):
return self.data['inbound_address']
@property
def rel_fee_hint(self):
if self.data['gas_rate_units'] == 'satsperbyte':
return f'{self.data["recommended_gas_rate"]}s'
def __str__(self):
from pprint import pformat
return pformat(self.data)

View file

@ -0,0 +1,28 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
swap.proto.thorchain.params: THORChain swap protocol parameters
"""
class params:
coins = {
'send': {
'BTC': 'Bitcoin',
'LTC': 'Litecoin',
'BCH': 'Bitcoin Cash',
},
'receive': {
'BTC': 'Bitcoin',
'LTC': 'Litecoin',
'BCH': 'Bitcoin Cash',
}
}

View file

@ -338,6 +338,8 @@ class TwAddresses(TwView):
Find the lowest-indexed change addresses in tracking wallet of given address type,
present them in a menu and return a single change address chosen by the user.
If mmtype is None, search all preferred_mmtypes in tracking wallet
Return values on failure:
None: no addresses in wallet of requested address type
False: no unused addresses in wallet of requested address type
@ -363,10 +365,24 @@ class TwAddresses(TwView):
return addrs[int(res)-1]
msg(f'{res}: invalid entry')
assert isinstance(mmtype, MMGenAddrType)
def get_addr(mmtype):
return [self.get_change_address(f'{sid}:{mmtype}', r.bot, r.top, exclude=exclude, desc=desc)
for sid, r in self.sid_ranges.items()]
res = [self.get_change_address(f'{sid}:{mmtype}', r.bot, r.top, exclude)
for sid, r in self.sid_ranges.items()]
assert isinstance(mmtype, (type(None), MMGenAddrType))
if mmtype:
res = get_addr(mmtype)
else:
have_used = False
for mmtype in self.proto.preferred_mmtypes:
res = get_addr(mmtype)
if any(res):
break
if False in res:
have_used = True
else:
return False if have_used else None
if any(res):
res = list(filter(None, res))

View file

@ -81,6 +81,9 @@ class Base(MMGenObject):
signed = False
is_bump = False
is_swap = False
swap_proto = None
swap_quote_expiry = None
swap_recv_addr_mmid = None
file_format = 'json'
non_mmgen_inputs_msg = f"""
This transaction includes inputs with non-{gc.proj_name} addresses. When
@ -100,6 +103,7 @@ class Base(MMGenObject):
class Output(MMGenTxIO):
is_chg = ListItemAttr(bool, typeconv=False)
is_vault = ListItemAttr(bool, typeconv=False)
data = ListItemAttr(None, typeconv=False) # placeholder
class InputList(MMGenTxIOList):

View file

@ -12,17 +12,21 @@
tx.bump: transaction bump class
"""
from .new import New
from .new_swap import NewSwap
from .completed import Completed
from ..util import msg, ymsg, is_int, die
from ..color import pink
class Bump(Completed, New):
class Bump(Completed, NewSwap):
desc = 'fee-bumped transaction'
ext = 'rawtx'
bump_output_idx = None
is_bump = True
swap_attrs = ('is_swap',)
swap_attrs = (
'is_swap',
'swap_proto',
'swap_quote_expiry',
'swap_recv_addr_mmid')
def __init__(self, *, check_sent, new_outputs, **kwargs):
@ -79,7 +83,16 @@ class Bump(Completed, New):
pink(self.fee_abs2rel(self.min_fee)),
self.rel_fee_disp))
self.usr_fee = self.get_usr_fee_interactive(fee=self.cfg.fee, desc='User-selected')
if self.is_swap:
self.send_proto = self.proto
self.recv_proto = self.check_swap_memo().proto
fee_hint = self.update_vault_output(self.send_amt)
else:
fee_hint = None
self.usr_fee = self.get_usr_fee_interactive(
fee = self.cfg.fee or fee_hint,
desc = 'User-selected' if self.cfg.fee else 'Recommended' if fee_hint else None)
self.bump_fee(output_idx, self.usr_fee)

View file

@ -70,7 +70,10 @@ class MMGenTxFile(MMGenObject):
'comment': MMGenTxComment,
'coin_txid': CoinTxID,
'sent_timestamp': None,
'is_swap': None}
'is_swap': None,
'swap_proto': None,
'swap_quote_expiry': None,
'swap_recv_addr_mmid': None}
def __init__(self, tx):
self.tx = tx

View file

@ -15,7 +15,7 @@ tx.info: transaction info class
import importlib
from ..cfg import gc
from ..color import red, green, cyan, orange
from ..color import red, green, cyan, orange, blue, yellow, magenta
from ..util import msg, msg_r, decode_timestamp, make_timestr
from ..util2 import format_elapsed_hr
@ -51,7 +51,7 @@ class TxInfo:
def gen_view():
yield (self.txinfo_hdr_fs_short if terse else self.txinfo_hdr_fs).format(
hdr = cyan('TRANSACTION DATA'),
hdr = cyan(('SWAP ' if tx.is_swap else '') + 'TRANSACTION DATA'),
i = tx.txid.hl(),
a = tx.send_amt.hl(),
c = tx.dcoin,
@ -72,6 +72,18 @@ class TxInfo:
if tx.coin_txid:
yield f' {tx.coin} TxID: {tx.coin_txid.hl()}\n'
if tx.is_swap:
from ..swap.proto.thorchain.memo import Memo, proto_name
if Memo.is_partial_memo(tx.data_output.data):
p = Memo.parse(tx.data_output.data)
yield ' {} {}\n'.format(magenta('DEX Protocol:'), blue(proto_name))
yield ' Swap: {}\n'.format(orange(f'{tx.proto.coin} => {p.asset}'))
yield ' Dest: {}{}\n'.format(
cyan(p.address),
orange(f' ({tx.swap_recv_addr_mmid})') if tx.swap_recv_addr_mmid else '')
if not tx.swap_recv_addr_mmid:
yield yellow(' Warning: swap destination address is not a wallet address!\n')
enl = ('\n', '')[bool(terse)]
yield enl

View file

@ -170,24 +170,27 @@ class New(Base):
return False
return True
def add_output(self, coinaddr, amt, is_chg=False, data=None):
self.outputs.append(self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg, data=data))
def add_output(self, coinaddr, amt, is_chg=False, is_vault=False, data=None):
self.outputs.append(
self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg, is_vault=is_vault, data=data))
def process_data_output_arg(self, arg):
return None
def parse_cmdline_arg(self, proto, arg_in, ad_f, ad_w):
_pa = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data'])
_pa = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data', 'is_vault'])
if data := self.process_data_output_arg(arg_in):
return _pa(arg_in, None, None, None, data)
return _pa(arg_in, None, None, None, data, False)
arg, amt = arg_in.split(',', 1) if ',' in arg_in else (arg_in, None)
coin_addr, mmid = (None, None)
coin_addr, mmid, is_vault = (None, None, False)
if mmid := get_obj(MMGenID, proto=proto, id_str=arg, silent=True):
if arg == 'vault' and self.is_swap:
is_vault = True
elif mmid := get_obj(MMGenID, proto=proto, id_str=arg, silent=True):
coin_addr = mmaddr2coinaddr(self.cfg, arg, ad_w, ad_f, proto)
elif is_coin_addr(proto, arg):
coin_addr = CoinAddr(proto, arg)
@ -198,13 +201,16 @@ class New(Base):
else:
die(2, f'{arg_in}: invalid command-line argument')
return _pa(arg, mmid, coin_addr, amt, None)
return _pa(arg, mmid, coin_addr, amt, None, is_vault)
async def get_autochg_addr(self, proto, arg, exclude, desc):
async def get_autochg_addr(self, proto, arg, exclude, desc, all_addrtypes=False):
from ..tw.addresses import TwAddresses
al = await TwAddresses(self.cfg, proto, get_data=True)
if obj := get_obj(MMGenAddrType, proto=proto, id_str=arg, silent=True):
if all_addrtypes:
res = al.get_change_address_by_addrtype(None, exclude=exclude, desc=desc)
req_desc = 'of any allowed address type'
elif obj := get_obj(MMGenAddrType, proto=proto, id_str=arg, silent=True):
res = al.get_change_address_by_addrtype(obj, exclude=exclude, desc=desc)
req_desc = f'of address type {arg!r}'
else:
@ -233,14 +239,15 @@ class New(Base):
self.add_output(None, self.proto.coin_amt('0'), data=a.data)
else:
self.add_output(
coinaddr = a.addr or (
coinaddr = None if a.is_vault else a.addr or (
await self.get_autochg_addr(
self.proto,
a.arg,
exclude = [a.mmid for a in parsed_args if a.mmid],
desc = 'change address')).addr,
amt = self.proto.coin_amt(a.amt or '0'),
is_chg = not a.amt)
is_chg = not a.amt,
is_vault = a.is_vault)
if self.chg_idx is None:
die(2,
@ -261,7 +268,7 @@ class New(Base):
self.check_dup_addrs('outputs')
if self.chg_output is not None:
if self.chg_autoselected:
if self.chg_autoselected and not self.is_swap: # swap TX, so user has already confirmed
self.confirm_autoselected_addr(self.chg_output.mmid, 'change address')
elif len(self.nondata_outputs) > 1:
await self.warn_addr_used(self.proto, self.chg_output, 'change address')
@ -430,8 +437,8 @@ class New(Base):
if not do_info:
cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
if self.is_swap:
# updates self.proto!
self.proto, cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
self.proto = self.send_proto # updating self.proto!
from ..rpc import rpc_init
self.rpc = await rpc_init(self.cfg, self.proto)
from ..addrdata import TwAddrData
@ -489,6 +496,9 @@ class New(Base):
if not self.cfg.yes:
self.add_comment() # edits an existing comment
if self.is_swap:
self.update_vault_output(self.vault_output.amt)
await self.create_serialized(locktime=locktime) # creates self.txid too
self.add_timestamp()

View file

@ -16,7 +16,27 @@ from .new import New
class NewSwap(New):
desc = 'swap transaction'
is_swap = True
async def process_swap_cmdline_args(self, cmd_args, addrfiles):
raise NotImplementedError(f'Swap not implemented for protocol {self.proto.__name__}')
def update_vault_output(self, amt):
import importlib
sp = importlib.import_module(f'mmgen.swap.proto.{self.swap_proto}')
c = sp.rpc_client(self, amt)
from ..util import msg
from ..term import get_char
while True:
self.cfg._util.qmsg(f'Retrieving data from {c.rpc.host}...')
c.get_quote()
self.cfg._util.qmsg('OK')
msg(c.format_quote())
ch = get_char('Press ‘r’ to refresh quote, any other key to continue: ')
msg('')
if ch not in 'Rr':
break
self.swap_quote_expiry = c.data['expiry']
self.update_vault_addr(c.inbound_address)
return c.rel_fee_hint

View file

@ -21,6 +21,22 @@ class OnlineSigned(Signed):
from . import _base_proto_subclass
return _base_proto_subclass('Status', 'status', self.proto)(self)
def check_swap_expiry(self):
import time
from ..util import msg, make_timestr, die
from ..util2 import format_elapsed_hr
from ..color import pink, yellow
expiry = self.swap_quote_expiry
now = int(time.time())
t_rem = expiry - now
clr = yellow if t_rem < 0 else pink
msg('Swap quote {a} {b} [{c}]'.format(
a = clr('expired' if t_rem < 0 else 'expires'),
b = clr(format_elapsed_hr(expiry, now=now, future_msg='from now')),
c = make_timestr(expiry)))
if t_rem < 0:
die(2, 'Swap quote has expired. Please re-create the transaction')
def confirm_send(self):
from ..util import msg
from ..ui import confirm_or_raise

View file

@ -178,12 +178,18 @@ async def txsign(cfg_parm, tx, seed_files, kl, kal, tx_num_str='', passwd_file=N
sep + sep.join(missing)))
keys += tmp.data
sm_output = tx.check_swap_memo() # do this for non-swap transactions too!
if cfg.mmgen_keys_from_file:
keys += add_keys('inputs', tx.inputs, keyaddr_list=kal)
add_keys('outputs', tx.outputs, keyaddr_list=kal)
if sm_output:
add_keys('swap destination address', [sm_output], keyaddr_list=kal)
keys += add_keys('inputs', tx.inputs, seed_files, saved_seeds)
add_keys('outputs', tx.outputs, seed_files, saved_seeds)
if sm_output:
add_keys('swap destination address', [sm_output], seed_files, saved_seeds)
# this (boolean) attr isn't needed in transaction file
tx.delete_attrs('inputs', 'have_wif')

View file

@ -86,6 +86,9 @@ packages =
mmgen.proto.secp256k1
mmgen.proto.xmr
mmgen.proto.zec
mmgen.swap
mmgen.swap.proto
mmgen.swap.proto.thorchain
mmgen.tool
mmgen.tx
mmgen.tw

View file

@ -1194,8 +1194,8 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
t.expect(f'Mined {num_blocks} block')
return t
def _do_cli(self, cmd_args, decode_json=False):
return self._do_mmgen_regtest(['cli'] + cmd_args, decode_json=decode_json)
def _do_cli(self, cmd_args, add_opts=[], decode_json=False):
return self._do_mmgen_regtest(add_opts + ['cli'] + cmd_args, decode_json=decode_json)
def _do_mmgen_regtest(self, cmd_args, decode_json=False):
ret = self.spawn(

View file

@ -12,39 +12,54 @@
test.cmdtest_d.ct_swap: asset swap tests for the cmdtest.py test suite
"""
from pathlib import Path
from mmgen.protocol import init_proto
from ..include.common import gr_uc
from .ct_regtest import CmdTestRegtest, rt_data, dfl_wcls, rt_pw, cfg
from ..include.common import make_burn_addr, gr_uc
from .common import dfl_bip39_file
from .midgard import run_midgard_server
from .ct_autosign import CmdTestAutosign, CmdTestAutosignThreaded
from .ct_regtest import CmdTestRegtest, rt_data, dfl_wcls, rt_pw, cfg, strip_ansi_escapes
sample1 = gr_uc[:24]
sample2 = '00010203040506'
class CmdTestSwap(CmdTestRegtest):
def midgard_server_start():
import threading
t = threading.Thread(target=run_midgard_server, name='Midgard server thread')
t.daemon = True
t.start()
class CmdTestSwap(CmdTestRegtest, CmdTestAutosignThreaded):
bdb_wallet = True
networks = ('btc',)
tmpdir_nums = [37]
passthru_opts = ('rpc_backend',)
coins = ('btc',)
need_daemon = True
cmd_group_in = (
('setup', 'regtest (Bob and Alice) mode setup'),
('subgroup.init_bob', []),
('subgroup.fund_bob', ['init_bob']),
('subgroup.data', ['fund_bob']),
('subgroup.swap', ['fund_bob']),
('stop', 'stopping regtest daemon'),
('subgroup.init_data', []),
('subgroup.data', ['init_data']),
('subgroup.init_swap', []),
('subgroup.create', ['init_swap']),
('subgroup.create_bad', ['init_swap']),
('subgroup.signsend', ['init_swap']),
('subgroup.signsend_bad', ['init_swap']),
('subgroup.autosign', ['init_data', 'signsend']),
('stop', 'stopping regtest daemons'),
)
cmd_subgroups = {
'init_bob': (
'creating Bob’s MMGen wallet and tracking wallet',
('walletgen_bob', 'wallet generation (Bob)'),
('addrgen_bob', 'address generation (Bob)'),
('addrimport_bob', 'importing Bob’s addresses'),
),
'fund_bob': (
'funding Bob’s wallet',
('fund_bob1', 'funding Bob’s wallet (bech32)'),
('fund_bob2', 'funding Bob’s wallet (native Segwit)'),
('bob_bal', 'displaying Bob’s balance'),
'init_data': (
'Initialize regtest setup for OP_RETURN data operations',
('setup', 'regtest (Bob and Alice) mode setup'),
('walletcreate_bob', 'wallet creation (Bob)'),
('addrgen_bob', 'address generation (Bob)'),
('addrimport_bob', 'importing Bob’s addresses'),
('fund_bob1', 'funding Bob’s wallet (bech32)'),
('fund_bob2', 'funding Bob’s wallet (native Segwit)'),
('bob_bal', 'displaying Bob’s balance'),
),
'data': (
'OP_RETURN data operations',
@ -58,15 +73,101 @@ class CmdTestSwap(CmdTestRegtest):
('generate3', 'Generate 3 blocks'),
('bob_listaddrs', 'Display Bob’s addresses'),
),
'swap': (
'Swap operations',
('bob_swaptxcreate1', 'Create a swap transaction'),
'init_swap': (
'Initialize regtest setup for swap operations',
('setup_send_coin', 'setting up the sending coin regtest blockchain'),
('walletcreate_bob', 'wallet creation (Bob)'),
('addrgen_bob_send', 'address generation (Bob, sending coin)'),
('addrimport_bob_send', 'importing Bob’s addresses (sending coin)'),
('fund_bob_send', 'funding Bob’s wallet (bech32)'),
('bob_bal_send', 'displaying Bob’s send balance'),
('setup_recv_coin', 'setting up the receiving coin regtest blockchain'),
('addrgen_bob_recv', 'address generation (Bob, receiving coin)'),
('addrimport_bob_recv', 'importing Bob’s addresses (receiving coin)'),
('fund_bob_recv1', 'funding Bob’s wallet (bech32)'),
('fund_bob_recv2', 'funding Bob’s wallet (native Segwit)'),
('addrgen_bob_recv_subwallet', 'address generation (Bob, receiving coin)'),
('addrimport_bob_recv_subwallet', 'importing Bob’s addresses (receiving coin)'),
('fund_bob_recv_subwallet', 'funding Bob’s subwwallet (native Segwit)'),
('bob_bal_recv', 'displaying Bob’s receive balance'),
),
'create': (
'Swap TX create operations (BCH => LTC)',
('swaptxcreate1', 'creating a swap transaction (full args)'),
('swaptxcreate2', 'creating a swap transaction (coin args only)'),
('swaptxcreate3', 'creating a swap transaction (no chg arg)'),
('swaptxcreate4', 'creating a swap transaction (chg and dest by addrtype)'),
('swaptxcreate5', 'creating a swap transaction (chg and dest by addrlist ID)'),
('swaptxcreate6', 'creating a swap transaction (dest is non-wallet addr)'),
('swaptxcreate7', 'creating a swap transaction (coin-amt-coin)'),
),
'create_bad': (
'Swap TX create operations: error handling',
('swaptxcreate_bad1', 'creating a swap transaction (bad, used destination address)'),
('swaptxcreate_bad2', 'creating a swap transaction (bad, used change address)'),
('swaptxcreate_bad3', 'creating a swap transaction (bad, unsupported send coin)'),
('swaptxcreate_bad4', 'creating a swap transaction (bad, unsupported recv coin)'),
('swaptxcreate_bad5', 'creating a swap transaction (bad, malformed cmdline)'),
('swaptxcreate_bad6', 'creating a swap transaction (bad, malformed cmdline)'),
('swaptxcreate_bad7', 'creating a swap transaction (bad, bad user input, user exit)'),
('swaptxcreate_bad8', 'creating a swap transaction (bad, non-MMGen change address)'),
('swaptxcreate_bad9', 'creating a swap transaction (bad, invalid addrtype)'),
),
'signsend': (
'Swap TX create, sign and send operations (LTC => BCH)',
('swaptxsign1_create', 'creating a swap transaction (full args)'),
('swaptxsign1', 'signing the transaction'),
('swaptxsend1', 'sending the transaction'),
('swaptxsend1_status', 'getting status of sent transaction'),
('generate1', 'generating a block'),
('swaptxsign2_create', 'creating a swap transaction (non-wallet swap address)'),
('swaptxsign2', 'signing the transaction'),
('swaptxsend2', 'sending the transaction'),
('mempool1', 'viewing the mempool'),
('swaptxbump1', 'bumping the transaction'),
('swaptxsign3', 'signing the transaction'),
('swaptxsend3', 'sending the transaction'),
('mempool1', 'viewing the mempool'),
('swaptxbump2', 'bumping the transaction again'),
('swaptxsign4', 'signing the transaction'),
('swaptxsend4', 'sending the transaction'),
('mempool1', 'viewing the mempool'),
('generate1', 'generating a block'),
('swap_bal1', 'checking the balance'),
('swaptxsign1_do', 'creating, signing and sending a swap transaction'),
('generate1', 'generating a block'),
('swap_bal2', 'checking the balance'),
),
'signsend_bad': (
'Swap TX create, sign and send operations: error handling',
('swaptxsign_bad1_create', 'creating a swap transaction (non-wallet swap address)'),
('swaptxsign_bad1', 'signing the transaction (non-wallet swap address)'),
('swaptxsign_bad2_create', 'creating a swap transaction'),
('swaptxsign_bad2', 'signing the transaction'),
('swaptxsend_bad2', 'sending the transaction (swap quote expired)'),
),
'autosign': (
'Swap TX operations with autosigning (BTC => LTC)',
('run_setup_bip39', 'setting up offline autosigning'),
('swap_wait_loop_start', 'starting autosign wait loop'),
('autosign_swaptxcreate1', 'creating a swap transaction'),
('autosign_swaptxsend1', 'sending the transaction'),
('autosign_swaptxbump1', 'bumping the transaction'),
('autosign_swaptxsend2', 'sending the transaction'),
('generate0', 'generating a block'),
('swap_bal3', 'checking the balance'),
('wait_loop_kill', 'stopping autosign wait loop'),
),
}
def __init__(self, trunner, cfgs, spawn):
super().__init__(trunner, cfgs, spawn)
CmdTestAutosignThreaded.__init__(self, trunner, cfgs, spawn)
CmdTestRegtest.__init__(self, trunner, cfgs, spawn)
if trunner is None:
return
globals_dict = globals()
for k in rt_data:
@ -74,10 +175,29 @@ class CmdTestSwap(CmdTestRegtest):
self.protos = [init_proto(cfg, k, network='regtest', need_amt=True) for k in ('btc', 'ltc', 'bch')]
midgard_server_start() # TODO: stop server when test group finishes executing
self.opts.append('--bob')
@property
def sid(self):
return self._user_sid('bob')
def walletcreate_bob(self):
dest = Path(self.tr.data_dir, 'regtest', 'bob')
dest.mkdir(exist_ok=True)
t = self.spawn('mmgen-walletconv', [
'--quiet',
'--usr-randchars=0',
'--hash-preset=1',
'--label=SwapWalletLabel',
f'--outdir={str(dest)}',
dfl_bip39_file])
t.expect('wallet: ', rt_pw + '\n')
t.expect('phrase: ', rt_pw + '\n')
t.written_to_file('wallet')
return t
def _addrgen_bob(self, proto_idx, mmtypes, subseed_idx=None):
return self.addrgen('bob', subseed_idx=subseed_idx, mmtypes=mmtypes, proto=self.protos[proto_idx])
@ -186,8 +306,412 @@ class CmdTestSwap(CmdTestRegtest):
t = self.spawn('mmgen-tool', ['--bob', 'listaddresses'])
return t
def bob_swaptxcreate1(self):
t = self.spawn(
'mmgen-swaptxcreate',
['-d', self.tmpdir, '-B', '--bob', 'BTC', '1.234', f'{self.sid}:S:3', 'LTC'])
def setup_send_coin(self):
self.user_sids = {}
return self._setup(proto=self.protos[2], remove_datadir=True)
def addrgen_bob_send(self):
return self._addrgen_bob(2, ['C'])
def addrimport_bob_send(self):
return self.addrimport('bob', mmtypes=['C'], proto=self.protos[2])
def fund_bob_send(self):
return self._fund_bob(2, 'C', '500')
def bob_bal_send(self):
return self._bob_bal(2, '500')
def setup_recv_coin(self):
return self._setup(proto=self.protos[1], remove_datadir=False)
def addrgen_bob_recv(self):
return self._addrgen_bob(1, ['S', 'B'])
def addrimport_bob_recv(self):
return self._addrimport_bob(1)
def fund_bob_recv1(self):
return self._fund_bob(1, 'S', '500')
def fund_bob_recv2(self):
return self._fund_bob(1, 'B', '500')
def addrgen_bob_recv_subwallet(self):
return self._addrgen_bob(1, ['C', 'B'], subseed_idx='29L')
def addrimport_bob_recv_subwallet(self):
return self._subwallet_addrimport('bob', '29L', ['C', 'B'], proto=self.protos[1])
def fund_bob_recv_subwallet(self, proto_idx=1, amt='500'):
coin_arg = f'--coin={self.protos[proto_idx].coin}'
t = self.spawn('mmgen-tool', ['--bob', coin_arg, 'listaddresses'])
addr = [s for s in strip_ansi_escapes(t.read()).splitlines() if 'C:1 No' in s][0].split()[3]
t = self.spawn('mmgen-regtest', [coin_arg, 'send', addr, str(amt)], no_passthru_opts=True, no_msg=True)
return t
def bob_bal_recv(self):
return self._bob_bal(1, '1500')
def _swaptxcreate_ui_common(
self,
t,
*,
inputs = '1',
interactive_fee = None,
file_desc = 'Unsigned transaction',
reload_quote = False,
sign_and_send = False):
t.expect('abel:\b', 'q')
t.expect('to spend: ', f'{inputs}\n')
if reload_quote:
t.expect('to continue: ', 'r') # reload swap quote
t.expect('to continue: ', '\n') # exit swap quote view
t.expect('(Y/n): ', 'y') # fee OK?
t.expect('(Y/n): ', 'y') # change OK?
t.expect('(y/N): ', 'n') # add comment?
if reload_quote:
t.expect('to continue: ', 'r') # reload swap quote
t.expect('to continue: ', '\n') # exit swap quote view
t.expect('view: ', 'y') # view TX
t.expect('to continue: ', '\n')
if sign_and_send:
t.passphrase(dfl_wcls.desc, rt_pw)
t.expect('to confirm: ', 'YES\n')
else:
t.expect('(y/N): ', 'y') # save?
t.written_to_file(file_desc)
return t
def _swaptxcreate(self, args, *, action='txcreate', add_opts=[], exit_val=None):
return self.spawn(
f'mmgen-swap{action}',
['-q', '-d', self.tmpdir, '-B', '--bob']
+ add_opts
+ args,
exit_val = exit_val)
def swaptxcreate1(self, idx=3):
return self._swaptxcreate_ui_common(
self._swaptxcreate(['BCH', '1.234', f'{self.sid}:C:{idx}', 'LTC', f'{self.sid}:B:3']))
def swaptxcreate2(self):
t = self._swaptxcreate(['BCH', 'LTC'], add_opts=['--no-quiet'])
t.expect('Enter a number> ', '1')
t.expect('OK? (Y/n): ', 'y')
return self._swaptxcreate_ui_common(t, reload_quote=True)
def swaptxcreate3(self):
return self._swaptxcreate_ui_common(
self._swaptxcreate(['BCH', 'LTC', f'{self.sid}:B:3']))
def swaptxcreate4(self):
t = self._swaptxcreate(['BCH', '1.234', 'C', 'LTC', 'B'])
t.expect('OK? (Y/n): ', 'y')
t.expect('Enter a number> ', '1')
t.expect('OK? (Y/n): ', 'y')
return self._swaptxcreate_ui_common(t)
def swaptxcreate5(self):
t = self._swaptxcreate(['BCH', '1.234', f'{self.sid}:C', 'LTC', f'{self.sid}:B'])
t.expect('OK? (Y/n): ', 'y')
t.expect('OK? (Y/n): ', 'y')
return self._swaptxcreate_ui_common(t)
def swaptxcreate6(self):
addr = make_burn_addr(self.protos[1], mmtype='bech32')
t = self._swaptxcreate(['BCH', '1.234', f'{self.sid}:C', 'LTC', addr])
t.expect('OK? (Y/n): ', 'y')
t.expect('to confirm: ', 'YES\n')
return self._swaptxcreate_ui_common(t)
def swaptxcreate7(self):
t = self._swaptxcreate(['BCH', '0.56789', 'LTC'])
t.expect('OK? (Y/n): ', 'y')
t.expect('Enter a number> ', '1')
t.expect('OK? (Y/n): ', 'y')
return self._swaptxcreate_ui_common(t)
def _swaptxcreate_bad(self, args, *, exit_val=1, expect1=None, expect2=None):
t = self._swaptxcreate(args, exit_val=exit_val)
if expect1:
t.expect(expect1)
if expect2:
t.expect(expect2)
return t
def swaptxcreate_bad1(self):
t = self._swaptxcreate_bad(
['BCH', '1.234', f'{self.sid}:C:3', 'LTC', f'{self.sid}:S:1'],
expect1 = 'Requested destination address',
expect2 = 'Address reuse harms your privacy')
t.expect('(y/N): ', 'n')
return t
def swaptxcreate_bad2(self):
t = self._swaptxcreate_bad(
['BCH', '1.234', f'{self.sid}:C:1', 'LTC', f'{self.sid}:S:2'],
expect1 = 'Requested change address',
expect2 = 'Address reuse harms your privacy')
t.expect('(y/N): ', 'n')
return t
def swaptxcreate_bad3(self):
return self._swaptxcreate_bad(['RTC', 'LTC'], expect1='unsupported send coin')
def swaptxcreate_bad4(self):
return self._swaptxcreate_bad(['LTC', 'XTC'], expect1='unsupported receive coin')
def swaptxcreate_bad5(self):
return self._swaptxcreate_bad(['LTC'], expect1='USAGE:')
def swaptxcreate_bad6(self):
return self._swaptxcreate_bad(['LTC', '1.2345'], expect1='USAGE:')
def swaptxcreate_bad7(self):
t = self._swaptxcreate(['BCH', 'LTC'], exit_val=1)
t.expect('Enter a number> ', '3')
t.expect('Enter a number> ', '1')
t.expect('OK? (Y/n): ', 'n')
return t
def swaptxcreate_bad8(self):
addr = make_burn_addr(self.protos[2], mmtype='compressed')
t = self._swaptxcreate_bad(['BCH', '1.234', addr, 'LTC', 'S'])
t.expect('to confirm: ', 'NO\n')
return t
def swaptxcreate_bad9(self):
return self._swaptxcreate_bad(['BCH', '1.234', 'S', 'LTC', 'B'], exit_val=2, expect1='invalid command-')
def swaptxsign1_create(self):
self.get_file_with_ext('rawtx', delete_all=True)
return self._swaptxcreate_ui_common(
self._swaptxcreate(['LTC', '5.4321', f'{self.sid}:S:2', 'BCH', f'{self.sid}:C:2']))
def swaptxsign1(self):
return self._swaptxsign()
def swaptxsend1(self):
return self._swaptxsend1()
def swaptxsend1_status(self):
t = self._swaptxsend1(add_opts=['--status'], spawn_only=True)
t.expect('in mempool')
return t
def _swaptxsend1(self, *, add_opts=[], spawn_only=False):
return self._swaptxsend(
add_opts = add_opts + [
# test overriding host:port with coin-specific options:
'--rpc-host=unreachable', # unreachable host
'--ltc-rpc-host=localhost',
'--rpc-port=46381', # bad port
'--ltc-rpc-port=20680',
],
spawn_only = spawn_only)
def _swaptxsend(self, *, add_opts=[], spawn_only=False):
fn = self.get_file_with_ext('sigtx')
t = self.spawn('mmgen-txsend', add_opts + ['-q', '-d', self.tmpdir, '--bob', fn])
if spawn_only:
return t
t.expect('view: ', 'v')
t.expect('(y/N): ', 'n')
t.expect('to confirm: ', 'YES\n')
return t
def _swaptxsign(self, *, add_opts=[], expect=None):
self.get_file_with_ext('sigtx', delete_all=True)
fn = self.get_file_with_ext('rawtx')
t = self.spawn('mmgen-txsign', add_opts + ['-d', self.tmpdir, '--bob', fn])
t.view_tx('t')
if expect:
t.expect(expect)
t.passphrase(dfl_wcls.desc, rt_pw)
t.do_comment(None)
t.expect('(Y/n): ', 'y')
t.written_to_file('Signed transaction')
return t
def swaptxsign2_create(self):
self.get_file_with_ext('rawtx', delete_all=True)
addr = make_burn_addr(self.protos[2], mmtype='compressed')
t = self._swaptxcreate(['LTC', '4.56789', f'{self.sid}:S:3', 'BCH', addr])
t.expect('to confirm: ', 'YES\n') # confirm non-MMGen destination
return self._swaptxcreate_ui_common(t)
def swaptxsign2(self):
return self._swaptxsign(add_opts=['--allow-non-wallet-swap'], expect='swap to non-wallet address')
def swaptxsend2(self):
return self._swaptxsend()
def swaptxbump1(self):
return self._swaptxbump('20s', add_opts=['--allow-non-wallet-swap'])
def swaptxbump2(self): # create one-output TX back to self to rescue funds
return self._swaptxbump('40s', output_args=[f'{self.sid}:S:4'])
def _swaptxbump(self, fee, *, add_opts=[], output_args=[], exit_val=None):
self.get_file_with_ext('rawtx', delete_all=True)
fn = self.get_file_with_ext('sigtx')
t = self.spawn(
'mmgen-txbump',
['-q', '-d', self.tmpdir, '--bob'] + add_opts + output_args + [fn],
exit_val = exit_val)
return self._swaptxbump_ui_common(t, interactive_fee=fee, new_outputs=bool(output_args))
def _swaptxbump_ui_common_new_outputs(self, t, *, inputs=None, interactive_fee=None, file_desc=None):
return self._swaptxbump_ui_common(t, interactive_fee=interactive_fee, new_outputs=True)
def _swaptxbump_ui_common(self, t, *, inputs=None, interactive_fee=None, file_desc=None, new_outputs=False):
if new_outputs:
t.expect('fee: ', interactive_fee + '\n')
t.expect('(Y/n): ', 'y') # fee ok?
t.expect('(Y/n): ', 'y') # change ok?
else:
t.expect('ENTER for the change output): ', '\n')
t.expect('(Y/n): ', 'y') # confirm deduct from chg output
t.expect('to continue: ', '\n') # exit swap quote
t.expect('fee: ', interactive_fee + '\n')
t.expect('(Y/n): ', 'y') # fee ok?
t.expect('(y/N): ', 'n') # comment?
t.expect('(y/N): ', 'y') # save?
return t
def swaptxsign3(self):
return self.swaptxsign2()
def swaptxsend3(self):
return self._swaptxsend()
def swaptxsign4(self):
return self._swaptxsign()
def swaptxsend4(self):
return self._swaptxsend()
def _generate_for_proto(self, proto_idx):
return self.generate(num_blocks=1, add_opts=[f'--coin={self.protos[proto_idx].coin}'])
def generate0(self):
return self._generate_for_proto(0)
def generate1(self):
return self._generate_for_proto(1)
def generate2(self):
return self._generate_for_proto(2)
def swap_bal1(self):
return self._bob_bal(1, '1494.56784238')
def swap_bal2(self):
return self._bob_bal(1, '1382.79038152')
def swap_bal3(self):
return self._bob_bal(0, '999.99990407')
def swaptxsign1_do(self):
return self._swaptxcreate_ui_common(
self._swaptxcreate(['LTC', '111.777444', f'{self.sid}:B:2', 'BCH', f'{self.sid}:C:2'], action='txdo'),
sign_and_send = True,
file_desc = 'Sent transaction')
def swaptxsign_bad1_create(self):
self.get_file_with_ext('rawtx', delete_all=True)
return self.swaptxcreate6()
def swaptxsign_bad1(self):
self.get_file_with_ext('sigtx', delete_all=True)
return self._swaptxsign_bad('non-wallet address forbidden')
def _swaptxsign_bad(self, expect, *, add_opts=[], exit_val=1):
fn = self.get_file_with_ext('rawtx')
t = self.spawn('mmgen-txsign', add_opts + ['-d', self.tmpdir, '--bob', fn], exit_val=exit_val)
t.expect('view: ', '\n')
t.expect(expect)
return t
def swaptxsign_bad2_create(self):
self.get_file_with_ext('rawtx', delete_all=True)
return self.swaptxcreate1(idx=4)
def swaptxsign_bad2(self):
return self._swaptxsign()
def swaptxsend_bad2(self):
import json
from mmgen.tx.file import json_dumps
from mmgen.util import make_chksum_6
fn = self.get_file_with_ext('sigtx')
with open(fn) as fh:
data = json.load(fh)
data['MMGenTransaction']['swap_quote_expiry'] -= 2400
data['chksum'] = make_chksum_6(json_dumps(data['MMGenTransaction']))
with open(fn, 'w') as fh:
json.dump(data, fh)
t = self.spawn('mmgen-txsend', ['-d', self.tmpdir, '--bob', fn], exit_val=2)
t.expect('expired')
return t
run_setup_bip39 = CmdTestAutosign.run_setup_bip39
run_setup = CmdTestAutosign.run_setup
def swap_wait_loop_start(self):
return self.wait_loop_start(add_opts=['--allow-non-wallet-swap'])
def autosign_swaptxcreate1(self):
return self._user_txcreate(
'bob',
progname = 'swaptxcreate',
input_handler = self._swaptxcreate_ui_common,
output_args = ['BTC', '8.88', f'{self.sid}:S:3', 'LTC', f'{self.sid}:S:3'])
def autosign_swaptxsend1(self):
return self._user_txsend('bob', need_rbf=True)
def autosign_swaptxbump1(self):
return self._user_txcreate(
'bob',
progname = 'txbump',
input_handler = self._swaptxbump_ui_common_new_outputs,
output_args = [f'{self.sid}:S:3'])
def autosign_swaptxsend2(self):
return self._user_txsend('bob', need_rbf=True)
# admin methods:
def sleep(self):
import time
time.sleep(1000)
return 'ok'
def listaddresses0(self):
return self._listaddresses(0)
def listaddresses1(self):
return self._listaddresses(1)
def listaddresses2(self):
return self._listaddresses(2)
def _listaddresses(self, proto_idx):
return self.user_bal('bob', None, proto=self.protos[proto_idx], skip_check=True)
def mempool0(self):
return self._mempool(0)
def mempool1(self):
return self._mempool(1)
def mempool2(self):
return self._mempool(2)
def _mempool(self, proto_idx):
self.spawn('', msg_only=True)
data = self._do_cli(['getrawmempool'], add_opts=[f'--coin={self.protos[proto_idx].coin}'])
assert data
return 'ok'

89
test/cmdtest_d/midgard.py Executable file
View file

@ -0,0 +1,89 @@
#!/usr/bin/env python3
import json, re, time
from http.server import HTTPServer, BaseHTTPRequestHandler
from mmgen.cfg import Config
from mmgen.util import msg, make_timestr
cfg = Config()
def make_inbound_addr(proto, mmtype):
from mmgen.tool.coin import tool_cmd
n = int(time.time()) // (60 * 60 * 24) # increments once every 24 hrs
return tool_cmd(
cfg = cfg,
cmdname = 'pubhash2addr',
proto = proto,
mmtype = mmtype).pubhash2addr(f'{n:040x}')
data_template = {
'inbound_address': None,
'inbound_confirmation_blocks': 4,
'inbound_confirmation_seconds': 2400,
'outbound_delay_blocks': 5,
'outbound_delay_seconds': 30,
'fees': {
'asset': 'LTC.LTC',
'affiliate': '0',
'outbound': '878656',
'liquidity': '8945012',
'total': '9823668',
'slippage_bps': 31,
'total_bps': 34
},
'expiry': None,
'warning': 'Do not cache this response. Do not send funds after the expiry.',
'notes': 'First output should be to inbound_address, second output should be change back to self, third output should be OP_RETURN, limited to 80 bytes. Do not send below the dust threshold. Do not use exotic spend scripts, locks or address formats.',
'dust_threshold': '10000',
'recommended_min_amount_in': '1222064',
'recommended_gas_rate': '6',
'gas_rate_units': 'satsperbyte',
'expected_amount_out': None,
'max_streaming_quantity': 0,
'streaming_swap_blocks': 0,
'total_swap_seconds': 2430
}
# https://thornode.ninerealms.com/thorchain/quote/swap?from_asset=BCH.BCH&to_asset=LTC.LTC&amount=1000000
sample_request = 'GET /thorchain/quote/swap?from_asset=BCH.BCH&to_asset=LTC.LTC&amount=1000000000 HTTP/1.1'
request_pat = r'/thorchain/quote/swap\?from_asset=(\S+)\.(\S+)&to_asset=(\S+)\.(\S+)&amount=(\d+) HTTP/'
prices = { 'BTC': 97000, 'LTC': 115, 'BCH': 330 }
def create_data(request_line):
m = re.search(request_pat, request_line)
try:
_, send_coin, _, recv_coin, amt_atomic = m.groups()
except Exception as e:
msg(f'{type(e)}: {e}')
return {}
from mmgen.protocol import init_proto
send_proto = init_proto(cfg, send_coin, network='regtest', need_amt=True)
in_amt = send_proto.coin_amt(int(amt_atomic), from_unit='satoshi')
out_amt = in_amt * (prices[send_coin] / prices[recv_coin])
addr = make_inbound_addr(send_proto, send_proto.preferred_mmtypes[0])
expiry = int(time.time()) + (10 * 60)
return data_template | {
'expected_amount_out': str(out_amt.to_unit('satoshi')),
'expiry': expiry,
'inbound_address': addr,
}
class handler(BaseHTTPRequestHandler):
header = b'HTTP/1.1 200 OK\nContent-type: application/json\n\n'
def do_GET(self):
# print(f'Midgard server received:\n {self.requestline}')
self.wfile.write(self.header + json.dumps(create_data(self.requestline)).encode())
def run_midgard_server(server_class=HTTPServer, handler_class=handler):
print('Midgard server listening on port 18800')
server_address = ('localhost', 18800)
httpd = server_class(server_address, handler_class)
httpd.serve_forever()
print('Midgard server exiting')

View file

@ -10,7 +10,7 @@ from mmgen.tx import CompletedTX, UnsignedTX
from mmgen.tx.file import MMGenTxFile
from mmgen.cfg import Config
from ..include.common import cfg, qmsg, vmsg, gr_uc
from ..include.common import cfg, qmsg, vmsg, gr_uc, make_burn_addr
async def do_txfile_test(desc, fns, cfg=cfg, check=False):
qmsg(f'\n Testing CompletedTX initializer ({desc})')
@ -169,3 +169,74 @@ class unit_tests:
), pfx='')
return True
def memo(self, name, ut, desc='Swap transaction memo'):
from mmgen.protocol import init_proto
from mmgen.swap.proto.thorchain.memo import Memo
for coin, addrtype in (
('ltc', 'bech32'),
('bch', 'compressed'),
):
proto = init_proto(cfg, coin)
addr = make_burn_addr(proto, addrtype)
vmsg('\nTesting memo initialization:')
m = Memo(proto, addr)
vmsg(f'str(memo): {m}')
vmsg(f'repr(memo): {m!r}')
vmsg('\nTesting memo parsing:')
p = Memo.parse(m)
from pprint import pformat
vmsg(pformat(p._asdict()))
assert p.proto == 'THORChain'
assert p.function == 'SWAP'
assert p.chain == coin.upper()
assert p.asset == coin.upper()
assert p.address == addr.views[addr.view_pref]
assert p.trade_limit == 0
assert p.stream_interval == 1
assert p.stream_quantity == 0 # auto
vmsg('\nTesting is_partial_memo():')
for vec in (
str(m),
'SWAP:xyz',
'=:xyz',
's:xyz',
'a:xz',
'+:xz',
'WITHDRAW:xz',
'LOAN+:xz:x:x',
'TRADE-:xz:x:x',
'BOND:xz',
):
vmsg(f' pass: {vec}')
assert Memo.is_partial_memo(vec), vec
for vec in (
'=',
'swap',
'swap:',
'swap:abc',
'SWAP:a',
):
vmsg(f' fail: {vec}')
assert not Memo.is_partial_memo(vec), vec
vmsg('\nTesting error handling:')
def bad(s):
return lambda: Memo.parse(s)
ut.process_bad_data((
('bad1', 'SwapMemoParseError', 'must contain', bad('x')),
('bad2', 'SwapMemoParseError', 'must contain', bad('y:z:x')),
('bad3', 'SwapMemoParseError', 'function abbrev', bad('z:l:foobar:0/1/0')),
('bad4', 'SwapMemoParseError', 'asset abbrev', bad('=:x:foobar:0/1/0')),
('bad5', 'SwapMemoParseError', 'failed to parse', bad('=:l:foobar:n')),
('bad6', 'SwapMemoParseError', 'non-integer', bad('=:l:foobar:x/1/0')),
('bad7', 'SwapMemoParseError', 'extra', bad('=:l:foobar:0/1/0:x')),
), pfx='')
return True

View file

@ -0,0 +1,11 @@
from .midgard_orig import *
class overlay_fake_MidgardRPCClient:
proto = 'http'
host = 'localhost:18800'
verify = False
MidgardRPCClient.proto = overlay_fake_MidgardRPCClient.proto
MidgardRPCClient.host = overlay_fake_MidgardRPCClient.host
MidgardRPCClient.verify = overlay_fake_MidgardRPCClient.verify