util.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2021 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. util.py: Low-level routines imported by other modules in the MMGen suite
  20. """
  21. import sys,os,time,stat,re
  22. from hashlib import sha256
  23. from string import hexdigits,digits
  24. from .color import *
  25. from .exception import *
  26. from .globalvars import *
  27. CUR_HIDE = '\033[?25l'
  28. CUR_SHOW = '\033[?25h'
  29. if g.platform == 'win':
  30. def msg_r(s):
  31. try:
  32. g.stderr.write(s)
  33. g.stderr.flush()
  34. except:
  35. os.write(2,s.encode())
  36. def Msg_r(s):
  37. try:
  38. g.stdout.write(s)
  39. g.stdout.flush()
  40. except:
  41. os.write(1,s.encode())
  42. def msg(s): msg_r(s + '\n')
  43. def Msg(s): Msg_r(s + '\n')
  44. else:
  45. def msg_r(s):
  46. g.stderr.write(s)
  47. g.stderr.flush()
  48. def Msg_r(s):
  49. g.stdout.write(s)
  50. g.stdout.flush()
  51. def msg(s): g.stderr.write(s + '\n')
  52. def Msg(s): g.stdout.write(s + '\n')
  53. def msgred(s): msg(red(s))
  54. def rmsg(s): msg(red(s))
  55. def rmsg_r(s): msg_r(red(s))
  56. def ymsg(s): msg(yellow(s))
  57. def ymsg_r(s): msg_r(yellow(s))
  58. def gmsg(s): msg(green(s))
  59. def gmsg_r(s): msg_r(green(s))
  60. def bmsg(s): msg(blue(s))
  61. def bmsg_r(s): msg_r(blue(s))
  62. def mmsg(*args):
  63. for d in args: Msg(repr(d))
  64. def mdie(*args):
  65. mmsg(*args); sys.exit(0)
  66. def die_wait(delay,ev=0,s=''):
  67. assert isinstance(delay,int)
  68. assert isinstance(ev,int)
  69. if s: msg(s)
  70. time.sleep(delay)
  71. sys.exit(ev)
  72. def die_pause(ev=0,s=''):
  73. assert isinstance(ev,int)
  74. if s: msg(s)
  75. input('Press ENTER to exit')
  76. sys.exit(ev)
  77. def die(ev=0,s=''):
  78. assert isinstance(ev,int)
  79. if s: msg(s)
  80. sys.exit(ev)
  81. def Die(ev=0,s=''):
  82. assert isinstance(ev,int)
  83. if s: Msg(s)
  84. sys.exit(ev)
  85. def rdie(ev=0,s=''): die(ev,red(s))
  86. def ydie(ev=0,s=''): die(ev,yellow(s))
  87. def pp_fmt(d):
  88. import pprint
  89. return pprint.PrettyPrinter(indent=4,compact=True).pformat(d)
  90. def pp_msg(d):
  91. msg(pp_fmt(d))
  92. def fmt(s,indent='',strip_char=None):
  93. "de-indent multiple lines of text, or indent with specified string"
  94. return indent + ('\n'+indent).join([l.strip(strip_char) for l in s.strip().splitlines()]) + '\n'
  95. def fmt_list(l,fmt='dfl',indent=''):
  96. "pretty-format a list"
  97. sep,lq,rq = {
  98. 'utf8': ("“, ”", "“", "”"),
  99. 'dfl': ("', '", "'", "'"),
  100. 'bare': (' ', '', '' ),
  101. 'no_quotes': (', ', '', '' ),
  102. 'no_spc': ("','", "'", "'"),
  103. 'min': (",", "'", "'"),
  104. 'col': ('\n'+indent, indent, '' ),
  105. }[fmt]
  106. return lq + sep.join(l) + rq
  107. def list_gen(*data):
  108. """
  109. add element to list if condition is true or absent
  110. """
  111. assert type(data) in (list,tuple), f'{type(data).__name__} not in (list,tuple)'
  112. def gen():
  113. for i in data:
  114. assert type(i) == list, f'{type(i).__name__} != list'
  115. assert len(i) in (1,2), f'{len(i)} not in (1,2)'
  116. if len(i) == 1 or i[1]:
  117. yield i[0]
  118. return list(gen())
  119. def exit_if_mswin(feature):
  120. if g.platform == 'win':
  121. m = capfirst(feature) + ' not supported on the MSWin / MSYS2 platform'
  122. ydie(1,m)
  123. def warn_altcoins(coinsym,trust_level):
  124. if trust_level > 3:
  125. return
  126. tl_str = (
  127. red('COMPLETELY UNTESTED'),
  128. red('LOW'),
  129. yellow('MEDIUM'),
  130. green('HIGH'),
  131. )[trust_level]
  132. m = """
  133. Support for coin {!r} is EXPERIMENTAL. The {pn} project
  134. assumes no responsibility for any loss of funds you may incur.
  135. This coin’s {pn} testing status: {}
  136. Are you sure you want to continue?
  137. """
  138. m = fmt(m).strip().format(coinsym.upper(),tl_str,pn=g.proj_name)
  139. if g.test_suite:
  140. qmsg(m)
  141. return
  142. if not keypress_confirm(m,default_yes=True):
  143. sys.exit(0)
  144. def set_for_type(val,refval,desc,invert_bool=False,src=None):
  145. if type(refval) == bool:
  146. v = str(val).lower()
  147. ret = (
  148. True if v in ('true','yes','1','on') else
  149. False if v in ('false','no','none','0','off','') else
  150. None
  151. )
  152. if ret is not None:
  153. return not ret if invert_bool else ret
  154. else:
  155. try:
  156. return type(refval)(not val if invert_bool else val)
  157. except:
  158. pass
  159. die(1,'{!r}: invalid value for {!r}{} (must be of type {!r})'.format(
  160. val,
  161. desc,
  162. ' in {!r}'.format(src) if src else '',
  163. type(refval).__name__) )
  164. # From 'man dd':
  165. # c=1, w=2, b=512, kB=1000, K=1024, MB=1000*1000, M=1024*1024,
  166. # GB=1000*1000*1000, G=1024*1024*1024, and so on for T, P, E, Z, Y.
  167. bytespec_map = (
  168. ('c', 1),
  169. ('w', 2),
  170. ('b', 512),
  171. ('kB', 1000),
  172. ('K', 1024),
  173. ('MB', 1000000),
  174. ('M', 1048576),
  175. ('GB', 1000000000),
  176. ('G', 1073741824),
  177. ('TB', 1000000000000),
  178. ('T', 1099511627776),
  179. ('PB', 1000000000000000),
  180. ('P', 1125899906842624),
  181. ('EB', 1000000000000000000),
  182. ('E', 1152921504606846976),
  183. )
  184. def int2bytespec(n,spec,fmt,print_sym=True):
  185. def spec2int(spec):
  186. for k,v in bytespec_map:
  187. if k == spec:
  188. return v
  189. else:
  190. die('{spec}: unrecognized bytespec')
  191. return '{:{}f}{}'.format( n / spec2int(spec), fmt, spec if print_sym else '' )
  192. def parse_bytespec(nbytes):
  193. import re
  194. m = re.match(r'([0123456789.]+)(.*)',nbytes)
  195. if m:
  196. if m.group(2):
  197. for k,v in bytespec_map:
  198. if k == m.group(2):
  199. from decimal import Decimal
  200. return int(Decimal(m.group(1)) * v)
  201. else:
  202. msg("Valid byte specifiers: '{}'".format("' '".join([i[0] for i in bytespec_map])))
  203. elif '.' in nbytes:
  204. raise ValueError('fractional bytes not allowed')
  205. else:
  206. return int(nbytes)
  207. die(1,"'{}': invalid byte specifier".format(nbytes))
  208. def check_or_create_dir(path):
  209. try:
  210. os.listdir(path)
  211. except:
  212. if os.getenv('MMGEN_TEST_SUITE'):
  213. from subprocess import run
  214. try: # exception handling required for MSWin/MSYS2
  215. run(['/bin/rm','-rf',path])
  216. except:
  217. pass
  218. try:
  219. os.makedirs(path,0o700)
  220. except:
  221. die(2,f'ERROR: unable to read or create path {path!r}')
  222. from .opts import opt
  223. def qmsg(s,alt=None):
  224. if opt.quiet:
  225. if alt != None: msg(alt)
  226. else: msg(s)
  227. def qmsg_r(s,alt=None):
  228. if opt.quiet:
  229. if alt != None: msg_r(alt)
  230. else: msg_r(s)
  231. def vmsg(s,force=False):
  232. if opt.verbose or force: msg(s)
  233. def vmsg_r(s,force=False):
  234. if opt.verbose or force: msg_r(s)
  235. def Vmsg(s,force=False):
  236. if opt.verbose or force: Msg(s)
  237. def Vmsg_r(s,force=False):
  238. if opt.verbose or force: Msg_r(s)
  239. def dmsg(s):
  240. if opt.debug: msg(s)
  241. def suf(arg,suf_type='s',verb='none'):
  242. suf_types = {
  243. 'none': {
  244. 's': ('s', ''),
  245. 'es': ('es', ''),
  246. 'ies': ('ies','y'),
  247. },
  248. 'is': {
  249. 's': ('s are', ' is'),
  250. 'es': ('es are', ' is'),
  251. 'ies': ('ies are','y is'),
  252. },
  253. 'has': {
  254. 's': ('s have', ' has'),
  255. 'es': ('es have', ' has'),
  256. 'ies': ('ies have','y has'),
  257. },
  258. }
  259. if isinstance(arg,int):
  260. n = arg
  261. elif isinstance(arg,(list,tuple,set,dict)):
  262. n = len(arg)
  263. else:
  264. die(2,'{}: invalid parameter for suf()'.format(arg))
  265. return suf_types[verb][suf_type][n == 1]
  266. def get_extension(fn):
  267. return os.path.splitext(fn)[1][1:]
  268. def remove_extension(fn,ext):
  269. a,b = os.path.splitext(fn)
  270. return a if b[1:] == ext else fn
  271. def make_chksum_N(s,nchars,sep=False):
  272. if isinstance(s,str): s = s.encode()
  273. if nchars%4 or not (4 <= nchars <= 64): return False
  274. s = sha256(sha256(s).digest()).hexdigest().upper()
  275. sep = ('',' ')[bool(sep)]
  276. return sep.join([s[i*4:i*4+4] for i in range(nchars//4)])
  277. def make_chksum_8(s,sep=False):
  278. from .obj import HexStr
  279. s = HexStr(sha256(sha256(s).digest()).hexdigest()[:8].upper(),case='upper')
  280. return '{} {}'.format(s[:4],s[4:]) if sep else s
  281. def make_chksum_6(s):
  282. from .obj import HexStr
  283. if isinstance(s,str): s = s.encode()
  284. return HexStr(sha256(s).hexdigest()[:6])
  285. def is_chksum_6(s): return len(s) == 6 and is_hex_str_lc(s)
  286. def make_iv_chksum(s): return sha256(s).hexdigest()[:8].upper()
  287. def splitN(s,n,sep=None): # always return an n-element list
  288. ret = s.split(sep,n-1)
  289. return ret + ['' for i in range(n-len(ret))]
  290. def split2(s,sep=None): return splitN(s,2,sep) # always return a 2-element list
  291. def split3(s,sep=None): return splitN(s,3,sep) # always return a 3-element list
  292. def split_into_cols(col_wid,s):
  293. return ' '.join([s[col_wid*i:col_wid*(i+1)] for i in range(len(s)//col_wid+1)]).rstrip()
  294. def capfirst(s): # different from str.capitalize() - doesn't downcase any uc in string
  295. return s if len(s) == 0 else s[0].upper() + s[1:]
  296. def decode_timestamp(s):
  297. # tz_save = open('/etc/timezone').read().rstrip()
  298. os.environ['TZ'] = 'UTC'
  299. ts = time.strptime(s,'%Y%m%d_%H%M%S')
  300. t = time.mktime(ts)
  301. # os.environ['TZ'] = tz_save
  302. return int(t)
  303. def make_timestamp(secs=None):
  304. t = int(secs) if secs else time.time()
  305. return '{:04d}{:02d}{:02d}_{:02d}{:02d}{:02d}'.format(*time.gmtime(t)[:6])
  306. def make_timestr(secs=None):
  307. t = int(secs) if secs else time.time()
  308. return '{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}'.format(*time.gmtime(t)[:6])
  309. def secs_to_dhms(secs):
  310. dsecs = secs // 3600
  311. return '{}{:02d}:{:02d}:{:02d} h/m/s'.format(
  312. ('{} day{}, '.format(dsecs//24,suf(dsecs//24)) if dsecs > 24 else ''),
  313. dsecs % 24,
  314. (secs // 60) % 60,
  315. secs % 60
  316. )
  317. def secs_to_hms(secs):
  318. return '{:02d}:{:02d}:{:02d}'.format(secs//3600, (secs//60) % 60, secs % 60)
  319. def secs_to_ms(secs):
  320. return '{:02d}:{:02d}'.format(secs//60, secs % 60)
  321. def is_digits(s): return set(list(s)) <= set(list(digits))
  322. def is_int(s):
  323. try:
  324. int(str(s))
  325. return True
  326. except:
  327. return False
  328. def is_hex_str(s): return set(list(s.lower())) <= set(list(hexdigits.lower()))
  329. def is_hex_str_lc(s): return set(list(s)) <= set(list(hexdigits.lower()))
  330. def is_hex_str_uc(s): return set(list(s)) <= set(list(hexdigits.upper()))
  331. def is_utf8(s):
  332. return is_ascii(s,enc='utf8')
  333. def is_ascii(s,enc='ascii'):
  334. try: s.decode(enc)
  335. except: return False
  336. else: return True
  337. def check_int_between(n,lo,hi,desc='value'):
  338. import re
  339. m = re.match(r'-{0,1}[0-9]+',str(n))
  340. if m == None:
  341. raise NotAnInteger(f'{n}: {desc} must be an integer')
  342. n = int(n)
  343. if n < lo or n > hi:
  344. raise IntegerOutOfRange(f'{n}: {desc} must be between {lo} and {hi}')
  345. return n
  346. def match_ext(addr,ext):
  347. return addr.split('.')[-1] == ext
  348. def file_exists(f):
  349. try:
  350. os.stat(f)
  351. return True
  352. except:
  353. return False
  354. def file_is_readable(f):
  355. from stat import S_IREAD
  356. try:
  357. assert os.stat(f).st_mode & S_IREAD
  358. except:
  359. return False
  360. else:
  361. return True
  362. def get_from_brain_opt_params():
  363. l,p = opt.from_brain.split(',')
  364. return(int(l),p)
  365. def remove_whitespace(s):
  366. return s.translate(dict((ord(ws),None) for ws in '\t\r\n '))
  367. def pretty_format(s,width=80,pfx=''):
  368. out = []
  369. while(s):
  370. if len(s) <= width:
  371. out.append(s)
  372. break
  373. i = s[:width].rfind(' ')
  374. out.append(s[:i])
  375. s = s[i+1:]
  376. return pfx + ('\n'+pfx).join(out)
  377. def block_format(data,gw=2,cols=8,line_nums=None,data_is_hex=False):
  378. assert line_nums in (None,'hex','dec'),"'line_nums' must be one of None, 'hex' or 'dec'"
  379. ln_fs = '{:06x}: ' if line_nums == 'hex' else '{:06}: '
  380. bytes_per_chunk = gw
  381. if data_is_hex:
  382. gw *= 2
  383. nchunks = len(data)//gw + bool(len(data)%gw)
  384. return ''.join(
  385. ('' if (line_nums == None or i % cols) else ln_fs.format(i*bytes_per_chunk))
  386. + data[i*gw:i*gw+gw]
  387. + (' ' if (not cols or (i+1) % cols) else '\n')
  388. for i in range(nchunks)
  389. ).rstrip() + '\n'
  390. def pretty_hexdump(data,gw=2,cols=8,line_nums=None):
  391. return block_format(data.hex(),gw,cols,line_nums,data_is_hex=True)
  392. def decode_pretty_hexdump(data):
  393. from string import hexdigits
  394. pat = re.compile(fr'^[{hexdigits}]+:\s+')
  395. lines = [pat.sub('',line) for line in data.splitlines()]
  396. try:
  397. return bytes.fromhex(''.join((''.join(lines).split())))
  398. except:
  399. msg('Data not in hexdump format')
  400. return False
  401. def strip_comments(line):
  402. return re.sub(r'\s+$','',re.sub(r'#.*','',line,1))
  403. def remove_comments(lines):
  404. return [m for m in [strip_comments(l) for l in lines] if m != '']
  405. def get_hash_params(hash_preset):
  406. if hash_preset in g.hash_presets:
  407. return g.hash_presets[hash_preset] # N,p,r,buflen
  408. else: # Shouldn't be here
  409. die(3,"{}: invalid 'hash_preset' value".format(hash_preset))
  410. def compare_chksums(chk1,desc1,chk2,desc2,hdr='',die_on_fail=False,verbose=False):
  411. if not chk1 == chk2:
  412. fs = "{} ERROR: {} checksum ({}) doesn't match {} checksum ({})"
  413. m = fs.format((hdr+':\n ' if hdr else 'CHECKSUM'),desc2,chk2,desc1,chk1)
  414. if die_on_fail:
  415. die(3,m)
  416. else:
  417. vmsg(m,force=verbose)
  418. return False
  419. vmsg('{} checksum OK ({})'.format(capfirst(desc1),chk1))
  420. return True
  421. def compare_or_die(val1, desc1, val2, desc2, e='Error'):
  422. if val1 != val2:
  423. die(3,"{}: {} ({}) doesn't match {} ({})".format(e,desc2,val2,desc1,val1))
  424. dmsg('{} OK ({})'.format(capfirst(desc2),val2))
  425. return True
  426. def open_file_or_exit(filename,mode,silent=False):
  427. try:
  428. return open(filename, mode)
  429. except:
  430. op = ('writing','reading')['r' in mode]
  431. die(2,("Unable to open file '{}' for {}".format(filename,op),'')[silent])
  432. def check_file_type_and_access(fname,ftype,blkdev_ok=False):
  433. a = ((os.R_OK,'read'),(os.W_OK,'writ'))
  434. access,m = a[ftype in ('output file','output directory')]
  435. ok_types = [
  436. (stat.S_ISREG,'regular file'),
  437. (stat.S_ISLNK,'symbolic link')
  438. ]
  439. if blkdev_ok: ok_types.append((stat.S_ISBLK,'block device'))
  440. if ftype == 'output directory': ok_types = [(stat.S_ISDIR, 'output directory')]
  441. try: mode = os.stat(fname).st_mode
  442. except:
  443. raise FileNotFound("Requested {} '{}' not found".format(ftype,fname))
  444. for t in ok_types:
  445. if t[0](mode): break
  446. else:
  447. die(1,"Requested {} '{}' is not a {}".format(ftype,fname,' or '.join([t[1] for t in ok_types])))
  448. if not os.access(fname, access):
  449. die(1,"Requested {} '{}' is not {}able by you".format(ftype,fname,m))
  450. return True
  451. def check_infile(f,blkdev_ok=False):
  452. return check_file_type_and_access(f,'input file',blkdev_ok=blkdev_ok)
  453. def check_outfile(f,blkdev_ok=False):
  454. return check_file_type_and_access(f,'output file',blkdev_ok=blkdev_ok)
  455. def check_outdir(f):
  456. return check_file_type_and_access(f,'output directory')
  457. def check_wallet_extension(fn):
  458. from .wallet import Wallet
  459. if not Wallet.ext_to_type(get_extension(fn)):
  460. raise BadFileExtension("'{}': unrecognized seed source file extension".format(fn))
  461. def make_full_path(outdir,outfile):
  462. return os.path.normpath(os.path.join(outdir, os.path.basename(outfile)))
  463. def get_seed_file(cmd_args,nargs,invoked_as=None):
  464. from .filename import find_file_in_dir
  465. from .wallet import MMGenWallet
  466. wf = find_file_in_dir(MMGenWallet,g.data_dir)
  467. wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
  468. import mmgen.opts as opts
  469. if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs:
  470. if not wf:
  471. msg('No default wallet found, and no other seed source was specified')
  472. opts.usage()
  473. elif len(cmd_args) > nargs:
  474. opts.usage()
  475. elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
  476. qmsg('Warning: overriding default wallet with user-supplied wallet')
  477. if cmd_args or wf:
  478. check_infile(cmd_args[0] if cmd_args else wf)
  479. return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt]
  480. def confirm_or_raise(message,q,expect='YES',exit_msg='Exiting at user request'):
  481. m = message.strip()
  482. if m: msg(m)
  483. a = q+' ' if q[0].isupper() else 'Are you sure you want to {}?\n'.format(q)
  484. b = "Type uppercase '{}' to confirm: ".format(expect)
  485. if my_raw_input(a+b).strip() != expect:
  486. raise UserNonConfirmation(exit_msg)
  487. def write_data_to_file( outfile,data,desc='data',
  488. ask_write=False,
  489. ask_write_prompt='',
  490. ask_write_default_yes=True,
  491. ask_overwrite=True,
  492. ask_tty=True,
  493. no_tty=False,
  494. quiet=False,
  495. binary=False,
  496. ignore_opt_outdir=False,
  497. check_data=False,
  498. cmp_data=None):
  499. if quiet: ask_tty = ask_overwrite = False
  500. if opt.quiet: ask_overwrite = False
  501. if ask_write_default_yes == False or ask_write_prompt:
  502. ask_write = True
  503. def do_stdout():
  504. qmsg('Output to STDOUT requested')
  505. if g.stdin_tty:
  506. if no_tty:
  507. die(2,'Printing {} to screen is not allowed'.format(desc))
  508. if (ask_tty and not opt.quiet) or binary:
  509. confirm_or_raise('','output {} to screen'.format(desc))
  510. else:
  511. try: of = os.readlink('/proc/{}/fd/1'.format(os.getpid())) # Linux
  512. except: of = None # Windows
  513. if of:
  514. if of[:5] == 'pipe:':
  515. if no_tty:
  516. die(2,'Writing {} to pipe is not allowed'.format(desc))
  517. if ask_tty and not opt.quiet:
  518. confirm_or_raise('','output {} to pipe'.format(desc))
  519. msg('')
  520. of2,pd = os.path.relpath(of),os.path.pardir
  521. msg("Redirecting output to file '{}'".format((of2,of)[of2[:len(pd)] == pd]))
  522. else:
  523. msg('Redirecting output to file')
  524. if binary and g.platform == 'win':
  525. import msvcrt
  526. msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
  527. # MSWin workaround. See msg_r()
  528. try:
  529. sys.stdout.write(data.decode() if isinstance(data,bytes) else data)
  530. except:
  531. os.write(1,data if isinstance(data,bytes) else data.encode())
  532. def do_file(outfile,ask_write_prompt):
  533. if opt.outdir and not ignore_opt_outdir and not os.path.isabs(outfile):
  534. outfile = make_full_path(opt.outdir,outfile)
  535. if ask_write:
  536. if not ask_write_prompt: ask_write_prompt = 'Save {}?'.format(desc)
  537. if not keypress_confirm(ask_write_prompt,
  538. default_yes=ask_write_default_yes):
  539. die(1,'{} not saved'.format(capfirst(desc)))
  540. hush = False
  541. if file_exists(outfile) and ask_overwrite:
  542. q = "File '{}' already exists\nOverwrite?".format(outfile)
  543. confirm_or_raise('',q)
  544. msg("Overwriting file '{}'".format(outfile))
  545. hush = True
  546. # not atomic, but better than nothing
  547. # if cmp_data is empty, file can be either empty or non-existent
  548. if check_data:
  549. try:
  550. d = open(outfile,('r','rb')[bool(binary)]).read()
  551. except:
  552. d = ''
  553. finally:
  554. if d != cmp_data:
  555. if g.test_suite:
  556. print_diff(cmp_data,d)
  557. m = "{} in file '{}' has been altered by some other program! Aborting file write"
  558. die(3,m.format(desc,outfile))
  559. # To maintain portability, always open files in binary mode
  560. # If 'binary' option not set, encode/decode data before writing and after reading
  561. f = open_file_or_exit(outfile,'wb')
  562. try:
  563. f.write(data if binary else data.encode())
  564. except:
  565. die(2,"Failed to write {} to file '{}'".format(desc,outfile))
  566. f.close
  567. if not (hush or quiet):
  568. msg("{} written to file '{}'".format(capfirst(desc),outfile))
  569. return True
  570. if opt.stdout or outfile in ('','-'):
  571. do_stdout()
  572. elif sys.stdin.isatty() and not sys.stdout.isatty():
  573. do_stdout()
  574. else:
  575. do_file(outfile,ask_write_prompt)
  576. def get_words_from_user(prompt):
  577. words = my_raw_input(prompt, echo=opt.echo_passphrase).split()
  578. dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  579. return words
  580. def get_words_from_file(infile,desc,quiet=False):
  581. if not quiet:
  582. qmsg("Getting {} from file '{}'".format(desc,infile))
  583. f = open_file_or_exit(infile, 'rb')
  584. try: words = f.read().decode().split()
  585. except: die(1,'{} data must be UTF-8 encoded.'.format(capfirst(desc)))
  586. f.close()
  587. dmsg('Sanitized input: [{}]'.format(' '.join(words)))
  588. return words
  589. def get_words(infile,desc,prompt):
  590. if infile:
  591. return get_words_from_file(infile,desc)
  592. else:
  593. return get_words_from_user(prompt)
  594. def mmgen_decrypt_file_maybe(fn,desc='',quiet=False,silent=False):
  595. d = get_data_from_file(fn,desc,binary=True,quiet=quiet,silent=silent)
  596. have_enc_ext = get_extension(fn) == g.mmenc_ext
  597. if have_enc_ext or not is_utf8(d):
  598. m = ('Attempting to decrypt','Decrypting')[have_enc_ext]
  599. qmsg("{} {} '{}'".format(m,desc,fn))
  600. from .crypto import mmgen_decrypt_retry
  601. d = mmgen_decrypt_retry(d,desc)
  602. return d
  603. def get_lines_from_file(fn,desc='',trim_comments=False,quiet=False,silent=False):
  604. dec = mmgen_decrypt_file_maybe(fn,desc,quiet=quiet,silent=silent)
  605. ret = dec.decode().splitlines()
  606. if trim_comments: ret = remove_comments(ret)
  607. dmsg("Got {} lines from file '{}'".format(len(ret),fn))
  608. return ret
  609. def get_data_from_user(desc='data'): # user input MUST be UTF-8
  610. p = ('','Enter {}: '.format(desc))[g.stdin_tty]
  611. data = my_raw_input(p,echo=opt.echo_passphrase)
  612. dmsg('User input: [{}]'.format(data))
  613. return data
  614. def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False):
  615. if not opt.quiet and not silent and not quiet and desc:
  616. qmsg("Getting {} from file '{}'".format(desc,infile))
  617. if dash and infile == '-':
  618. data = os.fdopen(0,'rb').read(g.max_input_size+1)
  619. else:
  620. data = open_file_or_exit(infile,'rb',silent=silent).read(g.max_input_size+1)
  621. if not binary:
  622. data = data.decode()
  623. if len(data) == g.max_input_size + 1:
  624. raise MaxInputSizeExceeded('Too much input data! Max input data size: {} bytes'.format(g.max_input_size))
  625. return data
  626. passwd_files_used = {}
  627. def pwfile_reuse_warning(passwd_file):
  628. if passwd_file in passwd_files_used:
  629. qmsg(f'Reusing passphrase from file {passwd_file!r} at user request')
  630. return True
  631. passwd_files_used[passwd_file] = True
  632. return False
  633. def my_raw_input(prompt,echo=True,insert_txt='',use_readline=True):
  634. try: import readline
  635. except: use_readline = False # Windows
  636. if use_readline and sys.stdout.isatty():
  637. def st_hook(): readline.insert_text(insert_txt)
  638. readline.set_startup_hook(st_hook)
  639. else:
  640. msg_r(prompt)
  641. prompt = ''
  642. from .term import kb_hold_protect
  643. kb_hold_protect()
  644. if g.test_suite_popen_spawn:
  645. msg(prompt)
  646. sys.stderr.flush()
  647. reply = os.read(0,4096).decode()
  648. elif echo or not sys.stdin.isatty():
  649. reply = input(prompt)
  650. else:
  651. from getpass import getpass
  652. if g.platform == 'win':
  653. # MSWin hack - getpass('foo') doesn't flush stderr
  654. msg_r(prompt.strip()) # getpass('') adds a space
  655. sys.stderr.flush()
  656. reply = getpass('')
  657. else:
  658. reply = getpass(prompt)
  659. kb_hold_protect()
  660. try:
  661. return reply.strip()
  662. except:
  663. die(1,'User input must be UTF-8 encoded.')
  664. def keypress_confirm(prompt,default_yes=False,verbose=False,no_nl=False,complete_prompt=False):
  665. q = ('(y/N)','(Y/n)')[bool(default_yes)]
  666. p = prompt if complete_prompt else '{} {}: '.format(prompt,q)
  667. nl = ('\n','\r{}\r'.format(' '*len(p)))[no_nl]
  668. if g.accept_defaults:
  669. msg(p)
  670. return default_yes
  671. from .term import get_char
  672. while True:
  673. reply = get_char(p,immed_chars='yYnN').strip('\n\r')
  674. if not reply:
  675. msg_r(nl)
  676. return True if default_yes else False
  677. elif reply in 'yYnN':
  678. msg_r(nl)
  679. return True if reply in 'yY' else False
  680. else:
  681. msg_r('\nInvalid reply\n' if verbose else '\r')
  682. def do_pager(text):
  683. pagers = ['less','more']
  684. end_msg = '\n(end of text)\n\n'
  685. # --- Non-MSYS Windows code deleted ---
  686. # raw, chop, horiz scroll 8 chars, disable buggy line chopping in MSYS
  687. os.environ['LESS'] = (('--shift 8 -RS'),('-cR -#1'))[g.platform=='win']
  688. if 'PAGER' in os.environ and os.environ['PAGER'] != pagers[0]:
  689. pagers = [os.environ['PAGER']] + pagers
  690. for pager in pagers:
  691. try:
  692. from subprocess import run
  693. m = text + ('' if pager == 'less' else end_msg)
  694. p = run([pager],input=m.encode(),check=True)
  695. msg_r('\r')
  696. except:
  697. pass
  698. else:
  699. break
  700. else:
  701. Msg(text+end_msg)
  702. def do_license_msg(immed=False):
  703. if opt.quiet or g.no_license or opt.yes or not g.stdin_tty:
  704. return
  705. p = "Press 'w' for conditions and warranty info, or 'c' to continue:"
  706. import mmgen.license as gpl
  707. msg(gpl.warning)
  708. prompt = '{} '.format(p.strip())
  709. from .term import get_char
  710. while True:
  711. reply = get_char(prompt, immed_chars=('','wc')[bool(immed)])
  712. if reply == 'w':
  713. do_pager(gpl.conditions)
  714. elif reply == 'c':
  715. msg('')
  716. break
  717. else:
  718. msg_r('\r')
  719. msg('')
  720. def format_par(s,indent=0,width=80,as_list=False):
  721. words,lines = s.split(),[]
  722. assert width >= indent + 4,'width must be >= indent + 4'
  723. while words:
  724. line = ''
  725. while len(line) <= (width-indent) and words:
  726. if line and len(line) + len(words[0]) + 1 > width-indent: break
  727. line += ('',' ')[bool(line)] + words.pop(0)
  728. lines.append(' '*indent + line)
  729. return lines if as_list else '\n'.join(lines) + '\n'
  730. def altcoin_subclass(cls,proto,mod_dir):
  731. """
  732. magic module loading and class retrieval
  733. """
  734. from .protocol import CoinProtocol
  735. if isinstance(proto,CoinProtocol.Bitcoin):
  736. return cls
  737. modname = f'mmgen.altcoins.{proto.base_coin.lower()}.{mod_dir}'
  738. import importlib
  739. if mod_dir == 'tx': # nested classes
  740. outer_clsname,inner_clsname = (
  741. proto.mod_clsname
  742. + ('Token' if proto.tokensym else '')
  743. + cls.__qualname__ ).split('.')
  744. return getattr(getattr(importlib.import_module(modname),outer_clsname),inner_clsname)
  745. else:
  746. clsname = (
  747. proto.mod_clsname
  748. + ('Token' if proto.tokensym else '')
  749. + cls.__name__ )
  750. return getattr(importlib.import_module(modname),clsname)
  751. # decorator for TrackingWallet
  752. def write_mode(orig_func):
  753. def f(self,*args,**kwargs):
  754. if self.mode != 'w':
  755. m = '{} opened in read-only mode: cannot execute method {}()'
  756. die(1,m.format(type(self).__name__,locals()['orig_func'].__name__))
  757. return orig_func(self,*args,**kwargs)
  758. return f
  759. def run_session(callback,backend=None):
  760. backend = backend or opt.rpc_backend
  761. import asyncio
  762. async def do():
  763. if backend == 'aiohttp':
  764. import aiohttp
  765. async with aiohttp.ClientSession(
  766. headers = { 'Content-Type': 'application/json' },
  767. connector = aiohttp.TCPConnector(limit_per_host=g.aiohttp_rpc_queue_len),
  768. ) as g.session:
  769. ret = await callback
  770. return ret
  771. else:
  772. return await callback
  773. # return asyncio.run(do()) # Python 3.7+
  774. return asyncio.get_event_loop().run_until_complete(do())