whitespace: scripts, setup.py, other

This commit is contained in:
The MMGen Project 2024-10-18 10:32:11 +00:00
commit de6750a34b
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
8 changed files with 98 additions and 95 deletions

View file

@ -201,7 +201,7 @@ async def main():
if await tx4.send():
tx4.file.write(ask_write=False)
else:
die(2,'Transaction could not be signed')
die(2, 'Transaction could not be signed')
else:
tx.file.write(
outdir = asi.txauto_dir if cfg.autosign else None,

View file

@ -1,11 +1,11 @@
#!/usr/bin/env python3
import sys,os
import sys, os
import script_init
from mmgen.main import launch
from mmgen.cfg import Config
from mmgen.util import msg,Msg,make_chksum_6
from mmgen.util import msg, Msg, make_chksum_6
from mmgen.fileutil import get_lines_from_file
opts_data = {
@ -23,7 +23,7 @@ cfg = Config(opts_data=opts_data)
def main():
lines = get_lines_from_file(cfg, cfg._args[0])
start = (1,0)[bool(cfg.include_first_line)]
start = (1, 0)[bool(cfg.include_first_line)]
a = make_chksum_6(' '.join(lines[start:]).encode())
if start == 1:
b = lines[0]

View file

@ -34,18 +34,18 @@ def curve_clsname_abbr(s):
'Bip32Slip10Ed25519Blake2b': 'blk',
'Bip32Slip10Nist256p1': 'nist',
'Bip32KholawEd25519': 'khol',
}.get(s,s)
}.get(s, s)
fs2 = '{:5} {:2} {:16} {:8} {:8} {:6} {:8} {:8}'
hdr2 = fs2.format('CURVE','NW','ADDR_CLS','VB_PRV','VB_PUB','VB_WIF','VB_ADDR','DFL_PATH')
hdr2 = fs2.format('CURVE', 'NW', 'ADDR_CLS', 'VB_PRV', 'VB_PUB', 'VB_WIF', 'VB_ADDR', 'DFL_PATH')
dfl_vb_prv = '0488ade4'
dfl_vb_pub = '0488b21e'
dfl_curve = 'secp'
dfl_dfl_path = "0'/0/0"
def get_bip_utils_data(bipnum,n):
name,v = bip_utils_data[bipnum][n]
def get_bip_utils_data(bipnum, n):
name, v = bip_utils_data[bipnum][n]
#pexit(v.__dict__)
vb_prv = v.m_key_net_ver.m_priv_net_ver.hex()
vb_pub = v.m_key_net_ver.m_pub_net_ver.hex()
@ -56,44 +56,44 @@ def get_bip_utils_data(bipnum,n):
v.m_addr_cls.__name__.removesuffix('AddrEncoder'),
'x' if vb_prv == dfl_vb_prv else vb_prv,
'x' if vb_pub == dfl_vb_pub else vb_pub,
v.m_wif_net_ver.hex() if isinstance(v.m_wif_net_ver,bytes) else '-',
v.m_wif_net_ver.hex() if isinstance(v.m_wif_net_ver, bytes) else '-',
ap['net_ver'].hex() if 'net_ver' in ap else 'h:'+ap['hrp'] if 'hrp' in ap else 'spec' if ap else '-',
'x' if v.m_def_path == dfl_dfl_path else v.m_def_path,
)
def gen():
def format_data(bipnum,n,sym,name):
def format_data(bipnum, n, sym, name):
return fs.format(
n,
sym if sym else '---',
get_bip_utils_data(bipnum,n) if bipnum else '-',
get_bip_utils_data(bipnum, n) if bipnum else '-',
name if name else '---')
fs = '{:<6} {:6} {:1} {}'
yield f'[defaults]'
yield fs.format('IDX','CHAIN',hdr2,'NAME')
yield fs.format('IDX', 'CHAIN', hdr2, 'NAME')
yield fs.format('0', '-', fs2.format(dfl_curve, '-', '-', dfl_vb_prv, dfl_vb_pub, '-', '-', dfl_dfl_path), '-')
yield f'\n[bip-44]'
yield fs.format('IDX','CHAIN',hdr2,'NAME')
for k,v in slip44_data.items():
yield fs.format('IDX', 'CHAIN', hdr2, 'NAME')
for k, v in slip44_data.items():
if int(k) in bip_utils_data[44]:
yield format_data(44,int(k),v['symbol'],v['name'])
yield format_data(44, int(k), v['symbol'], v['name'])
for bipnum in (49, 84, 86):
yield f'\n[bip-{bipnum}]'
yield fs.format('IDX','CHAIN',hdr2,'NAME')
for n,v in sorted(bip_utils_data[bipnum].items()):
yield fs.format('IDX', 'CHAIN', hdr2, 'NAME')
for n, v in sorted(bip_utils_data[bipnum].items()):
nd = v[1].m_coin_names
yield format_data(bipnum,n,nd.m_abbr,nd.m_name)
yield format_data(bipnum, n, nd.m_abbr, nd.m_name)
yield f'\n[bip-44-unsupported]'
yield fs.format('IDX','CHAIN','','NAME')
for k,v in slip44_data.items():
yield fs.format('IDX', 'CHAIN', '', 'NAME')
for k, v in slip44_data.items():
if not int(k) in bip_utils_data[44]:
yield format_data(None,int(k),v['symbol'],v['name'])
yield format_data(None, int(k), v['symbol'], v['name'])
def main():
@ -106,7 +106,7 @@ def main():
slip44_data = json.loads(fh.read())
bip_utils_data = {
n:{v.m_coin_idx:(k,v) for k,v in globals()[f'Bip{n}Conf'].__dict__.items() if not k.startswith('_')}
n:{v.m_coin_idx:(k, v) for k, v in globals()[f'Bip{n}Conf'].__dict__.items() if not k.startswith('_')}
for n in (44, 49, 84, 86)
}

