|
@@ -0,0 +1,366 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+#
|
|
|
+# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
|
|
|
+# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
|
|
|
+# Licensed under the GNU General Public License, Version 3:
|
|
|
+# https://www.gnu.org/licenses
|
|
|
+# Public project repositories:
|
|
|
+# https://github.com/mmgen/mmgen
|
|
|
+# https://gitlab.com/mmgen/mmgen
|
|
|
+
|
|
|
+"""
|
|
|
+base_proto.bitcoin.tw.txhistory: Bitcoin base protocol tracking wallet transaction history class
|
|
|
+"""
|
|
|
+
|
|
|
+from collections import namedtuple
|
|
|
+from ....globalvars import g
|
|
|
+from ....tw.txhistory import TwTxHistory
|
|
|
+from ....tw.common import get_tw_label,TwMMGenID
|
|
|
+from ....addr import CoinAddr
|
|
|
+from ....util import msg,msg_r,remove_dups
|
|
|
+from ....color import nocolor,red,pink,gray
|
|
|
+from ....obj import TwComment,CoinTxID,Int
|
|
|
+from .common import BitcoinTwCommon
|
|
|
+
|
|
|
+class BitcoinTwTransaction(BitcoinTwCommon):
|
|
|
+
|
|
|
+ def __init__(self,parent,proto,rpc,
|
|
|
+ idx, # unique numeric identifier of this transaction in listing
|
|
|
+ unspent_info, # addrs in wallet with balances: { 'mmid': {'addr','label','amt'} }
|
|
|
+ mm_map, # all addrs in wallet: ['addr', ['twmmid','label']]
|
|
|
+ tx, # the decoded transaction data
|
|
|
+ wallet_vouts, # list of ints - wallet-related vouts
|
|
|
+ prevouts, # list of (txid,vout) pairs
|
|
|
+ prevout_txs # decoded transaction data for prevouts
|
|
|
+ ):
|
|
|
+
|
|
|
+ self.parent = parent
|
|
|
+ self.proto = proto
|
|
|
+ self.rpc = rpc
|
|
|
+ self.unspent_info = unspent_info
|
|
|
+ self.tx = tx
|
|
|
+
|
|
|
+ def gen_prevouts_data():
|
|
|
+ _d = namedtuple('prevout_data',['txid','data'])
|
|
|
+ for tx in prevout_txs:
|
|
|
+ for e in prevouts:
|
|
|
+ if e.txid == tx['txid']:
|
|
|
+ yield _d( e.txid, tx['vout'][e.vout] )
|
|
|
+
|
|
|
+ def gen_wallet_vouts_data():
|
|
|
+ _d = namedtuple('wallet_vout_data',['txid','data'])
|
|
|
+ txid = self.tx['txid']
|
|
|
+ vouts = self.tx['decoded']['vout']
|
|
|
+ for n in wallet_vouts:
|
|
|
+ yield _d( txid, vouts[n] )
|
|
|
+
|
|
|
+ def gen_vouts_info(data):
|
|
|
+ _d = namedtuple('vout_info',['txid','coin_addr','twlabel','data'])
|
|
|
+ def gen():
|
|
|
+ for d in data:
|
|
|
+ addr = d.data['scriptPubKey'].get('address') or d.data['scriptPubKey']['addresses'][0]
|
|
|
+ yield _d(
|
|
|
+ txid = d.txid,
|
|
|
+ coin_addr = addr,
|
|
|
+ twlabel = mm_map[addr] if (addr in mm_map and mm_map[addr].twmmid) else None,
|
|
|
+ data = d.data )
|
|
|
+ return sorted(
|
|
|
+ gen(),
|
|
|
+ key = lambda d: d.twlabel.twmmid.sort_key if d.twlabel else 'zz_' + d.coin_addr )
|
|
|
+
|
|
|
+ def gen_all_addrs(src):
|
|
|
+ for e in self.vouts_info[src]:
|
|
|
+ if e.twlabel:
|
|
|
+ mmid = e.twlabel.twmmid
|
|
|
+ yield (
|
|
|
+ (mmid if mmid.type == 'mmgen' else mmid.split(':',1)[1]) +
|
|
|
+ ('*' if mmid in self.unspent_info else '')
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ yield e.coin_addr
|
|
|
+
|
|
|
+ def total(data):
|
|
|
+ return self.proto.coin_amt( sum(d.data['value'] for d in data) )
|
|
|
+
|
|
|
+ def get_best_label():
|
|
|
+ """
|
|
|
+ find the most relevant label for tabular (squeezed) display
|
|
|
+ """
|
|
|
+ def vouts_labels(src):
|
|
|
+ return [ d.twlabel.label for d in self.vouts_info[src] if d.twlabel and d.twlabel.label ]
|
|
|
+ ret = vouts_labels('outputs') or vouts_labels('inputs')
|
|
|
+ return ret[0] if ret else TwComment('')
|
|
|
+
|
|
|
+ # 'outputs' refers to wallet-related outputs only
|
|
|
+ self.vouts_info = {
|
|
|
+ 'inputs': gen_vouts_info( gen_prevouts_data() ),
|
|
|
+ 'outputs': gen_vouts_info( gen_wallet_vouts_data() )
|
|
|
+ }
|
|
|
+ self.max_addrlen = {
|
|
|
+ 'inputs': max(len(addr) for addr in gen_all_addrs('inputs')),
|
|
|
+ 'outputs': max(len(addr) for addr in gen_all_addrs('outputs'))
|
|
|
+ }
|
|
|
+ self.inputs_total = total( self.vouts_info['inputs'] )
|
|
|
+ self.outputs_total = self.proto.coin_amt( sum(i['value'] for i in self.tx['decoded']['vout']) )
|
|
|
+ self.wallet_outputs_total = total( self.vouts_info['outputs'] )
|
|
|
+ self.fee = self.inputs_total - self.outputs_total
|
|
|
+ self.nOutputs = len(self.tx['decoded']['vout'])
|
|
|
+ self.confirmations = self.tx['confirmations']
|
|
|
+ self.label = get_best_label()
|
|
|
+ self.vsize = self.tx['decoded'].get('vsize') or self.tx['decoded']['size']
|
|
|
+ self.txid = CoinTxID(self.tx['txid'])
|
|
|
+ self.time = self.tx['time']
|
|
|
+
|
|
|
+ def blockheight_disp(self,color):
|
|
|
+ return (
|
|
|
+ # old/altcoin daemons return no 'blockheight' field, so use confirmations instead
|
|
|
+ Int( self.rpc.blockcount + 1 - self.confirmations ).hl(color=color)
|
|
|
+ if self.confirmations > 0 else None )
|
|
|
+
|
|
|
+ def age_disp(self,age_fmt,width,color):
|
|
|
+ if age_fmt == 'confs':
|
|
|
+ ret_str = str(self.confirmations).rjust(width)
|
|
|
+ return gray(ret_str) if self.confirmations < 0 and color else ret_str
|
|
|
+ elif age_fmt == 'block':
|
|
|
+ ret = (self.rpc.blockcount - (abs(self.confirmations) - 1)) * (-1 if self.confirmations < 0 else 1)
|
|
|
+ ret_str = str(ret).rjust(width)
|
|
|
+ return gray(ret_str) if ret < 0 and color else ret_str
|
|
|
+ else:
|
|
|
+ return self.parent.date_formatter[age_fmt](self.rpc,self.tx.get('blocktime',0))
|
|
|
+
|
|
|
+ def txdate_disp(self,age_fmt):
|
|
|
+ return self.parent.date_formatter[age_fmt](self.rpc,self.time)
|
|
|
+
|
|
|
+ def txid_disp(self,width,color):
|
|
|
+ return self.txid.truncate(width=width,color=color)
|
|
|
+
|
|
|
+ def vouts_list_disp(self,src,color,indent=''):
|
|
|
+
|
|
|
+ fs1,fs2 = {
|
|
|
+ 'inputs': ('{i},{n} {a} {A}', '{i},{n} {a} {A} {l}'),
|
|
|
+ 'outputs': ( '{n} {a} {A}', '{n} {a} {A} {l}')
|
|
|
+ }[src]
|
|
|
+
|
|
|
+ def gen_output():
|
|
|
+ for e in self.vouts_info[src]:
|
|
|
+ mmid = e.twlabel.twmmid if e.twlabel else None
|
|
|
+ if not mmid:
|
|
|
+ yield fs1.format(
|
|
|
+ i = CoinTxID(e.txid).hl(color=color),
|
|
|
+ n = (nocolor,red)[color](str(e.data['n']).ljust(3)),
|
|
|
+ a = CoinAddr(self.proto,e.coin_addr).fmt( width=self.max_addrlen[src], color=color ),
|
|
|
+ A = self.proto.coin_amt( e.data['value'] ).fmt(color=color)
|
|
|
+ ).rstrip()
|
|
|
+ else:
|
|
|
+ bal_star,co = ('*','melon') if mmid in self.unspent_info else ('','brown')
|
|
|
+ addr_out = mmid if mmid.type == 'mmgen' else mmid.split(':',1)[1]
|
|
|
+ yield fs2.format(
|
|
|
+ i = CoinTxID(e.txid).hl(color=color),
|
|
|
+ n = (nocolor,red)[color](str(e.data['n']).ljust(3)),
|
|
|
+ a = TwMMGenID.hlc(
|
|
|
+ '{:{w}}'.format( addr_out + bal_star, w=self.max_addrlen[src] ),
|
|
|
+ color = color,
|
|
|
+ color_override = co ),
|
|
|
+ A = self.proto.coin_amt( e.data['value'] ).fmt(color=color),
|
|
|
+ l = e.twlabel.label.hl(color=color)
|
|
|
+ ).rstrip()
|
|
|
+
|
|
|
+ return f'\n{indent}'.join( gen_output() ).strip()
|
|
|
+
|
|
|
+ def vouts_disp(self,src,width,color):
|
|
|
+
|
|
|
+ class x: space_left = width or 0
|
|
|
+
|
|
|
+ def gen_output():
|
|
|
+ for e in self.vouts_info[src]:
|
|
|
+ mmid = e.twlabel.twmmid if e.twlabel else None
|
|
|
+ bal_star,addr_w,co = ('*',16,'melon') if mmid in self.unspent_info else ('',15,'brown')
|
|
|
+ if not mmid:
|
|
|
+ if width and x.space_left < addr_w:
|
|
|
+ break
|
|
|
+ yield CoinAddr( self.proto, e.coin_addr ).fmt(width=addr_w,color=color)
|
|
|
+ x.space_left -= addr_w
|
|
|
+ elif mmid.type == 'mmgen':
|
|
|
+ mmid_disp = mmid + bal_star
|
|
|
+ if width and x.space_left < len(mmid_disp):
|
|
|
+ break
|
|
|
+ yield TwMMGenID.hlc( mmid_disp, color=color, color_override=co )
|
|
|
+ x.space_left -= len(mmid_disp)
|
|
|
+ else:
|
|
|
+ if width and x.space_left < addr_w:
|
|
|
+ break
|
|
|
+ yield TwMMGenID.hlc(
|
|
|
+ CoinAddr.fmtc( mmid.split(':',1)[1] + bal_star, width=addr_w ),
|
|
|
+ color = color,
|
|
|
+ color_override = co )
|
|
|
+ x.space_left -= addr_w
|
|
|
+ x.space_left -= 1
|
|
|
+
|
|
|
+ return ' '.join(gen_output()) + ' ' * (x.space_left + 1 if width else 0)
|
|
|
+
|
|
|
+ def amt_disp(self,show_total_amt):
|
|
|
+ return (
|
|
|
+ self.outputs_total if show_total_amt else
|
|
|
+ self.wallet_outputs_total )
|
|
|
+
|
|
|
+ def fee_disp(self,color):
|
|
|
+ atomic_unit = self.proto.coin_amt.units[0]
|
|
|
+ return '{} {}'.format(
|
|
|
+ self.fee.hl(color=color),
|
|
|
+ (nocolor,pink)[color]('({:,} {}s/byte)'.format(
|
|
|
+ self.fee.to_unit(atomic_unit) // self.vsize,
|
|
|
+ atomic_unit )) )
|
|
|
+
|
|
|
+class BitcoinTwTxHistory(TwTxHistory,BitcoinTwCommon):
|
|
|
+
|
|
|
+ has_age = True
|
|
|
+ hdr_fmt = 'TRANSACTION HISTORY (sort order: {a})'
|
|
|
+ desc = 'transaction history'
|
|
|
+ item_desc = 'transaction'
|
|
|
+ no_data_errmsg = 'No transactions in tracking wallet!'
|
|
|
+ prompt = """
|
|
|
+Sort options: [t]xid, [a]mt, total a[m]t, [A]ge, [b]locknum, [r]everse
|
|
|
+Column options: toggle [D]ays/date/confs/block, tx[i]d, [T]otal amt
|
|
|
+Filter options: show [u]nconfirmed
|
|
|
+View/Print: pager [v]iew, full [V]iew, screen [p]rint, full [P]rint
|
|
|
+Actions: [q]uit, r[e]draw:
|
|
|
+"""
|
|
|
+ key_mappings = {
|
|
|
+ 'A':'s_age',
|
|
|
+ 'b':'s_blockheight',
|
|
|
+ 'a':'s_amt',
|
|
|
+ 'm':'s_total_amt',
|
|
|
+ 't':'s_txid',
|
|
|
+ 'r':'d_reverse',
|
|
|
+ 'D':'d_days',
|
|
|
+ 'e':'d_redraw',
|
|
|
+ 'u':'d_show_unconfirmed',
|
|
|
+ 'i':'d_show_txid',
|
|
|
+ 'T':'d_show_total_amt',
|
|
|
+ 'q':'a_quit',
|
|
|
+ 'v':'a_view',
|
|
|
+ 'V':'a_view_detail',
|
|
|
+ 'p':'a_print_squeezed',
|
|
|
+ 'P':'a_print_detail' }
|
|
|
+
|
|
|
+ squeezed_fs_fs = ' {{n:>{nw}}} {{d:>{dw}}} {txid_fs}{{a1}} {{A}} {{a2}} {{l}}'
|
|
|
+ squeezed_hdr_fs_fs = ' {{n:>{nw}}} {{d:{dw}}} {txid_fs}{{a1:{aw}}} {{A}} {{a2:{a2w}}} {{l}}'
|
|
|
+
|
|
|
+ async def get_rpc_data(self):
|
|
|
+ blockhash = (
|
|
|
+ await self.rpc.call( 'getblockhash', self.sinceblock )
|
|
|
+ if self.sinceblock else '' )
|
|
|
+ # bitcoin-cli help listsinceblock:
|
|
|
+ # Arguments:
|
|
|
+ # 1. blockhash (string, optional) If set, the block hash to list transactions since,
|
|
|
+ # otherwise list all transactions.
|
|
|
+ # 2. target_confirmations (numeric, optional, default=1) Return the nth block hash from the main
|
|
|
+ # chain. e.g. 1 would mean the best block hash. Note: this is not used
|
|
|
+ # as a filter, but only affects [lastblock] in the return value
|
|
|
+ # 3. include_watchonly (boolean, optional, default=true for watch-only wallets, otherwise
|
|
|
+ # false) Include transactions to watch-only addresses (see
|
|
|
+ # 'importaddress')
|
|
|
+ # 4. include_removed (boolean, optional, default=true) Show transactions that were removed
|
|
|
+ # due to a reorg in the "removed" array (not guaranteed to work on
|
|
|
+ # pruned nodes)
|
|
|
+ return (await self.rpc.call('listsinceblock',blockhash,1,True,False))['transactions']
|
|
|
+
|
|
|
+ async def gen_data(self,rpc_data,lbl_id):
|
|
|
+
|
|
|
+ def gen_parsed_data():
|
|
|
+ for o in rpc_data:
|
|
|
+ if lbl_id in o:
|
|
|
+ l = get_tw_label(self.proto,o[lbl_id])
|
|
|
+ else:
|
|
|
+ assert o['category'] == 'send', f"{o['address']}: {o['category']} != 'send'"
|
|
|
+ l = None
|
|
|
+ o.update({
|
|
|
+ 'twmmid': l.mmid if l else None,
|
|
|
+ 'label': (l.comment or '') if l else None,
|
|
|
+ })
|
|
|
+ yield o
|
|
|
+
|
|
|
+ data = list(gen_parsed_data())
|
|
|
+
|
|
|
+ if g.debug_tw:
|
|
|
+ import json
|
|
|
+ from ....rpc import json_encoder
|
|
|
+ def do_json_dump(*data):
|
|
|
+ nw = f'{self.proto.coin.lower()}-{self.proto.network}'
|
|
|
+ for d,fn_stem in data:
|
|
|
+ open(f'/tmp/{fn_stem}-{nw}.json','w').write(json.dumps(d,cls=json_encoder))
|
|
|
+
|
|
|
+ _mmp = namedtuple('mmap_datum',['twmmid','label'])
|
|
|
+
|
|
|
+ mm_map = {
|
|
|
+ i['address']: (
|
|
|
+ _mmp( TwMMGenID(self.proto,i['twmmid']), TwComment(i['label']) )
|
|
|
+ if i['twmmid'] else _mmp(None,None)
|
|
|
+ )
|
|
|
+ for i in data }
|
|
|
+
|
|
|
+ if self.sinceblock: # mapping data may be incomplete for inputs, so update from 'listlabels'
|
|
|
+ mm_map.update(
|
|
|
+ { addr: _mmp(lbl.mmid, lbl.comment) if lbl else _mmp(None,None) for lbl,addr in
|
|
|
+ [(get_tw_label(self.proto,a), b) for a,b in await self.get_addr_label_pairs()] }
|
|
|
+ )
|
|
|
+
|
|
|
+ msg_r('Getting wallet transactions...')
|
|
|
+ _wallet_txs = await self.rpc.gathered_icall(
|
|
|
+ 'gettransaction',
|
|
|
+ [ (i,True,True) for i in {d['txid'] for d in data} ] )
|
|
|
+ msg('done')
|
|
|
+
|
|
|
+ if not 'decoded' in _wallet_txs[0]:
|
|
|
+ _decoded_txs = iter(
|
|
|
+ await self.rpc.gathered_call(
|
|
|
+ 'decoderawtransaction',
|
|
|
+ [ (d['hex'],) for d in _wallet_txs ] ))
|
|
|
+ for tx in _wallet_txs:
|
|
|
+ tx['decoded'] = next(_decoded_txs)
|
|
|
+
|
|
|
+ if g.debug_tw:
|
|
|
+ do_json_dump((_wallet_txs, 'wallet-txs'),)
|
|
|
+
|
|
|
+ _wip = namedtuple('prevout',['txid','vout'])
|
|
|
+ txdata = [
|
|
|
+ {
|
|
|
+ 'tx': tx,
|
|
|
+ 'wallet_vouts': sorted({i.vout for i in
|
|
|
+ [_wip( CoinTxID(d['txid']), d['vout'] ) for d in data]
|
|
|
+ if i.txid == tx['txid']}),
|
|
|
+ 'prevouts': [_wip( CoinTxID(vin['txid']), vin['vout'] ) for vin in tx['decoded']['vin']]
|
|
|
+ }
|
|
|
+ for tx in _wallet_txs]
|
|
|
+
|
|
|
+ _prevout_txids = {i.txid for d in txdata for i in d['prevouts']}
|
|
|
+
|
|
|
+ msg_r('Getting input transactions...')
|
|
|
+ _prevout_txs = await self.rpc.gathered_call('getrawtransaction', [ (i,True) for i in _prevout_txids ])
|
|
|
+ msg('done')
|
|
|
+
|
|
|
+ _prevout_txs_dict = dict(zip(_prevout_txids,_prevout_txs))
|
|
|
+
|
|
|
+ for d in txdata:
|
|
|
+ d['prevout_txs'] = [_prevout_txs_dict[txid] for txid in {i.txid for i in d['prevouts']} ]
|
|
|
+
|
|
|
+ if g.debug_tw:
|
|
|
+ do_json_dump(
|
|
|
+ (rpc_data, 'txhist-rpc'),
|
|
|
+ (data, 'txhist'),
|
|
|
+ (mm_map, 'mmap'),
|
|
|
+ (_prevout_txs, 'prevout-txs'),
|
|
|
+ (txdata, 'txdata'),
|
|
|
+ )
|
|
|
+
|
|
|
+ unspent_info = await self.get_unspent_by_mmid()
|
|
|
+
|
|
|
+ return (
|
|
|
+ BitcoinTwTransaction(
|
|
|
+ parent = self,
|
|
|
+ proto = self.proto,
|
|
|
+ rpc = self.rpc,
|
|
|
+ idx = idx,
|
|
|
+ unspent_info = unspent_info,
|
|
|
+ mm_map = mm_map,
|
|
|
+ **d ) for idx,d in enumerate(txdata) )
|