fileutil.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. fileutil.py: Routines that read, write, execute or stat files
  20. """
  21. import sys,os
  22. from .globalvars import g
  23. from .color import set_vt100
  24. from .util import (
  25. msg,
  26. qmsg,
  27. dmsg,
  28. die,
  29. confirm_or_raise,
  30. get_extension,
  31. is_utf8,
  32. capfirst,
  33. make_full_path,
  34. strip_comments,
  35. keypress_confirm,
  36. )
  37. def check_or_create_dir(path):
  38. try:
  39. os.listdir(path)
  40. except:
  41. if os.getenv('MMGEN_TEST_SUITE'):
  42. from subprocess import run
  43. run([
  44. ('rm' if g.platform == 'win' else '/bin/rm'),
  45. '-rf',
  46. path ])
  47. set_vt100()
  48. try:
  49. os.makedirs(path,0o700)
  50. except:
  51. die(2,f'ERROR: unable to read or create path {path!r}')
  52. def check_binary(args):
  53. from subprocess import run,DEVNULL
  54. try:
  55. run(args,stdout=DEVNULL,stderr=DEVNULL,check=True)
  56. except:
  57. die(2,f'{args[0]!r} binary missing, not in path, or not executable')
  58. set_vt100()
  59. def shred_file(fn,verbose=False):
  60. check_binary(['shred','--version'])
  61. from subprocess import run
  62. run(
  63. ['shred','--force','--iterations=30','--zero','--remove=wipesync']
  64. + (['--verbose'] if verbose else [])
  65. + [fn],
  66. check=True )
  67. set_vt100()
  68. def _check_file_type_and_access(fname,ftype,blkdev_ok=False):
  69. import stat
  70. access,op_desc = (
  71. (os.W_OK,'writ') if ftype in ('output file','output directory') else
  72. (os.R_OK,'read') )
  73. if ftype == 'output directory':
  74. ok_types = [(stat.S_ISDIR, 'output directory')]
  75. else:
  76. ok_types = [
  77. (stat.S_ISREG,'regular file'),
  78. (stat.S_ISLNK,'symbolic link')
  79. ]
  80. if blkdev_ok:
  81. ok_types.append((stat.S_ISBLK,'block device'))
  82. try:
  83. mode = os.stat(fname).st_mode
  84. except:
  85. die( 'FileNotFound', f'Requested {ftype} {fname!r} not found' )
  86. for t in ok_types:
  87. if t[0](mode):
  88. break
  89. else:
  90. ok_list = ' or '.join( t[1] for t in ok_types )
  91. die(1,f'Requested {ftype} {fname!r} is not a {ok_list}')
  92. if not os.access(fname,access):
  93. die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you')
  94. return True
  95. def check_infile(f,blkdev_ok=False):
  96. return _check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok)
  97. def check_outfile(f,blkdev_ok=False):
  98. return _check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok)
  99. def check_outdir(f):
  100. return _check_file_type_and_access(f,'output directory')
  101. def get_seed_file(cmd_args,nargs,invoked_as=None):
  102. from .opts import opt
  103. from .filename import find_file_in_dir
  104. from .wallet.mmgen import wallet
  105. wf = find_file_in_dir(wallet,g.data_dir)
  106. wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
  107. import mmgen.opts as opts
  108. if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs:
  109. if not wf:
  110. msg('No default wallet found, and no other seed source was specified')
  111. opts.usage()
  112. elif len(cmd_args) > nargs:
  113. opts.usage()
  114. elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
  115. qmsg('Warning: overriding default wallet with user-supplied wallet')
  116. if cmd_args or wf:
  117. check_infile(cmd_args[0] if cmd_args else wf)
  118. return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt]
  119. def _open_or_die(filename,mode,silent=False):
  120. try:
  121. return open(filename,mode)
  122. except:
  123. die(2,'' if silent else
  124. 'Unable to open file {!r} for {}'.format(
  125. ({0:'STDIN',1:'STDOUT',2:'STDERR'}[filename] if type(filename) == int else filename),
  126. ('reading' if 'r' in mode else 'writing')
  127. ))
def write_data_to_file( outfile,data,desc='data',
                        ask_write=False,
                        ask_write_prompt='',
                        ask_write_default_yes=True,
                        ask_overwrite=True,
                        ask_tty=True,
                        no_tty=False,
                        quiet=False,
                        binary=False,
                        ignore_opt_outdir=False,
                        check_data=False,
                        cmp_data=None):
    """
    Write `data` either to STDOUT or to the file `outfile`.

    Output goes to STDOUT when opt.stdout is set, when `outfile` is '' or '-',
    or when stdin is a terminal and stdout is not.  In all other cases the data
    is written to `outfile`.

    Parameters:
      outfile               - output filename; '' or '-' selects STDOUT
      data                  - data to write (bytes, or str which is encoded
                              before being written to a file)
      desc                  - description of the data, used in prompts and messages
      ask_write             - ask for confirmation before saving to a file; forced
                              on when `ask_write_default_yes` is False or
                              `ask_write_prompt` is non-empty
      ask_write_prompt      - prompt for the save confirmation; if empty,
                              'Save {desc}?' is used
      ask_write_default_yes - passed as `default_yes` to keypress_confirm()
      ask_overwrite         - ask for confirmation before overwriting an existing
                              path; disabled by `quiet` and by opt.quiet
      ask_tty               - ask for confirmation before output to screen or
                              pipe; disabled by `quiet`
      no_tty                - die instead of writing to screen or pipe
      quiet                 - disable `ask_tty` and `ask_overwrite` and suppress
                              the 'written to file' message
      binary                - write `data` to the file as-is, without encoding
      ignore_opt_outdir     - do not prepend opt.outdir to a relative `outfile`
      check_data            - before writing, read the existing file and die with
                              exit code 3 if its contents differ from `cmp_data`
      cmp_data              - expected current contents of the file, used with
                              `check_data`

    The inner do_file() returns True, but that value is not passed on to the
    caller of this function.
    """
    from .opts import opt

    if quiet:
        ask_tty = ask_overwrite = False

    if opt.quiet:
        ask_overwrite = False

    # a non-default answer or a custom prompt implies that the user must be asked
    if ask_write_default_yes == False or ask_write_prompt:
        ask_write = True

    def do_stdout():
        # Write the data to standard output, asking for confirmation first where required
        qmsg('Output to STDOUT requested')
        if g.stdin_tty:
            if no_tty:
                die(2,f'Printing {desc} to screen is not allowed')
            # binary data always requires confirmation before being printed to the screen
            if (ask_tty and not opt.quiet) or binary:
                confirm_or_raise(
                    message = '',
                    action = f'output {desc} to screen' )
        else:
            # resolve the target of this process's file descriptor 1 via /proc
            try: of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux
            except: of = None # Windows

            if of:
                if of[:5] == 'pipe:':
                    if no_tty:
                        die(2,f'Writing {desc} to pipe is not allowed')
                    if ask_tty and not opt.quiet:
                        confirm_or_raise(
                            message = '',
                            action = f'output {desc} to pipe' )
                        msg('')
                # display the relative path unless it begins with the parent-directory
                # component, in which case the path returned by readlink() is displayed
                of2,pd = os.path.relpath(of),os.path.pardir
                msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2))
            else:
                msg('Redirecting output to file')

        if binary and g.platform == 'win':
            import msvcrt
            msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)

        # MSWin workaround. See msg_r()
        try:
            sys.stdout.write(data.decode() if isinstance(data,bytes) else data)
        except:
            # fall back to writing raw bytes directly to file descriptor 1
            os.write(1,data if isinstance(data,bytes) else data.encode())

    def do_file(outfile,ask_write_prompt):
        # Write the data to `outfile`, prompting and checking as requested by the caller
        if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile):
            outfile = make_full_path(opt.outdir,outfile)

        if ask_write:
            if not ask_write_prompt:
                ask_write_prompt = f'Save {desc}?'
            if not keypress_confirm(ask_write_prompt,
                        default_yes=ask_write_default_yes):
                die(1,f'{capfirst(desc)} not saved')

        # hush: set when the 'Overwriting file' message has been shown, in which
        # case the 'written to file' message below is skipped
        hush = False
        if os.path.lexists(outfile) and ask_overwrite:
            confirm_or_raise(
                message = '',
                action = f'File {outfile!r} already exists\nOverwrite?' )
            msg(f'Overwriting file {outfile!r}')
            hush = True

        # not atomic, but better than nothing
        # if cmp_data is empty, file can be either empty or non-existent
        if check_data:
            try:
                with open(outfile,('r','rb')[bool(binary)]) as fp:
                    d = fp.read()
            except:
                d = ''
            finally:
                if d != cmp_data:
                    # NOTE(review): print_diff is not among the names imported at the
                    # top of this file as shown - confirm that it is in scope here
                    if g.test_suite:
                        print_diff(cmp_data,d)
                    die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write')

        # To maintain portability, always open files in binary mode
        # If 'binary' option not set, encode/decode data before writing and after reading
        try:
            with _open_or_die(outfile,'wb') as fp:
                fp.write(data if binary else data.encode())
        except:
            die(2,f'Failed to write {desc} to file {outfile!r}')

        if not (hush or quiet):
            msg(f'{capfirst(desc)} written to file {outfile!r}')

        return True

    # choose the output destination
    if opt.stdout or outfile in ('','-'):
        do_stdout()
    elif sys.stdin.isatty() and not sys.stdout.isatty():
        # stdout has been redirected by the user: honor the redirection
        do_stdout()
    else:
        do_file(outfile,ask_write_prompt)
  225. def get_words_from_file(infile,desc,quiet=False):
  226. if not quiet:
  227. qmsg(f'Getting {desc} from file {infile!r}')
  228. with _open_or_die(infile, 'rb') as fp:
  229. data = fp.read()
  230. try:
  231. words = data.decode().split()
  232. except:
  233. die(1,f'{capfirst(desc)} data must be UTF-8 encoded.')
  234. dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  235. return words
  236. def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False):
  237. from .opts import opt
  238. if not (opt.quiet or silent or quiet):
  239. qmsg(f'Getting {desc} from file {infile!r}')
  240. with _open_or_die(
  241. (0 if dash and infile == '-' else infile),
  242. 'rb',
  243. silent=silent) as fp:
  244. data = fp.read(g.max_input_size+1)
  245. if not binary:
  246. data = data.decode()
  247. if len(data) == g.max_input_size + 1:
  248. die( 'MaxInputSizeExceeded',
  249. f'Too much input data! Max input data size: {f.max_input_size} bytes' )
  250. return data
  251. def _mmgen_decrypt_file_maybe(fn,desc='data',quiet=False,silent=False):
  252. d = get_data_from_file(fn,desc=desc,binary=True,quiet=quiet,silent=silent)
  253. from .crypto import mmenc_ext
  254. have_enc_ext = get_extension(fn) == mmenc_ext
  255. if have_enc_ext or not is_utf8(d):
  256. m = ('Attempting to decrypt','Decrypting')[have_enc_ext]
  257. qmsg(f'{m} {desc} {fn!r}')
  258. from .crypto import mmgen_decrypt_retry
  259. d = mmgen_decrypt_retry(d,desc)
  260. return d
  261. def get_lines_from_file(fn,desc='data',trim_comments=False,quiet=False,silent=False):
  262. dec = _mmgen_decrypt_file_maybe(fn,desc=desc,quiet=quiet,silent=silent)
  263. ret = dec.decode().splitlines()
  264. if trim_comments:
  265. ret = strip_comments(ret)
  266. dmsg(f'Got {len(ret)} lines from file {fn!r}')
  267. return ret