base_proto.bitcoin.tw.addrs: refactor RPC routines

This commit is contained in:
The MMGen Project 2022-05-23 16:28:56 +00:00
commit 15cf330f9b
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
2 changed files with 105 additions and 77 deletions

View file

@ -18,95 +18,29 @@ from ....addr import CoinAddr
from ....rpc import rpc_init
from ....tw.addrs import TwAddrList
from ....tw.common import get_tw_label
from .common import BitcoinTwCommon
class BitcoinTwAddrList(TwAddrList):
class BitcoinTwAddrList(TwAddrList,BitcoinTwCommon):
has_age = True
async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None):
def check_dup_mmid(acct_labels):
mmid_prev,err = None,False
for mmid in sorted(a.mmid for a in acct_labels if a):
if mmid == mmid_prev:
err = True
msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n')
mmid_prev = mmid
if err:
die(4,'Tracking wallet is corrupted!')
def check_addr_array_lens(acct_pairs):
err = False
for label,addrs in acct_pairs:
if not label:
continue
if len(addrs) != 1:
err = True
if len(addrs) == 0:
msg(f'Label {label!r}: has no associated address!')
else:
msg(f'{addrs!r}: more than one {proto.coin} address in account!')
if err:
die(4,'Tracking wallet is corrupted!')
self.rpc = await rpc_init(proto)
self.total = proto.coin_amt('0')
self.proto = proto
lbl_id = ('account','label')['label_api' in self.rpc.caps]
for d in await self.rpc.call('listunspent',0):
if not lbl_id in d:
continue # skip coinbase outputs with missing account
if d['confirmations'] < minconf:
continue
label = get_tw_label(proto,d[lbl_id])
if label:
lm = label.mmid
if usr_addr_list and (lm not in usr_addr_list):
continue
if lm in self:
if self[lm]['addr'] != d['address']:
die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format(
proto.coin,
d['address'],
self[lm]['addr'] ))
else:
lm.confs = d['confirmations']
lm.txid = d['txid']
lm.date = None
self[lm] = {
'amt': proto.coin_amt('0'),
'lbl': label,
'addr': CoinAddr(proto,d['address']) }
amt = proto.coin_amt(d['amount'])
self[lm]['amt'] += amt
self.total += amt
# get balances with 'listunspent'
self.update( await self.get_unspent_by_mmid(minconf,usr_addr_list) )
self.total = sum(v['amt'] for v in self.values()) or proto.coin_amt('0')
# We use listaccounts only for empty addresses, as it shows false positive balances
# use 'listaccounts' only for empty addresses, as it shows false positive balances
if showempty or all_labels:
# for compatibility with old mmids, must use raw RPC rather than native data for matching
# args: minconf,watchonly, MUST use keys() so we get list, not dict
if 'label_api' in self.rpc.caps:
acct_list = await self.rpc.call('listlabels')
aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list])
acct_addrs = [list(a.keys()) for a in aa]
else:
acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L'
acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) # use raw list here
acct_labels = MMGenList([get_tw_label(proto,a) for a in acct_list])
check_dup_mmid(acct_labels)
assert len(acct_list) == len(acct_addrs),(
'listaccounts() and getaddressesbyaccount() not equal in length')
addr_pairs = list(zip(acct_labels,acct_addrs))
check_addr_array_lens(addr_pairs)
for label,addr_arr in addr_pairs:
if not label:
continue
if all_labels and not showempty and not label.comment:
continue
if usr_addr_list and (label.mmid not in usr_addr_list):
for label,addr in await self.get_addr_label_pairs():
if (not label
or (all_labels and not showempty and not label.comment)
or (usr_addr_list and (label.mmid not in usr_addr_list)) ):
continue
if label.mmid not in self:
self[label.mmid] = { 'amt':proto.coin_amt('0'), 'lbl':label, 'addr':'' }
if showbtcaddrs:
self[label.mmid]['addr'] = CoinAddr(proto,addr_arr[0])
self[label.mmid]['addr'] = CoinAddr(proto,addr)

View file

@ -11,3 +11,97 @@
"""
base_proto.bitcoin.tw: Bitcoin base protocol tracking wallet dependency classes
"""
from ....addr import CoinAddr
from ....util import die
from ....obj import MMGenList
from ....tw.common import get_tw_label
class BitcoinTwCommon:
async def get_addr_label_pairs(self):
"""
Get all the accounts in the tracking wallet and their associated addresses.
Returns list of (label,address) tuples.
"""
def check_dup_mmid(acct_labels):
mmid_prev,err = None,False
for mmid in sorted(a.mmid for a in acct_labels if a):
if mmid == mmid_prev:
err = True
msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n')
mmid_prev = mmid
if err:
die(4,'Tracking wallet is corrupted!')
def check_addr_array_lens(acct_pairs):
err = False
for label,addrs in acct_pairs:
if not label:
continue
if len(addrs) != 1:
err = True
if len(addrs) == 0:
msg(f'Label {label!r}: has no associated address!')
else:
msg(f'{addrs!r}: more than one {self.proto.coin} address in account!')
if err:
die(4,'Tracking wallet is corrupted!')
# for compatibility with old mmids, must use raw RPC rather than native data for matching
# args: minconf,watchonly, MUST use keys() so we get list, not dict
if 'label_api' in self.rpc.caps:
acct_list = await self.rpc.call('listlabels')
aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list])
acct_addrs = [list(a.keys()) for a in aa]
else:
acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L'
# use raw list here
acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list])
acct_labels = MMGenList([get_tw_label(self.proto,a) for a in acct_list])
check_dup_mmid(acct_labels)
assert len(acct_list) == len(acct_addrs), 'len(listaccounts()) != len(getaddressesbyaccount())'
addr_pairs = list(zip(acct_labels,acct_addrs))
check_addr_array_lens(addr_pairs)
return [(lbl,addrs[0]) for lbl,addrs in addr_pairs]
async def get_unspent_by_mmid(self,minconf=1,mmid_filter=[]):
"""
get unspent outputs in tracking wallet, compute balances per address
and return a dict with elements { 'twmmid': {'addr','lbl','amt'} }
"""
data = {}
lbl_id = ('account','label')['label_api' in self.rpc.caps]
for d in await self.rpc.call('listunspent',0):
if not lbl_id in d:
continue # skip coinbase outputs with missing account
if d['confirmations'] < minconf:
continue
label = get_tw_label(self.proto,d[lbl_id])
if label:
lm = label.mmid
if mmid_filter and (lm not in mmid_filter):
continue
if lm in data:
if data[lm]['addr'] != d['address']:
die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format(
self.proto.coin,
d['address'],
data[lm]['addr'] ))
else:
lm.confs = d['confirmations']
lm.txid = d['txid']
lm.date = None
data[lm] = {
'amt': self.proto.coin_amt('0'),
'lbl': label,
'addr': CoinAddr(self.proto,d['address']) }
amt = self.proto.coin_amt(d['amount'])
data[lm]['amt'] += amt
return data