diff --git a/mmgen/addrfile.py b/mmgen/addrfile.py index d5a8770d..956b3307 100755 --- a/mmgen/addrfile.py +++ b/mmgen/addrfile.py @@ -26,8 +26,6 @@ from .util import ( qmsg_r, die, capfirst, - get_lines_from_file, - write_data_to_file, keypress_confirm, ) from .protocol import init_proto @@ -74,6 +72,7 @@ class AddrFile(MMGenObject): def write(self,fn=None,ask_tty=True,ask_write_default_yes=False,binary=False,desc=None): from .opts import opt + from .fileutil import write_data_to_file write_data_to_file( fn or self.filename, self.fmt_data, @@ -224,6 +223,7 @@ class AddrFile(MMGenObject): return ( proto, proto.addr_type(mmtype_key) ) p = self.parent + from .fileutil import get_lines_from_file lines = get_lines_from_file(fn,p.desc+' data',trim_comments=True) try: diff --git a/mmgen/crypto.py b/mmgen/crypto.py index 947c6426..91cfc0dd 100755 --- a/mmgen/crypto.py +++ b/mmgen/crypto.py @@ -232,6 +232,7 @@ def get_hash_preset_from_user(hp=g.dfl_hash_preset,desc='data'): def get_new_passphrase(desc,passchg=False): pw_desc = f"{'new ' if passchg else ''}passphrase for {desc}" if opt.passwd_file: + from .fileutil import get_words_from_file pw = ' '.join(get_words_from_file(opt.passwd_file,pw_desc)) elif opt.echo_passphrase: pw = ' '.join(get_words_from_user(f'Enter {pw_desc}: ')) @@ -254,6 +255,7 @@ def get_new_passphrase(desc,passchg=False): def get_passphrase(desc,passchg=False): pw_desc = f"{'old ' if passchg else ''}passphrase for {desc}" if opt.passwd_file: + from .fileutil import get_words_from_file pwfile_reuse_warning(opt.passwd_file) return ' '.join(get_words_from_file(opt.passwd_file,pw_desc)) else: diff --git a/mmgen/data/version b/mmgen/data/version index c400957f..28601de0 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -13.1.dev006 +13.1.dev007 diff --git a/mmgen/fileutil.py b/mmgen/fileutil.py new file mode 100755 index 00000000..da7b3d78 --- /dev/null +++ b/mmgen/fileutil.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +fileutil.py: Routines that read, write, execute or stat files +""" + +import sys,os + +from .globalvars import g +from .exception import FileNotFound,MaxInputSizeExceeded +from .util import ( + msg, + qmsg, + dmsg, + die, + confirm_or_raise, + get_extension, + is_utf8, + capfirst, + make_full_path, + strip_comments, + keypress_confirm, +) + +def check_or_create_dir(path): + try: + os.listdir(path) + except: + if os.getenv('MMGEN_TEST_SUITE'): + from subprocess import run + try: # exception handling required for MSWin/MSYS2 + run(['/bin/rm','-rf',path]) + except: + pass + try: + os.makedirs(path,0o700) + except: + die(2,f'ERROR: unable to read or create path {path!r}') + +def check_binary(args): + from subprocess import run,DEVNULL + try: + run(args,stdout=DEVNULL,stderr=DEVNULL,check=True) + except: + rdie(2,f'{args[0]!r} binary missing, not in path, or not executable') + +def shred_file(fn,verbose=False): + check_binary(['shred','--version']) + from subprocess import run + run( + ['shred','--force','--iterations=30','--zero','--remove=wipesync'] + + (['--verbose'] if verbose else []) + + [fn], + check=True ) + +def _check_file_type_and_access(fname,ftype,blkdev_ok=False): + + import stat + + access,op_desc = ( + (os.W_OK,'writ') if ftype in ('output file','output directory') else + (os.R_OK,'read') ) + + if ftype == 'output directory': + ok_types = [(stat.S_ISDIR, 'output directory')] + else: + ok_types = [ + (stat.S_ISREG,'regular file'), + (stat.S_ISLNK,'symbolic link') + ] + if blkdev_ok: + ok_types.append((stat.S_ISBLK,'block device')) + + try: + mode = os.stat(fname).st_mode + except: + raise FileNotFound(f'Requested {ftype} {fname!r} not found') + + for t in ok_types: + if t[0](mode): + break + else: + ok_list = ' or '.join( t[1] for t in ok_types ) + die(1,f'Requested {ftype} {fname!r} is not a {ok_list}') + + if not os.access(fname,access): + die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you') + + return True + +def check_infile(f,blkdev_ok=False): + return _check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok) + +def check_outfile(f,blkdev_ok=False): + return _check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok) + +def check_outdir(f): + return _check_file_type_and_access(f,'output directory') + +def get_seed_file(cmd_args,nargs,invoked_as=None): + + from .opts import opt + from .filename import find_file_in_dir + from .wallet import MMGenWallet + + wf = find_file_in_dir(MMGenWallet,g.data_dir) + + wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt? + + import mmgen.opts as opts + if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs: + if not wf: + msg('No default wallet found, and no other seed source was specified') + opts.usage() + elif len(cmd_args) > nargs: + opts.usage() + elif len(cmd_args) == nargs and wf and invoked_as != 'gen': + qmsg('Warning: overriding default wallet with user-supplied wallet') + + if cmd_args or wf: + check_infile(cmd_args[0] if cmd_args else wf) + + return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt] + +def _open_or_die(filename,mode,silent=False): + try: + return open(filename,mode) + except: + die(2,'' if silent else + 'Unable to open file {!r} for {}'.format( + ({0:'STDIN',1:'STDOUT',2:'STDERR'}[filename] if type(filename) == int else filename), + ('reading' if 'r' in mode else 'writing') + )) + +def write_data_to_file( outfile,data,desc='data', + ask_write=False, + ask_write_prompt='', + ask_write_default_yes=True, + ask_overwrite=True, + ask_tty=True, + no_tty=False, + quiet=False, + binary=False, + ignore_opt_outdir=False, + check_data=False, + cmp_data=None): + + from .opts import opt + + if quiet: + ask_tty = ask_overwrite = False + + if opt.quiet: + ask_overwrite = False + + if ask_write_default_yes == False or ask_write_prompt: + ask_write = True + + def do_stdout(): + qmsg('Output to STDOUT requested') + if g.stdin_tty: + if no_tty: + die(2,f'Printing {desc} to screen is not allowed') + if (ask_tty and not opt.quiet) or binary: + confirm_or_raise('',f'output {desc} to screen') + else: + try: of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux + except: of = None # Windows + + if of: + if of[:5] == 'pipe:': + if no_tty: + die(2,f'Writing {desc} to pipe is not allowed') + if ask_tty and not opt.quiet: + confirm_or_raise('',f'output {desc} to pipe') + msg('') + of2,pd = os.path.relpath(of),os.path.pardir + msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2)) + else: + msg('Redirecting output to file') + + if binary and g.platform == 'win': + import msvcrt + msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY) + + # MSWin workaround. See msg_r() + try: + sys.stdout.write(data.decode() if isinstance(data,bytes) else data) + except: + os.write(1,data if isinstance(data,bytes) else data.encode()) + + def do_file(outfile,ask_write_prompt): + if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile): + outfile = make_full_path(opt.outdir,outfile) + + if ask_write: + if not ask_write_prompt: + ask_write_prompt = f'Save {desc}?' + if not keypress_confirm(ask_write_prompt, + default_yes=ask_write_default_yes): + die(1,f'{capfirst(desc)} not saved') + + hush = False + if os.path.lexists(outfile) and ask_overwrite: + confirm_or_raise('',f'File {outfile!r} already exists\nOverwrite?') + msg(f'Overwriting file {outfile!r}') + hush = True + + # not atomic, but better than nothing + # if cmp_data is empty, file can be either empty or non-existent + if check_data: + try: + with open(outfile,('r','rb')[bool(binary)]) as fp: + d = fp.read() + except: + d = '' + finally: + if d != cmp_data: + if g.test_suite: + print_diff(cmp_data,d) + die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write') + + # To maintain portability, always open files in binary mode + # If 'binary' option not set, encode/decode data before writing and after reading + try: + with _open_or_die(outfile,'wb') as fp: + fp.write(data if binary else data.encode()) + except: + die(2,f'Failed to write {desc} to file {outfile!r}') + + if not (hush or quiet): + msg(f'{capfirst(desc)} written to file {outfile!r}') + + return True + + if opt.stdout or outfile in ('','-'): + do_stdout() + elif sys.stdin.isatty() and not sys.stdout.isatty(): + do_stdout() + else: + do_file(outfile,ask_write_prompt) + +def get_words_from_file(infile,desc,quiet=False): + + if not quiet: + qmsg(f'Getting {desc} from file {infile!r}') + + with _open_or_die(infile, 'rb') as fp: + data = fp.read() + + try: + words = data.decode().split() + except: + die(1,f'{capfirst(desc)} data must be UTF-8 encoded.') + + dmsg('Sanitized input: [{}]'.format(' '.join(words))) + + return words + +def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False): + + from .opts import opt + if not opt.quiet and not silent and not quiet and desc: + qmsg(f'Getting {desc} from file {infile!r}') + + with _open_or_die( + (0 if dash and infile == '-' else infile), + 'rb', + silent=silent) as fp: + data = fp.read(g.max_input_size+1) + + if not binary: + data = data.decode() + + if len(data) == g.max_input_size + 1: + raise MaxInputSizeExceeded(f'Too much input data! Max input data size: {f.max_input_size} bytes') + + return data + +def _mmgen_decrypt_file_maybe(fn,desc='',quiet=False,silent=False): + d = get_data_from_file(fn,desc,binary=True,quiet=quiet,silent=silent) + from .crypto import mmenc_ext + have_enc_ext = get_extension(fn) == mmenc_ext + if have_enc_ext or not is_utf8(d): + m = ('Attempting to decrypt','Decrypting')[have_enc_ext] + qmsg(f'{m} {desc} {fn!r}') + from .crypto import mmgen_decrypt_retry + d = mmgen_decrypt_retry(d,desc) + return d + +def get_lines_from_file(fn,desc='',trim_comments=False,quiet=False,silent=False): + dec = _mmgen_decrypt_file_maybe(fn,desc,quiet=quiet,silent=silent) + ret = dec.decode().splitlines() + if trim_comments: + ret = strip_comments(ret) + dmsg(f'Got {len(ret)} lines from file {fn!r}') + return ret diff --git a/mmgen/main_addrgen.py b/mmgen/main_addrgen.py index 7d0cfd32..c2c71cdf 100755 --- a/mmgen/main_addrgen.py +++ b/mmgen/main_addrgen.py @@ -145,6 +145,7 @@ if opt.keygen_backend: idxs = AddrIdxList(fmt_str=cmd_args.pop()) +from .fileutil import get_seed_file sf = get_seed_file(cmd_args,1) do_license_msg() diff --git a/mmgen/main_addrimport.py b/mmgen/main_addrimport.py index 58b0e992..8ca52b24 100755 --- a/mmgen/main_addrimport.py +++ b/mmgen/main_addrimport.py @@ -83,6 +83,7 @@ def parse_cmd_args(rpc,cmd_args): if len(cmd_args) == 1: infile = cmd_args[0] + from .fileutil import check_infile,get_lines_from_file check_infile(infile) if opt.addrlist: al = AddrList( diff --git a/mmgen/main_autosign.py b/mmgen/main_autosign.py index 4394d4ea..514a0fb1 100755 --- a/mmgen/main_autosign.py +++ b/mmgen/main_autosign.py @@ -317,6 +317,7 @@ def wipe_existing_key(): try: os.stat(fn) except: pass else: + from .fileutil import shred_file msg(f'\nShredding existing key {fn!r}') shred_file( fn, verbose=opt.verbose ) diff --git a/mmgen/main_passgen.py b/mmgen/main_passgen.py index 38ed11ca..d6db030f 100755 --- a/mmgen/main_passgen.py +++ b/mmgen/main_passgen.py @@ -139,6 +139,7 @@ pw_idxs = AddrIdxList(fmt_str=cmd_args.pop()) pw_id_str = cmd_args.pop() +from .fileutil import get_seed_file sf = get_seed_file(cmd_args,1) pw_fmt = opt.passwd_fmt or PasswordList.dfl_pw_fmt diff --git a/mmgen/main_seedjoin.py b/mmgen/main_seedjoin.py index 771c363b..dc02be5b 100755 --- a/mmgen/main_seedjoin.py +++ b/mmgen/main_seedjoin.py @@ -118,6 +118,7 @@ if opt.master_share: if opt.id_str and not opt.master_share: die(1,'--id-str option meaningless in context of non-master-share join') +from .fileutil import check_infile for arg in cmd_args: check_wallet_extension(arg) check_infile(arg) diff --git a/mmgen/main_txbump.py b/mmgen/main_txbump.py index dcf92014..5f9d2584 100755 --- a/mmgen/main_txbump.py +++ b/mmgen/main_txbump.py @@ -102,6 +102,8 @@ FMT CODES: cmd_args = opts.init(opts_data) tx_file = cmd_args.pop(0) + +from .fileutil import check_infile check_infile(tx_file) from .tx import * diff --git a/mmgen/main_txsend.py b/mmgen/main_txsend.py index efe16b6d..4089dd50 100755 --- a/mmgen/main_txsend.py +++ b/mmgen/main_txsend.py @@ -42,6 +42,7 @@ cmd_args = opts.init(opts_data) if len(cmd_args) == 1: infile = cmd_args[0] + from .fileutil import check_infile check_infile(infile) else: opts.usage() diff --git a/mmgen/main_txsign.py b/mmgen/main_txsign.py index c0c2a8ac..44718146 100755 --- a/mmgen/main_txsign.py +++ b/mmgen/main_txsign.py @@ -100,6 +100,7 @@ infiles = opts.init(opts_data) if not infiles: opts.usage() +from .fileutil import check_infile for i in infiles: check_infile(i) diff --git a/mmgen/main_wallet.py b/mmgen/main_wallet.py index 6106897c..b9a457a7 100755 --- a/mmgen/main_wallet.py +++ b/mmgen/main_wallet.py @@ -169,6 +169,8 @@ elif invoked_as == 'seedsplit': else: opts.usage() +from .fileutil import check_infile,get_seed_file + if cmd_args: if invoked_as == 'gen' or len(cmd_args) > 1: opts.usage() @@ -227,6 +229,7 @@ if invoked_as == 'passchg' and ss_in.infile.dirname == g.data_dir: confirm_or_raise(m1,m2,exit_msg='Password not changed') ss_out.write_to_file(desc='New wallet',outdir=g.data_dir) bmsg('Securely deleting old wallet') + from .fileutil import shred_file shred_file( ss_in.infile.name, verbose = opt.verbose ) diff --git a/mmgen/opts.py b/mmgen/opts.py index e2fee23d..9c360f1b 100755 --- a/mmgen/opts.py +++ b/mmgen/opts.py @@ -24,6 +24,7 @@ import sys,os,stat from .exception import UserOptError from .globalvars import g from .base_obj import Lockable + import mmgen.share.Opts class UserOpts(Lockable): @@ -307,7 +308,7 @@ def init(opts_data=None,add_opts=None,init_opts=None,opt_filter=None,parse_only= else: g.data_dir_root = os.path.join(g.home_dir,'.'+g.proj_name.lower()) - from .util import check_or_create_dir + from .fileutil import check_or_create_dir check_or_create_dir(g.data_dir_root) from .term import init_term @@ -516,7 +517,7 @@ def check_usr_opts(usr_opts): # Raises an exception if any check fails fn,offset = a opt_is_int(offset,desc) - from .util import check_infile,check_outdir,check_outfile + from .fileutil import check_infile,check_outdir,check_outfile if key == 'hidden_incog_input_params': check_infile(fn,blkdev_ok=True) key2 = 'in_fmt' @@ -624,10 +625,10 @@ def check_usr_opts(usr_opts): # Raises an exception if any check fails desc = f'parameter for {fmt_opt(key)!r} option' if key in g.infile_opts: - from .util import check_infile + from .fileutil import check_infile check_infile(val) # file exists and is readable - dies on error elif key == 'outdir': - from .util import check_outdir + from .fileutil import check_outdir check_outdir(val) # dies on error elif 'chk_'+key in cfuncs: cfuncs['chk_'+key](key,val,desc) diff --git a/mmgen/rpc.py b/mmgen/rpc.py index 116b5a6e..34412275 100755 --- a/mmgen/rpc.py +++ b/mmgen/rpc.py @@ -23,6 +23,7 @@ rpc.py: Cryptocoin RPC library for the MMGen suite import base64,json,asyncio from decimal import Decimal from .common import * +from .fileutil import get_lines_from_file from .objmethods import Hilite,InitErrors from .base_obj import AsyncInit diff --git a/mmgen/tool.py b/mmgen/tool.py index 3bb8bf07..dd20bd19 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -32,6 +32,7 @@ from .passwdlist import PasswordList from .baseconv import baseconv from .xmrseed import xmrseed from .bip39 import bip39 +from .fileutil import get_seed_file,get_data_from_file,write_data_to_file NL = ('\n','\r\n')[g.platform=='win'] diff --git a/mmgen/twctl.py b/mmgen/twctl.py index 20a23638..55ebf5bd 100755 --- a/mmgen/twctl.py +++ b/mmgen/twctl.py @@ -22,15 +22,7 @@ twctl: Tracking wallet control class for the MMGen suite from .globalvars import g from .exception import WalletFileError -from .util import ( - msg, - dmsg, - check_or_create_dir, - write_data_to_file, - get_data_from_file, - write_mode, - altcoin_subclass -) +from .util import msg,dmsg,write_mode,altcoin_subclass from .base_obj import AsyncInit from .objmethods import MMGenObject from .obj import TwComment,get_obj @@ -93,6 +85,7 @@ class TrackingWallet(MMGenObject,metaclass=AsyncInit): )) self.tw_fn = os.path.join(tw_dir,'tracking-wallet.json') + from .fileutil import check_or_create_dir,get_data_from_file check_or_create_dir(tw_dir) try: @@ -210,6 +203,7 @@ class TrackingWallet(MMGenObject,metaclass=AsyncInit): @write_mode def write_changed(self,data): + from .fileutil import write_data_to_file write_data_to_file( self.tw_fn, data, diff --git a/mmgen/twuo.py b/mmgen/twuo.py index 60234247..bdd0a5e4 100755 --- a/mmgen/twuo.py +++ b/mmgen/twuo.py @@ -35,7 +35,6 @@ from .util import ( fmt, make_timestr, keypress_confirm, - write_data_to_file, line_input, do_pager, altcoin_subclass @@ -507,6 +506,7 @@ Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, add [l]abel: self.proto.dcoin, ','.join(self.sort_info(include_group=False)).lower() ) msg('') + from .fileutil import write_data_to_file try: write_data_to_file( of, diff --git a/mmgen/tx.py b/mmgen/tx.py index e5614ba0..432585f2 100755 --- a/mmgen/tx.py +++ b/mmgen/tx.py @@ -488,6 +488,7 @@ class MMGenTX: # returns true if comment added or changed def add_comment(self,infile=None): if infile: + from .fileutil import get_data_from_file self.label = MMGenTxLabel(get_data_from_file(infile,'transaction comment')) else: # get comment from user, or edit existing comment m = ('Add a comment to transaction?','Edit transaction comment?')[bool(self.label)] @@ -739,6 +740,7 @@ class MMGenTX: ) ad_f = AddrData(self.proto) + from .fileutil import check_infile for a in addrfiles: check_infile(a) ad_f.add(AddrList(self.proto,a)) diff --git a/mmgen/txfile.py b/mmgen/txfile.py index 4fea57d9..bbdb6a2b 100755 --- a/mmgen/txfile.py +++ b/mmgen/txfile.py @@ -57,6 +57,7 @@ class MMGenTxFile: )[desc=='inputs'] return io_list(tx,[io(tx.proto,**e) for e in d]) + from .fileutil import get_data_from_file tx_data = get_data_from_file(infile,tx.desc+' data',quiet=quiet_open) try: @@ -206,6 +207,7 @@ class MMGenTxFile: if not self.fmt_data: self.fmt_data = self.format() + from .fileutil import write_data_to_file write_data_to_file( outfile = self.filename, data = self.fmt_data, diff --git a/mmgen/txsign.py b/mmgen/txsign.py index 8a8f9d20..ab9455d6 100755 --- a/mmgen/txsign.py +++ b/mmgen/txsign.py @@ -134,6 +134,7 @@ def get_keyaddrlist(proto,opt): def get_keylist(proto,opt): if opt.keys_from_file: + from .fileutil import get_lines_from_file return get_lines_from_file(opt.keys_from_file,'key-address data',trim_comments=True) return None diff --git a/mmgen/util.py b/mmgen/util.py index bc796c7f..3852f76e 100755 --- a/mmgen/util.py +++ b/mmgen/util.py @@ -20,13 +20,13 @@ util.py: Low-level routines imported by other modules in the MMGen suite """ -import sys,os,time,stat,re -from subprocess import run,PIPE,DEVNULL +import sys,os,time,re from hashlib import sha256 from string import hexdigits,digits + from .color import * -from .exception import * -from .globalvars import * +from .exception import BadFileExtension,UserNonConfirmation +from .globalvars import g CUR_HIDE = '\033[?25l' CUR_SHOW = '\033[?25h' @@ -239,20 +239,6 @@ def parse_bytespec(nbytes): die(1,f'{nbytes!r}: invalid byte specifier') -def check_or_create_dir(path): - try: - os.listdir(path) - except: - if os.getenv('MMGEN_TEST_SUITE'): - try: # exception handling required for MSWin/MSYS2 - run(['/bin/rm','-rf',path]) - except: - pass - try: - os.makedirs(path,0o700) - except: - die(2,f'ERROR: unable to read or create path {path!r}') - from .opts import opt def qmsg(s,alt=None): @@ -460,69 +446,6 @@ def compare_or_die(val1, desc1, val2, desc2, e='Error'): dmsg(f'{capfirst(desc2)} OK ({val2})') return True -def check_binary(args): - try: - run(args,stdout=DEVNULL,stderr=DEVNULL,check=True) - except: - rdie(2,f'{args[0]!r} binary missing, not in path, or not executable') - -def shred_file(fn,verbose=False): - check_binary(['shred','--version']) - run( - ['shred','--force','--iterations=30','--zero','--remove=wipesync'] - + (['--verbose'] if verbose else []) - + [fn], - check=True ) - -def open_or_die(filename,mode,silent=False): - try: - return open(filename,mode) - except: - die(2,'' if silent else - 'Unable to open file {!r} for {}'.format( - ({0:'STDIN',1:'STDOUT',2:'STDERR'}[filename] if type(filename) == int else filename), - ('reading' if 'r' in mode else 'writing') - )) - -def check_file_type_and_access(fname,ftype,blkdev_ok=False): - - access,op_desc = ( - (os.W_OK,'writ') if ftype in ('output file','output directory') else - (os.R_OK,'read') ) - - if ftype == 'output directory': - ok_types = [(stat.S_ISDIR, 'output directory')] - else: - ok_types = [ - (stat.S_ISREG,'regular file'), - (stat.S_ISLNK,'symbolic link') - ] - if blkdev_ok: - ok_types.append((stat.S_ISBLK,'block device')) - - try: - mode = os.stat(fname).st_mode - except: - raise FileNotFound(f'Requested {ftype} {fname!r} not found') - - for t in ok_types: - if t[0](mode): - break - else: - ok_list = ' or '.join( t[1] for t in ok_types ) - die(1,f'Requested {ftype} {fname!r} is not a {ok_list}') - - if not os.access(fname,access): - die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you') - - return True - -def check_infile(f,blkdev_ok=False): - return check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok) -def check_outfile(f,blkdev_ok=False): - return check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok) -def check_outdir(f): - return check_file_type_and_access(f,'output directory') def check_wallet_extension(fn): from .wallet import Wallet if not Wallet.ext_to_type(get_extension(fn)): @@ -531,29 +454,6 @@ def check_wallet_extension(fn): def make_full_path(outdir,outfile): return os.path.normpath(os.path.join(outdir, os.path.basename(outfile))) -def get_seed_file(cmd_args,nargs,invoked_as=None): - from .filename import find_file_in_dir - from .wallet import MMGenWallet - - wf = find_file_in_dir(MMGenWallet,g.data_dir) - - wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt? - - import mmgen.opts as opts - if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs: - if not wf: - msg('No default wallet found, and no other seed source was specified') - opts.usage() - elif len(cmd_args) > nargs: - opts.usage() - elif len(cmd_args) == nargs and wf and invoked_as != 'gen': - qmsg('Warning: overriding default wallet with user-supplied wallet') - - if cmd_args or wf: - check_infile(cmd_args[0] if cmd_args else wf) - - return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt] - def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'): if message.strip(): msg(message.strip()) @@ -562,180 +462,16 @@ def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'): if line_input(a+b).strip() != expect: raise UserNonConfirmation(exit_msg) -def write_data_to_file( outfile,data,desc='data', - ask_write=False, - ask_write_prompt='', - ask_write_default_yes=True, - ask_overwrite=True, - ask_tty=True, - no_tty=False, - quiet=False, - binary=False, - ignore_opt_outdir=False, - check_data=False, - cmp_data=None): - - if quiet: ask_tty = ask_overwrite = False - if opt.quiet: ask_overwrite = False - - if ask_write_default_yes == False or ask_write_prompt: - ask_write = True - - def do_stdout(): - qmsg('Output to STDOUT requested') - if g.stdin_tty: - if no_tty: - die(2,f'Printing {desc} to screen is not allowed') - if (ask_tty and not opt.quiet) or binary: - confirm_or_raise('',f'output {desc} to screen') - else: - try: of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux - except: of = None # Windows - - if of: - if of[:5] == 'pipe:': - if no_tty: - die(2,f'Writing {desc} to pipe is not allowed') - if ask_tty and not opt.quiet: - confirm_or_raise('',f'output {desc} to pipe') - msg('') - of2,pd = os.path.relpath(of),os.path.pardir - msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2)) - else: - msg('Redirecting output to file') - - if binary and g.platform == 'win': - import msvcrt - msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY) - - # MSWin workaround. See msg_r() - try: - sys.stdout.write(data.decode() if isinstance(data,bytes) else data) - except: - os.write(1,data if isinstance(data,bytes) else data.encode()) - - def do_file(outfile,ask_write_prompt): - if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile): - outfile = make_full_path(opt.outdir,outfile) - - if ask_write: - if not ask_write_prompt: - ask_write_prompt = f'Save {desc}?' - if not keypress_confirm(ask_write_prompt, - default_yes=ask_write_default_yes): - die(1,f'{capfirst(desc)} not saved') - - hush = False - if os.path.lexists(outfile) and ask_overwrite: - confirm_or_raise('',f'File {outfile!r} already exists\nOverwrite?') - msg(f'Overwriting file {outfile!r}') - hush = True - - # not atomic, but better than nothing - # if cmp_data is empty, file can be either empty or non-existent - if check_data: - try: - with open(outfile,('r','rb')[bool(binary)]) as fp: - d = fp.read() - except: - d = '' - finally: - if d != cmp_data: - if g.test_suite: - print_diff(cmp_data,d) - die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write') - - # To maintain portability, always open files in binary mode - # If 'binary' option not set, encode/decode data before writing and after reading - try: - with open_or_die(outfile,'wb') as fp: - fp.write(data if binary else data.encode()) - except: - die(2,f'Failed to write {desc} to file {outfile!r}') - - if not (hush or quiet): - msg(f'{capfirst(desc)} written to file {outfile!r}') - - return True - - if opt.stdout or outfile in ('','-'): - do_stdout() - elif sys.stdin.isatty() and not sys.stdout.isatty(): - do_stdout() - else: - do_file(outfile,ask_write_prompt) - def get_words_from_user(prompt): words = line_input(prompt, echo=opt.echo_passphrase).split() dmsg('Sanitized input: [{}]'.format(' '.join(words))) return words -def get_words_from_file(infile,desc,quiet=False): - - if not quiet: - qmsg(f'Getting {desc} from file {infile!r}') - - with open_or_die(infile, 'rb') as fp: - data = fp.read() - - try: - words = data.decode().split() - except: - die(1,f'{capfirst(desc)} data must be UTF-8 encoded.') - - dmsg('Sanitized input: [{}]'.format(' '.join(words))) - - return words - -def get_words(infile,desc,prompt): - if infile: - return get_words_from_file(infile,desc) - else: - return get_words_from_user(prompt) - -def mmgen_decrypt_file_maybe(fn,desc='',quiet=False,silent=False): - d = get_data_from_file(fn,desc,binary=True,quiet=quiet,silent=silent) - from .crypto import mmenc_ext - have_enc_ext = get_extension(fn) == mmenc_ext - if have_enc_ext or not is_utf8(d): - m = ('Attempting to decrypt','Decrypting')[have_enc_ext] - qmsg(f'{m} {desc} {fn!r}') - from .crypto import mmgen_decrypt_retry - d = mmgen_decrypt_retry(d,desc) - return d - -def get_lines_from_file(fn,desc='',trim_comments=False,quiet=False,silent=False): - dec = mmgen_decrypt_file_maybe(fn,desc,quiet=quiet,silent=silent) - ret = dec.decode().splitlines() - if trim_comments: - ret = strip_comments(ret) - dmsg(f'Got {len(ret)} lines from file {fn!r}') - return ret - def get_data_from_user(desc='data'): # user input MUST be UTF-8 data = line_input(f'Enter {desc}: ',echo=opt.echo_passphrase) dmsg(f'User input: [{data}]') return data -def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False): - - if not opt.quiet and not silent and not quiet and desc: - qmsg(f'Getting {desc} from file {infile!r}') - - with open_or_die( - (0 if dash and infile == '-' else infile), - 'rb', - silent=silent) as fp: - data = fp.read(g.max_input_size+1) - - if not binary: - data = data.decode() - - if len(data) == g.max_input_size + 1: - raise MaxInputSizeExceeded(f'Too much input data! Max input data size: {f.max_input_size} bytes') - - return data - class oneshot_warning: color = 'nocolor' @@ -860,6 +596,7 @@ def do_pager(text): if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]: pagers = [os.environ['PAGER']] + pagers + from subprocess import run for pager in pagers: try: m = text + ('' if pager == 'less' else end_msg) diff --git a/mmgen/wallet.py b/mmgen/wallet.py index 1e90adfe..cc248e7f 100755 --- a/mmgen/wallet.py +++ b/mmgen/wallet.py @@ -162,6 +162,7 @@ class Wallet(MMGenObject,metaclass=WalletMeta): def _get_data(self): if hasattr(self,'infile'): + from .fileutil import get_data_from_file self.fmt_data = get_data_from_file(self.infile.name,self.desc,binary=self.file_mode=='binary') elif self.in_data: self.fmt_data = self.in_data @@ -238,6 +239,7 @@ class Wallet(MMGenObject,metaclass=WalletMeta): # write_data_to_file(): outfile with absolute path overrides opt.outdir if outdir: of = os.path.abspath(os.path.join(outdir,self._filename())) + from .fileutil import write_data_to_file write_data_to_file(of if outdir else self._filename(),self.fmt_data,**kwargs) class WalletUnenc(Wallet): @@ -332,6 +334,7 @@ class WalletEnc(Wallet): self.desc ) if self.passwd_file: + from .fileutil import get_words_from_file pw = ' '.join(get_words_from_file( self.passwd_file, desc, @@ -365,6 +368,7 @@ class WalletEnc(Wallet): ('',' '+add_desc)[bool(add_desc)] ) if self.passwd_file: + from .fileutil import get_words_from_file ret = ' '.join(get_words_from_file( self.passwd_file, desc, diff --git a/mmgen/xmrwallet.py b/mmgen/xmrwallet.py index 0c07e301..24fdf79e 100755 --- a/mmgen/xmrwallet.py +++ b/mmgen/xmrwallet.py @@ -163,6 +163,7 @@ class MoneroMMGenTX: self.data.amount, (lambda s: '' if s == 'mainnet' else f'.{s}')(self.data.network), ) + from .fileutil import write_data_to_file write_data_to_file(fn,out,desc='MoneroMMGenTX data',ask_write=True,ask_write_default_yes=False) class NewSigned(Base): @@ -191,6 +192,7 @@ class MoneroMMGenTX: class Signed(Base): def __init__(self,fn): + from .fileutil import get_data_from_file self.fn = fn d_wrap = json.loads(get_data_from_file(fn))['MoneroMMGenTX'] d = self.xmrwallet_tx_data(**d_wrap['data']) diff --git a/scripts/compute-file-chksum.py b/scripts/compute-file-chksum.py index f21fafb0..4971fa09 100755 --- a/scripts/compute-file-chksum.py +++ b/scripts/compute-file-chksum.py @@ -19,6 +19,7 @@ opts_data = { cmd_args = opts.init(opts_data) +from mmgen.fileutil import get_lines_from_file lines = get_lines_from_file(cmd_args[0]) start = (1,0)[bool(opt.include_first_line)] a = make_chksum_6(' '.join(lines[start:]).encode()) diff --git a/test/include/common.py b/test/include/common.py index 17348729..e3ece6cc 100755 --- a/test/include/common.py +++ b/test/include/common.py @@ -26,6 +26,7 @@ class TestSuiteFatalException(Exception): pass import os from mmgen.common import * from mmgen.devtools import * +from mmgen.fileutil import write_data_to_file,get_data_from_file def strip_ansi_escapes(s): import re @@ -130,7 +131,6 @@ def write_to_tmpfile(cfg,fn,data,binary=False): write_to_file( os.path.join(cfg['tmpdir'],fn), data=data, binary=binary ) def read_from_file(fn,binary=False): - from mmgen.util import get_data_from_file return get_data_from_file(fn,quiet=True,binary=binary) def read_from_tmpfile(cfg,fn,binary=False): diff --git a/test/overlay/fakemods/twuo.py b/test/overlay/fakemods/twuo.py index 42204387..86b4d9d5 100644 --- a/test/overlay/fakemods/twuo.py +++ b/test/overlay/fakemods/twuo.py @@ -10,7 +10,7 @@ if os.getenv('MMGEN_BOGUS_WALLET_DATA'): async def fake_get_unspent_rpc(foo): from decimal import Decimal import json - from mmgen.util import get_data_from_file + from mmgen.fileutil import get_data_from_file return json.loads(get_data_from_file(os.getenv('MMGEN_BOGUS_WALLET_DATA')),parse_float=Decimal) TwUnspentOutputs.set_dates = fake_set_dates diff --git a/test/test_py_d/ts_main.py b/test/test_py_d/ts_main.py index 67cf0bff..561b3d6f 100755 --- a/test/test_py_d/ts_main.py +++ b/test/test_py_d/ts_main.py @@ -22,6 +22,7 @@ ts_main.py: Basic operations tests for the test.py test suite from mmgen.globalvars import g from mmgen.opts import opt +from mmgen.fileutil import get_data_from_file,write_data_to_file from mmgen.wallet import Wallet,MMGenWallet,MMGenMnemonic,IncogWallet,MMGenSeedFile from mmgen.rpc import rpc_init from ..include.common import * diff --git a/test/test_py_d/ts_regtest.py b/test/test_py_d/ts_regtest.py index 8e7249c9..aed9e093 100755 --- a/test/test_py_d/ts_regtest.py +++ b/test/test_py_d/ts_regtest.py @@ -24,7 +24,7 @@ import os,json from decimal import Decimal from mmgen.globalvars import g from mmgen.opts import opt -from mmgen.util import die,gmsg,write_data_to_file +from mmgen.util import die,gmsg from mmgen.protocol import init_proto from mmgen.addrlist import AddrList from mmgen.wallet import MMGenWallet @@ -293,6 +293,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared): if n % 2: a.set_comment(idx,f'Test address {n}') af = a.get_file() af.format(add_comments=True) + from mmgen.fileutil import write_data_to_file write_data_to_file(outfile,af.fmt_data,quiet=True,ignore_opt_outdir=True) end_silence()