From e90e25b2f023f143702440f1d460df154882739b Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Tue, 4 Apr 2023 16:04:13 +0000 Subject: [PATCH] Config API, Part II This patch completes the implementation of the API, making the entire MMGen code base usable as a library for external projects. A usage example can be found in the script `examples/coin-daemon-info.py`. Testing: $ test/test.py -e coin_daemon_info --- examples/coin-daemon-info.py | 94 +++++ mmgen/cfg.py | 531 ++++++++++++++++++++++++++--- mmgen/data/version | 2 +- mmgen/opts.py | 642 +++++------------------------------ test/test_py_d/ts_misc.py | 11 + 5 files changed, 679 insertions(+), 601 deletions(-) create mode 100755 examples/coin-daemon-info.py diff --git a/examples/coin-daemon-info.py b/examples/coin-daemon-info.py new file mode 100755 index 00000000..00faa530 --- /dev/null +++ b/examples/coin-daemon-info.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet +# Copyright (C)2013-2023 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen +# https://gitlab.com/mmgen/mmgen + +""" +examples/coin-daemon-info.py: + Get info about multiple running coin daemons. + Demonstrates use of the MMGen Config API. +""" + +# Instructions +# +# Testing mode: +# +# 1) From the MMGen repository root, start the mainnet test suite daemons as follows +# (note that openethereum is the default testing daemon for ETH): +# +# test/start-coin-daemons.py btc ltc eth +# +# 2) Then run the script as follows: +# +# PYTHONPATH=. MMGEN_TEST_SUITE=1 examples/coin-daemon-info.py btc ltc eth +# +# Live mode: +# +# 1) Start up one or more of bitcoind, litecoind or geth with the standard mainnet ports +# and datadirs. For geth, the options ‘--http --http.api=eth,web3’ are required. +# +# 2) Then run the script as follows: +# +# PYTHONPATH=. examples/coin-daemon-info.py btc ltc eth + +import sys,os,asyncio + +from mmgen.exception import SocketError +from mmgen.cfg import Config +from mmgen.rpc import rpc_init +from mmgen.util import make_timestr + +async def get_rpc(cfg): + try: + return await rpc_init(cfg) + except SocketError as e: + return False + +async def main(coins): + + rpcs = {} + cfgs = {} + test_suite = os.getenv('MMGEN_TEST_SUITE') + base_cfg = Config({'pager':True}) + + for coin in coins: + cfg_in = { + 'coin': coin, + 'test_suite': test_suite, + } + if coin == 'eth' and not test_suite: + cfg_in.update({'daemon_id': 'geth'}) + cfgs[coin] = Config(cfg_in) + rpcs[coin] = await get_rpc(cfgs[coin]) + + def gen_output(): + fs = '{:4} {:7} {:6} {:<5} {:<8} {:30} {:13} {:23} {}' + yield fs.format('Coin','Network','Status','Port','Chain','Latest Block','Daemon','Version','Datadir') + for coin,rpc in rpcs.items(): + info = ('Down','-','-','-','-','-','-') if rpc is False else ( + 'Up', + rpc.port, + rpc.chain, + '{:<8} [{}]'.format(rpc.blockcount, make_timestr(rpc.cur_date)), + rpc.daemon.coind_name, + rpc.daemon_version_str, + rpc.daemon.datadir + ) + yield fs.format( coin.upper(), cfgs[coin].network, *info ) + + base_cfg._util.stdout_or_pager('\n'.join(gen_output())) + +all_coins = ('btc', 'ltc', 'eth') + +coins = sys.argv[1:] + +if coins and all(coin in all_coins for coin in coins): + asyncio.run(main(coins)) +else: + print(f'You must supply one or more of the following coins on the command line:\n {all_coins}') + sys.exit(1) diff --git a/mmgen/cfg.py b/mmgen/cfg.py index 0b34a2ba..5ef23c45 100755 --- a/mmgen/cfg.py +++ b/mmgen/cfg.py @@ -177,6 +177,7 @@ class Config(Lockable): force_color = False # placeholder force_256_color = False scroll = False + pager = False columns = 0 color = bool( ( sys.stdout.isatty() and not os.getenv('MMGEN_TEST_SUITE_PEXPECT') ) or @@ -216,37 +217,14 @@ class Config(Lockable): mnemonic_entry_modes = {} - # placeholders: + # external use: + _opts = None _proto = None - pager = False - # 'long' opts (subset of common_opts_data): - _common_opts = ( - 'accept_defaults', - 'aiohttp_rpc_queue_len', - 'bob', - 'alice', - 'carol', - 'coin', - 'color', - 'columns', - 'daemon_data_dir', - 'daemon_id', - 'force_256_color', - 'http_timeout', - 'ignore_daemon_version', - 'no_license', - 'regtest', - 'rpc_host', - 'rpc_password', - 'rpc_port', - 'rpc_user', - 'scroll', - 'testnet', - 'token' ) - - # opts not in _common_opts but required to be set during opts initialization - _init_opts = ('show_hash_presets','yes','verbose') + # internal use: + _data_dir_root_override = None + _use_cfg_file = False + _use_env = False _incompatible_opts = ( ('help','longhelp'), @@ -355,6 +333,7 @@ class Config(Lockable): 'vsize_adj': float, } + # test suite: err_disp_timeout = 0.7 short_disp_timeout = 0.3 stdin_tty = sys.stdin.isatty() @@ -410,7 +389,6 @@ class Config(Lockable): self, cfg = None, opts_data = None, - add_opts = None, init_opts = None, opt_filter = None, parse_only = False, @@ -420,23 +398,490 @@ class Config(Lockable): do_post_init = False, process_opts = False ): + # Step 1: get user-supplied configuration data from a) command line, or b) first argument + # to constructor; save to self._uopts: if opts_data or parsed_opts or process_opts: - - import mmgen.opts as opts - - self._opts = opts - - opts.init( + assert cfg is None + from mmgen.opts import UserOpts + UserOpts( cfg = self, opts_data = opts_data, - add_opts = add_opts, init_opts = init_opts, opt_filter = opt_filter, parse_only = parse_only, - parsed_opts = parsed_opts, - need_proto = need_proto, - need_amt = need_amt, - do_post_init = do_post_init ) + parsed_opts = parsed_opts ) + desc = 'command-line option' + else: + self._uopts = {} if cfg is None else cfg + desc = 'configuration option' - if do_post_init: - self._post_init = opts.post_init + if parse_only and not any(k in self._uopts for k in ['help','longhelp']): + return + + # Step 2: set cfg from user-supplied data, skipping auto opts; set type from corresponding + # class attribute, if it exists: + auto_opts = tuple(self._autoset_opts) + tuple(self._auto_typeset_opts) + for key,val in self._uopts.items(): + if key not in auto_opts: + setattr(self, key, set_for_type(key, val, getattr(self,key), desc) if hasattr(self,key) else val) + + # Step 3: set cfg from environment, skipping already-set opts; save names set from environment: + env_cfg = tuple(self._set_cfg_from_env()) if self._use_env else () + + from .term import init_term + init_term(self) # requires ‘hold_protect_disable’ (set from env) + + from .fileutil import check_or_create_dir + check_or_create_dir(self.data_dir_root) + + from .util import wrap_ripemd160 + wrap_ripemd160() # ripemd160 required by mmgen_cfg_file() in _set_cfg_from_cfg_file() + + # Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save auto opts to be set: + # (cfgfile.py requires ‘data_dir_root’, ‘test_suite_cfgtest’) + cfgfile_opts = self._set_cfg_from_cfg_file( env_cfg, need_proto ) + + # Step 5: set autoset opts from user-supplied data, cfgfile data, or default values, in that order: + self._set_autoset_opts( cfgfile_opts.autoset ) + + # Step 6: set auto typeset opts from user-supplied data or cfgfile data, in that order: + self._set_auto_typeset_opts( cfgfile_opts.auto_typeset ) + + if self.regtest or self.bob or self.alice or self.carol or gc.prog_name == 'mmgen-regtest': + self.network = 'regtest' + self.regtest_user = 'bob' if self.bob else 'alice' if self.alice else 'carol' if self.carol else None + else: + self.network = 'testnet' if self.testnet else 'mainnet' + + self.coin = self.coin.upper() + self.token = self.token.upper() or None + + # self.color is finalized, so initialize color: + if self.color: # MMGEN_DISABLE_COLOR sets this to False + from .color import init_color + init_color(num_colors=256 if self.force_256_color else 'auto') + + self._die_on_incompatible_opts() + + check_or_create_dir(self.data_dir) + + if self.debug and gc.prog_name != 'test.py': + self.verbose = True + self.quiet = False + + if self.debug_opts: + opt_postproc_debug(self) + + from .util import Util + self._util = Util(self) + + self._lock() + + if need_proto: + from .protocol import warn_trustlevel,init_proto_from_cfg + warn_trustlevel(self) + # requires the default-to-none behavior, so do after the lock: + self._proto = init_proto_from_cfg(self,need_amt=need_amt) + + if self._opts and not do_post_init: + self._opts.init_bottom(self) + + # Check user-set opts without modifying them + check_usr_opts(self,self._uopts) + + def _set_cfg_from_env(self): + for name,val in ((k,v) for k,v in os.environ.items() if k.startswith('MMGEN_')): + if name == 'MMGEN_DEBUG_ALL': + continue + elif name in self._env_opts: + if val: # ignore empty string values; string value of '0' or 'false' sets variable to False + disable = name.startswith('MMGEN_DISABLE_') + gname = name[(6,14)[disable]:].lower() + if gname in self._uopts: # don’t touch attr if already set on cmdline + continue + elif hasattr(self,gname): + setattr( + self, + gname, + set_for_type( name, val, getattr(self,gname), 'environment var', invert_bool=disable )) + yield gname + else: + raise ValueError(f'Name {gname!r} not present in globals') + else: + raise ValueError(f'{name!r} is not a valid MMGen environment variable') + + def _set_cfg_from_cfg_file( + self, + env_cfg, + need_proto ): + + _ret = namedtuple('cfgfile_opts',['autoset','auto_typeset']) + + if not self._use_cfg_file: + return _ret( {}, {} ) + + # check for changes in system template file (term must be initialized) + from .cfgfile import mmgen_cfg_file + mmgen_cfg_file(self,'sample') + + ucfg = mmgen_cfg_file(self,'usr') + + if need_proto: + from .protocol import init_proto + + autoset_opts = {} + auto_typeset_opts = {} + already_set = tuple(self._uopts) + env_cfg + + for d in ucfg.get_lines(): + if d.name in self._cfg_file_opts: + ns = d.name.split('_') + if ns[0] in gc.core_coins: + if not need_proto: + continue + nse,tn = ( + (ns[2:],ns[1]=='testnet') if len(ns) > 2 and ns[1] in ('mainnet','testnet') else + (ns[1:],False) + ) + cls = type(init_proto( self, ns[0], tn, need_amt=True )) # no instance yet, so override _class_ attr + attr = '_'.join(nse) + else: + cls = self + attr = d.name + refval = getattr(cls,attr) + val = ucfg.parse_value(d.value,refval) + if not val: + die( 'CfgFileParseError', f'Parse error in file {ucfg.fn!r}, line {d.lineno}' ) + val_conv = set_for_type( attr, val, refval, 'configuration file option', src=ucfg.fn ) + if not attr in already_set: + setattr(cls,attr,val_conv) + elif d.name in self._autoset_opts: + autoset_opts[d.name] = d.value + elif d.name in self._auto_typeset_opts: + auto_typeset_opts[d.name] = d.value + else: + die( 'CfgFileParseError', f'{d.name!r}: unrecognized option in {ucfg.fn!r}, line {d.lineno}' ) + + return _ret( autoset_opts, auto_typeset_opts ) + + def _set_autoset_opts(self,cfgfile_autoset_opts): + + def get_autoset_opt(key,val,src): + + def die_on_err(desc): + from .util import fmt_list + die( + 'UserOptError', + '{a!r}: invalid {b} (not {c}: {d})'.format( + a = val, + b = { + 'cmdline': 'parameter for option --{}'.format(key.replace('_','-')), + 'cfgfile': 'value for cfg file option {!r}'.format(key) + }[src], + c = desc, + d = fmt_list(data.choices) )) + + class opt_type: + + def nocase_str(): + if val.lower() in data.choices: + return val.lower() + else: + die_on_err('one of') + + def nocase_pfx(): + cs = [s for s in data.choices if s.startswith(val.lower())] + if len(cs) == 1: + return cs[0] + else: + die_on_err('unique substring of') + + data = self._autoset_opts[key] + + return getattr(opt_type,data.type)() + + # Check autoset opts, setting if unset + for key in self._autoset_opts: + + assert not hasattr(self,key), f'autoset opt {key!r} is already set, but it shouldn’t be!' + + if key in self._uopts: + val,src = (self._uopts[key],'cmdline') + elif key in cfgfile_autoset_opts: + val,src = (cfgfile_autoset_opts[key],'cfgfile') + else: + val = None + + if val is None: + setattr(self, key, self._autoset_opts[key].choices[0]) + else: + setattr(self, key, get_autoset_opt(key,val,src=src)) + + def _set_auto_typeset_opts(self,cfgfile_auto_typeset_opts): + + def do_set(key,val,ref_type): + assert not hasattr(self,key), f'{key!r} is in cfg!' + setattr(self,key,None if val is None else ref_type(val)) + + for key,ref_type in self._auto_typeset_opts.items(): + if key in self._uopts: + do_set(key, self._uopts[key], ref_type) + elif key in cfgfile_auto_typeset_opts: + do_set(key, cfgfile_auto_typeset_opts[key], ref_type) + + def _post_init(self): + return self._opts.init_bottom(self) + + def _die_on_incompatible_opts(self): + for group in self._incompatible_opts: + bad = [k for k in self.__dict__ if k in group and getattr(self,k) != None] + if len(bad) > 1: + die(1,'Conflicting options: {}'.format(', '.join(map(fmt_opt,bad)))) + +def check_usr_opts(cfg,usr_opts): # Raises an exception if any check fails + + def opt_splits(val,sep,n,desc): + sepword = 'comma' if sep == ',' else 'colon' if sep == ':' else repr(sep) + try: + l = val.split(sep) + except: + die( 'UserOptError', f'{val!r}: invalid {desc} (not {sepword}-separated list)' ) + + if len(l) != n: + die( 'UserOptError', f'{val!r}: invalid {desc} ({n} {sepword}-separated items required)' ) + + def opt_compares(val,op_str,target,desc,desc2=''): + import operator as o + op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str] + if not op_f(val,target): + if desc2: + desc2 += ' ' + die( 'UserOptError', f'{val}: invalid {desc} ({desc2}not {op_str} {target})' ) + + def opt_is_int(val,desc): + if not is_int(val): + die( 'UserOptError', f'{val!r}: invalid {desc} (not an integer)' ) + + def opt_is_float(val,desc): + try: + float(val) + except: + die( 'UserOptError', f'{val!r}: invalid {desc} (not a floating-point number)' ) + + def opt_is_in_list(val,tlist,desc): + if val not in tlist: + q,sep = (('',','),("'","','"))[type(tlist[0]) == str] + die( 'UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format( + v = val, + w = desc, + q = q, + o = sep.join(map(str,sorted(tlist))) )) + + def opt_unrecognized(key,val,desc='value'): + die( 'UserOptError', f'{val!r}: unrecognized {desc} for option {fmt_opt(key)!r}' ) + + def opt_display(key,val='',beg='For selected',end=':\n'): + from .util import msg_r + msg_r('{} option {!r}{}'.format( + beg, + f'{fmt_opt(key)}={val}' if val else fmt_opt(key), + end )) + + def chk_in_fmt(key,val,desc): + from .wallet import get_wallet_data + wd = get_wallet_data(fmt_code=val) + if not wd: + opt_unrecognized(key,val) + if key == 'out_fmt': + p = 'hidden_incog_output_params' + + if wd.type == 'incog_hidden' and not getattr(cfg,p): + die( 'UserOptError', + 'Hidden incog format output requested. ' + + f'You must supply a file and offset with the {fmt_opt(p)!r} option' ) + + if wd.base_type == 'incog_base' and cfg.old_incog_fmt: + opt_display(key,val,beg='Selected',end=' ') + opt_display('old_incog_fmt',beg='conflicts with',end=':\n') + die( 'UserOptError', 'Export to old incog wallet format unsupported' ) + elif wd.type == 'brain': + die( 'UserOptError', 'Output to brainwallet format unsupported' ) + + chk_out_fmt = chk_in_fmt + + def chk_hidden_incog_input_params(key,val,desc): + a = val.rsplit(',',1) # permit comma in filename + if len(a) != 2: + opt_display(key,val) + die( 'UserOptError', 'Option requires two comma-separated arguments' ) + + fn,offset = a + opt_is_int(offset,desc) + + from .fileutil import check_infile,check_outdir,check_outfile + if key == 'hidden_incog_input_params': + check_infile(fn,blkdev_ok=True) + key2 = 'in_fmt' + else: + try: os.stat(fn) + except: + b = os.path.dirname(fn) + if b: + check_outdir(b) + else: + check_outfile(fn,blkdev_ok=True) + key2 = 'out_fmt' + + if hasattr(cfg,key2): + val2 = getattr(cfg,key2) + from .wallet import get_wallet_data + wd = get_wallet_data('incog_hidden') + if val2 and val2 not in wd.fmt_codes: + die( 'UserOptError', f'Option conflict:\n {fmt_opt(key)}, with\n {fmt_opt(key2)}={val2}' ) + + chk_hidden_incog_output_params = chk_hidden_incog_input_params + + def chk_subseeds(key,val,desc): + from .subseed import SubSeedIdxRange + opt_is_int(val,desc) + opt_compares(int(val),'>=',SubSeedIdxRange.min_idx,desc) + opt_compares(int(val),'<=',SubSeedIdxRange.max_idx,desc) + + def chk_seed_len(key,val,desc): + from .seed import Seed + opt_is_int(val,desc) + opt_is_in_list(int(val),Seed.lens,desc) + + def chk_hash_preset(key,val,desc): + from .crypto import Crypto + opt_is_in_list(val,list(Crypto.hash_presets.keys()),desc) + + def chk_brain_params(key,val,desc): + a = val.split(',') + if len(a) != 2: + opt_display(key,val) + die( 'UserOptError', 'Option requires two comma-separated arguments' ) + opt_is_int(a[0],'seed length '+desc) + from .seed import Seed + opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc) + from .crypto import Crypto + opt_is_in_list(a[1],list(Crypto.hash_presets.keys()),'hash preset '+desc) + + def chk_usr_randchars(key,val,desc): + if val == 0: + return + opt_is_int(val,desc) + opt_compares(val,'>=',cfg.min_urandchars,desc) + opt_compares(val,'<=',cfg.max_urandchars,desc) + + def chk_tx_fee(key,val,desc): + pass +# opt_is_tx_fee(key,val,desc) # TODO: move this check elsewhere + + def chk_tx_confs(key,val,desc): + opt_is_int(val,desc) + opt_compares(val,'>=',1,desc) + + def chk_vsize_adj(key,val,desc): + opt_is_float(val,desc) + from .util import ymsg + ymsg(f'Adjusting transaction vsize by a factor of {float(val):1.2f}') + + def chk_daemon_id(key,val,desc): + from .daemon import CoinDaemon + opt_is_in_list(val,CoinDaemon.all_daemon_ids(),desc) + +# TODO: move this check elsewhere +# def chk_rbf(key,val,desc): +# if not proto.cap('rbf'): +# die( 'UserOptError', f'--rbf requested, but {proto.coin} does not support replace-by-fee transactions' ) + +# def chk_bob(key,val,desc): +# from .proto.btc.regtest import MMGenRegtest +# try: +# os.stat(os.path.join(MMGenRegtest(cfg,cfg.coin).d.datadir,'regtest','debug.log')) +# except: +# die( 'UserOptError', +# 'Regtest (Bob and Alice) mode not set up yet. ' + +# f"Run '{gc.proj_name.lower()}-regtest setup' to initialize." ) +# +# chk_alice = chk_bob + + def chk_locktime(key,val,desc): + opt_is_int(val,desc) + opt_compares(int(val),'>',0,desc) + + def chk_columns(key,val,desc): + opt_compares(int(val),'>',10,desc) + +# TODO: move this check elsewhere +# def chk_token(key,val,desc): +# if not 'token' in proto.caps: +# die( 'UserOptError', f'Coin {tx.coin!r} does not support the --token option' ) +# if len(val) == 40 and is_hex_str(val): +# return +# if len(val) > 20 or not all(s.isalnum() for s in val): +# die( 'UserOptError', f'{val!r}: invalid parameter for --token option' ) + + from .util import is_int,Msg + + cfuncs = { k:v for k,v in locals().items() if k.startswith('chk_') } + + for key in usr_opts: + val = getattr(cfg,key) + desc = f'parameter for {fmt_opt(key)!r} option' + + if key in cfg._infile_opts: + from .fileutil import check_infile + check_infile(val) # file exists and is readable - dies on error + elif key == 'outdir': + from .fileutil import check_outdir + check_outdir(val) # dies on error + elif 'chk_'+key in cfuncs: + cfuncs['chk_'+key](key,val,desc) + elif cfg.debug: + Msg(f'check_usr_opts(): No test for config opt {key!r}') + +def fmt_opt(o): + return '--' + o.replace('_','-') + +def opt_postproc_debug(cfg): + a = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) != None] + b = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) == None] + from .util import Msg + Msg('\n Configuration opts:') + for e in [d for d in dir(cfg) if d[:2] != '__']: + Msg(' {:<20}: {}'.format(e, getattr(cfg,e))) + Msg(" Configuration opts set to 'None':") + Msg(' {}\n'.format('\n '.join(b))) + Msg('\n=== end opts.py debug ===\n') + +def set_for_type( + name, + val, + refval, + desc, + invert_bool = False, + src = None ): + + if type(refval) == bool: + v = str(val).lower() + ret = ( + True if v in ('true','yes','1','on') else + False if v in ('false','no','none','0','off','') else + None + ) + if ret is not None: + return not ret if invert_bool else ret + else: + try: + return type(refval)(not val if invert_bool else val) + except: + pass + + die(1,'{a!r}: invalid value for {b} {c!r}{d} (must be of type {e!r})'.format( + a = val, + b = desc, + c = fmt_opt(name) if desc == 'command-line option' else name, + d = f' in {src!r}' if src else '', + e = type(refval).__name__ )) diff --git a/mmgen/data/version b/mmgen/data/version index 7d783022..c0283cb9 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -13.3.dev43 +13.3.dev44 diff --git a/mmgen/opts.py b/mmgen/opts.py index 361bdbbe..950337d2 100755 --- a/mmgen/opts.py +++ b/mmgen/opts.py @@ -21,84 +21,9 @@ opts: MMGen-specific command-line options processing after generic processing by """ import sys,os -from .cfg import gc,Config -from .base_obj import Lockable +from .cfg import gc -import mmgen.share.Opts - -class UserOpts: pass -opt = UserOpts() - -def usage(): - from .util import Die - Die(1,mmgen.share.Opts.make_usage_str(gc.prog_name,'user',usage_data)) - -def version(): - from .util import Die,fmt - Die(0,fmt(f""" - {gc.prog_name.upper()} version {gc.version} - Part of the {gc.proj_name} suite, an online/offline cryptocurrency wallet for the - command line. Copyright (C){gc.Cdates} {gc.author} {gc.email} - """,indent=' ').rstrip()) - -def delete_data(opts_data): - for k in ('text','notes','code'): - if k in opts_data: - del opts_data[k] - del mmgen.share.Opts.make_help - del mmgen.share.Opts.process_uopts - del mmgen.share.Opts.parse_opts - -def post_init(cfg): - global opts_data_save,opt_filter_save - if cfg.help or cfg.longhelp: - print_help(cfg,opts_data_save,opt_filter_save) - else: - delete_data(opts_data_save) - del opts_data_save,opt_filter_save - -def print_help(cfg,opts_data,opt_filter): - - if not 'code' in opts_data: - opts_data['code'] = {} - - from .protocol import init_proto_from_cfg - proto = init_proto_from_cfg(cfg,need_amt=True) - - if getattr(cfg,'longhelp',None): - opts_data['code']['long_options'] = common_opts_data['code'] - def remove_unneeded_long_opts(): - d = opts_data['text']['long_options'] - if proto.base_proto != 'Ethereum': - d = '\n'.join(''+i for i in d.split('\n') if not '--token' in i) - opts_data['text']['long_options'] = d - remove_unneeded_long_opts() - - from .ui import do_pager - do_pager(mmgen.share.Opts.make_help( cfg, proto, opts_data, opt_filter )) - - sys.exit(0) - -def fmt_opt(o): - return '--' + o.replace('_','-') - -def die_on_incompatible_opts(cfg): - for group in cfg._incompatible_opts: - bad = [k for k in cfg.__dict__ if k in group and getattr(cfg,k) != None] - if len(bad) > 1: - from .util import die - die(1,'Conflicting options: {}'.format(', '.join(map(fmt_opt,bad)))) - -def _show_hash_presets(): - fs = ' {:<7} {:<6} {:<3} {}' - from .util import msg - from .crypto import Crypto - msg('Available parameters for scrypt.hash():') - msg(fs.format('Preset','N','r','p')) - for i in sorted(Crypto.hash_presets.keys()): - msg(fs.format(i,*Crypto.hash_presets[i])) - msg('N = memory usage (power of two), p = iterations (rounds)') - sys.exit(0) +import mmgen.share.Opts as Opts def opt_preproc_debug(po): d = ( @@ -114,101 +39,7 @@ def opt_preproc_debug(po): for label,data,pretty in d: Msg(' {:<20}: {}'.format(label,'\n' + fmt_list(data,fmt='col',indent=' '*8) if pretty else data)) -def opt_postproc_debug(cfg): - a = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) != None] - b = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) == None] - from .util import Msg - Msg('\n Global vars:') - for e in [d for d in dir(cfg) if d[:2] != '__']: - Msg(' {:<20}: {}'.format(e, getattr(cfg,e))) - Msg(" Global vars set to 'None':") - Msg(' {}\n'.format('\n '.join(b))) - Msg('\n=== end opts.py debug ===\n') - -def set_for_type(val,refval,desc,invert_bool=False,src=None): - - if type(refval) == bool: - v = str(val).lower() - ret = ( - True if v in ('true','yes','1','on') else - False if v in ('false','no','none','0','off','') else - None - ) - if ret is not None: - return not ret if invert_bool else ret - else: - try: - return type(refval)(not val if invert_bool else val) - except: - pass - - from .util import die - die(1,'{!r}: invalid value for {!r}{} (must be of type {!r})'.format( - val, - desc, - ' in {!r}'.format(src) if src else '', - type(refval).__name__) ) - -def set_cfg_from_cfg_file( - cfg, - ucfg, - cfgfile_autoset_opts, - cfgfile_auto_typeset_opts, - env_cfg, - need_proto ): - - if need_proto: - from .protocol import init_proto - - for d in ucfg.get_lines(): - if d.name in cfg._cfg_file_opts: - ns = d.name.split('_') - if ns[0] in gc.core_coins: - if not need_proto: - continue - nse,tn = ( - (ns[2:],ns[1]=='testnet') if len(ns) > 2 and ns[1] in ('mainnet','testnet') else - (ns[1:],False) - ) - cls = type(init_proto( cfg, ns[0], tn, need_amt=True )) # no instance yet, so override _class_ attr - attr = '_'.join(nse) - else: - cls = cfg - attr = d.name - refval = getattr(cls,attr) - val = ucfg.parse_value(d.value,refval) - if not val: - from .util import die - die( 'CfgFileParseError', f'Parse error in file {ucfg.fn!r}, line {d.lineno}' ) - val_conv = set_for_type(val,refval,attr,src=ucfg.fn) - if attr not in env_cfg: - setattr(cls,attr,val_conv) - elif d.name in cfg._autoset_opts: - cfgfile_autoset_opts[d.name] = d.value - elif d.name in cfg._auto_typeset_opts: - cfgfile_auto_typeset_opts[d.name] = d.value - else: - from .util import die - die( 'CfgFileParseError', f'{d.name!r}: unrecognized option in {ucfg.fn!r}, line {d.lineno}' ) - -def set_cfg_from_env(cfg): - for name,val in ((k,v) for k,v in os.environ.items() if k.startswith('MMGEN_')): - if name == 'MMGEN_DEBUG_ALL': - continue - elif name in cfg._env_opts: - if val: # ignore empty string values; string value of '0' or 'false' sets variable to False - disable = name.startswith('MMGEN_DISABLE_') - gname = name[(6,14)[disable]:].lower() - if hasattr(cfg,gname): - setattr(cfg,gname,set_for_type(val,getattr(cfg,gname),name,disable)) - yield gname - else: - raise ValueError(f'Name {gname!r} not present in globals') - else: - raise ValueError(f'{name!r} is not a valid MMGen environment variable') - -common_opts_data = { - # Most but not all of these set the corresponding global var +long_opts_data = { 'text': """ --, --accept-defaults Accept defaults at all prompts --, --coin=c Choose coin unit. Default: BTC. Current choice: {cu_dfl} @@ -256,415 +87,112 @@ opts_data_dfl = { } } -def init( - cfg, - opts_data = None, - add_opts = None, - init_opts = None, - opt_filter = None, - parse_only = False, - parsed_opts = None, - need_proto = True, - need_amt = True, - do_post_init = False ): +class UserOpts: - if opts_data is None: - opts_data = opts_data_dfl - - opts_data['text']['long_options'] = common_opts_data['text'] - - # Make this available to usage() - global usage_data - usage_data = opts_data['text'].get('usage2') or opts_data['text']['usage'] - - # po: (user_opts,cmd_args,opts,filtered_opts) - po = parsed_opts or mmgen.share.Opts.parse_opts(opts_data,opt_filter=opt_filter,parse_only=parse_only) - - if init_opts: # allow programs to preload user opts - for uopt,val in init_opts.items(): - if uopt not in po.user_opts: - po.user_opts[uopt] = val - - if parse_only and not any(k in po.user_opts for k in ('version','help','longhelp')): - cfg._parsed_opts = po - return - - if os.getenv('MMGEN_DEBUG_OPTS'): - opt_preproc_debug(po) - - # Copy parsed opts to opt, setting values to None if not set by user - for o in set( - po.opts - + po.filtered_opts - + tuple(add_opts or []) - + tuple(init_opts or []) - + cfg._init_opts - + cfg._common_opts ): - setattr(opt,o,po.user_opts[o] if o in po.user_opts else None) - - if opt.version: - version() # exits - - if opt.show_hash_presets: - _show_hash_presets() # exits - - from .util import wrap_ripemd160 - wrap_ripemd160() # ripemd160 used by mmgen_cfg_file() - - # === begin Config() initialization === # - - env_cfg = tuple(set_cfg_from_env(cfg)) - - # do this after setting ‘hold_protect_disable’ from env - from .term import init_term - init_term(cfg) - - # --data-dir overrides computed value of data_dir_root - cfg._data_dir_root_override = opt.data_dir - if opt.data_dir: - del opt.data_dir - - from .fileutil import check_or_create_dir - check_or_create_dir(cfg.data_dir_root) - - cfgfile_autoset_opts = {} - cfgfile_auto_typeset_opts = {} - - if not opt.skip_cfg_file: - from .cfgfile import mmgen_cfg_file - # check for changes in system template file - term must be initialized - mmgen_cfg_file(cfg,'sample') - set_cfg_from_cfg_file( + def __init__( + self, cfg, - mmgen_cfg_file(cfg,'usr'), - cfgfile_autoset_opts, - cfgfile_auto_typeset_opts, - env_cfg, - need_proto ) + opts_data = None, + init_opts = None, # dict containing opts to pre-initialize + opt_filter = None, # whitelist of opt letters; all others are skipped + parse_only = False, + parsed_opts = None ): - # Set cfg from opts, setting type from original class attr in Config if it exists: - auto_keys = tuple(cfg._autoset_opts.keys()) + tuple(cfg._auto_typeset_opts.keys()) - for key,val in opt.__dict__.items(): - if val is not None and key not in auto_keys: - setattr(cfg, key, set_for_type(val,getattr(cfg,key),'--'+key) if hasattr(cfg,key) else val) + self.opts_data = od = opts_data or opts_data_dfl + self.opt_filter = opt_filter - if cfg.regtest or cfg.bob or cfg.alice or cfg.carol or gc.prog_name == 'mmgen-regtest': - cfg.network = 'regtest' - cfg.regtest_user = 'bob' if cfg.bob else 'alice' if cfg.alice else 'carol' if cfg.carol else None - else: - cfg.network = 'testnet' if cfg.testnet else 'mainnet' + od['text']['long_options'] = long_opts_data['text'] - cfg.coin = cfg.coin.upper() - cfg.token = cfg.token.upper() or None + # Make this available to usage() + self.usage_data = od['text'].get('usage2') or od['text']['usage'] - # === end global var initialization === # + # po: (user_opts,cmd_args,opts,filtered_opts) + po = parsed_opts or Opts.parse_opts(od,opt_filter=opt_filter,parse_only=parse_only) - """ - cfg.color is finalized, so initialize color - """ - if cfg.color: # MMGEN_DISABLE_COLOR sets this to False - from .color import init_color - init_color(num_colors=('auto',256)[bool(cfg.force_256_color)]) + cfg._args = po.cmd_args + cfg._uopts = uopts = po.user_opts - die_on_incompatible_opts(cfg) + if init_opts: # initialize user opts to given value + for uopt,val in init_opts.items(): + if uopt not in uopts: + uopts[uopt] = val - check_or_create_dir(cfg.data_dir) + cfg._opts = self + cfg._parsed_opts = po + cfg._use_env = True + cfg._use_cfg_file = not 'skip_cfg_file' in uopts - # Set globals from opts, setting type from original global value if it exists: - for key in cfg._autoset_opts: - if hasattr(opt,key): - assert not hasattr(cfg,key), f'{key!r} is in cfg!' - if getattr(opt,key) is not None: - setattr(cfg, key, get_autoset_opt(cfg,key,getattr(opt,key),src='cmdline')) - elif key in cfgfile_autoset_opts: - setattr(cfg, key, get_autoset_opt(cfg,key,cfgfile_autoset_opts[key],src='cfgfile')) - else: - setattr(cfg, key, cfg._autoset_opts[key].choices[0]) + if os.getenv('MMGEN_DEBUG_OPTS'): + opt_preproc_debug(po) - set_auto_typeset_opts(cfg,cfgfile_auto_typeset_opts) + if 'version' in uopts: + self.version() # exits - if cfg.debug and gc.prog_name != 'test.py': - cfg.verbose = True + if 'show_hash_presets' in uopts: + self.show_hash_presets() # exits - if cfg.verbose: - cfg.quiet = False - - if cfg.debug_opts: - opt_postproc_debug(cfg) - - # Check user-set opts without modifying them - check_usr_opts(cfg,po.user_opts) - - from .util import Util - cfg._util = Util(cfg) - cfg._uopts = po.user_opts - cfg._args = po.cmd_args - - cfg._lock() - - if need_proto: - from .protocol import warn_trustlevel,init_proto_from_cfg - warn_trustlevel(cfg) - # requires the default-to-none behavior, so do after the lock: - cfg._proto = init_proto_from_cfg(cfg,need_amt=need_amt) - - # print help screen only after globals initialized and locked: - if cfg.help or cfg.longhelp: - if not do_post_init: - print_help(cfg,opts_data,opt_filter) # exits - - if do_post_init: - global opts_data_save,opt_filter_save - opts_data_save = opts_data - opt_filter_save = opt_filter - else: - delete_data(opts_data) - -def check_usr_opts(cfg,usr_opts): # Raises an exception if any check fails - - def opt_splits(val,sep,n,desc): - sepword = 'comma' if sep == ',' else 'colon' if sep == ':' else repr(sep) - try: - l = val.split(sep) - except: - die( 'UserOptError', f'{val!r}: invalid {desc} (not {sepword}-separated list)' ) - - if len(l) != n: - die( 'UserOptError', f'{val!r}: invalid {desc} ({n} {sepword}-separated items required)' ) - - def opt_compares(val,op_str,target,desc,desc2=''): - import operator as o - op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str] - if not op_f(val,target): - d2 = desc2 + ' ' if desc2 else '' - die( 'UserOptError', f'{val}: invalid {desc} ({d2}not {op_str} {target})' ) - - def opt_is_int(val,desc): - if not is_int(val): - die( 'UserOptError', f'{val!r}: invalid {desc} (not an integer)' ) - - def opt_is_float(val,desc): - try: - float(val) - except: - die( 'UserOptError', f'{val!r}: invalid {desc} (not a floating-point number)' ) - - def opt_is_in_list(val,tlist,desc): - if val not in tlist: - q,sep = (('',','),("'","','"))[type(tlist[0]) == str] - die( 'UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format( - v = val, - w = desc, - q = q, - o = sep.join(map(str,sorted(tlist))) )) - - def opt_unrecognized(key,val,desc='value'): - die( 'UserOptError', f'{val!r}: unrecognized {desc} for option {fmt_opt(key)!r}' ) - - def opt_display(key,val='',beg='For selected',end=':\n'): - from .util import msg_r - msg_r('{} option {!r}{}'.format( - beg, - f'{fmt_opt(key)}={val}' if val else fmt_opt(key), - end )) - - def chk_in_fmt(key,val,desc): - from .wallet import get_wallet_data - wd = get_wallet_data(fmt_code=val) - if not wd: - opt_unrecognized(key,val) - if key == 'out_fmt': - p = 'hidden_incog_output_params' - - if wd.type == 'incog_hidden' and not getattr(opt,p): - die( 'UserOptError', - 'Hidden incog format output requested. ' + - f'You must supply a file and offset with the {fmt_opt(p)!r} option' ) - - if wd.base_type == 'incog_base' and opt.old_incog_fmt: - opt_display(key,val,beg='Selected',end=' ') - opt_display('old_incog_fmt',beg='conflicts with',end=':\n') - die( 'UserOptError', 'Export to old incog wallet format unsupported' ) - elif wd.type == 'brain': - die( 'UserOptError', 'Output to brainwallet format unsupported' ) - - chk_out_fmt = chk_in_fmt - - def chk_hidden_incog_input_params(key,val,desc): - a = val.rsplit(',',1) # permit comma in filename - if len(a) != 2: - opt_display(key,val) - die( 'UserOptError', 'Option requires two comma-separated arguments' ) - - fn,offset = a - opt_is_int(offset,desc) - - from .fileutil import check_infile,check_outdir,check_outfile - if key == 'hidden_incog_input_params': - check_infile(fn,blkdev_ok=True) - key2 = 'in_fmt' - else: - try: os.stat(fn) - except: - b = os.path.dirname(fn) - if b: - check_outdir(b) - else: - check_outfile(fn,blkdev_ok=True) - key2 = 'out_fmt' - - if hasattr(opt,key2): - val2 = getattr(opt,key2) - from .wallet import get_wallet_data - wd = get_wallet_data('incog_hidden') - if val2 and val2 not in wd.fmt_codes: - die( 'UserOptError', f'Option conflict:\n {fmt_opt(key)}, with\n {fmt_opt(key2)}={val2}' ) - - chk_hidden_incog_output_params = chk_hidden_incog_input_params - - def chk_subseeds(key,val,desc): - from .subseed import SubSeedIdxRange - opt_is_int(val,desc) - opt_compares(int(val),'>=',SubSeedIdxRange.min_idx,desc) - opt_compares(int(val),'<=',SubSeedIdxRange.max_idx,desc) - - def chk_seed_len(key,val,desc): - from .seed import Seed - opt_is_int(val,desc) - opt_is_in_list(int(val),Seed.lens,desc) - - def chk_hash_preset(key,val,desc): - from .crypto import Crypto - opt_is_in_list(val,list(Crypto.hash_presets.keys()),desc) - - def chk_brain_params(key,val,desc): - a = val.split(',') - if len(a) != 2: - opt_display(key,val) - die( 'UserOptError', 'Option requires two comma-separated arguments' ) - opt_is_int(a[0],'seed length '+desc) - from .seed import Seed - opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc) - from .crypto import Crypto - opt_is_in_list(a[1],list(Crypto.hash_presets.keys()),'hash preset '+desc) - - def chk_usr_randchars(key,val,desc): - if val == 0: + if parse_only: return - opt_is_int(val,desc) - opt_compares(val,'>=',cfg.min_urandchars,desc) - opt_compares(val,'<=',cfg.max_urandchars,desc) - def chk_tx_fee(key,val,desc): - pass -# opt_is_tx_fee(key,val,desc) # TODO: move this check elsewhere + if 'data_dir' in uopts: + cfg._data_dir_root_override = uopts['data_dir'] + del uopts['data_dir'] - def chk_tx_confs(key,val,desc): - opt_is_int(val,desc) - opt_compares(val,'>=',1,desc) + def init_bottom(self,cfg): - def chk_vsize_adj(key,val,desc): - opt_is_float(val,desc) - from .util import ymsg - ymsg(f'Adjusting transaction vsize by a factor of {float(val):1.2f}') + # print help screen only after globals initialized and locked: + if cfg.help or cfg.longhelp: + self.print_help(cfg) # exits - def chk_daemon_id(key,val,desc): - from .daemon import CoinDaemon - opt_is_in_list(val,CoinDaemon.all_daemon_ids(),desc) + # delete unneeded data: + for k in ('text','notes','code'): + if k in self.opts_data: + del self.opts_data[k] + del Opts.make_help + del Opts.process_uopts + del Opts.parse_opts -# TODO: move this check elsewhere -# def chk_rbf(key,val,desc): -# if not proto.cap('rbf'): -# die( 'UserOptError', f'--rbf requested, but {proto.coin} does not support replace-by-fee transactions' ) + def usage(self): + from .util import Die + Die(1,Opts.make_usage_str(gc.prog_name,'user',self.usage_data)) -# def chk_bob(key,val,desc): -# from .proto.btc.regtest import MMGenRegtest -# try: -# os.stat(os.path.join(MMGenRegtest(cfg,cfg.coin).d.datadir,'regtest','debug.log')) -# except: -# die( 'UserOptError', -# 'Regtest (Bob and Alice) mode not set up yet. ' + -# f"Run '{gc.proj_name.lower()}-regtest setup' to initialize." ) -# -# chk_alice = chk_bob + def version(self): + from .util import Die,fmt + Die(0,fmt(f""" + {gc.prog_name.upper()} version {gc.version} + Part of the {gc.proj_name} suite, an online/offline cryptocurrency wallet for the + command line. Copyright (C){gc.Cdates} {gc.author} {gc.email} + """,indent=' ').rstrip()) - def chk_locktime(key,val,desc): - opt_is_int(val,desc) - opt_compares(int(val),'>',0,desc) + def print_help(self,cfg): - def chk_columns(key,val,desc): - opt_compares(int(val),'>',10,desc) + if not 'code' in self.opts_data: + self.opts_data['code'] = {} -# TODO: move this check elsewhere -# def chk_token(key,val,desc): -# if not 'token' in proto.caps: -# die( 'UserOptError', f'Coin {tx.coin!r} does not support the --token option' ) -# if len(val) == 40 and is_hex_str(val): -# return -# if len(val) > 20 or not all(s.isalnum() for s in val): -# die( 'UserOptError', f'{val!r}: invalid parameter for --token option' ) + from .protocol import init_proto_from_cfg + proto = init_proto_from_cfg(cfg,need_amt=True) - from .util import is_int,die,Msg + if getattr(cfg,'longhelp',None): + self.opts_data['code']['long_options'] = long_opts_data['code'] + def remove_unneeded_long_opts(): + d = self.opts_data['text']['long_options'] + if proto.base_proto != 'Ethereum': + d = '\n'.join(''+i for i in d.split('\n') if not '--token' in i) + self.opts_data['text']['long_options'] = d + remove_unneeded_long_opts() - cfuncs = { k:v for k,v in locals().items() if k.startswith('chk_') } + from .ui import do_pager + do_pager(Opts.make_help( cfg, proto, self.opts_data, self.opt_filter )) - for key in usr_opts: - val = getattr(cfg,key) - desc = f'parameter for {fmt_opt(key)!r} option' + sys.exit(0) - if key in cfg._infile_opts: - from .fileutil import check_infile - check_infile(val) # file exists and is readable - dies on error - elif key == 'outdir': - from .fileutil import check_outdir - check_outdir(val) # dies on error - elif 'chk_'+key in cfuncs: - cfuncs['chk_'+key](key,val,desc) - elif cfg.debug: - Msg(f'check_usr_opts(): No test for opt {key!r}') - -def set_auto_typeset_opts(cfg,cfgfile_auto_typeset_opts): - - def do_set(key,val,ref_type): - assert not hasattr(cfg,key), f'{key!r} is in cfg!' - setattr(cfg,key,None if val is None else ref_type(val)) - - for key,ref_type in cfg._auto_typeset_opts.items(): - if hasattr(opt,key): - do_set(key, getattr(opt,key), ref_type) - elif key in cfgfile_auto_typeset_opts: - do_set(key, cfgfile_auto_typeset_opts[key], ref_type) - -def get_autoset_opt(cfg,key,val,src): - - def die_on_err(desc): - from .util import fmt_list,die - die( - 'UserOptError', - '{a!r}: invalid {b} (not {c}: {d})'.format( - a = val, - b = { - 'cmdline': 'parameter for option --{}'.format(key.replace('_','-')), - 'cfgfile': 'value for cfg file option {!r}'.format(key) - }[src], - c = desc, - d = fmt_list(data.choices) )) - - class opt_type: - - def nocase_str(): - if val.lower() in data.choices: - return val.lower() - else: - die_on_err('one of') - - def nocase_pfx(): - cs = [s for s in data.choices if s.startswith(val.lower())] - if len(cs) == 1: - return cs[0] - else: - die_on_err('unique substring of') - - data = cfg._autoset_opts[key] - - return getattr(opt_type,data.type)() + def show_hash_presets(self): + fs = ' {:<6} {:<3} {:<2} {}' + from .util import msg + from .crypto import Crypto + msg(' Available parameters for scrypt.hash():') + msg(fs.format('Preset','N','r','p')) + for i in sorted(Crypto.hash_presets.keys()): + msg(fs.format(i,*Crypto.hash_presets[i])) + msg(' N = memory usage (power of two)\n p = iterations (rounds)') + sys.exit(0) diff --git a/test/test_py_d/ts_misc.py b/test/test_py_d/ts_misc.py index 8023e1af..964dbc37 100755 --- a/test/test_py_d/ts_misc.py +++ b/test/test_py_d/ts_misc.py @@ -34,6 +34,7 @@ class TestSuiteMisc(TestSuiteBase): cmd_group = ( ('rpc_backends', 'RPC backends'), ('xmrwallet_txview', "'mmgen-xmrwallet' txview"), + ('coin_daemon_info', "'examples/coin-daemon-info.py'"), ('term_echo', "term.set('echo')"), ('term_cleanup', 'term.register_cleanup()'), ) @@ -56,6 +57,16 @@ class TestSuiteMisc(TestSuiteBase): assert s in res, s return t + def coin_daemon_info(self): + start_test_daemons('ltc','eth') + t = self.spawn(f'examples/coin-daemon-info.py',['btc','ltc','eth'],cmd_dir='.') + for s in ('BTC','LTC','ETH'): + t.expect(s + r'\s+mainnet\s+Up',regex=True) + if cfg.pexpect_spawn: + t.send('q') + stop_test_daemons('ltc','eth') + return t + def term_echo(self): def test_echo():