exec_wrapper.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. scripts/exec_wrapper.py: wrapper to launch MMGen scripts in a testing environment
  12. """
  13. # Import as few modules and define as few names as possible at module level before exec'ing the
  14. # file, as all names will be seen by the exec'ed code. To prevent name collisions, all names
  15. # defined or imported here at module level should begin with 'exec_wrapper_'
  16. def exec_wrapper_get_colors():
  17. from collections import namedtuple
  18. return namedtuple('colors', ['red', 'green', 'yellow', 'blue', 'purple'])(*[
  19. (lambda s:s) if exec_wrapper_os.getenv('MMGEN_DISABLE_COLOR') else
  20. (lambda s, n=n:f'\033[{n};1m{s}\033[0m')
  21. for n in (31, 32, 33, 34, 35)])
  22. def exec_wrapper_init():
  23. if exec_wrapper_os.path.dirname(exec_wrapper_sys.argv[1]) == 'test':
  24. # support running of test scripts under wrapper
  25. repo_root = exec_wrapper_os.getcwd() # assume we’re in repo root
  26. exec_wrapper_sys.path[0] = repo_root
  27. # ensure loading of mmgen mods from overlay tree, not repo root:
  28. from test.overlay import overlay_setup
  29. overlay_setup(repo_root)
  30. else:
  31. exec_wrapper_sys.path.pop(0)
  32. exec_wrapper_os.environ['MMGEN_EXEC_WRAPPER'] = '1'
  33. def exec_wrapper_write_traceback(e, exit_val):
  34. import sys, os
  35. exc_line = (
  36. f'{type(e).__name__}({e.mmcode}) {e}' if type(e).__name__ in ('MMGenError', 'MMGenSystemExit') else
  37. f'{type(e).__name__}: {e}')
  38. c = exec_wrapper_get_colors()
  39. if os.getenv('EXEC_WRAPPER_TRACEBACK'):
  40. import traceback
  41. cwd = os.getcwd()
  42. sys.path.insert(0, cwd)
  43. from test.overlay import get_overlay_tree_dir
  44. overlay_path_pfx = os.path.relpath(get_overlay_tree_dir(cwd)) + '/'
  45. def fixup_fn(fn_in):
  46. fn = fn_in.removeprefix(cwd+'/').removeprefix(overlay_path_pfx)
  47. return fn.removesuffix('_orig.py') + '.py' if fn.endswith('_orig.py') else fn
  48. def gen_output():
  49. yield 'Traceback (most recent call last):'
  50. for e in traceback.extract_tb(sys.exc_info()[2]):
  51. yield ' File "{f}", line {l}, in {n}\n {L}'.format(
  52. f = exec_wrapper_execed_file if e.filename == '<string>' else fixup_fn(e.filename),
  53. l = '(scrubbed)' if os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC') else e.lineno,
  54. n = e.name,
  55. L = e.line or 'N/A')
  56. tb_lines = list(gen_output())
  57. if 'SystemExit' in exc_line:
  58. tb_lines.pop()
  59. if os.getenv('EXEC_WRAPPER_EXIT_VAL') == str(exit_val):
  60. sys.stdout.write(c.yellow(exc_line) + '\n')
  61. else:
  62. sys.stdout.write('{}\n{}\n'.format(
  63. c.yellow('\n'.join(tb_lines)),
  64. c.red(exc_line)))
  65. print(c.blue('{} script exited with error').format(
  66. 'Test' if os.path.dirname(sys.argv[0]) == 'test' else 'Spawned'))
  67. with open('test.err', 'w') as fp:
  68. fp.write('\n'.join(tb_lines + [exc_line]))
  69. else:
  70. sys.stdout.write(c.purple((f'NONZERO_EXIT[{exit_val}]: ' if exit_val else '') + exc_line) + '\n')
  71. def exec_wrapper_end_msg():
  72. if (
  73. exec_wrapper_os.getenv('EXEC_WRAPPER_DO_RUNTIME_MSG')
  74. and not exec_wrapper_os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC')):
  75. c = exec_wrapper_get_colors()
  76. # write to stdout to ensure script output gets to terminal first
  77. exec_wrapper_sys.stdout.write(c.blue('Runtime: {:0.5f} secs\n'.format(
  78. exec_wrapper_time.time() - exec_wrapper_tstart)))
  79. def exec_wrapper_tracemalloc_setup():
  80. exec_wrapper_os.environ['PYTHONTRACEMALLOC'] = '1'
  81. import tracemalloc
  82. tracemalloc.start()
  83. exec_wrapper_sys.stderr.write("INFO → Appending memory allocation stats to 'tracemalloc.log'\n")
  84. def exec_wrapper_tracemalloc_log():
  85. import tracemalloc, re
  86. snapshot = tracemalloc.take_snapshot()
  87. stats = snapshot.statistics('lineno')
  88. depth = 100
  89. col1w = 100
  90. with open('tracemalloc.log', 'a') as fp:
  91. fp.write('##### TOP {} {} #####\n'.format(depth, ' '.join(exec_wrapper_sys.argv)))
  92. for stat in stats[:depth]:
  93. frame = stat.traceback[0]
  94. fn = re.sub(r'.*\/site-packages\/|.*\/mmgen\/test\/overlay\/tree\/', '', frame.filename)
  95. fn = re.sub(r'.*\/mmgen\/test\/', 'test/', fn)
  96. fp.write('{f:{w}} {s:>8.2f} KiB\n'.format(
  97. f = f'{fn}:{frame.lineno}:',
  98. s = stat.size/1024,
  99. w = col1w))
  100. fp.write('{f:{w}} {s:8.2f} KiB\n\n'.format(
  101. f = 'TOTAL {}:'.format(' '.join(exec_wrapper_sys.argv))[:col1w],
  102. s = sum(stat.size for stat in stats) / 1024,
  103. w = col1w))
  104. import sys as exec_wrapper_sys
  105. import os as exec_wrapper_os
  106. import time as exec_wrapper_time
  107. exec_wrapper_init() # sets sys.path[0] to overlay root
  108. if exec_wrapper_os.getenv('MMGEN_TRACEMALLOC'):
  109. exec_wrapper_tracemalloc_setup()
  110. # import mmgen mods only after sys.path[0] is set to overlay root!
  111. if exec_wrapper_os.getenv('MMGEN_DEVTOOLS'):
  112. from mmgen.devinit import init_dev as exec_wrapper_init_dev
  113. exec_wrapper_init_dev()
  114. exec_wrapper_tstart = exec_wrapper_time.time()
  115. try:
  116. exec_wrapper_sys.argv.pop(0)
  117. exec_wrapper_execed_file = exec_wrapper_sys.argv[0]
  118. with open(exec_wrapper_execed_file) as fp:
  119. exec(fp.read())
  120. except SystemExit as e:
  121. if e.code != 0:
  122. exec_wrapper_write_traceback(e, e.code)
  123. else:
  124. if exec_wrapper_os.getenv('MMGEN_TRACEMALLOC'):
  125. exec_wrapper_tracemalloc_log()
  126. exec_wrapper_end_msg()
  127. exec_wrapper_sys.exit(e.code)
  128. except Exception as e:
  129. exit_val = e.mmcode if hasattr(e, 'mmcode') else e.code if hasattr(e, 'code') else 1
  130. exec_wrapper_write_traceback(e, exit_val)
  131. exec_wrapper_sys.exit(exit_val)
  132. if exec_wrapper_os.getenv('MMGEN_TRACEMALLOC'):
  133. exec_wrapper_tracemalloc_log()
  134. exec_wrapper_end_msg()