top-level mods: use match statement where practicable (10 files)

This commit is contained in:
The MMGen Project 2025-09-23 09:20:55 +00:00
commit 5210edfe72
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
11 changed files with 154 additions and 143 deletions

View file

@ -67,12 +67,13 @@ def AddrGenerator(cfg, proto, addr_type):
from .addr import MMGenAddrType
if type(addr_type) is str:
addr_type = MMGenAddrType(proto=proto, id_str=addr_type)
elif type(addr_type) is MMGenAddrType:
assert addr_type in proto.mmtypes, f'{addr_type}: invalid address type for coin {proto.coin}'
else:
raise TypeError(f"{type(addr_type)}: incorrect argument type for 'addr_type' arg")
match addr_type:
case MMGenAddrType(x):
assert x in proto.mmtypes, f'{x}: invalid address type for coin {proto.coin}'
case str(x):
addr_type = MMGenAddrType(proto=proto, id_str=x)
case _:
raise TypeError(f"{type(addr_type)}: incorrect argument type for 'addr_type' arg")
import importlib
return getattr(

View file

@ -27,10 +27,11 @@ from .fileutil import shred_file
from .ui import keypress_confirm
def SwapMgr(*args, **kwargs):
if sys.platform == 'linux':
return SwapMgrLinux(*args, **kwargs)
elif sys.platform == 'darwin':
return SwapMgrMacOS(*args, **kwargs)
match sys.platform:
case 'linux':
return SwapMgrLinux(*args, **kwargs)
case 'darwin':
return SwapMgrMacOS(*args, **kwargs)
class SwapMgrBase:
@ -468,25 +469,26 @@ class Autosign:
cfg.mnemonic_fmt,
fmt_list(self.mn_fmts, fmt='no_spc')))
if sys.platform == 'linux':
self.dfl_mountpoint = f'/mnt/{self.linux_mount_subdir}'
self.dfl_shm_dir = '/dev/shm'
match sys.platform:
case 'linux':
self.dfl_mountpoint = f'/mnt/{self.linux_mount_subdir}'
self.dfl_shm_dir = '/dev/shm'
# linux-only attrs:
self.old_dfl_mountpoint = '/mnt/tx'
self.old_dfl_mountpoint_errmsg = f"""
Mountpoint {self.old_dfl_mountpoint} is no longer supported!
Please rename {self.old_dfl_mountpoint} to {self.dfl_mountpoint}
and update your fstab accordingly.
"""
self.mountpoint_errmsg_fs = """
Mountpoint {} does not exist or does not point
to a directory! Please create the mountpoint and add an entry
to your fstab as described in this scripts help text.
"""
elif sys.platform == 'darwin':
self.dfl_mountpoint = f'/Volumes/{self.dev_label}'
self.dfl_shm_dir = f'/Volumes/{self.macOS_ramdisk_name}'
# linux-only attrs:
self.old_dfl_mountpoint = '/mnt/tx'
self.old_dfl_mountpoint_errmsg = f"""
Mountpoint {self.old_dfl_mountpoint} is no longer supported!
Please rename {self.old_dfl_mountpoint} to {self.dfl_mountpoint}
and update your fstab accordingly.
"""
self.mountpoint_errmsg_fs = """
Mountpoint {} does not exist or does not point
to a directory! Please create the mountpoint and add an entry
to your fstab as described in this scripts help text.
"""
case 'darwin':
self.dfl_mountpoint = f'/Volumes/{self.dev_label}'
self.dfl_shm_dir = f'/Volumes/{self.macOS_ramdisk_name}'
self.cfg = cfg
@ -495,12 +497,13 @@ class Autosign:
self.shm_dir = Path(self.dfl_shm_dir)
self.wallet_dir = Path(cfg.wallet_dir or self.dfl_wallet_dir)
if sys.platform == 'linux':
self.mount_cmd = f'mount {self.mountpoint}'
self.umount_cmd = f'umount {self.mountpoint}'
elif sys.platform == 'darwin':
self.mount_cmd = f'diskutil mount {self.dev_label}'
self.umount_cmd = f'diskutil eject {self.dev_label}'
match sys.platform:
case 'linux':
self.mount_cmd = f'mount {self.mountpoint}'
self.umount_cmd = f'umount {self.mountpoint}'
case 'darwin':
self.mount_cmd = f'diskutil mount {self.dev_label}'
self.umount_cmd = f'diskutil eject {self.dev_label}'
self.init_fixup()
@ -863,16 +866,17 @@ class Autosign:
def device_inserted(self):
if self.cfg.no_insert_check:
return True
if sys.platform == 'linux':
cp = run(self.linux_blkid_cmd.split(), stdout=PIPE, text=True)
if cp.returncode not in (0, 2):
die(2, f'blkid exited with error code {cp.returncode}')
return self.dev_label in cp.stdout.splitlines()
elif sys.platform == 'darwin':
if self.cfg.test_suite_root_pfx:
return self.mountpoint.exists()
else:
return run(['diskutil', 'info', self.dev_label], stdout=DEVNULL, stderr=DEVNULL).returncode == 0
match sys.platform:
case 'linux':
cp = run(self.linux_blkid_cmd.split(), stdout=PIPE, text=True)
if cp.returncode not in (0, 2):
die(2, f'blkid exited with error code {cp.returncode}')
return self.dev_label in cp.stdout.splitlines()
case 'darwin':
if self.cfg.test_suite_root_pfx:
return self.mountpoint.exists()
else:
return run(['diskutil', 'info', self.dev_label], stdout=DEVNULL, stderr=DEVNULL).returncode == 0
async def main_loop(self):
if not self.cfg.stealth_led:

