#!/usr/bin/env python3 # # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution # Copyright (C)2013-2021 The MMGen Project # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ ts_xmrwallet.py: xmrwallet tests for the test.py test suite """ import sys,os,atexit,asyncio from subprocess import run,PIPE from mmgen.globalvars import g from mmgen.opts import opt from mmgen.obj import MMGenRange,XMRAmt from mmgen.addr import KeyAddrList,AddrIdxList from ..include.common import * from .common import * from .ts_base import * class TestSuiteXMRWallet(TestSuiteBase): """ Monero wallet operations """ networks = ('xmr',) passthru_opts = ('coin',) tmpdir_nums = [29] dfl_random_txs = 3 cmd_group = ( ('gen_kafiles', 'generating key-address files'), ('create_wallets', 'creating Monero wallets'), ('set_dest_miner', 'opening miner wallet'), ('mine_blocks', 'mining blocks'), ('fund_alice', 'sending funds'), ('mine_blocks_tx', 'mining blocks'), ('sync_wallets', 'syncing all wallets'), ('sync_wallets_selected', 'syncing selected wallets'), ('sweep_to_address_proxy', 'sweeping to new address (via TX relay + proxy)'), ('mine_blocks', 'mining blocks'), ('sweep_to_account', 'sweeping to new account'), ('mine_blocks', 'mining blocks'), ('sweep_to_address_noproxy', 'sweeping to new address (via TX relay)'), ('mine_blocks', 'mining blocks'), ) def __init__(self,trunner,cfgs,spawn): TestSuiteBase.__init__(self,trunner,cfgs,spawn) if trunner == None: return from mmgen.protocol import init_proto self.proto = init_proto('XMR',network='testnet') self.datadir_base = os.path.join('test','daemons','xmrtest') self.tool_args = ['--testnet=1', '--monero-wallet-rpc-password=passw0rd'] self.init_users() self.init_daemon_args() for v in self.users.values(): run(['mkdir','-p',v.udir]) if not opt.no_daemon_autostart: self.start_daemons() self.start_wallet_daemons() if not opt.no_daemon_stop: atexit.register(self.stop_daemons) atexit.register(self.stop_wallet_daemons) self.init_proxy() self.balance = None # init methods def init_proxy(self): def kill_proxy(): omsg(f'Killing SSH SOCKS server at localhost:{self.socks_port}') cmd = [ 'pkill', '-f', ' '.join(a + b2) ] run(cmd) self.use_proxy = False self.socks_port = 9060 a = ['ssh','-x','-o','ExitOnForwardFailure=True','-D',f'localhost:{self.socks_port}'] b1 = ['localhost','true'] b2 = ['-fN','-E','txrelay-proxy.debug','localhost'] cp = run(a+b1,stdout=PIPE,stderr=PIPE) if cp.returncode == 0: if not opt.no_daemon_autostart: run(a+b2) omsg(f'SSH SOCKS server started, listening at localhost:{self.socks_port}') self.use_proxy = True elif b'already in use' in cp.stderr: omsg(f'Port {self.socks_port} already in use. Assuming SSH SOCKS server is running') self.use_proxy = True else: m1 = 'Unable to start command {!r}\n'.format(' '.join(a + b)) m2 = 'Will not test proxied TX relay daemon' omsg(cp.stderr.decode()) omsg(yellow(m1+m2)) if not opt.no_daemon_stop: atexit.register(kill_proxy) def init_users(self): from mmgen.daemon import CoinDaemon,MoneroWalletDaemon from mmgen.rpc import MoneroRPCClient,MoneroRPCClientRaw,MoneroWalletRPCClient self.users = {} n = self.tmpdir_nums[0] ud = namedtuple('user_data',[ 'sid', 'mmwords', 'udir', 'datadir', 'kal_range', 'kafile', 'walletfile_fs', 'addrfile_fs', 'md', 'md_rpc', 'md_json_rpc', 'wd', 'wd_rpc', ]) for user,sid,shift,kal_range in ( # kal_range must be None, a single digit, or a single hyphenated range ('miner', '98831F3A', 130, '1'), ('bob', '1378FC64', 140, None), ('alice', 'FE3C6545', 150, '1-4'), ): udir = os.path.join('test',f'tmp{n}',user) datadir = os.path.join(self.datadir_base,user) md = CoinDaemon( proto = self.proto, test_suite = True, port_shift = shift, opts = ['online'], datadir = datadir ) md_rpc = MoneroRPCClientRaw( host = md.host, port = md.rpc_port, user = None, passwd = None, test_connection = False, ) md_json_rpc = MoneroRPCClient( host = md.host, port = md.rpc_port, user = None, passwd = None, test_connection = False, ) wd = MoneroWalletDaemon( user = 'foo', passwd = 'bar', wallet_dir = udir, test_suite = True, port_shift = shift, datadir = os.path.join('test','daemons'), daemon_addr = f'127.0.0.1:{md.rpc_port}', testnet = True ) wd_rpc = MoneroWalletRPCClient( host = wd.host, port = wd.rpc_port, user = wd.user, passwd = wd.passwd, test_connection = False, ) self.users[user] = ud( sid = sid, mmwords = f'test/ref/{sid}.mmwords', udir = udir, datadir = datadir, kal_range = kal_range, kafile = f'{udir}/{sid}-XMR-M[{kal_range}].testnet.akeys', walletfile_fs = f'{udir}/{sid}-{{}}-MoneroWallet.testnet', addrfile_fs = f'{udir}/{sid}-{{}}-MoneroWallet.testnet.address.txt', md = md, md_rpc = md_rpc, md_json_rpc = md_json_rpc, wd = wd, wd_rpc = wd_rpc, ) def init_daemon_args(self): common_args = ['--p2p-bind-ip=127.0.0.1','--fixed-difficulty=1'] # ,'--rpc-ssl-allow-any-cert'] for u in self.users: other_ports = [self.users[u2].md.p2p_port for u2 in self.users if u2 != u] node_args = [f'--add-exclusive-node=127.0.0.1:{p}' for p in other_ports] self.users[u].md.usr_coind_args = common_args + node_args # cmd_group methods def gen_kafiles(self): for user,data in self.users.items(): if not data.kal_range: continue run(['mkdir','-p',data.udir]) run(f'rm -f {data.kafile}',shell=True) t = self.spawn( 'mmgen-keygen', [ '--testnet=1','-q', '--accept-defaults', '--coin=xmr', f'--outdir={data.udir}', data.mmwords, data.kal_range ], extra_desc = f'({capfirst(user)})' ) t.read() t.ok() t.skip_ok = True return t def create_wallets(self): for user,data in self.users.items(): if not data.kal_range: continue run('rm -f {}*'.format( data.walletfile_fs.format('*') ),shell=True) dir_arg = [f'--outdir='+data.udir] cmd_opts = ['wallets={}'.format(data.kal_range)] t = self.spawn( 'mmgen-tool', self.tool_args + dir_arg + [ 'xmrwallet', 'create', data.kafile ] + cmd_opts, extra_desc = f'({capfirst(user)})' ) t.expect('Check key-to-address validity? (y/N): ','n') for i in MMGenRange(data.kal_range).items: t.expect('Address: ') t.read() t.ok() t.skip_ok = True return t async def set_dest_miner(self): self.do_msg() self.set_dest('miner',1,0,lambda x: x > 20,'unlocked balance > 20') await self.open_wallet_user('miner',1) return 'ok' async def fund_alice(self): self.do_msg() await self.transfer( 'miner', 1234567891234, read_from_file(self.users['alice'].addrfile_fs.format(1)), ) self.set_dest('alice',1,0,lambda x: x > 1,'unlocked balance > 1') return 'ok' def sync_wallets_selected(self): return self.sync_wallets(wallets='1,3-4') def sync_wallets(self,wallets=None): data = self.users['alice'] dir_arg = [f'--outdir={data.udir}'] cmd_opts = list_gen( [f'daemon=localhost:{data.md.rpc_port}'], [f'wallets={wallets}', wallets], ) t = self.spawn( 'mmgen-tool', self.tool_args + dir_arg + [ 'xmrwallet', 'sync', data.kafile ] + cmd_opts ) t.expect('Check key-to-address validity? (y/N): ','n') wlist = AddrIdxList(wallets) if wallets else MMGenRange(data.kal_range).items for n,wnum in enumerate(wlist): t.expect('Syncing wallet {}/{} ({})'.format( n+1, len(wlist), os.path.basename(data.walletfile_fs.format(wnum)), )) t.expect('Chain height: ') t.expect('Wallet height: ') t.expect('Balance: ') t.read() return t def _sweep_user(self,user,spec,tx_relay_daemon=None): data = self.users[user] dir_arg = [f'--outdir='+data.udir] cmd_opts = list_gen( [f'daemon=localhost:{data.md.rpc_port}'], [f'wallets={spec}'], [f'tx_relay_daemon={tx_relay_daemon}', tx_relay_daemon] ) t = self.spawn( 'mmgen-tool', self.tool_args + dir_arg + [ 'xmrwallet', 'sweep', data.kafile ] + cmd_opts, extra_desc = f'({capfirst(user)})' ) t.expect('Check key-to-address validity? (y/N): ','n') t.expect( 'Create new {} .* \(y/N\): '.format('account' if ',' in spec else 'address'), 'y', regex=True ) t.expect('Relay sweep transaction? (y/N): ','y') t.read() return t def sweep_to_address_proxy(self): ret = self._sweep_user( 'alice', '1:0', tx_relay_daemon = 'localhost:{}:127.0.0.1:{}'.format( # proxy must be IP, not 'localhost' self.users['bob'].md.rpc_port, self.socks_port ) if self.use_proxy else None ) self.set_dest('alice',1,0,lambda x: x > 1,'unlocked balance > 1') return ret def sweep_to_account(self): ret = self._sweep_user('alice','1:0,2') self.set_dest('alice',2,1,lambda x: x > 1,'unlocked balance > 1') return ret def sweep_to_address_noproxy(self): ret = self._sweep_user( 'alice', '2:1', tx_relay_daemon = 'localhost:{}'.format(self.users['bob'].md.rpc_port) ) self.set_dest('alice',2,1,lambda x: x > 1,'unlocked balance > 1') return ret # wallet methods async def open_wallet_user(self,user,wnum): data = self.users[user] silence() kal = KeyAddrList(self.proto,data.kafile,skip_key_address_validity_check=True) end_silence() return await data.wd_rpc.call( 'open_wallet', filename = os.path.basename(data.walletfile_fs.format(wnum)), password = kal.entry(wnum).wallet_passwd ) async def close_wallet_user(self,user): ret = await self.users[user].wd_rpc.call('close_wallet') return 'ok' # mining methods async def start_mining(self): data = self.users['miner'] addr = read_from_file(data.addrfile_fs.format(1)) # mine to wallet #1, account 0 for i in range(20): ret = await data.md_rpc.call( 'start_mining', do_background_mining = False, # run mining in background or foreground ignore_battery = True, # ignore battery state (on laptop) miner_address = addr, # account address to mine to threads_count = 3 ) # number of mining threads to run status = self.get_status(ret) if status == 'OK': return True elif status == 'BUSY': await asyncio.sleep(5) omsg('Daemon busy. Attempting to start mining...') else: die(2,f'Monerod returned status {status}') else: die(2,'Max retries exceeded') async def stop_mining(self): ret = await self.users['miner'].md_rpc.call('stop_mining') return self.get_status(ret) async def mine_blocks(self,random_txs=None): """ - open destination wallet - optionally create and broadcast random TXs - start mining - mine until funds appear in wallet - stop mining - close wallet """ async def get_height(): u = self.users['miner'] for i in range(20): try: return (await u.md_json_rpc.call('get_last_block_header'))['block_header']['height'] except Exception as e: if 'onnection refused' in str(e): omsg(f'{e}\nMonerod appears to have crashed. Attempting to restart...') await asyncio.sleep(5) u.md.restart() await asyncio.sleep(5) await self.start_mining() else: raise else: die(2,'Restart attempt limit exceeded') async def send_random_txs(): from mmgen.tool import tool_api t = tool_api() t.init_coin('XMR','testnet') t.usr_randchars = 0 imsg_r(f'Sending random transactions: ') for i in range(random_txs): await self.transfer( 'miner', 123456789, t.randpair()[1], ) imsg_r(f'{i+1} ') oqmsg_r('+') await asyncio.sleep(0.5) imsg('') def print_balance(dest,ub): imsg('Total balance in {}’s wallet #{}, account {}: {}'.format( capfirst(dest.user), dest.wnum, dest.account, ub.hl() )) async def get_balance(dest): data = self.users[dest.user] await data.wd_rpc.call('refresh') ret = await data.wd_rpc.call('get_accounts') return XMRAmt(ret['subaddress_accounts'][dest.account]['unlocked_balance'],from_unit='atomic') self.do_msg(extra_desc=f'+{random_txs} random TXs' if random_txs else None) if self.dest.user != 'miner': await self.open_wallet_user(self.dest.user,self.dest.wnum) if random_txs: await send_random_txs() await self.start_mining() h = await get_height() imsg_r(f'Chain height: {h} ') while True: ub = await get_balance(self.dest) if self.dest.test(ub): imsg('') oqmsg_r('+') print_balance(self.dest,ub) break # else: # imsg(f'Test {self.dest.test_desc!r} failed') await asyncio.sleep(2) h = await get_height() imsg_r(f'{h} ') oqmsg_r('+') await self.stop_mining() if self.dest.user != 'miner': await self.close_wallet_user(self.dest.user) return 'ok' async def mine_blocks_tx(self): return await self.mine_blocks(random_txs=self.dfl_random_txs) # util methods def get_status(self,ret): if ret['status'] != 'OK': imsg( 'RPC status: {}'.format(ret['status']) ) return ret['status'] def do_msg(self,extra_desc=None): self.spawn( '', msg_only = True, extra_desc = f'({extra_desc})' if extra_desc else None ) def set_dest(self,user,wnum,account,test,test_desc): self.dest = namedtuple( 'dest_info',['user','wnum','account','test','test_desc'])(user,wnum,account,test,test_desc) async def transfer(self,user,amt,addr): return await self.users[user].wd_rpc.call('transfer',destinations=[{'amount':amt,'address':addr}]) # daemon start/stop methods def start_daemons(self): self.stop_daemons() for v in self.users.values(): run(['mkdir','-p',v.datadir]) v.md.start() def stop_daemons(self): for v in self.users.values(): if v.md.state != 'stopped': v.md.stop() run(['rm','-rf',self.datadir_base]) def start_wallet_daemons(self): for v in self.users.values(): if v.kal_range: v.wd.start() def stop_wallet_daemons(self): for v in self.users.values(): if v.kal_range and v.wd.state != 'stopped': v.wd.stop()