#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2021 The MMGen Project
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.

"""
mmgen-blocks-info: Display information about a block or range of blocks
"""

import time,re
from collections import namedtuple

from mmgen.common import *

# Imported after the star import so these names cannot be shadowed by it.
from decimal import Decimal
import inspect

# The option lines and notes below start at column 0 on purpose: they are
# runtime text handed to opts.init(), not code, so they are kept unindented.
opts_data = {
    'sets': [
        ('raw_miner_info', True, 'miner_info', True),
        ('summary', True, 'raw_miner_info', False),
        ('summary', True, 'miner_info', False),
    ],
    'text': {
        'desc': 'Display information about a range of blocks',
        'usage': '[opts] +<last n blocks>|<block num>|<block num range>',
        'options': """
-h, --help            Print this help message
--, --longhelp        Print help message for long options (common options)
-H, --hashes          Display only block numbers and hashes
-m, --miner-info      Display miner info in coinbase transaction
-M, --raw-miner-info  Display miner info in uninterpreted form
-n, --no-header       Don't print the column header
-o, --fields=         Display the specified fields
-s, --summary         Print the summary only
-S, --no-summary      Don't print the summary
""",
        'notes': """
If no block number is specified, the current block is assumed.

In addition to information about the requested range of blocks, an estimate
of the next difficulty adjustment is also displayed based on the average
Block Discovery Interval from the beginning of the current 2016-block period
to the chain tip.

Requires --txindex for correct operation.
""",
    },
}


class local_vars:
    """Bare attribute container for the values gathered while processing one block."""


class BlocksInfo:
    """
    Base class: resolve the requested block range and iterate over it.

    run() calls self.process_block() once per block and print_header() reads
    self.fvals / self.fs; none of these are defined here, so a subclass must
    provide them (or override run() / print_header()).

    Uses two module globals: `cmd_args` (the positional command-line
    arguments) and `c` (the RPC client object assigned in main()).
    """

    first = None    # height of the first block in the range
    last = None     # height of the last block in the range (inclusive)
    nblocks = None  # number of blocks in the range

    def __init__(self):
        self.get_block_range()
        self.post_init()

    def get_block_range(self):
        """
        Set self.first, self.last and self.nblocks from the command line.

        Accepted argument forms:
          (none)  the block at the current chain height
          +N      the last N blocks
          N       the single block at height N
          N-M     the inclusive range N..M

        Anything else invokes opts.usage().  A range that starts below zero,
        is reversed, or extends past the current chain height is fatal.
        """
        tip = c.blockcount

        if not cmd_args:
            first = last = tip
        else:
            arg = cmd_args[0]
            if arg.startswith('+') and is_int(arg[1:]):
                last = tip
                first = last - int(arg[1:]) + 1
            elif is_int(arg):
                first = last = int(arg)
            else:
                try:
                    first, last = [int(ep) for ep in arg.split('-')]
                except ValueError:
                    # Raised both for non-numeric endpoints and for a wrong
                    # number of endpoints.  NOTE(review): opts.usage() is
                    # assumed not to return - confirm.
                    opts.usage()

        # A negative start can arise from '+N' with N greater than the
        # chain height; reject it here rather than sending it to the RPC.
        if first < 0 or first > last:
            die(2, f'{first}-{last}: invalid block range')

        if last > tip:
            die(2, f'Requested block number ({last}) greater than current block height')

        self.first = first
        self.last = last
        self.nblocks = last - first + 1

    def post_init(self):
        """Hook called at the end of __init__(); does nothing by default."""
        pass

    async def run(self):
        """Fetch hash and header for every block in the range, then process each in order."""
        heights = range(self.first, self.last + 1)
        hashes = await c.gathered_call('getblockhash', [(height,) for height in heights])
        hdrs = await c.gathered_call('getblockheader', [(H,) for H in hashes])
        for height, H, hdr in zip(heights, hashes, hdrs):
            await self.process_block(height, H, hdr)

    def print_header(self):
        """Print the two-line column header built from the selected fields."""
        hdr1 = [v.hdr1 for v in self.fvals]
        hdr2 = [v.hdr2 for v in self.fvals]
        if opt.miner_info:
            hdr1.append(' ')
            hdr2.append('Miner')
        Msg(self.fs.format(*hdr1))
        Msg(self.fs.format(*hdr2))

    async def print_summary(self):
        """
        Print chain height, the next difficulty-adjustment height and the
        time spanned by the requested range.

        When the chain height is not a multiple of 2016, the average block
        interval since the last multiple of 2016 is also printed, together
        with the adjustment estimate derived from it (600 s / interval - 1).
        """
        from mmgen.util import secs_to_hms

        tip = c.blockcount
        rel = tip % 2016  # blocks elapsed since the last multiple of 2016

        if rel:
            H1, H2, HA, HB = await c.gathered_call(
                'getblockhash',
                [[self.first], [self.last], [tip - rel], [tip]])
            h1, h2, hA, hB = await c.gathered_call(
                'getblockheader',
                [[H1], [H2], [HA], [HB]])
            bdi = (hB['time'] - hA['time']) / rel
            adj_pct = ((600 / bdi) - 1) * 100
            Msg_r(fmt(f"""
                Current height: {tip}
                Next diff adjust: {tip-rel+2016} (in {2016-rel} blocks [{((2016-rel)*bdi)/86400:.2f} days])
                BDI (cur period): {bdi/60:.2f} min
                Est. diff adjust: {adj_pct:+.2f}%
                """))
        else:
            # No blocks in the current period yet, so there is no interval
            # to base an estimate on.
            H1, H2 = await c.gathered_call('getblockhash', [[self.first], [self.last]])
            h1, h2 = await c.gathered_call('getblockheader', [[H1], [H2]])
            Msg_r(fmt(f"""
                Current height: {tip}
                Next diff adjust: {tip-rel+2016} (in {2016-rel} blocks)
                """))

        Msg('\nRange: {}-{} ({} blocks [{}])'.format(
            self.first,
            self.last,
            self.nblocks,
            secs_to_hms(h2['time'] - h1['time'])
        ))


class BlocksInfoOverview(BlocksInfo):
    """
    Print one line of selected fields per block, plus size/interval averages.

    The columns are chosen with --fields (default: dfl_fields).  Each entry
    of `fields` describes one column; see the comments above the table.
    """

    total_bytes = 0   # running sum of 'total_size' over processed blocks
    total_weight = 0  # running sum of 'total_weight' over processed blocks
    t_start = None    # timestamp of the block preceding the first one processed
    t_prev = None     # timestamp of the previously processed block
    t_cur = None      # timestamp of the block being processed

    bf = namedtuple('block_info_fields', ['hdr1', 'hdr2', 'fs', 'bs_key', 'varname', 'deps', 'key'])

    # bs=getblockstats(), bh=getblockheader()
    # If 'bs_key' is set, it's included in self.bs_keys instead of 'key'
    fields = {
        'block':    bf('',      'Block',     '{:<6}',  None,                  'height', [],     None),
        'hash':     bf('',      'Hash',      '{:<64}', None,                  'H',      [],     None),
        'date':     bf('',      'Date',      '{:<19}', None,                  'df',     ['bs'], None),
        'interval': bf('Solve', 'Time ',     '{:>6}',  None,                  'if',     ['bs'], None),
        'size':     bf('',      'Size',      '{:>7}',  None,                  'bs',     [],     'total_size'),
        'weight':   bf('',      'Weight',    '{:>7}',  None,                  'bs',     [],     'total_weight'),
        'utxo_inc': bf(' UTXO', ' Incr',     '{:>5}',  None,                  'bs',     [],     'utxo_increase'),
        'fee10':    bf('10%',   'Fee',       '{:>3}',  'feerate_percentiles', 'fp',     ['bs'], 0),
        'fee25':    bf('25%',   'Fee',       '{:>3}',  'feerate_percentiles', 'fp',     ['bs'], 1),
        'fee50':    bf('50%',   'Fee',       '{:>3}',  'feerate_percentiles', 'fp',     ['bs'], 2),
        'fee75':    bf('75%',   'Fee',       '{:>3}',  'feerate_percentiles', 'fp',     ['bs'], 3),
        'fee90':    bf('90%',   'Fee',       '{:>3}',  'feerate_percentiles', 'fp',     ['bs'], 4),
        'fee_avg':  bf('Avg',   'Fee',       '{:>3}',  None,                  'bs',     [],     'avgfeerate'),
        'fee_min':  bf('Min',   'Fee',       '{:>3}',  None,                  'bs',     [],     'minfeerate'),
        'totalfee': bf('',      'Total Fee', '{:>10}', 'totalfee',            'tf',     ['bs'], None),
        'outputs':  bf('Out-',  'puts',      '{:>5}',  None,                  'bs',     [],     'outs'),
        'inputs':   bf('In- ',  'puts',      '{:>5}',  None,                  'bs',     [],     'ins'),
        'version':  bf('',      'Version',   '{:8}',   None,                  'bh',     [],     'versionHex'),
        'nTx':      bf('',      'nTx ',      '{:>5}',  None,                  'bh',     [],     'nTx'),
        'subsidy':  bf('',      'Subsidy',   '{:7}',   'subsidy',             'su',     ['bs'], None),
    }

    dfl_fields = [
        'block', 'date', 'interval', 'subsidy', 'totalfee', 'size',
        'weight', 'fee50', 'fee25', 'fee10', 'version',
    ]

    # Producers for the derived per-block values, keyed by a field's
    # 'varname'.  Each is called as func(self, loc) and its result is stored
    # on `loc` under that name.
    funcs = {
        'df': lambda self, loc: time.strftime('%Y-%m-%d %X', time.gmtime(self.t_cur)),
        'if': lambda self, loc: (
            '-{:02}:{:02}'.format(abs(loc.t_diff)//60, abs(loc.t_diff)%60) if loc.t_diff < 0 else
            ' {:02}:{:02}'.format(loc.t_diff//60, loc.t_diff%60)
        ),
        'tf': lambda self, loc: '{:.8f}'.format(loc.bs["totalfee"] * Decimal('0.00000001')),
        'bh': lambda self, loc: loc.hdr,
        'fp': lambda self, loc: loc.bs['feerate_percentiles'],
        'su': lambda self, loc: str(loc.bs['subsidy'] * Decimal('0.00000001')).rstrip('0'),
    }

    def __init__(self):
        """
        Resolve the block range, then derive everything needed per block
        from the selected fields: the row format string, the dependency
        set, the getblockstats key list and the value-producing functions.

        An unknown name in --fields is fatal.
        """
        super().__init__()

        if opt.fields:
            fnames = opt.fields.split(',')
            for n in fnames:
                if n not in self.fields:
                    die(1, f'{n!r}: unrecognized field')
        else:
            fnames = self.dfl_fields

        self.fvals = [self.fields[k] for k in fnames if k in self.fields]
        self.fs = ' '.join(v.fs for v in self.fvals)

        # Every variable name a selected field reads, directly or via 'deps'.
        self.deps = {name for v in self.fvals for name in (v.varname, *v.deps)}

        bs_keys = [(v.bs_key or v.key) for v in self.fvals if v.bs_key or v.varname == 'bs']
        # process_block() reads these three from every stats result, whatever
        # fields were selected, so they must always be requested.
        bs_keys.extend(['total_size', 'total_weight', 'time'])
        self.bs_keys = list(dict.fromkeys(bs_keys))  # drop duplicates, keep order

        self.ufuncs = {v.varname: self.funcs[v.varname] for v in self.fvals if v.varname in self.funcs}

        if opt.miner_info:
            self.fs += ' {}'
            # Tried in order by get_miner_string(); the first match wins.
            self.miner_pats = [re.compile(pat) for pat in (
                rb'[\xe3\xe4\xe5][\^/](.*?)\xfa',
                rb'([a-zA-Z0-9&. -]+/Mined by [a-zA-Z0-9. ]+)',
                rb'\x08/(.*Mined by [a-zA-Z0-9. ]+)',
                rb'Mined by ([a-zA-Z0-9. ]+)',
                rb'[/^]([a-zA-Z0-9&. /-]{5,})',
                rb'[/^]([a-zA-Z0-9&. /-]+)/',
            )]
        else:
            self.miner_pats = None

    async def get_miner_string(self, H):
        """
        Return the miner text found in the coinbase input of block `H`.

        Returns '---' when the transaction lookup yields a tuple, repr() of
        the raw coinbase bytes under --raw-miner-info, the cleaned-up first
        pattern match otherwise, and '' when no pattern matches.
        """
        tx0 = (await c.call('getblock', H))['tx'][0]
        bd = await c.call('getrawtransaction', tx0, 1)

        # NOTE(review): a tuple result is treated as "no data" - presumably
        # the RPC layer's error form; confirm against mmgen.rpc.
        if isinstance(bd, tuple):
            return '---'

        cb = bytes.fromhex(bd['vin'][0]['coinbase'])
        if opt.raw_miner_info:
            return repr(cb)

        for pat in self.miner_pats:
            m = pat.search(cb)
            if m:
                # Keep printable ASCII only, then trim the delimiters.
                return ''.join(chr(b) for b in m[1] if 31 < b < 127).strip('^').strip('/').replace('/', ' ')

        return ''

    async def process_block(self, height, H, hdr):
        """
        Gather the selected values for one block and print its row.

        height -- block height
        H      -- block hash
        hdr    -- the block's getblockheader result

        Running totals and timestamps are updated even under --summary, in
        which case nothing is printed.
        """
        loc = local_vars()
        loc.height = height
        loc.H = H
        loc.hdr = hdr

        if 'bs' in self.deps:
            loc.bs = await c.call('getblockstats', H, self.bs_keys)
            self.total_bytes += loc.bs['total_size']
            self.total_weight += loc.bs['total_weight']
            self.t_cur = loc.bs['time']
            if self.t_prev is None:
                # First block processed: the interval needs the timestamp of
                # the block before it (block 0 has none, so use its own).
                if height == 0:
                    b_prev = loc.bs
                else:
                    bH = await c.call('getblockhash', height - 1)
                    b_prev = await c.call('getblockstats', bH, ['time'])
                self.t_start = self.t_prev = b_prev['time']
            loc.t_diff = self.t_cur - self.t_prev
            self.t_prev = self.t_cur

        if opt.summary:
            return

        for varname, func in self.ufuncs.items():
            ret = func(self, loc)
            if inspect.isawaitable(ret):
                ret = await ret
            setattr(loc, varname, ret)

        if opt.miner_info:
            miner_info = await self.get_miner_string(H)

        def gen():
            for v in self.fvals:
                if v.key is None:
                    yield getattr(loc, v.varname)
                else:
                    yield getattr(loc, v.varname)[v.key]
            if opt.miner_info:
                yield miner_info

        Msg(self.fs.format(*gen()))

    async def print_summary(self):
        """
        Print the base summary followed by average size, weight, throughput
        and block interval for ranges of more than one block.

        Nothing is printed unless block stats were fetched, since the totals
        and timestamps used here are only collected in that case.
        """
        if 'bs' not in self.deps:
            return

        await super().print_summary()

        if self.nblocks > 1:
            elapsed = self.t_cur - self.t_start
            ac = int(elapsed / self.nblocks)
            # bytes/1e6 per (seconds/3600), with the common factor 100 removed
            rate = (self.total_bytes / 10000) / (elapsed / 36)
            Msg_r(fmt(f"""
                Avg size: {self.total_bytes//self.nblocks} bytes
                Avg weight: {self.total_weight//self.nblocks} bytes
                MB/hr: {rate:0.4f}
                Avg BDI: {ac/60:.2f} min
                """))


class BlocksInfoHashes(BlocksInfo):
    """Print only the height and hash of each block in the range."""

    def print_header(self):
        Msg('{:<7} {}'.format('BLOCK', 'HASH'))

    async def run(self):
        """Fetch all block hashes in one gathered call and print them."""
        heights = range(self.first, self.last + 1)
        hashes = await c.gathered_call('getblockhash', [(height,) for height in heights])
        Msg('\n'.join('{:<7} {}'.format(height, H) for height, H in zip(heights, hashes)))


cmd_args = opts.init(opts_data)

if len(cmd_args) not in (0, 1):
    opts.usage()


async def main():
    """
    Set up the RPC client, pick the display class from the options, then
    print header, per-block output and summary as the options allow.
    """
    global c  # read by the BlocksInfo classes

    from mmgen.protocol import init_proto_from_opts
    proto = init_proto_from_opts()

    from mmgen.rpc import rpc_init
    c = await rpc_init(proto)

    if opt.hashes:
        m = BlocksInfoHashes()
    else:
        m = BlocksInfoOverview()

    if not (opt.summary or opt.no_header):
        m.print_header()

    await m.run()

    if not opt.no_summary:
        if not opt.summary:
            Msg('')
        await m.print_summary()


run_session(main())