various fixes and cleanups

This commit is contained in:
The MMGen Project 2022-10-27 16:47:22 +00:00
commit 6ad22bdf5c
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
6 changed files with 36 additions and 98 deletions

View file

@ -37,7 +37,7 @@ class Display(PollDisplay):
if not term:
term = get_term()
term.init(noecho=True)
term_width = g.columns or get_terminal_size()[0]
term_width = g.columns or get_terminal_size().width
msg_r(CUR_HOME+ERASE_ALL+CUR_HOME)
async def get_info(self,rpc):

View file

@ -1,49 +0,0 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2016 Philemon <mmgen-py@yandex.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
mmgen_node_tools.Term: terminal routines for MMGen node tools
"""
import sys,os,termios
def get_keypress(prompt="",esc_sequences=False):
import time,tty,select
sys.stderr.write(prompt)
fd = sys.stdin.fileno()
# old = termios.tcgetattr(fd) # see below
tty.setcbreak(fd) # must do this, even if it was set at program launch
def osread_chk(n):
while True:
try:
return os.read(fd,n)
except:
time.sleep(0.1)
# Must use os.read() for unbuffered read, otherwise select() will never return true
s = osread_chk(1)
if esc_sequences:
if s == '\x1b':
if select.select([sys.stdin],[],[],0)[0]:
s += osread_chk(2)
# Leave the term in cbreak mode, restore at exit
# termios.tcsetattr(fd, termios.TCSADRAIN, old)
return s

View file

@ -61,7 +61,6 @@ opts.init({
-P, --pager Pipe the output to a pager
-r, --ranges Display fee brackets as ranges
-s, --show-mb-col Display column with each fee brackets megabyte count
-w, --width=W Force output width of 'W' columns (default: term width)
""",
'notes': """
+ By default, fee bracket row labels include only the top of the range.
@ -83,16 +82,12 @@ if opt.ignore_below:
die(1,'Conflicting options: --ignore-below, --show-empty')
ignore_below = parse_bytespec(opt.ignore_below)
if opt.precision:
precision = check_int_between(opt.precision,min_prec,max_prec,'--precision arg')
else:
precision = dfl_prec
precision = (
check_int_between(opt.precision,min_prec,max_prec,'--precision arg')
if opt.precision else dfl_prec )
if opt.width:
width = check_int_between(opt.width,40,1024,'--width arg')
else:
from mmgen.term import get_terminal_size
width = get_terminal_size()[0]
from mmgen.term import get_terminal_size
width = g.columns or get_terminal_size().width
class fee_bracket:
def __init__(self,top,bottom):
@ -102,12 +97,6 @@ class fee_bracket:
self.tx_bytes_cum = 0
self.skip = False
def get_fake_data(fn): # for debugging
import json
from mmgen.rpc import json_encoder
from decimal import Decimal
return json.loads(open(os.path.join(fn)).read(),parse_float=Decimal)
def log(data,fn):
import json
from mmgen.rpc import json_encoder
@ -129,9 +118,6 @@ def create_data(coin_amt,mempool):
while out and out[-1].tx_bytes == 0:
out.pop()
if not out:
die(1,'No data!')
out.reverse() # cumulative totals and display are top-down
# calculate cumulative byte totals, filter rows:
@ -152,6 +138,7 @@ def gen_header(host,blockcount):
make_timestr(),
blockcount,
))
if opt.show_empty:
yield('Displaying all fee brackets')
elif opt.ignore_below:
@ -159,29 +146,28 @@ def gen_header(host,blockcount):
ignore_below,
int2bytespec(ignore_below,'MB','0.6'),
))
if opt.include_current:
yield('Including transactions in current fee bracket in Total MB amounts')
def fmt_mb(n):
return int2bytespec(n,'MB',f'0.{precision}',False)
return int2bytespec(n,'MB',f'0.{precision}',print_sym=False)
def gen_body(data):
tx_bytes_max = max(i.tx_bytes for i in data)
top_max = max(i.top for i in data if not i.skip)
bot_max = max(i.bottom for i in data if not i.skip)
tx_bytes_max = max((i.tx_bytes for i in data),default=0)
top_max = max((i.top for i in data),default=0)
bot_max = max((i.bottom for i in data),default=0)
col1_w = max(len(f'{bot_max}-{top_max}') if opt.ranges else len(f'{top_max}'),6)
col2_w = len(fmt_mb(tx_bytes_max)) if opt.show_mb_col else 0
col3_w = len(fmt_mb(data[-1].tx_bytes_cum))
col3_w = len(fmt_mb(data[-1].tx_bytes_cum)) if data else 0
col4_w = width - col1_w - col2_w - col3_w - (4 if col2_w else 3)
if opt.show_mb_col:
fs = '{a:<%i} {b:>%i} {c:>%i} {d}' % (col1_w,col2_w,col3_w)
else:
fs = '{a:<%i} {c:>%i} {d}' % (col1_w,col3_w)
yield(
'\n' + fs.format(a='', b='', c=f'{"Total":<{col3_w}}', d='') +
'\n' + fs.format(a='sat/B', b=f'{"MB":<{col2_w}}', c=f'{"MB":<{col3_w}}', d='')
)
yield fs.format(a='', b='', c=f'{"Total":<{col3_w}}', d='')
yield fs.format(a='sat/B', b=f'{"MB":<{col2_w}}', c=f'{"MB":<{col3_w}}', d='')
for i in data:
if not i.skip:
@ -195,7 +181,7 @@ def gen_body(data):
yield(fs.format(
a = 'TOTAL',
b = '',
c = fmt_mb(data[-1].tx_bytes_cum + data[-1].tx_bytes),
c = fmt_mb(data[-1].tx_bytes_cum + data[-1].tx_bytes if data else 0),
d = '' ))
async def main():
@ -206,9 +192,7 @@ async def main():
from mmgen.rpc import rpc_init
c = await rpc_init(proto)
# pmsg(await c.call('getmempoolinfo'))
mempool = await c.call('getrawmempool',True)
# mempool = get_fake_data('test_data/mempool-sample.json')
if opt.log:
log(mempool,'mempool.json')

View file

@ -297,25 +297,22 @@ class fake_data:
"""
}
def make_address_data():
for line in fake_data.addresses.strip().split('\n'):
data = line.split(maxsplit=2)
yield (data[0], {k:v for k,v in zip(('id','addr','subver'),data)})
def make_iterations_data():
for line in fake_data.iterations.strip().split('\n'):
data = line.split(maxsplit=1)
yield (data[0], data[1].split())
def make_blocks_data(iterations):
for peer,blocks_str in fake_data.blocks.items():
iter_strs = dict([s.lstrip().split(maxsplit=1) for s in blocks_str.strip().split('\n') if ' ' in s])
yield (peer,dict((i,iter_strs.get(i,'').split()) for i in iterations))
def make_data():
address_data = dict(fake_data.make_address_data())
iterations_data = dict(fake_data.make_iterations_data())
blocks_data = dict(fake_data.make_blocks_data(iterations_data))
def gen_address_data():
for line in fake_data.addresses.strip().split('\n'):
data = line.split(maxsplit=2)
yield (data[0], {k:v for k,v in zip(('id','addr','subver'),data)})
def gen_iterations_data():
for line in fake_data.iterations.strip().split('\n'):
data = line.split(maxsplit=1)
yield (data[0], data[1].split())
def gen_blocks_data(iterations):
for peer,blocks_str in fake_data.blocks.items():
iter_strs = dict([s.lstrip().split(maxsplit=1) for s in blocks_str.strip().split('\n') if ' ' in s])
yield (peer,dict((i,iter_strs.get(i,'').split()) for i in iterations))
def make_peerinfo(peer_id,blocks,iter_no):
d = address_data[peer_id]
@ -334,6 +331,10 @@ class fake_data:
if peer_id in iterations_data[iter_no] ]
)
address_data = dict(gen_address_data())
iterations_data = dict(gen_iterations_data())
blocks_data = dict(gen_blocks_data(iterations_data))
fake_data.peerinfo = dict(gen_data())
async def get_info(self,rpc):

View file

@ -24,7 +24,7 @@ groups_desc="
default - All tests minus the extra tests
extra - All tests minus the default tests
noalt - BTC-only tests
quick - Default tests minus btc_tn, bch, bch_rt, ltc and ltc_rt
quick - Default tests minus bch_rt and ltc_rt
qskip - The tests skipped in the 'quick' test group
"

View file

@ -17,6 +17,7 @@ from mmgen.globalvars import g
from mmgen.opts import opt
from mmgen.util import die,gmsg
from mmgen.protocol import init_proto
from mmgen.proto.btc.regtest import MMGenRegtest
from ..include.common import *
from .common import *
@ -93,6 +94,7 @@ class TestSuiteRegtest(TestSuiteBase):
die(2,'--testnet and --regtest options incompatible with regtest test suite')
self.proto = init_proto(self.proto.coin,network='regtest',need_amt=True)
self.addrs = gen_addrs(self.proto,'regtest',[1,2,3,4,5])
self.regtest = MMGenRegtest(self.proto.coin)
def setup(self):
stop_test_daemons(self.proto.network_id,force=True,remove_datadir=True)