msg.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
  4. # Copyright (C)2013-2023 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen
  9. # https://gitlab.com/mmgen/mmgen
  10. """
  11. msg: base message signing classes
  12. """
  13. import os,importlib,json
  14. from .cfg import gc
  15. from .objmethods import MMGenObject,HiliteStr,InitErrors
  16. from .util import msg,die,make_chksum_6,fmt_list,remove_dups
  17. from .color import red,orange,grnbg
  18. from .protocol import init_proto
  19. from .fileutil import get_data_from_file,write_data_to_file
  20. from .addr import MMGenID,CoinAddr
  21. class MMGenIDRange(HiliteStr,InitErrors,MMGenObject):
  22. """
  23. closely based on MMGenID
  24. """
  25. color = 'orange'
  26. width = 0
  27. trunc_ok = False
  28. def __new__(cls,proto,id_str):
  29. from .addrlist import AddrIdxList
  30. from .addr import AddrListID
  31. from .seed import SeedID
  32. try:
  33. ss = str(id_str).split(':')
  34. assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
  35. t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2])
  36. me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
  37. me.sid = SeedID(sid=ss[0])
  38. me.idxlist = AddrIdxList(ss[-1])
  39. me.mmtype = t
  40. assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
  41. me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
  42. me.proto = proto
  43. return me
  44. except Exception as e:
  45. return cls.init_fail(e,id_str)
  46. class coin_msg:
  47. class base(MMGenObject):
  48. ext = 'rawmsg.json'
  49. signed = False
  50. chksum_keys = ('addrlists','message','msghash_type','network')
  51. @property
  52. def desc(self):
  53. return ('signed' if self.signed else 'unsigned') + ' message data'
  54. @property
  55. def chksum(self):
  56. return make_chksum_6(
  57. json.dumps(
  58. {k:self.data[k] for k in self.chksum_keys},
  59. sort_keys = True,
  60. separators = (',', ':')
  61. ))
  62. @property
  63. def filename_stem(self):
  64. coin,network = self.data['network'].split('_')
  65. return '{}[{}]{}'.format(
  66. self.chksum.upper(),
  67. coin.upper(),
  68. ('' if network == 'mainnet' else '.'+network) )
  69. @property
  70. def filename(self):
  71. return f'{self.filename_stem}.{self.ext}'
  72. @property
  73. def signed_filename(self):
  74. return f'{self.filename_stem}.{coin_msg.signed.ext}'
  75. @staticmethod
  76. def get_proto_from_file(cfg,filename):
  77. data = json.loads(get_data_from_file(cfg, filename))
  78. network_id = data['metadata']['network'] if 'metadata' in data else data['network'].lower()
  79. coin,network = network_id.split('_')
  80. return init_proto(cfg=cfg, coin=coin, network=network)
  81. def write_to_file(self,outdir=None,ask_overwrite=False):
  82. data = {
  83. 'id': f'{gc.proj_name} {self.desc}',
  84. 'metadata': self.data,
  85. 'signatures': self.sigs,
  86. }
  87. write_data_to_file(
  88. cfg = self.cfg,
  89. outfile = os.path.join(outdir or '',self.filename),
  90. data = json.dumps(data,sort_keys=True,indent=4),
  91. desc = self.desc,
  92. ask_overwrite = ask_overwrite )
  93. class new(base):
  94. def __init__(self,message,addrlists,msghash_type,*args,**kwargs):
  95. msghash_type = msghash_type or self.msg_cls.msghash_types[0]
  96. if msghash_type not in self.msg_cls.msghash_types:
  97. die(2,f'msghash_type {msghash_type!r} not supported for {self.proto.base_proto} protocol')
  98. self.data = {
  99. 'network': '{}_{}'.format( self.proto.coin.lower(), self.proto.network ),
  100. 'addrlists': [MMGenIDRange(self.proto,i) for i in addrlists.split()],
  101. 'message': message,
  102. 'msghash_type': msghash_type,
  103. }
  104. self.sigs = {}
  105. class completed(base):
  106. def __init__(self,data,infile,*args,**kwargs):
  107. if data:
  108. self.__dict__ = data
  109. return
  110. self.data = get_data_from_file(
  111. cfg = self.cfg,
  112. infile = infile,
  113. desc = self.desc )
  114. d = json.loads(self.data)
  115. self.data = d['metadata']
  116. self.sigs = d['signatures']
  117. self.addrlists = [MMGenIDRange(self.proto,i) for i in self.data['addrlists']]
  118. def format(self,req_addr=None):
  119. labels = {
  120. 'addr': 'address:',
  121. 'addr_p2pkh': 'addr_p2pkh:',
  122. 'pubhash': 'pubkey hash:',
  123. 'sig': 'signature:',
  124. }
  125. def gen_entry(e):
  126. for k in labels:
  127. if e.get(k):
  128. yield fs_sig.format( labels[k], e[k] )
  129. def gen_all():
  130. for k,v in hdr_data.items():
  131. yield fs_hdr.format( v[0], v[1](self.data[k]) )
  132. if self.sigs:
  133. yield ''
  134. yield 'Signatures:'
  135. for n,(k,v) in enumerate(self.sigs.items()):
  136. yield ''
  137. yield f'{n+1:>3}) {k}'
  138. for res in gen_entry(v):
  139. yield res
  140. def gen_single():
  141. for k,v in hdr_data.items():
  142. yield fs_hdr.format( v[0], v[1](self.data[k]) )
  143. if self.sigs:
  144. yield 'Signature data:'
  145. k = (
  146. CoinAddr(self.proto,req_addr) if type(self).__name__ == 'exported_sigs' else
  147. MMGenID(self.proto,req_addr) )
  148. if k not in self.sigs:
  149. die(1,f'{k}: address not found in signature data')
  150. for res in gen_entry(self.sigs[k]):
  151. yield res
  152. hdr_data = {
  153. 'message': ('Message:', grnbg ),
  154. 'network': ('Network:', lambda v: v.replace('_',' ').upper() ),
  155. 'msghash_type': ('Message Hash Type:', lambda v: v ),
  156. 'addrlists': ('Address Ranges:', lambda v: fmt_list(v,fmt='bare') ),
  157. 'failed_sids': ('Failed Seed IDs:', lambda v: red(fmt_list(v,fmt='bare')) ),
  158. }
  159. if len(self.msg_cls.msghash_types) == 1:
  160. del hdr_data['msghash_type']
  161. if req_addr or type(self).__name__ == 'exported_sigs':
  162. del hdr_data['addrlists']
  163. if req_addr or not self.data.get('failed_sids'):
  164. del hdr_data['failed_sids']
  165. fs_hdr = '{:%s} {}' % max(len(v[0]) for v in hdr_data.values())
  166. fs_sig = '%s{:%s} %s{}' % (
  167. ' ' * (2 if req_addr else 5),
  168. max(len(labels[k]) for v in self.sigs.values() for k in v.keys()),
  169. self.msg_cls.sigdata_pfx or ''
  170. ) if self.sigs else None
  171. if req_addr:
  172. return '\n'.join(gen_single())
  173. else:
  174. return ('' if self.sigs else 'UN') + 'SIGNED MESSAGE DATA:\n\n ' + '\n '.join(gen_all())
  175. class unsigned(completed):
  176. async def sign(self,wallet_files):
  177. from .addrlist import KeyAddrList
  178. async def sign_list(al_in,seed):
  179. al = KeyAddrList(
  180. cfg = self.cfg,
  181. proto = self.proto,
  182. seed = seed,
  183. addr_idxs = al_in.idxlist,
  184. mmtype = al_in.mmtype,
  185. skip_chksum = True,
  186. add_p2pkh = al_in.mmtype in ('S','B') )
  187. for e in al.data:
  188. sig = await self.do_sign(
  189. wif = e.sec.wif,
  190. message = self.data['message'],
  191. msghash_type = self.data['msghash_type'] )
  192. mmid = f'{al_in.sid}:{al_in.mmtype}:{e.idx}'
  193. data = {
  194. 'addr': e.addr,
  195. 'sig': sig,
  196. }
  197. if self.msg_cls.include_pubhash:
  198. data.update({ 'pubhash': self.proto.decode_addr(e.addr_p2pkh or e.addr).bytes.hex() })
  199. if e.addr_p2pkh:
  200. data.update({'addr_p2pkh': e.addr_p2pkh})
  201. self.sigs[mmid] = data
  202. if self.proto.sign_mode == 'daemon':
  203. from .rpc import rpc_init
  204. self.rpc = await rpc_init( self.cfg, self.proto, ignore_wallet=True )
  205. from .wallet import Wallet
  206. wallet_seeds = [Wallet(cfg=self.cfg,fn=fn).seed for fn in wallet_files]
  207. need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
  208. saved_seeds = []
  209. # First try wallet seeds:
  210. for sid in need_sids:
  211. for seed in wallet_seeds:
  212. if sid == seed.sid:
  213. saved_seeds.append(seed)
  214. need_sids.remove(sid)
  215. break
  216. # Then subseeds:
  217. for sid in need_sids:
  218. for seed in wallet_seeds:
  219. subseed = seed.subseeds.get_subseed_by_seed_id(sid,print_msg=True)
  220. if subseed:
  221. saved_seeds.append(subseed)
  222. need_sids.remove(sid)
  223. break
  224. for al in self.addrlists:
  225. for seed in saved_seeds:
  226. if al.sid == seed.sid:
  227. await sign_list(al,seed)
  228. break
  229. if need_sids:
  230. msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids,fmt='bare'))))
  231. self.data['failed_sids'] = need_sids
  232. class signed(completed):
  233. ext = 'sigmsg.json'
  234. signed = True
  235. class signed_online(signed):
  236. def get_sigs(self,addr):
  237. if addr:
  238. req_addr = (
  239. CoinAddr(self.proto,addr) if type(self).__name__ == 'exported_sigs' else
  240. MMGenID(self.proto,addr) )
  241. sigs = {k:v for k,v in self.sigs.items() if k == req_addr}
  242. else:
  243. sigs = self.sigs
  244. if not sigs:
  245. die(1,'No signatures')
  246. return sigs
  247. async def verify(self,addr=None):
  248. sigs = self.get_sigs(addr)
  249. if self.proto.sign_mode == 'daemon':
  250. from .rpc import rpc_init
  251. self.rpc = await rpc_init( self.cfg, self.proto, ignore_wallet=True )
  252. for k,v in sigs.items():
  253. ret = await self.do_verify(
  254. addr = v.get('addr_p2pkh') or v['addr'],
  255. sig = v['sig'],
  256. message = self.data['message'],
  257. msghash_type = self.data['msghash_type'] )
  258. if not ret:
  259. die(3,f'Invalid signature for address {k} ({v["addr"]})')
  260. return len(sigs)
  261. def get_json_for_export(self,addr=None):
  262. sigs = list( self.get_sigs(addr).values() )
  263. pfx = self.msg_cls.sigdata_pfx
  264. if pfx:
  265. sigs = [{k:pfx+v for k,v in e.items()} for e in sigs]
  266. return json.dumps(
  267. {
  268. 'message': self.data['message'],
  269. 'msghash_type': self.data['msghash_type'],
  270. 'network': self.data['network'].upper(),
  271. 'signatures': sigs,
  272. },
  273. sort_keys = True,
  274. indent = 4
  275. )
  276. class exported_sigs(signed_online):
  277. def __init__(self,infile,*args,**kwargs):
  278. self.data = json.loads(
  279. get_data_from_file(
  280. cfg = self.cfg,
  281. infile = infile,
  282. desc = self.desc )
  283. )
  284. pfx = self.msg_cls.sigdata_pfx
  285. self.sigs = {sig_data['addr']:sig_data for sig_data in (
  286. [{k:v[len(pfx):] for k,v in e.items()} for e in self.data['signatures']]
  287. if pfx else
  288. self.data['signatures']
  289. )}
  290. def _get_obj(clsname,cfg,*args,coin=None,network='mainnet',infile=None,data=None,**kwargs):
  291. assert not args, 'msg:_get_obj(): only keyword args allowed'
  292. if clsname == 'signed':
  293. assert data and not (coin or infile), 'msg:_get_obj(): chk2'
  294. else:
  295. assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3'
  296. proto = (
  297. data['proto'] if data else
  298. init_proto( cfg=cfg, coin=coin, network=network ) if coin else
  299. coin_msg.base.get_proto_from_file(cfg,infile) )
  300. try:
  301. msg_cls = getattr(
  302. importlib.import_module(f'mmgen.proto.{proto.base_proto_coin.lower()}.msg'),
  303. 'coin_msg' )
  304. except:
  305. die(1,f'Message signing operations not supported for {proto.base_proto} protocol')
  306. me = MMGenObject.__new__(getattr( msg_cls, clsname, getattr(coin_msg,clsname) ))
  307. me.msg_cls = msg_cls
  308. me.cfg = cfg
  309. me.proto = proto
  310. me.__init__(infile=infile,data=data,*args,**kwargs)
  311. return me
  312. def _get(clsname):
  313. return lambda *args,**kwargs: _get_obj(clsname,*args,**kwargs)
  314. NewMsg = _get('new')
  315. CompletedMsg = _get('completed')
  316. UnsignedMsg = _get('unsigned')
  317. SignedMsg = _get('signed')
  318. SignedOnlineMsg = _get('signed_online')
  319. ExportedMsgSigs = _get('exported_sigs')