message signing: support Ethereum

- Ethereum signatures conform to the standard defined by the Geth `eth_sign`
  JSON-RPC call

Usage information:

    $ mmgen-msg --help

Testing:

    $ test/unit_tests.py -v msg.eth
    $ test/test.py -e --coin=eth --daemon-id=geth -X msgverify_export ethdev
This commit is contained in:
The MMGen Project 2022-04-28 11:00:53 +00:00
commit 770b209afc
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
7 changed files with 182 additions and 15 deletions

View file

@ -66,3 +66,36 @@ def extract_key_from_geth_keystore_wallet(wallet_fn,passwd,check_addr=True):
assert addr == addr_chk, f'incorrect address: ({addr} != {addr_chk})'
return key
def ec_sign_message_with_privkey(message,key):
"""
Sign an arbitrary string with an Ethereum private key, returning the signature
Conforms to the standard defined by the Geth `eth_sign` JSON-RPC call
"""
from ...util import get_keccak
msghash = get_keccak()(
'\x19Ethereum Signed Message:\n{}{}'.format( len(message), message ).encode()
).digest()
from py_ecc.secp256k1 import ecdsa_raw_sign
v,r,s = ecdsa_raw_sign( msghash, key )
return '{:064x}{:064x}{:02x}'.format(r,s,v)
def ec_recover_pubkey(message,sig):
"""
Given a message and signature, recover the public key associated with the private key
used to make the signature
Conforms to the standard defined by the Geth `eth_sign` JSON-RPC call
"""
from ...util import get_keccak
msghash = get_keccak()(
'\x19Ethereum Signed Message:\n{}{}'.format( len(message), message ).encode()
).digest()
from py_ecc.secp256k1 import ecdsa_raw_recover
r,s,v = ( sig[:64], sig[64:128], sig[128:] )
return '{:064x}{:064x}'.format(
*ecdsa_raw_recover( msghash, tuple(int(hexstr,16) for hexstr in (v,r,s)) )
)

View file

