#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
fileutil: Routines that read, write, execute or stat files
"""

# 09 Mar 2024: make all utils accept Path instances as arguments

import os
import sys

from .color import set_vt100
from .util import (
    msg,
    die,
    get_extension,
    is_utf8,
    capfirst,
    make_full_path,
    strip_comments,
)

  33. def check_or_create_dir(path):
  34. try:
  35. os.listdir(path)
  36. except:
  37. if os.getenv('MMGEN_TEST_SUITE'):
  38. if os.path.exists(path): # path is a link or regular file
  39. from subprocess import run
  40. run(['rm', '-rf', str(path)])
  41. set_vt100()
  42. try:
  43. os.makedirs(path, 0o700)
  44. except:
  45. die(2, f'ERROR: unable to read or create path ‘{path}’')
  46. def check_binary(args):
  47. from subprocess import run, DEVNULL
  48. try:
  49. run(args, stdout=DEVNULL, stderr=DEVNULL, check=True)
  50. except:
  51. die(2, f'{args[0]!r} binary missing, not in path, or not executable')
  52. set_vt100()
  53. def shred_file(fn, *, verbose=False):
  54. check_binary(['shred', '--version'])
  55. from subprocess import run
  56. run(
  57. ['shred', '--force', '--iterations=30', '--zero', '--remove=wipesync']
  58. + (['--verbose'] if verbose else [])
  59. + [str(fn)],
  60. check=True)
  61. set_vt100()
  62. def _check_file_type_and_access(fname, ftype, *, blkdev_ok=False):
  63. import stat
  64. access, op_desc = (
  65. (os.W_OK, 'writ') if ftype in ('output file', 'output directory') else
  66. (os.R_OK, 'read'))
  67. if ftype == 'output directory':
  68. ok_types = [(stat.S_ISDIR, 'output directory')]
  69. else:
  70. ok_types = [
  71. (stat.S_ISREG, 'regular file'),
  72. (stat.S_ISLNK, 'symbolic link')
  73. ]
  74. if blkdev_ok:
  75. if not sys.platform in ('win32',):
  76. ok_types.append((stat.S_ISBLK, 'block device'))
  77. try:
  78. mode = os.stat(fname).st_mode
  79. except:
  80. die('FileNotFound', f'Requested {ftype} ‘{fname}’ not found')
  81. for t in ok_types:
  82. if t[0](mode):
  83. break
  84. else:
  85. ok_list = ' or '.join(t[1] for t in ok_types)
  86. die(1, f'Requested {ftype} ‘{fname}’ is not a {ok_list}')
  87. if not os.access(fname, access):
  88. die(1, f'Requested {ftype} ‘{fname}’ is not {op_desc}able by you')
  89. return True
  90. def check_infile(f, *, blkdev_ok=False):
  91. return _check_file_type_and_access(f, 'input file', blkdev_ok=blkdev_ok)
  92. def check_outfile(f, *, blkdev_ok=False):
  93. return _check_file_type_and_access(f, 'output file', blkdev_ok=blkdev_ok)
  94. def check_outfile_dir(fn, *, blkdev_ok=False):
  95. return _check_file_type_and_access(
  96. os.path.dirname(os.path.abspath(fn)), 'output directory', blkdev_ok=blkdev_ok)
  97. def check_outdir(f):
  98. return _check_file_type_and_access(f, 'output directory')
  99. def get_seed_file(cfg, *, nargs, wallets=None, invoked_as=None):
  100. wallets = wallets or cfg._args
  101. from .filename import find_file_in_dir
  102. from .wallet.mmgen import wallet
  103. wf = find_file_in_dir(wallet, cfg.data_dir)
  104. wd_from_opt = bool(cfg.hidden_incog_input_params or cfg.in_fmt) # have wallet data from opt?
  105. if len(wallets) + (wd_from_opt or bool(wf)) < nargs:
  106. if not wf:
  107. msg('No default wallet found, and no other seed source was specified')
  108. cfg._usage()
  109. elif len(wallets) > nargs:
  110. cfg._usage()
  111. elif len(wallets) == nargs and wf and invoked_as != 'gen':
  112. cfg._util.qmsg('Warning: overriding default wallet with user-supplied wallet')
  113. if wallets or wf:
  114. check_infile(wallets[0] if wallets else wf)
  115. return str(wallets[0]) if wallets else (wf, None)[wd_from_opt] # could be a Path instance
  116. def _open_or_die(filename, mode, *, silent=False):
  117. try:
  118. return open(filename, mode)
  119. except:
  120. if silent:
  121. die(2, '')
  122. else:
  123. fn = {0:'STDIN', 1:'STDOUT', 2:'STDERR'}[filename] if isinstance(filename, int) else f'‘{filename}’'
  124. desc = 'reading' if 'r' in mode else 'writing'
  125. die(2, f'Unable to open file {fn} for {desc}')
  126. def write_data_to_file(
  127. cfg,
  128. outfile,
  129. data,
  130. *,
  131. desc = 'data',
  132. ask_write = False,
  133. ask_write_prompt = '',
  134. ask_write_default_yes = True,
  135. ask_overwrite = True,
  136. ask_tty = True,
  137. no_tty = False,
  138. no_stdout = False,
  139. quiet = False,
  140. binary = False,
  141. ignore_opt_outdir = False,
  142. outdir = None,
  143. check_data = False,
  144. cmp_data = None):
  145. outfile = str(outfile) # could be a Path instance
  146. if quiet:
  147. ask_tty = ask_overwrite = False
  148. if cfg.quiet:
  149. ask_overwrite = False
  150. if ask_write_default_yes is False or ask_write_prompt:
  151. ask_write = True
  152. def do_stdout():
  153. cfg._util.qmsg('Output to STDOUT requested')
  154. if cfg.stdin_tty:
  155. if no_tty:
  156. die(2, f'Printing {desc} to screen is not allowed')
  157. if (ask_tty and not cfg.quiet) or binary:
  158. from .ui import confirm_or_raise
  159. confirm_or_raise(
  160. cfg,
  161. message = '',
  162. action = f'output {desc} to screen')
  163. else:
  164. try:
  165. of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux
  166. except:
  167. of = None # Windows
  168. if of:
  169. if of[:5] == 'pipe:':
  170. if no_tty:
  171. die(2, f'Writing {desc} to pipe is not allowed')
  172. if ask_tty and not cfg.quiet:
  173. from .ui import confirm_or_raise
  174. confirm_or_raise(
  175. cfg,
  176. message = '',
  177. action = f'output {desc} to pipe')
  178. msg('')
  179. of2, pd = os.path.relpath(of), os.path.pardir
  180. msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2))
  181. else:
  182. msg('Redirecting output to file')
  183. if binary and sys.platform == 'win32':
  184. import msvcrt
  185. msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
  186. # MSWin workaround. See msg_r()
  187. try:
  188. sys.stdout.write(data.decode() if isinstance(data, bytes) else data)
  189. except:
  190. os.write(1, data if isinstance(data, bytes) else data.encode())
  191. def do_file(outfile, ask_write_prompt):
  192. if (outdir or (cfg.outdir and not ignore_opt_outdir)) and not os.path.isabs(outfile):
  193. outfile = make_full_path(str(outdir or cfg.outdir), outfile) # outdir could be Path instance
  194. if ask_write:
  195. if not ask_write_prompt:
  196. ask_write_prompt = f'Save {desc}?'
  197. from .ui import keypress_confirm
  198. if not keypress_confirm(
  199. cfg,
  200. ask_write_prompt,
  201. default_yes = ask_write_default_yes):
  202. die(1, f'{capfirst(desc)} not saved')
  203. hush = False
  204. if os.path.lexists(outfile) and ask_overwrite:
  205. from .ui import confirm_or_raise
  206. confirm_or_raise(
  207. cfg,
  208. message = '',
  209. action = f'File {outfile!r} already exists\nOverwrite?')
  210. msg(f'Overwriting file {outfile!r}')
  211. hush = True
  212. # not atomic, but better than nothing
  213. # if cmp_data is empty, file can be either empty or non-existent
  214. if check_data:
  215. d = ''
  216. try:
  217. with open(outfile, ('r', 'rb')[bool(binary)]) as fp:
  218. d = fp.read()
  219. except:
  220. pass
  221. if d != cmp_data:
  222. die(3, f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write')
  223. # To maintain portability, always open files in binary mode
  224. # If 'binary' option not set, encode/decode data before writing and after reading
  225. try:
  226. with _open_or_die(outfile, 'wb') as fp:
  227. fp.write(data if binary else data.encode())
  228. except:
  229. die(2, f'Failed to write {desc} to file {outfile!r}')
  230. if not (hush or quiet):
  231. msg(f'{capfirst(desc)} written to file {outfile!r}')
  232. return True
  233. if no_stdout:
  234. do_file(outfile, ask_write_prompt)
  235. elif cfg.stdout or outfile in ('', '-'):
  236. do_stdout()
  237. elif sys.stdin.isatty() and not sys.stdout.isatty():
  238. do_stdout()
  239. else:
  240. do_file(outfile, ask_write_prompt)
  241. def get_words_from_file(cfg, infile, *, desc, quiet=False):
  242. if not quiet:
  243. cfg._util.qmsg(f'Getting {desc} from file ‘{infile}’')
  244. with _open_or_die(infile, 'rb') as fp:
  245. data = fp.read()
  246. try:
  247. words = data.decode().split()
  248. except:
  249. die(1, f'{capfirst(desc)} data must be UTF-8 encoded.')
  250. cfg._util.dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  251. return words
  252. def get_data_from_file(
  253. cfg,
  254. infile,
  255. *,
  256. desc = 'data',
  257. dash = False,
  258. silent = False,
  259. binary = False,
  260. quiet = False):
  261. if not (cfg.quiet or silent or quiet):
  262. cfg._util.qmsg(f'Getting {desc} from file ‘{infile}’')
  263. with _open_or_die(
  264. (0 if dash and infile == '-' else infile),
  265. 'rb',
  266. silent=silent) as fp:
  267. data = fp.read(cfg.max_input_size+1)
  268. if not binary:
  269. data = data.decode()
  270. if len(data) == cfg.max_input_size + 1:
  271. die('MaxInputSizeExceeded',
  272. f'Too much input data! Max input data size: {cfg.max_input_size} bytes')
  273. return data
  274. def get_lines_from_file(
  275. cfg,
  276. fn,
  277. *,
  278. desc = 'data',
  279. trim_comments = False,
  280. quiet = False,
  281. silent = False):
  282. def decrypt_file_maybe():
  283. data = get_data_from_file(cfg, fn, desc=desc, binary=True, quiet=quiet, silent=silent)
  284. from .crypto import Crypto
  285. have_enc_ext = get_extension(fn) == Crypto.mmenc_ext
  286. if have_enc_ext or not is_utf8(data):
  287. m = ('Attempting to decrypt', 'Decrypting')[have_enc_ext]
  288. cfg._util.qmsg(f'{m} {desc} ‘{fn}’')
  289. data = Crypto(cfg).mmgen_decrypt_retry(data, desc=desc)
  290. return data
  291. lines = decrypt_file_maybe().decode().splitlines()
  292. if trim_comments:
  293. lines = strip_comments(lines)
  294. cfg._util.dmsg(f'Got {len(lines)} lines from file ‘{fn}’')
  295. return lines