CoinAmt: improvements

- do strict type checking in initializer, forbid double initialization
- add dynamic decimal precision based on protocol
- dunder method fixes, cleanups
- JSON-RPC library now returns floats (i.e. amounts) as strings instead of Decimal,
  eliminating an extra conversion step
This commit is contained in:
The MMGen Project 2024-10-18 10:32:05 +00:00
commit 50fc415282
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
27 changed files with 220 additions and 109 deletions

View file

@ -74,7 +74,7 @@ async def main():
bdr = (cur['time'] - old['time']) / sample_size
t_rem = remaining * int(bdr)
sub = cur['subsidy'] * proto.coin_amt.satoshi
sub = proto.coin_amt(cur['subsidy'], from_unit='satoshi' if isinstance(cur['subsidy'], int) else None)
print(
f'Current block: {tip}\n'

View file

@ -23,19 +23,17 @@ amt: MMGen CoinAmt and related classes
from decimal import Decimal
from .objmethods import Hilite, InitErrors
class DecimalNegateResult(Decimal):
pass
class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
"""
Instantiating with 'from_decimal' rounds value down to 'max_prec' precision.
For addition and subtraction, operand types must match.
For multiplication and division, operand types may differ.
Negative amounts, floor division and modulus operation are unimplemented.
Decimal precision is set in init_proto()
"""
coin = 'Coin'
color = 'yellow'
forbidden_types = (float,int)
max_prec = 0 # number of decimal places for this coin
max_amt = None # coin supply if known, otherwise None
@ -43,8 +41,8 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
def __new__(cls, num, from_unit=None, from_decimal=False):
if isinstance(num, cls):
return num
if isinstance(num, CoinAmt):
raise TypeError(f'CoinAmt: {num} is instance of {cls.__name__}')
try:
if from_unit:
@ -55,9 +53,8 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
assert isinstance(num, Decimal), f'number must be of type Decimal, not {type(num).__name__})'
me = Decimal.__new__(cls, num.quantize(Decimal('10') ** -cls.max_prec))
else:
for bad_type in cls.forbidden_types:
assert not isinstance(num, bad_type), f'number is of forbidden type {bad_type.__name__}'
me = Decimal.__new__(cls, str(num))
assert isinstance(num, str), f'non-string passed to {cls.__name__} initializer'
me = Decimal.__new__(cls, num)
assert me.normalize().as_tuple()[-1] >= -cls.max_prec, 'too many decimal places in coin amount'
if cls.max_amt:
assert me <= cls.max_amt, f'{me}: coin amount too large (>{cls.max_amt})'
@ -74,11 +71,8 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
cls.method_not_implemented()
def fmt(self, color=False, iwidth=1, prec=None): # iwidth: width of the integer part
s = str(self)
prec = prec or self.max_prec
if '.' in s:
if '.' in (s := str(self)):
a, b = s.split('.', 1)
return self.colorize(
a.rjust(iwidth) + '.' + b.ljust(prec)[:prec], # truncation, not rounding!
@ -113,28 +107,24 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
"""
we must allow other to be int(0) to use the sum() builtin
"""
if type(other) not in ( type(self), DecimalNegateResult ) and other != 0:
raise ValueError(
f'operand {other} of incorrect type ({type(other).__name__} != {type(self).__name__})')
return type(self)(Decimal.__add__(self, other, *args, **kwargs))
if type(other) is type(self) or (other == 0 and isinstance(other, int)):
return type(self)(Decimal.__add__(self, other, *args, **kwargs), from_decimal=True)
raise TypeError(
f'operand {other} is of incorrect type ({type(other).__name__} != {type(self).__name__})')
__radd__ = __add__
def __sub__(self, other, *args, **kwargs):
if type(other) is not type(self):
raise ValueError(
f'operand {other} of incorrect type ({type(other).__name__} != {type(self).__name__})')
return type(self)(Decimal.__sub__(self, other, *args, **kwargs))
if type(other) is type(self):
return type(self)(Decimal.__sub__(self, other, *args, **kwargs), from_decimal=True)
raise TypeError(
f'operand {other} is of incorrect type ({type(other).__name__} != {type(self).__name__})')
def copy_negate(self, *args, **kwargs):
"""
We implement this so that __add__() can check type, because:
class Decimal:
def __sub__(self, other, ...):
...
return self.__add__(other.copy_negate(), ...)
"""
return DecimalNegateResult(Decimal.copy_negate(self, *args, **kwargs))
def __rsub__(self, other, *args, **kwargs):
if type(other) is type(self):
return type(self)(Decimal.__rsub__(self, other, *args, **kwargs), from_decimal=True)
raise TypeError(
f'operand {other} is of incorrect type ({type(other).__name__} != {type(self).__name__})')
def __mul__(self, other, *args, **kwargs):
return type(self)('{:0.{p}f}'.format(
@ -150,6 +140,12 @@ class CoinAmt(Decimal, Hilite, InitErrors): # abstract class
p = self.max_prec
))
def __rtruediv__(self, other, *args, **kwargs):
return type(self)('{:0.{p}f}'.format(
Decimal.__rtruediv__(self, Decimal(other), *args, **kwargs),
p = self.max_prec
))
def __neg__(self, *args, **kwargs):
self.method_not_implemented()
@ -196,3 +192,7 @@ class ETHAmt(CoinAmt):
class ETCAmt(ETHAmt):
coin = 'ETC'
def CoinAmtChk(proto, num):
assert type(num) is proto.coin_amt, f'CoinAmtChk: {type(num)} != {proto.coin_amt}'
return num

View file

@ -112,10 +112,7 @@ class ImmutableAttr: # Descriptor
if set_none_ok:
assert typeconv and not isinstance(dtype,str), 'ImmutableAttr_check3'
if dtype is None:
# use instance-defined conversion function for this attribute
self.conv = lambda instance,value: getattr(instance.conv_funcs,self.name)(instance,value)
elif typeconv:
if typeconv:
# convert this attribute's type
if set_none_ok:
self.conv = lambda instance,value: None if value is None else dtype(value)
@ -190,7 +187,6 @@ class MMGenListItem(MMGenObject):
'valid_attrs',
'invalid_attrs',
'immutable_attr_init_check',
'conv_funcs',
}
def __init__(self,*args,**kwargs):

View file

@ -231,7 +231,7 @@ class MMGenRegtest(MMGenObject):
users = ('bob','alice')
for user in users:
out = await self.rpc_call('listunspent',0,wallet=user)
bal[user] = sum(e['amount'] for e in out)
bal[user] = sum(self.proto.coin_amt(e['amount']) for e in out)
fs = '{:<16} {:18.8f}'
for user in users:

View file

@ -51,7 +51,8 @@ class BitcoinTwAddresses(TwAddresses,BitcoinTwRPC):
addrs = await self.get_unspent_by_mmid(self.minconf)
msg('done')
amt0 = self.proto.coin_amt('0')
coin_amt = self.proto.coin_amt
amt0 = coin_amt('0')
self.total = sum((v['amt'] for v in addrs.values()), start=amt0)
msg_r('Getting labels and associated addresses...')
@ -71,7 +72,7 @@ class BitcoinTwAddresses(TwAddresses,BitcoinTwRPC):
label = get_obj( TwLabel, proto=self.proto, text=d['label'] )
if label:
assert label.mmid in addrs, f'{label.mmid!r} not found in addrlist!'
addrs[label.mmid]['recvd'] = d['amount']
addrs[label.mmid]['recvd'] = coin_amt(d['amount'])
addrs[label.mmid]['confs'] = d['confirmations']
msg('done')

View file

@ -102,7 +102,7 @@ class BitcoinTwRPC(TwRPC):
'amt': amt0,
'lbl': label,
'addr': CoinAddr(self.proto,d['address']) }
amt = self.proto.coin_amt(d['amount'])
data[lm]['amt'] += amt
data[lm]['amt'] += self.proto.coin_amt(d['amount'])
return data

View file

@ -86,7 +86,7 @@ class BitcoinTwTransaction:
yield e.coin_addr
def total(data):
return self.proto.coin_amt( sum(d.data['value'] for d in data) )
return sum(coin_amt(d.data['value']) for d in data)
def get_best_comment():
"""
@ -97,6 +97,7 @@ class BitcoinTwTransaction:
ret = vouts_labels('outputs') or vouts_labels('inputs')
return ret[0] if ret else TwComment('')
coin_amt = self.proto.coin_amt
# 'outputs' refers to wallet-related outputs only
self.vouts_info = {
'inputs': gen_vouts_info( gen_prevouts_data() ),
@ -107,7 +108,7 @@ class BitcoinTwTransaction:
'outputs': max(len(addr) for addr in gen_all_addrs('outputs'))
}
self.inputs_total = total(self.vouts_info['inputs'])
self.outputs_total = self.proto.coin_amt(sum(i['value'] for i in self.tx['decoded']['vout']))
self.outputs_total = sum(coin_amt(i['value']) for i in self.tx['decoded']['vout'])
self.wallet_outputs_total = total(self.vouts_info['outputs'])
self.fee = self.inputs_total - self.outputs_total
self.nOutputs = len(self.tx['decoded']['vout'])

View file

@ -51,7 +51,7 @@ def DeserializeTX(proto, txhex):
return int(bytes_le[::-1].hex(), 16)
def bytes2coin_amt(bytes_le):
return proto.coin_amt(bytes2int(bytes_le) * proto.coin_amt.satoshi)
return proto.coin_amt(bytes2int(bytes_le), from_unit='satoshi')
def bshift(n, skip=False, sub_null=False):
nonlocal idx, raw_tx
@ -114,7 +114,7 @@ def DeserializeTX(proto, txhex):
d['num_txouts'] = readVInt()
d['txouts'] = MMGenList([{
'amount': bytes2coin_amt(bshift(8)),
'amt': bytes2coin_amt(bshift(8)),
'scriptPubKey': bshift(readVInt()).hex()
} for i in range(d['num_txouts'])])
@ -317,7 +317,7 @@ class Base(TxBase.Base):
check_equal(
'outputs',
sorted((o['address'], self.proto.coin_amt(o['amount'])) for o in dtx.txouts),
sorted((o['address'], o['amt']) for o in dtx.txouts),
sorted((o.addr, o.amt) for o in self.outputs))
if str(self.txid) != make_chksum_6(bytes.fromhex(dtx.unsigned_hex)).upper():

View file

@ -37,7 +37,7 @@ class TxInfo(TxInfo):
)
def format_abs_fee(self,color,iwidth):
return self.tx.proto.coin_amt(self.tx.fee).fmt(color=color,iwidth=iwidth)
return self.tx.fee.fmt(color=color, iwidth=iwidth)
def format_verbose_footer(self):
tx = self.tx

View file

@ -66,8 +66,7 @@ class New(Base,TxBase.New):
# given tx size, rel fee and units, return absolute fee
def fee_rel2abs(self, tx_size, units, amt_in_units, unit):
if tx_size:
return self.proto.coin_amt(
amt_in_units * tx_size * getattr(self.proto.coin_amt, units[unit]))
return self.proto.coin_amt(amt_in_units * tx_size, from_unit=units[unit])
else:
return None
@ -110,7 +109,7 @@ class New(Base,TxBase.New):
msg(self.no_chg_msg)
self.outputs.pop(self.chg_idx)
else:
self.update_output_amt(self.chg_idx, self.proto.coin_amt(funds_left))
self.update_output_amt(self.chg_idx, funds_left)
def check_fee(self):
fee = self.sum_inputs() - self.sum_outputs()
@ -119,7 +118,7 @@ class New(Base,TxBase.New):
die( 'MaxFeeExceeded', f'Transaction fee of {fee} {c} too high! (> {self.proto.max_tx_fee} {c})' )
def final_inputs_ok_msg(self,funds_left):
return 'Transaction produces {} {} in change'.format(self.proto.coin_amt(funds_left).hl(), self.coin)
return 'Transaction produces {} {} in change'.format(funds_left.hl(), self.coin)
async def create_serialized(self,locktime=None,bump=None):

View file

@ -25,3 +25,4 @@ class testnet(mainnet):
class regtest(testnet):
chain_names = ['developmentchain']
decimal_prec = 64

View file

@ -42,7 +42,8 @@ class TokenCommon(MMGenObject):
def transferdata2amt(self,data): # online
return self.proto.coin_amt(
int(parse_abi(data)[-1], 16) * self.base_unit)
int(parse_abi(data)[-1], 16) * self.base_unit,
from_decimal = True)
async def do_call(self,method_sig,method_args='',toUnit=False):
data = self.create_method_id(method_sig) + method_args
@ -59,7 +60,8 @@ class TokenCommon(MMGenObject):
async def get_balance(self,acct_addr):
return self.proto.coin_amt(
await self.do_call('balanceOf(address)', acct_addr.rjust(64, '0'), toUnit=True))
await self.do_call('balanceOf(address)', acct_addr.rjust(64, '0'), toUnit=True),
from_decimal = True)
def strip(self,s):
return ''.join([chr(b) for b in s if 32 <= b <= 127]).strip()

View file

@ -36,6 +36,7 @@ class mainnet(CoinProtocol.DummyWIF,CoinProtocol.Secp256k1):
base_coin = 'ETH'
avg_bdi = 15
ignore_daemon_version = False
decimal_prec = 36
chain_ids = {
1: 'ethereum', # ethereum mainnet

View file

@ -107,7 +107,7 @@ class EthereumTwUnspentOutputs(EthereumTwView,TwUnspentOutputs):
return [{
'account': TwLabel(self.proto,d['mmid']+' '+d['comment']),
'address': d['addr'],
'amount': await self.twctl.get_balance(d['addr']),
'amt': await self.twctl.get_balance(d['addr']),
'confirmations': 0, # TODO
} for d in wl]

View file

@ -23,7 +23,7 @@ class Bump(Completed,New,TxBase.Bump):
@property
def min_fee(self):
return self.proto.coin_amt(self.fee * Decimal('1.101'))
return self.fee * Decimal('1.101')
def bump_fee(self,idx,fee):
self.txobj['gasPrice'] = self.fee_abs2gas(fee)

View file

@ -124,9 +124,7 @@ class New(Base,TxBase.New):
# given rel fee and units, return absolute fee using self.gas
def fee_rel2abs(self, tx_size, units, amt_in_units, unit):
return self.proto.coin_amt(
self.proto.coin_amt(amt_in_units, units[unit]).toWei() * self.gas.toWei(),
from_unit = 'wei')
return self.proto.coin_amt(amt_in_units, from_unit=units[unit]) * self.gas.toWei()
# given fee estimate (gas price) in wei, return absolute fee, adjusting by self.cfg.fee_adjust
def fee_est2abs(self,rel_fee,fe_type=None):
@ -151,7 +149,7 @@ class New(Base,TxBase.New):
def update_change_output(self,funds_left):
if self.outputs and self.outputs[0].is_chg:
self.update_output_amt(0,self.proto.coin_amt(funds_left))
self.update_output_amt(0, funds_left)
async def get_input_addrs_from_cmdline(self):
ret = []
@ -175,11 +173,8 @@ class New(Base,TxBase.New):
return ret
def final_inputs_ok_msg(self, funds_left):
chg = '0' if (self.outputs and self.outputs[0].is_chg) else funds_left
return 'Transaction leaves {} {} in the sender’s account'.format(
self.proto.coin_amt(chg).hl(),
self.proto.coin
)
chg = self.proto.coin_amt('0') if (self.outputs and self.outputs[0].is_chg) else funds_left
return 'Transaction leaves {} {} in the sender’s account'.format(chg.hl(), self.proto.coin)
class TokenNew(TokenBase,New):
desc = 'transaction'

View file

@ -55,6 +55,7 @@ class CoinProtocol(MMGenObject):
is_fork_of = None
chain_names = None
networks = ('mainnet','testnet','regtest')
decimal_prec = 28
def __init__(self,cfg,coin,name,network,tokensym=None,need_amt=False):
self.cfg = cfg
@ -105,8 +106,10 @@ class CoinProtocol(MMGenObject):
if need_amt:
from . import amt
from decimal import getcontext
self.coin_amt = getattr(amt,self.coin_amt)
self.max_tx_fee = self.coin_amt(self.max_tx_fee) if hasattr(self,'max_tx_fee') else None
getcontext().prec = self.decimal_prec
else:
self.coin_amt = None
self.max_tx_fee = None

View file

@ -379,17 +379,20 @@ class RPCClient(MMGenObject):
def process_http_resp(self,run_ret,batch=False,json_rpc=True):
def float_parser(n):
return n
text, status = run_ret
if status == 200:
dmsg_rpc(' RPC RESPONSE data ==>\n{}\n',text,is_json=True)
m = None
if batch:
return [r['result'] for r in json.loads(text,parse_float=Decimal)]
return [r['result'] for r in json.loads(text,parse_float=float_parser)]
else:
try:
if json_rpc:
ret = json.loads(text,parse_float=Decimal)['result']
ret = json.loads(text,parse_float=float_parser)['result']
if isinstance(ret,list) and ret and type(ret[0]) == dict and 'success' in ret[0]:
for res in ret:
if not res['success']:
@ -397,7 +400,7 @@ class RPCClient(MMGenObject):
assert False
return ret
else:
return json.loads(text,parse_float=Decimal)
return json.loads(text,parse_float=float_parser)
except:
if not m:
t = json.loads(text)

View file

@ -15,6 +15,7 @@ tw.addresses: Tracking wallet listaddresses class for the MMGen suite
from ..util import msg,suf,is_int
from ..obj import MMGenListItem,ImmutableAttr,ListItemAttr,TwComment,NonNegativeInt
from ..addr import CoinAddr,MMGenID,MMGenAddrType
from ..amt import CoinAmtChk
from ..color import red,green,yellow
from .view import TwView
from .shared import TwMMGenID
@ -54,8 +55,8 @@ class TwAddresses(TwView):
al_id = ImmutableAttr(str) # set to '_' for non-MMGen addresses
confs = ImmutableAttr(int,typeconv=False)
comment = ListItemAttr(TwComment,reassign_ok=True)
amt = ImmutableAttr(None)
recvd = ImmutableAttr(None)
amt = ImmutableAttr(CoinAmtChk, include_proto=True)
recvd = ImmutableAttr(CoinAmtChk, include_proto=True)
date = ListItemAttr(int,typeconv=False,reassign_ok=True)
skip = ListItemAttr(str,typeconv=False,reassign_ok=True)
@ -63,14 +64,6 @@ class TwAddresses(TwView):
self.__dict__['proto'] = proto
MMGenListItem.__init__(self,**kwargs)
class conv_funcs:
@staticmethod
def amt(instance,value):
return instance.proto.coin_amt(value)
@staticmethod
def recvd(instance,value):
return instance.proto.coin_amt(value)
@property
def coinaddr_list(self):
return [d.addr for d in self.data]

View file

@ -30,6 +30,7 @@ from ..obj import (
CoinTxID,
NonNegativeInt )
from ..addr import CoinAddr
from ..amt import CoinAmtChk
from .shared import TwMMGenID,get_tw_label
from .view import TwView
@ -55,8 +56,8 @@ class TwUnspentOutputs(TwView):
class MMGenTwUnspentOutput(MMGenListItem):
txid = ListItemAttr(CoinTxID)
vout = ListItemAttr(NonNegativeInt)
amt = ImmutableAttr(None)
amt2 = ListItemAttr(None) # the ETH balance for token account
amt = ImmutableAttr(CoinAmtChk, include_proto=True)
amt2 = ListItemAttr(CoinAmtChk, include_proto=True) # the ETH balance for token account
comment = ListItemAttr(TwComment,reassign_ok=True)
twmmid = ImmutableAttr(TwMMGenID,include_proto=True)
addr = ImmutableAttr(CoinAddr,include_proto=True)
@ -69,14 +70,6 @@ class TwUnspentOutputs(TwView):
self.__dict__['proto'] = proto
MMGenListItem.__init__(self,**kwargs)
class conv_funcs:
@staticmethod
def amt(instance,value):
return instance.proto.coin_amt(value)
@staticmethod
def amt2(instance,value):
return instance.proto.coin_amt(value)
async def __init__(self,cfg,proto,minconf=1,addrs=[]):
await super().__init__(cfg,proto)
self.minconf = minconf
@ -94,10 +87,11 @@ class TwUnspentOutputs(TwView):
continue # coinbase outputs have no account field
l = get_tw_label(self.proto,o[lbl_id])
if l:
if not 'amt' in o:
o['amt'] = self.proto.coin_amt(o['amount'])
o.update({
'twmmid': l.mmid,
'comment': l.comment or '',
'amt': self.proto.coin_amt(o['amount']),
'addr': CoinAddr(self.proto,o['address']),
'confs': o['confirmations']
})

View file

@ -24,12 +24,13 @@ from ..obj import (
HexStr,
NonNegativeInt
)
from ..amt import CoinAmtChk
from ..addr import MMGenID,CoinAddr
from ..util import msg,ymsg,fmt,remove_dups,make_timestamp,die
class MMGenTxIO(MMGenListItem):
vout = ListItemAttr(NonNegativeInt)
amt = ImmutableAttr(None)
amt = ImmutableAttr(CoinAmtChk, include_proto=True)
comment = ListItemAttr(TwComment,reassign_ok=True)
mmid = ListItemAttr(MMGenID,include_proto=True)
addr = ImmutableAttr(CoinAddr,include_proto=True)
@ -56,11 +57,6 @@ class MMGenTxIO(MMGenListItem):
'S' if self.addr.addr_fmt == 'p2sh' else
None )
class conv_funcs:
@staticmethod
def amt(instance,value):
return instance.proto.coin_amt(value)
class MMGenTxIOList(list,MMGenObject):
def __init__(self,parent,data=None):
@ -145,7 +141,7 @@ class Base(MMGenObject):
olist = self.outputs[:exclude] + self.outputs[exclude+1:]
if not olist:
return self.proto.coin_amt('0')
return self.proto.coin_amt(sum(e.amt for e in olist))
return sum(e.amt for e in olist)
def _chg_output_ops(self,op):
is_chgs = [x.is_chg for x in self.outputs]

View file

@ -111,6 +111,9 @@ class New(Base):
# relative fee is N+<first letter of unit name>
def feespec2abs(self, fee_arg, tx_size):
if type(fee_arg) is self.proto.coin_amt:
return fee_arg
if fee := get_obj(self.proto.coin_amt, num=fee_arg, silent=True):
return fee

View file

@ -391,9 +391,9 @@ class MoneroMMGenTX:
dest = None if d.dest is None else XMRWalletAddrSpec(d.dest),
dest_address = CoinAddr(proto,d.dest_address),
txid = CoinTxID(d.txid),
amount = proto.coin_amt(d.amount,from_unit='atomic'),
amount = d.amount,
priority = self.cfg.priority if self.name in ('NewSigned','NewUnsigned') else d.priority,
fee = proto.coin_amt(d.fee,from_unit='atomic'),
fee = d.fee,
blob = d.blob,
metadata = d.metadata,
unsigned_txset = d.unsigned_txset,
@ -1161,8 +1161,8 @@ class MoneroWalletOps:
dest = None,
dest_address = addr,
txid = res['tx_hash'],
amount = res['amount'],
fee = res['fee'],
amount = self.proto.coin_amt(res['amount'], from_unit='atomic'),
fee = self.proto.coin_amt(res['fee'], from_unit='atomic'),
blob = res['tx_blob'],
metadata = res['tx_metadata'],
unsigned_txset = res['unsigned_txset'] if self.cfg.watch_only else None,
@ -1196,8 +1196,8 @@ class MoneroWalletOps:
dest_addr_idx),
dest_address = addr,
txid = res['tx_hash_list'][0],
amount = res['amount_list'][0],
fee = res['fee_list'][0],
amount = self.proto.coin_amt(res['amount_list'][0], from_unit='atomic'),
fee = self.proto.coin_amt(res['fee_list'][0], from_unit='atomic'),
blob = res['tx_blob_list'][0],
metadata = res['tx_metadata_list'][0],
unsigned_txset = res['unsigned_txset'] if self.cfg.watch_only else None,

View file

@ -61,7 +61,7 @@ def strip_ansi_escapes(s):
cmdtest_py_log_fn = 'cmdtest.py.log'
cmdtest_py_error_fn = 'cmdtest.py.err'
parity_dev_amt = 1606938044258990275541962092341162602522202993782792835301376
ascii_uc = ''.join(map(chr,list(range(65,91)))) # 26 chars
ascii_lc = ''.join(map(chr,list(range(97,123)))) # 26 chars
lat_accent = ''.join(map(chr,list(range(192,383)))) # 191 chars, L,S

View file

@ -12,6 +12,6 @@ if overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA'):
return json.loads(get_data_from_file(
self.cfg,
overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA')
), parse_float=Decimal)
))
BitcoinTwUnspentOutputs.get_rpc_data = overlay_fake_BitcoinTwUnspentOutputs.get_rpc_data

View file

@ -4,9 +4,17 @@
test.unit_tests_d.ut_obj: data object unit tests for the MMGen suite
"""
from decimal import Decimal
from decimal import Decimal, getcontext
from ..include.common import vmsg
from ..include.common import vmsg, cfg, parity_dev_amt
from mmgen.protocol import init_proto
def test_equal(res, chk):
vmsg(f' checking {res}')
if type(res) is type:
assert res is chk, f'{res} != {chk}'
else:
assert res == chk, f'{res} != {chk}'
def coinamt_test(cls, aa, bb, ut):
@ -46,8 +54,8 @@ def coinamt_test(cls, aa, bb, ut):
('modulus', 'NotImplementedError', 'not implemented', lambda: b % a),
('floor division', 'NotImplementedError', 'not implemented', lambda: b // a),
('negative result', 'ObjectInitError', 'cannot be negative', lambda: a - b),
('operand type', 'ValueError', 'incorrect type', lambda: a + B),
('operand type', 'ValueError', 'incorrect type', lambda: b - A),
('operand type', 'TypeError', 'incorrect type', lambda: a + B),
('operand type', 'TypeError', 'incorrect type', lambda: b - A),
)
if cls.max_amt is not None:
@ -81,3 +89,118 @@ class unit_tests:
):
coinamt_test(cls, aa, bb, ut)
return True
def coinamt2(self, name, ut, desc='CoinAmt class'):
from decimal import Decimal
proto = init_proto(cfg, 'btc', network='testnet', need_amt=True)
test_equal(getcontext().prec, proto.decimal_prec)
coin_amt = proto.coin_amt
test_equal(coin_amt.__name__, 'BTCAmt')
a = coin_amt('1.234')
a2 = coin_amt('2.468')
# addition with integer zero:
b = a + 0 # __add__
b = 0 + a # __radd__
test_equal(sum([a, a]), a2) # __radd__ (sum() starts with integer 0)
# __add__
b = coin_amt('333.2456')
test_equal(a + b, coin_amt('334.4796'))
# __sub__
test_equal(a - coin_amt('1'), coin_amt('0.234'))
test_equal(coin_amt('2') - a, coin_amt('0.766'))
# __mul__
b = a * 2
test_equal(type(b), coin_amt)
test_equal(b, a2)
# __rmul__
b = 2 * a
test_equal(type(b), coin_amt)
test_equal(b, a2)
# __truediv__
b = a / 2
test_equal(type(b), coin_amt)
test_equal(b, coin_amt('0.617'))
# __rtruediv__
b = 2 / a
test_equal(type(b), coin_amt)
test_equal(b, coin_amt('1.62074554'))
def bad1(): b = a + 1
def bad2(): b = a - 1
def bad3(): a + Decimal(1)
def bad4(): b = a + 0.0
def bad5(): b = a - 0.0
def bad1r(): b = 1 + a
def bad2r(): b = 3 - a
def bad3r(): Decimal(1) + a
def bad4r(): b = 0.0 + a
def bad5r(): b = 0.0 - a
def bad10(): b = coin_amt('1') - a
def bad11(): b = a * -2
def bad12(): b = a / -2
def bad13(): b = a - coin_amt('2')
def bad14(): b = -2 * a
def bad15(): b = -2 / a
def bad16(): b = coin_amt(a)
vmsg('Testing error handling:')
ut.process_bad_data(
(
('addition with int', 'TypeError', 'incorrect type', bad1),
('subtraction with int', 'TypeError', 'incorrect type', bad2),
('addition with Decimal', 'TypeError', 'incorrect type', bad3),
('addition with float', 'TypeError', 'incorrect type', bad4),
('subtraction with float', 'TypeError', 'incorrect type', bad5),
('addition with int', 'TypeError', 'incorrect type', bad1r),
('subtraction with int', 'TypeError', 'incorrect type', bad2r),
('addition with Decimal', 'TypeError', 'incorrect type', bad3r),
('addition with float', 'TypeError', 'incorrect type', bad4r),
('subtraction with float', 'TypeError', 'incorrect type', bad5r),
('negative result', 'ObjectInitError', 'cannot be negative', bad10),
('negative result', 'ObjectInitError', 'cannot be negative', bad11),
('negative result', 'ObjectInitError', 'cannot be negative', bad12),
('negative result', 'ObjectInitError', 'cannot be negative', bad13),
('negative result', 'ObjectInitError', 'cannot be negative', bad14),
('negative result', 'ObjectInitError', 'cannot be negative', bad15),
('double initialization', 'TypeError', 'is instance', bad16),
),
pfx = '')
return True
def coinamt_alt2(self, name, ut, desc='CoinAmt class (altcoins)'):
proto = init_proto(cfg, 'etc', network='regtest', need_amt=True)
test_equal(getcontext().prec, proto.decimal_prec)
coin_amt = proto.coin_amt
dev_amt = coin_amt(parity_dev_amt, from_unit='wei')
dev_amt_s = '1606938044258990275541962092341162602522202.993782792835301376'
dev_amt_a1 = '1606938044258990275541962092341162602522203.993782792835301377'
dev_amt_s1 = '1606938044258990275541962092341162602522201.993782792835301375'
dev_amt_d2 = '803469022129495137770981046170581301261101.496891396417650688'
dev_amt_d10 = '160693804425899027554196209234116260252220.299378279283530138'
test_equal(str(dev_amt), dev_amt_s)
addend = coin_amt('1.000000000000000001')
test_equal(dev_amt + addend, coin_amt(dev_amt_a1))
test_equal(dev_amt - addend, coin_amt(dev_amt_s1))
test_equal(dev_amt / coin_amt('2'), coin_amt(dev_amt_d2))
test_equal(dev_amt / coin_amt('10'), coin_amt(dev_amt_d10))
test_equal(2 / coin_amt('0.3456'), coin_amt('5.787037037037037037'))
test_equal(2.345 * coin_amt('2.3456'), coin_amt('5.500432000000000458'))
return True

View file

@ -77,8 +77,8 @@ async def test_tx(tx_proto,tx_hex,desc,n):
fs = 'address of output {} does not match\nA: {}\nB: {}'
assert A == B, fs.format(i,A,B)
A = a[i]['value']
B = b[i]['amount']
A = tx_proto.coin_amt(a[i]['value'])
B = b[i]['amt']
fs = 'value of output {} does not match\nA: {}\nB: {}'
assert A == B, fs.format(i,A,B)