modularize the 'mmgen-tool' utility

- each command group now has its own module under the `tool` directory
- only modules required by a given command are loaded
- code used only by the command-line tool has been relocated to `main_tool.py`
This commit is contained in:
The MMGen Project 2022-01-24 19:30:13 +00:00
commit 341ee2c741
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
25 changed files with 1770 additions and 1306 deletions

View file

@ -1 +1 @@
13.1.dev008
13.1.dev009

View file

@ -39,6 +39,10 @@ def help_notes_func(proto,po,k):
class help_notes:
def tool_help():
from .tool.help import main_help
return main_help()
def dfl_subseeds():
from .subseed import SubSeedList
return str(SubSeedList.dfl_len)

View file

@ -21,39 +21,9 @@ mmgen-tool: Perform various MMGen- and cryptocoin-related operations.
Part of the MMGen suite
"""
import os,importlib
from .common import *
def make_cmd_help():
import mmgen.tool
def do():
for bc in mmgen.tool.MMGenToolCmds.classes.values():
cls_doc = bc.__doc__.strip().split('\n')
for l in cls_doc:
if l is cls_doc[0]:
l += ':'
l = l.replace('\t','',1)
if l:
l = l.replace('\t',' ')
yield l[0].upper() + l[1:]
else:
yield ''
yield ''
max_w = max(map(len,bc.user_commands))
for name,code in sorted(bc.user_commands.items()):
if code.__doc__:
yield ' {:{}} - {}'.format(
name,
max_w,
pretty_format(
code.__doc__.strip().replace('\n\t\t',' '),
width = 79-(max_w+7),
pfx = ' '*(max_w+5)).lstrip()
)
yield ''
return '\n'.join(do())
opts_data = {
'text': {
'desc': f'Perform various {g.proj_name}- and cryptocoin-related operations',
@ -91,32 +61,288 @@ Type '{pn} help <command>' for help on a particular command
coin_id=help_notes('coin_id'),
g=g,
),
'notes': lambda s: s.format(
ch=make_cmd_help(),
'notes': lambda s, help_notes: s.format(
ch=help_notes('tool_help'),
pn=g.prog_name)
}
}
cmd_args = opts.init(opts_data)
mods = {
'help': (
'help',
'usage',
),
'util': (
'bytespec',
'to_bytespec',
'randhex',
'hexreverse',
'hexlify',
'unhexlify',
'hexdump',
'unhexdump',
'hash160',
'hash256',
'id6',
'str2id6',
'id8',
'randb58',
'bytestob58',
'b58tobytes',
'hextob58',
'b58tohex',
'hextob58chk',
'b58chktohex',
'hextob32',
'b32tohex',
'hextob6d',
'b6dtohex',
),
'coin': (
'randwif',
'randpair',
'wif2hex',
'hex2wif',
'wif2addr',
'wif2redeem_script',
'wif2segwit_pair',
'privhex2addr',
'privhex2pubhex',
'pubhex2addr',
'pubhex2redeem_script',
'redeem_script2addr',
'pubhash2addr',
'addr2pubhash',
'addr2scriptpubkey',
'scriptpubkey2addr',
),
'mnemonic': (
'mn_rand128',
'mn_rand192',
'mn_rand256',
'hex2mn',
'mn2hex',
'mn2hex_interactive',
'mn_stats',
'mn_printlist',
),
'file': (
'addrfile_chksum',
'keyaddrfile_chksum',
'passwdfile_chksum',
'txview',
),
'filecrypt': (
'encrypt',
'decrypt',
),
'fileutil': (
'find_incog_data',
'rand2file',
),
'wallet': (
'get_subseed',
'get_subseed_by_seed_id',
'list_subseeds',
'list_shares',
'gen_key',
'gen_addr',
),
'rpc': (
'daemon_version',
'getbalance',
'listaddress',
'listaddresses',
'twview',
'add_label',
'remove_label',
'remove_address',
),
}
if len(cmd_args) < 1:
opts.usage()
def create_call_sig(cmd,cls,parsed=False):
cmd = cmd_args.pop(0)
m = getattr(cls,cmd)
import mmgen.tool as tool
if 'varargs_call_sig' in m.__code__.co_varnames: # hack
flag = 'VAR_ARGS'
va = m.__defaults__[0]
args,dfls,ann = va['args'],va['dfls'],va['annots']
else:
flag = None
args = m.__code__.co_varnames[1:m.__code__.co_argcount]
dfls = m.__defaults__ or ()
ann = m.__annotations__
if cmd in ('help','usage') and cmd_args:
cmd_args[0] = 'command_name=' + cmd_args[0]
nargs = len(args) - len(dfls)
if cmd not in tool.MMGenToolCmds:
die(1,f'{cmd!r}: no such command')
def get_type_from_ann(arg):
return ann[arg][1:] + (' or STDIN','')[parsed] if ann[arg] == 'sstr' else ann[arg].__name__
args,kwargs = tool._process_args(cmd,cmd_args)
if parsed:
c_args = [(a,get_type_from_ann(a)) for a in args[:nargs]]
c_kwargs = [(a,dfls[n]) for n,a in enumerate(args[nargs:])]
return (
c_args,
dict(c_kwargs),
'STDIN_OK' if c_args and ann[args[0]] == 'sstr' else flag )
else:
c_args = [f'{a} [{get_type_from_ann(a)}]' for a in args[:nargs]]
c_kwargs = ['"{}" [{}={!r}{}]'.format(
a,
type(dfls[n]).__name__, dfls[n],
(' ' + ann[a] if a in ann else '') )
for n,a in enumerate(args[nargs:])]
return ' '.join(c_args + c_kwargs)
ret = tool.MMGenToolCmds.call(cmd,*args,**kwargs)
def process_args(cmd,cmd_args,cls):
c_args,c_kwargs,flag = create_call_sig(cmd,cls,parsed=True)
have_stdin_input = False
if type(ret).__name__ == 'coroutine':
ret = run_session(ret)
def usage_die(s):
msg(s)
from .tool.help import usage
usage(cmd)
tool._process_result(ret,pager='pager' in kwargs and kwargs['pager'],print_result=True)
if flag != 'VAR_ARGS':
if len(cmd_args) < len(c_args):
usage_die(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}')
u_args = cmd_args[:len(c_args)]
# If we're reading from a pipe, replace '-' with output of previous command
if flag == 'STDIN_OK' and u_args and u_args[0] == '-':
if sys.stdin.isatty():
raise BadFilename("Standard input is a TTY. Can't use '-' as a filename")
else:
max_dlen_spec = '10kB' # limit input to 10KB for now
max_dlen = parse_bytespec(max_dlen_spec)
u_args[0] = os.read(0,max_dlen)
have_stdin_input = True
if len(u_args[0]) >= max_dlen:
die(2,f'Maximum data input for this command is {max_dlen_spec}')
if not u_args[0]:
die(2,f'{cmd}: ERROR: no output from previous command in pipe')
u_nkwargs = len(cmd_args) - len(c_args)
u_kwargs = {}
if flag == 'VAR_ARGS':
t = [a.split('=',1) for a in cmd_args if '=' in a]
tk = [a[0] for a in t]
tk_bad = [a for a in tk if a not in c_kwargs]
if set(tk_bad) != set(tk[:len(tk_bad)]): # permit non-kw args to contain '='
die(1,f'{tk_bad[-1]!r}: illegal keyword argument')
u_kwargs = dict(t[len(tk_bad):])
u_args = cmd_args[:-len(u_kwargs) or None]
elif u_nkwargs > 0:
u_kwargs = dict([a.split('=',1) for a in cmd_args[len(c_args):] if '=' in a])
if len(u_kwargs) != u_nkwargs:
usage_die(f'Command requires exactly {len(c_args)} non-keyword argument{suf(c_args)}')
if len(u_kwargs) > len(c_kwargs):
usage_die(f'Command accepts no more than {len(c_kwargs)} keyword argument{suf(c_kwargs)}')
for k in u_kwargs:
if k not in c_kwargs:
usage_die(f'{k!r}: invalid keyword argument')
def conv_type(arg,arg_name,arg_type):
if arg_type == 'bytes' and type(arg) != bytes:
die(1,"'Binary input data must be supplied via STDIN")
if have_stdin_input and arg_type == 'str' and isinstance(arg,bytes):
from .globalvars import g
NL = '\r\n' if g.platform == 'win' else '\n'
arg = arg.decode()
if arg[-len(NL):] == NL: # rstrip one newline
arg = arg[:-len(NL)]
if arg_type == 'bool':
if arg.lower() in ('true','yes','1','on'):
arg = True
elif arg.lower() in ('false','no','0','off'):
arg = False
else:
usage_die(f'{arg!r}: invalid boolean value for keyword argument')
try:
return __builtins__[arg_type](arg)
except:
die(1,f'{arg!r}: Invalid argument for argument {arg_name} ({arg_type!r} required)')
if flag == 'VAR_ARGS':
args = [conv_type(u_args[i],c_args[0][0],c_args[0][1]) for i in range(len(u_args))]
else:
args = [conv_type(u_args[i],c_args[i][0],c_args[i][1]) for i in range(len(c_args))]
kwargs = {k:conv_type(u_kwargs[k],k,type(c_kwargs[k]).__name__) for k in u_kwargs}
return ( args, kwargs )
def process_result(ret,pager=False,print_result=False):
	"""
	Convert result to something suitable for output to screen and return it.
	If result is bytes and not convertible to utf8, output as binary using os.write().
	If 'print_result' is True, send the converted result directly to screen or
	pager instead of returning it.
	"""
	from .util import Msg,ydie

	def triage_result(o):
		return o if not print_result else do_pager(o) if pager else Msg(o)

	# Use identity tests here: the integers 1 and 0 compare equal to True and
	# False, so equality tests would keep them from reaching the 'int' branch
	if ret is True:
		return True
	elif ret is False or ret is None:
		ydie(1,f'tool command returned {ret!r}')
	elif isinstance(ret,str):
		return triage_result(ret)
	elif isinstance(ret,int):
		return triage_result(str(ret))
	elif isinstance(ret,tuple):
		return triage_result('\n'.join([r.decode() if isinstance(r,bytes) else r for r in ret]))
	elif isinstance(ret,bytes):
		# only the decode attempt is guarded, so errors raised while printing
		# are not mistaken for undecodable data
		try:
			o = ret.decode()
		except UnicodeDecodeError:
			# don't add NL to binary data if it can't be converted to utf8
			return os.write(1,ret) if print_result else ret
		return triage_result(o)
	else:
		ydie(1,f'tool.py: can’t handle return value of type {type(ret).__name__!r}')
def get_cmd_cls(cmd):
	"""
	Return the 'tool_cmd' class of the module whose command list in 'mods'
	contains 'cmd', importing that module on demand.  Return False if no
	module lists the command.
	"""
	owner = next((name for name,cmds in mods.items() if cmd in cmds),None)
	if owner is None:
		return False
	return getattr(importlib.import_module(f'mmgen.tool.{owner}'),'tool_cmd')
def get_mod_cls(modname):
	"""
	Import the module 'mmgen.tool.<modname>' and return its 'tool_cmd' attribute
	"""
	module = importlib.import_module(f'mmgen.tool.{modname}')
	return module.tool_cmd
# Command-line entry point: runs only when the program name is 'mmgen-tool'
# and opt._lock is not set
if g.prog_name == 'mmgen-tool' and not opt._lock:
# opts.init() returns the command-line arguments; the first one is the command name
cmd_args = opts.init(opts_data)
# no command given: show usage
if len(cmd_args) < 1:
opts.usage()
cmd = cmd_args.pop(0)
# for 'help <cmd>' and 'usage <cmd>', pass the command name as the keyword
# argument 'command_name'
if cmd in ('help','usage') and cmd_args:
cmd_args[0] = 'command_name=' + cmd_args[0]
# look up (and import) the class providing the command; False if unknown
cls = get_cmd_cls(cmd)
if not cls:
die(1,f'{cmd!r}: no such command')
# convert the remaining command-line strings to positional and keyword arguments
args,kwargs = process_args(cmd,cmd_args,cls)
# instantiate the command class and invoke the command method
ret = getattr(cls(),cmd)(*args,**kwargs)
# a coroutine result is passed to run_session(), whose return value replaces it
if type(ret).__name__ == 'coroutine':
ret = run_session(ret)
# print the result, using the pager if the 'pager' keyword argument was set
process_result(ret,pager='pager' in kwargs and kwargs['pager'],print_result=True)

View file

@ -44,7 +44,7 @@ def create_hdseed(proto):
# addr=bcrt1qaq8t3pakcftpk095tnqfv5cmmczysls024atnd
# cTEkSYCWKvNo757uwFPd4yuCXsbZvfJDipHsHWFRapXpnikMHvgn label=
# addr=bcrt1q537rgyctcqdgs8nm8gvku05znka4h2m00lx8ps hdkeypath=m/0'/0'/0'
from .tool import tool_api
from .tool.api import tool_api
t = tool_api()
t.init_coin(proto.coin,proto.network)
t.addrtype = 'bech32'

File diff suppressed because it is too large Load diff

4
mmgen/tool/__init__.py Executable file
View file

@ -0,0 +1,4 @@
# provide this for backwards compatibility:
def tool_api(*args,**kwargs):
	"""
	Backwards-compatible constructor: import the 'api' submodule on first use
	and return a new api.tool_api object created with the given arguments
	"""
	from . import api
	return api.tool_api(*args,**kwargs)

147
mmgen/tool/api.py Executable file
View file

@ -0,0 +1,147 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/api.py: tool_api interface for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base
from .util import tool_cmd as util_cmds
from .coin import tool_cmd as coin_cmds
from .mnemonic import tool_cmd as mnemonic_cmds
class tool_api(
		util_cmds,
		coin_cmds,
		mnemonic_cmds,
		tool_cmd_base ):
	"""
	API providing access to a subset of methods from the mmgen.tool module

	Example:
		from mmgen.tool.api import tool_api
		tool = tool_api()

		# Set the coin and network:
		tool.init_coin('btc','mainnet')

		# Print available address types:
		tool.print_addrtypes()

		# Set the address type:
		tool.addrtype = 'segwit'

		# Disable user entropy gathering (optional, reduces security):
		tool.usr_randchars = 0

		# Generate a random BTC segwit keypair:
		wif,addr = tool.randpair()

		# Set coin, network and address type:
		tool.init_coin('ltc','testnet')
		tool.addrtype = 'bech32'

		# Generate a random LTC testnet Bech32 keypair:
		wif,addr = tool.randpair()
	"""

	def __init__(self):
		"""
		Initializer - takes no arguments
		"""
		import mmgen.opts as opts
		from ..opts import opt
		# register 'usr_randchars' as resettable only once, so that repeated
		# instantiation doesn't keep appending duplicate entries
		if 'usr_randchars' not in opts.UserOpts._reset_ok:
			opts.UserOpts._reset_ok += ('usr_randchars',)
		# initialize the options only if that hasn't been done already
		if not opt._lock:
			opts.init()
		super().__init__()

	def init_coin(self,coinsym,network):
		"""
		Initialize a coin/network pair
		Valid choices for coins: one of the symbols returned by the 'coins' attribute
		Valid choices for network: 'mainnet','testnet','regtest'
		"""
		from ..protocol import init_proto,init_genonly_altcoins
		from ..util import warn_altcoins
		altcoin_trust_level = init_genonly_altcoins(coinsym,testnet=network in ('testnet','regtest'))
		warn_altcoins(coinsym,altcoin_trust_level)
		self.proto = init_proto(coinsym,network=network)
		return self.proto

	@property
	def coins(self):
		"""The available coins"""
		from ..protocol import CoinProtocol
		from ..altcoin import CoinInfo
		# merge both symbol lists, dropping duplicates
		return sorted(set(
			[c.upper() for c in CoinProtocol.coins]
			+ [c.symbol for c in CoinInfo.get_supported_coins(self.proto.network)]
		))

	@property
	def coin(self):
		"""The currently configured coin"""
		return self.proto.coin

	@property
	def network(self):
		"""The currently configured network"""
		return self.proto.network

	@property
	def addrtypes(self):
		"""
		The available address types for current coin/network pair.  The
		first-listed is the default
		"""
		from ..addr import MMGenAddrType
		return [MMGenAddrType(proto=self.proto,id_str=id_str).name for id_str in self.proto.mmtypes]

	def print_addrtypes(self):
		"""
		Print the available address types for current coin/network pair along with
		a description.  The first-listed is the default
		"""
		from ..addr import MMGenAddrType
		for t in [MMGenAddrType(proto=self.proto,id_str=id_str) for id_str in self.proto.mmtypes]:
			print(f'{t.name:<12} - {t.desc}')

	@property
	def addrtype(self):
		"""The currently configured address type (is assignable)"""
		return self.mmtype

	@addrtype.setter
	def addrtype(self,val):
		from ..addr import MMGenAddrType
		self.mmtype = MMGenAddrType(self.proto,val)

	@property
	def usr_randchars(self):
		"""
		The number of keystrokes of entropy to be gathered from the user.
		Setting to zero disables user entropy gathering.
		"""
		from ..opts import opt
		return opt.usr_randchars

	@usr_randchars.setter
	def usr_randchars(self,val):
		from ..opts import opt
		opt.usr_randchars = val

186
mmgen/tool/coin.py Executable file
View file

@ -0,0 +1,186 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/coin.py: Cryptocoin routines for the 'mmgen-tool' utility
"""
from collections import namedtuple
generator_data = namedtuple('generator_data',['kg','ag'])
from .common import tool_cmd_base
from ..protocol import init_proto,init_genonly_altcoins,hash160
from ..key import PrivKey
from ..addr import KeyGenerator,AddrGenerator,MMGenAddrType,CoinAddr
class tool_cmd(tool_cmd_base):
	"""
	cryptocoin key/address utilities

		May require use of the '--coin', '--type' and/or '--testnet' options

		Examples:
			mmgen-tool --coin=ltc --type=bech32 wif2addr <wif key>
			mmgen-tool --coin=zec --type=zcash_z randpair
	"""

	def __init__(self,proto=None,mmtype=None):
		# without an explicit protocol, use the one selected by the options
		if not proto:
			from ..protocol import init_proto_from_opts
			proto = init_proto_from_opts()
		self.proto = proto

		# address type precedence: argument, then opt.type, then protocol default
		from ..opts import opt
		self.mmtype = MMGenAddrType(
			self.proto,
			mmtype or opt.type or self.proto.dfl_mmtype )

		from ..globalvars import g
		if g.token:
			self.proto.tokensym = g.token.upper()

	def _init_generators(self,arg=None):
		# key generator first, then address generator
		keygen = KeyGenerator( self.proto, self.mmtype.pubkey_type )
		addrgen = AddrGenerator( self.proto, self.mmtype )
		return generator_data( keygen, addrgen )

	def _new_privkey(self,keybytes):
		# private key from raw bytes, using the configured address type's parameters
		return PrivKey(
			self.proto,
			keybytes,
			pubkey_type = self.mmtype.pubkey_type,
			compressed  = self.mmtype.compressed )

	def _wif_keydata(self,wifkey):
		# return the generator pair and the key data generated from a WIF key
		gd = self._init_generators()
		privkey = PrivKey(
			self.proto,
			wif = wifkey )
		return ( gd, gd.kg.gen_data(privkey) )

	def randwif(self):
		"generate a random private key in WIF format"
		from ..crypto import get_random
		return self._new_privkey( get_random(32) ).wif

	def randpair(self):
		"generate a random private key/address pair"
		gd = self._init_generators()
		from ..crypto import get_random
		privkey = self._new_privkey( get_random(32) )
		wif = privkey.wif
		addr = gd.ag.to_addr( gd.kg.gen_data(privkey) )
		return ( wif, addr )

	def wif2hex(self,wifkey:'sstr'):
		"convert a private key from WIF to hex format"
		privkey = PrivKey(
			self.proto,
			wif = wifkey )
		return privkey.hex()

	def hex2wif(self,privhex:'sstr'):
		"convert a private key from hex to WIF format"
		return self._new_privkey( bytes.fromhex(privhex) ).wif

	def wif2addr(self,wifkey:'sstr'):
		"generate a coin address from a key in WIF format"
		gd,data = self._wif_keydata(wifkey)
		return gd.ag.to_addr(data)

	def wif2redeem_script(self,wifkey:'sstr'): # new
		"convert a WIF private key to a Segwit P2SH-P2WPKH redeem script"
		assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit'
		gd,data = self._wif_keydata(wifkey)
		return gd.ag.to_segwit_redeem_script(data)

	def wif2segwit_pair(self,wifkey:'sstr'):
		"generate both a Segwit P2SH-P2WPKH redeem script and address from WIF"
		assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit'
		gd,data = self._wif_keydata(wifkey)
		redeem_script = gd.ag.to_segwit_redeem_script(data)
		addr = gd.ag.to_addr(data)
		return ( redeem_script, addr )

	def privhex2addr(self,privhex:'sstr',output_pubhex=False):
		"generate coin address from raw private key data in hexadecimal format"
		gd = self._init_generators()
		data = gd.kg.gen_data( self._new_privkey(bytes.fromhex(privhex)) )
		if output_pubhex:
			return data.pubkey.hex()
		return gd.ag.to_addr(data)

	def privhex2pubhex(self,privhex:'sstr'): # new
		"generate a hex public key from a hex private key"
		return self.privhex2addr(privhex,output_pubhex=True)

	def pubhex2addr(self,pubkeyhex:'sstr'):
		"convert a hex pubkey to an address"
		pubkey = bytes.fromhex(pubkeyhex)
		if self.mmtype.name != 'segwit':
			return self.pubhash2addr( hash160(pubkey).hex() )
		return self.proto.pubkey2segwitaddr( pubkey )

	def pubhex2redeem_script(self,pubkeyhex:'sstr'): # new
		"convert a hex pubkey to a Segwit P2SH-P2WPKH redeem script"
		assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit'
		script = self.proto.pubkey2redeem_script( bytes.fromhex(pubkeyhex) )
		return script.hex()

	def redeem_script2addr(self,redeem_scripthex:'sstr'): # new
		"convert a Segwit P2SH-P2WPKH redeem script to an address"
		assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit'
		assert redeem_scripthex[:4] == '0014', f'{redeem_scripthex!r}: invalid redeem script'
		assert len(redeem_scripthex) == 44, f'{len(redeem_scripthex)//2} bytes: invalid redeem script length'
		script_hash = hash160( bytes.fromhex(redeem_scripthex) )
		return self.pubhash2addr( script_hash.hex() )

	def pubhash2addr(self,pubhashhex:'sstr'):
		"convert public key hash to address"
		pubhash = bytes.fromhex(pubhashhex)
		if self.mmtype.name != 'bech32':
			return self.proto.pubhash2addr( pubhash, self.mmtype.addr_fmt=='p2sh' )
		return self.proto.pubhash2bech32addr( pubhash )

	def addr2pubhash(self,addr:'sstr'):
		"convert coin address to public key hash"
		from ..tx import addr2pubhash
		coin_addr = CoinAddr(self.proto,addr)
		return addr2pubhash( self.proto, coin_addr )

	def addr2scriptpubkey(self,addr:'sstr'):
		"convert coin address to scriptPubKey"
		from ..tx import addr2scriptPubKey
		coin_addr = CoinAddr(self.proto,addr)
		return addr2scriptPubKey( self.proto, coin_addr )

	def scriptpubkey2addr(self,hexstr:'sstr'):
		"convert scriptPubKey to coin address"
		from ..tx import scriptPubKey2addr
		# only the first element of the returned sequence is wanted
		return scriptPubKey2addr( self.proto, hexstr )[0]

