minor cleanups, variable & method renames

This commit is contained in:
The MMGen Project 2025-02-15 09:54:18 +00:00
commit 487cbfcc0d
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
10 changed files with 41 additions and 33 deletions

View file

@ -56,7 +56,7 @@ def launch(*, mod=None, func=None, fqmod=None, package='mmgen'):
2: _o(yellow, 2, '{message}'),
3: _o(yellow, 3, '\nMMGen Error ({name}):\n{message}'),
4: _o(red, 4, '\nMMGen Fatal Error ({name}):\n{message}'),
'x': _o(yellow, 5, '\nMMGen Unhandled Exception ({name}): {e}'),
'x': _o(yellow, 5, '\nMMGen Python Exception ({name}): {e}'),
}[getattr(e, 'mmcode', 'x')]
(sys.stdout if getattr(e, 'stdout', None) else sys.stderr).write(

View file

@ -14,7 +14,6 @@ proto.btc.tw.addresses: Bitcoin base protocol tracking wallet address list class
from ....tw.addresses import TwAddresses
from ....tw.shared import TwLabel
from ....util import msg, msg_r
from ....obj import get_obj
from .rpc import BitcoinTwRPC
@ -47,15 +46,17 @@ class BitcoinTwAddresses(TwAddresses, BitcoinTwRPC):
async def get_rpc_data(self):
msg_r('Getting unspent outputs...')
qmsg = self.cfg._util.qmsg
qmsg_r = self.cfg._util.qmsg_r
qmsg_r('Getting unspent outputs...')
addrs = await self.get_unspent_by_mmid(self.minconf)
msg('done')
qmsg('done')
coin_amt = self.proto.coin_amt
amt0 = coin_amt('0')
self.total = sum((v['amt'] for v in addrs.values()), start=amt0)
msg_r('Getting labels and associated addresses...')
qmsg_r('Getting labels and associated addresses...')
for e in await self.get_label_addr_pairs():
if e.label and e.label.mmid not in addrs:
addrs[e.label.mmid] = {
@ -64,9 +65,9 @@ class BitcoinTwAddresses(TwAddresses, BitcoinTwRPC):
'recvd': amt0,
'confs': 0,
'lbl': e.label}
msg('done')
qmsg('done')
msg_r('Getting received funds data...')
qmsg_r('Getting received funds data...')
# args: 1:minconf, 2:include_empty, 3:include_watchonly, 4:include_immature_coinbase (>=v23.0.0)
for d in await self.rpc.call('listreceivedbylabel', 1, True, True):
label = get_obj(TwLabel, proto=self.proto, text=d['label'])
@ -74,6 +75,6 @@ class BitcoinTwAddresses(TwAddresses, BitcoinTwRPC):
assert label.mmid in addrs, f'{label.mmid!r} not found in addrlist!'
addrs[label.mmid]['recvd'] = coin_amt(d['amount'])
addrs[label.mmid]['confs'] = d['confirmations']
msg('done')
qmsg('done')
return addrs

View file

@ -18,7 +18,7 @@ from .new import New
class NewSwap(New, TxNewSwap):
desc = 'Bitcoin swap transaction'
async def process_swap_cmd_args(self, cmd_args):
async def process_swap_cmdline_args(self, cmd_args):
import sys
from ....util import msg
msg(' '.join(cmd_args))

View file

@ -80,7 +80,7 @@ class New(Base, TxBase.New):
'update_txid() must be called only when self.serialized is not hex data')
self.txid = MMGenTxID(make_chksum_6(self.serialized).upper())
async def process_cmd_args(self, cmd_args, ad_f, ad_w):
async def process_cmdline_args(self, cmd_args, ad_f, ad_w):
lc = len(cmd_args)
@ -90,10 +90,10 @@ class New(Base, TxBase.New):
if lc != 1:
die(1, f'{lc} output{suf(lc)} specified, but Ethereum transactions must have exactly one')
arg = self.parse_cmd_arg(cmd_args[0], ad_f, ad_w)
arg = self.parse_cmdline_arg(cmd_args[0], ad_f, ad_w)
self.add_output(
coinaddr = arg.coin_addr,
coinaddr = arg.addr,
amt = self.proto.coin_amt(arg.amt or '0'),
is_chg = not arg.amt)

View file

