From b3d7f23440034dd8d00719abf842b7f6fe45bad0 Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Sat, 15 Jan 2022 14:00:06 +0000 Subject: [PATCH] seed.py: move subseed-related classes to subseed.py - for increased efficiency, Seed() now loads subseeds on demand --- mmgen/help.py | 2 +- mmgen/main_txdo.py | 2 +- mmgen/main_txsign.py | 2 +- mmgen/main_wallet.py | 2 +- mmgen/obj.py | 27 ---- mmgen/seed.py | 180 ++------------------------ mmgen/seedsplit.py | 2 +- mmgen/subseed.py | 216 ++++++++++++++++++++++++++++++++ mmgen/tool.py | 1 + test/unit_tests_d/ut_subseed.py | 2 +- 10 files changed, 233 insertions(+), 203 deletions(-) create mode 100755 mmgen/subseed.py diff --git a/mmgen/help.py b/mmgen/help.py index 42f63019..e7fc6a79 100755 --- a/mmgen/help.py +++ b/mmgen/help.py @@ -226,7 +226,7 @@ EXAMPLES: mi = MasterShareIdx ) def subwallet(): - from .obj import SubSeedIdxRange + from .subseed import SubSeedIdxRange return f""" SUBWALLETS: diff --git a/mmgen/main_txdo.py b/mmgen/main_txdo.py index d52e7001..8a3d8b0a 100755 --- a/mmgen/main_txdo.py +++ b/mmgen/main_txdo.py @@ -22,7 +22,7 @@ mmgen-txdo: Create, sign and broadcast an online MMGen transaction from .common import * from .wallet import Wallet -from .obj import SubSeedIdxRange +from .subseed import SubSeedIdxRange opts_data = { 'sets': [('yes', True, 'quiet', True)], diff --git a/mmgen/main_txsign.py b/mmgen/main_txsign.py index ce265a85..0501e239 100755 --- a/mmgen/main_txsign.py +++ b/mmgen/main_txsign.py @@ -21,7 +21,7 @@ mmgen-txsign: Sign a transaction generated by 'mmgen-txcreate' """ from .common import * -from .obj import SubSeedIdxRange +from .subseed import SubSeedIdxRange from .wallet import Wallet # -w, --use-wallet-dat (keys from running coin daemon) removed: use walletdump rpc instead diff --git a/mmgen/main_wallet.py b/mmgen/main_wallet.py index 4c1e5273..a8e3255f 100755 --- a/mmgen/main_wallet.py +++ b/mmgen/main_wallet.py @@ -146,7 +146,7 @@ FMT CODES: cmd_args = opts.init(opts_data,opt_filter=opt_filter) if invoked_as == 'subgen': - from .obj import SubSeedIdx + from .subseed import SubSeedIdx ss_idx = SubSeedIdx(cmd_args.pop()) elif invoked_as == 'seedsplit': from .obj import get_obj diff --git a/mmgen/obj.py b/mmgen/obj.py index 437449fa..275aea3d 100755 --- a/mmgen/obj.py +++ b/mmgen/obj.py @@ -441,10 +441,6 @@ class MMGenRange(tuple,InitErrors,MMGenObject): def items(self): return list(self.iterate()) -class SubSeedIdxRange(MMGenRange): - min_idx = 1 - max_idx = 1000000 - class UnknownCoinAmt(Decimal): pass class DecimalNegateResult(Decimal): pass @@ -659,29 +655,6 @@ class SeedID(str,Hilite,InitErrors): except Exception as e: return cls.init_fail(e,seed or sid) -class SubSeedIdx(str,Hilite,InitErrors): - color = 'red' - trunc_ok = False - def __new__(cls,s): - if type(s) == cls: - return s - try: - assert isinstance(s,str),'not a string or string subclass' - idx = s[:-1] if s[-1] in 'SsLl' else s - from .util import is_int - assert is_int(idx),"valid format: an integer, plus optional letter 'S','s','L' or 'l'" - idx = int(idx) - assert idx >= SubSeedIdxRange.min_idx, f'subseed index < {SubSeedIdxRange.min_idx:,}' - assert idx <= SubSeedIdxRange.max_idx, f'subseed index > {SubSeedIdxRange.max_idx:,}' - - sstype,ltr = ('short','S') if s[-1] in 'Ss' else ('long','L') - me = str.__new__(cls,str(idx)+ltr) - me.idx = idx - me.type = sstype - return me - except Exception as e: - return cls.init_fail(e,s) - class MMGenID(str,Hilite,InitErrors,MMGenObject): color = 'orange' width = 0 diff --git a/mmgen/seed.py b/mmgen/seed.py index 094d5263..d891a355 100755 --- a/mmgen/seed.py +++ b/mmgen/seed.py @@ -55,158 +55,20 @@ class SeedBase(MMGenObject): def fn_stem(self): return self.sid -class SubSeedList(MMGenObject): - have_short = True - nonce_start = 0 - debug_last_share_sid_len = 3 - - def __init__(self,parent_seed): - self.member_type = SubSeed - self.parent_seed = parent_seed - self.data = { 'long': IndexedDict(), 'short': IndexedDict() } - - def __len__(self): - return len(self.data['long']) - - def get_subseed_by_ss_idx(self,ss_idx_in,print_msg=False): - ss_idx = SubSeedIdx(ss_idx_in) - if print_msg: - msg_r('{} {} of {}...'.format( - green('Generating subseed'), - ss_idx.hl(), - self.parent_seed.sid.hl(), - )) - - if ss_idx.idx > len(self): - self._generate(ss_idx.idx) - - sid = self.data[ss_idx.type].key(ss_idx.idx-1) - idx,nonce = self.data[ss_idx.type][sid] - if idx != ss_idx.idx: - die(3, "{} != {}: self.data[{t!r}].key(i) does not match self.data[{t!r}][i]!".format( - idx, - ss_idx.idx, - t = ss_idx.type )) - - if print_msg: - msg(f'\b\b\b => {SeedID.hlc(sid)}') - - seed = self.member_type(self,idx,nonce,length=ss_idx.type) - assert seed.sid == sid, f'{seed.sid} != {sid}: Seed ID mismatch!' - return seed - - def get_subseed_by_seed_id(self,sid,last_idx=None,print_msg=False): - - def get_existing_subseed_by_seed_id(sid): - for k in ('long','short') if self.have_short else ('long',): - if sid in self.data[k]: - idx,nonce = self.data[k][sid] - return self.member_type(self,idx,nonce,length=k) - - def do_msg(subseed): - if print_msg: - qmsg('{} {} ({}:{})'.format( - green('Found subseed'), - subseed.sid.hl(), - self.parent_seed.sid.hl(), - subseed.ss_idx.hl(), - )) - - if last_idx == None: - last_idx = g.subseeds - - subseed = get_existing_subseed_by_seed_id(sid) - if subseed: - do_msg(subseed) - return subseed - - if len(self) >= last_idx: - return None - - self._generate(last_idx,last_sid=sid) - - subseed = get_existing_subseed_by_seed_id(sid) - if subseed: - do_msg(subseed) - return subseed - - def _collision_debug_msg(self,sid,idx,nonce,nonce_desc='nonce',debug_last_share=False): - slen = 'short' if sid in self.data['short'] else 'long' - m1 = f'add_subseed(idx={idx},{slen}):' - if sid == self.parent_seed.sid: - m2 = f'collision with parent Seed ID {sid},' - else: - if debug_last_share: - sl = self.debug_last_share_sid_len - colliding_idx = [d[:sl] for d in self.data[slen].keys].index(sid[:sl]) + 1 - sid = sid[:sl] - else: - colliding_idx = self.data[slen][sid][0] - m2 = f'collision with ID {sid} (idx={colliding_idx},{slen}),' - msg(f'{m1:30} {m2:46} incrementing {nonce_desc} to {nonce+1}') - - def _generate(self,last_idx=None,last_sid=None): - - if last_idx == None: - last_idx = g.subseeds - - first_idx = len(self) + 1 - - if first_idx > last_idx: - return None - - if last_sid != None: - last_sid = SeedID(sid=last_sid) - - def add_subseed(idx,length): - for nonce in range(self.nonce_start,self.member_type.max_nonce+1): # handle SeedID collisions - sid = make_chksum_8(self.member_type.make_subseed_bin(self,idx,nonce,length)) - if sid in self.data['long'] or sid in self.data['short'] or sid == self.parent_seed.sid: - if g.debug_subseed: # should get ≈450 collisions for first 1,000,000 subseeds - self._collision_debug_msg(sid,idx,nonce) - else: - self.data[length][sid] = (idx,nonce) - return last_sid == sid - else: # must exit here, as this could leave self.data in inconsistent state - raise SubSeedNonceRangeExceeded('add_subseed(): nonce range exceeded') - - for idx in SubSeedIdxRange(first_idx,last_idx).iterate(): - match1 = add_subseed(idx,'long') - match2 = add_subseed(idx,'short') if self.have_short else False - if match1 or match2: - break - - def format(self,first_idx,last_idx): - - r = SubSeedIdxRange(first_idx,last_idx) - - if len(self) < last_idx: - self._generate(last_idx) - - fs1 = '{:>18} {:>18}\n' - fs2 = '{i:>7}L: {:8} {i:>7}S: {:8}\n' - - hdr = f' Parent Seed: {self.parent_seed.sid.hl()} ({self.parent_seed.bitlen} bits)\n\n' - hdr += fs1.format('Long Subseeds','Short Subseeds') - hdr += fs1.format('-------------','--------------') - - sl = self.data['long'].keys - ss = self.data['short'].keys - body = (fs2.format( sl[n-1], ss[n-1], i=n ) for n in r.iterate()) - - return hdr + ''.join(body) - class Seed(SeedBase): - def __init__(self,seed_bin=None): - self.subseeds = SubSeedList(self) - SeedBase.__init__(self,seed_bin=seed_bin) + @property + def subseeds(self): + if not hasattr(self,'_subseeds'): + from .subseed import SubSeedList + self._subseeds = SubSeedList(self) + return self._subseeds - def subseed(self,ss_idx_in,print_msg=False): - return self.subseeds.get_subseed_by_ss_idx(ss_idx_in,print_msg=print_msg) + def subseed(self,*args,**kwargs): + return self.subseeds.get_subseed_by_ss_idx(*args,**kwargs) - def subseed_by_seed_id(self,sid,last_idx=None,print_msg=False): - return self.subseeds.get_subseed_by_seed_id(sid,last_idx=last_idx,print_msg=print_msg) + def subseed_by_seed_id(self,*args,**kwargs): + return self.subseeds.get_subseed_by_seed_id(*args,**kwargs) def split(self,*args,**kwargs): from .seedsplit import SeedShareList @@ -216,25 +78,3 @@ class Seed(SeedBase): def join_shares(*args,**kwargs): from .seedsplit import join_shares return join_shares(*args,**kwargs) - -class SubSeed(SeedBase): - - idx = ImmutableAttr(int,typeconv=False) - nonce = ImmutableAttr(int,typeconv=False) - ss_idx = ImmutableAttr(SubSeedIdx) - max_nonce = 1000 - - def __init__(self,parent_list,idx,nonce,length): - self.idx = idx - self.nonce = nonce - self.ss_idx = str(idx) + { 'long': 'L', 'short': 'S' }[length] - self.parent_list = parent_list - SeedBase.__init__(self,seed_bin=type(self).make_subseed_bin(parent_list,idx,nonce,length)) - - @staticmethod - def make_subseed_bin(parent_list,idx:int,nonce:int,length:str): - seed = parent_list.parent_seed - short = { 'short': True, 'long': False }[length] - # field maximums: idx: 4294967295 (1000000), nonce: 65535 (1000), short: 255 (1) - scramble_key = idx.to_bytes(4,'big') + nonce.to_bytes(2,'big') + short.to_bytes(1,'big') - return scramble_seed(seed.data,scramble_key)[:16 if short else seed.byte_len] diff --git a/mmgen/seedsplit.py b/mmgen/seedsplit.py index 70333527..51c54c29 100755 --- a/mmgen/seedsplit.py +++ b/mmgen/seedsplit.py @@ -22,7 +22,7 @@ seedsplit.py: Seed split classes and methods for the MMGen suite from .exception import RangeError from .obj import MMGenPWIDString,MMGenIdx -from .seed import * +from .subseed import * class SeedShareIdx(MMGenIdx): max_val = 1024 diff --git a/mmgen/subseed.py b/mmgen/subseed.py new file mode 100755 index 00000000..947fc72a --- /dev/null +++ b/mmgen/subseed.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +subseed.py: Subseed classes and methods for the MMGen suite +""" + +from .exception import SubSeedNonceRangeExceeded +from .obj import MMGenRange,IndexedDict +from .seed import * +from .crypto import scramble_seed + +class SubSeedIdxRange(MMGenRange): + min_idx = 1 + max_idx = 1000000 + +class SubSeedIdx(str,Hilite,InitErrors): + color = 'red' + trunc_ok = False + def __new__(cls,s): + if type(s) == cls: + return s + try: + assert isinstance(s,str),'not a string or string subclass' + idx = s[:-1] if s[-1] in 'SsLl' else s + from .util import is_int + assert is_int(idx),"valid format: an integer, plus optional letter 'S','s','L' or 'l'" + idx = int(idx) + assert idx >= SubSeedIdxRange.min_idx, f'subseed index < {SubSeedIdxRange.min_idx:,}' + assert idx <= SubSeedIdxRange.max_idx, f'subseed index > {SubSeedIdxRange.max_idx:,}' + + sstype,ltr = ('short','S') if s[-1] in 'Ss' else ('long','L') + me = str.__new__(cls,str(idx)+ltr) + me.idx = idx + me.type = sstype + return me + except Exception as e: + return cls.init_fail(e,s) + +class SubSeed(SeedBase): + + idx = ImmutableAttr(int,typeconv=False) + nonce = ImmutableAttr(int,typeconv=False) + ss_idx = ImmutableAttr(SubSeedIdx) + max_nonce = 1000 + + def __init__(self,parent_list,idx,nonce,length): + self.idx = idx + self.nonce = nonce + self.ss_idx = str(idx) + { 'long': 'L', 'short': 'S' }[length] + self.parent_list = parent_list + SeedBase.__init__(self,seed_bin=type(self).make_subseed_bin(parent_list,idx,nonce,length)) + + @staticmethod + def make_subseed_bin(parent_list,idx:int,nonce:int,length:str): + seed = parent_list.parent_seed + short = { 'short': True, 'long': False }[length] + # field maximums: idx: 4294967295 (1000000), nonce: 65535 (1000), short: 255 (1) + scramble_key = idx.to_bytes(4,'big') + nonce.to_bytes(2,'big') + short.to_bytes(1,'big') + return scramble_seed(seed.data,scramble_key)[:16 if short else seed.byte_len] + +class SubSeedList(MMGenObject): + have_short = True + nonce_start = 0 + debug_last_share_sid_len = 3 + + def __init__(self,parent_seed): + self.member_type = SubSeed + self.parent_seed = parent_seed + self.data = { 'long': IndexedDict(), 'short': IndexedDict() } + + def __len__(self): + return len(self.data['long']) + + def get_subseed_by_ss_idx(self,ss_idx_in,print_msg=False): + ss_idx = SubSeedIdx(ss_idx_in) + if print_msg: + msg_r('{} {} of {}...'.format( + green('Generating subseed'), + ss_idx.hl(), + self.parent_seed.sid.hl(), + )) + + if ss_idx.idx > len(self): + self._generate(ss_idx.idx) + + sid = self.data[ss_idx.type].key(ss_idx.idx-1) + idx,nonce = self.data[ss_idx.type][sid] + if idx != ss_idx.idx: + die(3, "{} != {}: self.data[{t!r}].key(i) does not match self.data[{t!r}][i]!".format( + idx, + ss_idx.idx, + t = ss_idx.type )) + + if print_msg: + msg(f'\b\b\b => {SeedID.hlc(sid)}') + + seed = self.member_type(self,idx,nonce,length=ss_idx.type) + assert seed.sid == sid, f'{seed.sid} != {sid}: Seed ID mismatch!' + return seed + + def get_subseed_by_seed_id(self,sid,last_idx=None,print_msg=False): + + def get_existing_subseed_by_seed_id(sid): + for k in ('long','short') if self.have_short else ('long',): + if sid in self.data[k]: + idx,nonce = self.data[k][sid] + return self.member_type(self,idx,nonce,length=k) + + def do_msg(subseed): + if print_msg: + qmsg('{} {} ({}:{})'.format( + green('Found subseed'), + subseed.sid.hl(), + self.parent_seed.sid.hl(), + subseed.ss_idx.hl(), + )) + + if last_idx == None: + last_idx = g.subseeds + + subseed = get_existing_subseed_by_seed_id(sid) + if subseed: + do_msg(subseed) + return subseed + + if len(self) >= last_idx: + return None + + self._generate(last_idx,last_sid=sid) + + subseed = get_existing_subseed_by_seed_id(sid) + if subseed: + do_msg(subseed) + return subseed + + def _collision_debug_msg(self,sid,idx,nonce,nonce_desc='nonce',debug_last_share=False): + slen = 'short' if sid in self.data['short'] else 'long' + m1 = f'add_subseed(idx={idx},{slen}):' + if sid == self.parent_seed.sid: + m2 = f'collision with parent Seed ID {sid},' + else: + if debug_last_share: + sl = self.debug_last_share_sid_len + colliding_idx = [d[:sl] for d in self.data[slen].keys].index(sid[:sl]) + 1 + sid = sid[:sl] + else: + colliding_idx = self.data[slen][sid][0] + m2 = f'collision with ID {sid} (idx={colliding_idx},{slen}),' + msg(f'{m1:30} {m2:46} incrementing {nonce_desc} to {nonce+1}') + + def _generate(self,last_idx=None,last_sid=None): + + if last_idx == None: + last_idx = g.subseeds + + first_idx = len(self) + 1 + + if first_idx > last_idx: + return None + + if last_sid != None: + last_sid = SeedID(sid=last_sid) + + def add_subseed(idx,length): + for nonce in range(self.nonce_start,self.member_type.max_nonce+1): # handle SeedID collisions + sid = make_chksum_8(self.member_type.make_subseed_bin(self,idx,nonce,length)) + if sid in self.data['long'] or sid in self.data['short'] or sid == self.parent_seed.sid: + if g.debug_subseed: # should get ≈450 collisions for first 1,000,000 subseeds + self._collision_debug_msg(sid,idx,nonce) + else: + self.data[length][sid] = (idx,nonce) + return last_sid == sid + else: # must exit here, as this could leave self.data in inconsistent state + raise SubSeedNonceRangeExceeded('add_subseed(): nonce range exceeded') + + for idx in SubSeedIdxRange(first_idx,last_idx).iterate(): + match1 = add_subseed(idx,'long') + match2 = add_subseed(idx,'short') if self.have_short else False + if match1 or match2: + break + + def format(self,first_idx,last_idx): + + r = SubSeedIdxRange(first_idx,last_idx) + + if len(self) < last_idx: + self._generate(last_idx) + + fs1 = '{:>18} {:>18}\n' + fs2 = '{i:>7}L: {:8} {i:>7}S: {:8}\n' + + hdr = f' Parent Seed: {self.parent_seed.sid.hl()} ({self.parent_seed.bitlen} bits)\n\n' + hdr += fs1.format('Long Subseeds','Short Subseeds') + hdr += fs1.format('-------------','--------------') + + sl = self.data['long'].keys + ss = self.data['short'].keys + body = (fs2.format( sl[n-1], ss[n-1], i=n ) for n in r.iterate()) + + return hdr + ''.join(body) diff --git a/mmgen/tool.py b/mmgen/tool.py index 03729e88..c18b35ff 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -873,6 +873,7 @@ class MMGenToolCmdWallet(MMGenToolCmds): opt.quiet = True sf = get_seed_file([wallet] if wallet else [],1) from .wallet import Wallet + from .subseed import SubSeedIdxRange return Wallet(sf).seed.subseeds.format(*SubSeedIdxRange(subseed_idx_range)) def list_shares(self, diff --git a/test/unit_tests_d/ut_subseed.py b/test/unit_tests_d/ut_subseed.py index 50c6b6c0..af8b88ba 100755 --- a/test/unit_tests_d/ut_subseed.py +++ b/test/unit_tests_d/ut_subseed.py @@ -9,7 +9,7 @@ class unit_test(object): def run_test(self,name,ut): from mmgen.seed import Seed - from mmgen.obj import SubSeedIdxRange + from mmgen.subseed import SubSeedIdxRange def basic_ops(): msg_r('Testing basic ops...')