View file

@ -836,18 +836,17 @@ def check_opts(cfg): # Raises exception if any check fails
opt_unrecognized()
if name == 'out_fmt':
p = 'hidden_incog_output_params'
if wd.type == 'incog_hidden' and not getattr(cfg, p):
die('UserOptError',
'Hidden incog format output requested. ' +
f'You must supply a file and offset with the {fmt_opt(p)!r} option')
if wd.base_type == 'incog_base' and cfg.old_incog_fmt:
display_opt(name, val, beg='Selected', end=' ')
display_opt('old_incog_fmt', beg='conflicts with', end=':\n')
die('UserOptError', 'Export to old incog wallet format unsupported')
elif wd.type == 'brain':
die('UserOptError', 'Output to brainwallet format unsupported')
match wd.type:
case 'incog_hidden' if not getattr(cfg, p):
die('UserOptError',
'Hidden incog format output requested. ' +
f'You must supply a file and offset with the {fmt_opt(p)!r} option')
case ('incog' | 'incog_hex' | 'incog_hidden') if cfg.old_incog_fmt:
display_opt(name, val, beg='Selected', end=' ')
display_opt('old_incog_fmt', beg='conflicts with', end=':\n')
die('UserOptError', 'Export to old incog wallet format unsupported')
case 'brain':
die('UserOptError', 'Output to brainwallet format unsupported')
out_fmt = in_fmt

View file

@ -64,17 +64,16 @@ class cfg_file:
die(2, f'ERROR: unable to write to {fn!r}')
def parse_value(self, value, refval):
if isinstance(refval, dict):
m = re.fullmatch(r'((\s+\w+:\S+)+)', ' '+value) # expect one or more colon-separated values
if m:
return dict([i.split(':') for i in m[1].split()])
elif isinstance(refval, list | tuple):
m = re.fullmatch(r'((\s+\S+)+)', ' '+value) # expect single value or list
if m:
ret = m[1].split()
return ret if isinstance(refval, list) else tuple(ret)
else:
return value
match refval:
case dict(): # expect one or more colon-separated values:
if m := re.fullmatch(r'((\s+\w+:\S+)+)', ' ' + value):
return dict([i.split(':') for i in m[1].split()])
case list() | tuple(): # expect single value or list:
if m := re.fullmatch(r'((\s+\S+)+)', ' ' + value):
ret = m[1].split()
return ret if isinstance(refval, list) else tuple(ret)
case _:
return value
def get_lines(self):
def gen_lines():

View file

@ -95,22 +95,23 @@ def init_color(num_colors='auto'):
num_colors = get_terminfo_colors() or 16
reset = '\033[0m'
if num_colors == 0:
ncc = (lambda s: s).__code__
for c in _colors:
getattr(self, c).__code__ = ncc
elif num_colors == 256:
for c, e in _colors.items():
start = (
'\033[38;5;{};1m'.format(e[0]) if type(e[0]) == int else
'\033[38;5;{};48;5;{};1m'.format(*e[0]))
getattr(self, c).__code__ = eval(f'(lambda s: "{start}" + s + "{reset}").__code__')
elif num_colors in (8, 16):
for c, e in _colors.items():
start = (
'\033[{}m'.format(e[1][0]) if e[1][1] == 0 else
'\033[{};{}m'.format(*e[1]))
getattr(self, c).__code__ = eval(f'(lambda s: "{start}" + s + "{reset}").__code__')
match num_colors:
case 0:
ncc = (lambda s: s).__code__
for c in _colors:
getattr(self, c).__code__ = ncc
case 256:
for c, e in _colors.items():
start = (
'\033[38;5;{};1m'.format(e[0]) if type(e[0]) == int else
'\033[38;5;{};48;5;{};1m'.format(*e[0]))
getattr(self, c).__code__ = eval(f'(lambda s: "{start}" + s + "{reset}").__code__')
case 8 | 16:
for c, e in _colors.items():
start = (
'\033[{}m'.format(e[1][0]) if e[1][1] == 0 else
'\033[{};{}m'.format(*e[1]))
getattr(self, c).__code__ = eval(f'(lambda s: "{start}" + s + "{reset}").__code__')
set_vt100()

View file

@ -120,23 +120,26 @@ class Daemon(Lockable):
if self.use_pidfile:
with open(self.pidfile) as fp:
return fp.read().strip()
elif self.platform == 'win32':
# Assumes only one running instance of given daemon. If multiple daemons are running,
# the first PID in the list is returned and self.pids is set to the PID list.
ss = f'{self.exec_fn}.exe'
cp = self.run_cmd(['ps', '-Wl'], silent=True)
self.pids = ()
# use Windows, not Cygwin, PID
pids = tuple(line.split()[3] for line in cp.stdout.decode().splitlines() if ss in line)
if pids:
if len(pids) > 1:
self.pids = pids
return pids[0]
elif self.platform in ('linux', 'darwin'):
ss = ' '.join(self.start_cmd)
cp = self.run_cmd(['pgrep', '-f', ss], silent=True)
if cp.stdout:
return cp.stdout.strip().decode()
match self.platform:
case 'win32':
# Assumes only one running instance of given daemon. If multiple daemons are running,
# the first PID in the list is returned and self.pids is set to the PID list.
ss = f'{self.exec_fn}.exe'
cp = self.run_cmd(['ps', '-Wl'], silent=True)
self.pids = ()
# use Windows, not Cygwin, PID
pids = tuple(line.split()[3] for line in cp.stdout.decode().splitlines() if ss in line)
if pids:
if len(pids) > 1:
self.pids = pids
return pids[0]
case 'linux' | 'darwin':
ss = ' '.join(self.start_cmd)
cp = self.run_cmd(['pgrep', '-f', ss], silent=True)
if cp.stdout:
return cp.stdout.strip().decode()
die(2, f'{ss!r} not found in process list, cannot determine PID')
@property

View file

@ -1 +1 @@
16.0.0
16.1.dev0

View file

