fileutil.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. tool.fileutil: File routines for the 'mmgen-tool' utility
  20. """
  21. import sys,os
  22. from .common import tool_cmd_base
  23. from ..util import msg,msg_r,die,suf,make_full_path
  24. from ..crypto import Crypto
  25. class tool_cmd(tool_cmd_base):
  26. "file utilities"
  27. def find_incog_data(self,
  28. filename: str,
  29. incog_id: str,
  30. keep_searching: 'continue search after finding data (ID collisions can yield false positives)' = False):
  31. "Use an Incog ID to find hidden incognito wallet data"
  32. from hashlib import sha256
  33. ivsize,bsize,mod = ( Crypto.aesctr_iv_len, 4096, 4096*8 )
  34. n,carry = 0,b' '*ivsize
  35. flgs = os.O_RDONLY|os.O_BINARY if sys.platform == 'win32' else os.O_RDONLY
  36. f = os.open(filename,flgs)
  37. for ch in incog_id:
  38. if ch not in '0123456789ABCDEF':
  39. die(2,f'{incog_id!r}: invalid Incog ID')
  40. while True:
  41. d = os.read(f,bsize)
  42. if not d:
  43. break
  44. d = carry + d
  45. for i in range(bsize):
  46. if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == incog_id:
  47. if n+i < ivsize:
  48. continue
  49. msg(f'\rIncog data for ID {incog_id} found at offset {n+i-ivsize}')
  50. if not keep_searching:
  51. sys.exit(0)
  52. carry = d[len(d)-ivsize:]
  53. n += bsize
  54. if not n % mod:
  55. msg_r(f'\rSearched: {n} bytes')
  56. msg('')
  57. os.close(f)
  58. return True
  59. def rand2file(self,outfile:str,nbytes:str,threads=4,silent=False):
  60. """
  61. write ‘nbytes’ bytes of random data to specified file (dd-style byte specifiers supported)
  62. Valid specifiers:
  63. c = 1
  64. w = 2
  65. b = 512
  66. kB = 1000
  67. K = 1024
  68. MB = 1000000
  69. M = 1048576
  70. GB = 1000000000
  71. G = 1073741824
  72. TB = 1000000000000
  73. T = 1099511627776
  74. PB = 1000000000000000
  75. P = 1125899906842624
  76. EB = 1000000000000000000
  77. E = 1152921504606846976
  78. """
  79. from threading import Thread
  80. from queue import Queue
  81. from cryptography.hazmat.primitives.ciphers import Cipher,algorithms,modes
  82. from cryptography.hazmat.backends import default_backend
  83. from ..util2 import parse_bytespec
  84. def encrypt_worker():
  85. ctr_init_val = os.urandom( Crypto.aesctr_iv_len )
  86. c = Cipher( algorithms.AES(key), modes.CTR(ctr_init_val), backend=default_backend() )
  87. encryptor = c.encryptor()
  88. while True:
  89. q2.put( encryptor.update(q1.get()) )
  90. q1.task_done()
  91. def output_worker():
  92. while True:
  93. f.write( q2.get() )
  94. q2.task_done()
  95. nbytes = parse_bytespec(nbytes)
  96. if self.cfg.outdir:
  97. outfile = make_full_path( self.cfg.outdir, outfile )
  98. f = open(outfile,'wb')
  99. key = Crypto(self.cfg).get_random(32)
  100. q1,q2 = ( Queue(), Queue() )
  101. for i in range(max(1,threads-2)):
  102. t = Thread(target=encrypt_worker)
  103. t.daemon = True
  104. t.start()
  105. t = Thread( target=output_worker )
  106. t.daemon = True
  107. t.start()
  108. blk_size = 1024 * 1024
  109. for i in range(nbytes // blk_size):
  110. if not i % 4:
  111. msg_r(f'\rRead: {i * blk_size} bytes')
  112. q1.put( os.urandom(blk_size) )
  113. if nbytes % blk_size:
  114. q1.put( os.urandom(nbytes % blk_size) )
  115. q1.join()
  116. q2.join()
  117. f.close()
  118. fsize = os.stat(outfile).st_size
  119. if fsize != nbytes:
  120. die(3,f'{fsize}: incorrect random file size (should be {nbytes})')
  121. if not silent:
  122. msg(f'\rRead: {nbytes} bytes')
  123. self.cfg._util.qmsg(f'\r{nbytes} byte{suf(nbytes)} of random data written to file {outfile!r}')
  124. return True
  125. def extract_key_from_geth_wallet( self, wallet_file:str, check_addr=True ):
  126. "decrypt the encrypted private key in a Geth keystore wallet, returning the decrypted key"
  127. from ..ui import line_input
  128. from ..proto.eth.misc import extract_key_from_geth_keystore_wallet
  129. passwd = line_input( self.cfg, 'Enter passphrase: ', echo=self.cfg.echo_passphrase ).strip().encode()
  130. return extract_key_from_geth_keystore_wallet( self.cfg, wallet_file, passwd, check_addr ).hex()