whitespace: tool

This commit is contained in:
The MMGen Project 2024-10-18 10:32:10 +00:00
commit f77a6115e4
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
12 changed files with 301 additions and 301 deletions

View file

@ -1,4 +1,4 @@
# provide this for backwards compatibility:
def tool_api(*args,**kwargs):
def tool_api(*args, **kwargs):
from .api import tool_api
return tool_api(*args,**kwargs)
return tool_api(*args, **kwargs)

View file

@ -29,7 +29,7 @@ class tool_api(
util_cmds,
coin_cmds,
mnemonic_cmds,
tool_cmd_base ):
tool_cmd_base):
"""
API providing access to a subset of methods from the mmgen.tool module
@ -41,7 +41,7 @@ class tool_api(
tool = tool_api(Config())
# Set the coin and network:
tool.init_coin('btc','mainnet')
tool.init_coin('btc', 'mainnet')
# Print available address types:
tool.print_addrtypes()
@ -53,38 +53,38 @@ class tool_api(
tool.usr_randchars = 0
# Generate a random BTC segwit keypair:
wif,addr = tool.randpair()
wif, addr = tool.randpair()
# Set coin, network and address type:
tool.init_coin('ltc','testnet')
tool.init_coin('ltc', 'testnet')
tool.addrtype = 'bech32'
# Generate a random LTC testnet Bech32 keypair:
wif,addr = tool.randpair()
wif, addr = tool.randpair()
print('wif: ',wif)
print('addr:',addr)
print('wif: ', wif)
print('addr:', addr)
"""
need_proto = True
need_addrtype = True
def __init__(self,cfg):
def __init__(self, cfg):
"""
Initializer - takes one argument, the Config instance `cfg`
"""
type(cfg)._reset_ok += ('usr_randchars',)
super().__init__(cfg=cfg)
def init_coin(self,coinsym,network):
def init_coin(self, coinsym, network):
"""
Initialize a coin/network pair
Valid choices for coins: one of the symbols returned by the 'coins' attribute
Valid choices for network: 'mainnet','testnet','regtest'
Valid choices for network: 'mainnet', 'testnet', 'regtest'
"""
from ..protocol import init_proto,warn_trustlevel
from ..protocol import init_proto, warn_trustlevel
warn_trustlevel(self.cfg)
self.proto = init_proto( self.cfg, coinsym, network=network, need_amt=True )
self.proto = init_proto(self.cfg, coinsym, network=network, need_amt=True)
return self.proto
@property
@ -114,7 +114,7 @@ class tool_api(
first-listed is the default
"""
from ..addr import MMGenAddrType
return [MMGenAddrType(proto=self.proto,id_str=id_str).name for id_str in self.proto.mmtypes]
return [MMGenAddrType(proto=self.proto, id_str=id_str).name for id_str in self.proto.mmtypes]
def print_addrtypes(self):
"""
@ -122,7 +122,7 @@ class tool_api(
a description. The first-listed is the default
"""
from ..addr import MMGenAddrType
for t in [MMGenAddrType(proto=self.proto,id_str=id_str) for id_str in self.proto.mmtypes]:
for t in [MMGenAddrType(proto=self.proto, id_str=id_str) for id_str in self.proto.mmtypes]:
print(f'{t.name:<12} - {t.desc}')
@property
@ -131,9 +131,9 @@ class tool_api(
return self.mmtype
@addrtype.setter
def addrtype(self,val):
def addrtype(self, val):
from ..addr import MMGenAddrType
self.mmtype = MMGenAddrType(self.proto,val)
self.mmtype = MMGenAddrType(self.proto, val)
@property
def usr_randchars(self):
@ -144,5 +144,5 @@ class tool_api(
return self.cfg.usr_randchars
@usr_randchars.setter
def usr_randchars(self,val):
def usr_randchars(self, val):
self.cfg.usr_randchars = val

View file

@ -21,13 +21,13 @@ tool.coin: Cryptocoin routines for the 'mmgen-tool' utility
"""
from collections import namedtuple
generator_data = namedtuple('generator_data',['kg','ag'])
generator_data = namedtuple('generator_data', ['kg', 'ag'])
from .common import tool_cmd_base
from ..key import PrivKey
from ..addr import CoinAddr,MMGenAddrType
from ..addrgen import KeyGenerator,AddrGenerator
from ..addr import CoinAddr, MMGenAddrType
from ..addrgen import KeyGenerator, AddrGenerator
class tool_cmd(tool_cmd_base):
"""
@ -45,8 +45,8 @@ class tool_cmd(tool_cmd_base):
def _init_generators(self):
return generator_data(
kg = KeyGenerator( self.cfg, self.proto, self.mmtype.pubkey_type ),
ag = AddrGenerator( self.cfg, self.proto, self.mmtype ),
kg = KeyGenerator(self.cfg, self.proto, self.mmtype.pubkey_type),
ag = AddrGenerator(self.cfg, self.proto, self.mmtype),
)
def randwif(self):
@ -56,7 +56,7 @@ class tool_cmd(tool_cmd_base):
self.proto,
Crypto(self.cfg).get_random(32),
pubkey_type = self.mmtype.pubkey_type,
compressed = self.mmtype.compressed ).wif
compressed = self.mmtype.compressed).wif
def randpair(self):
"generate a random private key/address pair"
@ -66,77 +66,77 @@ class tool_cmd(tool_cmd_base):
self.proto,
Crypto(self.cfg).get_random(32),
pubkey_type = self.mmtype.pubkey_type,
compressed = self.mmtype.compressed )
compressed = self.mmtype.compressed)
return (
privkey.wif,
gd.ag.to_addr( gd.kg.gen_data(privkey) ))
gd.ag.to_addr(gd.kg.gen_data(privkey)))
def wif2hex(self,wifkey:'sstr'):
def wif2hex(self, wifkey: 'sstr'):
"convert a private key from WIF to hexadecimal format"
return PrivKey(
self.proto,
wif = wifkey ).hex()
wif = wifkey).hex()
def hex2wif(self,privhex:'sstr'):
def hex2wif(self, privhex: 'sstr'):
"convert a private key from hexadecimal to WIF format"
return PrivKey(
self.proto,
bytes.fromhex(privhex),
pubkey_type = self.mmtype.pubkey_type,
compressed = self.mmtype.compressed ).wif
compressed = self.mmtype.compressed).wif
def wif2addr(self,wifkey:'sstr'):
def wif2addr(self, wifkey: 'sstr'):
"generate a coin address from a key in WIF format"
gd = self._init_generators()
privkey = PrivKey(
self.proto,
wif = wifkey )
return gd.ag.to_addr( gd.kg.gen_data(privkey) )
wif = wifkey)
return gd.ag.to_addr(gd.kg.gen_data(privkey))
def wif2redeem_script(self,wifkey:'sstr'): # new
def wif2redeem_script(self, wifkey: 'sstr'): # new
"convert a WIF private key to a Segwit P2SH-P2WPKH redeem script"
assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit'
assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit'
gd = self._init_generators()
privkey = PrivKey(
self.proto,
wif = wifkey )
return gd.ag.to_segwit_redeem_script( gd.kg.gen_data(privkey) )
wif = wifkey)
return gd.ag.to_segwit_redeem_script(gd.kg.gen_data(privkey))
def wif2segwit_pair(self,wifkey:'sstr'):
def wif2segwit_pair(self, wifkey: 'sstr'):
"generate a Segwit P2SH-P2WPKH redeem script and address from a WIF private key"
assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit'
assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit'
gd = self._init_generators()
data = gd.kg.gen_data(PrivKey(
self.proto,
wif = wifkey ))
wif = wifkey))
return (
gd.ag.to_segwit_redeem_script(data),
gd.ag.to_addr(data) )
gd.ag.to_addr(data))
def _privhex2out(self,privhex:'sstr',output_pubhex=False):
def _privhex2out(self, privhex: 'sstr', output_pubhex=False):
gd = self._init_generators()
pk = PrivKey(
self.proto,
bytes.fromhex(privhex),
compressed = self.mmtype.compressed,
pubkey_type = self.mmtype.pubkey_type )
pubkey_type = self.mmtype.pubkey_type)
data = gd.kg.gen_data(pk)
return data.pubkey.hex() if output_pubhex else gd.ag.to_addr(data)
def privhex2addr(self,privhex:'sstr'):
def privhex2addr(self, privhex: 'sstr'):
"generate a coin address from raw hexadecimal private key data"
return self._privhex2out(privhex)
def privhex2pubhex(self,privhex:'sstr'): # new
def privhex2pubhex(self, privhex: 'sstr'): # new
"generate a hexadecimal public key from raw hexadecimal private key data"
return self._privhex2out(privhex,output_pubhex=True)
return self._privhex2out(privhex, output_pubhex=True)
def pubhex2addr(self,pubkeyhex:'sstr'):
def pubhex2addr(self, pubkeyhex: 'sstr'):
"convert a hexadecimal pubkey to an address"
if self.proto.base_proto == 'Ethereum' and len(pubkeyhex) == 128: # support raw ETH pubkeys
pubkeyhex = '04' + pubkeyhex
from ..keygen import keygen_public_data
ag = AddrGenerator( self.cfg, self.proto, self.mmtype )
ag = AddrGenerator(self.cfg, self.proto, self.mmtype)
return ag.to_addr(keygen_public_data(
pubkey = bytes.fromhex(pubkeyhex),
viewkey_bytes = None,
@ -144,13 +144,13 @@ class tool_cmd(tool_cmd_base):
compressed = self.mmtype.compressed,
))
def pubhex2redeem_script(self,pubkeyhex:'sstr'): # new
def pubhex2redeem_script(self, pubkeyhex: 'sstr'): # new
"convert a hexadecimal pubkey to a Segwit P2SH-P2WPKH redeem script"
assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit'
assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit'
from ..proto.btc.common import hash160
return self.proto.pubhash2redeem_script(hash160(bytes.fromhex(pubkeyhex))).hex()
def redeem_script2addr(self,redeem_script_hex:'sstr'): # new
def redeem_script2addr(self, redeem_script_hex: 'sstr'): # new
"convert a Segwit P2SH-P2WPKH redeem script to an address"
assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit'
assert redeem_script_hex[:4] == '0014', f'{redeem_script_hex!r}: invalid redeem script'
@ -158,7 +158,7 @@ class tool_cmd(tool_cmd_base):
from ..proto.btc.common import hash160
return self.proto.pubhash2addr(hash160(bytes.fromhex(redeem_script_hex)), 'p2sh')
def pubhash2addr(self,pubhashhex:'sstr'):
def pubhash2addr(self, pubhashhex: 'sstr'):
"convert public key hash to address"
pubhash = bytes.fromhex(pubhashhex)
if self.mmtype.name == 'segwit':
@ -168,26 +168,26 @@ class tool_cmd(tool_cmd_base):
else:
return self.proto.pubhash2addr(pubhash, self.mmtype.addr_fmt)
def addr2pubhash(self,addr:'sstr'):
def addr2pubhash(self, addr: 'sstr'):
"convert coin address to public key hash"
ap = self.proto.decode_addr(addr)
assert ap, f'coin address {addr!r} could not be parsed'
if ap.fmt not in MMGenAddrType.pkh_fmts:
from ..util import die
die(2,f'{ap.fmt} addresses cannot be converted to pubhash')
die(2, f'{ap.fmt} addresses cannot be converted to pubhash')
return ap.bytes.hex()
def addr2scriptpubkey(self,addr:'sstr'):
def addr2scriptpubkey(self, addr: 'sstr'):
"convert coin address to scriptPubKey"
from ..proto.btc.tx.base import addr2scriptPubKey
return addr2scriptPubKey( self.proto, CoinAddr(self.proto,addr) )
return addr2scriptPubKey(self.proto, CoinAddr(self.proto, addr))
def scriptpubkey2addr(self,hexstr:'sstr'):
def scriptpubkey2addr(self, hexstr: 'sstr'):
"convert scriptPubKey to coin address"
from ..proto.btc.tx.base import scriptPubKey2addr
return scriptPubKey2addr( self.proto, hexstr )[0]
return scriptPubKey2addr(self.proto, hexstr)[0]
def eth_checksummed_addr(self,addr:'sstr'):
def eth_checksummed_addr(self, addr: 'sstr'):
"create a checksummed Ethereum address"
from ..protocol import init_proto
return init_proto( self.cfg, 'eth' ).checksummed_addr(addr)
return init_proto(self.cfg, 'eth').checksummed_addr(addr)

View file

@ -23,7 +23,7 @@ tool.common: Base class and shared routines for the 'mmgen-tool' utility
from ..objmethods import MMGenObject
def options_annot_str(l):
return "(valid choices: '{}')".format( "','".join(l) )
return "(valid choices: '{}')".format("','".join(l))
class tool_cmd_base(MMGenObject):
@ -31,13 +31,13 @@ class tool_cmd_base(MMGenObject):
need_addrtype = False
need_amt = False
def __init__(self,cfg,cmdname=None,proto=None,mmtype=None):
def __init__(self, cfg, cmdname=None, proto=None, mmtype=None):
self.cfg = cfg
if self.need_proto:
from ..protocol import init_proto_from_cfg
self.proto = proto or cfg._proto or init_proto_from_cfg(cfg,need_amt=True)
self.proto = proto or cfg._proto or init_proto_from_cfg(cfg, need_amt=True)
if cfg.token:
self.proto.tokensym = cfg.token.upper()
@ -45,8 +45,8 @@ class tool_cmd_base(MMGenObject):
from ..addr import MMGenAddrType
self.mmtype = MMGenAddrType(
self.proto,
mmtype or cfg.type or self.proto.dfl_mmtype )
mmtype or cfg.type or self.proto.dfl_mmtype)
@property
def user_commands(self):
return {k:v for k,v in type(self).__dict__.items() if callable(v) and not k.startswith('_')}
return {k:v for k, v in type(self).__dict__.items() if callable(v) and not k.startswith('_')}

View file

@ -20,55 +20,55 @@
tool.file: Address and transaction file routines for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base,options_annot_str
from .common import tool_cmd_base, options_annot_str
class tool_cmd(tool_cmd_base):
"utilities for viewing/checking MMGen address and transaction files"
need_proto = True
def __init__(self,cfg,cmdname=None,proto=None,mmtype=None):
def __init__(self, cfg, cmdname=None, proto=None, mmtype=None):
if cmdname == 'txview':
self.need_amt = True
super().__init__(cfg=cfg,cmdname=cmdname,proto=proto,mmtype=mmtype)
super().__init__(cfg=cfg, cmdname=cmdname, proto=proto, mmtype=mmtype)
def _file_chksum(self,mmgen_addrfile,obj):
def _file_chksum(self, mmgen_addrfile, obj):
kwargs = {'skip_chksum_msg':True}
if not obj.__name__ == 'PasswordList':
kwargs.update({'key_address_validity_check':False})
ret = obj( self.cfg, self.proto, mmgen_addrfile, **kwargs )
ret = obj(self.cfg, self.proto, mmgen_addrfile, **kwargs)
if self.cfg.verbose:
from ..util import msg,capfirst
from ..util import msg, capfirst
if ret.al_id.mmtype.name == 'password':
msg('Passwd fmt: {}\nPasswd len: {}\nID string: {}'.format(
capfirst(ret.pw_info[ret.pw_fmt].desc),
ret.pw_len,
ret.pw_id_str ))
ret.pw_id_str))
else:
msg(f'Base coin: {ret.base_coin} {capfirst(ret.network)}')
msg(f'MMType: {capfirst(ret.al_id.mmtype.name)}')
msg( f'List length: {len(ret.data)}')
msg(f'List length: {len(ret.data)}')
return ret.chksum
def addrfile_chksum(self,mmgen_addrfile:str):
def addrfile_chksum(self, mmgen_addrfile: str):
"compute checksum for MMGen address file"
from ..addrlist import AddrList
return self._file_chksum(mmgen_addrfile,AddrList)
return self._file_chksum(mmgen_addrfile, AddrList)
def keyaddrfile_chksum(self,mmgen_keyaddrfile:str):
def keyaddrfile_chksum(self, mmgen_keyaddrfile: str):
"compute checksum for MMGen key-address file"
from ..addrlist import KeyAddrList
return self._file_chksum(mmgen_keyaddrfile,KeyAddrList)
return self._file_chksum(mmgen_keyaddrfile, KeyAddrList)
def viewkeyaddrfile_chksum(self,mmgen_viewkeyaddrfile:str):
def viewkeyaddrfile_chksum(self, mmgen_viewkeyaddrfile: str):
"compute checksum for MMGen viewkey-address file"
from ..addrlist import ViewKeyAddrList
return self._file_chksum(mmgen_viewkeyaddrfile,ViewKeyAddrList)
return self._file_chksum(mmgen_viewkeyaddrfile, ViewKeyAddrList)
def passwdfile_chksum(self,mmgen_passwdfile:str):
def passwdfile_chksum(self, mmgen_passwdfile: str):
"compute checksum for MMGen password file"
from ..passwdlist import PasswordList
return self._file_chksum(mmgen_passwdfile,PasswordList)
return self._file_chksum(mmgen_passwdfile, PasswordList)
async def txview(
self,
@ -78,18 +78,18 @@ class tool_cmd(tool_cmd_base):
'pager',
'terse',
'sort',
'filesort' ),
'dfls': ( False, False, 'addr', 'mtime' ),
'filesort'),
'dfls': (False, False, 'addr', 'mtime'),
'annots': {
'mmgen_tx_file(s)': str,
'pager': 'send output to pager',
'terse': 'produce compact tabular output',
'sort': 'sort order for transaction inputs and outputs ' + options_annot_str(['addr','raw']),
'filesort': 'file sort order ' + options_annot_str(['mtime','ctime','atime']),
'sort': 'sort order for transaction inputs and outputs ' + options_annot_str(['addr', 'raw']),
'filesort': 'file sort order ' + options_annot_str(['mtime', 'ctime', 'atime']),
}
},
*infiles,
**kwargs ):
**kwargs):
"display specified raw or signed MMGen transaction files in human-readable form"
terse = bool(kwargs.get('terse'))
@ -97,14 +97,14 @@ class tool_cmd(tool_cmd_base):
file_sort = kwargs.get('filesort') or 'mtime'
from ..filename import MMGenFileList
from ..tx import completed,CompletedTX
flist = MMGenFileList( infiles, base_class=completed.Completed, proto=self.proto )
flist.sort_by_age( key=file_sort ) # in-place sort
from ..tx import completed, CompletedTX
flist = MMGenFileList(infiles, base_class=completed.Completed, proto=self.proto)
flist.sort_by_age(key=file_sort) # in-place sort
async def process_file(f):
return (await CompletedTX(
cfg = self.cfg,
filename = f.name,
quiet_open = True)).info.format( terse=terse, sort=tx_sort )
quiet_open = True)).info.format(terse=terse, sort=tx_sort)
return (''*77+'\n').join([await process_file(f) for f in flist]).rstrip()

View file

@ -24,7 +24,7 @@ import os
from .common import tool_cmd_base
from ..crypto import Crypto
from ..fileutil import get_data_from_file,write_data_to_file
from ..fileutil import get_data_from_file, write_data_to_file
class tool_cmd(tool_cmd_base):
"""
@ -35,20 +35,20 @@ class tool_cmd(tool_cmd_base):
* Enc: AES256_CTR, 16-byte rand IV, sha256 hash + 32-byte nonce + data
* The encrypted file is indistinguishable from random data
"""
def encrypt(self,infile:str,outfile='',hash_preset=''):
def encrypt(self, infile: str, outfile='', hash_preset=''):
"encrypt a file"
data = get_data_from_file( self.cfg, infile, 'data for encryption', binary=True )
enc_d = Crypto(self.cfg).mmgen_encrypt( data, 'data', hash_preset )
data = get_data_from_file(self.cfg, infile, 'data for encryption', binary=True)
enc_d = Crypto(self.cfg).mmgen_encrypt(data, 'data', hash_preset)
if not outfile:
outfile = f'{os.path.basename(infile)}.{Crypto.mmenc_ext}'
write_data_to_file( self.cfg, outfile, enc_d, 'encrypted data', binary=True )
write_data_to_file(self.cfg, outfile, enc_d, 'encrypted data', binary=True)
return True
def decrypt(self,infile:str,outfile='',hash_preset=''):
def decrypt(self, infile: str, outfile='', hash_preset=''):
"decrypt a file"
enc_d = get_data_from_file( self.cfg, infile, 'encrypted data', binary=True )
enc_d = get_data_from_file(self.cfg, infile, 'encrypted data', binary=True)
while True:
dec_d = Crypto(self.cfg).mmgen_decrypt( enc_d, 'data', hash_preset )
dec_d = Crypto(self.cfg).mmgen_decrypt(enc_d, 'data', hash_preset)
if dec_d:
break
from ..util import msg
@ -56,8 +56,8 @@ class tool_cmd(tool_cmd_base):
if not outfile:
from ..util import remove_extension
o = os.path.basename(infile)
outfile = remove_extension(o,Crypto.mmenc_ext)
outfile = remove_extension(o, Crypto.mmenc_ext)
if outfile == o:
outfile += '.dec'
write_data_to_file( self.cfg, outfile, dec_d, 'decrypted data', binary=True )
write_data_to_file(self.cfg, outfile, dec_d, 'decrypted data', binary=True)
return True

View file

@ -20,10 +20,10 @@
tool.fileutil: File routines for the 'mmgen-tool' utility
"""
import sys,os
import sys, os
from .common import tool_cmd_base
from ..util import msg,msg_r,die,suf,make_full_path
from ..util import msg, msg_r, die, suf, make_full_path
from ..crypto import Crypto
class tool_cmd(tool_cmd_base):
@ -37,15 +37,15 @@ class tool_cmd(tool_cmd_base):
from hashlib import sha256
ivsize,bsize,mod = ( Crypto.aesctr_iv_len, 4096, 4096*8 )
n,carry = 0,b' '*ivsize
ivsize, bsize, mod = (Crypto.aesctr_iv_len, 4096, 4096*8)
n, carry = 0, b' '*ivsize
flgs = os.O_RDONLY|os.O_BINARY if sys.platform == 'win32' else os.O_RDONLY
f = os.open(filename,flgs)
f = os.open(filename, flgs)
for ch in incog_id:
if ch not in '0123456789ABCDEF':
die(2,f'{incog_id!r}: invalid Incog ID')
die(2, f'{incog_id!r}: invalid Incog ID')
while True:
d = os.read(f,bsize)
d = os.read(f, bsize)
if not d:
break
d = carry + d
@ -65,7 +65,7 @@ class tool_cmd(tool_cmd_base):
os.close(f)
return True
def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False):
def rand2file(self, outfile: str, nbytes: str, threads=4, silent=False):
"""
write nbytes bytes of random data to specified file (dd-style byte specifiers supported)
@ -89,39 +89,39 @@ class tool_cmd(tool_cmd_base):
"""
from threading import Thread
from queue import Queue
from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from ..util2 import parse_bytespec
def encrypt_worker():
ctr_init_val = os.urandom( Crypto.aesctr_iv_len )
c = Cipher( algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend() )
ctr_init_val = os.urandom(Crypto.aesctr_iv_len)
c = Cipher(algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend())
encryptor = c.encryptor()
while True:
q2.put( encryptor.update(q1.get()) )
q2.put(encryptor.update(q1.get()))
q1.task_done()
def output_worker():
while True:
f.write( q2.get() )
f.write(q2.get())
q2.task_done()
nbytes = parse_bytespec(nbytes)
if self.cfg.outdir:
outfile = make_full_path( self.cfg.outdir, outfile )
outfile = make_full_path(self.cfg.outdir, outfile)
f = open(outfile,'wb')
f = open(outfile, 'wb')
key = Crypto(self.cfg).get_random(32)
q1,q2 = ( Queue(), Queue() )
q1, q2 = (Queue(), Queue())
for i in range(max(1,threads-2)):
for i in range(max(1, threads-2)):
t = Thread(target=encrypt_worker)
t.daemon = True
t.start()
t = Thread( target=output_worker )
t = Thread(target=output_worker)
t.daemon = True
t.start()
@ -129,10 +129,10 @@ class tool_cmd(tool_cmd_base):
for i in range(nbytes // blk_size):
if not i % 4:
msg_r(f'\rRead: {i * blk_size} bytes')
q1.put( os.urandom(blk_size) )
q1.put(os.urandom(blk_size))
if nbytes % blk_size:
q1.put( os.urandom(nbytes % blk_size) )
q1.put(os.urandom(nbytes % blk_size))
q1.join()
q2.join()
@ -140,7 +140,7 @@ class tool_cmd(tool_cmd_base):
fsize = os.stat(outfile).st_size
if fsize != nbytes:
die(3,f'{fsize}: incorrect random file size (should be {nbytes})')
die(3, f'{fsize}: incorrect random file size (should be {nbytes})')
if not silent:
msg(f'\rRead: {nbytes} bytes')
@ -148,7 +148,7 @@ class tool_cmd(tool_cmd_base):
return True
def decrypt_keystore(self, wallet_file:str, output_hex=False):
def decrypt_keystore(self, wallet_file: str, output_hex=False):
"decrypt the data in a keystore wallet, returning the decrypted data in binary format"
from ..ui import line_input
passwd = line_input(self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase).strip().encode()
@ -159,7 +159,7 @@ class tool_cmd(tool_cmd_base):
ret = decrypt_keystore(data[0]['keystore'], passwd)
return ret.hex() if output_hex else ret
def decrypt_geth_keystore(self, wallet_file:str, check_addr=True):
def decrypt_geth_keystore(self, wallet_file: str, check_addr=True):
"decrypt the private key in a Geth keystore wallet, returning the decrypted key in hex format"
from ..ui import line_input
passwd = line_input(self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase).strip().encode()

View file

@ -30,7 +30,7 @@ def main_help():
from ..util2 import pretty_format
def do():
for clsname,cmdlist in main_tool.mods.items():
for clsname, cmdlist in main_tool.mods.items():
cls = main_tool.get_mod_cls(clsname)
cls_docstr = cls.__doc__.strip()
yield capfirst(cls_docstr.split('\n')[0].strip()) + ':'
@ -41,10 +41,10 @@ def main_help():
yield ' ' + line.lstrip('\t')
yield ''
max_w = max(map(len,cmdlist))
max_w = max(map(len, cmdlist))
for cmdname in cmdlist:
code = getattr(cls,cmdname)
code = getattr(cls, cmdname)
if code.__doc__:
yield ' {:{}} - {}'.format(
cmdname,
@ -109,11 +109,11 @@ def gen_tool_usage():
for line in m1.lstrip().split('\n'):
yield line.lstrip('\t')
for clsname,cmdlist in main_tool.mods.items():
for clsname, cmdlist in main_tool.mods.items():
cls = main_tool.get_mod_cls(clsname)
cls_docstr = cls.__doc__.strip()
yield ''
yield ' {}:'.format( capfirst(cls_docstr.split('\n')[0].strip()) )
yield ' {}:'.format(capfirst(cls_docstr.split('\n')[0].strip()))
yield ''
if '\n' in cls_docstr:
@ -121,49 +121,49 @@ def gen_tool_usage():
yield ' ' + line.lstrip('\t')
yield ''
max_w = max(map(len,cmdlist))
max_w = max(map(len, cmdlist))
for cmdname in cmdlist:
yield ' {a:{w}} {b}'.format(
a = cmdname,
b = main_tool.create_call_sig(cmdname,cls,as_string=True),
w = max_w )
b = main_tool.create_call_sig(cmdname, cls, as_string=True),
w = max_w)
yield ''
for line in m2.rstrip().split('\n'):
yield line.lstrip('\t')
def gen_tool_cmd_usage(mod,cmdname):
def gen_tool_cmd_usage(mod, cmdname):
from ..cfg import gc
from ..util import capfirst
cls = main_tool.get_mod_cls(mod)
docstr = getattr(cls,cmdname).__doc__.strip()
args,kwargs,kwargs_types,_,ann = main_tool.create_call_sig(cmdname,cls)
docstr = getattr(cls, cmdname).__doc__.strip()
args, kwargs, kwargs_types, _, ann = main_tool.create_call_sig(cmdname, cls)
ARGS = 'ARG' if len(args) == 1 else 'ARGS' if args else ''
KWARGS = 'KEYWORD ARG' if len(kwargs) == 1 else 'KEYWORD ARGS' if kwargs else ''
yield capfirst( docstr.split('\n')[0].strip() )
yield capfirst(docstr.split('\n')[0].strip())
yield ''
yield 'USAGE: {b} [OPTS] {c}{d}{e}'.format(
b = gc.prog_name,
c = cmdname,
d = f' {ARGS}' if ARGS else '',
e = f' [{KWARGS}]' if KWARGS else '' )
e = f' [{KWARGS}]' if KWARGS else '')
if args:
max_w = max(len(k[0]) for k in args)
yield ''
yield f'Required {ARGS} (type shown in square brackets):'
yield ''
for argname,argtype in args:
for argname, argtype in args:
have_sstr = ann.get(argname) == 'sstr'
yield ' {a:{w}} [{b}]{c}{d}'.format(
a = argname,
b = argtype,
c = " (use '-' to read from STDIN)" if have_sstr else '',
d = ' ' + ann[argname] if isinstance(ann.get(argname),str) and not have_sstr else '',
w = max_w )
d = ' ' + ann[argname] if isinstance(ann.get(argname), str) and not have_sstr else '',
w = max_w)
if kwargs:
max_w = max(len(k) for k in kwargs)
@ -174,26 +174,26 @@ def gen_tool_cmd_usage(mod,cmdname):
for argname in kwargs:
yield ' {a:{w}} {b:{w2}} {c}'.format(
a = argname,
b = '[{}={}]'.format( kwargs_types[argname].__name__, repr(kwargs[argname]) ),
c = capfirst(ann[argname]) if isinstance(ann.get(argname),str) else '',
b = '[{}={}]'.format(kwargs_types[argname].__name__, repr(kwargs[argname])),
c = capfirst(ann[argname]) if isinstance(ann.get(argname), str) else '',
w = max_w,
w2 = max_w2 ).rstrip()
w2 = max_w2).rstrip()
if '\n' in docstr:
for line in docstr.split('\n')[1:]:
yield line.lstrip('\t')
def usage(cmdname=None,exit_val=1):
def usage(cmdname=None, exit_val=1):
from ..util import Msg,die
from ..util import Msg, die
if cmdname:
for mod,cmdlist in main_tool.mods.items():
for mod, cmdlist in main_tool.mods.items():
if cmdname in cmdlist:
Msg('\n'.join(gen_tool_cmd_usage(mod,cmdname)))
Msg('\n'.join(gen_tool_cmd_usage(mod, cmdname)))
break
else:
die(1,f'{cmdname!r}: no such tool command')
die(1, f'{cmdname!r}: no such tool command')
else:
from ..ui import do_pager
do_pager('\n'.join(gen_tool_usage()) + '\n')
@ -204,10 +204,10 @@ def usage(cmdname=None,exit_val=1):
class tool_cmd(tool_cmd_base):
"help/usage commands"
def help(self,command_name=''):
def help(self, command_name=''):
"display usage information for a single command or all commands"
usage(command_name,exit_val=0)
usage(command_name, exit_val=0)
def usage(self,command_name=''):
def usage(self, command_name=''):
"display usage information for a single command or all commands"
usage(command_name,exit_val=0)
usage(command_name, exit_val=0)

View file

@ -22,18 +22,18 @@ tool.mnemonic: Mnemonic routines for the 'mmgen-tool' utility
from collections import namedtuple
from .common import tool_cmd_base,options_annot_str
from .common import tool_cmd_base, options_annot_str
from ..baseconv import baseconv
from ..xmrseed import xmrseed
from ..bip39 import bip39
dfl_mnemonic_fmt = 'mmgen'
mft = namedtuple('mnemonic_format',['fmt','pad','conv_cls'])
mft = namedtuple('mnemonic_format', ['fmt', 'pad', 'conv_cls'])
mnemonic_fmts = {
'mmgen': mft( 'words', 'seed', baseconv ),
'bip39': mft( 'bip39', None, bip39 ),
'xmrseed': mft( 'xmrseed', None, xmrseed ),
'mmgen': mft('words', 'seed', baseconv),
'bip39': mft('bip39', None, bip39),
'xmrseed': mft('xmrseed', None, xmrseed),
}
mn_opts_disp = 'seed phrase format ' + options_annot_str(mnemonic_fmts)
@ -60,18 +60,18 @@ class tool_cmd(tool_cmd_base):
use the hex2wif command.
"""
def _xmr_reduce(self,bytestr):
def _xmr_reduce(self, bytestr):
from ..protocol import init_proto
proto = init_proto( self.cfg, 'xmr' )
proto = init_proto(self.cfg, 'xmr')
if len(bytestr) != proto.privkey_len:
from ..util import die
die(1,'{!r}: invalid bit length for Monero private key (must be {})'.format(
die(1, '{!r}: invalid bit length for Monero private key (must be {})'.format(
len(bytestr*8),
proto.privkey_len*8 ))
return proto.preprocess_key(bytestr,None)
proto.privkey_len*8))
return proto.preprocess_key(bytestr, None)
def _do_random_mn(self,nbytes:int,fmt:str):
assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32'
def _do_random_mn(self, nbytes: int, fmt: str):
assert nbytes in (16, 24, 32), 'nbytes must be 16, 24 or 32'
from ..crypto import Crypto
randbytes = Crypto(self.cfg).get_random(nbytes)
if fmt == 'xmrseed':
@ -79,54 +79,54 @@ class tool_cmd(tool_cmd_base):
if self.cfg.verbose:
from ..util import msg
msg(f'Seed: {randbytes.hex()}')
return self.hex2mn(randbytes.hex(),fmt=fmt)
return self.hex2mn(randbytes.hex(), fmt=fmt)
def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt):
"generate a random 128-bit mnemonic seed phrase"
return self._do_random_mn(16,fmt)
return self._do_random_mn(16, fmt)
def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt):
"generate a random 192-bit mnemonic seed phrase"
return self._do_random_mn(24,fmt)
return self._do_random_mn(24, fmt)
def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt):
"generate a random 256-bit mnemonic seed phrase"
return self._do_random_mn(32,fmt)
return self._do_random_mn(32, fmt)
def hex2mn( self, hexstr:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ):
def hex2mn(self, hexstr: 'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt):
"convert a 16, 24 or 32-byte hexadecimal string to a mnemonic seed phrase"
if fmt == 'xmrseed':
hexstr = self._xmr_reduce(bytes.fromhex(hexstr)).hex()
f = mnemonic_fmts[fmt]
return ' '.join( f.conv_cls(fmt).fromhex(hexstr,f.pad) )
return ' '.join(f.conv_cls(fmt).fromhex(hexstr, f.pad))
def mn2hex( self, seed_mnemonic:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ):
def mn2hex(self, seed_mnemonic: 'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt):
"convert a mnemonic seed phrase to a hexadecimal string"
f = mnemonic_fmts[fmt]
return f.conv_cls(fmt).tohex( seed_mnemonic.split(), f.pad )
return f.conv_cls(fmt).tohex(seed_mnemonic.split(), f.pad)
def mn2hex_interactive( self,
def mn2hex_interactive(self,
fmt: mn_opts_disp = dfl_mnemonic_fmt,
mn_len: 'length of seed phrase in words' = 24,
print_mn: 'print the seed phrase after entry' = False ):
print_mn: 'print the seed phrase after entry' = False):
"convert an interactively supplied mnemonic seed phrase to a hexadecimal string"
from ..mn_entry import mn_entry
mn = mn_entry( self.cfg, fmt ).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len,validate=False)
mn = mn_entry(self.cfg, fmt).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len, validate=False)
if print_mn:
from ..util import msg
msg(mn)
return self.mn2hex(seed_mnemonic=mn,fmt=fmt)
return self.mn2hex(seed_mnemonic=mn, fmt=fmt)
def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ):
def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt):
"show stats for a mnemonic wordlist"
return mnemonic_fmts[fmt].conv_cls(fmt).check_wordlist(self.cfg)
def mn_printlist(self,
fmt: mn_opts_disp = dfl_mnemonic_fmt,
enum: 'enumerate the list' = False,
pager: 'send output to pager' = False ):
pager: 'send output to pager' = False):
"print a mnemonic wordlist"
ret = mnemonic_fmts[fmt].conv_cls(fmt).get_wordlist()
if enum:
ret = [f'{n:>4} {e}' for n,e in enumerate(ret)]
ret = [f'{n:>4} {e}' for n, e in enumerate(ret)]
return '\n'.join(ret)

View file

@ -20,7 +20,7 @@
tool.rpc: JSON/RPC routines for the 'mmgen-tool' utility
"""
from .common import tool_cmd_base,options_annot_str
from .common import tool_cmd_base, options_annot_str
from ..tw.view import TwView
from ..tw.txhistory import TwTxHistory
@ -33,7 +33,7 @@ class tool_cmd(tool_cmd_base):
async def daemon_version(self):
"print coin daemon version"
from ..daemon import CoinDaemon
d = CoinDaemon( cfg=self.cfg, proto=self.proto, test_suite=self.cfg.test_suite )
d = CoinDaemon(cfg=self.cfg, proto=self.proto, test_suite=self.cfg.test_suite)
if self.proto.base_proto == 'Monero':
from ..proto.xmr.rpc import MoneroRPCClient
r = MoneroRPCClient(
@ -44,7 +44,7 @@ class tool_cmd(tool_cmd_base):
port = d.rpc_port,
user = None,
passwd = None,
ignore_daemon_version = True )
ignore_daemon_version = True)
else:
from ..rpc import rpc_init
r = await rpc_init(self.cfg, self.proto, ignore_daemon_version=True, ignore_wallet=True)
@ -53,22 +53,22 @@ class tool_cmd(tool_cmd_base):
async def getbalance(
        self,
        minconf: 'minimum number of confirmations' = 1,
        quiet: 'produce quieter output' = False,
        pager: 'send output to pager' = False):
    "list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet"
    from ..tw.bal import TwGetBalance
    # TwGetBalance is awaited to obtain the balance object, which is then formatted.
    balance = await TwGetBalance(self.cfg, self.proto, minconf, quiet)
    return balance.format(color=self.cfg.color)
