msg.py 11 KB


  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. msg: base message signing classes
  12. """
  13. import os, importlib, json
  14. from .cfg import gc
  15. from .objmethods import MMGenObject, HiliteStr, InitErrors
  16. from .util import msg, die, make_chksum_6, fmt_list, remove_dups
  17. from .color import red, orange, grnbg
  18. from .protocol import init_proto
  19. from .fileutil import get_data_from_file, write_data_to_file
  20. from .addr import MMGenID, CoinAddr
  class MMGenIDRange(HiliteStr, InitErrors, MMGenObject):
      """
      A Seed ID, address type and address index list joined by colons, closely
      modeled on MMGenID.

      Accepted input forms are 'SID:TYPE:IDXS' and 'SID:IDXS'.  In the two-item
      form the protocol's default address type is used.
      """
      color = 'orange'
      width = 0
      trunc_ok = False

      def __new__(cls, proto, id_str):
          from .addrlist import AddrIdxList
          from .addr import AddrListID
          from .seed import SeedID
          try:
              fields = id_str.split(':')
              if len(fields) == 3:
                  sid, t, fmt_str = fields
                  assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
                  mmtype = proto.addr_type(t)
              elif len(fields) == 2:
                  sid, fmt_str = fields
                  mmtype = proto.addr_type(proto.dfl_mmtype)
              else:
                  raise ValueError('not 2 or 3 colon-separated items')
              # the string value is always the full three-item form
              obj = str.__new__(cls, f'{sid}:{mmtype}:{fmt_str}')
              obj.sid = SeedID(sid=sid)
              obj.idxlist = AddrIdxList(fmt_str=fmt_str)
              obj.mmtype = mmtype
              # components were validated above, so AddrListID's own constructor is bypassed
              obj.al_id = str.__new__(AddrListID, obj.sid + ':' + obj.mmtype)
              obj.proto = proto
              return obj
          except Exception as e:
              # any failure, including the assertion above, is handed to init_fail()
              return cls.init_fail(e, id_str)
  class coin_msg:
      """
      Namespace class holding the coin-independent message signing classes.

      Instances are not created directly: the module-level _get_obj() factory
      allocates the object, sets its `msg_cls`, `cfg` and `proto` attributes and
      only then calls __init__(), which is why the methods below use those
      attributes without initializing them.
      """

      class base(MMGenObject):
          """
          Functionality shared by all message classes: description, checksum,
          filenames and writing the message data to a JSON file.
          """

          ext = 'rawmsg.json'
          signed = False
          # keys of self.data that are included in the checksum
          chksum_keys = ('addrlists', 'message', 'msghash_type', 'network')

          @property
          def desc(self):
              """Human-readable description, depending on the class's `signed` flag"""
              return ('signed' if self.signed else 'unsigned') + ' message data'

          @property
          def chksum(self):
              """Checksum of the compact, key-sorted JSON dump of the `chksum_keys` items of self.data"""
              return make_chksum_6(
                  json.dumps(
                      {k: self.data[k] for k in self.chksum_keys},
                      sort_keys = True,
                      separators = (',', ':')
                  ))

          @property
          def filename_stem(self):
              """
              Filename without extension: upper-cased checksum, upper-cased coin
              in brackets, and '.NETWORK' for any network other than mainnet
              """
              coin, network = self.data['network'].split('_')
              return '{}[{}]{}'.format(
                  self.chksum.upper(),
                  coin.upper(),
                  ('' if network == 'mainnet' else '.'+network))

          @property
          def filename(self):
              """Filename using this class's own extension"""
              return f'{self.filename_stem}.{self.ext}'

          @property
          def signed_filename(self):
              """Filename using the extension of the `signed` class"""
              return f'{self.filename_stem}.{coin_msg.signed.ext}'

          @staticmethod
          def get_proto_from_file(cfg, filename):
              """
              Read a JSON message file and return a protocol initialized from its
              network ID.

              The network ID is taken from data['metadata']['network'] when the
              file has a 'metadata' key, otherwise from the lower-cased top-level
              'network' value.
              """
              data = json.loads(get_data_from_file(cfg, filename))
              network_id = data['metadata']['network'] if 'metadata' in data else data['network'].lower()
              coin, network = network_id.split('_')
              return init_proto(cfg=cfg, coin=coin, network=network)

          def write_to_file(self, *, outdir=None, ask_overwrite=False):
              """
              Write the message data and signatures as indented, key-sorted JSON
              to `self.filename` in `outdir` (or the current directory if unset)
              """
              data = {
                  'id': f'{gc.proj_name} {self.desc}',
                  'metadata': self.data,
                  'signatures': self.sigs}

              write_data_to_file(
                  cfg = self.cfg,
                  outfile = os.path.join(outdir or '', self.filename),
                  data = json.dumps(data, sort_keys=True, indent=4),
                  desc = self.desc,
                  ask_overwrite = ask_overwrite)

      class new(base):
          """A newly created, not yet signed message"""

          def __init__(self, message, addrlists, msghash_type, *args, **kwargs):
              """
              message:      the message text
              addrlists:    whitespace-separated address range specifiers, each
                            parsed with MMGenIDRange
              msghash_type: message hash type; a false value selects the first
                            entry of msg_cls.msghash_types

              Exits via die() if msghash_type is not listed in msg_cls.msghash_types.
              """
              msghash_type = msghash_type or self.msg_cls.msghash_types[0]

              if msghash_type not in self.msg_cls.msghash_types:
                  die(2, f'msghash_type {msghash_type!r} not supported for {self.proto.base_proto} protocol')

              self.data = {
                  'network': '{}_{}'.format(self.proto.coin.lower(), self.proto.network),
                  'addrlists': [MMGenIDRange(self.proto, i) for i in addrlists.split()],
                  'message': message,
                  'msghash_type': msghash_type}
              self.sigs = {}

      class completed(base):
          """Message data loaded from a file or from an in-memory attribute dict"""

          def __init__(self, data, infile, *args, **kwargs):
              """
              If `data` is true, it replaces the instance's attribute dict
              wholesale and `infile` is ignored.  Otherwise the JSON file `infile`
              is read and its 'metadata' and 'signatures' items become self.data
              and self.sigs.
              """
              if data:
                  self.__dict__ = data
                  return

              self.data = get_data_from_file(
                  cfg = self.cfg,
                  infile = infile,
                  desc = self.desc)

              d = json.loads(self.data)
              self.data = d['metadata']
              self.sigs = d['signatures']
              self.addrlists = [MMGenIDRange(self.proto, i) for i in self.data['addrlists']]

          def format(self, req_addr=None):
              """
              Return the message data as display text.

              With `req_addr`, the output contains the header lines plus the
              signature entry for that address only, and die() is called if the
              address is not in the signature data.  Without it, the output lists
              the header lines followed by all signatures.
              """

              labels = {
                  'addr':       'address:',
                  'addr_p2pkh': 'addr_p2pkh:',
                  'pubhash':    'pubkey hash:',
                  'sig':        'signature:'}

              def gen_entry(e):
                  # one line per non-empty labeled item, in the order of `labels`
                  for k in labels:
                      if e.get(k):
                          yield fs_sig.format(labels[k], e[k])

              def gen_all():
                  for k, v in hdr_data.items():
                      yield fs_hdr.format(v[0], v[1](self.data[k]))
                  if self.sigs:
                      yield ''
                      yield 'Signatures:'
                      for n, (k, v) in enumerate(self.sigs.items()):
                          yield ''
                          yield f'{n+1:>3}) {k}'
                          yield from gen_entry(v)

              def gen_single():
                  for k, v in hdr_data.items():
                      yield fs_hdr.format(v[0], v[1](self.data[k]))
                  if self.sigs:
                      yield 'Signature data:'
                      # exported signatures are keyed by coin address, all others by MMGen ID
                      k = (
                          CoinAddr(self.proto, req_addr) if type(self).__name__ == 'exported_sigs' else
                          MMGenID(self.proto, req_addr))
                      if k not in self.sigs:
                          die(1, f'{k}: address not found in signature data')
                      yield from gen_entry(self.sigs[k])

              # key of self.data -> (label, display function)
              hdr_data = {
                  'message':      ('Message:',           grnbg),
                  'network':      ('Network:',           lambda v: v.replace('_', ' ').upper()),
                  'msghash_type': ('Message Hash Type:', lambda v: v),
                  'addrlists':    ('Address Ranges:',    lambda v: fmt_list(v, fmt='bare')),
                  'failed_sids':  ('Failed Seed IDs:',   lambda v: red(fmt_list(v, fmt='bare')))}

              # omit header lines that are irrelevant for this protocol, class or request
              if len(self.msg_cls.msghash_types) == 1:
                  del hdr_data['msghash_type']

              if req_addr or type(self).__name__ == 'exported_sigs':
                  del hdr_data['addrlists']

              if req_addr or not self.data.get('failed_sids'):
                  del hdr_data['failed_sids']

              # label columns are padded to the widest label actually in use
              fs_hdr = '{:%s} {}' % max(len(v[0]) for v in hdr_data.values())
              fs_sig = '%s{:%s} %s{}' % (
                  ' ' * (2 if req_addr else 5),
                  max(len(labels[k]) for v in self.sigs.values() for k in v.keys()),
                  self.msg_cls.sigdata_pfx or ''
              ) if self.sigs else None

              if req_addr:
                  return '\n'.join(gen_single())
              else:
                  return ('' if self.sigs else 'UN') + 'SIGNED MESSAGE DATA:\n\n ' + '\n '.join(gen_all())

      class unsigned(completed):
          """An unsigned message that can be signed with seeds from wallet files"""

          async def sign(self, wallet_files, *, passwd_file=None):
              """
              Sign the message for every address of every address range whose
              Seed ID matches a seed, or a subseed, of one of `wallet_files`.

              Signatures are stored in self.sigs, keyed by MMGen ID.  Seed IDs
              for which no seed was found are reported with msg() and stored in
              self.data['failed_sids'].

              Uses self.do_sign(), which is not defined in this class -
              presumably supplied by the protocol-specific subclass (TODO confirm).
              """

              from .addrlist import KeyAddrList

              async def sign_list(al_in, seed):
                  al = KeyAddrList(
                      cfg         = self.cfg,
                      proto       = self.proto,
                      seed        = seed,
                      addr_idxs   = al_in.idxlist,
                      mmtype      = al_in.mmtype,
                      skip_chksum = True,
                      add_p2pkh   = al_in.mmtype in ('S', 'B'))

                  for e in al.data:
                      sig = await self.do_sign(
                          wif          = e.sec.wif,
                          message      = self.data['message'],
                          msghash_type = self.data['msghash_type'])

                      mmid = f'{al_in.sid}:{al_in.mmtype}:{e.idx}'
                      data = {
                          'addr': e.addr,
                          'sig': sig}

                      if self.msg_cls.include_pubhash:
                          data.update(
                              {'pubhash': self.proto.decode_addr(e.addr_p2pkh or e.addr).bytes.hex()})

                      # NOTE(review): indentation was lost in SOURCE; this check is assumed
                      # to be independent of include_pubhash - confirm
                      if e.addr_p2pkh:
                          data.update({'addr_p2pkh': e.addr_p2pkh})

                      self.sigs[mmid] = data

              if self.proto.sign_mode == 'daemon':
                  from .rpc import rpc_init
                  self.rpc = await rpc_init(self.cfg, self.proto, ignore_wallet=True)

              from .wallet import Wallet
              wallet_seeds = [Wallet(cfg=self.cfg, fn=fn, passwd_file=passwd_file).seed for fn in wallet_files]
              need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
              saved_seeds = []

              # First try wallet seeds.  Iterate over a snapshot: matched IDs are removed
              # from need_sids inside the loop, and removing from a list while iterating
              # over it would skip the ID following each match.
              for sid in tuple(need_sids):
                  for seed in wallet_seeds:
                      if sid == seed.sid:
                          saved_seeds.append(seed)
                          need_sids.remove(sid)
                          break

              # Then subseeds (again over a snapshot, for the same reason):
              for sid in tuple(need_sids):
                  for seed in wallet_seeds:
                      subseed = seed.subseeds.get_subseed_by_seed_id(sid, print_msg=True)
                      if subseed:
                          saved_seeds.append(subseed)
                          need_sids.remove(sid)
                          break

              for al in self.addrlists:
                  for seed in saved_seeds:
                      if al.sid == seed.sid:
                          await sign_list(al, seed)
                          break

              if need_sids:
                  msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids, fmt='bare'))))

              # NOTE(review): indentation was lost in SOURCE; this assignment is assumed
              # to be unconditional (an empty list when nothing failed) - confirm
              self.data['failed_sids'] = need_sids

      class signed(completed):
          """A signed message"""

          ext = 'sigmsg.json'
          signed = True

      class signed_online(signed):
          """A signed message with signature verification and export"""

          def get_sigs(self, addr):
              """
              Return the signature dict, restricted to `addr` if it is true.
              Exits via die() if the result is empty.
              """
              if addr:
                  # exported signatures are keyed by coin address, all others by MMGen ID
                  req_addr = (
                      CoinAddr(self.proto, addr) if type(self).__name__ == 'exported_sigs' else
                      MMGenID(self.proto, addr))
                  sigs = {k: v for k, v in self.sigs.items() if k == req_addr}
              else:
                  sigs = self.sigs

              if not sigs:
                  die(1, 'No signatures')

              return sigs

          async def verify(self, *, addr=None):
              """
              Verify the signature for `addr`, or all signatures if `addr` is
              unset, and return the number of signatures checked.  Exits via
              die() on the first invalid signature.

              Uses self.do_verify(), which is not defined in this class -
              presumably supplied by the protocol-specific subclass (TODO confirm).
              """
              sigs = self.get_sigs(addr)

              if self.proto.sign_mode == 'daemon':
                  from .rpc import rpc_init
                  self.rpc = await rpc_init(self.cfg, self.proto, ignore_wallet=True)

              for k, v in sigs.items():
                  ret = await self.do_verify(
                      # the P2PKH form of the address is preferred when the entry has one
                      addr         = v.get('addr_p2pkh') or v['addr'],
                      sig          = v['sig'],
                      message      = self.data['message'],
                      msghash_type = self.data['msghash_type'])
                  if not ret:
                      die(3, f'Invalid signature for address {k} ({v["addr"]})')

              return len(sigs)

          def get_json_for_export(self, *, addr=None):
              """
              Return indented, key-sorted JSON with the message, hash type,
              upper-cased network ID and the list of signature entries.  If
              msg_cls.sigdata_pfx is set, it is prepended to every value of
              every signature entry.
              """
              sigs = list(self.get_sigs(addr).values())
              pfx = self.msg_cls.sigdata_pfx
              if pfx:
                  sigs = [{k: pfx+v for k, v in e.items()} for e in sigs]
              return json.dumps({
                      'message': self.data['message'],
                      'msghash_type': self.data['msghash_type'],
                      'network': self.data['network'].upper(),
                      'signatures': sigs},
                  sort_keys = True,
                  indent = 4)

      class exported_sigs(signed_online):
          """Signature data loaded from an exported JSON file, keyed by coin address"""

          def __init__(self, infile, *args, **kwargs):
              """
              Load the JSON file `infile` into self.data and build self.sigs from
              its 'signatures' list, keyed by each entry's 'addr' value.  If
              msg_cls.sigdata_pfx is set, len(pfx) leading characters are cut
              from every value of every entry.
              """
              self.data = json.loads(
                  get_data_from_file(
                      cfg = self.cfg,
                      infile = infile,
                      desc = self.desc)
                  )

              pfx = self.msg_cls.sigdata_pfx
              self.sigs = {sig_data['addr']: sig_data for sig_data in (
                      [{k: v[len(pfx):] for k, v in e.items()} for e in self.data['signatures']]
                          if pfx else
                      self.data['signatures']
                  )}
  def _get_obj(clsname, cfg, *args, coin=None, network='mainnet', infile=None, data=None, **kwargs):
      """
      Create and initialize a message object of class `clsname`.

      The class is looked up first in the protocol-specific `coin_msg` class
      (module mmgen.proto.<base_proto_coin>.msg), falling back to this module's
      `coin_msg`.  The object's `msg_cls`, `cfg` and `proto` attributes are set
      before its __init__() is called with `infile`, `data` and any remaining
      keyword args.

      For clsname 'signed', `data` is required and supplies the protocol via
      data['proto'].  For all other classes, exactly one of `coin` and `infile`
      must be given: the protocol is initialized from `coin` and `network`, or
      read from the file.

      Exits via die() if the protocol has no message signing module.
      """

      assert not args, 'msg:_get_obj(): only keyword args allowed'

      if clsname == 'signed':
          assert data and not (coin or infile), 'msg:_get_obj(): chk2'
      else:
          assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3'

      proto = (
          data['proto'] if data else
          init_proto(cfg=cfg, coin=coin, network=network) if coin else
          coin_msg.base.get_proto_from_file(cfg, infile))

      try:
          msg_cls = getattr(
              importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.msg'),
              'coin_msg')
      except (ImportError, AttributeError):
          # Only a missing module or attribute means "unsupported".  A bare except
          # here would also swallow KeyboardInterrupt and SystemExit.
          die(1, f'Message signing operations not supported for {proto.base_proto} protocol')

      # allocate without calling __init__(), which relies on the attributes set below
      me = MMGenObject.__new__(getattr(msg_cls, clsname, getattr(coin_msg, clsname)))
      me.msg_cls = msg_cls
      me.cfg = cfg
      me.proto = proto

      me.__init__(*args, infile=infile, data=data, **kwargs)

      return me
  def _get(clsname):
      """Return a factory function that forwards its arguments to _get_obj() for class `clsname`"""
      def factory(*args, **kwargs):
          return _get_obj(clsname, *args, **kwargs)
      return factory
  # Public factory functions, one per message class of coin_msg
  (
      NewMsg,
      CompletedMsg,
      UnsignedMsg,
      SignedMsg,
      SignedOnlineMsg,
      ExportedMsgSigs,
  ) = (
      _get(clsname) for clsname in (
          'new',
          'completed',
          'unsigned',
          'signed',
          'signed_online',
          'exported_sigs')
  )