cfg.py: cleanups, improve options checking

This commit is contained in:
The MMGen Project 2023-04-06 09:14:02 +00:00
commit de8f6f34d1
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
3 changed files with 189 additions and 208 deletions

View file

@ -226,6 +226,10 @@ class Config(Lockable):
_use_cfg_file = False
_use_env = False
_forbidden_opts = (
'data_dir_root',
)
_incompatible_opts = (
('help','longhelp'),
('bob','alice','carol'),
@ -410,10 +414,14 @@ class Config(Lockable):
opt_filter = opt_filter,
parse_only = parse_only,
parsed_opts = parsed_opts )
desc = 'command-line option'
self._uopt_desc = 'command-line option'
else:
self._uopts = {} if cfg is None else cfg
desc = 'configuration option'
self._uopt_desc = 'configuration option'
if 'data_dir' in self._uopts:
self._data_dir_root_override = self._uopts['data_dir']
del self._uopts['data_dir']
if parse_only and not any(k in self._uopts for k in ['help','longhelp']):
return
@ -422,11 +430,16 @@ class Config(Lockable):
# class attribute, if it exists:
auto_opts = tuple(self._autoset_opts) + tuple(self._auto_typeset_opts)
for key,val in self._uopts.items():
assert key.isascii() and key.isidentifier() and key[0] != '_', '{key!r}: malformed configuration option'
assert key not in self._forbidden_opts, '{key!r}: forbidden configuration option'
if key not in auto_opts:
setattr(self, key, set_for_type(key, val, getattr(self,key), desc) if hasattr(self,key) else val)
setattr(
self,
key,
conv_type(key, val, getattr(self,key), self._uopt_desc ) if hasattr(self,key) else val )
# Step 3: set cfg from environment, skipping already-set opts; save names set from environment:
env_cfg = tuple(self._set_cfg_from_env()) if self._use_env else ()
self._envopts = tuple(self._set_cfg_from_env()) if self._use_env else ()
from .term import init_term
init_term(self) # requires ‘hold_protect_disable’ (set from env)
@ -437,15 +450,16 @@ class Config(Lockable):
from .util import wrap_ripemd160
wrap_ripemd160() # ripemd160 required by mmgen_cfg_file() in _set_cfg_from_cfg_file()
# Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save auto opts to be set:
# (cfgfile.py requires ‘data_dir_root’, ‘test_suite_cfgtest’)
cfgfile_opts = self._set_cfg_from_cfg_file( env_cfg, need_proto )
# Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save set opts and auto
# opts to be set:
# requires ‘data_dir_root’, ‘test_suite_cfgtest’
self._cfgfile_opts = self._set_cfg_from_cfg_file( self._envopts, need_proto )
# Step 5: set autoset opts from user-supplied data, cfgfile data, or default values, in that order:
self._set_autoset_opts( cfgfile_opts.autoset )
self._set_autoset_opts( self._cfgfile_opts.autoset )
# Step 6: set auto typeset opts from user-supplied data or cfgfile data, in that order:
self._set_auto_typeset_opts( cfgfile_opts.auto_typeset )
self._set_auto_typeset_opts( self._cfgfile_opts.auto_typeset )
if self.regtest or self.bob or self.alice or self.carol or gc.prog_name == 'mmgen-regtest':
self.network = 'regtest'
@ -487,7 +501,7 @@ class Config(Lockable):
self._opts.init_bottom(self)
# Check user-set opts without modifying them
check_usr_opts(self,self._uopts)
check_opts(self)
def _set_cfg_from_env(self):
for name,val in ((k,v) for k,v in os.environ.items() if k.startswith('MMGEN_')):
@ -497,13 +511,13 @@ class Config(Lockable):
if val: # ignore empty string values; string value of '0' or 'false' sets variable to False
disable = name.startswith('MMGEN_DISABLE_')
gname = name[(6,14)[disable]:].lower()
if gname in self._uopts: # don’t touch attr if already set on cmdline
if gname in self._uopts: # don’t touch attr if already set by user
continue
elif hasattr(self,gname):
setattr(
self,
gname,
set_for_type( name, val, getattr(self,gname), 'environment var', invert_bool=disable ))
conv_type( name, val, getattr(self,gname), 'environment var', invert_bool=disable ))
yield gname
else:
raise ValueError(f'Name {gname!r} not present in globals')
@ -515,10 +529,10 @@ class Config(Lockable):
env_cfg,
need_proto ):
_ret = namedtuple('cfgfile_opts',['autoset','auto_typeset'])
_ret = namedtuple('cfgfile_opts',['non_auto','autoset','auto_typeset'])
if not self._use_cfg_file:
return _ret( {}, {} )
return _ret( (), {}, {} )
# check for changes in system template file (term must be initialized)
from .cfgfile import mmgen_cfg_file
@ -526,11 +540,14 @@ class Config(Lockable):
ucfg = mmgen_cfg_file(self,'usr')
self._cfgfile_fn = ucfg.fn
if need_proto:
from .protocol import init_proto
autoset_opts = {}
auto_typeset_opts = {}
non_auto_opts = []
already_set = tuple(self._uopts) + env_cfg
for d in ucfg.get_lines():
@ -552,9 +569,10 @@ class Config(Lockable):
val = ucfg.parse_value(d.value,refval)
if not val:
die( 'CfgFileParseError', f'Parse error in file {ucfg.fn!r}, line {d.lineno}' )
val_conv = set_for_type( attr, val, refval, 'configuration file option', src=ucfg.fn )
val_conv = conv_type( attr, val, refval, 'configuration file option', src=ucfg.fn )
if not attr in already_set:
setattr(cls,attr,val_conv)
non_auto_opts.append(attr)
elif d.name in self._autoset_opts:
autoset_opts[d.name] = d.value
elif d.name in self._auto_typeset_opts:
@ -562,7 +580,7 @@ class Config(Lockable):
else:
die( 'CfgFileParseError', f'{d.name!r}: unrecognized option in {ucfg.fn!r}, line {d.lineno}' )
return _ret( autoset_opts, auto_typeset_opts )
return _ret( tuple(non_auto_opts), autoset_opts, auto_typeset_opts )
def _set_autoset_opts(self,cfgfile_autoset_opts):
@ -638,209 +656,176 @@ class Config(Lockable):
if len(bad) > 1:
die(1,'Conflicting options: {}'.format(', '.join(map(fmt_opt,bad))))
def check_usr_opts(cfg,usr_opts): # Raises an exception if any check fails
def check_opts(cfg): # Raises exception if any check fails
def opt_splits(val,sep,n,desc):
sepword = 'comma' if sep == ',' else 'colon' if sep == ':' else repr(sep)
try:
l = val.split(sep)
except:
die( 'UserOptError', f'{val!r}: invalid {desc} (not {sepword}-separated list)' )
def get_desc(desc_pfx=''):
return (
(desc_pfx + ' ' if desc_pfx else '')
+ (
f'parameter for command-line option {fmt_opt(name)!r}'
if name in cfg._uopts and 'command-line' in cfg._uopt_desc else
f'value for configuration option {name!r}'
)
+ ( ' from environment' if name in cfg._envopts else '')
+ (f' in {cfg._cfgfile_fn!r}' if name in cfg._cfgfile_opts.non_auto else '')
)
if len(l) != n:
die( 'UserOptError', f'{val!r}: invalid {desc} ({n} {sepword}-separated items required)' )
def display_opt(name,val='',beg='For selected',end=':\n'):
from .util import msg_r
msg_r('{} option {!r}{}'.format(
beg,
f'{fmt_opt(name)}={val}' if val else fmt_opt(name),
end ))
def opt_compares(val,op_str,target,desc,desc2=''):
import operator as o
op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str]
if not op_f(val,target):
if desc2:
desc2 += ' '
die( 'UserOptError', f'{val}: invalid {desc} ({desc2}not {op_str} {target})' )
def opt_compares(val,op_str,target):
import operator
if not {
'<': operator.lt,
'<=': operator.le,
'>': operator.gt,
'>=': operator.ge,
'=': operator.eq,
}[op_str](val,target):
die( 'UserOptError', f'{val}: invalid {get_desc()} (not {op_str} {target})' )
def opt_is_int(val,desc):
def opt_is_int(val,desc_pfx=''):
if not is_int(val):
die( 'UserOptError', f'{val!r}: invalid {desc} (not an integer)' )
die( 'UserOptError', f'{val!r}: invalid {get_desc(desc_pfx)} (not an integer)' )
def opt_is_float(val,desc):
try:
float(val)
except:
die( 'UserOptError', f'{val!r}: invalid {desc} (not a floating-point number)' )
def opt_is_in_list(val,tlist,desc):
def opt_is_in_list(val,tlist,desc_pfx=''):
if val not in tlist:
q,sep = (('',','),("'","','"))[type(tlist[0]) == str]
die( 'UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format(
v = val,
w = desc,
w = get_desc(desc_pfx),
q = q,
o = sep.join(map(str,sorted(tlist))) ))
def opt_unrecognized(key,val,desc='value'):
die( 'UserOptError', f'{val!r}: unrecognized {desc} for option {fmt_opt(key)!r}' )
def opt_unrecognized():
die( 'UserOptError', f'{val!r}: unrecognized {get_desc()}' )
def opt_display(key,val='',beg='For selected',end=':\n'):
from .util import msg_r
msg_r('{} option {!r}{}'.format(
beg,
f'{fmt_opt(key)}={val}' if val else fmt_opt(key),
end ))
class check_funcs:
def chk_in_fmt(key,val,desc):
from .wallet import get_wallet_data
wd = get_wallet_data(fmt_code=val)
if not wd:
opt_unrecognized(key,val)
if key == 'out_fmt':
p = 'hidden_incog_output_params'
if wd.type == 'incog_hidden' and not getattr(cfg,p):
die( 'UserOptError',
'Hidden incog format output requested. ' +
f'You must supply a file and offset with the {fmt_opt(p)!r} option' )
if wd.base_type == 'incog_base' and cfg.old_incog_fmt:
opt_display(key,val,beg='Selected',end=' ')
opt_display('old_incog_fmt',beg='conflicts with',end=':\n')
die( 'UserOptError', 'Export to old incog wallet format unsupported' )
elif wd.type == 'brain':
die( 'UserOptError', 'Output to brainwallet format unsupported' )
chk_out_fmt = chk_in_fmt
def chk_hidden_incog_input_params(key,val,desc):
a = val.rsplit(',',1) # permit comma in filename
if len(a) != 2:
opt_display(key,val)
die( 'UserOptError', 'Option requires two comma-separated arguments' )
fn,offset = a
opt_is_int(offset,desc)
from .fileutil import check_infile,check_outdir,check_outfile
if key == 'hidden_incog_input_params':
check_infile(fn,blkdev_ok=True)
key2 = 'in_fmt'
else:
try: os.stat(fn)
except:
b = os.path.dirname(fn)
if b:
check_outdir(b)
else:
check_outfile(fn,blkdev_ok=True)
key2 = 'out_fmt'
if hasattr(cfg,key2):
val2 = getattr(cfg,key2)
def in_fmt():
from .wallet import get_wallet_data
wd = get_wallet_data('incog_hidden')
if val2 and val2 not in wd.fmt_codes:
die( 'UserOptError', f'Option conflict:\n {fmt_opt(key)}, with\n {fmt_opt(key2)}={val2}' )
wd = get_wallet_data(fmt_code=val)
if not wd:
opt_unrecognized()
if name == 'out_fmt':
p = 'hidden_incog_output_params'
chk_hidden_incog_output_params = chk_hidden_incog_input_params
if wd.type == 'incog_hidden' and not getattr(cfg,p):
die( 'UserOptError',
'Hidden incog format output requested. ' +
f'You must supply a file and offset with the {fmt_opt(p)!r} option' )
def chk_subseeds(key,val,desc):
from .subseed import SubSeedIdxRange
opt_is_int(val,desc)
opt_compares(int(val),'>=',SubSeedIdxRange.min_idx,desc)
opt_compares(int(val),'<=',SubSeedIdxRange.max_idx,desc)
if wd.base_type == 'incog_base' and cfg.old_incog_fmt:
display_opt(name,val,beg='Selected',end=' ')
display_opt('old_incog_fmt',beg='conflicts with',end=':\n')
die( 'UserOptError', 'Export to old incog wallet format unsupported' )
elif wd.type == 'brain':
die( 'UserOptError', 'Output to brainwallet format unsupported' )
def chk_seed_len(key,val,desc):
from .seed import Seed
opt_is_int(val,desc)
opt_is_in_list(int(val),Seed.lens,desc)
out_fmt = in_fmt
def chk_hash_preset(key,val,desc):
from .crypto import Crypto
opt_is_in_list(val,list(Crypto.hash_presets.keys()),desc)
def hidden_incog_input_params():
a = val.rsplit(',',1) # permit comma in filename
if len(a) != 2:
display_opt(name,val)
die( 'UserOptError', 'Option requires two comma-separated arguments' )
def chk_brain_params(key,val,desc):
a = val.split(',')
if len(a) != 2:
opt_display(key,val)
die( 'UserOptError', 'Option requires two comma-separated arguments' )
opt_is_int(a[0],'seed length '+desc)
from .seed import Seed
opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc)
from .crypto import Crypto
opt_is_in_list(a[1],list(Crypto.hash_presets.keys()),'hash preset '+desc)
fn,offset = a
opt_is_int(offset)
def chk_usr_randchars(key,val,desc):
if val == 0:
return
opt_is_int(val,desc)
opt_compares(val,'>=',cfg.min_urandchars,desc)
opt_compares(val,'<=',cfg.max_urandchars,desc)
from .fileutil import check_infile,check_outdir,check_outfile
if name == 'hidden_incog_input_params':
check_infile(fn,blkdev_ok=True)
key2 = 'in_fmt'
else:
try: os.stat(fn)
except:
b = os.path.dirname(fn)
if b:
check_outdir(b)
else:
check_outfile(fn,blkdev_ok=True)
key2 = 'out_fmt'
def chk_tx_fee(key,val,desc):
pass
# opt_is_tx_fee(key,val,desc) # TODO: move this check elsewhere
if hasattr(cfg,key2):
val2 = getattr(cfg,key2)
from .wallet import get_wallet_data
wd = get_wallet_data('incog_hidden')
if val2 and val2 not in wd.fmt_codes:
die( 'UserOptError', f'Option conflict:\n {fmt_opt(name)}, with\n {fmt_opt(key2)}={val2}' )
def chk_tx_confs(key,val,desc):
opt_is_int(val,desc)
opt_compares(val,'>=',1,desc)
hidden_incog_output_params = hidden_incog_input_params
def chk_vsize_adj(key,val,desc):
opt_is_float(val,desc)
from .util import ymsg
ymsg(f'Adjusting transaction vsize by a factor of {float(val):1.2f}')
def subseeds():
from .subseed import SubSeedIdxRange
opt_compares(val,'>=',SubSeedIdxRange.min_idx)
opt_compares(val,'<=',SubSeedIdxRange.max_idx)
def chk_daemon_id(key,val,desc):
from .daemon import CoinDaemon
opt_is_in_list(val,CoinDaemon.all_daemon_ids(),desc)
def seed_len():
from .seed import Seed
opt_is_in_list(int(val),Seed.lens)
# TODO: move this check elsewhere
# def chk_rbf(key,val,desc):
# if not proto.cap('rbf'):
# die( 'UserOptError', f'--rbf requested, but {proto.coin} does not support replace-by-fee transactions' )
def hash_preset():
from .crypto import Crypto
opt_is_in_list(val,list(Crypto.hash_presets.keys()))
# def chk_bob(key,val,desc):
# from .proto.btc.regtest import MMGenRegtest
# try:
# os.stat(os.path.join(MMGenRegtest(cfg,cfg.coin).d.datadir,'regtest','debug.log'))
# except:
# die( 'UserOptError',
# 'Regtest (Bob and Alice) mode not set up yet. ' +
# f"Run '{gc.proj_name.lower()}-regtest setup' to initialize." )
#
# chk_alice = chk_bob
def brain_params():
a = val.split(',')
if len(a) != 2:
display_opt(name,val)
die( 'UserOptError', 'Option requires two comma-separated arguments' )
def chk_locktime(key,val,desc):
opt_is_int(val,desc)
opt_compares(int(val),'>',0,desc)
opt_is_int( a[0], desc_pfx='seed length' )
from .seed import Seed
opt_is_in_list( int(a[0]), Seed.lens, desc_pfx='seed length' )
def chk_columns(key,val,desc):
opt_compares(int(val),'>',10,desc)
from .crypto import Crypto
opt_is_in_list( a[1], list(Crypto.hash_presets.keys()), desc_pfx='hash preset' )
# TODO: move this check elsewhere
# def chk_token(key,val,desc):
# if not 'token' in proto.caps:
# die( 'UserOptError', f'Coin {tx.coin!r} does not support the --token option' )
# if len(val) == 40 and is_hex_str(val):
# return
# if len(val) > 20 or not all(s.isalnum() for s in val):
# die( 'UserOptError', f'{val!r}: invalid parameter for --token option' )
def usr_randchars():
if val != 0:
opt_compares(val,'>=',cfg.min_urandchars)
opt_compares(val,'<=',cfg.max_urandchars)
def tx_confs():
opt_is_int(val)
opt_compares(int(val),'>=',1)
def vsize_adj():
from .util import ymsg
ymsg(f'Adjusting transaction vsize by a factor of {val:1.2f}')
def daemon_id():
from .daemon import CoinDaemon
opt_is_in_list(val,CoinDaemon.all_daemon_ids())
def locktime():
opt_is_int(val)
opt_compares(int(val),'>',0)
def columns():
opt_compares(val,'>',10)
from .util import is_int,Msg
cfuncs = { k:v for k,v in locals().items() if k.startswith('chk_') }
for key in usr_opts:
val = getattr(cfg,key)
desc = f'parameter for {fmt_opt(key)!r} option'
if key in cfg._infile_opts:
# TODO: add checks for token, rbf, tx_fee
check_funcs_names = tuple(check_funcs.__dict__)
for name in tuple(cfg._uopts) + cfg._envopts + cfg._cfgfile_opts.non_auto:
val = getattr(cfg,name)
if name in cfg._infile_opts:
from .fileutil import check_infile
check_infile(val) # file exists and is readable - dies on error
elif key == 'outdir':
elif name == 'outdir':
from .fileutil import check_outdir
check_outdir(val) # dies on error
elif 'chk_'+key in cfuncs:
cfuncs['chk_'+key](key,val,desc)
elif name in check_funcs_names:
getattr(check_funcs,name)()
elif cfg.debug:
Msg(f'check_usr_opts(): No test for config opt {key!r}')
Msg(f'check_opts(): No test for config opt {name!r}')
def fmt_opt(o):
return '--' + o.replace('_','-')
@ -856,7 +841,7 @@ def opt_postproc_debug(cfg):
Msg(' {}\n'.format('\n '.join(b)))
Msg('\n=== end opts.py debug ===\n')
def set_for_type(
def conv_type(
name,
val,
refval,
@ -864,6 +849,14 @@ def set_for_type(
invert_bool = False,
src = None ):
def do_fail():
die(1,'{a!r}: invalid value for {b} {c!r}{d} (must be of type {e!r})'.format(
a = val,
b = desc,
c = fmt_opt(name) if 'command-line' in desc else name,
d = f' in {src!r}' if src else '',
e = type(refval).__name__ ))
if type(refval) == bool:
v = str(val).lower()
ret = (
@ -871,17 +864,9 @@ def set_for_type(
False if v in ('false','no','none','0','off','') else
None
)
if ret is not None:
return not ret if invert_bool else ret
return do_fail() if ret is None else (not ret) if invert_bool else ret
else:
try:
return type(refval)(not val if invert_bool else val)
except:
pass
die(1,'{a!r}: invalid value for {b} {c!r}{d} (must be of type {e!r})'.format(
a = val,
b = desc,
c = fmt_opt(name) if desc == 'command-line option' else name,
d = f' in {src!r}' if src else '',
e = type(refval).__name__ ))
do_fail()

View file

@ -1 +1 @@
13.3.dev44
13.3.dev45

View file

@ -134,10 +134,6 @@ class UserOpts:
if parse_only:
return
if 'data_dir' in uopts:
cfg._data_dir_root_override = uopts['data_dir']
del uopts['data_dir']
def init_bottom(self,cfg):
# print help screen only after globals initialized and locked: