#!/usr/bin/env python3 # # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution # Copyright (C)2013-2021 The MMGen Project # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation, either version 3 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program. If not, see . """ mmgen_node_tools.BlocksInfo: Display information about a block or range of blocks """ import re, json from collections import namedtuple from time import strftime, gmtime from decimal import Decimal from mmgen.util import msg, Msg, Msg_r, die, suf, secs_to_ms, secs_to_dhms, is_int from mmgen.rpc.util import json_encoder class RangeParser: debug = False def __init__(self, caller, arg): self.caller = caller self.arg = self.orig_arg = arg def parse(self, target): ret = getattr(self, 'parse_'+target)() if self.debug: msg(f'arg after parse({target}): {self.arg}') return ret def finalize(self): if self.arg: die(1, f'{self.orig_arg!r}: invalid range specifier') def parse_from_tip(self): m = re.match(r'-([0-9]+)(.*)', self.arg) if m: res, self.arg = (m[1], m[2]) return self.caller.check_nblocks(int(res)) def parse_abs_range(self): m = re.match(r'([^+-]+)(-([^+-]+)){0,1}(.*)', self.arg) if m: if self.debug: msg(f'abs_range parse: first={m[1]}, last={m[3]}') self.arg = m[4] return ( self.caller.conv_blkspec(m[1]), self.caller.conv_blkspec(m[3]) if m[3] else None) return (None, None) def parse_add(self): m = re.match(r'\+([0-9*]+)(.*)', self.arg) if m: res, self.arg = (m[1], m[2]) if res.strip('*') != res: die(1, f"'+{res}': malformed nBlocks specifier") if len(res) > 30: die(1, f"'+{res}': overly long nBlocks specifier") return self.caller.check_nblocks(eval(res)) # res is only digits plus '*', so eval safe class BlocksInfo: total_bytes = 0 total_weight = 0 total_solve_time = 0 header_printed = False bf = namedtuple('block_info_fields', ['fmt_func', 'src', 'fs', 'hdr1', 'hdr2', 'key1', 'key2']) # bh=getblockheader, bs=getblockstats, lo=local fields = { 'block': bf(None, 'bh', '{:<6}', '', 'Block', 'height', None), 'hash': bf(None, 'bh', '{:<64}', '', 'Hash', 'hash', None), 'date': bf('da', 'bh', '{:<19}', '', 'Date', 'time', None), 'interval': bf('td', 'lo', '{:>8}', 'Solve', 'Time ', 'interval', None), 'subsidy': bf('su', 'bs', '{:<5}', 'Sub-', 'sidy', 'subsidy', None), 'totalfee': bf('tf', 'bs', '{:>10}', '', 'Total Fee', 'totalfee', None), 'size': bf(None, 'bs', '{:>7}', '', 'Size', 'total_size', None), 'weight': bf(None, 'bs', '{:>7}', '', 'Weight', 'total_weight', None), 'fee90': bf('fe', 'bs', '{:>3}', '90%', 'Fee', 'feerate_percentiles', 4), 'fee75': bf('fe', 'bs', '{:>3}', '75%', 'Fee', 'feerate_percentiles', 3), 'fee50': bf('fe', 'bs', '{:>3}', '50%', 'Fee', 'feerate_percentiles', 2), 'fee25': bf('fe', 'bs', '{:>3}', '25%', 'Fee', 'feerate_percentiles', 1), 'fee10': bf('fe', 'bs', '{:>3}', '10%', 'Fee', 'feerate_percentiles', 0), 'fee_max': bf('fe', 'bs', '{:>5}', 'Max', 'Fee', 'maxfeerate', None), 'fee_avg': bf('fe', 'bs', '{:>3}', 'Avg', 'Fee', 'avgfeerate', None), 'fee_min': bf('fe', 'bs', '{:>3}', 'Min', 'Fee', 'minfeerate', None), 'nTx': bf(None, 'bh', '{:>5}', '', ' nTx ', 'nTx', None), 'inputs': bf(None, 'bs', '{:>5}', 'In- ', 'puts', 'ins', None), 'outputs': bf(None, 'bs', '{:>5}', 'Out-', 'puts', 'outs', None), 'utxo_inc': bf(None, 'bs', '{:>6}', ' UTXO', ' Incr', 'utxo_increase', None), 'version': bf(None, 'bh', '{:<8}', '', 'Version', 'versionHex', None), 'difficulty': bf('di', 'bh', '{:<8}', 'Diffi-','culty', 'difficulty', None), 'miner': bf(None, 'lo', '{:<5}', '', 'Miner', 'miner', None)} dfl_fields = ( 'block', 'date', 'interval', 'subsidy', 'totalfee', 'size', 'weight', 'fee50', 'fee25', 'fee10', 'fee_avg', 'fee_min', 'version') fixed_fields = ( 'block', # until ≈ 09/01/2028 (block 1000000) 'hash', 'date', 'size', # until ≈ 6x block size increase 'weight', # until ≈ 2.5x block size increase 'version', 'subsidy', # until ≈ 01/04/2028 (increases by 1 digit per halving until 9th halving [max 10 digits]) 'difficulty') # until 1.00e+100 (i.e. never) # column width adjustment data: fs_lsqueeze = ('totalfee', 'inputs', 'outputs', 'nTx') fs_rsqueeze = () fs_groups = ( ('fee10', 'fee25', 'fee50', 'fee75', 'fee90', 'fee_avg', 'fee_min', 'fee_max')) fs_lsqueeze2 = ('interval',) all_stats = ['col_avg', 'range', 'avg', 'mini_avg', 'total', 'diff'] dfl_stats = ['range', 'mini_avg', 'diff'] noindent_stats = ['col_avg'] avg_stats_skip = {'block', 'hash', 'date', 'version', 'miner'} range_data = namedtuple('parsed_range_data', ['first', 'last', 'from_tip', 'nblocks', 'step']) t_fmt = lambda self, t: f'{t/86400:.2f} days' if t > 172800 else f'{t/3600:.2f} hrs' @classmethod def parse_cslist(cls, uarg, full_set, dfl_set, desc): def make_list(m, func): groups_lc = [set(e.lower() for e in gi.split(',')) for gi in m.groups()] for group in groups_lc: for e in group: if e not in full_set_lc: die(1, f'{e!r}: unrecognized {desc}') # display elements in order: return [e for e in full_set if e.lower() in func(groups_lc)] full_set_lc = set(e.lower() for e in full_set) dfl_set_lc = set(e.lower() for e in dfl_set) cspat = r'(\w+(?:,\w+)*)' for pat, func in ( (rf'{cspat}$', lambda g: g[0]), (rf'\+{cspat}$', lambda g: dfl_set_lc | g[0]), (rf'\-{cspat}$', lambda g: dfl_set_lc - g[0]), (rf'\+{cspat}\-{cspat}$', lambda g: (dfl_set_lc | g[0]) - g[1]), (rf'\-{cspat}\+{cspat}$', lambda g: (dfl_set_lc - g[0]) | g[1]), (rf'all\-{cspat}$', lambda g: full_set_lc - g[0])): m = re.match(pat, uarg, re.ASCII|re.IGNORECASE) if m: return make_list(m, func) else: die(1, f'{uarg}: invalid parameter') def __init__(self, cfg, cmd_args, rpc): def parse_cs_uarg(uarg, full_set, dfl_set, desc): return ( full_set if uarg == 'all' else [] if uarg == 'none' else self.parse_cslist(uarg, full_set, dfl_set, desc)) def get_fields(): return parse_cs_uarg(self.cfg.fields, list(self.fields), self.dfl_fields, 'field') def get_stats(): return parse_cs_uarg(self.cfg.stats.lower(), self.all_stats, self.dfl_stats, 'stat') def parse_cmd_args(): # => (block_list, first, last, step) match cmd_args: case [] | None: return (None, self.tip, self.tip, None) case [arg]: r = self.parse_rangespec(arg) return ( list(range(r.first, r.last+1, r.step)) if r.step else None, r.first, r.last, r.step) case [*args]: return ([self.conv_blkspec(a) for a in args], None, None, None) self.cfg = cfg self.rpc = rpc self.tip = rpc.blockcount from_satoshi = self.rpc.proto.coin_amt.satoshi to_satoshi = 1 / from_satoshi self.block_list, self.first, self.last, self.step = parse_cmd_args() have_segwit = self.rpc.info('segwit_is_active') if not have_segwit: del self.fields['weight'] self.dfl_fields = tuple(f for f in self.dfl_fields if f != 'weight') self.stats_deps = { 'avg': set(self.fields) - self.avg_stats_skip, 'col_avg': set(self.fields) - self.avg_stats_skip, 'mini_avg': {'interval', 'size'} | ({'weight'} if have_segwit else set()), 'total': {'interval', 'subsidy', 'totalfee', 'nTx', 'inputs', 'outputs', 'utxo_inc'}, 'range': {}, 'diff': {}} self.fmt_funcs = { 'da': lambda arg: strftime('%Y-%m-%d %X', gmtime(arg)), 'td': lambda arg: ( '-{:02}:{:02}'.format(abs(arg)//60, abs(arg)%60) if arg < 0 else ' {:02}:{:02}'.format(arg//60, arg%60)), 'tf': lambda arg: '{:.8f}'.format(arg * from_satoshi), 'su': lambda arg: str(arg * from_satoshi).rstrip('0').rstrip('.'), 'fe': lambda arg: str(arg), 'di': lambda arg: '{:.2e}'.format(Decimal(arg))} if self.cfg.coin == 'BCH': self.fmt_funcs.update({ 'su': lambda arg: str(arg).rstrip('0').rstrip('.'), 'fe': lambda arg: str(int(Decimal(arg) * to_satoshi)), 'tf': lambda arg: '{:.8f}'.format(Decimal(arg))}) self.fnames = tuple( [f for f in self.fields if self.fields[f].src == 'bh' or f == 'interval'] if self.cfg.header_info else get_fields() if self.cfg.fields else self.dfl_fields) if self.cfg.miner_info and 'miner' not in self.fnames: self.fnames += ('miner',) self.stats = get_stats() if self.cfg.stats else self.dfl_stats # Display diff stats by default only if user-requested range ends with chain tip if 'diff' in self.stats and not self.cfg.stats and self.last != self.tip: self.stats.remove('diff') if {'avg', 'col_avg'} <= set(self.stats) and self.cfg.stats_only: self.stats.remove('col_avg') if {'avg', 'mini_avg'} <= set(self.stats): self.stats.remove('mini_avg') if self.cfg.full_stats: add_fnames = {fname for sname in self.stats for fname in self.stats_deps[sname]} self.fnames = tuple(f for f in self.fields if f in {'block'} | set(self.fnames) | add_fnames) else: if 'col_avg' in self.stats and not self.fnames: self.stats.remove('col_avg') # self.fnames is now finalized self.fvals = [self.fields[name] for name in self.fnames] self.fs = ''.join(self.gen_fs(self.fnames)).strip() self.bs_keys = set( [v.key1 for v in self.fvals if v.src == 'bs'] + ['total_size'] + (['total_weight'] if have_segwit else [])) if 'miner' in self.fnames: # capturing parens must contain only ASCII chars! self.miner_pats = [re.compile(pat) for pat in ( rb'`/([_a-zA-Z0-9&. #/-]+)/', rb'[\xe3\xe4\xe5][\^/]([\x20-\x7e]+?)\xfa', rb'([a-zA-Z0-9&. -]+/Mined by [a-zA-Z0-9. ]+)', rb'\x08/(.*Mined by [a-zA-Z0-9. ]+)', rb'Mined by ([a-zA-Z0-9. ]+)', rb'[`]([_a-zA-Z0-9&. #/-]+)[/\xfa]', rb'([\x20-\x7e]{9,})', rb'[/^]([a-zA-Z0-9&. #/-]{5,})', rb'[/^]([_a-zA-Z0-9&. #/-]+)/', rb'^\x03...\W{0,5}([\\_a-zA-Z0-9&. #/-]+)[/\\]')] self.block_data = namedtuple('block_data', self.fnames) self.deps = {v.src for v in self.fvals} def gen_fs(self, fnames, fill=[], fill_char='-', add_name=False): for i in range(len(fnames)): name = fnames[i] ls = (' ', '')[name in self.fs_lsqueeze + self.fs_lsqueeze2] rs = (' ', '')[name in self.fs_rsqueeze] if i < len(fnames) - 1 and fnames[i+1] in self.fs_lsqueeze2: rs = '' if i: for group in self.fs_groups: if name in group and fnames[i-1] in group: ls = '' break repl = (name if add_name else '') + ':' + (fill_char if name in fill else '') yield (ls + self.fields[name].fs.replace(':', repl) + rs) def conv_blkspec(self, arg): match arg: case str() if arg.lower() == 'cur': return self.tip case x if is_int(x): match int(arg): case x if x < 0: die(1, f'{x}: block number must be non-negative') case x if x > self.tip: die(1, f'{x}: requested block height greater than current chain tip!') case x: return x case _: die(1, f'{arg}: invalid block specifier') def check_nblocks(self, arg): match arg: case x if x <= 0: die(1, 'nBlocks must be a positive integer') case x if x > self.tip: die(1, f'{arg}: nBlocks must be less than current chain height') case _: return arg def parse_rangespec(self, arg): p = RangeParser(self, arg) from_tip = p.parse('from_tip') first, last = (self.tip-from_tip, None) if from_tip else p.parse('abs_range') add1 = p.parse('add') add2 = p.parse('add') p.finalize() if add2 and last is not None: die(1, f'{arg!r}: invalid range specifier') nblocks, step = (add1, add2) if last is None else (None, add1) if p.debug: msg(repr(self.range_data(first, last, from_tip, nblocks, step))) if nblocks: if first is None: first = self.tip - nblocks + 1 last = first + nblocks - 1 first = self.conv_blkspec(first) last = self.conv_blkspec(last or first) if p.debug: msg(repr(self.range_data(first, last, from_tip, nblocks, step))) if first > last: die(1, f'{first}-{last}: invalid block range') return self.range_data(first, last, from_tip, nblocks, step) async def process_blocks(self): async def get_hdrs(heights): hashes = await c.gathered_call('getblockhash',[(height,) for height in heights]) return await c.gathered_call('getblockheader',[(H,) for H in hashes]) c = self.rpc heights = self.block_list or range(self.first, self.last+1) self.hdrs = await get_hdrs(heights) if self.block_list: self.prev_hdrs = await get_hdrs([(n-1 if n else 0) for n in self.block_list]) self.first_prev_hdr = self.prev_hdrs[0] else: self.first_prev_hdr = ( self.hdrs[0] if heights[0] == 0 else await c.call('getblockheader', await c.call('getblockhash', heights[0]-1))) self.t_cur = self.first_prev_hdr['time'] self.res = [] for n in range(len(heights)): if self.block_list: self.t_cur = self.prev_hdrs[n]['time'] ret = await self.process_block(self.hdrs[n]) self.res.append(ret) if self.fnames and not self.cfg.stats_only: self.output_block(ret, n) def output_block(self, data, n): def gen(): for k, v in data._asdict().items(): func = self.fields[k].fmt_func yield self.fmt_funcs[func](v) if func else v Msg(self.fs.format(*gen())) async def process_block(self, hdr): self.t_diff = hdr['time'] - self.t_cur self.t_cur = hdr['time'] self.total_solve_time += self.t_diff blk_data = { 'bh': hdr, 'lo': {'interval': self.t_diff}} if 'bs' in self.deps: bs = ( self.genesis_stats if hdr['height'] == 0 else await self.rpc.call('getblockstats', hdr['hash'], list(self.bs_keys))) self.total_bytes += bs['total_size'] if 'total_weight' in bs: self.total_weight += bs['total_weight'] blk_data['bs'] = bs if 'miner' in self.fnames: blk_data['lo']['miner'] = '-' if hdr['height'] == 0 else await self.get_miner_string(hdr['hash']) def gen(): for v in self.fvals: yield ( blk_data[v.src][v.key1] if v.key2 is None else blk_data[v.src][v.key1][v.key2]) return self.block_data(*gen()) async def get_miner_string(self, H): tx0 = (await self.rpc.call('getblock', H))['tx'][0] bd = await self.rpc.call('getrawtransaction', tx0, 1) if type(bd) == tuple: return '---' else: cb = bytes.fromhex(bd['vin'][0]['coinbase']) if self.cfg.raw_miner_info: return repr(cb) else: trmap_in = { '\\': ' ', '/': ' ', ',': ' '} trmap = {ord(a): b for a, b in trmap_in.items()} for pat in self.miner_pats: m = pat.search(cb) if m: return re.sub(r'\s+', ' ', m[1].decode().strip('^').translate(trmap).strip()) return '' def print_header(self): Msg('\n'.join(self.gen_header())) self.header_printed = True def gen_header(self): hdr1 = [v.hdr1 for v in self.fvals] hdr2 = [v.hdr2 for v in self.fvals] if ''.join(hdr1): yield self.fs.format(*hdr1) yield self.fs.format(*hdr2) def process_stats(self, sname): method = getattr(self, f'create_{sname}_stats', None) return self.output_stats(method() if method else self.create_stats(sname), sname) def fmt_stat_item(self, fs, s): return fs.format(s) if type(fs) == str else fs(s) async def output_stats(self, res, sname): def gen(data): for d in data: match d: case [a, b]: yield (indent + a).format(**{k: self.fmt_stat_item(*v) for k, v in b.items()}) case [a, _, b, c]: yield (indent + a).format(self.fmt_stat_item(b, c)) case str(): yield d case _: assert False, f'{d}: invalid stats data' foo, data = await res indent = '' if sname in self.noindent_stats else ' ' Msg('\n'.join(gen(data))) async def create_range_stats(self): # These figures don’t include the Genesis Block: elapsed = self.hdrs[-1]['time'] - self.first_prev_hdr['time'] nblocks = self.hdrs[-1]['height'] - self.first_prev_hdr['height'] total_blks = len(self.hdrs) step_disp = f', nBlocks={total_blks}, step={self.step}' if self.step else '' def gen(): yield 'Range Statistics:' yield ( 'Range: {start}-{end} ({range} blocks [{elapsed}]%s)' % step_disp, { 'start': ('{}', self.hdrs[0]['height']), 'end': ('{}', self.hdrs[-1]['height']), 'range': ('{}', self.hdrs[-1]['height'] - self.hdrs[0]['height'] + 1), 'elapsed': (self.t_fmt, elapsed), 'nBlocks': ('{}', total_blks), 'step': ('{}', self.step)}) if elapsed: yield ('Start: {}', 'start_date', self.fmt_funcs['da'], self.hdrs[0]['time']) yield ('End: {}', 'end_date', self.fmt_funcs['da'], self.hdrs[-1]['time']) yield ('Avg BDI: {} min', 'avg_bdi', '{:.2f}', elapsed / nblocks / 60) return ('range', gen()) async def create_diff_stats(self): c = self.rpc rel = self.tip % self.rpc.proto.diff_adjust_interval tip_hdr = ( self.hdrs[-1] if self.hdrs[-1]['height'] == self.tip else await c.call('getblockheader', await c.call('getblockhash', self.tip))) min_sample_blks = 432 # ≈3 days rel_hdr = await c.call('getblockheader', await c.call('getblockhash', self.tip-rel)) if rel >= min_sample_blks: sample_blks = rel bdi = (tip_hdr['time'] - rel_hdr['time']) / rel else: sample_blks = min(min_sample_blks, self.tip) start_hdr = await c.call('getblockheader', await c.call('getblockhash', self.tip-sample_blks)) diff_adj = Decimal(tip_hdr['difficulty']) / Decimal(start_hdr['difficulty']) time1 = rel_hdr['time'] - start_hdr['time'] time2 = tip_hdr['time'] - rel_hdr['time'] bdi = ((time1 * diff_adj) + time2) / sample_blks rem = self.rpc.proto.diff_adjust_interval - rel return ('difficulty', ( 'Difficulty Statistics:', ('Current height: {}', 'chain_tip', '{}', self.tip), ('Next diff adjust: {next_diff_adjust} (in {blks_remaining} block%s [{time_remaining}])' % suf(rem), { 'next_diff_adjust': ('{}', self.tip + rem), 'blks_remaining': ('{}', rem), 'time_remaining': (self.t_fmt, rem * bdi) } ), ('Avg BDI: {avg_bdi} min (over {sample_blks}-block period)', { 'avg_bdi': ('{:.2f}', bdi/60), 'sample_blks': ('{}', sample_blks) } ), ('Cur difficulty: {}', 'cur_diff', '{:.2e}', Decimal(tip_hdr['difficulty'])), ('Est. diff adjust: {}%', 'est_diff_adjust_pct', '{:+.2f}', ((600 / bdi) - 1) * 100), )) def sum_field_avg(self, field): return self.sum_field_total(field) // len(self.res) def sum_field_total(self, field): if isinstance(getattr(self.res[0], field), str): return sum(Decimal(getattr(block, field)) for block in self.res) else: return sum(getattr(block, field) for block in self.res) async def create_col_avg_stats(self): def gen(): for field in self.fnames: if field in self.avg_stats_skip: yield (field, ('{}', '')) else: ret = self.sum_field_avg(field) func = self.fields[field].fmt_func yield (field, ((self.fmt_funcs[func] if func else '{}'), ret)) if not self.header_printed: self.print_header() fs = ''.join(self.gen_fs(self.fnames, fill=self.avg_stats_skip, add_name=True)).strip() return ('column_averages', ('Column averages:', (fs, dict(gen())))) def avg_stats_data(self, data, spec_conv, spec_val): coin = self.rpc.proto.coin return data( hdr = 'Averages for processed blocks:', func = self.sum_field_avg, spec_sufs = {'subsidy': f' {coin}', 'totalfee': f' {coin}'}, spec_convs = { 'interval': spec_conv(0, lambda arg: secs_to_ms(arg)), 'utxo_inc': spec_conv(-1, '{:<+}'), 'mb_per_hour': spec_conv(0, '{}')}, spec_vals = ( spec_val( 'mb_per_hour', 'MB/hr', 'interval', lambda values: 'bs' in self.deps, lambda values: ( '{:.4f}'.format((self.total_bytes / 10000) / (self.total_solve_time / 36)) if self.total_solve_time else 'N/A')), )) mini_avg_stats_data = avg_stats_data def total_stats_data(self, data, spec_conv, spec_val): coin = self.rpc.proto.coin return data( hdr = 'Totals for processed blocks:', func = self.sum_field_total, spec_sufs = {'subsidy': f' {coin}', 'totalfee': f' {coin}', 'reward': f' {coin}'}, spec_convs = { 'interval': spec_conv(0, lambda arg: secs_to_dhms(arg)), 'utxo_inc': spec_conv(-1, '{:<+}'), 'reward': spec_conv(0, self.fmt_funcs['tf'])}, spec_vals = ( spec_val( 'reward', 'Reward', 'totalfee', lambda values: {'subsidy', 'totalfee'} <= set(values), lambda values: values['subsidy'] + values['totalfee']), )) async def create_stats(self, sname): def convert_stats_hdr(field): v = self.fields[field] return '{} {}'.format( v.hdr1.strip(), v.hdr2.strip()).replace('- ', '') if v.hdr1 else v.hdr2.strip() d = getattr(self, f'{sname}_stats_data')( namedtuple('stats_data', ['hdr', 'func', 'spec_sufs', 'spec_convs', 'spec_vals']), namedtuple('spec_conv', ['width_adj', 'conv']), namedtuple('spec_val', ['name', 'lbl', 'insert_after', 'condition', 'code'])) fnames = [n for n in self.fnames if n in self.stats_deps[sname]] lbls = {n: convert_stats_hdr(n) for n in fnames} values = {n: d.func(n) for n in fnames} col1_w = max((len(l) for l in lbls.values()), default=0) + 2 print(d.spec_vals) for v in d.spec_vals: print(v) if v.condition(values): try: idx = fnames.index(v.insert_after) + 1 except: idx = 0 fnames.insert(idx, v.name) lbls[v.name] = v.lbl values[v.name] = v.code(values) def gen(): for n, fname in enumerate(fnames): spec_conv = d.spec_convs.get(fname) yield ( '{lbl:{wid}} {{}}{suf}'.format( lbl = lbls[fname] + ':', wid = col1_w + (spec_conv.width_adj if spec_conv else 0), suf = d.spec_sufs.get(fname) or ''), fname, spec_conv.conv if spec_conv else ( (lambda x: self.fmt_funcs[x] if x else '{}')(self.fields[fname].fmt_func)), values[fname]) return (sname, (d.hdr,) + tuple(gen())) def process_stats_pre(self, i): if (self.fnames and not self.cfg.stats_only) or i != 0: Msg('') def finalize_output(self): pass # 'getblockstats' RPC raises exception on Genesis Block, so provide our own stats: genesis_stats = { 'avgfee': 0, 'avgfeerate': 0, 'avgtxsize': 0, 'feerate_percentiles': [ 0, 0, 0, 0, 0 ], 'height': 0, 'ins': 0, 'maxfee': 0, 'maxfeerate': 0, 'maxtxsize': 0, 'medianfee': 0, 'mediantxsize': 0, 'minfee': 0, 'minfeerate': 0, 'mintxsize': 0, 'outs': 1, 'subsidy': 5000000000, 'swtotal_size': 0, 'swtotal_weight': 0, 'swtxs': 0, 'total_out': 0, 'total_size': 0, 'total_weight': 0, 'totalfee': 0, 'txs': 1, 'utxo_increase': 1, 'utxo_size_inc': 117} class JSONBlocksInfo(BlocksInfo): def __init__(self, cfg, cmd_args, rpc): super().__init__(cfg, cmd_args, rpc) if self.cfg.json_raw: self.output_block = self.output_block_raw self.fmt_stat_item = self.fmt_stat_item_raw Msg_r('{') async def process_blocks(self): Msg_r('"block_data": [') await super().process_blocks() Msg_r(']') def output_block_raw(self, data, n): Msg_r((', ', '')[n==0] + json.dumps(data._asdict(), cls=json_encoder)) def output_block(self, data, n): def gen(): for k, v in data._asdict().items(): func = self.fields[k].fmt_func yield (k, (self.fmt_funcs[func](v) if func else v)) Msg_r((', ', '')[n==0] + json.dumps(dict(gen()), cls=json_encoder)) def print_header(self): pass def fmt_stat_item_raw(self, fs, s): return s async def output_stats(self, res, sname): def gen(data): for d in data: match d: case [_, a]: for k, v in a.items(): yield (k, self.fmt_stat_item(*v)) case [_, a, b, c]: yield (a, self.fmt_stat_item(b, c)) case str(): pass case _: assert False, f'{d}: invalid stats data' varname, data = await res Msg_r(', "{}_data": {}'.format(varname, json.dumps(dict(gen(data)), cls=json_encoder))) def process_stats_pre(self, i): pass def finalize_output(self): Msg('}')