From de8f6f34d1b7f2b62661fc962bdad48c65cc5ce0 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Thu, 6 Apr 2023 09:14:02 +0000 Subject: [PATCH] cfg.py: cleanups, improve options checking --- mmgen/cfg.py | 367 ++++++++++++++++++++++----------------------- mmgen/data/version | 2 +- mmgen/opts.py | 4 - 3 files changed, 177 insertions(+), 196 deletions(-) diff --git a/mmgen/cfg.py b/mmgen/cfg.py index 5ef23c45..5efa3304 100755 --- a/mmgen/cfg.py +++ b/mmgen/cfg.py @@ -226,6 +226,10 @@ class Config(Lockable): _use_cfg_file = False _use_env = False + _forbidden_opts = ( + 'data_dir_root', + ) + _incompatible_opts = ( ('help','longhelp'), ('bob','alice','carol'), @@ -410,10 +414,14 @@ class Config(Lockable): opt_filter = opt_filter, parse_only = parse_only, parsed_opts = parsed_opts ) - desc = 'command-line option' + self._uopt_desc = 'command-line option' else: self._uopts = {} if cfg is None else cfg - desc = 'configuration option' + self._uopt_desc = 'configuration option' + + if 'data_dir' in self._uopts: + self._data_dir_root_override = self._uopts['data_dir'] + del self._uopts['data_dir'] if parse_only and not any(k in self._uopts for k in ['help','longhelp']): return @@ -422,11 +430,16 @@ class Config(Lockable): # class attribute, if it exists: auto_opts = tuple(self._autoset_opts) + tuple(self._auto_typeset_opts) for key,val in self._uopts.items(): + assert key.isascii() and key.isidentifier() and key[0] != '_', '{key!r}: malformed configuration option' + assert key not in self._forbidden_opts, '{key!r}: forbidden configuration option' if key not in auto_opts: - setattr(self, key, set_for_type(key, val, getattr(self,key), desc) if hasattr(self,key) else val) + setattr( + self, + key, + conv_type(key, val, getattr(self,key), self._uopt_desc ) if hasattr(self,key) else val ) # Step 3: set cfg from environment, skipping already-set opts; save names set from environment: - env_cfg = tuple(self._set_cfg_from_env()) if self._use_env else () + self._envopts = tuple(self._set_cfg_from_env()) if self._use_env else () from .term import init_term init_term(self) # requires ‘hold_protect_disable’ (set from env) @@ -437,15 +450,16 @@ class Config(Lockable): from .util import wrap_ripemd160 wrap_ripemd160() # ripemd160 required by mmgen_cfg_file() in _set_cfg_from_cfg_file() - # Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save auto opts to be set: - # (cfgfile.py requires ‘data_dir_root’, ‘test_suite_cfgtest’) - cfgfile_opts = self._set_cfg_from_cfg_file( env_cfg, need_proto ) + # Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save set opts and auto + # opts to be set: + # requires ‘data_dir_root’, ‘test_suite_cfgtest’ + self._cfgfile_opts = self._set_cfg_from_cfg_file( self._envopts, need_proto ) # Step 5: set autoset opts from user-supplied data, cfgfile data, or default values, in that order: - self._set_autoset_opts( cfgfile_opts.autoset ) + self._set_autoset_opts( self._cfgfile_opts.autoset ) # Step 6: set auto typeset opts from user-supplied data or cfgfile data, in that order: - self._set_auto_typeset_opts( cfgfile_opts.auto_typeset ) + self._set_auto_typeset_opts( self._cfgfile_opts.auto_typeset ) if self.regtest or self.bob or self.alice or self.carol or gc.prog_name == 'mmgen-regtest': self.network = 'regtest' @@ -487,7 +501,7 @@ class Config(Lockable): self._opts.init_bottom(self) # Check user-set opts without modifying them - check_usr_opts(self,self._uopts) + check_opts(self) def _set_cfg_from_env(self): for name,val in ((k,v) for k,v in os.environ.items() if k.startswith('MMGEN_')): @@ -497,13 +511,13 @@ class Config(Lockable): if val: # ignore empty string values; string value of '0' or 'false' sets variable to False disable = name.startswith('MMGEN_DISABLE_') gname = name[(6,14)[disable]:].lower() - if gname in self._uopts: # don’t touch attr if already set on cmdline + if gname in self._uopts: # don’t touch attr if already set by user continue elif hasattr(self,gname): setattr( self, gname, - set_for_type( name, val, getattr(self,gname), 'environment var', invert_bool=disable )) + conv_type( name, val, getattr(self,gname), 'environment var', invert_bool=disable )) yield gname else: raise ValueError(f'Name {gname!r} not present in globals') @@ -515,10 +529,10 @@ class Config(Lockable): env_cfg, need_proto ): - _ret = namedtuple('cfgfile_opts',['autoset','auto_typeset']) + _ret = namedtuple('cfgfile_opts',['non_auto','autoset','auto_typeset']) if not self._use_cfg_file: - return _ret( {}, {} ) + return _ret( (), {}, {} ) # check for changes in system template file (term must be initialized) from .cfgfile import mmgen_cfg_file @@ -526,11 +540,14 @@ class Config(Lockable): ucfg = mmgen_cfg_file(self,'usr') + self._cfgfile_fn = ucfg.fn + if need_proto: from .protocol import init_proto autoset_opts = {} auto_typeset_opts = {} + non_auto_opts = [] already_set = tuple(self._uopts) + env_cfg for d in ucfg.get_lines(): @@ -552,9 +569,10 @@ class Config(Lockable): val = ucfg.parse_value(d.value,refval) if not val: die( 'CfgFileParseError', f'Parse error in file {ucfg.fn!r}, line {d.lineno}' ) - val_conv = set_for_type( attr, val, refval, 'configuration file option', src=ucfg.fn ) + val_conv = conv_type( attr, val, refval, 'configuration file option', src=ucfg.fn ) if not attr in already_set: setattr(cls,attr,val_conv) + non_auto_opts.append(attr) elif d.name in self._autoset_opts: autoset_opts[d.name] = d.value elif d.name in self._auto_typeset_opts: @@ -562,7 +580,7 @@ class Config(Lockable): else: die( 'CfgFileParseError', f'{d.name!r}: unrecognized option in {ucfg.fn!r}, line {d.lineno}' ) - return _ret( autoset_opts, auto_typeset_opts ) + return _ret( tuple(non_auto_opts), autoset_opts, auto_typeset_opts ) def _set_autoset_opts(self,cfgfile_autoset_opts): @@ -638,209 +656,176 @@ class Config(Lockable): if len(bad) > 1: die(1,'Conflicting options: {}'.format(', '.join(map(fmt_opt,bad)))) -def check_usr_opts(cfg,usr_opts): # Raises an exception if any check fails +def check_opts(cfg): # Raises exception if any check fails - def opt_splits(val,sep,n,desc): - sepword = 'comma' if sep == ',' else 'colon' if sep == ':' else repr(sep) - try: - l = val.split(sep) - except: - die( 'UserOptError', f'{val!r}: invalid {desc} (not {sepword}-separated list)' ) + def get_desc(desc_pfx=''): + return ( + (desc_pfx + ' ' if desc_pfx else '') + + ( + f'parameter for command-line option {fmt_opt(name)!r}' + if name in cfg._uopts and 'command-line' in cfg._uopt_desc else + f'value for configuration option {name!r}' + ) + + ( ' from environment' if name in cfg._envopts else '') + + (f' in {cfg._cfgfile_fn!r}' if name in cfg._cfgfile_opts.non_auto else '') + ) - if len(l) != n: - die( 'UserOptError', f'{val!r}: invalid {desc} ({n} {sepword}-separated items required)' ) + def display_opt(name,val='',beg='For selected',end=':\n'): + from .util import msg_r + msg_r('{} option {!r}{}'.format( + beg, + f'{fmt_opt(name)}={val}' if val else fmt_opt(name), + end )) - def opt_compares(val,op_str,target,desc,desc2=''): - import operator as o - op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str] - if not op_f(val,target): - if desc2: - desc2 += ' ' - die( 'UserOptError', f'{val}: invalid {desc} ({desc2}not {op_str} {target})' ) + def opt_compares(val,op_str,target): + import operator + if not { + '<': operator.lt, + '<=': operator.le, + '>': operator.gt, + '>=': operator.ge, + '=': operator.eq, + }[op_str](val,target): + die( 'UserOptError', f'{val}: invalid {get_desc()} (not {op_str} {target})' ) - def opt_is_int(val,desc): + def opt_is_int(val,desc_pfx=''): if not is_int(val): - die( 'UserOptError', f'{val!r}: invalid {desc} (not an integer)' ) + die( 'UserOptError', f'{val!r}: invalid {get_desc(desc_pfx)} (not an integer)' ) - def opt_is_float(val,desc): - try: - float(val) - except: - die( 'UserOptError', f'{val!r}: invalid {desc} (not a floating-point number)' ) - - def opt_is_in_list(val,tlist,desc): + def opt_is_in_list(val,tlist,desc_pfx=''): if val not in tlist: q,sep = (('',','),("'","','"))[type(tlist[0]) == str] die( 'UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format( v = val, - w = desc, + w = get_desc(desc_pfx), q = q, o = sep.join(map(str,sorted(tlist))) )) - def opt_unrecognized(key,val,desc='value'): - die( 'UserOptError', f'{val!r}: unrecognized {desc} for option {fmt_opt(key)!r}' ) + def opt_unrecognized(): + die( 'UserOptError', f'{val!r}: unrecognized {get_desc()}' ) - def opt_display(key,val='',beg='For selected',end=':\n'): - from .util import msg_r - msg_r('{} option {!r}{}'.format( - beg, - f'{fmt_opt(key)}={val}' if val else fmt_opt(key), - end )) + class check_funcs: - def chk_in_fmt(key,val,desc): - from .wallet import get_wallet_data - wd = get_wallet_data(fmt_code=val) - if not wd: - opt_unrecognized(key,val) - if key == 'out_fmt': - p = 'hidden_incog_output_params' - - if wd.type == 'incog_hidden' and not getattr(cfg,p): - die( 'UserOptError', - 'Hidden incog format output requested. ' + - f'You must supply a file and offset with the {fmt_opt(p)!r} option' ) - - if wd.base_type == 'incog_base' and cfg.old_incog_fmt: - opt_display(key,val,beg='Selected',end=' ') - opt_display('old_incog_fmt',beg='conflicts with',end=':\n') - die( 'UserOptError', 'Export to old incog wallet format unsupported' ) - elif wd.type == 'brain': - die( 'UserOptError', 'Output to brainwallet format unsupported' ) - - chk_out_fmt = chk_in_fmt - - def chk_hidden_incog_input_params(key,val,desc): - a = val.rsplit(',',1) # permit comma in filename - if len(a) != 2: - opt_display(key,val) - die( 'UserOptError', 'Option requires two comma-separated arguments' ) - - fn,offset = a - opt_is_int(offset,desc) - - from .fileutil import check_infile,check_outdir,check_outfile - if key == 'hidden_incog_input_params': - check_infile(fn,blkdev_ok=True) - key2 = 'in_fmt' - else: - try: os.stat(fn) - except: - b = os.path.dirname(fn) - if b: - check_outdir(b) - else: - check_outfile(fn,blkdev_ok=True) - key2 = 'out_fmt' - - if hasattr(cfg,key2): - val2 = getattr(cfg,key2) + def in_fmt(): from .wallet import get_wallet_data - wd = get_wallet_data('incog_hidden') - if val2 and val2 not in wd.fmt_codes: - die( 'UserOptError', f'Option conflict:\n {fmt_opt(key)}, with\n {fmt_opt(key2)}={val2}' ) + wd = get_wallet_data(fmt_code=val) + if not wd: + opt_unrecognized() + if name == 'out_fmt': + p = 'hidden_incog_output_params' - chk_hidden_incog_output_params = chk_hidden_incog_input_params + if wd.type == 'incog_hidden' and not getattr(cfg,p): + die( 'UserOptError', + 'Hidden incog format output requested. ' + + f'You must supply a file and offset with the {fmt_opt(p)!r} option' ) - def chk_subseeds(key,val,desc): - from .subseed import SubSeedIdxRange - opt_is_int(val,desc) - opt_compares(int(val),'>=',SubSeedIdxRange.min_idx,desc) - opt_compares(int(val),'<=',SubSeedIdxRange.max_idx,desc) + if wd.base_type == 'incog_base' and cfg.old_incog_fmt: + display_opt(name,val,beg='Selected',end=' ') + display_opt('old_incog_fmt',beg='conflicts with',end=':\n') + die( 'UserOptError', 'Export to old incog wallet format unsupported' ) + elif wd.type == 'brain': + die( 'UserOptError', 'Output to brainwallet format unsupported' ) - def chk_seed_len(key,val,desc): - from .seed import Seed - opt_is_int(val,desc) - opt_is_in_list(int(val),Seed.lens,desc) + out_fmt = in_fmt - def chk_hash_preset(key,val,desc): - from .crypto import Crypto - opt_is_in_list(val,list(Crypto.hash_presets.keys()),desc) + def hidden_incog_input_params(): + a = val.rsplit(',',1) # permit comma in filename + if len(a) != 2: + display_opt(name,val) + die( 'UserOptError', 'Option requires two comma-separated arguments' ) - def chk_brain_params(key,val,desc): - a = val.split(',') - if len(a) != 2: - opt_display(key,val) - die( 'UserOptError', 'Option requires two comma-separated arguments' ) - opt_is_int(a[0],'seed length '+desc) - from .seed import Seed - opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc) - from .crypto import Crypto - opt_is_in_list(a[1],list(Crypto.hash_presets.keys()),'hash preset '+desc) + fn,offset = a + opt_is_int(offset) - def chk_usr_randchars(key,val,desc): - if val == 0: - return - opt_is_int(val,desc) - opt_compares(val,'>=',cfg.min_urandchars,desc) - opt_compares(val,'<=',cfg.max_urandchars,desc) + from .fileutil import check_infile,check_outdir,check_outfile + if name == 'hidden_incog_input_params': + check_infile(fn,blkdev_ok=True) + key2 = 'in_fmt' + else: + try: os.stat(fn) + except: + b = os.path.dirname(fn) + if b: + check_outdir(b) + else: + check_outfile(fn,blkdev_ok=True) + key2 = 'out_fmt' - def chk_tx_fee(key,val,desc): - pass -# opt_is_tx_fee(key,val,desc) # TODO: move this check elsewhere + if hasattr(cfg,key2): + val2 = getattr(cfg,key2) + from .wallet import get_wallet_data + wd = get_wallet_data('incog_hidden') + if val2 and val2 not in wd.fmt_codes: + die( 'UserOptError', f'Option conflict:\n {fmt_opt(name)}, with\n {fmt_opt(key2)}={val2}' ) - def chk_tx_confs(key,val,desc): - opt_is_int(val,desc) - opt_compares(val,'>=',1,desc) + hidden_incog_output_params = hidden_incog_input_params - def chk_vsize_adj(key,val,desc): - opt_is_float(val,desc) - from .util import ymsg - ymsg(f'Adjusting transaction vsize by a factor of {float(val):1.2f}') + def subseeds(): + from .subseed import SubSeedIdxRange + opt_compares(val,'>=',SubSeedIdxRange.min_idx) + opt_compares(val,'<=',SubSeedIdxRange.max_idx) - def chk_daemon_id(key,val,desc): - from .daemon import CoinDaemon - opt_is_in_list(val,CoinDaemon.all_daemon_ids(),desc) + def seed_len(): + from .seed import Seed + opt_is_in_list(int(val),Seed.lens) -# TODO: move this check elsewhere -# def chk_rbf(key,val,desc): -# if not proto.cap('rbf'): -# die( 'UserOptError', f'--rbf requested, but {proto.coin} does not support replace-by-fee transactions' ) + def hash_preset(): + from .crypto import Crypto + opt_is_in_list(val,list(Crypto.hash_presets.keys())) -# def chk_bob(key,val,desc): -# from .proto.btc.regtest import MMGenRegtest -# try: -# os.stat(os.path.join(MMGenRegtest(cfg,cfg.coin).d.datadir,'regtest','debug.log')) -# except: -# die( 'UserOptError', -# 'Regtest (Bob and Alice) mode not set up yet. ' + -# f"Run '{gc.proj_name.lower()}-regtest setup' to initialize." ) -# -# chk_alice = chk_bob + def brain_params(): + a = val.split(',') + if len(a) != 2: + display_opt(name,val) + die( 'UserOptError', 'Option requires two comma-separated arguments' ) - def chk_locktime(key,val,desc): - opt_is_int(val,desc) - opt_compares(int(val),'>',0,desc) + opt_is_int( a[0], desc_pfx='seed length' ) + from .seed import Seed + opt_is_in_list( int(a[0]), Seed.lens, desc_pfx='seed length' ) - def chk_columns(key,val,desc): - opt_compares(int(val),'>',10,desc) + from .crypto import Crypto + opt_is_in_list( a[1], list(Crypto.hash_presets.keys()), desc_pfx='hash preset' ) -# TODO: move this check elsewhere -# def chk_token(key,val,desc): -# if not 'token' in proto.caps: -# die( 'UserOptError', f'Coin {tx.coin!r} does not support the --token option' ) -# if len(val) == 40 and is_hex_str(val): -# return -# if len(val) > 20 or not all(s.isalnum() for s in val): -# die( 'UserOptError', f'{val!r}: invalid parameter for --token option' ) + def usr_randchars(): + if val != 0: + opt_compares(val,'>=',cfg.min_urandchars) + opt_compares(val,'<=',cfg.max_urandchars) + + def tx_confs(): + opt_is_int(val) + opt_compares(int(val),'>=',1) + + def vsize_adj(): + from .util import ymsg + ymsg(f'Adjusting transaction vsize by a factor of {val:1.2f}') + + def daemon_id(): + from .daemon import CoinDaemon + opt_is_in_list(val,CoinDaemon.all_daemon_ids()) + + def locktime(): + opt_is_int(val) + opt_compares(int(val),'>',0) + + def columns(): + opt_compares(val,'>',10) from .util import is_int,Msg - cfuncs = { k:v for k,v in locals().items() if k.startswith('chk_') } - - for key in usr_opts: - val = getattr(cfg,key) - desc = f'parameter for {fmt_opt(key)!r} option' - - if key in cfg._infile_opts: + # TODO: add checks for token, rbf, tx_fee + check_funcs_names = tuple(check_funcs.__dict__) + for name in tuple(cfg._uopts) + cfg._envopts + cfg._cfgfile_opts.non_auto: + val = getattr(cfg,name) + if name in cfg._infile_opts: from .fileutil import check_infile check_infile(val) # file exists and is readable - dies on error - elif key == 'outdir': + elif name == 'outdir': from .fileutil import check_outdir check_outdir(val) # dies on error - elif 'chk_'+key in cfuncs: - cfuncs['chk_'+key](key,val,desc) + elif name in check_funcs_names: + getattr(check_funcs,name)() elif cfg.debug: - Msg(f'check_usr_opts(): No test for config opt {key!r}') + Msg(f'check_opts(): No test for config opt {name!r}') def fmt_opt(o): return '--' + o.replace('_','-') @@ -856,7 +841,7 @@ def opt_postproc_debug(cfg): Msg(' {}\n'.format('\n '.join(b))) Msg('\n=== end opts.py debug ===\n') -def set_for_type( +def conv_type( name, val, refval, @@ -864,6 +849,14 @@ def set_for_type( invert_bool = False, src = None ): + def do_fail(): + die(1,'{a!r}: invalid value for {b} {c!r}{d} (must be of type {e!r})'.format( + a = val, + b = desc, + c = fmt_opt(name) if 'command-line' in desc else name, + d = f' in {src!r}' if src else '', + e = type(refval).__name__ )) + if type(refval) == bool: v = str(val).lower() ret = ( @@ -871,17 +864,9 @@ def set_for_type( False if v in ('false','no','none','0','off','') else None ) - if ret is not None: - return not ret if invert_bool else ret + return do_fail() if ret is None else (not ret) if invert_bool else ret else: try: return type(refval)(not val if invert_bool else val) except: - pass - - die(1,'{a!r}: invalid value for {b} {c!r}{d} (must be of type {e!r})'.format( - a = val, - b = desc, - c = fmt_opt(name) if desc == 'command-line option' else name, - d = f' in {src!r}' if src else '', - e = type(refval).__name__ )) + do_fail() diff --git a/mmgen/data/version b/mmgen/data/version index c0283cb9..0b4ffb5c 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -13.3.dev44 +13.3.dev45 diff --git a/mmgen/opts.py b/mmgen/opts.py index 950337d2..be0d006c 100755 --- a/mmgen/opts.py +++ b/mmgen/opts.py @@ -134,10 +134,6 @@ class UserOpts: if parse_only: return - if 'data_dir' in uopts: - cfg._data_dir_root_override = uopts['data_dir'] - del uopts['data_dir'] - def init_bottom(self,cfg): # print help screen only after globals initialized and locked: