From 341ee2c741c7f2262476f3650c3671a1c5df2972 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Mon, 24 Jan 2022 19:30:13 +0000 Subject: [PATCH] modularize the 'mmgen-tool' utility - each command group now has its own module under the `tool` directory - only modules required by a given command are loaded - code used only by the command-line tool has been relocated to `main_tool.py` --- mmgen/data/version | 2 +- mmgen/help.py | 4 + mmgen/main_tool.py | 320 +++++++-- mmgen/regtest.py | 2 +- mmgen/tool.py | 1121 -------------------------------- mmgen/tool/__init__.py | 4 + mmgen/tool/api.py | 147 +++++ mmgen/tool/coin.py | 186 ++++++ mmgen/tool/common.py | 33 + mmgen/tool/file.py | 108 +++ mmgen/tool/filecrypt.py | 62 ++ mmgen/tool/fileutil.py | 127 ++++ mmgen/tool/help.py | 144 ++++ mmgen/tool/mnemonic.py | 125 ++++ mmgen/tool/rpc.py | 149 +++++ mmgen/tool/util.py | 175 +++++ mmgen/tool/wallet.py | 91 +++ mmgen/wallet.py | 4 +- test/misc/tool_api_test.py | 2 +- test/overlay/__init__.py | 1 + test/test_py_d/ts_ethdev.py | 6 +- test/test_py_d/ts_regtest.py | 4 +- test/test_py_d/ts_tool.py | 4 +- test/test_py_d/ts_xmrwallet.py | 2 +- test/tooltest2.py | 253 +++---- 25 files changed, 1770 insertions(+), 1306 deletions(-) delete mode 100755 mmgen/tool.py create mode 100755 mmgen/tool/__init__.py create mode 100755 mmgen/tool/api.py create mode 100755 mmgen/tool/coin.py create mode 100755 mmgen/tool/common.py create mode 100755 mmgen/tool/file.py create mode 100755 mmgen/tool/filecrypt.py create mode 100755 mmgen/tool/fileutil.py create mode 100755 mmgen/tool/help.py create mode 100755 mmgen/tool/mnemonic.py create mode 100755 mmgen/tool/rpc.py create mode 100755 mmgen/tool/util.py create mode 100755 mmgen/tool/wallet.py diff --git a/mmgen/data/version b/mmgen/data/version index 00986ebc..d3df1164 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -13.1.dev008 +13.1.dev009 diff --git a/mmgen/help.py b/mmgen/help.py index 137840e8..01229d46 100755 --- a/mmgen/help.py +++ b/mmgen/help.py @@ -39,6 +39,10 @@ def help_notes_func(proto,po,k): class help_notes: + def tool_help(): + from .tool.help import main_help + return main_help() + def dfl_subseeds(): from .subseed import SubSeedList return str(SubSeedList.dfl_len) diff --git a/mmgen/main_tool.py b/mmgen/main_tool.py index 826fb3ad..2cbbdfe3 100755 --- a/mmgen/main_tool.py +++ b/mmgen/main_tool.py @@ -21,39 +21,9 @@ mmgen-tool: Perform various MMGen- and cryptocoin-related operations. Part of the MMGen suite """ +import os,importlib from .common import * -def make_cmd_help(): - import mmgen.tool - def do(): - for bc in mmgen.tool.MMGenToolCmds.classes.values(): - cls_doc = bc.__doc__.strip().split('\n') - for l in cls_doc: - if l is cls_doc[0]: - l += ':' - l = l.replace('\t','',1) - if l: - l = l.replace('\t',' ') - yield l[0].upper() + l[1:] - else: - yield '' - yield '' - - max_w = max(map(len,bc.user_commands)) - for name,code in sorted(bc.user_commands.items()): - if code.__doc__: - yield ' {:{}} - {}'.format( - name, - max_w, - pretty_format( - code.__doc__.strip().replace('\n\t\t',' '), - width = 79-(max_w+7), - pfx = ' '*(max_w+5)).lstrip() - ) - yield '' - - return '\n'.join(do()) - opts_data = { 'text': { 'desc': f'Perform various {g.proj_name}- and cryptocoin-related operations', @@ -91,32 +61,288 @@ Type '{pn} help ' for help on a particular command coin_id=help_notes('coin_id'), g=g, ), - 'notes': lambda s: s.format( - ch=make_cmd_help(), + 'notes': lambda s, help_notes: s.format( + ch=help_notes('tool_help'), pn=g.prog_name) } } -cmd_args = opts.init(opts_data) +mods = { + 'help': ( + 'help', + 'usage', + ), + 'util': ( + 'bytespec', + 'to_bytespec', + 'randhex', + 'hexreverse', + 'hexlify', + 'unhexlify', + 'hexdump', + 'unhexdump', + 'hash160', + 'hash256', + 'id6', + 'str2id6', + 'id8', + 'randb58', + 'bytestob58', + 'b58tobytes', + 'hextob58', + 'b58tohex', + 'hextob58chk', + 'b58chktohex', + 'hextob32', + 'b32tohex', + 'hextob6d', + 'b6dtohex', + ), + 'coin': ( + 'randwif', + 'randpair', + 'wif2hex', + 'hex2wif', + 'wif2addr', + 'wif2redeem_script', + 'wif2segwit_pair', + 'privhex2addr', + 'privhex2pubhex', + 'pubhex2addr', + 'pubhex2redeem_script', + 'redeem_script2addr', + 'pubhash2addr', + 'addr2pubhash', + 'addr2scriptpubkey', + 'scriptpubkey2addr', + ), + 'mnemonic': ( + 'mn_rand128', + 'mn_rand192', + 'mn_rand256', + 'hex2mn', + 'mn2hex', + 'mn2hex_interactive', + 'mn_stats', + 'mn_printlist', + ), + 'file': ( + 'addrfile_chksum', + 'keyaddrfile_chksum', + 'passwdfile_chksum', + 'txview', + ), + 'filecrypt': ( + 'encrypt', + 'decrypt', + ), + 'fileutil': ( + 'find_incog_data', + 'rand2file', + ), + 'wallet': ( + 'get_subseed', + 'get_subseed_by_seed_id', + 'list_subseeds', + 'list_shares', + 'gen_key', + 'gen_addr', + ), + 'rpc': ( + 'daemon_version', + 'getbalance', + 'listaddress', + 'listaddresses', + 'twview', + 'add_label', + 'remove_label', + 'remove_address', + ), +} -if len(cmd_args) < 1: - opts.usage() +def create_call_sig(cmd,cls,parsed=False): -cmd = cmd_args.pop(0) + m = getattr(cls,cmd) -import mmgen.tool as tool + if 'varargs_call_sig' in m.__code__.co_varnames: # hack + flag = 'VAR_ARGS' + va = m.__defaults__[0] + args,dfls,ann = va['args'],va['dfls'],va['annots'] + else: + flag = None + args = m.__code__.co_varnames[1:m.__code__.co_argcount] + dfls = m.__defaults__ or () + ann = m.__annotations__ -if cmd in ('help','usage') and cmd_args: - cmd_args[0] = 'command_name=' + cmd_args[0] + nargs = len(args) - len(dfls) -if cmd not in tool.MMGenToolCmds: - die(1,f'{cmd!r}: no such command') + def get_type_from_ann(arg): + return ann[arg][1:] + (' or STDIN','')[parsed] if ann[arg] == 'sstr' else ann[arg].__name__ -args,kwargs = tool._process_args(cmd,cmd_args) + if parsed: + c_args = [(a,get_type_from_ann(a)) for a in args[:nargs]] + c_kwargs = [(a,dfls[n]) for n,a in enumerate(args[nargs:])] + return ( + c_args, + dict(c_kwargs), + 'STDIN_OK' if c_args and ann[args[0]] == 'sstr' else flag ) + else: + c_args = [f'{a} [{get_type_from_ann(a)}]' for a in args[:nargs]] + c_kwargs = ['"{}" [{}={!r}{}]'.format( + a, + type(dfls[n]).__name__, dfls[n], + (' ' + ann[a] if a in ann else '') ) + for n,a in enumerate(args[nargs:])] + return ' '.join(c_args + c_kwargs) -ret = tool.MMGenToolCmds.call(cmd,*args,**kwargs) +def process_args(cmd,cmd_args,cls): + c_args,c_kwargs,flag = create_call_sig(cmd,cls,parsed=True) + have_stdin_input = False -if type(ret).__name__ == 'coroutine': - ret = run_session(ret) + def usage_die(s): + msg(s) + from .tool.help import usage + usage(cmd) -tool._process_result(ret,pager='pager' in kwargs and kwargs['pager'],print_result=True) + if flag != 'VAR_ARGS': + if len(cmd_args) < len(c_args): + usage_die(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}') + + u_args = cmd_args[:len(c_args)] + + # If we're reading from a pipe, replace '-' with output of previous command + if flag == 'STDIN_OK' and u_args and u_args[0] == '-': + if sys.stdin.isatty(): + raise BadFilename("Standard input is a TTY. Can't use '-' as a filename") + else: + max_dlen_spec = '10kB' # limit input to 10KB for now + max_dlen = parse_bytespec(max_dlen_spec) + u_args[0] = os.read(0,max_dlen) + have_stdin_input = True + if len(u_args[0]) >= max_dlen: + die(2,f'Maximum data input for this command is {max_dlen_spec}') + if not u_args[0]: + die(2,f'{cmd}: ERROR: no output from previous command in pipe') + + u_nkwargs = len(cmd_args) - len(c_args) + u_kwargs = {} + if flag == 'VAR_ARGS': + t = [a.split('=',1) for a in cmd_args if '=' in a] + tk = [a[0] for a in t] + tk_bad = [a for a in tk if a not in c_kwargs] + if set(tk_bad) != set(tk[:len(tk_bad)]): # permit non-kw args to contain '=' + die(1,f'{tk_bad[-1]!r}: illegal keyword argument') + u_kwargs = dict(t[len(tk_bad):]) + u_args = cmd_args[:-len(u_kwargs) or None] + elif u_nkwargs > 0: + u_kwargs = dict([a.split('=',1) for a in cmd_args[len(c_args):] if '=' in a]) + if len(u_kwargs) != u_nkwargs: + usage_die(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}') + if len(u_kwargs) > len(c_kwargs): + usage_die(f'Command accepts no more than {len(c_kwargs)} keyword argument{suf(c_kwargs)}') + + for k in u_kwargs: + if k not in c_kwargs: + usage_die(f'{k!r}: invalid keyword argument') + + def conv_type(arg,arg_name,arg_type): + if arg_type == 'bytes' and type(arg) != bytes: + die(1,"'Binary input data must be supplied via STDIN") + + if have_stdin_input and arg_type == 'str' and isinstance(arg,bytes): + from .globalvars import g + NL = '\r\n' if g.platform == 'win' else '\n' + arg = arg.decode() + if arg[-len(NL):] == NL: # rstrip one newline + arg = arg[:-len(NL)] + + if arg_type == 'bool': + if arg.lower() in ('true','yes','1','on'): + arg = True + elif arg.lower() in ('false','no','0','off'): + arg = False + else: + usage_die(f'{arg!r}: invalid boolean value for keyword argument') + + try: + return __builtins__[arg_type](arg) + except: + die(1,f'{arg!r}: Invalid argument for argument {arg_name} ({arg_type!r} required)') + + if flag == 'VAR_ARGS': + args = [conv_type(u_args[i],c_args[0][0],c_args[0][1]) for i in range(len(u_args))] + else: + args = [conv_type(u_args[i],c_args[i][0],c_args[i][1]) for i in range(len(c_args))] + kwargs = {k:conv_type(u_kwargs[k],k,type(c_kwargs[k]).__name__) for k in u_kwargs} + + return ( args, kwargs ) + +def process_result(ret,pager=False,print_result=False): + from .util import Msg,ydie,parse_bytespec + """ + Convert result to something suitable for output to screen and return it. + If result is bytes and not convertible to utf8, output as binary using os.write(). + If 'print_result' is True, send the converted result directly to screen or + pager instead of returning it. + """ + def triage_result(o): + return o if not print_result else do_pager(o) if pager else Msg(o) + + if ret == True: + return True + elif ret in (False,None): + ydie(1,f'tool command returned {ret!r}') + elif isinstance(ret,str): + return triage_result(ret) + elif isinstance(ret,int): + return triage_result(str(ret)) + elif isinstance(ret,tuple): + return triage_result('\n'.join([r.decode() if isinstance(r,bytes) else r for r in ret])) + elif isinstance(ret,bytes): + try: + o = ret.decode() + return o if not print_result else do_pager(o) if pager else Msg(o) + except: + # don't add NL to binary data if it can't be converted to utf8 + if print_result: + return os.write(1,ret) + else: + return ret + else: + ydie(1,f'tool.py: can’t handle return value of type {type(ret).__name__!r}') + +def get_cmd_cls(cmd): + for modname,cmdlist in mods.items(): + if cmd in cmdlist: + return getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd') + else: + return False + +def get_mod_cls(modname): + return getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd') + +if g.prog_name == 'mmgen-tool' and not opt._lock: + + cmd_args = opts.init(opts_data) + + if len(cmd_args) < 1: + opts.usage() + + cmd = cmd_args.pop(0) + + if cmd in ('help','usage') and cmd_args: + cmd_args[0] = 'command_name=' + cmd_args[0] + + cls = get_cmd_cls(cmd) + + if not cls: + die(1,f'{cmd!r}: no such command') + + args,kwargs = process_args(cmd,cmd_args,cls) + + ret = getattr(cls(),cmd)(*args,**kwargs) + + if type(ret).__name__ == 'coroutine': + ret = run_session(ret) + + process_result(ret,pager='pager' in kwargs and kwargs['pager'],print_result=True) diff --git a/mmgen/regtest.py b/mmgen/regtest.py index 3255b6ea..9d24d008 100755 --- a/mmgen/regtest.py +++ b/mmgen/regtest.py @@ -44,7 +44,7 @@ def create_hdseed(proto): # addr=bcrt1qaq8t3pakcftpk095tnqfv5cmmczysls024atnd # cTEkSYCWKvNo757uwFPd4yuCXsbZvfJDipHsHWFRapXpnikMHvgn label= # addr=bcrt1q537rgyctcqdgs8nm8gvku05znka4h2m00lx8ps hdkeypath=m/0'/0'/0' - from .tool import tool_api + from .tool.api import tool_api t = tool_api() t.init_coin(proto.coin,proto.network) t.addrtype = 'bech32' diff --git a/mmgen/tool.py b/mmgen/tool.py deleted file mode 100755 index f91f9939..00000000 --- a/mmgen/tool.py +++ /dev/null @@ -1,1121 +0,0 @@ -#!/usr/bin/env python3 -# -# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution -# Copyright (C)2013-2022 The MMGen Project -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -tool.py: Routines for the 'mmgen-tool' utility -""" - -from .common import * -from .protocol import hash160 -from .fileutil import get_seed_file,get_data_from_file,write_data_to_file -from .crypto import get_random,aesctr_iv_len,mmgen_encrypt,mmgen_decrypt,mmenc_ext -from .key import PrivKey -from .subseed import SubSeedList -from .seedsplit import MasterShareIdx -from .addr import * -from .addrlist import AddrList,KeyAddrList,AddrIdxList -from .passwdlist import PasswordList -from .baseconv import baseconv -from .xmrseed import xmrseed -from .bip39 import bip39 -from .tw import TwCommon - -NL = ('\n','\r\n')[g.platform=='win'] - -def _options_annot_str(l): - return "(valid options: '{}')".format( "','".join(l) ) - -def _create_argtuple(method,localvars): - co = method.__code__ - args = co.co_varnames[1:co.co_argcount] - return namedtuple('cmd_args',args)(*(localvars[a] for a in args)) - -def _create_call_sig(cmd,parsed=False): - - m = MMGenToolCmds[cmd] - - if 'varargs_call_sig' in m.__code__.co_varnames: # hack - flag = 'VAR_ARGS' - va = m.__defaults__[0] - args,dfls,ann = va['args'],va['dfls'],va['annots'] - else: - flag = None - args = m.__code__.co_varnames[1:m.__code__.co_argcount] - dfls = m.__defaults__ or () - ann = m.__annotations__ - - nargs = len(args) - len(dfls) - - def get_type_from_ann(arg): - return ann[arg][1:] + (' or STDIN','')[parsed] if ann[arg] == 'sstr' else ann[arg].__name__ - - if parsed: - c_args = [(a,get_type_from_ann(a)) for a in args[:nargs]] - c_kwargs = [(a,dfls[n]) for n,a in enumerate(args[nargs:])] - return c_args,dict(c_kwargs),'STDIN_OK' if c_args and ann[args[0]] == 'sstr' else flag - else: - c_args = [f'{a} [{get_type_from_ann(a)}]' for a in args[:nargs]] - c_kwargs = ['"{}" [{}={!r}{}]'.format( - a, - type(dfls[n]).__name__, dfls[n], - (' ' + ann[a] if a in ann else '') ) - for n,a in enumerate(args[nargs:])] - return ' '.join(c_args + c_kwargs) - -def _usage(cmd=None,exit_val=1): - - m1=('USAGE INFORMATION FOR MMGEN-TOOL COMMANDS:\n\n' - ' Unquoted arguments are mandatory\n' - ' Quoted arguments are optional, default values will be used\n' - ' Argument types and default values are shown in square brackets\n') - - m2=(' To force a command to read from STDIN instead of file (for commands taking\n' - ' a filename as their first argument), substitute "-" for the filename.\n\n' - 'EXAMPLES:\n\n' - ' Generate a random Bech32 public/private keypair for LTC:\n' - ' $ mmgen-tool -r0 --coin=ltc --type=bech32 randpair\n\n' - ' Generate a DASH compressed public key address from the supplied WIF key:\n' - ' $ mmgen-tool --coin=dash --type=compressed wif2addr XJkVRC3eGKurc9Uzx1wfQoio3yqkmaXVqLMTa6y7s3M3jTBnmxfw\n\n' - ' Generate a well-known burn address:\n' - ' $ mmgen-tool hextob58chk 000000000000000000000000000000000000000000\n\n' - ' Generate a random 12-word seed phrase:\n' - ' $ mmgen-tool -r0 mn_rand128\n\n' - ' Same as above, but get additional entropy from user:\n' - ' $ mmgen-tool mn_rand128\n\n' - ' Encode bytes from a file to base 58:\n' - ' $ mmgen-tool bytestob58 /etc/timezone pad=20\n\n' - ' Reverse a hex string:\n' - ' $ mmgen-tool hexreverse "deadbeefcafe"\n\n' - ' Same as above, but use a pipe:\n' - ' $ echo "deadbeefcafe" | mmgen-tool hexreverse -') - - if not cmd: - Msg(m1) - for bc in MMGenToolCmds.classes.values(): - cls_info = bc.__doc__.strip().split('\n')[0] - Msg(' {}{}\n'.format(cls_info[0].upper(),cls_info[1:])) - max_w = max(map(len,bc.user_commands)) - for cmd in sorted(bc.user_commands): - Msg(f' {cmd:{max_w}} {_create_call_sig(cmd)}') - Msg('') - Msg(m2) - elif cmd in MMGenToolCmds: - p1 = fmt(capfirst(MMGenToolCmds[cmd].__doc__.strip()),strip_char='\t').strip() - msg('{}{}\nUSAGE: {} {} {}'.format( - p1, - ('\n' if '\n' in p1 else ''), - g.prog_name,cmd, - _create_call_sig(cmd)) - ) - else: - die(1,f'{cmd!r}: no such tool command') - - sys.exit(exit_val) - -def _process_args(cmd,cmd_args): - c_args,c_kwargs,flag = _create_call_sig(cmd,parsed=True) - have_stdin_input = False - - if flag != 'VAR_ARGS': - if len(cmd_args) < len(c_args): - msg(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}') - _usage(cmd) - - u_args = cmd_args[:len(c_args)] - - # If we're reading from a pipe, replace '-' with output of previous command - if flag == 'STDIN_OK' and u_args and u_args[0] == '-': - if sys.stdin.isatty(): - raise BadFilename("Standard input is a TTY. Can't use '-' as a filename") - else: - max_dlen_spec = '10kB' # limit input to 10KB for now - max_dlen = MMGenToolCmdUtil().bytespec(max_dlen_spec) - u_args[0] = os.read(0,max_dlen) - have_stdin_input = True - if len(u_args[0]) >= max_dlen: - die(2,f'Maximum data input for this command is {max_dlen_spec}') - if not u_args[0]: - die(2,f'{cmd}: ERROR: no output from previous command in pipe') - - u_nkwargs = len(cmd_args) - len(c_args) - u_kwargs = {} - if flag == 'VAR_ARGS': - t = [a.split('=',1) for a in cmd_args if '=' in a] - tk = [a[0] for a in t] - tk_bad = [a for a in tk if a not in c_kwargs] - if set(tk_bad) != set(tk[:len(tk_bad)]): # permit non-kw args to contain '=' - die(1,f'{tk_bad[-1]!r}: illegal keyword argument') - u_kwargs = dict(t[len(tk_bad):]) - u_args = cmd_args[:-len(u_kwargs) or None] - elif u_nkwargs > 0: - u_kwargs = dict([a.split('=',1) for a in cmd_args[len(c_args):] if '=' in a]) - if len(u_kwargs) != u_nkwargs: - msg(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}') - _usage(cmd) - if len(u_kwargs) > len(c_kwargs): - msg(f'Command accepts no more than {len(c_kwargs)} keyword argument{suf(c_kwargs)}') - _usage(cmd) - - for k in u_kwargs: - if k not in c_kwargs: - msg(f'{k!r}: invalid keyword argument') - _usage(cmd) - - def conv_type(arg,arg_name,arg_type): - if arg_type == 'bytes' and type(arg) != bytes: - die(1,"'Binary input data must be supplied via STDIN") - - if have_stdin_input and arg_type == 'str' and isinstance(arg,bytes): - arg = arg.decode() - if arg[-len(NL):] == NL: # rstrip one newline - arg = arg[:-len(NL)] - - if arg_type == 'bool': - if arg.lower() in ('true','yes','1','on'): arg = True - elif arg.lower() in ('false','no','0','off'): arg = False - else: - msg(f'{arg!r}: invalid boolean value for keyword argument') - _usage(cmd) - - try: - return __builtins__[arg_type](arg) - except: - die(1,f'{arg!r}: Invalid argument for argument {arg_name} ({arg_type!r} required)') - - if flag == 'VAR_ARGS': - args = [conv_type(u_args[i],c_args[0][0],c_args[0][1]) for i in range(len(u_args))] - else: - args = [conv_type(u_args[i],c_args[i][0],c_args[i][1]) for i in range(len(c_args))] - kwargs = {k:conv_type(u_kwargs[k],k,type(c_kwargs[k]).__name__) for k in u_kwargs} - - return args,kwargs - -def _process_result(ret,pager=False,print_result=False): - """ - Convert result to something suitable for output to screen and return it. - If result is bytes and not convertible to utf8, output as binary using os.write(). - If 'print_result' is True, send the converted result directly to screen or - pager instead of returning it. - """ - def triage_result(o): - return o if not print_result else do_pager(o) if pager else Msg(o) - - if ret == True: - return True - elif ret in (False,None): - ydie(1,f'tool command returned {ret!r}') - elif isinstance(ret,str): - return triage_result(ret) - elif isinstance(ret,int): - return triage_result(str(ret)) - elif isinstance(ret,tuple): - return triage_result('\n'.join([r.decode() if isinstance(r,bytes) else r for r in ret])) - elif isinstance(ret,bytes): - try: - o = ret.decode() - return o if not print_result else do_pager(o) if pager else Msg(o) - except: - # don't add NL to binary data if it can't be converted to utf8 - return ret if not print_result else os.write(1,ret) - else: - ydie(1,f'tool.py: can’t handle return value of type {type(ret).__name__!r}') - -dfl_mnemonic_fmt = 'mmgen' -mft = namedtuple('mnemonic_format',['fmt','pad','conv_cls']) -mnemonic_fmts = { - 'mmgen': mft( 'words', 'seed', baseconv ), - 'bip39': mft( 'bip39', None, bip39 ), - 'xmrseed': mft( 'xmrseed', None, xmrseed ), -} -mn_opts_disp = _options_annot_str(mnemonic_fmts) - -class MMGenToolCmdMeta(type): - classes = {} - methods = {} - def __new__(mcls,name,bases,namespace): - methods = { k:v for k,v in namespace.items() if k[0] != '_' and callable(v) } - cls = super().__new__(mcls,name,bases,namespace) - if bases and name != 'tool_api': - mcls.classes[name] = cls - mcls.methods.update(methods) - return cls - - def __iter__(cls): - return cls.methods.__iter__() - - def __getitem__(cls,val): - return cls.methods.__getitem__(val) - - def __contains__(cls,val): - return cls.methods.__contains__(val) - - def classname(cls,cmd_name): - return cls.methods[cmd_name].__qualname__.split('.')[0] - - def call(cls,cmd_name,*args,**kwargs): - return getattr(cls.classes[cls.classname(cmd_name)](),cmd_name)(*args,**kwargs) - - @property - def user_commands(cls): - return {k:v for k,v in cls.__dict__.items() if k in cls.methods} - -# all non-user-visible methods must begin with an underscore! -class MMGenToolCmds(metaclass=MMGenToolCmdMeta): - - def __init__(self,proto=None,mmtype=None): - from .protocol import init_proto_from_opts - self.proto = proto or init_proto_from_opts() - self.mmtype = MMGenAddrType( - self.proto, - mmtype or opt.type or self.proto.dfl_mmtype ) - if g.token: - self.proto.tokensym = g.token.upper() - - def _init_generators(self,arg=None): - gd = namedtuple('generator_data',['at','kg','ag']) - - at = MMGenAddrType( - proto = self.proto, - id_str = self.mmtype ) - - if arg == 'addrtype_only': - return gd(at,None,None) - else: - return gd( - at, - KeyGenerator(self.proto,at.pubkey_type), - AddrGenerator(self.proto,at), - ) - - -class MMGenToolCmdMisc(MMGenToolCmds): - "miscellaneous commands" - - def help(self,command_name=''): - "display usage information for a single command or all commands" - _usage(command_name,exit_val=0) - - usage = help - -class MMGenToolCmdUtil(MMGenToolCmds): - "general string conversion and hashing utilities" - - def bytespec(self,dd_style_byte_specifier:str): - "convert a byte specifier such as '1GB' into an integer" - return parse_bytespec(dd_style_byte_specifier) - - def to_bytespec(self, - n: int, - dd_style_byte_specifier: str, - fmt = '0.2', - print_sym = True ): - "convert an integer to a byte specifier such as '1GB'" - return int2bytespec(n,dd_style_byte_specifier,fmt,print_sym) - - def randhex(self,nbytes='32'): - "print 'n' bytes (default 32) of random data in hex format" - return get_random(int(nbytes)).hex() - - def hexreverse(self,hexstr:'sstr'): - "reverse bytes of a hexadecimal string" - return bytes.fromhex(hexstr.strip())[::-1].hex() - - def hexlify(self,infile:str): - "convert bytes in file to hexadecimal (use '-' for stdin)" - data = get_data_from_file(infile,dash=True,quiet=True,binary=True) - return data.hex() - - def unhexlify(self,hexstr:'sstr'): - "convert hexadecimal value to bytes (warning: outputs binary data)" - return bytes.fromhex(hexstr) - - def hexdump(self,infile:str,cols=8,line_nums='hex'): - "create hexdump of data from file (use '-' for stdin)" - data = get_data_from_file(infile,dash=True,quiet=True,binary=True) - return pretty_hexdump(data,cols=cols,line_nums=line_nums).rstrip() - - def unhexdump(self,infile:str): - "decode hexdump from file (use '-' for stdin) (warning: outputs binary data)" - if g.platform == 'win': - import msvcrt - msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY) - hexdata = get_data_from_file(infile,dash=True,quiet=True) - return decode_pretty_hexdump(hexdata) - - def hash160(self,hexstr:'sstr'): - "compute ripemd160(sha256(data)) (convert hex pubkey to hex addr)" - return hash160(bytes.fromhex(hexstr)).hex() - - def hash256(self,string_or_bytes:str,file_input=False,hex_input=False): # TODO: handle stdin - "compute sha256(sha256(data)) (double sha256)" - from hashlib import sha256 - if file_input: b = get_data_from_file(string_or_bytes,binary=True) - elif hex_input: b = decode_pretty_hexdump(string_or_bytes) - else: b = string_or_bytes - return sha256(sha256(b.encode()).digest()).hexdigest() - - def id6(self,infile:str): - "generate 6-character MMGen ID for a file (use '-' for stdin)" - return make_chksum_6( - get_data_from_file(infile,dash=True,quiet=True,binary=True)) - - def str2id6(self,string:'sstr'): # retain ignoring of space for backwards compat - "generate 6-character MMGen ID for a string, ignoring spaces" - return make_chksum_6(''.join(string.split())) - - def id8(self,infile:str): - "generate 8-character MMGen ID for a file (use '-' for stdin)" - return make_chksum_8( - get_data_from_file(infile,dash=True,quiet=True,binary=True)) - - def randb58(self,nbytes=32,pad=0): - "generate random data (default: 32 bytes) and convert it to base 58" - return baseconv('b58').frombytes(get_random(nbytes),pad=pad,tostr=True) - - def bytestob58(self,infile:str,pad=0): - "convert bytes to base 58 (supply data via STDIN)" - data = get_data_from_file(infile,dash=True,quiet=True,binary=True) - return baseconv('b58').frombytes(data,pad=pad,tostr=True) - - def b58tobytes(self,b58num:'sstr',pad=0): - "convert a base 58 number to bytes (warning: outputs binary data)" - return baseconv('b58').tobytes(b58num,pad=pad) - - def hextob58(self,hexstr:'sstr',pad=0): - "convert a hexadecimal number to base 58" - return baseconv('b58').fromhex(hexstr,pad=pad,tostr=True) - - def b58tohex(self,b58num:'sstr',pad=0): - "convert a base 58 number to hexadecimal" - return baseconv('b58').tohex(b58num,pad=pad) - - def hextob58chk(self,hexstr:'sstr'): - "convert a hexadecimal number to base58-check encoding" - from .protocol import _b58chk_encode - return _b58chk_encode(bytes.fromhex(hexstr)) - - def b58chktohex(self,b58chk_num:'sstr'): - "convert a base58-check encoded number to hexadecimal" - from .protocol import _b58chk_decode - return _b58chk_decode(b58chk_num).hex() - - def hextob32(self,hexstr:'sstr',pad=0): - "convert a hexadecimal number to MMGen's flavor of base 32" - return baseconv('b32').fromhex(hexstr,pad,tostr=True) - - def b32tohex(self,b32num:'sstr',pad=0): - "convert an MMGen-flavor base 32 number to hexadecimal" - return baseconv('b32').tohex(b32num.upper(),pad) - - def hextob6d(self,hexstr:'sstr',pad=0,add_spaces=True): - "convert a hexadecimal number to die roll base6 (base6d)" - ret = baseconv('b6d').fromhex(hexstr,pad,tostr=True) - return block_format(ret,gw=5,cols=None).strip() if add_spaces else ret - - def b6dtohex(self,b6d_num:'sstr',pad=0): - "convert a die roll base6 (base6d) number to hexadecimal" - return baseconv('b6d').tohex(remove_whitespace(b6d_num),pad) - -class MMGenToolCmdCoin(MMGenToolCmds): - """ - cryptocoin key/address utilities - - May require use of the '--coin', '--type' and/or '--testnet' options - - Examples: - mmgen-tool --coin=ltc --type=bech32 wif2addr - mmgen-tool --coin=zec --type=zcash_z randpair - """ - def randwif(self): - "generate a random private key in WIF format" - gd = self._init_generators('addrtype_only') - return PrivKey( - self.proto, - get_random(32), - pubkey_type = gd.at.pubkey_type, - compressed = gd.at.compressed ).wif - - def randpair(self): - "generate a random private key/address pair" - gd = self._init_generators() - privkey = PrivKey( - self.proto, - get_random(32), - pubkey_type = gd.at.pubkey_type, - compressed = gd.at.compressed ) - addr = gd.ag.to_addr(gd.kg.gen_data(privkey)) - return ( privkey.wif, addr ) - - def wif2hex(self,wifkey:'sstr'): - "convert a private key from WIF to hex format" - return PrivKey( - self.proto, - wif = wifkey ).hex() - - def hex2wif(self,privhex:'sstr'): - "convert a private key from hex to WIF format" - gd = self._init_generators('addrtype_only') - return PrivKey( - self.proto, - bytes.fromhex(privhex), - pubkey_type = gd.at.pubkey_type, - compressed = gd.at.compressed ).wif - - def wif2addr(self,wifkey:'sstr'): - "generate a coin address from a key in WIF format" - gd = self._init_generators() - privkey = PrivKey( - self.proto, - wif = wifkey ) - addr = gd.ag.to_addr(gd.kg.gen_data(privkey)) - return addr - - def wif2redeem_script(self,wifkey:'sstr'): # new - "convert a WIF private key to a Segwit P2SH-P2WPKH redeem script" - assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' - gd = self._init_generators() - privkey = PrivKey( - self.proto, - wif = wifkey ) - return gd.ag.to_segwit_redeem_script(gd.kg.gen_data(privkey)) - - def wif2segwit_pair(self,wifkey:'sstr'): - "generate both a Segwit P2SH-P2WPKH redeem script and address from WIF" - assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' - gd = self._init_generators() - data = gd.kg.gen_data(PrivKey( - self.proto, - wif = wifkey )) - return ( - gd.ag.to_segwit_redeem_script(data), - gd.ag.to_addr(data) ) - - def privhex2addr(self,privhex:'sstr',output_pubhex=False): - "generate coin address from raw private key data in hexadecimal format" - gd = self._init_generators() - pk = PrivKey( - self.proto, - bytes.fromhex(privhex), - compressed = gd.at.compressed, - pubkey_type = gd.at.pubkey_type ) - data = gd.kg.gen_data(pk) - return data.pubkey.hex() if output_pubhex else gd.ag.to_addr(data) - - def privhex2pubhex(self,privhex:'sstr'): # new - "generate a hex public key from a hex private key" - return self.privhex2addr(privhex,output_pubhex=True) - - def pubhex2addr(self,pubkeyhex:'sstr'): - "convert a hex pubkey to an address" - pubkey = bytes.fromhex(pubkeyhex) - if self.mmtype.name == 'segwit': - return self.proto.pubkey2segwitaddr( pubkey ) - else: - return self.pubhash2addr( hash160(pubkey).hex() ) - - def pubhex2redeem_script(self,pubkeyhex:'sstr'): # new - "convert a hex pubkey to a Segwit P2SH-P2WPKH redeem script" - assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' - return self.proto.pubkey2redeem_script( bytes.fromhex(pubkeyhex) ).hex() - - def redeem_script2addr(self,redeem_scripthex:'sstr'): # new - "convert a Segwit P2SH-P2WPKH redeem script to an address" - assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit' - assert redeem_scripthex[:4] == '0014', f'{redeem_scripthex!r}: invalid redeem script' - assert len(redeem_scripthex) == 44, f'{len(redeem_scripthex)//2} bytes: invalid redeem script length' - return self.pubhash2addr( hash160(bytes.fromhex(redeem_scripthex)).hex() ) - - def pubhash2addr(self,pubhashhex:'sstr'): - "convert public key hash to address" - pubhash = bytes.fromhex(pubhashhex) - if self.mmtype.name == 'bech32': - return self.proto.pubhash2bech32addr( pubhash ) - else: - gd = self._init_generators('addrtype_only') - return self.proto.pubhash2addr( pubhash, gd.at.addr_fmt=='p2sh' ) - - def addr2pubhash(self,addr:'sstr'): - "convert coin address to public key hash" - from .tx import addr2pubhash - return addr2pubhash(self.proto,CoinAddr(self.proto,addr)) - - def addr2scriptpubkey(self,addr:'sstr'): - "convert coin address to scriptPubKey" - from .tx import addr2scriptPubKey - return addr2scriptPubKey(self.proto,CoinAddr(self.proto,addr)) - - def scriptpubkey2addr(self,hexstr:'sstr'): - "convert scriptPubKey to coin address" - from .tx import scriptPubKey2addr - return scriptPubKey2addr(self.proto,hexstr)[0] - -class MMGenToolCmdMnemonic(MMGenToolCmds): - """ - seed phrase utilities (valid formats: 'mmgen' (default), 'bip39', 'xmrseed') - - IMPORTANT NOTE: MMGen's default seed phrase format uses the Electrum - wordlist, however seed phrases are computed using a different algorithm - and are NOT Electrum-compatible! - - BIP39 support is fully compatible with the standard, allowing users to - import and export seed entropy from BIP39-compatible wallets. However, - users should be aware that BIP39 support does not imply BIP32 support! - MMGen uses its own key derivation scheme differing from the one described - by the BIP32 protocol. - - For Monero ('xmrseed') seed phrases, input data is reduced to a spendkey - before conversion so that a canonical seed phrase is produced. This is - required because Monero seeds, unlike ordinary wallet seeds, are tied - to a concrete key/address pair. To manually generate a Monero spendkey, - use the 'hex2wif' command. - """ - - @staticmethod - def _xmr_reduce(bytestr): - from .protocol import init_proto - proto = init_proto('xmr') - if len(bytestr) != proto.privkey_len: - die(1,'{!r}: invalid bit length for Monero private key (must be {})'.format( - len(bytestr*8), - proto.privkey_len*8 )) - return proto.preprocess_key(bytestr,None) - - def _do_random_mn(self,nbytes:int,fmt:str): - assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32' - randbytes = get_random(nbytes) - if fmt == 'xmrseed': - randbytes = self._xmr_reduce(randbytes) - if opt.verbose: - msg(f'Seed: {randbytes.hex()}') - return self.hex2mn(randbytes.hex(),fmt=fmt) - - def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): - "generate random 128-bit mnemonic seed phrase" - return self._do_random_mn(16,fmt) - - def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): - "generate random 192-bit mnemonic seed phrase" - return self._do_random_mn(24,fmt) - - def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): - "generate random 256-bit mnemonic seed phrase" - return self._do_random_mn(32,fmt) - - def hex2mn( self, hexstr:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ): - "convert a 16, 24 or 32-byte hexadecimal number to a mnemonic seed phrase" - if fmt == 'xmrseed': - hexstr = self._xmr_reduce(bytes.fromhex(hexstr)).hex() - f = mnemonic_fmts[fmt] - return ' '.join( f.conv_cls(fmt).fromhex(hexstr,f.pad) ) - - def mn2hex( self, seed_mnemonic:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ): - "convert a mnemonic seed phrase to a hexadecimal number" - f = mnemonic_fmts[fmt] - return f.conv_cls(fmt).tohex( seed_mnemonic.split(), f.pad ) - - def mn2hex_interactive( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, mn_len=24, print_mn=False ): - "convert an interactively supplied mnemonic seed phrase to a hexadecimal number" - from .mn_entry import mn_entry - mn = mn_entry(fmt).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len,validate=False) - if print_mn: - msg(mn) - return self.mn2hex(seed_mnemonic=mn,fmt=fmt) - - def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): - "show stats for mnemonic wordlist" - return mnemonic_fmts[fmt].conv_cls(fmt).check_wordlist() - - def mn_printlist( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, enum=False, pager=False ): - "print mnemonic wordlist" - ret = mnemonic_fmts[fmt].conv_cls(fmt).get_wordlist() - if enum: - ret = [f'{n:>4} {e}' for n,e in enumerate(ret)] - return '\n'.join(ret) - -class MMGenToolCmdFile(MMGenToolCmds): - "utilities for viewing/checking MMGen address and transaction files" - - def _file_chksum(self,mmgen_addrfile,objname): - verbose,yes,quiet = [bool(i) for i in (opt.verbose,opt.yes,opt.quiet)] - opt.verbose,opt.yes,opt.quiet = (False,True,True) - ret = globals()[objname](self.proto,mmgen_addrfile) - opt.verbose,opt.yes,opt.quiet = (verbose,yes,quiet) - if verbose: - if ret.al_id.mmtype.name == 'password': - msg('Passwd fmt: {}\nPasswd len: {}\nID string: {}'.format( - capfirst(ret.pw_info[ret.pw_fmt].desc), - ret.pw_len, - ret.pw_id_str )) - else: - msg(f'Base coin: {ret.base_coin} {capfirst(ret.network)}') - msg(f'MMType: {capfirst(ret.al_id.mmtype.name)}') - msg( f'List length: {len(ret.data)}') - return ret.chksum - - def addrfile_chksum(self,mmgen_addrfile:str): - "compute checksum for MMGen address file" - return self._file_chksum(mmgen_addrfile,'AddrList') - - def keyaddrfile_chksum(self,mmgen_keyaddrfile:str): - "compute checksum for MMGen key-address file" - return self._file_chksum(mmgen_keyaddrfile,'KeyAddrList') - - def passwdfile_chksum(self,mmgen_passwdfile:str): - "compute checksum for MMGen password file" - return self._file_chksum(mmgen_passwdfile,'PasswordList') - - async def txview( varargs_call_sig = { # hack to allow for multiple filenames - 'args': ( - 'mmgen_tx_file(s)', - 'pager', - 'terse', - 'sort', - 'filesort' ), - 'dfls': ( False, False, 'addr', 'mtime' ), - 'annots': { - 'mmgen_tx_file(s)': str, - 'sort': _options_annot_str(['addr','raw']), - 'filesort': _options_annot_str(['mtime','ctime','atime']), - } }, - *infiles,**kwargs): - "show raw/signed MMGen transaction in human-readable form" - - terse = bool(kwargs.get('terse')) - tx_sort = kwargs.get('sort') or 'addr' - file_sort = kwargs.get('filesort') or 'mtime' - - from .filename import MMGenFileList - from .tx import MMGenTX - flist = MMGenFileList(infiles,ftype=MMGenTX) - flist.sort_by_age(key=file_sort) # in-place sort - - async def process_file(fn): - if fn.endswith(MMGenTX.Signed.ext): - tx = MMGenTX.Signed( - filename = fn, - quiet_open = True, - tw = await MMGenTX.Signed.get_tracking_wallet(fn) ) - else: - tx = MMGenTX.Unsigned( - filename = fn, - quiet_open = True ) - return tx.format_view(terse=terse,sort=tx_sort) - - return ('—'*77+'\n').join([await process_file(fn) for fn in flist.names()]).rstrip() - -class MMGenToolCmdFileCrypt(MMGenToolCmds): - """ - file encryption and decryption - - MMGen encryption suite: - * Key: Scrypt (user-configurable hash parameters, 32-byte salt) - * Enc: AES256_CTR, 16-byte rand IV, sha256 hash + 32-byte nonce + data - * The encrypted file is indistinguishable from random data - """ - def encrypt(self,infile:str,outfile='',hash_preset=''): - "encrypt a file" - data = get_data_from_file(infile,'data for encryption',binary=True) - enc_d = mmgen_encrypt(data,'data',hash_preset) - if not outfile: - outfile = f'{os.path.basename(infile)}.{mmenc_ext}' - write_data_to_file(outfile,enc_d,'encrypted data',binary=True) - return True - - def decrypt(self,infile:str,outfile='',hash_preset=''): - "decrypt a file" - enc_d = get_data_from_file(infile,'encrypted data',binary=True) - while True: - dec_d = mmgen_decrypt(enc_d,'data',hash_preset) - if dec_d: break - msg('Trying again...') - if not outfile: - o = os.path.basename(infile) - outfile = remove_extension(o,mmenc_ext) - if outfile == o: outfile += '.dec' - write_data_to_file(outfile,dec_d,'decrypted data',binary=True) - return True - -class MMGenToolCmdFileUtil(MMGenToolCmds): - "file utilities" - - def find_incog_data(self,filename:str,incog_id:str,keep_searching=False): - "Use an Incog ID to find hidden incognito wallet data" - ivsize,bsize,mod = ( aesctr_iv_len, 4096, 4096*8 ) - n,carry = 0,b' '*ivsize - flgs = os.O_RDONLY|os.O_BINARY if g.platform == 'win' else os.O_RDONLY - f = os.open(filename,flgs) - for ch in incog_id: - if ch not in '0123456789ABCDEF': - die(2,f'{incog_id!r}: invalid Incog ID') - while True: - d = os.read(f,bsize) - if not d: break - d = carry + d - for i in range(bsize): - if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == incog_id: - if n+i < ivsize: - continue - msg(f'\rIncog data for ID {incog_id} found at offset {n+i-ivsize}') - if not keep_searching: - sys.exit(0) - carry = d[len(d)-ivsize:] - n += bsize - if not n % mod: - msg_r(f'\rSearched: {n} bytes') - - msg('') - os.close(f) - return True - - def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False): - "write 'n' bytes of random data to specified file" - from threading import Thread - from queue import Queue - from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes - from cryptography.hazmat.backends import default_backend - - def encrypt_worker(wid): - ctr_init_val = os.urandom( aesctr_iv_len ) - c = Cipher(algorithms.AES(key),modes.CTR(ctr_init_val),backend=default_backend()) - encryptor = c.encryptor() - while True: - q2.put(encryptor.update(q1.get())) - q1.task_done() - - def output_worker(): - while True: - f.write(q2.get()) - q2.task_done() - - nbytes = parse_bytespec(nbytes) - if opt.outdir: - outfile = make_full_path(opt.outdir,outfile) - f = open(outfile,'wb') - - key = get_random(32) - q1,q2 = Queue(),Queue() - - for i in range(max(1,threads-2)): - t = Thread(target=encrypt_worker,args=[i]) - t.daemon = True - t.start() - - t = Thread(target=output_worker) - t.daemon = True - t.start() - - blk_size = 1024 * 1024 - for i in range(nbytes // blk_size): - if not i % 4: - msg_r(f'\rRead: {i * blk_size} bytes') - q1.put(os.urandom(blk_size)) - - if nbytes % blk_size: - q1.put(os.urandom(nbytes % blk_size)) - - q1.join() - q2.join() - f.close() - - fsize = os.stat(outfile).st_size - if fsize != nbytes: - die(3,f'{fsize}: incorrect random file size (should be {nbytes})') - - if not silent: - msg(f'\rRead: {nbytes} bytes') - qmsg(f'\r{nbytes} byte{suf(nbytes)} of random data written to file {outfile!r}') - - return True - -class MMGenToolCmdWallet(MMGenToolCmds): - "key, address or subseed generation from an MMGen wallet" - - def get_subseed(self,subseed_idx:str,wallet=''): - "get the Seed ID of a single subseed by Subseed Index for default or specified wallet" - opt.quiet = True - sf = get_seed_file([wallet] if wallet else [],1) - from .wallet import Wallet - return Wallet(sf).seed.subseed(subseed_idx).sid - - def get_subseed_by_seed_id(self,seed_id:str,wallet='',last_idx=SubSeedList.dfl_len): - "get the Subseed Index of a single subseed by Seed ID for default or specified wallet" - opt.quiet = True - sf = get_seed_file([wallet] if wallet else [],1) - from .wallet import Wallet - ret = Wallet(sf).seed.subseed_by_seed_id(seed_id,last_idx) - return ret.ss_idx if ret else None - - def list_subseeds(self,subseed_idx_range:str,wallet=''): - "list a range of subseed Seed IDs for default or specified wallet" - opt.quiet = True - sf = get_seed_file([wallet] if wallet else [],1) - from .wallet import Wallet - from .subseed import SubSeedIdxRange - return Wallet(sf).seed.subseeds.format(*SubSeedIdxRange(subseed_idx_range)) - - def list_shares(self, - share_count: int, - id_str = 'default', - master_share: f'(min:1, max:{MasterShareIdx.max_val}, 0=no master share)' = 0, - wallet = '' ): - "list the Seed IDs of the shares resulting from a split of default or specified wallet" - opt.quiet = True - sf = get_seed_file([wallet] if wallet else [],1) - from .wallet import Wallet - return Wallet(sf).seed.split(share_count,id_str,master_share).format() - - def gen_key(self,mmgen_addr:str,wallet=''): - "generate a single MMGen WIF key from default or specified wallet" - return self.gen_addr(mmgen_addr,wallet,target='wif') - - def gen_addr(self,mmgen_addr:str,wallet='',target='addr'): - "generate a single MMGen address from default or specified wallet" - addr = MMGenID(self.proto,mmgen_addr) - opt.quiet = True - sf = get_seed_file([wallet] if wallet else [],1) - from .wallet import Wallet - ss = Wallet(sf) - if ss.seed.sid != addr.sid: - die(1,f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})') - al = AddrList( - proto = self.proto, - seed = ss.seed, - addr_idxs = AddrIdxList(str(addr.idx)), - mmtype = addr.mmtype ) - d = al.data[0] - ret = d.sec.wif if target=='wif' else d.addr - return ret - -class MMGenToolCmdRPC(MMGenToolCmds): - "tracking wallet commands using the JSON-RPC interface" - - async def daemon_version(self): - "print coin daemon version" - from .rpc import rpc_init - r = await rpc_init(self.proto,ignore_daemon_version=True) - return f'{r.daemon.coind_name} version {r.daemon_version} ({r.daemon_version_str})' - - async def getbalance(self,minconf=1,quiet=False,pager=False): - "list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet" - from .twbal import TwGetBalance - return (await TwGetBalance(self.proto,minconf,quiet)).format() - - async def listaddress(self, - mmgen_addr:str, - minconf = 1, - pager = False, - showempty = True, - showbtcaddr = True, - age_fmt: _options_annot_str(TwCommon.age_fmts) = 'confs', - ): - "list the specified MMGen address and its balance" - return await self.listaddresses( mmgen_addrs = mmgen_addr, - minconf = minconf, - pager = pager, - showempty = showempty, - showbtcaddrs = showbtcaddr, - age_fmt = age_fmt, - ) - - async def listaddresses( self, - mmgen_addrs:'(range or list)' = '', - minconf = 1, - showempty = False, - pager = False, - showbtcaddrs = True, - all_labels = False, - sort: _options_annot_str(['reverse','age']) = '', - age_fmt: _options_annot_str(TwCommon.age_fmts) = 'confs', - ): - "list MMGen addresses and their balances" - show_age = bool(age_fmt) - - if sort: - sort = set(sort.split(',')) - sort_params = {'reverse','age'} - if not sort.issubset(sort_params): - die(1,"The sort option takes the following parameters: '{}'".format( "','".join(sort_params) )) - - usr_addr_list = [] - if mmgen_addrs: - a = mmgen_addrs.rsplit(':',1) - if len(a) != 2: - die(1, - f'{mmgen_addrs}: invalid address list argument ' + - '(must be in form :[:])' ) - usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])] - - from .twaddrs import TwAddrList - al = await TwAddrList(self.proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels) - if not al: - die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty]) - return await al.format(showbtcaddrs,sort,show_age,age_fmt or 'confs') - - async def twview( self, - pager = False, - reverse = False, - wide = False, - minconf = 1, - sort = 'age', - age_fmt: _options_annot_str(TwCommon.age_fmts) = 'confs', - show_mmid = True, - wide_show_confs = True): - "view tracking wallet" - from .twuo import TwUnspentOutputs - twuo = await TwUnspentOutputs(self.proto,minconf=minconf) - await twuo.get_unspent_data(reverse_sort=reverse) - twuo.age_fmt = age_fmt - twuo.show_mmid = show_mmid - if wide: - ret = twuo.format_for_printing(color=True,show_confs=wide_show_confs) - else: - ret = twuo.format_for_display() - del twuo.wallet - return await ret - - async def add_label(self,mmgen_or_coin_addr:str,label:str): - "add descriptive label for address in tracking wallet" - from .twctl import TrackingWallet - await (await TrackingWallet(self.proto,mode='w')).add_label(mmgen_or_coin_addr,label,on_fail='raise') - return True - - async def remove_label(self,mmgen_or_coin_addr:str): - "remove descriptive label for address in tracking wallet" - await self.add_label(mmgen_or_coin_addr,'') - return True - - async def remove_address(self,mmgen_or_coin_addr:str): - "remove an address from tracking wallet" - from .twctl import TrackingWallet - ret = await (await TrackingWallet(self.proto,mode='w')).remove_address(mmgen_or_coin_addr) # returns None on failure - if ret: - msg(f'Address {ret!r} deleted from tracking wallet') - return ret - -class tool_api( - MMGenToolCmdUtil, - MMGenToolCmdCoin, - MMGenToolCmdMnemonic, - ): - """ - API providing access to a subset of methods from the mmgen.tool module - - Example: - from mmgen.tool import tool_api - tool = tool_api() - - # Set the coin and network: - tool.init_coin('btc','mainnet') - - # Print available address types: - tool.print_addrtypes() - - # Set the address type: - tool.addrtype = 'segwit' - - # Disable user entropy gathering (optional, reduces security): - tool.usr_randchars = 0 - - # Generate a random BTC segwit keypair: - wif,addr = tool.randpair() - - # Set coin, network and address type: - tool.init_coin('ltc','testnet') - tool.addrtype = 'bech32' - - # Generate a random LTC testnet Bech32 keypair: - wif,addr = tool.randpair() - """ - - def __init__(self): - """ - Initializer - takes no arguments - """ - opts.UserOpts._reset_ok += ('usr_randchars',) - if not opt._lock: - opts.init() - super().__init__() - - def init_coin(self,coinsym,network): - """ - Initialize a coin/network pair - Valid choices for coins: one of the symbols returned by the 'coins' attribute - Valid choices for network: 'mainnet','testnet','regtest' - """ - from .protocol import init_proto,init_genonly_altcoins - altcoin_trust_level = init_genonly_altcoins(coinsym,testnet=network in ('testnet','regtest')) - warn_altcoins(coinsym,altcoin_trust_level) - self.proto = init_proto(coinsym,network=network) - return self.proto - - @property - def coins(self): - """The available coins""" - from .protocol import CoinProtocol - from .altcoin import CoinInfo - return sorted(set( - [c.upper() for c in CoinProtocol.coins] - + [c.symbol for c in CoinInfo.get_supported_coins(self.proto.network)] - )) - - @property - def coin(self): - """The currently configured coin""" - return self.proto.coin - - @property - def network(self): - """The currently configured network""" - return self.proto.network - - @property - def addrtypes(self): - """ - The available address types for current coin/network pair. The - first-listed is the default - """ - return [MMGenAddrType(proto=self.proto,id_str=id_str).name for id_str in self.proto.mmtypes] - - def print_addrtypes(self): - """ - Print the available address types for current coin/network pair along with - a description. The first-listed is the default - """ - for t in [MMGenAddrType(proto=self.proto,id_str=id_str) for id_str in self.proto.mmtypes]: - print(f'{t.name:<12} - {t.desc}') - - @property - def addrtype(self): - """The currently configured address type (is assignable)""" - return self.mmtype - - @addrtype.setter - def addrtype(self,val): - self.mmtype = MMGenAddrType(self.proto,val) - - @property - def usr_randchars(self): - """ - The number of keystrokes of entropy to be gathered from the user. - Setting to zero disables user entropy gathering. - """ - return opt.usr_randchars - - @usr_randchars.setter - def usr_randchars(self,val): - opt.usr_randchars = val diff --git a/mmgen/tool/__init__.py b/mmgen/tool/__init__.py new file mode 100755 index 00000000..0f41cc31 --- /dev/null +++ b/mmgen/tool/__init__.py @@ -0,0 +1,4 @@ +# provide this for backwards compatibility: +def tool_api(*args,**kwargs): + from .api import tool_api + return tool_api(*args,**kwargs) diff --git a/mmgen/tool/api.py b/mmgen/tool/api.py new file mode 100755 index 00000000..a2e38f74 --- /dev/null +++ b/mmgen/tool/api.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/api.py: tool_api interface for the 'mmgen-tool' utility +""" + +from .common import tool_cmd_base +from .util import tool_cmd as util_cmds +from .coin import tool_cmd as coin_cmds +from .mnemonic import tool_cmd as mnemonic_cmds + +class tool_api( + util_cmds, + coin_cmds, + mnemonic_cmds, + tool_cmd_base ): + """ + API providing access to a subset of methods from the mmgen.tool module + + Example: + from mmgen.tool.api import tool_api + tool = tool_api() + + # Set the coin and network: + tool.init_coin('btc','mainnet') + + # Print available address types: + tool.print_addrtypes() + + # Set the address type: + tool.addrtype = 'segwit' + + # Disable user entropy gathering (optional, reduces security): + tool.usr_randchars = 0 + + # Generate a random BTC segwit keypair: + wif,addr = tool.randpair() + + # Set coin, network and address type: + tool.init_coin('ltc','testnet') + tool.addrtype = 'bech32' + + # Generate a random LTC testnet Bech32 keypair: + wif,addr = tool.randpair() + """ + + def __init__(self): + """ + Initializer - takes no arguments + """ + import mmgen.opts as opts + from ..opts import opt + opts.UserOpts._reset_ok += ('usr_randchars',) + if not opt._lock: + opts.init() + super().__init__() + + def init_coin(self,coinsym,network): + """ + Initialize a coin/network pair + Valid choices for coins: one of the symbols returned by the 'coins' attribute + Valid choices for network: 'mainnet','testnet','regtest' + """ + from ..protocol import init_proto,init_genonly_altcoins + from ..util import warn_altcoins + altcoin_trust_level = init_genonly_altcoins(coinsym,testnet=network in ('testnet','regtest')) + warn_altcoins(coinsym,altcoin_trust_level) + self.proto = init_proto(coinsym,network=network) + return self.proto + + @property + def coins(self): + """The available coins""" + from ..protocol import CoinProtocol + from ..altcoin import CoinInfo + return sorted(set( + [c.upper() for c in CoinProtocol.coins] + + [c.symbol for c in CoinInfo.get_supported_coins(self.proto.network)] + )) + + @property + def coin(self): + """The currently configured coin""" + return self.proto.coin + + @property + def network(self): + """The currently configured network""" + return self.proto.network + + @property + def addrtypes(self): + """ + The available address types for current coin/network pair. The + first-listed is the default + """ + from ..addr import MMGenAddrType + return [MMGenAddrType(proto=self.proto,id_str=id_str).name for id_str in self.proto.mmtypes] + + def print_addrtypes(self): + """ + Print the available address types for current coin/network pair along with + a description. The first-listed is the default + """ + from ..addr import MMGenAddrType + for t in [MMGenAddrType(proto=self.proto,id_str=id_str) for id_str in self.proto.mmtypes]: + print(f'{t.name:<12} - {t.desc}') + + @property + def addrtype(self): + """The currently configured address type (is assignable)""" + return self.mmtype + + @addrtype.setter + def addrtype(self,val): + from ..addr import MMGenAddrType + self.mmtype = MMGenAddrType(self.proto,val) + + @property + def usr_randchars(self): + """ + The number of keystrokes of entropy to be gathered from the user. + Setting to zero disables user entropy gathering. + """ + from ..opts import opt + return opt.usr_randchars + + @usr_randchars.setter + def usr_randchars(self,val): + from ..opts import opt + opt.usr_randchars = val diff --git a/mmgen/tool/coin.py b/mmgen/tool/coin.py new file mode 100755 index 00000000..3d158f0c --- /dev/null +++ b/mmgen/tool/coin.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/coin.py: Cryptocoin routines for the 'mmgen-tool' utility +""" + +from collections import namedtuple +generator_data = namedtuple('generator_data',['kg','ag']) + +from .common import tool_cmd_base + +from ..protocol import init_proto,init_genonly_altcoins,hash160 +from ..key import PrivKey +from ..addr import KeyGenerator,AddrGenerator,MMGenAddrType,CoinAddr + +class tool_cmd(tool_cmd_base): + """ + cryptocoin key/address utilities + + May require use of the '--coin', '--type' and/or '--testnet' options + + Examples: + mmgen-tool --coin=ltc --type=bech32 wif2addr + mmgen-tool --coin=zec --type=zcash_z randpair + """ + + def __init__(self,proto=None,mmtype=None): + + if proto: + self.proto = proto + else: + from ..protocol import init_proto_from_opts + self.proto = init_proto_from_opts() + + from ..opts import opt + self.mmtype = MMGenAddrType( + self.proto, + mmtype or opt.type or self.proto.dfl_mmtype ) + + from ..globalvars import g + if g.token: + self.proto.tokensym = g.token.upper() + + def _init_generators(self,arg=None): + return generator_data( + kg = KeyGenerator( self.proto, self.mmtype.pubkey_type ), + ag = AddrGenerator( self.proto, self.mmtype ), + ) + + def randwif(self): + "generate a random private key in WIF format" + from ..crypto import get_random + return PrivKey( + self.proto, + get_random(32), + pubkey_type = self.mmtype.pubkey_type, + compressed = self.mmtype.compressed ).wif + + def randpair(self): + "generate a random private key/address pair" + gd = self._init_generators() + from ..crypto import get_random + privkey = PrivKey( + self.proto, + get_random(32), + pubkey_type = self.mmtype.pubkey_type, + compressed = self.mmtype.compressed ) + return ( + privkey.wif, + gd.ag.to_addr( gd.kg.gen_data(privkey) )) + + def wif2hex(self,wifkey:'sstr'): + "convert a private key from WIF to hex format" + return PrivKey( + self.proto, + wif = wifkey ).hex() + + def hex2wif(self,privhex:'sstr'): + "convert a private key from hex to WIF format" + return PrivKey( + self.proto, + bytes.fromhex(privhex), + pubkey_type = self.mmtype.pubkey_type, + compressed = self.mmtype.compressed ).wif + + def wif2addr(self,wifkey:'sstr'): + "generate a coin address from a key in WIF format" + gd = self._init_generators() + privkey = PrivKey( + self.proto, + wif = wifkey ) + return gd.ag.to_addr( gd.kg.gen_data(privkey) ) + + def wif2redeem_script(self,wifkey:'sstr'): # new + "convert a WIF private key to a Segwit P2SH-P2WPKH redeem script" + assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' + gd = self._init_generators() + privkey = PrivKey( + self.proto, + wif = wifkey ) + return gd.ag.to_segwit_redeem_script( gd.kg.gen_data(privkey) ) + + def wif2segwit_pair(self,wifkey:'sstr'): + "generate both a Segwit P2SH-P2WPKH redeem script and address from WIF" + assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' + gd = self._init_generators() + data = gd.kg.gen_data(PrivKey( + self.proto, + wif = wifkey )) + return ( + gd.ag.to_segwit_redeem_script(data), + gd.ag.to_addr(data) ) + + def privhex2addr(self,privhex:'sstr',output_pubhex=False): + "generate coin address from raw private key data in hexadecimal format" + gd = self._init_generators() + pk = PrivKey( + self.proto, + bytes.fromhex(privhex), + compressed = self.mmtype.compressed, + pubkey_type = self.mmtype.pubkey_type ) + data = gd.kg.gen_data(pk) + return data.pubkey.hex() if output_pubhex else gd.ag.to_addr(data) + + def privhex2pubhex(self,privhex:'sstr'): # new + "generate a hex public key from a hex private key" + return self.privhex2addr(privhex,output_pubhex=True) + + def pubhex2addr(self,pubkeyhex:'sstr'): + "convert a hex pubkey to an address" + pubkey = bytes.fromhex(pubkeyhex) + if self.mmtype.name == 'segwit': + return self.proto.pubkey2segwitaddr( pubkey ) + else: + return self.pubhash2addr( hash160(pubkey).hex() ) + + def pubhex2redeem_script(self,pubkeyhex:'sstr'): # new + "convert a hex pubkey to a Segwit P2SH-P2WPKH redeem script" + assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' + return self.proto.pubkey2redeem_script( bytes.fromhex(pubkeyhex) ).hex() + + def redeem_script2addr(self,redeem_scripthex:'sstr'): # new + "convert a Segwit P2SH-P2WPKH redeem script to an address" + assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit' + assert redeem_scripthex[:4] == '0014', f'{redeem_scripthex!r}: invalid redeem script' + assert len(redeem_scripthex) == 44, f'{len(redeem_scripthex)//2} bytes: invalid redeem script length' + return self.pubhash2addr( hash160(bytes.fromhex(redeem_scripthex)).hex() ) + + def pubhash2addr(self,pubhashhex:'sstr'): + "convert public key hash to address" + pubhash = bytes.fromhex(pubhashhex) + if self.mmtype.name == 'bech32': + return self.proto.pubhash2bech32addr( pubhash ) + else: + return self.proto.pubhash2addr( pubhash, self.mmtype.addr_fmt=='p2sh' ) + + def addr2pubhash(self,addr:'sstr'): + "convert coin address to public key hash" + from ..tx import addr2pubhash + return addr2pubhash( self.proto, CoinAddr(self.proto,addr) ) + + def addr2scriptpubkey(self,addr:'sstr'): + "convert coin address to scriptPubKey" + from ..tx import addr2scriptPubKey + return addr2scriptPubKey( self.proto, CoinAddr(self.proto,addr) ) + + def scriptpubkey2addr(self,hexstr:'sstr'): + "convert scriptPubKey to coin address" + from ..tx import scriptPubKey2addr + return scriptPubKey2addr( self.proto, hexstr )[0] diff --git a/mmgen/tool/common.py b/mmgen/tool/common.py new file mode 100755 index 00000000..7c2875d0 --- /dev/null +++ b/mmgen/tool/common.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/common.py: Base class and shared routines for the 'mmgen-tool' utility +""" + +def options_annot_str(l): + return "(valid options: '{}')".format( "','".join(l) ) + +class tool_cmd_base: + + def __init__(self,proto=None,mmtype=None): + pass + + @property + def user_commands(self): + return {k:v for k,v in type(self).__dict__.items() if not k.startswith('_')} diff --git a/mmgen/tool/file.py b/mmgen/tool/file.py new file mode 100755 index 00000000..61672c1b --- /dev/null +++ b/mmgen/tool/file.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/file.py: Address and transaction file routines for the 'mmgen-tool' utility +""" + +from .common import tool_cmd_base,options_annot_str + +class tool_cmd(tool_cmd_base): + "utilities for viewing/checking MMGen address and transaction files" + + def __init__(self,proto=None,mmtype=None): + if proto: + self.proto = proto + else: + from ..protocol import init_proto_from_opts + self.proto = init_proto_from_opts() + + def _file_chksum(self,mmgen_addrfile,objname): + from ..opts import opt + from ..addrlist import AddrList,KeyAddrList + from ..passwdlist import PasswordList + verbose,yes,quiet = [bool(i) for i in (opt.verbose,opt.yes,opt.quiet)] + opt.verbose,opt.yes,opt.quiet = (False,True,True) + ret = locals()[objname](self.proto,mmgen_addrfile) + opt.verbose,opt.yes,opt.quiet = (verbose,yes,quiet) + if verbose: + from ..util import msg,capfirst + if ret.al_id.mmtype.name == 'password': + msg('Passwd fmt: {}\nPasswd len: {}\nID string: {}'.format( + capfirst(ret.pw_info[ret.pw_fmt].desc), + ret.pw_len, + ret.pw_id_str )) + else: + msg(f'Base coin: {ret.base_coin} {capfirst(ret.network)}') + msg(f'MMType: {capfirst(ret.al_id.mmtype.name)}') + msg( f'List length: {len(ret.data)}') + return ret.chksum + + def addrfile_chksum(self,mmgen_addrfile:str): + "compute checksum for MMGen address file" + return self._file_chksum(mmgen_addrfile,'AddrList') + + def keyaddrfile_chksum(self,mmgen_keyaddrfile:str): + "compute checksum for MMGen key-address file" + return self._file_chksum(mmgen_keyaddrfile,'KeyAddrList') + + def passwdfile_chksum(self,mmgen_passwdfile:str): + "compute checksum for MMGen password file" + return self._file_chksum(mmgen_passwdfile,'PasswordList') + + async def txview( + varargs_call_sig = { # hack to allow for multiple filenames + 'args': ( + 'mmgen_tx_file(s)', + 'pager', + 'terse', + 'sort', + 'filesort' ), + 'dfls': ( False, False, 'addr', 'mtime' ), + 'annots': { + 'mmgen_tx_file(s)': str, + 'sort': options_annot_str(['addr','raw']), + 'filesort': options_annot_str(['mtime','ctime','atime']), + } + }, + *infiles, + **kwargs ): + "show raw/signed MMGen transaction in human-readable form" + + terse = bool(kwargs.get('terse')) + tx_sort = kwargs.get('sort') or 'addr' + file_sort = kwargs.get('filesort') or 'mtime' + + from ..filename import MMGenFileList + from ..tx import MMGenTX + flist = MMGenFileList( infiles, ftype=MMGenTX ) + flist.sort_by_age( key=file_sort ) # in-place sort + + async def process_file(fn): + if fn.endswith(MMGenTX.Signed.ext): + tx = MMGenTX.Signed( + filename = fn, + quiet_open = True, + tw = await MMGenTX.Signed.get_tracking_wallet(fn) ) + else: + tx = MMGenTX.Unsigned( + filename = fn, + quiet_open = True ) + return tx.format_view( terse=terse, sort=tx_sort ) + + return ('—'*77+'\n').join([await process_file(fn) for fn in flist.names()]).rstrip() diff --git a/mmgen/tool/filecrypt.py b/mmgen/tool/filecrypt.py new file mode 100755 index 00000000..162993b0 --- /dev/null +++ b/mmgen/tool/filecrypt.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/filecrypt.py: File encryption/decryption routines for the 'mmgen-tool' utility +""" + +import os + +from .common import tool_cmd_base +from ..crypto import mmgen_encrypt,mmgen_decrypt,mmenc_ext +from ..fileutil import get_data_from_file,write_data_to_file + +class tool_cmd(tool_cmd_base): + """ + file encryption and decryption + + MMGen encryption suite: + * Key: Scrypt (user-configurable hash parameters, 32-byte salt) + * Enc: AES256_CTR, 16-byte rand IV, sha256 hash + 32-byte nonce + data + * The encrypted file is indistinguishable from random data + """ + def encrypt(self,infile:str,outfile='',hash_preset=''): + "encrypt a file" + data = get_data_from_file( infile, 'data for encryption', binary=True ) + enc_d = mmgen_encrypt( data, 'data', hash_preset ) + if not outfile: + outfile = f'{os.path.basename(infile)}.{mmenc_ext}' + write_data_to_file( outfile, enc_d, 'encrypted data', binary=True ) + return True + + def decrypt(self,infile:str,outfile='',hash_preset=''): + "decrypt a file" + enc_d = get_data_from_file( infile, 'encrypted data', binary=True ) + while True: + dec_d = mmgen_decrypt( enc_d, 'data', hash_preset ) + if dec_d: + break + msg('Trying again...') + if not outfile: + from ..util import remove_extension + o = os.path.basename(infile) + outfile = remove_extension(o,mmenc_ext) + if outfile == o: + outfile += '.dec' + write_data_to_file( outfile, dec_d, 'decrypted data', binary=True ) + return True diff --git a/mmgen/tool/fileutil.py b/mmgen/tool/fileutil.py new file mode 100755 index 00000000..62d50e98 --- /dev/null +++ b/mmgen/tool/fileutil.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/fileutil.py: File routines for the 'mmgen-tool' utility +""" + +import os + +from .common import tool_cmd_base +from ..util import msg,msg_r,qmsg,die,suf,parse_bytespec,make_full_path +from ..crypto import get_random,aesctr_iv_len + +class tool_cmd(tool_cmd_base): + "file utilities" + + def find_incog_data(self,filename:str,incog_id:str,keep_searching=False): + "Use an Incog ID to find hidden incognito wallet data" + + from hashlib import sha256 + from ..globalvars import g + + ivsize,bsize,mod = ( aesctr_iv_len, 4096, 4096*8 ) + n,carry = 0,b' '*ivsize + flgs = os.O_RDONLY|os.O_BINARY if g.platform == 'win' else os.O_RDONLY + f = os.open(filename,flgs) + for ch in incog_id: + if ch not in '0123456789ABCDEF': + die(2,f'{incog_id!r}: invalid Incog ID') + while True: + d = os.read(f,bsize) + if not d: + break + d = carry + d + for i in range(bsize): + if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == incog_id: + if n+i < ivsize: + continue + msg(f'\rIncog data for ID {incog_id} found at offset {n+i-ivsize}') + if not keep_searching: + import sys + sys.exit(0) + carry = d[len(d)-ivsize:] + n += bsize + if not n % mod: + msg_r(f'\rSearched: {n} bytes') + + msg('') + os.close(f) + return True + + def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False): + "write 'n' bytes of random data to specified file" + from threading import Thread + from queue import Queue + from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes + from cryptography.hazmat.backends import default_backend + + from ..opts import opt + + def encrypt_worker(wid): + ctr_init_val = os.urandom( aesctr_iv_len ) + c = Cipher( algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend() ) + encryptor = c.encryptor() + while True: + q2.put( encryptor.update(q1.get()) ) + q1.task_done() + + def output_worker(): + while True: + f.write( q2.get() ) + q2.task_done() + + nbytes = parse_bytespec(nbytes) + if opt.outdir: + outfile = make_full_path( opt.outdir, outfile ) + f = open(outfile,'wb') + + key = get_random(32) + q1,q2 = ( Queue(), Queue() ) + + for i in range(max(1,threads-2)): + t = Thread( target=encrypt_worker, args=[i] ) + t.daemon = True + t.start() + + t = Thread( target=output_worker ) + t.daemon = True + t.start() + + blk_size = 1024 * 1024 + for i in range(nbytes // blk_size): + if not i % 4: + msg_r(f'\rRead: {i * blk_size} bytes') + q1.put( os.urandom(blk_size) ) + + if nbytes % blk_size: + q1.put( os.urandom(nbytes % blk_size) ) + + q1.join() + q2.join() + f.close() + + fsize = os.stat(outfile).st_size + if fsize != nbytes: + die(3,f'{fsize}: incorrect random file size (should be {nbytes})') + + if not silent: + msg(f'\rRead: {nbytes} bytes') + qmsg(f'\r{nbytes} byte{suf(nbytes)} of random data written to file {outfile!r}') + + return True diff --git a/mmgen/tool/help.py b/mmgen/tool/help.py new file mode 100755 index 00000000..0eeaa4b4 --- /dev/null +++ b/mmgen/tool/help.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/help.py: Help screen routines for the 'mmgen-tool' utility +""" + +from .common import tool_cmd_base +import mmgen.main_tool as main_tool + +def main_help(): + + from ..util import pretty_format + + def do(): + for clsname,cmdlist in main_tool.mods.items(): + cls = main_tool.get_mod_cls(clsname) + cls_doc = cls.__doc__.strip().split('\n') + for l in cls_doc: + if l is cls_doc[0]: + l += ':' + l = l.replace('\t','',1) + if l: + l = l.replace('\t',' ') + yield l[0].upper() + l[1:] + else: + yield '' + yield '' + + max_w = max(map(len,cmdlist)) + + for cmdname in cmdlist: + code = getattr(cls,cmdname) + if code.__doc__: + yield ' {:{}} - {}'.format( + cmdname, + max_w, + pretty_format( + code.__doc__.strip().replace('\n\t\t',' '), + width = 79-(max_w+7), + pfx = ' '*(max_w+5)).lstrip() + ) + yield '' + + return '\n'.join(do()) + +def usage(cmdname=None,exit_val=1): + + m1 = """ + USAGE INFORMATION FOR MMGEN-TOOL COMMANDS: + + Unquoted arguments are mandatory + Quoted arguments are optional, default values will be used + Argument types and default values are shown in square brackets + """ + + m2 = """ + To force a command to read from STDIN instead of file (for commands taking + a filename as their first argument), substitute "-" for the filename. + + EXAMPLES: + + Generate a random Bech32 public/private keypair for LTC: + $ mmgen-tool -r0 --coin=ltc --type=bech32 randpair + + Generate a DASH compressed public key address from the supplied WIF key: + $ mmgen-tool --coin=dash --type=compressed wif2addr XJkVRC3eGKurc9Uzx1wfQoio3yqkmaXVqLMTa6y7s3M3jTBnmxfw + + Generate a well-known burn address: + $ mmgen-tool hextob58chk 000000000000000000000000000000000000000000 + + Generate a random 12-word seed phrase: + $ mmgen-tool -r0 mn_rand128 + + Same as above, but get additional entropy from user: + $ mmgen-tool mn_rand128 + + Encode bytes from a file to base 58: + $ mmgen-tool bytestob58 /etc/timezone pad=20 + + Reverse a hex string: + $ mmgen-tool hexreverse "deadbeefcafe" + + Same as above, but use a pipe: + $ echo "deadbeefcafe" | mmgen-tool hexreverse - + """ + + from ..util import Msg,Msg_r,fmt,die,capfirst + + if cmdname: + from ..globalvars import g + for mod,cmdlist in main_tool.mods.items(): + if cmdname in cmdlist: + cls = main_tool.get_mod_cls(mod) + p1 = fmt(capfirst(getattr(cls,cmdname).__doc__.strip()),strip_char='\t').strip() + Msg('{}{}\nUSAGE: {} {} {}'.format( + p1, + ('\n' if '\n' in p1 else ''), + g.prog_name,cmdname, + main_tool.create_call_sig(cmdname,cls)) + ) + break + else: + die(1,f'{cmdname!r}: no such tool command') + else: + Msg(fmt(m1,strip_char='\t')) + for clsname,cmdlist in main_tool.mods.items(): + cls = main_tool.get_mod_cls(clsname) + cls_info = cls.__doc__.strip().split('\n')[0] + Msg(' {}{}:\n'.format( cls_info[0].upper(), cls_info[1:] )) + max_w = max(map(len,cmdlist)) + for cmdname in cmdlist: + Msg(f' {cmdname:{max_w}} {main_tool.create_call_sig(cmdname,cls)}') + Msg('') + Msg_r(' ' + fmt(m2,strip_char='\t')) + + import sys + sys.exit(exit_val) + +class tool_cmd(tool_cmd_base): + "help/usage commands" + + def help(self,command_name=''): + "display usage information for a single command or all commands" + usage(command_name,exit_val=0) + + def usage(self,command_name=''): + "display usage information for a single command" + usage(command_name,exit_val=0) diff --git a/mmgen/tool/mnemonic.py b/mmgen/tool/mnemonic.py new file mode 100755 index 00000000..fb41d693 --- /dev/null +++ b/mmgen/tool/mnemonic.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/mnemonic.py: Mnemonic routines for the 'mmgen-tool' utility +""" + +from collections import namedtuple + +from .common import tool_cmd_base,options_annot_str + +from ..baseconv import baseconv +from ..xmrseed import xmrseed +from ..bip39 import bip39 + +dfl_mnemonic_fmt = 'mmgen' +mft = namedtuple('mnemonic_format',['fmt','pad','conv_cls']) +mnemonic_fmts = { + 'mmgen': mft( 'words', 'seed', baseconv ), + 'bip39': mft( 'bip39', None, bip39 ), + 'xmrseed': mft( 'xmrseed', None, xmrseed ), +} +mn_opts_disp = options_annot_str(mnemonic_fmts) + +class tool_cmd(tool_cmd_base): + """ + seed phrase utilities (valid formats: 'mmgen' (default), 'bip39', 'xmrseed') + + IMPORTANT NOTE: MMGen's default seed phrase format uses the Electrum + wordlist, however seed phrases are computed using a different algorithm + and are NOT Electrum-compatible! + + BIP39 support is fully compatible with the standard, allowing users to + import and export seed entropy from BIP39-compatible wallets. However, + users should be aware that BIP39 support does not imply BIP32 support! + MMGen uses its own key derivation scheme differing from the one described + by the BIP32 protocol. + + For Monero ('xmrseed') seed phrases, input data is reduced to a spendkey + before conversion so that a canonical seed phrase is produced. This is + required because Monero seeds, unlike ordinary wallet seeds, are tied + to a concrete key/address pair. To manually generate a Monero spendkey, + use the 'hex2wif' command. + """ + + @staticmethod + def _xmr_reduce(bytestr): + from ..protocol import init_proto + proto = init_proto('xmr') + if len(bytestr) != proto.privkey_len: + die(1,'{!r}: invalid bit length for Monero private key (must be {})'.format( + len(bytestr*8), + proto.privkey_len*8 )) + return proto.preprocess_key(bytestr,None) + + def _do_random_mn(self,nbytes:int,fmt:str): + assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32' + from ..crypto import get_random + randbytes = get_random(nbytes) + if fmt == 'xmrseed': + randbytes = self._xmr_reduce(randbytes) + from ..opts import opt + if opt.verbose: + from ..util import msg + msg(f'Seed: {randbytes.hex()}') + return self.hex2mn(randbytes.hex(),fmt=fmt) + + def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + "generate random 128-bit mnemonic seed phrase" + return self._do_random_mn(16,fmt) + + def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + "generate random 192-bit mnemonic seed phrase" + return self._do_random_mn(24,fmt) + + def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + "generate random 256-bit mnemonic seed phrase" + return self._do_random_mn(32,fmt) + + def hex2mn( self, hexstr:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ): + "convert a 16, 24 or 32-byte hexadecimal number to a mnemonic seed phrase" + if fmt == 'xmrseed': + hexstr = self._xmr_reduce(bytes.fromhex(hexstr)).hex() + f = mnemonic_fmts[fmt] + return ' '.join( f.conv_cls(fmt).fromhex(hexstr,f.pad) ) + + def mn2hex( self, seed_mnemonic:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ): + "convert a mnemonic seed phrase to a hexadecimal number" + f = mnemonic_fmts[fmt] + return f.conv_cls(fmt).tohex( seed_mnemonic.split(), f.pad ) + + def mn2hex_interactive( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, mn_len=24, print_mn=False ): + "convert an interactively supplied mnemonic seed phrase to a hexadecimal number" + from ..mn_entry import mn_entry + mn = mn_entry(fmt).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len,validate=False) + if print_mn: + from ..util import msg + msg(mn) + return self.mn2hex(seed_mnemonic=mn,fmt=fmt) + + def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + "show stats for mnemonic wordlist" + return mnemonic_fmts[fmt].conv_cls(fmt).check_wordlist() + + def mn_printlist( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, enum=False, pager=False ): + "print mnemonic wordlist" + ret = mnemonic_fmts[fmt].conv_cls(fmt).get_wordlist() + if enum: + ret = [f'{n:>4} {e}' for n,e in enumerate(ret)] + return '\n'.join(ret) diff --git a/mmgen/tool/rpc.py b/mmgen/tool/rpc.py new file mode 100755 index 00000000..c8570dca --- /dev/null +++ b/mmgen/tool/rpc.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/rpc.py: JSON/RPC routines for the 'mmgen-tool' utility +""" + +from .common import tool_cmd_base,options_annot_str +from ..tw import TwCommon + +class tool_cmd(tool_cmd_base): + "tracking wallet commands using the JSON-RPC interface" + + def __init__(self,proto=None,mmtype=None): + + if proto: + self.proto = proto + else: + from ..protocol import init_proto_from_opts + self.proto = init_proto_from_opts() + + from ..globalvars import g + if g.token: + self.proto.tokensym = g.token.upper() + + async def daemon_version(self): + "print coin daemon version" + from ..rpc import rpc_init + r = await rpc_init( self.proto, ignore_daemon_version=True ) + return f'{r.daemon.coind_name} version {r.daemon_version} ({r.daemon_version_str})' + + async def getbalance(self,minconf=1,quiet=False,pager=False): + "list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet" + from ..twbal import TwGetBalance + return (await TwGetBalance(self.proto,minconf,quiet)).format() + + async def listaddress(self, + mmgen_addr:str, + minconf = 1, + pager = False, + showempty = True, + showbtcaddr = True, + age_fmt: options_annot_str(TwCommon.age_fmts) = 'confs' ): + "list the specified MMGen address and its balance" + + return await self.listaddresses( + mmgen_addrs = mmgen_addr, + minconf = minconf, + pager = pager, + showempty = showempty, + showbtcaddrs = showbtcaddr, + age_fmt = age_fmt ) + + async def listaddresses(self, + mmgen_addrs:'(range or list)' = '', + minconf = 1, + showempty = False, + pager = False, + showbtcaddrs = True, + all_labels = False, + sort: options_annot_str(['reverse','age']) = '', + age_fmt: options_annot_str(TwCommon.age_fmts) = 'confs' ): + "list MMGen addresses and their balances" + + show_age = bool(age_fmt) + + if sort: + sort = set(sort.split(',')) + sort_params = {'reverse','age'} + if not sort.issubset( sort_params ): + from ..util import die + die(1,"The sort option takes the following parameters: '{}'".format( "','".join(sort_params) )) + + usr_addr_list = [] + if mmgen_addrs: + a = mmgen_addrs.rsplit(':',1) + if len(a) != 2: + from ..util import die + die(1, + f'{mmgen_addrs}: invalid address list argument ' + + '(must be in form :[:])' ) + from ..addr import MMGenID + from ..addrlist import AddrIdxList + usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])] + + from ..twaddrs import TwAddrList + al = await TwAddrList( self.proto, usr_addr_list, minconf, showempty, showbtcaddrs, all_labels ) + if not al: + from ..util import die + die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty]) + return await al.format( showbtcaddrs, sort, show_age, age_fmt or 'confs' ) + + async def twview(self, + pager = False, + reverse = False, + wide = False, + minconf = 1, + sort = 'age', + age_fmt: options_annot_str(TwCommon.age_fmts) = 'confs', + show_mmid = True, + wide_show_confs = True ): + "view tracking wallet" + + from ..twuo import TwUnspentOutputs + twuo = await TwUnspentOutputs(self.proto,minconf=minconf) + await twuo.get_unspent_data(reverse_sort=reverse) + twuo.age_fmt = age_fmt + twuo.show_mmid = show_mmid + if wide: + ret = twuo.format_for_printing( color=True, show_confs=wide_show_confs ) + else: + ret = twuo.format_for_display() + del twuo.wallet + return await ret + + async def add_label(self,mmgen_or_coin_addr:str,label:str): + "add descriptive label for address in tracking wallet" + from ..twctl import TrackingWallet + await (await TrackingWallet(self.proto,mode='w')).add_label( mmgen_or_coin_addr, label, on_fail='raise' ) + return True + + async def remove_label(self,mmgen_or_coin_addr:str): + "remove descriptive label for address in tracking wallet" + await self.add_label( mmgen_or_coin_addr, '' ) + return True + + async def remove_address(self,mmgen_or_coin_addr:str): + "remove an address from tracking wallet" + from ..twctl import TrackingWallet + ret = await (await TrackingWallet(self.proto,mode='w')).remove_address(mmgen_or_coin_addr) # returns None on failure + if ret: + from ..util import msg + msg(f'Address {ret!r} deleted from tracking wallet') + return ret diff --git a/mmgen/tool/util.py b/mmgen/tool/util.py new file mode 100755 index 00000000..6b79ac70 --- /dev/null +++ b/mmgen/tool/util.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/util.py: Utility commands for the 'mmgen-tool' utility +""" + +from .common import tool_cmd_base + +class tool_cmd(tool_cmd_base): + "general string conversion and hashing utilities" + + def bytespec(self,dd_style_byte_specifier:str): + "convert a byte specifier such as '1GB' into an integer" + from ..util import parse_bytespec + return parse_bytespec(dd_style_byte_specifier) + + def to_bytespec(self, + n: int, + dd_style_byte_specifier: str, + fmt = '0.2', + print_sym = True ): + "convert an integer to a byte specifier such as '1GB'" + from ..util import int2bytespec + return int2bytespec( n, dd_style_byte_specifier, fmt, print_sym ) + + def randhex(self,nbytes='32'): + "print 'n' bytes (default 32) of random data in hex format" + from ..crypto import get_random + return get_random( int(nbytes) ).hex() + + def hexreverse(self,hexstr:'sstr'): + "reverse bytes of a hexadecimal string" + return bytes.fromhex( hexstr.strip() )[::-1].hex() + + def hexlify(self,infile:str): + "convert bytes in file to hexadecimal (use '-' for stdin)" + from ..fileutil import get_data_from_file + data = get_data_from_file( infile, dash=True, quiet=True, binary=True ) + return data.hex() + + def unhexlify(self,hexstr:'sstr'): + "convert hexadecimal value to bytes (warning: outputs binary data)" + return bytes.fromhex(hexstr) + + def hexdump(self,infile:str,cols=8,line_nums='hex'): + "create hexdump of data from file (use '-' for stdin)" + from ..fileutil import get_data_from_file + from ..util import pretty_hexdump + data = get_data_from_file( infile, dash=True, quiet=True, binary=True ) + return pretty_hexdump( data, cols=cols, line_nums=line_nums ).rstrip() + + def unhexdump(self,infile:str): + "decode hexdump from file (use '-' for stdin) (warning: outputs binary data)" + from ..globalvars import g + if g.platform == 'win': + import msvcrt + msvcrt.setmode( sys.stdout.fileno(), os.O_BINARY ) + from ..fileutil import get_data_from_file + from ..util import decode_pretty_hexdump + hexdata = get_data_from_file( infile, dash=True, quiet=True ) + return decode_pretty_hexdump(hexdata) + + def hash160(self,hexstr:'sstr'): + "compute ripemd160(sha256(data)) (convert hex pubkey to hex addr)" + from ..protocol import hash160 + return hash160( bytes.fromhex(hexstr) ).hex() + + def hash256(self,string_or_bytes:str,file_input=False,hex_input=False): # TODO: handle stdin + "compute sha256(sha256(data)) (double sha256)" + from hashlib import sha256 + if file_input: + from ..fileutil import get_data_from_file + b = get_data_from_file( string_or_bytes, binary=True ) + elif hex_input: + from ..util import decode_pretty_hexdump + b = decode_pretty_hexdump(string_or_bytes) + else: + b = string_or_bytes + return sha256(sha256(b.encode()).digest()).hexdigest() + + def id6(self,infile:str): + "generate 6-character MMGen ID for a file (use '-' for stdin)" + from ..util import make_chksum_6 + from ..fileutil import get_data_from_file + return make_chksum_6( + get_data_from_file( infile, dash=True, quiet=True, binary=True )) + + def str2id6(self,string:'sstr'): # retain ignoring of space for backwards compat + "generate 6-character MMGen ID for a string, ignoring spaces" + from ..util import make_chksum_6 + return make_chksum_6( ''.join(string.split()) ) + + def id8(self,infile:str): + "generate 8-character MMGen ID for a file (use '-' for stdin)" + from ..util import make_chksum_8 + from ..fileutil import get_data_from_file + return make_chksum_8( + get_data_from_file( infile, dash=True, quiet=True, binary=True )) + + def randb58(self,nbytes=32,pad=0): + "generate random data (default: 32 bytes) and convert it to base 58" + from ..crypto import get_random + from ..baseconv import baseconv + return baseconv('b58').frombytes( get_random(nbytes), pad=pad, tostr=True ) + + def bytestob58(self,infile:str,pad=0): + "convert bytes to base 58 (supply data via STDIN)" + from ..fileutil import get_data_from_file + from ..baseconv import baseconv + data = get_data_from_file( infile, dash=True, quiet=True, binary=True ) + return baseconv('b58').frombytes( data, pad=pad, tostr=True ) + + def b58tobytes(self,b58num:'sstr',pad=0): + "convert a base 58 number to bytes (warning: outputs binary data)" + from ..baseconv import baseconv + return baseconv('b58').tobytes( b58num, pad=pad ) + + def hextob58(self,hexstr:'sstr',pad=0): + "convert a hexadecimal number to base 58" + from ..baseconv import baseconv + return baseconv('b58').fromhex( hexstr, pad=pad, tostr=True ) + + def b58tohex(self,b58num:'sstr',pad=0): + "convert a base 58 number to hexadecimal" + from ..baseconv import baseconv + return baseconv('b58').tohex( b58num, pad=pad ) + + def hextob58chk(self,hexstr:'sstr'): + "convert a hexadecimal number to base58-check encoding" + from ..protocol import _b58chk_encode + return _b58chk_encode( bytes.fromhex(hexstr) ) + + def b58chktohex(self,b58chk_num:'sstr'): + "convert a base58-check encoded number to hexadecimal" + from ..protocol import _b58chk_decode + return _b58chk_decode(b58chk_num).hex() + + def hextob32(self,hexstr:'sstr',pad=0): + "convert a hexadecimal number to MMGen's flavor of base 32" + from ..baseconv import baseconv + return baseconv('b32').fromhex( hexstr, pad, tostr=True ) + + def b32tohex(self,b32num:'sstr',pad=0): + "convert an MMGen-flavor base 32 number to hexadecimal" + from ..baseconv import baseconv + return baseconv('b32').tohex( b32num.upper(), pad ) + + def hextob6d(self,hexstr:'sstr',pad=0,add_spaces=True): + "convert a hexadecimal number to die roll base6 (base6d)" + from ..baseconv import baseconv + from ..util import block_format + ret = baseconv('b6d').fromhex(hexstr,pad,tostr=True) + return block_format( ret, gw=5, cols=None ).strip() if add_spaces else ret + + def b6dtohex(self,b6d_num:'sstr',pad=0): + "convert a die roll base6 (base6d) number to hexadecimal" + from ..baseconv import baseconv + from ..util import remove_whitespace + return baseconv('b6d').tohex( remove_whitespace(b6d_num), pad ) diff --git a/mmgen/tool/wallet.py b/mmgen/tool/wallet.py new file mode 100755 index 00000000..172be958 --- /dev/null +++ b/mmgen/tool/wallet.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +tool/wallet.py: Wallet routines for the 'mmgen-tool' utility +""" + +from .common import tool_cmd_base + +from ..opts import opt +from ..fileutil import get_seed_file +from ..subseed import SubSeedList +from ..seedsplit import MasterShareIdx +from ..wallet import Wallet + +class tool_cmd(tool_cmd_base): + "key, address or subseed generation from an MMGen wallet" + + def __init__(self,proto=None,mmtype=None): + if proto: + self.proto = proto + else: + from ..protocol import init_proto_from_opts + self.proto = init_proto_from_opts() + + def get_subseed(self,subseed_idx:str,wallet=''): + "get the Seed ID of a single subseed by Subseed Index for default or specified wallet" + opt.quiet = True + sf = get_seed_file([wallet] if wallet else [],1) + return Wallet(sf).seed.subseed(subseed_idx).sid + + def get_subseed_by_seed_id(self,seed_id:str,wallet='',last_idx=SubSeedList.dfl_len): + "get the Subseed Index of a single subseed by Seed ID for default or specified wallet" + opt.quiet = True + sf = get_seed_file([wallet] if wallet else [],1) + ret = Wallet(sf).seed.subseed_by_seed_id( seed_id, last_idx ) + return ret.ss_idx if ret else None + + def list_subseeds(self,subseed_idx_range:str,wallet=''): + "list a range of subseed Seed IDs for default or specified wallet" + opt.quiet = True + sf = get_seed_file([wallet] if wallet else [],1) + from ..subseed import SubSeedIdxRange + return Wallet(sf).seed.subseeds.format( *SubSeedIdxRange(subseed_idx_range) ) + + def list_shares(self, + share_count: int, + id_str = 'default', + master_share: f'(min:1, max:{MasterShareIdx.max_val}, 0=no master share)' = 0, + wallet = '' ): + "list the Seed IDs of the shares resulting from a split of default or specified wallet" + opt.quiet = True + sf = get_seed_file([wallet] if wallet else [],1) + return Wallet(sf).seed.split( share_count, id_str, master_share ).format() + + def gen_key(self,mmgen_addr:str,wallet=''): + "generate a single MMGen WIF key from default or specified wallet" + return self.gen_addr( mmgen_addr, wallet, target='wif' ) + + def gen_addr(self,mmgen_addr:str,wallet='',target='addr'): + "generate a single MMGen address from default or specified wallet" + from ..addr import MMGenID + from ..addrlist import AddrList,AddrIdxList + addr = MMGenID( self.proto, mmgen_addr ) + opt.quiet = True + sf = get_seed_file([wallet] if wallet else [],1) + ss = Wallet(sf) + if ss.seed.sid != addr.sid: + from ..util import die + die(1,f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})') + d = AddrList( + proto = self.proto, + seed = ss.seed, + addr_idxs = AddrIdxList(str(addr.idx)), + mmtype = addr.mmtype ).data[0] + return d.sec.wif if target == 'wif' else d.addr diff --git a/mmgen/wallet.py b/mmgen/wallet.py index 7d99dce8..56c84744 100755 --- a/mmgen/wallet.py +++ b/mmgen/wallet.py @@ -1156,8 +1156,8 @@ harder to find, you're advised to choose a much larger file size than this. break msg(f'File size must be an integer no less than {min_fsize}') - from .tool import MMGenToolCmdFileUtil - MMGenToolCmdFileUtil().rand2file(fn,str(fsize)) + from .tool.fileutil import tool_cmd + tool_cmd().rand2file(fn,str(fsize)) check_offset = False else: die(1,'Exiting at user request') diff --git a/test/misc/tool_api_test.py b/test/misc/tool_api_test.py index 66db0830..68b28957 100755 --- a/test/misc/tool_api_test.py +++ b/test/misc/tool_api_test.py @@ -63,7 +63,7 @@ def test_triplet(tool,coin,network,addrtype,key_idx,wif_chk,addr_chk): def run_test(): - from mmgen.tool import tool_api + from mmgen.tool.api import tool_api tool = tool_api() tool.coins diff --git a/test/overlay/__init__.py b/test/overlay/__init__.py index 016694ac..9f9aa601 100644 --- a/test/overlay/__init__.py +++ b/test/overlay/__init__.py @@ -36,6 +36,7 @@ def overlay_setup(repo_root): 'mmgen', 'mmgen.data', 'mmgen.share', + 'mmgen.tool', 'mmgen.altcoins', 'mmgen.altcoins.eth', 'mmgen.altcoins.eth.pyethereum', diff --git a/test/test_py_d/ts_ethdev.py b/test/test_py_d/ts_ethdev.py index 90fe3208..f5b2e652 100755 --- a/test/test_py_d/ts_ethdev.py +++ b/test/test_py_d/ts_ethdev.py @@ -335,7 +335,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared): if g.daemon_id == 'erigon': from hashlib import sha256 - from mmgen.tool import tool_api + from mmgen.tool.api import tool_api devkey = sha256(b'erigon devnet key').hexdigest() t = tool_api() t.init_coin(g.coin,'regtest') @@ -837,9 +837,9 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared): async def token_transfer_ops(self,op,amt=1000): self.spawn('',msg_only=True) sid = dfl_sid - from mmgen.tool import MMGenToolCmdWallet + from mmgen.tool.wallet import tool_cmd usr_mmaddrs = [f'{sid}:E:{i}' for i in (11,21)] - usr_addrs = [MMGenToolCmdWallet(proto=self.proto).gen_addr(addr,dfl_words_file) for addr in usr_mmaddrs] + usr_addrs = [tool_cmd(proto=self.proto).gen_addr(addr,dfl_words_file) for addr in usr_mmaddrs] from mmgen.altcoins.eth.contract import TokenResolve from mmgen.altcoins.eth.tx import EthereumMMGenTX as etx diff --git a/test/test_py_d/ts_regtest.py b/test/test_py_d/ts_regtest.py index b2d0cbe3..0cbb1a95 100755 --- a/test/test_py_d/ts_regtest.py +++ b/test/test_py_d/ts_regtest.py @@ -132,7 +132,7 @@ rt_data = { } def create_burn_addr(proto): - from mmgen.tool import tool_api + from mmgen.tool.api import tool_api t = tool_api() t.init_coin(proto.coin,proto.network) t.addrtype = 'compressed' @@ -808,7 +808,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared): f'Replacing transactions:\s+{new_txid}' ) def _gen_pairs(self,n): - from mmgen.tool import tool_api + from mmgen.tool.api import tool_api t = tool_api() t.init_coin(self.proto.coin,self.proto.network) diff --git a/test/test_py_d/ts_tool.py b/test/test_py_d/ts_tool.py index 81d1d045..181884f7 100755 --- a/test/test_py_d/ts_tool.py +++ b/test/test_py_d/ts_tool.py @@ -32,14 +32,14 @@ class TestSuiteTool(TestSuiteMain,TestSuiteBase): def tool_rand2file(self): outfile = os.path.join(self.tmpdir,'rand2file.out') - from mmgen.tool import MMGenToolCmdUtil + from mmgen.util import parse_bytespec for nbytes in ('1','1023','1K','1048575','1M','1048577','123M'): t = self.spawn( 'mmgen-tool', ['-d',self.tmpdir,'-r0','rand2file','rand2file.out',nbytes], extra_desc='({} byte{})'.format( nbytes, - suf(MMGenToolCmdUtil().bytespec(nbytes)) ) + suf(parse_bytespec(nbytes)) ) ) t.expect('random data written to file') t.read() diff --git a/test/test_py_d/ts_xmrwallet.py b/test/test_py_d/ts_xmrwallet.py index e925828b..1545b409 100755 --- a/test/test_py_d/ts_xmrwallet.py +++ b/test/test_py_d/ts_xmrwallet.py @@ -560,7 +560,7 @@ class TestSuiteXMRWallet(TestSuiteBase): die(2,'Restart attempt limit exceeded') async def send_random_txs(): - from mmgen.tool import tool_api + from mmgen.tool.api import tool_api t = tool_api() t.init_coin('XMR','testnet') t.usr_randchars = 0 diff --git a/test/tooltest2.py b/test/tooltest2.py index cc61c798..280d9418 100755 --- a/test/tooltest2.py +++ b/test/tooltest2.py @@ -23,7 +23,7 @@ test/tooltest2.py: Simple tests for the 'mmgen-tool' utility # TODO: move all non-interactive 'mmgen-tool' tests in 'test.py' here # TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?) -import sys,os,time +import sys,os,time,importlib from subprocess import run,PIPE from decimal import Decimal @@ -757,10 +757,105 @@ tests = { }, } -coin_dependent_groups = ('Coin','File') # TODO: do this as attr of each group in tool.py +coin_dependent_groups = ('Coin','File') -async def run_test(gid,cmd_name): +def fork_cmd(cmd_name,args,out,opts,stdin_input): + cmd = ( + tool_cmd_preargs + + tool_cmd + + (opts or []) + + [cmd_name] + args + ) + vmsg('{} {}'.format( + green('Executing'), + cyan(' '.join(cmd)) )) + cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE) + try: + cmd_out = cp.stdout.decode() + except: + cmd_out = cp.stdout + if cp.stderr: + vmsg(cp.stderr.strip().decode()) + if cp.returncode != 0: + import re + m = re.match(b'tool command returned (None|False)'+NL.encode(),cp.stderr) + if m: + return { b'None': None, b'False': False }[m.group(1)] + else: + ydie(1,f'Spawned program exited with error: {cp.stderr}') + + return cmd_out.strip() + +async def call_method(cls,method,cmd_name,args,out,opts,mmtype,stdin_input): + vmsg('{}: {}{}'.format(purple('Running'), + ' '.join([cmd_name]+[repr(e) for e in args]), + ' '+mmtype if mmtype else '' )) + aargs,kwargs = main_tool.process_args(cmd_name,args,cls) + oq_save = bool(opt.quiet) + if not opt.verbose: + opt.quiet = True + if stdin_input: + fd0,fd1 = os.pipe() + if os.fork(): # parent + os.close(fd1) + stdin_save = os.dup(0) + os.dup2(fd0,0) + cmd_out = method(*aargs,**kwargs) + os.dup2(stdin_save,0) + os.wait() + opt.quiet = oq_save + return cmd_out + else: # child + os.close(fd0) + os.write(fd1,stdin_input) + vmsg(f'Input: {stdin_input!r}') + sys.exit(0) + else: + ret = method(*aargs,**kwargs) + if type(ret).__name__ == 'coroutine': + ret = await ret + opt.quiet = oq_save + return ret + +def tool_api(cls,cmd_name,args,out,opts): + from mmgen.tool.api import tool_api + tool = tool_api() + if opts: + for o in opts: + if o.startswith('--type='): + tool.addrtype = o.split('=')[1] + pargs,kwargs = main_tool.process_args(cmd_name,args,cls) + return getattr(tool,cmd_name)(*pargs,**kwargs) + +def check_output(out,chk): + if isinstance(chk,str): + chk = chk.encode() + if isinstance(out,int): + out = str(out).encode() + if isinstance(out,str): + out = out.encode() + err_fs = "Output ({!r}) doesn't match expected output ({!r})" + try: outd = out.decode() + except: outd = None + + if type(chk).__name__ == 'function': + assert chk(outd), f'{chk.__name__}({outd}) failed!' + elif type(chk) == dict: + for k,v in chk.items(): + if k == 'boolfunc': + assert v(outd), f'{v.__name__}({outd}) failed!' + elif k == 'value': + assert outd == v, err_fs.format(outd,v) + else: + outval = getattr(__builtins__,k)(out) + if outval != v: + die(1,f'{k}({out}) returned {outval}, not {v}!') + elif chk is not None: + assert out == chk, err_fs.format(out,chk) + +async def run_test(cls,gid,cmd_name): data = tests[gid][cmd_name] + # behavior is like test.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC if gid in coin_dependent_groups: k = '{}_{}'.format( @@ -776,85 +871,14 @@ async def run_test(gid,cmd_name): if proto.coin != 'BTC' or proto.testnet: return m2 = '' + m = '{} {}{}'.format( purple('Testing'), - cmd_name if opt.names else docstring_head(tc[cmd_name]), + cmd_name if opt.names else docstring_head(getattr(cls,cmd_name)), m2 ) msg_r(green(m)+'\n' if opt.verbose else m) - def fork_cmd(cmd_name,args,out,opts): - cmd = ( - tool_cmd_preargs + - tool_cmd + - (opts or []) + - [cmd_name] + args - ) - vmsg('{} {}'.format( - green('Executing'), - cyan(' '.join(cmd)) )) - cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE) - try: - cmd_out = cp.stdout.decode() - except: - cmd_out = cp.stdout - if cp.stderr: - vmsg(cp.stderr.strip().decode()) - if cp.returncode != 0: - import re - m = re.match(b'tool command returned (None|False)'+NL.encode(),cp.stderr) - if m: - return { b'None': None, b'False': False }[m.group(1)] - else: - ydie(1,f'Spawned program exited with error: {cp.stderr}') - - return cmd_out.strip() - - async def run_func(cmd_name,args,out,opts,mmtype): - vmsg('{}: {}{}'.format(purple('Running'), - ' '.join([cmd_name]+[repr(e) for e in args]), - ' '+mmtype if mmtype else '' )) - aargs,kwargs = tool._process_args(cmd_name,args) - tm = tool.MMGenToolCmdMeta - cls_name = tm.classname(tm,cmd_name) - tobj = getattr(tool,cls_name)(mmtype=mmtype) - method = getattr(tobj,cmd_name) - oq_save = bool(opt.quiet) - if not opt.verbose: - opt.quiet = True - if stdin_input: - fd0,fd1 = os.pipe() - if os.fork(): # parent - os.close(fd1) - stdin_save = os.dup(0) - os.dup2(fd0,0) - cmd_out = method(*aargs,**kwargs) - os.dup2(stdin_save,0) - os.wait() - opt.quiet = oq_save - return cmd_out - else: # child - os.close(fd0) - os.write(fd1,stdin_input) - vmsg(f'Input: {stdin_input!r}') - sys.exit(0) - else: - ret = method(*aargs,**kwargs) - if type(ret).__name__ == 'coroutine': - ret = await ret - opt.quiet = oq_save - return ret - - def tool_api(cmd_name,args,out,opts): - from mmgen.tool import tool_api,_process_args - tool = tool_api() - if opts: - for o in opts: - if o.startswith('--type='): - tool.addrtype = o.split('=')[1] - pargs,kwargs = _process_args(cmd_name,args) - return getattr(tool,cmd_name)(*pargs,**kwargs) - for d in data: args,out,opts,mmtype = d + tuple([None] * (4-len(d))) stdin_input = None @@ -865,43 +889,20 @@ async def run_test(gid,cmd_name): if opt.tool_api: if args and args[0 ]== '-': continue - cmd_out = tool_api(cmd_name,args,out,opts) + cmd_out = tool_api(cls,cmd_name,args,out,opts) elif opt.fork: - cmd_out = fork_cmd(cmd_name,args,out,opts) + cmd_out = fork_cmd(cmd_name,args,out,opts,stdin_input) else: if stdin_input and g.platform == 'win': msg('Skipping for MSWin - no os.fork()') continue - cmd_out = await run_func(cmd_name,args,out,opts,mmtype) + method = getattr(cls(proto=proto,mmtype=mmtype),cmd_name) + cmd_out = await call_method(cls,method,cmd_name,args,out,opts,mmtype,stdin_input) - try: vmsg(f'Output:\n{cmd_out}\n') - except: vmsg(f'Output:\n{cmd_out!r}\n') - - def check_output(out,chk): - if isinstance(chk,str): - chk = chk.encode() - if isinstance(out,int): - out = str(out).encode() - if isinstance(out,str): - out = out.encode() - err_fs = "Output ({!r}) doesn't match expected output ({!r})" - try: outd = out.decode() - except: outd = None - - if type(chk).__name__ == 'function': - assert chk(outd), f'{chk.__name__}({outd}) failed!' - elif type(chk) == dict: - for k,v in chk.items(): - if k == 'boolfunc': - assert v(outd), f'{v.__name__}({outd}) failed!' - elif k == 'value': - assert outd == v, err_fs.format(outd,v) - else: - outval = getattr(__builtins__,k)(out) - if outval != v: - die(1,f'{k}({out}) returned {outval}, not {v}!') - elif chk is not None: - assert out == chk, err_fs.format(out,chk) + try: + vmsg(f'Output:\n{cmd_out}\n') + except: + vmsg(f'Output:\n{cmd_out!r}\n') if type(out) == tuple and type(out[0]).__name__ == 'function': func_out = out[0](cmd_out) @@ -917,7 +918,9 @@ async def run_test(gid,cmd_name): else: check_output(cmd_out,out) - if not opt.verbose: msg_r('.') + if not opt.verbose: + msg_r('.') + if not opt.verbose: msg('OK') @@ -926,28 +929,30 @@ def docstring_head(obj): async def do_group(gid): desc = f'command group {gid!r}' + cls = main_tool.get_mod_cls(gid.lower()) qmsg(blue('Testing ' + desc if opt.names else - ( docstring_head(tc.classes['MMGenToolCmd'+gid]) or desc ) + ( docstring_head(cls) or desc ) )) - for cname in tc.classes['MMGenToolCmd'+gid].user_commands: - if cname in skipped_tests: + for cmdname in cls().user_commands: + if cmdname in skipped_tests: continue - if cname not in tests[gid]: - m = f'No test for command {cname!r} in group {gid!r}!' + if cmdname not in tests[gid]: + m = f'No test for command {cmdname!r} in group {gid!r}!' if opt.die_on_missing: die(1,m+' Aborting') else: msg(m) continue - await run_test(gid,cname) + await run_test(cls,gid,cmdname) -async def do_cmd_in_group(cmd): - for gid in tests: - for cname in tests[gid]: - if cname == cmd: - await run_test(gid,cname) +async def do_cmd_in_group(cmdname): + cls = main_tool.get_cmd_cls(cmdname) + for gid,cmds in tests.items(): + for cmd in cmds: + if cmd == cmdname: + await run_test(cls,gid,cmdname) return True return False @@ -972,15 +977,13 @@ if opt.tool_api: del tests['Wallet'] del tests['File'] -import mmgen.tool as tool -tc = tool.MMGenToolCmds +import mmgen.main_tool as main_tool if opt.list_tests: Msg('Available tests:') - for gid in tests: - Msg(' {:6} - {}'.format( - gid, - docstring_head(tc.classes['MMGenToolCmd'+gid]) )) + for modname,cmdlist in main_tool.mods.items(): + cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd') + Msg(' {:6} - {}'.format( modname, docstring_head(cls) )) sys.exit(0) if opt.list_tested_cmds: