Config API, Part II
This patch completes the implementation of the API, making the entire MMGen
code base usable as a library for external projects. A usage example can be
found in the script `examples/coin-daemon-info.py`.
Testing:
$ test/test.py -e coin_daemon_info
This commit is contained in:
parent
bac47a6a0a
commit
e90e25b2f0
5 changed files with 693 additions and 615 deletions
94
examples/coin-daemon-info.py
Executable file
94
examples/coin-daemon-info.py
Executable file
|
|
@ -0,0 +1,94 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
#
|
||||||
|
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
|
||||||
|
# Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
|
||||||
|
# Licensed under the GNU General Public License, Version 3:
|
||||||
|
# https://www.gnu.org/licenses
|
||||||
|
# Public project repositories:
|
||||||
|
# https://github.com/mmgen/mmgen
|
||||||
|
# https://gitlab.com/mmgen/mmgen
|
||||||
|
|
||||||
|
"""
|
||||||
|
examples/coin-daemon-info.py:
|
||||||
|
Get info about multiple running coin daemons.
|
||||||
|
Demonstrates use of the MMGen Config API.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Instructions
|
||||||
|
#
|
||||||
|
# Testing mode:
|
||||||
|
#
|
||||||
|
# 1) From the MMGen repository root, start the mainnet test suite daemons as follows
|
||||||
|
# (note that openethereum is the default testing daemon for ETH):
|
||||||
|
#
|
||||||
|
# test/start-coin-daemons.py btc ltc eth
|
||||||
|
#
|
||||||
|
# 2) Then run the script as follows:
|
||||||
|
#
|
||||||
|
# PYTHONPATH=. MMGEN_TEST_SUITE=1 examples/coin-daemon-info.py btc ltc eth
|
||||||
|
#
|
||||||
|
# Live mode:
|
||||||
|
#
|
||||||
|
# 1) Start up one or more of bitcoind, litecoind or geth with the standard mainnet ports
|
||||||
|
# and datadirs. For geth, the options ‘--http --http.api=eth,web3’ are required.
|
||||||
|
#
|
||||||
|
# 2) Then run the script as follows:
|
||||||
|
#
|
||||||
|
# PYTHONPATH=. examples/coin-daemon-info.py btc ltc eth
|
||||||
|
|
||||||
|
import sys,os,asyncio
|
||||||
|
|
||||||
|
from mmgen.exception import SocketError
|
||||||
|
from mmgen.cfg import Config
|
||||||
|
from mmgen.rpc import rpc_init
|
||||||
|
from mmgen.util import make_timestr
|
||||||
|
|
||||||
|
async def get_rpc(cfg):
|
||||||
|
try:
|
||||||
|
return await rpc_init(cfg)
|
||||||
|
except SocketError as e:
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def main(coins):
|
||||||
|
|
||||||
|
rpcs = {}
|
||||||
|
cfgs = {}
|
||||||
|
test_suite = os.getenv('MMGEN_TEST_SUITE')
|
||||||
|
base_cfg = Config({'pager':True})
|
||||||
|
|
||||||
|
for coin in coins:
|
||||||
|
cfg_in = {
|
||||||
|
'coin': coin,
|
||||||
|
'test_suite': test_suite,
|
||||||
|
}
|
||||||
|
if coin == 'eth' and not test_suite:
|
||||||
|
cfg_in.update({'daemon_id': 'geth'})
|
||||||
|
cfgs[coin] = Config(cfg_in)
|
||||||
|
rpcs[coin] = await get_rpc(cfgs[coin])
|
||||||
|
|
||||||
|
def gen_output():
|
||||||
|
fs = '{:4} {:7} {:6} {:<5} {:<8} {:30} {:13} {:23} {}'
|
||||||
|
yield fs.format('Coin','Network','Status','Port','Chain','Latest Block','Daemon','Version','Datadir')
|
||||||
|
for coin,rpc in rpcs.items():
|
||||||
|
info = ('Down','-','-','-','-','-','-') if rpc is False else (
|
||||||
|
'Up',
|
||||||
|
rpc.port,
|
||||||
|
rpc.chain,
|
||||||
|
'{:<8} [{}]'.format(rpc.blockcount, make_timestr(rpc.cur_date)),
|
||||||
|
rpc.daemon.coind_name,
|
||||||
|
rpc.daemon_version_str,
|
||||||
|
rpc.daemon.datadir
|
||||||
|
)
|
||||||
|
yield fs.format( coin.upper(), cfgs[coin].network, *info )
|
||||||
|
|
||||||
|
base_cfg._util.stdout_or_pager('\n'.join(gen_output()))
|
||||||
|
|
||||||
|
all_coins = ('btc', 'ltc', 'eth')
|
||||||
|
|
||||||
|
coins = sys.argv[1:]
|
||||||
|
|
||||||
|
if coins and all(coin in all_coins for coin in coins):
|
||||||
|
asyncio.run(main(coins))
|
||||||
|
else:
|
||||||
|
print(f'You must supply one or more of the following coins on the command line:\n {all_coins}')
|
||||||
|
sys.exit(1)
|
||||||
531
mmgen/cfg.py
531
mmgen/cfg.py
|
|
@ -177,6 +177,7 @@ class Config(Lockable):
|
||||||
force_color = False # placeholder
|
force_color = False # placeholder
|
||||||
force_256_color = False
|
force_256_color = False
|
||||||
scroll = False
|
scroll = False
|
||||||
|
pager = False
|
||||||
columns = 0
|
columns = 0
|
||||||
color = bool(
|
color = bool(
|
||||||
( sys.stdout.isatty() and not os.getenv('MMGEN_TEST_SUITE_PEXPECT') ) or
|
( sys.stdout.isatty() and not os.getenv('MMGEN_TEST_SUITE_PEXPECT') ) or
|
||||||
|
|
@ -216,37 +217,14 @@ class Config(Lockable):
|
||||||
|
|
||||||
mnemonic_entry_modes = {}
|
mnemonic_entry_modes = {}
|
||||||
|
|
||||||
# placeholders:
|
# external use:
|
||||||
|
_opts = None
|
||||||
_proto = None
|
_proto = None
|
||||||
pager = False
|
|
||||||
|
|
||||||
# 'long' opts (subset of common_opts_data):
|
# internal use:
|
||||||
_common_opts = (
|
_data_dir_root_override = None
|
||||||
'accept_defaults',
|
_use_cfg_file = False
|
||||||
'aiohttp_rpc_queue_len',
|
_use_env = False
|
||||||
'bob',
|
|
||||||
'alice',
|
|
||||||
'carol',
|
|
||||||
'coin',
|
|
||||||
'color',
|
|
||||||
'columns',
|
|
||||||
'daemon_data_dir',
|
|
||||||
'daemon_id',
|
|
||||||
'force_256_color',
|
|
||||||
'http_timeout',
|
|
||||||
'ignore_daemon_version',
|
|
||||||
'no_license',
|
|
||||||
'regtest',
|
|
||||||
'rpc_host',
|
|
||||||
'rpc_password',
|
|
||||||
'rpc_port',
|
|
||||||
'rpc_user',
|
|
||||||
'scroll',
|
|
||||||
'testnet',
|
|
||||||
'token' )
|
|
||||||
|
|
||||||
# opts not in _common_opts but required to be set during opts initialization
|
|
||||||
_init_opts = ('show_hash_presets','yes','verbose')
|
|
||||||
|
|
||||||
_incompatible_opts = (
|
_incompatible_opts = (
|
||||||
('help','longhelp'),
|
('help','longhelp'),
|
||||||
|
|
@ -355,6 +333,7 @@ class Config(Lockable):
|
||||||
'vsize_adj': float,
|
'vsize_adj': float,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# test suite:
|
||||||
err_disp_timeout = 0.7
|
err_disp_timeout = 0.7
|
||||||
short_disp_timeout = 0.3
|
short_disp_timeout = 0.3
|
||||||
stdin_tty = sys.stdin.isatty()
|
stdin_tty = sys.stdin.isatty()
|
||||||
|
|
@ -410,7 +389,6 @@ class Config(Lockable):
|
||||||
self,
|
self,
|
||||||
cfg = None,
|
cfg = None,
|
||||||
opts_data = None,
|
opts_data = None,
|
||||||
add_opts = None,
|
|
||||||
init_opts = None,
|
init_opts = None,
|
||||||
opt_filter = None,
|
opt_filter = None,
|
||||||
parse_only = False,
|
parse_only = False,
|
||||||
|
|
@ -420,23 +398,490 @@ class Config(Lockable):
|
||||||
do_post_init = False,
|
do_post_init = False,
|
||||||
process_opts = False ):
|
process_opts = False ):
|
||||||
|
|
||||||
|
# Step 1: get user-supplied configuration data from a) command line, or b) first argument
|
||||||
|
# to constructor; save to self._uopts:
|
||||||
if opts_data or parsed_opts or process_opts:
|
if opts_data or parsed_opts or process_opts:
|
||||||
|
assert cfg is None
|
||||||
import mmgen.opts as opts
|
from mmgen.opts import UserOpts
|
||||||
|
UserOpts(
|
||||||
self._opts = opts
|
|
||||||
|
|
||||||
opts.init(
|
|
||||||
cfg = self,
|
cfg = self,
|
||||||
opts_data = opts_data,
|
opts_data = opts_data,
|
||||||
add_opts = add_opts,
|
|
||||||
init_opts = init_opts,
|
init_opts = init_opts,
|
||||||
opt_filter = opt_filter,
|
opt_filter = opt_filter,
|
||||||
parse_only = parse_only,
|
parse_only = parse_only,
|
||||||
parsed_opts = parsed_opts,
|
parsed_opts = parsed_opts )
|
||||||
need_proto = need_proto,
|
desc = 'command-line option'
|
||||||
need_amt = need_amt,
|
else:
|
||||||
do_post_init = do_post_init )
|
self._uopts = {} if cfg is None else cfg
|
||||||
|
desc = 'configuration option'
|
||||||
|
|
||||||
if do_post_init:
|
if parse_only and not any(k in self._uopts for k in ['help','longhelp']):
|
||||||
self._post_init = opts.post_init
|
return
|
||||||
|
|
||||||
|
# Step 2: set cfg from user-supplied data, skipping auto opts; set type from corresponding
|
||||||
|
# class attribute, if it exists:
|
||||||
|
auto_opts = tuple(self._autoset_opts) + tuple(self._auto_typeset_opts)
|
||||||
|
for key,val in self._uopts.items():
|
||||||
|
if key not in auto_opts:
|
||||||
|
setattr(self, key, set_for_type(key, val, getattr(self,key), desc) if hasattr(self,key) else val)
|
||||||
|
|
||||||
|
# Step 3: set cfg from environment, skipping already-set opts; save names set from environment:
|
||||||
|
env_cfg = tuple(self._set_cfg_from_env()) if self._use_env else ()
|
||||||
|
|
||||||
|
from .term import init_term
|
||||||
|
init_term(self) # requires ‘hold_protect_disable’ (set from env)
|
||||||
|
|
||||||
|
from .fileutil import check_or_create_dir
|
||||||
|
check_or_create_dir(self.data_dir_root)
|
||||||
|
|
||||||
|
from .util import wrap_ripemd160
|
||||||
|
wrap_ripemd160() # ripemd160 required by mmgen_cfg_file() in _set_cfg_from_cfg_file()
|
||||||
|
|
||||||
|
# Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save auto opts to be set:
|
||||||
|
# (cfgfile.py requires ‘data_dir_root’, ‘test_suite_cfgtest’)
|
||||||
|
cfgfile_opts = self._set_cfg_from_cfg_file( env_cfg, need_proto )
|
||||||
|
|
||||||
|
# Step 5: set autoset opts from user-supplied data, cfgfile data, or default values, in that order:
|
||||||
|
self._set_autoset_opts( cfgfile_opts.autoset )
|
||||||
|
|
||||||
|
# Step 6: set auto typeset opts from user-supplied data or cfgfile data, in that order:
|
||||||
|
self._set_auto_typeset_opts( cfgfile_opts.auto_typeset )
|
||||||
|
|
||||||
|
if self.regtest or self.bob or self.alice or self.carol or gc.prog_name == 'mmgen-regtest':
|
||||||
|
self.network = 'regtest'
|
||||||
|
self.regtest_user = 'bob' if self.bob else 'alice' if self.alice else 'carol' if self.carol else None
|
||||||
|
else:
|
||||||
|
self.network = 'testnet' if self.testnet else 'mainnet'
|
||||||
|
|
||||||
|
self.coin = self.coin.upper()
|
||||||
|
self.token = self.token.upper() or None
|
||||||
|
|
||||||
|
# self.color is finalized, so initialize color:
|
||||||
|
if self.color: # MMGEN_DISABLE_COLOR sets this to False
|
||||||
|
from .color import init_color
|
||||||
|
init_color(num_colors=256 if self.force_256_color else 'auto')
|
||||||
|
|
||||||
|
self._die_on_incompatible_opts()
|
||||||
|
|
||||||
|
check_or_create_dir(self.data_dir)
|
||||||
|
|
||||||
|
if self.debug and gc.prog_name != 'test.py':
|
||||||
|
self.verbose = True
|
||||||
|
self.quiet = False
|
||||||
|
|
||||||
|
if self.debug_opts:
|
||||||
|
opt_postproc_debug(self)
|
||||||
|
|
||||||
|
from .util import Util
|
||||||
|
self._util = Util(self)
|
||||||
|
|
||||||
|
self._lock()
|
||||||
|
|
||||||
|
if need_proto:
|
||||||
|
from .protocol import warn_trustlevel,init_proto_from_cfg
|
||||||
|
warn_trustlevel(self)
|
||||||
|
# requires the default-to-none behavior, so do after the lock:
|
||||||
|
self._proto = init_proto_from_cfg(self,need_amt=need_amt)
|
||||||
|
|
||||||
|
if self._opts and not do_post_init:
|
||||||
|
self._opts.init_bottom(self)
|
||||||
|
|
||||||
|
# Check user-set opts without modifying them
|
||||||
|
check_usr_opts(self,self._uopts)
|
||||||
|
|
||||||
|
def _set_cfg_from_env(self):
|
||||||
|
for name,val in ((k,v) for k,v in os.environ.items() if k.startswith('MMGEN_')):
|
||||||
|
if name == 'MMGEN_DEBUG_ALL':
|
||||||
|
continue
|
||||||
|
elif name in self._env_opts:
|
||||||
|
if val: # ignore empty string values; string value of '0' or 'false' sets variable to False
|
||||||
|
disable = name.startswith('MMGEN_DISABLE_')
|
||||||
|
gname = name[(6,14)[disable]:].lower()
|
||||||
|
if gname in self._uopts: # don’t touch attr if already set on cmdline
|
||||||
|
continue
|
||||||
|
elif hasattr(self,gname):
|
||||||
|
setattr(
|
||||||
|
self,
|
||||||
|
gname,
|
||||||
|
set_for_type( name, val, getattr(self,gname), 'environment var', invert_bool=disable ))
|
||||||
|
yield gname
|
||||||
|
else:
|
||||||
|
raise ValueError(f'Name {gname!r} not present in globals')
|
||||||
|
else:
|
||||||
|
raise ValueError(f'{name!r} is not a valid MMGen environment variable')
|
||||||
|
|
||||||
|
def _set_cfg_from_cfg_file(
|
||||||
|
self,
|
||||||
|
env_cfg,
|
||||||
|
need_proto ):
|
||||||
|
|
||||||
|
_ret = namedtuple('cfgfile_opts',['autoset','auto_typeset'])
|
||||||
|
|
||||||
|
if not self._use_cfg_file:
|
||||||
|
return _ret( {}, {} )
|
||||||
|
|
||||||
|
# check for changes in system template file (term must be initialized)
|
||||||
|
from .cfgfile import mmgen_cfg_file
|
||||||
|
mmgen_cfg_file(self,'sample')
|
||||||
|
|
||||||
|
ucfg = mmgen_cfg_file(self,'usr')
|
||||||
|
|
||||||
|
if need_proto:
|
||||||
|
from .protocol import init_proto
|
||||||
|
|
||||||
|
autoset_opts = {}
|
||||||
|
auto_typeset_opts = {}
|
||||||
|
already_set = tuple(self._uopts) + env_cfg
|
||||||
|
|
||||||
|
for d in ucfg.get_lines():
|
||||||
|
if d.name in self._cfg_file_opts:
|
||||||
|
ns = d.name.split('_')
|
||||||
|
if ns[0] in gc.core_coins:
|
||||||
|
if not need_proto:
|
||||||
|
continue
|
||||||
|
nse,tn = (
|
||||||
|
(ns[2:],ns[1]=='testnet') if len(ns) > 2 and ns[1] in ('mainnet','testnet') else
|
||||||
|
(ns[1:],False)
|
||||||
|
)
|
||||||
|
cls = type(init_proto( self, ns[0], tn, need_amt=True )) # no instance yet, so override _class_ attr
|
||||||
|
attr = '_'.join(nse)
|
||||||
|
else:
|
||||||
|
cls = self
|
||||||
|
attr = d.name
|
||||||
|
refval = getattr(cls,attr)
|
||||||
|
val = ucfg.parse_value(d.value,refval)
|
||||||
|
if not val:
|
||||||
|
die( 'CfgFileParseError', f'Parse error in file {ucfg.fn!r}, line {d.lineno}' )
|
||||||
|
val_conv = set_for_type( attr, val, refval, 'configuration file option', src=ucfg.fn )
|
||||||
|
if not attr in already_set:
|
||||||
|
setattr(cls,attr,val_conv)
|
||||||
|
elif d.name in self._autoset_opts:
|
||||||
|
autoset_opts[d.name] = d.value
|
||||||
|
elif d.name in self._auto_typeset_opts:
|
||||||
|
auto_typeset_opts[d.name] = d.value
|
||||||
|
else:
|
||||||
|
die( 'CfgFileParseError', f'{d.name!r}: unrecognized option in {ucfg.fn!r}, line {d.lineno}' )
|
||||||
|
|
||||||
|
return _ret( autoset_opts, auto_typeset_opts )
|
||||||
|
|
||||||
|
def _set_autoset_opts(self,cfgfile_autoset_opts):
|
||||||
|
|
||||||
|
def get_autoset_opt(key,val,src):
|
||||||
|
|
||||||
|
def die_on_err(desc):
|
||||||
|
from .util import fmt_list
|
||||||
|
die(
|
||||||
|
'UserOptError',
|
||||||
|
'{a!r}: invalid {b} (not {c}: {d})'.format(
|
||||||
|
a = val,
|
||||||
|
b = {
|
||||||
|
'cmdline': 'parameter for option --{}'.format(key.replace('_','-')),
|
||||||
|
'cfgfile': 'value for cfg file option {!r}'.format(key)
|
||||||
|
}[src],
|
||||||
|
c = desc,
|
||||||
|
d = fmt_list(data.choices) ))
|
||||||
|
|
||||||
|
class opt_type:
|
||||||
|
|
||||||
|
def nocase_str():
|
||||||
|
if val.lower() in data.choices:
|
||||||
|
return val.lower()
|
||||||
|
else:
|
||||||
|
die_on_err('one of')
|
||||||
|
|
||||||
|
def nocase_pfx():
|
||||||
|
cs = [s for s in data.choices if s.startswith(val.lower())]
|
||||||
|
if len(cs) == 1:
|
||||||
|
return cs[0]
|
||||||
|
else:
|
||||||
|
die_on_err('unique substring of')
|
||||||
|
|
||||||
|
data = self._autoset_opts[key]
|
||||||
|
|
||||||
|
return getattr(opt_type,data.type)()
|
||||||
|
|
||||||
|
# Check autoset opts, setting if unset
|
||||||
|
for key in self._autoset_opts:
|
||||||
|
|
||||||
|
assert not hasattr(self,key), f'autoset opt {key!r} is already set, but it shouldn’t be!'
|
||||||
|
|
||||||
|
if key in self._uopts:
|
||||||
|
val,src = (self._uopts[key],'cmdline')
|
||||||
|
elif key in cfgfile_autoset_opts:
|
||||||
|
val,src = (cfgfile_autoset_opts[key],'cfgfile')
|
||||||
|
else:
|
||||||
|
val = None
|
||||||
|
|
||||||
|
if val is None:
|
||||||
|
setattr(self, key, self._autoset_opts[key].choices[0])
|
||||||
|
else:
|
||||||
|
setattr(self, key, get_autoset_opt(key,val,src=src))
|
||||||
|
|
||||||
|
def _set_auto_typeset_opts(self,cfgfile_auto_typeset_opts):
|
||||||
|
|
||||||
|
def do_set(key,val,ref_type):
|
||||||
|
assert not hasattr(self,key), f'{key!r} is in cfg!'
|
||||||
|
setattr(self,key,None if val is None else ref_type(val))
|
||||||
|
|
||||||
|
for key,ref_type in self._auto_typeset_opts.items():
|
||||||
|
if key in self._uopts:
|
||||||
|
do_set(key, self._uopts[key], ref_type)
|
||||||
|
elif key in cfgfile_auto_typeset_opts:
|
||||||
|
do_set(key, cfgfile_auto_typeset_opts[key], ref_type)
|
||||||
|
|
||||||
|
def _post_init(self):
|
||||||
|
return self._opts.init_bottom(self)
|
||||||
|
|
||||||
|
def _die_on_incompatible_opts(self):
|
||||||
|
for group in self._incompatible_opts:
|
||||||
|
bad = [k for k in self.__dict__ if k in group and getattr(self,k) != None]
|
||||||
|
if len(bad) > 1:
|
||||||
|
die(1,'Conflicting options: {}'.format(', '.join(map(fmt_opt,bad))))
|
||||||
|
|
||||||
|
def check_usr_opts(cfg,usr_opts): # Raises an exception if any check fails
|
||||||
|
|
||||||
|
def opt_splits(val,sep,n,desc):
|
||||||
|
sepword = 'comma' if sep == ',' else 'colon' if sep == ':' else repr(sep)
|
||||||
|
try:
|
||||||
|
l = val.split(sep)
|
||||||
|
except:
|
||||||
|
die( 'UserOptError', f'{val!r}: invalid {desc} (not {sepword}-separated list)' )
|
||||||
|
|
||||||
|
if len(l) != n:
|
||||||
|
die( 'UserOptError', f'{val!r}: invalid {desc} ({n} {sepword}-separated items required)' )
|
||||||
|
|
||||||
|
def opt_compares(val,op_str,target,desc,desc2=''):
|
||||||
|
import operator as o
|
||||||
|
op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str]
|
||||||
|
if not op_f(val,target):
|
||||||
|
if desc2:
|
||||||
|
desc2 += ' '
|
||||||
|
die( 'UserOptError', f'{val}: invalid {desc} ({desc2}not {op_str} {target})' )
|
||||||
|
|
||||||
|
def opt_is_int(val,desc):
|
||||||
|
if not is_int(val):
|
||||||
|
die( 'UserOptError', f'{val!r}: invalid {desc} (not an integer)' )
|
||||||
|
|
||||||
|
def opt_is_float(val,desc):
|
||||||
|
try:
|
||||||
|
float(val)
|
||||||
|
except:
|
||||||
|
die( 'UserOptError', f'{val!r}: invalid {desc} (not a floating-point number)' )
|
||||||
|
|
||||||
|
def opt_is_in_list(val,tlist,desc):
|
||||||
|
if val not in tlist:
|
||||||
|
q,sep = (('',','),("'","','"))[type(tlist[0]) == str]
|
||||||
|
die( 'UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format(
|
||||||
|
v = val,
|
||||||
|
w = desc,
|
||||||
|
q = q,
|
||||||
|
o = sep.join(map(str,sorted(tlist))) ))
|
||||||
|
|
||||||
|
def opt_unrecognized(key,val,desc='value'):
|
||||||
|
die( 'UserOptError', f'{val!r}: unrecognized {desc} for option {fmt_opt(key)!r}' )
|
||||||
|
|
||||||
|
def opt_display(key,val='',beg='For selected',end=':\n'):
|
||||||
|
from .util import msg_r
|
||||||
|
msg_r('{} option {!r}{}'.format(
|
||||||
|
beg,
|
||||||
|
f'{fmt_opt(key)}={val}' if val else fmt_opt(key),
|
||||||
|
end ))
|
||||||
|
|
||||||
|
def chk_in_fmt(key,val,desc):
|
||||||
|
from .wallet import get_wallet_data
|
||||||
|
wd = get_wallet_data(fmt_code=val)
|
||||||
|
if not wd:
|
||||||
|
opt_unrecognized(key,val)
|
||||||
|
if key == 'out_fmt':
|
||||||
|
p = 'hidden_incog_output_params'
|
||||||
|
|
||||||
|
if wd.type == 'incog_hidden' and not getattr(cfg,p):
|
||||||
|
die( 'UserOptError',
|
||||||
|
'Hidden incog format output requested. ' +
|
||||||
|
f'You must supply a file and offset with the {fmt_opt(p)!r} option' )
|
||||||
|
|
||||||
|
if wd.base_type == 'incog_base' and cfg.old_incog_fmt:
|
||||||
|
opt_display(key,val,beg='Selected',end=' ')
|
||||||
|
opt_display('old_incog_fmt',beg='conflicts with',end=':\n')
|
||||||
|
die( 'UserOptError', 'Export to old incog wallet format unsupported' )
|
||||||
|
elif wd.type == 'brain':
|
||||||
|
die( 'UserOptError', 'Output to brainwallet format unsupported' )
|
||||||
|
|
||||||
|
chk_out_fmt = chk_in_fmt
|
||||||
|
|
||||||
|
def chk_hidden_incog_input_params(key,val,desc):
|
||||||
|
a = val.rsplit(',',1) # permit comma in filename
|
||||||
|
if len(a) != 2:
|
||||||
|
opt_display(key,val)
|
||||||
|
die( 'UserOptError', 'Option requires two comma-separated arguments' )
|
||||||
|
|
||||||
|
fn,offset = a
|
||||||
|
opt_is_int(offset,desc)
|
||||||
|
|
||||||
|
from .fileutil import check_infile,check_outdir,check_outfile
|
||||||
|
if key == 'hidden_incog_input_params':
|
||||||
|
check_infile(fn,blkdev_ok=True)
|
||||||
|
key2 = 'in_fmt'
|
||||||
|
else:
|
||||||
|
try: os.stat(fn)
|
||||||
|
except:
|
||||||
|
b = os.path.dirname(fn)
|
||||||
|
if b:
|
||||||
|
check_outdir(b)
|
||||||
|
else:
|
||||||
|
check_outfile(fn,blkdev_ok=True)
|
||||||
|
key2 = 'out_fmt'
|
||||||
|
|
||||||
|
if hasattr(cfg,key2):
|
||||||
|
val2 = getattr(cfg,key2)
|
||||||
|
from .wallet import get_wallet_data
|
||||||
|
wd = get_wallet_data('incog_hidden')
|
||||||
|
if val2 and val2 not in wd.fmt_codes:
|
||||||
|
die( 'UserOptError', f'Option conflict:\n {fmt_opt(key)}, with\n {fmt_opt(key2)}={val2}' )
|
||||||
|
|
||||||
|
chk_hidden_incog_output_params = chk_hidden_incog_input_params
|
||||||
|
|
||||||
|
def chk_subseeds(key,val,desc):
|
||||||
|
from .subseed import SubSeedIdxRange
|
||||||
|
opt_is_int(val,desc)
|
||||||
|
opt_compares(int(val),'>=',SubSeedIdxRange.min_idx,desc)
|
||||||
|
opt_compares(int(val),'<=',SubSeedIdxRange.max_idx,desc)
|
||||||
|
|
||||||
|
def chk_seed_len(key,val,desc):
|
||||||
|
from .seed import Seed
|
||||||
|
opt_is_int(val,desc)
|
||||||
|
opt_is_in_list(int(val),Seed.lens,desc)
|
||||||
|
|
||||||
|
def chk_hash_preset(key,val,desc):
|
||||||
|
from .crypto import Crypto
|
||||||
|
opt_is_in_list(val,list(Crypto.hash_presets.keys()),desc)
|
||||||
|
|
||||||
|
def chk_brain_params(key,val,desc):
|
||||||
|
a = val.split(',')
|
||||||
|
if len(a) != 2:
|
||||||
|
opt_display(key,val)
|
||||||
|
die( 'UserOptError', 'Option requires two comma-separated arguments' )
|
||||||
|
opt_is_int(a[0],'seed length '+desc)
|
||||||
|
from .seed import Seed
|
||||||
|
opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc)
|
||||||
|
from .crypto import Crypto
|
||||||
|
opt_is_in_list(a[1],list(Crypto.hash_presets.keys()),'hash preset '+desc)
|
||||||
|
|
||||||
|
def chk_usr_randchars(key,val,desc):
|
||||||
|
if val == 0:
|
||||||
|
return
|
||||||
|
opt_is_int(val,desc)
|
||||||
|
opt_compares(val,'>=',cfg.min_urandchars,desc)
|
||||||
|
opt_compares(val,'<=',cfg.max_urandchars,desc)
|
||||||
|
|
||||||
|
def chk_tx_fee(key,val,desc):
|
||||||
|
pass
|
||||||
|
# opt_is_tx_fee(key,val,desc) # TODO: move this check elsewhere
|
||||||
|
|
||||||
|
def chk_tx_confs(key,val,desc):
|
||||||
|
opt_is_int(val,desc)
|
||||||
|
opt_compares(val,'>=',1,desc)
|
||||||
|
|
||||||
|
def chk_vsize_adj(key,val,desc):
|
||||||
|
opt_is_float(val,desc)
|
||||||
|
from .util import ymsg
|
||||||
|
ymsg(f'Adjusting transaction vsize by a factor of {float(val):1.2f}')
|
||||||
|
|
||||||
|
def chk_daemon_id(key,val,desc):
|
||||||
|
from .daemon import CoinDaemon
|
||||||
|
opt_is_in_list(val,CoinDaemon.all_daemon_ids(),desc)
|
||||||
|
|
||||||
|
# TODO: move this check elsewhere
|
||||||
|
# def chk_rbf(key,val,desc):
|
||||||
|
# if not proto.cap('rbf'):
|
||||||
|
# die( 'UserOptError', f'--rbf requested, but {proto.coin} does not support replace-by-fee transactions' )
|
||||||
|
|
||||||
|
# def chk_bob(key,val,desc):
|
||||||
|
# from .proto.btc.regtest import MMGenRegtest
|
||||||
|
# try:
|
||||||
|
# os.stat(os.path.join(MMGenRegtest(cfg,cfg.coin).d.datadir,'regtest','debug.log'))
|
||||||
|
# except:
|
||||||
|
# die( 'UserOptError',
|
||||||
|
# 'Regtest (Bob and Alice) mode not set up yet. ' +
|
||||||
|
# f"Run '{gc.proj_name.lower()}-regtest setup' to initialize." )
|
||||||
|
#
|
||||||
|
# chk_alice = chk_bob
|
||||||
|
|
||||||
|
def chk_locktime(key,val,desc):
|
||||||
|
opt_is_int(val,desc)
|
||||||
|
opt_compares(int(val),'>',0,desc)
|
||||||
|
|
||||||
|
def chk_columns(key,val,desc):
|
||||||
|
opt_compares(int(val),'>',10,desc)
|
||||||
|
|
||||||
|
# TODO: move this check elsewhere
|
||||||
|
# def chk_token(key,val,desc):
|
||||||
|
# if not 'token' in proto.caps:
|
||||||
|
# die( 'UserOptError', f'Coin {tx.coin!r} does not support the --token option' )
|
||||||
|
# if len(val) == 40 and is_hex_str(val):
|
||||||
|
# return
|
||||||
|
# if len(val) > 20 or not all(s.isalnum() for s in val):
|
||||||
|
# die( 'UserOptError', f'{val!r}: invalid parameter for --token option' )
|
||||||
|
|
||||||
|
from .util import is_int,Msg
|
||||||
|
|
||||||
|
cfuncs = { k:v for k,v in locals().items() if k.startswith('chk_') }
|
||||||
|
|
||||||
|
for key in usr_opts:
|
||||||
|
val = getattr(cfg,key)
|
||||||
|
desc = f'parameter for {fmt_opt(key)!r} option'
|
||||||
|
|
||||||
|
if key in cfg._infile_opts:
|
||||||
|
from .fileutil import check_infile
|
||||||
|
check_infile(val) # file exists and is readable - dies on error
|
||||||
|
elif key == 'outdir':
|
||||||
|
from .fileutil import check_outdir
|
||||||
|
check_outdir(val) # dies on error
|
||||||
|
elif 'chk_'+key in cfuncs:
|
||||||
|
cfuncs['chk_'+key](key,val,desc)
|
||||||
|
elif cfg.debug:
|
||||||
|
Msg(f'check_usr_opts(): No test for config opt {key!r}')
|
||||||
|
|
||||||
|
def fmt_opt(o):
|
||||||
|
return '--' + o.replace('_','-')
|
||||||
|
|
||||||
|
def opt_postproc_debug(cfg):
|
||||||
|
a = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) != None]
|
||||||
|
b = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) == None]
|
||||||
|
from .util import Msg
|
||||||
|
Msg('\n Configuration opts:')
|
||||||
|
for e in [d for d in dir(cfg) if d[:2] != '__']:
|
||||||
|
Msg(' {:<20}: {}'.format(e, getattr(cfg,e)))
|
||||||
|
Msg(" Configuration opts set to 'None':")
|
||||||
|
Msg(' {}\n'.format('\n '.join(b)))
|
||||||
|
Msg('\n=== end opts.py debug ===\n')
|
||||||
|
|
||||||
|
def set_for_type(
|
||||||
|
name,
|
||||||
|
val,
|
||||||
|
refval,
|
||||||
|
desc,
|
||||||
|
invert_bool = False,
|
||||||
|
src = None ):
|
||||||
|
|
||||||
|
if type(refval) == bool:
|
||||||
|
v = str(val).lower()
|
||||||
|
ret = (
|
||||||
|
True if v in ('true','yes','1','on') else
|
||||||
|
False if v in ('false','no','none','0','off','') else
|
||||||
|
None
|
||||||
|
)
|
||||||
|
if ret is not None:
|
||||||
|
return not ret if invert_bool else ret
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
return type(refval)(not val if invert_bool else val)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
die(1,'{a!r}: invalid value for {b} {c!r}{d} (must be of type {e!r})'.format(
|
||||||
|
a = val,
|
||||||
|
b = desc,
|
||||||
|
c = fmt_opt(name) if desc == 'command-line option' else name,
|
||||||
|
d = f' in {src!r}' if src else '',
|
||||||
|
e = type(refval).__name__ ))
|
||||||
|
|
|
||||||
|
|
@ -1 +1 @@
|
||||||
13.3.dev43
|
13.3.dev44
|
||||||
|
|
|
||||||
642
mmgen/opts.py
642
mmgen/opts.py
|
|
@ -21,84 +21,9 @@ opts: MMGen-specific command-line options processing after generic processing by
|
||||||
"""
|
"""
|
||||||
import sys,os
|
import sys,os
|
||||||
|
|
||||||
from .cfg import gc,Config
|
from .cfg import gc
|
||||||
from .base_obj import Lockable
|
|
||||||
|
|
||||||
import mmgen.share.Opts
|
import mmgen.share.Opts as Opts
|
||||||
|
|
||||||
class UserOpts: pass
|
|
||||||
opt = UserOpts()
|
|
||||||
|
|
||||||
def usage():
|
|
||||||
from .util import Die
|
|
||||||
Die(1,mmgen.share.Opts.make_usage_str(gc.prog_name,'user',usage_data))
|
|
||||||
|
|
||||||
def version():
|
|
||||||
from .util import Die,fmt
|
|
||||||
Die(0,fmt(f"""
|
|
||||||
{gc.prog_name.upper()} version {gc.version}
|
|
||||||
Part of the {gc.proj_name} suite, an online/offline cryptocurrency wallet for the
|
|
||||||
command line. Copyright (C){gc.Cdates} {gc.author} {gc.email}
|
|
||||||
""",indent=' ').rstrip())
|
|
||||||
|
|
||||||
def delete_data(opts_data):
|
|
||||||
for k in ('text','notes','code'):
|
|
||||||
if k in opts_data:
|
|
||||||
del opts_data[k]
|
|
||||||
del mmgen.share.Opts.make_help
|
|
||||||
del mmgen.share.Opts.process_uopts
|
|
||||||
del mmgen.share.Opts.parse_opts
|
|
||||||
|
|
||||||
def post_init(cfg):
|
|
||||||
global opts_data_save,opt_filter_save
|
|
||||||
if cfg.help or cfg.longhelp:
|
|
||||||
print_help(cfg,opts_data_save,opt_filter_save)
|
|
||||||
else:
|
|
||||||
delete_data(opts_data_save)
|
|
||||||
del opts_data_save,opt_filter_save
|
|
||||||
|
|
||||||
def print_help(cfg,opts_data,opt_filter):
|
|
||||||
|
|
||||||
if not 'code' in opts_data:
|
|
||||||
opts_data['code'] = {}
|
|
||||||
|
|
||||||
from .protocol import init_proto_from_cfg
|
|
||||||
proto = init_proto_from_cfg(cfg,need_amt=True)
|
|
||||||
|
|
||||||
if getattr(cfg,'longhelp',None):
|
|
||||||
opts_data['code']['long_options'] = common_opts_data['code']
|
|
||||||
def remove_unneeded_long_opts():
|
|
||||||
d = opts_data['text']['long_options']
|
|
||||||
if proto.base_proto != 'Ethereum':
|
|
||||||
d = '\n'.join(''+i for i in d.split('\n') if not '--token' in i)
|
|
||||||
opts_data['text']['long_options'] = d
|
|
||||||
remove_unneeded_long_opts()
|
|
||||||
|
|
||||||
from .ui import do_pager
|
|
||||||
do_pager(mmgen.share.Opts.make_help( cfg, proto, opts_data, opt_filter ))
|
|
||||||
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
def fmt_opt(o):
|
|
||||||
return '--' + o.replace('_','-')
|
|
||||||
|
|
||||||
def die_on_incompatible_opts(cfg):
|
|
||||||
for group in cfg._incompatible_opts:
|
|
||||||
bad = [k for k in cfg.__dict__ if k in group and getattr(cfg,k) != None]
|
|
||||||
if len(bad) > 1:
|
|
||||||
from .util import die
|
|
||||||
die(1,'Conflicting options: {}'.format(', '.join(map(fmt_opt,bad))))
|
|
||||||
|
|
||||||
def _show_hash_presets():
|
|
||||||
fs = ' {:<7} {:<6} {:<3} {}'
|
|
||||||
from .util import msg
|
|
||||||
from .crypto import Crypto
|
|
||||||
msg('Available parameters for scrypt.hash():')
|
|
||||||
msg(fs.format('Preset','N','r','p'))
|
|
||||||
for i in sorted(Crypto.hash_presets.keys()):
|
|
||||||
msg(fs.format(i,*Crypto.hash_presets[i]))
|
|
||||||
msg('N = memory usage (power of two), p = iterations (rounds)')
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
def opt_preproc_debug(po):
|
def opt_preproc_debug(po):
|
||||||
d = (
|
d = (
|
||||||
|
|
@ -114,101 +39,7 @@ def opt_preproc_debug(po):
|
||||||
for label,data,pretty in d:
|
for label,data,pretty in d:
|
||||||
Msg(' {:<20}: {}'.format(label,'\n' + fmt_list(data,fmt='col',indent=' '*8) if pretty else data))
|
Msg(' {:<20}: {}'.format(label,'\n' + fmt_list(data,fmt='col',indent=' '*8) if pretty else data))
|
||||||
|
|
||||||
def opt_postproc_debug(cfg):
|
long_opts_data = {
|
||||||
a = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) != None]
|
|
||||||
b = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) == None]
|
|
||||||
from .util import Msg
|
|
||||||
Msg('\n Global vars:')
|
|
||||||
for e in [d for d in dir(cfg) if d[:2] != '__']:
|
|
||||||
Msg(' {:<20}: {}'.format(e, getattr(cfg,e)))
|
|
||||||
Msg(" Global vars set to 'None':")
|
|
||||||
Msg(' {}\n'.format('\n '.join(b)))
|
|
||||||
Msg('\n=== end opts.py debug ===\n')
|
|
||||||
|
|
||||||
def set_for_type(val,refval,desc,invert_bool=False,src=None):
|
|
||||||
|
|
||||||
if type(refval) == bool:
|
|
||||||
v = str(val).lower()
|
|
||||||
ret = (
|
|
||||||
True if v in ('true','yes','1','on') else
|
|
||||||
False if v in ('false','no','none','0','off','') else
|
|
||||||
None
|
|
||||||
)
|
|
||||||
if ret is not None:
|
|
||||||
return not ret if invert_bool else ret
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
return type(refval)(not val if invert_bool else val)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
from .util import die
|
|
||||||
die(1,'{!r}: invalid value for {!r}{} (must be of type {!r})'.format(
|
|
||||||
val,
|
|
||||||
desc,
|
|
||||||
' in {!r}'.format(src) if src else '',
|
|
||||||
type(refval).__name__) )
|
|
||||||
|
|
||||||
def set_cfg_from_cfg_file(
|
|
||||||
cfg,
|
|
||||||
ucfg,
|
|
||||||
cfgfile_autoset_opts,
|
|
||||||
cfgfile_auto_typeset_opts,
|
|
||||||
env_cfg,
|
|
||||||
need_proto ):
|
|
||||||
|
|
||||||
if need_proto:
|
|
||||||
from .protocol import init_proto
|
|
||||||
|
|
||||||
for d in ucfg.get_lines():
|
|
||||||
if d.name in cfg._cfg_file_opts:
|
|
||||||
ns = d.name.split('_')
|
|
||||||
if ns[0] in gc.core_coins:
|
|
||||||
if not need_proto:
|
|
||||||
continue
|
|
||||||
nse,tn = (
|
|
||||||
(ns[2:],ns[1]=='testnet') if len(ns) > 2 and ns[1] in ('mainnet','testnet') else
|
|
||||||
(ns[1:],False)
|
|
||||||
)
|
|
||||||
cls = type(init_proto( cfg, ns[0], tn, need_amt=True )) # no instance yet, so override _class_ attr
|
|
||||||
attr = '_'.join(nse)
|
|
||||||
else:
|
|
||||||
cls = cfg
|
|
||||||
attr = d.name
|
|
||||||
refval = getattr(cls,attr)
|
|
||||||
val = ucfg.parse_value(d.value,refval)
|
|
||||||
if not val:
|
|
||||||
from .util import die
|
|
||||||
die( 'CfgFileParseError', f'Parse error in file {ucfg.fn!r}, line {d.lineno}' )
|
|
||||||
val_conv = set_for_type(val,refval,attr,src=ucfg.fn)
|
|
||||||
if attr not in env_cfg:
|
|
||||||
setattr(cls,attr,val_conv)
|
|
||||||
elif d.name in cfg._autoset_opts:
|
|
||||||
cfgfile_autoset_opts[d.name] = d.value
|
|
||||||
elif d.name in cfg._auto_typeset_opts:
|
|
||||||
cfgfile_auto_typeset_opts[d.name] = d.value
|
|
||||||
else:
|
|
||||||
from .util import die
|
|
||||||
die( 'CfgFileParseError', f'{d.name!r}: unrecognized option in {ucfg.fn!r}, line {d.lineno}' )
|
|
||||||
|
|
||||||
def set_cfg_from_env(cfg):
|
|
||||||
for name,val in ((k,v) for k,v in os.environ.items() if k.startswith('MMGEN_')):
|
|
||||||
if name == 'MMGEN_DEBUG_ALL':
|
|
||||||
continue
|
|
||||||
elif name in cfg._env_opts:
|
|
||||||
if val: # ignore empty string values; string value of '0' or 'false' sets variable to False
|
|
||||||
disable = name.startswith('MMGEN_DISABLE_')
|
|
||||||
gname = name[(6,14)[disable]:].lower()
|
|
||||||
if hasattr(cfg,gname):
|
|
||||||
setattr(cfg,gname,set_for_type(val,getattr(cfg,gname),name,disable))
|
|
||||||
yield gname
|
|
||||||
else:
|
|
||||||
raise ValueError(f'Name {gname!r} not present in globals')
|
|
||||||
else:
|
|
||||||
raise ValueError(f'{name!r} is not a valid MMGen environment variable')
|
|
||||||
|
|
||||||
common_opts_data = {
|
|
||||||
# Most but not all of these set the corresponding global var
|
|
||||||
'text': """
|
'text': """
|
||||||
--, --accept-defaults Accept defaults at all prompts
|
--, --accept-defaults Accept defaults at all prompts
|
||||||
--, --coin=c Choose coin unit. Default: BTC. Current choice: {cu_dfl}
|
--, --coin=c Choose coin unit. Default: BTC. Current choice: {cu_dfl}
|
||||||
|
|
@ -256,415 +87,112 @@ opts_data_dfl = {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def init(
|
class UserOpts:
|
||||||
cfg,
|
|
||||||
opts_data = None,
|
|
||||||
add_opts = None,
|
|
||||||
init_opts = None,
|
|
||||||
opt_filter = None,
|
|
||||||
parse_only = False,
|
|
||||||
parsed_opts = None,
|
|
||||||
need_proto = True,
|
|
||||||
need_amt = True,
|
|
||||||
do_post_init = False ):
|
|
||||||
|
|
||||||
if opts_data is None:
|
def __init__(
|
||||||
opts_data = opts_data_dfl
|
self,
|
||||||
|
|
||||||
opts_data['text']['long_options'] = common_opts_data['text']
|
|
||||||
|
|
||||||
# Make this available to usage()
|
|
||||||
global usage_data
|
|
||||||
usage_data = opts_data['text'].get('usage2') or opts_data['text']['usage']
|
|
||||||
|
|
||||||
# po: (user_opts,cmd_args,opts,filtered_opts)
|
|
||||||
po = parsed_opts or mmgen.share.Opts.parse_opts(opts_data,opt_filter=opt_filter,parse_only=parse_only)
|
|
||||||
|
|
||||||
if init_opts: # allow programs to preload user opts
|
|
||||||
for uopt,val in init_opts.items():
|
|
||||||
if uopt not in po.user_opts:
|
|
||||||
po.user_opts[uopt] = val
|
|
||||||
|
|
||||||
if parse_only and not any(k in po.user_opts for k in ('version','help','longhelp')):
|
|
||||||
cfg._parsed_opts = po
|
|
||||||
return
|
|
||||||
|
|
||||||
if os.getenv('MMGEN_DEBUG_OPTS'):
|
|
||||||
opt_preproc_debug(po)
|
|
||||||
|
|
||||||
# Copy parsed opts to opt, setting values to None if not set by user
|
|
||||||
for o in set(
|
|
||||||
po.opts
|
|
||||||
+ po.filtered_opts
|
|
||||||
+ tuple(add_opts or [])
|
|
||||||
+ tuple(init_opts or [])
|
|
||||||
+ cfg._init_opts
|
|
||||||
+ cfg._common_opts ):
|
|
||||||
setattr(opt,o,po.user_opts[o] if o in po.user_opts else None)
|
|
||||||
|
|
||||||
if opt.version:
|
|
||||||
version() # exits
|
|
||||||
|
|
||||||
if opt.show_hash_presets:
|
|
||||||
_show_hash_presets() # exits
|
|
||||||
|
|
||||||
from .util import wrap_ripemd160
|
|
||||||
wrap_ripemd160() # ripemd160 used by mmgen_cfg_file()
|
|
||||||
|
|
||||||
# === begin Config() initialization === #
|
|
||||||
|
|
||||||
env_cfg = tuple(set_cfg_from_env(cfg))
|
|
||||||
|
|
||||||
# do this after setting ‘hold_protect_disable’ from env
|
|
||||||
from .term import init_term
|
|
||||||
init_term(cfg)
|
|
||||||
|
|
||||||
# --data-dir overrides computed value of data_dir_root
|
|
||||||
cfg._data_dir_root_override = opt.data_dir
|
|
||||||
if opt.data_dir:
|
|
||||||
del opt.data_dir
|
|
||||||
|
|
||||||
from .fileutil import check_or_create_dir
|
|
||||||
check_or_create_dir(cfg.data_dir_root)
|
|
||||||
|
|
||||||
cfgfile_autoset_opts = {}
|
|
||||||
cfgfile_auto_typeset_opts = {}
|
|
||||||
|
|
||||||
if not opt.skip_cfg_file:
|
|
||||||
from .cfgfile import mmgen_cfg_file
|
|
||||||
# check for changes in system template file - term must be initialized
|
|
||||||
mmgen_cfg_file(cfg,'sample')
|
|
||||||
set_cfg_from_cfg_file(
|
|
||||||
cfg,
|
cfg,
|
||||||
mmgen_cfg_file(cfg,'usr'),
|
opts_data = None,
|
||||||
cfgfile_autoset_opts,
|
init_opts = None, # dict containing opts to pre-initialize
|
||||||
cfgfile_auto_typeset_opts,
|
opt_filter = None, # whitelist of opt letters; all others are skipped
|
||||||
env_cfg,
|
parse_only = False,
|
||||||
need_proto )
|
parsed_opts = None ):
|
||||||
|
|
||||||
# Set cfg from opts, setting type from original class attr in Config if it exists:
|
self.opts_data = od = opts_data or opts_data_dfl
|
||||||
auto_keys = tuple(cfg._autoset_opts.keys()) + tuple(cfg._auto_typeset_opts.keys())
|
self.opt_filter = opt_filter
|
||||||
for key,val in opt.__dict__.items():
|
|
||||||
if val is not None and key not in auto_keys:
|
|
||||||
setattr(cfg, key, set_for_type(val,getattr(cfg,key),'--'+key) if hasattr(cfg,key) else val)
|
|
||||||
|
|
||||||
if cfg.regtest or cfg.bob or cfg.alice or cfg.carol or gc.prog_name == 'mmgen-regtest':
|
od['text']['long_options'] = long_opts_data['text']
|
||||||
cfg.network = 'regtest'
|
|
||||||
cfg.regtest_user = 'bob' if cfg.bob else 'alice' if cfg.alice else 'carol' if cfg.carol else None
|
|
||||||
else:
|
|
||||||
cfg.network = 'testnet' if cfg.testnet else 'mainnet'
|
|
||||||
|
|
||||||
cfg.coin = cfg.coin.upper()
|
# Make this available to usage()
|
||||||
cfg.token = cfg.token.upper() or None
|
self.usage_data = od['text'].get('usage2') or od['text']['usage']
|
||||||
|
|
||||||
# === end global var initialization === #
|
# po: (user_opts,cmd_args,opts,filtered_opts)
|
||||||
|
po = parsed_opts or Opts.parse_opts(od,opt_filter=opt_filter,parse_only=parse_only)
|
||||||
|
|
||||||
"""
|
cfg._args = po.cmd_args
|
||||||
cfg.color is finalized, so initialize color
|
cfg._uopts = uopts = po.user_opts
|
||||||
"""
|
|
||||||
if cfg.color: # MMGEN_DISABLE_COLOR sets this to False
|
|
||||||
from .color import init_color
|
|
||||||
init_color(num_colors=('auto',256)[bool(cfg.force_256_color)])
|
|
||||||
|
|
||||||
die_on_incompatible_opts(cfg)
|
if init_opts: # initialize user opts to given value
|
||||||
|
for uopt,val in init_opts.items():
|
||||||
|
if uopt not in uopts:
|
||||||
|
uopts[uopt] = val
|
||||||
|
|
||||||
check_or_create_dir(cfg.data_dir)
|
cfg._opts = self
|
||||||
|
cfg._parsed_opts = po
|
||||||
|
cfg._use_env = True
|
||||||
|
cfg._use_cfg_file = not 'skip_cfg_file' in uopts
|
||||||
|
|
||||||
# Set globals from opts, setting type from original global value if it exists:
|
if os.getenv('MMGEN_DEBUG_OPTS'):
|
||||||
for key in cfg._autoset_opts:
|
opt_preproc_debug(po)
|
||||||
if hasattr(opt,key):
|
|
||||||
assert not hasattr(cfg,key), f'{key!r} is in cfg!'
|
|
||||||
if getattr(opt,key) is not None:
|
|
||||||
setattr(cfg, key, get_autoset_opt(cfg,key,getattr(opt,key),src='cmdline'))
|
|
||||||
elif key in cfgfile_autoset_opts:
|
|
||||||
setattr(cfg, key, get_autoset_opt(cfg,key,cfgfile_autoset_opts[key],src='cfgfile'))
|
|
||||||
else:
|
|
||||||
setattr(cfg, key, cfg._autoset_opts[key].choices[0])
|
|
||||||
|
|
||||||
set_auto_typeset_opts(cfg,cfgfile_auto_typeset_opts)
|
if 'version' in uopts:
|
||||||
|
self.version() # exits
|
||||||
|
|
||||||
if cfg.debug and gc.prog_name != 'test.py':
|
if 'show_hash_presets' in uopts:
|
||||||
cfg.verbose = True
|
self.show_hash_presets() # exits
|
||||||
|
|
||||||
if cfg.verbose:
|
if parse_only:
|
||||||
cfg.quiet = False
|
|
||||||
|
|
||||||
if cfg.debug_opts:
|
|
||||||
opt_postproc_debug(cfg)
|
|
||||||
|
|
||||||
# Check user-set opts without modifying them
|
|
||||||
check_usr_opts(cfg,po.user_opts)
|
|
||||||
|
|
||||||
from .util import Util
|
|
||||||
cfg._util = Util(cfg)
|
|
||||||
cfg._uopts = po.user_opts
|
|
||||||
cfg._args = po.cmd_args
|
|
||||||
|
|
||||||
cfg._lock()
|
|
||||||
|
|
||||||
if need_proto:
|
|
||||||
from .protocol import warn_trustlevel,init_proto_from_cfg
|
|
||||||
warn_trustlevel(cfg)
|
|
||||||
# requires the default-to-none behavior, so do after the lock:
|
|
||||||
cfg._proto = init_proto_from_cfg(cfg,need_amt=need_amt)
|
|
||||||
|
|
||||||
# print help screen only after globals initialized and locked:
|
|
||||||
if cfg.help or cfg.longhelp:
|
|
||||||
if not do_post_init:
|
|
||||||
print_help(cfg,opts_data,opt_filter) # exits
|
|
||||||
|
|
||||||
if do_post_init:
|
|
||||||
global opts_data_save,opt_filter_save
|
|
||||||
opts_data_save = opts_data
|
|
||||||
opt_filter_save = opt_filter
|
|
||||||
else:
|
|
||||||
delete_data(opts_data)
|
|
||||||
|
|
||||||
def check_usr_opts(cfg,usr_opts): # Raises an exception if any check fails
|
|
||||||
|
|
||||||
def opt_splits(val,sep,n,desc):
|
|
||||||
sepword = 'comma' if sep == ',' else 'colon' if sep == ':' else repr(sep)
|
|
||||||
try:
|
|
||||||
l = val.split(sep)
|
|
||||||
except:
|
|
||||||
die( 'UserOptError', f'{val!r}: invalid {desc} (not {sepword}-separated list)' )
|
|
||||||
|
|
||||||
if len(l) != n:
|
|
||||||
die( 'UserOptError', f'{val!r}: invalid {desc} ({n} {sepword}-separated items required)' )
|
|
||||||
|
|
||||||
def opt_compares(val,op_str,target,desc,desc2=''):
|
|
||||||
import operator as o
|
|
||||||
op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str]
|
|
||||||
if not op_f(val,target):
|
|
||||||
d2 = desc2 + ' ' if desc2 else ''
|
|
||||||
die( 'UserOptError', f'{val}: invalid {desc} ({d2}not {op_str} {target})' )
|
|
||||||
|
|
||||||
def opt_is_int(val,desc):
|
|
||||||
if not is_int(val):
|
|
||||||
die( 'UserOptError', f'{val!r}: invalid {desc} (not an integer)' )
|
|
||||||
|
|
||||||
def opt_is_float(val,desc):
|
|
||||||
try:
|
|
||||||
float(val)
|
|
||||||
except:
|
|
||||||
die( 'UserOptError', f'{val!r}: invalid {desc} (not a floating-point number)' )
|
|
||||||
|
|
||||||
def opt_is_in_list(val,tlist,desc):
|
|
||||||
if val not in tlist:
|
|
||||||
q,sep = (('',','),("'","','"))[type(tlist[0]) == str]
|
|
||||||
die( 'UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format(
|
|
||||||
v = val,
|
|
||||||
w = desc,
|
|
||||||
q = q,
|
|
||||||
o = sep.join(map(str,sorted(tlist))) ))
|
|
||||||
|
|
||||||
def opt_unrecognized(key,val,desc='value'):
|
|
||||||
die( 'UserOptError', f'{val!r}: unrecognized {desc} for option {fmt_opt(key)!r}' )
|
|
||||||
|
|
||||||
def opt_display(key,val='',beg='For selected',end=':\n'):
|
|
||||||
from .util import msg_r
|
|
||||||
msg_r('{} option {!r}{}'.format(
|
|
||||||
beg,
|
|
||||||
f'{fmt_opt(key)}={val}' if val else fmt_opt(key),
|
|
||||||
end ))
|
|
||||||
|
|
||||||
def chk_in_fmt(key,val,desc):
|
|
||||||
from .wallet import get_wallet_data
|
|
||||||
wd = get_wallet_data(fmt_code=val)
|
|
||||||
if not wd:
|
|
||||||
opt_unrecognized(key,val)
|
|
||||||
if key == 'out_fmt':
|
|
||||||
p = 'hidden_incog_output_params'
|
|
||||||
|
|
||||||
if wd.type == 'incog_hidden' and not getattr(opt,p):
|
|
||||||
die( 'UserOptError',
|
|
||||||
'Hidden incog format output requested. ' +
|
|
||||||
f'You must supply a file and offset with the {fmt_opt(p)!r} option' )
|
|
||||||
|
|
||||||
if wd.base_type == 'incog_base' and opt.old_incog_fmt:
|
|
||||||
opt_display(key,val,beg='Selected',end=' ')
|
|
||||||
opt_display('old_incog_fmt',beg='conflicts with',end=':\n')
|
|
||||||
die( 'UserOptError', 'Export to old incog wallet format unsupported' )
|
|
||||||
elif wd.type == 'brain':
|
|
||||||
die( 'UserOptError', 'Output to brainwallet format unsupported' )
|
|
||||||
|
|
||||||
chk_out_fmt = chk_in_fmt
|
|
||||||
|
|
||||||
def chk_hidden_incog_input_params(key,val,desc):
|
|
||||||
a = val.rsplit(',',1) # permit comma in filename
|
|
||||||
if len(a) != 2:
|
|
||||||
opt_display(key,val)
|
|
||||||
die( 'UserOptError', 'Option requires two comma-separated arguments' )
|
|
||||||
|
|
||||||
fn,offset = a
|
|
||||||
opt_is_int(offset,desc)
|
|
||||||
|
|
||||||
from .fileutil import check_infile,check_outdir,check_outfile
|
|
||||||
if key == 'hidden_incog_input_params':
|
|
||||||
check_infile(fn,blkdev_ok=True)
|
|
||||||
key2 = 'in_fmt'
|
|
||||||
else:
|
|
||||||
try: os.stat(fn)
|
|
||||||
except:
|
|
||||||
b = os.path.dirname(fn)
|
|
||||||
if b:
|
|
||||||
check_outdir(b)
|
|
||||||
else:
|
|
||||||
check_outfile(fn,blkdev_ok=True)
|
|
||||||
key2 = 'out_fmt'
|
|
||||||
|
|
||||||
if hasattr(opt,key2):
|
|
||||||
val2 = getattr(opt,key2)
|
|
||||||
from .wallet import get_wallet_data
|
|
||||||
wd = get_wallet_data('incog_hidden')
|
|
||||||
if val2 and val2 not in wd.fmt_codes:
|
|
||||||
die( 'UserOptError', f'Option conflict:\n {fmt_opt(key)}, with\n {fmt_opt(key2)}={val2}' )
|
|
||||||
|
|
||||||
chk_hidden_incog_output_params = chk_hidden_incog_input_params
|
|
||||||
|
|
||||||
def chk_subseeds(key,val,desc):
|
|
||||||
from .subseed import SubSeedIdxRange
|
|
||||||
opt_is_int(val,desc)
|
|
||||||
opt_compares(int(val),'>=',SubSeedIdxRange.min_idx,desc)
|
|
||||||
opt_compares(int(val),'<=',SubSeedIdxRange.max_idx,desc)
|
|
||||||
|
|
||||||
def chk_seed_len(key,val,desc):
|
|
||||||
from .seed import Seed
|
|
||||||
opt_is_int(val,desc)
|
|
||||||
opt_is_in_list(int(val),Seed.lens,desc)
|
|
||||||
|
|
||||||
def chk_hash_preset(key,val,desc):
|
|
||||||
from .crypto import Crypto
|
|
||||||
opt_is_in_list(val,list(Crypto.hash_presets.keys()),desc)
|
|
||||||
|
|
||||||
def chk_brain_params(key,val,desc):
|
|
||||||
a = val.split(',')
|
|
||||||
if len(a) != 2:
|
|
||||||
opt_display(key,val)
|
|
||||||
die( 'UserOptError', 'Option requires two comma-separated arguments' )
|
|
||||||
opt_is_int(a[0],'seed length '+desc)
|
|
||||||
from .seed import Seed
|
|
||||||
opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc)
|
|
||||||
from .crypto import Crypto
|
|
||||||
opt_is_in_list(a[1],list(Crypto.hash_presets.keys()),'hash preset '+desc)
|
|
||||||
|
|
||||||
def chk_usr_randchars(key,val,desc):
|
|
||||||
if val == 0:
|
|
||||||
return
|
return
|
||||||
opt_is_int(val,desc)
|
|
||||||
opt_compares(val,'>=',cfg.min_urandchars,desc)
|
|
||||||
opt_compares(val,'<=',cfg.max_urandchars,desc)
|
|
||||||
|
|
||||||
def chk_tx_fee(key,val,desc):
|
if 'data_dir' in uopts:
|
||||||
pass
|
cfg._data_dir_root_override = uopts['data_dir']
|
||||||
# opt_is_tx_fee(key,val,desc) # TODO: move this check elsewhere
|
del uopts['data_dir']
|
||||||
|
|
||||||
def chk_tx_confs(key,val,desc):
|
def init_bottom(self,cfg):
|
||||||
opt_is_int(val,desc)
|
|
||||||
opt_compares(val,'>=',1,desc)
|
|
||||||
|
|
||||||
def chk_vsize_adj(key,val,desc):
|
# print help screen only after globals initialized and locked:
|
||||||
opt_is_float(val,desc)
|
if cfg.help or cfg.longhelp:
|
||||||
from .util import ymsg
|
self.print_help(cfg) # exits
|
||||||
ymsg(f'Adjusting transaction vsize by a factor of {float(val):1.2f}')
|
|
||||||
|
|
||||||
def chk_daemon_id(key,val,desc):
|
# delete unneeded data:
|
||||||
from .daemon import CoinDaemon
|
for k in ('text','notes','code'):
|
||||||
opt_is_in_list(val,CoinDaemon.all_daemon_ids(),desc)
|
if k in self.opts_data:
|
||||||
|
del self.opts_data[k]
|
||||||
|
del Opts.make_help
|
||||||
|
del Opts.process_uopts
|
||||||
|
del Opts.parse_opts
|
||||||
|
|
||||||
# TODO: move this check elsewhere
|
def usage(self):
|
||||||
# def chk_rbf(key,val,desc):
|
from .util import Die
|
||||||
# if not proto.cap('rbf'):
|
Die(1,Opts.make_usage_str(gc.prog_name,'user',self.usage_data))
|
||||||
# die( 'UserOptError', f'--rbf requested, but {proto.coin} does not support replace-by-fee transactions' )
|
|
||||||
|
|
||||||
# def chk_bob(key,val,desc):
|
def version(self):
|
||||||
# from .proto.btc.regtest import MMGenRegtest
|
from .util import Die,fmt
|
||||||
# try:
|
Die(0,fmt(f"""
|
||||||
# os.stat(os.path.join(MMGenRegtest(cfg,cfg.coin).d.datadir,'regtest','debug.log'))
|
{gc.prog_name.upper()} version {gc.version}
|
||||||
# except:
|
Part of the {gc.proj_name} suite, an online/offline cryptocurrency wallet for the
|
||||||
# die( 'UserOptError',
|
command line. Copyright (C){gc.Cdates} {gc.author} {gc.email}
|
||||||
# 'Regtest (Bob and Alice) mode not set up yet. ' +
|
""",indent=' ').rstrip())
|
||||||
# f"Run '{gc.proj_name.lower()}-regtest setup' to initialize." )
|
|
||||||
#
|
|
||||||
# chk_alice = chk_bob
|
|
||||||
|
|
||||||
def chk_locktime(key,val,desc):
|
def print_help(self,cfg):
|
||||||
opt_is_int(val,desc)
|
|
||||||
opt_compares(int(val),'>',0,desc)
|
|
||||||
|
|
||||||
def chk_columns(key,val,desc):
|
if not 'code' in self.opts_data:
|
||||||
opt_compares(int(val),'>',10,desc)
|
self.opts_data['code'] = {}
|
||||||
|
|
||||||
# TODO: move this check elsewhere
|
from .protocol import init_proto_from_cfg
|
||||||
# def chk_token(key,val,desc):
|
proto = init_proto_from_cfg(cfg,need_amt=True)
|
||||||
# if not 'token' in proto.caps:
|
|
||||||
# die( 'UserOptError', f'Coin {tx.coin!r} does not support the --token option' )
|
|
||||||
# if len(val) == 40 and is_hex_str(val):
|
|
||||||
# return
|
|
||||||
# if len(val) > 20 or not all(s.isalnum() for s in val):
|
|
||||||
# die( 'UserOptError', f'{val!r}: invalid parameter for --token option' )
|
|
||||||
|
|
||||||
from .util import is_int,die,Msg
|
if getattr(cfg,'longhelp',None):
|
||||||
|
self.opts_data['code']['long_options'] = long_opts_data['code']
|
||||||
|
def remove_unneeded_long_opts():
|
||||||
|
d = self.opts_data['text']['long_options']
|
||||||
|
if proto.base_proto != 'Ethereum':
|
||||||
|
d = '\n'.join(''+i for i in d.split('\n') if not '--token' in i)
|
||||||
|
self.opts_data['text']['long_options'] = d
|
||||||
|
remove_unneeded_long_opts()
|
||||||
|
|
||||||
cfuncs = { k:v for k,v in locals().items() if k.startswith('chk_') }
|
from .ui import do_pager
|
||||||
|
do_pager(Opts.make_help( cfg, proto, self.opts_data, self.opt_filter ))
|
||||||
|
|
||||||
for key in usr_opts:
|
sys.exit(0)
|
||||||
val = getattr(cfg,key)
|
|
||||||
desc = f'parameter for {fmt_opt(key)!r} option'
|
|
||||||
|
|
||||||
if key in cfg._infile_opts:
|
def show_hash_presets(self):
|
||||||
from .fileutil import check_infile
|
fs = ' {:<6} {:<3} {:<2} {}'
|
||||||
check_infile(val) # file exists and is readable - dies on error
|
from .util import msg
|
||||||
elif key == 'outdir':
|
from .crypto import Crypto
|
||||||
from .fileutil import check_outdir
|
msg(' Available parameters for scrypt.hash():')
|
||||||
check_outdir(val) # dies on error
|
msg(fs.format('Preset','N','r','p'))
|
||||||
elif 'chk_'+key in cfuncs:
|
for i in sorted(Crypto.hash_presets.keys()):
|
||||||
cfuncs['chk_'+key](key,val,desc)
|
msg(fs.format(i,*Crypto.hash_presets[i]))
|
||||||
elif cfg.debug:
|
msg(' N = memory usage (power of two)\n p = iterations (rounds)')
|
||||||
Msg(f'check_usr_opts(): No test for opt {key!r}')
|
sys.exit(0)
|
||||||
|
|
||||||
def set_auto_typeset_opts(cfg,cfgfile_auto_typeset_opts):
|
|
||||||
|
|
||||||
def do_set(key,val,ref_type):
|
|
||||||
assert not hasattr(cfg,key), f'{key!r} is in cfg!'
|
|
||||||
setattr(cfg,key,None if val is None else ref_type(val))
|
|
||||||
|
|
||||||
for key,ref_type in cfg._auto_typeset_opts.items():
|
|
||||||
if hasattr(opt,key):
|
|
||||||
do_set(key, getattr(opt,key), ref_type)
|
|
||||||
elif key in cfgfile_auto_typeset_opts:
|
|
||||||
do_set(key, cfgfile_auto_typeset_opts[key], ref_type)
|
|
||||||
|
|
||||||
def get_autoset_opt(cfg,key,val,src):
|
|
||||||
|
|
||||||
def die_on_err(desc):
|
|
||||||
from .util import fmt_list,die
|
|
||||||
die(
|
|
||||||
'UserOptError',
|
|
||||||
'{a!r}: invalid {b} (not {c}: {d})'.format(
|
|
||||||
a = val,
|
|
||||||
b = {
|
|
||||||
'cmdline': 'parameter for option --{}'.format(key.replace('_','-')),
|
|
||||||
'cfgfile': 'value for cfg file option {!r}'.format(key)
|
|
||||||
}[src],
|
|
||||||
c = desc,
|
|
||||||
d = fmt_list(data.choices) ))
|
|
||||||
|
|
||||||
class opt_type:
|
|
||||||
|
|
||||||
def nocase_str():
|
|
||||||
if val.lower() in data.choices:
|
|
||||||
return val.lower()
|
|
||||||
else:
|
|
||||||
die_on_err('one of')
|
|
||||||
|
|
||||||
def nocase_pfx():
|
|
||||||
cs = [s for s in data.choices if s.startswith(val.lower())]
|
|
||||||
if len(cs) == 1:
|
|
||||||
return cs[0]
|
|
||||||
else:
|
|
||||||
die_on_err('unique substring of')
|
|
||||||
|
|
||||||
data = cfg._autoset_opts[key]
|
|
||||||
|
|
||||||
return getattr(opt_type,data.type)()
|
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,7 @@ class TestSuiteMisc(TestSuiteBase):
|
||||||
cmd_group = (
|
cmd_group = (
|
||||||
('rpc_backends', 'RPC backends'),
|
('rpc_backends', 'RPC backends'),
|
||||||
('xmrwallet_txview', "'mmgen-xmrwallet' txview"),
|
('xmrwallet_txview', "'mmgen-xmrwallet' txview"),
|
||||||
|
('coin_daemon_info', "'examples/coin-daemon-info.py'"),
|
||||||
('term_echo', "term.set('echo')"),
|
('term_echo', "term.set('echo')"),
|
||||||
('term_cleanup', 'term.register_cleanup()'),
|
('term_cleanup', 'term.register_cleanup()'),
|
||||||
)
|
)
|
||||||
|
|
@ -56,6 +57,16 @@ class TestSuiteMisc(TestSuiteBase):
|
||||||
assert s in res, s
|
assert s in res, s
|
||||||
return t
|
return t
|
||||||
|
|
||||||
|
def coin_daemon_info(self):
|
||||||
|
start_test_daemons('ltc','eth')
|
||||||
|
t = self.spawn(f'examples/coin-daemon-info.py',['btc','ltc','eth'],cmd_dir='.')
|
||||||
|
for s in ('BTC','LTC','ETH'):
|
||||||
|
t.expect(s + r'\s+mainnet\s+Up',regex=True)
|
||||||
|
if cfg.pexpect_spawn:
|
||||||
|
t.send('q')
|
||||||
|
stop_test_daemons('ltc','eth')
|
||||||
|
return t
|
||||||
|
|
||||||
def term_echo(self):
|
def term_echo(self):
|
||||||
|
|
||||||
def test_echo():
|
def test_echo():
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue