various changes and fixes throughout

This commit is contained in:
The MMGen Project 2022-04-28 11:00:50 +00:00
commit 44cfe1c3dd
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
15 changed files with 92 additions and 64 deletions

View file

@ -32,7 +32,7 @@ from .protocol import init_proto
from .obj import MMGenObject,TwComment,WalletPassword,MMGenPWIDString
from .seed import SeedID,is_seed_id
from .key import PrivKey
from .addr import ViewKey,AddrListID,MMGenPasswordType,is_addr_idx
from .addr import ViewKey,AddrListID,MMGenAddrType,MMGenPasswordType,is_addr_idx
from .addrlist import KeyList,AddrListData,dmsg_sc
class AddrFile(MMGenObject):
@ -209,7 +209,8 @@ class AddrFile(MMGenObject):
else:
network = 'mainnet'
if lbl in p.bitcoin_addrtypes:
from .proto.btc import mainnet
if lbl in [MMGenAddrType(mainnet,key).name for key in mainnet.mmtypes]:
coin,mmtype_key = ( 'BTC', lbl )
elif ':' in lbl: # first component is coin, second is mmtype_key
coin,mmtype_key = lbl.split(':')

View file

@ -157,16 +157,10 @@ class AddrList(MMGenObject): # Address info for a single seed ID
self.proto = proto
do_chksum = False
mmtype = mmtype or proto.dfl_mmtype
assert mmtype in MMGenAddrType.mmtypes, f'{mmtype}: mmtype not in {MMGenAddrType.mmtypes!r}'
from .proto.btc import mainnet
self.bitcoin_addrtypes = tuple(
MMGenAddrType(mainnet,key).name for key in mainnet.mmtypes )
if seed and addr_idxs and mmtype: # data from seed + idxs
self.al_id,src = AddrListID(seed.sid,mmtype),'gen'
adata = self.generate(seed,addr_idxs)
if seed and addr_idxs: # data from seed + idxs
self.al_id = AddrListID( seed.sid, MMGenAddrType(proto, mmtype or proto.dfl_mmtype) )
src = 'gen'
adata = self.generate(seed, addr_idxs if isinstance(addr_idxs,AddrIdxList) else AddrIdxList(addr_idxs))
do_chksum = True
elif addrfile: # data from MMGen address file
self.infile = addrfile

View file

