pylint throughout (excluding tests) - various cleanups

This commit is contained in:
The MMGen Project 2023-10-11 12:58:50 +00:00
commit d326c96af8
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
32 changed files with 80 additions and 76 deletions

View file

@ -58,7 +58,7 @@ class AddrData(MMGenObject):
self.al_ids[addrlist.al_id] = addrlist
return True
else:
raise TypeError(f'Error: object {addrlist!r} is not of type AddrList')
raise TypeError(f'Error: object {addrlist!r} is not an instance of AddrList')
def make_reverse_dict(self,coinaddrs):
d = MMGenDict()

View file

@ -182,7 +182,7 @@ class AddrFile(MMGenObject):
f'Key doesn’t match address!\n {e.sec.wif}\n {e.addr}')
p.cfg._util.qmsg(' - done')
if self.cfg.yes or p.ka_validity_chk == True:
if self.cfg.yes or p.ka_validity_chk:
verify_keys()
else:
from .ui import keypress_confirm

View file

@ -107,15 +107,15 @@ class AddrListIDStr(str,Hilite):
def __new__(cls,addrlist,fmt_str=None):
idxs = [e.idx for e in addrlist.data]
prev = idxs[0]
ret = prev,
ret = [prev]
for i in idxs[1:]:
if i == prev + 1:
if i == idxs[-1]:
ret += '-', i
ret.extend(['-', i])
else:
if prev != ret[-1]:
ret += '-', prev
ret += ',', i
ret.extend(['-', prev])
ret.extend([',', i])
prev = i
s = ''.join(map(str,ret))

View file

@ -45,7 +45,7 @@ def test_equal(desc,a,b,*cdata):
if type(a) is int:
a = hex(a)
b = hex(b)
(network,coin,e,b_desc,verbose) = cdata
(network,coin,_,b_desc,verbose) = cdata
if verbose:
msg(f' {desc:20}: {a!r}')
if a != b:

View file

@ -71,7 +71,7 @@ class CoinAmt(Decimal,Hilite,InitErrors): # abstract class
return int(ret)
@classmethod
def fmtc(cls):
def fmtc(cls,*args,**kwargs):
cls.method_not_implemented()
def fmt(
@ -80,7 +80,7 @@ class CoinAmt(Decimal,Hilite,InitErrors): # abstract class
iwidth = 1, # width of the integer part
prec = None ):
s = self.__str__()
s = str(self)
prec = prec or self.max_prec
if '.' in s:
@ -94,7 +94,7 @@ class CoinAmt(Decimal,Hilite,InitErrors): # abstract class
color = color )
def hl(self,color=True):
return self.colorize(self.__str__(),color=color)
return self.colorize(str(self),color=color)
# fancy highlighting with coin unit, enclosure, formatting
def hl2(self,color=True,unit=False,fs='{}',encl=''):

View file

@ -87,15 +87,15 @@ class Signable:
else:
return False
def print_summary(self,txs):
def print_summary(self,signables):
if self.cfg.full_summary:
bmsg('\nAutosign summary:\n')
msg_r('\n'.join(tx.info.format(terse=True) for tx in txs))
msg_r('\n'.join(tx.info.format(terse=True) for tx in signables))
return
def gen():
for tx in txs:
for tx in signables:
non_mmgen = [o for o in tx.outputs if not o.mmid]
if non_mmgen:
yield (tx,non_mmgen)
@ -198,10 +198,10 @@ class Signable:
die('MsgFileFailedSID',f'Failed Seed IDs: {fmt_list(m.data["failed_sids"],fmt="bare")}')
return m
def print_summary(self,messages):
def print_summary(self,signables):
gmsg('\nSigned message files:')
for m in messages:
gmsg(' ' + m.signed_filename)
for message in signables:
gmsg(' ' + message.signed_filename)
def gen_bad_list(self,bad_files):
for f in bad_files:
@ -370,7 +370,7 @@ class Autosign:
if e.code != 0:
fails += 1
return False if fails else True
return not fails
async def sign_all(self,target_name):
target = getattr(Signable,target_name)(self)

View file

