message signing: low-level infrastructure and unit test

- Messages can be signed for arbitrary lists or ranges of addresses.
- Addresses and ranges are specified using a SEED_ID:ADDR_TYPE:IDX_RANGE
  selector.
- For segwit addresses, signature data includes a pubkey hash and p2pkh address
  sharing the same key to allow verification of messages with external tools,
  e.g. `bitcoin-cli verifymessage`.

Testing:

    $ test/unit_tests.py -av msg
This commit is contained in:
The MMGen Project 2022-02-14 10:18:44 +00:00
commit 86e3b273c7
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
6 changed files with 450 additions and 1 deletions

View file

@ -69,6 +69,7 @@ class AddrListEntryBase(MMGenListItem):
class AddrListEntry(AddrListEntryBase):
addr = ListItemAttr(CoinAddr,include_proto=True)
addr_p2pkh = ListItemAttr(CoinAddr,include_proto=True)
idx = ListItemAttr(AddrIdx) # not present in flat addrlists
label = ListItemAttr(TwComment,reassign_ok=True)
sec = ListItemAttr(PrivKey,include_proto=True)
@ -148,9 +149,11 @@ class AddrList(MMGenObject): # Address info for a single seed ID
mmtype = None,
skip_key_address_validity_check = False,
skip_chksum = False,
add_p2pkh = False,
):
self.skip_ka_check = skip_key_address_validity_check
self.add_p2pkh = add_p2pkh
self.proto = proto
do_chksum = False
@ -227,6 +230,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
from .addr import KeyGenerator,AddrGenerator
kg = KeyGenerator( self.proto, mmtype.pubkey_type )
ag = AddrGenerator( self.proto, mmtype )
if self.add_p2pkh:
ag2 = AddrGenerator( self.proto, 'compressed' )
t_addrs,out = ( len(addr_idxs), AddrListData() )
le = self.entry_type
@ -259,6 +264,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
if self.gen_addrs:
data = kg.gen_data(e.sec)
e.addr = ag.to_addr(data)
if self.add_p2pkh:
e.addr_p2pkh = ag2.to_addr(data)
if gen_viewkey:
e.viewkey = ag.to_viewkey(data)
if gen_wallet_passwd:

35
mmgen/base_proto/bitcoin/msg.py Executable file
View file

@ -0,0 +1,35 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.bitcoin.msg: Bitcoin base protocol message signing classes
"""
from ...msg import coin_msg
class coin_msg(coin_msg):
class base(coin_msg.base): pass
class new(base,coin_msg.new): pass
class completed(base,coin_msg.completed): pass
class unsigned(completed,coin_msg.unsigned):
async def do_sign(self,wif,message):
return await self.rpc.call( 'signmessagewithprivkey', wif, message )
class signed(completed,coin_msg.signed): pass
class signed_online(signed,coin_msg.signed_online):
async def do_verify(self,addr,sig,message):
return await self.rpc.call( 'verifymessage', addr, sig, message )

View file

@ -1 +1 @@
13.1.dev20
13.1.dev21

305
mmgen/msg.py Executable file
View file

@ -0,0 +1,305 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
msg: base message signing classes
"""
import os,importlib,json
from .globalvars import g
from .objmethods import MMGenObject,Hilite,InitErrors
from .util import msg,vmsg,die,suf,make_chksum_6,fmt_list,remove_dups
from .color import orange
from .protocol import init_proto
from .fileutil import get_data_from_file,write_data_to_file
from .addr import MMGenID
class MMGenIDRange(str,Hilite,InitErrors,MMGenObject):
"""
closely based on MMGenID
"""
color = 'orange'
width = 0
trunc_ok = False
def __new__(cls,proto,id_str):
from .addrlist import AddrIdxList
from .addr import AddrListID
from .seed import SeedID
try:
ss = str(id_str).split(':')
assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2])
me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
me.sid = SeedID(sid=ss[0])
me.idxlist = AddrIdxList(ss[-1])
me.mmtype = t
assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
me.proto = proto
return me
except Exception as e:
return cls.init_fail(e,id_str)
class coin_msg:
class base(MMGenObject):
ext = 'rawmsg.json'
signed = False
@property
def desc(self):
return ('signed' if self.signed else 'unsigned') + ' message data'
@property
def chksum(self):
return make_chksum_6(
json.dumps( self.data, sort_keys=True, separators=(',', ':') ))
@property
def filename_stem(self):
coin,network = self.data['network'].split('_')
return '{}[{}]{}'.format(
self.chksum.upper(),
coin.upper(),
('' if network == 'mainnet' else '.'+network) )
@property
def filename(self):
return f'{self.filename_stem}.{self.ext}'
@property
def signed_filename(self):
return f'{self.filename_stem}.{coin_msg.signed.ext}'
def get_proto_from_file(self,filename):
coin,network = json.loads(get_data_from_file(filename))['metadata']['network'].split('_')
return init_proto( coin=coin, network=network )
def write_to_file(self,outdir=None,ask_overwrite=False):
data = {
'id': f'{g.proj_name} {self.desc}',
'metadata': self.data,
'signatures': self.sigs,
}
if hasattr(self,'failed_sids'):
data.update({'failed_seed_ids':self.failed_sids})
write_data_to_file(
outfile = os.path.join(outdir or '',self.filename),
data = json.dumps(data,sort_keys=True,indent=4),
desc = f'{self.desc} data',
ask_overwrite = ask_overwrite )
class new(base):
def __init__(self,message,addrlists,*args,**kwargs):
self.data = {
'network': '{}_{}'.format( self.proto.coin.lower(), self.proto.network ),
'addrlists': [MMGenIDRange(self.proto,i) for i in addrlists.split()],
'message': message,
}
self.sigs = {}
class completed(base):
def __init__(self,data,infile,*args,**kwargs):
if data:
self.__dict__ = data
return
elif infile:
self.infile = infile
self.data = get_data_from_file(
infile = self.infile,
desc = f'{self.desc} data' )
d = json.loads(self.data)
self.data = d['metadata']
self.sigs = d['signatures']
self.addrlists = [MMGenIDRange(self.proto,i) for i in self.data['addrlists']]
if d.get('failed_seed_ids'):
self.failed_seed_ids = d['failed_seed_ids']
def format(self,mmid=None):
def gen_entry(e):
yield fs2.format( 'addr:', e['addr'] )
if e.get('addr_p2pkh'):
yield fs2.format( 'addr_p2pkh:', e['addr_p2pkh'] )
if e.get('pubhash'):
yield fs2.format( 'pubkey hash:', e['pubhash'] )
yield fs2.format('sig:', e['sig'] )
def gen_all():
fs = '{:16s} {}'
for k,v in disp_data.items():
yield fs.format( v[0]+':', v[1](self.data[k]) )
if hasattr(self,'failed_seed_ids'):
yield fs.format(
'Failed Seed IDs:',
fmt_list(self.failed_seed_ids,fmt='bare') )
yield ''
yield 'Signatures:'
for n,(k,v) in enumerate(self.sigs.items()):
yield ''
yield '{:>3}) {}'.format(n+1,k)
for res in gen_entry(v):
yield res
def gen_single():
fs = '{:8s} {}'
for k,v in disp_data.items():
yield fs.format( v[0]+':', v[1](self.data[k]) )
yield 'Signature data:'
k = MMGenID(self.proto,mmid)
if k not in self.sigs:
die(1,f'{k}: address not found in signature data')
for res in gen_entry(self.sigs[k]):
yield res
disp_data = {
'message': ('Message', lambda v: repr(v) ),
'network': ('Network', lambda v: v.replace('_',' ').upper() ),
'addrlists': ('Address Ranges', lambda v: fmt_list(v,fmt='bare') ),
}
if mmid:
del disp_data['addrlists']
fs2 = ' {:12s} {}'
return '\n'.join(gen_single())
else:
fs2 = ' {:12s} {}'
return 'SIGNED MESSAGE DATA:\n\n ' + '\n '.join(gen_all())
class unsigned(completed):
async def sign(self,wallet_files):
async def sign_list(al_in,seed):
al = KeyAddrList(
proto = self.proto,
seed = seed,
addr_idxs = al_in.idxlist,
mmtype = al_in.mmtype,
skip_chksum = True,
add_p2pkh = al_in.mmtype in ('S','B') )
for e in al.data:
sig = await self.do_sign(
wif = e.sec.wif,
message = self.data['message'] )
mmid = '{}:{}:{}'.format( al_in.sid, al_in.mmtype, e.idx )
data = {
'addr': e.addr,
'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex(),
'sig': sig,
}
if e.addr_p2pkh:
data.update({'addr_p2pkh': e.addr_p2pkh})
self.sigs[mmid] = data
from .rpc import rpc_init
self.rpc = await rpc_init(self.proto)
from .wallet import Wallet
from .addrlist import KeyAddrList
wallet_seeds = [Wallet(fn=fn).seed for fn in wallet_files]
need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
saved_seeds = list()
# First try wallet seeds:
for sid in need_sids:
for seed in wallet_seeds:
if sid == seed.sid:
saved_seeds.append(seed)
need_sids.remove(sid)
break
# Then subseeds:
for sid in need_sids:
for seed in wallet_seeds:
subseed = seed.subseeds.get_subseed_by_seed_id(sid,print_msg=True)
if subseed:
saved_seeds.append(subseed)
need_sids.remove(sid)
break
for al in self.addrlists:
for seed in saved_seeds:
if al.sid == seed.sid:
await sign_list(al,seed)
break
if need_sids:
msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids,fmt='bare'))))
self.failed_sids = need_sids
class signed(completed):
ext = 'sigmsg.json'
signed = True
class signed_online(signed):
async def verify(self):
from .rpc import rpc_init
self.rpc = await rpc_init(self.proto)
for k,v in self.sigs.items():
ret = await self.do_verify(
addr = v.get('addr_p2pkh') or v['addr'],
sig = v['sig'],
message = self.data['message'] )
if not ret:
die(3,f'Invalid signature for address {k} ({v["addr"]})')
vmsg('{} signature{} verified'.format( len(self.sigs), suf(self.sigs) ))
def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs):
assert not args, 'msg:_get_obj(): only keyword args allowed'
if clsname == 'signed':
assert data and not (coin or infile), 'msg:_get_obj(): chk2'
else:
assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3'
proto = (
data['proto'] if data else
init_proto( coin=coin, network=network ) if coin else
coin_msg.base().get_proto_from_file(infile) )
cls = getattr(
getattr(importlib.import_module(f'mmgen.base_proto.{proto.base_proto.lower()}.msg'),'coin_msg'),
clsname )
me = MMGenObject.__new__(cls)
me.proto = proto
me.__init__(infile=infile,data=data,*args,**kwargs)
return me
def _get(clsname):
return lambda *args,**kwargs: _get_obj(clsname,*args,**kwargs)
NewMsg = _get('new')
CompletedMsg = _get('completed')
UnsignedMsg = _get('unsigned')
SignedMsg = _get('signed')
SignedOnlineMsg = _get('signed_online')

View file

@ -84,6 +84,9 @@ def gmsg_r(s):
def bmsg(s):
msg(blue(s))
def pumsg(s):
msg(purple(s))
def qmsg(s):
if not opt.quiet:
msg(s)

99
test/unit_tests_d/ut_msg.py Executable file
View file

@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""
test.unit_tests_d.ut_msg: message signing unit tests for the MMGen suite
"""
import os
from test.include.common import silence,end_silence,restart_test_daemons,stop_test_daemons
from mmgen.opts import opt
from mmgen.util import msg,bmsg,pumsg
from mmgen.protocol import CoinProtocol,init_proto
from mmgen.msg import NewMsg,UnsignedMsg,SignedMsg,SignedOnlineMsg
def get_obj(coin,network):
if coin == 'bch':
addrlists = 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1'
else:
# A091ABAA = 98831F3A:5S
addrlists = 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1'
return NewMsg(
coin = coin,
network = network,
message = '08/Jun/2021 Bitcoin Law Enacted by El Salvador Legislative Assembly',
addrlists = addrlists )
async def run_test(network_id):
coin,network = CoinProtocol.Base.parse_network_id(network_id)
if not opt.verbose:
silence()
bmsg(f'\nTesting {coin.upper()} {network.upper()}:\n')
restart_test_daemons(network_id)
pumsg('\nTesting data creation:\n')
m = get_obj(coin,network)
tmpdir = os.path.join('test','trash2')
os.makedirs(tmpdir,exist_ok=True)
m.write_to_file(
outdir = tmpdir,
ask_overwrite = False )
pumsg('\nTesting signing:\n')
m = UnsignedMsg( infile = os.path.join(tmpdir,get_obj(coin,network).filename) )
await m.sign(wallet_files=['test/ref/98831F3A.mmwords'])
m = SignedMsg( data=m.__dict__ )
m.write_to_file(
outdir = tmpdir,
ask_overwrite = False )
pumsg('\nTesting display:\n')
m = SignedOnlineMsg( infile = os.path.join(tmpdir,get_obj(coin,network).signed_filename) )
msg(m.format())
pumsg('\nTesting single address display:\n')
msg(m.format('A091ABAA:111'))
pumsg('\nTesting verification:\n')
await m.verify()
stop_test_daemons(network_id)
msg('\n')
if not opt.verbose:
end_silence()
return True
class unit_tests:
altcoin_deps = ('ltc','bch')
def btc(self,name,ut):
return run_test('btc')
def btc_tn(self,name,ut):
return run_test('btc_tn')
def btc_rt(self,name,ut):
return run_test('btc_rt')
def ltc(self,name,ut):
return run_test('ltc')
def bch(self,name,ut):
return run_test('bch')