33
mmgen/tool/common.py Executable file
View file

@ -0,0 +1,33 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/common.py: Base class and shared routines for the 'mmgen-tool' utility
"""
def options_annot_str(l):
	"""
	Format the strings in 'l' as a single-quoted, comma-separated list of
	valid options, for use as a keyword argument annotation
	"""
	quoted = "','".join(l)
	return f"(valid options: '{quoted}')"
class tool_cmd_base:
	"""
	Base class for the tool command classes.  The initializer accepts and
	ignores 'proto' and 'mmtype'.
	"""

	def __init__(self,proto=None,mmtype=None):
		pass

	@property
	def user_commands(self):
		# attributes defined directly on the instance's own class (inherited
		# ones excluded) whose names don't begin with an underscore
		attrs = vars(type(self))
		return {name:attrs[name] for name in attrs if not name.startswith('_')}

108
mmgen/tool/file.py Executable file
View file

@ -0,0 +1,108 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/file.py: Address and transaction file routines for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base,options_annot_str
class tool_cmd(tool_cmd_base):
	"utilities for viewing/checking MMGen address and transaction files"

	def __init__(self,proto=None,mmtype=None):
		# without an explicit protocol, use the one selected by the options
		if proto:
			self.proto = proto
		else:
			from ..protocol import init_proto_from_opts
			self.proto = init_proto_from_opts()

	def _file_chksum(self,mmgen_addrfile,objname):
		# Load the file with the list class named by 'objname' and return its
		# checksum.  The verbose/yes/quiet options are overridden while loading.
		from ..opts import opt
		from ..addrlist import AddrList,KeyAddrList
		from ..passwdlist import PasswordList

		# explicit name -> class table instead of a lookup in locals()
		list_classes = {
			'AddrList': AddrList,
			'KeyAddrList': KeyAddrList,
			'PasswordList': PasswordList,
		}

		verbose,yes,quiet = [bool(i) for i in (opt.verbose,opt.yes,opt.quiet)]
		opt.verbose,opt.yes,opt.quiet = (False,True,True)
		try:
			ret = list_classes[objname](self.proto,mmgen_addrfile)
		finally:
			# restore the saved option values even if loading the file fails
			opt.verbose,opt.yes,opt.quiet = (verbose,yes,quiet)

		if verbose:
			from ..util import msg,capfirst
			if ret.al_id.mmtype.name == 'password':
				msg('Passwd fmt: {}\nPasswd len: {}\nID string: {}'.format(
					capfirst(ret.pw_info[ret.pw_fmt].desc),
					ret.pw_len,
					ret.pw_id_str ))
			else:
				msg(f'Base coin: {ret.base_coin} {capfirst(ret.network)}')
				msg(f'MMType: {capfirst(ret.al_id.mmtype.name)}')
			msg( f'List length: {len(ret.data)}')

		return ret.chksum

	def addrfile_chksum(self,mmgen_addrfile:str):
		"compute checksum for MMGen address file"
		return self._file_chksum(mmgen_addrfile,'AddrList')

	def keyaddrfile_chksum(self,mmgen_keyaddrfile:str):
		"compute checksum for MMGen key-address file"
		return self._file_chksum(mmgen_keyaddrfile,'KeyAddrList')

	def passwdfile_chksum(self,mmgen_passwdfile:str):
		"compute checksum for MMGen password file"
		return self._file_chksum(mmgen_passwdfile,'PasswordList')

	# NOTE: there is deliberately no 'self' parameter.  When called as a bound
	# method, the instance is received by 'varargs_call_sig', whose default
	# value describes the call signature.
	async def txview(
			varargs_call_sig = { # hack to allow for multiple filenames
				'args': (
					'mmgen_tx_file(s)',
					'pager',
					'terse',
					'sort',
					'filesort' ),
				'dfls': ( False, False, 'addr', 'mtime' ),
				'annots': {
					'mmgen_tx_file(s)': str,
					'sort': options_annot_str(['addr','raw']),
					'filesort': options_annot_str(['mtime','ctime','atime']),
				}
			},
			*infiles,
			**kwargs ):
		"show raw/signed MMGen transaction in human-readable form"

		terse = bool(kwargs.get('terse'))
		tx_sort = kwargs.get('sort') or 'addr'
		file_sort = kwargs.get('filesort') or 'mtime'

		from ..filename import MMGenFileList
		from ..tx import MMGenTX

		flist = MMGenFileList( infiles, ftype=MMGenTX )
		flist.sort_by_age( key=file_sort ) # in-place sort

		async def process_file(fn):
			# files with the signed-transaction extension also need a tracking wallet
			if fn.endswith(MMGenTX.Signed.ext):
				tx = MMGenTX.Signed(
					filename   = fn,
					quiet_open = True,
					tw         = await MMGenTX.Signed.get_tracking_wallet(fn) )
			else:
				tx = MMGenTX.Unsigned(
					filename   = fn,
					quiet_open = True )
			return tx.format_view( terse=terse, sort=tx_sort )

		# NOTE(review): ''*77 evaluates to the empty string, so the separator
		# is just a newline - presumably a 77-character rule was intended; confirm
		return (''*77+'\n').join([await process_file(fn) for fn in flist.names()]).rstrip()

62
mmgen/tool/filecrypt.py Executable file
View file

@ -0,0 +1,62 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/filecrypt.py: File encryption/decryption routines for the 'mmgen-tool' utility
"""
import os
from .common import tool_cmd_base
from ..crypto import mmgen_encrypt,mmgen_decrypt,mmenc_ext
from ..fileutil import get_data_from_file,write_data_to_file
class tool_cmd(tool_cmd_base):
	"""
	file encryption and decryption

		MMGen encryption suite:
		* Key: Scrypt (user-configurable hash parameters, 32-byte salt)
		* Enc: AES256_CTR, 16-byte rand IV, sha256 hash + 32-byte nonce + data
		* The encrypted file is indistinguishable from random data
	"""

	def encrypt(self,infile:str,outfile='',hash_preset=''):
		"encrypt a file"
		data = get_data_from_file( infile, 'data for encryption', binary=True )
		enc_d = mmgen_encrypt( data, 'data', hash_preset )
		# default output name: input file's base name plus the encryption extension
		if not outfile:
			outfile = f'{os.path.basename(infile)}.{mmenc_ext}'
		write_data_to_file( outfile, enc_d, 'encrypted data', binary=True )
		return True

	def decrypt(self,infile:str,outfile='',hash_preset=''):
		"decrypt a file"
		# msg() isn't imported at module level, so bring it into scope here
		from ..util import msg
		enc_d = get_data_from_file( infile, 'encrypted data', binary=True )
		# retry until mmgen_decrypt() returns a true value
		while True:
			dec_d = mmgen_decrypt( enc_d, 'data', hash_preset )
			if dec_d:
				break
			msg('Trying again...')
		# default output name: input file's base name with the encryption
		# extension removed, or with '.dec' appended if nothing was removed
		if not outfile:
			from ..util import remove_extension
			o = os.path.basename(infile)
			outfile = remove_extension(o,mmenc_ext)
			if outfile == o:
				outfile += '.dec'
		write_data_to_file( outfile, dec_d, 'decrypted data', binary=True )
		return True

