From 86e3b273c7fa8639af05e8276e8f16a2f0b3b521 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Mon, 14 Feb 2022 10:18:44 +0000 Subject: [PATCH] message signing: low-level infrastructure and unit test - Messages can be signed for arbitrary lists or ranges of addresses. - Addresses and ranges are specified using a SEED_ID:ADDR_TYPE:IDX_RANGE selector. - For segwit addresses, signature data includes a pubkey hash and p2pkh address sharing the same key to allow verification of messages with external tools, e.g. `bitcoin-cli verifymessage`. Testing: $ test/unit_tests.py -av msg --- mmgen/addrlist.py | 7 + mmgen/base_proto/bitcoin/msg.py | 35 ++++ mmgen/data/version | 2 +- mmgen/msg.py | 305 ++++++++++++++++++++++++++++++++ mmgen/util.py | 3 + test/unit_tests_d/ut_msg.py | 99 +++++++++++ 6 files changed, 450 insertions(+), 1 deletion(-) create mode 100755 mmgen/base_proto/bitcoin/msg.py create mode 100755 mmgen/msg.py create mode 100755 test/unit_tests_d/ut_msg.py diff --git a/mmgen/addrlist.py b/mmgen/addrlist.py index 1732eb04..81833479 100755 --- a/mmgen/addrlist.py +++ b/mmgen/addrlist.py @@ -69,6 +69,7 @@ class AddrListEntryBase(MMGenListItem): class AddrListEntry(AddrListEntryBase): addr = ListItemAttr(CoinAddr,include_proto=True) + addr_p2pkh = ListItemAttr(CoinAddr,include_proto=True) idx = ListItemAttr(AddrIdx) # not present in flat addrlists label = ListItemAttr(TwComment,reassign_ok=True) sec = ListItemAttr(PrivKey,include_proto=True) @@ -148,9 +149,11 @@ class AddrList(MMGenObject): # Address info for a single seed ID mmtype = None, skip_key_address_validity_check = False, skip_chksum = False, + add_p2pkh = False, ): self.skip_ka_check = skip_key_address_validity_check + self.add_p2pkh = add_p2pkh self.proto = proto do_chksum = False @@ -227,6 +230,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID from .addr import KeyGenerator,AddrGenerator kg = KeyGenerator( self.proto, mmtype.pubkey_type ) ag = AddrGenerator( self.proto, mmtype ) + if self.add_p2pkh: + ag2 = AddrGenerator( self.proto, 'compressed' ) t_addrs,out = ( len(addr_idxs), AddrListData() ) le = self.entry_type @@ -259,6 +264,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID if self.gen_addrs: data = kg.gen_data(e.sec) e.addr = ag.to_addr(data) + if self.add_p2pkh: + e.addr_p2pkh = ag2.to_addr(data) if gen_viewkey: e.viewkey = ag.to_viewkey(data) if gen_wallet_passwd: diff --git a/mmgen/base_proto/bitcoin/msg.py b/mmgen/base_proto/bitcoin/msg.py new file mode 100755 index 00000000..0829f975 --- /dev/null +++ b/mmgen/base_proto/bitcoin/msg.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet +# Copyright (C)2013-2022 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen +# https://gitlab.com/mmgen/mmgen + +""" +base_proto.bitcoin.msg: Bitcoin base protocol message signing classes +""" + +from ...msg import coin_msg + +class coin_msg(coin_msg): + + class base(coin_msg.base): pass + + class new(base,coin_msg.new): pass + + class completed(base,coin_msg.completed): pass + + class unsigned(completed,coin_msg.unsigned): + + async def do_sign(self,wif,message): + return await self.rpc.call( 'signmessagewithprivkey', wif, message ) + + class signed(completed,coin_msg.signed): pass + + class signed_online(signed,coin_msg.signed_online): + + async def do_verify(self,addr,sig,message): + return await self.rpc.call( 'verifymessage', addr, sig, message ) diff --git a/mmgen/data/version b/mmgen/data/version index 8dcbceb3..5234c8ba 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -13.1.dev20 +13.1.dev21 diff --git a/mmgen/msg.py b/mmgen/msg.py new file mode 100755 index 00000000..4400e062 --- /dev/null +++ b/mmgen/msg.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet +# Copyright (C)2013-2022 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen +# https://gitlab.com/mmgen/mmgen + +""" +msg: base message signing classes +""" + +import os,importlib,json +from .globalvars import g +from .objmethods import MMGenObject,Hilite,InitErrors +from .util import msg,vmsg,die,suf,make_chksum_6,fmt_list,remove_dups +from .color import orange +from .protocol import init_proto +from .fileutil import get_data_from_file,write_data_to_file +from .addr import MMGenID + +class MMGenIDRange(str,Hilite,InitErrors,MMGenObject): + """ + closely based on MMGenID + """ + color = 'orange' + width = 0 + trunc_ok = False + def __new__(cls,proto,id_str): + from .addrlist import AddrIdxList + from .addr import AddrListID + from .seed import SeedID + try: + ss = str(id_str).split(':') + assert len(ss) in (2,3),'not 2 or 3 colon-separated items' + t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2]) + me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1])) + me.sid = SeedID(sid=ss[0]) + me.idxlist = AddrIdxList(ss[-1]) + me.mmtype = t + assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}' + me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done + me.proto = proto + return me + except Exception as e: + return cls.init_fail(e,id_str) + +class coin_msg: + + class base(MMGenObject): + + ext = 'rawmsg.json' + signed = False + + @property + def desc(self): + return ('signed' if self.signed else 'unsigned') + ' message data' + + @property + def chksum(self): + return make_chksum_6( + json.dumps( self.data, sort_keys=True, separators=(',', ':') )) + + @property + def filename_stem(self): + coin,network = self.data['network'].split('_') + return '{}[{}]{}'.format( + self.chksum.upper(), + coin.upper(), + ('' if network == 'mainnet' else '.'+network) ) + + @property + def filename(self): + return f'{self.filename_stem}.{self.ext}' + + @property + def signed_filename(self): + return f'{self.filename_stem}.{coin_msg.signed.ext}' + + def get_proto_from_file(self,filename): + coin,network = json.loads(get_data_from_file(filename))['metadata']['network'].split('_') + return init_proto( coin=coin, network=network ) + + def write_to_file(self,outdir=None,ask_overwrite=False): + data = { + 'id': f'{g.proj_name} {self.desc}', + 'metadata': self.data, + 'signatures': self.sigs, + } + + if hasattr(self,'failed_sids'): + data.update({'failed_seed_ids':self.failed_sids}) + + write_data_to_file( + outfile = os.path.join(outdir or '',self.filename), + data = json.dumps(data,sort_keys=True,indent=4), + desc = f'{self.desc} data', + ask_overwrite = ask_overwrite ) + + class new(base): + + def __init__(self,message,addrlists,*args,**kwargs): + self.data = { + 'network': '{}_{}'.format( self.proto.coin.lower(), self.proto.network ), + 'addrlists': [MMGenIDRange(self.proto,i) for i in addrlists.split()], + 'message': message, + } + self.sigs = {} + + class completed(base): + + def __init__(self,data,infile,*args,**kwargs): + + if data: + self.__dict__ = data + return + elif infile: + self.infile = infile + + self.data = get_data_from_file( + infile = self.infile, + desc = f'{self.desc} data' ) + + d = json.loads(self.data) + self.data = d['metadata'] + self.sigs = d['signatures'] + self.addrlists = [MMGenIDRange(self.proto,i) for i in self.data['addrlists']] + if d.get('failed_seed_ids'): + self.failed_seed_ids = d['failed_seed_ids'] + + def format(self,mmid=None): + + def gen_entry(e): + yield fs2.format( 'addr:', e['addr'] ) + if e.get('addr_p2pkh'): + yield fs2.format( 'addr_p2pkh:', e['addr_p2pkh'] ) + if e.get('pubhash'): + yield fs2.format( 'pubkey hash:', e['pubhash'] ) + yield fs2.format('sig:', e['sig'] ) + + def gen_all(): + fs = '{:16s} {}' + for k,v in disp_data.items(): + yield fs.format( v[0]+':', v[1](self.data[k]) ) + if hasattr(self,'failed_seed_ids'): + yield fs.format( + 'Failed Seed IDs:', + fmt_list(self.failed_seed_ids,fmt='bare') ) + yield '' + yield 'Signatures:' + for n,(k,v) in enumerate(self.sigs.items()): + yield '' + yield '{:>3}) {}'.format(n+1,k) + for res in gen_entry(v): + yield res + + def gen_single(): + fs = '{:8s} {}' + for k,v in disp_data.items(): + yield fs.format( v[0]+':', v[1](self.data[k]) ) + yield 'Signature data:' + k = MMGenID(self.proto,mmid) + if k not in self.sigs: + die(1,f'{k}: address not found in signature data') + for res in gen_entry(self.sigs[k]): + yield res + + disp_data = { + 'message': ('Message', lambda v: repr(v) ), + 'network': ('Network', lambda v: v.replace('_',' ').upper() ), + 'addrlists': ('Address Ranges', lambda v: fmt_list(v,fmt='bare') ), + } + + if mmid: + del disp_data['addrlists'] + fs2 = ' {:12s} {}' + return '\n'.join(gen_single()) + else: + fs2 = ' {:12s} {}' + return 'SIGNED MESSAGE DATA:\n\n ' + '\n '.join(gen_all()) + + class unsigned(completed): + + async def sign(self,wallet_files): + + async def sign_list(al_in,seed): + al = KeyAddrList( + proto = self.proto, + seed = seed, + addr_idxs = al_in.idxlist, + mmtype = al_in.mmtype, + skip_chksum = True, + add_p2pkh = al_in.mmtype in ('S','B') ) + + for e in al.data: + sig = await self.do_sign( + wif = e.sec.wif, + message = self.data['message'] ) + + mmid = '{}:{}:{}'.format( al_in.sid, al_in.mmtype, e.idx ) + data = { + 'addr': e.addr, + 'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex(), + 'sig': sig, + } + + if e.addr_p2pkh: + data.update({'addr_p2pkh': e.addr_p2pkh}) + + self.sigs[mmid] = data + + from .rpc import rpc_init + self.rpc = await rpc_init(self.proto) + + from .wallet import Wallet + from .addrlist import KeyAddrList + wallet_seeds = [Wallet(fn=fn).seed for fn in wallet_files] + need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True) + saved_seeds = list() + + # First try wallet seeds: + for sid in need_sids: + for seed in wallet_seeds: + if sid == seed.sid: + saved_seeds.append(seed) + need_sids.remove(sid) + break + + # Then subseeds: + for sid in need_sids: + for seed in wallet_seeds: + subseed = seed.subseeds.get_subseed_by_seed_id(sid,print_msg=True) + if subseed: + saved_seeds.append(subseed) + need_sids.remove(sid) + break + + for al in self.addrlists: + for seed in saved_seeds: + if al.sid == seed.sid: + await sign_list(al,seed) + break + + if need_sids: + msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids,fmt='bare')))) + + self.failed_sids = need_sids + + class signed(completed): + + ext = 'sigmsg.json' + signed = True + + class signed_online(signed): + + async def verify(self): + + from .rpc import rpc_init + self.rpc = await rpc_init(self.proto) + + for k,v in self.sigs.items(): + ret = await self.do_verify( + addr = v.get('addr_p2pkh') or v['addr'], + sig = v['sig'], + message = self.data['message'] ) + if not ret: + die(3,f'Invalid signature for address {k} ({v["addr"]})') + + vmsg('{} signature{} verified'.format( len(self.sigs), suf(self.sigs) )) + +def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs): + + assert not args, 'msg:_get_obj(): only keyword args allowed' + + if clsname == 'signed': + assert data and not (coin or infile), 'msg:_get_obj(): chk2' + else: + assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3' + + proto = ( + data['proto'] if data else + init_proto( coin=coin, network=network ) if coin else + coin_msg.base().get_proto_from_file(infile) ) + + cls = getattr( + getattr(importlib.import_module(f'mmgen.base_proto.{proto.base_proto.lower()}.msg'),'coin_msg'), + clsname ) + + me = MMGenObject.__new__(cls) + me.proto = proto + + me.__init__(infile=infile,data=data,*args,**kwargs) + + return me + +def _get(clsname): + return lambda *args,**kwargs: _get_obj(clsname,*args,**kwargs) + +NewMsg = _get('new') +CompletedMsg = _get('completed') +UnsignedMsg = _get('unsigned') +SignedMsg = _get('signed') +SignedOnlineMsg = _get('signed_online') diff --git a/mmgen/util.py b/mmgen/util.py index 418deef0..3bbb6c90 100755 --- a/mmgen/util.py +++ b/mmgen/util.py @@ -84,6 +84,9 @@ def gmsg_r(s): def bmsg(s): msg(blue(s)) +def pumsg(s): + msg(purple(s)) + def qmsg(s): if not opt.quiet: msg(s) diff --git a/test/unit_tests_d/ut_msg.py b/test/unit_tests_d/ut_msg.py new file mode 100755 index 00000000..6eb5d6e5 --- /dev/null +++ b/test/unit_tests_d/ut_msg.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +""" +test.unit_tests_d.ut_msg: message signing unit tests for the MMGen suite +""" + +import os + +from test.include.common import silence,end_silence,restart_test_daemons,stop_test_daemons +from mmgen.opts import opt +from mmgen.util import msg,bmsg,pumsg +from mmgen.protocol import CoinProtocol,init_proto +from mmgen.msg import NewMsg,UnsignedMsg,SignedMsg,SignedOnlineMsg + +def get_obj(coin,network): + + if coin == 'bch': + addrlists = 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1' + else: + # A091ABAA = 98831F3A:5S + addrlists = 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1' + + return NewMsg( + coin = coin, + network = network, + message = '08/Jun/2021 Bitcoin Law Enacted by El Salvador Legislative Assembly', + addrlists = addrlists ) + +async def run_test(network_id): + + coin,network = CoinProtocol.Base.parse_network_id(network_id) + + if not opt.verbose: + silence() + + bmsg(f'\nTesting {coin.upper()} {network.upper()}:\n') + + restart_test_daemons(network_id) + + pumsg('\nTesting data creation:\n') + + m = get_obj(coin,network) + + tmpdir = os.path.join('test','trash2') + + os.makedirs(tmpdir,exist_ok=True) + + m.write_to_file( + outdir = tmpdir, + ask_overwrite = False ) + + pumsg('\nTesting signing:\n') + + m = UnsignedMsg( infile = os.path.join(tmpdir,get_obj(coin,network).filename) ) + await m.sign(wallet_files=['test/ref/98831F3A.mmwords']) + + m = SignedMsg( data=m.__dict__ ) + m.write_to_file( + outdir = tmpdir, + ask_overwrite = False ) + + pumsg('\nTesting display:\n') + + m = SignedOnlineMsg( infile = os.path.join(tmpdir,get_obj(coin,network).signed_filename) ) + + msg(m.format()) + + pumsg('\nTesting single address display:\n') + msg(m.format('A091ABAA:111')) + + pumsg('\nTesting verification:\n') + await m.verify() + + stop_test_daemons(network_id) + + msg('\n') + + if not opt.verbose: + end_silence() + + return True + +class unit_tests: + + altcoin_deps = ('ltc','bch') + + def btc(self,name,ut): + return run_test('btc') + + def btc_tn(self,name,ut): + return run_test('btc_tn') + + def btc_rt(self,name,ut): + return run_test('btc_rt') + + def ltc(self,name,ut): + return run_test('ltc') + + def bch(self,name,ut): + return run_test('bch')