util.py: relocate user-prompting functions to ui.py

This commit is contained in:
The MMGen Project 2022-10-17 18:37:22 +00:00
commit 1d8b908c7c
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
45 changed files with 300 additions and 229 deletions

View file

@ -20,14 +20,7 @@
addrfile.py: Address and password file classes for the MMGen suite
"""
from .util import (
msg,
qmsg,
qmsg_r,
die,
capfirst,
keypress_confirm,
)
from .util import msg,qmsg,qmsg_r,die,capfirst
from .protocol import init_proto
from .obj import MMGenObject,TwComment,WalletPassword,MMGenPWIDString
from .seed import SeedID,is_seed_id
@ -166,8 +159,8 @@ class AddrFile(MMGenObject):
ret.append(a)
if p.has_keys and p.ka_validity_chk != False:
from .opts import opt
if opt.yes or p.ka_validity_chk == True or keypress_confirm('Check key-to-address validity?'):
def verify_keys():
from .addrgen import KeyGenerator,AddrGenerator
kg = KeyGenerator(p.proto,p.al_id.mmtype.pubkey_type)
ag = AddrGenerator(p.proto,p.al_id.mmtype)
@ -178,6 +171,14 @@ class AddrFile(MMGenObject):
f'Key doesn’t match address!\n {e.sec.wif}\n {e.addr}')
qmsg(' - done')
from .opts import opt
if opt.yes or p.ka_validity_chk == True:
verify_keys()
else:
from .ui import keypress_confirm
if keypress_confirm('Check key-to-address validity?'):
verify_keys()
return ret
def parse_file(self,fn,buf=[],exit_on_error=True):

View file

@ -271,6 +271,8 @@ class CfgFileSampleUsr(CfgFileSample):
{' ' + fmt_list(bad,fmt='bare')}
"""
ymsg(fmt(m,indent=' ',strip_char='\t'))
from .ui import keypress_confirm,do_pager
while True:
if not keypress_confirm(self.details_confirm_prompt,no_nl=True):
return

View file

@ -34,8 +34,6 @@ from .util import (
qmsg,
fmt,
die,
line_input,
get_words_from_user,
make_chksum_8,
compare_chksums,
oneshot_warning,
@ -246,6 +244,7 @@ def _get_random_data_from_user(uchars,desc):
if g.debug:
msg(f'USER ENTROPY (user input + keystroke timings):\n{ret}')
from .ui import line_input
line_input('User random data successfully acquired. Press ENTER to continue: ')
return ret.encode()
@ -299,6 +298,7 @@ def get_hash_preset_from_user(
f'Enter hash preset for {data_desc},\n' +
f'or hit ENTER to accept the default value ({hash_preset!r}): ' )
from .ui import line_input
while True:
ret = line_input(prompt)
if ret:
@ -324,6 +324,7 @@ def get_new_passphrase(data_desc,hash_preset,passwd_file,pw_desc='passphrase'):
quiet = pwfile_reuse_warning(passwd_file).warning_shown ))
else:
qmsg('\n'+fmt(message,indent=' '))
from .ui import get_words_from_user
if opt.echo_passphrase:
pw = ' '.join(get_words_from_user(f'Enter {pw_desc} for {data_desc}: '))
else:
@ -352,6 +353,7 @@ def get_passphrase(data_desc,passwd_file,pw_desc='passphrase'):
desc = f'{pw_desc} for {data_desc}',
quiet = pwfile_reuse_warning(passwd_file).warning_shown ))
else:
from .ui import get_words_from_user
return ' '.join(get_words_from_user(f'Enter {pw_desc} for {data_desc}: '))
mmenc_salt_len = 32

View file

@ -29,13 +29,11 @@ from .util import (
qmsg,
dmsg,
die,
confirm_or_raise,
get_extension,
is_utf8,
capfirst,
make_full_path,
strip_comments,
keypress_confirm,
)
def check_or_create_dir(path):
@ -181,6 +179,7 @@ def write_data_to_file( outfile,data,desc='data',
if no_tty:
die(2,f'Printing {desc} to screen is not allowed')
if (ask_tty and not opt.quiet) or binary:
from .ui import confirm_or_raise
confirm_or_raise(
message = '',
action = f'output {desc} to screen' )
@ -193,6 +192,7 @@ def write_data_to_file( outfile,data,desc='data',
if no_tty:
die(2,f'Writing {desc} to pipe is not allowed')
if ask_tty and not opt.quiet:
from .ui import confirm_or_raise
confirm_or_raise(
message = '',
action = f'output {desc} to pipe' )
@ -219,12 +219,14 @@ def write_data_to_file( outfile,data,desc='data',
if ask_write:
if not ask_write_prompt:
ask_write_prompt = f'Save {desc}?'
from .ui import keypress_confirm
if not keypress_confirm(ask_write_prompt,
default_yes=ask_write_default_yes):
die(1,f'{capfirst(desc)} not saved')
hush = False
if os.path.lexists(outfile) and ask_overwrite:
from .ui import confirm_or_raise
confirm_or_raise(
message = '',
action = f'File {outfile!r} already exists\nOverwrite?' )

View file

@ -148,6 +148,7 @@ idxs = mmgen.addrlist.AddrIdxList( fmt_str=cmd_args.pop() )
from .fileutil import get_seed_file
sf = get_seed_file(cmd_args,1)
from .ui import do_license_msg
do_license_msg()
ss = Wallet(sf)
@ -170,6 +171,7 @@ af.format()
if al.gen_addrs and opt.print_checksum:
Die(0,al.checksum)
from .ui import keypress_confirm
if al.gen_keys and keypress_confirm('Encrypt key list?'):
af.encrypt()
af.write(

View file

@ -120,6 +120,7 @@ def check_opts(tw):
rescan = False
if rescan and not opt.quiet:
from .ui import keypress_confirm
if not keypress_confirm(
'\n{}\n\nContinue?'.format(addrimport_msgs['rescan']),
default_yes = True ):

View file

@ -154,6 +154,7 @@ PasswordList(
pw_fmt = pw_fmt,
chk_params_only = True )
from .ui import do_license_msg
do_license_msg()
ss = Wallet(sf)
@ -170,6 +171,7 @@ af = al.get_file()
af.format()
from .ui import keypress_confirm
if keypress_confirm('Encrypt password list?'):
af.encrypt()
af.write(binary=True,desc='encrypted password list')

View file

@ -119,10 +119,12 @@ if opt.id_str and not opt.master_share:
die(1,'--id-str option meaningless in context of non-master-share join')
from .fileutil import check_infile
from .wallet import check_wallet_extension
for arg in cmd_args:
check_wallet_extension(arg)
check_infile(arg)
from .ui import do_license_msg
do_license_msg()
qmsg('Input files:\n {}\n'.format('\n '.join(cmd_args)))

View file

@ -294,15 +294,24 @@ def process_args(cmd,cmd_args,cls):
return ( args, kwargs )
def process_result(ret,pager=False,print_result=False):
from .util import Msg,die
"""
Convert result to something suitable for output to screen and return it.
If result is bytes and not convertible to utf8, output as binary using os.write().
If 'print_result' is True, send the converted result directly to screen or
pager instead of returning it.
"""
from .util import Msg,die
def triage_result(o):
return o if not print_result else do_pager(o) if pager else Msg(o)
if print_result:
if pager:
from .ui import do_pager
do_pager(o)
else:
Msg(o)
else:
return o
if ret == True:
return True
@ -316,8 +325,7 @@ def process_result(ret,pager=False,print_result=False):
return triage_result('\n'.join([r.decode() if isinstance(r,bytes) else r for r in ret]))
elif isinstance(ret,bytes):
try:
o = ret.decode()
return o if not print_result else do_pager(o) if pager else Msg(o)
return triage_result(ret.decode())
except:
# don't add NL to binary data if it can't be converted to utf8
if print_result:

View file

@ -111,6 +111,7 @@ from .tx.sign import *
seed_files = get_seed_files(opt,cmd_args) if (cmd_args or opt.send) else None
from .ui import do_license_msg
do_license_msg()
silent = opt.yes and opt.tx_fee != None and opt.output_to_reduce != None

View file

@ -48,6 +48,7 @@ else:
opts.usage()
if not opt.status:
from .ui import do_license_msg
do_license_msg()
async def main():

View file

@ -106,6 +106,7 @@ for i in infiles:
check_infile(i)
if not opt.info and not opt.terse_info:
from .ui import do_license_msg
do_license_msg(immed=True)
from .tx.sign import *

View file

@ -176,6 +176,7 @@ if cmd_args:
sf = get_seed_file(cmd_args,nargs,invoked_as=invoked_as)
if invoked_as != 'chk':
from .ui import do_license_msg
do_license_msg()
if invoked_as == 'gen':
@ -242,6 +243,7 @@ if invoked_as == 'passchg':
return old_fn
if ss_in.infile.dirname == g.data_dir:
from .ui import confirm_or_raise
confirm_or_raise(
message = yellow('Confirmation of default wallet update'),
action = 'update the default wallet',
@ -252,18 +254,21 @@ if invoked_as == 'passchg':
else:
old_wallet = rename_old_wallet_maybe(silent=False)
ss_out.write_to_file()
from .ui import keypress_confirm
if keypress_confirm(f'Securely delete old wallet {old_wallet!r}?'):
secure_delete( old_wallet )
elif invoked_as == 'gen' and not opt.outdir and not opt.stdout:
from .filename import find_file_in_dir
if (
not find_file_in_dir( get_wallet_cls('mmgen'), g.data_dir )
and keypress_confirm(
'Make this wallet your default and move it to the data directory?',
default_yes = True ) ):
ss_out.write_to_file(outdir=g.data_dir)
else:
if find_file_in_dir( get_wallet_cls('mmgen'), g.data_dir ):
ss_out.write_to_file()
else:
from .ui import keypress_confirm
if keypress_confirm(
'Make this wallet your default and move it to the data directory?',
default_yes = True ):
ss_out.write_to_file(outdir=g.data_dir)
else:
ss_out.write_to_file()
else:
ss_out.write_to_file()

View file

@ -78,7 +78,7 @@ def print_help(po,opts_data,opt_filter):
opts_data['text']['long_options'] = d
remove_unneeded_long_opts()
from .util import do_pager
from .ui import do_pager
do_pager(
mmgen.share.Opts.make_help(
proto,

View file

@ -22,7 +22,7 @@ passwdlist.py: Password list class for the MMGen suite
from collections import namedtuple
from .util import ymsg,is_int,keypress_confirm,die
from .util import ymsg,is_int,die
from .obj import ImmutableAttr,ListItemAttr,MMGenPWIDString,TwComment
from .key import PrivKey
from .addr import MMGenPasswordType,AddrIdx,AddrListID
@ -184,6 +184,7 @@ class PasswordList(AddrList):
).format(good_pw_len) )
if pf in ('bip39','hex') and pw_bytes < seed.byte_len:
from .ui import keypress_confirm
if not keypress_confirm(
f'WARNING: requested {self.pw_info[pf].desc} length has less entropy ' +
'than underlying seed!\nIs this what you want?',

View file

@ -30,6 +30,7 @@ def create_data_dir(data_dir):
try: os.stat(os.path.join(data_dir,'regtest'))
except: pass
else:
from ...ui import keypress_confirm
if keypress_confirm(
f'Delete your existing MMGen regtest setup at {data_dir!r} and create a new one?'):
shutil.rmtree(data_dir)

View file

@ -19,7 +19,7 @@ from .base import Base,TokenBase
from ....opts import opt
from ....obj import Int,ETHNonce,MMGenTxID,Str,HexStr
from ....amt import ETHAmt
from ....util import msg,line_input,is_int,is_hex_str,make_chksum_6
from ....util import msg,is_int,is_hex_str,make_chksum_6
from ....tw.ctl import TrackingWallet
from ....addr import is_mmgen_id,is_coin_addr
from ..contract import Token
@ -88,6 +88,7 @@ class New(Base,TxBase.New):
self.process_cmd_arg(a,ad_f,ad_w)
def select_unspent(self,unspent):
from ....ui import line_input
while True:
reply = line_input('Enter an account to spend from: ').strip()
if reply:

View file

@ -302,7 +302,7 @@ def warn_trustlevel(coinsym):
Are you sure you want to continue?
"""
from .util import qmsg,fmt,keypress_confirm
from .util import qmsg,fmt
from .color import red,yellow,green
warning = fmt(m).strip().format(
@ -319,6 +319,7 @@ def warn_trustlevel(coinsym):
qmsg(warning)
return
from .ui import keypress_confirm
if not keypress_confirm(warning,default_yes=True):
import sys
sys.exit(0)

View file

@ -152,7 +152,7 @@ class tool_cmd(tool_cmd_base):
def extract_key_from_geth_wallet( self, wallet_file:str, check_addr=True ):
"decrypt the encrypted private key in a Geth keystore wallet, returning the decrypted key"
from ..util import line_input
from ..ui import line_input
from ..opts import opt
from ..proto.eth.misc import extract_key_from_geth_keystore_wallet
passwd = line_input( 'Enter passphrase: ', echo=opt.echo_passphrase ).strip().encode()

View file

@ -184,7 +184,7 @@ def gen_tool_cmd_usage(mod,cmdname):
def usage(cmdname=None,exit_val=1):
from ..util import Msg,die,do_pager
from ..util import Msg,die
if cmdname:
for mod,cmdlist in main_tool.mods.items():
@ -194,6 +194,7 @@ def usage(cmdname=None,exit_val=1):
else:
die(1,f'{cmdname!r}: no such tool command')
else:
from ..ui import do_pager
do_pager('\n'.join(gen_tool_usage()))
import sys

View file

@ -26,7 +26,7 @@ from ..globalvars import g
from ..objmethods import Hilite,InitErrors,MMGenObject
from ..obj import TwComment,get_obj,MMGenIdx,MMGenList
from ..color import nocolor,yellow,green
from ..util import msg,msg_r,fmt,die,line_input,do_pager,capfirst,make_timestr
from ..util import msg,msg_r,fmt,die,capfirst,make_timestr
from ..addr import MMGenID
# mixin class for TwUnspentOutputs,TwAddrList,TwTxHistory:
@ -313,10 +313,12 @@ class TwCommon:
parent.oneshot_msg = green(f'Data written to {outfile!r}\n\n')
async def a_view(self,parent):
from ..ui import do_pager
do_pager( await parent.format_squeezed(color=True,cached=True) )
self.post_view(parent)
async def a_view_detail(self,parent):
from ..ui import do_pager
do_pager( await parent.format_detail(color=True) )
self.post_view(parent)
@ -329,6 +331,7 @@ class TwCommon:
async def run(self,parent,action):
msg('')
from ..ui import line_input
while True:
ret = line_input(f'Enter {parent.item_desc} number (or RETURN to return to main menu): ')
if ret == '':

View file

@ -15,15 +15,7 @@ tw.json: export and import tracking wallet to JSON format
import json
from collections import namedtuple
from ..util import (
msg,
ymsg,
fmt,
die,
make_timestamp,
make_chksum_8,
keypress_confirm,
compare_or_die )
from ..util import msg,ymsg,fmt,die,make_timestamp,make_chksum_8,compare_or_die
from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..rpc import json_encoder
@ -121,6 +113,7 @@ class TwJSON:
msg('\n'+fmt(self.info_msg.strip(),indent=' '))
from ..ui import keypress_confirm
if not keypress_confirm('Continue?'):
msg('Exiting at user request')
return False

View file

@ -25,15 +25,7 @@ from collections import namedtuple
from ..globalvars import g
from ..color import red,yellow
from ..util import (
msg,
die,
capfirst,
suf,
fmt,
keypress_confirm,
line_input,
)
from ..util import msg,die,capfirst,suf,fmt
from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..obj import ImmutableAttr,ListItemAttr,MMGenListItem,TwComment,get_obj,HexStr,CoinTxID,MMGenList
@ -258,6 +250,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
class item_action(TwCommon.item_action):
async def a_balance_refresh(self,uo,idx):
from ..ui import keypress_confirm
if not keypress_confirm(
f'Refreshing tracking wallet {uo.item_desc} #{idx}. Is this what you want?'):
return 'redo'
@ -266,6 +259,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
uo.oneshot_msg = yellow(f'{uo.proto.dcoin} balance for account #{idx} refreshed\n\n')
async def a_addr_delete(self,uo,idx):
from ..ui import keypress_confirm
if not keypress_confirm(
f'Removing {uo.item_desc} #{idx} from tracking wallet. Is this what you want?'):
return 'redo'
@ -297,6 +291,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
cur_lbl = uo.data[idx-1].label
msg('Current label: {}'.format(cur_lbl.hl() if cur_lbl else '(none)'))
from ..ui import line_input
res = line_input(
"Enter label text (or ENTER to return to main menu): ",
insert_txt = cur_lbl )
@ -304,6 +299,7 @@ class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit):
if res == cur_lbl:
return None
elif res == '':
from ..ui import keypress_confirm
return (await do_lbl_add('')) if keypress_confirm(
f'Removing label for {desc}. Is this what you want?') else 'redo'
else:

View file

@ -16,7 +16,7 @@ from ..globalvars import *
from ..objmethods import MMGenObject
from ..obj import ImmutableAttr,ListItemAttr,MMGenListItem,MMGenTxLabel,TwComment,CoinTxID,HexStr
from ..addr import MMGenID,CoinAddr
from ..util import msg,ymsg,fmt,remove_dups,keypress_confirm,make_timestamp,line_input,die
from ..util import msg,ymsg,fmt,remove_dups,make_timestamp,die
from ..opts import opt
class MMGenTxIO(MMGenListItem):
@ -153,6 +153,7 @@ class Base(MMGenObject):
self.label = MMGenTxLabel(get_data_from_file(infile,'transaction comment'))
else: # get comment from user, or edit existing comment
m = ('Add a comment to transaction?','Edit transaction comment?')[bool(self.label)]
from ..ui import keypress_confirm,line_input
if keypress_confirm(m,default_yes=False):
while True:
s = MMGenTxLabel(line_input('Comment: ',insert_txt=self.label))
@ -180,5 +181,7 @@ class Base(MMGenObject):
die( 'UserOptError', f'\n{indent}ERROR: {m}\n' )
else:
msg(f'\n{indent}WARNING: {m}\n')
if not (opt.yes or keypress_confirm('Continue?',default_yes=True)):
die(1,'Exiting at user request')
if not opt.yes:
from ..ui import keypress_confirm
if not keypress_confirm('Continue?',default_yes=True):
die(1,'Exiting at user request')

View file

@ -15,7 +15,7 @@ tx.bump: transaction bump class
from .new import New
from .completed import Completed
from ..opts import opt
from ..util import line_input,is_int,keypress_confirm
from ..util import is_int
class Bump(Completed,New):
desc = 'fee-bumped transaction'
@ -60,6 +60,7 @@ class Bump(Completed,New):
while True:
if init_reply == None:
from ..ui import line_input
m = 'Choose an output to deduct the fee from (Hit ENTER for the change output): '
reply = line_input(m) or 'c'
else:
@ -77,8 +78,11 @@ class Bump(Completed,New):
cm = ' (change output)' if chg_idx == idx else ''
prompt = f'Fee will be deducted from output {idx+1}{cm} ({o_amt} {self.coin})'
if check_sufficient_funds(o_amt):
if opt.yes or keypress_confirm(prompt+'. OK?',default_yes=True):
if opt.yes:
msg(prompt)
self.bump_output_idx = idx
return idx
if opt.yes:
msg(prompt)
else:
from ..ui import keypress_confirm
if not keypress_confirm(prompt+'. OK?',default_yes=True):
continue
self.bump_output_idx = idx
return idx

View file

@ -15,7 +15,7 @@ tx.info: transaction info class
from ..globalvars import *
from ..color import red,green,orange
from ..opts import opt
from ..util import msg,msg_r,do_pager
from ..util import msg,msg_r
import importlib
@ -107,6 +107,7 @@ class TxInfo:
def view(self,pager=False,pause=True,terse=False):
o = self.format(terse=terse)
if pager:
from ..ui import do_pager
do_pager(o)
else:
msg_r(o)

View file

@ -17,7 +17,7 @@ from ..opts import opt
from .base import Base
from ..color import pink
from ..obj import get_obj,MMGenList
from ..util import msg,qmsg,fmt,die,suf,remove_dups,get_extension,keypress_confirm,do_license_msg,line_input
from ..util import msg,qmsg,fmt,die,suf,remove_dups,get_extension
from ..addr import is_mmgen_id,CoinAddr,is_coin_addr
def mmaddr2coinaddr(mmaddr,ad_w,ad_f,proto):
@ -50,6 +50,7 @@ def mmaddr2coinaddr(mmaddr,ad_w,ad_f,proto):
coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
if coin_addr:
msg(wmsg('addr_in_addrfile_only'))
from ..ui import keypress_confirm
if not (opt.yes or keypress_confirm('Continue anyway?')):
sys.exit(1)
else:
@ -108,6 +109,7 @@ class New(Base):
def get_usr_fee_interactive(self,tx_fee=None,desc='Starting'):
abs_fee = None
from ..ui import line_input
while True:
if tx_fee:
abs_fee = self.convert_and_check_fee(tx_fee,desc)
@ -122,6 +124,7 @@ class New(Base):
self.coin,
pink(str(self.fee_abs2rel(abs_fee))),
self.rel_fee_disp)
from ..ui import keypress_confirm
if opt.yes or keypress_confirm(prompt+'OK?',default_yes=True):
if opt.yes:
msg(prompt)
@ -231,6 +234,7 @@ class New(Base):
# inputs methods
def select_unspent(self,unspent):
prompt = 'Enter a range or space-separated list of outputs to spend: '
from ..ui import line_input
while True:
reply = line_input(prompt).strip()
if reply:
@ -307,6 +311,7 @@ class New(Base):
if funds_left >= 0:
p = self.final_inputs_ok_msg(funds_left)
from ..ui import keypress_confirm
if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
if opt.yes:
msg(p)
@ -331,6 +336,7 @@ class New(Base):
if not do_info:
await self.get_outputs_from_cmdline(cmd_args)
from ..ui import do_license_msg
do_license_msg()
if not opt.inputs:

View file

@ -13,8 +13,6 @@ tx.online: online signed transaction class
"""
from .signed import Signed
from ..util import msg,confirm_or_raise
from ..opts import opt
class OnlineSigned(Signed):
@ -24,6 +22,9 @@ class OnlineSigned(Signed):
return _base_proto_subclass('Status','status',self.proto)(self)
def confirm_send(self):
from ..opts import opt
from ..util import msg
from ..ui import confirm_or_raise
confirm_or_raise(
message = '' if opt.quiet else 'Once this transaction is sent, there’s no taking it back!',
action = f'broadcast this transaction to the {self.proto.coin} {self.proto.network.upper()} network',

159
mmgen/ui.py Executable file
View file

@ -0,0 +1,159 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
ui.py: Interactive user interface functions for the MMGen suite
"""
import sys,os
from .globalvars import g
from .opts import opt
from .util import msg,msg_r,Msg,dmsg,die
def confirm_or_raise(message,action,expect='YES',exit_msg='Exiting at user request'):
if message:
msg(message)
if line_input(
(f'{action} ' if action[0].isupper() else f'Are you sure you want to {action}?\n') +
f'Type uppercase {expect!r} to confirm: '
).strip() != expect:
die( 'UserNonConfirmation', exit_msg )
def get_words_from_user(prompt):
words = line_input(prompt, echo=opt.echo_passphrase).split()
dmsg('Sanitized input: [{}]'.format(' '.join(words)))
return words
def get_data_from_user(desc='data'): # user input MUST be UTF-8
data = line_input(f'Enter {desc}: ',echo=opt.echo_passphrase)
dmsg(f'User input: [{data}]')
return data
def line_input(prompt,echo=True,insert_txt=''):
"""
multi-line prompts OK
one-line prompts must begin at beginning of line
empty prompts forbidden due to interactions with readline
"""
assert prompt,'calling line_input() with an empty prompt forbidden'
def init_readline():
try:
import readline
except ImportError:
return False
else:
if insert_txt:
readline.set_startup_hook(lambda: readline.insert_text(insert_txt))
return True
else:
return False
if not sys.stdout.isatty():
msg_r(prompt)
prompt = ''
from .term import kb_hold_protect
kb_hold_protect()
if g.test_suite_popen_spawn:
msg(prompt)
sys.stderr.flush()
reply = os.read(0,4096).decode().rstrip('\n') # strip NL to mimic behavior of input()
elif echo or not sys.stdin.isatty():
clear_buffer = init_readline() if sys.stdin.isatty() else False
reply = input(prompt)
if clear_buffer:
import readline
readline.set_startup_hook(lambda: readline.insert_text(''))
else:
from getpass import getpass
if g.platform == 'win':
# MSWin hack - getpass('foo') doesn't flush stderr
msg_r(prompt.strip()) # getpass('') adds a space
sys.stderr.flush()
reply = getpass('')
else:
reply = getpass(prompt)
kb_hold_protect()
return reply.strip()
def keypress_confirm(prompt,default_yes=False,verbose=False,no_nl=False,complete_prompt=False):
if not complete_prompt:
prompt = '{} {}: '.format( prompt, '(Y/n)' if default_yes else '(y/N)' )
nl = f'\r{" "*len(prompt)}\r' if no_nl else '\n'
if g.accept_defaults:
msg(prompt)
return default_yes
from .term import get_char
while True:
reply = get_char(prompt,immed_chars='yYnN').strip('\n\r')
if not reply:
msg_r(nl)
return True if default_yes else False
elif reply in 'yYnN':
msg_r(nl)
return True if reply in 'yY' else False
else:
msg_r('\nInvalid reply\n' if verbose else '\r')
def do_pager(text):
pagers = ['less','more']
end_msg = '\n(end of text)\n\n'
# --- Non-MSYS Windows code deleted ---
# raw, chop, horiz scroll 8 chars, disable buggy line chopping in MSYS
os.environ['LESS'] = (('--shift 8 -RS'),('-cR -#1'))[g.platform=='win']
if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]:
pagers = [os.environ['PAGER']] + pagers
from subprocess import run
from .color import set_vt100
for pager in pagers:
try:
m = text + ('' if pager == 'less' else end_msg)
p = run([pager],input=m.encode(),check=True)
msg_r('\r')
except:
pass
else:
break
else:
Msg(text+end_msg)
set_vt100()
def do_license_msg(immed=False):
if opt.quiet or g.no_license or opt.yes or not g.stdin_tty:
return
import mmgen.contrib.license as gpl
msg(gpl.warning)
from .term import get_char
prompt = "Press 'w' for conditions and warranty info, or 'c' to continue: "
while True:
reply = get_char(prompt, immed_chars=('','wc')[bool(immed)])
if reply == 'w':
do_pager(gpl.conditions)
elif reply == 'c':
msg('')
break
else:
msg_r('\r')
msg('')

View file

@ -17,7 +17,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
util.py: Frequently-used variables, classes and functions for the MMGen suite
util.py: Frequently-used variables, classes and utility functions for the MMGen suite
"""
import sys,os,time,re
@ -188,10 +188,6 @@ def remove_dups(iterable,edesc='element',desc='list',quiet=False,hide=False):
ret.append(e)
return ret if type(iterable).__name__ == 'generator' else type(iterable)(ret)
def exit_if_mswin(feature):
if g.platform == 'win':
die(2, capfirst(feature) + ' not supported on the MSWin / MSYS2 platform' )
def suf(arg,suf_type='s',verb='none'):
suf_types = {
'none': {
@ -334,32 +330,9 @@ def compare_or_die(val1, desc1, val2, desc2, e='Error'):
dmsg(f'{capfirst(desc2)} OK ({val2})')
return True
def check_wallet_extension(fn):
from .wallet import get_wallet_data
get_wallet_data( ext=get_extension(fn), die_on_fail=True ) # raises exception on failure
def make_full_path(outdir,outfile):
return os.path.normpath(os.path.join(outdir, os.path.basename(outfile)))
def confirm_or_raise(message,action,expect='YES',exit_msg='Exiting at user request'):
if message:
msg(message)
if line_input(
(f'{action} ' if action[0].isupper() else f'Are you sure you want to {action}?\n') +
f'Type uppercase {expect!r} to confirm: '
).strip() != expect:
die( 'UserNonConfirmation', exit_msg )
def get_words_from_user(prompt):
words = line_input(prompt, echo=opt.echo_passphrase).split()
dmsg('Sanitized input: [{}]'.format(' '.join(words)))
return words
def get_data_from_user(desc='data'): # user input MUST be UTF-8
data = line_input(f'Enter {desc}: ',echo=opt.echo_passphrase)
dmsg(f'User input: [{data}]')
return data
class oneshot_warning:
color = 'nocolor'
@ -395,129 +368,12 @@ class oneshot_warning_group(oneshot_warning):
def __init__(self,wcls,div=None,fmt_args=[],reverse=False):
self.do(getattr(self,wcls),div,fmt_args,reverse)
def line_input(prompt,echo=True,insert_txt=''):
"""
multi-line prompts OK
one-line prompts must begin at beginning of line
empty prompts forbidden due to interactions with readline
"""
assert prompt,'calling line_input() with an empty prompt forbidden'
def init_readline():
try:
import readline
except ImportError:
return False
else:
if insert_txt:
readline.set_startup_hook(lambda: readline.insert_text(insert_txt))
return True
else:
return False
if not sys.stdout.isatty():
msg_r(prompt)
prompt = ''
from .term import kb_hold_protect
kb_hold_protect()
if g.test_suite_popen_spawn:
msg(prompt)
sys.stderr.flush()
reply = os.read(0,4096).decode().rstrip('\n') # strip NL to mimic behavior of input()
elif echo or not sys.stdin.isatty():
clear_buffer = init_readline() if sys.stdin.isatty() else False
reply = input(prompt)
if clear_buffer:
import readline
readline.set_startup_hook(lambda: readline.insert_text(''))
else:
from getpass import getpass
if g.platform == 'win':
# MSWin hack - getpass('foo') doesn't flush stderr
msg_r(prompt.strip()) # getpass('') adds a space
sys.stderr.flush()
reply = getpass('')
else:
reply = getpass(prompt)
kb_hold_protect()
return reply.strip()
def keypress_confirm(prompt,default_yes=False,verbose=False,no_nl=False,complete_prompt=False):
if not complete_prompt:
prompt = '{} {}: '.format( prompt, '(Y/n)' if default_yes else '(y/N)' )
nl = f'\r{" "*len(prompt)}\r' if no_nl else '\n'
if g.accept_defaults:
msg(prompt)
return default_yes
from .term import get_char
while True:
reply = get_char(prompt,immed_chars='yYnN').strip('\n\r')
if not reply:
msg_r(nl)
return True if default_yes else False
elif reply in 'yYnN':
msg_r(nl)
return True if reply in 'yY' else False
else:
msg_r('\nInvalid reply\n' if verbose else '\r')
def stdout_or_pager(s):
(do_pager if opt.pager else Msg_r)(s)
def do_pager(text):
pagers = ['less','more']
end_msg = '\n(end of text)\n\n'
# --- Non-MSYS Windows code deleted ---
# raw, chop, horiz scroll 8 chars, disable buggy line chopping in MSYS
os.environ['LESS'] = (('--shift 8 -RS'),('-cR -#1'))[g.platform=='win']
if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]:
pagers = [os.environ['PAGER']] + pagers
from subprocess import run
from .color import set_vt100
for pager in pagers:
try:
m = text + ('' if pager == 'less' else end_msg)
p = run([pager],input=m.encode(),check=True)
msg_r('\r')
except:
pass
else:
break
if opt.pager:
from .ui import do_pager
do_pager(s)
else:
Msg(text+end_msg)
set_vt100()
def do_license_msg(immed=False):
if opt.quiet or g.no_license or opt.yes or not g.stdin_tty:
return
import mmgen.contrib.license as gpl
msg(gpl.warning)
from .term import get_char
prompt = "Press 'w' for conditions and warranty info, or 'c' to continue: "
while True:
reply = get_char(prompt, immed_chars=('','wc')[bool(immed)])
if reply == 'w':
do_pager(gpl.conditions)
elif reply == 'c':
msg('')
break
else:
msg_r('\r')
msg('')
Msg_r(s)
def get_subclasses(cls,names=False):
def gen(cls):
@ -558,3 +414,7 @@ def wrap_ripemd160(called=[]):
hashlib_new = hashlib.new
hashlib.new = hashlib_new_wrapper
called.append(True)
def exit_if_mswin(feature):
if g.platform == 'win':
die(2, capfirst(feature) + ' not supported on the MSWin / MSYS2 platform' )

View file

@ -153,3 +153,6 @@ def Wallet(
passwd_file = passwd_file )
return me
def check_wallet_extension(fn):
get_wallet_data( ext=get_extension(fn), die_on_fail=True ) # raises exception on failure

View file

@ -16,7 +16,7 @@ import os
from ..globalvars import g
from ..opts import opt
from ..util import msg,qmsg,die,get_data_from_user
from ..util import msg,qmsg,die
from ..objmethods import MMGenObject
from . import Wallet,wallet_data,get_wallet_cls
@ -82,6 +82,7 @@ class wallet(MMGenObject,metaclass=WalletMeta):
self.fmt_data = self._get_data_from_user(self.desc)
def _get_data_from_user(self,desc):
from ..ui import get_data_from_user
return get_data_from_user(desc)
def _deformat_once(self):

View file

@ -15,7 +15,7 @@ wallet.dieroll: dieroll wallet class
import time
from ..globalvars import g
from ..opts import opt
from ..util import msg,msg_r,die,fmt,remove_whitespace,keypress_confirm
from ..util import msg,msg_r,die,fmt,remove_whitespace
from ..util2 import block_format
from ..seed import Seed
from ..baseconv import baseconv
@ -54,6 +54,7 @@ class wallet(wallet):
seed_bytes = bc.tobytes( d, pad='seed' )[-seed_len:]
if self.interactive_input and opt.usr_randchars:
from ..ui import keypress_confirm
if keypress_confirm(self.user_entropy_prompt):
from ..crypto import add_user_random
seed_bytes = add_user_random(
@ -70,6 +71,7 @@ class wallet(wallet):
def _get_data_from_user(self,desc):
if not g.stdin_tty:
from ..ui import get_data_from_user
return get_data_from_user(desc)
bc = baseconv('b6d')

View file

@ -15,7 +15,7 @@ wallet.incog_base: incognito wallet base class
from ..globalvars import g
from ..opts import opt
from ..seed import Seed
from ..util import msg,vmsg,qmsg,make_chksum_8,keypress_confirm
from ..util import msg,vmsg,qmsg,make_chksum_8
from .enc import wallet
import mmgen.crypto as crypto
@ -146,6 +146,7 @@ class wallet(wallet):
def _verify_seed_oldfmt(self,seed):
m = f'Seed ID: {make_chksum_8(seed)}. Is the Seed ID correct?'
from ..ui import keypress_confirm
if keypress_confirm(m, True):
return seed
else:

View file

@ -17,17 +17,7 @@ import os
from ..globalvars import g
from ..opts import opt
from ..seed import Seed
from ..util import (
msg,
dmsg,
qmsg,
die,
compare_or_die,
keypress_confirm,
line_input,
capfirst,
confirm_or_raise
)
from ..util import msg,dmsg,qmsg,die,compare_or_die,capfirst
from ..util2 import parse_bytespec
from .incog_base import wallet
@ -108,6 +98,7 @@ class wallet(wallet):
try:
os.stat(fn)
except:
from ..ui import keypress_confirm,line_input
if keypress_confirm(
f'Requested file {fn!r} does not exist. Create?',
default_yes = True ):
@ -136,6 +127,7 @@ class wallet(wallet):
if check_offset:
self._check_valid_offset(f,'write')
if not opt.quiet:
from ..ui import confirm_or_raise
confirm_or_raise(
message = '',
action = f'alter file {f.name!r}' )

View file

@ -17,7 +17,7 @@ import os
from ..globalvars import g
from ..opts import opt
from ..seed import Seed
from ..util import msg,qmsg,line_input,make_timestamp,make_chksum_6,split_into_cols,is_chksum_6,compare_chksums
from ..util import msg,qmsg,make_timestamp,make_chksum_6,split_into_cols,is_chksum_6,compare_chksums
from ..obj import MMGenWalletLabel,get_obj
from ..baseconv import baseconv
@ -43,6 +43,7 @@ class wallet(wallet):
prompt = 'Enter a wallet label, or hit ENTER {}: '.format(
'to reuse the label {}'.format(old_lbl.hl(encl="''")) if old_lbl else
'for no label' )
from ..ui import line_input
while True:
ret = line_input(prompt)
if ret:

View file

@ -14,7 +14,7 @@ wallet.mnemonic: MMGen mnemonic wallet base class
from ..globalvars import g
from ..baseconv import baseconv
from ..util import msg,compare_or_die,get_data_from_user
from ..util import msg,compare_or_die
from ..seed import Seed
from .unenc import wallet
@ -32,6 +32,7 @@ class wallet(wallet):
def _get_data_from_user(self,desc):
if not g.stdin_tty:
from ..ui import get_data_from_user
return get_data_from_user(desc)
from ..mn_entry import mn_entry # import here to catch cfg var errors

View file

@ -14,7 +14,7 @@ wallet.unenc: unencrypted wallet base class
from ..globalvars import g
from ..color import blue,yellow
from ..util import msg,msg_r,capfirst,is_int,keypress_confirm
from ..util import msg,msg_r,capfirst,is_int
from .base import wallet
class wallet(wallet):
@ -53,5 +53,6 @@ class wallet(wallet):
while True:
usr_len = choose_len()
prompt = self.choose_seedlen_confirm.format(usr_len)
from ..ui import keypress_confirm
if keypress_confirm(prompt,default_yes=True,no_nl=not g.test_suite):
return usr_len

View file

@ -33,6 +33,7 @@ from .addrlist import KeyAddrList,AddrIdxList
from .rpc import json_encoder
from .proto.xmr.rpc import MoneroRPCClientRaw,MoneroWalletRPCClient
from .proto.xmr.daemon import MoneroWalletDaemon
from .ui import keypress_confirm
xmrwallet_uarg_info = (
lambda e,hp: {

View file

@ -83,6 +83,7 @@ if opt.list_paths:
if not opt.no_prompt:
m = 'Deleting the following paths and files:\n {}\nProceed?'
from mmgen.ui import keypress_confirm
if not keypress_confirm(m.format('\n '.join(del_list))):
die(1,'Exiting at user request')

View file

@ -12,5 +12,6 @@ cmd_args = opts.init({'text': { 'desc': '', 'usage':'', 'options':'-e, --echo-pa
p = ('Enter passphrase: ','Enter passphrase (echoed): ')[bool(opt.echo_passphrase)]
from mmgen.ui import get_words_from_user
pw = get_words_from_user(p)
msg('Entered: {}'.format(' '.join(pw)))

View file

@ -22,6 +22,7 @@ opts_data = {
cmd_args = opts.init(opts_data)
from mmgen.term import get_char,get_char_raw,get_terminal_size
from mmgen.ui import line_input,keypress_confirm,do_license_msg
import mmgen.term as term_mod
def cmsg(m):

View file

@ -261,6 +261,7 @@ def list_cmds():
for cmd in sorted(utils):
yield ' {:{w}} - {}'.format( cmd, utils[cmd], w=w )
from mmgen.ui import do_pager
do_pager('\n'.join(gen_output()))
sys.exit(0)
@ -479,6 +480,7 @@ class CmdGroupMgr(object):
for k,v in cls.cmd_subgroups.items():
yield ' + {} · {}'.format( cyan(k.ljust(max_w+1)), v[0] )
from mmgen.ui import do_pager
do_pager('\n'.join(gen_output()))
Msg( '\n' + ' '.join(e[0] for e in ginfo) )

View file

@ -78,6 +78,7 @@ def skip(name,reason=None):
return 'skip'
def confirm_continue():
from mmgen.ui import keypress_confirm
if keypress_confirm(blue('Continue? (Y/n): '),default_yes=True,complete_prompt=True):
if opt.verbose or opt.exact_output: sys.stderr.write('\n')
else:

View file

@ -169,6 +169,7 @@ class TestSuiteXMRWallet(TestSuiteBase):
{' '.join(a+b2)}
""",indent=' ',strip_char='\t'))
from mmgen.ui import keypress_confirm
if keypress_confirm('Continue?'):
start_proxy()
else: