From ef0affe4c13749768dd54bc1de483e1048c5bc0d Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Wed, 18 Jun 2025 12:55:52 +0000 Subject: [PATCH] new `TxKeys` class --- mmgen/autosign.py | 16 +-- mmgen/data/version | 2 +- mmgen/main_txbump.py | 12 +-- mmgen/main_txdo.py | 9 +- mmgen/main_txsign.py | 12 +-- mmgen/tx/keys.py | 234 +++++++++++++++++++++++++++++++++++++++++++ mmgen/tx/sign.py | 208 -------------------------------------- 7 files changed, 254 insertions(+), 239 deletions(-) create mode 100755 mmgen/tx/keys.py delete mode 100755 mmgen/tx/sign.py diff --git a/mmgen/autosign.py b/mmgen/autosign.py index 655798b1..243e0a1a 100755 --- a/mmgen/autosign.py +++ b/mmgen/autosign.py @@ -269,14 +269,14 @@ class Signable: if tx1.proto.sign_mode == 'daemon': from .rpc import rpc_init tx1.rpc = await rpc_init(self.cfg, tx1.proto, ignore_wallet=True) - from .tx.sign import txsign - tx2 = await txsign( - cfg_parm = self.cfg, - tx = tx1, - seed_files = self.parent.wallet_files[:], - kl = None, - kal = None, - passwd_file = str(self.parent.keyfile)) + from .tx.keys import TxKeys + tx2 = await tx1.sign( + TxKeys( + self.cfg, + tx1, + seedfiles = self.parent.wallet_files[:], + passwdfile = str(self.parent.keyfile), + auto = True).keys) if tx2: tx2.file.write(ask_write=False, outdir=self.dir) return tx2 diff --git a/mmgen/data/version b/mmgen/data/version index 8709d4b4..26b375ca 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -15.1.dev47 +15.1.dev48 diff --git a/mmgen/main_txbump.py b/mmgen/main_txbump.py index c5ec2419..000a73d7 100755 --- a/mmgen/main_txbump.py +++ b/mmgen/main_txbump.py @@ -141,13 +141,9 @@ column below: cfg = Config(opts_data=opts_data) from .tx import CompletedTX, BumpTX, UnsignedTX, OnlineSignedTX -from .tx.sign import txsign, get_seed_files, get_keyaddrlist, get_keylist +from .tx.keys import TxKeys, pop_seedfiles, get_keylist, get_keyaddrlist -seedfiles = get_seed_files( - cfg, - cfg._args, - ignore_dfl_wallet = not cfg.send, - empty_ok = not cfg.send) +seedfiles = pop_seedfiles(cfg, ignore_dfl_wallet=not cfg.send, empty_ok=not cfg.send) if cfg.autosign: if cfg.send: @@ -207,8 +203,8 @@ async def main(): if sign_and_send: tx2 = UnsignedTX(cfg=cfg, data=tx.__dict__) - tx3 = await txsign(cfg, tx2, seedfiles, kl, kal) - if tx3: + if tx3 := await tx2.sign( + TxKeys(cfg, tx2, seedfiles=seedfiles, keylist=kl, keyaddrlist=kal).keys): tx4 = await OnlineSignedTX(cfg=cfg, data=tx3.__dict__) tx4.file.write(ask_write=False) await tx4.send(cfg, asi if cfg.autosign else None) diff --git a/mmgen/main_txdo.py b/mmgen/main_txdo.py index 16759393..bd92fbca 100755 --- a/mmgen/main_txdo.py +++ b/mmgen/main_txdo.py @@ -168,9 +168,9 @@ column below: cfg = Config(opts_data=opts_data) from .tx import NewTX, SentTX -from .tx.sign import txsign, get_seed_files, get_keyaddrlist, get_keylist +from .tx.keys import TxKeys, pop_seedfiles -seed_files = get_seed_files(cfg, cfg._args) +seedfiles = pop_seedfiles(cfg) async def main(): @@ -187,10 +187,7 @@ async def main(): locktime = int(cfg.locktime or 0), caller = 'txdo') - kal = get_keyaddrlist(cfg, cfg._proto) - kl = get_keylist(cfg) - - tx3 = await txsign(cfg, tx2, seed_files, kl, kal) + tx3 = await tx2.sign(TxKeys(cfg, tx2, seedfiles=seedfiles).keys) if tx3: tx3.file.write(ask_write=False) diff --git a/mmgen/main_txsign.py b/mmgen/main_txsign.py index bdbc5cd4..55e0b181 100755 --- a/mmgen/main_txsign.py +++ b/mmgen/main_txsign.py @@ -112,10 +112,10 @@ if not cfg.info and not cfg.terse_info: from .ui import do_license_msg do_license_msg(cfg, immed=True) -from .tx.sign import txsign, get_tx_files, get_seed_files, get_keylist, get_keyaddrlist +from .tx.keys import TxKeys, pop_txfiles, pop_seedfiles -txfiles = get_tx_files(cfg, cfg._args) -seed_files = get_seed_files(cfg, cfg._args) +txfiles = pop_txfiles(cfg) +seedfiles = pop_seedfiles(cfg) async def main(): @@ -148,11 +148,7 @@ async def main(): if not cfg.yes: tx1.info.view_with_prompt(f'View data for transaction{tx_num_disp}?') - kal = get_keyaddrlist(cfg, tx1.proto) - kl = get_keylist(cfg) - - tx2 = await txsign(cfg, tx1, seed_files, kl, kal, tx_num_str=tx_num_disp) - if tx2: + if tx2 := await tx1.sign(TxKeys(cfg, tx1, seedfiles=seedfiles).keys, tx_num_disp): if not cfg.yes: tx2.add_comment() # edits an existing comment tx2.file.write(ask_write=not cfg.yes, ask_write_default_yes=True, add_desc=tx_num_disp) diff --git a/mmgen/tx/keys.py b/mmgen/tx/keys.py new file mode 100755 index 00000000..56c58918 --- /dev/null +++ b/mmgen/tx/keys.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +# +# MMGen Wallet, a terminal-based cryptocurrency wallet +# Copyright (C)2013-2025 The MMGen Project +# Licensed under the GNU General Public License, Version 3: +# https://www.gnu.org/licenses +# Public project repositories: +# https://github.com/mmgen/mmgen-wallet +# https://gitlab.com/mmgen/mmgen-wallet + +""" +tx.keys: transaction keys class for the MMGen suite +""" + +from ..cfg import gc +from ..util import msg, suf, fmt, die, remove_dups, get_extension +from ..addr import MMGenAddrType +from ..addrlist import AddrIdxList, KeyAddrList +from ..wallet import Wallet, get_wallet_extensions, get_wallet_cls + +def _pop_matching_fns(args, cmplist): # strips found args + return list(reversed( + [args.pop(args.index(a)) for a in reversed(args) if get_extension(a) in cmplist])) + +def pop_txfiles(cfg): + from .unsigned import Unsigned, AutomountUnsigned + ext = (AutomountUnsigned if cfg.autosign else Unsigned).ext + ret = _pop_matching_fns(cfg._args, [ext]) + if not ret: + die(1, f'You must specify a raw transaction file with extension ‘.{ext}’') + return ret + +def pop_seedfiles(cfg, *, ignore_dfl_wallet=False, empty_ok=False): + # favor unencrypted seed sources first, as they don't require passwords + ret = _pop_matching_fns(cfg._args, get_wallet_extensions('unenc')) + from ..filename import find_file_in_dir + if not ignore_dfl_wallet: # Make this the first encrypted ss in the list + if wf := find_file_in_dir(get_wallet_cls('mmgen'), cfg.data_dir): + ret.append(wf) + ret += _pop_matching_fns(cfg._args, get_wallet_extensions('enc')) + if not (ret + or empty_ok + or cfg.mmgen_keys_from_file + # or cfg.use_wallet_dat + or cfg.keys_from_file): + die(1, 'You must specify a seed or key source!') + return ret + +def get_keylist(cfg): + if cfg.keys_from_file: + from ..fileutil import get_lines_from_file + return get_lines_from_file( + cfg, + cfg.keys_from_file, + desc = 'key-address data', + trim_comments = True) + +def get_keyaddrlist(cfg, proto): + if cfg.mmgen_keys_from_file: + return KeyAddrList(cfg, proto, infile=cfg.mmgen_keys_from_file) + +class TxKeys: + """ + produce a list of keys to be used for transaction signing + + Keys are taken from a key-address list and/or flat key list, and/or generated + from provided seed sources. Seed sources are searched for subseeds to allow + signing of inputs with subseed addresses by parent seeds. + + In addition, MMGenID-to-address mappings are verified for output addresses, as + well as the swap destination address, if applicable. For this reason, seeds or + key-address files for all MMGen addresses involved in the transaction must be + provided. + + Verification of the swap memo against TX metadata is also performed. + """ + def __init__( + self, + cfg, + tx, + *, + seedfiles = None, + keylist = None, + keyaddrlist = None, + passwdfile = None, + auto = False): + self.cfg = cfg + self.tx = tx + self.seedfiles = seedfiles or pop_seedfiles(cfg) + self.keylist = None if auto else keylist or get_keylist(cfg) + self.keyaddrlist = None if auto else keyaddrlist or get_keyaddrlist(cfg, tx.proto) + self.passwdfile = passwdfile + self.saved_seeds = {} + + def get_keys_for_non_mmgen_inputs(self): + err_fs = 'ERROR: a key file must be supplied for the following non-{} address{}:{}' + sep = '\n ' + if addrs := self.tx.get_non_mmaddrs('inputs'): + self.tx.check_non_mmgen_inputs(caller='txsign', non_mmaddrs=addrs) + kal = KeyAddrList( + cfg = self.cfg, + proto = self.tx.proto, + addrlist = addrs, + skip_chksum = True) + if self.keylist: + kal.add_wifs(self.keylist) + if missing := kal.list_missing('sec'): + die(2, err_fs.format(gc.proj_name, suf(missing, 'es'), sep + sep.join(missing))) + return kal.data + else: + return [] + + def get_seed_for_seed_id(self, sid): + + if sid in self.saved_seeds: + return self.saved_seeds[sid] + + subseeds_checked = False + seed = None + + while True: + if self.seedfiles: + seed = Wallet( + self.cfg, + fn = self.seedfiles.pop(0), + ignore_in_fmt = True, + passwd_file = self.passwdfile).seed + elif self.saved_seeds and subseeds_checked is False: + seed = self.saved_seeds[list(self.saved_seeds)[0]].subseed_by_seed_id(sid, print_msg=True) + subseeds_checked = True + if not seed: + continue + elif self.cfg.in_fmt: + self.cfg._util.qmsg(f'Need seed data for Seed ID {sid}') + seed = Wallet(self.cfg, passwd_file=self.passwdfile).seed + msg(f'User input produced Seed ID {seed.sid}') + if not seed.sid == sid: # TODO: add test + seed = seed.subseed_by_seed_id(sid, print_msg=True) + + if seed: + self.saved_seeds[seed.sid] = seed + if seed.sid == sid: + return seed + else: + die(2, f'ERROR: No seed source found for Seed ID: {sid}') + + def generate_kals_for_mmgen_addrs(self, need_keys, proto): + mmids = [e.mmid for e in need_keys] + sids = remove_dups((i.sid for i in mmids), quiet=True) + self.cfg._util.vmsg('Need seed{}: {}'.format(suf(sids), ' '.join(sids))) + for sid in sids: + seed = self.get_seed_for_seed_id(sid) # raises exception if seed not found + for id_str in MMGenAddrType.mmtypes: + idx_list = [i.idx for i in mmids if i.sid == sid and i.mmtype == id_str] + if idx_list: + yield KeyAddrList( + cfg = self.cfg, + proto = proto, + seed = seed, + addr_idxs = AddrIdxList(idx_list=idx_list), + mmtype = MMGenAddrType(proto, id_str), + skip_chksum = True) + + def add_keys(self, src, io_list, *, from_keyaddrlist=False): + + if not (need_keys := [e for e in io_list if e.mmid and not e.have_wif]): + return [] + + proto = need_keys[0].proto + + if from_keyaddrlist: + desc = 'key-address file' + err_desc = 'From key-address file:' + kals = tuple([self.keyaddrlist]) + else: + desc = 'seed(s)' + err_desc = 'Generated from seed:' + kals = tuple(self.generate_kals_for_mmgen_addrs(need_keys, proto)) + + self.cfg._util.qmsg( + f'Checking {gc.proj_name} -> {proto.coin} address mappings for {src} (from {desc})') + + def gen_keys(): + for e in need_keys: + for kal in kals: + for f in kal.data: + if mmid := f'{kal.al_id}:{f.idx}' == e.mmid: + if f.addr == e.addr: + e.have_wif = True + if src == 'inputs': + yield f + else: + die(3, fmt(f""" + {gc.proj_name} -> {proto.coin} address mappings differ! + {err_desc:<23} {mmid} -> {f.addr} + {'tx file:':<23} {e.mmid} -> {e.addr} + """).strip()) + + if new_keys := list(gen_keys()): + self.cfg._util.vmsg(f'Added {len(new_keys)} wif key{suf(new_keys)} from {desc}') + + return new_keys + + @property + def keys(self): + """ + produce a list of signing keys and perform checks on output and swap destination + addresses + """ + ret = self.get_keys_for_non_mmgen_inputs() + memo_output = self.tx.check_swap_memo() # do this for non-swap transactions too! + + if self.cfg.mmgen_keys_from_file: + ret += self.add_keys('inputs', self.tx.inputs, from_keyaddrlist=True) + self.add_keys('outputs', self.tx.outputs, from_keyaddrlist=True) + if memo_output: + self.add_keys('swap destination address', [memo_output], from_keyaddrlist=True) + + ret += self.add_keys('inputs', self.tx.inputs) + self.add_keys('outputs', self.tx.outputs) + if memo_output: + self.add_keys('swap destination address', [memo_output]) + + # this (boolean) attr isn't needed in transaction file + self.tx.delete_attrs('inputs', 'have_wif') + self.tx.delete_attrs('outputs', 'have_wif') + + if extra_sids := remove_dups( + (s for s in self.saved_seeds + if s not in self.tx.get_sids('inputs') + self.tx.get_sids('outputs')), + quiet = True): + msg('Unused Seed ID{}: {}'.format(suf(extra_sids), ' '.join(extra_sids))) + + return ret diff --git a/mmgen/tx/sign.py b/mmgen/tx/sign.py deleted file mode 100755 index 54ff9104..00000000 --- a/mmgen/tx/sign.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python3 -# -# MMGen Wallet, a terminal-based cryptocurrency wallet -# Copyright (C)2013-2025 The MMGen Project -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -tx.sign: Sign a transaction generated by 'mmgen-txcreate' -""" - -from ..cfg import gc -from ..util import msg, suf, fmt, die, remove_dups, get_extension -from ..obj import MMGenList -from ..addr import MMGenAddrType -from ..addrlist import AddrIdxList, KeyAddrList -from ..wallet import Wallet, get_wallet_extensions, get_wallet_cls - -saved_seeds = {} - -def get_seed_for_seed_id(sid, infiles, saved_seeds): - - if sid in saved_seeds: - return saved_seeds[sid] - - subseeds_checked = False - seed = None - - while True: - if infiles: - seed = Wallet(cfg, fn=infiles.pop(0), ignore_in_fmt=True, passwd_file=global_passwd_file).seed - elif saved_seeds and subseeds_checked is False: - seed = saved_seeds[list(saved_seeds)[0]].subseed_by_seed_id(sid, print_msg=True) - subseeds_checked = True - if not seed: - continue - elif cfg.in_fmt: - cfg._util.qmsg(f'Need seed data for Seed ID {sid}') - seed = Wallet(cfg, passwd_file=global_passwd_file).seed - msg(f'User input produced Seed ID {seed.sid}') - if not seed.sid == sid: # TODO: add test - seed = seed.subseed_by_seed_id(sid, print_msg=True) - - if seed: - saved_seeds[seed.sid] = seed - if seed.sid == sid: - return seed - else: - die(2, f'ERROR: No seed source found for Seed ID: {sid}') - -def generate_kals_for_mmgen_addrs(need_keys, infiles, saved_seeds, proto): - mmids = [e.mmid for e in need_keys] - sids = remove_dups((i.sid for i in mmids), quiet=True) - cfg._util.vmsg(f"Need seed{suf(sids)}: {' '.join(sids)}") - def gen_kals(): - for sid in sids: - # Returns only if seed is found - seed = get_seed_for_seed_id(sid, infiles, saved_seeds) - for id_str in MMGenAddrType.mmtypes: - idx_list = [i.idx for i in mmids if i.sid == sid and i.mmtype == id_str] - if idx_list: - yield KeyAddrList( - cfg = cfg, - proto = proto, - seed = seed, - addr_idxs = AddrIdxList(idx_list=idx_list), - mmtype = MMGenAddrType(proto, id_str), - skip_chksum = True) - return MMGenList(gen_kals()) - -def add_keys(src, io_list, infiles=None, saved_seeds=None, *, keyaddr_list=None): - - need_keys = [e for e in io_list if e.mmid and not e.have_wif] - - if not need_keys: - return [] - - proto = need_keys[0].proto - - if keyaddr_list: - desc = 'key-address file' - src_desc = 'From key-address file:' - d = MMGenList([keyaddr_list]) - else: - desc = 'seed(s)' - src_desc = 'Generated from seed:' - d = generate_kals_for_mmgen_addrs(need_keys, infiles, saved_seeds, proto) - - cfg._util.qmsg(f'Checking {gc.proj_name} -> {proto.coin} address mappings for {src} (from {desc})') - - def gen_keys(): - for e in need_keys: - for kal in d: - for f in kal.data: - if mmid := f'{kal.al_id}:{f.idx}' == e.mmid: - if f.addr == e.addr: - e.have_wif = True - if src == 'inputs': - yield f - else: - die(3, fmt(f""" - {gc.proj_name} -> {proto.coin} address mappings differ! - {src_desc:<23} {mmid} -> {f.addr} - {'tx file:':<23} {e.mmid} -> {e.addr} - """).strip()) - - if new_keys := list(gen_keys()): - cfg._util.vmsg(f'Added {len(new_keys)} wif key{suf(new_keys)} from {desc}') - - return new_keys - -def _pop_matching_fns(args, cmplist): # strips found args - return list(reversed([args.pop(args.index(a)) for a in reversed(args) if get_extension(a) in cmplist])) - -def get_tx_files(cfg, args): - from .unsigned import Unsigned, AutomountUnsigned - ext = (AutomountUnsigned if cfg.autosign else Unsigned).ext - ret = _pop_matching_fns(args, [ext]) - if not ret: - die(1, f'You must specify a raw transaction file with extension ‘.{ext}’') - return ret - -def get_seed_files(cfg, args, *, ignore_dfl_wallet=False, empty_ok=False): - # favor unencrypted seed sources first, as they don't require passwords - ret = _pop_matching_fns(args, get_wallet_extensions('unenc')) - from ..filename import find_file_in_dir - if not ignore_dfl_wallet: # Make this the first encrypted ss in the list - if wf := find_file_in_dir(get_wallet_cls('mmgen'), cfg.data_dir): - ret.append(wf) - ret += _pop_matching_fns(args, get_wallet_extensions('enc')) - if not (ret or empty_ok or cfg.mmgen_keys_from_file or cfg.keys_from_file): # or cfg.use_wallet_dat - die(1, 'You must specify a seed or key source!') - return ret - -def get_keyaddrlist(cfg, proto): - if cfg.mmgen_keys_from_file: - return KeyAddrList(cfg, proto, infile=cfg.mmgen_keys_from_file) - return None - -def get_keylist(cfg): - if cfg.keys_from_file: - from ..fileutil import get_lines_from_file - return get_lines_from_file(cfg, cfg.keys_from_file, desc='key-address data', trim_comments=True) - return None - -async def txsign(cfg_parm, tx, seed_files, kl, kal, *, tx_num_str='', passwd_file=None): - - keys = MMGenList() # list of AddrListEntry objects - non_mmaddrs = tx.get_non_mmaddrs('inputs') - - global cfg, global_passwd_file - cfg = cfg_parm - global_passwd_file = passwd_file - - if non_mmaddrs: - tx.check_non_mmgen_inputs(caller='txsign', non_mmaddrs=non_mmaddrs) - tmp = KeyAddrList( - cfg = cfg, - proto = tx.proto, - addrlist = non_mmaddrs, - skip_chksum = True) - if kl: - tmp.add_wifs(kl) - missing = tmp.list_missing('sec') - if missing: - sep = '\n ' - die(2, 'ERROR: a key file must be supplied for the following non-{} address{}:{}'.format( - gc.proj_name, - suf(missing, 'es'), - sep + sep.join(missing))) - keys += tmp.data - - memo_output = tx.check_swap_memo() # do this for non-swap transactions too! - - if cfg.mmgen_keys_from_file: - keys += add_keys('inputs', tx.inputs, keyaddr_list=kal) - add_keys('outputs', tx.outputs, keyaddr_list=kal) - if memo_output: - add_keys('swap destination address', [memo_output], keyaddr_list=kal) - - keys += add_keys('inputs', tx.inputs, seed_files, saved_seeds) - add_keys('outputs', tx.outputs, seed_files, saved_seeds) - if memo_output: - add_keys('swap destination address', [memo_output], seed_files, saved_seeds) - - # this (boolean) attr isn't needed in transaction file - tx.delete_attrs('inputs', 'have_wif') - tx.delete_attrs('outputs', 'have_wif') - - extra_sids = remove_dups( - (s for s in saved_seeds if s not in tx.get_sids('inputs') + tx.get_sids('outputs')), - quiet = True) - - if extra_sids: - msg(f"Unused Seed ID{suf(extra_sids)}: {' '.join(extra_sids)}") - - return await tx.sign(keys, tx_num_str) # returns signed TX object or False