util.py: move file utilities to fileutil.py

This commit is contained in:
The MMGen Project 2022-01-21 11:23:46 +00:00
commit 4819c97f8a
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
29 changed files with 363 additions and 288 deletions

View file

@ -26,8 +26,6 @@ from .util import (
qmsg_r,
die,
capfirst,
get_lines_from_file,
write_data_to_file,
keypress_confirm,
)
from .protocol import init_proto
@ -74,6 +72,7 @@ class AddrFile(MMGenObject):
def write(self,fn=None,ask_tty=True,ask_write_default_yes=False,binary=False,desc=None):
from .opts import opt
from .fileutil import write_data_to_file
write_data_to_file(
fn or self.filename,
self.fmt_data,
@ -224,6 +223,7 @@ class AddrFile(MMGenObject):
return ( proto, proto.addr_type(mmtype_key) )
p = self.parent
from .fileutil import get_lines_from_file
lines = get_lines_from_file(fn,p.desc+' data',trim_comments=True)
try:

View file

@ -232,6 +232,7 @@ def get_hash_preset_from_user(hp=g.dfl_hash_preset,desc='data'):
def get_new_passphrase(desc,passchg=False):
pw_desc = f"{'new ' if passchg else ''}passphrase for {desc}"
if opt.passwd_file:
from .fileutil import get_words_from_file
pw = ' '.join(get_words_from_file(opt.passwd_file,pw_desc))
elif opt.echo_passphrase:
pw = ' '.join(get_words_from_user(f'Enter {pw_desc}: '))
@ -254,6 +255,7 @@ def get_new_passphrase(desc,passchg=False):
def get_passphrase(desc,passchg=False):
pw_desc = f"{'old ' if passchg else ''}passphrase for {desc}"
if opt.passwd_file:
from .fileutil import get_words_from_file
pwfile_reuse_warning(opt.passwd_file)
return ' '.join(get_words_from_file(opt.passwd_file,pw_desc))
else:

View file

@ -1 +1 @@
13.1.dev006
13.1.dev007

313
mmgen/fileutil.py Executable file
View file

@ -0,0 +1,313 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
fileutil.py: Routines that read, write, execute or stat files
"""
import sys,os
from .globalvars import g
from .exception import FileNotFound,MaxInputSizeExceeded
from .util import (
msg,
qmsg,
dmsg,
die,
confirm_or_raise,
get_extension,
is_utf8,
capfirst,
make_full_path,
strip_comments,
keypress_confirm,
)
def check_or_create_dir(path):
try:
os.listdir(path)
except:
if os.getenv('MMGEN_TEST_SUITE'):
from subprocess import run
try: # exception handling required for MSWin/MSYS2
run(['/bin/rm','-rf',path])
except:
pass
try:
os.makedirs(path,0o700)
except:
die(2,f'ERROR: unable to read or create path {path!r}')
def check_binary(args):
from subprocess import run,DEVNULL
try:
run(args,stdout=DEVNULL,stderr=DEVNULL,check=True)
except:
rdie(2,f'{args[0]!r} binary missing, not in path, or not executable')
def shred_file(fn,verbose=False):
check_binary(['shred','--version'])
from subprocess import run
run(
['shred','--force','--iterations=30','--zero','--remove=wipesync']
+ (['--verbose'] if verbose else [])
+ [fn],
check=True )
def _check_file_type_and_access(fname,ftype,blkdev_ok=False):
import stat
access,op_desc = (
(os.W_OK,'writ') if ftype in ('output file','output directory') else
(os.R_OK,'read') )
if ftype == 'output directory':
ok_types = [(stat.S_ISDIR, 'output directory')]
else:
ok_types = [
(stat.S_ISREG,'regular file'),
(stat.S_ISLNK,'symbolic link')
]
if blkdev_ok:
ok_types.append((stat.S_ISBLK,'block device'))
try:
mode = os.stat(fname).st_mode
except:
raise FileNotFound(f'Requested {ftype} {fname!r} not found')
for t in ok_types:
if t[0](mode):
break
else:
ok_list = ' or '.join( t[1] for t in ok_types )
die(1,f'Requested {ftype} {fname!r} is not a {ok_list}')
if not os.access(fname,access):
die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you')
return True
def check_infile(f,blkdev_ok=False):
return _check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok)
def check_outfile(f,blkdev_ok=False):
return _check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok)
def check_outdir(f):
return _check_file_type_and_access(f,'output directory')
def get_seed_file(cmd_args,nargs,invoked_as=None):
from .opts import opt
from .filename import find_file_in_dir
from .wallet import MMGenWallet
wf = find_file_in_dir(MMGenWallet,g.data_dir)
wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
import mmgen.opts as opts
if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs:
if not wf:
msg('No default wallet found, and no other seed source was specified')
opts.usage()
elif len(cmd_args) > nargs:
opts.usage()
elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
qmsg('Warning: overriding default wallet with user-supplied wallet')
if cmd_args or wf:
check_infile(cmd_args[0] if cmd_args else wf)
return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt]
def _open_or_die(filename,mode,silent=False):
try:
return open(filename,mode)
except:
die(2,'' if silent else
'Unable to open file {!r} for {}'.format(
({0:'STDIN',1:'STDOUT',2:'STDERR'}[filename] if type(filename) == int else filename),
('reading' if 'r' in mode else 'writing')
))
def write_data_to_file( outfile,data,desc='data',
ask_write=False,
ask_write_prompt='',
ask_write_default_yes=True,
ask_overwrite=True,
ask_tty=True,
no_tty=False,
quiet=False,
binary=False,
ignore_opt_outdir=False,
check_data=False,
cmp_data=None):
from .opts import opt
if quiet:
ask_tty = ask_overwrite = False
if opt.quiet:
ask_overwrite = False
if ask_write_default_yes == False or ask_write_prompt:
ask_write = True
def do_stdout():
qmsg('Output to STDOUT requested')
if g.stdin_tty:
if no_tty:
die(2,f'Printing {desc} to screen is not allowed')
if (ask_tty and not opt.quiet) or binary:
confirm_or_raise('',f'output {desc} to screen')
else:
try: of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux
except: of = None # Windows
if of:
if of[:5] == 'pipe:':
if no_tty:
die(2,f'Writing {desc} to pipe is not allowed')
if ask_tty and not opt.quiet:
confirm_or_raise('',f'output {desc} to pipe')
msg('')
of2,pd = os.path.relpath(of),os.path.pardir
msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2))
else:
msg('Redirecting output to file')
if binary and g.platform == 'win':
import msvcrt
msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
# MSWin workaround. See msg_r()
try:
sys.stdout.write(data.decode() if isinstance(data,bytes) else data)
except:
os.write(1,data if isinstance(data,bytes) else data.encode())
def do_file(outfile,ask_write_prompt):
if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile):
outfile = make_full_path(opt.outdir,outfile)
if ask_write:
if not ask_write_prompt:
ask_write_prompt = f'Save {desc}?'
if not keypress_confirm(ask_write_prompt,
default_yes=ask_write_default_yes):
die(1,f'{capfirst(desc)} not saved')
hush = False
if os.path.lexists(outfile) and ask_overwrite:
confirm_or_raise('',f'File {outfile!r} already exists\nOverwrite?')
msg(f'Overwriting file {outfile!r}')
hush = True
# not atomic, but better than nothing
# if cmp_data is empty, file can be either empty or non-existent
if check_data:
try:
with open(outfile,('r','rb')[bool(binary)]) as fp:
d = fp.read()
except:
d = ''
finally:
if d != cmp_data:
if g.test_suite:
print_diff(cmp_data,d)
die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write')
# To maintain portability, always open files in binary mode
# If 'binary' option not set, encode/decode data before writing and after reading
try:
with _open_or_die(outfile,'wb') as fp:
fp.write(data if binary else data.encode())
except:
die(2,f'Failed to write {desc} to file {outfile!r}')
if not (hush or quiet):
msg(f'{capfirst(desc)} written to file {outfile!r}')
return True
if opt.stdout or outfile in ('','-'):
do_stdout()
elif sys.stdin.isatty() and not sys.stdout.isatty():
do_stdout()
else:
do_file(outfile,ask_write_prompt)
def get_words_from_file(infile,desc,quiet=False):
if not quiet:
qmsg(f'Getting {desc} from file {infile!r}')
with _open_or_die(infile, 'rb') as fp:
data = fp.read()
try:
words = data.decode().split()
except:
die(1,f'{capfirst(desc)} data must be UTF-8 encoded.')
dmsg('Sanitized input: [{}]'.format(' '.join(words)))
return words
def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False):
from .opts import opt
if not opt.quiet and not silent and not quiet and desc:
qmsg(f'Getting {desc} from file {infile!r}')
with _open_or_die(
(0 if dash and infile == '-' else infile),
'rb',
silent=silent) as fp:
data = fp.read(g.max_input_size+1)
if not binary:
data = data.decode()
if len(data) == g.max_input_size + 1:
raise MaxInputSizeExceeded(f'Too much input data! Max input data size: {f.max_input_size} bytes')
return data
def _mmgen_decrypt_file_maybe(fn,desc='',quiet=False,silent=False):
d = get_data_from_file(fn,desc,binary=True,quiet=quiet,silent=silent)
from .crypto import mmenc_ext
have_enc_ext = get_extension(fn) == mmenc_ext
if have_enc_ext or not is_utf8(d):
m = ('Attempting to decrypt','Decrypting')[have_enc_ext]
qmsg(f'{m} {desc} {fn!r}')
from .crypto import mmgen_decrypt_retry
d = mmgen_decrypt_retry(d,desc)
return d
def get_lines_from_file(fn,desc='',trim_comments=False,quiet=False,silent=False):
dec = _mmgen_decrypt_file_maybe(fn,desc,quiet=quiet,silent=silent)
ret = dec.decode().splitlines()
if trim_comments:
ret = strip_comments(ret)
dmsg(f'Got {len(ret)} lines from file {fn!r}')
return ret

View file

@ -145,6 +145,7 @@ if opt.keygen_backend:
idxs = AddrIdxList(fmt_str=cmd_args.pop())
from .fileutil import get_seed_file
sf = get_seed_file(cmd_args,1)
do_license_msg()

View file

@ -83,6 +83,7 @@ def parse_cmd_args(rpc,cmd_args):
if len(cmd_args) == 1:
infile = cmd_args[0]
from .fileutil import check_infile,get_lines_from_file
check_infile(infile)
if opt.addrlist:
al = AddrList(

View file

@ -317,6 +317,7 @@ def wipe_existing_key():
try: os.stat(fn)
except: pass
else:
from .fileutil import shred_file
msg(f'\nShredding existing key {fn!r}')
shred_file( fn, verbose=opt.verbose )

View file

@ -139,6 +139,7 @@ pw_idxs = AddrIdxList(fmt_str=cmd_args.pop())
pw_id_str = cmd_args.pop()
from .fileutil import get_seed_file
sf = get_seed_file(cmd_args,1)
pw_fmt = opt.passwd_fmt or PasswordList.dfl_pw_fmt

View file

@ -118,6 +118,7 @@ if opt.master_share:
if opt.id_str and not opt.master_share:
die(1,'--id-str option meaningless in context of non-master-share join')
from .fileutil import check_infile
for arg in cmd_args:
check_wallet_extension(arg)
check_infile(arg)

View file

@ -102,6 +102,8 @@ FMT CODES:
cmd_args = opts.init(opts_data)
tx_file = cmd_args.pop(0)
from .fileutil import check_infile
check_infile(tx_file)
from .tx import *

View file

@ -42,6 +42,7 @@ cmd_args = opts.init(opts_data)
if len(cmd_args) == 1:
infile = cmd_args[0]
from .fileutil import check_infile
check_infile(infile)
else:
opts.usage()

View file

@ -100,6 +100,7 @@ infiles = opts.init(opts_data)
if not infiles:
opts.usage()
from .fileutil import check_infile
for i in infiles:
check_infile(i)

View file

@ -169,6 +169,8 @@ elif invoked_as == 'seedsplit':
else:
opts.usage()
from .fileutil import check_infile,get_seed_file
if cmd_args:
if invoked_as == 'gen' or len(cmd_args) > 1:
opts.usage()
@ -227,6 +229,7 @@ if invoked_as == 'passchg' and ss_in.infile.dirname == g.data_dir:
confirm_or_raise(m1,m2,exit_msg='Password not changed')
ss_out.write_to_file(desc='New wallet',outdir=g.data_dir)
bmsg('Securely deleting old wallet')
from .fileutil import shred_file
shred_file(
ss_in.infile.name,
verbose = opt.verbose )

View file

@ -24,6 +24,7 @@ import sys,os,stat
from .exception import UserOptError
from .globalvars import g
from .base_obj import Lockable
import mmgen.share.Opts
class UserOpts(Lockable):
@ -307,7 +308,7 @@ def init(opts_data=None,add_opts=None,init_opts=None,opt_filter=None,parse_only=
else:
g.data_dir_root = os.path.join(g.home_dir,'.'+g.proj_name.lower())
from .util import check_or_create_dir
from .fileutil import check_or_create_dir
check_or_create_dir(g.data_dir_root)
from .term import init_term
@ -516,7 +517,7 @@ def check_usr_opts(usr_opts): # Raises an exception if any check fails
fn,offset = a
opt_is_int(offset,desc)
from .util import check_infile,check_outdir,check_outfile
from .fileutil import check_infile,check_outdir,check_outfile
if key == 'hidden_incog_input_params':
check_infile(fn,blkdev_ok=True)
key2 = 'in_fmt'
@ -624,10 +625,10 @@ def check_usr_opts(usr_opts): # Raises an exception if any check fails
desc = f'parameter for {fmt_opt(key)!r} option'
if key in g.infile_opts:
from .util import check_infile
from .fileutil import check_infile
check_infile(val) # file exists and is readable - dies on error
elif key == 'outdir':
from .util import check_outdir
from .fileutil import check_outdir
check_outdir(val) # dies on error
elif 'chk_'+key in cfuncs:
cfuncs['chk_'+key](key,val,desc)

View file

@ -23,6 +23,7 @@ rpc.py: Cryptocoin RPC library for the MMGen suite
import base64,json,asyncio
from decimal import Decimal
from .common import *
from .fileutil import get_lines_from_file
from .objmethods import Hilite,InitErrors
from .base_obj import AsyncInit

View file

@ -32,6 +32,7 @@ from .passwdlist import PasswordList
from .baseconv import baseconv
from .xmrseed import xmrseed
from .bip39 import bip39
from .fileutil import get_seed_file,get_data_from_file,write_data_to_file
NL = ('\n','\r\n')[g.platform=='win']

View file

@ -22,15 +22,7 @@ twctl: Tracking wallet control class for the MMGen suite
from .globalvars import g
from .exception import WalletFileError
from .util import (
msg,
dmsg,
check_or_create_dir,
write_data_to_file,
get_data_from_file,
write_mode,
altcoin_subclass
)
from .util import msg,dmsg,write_mode,altcoin_subclass
from .base_obj import AsyncInit
from .objmethods import MMGenObject
from .obj import TwComment,get_obj
@ -93,6 +85,7 @@ class TrackingWallet(MMGenObject,metaclass=AsyncInit):
))
self.tw_fn = os.path.join(tw_dir,'tracking-wallet.json')
from .fileutil import check_or_create_dir,get_data_from_file
check_or_create_dir(tw_dir)
try:
@ -210,6 +203,7 @@ class TrackingWallet(MMGenObject,metaclass=AsyncInit):
@write_mode
def write_changed(self,data):
from .fileutil import write_data_to_file
write_data_to_file(
self.tw_fn,
data,

View file

@ -35,7 +35,6 @@ from .util import (
fmt,
make_timestr,
keypress_confirm,
write_data_to_file,
line_input,
do_pager,
altcoin_subclass
@ -507,6 +506,7 @@ Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, add [l]abel:
self.proto.dcoin,
','.join(self.sort_info(include_group=False)).lower() )
msg('')
from .fileutil import write_data_to_file
try:
write_data_to_file(
of,

View file

@ -488,6 +488,7 @@ class MMGenTX:
# returns true if comment added or changed
def add_comment(self,infile=None):
if infile:
from .fileutil import get_data_from_file
self.label = MMGenTxLabel(get_data_from_file(infile,'transaction comment'))
else: # get comment from user, or edit existing comment
m = ('Add a comment to transaction?','Edit transaction comment?')[bool(self.label)]
@ -739,6 +740,7 @@ class MMGenTX:
)
ad_f = AddrData(self.proto)
from .fileutil import check_infile
for a in addrfiles:
check_infile(a)
ad_f.add(AddrList(self.proto,a))

View file

@ -57,6 +57,7 @@ class MMGenTxFile:
)[desc=='inputs']
return io_list(tx,[io(tx.proto,**e) for e in d])
from .fileutil import get_data_from_file
tx_data = get_data_from_file(infile,tx.desc+' data',quiet=quiet_open)
try:
@ -206,6 +207,7 @@ class MMGenTxFile:
if not self.fmt_data:
self.fmt_data = self.format()
from .fileutil import write_data_to_file
write_data_to_file(
outfile = self.filename,
data = self.fmt_data,

View file

@ -134,6 +134,7 @@ def get_keyaddrlist(proto,opt):
def get_keylist(proto,opt):
if opt.keys_from_file:
from .fileutil import get_lines_from_file
return get_lines_from_file(opt.keys_from_file,'key-address data',trim_comments=True)
return None

View file

@ -20,13 +20,13 @@
util.py: Low-level routines imported by other modules in the MMGen suite
"""
import sys,os,time,stat,re
from subprocess import run,PIPE,DEVNULL
import sys,os,time,re
from hashlib import sha256
from string import hexdigits,digits
from .color import *
from .exception import *
from .globalvars import *
from .exception import BadFileExtension,UserNonConfirmation
from .globalvars import g
CUR_HIDE = '\033[?25l'
CUR_SHOW = '\033[?25h'
@ -239,20 +239,6 @@ def parse_bytespec(nbytes):
die(1,f'{nbytes!r}: invalid byte specifier')
def check_or_create_dir(path):
try:
os.listdir(path)
except:
if os.getenv('MMGEN_TEST_SUITE'):
try: # exception handling required for MSWin/MSYS2
run(['/bin/rm','-rf',path])
except:
pass
try:
os.makedirs(path,0o700)
except:
die(2,f'ERROR: unable to read or create path {path!r}')
from .opts import opt
def qmsg(s,alt=None):
@ -460,69 +446,6 @@ def compare_or_die(val1, desc1, val2, desc2, e='Error'):
dmsg(f'{capfirst(desc2)} OK ({val2})')
return True
def check_binary(args):
try:
run(args,stdout=DEVNULL,stderr=DEVNULL,check=True)
except:
rdie(2,f'{args[0]!r} binary missing, not in path, or not executable')
def shred_file(fn,verbose=False):
check_binary(['shred','--version'])
run(
['shred','--force','--iterations=30','--zero','--remove=wipesync']
+ (['--verbose'] if verbose else [])
+ [fn],
check=True )
def open_or_die(filename,mode,silent=False):
try:
return open(filename,mode)
except:
die(2,'' if silent else
'Unable to open file {!r} for {}'.format(
({0:'STDIN',1:'STDOUT',2:'STDERR'}[filename] if type(filename) == int else filename),
('reading' if 'r' in mode else 'writing')
))
def check_file_type_and_access(fname,ftype,blkdev_ok=False):
access,op_desc = (
(os.W_OK,'writ') if ftype in ('output file','output directory') else
(os.R_OK,'read') )
if ftype == 'output directory':
ok_types = [(stat.S_ISDIR, 'output directory')]
else:
ok_types = [
(stat.S_ISREG,'regular file'),
(stat.S_ISLNK,'symbolic link')
]
if blkdev_ok:
ok_types.append((stat.S_ISBLK,'block device'))
try:
mode = os.stat(fname).st_mode
except:
raise FileNotFound(f'Requested {ftype} {fname!r} not found')
for t in ok_types:
if t[0](mode):
break
else:
ok_list = ' or '.join( t[1] for t in ok_types )
die(1,f'Requested {ftype} {fname!r} is not a {ok_list}')
if not os.access(fname,access):
die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you')
return True
def check_infile(f,blkdev_ok=False):
return check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok)
def check_outfile(f,blkdev_ok=False):
return check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok)
def check_outdir(f):
return check_file_type_and_access(f,'output directory')
def check_wallet_extension(fn):
from .wallet import Wallet
if not Wallet.ext_to_type(get_extension(fn)):
@ -531,29 +454,6 @@ def check_wallet_extension(fn):
def make_full_path(outdir,outfile):
return os.path.normpath(os.path.join(outdir, os.path.basename(outfile)))
def get_seed_file(cmd_args,nargs,invoked_as=None):
from .filename import find_file_in_dir
from .wallet import MMGenWallet
wf = find_file_in_dir(MMGenWallet,g.data_dir)
wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
import mmgen.opts as opts
if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs:
if not wf:
msg('No default wallet found, and no other seed source was specified')
opts.usage()
elif len(cmd_args) > nargs:
opts.usage()
elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
qmsg('Warning: overriding default wallet with user-supplied wallet')
if cmd_args or wf:
check_infile(cmd_args[0] if cmd_args else wf)
return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt]
def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'):
if message.strip():
msg(message.strip())
@ -562,180 +462,16 @@ def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'):
if line_input(a+b).strip() != expect:
raise UserNonConfirmation(exit_msg)
def write_data_to_file( outfile,data,desc='data',
ask_write=False,
ask_write_prompt='',
ask_write_default_yes=True,
ask_overwrite=True,
ask_tty=True,
no_tty=False,
quiet=False,
binary=False,
ignore_opt_outdir=False,
check_data=False,
cmp_data=None):
if quiet: ask_tty = ask_overwrite = False
if opt.quiet: ask_overwrite = False
if ask_write_default_yes == False or ask_write_prompt:
ask_write = True
def do_stdout():
qmsg('Output to STDOUT requested')
if g.stdin_tty:
if no_tty:
die(2,f'Printing {desc} to screen is not allowed')
if (ask_tty and not opt.quiet) or binary:
confirm_or_raise('',f'output {desc} to screen')
else:
try: of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux
except: of = None # Windows
if of:
if of[:5] == 'pipe:':
if no_tty:
die(2,f'Writing {desc} to pipe is not allowed')
if ask_tty and not opt.quiet:
confirm_or_raise('',f'output {desc} to pipe')
msg('')
of2,pd = os.path.relpath(of),os.path.pardir
msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2))
else:
msg('Redirecting output to file')
if binary and g.platform == 'win':
import msvcrt
msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
# MSWin workaround. See msg_r()
try:
sys.stdout.write(data.decode() if isinstance(data,bytes) else data)
except:
os.write(1,data if isinstance(data,bytes) else data.encode())
def do_file(outfile,ask_write_prompt):
if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile):
outfile = make_full_path(opt.outdir,outfile)
if ask_write:
if not ask_write_prompt:
ask_write_prompt = f'Save {desc}?'
if not keypress_confirm(ask_write_prompt,
default_yes=ask_write_default_yes):
die(1,f'{capfirst(desc)} not saved')
hush = False
if os.path.lexists(outfile) and ask_overwrite:
confirm_or_raise('',f'File {outfile!r} already exists\nOverwrite?')
msg(f'Overwriting file {outfile!r}')
hush = True
# not atomic, but better than nothing
# if cmp_data is empty, file can be either empty or non-existent
if check_data:
try:
with open(outfile,('r','rb')[bool(binary)]) as fp:
d = fp.read()
except:
d = ''
finally:
if d != cmp_data:
if g.test_suite:
print_diff(cmp_data,d)
die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write')
# To maintain portability, always open files in binary mode
# If 'binary' option not set, encode/decode data before writing and after reading
try:
with open_or_die(outfile,'wb') as fp:
fp.write(data if binary else data.encode())
except:
die(2,f'Failed to write {desc} to file {outfile!r}')
if not (hush or quiet):
msg(f'{capfirst(desc)} written to file {outfile!r}')
return True
if opt.stdout or outfile in ('','-'):
do_stdout()
elif sys.stdin.isatty() and not sys.stdout.isatty():
do_stdout()
else:
do_file(outfile,ask_write_prompt)
def get_words_from_user(prompt):
words = line_input(prompt, echo=opt.echo_passphrase).split()
dmsg('Sanitized input: [{}]'.format(' '.join(words)))
return words
def get_words_from_file(infile,desc,quiet=False):
if not quiet:
qmsg(f'Getting {desc} from file {infile!r}')
with open_or_die(infile, 'rb') as fp:
data = fp.read()
try:
words = data.decode().split()
except:
die(1,f'{capfirst(desc)} data must be UTF-8 encoded.')
dmsg('Sanitized input: [{}]'.format(' '.join(words)))
return words
def get_words(infile,desc,prompt):
if infile:
return get_words_from_file(infile,desc)
else:
return get_words_from_user(prompt)
def mmgen_decrypt_file_maybe(fn,desc='',quiet=False,silent=False):
d = get_data_from_file(fn,desc,binary=True,quiet=quiet,silent=silent)
from .crypto import mmenc_ext
have_enc_ext = get_extension(fn) == mmenc_ext
if have_enc_ext or not is_utf8(d):
m = ('Attempting to decrypt','Decrypting')[have_enc_ext]
qmsg(f'{m} {desc} {fn!r}')
from .crypto import mmgen_decrypt_retry
d = mmgen_decrypt_retry(d,desc)
return d
def get_lines_from_file(fn,desc='',trim_comments=False,quiet=False,silent=False):
dec = mmgen_decrypt_file_maybe(fn,desc,quiet=quiet,silent=silent)
ret = dec.decode().splitlines()
if trim_comments:
ret = strip_comments(ret)
dmsg(f'Got {len(ret)} lines from file {fn!r}')
return ret
def get_data_from_user(desc='data'): # user input MUST be UTF-8
data = line_input(f'Enter {desc}: ',echo=opt.echo_passphrase)
dmsg(f'User input: [{data}]')
return data
def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False):
if not opt.quiet and not silent and not quiet and desc:
qmsg(f'Getting {desc} from file {infile!r}')
with open_or_die(
(0 if dash and infile == '-' else infile),
'rb',
silent=silent) as fp:
data = fp.read(g.max_input_size+1)
if not binary:
data = data.decode()
if len(data) == g.max_input_size + 1:
raise MaxInputSizeExceeded(f'Too much input data! Max input data size: {f.max_input_size} bytes')
return data
class oneshot_warning:
color = 'nocolor'
@ -860,6 +596,7 @@ def do_pager(text):
if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]:
pagers = [os.environ['PAGER']] + pagers
from subprocess import run
for pager in pagers:
try:
m = text + ('' if pager == 'less' else end_msg)

View file

@ -162,6 +162,7 @@ class Wallet(MMGenObject,metaclass=WalletMeta):
def _get_data(self):
if hasattr(self,'infile'):
from .fileutil import get_data_from_file
self.fmt_data = get_data_from_file(self.infile.name,self.desc,binary=self.file_mode=='binary')
elif self.in_data:
self.fmt_data = self.in_data
@ -238,6 +239,7 @@ class Wallet(MMGenObject,metaclass=WalletMeta):
# write_data_to_file(): outfile with absolute path overrides opt.outdir
if outdir:
of = os.path.abspath(os.path.join(outdir,self._filename()))
from .fileutil import write_data_to_file
write_data_to_file(of if outdir else self._filename(),self.fmt_data,**kwargs)
class WalletUnenc(Wallet):
@ -332,6 +334,7 @@ class WalletEnc(Wallet):
self.desc
)
if self.passwd_file:
from .fileutil import get_words_from_file
pw = ' '.join(get_words_from_file(
self.passwd_file,
desc,
@ -365,6 +368,7 @@ class WalletEnc(Wallet):
('',' '+add_desc)[bool(add_desc)]
)
if self.passwd_file:
from .fileutil import get_words_from_file
ret = ' '.join(get_words_from_file(
self.passwd_file,
desc,

View file

@ -163,6 +163,7 @@ class MoneroMMGenTX:
self.data.amount,
(lambda s: '' if s == 'mainnet' else f'.{s}')(self.data.network),
)
from .fileutil import write_data_to_file
write_data_to_file(fn,out,desc='MoneroMMGenTX data',ask_write=True,ask_write_default_yes=False)
class NewSigned(Base):
@ -191,6 +192,7 @@ class MoneroMMGenTX:
class Signed(Base):
def __init__(self,fn):
from .fileutil import get_data_from_file
self.fn = fn
d_wrap = json.loads(get_data_from_file(fn))['MoneroMMGenTX']
d = self.xmrwallet_tx_data(**d_wrap['data'])

View file

@ -19,6 +19,7 @@ opts_data = {
cmd_args = opts.init(opts_data)
from mmgen.fileutil import get_lines_from_file
lines = get_lines_from_file(cmd_args[0])
start = (1,0)[bool(opt.include_first_line)]
a = make_chksum_6(' '.join(lines[start:]).encode())

View file

@ -26,6 +26,7 @@ class TestSuiteFatalException(Exception): pass
import os
from mmgen.common import *
from mmgen.devtools import *
from mmgen.fileutil import write_data_to_file,get_data_from_file
def strip_ansi_escapes(s):
import re
@ -130,7 +131,6 @@ def write_to_tmpfile(cfg,fn,data,binary=False):
write_to_file( os.path.join(cfg['tmpdir'],fn), data=data, binary=binary )
def read_from_file(fn,binary=False):
from mmgen.util import get_data_from_file
return get_data_from_file(fn,quiet=True,binary=binary)
def read_from_tmpfile(cfg,fn,binary=False):

View file

@ -10,7 +10,7 @@ if os.getenv('MMGEN_BOGUS_WALLET_DATA'):
async def fake_get_unspent_rpc(foo):
from decimal import Decimal
import json
from mmgen.util import get_data_from_file
from mmgen.fileutil import get_data_from_file
return json.loads(get_data_from_file(os.getenv('MMGEN_BOGUS_WALLET_DATA')),parse_float=Decimal)
TwUnspentOutputs.set_dates = fake_set_dates

View file

@ -22,6 +22,7 @@ ts_main.py: Basic operations tests for the test.py test suite
from mmgen.globalvars import g
from mmgen.opts import opt
from mmgen.fileutil import get_data_from_file,write_data_to_file
from mmgen.wallet import Wallet,MMGenWallet,MMGenMnemonic,IncogWallet,MMGenSeedFile
from mmgen.rpc import rpc_init
from ..include.common import *

View file

@ -24,7 +24,7 @@ import os,json
from decimal import Decimal
from mmgen.globalvars import g
from mmgen.opts import opt
from mmgen.util import die,gmsg,write_data_to_file
from mmgen.util import die,gmsg
from mmgen.protocol import init_proto
from mmgen.addrlist import AddrList
from mmgen.wallet import MMGenWallet
@ -293,6 +293,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
if n % 2: a.set_comment(idx,f'Test address {n}')
af = a.get_file()
af.format(add_comments=True)
from mmgen.fileutil import write_data_to_file
write_data_to_file(outfile,af.fmt_data,quiet=True,ignore_opt_outdir=True)
end_silence()