127
mmgen/tool/fileutil.py Executable file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/fileutil.py: File routines for the 'mmgen-tool' utility
"""
import os
from .common import tool_cmd_base
from ..util import msg,msg_r,qmsg,die,suf,parse_bytespec,make_full_path
from ..crypto import get_random,aesctr_iv_len
class tool_cmd(tool_cmd_base):
	"file utilities"

	def find_incog_data(self,filename:str,incog_id:str,keep_searching=False):
		"Use an Incog ID to find hidden incognito wallet data"

		from hashlib import sha256
		from ..globalvars import g

		ivsize,bsize,mod = ( aesctr_iv_len, 4096, 4096*8 )
		n,carry = 0,b' '*ivsize
		flgs = os.O_RDONLY|os.O_BINARY if g.platform == 'win' else os.O_RDONLY
		f = os.open(filename,flgs)

		# try/finally ensures that the file descriptor is released on every
		# exit path, including die() and sys.exit() below
		try:
			for ch in incog_id:
				if ch not in '0123456789ABCDEF':
					die(2,f'{incog_id!r}: invalid Incog ID')

			while True:
				d = os.read(f,bsize)
				if not d:
					break
				# prepend the tail of the previous block, so that windows
				# straddling a block boundary are examined too
				d = carry + d
				for i in range(bsize):
					if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == incog_id:
						# skip matches that would give a negative offset
						if n+i < ivsize:
							continue
						msg(f'\rIncog data for ID {incog_id} found at offset {n+i-ivsize}')
						if not keep_searching:
							import sys
							sys.exit(0)
				carry = d[len(d)-ivsize:]
				n += bsize
				if not n % mod:
					msg_r(f'\rSearched: {n} bytes')

			msg('')
		finally:
			os.close(f)

		return True

	def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False):
		"write 'n' bytes of random data to specified file"

		from threading import Thread
		from queue import Queue
		from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes
		from cryptography.hazmat.backends import default_backend
		from ..opts import opt

		def encrypt_worker(wid):
			# each worker encrypts blocks from q1 using its own random counter value
			ctr_init_val = os.urandom( aesctr_iv_len )
			c = Cipher( algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend() )
			encryptor = c.encryptor()
			while True:
				q2.put( encryptor.update(q1.get()) )
				q1.task_done()

		def output_worker():
			# a single writer drains q2 into the output file
			while True:
				f.write( q2.get() )
				q2.task_done()

		nbytes = parse_bytespec(nbytes)
		if opt.outdir:
			outfile = make_full_path( opt.outdir, outfile )

		# the context manager closes the output file on error as well as on success
		with open(outfile,'wb') as f:

			key = get_random(32)
			q1,q2 = ( Queue(), Queue() )

			for i in range(max(1,threads-2)):
				t = Thread( target=encrypt_worker, args=[i] )
				t.daemon = True
				t.start()

			t = Thread( target=output_worker )
			t.daemon = True
			t.start()

			blk_size = 1024 * 1024
			for i in range(nbytes // blk_size):
				if not i % 4:
					msg_r(f'\rRead: {i * blk_size} bytes')
				q1.put( os.urandom(blk_size) )

			# final partial block, if any
			if nbytes % blk_size:
				q1.put( os.urandom(nbytes % blk_size) )

			# wait until all blocks have been encrypted and written
			q1.join()
			q2.join()

		fsize = os.stat(outfile).st_size
		if fsize != nbytes:
			die(3,f'{fsize}: incorrect random file size (should be {nbytes})')

		if not silent:
			msg(f'\rRead: {nbytes} bytes')
			qmsg(f'\r{nbytes} byte{suf(nbytes)} of random data written to file {outfile!r}')

		return True

144
mmgen/tool/help.py Executable file
View file

@ -0,0 +1,144 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/help.py: Help screen routines for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base
import mmgen.main_tool as main_tool
def main_help():
	"""
	Return the command summary for the main help screen as a single string:
	for each module listed in main_tool.mods, the class docstring followed by
	one line per documented command
	"""
	from ..util import pretty_format

	def do():
		for clsname,cmdlist in main_tool.mods.items():
			cls = main_tool.get_mod_cls(clsname)
			cls_doc = cls.__doc__.strip().split('\n')
			for n,l in enumerate(cls_doc):
				# the first line is the heading: select it by position rather
				# than by object identity, which equal strings may share
				if n == 0:
					l += ':'
				l = l.replace('\t','',1)
				if l:
					l = l.replace('\t',' ')
					yield l[0].upper() + l[1:]
				else:
					yield ''
			yield ''

			max_w = max(map(len,cmdlist))

			for cmdname in cmdlist:
				code = getattr(cls,cmdname)
				# commands without a docstring are omitted from the listing
				if code.__doc__:
					yield ' {:{}} - {}'.format(
						cmdname,
						max_w,
						pretty_format(
							code.__doc__.strip().replace('\n\t\t',' '),
							width = 79-(max_w+7),
							pfx = ' '*(max_w+5)).lstrip()
					)
			yield ''

	return '\n'.join(do())
def usage(cmdname=None,exit_val=1):
    """
    Print usage information for one tool command, or for all of them, then exit.

    cmdname  -- command to describe; when empty or None, the call signature of
                every command in main_tool.mods is listed, grouped by module,
                framed by a general preamble and a set of examples
    exit_val -- status passed to sys.exit() once the text has been printed

    An unknown command name is reported through die() with status 1.
    """
    preamble = """
USAGE INFORMATION FOR MMGEN-TOOL COMMANDS:
Unquoted arguments are mandatory
Quoted arguments are optional, default values will be used
Argument types and default values are shown in square brackets
"""
    examples = """
To force a command to read from STDIN instead of file (for commands taking
a filename as their first argument), substitute "-" for the filename.
EXAMPLES:
Generate a random Bech32 public/private keypair for LTC:
$ mmgen-tool -r0 --coin=ltc --type=bech32 randpair
Generate a DASH compressed public key address from the supplied WIF key:
$ mmgen-tool --coin=dash --type=compressed wif2addr XJkVRC3eGKurc9Uzx1wfQoio3yqkmaXVqLMTa6y7s3M3jTBnmxfw
Generate a well-known burn address:
$ mmgen-tool hextob58chk 000000000000000000000000000000000000000000
Generate a random 12-word seed phrase:
$ mmgen-tool -r0 mn_rand128
Same as above, but get additional entropy from user:
$ mmgen-tool mn_rand128
Encode bytes from a file to base 58:
$ mmgen-tool bytestob58 /etc/timezone pad=20
Reverse a hex string:
$ mmgen-tool hexreverse "deadbeefcafe"
Same as above, but use a pipe:
$ echo "deadbeefcafe" | mmgen-tool hexreverse -
"""
    from ..util import Msg,Msg_r,fmt,die,capfirst

    if not cmdname:
        # no command given: list every command's call signature, by module
        Msg(fmt(preamble,strip_char='\t'))
        for clsname,cmdlist in main_tool.mods.items():
            cls = main_tool.get_mod_cls(clsname)
            heading = cls.__doc__.strip().split('\n')[0]
            Msg(f' {heading[0].upper()}{heading[1:]}:\n')
            width = max(map(len,cmdlist))
            for name in cmdlist:
                Msg(f' {name:{width}} {main_tool.create_call_sig(name,cls)}')
            Msg('')
        Msg_r(' ' + fmt(examples,strip_char='\t'))
    else:
        from ..globalvars import g
        # first module whose command list contains the requested command
        owner = next(
            (mod for mod,cmdlist in main_tool.mods.items() if cmdname in cmdlist),
            None )
        if owner is None:
            die(1,f'{cmdname!r}: no such tool command')
        else:
            cls = main_tool.get_mod_cls(owner)
            desc = fmt(capfirst(getattr(cls,cmdname).__doc__.strip()),strip_char='\t').strip()
            # a multi-line description gets an extra blank line before USAGE
            sep = '\n' if '\n' in desc else ''
            call_sig = main_tool.create_call_sig(cmdname,cls)
            Msg(f'{desc}{sep}\nUSAGE: {g.prog_name} {cmdname} {call_sig}')

    import sys
    sys.exit(exit_val)
class tool_cmd(tool_cmd_base):
    "help/usage commands"

    # Both commands delegate to the module-level usage() function, which
    # prints the requested text and then exits with the given status.
    # The docstrings below are displayed by the help screens: keep verbatim.

    def help(self,command_name=''):
        "display usage information for a single command or all commands"
        usage( cmdname=command_name, exit_val=0 )

    def usage(self,command_name=''):
        "display usage information for a single command"
        usage( cmdname=command_name, exit_val=0 )

125
mmgen/tool/mnemonic.py Executable file
View file

@ -0,0 +1,125 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/mnemonic.py: Mnemonic routines for the 'mmgen-tool' utility
"""
from collections import namedtuple
from .common import tool_cmd_base,options_annot_str
from ..baseconv import baseconv
from ..xmrseed import xmrseed
from ..bip39 import bip39
# format assumed by the commands below when the user supplies none
dfl_mnemonic_fmt = 'mmgen'

# one record per supported seed phrase format; 'pad' is handed to the
# converter's fromhex()/tohex() calls, 'conv_cls' is the converter class
mft = namedtuple('mnemonic_format','fmt pad conv_cls')

mnemonic_fmts = {
    'mmgen':   mft( fmt='words',   pad='seed', conv_cls=baseconv ),
    'bip39':   mft( fmt='bip39',   pad=None,   conv_cls=bip39 ),
    'xmrseed': mft( fmt='xmrseed', pad=None,   conv_cls=xmrseed ),
}

# annotation object attached to the 'fmt' parameter of each command
mn_opts_disp = options_annot_str(mnemonic_fmts)
class tool_cmd(tool_cmd_base):
    # NOTE: class and method docstrings are displayed by the help screens
    # (they are read via __doc__), so their text is part of the program's
    # output and is kept verbatim.
    """
seed phrase utilities (valid formats: 'mmgen' (default), 'bip39', 'xmrseed')
IMPORTANT NOTE: MMGen's default seed phrase format uses the Electrum
wordlist, however seed phrases are computed using a different algorithm
and are NOT Electrum-compatible!
BIP39 support is fully compatible with the standard, allowing users to
import and export seed entropy from BIP39-compatible wallets. However,
users should be aware that BIP39 support does not imply BIP32 support!
MMGen uses its own key derivation scheme differing from the one described
by the BIP32 protocol.
For Monero ('xmrseed') seed phrases, input data is reduced to a spendkey
before conversion so that a canonical seed phrase is produced. This is
required because Monero seeds, unlike ordinary wallet seeds, are tied
to a concrete key/address pair. To manually generate a Monero spendkey,
use the 'hex2wif' command.
"""

    @staticmethod
    def _xmr_reduce(bytestr):
        # Validate the key length against the XMR protocol's privkey_len and
        # return the result of the protocol's preprocess_key() for the input.
        from ..protocol import init_proto
        proto = init_proto('xmr')
        if len(bytestr) != proto.privkey_len:
            # 'die' is not imported at module level, so import it here;
            # without this the error path raised NameError
            from ..util import die
            die(1,'{!r}: invalid bit length for Monero private key (must be {})'.format(
                len(bytestr) * 8,
                proto.privkey_len*8 ))
        return proto.preprocess_key(bytestr,None)

    def _do_random_mn(self,nbytes:int,fmt:str):
        # Generate 'nbytes' of random data and convert it to a seed phrase
        # in format 'fmt'; shared implementation of the mn_rand* commands.
        assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32'
        from ..crypto import get_random
        randbytes = get_random(nbytes)
        if fmt == 'xmrseed':
            randbytes = self._xmr_reduce(randbytes)
        from ..opts import opt
        if opt.verbose:
            from ..util import msg
            msg(f'Seed: {randbytes.hex()}')
        return self.hex2mn(randbytes.hex(),fmt=fmt)

    def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
        "generate random 128-bit mnemonic seed phrase"
        return self._do_random_mn(16,fmt)

    def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
        "generate random 192-bit mnemonic seed phrase"
        return self._do_random_mn(24,fmt)

    def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
        "generate random 256-bit mnemonic seed phrase"
        return self._do_random_mn(32,fmt)

    def hex2mn( self, hexstr:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ):
        "convert a 16, 24 or 32-byte hexadecimal number to a mnemonic seed phrase"
        if fmt == 'xmrseed':
            # Monero input is reduced first (see _xmr_reduce)
            hexstr = self._xmr_reduce(bytes.fromhex(hexstr)).hex()
        f = mnemonic_fmts[fmt]
        return ' '.join( f.conv_cls(fmt).fromhex(hexstr,f.pad) )

    def mn2hex( self, seed_mnemonic:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ):
        "convert a mnemonic seed phrase to a hexadecimal number"
        f = mnemonic_fmts[fmt]
        return f.conv_cls(fmt).tohex( seed_mnemonic.split(), f.pad )

    def mn2hex_interactive( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, mn_len=24, print_mn=False ):
        "convert an interactively supplied mnemonic seed phrase to a hexadecimal number"
        from ..mn_entry import mn_entry
        # for 'xmrseed' the requested length is always 25; mn_len is ignored
        mn = mn_entry(fmt).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len,validate=False)
        if print_mn:
            from ..util import msg
            msg(mn)
        return self.mn2hex(seed_mnemonic=mn,fmt=fmt)

    def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
        "show stats for mnemonic wordlist"
        return mnemonic_fmts[fmt].conv_cls(fmt).check_wordlist()

    def mn_printlist( self, fmt:mn_opts_disp = dfl_mnemonic_fmt, enum=False, pager=False ):
        "print mnemonic wordlist"
        # NOTE(review): 'pager' is not used in this method - presumably
        # it is consumed by the caller that prints the result; confirm
        ret = mnemonic_fmts[fmt].conv_cls(fmt).get_wordlist()
        if enum:
            ret = [f'{n:>4} {e}' for n,e in enumerate(ret)]
        return '\n'.join(ret)

149
mmgen/tool/rpc.py Executable file
View file

@ -0,0 +1,149 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/rpc.py: JSON/RPC routines for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base,options_annot_str
from ..tw import TwCommon
class tool_cmd(tool_cmd_base):
    "tracking wallet commands using the JSON-RPC interface"

    # NOTE: docstrings in this class are displayed by the help screens
    # (read via __doc__); keep their text verbatim.

    def __init__(self,proto=None,mmtype=None):
        # Use the supplied protocol object, or build one from the command-line
        # options.  'mmtype' is accepted but not used by this class.
        if proto:
            self.proto = proto
        else:
            from ..protocol import init_proto_from_opts
            self.proto = init_proto_from_opts()
        from ..globalvars import g
        if g.token:
            self.proto.tokensym = g.token.upper()

    async def daemon_version(self):
        "print coin daemon version"
        from ..rpc import rpc_init
        r = await rpc_init( self.proto, ignore_daemon_version=True )
        return f'{r.daemon.coind_name} version {r.daemon_version} ({r.daemon_version_str})'

    async def getbalance(self,minconf=1,quiet=False,pager=False):
        "list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet"
        from ..twbal import TwGetBalance
        return (await TwGetBalance(self.proto,minconf,quiet)).format()

    async def listaddress(self,
            mmgen_addr:str,
            minconf = 1,
            pager = False,
            showempty = True,
            showbtcaddr = True,
            age_fmt: options_annot_str(TwCommon.age_fmts) = 'confs' ):
        "list the specified MMGen address and its balance"
        # single-address convenience wrapper around listaddresses()
        return await self.listaddresses(
            mmgen_addrs = mmgen_addr,
            minconf = minconf,
            pager = pager,
            showempty = showempty,
            showbtcaddrs = showbtcaddr,
            age_fmt = age_fmt )

    async def listaddresses(self,
            mmgen_addrs:'(range or list)' = '',
            minconf = 1,
            showempty = False,
            pager = False,
            showbtcaddrs = True,
            all_labels = False,
            sort: options_annot_str(['reverse','age']) = '',
            age_fmt: options_annot_str(TwCommon.age_fmts) = 'confs' ):
        "list MMGen addresses and their balances"
        show_age = bool(age_fmt)

        if sort:
            # 'sort' is a comma-separated list of parameters
            sort = set(sort.split(','))
            # A tuple, not a set, so that the error message below always
            # lists the valid parameters in the same order.
            sort_params = ('reverse','age')
            if not sort.issubset( sort_params ):
                from ..util import die
                die(1,"The sort option takes the following parameters: '{}'".format( "','".join(sort_params) ))

        usr_addr_list = []
        if mmgen_addrs:
            # split off the trailing index list; what precedes the last ':'
            # is used as the address prefix for every index
            a = mmgen_addrs.rsplit(':',1)
            if len(a) != 2:
                from ..util import die
                die(1,
                    f'{mmgen_addrs}: invalid address list argument ' +
                    '(must be in form <seed ID>:[<type>:]<idx list>)' )
            from ..addr import MMGenID
            from ..addrlist import AddrIdxList
            usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])]

        from ..twaddrs import TwAddrList
        al = await TwAddrList( self.proto, usr_addr_list, minconf, showempty, showbtcaddrs, all_labels )
        if not al:
            from ..util import die
            # exit status 0: an empty list is not treated as an error
            die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty])

        return await al.format( showbtcaddrs, sort, show_age, age_fmt or 'confs' )

    async def twview(self,
            pager = False,
            reverse = False,
            wide = False,
            minconf = 1,
            sort = 'age',
            age_fmt: options_annot_str(TwCommon.age_fmts) = 'confs',
            show_mmid = True,
            wide_show_confs = True ):
        "view tracking wallet"
        # NOTE(review): the 'sort' parameter is accepted but never passed on
        # to the unspent-outputs object below - confirm whether it should be
        # forwarded to get_unspent_data().
        from ..twuo import TwUnspentOutputs
        twuo = await TwUnspentOutputs(self.proto,minconf=minconf)
        await twuo.get_unspent_data(reverse_sort=reverse)
        twuo.age_fmt = age_fmt
        twuo.show_mmid = show_mmid
        if wide:
            ret = twuo.format_for_printing( color=True, show_confs=wide_show_confs )
        else:
            ret = twuo.format_for_display()
        # the wallet attribute is dropped before the formatted result is awaited
        del twuo.wallet
        return await ret

    async def add_label(self,mmgen_or_coin_addr:str,label:str):
        "add descriptive label for address in tracking wallet"
        from ..twctl import TrackingWallet
        await (await TrackingWallet(self.proto,mode='w')).add_label( mmgen_or_coin_addr, label, on_fail='raise' )
        return True

    async def remove_label(self,mmgen_or_coin_addr:str):
        "remove descriptive label for address in tracking wallet"
        # removal is implemented as setting an empty label
        await self.add_label( mmgen_or_coin_addr, '' )
        return True

    async def remove_address(self,mmgen_or_coin_addr:str):
        "remove an address from tracking wallet"
        from ..twctl import TrackingWallet
        ret = await (await TrackingWallet(self.proto,mode='w')).remove_address(mmgen_or_coin_addr) # returns None on failure
        if ret:
            from ..util import msg
            msg(f'Address {ret!r} deleted from tracking wallet')
        return ret

175
mmgen/tool/util.py Executable file
View file

@ -0,0 +1,175 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/util.py: Utility commands for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base
class tool_cmd(tool_cmd_base):
    "general string conversion and hashing utilities"

    # NOTE: docstrings in this class are displayed by the help screens
    # (read via __doc__); keep their text verbatim.

    def bytespec(self,dd_style_byte_specifier:str):
        "convert a byte specifier such as '1GB' into an integer"
        from ..util import parse_bytespec
        return parse_bytespec(dd_style_byte_specifier)

    def to_bytespec(self,
            n: int,
            dd_style_byte_specifier: str,
            fmt = '0.2',
            print_sym = True ):
        "convert an integer to a byte specifier such as '1GB'"
        from ..util import int2bytespec
        return int2bytespec( n, dd_style_byte_specifier, fmt, print_sym )

    def randhex(self,nbytes='32'):
        "print 'n' bytes (default 32) of random data in hex format"
        from ..crypto import get_random
        return get_random( int(nbytes) ).hex()

    def hexreverse(self,hexstr:'sstr'):
        "reverse bytes of a hexadecimal string"
        return bytes.fromhex( hexstr.strip() )[::-1].hex()

    def hexlify(self,infile:str):
        "convert bytes in file to hexadecimal (use '-' for stdin)"
        from ..fileutil import get_data_from_file
        data = get_data_from_file( infile, dash=True, quiet=True, binary=True )
        return data.hex()

    def unhexlify(self,hexstr:'sstr'):
        "convert hexadecimal value to bytes (warning: outputs binary data)"
        return bytes.fromhex(hexstr)

    def hexdump(self,infile:str,cols=8,line_nums='hex'):
        "create hexdump of data from file (use '-' for stdin)"
        from ..fileutil import get_data_from_file
        from ..util import pretty_hexdump
        data = get_data_from_file( infile, dash=True, quiet=True, binary=True )
        return pretty_hexdump( data, cols=cols, line_nums=line_nums ).rstrip()

    def unhexdump(self,infile:str):
        "decode hexdump from file (use '-' for stdin) (warning: outputs binary data)"
        from ..globalvars import g
        if g.platform == 'win':
            # 'os' and 'sys' are not imported at module level, so import them
            # here; without this the Windows path raised NameError
            import os,sys
            import msvcrt
            # switch stdout to binary mode for the binary output
            msvcrt.setmode( sys.stdout.fileno(), os.O_BINARY )
        from ..fileutil import get_data_from_file
        from ..util import decode_pretty_hexdump
        hexdata = get_data_from_file( infile, dash=True, quiet=True )
        return decode_pretty_hexdump(hexdata)

    def hash160(self,hexstr:'sstr'):
        "compute ripemd160(sha256(data)) (convert hex pubkey to hex addr)"
        from ..protocol import hash160
        return hash160( bytes.fromhex(hexstr) ).hex()

    def hash256(self,string_or_bytes:str,file_input=False,hex_input=False): # TODO: handle stdin
        "compute sha256(sha256(data)) (double sha256)"
        from hashlib import sha256
        if file_input:
            from ..fileutil import get_data_from_file
            b = get_data_from_file( string_or_bytes, binary=True )
        elif hex_input:
            from ..util import decode_pretty_hexdump
            b = decode_pretty_hexdump(string_or_bytes)
        else:
            b = string_or_bytes
        # Only text needs encoding: the file and hexdump inputs are already
        # binary, and calling .encode() on them raised AttributeError.
        if isinstance(b,str):
            b = b.encode()
        return sha256(sha256(b).digest()).hexdigest()

    def id6(self,infile:str):
        "generate 6-character MMGen ID for a file (use '-' for stdin)"
        from ..util import make_chksum_6
        from ..fileutil import get_data_from_file
        return make_chksum_6(
            get_data_from_file( infile, dash=True, quiet=True, binary=True ))

    def str2id6(self,string:'sstr'): # retain ignoring of space for backwards compat
        "generate 6-character MMGen ID for a string, ignoring spaces"
        from ..util import make_chksum_6
        return make_chksum_6( ''.join(string.split()) )

    def id8(self,infile:str):
        "generate 8-character MMGen ID for a file (use '-' for stdin)"
        from ..util import make_chksum_8
        from ..fileutil import get_data_from_file
        return make_chksum_8(
            get_data_from_file( infile, dash=True, quiet=True, binary=True ))

    def randb58(self,nbytes=32,pad=0):
        "generate random data (default: 32 bytes) and convert it to base 58"
        from ..crypto import get_random
        from ..baseconv import baseconv
        return baseconv('b58').frombytes( get_random(nbytes), pad=pad, tostr=True )

    def bytestob58(self,infile:str,pad=0):
        "convert bytes to base 58 (supply data via STDIN)"
        from ..fileutil import get_data_from_file
        from ..baseconv import baseconv
        data = get_data_from_file( infile, dash=True, quiet=True, binary=True )
        return baseconv('b58').frombytes( data, pad=pad, tostr=True )

    def b58tobytes(self,b58num:'sstr',pad=0):
        "convert a base 58 number to bytes (warning: outputs binary data)"
        from ..baseconv import baseconv
        return baseconv('b58').tobytes( b58num, pad=pad )

    def hextob58(self,hexstr:'sstr',pad=0):
        "convert a hexadecimal number to base 58"
        from ..baseconv import baseconv
        return baseconv('b58').fromhex( hexstr, pad=pad, tostr=True )

    def b58tohex(self,b58num:'sstr',pad=0):
        "convert a base 58 number to hexadecimal"
        from ..baseconv import baseconv
        return baseconv('b58').tohex( b58num, pad=pad )

    def hextob58chk(self,hexstr:'sstr'):
        "convert a hexadecimal number to base58-check encoding"
        from ..protocol import _b58chk_encode
        return _b58chk_encode( bytes.fromhex(hexstr) )

    def b58chktohex(self,b58chk_num:'sstr'):
        "convert a base58-check encoded number to hexadecimal"
        from ..protocol import _b58chk_decode
        return _b58chk_decode(b58chk_num).hex()

    def hextob32(self,hexstr:'sstr',pad=0):
        "convert a hexadecimal number to MMGen's flavor of base 32"
        from ..baseconv import baseconv
        return baseconv('b32').fromhex( hexstr, pad, tostr=True )

    def b32tohex(self,b32num:'sstr',pad=0):
        "convert an MMGen-flavor base 32 number to hexadecimal"
        from ..baseconv import baseconv
        # input is upper-cased before conversion, so lower-case input works
        return baseconv('b32').tohex( b32num.upper(), pad )

    def hextob6d(self,hexstr:'sstr',pad=0,add_spaces=True):
        "convert a hexadecimal number to die roll base6 (base6d)"
        from ..baseconv import baseconv
        from ..util import block_format
        ret = baseconv('b6d').fromhex(hexstr,pad,tostr=True)
        return block_format( ret, gw=5, cols=None ).strip() if add_spaces else ret

    def b6dtohex(self,b6d_num:'sstr',pad=0):
        "convert a die roll base6 (base6d) number to hexadecimal"
        from ..baseconv import baseconv
        from ..util import remove_whitespace
        return baseconv('b6d').tohex( remove_whitespace(b6d_num), pad )

91
mmgen/tool/wallet.py Executable file
View file

