keyword-only parameters throughout
This commit is contained in:
parent
0563916c96
commit
89ad0fd29b
101 changed files with 314 additions and 269 deletions
|
|
@ -182,14 +182,14 @@ class CoinAddr(HiliteStr, InitErrors, MMGenObject):
|
|||
|
||||
# reimplement some HiliteStr methods:
|
||||
@classmethod
|
||||
def fmtc(cls, s, width, color=False):
|
||||
def fmtc(cls, s, width, *, color=False):
|
||||
return super().fmtc(s=s[:width-2]+'..' if len(s) > width else s, width=width, color=color)
|
||||
|
||||
def fmt(self, view_pref, width, color=False):
|
||||
def fmt(self, view_pref, width, *, color=False):
|
||||
s = self.views[view_pref]
|
||||
return super().fmtc(f'{s[:width-2]}..' if len(s) > width else s, width=width, color=color)
|
||||
|
||||
def hl(self, view_pref, color=True):
|
||||
def hl(self, view_pref, *, color=True):
|
||||
return getattr(color_mod, self.color)(self.views[view_pref]) if color else self.views[view_pref]
|
||||
|
||||
def is_coin_addr(proto, s):
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ class AddrFile(MMGenObject):
|
|||
def write(
|
||||
self,
|
||||
fn = None,
|
||||
*,
|
||||
binary = False,
|
||||
desc = None,
|
||||
ask_overwrite = True,
|
||||
|
|
@ -92,7 +93,7 @@ class AddrFile(MMGenObject):
|
|||
)
|
||||
return self.parent.al_id.sid + (' ' if lbl_p2 else '') + lbl_p2
|
||||
|
||||
def format(self, add_comments=False):
|
||||
def format(self, *, add_comments=False):
|
||||
p = self.parent
|
||||
if p.gen_passwds and p.pw_fmt in ('bip39', 'xmrseed'):
|
||||
desc_pfx = f'{p.pw_fmt.upper()} '
|
||||
|
|
@ -200,7 +201,7 @@ class AddrFile(MMGenObject):
|
|||
|
||||
return ret
|
||||
|
||||
def parse_file(self, fn, buf=[], exit_on_error=True):
|
||||
def parse_file(self, fn, *, buf=[], exit_on_error=True):
|
||||
|
||||
def parse_addrfile_label(lbl):
|
||||
"""
|
||||
|
|
@ -249,7 +250,7 @@ class AddrFile(MMGenObject):
|
|||
p = self.parent
|
||||
|
||||
from .fileutil import get_lines_from_file
|
||||
lines = get_lines_from_file(p.cfg, fn, p.desc+' data', trim_comments=True)
|
||||
lines = get_lines_from_file(p.cfg, fn, desc=f'{p.desc} data', trim_comments=True)
|
||||
|
||||
try:
|
||||
assert len(lines) >= 3, f'Too few lines in address file ({len(lines)})'
|
||||
|
|
|
|||
|
|
@ -159,6 +159,7 @@ class AddrList(MMGenObject): # Address info for a single seed ID
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
infile = '',
|
||||
al_id = '',
|
||||
adata = [],
|
||||
|
|
|
|||
|
|
@ -267,7 +267,7 @@ class CoinInfo:
|
|||
return None
|
||||
return cls.coin_constants[network][idx]
|
||||
|
||||
def make_proto(e, testnet=False):
|
||||
def make_proto(e, *, testnet=False):
|
||||
|
||||
proto = ('X_' if e.name[0] in '0123456789' else '') + e.name + ('Testnet' if testnet else '')
|
||||
|
||||
|
|
@ -297,7 +297,7 @@ def make_proto(e, testnet=False):
|
|||
)
|
||||
)
|
||||
|
||||
def init_genonly_altcoins(usr_coin=None, testnet=False):
|
||||
def init_genonly_altcoins(usr_coin=None, *, testnet=False):
|
||||
"""
|
||||
Initialize altcoin protocol class or classes for current network.
|
||||
If usr_coin is a core coin, initialization is skipped.
|
||||
|
|
|
|||
10
mmgen/amt.py
10
mmgen/amt.py
|
|
@ -40,7 +40,7 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
|
|||
max_amt = None # coin supply if known, otherwise None
|
||||
units = () # defined unit names, e.g. ('satoshi',...)
|
||||
|
||||
def __new__(cls, num, from_unit=None, from_decimal=False):
|
||||
def __new__(cls, num, *, from_unit=None, from_decimal=False):
|
||||
|
||||
if isinstance(num, CoinAmt):
|
||||
raise TypeError(f'CoinAmt: {num} is instance of {cls.__name__}')
|
||||
|
|
@ -71,7 +71,7 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
|
|||
def fmtc(cls, *args, **kwargs):
|
||||
cls.method_not_implemented()
|
||||
|
||||
def fmt(self, color=False, iwidth=1, prec=None): # iwidth: width of the integer part
|
||||
def fmt(self, *, color=False, iwidth=1, prec=None): # iwidth: width of the integer part
|
||||
prec = prec or self.max_prec
|
||||
if '.' in (s := str(self)):
|
||||
a, b = s.split('.', 1)
|
||||
|
|
@ -83,11 +83,11 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
|
|||
s.rjust(iwidth).ljust(iwidth+prec+1),
|
||||
color = color)
|
||||
|
||||
def hl(self, color=True):
|
||||
def hl(self, *, color=True):
|
||||
return self.colorize(str(self), color=color)
|
||||
|
||||
# fancy highlighting with coin unit, enclosure, formatting
|
||||
def hl2(self, color=True, unit=False, fs='{}', encl=''):
|
||||
def hl2(self, *, color=True, unit=False, fs='{}', encl=''):
|
||||
res = fs.format(self)
|
||||
return (
|
||||
encl[:-1]
|
||||
|
|
@ -156,7 +156,7 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
|
|||
def __mod__(self, *args, **kwargs):
|
||||
self.method_not_implemented()
|
||||
|
||||
def is_coin_amt(proto, num, from_unit=None, from_decimal=False):
|
||||
def is_coin_amt(proto, num, *, from_unit=None, from_decimal=False):
|
||||
assert proto.coin_amt, 'proto.coin_amt is None! Did you call init_proto() with ‘need_amt’?'
|
||||
return get_obj(proto.coin_amt, num=num, from_unit=from_unit, from_decimal=from_decimal, silent=True, return_bool=True)
|
||||
|
||||
|
|
|
|||
|
|
@ -33,12 +33,12 @@ def SwapMgr(*args, **kwargs):
|
|||
|
||||
class SwapMgrBase:
|
||||
|
||||
def __init__(self, cfg, ignore_zram=False):
|
||||
def __init__(self, cfg, *, ignore_zram=False):
|
||||
self.cfg = cfg
|
||||
self.ignore_zram = ignore_zram
|
||||
self.desc = 'disk swap' if ignore_zram else 'swap'
|
||||
|
||||
def enable(self, quiet=False):
|
||||
def enable(self, *, quiet=False):
|
||||
ret = self.do_enable()
|
||||
if not quiet:
|
||||
self.cfg._util.qmsg(
|
||||
|
|
@ -47,7 +47,7 @@ class SwapMgrBase:
|
|||
f'Could not enable {self.desc}')
|
||||
return ret
|
||||
|
||||
def disable(self, quiet=False):
|
||||
def disable(self, *, quiet=False):
|
||||
self.cfg._util.qmsg_r(f'Attempting to disable {self.desc}...')
|
||||
ret = self.do_disable()
|
||||
self.cfg._util.qmsg('success')
|
||||
|
|
@ -189,7 +189,7 @@ class Signable:
|
|||
b = ' {}\n'.format('\n '.join(self.gen_bad_list(sorted(bad_files, key=lambda f: f.name))))
|
||||
))
|
||||
|
||||
def die_wrong_num_txs(self, tx_type, msg=None, desc=None, show_dir=False):
|
||||
def die_wrong_num_txs(self, tx_type, *, msg=None, desc=None, show_dir=False):
|
||||
num_txs = len(getattr(self, tx_type))
|
||||
die('AutosignTXError', "{m}{a} {b} transaction{c} {d} {e}!".format(
|
||||
m = msg + '\n' if msg else '',
|
||||
|
|
@ -581,7 +581,7 @@ class Autosign:
|
|||
|
||||
return self._wallet_files
|
||||
|
||||
def do_mount(self, silent=False, verbose=False):
|
||||
def do_mount(self, *, silent=False, verbose=False):
|
||||
|
||||
def check_or_create(dirname):
|
||||
path = getattr(self, dirname)
|
||||
|
|
@ -615,7 +615,7 @@ class Autosign:
|
|||
for dirname in self.dirs:
|
||||
check_or_create(dirname)
|
||||
|
||||
def do_umount(self, silent=False, verbose=False):
|
||||
def do_umount(self, *, silent=False, verbose=False):
|
||||
if self.mountpoint.is_mount():
|
||||
run(['sync'], check=True)
|
||||
if not silent:
|
||||
|
|
@ -630,7 +630,7 @@ class Autosign:
|
|||
fails = 0
|
||||
for wf in self.wallet_files:
|
||||
try:
|
||||
Wallet(self.cfg, wf, ignore_in_fmt=True, passwd_file=str(self.keyfile))
|
||||
Wallet(self.cfg, fn=wf, ignore_in_fmt=True, passwd_file=str(self.keyfile))
|
||||
except SystemExit as e:
|
||||
if e.code != 0:
|
||||
fails += 1
|
||||
|
|
@ -710,7 +710,7 @@ class Autosign:
|
|||
die(2, 'Unable to write ' + desc)
|
||||
msg('Wrote ' + desc)
|
||||
|
||||
def gen_key(self, no_unmount=False):
|
||||
def gen_key(self, *, no_unmount=False):
|
||||
if not self.device_inserted:
|
||||
die(1, 'Removable device not present!')
|
||||
self.do_mount()
|
||||
|
|
@ -776,7 +776,7 @@ class Autosign:
|
|||
cfg = self.cfg,
|
||||
prompt = f"Default wallet '{wf}' found.\nUse default wallet for autosigning?",
|
||||
default_yes = True):
|
||||
ss_in = Wallet(Config(), wf)
|
||||
ss_in = Wallet(Config(), fn=wf)
|
||||
else:
|
||||
ss_in = Wallet(self.cfg, in_fmt=self.mn_fmts[self.cfg.mnemonic_fmt or self.dfl_mn_fmt])
|
||||
ss_out = Wallet(self.cfg, ss=ss_in, passwd_file=str(self.keyfile))
|
||||
|
|
|
|||
|
|
@ -129,11 +129,11 @@ class baseconv:
|
|||
else:
|
||||
die('BaseConversionPadError', f"{pad!r}: illegal value for 'pad' (must be None, 'seed' or int)")
|
||||
|
||||
def tohex(self, words_arg, pad=None):
|
||||
def tohex(self, words_arg, *, pad=None):
|
||||
"convert string or list data of instance base to a hexadecimal string"
|
||||
return self.tobytes(words_arg, pad//2 if type(pad) is int else pad).hex()
|
||||
return self.tobytes(words_arg, pad=pad//2 if type(pad) is int else pad).hex()
|
||||
|
||||
def tobytes(self, words_arg, pad=None):
|
||||
def tobytes(self, words_arg, *, pad=None):
|
||||
"convert string or list data of instance base to byte string"
|
||||
|
||||
words = words_arg if isinstance(words_arg, (list, tuple)) else tuple(words_arg.strip())
|
||||
|
|
@ -163,7 +163,7 @@ class baseconv:
|
|||
bl = ret.bit_length()
|
||||
return ret.to_bytes(max(pad_val, bl//8+bool(bl%8)), 'big')
|
||||
|
||||
def fromhex(self, hexstr, pad=None, tostr=False):
|
||||
def fromhex(self, hexstr, *, pad=None, tostr=False):
|
||||
"convert a hexadecimal string to a list or string data of instance base"
|
||||
|
||||
from .util import is_hex_str
|
||||
|
|
@ -172,9 +172,9 @@ class baseconv:
|
|||
('seed data' if pad == 'seed' else f'{hexstr!r}:') +
|
||||
' not a hexadecimal string')
|
||||
|
||||
return self.frombytes(bytes.fromhex(hexstr), pad, tostr)
|
||||
return self.frombytes(bytes.fromhex(hexstr), pad=pad, tostr=tostr)
|
||||
|
||||
def frombytes(self, bytestr, pad=None, tostr=False):
|
||||
def frombytes(self, bytestr, *, pad=None, tostr=False):
|
||||
"convert byte string to list or string data of instance base"
|
||||
|
||||
if not bytestr:
|
||||
|
|
|
|||
|
|
@ -54,14 +54,14 @@ class bip39(baseconv):
|
|||
self.wl_id = 'bip39'
|
||||
|
||||
@classmethod
|
||||
def nwords2seedlen(cls, nwords, in_bytes=False, in_hex=False):
|
||||
def nwords2seedlen(cls, nwords, *, in_bytes=False, in_hex=False):
|
||||
for k, v in cls.constants.items():
|
||||
if v.mn_len == nwords:
|
||||
return k//8 if in_bytes else k//4 if in_hex else k
|
||||
die('MnemonicError', f'{nwords!r}: invalid word length for BIP39 mnemonic')
|
||||
|
||||
@classmethod
|
||||
def seedlen2nwords(cls, seed_len, in_bytes=False, in_hex=False):
|
||||
def seedlen2nwords(cls, seed_len, *, in_bytes=False, in_hex=False):
|
||||
seed_bits = seed_len * 8 if in_bytes else seed_len * 4 if in_hex else seed_len
|
||||
try:
|
||||
return cls.constants[seed_bits].mn_len
|
||||
|
|
@ -105,11 +105,11 @@ class bip39(baseconv):
|
|||
|
||||
return seed_bytes
|
||||
|
||||
def fromhex(self, hexstr, pad=None, tostr=False):
|
||||
def fromhex(self, hexstr, *, pad=None, tostr=False):
|
||||
assert is_hex_str(hexstr), 'seed data not a hexadecimal string'
|
||||
return self.frombytes(bytes.fromhex(hexstr), pad=pad, tostr=tostr)
|
||||
|
||||
def frombytes(self, seed_bytes, pad=None, tostr=False):
|
||||
def frombytes(self, seed_bytes, *, pad=None, tostr=False):
|
||||
assert tostr is False, "'tostr' must be False for 'bip39'"
|
||||
assert pad in (None, 'seed'), f"{pad}: invalid 'pad' argument (must be None or 'seed')"
|
||||
|
||||
|
|
|
|||
|
|
@ -185,6 +185,7 @@ class MasterNode(Lockable):
|
|||
def init_cfg(
|
||||
self,
|
||||
coin = None,
|
||||
*,
|
||||
network = None,
|
||||
addr_type = None,
|
||||
from_path = False,
|
||||
|
|
@ -204,11 +205,11 @@ class MasterNode(Lockable):
|
|||
new._lock()
|
||||
return new
|
||||
|
||||
def to_coin_type(self, coin=None, network=None, addr_type=None):
|
||||
return self.init_cfg(coin, network, addr_type).to_coin_type()
|
||||
def to_coin_type(self, *, coin=None, network=None, addr_type=None):
|
||||
return self.init_cfg(coin, network=network, addr_type=addr_type).to_coin_type()
|
||||
|
||||
def to_chain(self, idx, coin=None, network=None, addr_type=None, hardened=False, public=False):
|
||||
return self.init_cfg(coin, network, addr_type).to_chain(
|
||||
def to_chain(self, idx, *, coin=None, network=None, addr_type=None, hardened=False, public=False):
|
||||
return self.init_cfg(coin, network=network, addr_type=addr_type).to_chain(
|
||||
idx = idx,
|
||||
hardened = hardened,
|
||||
public = public)
|
||||
|
|
@ -294,7 +295,7 @@ class BipHDNode(Lockable):
|
|||
def xprv(self):
|
||||
return self.key_extended(public=False, as_str=True)
|
||||
|
||||
def key_extended(self, public, as_str=False):
|
||||
def key_extended(self, public, *, as_str=False):
|
||||
if self.public and not public:
|
||||
raise ValueError('cannot create extended private key for public node!')
|
||||
ret = b58chk_encode(
|
||||
|
|
@ -363,6 +364,7 @@ class BipHDNode(Lockable):
|
|||
base_cfg,
|
||||
seed,
|
||||
path_str,
|
||||
*,
|
||||
coin = None,
|
||||
addr_type = None,
|
||||
no_path_checks = False):
|
||||
|
|
@ -441,7 +443,7 @@ class BipHDNodeMaster(BipHDNode):
|
|||
# purpose coin_type
|
||||
return self.derive_private().derive_private()
|
||||
|
||||
def to_chain(self, idx, hardened=False, public=False):
|
||||
def to_chain(self, idx, *, hardened=False, public=False):
|
||||
# purpose coin_type account #0 chain
|
||||
return self.derive_private().derive_private().derive_private(idx=0).derive(
|
||||
idx = idx,
|
||||
|
|
@ -475,7 +477,7 @@ class BipHDNodeCoinType(BipHDNode):
|
|||
f'chain index {chain_idx} for coin {cfg.base_cfg.coin!r}')
|
||||
return (chain_idx, type(self).hardened)
|
||||
|
||||
def to_chain(self, idx, hardened=False, public=False):
|
||||
def to_chain(self, idx, *, hardened=False, public=False):
|
||||
# account #0 chain
|
||||
return self.derive_private(idx=0).derive(
|
||||
idx = idx,
|
||||
|
|
|
|||
|
|
@ -438,6 +438,7 @@ class Config(Lockable):
|
|||
def __init__(
|
||||
self,
|
||||
cfg = None,
|
||||
*,
|
||||
opts_data = None,
|
||||
init_opts = None,
|
||||
parse_only = False,
|
||||
|
|
@ -845,7 +846,7 @@ def check_opts(cfg): # Raises exception if any check fails
|
|||
if hasattr(cfg, key2):
|
||||
val2 = getattr(cfg, key2)
|
||||
from .wallet import get_wallet_data
|
||||
wd = get_wallet_data('incog_hidden')
|
||||
wd = get_wallet_data(wtype='incog_hidden')
|
||||
if val2 and val2 not in wd.fmt_codes:
|
||||
die('UserOptError', f'Option conflict:\n {fmt_opt(name)}, with\n {fmt_opt(key2)}={val2}')
|
||||
|
||||
|
|
@ -929,7 +930,7 @@ def opt_postproc_debug(cfg):
|
|||
Msg(' {}\n'.format('\n '.join(none_opts)))
|
||||
Msg('\n=== end opts.py debug ===\n')
|
||||
|
||||
def conv_type(name, val, refval, src, invert_bool=False):
|
||||
def conv_type(name, val, refval, *, src, invert_bool=False):
|
||||
|
||||
def do_fail():
|
||||
desc = {
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ class Crypto:
|
|||
return self.sha256_rounds(step1)
|
||||
|
||||
def encrypt_seed(self, data, key, desc='seed'):
|
||||
return self.encrypt_data(data, key, desc=desc)
|
||||
return self.encrypt_data(data, key=key, desc=desc)
|
||||
|
||||
def decrypt_seed(self, enc_seed, key, seed_id, key_id):
|
||||
self.util.vmsg_r('Checking key...')
|
||||
|
|
@ -121,6 +121,7 @@ class Crypto:
|
|||
def encrypt_data(
|
||||
self,
|
||||
data,
|
||||
*,
|
||||
key,
|
||||
iv = aesctr_dfl_iv,
|
||||
desc = 'data',
|
||||
|
|
@ -214,6 +215,7 @@ class Crypto:
|
|||
passwd,
|
||||
salt,
|
||||
hash_preset,
|
||||
*,
|
||||
desc = 'encryption key',
|
||||
from_what = 'passphrase',
|
||||
verbose = False):
|
||||
|
|
@ -407,7 +409,7 @@ class Crypto:
|
|||
passwd_file = self.cfg.passwd_file)
|
||||
key = self.make_key(passwd, salt, hp)
|
||||
from hashlib import sha256
|
||||
enc_d = self.encrypt_data(sha256(nonce+data).digest() + nonce + data, key, iv, desc=desc)
|
||||
enc_d = self.encrypt_data(sha256(nonce+data).digest() + nonce + data, key=key, iv=iv, desc=desc)
|
||||
return salt+iv+enc_d
|
||||
|
||||
def mmgen_decrypt(self, data, desc='data', hash_preset=None):
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ class Daemon(Lockable):
|
|||
print(cp)
|
||||
return cp
|
||||
|
||||
def run_cmd(self, cmd, silent=False, is_daemon=False, check_retcode=False):
|
||||
def run_cmd(self, cmd, *, silent=False, is_daemon=False, check_retcode=False):
|
||||
|
||||
if self.debug:
|
||||
msg('\n\n')
|
||||
|
|
@ -181,7 +181,7 @@ class Daemon(Lockable):
|
|||
def pre_start(self):
|
||||
pass
|
||||
|
||||
def start(self, quiet=False, silent=False):
|
||||
def start(self, *, quiet=False, silent=False):
|
||||
if self.state == 'ready':
|
||||
if not (quiet or silent):
|
||||
msg(self.state_msg(extra_text='already'))
|
||||
|
|
@ -200,7 +200,7 @@ class Daemon(Lockable):
|
|||
|
||||
return ret
|
||||
|
||||
def stop(self, quiet=False, silent=False):
|
||||
def stop(self, *, quiet=False, silent=False):
|
||||
if self.state == 'ready':
|
||||
if not silent:
|
||||
msg(f'Stopping {self.desc} on port {self.bind_port}')
|
||||
|
|
@ -221,7 +221,7 @@ class Daemon(Lockable):
|
|||
msg(f'{self.desc} on port {self.bind_port} not running')
|
||||
return True
|
||||
|
||||
def restart(self, silent=False):
|
||||
def restart(self, *, silent=False):
|
||||
self.stop(silent=silent)
|
||||
return self.start(silent=silent)
|
||||
|
||||
|
|
@ -339,6 +339,7 @@ class CoinDaemon(Daemon):
|
|||
|
||||
def __new__(cls,
|
||||
cfg,
|
||||
*,
|
||||
network_id = None,
|
||||
proto = None,
|
||||
opts = None,
|
||||
|
|
@ -384,6 +385,7 @@ class CoinDaemon(Daemon):
|
|||
|
||||
def __init__(self,
|
||||
cfg,
|
||||
*,
|
||||
network_id = None,
|
||||
proto = None,
|
||||
opts = None,
|
||||
|
|
|
|||
|
|
@ -131,7 +131,7 @@ class MMGenObjectMethods: # mixin class for MMGenObject
|
|||
def isScalar(obj):
|
||||
return isinstance(obj, scalars)
|
||||
|
||||
def do_list(out, e, lvl=0, is_dict=False):
|
||||
def do_list(out, e, *, lvl=0, is_dict=False):
|
||||
out.append('\n')
|
||||
for i in e:
|
||||
el = i if not is_dict else e[i]
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ from .util import die, get_extension
|
|||
|
||||
class File:
|
||||
|
||||
def __init__(self, fn, write=False):
|
||||
def __init__(self, fn, *, write=False):
|
||||
|
||||
self.name = fn
|
||||
self.dirname = os.path.dirname(fn)
|
||||
|
|
@ -66,21 +66,21 @@ class File:
|
|||
|
||||
class FileList(list):
|
||||
|
||||
def __init__(self, fns, write=False):
|
||||
def __init__(self, fns, *, write=False):
|
||||
list.__init__(
|
||||
self,
|
||||
[File(fn, write) for fn in fns])
|
||||
[File(fn, write=write) for fn in fns])
|
||||
|
||||
def names(self):
|
||||
return [f.name for f in self]
|
||||
|
||||
def sort_by_age(self, key='mtime', reverse=False):
|
||||
def sort_by_age(self, *, key='mtime', reverse=False):
|
||||
assert key in ('atime', 'ctime', 'mtime'), f'{key!r}: invalid sort key'
|
||||
self.sort(key=lambda a: getattr(a, key), reverse=reverse)
|
||||
|
||||
class MMGenFile(File):
|
||||
|
||||
def __init__(self, fn, base_class=None, subclass=None, proto=None, write=False):
|
||||
def __init__(self, fn, *, base_class=None, subclass=None, proto=None, write=False):
|
||||
"""
|
||||
'base_class' - a base class with an 'ext_to_cls' method
|
||||
'subclass' - a subclass with an 'ext' attribute
|
||||
|
|
@ -91,7 +91,7 @@ class MMGenFile(File):
|
|||
attribute to True.
|
||||
"""
|
||||
|
||||
super().__init__(fn, write)
|
||||
super().__init__(fn, write=write)
|
||||
|
||||
assert (subclass or base_class) and not (subclass and base_class), 'MMGenFile chk1'
|
||||
|
||||
|
|
@ -107,12 +107,12 @@ class MMGenFile(File):
|
|||
|
||||
class MMGenFileList(FileList):
|
||||
|
||||
def __init__(self, fns, base_class, proto=None, write=False):
|
||||
def __init__(self, fns, base_class, *, proto=None, write=False):
|
||||
list.__init__(
|
||||
self,
|
||||
[MMGenFile(fn, base_class=base_class, proto=proto, write=write) for fn in fns])
|
||||
|
||||
def find_files_in_dir(subclass, fdir, no_dups=False):
|
||||
def find_files_in_dir(subclass, fdir, *, no_dups=False):
|
||||
|
||||
assert isinstance(subclass, type), f'{subclass}: not a class'
|
||||
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ def check_binary(args):
|
|||
die(2, f'{args[0]!r} binary missing, not in path, or not executable')
|
||||
set_vt100()
|
||||
|
||||
def shred_file(fn, verbose=False):
|
||||
def shred_file(fn, *, verbose=False):
|
||||
check_binary(['shred', '--version'])
|
||||
from subprocess import run
|
||||
run(
|
||||
|
|
@ -67,7 +67,7 @@ def shred_file(fn, verbose=False):
|
|||
check=True)
|
||||
set_vt100()
|
||||
|
||||
def _check_file_type_and_access(fname, ftype, blkdev_ok=False):
|
||||
def _check_file_type_and_access(fname, ftype, *, blkdev_ok=False):
|
||||
|
||||
import stat
|
||||
|
||||
|
|
@ -103,16 +103,16 @@ def _check_file_type_and_access(fname, ftype, blkdev_ok=False):
|
|||
|
||||
return True
|
||||
|
||||
def check_infile(f, blkdev_ok=False):
|
||||
def check_infile(f, *, blkdev_ok=False):
|
||||
return _check_file_type_and_access(f, 'input file', blkdev_ok=blkdev_ok)
|
||||
|
||||
def check_outfile(f, blkdev_ok=False):
|
||||
def check_outfile(f, *, blkdev_ok=False):
|
||||
return _check_file_type_and_access(f, 'output file', blkdev_ok=blkdev_ok)
|
||||
|
||||
def check_outdir(f):
|
||||
return _check_file_type_and_access(f, 'output directory')
|
||||
|
||||
def get_seed_file(cfg, nargs, wallets=None, invoked_as=None):
|
||||
def get_seed_file(cfg, *, nargs, wallets=None, invoked_as=None):
|
||||
|
||||
wallets = wallets or cfg._args
|
||||
|
||||
|
|
@ -137,7 +137,7 @@ def get_seed_file(cfg, nargs, wallets=None, invoked_as=None):
|
|||
|
||||
return str(wallets[0]) if wallets else (wf, None)[wd_from_opt] # could be a Path instance
|
||||
|
||||
def _open_or_die(filename, mode, silent=False):
|
||||
def _open_or_die(filename, mode, *, silent=False):
|
||||
try:
|
||||
return open(filename, mode)
|
||||
except:
|
||||
|
|
@ -152,6 +152,7 @@ def write_data_to_file(
|
|||
cfg,
|
||||
outfile,
|
||||
data,
|
||||
*,
|
||||
desc = 'data',
|
||||
ask_write = False,
|
||||
ask_write_prompt = '',
|
||||
|
|
@ -279,7 +280,7 @@ def write_data_to_file(
|
|||
else:
|
||||
do_file(outfile, ask_write_prompt)
|
||||
|
||||
def get_words_from_file(cfg, infile, desc, quiet=False):
|
||||
def get_words_from_file(cfg, infile, *, desc, quiet=False):
|
||||
|
||||
if not quiet:
|
||||
cfg._util.qmsg(f'Getting {desc} from file ‘{infile}’')
|
||||
|
|
@ -299,6 +300,7 @@ def get_words_from_file(cfg, infile, desc, quiet=False):
|
|||
def get_data_from_file(
|
||||
cfg,
|
||||
infile,
|
||||
*,
|
||||
desc = 'data',
|
||||
dash = False,
|
||||
silent = False,
|
||||
|
|
@ -326,6 +328,7 @@ def get_data_from_file(
|
|||
def get_lines_from_file(
|
||||
cfg,
|
||||
fn,
|
||||
*,
|
||||
desc = 'data',
|
||||
trim_comments = False,
|
||||
quiet = False,
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class help_notes:
|
|||
def account_info_desc(self):
|
||||
return 'unspent outputs' if self.proto.base_proto == 'Bitcoin' else 'account info'
|
||||
|
||||
def fee_spec_letters(self, use_quotes=False):
|
||||
def fee_spec_letters(self, *, use_quotes=False):
|
||||
cu = self.proto.coin_amt.units
|
||||
sep, conj = ((',', ' or '), ("','", "' or '"))[use_quotes]
|
||||
return sep.join(u[0] for u in cu[:-1]) + ('', conj)[len(cu)>1] + cu[-1][0]
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ from ..daemon import CoinDaemon
|
|||
def help(proto, cfg):
|
||||
|
||||
def coind_exec():
|
||||
return CoinDaemon(cfg, proto.coin).exec_fn if proto.coin in CoinDaemon.coins else 'bitcoind'
|
||||
return CoinDaemon(cfg, network_id=proto.coin).exec_fn if proto.coin in CoinDaemon.coins else 'bitcoind'
|
||||
|
||||
return """
|
||||
Transactions may contain both {pnm} or non-{pnm} input addresses.
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ class keygen_base:
|
|||
return None
|
||||
|
||||
@classmethod
|
||||
def get_clsname(cls, cfg, silent=False):
|
||||
def get_clsname(cls, cfg, *, silent=False):
|
||||
return cls.__name__
|
||||
|
||||
backend_data = {
|
||||
|
|
@ -104,7 +104,7 @@ def check_backend(cfg, proto, backend, addr_type):
|
|||
|
||||
return _check_backend(cfg, backend, pubkey_type, desc='--keygen-backend parameter')
|
||||
|
||||
def KeyGenerator(cfg, proto, pubkey_type, backend=None, silent=False):
|
||||
def KeyGenerator(cfg, proto, pubkey_type, *, backend=None, silent=False):
|
||||
"""
|
||||
factory function returning a key generator backend for the specified pubkey type
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ class LEDControl:
|
|||
trigger = '/tmp/led_trigger'),
|
||||
}
|
||||
|
||||
def __init__(self, enabled, simulate=False, debug=False):
|
||||
def __init__(self, *, enabled, simulate=False, debug=False):
|
||||
|
||||
self.enabled = enabled
|
||||
self.debug = debug or simulate
|
||||
|
|
|
|||
|
|
@ -141,12 +141,12 @@ if cfg.keygen_backend:
|
|||
idxs = addrlist.AddrIdxList(fmt_str=cfg._args.pop())
|
||||
|
||||
from .fileutil import get_seed_file
|
||||
sf = get_seed_file(cfg, 1)
|
||||
sf = get_seed_file(cfg, nargs=1)
|
||||
|
||||
from .ui import do_license_msg
|
||||
do_license_msg(cfg)
|
||||
|
||||
ss = Wallet(cfg, sf)
|
||||
ss = Wallet(cfg, fn=sf)
|
||||
|
||||
ss_seed = ss.seed if cfg.subwallet is None else ss.seed.subseed(cfg.subwallet, print_msg=True)
|
||||
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ addrimport_msgs = {
|
|||
def parse_cmd_args(rpc, cmd_args):
|
||||
|
||||
def import_mmgen_list(infile):
|
||||
return (AddrList, KeyAddrList)[bool(cfg.keyaddr_file)](cfg, proto, infile)
|
||||
return (AddrList, KeyAddrList)[bool(cfg.keyaddr_file)](cfg, proto, infile=infile)
|
||||
|
||||
if len(cmd_args) == 1:
|
||||
infile = cmd_args[0]
|
||||
|
|
@ -97,7 +97,7 @@ def parse_cmd_args(rpc, cmd_args):
|
|||
addrlist = get_lines_from_file(
|
||||
cfg,
|
||||
infile,
|
||||
f'non-{gc.proj_name} addresses',
|
||||
desc = f'non-{gc.proj_name} addresses',
|
||||
trim_comments = True))
|
||||
else:
|
||||
al = import_mmgen_list(infile)
|
||||
|
|
|
|||
|
|
@ -146,7 +146,7 @@ pw_idxs = AddrIdxList(fmt_str=cfg._args.pop())
|
|||
pw_id_str = cfg._args.pop()
|
||||
|
||||
from .fileutil import get_seed_file
|
||||
sf = get_seed_file(cfg, 1)
|
||||
sf = get_seed_file(cfg, nargs=1)
|
||||
|
||||
pw_fmt = cfg.passwd_fmt or PasswordList.dfl_pw_fmt
|
||||
pw_len = pwi[pw_fmt].dfl_len // 2 if cfg.passwd_len in ('h', 'H') else cfg.passwd_len
|
||||
|
|
@ -165,7 +165,7 @@ PasswordList(
|
|||
from .ui import do_license_msg
|
||||
do_license_msg(cfg)
|
||||
|
||||
ss = Wallet(cfg, sf)
|
||||
ss = Wallet(cfg, fn=sf)
|
||||
|
||||
al = PasswordList(
|
||||
cfg = cfg,
|
||||
|
|
|
|||
|
|
@ -131,7 +131,7 @@ do_license_msg(cfg)
|
|||
cfg._util.qmsg('Input files:\n {}\n'.format('\n '.join(cfg._args)))
|
||||
|
||||
shares = [Wallet(cfg).seed] if cfg.hidden_incog_input_params else []
|
||||
shares += [Wallet(cfg,fn).seed for fn in cfg._args]
|
||||
shares += [Wallet(cfg, fn=fn).seed for fn in cfg._args]
|
||||
|
||||
if cfg.master_share:
|
||||
share1 = SeedShareMasterJoining(cfg, master_idx, shares[0], id_str, len(shares)).derived_seed
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ mods = {
|
|||
def get_cmds():
|
||||
return [cmd for mod, cmds in mods.items() if mod != 'help' for cmd in cmds]
|
||||
|
||||
def create_call_sig(cmd, cls, as_string=False):
|
||||
def create_call_sig(cmd, cls, *, as_string=False):
|
||||
|
||||
m = getattr(cls, cmd)
|
||||
|
||||
|
|
@ -189,8 +189,11 @@ def create_call_sig(cmd, cls, as_string=False):
|
|||
args, dfls, ann = va['args'], va['dfls'], va['annots']
|
||||
else:
|
||||
flag = None
|
||||
args = m.__code__.co_varnames[1:m.__code__.co_argcount]
|
||||
dfls = m.__defaults__ or ()
|
||||
c = m.__code__
|
||||
args = c.co_varnames[1:c.co_argcount + c.co_posonlyargcount + c.co_kwonlyargcount]
|
||||
dfls = (
|
||||
(m.__defaults__ or ()) +
|
||||
tuple(m.__kwdefaults__[k] for k in args if k in (m.__kwdefaults__ or ())))
|
||||
ann = m.__annotations__
|
||||
|
||||
nargs = len(args) - len(dfls)
|
||||
|
|
@ -295,7 +298,7 @@ def process_args(cmd, cmd_args, cls):
|
|||
|
||||
return (args, kwargs)
|
||||
|
||||
def process_result(ret, pager=False, print_result=False):
|
||||
def process_result(ret, *, pager=False, print_result=False):
|
||||
"""
|
||||
Convert result to something suitable for output to screen and return it.
|
||||
If result is bytes and not convertible to utf8, output as binary using os.write().
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ if cmd_args:
|
|||
cfg._usage()
|
||||
check_infile(cmd_args[0])
|
||||
|
||||
sf = get_seed_file(cfg, nargs, invoked_as=invoked_as)
|
||||
sf = get_seed_file(cfg, nargs=nargs, invoked_as=invoked_as)
|
||||
|
||||
if invoked_as != 'chk':
|
||||
from .ui import do_license_msg
|
||||
|
|
@ -212,7 +212,7 @@ if invoked_as == 'subgen':
|
|||
cfg = cfg,
|
||||
seed_bin = ss_in.seed.subseed(ss_idx, print_msg=True).data)
|
||||
elif invoked_as == 'seedsplit':
|
||||
shares = ss_in.seed.split(sss.count, sss.id, master_share)
|
||||
shares = ss_in.seed.split(sss.count, id_str=sss.id, master_idx=master_share)
|
||||
seed_out = shares.get_share_by_idx(sss.idx, base_seed=True)
|
||||
msg(seed_out.get_desc(ui=True))
|
||||
ss_out = Wallet(
|
||||
|
|
|
|||
|
|
@ -323,7 +323,7 @@ class MnemonicEntry:
|
|||
time.sleep(self.cfg.err_disp_timeout)
|
||||
msg_r(erase)
|
||||
|
||||
def get_mnemonic_from_user(self, mn_len, validate=True):
|
||||
def get_mnemonic_from_user(self, mn_len, *, validate=True):
|
||||
mll = list(self.bconv.seedlen_map_rev)
|
||||
assert mn_len in mll, f'{mn_len}: invalid mnemonic length (must be one of {mll})'
|
||||
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ class coin_msg:
|
|||
coin, network = network_id.split('_')
|
||||
return init_proto(cfg=cfg, coin=coin, network=network)
|
||||
|
||||
def write_to_file(self, outdir=None, ask_overwrite=False):
|
||||
def write_to_file(self, *, outdir=None, ask_overwrite=False):
|
||||
data = {
|
||||
'id': f'{gc.proj_name} {self.desc}',
|
||||
'metadata': self.data,
|
||||
|
|
|
|||
10
mmgen/obj.py
10
mmgen/obj.py
|
|
@ -102,7 +102,7 @@ class ImmutableAttr: # Descriptor
|
|||
"""
|
||||
ok_dtypes = (type, type(None), type(lambda:0))
|
||||
|
||||
def __init__(self, dtype, typeconv=True, set_none_ok=False, include_proto=False):
|
||||
def __init__(self, dtype, *, typeconv=True, set_none_ok=False, include_proto=False):
|
||||
self.set_none_ok = set_none_ok
|
||||
self.typeconv = typeconv
|
||||
|
||||
|
|
@ -154,7 +154,7 @@ class ListItemAttr(ImmutableAttr):
|
|||
For attributes that might not be present in the data instance
|
||||
Reassignment or deletion allowed if specified
|
||||
"""
|
||||
def __init__(self, dtype, typeconv=True, include_proto=False, reassign_ok=False, delete_ok=False):
|
||||
def __init__(self, dtype, *, typeconv=True, include_proto=False, reassign_ok=False, delete_ok=False):
|
||||
self.reassign_ok = reassign_ok
|
||||
self.delete_ok = delete_ok
|
||||
ImmutableAttr.__init__(self, dtype, typeconv=typeconv, include_proto=include_proto)
|
||||
|
|
@ -280,10 +280,10 @@ class Int(int, Hilite, InitErrors):
|
|||
return cls.init_fail(e, n)
|
||||
|
||||
@classmethod
|
||||
def fmtc(cls, s, width, color=False):
|
||||
def fmtc(cls, s, width, *, color=False):
|
||||
return super().fmtc(str(s), width=width, color=color)
|
||||
|
||||
def fmt(self, width, color=False):
|
||||
def fmt(self, width, *, color=False):
|
||||
return super().fmtc(str(self), width=width, color=color)
|
||||
|
||||
def hl(self, **kwargs):
|
||||
|
|
@ -324,7 +324,7 @@ class HexStr(HiliteStr, InitErrors):
|
|||
except Exception as e:
|
||||
return cls.init_fail(e, s)
|
||||
|
||||
def truncate(self, width, color=True):
|
||||
def truncate(self, width, *, color=True):
|
||||
return self.colorize(
|
||||
self if width >= self.width else self[:width-2] + '..',
|
||||
color = color)
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ class Hilite:
|
|||
|
||||
# class method equivalent of fmt()
|
||||
@classmethod
|
||||
def fmtc(cls, s, width, color=False):
|
||||
def fmtc(cls, s, width, *, color=False):
|
||||
if len(s) > width:
|
||||
assert cls.trunc_ok, "If 'trunc_ok' is false, 'width' must be >= width of string"
|
||||
return cls.colorize(s[:width].ljust(width), color=color)
|
||||
|
|
@ -57,21 +57,21 @@ class Hilite:
|
|||
return cls.colorize(s.ljust(width), color=color)
|
||||
|
||||
@classmethod
|
||||
def hlc(cls, s, color=True):
|
||||
def hlc(cls, s, *, color=True):
|
||||
return getattr(color_mod, cls.color)(s) if color else s
|
||||
|
||||
@classmethod
|
||||
def colorize(cls, s, color=True):
|
||||
def colorize(cls, s, *, color=True):
|
||||
return getattr(color_mod, cls.color)(s) if color else s
|
||||
|
||||
@classmethod
|
||||
def colorize2(cls, s, color=True, color_override=''):
|
||||
def colorize2(cls, s, *, color=True, color_override=''):
|
||||
return getattr(color_mod, color_override or cls.color)(s) if color else s
|
||||
|
||||
class HiliteStr(str, Hilite):
|
||||
|
||||
# supports single-width characters only
|
||||
def fmt(self, width, color=False):
|
||||
def fmt(self, width, *, color=False):
|
||||
if len(self) > width:
|
||||
assert self.trunc_ok, "If 'trunc_ok' is false, 'width' must be >= width of string"
|
||||
return self.colorize(self[:width].ljust(width), color=color)
|
||||
|
|
@ -82,6 +82,7 @@ class HiliteStr(str, Hilite):
|
|||
def fmt2(
|
||||
self,
|
||||
width, # screen width - must be at least 2 (one wide char)
|
||||
*,
|
||||
color = False,
|
||||
encl = '', # if set, must be exactly 2 single-width chars
|
||||
nullrepl = '',
|
||||
|
|
@ -112,12 +113,12 @@ class HiliteStr(str, Hilite):
|
|||
else:
|
||||
return self.colorize2(s.ljust(width-s_wide_count), color=color, color_override=color_override)
|
||||
|
||||
def hl(self, color=True):
|
||||
def hl(self, *, color=True):
|
||||
return getattr(color_mod, self.color)(self) if color else self
|
||||
|
||||
# an alternative to hl(), with enclosure and color override
|
||||
# can be called as an unbound method with class as first argument
|
||||
def hl2(self, s=None, color=True, encl='', color_override=''):
|
||||
def hl2(self, s=None, *, color=True, encl='', color_override=''):
|
||||
if encl:
|
||||
return self.colorize2(encl[0]+(s or self)+encl[1], color=color, color_override=color_override)
|
||||
else:
|
||||
|
|
@ -126,7 +127,7 @@ class HiliteStr(str, Hilite):
|
|||
class InitErrors:
|
||||
|
||||
@classmethod
|
||||
def init_fail(cls, e, m, e2=None, m2=None, objname=None, preformat=False):
|
||||
def init_fail(cls, e, m, *, e2=None, m2=None, objname=None, preformat=False):
|
||||
|
||||
def get_errmsg():
|
||||
ret = m if preformat else (
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@ class PasswordList(AddrList):
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
infile = None,
|
||||
seed = None,
|
||||
pw_idxs = None,
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ class MacOSRamDisk:
|
|||
def get_diskutil_size(self):
|
||||
return get_device_size(self.label) // (2**20)
|
||||
|
||||
def create(self, quiet=False):
|
||||
def create(self, *, quiet=False):
|
||||
redir = DEVNULL if quiet else None
|
||||
if self.exists():
|
||||
diskutil_size = self.get_diskutil_size()
|
||||
|
|
@ -81,7 +81,7 @@ class MacOSRamDisk:
|
|||
self.path.mkdir(parents=True, exist_ok=True)
|
||||
run(['diskutil', 'mount', '-mountPoint', str(self.path.absolute()), self.label], stdout=redir, check=True)
|
||||
|
||||
def destroy(self, quiet=False):
|
||||
def destroy(self, *, quiet=False):
|
||||
if not self.exists():
|
||||
self.cfg._util.qmsg(f'{self.desc.capitalize()} {self.label.hl()} at path {self.path} not found')
|
||||
return
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ class MMGenRegtest(MMGenObject):
|
|||
'bch': 'n2fxhNx27GhHAWQhyuZ5REcBNrJqCJsJ12',
|
||||
}
|
||||
|
||||
def __init__(self, cfg, coin, bdb_wallet=False):
|
||||
def __init__(self, cfg, coin, *, bdb_wallet=False):
|
||||
self.cfg = cfg
|
||||
self.coin = coin.lower()
|
||||
self.bdb_wallet = bdb_wallet
|
||||
|
|
@ -83,7 +83,7 @@ class MMGenRegtest(MMGenObject):
|
|||
self.proto = init_proto(cfg, self.coin, regtest=True, need_amt=True)
|
||||
self.d = CoinDaemon(
|
||||
cfg,
|
||||
self.coin + '_rt',
|
||||
network_id = self.coin + '_rt',
|
||||
test_suite = cfg.test_suite,
|
||||
opts = ['bdb_wallet'] if bdb_wallet else None)
|
||||
|
||||
|
|
@ -116,7 +116,7 @@ class MMGenRegtest(MMGenObject):
|
|||
t.addrtype = 'compressed' if self.proto.coin == 'BCH' else 'bech32'
|
||||
return t.hex2wif(self.bdb_hdseed)
|
||||
|
||||
async def generate(self, blocks=1, silent=False):
|
||||
async def generate(self, blocks=1, *, silent=False):
|
||||
|
||||
blocks = int(blocks)
|
||||
|
||||
|
|
@ -194,11 +194,11 @@ class MMGenRegtest(MMGenObject):
|
|||
msg('Stopping regtest daemon')
|
||||
await self.rpc_call('stop')
|
||||
|
||||
def init_daemon(self, reindex=False):
|
||||
def init_daemon(self, *, reindex=False):
|
||||
if reindex:
|
||||
self.d.usr_coind_args.append('--reindex')
|
||||
|
||||
async def start_daemon(self, reindex=False, silent=True):
|
||||
async def start_daemon(self, *, reindex=False, silent=True):
|
||||
self.init_daemon(reindex=reindex)
|
||||
self.d.start(silent=silent)
|
||||
for user in ('miner', 'bob', 'alice'):
|
||||
|
|
@ -260,7 +260,7 @@ class MMGenRegtest(MMGenObject):
|
|||
|
||||
async def fork(self, coin): # currently disabled
|
||||
|
||||
proto = init_proto(self.cfg, coin, False)
|
||||
proto = init_proto(self.cfg, coin, testnet=False)
|
||||
if not [f for f in proto.forks if f[2] == proto.coin.lower() and f[3] is True]:
|
||||
die(1, f'Coin {proto.coin} is not a replayable fork of coin {coin}')
|
||||
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ class CallSigs:
|
|||
def createwallet(
|
||||
self,
|
||||
wallet_name,
|
||||
*,
|
||||
no_keys = True,
|
||||
blank = True,
|
||||
passphrase = '',
|
||||
|
|
@ -86,6 +87,7 @@ class CallSigs:
|
|||
def createwallet(
|
||||
self,
|
||||
wallet_name,
|
||||
*,
|
||||
no_keys = True,
|
||||
blank = True,
|
||||
passphrase = '',
|
||||
|
|
@ -269,7 +271,7 @@ class BitcoinRPCClient(RPCClient, metaclass=AsyncInit):
|
|||
fn = self.get_daemon_cfg_fn()
|
||||
try:
|
||||
lines = get_lines_from_file(
|
||||
self.cfg, fn, 'daemon config file', silent=not self.cfg.verbose)
|
||||
self.cfg, fn, desc='daemon config file', silent=not self.cfg.verbose)
|
||||
except:
|
||||
self.cfg._util.vmsg(f'Warning: {fn!r} does not exist or is unreadable')
|
||||
return dict((k, None) for k in req_keys)
|
||||
|
|
@ -289,7 +291,7 @@ class BitcoinRPCClient(RPCClient, metaclass=AsyncInit):
|
|||
def get_daemon_auth_cookie(self):
|
||||
fn = self.daemon.auth_cookie_fn
|
||||
return get_lines_from_file(
|
||||
self.cfg, fn, 'cookie', quiet=True)[0] if os.access(fn, os.R_OK) else ''
|
||||
self.cfg, fn, desc='cookie', quiet=True)[0] if os.access(fn, os.R_OK) else ''
|
||||
|
||||
def info(self, info_id):
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ class BitcoinTwCtl(TwCtl):
|
|||
raise NotImplementedError('not implemented')
|
||||
|
||||
@write_mode
|
||||
async def import_address(self, addr, label, rescan=False):
|
||||
async def import_address(self, addr, *, label, rescan=False):
|
||||
if (await self.rpc.walletinfo).get('descriptors'):
|
||||
return await self.batch_import_address([(addr, label, rescan)])
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ def DeserializeTX(proto, txhex):
|
|||
def bytes2coin_amt(bytes_le):
|
||||
return proto.coin_amt(bytes2int(bytes_le), from_unit='satoshi')
|
||||
|
||||
def bshift(n, skip=False, sub_null=False):
|
||||
def bshift(n, *, skip=False, sub_null=False):
|
||||
nonlocal idx, raw_tx
|
||||
ret = tx[idx:idx+n]
|
||||
idx += n
|
||||
|
|
@ -87,7 +87,7 @@ def DeserializeTX(proto, txhex):
|
|||
|
||||
# https://bitcoin.org/en/developer-reference#compactsize-unsigned-integers
|
||||
# For example, the number 515 is encoded as 0xfd0302.
|
||||
def readVInt(skip=False):
|
||||
def readVInt(*, skip=False):
|
||||
nonlocal idx, raw_tx
|
||||
s = tx[idx]
|
||||
idx += 1
|
||||
|
|
|
|||
|
|
@ -145,7 +145,7 @@ class TxInfo(TxInfo):
|
|||
+ ''.join(format_io('inputs'))
|
||||
+ ''.join(format_io('outputs')))
|
||||
|
||||
def strfmt_locktime(self, locktime=None, terse=False):
|
||||
def strfmt_locktime(self, locktime=None, *, terse=False):
|
||||
# Locktime itself is an unsigned 4-byte integer which can be parsed two ways:
|
||||
#
|
||||
# If less than 500 million, locktime is parsed as a block height. The transaction can be
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ from .signed import Signed
|
|||
|
||||
class OnlineSigned(Signed, TxBase.OnlineSigned):
|
||||
|
||||
async def send(self, prompt_user=True):
|
||||
async def send(self, *, prompt_user=True):
|
||||
|
||||
self.check_correct_chain()
|
||||
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ class OpReturnData(bytes, InitErrors):
|
|||
self.display_hex = False
|
||||
return ret
|
||||
|
||||
def hl(self, add_label=False):
|
||||
def hl(self, *, add_label=False):
|
||||
'colorize and optionally label the result of str()'
|
||||
from ....color import blue, pink
|
||||
ret = str(self)
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ class TokenCommon(MMGenObject):
|
|||
int(parse_abi(data)[-1], 16) * self.base_unit,
|
||||
from_decimal = True)
|
||||
|
||||
async def do_call(self, method_sig, method_args='', toUnit=False):
|
||||
async def do_call(self, method_sig, method_args='', *, toUnit=False):
|
||||
data = self.create_method_id(method_sig) + method_args
|
||||
if self.cfg.debug:
|
||||
msg('ETH_CALL {}: {}'.format(
|
||||
|
|
|
|||
|
|
@ -174,12 +174,12 @@ class erigon_daemon(geth_daemon):
|
|||
test_suite = self.test_suite,
|
||||
datadir = self.datadir)
|
||||
|
||||
def start(self, quiet=False, silent=False):
|
||||
def start(self, *, quiet=False, silent=False):
|
||||
super().start(quiet=quiet, silent=silent)
|
||||
self.rpc_d.debug = self.debug
|
||||
return self.rpc_d.start(quiet=quiet, silent=silent)
|
||||
|
||||
def stop(self, quiet=False, silent=False):
|
||||
def stop(self, *, quiet=False, silent=False):
|
||||
self.rpc_d.debug = self.debug
|
||||
self.rpc_d.stop(quiet=quiet, silent=silent)
|
||||
return super().stop(quiet=quiet, silent=silent)
|
||||
|
|
@ -196,7 +196,7 @@ class erigon_rpcdaemon(RPCDaemon):
|
|||
use_pidfile = False
|
||||
use_threads = True
|
||||
|
||||
def __init__(self, cfg, proto, rpc_port, private_port, test_suite, datadir):
|
||||
def __init__(self, cfg, proto, *, rpc_port, private_port, test_suite, datadir):
|
||||
|
||||
self.proto = proto
|
||||
self.test_suite = test_suite
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ proto.eth.misc: miscellaneous utilities for Ethereum base protocol
|
|||
|
||||
from ...util2 import get_keccak
|
||||
|
||||
def decrypt_geth_keystore(cfg, wallet_fn, passwd, check_addr=True):
|
||||
def decrypt_geth_keystore(cfg, wallet_fn, passwd, *, check_addr=True):
|
||||
"""
|
||||
Decrypt the encrypted private key in a Geth keystore wallet, returning the decrypted key
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -88,13 +88,13 @@ class EthereumTwCtl(TwCtl):
|
|||
|
||||
@write_mode
|
||||
async def batch_import_address(self, args_list):
|
||||
return [await self.import_address(*a) for a in args_list]
|
||||
return [await self.import_address(a, label=b, rescan=c) for a, b, c in args_list]
|
||||
|
||||
async def rescan_addresses(self, coin_addrs):
|
||||
pass
|
||||
|
||||
@write_mode
|
||||
async def import_address(self, addr, label, rescan=False):
|
||||
async def import_address(self, addr, *, label, rescan=False):
|
||||
r = self.data_root
|
||||
if addr in r:
|
||||
if not r[addr]['mmid'] and label.mmid:
|
||||
|
|
@ -174,7 +174,7 @@ class EthereumTokenTwCtl(EthereumTwCtl):
|
|||
symbol = None
|
||||
cur_eth_balances = {}
|
||||
|
||||
async def __init__(self, cfg, proto, mode='r', token_addr=None, no_rpc=False):
|
||||
async def __init__(self, cfg, proto, *, mode='r', token_addr=None, no_rpc=False):
|
||||
|
||||
await super().__init__(cfg, proto, mode=mode, no_rpc=no_rpc)
|
||||
|
||||
|
|
@ -215,13 +215,13 @@ class EthereumTokenTwCtl(EthereumTwCtl):
|
|||
async def rpc_get_balance(self, addr):
|
||||
return await Token(self.cfg, self.proto, self.token, self.decimals, self.rpc).get_balance(addr)
|
||||
|
||||
async def get_eth_balance(self, addr, force_rpc=False):
|
||||
async def get_eth_balance(self, addr, *, force_rpc=False):
|
||||
cache = self.cur_eth_balances
|
||||
r = self.data['accounts']
|
||||
ret = None if force_rpc else self.get_cached_balance(addr, cache, r)
|
||||
if ret is None:
|
||||
ret = await super().rpc_get_balance(addr)
|
||||
self.cache_balance(addr, ret, cache, r)
|
||||
self.cache_balance(addr, ret, session_cache=cache, data_root=r)
|
||||
return ret
|
||||
|
||||
def get_param(self, param):
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ class EthereumTwJSON(TwJSON):
|
|||
|
||||
class Export(TwJSON.Export, Base):
|
||||
|
||||
async def get_entries(self, include_amts=True):
|
||||
async def get_entries(self, *, include_amts=True):
|
||||
|
||||
def gen_data(data):
|
||||
for k, v in data.items():
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ class EthereumTwUnspentOutputs(EthereumTwView, TwUnspentOutputs):
|
|||
interactive = interactive,
|
||||
)
|
||||
|
||||
def do_sort(self, key=None, reverse=False):
|
||||
def do_sort(self, key=None, *, reverse=False):
|
||||
if key == 'txid':
|
||||
return
|
||||
super().do_sort(key=key, reverse=reverse)
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ from .signed import Signed, TokenSigned
|
|||
|
||||
class OnlineSigned(Signed, TxBase.OnlineSigned):
|
||||
|
||||
async def send(self, prompt_user=True):
|
||||
async def send(self, *, prompt_user=True):
|
||||
|
||||
self.check_correct_chain()
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ class backend:
|
|||
compressed = privkey.compressed)
|
||||
|
||||
@classmethod
|
||||
def get_clsname(cls, cfg, silent=False):
|
||||
def get_clsname(cls, cfg, *, silent=False):
|
||||
try:
|
||||
from .secp256k1 import pubkey_gen
|
||||
if not pubkey_gen(bytes.fromhex('deadbeef'*8), 1):
|
||||
|
|
@ -67,7 +67,7 @@ class backend:
|
|||
Uncompressed public keys start with 0x04; compressed public keys begin with 0x03 or
|
||||
0x02 depending on whether they're greater or less than the midpoint of the curve.
|
||||
"""
|
||||
def privnum2pubkey(numpriv, compressed=False):
|
||||
def privnum2pubkey(numpriv, *, compressed=False):
|
||||
pk = self.ecdsa.SigningKey.from_secret_exponent(numpriv, curve=self.ecdsa.SECP256k1)
|
||||
# vk_bytes = x (32 bytes) + y (32 bytes) (unsigned big-endian)
|
||||
return pubkey_format(pk.verifying_key.to_string(), compressed)
|
||||
|
|
|
|||
|
|
@ -104,6 +104,7 @@ class MoneroWalletDaemon(RPCDaemon):
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
wallet_dir = None,
|
||||
test_suite = False,
|
||||
user = None,
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ class MoneroRPCClient(RPCClient):
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
host,
|
||||
port,
|
||||
user,
|
||||
|
|
@ -42,7 +43,7 @@ class MoneroRPCClient(RPCClient):
|
|||
if host.endswith('.onion'):
|
||||
self.network_proto = 'http'
|
||||
|
||||
super().__init__(cfg, host, port, test_connection)
|
||||
super().__init__(cfg, host, port, test_connection=test_connection)
|
||||
|
||||
if self.auth_type:
|
||||
self.auth = auth_data(user, passwd)
|
||||
|
|
@ -90,7 +91,7 @@ class MoneroRPCClient(RPCClient):
|
|||
host_path = f'/{method}'
|
||||
), json_rpc=False)
|
||||
|
||||
async def do_stop_daemon(self, silent=False):
|
||||
async def do_stop_daemon(self, *, silent=False):
|
||||
return self.call_raw('stop_daemon') # unreliable on macOS (daemon stops, but closes connection)
|
||||
|
||||
rpcmethods = ('get_info',)
|
||||
|
|
@ -100,7 +101,7 @@ class MoneroWalletRPCClient(MoneroRPCClient):
|
|||
|
||||
auth_type = 'digest'
|
||||
|
||||
def __init__(self, cfg, daemon, test_connection=True):
|
||||
def __init__(self, cfg, daemon, *, test_connection=True):
|
||||
|
||||
RPCClient.__init__(
|
||||
self = self,
|
||||
|
|
@ -128,7 +129,7 @@ class MoneroWalletRPCClient(MoneroRPCClient):
|
|||
def call_raw(self, *args, **kwargs):
|
||||
raise NotImplementedError('call_raw() not implemented for class MoneroWalletRPCClient')
|
||||
|
||||
async def do_stop_daemon(self, silent=False):
|
||||
async def do_stop_daemon(self, *, silent=False):
|
||||
"""
|
||||
NB: the 'stop_wallet' RPC call closes the open wallet before shutting down the daemon,
|
||||
returning an error if no wallet is open
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ class CoinProtocol(MMGenObject):
|
|||
decimal_prec = 28
|
||||
_set_ok = ('tokensym',)
|
||||
|
||||
def __init__(self, cfg, coin, name, network, tokensym=None, need_amt=False):
|
||||
def __init__(self, cfg, coin, *, name, network, tokensym=None, need_amt=False):
|
||||
self.cfg = cfg
|
||||
self.coin = coin.upper()
|
||||
self.coin_id = self.coin
|
||||
|
|
@ -180,7 +180,7 @@ class CoinProtocol(MMGenObject):
|
|||
def viewkey(self, viewkey_str):
|
||||
raise NotImplementedError(f'{self.name} protocol does not support view keys')
|
||||
|
||||
def base_proto_subclass(self, cls, modname, sub_clsname=None, is_token=False):
|
||||
def base_proto_subclass(self, cls, modname, *, sub_clsname=None, is_token=False):
|
||||
"""
|
||||
magic module loading and class selection
|
||||
"""
|
||||
|
|
@ -281,6 +281,7 @@ class CoinProtocol(MMGenObject):
|
|||
def init_proto(
|
||||
cfg,
|
||||
coin = None,
|
||||
*,
|
||||
testnet = False,
|
||||
regtest = False,
|
||||
network = None,
|
||||
|
|
|
|||
13
mmgen/rpc.py
13
mmgen/rpc.py
|
|
@ -30,7 +30,7 @@ from .objmethods import HiliteStr, InitErrors, MMGenObject
|
|||
|
||||
auth_data = namedtuple('rpc_auth_data', ['user', 'passwd'])
|
||||
|
||||
def dmsg_rpc(fs, data=None, is_json=False):
|
||||
def dmsg_rpc(fs, data=None, *, is_json=False):
|
||||
msg(
|
||||
fs if data is None else
|
||||
fs.format(pp_fmt(json.loads(data) if is_json else data))
|
||||
|
|
@ -255,7 +255,7 @@ class RPCClient(MMGenObject):
|
|||
network_proto = 'http'
|
||||
proxy = None
|
||||
|
||||
def __init__(self, cfg, host, port, test_connection=True):
|
||||
def __init__(self, cfg, host, port, *, test_connection=True):
|
||||
|
||||
self.cfg = cfg
|
||||
self.name = type(self).__name__
|
||||
|
|
@ -376,7 +376,7 @@ class RPCClient(MMGenObject):
|
|||
timeout = timeout,
|
||||
wallet = wallet)
|
||||
|
||||
def process_http_resp(self, run_ret, batch=False, json_rpc=True):
|
||||
def process_http_resp(self, run_ret, *, batch=False, json_rpc=True):
|
||||
|
||||
def float_parser(n):
|
||||
return n
|
||||
|
|
@ -424,7 +424,7 @@ class RPCClient(MMGenObject):
|
|||
m = text
|
||||
die('RPCFailure', f'{s.value} {s.name}: {m}')
|
||||
|
||||
async def stop_daemon(self, quiet=False, silent=False):
|
||||
async def stop_daemon(self, *, quiet=False, silent=False):
|
||||
if self.daemon.state == 'ready':
|
||||
if not (quiet or silent):
|
||||
msg(f'Stopping {self.daemon.desc} on port {self.daemon.bind_port}')
|
||||
|
|
@ -437,10 +437,10 @@ class RPCClient(MMGenObject):
|
|||
msg(f'{self.daemon.desc} on port {self.daemon.bind_port} not running')
|
||||
return True
|
||||
|
||||
def start_daemon(self, silent=False):
|
||||
def start_daemon(self, *, silent=False):
|
||||
return self.daemon.start(silent=silent)
|
||||
|
||||
async def restart_daemon(self, quiet=False, silent=False):
|
||||
async def restart_daemon(self, *, quiet=False, silent=False):
|
||||
await self.stop_daemon(quiet=quiet, silent=silent)
|
||||
return self.daemon.start(silent=silent)
|
||||
|
||||
|
|
@ -467,6 +467,7 @@ class RPCClient(MMGenObject):
|
|||
async def rpc_init(
|
||||
cfg,
|
||||
proto = None,
|
||||
*,
|
||||
backend = None,
|
||||
daemon = None,
|
||||
ignore_daemon_version = False,
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class SeedShareList(SubSeedList):
|
|||
count = ImmutableAttr(SeedShareCount)
|
||||
id_str = ImmutableAttr(SeedSplitIDString)
|
||||
|
||||
def __init__(self, parent_seed, count, id_str=None, master_idx=None, debug_last_share=False):
|
||||
def __init__(self, parent_seed, count, *, id_str=None, master_idx=None, debug_last_share=False):
|
||||
self.member_type = SeedShare
|
||||
self.parent_seed = parent_seed
|
||||
self.id_str = id_str or 'default'
|
||||
|
|
@ -118,7 +118,7 @@ class SeedShareList(SubSeedList):
|
|||
B = self.join().data
|
||||
assert A == B, f'Data mismatch!\noriginal seed: {A!r}\nrejoined seed: {B!r}'
|
||||
|
||||
def get_share_by_idx(self, idx, base_seed=False):
|
||||
def get_share_by_idx(self, idx, *, base_seed=False):
|
||||
if idx < 1 or idx > self.count:
|
||||
die('RangeError', f'{idx}: share index out of range')
|
||||
elif idx == self.count:
|
||||
|
|
@ -129,7 +129,7 @@ class SeedShareList(SubSeedList):
|
|||
ss_idx = SubSeedIdx(str(idx) + 'L')
|
||||
return self.get_subseed_by_ss_idx(ss_idx)
|
||||
|
||||
def get_share_by_seed_id(self, sid, base_seed=False):
|
||||
def get_share_by_seed_id(self, sid, *, base_seed=False):
|
||||
if sid == self.data['long'].key(self.count-1):
|
||||
return self.last_share
|
||||
elif self.master_share and sid == self.data['long'].key(0):
|
||||
|
|
@ -181,7 +181,7 @@ class SeedShareBase(MMGenObject):
|
|||
def desc(self):
|
||||
return self.get_desc()
|
||||
|
||||
def get_desc(self, ui=False):
|
||||
def get_desc(self, *, ui=False):
|
||||
pl = self.parent_list
|
||||
mss = f', with master share #{pl.master_share.idx}' if pl.master_share else ''
|
||||
if ui:
|
||||
|
|
@ -274,7 +274,7 @@ class SeedShareMaster(SeedBase, SeedShareBase):
|
|||
scramble_key = id_str.encode() + b':' + count.to_bytes(2, 'big')
|
||||
return Crypto(self.cfg).scramble_seed(self.data, scramble_key)[:self.byte_len]
|
||||
|
||||
def get_desc(self, ui=False):
|
||||
def get_desc(self, *, ui=False):
|
||||
psid = self.parent_list.parent_seed.sid
|
||||
mss = f'master share #{self.idx} of '
|
||||
return yellow('(' + mss) + psid.hl() + yellow(')') if ui else mss + psid
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class Sha2:
|
|||
# First wordBits bits of the fractional parts of the cube roots of the first nRounds primes
|
||||
cls.K = tuple(getFractionalBits(cbrt(n)) for n in primes)
|
||||
|
||||
def __init__(self, message, preprocess=True):
|
||||
def __init__(self, message, *, preprocess=True):
|
||||
'Use preprocess=False for Sha256Compress'
|
||||
assert isinstance(message, (bytes, bytearray, list)), 'message must be of type bytes, bytearray or list'
|
||||
if not self.K:
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ class SubSeedList(MMGenObject):
|
|||
def __len__(self):
|
||||
return len(self.data['long'])
|
||||
|
||||
def get_subseed_by_ss_idx(self, ss_idx_in, print_msg=False):
|
||||
def get_subseed_by_ss_idx(self, ss_idx_in, *, print_msg=False):
|
||||
ss_idx = SubSeedIdx(ss_idx_in)
|
||||
if print_msg:
|
||||
msg_r('{} {} of {}...'.format(
|
||||
|
|
@ -122,7 +122,7 @@ class SubSeedList(MMGenObject):
|
|||
assert seed.sid == sid, f'{seed.sid} != {sid}: Seed ID mismatch!'
|
||||
return seed
|
||||
|
||||
def get_subseed_by_seed_id(self, sid, last_idx=None, print_msg=False):
|
||||
def get_subseed_by_seed_id(self, sid, *, last_idx=None, print_msg=False):
|
||||
|
||||
def get_existing_subseed_by_seed_id(sid):
|
||||
for k in ('long', 'short') if self.have_short else ('long',):
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class MMGenTerm:
|
|||
pass
|
||||
|
||||
@classmethod
|
||||
def init(cls, noecho=False):
|
||||
def init(cls, *, noecho=False):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
|
|
@ -93,7 +93,7 @@ class MMGenTermLinux(MMGenTerm):
|
|||
cls.cur_term = termios.tcgetattr(cls.stdin_fd)
|
||||
|
||||
@classmethod
|
||||
def init(cls, noecho=False):
|
||||
def init(cls, *, noecho=False):
|
||||
cls.stdin_fd = sys.stdin.fileno()
|
||||
cls.cur_term = termios.tcgetattr(cls.stdin_fd)
|
||||
if not hasattr(cls, 'orig_term'):
|
||||
|
|
@ -128,7 +128,7 @@ class MMGenTermLinux(MMGenTerm):
|
|||
break
|
||||
|
||||
@classmethod
|
||||
def get_char(cls, prompt='', immed_chars='', prehold_protect=True, num_bytes=5):
|
||||
def get_char(cls, prompt='', *, immed_chars='', prehold_protect=True, num_bytes=5):
|
||||
"""
|
||||
Use os.read(), not file.read(), to get a variable number of bytes without blocking.
|
||||
Request 5 bytes to cover escape sequences generated by F1, F2, .. Fn keys (5 bytes)
|
||||
|
|
@ -169,7 +169,7 @@ class MMGenTermLinuxStub(MMGenTermLinux):
|
|||
pass
|
||||
|
||||
@classmethod
|
||||
def init(cls, noecho=False):
|
||||
def init(cls, *, noecho=False):
|
||||
cls.stdin_fd = sys.stdin.fileno()
|
||||
|
||||
@classmethod
|
||||
|
|
@ -181,7 +181,7 @@ class MMGenTermLinuxStub(MMGenTermLinux):
|
|||
pass
|
||||
|
||||
@classmethod
|
||||
def get_char(cls, prompt='', immed_chars='', prehold_protect=None, num_bytes=5):
|
||||
def get_char(cls, prompt='', *, immed_chars='', prehold_protect=None, num_bytes=5):
|
||||
msg_r(prompt)
|
||||
return os.read(0, num_bytes).decode()
|
||||
|
||||
|
|
@ -230,7 +230,7 @@ class MMGenTermMSWin(MMGenTerm):
|
|||
return
|
||||
|
||||
@classmethod
|
||||
def get_char(cls, prompt='', immed_chars='', prehold_protect=True, num_bytes=None):
|
||||
def get_char(cls, prompt='', *, immed_chars='', prehold_protect=True, num_bytes=None):
|
||||
"""
|
||||
always return a single character, ignore num_bytes
|
||||
first character of 2-character sequence returned by F1-F12 keys is discarded
|
||||
|
|
@ -268,7 +268,7 @@ class MMGenTermMSWin(MMGenTerm):
|
|||
class MMGenTermMSWinStub(MMGenTermMSWin):
|
||||
|
||||
@classmethod
|
||||
def get_char(cls, prompt='', immed_chars='', prehold_protect=None, num_bytes=None):
|
||||
def get_char(cls, prompt='', *, immed_chars='', prehold_protect=None, num_bytes=None):
|
||||
"""
|
||||
Use stdin to allow UTF-8 and emulate the one-character behavior of MMGenTermMSWin
|
||||
"""
|
||||
|
|
@ -289,7 +289,7 @@ def get_term():
|
|||
'win32': (MMGenTermMSWin if sys.stdin.isatty() else MMGenTermMSWinStub),
|
||||
}[sys.platform]
|
||||
|
||||
def init_term(cfg, noecho=False):
|
||||
def init_term(cfg, *, noecho=False):
|
||||
|
||||
term = get_term()
|
||||
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ class tool_cmd(tool_cmd_base):
|
|||
gd.ag.to_segwit_redeem_script(data),
|
||||
gd.ag.to_addr(data))
|
||||
|
||||
def _privhex2out(self, privhex: 'sstr', output_pubhex=False):
|
||||
def _privhex2out(self, privhex: 'sstr', *, output_pubhex=False):
|
||||
gd = self._init_generators()
|
||||
pk = PrivKey(
|
||||
self.proto,
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ class tool_cmd(tool_cmd_base):
|
|||
kwargs = {'skip_chksum_msg':True}
|
||||
if not obj.__name__ == 'PasswordList':
|
||||
kwargs.update({'key_address_validity_check':False})
|
||||
ret = obj(self.cfg, self.proto, mmgen_addrfile, **kwargs)
|
||||
ret = obj(self.cfg, self.proto, infile=mmgen_addrfile, **kwargs)
|
||||
if self.cfg.verbose:
|
||||
from ..util import msg, capfirst
|
||||
if ret.al_id.mmtype.name == 'password':
|
||||
|
|
|
|||
|
|
@ -37,16 +37,16 @@ class tool_cmd(tool_cmd_base):
|
|||
"""
|
||||
def encrypt(self, infile: str, outfile='', hash_preset=''):
|
||||
"encrypt a file"
|
||||
data = get_data_from_file(self.cfg, infile, 'data for encryption', binary=True)
|
||||
data = get_data_from_file(self.cfg, infile, desc='data for encryption', binary=True)
|
||||
enc_d = Crypto(self.cfg).mmgen_encrypt(data, 'data', hash_preset)
|
||||
if not outfile:
|
||||
outfile = f'{os.path.basename(infile)}.{Crypto.mmenc_ext}'
|
||||
write_data_to_file(self.cfg, outfile, enc_d, 'encrypted data', binary=True)
|
||||
write_data_to_file(self.cfg, outfile, enc_d, desc='encrypted data', binary=True)
|
||||
return True
|
||||
|
||||
def decrypt(self, infile: str, outfile='', hash_preset=''):
|
||||
"decrypt a file"
|
||||
enc_d = get_data_from_file(self.cfg, infile, 'encrypted data', binary=True)
|
||||
enc_d = get_data_from_file(self.cfg, infile, desc='encrypted data', binary=True)
|
||||
while True:
|
||||
dec_d = Crypto(self.cfg).mmgen_decrypt(enc_d, 'data', hash_preset)
|
||||
if dec_d:
|
||||
|
|
@ -59,5 +59,5 @@ class tool_cmd(tool_cmd_base):
|
|||
outfile = remove_extension(o, Crypto.mmenc_ext)
|
||||
if outfile == o:
|
||||
outfile += '.dec'
|
||||
write_data_to_file(self.cfg, outfile, dec_d, 'decrypted data', binary=True)
|
||||
write_data_to_file(self.cfg, outfile, dec_d, desc='decrypted data', binary=True)
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ class tool_cmd(tool_cmd_base):
|
|||
def find_incog_data(self,
|
||||
filename: str,
|
||||
incog_id: str,
|
||||
*,
|
||||
keep_searching: 'continue search after finding data (ID collisions can yield false positives)' = False):
|
||||
"Use an Incog ID to find hidden incognito wallet data"
|
||||
|
||||
|
|
@ -65,7 +66,7 @@ class tool_cmd(tool_cmd_base):
|
|||
os.close(f)
|
||||
return True
|
||||
|
||||
def rand2file(self, outfile: str, nbytes: str, threads=4, silent=False):
|
||||
def rand2file(self, outfile: str, nbytes: str, *, threads=4, silent=False):
|
||||
"""
|
||||
write ‘nbytes’ bytes of random data to specified file (dd-style byte specifiers supported)
|
||||
|
||||
|
|
@ -148,7 +149,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
return True
|
||||
|
||||
def decrypt_keystore(self, wallet_file: str, output_hex=False):
|
||||
def decrypt_keystore(self, wallet_file: str, *, output_hex=False):
|
||||
"decrypt the data in a keystore wallet, returning the decrypted data in binary format"
|
||||
from ..ui import line_input
|
||||
passwd = line_input(self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase).strip().encode()
|
||||
|
|
@ -159,9 +160,9 @@ class tool_cmd(tool_cmd_base):
|
|||
ret = decrypt_keystore(data[0]['keystore'], passwd)
|
||||
return ret.hex() if output_hex else ret
|
||||
|
||||
def decrypt_geth_keystore(self, wallet_file: str, check_addr=True):
|
||||
def decrypt_geth_keystore(self, wallet_file: str, *, check_addr=True):
|
||||
"decrypt the private key in a Geth keystore wallet, returning the decrypted key in hex format"
|
||||
from ..ui import line_input
|
||||
passwd = line_input(self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase).strip().encode()
|
||||
from ..proto.eth.misc import decrypt_geth_keystore
|
||||
return decrypt_geth_keystore(self.cfg, wallet_file, passwd, check_addr).hex()
|
||||
return decrypt_geth_keystore(self.cfg, wallet_file, passwd, check_addr=check_addr).hex()
|
||||
|
|
|
|||
|
|
@ -98,14 +98,15 @@ class tool_cmd(tool_cmd_base):
|
|||
if fmt == 'xmrseed':
|
||||
hexstr = self._xmr_reduce(bytes.fromhex(hexstr)).hex()
|
||||
f = mnemonic_fmts[fmt]
|
||||
return ' '.join(f.conv_cls(fmt).fromhex(hexstr, f.pad))
|
||||
return ' '.join(f.conv_cls(fmt).fromhex(hexstr, pad=f.pad))
|
||||
|
||||
def mn2hex(self, seed_mnemonic: 'sstr', fmt:mn_opts_disp = dfl_mnemonic_fmt):
|
||||
"convert a mnemonic seed phrase to a hexadecimal string"
|
||||
f = mnemonic_fmts[fmt]
|
||||
return f.conv_cls(fmt).tohex(seed_mnemonic.split(), f.pad)
|
||||
return f.conv_cls(fmt).tohex(seed_mnemonic.split(), pad=f.pad)
|
||||
|
||||
def mn2hex_interactive(self,
|
||||
*,
|
||||
fmt: mn_opts_disp = dfl_mnemonic_fmt,
|
||||
mn_len: 'length of seed phrase in words' = 24,
|
||||
print_mn: 'print the seed phrase after entry' = False):
|
||||
|
|
@ -122,6 +123,7 @@ class tool_cmd(tool_cmd_base):
|
|||
return mnemonic_fmts[fmt].conv_cls(fmt).check_wordlist(self.cfg)
|
||||
|
||||
def mn_printlist(self,
|
||||
*,
|
||||
fmt: mn_opts_disp = dfl_mnemonic_fmt,
|
||||
enum: 'enumerate the list' = False,
|
||||
pager: 'send output to pager' = False):
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class tool_cmd(tool_cmd_base):
|
|||
r = await rpc_init(self.cfg, self.proto, ignore_daemon_version=True, ignore_wallet=True)
|
||||
return f'{d.coind_name} version {r.daemon_version} ({r.daemon_version_str})'
|
||||
|
||||
async def getbalance(self,
|
||||
async def getbalance(self, *,
|
||||
minconf: 'minimum number of confirmations' = 1,
|
||||
quiet: 'produce quieter output' = False,
|
||||
pager: 'send output to pager' = False):
|
||||
|
|
@ -81,7 +81,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
return ret
|
||||
|
||||
async def twview(self,
|
||||
async def twview(self, *,
|
||||
pager: 'send output to pager' = False,
|
||||
reverse: 'reverse order of unspent outputs' = False,
|
||||
wide: 'display data in wide tabular format' = False,
|
||||
|
|
@ -98,7 +98,7 @@ class tool_cmd(tool_cmd_base):
|
|||
obj, pager, reverse, wide, sort, age_fmt, interactive,
|
||||
show_mmid = show_mmid)
|
||||
|
||||
async def txhist(self,
|
||||
async def txhist(self, *,
|
||||
pager: 'send output to pager' = False,
|
||||
reverse: 'reverse order of transactions' = False,
|
||||
detail: 'produce detailed, non-tabular output' = False,
|
||||
|
|
@ -114,6 +114,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
async def listaddress(self,
|
||||
mmgen_addr: str,
|
||||
*,
|
||||
wide: 'display data in wide tabular format' = False,
|
||||
minconf: 'minimum number of confirmations' = 1,
|
||||
showcoinaddr: 'display coin address in addition to MMGen ID' = True,
|
||||
|
|
@ -127,7 +128,7 @@ class tool_cmd(tool_cmd_base):
|
|||
showcoinaddrs = showcoinaddr,
|
||||
age_fmt = age_fmt)
|
||||
|
||||
async def listaddresses(self,
|
||||
async def listaddresses(self, *,
|
||||
pager: 'send output to pager' = False,
|
||||
reverse: 'reverse order of unspent outputs' = False,
|
||||
wide: 'display data in wide tabular format' = False,
|
||||
|
|
@ -204,7 +205,7 @@ class tool_cmd(tool_cmd_base):
|
|||
await (await TwCtl(self.cfg, self.proto, mode='w')).rescan_blockchain(start_block, stop_block)
|
||||
return True
|
||||
|
||||
async def twexport(self,
|
||||
async def twexport(self, *,
|
||||
include_amts = True,
|
||||
pretty = False,
|
||||
prune = False,
|
||||
|
|
@ -242,7 +243,7 @@ class tool_cmd(tool_cmd_base):
|
|||
force_overwrite = force)
|
||||
return True
|
||||
|
||||
async def twimport(self, filename: str, ignore_checksum=False, batch=False):
|
||||
async def twimport(self, filename: str, *, ignore_checksum=False, batch=False):
|
||||
"""
|
||||
restore a tracking wallet from a JSON dump created by ‘twexport’
|
||||
|
||||
|
|
|
|||
|
|
@ -57,6 +57,7 @@ class tool_cmd(tool_cmd_base):
|
|||
def to_bytespec(self,
|
||||
n: int,
|
||||
dd_style_byte_specifier: str,
|
||||
*,
|
||||
fmt: 'width and precision of output' = '0.2',
|
||||
print_sym: 'print the specifier after the numerical value' = True,
|
||||
strip: 'strip trailing zeroes' = False,
|
||||
|
|
@ -139,6 +140,7 @@ class tool_cmd(tool_cmd_base):
|
|||
# TODO: handle stdin
|
||||
def hash256(self,
|
||||
data: str,
|
||||
*,
|
||||
file_input: 'first arg is the name of a file containing the data' = False,
|
||||
hex_input: 'first arg is a hexadecimal string' = False):
|
||||
"compute sha256(sha256(data)) (double sha256)"
|
||||
|
|
@ -215,25 +217,26 @@ class tool_cmd(tool_cmd_base):
|
|||
def hextob32(self, hexstr: 'sstr', pad: 'pad output to this width' = 0):
|
||||
"convert a hexadecimal string to an MMGen-flavor base 32 string"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b32').fromhex(hexstr, pad, tostr=True)
|
||||
return baseconv('b32').fromhex(hexstr, pad=pad, tostr=True)
|
||||
|
||||
def b32tohex(self, b32_str: 'sstr', pad: 'pad output to this width' = 0):
|
||||
"convert an MMGen-flavor base 32 string to hexadecimal"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b32').tohex(b32_str.upper(), pad)
|
||||
return baseconv('b32').tohex(b32_str.upper(), pad=pad)
|
||||
|
||||
def hextob6d(self,
|
||||
hexstr: 'sstr',
|
||||
*,
|
||||
pad: 'pad output to this width' = 0,
|
||||
add_spaces: 'add a space after every 5th character' = True):
|
||||
"convert a hexadecimal string to die roll base6 (base6d)"
|
||||
from ..baseconv import baseconv
|
||||
from ..util2 import block_format
|
||||
ret = baseconv('b6d').fromhex(hexstr, pad, tostr=True)
|
||||
ret = baseconv('b6d').fromhex(hexstr, pad=pad, tostr=True)
|
||||
return block_format(ret, gw=5, cols=None).strip() if add_spaces else ret
|
||||
|
||||
def b6dtohex(self, b6d_str: 'sstr', pad: 'pad output to this width' = 0):
|
||||
"convert a die roll base6 (base6d) string to hexadecimal"
|
||||
from ..baseconv import baseconv
|
||||
from ..util import remove_whitespace
|
||||
return baseconv('b6d').tohex(remove_whitespace(b6d_str), pad)
|
||||
return baseconv('b6d').tohex(remove_whitespace(b6d_str), pad=pad)
|
||||
|
|
|
|||
|
|
@ -43,19 +43,21 @@ class tool_cmd(tool_cmd_base):
|
|||
def get_subseed(self, subseed_idx: str, wallet=''):
|
||||
"get the Seed ID of a single subseed by Subseed Index for default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
return Wallet(self.cfg, self._get_seed_file(wallet)).seed.subseed(subseed_idx).sid
|
||||
return Wallet(self.cfg, fn=self._get_seed_file(wallet)).seed.subseed(subseed_idx).sid
|
||||
|
||||
def get_subseed_by_seed_id(self, seed_id: str, wallet='', last_idx=SubSeedList.dfl_len):
|
||||
"get the Subseed Index of a single subseed by Seed ID for default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
ret = Wallet(self.cfg, self._get_seed_file(wallet)).seed.subseed_by_seed_id(seed_id, last_idx)
|
||||
ret = Wallet(
|
||||
self.cfg,
|
||||
fn = self._get_seed_file(wallet)).seed.subseed_by_seed_id(seed_id, last_idx=last_idx)
|
||||
return ret.ss_idx if ret else None
|
||||
|
||||
def list_subseeds(self, subseed_idx_range: str, wallet=''):
|
||||
"list a range of subseed Seed IDs for default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
from ..subseed import SubSeedIdxRange
|
||||
return Wallet(self.cfg, self._get_seed_file(wallet)).seed.subseeds.format(
|
||||
return Wallet(self.cfg, fn=self._get_seed_file(wallet)).seed.subseeds.format(
|
||||
*SubSeedIdxRange(subseed_idx_range))
|
||||
|
||||
def list_shares(self,
|
||||
|
|
@ -65,8 +67,8 @@ class tool_cmd(tool_cmd_base):
|
|||
wallet = ''):
|
||||
"list the Seed IDs of the shares resulting from a split of default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
return Wallet(self.cfg, self._get_seed_file(wallet)).seed.split(
|
||||
share_count, id_str, master_share).format()
|
||||
return Wallet(self.cfg, fn=self._get_seed_file(wallet)).seed.split(
|
||||
share_count, id_str=id_str, master_idx=master_share).format()
|
||||
|
||||
def gen_key(self, mmgen_addr: str, wallet=''):
|
||||
"generate a single WIF key for specified MMGen address from default or specified wallet"
|
||||
|
|
@ -82,7 +84,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
addr = MMGenID(self.proto, mmgen_addr)
|
||||
self.cfg._set_quiet(True)
|
||||
ss = Wallet(self.cfg, self._get_seed_file(wallet))
|
||||
ss = Wallet(self.cfg, fn=self._get_seed_file(wallet))
|
||||
|
||||
if ss.seed.sid != addr.sid:
|
||||
from ..util import die
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class TwAddresses(TwView):
|
|||
def coinaddr_list(self):
|
||||
return [d.addr for d in self.data]
|
||||
|
||||
async def __init__(self, cfg, proto, minconf=1, mmgen_addrs='', get_data=False):
|
||||
async def __init__(self, cfg, proto, *, minconf=1, mmgen_addrs='', get_data=False):
|
||||
|
||||
await super().__init__(cfg, proto)
|
||||
|
||||
|
|
|
|||
|
|
@ -63,6 +63,7 @@ class TwCtl(MMGenObject, metaclass=AsyncInit):
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
mode = 'r',
|
||||
token_addr = None,
|
||||
no_rpc = False,
|
||||
|
|
@ -167,7 +168,7 @@ class TwCtl(MMGenObject, metaclass=AsyncInit):
|
|||
def data_root_desc(self):
|
||||
return self.data_key
|
||||
|
||||
def cache_balance(self, addr, bal, session_cache, data_root, force=False):
|
||||
def cache_balance(self, addr, bal, *, session_cache, data_root, force=False):
|
||||
if force or addr not in session_cache:
|
||||
session_cache[addr] = str(bal)
|
||||
if addr in data_root:
|
||||
|
|
@ -183,11 +184,11 @@ class TwCtl(MMGenObject, metaclass=AsyncInit):
|
|||
if addr in data_root and 'balance' in data_root[addr]:
|
||||
return self.proto.coin_amt(data_root[addr]['balance'])
|
||||
|
||||
async def get_balance(self, addr, force_rpc=False):
|
||||
async def get_balance(self, addr, *, force_rpc=False):
|
||||
ret = None if force_rpc else self.get_cached_balance(addr, self.cur_balances, self.data_root)
|
||||
if ret is None:
|
||||
ret = await self.rpc_get_balance(addr)
|
||||
self.cache_balance(addr, ret, self.cur_balances, self.data_root)
|
||||
self.cache_balance(addr, ret, session_cache=self.cur_balances, data_root=self.data_root)
|
||||
return ret
|
||||
|
||||
def force_write(self):
|
||||
|
|
@ -212,7 +213,7 @@ class TwCtl(MMGenObject, metaclass=AsyncInit):
|
|||
|
||||
self.orig_data = data
|
||||
|
||||
def write(self, quiet=True):
|
||||
def write(self, *, quiet=True):
|
||||
if not self.use_tw_file:
|
||||
self.cfg._util.dmsg("'use_tw_file' is False, doing nothing")
|
||||
return
|
||||
|
|
@ -256,6 +257,7 @@ class TwCtl(MMGenObject, metaclass=AsyncInit):
|
|||
self,
|
||||
addrspec,
|
||||
comment = '',
|
||||
*,
|
||||
trusted_pair = None,
|
||||
silent = False):
|
||||
|
||||
|
|
@ -296,11 +298,11 @@ class TwCtl(MMGenObject, metaclass=AsyncInit):
|
|||
async def remove_comment(self, mmaddr):
|
||||
await self.set_comment(mmaddr, '')
|
||||
|
||||
async def import_address_common(self, data, batch=False, gather=False):
|
||||
async def import_address_common(self, data, *, batch=False, gather=False):
|
||||
|
||||
async def do_import(address, comment, message):
|
||||
try:
|
||||
res = await self.import_address(address, comment)
|
||||
res = await self.import_address(address, label=comment)
|
||||
self.cfg._util.qmsg(message)
|
||||
return res
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -30,7 +30,8 @@ class TwJSON:
|
|||
fn_pfx = 'mmgen-tracking-wallet-dump'
|
||||
|
||||
def __new__(cls, cfg, proto, *args, **kwargs):
|
||||
return MMGenObject.__new__(proto.base_proto_subclass(TwJSON, 'tw.json', cls.__name__))
|
||||
return MMGenObject.__new__(
|
||||
proto.base_proto_subclass(TwJSON, 'tw.json', sub_clsname=cls.__name__))
|
||||
|
||||
def __init__(self, cfg, proto):
|
||||
self.cfg = cfg
|
||||
|
|
@ -62,7 +63,7 @@ class TwJSON:
|
|||
|
||||
return fn
|
||||
|
||||
def json_dump(self, data, pretty=False):
|
||||
def json_dump(self, data, *, pretty=False):
|
||||
return json.dumps(
|
||||
data,
|
||||
cls = json_encoder,
|
||||
|
|
@ -90,6 +91,7 @@ class TwJSON:
|
|||
cfg,
|
||||
proto,
|
||||
filename,
|
||||
*,
|
||||
ignore_checksum = False,
|
||||
batch = False):
|
||||
|
||||
|
|
@ -163,6 +165,7 @@ class TwJSON:
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
include_amts = True,
|
||||
pretty = False,
|
||||
prune = False,
|
||||
|
|
|
|||
|
|
@ -250,14 +250,14 @@ class TwView(MMGenObject, metaclass=AsyncInit):
|
|||
'twmmid': lambda i: '{} {:010} {:024.12f}'.format(i.twmmid.sort_key, 0xffffffff - abs(i.confs), i.amt)
|
||||
}
|
||||
|
||||
def sort_info(self, include_group=True):
|
||||
def sort_info(self, *, include_group=True):
|
||||
ret = ([], ['Reverse'])[self.reverse]
|
||||
ret.append(self.sort_disp[self.sort_key])
|
||||
if include_group and self.group and (self.sort_key in ('addr', 'txid', 'twmmid')):
|
||||
ret.append('Grouped')
|
||||
return ret
|
||||
|
||||
def do_sort(self, key=None, reverse=False):
|
||||
def do_sort(self, key=None, *, reverse=False):
|
||||
key = key or self.sort_key
|
||||
if key not in self.sort_funcs:
|
||||
die(1, f'{key!r}: invalid sort key. Valid options: {" ".join(self.sort_funcs)}')
|
||||
|
|
@ -268,7 +268,7 @@ class TwView(MMGenObject, metaclass=AsyncInit):
|
|||
if self.data != save:
|
||||
self.pos = 0
|
||||
|
||||
async def get_data(self, sort_key=None, reverse_sort=False):
|
||||
async def get_data(self, *, sort_key=None, reverse_sort=False):
|
||||
|
||||
rpc_data = await self.get_rpc_data()
|
||||
|
||||
|
|
@ -390,6 +390,7 @@ class TwView(MMGenObject, metaclass=AsyncInit):
|
|||
async def format(
|
||||
self,
|
||||
display_type,
|
||||
*,
|
||||
color = True,
|
||||
interactive = False,
|
||||
line_processing = None,
|
||||
|
|
|
|||
|
|
@ -181,7 +181,8 @@ class Base(MMGenObject):
|
|||
def add_comment(self, infile=None):
|
||||
if infile:
|
||||
from ..fileutil import get_data_from_file
|
||||
self.comment = MMGenTxComment(get_data_from_file(self.cfg, infile, 'transaction comment'))
|
||||
self.comment = MMGenTxComment(
|
||||
get_data_from_file(self.cfg, infile, desc='transaction comment'))
|
||||
else:
|
||||
from ..ui import keypress_confirm, line_input
|
||||
if keypress_confirm(
|
||||
|
|
|
|||
|
|
@ -80,10 +80,10 @@ class MMGenTxFile(MMGenObject):
|
|||
self.fmt_data = None
|
||||
self.filename = None
|
||||
|
||||
def parse(self, infile, metadata_only=False, quiet_open=False):
|
||||
def parse(self, infile, *, metadata_only=False, quiet_open=False):
|
||||
tx = self.tx
|
||||
from ..fileutil import get_data_from_file
|
||||
data = get_data_from_file(tx.cfg, infile, f'{tx.desc} data', quiet=quiet_open)
|
||||
data = get_data_from_file(tx.cfg, infile, desc=f'{tx.desc} data', quiet=quiet_open)
|
||||
if len(data) > tx.cfg.max_tx_file_size:
|
||||
die('MaxFileSizeExceeded',
|
||||
f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)')
|
||||
|
|
@ -286,7 +286,7 @@ class MMGenTxFile(MMGenObject):
|
|||
|
||||
return fmt_data
|
||||
|
||||
def write(self,
|
||||
def write(self, *,
|
||||
add_desc = '',
|
||||
outdir = None,
|
||||
ask_write = True,
|
||||
|
|
@ -316,7 +316,7 @@ class MMGenTxFile(MMGenObject):
|
|||
ignore_opt_outdir = outdir)
|
||||
|
||||
@classmethod
|
||||
def get_proto(cls, cfg, filename, quiet_open=False):
|
||||
def get_proto(cls, cfg, filename, *, quiet_open=False):
|
||||
from . import BaseTX
|
||||
tmp_tx = BaseTX(cfg=cfg)
|
||||
cls(tmp_tx).parse(filename, metadata_only=True, quiet_open=quiet_open)
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class TxInfo:
|
|||
self.cfg = cfg
|
||||
self.tx = tx
|
||||
|
||||
def format(self, terse=False, sort='addr'):
|
||||
def format(self, *, terse=False, sort='addr'):
|
||||
|
||||
tx = self.tx
|
||||
|
||||
|
|
@ -115,7 +115,7 @@ class TxInfo:
|
|||
|
||||
return ''.join(gen_view())
|
||||
|
||||
def view_with_prompt(self, prompt, pause=True):
|
||||
def view_with_prompt(self, prompt, *, pause=True):
|
||||
prompt += ' (y)es, (N)o, pager (v)iew, (t)erse view: '
|
||||
from ..term import get_char
|
||||
while True:
|
||||
|
|
@ -131,7 +131,7 @@ class TxInfo:
|
|||
break
|
||||
msg('Invalid reply')
|
||||
|
||||
def view(self, pager=False, pause=True, terse=False):
|
||||
def view(self, *, pager=False, pause=True, terse=False):
|
||||
o = self.format(terse=terse)
|
||||
if pager:
|
||||
from ..ui import do_pager
|
||||
|
|
|
|||
|
|
@ -164,7 +164,7 @@ class New(Base):
|
|||
return False
|
||||
return True
|
||||
|
||||
def add_output(self, coinaddr, amt, is_chg=False, is_vault=False, data=None):
|
||||
def add_output(self, coinaddr, amt, *, is_chg=False, is_vault=False, data=None):
|
||||
self.outputs.append(
|
||||
self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg, is_vault=is_vault, data=data))
|
||||
|
||||
|
|
@ -197,7 +197,7 @@ class New(Base):
|
|||
|
||||
return _pa(arg, mmid, coin_addr, amt, None, is_vault)
|
||||
|
||||
async def get_autochg_addr(self, proto, arg, exclude, desc, all_addrtypes=False):
|
||||
async def get_autochg_addr(self, proto, arg, *, exclude, desc, all_addrtypes=False):
|
||||
from ..tw.addresses import TwAddresses
|
||||
al = await TwAddresses(self.cfg, proto, get_data=True)
|
||||
|
||||
|
|
@ -287,7 +287,7 @@ class New(Base):
|
|||
for addrfile in addrfiles:
|
||||
check_infile(addrfile)
|
||||
try:
|
||||
ad_f.add(AddrList(self.cfg, proto, addrfile))
|
||||
ad_f.add(AddrList(self.cfg, proto, infile=addrfile))
|
||||
except Exception as e:
|
||||
msg(f'{type(e).__name__}: {e}')
|
||||
return ad_f
|
||||
|
|
@ -419,7 +419,7 @@ class New(Base):
|
|||
else:
|
||||
self.warn_insufficient_funds(funds.amt, self.coin)
|
||||
|
||||
async def create(self, cmd_args, locktime=None, do_info=False, caller='txcreate'):
|
||||
async def create(self, cmd_args, *, locktime=None, do_info=False, caller='txcreate'):
|
||||
|
||||
assert isinstance(locktime, (int, type(None))), 'locktime must be of type int'
|
||||
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ def get_seed_for_seed_id(sid, infiles, saved_seeds):
|
|||
subseeds_checked = False
|
||||
while True:
|
||||
if infiles:
|
||||
seed = Wallet(cfg, infiles.pop(0), ignore_in_fmt=True, passwd_file=global_passwd_file).seed
|
||||
seed = Wallet(cfg, fn=infiles.pop(0), ignore_in_fmt=True, passwd_file=global_passwd_file).seed
|
||||
elif subseeds_checked is False:
|
||||
seed = saved_seeds[list(saved_seeds)[0]].subseed_by_seed_id(sid, print_msg=True)
|
||||
subseeds_checked = True
|
||||
|
|
@ -128,7 +128,7 @@ def get_tx_files(cfg, args):
|
|||
die(1, 'You must specify a raw transaction file!')
|
||||
return ret
|
||||
|
||||
def get_seed_files(cfg, args, ignore_dfl_wallet=False, empty_ok=False):
|
||||
def get_seed_files(cfg, args, *, ignore_dfl_wallet=False, empty_ok=False):
|
||||
# favor unencrypted seed sources first, as they don't require passwords
|
||||
ret = _pop_matching_fns(args, get_wallet_extensions('unenc'))
|
||||
from ..filename import find_file_in_dir
|
||||
|
|
@ -142,13 +142,13 @@ def get_seed_files(cfg, args, ignore_dfl_wallet=False, empty_ok=False):
|
|||
|
||||
def get_keyaddrlist(cfg, proto):
|
||||
if cfg.mmgen_keys_from_file:
|
||||
return KeyAddrList(cfg, proto, cfg.mmgen_keys_from_file)
|
||||
return KeyAddrList(cfg, proto, infile=cfg.mmgen_keys_from_file)
|
||||
return None
|
||||
|
||||
def get_keylist(cfg):
|
||||
if cfg.keys_from_file:
|
||||
from ..fileutil import get_lines_from_file
|
||||
return get_lines_from_file(cfg, cfg.keys_from_file, 'key-address data', trim_comments=True)
|
||||
return get_lines_from_file(cfg, cfg.keys_from_file, desc='key-address data', trim_comments=True)
|
||||
return None
|
||||
|
||||
async def txsign(cfg_parm, tx, seed_files, kl, kal, tx_num_str='', passwd_file=None):
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ def get_data_from_user(cfg, desc='data'): # user input MUST be UTF-8
|
|||
msg(f'User input: [{data}]')
|
||||
return data
|
||||
|
||||
def line_input(cfg, prompt, echo=True, insert_txt='', hold_protect=True):
|
||||
def line_input(cfg, prompt, *, echo=True, insert_txt='', hold_protect=True):
|
||||
"""
|
||||
multi-line prompts OK
|
||||
one-line prompts must begin at beginning of line
|
||||
|
|
@ -83,6 +83,7 @@ def line_input(cfg, prompt, echo=True, insert_txt='', hold_protect=True):
|
|||
def keypress_confirm(
|
||||
cfg,
|
||||
prompt,
|
||||
*,
|
||||
default_yes = False,
|
||||
verbose = False,
|
||||
no_nl = False,
|
||||
|
|
@ -133,7 +134,7 @@ def do_pager(text):
|
|||
Msg(text+end_msg)
|
||||
set_vt100()
|
||||
|
||||
def do_license_msg(cfg, immed=False):
|
||||
def do_license_msg(cfg, *, immed=False):
|
||||
|
||||
if cfg.quiet or cfg.no_license or cfg.yes or not cfg.stdin_tty:
|
||||
return
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@ class Util:
|
|||
desc1,
|
||||
chk2,
|
||||
desc2,
|
||||
*,
|
||||
hdr = '',
|
||||
die_on_fail = False,
|
||||
verbose = False):
|
||||
|
|
@ -156,7 +157,7 @@ def mdie(*args):
|
|||
mmsg(*args)
|
||||
sys.exit(0)
|
||||
|
||||
def die(ev, s='', stdout=False):
|
||||
def die(ev, s='', *, stdout=False):
|
||||
if isinstance(ev, int):
|
||||
from .exception import MMGenSystemExit, MMGenError
|
||||
if ev <= 2:
|
||||
|
|
@ -242,7 +243,7 @@ def list_gen(*data):
|
|||
yield d[idx]
|
||||
return list(gen())
|
||||
|
||||
def remove_dups(iterable, edesc='element', desc='list', quiet=False, hide=False):
|
||||
def remove_dups(iterable, *, edesc='element', desc='list', quiet=False, hide=False):
|
||||
"""
|
||||
Remove duplicate occurrences of iterable elements, preserving first occurrence
|
||||
If iterable is a generator, return a list, else type(iterable)
|
||||
|
|
@ -292,7 +293,7 @@ def remove_extension(fn, ext):
|
|||
a, b = os.path.splitext(fn)
|
||||
return a if b[1:] == ext else fn
|
||||
|
||||
def make_chksum_N(s, nchars, sep=False, rounds=2, upper=True):
|
||||
def make_chksum_N(s, nchars, *, sep=False, rounds=2, upper=True):
|
||||
if isinstance(s, str):
|
||||
s = s.encode()
|
||||
from hashlib import sha256
|
||||
|
|
@ -306,7 +307,7 @@ def make_chksum_N(s, nchars, sep=False, rounds=2, upper=True):
|
|||
assert 4 <= nchars <= 64, 'illegal ‘nchars’ value'
|
||||
return ret.upper() if upper else ret
|
||||
|
||||
def make_chksum_8(s, sep=False):
|
||||
def make_chksum_8(s, *, sep=False):
|
||||
from .obj import HexStr
|
||||
from hashlib import sha256
|
||||
s = HexStr(sha256(sha256(s).digest()).hexdigest()[:8].upper(), case='upper')
|
||||
|
|
@ -396,7 +397,7 @@ class oneshot_warning:
|
|||
|
||||
color = 'nocolor'
|
||||
|
||||
def __init__(self, div=None, fmt_args=[], reverse=False):
|
||||
def __init__(self, *, div=None, fmt_args=[], reverse=False):
|
||||
self.do(type(self), div, fmt_args, reverse)
|
||||
|
||||
def do(self, wcls, div, fmt_args, reverse):
|
||||
|
|
@ -422,10 +423,10 @@ class oneshot_warning:
|
|||
|
||||
class oneshot_warning_group(oneshot_warning):
|
||||
|
||||
def __init__(self, wcls, div=None, fmt_args=[], reverse=False):
|
||||
def __init__(self, wcls, *, div=None, fmt_args=[], reverse=False):
|
||||
self.do(getattr(self, wcls), div, fmt_args, reverse)
|
||||
|
||||
def get_subclasses(cls, names=False):
|
||||
def get_subclasses(cls, *, names=False):
|
||||
def gen(cls):
|
||||
for i in cls.__subclasses__():
|
||||
yield i
|
||||
|
|
@ -456,7 +457,7 @@ def exit_if_mswin(feature):
|
|||
if sys.platform == 'win32':
|
||||
die(2, capfirst(feature) + ' not supported on the MSWin / MSYS2 platform')
|
||||
|
||||
def have_sudo(silent=False):
|
||||
def have_sudo(*, silent=False):
|
||||
from subprocess import run, DEVNULL
|
||||
redir = DEVNULL if silent else None
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ def die_pause(ev=0, s=''):
|
|||
def cffi_override_fixup():
|
||||
from cffi import FFI
|
||||
class FFI_override:
|
||||
def cdef(self, csource, override=False, packed=False, pack=None):
|
||||
def cdef(self, csource, *, override=False, packed=False, pack=None):
|
||||
self._cdef(csource, override=True, packed=packed, pack=pack)
|
||||
FFI.cdef = FFI_override.cdef
|
||||
|
||||
|
|
@ -92,7 +92,7 @@ bytespec_map = (
|
|||
('E', 1152921504606846976),
|
||||
)
|
||||
|
||||
def int2bytespec(n, spec, fmt, print_sym=True, strip=False, add_space=False):
|
||||
def int2bytespec(n, spec, fmt, *, print_sym=True, strip=False, add_space=False):
|
||||
|
||||
def spec2int(spec):
|
||||
for k, v in bytespec_map:
|
||||
|
|
@ -137,6 +137,7 @@ def format_elapsed_days_hr(t, now=None, cached={}):
|
|||
|
||||
def format_elapsed_hr(
|
||||
t,
|
||||
*,
|
||||
now = None,
|
||||
cached = {},
|
||||
rel_now = True,
|
||||
|
|
@ -177,7 +178,7 @@ def pretty_format(s, width=80, pfx=''):
|
|||
s = s[i+1:]
|
||||
return pfx + ('\n'+pfx).join(out)
|
||||
|
||||
def block_format(data, gw=2, cols=8, line_nums=None, data_is_hex=False):
|
||||
def block_format(data, *, gw=2, cols=8, line_nums=None, data_is_hex=False):
|
||||
assert line_nums in (None, 'hex', 'dec'), "'line_nums' must be one of None, 'hex' or 'dec'"
|
||||
ln_fs = '{:06x}: ' if line_nums == 'hex' else '{:06}: '
|
||||
bytes_per_chunk = gw
|
||||
|
|
@ -192,7 +193,7 @@ def block_format(data, gw=2, cols=8, line_nums=None, data_is_hex=False):
|
|||
).rstrip() + '\n'
|
||||
|
||||
def pretty_hexdump(data, gw=2, cols=8, line_nums=None):
|
||||
return block_format(data.hex(), gw, cols, line_nums, data_is_hex=True)
|
||||
return block_format(data.hex(), gw=gw, cols=cols, line_nums=line_nums, data_is_hex=True)
|
||||
|
||||
def decode_pretty_hexdump(data):
|
||||
pat = re.compile(fr'^[{hexdigits}]+:\s+')
|
||||
|
|
|
|||
|
|
@ -47,6 +47,7 @@ _wd('words', 'MMGenMnemonic', 'mmwords', 'mnemonic', False, ('mmwor
|
|||
}
|
||||
|
||||
def get_wallet_data(
|
||||
*,
|
||||
wtype = None,
|
||||
fmt_code = None,
|
||||
ext = None,
|
||||
|
|
@ -73,6 +74,7 @@ def get_wallet_data(
|
|||
|
||||
def get_wallet_cls(
|
||||
wtype = None,
|
||||
*,
|
||||
fmt_code = None,
|
||||
ext = None,
|
||||
die_on_fail = False):
|
||||
|
|
@ -111,6 +113,7 @@ def _get_me(modname):
|
|||
|
||||
def Wallet(
|
||||
cfg,
|
||||
*,
|
||||
fn = None,
|
||||
ss = None,
|
||||
seed_bin = None,
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ class wallet(MMGenObject, metaclass=WalletMeta):
|
|||
self.fmt_data = get_data_from_file(
|
||||
self.cfg,
|
||||
self.infile.name,
|
||||
self.desc,
|
||||
desc = self.desc,
|
||||
binary = self.file_mode=='binary')
|
||||
elif self.in_data:
|
||||
self.fmt_data = self.in_data
|
||||
|
|
|
|||
|
|
@ -142,9 +142,9 @@ class wallet(wallet):
|
|||
return False
|
||||
|
||||
def _verify_seed_oldfmt(self, seed):
|
||||
m = f'Seed ID: {make_chksum_8(seed)}. Is the Seed ID correct?'
|
||||
prompt = f'Seed ID: {make_chksum_8(seed)}. Is the Seed ID correct?'
|
||||
from ..ui import keypress_confirm
|
||||
if keypress_confirm(self.cfg, m, True):
|
||||
if keypress_confirm(self.cfg, prompt, default_yes=True):
|
||||
return seed
|
||||
else:
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -53,8 +53,8 @@ class wallet(wallet):
|
|||
seed = self.seed.data
|
||||
|
||||
bc = self.conv_cls(self.wl_id)
|
||||
mn = bc.frombytes(seed, 'seed')
|
||||
rev = bc.tobytes(mn, 'seed')
|
||||
mn = bc.frombytes(seed, pad='seed')
|
||||
rev = bc.tobytes(mn, pad='seed')
|
||||
|
||||
# Internal error, so just die on fail
|
||||
self.cfg._util.compare_or_die(rev, 'recomputed seed', seed, 'original seed', e='Internal error')
|
||||
|
|
@ -78,8 +78,8 @@ class wallet(wallet):
|
|||
msg(f'Invalid mnemonic: word #{n} is not in the {self.wl_id.upper()} wordlist')
|
||||
return False
|
||||
|
||||
seed = bc.tobytes(mn, 'seed')
|
||||
rev = bc.frombytes(seed, 'seed')
|
||||
seed = bc.tobytes(mn, pad='seed')
|
||||
rev = bc.frombytes(seed, pad='seed')
|
||||
|
||||
if len(seed) * 8 not in Seed.lens:
|
||||
msg('Invalid mnemonic (produces too large a number)')
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ class xmrseed(baseconv):
|
|||
|
||||
return b''.join(gen())
|
||||
|
||||
def frombytes(self, bytestr, pad=None, tostr=False):
|
||||
def frombytes(self, bytestr, *, pad=None, tostr=False):
|
||||
assert pad is None, f"{pad}: invalid 'pad' argument (must be None)"
|
||||
|
||||
desc = self.desc.short
|
||||
|
|
|
|||
|
|
@ -60,5 +60,5 @@ class MoneroMMGenFile:
|
|||
|
||||
def extract_data_from_file(self, cfg, fn):
|
||||
return json.loads(
|
||||
get_data_from_file(cfg, str(fn), self.desc, silent=self.silent_load)
|
||||
get_data_from_file(cfg, str(fn), desc=self.desc, silent=self.silent_load)
|
||||
)[self.data_label]
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ class MoneroWalletOutputsFile:
|
|||
self.name = type(self).__name__
|
||||
self.cfg = cfg
|
||||
|
||||
def write(self, add_suf='', quiet=False):
|
||||
def write(self, *, add_suf='', quiet=False):
|
||||
from ...fileutil import write_data_to_file
|
||||
write_data_to_file(
|
||||
cfg = self.cfg,
|
||||
|
|
@ -81,7 +81,7 @@ class MoneroWalletOutputsFile:
|
|||
class New(Base):
|
||||
ext = 'raw'
|
||||
|
||||
def __init__(self, parent, wallet_fn, data, wallet_idx=None, sign=False):
|
||||
def __init__(self, parent, wallet_fn, data, *, wallet_idx=None, sign=False):
|
||||
super().__init__(parent.cfg)
|
||||
self.wallet_fn = wallet_fn
|
||||
init_data = dict.fromkeys(self.data_tuple._fields)
|
||||
|
|
@ -115,7 +115,7 @@ class MoneroWalletOutputsFile:
|
|||
self.check_checksums(d_wrap)
|
||||
|
||||
@classmethod
|
||||
def find_fn_from_wallet_fn(cls, cfg, wallet_fn, ret_on_no_match=False):
|
||||
def find_fn_from_wallet_fn(cls, cfg, wallet_fn, *, ret_on_no_match=False):
|
||||
path = get_autosign_obj(cfg).xmr_outputs_dir or Path()
|
||||
pat = cls.fn_fs.format(
|
||||
a = wallet_fn.name,
|
||||
|
|
|
|||
|
|
@ -153,7 +153,7 @@ class MoneroMMGenTX:
|
|||
def file_id(self):
|
||||
return (self.base_chksum + ('-' + self.full_chksum if self.full_chksum else '')).upper()
|
||||
|
||||
def write(self, delete_metadata=False, ask_write=True, ask_overwrite=True):
|
||||
def write(self, *, delete_metadata=False, ask_write=True, ask_overwrite=True):
|
||||
dict_data = self.data._asdict()
|
||||
if delete_metadata:
|
||||
dict_data['metadata'] = None
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ class OpImportOutputs(OpWallet):
|
|||
action = 'importing wallet outputs into'
|
||||
start_daemon = False
|
||||
|
||||
async def main(self, fn, wallet_idx, restart_daemon=True):
|
||||
async def main(self, fn, wallet_idx, *, restart_daemon=True):
|
||||
if restart_daemon:
|
||||
await self.restart_wallet_daemon()
|
||||
h = MoneroWalletRPC(self, self.addr_data[0])
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class OpRelay(OpBase):
|
|||
md = None
|
||||
else:
|
||||
from ...daemon import CoinDaemon
|
||||
md = CoinDaemon(self.cfg, 'xmr', test_suite=self.cfg.test_suite)
|
||||
md = CoinDaemon(self.cfg, network_id='xmr', test_suite=self.cfg.test_suite)
|
||||
host, port = ('localhost', md.rpc_port)
|
||||
proxy = None
|
||||
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class OpSign(OpWallet):
|
|||
action = 'signing transaction with'
|
||||
start_daemon = False
|
||||
|
||||
async def main(self, fn, restart_daemon=True):
|
||||
async def main(self, fn, *, restart_daemon=True):
|
||||
if restart_daemon:
|
||||
await self.restart_wallet_daemon()
|
||||
tx = MoneroMMGenTX.Unsigned(self.cfg, fn)
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ class OpWallet(OpBase):
|
|||
return MoneroRPCClient(
|
||||
cfg = self.cfg,
|
||||
proto = self.proto,
|
||||
daemon = CoinDaemon(self.cfg, 'xmr'),
|
||||
daemon = CoinDaemon(self.cfg, network_id='xmr'),
|
||||
host = host,
|
||||
port = int(port),
|
||||
user = None,
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ class MoneroWalletRPC:
|
|||
await self.c.stop_daemon(quiet=True) # closes wallet
|
||||
gmsg_r('done')
|
||||
|
||||
def gen_accts_info(self, accts_data, addrs_data, indent=' ', skip_empty_ok=False):
|
||||
def gen_accts_info(self, accts_data, addrs_data, *, indent=' ', skip_empty_ok=False):
|
||||
from .ops import addr_width
|
||||
fs = indent + ' {I:<3} {A} {N} {B} {L}'
|
||||
yield indent + f'Accounts of wallet {self.fn.name}:'
|
||||
|
|
@ -84,7 +84,7 @@ class MoneroWalletRPC:
|
|||
B = fmt_amt(e['unlocked_balance']),
|
||||
L = pink(e['label']))
|
||||
|
||||
def get_wallet_data(self, print=True, skip_empty_ok=False):
|
||||
def get_wallet_data(self, *, print=True, skip_empty_ok=False):
|
||||
accts_data = self.c.call('get_accounts')
|
||||
addrs_data = [
|
||||
self.c.call('get_address', account_index=i)
|
||||
|
|
@ -123,7 +123,7 @@ class MoneroWalletRPC:
|
|||
msg(cyan(ret['address']))
|
||||
return ret['address']
|
||||
|
||||
def get_last_addr(self, account, wallet_data, display=True):
|
||||
def get_last_addr(self, account, wallet_data, *, display=True):
|
||||
if display:
|
||||
msg('\n Getting last address:')
|
||||
acct_addrs = wallet_data.addrs_data[account]['addresses']
|
||||
|
|
|
|||
|
|
@ -426,7 +426,7 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
|
|||
self.proto = init_proto( cfg, cfg.coin, network='regtest', need_amt=True)
|
||||
|
||||
from mmgen.daemon import CoinDaemon
|
||||
self.daemon = CoinDaemon( cfg, self.proto.coin+'_rt', test_suite=True)
|
||||
self.daemon = CoinDaemon( cfg, network_id=self.proto.coin+'_rt', test_suite=True)
|
||||
|
||||
if self.daemon.id == 'reth':
|
||||
global dfl_devkey, dfl_devaddr
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ def make_brainwallet_file(fn):
|
|||
d = ''.join(rand_pairs).rstrip() + '\n'
|
||||
if cfg.verbose:
|
||||
msg_r(f'Brainwallet password:\n{cyan(d)}')
|
||||
write_data_to_file(cfg, fn, d, 'brainwallet password', quiet=True, ignore_opt_outdir=True)
|
||||
write_data_to_file(cfg, fn, d, desc='brainwallet password', quiet=True, ignore_opt_outdir=True)
|
||||
|
||||
def verify_checksum_or_exit(checksum, chk):
|
||||
chk = strip_ansi_escapes(chk)
|
||||
|
|
@ -397,7 +397,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
|
|||
addrfile = self.get_file_with_ext('addrs')
|
||||
from mmgen.addrlist import AddrList
|
||||
silence()
|
||||
chk = AddrList(cfg, self.proto, addrfile).chksum
|
||||
chk = AddrList(cfg, self.proto, infile=addrfile).chksum
|
||||
end_silence()
|
||||
if cfg.verbose and display:
|
||||
msg(f'Checksum: {cyan(chk)}')
|
||||
|
|
@ -537,7 +537,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
|
|||
cfg,
|
||||
self.unspent_data_file,
|
||||
d,
|
||||
'Unspent outputs',
|
||||
desc = 'Unspent outputs',
|
||||
quiet = True,
|
||||
ignore_opt_outdir = True)
|
||||
if cfg.verbose or cfg.exact_output:
|
||||
|
|
@ -633,7 +633,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
|
|||
tx_data, ad = {}, AddrData(self.proto)
|
||||
for s in sources:
|
||||
addrfile = get_file_with_ext(self.cfgs[s]['tmpdir'], 'addrs')
|
||||
al = AddrList(cfg, self.proto, addrfile)
|
||||
al = AddrList(cfg, self.proto, infile=addrfile)
|
||||
ad.add(al)
|
||||
aix = AddrIdxList(fmt_str=self.cfgs[s]['addr_idx_list'])
|
||||
if len(aix) != addrs_per_wallet:
|
||||
|
|
@ -849,7 +849,7 @@ class CmdTestMain(CmdTestBase, CmdTestShared):
|
|||
wcls = get_wallet_cls(fmt_code=out_fmt)
|
||||
msg('==> {}: {}'.format(
|
||||
wcls.desc,
|
||||
cyan(get_data_from_file(cfg, f, wcls.desc))
|
||||
cyan(get_data_from_file(cfg, f, desc=wcls.desc))
|
||||
))
|
||||
end_silence()
|
||||
return t
|
||||
|
|
|
|||
|
|
@ -498,7 +498,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
|
|||
def _add_comments_to_addr_file(self, proto, addrfile, outfile, use_comments=False):
|
||||
silence()
|
||||
gmsg(f'Adding comments to address file {addrfile!r}')
|
||||
a = AddrList(cfg, proto, addrfile)
|
||||
a = AddrList(cfg, proto, infile=addrfile)
|
||||
for n, idx in enumerate(a.idxs(), 1):
|
||||
if use_comments:
|
||||
a.set_comment(idx, get_comment())
|
||||
|
|
@ -1110,7 +1110,7 @@ class CmdTestRegtest(CmdTestBase, CmdTestShared):
|
|||
sid, self.get_altcoin_pfx(proto.coin), id_str, addr_range, x='-α' if cfg.debug_utf8 else '')
|
||||
addrfile = get_file_with_ext(self._user_dir(user), ext, no_dot=True)
|
||||
silence()
|
||||
addr = AddrList(cfg, proto, addrfile).data[idx].addr
|
||||
addr = AddrList(cfg, proto, infile=addrfile).data[idx].addr
|
||||
end_silence()
|
||||
return addr
|
||||
|
||||
|
|
|
|||
|
|
@ -295,7 +295,7 @@ class CmdTestShared:
|
|||
fn = t.written_to_file('Password list' if passgen else 'Addresses')
|
||||
cls = PasswordList if passgen else AddrList
|
||||
silence()
|
||||
al = cls(cfg, self.proto, fn, skip_chksum_msg=True) # read back the file we’ve written
|
||||
al = cls(cfg, self.proto, infile=fn, skip_chksum_msg=True) # read back the file we’ve written
|
||||
end_silence()
|
||||
cmp_or_die(al.chksum, chksum, desc=f'{ftype}list data checksum from file')
|
||||
return t
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ class CmdTestXMRAutosign(CmdTestXMRWallet, CmdTestAutosignThreaded):
|
|||
cfg = self.cfg,
|
||||
proto = self.proto,
|
||||
addr_idxs = '1-2',
|
||||
seed = Wallet(cfg, data.mmwords).seed,
|
||||
seed = Wallet(cfg, fn=data.mmwords).seed,
|
||||
skip_chksum_msg = True,
|
||||
key_address_validity_check = False)
|
||||
kal.file.write(ask_overwrite=False)
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ from mmgen.daemon import CoinDaemon
|
|||
from ..include.common import cfg, qmsg, qmsg_r, vmsg, msg
|
||||
|
||||
def test_flags(coin):
|
||||
d = CoinDaemon(cfg, coin)
|
||||
d = CoinDaemon(cfg, network_id=coin)
|
||||
vmsg(f'Available opts: {fmt_list(d.avail_opts, fmt="bare")}')
|
||||
vmsg(f'Available flags: {fmt_list(d.avail_flags, fmt="bare")}')
|
||||
vals = namedtuple('vals', ['online', 'no_daemonize', 'keep_cfg_file'])
|
||||
|
|
@ -26,7 +26,7 @@ def test_flags(coin):
|
|||
(['online'], ['keep_cfg_file'], vals(True, False, True)),
|
||||
(['online', 'no_daemonize'], ['keep_cfg_file'], vals(True, True, True)),
|
||||
):
|
||||
d = CoinDaemon(cfg, coin, opts=opts, flags=flags)
|
||||
d = CoinDaemon(cfg, network_id=coin, opts=opts, flags=flags)
|
||||
assert d.flag.keep_cfg_file == val.keep_cfg_file
|
||||
assert d.opt.online == val.online
|
||||
assert d.opt.no_daemonize == val.no_daemonize
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ class init_test:
|
|||
|
||||
@staticmethod
|
||||
async def btc(cfg, daemon, backend, cfg_override):
|
||||
rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
|
||||
rpc = await rpc_init(cfg, daemon.proto, backend=backend, daemon=daemon)
|
||||
do_msg(rpc, backend)
|
||||
|
||||
wi = await rpc.walletinfo
|
||||
|
|
@ -106,7 +106,7 @@ class init_test:
|
|||
|
||||
@staticmethod
|
||||
async def bch(cfg, daemon, backend, cfg_override):
|
||||
rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
|
||||
rpc = await rpc_init(cfg, daemon.proto, backend=backend, daemon=daemon)
|
||||
do_msg(rpc, backend)
|
||||
return rpc
|
||||
|
||||
|
|
@ -114,7 +114,7 @@ class init_test:
|
|||
|
||||
@staticmethod
|
||||
async def eth(cfg, daemon, backend, cfg_override):
|
||||
rpc = await rpc_init(cfg, daemon.proto, backend, daemon)
|
||||
rpc = await rpc_init(cfg, daemon.proto, backend=backend, daemon=daemon)
|
||||
do_msg(rpc, backend)
|
||||
await rpc.call('eth_blockNumber', timeout=300)
|
||||
if rpc.proto.network == 'testnet':
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ class unit_tests:
|
|||
|
||||
async def newtx(self, name, ut):
|
||||
qmsg(' Testing NewTX initializer')
|
||||
d = CoinDaemon(cfg, 'btc', test_suite=True)
|
||||
d = CoinDaemon(cfg, network_id='btc', test_suite=True)
|
||||
d.start()
|
||||
|
||||
proto = init_proto(cfg, 'btc', need_amt=True)
|
||||
|
|
|
|||
|
|
@ -322,7 +322,7 @@ def do_ab_test(proto, scfg, addr_type, gen1, kg2, ag, tool, cache_data):
|
|||
for _ in range(scfg.rounds):
|
||||
yield getrand(32)
|
||||
|
||||
kg1 = KeyGenerator(cfg, proto, addr_type.pubkey_type, gen1)
|
||||
kg1 = KeyGenerator(cfg, proto, addr_type.pubkey_type, backend=gen1)
|
||||
if type(kg1) == type(kg2):
|
||||
die(4, 'Key generators are the same!')
|
||||
|
||||
|
|
@ -378,7 +378,7 @@ def ab_test(proto, scfg):
|
|||
|
||||
if scfg.gen2:
|
||||
assert scfg.gen1 != 'all', "'all' must be used only with external tool"
|
||||
kg2 = KeyGenerator(cfg, proto, addr_type.pubkey_type, scfg.gen2)
|
||||
kg2 = KeyGenerator(cfg, proto, addr_type.pubkey_type, backend=scfg.gen2)
|
||||
tool = None
|
||||
else:
|
||||
toolname = find_or_check_tool(proto, addr_type, scfg.tool)
|
||||
|
|
@ -541,7 +541,7 @@ def main():
|
|||
for p in protos:
|
||||
ab_test(p, scfg)
|
||||
else:
|
||||
kg = KeyGenerator(cfg, proto, addr_type.pubkey_type, scfg.gen1)
|
||||
kg = KeyGenerator(cfg, proto, addr_type.pubkey_type, backend=scfg.gen1)
|
||||
ag = AddrGenerator(cfg, proto, addr_type)
|
||||
if scfg.test == 'speed':
|
||||
speed_test(proto, kg, ag, scfg.rounds)
|
||||
|
|
|
|||
|
|
@ -307,7 +307,7 @@ def test_daemons_ops(*network_ids, op, remove_datadir=False):
|
|||
silent = not (cfg.verbose or cfg.exact_output)
|
||||
ret = False
|
||||
for network_id in network_ids:
|
||||
d = CoinDaemon(cfg, network_id, test_suite=True)
|
||||
d = CoinDaemon(cfg, network_id=network_id, test_suite=True)
|
||||
if remove_datadir:
|
||||
d.wait = True
|
||||
d.stop(silent=True)
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ class unit_tests:
|
|||
|
||||
coin_type1 = purpose.derive_private()
|
||||
|
||||
coin_type2 = m.to_coin_type('btc', addr_type='bech32')
|
||||
coin_type2 = m.to_coin_type(coin='btc', addr_type='bech32')
|
||||
assert coin_type1.address == coin_type2.address
|
||||
vmsg(f' {coin_type1.address=}')
|
||||
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ def do_test(proto, wif, addr_chk, addr_type, internal_keccak):
|
|||
|
||||
for n, backend in enumerate(get_backends(at.pubkey_type)):
|
||||
|
||||
kg = KeyGenerator( cfg, proto, at.pubkey_type, n+1)
|
||||
kg = KeyGenerator(cfg, proto, at.pubkey_type, silent=n+1)
|
||||
qmsg(blue(f' Testing backend {backend!r} for addr type {addr_type!r}{add_msg}'))
|
||||
|
||||
data = kg.gen_data(privkey)
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class unit_tests:
|
|||
vmsg(brown(' vectors:'))
|
||||
vmsg(fs.format('REL_NOW', 'SHOW_SECS', 'ELAPSED', 'OUTPUT'))
|
||||
for (t, now, rel_now, show_secs, out_chk) in vectors:
|
||||
out = format_elapsed_hr(t, now, rel_now=rel_now, show_secs=show_secs)
|
||||
out = format_elapsed_hr(t, now=now, rel_now=rel_now, show_secs=show_secs)
|
||||
assert out == out_chk, f'{out} != {out_chk}'
|
||||
vmsg(fs.format(repr(rel_now), repr(show_secs), now-t, out))
|
||||
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue