group_mgr.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. cmdtest_d.include.group_mgr: Command group manager for the MMGen Wallet cmdtest suite
  12. """
  13. import sys, os, time
  14. from mmgen.color import yellow, green, cyan
  15. from mmgen.util import Msg, die
  16. from .cfg import cfgs, cmd_groups_dfl, cmd_groups_extra
  17. class CmdGroupMgr:
  18. dpy_data = None
  19. cmd_groups = cmd_groups_dfl.copy()
  20. cmd_groups.update(cmd_groups_extra)
  21. cfg_attrs = (
  22. 'seed_len',
  23. 'seed_id',
  24. 'wpasswd',
  25. 'kapasswd',
  26. 'segwit',
  27. 'hash_preset',
  28. 'bw_filename',
  29. 'bw_params',
  30. 'ref_bw_seed_id',
  31. 'addr_idx_list',
  32. 'pass_idx_list')
  33. def __init__(self, cfg):
  34. self.cfg = cfg
  35. self.network_id = cfg._proto.coin.lower() + ('_tn' if cfg._proto.testnet else '')
  36. self.name = type(self).__name__
  37. @classmethod
  38. def get_cmd_groups(cls, cfg):
  39. exclude = cfg.exclude_groups.split(',') if cfg.exclude_groups else []
  40. for e in exclude:
  41. if e not in cmd_groups_dfl:
  42. die(1, f'{e!r}: group not recognized')
  43. return [s for s in cmd_groups_dfl if s not in exclude]
  44. def create_cmd_group(self, cls, sg_name=None):
  45. cmd_group_in = dict(cls.cmd_group_in)
  46. if sg_name and 'subgroup.' + sg_name not in cmd_group_in:
  47. die(1, f'{sg_name!r}: no such subgroup in test group {cls.__name__}')
  48. def add_entries(key, add_deps=True, added_subgroups=[]):
  49. if add_deps:
  50. for dep in cmd_group_in['subgroup.'+key]:
  51. yield from add_entries(dep)
  52. assert isinstance(cls.cmd_subgroups[key][0], str), f'header for subgroup {key!r} missing!'
  53. if not key in added_subgroups:
  54. yield from cls.cmd_subgroups[key][1:]
  55. added_subgroups.append(key)
  56. def gen():
  57. for name, data in cls.cmd_group_in:
  58. if name.startswith('subgroup.'):
  59. sg_key = name.removeprefix('subgroup.')
  60. if sg_name in (None, sg_key):
  61. yield from add_entries(
  62. sg_key,
  63. add_deps = sg_name and not self.cfg.skipping_deps,
  64. added_subgroups = [sg_name] if self.cfg.deps_only else [])
  65. if self.cfg.deps_only and sg_key == sg_name:
  66. return
  67. else:
  68. yield (name, data)
  69. return tuple(gen())
  70. def load_mod(self, gname, modname=None):
  71. grp = self.cmd_groups[gname]
  72. if modname is None and 'modname' in grp.params:
  73. modname = grp.params['modname']
  74. import importlib
  75. modpath = f'test.cmdtest_d.{modname or gname}'
  76. return getattr(importlib.import_module(modpath), grp.clsname)
  77. def create_group(self, gname, sg_name, full_data=False, modname=None, is3seed=False, add_dpy=False):
  78. """
  79. Initializes the list 'cmd_list' and dict 'dpy_data' from module's cmd_group data.
  80. Alternatively, if called with 'add_dpy=True', updates 'dpy_data' from module data
  81. without touching 'cmd_list'
  82. """
  83. cls = self.load_mod(gname, modname)
  84. cdata = []
  85. def get_shared_deps(cmdname, tmpdir_idx):
  86. """
  87. shared_deps are "implied" dependencies for all cmds in cmd_group that don't appear in
  88. the cmd_group data or cmds' argument lists. Supported only for 3seed tests at present.
  89. """
  90. if not hasattr(cls, 'shared_deps'):
  91. return []
  92. return [k for k, v in cfgs[str(tmpdir_idx)]['dep_generators'].items()
  93. if k in cls.shared_deps and v != cmdname]
  94. if not 'cmd_group' in cls.__dict__ and hasattr(cls, 'cmd_group_in'):
  95. cls.cmd_group = self.create_cmd_group(cls, sg_name)
  96. for a, b in cls.cmd_group:
  97. if is3seed:
  98. for n, (i, j) in enumerate(zip(cls.tmpdir_nums, (128, 192, 256))):
  99. k = f'{a}_{n+1}'
  100. if hasattr(cls, 'skip_cmds') and k in cls.skip_cmds:
  101. continue
  102. sdeps = get_shared_deps(k, i)
  103. if isinstance(b, str):
  104. cdata.append((k, (i, f'{b} ({j}-bit)', [[[]+sdeps, i]])))
  105. else:
  106. cdata.append((k, (i, f'{b[1]} ({j}-bit)', [[b[0]+sdeps, i]])))
  107. elif full_data:
  108. cdata.append((a, b))
  109. else:
  110. cdata.append((a, (cls.tmpdir_nums[0], b, [[[], cls.tmpdir_nums[0]]])))
  111. if add_dpy:
  112. self.dpy_data.update(dict(cdata))
  113. else:
  114. self.cmd_list = tuple(e[0] for e in cdata)
  115. self.dpy_data = dict(cdata)
  116. cls.full_data = full_data or is3seed
  117. if not cls.full_data:
  118. cls.tmpdir_num = cls.tmpdir_nums[0]
  119. for k, v in cfgs[str(cls.tmpdir_num)].items():
  120. setattr(cls, k, v)
  121. return cls
  122. def gm_init_group(self, cfg, trunner, gname, sg_name, spawn_prog):
  123. cls = self.create_group(gname, sg_name, **self.cmd_groups[gname].params)
  124. cls.group_name = gname
  125. return cls(cfg, trunner, cfgs, spawn_prog)
  126. def get_cls_by_gname(self, gname):
  127. return self.load_mod(gname, self.cmd_groups[gname].params.get('modname'))
  128. def list_cmd_groups(self):
  129. if self.cfg.list_current_cmd_groups:
  130. names = tuple(cmd_groups_dfl) + tuple(self.cfg._args)
  131. exclude = self.cfg.exclude_groups.split(',') if self.cfg.exclude_groups else []
  132. ginfo = {name: cls
  133. for name, cls in [(gname, self.get_cls_by_gname(gname)) for gname in names]
  134. if self.network_id in cls.networks and not name in exclude}
  135. desc = 'CONFIGURED'
  136. else:
  137. ginfo = {name: self.get_cls_by_gname(name) for name in self.cmd_groups}
  138. desc = 'AVAILABLE'
  139. def gen_output():
  140. yield green(f'{desc} COMMAND GROUPS AND SUBGROUPS:')
  141. yield ''
  142. name_w = max(len(name) for name in ginfo)
  143. for name, cls in ginfo.items():
  144. if not cls.is_helper:
  145. yield ' {} - {}'.format(
  146. yellow(name.ljust(name_w)),
  147. (cls.__doc__.strip() if cls.__doc__ else cls.__name__))
  148. if 'cmd_subgroups' in cls.__dict__:
  149. subgroups = {k:v for k, v in cls.cmd_subgroups.items() if not k.startswith('_')}
  150. max_w = max(len(k) for k in subgroups)
  151. for k, v in subgroups.items():
  152. yield ' + {} · {}'.format(cyan(k.ljust(max_w+1)), v[0])
  153. from mmgen.ui import do_pager
  154. do_pager('\n'.join(gen_output()))
  155. Msg('\n' + ' '.join(ginfo))
  156. def find_cmd_in_groups(self, cmd, group=None):
  157. """
  158. Search for a test command in specified group or all configured command groups
  159. and return it as a string. Loads modules but alters no global variables.
  160. """
  161. if group:
  162. if not group in [e[0] for e in self.cmd_groups]:
  163. die(1, f'{group!r}: unrecognized group')
  164. groups = [self.cmd_groups[group]]
  165. else:
  166. groups = self.cmd_groups
  167. for gname in groups:
  168. cls = self.get_cls_by_gname(gname)
  169. if not hasattr(cls, 'cmd_group'):
  170. cls.cmd_group = self.create_cmd_group(cls)
  171. if cmd in cls.cmd_group: # first search the class
  172. return gname
  173. if cmd in dir(cls(self.cfg, None, None, None)): # then a throwaway instance
  174. return gname # cmd might exist in more than one group - we'll go with the first
  175. return None