@ -0,0 +1,91 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/wallet.py: Wallet routines for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base
from ..opts import opt
from ..fileutil import get_seed_file
from ..subseed import SubSeedList
from ..seedsplit import MasterShareIdx
from ..wallet import Wallet
class tool_cmd(tool_cmd_base):
    "key, address or subseed generation from an MMGen wallet"

    # NOTE: docstrings in this class are displayed by the help screens
    # (read via __doc__); keep their text verbatim.
    #
    # Every command sets opt.quiet, locates the seed file (the named wallet,
    # or whatever get_seed_file() selects for an empty list) and opens it
    # with Wallet().

    def __init__(self,proto=None,mmtype=None):
        # fall back to the protocol selected by the command-line options;
        # 'mmtype' is accepted but not used by this class
        if not proto:
            from ..protocol import init_proto_from_opts
            proto = init_proto_from_opts()
        self.proto = proto

    def get_subseed(self,subseed_idx:str,wallet=''):
        "get the Seed ID of a single subseed by Subseed Index for default or specified wallet"
        opt.quiet = True
        seed_file = get_seed_file([wallet] if wallet else [],1)
        seed = Wallet(seed_file).seed
        return seed.subseed(subseed_idx).sid

    def get_subseed_by_seed_id(self,seed_id:str,wallet='',last_idx=SubSeedList.dfl_len):
        "get the Subseed Index of a single subseed by Seed ID for default or specified wallet"
        opt.quiet = True
        seed_file = get_seed_file([wallet] if wallet else [],1)
        match = Wallet(seed_file).seed.subseed_by_seed_id( seed_id, last_idx )
        if match:
            return match.ss_idx
        return None

    def list_subseeds(self,subseed_idx_range:str,wallet=''):
        "list a range of subseed Seed IDs for default or specified wallet"
        opt.quiet = True
        seed_file = get_seed_file([wallet] if wallet else [],1)
        from ..subseed import SubSeedIdxRange
        subseeds = Wallet(seed_file).seed.subseeds
        return subseeds.format( *SubSeedIdxRange(subseed_idx_range) )

    def list_shares(self,
            share_count: int,
            id_str = 'default',
            master_share: f'(min:1, max:{MasterShareIdx.max_val}, 0=no master share)' = 0,
            wallet = '' ):
        "list the Seed IDs of the shares resulting from a split of default or specified wallet"
        opt.quiet = True
        seed_file = get_seed_file([wallet] if wallet else [],1)
        shares = Wallet(seed_file).seed.split( share_count, id_str, master_share )
        return shares.format()

    def gen_key(self,mmgen_addr:str,wallet=''):
        "generate a single MMGen WIF key from default or specified wallet"
        return self.gen_addr( mmgen_addr, wallet, target='wif' )

    def gen_addr(self,mmgen_addr:str,wallet='',target='addr'):
        "generate a single MMGen address from default or specified wallet"
        from ..addr import MMGenID
        from ..addrlist import AddrList,AddrIdxList
        mmid = MMGenID( self.proto, mmgen_addr )
        opt.quiet = True
        seed_file = get_seed_file([wallet] if wallet else [],1)
        seed = Wallet(seed_file).seed
        # the requested address must belong to the wallet that was opened
        if seed.sid != mmid.sid:
            from ..util import die
            die(1,f'Seed ID of requested address ({mmid.sid}) does not match wallet ({seed.sid})')
        entry = AddrList(
            proto = self.proto,
            seed = seed,
            addr_idxs = AddrIdxList(str(mmid.idx)),
            mmtype = mmid.mmtype ).data[0]
        if target == 'wif':
            return entry.sec.wif
        return entry.addr

View file

@ -1156,8 +1156,8 @@ harder to find, you're advised to choose a much larger file size than this.
break
msg(f'File size must be an integer no less than {min_fsize}')
from .tool import MMGenToolCmdFileUtil
MMGenToolCmdFileUtil().rand2file(fn,str(fsize))
from .tool.fileutil import tool_cmd
tool_cmd().rand2file(fn,str(fsize))
check_offset = False
else:
die(1,'Exiting at user request')

View file

@ -63,7 +63,7 @@ def test_triplet(tool,coin,network,addrtype,key_idx,wif_chk,addr_chk):
def run_test():
from mmgen.tool import tool_api
from mmgen.tool.api import tool_api
tool = tool_api()
tool.coins

View file

@ -36,6 +36,7 @@ def overlay_setup(repo_root):
'mmgen',
'mmgen.data',
'mmgen.share',
'mmgen.tool',
'mmgen.altcoins',
'mmgen.altcoins.eth',
'mmgen.altcoins.eth.pyethereum',

View file

@ -335,7 +335,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
if g.daemon_id == 'erigon':
from hashlib import sha256
from mmgen.tool import tool_api
from mmgen.tool.api import tool_api
devkey = sha256(b'erigon devnet key').hexdigest()
t = tool_api()
t.init_coin(g.coin,'regtest')
@ -837,9 +837,9 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
async def token_transfer_ops(self,op,amt=1000):
self.spawn('',msg_only=True)
sid = dfl_sid
from mmgen.tool import MMGenToolCmdWallet
from mmgen.tool.wallet import tool_cmd
usr_mmaddrs = [f'{sid}:E:{i}' for i in (11,21)]
usr_addrs = [MMGenToolCmdWallet(proto=self.proto).gen_addr(addr,dfl_words_file) for addr in usr_mmaddrs]
usr_addrs = [tool_cmd(proto=self.proto).gen_addr(addr,dfl_words_file) for addr in usr_mmaddrs]
from mmgen.altcoins.eth.contract import TokenResolve
from mmgen.altcoins.eth.tx import EthereumMMGenTX as etx

View file

@ -132,7 +132,7 @@ rt_data = {
}
def create_burn_addr(proto):
from mmgen.tool import tool_api
from mmgen.tool.api import tool_api
t = tool_api()
t.init_coin(proto.coin,proto.network)
t.addrtype = 'compressed'
@ -808,7 +808,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
f'Replacing transactions:\s+{new_txid}' )
def _gen_pairs(self,n):
from mmgen.tool import tool_api
from mmgen.tool.api import tool_api
t = tool_api()
t.init_coin(self.proto.coin,self.proto.network)

View file

@ -32,14 +32,14 @@ class TestSuiteTool(TestSuiteMain,TestSuiteBase):
def tool_rand2file(self):
outfile = os.path.join(self.tmpdir,'rand2file.out')
from mmgen.tool import MMGenToolCmdUtil
from mmgen.util import parse_bytespec
for nbytes in ('1','1023','1K','1048575','1M','1048577','123M'):
t = self.spawn(
'mmgen-tool',
['-d',self.tmpdir,'-r0','rand2file','rand2file.out',nbytes],
extra_desc='({} byte{})'.format(
nbytes,
suf(MMGenToolCmdUtil().bytespec(nbytes)) )
suf(parse_bytespec(nbytes)) )
)
t.expect('random data written to file')
t.read()

View file

@ -560,7 +560,7 @@ class TestSuiteXMRWallet(TestSuiteBase):
die(2,'Restart attempt limit exceeded')
async def send_random_txs():
from mmgen.tool import tool_api
from mmgen.tool.api import tool_api
t = tool_api()
t.init_coin('XMR','testnet')
t.usr_randchars = 0

View file

@ -23,7 +23,7 @@ test/tooltest2.py: Simple tests for the 'mmgen-tool' utility
# TODO: move all non-interactive 'mmgen-tool' tests in 'test.py' here
# TODO: move all(?) tests in 'tooltest.py' here (or duplicate them?)
import sys,os,time
import sys,os,time,importlib
from subprocess import run,PIPE
from decimal import Decimal
@ -757,10 +757,105 @@ tests = {
},
}
coin_dependent_groups = ('Coin','File') # TODO: do this as attr of each group in tool.py
coin_dependent_groups = ('Coin','File')
async def run_test(gid,cmd_name):
def fork_cmd(cmd_name,args,out,opts,stdin_input):
    """
    Run a tool command in a subprocess and return its output.

    The command line is tool_cmd_preargs + tool_cmd + opts + [cmd_name] + args;
    'stdin_input', if any, is fed to the process's standard input.  'out' is
    not used here.

    Returns the stripped stdout, decoded to str when possible and left as
    bytes otherwise.  On a non-zero exit status, returns None or False if
    stderr starts with the matching 'tool command returned ...' line, and
    aborts via ydie() for any other failure.
    """
    cmd = (
        tool_cmd_preargs +
        tool_cmd +
        (opts or []) +
        [cmd_name] + args
    )
    vmsg('{} {}'.format(
        green('Executing'),
        cyan(' '.join(cmd)) ))
    cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
    try:
        cmd_out = cp.stdout.decode()
    except UnicodeDecodeError:
        # binary output: keep the raw bytes
        cmd_out = cp.stdout
    if cp.stderr:
        vmsg(cp.stderr.strip().decode())
    if cp.returncode != 0:
        import re
        m = re.match(b'tool command returned (None|False)'+NL.encode(),cp.stderr)
        if m:
            return { b'None': None, b'False': False }[m.group(1)]
        else:
            ydie(1,f'Spawned program exited with error: {cp.stderr}')
    return cmd_out.strip()
async def call_method(cls,method,cmd_name,args,out,opts,mmtype,stdin_input):
    """
    Call a tool command method in-process and return its result.

    The raw 'args' are converted with main_tool.process_args() and passed to
    'method'.  Unless opt.verbose is set, opt.quiet is forced on for the
    duration of the call and restored before returning.  'out' and 'opts'
    are not used here; 'mmtype' is used only in the progress message.

    When 'stdin_input' is given, the process forks: the child writes the
    input to a pipe and exits, while the parent temporarily makes the pipe's
    read end its fd 0, so that the method reads the data as standard input.
    """
    vmsg('{}: {}{}'.format(purple('Running'),
        ' '.join([cmd_name]+[repr(e) for e in args]),
        ' '+mmtype if mmtype else '' ))
    aargs,kwargs = main_tool.process_args(cmd_name,args,cls)
    oq_save = bool(opt.quiet)
    if not opt.verbose:
        opt.quiet = True
    if stdin_input:
        fd0,fd1 = os.pipe()
        if os.fork(): # parent
            os.close(fd1)
            # save the real stdin so it can be put back after the call
            stdin_save = os.dup(0)
            os.dup2(fd0,0)
            # NOTE(review): the result is not awaited here, unlike in the
            # branch below - presumably commands that read stdin are
            # synchronous; confirm
            cmd_out = method(*aargs,**kwargs)
            os.dup2(stdin_save,0)
            # reap the child
            os.wait()
            opt.quiet = oq_save
            return cmd_out
        else: # child
            os.close(fd0)
            os.write(fd1,stdin_input)
            vmsg(f'Input: {stdin_input!r}')
            sys.exit(0)
    else:
        ret = method(*aargs,**kwargs)
        # async commands return a coroutine, which must be awaited
        if type(ret).__name__ == 'coroutine':
            ret = await ret
        opt.quiet = oq_save
        return ret
