CmdTestBase: add cfg attr

This commit is contained in:
The MMGen Project 2025-03-18 20:09:59 +03:00
commit 699b21f313
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
24 changed files with 199 additions and 203 deletions

View file

@ -249,7 +249,7 @@ def list_cmds():
yield green('AVAILABLE COMMANDS:')
for gname in gm.cmd_groups:
tg = gm.gm_init_group(None, gname, None, None)
tg = gm.gm_init_group(cfg, None, gname, None, None)
desc = tg.__doc__.strip() if tg.__doc__ else type(tg).__name__
d.append((gname, desc, gm.cmd_list, gm.dpy_data))
cw = max(max(len(k) for k in gm.dpy_data), cw)
@ -311,7 +311,7 @@ if __name__ == '__main__':
from test.cmdtest_d.runner import CmdTestRunner
try:
tr = CmdTestRunner(cfg, cfg._proto, repo_root, data_dir, trash_dir, trash_dir2)
tr = CmdTestRunner(cfg, repo_root, data_dir, trash_dir, trash_dir2)
tr.run_tests(cmd_args)
tr.warn_skipped()
if tr.daemon_started and not cfg.no_daemon_stop:

View file

@ -15,7 +15,7 @@ import time
from .ct_autosign import CmdTestAutosignThreaded
from .ct_regtest import CmdTestRegtest, rt_pw
from ..include.common import cfg, gr_uc
from ..include.common import gr_uc
class CmdTestAutosignAutomount(CmdTestAutosignThreaded, CmdTestRegtest):
'automounted transacting operations via regtest mode'
@ -76,12 +76,12 @@ class CmdTestAutosignAutomount(CmdTestAutosignThreaded, CmdTestRegtest):
('txview', 'viewing transactions'),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
self.coins = [cfg.coin.lower()]
CmdTestAutosignThreaded.__init__(self, trunner, cfgs, spawn)
CmdTestRegtest.__init__(self, trunner, cfgs, spawn)
CmdTestAutosignThreaded.__init__(self, cfg, trunner, cfgs, spawn)
CmdTestRegtest.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return

View file

@ -52,12 +52,12 @@ class CmdTestAutosignETH(CmdTestAutosignThreaded, CmdTestEthdev):
('txview', 'viewing transactions'),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
self.coins = [cfg.coin.lower()]
CmdTestAutosignThreaded.__init__(self, trunner, cfgs, spawn)
CmdTestEthdev.__init__(self, trunner, cfgs, spawn)
CmdTestAutosignThreaded.__init__(self, cfg, trunner, cfgs, spawn)
CmdTestEthdev.__init__(self, cfg, trunner, cfgs, spawn)
self.txop_opts = ['--autosign', '--regtest=1', '--quiet']

View file

@ -31,7 +31,6 @@ from mmgen.led import LEDControl
from mmgen.autosign import Autosign, Signable
from ..include.common import (
cfg,
omsg,
omsg_r,
oqmsg,
@ -58,9 +57,9 @@ class CmdTestAutosignBase(CmdTestBase):
threaded = False
daemon_coins = []
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestBase.__init__(self, trunner, cfgs, spawn)
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return
@ -98,7 +97,7 @@ class CmdTestAutosignBase(CmdTestBase):
if hasattr(self, 'txdev'):
del self.txdev
if not cfg.no_daemon_stop:
if not self.cfg.no_daemon_stop:
if sys.platform == 'darwin':
for label in (self.asi.dev_label, self.asi.ramdisk.label):
self._macOS_eject_disk(label)
@ -165,7 +164,7 @@ class CmdTestAutosignBase(CmdTestBase):
'hdiutil', 'create', '-size', '10M', '-fs', 'exFAT',
'-volname', self.asi.dev_label,
str(self.fs_image_path)]
redir = None if cfg.exact_output or cfg.verbose else DEVNULL
redir = None if self.cfg.exact_output or self.cfg.verbose else DEVNULL
run(cmd, stdout=redir, check=True)
def _macOS_mount_fs_image(self, loc):
@ -203,7 +202,7 @@ class CmdTestAutosignBase(CmdTestBase):
mn_desc = mn_type or 'default'
mn_type = mn_type or 'mmgen'
if sys.platform == 'darwin' and not cfg.no_daemon_stop:
if sys.platform == 'darwin' and not self.cfg.no_daemon_stop:
self._macOS_eject_disk(self.asi.ramdisk.label)
self.insert_device()
@ -229,7 +228,7 @@ class CmdTestAutosignBase(CmdTestBase):
t.expect('OK? (Y/n): ', '\n')
from mmgen.mn_entry import mn_entry
entry_mode = 'full'
mne = mn_entry(cfg, mn_type, entry_mode=entry_mode)
mne = mn_entry(self.cfg, mn_type, entry_mode=entry_mode)
if usr_entry_modes:
t.expect('user-configured')
else:
@ -247,7 +246,7 @@ class CmdTestAutosignBase(CmdTestBase):
t.read()
self.remove_device()
if sys.platform == 'darwin' and not cfg.no_daemon_stop:
if sys.platform == 'darwin' and not self.cfg.no_daemon_stop:
atexit.register(self._macOS_eject_disk, self.asi.ramdisk.label)
return t
@ -596,7 +595,7 @@ class CmdTestAutosignThreaded(CmdTestAutosignBase):
src = Path(self.asi.txauto_dir)
from mmgen.tx import CompletedTX
txs = sorted(
[await CompletedTX(cfg=cfg, filename=path, quiet_open=True) for path in sorted(src.iterdir())],
[await CompletedTX(cfg=self.cfg, filename=path, quiet_open=True) for path in sorted(src.iterdir())],
key = lambda x: x.timestamp)
for tx in txs:
imsg(blue(f'\nViewing ‘{tx.infile.name}’:'))
@ -662,14 +661,14 @@ class CmdTestAutosign(CmdTestAutosignBase):
('sign_bad_no_daemon', 'signing transactions (error, no daemons running)'),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
super().__init__(trunner, cfgs, spawn)
super().__init__(cfg, trunner, cfgs, spawn)
if trunner is None:
return
if self.live and not cfg.exact_output:
if self.live and not self.cfg.exact_output:
die(1, red('autosign_live tests must be run with --exact-output enabled!'))
if self.no_insert_check:
@ -714,7 +713,7 @@ class CmdTestAutosign(CmdTestAutosignBase):
def create_dfl_wallet(self):
t = self.spawn('mmgen-walletconv', [
f'--outdir={cfg.data_dir}',
f'--outdir={self.cfg.data_dir}',
'--usr-randchars=0', '--quiet', '--hash-preset=1', '--label=foo',
'test/ref/98831F3A.hex'
]
@ -742,8 +741,8 @@ class CmdTestAutosign(CmdTestAutosignBase):
def run_setup_bip39(self):
from mmgen.cfgfile import mmgen_cfg_file
fn = mmgen_cfg_file(cfg, 'usr').fn
old_data = mmgen_cfg_file(cfg, 'usr').get_data(fn)
fn = mmgen_cfg_file(self.cfg, 'usr').fn
old_data = mmgen_cfg_file(self.cfg, 'usr').get_data(fn)
new_data = [d.replace('bip39:fixed', 'bip39:full')[2:]
if d.startswith('# mnemonic_entry_modes') else d for d in old_data]
with open(fn, 'w') as fh:
@ -791,12 +790,12 @@ class CmdTestAutosign(CmdTestAutosignBase):
self.insert_device()
silence()
self.do_mount(verbose=cfg.verbose or cfg.exact_output)
self.do_mount(verbose=self.cfg.verbose or self.cfg.exact_output)
end_silence()
for coindir, fn in data:
src = joinpath(ref_dir, coindir, fn)
if cfg.debug_utf8:
if self.cfg.debug_utf8:
ext = '.testnet.rawtx' if fn.endswith('.testnet.rawtx') else '.rawtx'
fn = fn[:-len(ext)] + '' + ext
target = joinpath(self.asi.tx_dir, fn)
@ -1016,9 +1015,9 @@ class CmdTestAutosignLive(CmdTestAutosignBTC):
('stop_daemons', 'stopping daemons'),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
super().__init__(trunner, cfgs, spawn)
super().__init__(cfg, trunner, cfgs, spawn)
if trunner is None:
return
@ -1075,7 +1074,7 @@ class CmdTestAutosignLive(CmdTestAutosignBTC):
no_msg = True,
exit_val = 1)
if not cfg.exact_output:
if not self.cfg.exact_output:
omsg('')
prompt_insert_sign(t)

