util.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. util.py: Low-level routines imported by other modules in the MMGen suite
  20. """
  21. import sys,os,time,re
  22. from .color import *
  23. from .globalvars import g
  24. from .opts import opt
  25. import mmgen.color as color_mod
  26. ascii_lowercase = 'abcdefghijklmnopqrstuvwxyz'
  27. hexdigits = '0123456789abcdefABCDEF'
  28. hexdigits_uc = '0123456789ABCDEF'
  29. hexdigits_lc = '0123456789abcdef'
  30. if g.platform == 'win':
  31. def msg_r(s):
  32. try:
  33. g.stderr.write(s)
  34. g.stderr.flush()
  35. except:
  36. os.write(2,s.encode())
  37. def msg(s):
  38. msg_r(s + '\n')
  39. def Msg_r(s):
  40. try:
  41. g.stdout.write(s)
  42. g.stdout.flush()
  43. except:
  44. os.write(1,s.encode())
  45. def Msg(s):
  46. Msg_r(s + '\n')
  47. else:
  48. def msg(s):
  49. g.stderr.write(s + '\n')
  50. def msg_r(s):
  51. g.stderr.write(s)
  52. g.stderr.flush()
  53. def Msg(s):
  54. g.stdout.write(s + '\n')
  55. def Msg_r(s):
  56. g.stdout.write(s)
  57. g.stdout.flush()
  58. def rmsg(s):
  59. msg(red(s))
  60. def ymsg(s):
  61. msg(yellow(s))
  62. def gmsg(s):
  63. msg(green(s))
  64. def gmsg_r(s):
  65. msg_r(green(s))
  66. def bmsg(s):
  67. msg(blue(s))
  68. def qmsg(s):
  69. if not opt.quiet:
  70. msg(s)
  71. def qmsg_r(s):
  72. if not opt.quiet:
  73. msg_r(s)
  74. def vmsg(s,force=False):
  75. if opt.verbose or force:
  76. msg(s)
  77. def vmsg_r(s,force=False):
  78. if opt.verbose or force:
  79. msg_r(s)
  80. def Vmsg(s,force=False):
  81. if opt.verbose or force:
  82. Msg(s)
  83. def Vmsg_r(s,force=False):
  84. if opt.verbose or force:
  85. Msg_r(s)
  86. def dmsg(s):
  87. if opt.debug:
  88. msg(s)
  89. def mmsg(*args):
  90. for d in args:
  91. Msg(repr(d))
  92. def mdie(*args):
  93. mmsg(*args)
  94. sys.exit(0)
  95. def die(ev,s='',stdout=False):
  96. if isinstance(ev,int):
  97. from .exception import MMGenSystemExit,MMGenError
  98. if ev <= 2:
  99. raise MMGenSystemExit(ev,s,stdout)
  100. else:
  101. raise MMGenError(ev,s,stdout)
  102. elif isinstance(ev,str):
  103. import mmgen.exception
  104. raise getattr(mmgen.exception,ev)(s)
  105. else:
  106. raise ValueError(f'{ev}: exit value must be string or int instance')
  107. def die_wait(delay,ev=0,s=''):
  108. assert isinstance(delay,int)
  109. assert isinstance(ev,int)
  110. if s:
  111. msg(s)
  112. time.sleep(delay)
  113. sys.exit(ev)
  114. def die_pause(ev=0,s=''):
  115. assert isinstance(ev,int)
  116. if s:
  117. msg(s)
  118. input('Press ENTER to exit')
  119. sys.exit(ev)
  120. def Die(ev=0,s=''):
  121. die(ev=ev,s=s,stdout=True)
  122. def pp_fmt(d):
  123. import pprint
  124. return pprint.PrettyPrinter(indent=4,compact=False).pformat(d)
  125. def pp_msg(d):
  126. msg(pp_fmt(d))
  127. def fmt(s,indent='',strip_char=None):
  128. "de-indent multiple lines of text, or indent with specified string"
  129. return indent + ('\n'+indent).join([l.strip(strip_char) for l in s.strip().splitlines()]) + '\n'
  130. def fmt_list(iterable,fmt='dfl',indent=''):
  131. "pretty-format a list"
  132. sep,lq,rq = {
  133. 'utf8': ("“, ”", "“", "”"),
  134. 'dfl': ("', '", "'", "'"),
  135. 'bare': (' ', '', '' ),
  136. 'no_quotes': (', ', '', '' ),
  137. 'no_spc': ("','", "'", "'"),
  138. 'min': (",", "'", "'"),
  139. 'col': ('\n'+indent, indent, '' ),
  140. }[fmt]
  141. return lq + sep.join(iterable) + rq
  142. def list_gen(*data):
  143. """
  144. add element to list if condition is true or absent
  145. """
  146. assert type(data) in (list,tuple), f'{type(data).__name__} not in (list,tuple)'
  147. def gen():
  148. for i in data:
  149. assert type(i) == list, f'{type(i).__name__} != list'
  150. assert len(i) in (1,2), f'{len(i)} not in (1,2)'
  151. if len(i) == 1 or i[1]:
  152. yield i[0]
  153. return list(gen())
  154. def remove_dups(iterable,edesc='element',desc='list',quiet=False,hide=False):
  155. """
  156. Remove duplicate occurrences of iterable elements, preserving first occurrence
  157. If iterable is a generator, return a list, else type(iterable)
  158. """
  159. ret = []
  160. for e in iterable:
  161. if e in ret:
  162. if not quiet:
  163. ymsg(f'Warning: removing duplicate {edesc} {"(hidden)" if hide else e} in {desc}')
  164. else:
  165. ret.append(e)
  166. return ret if type(iterable).__name__ == 'generator' else type(iterable)(ret)
  167. def exit_if_mswin(feature):
  168. if g.platform == 'win':
  169. die(2, capfirst(feature) + ' not supported on the MSWin / MSYS2 platform' )
  170. def get_keccak():
  171. from .opts import opt
  172. # called in opts.init() via CoinProtocol, so must use getattr():
  173. if getattr(opt,'use_internal_keccak_module',False):
  174. from .keccak import keccak_256
  175. qmsg('Using internal keccak module by user request')
  176. return keccak_256
  177. try:
  178. from sha3 import keccak_256
  179. except:
  180. from .keccak import keccak_256
  181. return keccak_256
  182. # From 'man dd':
  183. # c=1, w=2, b=512, kB=1000, K=1024, MB=1000*1000, M=1024*1024,
  184. # GB=1000*1000*1000, G=1024*1024*1024, and so on for T, P, E, Z, Y.
  185. bytespec_map = (
  186. ('c', 1),
  187. ('w', 2),
  188. ('b', 512),
  189. ('kB', 1000),
  190. ('K', 1024),
  191. ('MB', 1000000),
  192. ('M', 1048576),
  193. ('GB', 1000000000),
  194. ('G', 1073741824),
  195. ('TB', 1000000000000),
  196. ('T', 1099511627776),
  197. ('PB', 1000000000000000),
  198. ('P', 1125899906842624),
  199. ('EB', 1000000000000000000),
  200. ('E', 1152921504606846976),
  201. )
  202. def int2bytespec(n,spec,fmt,print_sym=True):
  203. def spec2int(spec):
  204. for k,v in bytespec_map:
  205. if k == spec:
  206. return v
  207. else:
  208. die('{spec}: unrecognized bytespec')
  209. return '{:{}f}{}'.format( n / spec2int(spec), fmt, spec if print_sym else '' )
  210. def parse_bytespec(nbytes):
  211. m = re.match(r'([0123456789.]+)(.*)',nbytes)
  212. if m:
  213. if m.group(2):
  214. for k,v in bytespec_map:
  215. if k == m.group(2):
  216. from decimal import Decimal
  217. return int(Decimal(m.group(1)) * v)
  218. else:
  219. msg("Valid byte specifiers: '{}'".format("' '".join([i[0] for i in bytespec_map])))
  220. elif '.' in nbytes:
  221. raise ValueError('fractional bytes not allowed')
  222. else:
  223. return int(nbytes)
  224. die(1,f'{nbytes!r}: invalid byte specifier')
  225. def suf(arg,suf_type='s',verb='none'):
  226. suf_types = {
  227. 'none': {
  228. 's': ('s', ''),
  229. 'es': ('es', ''),
  230. 'ies': ('ies','y'),
  231. },
  232. 'is': {
  233. 's': ('s are', ' is'),
  234. 'es': ('es are', ' is'),
  235. 'ies': ('ies are','y is'),
  236. },
  237. 'has': {
  238. 's': ('s have', ' has'),
  239. 'es': ('es have', ' has'),
  240. 'ies': ('ies have','y has'),
  241. },
  242. }
  243. if isinstance(arg,int):
  244. n = arg
  245. elif isinstance(arg,(list,tuple,set,dict)):
  246. n = len(arg)
  247. else:
  248. die(2,f'{arg}: invalid parameter for suf()')
  249. return suf_types[verb][suf_type][n == 1]
  250. def get_extension(fn):
  251. return os.path.splitext(fn)[1][1:]
  252. def remove_extension(fn,ext):
  253. a,b = os.path.splitext(fn)
  254. return a if b[1:] == ext else fn
  255. def make_chksum_N(s,nchars,sep=False):
  256. if isinstance(s,str):
  257. s = s.encode()
  258. if nchars%4 or not (4 <= nchars <= 64):
  259. return False
  260. from hashlib import sha256
  261. s = sha256(sha256(s).digest()).hexdigest().upper()
  262. sep = ('',' ')[bool(sep)]
  263. return sep.join([s[i*4:i*4+4] for i in range(nchars//4)])
  264. def make_chksum_8(s,sep=False):
  265. from .obj import HexStr
  266. from hashlib import sha256
  267. s = HexStr(sha256(sha256(s).digest()).hexdigest()[:8].upper(),case='upper')
  268. return '{} {}'.format(s[:4],s[4:]) if sep else s
  269. def make_chksum_6(s):
  270. from .obj import HexStr
  271. from hashlib import sha256
  272. if isinstance(s,str):
  273. s = s.encode()
  274. return HexStr(sha256(s).hexdigest()[:6])
  275. def is_chksum_6(s):
  276. return len(s) == 6 and set(s) <= set(hexdigits_lc)
  277. def make_iv_chksum(s):
  278. from hashlib import sha256
  279. return sha256(s).hexdigest()[:8].upper()
  280. def splitN(s,n,sep=None): # always return an n-element list
  281. ret = s.split(sep,n-1)
  282. return ret + ['' for i in range(n-len(ret))]
  283. def split2(s,sep=None):
  284. return splitN(s,2,sep) # always return a 2-element list
  285. def split3(s,sep=None):
  286. return splitN(s,3,sep) # always return a 3-element list
  287. def split_into_cols(col_wid,s):
  288. return ' '.join([s[col_wid*i:col_wid*(i+1)] for i in range(len(s)//col_wid+1)]).rstrip()
  289. def capfirst(s): # different from str.capitalize() - doesn't downcase any uc in string
  290. return s if len(s) == 0 else s[0].upper() + s[1:]
  291. def decode_timestamp(s):
  292. # tz_save = open('/etc/timezone').read().rstrip()
  293. os.environ['TZ'] = 'UTC'
  294. # os.environ['TZ'] = tz_save
  295. return int(time.mktime( time.strptime(s,'%Y%m%d_%H%M%S') ))
  296. def make_timestamp(secs=None):
  297. return '{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}'.format(*time.gmtime(
  298. int(secs) if secs else time.time() )[:6])
  299. def make_timestr(secs=None):
  300. return '{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}'.format(*time.gmtime(
  301. int(secs) if secs else time.time() )[:6])
  302. def secs_to_dhms(secs):
  303. hrs = secs // 3600
  304. return '{}{:02d}:{:02d}:{:02d} h/m/s'.format(
  305. ('{} day{}, '.format(hrs//24,suf(hrs//24)) if hrs > 24 else ''),
  306. hrs % 24,
  307. (secs // 60) % 60,
  308. secs % 60
  309. )
  310. def secs_to_hms(secs):
  311. return '{:02d}:{:02d}:{:02d}'.format(secs//3600, (secs//60) % 60, secs % 60)
  312. def secs_to_ms(secs):
  313. return '{:02d}:{:02d}'.format(secs//60, secs % 60)
  314. def is_int(s):
  315. try:
  316. int(str(s))
  317. return True
  318. except:
  319. return False
  320. def is_hex_str(s):
  321. return set(s) <= set(hexdigits)
  322. def is_hex_str_lc(s):
  323. return set(s) <= set(hexdigits_lc)
  324. def is_utf8(s):
  325. try: s.decode('utf8')
  326. except: return False
  327. else: return True
  328. def remove_whitespace(s,ws='\t\r\n '):
  329. return s.translate(dict((ord(e),None) for e in ws))
  330. def pretty_format(s,width=80,pfx=''):
  331. out = []
  332. while(s):
  333. if len(s) <= width:
  334. out.append(s)
  335. break
  336. i = s[:width].rfind(' ')
  337. out.append(s[:i])
  338. s = s[i+1:]
  339. return pfx + ('\n'+pfx).join(out)
  340. def block_format(data,gw=2,cols=8,line_nums=None,data_is_hex=False):
  341. assert line_nums in (None,'hex','dec'),"'line_nums' must be one of None, 'hex' or 'dec'"
  342. ln_fs = '{:06x}: ' if line_nums == 'hex' else '{:06}: '
  343. bytes_per_chunk = gw
  344. if data_is_hex:
  345. gw *= 2
  346. nchunks = len(data)//gw + bool(len(data)%gw)
  347. return ''.join(
  348. ('' if (line_nums == None or i % cols) else ln_fs.format(i*bytes_per_chunk))
  349. + data[i*gw:i*gw+gw]
  350. + (' ' if (not cols or (i+1) % cols) else '\n')
  351. for i in range(nchunks)
  352. ).rstrip() + '\n'
  353. def pretty_hexdump(data,gw=2,cols=8,line_nums=None):
  354. return block_format(data.hex(),gw,cols,line_nums,data_is_hex=True)
  355. def decode_pretty_hexdump(data):
  356. pat = re.compile(fr'^[{hexdigits}]+:\s+')
  357. lines = [pat.sub('',line) for line in data.splitlines()]
  358. try:
  359. return bytes.fromhex(''.join((''.join(lines).split())))
  360. except:
  361. msg('Data not in hexdump format')
  362. return False
  363. def strip_comment(line):
  364. return re.sub('#.*','',line).rstrip()
  365. def strip_comments(lines):
  366. pat = re.compile('#.*')
  367. return [m for m in [pat.sub('',l).rstrip() for l in lines] if m != '']
  368. def compare_chksums(chk1,desc1,chk2,desc2,hdr='',die_on_fail=False,verbose=False):
  369. if not chk1 == chk2:
  370. fs = "{} ERROR: {} checksum ({}) doesn't match {} checksum ({})"
  371. m = fs.format((hdr+':\n ' if hdr else 'CHECKSUM'),desc2,chk2,desc1,chk1)
  372. if die_on_fail:
  373. die(3,m)
  374. else:
  375. vmsg(m,force=verbose)
  376. return False
  377. vmsg(f'{capfirst(desc1)} checksum OK ({chk1})')
  378. return True
  379. def compare_or_die(val1, desc1, val2, desc2, e='Error'):
  380. if val1 != val2:
  381. die(3,f"{e}: {desc2} ({val2}) doesn't match {desc1} ({val1})")
  382. dmsg(f'{capfirst(desc2)} OK ({val2})')
  383. return True
  384. def check_wallet_extension(fn):
  385. from .wallet import Wallet
  386. if not Wallet.ext_to_type(get_extension(fn)):
  387. die( 'BadFileExtension', f'{fn!r}: unrecognized seed source file extension' )
  388. def make_full_path(outdir,outfile):
  389. return os.path.normpath(os.path.join(outdir, os.path.basename(outfile)))
  390. def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'):
  391. if message.strip():
  392. msg(message.strip())
  393. a = f'{q} ' if q[0].isupper() else f'Are you sure you want to {q}?\n'
  394. b = f'Type uppercase {expect!r} to confirm: '
  395. if line_input(a+b).strip() != expect:
  396. die( 'UserNonConfirmation', exit_msg )
  397. def get_words_from_user(prompt):
  398. words = line_input(prompt, echo=opt.echo_passphrase).split()
  399. dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  400. return words
  401. def get_data_from_user(desc='data'): # user input MUST be UTF-8
  402. data = line_input(f'Enter {desc}: ',echo=opt.echo_passphrase)
  403. dmsg(f'User input: [{data}]')
  404. return data
  405. class oneshot_warning:
  406. color = 'nocolor'
  407. def __init__(self,div=None,fmt_args=[],reverse=False):
  408. self.do(type(self),div,fmt_args,reverse)
  409. def do(self,wcls,div,fmt_args,reverse):
  410. def do_warning():
  411. message = getattr(wcls,'message')
  412. color = getattr( color_mod, getattr(wcls,'color') )
  413. msg(color('WARNING: ' + message.format(*fmt_args)))
  414. if not hasattr(wcls,'data'):
  415. setattr(wcls,'data',[])
  416. data = getattr(wcls,'data')
  417. condition = (div in data) if reverse else (not div in data)
  418. if not div in data:
  419. data.append(div)
  420. if condition:
  421. do_warning()
  422. self.warning_shown = True
  423. else:
  424. self.warning_shown = False
  425. class oneshot_warning_group(oneshot_warning):
  426. def __init__(self,wcls,div=None,fmt_args=[],reverse=False):
  427. self.do(getattr(self,wcls),div,fmt_args,reverse)
  428. class pwfile_reuse_warning(oneshot_warning):
  429. message = 'Reusing passphrase from file {!r} at user request'
  430. def __init__(self,fn):
  431. oneshot_warning.__init__(self,div=fn,fmt_args=[fn],reverse=True)
  432. def line_input(prompt,echo=True,insert_txt=''):
  433. """
  434. multi-line prompts OK
  435. one-line prompts must begin at beginning of line
  436. empty prompts forbidden due to interactions with readline
  437. """
  438. assert prompt,'calling line_input() with an empty prompt forbidden'
  439. def init_readline():
  440. try:
  441. import readline
  442. except ImportError:
  443. return False
  444. else:
  445. if insert_txt:
  446. readline.set_startup_hook(lambda: readline.insert_text(insert_txt))
  447. return True
  448. else:
  449. return False
  450. if not sys.stdout.isatty():
  451. msg_r(prompt)
  452. prompt = ''
  453. from .term import kb_hold_protect
  454. kb_hold_protect()
  455. if g.test_suite_popen_spawn:
  456. msg(prompt)
  457. sys.stderr.flush()
  458. reply = os.read(0,4096).decode().rstrip('\n') # strip NL to mimic behavior of input()
  459. elif echo or not sys.stdin.isatty():
  460. clear_buffer = init_readline() if sys.stdin.isatty() else False
  461. reply = input(prompt)
  462. if clear_buffer:
  463. import readline
  464. readline.set_startup_hook(lambda: readline.insert_text(''))
  465. else:
  466. from getpass import getpass
  467. if g.platform == 'win':
  468. # MSWin hack - getpass('foo') doesn't flush stderr
  469. msg_r(prompt.strip()) # getpass('') adds a space
  470. sys.stderr.flush()
  471. reply = getpass('')
  472. else:
  473. reply = getpass(prompt)
  474. kb_hold_protect()
  475. return reply.strip()
  476. def keypress_confirm(prompt,default_yes=False,verbose=False,no_nl=False,complete_prompt=False):
  477. if not complete_prompt:
  478. prompt = '{} {}: '.format( prompt, '(Y/n)' if default_yes else '(y/N)' )
  479. nl = f'\r{" "*len(prompt)}\r' if no_nl else '\n'
  480. if g.accept_defaults:
  481. msg(prompt)
  482. return default_yes
  483. from .term import get_char
  484. while True:
  485. reply = get_char(prompt,immed_chars='yYnN').strip('\n\r')
  486. if not reply:
  487. msg_r(nl)
  488. return True if default_yes else False
  489. elif reply in 'yYnN':
  490. msg_r(nl)
  491. return True if reply in 'yY' else False
  492. else:
  493. msg_r('\nInvalid reply\n' if verbose else '\r')
  494. def do_pager(text):
  495. pagers = ['less','more']
  496. end_msg = '\n(end of text)\n\n'
  497. # --- Non-MSYS Windows code deleted ---
  498. # raw, chop, horiz scroll 8 chars, disable buggy line chopping in MSYS
  499. os.environ['LESS'] = (('--shift 8 -RS'),('-cR -#1'))[g.platform=='win']
  500. if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]:
  501. pagers = [os.environ['PAGER']] + pagers
  502. from subprocess import run
  503. for pager in pagers:
  504. try:
  505. m = text + ('' if pager == 'less' else end_msg)
  506. p = run([pager],input=m.encode(),check=True)
  507. msg_r('\r')
  508. except:
  509. pass
  510. else:
  511. break
  512. else:
  513. Msg(text+end_msg)
  514. def do_license_msg(immed=False):
  515. if opt.quiet or g.no_license or opt.yes or not g.stdin_tty:
  516. return
  517. import mmgen.license as gpl
  518. msg(gpl.warning)
  519. from .term import get_char
  520. prompt = "Press 'w' for conditions and warranty info, or 'c' to continue: "
  521. while True:
  522. reply = get_char(prompt, immed_chars=('','wc')[bool(immed)])
  523. if reply == 'w':
  524. do_pager(gpl.conditions)
  525. elif reply == 'c':
  526. msg('')
  527. break
  528. else:
  529. msg_r('\r')
  530. msg('')
  531. def format_par(s,indent=0,width=80,as_list=False):
  532. words,lines = s.split(),[]
  533. assert width >= indent + 4,'width must be >= indent + 4'
  534. while words:
  535. line = ''
  536. while len(line) <= (width-indent) and words:
  537. if line and len(line) + len(words[0]) + 1 > width-indent: break
  538. line += ('',' ')[bool(line)] + words.pop(0)
  539. lines.append(' '*indent + line)
  540. return lines if as_list else '\n'.join(lines) + '\n'
  541. def get_subclasses(cls,names=False):
  542. def gen(cls):
  543. for i in cls.__subclasses__():
  544. yield i
  545. for j in gen(i):
  546. yield j
  547. return tuple((c.__name__ for c in gen(cls)) if names else gen(cls))
  548. def base_proto_subclass(cls,proto,mod_dir):
  549. """
  550. magic module loading and class retrieval
  551. """
  552. if proto.base_proto != 'Ethereum':
  553. return cls
  554. modname = f'mmgen.base_proto.{proto.base_proto.lower()}.{mod_dir}'
  555. import importlib
  556. if mod_dir == 'tx': # nested classes
  557. outer_clsname,inner_clsname = (
  558. proto.mod_clsname
  559. + ('Token' if proto.tokensym else '')
  560. + cls.__qualname__ ).split('.')
  561. return getattr(getattr(importlib.import_module(modname),outer_clsname),inner_clsname)
  562. else:
  563. clsname = (
  564. proto.mod_clsname
  565. + ('Token' if proto.tokensym else '')
  566. + cls.__name__ )
  567. return getattr(importlib.import_module(modname),clsname)
  568. # decorator for TrackingWallet
  569. def write_mode(orig_func):
  570. def f(self,*args,**kwargs):
  571. if self.mode != 'w':
  572. die(1,'{} opened in read-only mode: cannot execute method {}()'.format(
  573. type(self).__name__,
  574. locals()['orig_func'].__name__
  575. ))
  576. return orig_func(self,*args,**kwargs)
  577. return f
  578. def run_session(callback,backend=None):
  579. async def do():
  580. if (backend or opt.rpc_backend) == 'aiohttp':
  581. import aiohttp
  582. async with aiohttp.ClientSession(
  583. headers = { 'Content-Type': 'application/json' },
  584. connector = aiohttp.TCPConnector(limit_per_host=g.aiohttp_rpc_queue_len),
  585. ) as g.session:
  586. return await callback
  587. else:
  588. return await callback
  589. import asyncio
  590. return asyncio.run(do())