util.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2021 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. util.py: Low-level routines imported by other modules in the MMGen suite
  20. """
  21. import sys,os,time,stat,re
  22. from subprocess import run,PIPE,DEVNULL
  23. from hashlib import sha256
  24. from string import hexdigits,digits
  25. from .color import *
  26. from .exception import *
  27. from .globalvars import *
  28. CUR_HIDE = '\033[?25l'
  29. CUR_SHOW = '\033[?25h'
  30. if g.platform == 'win':
  31. def msg_r(s):
  32. try:
  33. g.stderr.write(s)
  34. g.stderr.flush()
  35. except:
  36. os.write(2,s.encode())
  37. def Msg_r(s):
  38. try:
  39. g.stdout.write(s)
  40. g.stdout.flush()
  41. except:
  42. os.write(1,s.encode())
  43. def msg(s): msg_r(s + '\n')
  44. def Msg(s): Msg_r(s + '\n')
  45. else:
  46. def msg_r(s):
  47. g.stderr.write(s)
  48. g.stderr.flush()
  49. def Msg_r(s):
  50. g.stdout.write(s)
  51. g.stdout.flush()
  52. def msg(s): g.stderr.write(s + '\n')
  53. def Msg(s): g.stdout.write(s + '\n')
  54. def msgred(s): msg(red(s))
  55. def rmsg(s): msg(red(s))
  56. def rmsg_r(s): msg_r(red(s))
  57. def ymsg(s): msg(yellow(s))
  58. def ymsg_r(s): msg_r(yellow(s))
  59. def gmsg(s): msg(green(s))
  60. def gmsg_r(s): msg_r(green(s))
  61. def bmsg(s): msg(blue(s))
  62. def bmsg_r(s): msg_r(blue(s))
  63. def mmsg(*args):
  64. for d in args: Msg(repr(d))
  65. def mdie(*args):
  66. mmsg(*args); sys.exit(0)
  67. def die_wait(delay,ev=0,s=''):
  68. assert isinstance(delay,int)
  69. assert isinstance(ev,int)
  70. if s: msg(s)
  71. time.sleep(delay)
  72. sys.exit(ev)
  73. def die_pause(ev=0,s=''):
  74. assert isinstance(ev,int)
  75. if s: msg(s)
  76. input('Press ENTER to exit')
  77. sys.exit(ev)
  78. def die(ev=0,s=''):
  79. assert isinstance(ev,int)
  80. if s: msg(s)
  81. sys.exit(ev)
  82. def Die(ev=0,s=''):
  83. assert isinstance(ev,int)
  84. if s: Msg(s)
  85. sys.exit(ev)
  86. def rdie(ev=0,s=''): die(ev,red(s))
  87. def ydie(ev=0,s=''): die(ev,yellow(s))
  88. def pp_fmt(d):
  89. import pprint
  90. return pprint.PrettyPrinter(indent=4,compact=True).pformat(d)
  91. def pp_msg(d):
  92. msg(pp_fmt(d))
  93. def fmt(s,indent='',strip_char=None):
  94. "de-indent multiple lines of text, or indent with specified string"
  95. return indent + ('\n'+indent).join([l.strip(strip_char) for l in s.strip().splitlines()]) + '\n'
  96. def fmt_list(l,fmt='dfl',indent=''):
  97. "pretty-format a list"
  98. sep,lq,rq = {
  99. 'utf8': ("“, ”", "“", "”"),
  100. 'dfl': ("', '", "'", "'"),
  101. 'bare': (' ', '', '' ),
  102. 'no_quotes': (', ', '', '' ),
  103. 'no_spc': ("','", "'", "'"),
  104. 'min': (",", "'", "'"),
  105. 'col': ('\n'+indent, indent, '' ),
  106. }[fmt]
  107. return lq + sep.join(l) + rq
  108. def list_gen(*data):
  109. """
  110. add element to list if condition is true or absent
  111. """
  112. assert type(data) in (list,tuple), f'{type(data).__name__} not in (list,tuple)'
  113. def gen():
  114. for i in data:
  115. assert type(i) == list, f'{type(i).__name__} != list'
  116. assert len(i) in (1,2), f'{len(i)} not in (1,2)'
  117. if len(i) == 1 or i[1]:
  118. yield i[0]
  119. return list(gen())
  120. def exit_if_mswin(feature):
  121. if g.platform == 'win':
  122. m = capfirst(feature) + ' not supported on the MSWin / MSYS2 platform'
  123. ydie(1,m)
  124. def warn_altcoins(coinsym,trust_level):
  125. if trust_level > 3:
  126. return
  127. tl_str = (
  128. red('COMPLETELY UNTESTED'),
  129. red('LOW'),
  130. yellow('MEDIUM'),
  131. green('HIGH'),
  132. )[trust_level]
  133. m = """
  134. Support for coin {!r} is EXPERIMENTAL. The {pn} project
  135. assumes no responsibility for any loss of funds you may incur.
  136. This coin’s {pn} testing status: {}
  137. Are you sure you want to continue?
  138. """
  139. m = fmt(m).strip().format(coinsym.upper(),tl_str,pn=g.proj_name)
  140. if g.test_suite:
  141. qmsg(m)
  142. return
  143. if not keypress_confirm(m,default_yes=True):
  144. sys.exit(0)
  145. def set_for_type(val,refval,desc,invert_bool=False,src=None):
  146. if type(refval) == bool:
  147. v = str(val).lower()
  148. ret = (
  149. True if v in ('true','yes','1','on') else
  150. False if v in ('false','no','none','0','off','') else
  151. None
  152. )
  153. if ret is not None:
  154. return not ret if invert_bool else ret
  155. else:
  156. try:
  157. return type(refval)(not val if invert_bool else val)
  158. except:
  159. pass
  160. die(1,'{!r}: invalid value for {!r}{} (must be of type {!r})'.format(
  161. val,
  162. desc,
  163. ' in {!r}'.format(src) if src else '',
  164. type(refval).__name__) )
  165. # From 'man dd':
  166. # c=1, w=2, b=512, kB=1000, K=1024, MB=1000*1000, M=1024*1024,
  167. # GB=1000*1000*1000, G=1024*1024*1024, and so on for T, P, E, Z, Y.
  168. bytespec_map = (
  169. ('c', 1),
  170. ('w', 2),
  171. ('b', 512),
  172. ('kB', 1000),
  173. ('K', 1024),
  174. ('MB', 1000000),
  175. ('M', 1048576),
  176. ('GB', 1000000000),
  177. ('G', 1073741824),
  178. ('TB', 1000000000000),
  179. ('T', 1099511627776),
  180. ('PB', 1000000000000000),
  181. ('P', 1125899906842624),
  182. ('EB', 1000000000000000000),
  183. ('E', 1152921504606846976),
  184. )
  185. def int2bytespec(n,spec,fmt,print_sym=True):
  186. def spec2int(spec):
  187. for k,v in bytespec_map:
  188. if k == spec:
  189. return v
  190. else:
  191. die('{spec}: unrecognized bytespec')
  192. return '{:{}f}{}'.format( n / spec2int(spec), fmt, spec if print_sym else '' )
  193. def parse_bytespec(nbytes):
  194. import re
  195. m = re.match(r'([0123456789.]+)(.*)',nbytes)
  196. if m:
  197. if m.group(2):
  198. for k,v in bytespec_map:
  199. if k == m.group(2):
  200. from decimal import Decimal
  201. return int(Decimal(m.group(1)) * v)
  202. else:
  203. msg("Valid byte specifiers: '{}'".format("' '".join([i[0] for i in bytespec_map])))
  204. elif '.' in nbytes:
  205. raise ValueError('fractional bytes not allowed')
  206. else:
  207. return int(nbytes)
  208. die(1,f'{nbytes!r}: invalid byte specifier')
  209. def check_or_create_dir(path):
  210. try:
  211. os.listdir(path)
  212. except:
  213. if os.getenv('MMGEN_TEST_SUITE'):
  214. try: # exception handling required for MSWin/MSYS2
  215. run(['/bin/rm','-rf',path])
  216. except:
  217. pass
  218. try:
  219. os.makedirs(path,0o700)
  220. except:
  221. die(2,f'ERROR: unable to read or create path {path!r}')
  222. from .opts import opt
  223. def qmsg(s,alt=None):
  224. if opt.quiet:
  225. if alt != None: msg(alt)
  226. else: msg(s)
  227. def qmsg_r(s,alt=None):
  228. if opt.quiet:
  229. if alt != None: msg_r(alt)
  230. else: msg_r(s)
  231. def vmsg(s,force=False):
  232. if opt.verbose or force: msg(s)
  233. def vmsg_r(s,force=False):
  234. if opt.verbose or force: msg_r(s)
  235. def Vmsg(s,force=False):
  236. if opt.verbose or force: Msg(s)
  237. def Vmsg_r(s,force=False):
  238. if opt.verbose or force: Msg_r(s)
  239. def dmsg(s):
  240. if opt.debug: msg(s)
  241. def suf(arg,suf_type='s',verb='none'):
  242. suf_types = {
  243. 'none': {
  244. 's': ('s', ''),
  245. 'es': ('es', ''),
  246. 'ies': ('ies','y'),
  247. },
  248. 'is': {
  249. 's': ('s are', ' is'),
  250. 'es': ('es are', ' is'),
  251. 'ies': ('ies are','y is'),
  252. },
  253. 'has': {
  254. 's': ('s have', ' has'),
  255. 'es': ('es have', ' has'),
  256. 'ies': ('ies have','y has'),
  257. },
  258. }
  259. if isinstance(arg,int):
  260. n = arg
  261. elif isinstance(arg,(list,tuple,set,dict)):
  262. n = len(arg)
  263. else:
  264. die(2,f'{arg}: invalid parameter for suf()')
  265. return suf_types[verb][suf_type][n == 1]
  266. def get_extension(fn):
  267. return os.path.splitext(fn)[1][1:]
  268. def remove_extension(fn,ext):
  269. a,b = os.path.splitext(fn)
  270. return a if b[1:] == ext else fn
  271. def make_chksum_N(s,nchars,sep=False):
  272. if isinstance(s,str): s = s.encode()
  273. if nchars%4 or not (4 <= nchars <= 64): return False
  274. s = sha256(sha256(s).digest()).hexdigest().upper()
  275. sep = ('',' ')[bool(sep)]
  276. return sep.join([s[i*4:i*4+4] for i in range(nchars//4)])
  277. def make_chksum_8(s,sep=False):
  278. from .obj import HexStr
  279. s = HexStr(sha256(sha256(s).digest()).hexdigest()[:8].upper(),case='upper')
  280. return '{} {}'.format(s[:4],s[4:]) if sep else s
  281. def make_chksum_6(s):
  282. from .obj import HexStr
  283. if isinstance(s,str): s = s.encode()
  284. return HexStr(sha256(s).hexdigest()[:6])
  285. def is_chksum_6(s): return len(s) == 6 and is_hex_str_lc(s)
  286. def make_iv_chksum(s): return sha256(s).hexdigest()[:8].upper()
  287. def splitN(s,n,sep=None): # always return an n-element list
  288. ret = s.split(sep,n-1)
  289. return ret + ['' for i in range(n-len(ret))]
  290. def split2(s,sep=None): return splitN(s,2,sep) # always return a 2-element list
  291. def split3(s,sep=None): return splitN(s,3,sep) # always return a 3-element list
  292. def split_into_cols(col_wid,s):
  293. return ' '.join([s[col_wid*i:col_wid*(i+1)] for i in range(len(s)//col_wid+1)]).rstrip()
  294. def capfirst(s): # different from str.capitalize() - doesn't downcase any uc in string
  295. return s if len(s) == 0 else s[0].upper() + s[1:]
  296. def decode_timestamp(s):
  297. # tz_save = open('/etc/timezone').read().rstrip()
  298. os.environ['TZ'] = 'UTC'
  299. ts = time.strptime(s,'%Y%m%d_%H%M%S')
  300. t = time.mktime(ts)
  301. # os.environ['TZ'] = tz_save
  302. return int(t)
  303. def make_timestamp(secs=None):
  304. t = int(secs) if secs else time.time()
  305. return '{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}'.format(*time.gmtime(t)[:6])
  306. def make_timestr(secs=None):
  307. t = int(secs) if secs else time.time()
  308. return '{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}'.format(*time.gmtime(t)[:6])
  309. def secs_to_dhms(secs):
  310. dsecs = secs // 3600
  311. return '{}{:02d}:{:02d}:{:02d} h/m/s'.format(
  312. ('{} day{}, '.format(dsecs//24,suf(dsecs//24)) if dsecs > 24 else ''),
  313. dsecs % 24,
  314. (secs // 60) % 60,
  315. secs % 60
  316. )
  317. def secs_to_hms(secs):
  318. return '{:02d}:{:02d}:{:02d}'.format(secs//3600, (secs//60) % 60, secs % 60)
  319. def secs_to_ms(secs):
  320. return '{:02d}:{:02d}'.format(secs//60, secs % 60)
  321. def is_int(s):
  322. try:
  323. int(str(s))
  324. return True
  325. except:
  326. return False
  327. def is_hex_str(s): return set(list(s.lower())) <= set(list(hexdigits.lower()))
  328. def is_hex_str_lc(s): return set(list(s)) <= set(list(hexdigits.lower()))
  329. def is_utf8(s):
  330. try: s.decode('utf8')
  331. except: return False
  332. else: return True
  333. def remove_whitespace(s,ws='\t\r\n '):
  334. return s.translate(dict((ord(e),None) for e in ws))
  335. def pretty_format(s,width=80,pfx=''):
  336. out = []
  337. while(s):
  338. if len(s) <= width:
  339. out.append(s)
  340. break
  341. i = s[:width].rfind(' ')
  342. out.append(s[:i])
  343. s = s[i+1:]
  344. return pfx + ('\n'+pfx).join(out)
  345. def block_format(data,gw=2,cols=8,line_nums=None,data_is_hex=False):
  346. assert line_nums in (None,'hex','dec'),"'line_nums' must be one of None, 'hex' or 'dec'"
  347. ln_fs = '{:06x}: ' if line_nums == 'hex' else '{:06}: '
  348. bytes_per_chunk = gw
  349. if data_is_hex:
  350. gw *= 2
  351. nchunks = len(data)//gw + bool(len(data)%gw)
  352. return ''.join(
  353. ('' if (line_nums == None or i % cols) else ln_fs.format(i*bytes_per_chunk))
  354. + data[i*gw:i*gw+gw]
  355. + (' ' if (not cols or (i+1) % cols) else '\n')
  356. for i in range(nchunks)
  357. ).rstrip() + '\n'
  358. def pretty_hexdump(data,gw=2,cols=8,line_nums=None):
  359. return block_format(data.hex(),gw,cols,line_nums,data_is_hex=True)
  360. def decode_pretty_hexdump(data):
  361. from string import hexdigits
  362. pat = re.compile(fr'^[{hexdigits}]+:\s+')
  363. lines = [pat.sub('',line) for line in data.splitlines()]
  364. try:
  365. return bytes.fromhex(''.join((''.join(lines).split())))
  366. except:
  367. msg('Data not in hexdump format')
  368. return False
  369. def strip_comment(line):
  370. return re.sub(r'\s+$','',re.sub(r'#.*','',line))
  371. def strip_comments(lines):
  372. return [m for m in [strip_comment(l) for l in lines] if m != '']
  373. def get_hash_params(hash_preset):
  374. if hash_preset in g.hash_presets:
  375. return g.hash_presets[hash_preset] # N,r,p
  376. else: # Shouldn't be here
  377. die(3,f"{hash_preset}: invalid 'hash_preset' value")
  378. def compare_chksums(chk1,desc1,chk2,desc2,hdr='',die_on_fail=False,verbose=False):
  379. if not chk1 == chk2:
  380. fs = "{} ERROR: {} checksum ({}) doesn't match {} checksum ({})"
  381. m = fs.format((hdr+':\n ' if hdr else 'CHECKSUM'),desc2,chk2,desc1,chk1)
  382. if die_on_fail:
  383. die(3,m)
  384. else:
  385. vmsg(m,force=verbose)
  386. return False
  387. vmsg(f'{capfirst(desc1)} checksum OK ({chk1})')
  388. return True
  389. def compare_or_die(val1, desc1, val2, desc2, e='Error'):
  390. if val1 != val2:
  391. die(3,f"{e}: {desc2} ({val2}) doesn't match {desc1} ({val1})")
  392. dmsg(f'{capfirst(desc2)} OK ({val2})')
  393. return True
  394. def check_binary(args):
  395. try:
  396. run(args,stdout=DEVNULL,stderr=DEVNULL,check=True)
  397. except:
  398. rdie(2,f'{args[0]!r} binary missing, not in path, or not executable')
  399. def shred_file(fn,verbose=False):
  400. check_binary(['shred','--version'])
  401. run(
  402. ['shred','--force','--iterations=30','--zero','--remove=wipesync']
  403. + (['--verbose'] if verbose else [])
  404. + [fn],
  405. check=True )
  406. def open_file_or_exit(filename,mode,silent=False):
  407. try:
  408. return open(filename, mode)
  409. except:
  410. op = ('writing','reading')['r' in mode]
  411. die(2,'' if silent else f'Unable to open file {filename!r} for {op}')
  412. def check_file_type_and_access(fname,ftype,blkdev_ok=False):
  413. access,op_desc = (
  414. (os.W_OK,'writ') if ftype in ('output file','output directory') else
  415. (os.R_OK,'read') )
  416. if ftype == 'output directory':
  417. ok_types = [(stat.S_ISDIR, 'output directory')]
  418. else:
  419. ok_types = [
  420. (stat.S_ISREG,'regular file'),
  421. (stat.S_ISLNK,'symbolic link')
  422. ]
  423. if blkdev_ok:
  424. ok_types.append((stat.S_ISBLK,'block device'))
  425. try:
  426. mode = os.stat(fname).st_mode
  427. except:
  428. raise FileNotFound(f'Requested {ftype} {fname!r} not found')
  429. for t in ok_types:
  430. if t[0](mode):
  431. break
  432. else:
  433. ok_list = ' or '.join( t[1] for t in ok_types )
  434. die(1,f'Requested {ftype} {fname!r} is not a {ok_list}')
  435. if not os.access(fname,access):
  436. die(1,f'Requested {ftype} {fname!r} is not {op_desc}able by you')
  437. return True
  438. def check_infile(f,blkdev_ok=False):
  439. return check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok)
  440. def check_outfile(f,blkdev_ok=False):
  441. return check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok)
  442. def check_outdir(f):
  443. return check_file_type_and_access(f,'output directory')
  444. def check_wallet_extension(fn):
  445. from .wallet import Wallet
  446. if not Wallet.ext_to_type(get_extension(fn)):
  447. raise BadFileExtension(f'{fn!r}: unrecognized seed source file extension')
  448. def make_full_path(outdir,outfile):
  449. return os.path.normpath(os.path.join(outdir, os.path.basename(outfile)))
  450. def get_seed_file(cmd_args,nargs,invoked_as=None):
  451. from .filename import find_file_in_dir
  452. from .wallet import MMGenWallet
  453. wf = find_file_in_dir(MMGenWallet,g.data_dir)
  454. wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
  455. import mmgen.opts as opts
  456. if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs:
  457. if not wf:
  458. msg('No default wallet found, and no other seed source was specified')
  459. opts.usage()
  460. elif len(cmd_args) > nargs:
  461. opts.usage()
  462. elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
  463. qmsg('Warning: overriding default wallet with user-supplied wallet')
  464. if cmd_args or wf:
  465. check_infile(cmd_args[0] if cmd_args else wf)
  466. return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt]
  467. def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'):
  468. if message.strip():
  469. msg(message.strip())
  470. a = f'{q} ' if q[0].isupper() else f'Are you sure you want to {q}?\n'
  471. b = f'Type uppercase {expect!r} to confirm: '
  472. if my_raw_input(a+b).strip() != expect:
  473. raise UserNonConfirmation(exit_msg)
  474. def write_data_to_file( outfile,data,desc='data',
  475. ask_write=False,
  476. ask_write_prompt='',
  477. ask_write_default_yes=True,
  478. ask_overwrite=True,
  479. ask_tty=True,
  480. no_tty=False,
  481. quiet=False,
  482. binary=False,
  483. ignore_opt_outdir=False,
  484. check_data=False,
  485. cmp_data=None):
  486. if quiet: ask_tty = ask_overwrite = False
  487. if opt.quiet: ask_overwrite = False
  488. if ask_write_default_yes == False or ask_write_prompt:
  489. ask_write = True
  490. def do_stdout():
  491. qmsg('Output to STDOUT requested')
  492. if g.stdin_tty:
  493. if no_tty:
  494. die(2,f'Printing {desc} to screen is not allowed')
  495. if (ask_tty and not opt.quiet) or binary:
  496. confirm_or_raise('',f'output {desc} to screen')
  497. else:
  498. try: of = os.readlink(f'/proc/{os.getpid()}/fd/1') # Linux
  499. except: of = None # Windows
  500. if of:
  501. if of[:5] == 'pipe:':
  502. if no_tty:
  503. die(2,f'Writing {desc} to pipe is not allowed')
  504. if ask_tty and not opt.quiet:
  505. confirm_or_raise('',f'output {desc} to pipe')
  506. msg('')
  507. of2,pd = os.path.relpath(of),os.path.pardir
  508. msg('Redirecting output to file {!r}'.format(of if of2[:len(pd)] == pd else of2))
  509. else:
  510. msg('Redirecting output to file')
  511. if binary and g.platform == 'win':
  512. import msvcrt
  513. msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
  514. # MSWin workaround. See msg_r()
  515. try:
  516. sys.stdout.write(data.decode() if isinstance(data,bytes) else data)
  517. except:
  518. os.write(1,data if isinstance(data,bytes) else data.encode())
  519. def do_file(outfile,ask_write_prompt):
  520. if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile):
  521. outfile = make_full_path(opt.outdir,outfile)
  522. if ask_write:
  523. if not ask_write_prompt:
  524. ask_write_prompt = f'Save {desc}?'
  525. if not keypress_confirm(ask_write_prompt,
  526. default_yes=ask_write_default_yes):
  527. die(1,f'{capfirst(desc)} not saved')
  528. hush = False
  529. if os.path.lexists(outfile) and ask_overwrite:
  530. confirm_or_raise('',f'File {outfile!r} already exists\nOverwrite?')
  531. msg(f'Overwriting file {outfile!r}')
  532. hush = True
  533. # not atomic, but better than nothing
  534. # if cmp_data is empty, file can be either empty or non-existent
  535. if check_data:
  536. try:
  537. d = open(outfile,('r','rb')[bool(binary)]).read()
  538. except:
  539. d = ''
  540. finally:
  541. if d != cmp_data:
  542. if g.test_suite:
  543. print_diff(cmp_data,d)
  544. die(3,f'{desc} in file {outfile!r} has been altered by some other program! Aborting file write')
  545. # To maintain portability, always open files in binary mode
  546. # If 'binary' option not set, encode/decode data before writing and after reading
  547. f = open_file_or_exit(outfile,'wb')
  548. try:
  549. f.write(data if binary else data.encode())
  550. except:
  551. die(2,f'Failed to write {desc} to file {outfile!r}')
  552. f.close
  553. if not (hush or quiet):
  554. msg(f'{capfirst(desc)} written to file {outfile!r}')
  555. return True
  556. if opt.stdout or outfile in ('','-'):
  557. do_stdout()
  558. elif sys.stdin.isatty() and not sys.stdout.isatty():
  559. do_stdout()
  560. else:
  561. do_file(outfile,ask_write_prompt)
  562. def get_words_from_user(prompt):
  563. words = my_raw_input(prompt, echo=opt.echo_passphrase).split()
  564. dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  565. return words
  566. def get_words_from_file(infile,desc,quiet=False):
  567. if not quiet:
  568. qmsg(f'Getting {desc} from file {infile!r}')
  569. f = open_file_or_exit(infile, 'rb')
  570. try: words = f.read().decode().split()
  571. except: die(1,f'{capfirst(desc)} data must be UTF-8 encoded.')
  572. f.close()
  573. dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  574. return words
  575. def get_words(infile,desc,prompt):
  576. if infile:
  577. return get_words_from_file(infile,desc)
  578. else:
  579. return get_words_from_user(prompt)
  580. def mmgen_decrypt_file_maybe(fn,desc='',quiet=False,silent=False):
  581. d = get_data_from_file(fn,desc,binary=True,quiet=quiet,silent=silent)
  582. have_enc_ext = get_extension(fn) == g.mmenc_ext
  583. if have_enc_ext or not is_utf8(d):
  584. m = ('Attempting to decrypt','Decrypting')[have_enc_ext]
  585. qmsg(f'{m} {desc} {fn!r}')
  586. from .crypto import mmgen_decrypt_retry
  587. d = mmgen_decrypt_retry(d,desc)
  588. return d
  589. def get_lines_from_file(fn,desc='',trim_comments=False,quiet=False,silent=False):
  590. dec = mmgen_decrypt_file_maybe(fn,desc,quiet=quiet,silent=silent)
  591. ret = dec.decode().splitlines()
  592. if trim_comments:
  593. ret = strip_comments(ret)
  594. dmsg(f'Got {len(ret)} lines from file {fn!r}')
  595. return ret
  596. def get_data_from_user(desc='data'): # user input MUST be UTF-8
  597. p = f'Enter {desc}: ' if g.stdin_tty else ''
  598. data = my_raw_input(p,echo=opt.echo_passphrase)
  599. dmsg(f'User input: [{data}]')
  600. return data
  601. def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False):
  602. if not opt.quiet and not silent and not quiet and desc:
  603. qmsg(f'Getting {desc} from file {infile!r}')
  604. if dash and infile == '-':
  605. data = os.fdopen(0,'rb').read(g.max_input_size+1)
  606. else:
  607. data = open_file_or_exit(infile,'rb',silent=silent).read(g.max_input_size+1)
  608. if not binary:
  609. data = data.decode()
  610. if len(data) == g.max_input_size + 1:
  611. raise MaxInputSizeExceeded(f'Too much input data! Max input data size: {f.max_input_size} bytes')
  612. return data
  613. class oneshot_warning:
  614. color = 'nocolor'
  615. def __init__(self,div=None,fmt_args=[],reverse=False):
  616. self.do(type(self),div,fmt_args,reverse)
  617. def do(self,wcls,div,fmt_args,reverse):
  618. def do_warning():
  619. message = getattr(wcls,'message')
  620. color = globals()[getattr(wcls,'color')]
  621. msg(color('WARNING: ' + message.format(*fmt_args)))
  622. if not hasattr(wcls,'data'):
  623. setattr(wcls,'data',[])
  624. data = getattr(wcls,'data')
  625. condition = (div in data) if reverse else (not div in data)
  626. if not div in data:
  627. data.append(div)
  628. if condition:
  629. do_warning()
  630. self.warning_shown = True
  631. else:
  632. self.warning_shown = False
  633. class oneshot_warning_group(oneshot_warning):
  634. def __init__(self,wcls,div=None,fmt_args=[],reverse=False):
  635. self.do(getattr(self,wcls),div,fmt_args,reverse)
  636. class pwfile_reuse_warning(oneshot_warning):
  637. message = 'Reusing passphrase from file {!r} at user request'
  638. def __init__(self,fn):
  639. oneshot_warning.__init__(self,div=fn,fmt_args=[fn],reverse=True)
  640. def my_raw_input(prompt,echo=True,insert_txt='',use_readline=True):
  641. try: import readline
  642. except: use_readline = False # Windows
  643. if use_readline and sys.stdout.isatty():
  644. def st_hook(): readline.insert_text(insert_txt)
  645. readline.set_startup_hook(st_hook)
  646. else:
  647. msg_r(prompt)
  648. prompt = ''
  649. from .term import kb_hold_protect
  650. kb_hold_protect()
  651. if g.test_suite_popen_spawn:
  652. msg(prompt)
  653. sys.stderr.flush()
  654. reply = os.read(0,4096).decode()
  655. elif echo or not sys.stdin.isatty():
  656. reply = input(prompt)
  657. else:
  658. from getpass import getpass
  659. if g.platform == 'win':
  660. # MSWin hack - getpass('foo') doesn't flush stderr
  661. msg_r(prompt.strip()) # getpass('') adds a space
  662. sys.stderr.flush()
  663. reply = getpass('')
  664. else:
  665. reply = getpass(prompt)
  666. kb_hold_protect()
  667. try:
  668. return reply.strip()
  669. except:
  670. die(1,'User input must be UTF-8 encoded.')
  671. def keypress_confirm(prompt,default_yes=False,verbose=False,no_nl=False,complete_prompt=False):
  672. q = ('(y/N)','(Y/n)')[bool(default_yes)]
  673. p = prompt if complete_prompt else f'{prompt} {q}: '
  674. nl = ('\n','\r{}\r'.format(' '*len(p)))[no_nl]
  675. if g.accept_defaults:
  676. msg(p)
  677. return default_yes
  678. from .term import get_char
  679. while True:
  680. reply = get_char(p,immed_chars='yYnN').strip('\n\r')
  681. if not reply:
  682. msg_r(nl)
  683. return True if default_yes else False
  684. elif reply in 'yYnN':
  685. msg_r(nl)
  686. return True if reply in 'yY' else False
  687. else:
  688. msg_r('\nInvalid reply\n' if verbose else '\r')
  689. def do_pager(text):
  690. pagers = ['less','more']
  691. end_msg = '\n(end of text)\n\n'
  692. # --- Non-MSYS Windows code deleted ---
  693. # raw, chop, horiz scroll 8 chars, disable buggy line chopping in MSYS
  694. os.environ['LESS'] = (('--shift 8 -RS'),('-cR -#1'))[g.platform=='win']
  695. if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]:
  696. pagers = [os.environ['PAGER']] + pagers
  697. for pager in pagers:
  698. try:
  699. m = text + ('' if pager == 'less' else end_msg)
  700. p = run([pager],input=m.encode(),check=True)
  701. msg_r('\r')
  702. except:
  703. pass
  704. else:
  705. break
  706. else:
  707. Msg(text+end_msg)
  708. def do_license_msg(immed=False):
  709. if opt.quiet or g.no_license or opt.yes or not g.stdin_tty:
  710. return
  711. import mmgen.license as gpl
  712. msg(gpl.warning)
  713. from .term import get_char
  714. prompt = "Press 'w' for conditions and warranty info, or 'c' to continue: "
  715. while True:
  716. reply = get_char(prompt, immed_chars=('','wc')[bool(immed)])
  717. if reply == 'w':
  718. do_pager(gpl.conditions)
  719. elif reply == 'c':
  720. msg('')
  721. break
  722. else:
  723. msg_r('\r')
  724. msg('')
  725. def format_par(s,indent=0,width=80,as_list=False):
  726. words,lines = s.split(),[]
  727. assert width >= indent + 4,'width must be >= indent + 4'
  728. while words:
  729. line = ''
  730. while len(line) <= (width-indent) and words:
  731. if line and len(line) + len(words[0]) + 1 > width-indent: break
  732. line += ('',' ')[bool(line)] + words.pop(0)
  733. lines.append(' '*indent + line)
  734. return lines if as_list else '\n'.join(lines) + '\n'
  735. def get_subclasses(cls,names=False):
  736. def gen(cls):
  737. for i in cls.__subclasses__():
  738. yield i
  739. for j in gen(i):
  740. yield j
  741. return tuple((c.__name__ for c in gen(cls)) if names else gen(cls))
  742. def altcoin_subclass(cls,proto,mod_dir):
  743. """
  744. magic module loading and class retrieval
  745. """
  746. from .protocol import CoinProtocol
  747. if isinstance(proto,CoinProtocol.Bitcoin):
  748. return cls
  749. modname = f'mmgen.altcoins.{proto.base_coin.lower()}.{mod_dir}'
  750. import importlib
  751. if mod_dir == 'tx': # nested classes
  752. outer_clsname,inner_clsname = (
  753. proto.mod_clsname
  754. + ('Token' if proto.tokensym else '')
  755. + cls.__qualname__ ).split('.')
  756. return getattr(getattr(importlib.import_module(modname),outer_clsname),inner_clsname)
  757. else:
  758. clsname = (
  759. proto.mod_clsname
  760. + ('Token' if proto.tokensym else '')
  761. + cls.__name__ )
  762. return getattr(importlib.import_module(modname),clsname)
  763. # decorator for TrackingWallet
  764. def write_mode(orig_func):
  765. def f(self,*args,**kwargs):
  766. if self.mode != 'w':
  767. die(1,'{} opened in read-only mode: cannot execute method {}()'.format(
  768. type(self).__name__,
  769. locals()['orig_func'].__name__
  770. ))
  771. return orig_func(self,*args,**kwargs)
  772. return f
  773. def run_session(callback,backend=None):
  774. backend = backend or opt.rpc_backend
  775. import asyncio
  776. async def do():
  777. if backend == 'aiohttp':
  778. import aiohttp
  779. async with aiohttp.ClientSession(
  780. headers = { 'Content-Type': 'application/json' },
  781. connector = aiohttp.TCPConnector(limit_per_host=g.aiohttp_rpc_queue_len),
  782. ) as g.session:
  783. ret = await callback
  784. return ret
  785. else:
  786. return await callback
  787. return asyncio.run(do())