View file

@ -25,7 +25,7 @@ import sys, os
from mmgen.util import msg
from mmgen.color import gray, purple, yellow
from ..include.common import cfg, write_to_file, read_from_file, imsg
from ..include.common import write_to_file, read_from_file, imsg
from .common import get_file_with_ext
class CmdTestBase:
@ -40,10 +40,11 @@ class CmdTestBase:
tmpdir_nums = []
test_name = None
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
if hasattr(self, 'name'): # init will be called multiple times for classes with multiple inheritance
return
self.name = type(self).__name__
self.cfg = cfg
self.proto = cfg._proto
self.tr = trunner
self.cfgs = cfgs
@ -71,7 +72,7 @@ class CmdTestBase:
@property
def tmpdir(self):
return os.path.join('test', 'tmp', '{}{}'.format(self.tmpdir_num, '' if cfg.debug_utf8 else ''))
return os.path.join('test', 'tmp', '{}{}'.format(self.tmpdir_num, '' if self.cfg.debug_utf8 else ''))
def get_file_with_ext(self, ext, **kwargs):
return get_file_with_ext(self.tmpdir, ext, **kwargs)

View file

@ -15,7 +15,7 @@ import sys, os, time, shutil
from mmgen.color import yellow
from mmgen.cfgfile import CfgFileSampleSys, CfgFileSampleUsr, cfg_file_sample
from ..include.common import cfg, read_from_file, write_to_file, imsg
from ..include.common import read_from_file, write_to_file, imsg
from .ct_base import CmdTestBase
class CmdTestCfgFile(CmdTestBase):
@ -42,8 +42,8 @@ class CmdTestCfgFile(CmdTestBase):
('opt_override2', (40, 'negative cmdline opts overriding cfg file opts', [])),
)
def __init__(self, trunner, cfgs, spawn):
CmdTestBase.__init__(self, trunner, cfgs, spawn)
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
self.spawn_env['MMGEN_TEST_SUITE_CFGTEST'] = '1'
def read_from_cfgfile(self, loc):
@ -139,7 +139,7 @@ class CmdTestCfgFile(CmdTestBase):
t.expect(s)
if t.pexpect_spawn: # view and exit pager
time.sleep(1 if cfg.exact_output else t.send_delay)
time.sleep(1 if self.cfg.exact_output else t.send_delay)
t.send('q')
t.expect(cp, 'n')
@ -208,7 +208,7 @@ class CmdTestCfgFile(CmdTestBase):
('ETH', 'True', '5.4321', True),
('ETC', 'None', '5.4321', False)
):
if cfg.no_altcoin and coin != 'BTC':
if self.cfg.no_altcoin and coin != 'BTC':
continue
t = self.spawn_test(
args = [
@ -249,7 +249,7 @@ class CmdTestCfgFile(CmdTestBase):
def chain_names(self):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
def run(chk, testnet):

View file

@ -69,7 +69,7 @@ dfl_sid = '98831F3A'
dfl_devaddr = '00a329c0648769a73afac7f9381e08fb43dbea72'
dfl_devkey = '4d5db4107d237df6a3d58ee5f70ae63d73d7658d4026f2eefd2f204c81682cb7'
def get_reth_dev_keypair():
def get_reth_dev_keypair(cfg):
from mmgen.bip39 import bip39
from mmgen.bip_hd import MasterNode
mn = 'test test test test test test test test test test test junk' # See ‘reth node --help’
@ -423,23 +423,26 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
),
}
def __init__(self, trunner, cfgs, spawn):
CmdTestBase.__init__(self, trunner, cfgs, spawn)
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return
global coin
coin = cfg.coin
self.eth_args = [f'--outdir={self.tmpdir}', '--regtest=1', '--quiet']
self.eth_args_noquiet = [f'--outdir={self.tmpdir}', '--regtest=1']
from mmgen.protocol import init_proto
self.proto = init_proto( cfg, cfg.coin, network='regtest', need_amt=True)
self.proto = init_proto(cfg, network_id=self.proto.coin+'_rt', need_amt=True)
from mmgen.daemon import CoinDaemon
self.daemon = CoinDaemon( cfg, network_id=self.proto.coin+'_rt', test_suite=True)
self.daemon = CoinDaemon(cfg, network_id=self.proto.coin+'_rt', test_suite=True)
if self.daemon.id == 'reth':
global dfl_devkey, dfl_devaddr
dfl_devkey, dfl_devaddr = get_reth_dev_keypair()
dfl_devkey, dfl_devaddr = get_reth_dev_keypair(cfg)
set_vbals(self.daemon.id)
@ -462,7 +465,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
@property
async def rpc(self):
from mmgen.rpc import rpc_init
return await rpc_init(cfg, self.proto)
return await rpc_init(self.cfg, self.proto)
def mining_delay(self): # workaround for mining race condition in dev mode
if self.daemon.id == 'reth':
@ -493,11 +496,11 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
if d.id in ('geth', 'erigon'):
imsg(' {:19} {}'.format('Cmdline:', ' '.join(e for e in d.start_cmd if not 'verbosity' in e)))
if not cfg.no_daemon_autostart:
if not self.cfg.no_daemon_autostart:
if not d.id in ('geth', 'erigon'):
d.stop(silent=True)
d.remove_datadir()
d.start( silent = not (cfg.verbose or cfg.exact_output))
d.start( silent = not (self.cfg.verbose or self.cfg.exact_output))
rpc = await self.rpc
imsg(f'Daemon: {rpc.daemon.coind_name} v{rpc.daemon_version_str}')
@ -511,7 +514,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
from mmgen.proto.eth.misc import decrypt_geth_keystore
key = decrypt_geth_keystore(
cfg = cfg,
cfg = self.cfg,
wallet_fn = wallet_fn,
passwd = b'')
@ -621,7 +624,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
msg(f'skipping test {self.test_name!r} for ETC')
return 'skip'
from mmgen.tw.ctl import TwCtl
twctl = await TwCtl(cfg, self.proto, no_wallet_init=True)
twctl = await TwCtl(self.cfg, self.proto, no_wallet_init=True)
from_fn = Path(ref_dir) / 'ethereum' / src_fn
bak_fn = twctl.tw_dir / f'upgraded-{src_fn}'
twctl.tw_dir.mkdir(mode=0o750, parents=True, exist_ok=True)
@ -656,7 +659,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
add_args = [],
bad_input = False,
exit_val = None):
ext = ext.format('' if cfg.debug_utf8 else '')
ext = ext.format('' if self.cfg.debug_utf8 else '')
fn = self.get_file_with_ext(ext, no_dot=True, delete=False)
t = self.spawn('mmgen-addrimport', ['--regtest=1'] + add_args + [fn], exit_val=exit_val)
if bad_input:
@ -710,7 +713,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
return t
def txsign(self, ni=False, ext='{}.regtest.rawtx', add_args=[], dev_send=False):
ext = ext.format('' if cfg.debug_utf8 else '')
ext = ext.format('' if self.cfg.debug_utf8 else '')
keyfile = joinpath(self.tmpdir, parity_devkey_fn)
txfile = self.get_file_with_ext(ext, no_dot=True)
t = self.spawn(
@ -725,19 +728,19 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
return self.txsign_ui_common(t, ni=ni, has_label=True)
def txsend(self, ext='{}.regtest.sigtx', add_args=[], test=False):
ext = ext.format('' if cfg.debug_utf8 else '')
ext = ext.format('' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, no_dot=True)
t = self.spawn('mmgen-txsend', self.eth_args + add_args + [txfile], no_passthru_opts=['coin'])
self.txsend_ui_common(
t,
quiet = not cfg.debug,
quiet = not self.cfg.debug,
bogus_send = False,
test = test,
has_label = True)
return t
def txview(self, ext_fs):
ext = ext_fs.format('' if cfg.debug_utf8 else '')
ext = ext_fs.format('' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, no_dot=True)
return self.spawn('mmgen-tool', ['--verbose', 'txview', txfile])
@ -757,7 +760,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
caller = 'txdo',
acct = '1',
no_read = True)
self._do_confirm_send(t, quiet=not cfg.debug, sure=False)
self._do_confirm_send(t, quiet=not self.cfg.debug, sure=False)
t.read()
self.get_file_with_ext('sigtx', delete_all=True)
return t
@ -818,7 +821,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
def tx_status(self, ext, expect_str, expect_str2='', add_args=[], exit_val=0):
self.mining_delay()
ext = ext.format('' if cfg.debug_utf8 else '')
ext = ext.format('' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, no_dot=True)
t = self.spawn(
'mmgen-txsend',
@ -842,7 +845,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
key = self.keystore_data['key']
imsg(f'Key: {key}')
from mmgen.proto.eth.misc import ec_sign_message_with_privkey
return ec_sign_message_with_privkey(cfg, self.message, bytes.fromhex(key), 'eth_sign')
return ec_sign_message_with_privkey(self.cfg, self.message, bytes.fromhex(key), 'eth_sign')
async def create_signature_rpc():
addr = self.read_from_tmpfile('signer_addr').strip()
@ -925,7 +928,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
fee_info_data = ('0.00084', '40'))
def txbump(self, ext=',40000]{}.regtest.rawtx', fee='50G', add_args=[]):
ext = ext.format('' if cfg.debug_utf8 else '')
ext = ext.format('' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, no_dot=True)
t = self.spawn('mmgen-txbump', self.eth_args + add_args + ['--yes', txfile])
t.expect('or gas price: ', fee+'\n')
@ -1043,7 +1046,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
if self.daemon.id in ('geth', 'reth'): # workaround for mining race condition in dev mode
await asyncio.sleep(1 if self.daemon.id == 'reth' else 0.5)
from mmgen.tx import NewTX
tx = await NewTX(cfg=cfg, proto=self.proto, target='tx')
tx = await NewTX(cfg=self.cfg, proto=self.proto, target='tx')
tx.rpc = await self.rpc
res = await tx.get_receipt(txid)
imsg(f'Gas sent: {res.gas_sent.hl():<9} {(res.gas_sent*res.gas_price).hl2(encl="()")}')
@ -1069,7 +1072,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
t = self.spawn('mmgen-'+mmgen_cmd, self.eth_args + args)
if mmgen_cmd == 'txcreate':
t.written_to_file('transaction')
ext = '[0,8000]{}.regtest.rawtx'.format('' if cfg.debug_utf8 else '')
ext = '[0,8000]{}.regtest.rawtx'.format('' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, no_dot=True)
t = self.spawn(
'mmgen-txsign',
@ -1081,7 +1084,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
txid = self.txsend_ui_common(t,
caller = mmgen_cmd,
quiet = mmgen_cmd == 'txdo' or not cfg.debug,
quiet = mmgen_cmd == 'txdo' or not self.cfg.debug,
bogus_send = False)
addr = strip_ansi_escapes(t.expect_getend('Contract address: '))
if (await self.get_tx_receipt(txid)).status == 0:
@ -1123,7 +1126,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
async def do_transfer(rpc):
for i in range(num_tokens):
tk = await ResolvedToken(
cfg,
self.cfg,
self.proto,
rpc,
self.read_from_tmpfile(f'token_addr{i+1}').strip())
@ -1143,7 +1146,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
async def show_bals(rpc):
for i in range(num_tokens):
tk = await ResolvedToken(
cfg,
self.cfg,
self.proto,
rpc,
self.read_from_tmpfile(f'token_addr{i+1}').strip())
@ -1156,7 +1159,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
def gen_addr(addr):
return tool_cmd(
cfg, cmdname='gen_addr', proto=self.proto).gen_addr(addr, wallet=dfl_words_file)
self.cfg, cmdname='gen_addr', proto=self.proto).gen_addr(addr, wallet=dfl_words_file)
silence()
usr_addrs = list(map(gen_addr, usr_mmaddrs))
@ -1372,7 +1375,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
caller = 'txdo',
fee_info_data = fee_info_data,
no_read = True)
self._do_confirm_send(t, quiet=not cfg.debug, sure=False)
self._do_confirm_send(t, quiet=not self.cfg.debug, sure=False)
return t
def txcreate_refresh_balances(self):
@ -1522,7 +1525,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
async def twmove(self):
self.spawn(msg_only=True)
from mmgen.tw.ctl import TwCtl
twctl = await TwCtl(cfg, self.proto, no_wallet_init=True)
twctl = await TwCtl(self.cfg, self.proto, no_wallet_init=True)
imsg('Moving tracking wallet')
fn_bak = twctl.tw_path.with_suffix('.bak.json')
fn_bak.unlink(missing_ok=True)
@ -1531,7 +1534,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
def twimport(self, add_args=[], expect_str=None):
from mmgen.tw.json import TwJSON
fn = joinpath(self.tmpdir, TwJSON.Base(cfg, self.proto).dump_fn)
fn = joinpath(self.tmpdir, TwJSON.Base(self.cfg, self.proto).dump_fn)
t = self.spawn('mmgen-tool', self.eth_args_noquiet + ['twimport', fn] + add_args)
t.expect('(y/N): ', 'y')
if expect_str:
@ -1545,7 +1548,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
def tw_chktotal(self):
self.spawn(msg_only=True)
from mmgen.tw.json import TwJSON
fn = joinpath(self.tmpdir, TwJSON.Base(cfg, self.proto).dump_fn)
fn = joinpath(self.tmpdir, TwJSON.Base(self.cfg, self.proto).dump_fn)
res = json.loads(read_from_file(fn))
cmp_or_die(res['data']['value'], vbal6, 'value in tracking wallet JSON dump')
return 'ok'
@ -1553,7 +1556,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
async def twcompare(self):
self.spawn(msg_only=True)
from mmgen.tw.ctl import TwCtl
twctl = await TwCtl(cfg, self.proto, no_wallet_init=True)
twctl = await TwCtl(self.cfg, self.proto, no_wallet_init=True)
fn = twctl.tw_path
fn_bak = fn.with_suffix('.bak.json')
imsg('Comparing imported tracking wallet with original')
@ -1564,7 +1567,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
def edit_json_twdump(self):
self.spawn(msg_only=True)
from mmgen.tw.json import TwJSON
fn = TwJSON.Base(cfg, self.proto).dump_fn
fn = TwJSON.Base(self.cfg, self.proto).dump_fn
text = json.loads(self.read_from_tmpfile(fn))
token_addr = self.read_from_tmpfile('token_addr2').strip()
text['data']['entries']['tokens'][token_addr][2][3] = f'edited comment [фубар] [{gr_uc}]'
@ -1573,7 +1576,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
def stop(self):
self.spawn(msg_only=True)
if not cfg.no_daemon_stop:
if not self.cfg.no_daemon_stop:
if not stop_test_daemons(self.proto.coin+'_rt', remove_datadir=True):
return False
set_vt100()

View file

@ -17,7 +17,6 @@ import sys, os, time
from mmgen.util import ymsg
from mmgen.cfg import gc
from ..include.common import cfg
from .ct_base import CmdTestBase
class CmdTestHelp(CmdTestBase):
@ -76,10 +75,10 @@ class CmdTestHelp(CmdTestBase):
no_passthru_opts = True)
t.expect('to continue: ', 'w')
t.expect('TERMS AND CONDITIONS') # start of GPL text
if cfg.pexpect_spawn:
if self.cfg.pexpect_spawn:
t.send('G')
t.expect('return for a fee.') # end of GPL text
if cfg.pexpect_spawn:
if self.cfg.pexpect_spawn:
t.send('q')
t.expect('to continue: ', 'c')
t.expect('data: ', 'beadcafe'*4 + '\n')

View file

@ -17,7 +17,6 @@ from mmgen.util import fmt, capfirst, remove_whitespace
from mmgen.wallet import get_wallet_cls
from ..include.common import (
cfg,
imsg,
imsg_r,
sample_mn,
@ -120,7 +119,7 @@ class CmdTestInput(CmdTestBase):
from mmgen.color import set_vt100
set_vt100()
imsg(cp.stderr.decode().strip())
res = get_data_from_file(cfg, 'test/trash/A773B05C[128].mmwords', silent=True).strip()
res = get_data_from_file(self.cfg, 'test/trash/A773B05C[128].mmwords', silent=True).strip()
assert res == mn, f'{res} != {mn}'
return 'ok' if b'written to file' in cp.stderr else 'error'
@ -422,7 +421,7 @@ class CmdTestInput(CmdTestBase):
mn = mn or sample_mn[fmt]['mn'].split()
t = self.spawn('mmgen-tool', ['mn2hex_interactive', 'fmt='+fmt, 'mn_len=12', 'print_mn=1'])
from mmgen.mn_entry import mn_entry
mne = mn_entry(cfg, fmt, entry_mode=entry_mode)
mne = mn_entry(self.cfg, fmt, entry_mode=entry_mode)
t.expect(
'Type a number.*: ',
('\n' if enter_for_dfl else str(mne.entry_modes.index(entry_mode)+1)),
@ -465,7 +464,7 @@ class CmdTestInput(CmdTestBase):
t.expect('Type a number.*: ', '6', regex=True)
t.expect('invalid')
from mmgen.mn_entry import mn_entry
mne = mn_entry(cfg, fmt, entry_mode=entry_mode)
mne = mn_entry(self.cfg, fmt, entry_mode=entry_mode)
t.expect('Type a number.*: ', str(mne.entry_modes.index(entry_mode)+1), regex=True)
t.expect(r'Using entry mode (\S+)', regex=True)
mode = strip_ansi_escapes(t.p.match.group(1)).lower()
@ -489,7 +488,7 @@ class CmdTestInput(CmdTestBase):
def mnemonic_entry_mmgen_minimal(self):
from mmgen.mn_entry import mn_entry
# erase_chars: '\b\x7f'
m = mn_entry(cfg, 'mmgen', entry_mode='minimal')
m = mn_entry(self.cfg, 'mmgen', entry_mode='minimal')
np = 2
mn = (
'z',

View file

@ -31,7 +31,6 @@ from mmgen.wallet.incog import wallet as IncogWallet
from mmgen.rpc import rpc_init
from ..include.common import (
cfg,
vmsg,
joinpath,
silence,
@ -57,7 +56,7 @@ from .common import (
from .ct_base import CmdTestBase
from .ct_shared import CmdTestShared
def make_brainwallet_file(fn):
def make_brainwallet_file(cfg, fn):
# Print random words with random whitespace in between
wl = rwords.split()
nwords, ws_list, max_spaces = 10, ' \n', 5
@ -375,8 +374,8 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
'txsign6',
)
def __init__(self, trunner, cfgs, spawn):
CmdTestBase.__init__(self, trunner, cfgs, spawn)
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None or self.coin not in self.networks:
return
if self.coin in ('btc', 'bch', 'ltc'):
@ -389,7 +388,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
@property
def lbl_id(self):
if not hasattr(self, '_lbl_id'):
rpc = async_run(rpc_init(cfg, self.proto))
rpc = async_run(rpc_init(self.cfg, self.proto))
self._lbl_id = ('account', 'label')['label_api' in rpc.caps]
return self._lbl_id
@ -397,9 +396,9 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
addrfile = self.get_file_with_ext('addrs')
from mmgen.addrlist import AddrList
silence()
chk = AddrList(cfg, self.proto, infile=addrfile).chksum
chk = AddrList(self.cfg, self.proto, infile=addrfile).chksum
end_silence()
if cfg.verbose and display:
if self.cfg.verbose and display:
msg(f'Checksum: {cyan(chk)}')
return chk
@ -429,10 +428,10 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
def delete_dfl_wallet(self, pf):
self.write_to_tmpfile('del_dw_run', b'', binary=True)
if cfg.no_dw_delete:
if self.cfg.no_dw_delete:
return 'skip'
for wf in [f for f in os.listdir(cfg.data_dir) if f[-6:]=='.mmdat']:
os.unlink(joinpath(cfg.data_dir, wf))
for wf in [f for f in os.listdir(self.cfg.data_dir) if f[-6:]=='.mmdat']:
os.unlink(joinpath(self.cfg.data_dir, wf))
self.spawn(msg_only=True)
self.have_dfl_wallet = False
return 'ok'
@ -487,7 +486,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
def passchg(self, wf, pf, label_action='cmdline', dfl_wallet=False, delete=False):
silence()
self.write_to_tmpfile(pwfile, get_data_from_file(cfg, pf))
self.write_to_tmpfile(pwfile, get_data_from_file(self.cfg, pf))
end_silence()
add_args = {
'cmdline': ['-d', self.tmpdir, '-L', 'Changed label (UTF-8) α'],
@ -534,13 +533,13 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
def _write_fake_data_to_file(self, d):
write_data_to_file(
cfg,
self.cfg,
self.unspent_data_file,
d,
desc = 'Unspent outputs',
quiet = True,
ignore_opt_outdir = True)
if cfg.verbose or cfg.exact_output:
if self.cfg.verbose or self.cfg.exact_output:
sys.stderr.write(f'Fake transaction wallet data written to file {self.unspent_data_file!r}\n')
def _create_fake_unspent_entry(
@ -595,7 +594,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
for d in tx_data.values():
al = adata.addrlist(al_id=d['al_id'])
for n, (idx, coinaddr) in enumerate(al.addrpairs()):
comment = get_comment(do_shuffle=not cfg.test_suite_deterministic)
comment = get_comment(do_shuffle=not self.cfg.test_suite_deterministic)
out.append(self._create_fake_unspent_entry(
coinaddr, d['al_id'], idx, comment, segwit=d['segwit']))
if n == 0: # create a duplicate address. This means addrs_per_wallet += 1
@ -611,13 +610,13 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
pubkey_type = 'std')
from mmgen.addrgen import KeyGenerator, AddrGenerator
rand_coinaddr = AddrGenerator(
cfg,
self.cfg,
self.proto,
('legacy', 'compressed')[non_mmgen_input_compressed]
).to_addr(KeyGenerator(cfg, self.proto, 'std').gen_data(privkey))
).to_addr(KeyGenerator(self.cfg, self.proto, 'std').gen_data(privkey))
of = joinpath(self.cfgs[non_mmgen_input]['tmpdir'], non_mmgen_fn)
write_data_to_file(
cfg = cfg,
cfg = self.cfg,
outfile = of,
data = privkey.wif + '\n',
desc = f'compressed {self.proto.name} key',
@ -633,7 +632,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
tx_data, ad = {}, AddrData(self.proto)
for s in sources:
addrfile = get_file_with_ext(self.cfgs[s]['tmpdir'], 'addrs')
al = AddrList(cfg, self.proto, infile=addrfile)
al = AddrList(self.cfg, self.proto, infile=addrfile)
ad.add(al)
aix = AddrIdxList(fmt_str=self.cfgs[s]['addr_idx_list'])
if len(aix) != addrs_per_wallet:
@ -652,8 +651,8 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
privkey = PrivKey(self.proto, getrand(32), compressed=True, pubkey_type='std')
t = ('compressed', 'segwit')['S' in self.proto.mmtypes]
from mmgen.addrgen import KeyGenerator, AddrGenerator
rand_coinaddr = AddrGenerator(cfg, self.proto, t).to_addr(
KeyGenerator(cfg, self.proto, 'std').gen_data(privkey)
rand_coinaddr = AddrGenerator(self.cfg, self.proto, t).to_addr(
KeyGenerator(self.cfg, self.proto, 'std').gen_data(privkey)
)
# total of two outputs must be < 10 BTC (<1000 LTC)
@ -703,7 +702,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
cmdline_inputs = False,
tweaks = []):
if cfg.verbose or cfg.exact_output:
if self.cfg.verbose or self.cfg.exact_output:
sys.stderr.write(green('Generating fake tracking wallet info\n'))
silence()
@ -727,7 +726,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
end_silence()
if cfg.verbose or cfg.exact_output:
if self.cfg.verbose or self.cfg.exact_output:
sys.stderr.write('\n')
t = self.spawn(
@ -789,8 +788,8 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
t.do_comment(False, has_label=True)
for cnum, wcls in (('1', IncogWallet), ('3', MMGenWallet), ('4', MMGenWallet)):
t.passphrase(wcls.desc, self.cfgs[cnum]['wpasswd'])
self._do_confirm_send(t, quiet=not cfg.debug, confirm_send=True)
if cfg.debug:
self._do_confirm_send(t, quiet=not self.cfg.debug, confirm_send=True)
if self.cfg.debug:
t.written_to_file('Transaction')
else:
t.do_comment(False)
@ -849,7 +848,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
wcls = get_wallet_cls(fmt_code=out_fmt)
msg('==> {}: {}'.format(
wcls.desc,
cyan(get_data_from_file(cfg, f, desc=wcls.desc))
cyan(get_data_from_file(self.cfg, f, desc=wcls.desc))
))
end_silence()
return t
@ -990,7 +989,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
def walletgen4(self, del_dw_run='dummy'):
bwf = joinpath(self.tmpdir, self.bw_filename)
make_brainwallet_file(bwf)
make_brainwallet_file(self.cfg, bwf)
seed_len = str(self.seed_len)
args = ['-d', self.tmpdir, '-p1', self.usr_rand_arg, '-l'+seed_len, '-ibw']
t = self.spawn('mmgen-walletconv', self.testnet_opt + args + [bwf], no_passthru_opts=True)

View file

@ -24,7 +24,7 @@ import sys, re
from mmgen.util import die
from ..include.common import cfg, start_test_daemons, stop_test_daemons, imsg
from ..include.common import start_test_daemons, stop_test_daemons, imsg
from .common import get_file_with_ext, dfl_words_file
from .ct_base import CmdTestBase
from .ct_main import CmdTestMain
@ -79,13 +79,13 @@ class CmdTestMisc(CmdTestBase):
color = True
def rpc_backends(self):
backends = cfg._autoset_opts['rpc_backend'][1]
backends = self.cfg._autoset_opts['rpc_backend'][1]
for b in backends:
t = self.spawn_chk('mmgen-tool', [f'--rpc-backend={b}', 'daemon_version'], extra_desc=f'({b})')
return t
def _bch_txview(self, view_pref, terse, expect):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
tx = 'test/ref/bitcoin_cash/895108-BCH[2.65913].rawtx'
t = self.spawn('mmgen-tool', ['--coin=bch', f'--cashaddr={view_pref}', 'txview', tx, f'terse={terse}'])
@ -106,7 +106,7 @@ class CmdTestMisc(CmdTestBase):
return self._bch_txview(1, 1, '[1HpynST7vkLn8yNtdrqPfeghexZk4sdB3W]')
def xmrwallet_txview(self, op='txview'):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
files = get_file_with_ext('test/ref/monero', 'tx', no_dot=True, delete=False, return_list=True)
t = self.spawn('mmgen-xmrwallet', [op] + files)
@ -125,12 +125,12 @@ class CmdTestMisc(CmdTestBase):
return self.xmrwallet_txview(op='txlist')
def examples_bip_hd(self):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
return self.spawn('examples/bip_hd.py', cmd_dir='.')
def coin_daemon_info(self):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
coins = ['btc']
else:
coins = ['btc', 'ltc', 'eth']
@ -138,9 +138,9 @@ class CmdTestMisc(CmdTestBase):
t = self.spawn('examples/coin-daemon-info.py', coins, cmd_dir='.')
for coin in coins:
t.expect(coin.upper() + r'\s+mainnet\s+Up', regex=True)
if cfg.pexpect_spawn:
if self.cfg.pexpect_spawn:
t.send('q')
if not cfg.no_altcoin:
if not self.cfg.no_altcoin:
stop_test_daemons('ltc', 'eth', remove_datadir=True)
return t
@ -166,7 +166,7 @@ class CmdTestMisc(CmdTestBase):
t = self.spawn('test/misc/term_ni.py', ['echo'], cmd_dir='.', pexpect_spawn=True, timeout=1)
t.p.logfile = None
t.p.logfile_read = sys.stdout if cfg.verbose or cfg.exact_output else None
t.p.logfile_read = sys.stdout if self.cfg.verbose or self.cfg.exact_output else None
t.p.logfile_send = None
test_noecho()
@ -252,7 +252,7 @@ class CmdTestRefTX(CmdTestMain, CmdTestBase):
),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
if cfgs:
for n in self.tmpdir_nums:
cfgs[str(n)].update({
@ -260,7 +260,7 @@ class CmdTestRefTX(CmdTestMain, CmdTestBase):
'segwit': n in (33, 34),
'dep_generators': {'addrs':'ref_tx_addrgen'+str(n)[-1]}
})
CmdTestMain.__init__(self, trunner, cfgs, spawn)
CmdTestMain.__init__(self, cfg, trunner, cfgs, spawn)
def ref_tx_addrgen(self, atype):
if atype not in self.proto.mmtypes:

View file

@ -12,7 +12,6 @@ test.cmdtest_d.ct_opts: options processing tests for the MMGen cmdtest.py test s
import os, time
from ..include.common import cfg
from .ct_base import CmdTestBase
class CmdTestOpts(CmdTestBase):
@ -117,7 +116,7 @@ class CmdTestOpts(CmdTestBase):
def opt_helpscreen(self):
expect = r'OPTS.PY: Opts test.*USAGE:\s+opts.py'
if not cfg.pexpect_spawn:
if not self.cfg.pexpect_spawn:
expect += r'.*--minconf.*NOTES FOR THIS.*a note'
t = self.do_run(['--help'], expect, 0, regex=True)
if t.pexpect_spawn:
@ -206,12 +205,12 @@ class CmdTestOpts(CmdTestBase):
return self.check_vals(['--minc=7'], (('cfg.minconf', '7'),))
def opt_good6(self):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
return self.check_vals(['--coin=xmr'], (('cfg.coin', 'XMR'),))
def opt_good7(self):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
return self.check_vals(['--coin', 'xmr'], (('cfg.coin', 'XMR'),))

View file

@ -25,7 +25,6 @@ import os
from mmgen.util import capfirst
from mmgen.wallet import get_wallet_cls
from ..include.common import (
cfg,
imsg_r,
ok,
joinpath,
@ -341,7 +340,7 @@ class CmdTestRef(CmdTestBase, CmdTestShared):
t = self.spawn(
'mmgen-tool',
['-q', 'decrypt', f, 'outfile='+dec_file, 'hash_preset=1'],
env = os.environ if cfg.debug_utf8 else get_env_without_debug_vars())
env = os.environ if self.cfg.debug_utf8 else get_env_without_debug_vars())
t.passphrase('data', tool_enc_passwd)
t.written_to_file('Decrypted data')
dec_txt = read_from_file(dec_file)

View file

@ -26,7 +26,7 @@ import os
from mmgen.util import msg, capfirst
from mmgen.wallet import get_wallet_cls
from ..include.common import cfg, cmp_or_die, joinpath
from ..include.common import cmp_or_die, joinpath
from .common import (
pwfile,
ref_wallet_hash_preset,
@ -79,7 +79,7 @@ class CmdTestRef3Seed(CmdTestBase, CmdTestShared):
('ref_walletconv_hexincog', ([], 'wallet filename (hex incog)')),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
for k, _ in self.cmd_group:
for n in (1, 2, 3): # 128, 192, 256 bits
setattr(self, f'{k}_{n}', getattr(self, k))
@ -87,7 +87,7 @@ class CmdTestRef3Seed(CmdTestBase, CmdTestShared):
for n in self.tmpdir_nums:
cfgs[str(n)]['addr_idx_list'] = self.addr_idx_list_in
cfgs[str(n)]['pass_idx_list'] = self.pass_idx_list_in
CmdTestBase.__init__(self, trunner, cfgs, spawn)
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
def ref_wallet_chk(self):
wf = joinpath(ref_dir, CmdTestWalletConv.sources[str(self.seed_len)]['ref_wallet'])
@ -129,7 +129,7 @@ class CmdTestRef3Seed(CmdTestBase, CmdTestShared):
]
slarg = [f'-l{self.seed_len} ']
hparg = ['-p1']
if wtype == 'hic_wallet_old' and cfg.profile:
if wtype == 'hic_wallet_old' and self.cfg.profile:
msg('')
t = self.spawn('mmgen-walletchk',
slarg + hparg + of_arg + ic_arg,
@ -163,7 +163,7 @@ class CmdTestRef3Seed(CmdTestBase, CmdTestShared):
pat = r'{}-[0-9A-F]{{8}}\[{},1\]{}.mmdat'.format(
self.chk_data['sids'][idx],
self.chk_data['lens'][idx],
'' if cfg.debug_utf8 else '')
'' if self.cfg.debug_utf8 else '')
assert re.match(pat, fn), f'{pat} != {fn}'
sid = os.path.basename(fn.split('-')[0])
cmp_or_die(sid, self.seed_id, desc='Seed ID')
@ -186,7 +186,7 @@ class CmdTestRef3Seed(CmdTestBase, CmdTestShared):
cmp_or_die('{}[{}]{}.{}'.format(
sid,
slen,
'' if cfg.debug_utf8 else '',
'' if self.cfg.debug_utf8 else '',
wcls.ext),
fn)
return t
@ -206,7 +206,7 @@ class CmdTestRef3Seed(CmdTestBase, CmdTestShared):
def ref_walletconv_incog(self, ofmt='incog', ext='mmincog'):
args = ['-r0', '-p1']
pat = r'{}-[0-9A-F]{{8}}-[0-9A-F]{{8}}\[{},1\]' + ('' if cfg.debug_utf8 else '') + '.' + ext
pat = r'{}-[0-9A-F]{{8}}-[0-9A-F]{{8}}\[{},1\]' + ('' if self.cfg.debug_utf8 else '') + '.' + ext
return self.ref_walletconv(ofmt=ofmt, extra_args=args, re_pat=pat)
def ref_walletconv_hexincog(self):
@ -465,7 +465,7 @@ class CmdTestRef3Passwd(CmdTestRef3Seed):
if pwlen > {'1':12, '2':18, '3':24}[self.test_name[-1]]:
return 'skip'
if pwfmt == 'xmrseed':
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
pwlen += 1
return self.pwgen(ftype, 'фубар@crypto.org', pwfmt, pwlen, ['--accept-defaults'])

View file

@ -23,7 +23,7 @@ test.cmdtest_d.ct_ref_altcoin: Altcoin reference file tests for the cmdtest.py t
from mmgen.color import set_vt100
from .common import pwfile, dfl_wpasswd, ref_dir, dfl_words_file, dfl_addr_idx_list
from ..include.common import cfg, joinpath, start_test_daemons, stop_test_daemons, cmp_or_die
from ..include.common import joinpath, start_test_daemons, stop_test_daemons, cmp_or_die
from .ct_ref import CmdTestRef
from .ct_base import CmdTestBase
@ -102,7 +102,7 @@ class CmdTestRefAltcoin(CmdTestRef, CmdTestBase):
ref_dir,
self._get_ref_subdir_by_coin(coin),
fn)
proto = MMGenTxFile.get_proto(cfg, txfile, quiet_open=True)
proto = MMGenTxFile.get_proto(self.cfg, txfile, quiet_open=True)
if proto.sign_mode == 'daemon':
start_test_daemons(proto.network_id)
set_vt100()

View file

@ -33,7 +33,6 @@ from mmgen.addrlist import AddrList
from mmgen.wallet import Wallet, get_wallet_cls
from ..include.common import (
cfg,
imsg,
omsg,
ok,
@ -472,9 +471,9 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
),
}
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestBase.__init__(self, trunner, cfgs, spawn)
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return
@ -511,7 +510,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def _add_comments_to_addr_file(self, proto, addrfile, outfile, use_comments=False):
silence()
gmsg(f'Adding comments to address file {addrfile!r}')
a = AddrList(cfg, proto, infile=addrfile)
a = AddrList(self.cfg, proto, infile=addrfile)
for n, idx in enumerate(a.idxs(), 1):
if use_comments:
a.set_comment(idx, get_comment())
@ -521,7 +520,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
a.file.format(add_comments=True)
from mmgen.fileutil import write_data_to_file
write_data_to_file(
cfg,
self.cfg,
outfile = outfile,
data = a.file.fmt_data,
quiet = True,
@ -600,7 +599,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def _get_user_subsid(self, user, subseed_idx):
fn = get_file_with_ext(self._user_dir(user), dfl_wcls.ext)
silence()
w = Wallet(cfg, fn=fn, passwd_file=os.path.join(self.tmpdir, 'wallet_password'))
w = Wallet(self.cfg, fn=fn, passwd_file=os.path.join(self.tmpdir, 'wallet_password'))
end_silence()
return w.seed.subseed(subseed_idx).sid
@ -655,7 +654,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
addrfile = joinpath(self._user_dir(user),
'{}{}{}[{}]{x}.regtest.addrs'.format(
sid, self.get_altcoin_pfx(proto.coin), id_strs[desc], addr_range,
x='' if cfg.debug_utf8 else ''))
x='' if self.cfg.debug_utf8 else ''))
if mmtype == proto.mmtypes[0] and user == 'bob':
self._add_comments_to_addr_file(proto, addrfile, addrfile, use_comments=True)
t = self.spawn(
@ -666,7 +665,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
(['--batch'] if batch else []) +
[f'--coin={proto.coin}', addrfile]),
extra_desc = f'({desc})')
if cfg.debug:
if self.cfg.debug:
t.expect("Type uppercase 'YES' to confirm: ", 'YES\n')
t.expect('Importing')
if batch:
@ -1127,10 +1126,10 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
proto = proto or self.proto
id_str = {'L':'', 'S':'-S', 'C':'-C', 'B':'-B'}[mmtype]
ext = '{}{}{}[{}]{x}.regtest.addrs'.format(
sid, self.get_altcoin_pfx(proto.coin), id_str, addr_range, x='' if cfg.debug_utf8 else '')
sid, self.get_altcoin_pfx(proto.coin), id_str, addr_range, x='' if self.cfg.debug_utf8 else '')
addrfile = get_file_with_ext(self._user_dir(user), ext, no_dot=True)
silence()
addr = AddrList(cfg, proto, infile=addrfile).data[idx].addr
addr = AddrList(self.cfg, proto, infile=addrfile).data[idx].addr
end_silence()
return addr
@ -1148,7 +1147,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def bob_rbf_1output_bump(self):
if not self.test_rbf:
return 'skip'
ext = '9343,3]{x}.regtest.rawtx'.format(x='' if cfg.debug_utf8 else '')
ext = '9343,3]{x}.regtest.rawtx'.format(x='' if self.cfg.debug_utf8 else '')
txfile = get_file_with_ext(self.tr.trash_dir, ext, delete=False, no_dot=True)
return self.user_txbump('bob',
self.tr.trash_dir,
@ -1221,7 +1220,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
return self._bob_rbf_bump(['--send'], rtFee[2])
def _bob_rbf_bump(self, add_args, fee):
ext = ',{}]{x}.regtest.sigtx'.format(rtFee[1][:-1], x='' if cfg.debug_utf8 else '')
ext = ',{}]{x}.regtest.sigtx'.format(rtFee[1][:-1], x='' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, delete=False, no_dot=True)
return self.user_txbump('bob', self.tmpdir, txfile, fee, add_args=add_args)
@ -1238,7 +1237,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
ret = self.spawn(
'mmgen-regtest',
cmd_args,
env = (os.environ if cfg.debug_utf8 else get_env_without_debug_vars()) | (
env = (os.environ if self.cfg.debug_utf8 else get_env_without_debug_vars()) | (
{'EXEC_WRAPPER_DO_RUNTIME_MSG': ''}),
no_msg = True
).read().strip()
@ -1285,7 +1284,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
r1, r2 = (f'Transaction has {confirmations} confirmation', '')
else:
r1, r2 = ('in mempool, replaceable', '')
ext = ',{}]{x}.regtest.sigtx'.format(fee[:-1], x='' if cfg.debug_utf8 else '')
ext = ',{}]{x}.regtest.sigtx'.format(fee[:-1], x='' if self.cfg.debug_utf8 else '')
txfile = self.get_file_with_ext(ext, delete=False, no_dot=True)
return self.user_txsend_status('bob', txfile, r1, r2, exit_val=exit_val)
@ -1309,7 +1308,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def _gen_pairs(self, n):
from mmgen.tool.api import tool_api
t = tool_api(cfg)
t = tool_api(self.cfg)
t.init_coin(self.proto.coin, self.proto.network)
def gen_addr(Type):
@ -1327,7 +1326,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def user_import(self, user, args, nAddr):
t = self.spawn('mmgen-addrimport', ['--'+user] + args)
if cfg.debug:
if self.cfg.debug:
t.expect("Type uppercase 'YES' to confirm: ", 'YES\n')
t.expect(f'Importing {nAddr} address')
if '--rescan' in args:
@ -1538,7 +1537,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def bob_edit_json_twdump(self):
self.spawn(msg_only=True)
from mmgen.tw.json import TwJSON
fn = TwJSON.Base(cfg, self.proto).dump_fn
fn = TwJSON.Base(self.cfg, self.proto).dump_fn
text = json.loads(self.read_from_tmpfile(fn))
text['data']['entries'][3][3] = f'edited comment [фубар] [{gr_uc}]'
self.write_to_tmpfile(fn, json.dumps(text, indent=4))
@ -1551,7 +1550,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
expect_str = None,
expect_str2 = 'Found 1 unspent output'):
from mmgen.tw.json import TwJSON
fn = joinpath(self.tmpdir, TwJSON.Base(cfg, self.proto).dump_fn)
fn = joinpath(self.tmpdir, TwJSON.Base(self.cfg, self.proto).dump_fn)
t = self.spawn(
'mmgen-tool',
([f'--rpc-backend={rpc_backend}'] if rpc_backend else [])
@ -1684,7 +1683,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def alice_add_comment_badaddr2(self):
# mainnet zero address:
addr = init_proto(cfg, self.proto.coin, network='mainnet').pubhash2addr(bytes(20), 'p2pkh')
addr = init_proto(self.cfg, self.proto.coin, network='mainnet').pubhash2addr(bytes(20), 'p2pkh')
return self.alice_add_comment_badaddr(addr, 'invalid address', 2)
def alice_add_comment_badaddr3(self):
@ -1861,7 +1860,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
s,
regex=True)
if t.pexpect_spawn and s == 'w':
t.expect(r'Total.*', 'q', regex=True, delay=1 if cfg.exact_output else t.send_delay)
t.expect(r'Total.*', 'q', regex=True, delay=1 if self.cfg.exact_output else t.send_delay)
time.sleep(t.send_delay)
t.send('e')
return t
@ -2262,7 +2261,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
def stop(self):
self.spawn(msg_only=True)
if cfg.no_daemon_stop:
if self.cfg.no_daemon_stop:
msg_r(f'(leaving regtest daemon{suf(self.protos)} running by user request)')
imsg('')
else:

View file

@ -20,7 +20,7 @@ from .common import dfl_bip39_file
from .httpd.thornode import ThornodeServer
from .ct_autosign import CmdTestAutosign, CmdTestAutosignThreaded
from .ct_regtest import CmdTestRegtest, rt_data, dfl_wcls, rt_pw, cfg, strip_ansi_escapes
from .ct_regtest import CmdTestRegtest, rt_data, dfl_wcls, rt_pw, strip_ansi_escapes
thornode_server = ThornodeServer()
@ -158,10 +158,10 @@ class CmdTestSwap(CmdTestRegtest, CmdTestAutosignThreaded):
),
}
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestAutosignThreaded.__init__(self, trunner, cfgs, spawn)
CmdTestRegtest.__init__(self, trunner, cfgs, spawn)
CmdTestAutosignThreaded.__init__(self, cfg, trunner, cfgs, spawn)
CmdTestRegtest.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return

View file

@ -16,7 +16,6 @@ from mmgen.util import suf
from mmgen.color import cyan
from ..include.common import (
cfg,
vmsg,
read_from_file,
write_to_file,
@ -117,7 +116,7 @@ class CmdTestTool(CmdTestMain, CmdTestBase):
return t
def _decrypt_keystore(self, cmd, fn, pw, chk):
if cfg.no_altcoin:
if self.cfg.no_altcoin:
return 'skip'
t = self.spawn('mmgen-tool', ['-d', self.tmpdir, cmd, fn])
t.expect('Enter passphrase: ', pw+'\n')
@ -141,7 +140,7 @@ class CmdTestTool(CmdTestMain, CmdTestBase):
def tool_api(self):
t = self.spawn(
'tool_api_test.py',
(['no_altcoin'] if cfg.no_altcoin else []),
(['no_altcoin'] if self.cfg.no_altcoin else []),
cmd_dir = 'test/misc')
t.expect('legacy.*compressed.*segwit.*bech32', regex=True)
return t

View file

@ -25,7 +25,7 @@ import sys, os
from mmgen.util import msg, capfirst, get_extension
from mmgen.wallet import get_wallet_cls
from ..include.common import cfg, joinpath, VirtBlockDevice
from ..include.common import joinpath, VirtBlockDevice
from .common import ref_dir, ref_wallet_brainpass, ref_wallet_incog_offset, hincog_fn, hincog_bytes
from .ct_base import CmdTestBase
from .ct_shared import CmdTestShared
@ -89,11 +89,11 @@ class CmdTestWalletConv(CmdTestBase, CmdTestShared):
('ref_hincog_blkdev_conv_out', 'ref seed conversion to hidden incog data on block device')
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
for k, _ in self.cmd_group:
for n in (1, 2, 3):
setattr(self, f'{k}_{n}', getattr(self, k))
CmdTestBase.__init__(self, trunner, cfgs, spawn)
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
def ref_wallet_conv(self):
wf = joinpath(ref_dir, self.sources[str(self.seed_len)]['ref_wallet'])
@ -200,7 +200,7 @@ class CmdTestWalletConv(CmdTestBase, CmdTestShared):
wf = t.written_to_file(capfirst(ocls.desc))
t.p.wait()
# back check of result
msg('' if cfg.profile else ' OK')
msg('' if self.cfg.profile else ' OK')
return self.walletchk(
wf,
extra_desc = '(check)',
@ -229,7 +229,7 @@ class CmdTestWalletConv(CmdTestBase, CmdTestShared):
if wcls.type == 'incog_hidden':
add_args += uopts_chk
wf = None
msg('' if cfg.profile else ' OK')
msg('' if self.cfg.profile else ' OK')
return self.walletchk(
wf,
wcls = wcls,

View file

@ -18,13 +18,13 @@ import re
from mmgen.color import blue, cyan, brown
from mmgen.util import async_run
from ..include.common import cfg, imsg, silence, end_silence
from ..include.common import imsg, silence, end_silence
from .common import get_file_with_ext
from .ct_xmrwallet import CmdTestXMRWallet
from .ct_autosign import CmdTestAutosignThreaded
def make_burn_addr():
def make_burn_addr(cfg):
from mmgen.tool.coin import tool_cmd
return tool_cmd(
cfg = cfg,
@ -99,16 +99,16 @@ class CmdTestXMRAutosign(CmdTestXMRWallet, CmdTestAutosignThreaded):
('check_tx_dirs', 'cleaning and checking signable file directories'),
)
def __init__(self, trunner, cfgs, spawn):
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestAutosignThreaded.__init__(self, trunner, cfgs, spawn)
CmdTestXMRWallet.__init__(self, trunner, cfgs, spawn)
CmdTestAutosignThreaded.__init__(self, cfg, trunner, cfgs, spawn)
CmdTestXMRWallet.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return
from mmgen.cfg import Config
self.cfg = Config({
self.alice_cfg = Config({
'coin': 'XMR',
'outdir': self.users['alice'].udir,
'wallet_dir': self.users['alice'].udir,
@ -116,7 +116,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet, CmdTestAutosignThreaded):
'test_suite': True,
})
self.burn_addr = make_burn_addr()
self.burn_addr = make_burn_addr(cfg)
self.opts.append('--xmrwallets={}'.format(self.users['alice'].kal_range)) # mmgen-autosign opts
self.autosign_opts = ['--autosign'] # mmgen-xmrwallet opts
@ -130,15 +130,15 @@ class CmdTestXMRAutosign(CmdTestXMRWallet, CmdTestAutosignThreaded):
from mmgen.addrlist import KeyAddrList
silence()
kal = KeyAddrList(
cfg = self.cfg,
cfg = self.alice_cfg,
proto = self.proto,
addr_idxs = '1-2',
seed = Wallet(cfg, fn=data.mmwords).seed,
seed = Wallet(self.alice_cfg, fn=data.mmwords).seed,
skip_chksum_msg = True,
key_address_validity_check = False)
kal.file.write(ask_overwrite=False)
fn = get_file_with_ext(data.udir, 'akeys')
m = op('create', self.cfg, fn, '1-2')
m = op('create', self.alice_cfg, fn, '1-2')
async_run(m.main())
async_run(m.stop_wallet_daemon())
end_silence()

View file

@ -31,7 +31,6 @@ from mmgen.amt import XMRAmt
from mmgen.addrlist import ViewKeyAddrList, KeyAddrList, AddrIdxList
from ..include.common import (
cfg,
omsg,
oqmsg_r,
ok,
@ -118,8 +117,9 @@ class CmdTestXMRWallet(CmdTestBase):
('stop_daemons', 'stopping all wallet and coin daemons'),
)
def __init__(self, trunner, cfgs, spawn):
CmdTestBase.__init__(self, trunner, cfgs, spawn)
def __init__(self, cfg, trunner, cfgs, spawn):
CmdTestBase.__init__(self, cfg, trunner, cfgs, spawn)
if trunner is None:
return
@ -147,14 +147,14 @@ class CmdTestXMRWallet(CmdTestBase):
shutil.rmtree(self.datadir_base)
os.makedirs(self.datadir_base)
self.start_daemons()
self.init_proxy()
self.init_proxy(cfg)
self.balance = None
# init methods
@classmethod
def init_proxy(cls, external_call=False):
def init_proxy(cls, cfg, external_call=False):
def start_proxy():
if external_call or not cfg.no_daemon_autostart:
@ -256,7 +256,7 @@ class CmdTestXMRWallet(CmdTestBase):
udir = os.path.join(tmpdir, user)
datadir = os.path.join(self.datadir_base, user)
md = CoinDaemon(
cfg = cfg,
cfg = self.cfg,
proto = self.proto,
test_suite = True,
port_shift = shift,
@ -264,7 +264,7 @@ class CmdTestXMRWallet(CmdTestBase):
datadir = datadir
)
md_rpc = MoneroRPCClient(
cfg = cfg,
cfg = self.cfg,
proto = self.proto,
host = 'localhost',
port = md.rpc_port,
@ -274,7 +274,7 @@ class CmdTestXMRWallet(CmdTestBase):
daemon = md,
)
wd = MoneroWalletDaemon(
cfg = cfg,
cfg = self.cfg,
proto = self.proto,
test_suite = True,
wallet_dir = udir,
@ -284,7 +284,7 @@ class CmdTestXMRWallet(CmdTestBase):
monerod_addr = f'127.0.0.1:{md.rpc_port}',
)
wd_rpc = MoneroWalletRPCClient(
cfg = cfg,
cfg = self.cfg,
daemon = wd,
test_connection = False,
)
@ -376,7 +376,7 @@ class CmdTestXMRWallet(CmdTestBase):
)
for i in MMGenRange(wallet or data.kal_range).items:
write_data_to_file(
cfg,
self.cfg,
self.users[user].addrfile_fs.format(i),
t.expect_getend('Address: '),
quiet = True
@ -674,7 +674,7 @@ class CmdTestXMRWallet(CmdTestBase):
self.do_mount_online()
silence()
kal = (ViewKeyAddrList if data.autosign else KeyAddrList)(
cfg = cfg,
cfg = self.cfg,
proto = self.proto,
infile = data.kafile,
skip_chksum_msg = True,
@ -683,14 +683,14 @@ class CmdTestXMRWallet(CmdTestBase):
if data.autosign:
self.do_umount_online()
self.remove_device_online()
self.users[user].wd.start(silent=not (cfg.exact_output or cfg.verbose))
self.users[user].wd.start(silent=not (self.cfg.exact_output or self.cfg.verbose))
return data.wd_rpc.call(
'open_wallet',
filename = os.path.basename(data.walletfile_fs.format(wnum)),
password = kal.entry(wnum).wallet_passwd)
async def stop_wallet_user(self, user):
await self.users[user].wd_rpc.stop_daemon(silent=not (cfg.exact_output or cfg.verbose))
await self.users[user].wd_rpc.stop_daemon(silent=not (self.cfg.exact_output or self.cfg.verbose))
return 'ok'
# mining methods
@ -786,7 +786,7 @@ class CmdTestXMRWallet(CmdTestBase):
async def send_random_txs():
from mmgen.tool.api import tool_api
t = tool_api(cfg)
t = tool_api(self.cfg)
t.init_coin('XMR', 'mainnet')
t.usr_randchars = 0
imsg_r('Sending random transactions: ')
@ -901,7 +901,7 @@ class CmdTestXMRWallet(CmdTestBase):
def stop_daemons(self):
self.spawn(msg_only=True)
if cfg.no_daemon_stop:
if self.cfg.no_daemon_stop:
omsg('[not stopping daemons at user request]')
else:
omsg('')

View file

@ -28,7 +28,7 @@ class CmdGroupMgr:
def __init__(self, cfg):
self.cfg = cfg
self.network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '')
self.network_id = cfg._proto.coin.lower() + ('_tn' if cfg._proto.testnet else '')
self.name = type(self).__name__
def create_cmd_group(self, cls, sg_name=None):
@ -120,11 +120,11 @@ class CmdGroupMgr:
return cls
def gm_init_group(self, trunner, gname, sg_name, spawn_prog):
def gm_init_group(self, cfg, trunner, gname, sg_name, spawn_prog):
kwargs = self.cmd_groups[gname][1]
cls = self.create_group(gname, sg_name, **kwargs)
cls.group_name = gname
return cls(trunner, cfgs, spawn_prog)
return cls(cfg, trunner, cfgs, spawn_prog)
def get_cls_by_gname(self, gname):
return self.load_mod(gname, self.cmd_groups[gname][1].get('modname'))
@ -184,7 +184,7 @@ class CmdGroupMgr:
if cmd in cls.cmd_group: # first search the class
return gname
if cmd in dir(cls(None, None, None)): # then a throwaway instance
if cmd in dir(cls(self.cfg, None, None, None)): # then a throwaway instance
return gname # cmd might exist in more than one group - we'll go with the first
return None

View file

@ -40,10 +40,10 @@ class CmdTestRunner:
if self.logging:
self.log_fd.close()
def __init__(self, cfg, proto, repo_root, data_dir, trash_dir, trash_dir2):
def __init__(self, cfg, repo_root, data_dir, trash_dir, trash_dir2):
self.cfg = cfg
self.proto = proto
self.proto = cfg._proto
self.data_dir = data_dir
self.trash_dir = trash_dir
self.trash_dir2 = trash_dir2
@ -56,7 +56,8 @@ class CmdTestRunner:
self.deps_only = None
self.logging = self.cfg.log or os.getenv('MMGEN_EXEC_WRAPPER')
self.testing_segwit = cfg.segwit or cfg.segwit_random or cfg.bech32
self.network_id = cfg.coin.lower() + ('_tn' if cfg.testnet else '')
self.network_id = self.proto.coin.lower() + ('_tn' if self.proto.testnet else '')
self.daemon_started = False
global qmsg, qmsg_r
if cfg.exact_output:
@ -83,6 +84,7 @@ class CmdTestRunner:
omsg('INFO → Using pexpect.spawn() for real terminal emulation')
self.set_spawn_env()
self.start_time = time.time()
def do_between(self):
if self.cfg.pause:
@ -279,7 +281,7 @@ class CmdTestRunner:
if hasattr(self, 'tg'):
del self.tg
self.tg = self.gm.gm_init_group(self, gname, sg_name, self.spawn_wrapper)
self.tg = self.gm.gm_init_group(self.cfg, self, gname, sg_name, self.spawn_wrapper)
self.ct_clsname = type(self.tg).__name__
# pass through opts from cmdline (po.user_opts)
@ -302,8 +304,6 @@ class CmdTestRunner:
return self.tg
def run_tests(self, cmd_args):
self.start_time = time.time()
self.daemon_started = False
gname_save = None
def parse_arg(arg):
@ -497,7 +497,7 @@ class CmdTestRunner:
else:
die(2, f'{cmd!r} returned {ret}')
def check_deps(self, cmds): # TODO: broken
def check_deps(self, cmds): # TODO: broken, unused
if len(cmds) != 1:
die(1, f'Usage: {gc.prog_name} check_deps <command>')
@ -509,7 +509,7 @@ class CmdTestRunner:
if not self.cfg.quiet:
omsg(f'Checking dependencies for {cmd!r}')
self.check_needs_rerun(self.tg, cmd)
self.check_needs_rerun(cmd)
w = max(map(len, self.rebuild_list)) + 1
for cmd in self.rebuild_list:

View file

@ -66,4 +66,4 @@ class unit_tests:
def ssh_socks_proxy(self, name, ut):
from test.cmdtest_d.ct_xmrwallet import CmdTestXMRWallet
return CmdTestXMRWallet.init_proxy(external_call=True)
return CmdTestXMRWallet.init_proxy(cfg, external_call=True)