def tool_api(cls,cmd_name,args,out,opts):
    """
    Run a tool command through the tool_api interface and return its result.

    Any '--type=...' entry in 'opts' sets the API object's address type; the
    raw 'args' are converted with main_tool.process_args().  'out' is not
    used here.
    """
    from mmgen.tool.api import tool_api
    api = tool_api()
    for optstr in opts or []:
        if optstr.startswith('--type='):
            api.addrtype = optstr.split('=')[1]
    pargs,kwargs = main_tool.process_args(cmd_name,args,cls)
    api_method = getattr(api,cmd_name)
    return api_method(*pargs,**kwargs)
def check_output(out,chk):
    """
    Check a command's output 'out' against the expectation 'chk'.

    'out' is normalized to bytes when it is an int or a str.  'chk' may be:
      - None: nothing is checked
      - a str or bytes value: must equal the normalized output
      - a function: called with the decoded output, must return a true value
      - a dict, each item of which is checked in turn:
          'boolfunc': function called with the decoded output
          'value':    must equal the decoded output
          any other key: name of a builtin (e.g. 'len') that is called on the
                         normalized output; its result must equal the value

    The decoded output is None when the output cannot be decoded.  Failures
    raise AssertionError, except a builtin mismatch, which calls die().
    """
    # '__builtins__' is a module only in '__main__' and a dict elsewhere, so
    # builtins are looked up via the 'builtins' module instead
    import builtins

    if isinstance(chk,str):
        chk = chk.encode()
    if isinstance(out,int):
        out = str(out).encode()
    if isinstance(out,str):
        out = out.encode()
    err_fs = "Output ({!r}) doesn't match expected output ({!r})"
    try:
        outd = out.decode()
    except (AttributeError,UnicodeDecodeError):
        # non-bytes output (e.g. None) or binary data that is not valid text
        outd = None
    if type(chk).__name__ == 'function':
        assert chk(outd), f'{chk.__name__}({outd}) failed!'
    elif type(chk) == dict:
        for k,v in chk.items():
            if k == 'boolfunc':
                assert v(outd), f'{v.__name__}({outd}) failed!'
            elif k == 'value':
                assert outd == v, err_fs.format(outd,v)
            else:
                outval = getattr(builtins,k)(out)
                if outval != v:
                    die(1,f'{k}({out}) returned {outval}, not {v}!')
    elif chk is not None:
        assert out == chk, err_fs.format(out,chk)
async def run_test(cls,gid,cmd_name):
data = tests[gid][cmd_name]
# behavior is like test.py: run coin-dependent tests only if proto.testnet or proto.coin != BTC
if gid in coin_dependent_groups:
k = '{}_{}'.format(
@ -776,85 +871,14 @@ async def run_test(gid,cmd_name):
if proto.coin != 'BTC' or proto.testnet:
return
m2 = ''
m = '{} {}{}'.format(
purple('Testing'),
cmd_name if opt.names else docstring_head(tc[cmd_name]),
cmd_name if opt.names else docstring_head(getattr(cls,cmd_name)),
m2 )
msg_r(green(m)+'\n' if opt.verbose else m)
def fork_cmd(cmd_name,args,out,opts):
cmd = (
tool_cmd_preargs +
tool_cmd +
(opts or []) +
[cmd_name] + args
)
vmsg('{} {}'.format(
green('Executing'),
cyan(' '.join(cmd)) ))
cp = run(cmd,input=stdin_input or None,stdout=PIPE,stderr=PIPE)
try:
cmd_out = cp.stdout.decode()
except:
cmd_out = cp.stdout
if cp.stderr:
vmsg(cp.stderr.strip().decode())
if cp.returncode != 0:
import re
m = re.match(b'tool command returned (None|False)'+NL.encode(),cp.stderr)
if m:
return { b'None': None, b'False': False }[m.group(1)]
else:
ydie(1,f'Spawned program exited with error: {cp.stderr}')
return cmd_out.strip()
async def run_func(cmd_name,args,out,opts,mmtype):
vmsg('{}: {}{}'.format(purple('Running'),
' '.join([cmd_name]+[repr(e) for e in args]),
' '+mmtype if mmtype else '' ))
aargs,kwargs = tool._process_args(cmd_name,args)
tm = tool.MMGenToolCmdMeta
cls_name = tm.classname(tm,cmd_name)
tobj = getattr(tool,cls_name)(mmtype=mmtype)
method = getattr(tobj,cmd_name)
oq_save = bool(opt.quiet)
if not opt.verbose:
opt.quiet = True
if stdin_input:
fd0,fd1 = os.pipe()
if os.fork(): # parent
os.close(fd1)
stdin_save = os.dup(0)
os.dup2(fd0,0)
cmd_out = method(*aargs,**kwargs)
os.dup2(stdin_save,0)
os.wait()
opt.quiet = oq_save
return cmd_out
else: # child
os.close(fd0)
os.write(fd1,stdin_input)
vmsg(f'Input: {stdin_input!r}')
sys.exit(0)
else:
ret = method(*aargs,**kwargs)
if type(ret).__name__ == 'coroutine':
ret = await ret
opt.quiet = oq_save
return ret
def tool_api(cmd_name,args,out,opts):
from mmgen.tool import tool_api,_process_args
tool = tool_api()
if opts:
for o in opts:
if o.startswith('--type='):
tool.addrtype = o.split('=')[1]
pargs,kwargs = _process_args(cmd_name,args)
return getattr(tool,cmd_name)(*pargs,**kwargs)
for d in data:
args,out,opts,mmtype = d + tuple([None] * (4-len(d)))
stdin_input = None
@ -865,43 +889,20 @@ async def run_test(gid,cmd_name):
if opt.tool_api:
if args and args[0 ]== '-':
continue
cmd_out = tool_api(cmd_name,args,out,opts)
cmd_out = tool_api(cls,cmd_name,args,out,opts)
elif opt.fork:
cmd_out = fork_cmd(cmd_name,args,out,opts)
cmd_out = fork_cmd(cmd_name,args,out,opts,stdin_input)
else:
if stdin_input and g.platform == 'win':
msg('Skipping for MSWin - no os.fork()')
continue
cmd_out = await run_func(cmd_name,args,out,opts,mmtype)
method = getattr(cls(proto=proto,mmtype=mmtype),cmd_name)
cmd_out = await call_method(cls,method,cmd_name,args,out,opts,mmtype,stdin_input)
try: vmsg(f'Output:\n{cmd_out}\n')
except: vmsg(f'Output:\n{cmd_out!r}\n')
def check_output(out,chk):
if isinstance(chk,str):
chk = chk.encode()
if isinstance(out,int):
out = str(out).encode()
if isinstance(out,str):
out = out.encode()
err_fs = "Output ({!r}) doesn't match expected output ({!r})"
try: outd = out.decode()
except: outd = None
if type(chk).__name__ == 'function':
assert chk(outd), f'{chk.__name__}({outd}) failed!'
elif type(chk) == dict:
for k,v in chk.items():
if k == 'boolfunc':
assert v(outd), f'{v.__name__}({outd}) failed!'
elif k == 'value':
assert outd == v, err_fs.format(outd,v)
else:
outval = getattr(__builtins__,k)(out)
if outval != v:
die(1,f'{k}({out}) returned {outval}, not {v}!')
elif chk is not None:
assert out == chk, err_fs.format(out,chk)
try:
vmsg(f'Output:\n{cmd_out}\n')
except:
vmsg(f'Output:\n{cmd_out!r}\n')
if type(out) == tuple and type(out[0]).__name__ == 'function':
func_out = out[0](cmd_out)
@ -917,7 +918,9 @@ async def run_test(gid,cmd_name):
else:
check_output(cmd_out,out)
if not opt.verbose: msg_r('.')
if not opt.verbose:
msg_r('.')
if not opt.verbose:
msg('OK')
@ -926,28 +929,30 @@ def docstring_head(obj):
async def do_group(gid):
desc = f'command group {gid!r}'
cls = main_tool.get_mod_cls(gid.lower())
qmsg(blue('Testing ' +
desc if opt.names else
( docstring_head(tc.classes['MMGenToolCmd'+gid]) or desc )
( docstring_head(cls) or desc )
))
for cname in tc.classes['MMGenToolCmd'+gid].user_commands:
if cname in skipped_tests:
for cmdname in cls().user_commands:
if cmdname in skipped_tests:
continue
if cname not in tests[gid]:
m = f'No test for command {cname!r} in group {gid!r}!'
if cmdname not in tests[gid]:
m = f'No test for command {cmdname!r} in group {gid!r}!'
if opt.die_on_missing:
die(1,m+' Aborting')
else:
msg(m)
continue
await run_test(gid,cname)
await run_test(cls,gid,cmdname)
async def do_cmd_in_group(cmd):
for gid in tests:
for cname in tests[gid]:
if cname == cmd:
await run_test(gid,cname)
async def do_cmd_in_group(cmdname):
cls = main_tool.get_cmd_cls(cmdname)
for gid,cmds in tests.items():
for cmd in cmds:
if cmd == cmdname:
await run_test(cls,gid,cmdname)
return True
return False
@ -972,15 +977,13 @@ if opt.tool_api:
del tests['Wallet']
del tests['File']
import mmgen.tool as tool
tc = tool.MMGenToolCmds
import mmgen.main_tool as main_tool
if opt.list_tests:
Msg('Available tests:')
for gid in tests:
Msg(' {:6} - {}'.format(
gid,
docstring_head(tc.classes['MMGenToolCmd'+gid]) ))
for modname,cmdlist in main_tool.mods.items():
cls = getattr(importlib.import_module(f'mmgen.tool.{modname}'),'tool_cmd')
Msg(' {:6} - {}'.format( modname, docstring_head(cls) ))
sys.exit(0)
if opt.list_tested_cmds: