mmgen-node-tools/mmgen_node_tools/PeerBlocks.py

155 lines
4.4 KiB
Python
Executable file

#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
mmgen_node_tools.PeerBlocks: List blocks in flight, disconnect stalling nodes
"""
import asyncio
from collections import namedtuple
from mmgen.util import msg, msg_r, is_int
from mmgen.term import get_term, get_terminal_size, get_char
from mmgen.ui import line_input
from .PollDisplay import PollDisplay
RED, RESET = ('\033[31m', '\033[0m')
COLORS = ['\033[38;5;%s;1m' % c for c in list(range(247, 256)) + [231]]
ERASE_ALL, CUR_HOME = ('\033[J', '\033[H')
CUR_HIDE, CUR_SHOW = ('\033[?25l', '\033[?25h')
term = None
class Display(PollDisplay):
poll_secs = 2
def __init__(self, cfg):
super().__init__(cfg)
global term, term_width
if not term:
term = get_term()
term.init(noecho=True)
term_width = self.cfg.columns or get_terminal_size().width
msg_r(CUR_HOME+ERASE_ALL+CUR_HOME)
async def get_info(self, rpc):
return await rpc.call('getpeerinfo')
def display(self, count):
msg_r(
CUR_HOME
+ (ERASE_ALL if count == 1 else '')
+ 'CONNECTED PEERS ({a}) {b} - poll {c}'.format(
a = len(self.info),
b = self.desc,
c = count).ljust(term_width)[:term_width]
+ '\n'
+ ('\n'.join(self.gen_display()) + '\n' if self.info else '')
+ ERASE_ALL
+ f"Type a peer number to disconnect, 'q' to quit, or any other key for {self.other_desc} display:"
+ '\b')
async def disconnect_node(self, rpc, addr):
return await rpc.call('disconnectnode', addr)
def get_input(self):
s = get_char(immed_chars='q0123456789', prehold_protect=False, num_bytes=1)
if not is_int(s):
return s
with self.info_lock:
msg('')
term.reset()
# readline required for correct operation here; without it, user must re-type first digit
ret = line_input(self.cfg, 'peer number> ', insert_txt=s, hold_protect=False)
term.init(noecho=True)
self.enable_display = False # prevent display from updating before process_input()
return ret
async def process_input(self, rpc):
ids = tuple(str(i['id']) for i in self.info)
ret = False
msg_r(CUR_HIDE)
if self.input in ids:
from mmgen.exception import RPCFailure
addr = self.info[ids.index(self.input)]['addr']
try:
await self.disconnect_node(rpc, addr)
except RPCFailure:
msg_r(f'Unable to disconnect peer {self.input} ({addr})')
else:
msg_r(f'Disconnecting peer {self.input} ({addr})')
await asyncio.sleep(1)
elif self.input and is_int(self.input[0]):
msg_r(f'{self.input}: invalid peer number ')
await asyncio.sleep(0.5)
else:
ret = True
msg_r(CUR_SHOW)
return ret
class BlocksDisplay(Display):
desc = 'Blocks in Flight'
other_desc = 'address'
def gen_display(self):
pd = namedtuple('peer_data', ['id', 'blks_data', 'blks_width'])
bd = namedtuple('block_datum', ['num', 'disp'])
def gen_block_data():
global min_height
min_height = None
for d in self.info:
if d.get('inflight'):
blocks = d['inflight']
min_height = min(blocks) if not min_height else min(min_height, min(blocks))
line = ' '.join(map(str, blocks))[:blks_field_width]
blocks_disp = line.split()
yield pd(
d['id'],
[bd(blocks[i], blocks_disp[i]) for i in range(len(blocks_disp))],
len(line))
else:
yield pd(d['id'], [], 0)
def gen_line(peer_data):
for blk in peer_data.blks_data:
yield (RED if blk.num == min_height else COLORS[blk.num % 10]) + blk.disp + RESET
id_width = max(2, max(len(str(i['id'])) for i in self.info))
blks_field_width = term_width - 2 - id_width
fs = '{:>%s}: {}' % id_width
# we must iterate through all data to get 'min_height' before calling gen_line():
for peer_data in tuple(gen_block_data()):
yield fs.format(
peer_data.id,
' '.join(gen_line(peer_data)) + ' ' * (blks_field_width - peer_data.blks_width))
class PeersDisplay(Display):
desc = 'Address Menu'
other_desc = 'blocks'
def gen_display(self):
id_width = max(2, max(len(str(i['id'])) for i in self.info))
addr_width = max(len(str(i['addr'])) for i in self.info)
for d in self.info:
yield '{a:>{A}}: {b:{B}} {c}'.format(
a = d['id'],
A = id_width,
b = d['addr'],
B = addr_width,
c = d['subver']).ljust(term_width)[:term_width]