util2.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. util2: Less frequently-used variables, classes and utility functions for the MMGen suite
  12. """
  13. import sys, re, time
  14. from .util import msg, suf, hexdigits, die
  15. def die_wait(delay, ev=0, s=''):
  16. assert isinstance(delay, int)
  17. assert isinstance(ev, int)
  18. if s:
  19. msg(s)
  20. time.sleep(delay)
  21. sys.exit(ev)
  22. def die_pause(ev=0, s=''):
  23. assert isinstance(ev, int)
  24. if s:
  25. msg(s)
  26. input('Press ENTER to exit')
  27. sys.exit(ev)
  28. def load_fake_cryptodome():
  29. import hashlib
  30. try:
  31. hashlib.new('keccak-256')
  32. except ValueError:
  33. return False
  34. class FakeHash:
  35. class keccak:
  36. def new(data=b'', digest_bits=256):
  37. assert digest_bits == 256
  38. return hashlib.new('keccak-256', data=data)
  39. sys.modules['Cryptodome.Hash'] = FakeHash
  40. sys.modules['Crypto.Hash'] = FakeHash
  41. return True
  42. def cffi_override_fixup():
  43. from cffi import FFI
  44. class FFI_override:
  45. def cdef(self, csource, *, override=False, packed=False, pack=None):
  46. self._cdef(csource, override=True, packed=packed, pack=pack)
  47. FFI.cdef = FFI_override.cdef
  48. # monkey-patch function: makes modules pycryptodome and pycryptodomex available to packages that
  49. # expect them for the keccak256 function (monero-python, eth-keys), regardless of which one is
  50. # installed on the system
  51. #
  52. # if the hashlib keccak256 function is available (>=OpenSSL 3.2, >=Python 3.13), it’s used instead
  53. # and loaded as Crypto[dome].Hash via load_fake_cryptodome()
  54. def load_cryptodome(called=[]):
  55. if not called:
  56. if not load_fake_cryptodome():
  57. cffi_override_fixup()
  58. try:
  59. import Crypto # Crypto == pycryptodome
  60. except ImportError:
  61. try:
  62. import Cryptodome # Crypto == pycryptodome
  63. except ImportError:
  64. die(2, 'Unable to import the ‘pycryptodome’ or ‘pycryptodomex’ package')
  65. else:
  66. sys.modules['Crypto'] = Cryptodome # Crypto == pycryptodome
  67. else:
  68. sys.modules['Cryptodome'] = Crypto # Cryptodome == pycryptodomex
  69. called.append(True)
  70. def get_hashlib_keccak():
  71. import hashlib
  72. try:
  73. hashlib.new('keccak-256')
  74. except ValueError:
  75. return False
  76. return lambda data: hashlib.new('keccak-256', data)
  77. # called with no arguments by proto.eth.tx.transaction:
  78. def get_keccak(cfg=None, cached_ret=[]):
  79. if not cached_ret:
  80. if cfg and cfg.use_internal_keccak_module:
  81. cfg._util.qmsg('Using internal keccak module by user request')
  82. from .contrib.keccak import keccak_256
  83. elif not (keccak_256 := get_hashlib_keccak()):
  84. load_cryptodome()
  85. from Crypto.Hash import keccak
  86. keccak_256 = lambda data: keccak.new(data=data, digest_bytes=32)
  87. cached_ret.append(keccak_256)
  88. return cached_ret[0]
  89. # From 'man dd':
  90. # c=1, w=2, b=512, kB=1000, K=1024, MB=1000*1000, M=1024*1024,
  91. # GB=1000*1000*1000, G=1024*1024*1024, and so on for T, P, E, Z, Y.
  92. bytespec_map = (
  93. ('c', 1),
  94. ('w', 2),
  95. ('b', 512),
  96. ('kB', 1000),
  97. ('K', 1024),
  98. ('MB', 1000000),
  99. ('M', 1048576),
  100. ('GB', 1000000000),
  101. ('G', 1073741824),
  102. ('TB', 1000000000000),
  103. ('T', 1099511627776),
  104. ('PB', 1000000000000000),
  105. ('P', 1125899906842624),
  106. ('EB', 1000000000000000000),
  107. ('E', 1152921504606846976),
  108. )
  109. def int2bytespec(n, spec, fmt, *, print_sym=True, strip=False, add_space=False):
  110. def spec2int(spec):
  111. for k, v in bytespec_map:
  112. if k == spec:
  113. return v
  114. die(1, f'{spec!r}: unrecognized bytespec')
  115. ret = f'{n/spec2int(spec):{fmt}f}'
  116. if strip:
  117. ret = ret.rstrip('0')
  118. return (
  119. ret
  120. + ('0' if ret.endswith('.') else '')
  121. + ((' ' if add_space else '') + spec if print_sym else ''))
  122. else:
  123. return (
  124. ret
  125. + ((' ' if add_space else '') + spec if print_sym else ''))
  126. def parse_bytespec(nbytes):
  127. if m := re.match(r'([0123456789.]+)(.*)', nbytes):
  128. if m.group(2):
  129. for k, v in bytespec_map:
  130. if k == m.group(2):
  131. from decimal import Decimal
  132. return int(Decimal(m.group(1)) * v)
  133. msg("Valid byte specifiers: '{}'".format("' '".join([i[0] for i in bytespec_map])))
  134. elif '.' in nbytes:
  135. raise ValueError('fractional bytes not allowed')
  136. else:
  137. return int(nbytes)
  138. die(1, f'{nbytes!r}: invalid byte specifier')
  139. def format_elapsed_days_hr(t, *, now=None, cached={}):
  140. e = int((now or time.time()) - t)
  141. if not e in cached:
  142. days = abs(e) // 86400
  143. cached[e] = f'{days} day{suf(days)} ' + ('ago' if e > 0 else 'in the future')
  144. return cached[e]
  145. def format_elapsed_hr(
  146. t,
  147. *,
  148. now = None,
  149. cached = {},
  150. rel_now = True,
  151. show_secs = False,
  152. future_msg = 'in the future'):
  153. e = int((now or time.time()) - t)
  154. key = f'{e}:{rel_now}:{show_secs}'
  155. if not key in cached:
  156. def add_suffix():
  157. return (
  158. ((' ago' if rel_now else '') if e > 0 else
  159. (f' {future_msg}' if rel_now else ' (negative elapsed)'))
  160. if (abs_e if show_secs else abs_e // 60) else
  161. ('just now' if rel_now else ('0 ' + ('seconds' if show_secs else 'minutes')))
  162. )
  163. abs_e = abs(e)
  164. data = (
  165. ('day', abs_e // 86400),
  166. ('hour', abs_e // 3600 % 24),
  167. ('minute', abs_e // 60 % 60),
  168. ('second', abs_e % 60),
  169. ) if show_secs else (
  170. ('day', abs_e // 86400),
  171. ('hour', abs_e // 3600 % 24),
  172. ('minute', abs_e // 60 % 60),
  173. )
  174. cached[key] = ' '.join(f'{n} {desc}{suf(n)}' for desc, n in data if n) + add_suffix()
  175. return cached[key]
  176. def pretty_format(s, *, width=80, pfx=''):
  177. out = []
  178. while s:
  179. if len(s) <= width:
  180. out.append(s)
  181. break
  182. i = s[:width].rfind(' ')
  183. out.append(s[:i])
  184. s = s[i+1:]
  185. return pfx + ('\n'+pfx).join(out)
  186. def block_format(data, *, gw=2, cols=8, line_nums=None, data_is_hex=False):
  187. assert line_nums in (None, 'hex', 'dec'), "'line_nums' must be one of None, 'hex' or 'dec'"
  188. ln_fs = '{:06x}: ' if line_nums == 'hex' else '{:06}: '
  189. bytes_per_chunk = gw
  190. if data_is_hex:
  191. gw *= 2
  192. nchunks = len(data)//gw + bool(len(data)%gw)
  193. return ''.join(
  194. ('' if (line_nums is None or i % cols) else ln_fs.format(i*bytes_per_chunk))
  195. + data[i*gw:i*gw+gw]
  196. + (' ' if (not cols or (i+1) % cols) else '\n')
  197. for i in range(nchunks)
  198. ).rstrip() + '\n'
  199. def pretty_hexdump(data, *, gw=2, cols=8, line_nums=None):
  200. return block_format(data.hex(), gw=gw, cols=cols, line_nums=line_nums, data_is_hex=True)
  201. def decode_pretty_hexdump(data):
  202. pat = re.compile(fr'^[{hexdigits}]+:\s+')
  203. lines = [pat.sub('', line) for line in data.splitlines()]
  204. try:
  205. return bytes.fromhex(''.join((''.join(lines).split())))
  206. except:
  207. msg('Data not in hexdump format')
  208. return False
  209. def cliargs_convert(args):
  210. # return str instead of float for input into JSON-RPC
  211. def float_parser(n):
  212. return n
  213. import json
  214. def gen():
  215. for arg in args:
  216. try:
  217. yield json.loads(arg, parse_float=float_parser) # list, dict, bool, int, null, float
  218. except json.decoder.JSONDecodeError:
  219. yield arg # arbitrary string
  220. return tuple(gen())
  221. def port_in_use(port):
  222. import socket
  223. try:
  224. socket.create_connection(('localhost', port)).close()
  225. except:
  226. return False
  227. else:
  228. return True
  229. class ExpInt(int):
  230. 'encode or parse an integer in exponential notation with specified precision'
  231. max_prec = 10
  232. def __new__(cls, spec, *, prec):
  233. assert 0 < prec < cls.max_prec
  234. cls.prec = prec
  235. from .util import is_int
  236. if is_int(spec):
  237. return int.__new__(cls, spec)
  238. else:
  239. assert isinstance(spec, str), f'ExpInt: {spec!r}: not a string!'
  240. assert len(spec) >= 3, f'ExpInt: {spec!r}: invalid specifier'
  241. val, exp = spec.split('e')
  242. assert is_int(val) and is_int(exp)
  243. return int.__new__(cls, val + '0' * int(exp))
  244. @property
  245. def trunc(self):
  246. s = str(self)
  247. return int(s[:self.prec] + '0' * (len(s) - self.prec))
  248. @property
  249. def enc(self):
  250. s = str(self)
  251. s_len = len(s)
  252. digits = s[:min(s_len, self.prec)].rstrip('0')
  253. ret = '{}e{}'.format(digits, s_len - len(digits))
  254. return ret if len(ret) < s_len else s