@ -0,0 +1,40 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
base_proto.ethereum.msg: Ethereum base protocol message signing classes
"""
from ...msg import coin_msg
class coin_msg(coin_msg):
class base(coin_msg.base): pass
class new(base,coin_msg.new): pass
class completed(base,coin_msg.completed): pass
class unsigned(completed,coin_msg.unsigned):
async def do_sign(self,wif,message):
from .misc import ec_sign_message_with_privkey
return ec_sign_message_with_privkey( message, bytes.fromhex(wif) )
class signed(completed,coin_msg.signed): pass
class signed_online(signed,coin_msg.signed_online):
async def do_verify(self,addr,sig,message):
from ...tool.coin import tool_cmd
from .misc import ec_recover_pubkey
return tool_cmd(proto=self.proto).pubhex2addr(ec_recover_pubkey( message, sig )) == addr
class exported_sigs(coin_msg.exported_sigs,signed_online): pass

View file

@ -1 +1 @@
13.1.dev25
13.1.dev26

View file

@ -24,8 +24,6 @@ class MsgOps:
def __init__(self,msg,addr_specs):
from .protocol import init_proto_from_opts
proto = init_proto_from_opts()
if proto.base_proto != 'Bitcoin':
die('Message signing operations are supported for Bitcoin and Bitcoin-derived coins only')
NewMsg(
coin = proto.coin,
network = proto.network,
@ -124,8 +122,11 @@ separated address index ranges.
NOTES
Message signing operations are currently supported for Bitcoin and Bitcoin
code fork coins only.
Message signing operations are supported for Bitcoin, Ethereum and code forks
thereof.
Ethereum signatures conform to the standard defined by the Geth eth_sign
JSON-RPC call.
Messages signed for Segwit-P2SH addresses cannot be verified directly using
the Bitcoin Core `verifymessage` RPC call, since such addresses are not hashes

View file

@ -49,7 +49,7 @@ class MMGenIDRange(str,Hilite,InitErrors,MMGenObject):
class coin_msg:
supported_base_protos = ('Bitcoin',)
supported_base_protos = ('Bitcoin','Ethereum')
class base(MMGenObject):
@ -184,7 +184,10 @@ class coin_msg:
del hdr_data['failed_sids']
fs1 = '{:%s} {}' % max(len(v[0]) for v in hdr_data.values())
fs2 = '{:%s} {}' % max(len(labels[k]) for v in self.sigs.values() for k in v.keys())
fs2 = '{:%s} %s{}' % (
max(len(labels[k]) for v in self.sigs.values() for k in v.keys()),
'0x' if self.proto.base_proto == 'Ethereum' else ''
)
if req_addr:
fs2 = ' ' * 2 + fs2
@ -216,9 +219,10 @@ class coin_msg:
mmid = '{}:{}:{}'.format( al_in.sid, al_in.mmtype, e.idx )
data = {
'addr': e.addr,
'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex(),
'sig': sig,
}
if self.proto.base_proto != 'Ethereum':
data.update({ 'pubhash': self.proto.parse_addr(e.addr_p2pkh or e.addr).bytes.hex() })
if e.addr_p2pkh:
data.update({'addr_p2pkh': e.addr_p2pkh})
@ -306,6 +310,8 @@ class coin_msg:
def get_json_for_export(self,addr=None):
sigs = list( self.get_sigs(addr).values() )
if self.proto.base_proto == 'Ethereum':
sigs = [{k:'0x'+v for k,v in e.items()} for e in sigs]
return json.dumps(
{
'message': self.data['message'],
@ -326,11 +332,11 @@ class coin_msg:
desc = self.desc )
)
def gen_sigs():
for e in self.data['signatures']:
yield e
self.sigs = {e['addr']:e for e in gen_sigs()}
self.sigs = {sig['addr']:sig for sig in (
[{k:v[2:] for k,v in e.items()} for e in self.data['signatures']]
if self.proto.base_proto == 'Ethereum' else
self.data['signatures']
)}
def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**kwargs):

View file

@ -141,6 +141,14 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
('addrimport_erigon_dev_addr', 'importing Erigon dev faucet address'),
('fund_dev_address', 'funding the default (Parity dev) address'),
('msgsign_chk', "signing a message (low-level, check against 'eth_sign' RPC call)"),
('msgcreate', 'creating a message file'),
('msgsign', 'signing the message file'),
('msgverify', 'verifying the message file'),
('msgexport', 'exporting the message file data to JSON for third-party verifier'),
('msgverify_export', 'verifying the exported JSON data'),
('txcreate1', 'creating a transaction (spend from dev address to address :1)'),
('txview1_raw', 'viewing the raw transaction'),
('txsign1', 'signing the transaction'),
@ -308,6 +316,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
from mmgen.daemon import CoinDaemon
d = CoinDaemon(proto=self.proto,test_suite=True)
self.rpc_port = d.rpc_port
self.daemon_datadir = d.datadir
self.using_solc = check_solc_ver()
if not self.using_solc:
omsg(yellow('Using precompiled contract data'))
@ -328,6 +337,7 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
devkey+'\n' )
os.environ['MMGEN_BOGUS_SEND'] = ''
self.message = 'attack at dawn'
def __del__(self):
os.environ['MMGEN_BOGUS_SEND'] = '1'
@ -341,6 +351,10 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
'--quiet'
]
@property
def eth_args_noquiet(self):
return self.eth_args[:-1]
async def setup(self):
self.spawn('',msg_only=True)
@ -435,6 +449,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
imsg(f' Keystore: {keystore}')
signer_addr = make_key(keystore)
self.write_to_tmpfile( 'signer_addr', signer_addr + '\n' )
imsg(f' Signer address: {signer_addr}')
imsg(f' Faucet: {dfl_devaddr} ({prealloc_amt} ETH)')
@ -638,6 +654,72 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
def tx_status1a(self):
return self.tx_status(ext='2.345,50000]{}.regtest.sigtx',expect_str='has 2 confirmations')
async def msgsign_chk(self): # NB: Geth only!
def create_signature_mmgen():
wallet_dir = os.path.relpath(os.path.join(self.daemon_datadir,'keystore'))
wallet_fn = os.path.join(wallet_dir,os.listdir(wallet_dir)[0])
from mmgen.base_proto.ethereum.misc import extract_key_from_geth_keystore_wallet
key = extract_key_from_geth_keystore_wallet(
wallet_fn = wallet_fn,
passwd = b'' )
imsg(f'Key: {key.hex()}')
from mmgen.base_proto.ethereum.misc import ec_sign_message_with_privkey
return ec_sign_message_with_privkey(self.message,key)
async def create_signature_rpc():
from mmgen.rpc import rpc_init
rpc = await rpc_init(self.proto)
addr = self.read_from_tmpfile('signer_addr').strip()
imsg(f'Address: {addr}')
return await rpc.call(
'eth_sign',
'0x' + addr,
'0x' + self.message.encode().hex() )
if not g.daemon_id == 'geth':
return 'skip'
self.spawn('',msg_only=True)
sig = '0x' + create_signature_mmgen()
sig_chk = await create_signature_rpc()
# Compare signatures
imsg(f'Message: {self.message}')
imsg(f'Signature: {sig}')
cmp_or_die(sig,sig_chk,'message signatures')
return 'ok'
def msgcreate(self):
t = self.spawn('mmgen-msg', self.eth_args_noquiet + [ 'create', self.message, '98831F3A:E:1' ])
t.written_to_file('Unsigned message data')
return t
def msgsign(self):
fn = get_file_with_ext(self.tmpdir,'rawmsg.json')
t = self.spawn('mmgen-msg', self.eth_args_noquiet + [ 'sign', fn, dfl_words_file ])
t.written_to_file('Signed message data')
return t
def msgverify(self,fn=None):
fn = fn or get_file_with_ext(self.tmpdir,'sigmsg.json')
t = self.spawn('mmgen-msg', self.eth_args_noquiet + [ 'verify', fn ])
t.expect('1 signature verified')
return t
def msgexport(self):
fn = get_file_with_ext(self.tmpdir,'sigmsg.json')
t = self.spawn('mmgen-msg', self.eth_args_noquiet + [ 'export', fn ])
t.written_to_file('Signature data')
return t
def msgverify_export(self):
return self.msgverify( fn=os.path.join(self.tmpdir,'signatures.json') )
def txcreate4(self):
return self.txcreate(
args = ['98831F3A:E:2,23.45495'],

View file

@ -16,6 +16,8 @@ def get_obj(coin,network):
if coin == 'bch':
addrlists = 'DEADBEEF:C:1-20 98831F3A:C:8,2 A091ABAA:L:111 A091ABAA:C:1'
elif coin == 'eth':
addrlists = 'DEADBEEF:E:1-20 98831F3A:E:8,2 A091ABAA:E:111'
else:
# A091ABAA = 98831F3A:5S
addrlists = 'DEADBEEF:C:1-20 98831F3A:B:8,2 A091ABAA:S:10-11 A091ABAA:111 A091ABAA:C:1'
@ -65,7 +67,7 @@ async def run_test(network_id):
msg(m.format())
single_addr = 'A091ABAA:111'
single_addr = 'A091ABAA:E:111' if m.proto.base_proto == 'Ethereum' else 'A091ABAA:111'
single_addr_coin = m.sigs[MMGenID(m.proto,single_addr)]['addr']
pumsg('\nTesting single address display:\n')
@ -116,7 +118,7 @@ async def run_test(network_id):
class unit_tests:
altcoin_deps = ('ltc','bch')
altcoin_deps = ('ltc','bch','eth')
def btc(self,name,ut):
return run_test('btc')
@ -132,3 +134,6 @@ class unit_tests:
def bch(self,name,ut):
return run_test('bch')
def eth(self,name,ut):
return run_test('eth')