txhistory.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. proto.btc.tw.txhistory: Bitcoin base protocol tracking wallet transaction history class
  12. """
  13. from collections import namedtuple
  14. from ....tw.txhistory import TwTxHistory
  15. from ....tw.shared import get_tw_label, TwMMGenID
  16. from ....addr import CoinAddr
  17. from ....util import msg, msg_r
  18. from ....color import nocolor, red, pink, gray
  19. from ....obj import TwComment, CoinTxID, Int
  20. from .rpc import BitcoinTwRPC
  21. from .view import BitcoinTwView
  22. class BitcoinTwTransaction:
  23. no_address_str = '[DATA]'
  24. def __init__(self, *, parent, proto, rpc,
  25. idx, # unique numeric identifier of this transaction in listing
  26. unspent_info, # addrs in wallet with balances: {'mmid': {'addr', 'comment', 'amt'}}
  27. mm_map, # all addrs in wallet: ['addr', ['twmmid', 'comment']]
  28. tx, # the decoded transaction data
  29. wallet_vouts, # list of ints - wallet-related vouts
  30. prevouts, # list of (txid,vout) pairs
  31. prevout_txs # decoded transaction data for prevouts
  32. ):
  33. self.parent = parent
  34. self.proto = proto
  35. self.rpc = rpc
  36. self.idx = idx
  37. self.unspent_info = unspent_info
  38. self.tx = tx
  39. def gen_prevouts_data():
  40. _d = namedtuple('prevout_data', ['txid', 'data'])
  41. for tx in prevout_txs:
  42. for e in prevouts:
  43. if e.txid == tx['txid']:
  44. yield _d(e.txid, tx['vout'][e.vout])
  45. def gen_wallet_vouts_data():
  46. _d = namedtuple('wallet_vout_data', ['txid', 'data'])
  47. txid = self.tx['txid']
  48. vouts = self.tx['decoded']['vout']
  49. for n in wallet_vouts:
  50. yield _d(txid, vouts[n])
  51. def gen_vouts_info(data):
  52. _d = namedtuple('vout_info', ['txid', 'coin_addr', 'twlabel', 'data'])
  53. def gen():
  54. for d in data:
  55. addr = (
  56. d.data['scriptPubKey'].get('address') or
  57. d.data['scriptPubKey'].get('addresses',[self.no_address_str])[0])
  58. yield _d(
  59. txid = d.txid,
  60. coin_addr = addr,
  61. twlabel = mm_map[addr] if (addr in mm_map and mm_map[addr].twmmid) else None,
  62. data = d.data)
  63. return sorted(
  64. gen(),
  65. # if address is not MMGen, ignore address and sort by TxID + vout only
  66. key = lambda d: (
  67. (d.twlabel.twmmid.sort_key if d.twlabel and d.twlabel.twmmid.type == 'mmgen' else '')
  68. + '_'
  69. + d.txid
  70. + '{:08d}'.format(d.data['n'])
  71. ))
  72. def gen_all_addrs(src):
  73. for e in self.vouts_info[src]:
  74. if e.twlabel:
  75. mmid = e.twlabel.twmmid
  76. yield (
  77. (mmid if mmid.type == 'mmgen' else mmid.split(':', 1)[1]) +
  78. ('*' if mmid in self.unspent_info else '')
  79. )
  80. else:
  81. yield e.coin_addr
  82. def total(data):
  83. return sum(coin_amt(d.data['value']) for d in data)
  84. def get_best_comment():
  85. """
  86. find the most relevant comment for tabular (squeezed) display
  87. """
  88. def vouts_labels(src):
  89. return [d.twlabel.comment for d in self.vouts_info[src] if d.twlabel and d.twlabel.comment]
  90. ret = vouts_labels('outputs') or vouts_labels('inputs')
  91. return ret[0] if ret else TwComment('')
  92. coin_amt = self.proto.coin_amt
  93. # 'outputs' refers to wallet-related outputs only
  94. self.vouts_info = {
  95. 'inputs': gen_vouts_info(gen_prevouts_data()),
  96. 'outputs': gen_vouts_info(gen_wallet_vouts_data())
  97. }
  98. self.max_addrlen = {
  99. 'inputs': max(len(addr) for addr in gen_all_addrs('inputs')),
  100. 'outputs': max(len(addr) for addr in gen_all_addrs('outputs'))
  101. }
  102. self.inputs_total = total(self.vouts_info['inputs'])
  103. self.outputs_total = sum(coin_amt(i['value']) for i in self.tx['decoded']['vout'])
  104. self.wallet_outputs_total = total(self.vouts_info['outputs'])
  105. self.fee = self.inputs_total - self.outputs_total
  106. self.nOutputs = len(self.tx['decoded']['vout'])
  107. self.confirmations = self.tx['confirmations']
  108. self.comment = get_best_comment()
  109. self.vsize = self.tx['decoded'].get('vsize') or self.tx['decoded']['size']
  110. self.txid = CoinTxID(self.tx['txid'])
  111. # Though 'blocktime' is flagged as an “optional” field, it’s always present for transactions
  112. # that are in the blockchain. However, Bitcoin Core wallet saves a record of broadcast but
  113. # unconfirmed transactions, e.g. replaced transactions, and the 'blocktime' field is missing
  114. # for these, so use 'time' as a fallback.
  115. self.time = self.tx.get('blocktime') or self.tx['time']
  116. self.time_received = self.tx.get('timereceived')
  117. def blockheight_disp(self, *, color):
  118. return (
  119. # old/altcoin daemons return no 'blockheight' field, so use confirmations instead
  120. Int(self.rpc.blockcount + 1 - self.confirmations).hl(color=color)
  121. if self.confirmations > 0 else None)
  122. def age_disp(self, age_fmt, *, width, color):
  123. if age_fmt == 'confs':
  124. ret_str = str(self.confirmations).ljust(width)
  125. return gray(ret_str) if self.confirmations < 0 and color else ret_str
  126. elif age_fmt == 'block':
  127. ret = (self.rpc.blockcount - (abs(self.confirmations) - 1)) * (-1 if self.confirmations < 0 else 1)
  128. ret_str = str(ret).ljust(width)
  129. return gray(ret_str) if ret < 0 and color else ret_str
  130. else:
  131. return self.parent.date_formatter[age_fmt](self.rpc, self.tx.get('blocktime', 0))
  132. def txdate_disp(self, age_fmt):
  133. return self.parent.date_formatter[age_fmt](self.rpc, self.time)
  134. def txid_disp(self, *, color, width=None):
  135. return self.txid.hl(color=color) if width is None else self.txid.truncate(width, color=color)
  136. def vouts_list_disp(self, src, color, indent, addr_view_pref):
  137. fs1, fs2 = {
  138. 'inputs': ('{i},{n} {a} {A}', '{i},{n} {a} {A} {l}'),
  139. 'outputs': ( '{n} {a} {A}', '{n} {a} {A} {l}')
  140. }[src]
  141. def gen_output():
  142. for e in self.vouts_info[src]:
  143. mmid = e.twlabel.twmmid if e.twlabel else None
  144. if not mmid:
  145. yield fs1.format(
  146. i = CoinTxID(e.txid).hl(color=color),
  147. n = (nocolor, red)[color](str(e.data['n']).ljust(3)),
  148. a = CoinAddr(self.proto, e.coin_addr).fmt(
  149. addr_view_pref, self.max_addrlen[src], color=color)
  150. if e.coin_addr != self.no_address_str else
  151. CoinAddr.fmtc(e.coin_addr, self.max_addrlen[src], color=color),
  152. A = self.proto.coin_amt(e.data['value']).fmt(color=color)
  153. ).rstrip()
  154. else:
  155. bal_star, co = ('*', 'melon') if mmid in self.unspent_info else ('', 'brown')
  156. addr_out = mmid if mmid.type == 'mmgen' else mmid.split(':', 1)[1]
  157. yield fs2.format(
  158. i = CoinTxID(e.txid).hl(color=color),
  159. n = (nocolor, red)[color](str(e.data['n']).ljust(3)),
  160. a = TwMMGenID.hl2(
  161. TwMMGenID,
  162. s = '{:{w}}'.format(addr_out + bal_star, w=self.max_addrlen[src]),
  163. color = color,
  164. color_override = co),
  165. A = self.proto.coin_amt(e.data['value']).fmt(color=color),
  166. l = e.twlabel.comment.hl(color=color)
  167. ).rstrip()
  168. return f'\n{indent}'.join(gen_output()).strip()
  169. def vouts_disp(self, src, width, color, addr_view_pref):
  170. def gen_output():
  171. nonlocal space_left
  172. for e in self.vouts_info[src]:
  173. mmid = e.twlabel.twmmid if e.twlabel else None
  174. bal_star, addr_w, co = ('*', 16, 'melon') if mmid in self.unspent_info else ('', 15, 'brown')
  175. if not mmid:
  176. if width and space_left < addr_w:
  177. break
  178. yield (
  179. CoinAddr(self.proto, e.coin_addr).fmt(addr_view_pref, addr_w, color=color)
  180. if e.coin_addr != self.no_address_str else
  181. CoinAddr.fmtc(e.coin_addr, addr_w, color=color))
  182. space_left -= addr_w
  183. elif mmid.type == 'mmgen':
  184. mmid_disp = mmid + bal_star
  185. if width and space_left < len(mmid_disp):
  186. break
  187. yield TwMMGenID.hl2(TwMMGenID, s=mmid_disp, color=color, color_override=co)
  188. space_left -= len(mmid_disp)
  189. else:
  190. if width and space_left < addr_w:
  191. break
  192. yield TwMMGenID.hl2(
  193. TwMMGenID,
  194. s = CoinAddr.fmtc(mmid.split(':', 1)[1] + bal_star, addr_w),
  195. color = color,
  196. color_override = co)
  197. space_left -= addr_w
  198. space_left -= 1
  199. space_left = width or 0
  200. return ' '.join(gen_output()) + ' ' * (space_left + 1 if width else 0)
  201. def amt_disp(self, show_total_amt):
  202. return (
  203. self.outputs_total if show_total_amt else
  204. self.wallet_outputs_total)
  205. def fee_disp(self, color):
  206. atomic_unit = self.proto.coin_amt.units[0]
  207. return '{} {}'.format(
  208. self.fee.hl(color=color),
  209. (nocolor, pink)[color]('({:,} {}s/byte)'.format(
  210. self.fee.to_unit(atomic_unit) // self.vsize,
  211. atomic_unit)))
  212. class BitcoinTwTxHistory(BitcoinTwView, TwTxHistory, BitcoinTwRPC):
  213. has_age = True
  214. hdr_lbl = 'transaction history'
  215. desc = 'transaction history'
  216. item_desc = 'transaction'
  217. item_desc_pl = 'transactions'
  218. prompt_fs_in = [
  219. 'Sort options: [t]xid, [a]mt, total a[m]t, [A]ge, block[n]um, [r]everse',
  220. 'Column options: toggle [D]ays/date/confs/block, tx[i]d, [T]otal amt',
  221. 'View/Print: pager [v]iew, full pager [V]iew, [p]rint, full [P]rint{s}',
  222. 'Filters/Actions: show [u]nconfirmed, [q]uit menu, r[e]draw:']
  223. prompt_fs_repl = {
  224. 'BCH': (1, 'Column options: toggle [D]ate/confs, cas[h]addr, tx[i]d, [T]otal amt')
  225. }
  226. key_mappings = {
  227. 'A':'s_age',
  228. 'n':'s_blockheight',
  229. 'a':'s_amt',
  230. 'm':'s_total_amt',
  231. 't':'s_txid',
  232. 'r':'s_reverse',
  233. 'D':'d_days',
  234. 'e':'d_redraw',
  235. 'u':'d_show_unconfirmed',
  236. 'i':'d_show_txid',
  237. 'T':'d_show_total_amt',
  238. 'v':'a_view',
  239. 'V':'a_view_detail',
  240. 'p':'a_print_squeezed',
  241. 'P':'a_print_detail'}
  242. async def get_rpc_data(self):
  243. blockhash = (
  244. await self.rpc.call('getblockhash', self.sinceblock)
  245. if self.sinceblock else '')
  246. # bitcoin-cli help listsinceblock:
  247. # Arguments:
  248. # 1. blockhash (string, optional) If set, the block hash to list transactions since,
  249. # otherwise list all transactions.
  250. # 2. target_confirmations (numeric, optional, default=1) Return the nth block hash from the main
  251. # chain. e.g. 1 would mean the best block hash. Note: this is not used
  252. # as a filter, but only affects [lastblock] in the return value
  253. # 3. include_watchonly (boolean, optional, default=true for watch-only wallets, otherwise
  254. # false) Include transactions to watch-only addresses
  255. # 4. include_removed (boolean, optional, default=true) Show transactions that were removed
  256. # due to a reorg in the "removed" array (not guaranteed to work on
  257. # pruned nodes)
  258. return (await self.rpc.call('listsinceblock', blockhash, 1, True, False))['transactions']
  259. async def gen_data(self, rpc_data, lbl_id):
  260. def gen_parsed_data():
  261. for o in rpc_data:
  262. if lbl_id in o:
  263. l = get_tw_label(self.proto, o[lbl_id])
  264. else:
  265. assert o['category'] == 'send', f"{o['address']}: {o['category']} != 'send'"
  266. l = None
  267. o.update({
  268. 'twmmid': l.mmid if l else None,
  269. 'comment': (l.comment or '') if l else None,
  270. })
  271. yield o
  272. data = list(gen_parsed_data())
  273. if self.cfg.debug_tw:
  274. import json
  275. from ....rpc.util import json_encoder
  276. def do_json_dump(*data):
  277. nw = f'{self.proto.coin.lower()}-{self.proto.network}'
  278. for d, fn_stem in data:
  279. with open(f'/tmp/{fn_stem}-{nw}.json', 'w') as fh:
  280. fh.write(json.dumps(d, cls=json_encoder))
  281. _mmp = namedtuple('mmap_datum', ['twmmid', 'comment'])
  282. mm_map = {
  283. i['address']: (
  284. _mmp(TwMMGenID(self.proto, i['twmmid']), TwComment(i['comment']))
  285. if i['twmmid'] else _mmp(None, None)
  286. )
  287. for i in data if 'address' in i}
  288. if self.sinceblock: # mapping data may be incomplete for inputs, so update from 'listlabels'
  289. mm_map.update(
  290. {e.coinaddr: _mmp(e.label.mmid, e.label.comment) if e.label else _mmp(None, None)
  291. for e in await self.get_label_addr_pairs()}
  292. )
  293. msg_r('Getting wallet transactions...')
  294. _wallet_txs = await self.rpc.gathered_icall(
  295. 'gettransaction',
  296. [(i, True, True) for i in {d['txid'] for d in data}])
  297. msg('done')
  298. if not 'decoded' in _wallet_txs[0]:
  299. _decoded_txs = iter(
  300. await self.rpc.gathered_call(
  301. 'decoderawtransaction',
  302. [(d['hex'],) for d in _wallet_txs]))
  303. for tx in _wallet_txs:
  304. tx['decoded'] = next(_decoded_txs)
  305. if self.cfg.debug_tw:
  306. do_json_dump((_wallet_txs, 'wallet-txs'),)
  307. _wip = namedtuple('prevout', ['txid', 'vout'])
  308. txdata = [
  309. {
  310. 'tx': tx,
  311. 'wallet_vouts': sorted({i.vout for i in
  312. [_wip(CoinTxID(d['txid']), d['vout']) for d in data]
  313. if i.txid == tx['txid']}),
  314. 'prevouts': [_wip(CoinTxID(vin['txid']), vin['vout']) for vin in tx['decoded']['vin']]
  315. }
  316. for tx in _wallet_txs]
  317. _prevout_txids = {i.txid for d in txdata for i in d['prevouts']}
  318. msg_r('Getting input transactions...')
  319. _prevout_txs = await self.rpc.gathered_call('getrawtransaction', [(i, True) for i in _prevout_txids])
  320. msg('done')
  321. _prevout_txs_dict = dict(zip(_prevout_txids, _prevout_txs))
  322. for d in txdata:
  323. d['prevout_txs'] = [_prevout_txs_dict[txid] for txid in {i.txid for i in d['prevouts']}]
  324. if self.cfg.debug_tw:
  325. do_json_dump(
  326. (rpc_data, 'txhist-rpc'),
  327. (data, 'txhist'),
  328. (mm_map, 'mmap'),
  329. (_prevout_txs, 'prevout-txs'),
  330. (txdata, 'txdata'),
  331. )
  332. unspent_info = await self.get_unspent_by_mmid()
  333. return (
  334. BitcoinTwTransaction(
  335. parent = self,
  336. proto = self.proto,
  337. rpc = self.rpc,
  338. idx = idx,
  339. unspent_info = unspent_info,
  340. mm_map = mm_map,
  341. **d) for idx, d in enumerate(txdata))