Browse Source

message signing: low-level infrastructure and unit test

- Messages can be signed for arbitrary lists or ranges of addresses.
- Addresses and ranges are specified using a SEED_ID:ADDR_TYPE:IDX_RANGE
  selector.
- For segwit addresses, signature data includes a pubkey hash and p2pkh address
  sharing the same key to allow verification of messages with external tools,
  e.g. `bitcoin-cli verifymessage`.

Testing:

    $ test/unit_tests.py -av msg
The MMGen Project 3 years ago
parent
commit
86e3b273
6 changed files with 450 additions and 1 deletions
  1. 7 0
      mmgen/addrlist.py
  2. 35 0
      mmgen/base_proto/bitcoin/msg.py
  3. 1 1
      mmgen/data/version
  4. 305 0
      mmgen/msg.py
  5. 3 0
      mmgen/util.py
  6. 99 0
      test/unit_tests_d/ut_msg.py

+ 7 - 0
mmgen/addrlist.py

@@ -69,6 +69,7 @@ class AddrListEntryBase(MMGenListItem):
 
 class AddrListEntry(AddrListEntryBase):
 	addr          = ListItemAttr(CoinAddr,include_proto=True)
+	addr_p2pkh    = ListItemAttr(CoinAddr,include_proto=True)
 	idx           = ListItemAttr(AddrIdx) # not present in flat addrlists
 	label         = ListItemAttr(TwComment,reassign_ok=True)
 	sec           = ListItemAttr(PrivKey,include_proto=True)
@@ -148,9 +149,11 @@ class AddrList(MMGenObject): # Address info for a single seed ID
 			mmtype    = None,
 			skip_key_address_validity_check = False,
 			skip_chksum = False,
+			add_p2pkh = False,
 		):
 
 		self.skip_ka_check = skip_key_address_validity_check
+		self.add_p2pkh = add_p2pkh
 		self.proto = proto
 		do_chksum = False
 
@@ -227,6 +230,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
 			from .addr import KeyGenerator,AddrGenerator
 			kg = KeyGenerator( self.proto, mmtype.pubkey_type )
 			ag = AddrGenerator( self.proto, mmtype )
+			if self.add_p2pkh:
+				ag2 = AddrGenerator( self.proto, 'compressed' )
 
 		t_addrs,out = ( len(addr_idxs), AddrListData() )
 		le = self.entry_type
@@ -259,6 +264,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
 			if self.gen_addrs:
 				data = kg.gen_data(e.sec)
 				e.addr = ag.to_addr(data)
+				if self.add_p2pkh:
+					e.addr_p2pkh = ag2.to_addr(data)
 				if gen_viewkey:
 					e.viewkey = ag.to_viewkey(data)
 				if gen_wallet_passwd:

+ 35 - 0
mmgen/base_proto/bitcoin/msg.py

@@ -0,0 +1,35 @@
+#!/usr/bin/env python3
+#
+# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
+# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
+# Licensed under the GNU General Public License, Version 3:
+#   https://www.gnu.org/licenses
+# Public project repositories:
+#   https://github.com/mmgen/mmgen
+#   https://gitlab.com/mmgen/mmgen
+
+"""
+base_proto.bitcoin.msg: Bitcoin base protocol message signing classes
+"""
+
+from ...msg import coin_msg
+
+class coin_msg(coin_msg):
+
+	class base(coin_msg.base): pass
+
+	class new(base,coin_msg.new): pass
+
+	class completed(base,coin_msg.completed): pass
+
+	class unsigned(completed,coin_msg.unsigned):
+
+		async def do_sign(self,wif,message):
+			return await self.rpc.call( 'signmessagewithprivkey', wif, message )
+
+	class signed(completed,coin_msg.signed): pass
+
+	class signed_online(signed,coin_msg.signed_online):
+
+		async def do_verify(self,addr,sig,message):
+			return await self.rpc.call( 'verifymessage', addr, sig, message )

+ 1 - 1
mmgen/data/version

@@ -1 +1 @@
-13.1.dev20
+13.1.dev21

+ 305 - 0
mmgen/msg.py

@@ -0,0 +1,305 @@
+#!/usr/bin/env python3
+#
+# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
+# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
+# Licensed under the GNU General Public License, Version 3:
+#   https://www.gnu.org/licenses
+# Public project repositories:
+#   https://github.com/mmgen/mmgen
+#   https://gitlab.com/mmgen/mmgen
+
+"""
+msg: base message signing classes
+"""
+
+import os,importlib,json
+from .globalvars import g
+from .objmethods import MMGenObject,Hilite,InitErrors
+from .util import msg,vmsg,die,suf,make_chksum_6,fmt_list,remove_dups
+from .color import orange
+from .protocol import init_proto
+from .fileutil import get_data_from_file,write_data_to_file
+from .addr import MMGenID
+
+class MMGenIDRange(str,Hilite,InitErrors,MMGenObject):
+	"""
+	closely based on MMGenID
+	"""
+	color = 'orange'
+	width = 0
+	trunc_ok = False
+	def __new__(cls,proto,id_str):
+		from .addrlist import AddrIdxList
+		from .addr import AddrListID
+		from .seed import SeedID
+		try:
+			ss = str(id_str).split(':')
+			assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
+			t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2])
+			me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
+			me.sid = SeedID(sid=ss[0])
+			me.idxlist = AddrIdxList(ss[-1])
+			me.mmtype = t
+			assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
+			me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
+			me.proto = proto
+			return me
+		except Exception as e:
+			return cls.init_fail(e,id_str)
+
+class coin_msg:
+
+	class base(MMGenObject):
+
+		ext = 'rawmsg.json'
+		signed = False
+
+		@property
+		def desc(self):
+			return ('signed' if self.signed else 'unsigned') + ' message data'
+
+		@property
+		def chksum(self):
+			return make_chksum_6(
+				json.dumps( self.data, sort_keys=True, separators=(',', ':') ))
+
+		@property
+		def filename_stem(self):
+			coin,network = self.data['network'].split('_')
+			return '{}[{}]{}'.format(
+				self.chksum.upper(),
+				coin.upper(),
+				('' if network == 'mainnet' else '.'+network) )
+
+		@property
+		def filename(self):
+			return f'{self.filename_stem}.{self.ext}'
+
+		@property
+		def signed_filename(self):
+			return f'{self.filename_stem}.{coin_msg.signed.ext}'
+
+		def get_proto_from_file(self,filename):
+			coin,network = json.loads(get_data_from_file(filename))['metadata']['network'].split('_')
+			return init_proto( coin=coin, network=network )
+
+		def write_to_file(self,outdir=None,ask_overwrite=False):
+			data = {
+				'id': f'{g.proj_name} {self.desc}',
+				'metadata': self.data,
+				'signatures': self.sigs,
+			}
+
+			if hasattr(self,'failed_sids'):
+				data.update({'failed_seed_ids':self.failed_sids})
+
+			write_data_to_file(
+				outfile       = os.path.join(outdir or '',self.filename),
+				data          = json.dumps(data,sort_keys=True,indent=4),
+				desc          = f'{self.desc} data',
+				ask_overwrite = ask_overwrite )
+
+	class new(base):
+
+		def __init__(self,message,addrlists,*args,**kwargs):
+			self.data = {
+				'network': '{}_{}'.format( self.proto.coin.lower(), self.proto.network ),
+				'addrlists': [MMGenIDRange(self.proto,i) for i in addrlists.split()],
+				'message': message,
+			}
+			self.sigs = {}
+
+	class completed(base):
+
+		def __init__(self,data,infile,*args,**kwargs):
+
+			if data:
+				self.__dict__ = data
+				return
+			elif infile:
+				self.infile = infile
+
+			self.data = get_data_from_file(
+				infile = self.infile,
+				desc   = f'{self.desc} data' )
+
+			d = json.loads(self.data)
+			self.data = d['metadata']
+			self.sigs = d['signatures']
+			self.addrlists = [MMGenIDRange(self.proto,i) for i in self.data['addrlists']]
+			if d.get('failed_seed_ids'):
+				self.failed_seed_ids = d['failed_seed_ids']
+
+		def format(self,mmid=None):
+
+			def gen_entry(e):
+				yield fs2.format( 'addr:', e['addr'] )
+				if e.get('addr_p2pkh'):
+					yield fs2.format( 'addr_p2pkh:', e['addr_p2pkh'] )
+				if e.get('pubhash'):
+					yield fs2.format( 'pubkey hash:', e['pubhash'] )
+				yield fs2.format('sig:', e['sig'] )
+
+			def gen_all():
+				fs = '{:16s} {}'
+				for k,v in disp_data.items():
+					yield fs.format( v[0]+':', v[1](self.data[k]) )
+				if hasattr(self,'failed_seed_ids'):
+					yield fs.format(
+						'Failed Seed IDs:',
+						fmt_list(self.failed_seed_ids,fmt='bare') )
+				yield ''
+				yield 'Signatures:'
+				for n,(k,v) in enumerate(self.sigs.items()):
+					yield ''
+					yield '{:>3}) {}'.format(n+1,k)
+					for res in gen_entry(v):
+						yield res
+
+			def gen_single():
+				fs = '{:8s} {}'
+				for k,v in disp_data.items():
+					yield fs.format( v[0]+':', v[1](self.data[k]) )
+				yield 'Signature data:'
+				k = MMGenID(self.proto,mmid)
+				if k not in self.sigs:
+					die(1,f'{k}: address not found in signature data')
+				for res in gen_entry(self.sigs[k]):
+					yield res
+
+			disp_data = {
+				'message':   ('Message',        lambda v: repr(v) ),
+				'network':   ('Network',        lambda v: v.replace('_',' ').upper() ),
+				'addrlists': ('Address Ranges', lambda v: fmt_list(v,fmt='bare') ),
+			}
+
+			if mmid:
+				del disp_data['addrlists']
+				fs2 = '  {:12s} {}'
+				return '\n'.join(gen_single())
+			else:
+				fs2 = '     {:12s} {}'
+				return 'SIGNED MESSAGE DATA:\n\n  ' + '\n  '.join(gen_all())
+
+	class unsigned(completed):
+
+		async def sign(self,wallet_files):
+
+			async def sign_list(al_in,seed):
+				al = KeyAddrList(
+					proto       = self.proto,
+					seed        = seed,
+					addr_idxs   = al_in.idxlist,
+					mmtype      = al_in.mmtype,
+					skip_chksum = True,
+					add_p2pkh   = al_in.mmtype in ('S','B') )
+
+				for e in al.data:
+					sig = await self.do_sign(
+						wif     = e.sec.wif,
+						message = self.data['message'] )
+
+					mmid = '{}:{}:{}'.format( al_in.sid, al_in.mmtype, e.idx )
+					data = {
+						'addr': e.addr,
+						'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex(),
+						'sig': sig,
+					}
+
+					if e.addr_p2pkh:
+						data.update({'addr_p2pkh': e.addr_p2pkh})
+
+					self.sigs[mmid] = data
+
+			from .rpc import rpc_init
+			self.rpc = await rpc_init(self.proto)
+
+			from .wallet import Wallet
+			from .addrlist import KeyAddrList
+			wallet_seeds = [Wallet(fn=fn).seed for fn in wallet_files]
+			need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
+			saved_seeds = list()
+
+			# First try wallet seeds:
+			for sid in need_sids:
+				for seed in wallet_seeds:
+					if sid == seed.sid:
+						saved_seeds.append(seed)
+						need_sids.remove(sid)
+						break
+
+			# Then subseeds:
+			for sid in need_sids:
+				for seed in wallet_seeds:
+					subseed = seed.subseeds.get_subseed_by_seed_id(sid,print_msg=True)
+					if subseed:
+						saved_seeds.append(subseed)
+						need_sids.remove(sid)
+						break
+
+			for al in self.addrlists:
+				for seed in saved_seeds:
+					if al.sid == seed.sid:
+						await sign_list(al,seed)
+						break
+
+			if need_sids:
+				msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids,fmt='bare'))))
+
+			self.failed_sids = need_sids
+
+	class signed(completed):
+
+		ext = 'sigmsg.json'
+		signed = True
+
+	class signed_online(signed):
+
+		async def verify(self):
+
+			from .rpc import rpc_init
+			self.rpc = await rpc_init(self.proto)
+
+			for k,v in self.sigs.items():
+				ret = await self.do_verify(
+					addr    = v.get('addr_p2pkh') or v['addr'],
+					sig     = v['sig'],
+					message = self.data['message'] )
+				if not ret:
+					die(3,f'Invalid signature for address {k} ({v["addr"]})')
+
+			vmsg('{} signature{} verified'.format( len(self.sigs), suf(self.sigs) ))
+
+def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs):
+
+	assert not args, 'msg:_get_obj(): only keyword args allowed'
+
+	if clsname == 'signed':
+		assert data and not (coin or infile), 'msg:_get_obj(): chk2'
+	else:
+		assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3'
+
+	proto = (
+		data['proto'] if data else
+		init_proto( coin=coin, network=network ) if coin else
+		coin_msg.base().get_proto_from_file(infile) )
+
+	cls = getattr(
+		getattr(importlib.import_module(f'mmgen.base_proto.{proto.base_proto.lower()}.msg'),'coin_msg'),
+		clsname )
+
+	me = MMGenObject.__new__(cls)
+	me.proto = proto
+
+	me.__init__(infile=infile,data=data,*args,**kwargs)
+
+	return me
+
+def _get(clsname):
+	return lambda *args,**kwargs: _get_obj(clsname,*args,**kwargs)
+
+NewMsg          = _get('new')
+CompletedMsg    = _get('completed')
+UnsignedMsg     = _get('unsigned')
+SignedMsg       = _get('signed')
+SignedOnlineMsg = _get('signed_online')

+ 3 - 0
mmgen/util.py

@@ -84,6 +84,9 @@ def gmsg_r(s):
 def bmsg(s):
 	msg(blue(s))
 
+def pumsg(s):
+	msg(purple(s))
+
 def qmsg(s):
 	if not opt.quiet:
 		msg(s)

+ 99 - 0
test/unit_tests_d/ut_msg.py

@@ -0,0 +1,99 @@
+#!/usr/bin/env python3
+"""
+test.unit_tests_d.ut_msg: message signing unit tests for the MMGen suite
+"""
+
+import os
+
+from test.include.common import silence,end_silence,restart_test_daemons,stop_test_daemons
+from mmgen.opts import opt
+from mmgen.util import msg,bmsg,pumsg
+from mmgen.protocol import CoinProtocol,init_proto
+from mmgen.msg import NewMsg,UnsignedMsg,SignedMsg,SignedOnlineMsg
+
+def get_obj(coin,network):
+
+	if coin == 'bch':
+		addrlists = 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1'
+	else:
+		# A091ABAA = 98831F3A:5S
+		addrlists = 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1'
+
+	return NewMsg(
+		coin      = coin,
+		network   = network,
+		message   = '08/Jun/2021 Bitcoin Law Enacted by El Salvador Legislative Assembly',
+		addrlists = addrlists )
+
+async def run_test(network_id):
+
+	coin,network = CoinProtocol.Base.parse_network_id(network_id)
+
+	if not opt.verbose:
+		silence()
+
+	bmsg(f'\nTesting {coin.upper()} {network.upper()}:\n')
+
+	restart_test_daemons(network_id)
+
+	pumsg('\nTesting data creation:\n')
+
+	m = get_obj(coin,network)
+
+	tmpdir = os.path.join('test','trash2')
+
+	os.makedirs(tmpdir,exist_ok=True)
+
+	m.write_to_file(
+		outdir        = tmpdir,
+		ask_overwrite = False )
+
+	pumsg('\nTesting signing:\n')
+
+	m = UnsignedMsg( infile = os.path.join(tmpdir,get_obj(coin,network).filename) )
+	await m.sign(wallet_files=['test/ref/98831F3A.mmwords'])
+
+	m = SignedMsg( data=m.__dict__ )
+	m.write_to_file(
+		outdir        = tmpdir,
+		ask_overwrite = False )
+
+	pumsg('\nTesting display:\n')
+
+	m = SignedOnlineMsg( infile = os.path.join(tmpdir,get_obj(coin,network).signed_filename) )
+
+	msg(m.format())
+
+	pumsg('\nTesting single address display:\n')
+	msg(m.format('A091ABAA:111'))
+
+	pumsg('\nTesting verification:\n')
+	await m.verify()
+
+	stop_test_daemons(network_id)
+
+	msg('\n')
+
+	if not opt.verbose:
+		end_silence()
+
+	return True
+
+class unit_tests:
+
+	altcoin_deps = ('ltc','bch')
+
+	def btc(self,name,ut):
+		return run_test('btc')
+
+	def btc_tn(self,name,ut):
+		return run_test('btc_tn')
+
+	def btc_rt(self,name,ut):
+		return run_test('btc_rt')
+
+	def ltc(self,name,ut):
+		return run_test('ltc')
+
+	def bch(self,name,ut):
+		return run_test('bch')