new TxKeys class

This commit is contained in:
The MMGen Project 2025-06-18 12:55:52 +00:00
commit ef0affe4c1
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
7 changed files with 254 additions and 239 deletions

View file

@ -269,14 +269,14 @@ class Signable:
if tx1.proto.sign_mode == 'daemon':
from .rpc import rpc_init
tx1.rpc = await rpc_init(self.cfg, tx1.proto, ignore_wallet=True)
from .tx.sign import txsign
tx2 = await txsign(
cfg_parm = self.cfg,
tx = tx1,
seed_files = self.parent.wallet_files[:],
kl = None,
kal = None,
passwd_file = str(self.parent.keyfile))
from .tx.keys import TxKeys
tx2 = await tx1.sign(
TxKeys(
self.cfg,
tx1,
seedfiles = self.parent.wallet_files[:],
passwdfile = str(self.parent.keyfile),
auto = True).keys)
if tx2:
tx2.file.write(ask_write=False, outdir=self.dir)
return tx2

View file

@ -1 +1 @@
15.1.dev47
15.1.dev48

View file

@ -141,13 +141,9 @@ column below:
cfg = Config(opts_data=opts_data)
from .tx import CompletedTX, BumpTX, UnsignedTX, OnlineSignedTX
from .tx.sign import txsign, get_seed_files, get_keyaddrlist, get_keylist
from .tx.keys import TxKeys, pop_seedfiles, get_keylist, get_keyaddrlist
seedfiles = get_seed_files(
cfg,
cfg._args,
ignore_dfl_wallet = not cfg.send,
empty_ok = not cfg.send)
seedfiles = pop_seedfiles(cfg, ignore_dfl_wallet=not cfg.send, empty_ok=not cfg.send)
if cfg.autosign:
if cfg.send:
@ -207,8 +203,8 @@ async def main():
if sign_and_send:
tx2 = UnsignedTX(cfg=cfg, data=tx.__dict__)
tx3 = await txsign(cfg, tx2, seedfiles, kl, kal)
if tx3:
if tx3 := await tx2.sign(
TxKeys(cfg, tx2, seedfiles=seedfiles, keylist=kl, keyaddrlist=kal).keys):
tx4 = await OnlineSignedTX(cfg=cfg, data=tx3.__dict__)
tx4.file.write(ask_write=False)
await tx4.send(cfg, asi if cfg.autosign else None)

View file

@ -168,9 +168,9 @@ column below:
cfg = Config(opts_data=opts_data)
from .tx import NewTX, SentTX
from .tx.sign import txsign, get_seed_files, get_keyaddrlist, get_keylist
from .tx.keys import TxKeys, pop_seedfiles
seed_files = get_seed_files(cfg, cfg._args)
seedfiles = pop_seedfiles(cfg)
async def main():
@ -187,10 +187,7 @@ async def main():
locktime = int(cfg.locktime or 0),
caller = 'txdo')
kal = get_keyaddrlist(cfg, cfg._proto)
kl = get_keylist(cfg)
tx3 = await txsign(cfg, tx2, seed_files, kl, kal)
tx3 = await tx2.sign(TxKeys(cfg, tx2, seedfiles=seedfiles).keys)
if tx3:
tx3.file.write(ask_write=False)

View file

@ -112,10 +112,10 @@ if not cfg.info and not cfg.terse_info:
from .ui import do_license_msg
do_license_msg(cfg, immed=True)
from .tx.sign import txsign, get_tx_files, get_seed_files, get_keylist, get_keyaddrlist
from .tx.keys import TxKeys, pop_txfiles, pop_seedfiles
txfiles = get_tx_files(cfg, cfg._args)
seed_files = get_seed_files(cfg, cfg._args)
txfiles = pop_txfiles(cfg)
seedfiles = pop_seedfiles(cfg)
async def main():
@ -148,11 +148,7 @@ async def main():
if not cfg.yes:
tx1.info.view_with_prompt(f'View data for transaction{tx_num_disp}?')
kal = get_keyaddrlist(cfg, tx1.proto)
kl = get_keylist(cfg)
tx2 = await txsign(cfg, tx1, seed_files, kl, kal, tx_num_str=tx_num_disp)
if tx2:
if tx2 := await tx1.sign(TxKeys(cfg, tx1, seedfiles=seedfiles).keys, tx_num_disp):
if not cfg.yes:
tx2.add_comment() # edits an existing comment
tx2.file.write(ask_write=not cfg.yes, ask_write_default_yes=True, add_desc=tx_num_disp)

234
mmgen/tx/keys.py Executable file
View file

@ -0,0 +1,234 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen-wallet
# https://gitlab.com/mmgen/mmgen-wallet
"""
tx.keys: transaction keys class for the MMGen suite
"""
from ..cfg import gc
from ..util import msg, suf, fmt, die, remove_dups, get_extension
from ..addr import MMGenAddrType
from ..addrlist import AddrIdxList, KeyAddrList
from ..wallet import Wallet, get_wallet_extensions, get_wallet_cls
def _pop_matching_fns(args, cmplist): # strips found args
return list(reversed(
[args.pop(args.index(a)) for a in reversed(args) if get_extension(a) in cmplist]))
def pop_txfiles(cfg):
from .unsigned import Unsigned, AutomountUnsigned
ext = (AutomountUnsigned if cfg.autosign else Unsigned).ext
ret = _pop_matching_fns(cfg._args, [ext])
if not ret:
die(1, f'You must specify a raw transaction file with extension ‘.{ext}')
return ret
def pop_seedfiles(cfg, *, ignore_dfl_wallet=False, empty_ok=False):
# favor unencrypted seed sources first, as they don't require passwords
ret = _pop_matching_fns(cfg._args, get_wallet_extensions('unenc'))
from ..filename import find_file_in_dir
if not ignore_dfl_wallet: # Make this the first encrypted ss in the list
if wf := find_file_in_dir(get_wallet_cls('mmgen'), cfg.data_dir):
ret.append(wf)
ret += _pop_matching_fns(cfg._args, get_wallet_extensions('enc'))
if not (ret
or empty_ok
or cfg.mmgen_keys_from_file
# or cfg.use_wallet_dat
or cfg.keys_from_file):
die(1, 'You must specify a seed or key source!')
return ret
def get_keylist(cfg):
if cfg.keys_from_file:
from ..fileutil import get_lines_from_file
return get_lines_from_file(
cfg,
cfg.keys_from_file,
desc = 'key-address data',
trim_comments = True)
def get_keyaddrlist(cfg, proto):
if cfg.mmgen_keys_from_file:
return KeyAddrList(cfg, proto, infile=cfg.mmgen_keys_from_file)
class TxKeys:
"""
produce a list of keys to be used for transaction signing
Keys are taken from a key-address list and/or flat key list, and/or generated
from provided seed sources. Seed sources are searched for subseeds to allow
signing of inputs with subseed addresses by parent seeds.
In addition, MMGenID-to-address mappings are verified for output addresses, as
well as the swap destination address, if applicable. For this reason, seeds or
key-address files for all MMGen addresses involved in the transaction must be
provided.
Verification of the swap memo against TX metadata is also performed.
"""
def __init__(
self,
cfg,
tx,
*,
seedfiles = None,
keylist = None,
keyaddrlist = None,
passwdfile = None,
auto = False):
self.cfg = cfg
self.tx = tx
self.seedfiles = seedfiles or pop_seedfiles(cfg)
self.keylist = None if auto else keylist or get_keylist(cfg)
self.keyaddrlist = None if auto else keyaddrlist or get_keyaddrlist(cfg, tx.proto)
self.passwdfile = passwdfile
self.saved_seeds = {}
def get_keys_for_non_mmgen_inputs(self):
err_fs = 'ERROR: a key file must be supplied for the following non-{} address{}:{}'
sep = '\n '
if addrs := self.tx.get_non_mmaddrs('inputs'):
self.tx.check_non_mmgen_inputs(caller='txsign', non_mmaddrs=addrs)
kal = KeyAddrList(
cfg = self.cfg,
proto = self.tx.proto,
addrlist = addrs,
skip_chksum = True)
if self.keylist:
kal.add_wifs(self.keylist)
if missing := kal.list_missing('sec'):
die(2, err_fs.format(gc.proj_name, suf(missing, 'es'), sep + sep.join(missing)))
return kal.data
else:
return []
def get_seed_for_seed_id(self, sid):
if sid in self.saved_seeds:
return self.saved_seeds[sid]
subseeds_checked = False
seed = None
while True:
if self.seedfiles:
seed = Wallet(
self.cfg,
fn = self.seedfiles.pop(0),
ignore_in_fmt = True,
passwd_file = self.passwdfile).seed
elif self.saved_seeds and subseeds_checked is False:
seed = self.saved_seeds[list(self.saved_seeds)[0]].subseed_by_seed_id(sid, print_msg=True)
subseeds_checked = True
if not seed:
continue
elif self.cfg.in_fmt:
self.cfg._util.qmsg(f'Need seed data for Seed ID {sid}')
seed = Wallet(self.cfg, passwd_file=self.passwdfile).seed
msg(f'User input produced Seed ID {seed.sid}')
if not seed.sid == sid: # TODO: add test
seed = seed.subseed_by_seed_id(sid, print_msg=True)
if seed:
self.saved_seeds[seed.sid] = seed
if seed.sid == sid:
return seed
else:
die(2, f'ERROR: No seed source found for Seed ID: {sid}')
def generate_kals_for_mmgen_addrs(self, need_keys, proto):
mmids = [e.mmid for e in need_keys]
sids = remove_dups((i.sid for i in mmids), quiet=True)
self.cfg._util.vmsg('Need seed{}: {}'.format(suf(sids), ' '.join(sids)))
for sid in sids:
seed = self.get_seed_for_seed_id(sid) # raises exception if seed not found
for id_str in MMGenAddrType.mmtypes:
idx_list = [i.idx for i in mmids if i.sid == sid and i.mmtype == id_str]
if idx_list:
yield KeyAddrList(
cfg = self.cfg,
proto = proto,
seed = seed,
addr_idxs = AddrIdxList(idx_list=idx_list),
mmtype = MMGenAddrType(proto, id_str),
skip_chksum = True)
def add_keys(self, src, io_list, *, from_keyaddrlist=False):
if not (need_keys := [e for e in io_list if e.mmid and not e.have_wif]):
return []
proto = need_keys[0].proto
if from_keyaddrlist:
desc = 'key-address file'
err_desc = 'From key-address file:'
kals = tuple([self.keyaddrlist])
else:
desc = 'seed(s)'
err_desc = 'Generated from seed:'
kals = tuple(self.generate_kals_for_mmgen_addrs(need_keys, proto))
self.cfg._util.qmsg(
f'Checking {gc.proj_name} -> {proto.coin} address mappings for {src} (from {desc})')
def gen_keys():
for e in need_keys:
for kal in kals:
for f in kal.data:
if mmid := f'{kal.al_id}:{f.idx}' == e.mmid:
if f.addr == e.addr:
e.have_wif = True
if src == 'inputs':
yield f
else:
die(3, fmt(f"""
{gc.proj_name} -> {proto.coin} address mappings differ!
{err_desc:<23} {mmid} -> {f.addr}
{'tx file:':<23} {e.mmid} -> {e.addr}
""").strip())
if new_keys := list(gen_keys()):
self.cfg._util.vmsg(f'Added {len(new_keys)} wif key{suf(new_keys)} from {desc}')
return new_keys
@property
def keys(self):
"""
produce a list of signing keys and perform checks on output and swap destination
addresses
"""
ret = self.get_keys_for_non_mmgen_inputs()
memo_output = self.tx.check_swap_memo() # do this for non-swap transactions too!
if self.cfg.mmgen_keys_from_file:
ret += self.add_keys('inputs', self.tx.inputs, from_keyaddrlist=True)
self.add_keys('outputs', self.tx.outputs, from_keyaddrlist=True)
if memo_output:
self.add_keys('swap destination address', [memo_output], from_keyaddrlist=True)
ret += self.add_keys('inputs', self.tx.inputs)
self.add_keys('outputs', self.tx.outputs)
if memo_output:
self.add_keys('swap destination address', [memo_output])
# this (boolean) attr isn't needed in transaction file
self.tx.delete_attrs('inputs', 'have_wif')
self.tx.delete_attrs('outputs', 'have_wif')
if extra_sids := remove_dups(
(s for s in self.saved_seeds
if s not in self.tx.get_sids('inputs') + self.tx.get_sids('outputs')),
quiet = True):
msg('Unused Seed ID{}: {}'.format(suf(extra_sids), ' '.join(extra_sids)))
return ret

