msg.py 8.6 KB


  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen
  9. # https://gitlab.com/mmgen/mmgen
  10. """
  11. msg: base message signing classes
  12. """
  13. import os,importlib,json
  14. from .globalvars import g
  15. from .objmethods import MMGenObject,Hilite,InitErrors
  16. from .util import msg,vmsg,die,suf,make_chksum_6,fmt_list,remove_dups
  17. from .color import red,orange,grnbg
  18. from .protocol import init_proto
  19. from .fileutil import get_data_from_file,write_data_to_file
  20. from .addr import MMGenID
  21. class MMGenIDRange(str,Hilite,InitErrors,MMGenObject):
  22. """
  23. closely based on MMGenID
  24. """
  25. color = 'orange'
  26. width = 0
  27. trunc_ok = False
  28. def __new__(cls,proto,id_str):
  29. from .addrlist import AddrIdxList
  30. from .addr import AddrListID
  31. from .seed import SeedID
  32. try:
  33. ss = str(id_str).split(':')
  34. assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
  35. t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2])
  36. me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
  37. me.sid = SeedID(sid=ss[0])
  38. me.idxlist = AddrIdxList(ss[-1])
  39. me.mmtype = t
  40. assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
  41. me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
  42. me.proto = proto
  43. return me
  44. except Exception as e:
  45. return cls.init_fail(e,id_str)
  46. class coin_msg:
  47. class base(MMGenObject):
  48. ext = 'rawmsg.json'
  49. signed = False
  50. @property
  51. def desc(self):
  52. return ('signed' if self.signed else 'unsigned') + ' message data'
  53. @property
  54. def chksum(self):
  55. return make_chksum_6(
  56. json.dumps( self.data, sort_keys=True, separators=(',', ':') ))
  57. @property
  58. def filename_stem(self):
  59. coin,network = self.data['network'].split('_')
  60. return '{}[{}]{}'.format(
  61. self.chksum.upper(),
  62. coin.upper(),
  63. ('' if network == 'mainnet' else '.'+network) )
  64. @property
  65. def filename(self):
  66. return f'{self.filename_stem}.{self.ext}'
  67. @property
  68. def signed_filename(self):
  69. return f'{self.filename_stem}.{coin_msg.signed.ext}'
  70. def get_proto_from_file(self,filename):
  71. coin,network = json.loads(get_data_from_file(filename))['metadata']['network'].split('_')
  72. return init_proto( coin=coin, network=network )
  73. def write_to_file(self,outdir=None,ask_overwrite=False):
  74. data = {
  75. 'id': f'{g.proj_name} {self.desc}',
  76. 'metadata': self.data,
  77. 'signatures': self.sigs,
  78. }
  79. if hasattr(self,'failed_sids'):
  80. data.update({'failed_seed_ids':self.failed_sids})
  81. write_data_to_file(
  82. outfile = os.path.join(outdir or '',self.filename),
  83. data = json.dumps(data,sort_keys=True,indent=4),
  84. desc = f'{self.desc} data',
  85. ask_overwrite = ask_overwrite )
  86. class new(base):
  87. def __init__(self,message,addrlists,*args,**kwargs):
  88. self.data = {
  89. 'network': '{}_{}'.format( self.proto.coin.lower(), self.proto.network ),
  90. 'addrlists': [MMGenIDRange(self.proto,i) for i in addrlists.split()],
  91. 'message': message,
  92. }
  93. self.sigs = {}
  94. class completed(base):
  95. def __init__(self,data,infile,*args,**kwargs):
  96. if data:
  97. self.__dict__ = data
  98. return
  99. self.infile = infile
  100. self.data = get_data_from_file(
  101. infile = self.infile,
  102. desc = f'{self.desc} data' )
  103. d = json.loads(self.data)
  104. self.data = d['metadata']
  105. self.sigs = d['signatures']
  106. self.addrlists = [MMGenIDRange(self.proto,i) for i in self.data['addrlists']]
  107. if d.get('failed_seed_ids'):
  108. self.failed_sids = d['failed_seed_ids']
  109. def format(self,mmid=None):
  110. def gen_entry(e):
  111. yield fs2.format( 'addr:', e['addr'] )
  112. if e.get('addr_p2pkh'):
  113. yield fs2.format( 'addr_p2pkh:', e['addr_p2pkh'] )
  114. if e.get('pubhash'):
  115. yield fs2.format( 'pubkey hash:', e['pubhash'] )
  116. yield fs2.format('sig:', e['sig'] )
  117. def gen_all():
  118. fs = '{:16s} {}'
  119. for k,v in disp_data.items():
  120. yield fs.format( v[0]+':', v[1](self.data[k]) )
  121. if hasattr(self,'failed_sids'):
  122. yield fs.format(
  123. 'Failed Seed IDs:',
  124. red(fmt_list(self.failed_sids,fmt='bare')) )
  125. if self.sigs:
  126. yield ''
  127. yield 'Signatures:'
  128. for n,(k,v) in enumerate(self.sigs.items()):
  129. yield ''
  130. yield '{:>3}) {}'.format(n+1,k)
  131. for res in gen_entry(v):
  132. yield res
  133. def gen_single():
  134. fs = '{:8s} {}'
  135. for k,v in disp_data.items():
  136. yield fs.format( v[0]+':', v[1](self.data[k]) )
  137. if self.sigs:
  138. yield 'Signature data:'
  139. k = MMGenID(self.proto,mmid)
  140. if k not in self.sigs:
  141. die(1,f'{k}: address not found in signature data')
  142. for res in gen_entry(self.sigs[k]):
  143. yield res
  144. disp_data = {
  145. 'message': ('Message', lambda v: grnbg(v) ),
  146. 'network': ('Network', lambda v: v.replace('_',' ').upper() ),
  147. 'addrlists': ('Address Ranges', lambda v: fmt_list(v,fmt='bare') ),
  148. }
  149. if mmid:
  150. del disp_data['addrlists']
  151. fs2 = ' {:12s} {}'
  152. return '\n'.join(gen_single())
  153. else:
  154. fs2 = ' {:12s} {}'
  155. return (
  156. '{}SIGNED MESSAGE DATA:\n\n '.format('' if self.sigs else 'UN') +
  157. '\n '.join(gen_all()) )
  158. class unsigned(completed):
  159. async def sign(self,wallet_files):
  160. async def sign_list(al_in,seed):
  161. al = KeyAddrList(
  162. proto = self.proto,
  163. seed = seed,
  164. addr_idxs = al_in.idxlist,
  165. mmtype = al_in.mmtype,
  166. skip_chksum = True,
  167. add_p2pkh = al_in.mmtype in ('S','B') )
  168. for e in al.data:
  169. sig = await self.do_sign(
  170. wif = e.sec.wif,
  171. message = self.data['message'] )
  172. mmid = '{}:{}:{}'.format( al_in.sid, al_in.mmtype, e.idx )
  173. data = {
  174. 'addr': e.addr,
  175. 'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex(),
  176. 'sig': sig,
  177. }
  178. if e.addr_p2pkh:
  179. data.update({'addr_p2pkh': e.addr_p2pkh})
  180. self.sigs[mmid] = data
  181. from .rpc import rpc_init
  182. self.rpc = await rpc_init(self.proto)
  183. from .wallet import Wallet
  184. from .addrlist import KeyAddrList
  185. wallet_seeds = [Wallet(fn=fn).seed for fn in wallet_files]
  186. need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
  187. saved_seeds = list()
  188. # First try wallet seeds:
  189. for sid in need_sids:
  190. for seed in wallet_seeds:
  191. if sid == seed.sid:
  192. saved_seeds.append(seed)
  193. need_sids.remove(sid)
  194. break
  195. # Then subseeds:
  196. for sid in need_sids:
  197. for seed in wallet_seeds:
  198. subseed = seed.subseeds.get_subseed_by_seed_id(sid,print_msg=True)
  199. if subseed:
  200. saved_seeds.append(subseed)
  201. need_sids.remove(sid)
  202. break
  203. for al in self.addrlists:
  204. for seed in saved_seeds:
  205. if al.sid == seed.sid:
  206. await sign_list(al,seed)
  207. break
  208. if need_sids:
  209. msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids,fmt='bare'))))
  210. self.failed_sids = need_sids
  211. class signed(completed):
  212. ext = 'sigmsg.json'
  213. signed = True
  214. class signed_online(signed):
  215. async def verify(self,addr=None,summary=False):
  216. if addr:
  217. mmaddr = MMGenID(self.proto,addr)
  218. sigs = {k:v for k,v in self.sigs.items() if k == mmaddr}
  219. else:
  220. sigs = self.sigs
  221. if sigs:
  222. from .rpc import rpc_init
  223. self.rpc = await rpc_init(self.proto)
  224. for k,v in sigs.items():
  225. ret = await self.do_verify(
  226. addr = v.get('addr_p2pkh') or v['addr'],
  227. sig = v['sig'],
  228. message = self.data['message'] )
  229. if not ret:
  230. die(3,f'Invalid signature for address {k} ({v["addr"]})')
  231. if summary:
  232. msg('{} signature{} verified'.format( len(sigs), suf(sigs) ))
  233. else:
  234. die(1,'No signatures')
  235. def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs):
  236. assert not args, 'msg:_get_obj(): only keyword args allowed'
  237. if clsname == 'signed':
  238. assert data and not (coin or infile), 'msg:_get_obj(): chk2'
  239. else:
  240. assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3'
  241. proto = (
  242. data['proto'] if data else
  243. init_proto( coin=coin, network=network ) if coin else
  244. coin_msg.base().get_proto_from_file(infile) )
  245. cls = getattr(
  246. getattr(importlib.import_module(f'mmgen.base_proto.{proto.base_proto.lower()}.msg'),'coin_msg'),
  247. clsname )
  248. me = MMGenObject.__new__(cls)
  249. me.proto = proto
  250. me.__init__(infile=infile,data=data,*args,**kwargs)
  251. return me
  252. def _get(clsname):
  253. return lambda *args,**kwargs: _get_obj(clsname,*args,**kwargs)
  254. NewMsg = _get('new')
  255. CompletedMsg = _get('completed')
  256. UnsignedMsg = _get('unsigned')
  257. SignedMsg = _get('signed')
  258. SignedOnlineMsg = _get('signed_online')