fileutil.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. fileutil: Routines that read, write, execute or stat files
  20. """
  21. import sys,os
  22. from .color import set_vt100
  23. from .util import (
  24. msg,
  25. die,
  26. get_extension,
  27. is_utf8,
  28. capfirst,
  29. make_full_path,
  30. strip_comments,
  31. )
  32. def check_or_create_dir(path):
  33. try:
  34. os.listdir(path)
  35. except:
  36. if os.getenv('MMGEN_TEST_SUITE'):
  37. if os.path.exists(path): # path is a link or regular file
  38. from subprocess import run
  39. run([
  40. ('rm' if sys.platform == 'win32' else '/bin/rm'),
  41. '-rf',
  42. path ])
  43. set_vt100()
  44. try:
  45. os.makedirs(path,0o700)
  46. except:
  47. die(2,f'ERROR: unable to read or create path {path!r}')
  48. def check_binary(args):
  49. from subprocess import run,DEVNULL
  50. try:
  51. run(args,stdout=DEVNULL,stderr=DEVNULL,check=True)
  52. except:
  53. die(2,f'{args[0]!r} binary missing, not in path, or not executable')
  54. set_vt100()
  55. def shred_file(fn,verbose=False):
  56. check_binary(['shred','--version'])
  57. from subprocess import run
  58. run(
  59. ['shred','--force','--iterations=30','--zero','--remove=wipesync']
  60. + (['--verbose'] if verbose else [])
  61. + [fn],
  62. check=True )
  63. set_vt100()
  64. def _check_file_type_and_access(fname,ftype,blkdev_ok=False):
  65. import stat
  66. access,op_desc = (
  67. (os.W_OK,'writ') if ftype in ('output file','output directory') else
  68. (os.R_OK,'read') )
  69. if ftype == 'output directory':
  70. ok_types = [(stat.S_ISDIR, 'output directory')]
  71. else:
  72. ok_types = [
  73. (stat.S_ISREG,'regular file'),
  74. (stat.S_ISLNK,'symbolic link')
  75. ]
  76. if blkdev_ok:
  77. ok_types.append((stat.S_ISBLK,'block device'))
  78. try:
  79. mode = os.stat(fname).st_mode
  80. except:
  81. die( 'FileNotFound', f'Requested {ftype} {fname!r} not found' )
  82. for t in ok_types:
  83. if t[0](mode):
  84. break
  85. else:
  86. ok_list = ' or '.join( t[1] for t in ok_types )
  87. die(1,f'Requested {ftype} {fname!r} is not a {ok_list}')
  88. if not os.access(fname,access):
  89. die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you')
  90. return True
  91. def check_infile(f,blkdev_ok=False):
  92. return _check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok)
  93. def check_outfile(f,blkdev_ok=False):
  94. return _check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok)
  95. def check_outdir(f):
  96. return _check_file_type_and_access(f,'output directory')
  97. def get_seed_file(cfg,nargs,wallets=None,invoked_as=None):
  98. wallets = wallets or cfg._args
  99. from .filename import find_file_in_dir
  100. from .wallet.mmgen import wallet
  101. wf = find_file_in_dir(wallet,cfg.data_dir)
  102. wd_from_opt = bool(cfg.hidden_incog_input_params or cfg.in_fmt) # have wallet data from opt?
  103. if len(wallets) + (wd_from_opt or bool(wf)) < nargs:
  104. if not wf:
  105. msg('No default wallet found, and no other seed source was specified')
  106. cfg._opts.usage()
  107. elif len(wallets) > nargs:
  108. cfg._opts.usage()
  109. elif len(wallets) == nargs and wf and invoked_as != 'gen':
  110. cfg._util.qmsg('Warning: overriding default wallet with user-supplied wallet')
  111. if wallets or wf:
  112. check_infile(wallets[0] if wallets else wf)
  113. return wallets[0] if wallets else (wf,None)[wd_from_opt]
  114. def _open_or_die(filename,mode,silent=False):
  115. try:
  116. return open(filename,mode)
  117. except:
  118. die(2,'' if silent else
  119. 'Unable to open file {!r} for {}'.format(
  120. ({0:'STDIN',1:'STDOUT',2:'STDERR'}[filename] if isinstance(filename,int) else filename),
  121. ('reading' if 'r' in mode else 'writing')
  122. ))
  123. def write_data_to_file(
  124. cfg,
  125. outfile,
  126. data,
  127. desc = 'data',
  128. ask_write = False,
  129. ask_write_prompt = '',
  130. ask_write_default_yes = True,
  131. ask_overwrite = True,
  132. ask_tty = True,
  133. no_tty = False,
  134. quiet = False,
  135. binary = False,
  136. ignore_opt_outdir = False,
  137. check_data = False,
  138. cmp_data = None):
  139. if quiet:
  140. ask_tty = ask_overwrite = False
  141. if cfg.quiet:
  142. ask_overwrite = False
  143. if ask_write_default_yes is False or ask_write_prompt:
  144. ask_write = True
  145. def do_stdout():
  146. cfg._util.qmsg('Output to STDOUT requested')
  147. if cfg.stdin_tty:
  148. if no_tty:
  149. die(2,f'Printing {desc} to screen is not allowed')
  150. if (ask_tty and not cfg.quiet) or binary:
  151. from .ui import confirm_or_raise
  152. confirm_or_raise(
  153. cfg,
  154. message = '',
  155. action = f'output {desc} to screen' )
  156. else:
  157. try:
  158. of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux
  159. except:
  160. of = None # Windows
  161. if of:
  162. if of[:5] == 'pipe:':
  163. if no_tty:
  164. die(2,f'Writing {desc} to pipe is not allowed')
  165. if ask_tty and not cfg.quiet:
  166. from .ui import confirm_or_raise
  167. confirm_or_raise(
  168. cfg,
  169. message = '',
  170. action = f'output {desc} to pipe' )
  171. msg('')
  172. of2,pd = os.path.relpath(of),os.path.pardir
  173. msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2))
  174. else:
  175. msg('Redirecting output to file')
  176. if binary:
  177. if sys.platform == 'win32': # condition on separate line for pylint
  178. import msvcrt
  179. msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
  180. # MSWin workaround. See msg_r()
  181. try:
  182. sys.stdout.write(data.decode() if isinstance(data,bytes) else data)
  183. except:
  184. os.write(1,data if isinstance(data,bytes) else data.encode())
  185. def do_file(outfile,ask_write_prompt):
  186. if cfg.outdir and not ignore_opt_outdir and not os.path.isabs(outfile):
  187. outfile = make_full_path(cfg.outdir,outfile)
  188. if ask_write:
  189. if not ask_write_prompt:
  190. ask_write_prompt = f'Save {desc}?'
  191. from .ui import keypress_confirm
  192. if not keypress_confirm(
  193. cfg,
  194. ask_write_prompt,
  195. default_yes = ask_write_default_yes ):
  196. die(1,f'{capfirst(desc)} not saved')
  197. hush = False
  198. if os.path.lexists(outfile) and ask_overwrite:
  199. from .ui import confirm_or_raise
  200. confirm_or_raise(
  201. cfg,
  202. message = '',
  203. action = f'File {outfile!r} already exists\nOverwrite?' )
  204. msg(f'Overwriting file {outfile!r}')
  205. hush = True
  206. # not atomic, but better than nothing
  207. # if cmp_data is empty, file can be either empty or non-existent
  208. if check_data:
  209. d = ''
  210. try:
  211. with open(outfile,('r','rb')[bool(binary)]) as fp:
  212. d = fp.read()
  213. except:
  214. pass
  215. if d != cmp_data:
  216. die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write')
  217. # To maintain portability, always open files in binary mode
  218. # If 'binary' option not set, encode/decode data before writing and after reading
  219. try:
  220. with _open_or_die(outfile,'wb') as fp:
  221. fp.write(data if binary else data.encode())
  222. except:
  223. die(2,f'Failed to write {desc} to file {outfile!r}')
  224. if not (hush or quiet):
  225. msg(f'{capfirst(desc)} written to file {outfile!r}')
  226. return True
  227. if cfg.stdout or outfile in ('','-'):
  228. do_stdout()
  229. elif sys.stdin.isatty() and not sys.stdout.isatty():
  230. do_stdout()
  231. else:
  232. do_file(outfile,ask_write_prompt)
  233. def get_words_from_file(cfg,infile,desc,quiet=False):
  234. if not quiet:
  235. cfg._util.qmsg(f'Getting {desc} from file {infile!r}')
  236. with _open_or_die(infile, 'rb') as fp:
  237. data = fp.read()
  238. try:
  239. words = data.decode().split()
  240. except:
  241. die(1,f'{capfirst(desc)} data must be UTF-8 encoded.')
  242. cfg._util.dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  243. return words
  244. def get_data_from_file(
  245. cfg,
  246. infile,
  247. desc = 'data',
  248. dash = False,
  249. silent = False,
  250. binary = False,
  251. quiet = False ):
  252. if not (cfg.quiet or silent or quiet):
  253. cfg._util.qmsg(f'Getting {desc} from file {infile!r}')
  254. with _open_or_die(
  255. (0 if dash and infile == '-' else infile),
  256. 'rb',
  257. silent=silent) as fp:
  258. data = fp.read(cfg.max_input_size+1)
  259. if not binary:
  260. data = data.decode()
  261. if len(data) == cfg.max_input_size + 1:
  262. die( 'MaxInputSizeExceeded',
  263. f'Too much input data! Max input data size: {cfg.max_input_size} bytes' )
  264. return data
  265. def get_lines_from_file(
  266. cfg,
  267. fn,
  268. desc = 'data',
  269. trim_comments = False,
  270. quiet = False,
  271. silent = False ):
  272. def decrypt_file_maybe():
  273. data = get_data_from_file( cfg, fn, desc=desc, binary=True, quiet=quiet, silent=silent )
  274. from .crypto import Crypto
  275. have_enc_ext = get_extension(fn) == Crypto.mmenc_ext
  276. if have_enc_ext or not is_utf8(data):
  277. m = ('Attempting to decrypt','Decrypting')[have_enc_ext]
  278. cfg._util.qmsg(f'{m} {desc} {fn!r}')
  279. data = Crypto(cfg).mmgen_decrypt_retry(data,desc)
  280. return data
  281. lines = decrypt_file_maybe().decode().splitlines()
  282. if trim_comments:
  283. lines = strip_comments(lines)
  284. cfg._util.dmsg(f'Got {len(lines)} lines from file {fn!r}')
  285. return lines