obj.py: remove 'on_fail' keyword arg

- During initialization, data objects now invariably raise an exception on
  failure (ObjectInitError by default, configurable via the 'exc' attribute).

- For callers that need to handle the exception, the new get_obj() wrapper is
  provided.

Testing:

    $ test/objtest.py -S
    $ test/objtest.py -S --getobj
This commit is contained in:
The MMGen Project 2020-06-01 09:59:04 +00:00
commit 0852321c21
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
14 changed files with 209 additions and 144 deletions

View file

@ -1159,7 +1159,7 @@ class TwAddrData(AddrData,metaclass=aInitMeta):
twd = await self.get_tw_data(wallet)
out,i = {},0
for acct,addr_array in twd:
l = TwLabel(self.proto,acct,on_fail='silent')
l = get_obj(TwLabel,proto=self.proto,text=acct,silent=True)
if l and l.mmid.type == 'mmgen':
obj = l.mmid.obj
if len(addr_array) != 1:

View file

@ -257,7 +257,7 @@ Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view,
if self.addrs:
wl = [d for d in wl if d['addr'] in self.addrs]
return [{
'account': TwLabel(self.proto,d['mmid']+' '+d['comment'],on_fail='raise'),
'account': TwLabel(self.proto,d['mmid']+' '+d['comment']),
'address': d['addr'],
'amount': await self.wallet.get_balance(d['addr']),
'confirmations': 0, # TODO
@ -298,7 +298,7 @@ class EthereumTwAddrList(TwAddrList):
from mmgen.obj import CoinAddr
for mmid,d in list(tw_dict.items()):
# if d['confirmations'] < minconf: continue # cannot get confirmations for eth account
label = TwLabel(self.proto,mmid+' '+d['comment'],on_fail='raise')
label = TwLabel(self.proto,mmid+' '+d['comment'])
if usr_addr_list and (label.mmid not in usr_addr_list):
continue
bal = await self.wallet.get_balance(d['addr'])

View file

@ -40,10 +40,11 @@ class InvalidTokenAddress(Exception): mmcode = 2
class UnrecognizedTokenSymbol(Exception): mmcode = 2
class TokenNotInBlockchain(Exception): mmcode = 2
class TokenNotInWallet(Exception): mmcode = 2
class BadTwComment(Exception): mmcode = 2
class BadTwLabel(Exception): mmcode = 2
class BaseConversionError(Exception): mmcode = 2
class BaseConversionPadError(Exception): mmcode = 2
class TransactionChainMismatch(Exception):mmcode = 2
class ObjectInitError(Exception): mmcode = 2
# 3: yellow hl, 'MMGen Error' + exception + message
class RPCFailure(Exception): mmcode = 3

View file

@ -104,7 +104,7 @@ if len(cmd_args) != 2:
from .obj import MMGenID
try:
mmids = [MMGenID(a,on_fail='die') for a in cmd_args]
mmids = [MMGenID(a) for a in cmd_args]
except:
die(1,'Command line arguments must be valid MMGen IDs')

View file

@ -148,10 +148,10 @@ if invoked_as == 'subgen':
from .obj import SubSeedIdx
ss_idx = SubSeedIdx(cmd_args.pop())
elif invoked_as == 'seedsplit':
from .obj import SeedSplitSpecifier
from .obj import get_obj,SeedSplitSpecifier
master_share = MasterShareIdx(opt.master_share) if opt.master_share else None
if cmd_args:
sss = SeedSplitSpecifier(cmd_args.pop(),on_fail='silent')
sss = get_obj(SeedSplitSpecifier,s=cmd_args.pop(),silent=True)
if master_share:
if not sss:
sss = SeedSplitSpecifier('1:2')

View file

@ -34,14 +34,42 @@ class aInitMeta(type):
await instance.__ainit__(*args,**kwargs)
return instance
def is_mmgen_seed_id(s): return SeedID(sid=s,on_fail='silent')
def is_mmgen_idx(s): return AddrIdx(s,on_fail='silent')
def is_addrlist_id(s): return AddrListID(s,on_fail='silent')
def is_seed_split_specifier(s): return SeedSplitSpecifier(s,on_fail='silent')
def get_obj(objname,*args,**kwargs):
"""
Wrapper for data objects
- If the object throws an exception on instantiation, return False, otherwise return the object.
- If silent is True, suppress display of the exception.
- If return_bool is True, return True instead of the object.
Only keyword args are accepted.
"""
assert args == (), 'get_obj_chk1'
def is_mmgen_id(proto,s): return MMGenID(proto,s,on_fail='silent')
def is_coin_addr(proto,s): return CoinAddr(proto,s,on_fail='silent')
def is_wif(proto,s): return WifKey(proto,s,on_fail='silent')
silent,return_bool = (False,False)
if 'silent' in kwargs:
silent = kwargs['silent']
del kwargs['silent']
if 'return_bool' in kwargs:
return_bool = kwargs['return_bool']
del kwargs['return_bool']
try:
ret = objname(**kwargs)
except Exception as e:
if not silent:
from .util import msg
msg(f'{e!s}')
return False
else:
return True if return_bool else ret
def is_mmgen_seed_id(s): return get_obj(SeedID, sid=s, silent=True,return_bool=True)
def is_mmgen_idx(s): return get_obj(AddrIdx, n=s, silent=True,return_bool=True)
def is_addrlist_id(s): return get_obj(AddrListID, sid=s, silent=True,return_bool=True)
def is_seed_split_specifier(s): return get_obj(SeedSplitSpecifier, s=s, silent=True,return_bool=True)
def is_mmgen_id(proto,s): return get_obj(MMGenID, proto=proto, id_str=s, silent=True,return_bool=True)
def is_coin_addr(proto,s): return get_obj(CoinAddr, proto=proto, addr=s, silent=True,return_bool=True)
def is_wif(proto,s): return get_obj(WifKey, proto=proto, wif=s, silent=True,return_bool=True)
def truncate_str(s,width): # width = screen width
wide_count = 0
@ -88,15 +116,7 @@ class MMGenList(list,MMGenObject): pass
class MMGenDict(dict,MMGenObject): pass
class AddrListData(list,MMGenObject): pass
class InitErrors(object):
on_fail='die'
@classmethod
def arg_chk(cls,on_fail):
cls.on_fail = on_fail
assert on_fail in ('die','return','silent','raise'),(
"'{}': invalid value for 'on_fail' in class {}".format(on_fail,cls.__name__) )
class InitErrors:
@classmethod
def init_fail(cls,e,m,e2=None,m2=None,objname=None,preformat=False):
@ -104,27 +124,19 @@ class InitErrors(object):
if preformat:
errmsg = m
else:
fs = "{!r}: value cannot be converted to {} {}({})"
e2_fmt = '({}) '.format(e2.args[0]) if e2 else ''
errmsg = fs.format(m,objname or cls.__name__,e2_fmt,e.args[0])
errmsg = '{!r}: value cannot be converted to {} {}({!s})'.format(
m,
(objname or cls.__name__),
(f'({e2!s}) ' if e2 else ''),
e )
if m2:
errmsg = '{!r}\n{}'.format(m2,errmsg)
errmsg = repr(m2) + '\n' + errmsg
from .util import die,msg
if cls.on_fail == 'silent':
return None # TODO: return False instead?
elif cls.on_fail == 'return':
if errmsg:
msg(errmsg)
return None # TODO: return False instead?
elif g.traceback or cls.on_fail == 'raise':
if hasattr(cls,'exc'):
raise cls.exc(errmsg)
else:
raise
elif cls.on_fail == 'die':
die(1,errmsg)
if hasattr(cls,'exc'):
raise cls.exc(errmsg)
else:
raise ObjectInitError(errmsg)
@classmethod
def method_not_implemented(cls):
@ -201,10 +213,9 @@ class Int(int,Hilite,InitErrors):
max_digits = None
color = 'red'
def __new__(cls,n,base=10,on_fail='raise'):
def __new__(cls,n,base=10):
if type(n) == cls:
return n
cls.arg_chk(on_fail)
try:
me = int.__new__(cls,str(n),base)
if cls.min_val != None:
@ -246,9 +257,9 @@ class ImmutableAttr: # Descriptor
"convert this attribute's type"
if type(dtype) == str:
if include_proto:
self.conv = lambda instance,value: globals()[dtype](instance.proto,value,on_fail='raise')
self.conv = lambda instance,value: globals()[dtype](instance.proto,value)
else:
self.conv = lambda instance,value: globals()[dtype](value,on_fail='raise')
self.conv = lambda instance,value: globals()[dtype](value)
else:
if set_none_ok:
self.conv = lambda instance,value: None if value is None else dtype(value)
@ -359,28 +370,27 @@ class AddrIdx(MMGenIdx): max_digits = 7
class AddrIdxList(list,InitErrors,MMGenObject):
max_len = 1000000
def __init__(self,fmt_str=None,idx_list=None,on_fail='die',sep=','):
type(self).arg_chk(on_fail)
def __init__(self,fmt_str=None,idx_list=None,sep=','):
try:
if idx_list:
return list.__init__(self,sorted({AddrIdx(i,on_fail='raise') for i in idx_list}))
return list.__init__(self,sorted({AddrIdx(i) for i in idx_list}))
elif fmt_str:
ret = []
for i in (fmt_str.split(sep)):
j = i.split('-')
if len(j) == 1:
idx = AddrIdx(i,on_fail='raise')
idx = AddrIdx(i)
if not idx:
break
ret.append(idx)
elif len(j) == 2:
beg = AddrIdx(j[0],on_fail='raise')
beg = AddrIdx(j[0])
if not beg:
break
end = AddrIdx(j[1],on_fail='raise')
end = AddrIdx(j[1])
if not beg or (end < beg):
break
ret.extend([AddrIdx(x,on_fail='raise') for x in range(beg,end+1)])
ret.extend([AddrIdx(x) for x in range(beg,end+1)])
else: break
else:
return list.__init__(self,sorted(set(ret))) # fell off end of loop - success
@ -393,8 +403,7 @@ class MMGenRange(tuple,InitErrors,MMGenObject):
min_idx = None
max_idx = None
def __new__(cls,*args,on_fail='die'):
cls.arg_chk(on_fail)
def __new__(cls,*args):
try:
if len(args) == 1:
s = args[0]
@ -449,10 +458,9 @@ class BTCAmt(Decimal,Hilite,InitErrors):
forbidden_types = (float,int)
# NB: 'from_decimal' rounds down to precision of 'min_coin_unit'
def __new__(cls,num,from_unit=None,from_decimal=False,on_fail='die'):
def __new__(cls,num,from_unit=None,from_decimal=False):
if type(num) == cls:
return num
cls.arg_chk(on_fail)
try:
if from_unit:
assert from_unit in cls.units,(
@ -546,10 +554,9 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
hex_width = 40
width = 1
trunc_ok = False
def __new__(cls,proto,addr,on_fail='die'):
def __new__(cls,proto,addr):
if type(addr) == cls:
return addr
cls.arg_chk(on_fail)
try:
assert set(addr) <= set(ascii_letters+digits),'contains non-alphanumeric characters'
me = str.__new__(cls,addr)
@ -571,11 +578,11 @@ class TokenAddr(CoinAddr):
color = 'blue'
class ViewKey(object):
def __new__(cls,proto,viewkey,on_fail='die'):
def __new__(cls,proto,viewkey):
if proto.name == 'Zcash':
return ZcashViewKey.__new__(ZcashViewKey,proto,viewkey,on_fail)
return ZcashViewKey.__new__(ZcashViewKey,proto,viewkey)
elif proto.name == 'Monero':
return MoneroViewKey.__new__(MoneroViewKey,viewkey,on_fail)
return MoneroViewKey.__new__(MoneroViewKey,viewkey)
else:
raise ValueError(f'{proto.name}: protocol does not support view keys')
@ -585,10 +592,9 @@ class SeedID(str,Hilite,InitErrors):
color = 'blue'
width = 8
trunc_ok = False
def __new__(cls,seed=None,sid=None,on_fail='die'):
def __new__(cls,seed=None,sid=None):
if type(sid) == cls:
return sid
cls.arg_chk(on_fail)
try:
if seed:
from .seed import SeedBase
@ -606,10 +612,9 @@ class SeedID(str,Hilite,InitErrors):
class SubSeedIdx(str,Hilite,InitErrors):
color = 'red'
trunc_ok = False
def __new__(cls,s,on_fail='die'):
def __new__(cls,s):
if type(s) == cls:
return s
cls.arg_chk(on_fail)
try:
assert isinstance(s,str),'not a string or string subclass'
idx = s[:-1] if s[-1] in 'SsLl' else s
@ -631,15 +636,14 @@ class MMGenID(str,Hilite,InitErrors,MMGenObject):
color = 'orange'
width = 0
trunc_ok = False
def __new__(cls,proto,id_str,on_fail='die'):
cls.arg_chk(on_fail)
def __new__(cls,proto,id_str):
try:
ss = str(id_str).split(':')
assert len(ss) in (2,3),'not 2 or 3 colon-separated items'
t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2],on_fail='raise')
t = proto.addr_type((ss[1],proto.dfl_mmtype)[len(ss)==2])
me = str.__new__(cls,'{}:{}:{}'.format(ss[0],t,ss[-1]))
me.sid = SeedID(sid=ss[0],on_fail='raise')
me.idx = AddrIdx(ss[-1],on_fail='raise')
me.sid = SeedID(sid=ss[0])
me.idx = AddrIdx(ss[-1])
me.mmtype = t
assert t in proto.mmtypes, f'{t}: invalid address type for {proto.cls_name}'
me.al_id = str.__new__(AddrListID,me.sid+':'+me.mmtype) # checks already done
@ -653,13 +657,12 @@ class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
color = 'orange'
width = 0
trunc_ok = False
def __new__(cls,proto,id_str,on_fail='die'):
def __new__(cls,proto,id_str):
if type(id_str) == cls:
return id_str
cls.arg_chk(on_fail)
ret = None
try:
ret = MMGenID(proto,id_str,on_fail='raise')
ret = MMGenID(proto,id_str)
sort_key,idtype = ret.sort_key,'mmgen'
except Exception as e:
try:
@ -680,14 +683,14 @@ class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
# non-displaying container for TwMMGenID,TwComment
class TwLabel(str,InitErrors,MMGenObject):
def __new__(cls,proto,text,on_fail='die'):
exc = BadTwLabel
def __new__(cls,proto,text):
if type(text) == cls:
return text
cls.arg_chk(on_fail)
try:
ts = text.split(None,1)
mmid = TwMMGenID(proto,ts[0],on_fail='raise')
comment = TwComment(ts[1] if len(ts) == 2 else '',on_fail='raise')
mmid = TwMMGenID(proto,ts[0])
comment = TwComment(ts[1] if len(ts) == 2 else '')
me = str.__new__( cls, mmid + (' ' + comment if comment else '') )
me.mmid = mmid
me.comment = comment
@ -701,10 +704,9 @@ class HexStr(str,Hilite,InitErrors):
width = None
hexcase = 'lower'
trunc_ok = False
def __new__(cls,s,on_fail='die',case=None):
def __new__(cls,s,case=None):
if type(s) == cls:
return s
cls.arg_chk(on_fail)
if case == None:
case = cls.hexcase
try:
@ -730,10 +732,9 @@ class WifKey(str,Hilite,InitErrors):
"""
width = 53
color = 'blue'
def __new__(cls,proto,wif,on_fail='die'):
def __new__(cls,proto,wif):
if type(wif) == cls:
return wif
cls.arg_chk(on_fail)
try:
assert set(wif) <= set(ascii_letters+digits),'not an ascii alphanumeric string'
proto.parse_wif(wif) # raises exception on error
@ -742,12 +743,12 @@ class WifKey(str,Hilite,InitErrors):
return cls.init_fail(e,wif)
class PubKey(HexStr,MMGenObject): # TODO: add some real checks
def __new__(cls,s,compressed,on_fail='die'):
def __new__(cls,s,compressed):
try:
assert type(compressed) == bool,"'compressed' must be of type bool"
except Exception as e:
return cls.init_fail(e,s)
me = HexStr.__new__(cls,s,case='lower',on_fail=on_fail)
me = HexStr.__new__(cls,s,case='lower')
if me:
me.compressed = compressed
return me
@ -767,12 +768,9 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
wif = ImmutableAttr(WifKey,typeconv=False)
# initialize with (priv_bin,compressed), WIF or self
def __new__(cls,proto,s=None,compressed=None,wif=None,pubkey_type=None,on_fail='die'):
def __new__(cls,proto,s=None,compressed=None,wif=None,pubkey_type=None):
if type(s) == cls:
return s
cls.arg_chk(on_fail)
if wif:
try:
assert s == None,"'wif' and key hex args are mutually exclusive"
@ -801,7 +799,7 @@ class PrivKey(str,Hilite,InitErrors,MMGenObject):
assert compressed is not None, "'compressed' arg missing"
assert type(compressed) == bool,"{!r}: 'compressed' not of type 'bool'".format(compressed)
me = str.__new__(cls,proto.preprocess_key(s,pubkey_type).hex())
me.wif = WifKey(proto,proto.hex2wif(me,pubkey_type,compressed),on_fail='raise')
me.wif = WifKey(proto,proto.hex2wif(me,pubkey_type,compressed))
me.compressed = compressed
me.pubkey_type = pubkey_type
me.orig_hex = s.hex() # save the non-preprocessed key
@ -814,8 +812,7 @@ class AddrListID(str,Hilite,InitErrors,MMGenObject):
width = 10
trunc_ok = False
color = 'yellow'
def __new__(cls,sid,mmtype,on_fail='die'):
cls.arg_chk(on_fail)
def __new__(cls,sid,mmtype):
try:
assert type(sid) == SeedID,"{!r} not a SeedID instance".format(sid)
if not isinstance(mmtype,(MMGenAddrType,MMGenPasswordType)):
@ -835,10 +832,9 @@ class MMGenLabel(str,Hilite,InitErrors):
min_len = 0
max_screen_width = 0 # if != 0, overrides max_len
desc = 'label'
def __new__(cls,s,on_fail='die',msg=None):
def __new__(cls,s,msg=None):
if type(s) == cls:
return s
cls.arg_chk(on_fail)
for k in cls.forbidden,cls.allowed:
assert type(k) == list
for ch in k: assert type(ch) == str and len(ch) == 1
@ -874,7 +870,6 @@ class MMGenWalletLabel(MMGenLabel):
class TwComment(MMGenLabel):
max_screen_width = 80
desc = 'tracking wallet comment'
exc = BadTwComment
class MMGenTxLabel(MMGenLabel):
max_len = 72
@ -889,18 +884,17 @@ class MMGenPWIDString(MMGenLabel):
class SeedSplitSpecifier(str,Hilite,InitErrors,MMGenObject):
color = 'red'
def __new__(cls,s,on_fail='raise'):
def __new__(cls,s):
if type(s) == cls:
return s
cls.arg_chk(on_fail)
try:
arr = s.split(':')
assert len(arr) in (2,3), 'cannot be parsed'
a,b,c = arr if len(arr) == 3 else ['default'] + arr
me = str.__new__(cls,s)
me.id = SeedSplitIDString(a,on_fail=on_fail)
me.idx = SeedShareIdx(b,on_fail=on_fail)
me.count = SeedShareCount(c,on_fail=on_fail)
me.id = SeedSplitIDString(a)
me.idx = SeedShareIdx(b)
me.count = SeedShareCount(c)
assert me.idx <= me.count, 'share index greater than share count'
return me
except Exception as e:
@ -936,10 +930,9 @@ class MMGenAddrType(str,Hilite,InitErrors,MMGenObject):
'Z': ati('zcash_z','zcash_z',False,'zcash_z', 'zcash_z', 'wif', ('viewkey',), 'Zcash z-address'),
'M': ati('monero', 'monero', False,'monero', 'monero', 'spendkey',('viewkey','wallet_passwd'),'Monero address'),
}
def __new__(cls,proto,id_str,on_fail='die',errmsg=None):
def __new__(cls,proto,id_str,errmsg=None):
if type(id_str) == cls:
return id_str
cls.arg_chk(on_fail)
try:
for k,v in cls.mmtypes.items():
if id_str in (k,v.name):

View file

@ -168,11 +168,11 @@ class CoinProtocol(MMGenObject):
def coin_addr(self,addr):
return CoinAddr( proto=self, addr=addr )
def addr_type(self,id_str,on_fail='die'):
return MMGenAddrType(proto=self,id_str=id_str,on_fail=on_fail)
def addr_type(self,id_str):
return MMGenAddrType( proto=self, id_str=id_str )
def priv_key(self,s,on_fail='die'):
return PrivKey(proto=self,s=s,on_fail=on_fail)
def priv_key(self,s):
return PrivKey( proto=self, s=s )
class Secp256k1(Base):
"""

View file

@ -33,8 +33,8 @@ def CUR_RIGHT(n): return '\033[{}C'.format(n)
def get_tw_label(proto,s):
try:
return TwLabel(proto,s,on_fail='raise')
except BadTwComment:
return TwLabel(proto,s)
except BadTwLabel:
raise
except:
return None
@ -410,7 +410,7 @@ Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, add [l]abel:
while True:
ret = my_raw_input(f'Enter {self.item_desc} number (or RETURN to return to main menu): ')
if ret == '': return (None,None) if action == 'a_lbl_add' else None
n = AddrIdx(ret,on_fail='silent')
n = get_obj(AddrIdx,n=ret,silent=True)
if not n or n < 1 or n > len(self.unspent):
msg(f'Choice must be a single number between 1 and {len(self.unspent)}')
else:
@ -424,7 +424,7 @@ Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, add [l]abel:
f'Removing label for {self.item_desc} #{n}. Is this what you want?'):
return n,s
elif s:
if TwComment(s,on_fail='return'):
if get_obj(TwComment,s=s):
return n,s
else:
if action == 'a_addr_delete':
@ -801,8 +801,8 @@ class TrackingWallet(MMGenObject,metaclass=aInitMeta):
def conv_types(self,ad):
for k,v in ad.items():
if k not in ('params','coin'):
v['mmid'] = TwMMGenID(self.proto,v['mmid'],on_fail='raise')
v['comment'] = TwComment(v['comment'],on_fail='raise')
v['mmid'] = TwMMGenID(self.proto,v['mmid'])
v['comment'] = TwComment(v['comment'])
@property
def data_root(self):
@ -918,9 +918,10 @@ class TrackingWallet(MMGenObject,metaclass=aInitMeta):
# returns on failure
@write_mode
async def add_label(self,arg1,label='',addr=None,silent=False,on_fail='return'):
assert on_fail in ('return','raise'), 'add_label_chk1'
mmaddr,coinaddr = None,None
if is_coin_addr(self.proto,addr or arg1):
coinaddr = CoinAddr(self.proto,addr or arg1,on_fail='return')
coinaddr = get_obj(CoinAddr,proto=self.proto,addr=addr or arg1)
if is_mmgen_id(self.proto,arg1):
mmaddr = TwMMGenID(self.proto,arg1)
@ -948,11 +949,14 @@ class TrackingWallet(MMGenObject,metaclass=aInitMeta):
mmaddr = TwMMGenID(self.proto,mmaddr)
cmt = TwComment(label,on_fail=on_fail)
cmt = TwComment(label) if on_fail=='raise' else get_obj(TwComment,s=label)
if cmt in (False,None):
return False
lbl = TwLabel(self.proto,mmaddr + ('',' '+cmt)[bool(cmt)],on_fail=on_fail)
lbl_txt = mmaddr + (' ' + cmt if cmt else '')
lbl = (
TwLabel(self.proto,lbl_txt) if on_fail == 'raise' else
get_obj(TwLabel,proto=self.proto,text=lbl_txt) )
if await self.set_label(coinaddr,lbl) == False:
if not silent:

View file

@ -616,8 +616,9 @@ class MMGenTX:
# given tx size and absolute fee or fee spec, return absolute fee
# relative fee is N+<first letter of unit name>
def process_fee_spec(self,tx_fee,tx_size):
if self.proto.coin_amt(tx_fee,on_fail='silent'):
return self.proto.coin_amt(tx_fee)
fee = get_obj(self.proto.coin_amt,num=tx_fee,silent=True)
if fee:
return fee
else:
import re
units = {u[0]:u for u in self.proto.coin_amt.units}
@ -733,7 +734,7 @@ class MMGenTX:
while True:
reply = my_raw_input(prompt).strip()
if reply:
selected = AddrIdxList(fmt_str=','.join(reply.split()),on_fail='return')
selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
if selected:
if selected[-1] <= len(unspent):
return selected
@ -1004,7 +1005,7 @@ class MMGenTX:
return self.inputs[0].sequence == g.max_int - 2
def check_txfile_hex_data(self):
self.hex = HexStr(self.hex,on_fail='raise')
self.hex = HexStr(self.hex)
def parse_txfile_hex_data(self):
pass
@ -1242,7 +1243,7 @@ class MMGenTX:
tx_decoded = await self.rpc.call('decoderawtransaction',ret['hex'])
new.compare_size_and_estimated_size(tx_decoded)
new.check_hex_tx_matches_mmgen_tx(dtx)
new.coin_txid = CoinTxID(dtx['txid'],on_fail='raise')
new.coin_txid = CoinTxID(dtx['txid'])
if not new.coin_txid == tx_decoded['txid']:
raise BadMMGenTxID('txid mismatch (after signing)')
msg('OK')