@ -33,6 +33,7 @@ from .util import (
vmsg_r,
qmsg,
fmt,
die,
line_input,
get_words_from_user,
make_chksum_8,

View file

@ -50,6 +50,7 @@ class CfgFileParseError(Exception): mmcode = 1
class UserOptError(Exception): mmcode = 1
class NoLEDSupport(Exception): mmcode = 1
class MsgFileFailedSID(Exception): mmcode = 1
class TestSuiteException(Exception): mmcode = 1
# 2: yellow hl, message only
class InvalidTokenAddress(Exception): mmcode = 2
@ -77,6 +78,7 @@ class HexadecimalStringError(Exception): mmcode = 3
class SeedLengthError(Exception): mmcode = 3
class PrivateKeyError(Exception): mmcode = 3
class MMGenCalledProcessError(Exception): mmcode = 3
class TestSuiteFatalException(Exception): mmcode = 3
# 4: red hl, 'MMGen Fatal Error' + exception + message
class BadMMGenTxID(Exception): mmcode = 4

View file

@ -70,11 +70,10 @@ class MsgOps:
async def __init__(self,msgfile,addr=None):
from .fileutil import write_data_to_file
write_data_to_file(
outfile = 'signatures.json',
data = SignedOnlineMsg( infile=msgfile ).get_json_for_export( addr ),
desc = 'signature data' )
outfile = 'signatures.json',
data = SignedOnlineMsg( infile=msgfile ).get_json_for_export( addr ),
desc = 'signature data' )
opts_data = {
'text': {

View file

@ -49,6 +49,8 @@ class MMGenIDRange(str,Hilite,InitErrors,MMGenObject):
class coin_msg:
supported_base_protos = ('Bitcoin',)
class base(MMGenObject):
ext = 'rawmsg.json'
@ -61,7 +63,11 @@ class coin_msg:
@property
def chksum(self):
return make_chksum_6(
json.dumps( self.data, sort_keys=True, separators=(',', ':') ))
json.dumps(
self.data,
sort_keys = True,
separators = (',', ':')
))
@property
def filename_stem(self):
@ -80,7 +86,9 @@ class coin_msg:
return f'{self.filename_stem}.{coin_msg.signed.ext}'
def get_proto_from_file(self,filename):
coin,network = json.loads(get_data_from_file(filename))['metadata']['network'].split('_')
data = json.loads(get_data_from_file(filename))
network_id = data['metadata']['network']
coin,network = network_id.split('_')
return init_proto( coin=coin, network=network )
def write_to_file(self,outdir=None,ask_overwrite=False):
@ -96,7 +104,7 @@ class coin_msg:
write_data_to_file(
outfile = os.path.join(outdir or '',self.filename),
data = json.dumps(data,sort_keys=True,indent=4),
desc = f'{self.desc} data',
desc = self.desc,
ask_overwrite = ask_overwrite )
class new(base):
@ -117,10 +125,9 @@ class coin_msg:
self.__dict__ = data
return
self.infile = infile
self.data = get_data_from_file(
infile = self.infile,
desc = f'{self.desc} data' )
infile = infile,
desc = self.desc )
d = json.loads(self.data)
self.data = d['metadata']
@ -129,22 +136,25 @@ class coin_msg:
if d.get('failed_seed_ids'):
self.failed_sids = d['failed_seed_ids']
def format(self,mmid=None):
def format(self,req_addr=None):
labels = {
'addr': 'address:',
'addr_p2pkh': 'addr_p2pkh:',
'pubhash': 'pubkey hash:',
'sig': 'signature:',
}
def gen_entry(e):
yield fs2.format( 'addr:', e['addr'] )
if e.get('addr_p2pkh'):
yield fs2.format( 'addr_p2pkh:', e['addr_p2pkh'] )
if e.get('pubhash'):
yield fs2.format( 'pubkey hash:', e['pubhash'] )
yield fs2.format('sig:', e['sig'] )
for k in labels:
if e.get(k):
yield fs2.format( labels[k], e[k] )
def gen_all():
fs = '{:16s} {}'
for k,v in disp_data.items():
yield fs.format( v[0]+':', v[1](self.data[k]) )
for k,v in hdr_data.items():
yield fs1.format( v[0], v[1](self.data[k]) )
if hasattr(self,'failed_sids'):
yield fs.format(
yield fs1.format(
'Failed Seed IDs:',
red(fmt_list(self.failed_sids,fmt='bare')) )
if self.sigs:
@ -157,29 +167,33 @@ class coin_msg:
yield res
def gen_single():
fs = '{:8s} {}'
for k,v in disp_data.items():
yield fs.format( v[0]+':', v[1](self.data[k]) )
for k,v in hdr_data.items():
yield fs1.format( v[0], v[1](self.data[k]) )
if self.sigs:
yield 'Signature data:'
k = MMGenID(self.proto,mmid)
k = MMGenID(self.proto,req_addr)
if k not in self.sigs:
die(1,f'{k}: address not found in signature data')
for res in gen_entry(self.sigs[k]):
yield res
disp_data = {
'message': ('Message', lambda v: grnbg(v) ),
'network': ('Network', lambda v: v.replace('_',' ').upper() ),
'addrlists': ('Address Ranges', lambda v: fmt_list(v,fmt='bare') ),
hdr_data = {
'message': ('Message:', lambda v: grnbg(v) ),
'network': ('Network:', lambda v: v.replace('_',' ').upper() ),
'addrlists': ('Address Ranges:', lambda v: fmt_list(v,fmt='bare') ),
}
if mmid:
del disp_data['addrlists']
fs2 = ' {:12s} {}'
if req_addr or type(self).__name__ == 'exported_sigs':
del hdr_data['addrlists']
fs1 = '{:%s} {}' % max(len(v[0]) for v in hdr_data.values())
fs2 = '{:%s} {}' % max(len(labels[k]) for v in self.sigs.values() for k in v.keys())
if req_addr:
fs2 = ' ' * 2 + fs2
return '\n'.join(gen_single())
else:
fs2 = ' {:12s} {}'
fs2 = ' ' * 5 + fs2
return (
'{}SIGNED MESSAGE DATA:\n\n '.format('' if self.sigs else 'UN') +
'\n '.join(gen_all()) )
@ -214,8 +228,9 @@ class coin_msg:
self.sigs[mmid] = data
from .rpc import rpc_init
self.rpc = await rpc_init(self.proto)
if self.proto.sign_mode == 'daemon':
from .rpc import rpc_init
self.rpc = await rpc_init(self.proto)
from .wallet import Wallet
from .addrlist import KeyAddrList
@ -261,8 +276,8 @@ class coin_msg:
def get_sigs(self,addr):
if addr:
mmaddr = MMGenID(self.proto,addr)
sigs = {k:v for k,v in self.sigs.items() if k == mmaddr}
req_addr = MMGenID(self.proto,addr)
sigs = {k:v for k,v in self.sigs.items() if k == req_addr}
else:
sigs = self.sigs
@ -275,8 +290,9 @@ class coin_msg:
sigs = self.get_sigs(addr)
from .rpc import rpc_init
self.rpc = await rpc_init(self.proto)
if self.proto.sign_mode == 'daemon':
from .rpc import rpc_init
self.rpc = await rpc_init(self.proto)
for k,v in sigs.items():
ret = await self.do_verify(
@ -290,11 +306,12 @@ class coin_msg:
msg('{} signature{} verified'.format( len(sigs), suf(sigs) ))
def get_json_for_export(self,addr=None):
sigs = list( self.get_sigs(addr).values() )
return json.dumps(
{
'message': self.data['message'],
'network': self.data['network'].upper(),
'signatures': tuple( self.get_sigs(addr).values() ),
'signatures': sigs,
},
sort_keys = True,
indent = 4
@ -314,6 +331,9 @@ def _get_obj(clsname,coin=None,network='mainnet',infile=None,data=None,*args,**k
init_proto( coin=coin, network=network ) if coin else
coin_msg.base().get_proto_from_file(infile) )
if proto.base_proto not in coin_msg.supported_base_protos:
die(f'Message signing operations not supported for {proto.base_proto} protocol')
cls = getattr(
getattr(importlib.import_module(f'mmgen.base_proto.{proto.base_proto.lower()}.msg'),'coin_msg'),
clsname )

View file

@ -12,6 +12,7 @@
Ethereum protocol
"""
from ..globalvars import g
from ..protocol import CoinProtocol,_nw,parsed_addr
from ..util import is_hex_str_lc,Msg

View file

@ -59,5 +59,8 @@ class mainnet(CoinProtocol.DummyWIF,CoinProtocol.Base):
return self.parse_addr_bytes(ret)
def pubhash2addr(self,*args,**kwargs):
raise NotImplementedError('Monero addresses do not support pubhash2addr()')
class testnet(mainnet): # use stagenet for testnet
addr_ver_bytes = { '18': 'monero', '24': 'monero_sub' } # testnet is ('35','3f')

View file

@ -17,7 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool/api.py: tool_api interface for the 'mmgen-tool' utility
tool/api.py:
Python interface to selected commands from the 'mmgen-tool' utility
"""
from .common import tool_cmd_base

View file

@ -20,9 +20,6 @@
common.py: Shared routines and data for the MMGen test suites
"""
class TestSuiteException(Exception): pass
class TestSuiteFatalException(Exception): pass
import os
from subprocess import run,PIPE
from mmgen.common import *

View file

@ -496,6 +496,7 @@ t_tool="
- $tooltest_py --coin=dash cryptocoin
- $tooltest_py --coin=doge cryptocoin
- $tooltest_py --coin=emc cryptocoin
- $tooltest_py --coin=xmr cryptocoin
- $tooltest_py --coin=zec cryptocoin
z $tooltest_py --coin=zec --type=zcash_z cryptocoin
"

View file

@ -1054,6 +1054,8 @@ if opt.pause:
set_environ_for_spawned_scripts()
from mmgen.exception import TestSuiteException,TestSuiteFatalException
try:
tr = TestSuiteRunner(data_dir,trash_dir)
tr.run_tests(usr_args)

View file

@ -306,7 +306,8 @@ class TestSuiteEthdev(TestSuiteBase,TestSuiteShared):
from mmgen.protocol import init_proto
self.proto = init_proto(g.coin,network='regtest',need_amt=True)
from mmgen.daemon import CoinDaemon
self.rpc_port = CoinDaemon(proto=self.proto,test_suite=True).rpc_port
d = CoinDaemon(proto=self.proto,test_suite=True)
self.rpc_port = d.rpc_port
self.using_solc = check_solc_ver()
if not self.using_solc:
omsg(yellow('Using precompiled contract data'))

View file

@ -57,6 +57,8 @@ sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
cmd_args = opts.init(opts_data)
assert opt.type in (None,'zcash_z'), 'Only zcash-z permitted for --type argument'
from mmgen.protocol import init_proto_from_opts
proto = init_proto_from_opts()
@ -98,8 +100,9 @@ if proto.coin in ('BTC','LTC'):
'pipetest': ('randpair','o3')
})
if opt.type == 'zcash_z':
if proto.coin == 'XMR' or opt.type == 'zcash_z':
del cmd_data['cryptocoin']['cmd_data']['pubhash2addr']
del cmd_data['cryptocoin']['cmd_data']['addr2pubhash']
cfg = {
'name': 'the tool utility',
@ -197,10 +200,10 @@ def test_msg(m):
m2 = f'Testing {m}'
msg_r(green(m2+'\n') if opt.verbose else '{:{w}}'.format( m2, w=msg_w+8 ))
compressed = ('','compressed')['C' in proto.mmtypes]
compressed = opt.type or ('','compressed')['C' in proto.mmtypes]
segwit = ('','segwit')['S' in proto.mmtypes]
bech32 = ('','bech32')['B' in proto.mmtypes]
type_compressed_arg = ([],['--type=compressed'])['C' in proto.mmtypes]
type_compressed_arg = ([],['--type=' + (opt.type or 'compressed')])[bool(opt.type) or 'C' in proto.mmtypes]
type_segwit_arg = ([],['--type=segwit'])['S' in proto.mmtypes]
type_bech32_arg = ([],['--type=bech32'])['B' in proto.mmtypes]

View file

@ -8,7 +8,7 @@ import os
from test.include.common import silence,end_silence,restart_test_daemons,stop_test_daemons
from mmgen.opts import opt
from mmgen.util import msg,bmsg,pumsg
from mmgen.protocol import CoinProtocol,init_proto
from mmgen.protocol import CoinProtocol
from mmgen.msg import NewMsg,UnsignedMsg,SignedMsg,SignedOnlineMsg
def get_obj(coin,network):
@ -64,20 +64,22 @@ async def run_test(network_id):
msg(m.format())
single_addr = 'A091ABAA:111'
pumsg('\nTesting single address display:\n')
msg(m.format('A091ABAA:111'))
msg(m.format(single_addr))
pumsg('\nTesting verification:\n')
await m.verify(summary=opt.verbose)
pumsg('\nTesting single address verification:\n')
await m.verify('A091ABAA:111',summary=opt.verbose)
await m.verify(single_addr,summary=opt.verbose)
pumsg('\nTesting JSON dump for export:\n')
msg( m.get_json_for_export() )
pumsg('\nTesting single address JSON dump for export:\n')
msg( m.get_json_for_export('A091ABAA:111') )
msg( m.get_json_for_export(single_addr) )
stop_test_daemons(network_id)