use keyword-only parameters where practicable (103 files changed)
This commit is contained in:
parent
168e6bbc56
commit
70022b04b2
103 changed files with 307 additions and 279 deletions
|
|
@ -54,7 +54,7 @@ class MMGenAddrType(HiliteStr, InitErrors, MMGenObject):
|
|||
'Z': ati('zcash_z','zcash_z',False,'zcash_z', 'zcash_z', 'wif', ('viewkey',), 'Zcash z-address'),
|
||||
'M': ati('monero', 'monero', False,'monero', 'monero', 'spendkey',('viewkey','wallet_passwd'),'Monero address'),
|
||||
}
|
||||
def __new__(cls, proto, id_str, errmsg=None):
|
||||
def __new__(cls, proto, id_str, *, errmsg=None):
|
||||
if isinstance(id_str, cls):
|
||||
return id_str
|
||||
try:
|
||||
|
|
@ -99,7 +99,7 @@ class AddrListID(HiliteStr, InitErrors, MMGenObject):
|
|||
width = 10
|
||||
trunc_ok = False
|
||||
color = 'yellow'
|
||||
def __new__(cls, sid=None, mmtype=None, proto=None, id_str=None):
|
||||
def __new__(cls, *, sid=None, mmtype=None, proto=None, id_str=None):
|
||||
try:
|
||||
if id_str:
|
||||
a, b = id_str.split(':')
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ from .addrlist import AddrListEntry, AddrListData, AddrList
|
|||
|
||||
class AddrData(MMGenObject):
|
||||
|
||||
def __init__(self, proto, *args, **kwargs):
|
||||
def __init__(self, proto):
|
||||
self.al_ids = {}
|
||||
self.proto = proto
|
||||
self.rpc = None
|
||||
|
|
@ -68,10 +68,10 @@ class AddrData(MMGenObject):
|
|||
|
||||
class TwAddrData(AddrData, metaclass=AsyncInit):
|
||||
|
||||
def __new__(cls, cfg, proto, *args, **kwargs):
|
||||
def __new__(cls, cfg, proto, *, twctl=None):
|
||||
return MMGenObject.__new__(proto.base_proto_subclass(cls, 'addrdata'))
|
||||
|
||||
async def __init__(self, cfg, proto, twctl=None):
|
||||
async def __init__(self, cfg, proto, *, twctl=None):
|
||||
from .rpc import rpc_init
|
||||
from .tw.shared import TwLabel
|
||||
from .seed import SeedID
|
||||
|
|
@ -79,7 +79,7 @@ class TwAddrData(AddrData, metaclass=AsyncInit):
|
|||
self.proto = proto
|
||||
self.rpc = await rpc_init(cfg, proto)
|
||||
self.al_ids = {}
|
||||
twd = await self.get_tw_data(twctl)
|
||||
twd = await self.get_tw_data(twctl=twctl)
|
||||
out, i = {}, 0
|
||||
for acct, addr_array in twd:
|
||||
l = get_obj(TwLabel, proto=self.proto, text=acct, silent=True)
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ class AddrIdxList(tuple, InitErrors, MMGenObject):
|
|||
|
||||
max_len = 1000000
|
||||
|
||||
def __new__(cls, fmt_str=None, idx_list=None, sep=','):
|
||||
def __new__(cls, *, fmt_str=None, idx_list=None, sep=','):
|
||||
try:
|
||||
if fmt_str:
|
||||
def gen():
|
||||
|
|
@ -103,7 +103,7 @@ class AddrListIDStr(HiliteStr):
|
|||
color = 'green'
|
||||
trunc_ok = False
|
||||
|
||||
def __new__(cls, addrlist, fmt_str=None):
|
||||
def __new__(cls, addrlist, *, fmt_str=None):
|
||||
idxs = [e.idx for e in addrlist.data]
|
||||
prev = idxs[0]
|
||||
ret = [prev]
|
||||
|
|
@ -188,7 +188,7 @@ class AddrList(MMGenObject): # Address info for a single seed ID
|
|||
src = 'gen'
|
||||
adata = self.generate(
|
||||
seed,
|
||||
addr_idxs if isinstance(addr_idxs, AddrIdxList) else AddrIdxList(addr_idxs))
|
||||
addr_idxs if isinstance(addr_idxs, AddrIdxList) else AddrIdxList(fmt_str=addr_idxs))
|
||||
do_chksum = True
|
||||
elif infile: # data from MMGen address file
|
||||
self.infile = infile
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ altcoin.util: various altcoin-related utilities
|
|||
|
||||
from ..util import die
|
||||
|
||||
def decrypt_keystore(data, passwd, mac_algo=None, mac_params={}):
|
||||
def decrypt_keystore(data, passwd, *, mac_algo=None, mac_params={}):
|
||||
"""
|
||||
Decrypt the encrypted data in a cross-chain keystore
|
||||
Returns the decrypted data as a bytestring
|
||||
|
|
|
|||
|
|
@ -459,7 +459,7 @@ class Autosign:
|
|||
def init_fixup(self): # see test/overlay/fakemods/mmgen/autosign.py
|
||||
pass
|
||||
|
||||
def __init__(self, cfg, cmd=None):
|
||||
def __init__(self, cfg, *, cmd=None):
|
||||
|
||||
if cfg.mnemonic_fmt:
|
||||
if cfg.mnemonic_fmt not in self.mn_fmts:
|
||||
|
|
@ -728,7 +728,7 @@ class Autosign:
|
|||
def _get_macOS_ramdisk_size(self):
|
||||
from .platform.darwin.util import MacOSRamDisk, warn_ramdisk_too_small
|
||||
# allow 1MB for each Monero wallet
|
||||
xmr_size = len(AddrIdxList(self.cfg.xmrwallets)) if self.cfg.xmrwallets else 0
|
||||
xmr_size = len(AddrIdxList(fmt_str=self.cfg.xmrwallets)) if self.cfg.xmrwallets else 0
|
||||
calc_size = xmr_size + 1
|
||||
usr_size = self.cfg.macos_ramdisk_size or self.cfg.macos_autosign_ramdisk_size
|
||||
if is_int(usr_size):
|
||||
|
|
|
|||
|
|
@ -113,7 +113,7 @@ class baseconv:
|
|||
die(3, 'ERROR: List is not sorted!')
|
||||
|
||||
@staticmethod
|
||||
def get_pad(pad, seed_pad_func):
|
||||
def get_pad(pad, /, seed_pad_func):
|
||||
"""
|
||||
'pad' argument to baseconv conversion methods must be either None, 'seed' or an integer.
|
||||
If None, output of minimum (but never zero) length will be produced.
|
||||
|
|
@ -129,11 +129,11 @@ class baseconv:
|
|||
else:
|
||||
die('BaseConversionPadError', f"{pad!r}: illegal value for 'pad' (must be None, 'seed' or int)")
|
||||
|
||||
def tohex(self, words_arg, *, pad=None):
|
||||
def tohex(self, words_arg, /, *, pad=None):
|
||||
"convert string or list data of instance base to a hexadecimal string"
|
||||
return self.tobytes(words_arg, pad=pad//2 if type(pad) is int else pad).hex()
|
||||
|
||||
def tobytes(self, words_arg, *, pad=None):
|
||||
def tobytes(self, words_arg, /, *, pad=None):
|
||||
"convert string or list data of instance base to byte string"
|
||||
|
||||
words = words_arg if isinstance(words_arg, (list, tuple)) else tuple(words_arg.strip())
|
||||
|
|
@ -163,7 +163,7 @@ class baseconv:
|
|||
bl = ret.bit_length()
|
||||
return ret.to_bytes(max(pad_val, bl//8+bool(bl%8)), 'big')
|
||||
|
||||
def fromhex(self, hexstr, *, pad=None, tostr=False):
|
||||
def fromhex(self, hexstr, /, *, pad=None, tostr=False):
|
||||
"convert a hexadecimal string to a list or string data of instance base"
|
||||
|
||||
from .util import is_hex_str
|
||||
|
|
@ -174,7 +174,7 @@ class baseconv:
|
|||
|
||||
return self.frombytes(bytes.fromhex(hexstr), pad=pad, tostr=tostr)
|
||||
|
||||
def frombytes(self, bytestr, *, pad=None, tostr=False):
|
||||
def frombytes(self, bytestr, /, *, pad=None, tostr=False):
|
||||
"convert byte string to list or string data of instance base"
|
||||
|
||||
if not bytestr:
|
||||
|
|
|
|||
|
|
@ -54,24 +54,24 @@ class bip39(baseconv):
|
|||
self.wl_id = 'bip39'
|
||||
|
||||
@classmethod
|
||||
def nwords2seedlen(cls, nwords, *, in_bytes=False, in_hex=False):
|
||||
def nwords2seedlen(cls, nwords, /, *, in_bytes=False, in_hex=False):
|
||||
for k, v in cls.constants.items():
|
||||
if v.mn_len == nwords:
|
||||
return k//8 if in_bytes else k//4 if in_hex else k
|
||||
die('MnemonicError', f'{nwords!r}: invalid word length for BIP39 mnemonic')
|
||||
|
||||
@classmethod
|
||||
def seedlen2nwords(cls, seed_len, *, in_bytes=False, in_hex=False):
|
||||
def seedlen2nwords(cls, seed_len, /, *, in_bytes=False, in_hex=False):
|
||||
seed_bits = seed_len * 8 if in_bytes else seed_len * 4 if in_hex else seed_len
|
||||
try:
|
||||
return cls.constants[seed_bits].mn_len
|
||||
except Exception as e:
|
||||
raise ValueError(f'{seed_bits!r}: invalid seed length for BIP39 mnemonic') from e
|
||||
|
||||
def tohex(self, words_arg, pad=None):
|
||||
def tohex(self, words_arg, /, *, pad=None):
|
||||
return self.tobytes(words_arg, pad=pad).hex()
|
||||
|
||||
def tobytes(self, words_arg, pad=None):
|
||||
def tobytes(self, words_arg, /, *, pad=None):
|
||||
assert isinstance(words_arg, (list, tuple)), 'words_arg must be list or tuple'
|
||||
assert pad in (None, 'seed'), f"{pad}: invalid 'pad' argument (must be None or 'seed')"
|
||||
|
||||
|
|
@ -105,11 +105,11 @@ class bip39(baseconv):
|
|||
|
||||
return seed_bytes
|
||||
|
||||
def fromhex(self, hexstr, *, pad=None, tostr=False):
|
||||
def fromhex(self, hexstr, /, *, pad=None, tostr=False):
|
||||
assert is_hex_str(hexstr), 'seed data not a hexadecimal string'
|
||||
return self.frombytes(bytes.fromhex(hexstr), pad=pad, tostr=tostr)
|
||||
|
||||
def frombytes(self, seed_bytes, *, pad=None, tostr=False):
|
||||
def frombytes(self, seed_bytes, /, *, pad=None, tostr=False):
|
||||
assert tostr is False, "'tostr' must be False for 'bip39'"
|
||||
assert pad in (None, 'seed'), f"{pad}: invalid 'pad' argument (must be None or 'seed')"
|
||||
|
||||
|
|
@ -128,7 +128,7 @@ class bip39(baseconv):
|
|||
|
||||
return tuple(wl[int(res[i*11:(i+1)*11], 2)] for i in range(c.mn_len))
|
||||
|
||||
def generate_seed(self, words_arg, passwd=''):
|
||||
def generate_seed(self, words_arg, /, *, passwd=''):
|
||||
|
||||
self.tohex(words_arg) # validate
|
||||
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ class BipHDConfig(Lockable):
|
|||
|
||||
supported_coins = ('btc', 'eth', 'doge', 'ltc', 'bch')
|
||||
|
||||
def __init__(self, base_cfg, coin, network, addr_type, from_path, no_path_checks):
|
||||
def __init__(self, base_cfg, coin, *, network, addr_type, from_path, no_path_checks):
|
||||
|
||||
if not coin.lower() in self.supported_coins:
|
||||
raise ValueError(f'bip_hd: coin {coin.upper()} not supported')
|
||||
|
|
@ -196,10 +196,10 @@ class MasterNode(Lockable):
|
|||
new.cfg = BipHDConfig(
|
||||
self.base_cfg,
|
||||
coin,
|
||||
network,
|
||||
addr_type,
|
||||
from_path,
|
||||
no_path_checks)
|
||||
network = network,
|
||||
addr_type = addr_type,
|
||||
from_path = from_path,
|
||||
no_path_checks = no_path_checks)
|
||||
new.par_print = self.par_print
|
||||
new.depth = self.depth
|
||||
new.key = self.key
|
||||
|
|
@ -237,7 +237,7 @@ class BipHDNode(Lockable):
|
|||
'None' if getattr(cls, name) is None else f'None or {getattr(cls, name)}')
|
||||
)
|
||||
|
||||
def set_params(self, cfg, idx, hardened):
|
||||
def set_params(self, cfg, idx, *, hardened):
|
||||
self.check_param('idx', idx)
|
||||
self.check_param('hardened', hardened)
|
||||
return (
|
||||
|
|
@ -323,7 +323,7 @@ class BipHDNode(Lockable):
|
|||
def derive_private(self, idx=None, hardened=None):
|
||||
return self.derive(idx=idx, hardened=hardened, public=False)
|
||||
|
||||
def derive(self, idx, hardened, public):
|
||||
def derive(self, idx, *, hardened, public):
|
||||
|
||||
if self.public and not public:
|
||||
raise ValueError('cannot derive private node from public node!')
|
||||
|
|
@ -341,7 +341,7 @@ class BipHDNode(Lockable):
|
|||
if new.public and type(new).hardened:
|
||||
raise ValueError(
|
||||
f'‘public’ requested, but node of depth {new.depth} ({new.desc}) must be hardened!')
|
||||
new.idx, new.hardened = new.set_params(new.cfg, idx, hardened)
|
||||
new.idx, new.hardened = new.set_params(new.cfg, idx, hardened=hardened)
|
||||
|
||||
key_in = b'\x00' + self.key if new.hardened else self.pubkey_bytes
|
||||
|
||||
|
|
@ -398,14 +398,14 @@ class BipHDNode(Lockable):
|
|||
if not is_int(idx):
|
||||
raise ValueError(f'invalid path component {s!r}')
|
||||
|
||||
res = res.derive(int(idx), hardened, public=False)
|
||||
res = res.derive(int(idx), hardened=hardened, public=False)
|
||||
|
||||
return res
|
||||
|
||||
@staticmethod
|
||||
# ‘addr_type’ is required for broken coins with duplicate version bytes across BIP protocols
|
||||
# (i.e. Dogecoin)
|
||||
def from_extended_key(base_cfg, coin, xkey_b58, addr_type=None):
|
||||
def from_extended_key(base_cfg, coin, xkey_b58, *, addr_type=None):
|
||||
xk = Bip32ExtendedKey(xkey_b58)
|
||||
|
||||
if xk.public:
|
||||
|
|
@ -424,10 +424,10 @@ class BipHDNode(Lockable):
|
|||
new.cfg = BipHDConfig(
|
||||
base_cfg,
|
||||
coin,
|
||||
xk.network,
|
||||
addr_type or addr_types[xk.bip_proto],
|
||||
False,
|
||||
False)
|
||||
network = xk.network,
|
||||
addr_type = addr_type or addr_types[xk.bip_proto],
|
||||
from_path = False,
|
||||
no_path_checks = False)
|
||||
|
||||
new.par_print = xk.par_print
|
||||
new.depth = xk.depth
|
||||
|
|
@ -460,7 +460,7 @@ class BipHDNodePurpose(BipHDNode):
|
|||
desc = 'Purpose'
|
||||
hardened = True
|
||||
|
||||
def set_params(self, cfg, idx, hardened):
|
||||
def set_params(self, cfg, idx, *, hardened):
|
||||
self.check_param('hardened', hardened)
|
||||
if idx not in (None, cfg.bip_proto):
|
||||
raise ValueError(
|
||||
|
|
@ -472,7 +472,7 @@ class BipHDNodeCoinType(BipHDNode):
|
|||
desc = 'Coin Type'
|
||||
hardened = True
|
||||
|
||||
def set_params(self, cfg, idx, hardened):
|
||||
def set_params(self, cfg, idx, *, hardened):
|
||||
self.check_param('hardened', hardened)
|
||||
chain_idx = get_chain_params(
|
||||
bipnum = get_bip_by_addr_type(cfg.addr_type),
|
||||
|
|
@ -498,7 +498,7 @@ class BipHDNodeChain(BipHDNode):
|
|||
desc = 'Chain'
|
||||
hardened = False
|
||||
|
||||
def set_params(self, cfg, idx, hardened):
|
||||
def set_params(self, cfg, idx, *, hardened):
|
||||
self.check_param('hardened', hardened)
|
||||
if idx not in (0, 1):
|
||||
raise ValueError(
|
||||
|
|
|
|||
12
mmgen/cfg.py
12
mmgen/cfg.py
|
|
@ -99,7 +99,7 @@ class GlobalConstants(Lockable):
|
|||
else:
|
||||
die2(2, '$HOME is not set! Unable to determine home directory')
|
||||
|
||||
def get_mmgen_data_file(self, filename, package='mmgen'):
|
||||
def get_mmgen_data_file(self, *, filename, package='mmgen'):
|
||||
"""
|
||||
this is an expensive import, so do only when required
|
||||
"""
|
||||
|
|
@ -522,7 +522,7 @@ class Config(Lockable):
|
|||
# Step 4: set cfg from cfgfile, skipping already-set opts and auto opts; save set opts and auto
|
||||
# opts to be set:
|
||||
# requires ‘data_dir_root’, ‘test_suite_cfgtest’
|
||||
self._cfgfile_opts = self._set_cfg_from_cfg_file(self._envopts, need_proto)
|
||||
self._cfgfile_opts = self._set_cfg_from_cfg_file(self._envopts, need_proto=need_proto)
|
||||
|
||||
# Step 5: set autoset opts from user-supplied data, cfgfile data, or default values, in that order:
|
||||
self._set_autoset_opts(self._cfgfile_opts.autoset)
|
||||
|
|
@ -615,7 +615,7 @@ class Config(Lockable):
|
|||
else:
|
||||
raise ValueError(f'{name!r} is not a valid MMGen environment variable')
|
||||
|
||||
def _set_cfg_from_cfg_file(self, env_cfg, need_proto):
|
||||
def _set_cfg_from_cfg_file(self, env_cfg, *, need_proto):
|
||||
|
||||
_ret = namedtuple('cfgfile_opts', ['non_auto', 'autoset', 'auto_typeset'])
|
||||
|
||||
|
|
@ -761,7 +761,7 @@ def check_opts(cfg): # Raises exception if any check fails
|
|||
+ (f' in {cfg._cfgfile_fn!r}' if name in cfg._cfgfile_opts.non_auto else '')
|
||||
)
|
||||
|
||||
def display_opt(name, val='', beg='For selected', end=':\n'):
|
||||
def display_opt(name, val='', *, beg='For selected', end=':\n'):
|
||||
from .util import msg_r
|
||||
msg_r('{} option {!r}{}'.format(
|
||||
beg,
|
||||
|
|
@ -779,11 +779,11 @@ def check_opts(cfg): # Raises exception if any check fails
|
|||
}[op_str](val, target):
|
||||
die('UserOptError', f'{val}: invalid {get_desc()} (not {op_str} {target})')
|
||||
|
||||
def opt_is_int(val, desc_pfx=''):
|
||||
def opt_is_int(val, *, desc_pfx=''):
|
||||
if not is_int(val):
|
||||
die('UserOptError', f'{val!r}: invalid {get_desc(desc_pfx)} (not an integer)')
|
||||
|
||||
def opt_is_in_list(val, tlist, desc_pfx=''):
|
||||
def opt_is_in_list(val, tlist, *, desc_pfx=''):
|
||||
if val not in tlist:
|
||||
q, sep = (('', ','), ("'", "','"))[isinstance(tlist[0], str)]
|
||||
die('UserOptError', '{q}{v}{q}: invalid {w}\nValid choices: {q}{o}{q}'.format(
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ class CfgFileSampleSys(cfg_file_sample):
|
|||
else:
|
||||
# self.fn is used for error msgs only, so file need not exist on filesystem
|
||||
self.fn = os.path.join(os.path.dirname(__file__), 'data', self.fn_base)
|
||||
self.data = gc.get_mmgen_data_file(self.fn_base).splitlines()
|
||||
self.data = gc.get_mmgen_data_file(filename=self.fn_base).splitlines()
|
||||
|
||||
def make_metadata(self):
|
||||
return [f'# Version {self.cur_ver} {self.computed_chksum}']
|
||||
|
|
|
|||
|
|
@ -89,10 +89,10 @@ class Crypto:
|
|||
msg(f'Seed: {seed.hex()!r}\nScramble key: {scramble_key}\nScrambled seed: {step1.hex()}\n')
|
||||
return self.sha256_rounds(step1)
|
||||
|
||||
def encrypt_seed(self, data, key, desc='seed'):
|
||||
def encrypt_seed(self, data, key, *, desc='seed'):
|
||||
return self.encrypt_data(data, key=key, desc=desc)
|
||||
|
||||
def decrypt_seed(self, enc_seed, key, seed_id, key_id):
|
||||
def decrypt_seed(self, enc_seed, key, *, seed_id, key_id):
|
||||
self.util.vmsg_r('Checking key...')
|
||||
chk1 = make_chksum_8(key)
|
||||
if key_id:
|
||||
|
|
@ -152,6 +152,7 @@ class Crypto:
|
|||
self,
|
||||
enc_data,
|
||||
key,
|
||||
*,
|
||||
iv = aesctr_dfl_iv,
|
||||
desc = 'data'):
|
||||
|
||||
|
|
@ -167,6 +168,7 @@ class Crypto:
|
|||
passwd,
|
||||
salt,
|
||||
hash_preset,
|
||||
*,
|
||||
buflen = 32):
|
||||
|
||||
# Buflen arg is for brainwallets only, which use this function to generate
|
||||
|
|
@ -228,7 +230,7 @@ class Crypto:
|
|||
self.util.dmsg(f'Key: {key.hex()}')
|
||||
return key
|
||||
|
||||
def _get_random_data_from_user(self, uchars=None, desc='data'):
|
||||
def _get_random_data_from_user(self, uchars=None, *, desc='data'):
|
||||
|
||||
if uchars is None:
|
||||
uchars = self.cfg.usr_randchars
|
||||
|
|
@ -297,6 +299,7 @@ class Crypto:
|
|||
def add_user_random(
|
||||
self,
|
||||
rand_bytes,
|
||||
*,
|
||||
desc,
|
||||
urand = {'data':b'', 'counter':0}):
|
||||
|
||||
|
|
@ -329,6 +332,7 @@ class Crypto:
|
|||
def get_hash_preset_from_user(
|
||||
self,
|
||||
old_preset = gc.dfl_hash_preset,
|
||||
*,
|
||||
data_desc = 'data',
|
||||
prompt = None):
|
||||
|
||||
|
|
@ -347,7 +351,7 @@ class Crypto:
|
|||
else:
|
||||
return old_preset
|
||||
|
||||
def get_new_passphrase(self, data_desc, hash_preset, passwd_file, pw_desc='passphrase'):
|
||||
def get_new_passphrase(self, data_desc, hash_preset, passwd_file, *, pw_desc='passphrase'):
|
||||
message = f"""
|
||||
You must choose a passphrase to encrypt your {data_desc} with.
|
||||
A key will be generated from your passphrase using a hash preset of '{hash_preset}'.
|
||||
|
|
@ -383,7 +387,7 @@ class Crypto:
|
|||
|
||||
return pw
|
||||
|
||||
def get_passphrase(self, data_desc, passwd_file, pw_desc='passphrase'):
|
||||
def get_passphrase(self, data_desc, passwd_file, *, pw_desc='passphrase'):
|
||||
if passwd_file:
|
||||
from .fileutil import get_words_from_file
|
||||
return ' '.join(get_words_from_file(
|
||||
|
|
@ -395,7 +399,7 @@ class Crypto:
|
|||
from .ui import get_words_from_user
|
||||
return ' '.join(get_words_from_user(self.cfg, f'Enter {pw_desc} for {data_desc}: '))
|
||||
|
||||
def mmgen_encrypt(self, data, desc='data', hash_preset=None):
|
||||
def mmgen_encrypt(self, data, *, desc='data', hash_preset=None):
|
||||
salt = self.get_random(self.mmenc_salt_len)
|
||||
iv = self.get_random(self.aesctr_iv_len)
|
||||
nonce = self.get_random(self.mmenc_nonce_len)
|
||||
|
|
@ -412,7 +416,7 @@ class Crypto:
|
|||
enc_d = self.encrypt_data(sha256(nonce+data).digest() + nonce + data, key=key, iv=iv, desc=desc)
|
||||
return salt+iv+enc_d
|
||||
|
||||
def mmgen_decrypt(self, data, desc='data', hash_preset=None):
|
||||
def mmgen_decrypt(self, data, *, desc='data', hash_preset=None):
|
||||
self.util.vmsg(f'Preparing to decrypt {desc}')
|
||||
dstart = self.mmenc_salt_len + self.aesctr_iv_len
|
||||
salt = data[:self.mmenc_salt_len]
|
||||
|
|
@ -425,7 +429,7 @@ class Crypto:
|
|||
data_desc = desc,
|
||||
passwd_file = self.cfg.passwd_file)
|
||||
key = self.make_key(passwd, salt, hp)
|
||||
dec_d = self.decrypt_data(enc_d, key, iv, desc)
|
||||
dec_d = self.decrypt_data(enc_d, key, iv=iv, desc=desc)
|
||||
sha256_len = 32
|
||||
from hashlib import sha256
|
||||
if dec_d[:sha256_len] == sha256(dec_d[sha256_len:]).digest():
|
||||
|
|
@ -435,9 +439,9 @@ class Crypto:
|
|||
msg('Incorrect passphrase or hash preset')
|
||||
return False
|
||||
|
||||
def mmgen_decrypt_retry(self, d, desc='data'):
|
||||
def mmgen_decrypt_retry(self, d, *, desc='data'):
|
||||
while True:
|
||||
d_dec = self.mmgen_decrypt(d, desc)
|
||||
d_dec = self.mmgen_decrypt(d, desc=desc)
|
||||
if d_dec:
|
||||
return d_dec
|
||||
msg('Trying again...')
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class Daemon(Lockable):
|
|||
_reset_ok = ('debug', 'wait', 'pids')
|
||||
version_info_arg = '--version'
|
||||
|
||||
def __init__(self, cfg, opts=None, flags=None):
|
||||
def __init__(self, cfg, *, opts=None, flags=None):
|
||||
|
||||
self.cfg = cfg
|
||||
self.platform = sys.platform
|
||||
|
|
@ -78,7 +78,7 @@ class Daemon(Lockable):
|
|||
p = Popen(cmd, creationflags=CREATE_NEW_CONSOLE, startupinfo=si)
|
||||
p.wait()
|
||||
|
||||
def exec_cmd(self, cmd, is_daemon=False, check_retcode=False):
|
||||
def exec_cmd(self, cmd, *, is_daemon=False, check_retcode=False):
|
||||
out = (PIPE, None)[is_daemon and self.opt.no_daemonize]
|
||||
try:
|
||||
cp = run(cmd, check=False, stdout=out, stderr=out)
|
||||
|
|
@ -105,7 +105,7 @@ class Daemon(Lockable):
|
|||
if self.use_threads and is_daemon and not self.opt.no_daemonize:
|
||||
ret = self.exec_cmd_thread(cmd)
|
||||
else:
|
||||
ret = self.exec_cmd(cmd, is_daemon, check_retcode)
|
||||
ret = self.exec_cmd(cmd, is_daemon=is_daemon, check_retcode=check_retcode)
|
||||
|
||||
if isinstance(ret, CompletedProcess):
|
||||
if ret.stdout and (self.debug or not silent):
|
||||
|
|
@ -166,7 +166,7 @@ class Daemon(Lockable):
|
|||
def cli(self, *cmds, silent=False):
|
||||
return self.run_cmd(self.cli_cmd(*cmds), silent=silent)
|
||||
|
||||
def state_msg(self, extra_text=None):
|
||||
def state_msg(self, *, extra_text=None):
|
||||
try:
|
||||
pid = self.pid
|
||||
except:
|
||||
|
|
@ -225,7 +225,7 @@ class Daemon(Lockable):
|
|||
self.stop(silent=silent)
|
||||
return self.start(silent=silent)
|
||||
|
||||
def test_socket(self, host, port, timeout=10):
|
||||
def test_socket(self, host, port, *, timeout=10):
|
||||
import socket
|
||||
try:
|
||||
socket.create_connection((host, port), timeout=timeout).close()
|
||||
|
|
@ -258,7 +258,7 @@ class RPCDaemon(Daemon):
|
|||
|
||||
avail_opts = ('no_daemonize',)
|
||||
|
||||
def __init__(self, cfg, opts=None, flags=None):
|
||||
def __init__(self, cfg, *, opts=None, flags=None):
|
||||
super().__init__(cfg, opts=opts, flags=flags)
|
||||
self.desc = '{} {} {}RPC daemon'.format(
|
||||
self.rpc_type,
|
||||
|
|
@ -317,7 +317,7 @@ class CoinDaemon(Daemon):
|
|||
return ret
|
||||
|
||||
@classmethod
|
||||
def get_daemon(cls, cfg, coin, daemon_id, proto=None):
|
||||
def get_daemon(cls, cfg, coin, daemon_id, *, proto=None):
|
||||
if proto:
|
||||
proto_cls = type(proto)
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -341,7 +341,7 @@ def get_lines_from_file(
|
|||
if have_enc_ext or not is_utf8(data):
|
||||
m = ('Attempting to decrypt', 'Decrypting')[have_enc_ext]
|
||||
cfg._util.qmsg(f'{m} {desc} ‘{fn}’')
|
||||
data = Crypto(cfg).mmgen_decrypt_retry(data, desc)
|
||||
data = Crypto(cfg).mmgen_decrypt_retry(data, desc=desc)
|
||||
return data
|
||||
|
||||
lines = decrypt_file_maybe().decode().splitlines()
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ class PrivKey(bytes, InitErrors, MMGenObject):
|
|||
wif = ImmutableAttr(WifKey, typeconv=False)
|
||||
|
||||
# initialize with (priv_bin, compressed), WIF or self
|
||||
def __new__(cls, proto, s=None, compressed=None, wif=None, pubkey_type=None):
|
||||
def __new__(cls, proto, s=None, *, compressed=None, wif=None, pubkey_type=None):
|
||||
if isinstance(s, cls):
|
||||
return s
|
||||
if wif:
|
||||
|
|
@ -103,7 +103,7 @@ class PrivKey(bytes, InitErrors, MMGenObject):
|
|||
assert type(compressed) is bool, (
|
||||
f"'compressed' must be of type bool, not {type(compressed).__name__}")
|
||||
me = bytes.__new__(cls, proto.preprocess_key(s, pubkey_type))
|
||||
me.wif = WifKey(proto, proto.encode_wif(me, pubkey_type, compressed))
|
||||
me.wif = WifKey(proto, proto.encode_wif(me, pubkey_type, compressed=compressed))
|
||||
me.compressed = compressed
|
||||
me.pubkey_type = pubkey_type
|
||||
me.orig_bytes = s # save the non-preprocessed key
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ def get_pubkey_type_cls(pubkey_type):
|
|||
importlib.import_module(f'mmgen.proto.{backend_data[pubkey_type]["package"]}.keygen'),
|
||||
'backend')
|
||||
|
||||
def _check_backend(cfg, backend, pubkey_type, desc='keygen backend'):
|
||||
def _check_backend(cfg, backend, pubkey_type, *, desc='keygen backend'):
|
||||
|
||||
from .util import is_int, die
|
||||
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ class LEDControl:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
name,
|
||||
control,
|
||||
trigger = None,
|
||||
|
|
@ -125,7 +126,7 @@ class LEDControl:
|
|||
))
|
||||
sys.exit(1)
|
||||
|
||||
def init_state(fn, desc, init_val=None):
|
||||
def init_state(fn, *, desc, init_val=None):
|
||||
try:
|
||||
write_init_val(fn, init_val)
|
||||
except PermissionError:
|
||||
|
|
|
|||
|
|
@ -211,7 +211,7 @@ if cmd not in ('sign', 'wait'):
|
|||
if getattr(cfg, opt):
|
||||
die(1, f'--{opt.replace("_", "-")} makes no sense for the ‘{cmd}’ operation')
|
||||
|
||||
asi = Autosign(cfg, cmd)
|
||||
asi = Autosign(cfg, cmd=cmd)
|
||||
|
||||
cfg._post_init()
|
||||
|
||||
|
|
|
|||
|
|
@ -61,13 +61,13 @@ class MsgOps:
|
|||
|
||||
class verify(sign):
|
||||
|
||||
async def __init__(self, msgfile, addr=None):
|
||||
async def __init__(self, msgfile, *, addr=None):
|
||||
try:
|
||||
m = SignedOnlineMsg(cfg, infile=msgfile)
|
||||
except:
|
||||
m = ExportedMsgSigs(cfg, infile=msgfile)
|
||||
|
||||
nSigs = await m.verify(addr)
|
||||
nSigs = await m.verify(addr=addr)
|
||||
|
||||
summary = f'{nSigs} signature{suf(nSigs)} verified'
|
||||
|
||||
|
|
@ -81,13 +81,13 @@ class MsgOps:
|
|||
|
||||
class export(sign):
|
||||
|
||||
async def __init__(self, msgfile, addr=None):
|
||||
async def __init__(self, msgfile, *, addr=None):
|
||||
|
||||
from .fileutil import write_data_to_file
|
||||
write_data_to_file(
|
||||
cfg = cfg,
|
||||
outfile = 'signatures.json',
|
||||
data = SignedOnlineMsg(cfg, infile=msgfile).get_json_for_export(addr),
|
||||
data = SignedOnlineMsg(cfg, infile=msgfile).get_json_for_export(addr=addr),
|
||||
desc = 'signature data')
|
||||
|
||||
opts_data = {
|
||||
|
|
@ -223,7 +223,7 @@ async def main():
|
|||
elif op in ('verify', 'export'):
|
||||
if len(cmd_args) not in (1, 2):
|
||||
cfg._usage()
|
||||
await getattr(MsgOps, op)(cmd_args[0], cmd_args[1] if len(cmd_args) == 2 else None)
|
||||
await getattr(MsgOps, op)(cmd_args[0], addr=cmd_args[1] if len(cmd_args) == 2 else None)
|
||||
else:
|
||||
die(1, f'{op!r}: unrecognized operation')
|
||||
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ async def main():
|
|||
kal = get_keyaddrlist(cfg, tx1.proto)
|
||||
kl = get_keylist(cfg)
|
||||
|
||||
tx2 = await txsign(cfg, tx1, seed_files, kl, kal, tx_num_disp)
|
||||
tx2 = await txsign(cfg, tx1, seed_files, kl, kal, tx_num_str=tx_num_disp)
|
||||
if tx2:
|
||||
if not cfg.yes:
|
||||
tx2.add_comment() # edits an existing comment
|
||||
|
|
|
|||
|
|
@ -143,7 +143,7 @@ elif op in ('export-outputs', 'export-outputs-sign', 'import-key-images'):
|
|||
else:
|
||||
die(1, f'{op!r}: unrecognized operation')
|
||||
|
||||
m = xmrwallet.op(op, cfg, infile, wallets, spec)
|
||||
m = xmrwallet.op(op, cfg, infile, wallets, spec=spec)
|
||||
|
||||
if asyncio.run(m.main()):
|
||||
m.post_main_success()
|
||||
|
|
|
|||
|
|
@ -261,7 +261,7 @@ class MnemonicEntry:
|
|||
self._usl = usl
|
||||
return self._usl
|
||||
|
||||
def idx(self, w, entry_mode, lo_idx=None, hi_idx=None):
|
||||
def idx(self, w, entry_mode, *, lo_idx=None, hi_idx=None):
|
||||
"""
|
||||
Return values:
|
||||
- all modes:
|
||||
|
|
@ -306,7 +306,7 @@ class MnemonicEntry:
|
|||
msg(' {}) {:8} {}'.format(
|
||||
n,
|
||||
mode.name + ':',
|
||||
fmt(mode.choose_info, ' '*14).lstrip().format(usl=self.uniq_ss_len),
|
||||
fmt(mode.choose_info, indent=' '*14).lstrip().format(usl=self.uniq_ss_len),
|
||||
))
|
||||
prompt = f'Type a number, or hit ENTER for the default ({capfirst(self.dfl_entry_mode)}): '
|
||||
erase = '\r' + ' ' * (len(prompt)+19) + '\r'
|
||||
|
|
@ -418,7 +418,7 @@ class MnemonicEntryMonero(MnemonicEntry):
|
|||
dfl_entry_mode = 'short'
|
||||
has_chksum = True
|
||||
|
||||
def mn_entry(cfg, wl_id, entry_mode=None):
|
||||
def mn_entry(cfg, wl_id, *, entry_mode=None):
|
||||
if wl_id == 'words':
|
||||
wl_id = 'mmgen'
|
||||
me = MnemonicEntry.get_cls_by_wordlist(wl_id)(cfg)
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ class MMGenIDRange(HiliteStr, InitErrors, MMGenObject):
|
|||
t = proto.addr_type((ss[1], proto.dfl_mmtype)[len(ss)==2])
|
||||
me = str.__new__(cls, '{}:{}:{}'.format(ss[0], t, ss[-1]))
|
||||
me.sid = SeedID(sid=ss[0])
|
||||
me.idxlist = AddrIdxList(ss[-1])
|
||||
me.idxlist = AddrIdxList(fmt_str=ss[-1])
|
||||
me.mmtype = t
|
||||
assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
|
||||
me.al_id = str.__new__(AddrListID, me.sid+':'+me.mmtype) # checks already done
|
||||
|
|
@ -208,7 +208,7 @@ class coin_msg:
|
|||
|
||||
class unsigned(completed):
|
||||
|
||||
async def sign(self, wallet_files, passwd_file=None):
|
||||
async def sign(self, wallet_files, *, passwd_file=None):
|
||||
|
||||
from .addrlist import KeyAddrList
|
||||
|
||||
|
|
@ -300,7 +300,7 @@ class coin_msg:
|
|||
|
||||
return sigs
|
||||
|
||||
async def verify(self, addr=None):
|
||||
async def verify(self, *, addr=None):
|
||||
|
||||
sigs = self.get_sigs(addr)
|
||||
|
||||
|
|
@ -319,7 +319,7 @@ class coin_msg:
|
|||
|
||||
return len(sigs)
|
||||
|
||||
def get_json_for_export(self, addr=None):
|
||||
def get_json_for_export(self, *, addr=None):
|
||||
sigs = list(self.get_sigs(addr).values())
|
||||
pfx = self.msg_cls.sigdata_pfx
|
||||
if pfx:
|
||||
|
|
|
|||
|
|
@ -264,7 +264,7 @@ class Int(int, Hilite, InitErrors):
|
|||
max_digits = None
|
||||
color = 'red'
|
||||
|
||||
def __new__(cls, n, base=10):
|
||||
def __new__(cls, n, *, base=10):
|
||||
if isinstance(n, cls):
|
||||
return n
|
||||
try:
|
||||
|
|
@ -306,7 +306,7 @@ class HexStr(HiliteStr, InitErrors):
|
|||
width = None
|
||||
hexcase = 'lower'
|
||||
trunc_ok = False
|
||||
def __new__(cls, s, case=None):
|
||||
def __new__(cls, s, *, case=None):
|
||||
if isinstance(s, cls):
|
||||
return s
|
||||
if case is None:
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ global_opts_help_pat = re.compile(r'^\t\t\t(.)(.) (?:--([{}a-zA-Z0-9-]{2,64})(=|
|
|||
|
||||
opt_tuple = namedtuple('cmdline_option', ['name', 'has_parm'])
|
||||
|
||||
def parse_opts(cfg, opts_data, global_opts_data, global_filter_codes, need_proto):
|
||||
def parse_opts(cfg, opts_data, global_opts_data, global_filter_codes, *, need_proto):
|
||||
|
||||
def parse_v1():
|
||||
for line in opts_data['text']['options'].strip().splitlines():
|
||||
|
|
@ -243,6 +243,7 @@ class Opts:
|
|||
def __init__(
|
||||
self,
|
||||
cfg,
|
||||
*,
|
||||
opts_data,
|
||||
init_opts, # dict containing opts to pre-initialize
|
||||
parsed_opts,
|
||||
|
|
@ -261,7 +262,7 @@ class Opts:
|
|||
opts_data,
|
||||
self.global_opts_data,
|
||||
self.global_filter_codes,
|
||||
need_proto)
|
||||
need_proto = need_proto)
|
||||
|
||||
cfg._args = po.cmd_args
|
||||
cfg._uopts = uopts = po.user_opts
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ class PasswordList(AddrList):
|
|||
self.chksum = AddrListChksum(self)
|
||||
|
||||
fs = f'{self.al_id.sid}-{self.pw_id_str}-{self.pw_fmt_disp}-{self.pw_len}[{{}}]'
|
||||
self.id_str = AddrListIDStr(self, fs)
|
||||
self.id_str = AddrListIDStr(self, fmt_str=fs)
|
||||
|
||||
if not skip_chksum_msg:
|
||||
self.do_chksum_msg(record=not infile)
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ class MacOSRamDisk:
|
|||
desc = 'ramdisk'
|
||||
min_size = 10 # 10MB is the minimum supported by hdiutil
|
||||
|
||||
def __init__(self, cfg, label, size, path=None):
|
||||
def __init__(self, cfg, label, size, *, path=None):
|
||||
if size < self.min_size:
|
||||
warn_ramdisk_too_small(size, self.min_size)
|
||||
size = self.min_size
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class BitcoinTwAddrData(TwAddrData):
|
|||
"""
|
||||
}
|
||||
|
||||
async def get_tw_data(self, twctl=None):
|
||||
async def get_tw_data(self, *, twctl=None):
|
||||
self.cfg._util.vmsg('Getting address data from tracking wallet')
|
||||
c = self.rpc
|
||||
if 'label_api' in c.caps:
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class mainnet(CoinProtocol.Secp256k1): # chainparams.cpp
|
|||
'cashaddr',
|
||||
)
|
||||
|
||||
def encode_wif(self, privbytes, pubkey_type, compressed): # input is preprocessed
|
||||
def encode_wif(self, privbytes, pubkey_type, *, compressed): # input is preprocessed
|
||||
assert len(privbytes) == self.privkey_len, f'{len(privbytes)} bytes: incorrect private key length!'
|
||||
assert pubkey_type in self.wif_ver_bytes, f'{pubkey_type!r}: invalid pubkey_type'
|
||||
return b58chk_encode(
|
||||
|
|
|
|||
|
|
@ -119,6 +119,7 @@ class BitcoinRPCClient(RPCClient, metaclass=AsyncInit):
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
daemon,
|
||||
backend,
|
||||
ignore_wallet):
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ class BitcoinTwAddresses(TwAddresses, BitcoinTwRPC):
|
|||
qmsg = self.cfg._util.qmsg
|
||||
qmsg_r = self.cfg._util.qmsg_r
|
||||
qmsg_r('Getting unspent outputs...')
|
||||
addrs = await self.get_unspent_by_mmid(self.minconf)
|
||||
addrs = await self.get_unspent_by_mmid(minconf=self.minconf)
|
||||
qmsg('done')
|
||||
|
||||
coin_amt = self.proto.coin_amt
|
||||
|
|
|
|||
|
|
@ -18,10 +18,10 @@ from ....rpc import rpc_init
|
|||
|
||||
class BitcoinTwGetBalance(TwGetBalance):
|
||||
|
||||
async def __init__(self, cfg, proto, minconf, quiet):
|
||||
async def __init__(self, cfg, proto, *, minconf, quiet):
|
||||
self.rpc = await rpc_init(cfg, proto)
|
||||
self.walletinfo = await self.rpc.walletinfo
|
||||
await super().__init__(cfg, proto, minconf, quiet)
|
||||
await super().__init__(cfg, proto, minconf=minconf, quiet=quiet)
|
||||
|
||||
start_labels = ('TOTAL', 'Non-MMGen', 'Non-wallet')
|
||||
conf_cols = {
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ class BitcoinTwRPC(TwRPC):
|
|||
return [label_addr_pair(label, CoinAddr(self.proto, addrs[0]))
|
||||
for label, addrs in zip(acct_labels, acct_addrs)]
|
||||
|
||||
async def get_unspent_by_mmid(self, minconf=1, mmid_filter=[]):
|
||||
async def get_unspent_by_mmid(self, *, minconf=1, mmid_filter=[]):
|
||||
"""
|
||||
get unspent outputs in tracking wallet, compute balances per address
|
||||
and return a dict with elements {'twmmid': {'addr', 'lbl', 'amt'}}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class BitcoinTwTransaction:
|
|||
|
||||
no_address_str = '[DATA]'
|
||||
|
||||
def __init__(self, parent, proto, rpc,
|
||||
def __init__(self, *, parent, proto, rpc,
|
||||
idx, # unique numeric identifier of this transaction in listing
|
||||
unspent_info, # addrs in wallet with balances: {'mmid': {'addr', 'comment', 'amt'}}
|
||||
mm_map, # all addrs in wallet: ['addr', ['twmmid', 'comment']]
|
||||
|
|
@ -127,13 +127,13 @@ class BitcoinTwTransaction:
|
|||
self.time = self.tx.get('blocktime') or self.tx['time']
|
||||
self.time_received = self.tx.get('timereceived')
|
||||
|
||||
def blockheight_disp(self, color):
|
||||
def blockheight_disp(self, *, color):
|
||||
return (
|
||||
# old/altcoin daemons return no 'blockheight' field, so use confirmations instead
|
||||
Int(self.rpc.blockcount + 1 - self.confirmations).hl(color=color)
|
||||
if self.confirmations > 0 else None)
|
||||
|
||||
def age_disp(self, age_fmt, width, color):
|
||||
def age_disp(self, age_fmt, *, width, color):
|
||||
if age_fmt == 'confs':
|
||||
ret_str = str(self.confirmations).ljust(width)
|
||||
return gray(ret_str) if self.confirmations < 0 and color else ret_str
|
||||
|
|
|
|||
|
|
@ -292,7 +292,7 @@ class Base(TxBase):
|
|||
return int(ret * (self.cfg.vsize_adj or 1))
|
||||
|
||||
# convert absolute CoinAmt fee to sat/byte for display using estimated size
|
||||
def fee_abs2rel(self, abs_fee, to_unit='satoshi'):
|
||||
def fee_abs2rel(self, abs_fee, *, to_unit='satoshi'):
|
||||
return str(int(
|
||||
abs_fee /
|
||||
getattr(self.proto.coin_amt, to_unit) /
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ class TxInfo(TxInfo):
|
|||
out += f', Base {tsize-wsize}, Witness {wsize}'
|
||||
return out + '\n'
|
||||
|
||||
def format_body(self, blockcount, nonmm_str, max_mmwid, enl, terse, sort):
|
||||
def format_body(self, blockcount, nonmm_str, max_mmwid, enl, *, terse, sort):
|
||||
|
||||
if sort not in self.sort_orders:
|
||||
die(1, '{!r}: invalid transaction view sort order. Valid options: {}'.format(
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ class New(Base, TxNew):
|
|||
return self.proto.coin_amt(amt_in_units * tx_size, from_unit=units[unit])
|
||||
|
||||
# given network fee estimate in BTC/kB, return absolute fee using estimated tx size
|
||||
def fee_est2abs(self, fee_per_kb, fe_type=None):
|
||||
def fee_est2abs(self, fee_per_kb, *, fe_type=None):
|
||||
tx_size = self.estimate_size()
|
||||
ret = self.proto.coin_amt('1') * (fee_per_kb * self.cfg.fee_adjust * tx_size / 1024)
|
||||
if self.cfg.verbose:
|
||||
|
|
@ -128,6 +128,7 @@ class New(Base, TxNew):
|
|||
def check_chg_addr_is_wallet_addr(
|
||||
self,
|
||||
output = None,
|
||||
*,
|
||||
message = 'Change address is not an MMGen wallet address!'):
|
||||
def do_err():
|
||||
from ....ui import confirm_or_raise
|
||||
|
|
@ -141,7 +142,7 @@ class New(Base, TxNew):
|
|||
elif len(self.nondata_outputs) > 1 and not self.chg_output.mmid:
|
||||
do_err()
|
||||
|
||||
async def create_serialized(self, locktime=None):
|
||||
async def create_serialized(self, *, locktime=None):
|
||||
|
||||
if not self.is_bump:
|
||||
# Set all sequence numbers to the same value, in conformity with the behavior of most modern wallets:
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class EthereumTwAddrData(TwAddrData):
|
|||
"""
|
||||
}
|
||||
|
||||
async def get_tw_data(self, twctl=None):
|
||||
async def get_tw_data(self, *, twctl=None):
|
||||
from ...tw.ctl import TwCtl
|
||||
self.cfg._util.vmsg('Getting address data from tracking wallet')
|
||||
twctl = (twctl or await TwCtl(self.cfg, self.proto)).mmid_ordered_dict
|
||||
|
|
|
|||
|
|
@ -99,6 +99,7 @@ class TokenCommon(MMGenObject):
|
|||
self,
|
||||
to_addr,
|
||||
amt,
|
||||
*,
|
||||
method_sig = 'transfer(address,uint256)'):
|
||||
from_arg = ''
|
||||
to_arg = to_addr.rjust(64, '0')
|
||||
|
|
@ -112,6 +113,7 @@ class TokenCommon(MMGenObject):
|
|||
start_gas,
|
||||
gasPrice,
|
||||
nonce,
|
||||
*,
|
||||
method_sig = 'transfer(address,uint256)'):
|
||||
data = self.create_data(
|
||||
to_addr,
|
||||
|
|
@ -125,7 +127,7 @@ class TokenCommon(MMGenObject):
|
|||
'nonce': nonce,
|
||||
'data': bytes.fromhex(data)}
|
||||
|
||||
async def txsign(self, tx_in, key, from_addr, chain_id=None):
|
||||
async def txsign(self, tx_in, key, from_addr, *, chain_id=None):
|
||||
|
||||
from .pyethereum.transactions import Transaction
|
||||
|
||||
|
|
@ -162,6 +164,7 @@ class TokenCommon(MMGenObject):
|
|||
key,
|
||||
start_gas,
|
||||
gasPrice,
|
||||
*,
|
||||
method_sig = 'transfer(address,uint256)'):
|
||||
tx_in = self.make_tx_in(
|
||||
to_addr,
|
||||
|
|
@ -175,7 +178,7 @@ class TokenCommon(MMGenObject):
|
|||
|
||||
class Token(TokenCommon):
|
||||
|
||||
def __init__(self, cfg, proto, addr, decimals, rpc=None):
|
||||
def __init__(self, cfg, proto, addr, decimals, *, rpc=None):
|
||||
if type(self).__name__ == 'Token':
|
||||
from ...util2 import get_keccak
|
||||
self.keccak_256 = get_keccak(cfg)
|
||||
|
|
@ -199,4 +202,4 @@ class ResolvedToken(TokenCommon, metaclass=AsyncInit):
|
|||
decimals = await self.get_decimals() # requires self.addr!
|
||||
if not decimals:
|
||||
die('TokenNotInBlockchain', f'Token {addr!r} not in blockchain')
|
||||
Token.__init__(self, cfg, proto, addr, decimals, rpc)
|
||||
Token.__init__(self, cfg, proto, addr, decimals, rpc=rpc)
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ class EthereumRPCClient(RPCClient, metaclass=AsyncInit):
|
|||
self,
|
||||
cfg,
|
||||
proto,
|
||||
*,
|
||||
daemon,
|
||||
backend,
|
||||
ignore_wallet):
|
||||
|
|
@ -83,13 +84,13 @@ class EthereumRPCClient(RPCClient, metaclass=AsyncInit):
|
|||
if (await self.call('parity_nodeKind'))['capability'] == 'full':
|
||||
self.caps += ('full_node',)
|
||||
# parity/openethereum return chainID only for dev chain:
|
||||
self.chainID = None if ci is None else Int(ci, 16)
|
||||
self.chainID = None if ci is None else Int(ci, base=16)
|
||||
self.chain = (await self.call('parity_chain')).replace(' ', '_').replace('_testnet', '')
|
||||
elif self.daemon.id in ('geth', 'reth', 'erigon'):
|
||||
if self.daemon.network == 'mainnet':
|
||||
daemon_warning(self.daemon.id)
|
||||
self.caps += ('full_node',)
|
||||
self.chainID = Int(ci, 16)
|
||||
self.chainID = Int(ci, base=16)
|
||||
self.chain = self.proto.chain_ids[self.chainID]
|
||||
|
||||
def make_host_path(self, wallet):
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ class EthereumTwAddresses(TwAddresses, EthereumTwView, EthereumTwRPC):
|
|||
'w':'a_view_detail',
|
||||
'p':'a_print_detail'}
|
||||
|
||||
def get_column_widths(self, data, wide, interactive):
|
||||
def get_column_widths(self, data, *, wide, interactive):
|
||||
|
||||
return self.compute_column_widths(
|
||||
widths = { # fixed cols
|
||||
|
|
|
|||
|
|
@ -30,9 +30,9 @@ class EthereumTwGetBalance(TwGetBalance):
|
|||
'ge_minconf': 'Balance',
|
||||
}
|
||||
|
||||
async def __init__(self, cfg, proto, *args, **kwargs):
|
||||
async def __init__(self, cfg, proto, *, minconf, quiet):
|
||||
self.twctl = await TwCtl(cfg, proto, mode='w')
|
||||
await super().__init__(cfg, proto, *args, **kwargs)
|
||||
await super().__init__(cfg, proto, minconf=minconf, quiet=quiet)
|
||||
|
||||
async def create_data(self):
|
||||
in_data = self.twctl.mmid_ordered_dict
|
||||
|
|
|
|||
|
|
@ -213,7 +213,8 @@ class EthereumTokenTwCtl(EthereumTwCtl):
|
|||
return 'token ' + self.get_param('symbol')
|
||||
|
||||
async def rpc_get_balance(self, addr):
|
||||
return await Token(self.cfg, self.proto, self.token, self.decimals, self.rpc).get_balance(addr)
|
||||
return await Token(
|
||||
self.cfg, self.proto, self.token, self.decimals, rpc=self.rpc).get_balance(addr)
|
||||
|
||||
async def get_eth_balance(self, addr, *, force_rpc=False):
|
||||
cache = self.cur_eth_balances
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ class EthereumTwUnspentOutputs(EthereumTwView, TwUnspentOutputs):
|
|||
|
||||
no_data_errmsg = 'No accounts in tracking wallet!'
|
||||
|
||||
def get_column_widths(self, data, wide, interactive):
|
||||
def get_column_widths(self, data, *, wide, interactive):
|
||||
# min screen width: 80 cols
|
||||
# num addr [mmid] [comment] amt [amt2]
|
||||
return self.compute_column_widths(
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class Base(TxBase.Base):
|
|||
return str(int(fee))
|
||||
|
||||
# given absolute fee in ETH, return gas price for display in selected unit
|
||||
def fee_abs2rel(self, abs_fee, to_unit='Gwei'):
|
||||
def fee_abs2rel(self, abs_fee, *, to_unit='Gwei'):
|
||||
return self.pretty_fmt_fee(
|
||||
self.fee_abs2gas(abs_fee).to_unit(to_unit))
|
||||
|
||||
|
|
@ -63,10 +63,10 @@ class Base(TxBase.Base):
|
|||
tx = await self.rpc.call('eth_getTransactionByHash', '0x'+txid)
|
||||
return namedtuple('exec_status',
|
||||
['status', 'gas_sent', 'gas_used', 'gas_price', 'contract_addr', 'tx', 'rx'])(
|
||||
status = Int(rx['status'], 16), # zero is failure, non-zero success
|
||||
gas_sent = Int(tx['gas'], 16),
|
||||
gas_used = Int(rx['gasUsed'], 16),
|
||||
gas_price = self.proto.coin_amt(int(tx['gasPrice'], 16), from_unit='wei'),
|
||||
status = Int(rx['status'], base=16), # zero is failure, non-zero success
|
||||
gas_sent = Int(tx['gas'], base=16),
|
||||
gas_used = Int(rx['gasUsed'], base=16),
|
||||
gas_price = self.proto.coin_amt(Int(tx['gasPrice'], base=16), from_unit='wei'),
|
||||
contract_addr = self.proto.coin_addr(rx['contractAddress'][2:])
|
||||
if rx['contractAddress'] else None,
|
||||
tx = tx,
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class TxInfo(TxInfo):
|
|||
""")
|
||||
to_addr_key = 'to'
|
||||
|
||||
def format_body(self, blockcount, nonmm_str, max_mmwid, enl, terse, sort):
|
||||
def format_body(self, blockcount, nonmm_str, max_mmwid, enl, *, terse, sort):
|
||||
tx = self.tx
|
||||
m = {}
|
||||
for k in ('inputs', 'outputs'):
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ class New(Base, TxBase.New):
|
|||
# Instead of serializing tx data as with BTC, just create a JSON dump.
|
||||
# This complicates things but means we avoid using the rlp library to deserialize the data,
|
||||
# thus removing an attack vector
|
||||
async def create_serialized(self, locktime=None):
|
||||
async def create_serialized(self, *, locktime=None):
|
||||
assert len(self.inputs) == 1, 'Transaction has more than one input!'
|
||||
o_num = len(self.outputs)
|
||||
o_ok = 0 if self.usr_contract_data else 1
|
||||
|
|
@ -117,7 +117,7 @@ class New(Base, TxBase.New):
|
|||
|
||||
# get rel_fee (gas price) from network, return in native wei
|
||||
async def get_rel_fee_from_network(self):
|
||||
return Int(await self.rpc.call('eth_gasPrice'), 16), 'eth_gasPrice'
|
||||
return Int(await self.rpc.call('eth_gasPrice'), base=16), 'eth_gasPrice'
|
||||
|
||||
def check_chg_addr_is_wallet_addr(self):
|
||||
pass
|
||||
|
|
@ -131,7 +131,7 @@ class New(Base, TxBase.New):
|
|||
return self.proto.coin_amt(amt_in_units, from_unit=units[unit]) * self.gas.toWei()
|
||||
|
||||
# given fee estimate (gas price) in wei, return absolute fee, adjusting by self.cfg.fee_adjust
|
||||
def fee_est2abs(self, rel_fee, fe_type=None):
|
||||
def fee_est2abs(self, rel_fee, *, fe_type=None):
|
||||
ret = self.fee_gasPrice2abs(rel_fee) * self.cfg.fee_adjust
|
||||
if self.cfg.verbose:
|
||||
msg(f'Estimated fee: {ret} ETH')
|
||||
|
|
|
|||
|
|
@ -317,7 +317,7 @@ class RPCClient(MMGenObject):
|
|||
host_path = self.make_host_path(wallet)
|
||||
))
|
||||
|
||||
async def batch_call(self, method, param_list, timeout=None, wallet=None):
|
||||
async def batch_call(self, method, param_list, *, timeout=None, wallet=None):
|
||||
"""
|
||||
Make a single call with a list of tuples as first argument
|
||||
For RPC calls that return a list of results
|
||||
|
|
@ -332,7 +332,7 @@ class RPCClient(MMGenObject):
|
|||
host_path = self.make_host_path(wallet)
|
||||
), batch=True)
|
||||
|
||||
async def gathered_call(self, method, args_list, timeout=None, wallet=None):
|
||||
async def gathered_call(self, method, args_list, *, timeout=None, wallet=None):
|
||||
"""
|
||||
Perform multiple RPC calls, returning results in a list
|
||||
Can be called two ways:
|
||||
|
|
@ -369,7 +369,7 @@ class RPCClient(MMGenObject):
|
|||
timeout = timeout,
|
||||
wallet = wallet)
|
||||
|
||||
def gathered_icall(self, method, args_list, timeout=None, wallet=None):
|
||||
def gathered_icall(self, method, args_list, *, timeout=None, wallet=None):
|
||||
return self.gathered_call(
|
||||
method,
|
||||
[getattr(self.call_sigs, method)(*a)[1:] for a in args_list],
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class SeedID(HiliteStr, InitErrors):
|
|||
color = 'blue'
|
||||
width = 8
|
||||
trunc_ok = False
|
||||
def __new__(cls, seed=None, sid=None):
|
||||
def __new__(cls, *, seed=None, sid=None):
|
||||
if isinstance(sid, cls):
|
||||
return sid
|
||||
try:
|
||||
|
|
@ -54,7 +54,7 @@ class SeedBase(MMGenObject):
|
|||
data = ImmutableAttr(bytes, typeconv=False)
|
||||
sid = ImmutableAttr(SeedID, typeconv=False)
|
||||
|
||||
def __init__(self, cfg, seed_bin=None, nSubseeds=None):
|
||||
def __init__(self, cfg, *, seed_bin=None, nSubseeds=None):
|
||||
|
||||
if not seed_bin:
|
||||
from .crypto import Crypto
|
||||
|
|
|
|||
|
|
@ -106,7 +106,8 @@ class SeedShareList(SubSeedList):
|
|||
if last_share_debug(ls) or ls.sid in self.data['long'] or ls.sid == parent_seed.sid:
|
||||
# collision: throw out entire split list and redo with new start nonce
|
||||
if parent_seed.cfg.debug_subseed:
|
||||
self._collision_debug_msg(ls.sid, count, nonce, 'nonce_start', debug_last_share)
|
||||
self._collision_debug_msg(
|
||||
ls.sid, count, nonce, nonce_desc='nonce_start', debug_last_share=debug_last_share)
|
||||
else:
|
||||
self.data['long'][ls.sid] = (count, nonce)
|
||||
break
|
||||
|
|
@ -250,12 +251,11 @@ class SeedShareMaster(SeedBase, SeedShareBase):
|
|||
self.parent_list = parent_list
|
||||
self.cfg = parent_list.parent_seed.cfg
|
||||
|
||||
SeedBase.__init__(self, self.cfg, self.make_base_seed_bin())
|
||||
SeedBase.__init__(self, self.cfg, seed_bin=self.make_base_seed_bin())
|
||||
|
||||
self.derived_seed = SeedBase(
|
||||
self.cfg,
|
||||
self.make_derived_seed_bin(parent_list.id_str, parent_list.count)
|
||||
)
|
||||
seed_bin = self.make_derived_seed_bin(parent_list.id_str, parent_list.count))
|
||||
|
||||
@property
|
||||
def fn_stem(self):
|
||||
|
|
@ -291,11 +291,14 @@ class SeedShareMasterJoining(SeedShareMaster):
|
|||
self.cfg = cfg
|
||||
self.id_str = id_str or 'default'
|
||||
self.count = count
|
||||
self.derived_seed = SeedBase(cfg, self.make_derived_seed_bin(self.id_str, self.count))
|
||||
self.derived_seed = SeedBase(
|
||||
cfg,
|
||||
seed_bin = self.make_derived_seed_bin(self.id_str, self.count))
|
||||
|
||||
def join_shares(
|
||||
cfg,
|
||||
seed_list,
|
||||
*,
|
||||
master_idx = None,
|
||||
id_str = None):
|
||||
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ class SubSeedList(MMGenObject):
|
|||
debug_last_share_sid_len = 3
|
||||
dfl_len = 100
|
||||
|
||||
def __init__(self, parent_seed, length=None):
|
||||
def __init__(self, parent_seed, *, length=None):
|
||||
self.member_type = SubSeed
|
||||
self.parent_seed = parent_seed
|
||||
self.data = {'long': IndexedDict(), 'short': IndexedDict()}
|
||||
|
|
@ -157,7 +157,7 @@ class SubSeedList(MMGenObject):
|
|||
do_msg(subseed)
|
||||
return subseed
|
||||
|
||||
def _collision_debug_msg(self, sid, idx, nonce, nonce_desc='nonce', debug_last_share=False):
|
||||
def _collision_debug_msg(self, sid, idx, nonce, *, nonce_desc='nonce', debug_last_share=False):
|
||||
slen = 'short' if sid in self.data['short'] else 'long'
|
||||
m1 = f'add_subseed(idx={idx},{slen}):'
|
||||
if sid == self.parent_seed.sid:
|
||||
|
|
@ -172,7 +172,7 @@ class SubSeedList(MMGenObject):
|
|||
m2 = f'collision with ID {sid} (idx={colliding_idx},{slen}),'
|
||||
msg(f'{m1:30} {m2:46} incrementing {nonce_desc} to {nonce+1}')
|
||||
|
||||
def _generate(self, last_idx=None, last_sid=None):
|
||||
def _generate(self, last_idx=None, *, last_sid=None):
|
||||
|
||||
if last_idx is None:
|
||||
last_idx = self.len
|
||||
|
|
|
|||
|
|
@ -121,7 +121,7 @@ class Memo:
|
|||
|
||||
return ret(proto_name, function, chain, asset, address, limit_int, int(interval), int(quantity))
|
||||
|
||||
def __init__(self, proto, addr, chain=None, trade_limit=None):
|
||||
def __init__(self, proto, addr, *, chain=None, trade_limit=None):
|
||||
self.proto = proto
|
||||
self.chain = chain or proto.coin
|
||||
if trade_limit is None:
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class ThornodeRPCClient:
|
|||
verify = True
|
||||
timeout = 5
|
||||
|
||||
def __init__(self, tx, proto=None, host=None):
|
||||
def __init__(self, tx, *, proto=None, host=None):
|
||||
self.cfg = tx.cfg
|
||||
if proto:
|
||||
self.proto = proto
|
||||
|
|
@ -38,7 +38,7 @@ class ThornodeRPCClient:
|
|||
'https': f'socks5h://{self.cfg.proxy}'
|
||||
})
|
||||
|
||||
def get(self, path, timeout=None):
|
||||
def get(self, path, *, timeout=None):
|
||||
return self.session.get(
|
||||
url = self.proto + '://' + self.host + path,
|
||||
timeout = timeout or self.timeout,
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class tool_cmd_base(MMGenObject):
|
|||
need_addrtype = False
|
||||
need_amt = False
|
||||
|
||||
def __init__(self, cfg, cmdname=None, proto=None, mmtype=None):
|
||||
def __init__(self, cfg, *, cmdname=None, proto=None, mmtype=None):
|
||||
|
||||
self.cfg = cfg
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
need_proto = True
|
||||
|
||||
def __init__(self, cfg, cmdname=None, proto=None, mmtype=None):
|
||||
def __init__(self, cfg, *, cmdname=None, proto=None, mmtype=None):
|
||||
if cmdname == 'txview':
|
||||
self.need_amt = True
|
||||
super().__init__(cfg=cfg, cmdname=cmdname, proto=proto, mmtype=mmtype)
|
||||
|
|
|
|||
|
|
@ -35,20 +35,20 @@ class tool_cmd(tool_cmd_base):
|
|||
* Enc: AES256_CTR, 16-byte rand IV, sha256 hash + 32-byte nonce + data
|
||||
* The encrypted file is indistinguishable from random data
|
||||
"""
|
||||
def encrypt(self, infile: str, outfile='', hash_preset=''):
|
||||
def encrypt(self, infile: str, *, outfile='', hash_preset=''):
|
||||
"encrypt a file"
|
||||
data = get_data_from_file(self.cfg, infile, desc='data for encryption', binary=True)
|
||||
enc_d = Crypto(self.cfg).mmgen_encrypt(data, 'data', hash_preset)
|
||||
enc_d = Crypto(self.cfg).mmgen_encrypt(data, desc='data', hash_preset=hash_preset)
|
||||
if not outfile:
|
||||
outfile = f'{os.path.basename(infile)}.{Crypto.mmenc_ext}'
|
||||
write_data_to_file(self.cfg, outfile, enc_d, desc='encrypted data', binary=True)
|
||||
return True
|
||||
|
||||
def decrypt(self, infile: str, outfile='', hash_preset=''):
|
||||
def decrypt(self, infile: str, *, outfile='', hash_preset=''):
|
||||
"decrypt a file"
|
||||
enc_d = get_data_from_file(self.cfg, infile, desc='encrypted data', binary=True)
|
||||
while True:
|
||||
dec_d = Crypto(self.cfg).mmgen_decrypt(enc_d, 'data', hash_preset)
|
||||
dec_d = Crypto(self.cfg).mmgen_decrypt(enc_d, desc='data', hash_preset=hash_preset)
|
||||
if dec_d:
|
||||
break
|
||||
from ..util import msg
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ def gen_tool_cmd_usage(mod, cmdname):
|
|||
for line in docstr.split('\n')[1:]:
|
||||
yield line.lstrip('\t')
|
||||
|
||||
def usage(cmdname=None, exit_val=1):
|
||||
def usage(cmdname=None, *, exit_val=1):
|
||||
|
||||
from ..util import Msg, die
|
||||
|
||||
|
|
|
|||
|
|
@ -56,7 +56,8 @@ class tool_cmd(tool_cmd_base):
|
|||
pager: 'send output to pager' = False):
|
||||
"list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet"
|
||||
from ..tw.bal import TwGetBalance
|
||||
return (await TwGetBalance(self.cfg, self.proto, minconf, quiet)).format(color=self.cfg.color)
|
||||
return (await TwGetBalance(
|
||||
self.cfg, self.proto, minconf=minconf, quiet=quiet)).format(color=self.cfg.color)
|
||||
|
||||
async def twops(self,
|
||||
obj, pager, reverse, detail, sort, age_fmt, interactive,
|
||||
|
|
@ -189,7 +190,7 @@ class tool_cmd(tool_cmd_base):
|
|||
from ..tw.ctl import TwCtl
|
||||
return await (await TwCtl(self.cfg, self.proto, mode='w')).rescan_address(mmgen_or_coin_addr)
|
||||
|
||||
async def rescan_blockchain(self,
|
||||
async def rescan_blockchain(self, *,
|
||||
start_block: int = None,
|
||||
stop_block: int = None):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
def hexdump(self,
|
||||
infile: str,
|
||||
*,
|
||||
cols: 'number of columns in output' = 8,
|
||||
line_nums: "format for line numbers (valid choices: 'hex','dec')" = 'hex'):
|
||||
"create hexdump of data from file (use '-' for stdin)"
|
||||
|
|
@ -174,7 +175,7 @@ class tool_cmd(tool_cmd_base):
|
|||
return make_chksum_8(
|
||||
get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True))
|
||||
|
||||
def randb58(self,
|
||||
def randb58(self, *,
|
||||
nbytes: 'number of bytes to output' = 32,
|
||||
pad: 'pad output to this width' = 0):
|
||||
"generate random data (default: 32 bytes) and convert it to base 58"
|
||||
|
|
@ -182,24 +183,24 @@ class tool_cmd(tool_cmd_base):
|
|||
from ..crypto import Crypto
|
||||
return baseconv('b58').frombytes(Crypto(self.cfg).get_random(nbytes), pad=pad, tostr=True)
|
||||
|
||||
def bytestob58(self, infile: str, pad: 'pad output to this width' = 0):
|
||||
def bytestob58(self, infile: str, *, pad: 'pad output to this width' = 0):
|
||||
"convert bytes to base 58 (supply data via STDIN)"
|
||||
from ..fileutil import get_data_from_file
|
||||
from ..baseconv import baseconv
|
||||
data = get_data_from_file(self.cfg, infile, dash=True, quiet=True, binary=True)
|
||||
return baseconv('b58').frombytes(data, pad=pad, tostr=True)
|
||||
|
||||
def b58tobytes(self, b58_str: 'sstr', pad: 'pad output to this width' = 0):
|
||||
def b58tobytes(self, b58_str: 'sstr', *, pad: 'pad output to this width' = 0):
|
||||
"convert a base 58 string to bytes (warning: outputs binary data)"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b58').tobytes(b58_str, pad=pad)
|
||||
|
||||
def hextob58(self, hexstr: 'sstr', pad: 'pad output to this width' = 0):
|
||||
def hextob58(self, hexstr: 'sstr', *, pad: 'pad output to this width' = 0):
|
||||
"convert a hexadecimal string to base 58"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b58').fromhex(hexstr, pad=pad, tostr=True)
|
||||
|
||||
def b58tohex(self, b58_str: 'sstr', pad: 'pad output to this width' = 0):
|
||||
def b58tohex(self, b58_str: 'sstr', *, pad: 'pad output to this width' = 0):
|
||||
"convert a base 58 string to hexadecimal"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b58').tohex(b58_str, pad=pad)
|
||||
|
|
@ -214,12 +215,12 @@ class tool_cmd(tool_cmd_base):
|
|||
from ..proto.btc.common import b58chk_decode
|
||||
return b58chk_decode(b58chk_str).hex()
|
||||
|
||||
def hextob32(self, hexstr: 'sstr', pad: 'pad output to this width' = 0):
|
||||
def hextob32(self, hexstr: 'sstr', *, pad: 'pad output to this width' = 0):
|
||||
"convert a hexadecimal string to an MMGen-flavor base 32 string"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b32').fromhex(hexstr, pad=pad, tostr=True)
|
||||
|
||||
def b32tohex(self, b32_str: 'sstr', pad: 'pad output to this width' = 0):
|
||||
def b32tohex(self, b32_str: 'sstr', *, pad: 'pad output to this width' = 0):
|
||||
"convert an MMGen-flavor base 32 string to hexadecimal"
|
||||
from ..baseconv import baseconv
|
||||
return baseconv('b32').tohex(b32_str.upper(), pad=pad)
|
||||
|
|
@ -235,7 +236,7 @@ class tool_cmd(tool_cmd_base):
|
|||
ret = baseconv('b6d').fromhex(hexstr, pad=pad, tostr=True)
|
||||
return block_format(ret, gw=5, cols=None).strip() if add_spaces else ret
|
||||
|
||||
def b6dtohex(self, b6d_str: 'sstr', pad: 'pad output to this width' = 0):
|
||||
def b6dtohex(self, b6d_str: 'sstr', *, pad: 'pad output to this width' = 0):
|
||||
"convert a die roll base6 (base6d) string to hexadecimal"
|
||||
from ..baseconv import baseconv
|
||||
from ..util import remove_whitespace
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ from ..wallet import Wallet
|
|||
class tool_cmd(tool_cmd_base):
|
||||
"key, address or subseed generation from an MMGen wallet"
|
||||
|
||||
def __init__(self, cfg, cmdname=None, proto=None, mmtype=None):
|
||||
def __init__(self, cfg, *, cmdname=None, proto=None, mmtype=None):
|
||||
self.need_proto = cmdname in ('gen_key', 'gen_addr')
|
||||
super().__init__(cfg, cmdname=cmdname, proto=proto, mmtype=mmtype)
|
||||
|
||||
|
|
@ -40,12 +40,12 @@ class tool_cmd(tool_cmd_base):
|
|||
wallets = [wallet] if wallet else [],
|
||||
nargs = 1)
|
||||
|
||||
def get_subseed(self, subseed_idx: str, wallet=''):
|
||||
def get_subseed(self, subseed_idx: str, *, wallet=''):
|
||||
"get the Seed ID of a single subseed by Subseed Index for default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
return Wallet(self.cfg, fn=self._get_seed_file(wallet)).seed.subseed(subseed_idx).sid
|
||||
|
||||
def get_subseed_by_seed_id(self, seed_id: str, wallet='', last_idx=SubSeedList.dfl_len):
|
||||
def get_subseed_by_seed_id(self, seed_id: str, *, wallet='', last_idx=SubSeedList.dfl_len):
|
||||
"get the Subseed Index of a single subseed by Seed ID for default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
ret = Wallet(
|
||||
|
|
@ -53,7 +53,7 @@ class tool_cmd(tool_cmd_base):
|
|||
fn = self._get_seed_file(wallet)).seed.subseed_by_seed_id(seed_id, last_idx=last_idx)
|
||||
return ret.ss_idx if ret else None
|
||||
|
||||
def list_subseeds(self, subseed_idx_range: str, wallet=''):
|
||||
def list_subseeds(self, subseed_idx_range: str, *, wallet=''):
|
||||
"list a range of subseed Seed IDs for default or specified wallet"
|
||||
self.cfg._set_quiet(True)
|
||||
from ..subseed import SubSeedIdxRange
|
||||
|
|
@ -62,6 +62,7 @@ class tool_cmd(tool_cmd_base):
|
|||
|
||||
def list_shares(self,
|
||||
share_count: int,
|
||||
*,
|
||||
id_str = 'default',
|
||||
master_share: f'(min:1, max:{MasterShareIdx.max_val}, 0=no master share)' = 0,
|
||||
wallet = ''):
|
||||
|
|
@ -70,15 +71,15 @@ class tool_cmd(tool_cmd_base):
|
|||
return Wallet(self.cfg, fn=self._get_seed_file(wallet)).seed.split(
|
||||
share_count, id_str=id_str, master_idx=master_share).format()
|
||||
|
||||
def gen_key(self, mmgen_addr: str, wallet=''):
|
||||
def gen_key(self, mmgen_addr: str, *, wallet=''):
|
||||
"generate a single WIF key for specified MMGen address from default or specified wallet"
|
||||
return self._gen_keyaddr(mmgen_addr, 'wif', wallet)
|
||||
return self._gen_keyaddr(mmgen_addr, 'wif', wallet=wallet)
|
||||
|
||||
def gen_addr(self, mmgen_addr: str, wallet=''):
|
||||
def gen_addr(self, mmgen_addr: str, *, wallet=''):
|
||||
"generate a single MMGen address from default or specified wallet"
|
||||
return self._gen_keyaddr(mmgen_addr, 'addr', wallet)
|
||||
return self._gen_keyaddr(mmgen_addr, 'addr', wallet=wallet)
|
||||
|
||||
def _gen_keyaddr(self, mmgen_addr, target, wallet=''):
|
||||
def _gen_keyaddr(self, mmgen_addr, target, *, wallet=''):
|
||||
from ..addr import MMGenID
|
||||
from ..addrlist import AddrList, AddrIdxList
|
||||
|
||||
|
|
@ -94,7 +95,7 @@ class tool_cmd(tool_cmd_base):
|
|||
cfg = self.cfg,
|
||||
proto = self.proto,
|
||||
seed = ss.seed,
|
||||
addr_idxs = AddrIdxList(str(addr.idx)),
|
||||
addr_idxs = AddrIdxList(fmt_str=str(addr.idx)),
|
||||
mmtype = addr.mmtype,
|
||||
skip_chksum = True).data[0]
|
||||
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ class TwAddresses(TwView):
|
|||
f'{mmgen_addrs}: invalid address list argument ' +
|
||||
'(must be in form <seed ID>:[<type>:]<idx list>)')
|
||||
from ..addrlist import AddrIdxList
|
||||
self.usr_addr_list = [MMGenID(self.proto, f'{a[0]}:{i}') for i in AddrIdxList(a[1])]
|
||||
self.usr_addr_list = [MMGenID(self.proto, f'{a[0]}:{i}') for i in AddrIdxList(fmt_str=a[1])]
|
||||
else:
|
||||
self.usr_addr_list = []
|
||||
|
||||
|
|
@ -274,7 +274,7 @@ class TwAddresses(TwView):
|
|||
return bool(e.recvd)
|
||||
return None # addr not in tracking wallet
|
||||
|
||||
def get_change_address(self, al_id, bot=None, top=None, exclude=None, desc=None):
|
||||
def get_change_address(self, al_id, *, bot=None, top=None, exclude=None, desc=None):
|
||||
"""
|
||||
Get lowest-indexed unused address in tracking wallet for requested AddrListID.
|
||||
Return values on failure:
|
||||
|
|
@ -333,7 +333,7 @@ class TwAddresses(TwView):
|
|||
break
|
||||
return False
|
||||
|
||||
def get_change_address_by_addrtype(self, mmtype, exclude, desc):
|
||||
def get_change_address_by_addrtype(self, mmtype, *, exclude, desc):
|
||||
"""
|
||||
Find the lowest-indexed change addresses in tracking wallet of given address type,
|
||||
present them in a menu and return a single change address chosen by the user.
|
||||
|
|
@ -366,7 +366,8 @@ class TwAddresses(TwView):
|
|||
msg(f'{res}: invalid entry')
|
||||
|
||||
def get_addr(mmtype):
|
||||
return [self.get_change_address(f'{sid}:{mmtype}', r.bot, r.top, exclude=exclude, desc=desc)
|
||||
return [self.get_change_address(
|
||||
f'{sid}:{mmtype}', bot=r.bot, top=r.top, exclude=exclude, desc=desc)
|
||||
for sid, r in self.sid_ranges.items()]
|
||||
|
||||
assert isinstance(mmtype, (type(None), MMGenAddrType))
|
||||
|
|
|
|||
|
|
@ -26,10 +26,10 @@ from ..obj import NonNegativeInt
|
|||
|
||||
class TwGetBalance(MMGenObject, metaclass=AsyncInit):
|
||||
|
||||
def __new__(cls, cfg, proto, *args, **kwargs):
|
||||
def __new__(cls, cfg, proto, *, minconf, quiet):
|
||||
return MMGenObject.__new__(proto.base_proto_subclass(cls, 'tw.bal'))
|
||||
|
||||
async def __init__(self, cfg, proto, minconf, quiet):
|
||||
async def __init__(self, cfg, proto, *, minconf, quiet):
|
||||
|
||||
class BalanceInfo(dict):
|
||||
def __init__(self):
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class TwTxHistory(TwView):
|
|||
filters = ('show_unconfirmed',)
|
||||
mod_subpath = 'tw.txhistory'
|
||||
|
||||
async def __init__(self, cfg, proto, sinceblock=0):
|
||||
async def __init__(self, cfg, proto, *, sinceblock=0):
|
||||
await super().__init__(cfg, proto)
|
||||
self.sinceblock = NonNegativeInt(sinceblock if sinceblock >= 0 else self.rpc.blockcount + sinceblock)
|
||||
|
||||
|
|
@ -57,7 +57,7 @@ class TwTxHistory(TwView):
|
|||
amts_tuple = namedtuple('amts_data', ['amt'])
|
||||
return super().set_amt_widths([amts_tuple(d.amt_disp(self.show_total_amt)) for d in data])
|
||||
|
||||
def get_column_widths(self, data, wide, interactive):
|
||||
def get_column_widths(self, data, *, wide, interactive):
|
||||
|
||||
# var cols: inputs outputs comment [txid]
|
||||
if not hasattr(self, 'varcol_maxwidths'):
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ class TwUnspentOutputs(TwView):
|
|||
self.__dict__['proto'] = proto
|
||||
MMGenListItem.__init__(self, **kwargs)
|
||||
|
||||
async def __init__(self, cfg, proto, minconf=1, addrs=[]):
|
||||
async def __init__(self, cfg, proto, *, minconf=1, addrs=[]):
|
||||
await super().__init__(cfg, proto)
|
||||
self.minconf = minconf
|
||||
self.addrs = addrs
|
||||
|
|
|
|||
|
|
@ -290,7 +290,7 @@ class TwView(MMGenObject, metaclass=AsyncInit):
|
|||
# so add NL here (' ' required because CUR_HOME erases preceding blank lines)
|
||||
msg(' ')
|
||||
|
||||
def get_term_dimensions(self, min_cols, min_lines=None):
|
||||
def get_term_dimensions(self, min_cols, *, min_lines=None):
|
||||
from ..term import get_terminal_size, get_char_raw, _term_dimensions
|
||||
user_resized = False
|
||||
while True:
|
||||
|
|
@ -311,7 +311,7 @@ class TwView(MMGenObject, metaclass=AsyncInit):
|
|||
else:
|
||||
return _term_dimensions(min_cols, ts.height)
|
||||
|
||||
def compute_column_widths(self, widths, maxws, minws, maxws_nice, wide, interactive):
|
||||
def compute_column_widths(self, widths, maxws, minws, maxws_nice, *, wide, interactive):
|
||||
|
||||
def do_ret(freews):
|
||||
widths.update({k:minws[k] + freews.get(k, 0) for k in minws})
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ class Base(MMGenObject):
|
|||
def sum_inputs(self):
|
||||
return sum(e.amt for e in self.inputs)
|
||||
|
||||
def sum_outputs(self, exclude=None):
|
||||
def sum_outputs(self, *, exclude=None):
|
||||
if exclude is None:
|
||||
olist = self.outputs
|
||||
else:
|
||||
|
|
@ -178,7 +178,7 @@ class Base(MMGenObject):
|
|||
self.blockcount = self.rpc.blockcount
|
||||
|
||||
# returns True if comment added or changed, False otherwise
|
||||
def add_comment(self, infile=None):
|
||||
def add_comment(self, *, infile=None):
|
||||
if infile:
|
||||
from ..fileutil import get_data_from_file
|
||||
self.comment = MMGenTxComment(
|
||||
|
|
@ -204,7 +204,7 @@ class Base(MMGenObject):
|
|||
edesc = 'non-MMGen address',
|
||||
quiet = True)
|
||||
|
||||
def check_non_mmgen_inputs(self, caller, non_mmaddrs=None):
|
||||
def check_non_mmgen_inputs(self, caller, *, non_mmaddrs=None):
|
||||
non_mmaddrs = non_mmaddrs or self.get_non_mmaddrs('inputs')
|
||||
if non_mmaddrs:
|
||||
indent = ' '
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ def get_proto_from_coin_id(tx, coin_id, chain):
|
|||
|
||||
return init_proto(tx.cfg, coin, network=network, need_amt=True, tokensym=tokensym)
|
||||
|
||||
def eval_io_data(tx, data, desc):
|
||||
def eval_io_data(tx, data, *, desc):
|
||||
if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs
|
||||
assert len(data), f'no {desc}!'
|
||||
for d in data:
|
||||
|
|
@ -112,7 +112,7 @@ class MMGenTxFile(MMGenObject):
|
|||
setattr(tx, k, v(data[k]) if v else data[k])
|
||||
|
||||
for k in ('inputs', 'outputs'):
|
||||
setattr(tx, k, eval_io_data(tx, data[k], k))
|
||||
setattr(tx, k, eval_io_data(tx, data[k], desc=k))
|
||||
|
||||
tx.check_txfile_hex_data()
|
||||
|
||||
|
|
@ -124,7 +124,7 @@ class MMGenTxFile(MMGenObject):
|
|||
tx = self.tx
|
||||
tx.file_format = 'legacy'
|
||||
|
||||
def deserialize(raw_data, desc):
|
||||
def deserialize(raw_data, *, desc):
|
||||
from ast import literal_eval
|
||||
try:
|
||||
return literal_eval(raw_data)
|
||||
|
|
@ -199,12 +199,12 @@ class MMGenTxFile(MMGenObject):
|
|||
tx.parse_txfile_serialized_data()
|
||||
for k in ('inputs', 'outputs'):
|
||||
desc = f'{k} data'
|
||||
res = deserialize(io_data[k], k)
|
||||
res = deserialize(io_data[k], desc=k)
|
||||
for d in res:
|
||||
if 'label' in d:
|
||||
d['comment'] = d['label']
|
||||
del d['label']
|
||||
setattr(tx, k, eval_io_data(tx, res, k))
|
||||
setattr(tx, k, eval_io_data(tx, res, desc=k))
|
||||
desc = 'send amount in metadata'
|
||||
assert tx.proto.coin_amt(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}'
|
||||
except Exception as e:
|
||||
|
|
|
|||
|
|
@ -126,7 +126,7 @@ class New(Base):
|
|||
|
||||
return False
|
||||
|
||||
def get_usr_fee_interactive(self, fee=None, desc='Starting'):
|
||||
def get_usr_fee_interactive(self, fee=None, *, desc='Starting'):
|
||||
abs_fee = None
|
||||
from ..ui import line_input
|
||||
while True:
|
||||
|
|
@ -400,12 +400,12 @@ class New(Base):
|
|||
async def get_fee(self, fee, outputs_sum, start_fee_desc):
|
||||
|
||||
if fee:
|
||||
self.usr_fee = self.get_usr_fee_interactive(fee, start_fee_desc)
|
||||
self.usr_fee = self.get_usr_fee_interactive(fee, desc=start_fee_desc)
|
||||
else:
|
||||
fee_per_kb, fe_type = await self.get_rel_fee_from_network()
|
||||
self.usr_fee = self.get_usr_fee_interactive(
|
||||
None if fee_per_kb is None else self.fee_est2abs(fee_per_kb, fe_type),
|
||||
self.network_estimated_fee_label)
|
||||
None if fee_per_kb is None else self.fee_est2abs(fee_per_kb, fe_type=fe_type),
|
||||
desc = self.network_estimated_fee_label)
|
||||
|
||||
funds = await self.get_funds_available(self.usr_fee, outputs_sum)
|
||||
|
||||
|
|
@ -426,7 +426,7 @@ class New(Base):
|
|||
from ..tw.unspent import TwUnspentOutputs
|
||||
|
||||
if self.cfg.comment_file:
|
||||
self.add_comment(self.cfg.comment_file)
|
||||
self.add_comment(infile=self.cfg.comment_file)
|
||||
|
||||
if not do_info:
|
||||
cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ def generate_kals_for_mmgen_addrs(need_keys, infiles, saved_seeds, proto):
|
|||
skip_chksum = True)
|
||||
return MMGenList(gen_kals())
|
||||
|
||||
def add_keys(src, io_list, infiles=None, saved_seeds=None, keyaddr_list=None):
|
||||
def add_keys(src, io_list, infiles=None, saved_seeds=None, *, keyaddr_list=None):
|
||||
|
||||
need_keys = [e for e in io_list if e.mmid and not e.have_wif]
|
||||
|
||||
|
|
@ -151,7 +151,7 @@ def get_keylist(cfg):
|
|||
return get_lines_from_file(cfg, cfg.keys_from_file, desc='key-address data', trim_comments=True)
|
||||
return None
|
||||
|
||||
async def txsign(cfg_parm, tx, seed_files, kl, kal, tx_num_str='', passwd_file=None):
|
||||
async def txsign(cfg_parm, tx, seed_files, kl, kal, *, tx_num_str='', passwd_file=None):
|
||||
|
||||
keys = MMGenList() # list of AddrListEntry objects
|
||||
non_mmaddrs = tx.get_non_mmaddrs('inputs')
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import sys, os
|
|||
|
||||
from .util import msg, msg_r, Msg, die
|
||||
|
||||
def confirm_or_raise(cfg, message, action, expect='YES', exit_msg='Exiting at user request'):
|
||||
def confirm_or_raise(cfg, message, action, *, expect='YES', exit_msg='Exiting at user request'):
|
||||
if message:
|
||||
msg(message)
|
||||
if line_input(
|
||||
|
|
@ -32,7 +32,7 @@ def get_words_from_user(cfg, prompt):
|
|||
msg('Sanitized input: [{}]'.format(' '.join(words)))
|
||||
return words
|
||||
|
||||
def get_data_from_user(cfg, desc='data'): # user input MUST be UTF-8
|
||||
def get_data_from_user(cfg, *, desc='data'): # user input MUST be UTF-8
|
||||
data = line_input(cfg, f'Enter {desc}: ', echo=cfg.echo_passphrase)
|
||||
if cfg.debug:
|
||||
msg(f'User input: [{data}]')
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ class Util:
|
|||
|
||||
return True
|
||||
|
||||
def compare_or_die(self, val1, desc1, val2, desc2, e='Error'):
|
||||
def compare_or_die(self, val1, desc1, val2, desc2, *, e='Error'):
|
||||
if val1 != val2:
|
||||
die(3, f"{e}: {desc2} ({val2}) doesn't match {desc1} ({val1})")
|
||||
if self.cfg.debug:
|
||||
|
|
@ -180,15 +180,15 @@ def pp_fmt(d):
|
|||
def pp_msg(d):
|
||||
msg(pp_fmt(d))
|
||||
|
||||
def indent(s, indent=' ', append='\n'):
|
||||
def indent(s, *, indent=' ', append='\n'):
|
||||
"indent multiple lines of text with specified string"
|
||||
return indent + ('\n'+indent).join(s.strip().splitlines()) + append
|
||||
|
||||
def fmt(s, indent='', strip_char=None, append='\n'):
|
||||
def fmt(s, *, indent='', strip_char=None, append='\n'):
|
||||
"de-indent multiple lines of text, or indent with specified string"
|
||||
return indent + ('\n'+indent).join([l.lstrip(strip_char) for l in s.strip().splitlines()]) + append
|
||||
|
||||
def fmt_list(iterable, fmt='dfl', indent='', conv=None):
|
||||
def fmt_list(iterable, *, fmt='dfl', indent='', conv=None):
|
||||
"pretty-format a list"
|
||||
_conv, sep, lq, rq = {
|
||||
'dfl': (str, ", ", "'", "'"),
|
||||
|
|
@ -207,7 +207,7 @@ def fmt_list(iterable, fmt='dfl', indent='', conv=None):
|
|||
conv = conv or _conv
|
||||
return indent + (sep+indent).join(lq+conv(e)+rq for e in iterable)
|
||||
|
||||
def fmt_dict(mapping, fmt='dfl', kconv=None, vconv=None):
|
||||
def fmt_dict(mapping, *, fmt='dfl', kconv=None, vconv=None):
|
||||
"pretty-format a dict"
|
||||
kc, vc, sep, fs = {
|
||||
'dfl': (str, str, ", ", "'{}' ({})"),
|
||||
|
|
@ -260,7 +260,7 @@ def remove_dups(iterable, *, edesc='element', desc='list', quiet=False, hide=Fal
|
|||
def contains_any(target_list, source_list):
|
||||
return any(map(target_list.count, source_list))
|
||||
|
||||
def suf(arg, suf_type='s', verb='none'):
|
||||
def suf(arg, suf_type='s', *, verb='none'):
|
||||
suf_types = {
|
||||
'none': {
|
||||
's': ('s', ''),
|
||||
|
|
@ -361,7 +361,7 @@ def secs_to_ms(secs):
|
|||
def is_int(s): # actually is_nonnegative_int()
|
||||
return set(str(s) or 'x') <= set(digits)
|
||||
|
||||
def check_int_between(val, imin, imax, desc):
|
||||
def check_int_between(val, imin, imax, *, desc):
|
||||
if not imin <= int(val) <= imax:
|
||||
die(1, f'{val}: invalid value for {desc} (must be between {imin} and {imax})')
|
||||
return int(val)
|
||||
|
|
@ -380,7 +380,7 @@ def is_utf8(s):
|
|||
else:
|
||||
return True
|
||||
|
||||
def remove_whitespace(s, ws='\t\r\n '):
|
||||
def remove_whitespace(s, *, ws='\t\r\n '):
|
||||
return s.translate(dict((ord(e), None) for e in ws))
|
||||
|
||||
def strip_comment(line):
|
||||
|
|
|
|||
|
|
@ -128,7 +128,7 @@ def parse_bytespec(nbytes):
|
|||
|
||||
die(1, f'{nbytes!r}: invalid byte specifier')
|
||||
|
||||
def format_elapsed_days_hr(t, now=None, cached={}):
|
||||
def format_elapsed_days_hr(t, *, now=None, cached={}):
|
||||
e = int((now or time.time()) - t)
|
||||
if not e in cached:
|
||||
days = abs(e) // 86400
|
||||
|
|
@ -167,7 +167,7 @@ def format_elapsed_hr(
|
|||
cached[key] = ' '.join(f'{n} {desc}{suf(n)}' for desc, n in data if n) + add_suffix()
|
||||
return cached[key]
|
||||
|
||||
def pretty_format(s, width=80, pfx=''):
|
||||
def pretty_format(s, *, width=80, pfx=''):
|
||||
out = []
|
||||
while s:
|
||||
if len(s) <= width:
|
||||
|
|
@ -192,7 +192,7 @@ def block_format(data, *, gw=2, cols=8, line_nums=None, data_is_hex=False):
|
|||
for i in range(nchunks)
|
||||
).rstrip() + '\n'
|
||||
|
||||
def pretty_hexdump(data, gw=2, cols=8, line_nums=None):
|
||||
def pretty_hexdump(data, *, gw=2, cols=8, line_nums=None):
|
||||
return block_format(data.hex(), gw=gw, cols=cols, line_nums=line_nums, data_is_hex=True)
|
||||
|
||||
def decode_pretty_hexdump(data):
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class wallet(MMGenObject, metaclass=WalletMeta):
|
|||
class WalletData(MMGenObject):
|
||||
pass
|
||||
|
||||
def __init__(self,
|
||||
def __init__(self, *,
|
||||
in_data = None,
|
||||
passwd_file = None):
|
||||
|
||||
|
|
@ -88,7 +88,7 @@ class wallet(MMGenObject, metaclass=WalletMeta):
|
|||
|
||||
def _get_data_from_user(self, desc):
|
||||
from ..ui import get_data_from_user
|
||||
return get_data_from_user(self.cfg, desc)
|
||||
return get_data_from_user(self.cfg, desc=desc)
|
||||
|
||||
def _deformat_once(self):
|
||||
self._get_data()
|
||||
|
|
@ -110,7 +110,7 @@ class wallet(MMGenObject, metaclass=WalletMeta):
|
|||
self._format()
|
||||
return self.fmt_data
|
||||
|
||||
def write_to_file(self, outdir='', desc=''):
|
||||
def write_to_file(self, *, outdir='', desc=''):
|
||||
self._format()
|
||||
kwargs = {
|
||||
'desc': desc or self.desc,
|
||||
|
|
@ -130,7 +130,7 @@ class wallet(MMGenObject, metaclass=WalletMeta):
|
|||
self.fmt_data,
|
||||
**kwargs)
|
||||
|
||||
def check_usr_seed_len(self, bitlen=None):
|
||||
def check_usr_seed_len(self, *, bitlen=None):
|
||||
chk = bitlen or self.seed.bitlen
|
||||
if self.cfg.seed_len and self.cfg.seed_len != chk:
|
||||
die(1, f'ERROR: requested seed length ({self.cfg.seed_len}) doesn’t match seed length of source ({chk})')
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ class wallet(wallet):
|
|||
d.hash_preset,
|
||||
buflen = bw_seed_len // 8)
|
||||
self.cfg._util.qmsg('Done')
|
||||
self.seed = Seed(self.cfg, seed)
|
||||
self.seed = Seed(self.cfg, seed_bin=seed)
|
||||
msg(f'Seed ID: {self.seed.sid}')
|
||||
self.cfg._util.qmsg('Check this value against your records')
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ class wallet(wallet):
|
|||
desc = 'gathered from your die rolls')
|
||||
self.desc += ' plus user-supplied entropy'
|
||||
|
||||
self.seed = Seed(self.cfg, seed_bytes)
|
||||
self.seed = Seed(self.cfg, seed_bin=seed_bytes)
|
||||
|
||||
self.check_usr_seed_len()
|
||||
return True
|
||||
|
|
@ -69,7 +69,7 @@ class wallet(wallet):
|
|||
|
||||
if not self.cfg.stdin_tty:
|
||||
from ..ui import get_data_from_user
|
||||
return get_data_from_user(self.cfg, desc)
|
||||
return get_data_from_user(self.cfg, desc=desc)
|
||||
|
||||
bc = baseconv('b6d')
|
||||
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class wallet(wallet):
|
|||
die(2, 'Passphrase from password file, so exiting')
|
||||
msg('Trying again...')
|
||||
|
||||
def _get_hash_preset_from_user(self, old_preset, add_desc=''):
|
||||
def _get_hash_preset_from_user(self, old_preset, *, add_desc=''):
|
||||
prompt = 'Enter {}hash preset for {}{}{},\nor hit ENTER to {} value ({!r}): '.format(
|
||||
('old ' if self.op=='pwchg_old' else 'new ' if self.op=='pwchg_new' else ''),
|
||||
('', 'new ')[self.op=='new'],
|
||||
|
|
@ -41,7 +41,7 @@ class wallet(wallet):
|
|||
old_preset)
|
||||
return self.crypto.get_hash_preset_from_user(old_preset=old_preset, prompt=prompt)
|
||||
|
||||
def _get_hash_preset(self, add_desc=''):
|
||||
def _get_hash_preset(self, *, add_desc=''):
|
||||
if hasattr(self, 'ss_in') and hasattr(self.ss_in.ssdata, 'hash_preset'):
|
||||
old_hp = self.ss_in.ssdata.hash_preset
|
||||
if self.cfg.keep_hash_preset:
|
||||
|
|
@ -71,7 +71,7 @@ class wallet(wallet):
|
|||
passwd_file = self.passwd_file,
|
||||
pw_desc = ('new ' if self.op=='pwchg_new' else '') + 'passphrase')
|
||||
|
||||
def _get_passphrase(self, add_desc=''):
|
||||
def _get_passphrase(self, *, add_desc=''):
|
||||
return self.crypto.get_passphrase(
|
||||
data_desc = self.desc + (f' {add_desc}' if add_desc else ''),
|
||||
passwd_file = self.passwd_file,
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ class wallet(wallet):
|
|||
key_id = ''))
|
||||
|
||||
if seed:
|
||||
self.seed = Seed(self.cfg, seed)
|
||||
self.seed = Seed(self.cfg, seed_bin=seed)
|
||||
msg(f'Seed ID: {self.seed.sid}')
|
||||
return True
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ class wallet(wallet):
|
|||
super().__init__(*args, **kwargs)
|
||||
|
||||
# logic identical to _get_hash_preset_from_user()
|
||||
def _get_label_from_user(self, old_lbl=''):
|
||||
def _get_label_from_user(self, *, old_lbl=''):
|
||||
prompt = 'Enter a wallet label, or hit ENTER {}: '.format(
|
||||
'to reuse the label {}'.format(old_lbl.hl2(encl='‘’')) if old_lbl else
|
||||
'for no label')
|
||||
|
|
@ -60,7 +60,7 @@ class wallet(wallet):
|
|||
lbl = self.label
|
||||
self.cfg._util.qmsg('Using user-configured label {}'.format(lbl.hl2(encl='‘’')))
|
||||
else: # Prompt, using old value as default
|
||||
lbl = self._get_label_from_user(old_lbl)
|
||||
lbl = self._get_label_from_user(old_lbl=old_lbl)
|
||||
if (not self.cfg.keep_label) and self.op == 'pwchg_new':
|
||||
self.cfg._util.qmsg('Label {}'.format('unchanged' if lbl == old_lbl else f'changed to {lbl!r}'))
|
||||
elif self.label:
|
||||
|
|
@ -122,7 +122,7 @@ class wallet(wallet):
|
|||
d1, d2, d3, d4, d5 = lines[2].split()
|
||||
d.seed_id = d1.upper()
|
||||
d.key_id = d2.upper()
|
||||
self.check_usr_seed_len(int(d3))
|
||||
self.check_usr_seed_len(bitlen=int(d3))
|
||||
d.pw_status, d.timestamp = d4, d5
|
||||
|
||||
hpdata = lines[3].split()
|
||||
|
|
@ -171,9 +171,9 @@ class wallet(wallet):
|
|||
d.passwd = self._get_passphrase(
|
||||
add_desc = os.path.basename(self.infile.name) if self.cfg.quiet else '')
|
||||
key = self.crypto.make_key(d.passwd, d.salt, d.hash_preset)
|
||||
ret = self.crypto.decrypt_seed(d.enc_seed, key, d.seed_id, d.key_id)
|
||||
ret = self.crypto.decrypt_seed(d.enc_seed, key, seed_id=d.seed_id, key_id=d.key_id)
|
||||
if ret:
|
||||
self.seed = Seed(self.cfg, ret)
|
||||
self.seed = Seed(self.cfg, seed_bin=ret)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ class wallet(wallet):
|
|||
if not self.cfg._util.compare_chksums(chk, 'file', make_chksum_6(hstr), 'computed', verbose=True):
|
||||
return False
|
||||
|
||||
self.seed = Seed(self.cfg, bytes.fromhex(hstr))
|
||||
self.seed = Seed(self.cfg, seed_bin=bytes.fromhex(hstr))
|
||||
self.ssdata.chksum = chk
|
||||
|
||||
self.check_usr_seed_len()
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class wallet(wallet):
|
|||
|
||||
if not self.cfg.stdin_tty:
|
||||
from ..ui import get_data_from_user
|
||||
return get_data_from_user(self.cfg, desc)
|
||||
return get_data_from_user(self.cfg, desc=desc)
|
||||
|
||||
self._print_seed_type()
|
||||
|
||||
|
|
@ -93,7 +93,7 @@ class wallet(wallet):
|
|||
desc2 = 'original mnemonic',
|
||||
e = 'Internal error')
|
||||
|
||||
self.seed = Seed(self.cfg, seed)
|
||||
self.seed = Seed(self.cfg, seed_bin=seed)
|
||||
self.ssdata.mnemonic = mn
|
||||
|
||||
self.check_usr_seed_len()
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ class wallet(wallet):
|
|||
msg(f'Invalid data length ({len(d)}) in {desc}')
|
||||
return False
|
||||
|
||||
self.seed = Seed(self.cfg, bytes.fromhex(d))
|
||||
self.seed = Seed(self.cfg, seed_bin=bytes.fromhex(d))
|
||||
|
||||
self.check_usr_seed_len()
|
||||
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ class wallet(wallet):
|
|||
msg(f'Invalid base-58 encoded seed: {b}')
|
||||
return False
|
||||
|
||||
self.seed = Seed(self.cfg, ret)
|
||||
self.seed = Seed(self.cfg, seed_bin=ret)
|
||||
self.ssdata.chksum = a
|
||||
self.ssdata.b58seed = b
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ class xmrseed(baseconv):
|
|||
wstr = ''.join(word[:3] for word in words)
|
||||
return words[crc32(wstr.encode()) % len(words)]
|
||||
|
||||
def tobytes(self, words_arg, pad=None):
|
||||
def tobytes(self, words_arg, *, pad=None):
|
||||
|
||||
assert isinstance(words_arg, (list, tuple)), 'words must be list or tuple'
|
||||
assert pad is None, f"{pad}: invalid 'pad' argument (must be None)"
|
||||
|
|
|
|||
|
|
@ -113,5 +113,5 @@ def op_cls(op_name):
|
|||
cls.name = op_name
|
||||
return cls
|
||||
|
||||
def op(op, cfg, infile, wallets, spec=None):
|
||||
def op(op, cfg, infile, wallets, *, spec=None):
|
||||
return op_cls(op.replace('-', '_'))(cfg, uargs(infile, wallets, spec))
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class MoneroMMGenFile:
|
|||
|
||||
silent_load = False
|
||||
|
||||
def make_chksum(self, keys=None):
|
||||
def make_chksum(self, *, keys=None):
|
||||
res = json.dumps(
|
||||
dict((k, v) for k, v in self.data._asdict().items() if (not keys or k in keys)),
|
||||
cls = json_encoder
|
||||
|
|
@ -30,11 +30,11 @@ class MoneroMMGenFile:
|
|||
|
||||
@property
|
||||
def base_chksum(self):
|
||||
return self.make_chksum(self.base_chksum_fields)
|
||||
return self.make_chksum(keys=self.base_chksum_fields)
|
||||
|
||||
@property
|
||||
def full_chksum(self):
|
||||
return self.make_chksum(self.full_chksum_fields) if self.full_chksum_fields else None
|
||||
return self.make_chksum(keys=self.full_chksum_fields) if self.full_chksum_fields else None
|
||||
|
||||
def check_checksums(self, d_wrap):
|
||||
for k in ('base_chksum', 'full_chksum'):
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ class MoneroWalletOutputsFile:
|
|||
)
|
||||
return fn.parent / fn.name[:-(len(self.ext)+self.ext_offset+1)]
|
||||
|
||||
def get_info(self, indent=''):
|
||||
def get_info(self, *, indent=''):
|
||||
if self.data.signed_key_images is not None:
|
||||
data = self.data.signed_key_images or []
|
||||
return f'{indent}{self.wallet_fn.name}: {len(data)} signed key image{suf(data)}'
|
||||
|
|
@ -96,7 +96,7 @@ class MoneroWalletOutputsFile:
|
|||
|
||||
class Completed(New):
|
||||
|
||||
def __init__(self, parent, fn=None, wallet_fn=None):
|
||||
def __init__(self, parent, *, fn=None, wallet_fn=None):
|
||||
def check_equal(desc, a, b):
|
||||
assert a == b, f'{desc} mismatch: {a} (from file) != {b} (from filename)'
|
||||
fn = fn or self.get_outfile(parent.cfg, wallet_fn)
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ class MoneroMMGenTX:
|
|||
def src_wallet_idx(self):
|
||||
return int(self.data.source.split(':')[0])
|
||||
|
||||
def get_info_oneline(self, indent='', addr_w=None):
|
||||
def get_info_oneline(self, *, indent='', addr_w=None):
|
||||
d = self.data
|
||||
return self.oneline_fs.format(
|
||||
a = yellow(d.network),
|
||||
|
|
@ -99,7 +99,7 @@ class MoneroMMGenTX:
|
|||
x = '->'
|
||||
)
|
||||
|
||||
def get_info(self, indent='', addr_w=None):
|
||||
def get_info(self, *, indent='', addr_w=None):
|
||||
d = self.data
|
||||
pmt_id = d.dest_address.parsed.payment_id
|
||||
fs = '\n'.join(list_gen(
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ from ..color import red, green, pink
|
|||
from ..addr import CoinAddr, AddrIdx
|
||||
from ..util import die
|
||||
|
||||
def gen_acct_addr_info(self, wallet_data, account, indent=''):
|
||||
def gen_acct_addr_info(self, wallet_data, account, *, indent=''):
|
||||
fs = indent + '{I:<3} {A} {U} {B} {L}'
|
||||
addrs_data = wallet_data.addrs_data[account]['addresses']
|
||||
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class OpBase:
|
|||
self.cfg.tx_relay_daemon,
|
||||
re.ASCII)
|
||||
|
||||
def display_tx_relay_info(self, indent=''):
|
||||
def display_tx_relay_info(self, *, indent=''):
|
||||
m = self.parse_tx_relay_opt()
|
||||
msg(fmt(f"""
|
||||
TX relay info:
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ class OpTxview(OpBase):
|
|||
footer = ''
|
||||
do_umount = False
|
||||
|
||||
async def main(self, cols=None):
|
||||
async def main(self, *, cols=None):
|
||||
|
||||
self.mount_removable_device()
|
||||
|
||||
|
|
|
|||
|
|
@ -163,7 +163,7 @@ class OpWallet(OpBase):
|
|||
|
||||
def create_addr_data(self):
|
||||
if self.uargs.wallets:
|
||||
idxs = AddrIdxList(self.uargs.wallets)
|
||||
idxs = AddrIdxList(fmt_str=self.uargs.wallets)
|
||||
self.addr_data = [d for d in self.kal.data if d.idx in idxs]
|
||||
if len(self.addr_data) != len(idxs):
|
||||
die(1, f'List {self.uargs.wallets!r} contains addresses not present in supplied key-address file')
|
||||
|
|
@ -183,7 +183,7 @@ class OpWallet(OpBase):
|
|||
self.c.daemon.force_kill = True
|
||||
self.c.daemon.stop()
|
||||
|
||||
def get_wallet_fn(self, data, watch_only=None):
|
||||
def get_wallet_fn(self, data, *, watch_only=None):
|
||||
if watch_only is None:
|
||||
watch_only = self.cfg.watch_only
|
||||
return Path(
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ class MoneroWalletRPC:
|
|||
MoneroMMGenTX.NewUnsigned if self.cfg.watch_only else
|
||||
MoneroMMGenTX.NewSigned)
|
||||
|
||||
def open_wallet(self, desc=None, refresh=True):
|
||||
def open_wallet(self, desc=None, *, refresh=True):
|
||||
add_desc = desc + ' ' if desc else self.parent.add_wallet_desc
|
||||
gmsg_r(f'\n Opening {add_desc}wallet...')
|
||||
self.c.call( # returns {}
|
||||
|
|
|
|||
|
|
@ -228,7 +228,7 @@ class CmdTestAutosignBase(CmdTestBase):
|
|||
t.expect('OK? (Y/n): ', '\n')
|
||||
from mmgen.mn_entry import mn_entry
|
||||
entry_mode = 'full'
|
||||
mne = mn_entry(cfg, mn_type, entry_mode)
|
||||
mne = mn_entry(cfg, mn_type, entry_mode=entry_mode)
|
||||
if usr_entry_modes:
|
||||
t.expect('user-configured')
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -1127,7 +1127,8 @@ class CmdTestEthdev(CmdTestBase, CmdTestShared):
|
|||
usr_addrs[i]))
|
||||
|
||||
def gen_addr(addr):
|
||||
return tool_cmd(cfg, cmdname='gen_addr', proto=self.proto).gen_addr(addr, dfl_words_file)
|
||||
return tool_cmd(
|
||||
cfg, cmdname='gen_addr', proto=self.proto).gen_addr(addr, wallet=dfl_words_file)
|
||||
|
||||
silence()
|
||||
usr_addrs = list(map(gen_addr, usr_mmaddrs))
|
||||
|
|
|
|||
|
|
@ -422,7 +422,7 @@ class CmdTestInput(CmdTestBase):
|
|||
mn = mn or sample_mn[fmt]['mn'].split()
|
||||
t = self.spawn('mmgen-tool', ['mn2hex_interactive', 'fmt='+fmt, 'mn_len=12', 'print_mn=1'])
|
||||
from mmgen.mn_entry import mn_entry
|
||||
mne = mn_entry(cfg, fmt, entry_mode)
|
||||
mne = mn_entry(cfg, fmt, entry_mode=entry_mode)
|
||||
t.expect(
|
||||
'Type a number.*: ',
|
||||
('\n' if enter_for_dfl else str(mne.entry_modes.index(entry_mode)+1)),
|
||||
|
|
@ -465,7 +465,7 @@ class CmdTestInput(CmdTestBase):
|
|||
t.expect('Type a number.*: ', '6', regex=True)
|
||||
t.expect('invalid')
|
||||
from mmgen.mn_entry import mn_entry
|
||||
mne = mn_entry(cfg, fmt, entry_mode)
|
||||
mne = mn_entry(cfg, fmt, entry_mode=entry_mode)
|
||||
t.expect('Type a number.*: ', str(mne.entry_modes.index(entry_mode)+1), regex=True)
|
||||
t.expect(r'Using entry mode (\S+)', regex=True)
|
||||
mode = strip_ansi_escapes(t.p.match.group(1)).lower()
|
||||
|
|
@ -489,7 +489,7 @@ class CmdTestInput(CmdTestBase):
|
|||
def mnemonic_entry_mmgen_minimal(self):
|
||||
from mmgen.mn_entry import mn_entry
|
||||
# erase_chars: '\b\x7f'
|
||||
m = mn_entry(cfg, 'mmgen', 'minimal')
|
||||
m = mn_entry(cfg, 'mmgen', entry_mode='minimal')
|
||||
np = 2
|
||||
mn = (
|
||||
'z',
|
||||
|
|
|
|||
|
|
@ -512,7 +512,7 @@ class CmdTestXMRWallet(CmdTestBase):
|
|||
+ ([] if data.autosign else [data.kafile])
|
||||
+ ([wallets] if wallets else [])
|
||||
)
|
||||
wlist = AddrIdxList(wallets) if wallets else MMGenRange(data.kal_range).items
|
||||
wlist = AddrIdxList(fmt_str=wallets) if wallets else MMGenRange(data.kal_range).items
|
||||
for n, wnum in enumerate(wlist, 1):
|
||||
t.expect('ing wallet {}/{} ({})'.format(
|
||||
n,
|
||||
|
|
|
|||
|
|
@ -84,13 +84,13 @@ async def run_test(network_id, chksum, msghash_type='raw'):
|
|||
print_total(await m.verify())
|
||||
|
||||
pumsg('\nTesting single address verification:\n')
|
||||
print_total(await m.verify(single_addr))
|
||||
print_total(await m.verify(addr=single_addr))
|
||||
|
||||
pumsg('\nTesting JSON dump for export:\n')
|
||||
msg(m.get_json_for_export())
|
||||
|
||||
pumsg('\nTesting single address JSON dump for export:\n')
|
||||
msg(m.get_json_for_export(single_addr))
|
||||
msg(m.get_json_for_export(addr=single_addr))
|
||||
|
||||
from mmgen.fileutil import write_data_to_file
|
||||
exported_sigs = os.path.join(tmpdir, 'signatures.json')
|
||||
|
|
@ -107,7 +107,7 @@ async def run_test(network_id, chksum, msghash_type='raw'):
|
|||
print_total(await m.verify())
|
||||
|
||||
pumsg('\nTesting single address verification (exported data):\n')
|
||||
print_total(await m.verify(single_addr_coin))
|
||||
print_total(await m.verify(addr=single_addr_coin))
|
||||
|
||||
pumsg('\nTesting display (exported data):\n')
|
||||
msg(m.format())
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ def do_test(
|
|||
proto = init_proto(cfg, coin or 'btc')
|
||||
seed = Seed(cfg, seed_bin=bytes.fromhex('feedbead'*8))
|
||||
mmtype = MMGenAddrType(proto, addrtype or 'C')
|
||||
idxs = AddrIdxList(idx_spec or '1-3')
|
||||
idxs = AddrIdxList(fmt_str=idx_spec or '1-3')
|
||||
|
||||
if cfg.verbose:
|
||||
debug_addrlist_save = cfg.debug_addrlist
|
||||
|
|
@ -84,7 +84,7 @@ class unit_tests:
|
|||
('2,4', '2,4'),
|
||||
('', ''),
|
||||
):
|
||||
l = AddrIdxList(i)
|
||||
l = AddrIdxList(fmt_str=i)
|
||||
if cfg.verbose:
|
||||
msg(f'list: {list(l)}\nin: {i}\nout: {o}\n')
|
||||
assert l.id_str == o, f'{l.id_str} != {o}'
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ class unit_tests:
|
|||
assert seed_hex == '3c30b98d3d9a713cf5a7a42f5dd27b3bf7f4d792d2b9225f6f519a0da978e13c6f36989ef2123b12a96d6ad5a443a95d61022ffaa9fbce8f946da7b67f75d339'
|
||||
|
||||
passwd = 'passw0rd'
|
||||
seed_hex = bip39().generate_seed(mnemonic.split(), passwd).hex()
|
||||
seed_hex = bip39().generate_seed(mnemonic.split(), passwd=passwd).hex()
|
||||
vmsg(f' Password: {orange(passwd)}\n {seed_hex}')
|
||||
assert seed_hex == '7eb773bf60f1a5071f96736b6ddbe5c544a7b7740182a80493e29577e58b7cde011d4e38d26f65dab6c9fdebe5594e523447a1427ffd60746e6d04b4daa42eb1'
|
||||
|
||||
|
|
|
|||
|
|
@ -66,7 +66,7 @@ class unit_test:
|
|||
|
||||
for a, b, c, d, e, f, h, i, p in test_data[id_str if id_str is not None else 'default']:
|
||||
seed_bin = bytes.fromhex('deadbeef' * a)
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
assert seed.sid == b, seed.sid
|
||||
|
||||
for share_count, j, k, l, m in (
|
||||
|
|
@ -103,7 +103,7 @@ class unit_test:
|
|||
|
||||
if master_idx:
|
||||
slist = [shares.get_share_by_idx(i+1, base_seed=True) for i in range(len(shares))]
|
||||
A = Seed.join_shares(cfg, slist, master_idx, id_str).sid
|
||||
A = Seed.join_shares(cfg, slist, master_idx=master_idx, id_str=id_str).sid
|
||||
assert A == b, A
|
||||
|
||||
msg('OK')
|
||||
|
|
@ -112,7 +112,7 @@ class unit_test:
|
|||
msg_r('Testing defaults and limits...')
|
||||
|
||||
seed_bin = bytes.fromhex('deadbeef' * 8)
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
|
||||
shares = seed.split(SeedShareIdx.max_val)
|
||||
s = shares.format()
|
||||
|
|
@ -136,7 +136,7 @@ class unit_test:
|
|||
vmsg('')
|
||||
|
||||
seed_bin = bytes.fromhex(seed_hex)
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
|
||||
SeedShareIdx.max_val = ss_count
|
||||
shares = seed.split(ss_count, master_idx=master_idx)
|
||||
|
|
@ -159,7 +159,7 @@ class unit_test:
|
|||
msg_r('Testing last share collisions with shortened Seed IDs')
|
||||
vmsg('')
|
||||
seed_bin = bytes.fromhex('2eadbeef'*8)
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
ssm_save = SeedShareIdx.max_val
|
||||
ssm = SeedShareIdx.max_val = 2048
|
||||
shares = SeedShareList(seed, count=ssm, id_str='foo', master_idx=1, debug_last_share=True)
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ class unit_test:
|
|||
):
|
||||
|
||||
seed_bin = bytes.fromhex('deadbeef' * a)
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
assert seed.sid == b, seed.sid
|
||||
|
||||
subseed = seed.subseed('2s')
|
||||
|
|
@ -40,7 +40,7 @@ class unit_test:
|
|||
assert subseed.idx == 10, subseed.idx
|
||||
assert subseed.ss_idx == h, subseed.ss_idx
|
||||
|
||||
seed2 = Seed(cfg, seed_bin)
|
||||
seed2 = Seed(cfg, seed_bin=seed_bin)
|
||||
ss2_list = seed2.subseeds
|
||||
|
||||
seed2.subseeds._generate(1)
|
||||
|
|
@ -98,31 +98,31 @@ class unit_test:
|
|||
|
||||
seed_bin = bytes.fromhex('deadbeef' * 8)
|
||||
|
||||
seed = Seed(cfg, seed_bin, nSubseeds=11)
|
||||
seed = Seed(cfg, seed_bin=seed_bin, nSubseeds=11)
|
||||
seed.subseeds._generate()
|
||||
ss = seed.subseeds
|
||||
assert len(ss.data['long']) == len(ss.data['short']), len(ss.data['short'])
|
||||
assert len(ss) == 11, len(ss)
|
||||
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
seed.subseeds._generate()
|
||||
ss = seed.subseeds
|
||||
assert len(ss.data['long']) == len(ss.data['short']), len(ss.data['short'])
|
||||
assert len(ss) == nSubseeds, len(ss)
|
||||
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
seed.subseed_by_seed_id('EEEEEEEE')
|
||||
ss = seed.subseeds
|
||||
assert len(ss.data['long']) == len(ss.data['short']), len(ss.data['short'])
|
||||
assert len(ss) == nSubseeds, len(ss)
|
||||
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
subseed = seed.subseed_by_seed_id('803B165C')
|
||||
assert len(ss.data['long']) == len(ss.data['short']), len(ss.data['short'])
|
||||
assert subseed.sid == '803B165C', subseed.sid
|
||||
assert subseed.idx == 3, subseed.idx
|
||||
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
subseed = seed.subseed_by_seed_id('803B165C', last_idx=1)
|
||||
assert len(ss.data['long']) == len(ss.data['short']), len(ss.data['short'])
|
||||
assert subseed is None, subseed
|
||||
|
|
@ -169,7 +169,7 @@ class unit_test:
|
|||
msg_r(f'Testing Seed ID collisions ({ss_count} subseed pairs)...')
|
||||
|
||||
seed_bin = bytes.fromhex('12abcdef' * 8) # 95B3D78D
|
||||
seed = Seed(cfg, seed_bin)
|
||||
seed = Seed(cfg, seed_bin=seed_bin)
|
||||
|
||||
seed.subseeds._generate(ss_count)
|
||||
ss = seed.subseeds
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue