new CfgFile API for mmgen.cfg and related files

Testing:

  $ test/test.py cfg
This commit is contained in:
The MMGen Project 2020-03-12 17:01:47 +00:00
commit 924ccc6012
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
9 changed files with 598 additions and 74 deletions

283
mmgen/cfg.py Executable file
View file

@ -0,0 +1,283 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2020 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
cfg.py: API for the MMGen runtime configuration file and related files
"""
# NB: This module is used by override_from_cfg_file(), which is called before override_from_env()
# during init, so global config vars that are set from the environment (such as g.test_suite)
# cannot be used here.
import sys,os,re,hashlib
from collections import namedtuple
from mmgen.globalvars import *
from mmgen.util import *
def cfg_file(id_str):
return CfgFile.get_cls_by_id(id_str)()
class CfgFile(object):
cur_ver = 2
ver = None
write_ok = False
warn_missing = True
write_metadata = False
fn_base = g.proj_name.lower() + '.cfg'
file_not_found_fs = 'WARNING: {} not found at {!r}'
def __init__(self):
self.fn = os.path.join(self.fn_dir,self.fn_base)
self.data = self.get_data()
def get_data(self):
try:
return open(self.fn).read().splitlines()
except:
if self.warn_missing:
msg(self.file_not_found_fs.format(self.desc,self.fn))
return ''
def copy_data(self):
assert self.write_ok, 'writing to file {!r} not allowed!'.format(self.fn)
src = cfg_file('sys')
if src.data:
data = src.data + src.make_metadata() if self.write_metadata else src.data
try:
open(self.fn,'w').write('\n'.join(data)+'\n')
os.chmod(self.fn,0o600)
except:
die(2,'ERROR: unable to write to {!r}'.format(self.fn))
def parse_var(self,line,lineno):
try:
m = re.match(r'(\w+)(\s+(\S+)|(\s+\w+:\S+)+)$',line) # allow multiple colon-separated values
return (m[1], dict([i.split(':') for i in m[2].split()]) if m[4] else m[3])
except:
raise CfgFileParseError('Parse error in file {!r}, line {}'.format(self.fn,lineno))
def parse(self):
cdata = namedtuple('cfg_var',['name','value','lineno'])
def do_parse():
for n,line in enumerate(self.data,1):
line = strip_comments(line)
if line == '':
continue
yield cdata(*self.parse_var(line,n),n)
return do_parse()
@classmethod
def get_cls_by_id(self,id_str):
d = {
'usr': CfgFileUsr,
'sys': CfgFileSampleSys,
'sample': CfgFileSampleUsr,
'dist': CfgFileSampleDist,
}
return d[id_str]
class CfgFileSample(CfgFile):
@classmethod
def cls_make_metadata(cls,data):
return ['# Version {} {}'.format(cls.cur_ver,cls.compute_chksum(data))]
@staticmethod
def compute_chksum(data):
return hashlib.new('ripemd160','\n'.join(data).encode()).hexdigest()
@property
def computed_chksum(self):
return type(self).compute_chksum(self.data)
def parse(self,parse_vars=False):
"""
The config file template contains some 'magic':
- lines must either be empty or begin with '# '
- each commented chunk must end with a parsable cfg variable line
- chunks are delimited by one or more blank lines
- lines beginning with '##' are ignored
- everything up to first line beginning with '##' is ignored
- last line is metadata line of the form '# Version VER_NUM HASH'
"""
cdata = namedtuple('chunk_data',['name','lines','lineno','parsed'])
def process_chunk(chunk,n):
last_line = chunk[-1].split()
return cdata(
last_line[1],
chunk,
n,
self.parse_var(' '.join(last_line[1:]),n) if parse_vars else None,
)
def get_chunks(lines):
hdr = True
chunk = []
in_chunk = False
for n,line in enumerate(lines,1):
if line.startswith('##'):
hdr = False
continue
if hdr:
continue
if line == '':
in_chunk = False
elif line.startswith('# '):
if in_chunk == False:
if chunk:
yield process_chunk(chunk,last_nonblank)
chunk = [line]
in_chunk = True
else:
chunk.append(line)
last_nonblank = n
else:
die(2,'parse error in file {!r}, line {}'.format(self.fn,n))
if chunk:
yield process_chunk(chunk,last_nonblank)
return list(get_chunks(self.data))
class CfgFileUsr(CfgFile):
desc = 'user configuration file'
warn_missing = False
fn_dir = g.data_dir_root
write_ok = True
def __init__(self):
super().__init__()
if not self.data:
self.copy_data()
class CfgFileSampleDist(CfgFileSample):
desc = 'source distribution configuration file'
fn_dir = 'data_files'
class CfgFileSampleSys(CfgFileSample):
desc = 'system sample configuration file'
test_fn_subdir = 'usr.local.share'
@property
def fn_dir(self):
if os.getenv('MMGEN_TEST_SUITE_CFGTEST'):
return os.path.join(g.data_dir_root,self.test_fn_subdir)
else:
return g.shared_data_path
def make_metadata(self):
return ['# Version {} {}'.format(self.cur_ver,self.computed_chksum)]
class CfgFileSampleUsr(CfgFileSample):
desc = 'sample configuration file'
warn_missing = False
fn_base = g.proj_name.lower() + '.cfg.sample'
fn_dir = g.data_dir_root
write_ok = True
chksum = None
write_metadata = True
details_confirm_prompt = 'View details?'
out_of_date_fs = 'File {!r} is out of date - replacing'
altered_by_user_fs = 'File {!r} was altered by user - replacing'
def __init__(self):
super().__init__()
src = cfg_file('sys')
if not src.data:
return
if self.data:
if self.parse_metadata():
if self.chksum == self.computed_chksum:
diff = self.diff(self.parse(),src.parse())
if not diff:
return
self.show_changes(diff)
else:
msg(self.altered_by_user_fs.format(self.fn))
else:
msg(self.out_of_date_fs.format(self.fn))
self.copy_data()
def parse_metadata(self):
if self.data:
m = re.match(r'# Version (\d+) ([a-f0-9]{40})$',self.data[-1])
if m:
self.ver = m[1]
self.chksum = m[2]
self.data = self.data[:-1] # remove metadata line
return True
def diff(self,a_tup,b_tup): # a=user, b=system
a = [i.name for i in a_tup]#[3:] # Debug
b = [i.name for i in b_tup]#[:-2] # Debug
removed = set(a) - set(b)
added = set(b) - set(a)
if removed or added:
return {
'removed': [i for i in a_tup if i.name in removed],
'added': [i for i in b_tup if i.name in added],
}
else:
return None
def show_changes(self,diff):
ymsg('Warning: configuration file options have changed!\n')
m1 = ' The following option{} been {}:\n {}\n'
m2 = """
The following removed option{} set in {!r}
and must be deleted or commented out:
{}
"""
for desc in ('added','removed'):
data = diff[desc]
if data:
opts = fmt_list([i.name for i in data],fmt='bare')
msg(m1.format(suf(data,verb='has'),desc,opts))
if desc == 'removed' and data:
uc = cfg_file('usr')
usr_names = [i.name for i in uc.parse()]
rm_names = [i.name for i in data]
bad = sorted(set(usr_names).intersection(rm_names))
if bad:
ymsg(fmt(m2,' ').format(suf(bad,verb='is'),uc.fn,' '+fmt_list(bad,fmt='bare')))
while True:
if not keypress_confirm(self.details_confirm_prompt,no_nl=True):
return
def get_details():
for desc,data in diff.items():
sep,sep2 = ('\n ','\n\n ')
if data:
yield (
'{} section{}:'.format(capfirst(desc),suf(data))
+ sep2
+ sep2.join(['{}'.format(sep.join(v.lines)) for v in data])
)
do_pager(
'CONFIGURATION FILE CHANGES\n\n'
+ '\n\n'.join(get_details()) + '\n'
)

View file

@ -32,6 +32,7 @@ class RangeError(Exception): mmcode = 1
class FileNotFound(Exception): mmcode = 1
class InvalidPasswdFormat(Exception): mmcode = 1
class CfgFileParseError(Exception): mmcode = 1
class UserOptError(Exception): mmcode = 1
# 2: yellow hl, message only
class InvalidTokenAddress(Exception): mmcode = 2

View file

@ -131,6 +131,14 @@ class g(object):
else:
die(2,'$HOME is not set! Unable to determine home directory')
# https://wiki.debian.org/Python:
# Debian (Ubuntu) sys.prefix is '/usr' rather than '/usr/local, so add 'local'
# This must match the configuration in setup.py
shared_data_path = os.path.join(
sys.prefix,
*(['local','share'] if platform == 'linux' else ['share']),
proj_name.lower()
)
data_dir_root,data_dir,cfg_file = None,None,None
daemon_data_dir = '' # set by user or protocol

View file

@ -76,15 +76,6 @@ def opt_postproc_debug():
Msg('\n=== end opts.py debug ===\n')
def opt_postproc_initializations():
from mmgen.term import set_terminal_vars
set_terminal_vars()
if g.color: # MMGEN_DISABLE_COLOR sets this to False
from mmgen.color import start_mscolor,init_color
if g.platform == 'win':
start_mscolor()
init_color(num_colors=('auto',256)[bool(g.force_256_color)])
g.coin = g.coin.upper() # allow user to use lowercase
g.dcoin = g.coin # the display coin; for ERC20 tokens, g.dcoin is set to the token symbol
@ -95,85 +86,39 @@ def set_data_dir_root():
# mainnet and testnet share cfg file, as with Core
g.cfg_file = os.path.join(g.data_dir_root,'{}.cfg'.format(g.proj_name.lower()))
def get_cfg_template_data():
# https://wiki.debian.org/Python:
# Debian (Ubuntu) sys.prefix is '/usr' rather than '/usr/local, so add 'local'
# TODO - test for Windows
# This must match the configuration in setup.py
cfg_template = os.path.join(*([sys.prefix]
+ (['share'],['local','share'])[g.platform=='linux']
+ [g.proj_name.lower(),os.path.basename(g.cfg_file)]))
try:
return open(cfg_template).read()
except:
msg("WARNING: configuration template not found at '{}'".format(cfg_template))
return ''
def init_term_and_color():
from mmgen.term import set_terminal_vars
set_terminal_vars()
def get_data_from_cfg_file():
check_or_create_dir(g.data_dir_root) # dies on error
template_data = get_cfg_template_data()
data = {}
if g.color: # MMGEN_DISABLE_COLOR sets this to False
from mmgen.color import start_mscolor,init_color
if g.platform == 'win':
start_mscolor()
init_color(num_colors=('auto',256)[bool(g.force_256_color)])
def copy_template_data(fn):
try:
open(fn,'wb').write(template_data.encode())
os.chmod(fn,0o600)
except:
die(2,"ERROR: unable to write to datadir '{}'".format(g.data_dir))
for k,suf in (('cfg',''),('sample','.sample')):
try:
data[k] = open(g.cfg_file+suf,'rb').read().decode()
except:
if template_data:
copy_template_data(g.cfg_file+suf)
data[k] = template_data
else:
data[k] = ''
if template_data and data['sample'] != template_data:
g.cfg_options_changed = True
copy_template_data(g.cfg_file+'.sample')
return data['cfg']
def override_globals_from_cfg_file(cfg_data):
import re
def override_globals_from_cfg_file(ucfg):
from mmgen.protocol import CoinProtocol
from mmgen.util import strip_comments
for n,l in enumerate(cfg_data.splitlines(),1):
l = strip_comments(l)
if l == '':
continue
try:
m = re.match(r'(\w+)(\s+(\S+)|(\s+\w+:\S+)+)$',l) # allow multiple colon-separated values
name = m[1]
val = dict([i.split(':') for i in m[2].split()]) if m[4] else m[3]
except:
raise CfgFileParseError('Parse error in file {!r}, line {}'.format(g.cfg_file,n))
if name in g.cfg_file_opts:
ns = name.split('_')
for d in ucfg.parse():
val = d.value
if d.name in g.cfg_file_opts:
ns = d.name.split('_')
if ns[0] in CoinProtocol.coins:
nse,tn = (ns[2:],True) if len(ns) > 2 and ns[1] == 'testnet' else (ns[1:],False)
cls = CoinProtocol(ns[0],tn)
attr = '_'.join(nse)
else:
cls = g
attr = name
attr = d.name
refval = getattr(cls,attr)
if type(refval) is dict and type(val) is str: # catch single colon-separated value
if type(refval) is dict and type(val) is str: # hack - catch single colon-separated value
try:
val = dict([val.split(':')])
except:
raise CfgFileParseError('Parse error in file {!r}, line {}'.format(g.cfg_file,n))
val_conv = set_for_type(val,refval,attr,src=g.cfg_file)
raise CfgFileParseError('Parse error in file {!r}, line {}'.format(ucfg.fn,d.lineno))
val_conv = set_for_type(val,refval,attr,src=ucfg.fn)
setattr(cls,attr,val_conv)
else:
die(2,'{!r}: unrecognized option in {!r}'.format(name,g.cfg_file))
die(2,'{!r}: unrecognized option in {!r}, line {}'.format(d.name,ucfg.fn,d.lineno))
def override_globals_from_env():
for name in g.env_opts:
@ -263,8 +208,15 @@ def init(opts_data,add_opts=[],opt_filter=None,parse_only=False):
# cfg file is in g.data_dir_root, wallet and other data are in g.data_dir
# We must set g.data_dir_root and g.cfg_file from cmdline before processing cfg file
set_data_dir_root()
check_or_create_dir(g.data_dir_root)
init_term_and_color()
if not opt.skip_cfg_file:
override_globals_from_cfg_file(get_data_from_cfg_file())
from mmgen.cfg import cfg_file
cfg_file('sample') # check for changes in system template file
override_globals_from_cfg_file(cfg_file('usr'))
override_globals_from_env()
# Set globals from opts, setting type from original global value

View file

@ -100,6 +100,7 @@ setup(
'mmgen.baseconv',
'mmgen.bech32',
'mmgen.bip39',
'mmgen.cfg',
'mmgen.color',
'mmgen.common',
'mmgen.crypto',

20
test/misc/cfg.py Executable file
View file

@ -0,0 +1,20 @@
#!/usr/bin/env python3
from mmgen.util import msg
from mmgen.common import *
cmd_args = opts.init({'text': { 'desc': '', 'usage':'', 'options':'' }})
from mmgen.cfg import cfg_file
cu = cfg_file('usr')
cS = cfg_file('sys')
cs = cfg_file('sample')
msg('usr cfg: {}'.format(cu.fn))
msg('sys cfg: {}'.format(cS.fn))
msg('sample cfg: {}'.format(cs.fn))
if cmd_args == ['parse_test']:
ps = cs.parse(parse_vars=True)
msg('parsed chunks: {}'.format(len(ps)))
pu = cu.parse()
msg('usr cfg: {}'.format(' '.join(['{}={}'.format(i.name,i.value) for i in pu])))

107
test/ref/mmgen.cfg Normal file
View file

@ -0,0 +1,107 @@
# Configuration file for the MMGen suite
# Everything following a '#' is ignored.
##################
## User options ##
##################
# Uncomment to suppress the GPL license prompt:
# no_license true
# Uncomment to enable quieter output:
# quiet true
# Uncomment to disable color output:
# color false
# Uncomment to force 256-color output when 'color' is true:
# force_256_color true
# Uncomment to use regtest mode (this also sets testnet to true):
# regtest true
# Uncomment to use testnet instead of mainnet:
# testnet true
# Set the RPC host (the host the coin daemon is running on):
# rpc_host localhost
# Set the RPC host's port number:
# rpc_port 8332
# Uncomment to override 'rpcuser' from coin daemon config file:
# rpc_user myusername
# Uncomment to override 'rpcpassword' from coin daemon config file:
# rpc_password mypassword
# Uncomment to set the coin daemon datadir:
# daemon_data_dir /path/to/datadir
# Set the default hash preset:
# hash_preset 3
# Set the default number of subseeds:
# subseeds 100
# Set the default number of entropy characters to get from user.
# Must be between 10 and 80.
# A value of 0 disables user entropy, but this is not recommended:
# usr_randchars 30
# Set the maximum transaction fee for BTC:
# btc_max_tx_fee 0.003
# Set the transaction fee adjustment factor. Auto-calculated fees are
# multiplied by this value:
# tx_fee_adj 1.0
# Set the maximum transaction file size:
# max_tx_file_size 100000
# Set the maximum input size - applies both to files and standard input:
# max_input_size 1048576
# Set the mnemonic entry mode for each supported wordlist. Setting this option
# also turns off all information output for the configured wordlists:
# mnemonic_entry_modes mmgen:minimal bip39:fixed xmrseed:short
#####################
## Altcoin options ##
#####################
# Set the maximum transaction fee for BCH:
# bch_max_tx_fee 0.1
# Set the maximum transaction fee for LTC:
# ltc_max_tx_fee 0.3
# Set the maximum transaction fee for ETH:
# eth_max_tx_fee 0.005
# Set the Ethereum mainnet name:
# eth_mainnet_chain_name foundation
# Set the Ethereum testnet name:
# eth_testnet_chain_name kovan
# Set the Monero wallet RPC host:
# monero_wallet_rpc_host localhost
# Set the Monero wallet RPC username:
# monero_wallet_rpc_user monero
# Set the Monero wallet RPC password to something secure:
# monero_wallet_rpc_password passw0rd
#######################################################################
## The following options are probably of interest only to developers ##
#######################################################################
# Uncomment to display lots of debugging information:
# debug true
# Set the timeout for RPC connections:
# http_timeout 60

View file

@ -338,6 +338,7 @@ cfgs = { # addr_idx_lists (except 31,32,33,34) must contain exactly 8 addresses
'32': {},
'33': {},
'34': {},
'40': {},
}
def fixup_cfgs():
@ -462,6 +463,7 @@ def set_restore_term_at_exit():
class CmdGroupMgr(object):
cmd_groups_dfl = {
'cfg': ('TestSuiteCfg',{'full_data':True}),
'helpscreens': ('TestSuiteHelp',{'modname':'misc','full_data':True}),
'main': ('TestSuiteMain',{'full_data':True}),
'conv': ('TestSuiteWalletConv',{'is3seed':True,'modname':'wallet'}),

150
test/test_py_d/ts_cfg.py Executable file
View file

@ -0,0 +1,150 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2020 The MMGen Project <mmgen@tuta.io>
#
# Project source code repository: https://github.com/mmgen/mmgen
# Licensed according to the terms of GPL Version 3. See LICENSE for details.
"""
ts_misc.py: CfgFile tests for the MMGen test.py test suite
"""
import shutil
from test.common import *
from test.test_py_d.ts_base import *
from mmgen.cfg import *
class TestSuiteCfg(TestSuiteBase):
'CfgFile API'
networks = ('btc',)
tmpdir_nums = [40]
base_passthru_opts = ()
cmd_group = (
('nosysfile', (40,'init with missing system cfg sample file', [])),
('sysfile', (40,'init with system cfg sample file in place', [])),
('no_metadata_sample', (40,'init with unversioned cfg sample file', [])),
('altered_sample', (40,'init with user-modified cfg sample file', [])),
('old_sample', (40,'init with old v2 cfg sample file', [])),
('old_sample_bad_var', (40,'init with old v2 cfg sample file and bad variable in mmgen.cfg', [])),
)
def __init__(self,trunner,cfgs,spawn):
os.environ['MMGEN_TEST_SUITE_CFGTEST'] = '1'
TestSuiteBase.__init__(self,trunner,cfgs,spawn)
def spawn_test(self,args=[]):
return self.spawn('test/misc/cfg.py',['--data-dir={}'.format(self.path('data_dir'))]+args,cmd_dir='.')
def path(self,id_str):
return {
'ref': 'test/ref/mmgen.cfg',
'data_dir': '{}/data_dir'.format(self.tmpdir),
'shared_data': '{}/data_dir/{}'.format(self.tmpdir,CfgFileSampleSys.test_fn_subdir),
'usr': '{}/data_dir/mmgen.cfg'.format(self.tmpdir),
'sys': '{}/data_dir/{}/mmgen.cfg'.format(self.tmpdir,CfgFileSampleSys.test_fn_subdir),
'sample': '{}/data_dir/mmgen.cfg.sample'.format(self.tmpdir),
}[id_str]
def nosysfile(self):
t = self.spawn_test()
errstr = CfgFile.file_not_found_fs.format(CfgFileSampleSys.desc,self.path('shared_data')+'/mmgen.cfg')
for i in (1,2,3,4,5):
t.expect(errstr)
for k in ('usr','sys','sample'):
t.expect('{} cfg: {}'.format(k,self.path(k)))
assert not os.path.exists(self.path(k)), self.path(k)
t.read()
return t
def copy_sys_sample(self):
os.makedirs(self.path('shared_data'),exist_ok=True)
shutil.copy2(self.path('ref'),self.path('sys'))
def sysfile(self):
self.copy_sys_sample()
t = self.spawn_test()
t.read()
u = read_from_file(self.path('usr'))
S = read_from_file(self.path('sys'))
assert u[-1] == '\n', u
assert u == S, 'u != S'
self.check_replaced_sample()
return t
def check_replaced_sample(self):
S = read_from_file(self.path('sys'))
s = read_from_file(self.path('sample'))
assert s[-1] == '\n', s
assert S.splitlines() == s.splitlines()[:-1], 'sys != sample[:-1]'
def bad_sample(self,s,e):
write_to_file(self.path('sample'),s)
t = self.spawn_test()
t.expect(e)
t.read()
self.check_replaced_sample()
return t
def no_metadata_sample(self):
self.copy_sys_sample()
s = read_from_file(self.path('sys'))
e = CfgFileSampleUsr.out_of_date_fs.format(self.path('sample'))
return self.bad_sample(s,e)
def altered_sample(self):
s = '\n'.join(read_from_file(self.path('sample')).splitlines()[1:]) + '\n'
e = CfgFileSampleUsr.altered_by_user_fs.format(self.path('sample'))
return self.bad_sample(s,e)
def old_sample_common(self,old_set=False,args=[]):
s = read_from_file(self.path('sys'))
d = s.replace('monero_','zcash_').splitlines()
a1 = ['','# Uncomment to make foo true:','# foo true']
a2 = ['','# Uncomment to make bar false:','# bar false']
d = d + a1 + a2
chk = CfgFileSample.cls_make_metadata(d)
write_to_file(self.path('sample'),'\n'.join(d+chk) + '\n')
t = self.spawn_test(args=args)
t.expect('options have changed')
for s in ('have been added','monero_','have been removed','zcash_','foo','bar'):
t.expect(s)
if old_set:
for s in ('must be deleted','bar','foo'):
t.expect(s)
cp = CfgFileSampleUsr.details_confirm_prompt + ' (y/N): '
t.expect(cp,'y')
for s in ('CHANGES','Removed','# zcash_','# foo','# bar','Added','# monero_'):
t.expect(s)
t.expect(cp,'n')
if old_set:
t.expect('unrecognized option')
t.req_exit_val = 2
if args == ['parse_test']:
t.expect('parsed chunks: 29')
t.expect('usr cfg: testnet=true rpc_password=passwOrd')
t.read()
if not old_set:
self.check_replaced_sample()
return t
def old_sample(self):
d = ['testnet true','rpc_password passwOrd']
write_to_file(self.path('usr'),'\n'.join(d) + '\n')
return self.old_sample_common(args=['parse_test'])
def old_sample_bad_var(self):
d = ['foo true','bar false']
write_to_file(self.path('usr'),'\n'.join(d) + '\n')
return self.old_sample_common(old_set=True)