util2.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. util2: Less frequently-used variables, classes and utility functions for the MMGen suite
  12. """
  13. import sys, re, time
  14. from .util import msg, suf, hexdigits, die
  15. def die_wait(delay, ev=0, s=''):
  16. assert isinstance(delay, int)
  17. assert isinstance(ev, int)
  18. if s:
  19. msg(s)
  20. time.sleep(delay)
  21. sys.exit(ev)
  22. def die_pause(ev=0, s=''):
  23. assert isinstance(ev, int)
  24. if s:
  25. msg(s)
  26. input('Press ENTER to exit')
  27. sys.exit(ev)
  28. def cffi_override_fixup():
  29. from cffi import FFI
  30. class FFI_override:
  31. def cdef(self, csource, *, override=False, packed=False, pack=None):
  32. self._cdef(csource, override=True, packed=packed, pack=pack)
  33. FFI.cdef = FFI_override.cdef
  34. # monkey-patch function: makes modules pycryptodome and pycryptodomex available to packages that
  35. # expect them (monero-python, eth-keys), regardless of which one is installed on system
  36. def load_cryptodome(called=[]):
  37. if not called:
  38. cffi_override_fixup()
  39. try:
  40. import Crypto # Crypto == pycryptodome
  41. except ImportError:
  42. try:
  43. import Cryptodome # Crypto == pycryptodome
  44. except ImportError:
  45. die(2, 'Unable to import the ‘pycryptodome’ or ‘pycryptodomex’ package')
  46. else:
  47. sys.modules['Crypto'] = Cryptodome # Crypto == pycryptodome
  48. else:
  49. sys.modules['Cryptodome'] = Crypto # Cryptodome == pycryptodomex
  50. called.append(True)
  51. # called with no arguments by pyethereum.utils:
  52. def get_keccak(cfg=None, cached_ret=[]):
  53. if not cached_ret:
  54. if cfg and cfg.use_internal_keccak_module:
  55. cfg._util.qmsg('Using internal keccak module by user request')
  56. from .contrib.keccak import keccak_256
  57. else:
  58. load_cryptodome()
  59. from Crypto.Hash import keccak
  60. def keccak_256(data):
  61. return keccak.new(data=data, digest_bytes=32)
  62. cached_ret.append(keccak_256)
  63. return cached_ret[0]
  64. # From 'man dd':
  65. # c=1, w=2, b=512, kB=1000, K=1024, MB=1000*1000, M=1024*1024,
  66. # GB=1000*1000*1000, G=1024*1024*1024, and so on for T, P, E, Z, Y.
  67. bytespec_map = (
  68. ('c', 1),
  69. ('w', 2),
  70. ('b', 512),
  71. ('kB', 1000),
  72. ('K', 1024),
  73. ('MB', 1000000),
  74. ('M', 1048576),
  75. ('GB', 1000000000),
  76. ('G', 1073741824),
  77. ('TB', 1000000000000),
  78. ('T', 1099511627776),
  79. ('PB', 1000000000000000),
  80. ('P', 1125899906842624),
  81. ('EB', 1000000000000000000),
  82. ('E', 1152921504606846976),
  83. )
  84. def int2bytespec(n, spec, fmt, *, print_sym=True, strip=False, add_space=False):
  85. def spec2int(spec):
  86. for k, v in bytespec_map:
  87. if k == spec:
  88. return v
  89. die(1, f'{spec!r}: unrecognized bytespec')
  90. ret = f'{n/spec2int(spec):{fmt}f}'
  91. if strip:
  92. ret = ret.rstrip('0')
  93. return (
  94. ret
  95. + ('0' if ret.endswith('.') else '')
  96. + ((' ' if add_space else '') + spec if print_sym else ''))
  97. else:
  98. return (
  99. ret
  100. + ((' ' if add_space else '') + spec if print_sym else ''))
  101. def parse_bytespec(nbytes):
  102. m = re.match(r'([0123456789.]+)(.*)', nbytes)
  103. if m:
  104. if m.group(2):
  105. for k, v in bytespec_map:
  106. if k == m.group(2):
  107. from decimal import Decimal
  108. return int(Decimal(m.group(1)) * v)
  109. msg("Valid byte specifiers: '{}'".format("' '".join([i[0] for i in bytespec_map])))
  110. elif '.' in nbytes:
  111. raise ValueError('fractional bytes not allowed')
  112. else:
  113. return int(nbytes)
  114. die(1, f'{nbytes!r}: invalid byte specifier')
  115. def format_elapsed_days_hr(t, *, now=None, cached={}):
  116. e = int((now or time.time()) - t)
  117. if not e in cached:
  118. days = abs(e) // 86400
  119. cached[e] = f'{days} day{suf(days)} ' + ('ago' if e > 0 else 'in the future')
  120. return cached[e]
  121. def format_elapsed_hr(
  122. t,
  123. *,
  124. now = None,
  125. cached = {},
  126. rel_now = True,
  127. show_secs = False,
  128. future_msg = 'in the future'):
  129. e = int((now or time.time()) - t)
  130. key = f'{e}:{rel_now}:{show_secs}'
  131. if not key in cached:
  132. def add_suffix():
  133. return (
  134. ((' ago' if rel_now else '') if e > 0 else
  135. (f' {future_msg}' if rel_now else ' (negative elapsed)'))
  136. if (abs_e if show_secs else abs_e // 60) else
  137. ('just now' if rel_now else ('0 ' + ('seconds' if show_secs else 'minutes')))
  138. )
  139. abs_e = abs(e)
  140. data = (
  141. ('day', abs_e // 86400),
  142. ('hour', abs_e // 3600 % 24),
  143. ('minute', abs_e // 60 % 60),
  144. ('second', abs_e % 60),
  145. ) if show_secs else (
  146. ('day', abs_e // 86400),
  147. ('hour', abs_e // 3600 % 24),
  148. ('minute', abs_e // 60 % 60),
  149. )
  150. cached[key] = ' '.join(f'{n} {desc}{suf(n)}' for desc, n in data if n) + add_suffix()
  151. return cached[key]
  152. def pretty_format(s, *, width=80, pfx=''):
  153. out = []
  154. while s:
  155. if len(s) <= width:
  156. out.append(s)
  157. break
  158. i = s[:width].rfind(' ')
  159. out.append(s[:i])
  160. s = s[i+1:]
  161. return pfx + ('\n'+pfx).join(out)
  162. def block_format(data, *, gw=2, cols=8, line_nums=None, data_is_hex=False):
  163. assert line_nums in (None, 'hex', 'dec'), "'line_nums' must be one of None, 'hex' or 'dec'"
  164. ln_fs = '{:06x}: ' if line_nums == 'hex' else '{:06}: '
  165. bytes_per_chunk = gw
  166. if data_is_hex:
  167. gw *= 2
  168. nchunks = len(data)//gw + bool(len(data)%gw)
  169. return ''.join(
  170. ('' if (line_nums is None or i % cols) else ln_fs.format(i*bytes_per_chunk))
  171. + data[i*gw:i*gw+gw]
  172. + (' ' if (not cols or (i+1) % cols) else '\n')
  173. for i in range(nchunks)
  174. ).rstrip() + '\n'
  175. def pretty_hexdump(data, *, gw=2, cols=8, line_nums=None):
  176. return block_format(data.hex(), gw=gw, cols=cols, line_nums=line_nums, data_is_hex=True)
  177. def decode_pretty_hexdump(data):
  178. pat = re.compile(fr'^[{hexdigits}]+:\s+')
  179. lines = [pat.sub('', line) for line in data.splitlines()]
  180. try:
  181. return bytes.fromhex(''.join((''.join(lines).split())))
  182. except:
  183. msg('Data not in hexdump format')
  184. return False
  185. def cliargs_convert(args):
  186. # return str instead of float for input into JSON-RPC
  187. def float_parser(n):
  188. return n
  189. import json
  190. def gen():
  191. for arg in args:
  192. try:
  193. yield json.loads(arg, parse_float=float_parser) # list, dict, bool, int, null, float
  194. except json.decoder.JSONDecodeError:
  195. yield arg # arbitrary string
  196. return tuple(gen())
  197. def port_in_use(port):
  198. import socket
  199. try:
  200. socket.create_connection(('localhost', port)).close()
  201. except:
  202. return False
  203. else:
  204. return True
  205. class ExpInt(int):
  206. 'encode or parse an integer in exponential notation with specified precision'
  207. max_prec = 10
  208. def __new__(cls, spec, *, prec):
  209. assert 0 < prec < cls.max_prec
  210. cls.prec = prec
  211. from .util import is_int
  212. if is_int(spec):
  213. return int.__new__(cls, spec)
  214. else:
  215. assert isinstance(spec, str), f'ExpInt: {spec!r}: not a string!'
  216. assert len(spec) >= 3, f'ExpInt: {spec!r}: invalid specifier'
  217. val, exp = spec.split('e')
  218. assert is_int(val) and is_int(exp)
  219. return int.__new__(cls, val + '0' * int(exp))
  220. @property
  221. def trunc(self):
  222. s = str(self)
  223. return int(s[:self.prec] + '0' * (len(s) - self.prec))
  224. @property
  225. def enc(self):
  226. s = str(self)
  227. s_len = len(s)
  228. digits = s[:min(s_len, self.prec)].rstrip('0')
  229. ret = '{}e{}'.format(digits, s_len - len(digits))
  230. return ret if len(ret) < s_len else s