fileutil.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. tool/fileutil.py: File routines for the 'mmgen-tool' utility
  20. """
  21. import os
  22. from .common import tool_cmd_base
  23. from ..util import msg,msg_r,qmsg,die,suf,parse_bytespec,make_full_path
  24. from ..crypto import get_random,aesctr_iv_len
  25. class tool_cmd(tool_cmd_base):
  26. "file utilities"
  27. def find_incog_data(self,filename:str,incog_id:str,keep_searching=False):
  28. "Use an Incog ID to find hidden incognito wallet data"
  29. from hashlib import sha256
  30. from ..globalvars import g
  31. ivsize,bsize,mod = ( aesctr_iv_len, 4096, 4096*8 )
  32. n,carry = 0,b' '*ivsize
  33. flgs = os.O_RDONLY|os.O_BINARY if g.platform == 'win' else os.O_RDONLY
  34. f = os.open(filename,flgs)
  35. for ch in incog_id:
  36. if ch not in '0123456789ABCDEF':
  37. die(2,f'{incog_id!r}: invalid Incog ID')
  38. while True:
  39. d = os.read(f,bsize)
  40. if not d:
  41. break
  42. d = carry + d
  43. for i in range(bsize):
  44. if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == incog_id:
  45. if n+i < ivsize:
  46. continue
  47. msg(f'\rIncog data for ID {incog_id} found at offset {n+i-ivsize}')
  48. if not keep_searching:
  49. import sys
  50. sys.exit(0)
  51. carry = d[len(d)-ivsize:]
  52. n += bsize
  53. if not n % mod:
  54. msg_r(f'\rSearched: {n} bytes')
  55. msg('')
  56. os.close(f)
  57. return True
  58. def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False):
  59. "write 'n' bytes of random data to specified file"
  60. from threading import Thread
  61. from queue import Queue
  62. from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes
  63. from cryptography.hazmat.backends import default_backend
  64. from ..opts import opt
  65. def encrypt_worker(wid):
  66. ctr_init_val = os.urandom( aesctr_iv_len )
  67. c = Cipher( algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend() )
  68. encryptor = c.encryptor()
  69. while True:
  70. q2.put( encryptor.update(q1.get()) )
  71. q1.task_done()
  72. def output_worker():
  73. while True:
  74. f.write( q2.get() )
  75. q2.task_done()
  76. nbytes = parse_bytespec(nbytes)
  77. if opt.outdir:
  78. outfile = make_full_path( opt.outdir, outfile )
  79. f = open(outfile,'wb')
  80. key = get_random(32)
  81. q1,q2 = ( Queue(), Queue() )
  82. for i in range(max(1,threads-2)):
  83. t = Thread( target=encrypt_worker, args=[i] )
  84. t.daemon = True
  85. t.start()
  86. t = Thread( target=output_worker )
  87. t.daemon = True
  88. t.start()
  89. blk_size = 1024 * 1024
  90. for i in range(nbytes // blk_size):
  91. if not i % 4:
  92. msg_r(f'\rRead: {i * blk_size} bytes')
  93. q1.put( os.urandom(blk_size) )
  94. if nbytes % blk_size:
  95. q1.put( os.urandom(nbytes % blk_size) )
  96. q1.join()
  97. q2.join()
  98. f.close()
  99. fsize = os.stat(outfile).st_size
  100. if fsize != nbytes:
  101. die(3,f'{fsize}: incorrect random file size (should be {nbytes})')
  102. if not silent:
  103. msg(f'\rRead: {nbytes} bytes')
  104. qmsg(f'\r{nbytes} byte{suf(nbytes)} of random data written to file {outfile!r}')
  105. return True
  106. def extract_key_from_geth_wallet( self, wallet_file:str, check_addr=True ):
  107. "decrypt the encrypted private key in a Geth keystore wallet, returning the decrypted key"
  108. from ..util import line_input
  109. from ..opts import opt
  110. from ..base_proto.ethereum.misc import extract_key_from_geth_keystore_wallet
  111. passwd = line_input( 'Enter passphrase: ', echo=opt.echo_passphrase ).strip().encode()
  112. return extract_key_from_geth_keystore_wallet( wallet_file, passwd, check_addr ).hex()