View file

@ -20,18 +20,18 @@
scripts/create-token.py: Automated ERC20 token creation for the MMGen suite
"""
import sys,json,re
from subprocess import run,PIPE
import sys, json, re
from subprocess import run, PIPE
from collections import namedtuple
import script_init
from mmgen.main import launch
from mmgen.cfg import Config
from mmgen.util import Msg,msg,rmsg,ymsg,die
from mmgen.util import Msg, msg, rmsg, ymsg, die
ti = namedtuple('token_param_info',['default','conversion','test'])
ti = namedtuple('token_param_info', ['default', 'conversion', 'test'])
class TokenData:
fields = ('decimals','supply','name','symbol','owner_addr')
fields = ('decimals', 'supply', 'name', 'symbol', 'owner_addr')
decimals = ti('18', int, lambda s: s.isascii() and s.isdigit() and 0 < int(s) <= 36)
name = ti(None, str, lambda s: s.isascii() and s.isprintable() and len(s) < 256)
supply = ti(None, int, lambda s: s.isascii() and s.isdigit() and 0 < int(s) < 2**256)
@ -186,25 +186,25 @@ contract Token is ERC20Interface, Owned, SafeMath {
}
""" % req_solc_ver_pat
def create_src(cfg,template,token_data):
def create_src(cfg, template, token_data):
def gen():
for k in token_data.fields:
field = getattr(token_data,k)
field = getattr(token_data, k)
if k == 'owner_addr':
owner_addr = cfg._args[0]
from mmgen.addr import is_coin_addr
if not is_coin_addr( cfg._proto, owner_addr.lower() ):
die(1,f'{owner_addr}: not a valid {cfg._proto.coin} coin address')
if not is_coin_addr(cfg._proto, owner_addr.lower()):
die(1, f'{owner_addr}: not a valid {cfg._proto.coin} coin address')
val = '0x' + owner_addr
else:
val = (
getattr(cfg,k)
or getattr(field,'default',None)
or die(1,f'The --{k} option must be specified')
getattr(cfg, k)
or getattr(field, 'default', None)
or die(1, f'The --{k} option must be specified')
)
if not field.test(val):
die(1,f'{val!r}: invalid parameter for option --{k}')
die(1, f'{val!r}: invalid parameter for option --{k}')
yield (k, field.conversion(val))
@ -216,7 +216,7 @@ def check_solc_version():
The output is used by other programs, so write to stdout only
"""
try:
cp = run(['solc','--version'],check=True,stdout=PIPE)
cp = run(['solc', '--version'], check=True, stdout=PIPE)
except:
msg('solc missing or could not be executed') # this must go to stderr
return False
@ -226,14 +226,14 @@ def check_solc_version():
return False
line = cp.stdout.decode().splitlines()[1]
version_str = re.sub(r'Version:\s*','',line)
m = re.match(r'(\d+)\.(\d+)\.(\d+)',version_str)
version_str = re.sub(r'Version:\s*', '', line)
m = re.match(r'(\d+)\.(\d+)\.(\d+)', version_str)
if not m:
Msg(f'Unrecognized solc version string: {version_str}')
return False
from semantic_version import Version,NpmSpec
from semantic_version import Version, NpmSpec
version = Version('{}.{}.{}'.format(*m.groups()))
if version in NpmSpec(req_solc_ver_pat):
@ -243,25 +243,25 @@ def check_solc_version():
Msg(f'solc version ({version_str}) does not match requirement ({req_solc_ver_pat})')
return False
def compile_code(cfg,code):
def compile_code(cfg, code):
cmd = ['solc', '--optimize', '--bin', '--overwrite', '--evm-version=constantinople']
if not cfg.stdout:
cmd += ['--output-dir', cfg.outdir or '.']
cmd += ['-']
msg(f"Executing: {' '.join(cmd)}")
cp = run(cmd,input=code.encode(),stdout=PIPE,stderr=PIPE)
out = cp.stdout.decode().replace('\r','')
err = cp.stderr.decode().replace('\r','').strip()
cp = run(cmd, input=code.encode(), stdout=PIPE, stderr=PIPE)
out = cp.stdout.decode().replace('\r', '')
err = cp.stderr.decode().replace('\r', '').strip()
if cp.returncode != 0:
rmsg('Solidity compiler produced the following error:')
msg(err)
die(4,f'Solidity compiler exited with error (return val: {cp.returncode})')
die(4, f'Solidity compiler exited with error (return val: {cp.returncode})')
if err:
ymsg('Solidity compiler produced the following warning:')
msg(err)
if cfg.stdout:
o = out.split('\n')
return {k:o[i+2] for k in ('SafeMath','Owned','Token') for i in range(len(o)) if k in o[i]}
return {k:o[i+2] for k in ('SafeMath', 'Owned', 'Token') for i in range(len(o)) if k in o[i]}
else:
cfg._util.vmsg(out)
@ -271,19 +271,19 @@ def main():
if cfg.check_solc_version:
sys.exit(0 if check_solc_version() else 1)
if not cfg._proto.coin in ('ETH','ETC'):
die(1,'--coin option must be ETH or ETC')
if not cfg._proto.coin in ('ETH', 'ETC'):
die(1, '--coin option must be ETH or ETC')
if not len(cfg._args) == 1:
cfg._usage()
code = create_src( cfg, solidity_code_template, token_data )
code = create_src(cfg, solidity_code_template, token_data)
if cfg.preprocess:
Msg(code)
sys.exit(0)
out = compile_code( cfg, code )
out = compile_code(cfg, code)
if cfg.stdout:
print(json.dumps(out))

View file

@ -18,10 +18,10 @@ scripts/exec_wrapper.py: wrapper to launch MMGen scripts in a testing environmen
def exec_wrapper_get_colors():
from collections import namedtuple
return namedtuple('colors',['red','green','yellow','blue','purple'])(*[
return namedtuple('colors', ['red', 'green', 'yellow', 'blue', 'purple'])(*[
(lambda s:s) if exec_wrapper_os.getenv('MMGEN_DISABLE_COLOR') else
(lambda s,n=n:f'\033[{n};1m{s}\033[0m' )
for n in (31,32,33,34,35) ])
(lambda s, n=n:f'\033[{n};1m{s}\033[0m')
for n in (31, 32, 33, 34, 35)])
def exec_wrapper_init():
@ -37,12 +37,12 @@ def exec_wrapper_init():
exec_wrapper_os.environ['MMGEN_EXEC_WRAPPER'] = '1'
def exec_wrapper_write_traceback(e,exit_val):
def exec_wrapper_write_traceback(e, exit_val):
import sys,os
import sys, os
exc_line = (
f'{type(e).__name__}({e.mmcode}) {e}' if type(e).__name__ in ('MMGenError','MMGenSystemExit') else
f'{type(e).__name__}({e.mmcode}) {e}' if type(e).__name__ in ('MMGenError', 'MMGenSystemExit') else
f'{type(e).__name__}: {e}')
c = exec_wrapper_get_colors()
@ -50,7 +50,7 @@ def exec_wrapper_write_traceback(e,exit_val):
if os.getenv('EXEC_WRAPPER_TRACEBACK'):
import traceback
cwd = os.getcwd()
sys.path.insert(0,cwd)
sys.path.insert(0, cwd)
from test.overlay import get_overlay_tree_dir
overlay_path_pfx = os.path.relpath(get_overlay_tree_dir(cwd)) + '/'
@ -65,9 +65,9 @@ def exec_wrapper_write_traceback(e,exit_val):
f = exec_wrapper_execed_file if e.filename == '<string>' else fixup_fn(e.filename),
l = '(scrubbed)' if os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC') else e.lineno,
n = e.name,
L = e.line or 'N/A' )
L = e.line or 'N/A')
tb_lines = list( gen_output() )
tb_lines = list(gen_output())
if 'SystemExit' in exc_line:
tb_lines.pop()
@ -79,22 +79,22 @@ def exec_wrapper_write_traceback(e,exit_val):
c.yellow('\n'.join(tb_lines)),
c.red(exc_line)))
print(c.blue('{} script exited with error').format(
'Test' if os.path.dirname(sys.argv[0]) == 'test' else 'Spawned' ))
'Test' if os.path.dirname(sys.argv[0]) == 'test' else 'Spawned'))
with open('test.err','w') as fp:
with open('test.err', 'w') as fp:
fp.write('\n'.join(tb_lines + [exc_line]))
else:
sys.stdout.write( c.purple((f'NONZERO_EXIT[{exit_val}]: ' if exit_val else '') + exc_line) + '\n' )
sys.stdout.write(c.purple((f'NONZERO_EXIT[{exit_val}]: ' if exit_val else '') + exc_line) + '\n')
def exec_wrapper_end_msg():
if (
exec_wrapper_os.getenv('EXEC_WRAPPER_DO_RUNTIME_MSG')
and not exec_wrapper_os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC') ):
and not exec_wrapper_os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC')):
c = exec_wrapper_get_colors()
# write to stdout to ensure script output gets to terminal first
exec_wrapper_sys.stdout.write(c.blue('Runtime: {:0.5f} secs\n'.format(
exec_wrapper_time.time() - exec_wrapper_tstart )))
exec_wrapper_time.time() - exec_wrapper_tstart)))
def exec_wrapper_tracemalloc_setup():
exec_wrapper_os.environ['PYTHONTRACEMALLOC'] = '1'
@ -103,25 +103,25 @@ def exec_wrapper_tracemalloc_setup():
exec_wrapper_sys.stderr.write("INFO → Appending memory allocation stats to 'tracemalloc.log'\n")
def exec_wrapper_tracemalloc_log():
import tracemalloc,re
import tracemalloc, re
snapshot = tracemalloc.take_snapshot()
stats = snapshot.statistics('lineno')
depth = 100
col1w = 100
with open('tracemalloc.log','a') as fp:
fp.write('##### TOP {} {} #####\n'.format(depth,' '.join(exec_wrapper_sys.argv)))
with open('tracemalloc.log', 'a') as fp:
fp.write('##### TOP {} {} #####\n'.format(depth, ' '.join(exec_wrapper_sys.argv)))
for stat in stats[:depth]:
frame = stat.traceback[0]
fn = re.sub(r'.*\/site-packages\/|.*\/mmgen\/test\/overlay\/tree\/','',frame.filename)
fn = re.sub(r'.*\/mmgen\/test\/','test/',fn)
fn = re.sub(r'.*\/site-packages\/|.*\/mmgen\/test\/overlay\/tree\/', '', frame.filename)
fn = re.sub(r'.*\/mmgen\/test\/', 'test/', fn)
fp.write('{f:{w}} {s:>8.2f} KiB\n'.format(
f = f'{fn}:{frame.lineno}:',
s = stat.size/1024,
w = col1w ))
w = col1w))
fp.write('{f:{w}} {s:8.2f} KiB\n\n'.format(
f = 'TOTAL {}:'.format(' '.join(exec_wrapper_sys.argv))[:col1w],
s = sum(stat.size for stat in stats) / 1024,
w = col1w ))
w = col1w))
import sys as exec_wrapper_sys
import os as exec_wrapper_os
@ -146,15 +146,15 @@ try:
exec(fp.read())
except SystemExit as e:
if e.code != 0:
exec_wrapper_write_traceback(e,e.code)
exec_wrapper_write_traceback(e, e.code)
else:
if exec_wrapper_os.getenv('MMGEN_TRACEMALLOC'):
exec_wrapper_tracemalloc_log()
exec_wrapper_end_msg()
exec_wrapper_sys.exit(e.code)
except Exception as e:
exit_val = e.mmcode if hasattr(e,'mmcode') else e.code if hasattr(e,'code') else 1
exec_wrapper_write_traceback(e,exit_val)
exit_val = e.mmcode if hasattr(e, 'mmcode') else e.code if hasattr(e, 'code') else 1
exec_wrapper_write_traceback(e, exit_val)
exec_wrapper_sys.exit(exit_val)
if exec_wrapper_os.getenv('MMGEN_TRACEMALLOC'):

View file

@ -15,7 +15,7 @@ command.
The cleaned source files are saved with the .clean extension.
"""
import sys,re
import sys, re
from difflib import unified_diff
fns = sys.argv[1:3]
@ -30,20 +30,20 @@ translate = {
def cleanup_file(fn):
# must use binary mode to prevent conversion of DOS CR into newline
with open(fn,'rb') as fp:
with open(fn, 'rb') as fp:
data = fp.read().decode()
def gen_text():
for line in data.split('\n'): # do not use splitlines()
line = line.translate({ord(a):b for a,b in translate.items()})
line = re.sub(r'\s+$','',line) # trailing whitespace
line = line.translate({ord(a):b for a, b in translate.items()})
line = re.sub(r'\s+$', '', line) # trailing whitespace
yield line
ret = list(gen_text())
sys.stderr.write(f'Saving cleaned file to {fn}.clean\n')
with open(f'{fn}.clean','w') as fp:
with open(f'{fn}.clean', 'w') as fp:
fp.write('\n'.join(ret))
return ret
@ -62,5 +62,8 @@ if len(fns) == 2:
else:
print(
f'diff a/{fns[0]} b/{fns[1]}\n' +
'\n'.join(a.rstrip() for a in unified_diff(*cleaned_texts,fromfile=f'a/{fns[0]}',tofile=f'b/{fns[1]}'))
'\n'.join(a.rstrip() for a in unified_diff(
*cleaned_texts,
fromfile = f'a/{fns[0]}',
tofile = f'b/{fns[1]}'))
)

View file

@ -5,7 +5,7 @@ Convert an MMGen 'v2' transaction file (amounts as BTCAmt()) to 'v3' (amounts as
strings). Version 3 TX files were introduced with MMGen version 0.9.7
"""
import sys,os,asyncio
import sys, os, asyncio
from mmgen.cfg import Config
from mmgen.tx import CompletedTX
@ -31,5 +31,5 @@ cfg = Config(opts_data=opts_data)
if len(cfg._args) != 1:
cfg._usage()
tx = asyncio.run(CompletedTX(cfg._args[0],quiet_open=True))
tx.file.write(ask_tty=False,ask_overwrite=not cfg.quiet,ask_write=not cfg.quiet)
tx = asyncio.run(CompletedTX(cfg._args[0], quiet_open=True))
tx.file.write(ask_tty=False, ask_overwrite=not cfg.quiet, ask_write=not cfg.quiet)

View file

@ -10,15 +10,15 @@
import sys, os
from pathlib import Path
from subprocess import run,PIPE
from setuptools import setup,Extension
from subprocess import run, PIPE
from setuptools import setup, Extension
from setuptools.command.build_ext import build_ext
def build_libsecp256k1():
home = Path(os.environ['HOME'])
cache_path = home.joinpath('.cache','mmgen')
src_path = home.joinpath('.cache','mmgen','secp256k1')
cache_path = home.joinpath('.cache', 'mmgen')
src_path = home.joinpath('.cache', 'mmgen', 'secp256k1')
lib_file = src_path.joinpath('.libs', 'libsecp256k1.dll.a')
root = Path().resolve().anchor
installed_lib_file = Path(root, 'msys64', 'ucrt64', 'lib', 'libsecp256k1.dll.a')
@ -32,29 +32,29 @@ def build_libsecp256k1():
if not src_path.exists():
cache_path.mkdir(parents=True)
print('\nCloning libsecp256k1')
run(['git','clone','https://github.com/bitcoin-core/secp256k1.git'],check=True,cwd=cache_path)
run(['git', 'clone', 'https://github.com/bitcoin-core/secp256k1.git'], check=True, cwd=cache_path)
if not lib_file.exists():
print(f'\nBuilding libsecp256k1 (cwd={str(src_path)})')
cmds = (
['sh','./autogen.sh'],
['sh','./configure','CFLAGS=-g -O2 -fPIC','--disable-dependency-tracking'],
['mingw32-make','MAKE=mingw32-make'])
['sh', './autogen.sh'],
['sh', './configure', 'CFLAGS=-g -O2 -fPIC', '--disable-dependency-tracking'],
['mingw32-make', 'MAKE=mingw32-make'])
for cmd in cmds:
print('Executing {}'.format(' '.join(cmd)))
run(cmd,check=True,cwd=src_path)
run(cmd, check=True, cwd=src_path)
if not installed_lib_file.exists():
run(['mingw32-make','install','MAKE=mingw32-make'],check=True,cwd=src_path)
run(['mingw32-make', 'install', 'MAKE=mingw32-make'], check=True, cwd=src_path)
class my_build_ext(build_ext):
def build_extension(self,ext):
def build_extension(self, ext):
if sys.platform == 'win32':
build_libsecp256k1()
build_ext.build_extension(self,ext)
build_ext.build_extension(self, ext)
setup(
cmdclass = { 'build_ext': my_build_ext },
cmdclass = {'build_ext': my_build_ext},
ext_modules = [Extension(
name = 'mmgen.proto.secp256k1.secp256k1',
sources = ['extmod/secp256k1mod.c'],