whitespace, cleanups, imports throughout [73 files changed]

This commit is contained in:
The MMGen Project 2023-03-27 10:48:21 +00:00
commit 78e882143c
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
73 changed files with 482 additions and 268 deletions

View file

@ -13,8 +13,10 @@ examples.halving-calculator.py: Demonstrate use of the MMGen asyncio/aiohttp JSO
"""
import time
from decimal import Decimal
from mmgen.common import *
import mmgen.opts as opts
from mmgen.opts import opt
from mmgen.util import async_run
opts.init({
'text': {

View file

@ -20,13 +20,14 @@
addrfile: Address and password file classes for the MMGen suite
"""
from .globalvars import g
from .util import msg,qmsg,qmsg_r,die,capfirst
from .protocol import init_proto
from .obj import MMGenObject,TwComment,WalletPassword,MMGenPWIDString
from .seed import SeedID,is_seed_id
from .key import PrivKey
from .addr import ViewKey,AddrListID,MMGenAddrType,MMGenPasswordType,is_addr_idx
from .addrlist import KeyList,AddrListData,dmsg_sc
from .addrlist import KeyList,AddrListData
class AddrFile(MMGenObject):
desc = 'addresses'
@ -57,12 +58,10 @@ class AddrFile(MMGenObject):
@property
def filename(self):
from .globalvars import g
return '{}{x}{}.{}'.format(
return '{}{}.{}'.format(
self.parent.id_str,
('.' + self.parent.proto.network) if self.parent.proto.testnet else '',
self.ext,
x = '' if g.debug_utf8 else '' )
self.ext )
def write(self,fn=None,ask_tty=True,ask_write_default_yes=False,binary=False,desc=None):
from .opts import opt
@ -97,7 +96,7 @@ class AddrFile(MMGenObject):
out.append('# Record this value to a secure location.\n')
lbl = self.make_label()
dmsg_sc('lbl',lbl[9:])
self.parent.dmsg_sc('lbl',lbl[9:])
out.append(f'{lbl} {{')
fs = ' {:<%s} {:<34}{}' % len(str(p.data[-1].idx))

View file

@ -20,17 +20,13 @@
addrlist: Address list classes for the MMGen suite
"""
from .globalvars import g
from .util import qmsg,qmsg_r,suf,make_chksum_N,Msg,die
from .objmethods import MMGenObject,Hilite,InitErrors
from .obj import MMGenListItem,ListItemAttr,MMGenDict,TwComment,WalletPassword
from .key import PrivKey
from .addr import MMGenID,MMGenAddrType,CoinAddr,AddrIdx,AddrListID,ViewKey
def dmsg_sc(desc,data):
from .globalvars import g
if g.debug_addrlist:
Msg(f'sc_debug_{desc}: {data}')
class AddrIdxList(tuple,InitErrors,MMGenObject):
max_len = 1000000
@ -135,7 +131,7 @@ class AddrListIDStr(str,Hilite):
('-'+mt,'')[mt in ('L','E')],
s )
dmsg_sc('id_str',ret[8:].split('[')[0])
addrlist.dmsg_sc('id_str',ret[8:].split('[')[0])
return str.__new__(cls,ret)
@ -154,7 +150,15 @@ class AddrList(MMGenObject): # Address info for a single seed ID
has_keys = False
chksum_rec_f = lambda foo,e: ( str(e.idx), e.addr )
def __init__(self,proto,
def dmsg_sc(self,desc,data):
Msg(f'sc_debug_{desc}: {data}')
def noop(self,desc,data):
pass
def __init__(
self,
proto,
addrfile = '',
al_id = '',
adata = [],
@ -166,14 +170,17 @@ class AddrList(MMGenObject): # Address info for a single seed ID
mmtype = None,
key_address_validity_check = None, # None=prompt user, True=check without prompt, False=skip check
skip_chksum = False,
add_p2pkh = False,
):
skip_chksum_msg = False,
add_p2pkh = False ):
self.ka_validity_chk = key_address_validity_check
self.add_p2pkh = add_p2pkh
self.proto = proto
do_chksum = False
if not g.debug_addrlist:
self.dmsg_sc = self.noop
if seed and addr_idxs: # data from seed + idxs
self.al_id = AddrListID( sid=seed.sid, mmtype=MMGenAddrType(proto, mmtype or proto.dfl_mmtype) )
src = 'gen'
@ -217,7 +224,8 @@ class AddrList(MMGenObject): # Address info for a single seed ID
if do_chksum and not skip_chksum:
self.chksum = AddrListChksum(self)
self.do_chksum_msg(record=src=='gen')
if not skip_chksum_msg:
self.do_chksum_msg(record=src=='gen')
def do_chksum_msg(self,record):
chk = 'Check this value against your records'
@ -229,7 +237,7 @@ class AddrList(MMGenObject): # Address info for a single seed ID
def generate(self,seed,addr_idxs):
seed = self.scramble_seed(seed.data)
dmsg_sc('seed',seed[:8].hex())
self.dmsg_sc('seed',seed[:8].hex())
mmtype = self.al_id.mmtype
@ -298,7 +306,7 @@ class AddrList(MMGenObject): # Address info for a single seed ID
def scramble_seed(self,seed):
is_btcfork = self.proto.base_coin == 'BTC'
if is_btcfork and self.al_id.mmtype == 'L' and not self.proto.testnet:
dmsg_sc('str','(none)')
self.dmsg_sc('str','(none)')
return seed
if self.proto.base_coin == 'ETH':
scramble_key = self.proto.coin.lower()
@ -307,7 +315,7 @@ class AddrList(MMGenObject): # Address info for a single seed ID
from .crypto import scramble_seed
if self.proto.testnet:
scramble_key += ':' + self.proto.network
dmsg_sc('str',scramble_key)
self.dmsg_sc('str',scramble_key)
return scramble_seed(seed,scramble_key.encode())
def idxs(self):

View file

@ -290,13 +290,13 @@ def add_user_random(
return rand_bytes
def get_hash_preset_from_user(
hash_preset = g.dfl_hash_preset,
old_preset = g.dfl_hash_preset,
data_desc = 'data',
prompt = None ):
prompt = prompt or (
f'Enter hash preset for {data_desc},\n' +
f'or hit ENTER to accept the default value ({hash_preset!r}): ' )
f'or hit ENTER to accept the default value ({old_preset!r}): ' )
from .ui import line_input
while True:
@ -307,7 +307,7 @@ def get_hash_preset_from_user(
else:
msg('Invalid input. Valid choices are {}'.format(', '.join(hash_presets)))
else:
return hash_preset
return old_preset
def get_new_passphrase(data_desc,hash_preset,passwd_file,pw_desc='passphrase'):
message = f"""

View file

@ -163,7 +163,7 @@ class MMGenObjectMethods: # mixin class for MMGenObject
if isList(e) or isDict(e):
out.append('{:>{l}}{:<10} {:16}'.format( '', k, f'<{type(e).__name__}>', l=(lvl*8)+4 ))
do_list(out,e,lvl=lvl,is_dict=isDict(e))
elif hasattr(e,'pfmt') and type(e) != type:
elif hasattr(e,'pfmt') and callable(e.pfmt) and type(e) != type:
out.append('{:>{l}}{:10} {}'.format(
'',
k,

View file

@ -20,11 +20,8 @@
filename: File and MMGenFile classes and methods for the MMGen suite
"""
import sys,os
from .obj import *
import os
from .util import die,get_extension
from .seed import *
class File:
@ -46,6 +43,7 @@ class File:
import stat
if stat.S_ISBLK(st.st_mode):
mode = (os.O_RDONLY,os.O_RDWR)[bool(write)]
from .globalvars import g
if g.platform == 'win':
mode |= os.O_BINARY
try:

View file

@ -114,7 +114,7 @@ def check_outfile(f,blkdev_ok=False):
def check_outdir(f):
return _check_file_type_and_access(f,'output directory')
def get_seed_file(cmd_args,nargs,invoked_as=None):
def get_seed_file(wallets,nargs,invoked_as=None):
from .opts import opt
from .filename import find_file_in_dir
@ -125,19 +125,19 @@ def get_seed_file(cmd_args,nargs,invoked_as=None):
wd_from_opt = bool(opt.hidden_incog_input_params or opt.in_fmt) # have wallet data from opt?
import mmgen.opts as opts
if len(cmd_args) + (wd_from_opt or bool(wf)) < nargs:
if len(wallets) + (wd_from_opt or bool(wf)) < nargs:
if not wf:
msg('No default wallet found, and no other seed source was specified')
opts.usage()
elif len(cmd_args) > nargs:
elif len(wallets) > nargs:
opts.usage()
elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
elif len(wallets) == nargs and wf and invoked_as != 'gen':
qmsg('Warning: overriding default wallet with user-supplied wallet')
if cmd_args or wf:
check_infile(cmd_args[0] if cmd_args else wf)
if wallets or wf:
check_infile(wallets[0] if wallets else wf)
return cmd_args[0] if cmd_args else (wf,None)[wd_from_opt]
return wallets[0] if wallets else (wf,None)[wd_from_opt]
def _open_or_die(filename,mode,silent=False):
try:
@ -149,18 +149,21 @@ def _open_or_die(filename,mode,silent=False):
('reading' if 'r' in mode else 'writing')
))
def write_data_to_file( outfile,data,desc='data',
ask_write=False,
ask_write_prompt='',
ask_write_default_yes=True,
ask_overwrite=True,
ask_tty=True,
no_tty=False,
quiet=False,
binary=False,
ignore_opt_outdir=False,
check_data=False,
cmp_data=None):
def write_data_to_file(
outfile,
data,
desc = 'data',
ask_write = False,
ask_write_prompt = '',
ask_write_default_yes = True,
ask_overwrite = True,
ask_tty = True,
no_tty = False,
quiet = False,
binary = False,
ignore_opt_outdir = False,
check_data = False,
cmp_data = None):
from .opts import opt
@ -220,8 +223,9 @@ def write_data_to_file( outfile,data,desc='data',
if not ask_write_prompt:
ask_write_prompt = f'Save {desc}?'
from .ui import keypress_confirm
if not keypress_confirm(ask_write_prompt,
default_yes=ask_write_default_yes):
if not keypress_confirm(
ask_write_prompt,
default_yes = ask_write_default_yes ):
die(1,f'{capfirst(desc)} not saved')
hush = False
@ -282,7 +286,13 @@ def get_words_from_file(infile,desc,quiet=False):
return words
def get_data_from_file(infile,desc='data',dash=False,silent=False,binary=False,quiet=False):
def get_data_from_file(
infile,
desc = 'data',
dash = False,
silent = False,
binary = False,
quiet = False ):
from .opts import opt
if not (opt.quiet or silent or quiet):
@ -314,7 +324,13 @@ def _mmgen_decrypt_file_maybe(fn,desc='data',quiet=False,silent=False):
d = mmgen_decrypt_retry(d,desc)
return d
def get_lines_from_file(fn,desc='data',trim_comments=False,quiet=False,silent=False):
def get_lines_from_file(
fn,
desc = 'data',
trim_comments = False,
quiet = False,
silent = False ):
dec = _mmgen_decrypt_file_maybe(fn,desc=desc,quiet=quiet,silent=silent)
ret = dec.decode().splitlines()
if trim_comments:

View file

@ -169,7 +169,7 @@ class GlobalConfig(Lockable):
# user opt sets global var:
opt_sets_global = ( 'cached_balances', )
# 'long' opt sets global var (subset of common_opts_data):
# 'long' opts (subset of common_opts_data):
common_opts = (
'accept_defaults',
'aiohttp_rpc_queue_len',
@ -196,6 +196,7 @@ class GlobalConfig(Lockable):
# opts not in common_opts but required to be set during opts initialization
init_opts = ('show_hash_presets','yes','verbose')
incompatible_opts = (
('help','longhelp'),
('bob','alice','carol'),
@ -203,6 +204,7 @@ class GlobalConfig(Lockable):
('tx_id','info'),
('tx_id','terse_info'),
)
cfg_file_opts = (
'autochg_ignore_labels',
'color',
@ -269,7 +271,6 @@ class GlobalConfig(Lockable):
'MMGEN_FORCE_256_COLOR',
'MMGEN_HOLD_PROTECT_DISABLE',
'MMGEN_QUIET',
'MMGEN_MIN_URANDCHARS',
'MMGEN_NO_LICENSE',
'MMGEN_RPC_HOST',
'MMGEN_RPC_FAIL_ON_COMMAND',
@ -290,12 +291,11 @@ class GlobalConfig(Lockable):
'comment_file',
'contract_data',
)
# Auto-typechecked and auto-set opts. These have no corresponding value in g.
# First value in list is the default
ov = namedtuple('autoset_opt_info',['type','choices'])
# Auto-typechecked and auto-set opts - first value in list is the default
_ov = namedtuple('autoset_opt_info',['type','choices'])
autoset_opts = {
'fee_estimate_mode': ov('nocase_pfx', ['conservative','economical']),
'rpc_backend': ov('nocase_pfx', ['auto','httplib','curl','aiohttp','requests']),
'fee_estimate_mode': _ov('nocase_pfx', ['conservative','economical']),
'rpc_backend': _ov('nocase_pfx', ['auto','httplib','curl','aiohttp','requests']),
}
if platform == 'win':
_skip_type_check = ('stdout','stderr')
@ -319,6 +319,7 @@ class GlobalConfig(Lockable):
force_standalone_scrypt_module = False
if os.getenv('MMGEN_TEST_SUITE'):
min_urandchars = 3
err_disp_timeout = 0.1
short_disp_timeout = 0.1
if os.getenv('MMGEN_TEST_SUITE_POPEN_SPAWN'):

View file

@ -20,8 +20,9 @@
help: help notes for MMGen suite commands
"""
from .globalvars import g
def help_notes_func(proto,opt,k):
from .globalvars import g
def fee_spec_letters(use_quotes=False):
cu = proto.coin_amt.units

View file

@ -20,9 +20,10 @@
led: Control the LED on a single-board computer
"""
import sys,time
from mmgen.common import *
import threading
import os,threading
from collections import namedtuple
from .util import msg,msg_r,fmt,die
class LEDControl:

View file

@ -21,7 +21,9 @@ mmgen-addrgen: Generate a series or range of addresses from an MMGen
deterministic wallet
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .addr import MMGenAddrType
from .addrfile import AddrFile
from .wallet import Wallet
@ -126,15 +128,13 @@ FMT CODES:
cmd_args = opts.init(opts_data,opt_filter=opt_filter)
errmsg = f'{opt.type!r}: invalid parameter for --type option'
from .protocol import init_proto_from_opts
proto = init_proto_from_opts()
addr_type = MMGenAddrType(
proto = proto,
id_str = opt.type or proto.dfl_mmtype,
errmsg = errmsg )
errmsg = f'{opt.type!r}: invalid parameter for --type option' )
if len(cmd_args) < 1:
opts.usage()

View file

@ -22,7 +22,10 @@ mmgen-addrimport: Import addresses into a MMGen coin daemon tracking wallet
from collections import namedtuple
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import msg,qmsg,suf,die,fmt,async_run
from .addrlist import AddrList,KeyAddrList
from .tw.shared import TwLabel
@ -181,7 +184,8 @@ async def main():
del twctl
cmd_args = opts.init(opts_data)
from .protocol import init_proto_from_opts
proto = init_proto_from_opts()
import asyncio
async_run(main())

View file

@ -25,7 +25,11 @@ from subprocess import run,PIPE,DEVNULL
from collections import namedtuple
from stat import *
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import msg,msg_r,vmsg,qmsg,ymsg,rmsg,gmsg,bmsg,die,suf,fmt_list,async_run,exit_if_mswin
from .color import yellow,red,orange
mountpoint = '/mnt/tx'
tx_dir = '/mnt/tx/tx'
@ -38,8 +42,6 @@ mn_fmts = {
}
mn_fmt_dfl = 'mmgen'
opts.UserOpts._set_ok += ('outdir','passwd_file')
opts_data = {
'sets': [('stealth_led', True, 'led', True)],
'text': {
@ -131,6 +133,8 @@ cmd_args = opts.init(
'label': 'Autosign Wallet',
})
type(opt)._set_ok += ('outdir','passwd_file')
exit_if_mswin('autosigning')
if opt.mnemonic_fmt:

View file

@ -12,9 +12,20 @@
mmgen-msg: Message signing operations for the MMGen suite
"""
import sys
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .base_obj import AsyncInit
from .common import *
from .msg import *
from .util import msg,suf,async_run,stdout_or_pager
from .msg import (
NewMsg,
CompletedMsg,
UnsignedMsg,
SignedMsg,
SignedOnlineMsg,
ExportedMsgSigs,
)
class MsgOps:
ops = ('create','sign','verify')

View file

@ -21,7 +21,9 @@ mmgen-passgen: Generate a series or range of passwords from an MMGen
deterministic wallet
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .addrlist import AddrIdxList
from .passwdlist import PasswordList
from .wallet import Wallet
@ -177,5 +179,6 @@ if keypress_confirm('Encrypt password list?'):
af.write(binary=True,desc='encrypted password list')
else:
if g.test_suite_popen_spawn and g.platform == 'win':
import time
time.sleep(0.1)
af.write(desc='password list')

View file

@ -21,7 +21,9 @@ mmgen-regtest: Coin daemon regression test mode setup and operations for the MMG
suite
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .util import die,async_run
opts_data = {
'sets': [('yes', True, 'quiet', True)],

View file

@ -21,7 +21,11 @@ mmgen-seedjoin: Regenerate an MMGen deterministic wallet from seed shares
created by 'mmgen-seedsplit'
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import msg,msg_r,qmsg,die
from .color import yellow
from .obj import MMGenWalletLabel
from .seed import Seed
from .seedsplit import SeedSplitIDString,MasterShareIdx,SeedShareMasterJoining

View file

@ -21,8 +21,11 @@ mmgen-tool: Perform various MMGen- and cryptocoin-related operations.
Part of the MMGen suite
"""
import os,importlib
from .common import *
import sys,os,importlib
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import msg,Msg,die,capfirst,suf,async_run
opts_data = {
'text': {
@ -301,8 +304,6 @@ def process_result(ret,pager=False,print_result=False):
pager instead of returning it.
"""
from .util import Msg,die
def triage_result(o):
if print_result:
if pager:

View file

@ -21,7 +21,11 @@ mmgen-txbump: Increase the fee on a replaceable (replace-by-fee) MMGen
transaction, and optionally sign and send it
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import msg,msg_r,qmsg,die,async_run
from .color import green
from .wallet import Wallet
opts_data = {
@ -106,8 +110,8 @@ tx_file = cmd_args.pop(0)
from .fileutil import check_infile
check_infile(tx_file)
from .tx import *
from .tx.sign import *
from .tx import CompletedTX,BumpTX,UnsignedTX,OnlineSignedTX
from .tx.sign import txsign,get_seed_files,get_keyaddrlist,get_keylist
seed_files = get_seed_files(opt,cmd_args) if (cmd_args or opt.send) else None
@ -116,8 +120,6 @@ do_license_msg()
silent = opt.yes and opt.fee != None and opt.output_to_reduce != None
from .tx import CompletedTX,BumpTX,UnsignedTX,OnlineSignedTX
async def main():
orig_tx = await CompletedTX(filename=tx_file)

View file

@ -21,7 +21,10 @@ mmgen-txcreate: Create a cryptocoin transaction with MMGen- and/or non-MMGen
inputs and outputs
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import fmt_list,async_run
opts_data = {
'sets': [('yes', True, 'quiet', True)],

View file

@ -20,7 +20,10 @@
mmgen-txdo: Create, sign and broadcast an online MMGen transaction
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import die,fmt_list,async_run
from .wallet import Wallet
from .subseed import SubSeedIdxRange
@ -124,7 +127,7 @@ FMT CODES:
cmd_args = opts.init(opts_data)
from .tx import NewTX,OnlineSignedTX
from .tx.sign import *
from .tx.sign import txsign,get_seed_files,get_keyaddrlist,get_keylist
seed_files = get_seed_files(opt,cmd_args)

View file

@ -20,7 +20,12 @@
mmgen-txsend: Broadcast a transaction signed by 'mmgen-txsign' to the network
"""
from .common import *
import sys
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import vmsg,qmsg,async_run
opts_data = {
'sets': [('yes', True, 'quiet', True)],

View file

@ -20,7 +20,10 @@
mmgen-txsign: Sign a transaction generated by 'mmgen-txcreate'
"""
from .common import *
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .util import msg,ymsg,die,async_run
from .subseed import SubSeedIdxRange
from .wallet import Wallet
from .color import orange

View file

@ -20,8 +20,12 @@
main_wallet: Entry point for MMGen wallet-related scripts
"""
import os
from .common import *
import sys,os
import mmgen.opts as opts
from .globalvars import g
from .opts import opt
from .color import green,yellow
from .util import msg,qmsg,vmsg,gmsg_r,ymsg,bmsg,die,capfirst
from .wallet import Wallet,get_wallet_cls
usage = '[opts] [infile]'
@ -182,7 +186,9 @@ if invoked_as != 'chk':
if invoked_as == 'gen':
ss_in = None
else:
ss_in = Wallet(sf,passchg=(invoked_as=='passchg'))
ss_in = Wallet(
sf,
passchg = invoked_as=='passchg' )
m1 = green('Processing input wallet ')
m2 = ss_in.seed.sid.hl()
m3 = yellow(' (default wallet)') if sf and os.path.dirname(sf) == g.data_dir else ''
@ -198,12 +204,14 @@ if invoked_as != 'gen':
gmsg_r('Processing output wallet' + ('\n',' ')[invoked_as == 'seedsplit'])
if invoked_as == 'subgen':
ss_out = Wallet( seed_bin = ss_in.seed.subseed(ss_idx,print_msg=True).data )
ss_out = Wallet(
seed_bin = ss_in.seed.subseed(ss_idx,print_msg=True).data )
elif invoked_as == 'seedsplit':
shares = ss_in.seed.split(sss.count,sss.id,master_share)
seed_out = shares.get_share_by_idx(sss.idx,base_seed=True)
msg(seed_out.get_desc(ui=True))
ss_out = Wallet(seed=seed_out)
ss_out = Wallet(
seed = seed_out )
else:
ss_out = Wallet(
ss = ss_in,

View file

@ -376,10 +376,9 @@ class MnemonicEntry(object):
if validate:
self.bconv.tohex(words)
if self.has_chksum:
qmsg('Mnemonic is valid')
else:
qmsg('Mnemonic is well-formed (mnemonic format has no checksum to validate)')
qmsg(
'Mnemonic is valid' if self.has_chksum else
'Mnemonic is well-formed (mnemonic format has no checksum to validate)' )
return ' '.join(words)

View file

@ -79,12 +79,7 @@ def print_help(opt,opts_data,opt_filter):
remove_unneeded_long_opts()
from .ui import do_pager
do_pager(
mmgen.share.Opts.make_help(
proto,
opt,
opts_data,
opt_filter ))
do_pager(mmgen.share.Opts.make_help( proto, opt, opts_data, opt_filter ))
sys.exit(0)
@ -130,11 +125,11 @@ def opt_postproc_debug():
for k in a:
v = getattr(opt,k)
Msg(' {:18}: {!r:<6} [{}]'.format(k,v,type(v).__name__))
Msg(" Opts set to 'None':")
Msg(' {}\n'.format('\n '.join(b)))
Msg(' Global vars:')
for e in [d for d in dir(g) if d[:2] != '__']:
Msg(' {:<20}: {}'.format(e, getattr(g,e)))
Msg(" Opts set to 'None':")
Msg(' {}\n'.format('\n '.join(b)))
Msg('\n=== end opts.py debug ===\n')
def set_for_type(val,refval,desc,invert_bool=False,src=None):
@ -161,7 +156,12 @@ def set_for_type(val,refval,desc,invert_bool=False,src=None):
' in {!r}'.format(src) if src else '',
type(refval).__name__) )
def override_globals_from_cfg_file(ucfg,autoset_opts,env_globals,need_proto):
def override_globals_from_cfg_file(
ucfg,
autoset_opts,
env_globals,
need_proto ):
if need_proto:
from .protocol import init_proto
for d in ucfg.get_lines():
@ -563,13 +563,13 @@ def check_usr_opts(usr_opts): # Raises an exception if any check fails
opt_is_in_list(val,list(hash_presets.keys()),desc)
def chk_brain_params(key,val,desc):
from .seed import Seed
from .crypto import hash_presets
a = val.split(',')
if len(a) != 2:
opt_display(key,val)
die( 'UserOptError', 'Option requires two comma-separated arguments' )
opt_is_int(a[0],'seed length '+desc)
from .seed import Seed
opt_is_in_list(int(a[0]),Seed.lens,'seed length '+desc)
opt_is_in_list(a[1],list(hash_presets.keys()),'hash preset '+desc)
@ -649,11 +649,13 @@ def check_usr_opts(usr_opts): # Raises an exception if any check fails
Msg(f'check_usr_opts(): No test for opt {key!r}')
def set_auto_typeset_opts():
def do_set(key,val,ref_type):
setattr(opt,key,None if val is None else ref_type(val))
for key,ref_type in g.auto_typeset_opts.items():
if hasattr(opt,key):
val = getattr(opt,key)
if val is not None: # typeset only if opt is set
setattr(opt,key,ref_type(val))
do_set(key, getattr(opt,key), ref_type)
def get_autoset_opt(key,val,src):

View file

@ -22,6 +22,7 @@ passwdlist: Password list class for the MMGen suite
from collections import namedtuple
from .globalvars import g
from .util import ymsg,is_int,die
from .obj import ImmutableAttr,ListItemAttr,MMGenPWIDString,TwComment
from .key import PrivKey
@ -31,7 +32,6 @@ from .addrlist import (
AddrListIDStr,
AddrListEntryBase,
AddrList,
dmsg_sc,
)
class PasswordListEntry(AddrListEntryBase):
@ -64,7 +64,9 @@ class PasswordList(AddrList):
feature_warn_fs = 'WARNING: {!r} is a potentially dangerous feature. Use at your own risk!'
hex2bip39 = False
def __init__(self,proto,
def __init__(
self,
proto,
infile = None,
seed = None,
pw_idxs = None,
@ -72,10 +74,13 @@ class PasswordList(AddrList):
pw_len = None,
pw_fmt = None,
chk_params_only = False,
):
skip_chksum_msg = False ):
self.proto = proto # proto is ignored
if not g.debug_addrlist:
self.dmsg_sc = self.noop
if infile:
self.infile = infile
# sets self.pw_id_str, self.pw_fmt, self.pw_len, self.chk_func:
@ -101,7 +106,9 @@ class PasswordList(AddrList):
fs = f'{self.al_id.sid}-{self.pw_id_str}-{self.pw_fmt_disp}-{self.pw_len}[{{}}]'
self.id_str = AddrListIDStr(self,fs)
self.do_chksum_msg(record=not infile)
if not skip_chksum_msg:
self.do_chksum_msg(record=not infile)
def set_pw_fmt(self,pw_fmt):
if pw_fmt == 'hex2bip39':
@ -231,6 +238,6 @@ class PasswordList(AddrList):
pwlen = self.bip39.nwords2seedlen(self.pw_len,in_hex=True)
scramble_key = f'hex:{pwlen}:{self.pw_id_str}'
self.dmsg_sc('str',scramble_key)
from .crypto import scramble_seed
dmsg_sc('str',scramble_key)
return scramble_seed(seed,scramble_key.encode())

View file

@ -130,12 +130,12 @@ class bitcoin_cash_node_daemon(bitcoin_core_daemon):
exec_fn = 'bitcoind-bchn'
cli_fn = 'bitcoin-cli-bchn'
rpc_ports = _nw(8432, 18432, 18543) # use non-standard ports (core+100)
cfg_file_hdr = '# Bitcoin Cash Node config file\n'
nonstd_datadir = True
datadirs = {
'linux': [g.home_dir,'.bitcoin-bchn'],
'win': [os.getenv('APPDATA'),'Bitcoin_ABC']
}
cfg_file_hdr = '# Bitcoin Cash Node config file\n'
nonstd_datadir = True
def set_comment_args(self,rpc,coinaddr,lbl):
# bitcoin-{abc,bchn} 'setlabel' RPC is broken, so use old 'importaddress' method to set label
@ -160,8 +160,8 @@ class litecoin_core_daemon(bitcoin_core_daemon):
testnet_dir = 'testnet4'
rpc_ports = _nw(9332, 19332, 19443)
cfg_file = 'litecoin.conf'
cfg_file_hdr = '# Litecoin Core config file\n'
datadirs = {
'linux': [g.home_dir,'.litecoin'],
'win': [os.getenv('APPDATA'),'Litecoin']
}
cfg_file_hdr = '# Litecoin Core config file\n'

View file

@ -247,9 +247,10 @@ class Base(TxBase.Base):
ret = (old_size * 3 + new_size) // 4
dmsg('\nData from estimate_size():')
dmsg(f' inputs size: {isize}, outputs size: {osize}, witness size: {wsize}')
dmsg(f' size: {new_size}, vsize: {ret}, old_size: {old_size}')
dmsg(
'\nData from estimate_size():\n' +
f' inputs size: {isize}, outputs size: {osize}, witness size: {wsize}\n' +
f' size: {new_size}, vsize: {ret}, old_size: {old_size}' )
return int(ret * (opt.vsize_adj or 1))

View file

@ -32,7 +32,10 @@ class New(Base,TxBase.New):
async def get_rel_fee_from_network(self):
try:
ret = await self.rpc.call('estimatesmartfee',opt.fee_estimate_confs,opt.fee_estimate_mode.upper())
ret = await self.rpc.call(
'estimatesmartfee',
opt.fee_estimate_confs,
opt.fee_estimate_mode.upper() )
fee_per_kb = ret['feerate'] if 'feerate' in ret else -2
fe_type = 'estimatesmartfee'
except:

View file

@ -1,5 +1,4 @@
async def erigon_sleep(self):
from ...globalvars import g
if self.proto.network == 'regtest' and self.rpc.daemon.id == 'erigon':
import asyncio
await asyncio.sleep(5)

View file

@ -21,7 +21,6 @@ proto.eth.addrdata: Ethereum TwAddrData classes
"""
from ...addrdata import TwAddrData
from ...util import vmsg
class EthereumTwAddrData(TwAddrData):

View file

@ -94,12 +94,12 @@ class geth_daemon(ethereum_daemon):
exec_fn = 'geth'
use_pidfile = False
use_threads = True
avail_opts = ('no_daemonize','online')
version_info_arg = 'version'
datadirs = {
'linux': [g.home_dir,'.ethereum','geth'],
'win': [os.getenv('LOCALAPPDATA'),'Geth'] # FIXME
}
avail_opts = ('no_daemonize','online')
version_info_arg = 'version'
def init_subclass(self):
@ -131,11 +131,11 @@ class erigon_daemon(geth_daemon):
exec_fn = 'erigon'
private_ports = _nw(9090,9091,9092) # testnet and regtest are non-standard
torrent_ports = _nw(42069,42070,None) # testnet is non-standard
version_info_arg = '--version'
datadirs = {
'linux': [g.home_dir,'.local','share','erigon'],
'win': [os.getenv('LOCALAPPDATA'),'Erigon'] # FIXME
}
version_info_arg = '--version'
def init_subclass(self):

View file

@ -31,6 +31,8 @@ class coin_msg(coin_msg):
async def do_verify(self,addr,sig,message,msghash_type):
from ...tool.coin import tool_cmd
from .misc import ec_recover_pubkey
return tool_cmd(proto=self.proto).pubhex2addr(ec_recover_pubkey( message, sig, msghash_type )) == addr
return tool_cmd(
proto = self.proto).pubhex2addr(
ec_recover_pubkey( message, sig, msghash_type )) == addr
class exported_sigs(coin_msg.exported_sigs,signed_online): pass

View file

@ -173,6 +173,7 @@ class EthereumTokenTwCtl(EthereumTwCtl):
cur_eth_balances = {}
async def __init__(self,proto,mode='r',token_addr=None):
await super().__init__(proto,mode=mode)
for v in self.data['tokens'].values():

View file

@ -120,7 +120,12 @@ class MoneroWalletDaemon(RPCDaemon):
self.proxy = proxy
self.daemon_addr = daemon_addr
self.daemon_port = None if daemon_addr else CoinDaemon(proto=proto,test_suite=test_suite).rpc_port
self.daemon_port = (
None if daemon_addr else
CoinDaemon(
proto = proto,
test_suite = test_suite).rpc_port
)
self.host = host or opt.wallet_rpc_host or g.monero_wallet_rpc_host
self.user = user or opt.wallet_rpc_user or g.monero_wallet_rpc_user
@ -151,4 +156,6 @@ class MoneroWalletDaemon(RPCDaemon):
)
from .rpc import MoneroWalletRPCClient
self.rpc = MoneroWalletRPCClient( daemon=self, test_connection=False )
self.rpc = MoneroWalletRPCClient(
daemon = self,
test_connection = False )

View file

@ -230,7 +230,14 @@ class CoinProtocol(MMGenObject):
pubkey_type = self.pubkey_type,
compressed = False )
def init_proto(coin=None,testnet=False,regtest=False,network=None,network_id=None,tokensym=None,need_amt=False):
def init_proto(
coin = None,
testnet = False,
regtest = False,
network = None,
network_id = None,
tokensym = None,
need_amt = False ):
assert type(testnet) == bool, 'init_proto_chk1'
assert type(regtest) == bool, 'init_proto_chk2'

View file

@ -32,18 +32,19 @@ from .objmethods import Hilite,InitErrors,MMGenObject
auth_data = namedtuple('rpc_auth_data',['user','passwd'])
def dmsg_rpc(fs,data=None,is_json=False):
if g.debug_rpc:
msg(
fs if data == None else
fs.format(pp_fmt(json.loads(data) if is_json else data))
)
msg(
fs if data == None else
fs.format(pp_fmt(json.loads(data) if is_json else data))
)
def dmsg_rpc_backend(host_url,host_path,payload):
if g.debug_rpc:
msg(
f'\n RPC URL: {host_url}{host_path}' +
f'\n RPC PAYLOAD data (httplib) ==>' +
f'\n{pp_fmt(payload)}\n' )
msg(
f'\n RPC URL: {host_url}{host_path}' +
f'\n RPC PAYLOAD data (httplib) ==>' +
f'\n{pp_fmt(payload)}\n' )
def noop(*args,**kwargs):
pass
class IPPort(str,Hilite,InitErrors):
color = 'yellow'
@ -266,6 +267,10 @@ class RPCClient(MMGenObject):
if g.platform == 'win' and host == 'localhost':
host = '127.0.0.1'
global dmsg_rpc,dmsg_rpc_backend
if not g.debug_rpc:
dmsg_rpc = dmsg_rpc_backend = noop
dmsg_rpc(f'=== {type(self).__name__}.__init__() debug ===')
dmsg_rpc(f' cls [{type(self).__name__}] host [{host}] port [{port}]\n')

View file

@ -135,7 +135,8 @@ class SeedShareList(SubSeedList):
return self.get_subseed_by_seed_id(sid)
def join(self):
return Seed.join_shares(self.get_share_by_idx(i+1) for i in range(len(self)))
return Seed.join_shares(
[self.get_share_by_idx(i+1) for i in range(len(self))] )
def format(self):
assert self.split_type == 'N-of-N'
@ -218,7 +219,9 @@ class SeedShareLast(SeedShareBase,SeedBase):
def __init__(self,parent_list):
self.idx = parent_list.count
self.parent_list = parent_list
SeedBase.__init__(self,seed_bin=self.make_subseed_bin(parent_list))
SeedBase.__init__(
self,
seed_bin=self.make_subseed_bin(parent_list) )
@staticmethod
def make_subseed_bin(parent_list):
@ -240,9 +243,12 @@ class SeedShareMaster(SeedBase,SeedShareBase):
self.idx = idx
self.nonce = nonce
self.parent_list = parent_list
SeedBase.__init__(self,self.make_base_seed_bin())
self.derived_seed = SeedBase(self.make_derived_seed_bin(parent_list.id_str,parent_list.count))
SeedBase.__init__( self, self.make_base_seed_bin() )
self.derived_seed = SeedBase(
self.make_derived_seed_bin( parent_list.id_str, parent_list.count )
)
@property
def fn_stem(self):
@ -275,13 +281,18 @@ class SeedShareMasterJoining(SeedShareMaster):
count = ImmutableAttr(SeedShareCount)
def __init__(self,idx,base_seed,id_str,count):
SeedBase.__init__(self,seed_bin=base_seed.data)
self.id_str = id_str or 'default'
self.count = count
self.derived_seed = SeedBase(self.make_derived_seed_bin(self.id_str,self.count))
self.derived_seed = SeedBase( self.make_derived_seed_bin(self.id_str,self.count) )
def join_shares(
seed_list,
master_idx = None,
id_str = None ):
def join_shares(seed_list,master_idx=None,id_str=None):
if not hasattr(seed_list,'__next__'): # seed_list can be iterator or iterable
seed_list = iter(seed_list)
@ -305,5 +316,6 @@ def join_shares(seed_list,master_idx=None,id_str=None):
if master_idx:
add_share(SeedShareMasterJoining(master_idx,master_share,id_str,d.count+1).derived_seed)
SeedShareCount(d.count)
SeedShareCount(d.count) # check that d.count is in valid range
return Seed(seed_bin=d.ret.to_bytes(d.byte_len,'big'))

View file

@ -65,7 +65,9 @@ class SubSeed(SeedBase):
self.nonce = nonce
self.ss_idx = str(idx) + { 'long': 'L', 'short': 'S' }[length]
self.parent_list = parent_list
SeedBase.__init__(self,seed_bin=type(self).make_subseed_bin(parent_list,idx,nonce,length))
SeedBase.__init__(
self,
seed_bin=self.make_subseed_bin( parent_list, idx, nonce, length ))
@staticmethod
def make_subseed_bin(parent_list,idx:int,nonce:int,length:str):

View file

@ -33,12 +33,12 @@ class tool_cmd(tool_cmd_base):
super().__init__(cmdname=cmdname,proto=proto,mmtype=mmtype)
def _file_chksum(self,mmgen_addrfile,obj):
kwargs = {'skip_chksum_msg':True}
if not obj.__name__ == 'PasswordList':
kwargs.update({'key_address_validity_check':False})
ret = obj( self.proto, mmgen_addrfile, **kwargs )
from ..opts import opt
verbose,yes,quiet = [bool(i) for i in (opt.verbose,opt.yes,opt.quiet)]
opt.verbose,opt.yes,opt.quiet = (False,True,True)
ret = obj(self.proto,mmgen_addrfile)
opt.verbose,opt.yes,opt.quiet = (verbose,yes,quiet)
if verbose:
if opt.verbose:
from ..util import msg,capfirst
if ret.al_id.mmtype.name == 'password':
msg('Passwd fmt: {}\nPasswd len: {}\nID string: {}'.format(

View file

@ -60,8 +60,7 @@ class tool_cmd(tool_cmd_base):
use the hex2wif command.
"""
@staticmethod
def _xmr_reduce(bytestr):
def _xmr_reduce(self,bytestr):
from ..protocol import init_proto
proto = init_proto('xmr')
if len(bytestr) != proto.privkey_len:

View file

@ -31,13 +31,14 @@ class tool_cmd(tool_cmd_base):
"key, address or subseed generation from an MMGen wallet"
def __init__(self,cmdname=None,proto=None,mmtype=None):
if cmdname in ('gen_key','gen_addr'):
self.need_proto = True
self.need_proto = cmdname in ('gen_key','gen_addr')
super().__init__(cmdname=cmdname,proto=proto,mmtype=mmtype)
def _get_seed_file(self,wallet):
from ..fileutil import get_seed_file
return get_seed_file([wallet] if wallet else [],1)
return get_seed_file(
wallets = [wallet] if wallet else [],
nargs = 1 )
def get_subseed(self,subseed_idx:str,wallet=''):
"get the Seed ID of a single subseed by Subseed Index for default or specified wallet"
@ -89,6 +90,7 @@ class tool_cmd(tool_cmd_base):
proto = self.proto,
seed = ss.seed,
addr_idxs = AddrIdxList(str(addr.idx)),
mmtype = addr.mmtype ).data[0]
mmtype = addr.mmtype,
skip_chksum = True ).data[0]
return { 'wif': d.sec.wif, 'addr': d.addr }[target]

View file

@ -109,8 +109,10 @@ class TwJSON:
check_network(d['data'])
check_chksum(d)
compare_or_die(
self.mappings_chksum, 'computed mappings checksum',
d['data']['mappings_checksum'], 'saved checksum' )
val1 = self.mappings_chksum,
val2 = d['data']['mappings_checksum'],
desc1 = 'computed mappings checksum',
desc2 = 'saved checksum' )
if not await self.check_and_create_wallet():
return True
@ -189,10 +191,10 @@ class TwJSON:
from ..fileutil import write_data_to_file
write_data_to_file(
outfile = self.dump_fn,
data = self.json_dump(
data = self.json_dump(
{
'checksum': self.make_chksum(data),
'data': data
},
pretty = pretty ),
desc = f'tracking wallet JSON data' )
desc = f'tracking wallet JSON data' )

View file

@ -611,20 +611,20 @@ class TwView(MMGenObject,metaclass=AsyncInit):
('' if parent.proto.network == 'mainnet' else '-'+parent.proto.network.upper()),
','.join(parent.sort_info(include_group=False)).replace(' ','') )
from ..fileutil import write_data_to_file
from ..exception import UserNonConfirmation
print_hdr = getattr(parent.display_type,output_type).print_header.format(parent.cols)
msg_r(parent.blank_prompt if parent.scroll else '\n')
from ..fileutil import write_data_to_file
from ..exception import UserNonConfirmation
try:
write_data_to_file(
outfile = outfile,
data = print_hdr + await parent.format(
display_type = output_type,
data = print_hdr + await parent.format(
display_type = output_type,
line_processing = 'print',
color = False ),
desc = f'{parent.desc} listing' )
color = False ),
desc = f'{parent.desc} listing' )
except UserNonConfirmation as e:
parent.oneshot_msg = yellow(f'File {outfile!r} not overwritten by user request')
else:
@ -652,7 +652,8 @@ class TwView(MMGenObject,metaclass=AsyncInit):
from ..ui import line_input
while True:
msg_r(parent.blank_prompt if parent.scroll else '\n')
ret = line_input(f'Enter {parent.item_desc} number (or ENTER to return to main menu): ')
ret = line_input(
f'Enter {parent.item_desc} number (or ENTER to return to main menu): ' )
if ret == '':
if parent.scroll:
msg_r( CUR_UP(1) + '\r' + ''.ljust(parent.term_width) )
@ -685,7 +686,7 @@ class TwView(MMGenObject,metaclass=AsyncInit):
async def i_balance_refresh(self,parent,idx):
if not parent.keypress_confirm(
f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?'):
f'Refreshing tracking wallet {parent.item_desc} #{idx}. Is this what you want?' ):
return 'redo'
await parent.twctl.get_balance( parent.disp_data[idx-1].addr, force_rpc=True )
await parent.get_data()
@ -739,7 +740,8 @@ class TwView(MMGenObject,metaclass=AsyncInit):
parent.oneshot_msg = yellow(f'Label for {desc} unchanged')
return None
elif res == '':
if not parent.keypress_confirm(f'Removing label for {desc}. Is this what you want?'):
if not parent.keypress_confirm(
f'Removing label for {desc}. Is this what you want?' ):
return 'redo'
return await do_comment_add(res)

View file

@ -113,7 +113,7 @@ class Base(MMGenObject):
self.inputs = self.InputList(self)
self.outputs = self.OutputList(self)
self.name = type(self).__name__
self.proto = kwargs.get('proto')
self.proto = kwargs['proto']
self.twctl = kwargs.get('twctl')
@property

View file

@ -148,8 +148,6 @@ class MMGenTxFile(MMGenObject):
if tx.get_serialized_locktime():
yield ',tl={}'.format(tx.get_serialized_locktime())
yield ']'
if g.debug_utf8:
yield ''
if tx.proto.testnet:
yield '.' + tx.proto.network
yield '.' + tx.ext

View file

@ -250,9 +250,9 @@ class New(Base):
ad_f = AddrData(self.proto)
from ..fileutil import check_infile
for a in addrfiles:
check_infile(a)
ad_f.add(AddrList(self.proto,a))
for addrfile in addrfiles:
check_infile(addrfile)
ad_f.add(AddrList( self.proto, addrfile ))
ad_w = await TwAddrData(self.proto,twctl=self.twctl)
@ -406,6 +406,7 @@ class New(Base):
if do_info:
del self.twuo.twctl
import sys
sys.exit(0)
outputs_sum = self.sum_outputs()

View file

@ -20,7 +20,9 @@
tx.sign: Sign a transaction generated by 'mmgen-txcreate'
"""
from ..common import *
from ..globalvars import g
from ..opts import opt
from ..util import msg,vmsg,qmsg,suf,fmt,die,remove_dups,get_extension
from ..obj import MMGenList
from ..addr import MMGenAddrType
from ..addrlist import AddrIdxList,KeyAddrList

View file

@ -29,12 +29,14 @@ def confirm_or_raise(message,action,expect='YES',exit_msg='Exiting at user reque
def get_words_from_user(prompt):
words = line_input(prompt, echo=opt.echo_passphrase).split()
dmsg('Sanitized input: [{}]'.format(' '.join(words)))
if g.debug:
msg('Sanitized input: [{}]'.format(' '.join(words)))
return words
def get_data_from_user(desc='data'): # user input MUST be UTF-8
data = line_input(f'Enter {desc}: ',echo=opt.echo_passphrase)
dmsg(f'User input: [{data}]')
if g.debug:
msg(f'User input: [{data}]')
return data
def line_input(prompt,echo=True,insert_txt='',hold_protect=True):
@ -80,7 +82,12 @@ def line_input(prompt,echo=True,insert_txt='',hold_protect=True):
return reply.strip()
def keypress_confirm(prompt,default_yes=False,verbose=False,no_nl=False,complete_prompt=False):
def keypress_confirm(
prompt,
default_yes = False,
verbose = False,
no_nl = False,
complete_prompt = False ):
if not complete_prompt:
prompt = '{} {}: '.format( prompt, '(Y/n)' if default_yes else '(y/N)' )

View file

@ -40,7 +40,6 @@ def get_keccak(cached_ret=[]):
if not cached_ret:
from .opts import opt
# called in opts.init() via CoinProtocol, so must use getattr():
if getattr(opt,'use_internal_keccak_module',False):
qmsg('Using internal keccak module by user request')
from .contrib.keccak import keccak_256

View file

@ -95,7 +95,7 @@ def format_fmt_codes():
('Format','FileExt','Valid codes'),
('------','-------','-----------')
] + sorted(d) ]
return '\n'.join(ret) + ('','')[g.debug_utf8] + '\n'
return '\n'.join(ret) + '\n'
def _get_me(modname):
return MMGenObject.__new__( getattr( importlib.import_module(f'mmgen.wallet.{modname}'), 'wallet' ) )

View file

@ -75,7 +75,10 @@ class wallet(MMGenObject,metaclass=WalletMeta):
def _get_data(self):
if hasattr(self,'infile'):
from ..fileutil import get_data_from_file
self.fmt_data = get_data_from_file(self.infile.name,self.desc,binary=self.file_mode=='binary')
self.fmt_data = get_data_from_file(
self.infile.name,
self.desc,
binary = self.file_mode=='binary' )
elif self.in_data:
self.fmt_data = self.in_data
else:

View file

@ -37,9 +37,6 @@ class wallet(wallet):
def _decrypt(self):
d = self.ssdata
if opt.brain_params:
"""
Don't set opt.seed_len! When using multiple wallets, BW seed len might differ from others
"""
bw_seed_len,d.hash_preset = self.get_bw_params()
else:
if not opt.seed_len:

View file

@ -30,15 +30,15 @@ class wallet(wallet):
die(2,'Passphrase from password file, so exiting')
msg('Trying again...')
def _get_hash_preset_from_user(self,hp,add_desc=''):
def _get_hash_preset_from_user(self,old_preset,add_desc=''):
prompt = 'Enter {}hash preset for {}{}{},\nor hit ENTER to {} value ({!r}): '.format(
('old ' if self.op=='pwchg_old' else 'new ' if self.op=='pwchg_new' else ''),
('','new ')[self.op=='new'],
self.desc,
('',' '+add_desc)[bool(add_desc)],
('accept the default','reuse the old')[self.op=='pwchg_new'],
hp )
return crypto.get_hash_preset_from_user( hash_preset=hp, prompt=prompt )
old_preset )
return crypto.get_hash_preset_from_user( old_preset=old_preset, prompt=prompt )
def _get_hash_preset(self,add_desc=''):
if hasattr(self,'ss_in') and hasattr(self.ss_in.ssdata,'hash_preset'):
@ -57,7 +57,9 @@ class wallet(wallet):
hp = opt.hash_preset
qmsg(f'Using hash preset {hp!r} requested on command line')
else:
hp = self._get_hash_preset_from_user(g.dfl_hash_preset,add_desc)
hp = self._get_hash_preset_from_user(
old_preset = g.dfl_hash_preset,
add_desc = add_desc )
self.ssdata.hash_preset = hp
def _get_new_passphrase(self):

View file

@ -110,14 +110,13 @@ class wallet(wallet):
def _filename(self):
s = self.seed
d = self.ssdata
return '{}-{}-{}[{},{}]{x}.{}'.format(
s.fn_stem,
d.key_id,
d.iv_id,
s.bitlen,
d.hash_preset,
self.ext,
x='' if g.debug_utf8 else '')
return '{}-{}-{}[{},{}].{}'.format(
s.fn_stem,
d.key_id,
d.iv_id,
s.bitlen,
d.hash_preset,
self.ext )
def _deformat(self):

View file

@ -85,8 +85,11 @@ class wallet(wallet):
def write_to_file(self):
d = self.ssdata
self._format()
compare_or_die(d.target_data_len, 'target data length',
len(self.fmt_data),'length of formatted ' + self.desc)
compare_or_die(
val1 = d.target_data_len,
desc1 = 'target data length',
val2 = len(self.fmt_data),
desc2 = 'length of formatted ' + self.desc )
k = ('output','input')[self.op=='pwchg_new']
fn,d.hincog_offset = self._get_hincog_params(k)

View file

@ -183,10 +183,9 @@ class wallet(wallet):
def _filename(self):
s = self.seed
d = self.ssdata
return '{}-{}[{},{}]{x}.{}'.format(
s.fn_stem,
d.key_id,
s.bitlen,
d.hash_preset,
self.ext,
x='' if g.debug_utf8 else '')
return '{}-{}[{},{}].{}'.format(
s.fn_stem,
d.key_id,
s.bitlen,
d.hash_preset,
self.ext )

View file

@ -35,8 +35,9 @@ class wallet(wallet):
from ..ui import get_data_from_user
return get_data_from_user(desc)
from ..mn_entry import mn_entry # import here to catch cfg var errors
mn_len = self._choose_seedlen( self.mn_lens )
from ..mn_entry import mn_entry
return mn_entry(self.wl_id).get_mnemonic_from_user(mn_len)
def _format(self):
@ -76,8 +77,13 @@ class wallet(wallet):
msg('Invalid mnemonic (produces too large a number)')
return False
# Internal error, so just die
compare_or_die( ' '.join(rev), 'recomputed mnemonic', ' '.join(mn), 'original', e='Internal error' )
# Internal error, so just die:
compare_or_die(
val1 = ' '.join(rev),
val2 = ' '.join(mn),
desc1 = 'recomputed mnemonic',
desc2 = 'original mnemonic',
e = 'Internal error' )
self.seed = Seed(bytes.fromhex(hexseed))
self.ssdata.mnemonic = mn

View file

@ -27,11 +27,10 @@ class wallet(wallet):
def _filename(self):
s = self.seed
return '{}[{}]{x}.{}'.format(
return '{}[{}].{}'.format(
s.fn_stem,
s.bitlen,
self.ext,
x='' if g.debug_utf8 else '')
self.ext )
def _choose_seedlen(self,ok_lens):
@ -54,5 +53,8 @@ class wallet(wallet):
usr_len = choose_len()
prompt = self.choose_seedlen_confirm.format(usr_len)
from ..ui import keypress_confirm
if keypress_confirm(prompt,default_yes=True,no_nl=not g.test_suite):
if keypress_confirm(
prompt,
default_yes = True,
no_nl = not g.test_suite ):
return usr_len

View file

@ -22,9 +22,28 @@ xmrwallet.py - MoneroWalletOps class
import os,re,time,json
from collections import namedtuple
from .common import *
from .globalvars import g
from .opts import opt
from .objmethods import MMGenObject,Hilite,InitErrors
from .obj import CoinTxID
from .color import red,yellow,green,blue,cyan,pink,orange
from .util import (
msg,
msg_r,
gmsg,
ymsg,
gmsg_r,
pp_msg,
die,
fmt,
suf,
async_run,
make_timestr,
make_chksum_6,
capfirst,
stdout_or_pager,
)
from .seed import SeedID
from .protocol import init_proto
from .proto.btc.common import b58a
@ -183,15 +202,23 @@ class MoneroMMGenTX:
(lambda s: '' if s == 'mainnet' else f'.{s}')(self.data.network),
)
from .fileutil import write_data_to_file
write_data_to_file(fn,out,desc='MoneroMMGenTX data',ask_write=True,ask_write_default_yes=False)
write_data_to_file(
outfile = fn,
data = out,
desc = 'MoneroMMGenTX data',
ask_write = True,
ask_write_default_yes = False )
class NewSigned(Base):
def __init__(self,*args,**kwargs):
assert not args, 'Non-keyword args not permitted'
d = namedtuple('kwargs_tuple',kwargs)(**kwargs)
proto = init_proto( 'xmr', network=d.network, need_amt=True )
now = int(time.time())
self.data = self.xmrwallet_tx_data(
op = d.op,
create_time = now,
@ -370,7 +397,10 @@ class MoneroWalletOps:
daemon_addr = uopt.daemon or None,
)
self.c = MoneroWalletRPCClient(daemon=self.wd,test_connection=False)
self.c = MoneroWalletRPCClient(
daemon = self.wd,
test_connection = False,
)
if not uopt.no_start_wallet_daemon:
async_run(self.c.restart_daemon())
@ -390,11 +420,10 @@ class MoneroWalletOps:
def get_wallet_fn(self,d):
return os.path.join(
uopt.wallet_dir or '.','{}-{}-MoneroWallet{}{}'.format(
uopt.wallet_dir or '.','{}-{}-MoneroWallet{}'.format(
self.kal.al_id.sid,
d.idx,
f'.{g.network}' if g.network != 'mainnet' else '',
'' if g.debug_utf8 else '' ))
f'.{g.network}' if g.network != 'mainnet' else ''))
async def main(self):
gmsg('\n{}ing {} wallet{}'.format(
@ -646,9 +675,11 @@ class MoneroWalletOps:
port = int(port),
user = None,
passwd = None )
self.accts_data = {}
async def process_wallet(self,d,fn,last):
chain_height = self.dc.call_raw('get_height')['height']
msg(f' Chain height: {chain_height}')
@ -823,7 +854,8 @@ class MoneroWalletOps:
wd2.start()
self.c = MoneroWalletRPCClient(daemon=wd2)
self.c = MoneroWalletRPCClient(
daemon = wd2 )
async def main(self):

View file

@ -187,9 +187,9 @@ def create_src(proto,template,token_data,owner_addr):
val = '0x' + owner_addr
else:
val = (
getattr(opt,k) if getattr(opt,k,None) else
field.default if field.default is not None else
die(1,f'The --{k} option must be specified')
getattr(opt,k)
or getattr(field,'default',None)
or die(1,f'The --{k} option must be specified')
)
if not field.test(val):
die(1,f'{val!r}: invalid parameter for option --{k}')

View file

@ -4,9 +4,8 @@
# file, as all names will be seen by the exec'ed code. To prevent name collisions, all names
# defined here should begin with 'exec_wrapper_'
import sys,os,time
def exec_wrapper_get_colors():
import os
from collections import namedtuple
return namedtuple('colors',['red','green','yellow','blue','purple'])(*[
(lambda s:s) if os.getenv('MMGEN_DISABLE_COLOR') else
@ -15,6 +14,7 @@ def exec_wrapper_get_colors():
def exec_wrapper_init(): # don't change: name is used to test if script is running under exec_wrapper
import os
if os.path.dirname(sys.argv[1]) == 'test': # scripts in ./test do overlay setup themselves
sys.path[0] = 'test'
else:
@ -39,6 +39,7 @@ def exec_wrapper_write_traceback(e,exit_val):
'{}: {}'.format( type(e).__name__, e ))
c = exec_wrapper_get_colors()
import os
if os.getenv('EXEC_WRAPPER_TRACEBACK'):
import traceback
@ -73,12 +74,15 @@ def exec_wrapper_write_traceback(e,exit_val):
sys.stdout.write( c.purple((f'NONZERO_EXIT[{exit_val}]: ' if exit_val else '') + exc_line) + '\n' )
def exec_wrapper_end_msg():
import os
if os.getenv('EXEC_WRAPPER_SPAWN') and not os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC'):
c = exec_wrapper_get_colors()
# write to stdout to ensure script output gets to terminal first
import time
sys.stdout.write(c.blue('Runtime: {:0.5f} secs\n'.format(time.time() - exec_wrapper_tstart)))
def exec_wrapper_tracemalloc_setup():
import os
if os.getenv('MMGEN_TRACEMALLOC'):
os.environ['PYTHONTRACEMALLOC'] = '1'
import tracemalloc
@ -86,6 +90,7 @@ def exec_wrapper_tracemalloc_setup():
sys.stderr.write("INFO → Appending memory allocation stats to 'tracemalloc.log'\n")
def exec_wrapper_tracemalloc_log():
import os
if os.getenv('MMGEN_TRACEMALLOC'):
import tracemalloc,re
snapshot = tracemalloc.take_snapshot()
@ -107,13 +112,20 @@ def exec_wrapper_tracemalloc_log():
s = sum(stat.size for stat in stats) / 1024,
w = col1w ))
def exec_wrapper_get_tstart():
import time
return time.time()
import sys # this is the only module we import into namespace of exec’ed code
exec_wrapper_init() # sets sys.path[0], runs overlay_setup()
exec_wrapper_tstart = time.time()
exec_wrapper_tracemalloc_setup()
from mmgen.devinit import init_dev # import mmgen mods only after overlay setup!
init_dev()
exec_wrapper_tstart = exec_wrapper_get_tstart()
try:
sys.argv.pop(0)
exec_wrapper_execed_file = sys.argv[0]

View file

@ -18,6 +18,10 @@
import sys,os
import mmgen.opts as opts
from mmgen.globalvars import g
from mmgen.util import msg,die
def normalize_path(p):
return os.path.normpath(os.path.realpath(os.path.abspath(p)))
@ -27,15 +31,14 @@ for n in reversed(range(len(sys.path))):
if normalize_path(sys.path[n]) == curdir:
del(sys.path[n])
try: import mmgen.main
try:
import mmgen.main
except:
sys.stderr.write('Failed to import mmgen.main module. Is MMGen installed?\n')
sys.exit(1)
modpath_save = sys.modules['mmgen.main'].__spec__.origin
from mmgen.common import *
opts_data = {
'text': {
'desc': 'Remove MMGen from your system',

View file

@ -20,15 +20,18 @@
test/gentest.py: Cryptocoin key/address generation tests for the MMGen suite
"""
import sys,os
import sys,os,time
from include.tests_header import repo_root
from test.overlay import overlay_setup
sys.path.insert(0,overlay_setup(repo_root))
# Import these _after_ local path's been added to sys.path
from mmgen.common import *
from test.include.common import getrand,get_ethkey
import mmgen.opts as opts
from mmgen.globalvars import g
from mmgen.opts import opt
from mmgen.color import green,red,purple
from mmgen.util import msg,qmsg,qmsg_r,vmsg,capfirst,is_int,die
results_file = 'gentest.out.json'
@ -133,9 +136,6 @@ SUPPORTED EXTERNAL TOOLS:
}
}
gtr = namedtuple('gen_tool_result',['wif','addr','viewkey'])
sd = namedtuple('saved_data_item',['reduced','wif','addr','viewkey'])
def get_cmd_output(cmd,input=None):
return run(cmd,input=input,stdout=PIPE,stderr=DEVNULL).stdout.decode().splitlines()
@ -466,7 +466,6 @@ def parse_args():
opts.usage()
arg1,arg2 = cmd_args
cfg = namedtuple('parsed_args',['test','gen1','gen2','rounds','tool','all_backends','dumpfile'])
gen1,gen2,rounds = (0,0,0)
tool,all_backends,dumpfile = (None,None,None)
@ -509,7 +508,7 @@ def parse_args():
if b not in ext_progs:
die(1,f'Second part of first argument must be a generator backend number or one of {ext_progs}')
return cfg(
return namedtuple('parsed_args',['test','gen1','gen2','rounds','tool','all_backends','dumpfile'])(
test,
int(gen1) or None,
int(gen2) or None,
@ -550,6 +549,11 @@ from mmgen.addr import MMGenAddrType
from mmgen.addrgen import KeyGenerator,AddrGenerator
from mmgen.keygen import get_backends
from test.include.common import getrand,get_ethkey
gtr = namedtuple('gen_tool_result',['wif','addr','viewkey'])
sd = namedtuple('saved_data_item',['reduced','wif','addr','viewkey'])
sys.argv = [sys.argv[0]] + ['--skip-cfg-file'] + sys.argv[1:]
cmd_args = opts.init(opts_data)

View file

@ -26,11 +26,14 @@ opts_data = {
-q, --quiet Be quieter
-X, --cached-balances Use cached balances (Ethereum only)
-v, --verbose Be more verbose
sample help_note: {kgs}
sample help_note: {coin_id}
""",
'notes': """
NOTES FOR THIS COMMAND
{nn}
sample note: {nn}
"""
},
'code': {

View file

@ -5,11 +5,12 @@ if overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA'):
class overlay_fake_data:
async def get_rpc_data(foo):
async def get_rpc_data(self):
from decimal import Decimal
import json
from ....fileutil import get_data_from_file
return json.loads(get_data_from_file(
overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA')),parse_float=Decimal)
overlay_fake_os.getenv('MMGEN_BOGUS_UNSPENT_DATA')
), parse_float=Decimal)
BitcoinTwUnspentOutputs.get_rpc_data = overlay_fake_data.get_rpc_data

View file

@ -153,11 +153,11 @@ init_tests() {
d_btc="overall operations with emulated RPC data (Bitcoin)"
t_btc="
- $python scripts/compute-file-chksum.py $REFDIR/*testnet.rawtx >/dev/null 2>&1
- $test_py --exclude regtest,autosign,ref_altcoin
- $test_py --segwit
- $test_py --segwit-random
- $test_py --bech32
- $python scripts/compute-file-chksum.py $REFDIR/*testnet.rawtx >/dev/null 2>&1
"
d_btc_tn="overall operations with emulated RPC data (Bitcoin testnet)"

View file

@ -343,7 +343,6 @@ def set_environ_for_spawned_scripts():
os.environ['PYTHONPATH'] = repo_root
os.environ['MMGEN_NO_LICENSE'] = '1'
os.environ['MMGEN_MIN_URANDCHARS'] = '3'
os.environ['MMGEN_BOGUS_SEND'] = '1'
os.environ['MMGEN_TEST_SUITE_PEXPECT'] = '1'
@ -704,7 +703,7 @@ class TestSuiteRunner(object):
self.ts = self.gm.gm_init_group(self,gname,sg_name,self.spawn_wrapper)
self.ts_clsname = type(self.ts).__name__
# only pass through opts that are explicitly set on cmdline (po.user_opts)
# pass through opts from cmdline (po.user_opts)
self.passthru_opts = ['--{}{}'.format(
k.replace('_','-'),
'=' + getattr(opt,k) if getattr(opt,k) != True else ''

View file

@ -70,7 +70,8 @@ chksum_pat = r'\b[A-F0-9]{4} [A-F0-9]{4} [A-F0-9]{4} [A-F0-9]{4}\b'
Ctrl_U = '\x15'
def ok_msg():
if opt.profile: return
if opt.profile:
return
sys.stderr.write(green('\nOK\n') if opt.exact_output or opt.verbose else ' OK\n')
def skip(name,reason=None):
@ -79,8 +80,12 @@ def skip(name,reason=None):
def confirm_continue():
from mmgen.ui import keypress_confirm
if keypress_confirm(blue('Continue? (Y/n): '),default_yes=True,complete_prompt=True):
if opt.verbose or opt.exact_output: sys.stderr.write('\n')
if keypress_confirm(
blue('Continue? (Y/n): '),
default_yes = True,
complete_prompt = True ):
if opt.verbose or opt.exact_output:
sys.stderr.write('\n')
else:
raise KeyboardInterrupt('Exiting at user request')
@ -104,8 +109,11 @@ def get_file_with_ext(tdir,ext,delete=True,no_dot=False,return_list=False,delete
dot = ('.','')[bool(no_dot)]
flist = [os.path.join(tdir,f) for f in os.listdir(tdir) if f == ext or f[-len(dot+ext):] == dot+ext]
if not flist: return False
if return_list: return flist
if not flist:
return False
if return_list:
return flist
if len(flist) > 1 or delete_all:
if delete or delete_all:
@ -127,21 +135,21 @@ def get_comment(do_shuffle=False):
"Healthcare",
tx_comment_jp[:40],
tx_comment_zh[:40],
"Alice's allowance",
"Bob's bequest",
"Alices allowance",
"Bobs bequest",
"House purchase",
"Real estate fund",
"Job 1",
"XYZ Corp.",
"Eddie's endowment",
"Eddies endowment",
"Emergency fund",
"Real estate fund",
"Ian's inheritance",
"Ians inheritance",
"",
"Rainy day",
"Fred's funds",
"Freds funds",
"Job 2",
"Carl's capital",
"Carls capital",
]
from random import shuffle
global label_iter

View file

@ -205,12 +205,12 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
def _get_addrfile_checksum(self,display=False):
addrfile = self.get_file_with_ext('addrs')
silence()
from mmgen.addrlist import AddrList
chk = AddrList(self.proto,addrfile).chksum
silence()
chk = AddrList( self.proto, addrfile ).chksum
end_silence()
if opt.verbose and display:
msg(f'Checksum: {cyan(chk)}')
end_silence()
return chk
def walletgen_dfl_wallet(self,seed_len=None):
@ -239,7 +239,8 @@ class TestSuiteMain(TestSuiteBase,TestSuiteShared):
def delete_dfl_wallet(self,pf):
self.write_to_tmpfile('del_dw_run',b'',binary=True)
if opt.no_dw_delete: return 'skip'
if opt.no_dw_delete:
return 'skip'
for wf in [f for f in os.listdir(g.data_dir) if f[-6:]=='.mmdat']:
os.unlink(joinpath(g.data_dir,wf))
self.spawn('',msg_only=True)

View file

@ -247,7 +247,10 @@ class TestSuiteXMRWallet(TestSuiteBase):
datadir = os.path.join('test','daemons'),
daemon_addr = f'127.0.0.1:{md.rpc_port}',
)
wd_rpc = MoneroWalletRPCClient( daemon=wd, test_connection=False )
wd_rpc = MoneroWalletRPCClient(
daemon = wd,
test_connection = False,
)
self.users[user] = ud(
sid = sid,
mmwords = f'test/ref/{sid}.mmwords',

View file

@ -53,7 +53,7 @@ class unit_test(object):
qmsg('OK')
qmsg_r('Testing class Lockable...')
class MyLockable(Lockable): # class has no attrs, like UserOpts
class MyLockable(Lockable): # class without attrs
_autolock = False
_set_ok = ('foo','baz','alpha','beta','gamma','delta','epsilon')
_reset_ok = ('bar','baz')
@ -81,7 +81,7 @@ class unit_test(object):
lc.epsilon = [0]
class MyLockableClsCheck(Lockable): # class has attrs, like GlobalConfig
class MyLockableClsCheck(Lockable): # class with attrs
_autolock = False
_use_class_attr = True
_set_ok = ('foo','baz')