mmgen-tool twexport: address pruning

Bitcoin Core and code forks thereof provide no mechanism for deleting addresses
from a tracking wallet.  Enter tracking wallet exporting with address pruning:
calling ‘mmgen-tool twexport’ with the ‘prune=1’ option launches an interactive
prune menu, allowing the user to delete unwanted addresses from the JSON wallet
dump.  A new tracking wallet can then be easily created from the pruned dump
using ‘mmgen-tool twimport’.

Testing/demo:

    # run the dependencies for the regtest ‘twprune’ test:
    $ test/test.py -d regtest.twprune

    # run the test, displaying output:
    $ test/test.py -SA --demo regtest.twprune
This commit is contained in:
The MMGen Project 2022-12-01 12:32:33 +00:00
commit f62322b177
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
8 changed files with 414 additions and 9 deletions

View file

@ -1 +1 @@
November 2022
December 2022

View file

@ -1 +1 @@
13.3.dev23
13.3.dev24

View file

@ -20,6 +20,8 @@ class BitcoinTwJSON(TwJSON):
class Base(TwJSON.Base):
can_prune = True
@property
def mappings_json(self):
return self.json_dump([(e.mmgen_id,e.address) for e in self.entries])
@ -76,8 +78,17 @@ class BitcoinTwJSON(TwJSON):
@property
async def addrlist(self):
if not hasattr(self,'_addrlist'):
from .addresses import TwAddresses
self._addrlist = await TwAddresses(self.proto,get_data=True)
if self.prune:
from .prune import TwAddressesPrune
self._addrlist = al = await TwAddressesPrune(
self.proto,
get_data = True,
warn_used = self.warn_used )
await al.view_filter_and_sort()
self.pruned = al.do_prune()
else:
from .addresses import TwAddresses
self._addrlist = await TwAddresses(self.proto,get_data=True)
return self._addrlist
async def get_entries(self): # TODO: include 'received' field

43
mmgen/proto/btc/tw/prune.py Executable file
View file

@ -0,0 +1,43 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
proto.btc.tw.prune: Bitcoin base protocol tracking wallet address list prune class
"""
from ....tw.prune import TwAddressesPrune
from .addresses import BitcoinTwAddresses
class BitcoinTwAddressesPrune(BitcoinTwAddresses,TwAddressesPrune):
prompt = """
Sort options: [a]mt, [A]ge, [M]mid, [r]everse
Column options: toggle [D]ays/date/confs/block
Filters: show [E]mpty addrs, [U]sed addrs, all [L]abels
View/Print: pager [v]iew, [w]ide view
Actions: [q]uit pruning, r[e]draw,
Pruning: [p]rune, [u]nprune, [c]lear prune list:
"""
key_mappings = {
'a':'s_amt',
'A':'s_age',
'M':'s_twmmid',
'r':'d_reverse',
'D':'d_days',
'e':'d_redraw',
'E':'d_showempty',
'U':'d_showused',
'L':'d_all_labels',
'q':'a_quit',
'v':'a_view',
'w':'a_view_detail',
'p':'a_prune',
'u':'a_unprune',
'c':'a_clear_prune_list' }

View file

@ -193,7 +193,7 @@ class tool_cmd(tool_cmd_base):
ret = await (await TwCtl(self.proto,mode='w')).rescan_blockchain(start_block,stop_block)
return True
async def twexport(self,include_amts=True,pretty=False):
async def twexport(self,include_amts=True,pretty=False,prune=False,warn_used=False):
"""
export a tracking wallet to JSON format
@ -205,9 +205,21 @@ class tool_cmd(tool_cmd_base):
If pretty is true, JSON will be dumped in human-readable format to allow
for editing of comment fields.
If prune is true, an interactive menu will be launched allowing the user
to prune unwanted addresses before creating the JSON dump. Pruning has no
effect on the existing tracking wallet.
If warn_used is true, the user will be prompted before pruning used
addresses.
"""
from ..tw.json import TwJSON
await TwJSON.Export( self.proto, include_amts=include_amts, pretty=pretty )
await TwJSON.Export(
self.proto,
include_amts = include_amts,
pretty = pretty,
prune = prune,
warn_used = warn_used )
return True
async def twimport(self,filename:str,ignore_checksum=False,batch=False):

View file

@ -15,7 +15,7 @@ tw.json: export and import tracking wallet to JSON format
import json
from collections import namedtuple
from ..util import msg,ymsg,fmt,die,make_timestamp,make_chksum_8,compare_or_die
from ..util import msg,ymsg,fmt,suf,die,make_timestamp,make_chksum_8,compare_or_die
from ..base_obj import AsyncInit
from ..objmethods import MMGenObject
from ..rpc import json_encoder
@ -25,6 +25,8 @@ class TwJSON:
class Base(MMGenObject):
can_prune = False
pruned = None
fn_pfx = 'mmgen-tracking-wallet-dump'
def __new__(cls,proto,*args,**kwargs):
@ -39,7 +41,14 @@ class TwJSON:
@property
def dump_fn(self):
return f'{self.fn_pfx}-{self.coin}-{self.network}.json'
if self.pruned:
from ..addrlist import AddrIdxList
pruned_id = AddrIdxList(idx_list=self.pruned).id_str
return '{a}{b}-{c}-{d}.json'.format(
a = self.fn_pfx,
b = f'-pruned[{pruned_id}]' if self.pruned else '',
c = self.coin,
d = self.network )
def json_dump(self,data,pretty=False):
return json.dumps(
@ -130,7 +139,13 @@ class TwJSON:
class Export(Base,metaclass=AsyncInit):
async def __init__(self,proto,include_amts=True,pretty=False):
async def __init__(self,proto,include_amts=True,pretty=False,prune=False,warn_used=False):
if prune and not self.can_prune:
die(1,f'Pruning not supported for {proto.name} protocol')
self.prune = prune
self.warn_used = warn_used
super().__init__(proto)
@ -141,6 +156,11 @@ class TwJSON:
self.entries = await self.get_entries()
if self.prune:
msg('Pruned {} address{}'.format( len(self.pruned), suf(self.pruned,'es') ))
msg('Exporting {} address{}'.format( self.num_entries, suf(self.num_entries,'es') ))
data = {
'id': 'mmgen_tracking_wallet',
'version': 1,

165
mmgen/tw/prune.py Executable file
View file

@ -0,0 +1,165 @@
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
# https://github.com/mmgen/mmgen
# https://gitlab.com/mmgen/mmgen
"""
tw.prune: Tracking wallet pruned listaddresses class for the MMGen suite
"""
from ..util import msg,rmsg,ymsg
from ..color import red,green,gray,yellow
from ..obj import ListItemAttr
from .addresses import TwAddresses
class TwAddressesPrune(TwAddresses):
mod_subpath = 'tw.prune'
class TwAddress(TwAddresses.TwAddress):
valid_attrs = TwAddresses.TwAddress.valid_attrs | {'tag'}
tag = ListItemAttr(bool,typeconv=False,reassign_ok=True)
async def __init__(self,*args,warn_used=False,**kwargs):
self.warn_used = warn_used
await super().__init__(*args,**kwargs)
def gen_display(self,data,cw,fs,color,fmt_method):
id_save = data[0].al_id
yes,no = red('Yes '),green('No ')
for n,d in enumerate(data,1):
if id_save != d.al_id:
id_save = d.al_id
yield ''
yield (
gray(fmt_method(n,d,cw,fs,False,'Yes ','No ')) if d.tag else
fmt_method(n,d,cw,fs,True,yes,no) )
def do_prune(self):
def gen():
for n,d in enumerate(self.data,1):
if d.tag:
pruned.append(n)
if d.amt:
rmsg(f'Warning: pruned address {d.twmmid.addr} has a balance!')
elif self.warn_used and d.recvd:
ymsg(f'Warning: pruned address {d.twmmid.addr} is used!')
else:
yield d
pruned = []
self.reverse = False
self.do_sort('twmmid')
self.data = list(gen())
return pruned
class action(TwAddresses.action):
def get_addrnums(self,parent,desc):
prompt = f'Enter a range or space-separated list of addresses to {desc}: '
from ..ui import line_input
msg('')
while True:
reply = line_input(prompt).strip()
if reply:
from ..addrlist import AddrIdxList
from ..obj import get_obj
selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
if selected:
if selected[-1] <= len(parent.disp_data):
return selected
msg(f'Address number must be <= {len(parent.disp_data)}')
else:
return []
def query_user(self,desc,addrnum,e):
from collections import namedtuple
md = namedtuple('mdata',['wmsg','prompt'])
m = {
'amt': md(
red('Address #{a} ({b}) has a balance of {c}!'.format(
a = addrnum,
b = e.twmmid.addr,
c = e.amt.hl2(color=False,unit=True) )),
'[p]rune anyway, [P]rune all with balance, [s]kip, [S]kip all with balance: ',
),
'used': md(
yellow('Address #{} ({}) is used!'.format( addrnum, e.twmmid.addr )),
'[p]rune anyway, [P]rune all used, [s]kip, [S]kip all used: ',
),
}
from ..term import get_char
valid_res = 'pPsS'
msg(m[desc].wmsg)
while True:
res = get_char( m[desc].prompt, immed_chars=valid_res )
if res in valid_res:
msg('')
return {
# auto, prune
'p': (False, True),
'P': (True, True),
's': (False, False),
'S': (True, False),
}[res]
else:
msg('\nInvalid keypress')
def a_prune(self,parent):
def do_entry(desc,n,addrnum,e):
if auto[desc]:
return False
else:
auto[desc],prune = self.query_user(desc,addrnum,e)
dfl[desc] = auto[desc] and prune
skip_all_used = auto['used'] and not dfl['used']
if auto[desc]: # we’ve switched to auto mode, so go back and fix up all previous entries
for addrnum in addrnums[:n]:
e = parent.disp_data[addrnum-1]
if skip_all_used and e.recvd:
e.tag = False
elif desc == 'amt' and e.amt:
e.tag = prune
elif desc == 'used' and (e.recvd and not e.amt):
e.tag = prune
# skipping all used addrs implies skipping all addrs with balances
if skip_all_used:
auto['amt'] = True
dfl['amt'] = False
return prune
addrnums = self.get_addrnums(parent,'prune')
dfl = {'amt': False, 'used': False} # default prune policy for given property (has amt, is used)
auto = {'amt': False, 'used': False} # whether to ask the user, or apply default policy automatically
for n,addrnum in enumerate(addrnums):
e = parent.disp_data[addrnum-1]
if e.amt and not dfl['amt']:
e.tag = do_entry('amt',n,addrnum,e)
elif parent.warn_used and (e.recvd and not e.amt) and not dfl['used']:
e.tag = do_entry('used',n,addrnum,e)
else:
e.tag = True
def a_unprune(self,parent):
for addrnum in self.get_addrnums(parent,'unprune'):
parent.disp_data[addrnum-1].tag = False
def a_clear_prune_list(self,parent):
for d in parent.data:
d.tag = False

View file

@ -161,6 +161,7 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
('subgroup.twexport', ['fund_users']),
('subgroup.rescan', ['fund_users']),
('subgroup.main', ['fund_users']),
('subgroup.twprune', ['main']),
('subgroup.txhist', ['main']),
('subgroup.label', ['main']),
('subgroup.view', ['label']),
@ -307,6 +308,22 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
('bob_nochg_burn', 'zero-change transaction to burn address'),
('generate', 'mining a block'),
),
'twprune': (
'exporting a pruned tracking wallet to JSON',
('bob_twprune_noask', 'pruning a tracking wallet'),
('bob_twprune_skip', 'pruning a tracking wallet (skip pruning)'),
('bob_twprune_all', 'pruning a tracking wallet (pruning all addrs)'),
('bob_twprune_skipamt', 'pruning a tracking wallet (skipping addrs with amt)'),
('bob_twprune_skipused', 'pruning a tracking wallet (skipping used addrs)'),
('bob_twprune_allamt', 'pruning a tracking wallet (pruning addrs with amt)'),
('bob_twprune_allused', 'pruning a tracking wallet (pruning used addrs)'),
('bob_twprune1', 'pruning a tracking wallet (selective prune)'),
('bob_twprune2', 'pruning a tracking wallet (selective prune)'),
('bob_twprune3', 'pruning a tracking wallet (selective prune)'),
('bob_twprune4', 'pruning a tracking wallet (selective prune)'),
('bob_twprune5', 'pruning a tracking wallet (selective prune)'),
('bob_twprune6', 'pruning a tracking wallet (selective prune)'),
),
'txhist': (
'viewing transaction history',
('bob_txhist1', "viewing Bob's transaction history (sort=age)"),
@ -1095,6 +1112,143 @@ class TestSuiteRegtest(TestSuiteBase,TestSuiteShared):
def bob_twexport_pretty(self):
return self.bob_twexport(add_args=['pretty=1'])
def _bob_twprune(
self,
prune_spec,
npruned,
expect_menu = (),
expect = (),
expect2 = (),
warn_used = False,
non_segwit_ok = False ):
if not (non_segwit_ok or self.proto.cap('segwit')):
return 'skip'
t = self.spawn(
'mmgen-tool',
['--bob',f'--outdir={self.tr.trash_dir}','twexport','prune=1']
+ (['warn_used=1'] if warn_used else []) )
for s in expect_menu:
t.expect('prune list:\b',s)
t.expect('prune list:\b','p')
t.expect('addresses to prune: ',f'{prune_spec}\n')
for p,s in expect:
t.expect(p,s,regex=True)
t.expect('prune list:\b','q')
for p,s in expect2:
t.expect(p,s,regex=True)
if npruned:
t.expect('Pruned {} addresses'.format(npruned))
taddr = 35 if self.proto.cap('segwit') else 25
t.expect('Exporting {} addresses'.format(taddr-npruned))
fn = t.written_to_file('JSON data')
return t
def bob_twprune_noask(self):
return self._bob_twprune(
expect_menu = 'a', # sort by amt to make address order deterministic
prune_spec = '35,12,18,3-5',
npruned = 6 )
def bob_twprune_all(self):
taddr = 35 if self.proto.cap('segwit') else 25
return self._bob_twprune(
prune_spec = f'1-{taddr}',
npruned = taddr,
expect = [('all with balance: ','P')],
non_segwit_ok = True )
def bob_twprune_skip(self):
return self._bob_twprune(
prune_spec = '',
npruned = 0,
non_segwit_ok = True )
def bob_twprune_skipamt(self):
return self._bob_twprune(
prune_spec = '1-35',
npruned = 32,
expect = [('all with balance: ','S')] )
def bob_twprune_skipused(self):
return self._bob_twprune(
prune_spec = '1-35',
npruned = 18,
expect = [('all used: ','S')],
warn_used = True )
def bob_twprune_allamt(self):
return self._bob_twprune(
prune_spec = '1-35',
npruned = 35,
expect = [('all with balance: ','P')],
expect2 = [('Warning: pruned address .* has a balance',None)] )
def bob_twprune_allused(self):
return self._bob_twprune(
prune_spec = '1-35',
npruned = 32,
expect = [('all used: ','P'),('all with balance: ','S')],
expect2 = [('Warning: pruned address .* used',None)],
warn_used = True )
@property
def _b_start(self):
"""
SIDs sort non-deterministically, so we must search for start of main (not subseeds) group, i.e. ':B:1'
"""
assert self.proto.cap('segwit')
if not hasattr(self,'_b_start_'):
t = self.spawn( 'mmgen-tool', ['--color=0','--bob','listaddresses'], no_msg=True )
self._b_start_ = int([e for e in t.read().split('\n') if f':B:1' in e][0].split()[0].rstrip(')'))
t.close()
return self._b_start_
def _bob_twprune_selected(self,resp,npruned):
if not self.proto.cap('segwit'):
return 'skip'
B = self._b_start
a,b,c,d,e,f = resp
return self._bob_twprune(
expect_menu = 'a', # sort by amt to make address order deterministic
prune_spec = f'31-32,{B+14},{B+9},{B}-{B+4}',
npruned = npruned,
expect = [
('all used: ', a),
('all used: ', b),
('all with balance: ', c),
('all with balance: ', d),
('all used: ', e),
('all used: ', f),
],
warn_used = True )
def bob_twprune1(self):
return self._bob_twprune_selected(resp='sssssS',npruned=3)
def bob_twprune2(self):
return self._bob_twprune_selected(resp='sppPsS',npruned=3)
def bob_twprune3(self):
return self._bob_twprune_selected(resp='sssPpS',npruned=3)
def bob_twprune4(self):
return self._bob_twprune_selected(resp='sssPpP',npruned=9)
def bob_twprune5(self):
return self._bob_twprune_selected(resp='pppPpP',npruned=9)
def bob_twprune6(self):
return self._bob_twprune_selected(resp='sssSpP',npruned=7)
def bob_edit_json_twdump(self):
self.spawn('',msg_only=True)
from mmgen.tw.json import TwJSON