async def twops(self,
obj,pager,reverse,detail,sort,age_fmt,interactive,
**kwargs ):
obj, pager, reverse, detail, sort, age_fmt, interactive,
**kwargs):
obj.reverse = reverse
obj.age_fmt = age_fmt
for k,v in kwargs.items():
setattr(obj,k,v)
for k, v in kwargs.items():
setattr(obj, k, v)
await obj.get_data(sort_key=sort,reverse_sort=reverse)
await obj.get_data(sort_key=sort, reverse_sort=reverse)
if interactive:
await obj.view_filter_and_sort()
@ -76,7 +76,7 @@ class tool_cmd(tool_cmd_base):
else:
ret = await obj.format('detail' if detail else 'squeezed')
if hasattr(obj,'twctl'):
if hasattr(obj, 'twctl'):
del obj.twctl
return ret
@ -89,14 +89,14 @@ class tool_cmd(tool_cmd_base):
sort: 'unspent output sort order ' + options_annot_str(TwView.sort_funcs) = 'age',
age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs',
interactive: 'enable interactive operation' = False,
show_mmid: 'show MMGen IDs along with coin addresses' = True ):
show_mmid: 'show MMGen IDs along with coin addresses' = True):
"view tracking wallet unspent outputs"
from ..tw.unspent import TwUnspentOutputs
obj = await TwUnspentOutputs(self.cfg,self.proto,minconf=minconf)
obj = await TwUnspentOutputs(self.cfg, self.proto, minconf=minconf)
return await self.twops(
obj,pager,reverse,wide,sort,age_fmt,interactive,
show_mmid = show_mmid )
obj, pager, reverse, wide, sort, age_fmt, interactive,
show_mmid = show_mmid)
async def txhist(self,
pager: 'send output to pager' = False,
@ -105,19 +105,19 @@ class tool_cmd(tool_cmd_base):
sinceblock: 'display transactions starting from this block' = 0,
sort: 'transaction sort order ' + options_annot_str(TwTxHistory.sort_funcs) = 'age',
age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs',
interactive: 'enable interactive operation' = False ):
interactive: 'enable interactive operation' = False):
"view transaction history of tracking wallet"
obj = await TwTxHistory(self.cfg,self.proto,sinceblock=sinceblock)
obj = await TwTxHistory(self.cfg, self.proto, sinceblock=sinceblock)
return await self.twops(
obj,pager,reverse,detail,sort,age_fmt,interactive )
obj, pager, reverse, detail, sort, age_fmt, interactive)
async def listaddress(self,
mmgen_addr:str,
mmgen_addr: str,
wide: 'display data in wide tabular format' = False,
minconf: 'minimum number of confirmations' = 1,
showcoinaddr: 'display coin address in addition to MMGen ID' = True,
age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs' ):
age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs'):
"list the specified MMGen address in the tracking wallet and its balance"
return await self.listaddresses(
@ -125,72 +125,72 @@ class tool_cmd(tool_cmd_base):
wide = wide,
minconf = minconf,
showcoinaddrs = showcoinaddr,
age_fmt = age_fmt )
age_fmt = age_fmt)
async def listaddresses(
        self,
        pager: 'send output to pager' = False,
        reverse: 'reverse order of unspent outputs' = False,
        wide: 'display data in wide tabular format' = False,
        minconf: 'minimum number of confirmations' = 1,
        sort: 'address sort order ' + options_annot_str(['reverse', 'mmid', 'addr', 'amt']) = '',
        age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs',
        interactive: 'enable interactive operation' = False,
        mmgen_addrs: 'hyphenated range or comma-separated list of addresses' = '',
        showcoinaddrs: 'display coin addresses in addition to MMGen IDs' = True,
        showempty: 'show addresses with no balances' = True,
        showused: 'show used addresses (tristate: 0=no, 1=yes, 2=all)' = 1,
        all_labels: 'show all addresses with labels' = False):
    "list MMGen addresses in the tracking wallet and their balances"
    # 'showused' is a tristate, so anything outside 0/1/2 is rejected up front.
    assert showused in (0, 1, 2), "‘showused’ must have a value of 0, 1 or 2"

    from ..tw.addresses import TwAddresses
    addrs_view = await TwAddresses(self.cfg, self.proto, minconf=minconf, mmgen_addrs=mmgen_addrs)

    # Display options specific to this view are forwarded to twops() as keyword arguments.
    view_opts = {
        'showcoinaddrs': showcoinaddrs,
        'showempty': showempty,
        'showused': showused,
        'all_labels': all_labels,
    }
    # 'wide' fills the positional slot that twops() names 'detail'.
    return await self.twops(addrs_view, pager, reverse, wide, sort, age_fmt, interactive, **view_opts)
async def add_label(self, mmgen_or_coin_addr: str, label: str):
    "add descriptive label for address in tracking wallet"
    from ..tw.ctl import TwCtl
    # The tracking wallet controller is opened with mode='w' before the comment is set.
    twctl = await TwCtl(self.cfg, self.proto, mode='w')
    return await twctl.set_comment(mmgen_or_coin_addr, label)
async def remove_label(self, mmgen_or_coin_addr: str):
    "remove descriptive label for address in tracking wallet"
    # Removal is implemented as setting the label to the empty string.
    empty_label = ''
    await self.add_label(mmgen_or_coin_addr, empty_label)
    # The result of add_label() is discarded; this command always reports True.
    return True
async def remove_address(self, mmgen_or_coin_addr: str):
    "remove an address from tracking wallet"
    from ..tw.ctl import TwCtl
    twctl = await TwCtl(self.cfg, self.proto, mode='w')
    # returns None on failure:
    removed = await twctl.remove_address(mmgen_or_coin_addr)
    if not removed:
        # Pass the falsy result through unchanged, without printing anything.
        return removed
    from ..util import msg
    msg(f'Address {removed!r} deleted from tracking wallet')
    return removed
async def resolve_address(self, mmgen_or_coin_addr: str):
    "resolve an MMGen address in the tracking wallet to a coin address or vice-versa"
    from ..tw.ctl import TwCtl
    twctl = await TwCtl(self.cfg, self.proto, mode='w')
    entry = await twctl.resolve_address(mmgen_or_coin_addr)
    if not entry:
        # A falsy lookup result is reported as False.
        return False
    from ..addr import is_coin_addr
    # Return the counterpart of whatever was supplied:
    # a coin address yields the MMGen ID, anything else yields the coin address.
    if is_coin_addr(self.proto, mmgen_or_coin_addr):
        return entry.twmmid
    return entry.coinaddr
async def rescan_address(self, mmgen_or_coin_addr: str):
    "rescan an address in the tracking wallet to update its balance"
    from ..tw.ctl import TwCtl
    twctl = await TwCtl(self.cfg, self.proto, mode='w')
    # The controller's result is returned to the caller unchanged.
    return await twctl.rescan_address(mmgen_or_coin_addr)
async def rescan_blockchain(self,
start_block: int = None,
stop_block: int = None ):
stop_block: int = None):
"""
rescan the blockchain to update historical transactions in the tracking wallet
@ -201,10 +201,10 @@ class tool_cmd(tool_cmd_base):
parameter.
"""
from ..tw.ctl import TwCtl
await (await TwCtl(self.cfg,self.proto,mode='w')).rescan_blockchain(start_block,stop_block)
await (await TwCtl(self.cfg, self.proto, mode='w')).rescan_blockchain(start_block, stop_block)
return True
async def twexport(self,include_amts=True,pretty=False,prune=False,warn_used=False,force=False):
async def twexport(self, include_amts=True, pretty=False, prune=False, warn_used=False, force=False):
"""
export a tracking wallet to JSON format
@ -234,10 +234,10 @@ class tool_cmd(tool_cmd_base):
pretty = pretty,
prune = prune,
warn_used = warn_used,
force_overwrite = force )
force_overwrite = force)
return True
async def twimport(self,filename:str,ignore_checksum=False,batch=False):
async def twimport(self, filename: str, ignore_checksum=False, batch=False):
"""
restore a tracking wallet from a JSON dump created by twexport
@ -251,5 +251,5 @@ class tool_cmd(tool_cmd_base):
rescan_blockchain.
"""
from ..tw.json import TwJSON
await TwJSON.Import( self.cfg, self.proto, filename, ignore_checksum=ignore_checksum, batch=batch )
await TwJSON.Import(self.cfg, self.proto, filename, ignore_checksum=ignore_checksum, batch=batch)
return True

View file

@ -20,7 +20,7 @@
tool.util: Utility commands for the 'mmgen-tool' utility
"""
import sys,os
import sys, os
from .common import tool_cmd_base
@ -28,7 +28,7 @@ class tool_cmd(tool_cmd_base):
"general string conversion and hashing utilities"
# mmgen.util2.bytespec_map
def bytespec(self,dd_style_byte_specifier:str):
def bytespec(self, dd_style_byte_specifier: str):
"""
convert a byte specifier such as 4GB into an integer
@ -60,7 +60,7 @@ class tool_cmd(tool_cmd_base):
fmt: 'width and precision of output' = '0.2',
print_sym: 'print the specifier after the numerical value' = True,
strip: 'strip trailing zeroes' = False,
add_space: 'with print_sym, add space between value and specifier' = False ):
add_space: 'with print_sym, add space between value and specifier' = False):
"""
convert an integer to a byte specifier such as 4GB
@ -89,25 +89,25 @@ class tool_cmd(tool_cmd_base):
fmt,
print_sym = print_sym,
strip = strip,
add_space = add_space )
add_space = add_space)
def randhex(
        self,
        nbytes: 'number of bytes to output' = 32):
    "print 'n' bytes (default 32) of random data in hex format"
    from ..crypto import Crypto
    # Random bytes come from the project's Crypto helper, then are hex-encoded.
    rand_bytes = Crypto(self.cfg).get_random(nbytes)
    return rand_bytes.hex()
def hexreverse(self, hexstr: 'sstr'):
    "reverse bytes of a hexadecimal string"
    # Surrounding whitespace is stripped before decoding.
    raw = bytes.fromhex(hexstr.strip())
    # Reversal is byte-wise (pairs of hex digits), not character-wise.
    return bytes(reversed(raw)).hex()
def hexlify(self, infile: str):
    "convert bytes in file to hexadecimal (use '-' for stdin)"
    from ..fileutil import get_data_from_file
    # The file is read in binary mode, so .hex() can be applied to the result directly.
    return get_data_from_file(
        self.cfg,
        infile,
        dash = True,
        quiet = True,
        binary = True).hex()
def unhexlify(self, hexstr: 'sstr'):
    "convert a hexadecimal string to bytes (warning: outputs binary data)"
    # bytes.fromhex() raises ValueError on malformed input; that propagates to the caller.
    decoded = bytes.fromhex(hexstr)
    return decoded
@ -118,34 +118,34 @@ class tool_cmd(tool_cmd_base):
"create hexdump of data from file (use '-' for stdin)"
from ..fileutil import get_data_from_file
from ..util2 import pretty_hexdump
data = get_data_from_file( self.cfg, infile, dash=True, quiet=True, binary=True )
return pretty_hexdump( data, cols=cols, line_nums=line_nums ).rstrip()
data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)
return pretty_hexdump(data, cols=cols, line_nums=line_nums).rstrip()
def unhexdump(self, infile: str):
    "decode hexdump from file (use '-' for stdin) (warning: outputs binary data)"
    on_windows = sys.platform == 'win32'
    if on_windows:
        # On Windows, stdout is switched to binary mode before anything is read.
        import msvcrt
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
    from ..fileutil import get_data_from_file
    from ..util2 import decode_pretty_hexdump
    # The dump is read without binary=True, unlike the other file-reading commands here.
    dump_text = get_data_from_file(self.cfg, infile, dash=True, quiet=True)
    return decode_pretty_hexdump(dump_text)
def hash160(self, hexstr: 'sstr'):
    "compute ripemd160(sha256(data)) (convert hex pubkey to hex addr)"
    # The local import deliberately shares this method's name; it is only visible in this scope.
    from ..proto.btc.common import hash160
    decoded = bytes.fromhex(hexstr)
    return hash160(decoded).hex()
# TODO: handle stdin
def hash256(
        self,
        data: str,
        file_input: 'first arg is the name of a file containing the data' = False,
        hex_input: 'first arg is a hexadecimal string' = False):
    "compute sha256(sha256(data)) (double sha256)"
    from hashlib import sha256
    if file_input:
        # 'data' is a filename; its contents are read in binary mode.
        from ..fileutil import get_data_from_file
        b = get_data_from_file(self.cfg, data, binary=True)
    elif hex_input:
        # 'data' is hex text; it is decoded to its binary form.
        from ..util2 import decode_pretty_hexdump
        b = decode_pretty_hexdump(data)
    else:
        # NOTE(review): the 'else:' line is elided in this view and is inferred
        # from the if/elif structure -- confirm against the full file.
        b = data
    # Only text needs encoding.  The file and hex branches are expected to yield
    # bytes already, and bytes has no .encode(), so encoding unconditionally
    # raised AttributeError on those paths.
    if isinstance(b, str):
        b = b.encode()
    return sha256(sha256(b).digest()).hexdigest()
def id6(self, infile: str):
    "generate 6-character MMGen ID for a file (use '-' for stdin)"
    from ..util import make_chksum_6
    from ..fileutil import get_data_from_file
    # The checksum is computed over the file's contents, read in binary mode.
    file_data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)
    return make_chksum_6(file_data)
def str2id6(self, string: 'sstr'): # retain ignoring of space for backwards compat
    "generate 6-character MMGen ID for a string, ignoring spaces in string"
    from ..util import make_chksum_6
    # split() with no argument drops every run of whitespace, not only spaces.
    compact = ''.join(string.split())
    return make_chksum_6(compact)
def id8(self, infile: str):
    "generate 8-character MMGen ID for a file (use '-' for stdin)"
    from ..util import make_chksum_8
    from ..fileutil import get_data_from_file
    # The checksum is computed over the file's contents, read in binary mode.
    file_data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)
    return make_chksum_8(file_data)
def randb58(
        self,
        nbytes: 'number of bytes to output' = 32,
        pad: 'pad output to this width' = 0):
    "generate random data (default: 32 bytes) and convert it to base 58"
    from ..baseconv import baseconv
    from ..crypto import Crypto
    b58 = baseconv('b58')
    rand_bytes = Crypto(self.cfg).get_random(nbytes)
    # tostr=True requests the result as a string.
    return b58.frombytes(rand_bytes, pad=pad, tostr=True)
def bytestob58(self, infile: str, pad: 'pad output to this width' = 0):
    "convert bytes to base 58 (supply data via STDIN)"
    from ..fileutil import get_data_from_file
    from ..baseconv import baseconv
    # The input is read in binary mode before the converter is created.
    raw = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)
    b58 = baseconv('b58')
    return b58.frombytes(raw, pad=pad, tostr=True)
def b58tobytes(self, b58_str: 'sstr', pad: 'pad output to this width' = 0):
    "convert a base 58 string to bytes (warning: outputs binary data)"
    from ..baseconv import baseconv
    b58 = baseconv('b58')
    # 'pad' is handed to the converter unchanged, as a keyword argument.
    return b58.tobytes(b58_str, pad=pad)
def hextob58(self, hexstr: 'sstr', pad: 'pad output to this width' = 0):
    "convert a hexadecimal string to base 58"
    from ..baseconv import baseconv
    b58 = baseconv('b58')
    # tostr=True requests the result as a string.
    return b58.fromhex(hexstr, pad=pad, tostr=True)
def b58tohex(self, b58_str: 'sstr', pad: 'pad output to this width' = 0):
    "convert a base 58 string to hexadecimal"
    from ..baseconv import baseconv
    b58 = baseconv('b58')
    # 'pad' is handed to the converter unchanged, as a keyword argument.
    return b58.tohex(b58_str, pad=pad)
def hextob58chk(self, hexstr: 'sstr'):
    "convert a hexadecimal string to base58-check encoding"
    from ..proto.btc.common import b58chk_encode
    # The hex text is decoded to bytes before being handed to the encoder.
    payload = bytes.fromhex(hexstr)
    return b58chk_encode(payload)
def b58chktohex(self, b58chk_str: 'sstr'):
    "convert a base58-check encoded string to hexadecimal"
    from ..proto.btc.common import b58chk_decode
    # The decoder's result is hex-encoded for output.
    decoded = b58chk_decode(b58chk_str)
    return decoded.hex()
def hextob32(self, hexstr: 'sstr', pad: 'pad output to this width' = 0):
    "convert a hexadecimal string to an MMGen-flavor base 32 string"
    from ..baseconv import baseconv
    b32 = baseconv('b32')
    # 'pad' is passed positionally here, as in the original call.
    return b32.fromhex(hexstr, pad, tostr=True)
def b32tohex(self, b32_str: 'sstr', pad: 'pad output to this width' = 0):
    "convert an MMGen-flavor base 32 string to hexadecimal"
    from ..baseconv import baseconv
    b32 = baseconv('b32')
    # The input is upper-cased before conversion, so lower-case input is accepted too.
    normalized = b32_str.upper()
    return b32.tohex(normalized, pad)
def hextob6d(
        self,
        hexstr: 'sstr',
        pad: 'pad output to this width' = 0,
        add_spaces: 'add a space after every 5th character' = True):
    "convert a hexadecimal string to die roll base6 (base6d)"
    from ..baseconv import baseconv
    from ..util2 import block_format
    b6d_str = baseconv('b6d').fromhex(hexstr, pad, tostr=True)
    if not add_spaces:
        return b6d_str
    # Group the digits in fives (gw=5) and trim surrounding whitespace from the formatted text.
    return block_format(b6d_str, gw=5, cols=None).strip()
def b6dtohex(self, b6d_str: 'sstr', pad: 'pad output to this width' = 0):
    "convert a die roll base6 (base6d) string to hexadecimal"
    from ..baseconv import baseconv
    from ..util import remove_whitespace
    b6d = baseconv('b6d')
    # Whitespace is removed from the input before conversion.
    digits = remove_whitespace(b6d_str)
    return b6d.tohex(digits, pad)

View file

@ -29,64 +29,64 @@ from ..wallet import Wallet
class tool_cmd(tool_cmd_base):
"key, address or subseed generation from an MMGen wallet"
def __init__(self, cfg, cmdname=None, proto=None, mmtype=None):
    # Only the key/address generation commands set need_proto to True.
    # It is assigned before the base-class initializer runs.
    proto_cmds = ('gen_key', 'gen_addr')
    self.need_proto = cmdname in proto_cmds
    super().__init__(cfg, cmdname=cmdname, proto=proto, mmtype=mmtype)
def _get_seed_file(self, wallet):
    # An empty/falsy 'wallet' becomes an empty list, otherwise a one-element list.
    from ..fileutil import get_seed_file
    wallets = [wallet] if wallet else []
    return get_seed_file(cfg=self.cfg, wallets=wallets, nargs=1)
def get_subseed(self, subseed_idx: str, wallet=''):
    "get the Seed ID of a single subseed by Subseed Index for default or specified wallet"
    # Quiet mode is switched on before the wallet is opened.
    self.cfg._set_quiet(True)
    seed = Wallet(self.cfg, self._get_seed_file(wallet)).seed
    return seed.subseed(subseed_idx).sid
def get_subseed_by_seed_id(self, seed_id: str, wallet='', last_idx=SubSeedList.dfl_len):
    "get the Subseed Index of a single subseed by Seed ID for default or specified wallet"
    # Quiet mode is switched on before the wallet is opened.
    self.cfg._set_quiet(True)
    seed = Wallet(self.cfg, self._get_seed_file(wallet)).seed
    match = seed.subseed_by_seed_id(seed_id, last_idx)
    # A falsy lookup result is reported as None.
    if match:
        return match.ss_idx
    return None
def list_subseeds(self, subseed_idx_range: str, wallet=''):
    "list a range of subseed Seed IDs for default or specified wallet"
    self.cfg._set_quiet(True)
    from ..subseed import SubSeedIdxRange
    # The wallet is opened first; the range is parsed only when format() is called.
    seed = Wallet(self.cfg, self._get_seed_file(wallet)).seed
    # The parsed range is unpacked into positional arguments for format().
    return seed.subseeds.format(*SubSeedIdxRange(subseed_idx_range))
def list_shares(
        self,
        share_count: int,
        id_str = 'default',
        master_share: f'(min:1, max:{MasterShareIdx.max_val}, 0=no master share)' = 0,
        wallet = ''):
    "list the Seed IDs of the shares resulting from a split of default or specified wallet"
    # Quiet mode is switched on before the wallet is opened.
    self.cfg._set_quiet(True)
    seed = Wallet(self.cfg, self._get_seed_file(wallet)).seed
    shares = seed.split(share_count, id_str, master_share)
    return shares.format()
def gen_key(self, mmgen_addr: str, wallet=''):
    "generate a single WIF key for specified MMGen address from default or specified wallet"
    # Thin wrapper: select the 'wif' field of the generated key/address entry.
    target = 'wif'
    return self._gen_keyaddr(mmgen_addr, target, wallet)
def gen_addr(self, mmgen_addr: str, wallet=''):
    "generate a single MMGen address from default or specified wallet"
    # Thin wrapper: select the 'addr' field of the generated key/address entry.
    target = 'addr'
    return self._gen_keyaddr(mmgen_addr, target, wallet)
def _gen_keyaddr(self,mmgen_addr,target,wallet=''):
def _gen_keyaddr(self, mmgen_addr, target, wallet=''):
from ..addr import MMGenID
from ..addrlist import AddrList,AddrIdxList
from ..addrlist import AddrList, AddrIdxList
addr = MMGenID( self.proto, mmgen_addr )
addr = MMGenID(self.proto, mmgen_addr)
self.cfg._set_quiet(True)
ss = Wallet(self.cfg,self._get_seed_file(wallet))
ss = Wallet(self.cfg, self._get_seed_file(wallet))
if ss.seed.sid != addr.sid:
from ..util import die
die(1,f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})')
die(1, f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})')
d = AddrList(
cfg = self.cfg,
@ -94,6 +94,6 @@ class tool_cmd(tool_cmd_base):
seed = ss.seed,
addr_idxs = AddrIdxList(str(addr.idx)),
mmtype = addr.mmtype,
skip_chksum = True ).data[0]
skip_chksum = True).data[0]
return { 'wif': d.sec.wif, 'addr': d.addr }[target]
return {'wif': d.sec.wif, 'addr': d.addr}[target]