From 15cf330f9b5796bc3f8b50ca70fbbffac472409a Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Mon, 23 May 2022 16:28:56 +0000 Subject: [PATCH] base_proto.bitcoin.tw.addrs: refactor RPC routines --- mmgen/base_proto/bitcoin/tw/addrs.py | 88 ++++--------------------- mmgen/base_proto/bitcoin/tw/common.py | 94 +++++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 77 deletions(-) diff --git a/mmgen/base_proto/bitcoin/tw/addrs.py b/mmgen/base_proto/bitcoin/tw/addrs.py index 7a5fd87f..89e7b117 100755 --- a/mmgen/base_proto/bitcoin/tw/addrs.py +++ b/mmgen/base_proto/bitcoin/tw/addrs.py @@ -18,95 +18,29 @@ from ....addr import CoinAddr from ....rpc import rpc_init from ....tw.addrs import TwAddrList from ....tw.common import get_tw_label +from .common import BitcoinTwCommon -class BitcoinTwAddrList(TwAddrList): +class BitcoinTwAddrList(TwAddrList,BitcoinTwCommon): has_age = True async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None): - def check_dup_mmid(acct_labels): - mmid_prev,err = None,False - for mmid in sorted(a.mmid for a in acct_labels if a): - if mmid == mmid_prev: - err = True - msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n') - mmid_prev = mmid - if err: - die(4,'Tracking wallet is corrupted!') - - def check_addr_array_lens(acct_pairs): - err = False - for label,addrs in acct_pairs: - if not label: - continue - if len(addrs) != 1: - err = True - if len(addrs) == 0: - msg(f'Label {label!r}: has no associated address!') - else: - msg(f'{addrs!r}: more than one {proto.coin} address in account!') - if err: - die(4,'Tracking wallet is corrupted!') - self.rpc = await rpc_init(proto) - self.total = proto.coin_amt('0') self.proto = proto - lbl_id = ('account','label')['label_api' in self.rpc.caps] - for d in await self.rpc.call('listunspent',0): - if not lbl_id in d: - continue # skip coinbase outputs with missing account - if d['confirmations'] < minconf: - continue - label = get_tw_label(proto,d[lbl_id]) - if label: - lm = label.mmid - if usr_addr_list and (lm not in usr_addr_list): - continue - if lm in self: - if self[lm]['addr'] != d['address']: - die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format( - proto.coin, - d['address'], - self[lm]['addr'] )) - else: - lm.confs = d['confirmations'] - lm.txid = d['txid'] - lm.date = None - self[lm] = { - 'amt': proto.coin_amt('0'), - 'lbl': label, - 'addr': CoinAddr(proto,d['address']) } - amt = proto.coin_amt(d['amount']) - self[lm]['amt'] += amt - self.total += amt + # get balances with 'listunspent' + self.update( await self.get_unspent_by_mmid(minconf,usr_addr_list) ) + self.total = sum(v['amt'] for v in self.values()) or proto.coin_amt('0') - # We use listaccounts only for empty addresses, as it shows false positive balances + # use 'listaccounts' only for empty addresses, as it shows false positive balances if showempty or all_labels: - # for compatibility with old mmids, must use raw RPC rather than native data for matching - # args: minconf,watchonly, MUST use keys() so we get list, not dict - if 'label_api' in self.rpc.caps: - acct_list = await self.rpc.call('listlabels') - aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list]) - acct_addrs = [list(a.keys()) for a in aa] - else: - acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L' - acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) # use raw list here - acct_labels = MMGenList([get_tw_label(proto,a) for a in acct_list]) - check_dup_mmid(acct_labels) - assert len(acct_list) == len(acct_addrs),( - 'listaccounts() and getaddressesbyaccount() not equal in length') - addr_pairs = list(zip(acct_labels,acct_addrs)) - check_addr_array_lens(addr_pairs) - for label,addr_arr in addr_pairs: - if not label: - continue - if all_labels and not showempty and not label.comment: - continue - if usr_addr_list and (label.mmid not in usr_addr_list): + for label,addr in await self.get_addr_label_pairs(): + if (not label + or (all_labels and not showempty and not label.comment) + or (usr_addr_list and (label.mmid not in usr_addr_list)) ): continue if label.mmid not in self: self[label.mmid] = { 'amt':proto.coin_amt('0'), 'lbl':label, 'addr':'' } if showbtcaddrs: - self[label.mmid]['addr'] = CoinAddr(proto,addr_arr[0]) + self[label.mmid]['addr'] = CoinAddr(proto,addr) diff --git a/mmgen/base_proto/bitcoin/tw/common.py b/mmgen/base_proto/bitcoin/tw/common.py index b8a05d6b..4088927d 100755 --- a/mmgen/base_proto/bitcoin/tw/common.py +++ b/mmgen/base_proto/bitcoin/tw/common.py @@ -11,3 +11,97 @@ """ base_proto.bitcoin.tw: Bitcoin base protocol tracking wallet dependency classes """ + +from ....addr import CoinAddr +from ....util import die +from ....obj import MMGenList +from ....tw.common import get_tw_label + +class BitcoinTwCommon: + + async def get_addr_label_pairs(self): + """ + Get all the accounts in the tracking wallet and their associated addresses. + Returns list of (label,address) tuples. + """ + def check_dup_mmid(acct_labels): + mmid_prev,err = None,False + for mmid in sorted(a.mmid for a in acct_labels if a): + if mmid == mmid_prev: + err = True + msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n') + mmid_prev = mmid + if err: + die(4,'Tracking wallet is corrupted!') + + def check_addr_array_lens(acct_pairs): + err = False + for label,addrs in acct_pairs: + if not label: + continue + if len(addrs) != 1: + err = True + if len(addrs) == 0: + msg(f'Label {label!r}: has no associated address!') + else: + msg(f'{addrs!r}: more than one {self.proto.coin} address in account!') + if err: + die(4,'Tracking wallet is corrupted!') + + # for compatibility with old mmids, must use raw RPC rather than native data for matching + # args: minconf,watchonly, MUST use keys() so we get list, not dict + if 'label_api' in self.rpc.caps: + acct_list = await self.rpc.call('listlabels') + aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list]) + acct_addrs = [list(a.keys()) for a in aa] + else: + acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L' + # use raw list here + acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) + acct_labels = MMGenList([get_tw_label(self.proto,a) for a in acct_list]) + check_dup_mmid(acct_labels) + assert len(acct_list) == len(acct_addrs), 'len(listaccounts()) != len(getaddressesbyaccount())' + addr_pairs = list(zip(acct_labels,acct_addrs)) + check_addr_array_lens(addr_pairs) + return [(lbl,addrs[0]) for lbl,addrs in addr_pairs] + + async def get_unspent_by_mmid(self,minconf=1,mmid_filter=[]): + """ + get unspent outputs in tracking wallet, compute balances per address + and return a dict with elements { 'twmmid': {'addr','lbl','amt'} } + """ + data = {} + lbl_id = ('account','label')['label_api' in self.rpc.caps] + + for d in await self.rpc.call('listunspent',0): + + if not lbl_id in d: + continue # skip coinbase outputs with missing account + + if d['confirmations'] < minconf: + continue + + label = get_tw_label(self.proto,d[lbl_id]) + + if label: + lm = label.mmid + if mmid_filter and (lm not in mmid_filter): + continue + if lm in data: + if data[lm]['addr'] != d['address']: + die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format( + self.proto.coin, + d['address'], + data[lm]['addr'] )) + else: + lm.confs = d['confirmations'] + lm.txid = d['txid'] + lm.date = None + data[lm] = { + 'amt': self.proto.coin_amt('0'), + 'lbl': label, + 'addr': CoinAddr(self.proto,d['address']) } + amt = self.proto.coin_amt(d['amount']) + data[lm]['amt'] += amt + + return data