@ -171,9 +171,9 @@ class New(Base):
def process_data_output_arg(self, arg):
return None
def parse_cmd_arg(self, arg_in, ad_f, ad_w):
def parse_cmdline_arg(self, arg_in, ad_f, ad_w):
_pa = namedtuple('parsed_txcreate_cmdline_arg', ['arg', 'mmid', 'coin_addr', 'amt', 'data'])
_pa = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data'])
if data := self.process_data_output_arg(arg_in):
return _pa(arg_in, None, None, None, data)
@ -194,7 +194,7 @@ class New(Base):
return _pa(arg, mmid, coin_addr, amt, None)
async def process_cmd_args(self, cmd_args, ad_f, ad_w):
async def process_cmdline_args(self, cmd_args, ad_f, ad_w):
async def get_autochg_addr(arg, parsed_args):
from ..tw.addresses import TwAddresses
@ -216,9 +216,9 @@ class New(Base):
d = desc,
a = arg))
parsed_args = [self.parse_cmd_arg(a, ad_f, ad_w) for a in cmd_args]
parsed_args = [self.parse_cmdline_arg(arg, ad_f, ad_w) for arg in cmd_args]
chg_args = [a for a in parsed_args if not ((a.amt and a.coin_addr) or a.data)]
chg_args = [a for a in parsed_args if not ((a.amt and a.addr) or a.data)]
if len(chg_args) > 1:
desc = 'requested' if self.chg_autoselected else 'listed'
@ -229,7 +229,7 @@ class New(Base):
self.add_output(None, self.proto.coin_amt('0'), data=a.data)
else:
self.add_output(
coinaddr = a.coin_addr or (await get_autochg_addr(a.arg, parsed_args)).addr,
coinaddr = a.addr or (await get_autochg_addr(a.arg, parsed_args)).addr,
amt = self.proto.coin_amt(a.amt or '0'),
is_chg = not a.amt)
@ -260,19 +260,19 @@ class New(Base):
from ..addrdata import AddrData
from ..addrlist import AddrList
from ..addrfile import AddrFile
addrfiles = remove_dups(
addrfile_args = remove_dups(
tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
desc = 'command line',
edesc = 'argument',
)
cmd_args = remove_dups(
tuple(a for a in cmd_args if a not in addrfiles),
tuple(a for a in cmd_args if a not in addrfile_args),
desc = 'command line',
edesc = 'argument',
)
ad_f = AddrData(self.proto)
from ..fileutil import check_infile
for addrfile in addrfiles:
for addrfile in addrfile_args:
check_infile(addrfile)
ad_f.add(AddrList(self.cfg, self.proto, addrfile))
return ad_f, cmd_args
@ -417,8 +417,8 @@ class New(Base):
from ..addrdata import TwAddrData
ad_w = await TwAddrData(self.cfg, self.proto, twctl=self.twctl)
if self.target == 'swaptx':
cmd_args = await self.process_swap_cmd_args(cmd_args)
await self.process_cmd_args(cmd_args, ad_f, ad_w)
cmd_args = await self.process_swap_cmdline_args(cmd_args)
await self.process_cmdline_args(cmd_args, ad_f, ad_w)
from ..ui import do_license_msg
do_license_msg(self.cfg)

View file

@ -17,6 +17,5 @@ from .new import New
class NewSwap(New):
desc = 'swap transaction'
async def process_swap_cmd_args(self, cmd_args):
raise NotImplementedError('Work in Progress!')
return cmd_args
async def process_swap_cmdline_args(self, cmd_args):
raise NotImplementedError(f'Swap not implemented for protocol {self.proto.__name__}')

View file

@ -358,7 +358,7 @@ def secs_to_ms(secs):
return '{:02d}:{:02d}'.format(secs//60, secs % 60)
def is_int(s): # actually is_nonnegative_int()
return set(str(s)) <= set(digits)
return set(str(s) or 'x') <= set(digits)
def check_int_between(val, imin, imax, desc):
if not imin <= int(val) <= imax:

View file

@ -586,7 +586,7 @@ class CmdTestRunner:
if logging:
self.log_fd.write('[{}][{}:{}] {}\n'.format(
proto.coin.lower(),
(proto.coin.lower() if 'coin' in self.tg.passthru_opts else 'NONE'),
self.tg.group_name,
self.tg.test_name,
cmd_disp))

View file

@ -161,13 +161,13 @@ rt_data = {
}
}
def make_burn_addr(proto):
def make_burn_addr(proto, mmtype='compressed'):
from mmgen.tool.coin import tool_cmd
return tool_cmd(
cfg = cfg,
cmdname = 'pubhash2addr',
proto = proto,
mmtype = 'compressed').pubhash2addr('00'*20)
mmtype = mmtype).pubhash2addr('00'*20)
class CmdTestRegtest(CmdTestBase, CmdTestShared):
'transacting and tracking wallet operations via regtest mode'

View file

@ -12,7 +12,13 @@
test.cmdtest_d.ct_swap: asset swap tests for the cmdtest.py test suite
"""
from .ct_regtest import CmdTestRegtest, rt_data, dfl_wcls, rt_pw
from mmgen.protocol import init_proto
from .ct_regtest import (
CmdTestRegtest,
rt_data,
dfl_wcls,
rt_pw)
sample1 = '=:ETH.ETH:0x86d526d6624AbC0178cF7296cD538Ecc080A95F1:0/1/0'
sample2 = '00010203040506'
@ -62,10 +68,12 @@ class CmdTestSwap(CmdTestRegtest):
}
def __init__(self, trunner, cfgs, spawn):
super().__init__(trunner, cfgs, spawn)
gldict = globals()
globals_dict = globals()
for k in rt_data:
gldict[k] = rt_data[k]['btc']
globals_dict[k] = rt_data[k]['btc']
@property
def sid(self):