util.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. util: Frequently-used variables, classes and utility functions for the MMGen suite
  20. """
  21. import sys, os, time, re
  22. from .color import red, yellow, green, blue, purple
  23. from .cfg import gv
  24. ascii_lowercase = 'abcdefghijklmnopqrstuvwxyz'
  25. digits = '0123456789'
  26. hexdigits = '0123456789abcdefABCDEF'
  27. hexdigits_uc = '0123456789ABCDEF'
  28. hexdigits_lc = '0123456789abcdef'
  29. def noop(*args, **kwargs):
  30. pass
  31. class Util:
  32. def __init__(self, cfg):
  33. self.cfg = cfg
  34. if cfg.quiet:
  35. self.qmsg = self.qmsg_r = noop
  36. else:
  37. self.qmsg = msg
  38. self.qmsg_r = msg_r
  39. if cfg.verbose:
  40. self.vmsg = msg
  41. self.vmsg_r = msg_r
  42. self.Vmsg = Msg
  43. self.Vmsg_r = Msg_r
  44. else:
  45. self.vmsg = self.vmsg_r = self.Vmsg = self.Vmsg_r = noop
  46. self.dmsg = msg if cfg.debug else noop
  47. if cfg.pager:
  48. from .ui import do_pager
  49. self.stdout_or_pager = do_pager
  50. else:
  51. self.stdout_or_pager = Msg_r
  52. def compare_chksums(
  53. self,
  54. chk1,
  55. desc1,
  56. chk2,
  57. desc2,
  58. *,
  59. hdr = '',
  60. die_on_fail = False,
  61. verbose = False):
  62. if not chk1 == chk2:
  63. fs = "{} ERROR: {} checksum ({}) doesn't match {} checksum ({})"
  64. m = fs.format((hdr+':\n ' if hdr else 'CHECKSUM'), desc2, chk2, desc1, chk1)
  65. if die_on_fail:
  66. die(3, m)
  67. else:
  68. if verbose or self.cfg.verbose:
  69. msg(m)
  70. return False
  71. if self.cfg.verbose:
  72. msg(f'{capfirst(desc1)} checksum OK ({chk1})')
  73. return True
  74. def compare_or_die(self, val1, desc1, val2, desc2, *, e='Error'):
  75. if val1 != val2:
  76. die(3, f"{e}: {desc2} ({val2}) doesn't match {desc1} ({val1})")
  77. if self.cfg.debug:
  78. msg(f'{capfirst(desc2)} OK ({val2})')
  79. return True
  80. if sys.platform == 'win32':
  81. def msg_r(s):
  82. try:
  83. gv.stderr.write(s)
  84. gv.stderr.flush()
  85. except:
  86. os.write(2, s.encode())
  87. def msg(s):
  88. msg_r(s + '\n')
  89. def Msg_r(s):
  90. try:
  91. gv.stdout.write(s)
  92. gv.stdout.flush()
  93. except:
  94. os.write(1, s.encode())
  95. def Msg(s):
  96. Msg_r(s + '\n')
  97. else:
  98. def msg(s):
  99. gv.stderr.write(s + '\n')
  100. def msg_r(s):
  101. gv.stderr.write(s)
  102. gv.stderr.flush()
  103. def Msg(s):
  104. gv.stdout.write(s + '\n')
  105. def Msg_r(s):
  106. gv.stdout.write(s)
  107. gv.stdout.flush()
  108. def rmsg(s):
  109. msg(red(s))
  110. def ymsg(s):
  111. msg(yellow(s))
  112. def gmsg(s):
  113. msg(green(s))
  114. def gmsg_r(s):
  115. msg_r(green(s))
  116. def bmsg(s):
  117. msg(blue(s))
  118. def pumsg(s):
  119. msg(purple(s))
  120. def mmsg(*args):
  121. for d in args:
  122. Msg(repr(d))
  123. def mdie(*args):
  124. mmsg(*args)
  125. sys.exit(0)
  126. def die(ev, s='', *, stdout=False):
  127. if isinstance(ev, int):
  128. from .exception import MMGenSystemExit, MMGenError
  129. if ev <= 2:
  130. raise MMGenSystemExit(ev, s, stdout)
  131. else:
  132. raise MMGenError(ev, s, stdout)
  133. elif isinstance(ev, str):
  134. from . import exception
  135. raise getattr(exception, ev)(s)
  136. else:
  137. raise ValueError(f'{ev}: exit value must be string or int instance')
  138. def Die(ev=0, s=''):
  139. die(ev=ev, s=s, stdout=True)
  140. def pp_fmt(d):
  141. import pprint
  142. return pprint.PrettyPrinter().pformat(d)
  143. def pp_msg(d):
  144. msg(pp_fmt(d))
  145. def indent(s, *, indent=' ', append='\n'):
  146. "indent multiple lines of text with specified string"
  147. return indent + ('\n'+indent).join(s.strip().splitlines()) + append
  148. def fmt(s, *, indent='', strip_char=None, append='\n'):
  149. "de-indent multiple lines of text, or indent with specified string"
  150. return indent + ('\n'+indent).join([l.lstrip(strip_char) for l in s.strip().splitlines()]) + append
  151. def fmt_list(iterable, *, fmt='dfl', indent='', conv=None):
  152. "pretty-format a list"
  153. _conv, sep, lq, rq = {
  154. 'dfl': (str, ", ", "'", "'"),
  155. 'utf8': (str, ", ", "“", "”"),
  156. 'bare': (repr, " ", "", ""),
  157. 'barest': (str, " ", "", ""),
  158. 'fancy': (str, " ", "‘", "’"),
  159. 'no_quotes': (str, ", ", "", ""),
  160. 'compact': (str, ",", "", ""),
  161. 'no_spc': (str, ",", "'", "'"),
  162. 'min': (str, ",", "", ""),
  163. 'repr': (repr, ", ", "", ""),
  164. 'csv': (repr, ",", "", ""),
  165. 'col': (str, "\n", "", ""),
  166. }[fmt]
  167. conv = conv or _conv
  168. return indent + (sep+indent).join(lq+conv(e)+rq for e in iterable)
  169. def fmt_dict(mapping, *, fmt='dfl', kconv=None, vconv=None):
  170. "pretty-format a dict"
  171. kc, vc, sep, fs = {
  172. 'dfl': (str, str, ", ", "'{}' ({})"),
  173. 'dfl_compact': (str, str, " ", "{} ({})"),
  174. 'square': (str, str, ", ", "'{}' [{}]"),
  175. 'square_compact':(str, str, " ", "{} [{}]"),
  176. 'equal': (str, str, ", ", "'{}'={}"),
  177. 'equal_spaced': (str, str, ", ", "'{}' = {}"),
  178. 'equal_compact': (str, str, " ", "{}={}"),
  179. 'kwargs': (str, repr, ", ", "{}={}"),
  180. 'colon': (str, repr, ", ", "{}:{}"),
  181. 'colon_compact': (str, str, " ", "{}:{}"),
  182. }[fmt]
  183. kconv = kconv or kc
  184. vconv = vconv or vc
  185. return sep.join(fs.format(kconv(k), vconv(v)) for k, v in mapping.items())
  186. def list_gen(*data):
  187. """
  188. Generate a list from an arg tuple of sublists
  189. - The last element of each sublist is a condition. If it evaluates to true, the preceding
  190. elements of the sublist are included in the result. Otherwise the sublist is skipped.
  191. - If a sublist contains only one element, the condition defaults to true.
  192. """
  193. assert type(data) in (list, tuple), f'{type(data).__name__} not in (list, tuple)'
  194. def gen():
  195. for d in data:
  196. assert isinstance(d, list), f'{type(d).__name__} != list'
  197. if len(d) == 1:
  198. yield d[0]
  199. elif d[-1]:
  200. for idx in range(len(d)-1):
  201. yield d[idx]
  202. return list(gen())
  203. def remove_dups(iterable, *, edesc='element', desc='list', quiet=False, hide=False):
  204. """
  205. Remove duplicate occurrences of iterable elements, preserving first occurrence
  206. If iterable is a generator, return a list, else type(iterable)
  207. """
  208. ret = []
  209. for e in iterable:
  210. if e in ret:
  211. if not quiet:
  212. ymsg(f'Warning: removing duplicate {edesc} {"(hidden)" if hide else e} in {desc}')
  213. else:
  214. ret.append(e)
  215. return ret if type(iterable).__name__ == 'generator' else type(iterable)(ret)
  216. def contains_any(target_list, source_list):
  217. return any(map(target_list.count, source_list))
  218. def suf(arg, suf_type='s', *, verb='none'):
  219. suf_types = {
  220. 'none': {
  221. 's': ('s', ''),
  222. 'es': ('es', ''),
  223. 'ies': ('ies', 'y'),
  224. },
  225. 'is': {
  226. 's': ('s are', ' is'),
  227. 'es': ('es are', ' is'),
  228. 'ies': ('ies are', 'y is'),
  229. },
  230. 'has': {
  231. 's': ('s have', ' has'),
  232. 'es': ('es have', ' has'),
  233. 'ies': ('ies have', 'y has'),
  234. },
  235. }
  236. if isinstance(arg, int):
  237. n = arg
  238. elif isinstance(arg, (list, tuple, set, dict)):
  239. n = len(arg)
  240. else:
  241. die(2, f'{arg}: invalid parameter for suf()')
  242. return suf_types[verb][suf_type][n == 1]
  243. def get_extension(fn):
  244. return os.path.splitext(fn)[1][1:]
  245. def remove_extension(fn, ext):
  246. a, b = os.path.splitext(fn)
  247. return a if b[1:] == ext else fn
  248. def make_chksum_N(s, nchars, *, sep=False, rounds=2, upper=True):
  249. if isinstance(s, str):
  250. s = s.encode()
  251. from hashlib import sha256
  252. for i in range(rounds):
  253. s = sha256(s).digest()
  254. ret = s.hex()[:nchars]
  255. if sep:
  256. assert 4 <= nchars <= 64 and (not nchars % 4), 'illegal ‘nchars’ value'
  257. ret = ' '.join(ret[i:i+4] for i in range(0, nchars, 4))
  258. else:
  259. assert 4 <= nchars <= 64, 'illegal ‘nchars’ value'
  260. return ret.upper() if upper else ret
  261. def make_chksum_8(s, *, sep=False):
  262. from .obj import HexStr
  263. from hashlib import sha256
  264. s = HexStr(sha256(sha256(s).digest()).hexdigest()[:8].upper(), case='upper')
  265. return '{} {}'.format(s[:4], s[4:]) if sep else s
  266. def make_chksum_6(s):
  267. from .obj import HexStr
  268. from hashlib import sha256
  269. if isinstance(s, str):
  270. s = s.encode()
  271. return HexStr(sha256(s).hexdigest()[:6])
  272. def is_chksum_6(s):
  273. return len(s) == 6 and set(s) <= set(hexdigits_lc)
  274. def split_into_cols(col_wid, s):
  275. return ' '.join([s[col_wid*i:col_wid*(i+1)] for i in range(len(s)//col_wid+1)]).rstrip()
  276. def capfirst(s): # different from str.capitalize() - doesn't downcase any uc in string
  277. return s if len(s) == 0 else s[0].upper() + s[1:]
  278. def decode_timestamp(s):
  279. # tz_save = open('/etc/timezone').read().rstrip()
  280. os.environ['TZ'] = 'UTC'
  281. # os.environ['TZ'] = tz_save
  282. return int(time.mktime(time.strptime(s, '%Y%m%d_%H%M%S')))
  283. def make_timestamp(secs=None):
  284. return '{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}'.format(*time.gmtime(
  285. int(secs) if secs is not None else time.time())[:6])
  286. def make_timestr(secs=None):
  287. return '{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}'.format(*time.gmtime(
  288. int(secs) if secs is not None else time.time())[:6])
  289. def secs_to_dhms(secs):
  290. hrs = secs // 3600
  291. return '{}{:02d}:{:02d}:{:02d} h/m/s'.format(
  292. ('{} day{}, '.format(hrs//24, suf(hrs//24)) if hrs > 24 else ''),
  293. hrs % 24,
  294. (secs // 60) % 60,
  295. secs % 60
  296. )
  297. def secs_to_hms(secs):
  298. return '{:02d}:{:02d}:{:02d}'.format(secs//3600, (secs//60) % 60, secs % 60)
  299. def secs_to_ms(secs):
  300. return '{:02d}:{:02d}'.format(secs//60, secs % 60)
  301. def is_int(s): # actually is_nonnegative_int()
  302. return set(str(s) or 'x') <= set(digits)
  303. def check_int_between(val, imin, imax, *, desc):
  304. if not imin <= int(val) <= imax:
  305. die(1, f'{val}: invalid value for {desc} (must be between {imin} and {imax})')
  306. return int(val)
  307. def check_member(e, iterable, desc, message='unsupported'):
  308. if e not in iterable:
  309. from mmgen.color import yellow
  310. die(1, yellow(f'{e}: {message} {desc} (must be one of {fmt_list(iterable)})'))
  311. def is_hex_str(s):
  312. return set(s) <= set(hexdigits)
  313. def is_hex_str_lc(s):
  314. return set(s) <= set(hexdigits_lc)
  315. def is_utf8(s):
  316. try:
  317. s.decode('utf8')
  318. except:
  319. return False
  320. else:
  321. return True
  322. def remove_whitespace(s, *, ws='\t\r\n '):
  323. return s.translate(dict((ord(e), None) for e in ws))
  324. def strip_comment(line):
  325. return re.sub('#.*', '', line).rstrip()
  326. def strip_comments(lines):
  327. pat = re.compile('#.*')
  328. return [m for m in [pat.sub('', l).rstrip() for l in lines] if m != '']
  329. def make_full_path(outdir, outfile):
  330. return os.path.normpath(os.path.join(outdir, os.path.basename(outfile)))
  331. class oneshot_warning:
  332. color = 'nocolor'
  333. def __init__(self, *, div=None, fmt_args=[], reverse=False):
  334. self.do(type(self), div, fmt_args, reverse)
  335. def do(self, wcls, div, fmt_args, reverse):
  336. def do_warning():
  337. from . import color
  338. msg(getattr(color, getattr(wcls, 'color'))('WARNING: ' + getattr(wcls, 'message').format(*fmt_args)))
  339. if not hasattr(wcls, 'data'):
  340. setattr(wcls, 'data', [])
  341. data = getattr(wcls, 'data')
  342. condition = (div in data) if reverse else (not div in data)
  343. if not div in data:
  344. data.append(div)
  345. if condition:
  346. do_warning()
  347. self.warning_shown = True
  348. else:
  349. self.warning_shown = False
  350. class oneshot_warning_group(oneshot_warning):
  351. def __init__(self, wcls, *, div=None, fmt_args=[], reverse=False):
  352. self.do(getattr(self, wcls), div, fmt_args, reverse)
  353. def get_subclasses(cls, *, names=False):
  354. def gen(cls):
  355. for i in cls.__subclasses__():
  356. yield i
  357. yield from gen(i)
  358. return tuple((c.__name__ for c in gen(cls)) if names else gen(cls))
  359. def async_run(coro):
  360. import asyncio
  361. return asyncio.run(coro)
  362. def wrap_ripemd160(called=[]):
  363. if not called:
  364. try:
  365. import hashlib
  366. hashlib.new('ripemd160')
  367. except ValueError:
  368. def hashlib_new_wrapper(name, *args, **kwargs):
  369. if name == 'ripemd160':
  370. return ripemd160(*args, **kwargs)
  371. else:
  372. return hashlib_new(name, *args, **kwargs)
  373. from .contrib.ripemd160 import ripemd160
  374. hashlib_new = hashlib.new
  375. hashlib.new = hashlib_new_wrapper
  376. called.append(True)
  377. def exit_if_mswin(feature):
  378. if sys.platform == 'win32':
  379. die(2, capfirst(feature) + ' not supported on the MSWin / MSYS2 platform')
  380. def have_sudo(*, silent=False):
  381. from subprocess import run, DEVNULL
  382. redir = DEVNULL if silent else None
  383. try:
  384. run(['sudo', '--non-interactive', 'true'], stdout=redir, stderr=redir, check=True)
  385. return True
  386. except:
  387. return False
  388. def cached_property(orig_func):
  389. @property
  390. def new_func(self):
  391. attr_name = '_' + orig_func.__name__
  392. if not hasattr(self, attr_name):
  393. setattr(self, attr_name, orig_func(self))
  394. return getattr(self, attr_name)
  395. return new_func