message signing: low-level infrastructure and unit test
- Messages can be signed for arbitrary lists or ranges of addresses.
- Addresses and ranges are specified using a SEED_ID:ADDR_TYPE:IDX_RANGE
selector.
- For segwit addresses, signature data includes a pubkey hash and p2pkh address
sharing the same key to allow verification of messages with external tools,
e.g. `bitcoin-cli verifymessage`.
Testing:
$ test/unit_tests.py -av msg
This commit is contained in:
parent
937fb94470
commit
86e3b273c7
6 changed files with 450 additions and 1 deletions
|
|
@ -69,6 +69,7 @@ class AddrListEntryBase(MMGenListItem):
|
|||
|
||||
class AddrListEntry(AddrListEntryBase):
|
||||
addr = ListItemAttr(CoinAddr,include_proto=True)
|
||||
addr_p2pkh = ListItemAttr(CoinAddr,include_proto=True)
|
||||
idx = ListItemAttr(AddrIdx) # not present in flat addrlists
|
||||
label = ListItemAttr(TwComment,reassign_ok=True)
|
||||
sec = ListItemAttr(PrivKey,include_proto=True)
|
||||
|
|
@ -148,9 +149,11 @@ class AddrList(MMGenObject): # Address info for a single seed ID
|
|||
mmtype = None,
|
||||
skip_key_address_validity_check = False,
|
||||
skip_chksum = False,
|
||||
add_p2pkh = False,
|
||||
):
|
||||
|
||||
self.skip_ka_check = skip_key_address_validity_check
|
||||
self.add_p2pkh = add_p2pkh
|
||||
self.proto = proto
|
||||
do_chksum = False
|
||||
|
||||
|
|
@ -227,6 +230,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
|
|||
from .addr import KeyGenerator,AddrGenerator
|
||||
kg = KeyGenerator( self.proto, mmtype.pubkey_type )
|
||||
ag = AddrGenerator( self.proto, mmtype )
|
||||
if self.add_p2pkh:
|
||||
ag2 = AddrGenerator( self.proto, 'compressed' )
|
||||
|
||||
t_addrs,out = ( len(addr_idxs), AddrListData() )
|
||||
le = self.entry_type
|
||||
|
|
@ -259,6 +264,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
|
|||
if self.gen_addrs:
|
||||
data = kg.gen_data(e.sec)
|
||||
e.addr = ag.to_addr(data)
|
||||
if self.add_p2pkh:
|
||||
e.addr_p2pkh = ag2.to_addr(data)
|
||||
if gen_viewkey:
|
||||
e.viewkey = ag.to_viewkey(data)
|
||||
if gen_wallet_passwd:
|
||||
|
|
|
|||
35
mmgen/base_proto/bitcoin/msg.py
Executable file
35
mmgen/base_proto/bitcoin/msg.py
Executable file
|
|
@ -0,0 +1,35 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
|
||||
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
|
||||
# Licensed under the GNU General Public License, Version 3:
|
||||
# https://www.gnu.org/licenses
|
||||
# Public project repositories:
|
||||
# https://github.com/mmgen/mmgen
|
||||
# https://gitlab.com/mmgen/mmgen
|
||||
|
||||
"""
|
||||
base_proto.bitcoin.msg: Bitcoin base protocol message signing classes
|
||||
"""
|
||||
|
||||
from ...msg import coin_msg
|
||||
|
||||
class coin_msg(coin_msg):
|
||||
|
||||
class base(coin_msg.base): pass
|
||||
|
||||
class new(base,coin_msg.new): pass
|
||||
|
||||
class completed(base,coin_msg.completed): pass
|
||||
|
||||
class unsigned(completed,coin_msg.unsigned):
|
||||
|
||||
async def do_sign(self,wif,message):
|
||||
return await self.rpc.call( 'signmessagewithprivkey', wif, message )
|
||||
|
||||
class signed(completed,coin_msg.signed): pass
|
||||
|
||||
class signed_online(signed,coin_msg.signed_online):
|
||||
|
||||
async def do_verify(self,addr,sig,message):
|
||||
return await self.rpc.call( 'verifymessage', addr, sig, message )
|
||||
|
|
@ -1 +1 @@
|
|||
13.1.dev20
|
||||
13.1.dev21
|
||||
|
|
|
|||
305
mmgen/msg.py
Executable file
305
mmgen/msg.py
Executable file
|
|
@ -0,0 +1,305 @@
|
|||
#!/usr/bin/env python3
|
||||
#
|
||||
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
|
||||
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
|
||||
# Licensed under the GNU General Public License, Version 3:
|
||||
# https://www.gnu.org/licenses
|
||||
# Public project repositories:
|
||||
# https://github.com/mmgen/mmgen
|
||||
# https://gitlab.com/mmgen/mmgen
|
||||
|
||||
"""
|
||||
msg: base message signing classes
|
||||
"""
|
||||
|
||||
import os,importlib,json
|
||||
from .globalvars import g
|
||||
from .objmethods import MMGenObject,Hilite,InitErrors
|
||||
from .util import msg,vmsg,die,suf,make_chksum_6,fmt_list,remove_dups
|
||||
from .color import orange
|
||||
from .protocol import init_proto
|
||||
from .fileutil import get_data_from_file,write_data_to_file
|
||||
from .addr import MMGenID
|
||||
|
||||
class MMGenIDRange(str,Hilite,InitErrors,MMGenObject):
|
||||
"""
|
||||
closely based on MMGenID
|
||||
"""
|
||||
color = 'orange'
|
||||
width = 0
|
||||
trunc_ok = False
|
||||
def __new__(cls,proto,id_str):
|
||||
from .addrlist import AddrIdxList
|
||||
from .addr import AddrListID
|
||||
from .seed import SeedID
|
||||
try:
|
||||
ss = str(id_str).split(':')
|
||||
assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
|
||||
t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2])
|
||||
me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
|
||||
me.sid = SeedID(sid=ss[0])
|
||||
me.idxlist = AddrIdxList(ss[-1])
|
||||
me.mmtype = t
|
||||
assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
|
||||
me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
|
||||
me.proto = proto
|
||||
return me
|
||||
except Exception as e:
|
||||
return cls.init_fail(e,id_str)
|
||||
|
||||
class coin_msg:
|
||||
|
||||
class base(MMGenObject):
|
||||
|
||||
ext = 'rawmsg.json'
|
||||
signed = False
|
||||
|
||||
@property
|
||||
def desc(self):
|
||||
return ('signed' if self.signed else 'unsigned') + ' message data'
|
||||
|
||||
@property
|
||||
def chksum(self):
|
||||
return make_chksum_6(
|
||||
json.dumps( self.data, sort_keys=True, separators=(',', ':') ))
|
||||
|
||||
@property
|
||||
def filename_stem(self):
|
||||
coin,network = self.data['network'].split('_')
|
||||
return '{}[{}]{}'.format(
|
||||
self.chksum.upper(),
|
||||
coin.upper(),
|
||||
('' if network == 'mainnet' else '.'+network) )
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
return f'{self.filename_stem}.{self.ext}'
|
||||
|
||||
@property
|
||||
def signed_filename(self):
|
||||
return f'{self.filename_stem}.{coin_msg.signed.ext}'
|
||||
|
||||
def get_proto_from_file(self,filename):
|
||||
coin,network = json.loads(get_data_from_file(filename))['metadata']['network'].split('_')
|
||||
return init_proto( coin=coin, network=network )
|
||||
|
||||
def write_to_file(self,outdir=None,ask_overwrite=False):
|
||||
data = {
|
||||
'id': f'{g.proj_name} {self.desc}',
|
||||
'metadata': self.data,
|
||||
'signatures': self.sigs,
|
||||
}
|
||||
|
||||
if hasattr(self,'failed_sids'):
|
||||
data.update({'failed_seed_ids':self.failed_sids})
|
||||
|
||||
write_data_to_file(
|
||||
outfile = os.path.join(outdir or '',self.filename),
|
||||
data = json.dumps(data,sort_keys=True,indent=4),
|
||||
desc = f'{self.desc} data',
|
||||
ask_overwrite = ask_overwrite )
|
||||
|
||||
class new(base):
|
||||
|
||||
def __init__(self,message,addrlists,*args,**kwargs):
|
||||
self.data = {
|
||||
'network': '{}_{}'.format( self.proto.coin.lower(), self.proto.network ),
|
||||
'addrlists': [MMGenIDRange(self.proto,i) for i in addrlists.split()],
|
||||
'message': message,
|
||||
}
|
||||
self.sigs = {}
|
||||
|
||||
class completed(base):
|
||||
|
||||
def __init__(self,data,infile,*args,**kwargs):
|
||||
|
||||
if data:
|
||||
self.__dict__ = data
|
||||
return
|
||||
elif infile:
|
||||
self.infile = infile
|
||||
|
||||
self.data = get_data_from_file(
|
||||
infile = self.infile,
|
||||
desc = f'{self.desc} data' )
|
||||
|
||||
d = json.loads(self.data)
|
||||
self.data = d['metadata']
|
||||
self.sigs = d['signatures']
|
||||
self.addrlists = [MMGenIDRange(self.proto,i) for i in self.data['addrlists']]
|
||||
if d.get('failed_seed_ids'):
|
||||
self.failed_seed_ids = d['failed_seed_ids']
|
||||
|
||||
def format(self,mmid=None):
|
||||
|
||||
def gen_entry(e):
|
||||
yield fs2.format( 'addr:', e['addr'] )
|
||||
if e.get('addr_p2pkh'):
|
||||
yield fs2.format( 'addr_p2pkh:', e['addr_p2pkh'] )
|
||||
if e.get('pubhash'):
|
||||
yield fs2.format( 'pubkey hash:', e['pubhash'] )
|
||||
yield fs2.format('sig:', e['sig'] )
|
||||
|
||||
def gen_all():
|
||||
fs = '{:16s} {}'
|
||||
for k,v in disp_data.items():
|
||||
yield fs.format( v[0]+':', v[1](self.data[k]) )
|
||||
if hasattr(self,'failed_seed_ids'):
|
||||
yield fs.format(
|
||||
'Failed Seed IDs:',
|
||||
fmt_list(self.failed_seed_ids,fmt='bare') )
|
||||
yield ''
|
||||
yield 'Signatures:'
|
||||
for n,(k,v) in enumerate(self.sigs.items()):
|
||||
yield ''
|
||||
yield '{:>3}) {}'.format(n+1,k)
|
||||
for res in gen_entry(v):
|
||||
yield res
|
||||
|
||||
def gen_single():
|
||||
fs = '{:8s} {}'
|
||||
for k,v in disp_data.items():
|
||||
yield fs.format( v[0]+':', v[1](self.data[k]) )
|
||||
yield 'Signature data:'
|
||||
k = MMGenID(self.proto,mmid)
|
||||
if k not in self.sigs:
|
||||
die(1,f'{k}: address not found in signature data')
|
||||
for res in gen_entry(self.sigs[k]):
|
||||
yield res
|
||||
|
||||
disp_data = {
|
||||
'message': ('Message', lambda v: repr(v) ),
|
||||
'network': ('Network', lambda v: v.replace('_',' ').upper() ),
|
||||
'addrlists': ('Address Ranges', lambda v: fmt_list(v,fmt='bare') ),
|
||||
}
|
||||
|
||||
if mmid:
|
||||
del disp_data['addrlists']
|
||||
fs2 = ' {:12s} {}'
|
||||
return '\n'.join(gen_single())
|
||||
else:
|
||||
fs2 = ' {:12s} {}'
|
||||
return 'SIGNED MESSAGE DATA:\n\n ' + '\n '.join(gen_all())
|
||||
|
||||
class unsigned(completed):
|
||||
|
||||
async def sign(self,wallet_files):
|
||||
|
||||
async def sign_list(al_in,seed):
|
||||
al = KeyAddrList(
|
||||
proto = self.proto,
|
||||
seed = seed,
|
||||
addr_idxs = al_in.idxlist,
|
||||
mmtype = al_in.mmtype,
|
||||
skip_chksum = True,
|
||||
add_p2pkh = al_in.mmtype in ('S','B') )
|
||||
|
||||
for e in al.data:
|
||||
sig = await self.do_sign(
|
||||
wif = e.sec.wif,
|
||||
message = self.data['message'] )
|
||||
|
||||
mmid = '{}:{}:{}'.format( al_in.sid, al_in.mmtype, e.idx )
|
||||
data = {
|
||||
'addr': e.addr,
|
||||
'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex(),
|
||||
'sig': sig,
|
||||
}
|
||||
|
||||
if e.addr_p2pkh:
|
||||
data.update({'addr_p2pkh': e.addr_p2pkh})
|
||||
|
||||
self.sigs[mmid] = data
|
||||
|
||||
from .rpc import rpc_init
|
||||
self.rpc = await rpc_init(self.proto)
|
||||
|
||||
from .wallet import Wallet
|
||||
from .addrlist import KeyAddrList
|
||||
wallet_seeds = [Wallet(fn=fn).seed for fn in wallet_files]
|
||||
need_sids = remove_dups([al.sid for al in self.addrlists], quiet=True)
|
||||
saved_seeds = list()
|
||||
|
||||
# First try wallet seeds:
|
||||
for sid in need_sids:
|
||||
for seed in wallet_seeds:
|
||||
if sid == seed.sid:
|
||||
saved_seeds.append(seed)
|
||||
need_sids.remove(sid)
|
||||
break
|
||||
|
||||
# Then subseeds:
|
||||
for sid in need_sids:
|
||||
for seed in wallet_seeds:
|
||||
subseed = seed.subseeds.get_subseed_by_seed_id(sid,print_msg=True)
|
||||
if subseed:
|
||||
saved_seeds.append(subseed)
|
||||
need_sids.remove(sid)
|
||||
break
|
||||
|
||||
for al in self.addrlists:
|
||||
for seed in saved_seeds:
|
||||
if al.sid == seed.sid:
|
||||
await sign_list(al,seed)
|
||||
break
|
||||
|
||||
if need_sids:
|
||||
msg('Failed Seed IDs: {}'.format(orange(fmt_list(need_sids,fmt='bare'))))
|
||||
|
||||
self.failed_sids = need_sids
|
||||
|
||||
class signed(completed):
|
||||
|
||||
ext = 'sigmsg.json'
|
||||
signed = True
|
||||
|
||||
class signed_online(signed):
|
||||
|
||||
async def verify(self):
|
||||
|
||||
from .rpc import rpc_init
|
||||
self.rpc = await rpc_init(self.proto)
|
||||
|
||||
for k,v in self.sigs.items():
|
||||
ret = await self.do_verify(
|
||||
addr = v.get('addr_p2pkh') or v['addr'],
|
||||
sig = v['sig'],
|
||||
message = self.data['message'] )
|
||||
if not ret:
|
||||
die(3,f'Invalid signature for address {k} ({v["addr"]})')
|
||||
|
||||
vmsg('{} signature{} verified'.format( len(self.sigs), suf(self.sigs) ))
|
||||
|
||||
def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs):
|
||||
|
||||
assert not args, 'msg:_get_obj(): only keyword args allowed'
|
||||
|
||||
if clsname == 'signed':
|
||||
assert data and not (coin or infile), 'msg:_get_obj(): chk2'
|
||||
else:
|
||||
assert not data and (coin or infile) and not (coin and infile), 'msg:_get_obj(): chk3'
|
||||
|
||||
proto = (
|
||||
data['proto'] if data else
|
||||
init_proto( coin=coin, network=network ) if coin else
|
||||
coin_msg.base().get_proto_from_file(infile) )
|
||||
|
||||
cls = getattr(
|
||||
getattr(importlib.import_module(f'mmgen.base_proto.{proto.base_proto.lower()}.msg'),'coin_msg'),
|
||||
clsname )
|
||||
|
||||
me = MMGenObject.__new__(cls)
|
||||
me.proto = proto
|
||||
|
||||
me.__init__(infile=infile,data=data,*args,**kwargs)
|
||||
|
||||
return me
|
||||
|
||||
def _get(clsname):
|
||||
return lambda *args,**kwargs: _get_obj(clsname,*args,**kwargs)
|
||||
|
||||
NewMsg = _get('new')
|
||||
CompletedMsg = _get('completed')
|
||||
UnsignedMsg = _get('unsigned')
|
||||
SignedMsg = _get('signed')
|
||||
SignedOnlineMsg = _get('signed_online')
|
||||
|
|
@ -84,6 +84,9 @@ def gmsg_r(s):
|
|||
def bmsg(s):
|
||||
msg(blue(s))
|
||||
|
||||
def pumsg(s):
|
||||
msg(purple(s))
|
||||
|
||||
def qmsg(s):
|
||||
if not opt.quiet:
|
||||
msg(s)
|
||||
|
|
|
|||
99
test/unit_tests_d/ut_msg.py
Executable file
99
test/unit_tests_d/ut_msg.py
Executable file
|
|
@ -0,0 +1,99 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
test.unit_tests_d.ut_msg: message signing unit tests for the MMGen suite
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from test.include.common import silence,end_silence,restart_test_daemons,stop_test_daemons
|
||||
from mmgen.opts import opt
|
||||
from mmgen.util import msg,bmsg,pumsg
|
||||
from mmgen.protocol import CoinProtocol,init_proto
|
||||
from mmgen.msg import NewMsg,UnsignedMsg,SignedMsg,SignedOnlineMsg
|
||||
|
||||
def get_obj(coin,network):
|
||||
|
||||
if coin == 'bch':
|
||||
addrlists = 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1'
|
||||
else:
|
||||
# A091ABAA = 98831F3A:5S
|
||||
addrlists = 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1'
|
||||
|
||||
return NewMsg(
|
||||
coin = coin,
|
||||
network = network,
|
||||
message = '08/Jun/2021 Bitcoin Law Enacted by El Salvador Legislative Assembly',
|
||||
addrlists = addrlists )
|
||||
|
||||
async def run_test(network_id):
|
||||
|
||||
coin,network = CoinProtocol.Base.parse_network_id(network_id)
|
||||
|
||||
if not opt.verbose:
|
||||
silence()
|
||||
|
||||
bmsg(f'\nTesting {coin.upper()} {network.upper()}:\n')
|
||||
|
||||
restart_test_daemons(network_id)
|
||||
|
||||
pumsg('\nTesting data creation:\n')
|
||||
|
||||
m = get_obj(coin,network)
|
||||
|
||||
tmpdir = os.path.join('test','trash2')
|
||||
|
||||
os.makedirs(tmpdir,exist_ok=True)
|
||||
|
||||
m.write_to_file(
|
||||
outdir = tmpdir,
|
||||
ask_overwrite = False )
|
||||
|
||||
pumsg('\nTesting signing:\n')
|
||||
|
||||
m = UnsignedMsg( infile = os.path.join(tmpdir,get_obj(coin,network).filename) )
|
||||
await m.sign(wallet_files=['test/ref/98831F3A.mmwords'])
|
||||
|
||||
m = SignedMsg( data=m.__dict__ )
|
||||
m.write_to_file(
|
||||
outdir = tmpdir,
|
||||
ask_overwrite = False )
|
||||
|
||||
pumsg('\nTesting display:\n')
|
||||
|
||||
m = SignedOnlineMsg( infile = os.path.join(tmpdir,get_obj(coin,network).signed_filename) )
|
||||
|
||||
msg(m.format())
|
||||
|
||||
pumsg('\nTesting single address display:\n')
|
||||
msg(m.format('A091ABAA:111'))
|
||||
|
||||
pumsg('\nTesting verification:\n')
|
||||
await m.verify()
|
||||
|
||||
stop_test_daemons(network_id)
|
||||
|
||||
msg('\n')
|
||||
|
||||
if not opt.verbose:
|
||||
end_silence()
|
||||
|
||||
return True
|
||||
|
||||
class unit_tests:
|
||||
|
||||
altcoin_deps = ('ltc','bch')
|
||||
|
||||
def btc(self,name,ut):
|
||||
return run_test('btc')
|
||||
|
||||
def btc_tn(self,name,ut):
|
||||
return run_test('btc_tn')
|
||||
|
||||
def btc_rt(self,name,ut):
|
||||
return run_test('btc_rt')
|
||||
|
||||
def ltc(self,name,ut):
|
||||
return run_test('ltc')
|
||||
|
||||
def bch(self,name,ut):
|
||||
return run_test('bch')
|
||||
Loading…
Add table
Add a link
Reference in a new issue