@ -160,7 +160,7 @@ class baseconv:
( 'seed data' if pad == 'seed' else f'{words_arg!r}:' ) +
f' not in {desc} format' )
ret = sum([wl.index(words[::-1][i])*(base**i) for i in range(len(words))])
ret = sum(wl.index(words[::-1][i])*(base**i) for i in range(len(words)))
bl = ret.bit_length()
return ret.to_bytes(max(pad_val,bl//8+bool(bl%8)),'big')

View file

@ -65,8 +65,8 @@ class bip39(baseconv):
seed_bits = seed_len * 8 if in_bytes else seed_len * 4 if in_hex else seed_len
try:
return cls.constants[seed_bits].mn_len
except:
raise ValueError(f'{seed_bits!r}: invalid seed length for BIP39 mnemonic')
except Exception as e:
raise ValueError(f'{seed_bits!r}: invalid seed length for BIP39 mnemonic') from e
def tobytes(self,*args,**kwargs):
raise NotImplementedError('Method not supported')
@ -74,24 +74,24 @@ class bip39(baseconv):
def frombytes(self,*args,**kwargs):
raise NotImplementedError('Method not supported')
def tohex(self,words,pad=None):
assert isinstance(words,(list,tuple)),'words must be list or tuple'
def tohex(self,words_arg,pad=None):
assert isinstance(words_arg,(list,tuple)),'words_arg must be list or tuple'
assert pad in (None,'seed'), f"{pad}: invalid 'pad' argument (must be None or 'seed')"
wl = self.digits
for n,w in enumerate(words):
for n,w in enumerate(words_arg):
if w not in wl:
die( 'MnemonicError', f'word #{n+1} is not in the BIP39 word list' )
res = ''.join(f'{wl.index(w):011b}' for w in words_arg)
for k,v in self.constants.items():
if len(words) == v.mn_len:
if len(words_arg) == v.mn_len:
bitlen = k
break
else:
die( 'MnemonicError', f'{len(words)}: invalid BIP39 seed phrase length' )
die( 'MnemonicError', f'{len(words_arg)}: invalid BIP39 seed phrase length' )
seed_bin = res[:bitlen]
chk_bin = res[bitlen:]
@ -108,13 +108,13 @@ class bip39(baseconv):
return seed_hex
def fromhex(self,seed_hex,pad=None,tostr=False):
assert is_hex_str(seed_hex),'seed data not a hexadecimal string'
def fromhex(self,hexstr,pad=None,tostr=False):
assert is_hex_str(hexstr),'seed data not a hexadecimal string'
assert tostr is False,"'tostr' must be False for 'bip39'"
assert pad in (None,'seed'), f"{pad}: invalid 'pad' argument (must be None or 'seed')"
wl = self.digits
seed_bytes = bytes.fromhex(seed_hex)
seed_bytes = bytes.fromhex(hexstr)
bitlen = len(seed_bytes) * 8
assert bitlen in self.constants, f'{bitlen}: invalid seed bit length'

View file

@ -368,31 +368,27 @@ class Config(Lockable):
"""
location of mmgen.cfg
"""
if hasattr(self,'_data_dir_root'):
return self._data_dir_root
else:
if not hasattr(self,'_data_dir_root'):
if self._data_dir_root_override:
self._data_dir_root = os.path.normpath(os.path.abspath(self._data_dir_root_override))
elif self.test_suite:
self._data_dir_root = self.test_datadir
else:
self._data_dir_root = os.path.join(gc.home_dir,'.'+gc.proj_name.lower())
return self._data_dir_root
return self._data_dir_root
@property
def data_dir(self):
"""
location of wallet and other data - same as data_dir_root for mainnet
"""
if hasattr(self,'_data_dir'):
return self._data_dir
else:
if not hasattr(self,'_data_dir'):
self._data_dir = os.path.normpath(os.path.join(*{
'regtest': (self.data_dir_root, 'regtest', self.coin.lower(), (self.regtest_user or 'none') ),
'testnet': (self.data_dir_root, 'testnet'),
'mainnet': (self.data_dir_root,),
}[self.network] ))
return self._data_dir
return self._data_dir
def __init__(
self,
@ -855,14 +851,13 @@ def fmt_opt(o):
return '--' + o.replace('_','-')
def opt_postproc_debug(cfg):
a = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) != None]
b = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) == None]
none_opts = [k for k in dir(cfg) if k[:2] != '__' and getattr(cfg,k) is None]
from .util import Msg
Msg('\n Configuration opts:')
for e in [d for d in dir(cfg) if d[:2] != '__']:
Msg(f' {e:<20}: {getattr(cfg,e)}')
Msg(" Configuration opts set to 'None':")
Msg(' {}\n'.format('\n '.join(b)))
Msg(' {}\n'.format('\n '.join(none_opts)))
Msg('\n=== end opts.py debug ===\n')
def conv_type(

View file

@ -90,7 +90,7 @@ class cfg_file:
return gen_lines()
@classmethod
def get_cls_by_id(self,id_str):
def get_cls_by_id(cls,id_str):
d = {
'usr': CfgFileUsr,
'sys': CfgFileSampleSys,

View file

@ -55,4 +55,4 @@ def encodepoint(P):
x = P[0]
y = P[1]
bits = [(y >> i) & 1 for i in range(b-1)] + [x & 1]
return bytes([sum([bits[i * 8 + j] << j for j in range(8)]) for i in range(b//8)])
return bytes([sum(bits[i * 8 + j] << j for j in range(8)) for i in range(b//8)])

View file

@ -268,7 +268,7 @@ class RPCDaemon(Daemon):
@property
def start_cmd(self):
return ([self.exec_fn] + self.daemon_args + self.usr_daemon_args)
return [self.exec_fn] + self.daemon_args + self.usr_daemon_args
class CoinDaemon(Daemon):
networks = ('mainnet','testnet','regtest')

View file

@ -256,7 +256,7 @@ class coin_msg:
from .wallet import Wallet
wallet_seeds = [Wallet(cfg=self.cfg,fn=fn).seed for fn in wallet_files]
need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
saved_seeds = list()
saved_seeds = []
# First try wallet seeds:
for sid in need_sids:
@ -363,7 +363,7 @@ class coin_msg:
self.data['signatures']
)}
def _get_obj(clsname,cfg,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs):
def _get_obj(clsname,cfg,*args,coin=None,network='mainnet',infile=None,data=None,**kwargs):
assert not args, 'msg:_get_obj(): only keyword args allowed'

View file

@ -32,7 +32,7 @@ def get_obj(objname,*args,**kwargs):
- If return_bool is True, return True instead of the object.
Only keyword args are accepted.
"""
assert args == (), 'get_obj_chk1'
assert not args, 'get_obj_chk1'
silent,return_bool = (False,False)
if 'silent' in kwargs:

View file

@ -208,7 +208,7 @@ class MMGenRegtest(MMGenObject):
async def send(self,addr,amt):
gmsg(f'Sending {amt} miner {self.d.coin} to address {addr}')
cp = await self.rpc_call('sendtoaddress',addr,str(amt),wallet='miner')
await self.rpc_call('sendtoaddress',addr,str(amt),wallet='miner')
await self.generate(1)
async def mempool(self):
@ -256,7 +256,7 @@ class MMGenRegtest(MMGenObject):
create_data_dir( self.cfg, self.d.datadir )
os.rmdir(self.d.datadir)
shutil.copytree(source_data_dir,self.d.datadir,symlinks=True)
shutil.copytree(source_rt.d.datadir,self.d.datadir,symlinks=True)
await self.start_daemon(reindex=True)
await self.rpc_call('stop')

View file

@ -36,6 +36,7 @@ class BitcoinTwTransaction:
self.parent = parent
self.proto = proto
self.rpc = rpc
self.idx = idx
self.unspent_info = unspent_info
self.tx = tx
@ -296,7 +297,8 @@ Filters/Actions: show [u]nconfirmed, [q]uit menu, r[e]draw:
def do_json_dump(*data):
nw = f'{self.proto.coin.lower()}-{self.proto.network}'
for d,fn_stem in data:
open(f'/tmp/{fn_stem}-{nw}.json','w').write(json.dumps(d,cls=json_encoder))
with open(f'/tmp/{fn_stem}-{nw}.json','w') as fh:
fh.write(json.dumps(d,cls=json_encoder))
_mmp = namedtuple('mmap_datum',['twmmid','comment'])

View file

@ -41,7 +41,7 @@ class New(Base,TxBase.New):
if self.cfg.contract_data:
m = "'--contract-data' option may not be used with token transaction"
assert not 'Token' in self.name, m
assert 'Token' not in self.name, m
with open(self.cfg.contract_data) as fp:
self.usr_contract_data = HexStr(fp.read().strip())
self.disable_fee_check = True
@ -81,7 +81,7 @@ class New(Base,TxBase.New):
async def process_cmd_args(self,cmd_args,ad_f,ad_w):
lc = len(cmd_args)
if lc == 0 and self.usr_contract_data and not 'Token' in self.name:
if lc == 0 and self.usr_contract_data and 'Token' not in self.name:
return
if lc != 1:
die(1,f'{lc} output{suf(lc)} specified, but Ethereum transactions must have exactly one')

View file

@ -75,7 +75,7 @@ class MoneroRPCClient(RPCClient):
self.daemon_version = None
def call(self,method,*params,**kwargs):
assert params == (), f'{self.name}.call() accepts keyword arguments only'
assert not params, f'{self.name}.call() accepts keyword arguments only'
return self.process_http_resp(self.backend.run_noasync(
payload = {'id': 0, 'jsonrpc': '2.0', 'method': method, 'params': kwargs },
timeout = 3600, # allow enough time to sync ≈1,000,000 blocks
@ -83,7 +83,7 @@ class MoneroRPCClient(RPCClient):
))
def call_raw(self,method,*params,**kwargs):
assert params == (), f'{self.name}.call() accepts keyword arguments only'
assert not params, f'{self.name}.call() accepts keyword arguments only'
return self.process_http_resp(self.backend.run_noasync(
payload = kwargs,
timeout = self.timeout,
@ -125,7 +125,7 @@ class MoneroWalletRPCClient(MoneroRPCClient):
'refresh', # start_height
)
def call_raw(*args,**kwargs):
def call_raw(self,*args,**kwargs):
raise NotImplementedError('call_raw() not implemented for class MoneroWalletRPCClient')
async def do_stop_daemon(self,silent=False):
@ -139,7 +139,7 @@ class MoneroWalletRPCClient(MoneroRPCClient):
from ...util import msg,msg_r,ymsg
from ...color import yellow
msg(f'{type(e).__name__}: {e}')
msg_r(yellow(f'Unable to shut down wallet daemon gracefully, so killing process instead...'))
msg_r(yellow('Unable to shut down wallet daemon gracefully, so killing process instead...'))
ret = self.daemon.stop(silent=True)
ymsg('done')
return ret

View file

@ -103,7 +103,7 @@ class Sha2:
def bytesToWords(self):
ws = self.wordSize
assert len(self.M) % ws == 0
self.M = tuple([unpack(self.word_fmt,self.M[i*ws:ws+(i*ws)])[0] for i in range(len(self.M) // ws)])
self.M = tuple(unpack(self.word_fmt,self.M[i*ws:ws+(i*ws)])[0] for i in range(len(self.M) // ws))
def digest(self):
return b''.join((pack(self.word_fmt,w) for w in self.H))

View file

@ -31,7 +31,7 @@ try:
import tty,termios
from select import select
_platform = 'linux'
except:
except ImportError:
try:
import msvcrt
_platform = 'mswin'

View file

@ -123,7 +123,7 @@ class TwJSON:
desc2 = 'saved checksum' )
if not await self.check_and_create_wallet():
return True
return
from ..fileutil import get_data_from_file
self.data = json.loads(get_data_from_file( self.cfg, filename, quiet=True ))

View file

@ -131,8 +131,8 @@ class TwAddressesPrune(TwAddresses):
dfl[desc] = auto[desc] and prune
skip_all_used = auto['used'] and not dfl['used']
if auto[desc]: # we’ve switched to auto mode, so go back and fix up all previous entries
for addrnum in addrnums[:n]:
e = parent.disp_data[addrnum-1]
for idx in addrnums[:n]:
e = parent.disp_data[idx-1]
if skip_all_used and e.recvd:
e.tag = False
elif desc == 'amt' and e.amt:

View file

@ -48,7 +48,7 @@ def _get_cls_info(clsname,modname,args,kwargs):
if not cls:
die(1,f'{ext!r}: unrecognized file extension for CompletedTX')
clsname = cls.__name__
modname = cls.__module__.split('.')[-1]
modname = cls.__module__.rsplit('.',maxsplit=1)[-1]
kwargs['proto'] = proto

View file

@ -67,7 +67,7 @@ class MMGenTxIOList(list,MMGenObject):
if data:
assert isinstance(data,list), 'MMGenTxIOList_check1'
else:
data = list()
data = []
list.__init__(self,data)
class Base(MMGenObject):

View file

@ -20,7 +20,7 @@ class Completed(Base):
"""
filename_api = True
def __init__(self,cfg,filename=None,data=None,quiet_open=False,*args,**kwargs):
def __init__(self,cfg,*args,filename=None,data=None,quiet_open=False,**kwargs):
assert (filename or data) and not (filename and data), 'CompletedTX_chk1'

View file

@ -325,8 +325,8 @@ class New(Base):
die(1,f'{addr!r}: not an MMGen ID or {self.coin} address')
found = False
for idx in range(len(unspent)):
if getattr(unspent[idx],attr) == addr:
for idx,us in enumerate(unspent):
if getattr(us,attr) == addr:
yield idx2num(idx)
found = True

View file

@ -102,10 +102,10 @@ def keypress_confirm(
reply = get_char(prompt,immed_chars='yYnN').strip('\n\r')
if not reply:
msg_r(nl)
return True if default_yes else False
return default_yes
elif reply in 'yYnN':
msg_r(nl)
return True if reply in 'yY' else False
return reply in 'yY'
else:
msg_r('\nInvalid reply\n' if verbose else '\r')

View file

@ -82,11 +82,17 @@ def int2bytespec(n,spec,fmt,print_sym=True,strip=False,add_space=False):
else:
die(1,f'{spec!r}: unrecognized bytespec')
ret = f'{n/spec2int(spec):{fmt}f}'
if strip:
ret = '{:{}f}'.format(n/spec2int(spec),fmt).rstrip('0')
return ret + ('0' if ret.endswith('.') else '') + ((' ' if add_space else '') + spec if print_sym else '')
ret = ret.rstrip('0')
return (
ret
+ ('0' if ret.endswith('.') else '')
+ ((' ' if add_space else '') + spec if print_sym else '') )
else:
return '{:{}f}'.format(n/spec2int(spec),fmt) + ((' ' if add_space else '') + spec if print_sym else '')
return (
ret
+ ((' ' if add_space else '') + spec if print_sym else '') )
def parse_bytespec(nbytes):
m = re.match(r'([0123456789.]+)(.*)',nbytes)

View file

@ -21,7 +21,7 @@ from . import wallet_data,get_wallet_cls
class WalletMeta(type):
def __init__(cls,name,bases,namespace):
t = cls.__module__.split('.')[-1]
t = cls.__module__.rsplit('.',maxsplit=1)[-1]
if t in wallet_data:
for k,v in wallet_data[t]._asdict().items():
setattr(cls,k,v)

View file

@ -81,7 +81,7 @@ class wallet(wallet):
self.cfg._util.qmsg(f'Data read from file {self.infile.name!r} at offset {d.hincog_offset}')
# overrides method in Wallet
def write_to_file(self):
def write_to_file(self,**kwargs):
d = self.ssdata
self._format()
self.cfg._util.compare_or_die(

View file

@ -46,10 +46,13 @@ class xmrseed(baseconv):
wstr = ''.join(word[:3] for word in words)
return words[crc32(wstr.encode()) % len(words)]
def tobytes(self,words,pad=None):
assert isinstance(words,(list,tuple)),'words must be list or tuple'
def tobytes(self,words_arg,pad=None):
assert isinstance(words_arg,(list,tuple)),'words must be list or tuple'
assert pad is None, f"{pad}: invalid 'pad' argument (must be None)"
words = words_arg
desc = self.desc.short
wl = self.digits
base = len(wl)

View file

@ -1010,7 +1010,7 @@ class MoneroWalletOps:
msg('\n Addresses of account #{} ({}):'.format(
account,
accts_data['subaddress_accounts'][account]['label']))
fs = ' {:6} {:18} {:%s} {}' % max( [len(e['label']) for e in d], default=0 )
fs = ' {:6} {:18} {:%s} {}' % max((len(e['label']) for e in d), default=0)
msg(fs.format('Index ','Address','Label','Used'))
for e in d:
msg(fs.format(
@ -1881,8 +1881,6 @@ class MoneroWalletOps:
super().__init__(cfg,uarg_tuple)
check_uopts = MoneroWalletOps.submit.check_uopts
self.tx = MoneroMMGenTX.Signed( self.cfg, Path(uarg.infile) )
if self.cfg.tx_relay_daemon: