From f77a6115e4880fc015fd9473a90b6d53fc40a4d0 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Fri, 18 Oct 2024 10:32:10 +0000 Subject: [PATCH] whitespace: tool --- mmgen/tool/__init__.py | 4 +- mmgen/tool/api.py | 34 +++++++------- mmgen/tool/coin.py | 84 +++++++++++++++++------------------ mmgen/tool/common.py | 10 ++--- mmgen/tool/file.py | 50 ++++++++++----------- mmgen/tool/filecrypt.py | 20 ++++----- mmgen/tool/fileutil.py | 46 +++++++++---------- mmgen/tool/help.py | 56 +++++++++++------------ mmgen/tool/mnemonic.py | 60 ++++++++++++------------- mmgen/tool/rpc.py | 88 ++++++++++++++++++------------------ mmgen/tool/util.py | 98 ++++++++++++++++++++--------------------- mmgen/tool/wallet.py | 52 +++++++++++----------- 12 files changed, 301 insertions(+), 301 deletions(-) diff --git a/mmgen/tool/__init__.py b/mmgen/tool/__init__.py index 0f41cc31..31c8228b 100755 --- a/mmgen/tool/__init__.py +++ b/mmgen/tool/__init__.py @@ -1,4 +1,4 @@ # provide this for backwards compatibility: -def tool_api(*args,**kwargs): +def tool_api(*args, **kwargs): from .api import tool_api - return tool_api(*args,**kwargs) + return tool_api(*args, **kwargs) diff --git a/mmgen/tool/api.py b/mmgen/tool/api.py index 1eae357e..523f2fad 100755 --- a/mmgen/tool/api.py +++ b/mmgen/tool/api.py @@ -29,7 +29,7 @@ class tool_api( util_cmds, coin_cmds, mnemonic_cmds, - tool_cmd_base ): + tool_cmd_base): """ API providing access to a subset of methods from the mmgen.tool module @@ -41,7 +41,7 @@ class tool_api( tool = tool_api(Config()) # Set the coin and network: - tool.init_coin('btc','mainnet') + tool.init_coin('btc', 'mainnet') # Print available address types: tool.print_addrtypes() @@ -53,38 +53,38 @@ class tool_api( tool.usr_randchars = 0 # Generate a random BTC segwit keypair: - wif,addr = tool.randpair() + wif, addr = tool.randpair() # Set coin, network and address type: - tool.init_coin('ltc','testnet') + tool.init_coin('ltc', 'testnet') tool.addrtype = 'bech32' # Generate a random LTC testnet Bech32 keypair: - wif,addr = tool.randpair() + wif, addr = tool.randpair() - print('wif: ',wif) - print('addr:',addr) + print('wif: ', wif) + print('addr:', addr) """ need_proto = True need_addrtype = True - def __init__(self,cfg): + def __init__(self, cfg): """ Initializer - takes no arguments """ type(cfg)._reset_ok += ('usr_randchars',) super().__init__(cfg=cfg) - def init_coin(self,coinsym,network): + def init_coin(self, coinsym, network): """ Initialize a coin/network pair Valid choices for coins: one of the symbols returned by the 'coins' attribute - Valid choices for network: 'mainnet','testnet','regtest' + Valid choices for network: 'mainnet', 'testnet', 'regtest' """ - from ..protocol import init_proto,warn_trustlevel + from ..protocol import init_proto, warn_trustlevel warn_trustlevel(self.cfg) - self.proto = init_proto( self.cfg, coinsym, network=network, need_amt=True ) + self.proto = init_proto(self.cfg, coinsym, network=network, need_amt=True) return self.proto @property @@ -114,7 +114,7 @@ class tool_api( first-listed is the default """ from ..addr import MMGenAddrType - return [MMGenAddrType(proto=self.proto,id_str=id_str).name for id_str in self.proto.mmtypes] + return [MMGenAddrType(proto=self.proto, id_str=id_str).name for id_str in self.proto.mmtypes] def print_addrtypes(self): """ @@ -122,7 +122,7 @@ class tool_api( a description. The first-listed is the default """ from ..addr import MMGenAddrType - for t in [MMGenAddrType(proto=self.proto,id_str=id_str) for id_str in self.proto.mmtypes]: + for t in [MMGenAddrType(proto=self.proto, id_str=id_str) for id_str in self.proto.mmtypes]: print(f'{t.name:<12} - {t.desc}') @property @@ -131,9 +131,9 @@ class tool_api( return self.mmtype @addrtype.setter - def addrtype(self,val): + def addrtype(self, val): from ..addr import MMGenAddrType - self.mmtype = MMGenAddrType(self.proto,val) + self.mmtype = MMGenAddrType(self.proto, val) @property def usr_randchars(self): @@ -144,5 +144,5 @@ class tool_api( return self.cfg.usr_randchars @usr_randchars.setter - def usr_randchars(self,val): + def usr_randchars(self, val): self.cfg.usr_randchars = val diff --git a/mmgen/tool/coin.py b/mmgen/tool/coin.py index 8ea35e51..4d30228a 100755 --- a/mmgen/tool/coin.py +++ b/mmgen/tool/coin.py @@ -21,13 +21,13 @@ tool.coin: Cryptocoin routines for the 'mmgen-tool' utility """ from collections import namedtuple -generator_data = namedtuple('generator_data',['kg','ag']) +generator_data = namedtuple('generator_data', ['kg', 'ag']) from .common import tool_cmd_base from ..key import PrivKey -from ..addr import CoinAddr,MMGenAddrType -from ..addrgen import KeyGenerator,AddrGenerator +from ..addr import CoinAddr, MMGenAddrType +from ..addrgen import KeyGenerator, AddrGenerator class tool_cmd(tool_cmd_base): """ @@ -45,8 +45,8 @@ class tool_cmd(tool_cmd_base): def _init_generators(self): return generator_data( - kg = KeyGenerator( self.cfg, self.proto, self.mmtype.pubkey_type ), - ag = AddrGenerator( self.cfg, self.proto, self.mmtype ), + kg = KeyGenerator(self.cfg, self.proto, self.mmtype.pubkey_type), + ag = AddrGenerator(self.cfg, self.proto, self.mmtype), ) def randwif(self): @@ -56,7 +56,7 @@ class tool_cmd(tool_cmd_base): self.proto, Crypto(self.cfg).get_random(32), pubkey_type = self.mmtype.pubkey_type, - compressed = self.mmtype.compressed ).wif + compressed = self.mmtype.compressed).wif def randpair(self): "generate a random private key/address pair" @@ -66,77 +66,77 @@ class tool_cmd(tool_cmd_base): self.proto, Crypto(self.cfg).get_random(32), pubkey_type = self.mmtype.pubkey_type, - compressed = self.mmtype.compressed ) + compressed = self.mmtype.compressed) return ( privkey.wif, - gd.ag.to_addr( gd.kg.gen_data(privkey) )) + gd.ag.to_addr(gd.kg.gen_data(privkey))) - def wif2hex(self,wifkey:'sstr'): + def wif2hex(self, wifkey: 'sstr'): "convert a private key from WIF to hexadecimal format" return PrivKey( self.proto, - wif = wifkey ).hex() + wif = wifkey).hex() - def hex2wif(self,privhex:'sstr'): + def hex2wif(self, privhex: 'sstr'): "convert a private key from hexadecimal to WIF format" return PrivKey( self.proto, bytes.fromhex(privhex), pubkey_type = self.mmtype.pubkey_type, - compressed = self.mmtype.compressed ).wif + compressed = self.mmtype.compressed).wif - def wif2addr(self,wifkey:'sstr'): + def wif2addr(self, wifkey: 'sstr'): "generate a coin address from a key in WIF format" gd = self._init_generators() privkey = PrivKey( self.proto, - wif = wifkey ) - return gd.ag.to_addr( gd.kg.gen_data(privkey) ) + wif = wifkey) + return gd.ag.to_addr(gd.kg.gen_data(privkey)) - def wif2redeem_script(self,wifkey:'sstr'): # new + def wif2redeem_script(self, wifkey: 'sstr'): # new "convert a WIF private key to a Segwit P2SH-P2WPKH redeem script" - assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' + assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit' gd = self._init_generators() privkey = PrivKey( self.proto, - wif = wifkey ) - return gd.ag.to_segwit_redeem_script( gd.kg.gen_data(privkey) ) + wif = wifkey) + return gd.ag.to_segwit_redeem_script(gd.kg.gen_data(privkey)) - def wif2segwit_pair(self,wifkey:'sstr'): + def wif2segwit_pair(self, wifkey: 'sstr'): "generate a Segwit P2SH-P2WPKH redeem script and address from a WIF private key" - assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' + assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit' gd = self._init_generators() data = gd.kg.gen_data(PrivKey( self.proto, - wif = wifkey )) + wif = wifkey)) return ( gd.ag.to_segwit_redeem_script(data), - gd.ag.to_addr(data) ) + gd.ag.to_addr(data)) - def _privhex2out(self,privhex:'sstr',output_pubhex=False): + def _privhex2out(self, privhex: 'sstr', output_pubhex=False): gd = self._init_generators() pk = PrivKey( self.proto, bytes.fromhex(privhex), compressed = self.mmtype.compressed, - pubkey_type = self.mmtype.pubkey_type ) + pubkey_type = self.mmtype.pubkey_type) data = gd.kg.gen_data(pk) return data.pubkey.hex() if output_pubhex else gd.ag.to_addr(data) - def privhex2addr(self,privhex:'sstr'): + def privhex2addr(self, privhex: 'sstr'): "generate a coin address from raw hexadecimal private key data" return self._privhex2out(privhex) - def privhex2pubhex(self,privhex:'sstr'): # new + def privhex2pubhex(self, privhex: 'sstr'): # new "generate a hexadecimal public key from raw hexadecimal private key data" - return self._privhex2out(privhex,output_pubhex=True) + return self._privhex2out(privhex, output_pubhex=True) - def pubhex2addr(self,pubkeyhex:'sstr'): + def pubhex2addr(self, pubkeyhex: 'sstr'): "convert a hexadecimal pubkey to an address" if self.proto.base_proto == 'Ethereum' and len(pubkeyhex) == 128: # support raw ETH pubkeys pubkeyhex = '04' + pubkeyhex from ..keygen import keygen_public_data - ag = AddrGenerator( self.cfg, self.proto, self.mmtype ) + ag = AddrGenerator(self.cfg, self.proto, self.mmtype) return ag.to_addr(keygen_public_data( pubkey = bytes.fromhex(pubkeyhex), viewkey_bytes = None, @@ -144,13 +144,13 @@ class tool_cmd(tool_cmd_base): compressed = self.mmtype.compressed, )) - def pubhex2redeem_script(self,pubkeyhex:'sstr'): # new + def pubhex2redeem_script(self, pubkeyhex: 'sstr'): # new "convert a hexadecimal pubkey to a Segwit P2SH-P2WPKH redeem script" - assert self.mmtype.name == 'segwit','This command is meaningful only for --type=segwit' + assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit' from ..proto.btc.common import hash160 return self.proto.pubhash2redeem_script(hash160(bytes.fromhex(pubkeyhex))).hex() - def redeem_script2addr(self,redeem_script_hex:'sstr'): # new + def redeem_script2addr(self, redeem_script_hex: 'sstr'): # new "convert a Segwit P2SH-P2WPKH redeem script to an address" assert self.mmtype.name == 'segwit', 'This command is meaningful only for --type=segwit' assert redeem_script_hex[:4] == '0014', f'{redeem_script_hex!r}: invalid redeem script' @@ -158,7 +158,7 @@ class tool_cmd(tool_cmd_base): from ..proto.btc.common import hash160 return self.proto.pubhash2addr(hash160(bytes.fromhex(redeem_script_hex)), 'p2sh') - def pubhash2addr(self,pubhashhex:'sstr'): + def pubhash2addr(self, pubhashhex: 'sstr'): "convert public key hash to address" pubhash = bytes.fromhex(pubhashhex) if self.mmtype.name == 'segwit': @@ -168,26 +168,26 @@ class tool_cmd(tool_cmd_base): else: return self.proto.pubhash2addr(pubhash, self.mmtype.addr_fmt) - def addr2pubhash(self,addr:'sstr'): + def addr2pubhash(self, addr: 'sstr'): "convert coin address to public key hash" ap = self.proto.decode_addr(addr) assert ap, f'coin address {addr!r} could not be parsed' if ap.fmt not in MMGenAddrType.pkh_fmts: from ..util import die - die(2,f'{ap.fmt} addresses cannot be converted to pubhash') + die(2, f'{ap.fmt} addresses cannot be converted to pubhash') return ap.bytes.hex() - def addr2scriptpubkey(self,addr:'sstr'): + def addr2scriptpubkey(self, addr: 'sstr'): "convert coin address to scriptPubKey" from ..proto.btc.tx.base import addr2scriptPubKey - return addr2scriptPubKey( self.proto, CoinAddr(self.proto,addr) ) + return addr2scriptPubKey(self.proto, CoinAddr(self.proto, addr)) - def scriptpubkey2addr(self,hexstr:'sstr'): + def scriptpubkey2addr(self, hexstr: 'sstr'): "convert scriptPubKey to coin address" from ..proto.btc.tx.base import scriptPubKey2addr - return scriptPubKey2addr( self.proto, hexstr )[0] + return scriptPubKey2addr(self.proto, hexstr)[0] - def eth_checksummed_addr(self,addr:'sstr'): + def eth_checksummed_addr(self, addr: 'sstr'): "create a checksummed Ethereum address" from ..protocol import init_proto - return init_proto( self.cfg, 'eth' ).checksummed_addr(addr) + return init_proto(self.cfg, 'eth').checksummed_addr(addr) diff --git a/mmgen/tool/common.py b/mmgen/tool/common.py index 1a7e0f4c..9ac5b38c 100755 --- a/mmgen/tool/common.py +++ b/mmgen/tool/common.py @@ -23,7 +23,7 @@ tool.common: Base class and shared routines for the 'mmgen-tool' utility from ..objmethods import MMGenObject def options_annot_str(l): - return "(valid choices: '{}')".format( "','".join(l) ) + return "(valid choices: '{}')".format("','".join(l)) class tool_cmd_base(MMGenObject): @@ -31,13 +31,13 @@ class tool_cmd_base(MMGenObject): need_addrtype = False need_amt = False - def __init__(self,cfg,cmdname=None,proto=None,mmtype=None): + def __init__(self, cfg, cmdname=None, proto=None, mmtype=None): self.cfg = cfg if self.need_proto: from ..protocol import init_proto_from_cfg - self.proto = proto or cfg._proto or init_proto_from_cfg(cfg,need_amt=True) + self.proto = proto or cfg._proto or init_proto_from_cfg(cfg, need_amt=True) if cfg.token: self.proto.tokensym = cfg.token.upper() @@ -45,8 +45,8 @@ class tool_cmd_base(MMGenObject): from ..addr import MMGenAddrType self.mmtype = MMGenAddrType( self.proto, - mmtype or cfg.type or self.proto.dfl_mmtype ) + mmtype or cfg.type or self.proto.dfl_mmtype) @property def user_commands(self): - return {k:v for k,v in type(self).__dict__.items() if callable(v) and not k.startswith('_')} + return {k:v for k, v in type(self).__dict__.items() if callable(v) and not k.startswith('_')} diff --git a/mmgen/tool/file.py b/mmgen/tool/file.py index 4fd983f0..e08aa8ce 100755 --- a/mmgen/tool/file.py +++ b/mmgen/tool/file.py @@ -20,55 +20,55 @@ tool.file: Address and transaction file routines for the 'mmgen-tool' utility """ -from .common import tool_cmd_base,options_annot_str +from .common import tool_cmd_base, options_annot_str class tool_cmd(tool_cmd_base): "utilities for viewing/checking MMGen address and transaction files" need_proto = True - def __init__(self,cfg,cmdname=None,proto=None,mmtype=None): + def __init__(self, cfg, cmdname=None, proto=None, mmtype=None): if cmdname == 'txview': self.need_amt = True - super().__init__(cfg=cfg,cmdname=cmdname,proto=proto,mmtype=mmtype) + super().__init__(cfg=cfg, cmdname=cmdname, proto=proto, mmtype=mmtype) - def _file_chksum(self,mmgen_addrfile,obj): + def _file_chksum(self, mmgen_addrfile, obj): kwargs = {'skip_chksum_msg':True} if not obj.__name__ == 'PasswordList': kwargs.update({'key_address_validity_check':False}) - ret = obj( self.cfg, self.proto, mmgen_addrfile, **kwargs ) + ret = obj(self.cfg, self.proto, mmgen_addrfile, **kwargs) if self.cfg.verbose: - from ..util import msg,capfirst + from ..util import msg, capfirst if ret.al_id.mmtype.name == 'password': msg('Passwd fmt: {}\nPasswd len: {}\nID string: {}'.format( capfirst(ret.pw_info[ret.pw_fmt].desc), ret.pw_len, - ret.pw_id_str )) + ret.pw_id_str)) else: msg(f'Base coin: {ret.base_coin} {capfirst(ret.network)}') msg(f'MMType: {capfirst(ret.al_id.mmtype.name)}') - msg( f'List length: {len(ret.data)}') + msg(f'List length: {len(ret.data)}') return ret.chksum - def addrfile_chksum(self,mmgen_addrfile:str): + def addrfile_chksum(self, mmgen_addrfile: str): "compute checksum for MMGen address file" from ..addrlist import AddrList - return self._file_chksum(mmgen_addrfile,AddrList) + return self._file_chksum(mmgen_addrfile, AddrList) - def keyaddrfile_chksum(self,mmgen_keyaddrfile:str): + def keyaddrfile_chksum(self, mmgen_keyaddrfile: str): "compute checksum for MMGen key-address file" from ..addrlist import KeyAddrList - return self._file_chksum(mmgen_keyaddrfile,KeyAddrList) + return self._file_chksum(mmgen_keyaddrfile, KeyAddrList) - def viewkeyaddrfile_chksum(self,mmgen_viewkeyaddrfile:str): + def viewkeyaddrfile_chksum(self, mmgen_viewkeyaddrfile: str): "compute checksum for MMGen key-address file" from ..addrlist import ViewKeyAddrList - return self._file_chksum(mmgen_viewkeyaddrfile,ViewKeyAddrList) + return self._file_chksum(mmgen_viewkeyaddrfile, ViewKeyAddrList) - def passwdfile_chksum(self,mmgen_passwdfile:str): + def passwdfile_chksum(self, mmgen_passwdfile: str): "compute checksum for MMGen password file" from ..passwdlist import PasswordList - return self._file_chksum(mmgen_passwdfile,PasswordList) + return self._file_chksum(mmgen_passwdfile, PasswordList) async def txview( self, @@ -78,18 +78,18 @@ class tool_cmd(tool_cmd_base): 'pager', 'terse', 'sort', - 'filesort' ), - 'dfls': ( False, False, 'addr', 'mtime' ), + 'filesort'), + 'dfls': (False, False, 'addr', 'mtime'), 'annots': { 'mmgen_tx_file(s)': str, 'pager': 'send output to pager', 'terse': 'produce compact tabular output', - 'sort': 'sort order for transaction inputs and outputs ' + options_annot_str(['addr','raw']), - 'filesort': 'file sort order ' + options_annot_str(['mtime','ctime','atime']), + 'sort': 'sort order for transaction inputs and outputs ' + options_annot_str(['addr', 'raw']), + 'filesort': 'file sort order ' + options_annot_str(['mtime', 'ctime', 'atime']), } }, *infiles, - **kwargs ): + **kwargs): "display specified raw or signed MMGen transaction files in human-readable form" terse = bool(kwargs.get('terse')) @@ -97,14 +97,14 @@ class tool_cmd(tool_cmd_base): file_sort = kwargs.get('filesort') or 'mtime' from ..filename import MMGenFileList - from ..tx import completed,CompletedTX - flist = MMGenFileList( infiles, base_class=completed.Completed, proto=self.proto ) - flist.sort_by_age( key=file_sort ) # in-place sort + from ..tx import completed, CompletedTX + flist = MMGenFileList(infiles, base_class=completed.Completed, proto=self.proto) + flist.sort_by_age(key=file_sort) # in-place sort async def process_file(f): return (await CompletedTX( cfg = self.cfg, filename = f.name, - quiet_open = True)).info.format( terse=terse, sort=tx_sort ) + quiet_open = True)).info.format(terse=terse, sort=tx_sort) return ('—'*77+'\n').join([await process_file(f) for f in flist]).rstrip() diff --git a/mmgen/tool/filecrypt.py b/mmgen/tool/filecrypt.py index 7b3ff054..230c132a 100755 --- a/mmgen/tool/filecrypt.py +++ b/mmgen/tool/filecrypt.py @@ -24,7 +24,7 @@ import os from .common import tool_cmd_base from ..crypto import Crypto -from ..fileutil import get_data_from_file,write_data_to_file +from ..fileutil import get_data_from_file, write_data_to_file class tool_cmd(tool_cmd_base): """ @@ -35,20 +35,20 @@ class tool_cmd(tool_cmd_base): * Enc: AES256_CTR, 16-byte rand IV, sha256 hash + 32-byte nonce + data * The encrypted file is indistinguishable from random data """ - def encrypt(self,infile:str,outfile='',hash_preset=''): + def encrypt(self, infile: str, outfile='', hash_preset=''): "encrypt a file" - data = get_data_from_file( self.cfg, infile, 'data for encryption', binary=True ) - enc_d = Crypto(self.cfg).mmgen_encrypt( data, 'data', hash_preset ) + data = get_data_from_file(self.cfg, infile, 'data for encryption', binary=True) + enc_d = Crypto(self.cfg).mmgen_encrypt(data, 'data', hash_preset) if not outfile: outfile = f'{os.path.basename(infile)}.{Crypto.mmenc_ext}' - write_data_to_file( self.cfg, outfile, enc_d, 'encrypted data', binary=True ) + write_data_to_file(self.cfg, outfile, enc_d, 'encrypted data', binary=True) return True - def decrypt(self,infile:str,outfile='',hash_preset=''): + def decrypt(self, infile: str, outfile='', hash_preset=''): "decrypt a file" - enc_d = get_data_from_file( self.cfg, infile, 'encrypted data', binary=True ) + enc_d = get_data_from_file(self.cfg, infile, 'encrypted data', binary=True) while True: - dec_d = Crypto(self.cfg).mmgen_decrypt( enc_d, 'data', hash_preset ) + dec_d = Crypto(self.cfg).mmgen_decrypt(enc_d, 'data', hash_preset) if dec_d: break from ..util import msg @@ -56,8 +56,8 @@ class tool_cmd(tool_cmd_base): if not outfile: from ..util import remove_extension o = os.path.basename(infile) - outfile = remove_extension(o,Crypto.mmenc_ext) + outfile = remove_extension(o, Crypto.mmenc_ext) if outfile == o: outfile += '.dec' - write_data_to_file( self.cfg, outfile, dec_d, 'decrypted data', binary=True ) + write_data_to_file(self.cfg, outfile, dec_d, 'decrypted data', binary=True) return True diff --git a/mmgen/tool/fileutil.py b/mmgen/tool/fileutil.py index 613457bb..58d3b8d4 100755 --- a/mmgen/tool/fileutil.py +++ b/mmgen/tool/fileutil.py @@ -20,10 +20,10 @@ tool.fileutil: File routines for the 'mmgen-tool' utility """ -import sys,os +import sys, os from .common import tool_cmd_base -from ..util import msg,msg_r,die,suf,make_full_path +from ..util import msg, msg_r, die, suf, make_full_path from ..crypto import Crypto class tool_cmd(tool_cmd_base): @@ -37,15 +37,15 @@ class tool_cmd(tool_cmd_base): from hashlib import sha256 - ivsize,bsize,mod = ( Crypto.aesctr_iv_len, 4096, 4096*8 ) - n,carry = 0,b' '*ivsize + ivsize, bsize, mod = (Crypto.aesctr_iv_len, 4096, 4096*8) + n, carry = 0, b' '*ivsize flgs = os.O_RDONLY|os.O_BINARY if sys.platform == 'win32' else os.O_RDONLY - f = os.open(filename,flgs) + f = os.open(filename, flgs) for ch in incog_id: if ch not in '0123456789ABCDEF': - die(2,f'{incog_id!r}: invalid Incog ID') + die(2, f'{incog_id!r}: invalid Incog ID') while True: - d = os.read(f,bsize) + d = os.read(f, bsize) if not d: break d = carry + d @@ -65,7 +65,7 @@ class tool_cmd(tool_cmd_base): os.close(f) return True - def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False): + def rand2file(self, outfile: str, nbytes: str, threads=4, silent=False): """ write ‘nbytes’ bytes of random data to specified file (dd-style byte specifiers supported) @@ -89,39 +89,39 @@ class tool_cmd(tool_cmd_base): """ from threading import Thread from queue import Queue - from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from ..util2 import parse_bytespec def encrypt_worker(): - ctr_init_val = os.urandom( Crypto.aesctr_iv_len ) - c = Cipher( algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend() ) + ctr_init_val = os.urandom(Crypto.aesctr_iv_len) + c = Cipher(algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend()) encryptor = c.encryptor() while True: - q2.put( encryptor.update(q1.get()) ) + q2.put(encryptor.update(q1.get())) q1.task_done() def output_worker(): while True: - f.write( q2.get() ) + f.write(q2.get()) q2.task_done() nbytes = parse_bytespec(nbytes) if self.cfg.outdir: - outfile = make_full_path( self.cfg.outdir, outfile ) + outfile = make_full_path(self.cfg.outdir, outfile) - f = open(outfile,'wb') + f = open(outfile, 'wb') key = Crypto(self.cfg).get_random(32) - q1,q2 = ( Queue(), Queue() ) + q1, q2 = (Queue(), Queue()) - for i in range(max(1,threads-2)): + for i in range(max(1, threads-2)): t = Thread(target=encrypt_worker) t.daemon = True t.start() - t = Thread( target=output_worker ) + t = Thread(target=output_worker) t.daemon = True t.start() @@ -129,10 +129,10 @@ class tool_cmd(tool_cmd_base): for i in range(nbytes // blk_size): if not i % 4: msg_r(f'\rRead: {i * blk_size} bytes') - q1.put( os.urandom(blk_size) ) + q1.put(os.urandom(blk_size)) if nbytes % blk_size: - q1.put( os.urandom(nbytes % blk_size) ) + q1.put(os.urandom(nbytes % blk_size)) q1.join() q2.join() @@ -140,7 +140,7 @@ class tool_cmd(tool_cmd_base): fsize = os.stat(outfile).st_size if fsize != nbytes: - die(3,f'{fsize}: incorrect random file size (should be {nbytes})') + die(3, f'{fsize}: incorrect random file size (should be {nbytes})') if not silent: msg(f'\rRead: {nbytes} bytes') @@ -148,7 +148,7 @@ class tool_cmd(tool_cmd_base): return True - def decrypt_keystore(self, wallet_file:str, output_hex=False): + def decrypt_keystore(self, wallet_file: str, output_hex=False): "decrypt the data in a keystore wallet, returning the decrypted data in binary format" from ..ui import line_input passwd = line_input(self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase).strip().encode() @@ -159,7 +159,7 @@ class tool_cmd(tool_cmd_base): ret = decrypt_keystore(data[0]['keystore'], passwd) return ret.hex() if output_hex else ret - def decrypt_geth_keystore(self, wallet_file:str, check_addr=True): + def decrypt_geth_keystore(self, wallet_file: str, check_addr=True): "decrypt the private key in a Geth keystore wallet, returning the decrypted key in hex format" from ..ui import line_input passwd = line_input(self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase).strip().encode() diff --git a/mmgen/tool/help.py b/mmgen/tool/help.py index 403194b3..909d2876 100755 --- a/mmgen/tool/help.py +++ b/mmgen/tool/help.py @@ -30,7 +30,7 @@ def main_help(): from ..util2 import pretty_format def do(): - for clsname,cmdlist in main_tool.mods.items(): + for clsname, cmdlist in main_tool.mods.items(): cls = main_tool.get_mod_cls(clsname) cls_docstr = cls.__doc__.strip() yield capfirst(cls_docstr.split('\n')[0].strip()) + ':' @@ -41,10 +41,10 @@ def main_help(): yield ' ' + line.lstrip('\t') yield '' - max_w = max(map(len,cmdlist)) + max_w = max(map(len, cmdlist)) for cmdname in cmdlist: - code = getattr(cls,cmdname) + code = getattr(cls, cmdname) if code.__doc__: yield ' {:{}} - {}'.format( cmdname, @@ -109,11 +109,11 @@ def gen_tool_usage(): for line in m1.lstrip().split('\n'): yield line.lstrip('\t') - for clsname,cmdlist in main_tool.mods.items(): + for clsname, cmdlist in main_tool.mods.items(): cls = main_tool.get_mod_cls(clsname) cls_docstr = cls.__doc__.strip() yield '' - yield ' {}:'.format( capfirst(cls_docstr.split('\n')[0].strip()) ) + yield ' {}:'.format(capfirst(cls_docstr.split('\n')[0].strip())) yield '' if '\n' in cls_docstr: @@ -121,49 +121,49 @@ def gen_tool_usage(): yield ' ' + line.lstrip('\t') yield '' - max_w = max(map(len,cmdlist)) + max_w = max(map(len, cmdlist)) for cmdname in cmdlist: yield ' {a:{w}} {b}'.format( a = cmdname, - b = main_tool.create_call_sig(cmdname,cls,as_string=True), - w = max_w ) + b = main_tool.create_call_sig(cmdname, cls, as_string=True), + w = max_w) yield '' for line in m2.rstrip().split('\n'): yield line.lstrip('\t') -def gen_tool_cmd_usage(mod,cmdname): +def gen_tool_cmd_usage(mod, cmdname): from ..cfg import gc from ..util import capfirst cls = main_tool.get_mod_cls(mod) - docstr = getattr(cls,cmdname).__doc__.strip() - args,kwargs,kwargs_types,_,ann = main_tool.create_call_sig(cmdname,cls) + docstr = getattr(cls, cmdname).__doc__.strip() + args, kwargs, kwargs_types, _, ann = main_tool.create_call_sig(cmdname, cls) ARGS = 'ARG' if len(args) == 1 else 'ARGS' if args else '' KWARGS = 'KEYWORD ARG' if len(kwargs) == 1 else 'KEYWORD ARGS' if kwargs else '' - yield capfirst( docstr.split('\n')[0].strip() ) + yield capfirst(docstr.split('\n')[0].strip()) yield '' yield 'USAGE: {b} [OPTS] {c}{d}{e}'.format( b = gc.prog_name, c = cmdname, d = f' {ARGS}' if ARGS else '', - e = f' [{KWARGS}]' if KWARGS else '' ) + e = f' [{KWARGS}]' if KWARGS else '') if args: max_w = max(len(k[0]) for k in args) yield '' yield f'Required {ARGS} (type shown in square brackets):' yield '' - for argname,argtype in args: + for argname, argtype in args: have_sstr = ann.get(argname) == 'sstr' yield ' {a:{w}} [{b}]{c}{d}'.format( a = argname, b = argtype, c = " (use '-' to read from STDIN)" if have_sstr else '', - d = ' ' + ann[argname] if isinstance(ann.get(argname),str) and not have_sstr else '', - w = max_w ) + d = ' ' + ann[argname] if isinstance(ann.get(argname), str) and not have_sstr else '', + w = max_w) if kwargs: max_w = max(len(k) for k in kwargs) @@ -174,26 +174,26 @@ def gen_tool_cmd_usage(mod,cmdname): for argname in kwargs: yield ' {a:{w}} {b:{w2}} {c}'.format( a = argname, - b = '[{}={}]'.format( kwargs_types[argname].__name__, repr(kwargs[argname]) ), - c = capfirst(ann[argname]) if isinstance(ann.get(argname),str) else '', + b = '[{}={}]'.format(kwargs_types[argname].__name__, repr(kwargs[argname])), + c = capfirst(ann[argname]) if isinstance(ann.get(argname), str) else '', w = max_w, - w2 = max_w2 ).rstrip() + w2 = max_w2).rstrip() if '\n' in docstr: for line in docstr.split('\n')[1:]: yield line.lstrip('\t') -def usage(cmdname=None,exit_val=1): +def usage(cmdname=None, exit_val=1): - from ..util import Msg,die + from ..util import Msg, die if cmdname: - for mod,cmdlist in main_tool.mods.items(): + for mod, cmdlist in main_tool.mods.items(): if cmdname in cmdlist: - Msg('\n'.join(gen_tool_cmd_usage(mod,cmdname))) + Msg('\n'.join(gen_tool_cmd_usage(mod, cmdname))) break else: - die(1,f'{cmdname!r}: no such tool command') + die(1, f'{cmdname!r}: no such tool command') else: from ..ui import do_pager do_pager('\n'.join(gen_tool_usage()) + '\n') @@ -204,10 +204,10 @@ def usage(cmdname=None,exit_val=1): class tool_cmd(tool_cmd_base): "help/usage commands" - def help(self,command_name=''): + def help(self, command_name=''): "display usage information for a single command or all commands" - usage(command_name,exit_val=0) + usage(command_name, exit_val=0) - def usage(self,command_name=''): + def usage(self, command_name=''): "display usage information for a single command or all commands" - usage(command_name,exit_val=0) + usage(command_name, exit_val=0) diff --git a/mmgen/tool/mnemonic.py b/mmgen/tool/mnemonic.py index 0b804fd0..106c4069 100755 --- a/mmgen/tool/mnemonic.py +++ b/mmgen/tool/mnemonic.py @@ -22,18 +22,18 @@ tool.mnemonic: Mnemonic routines for the 'mmgen-tool' utility from collections import namedtuple -from .common import tool_cmd_base,options_annot_str +from .common import tool_cmd_base, options_annot_str from ..baseconv import baseconv from ..xmrseed import xmrseed from ..bip39 import bip39 dfl_mnemonic_fmt = 'mmgen' -mft = namedtuple('mnemonic_format',['fmt','pad','conv_cls']) +mft = namedtuple('mnemonic_format', ['fmt', 'pad', 'conv_cls']) mnemonic_fmts = { - 'mmgen': mft( 'words', 'seed', baseconv ), - 'bip39': mft( 'bip39', None, bip39 ), - 'xmrseed': mft( 'xmrseed', None, xmrseed ), + 'mmgen': mft('words', 'seed', baseconv), + 'bip39': mft('bip39', None, bip39), + 'xmrseed': mft('xmrseed', None, xmrseed), } mn_opts_disp = 'seed phrase format ' + options_annot_str(mnemonic_fmts) @@ -60,18 +60,18 @@ class tool_cmd(tool_cmd_base): use the ‘hex2wif’ command. """ - def _xmr_reduce(self,bytestr): + def _xmr_reduce(self, bytestr): from ..protocol import init_proto - proto = init_proto( self.cfg, 'xmr' ) + proto = init_proto(self.cfg, 'xmr') if len(bytestr) != proto.privkey_len: from ..util import die - die(1,'{!r}: invalid bit length for Monero private key (must be {})'.format( + die(1, '{!r}: invalid bit length for Monero private key (must be {})'.format( len(bytestr*8), - proto.privkey_len*8 )) - return proto.preprocess_key(bytestr,None) + proto.privkey_len*8)) + return proto.preprocess_key(bytestr, None) - def _do_random_mn(self,nbytes:int,fmt:str): - assert nbytes in (16,24,32), 'nbytes must be 16, 24 or 32' + def _do_random_mn(self, nbytes: int, fmt: str): + assert nbytes in (16, 24, 32), 'nbytes must be 16, 24 or 32' from ..crypto import Crypto randbytes = Crypto(self.cfg).get_random(nbytes) if fmt == 'xmrseed': @@ -79,54 +79,54 @@ class tool_cmd(tool_cmd_base): if self.cfg.verbose: from ..util import msg msg(f'Seed: {randbytes.hex()}') - return self.hex2mn(randbytes.hex(),fmt=fmt) + return self.hex2mn(randbytes.hex(), fmt=fmt) - def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + def mn_rand128(self, fmt:mn_opts_disp = dfl_mnemonic_fmt): "generate a random 128-bit mnemonic seed phrase" - return self._do_random_mn(16,fmt) + return self._do_random_mn(16, fmt) - def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + def mn_rand192(self, fmt:mn_opts_disp = dfl_mnemonic_fmt): "generate a random 192-bit mnemonic seed phrase" - return self._do_random_mn(24,fmt) + return self._do_random_mn(24, fmt) - def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + def mn_rand256(self, fmt:mn_opts_disp = dfl_mnemonic_fmt): "generate a random 256-bit mnemonic seed phrase" - return self._do_random_mn(32,fmt) + return self._do_random_mn(32, fmt) - def hex2mn( self, hexstr:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ): + def hex2mn(self, hexstr: 'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt): "convert a 16, 24 or 32-byte hexadecimal string to a mnemonic seed phrase" if fmt == 'xmrseed': hexstr = self._xmr_reduce(bytes.fromhex(hexstr)).hex() f = mnemonic_fmts[fmt] - return ' '.join( f.conv_cls(fmt).fromhex(hexstr,f.pad) ) + return ' '.join(f.conv_cls(fmt).fromhex(hexstr, f.pad)) - def mn2hex( self, seed_mnemonic:'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt ): + def mn2hex(self, seed_mnemonic: 'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt): "convert a mnemonic seed phrase to a hexadecimal string" f = mnemonic_fmts[fmt] - return f.conv_cls(fmt).tohex( seed_mnemonic.split(), f.pad ) + return f.conv_cls(fmt).tohex(seed_mnemonic.split(), f.pad) - def mn2hex_interactive( self, + def mn2hex_interactive(self, fmt: mn_opts_disp = dfl_mnemonic_fmt, mn_len: 'length of seed phrase in words' = 24, - print_mn: 'print the seed phrase after entry' = False ): + print_mn: 'print the seed phrase after entry' = False): "convert an interactively supplied mnemonic seed phrase to a hexadecimal string" from ..mn_entry import mn_entry - mn = mn_entry( self.cfg, fmt ).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len,validate=False) + mn = mn_entry(self.cfg, fmt).get_mnemonic_from_user(25 if fmt == 'xmrseed' else mn_len, validate=False) if print_mn: from ..util import msg msg(mn) - return self.mn2hex(seed_mnemonic=mn,fmt=fmt) + return self.mn2hex(seed_mnemonic=mn, fmt=fmt) - def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt ): + def mn_stats(self, fmt:mn_opts_disp = dfl_mnemonic_fmt): "show stats for a mnemonic wordlist" return mnemonic_fmts[fmt].conv_cls(fmt).check_wordlist(self.cfg) def mn_printlist(self, fmt: mn_opts_disp = dfl_mnemonic_fmt, enum: 'enumerate the list' = False, - pager: 'send output to pager' = False ): + pager: 'send output to pager' = False): "print a mnemonic wordlist" ret = mnemonic_fmts[fmt].conv_cls(fmt).get_wordlist() if enum: - ret = [f'{n:>4} {e}' for n,e in enumerate(ret)] + ret = [f'{n:>4} {e}' for n, e in enumerate(ret)] return '\n'.join(ret) diff --git a/mmgen/tool/rpc.py b/mmgen/tool/rpc.py index ffa0256a..dbfa78a8 100755 --- a/mmgen/tool/rpc.py +++ b/mmgen/tool/rpc.py @@ -20,7 +20,7 @@ tool.rpc: JSON/RPC routines for the 'mmgen-tool' utility """ -from .common import tool_cmd_base,options_annot_str +from .common import tool_cmd_base, options_annot_str from ..tw.view import TwView from ..tw.txhistory import TwTxHistory @@ -33,7 +33,7 @@ class tool_cmd(tool_cmd_base): async def daemon_version(self): "print coin daemon version" from ..daemon import CoinDaemon - d = CoinDaemon( cfg=self.cfg, proto=self.proto, test_suite=self.cfg.test_suite ) + d = CoinDaemon(cfg=self.cfg, proto=self.proto, test_suite=self.cfg.test_suite) if self.proto.base_proto == 'Monero': from ..proto.xmr.rpc import MoneroRPCClient r = MoneroRPCClient( @@ -44,7 +44,7 @@ class tool_cmd(tool_cmd_base): port = d.rpc_port, user = None, passwd = None, - ignore_daemon_version = True ) + ignore_daemon_version = True) else: from ..rpc import rpc_init r = await rpc_init(self.cfg, self.proto, ignore_daemon_version=True, ignore_wallet=True) @@ -53,22 +53,22 @@ class tool_cmd(tool_cmd_base): async def getbalance(self, minconf: 'minimum number of confirmations' = 1, quiet: 'produce quieter output' = False, - pager: 'send output to pager' = False ): + pager: 'send output to pager' = False): "list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet" from ..tw.bal import TwGetBalance - return (await TwGetBalance(self.cfg,self.proto,minconf,quiet)).format(color=self.cfg.color) + return (await TwGetBalance(self.cfg, self.proto, minconf, quiet)).format(color=self.cfg.color) async def twops(self, - obj,pager,reverse,detail,sort,age_fmt,interactive, - **kwargs ): + obj, pager, reverse, detail, sort, age_fmt, interactive, + **kwargs): obj.reverse = reverse obj.age_fmt = age_fmt - for k,v in kwargs.items(): - setattr(obj,k,v) + for k, v in kwargs.items(): + setattr(obj, k, v) - await obj.get_data(sort_key=sort,reverse_sort=reverse) + await obj.get_data(sort_key=sort, reverse_sort=reverse) if interactive: await obj.view_filter_and_sort() @@ -76,7 +76,7 @@ class tool_cmd(tool_cmd_base): else: ret = await obj.format('detail' if detail else 'squeezed') - if hasattr(obj,'twctl'): + if hasattr(obj, 'twctl'): del obj.twctl return ret @@ -89,14 +89,14 @@ class tool_cmd(tool_cmd_base): sort: 'unspent output sort order ' + options_annot_str(TwView.sort_funcs) = 'age', age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs', interactive: 'enable interactive operation' = False, - show_mmid: 'show MMGen IDs along with coin addresses' = True ): + show_mmid: 'show MMGen IDs along with coin addresses' = True): "view tracking wallet unspent outputs" from ..tw.unspent import TwUnspentOutputs - obj = await TwUnspentOutputs(self.cfg,self.proto,minconf=minconf) + obj = await TwUnspentOutputs(self.cfg, self.proto, minconf=minconf) return await self.twops( - obj,pager,reverse,wide,sort,age_fmt,interactive, - show_mmid = show_mmid ) + obj, pager, reverse, wide, sort, age_fmt, interactive, + show_mmid = show_mmid) async def txhist(self, pager: 'send output to pager' = False, @@ -105,19 +105,19 @@ class tool_cmd(tool_cmd_base): sinceblock: 'display transactions starting from this block' = 0, sort: 'transaction sort order ' + options_annot_str(TwTxHistory.sort_funcs) = 'age', age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs', - interactive: 'enable interactive operation' = False ): + interactive: 'enable interactive operation' = False): "view transaction history of tracking wallet" - obj = await TwTxHistory(self.cfg,self.proto,sinceblock=sinceblock) + obj = await TwTxHistory(self.cfg, self.proto, sinceblock=sinceblock) return await self.twops( - obj,pager,reverse,detail,sort,age_fmt,interactive ) + obj, pager, reverse, detail, sort, age_fmt, interactive) async def listaddress(self, - mmgen_addr:str, + mmgen_addr: str, wide: 'display data in wide tabular format' = False, minconf: 'minimum number of confirmations' = 1, showcoinaddr: 'display coin address in addition to MMGen ID' = True, - age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs' ): + age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs'): "list the specified MMGen address in the tracking wallet and its balance" return await self.listaddresses( @@ -125,72 +125,72 @@ class tool_cmd(tool_cmd_base): wide = wide, minconf = minconf, showcoinaddrs = showcoinaddr, - age_fmt = age_fmt ) + age_fmt = age_fmt) async def listaddresses(self, pager: 'send output to pager' = False, reverse: 'reverse order of unspent outputs' = False, wide: 'display data in wide tabular format' = False, minconf: 'minimum number of confirmations' = 1, - sort: 'address sort order ' + options_annot_str(['reverse','mmid','addr','amt']) = '', + sort: 'address sort order ' + options_annot_str(['reverse', 'mmid', 'addr', 'amt']) = '', age_fmt: 'format for the Age/Date column ' + options_annot_str(TwView.age_fmts) = 'confs', interactive: 'enable interactive operation' = False, mmgen_addrs: 'hyphenated range or comma-separated list of addresses' = '', showcoinaddrs:'display coin addresses in addition to MMGen IDs' = True, showempty: 'show addresses with no balances' = True, showused: 'show used addresses (tristate: 0=no, 1=yes, 2=all)' = 1, - all_labels: 'show all addresses with labels' = False ): + all_labels: 'show all addresses with labels' = False): "list MMGen addresses in the tracking wallet and their balances" - assert showused in (0,1,2), "‘showused’ must have a value of 0, 1 or 2" + assert showused in (0, 1, 2), "‘showused’ must have a value of 0, 1 or 2" from ..tw.addresses import TwAddresses - obj = await TwAddresses(self.cfg,self.proto,minconf=minconf,mmgen_addrs=mmgen_addrs) + obj = await TwAddresses(self.cfg, self.proto, minconf=minconf, mmgen_addrs=mmgen_addrs) return await self.twops( - obj,pager,reverse,wide,sort,age_fmt,interactive, + obj, pager, reverse, wide, sort, age_fmt, interactive, showcoinaddrs = showcoinaddrs, showempty = showempty, showused = showused, - all_labels = all_labels ) + all_labels = all_labels) - async def add_label(self,mmgen_or_coin_addr:str,label:str): + async def add_label(self, mmgen_or_coin_addr: str, label: str): "add descriptive label for address in tracking wallet" from ..tw.ctl import TwCtl - return await (await TwCtl(self.cfg,self.proto,mode='w')).set_comment(mmgen_or_coin_addr,label) + return await (await TwCtl(self.cfg, self.proto, mode='w')).set_comment(mmgen_or_coin_addr, label) - async def remove_label(self,mmgen_or_coin_addr:str): + async def remove_label(self, mmgen_or_coin_addr: str): "remove descriptive label for address in tracking wallet" - await self.add_label( mmgen_or_coin_addr, '' ) + await self.add_label(mmgen_or_coin_addr, '') return True - async def remove_address(self,mmgen_or_coin_addr:str): + async def remove_address(self, mmgen_or_coin_addr: str): "remove an address from tracking wallet" from ..tw.ctl import TwCtl # returns None on failure: - ret = await (await TwCtl(self.cfg,self.proto,mode='w')).remove_address(mmgen_or_coin_addr) + ret = await (await TwCtl(self.cfg, self.proto, mode='w')).remove_address(mmgen_or_coin_addr) if ret: from ..util import msg msg(f'Address {ret!r} deleted from tracking wallet') return ret - async def resolve_address(self,mmgen_or_coin_addr:str): + async def resolve_address(self, mmgen_or_coin_addr: str): "resolve an MMGen address in the tracking wallet to a coin address or vice-versa" from ..tw.ctl import TwCtl - ret = await (await TwCtl(self.cfg,self.proto,mode='w')).resolve_address( mmgen_or_coin_addr ) + ret = await (await TwCtl(self.cfg, self.proto, mode='w')).resolve_address(mmgen_or_coin_addr) if ret: from ..addr import is_coin_addr - return ret.twmmid if is_coin_addr(self.proto,mmgen_or_coin_addr) else ret.coinaddr + return ret.twmmid if is_coin_addr(self.proto, mmgen_or_coin_addr) else ret.coinaddr else: return False - async def rescan_address(self,mmgen_or_coin_addr:str): + async def rescan_address(self, mmgen_or_coin_addr: str): "rescan an address in the tracking wallet to update its balance" from ..tw.ctl import TwCtl - return await (await TwCtl(self.cfg,self.proto,mode='w')).rescan_address( mmgen_or_coin_addr ) + return await (await TwCtl(self.cfg, self.proto, mode='w')).rescan_address(mmgen_or_coin_addr) async def rescan_blockchain(self, start_block: int = None, - stop_block: int = None ): + stop_block: int = None): """ rescan the blockchain to update historical transactions in the tracking wallet @@ -201,10 +201,10 @@ class tool_cmd(tool_cmd_base): parameter. """ from ..tw.ctl import TwCtl - await (await TwCtl(self.cfg,self.proto,mode='w')).rescan_blockchain(start_block,stop_block) + await (await TwCtl(self.cfg, self.proto, mode='w')).rescan_blockchain(start_block, stop_block) return True - async def twexport(self,include_amts=True,pretty=False,prune=False,warn_used=False,force=False): + async def twexport(self, include_amts=True, pretty=False, prune=False, warn_used=False, force=False): """ export a tracking wallet to JSON format @@ -234,10 +234,10 @@ class tool_cmd(tool_cmd_base): pretty = pretty, prune = prune, warn_used = warn_used, - force_overwrite = force ) + force_overwrite = force) return True - async def twimport(self,filename:str,ignore_checksum=False,batch=False): + async def twimport(self, filename: str, ignore_checksum=False, batch=False): """ restore a tracking wallet from a JSON dump created by ‘twexport’ @@ -251,5 +251,5 @@ class tool_cmd(tool_cmd_base): rescan_blockchain’. """ from ..tw.json import TwJSON - await TwJSON.Import( self.cfg, self.proto, filename, ignore_checksum=ignore_checksum, batch=batch ) + await TwJSON.Import(self.cfg, self.proto, filename, ignore_checksum=ignore_checksum, batch=batch) return True diff --git a/mmgen/tool/util.py b/mmgen/tool/util.py index 676f4a57..d1b82843 100755 --- a/mmgen/tool/util.py +++ b/mmgen/tool/util.py @@ -20,7 +20,7 @@ tool.util: Utility commands for the 'mmgen-tool' utility """ -import sys,os +import sys, os from .common import tool_cmd_base @@ -28,7 +28,7 @@ class tool_cmd(tool_cmd_base): "general string conversion and hashing utilities" # mmgen.util2.bytespec_map - def bytespec(self,dd_style_byte_specifier:str): + def bytespec(self, dd_style_byte_specifier: str): """ convert a byte specifier such as ‘4GB’ into an integer @@ -60,7 +60,7 @@ class tool_cmd(tool_cmd_base): fmt: 'width and precision of output' = '0.2', print_sym: 'print the specifier after the numerical value' = True, strip: 'strip trailing zeroes' = False, - add_space: 'with print_sym, add space between value and specifier' = False ): + add_space: 'with print_sym, add space between value and specifier' = False): """ convert an integer to a byte specifier such as ‘4GB’ @@ -89,25 +89,25 @@ class tool_cmd(tool_cmd_base): fmt, print_sym = print_sym, strip = strip, - add_space = add_space ) + add_space = add_space) def randhex(self, - nbytes: 'number of bytes to output' = 32 ): + nbytes: 'number of bytes to output' = 32): "print 'n' bytes (default 32) of random data in hex format" from ..crypto import Crypto - return Crypto(self.cfg).get_random( nbytes ).hex() + return Crypto(self.cfg).get_random(nbytes).hex() - def hexreverse(self,hexstr:'sstr'): + def hexreverse(self, hexstr: 'sstr'): "reverse bytes of a hexadecimal string" - return bytes.fromhex( hexstr.strip() )[::-1].hex() + return bytes.fromhex(hexstr.strip())[::-1].hex() - def hexlify(self,infile:str): + def hexlify(self, infile: str): "convert bytes in file to hexadecimal (use '-' for stdin)" from ..fileutil import get_data_from_file - data = get_data_from_file( self.cfg, infile, dash=True, quiet=True, binary=True ) + data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True) return data.hex() - def unhexlify(self,hexstr:'sstr'): + def unhexlify(self, hexstr: 'sstr'): "convert a hexadecimal string to bytes (warning: outputs binary data)" return bytes.fromhex(hexstr) @@ -118,34 +118,34 @@ class tool_cmd(tool_cmd_base): "create hexdump of data from file (use '-' for stdin)" from ..fileutil import get_data_from_file from ..util2 import pretty_hexdump - data = get_data_from_file( self.cfg, infile, dash=True, quiet=True, binary=True ) - return pretty_hexdump( data, cols=cols, line_nums=line_nums ).rstrip() + data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True) + return pretty_hexdump(data, cols=cols, line_nums=line_nums).rstrip() - def unhexdump(self,infile:str): + def unhexdump(self, infile: str): "decode hexdump from file (use '-' for stdin) (warning: outputs binary data)" if sys.platform == 'win32': import msvcrt - msvcrt.setmode( sys.stdout.fileno(), os.O_BINARY ) + msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) from ..fileutil import get_data_from_file from ..util2 import decode_pretty_hexdump - hexdata = get_data_from_file( self.cfg, infile, dash=True, quiet=True ) + hexdata = get_data_from_file(self.cfg, infile, dash=True, quiet=True) return decode_pretty_hexdump(hexdata) - def hash160(self,hexstr:'sstr'): + def hash160(self, hexstr: 'sstr'): "compute ripemd160(sha256(data)) (convert hex pubkey to hex addr)" from ..proto.btc.common import hash160 - return hash160( bytes.fromhex(hexstr) ).hex() + return hash160(bytes.fromhex(hexstr)).hex() # TODO: handle stdin def hash256(self, data: str, file_input: 'first arg is the name of a file containing the data' = False, - hex_input: 'first arg is a hexadecimal string' = False ): + hex_input: 'first arg is a hexadecimal string' = False): "compute sha256(sha256(data)) (double sha256)" from hashlib import sha256 if file_input: from ..fileutil import get_data_from_file - b = get_data_from_file( self.cfg, data, binary=True ) + b = get_data_from_file(self.cfg, data, binary=True) elif hex_input: from ..util2 import decode_pretty_hexdump b = decode_pretty_hexdump(data) @@ -153,87 +153,87 @@ class tool_cmd(tool_cmd_base): b = data return sha256(sha256(b.encode()).digest()).hexdigest() - def id6(self,infile:str): + def id6(self, infile: str): "generate 6-character MMGen ID for a file (use '-' for stdin)" from ..util import make_chksum_6 from ..fileutil import get_data_from_file return make_chksum_6( - get_data_from_file( self.cfg, infile, dash=True, quiet=True, binary=True )) + get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)) - def str2id6(self,string:'sstr'): # retain ignoring of space for backwards compat + def str2id6(self, string: 'sstr'): # retain ignoring of space for backwards compat "generate 6-character MMGen ID for a string, ignoring spaces in string" from ..util import make_chksum_6 - return make_chksum_6( ''.join(string.split()) ) + return make_chksum_6(''.join(string.split())) - def id8(self,infile:str): + def id8(self, infile: str): "generate 8-character MMGen ID for a file (use '-' for stdin)" from ..util import make_chksum_8 from ..fileutil import get_data_from_file return make_chksum_8( - get_data_from_file( self.cfg, infile, dash=True, quiet=True, binary=True )) + get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)) def randb58(self, nbytes: 'number of bytes to output' = 32, - pad: 'pad output to this width' = 0 ): + pad: 'pad output to this width' = 0): "generate random data (default: 32 bytes) and convert it to base 58" from ..baseconv import baseconv from ..crypto import Crypto - return baseconv('b58').frombytes( Crypto(self.cfg).get_random(nbytes), pad=pad, tostr=True ) + return baseconv('b58').frombytes(Crypto(self.cfg).get_random(nbytes), pad=pad, tostr=True) - def bytestob58(self,infile:str,pad: 'pad output to this width' = 0): + def bytestob58(self, infile: str, pad: 'pad output to this width' = 0): "convert bytes to base 58 (supply data via STDIN)" from ..fileutil import get_data_from_file from ..baseconv import baseconv - data = get_data_from_file( self.cfg, infile, dash=True, quiet=True, binary=True ) - return baseconv('b58').frombytes( data, pad=pad, tostr=True ) + data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True) + return baseconv('b58').frombytes(data, pad=pad, tostr=True) - def b58tobytes(self,b58_str:'sstr',pad: 'pad output to this width' = 0): + def b58tobytes(self, b58_str: 'sstr', pad: 'pad output to this width' = 0): "convert a base 58 string to bytes (warning: outputs binary data)" from ..baseconv import baseconv - return baseconv('b58').tobytes( b58_str, pad=pad ) + return baseconv('b58').tobytes(b58_str, pad=pad) - def hextob58(self,hexstr:'sstr',pad: 'pad output to this width' = 0): + def hextob58(self, hexstr: 'sstr', pad: 'pad output to this width' = 0): "convert a hexadecimal string to base 58" from ..baseconv import baseconv - return baseconv('b58').fromhex( hexstr, pad=pad, tostr=True ) + return baseconv('b58').fromhex(hexstr, pad=pad, tostr=True) - def b58tohex(self,b58_str:'sstr',pad: 'pad output to this width' = 0): + def b58tohex(self, b58_str: 'sstr', pad: 'pad output to this width' = 0): "convert a base 58 string to hexadecimal" from ..baseconv import baseconv - return baseconv('b58').tohex( b58_str, pad=pad ) + return baseconv('b58').tohex(b58_str, pad=pad) - def hextob58chk(self,hexstr:'sstr'): + def hextob58chk(self, hexstr: 'sstr'): "convert a hexadecimal string to base58-check encoding" from ..proto.btc.common import b58chk_encode - return b58chk_encode( bytes.fromhex(hexstr) ) + return b58chk_encode(bytes.fromhex(hexstr)) - def b58chktohex(self,b58chk_str:'sstr'): + def b58chktohex(self, b58chk_str: 'sstr'): "convert a base58-check encoded string to hexadecimal" from ..proto.btc.common import b58chk_decode return b58chk_decode(b58chk_str).hex() - def hextob32(self,hexstr:'sstr',pad: 'pad output to this width' = 0): + def hextob32(self, hexstr: 'sstr', pad: 'pad output to this width' = 0): "convert a hexadecimal string to an MMGen-flavor base 32 string" from ..baseconv import baseconv - return baseconv('b32').fromhex( hexstr, pad, tostr=True ) + return baseconv('b32').fromhex(hexstr, pad, tostr=True) - def b32tohex(self,b32_str:'sstr',pad: 'pad output to this width' = 0): + def b32tohex(self, b32_str: 'sstr', pad: 'pad output to this width' = 0): "convert an MMGen-flavor base 32 string to hexadecimal" from ..baseconv import baseconv - return baseconv('b32').tohex( b32_str.upper(), pad ) + return baseconv('b32').tohex(b32_str.upper(), pad) def hextob6d(self, - hexstr:'sstr', + hexstr: 'sstr', pad: 'pad output to this width' = 0, add_spaces: 'add a space after every 5th character' = True): "convert a hexadecimal string to die roll base6 (base6d)" from ..baseconv import baseconv from ..util2 import block_format - ret = baseconv('b6d').fromhex(hexstr,pad,tostr=True) - return block_format( ret, gw=5, cols=None ).strip() if add_spaces else ret + ret = baseconv('b6d').fromhex(hexstr, pad, tostr=True) + return block_format(ret, gw=5, cols=None).strip() if add_spaces else ret - def b6dtohex(self,b6d_str:'sstr',pad: 'pad output to this width' = 0): + def b6dtohex(self, b6d_str: 'sstr', pad: 'pad output to this width' = 0): "convert a die roll base6 (base6d) string to hexadecimal" from ..baseconv import baseconv from ..util import remove_whitespace - return baseconv('b6d').tohex( remove_whitespace(b6d_str), pad ) + return baseconv('b6d').tohex(remove_whitespace(b6d_str), pad) diff --git a/mmgen/tool/wallet.py b/mmgen/tool/wallet.py index 23703133..747353fa 100755 --- a/mmgen/tool/wallet.py +++ b/mmgen/tool/wallet.py @@ -29,64 +29,64 @@ from ..wallet import Wallet class tool_cmd(tool_cmd_base): "key, address or subseed generation from an MMGen wallet" - def __init__(self,cfg,cmdname=None,proto=None,mmtype=None): - self.need_proto = cmdname in ('gen_key','gen_addr') - super().__init__(cfg,cmdname=cmdname,proto=proto,mmtype=mmtype) + def __init__(self, cfg, cmdname=None, proto=None, mmtype=None): + self.need_proto = cmdname in ('gen_key', 'gen_addr') + super().__init__(cfg, cmdname=cmdname, proto=proto, mmtype=mmtype) - def _get_seed_file(self,wallet): + def _get_seed_file(self, wallet): from ..fileutil import get_seed_file return get_seed_file( cfg = self.cfg, wallets = [wallet] if wallet else [], - nargs = 1 ) + nargs = 1) - def get_subseed(self,subseed_idx:str,wallet=''): + def get_subseed(self, subseed_idx: str, wallet=''): "get the Seed ID of a single subseed by Subseed Index for default or specified wallet" self.cfg._set_quiet(True) - return Wallet(self.cfg,self._get_seed_file(wallet)).seed.subseed(subseed_idx).sid + return Wallet(self.cfg, self._get_seed_file(wallet)).seed.subseed(subseed_idx).sid - def get_subseed_by_seed_id(self,seed_id:str,wallet='',last_idx=SubSeedList.dfl_len): + def get_subseed_by_seed_id(self, seed_id: str, wallet='', last_idx=SubSeedList.dfl_len): "get the Subseed Index of a single subseed by Seed ID for default or specified wallet" self.cfg._set_quiet(True) - ret = Wallet(self.cfg,self._get_seed_file(wallet)).seed.subseed_by_seed_id( seed_id, last_idx ) + ret = Wallet(self.cfg, self._get_seed_file(wallet)).seed.subseed_by_seed_id(seed_id, last_idx) return ret.ss_idx if ret else None - def list_subseeds(self,subseed_idx_range:str,wallet=''): + def list_subseeds(self, subseed_idx_range: str, wallet=''): "list a range of subseed Seed IDs for default or specified wallet" self.cfg._set_quiet(True) from ..subseed import SubSeedIdxRange - return Wallet(self.cfg,self._get_seed_file(wallet)).seed.subseeds.format( - *SubSeedIdxRange(subseed_idx_range) ) + return Wallet(self.cfg, self._get_seed_file(wallet)).seed.subseeds.format( + *SubSeedIdxRange(subseed_idx_range)) def list_shares(self, share_count: int, id_str = 'default', master_share: f'(min:1, max:{MasterShareIdx.max_val}, 0=no master share)' = 0, - wallet = '' ): + wallet = ''): "list the Seed IDs of the shares resulting from a split of default or specified wallet" self.cfg._set_quiet(True) - return Wallet(self.cfg,self._get_seed_file(wallet)).seed.split( - share_count, id_str, master_share ).format() + return Wallet(self.cfg, self._get_seed_file(wallet)).seed.split( + share_count, id_str, master_share).format() - def gen_key(self,mmgen_addr:str,wallet=''): + def gen_key(self, mmgen_addr: str, wallet=''): "generate a single WIF key for specified MMGen address from default or specified wallet" - return self._gen_keyaddr( mmgen_addr, 'wif', wallet ) + return self._gen_keyaddr(mmgen_addr, 'wif', wallet) - def gen_addr(self,mmgen_addr:str,wallet=''): + def gen_addr(self, mmgen_addr: str, wallet=''): "generate a single MMGen address from default or specified wallet" - return self._gen_keyaddr( mmgen_addr, 'addr', wallet ) + return self._gen_keyaddr(mmgen_addr, 'addr', wallet) - def _gen_keyaddr(self,mmgen_addr,target,wallet=''): + def _gen_keyaddr(self, mmgen_addr, target, wallet=''): from ..addr import MMGenID - from ..addrlist import AddrList,AddrIdxList + from ..addrlist import AddrList, AddrIdxList - addr = MMGenID( self.proto, mmgen_addr ) + addr = MMGenID(self.proto, mmgen_addr) self.cfg._set_quiet(True) - ss = Wallet(self.cfg,self._get_seed_file(wallet)) + ss = Wallet(self.cfg, self._get_seed_file(wallet)) if ss.seed.sid != addr.sid: from ..util import die - die(1,f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})') + die(1, f'Seed ID of requested address ({addr.sid}) does not match wallet ({ss.seed.sid})') d = AddrList( cfg = self.cfg, @@ -94,6 +94,6 @@ class tool_cmd(tool_cmd_base): seed = ss.seed, addr_idxs = AddrIdxList(str(addr.idx)), mmtype = addr.mmtype, - skip_chksum = True ).data[0] + skip_chksum = True).data[0] - return { 'wif': d.sec.wif, 'addr': d.addr }[target] + return {'wif': d.sec.wif, 'addr': d.addr}[target]