common.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2020 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. common.py: Shared routines and data for the MMGen test suites
  20. """
  21. class TestSuiteException(Exception): pass
  22. class TestSuiteFatalException(Exception): pass
  23. import os
  24. from mmgen.common import *
  25. from mmgen.devtools import *
  26. ascii_uc = ''.join(map(chr,list(range(65,91)))) # 26 chars
  27. ascii_lc = ''.join(map(chr,list(range(97,123)))) # 26 chars
  28. lat_accent = ''.join(map(chr,list(range(192,383)))) # 191 chars, L,S
  29. ru_uc = ''.join(map(chr,list(range(1040,1072)))) # 32 chars
  30. gr_uc = ''.join(map(chr,list(range(913,930)) + list(range(931,940)))) # 26 chars (930 is ctrl char)
  31. gr_uc_w_ctrl = ''.join(map(chr,list(range(913,940)))) # 27 chars, L,C
  32. lat_cyr_gr = lat_accent[:130:5] + ru_uc + gr_uc # 84 chars
  33. ascii_cyr_gr = ascii_uc + ru_uc + gr_uc # 84 chars
  34. utf8_text = '[α-$ample UTF-8 text-ω]' * 10 # 230 chars, L,N,P,S,Z
  35. utf8_combining = '[α-$ámple UTF-8 téxt-ω]' * 10 # L,N,P,S,Z,M
  36. utf8_ctrl = '[α-$ample\nUTF-8\ntext-ω]' * 10 # L,N,P,S,Z,C
# Fixed sample texts.  'W'ide / 'F'ull in the comments presumably refer to the
# East Asian Width classes of the characters -- TODO confirm against users.
text_jp = '必要なのは、信用ではなく暗号化された証明に基づく電子取引システムであり、これにより希望する二者が信用できる第三者機関を介さずに直接取引できるよう' # 72 chars ('W'ide)
text_zh = '所以,我們非常需要這樣一種電子支付系統,它基於密碼學原理而不基於信用,使得任何達成一致的雙方,能夠直接進行支付,從而不需要協力廠商仲介的參與。。' # 72 chars ('F'ull + 'W'ide)
# ASCII-only sample text
sample_text = 'The Times 03/Jan/2009 Chancellor on brink of second bailout for banks'
# NOTE(review): presumably the password and hash preset of a reference
# key-address file used by the tests -- verify against callers.
ref_kafile_pass = 'kafile password'
ref_kafile_hash_preset = '1'
  42. def getrandnum(n): return int(os.urandom(n).hex(),16)
  43. def getrandhex(n): return os.urandom(n).hex()
  44. def getrandnum_range(nbytes,rn_max):
  45. while True:
  46. rn = int(os.urandom(nbytes).hex(),16)
  47. if rn < rn_max: return rn
  48. def getrandstr(num_chars,no_space=False):
  49. n,m = 95,32
  50. if no_space: n,m = 94,33
  51. return ''.join([chr(i%n+m) for i in list(os.urandom(num_chars))])
  52. # Windows uses non-UTF8 encodings in filesystem, so use raw bytes here
  53. def cleandir(d,do_msg=False):
  54. d_enc = d.encode()
  55. try: files = os.listdir(d_enc)
  56. except: return
  57. from shutil import rmtree
  58. if do_msg: gmsg("Cleaning directory '{}'".format(d))
  59. for f in files:
  60. try:
  61. os.unlink(os.path.join(d_enc,f))
  62. except:
  63. rmtree(os.path.join(d_enc,f),ignore_errors=True)
  64. def mk_tmpdir(d):
  65. try: os.mkdir(d,0o755)
  66. except OSError as e:
  67. if e.errno != 17: raise
  68. else:
  69. vmsg("Created directory '{}'".format(d))
  70. # def mk_tmpdir_path(path,cfg):
  71. # try:
  72. # name = os.path.split(cfg['tmpdir'])[-1]
  73. # src = os.path.join(path,name)
  74. # try:
  75. # os.unlink(cfg['tmpdir'])
  76. # except OSError as e:
  77. # if e.errno != 2: raise
  78. # finally:
  79. # os.mkdir(src)
  80. # os.symlink(src,cfg['tmpdir'])
  81. # except OSError as e:
  82. # if e.errno != 17: raise
  83. # else: msg("Created directory '{}'".format(cfg['tmpdir']))
  84. def get_tmpfile(cfg,fn):
  85. return os.path.join(cfg['tmpdir'],fn)
  86. def write_to_file(fn,data,binary=False):
  87. write_data_to_file( fn,
  88. data,
  89. quiet = True,
  90. binary = binary,
  91. ignore_opt_outdir = True )
  92. def write_to_tmpfile(cfg,fn,data,binary=False):
  93. write_to_file( os.path.join(cfg['tmpdir'],fn), data=data, binary=binary )
  94. def read_from_file(fn,binary=False):
  95. from mmgen.util import get_data_from_file
  96. return get_data_from_file(fn,quiet=True,binary=binary)
  97. def read_from_tmpfile(cfg,fn,binary=False):
  98. return read_from_file(os.path.join(cfg['tmpdir'],fn),binary=binary)
  99. def joinpath(*args,**kwargs):
  100. return os.path.join(*args,**kwargs)
  101. def ok():
  102. if opt.profile: return
  103. if opt.verbose or opt.exact_output:
  104. gmsg('OK')
  105. else: msg(' OK')
  106. def cmp_or_die(s,t,desc=None):
  107. if s != t:
  108. m = 'ERROR: recoded data:\n{!r}\ndiffers from original data:\n{!r}'
  109. if desc: m = 'For {}:\n{}'.format(desc,m)
  110. raise TestSuiteFatalException(m.format(t,s))
  111. def init_coverage():
  112. coverdir = os.path.join('test','trace')
  113. acc_file = os.path.join('test','trace.acc')
  114. try: os.mkdir(coverdir,0o755)
  115. except: pass
  116. return coverdir,acc_file
  117. devnull_fh = open(('/dev/null','null.out')[g.platform == 'win'],'w')
  118. def silence():
  119. if not (opt.verbose or (hasattr(opt,'exact_output') and opt.exact_output)):
  120. g.stdout = g.stderr = devnull_fh
  121. def end_silence():
  122. if not (opt.verbose or (hasattr(opt,'exact_output') and opt.exact_output)):
  123. g.stdout = sys.stdout
  124. g.stderr = sys.stderr
  125. def omsg(s):
  126. sys.stderr.write(s + '\n')
  127. def omsg_r(s):
  128. sys.stderr.write(s)
  129. sys.stderr.flush()
  130. def imsg(s):
  131. if opt.verbose or (hasattr(opt,'exact_output') and opt.exact_output):
  132. omsg(s)
  133. def imsg_r(s):
  134. if opt.verbose or (hasattr(opt,'exact_output') and opt.exact_output):
  135. omsg_r(s)
  136. def iqmsg(s):
  137. if not opt.quiet: omsg(s)
  138. def iqmsg_r(s):
  139. if not opt.quiet: omsg_r(s)
  140. def start_test_daemons(*network_ids):
  141. return test_daemons_ops(*network_ids,op='start')
  142. def stop_test_daemons(*network_ids):
  143. return test_daemons_ops(*network_ids,op='stop')
  144. def test_daemons_ops(*network_ids,op):
  145. if opt.no_daemon_autostart:
  146. return
  147. from mmgen.daemon import CoinDaemon
  148. silent = not opt.verbose and not (hasattr(opt,'exact_output') and opt.exact_output)
  149. for network_id in network_ids:
  150. if network_id not in CoinDaemon.network_ids: # silently ignore invalid IDs
  151. continue
  152. CoinDaemon(network_id,test_suite=True).cmd(op,silent=silent)