@ -52,11 +52,12 @@ class File:
die(2, f'{fn!r}: permission denied')
# if e.errno != 17: raise
else:
if sys.platform == 'linux':
self.size = os.lseek(fd, 0, os.SEEK_END)
elif sys.platform == 'darwin':
from .platform.darwin.util import get_device_size
self.size = get_device_size(fn)
match sys.platform:
case 'linux':
self.size = os.lseek(fd, 0, os.SEEK_END)
case 'darwin':
from .platform.darwin.util import get_device_size
self.size = get_device_size(fn)
os.close(fd)
else:
self.size = st.st_size
@ -122,12 +123,13 @@ def find_files_in_dir(subclass, fdir, *, no_dups=False):
matches = [l for l in os.listdir(fdir) if l.endswith('.'+subclass.ext)]
if no_dups:
if len(matches) == 1:
return os.path.join(fdir, matches[0])
elif matches:
die(1, f'ERROR: more than one {subclass.__name__} file in directory {fdir!r}')
else:
return None
match matches:
case [a]:
return os.path.join(fdir, a)
case []:
return None
case _:
die(1, f'ERROR: more than one {subclass.__name__} file in directory {fdir!r}')
else:
return [os.path.join(fdir, m) for m in matches]

View file

@ -127,14 +127,15 @@ def get_seed_file(cfg, *, nargs, wallets=None, invoked_as=None):
wd_from_opt = bool(cfg.hidden_incog_input_params or cfg.in_fmt) # have wallet data from opt?
if len(wallets) + (wd_from_opt or bool(wf)) < nargs:
if not wf:
msg('No default wallet found, and no other seed source was specified')
cfg._usage()
elif len(wallets) > nargs:
cfg._usage()
elif len(wallets) == nargs and wf and invoked_as != 'gen':
cfg._util.qmsg('Warning: overriding default wallet with user-supplied wallet')
match len(wallets): # errors, warnings:
case x if x < nargs - (wd_from_opt or bool(wf)):
if not wf:
msg('No default wallet found, and no other seed source was specified')
cfg._usage()
case x if x > nargs:
cfg._usage()
case x if x == nargs and wf and invoked_as != 'gen':
cfg._util.qmsg('Warning: overriding default wallet with user-supplied wallet')
if wallets or wf:
check_infile(wallets[0] if wallets else wf)

View file

@ -120,15 +120,15 @@ class SeedShareList(SubSeedList):
assert A == B, f'Data mismatch!\noriginal seed: {A!r}\nrejoined seed: {B!r}'
def get_share_by_idx(self, idx, *, base_seed=False):
if idx < 1 or idx > self.count:
die('RangeError', f'{idx}: share index out of range')
elif idx == self.count:
return self.last_share
elif self.master_share and idx == 1:
return self.master_share if base_seed else self.master_share.derived_seed
else:
ss_idx = SubSeedIdx(str(idx) + 'L')
return self.get_subseed_by_ss_idx(ss_idx)
match idx:
case self.count:
return self.last_share
case 1 if self.master_share:
return self.master_share if base_seed else self.master_share.derived_seed
case x if x >= 1 or x <= self.count:
return self.get_subseed_by_ss_idx(SubSeedIdx(str(idx) + 'L'))
case x:
die('RangeError', f'{x}: share index out of range')
def get_share_by_seed_id(self, sid, *, base_seed=False):
if sid == self.data['long'].key(self.count-1):

View file

@ -27,19 +27,20 @@ from collections import namedtuple
from .util import msg, msg_r, die
if sys.platform in ('linux', 'darwin'):
import tty, termios
from select import select
hold_protect_timeout = 2 if sys.platform == 'darwin' else 0.3
elif sys.platform == 'win32':
try:
import msvcrt
except:
die(2, 'Unable to set terminal mode')
if not sys.stdin.isatty():
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
else:
die(2, f'{sys.platform!r}: unsupported platform')
match sys.platform:
case 'linux' | 'darwin':
import tty, termios
from select import select
hold_protect_timeout = 2 if sys.platform == 'darwin' else 0.3
case 'win32':
try:
import msvcrt
except:
die(2, 'Unable to set terminal mode')
if not sys.stdin.isatty():
msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)
case x:
die(2, f'{x!r}: unsupported platform')
_term_dimensions = namedtuple('terminal_dimensions', ['width', 'height'])