|
@@ -21,9 +21,9 @@ mmgen-tool: Perform various MMGen- and cryptocoin-related operations.
|
|
|
Part of the MMGen suite
|
|
|
"""
|
|
|
|
|
|
-import sys,os,importlib
|
|
|
-from .cfg import gc,Config
|
|
|
-from .util import msg,Msg,die,capfirst,suf,async_run
|
|
|
+import sys, os, importlib
|
|
|
+from .cfg import gc, Config
|
|
|
+from .util import msg, Msg, die, capfirst, suf, async_run
|
|
|
|
|
|
opts_data = {
|
|
|
'text': {
|
|
@@ -59,15 +59,15 @@ Type ‘{pn} help <command>’ for help on a particular command
|
|
|
"""
|
|
|
},
|
|
|
'code': {
|
|
|
- 'options': lambda cfg,s, help_notes: s.format(
|
|
|
- kgs=help_notes('keygen_backends'),
|
|
|
- coin_id=help_notes('coin_id'),
|
|
|
- cfg=cfg,
|
|
|
- gc=gc,
|
|
|
+ 'options': lambda cfg, s, help_notes: s.format(
|
|
|
+ kgs = help_notes('keygen_backends'),
|
|
|
+ coin_id = help_notes('coin_id'),
|
|
|
+ cfg = cfg,
|
|
|
+ gc = gc,
|
|
|
),
|
|
|
- 'notes': lambda cfg,s, help_notes: s.format(
|
|
|
- ch=help_notes('tool_help'),
|
|
|
- pn=gc.prog_name)
|
|
|
+ 'notes': lambda cfg, s, help_notes: s.format(
|
|
|
+ ch = help_notes('tool_help'),
|
|
|
+ pn = gc.prog_name)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -177,16 +177,16 @@ mods = {
|
|
|
}
|
|
|
|
|
|
def get_cmds():
|
|
|
- return [cmd for mod,cmds in mods.items() if mod != 'help' for cmd in cmds]
|
|
|
+ return [cmd for mod, cmds in mods.items() if mod != 'help' for cmd in cmds]
|
|
|
|
|
|
-def create_call_sig(cmd,cls,as_string=False):
|
|
|
+def create_call_sig(cmd, cls, as_string=False):
|
|
|
|
|
|
- m = getattr(cls,cmd)
|
|
|
+ m = getattr(cls, cmd)
|
|
|
|
|
|
if 'varargs_call_sig' in m.__code__.co_varnames: # hack
|
|
|
flag = 'VAR_ARGS'
|
|
|
va = m.__defaults__[0]
|
|
|
- args,dfls,ann = va['args'],va['dfls'],va['annots']
|
|
|
+ args, dfls, ann = va['args'], va['dfls'], va['annots']
|
|
|
else:
|
|
|
flag = None
|
|
|
args = m.__code__.co_varnames[1:m.__code__.co_argcount]
|
|
@@ -195,25 +195,25 @@ def create_call_sig(cmd,cls,as_string=False):
|
|
|
|
|
|
nargs = len(args) - len(dfls)
|
|
|
dfl_types = tuple(
|
|
|
- ann[a] if a in ann and isinstance(ann[a],type) else type(dfls[i])
|
|
|
- for i,a in enumerate(args[nargs:]) )
|
|
|
+ ann[a] if a in ann and isinstance(ann[a], type) else type(dfls[i])
|
|
|
+ for i, a in enumerate(args[nargs:]))
|
|
|
|
|
|
if as_string:
|
|
|
get_type_from_ann = lambda x: 'str or STDIN' if ann[x] == 'sstr' else ann[x].__name__
|
|
|
return ' '.join(
|
|
|
[f'{a} [{get_type_from_ann(a)}]' for a in args[:nargs]] +
|
|
|
- [f'{a} [{dfl_types[n].__name__}={dfls[n]!r}]' for n,a in enumerate(args[nargs:])] )
|
|
|
+ [f'{a} [{dfl_types[n].__name__}={dfls[n]!r}]' for n, a in enumerate(args[nargs:])])
|
|
|
else:
|
|
|
get_type_from_ann = lambda x: 'str' if ann[x] == 'sstr' else ann[x].__name__
|
|
|
return (
|
|
|
- [(a,get_type_from_ann(a)) for a in args[:nargs]], # c_args
|
|
|
- {a:dfls[n] for n,a in enumerate(args[nargs:])}, # c_kwargs
|
|
|
- {a:dfl_types[n] for n,a in enumerate(args[nargs:])}, # c_kwargs_types
|
|
|
+ [(a, get_type_from_ann(a)) for a in args[:nargs]], # c_args
|
|
|
+ {a:dfls[n] for n, a in enumerate(args[nargs:])}, # c_kwargs
|
|
|
+ {a:dfl_types[n] for n, a in enumerate(args[nargs:])}, # c_kwargs_types
|
|
|
('STDIN_OK' if nargs and ann[args[0]] == 'sstr' else flag), # flag
|
|
|
- ann ) # ann
|
|
|
+ ann) # ann
|
|
|
|
|
|
-def process_args(cmd,cmd_args,cls):
|
|
|
- c_args,c_kwargs,c_kwargs_types,flag,_ = create_call_sig(cmd,cls)
|
|
|
+def process_args(cmd, cmd_args, cls):
|
|
|
+ c_args, c_kwargs, c_kwargs_types, flag, _ = create_call_sig(cmd, cls)
|
|
|
have_stdin_input = False
|
|
|
|
|
|
def usage_die(s):
|
|
@@ -230,31 +230,31 @@ def process_args(cmd,cmd_args,cls):
|
|
|
# If we're reading from a pipe, replace '-' with output of previous command
|
|
|
if flag == 'STDIN_OK' and u_args and u_args[0] == '-':
|
|
|
if sys.stdin.isatty():
|
|
|
- die( 'BadFilename', "Standard input is a TTY. Can't use '-' as a filename" )
|
|
|
+ die('BadFilename', "Standard input is a TTY. Can't use '-' as a filename")
|
|
|
else:
|
|
|
from .util2 import parse_bytespec
|
|
|
max_dlen_spec = '10kB' # limit input to 10KB for now
|
|
|
max_dlen = parse_bytespec(max_dlen_spec)
|
|
|
- u_args[0] = os.read(0,max_dlen)
|
|
|
+ u_args[0] = os.read(0, max_dlen)
|
|
|
have_stdin_input = True
|
|
|
if len(u_args[0]) >= max_dlen:
|
|
|
- die(2,f'Maximum data input for this command is {max_dlen_spec}')
|
|
|
+ die(2, f'Maximum data input for this command is {max_dlen_spec}')
|
|
|
if not u_args[0]:
|
|
|
- die(2,f'{cmd}: ERROR: no output from previous command in pipe')
|
|
|
+ die(2, f'{cmd}: ERROR: no output from previous command in pipe')
|
|
|
|
|
|
u_nkwargs = len(cmd_args) - len(c_args)
|
|
|
u_kwargs = {}
|
|
|
if flag == 'VAR_ARGS':
|
|
|
cmd_args = ['dummy_arg'] + cmd_args
|
|
|
- t = [a.split('=',1) for a in cmd_args if '=' in a]
|
|
|
+ t = [a.split('=', 1) for a in cmd_args if '=' in a]
|
|
|
tk = [a[0] for a in t]
|
|
|
tk_bad = [a for a in tk if a not in c_kwargs]
|
|
|
if set(tk_bad) != set(tk[:len(tk_bad)]): # permit non-kw args to contain '='
|
|
|
- die(1,f'{tk_bad[-1]!r}: illegal keyword argument')
|
|
|
+ die(1, f'{tk_bad[-1]!r}: illegal keyword argument')
|
|
|
u_kwargs = dict(t[len(tk_bad):])
|
|
|
u_args = cmd_args[:-len(u_kwargs) or None]
|
|
|
elif u_nkwargs > 0:
|
|
|
- u_kwargs = dict([a.split('=',1) for a in cmd_args[len(c_args):] if '=' in a])
|
|
|
+ u_kwargs = dict([a.split('=', 1) for a in cmd_args[len(c_args):] if '=' in a])
|
|
|
if len(u_kwargs) != u_nkwargs:
|
|
|
usage_die(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}')
|
|
|
if len(u_kwargs) > len(c_kwargs):
|
|
@@ -264,20 +264,20 @@ def process_args(cmd,cmd_args,cls):
|
|
|
if k not in c_kwargs:
|
|
|
usage_die(f'{k!r}: invalid keyword argument')
|
|
|
|
|
|
- def conv_type(arg,arg_name,arg_type):
|
|
|
- if arg_type == 'bytes' and not isinstance(arg,bytes):
|
|
|
- die(1,"'Binary input data must be supplied via STDIN")
|
|
|
+ def conv_type(arg, arg_name, arg_type):
|
|
|
+ if arg_type == 'bytes' and not isinstance(arg, bytes):
|
|
|
+ die(1, "'Binary input data must be supplied via STDIN")
|
|
|
|
|
|
- if have_stdin_input and arg_type == 'str' and isinstance(arg,bytes):
|
|
|
+ if have_stdin_input and arg_type == 'str' and isinstance(arg, bytes):
|
|
|
NL = '\r\n' if sys.platform == 'win32' else '\n'
|
|
|
arg = arg.decode()
|
|
|
if arg[-len(NL):] == NL: # rstrip one newline
|
|
|
arg = arg[:-len(NL)]
|
|
|
|
|
|
if arg_type == 'bool':
|
|
|
- if arg.lower() in ('true','yes','1','on'):
|
|
|
+ if arg.lower() in ('true', 'yes', '1', 'on'):
|
|
|
arg = True
|
|
|
- elif arg.lower() in ('false','no','0','off'):
|
|
|
+ elif arg.lower() in ('false', 'no', '0', 'off'):
|
|
|
arg = False
|
|
|
else:
|
|
|
usage_die(f'{arg!r}: invalid boolean value for keyword argument')
|
|
@@ -285,17 +285,17 @@ def process_args(cmd,cmd_args,cls):
|
|
|
try:
|
|
|
return __builtins__[arg_type](arg)
|
|
|
except:
|
|
|
- die(1,f'{arg!r}: Invalid argument for argument {arg_name} ({arg_type!r} required)')
|
|
|
+ die(1, f'{arg!r}: Invalid argument for argument {arg_name} ({arg_type!r} required)')
|
|
|
|
|
|
if flag == 'VAR_ARGS':
|
|
|
- args = [conv_type(u_args[i],c_args[0][0],c_args[0][1]) for i in range(len(u_args))]
|
|
|
+ args = [conv_type(u_args[i], c_args[0][0], c_args[0][1]) for i in range(len(u_args))]
|
|
|
else:
|
|
|
- args = [conv_type(u_args[i],c_args[i][0],c_args[i][1]) for i in range(len(c_args))]
|
|
|
- kwargs = {k:conv_type(v,k,c_kwargs_types[k].__name__) for k,v in u_kwargs.items()}
|
|
|
+ args = [conv_type(u_args[i], c_args[i][0], c_args[i][1]) for i in range(len(c_args))]
|
|
|
+ kwargs = {k:conv_type(v, k, c_kwargs_types[k].__name__) for k, v in u_kwargs.items()}
|
|
|
|
|
|
- return ( args, kwargs )
|
|
|
+ return (args, kwargs)
|
|
|
|
|
|
-def process_result(ret,pager=False,print_result=False):
|
|
|
+def process_result(ret, pager=False, print_result=False):
|
|
|
"""
|
|
|
Convert result to something suitable for output to screen and return it.
|
|
|
If result is bytes and not convertible to utf8, output as binary using os.write().
|
|
@@ -315,43 +315,43 @@ def process_result(ret,pager=False,print_result=False):
|
|
|
|
|
|
if ret is True:
|
|
|
return True
|
|
|
- elif ret in (False,None):
|
|
|
- die(2,f'tool command returned {ret!r}')
|
|
|
- elif isinstance(ret,str):
|
|
|
+ elif ret in (False, None):
|
|
|
+ die(2, f'tool command returned {ret!r}')
|
|
|
+ elif isinstance(ret, str):
|
|
|
return triage_result(ret)
|
|
|
- elif isinstance(ret,int):
|
|
|
+ elif isinstance(ret, int):
|
|
|
return triage_result(str(ret))
|
|
|
- elif isinstance(ret,tuple):
|
|
|
- return triage_result('\n'.join([r.decode() if isinstance(r,bytes) else r for r in ret]))
|
|
|
- elif isinstance(ret,bytes):
|
|
|
+ elif isinstance(ret, tuple):
|
|
|
+ return triage_result('\n'.join([r.decode() if isinstance(r, bytes) else r for r in ret]))
|
|
|
+ elif isinstance(ret, bytes):
|
|
|
try:
|
|
|
return triage_result(ret.decode())
|
|
|
except:
|
|
|
# don't add NL to binary data if it can't be converted to utf8
|
|
|
- return os.write(1,ret) if print_result else ret
|
|
|
+ return os.write(1, ret) if print_result else ret
|
|
|
else:
|
|
|
- die(2,f'tool.py: can’t handle return value of type {type(ret).__name__!r}')
|
|
|
+ die(2, f'tool.py: can’t handle return value of type {type(ret).__name__!r}')
|
|
|
|
|
|
def get_cmd_cls(cmd):
|
|
|
- for modname,cmdlist in mods.items():
|
|
|
+ for modname, cmdlist in mods.items():
|
|
|
if cmd in cmdlist:
|
|
|
- return getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
|
|
|
+ return getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd')
|
|
|
return False
|
|
|
|
|
|
def get_mod_cls(modname):
|
|
|
- return getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
|
|
|
+ return getattr(importlib.import_module(f'mmgen.tool.{modname}'), 'tool_cmd')
|
|
|
|
|
|
if gc.prog_name.endswith('-tool'):
|
|
|
|
|
|
- cfg = Config( opts_data=opts_data, parse_only=True )
|
|
|
+ cfg = Config(opts_data=opts_data, parse_only=True)
|
|
|
po = cfg._parsed_opts
|
|
|
|
|
|
if po.user_opts.get('list'):
|
|
|
def gen():
|
|
|
- for mod,cmdlist in mods.items():
|
|
|
+ for mod, cmdlist in mods.items():
|
|
|
if mod == 'help':
|
|
|
continue
|
|
|
- yield capfirst( get_mod_cls(mod).__doc__.lstrip().split('\n')[0] ) + ':'
|
|
|
+ yield capfirst(get_mod_cls(mod).__doc__.lstrip().split('\n')[0]) + ':'
|
|
|
for cmd in cmdlist:
|
|
|
yield ' ' + cmd
|
|
|
yield ''
|
|
@@ -366,23 +366,23 @@ if gc.prog_name.endswith('-tool'):
|
|
|
cls = get_cmd_cls(cmd)
|
|
|
|
|
|
if not cls:
|
|
|
- die(1,f'{cmd!r}: no such command')
|
|
|
+ die(1, f'{cmd!r}: no such command')
|
|
|
|
|
|
cfg = Config(
|
|
|
opts_data = opts_data,
|
|
|
parsed_opts = po,
|
|
|
need_proto = cls.need_proto,
|
|
|
init_opts = {'rpc_backend':'aiohttp'} if cmd == 'twimport' else None,
|
|
|
- process_opts = True )
|
|
|
+ process_opts = True)
|
|
|
|
|
|
- cmd,*args = cfg._args
|
|
|
+ cmd, *args = cfg._args
|
|
|
|
|
|
- if cmd in ('help','usage') and args:
|
|
|
+ if cmd in ('help', 'usage') and args:
|
|
|
args[0] = 'command_name=' + args[0]
|
|
|
|
|
|
- args,kwargs = process_args(cmd,args,cls)
|
|
|
+ args, kwargs = process_args(cmd, args, cls)
|
|
|
|
|
|
- ret = getattr(cls(cfg,cmdname=cmd),cmd)(*args,**kwargs)
|
|
|
+ ret = getattr(cls(cfg, cmdname=cmd), cmd)(*args, **kwargs)
|
|
|
|
|
|
if type(ret).__name__ == 'coroutine':
|
|
|
ret = async_run(ret)
|
|
@@ -390,4 +390,4 @@ if gc.prog_name.endswith('-tool'):
|
|
|
process_result(
|
|
|
ret,
|
|
|
pager = kwargs.get('pager'),
|
|
|
- print_result = True )
|
|
|
+ print_result = True)
|