View file

@ -1,208 +0,0 @@
#!/usr/bin/env python3
#
# MMGen Wallet, a terminal-based cryptocurrency wallet
# Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tx.sign: Sign a transaction generated by 'mmgen-txcreate'
"""
from ..cfg import gc
from ..util import msg, suf, fmt, die, remove_dups, get_extension
from ..obj import MMGenList
from ..addr import MMGenAddrType
from ..addrlist import AddrIdxList, KeyAddrList
from ..wallet import Wallet, get_wallet_extensions, get_wallet_cls
saved_seeds = {}
def get_seed_for_seed_id(sid, infiles, saved_seeds):
if sid in saved_seeds:
return saved_seeds[sid]
subseeds_checked = False
seed = None
while True:
if infiles:
seed = Wallet(cfg, fn=infiles.pop(0), ignore_in_fmt=True, passwd_file=global_passwd_file).seed
elif saved_seeds and subseeds_checked is False:
seed = saved_seeds[list(saved_seeds)[0]].subseed_by_seed_id(sid, print_msg=True)
subseeds_checked = True
if not seed:
continue
elif cfg.in_fmt:
cfg._util.qmsg(f'Need seed data for Seed ID {sid}')
seed = Wallet(cfg, passwd_file=global_passwd_file).seed
msg(f'User input produced Seed ID {seed.sid}')
if not seed.sid == sid: # TODO: add test
seed = seed.subseed_by_seed_id(sid, print_msg=True)
if seed:
saved_seeds[seed.sid] = seed
if seed.sid == sid:
return seed
else:
die(2, f'ERROR: No seed source found for Seed ID: {sid}')
def generate_kals_for_mmgen_addrs(need_keys, infiles, saved_seeds, proto):
mmids = [e.mmid for e in need_keys]
sids = remove_dups((i.sid for i in mmids), quiet=True)
cfg._util.vmsg(f"Need seed{suf(sids)}: {' '.join(sids)}")
def gen_kals():
for sid in sids:
# Returns only if seed is found
seed = get_seed_for_seed_id(sid, infiles, saved_seeds)
for id_str in MMGenAddrType.mmtypes:
idx_list = [i.idx for i in mmids if i.sid == sid and i.mmtype == id_str]
if idx_list:
yield KeyAddrList(
cfg = cfg,
proto = proto,
seed = seed,
addr_idxs = AddrIdxList(idx_list=idx_list),
mmtype = MMGenAddrType(proto, id_str),
skip_chksum = True)
return MMGenList(gen_kals())
def add_keys(src, io_list, infiles=None, saved_seeds=None, *, keyaddr_list=None):
need_keys = [e for e in io_list if e.mmid and not e.have_wif]
if not need_keys:
return []
proto = need_keys[0].proto
if keyaddr_list:
desc = 'key-address file'
src_desc = 'From key-address file:'
d = MMGenList([keyaddr_list])
else:
desc = 'seed(s)'
src_desc = 'Generated from seed:'
d = generate_kals_for_mmgen_addrs(need_keys, infiles, saved_seeds, proto)
cfg._util.qmsg(f'Checking {gc.proj_name} -> {proto.coin} address mappings for {src} (from {desc})')
def gen_keys():
for e in need_keys:
for kal in d:
for f in kal.data:
if mmid := f'{kal.al_id}:{f.idx}' == e.mmid:
if f.addr == e.addr:
e.have_wif = True
if src == 'inputs':
yield f
else:
die(3, fmt(f"""
{gc.proj_name} -> {proto.coin} address mappings differ!
{src_desc:<23} {mmid} -> {f.addr}
{'tx file:':<23} {e.mmid} -> {e.addr}
""").strip())
if new_keys := list(gen_keys()):
cfg._util.vmsg(f'Added {len(new_keys)} wif key{suf(new_keys)} from {desc}')
return new_keys
def _pop_matching_fns(args, cmplist): # strips found args
return list(reversed([args.pop(args.index(a)) for a in reversed(args) if get_extension(a) in cmplist]))
def get_tx_files(cfg, args):
from .unsigned import Unsigned, AutomountUnsigned
ext = (AutomountUnsigned if cfg.autosign else Unsigned).ext
ret = _pop_matching_fns(args, [ext])
if not ret:
die(1, f'You must specify a raw transaction file with extension ‘.{ext}')
return ret
def get_seed_files(cfg, args, *, ignore_dfl_wallet=False, empty_ok=False):
# favor unencrypted seed sources first, as they don't require passwords
ret = _pop_matching_fns(args, get_wallet_extensions('unenc'))
from ..filename import find_file_in_dir
if not ignore_dfl_wallet: # Make this the first encrypted ss in the list
if wf := find_file_in_dir(get_wallet_cls('mmgen'), cfg.data_dir):
ret.append(wf)
ret += _pop_matching_fns(args, get_wallet_extensions('enc'))
if not (ret or empty_ok or cfg.mmgen_keys_from_file or cfg.keys_from_file): # or cfg.use_wallet_dat
die(1, 'You must specify a seed or key source!')
return ret
def get_keyaddrlist(cfg, proto):
if cfg.mmgen_keys_from_file:
return KeyAddrList(cfg, proto, infile=cfg.mmgen_keys_from_file)
return None
def get_keylist(cfg):
if cfg.keys_from_file:
from ..fileutil import get_lines_from_file
return get_lines_from_file(cfg, cfg.keys_from_file, desc='key-address data', trim_comments=True)
return None
async def txsign(cfg_parm, tx, seed_files, kl, kal, *, tx_num_str='', passwd_file=None):
keys = MMGenList() # list of AddrListEntry objects
non_mmaddrs = tx.get_non_mmaddrs('inputs')
global cfg, global_passwd_file
cfg = cfg_parm
global_passwd_file = passwd_file
if non_mmaddrs:
tx.check_non_mmgen_inputs(caller='txsign', non_mmaddrs=non_mmaddrs)
tmp = KeyAddrList(
cfg = cfg,
proto = tx.proto,
addrlist = non_mmaddrs,
skip_chksum = True)
if kl:
tmp.add_wifs(kl)
missing = tmp.list_missing('sec')
if missing:
sep = '\n '
die(2, 'ERROR: a key file must be supplied for the following non-{} address{}:{}'.format(
gc.proj_name,
suf(missing, 'es'),
sep + sep.join(missing)))
keys += tmp.data
memo_output = tx.check_swap_memo() # do this for non-swap transactions too!
if cfg.mmgen_keys_from_file:
keys += add_keys('inputs', tx.inputs, keyaddr_list=kal)
add_keys('outputs', tx.outputs, keyaddr_list=kal)
if memo_output:
add_keys('swap destination address', [memo_output], keyaddr_list=kal)
keys += add_keys('inputs', tx.inputs, seed_files, saved_seeds)
add_keys('outputs', tx.outputs, seed_files, saved_seeds)
if memo_output:
add_keys('swap destination address', [memo_output], seed_files, saved_seeds)
# this (boolean) attr isn't needed in transaction file
tx.delete_attrs('inputs', 'have_wif')
tx.delete_attrs('outputs', 'have_wif')
extra_sids = remove_dups(
(s for s in saved_seeds if s not in tx.get_sids('inputs') + tx.get_sids('outputs')),
quiet = True)
if extra_sids:
msg(f"Unused Seed ID{suf(extra_sids)}: {' '.join(extra_sids)}")
return await tx.sign(keys, tx_num_str) # returns signed TX object or False