ui.keypress_confirm(): add do_exit, exit_msg params

This commit is contained in:
The MMGen Project 2025-03-25 09:46:01 +00:00
commit c452db345c
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
12 changed files with 129 additions and 124 deletions

View file

@ -233,18 +233,17 @@ class Signable:
def shred_abortable(self):
files = self.get_abortable() # raises AutosignTXError if no unsent TXs available
if keypress_confirm(
self.cfg,
'The following file{} will be securely deleted:\n{}\nOK?'.format(
suf(files),
fmt_list(map(str, files), fmt='col', indent=' '))):
for f in files:
msg(f'Shredding file ‘{f}')
from .fileutil import shred_file
shred_file(f)
sys.exit(0)
else:
die(1, 'Exiting at user request')
keypress_confirm(
self.cfg,
'The following file{} will be securely deleted:\n{}\nOK?'.format(
suf(files),
fmt_list(map(str, files), fmt='col', indent=' ')),
do_exit = True)
for f in files:
msg(f'Shredding file ‘{f}')
from .fileutil import shred_file
shred_file(f)
sys.exit(0)
async def get_last_created(self):
from .tx import CompletedTX

View file

@ -119,11 +119,11 @@ def check_opts(twctl):
if rescan and not cfg.quiet:
from .ui import keypress_confirm
if not keypress_confirm(
cfg,
f'\n{addrimport_msgs["rescan"]}\n\nContinue?',
default_yes = True):
die(1, 'Exiting at user request')
keypress_confirm(
cfg,
f'\n{addrimport_msgs["rescan"]}\n\nContinue?',
default_yes = True,
do_exit = True)
if batch and not 'batch' in twctl.caps:
msg(f"‘--batch’ ignored: not supported by {type(twctl).__name__}")

View file

@ -189,12 +189,12 @@ class PasswordList(AddrList):
if pf in ('bip39', 'hex') and pw_bytes < seed.byte_len:
from .ui import keypress_confirm
if not keypress_confirm(
self.cfg,
f'WARNING: requested {self.pw_info[pf].desc} length has less entropy ' +
'than underlying seed!\nIs this what you want?',
default_yes = True):
die(1, 'Exiting at user request')
keypress_confirm(
self.cfg,
f'WARNING: requested {self.pw_info[pf].desc} length has less entropy ' +
'than underlying seed!\nIs this what you want?',
default_yes = True,
do_exit = True)
def gen_passwd(self, secbytes):
assert self.pw_fmt in self.pw_info

View file

@ -218,5 +218,4 @@ class Base(MMGenObject):
msg(f'\n{indent}WARNING: {m}\n')
if not self.cfg.yes:
from ..ui import keypress_confirm
if not keypress_confirm(self.cfg, 'Continue?', default_yes=True):
die(1, 'Exiting at user request')
keypress_confirm(self.cfg, 'Continue?', default_yes=True, do_exit=True)

View file

@ -297,13 +297,13 @@ class New(Base):
def confirm_autoselected_addr(self, mmid, desc):
from ..ui import keypress_confirm
if not keypress_confirm(
self.cfg,
'Using {a} as {b}. OK?'.format(
a = mmid.hl(),
b = 'single output address' if len(self.nondata_outputs) == 1 else desc),
default_yes = True):
die(1, 'Exiting at user request')
keypress_confirm(
self.cfg,
'Using {a} as {b}. OK?'.format(
a = mmid.hl(),
b = 'single output address' if len(self.nondata_outputs) == 1 else desc),
default_yes = True,
do_exit = True)
async def warn_addr_used(self, proto, chg, desc):
if proto.address_reuse_ok:
@ -311,17 +311,17 @@ class New(Base):
from ..tw.addresses import TwAddresses
if (await TwAddresses(self.cfg, proto, get_data=True)).is_used(chg.addr):
from ..ui import keypress_confirm
if not keypress_confirm(
self.cfg,
'{a} {b} {c}\n{d}'.format(
a = yellow(f'Requested {desc}'),
b = chg.mmid.hl() if chg.mmid else chg.addr.hl(chg.addr.view_pref),
c = yellow('is already used!'),
d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
),
complete_prompt = True,
default_yes = False):
die(1, 'Exiting at user request')
keypress_confirm(
self.cfg,
'{a} {b} {c}\n{d}'.format(
a = yellow(f'Requested {desc}'),
b = chg.mmid.hl() if chg.mmid else chg.addr.hl(chg.addr.view_pref),
c = yellow('is already used!'),
d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
),
complete_prompt = True,
default_yes = False,
do_exit = True)
# inputs methods
def get_unspent_nums_from_user(self, unspent):

View file

@ -87,7 +87,14 @@ def keypress_confirm(
default_yes = False,
verbose = False,
no_nl = False,
complete_prompt = False):
complete_prompt = False,
do_exit = False,
exit_msg = 'Exiting at user request'):
def do_return(retval):
if do_exit and not retval:
die(1, exit_msg)
return retval
if not complete_prompt:
prompt = '{} {}: '.format(prompt, '(Y/n)' if default_yes else '(y/N)')
@ -96,17 +103,17 @@ def keypress_confirm(
if cfg.accept_defaults:
msg(prompt)
return default_yes
return do_return(default_yes)
from .term import get_char
while True:
reply = get_char(prompt, immed_chars='yYnN').strip('\n\r')
if not reply:
msg_r(nl)
return default_yes
return do_return(default_yes)
elif reply in 'yYnN':
msg_r(nl)
return reply in 'yY'
return do_return(reply in 'yY')
else:
msg_r('\nInvalid reply\n' if verbose else '\r')

View file

@ -100,23 +100,23 @@ class wallet(wallet):
os.stat(fn)
except:
from ..ui import keypress_confirm, line_input
if keypress_confirm(
self.cfg,
f'Requested file {fn!r} does not exist. Create?',
default_yes = True):
min_fsize = d.target_data_len + d.hincog_offset
msg('\n ' + self.msg['choose_file_size'].strip().format(min_fsize)+'\n')
while True:
fsize = parse_bytespec(line_input(self.cfg, 'Enter file size: '))
if fsize >= min_fsize:
break
msg(f'File size must be an integer no less than {min_fsize}')
keypress_confirm(
self.cfg,
f'Requested file {fn!r} does not exist. Create?',
default_yes = True,
do_exit = True)
from ..tool.fileutil import tool_cmd
tool_cmd(self.cfg).rand2file(fn, str(fsize))
check_offset = False
else:
die(1, 'Exiting at user request')
min_fsize = d.target_data_len + d.hincog_offset
msg('\n ' + self.msg['choose_file_size'].strip().format(min_fsize)+'\n')
while True:
fsize = parse_bytespec(line_input(self.cfg, 'Enter file size: '))
if fsize >= min_fsize:
break
msg(f'File size must be an integer no less than {min_fsize}')
from ..tool.fileutil import tool_cmd
tool_cmd(self.cfg).rand2file(fn, str(fsize))
check_offset = False
from ..filename import MMGenFile
f = MMGenFile(fn, subclass=type(self), write=True)

View file

@ -62,26 +62,23 @@ class OpRelay(OpBase):
if self.cfg.tx_relay_daemon:
self.display_tx_relay_info(indent=' ')
if keypress_confirm(self.cfg, 'Relay transaction?'):
if self.cfg.tx_relay_daemon:
msg_r('Relaying transaction to remote daemon, please be patient...')
t_start = time.time()
res = self.dc.call_raw(
'send_raw_transaction',
tx_as_hex = self.tx.data.blob
)
if res['status'] == 'OK':
if res['not_relayed']:
msg('not relayed')
ymsg('Transaction not relayed')
else:
msg('success')
if self.cfg.tx_relay_daemon:
from ...util2 import format_elapsed_hr
msg(f'Relay time: {format_elapsed_hr(t_start, rel_now=False, show_secs=True)}')
gmsg('OK')
return True
keypress_confirm(self.cfg, 'Relay transaction?', do_exit=True)
if self.cfg.tx_relay_daemon:
msg_r('Relaying transaction to remote daemon, please be patient...')
t_start = time.time()
res = self.dc.call_raw('send_raw_transaction', tx_as_hex=self.tx.data.blob)
if res['status'] == 'OK':
if res['not_relayed']:
msg('not relayed')
ymsg('Transaction not relayed')
else:
die('RPCFailure', repr(res))
msg('success')
if self.cfg.tx_relay_daemon:
from ...util2 import format_elapsed_hr
msg(f'Relay time: {format_elapsed_hr(t_start, rel_now=False, show_secs=True)}')
gmsg('OK')
return True
else:
die(1, 'Exiting at user request')
die('RPCFailure', repr(res))

View file

@ -89,19 +89,18 @@ class OpSubmit(OpWallet):
if self.cfg.tx_relay_daemon:
self.display_tx_relay_info(indent=' ')
if keypress_confirm(self.cfg, f'{self.name.capitalize()} transaction?'):
if self.cfg.tx_relay_daemon:
msg_r('Relaying transaction to remote daemon, please be patient...')
t_start = time.time()
res = self.c.call(
'submit_transfer',
tx_data_hex = tx.data.signed_txset)
assert res['tx_hash_list'][0] == tx.data.txid, 'TxID mismatch in ‘submit_transfer’ result!'
if self.cfg.tx_relay_daemon:
from ...util2 import format_elapsed_hr
msg(f'success\nRelay time: {format_elapsed_hr(t_start, rel_now=False, show_secs=True)}')
else:
die(1, 'Exiting at user request')
keypress_confirm(self.cfg, f'{self.name.capitalize()} transaction?', do_exit=True)
if self.cfg.tx_relay_daemon:
msg_r('Relaying transaction to remote daemon, please be patient...')
t_start = time.time()
res = self.c.call(
'submit_transfer',
tx_data_hex = tx.data.signed_txset)
assert res['tx_hash_list'][0] == tx.data.txid, 'TxID mismatch in ‘submit_transfer’ result!'
if self.cfg.tx_relay_daemon:
from ...util2 import format_elapsed_hr
msg(f'success\nRelay time: {format_elapsed_hr(t_start, rel_now=False, show_secs=True)}')
new_tx = MoneroMMGenTX.NewSubmitted(
cfg = self.cfg,

View file

@ -70,9 +70,12 @@ class OpSweep(OpMixinSpec, OpWallet):
def create_new_addr_maybe(h, account, label):
if keypress_confirm(self.cfg, f'\nCreate new address for account #{account}?'):
return h.create_new_addr(account, label)
elif not keypress_confirm(self.cfg, f'Sweep to last existing address of account #{account}?'):
die(1, 'Exiting at user request')
return None
else:
keypress_confirm(
self.cfg,
f'Sweep to last existing address of account #{account}?',
do_exit = True)
return None
dest_addr_chk = None
@ -97,11 +100,13 @@ class OpSweep(OpMixinSpec, OpWallet):
label = f'{self.name} from {self.source.idx}:{self.account} [{make_timestr()}]')
dest_addr_idx = 0
h2.get_wallet_data()
elif keypress_confirm(self.cfg, f'Sweep to last existing account of wallet {wf.name!r}?'):
else:
keypress_confirm(
self.cfg,
f'Sweep to last existing account of wallet {wf.name!r}?',
do_exit = True)
dest_acct, dest_addr_chk = h2.get_last_acct(wallet_data2.accts_data)
dest_addr, dest_addr_idx = h2.get_last_addr(dest_acct, wallet_data2, display=False)
else:
die(1, 'Exiting at user request')
h2.close_wallet('destination')
h.open_wallet('source', refresh=False)
@ -181,19 +186,22 @@ class OpSweep(OpMixinSpec, OpWallet):
if self.cfg.no_relay or self.cfg.autosign:
return True
if keypress_confirm(self.cfg, f'Relay {self.name} transaction?'):
if self.cfg.tx_relay_daemon:
await h.stop_wallet('source')
msg('')
self.init_tx_relay_daemon()
h = MoneroWalletRPC(self, self.source)
h.open_wallet('TX-relay-configured source', refresh=False)
msg_r(f'\n Relaying {self.name} transaction...')
h.relay_tx(new_tx.data.metadata)
gmsg('\nAll done')
return True
else:
die(1, '\nExiting at user request')
keypress_confirm(
self.cfg,
f'Relay {self.name} transaction?',
do_exit = True,
exit_msg = '\nExiting at user request')
if self.cfg.tx_relay_daemon:
await h.stop_wallet('source')
msg('')
self.init_tx_relay_daemon()
h = MoneroWalletRPC(self, self.source)
h.open_wallet('TX-relay-configured source', refresh=False)
msg_r(f'\n Relaying {self.name} transaction...')
h.relay_tx(new_tx.data.metadata)
gmsg('\nAll done')
return True
class OpSweepAll(OpSweep):
stem = 'sweep'

View file

@ -203,10 +203,8 @@ class CmdTestXMRWallet(CmdTestBase):
""", indent=' ', strip_char='\t'))
from mmgen.ui import keypress_confirm
if keypress_confirm(cfg, 'Continue?'):
start_proxy()
else:
die(1, 'Exiting at user request')
keypress_confirm(cfg, 'Continue?', do_exit=True)
start_proxy()
else:
die(2, fmt(f"""
Please start the SSH SOCKS proxy by entering the following command:

View file

@ -24,9 +24,7 @@ opts_data = {
cfg = Config(opts_data=opts_data)
def confirm_or_exit(prompt):
if not keypress_confirm(cfg, f'{prompt}. OK?', default_yes=True):
msg('Exiting at user request')
sys.exit(1)
keypress_confirm(cfg, f'{prompt}. OK?', default_yes=True, do_exit=True)
confirm_or_exit('This script will interactively test LED functionality')