View file

@ -65,13 +65,13 @@ class MMGenTxFile:
tx_data = tx_data.splitlines()
assert len(tx_data) >= 5,'number of lines less than 5'
assert len(tx_data[0]) == 6,'invalid length of first line'
self.chksum = HexStr(tx_data.pop(0),on_fail='raise')
self.chksum = HexStr(tx_data.pop(0))
assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum'
if len(tx_data) == 6:
assert len(tx_data[-1]) == 64,'invalid coin TxID length'
desc = f'coin TxID'
tx.coin_txid = CoinTxID(tx_data.pop(-1),on_fail='raise')
tx.coin_txid = CoinTxID(tx_data.pop(-1))
if len(tx_data) == 5:
# rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up))
@ -83,7 +83,7 @@ class MMGenTxFile:
comment = baseconv.tobytes(c,'b58').decode()
assert comment != False,'invalid comment'
desc = 'comment'
tx.label = MMGenTxLabel(comment,on_fail='raise')
tx.label = MMGenTxLabel(comment)
desc = 'number of lines' # four required lines
metadata,tx.hex,inputs_data,outputs_data = tx_data
@ -113,7 +113,7 @@ class MMGenTxFile:
txid,send_amt,tx.timestamp,blockcount = metadata
desc = 'TxID in metadata'
tx.txid = MMGenTxID(txid,on_fail='raise')
tx.txid = MMGenTxID(txid)
desc = 'send amount in metadata'
tx.send_amt = tx.proto.coin_amt(send_amt)
desc = 'block count in metadata'

View file

@ -724,7 +724,7 @@ class MMGenWallet(WalletEnc):
msg_r(prompt)
ret = my_raw_input('')
if ret:
lbl = MMGenWalletLabel(ret,on_fail='return')
lbl = get_obj(MMGenWalletLabel,s=ret)
if lbl:
return lbl
else:

View file

@ -41,6 +41,7 @@ opts_data = {
'options': """
-h, --help Print this help message
--, --longhelp Print help message for long options (common options)
-g, --getobj Instantiate objects with get_obj() wrapper
-q, --quiet Produce quieter output
-s, --silent Silence output of tested objects
-S, --super-silent Silence all output except for errors
@ -51,12 +52,11 @@ opts_data = {
cmd_args = opts.init(opts_data)
def run_test(test,arg,input_data):
def run_test(test,arg,input_data,arg1,exc_name):
arg_copy = arg
kwargs = {'on_fail':'silent'} if opt.silent else {'on_fail':'die'}
kwargs = {}
ret_chk = arg
ret_idx = None
exc_type = None
if input_data == 'good' and type(arg) == tuple:
arg,ret_chk = arg
if type(arg) == dict: # pass one arg + kwargs to constructor
@ -76,28 +76,42 @@ def run_test(test,arg,input_data):
ret_idx = arg['ret_idx']
del arg['ret_idx']
del arg_copy['ret_idx']
if 'ExcType' in arg:
exc_type = arg['ExcType']
del arg['ExcType']
del arg_copy['ExcType']
kwargs.update(arg)
elif type(arg) == tuple:
args = arg
else:
args = [arg]
if opt.getobj:
if args:
assert len(args) == 1, 'objtest_chk1: only one positional arg is allowed'
kwargs.update( { arg1: args[0] } )
if opt.silent:
kwargs.update( { 'silent': True } )
try:
if not opt.super_silent:
arg_disp = repr(arg_copy[0] if type(arg_copy) == tuple else arg_copy)
msg_r((orange,green)[input_data=='good']('{:<22}'.format(arg_disp+':')))
cls = globals()[test]
ret = cls(*args,**kwargs)
if opt.getobj:
ret = get_obj(globals()[test],**kwargs)
else:
ret = cls(*args,**kwargs)
bad_ret = list() if issubclass(cls,list) else None
if isinstance(ret_chk,str): ret_chk = ret_chk.encode()
if isinstance(ret,str): ret = ret.encode()
if (opt.silent and input_data=='bad' and ret!=bad_ret) or (not opt.silent and input_data=='bad'):
raise UserWarning("Non-'None' return value {} with bad input data".format(repr(ret)))
if opt.getobj:
if input_data == 'bad':
assert ret == False, 'non-False return on bad input data'
else:
if (opt.silent and input_data=='bad' and ret!=bad_ret) or (not opt.silent and input_data=='bad'):
raise UserWarning(f"Non-'None' return value {ret!r} with bad input data")
if opt.silent and input_data=='good' and ret==bad_ret:
raise UserWarning("'None' returned with good input data")
@ -105,20 +119,31 @@ def run_test(test,arg,input_data):
if ret_idx:
ret_chk = arg[list(arg.keys())[ret_idx]].encode()
if ret != ret_chk and repr(ret) != repr(ret_chk):
raise UserWarning("Return value ({!r}) doesn't match expected value ({!r})".format(ret,ret_chk))
if not opt.super_silent:
raise UserWarning(f"Return value ({ret!r}) doesn't match expected value ({ret_chk!r})")
if opt.super_silent:
return
if opt.getobj and (not opt.silent and input_data == 'bad'):
pass
else:
try: ret_disp = ret.decode()
except: ret_disp = ret
msg(f'==> {ret_disp!r}')
if opt.verbose and issubclass(cls,MMGenObject):
ret.pmsg() if hasattr(ret,'pmsg') else pmsg(ret)
except Exception as e:
if not type(e).__name__ == exc_type:
if not type(e).__name__ == exc_name:
msg(f'Incorrect exception: expected {exc_name} but got {type(e).__name__}')
raise
if not opt.super_silent:
msg_r(' {}'.format(yellow(exc_type+':')))
msg(e.args[0])
if opt.super_silent:
pass
elif opt.silent:
msg(f'==> {exc_name}')
else:
msg( yellow(f' {exc_name}:') + str(e) )
except SystemExit as e:
if input_data == 'good':
raise ValueError('Error on good input data')
@ -137,10 +162,17 @@ def do_loop():
clr = None
utests = cmd_args
for test in test_data:
arg1 = test_data[test].get('arg1')
if utests and test not in utests: continue
nl = ('\n','')[bool(opt.super_silent) or clr == None]
clr = (blue,nocolor)[bool(opt.super_silent)]
msg(clr('{}Testing {}'.format(nl,test)))
if opt.getobj and arg1 is None:
msg(gray(f'{nl}Skipping {test}'))
continue
else:
msg(clr(f'{nl}Testing {test}'))
for k in ('bad','good'):
if not opt.silent:
msg(purple(capfirst(k)+' input:'))
@ -149,6 +181,8 @@ def do_loop():
test,
arg,
input_data = k,
arg1 = arg1,
exc_name = test_data[test].get('exc_name') or 'ObjectInitError',
)
from mmgen.protocol import init_proto_from_opts

View file

@ -19,6 +19,7 @@ ssm = str(SeedShareCount.max_val)
tests = {
'Int': {
'arg1': 'n',
'bad': ('1L',0.0,'0.0','1.0',1.0,'s',1.1,'1.1'),
'good': (
('0',0),('-1',-1),('7',7),-1,0,1,9999999,
@ -29,22 +30,27 @@ tests = {
)
},
'AddrIdx': {
'arg1': 'n',
'bad': ('s',1.1,10000000,-1,0),
'good': (('7',7),(1,1),(9999999,9999999))
},
'SeedShareIdx': {
'arg1': 'n',
'bad': ('s',1.1,1025,-1,0),
'good': (('7',7),(1,1),(1024,1024))
},
'SeedShareCount': {
'arg1': 'n',
'bad': ('s',2.1,1025,-1,0,1),
'good': (('7',7),(2,2),(1024,1024))
},
'MasterShareIdx': {
'arg1': 'n',
'bad': ('s',1.1,1025,-1,0),
'good': (('7',7),(1,1),(1024,1024))
},
'AddrIdxList': {
'arg1': 'fmt_str',
'bad': ('x','5,9,1-2-3','8,-11','66,3-2'),
'good': (
('3,2,2',[2,3]),
@ -63,6 +69,7 @@ tests = {
)
},
'BTCAmt': {
'arg1': 'num',
'bad': ('-3.2','0.123456789',123,'123L','22000000',20999999.12345678,
{'num':'1','from_decimal':True},
{'num':1,'from_decimal':True},
@ -86,6 +93,7 @@ tests = {
)
},
'CoinAddr': {
'arg1': 'addr',
'good': (
{'addr':'1MjjELEy6EJwk8fSNfpS8b5teFRo4X5fZr', 'proto':proto},
{'addr':'32GiSWo9zJQgkCmjAaLRrbPwXhKry2jHhj', 'proto':proto},
@ -97,6 +105,7 @@ tests = {
),
},
'SeedID': {
'arg1': 'sid',
'bad': (
{'sid':'я'},
{'sid':'F00F00'},
@ -106,13 +115,19 @@ tests = {
{'sid':'f00baa12'},
'я',r32,'abc'
),
'good': (({'sid':'F00BAA12'},'F00BAA12'),(Seed(r16),Seed(r16).sid))
'good': (
{'sid':'F00BAA12'},
{'seed': Seed(r16), 'ret': SeedID(seed=Seed(r16))},
{'sid': Seed(r16).sid, 'ret': SeedID(seed=Seed(r16))}
)
},
'SubSeedIdx': {
'arg1': 's',
'bad': (33,'x','я','1x',200,'1ss','L','s','200LS','30ll','s100',str(SubSeedIdxRange.max_idx+1),'0'),
'good': (('1','1L'),('1s','1S'),'20S','30L',('300l','300L'),('200','200L'),str(SubSeedIdxRange.max_idx)+'S')
},
'MMGenID': {
'arg1': 'id_str',
'bad': (
{'id_str':'x', 'proto':proto},
{'id_str':1, 'proto':proto},
@ -129,6 +144,7 @@ tests = {
),
},
'TwMMGenID': {
'arg1': 'id_str',
'bad': (
{'id_str':'x', 'proto':proto},
{'id_str':'я', 'proto':proto},
@ -150,6 +166,8 @@ tests = {
),
},
'TwLabel': {
'arg1': 'proto',
'exc_name': 'BadTwLabel',
'bad': (
{'text':'x x', 'proto':proto},
{'text':'x я', 'proto':proto},
@ -163,7 +181,7 @@ tests = {
{'text':tw_pfx+' x', 'proto':proto},
{'text':tw_pfx+'я x', 'proto':proto},
{'text':utf8_ctrl[:40], 'proto':proto},
{'text':'F00BAA12:S:1 '+ utf8_ctrl[:40], 'proto':proto, 'on_fail':'raise','ExcType':'BadTwComment'},
{'text':'F00BAA12:S:1 '+ utf8_ctrl[:40], 'proto':proto, },
),
'good': (
{'text':'F00BAA12:99 a comment', 'proto':proto, 'ret':'F00BAA12:L:99 a comment'},
@ -174,14 +192,17 @@ tests = {
),
},
'MMGenTxID': {
'arg1': 's',
'bad': (1,[],'\0','\1','я','g','gg','FF','f00','F00F0012'),
'good': ('DEADBE','F00BAA')
},
'CoinTxID':{
'arg1': 's',
'bad': (1,[],'\0','\1','я','g','gg','FF','f00','F00F0012',r16.hex(),r32.hex()+'ee'),
'good': (r32.hex(),)
},
'WifKey': {
'arg1': 'proto',
'bad': (
{'proto':proto, 'wif':1},
{'proto':proto, 'wif':[]},
@ -201,10 +222,12 @@ tests = {
)
},
'PubKey': {
'arg1': 's',
'bad': ({'arg':1,'compressed':False},{'arg':'F00BAA12','compressed':False},),
'good': ({'arg':'deadbeef','compressed':True},) # TODO: add real pubkeys
},
'PrivKey': {
'arg1': 'proto',
'bad': (
{'proto':proto, 'wif':1},
{'proto':proto, 'wif':'1'},
@ -227,6 +250,7 @@ tests = {
)
},
'AddrListID': { # a rather pointless test, but do it anyway
'arg1': 'sid',
'bad': (
{'sid':SeedID(sid='F00BAA12'),'mmtype':'Z','ret':'F00BAA12:Z'},
),
@ -236,10 +260,12 @@ tests = {
)
},
'MMGenWalletLabel': {
'arg1': 's',
'bad': (utf8_text[:49],utf8_combining[:48],utf8_ctrl[:48],gr_uc_w_ctrl),
'good': (utf8_text[:48],)
},
'TwComment': {
'arg1': 's',
'bad': ( utf8_combining[:40],
utf8_ctrl[:40],
text_jp[:41],
@ -252,14 +278,17 @@ tests = {
text_zh[:40] )
},
'MMGenTxLabel':{
'arg1': 's',
'bad': (utf8_text[:73],utf8_combining[:72],utf8_ctrl[:72],gr_uc_w_ctrl),
'good': (utf8_text[:72],)
},
'MMGenPWIDString': { # forbidden = list(u' :/\\')
'arg1': 's',
'bad': ('foo/','foo:','foo:\\'),
'good': ('qwerty@яяя',)
},
'MMGenAddrType': {
'arg1': 'proto',
'bad': (
{'proto':proto, 'id_str':'U', 'ret':'L'},
{'proto':proto, 'id_str':'z', 'ret':'L'},
@ -278,6 +307,7 @@ tests = {
)
},
'MMGenPasswordType': {
'arg1': 'proto',
'bad': (
{'proto':proto, 'id_str':'U', 'ret':'L'},
{'proto':proto, 'id_str':'z', 'ret':'L'},
@ -291,6 +321,7 @@ tests = {
)
},
'SeedSplitSpecifier': {
'arg1': 's',
'bad': ('M','αβ:2',1,'0:1','1:1','2:1','3:2','1:2000','abc:0:2'),
'good': (
('1:2','2:2','alice:2:2','αβ:2:2','1:'+ssm,ssm+':'+ssm)

View file

@ -248,6 +248,7 @@ i_obj='Data object'
s_obj='Testing data objects'
t_obj="
$objtest_py --coin=btc
$objtest_py --getobj --coin=btc
$objtest_py --coin=btc --testnet=1
$objtest_py --coin=ltc
$objtest_py --coin=ltc --testnet=1