From 9e8f625c9b9ea6586a57c468a76725fc514cf49a Mon Sep 17 00:00:00 2001 From: The MMGen Project Date: Thu, 20 Jan 2022 12:04:52 +0000 Subject: [PATCH] tw.py: relocate tracking wallet classes to new modules - TwAddrList -> twaddrs.py - TwGetBalance -> twbal.py - TrackingWallet -> twctl.py - TwUnspentOutputs -> twuo.py The corresponding Ethereum classes in altcoins/eth/tw.py have been similarly relocated. --- mmgen/altcoins/eth/tw.py | 334 +------ mmgen/altcoins/eth/twaddrs.py | 59 ++ mmgen/altcoins/eth/twbal.py | 52 ++ mmgen/altcoins/eth/twctl.py | 224 +++++ mmgen/altcoins/eth/twuo.py | 89 ++ mmgen/altcoins/eth/tx.py | 2 +- mmgen/data/version | 2 +- mmgen/main_addrimport.py | 2 +- mmgen/main_txbump.py | 2 +- mmgen/main_txcreate.py | 2 +- mmgen/main_txdo.py | 2 +- mmgen/tool.py | 16 +- mmgen/tw.py | 1087 +--------------------- mmgen/twaddrs.py | 191 ++++ mmgen/twbal.py | 103 ++ mmgen/twctl.py | 327 +++++++ mmgen/twuo.py | 525 +++++++++++ mmgen/tx.py | 4 +- test/objattrtest_py_d/oat_btc_mainnet.py | 6 +- test/objattrtest_py_d/oat_common.py | 2 +- test/overlay/fakemods/tw.py | 18 +- test/overlay/fakemods/twuo.py | 17 + 22 files changed, 1648 insertions(+), 1418 deletions(-) create mode 100755 mmgen/altcoins/eth/twaddrs.py create mode 100755 mmgen/altcoins/eth/twbal.py create mode 100755 mmgen/altcoins/eth/twctl.py create mode 100755 mmgen/altcoins/eth/twuo.py create mode 100755 mmgen/twaddrs.py create mode 100755 mmgen/twbal.py create mode 100755 mmgen/twctl.py create mode 100755 mmgen/twuo.py create mode 100644 test/overlay/fakemods/twuo.py diff --git a/mmgen/altcoins/eth/tw.py b/mmgen/altcoins/eth/tw.py index 6bafe4c9..e3786588 100755 --- a/mmgen/altcoins/eth/tw.py +++ b/mmgen/altcoins/eth/tw.py @@ -17,349 +17,21 @@ # along with this program. If not, see . """ -altcoins.eth.tw: Ethereum tracking wallet and related classes for the MMGen suite +altcoins.eth.tw: Ethereum tracking wallet dependency classes for the MMGen suite """ -import os - -from mmgen.util import msg,vmsg,ymsg,write_mode -from mmgen.obj import ListItemAttr,ImmutableAttr -from mmgen.tw import TrackingWallet,TwAddrList,TwUnspentOutputs,TwGetBalance,TwLabel -from mmgen.addr import is_coin_addr,is_mmgen_id from mmgen.addrdata import AddrData,TwAddrData -from .contract import Token,TokenResolve -from .obj import ETHAmt - -class EthereumTrackingWallet(TrackingWallet): - - caps = ('batch',) - data_key = 'accounts' - use_tw_file = True - - async def is_in_wallet(self,addr): - return addr in self.data_root - - def init_empty(self): - self.data = { 'coin': self.proto.coin, 'accounts': {}, 'tokens': {} } - - def upgrade_wallet_maybe(self): - - upgraded = False - - if not 'accounts' in self.data or not 'coin' in self.data: - ymsg(f'Upgrading {self.desc} (v1->v2: accounts field added)') - if not 'accounts' in self.data: - self.data = {} - import json - self.data['accounts'] = json.loads(self.orig_data) - if not 'coin' in self.data: - self.data['coin'] = self.proto.coin - upgraded = True - - def have_token_params_fields(): - for k in self.data['tokens']: - if 'params' in self.data['tokens'][k]: - return True - - def add_token_params_fields(): - for k in self.data['tokens']: - self.data['tokens'][k]['params'] = {} - - if not 'tokens' in self.data: - self.data['tokens'] = {} - upgraded = True - - if self.data['tokens'] and not have_token_params_fields(): - ymsg(f'Upgrading {self.desc} (v2->v3: token params fields added)') - add_token_params_fields() - upgraded = True - - if upgraded: - self.force_write() - msg(f'{self.desc} upgraded successfully!') - - async def rpc_get_balance(self,addr): - return ETHAmt(int(await self.rpc.call('eth_getBalance','0x'+addr,'latest'),16),'wei') - - @write_mode - async def batch_import_address(self,args_list): - for arg_list in args_list: - await self.import_address(*arg_list) - return args_list - - @write_mode - async def import_address(self,addr,label,foo): - r = self.data_root - if addr in r: - if not r[addr]['mmid'] and label.mmid: - msg(f'Warning: MMGen ID {label.mmid!r} was missing in tracking wallet!') - elif r[addr]['mmid'] != label.mmid: - die(3,'MMGen ID {label.mmid!r} does not match tracking wallet!') - r[addr] = { 'mmid': label.mmid, 'comment': label.comment } - - @write_mode - async def remove_address(self,addr): - r = self.data_root - - if is_coin_addr(self.proto,addr): - have_match = lambda k: k == addr - elif is_mmgen_id(self.proto,addr): - have_match = lambda k: r[k]['mmid'] == addr - else: - die(1,f'{addr!r} is not an Ethereum address or MMGen ID') - - for k in r: - if have_match(k): - # return the addr resolved to mmid if possible - ret = r[k]['mmid'] if is_mmgen_id(self.proto,r[k]['mmid']) else addr - del r[k] - self.write() - return ret - else: - msg(f'Address {addr!r} not found in {self.data_root_desc!r} section of tracking wallet') - return None - - @write_mode - async def set_label(self,coinaddr,lbl): - for addr,d in list(self.data_root.items()): - if addr == coinaddr: - d['comment'] = lbl.comment - self.write() - return None - else: - msg(f'Address {coinaddr!r} not found in {self.data_root_desc!r} section of tracking wallet') - return False - - async def addr2sym(self,req_addr): - for addr in self.data['tokens']: - if addr == req_addr: - return self.data['tokens'][addr]['params']['symbol'] - else: - return None - - async def sym2addr(self,sym): - for addr in self.data['tokens']: - if self.data['tokens'][addr]['params']['symbol'] == sym.upper(): - return addr - else: - return None - - def get_token_param(self,token,param): - if token in self.data['tokens']: - return self.data['tokens'][token]['params'].get(param) - return None - -class EthereumTokenTrackingWallet(EthereumTrackingWallet): - - desc = 'Ethereum token tracking wallet' - decimals = None - symbol = None - cur_eth_balances = {} - - async def __init__(self,proto,mode='r',token_addr=None): - await super().__init__(proto,mode=mode) - - for v in self.data['tokens'].values(): - self.conv_types(v) - - if self.importing and token_addr: - if not is_coin_addr(proto,token_addr): - raise InvalidTokenAddress(f'{token_addr!r}: invalid token address') - else: - assert token_addr == None,'EthereumTokenTrackingWallet_chk1' - token_addr = await self.sym2addr(proto.tokensym) # returns None on failure - if not is_coin_addr(proto,token_addr): - from mmgen.exception import UnrecognizedTokenSymbol - raise UnrecognizedTokenSymbol(f'Specified token {proto.tokensym!r} could not be resolved!') - - from mmgen.addr import TokenAddr - self.token = TokenAddr(proto,token_addr) - - if self.token not in self.data['tokens']: - if self.importing: - await self.import_token(self.token) - else: - raise TokenNotInWallet(f'Specified token {self.token!r} not in wallet!') - - self.decimals = self.get_param('decimals') - self.symbol = self.get_param('symbol') - - proto.tokensym = self.symbol - - async def is_in_wallet(self,addr): - return addr in self.data['tokens'][self.token] - - @property - def data_root(self): - return self.data['tokens'][self.token] - - @property - def data_root_desc(self): - return 'token ' + self.get_param('symbol') - - async def rpc_get_balance(self,addr): - return await Token(self.proto,self.token,self.decimals,self.rpc).get_balance(addr) - - async def get_eth_balance(self,addr,force_rpc=False): - cache = self.cur_eth_balances - r = self.data['accounts'] - ret = None if force_rpc else self.get_cached_balance(addr,cache,r) - if ret == None: - ret = await super().rpc_get_balance(addr) - self.cache_balance(addr,ret,cache,r) - return ret - - def get_param(self,param): - return self.data['tokens'][self.token]['params'][param] - - @write_mode - async def import_token(self,tokenaddr): - """ - Token 'symbol' and 'decimals' values are resolved from the network by the system just - once, upon token import. Thereafter, token address, symbol and decimals are resolved - either from the tracking wallet (online operations) or transaction file (when signing). - """ - t = await TokenResolve(self.proto,self.rpc,tokenaddr) - self.data['tokens'][tokenaddr] = { - 'params': { - 'symbol': await t.get_symbol(), - 'decimals': t.decimals - } - } - -# No unspent outputs with Ethereum, but naming must be consistent -class EthereumTwUnspentOutputs(TwUnspentOutputs): - - disp_type = 'eth' - can_group = False - col_adj = 29 - hdr_fmt = 'TRACKED ACCOUNTS (sort order: {})\nTotal {}: {}' - desc = 'account balances' - item_desc = 'account' - dump_fn_pfx = 'balances' - prompt = """ -Sort options: [a]mount, a[d]dress, [r]everse, [M]mgen addr -Display options: show [m]mgen addr, r[e]draw screen -Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, - add [l]abel, [D]elete address, [R]efresh balance: -""" - key_mappings = { - 'a':'s_amt','d':'s_addr','r':'d_reverse','M':'s_twmmid', - 'm':'d_mmid','e':'d_redraw', - 'q':'a_quit','p':'a_print','v':'a_view','w':'a_view_wide', - 'l':'a_lbl_add','D':'a_addr_delete','R':'a_balance_refresh' } - - async def __init__(self,proto,*args,**kwargs): - from mmgen.globalvars import g - if g.cached_balances: - from mmgen.color import yellow - self.hdr_fmt += '\n' + yellow('WARNING: Using cached balances. These may be out of date!') - await TwUnspentOutputs.__init__(self,proto,*args,**kwargs) - - def do_sort(self,key=None,reverse=False): - if key == 'txid': return - super().do_sort(key=key,reverse=reverse) - - async def get_unspent_rpc(self): - wl = self.wallet.sorted_list - if self.addrs: - wl = [d for d in wl if d['addr'] in self.addrs] - return [{ - 'account': TwLabel(self.proto,d['mmid']+' '+d['comment']), - 'address': d['addr'], - 'amount': await self.wallet.get_balance(d['addr']), - 'confirmations': 0, # TODO - } for d in wl] - - class MMGenTwUnspentOutput(TwUnspentOutputs.MMGenTwUnspentOutput): - valid_attrs = {'txid','vout','amt','amt2','label','twmmid','addr','confs','skip'} - invalid_attrs = {'proto'} - - def age_disp(self,o,age_fmt): # TODO - return None - -class EthereumTokenTwUnspentOutputs(EthereumTwUnspentOutputs): - - disp_type = 'token' - prompt_fs = 'Total to spend: {} {}\n\n' - col_adj = 37 - - def get_display_precision(self): - return 10 # truncate precision for narrow display - - async def get_unspent_data(self,*args,**kwargs): - await super().get_unspent_data(*args,**kwargs) - for e in self.unspent: - e.amt2 = await self.wallet.get_eth_balance(e.addr) - -class EthereumTwAddrList(TwAddrList): - - has_age = False - - async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None): - - self.proto = proto - self.wallet = wallet or await TrackingWallet(self.proto,mode='w') - tw_dict = self.wallet.mmid_ordered_dict - self.total = self.proto.coin_amt('0') - - from mmgen.addr import CoinAddr - for mmid,d in list(tw_dict.items()): -# if d['confirmations'] < minconf: continue # cannot get confirmations for eth account - label = TwLabel(self.proto,mmid+' '+d['comment']) - if usr_addr_list and (label.mmid not in usr_addr_list): - continue - bal = await self.wallet.get_balance(d['addr']) - if bal == 0 and not showempty: - if not label.comment or not all_labels: - continue - self[label.mmid] = {'amt': self.proto.coin_amt('0'), 'lbl': label } - if showbtcaddrs: - self[label.mmid]['addr'] = CoinAddr(self.proto,d['addr']) - self[label.mmid]['lbl'].mmid.confs = None - self[label.mmid]['amt'] += bal - self.total += bal - - del self.wallet - -class EthereumTokenTwAddrList(EthereumTwAddrList): - pass - -class EthereumTwGetBalance(TwGetBalance): - - fs = '{w:13} {c}\n' # TODO - for now, just suppress display of meaningless data - - async def __init__(self,proto,*args,**kwargs): - self.wallet = await TrackingWallet(proto,mode='w') - await TwGetBalance.__init__(self,proto,*args,**kwargs) - - async def create_data(self): - data = self.wallet.mmid_ordered_dict - for d in data: - if d.type == 'mmgen': - key = d.obj.sid - if key not in self.data: - self.data[key] = [self.proto.coin_amt('0')] * 4 - else: - key = 'Non-MMGen' - - conf_level = 2 # TODO - amt = await self.wallet.get_balance(data[d]['addr']) - - self.data['TOTAL'][conf_level] += amt - self.data[key][conf_level] += amt - - del self.wallet class EthereumTwAddrData(TwAddrData): async def get_tw_data(self,wallet=None): + from mmgen.twctl import TrackingWallet + from mmgen.util import vmsg vmsg('Getting address data from tracking wallet') tw = (wallet or await TrackingWallet(self.proto)).mmid_ordered_dict # emulate the output of RPC 'listaccounts' and 'getaddressesbyaccount' return [(mmid+' '+d['comment'],[d['addr']]) for mmid,d in list(tw.items())] -class EthereumTokenTwGetBalance(EthereumTwGetBalance): pass class EthereumTokenTwAddrData(EthereumTwAddrData): pass class EthereumAddrData(AddrData): pass diff --git a/mmgen/altcoins/eth/twaddrs.py b/mmgen/altcoins/eth/twaddrs.py new file mode 100755 index 00000000..511e1b39 --- /dev/null +++ b/mmgen/altcoins/eth/twaddrs.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +altcoins.eth.twaddrs: Ethereum tracking wallet listaddresses class for the MMGen suite +""" + +from mmgen.twaddrs import TwAddrList + +class EthereumTwAddrList(TwAddrList): + + has_age = False + + async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None): + + from mmgen.tw import TwLabel + from mmgen.twctl import TrackingWallet + from mmgen.addr import CoinAddr + + self.proto = proto + self.wallet = wallet or await TrackingWallet(self.proto,mode='w') + tw_dict = self.wallet.mmid_ordered_dict + self.total = self.proto.coin_amt('0') + + for mmid,d in list(tw_dict.items()): +# if d['confirmations'] < minconf: continue # cannot get confirmations for eth account + label = TwLabel(self.proto,mmid+' '+d['comment']) + if usr_addr_list and (label.mmid not in usr_addr_list): + continue + bal = await self.wallet.get_balance(d['addr']) + if bal == 0 and not showempty: + if not label.comment or not all_labels: + continue + self[label.mmid] = {'amt': self.proto.coin_amt('0'), 'lbl': label } + if showbtcaddrs: + self[label.mmid]['addr'] = CoinAddr(self.proto,d['addr']) + self[label.mmid]['lbl'].mmid.confs = None + self[label.mmid]['amt'] += bal + self.total += bal + + del self.wallet + +class EthereumTokenTwAddrList(EthereumTwAddrList): + pass diff --git a/mmgen/altcoins/eth/twbal.py b/mmgen/altcoins/eth/twbal.py new file mode 100755 index 00000000..f93d36af --- /dev/null +++ b/mmgen/altcoins/eth/twbal.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +altcoins.eth.twbal: Ethereum tracking wallet getbalance class for the MMGen suite +""" + +from mmgen.twctl import TrackingWallet +from mmgen.twbal import TwGetBalance + +class EthereumTwGetBalance(TwGetBalance): + + fs = '{w:13} {c}\n' # TODO - for now, just suppress display of meaningless data + + async def __init__(self,proto,*args,**kwargs): + self.wallet = await TrackingWallet(proto,mode='w') + await TwGetBalance.__init__(self,proto,*args,**kwargs) + + async def create_data(self): + data = self.wallet.mmid_ordered_dict + for d in data: + if d.type == 'mmgen': + key = d.obj.sid + if key not in self.data: + self.data[key] = [self.proto.coin_amt('0')] * 4 + else: + key = 'Non-MMGen' + + conf_level = 2 # TODO + amt = await self.wallet.get_balance(data[d]['addr']) + + self.data['TOTAL'][conf_level] += amt + self.data[key][conf_level] += amt + + del self.wallet + +class EthereumTokenTwGetBalance(EthereumTwGetBalance): pass diff --git a/mmgen/altcoins/eth/twctl.py b/mmgen/altcoins/eth/twctl.py new file mode 100755 index 00000000..4b4800a5 --- /dev/null +++ b/mmgen/altcoins/eth/twctl.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +altcoins.eth.twctl: Ethereum tracking wallet control class for the MMGen suite +""" + +from mmgen.util import msg,ymsg,write_mode +from mmgen.twctl import TrackingWallet +from mmgen.addr import is_coin_addr,is_mmgen_id +from .contract import Token,TokenResolve +from .obj import ETHAmt + +class EthereumTrackingWallet(TrackingWallet): + + caps = ('batch',) + data_key = 'accounts' + use_tw_file = True + + async def is_in_wallet(self,addr): + return addr in self.data_root + + def init_empty(self): + self.data = { 'coin': self.proto.coin, 'accounts': {}, 'tokens': {} } + + def upgrade_wallet_maybe(self): + + upgraded = False + + if not 'accounts' in self.data or not 'coin' in self.data: + ymsg(f'Upgrading {self.desc} (v1->v2: accounts field added)') + if not 'accounts' in self.data: + self.data = {} + import json + self.data['accounts'] = json.loads(self.orig_data) + if not 'coin' in self.data: + self.data['coin'] = self.proto.coin + upgraded = True + + def have_token_params_fields(): + for k in self.data['tokens']: + if 'params' in self.data['tokens'][k]: + return True + + def add_token_params_fields(): + for k in self.data['tokens']: + self.data['tokens'][k]['params'] = {} + + if not 'tokens' in self.data: + self.data['tokens'] = {} + upgraded = True + + if self.data['tokens'] and not have_token_params_fields(): + ymsg(f'Upgrading {self.desc} (v2->v3: token params fields added)') + add_token_params_fields() + upgraded = True + + if upgraded: + self.force_write() + msg(f'{self.desc} upgraded successfully!') + + async def rpc_get_balance(self,addr): + return ETHAmt(int(await self.rpc.call('eth_getBalance','0x'+addr,'latest'),16),'wei') + + @write_mode + async def batch_import_address(self,args_list): + for arg_list in args_list: + await self.import_address(*arg_list) + return args_list + + @write_mode + async def import_address(self,addr,label,foo): + r = self.data_root + if addr in r: + if not r[addr]['mmid'] and label.mmid: + msg(f'Warning: MMGen ID {label.mmid!r} was missing in tracking wallet!') + elif r[addr]['mmid'] != label.mmid: + die(3,'MMGen ID {label.mmid!r} does not match tracking wallet!') + r[addr] = { 'mmid': label.mmid, 'comment': label.comment } + + @write_mode + async def remove_address(self,addr): + r = self.data_root + + if is_coin_addr(self.proto,addr): + have_match = lambda k: k == addr + elif is_mmgen_id(self.proto,addr): + have_match = lambda k: r[k]['mmid'] == addr + else: + die(1,f'{addr!r} is not an Ethereum address or MMGen ID') + + for k in r: + if have_match(k): + # return the addr resolved to mmid if possible + ret = r[k]['mmid'] if is_mmgen_id(self.proto,r[k]['mmid']) else addr + del r[k] + self.write() + return ret + else: + msg(f'Address {addr!r} not found in {self.data_root_desc!r} section of tracking wallet') + return None + + @write_mode + async def set_label(self,coinaddr,lbl): + for addr,d in list(self.data_root.items()): + if addr == coinaddr: + d['comment'] = lbl.comment + self.write() + return None + else: + msg(f'Address {coinaddr!r} not found in {self.data_root_desc!r} section of tracking wallet') + return False + + async def addr2sym(self,req_addr): + for addr in self.data['tokens']: + if addr == req_addr: + return self.data['tokens'][addr]['params']['symbol'] + else: + return None + + async def sym2addr(self,sym): + for addr in self.data['tokens']: + if self.data['tokens'][addr]['params']['symbol'] == sym.upper(): + return addr + else: + return None + + def get_token_param(self,token,param): + if token in self.data['tokens']: + return self.data['tokens'][token]['params'].get(param) + return None + +class EthereumTokenTrackingWallet(EthereumTrackingWallet): + + desc = 'Ethereum token tracking wallet' + decimals = None + symbol = None + cur_eth_balances = {} + + async def __init__(self,proto,mode='r',token_addr=None): + await super().__init__(proto,mode=mode) + + for v in self.data['tokens'].values(): + self.conv_types(v) + + if self.importing and token_addr: + if not is_coin_addr(proto,token_addr): + raise InvalidTokenAddress(f'{token_addr!r}: invalid token address') + else: + assert token_addr == None,'EthereumTokenTrackingWallet_chk1' + token_addr = await self.sym2addr(proto.tokensym) # returns None on failure + if not is_coin_addr(proto,token_addr): + from mmgen.exception import UnrecognizedTokenSymbol + raise UnrecognizedTokenSymbol(f'Specified token {proto.tokensym!r} could not be resolved!') + + from mmgen.addr import TokenAddr + self.token = TokenAddr(proto,token_addr) + + if self.token not in self.data['tokens']: + if self.importing: + await self.import_token(self.token) + else: + raise TokenNotInWallet(f'Specified token {self.token!r} not in wallet!') + + self.decimals = self.get_param('decimals') + self.symbol = self.get_param('symbol') + + proto.tokensym = self.symbol + + async def is_in_wallet(self,addr): + return addr in self.data['tokens'][self.token] + + @property + def data_root(self): + return self.data['tokens'][self.token] + + @property + def data_root_desc(self): + return 'token ' + self.get_param('symbol') + + async def rpc_get_balance(self,addr): + return await Token(self.proto,self.token,self.decimals,self.rpc).get_balance(addr) + + async def get_eth_balance(self,addr,force_rpc=False): + cache = self.cur_eth_balances + r = self.data['accounts'] + ret = None if force_rpc else self.get_cached_balance(addr,cache,r) + if ret == None: + ret = await super().rpc_get_balance(addr) + self.cache_balance(addr,ret,cache,r) + return ret + + def get_param(self,param): + return self.data['tokens'][self.token]['params'][param] + + @write_mode + async def import_token(self,tokenaddr): + """ + Token 'symbol' and 'decimals' values are resolved from the network by the system just + once, upon token import. Thereafter, token address, symbol and decimals are resolved + either from the tracking wallet (online operations) or transaction file (when signing). + """ + t = await TokenResolve(self.proto,self.rpc,tokenaddr) + self.data['tokens'][tokenaddr] = { + 'params': { + 'symbol': await t.get_symbol(), + 'decimals': t.decimals + } + } diff --git a/mmgen/altcoins/eth/twuo.py b/mmgen/altcoins/eth/twuo.py new file mode 100755 index 00000000..f8c7535d --- /dev/null +++ b/mmgen/altcoins/eth/twuo.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +altcoins.eth.twuo: Ethereum tracking wallet unspent outputs class for the MMGen suite +""" + +from mmgen.tw import TwLabel +from mmgen.twuo import TwUnspentOutputs + +# No unspent outputs with Ethereum, but naming must be consistent +class EthereumTwUnspentOutputs(TwUnspentOutputs): + + disp_type = 'eth' + can_group = False + col_adj = 29 + hdr_fmt = 'TRACKED ACCOUNTS (sort order: {})\nTotal {}: {}' + desc = 'account balances' + item_desc = 'account' + dump_fn_pfx = 'balances' + prompt = """ +Sort options: [a]mount, a[d]dress, [r]everse, [M]mgen addr +Display options: show [m]mgen addr, r[e]draw screen +Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, + add [l]abel, [D]elete address, [R]efresh balance: +""" + key_mappings = { + 'a':'s_amt','d':'s_addr','r':'d_reverse','M':'s_twmmid', + 'm':'d_mmid','e':'d_redraw', + 'q':'a_quit','p':'a_print','v':'a_view','w':'a_view_wide', + 'l':'a_lbl_add','D':'a_addr_delete','R':'a_balance_refresh' } + + async def __init__(self,proto,*args,**kwargs): + from mmgen.globalvars import g + if g.cached_balances: + from mmgen.color import yellow + self.hdr_fmt += '\n' + yellow('WARNING: Using cached balances. These may be out of date!') + await TwUnspentOutputs.__init__(self,proto,*args,**kwargs) + + def do_sort(self,key=None,reverse=False): + if key == 'txid': return + super().do_sort(key=key,reverse=reverse) + + async def get_unspent_rpc(self): + wl = self.wallet.sorted_list + if self.addrs: + wl = [d for d in wl if d['addr'] in self.addrs] + return [{ + 'account': TwLabel(self.proto,d['mmid']+' '+d['comment']), + 'address': d['addr'], + 'amount': await self.wallet.get_balance(d['addr']), + 'confirmations': 0, # TODO + } for d in wl] + + class MMGenTwUnspentOutput(TwUnspentOutputs.MMGenTwUnspentOutput): + valid_attrs = {'txid','vout','amt','amt2','label','twmmid','addr','confs','skip'} + invalid_attrs = {'proto'} + + def age_disp(self,o,age_fmt): # TODO + return None + +class EthereumTokenTwUnspentOutputs(EthereumTwUnspentOutputs): + + disp_type = 'token' + prompt_fs = 'Total to spend: {} {}\n\n' + col_adj = 37 + + def get_display_precision(self): + return 10 # truncate precision for narrow display + + async def get_unspent_data(self,*args,**kwargs): + await super().get_unspent_data(*args,**kwargs) + for e in self.unspent: + e.amt2 = await self.wallet.get_eth_balance(e.addr) diff --git a/mmgen/altcoins/eth/tx.py b/mmgen/altcoins/eth/tx.py index 3f3365e1..ff80c680 100755 --- a/mmgen/altcoins/eth/tx.py +++ b/mmgen/altcoins/eth/tx.py @@ -34,7 +34,7 @@ from mmgen.obj import Int,Str,HexStr,CoinTxID,MMGenTxID from mmgen.addr import MMGenID,CoinAddr,TokenAddr,is_mmgen_id,is_coin_addr from mmgen.tx import MMGenTX -from mmgen.tw import TrackingWallet +from mmgen.twctl import TrackingWallet from .contract import Token from .obj import ETHAmt,ETHNonce diff --git a/mmgen/data/version b/mmgen/data/version index 6c72c7e1..c400957f 100644 --- a/mmgen/data/version +++ b/mmgen/data/version @@ -1 +1 @@ -13.1.dev005 +13.1.dev006 diff --git a/mmgen/main_addrimport.py b/mmgen/main_addrimport.py index 17a6c3d3..58b0e992 100755 --- a/mmgen/main_addrimport.py +++ b/mmgen/main_addrimport.py @@ -157,7 +157,7 @@ def make_args_list(tw,al,batch,rescan): yield (tw,e.addr,TwLabel(proto,label),rescan,fs,msg_args) async def main(): - from .tw import TrackingWallet + from .twctl import TrackingWallet if opt.token_addr: proto.tokensym = 'foo' # hack to trigger 'Token' in altcoin_subclass() diff --git a/mmgen/main_txbump.py b/mmgen/main_txbump.py index d48d3e91..dcf92014 100755 --- a/mmgen/main_txbump.py +++ b/mmgen/main_txbump.py @@ -133,7 +133,7 @@ async def main(): kl = get_keylist(orig_tx.proto,opt) sign_and_send = bool(seed_files or kl or kal) - from .tw import TrackingWallet + from .twctl import TrackingWallet tx = MMGenTX.Bump( data = orig_tx.__dict__, send = sign_and_send, diff --git a/mmgen/main_txcreate.py b/mmgen/main_txcreate.py index 808fcd0d..76cd986a 100755 --- a/mmgen/main_txcreate.py +++ b/mmgen/main_txcreate.py @@ -82,7 +82,7 @@ async def main(): proto = init_proto_from_opts() from .tx import MMGenTX - from .tw import TrackingWallet + from .twctl import TrackingWallet tx1 = MMGenTX.New( proto = proto, tw = await TrackingWallet(proto) if proto.tokensym else None ) diff --git a/mmgen/main_txdo.py b/mmgen/main_txdo.py index 178674e7..ddaf82a7 100755 --- a/mmgen/main_txdo.py +++ b/mmgen/main_txdo.py @@ -122,7 +122,7 @@ from .txsign import * seed_files = get_seed_files(opt,cmd_args) async def main(): - from .tw import TrackingWallet + from .twctl import TrackingWallet from .protocol import init_proto_from_opts proto = init_proto_from_opts() diff --git a/mmgen/tool.py b/mmgen/tool.py index 35471094..3bb8bf07 100755 --- a/mmgen/tool.py +++ b/mmgen/tool.py @@ -904,7 +904,7 @@ class MMGenToolCmdWallet(MMGenToolCmds): ret = d.sec.wif if target=='wif' else d.addr return ret -from .tw import TwAddrList,TwUnspentOutputs +from .tw import TwCommon class MMGenToolCmdRPC(MMGenToolCmds): "tracking wallet commands using the JSON-RPC interface" @@ -917,7 +917,7 @@ class MMGenToolCmdRPC(MMGenToolCmds): async def getbalance(self,minconf=1,quiet=False,pager=False): "list confirmed/unconfirmed, spendable/unspendable balances in tracking wallet" - from .tw import TwGetBalance + from .twbal import TwGetBalance return (await TwGetBalance(self.proto,minconf,quiet)).format() async def listaddress(self, @@ -926,7 +926,7 @@ class MMGenToolCmdRPC(MMGenToolCmds): pager = False, showempty = True, showbtcaddr = True, - age_fmt: _options_annot_str(TwAddrList.age_fmts) = 'confs', + age_fmt: _options_annot_str(TwCommon.age_fmts) = 'confs', ): "list the specified MMGen address and its balance" return await self.listaddresses( mmgen_addrs = mmgen_addr, @@ -945,7 +945,7 @@ class MMGenToolCmdRPC(MMGenToolCmds): showbtcaddrs = True, all_labels = False, sort: _options_annot_str(['reverse','age']) = '', - age_fmt: _options_annot_str(TwAddrList.age_fmts) = 'confs', + age_fmt: _options_annot_str(TwCommon.age_fmts) = 'confs', ): "list MMGen addresses and their balances" show_age = bool(age_fmt) @@ -966,6 +966,7 @@ class MMGenToolCmdRPC(MMGenToolCmds): from .addrlist import AddrIdxList usr_addr_list = [MMGenID(self.proto,f'{a[0]}:{i}') for i in AddrIdxList(a[1])] + from .twaddrs import TwAddrList al = await TwAddrList(self.proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels) if not al: die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty]) @@ -977,10 +978,11 @@ class MMGenToolCmdRPC(MMGenToolCmds): wide = False, minconf = 1, sort = 'age', - age_fmt: _options_annot_str(TwUnspentOutputs.age_fmts) = 'confs', + age_fmt: _options_annot_str(TwCommon.age_fmts) = 'confs', show_mmid = True, wide_show_confs = True): "view tracking wallet" + from .twuo import TwUnspentOutputs twuo = await TwUnspentOutputs(self.proto,minconf=minconf) await twuo.get_unspent_data(reverse_sort=reverse) twuo.age_fmt = age_fmt @@ -994,7 +996,7 @@ class MMGenToolCmdRPC(MMGenToolCmds): async def add_label(self,mmgen_or_coin_addr:str,label:str): "add descriptive label for address in tracking wallet" - from .tw import TrackingWallet + from .twctl import TrackingWallet await (await TrackingWallet(self.proto,mode='w')).add_label(mmgen_or_coin_addr,label,on_fail='raise') return True @@ -1005,7 +1007,7 @@ class MMGenToolCmdRPC(MMGenToolCmds): async def remove_address(self,mmgen_or_coin_addr:str): "remove an address from tracking wallet" - from .tw import TrackingWallet + from .twctl import TrackingWallet ret = await (await TrackingWallet(self.proto,mode='w')).remove_address(mmgen_or_coin_addr) # returns None on failure if ret: msg(f'Address {ret!r} deleted from tracking wallet') diff --git a/mmgen/tw.py b/mmgen/tw.py index b5fb59f5..b5468052 100755 --- a/mmgen/tw.py +++ b/mmgen/tw.py @@ -17,39 +17,45 @@ # along with this program. If not, see . """ -tw: Tracking wallet methods for the MMGen suite +tw: Tracking wallet dependency classes for the MMGen suite """ -import os,json,time -from collections import namedtuple +import time from string import ascii_letters,digits -from .globalvars import g -from .color import red,yellow,green -from .exception import BadTwLabel,BadTwComment,BadAgeFormat,WalletFileError -from .util import ( - msg, - msg_r, - dmsg, - die, - capfirst, - suf, - fmt, - make_timestr, - check_or_create_dir, - keypress_confirm, - write_data_to_file, - get_data_from_file, - line_input, - do_pager, - write_mode, - altcoin_subclass -) -from .base_obj import AsyncInit +from .exception import BadTwLabel,BadTwComment from .objmethods import Hilite,InitErrors,MMGenObject -from .obj import ImmutableAttr,ListItemAttr,MMGenListItem,MMGenList,MMGenDict,TwComment,get_obj -from .addr import CoinAddr,MMGenID,AddrIdx,is_mmgen_id,is_coin_addr -from .rpc import rpc_init +from .obj import TwComment +from .addr import MMGenID + +# mixin class for TwUnspentOutputs,TwAddrList: +class TwCommon: + + age_fmts = ('confs','block','days','date','date_time') + + date_formatter = { + 'days': lambda rpc,secs: (rpc.cur_date - secs) // 86400, + 'date': lambda rpc,secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:], + 'date_time': lambda rpc,secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5]), + } + + def age_disp(self,o,age_fmt): + if age_fmt == 'confs': + return o.confs + elif age_fmt == 'block': + return self.rpc.blockcount - (o.confs - 1) + else: + return self.date_formatter[age_fmt](self.rpc,o.date) + + @staticmethod + async def set_dates(rpc,us): + if rpc.proto.base_proto != 'Bitcoin': + return + if us and us[0].date is None: + # 'blocktime' differs from 'time', is same as getblockheader['time'] + dates = [o['blocktime'] for o in await rpc.gathered_call('gettransaction',[(o.txid,) for o in us])] + for idx,o in enumerate(us): + o.date = dates[idx] class TwMMGenID(str,Hilite,InitErrors,MMGenObject): color = 'orange' @@ -109,1028 +115,3 @@ def get_tw_label(proto,s): except Exception as e: # print(e) return None - -class TwUnspentOutputs(MMGenObject,metaclass=AsyncInit): - - def __new__(cls,proto,*args,**kwargs): - return MMGenObject.__new__(altcoin_subclass(cls,proto,'tw')) - - txid_w = 64 - disp_type = 'btc' - can_group = True - hdr_fmt = 'UNSPENT OUTPUTS (sort order: {}) Total {}: {}' - desc = 'unspent outputs' - item_desc = 'unspent output' - dump_fn_pfx = 'listunspent' - prompt_fs = 'Total to spend, excluding fees: {} {}\n\n' - prompt = """ -Sort options: [t]xid, [a]mount, a[d]dress, [A]ge, [r]everse, [M]mgen addr -Display options: toggle [D]ays/date, show [g]roup, show [m]mgen addr, r[e]draw -Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, add [l]abel: -""" - key_mappings = { - 't':'s_txid','a':'s_amt','d':'s_addr','A':'s_age','r':'d_reverse','M':'s_twmmid', - 'D':'d_days','g':'d_group','m':'d_mmid','e':'d_redraw', - 'q':'a_quit','p':'a_print','v':'a_view','w':'a_view_wide','l':'a_lbl_add' } - col_adj = 38 - age_fmts = ('confs','block','days','date','date_time') - age_fmts_date_dependent = ('days','date','date_time') - age_fmts_interactive = ('confs','block','days','date') - _age_fmt = 'confs' - - class MMGenTwOutputList(list,MMGenObject): pass - - class MMGenTwUnspentOutput(MMGenListItem): - txid = ListItemAttr('CoinTxID') - vout = ListItemAttr(int,typeconv=False) - amt = ImmutableAttr(None) - amt2 = ListItemAttr(None) - label = ListItemAttr('TwComment',reassign_ok=True) - twmmid = ImmutableAttr(TwMMGenID,include_proto=True) - addr = ImmutableAttr(CoinAddr,include_proto=True) - confs = ImmutableAttr(int,typeconv=False) - date = ListItemAttr(int,typeconv=False,reassign_ok=True) - scriptPubKey = ImmutableAttr('HexStr') - skip = ListItemAttr(str,typeconv=False,reassign_ok=True) - - # required by gen_unspent(); setting valid_attrs explicitly is also more efficient - valid_attrs = {'txid','vout','amt','amt2','label','twmmid','addr','confs','date','scriptPubKey','skip'} - invalid_attrs = {'proto'} - - def __init__(self,proto,**kwargs): - self.__dict__['proto'] = proto - MMGenListItem.__init__(self,**kwargs) - - class conv_funcs: - def amt(self,value): - return self.proto.coin_amt(value) - def amt2(self,value): - return self.proto.coin_amt(value) - - async def __init__(self,proto,minconf=1,addrs=[]): - self.proto = proto - self.unspent = self.MMGenTwOutputList() - self.fmt_display = '' - self.fmt_print = '' - self.cols = None - self.reverse = False - self.group = False - self.show_mmid = True - self.minconf = minconf - self.addrs = addrs - self.sort_key = 'age' - self.disp_prec = self.get_display_precision() - self.rpc = await rpc_init(proto) - - self.wallet = await TrackingWallet(proto,mode='w') - if self.disp_type == 'token': - self.proto.tokensym = self.wallet.symbol - - @property - def age_fmt(self): - return self._age_fmt - - @age_fmt.setter - def age_fmt(self,val): - if val not in self.age_fmts: - raise BadAgeFormat(f'{val!r}: invalid age format (must be one of {self.age_fmts!r})') - self._age_fmt = val - - def get_display_precision(self): - return self.proto.coin_amt.max_prec - - @property - def total(self): - return sum(i.amt for i in self.unspent) - - async def get_unspent_rpc(self): - # bitcoin-cli help listunspent: - # Arguments: - # 1. minconf (numeric, optional, default=1) The minimum confirmations to filter - # 2. maxconf (numeric, optional, default=9999999) The maximum confirmations to filter - # 3. addresses (json array, optional, default=empty array) A json array of bitcoin addresses - # 4. include_unsafe (boolean, optional, default=true) Include outputs that are not safe to spend - # 5. query_options (json object, optional) JSON with query options - - # for now, self.addrs is just an empty list for Bitcoin and friends - add_args = (9999999,self.addrs) if self.addrs else () - return await self.rpc.call('listunspent',self.minconf,*add_args) - - async def get_unspent_data(self,sort_key=None,reverse_sort=False): - - us_raw = await self.get_unspent_rpc() - - if not us_raw: - die(0,fmt(f""" - No spendable outputs found! Import addresses with balances into your - watch-only wallet using '{g.proj_name.lower()}-addrimport' and then re-run this program. - """).strip()) - - lbl_id = ('account','label')['label_api' in self.rpc.caps] - - def gen_unspent(): - for o in us_raw: - if not lbl_id in o: - continue # coinbase outputs have no account field - l = get_tw_label(self.proto,o[lbl_id]) - if l: - o.update({ - 'twmmid': l.mmid, - 'label': l.comment or '', - 'amt': self.proto.coin_amt(o['amount']), - 'addr': CoinAddr(self.proto,o['address']), - 'confs': o['confirmations'] - }) - yield self.MMGenTwUnspentOutput( - self.proto, - **{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } ) - - self.unspent = self.MMGenTwOutputList(gen_unspent()) - - if not self.unspent: - die(1, f'No tracked {self.item_desc}s in tracking wallet!') - - self.do_sort(key=sort_key,reverse=reverse_sort) - - def do_sort(self,key=None,reverse=False): - sort_funcs = { - 'addr': lambda i: i.addr, - 'age': lambda i: 0 - i.confs, - 'amt': lambda i: i.amt, - 'txid': lambda i: f'{i.txid} {i.vout:04}', - 'twmmid': lambda i: i.twmmid.sort_key - } - key = key or self.sort_key - if key not in sort_funcs: - die(1,f'{key!r}: invalid sort key. Valid options: {" ".join(sort_funcs.keys())}') - self.sort_key = key - assert type(reverse) == bool - self.unspent.sort(key=sort_funcs[key],reverse=reverse or self.reverse) - - def sort_info(self,include_group=True): - ret = ([],['Reverse'])[self.reverse] - ret.append(capfirst(self.sort_key).replace('Twmmid','MMGenID')) - if include_group and self.group and (self.sort_key in ('addr','txid','twmmid')): - ret.append('Grouped') - return ret - - def set_term_columns(self): - from .term import get_terminal_size - while True: - self.cols = g.terminal_width or get_terminal_size().width - if self.cols >= g.min_screen_width: - break - line_input( - 'Screen too narrow to display the tracking wallet\n' - + f'Please resize your screen to at least {g.min_screen_width} characters and hit ENTER ' ) - - def get_display_constants(self): - unsp = self.unspent - for i in unsp: - i.skip = '' - - # allow for 7-digit confirmation nums - col1_w = max(3,len(str(len(unsp)))+1) # num + ')' - mmid_w = max(len(('',i.twmmid)[i.twmmid.type=='mmgen']) for i in unsp) or 12 # DEADBEEF:S:1 - max_acct_w = max(i.label.screen_width for i in unsp) + mmid_w + 1 - max_btcaddr_w = max(len(i.addr) for i in unsp) - min_addr_w = self.cols - self.col_adj - addr_w = min(max_btcaddr_w + (0,1+max_acct_w)[self.show_mmid],min_addr_w) - acct_w = min(max_acct_w, max(24,addr_w-10)) - btaddr_w = addr_w - acct_w - 1 - label_w = acct_w - mmid_w - 1 - tx_w = min(self.txid_w,self.cols-addr_w-29-col1_w) # min=6 TODO - txdots = ('','..')[tx_w < self.txid_w] - - dc = namedtuple('display_constants',['col1_w','mmid_w','addr_w','btaddr_w','label_w','tx_w','txdots']) - return dc(col1_w,mmid_w,addr_w,btaddr_w,label_w,tx_w,txdots) - - @staticmethod - async def set_dates(rpc,us): - if rpc.proto.base_proto != 'Bitcoin': - return - if us and us[0].date is None: - # 'blocktime' differs from 'time', is same as getblockheader['time'] - dates = [o['blocktime'] for o in await rpc.gathered_call('gettransaction',[(o.txid,) for o in us])] - for idx,o in enumerate(us): - o.date = dates[idx] - - async def format_for_display(self): - unsp = self.unspent - if self.age_fmt in self.age_fmts_date_dependent: - await self.set_dates(self.rpc,unsp) - self.set_term_columns() - - c = getattr(self,'display_constants',None) - if not c: - c = self.display_constants = self.get_display_constants() - - if self.group and (self.sort_key in ('addr','txid','twmmid')): - for a,b in [(unsp[i],unsp[i+1]) for i in range(len(unsp)-1)]: - for k in ('addr','txid','twmmid'): - if self.sort_key == k and getattr(a,k) == getattr(b,k): - b.skip = (k,'addr')[k=='twmmid'] - - def gen_output(): - yield self.hdr_fmt.format(' '.join(self.sort_info()),self.proto.dcoin,self.total.hl()) - if self.proto.chain_name != 'mainnet': - yield 'Chain: '+green(self.proto.chain_name.upper()) - fs = { 'btc': ' {n:%s} {t:%s} {v:2} {a} {A} {c:<}' % (c.col1_w,c.tx_w), - 'eth': ' {n:%s} {a} {A}' % c.col1_w, - 'token': ' {n:%s} {a} {A} {A2}' % c.col1_w }[self.disp_type] - fs_hdr = ' {n:%s} {t:%s} {a} {A} {c:<}' % (c.col1_w,c.tx_w) if self.disp_type == 'btc' else fs - date_hdr = { - 'confs': 'Confs', - 'block': 'Block', - 'days': 'Age(d)', - 'date': 'Date', - 'date_time': 'Date', - } - yield fs_hdr.format( - n = 'Num', - t = 'TXid'.ljust(c.tx_w - 2) + ' Vout', - a = 'Address'.ljust(c.addr_w), - A = f'Amt({self.proto.dcoin})'.ljust(self.disp_prec+5), - A2 = f' Amt({self.proto.coin})'.ljust(self.disp_prec+4), - c = date_hdr[self.age_fmt], - ).rstrip() - - for n,i in enumerate(unsp): - addr_dots = '|' + '.'*(c.addr_w-1) - mmid_disp = MMGenID.fmtc( - ( - '.'*c.mmid_w if i.skip == 'addr' else - i.twmmid if i.twmmid.type == 'mmgen' else - f'Non-{g.proj_name}' - ), - width = c.mmid_w, - color = True ) - - if self.show_mmid: - addr_out = '{} {}{}'.format(( - type(i.addr).fmtc(addr_dots,width=c.btaddr_w,color=True) if i.skip == 'addr' else - i.addr.fmt(width=c.btaddr_w,color=True) - ), - mmid_disp, - (' ' + i.label.fmt(width=c.label_w,color=True)) if c.label_w > 0 else '' - ) - else: - addr_out = ( - type(i.addr).fmtc(addr_dots,width=c.addr_w,color=True) if i.skip=='addr' else - i.addr.fmt(width=c.addr_w,color=True) ) - - yield fs.format( - n = str(n+1)+')', - t = ( - '' if not i.txid else - ' ' * (c.tx_w-4) + '|...' if i.skip == 'txid' else - i.txid[:c.tx_w-len(c.txdots)] + c.txdots ), - v = i.vout, - a = addr_out, - A = i.amt.fmt(color=True,prec=self.disp_prec), - A2 = (i.amt2.fmt(color=True,prec=self.disp_prec) if i.amt2 is not None else ''), - c = self.age_disp(i,self.age_fmt), - ).rstrip() - - self.fmt_display = '\n'.join(gen_output()) + '\n' - return self.fmt_display - - async def format_for_printing(self,color=False,show_confs=True): - await self.set_dates(self.rpc,self.unspent) - addr_w = max(len(i.addr) for i in self.unspent) - mmid_w = max(len(('',i.twmmid)[i.twmmid.type=='mmgen']) for i in self.unspent) or 12 # DEADBEEF:S:1 - amt_w = self.proto.coin_amt.max_prec + 5 - cfs = '{c:<8} ' if show_confs else '' - fs = { - 'btc': (' {n:4} {t:%s} {a} {m} {A:%s} ' + cfs + '{b:<8} {D:<19} {l}') % (self.txid_w+3,amt_w), - 'eth': ' {n:4} {a} {m} {A:%s} {l}' % amt_w, - 'token': ' {n:4} {a} {m} {A:%s} {A2:%s} {l}' % (amt_w,amt_w) - }[self.disp_type] - - def gen_output(): - yield fs.format( - n = 'Num', - t = 'Tx ID,Vout', - a = 'Address'.ljust(addr_w), - m = 'MMGen ID'.ljust(mmid_w), - A = f'Amount({self.proto.dcoin})', - A2 = f'Amount({self.proto.coin})', - c = 'Confs', # skipped for eth - b = 'Block', # skipped for eth - D = 'Date', - l = 'Label' ) - - max_lbl_len = max([len(i.label) for i in self.unspent if i.label] or [2]) - for n,i in enumerate(self.unspent): - yield fs.format( - n = str(n+1) + ')', - t = '{},{}'.format( - ('|'+'.'*63 if i.skip == 'txid' and self.group else i.txid), - i.vout ), - a = ( - '|'+'.' * addr_w if i.skip == 'addr' and self.group else - i.addr.fmt(color=color,width=addr_w) ), - m = MMGenID.fmtc( - (i.twmmid if i.twmmid.type == 'mmgen' else f'Non-{g.proj_name}'), - width = mmid_w, - color = color ), - A = i.amt.fmt(color=color), - A2 = ( i.amt2.fmt(color=color) if i.amt2 is not None else '' ), - c = i.confs, - b = self.rpc.blockcount - (i.confs - 1), - D = self.age_disp(i,'date_time'), - l = i.label.hl(color=color) if i.label else - TwComment.fmtc( - s = '', - color = color, - nullrepl = '-', - width = max_lbl_len ) - ).rstrip() - - fs2 = '{} (block #{}, {} UTC)\n{}Sort order: {}\n{}\n\nTotal {}: {}\n' - self.fmt_print = fs2.format( - capfirst(self.desc), - self.rpc.blockcount, - make_timestr(self.rpc.cur_date), - ('' if self.proto.chain_name == 'mainnet' else - 'Chain: {}\n'.format(green(self.proto.chain_name.upper())) ), - ' '.join(self.sort_info(include_group=False)), - '\n'.join(gen_output()), - self.proto.dcoin, - self.total.hl(color=color) ) - - return self.fmt_print - - def display_total(self): - msg('\nTotal unspent: {} {} ({} output{})'.format( - self.total.hl(), - self.proto.dcoin, - len(self.unspent), - suf(self.unspent) )) - - def get_idx_from_user(self,action): - msg('') - while True: - ret = line_input(f'Enter {self.item_desc} number (or RETURN to return to main menu): ') - if ret == '': - return (None,None) if action == 'a_lbl_add' else None - n = get_obj(AddrIdx,n=ret,silent=True) - if not n or n < 1 or n > len(self.unspent): - msg(f'Choice must be a single number between 1 and {len(self.unspent)}') - else: - if action == 'a_lbl_add': - cur_lbl = self.unspent[n-1].label - msg('Current label: {}'.format(cur_lbl.hl() if cur_lbl else '(none)')) - while True: - s = line_input( - "Enter label text (or 'q' to return to main menu): ", - insert_txt = cur_lbl ) - if s == 'q': - return None,None - elif s == '': - if keypress_confirm( - f'Removing label for {self.item_desc} #{n}. Is this what you want?'): - return n,s - elif s: - if get_obj(TwComment,s=s): - return n,s - else: - if action == 'a_addr_delete': - fs = 'Removing {} #{} from tracking wallet. Is this what you want?' - elif action == 'a_balance_refresh': - fs = 'Refreshing tracking wallet {} #{}. Is this what you want?' - if keypress_confirm(fs.format(self.item_desc,n)): - return n - - async def view_and_sort(self,tx): - from .term import get_char - prompt = self.prompt.strip() + '\b' - no_output,oneshot_msg = False,None - from .opts import opt - CUR_HOME,ERASE_ALL = '\033[H','\033[0J' - CUR_RIGHT = lambda n: f'\033[{n}C' - - while True: - msg_r('' if no_output else '\n\n' if opt.no_blank else CUR_HOME+ERASE_ALL) - reply = get_char( - '' if no_output else await self.format_for_display()+'\n'+(oneshot_msg or '')+prompt, - immed_chars=''.join(self.key_mappings.keys()) - ) - no_output = False - oneshot_msg = '' if oneshot_msg else None # tristate, saves previous state - if reply not in self.key_mappings: - msg_r('\ninvalid keypress ') - time.sleep(0.5) - continue - - action = self.key_mappings[reply] - if action[:2] == 's_': - self.do_sort(action[2:]) - if action == 's_twmmid': self.show_mmid = True - elif action == 'd_days': - af = self.age_fmts_interactive - self.age_fmt = af[(af.index(self.age_fmt) + 1) % len(af)] - elif action == 'd_mmid': - self.show_mmid = not self.show_mmid - elif action == 'd_group': - if self.can_group: - self.group = not self.group - elif action == 'd_redraw': - pass - elif action == 'd_reverse': - self.unspent.reverse() - self.reverse = not self.reverse - elif action == 'a_quit': - msg('') - return self.unspent - elif action == 'a_balance_refresh': - idx = self.get_idx_from_user(action) - if idx: - e = self.unspent[idx-1] - bal = await self.wallet.get_balance(e.addr,force_rpc=True) - await self.get_unspent_data() - oneshot_msg = yellow(f'{self.proto.dcoin} balance for account #{idx} refreshed\n\n') - self.display_constants = self.get_display_constants() - elif action == 'a_lbl_add': - idx,lbl = self.get_idx_from_user(action) - if idx: - e = self.unspent[idx-1] - if await self.wallet.add_label(e.twmmid,lbl,addr=e.addr): - await self.get_unspent_data() - oneshot_msg = yellow('Label {} {} #{}\n\n'.format( - ('added to' if lbl else 'removed from'), - self.item_desc, - idx )) - else: - oneshot_msg = red('Label could not be added\n\n') - self.display_constants = self.get_display_constants() - elif action == 'a_addr_delete': - idx = self.get_idx_from_user(action) - if idx: - e = self.unspent[idx-1] - if await self.wallet.remove_address(e.addr): - await self.get_unspent_data() - oneshot_msg = yellow(f'{capfirst(self.item_desc)} #{idx} removed\n\n') - else: - oneshot_msg = red('Address could not be removed\n\n') - self.display_constants = self.get_display_constants() - elif action == 'a_print': - of = '{}-{}[{}].out'.format( - self.dump_fn_pfx, - self.proto.dcoin, - ','.join(self.sort_info(include_group=False)).lower() ) - msg('') - try: - write_data_to_file( - of, - await self.format_for_printing(), - desc = f'{self.desc} listing' ) - except UserNonConfirmation as e: - oneshot_msg = red(f'File {of!r} not overwritten by user request\n\n') - else: - oneshot_msg = yellow(f'Data written to {of!r}\n\n') - elif action in ('a_view','a_view_wide'): - do_pager( - self.fmt_display if action == 'a_view' else - await self.format_for_printing(color=True) ) - if g.platform == 'linux' and oneshot_msg == None: - msg_r(CUR_RIGHT(len(prompt.split('\n')[-1])-2)) - no_output = True - - def age_disp(self,o,age_fmt): - if age_fmt == 'confs': - return o.confs - elif age_fmt == 'block': - return self.rpc.blockcount - (o.confs - 1) - else: - return self.date_formatter[age_fmt](self.rpc,o.date) - - date_formatter = { - 'days': lambda rpc,secs: (rpc.cur_date - secs) // 86400, - 'date': lambda rpc,secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:], - 'date_time': lambda rpc,secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5]), - } - - -class TwAddrList(MMGenDict,metaclass=AsyncInit): - has_age = True - age_fmts = TwUnspentOutputs.age_fmts - age_disp = TwUnspentOutputs.age_disp - date_formatter = TwUnspentOutputs.date_formatter - - def __new__(cls,proto,*args,**kwargs): - return MMGenDict.__new__(altcoin_subclass(cls,proto,'tw'),*args,**kwargs) - - async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None): - - def check_dup_mmid(acct_labels): - mmid_prev,err = None,False - for mmid in sorted(a.mmid for a in acct_labels if a): - if mmid == mmid_prev: - err = True - msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n') - mmid_prev = mmid - if err: rdie(3,'Tracking wallet is corrupted!') - - def check_addr_array_lens(acct_pairs): - err = False - for label,addrs in acct_pairs: - if not label: continue - if len(addrs) != 1: - err = True - if len(addrs) == 0: - msg(f'Label {label!r}: has no associated address!') - else: - msg(f'{addrs!r}: more than one {proto.coin} address in account!') - if err: rdie(3,'Tracking wallet is corrupted!') - - self.rpc = await rpc_init(proto) - self.total = proto.coin_amt('0') - self.proto = proto - - lbl_id = ('account','label')['label_api' in self.rpc.caps] - for d in await self.rpc.call('listunspent',0): - if not lbl_id in d: continue # skip coinbase outputs with missing account - if d['confirmations'] < minconf: continue - label = get_tw_label(proto,d[lbl_id]) - if label: - lm = label.mmid - if usr_addr_list and (lm not in usr_addr_list): - continue - if lm in self: - if self[lm]['addr'] != d['address']: - die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format( - proto.coin, - d['address'], - self[lm]['addr'] )) - else: - lm.confs = d['confirmations'] - lm.txid = d['txid'] - lm.date = None - self[lm] = { - 'amt': proto.coin_amt('0'), - 'lbl': label, - 'addr': CoinAddr(proto,d['address']) } - amt = proto.coin_amt(d['amount']) - self[lm]['amt'] += amt - self.total += amt - - # We use listaccounts only for empty addresses, as it shows false positive balances - if showempty or all_labels: - # for compatibility with old mmids, must use raw RPC rather than native data for matching - # args: minconf,watchonly, MUST use keys() so we get list, not dict - if 'label_api' in self.rpc.caps: - acct_list = await self.rpc.call('listlabels') - aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list]) - acct_addrs = [list(a.keys()) for a in aa] - else: - acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L' - acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) # use raw list here - acct_labels = MMGenList([get_tw_label(proto,a) for a in acct_list]) - check_dup_mmid(acct_labels) - assert len(acct_list) == len(acct_addrs),( - 'listaccounts() and getaddressesbyaccount() not equal in length') - addr_pairs = list(zip(acct_labels,acct_addrs)) - check_addr_array_lens(addr_pairs) - for label,addr_arr in addr_pairs: - if not label: continue - if all_labels and not showempty and not label.comment: continue - if usr_addr_list and (label.mmid not in usr_addr_list): continue - if label.mmid not in self: - self[label.mmid] = { 'amt':proto.coin_amt('0'), 'lbl':label, 'addr':'' } - if showbtcaddrs: - self[label.mmid]['addr'] = CoinAddr(proto,addr_arr[0]) - - def raw_list(self): - return [((k if k.type == 'mmgen' else 'Non-MMGen'),self[k]['addr'],self[k]['amt']) for k in self] - - def coinaddr_list(self): - return [self[k]['addr'] for k in self] - - async def format(self,showbtcaddrs,sort,show_age,age_fmt): - if not self.has_age: - show_age = False - if age_fmt not in self.age_fmts: - raise BadAgeFormat(f'{age_fmt!r}: invalid age format (must be one of {self.age_fmts!r})') - fs = '{mid}' + ('',' {addr}')[showbtcaddrs] + ' {cmt} {amt}' + ('',' {age}')[show_age] - mmaddrs = [k for k in self.keys() if k.type == 'mmgen'] - max_mmid_len = max(len(k) for k in mmaddrs) + 2 if mmaddrs else 10 - max_cmt_width = max(max(v['lbl'].comment.screen_width for v in self.values()),7) - addr_width = max(len(self[mmid]['addr']) for mmid in self) - - max_fp_len = max([len(a.split('.')[1]) for a in [str(v['amt']) for v in self.values()] if '.' in a] or [1]) - - def sort_algo(j): - if sort and 'age' in sort: - return '{}_{:>012}_{}'.format( - j.obj.rsplit(':',1)[0], - # Hack, but OK for the foreseeable future: - (1000000000-(j.confs or 0) if hasattr(j,'confs') else 0), - j.sort_key) - else: - return j.sort_key - - mmids = sorted(self,key=sort_algo,reverse=bool(sort and 'reverse' in sort)) - if show_age: - await TwUnspentOutputs.set_dates( - self.rpc, - [o for o in mmids if hasattr(o,'confs')] ) - - def gen_output(): - - if self.proto.chain_name != 'mainnet': - yield 'Chain: '+green(self.proto.chain_name.upper()) - - yield fs.format( - mid=MMGenID.fmtc('MMGenID',width=max_mmid_len), - addr=(CoinAddr.fmtc('ADDRESS',width=addr_width) if showbtcaddrs else None), - cmt=TwComment.fmtc('COMMENT',width=max_cmt_width+1), - amt='BALANCE'.ljust(max_fp_len+4), - age=age_fmt.upper(), - ).rstrip() - - al_id_save = None - for mmid in mmids: - if mmid.type == 'mmgen': - if al_id_save and al_id_save != mmid.obj.al_id: - yield '' - al_id_save = mmid.obj.al_id - mmid_disp = mmid - else: - if al_id_save: - yield '' - al_id_save = None - mmid_disp = 'Non-MMGen' - e = self[mmid] - yield fs.format( - mid=MMGenID.fmtc(mmid_disp,width=max_mmid_len,color=True), - addr=(e['addr'].fmt(color=True,width=addr_width) if showbtcaddrs else None), - cmt=e['lbl'].comment.fmt(width=max_cmt_width,color=True,nullrepl='-'), - amt=e['amt'].fmt('4.{}'.format(max(max_fp_len,3)),color=True), - age=self.age_disp(mmid,age_fmt) if show_age and hasattr(mmid,'confs') else '-' - ).rstrip() - - yield '\nTOTAL: {} {}'.format( - self.total.hl(color=True), - self.proto.dcoin ) - - return '\n'.join(gen_output()) - -class TrackingWallet(MMGenObject,metaclass=AsyncInit): - - caps = ('rescan','batch') - data_key = 'addresses' - use_tw_file = False - aggressive_sync = False - importing = False - - def __new__(cls,proto,*args,**kwargs): - return MMGenObject.__new__(altcoin_subclass(cls,proto,'tw')) - - async def __init__(self,proto,mode='r',token_addr=None): - - assert mode in ('r','w','i'), f"{mode!r}: wallet mode must be 'r','w' or 'i'" - if mode == 'i': - self.importing = True - mode = 'w' - - if g.debug: - print_stack_trace(f'TW INIT {mode!r} {self!r}') - - self.rpc = await rpc_init(proto) # TODO: create on demand - only certain ops require RPC - self.proto = proto - self.mode = mode - self.desc = self.base_desc = f'{self.proto.name} tracking wallet' - - if self.use_tw_file: - self.init_from_wallet_file() - else: - self.init_empty() - - if self.data['coin'] != self.proto.coin: # TODO remove? - raise WalletFileError( - 'Tracking wallet coin ({}) does not match current coin ({})!'.format( - self.data['coin'], - self.proto.coin )) - - self.conv_types(self.data[self.data_key]) - self.cur_balances = {} # cache balances to prevent repeated lookups per program invocation - - def init_empty(self): - self.data = { 'coin': self.proto.coin, 'addresses': {} } - - def init_from_wallet_file(self): - tw_dir = ( - os.path.join(g.data_dir) if self.proto.coin == 'BTC' else - os.path.join( - g.data_dir_root, - 'altcoins', - self.proto.coin.lower(), - ('' if self.proto.network == 'mainnet' else 'testnet') - )) - self.tw_fn = os.path.join(tw_dir,'tracking-wallet.json') - - check_or_create_dir(tw_dir) - - try: - self.orig_data = get_data_from_file(self.tw_fn,quiet=True) - self.data = json.loads(self.orig_data) - except: - try: os.stat(self.tw_fn) - except: - self.orig_data = '' - self.init_empty() - self.force_write() - else: - raise WalletFileError(f'File {self.tw_fn!r} exists but does not contain valid json data') - else: - self.upgrade_wallet_maybe() - - # ensure that wallet file is written when user exits via KeyboardInterrupt: - if self.mode == 'w': - import atexit - def del_tw(tw): - dmsg(f'Running exit handler del_tw() for {tw!r}') - del tw - atexit.register(del_tw,self) - - def __del__(self): - """ - TrackingWallet instances opened in write or import mode must be explicitly destroyed - with 'del tw', 'del twuo.wallet' and the like to ensure the instance is deleted and - wallet is written before global vars are destroyed by the interpreter at shutdown. - - Not that this code can only be debugged by examining the program output, as exceptions - are ignored within __del__(): - - /usr/share/doc/python3.6-doc/html/reference/datamodel.html#object.__del__ - - Since no exceptions are raised, errors will not be caught by the test suite. - """ - if g.debug: - print_stack_trace(f'TW DEL {self!r}') - - if getattr(self,'mode',None) == 'w': # mode attr might not exist in this state - self.write() - elif g.debug: - msg('read-only wallet, doing nothing') - - def upgrade_wallet_maybe(self): - pass - - def conv_types(self,ad): - for k,v in ad.items(): - if k not in ('params','coin'): - v['mmid'] = TwMMGenID(self.proto,v['mmid']) - v['comment'] = TwComment(v['comment']) - - @property - def data_root(self): - return self.data[self.data_key] - - @property - def data_root_desc(self): - return self.data_key - - def cache_balance(self,addr,bal,session_cache,data_root,force=False): - if force or addr not in session_cache: - session_cache[addr] = str(bal) - if addr in data_root: - data_root[addr]['balance'] = str(bal) - if self.aggressive_sync: - self.write() - - def get_cached_balance(self,addr,session_cache,data_root): - if addr in session_cache: - return self.proto.coin_amt(session_cache[addr]) - if not g.cached_balances: - return None - if addr in data_root and 'balance' in data_root[addr]: - return self.proto.coin_amt(data_root[addr]['balance']) - - async def get_balance(self,addr,force_rpc=False): - ret = None if force_rpc else self.get_cached_balance(addr,self.cur_balances,self.data_root) - if ret == None: - ret = await self.rpc_get_balance(addr) - self.cache_balance(addr,ret,self.cur_balances,self.data_root) - return ret - - async def rpc_get_balance(self,addr): - raise NotImplementedError('not implemented') - - @property - def sorted_list(self): - return sorted( - [ { 'addr':x[0], - 'mmid':x[1]['mmid'], - 'comment':x[1]['comment'] } - for x in self.data_root.items() if x[0] not in ('params','coin') ], - key=lambda x: x['mmid'].sort_key+x['addr'] ) - - @property - def mmid_ordered_dict(self): - return dict((x['mmid'],{'addr':x['addr'],'comment':x['comment']}) for x in self.sorted_list) - - @write_mode - async def import_address(self,addr,label,rescan): - return await self.rpc.call('importaddress',addr,label,rescan,timeout=(False,3600)[rescan]) - - @write_mode - def batch_import_address(self,arg_list): - return self.rpc.batch_call('importaddress',arg_list) - - def force_write(self): - mode_save = self.mode - self.mode = 'w' - self.write() - self.mode = mode_save - - @write_mode - def write_changed(self,data): - write_data_to_file( - self.tw_fn, - data, - desc = f'{self.base_desc} data', - ask_overwrite = False, - ignore_opt_outdir = True, - quiet = True, - check_data = True, - cmp_data = self.orig_data ) - - self.orig_data = data - - def write(self): # use 'check_data' to check wallet hasn't been altered by another program - if not self.use_tw_file: - dmsg("'use_tw_file' is False, doing nothing") - return - dmsg(f'write(): checking if {self.desc} data has changed') - wdata = json.dumps(self.data) - - if self.orig_data != wdata: - if g.debug: - print_stack_trace(f'TW DATA CHANGED {self!r}') - print_diff(self.orig_data,wdata,from_json=True) - self.write_changed(wdata) - elif g.debug: - msg('Data is unchanged\n') - - async def is_in_wallet(self,addr): - return addr in (await TwAddrList(self.proto,[],0,True,True,True,wallet=self)).coinaddr_list() - - @write_mode - async def set_label(self,coinaddr,lbl): - # bitcoin-{abc,bchn} 'setlabel' RPC is broken, so use old 'importaddress' method to set label - # broken behavior: new label is set OK, but old label gets attached to another address - if 'label_api' in self.rpc.caps and self.proto.coin != 'BCH': - args = ('setlabel',coinaddr,lbl) - else: - # NOTE: this works because importaddress() removes the old account before - # associating the new account with the address. - # RPC args: addr,label,rescan[=true],p2sh[=none] - args = ('importaddress',coinaddr,lbl,False) - - try: - return await self.rpc.call(*args) - except Exception as e: - rmsg(e.args[0]) - return False - - # returns on failure - @write_mode - async def add_label(self,arg1,label='',addr=None,silent=False,on_fail='return'): - assert on_fail in ('return','raise'), 'add_label_chk1' - mmaddr,coinaddr = None,None - if is_coin_addr(self.proto,addr or arg1): - coinaddr = get_obj(CoinAddr,proto=self.proto,addr=addr or arg1) - if is_mmgen_id(self.proto,arg1): - mmaddr = TwMMGenID(self.proto,arg1) - - if mmaddr and not coinaddr: - from .addrdata import TwAddrData - coinaddr = (await TwAddrData(self.proto)).mmaddr2coinaddr(mmaddr) - - try: - if not is_mmgen_id(self.proto,arg1): - assert coinaddr, f'Invalid coin address for this chain: {arg1}' - assert coinaddr, f'{g.proj_name} address {mmaddr!r} not found in tracking wallet' - assert await self.is_in_wallet(coinaddr), f'Address {coinaddr!r} not found in tracking wallet' - except Exception as e: - msg(str(e)) - return False - - # Allow for the possibility that BTC addr of MMGen addr was entered. - # Do reverse lookup, so that MMGen addr will not be marked as non-MMGen. - if not mmaddr: - from .addrdata import TwAddrData - mmaddr = (await TwAddrData(proto=self.proto)).coinaddr2mmaddr(coinaddr) - - if not mmaddr: - mmaddr = f'{self.proto.base_coin.lower()}:{coinaddr}' - - mmaddr = TwMMGenID(self.proto,mmaddr) - - cmt = TwComment(label) if on_fail=='raise' else get_obj(TwComment,s=label) - if cmt in (False,None): - return False - - lbl_txt = mmaddr + (' ' + cmt if cmt else '') - lbl = ( - TwLabel(self.proto,lbl_txt) if on_fail == 'raise' else - get_obj(TwLabel,proto=self.proto,text=lbl_txt) ) - - if await self.set_label(coinaddr,lbl) == False: - if not silent: - msg( 'Label could not be {}'.format('added' if label else 'removed') ) - return False - else: - desc = '{} address {} in tracking wallet'.format( - mmaddr.type.replace('mmg','MMG'), - mmaddr.replace(self.proto.base_coin.lower()+':','') ) - if label: - msg(f'Added label {label!r} to {desc}') - else: - msg(f'Removed label from {desc}') - return True - - @write_mode - async def remove_label(self,mmaddr): - await self.add_label(mmaddr,'') - - @write_mode - async def remove_address(self,addr): - raise NotImplementedError(f'address removal not implemented for coin {self.proto.coin}') - -class TwGetBalance(MMGenObject,metaclass=AsyncInit): - - fs = '{w:13} {u:<16} {p:<16} {c}' - - def __new__(cls,proto,*args,**kwargs): - return MMGenObject.__new__(altcoin_subclass(cls,proto,'tw')) - - async def __init__(self,proto,minconf,quiet): - - self.minconf = minconf - self.quiet = quiet - self.data = {k:[proto.coin_amt('0')] * 4 for k in ('TOTAL','Non-MMGen','Non-wallet')} - self.rpc = await rpc_init(proto) - self.proto = proto - await self.create_data() - - async def create_data(self): - # 0: unconfirmed, 1: below minconf, 2: confirmed, 3: spendable (privkey in wallet) - lbl_id = ('account','label')['label_api' in self.rpc.caps] - for d in await self.rpc.call('listunspent',0): - lbl = get_tw_label(self.proto,d[lbl_id]) - if lbl: - if lbl.mmid.type == 'mmgen': - key = lbl.mmid.obj.sid - if key not in self.data: - self.data[key] = [self.proto.coin_amt('0')] * 4 - else: - key = 'Non-MMGen' - else: - lbl,key = None,'Non-wallet' - - amt = self.proto.coin_amt(d['amount']) - - if not d['confirmations']: - self.data['TOTAL'][0] += amt - self.data[key][0] += amt - - conf_level = (1,2)[d['confirmations'] >= self.minconf] - - self.data['TOTAL'][conf_level] += amt - self.data[key][conf_level] += amt - - if d['spendable']: - self.data[key][3] += amt - - def format(self): - def gen_output(): - if self.proto.chain_name != 'mainnet': - yield 'Chain: ' + green(self.proto.chain_name.upper()) - - if self.quiet: - yield str(self.data['TOTAL'][2] if self.data else 0) - else: - yield self.fs.format( - w = 'Wallet', - u = ' Unconfirmed', - p = f' <{self.minconf} confirms', - c = f' >={self.minconf} confirms' ) - - for key in sorted(self.data): - if not any(self.data[key]): - continue - yield self.fs.format(**dict(zip( - ('w','u','p','c'), - [key+':'] + [a.fmt(color=True,suf=' '+self.proto.dcoin) for a in self.data[key]] - ))) - - for key,vals in list(self.data.items()): - if key == 'TOTAL': - continue - if vals[3]: - yield red(f'Warning: this wallet contains PRIVATE KEYS for {key} outputs!') - - return '\n'.join(gen_output()).rstrip() diff --git a/mmgen/twaddrs.py b/mmgen/twaddrs.py new file mode 100755 index 00000000..80608cd1 --- /dev/null +++ b/mmgen/twaddrs.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +twaddrs: Tracking wallet listaddresses class for the MMGen suite +""" + +from .color import green +from .exception import BadAgeFormat +from .util import msg,die,altcoin_subclass +from .base_obj import AsyncInit +from .obj import MMGenList,MMGenDict,TwComment +from .addr import CoinAddr,MMGenID +from .rpc import rpc_init +from .tw import TwCommon,get_tw_label + +class TwAddrList(MMGenDict,TwCommon,metaclass=AsyncInit): + has_age = True + + def __new__(cls,proto,*args,**kwargs): + return MMGenDict.__new__(altcoin_subclass(cls,proto,'twaddrs'),*args,**kwargs) + + async def __init__(self,proto,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels,wallet=None): + + def check_dup_mmid(acct_labels): + mmid_prev,err = None,False + for mmid in sorted(a.mmid for a in acct_labels if a): + if mmid == mmid_prev: + err = True + msg(f'Duplicate MMGen ID ({mmid}) discovered in tracking wallet!\n') + mmid_prev = mmid + if err: rdie(3,'Tracking wallet is corrupted!') + + def check_addr_array_lens(acct_pairs): + err = False + for label,addrs in acct_pairs: + if not label: continue + if len(addrs) != 1: + err = True + if len(addrs) == 0: + msg(f'Label {label!r}: has no associated address!') + else: + msg(f'{addrs!r}: more than one {proto.coin} address in account!') + if err: rdie(3,'Tracking wallet is corrupted!') + + self.rpc = await rpc_init(proto) + self.total = proto.coin_amt('0') + self.proto = proto + + lbl_id = ('account','label')['label_api' in self.rpc.caps] + for d in await self.rpc.call('listunspent',0): + if not lbl_id in d: continue # skip coinbase outputs with missing account + if d['confirmations'] < minconf: continue + label = get_tw_label(proto,d[lbl_id]) + if label: + lm = label.mmid + if usr_addr_list and (lm not in usr_addr_list): + continue + if lm in self: + if self[lm]['addr'] != d['address']: + die(2,'duplicate {} address ({}) for this MMGen address! ({})'.format( + proto.coin, + d['address'], + self[lm]['addr'] )) + else: + lm.confs = d['confirmations'] + lm.txid = d['txid'] + lm.date = None + self[lm] = { + 'amt': proto.coin_amt('0'), + 'lbl': label, + 'addr': CoinAddr(proto,d['address']) } + amt = proto.coin_amt(d['amount']) + self[lm]['amt'] += amt + self.total += amt + + # We use listaccounts only for empty addresses, as it shows false positive balances + if showempty or all_labels: + # for compatibility with old mmids, must use raw RPC rather than native data for matching + # args: minconf,watchonly, MUST use keys() so we get list, not dict + if 'label_api' in self.rpc.caps: + acct_list = await self.rpc.call('listlabels') + aa = await self.rpc.batch_call('getaddressesbylabel',[(k,) for k in acct_list]) + acct_addrs = [list(a.keys()) for a in aa] + else: + acct_list = list((await self.rpc.call('listaccounts',0,True)).keys()) # raw list, no 'L' + acct_addrs = await self.rpc.batch_call('getaddressesbyaccount',[(a,) for a in acct_list]) # use raw list here + acct_labels = MMGenList([get_tw_label(proto,a) for a in acct_list]) + check_dup_mmid(acct_labels) + assert len(acct_list) == len(acct_addrs),( + 'listaccounts() and getaddressesbyaccount() not equal in length') + addr_pairs = list(zip(acct_labels,acct_addrs)) + check_addr_array_lens(addr_pairs) + for label,addr_arr in addr_pairs: + if not label: continue + if all_labels and not showempty and not label.comment: continue + if usr_addr_list and (label.mmid not in usr_addr_list): continue + if label.mmid not in self: + self[label.mmid] = { 'amt':proto.coin_amt('0'), 'lbl':label, 'addr':'' } + if showbtcaddrs: + self[label.mmid]['addr'] = CoinAddr(proto,addr_arr[0]) + + def raw_list(self): + return [((k if k.type == 'mmgen' else 'Non-MMGen'),self[k]['addr'],self[k]['amt']) for k in self] + + def coinaddr_list(self): + return [self[k]['addr'] for k in self] + + async def format(self,showbtcaddrs,sort,show_age,age_fmt): + if not self.has_age: + show_age = False + if age_fmt not in self.age_fmts: + raise BadAgeFormat(f'{age_fmt!r}: invalid age format (must be one of {self.age_fmts!r})') + fs = '{mid}' + ('',' {addr}')[showbtcaddrs] + ' {cmt} {amt}' + ('',' {age}')[show_age] + mmaddrs = [k for k in self.keys() if k.type == 'mmgen'] + max_mmid_len = max(len(k) for k in mmaddrs) + 2 if mmaddrs else 10 + max_cmt_width = max(max(v['lbl'].comment.screen_width for v in self.values()),7) + addr_width = max(len(self[mmid]['addr']) for mmid in self) + + max_fp_len = max([len(a.split('.')[1]) for a in [str(v['amt']) for v in self.values()] if '.' in a] or [1]) + + def sort_algo(j): + if sort and 'age' in sort: + return '{}_{:>012}_{}'.format( + j.obj.rsplit(':',1)[0], + # Hack, but OK for the foreseeable future: + (1000000000-(j.confs or 0) if hasattr(j,'confs') else 0), + j.sort_key) + else: + return j.sort_key + + mmids = sorted(self,key=sort_algo,reverse=bool(sort and 'reverse' in sort)) + if show_age: + await self.set_dates( + self.rpc, + [o for o in mmids if hasattr(o,'confs')] ) + + def gen_output(): + + if self.proto.chain_name != 'mainnet': + yield 'Chain: '+green(self.proto.chain_name.upper()) + + yield fs.format( + mid=MMGenID.fmtc('MMGenID',width=max_mmid_len), + addr=(CoinAddr.fmtc('ADDRESS',width=addr_width) if showbtcaddrs else None), + cmt=TwComment.fmtc('COMMENT',width=max_cmt_width+1), + amt='BALANCE'.ljust(max_fp_len+4), + age=age_fmt.upper(), + ).rstrip() + + al_id_save = None + for mmid in mmids: + if mmid.type == 'mmgen': + if al_id_save and al_id_save != mmid.obj.al_id: + yield '' + al_id_save = mmid.obj.al_id + mmid_disp = mmid + else: + if al_id_save: + yield '' + al_id_save = None + mmid_disp = 'Non-MMGen' + e = self[mmid] + yield fs.format( + mid=MMGenID.fmtc(mmid_disp,width=max_mmid_len,color=True), + addr=(e['addr'].fmt(color=True,width=addr_width) if showbtcaddrs else None), + cmt=e['lbl'].comment.fmt(width=max_cmt_width,color=True,nullrepl='-'), + amt=e['amt'].fmt('4.{}'.format(max(max_fp_len,3)),color=True), + age=self.age_disp(mmid,age_fmt) if show_age and hasattr(mmid,'confs') else '-' + ).rstrip() + + yield '\nTOTAL: {} {}'.format( + self.total.hl(color=True), + self.proto.dcoin ) + + return '\n'.join(gen_output()) diff --git a/mmgen/twbal.py b/mmgen/twbal.py new file mode 100755 index 00000000..3ae13b0c --- /dev/null +++ b/mmgen/twbal.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +twbal: Tracking wallet getbalance class for the MMGen suite +""" + +from .color import red,green +from .util import altcoin_subclass +from .base_obj import AsyncInit +from .objmethods import MMGenObject +from .rpc import rpc_init +from .tw import get_tw_label + +class TwGetBalance(MMGenObject,metaclass=AsyncInit): + + fs = '{w:13} {u:<16} {p:<16} {c}' + + def __new__(cls,proto,*args,**kwargs): + return MMGenObject.__new__(altcoin_subclass(cls,proto,'twbal')) + + async def __init__(self,proto,minconf,quiet): + + self.minconf = minconf + self.quiet = quiet + self.data = {k:[proto.coin_amt('0')] * 4 for k in ('TOTAL','Non-MMGen','Non-wallet')} + self.rpc = await rpc_init(proto) + self.proto = proto + await self.create_data() + + async def create_data(self): + # 0: unconfirmed, 1: below minconf, 2: confirmed, 3: spendable (privkey in wallet) + lbl_id = ('account','label')['label_api' in self.rpc.caps] + for d in await self.rpc.call('listunspent',0): + lbl = get_tw_label(self.proto,d[lbl_id]) + if lbl: + if lbl.mmid.type == 'mmgen': + key = lbl.mmid.obj.sid + if key not in self.data: + self.data[key] = [self.proto.coin_amt('0')] * 4 + else: + key = 'Non-MMGen' + else: + lbl,key = None,'Non-wallet' + + amt = self.proto.coin_amt(d['amount']) + + if not d['confirmations']: + self.data['TOTAL'][0] += amt + self.data[key][0] += amt + + conf_level = (1,2)[d['confirmations'] >= self.minconf] + + self.data['TOTAL'][conf_level] += amt + self.data[key][conf_level] += amt + + if d['spendable']: + self.data[key][3] += amt + + def format(self): + def gen_output(): + if self.proto.chain_name != 'mainnet': + yield 'Chain: ' + green(self.proto.chain_name.upper()) + + if self.quiet: + yield str(self.data['TOTAL'][2] if self.data else 0) + else: + yield self.fs.format( + w = 'Wallet', + u = ' Unconfirmed', + p = f' <{self.minconf} confirms', + c = f' >={self.minconf} confirms' ) + + for key in sorted(self.data): + if not any(self.data[key]): + continue + yield self.fs.format(**dict(zip( + ('w','u','p','c'), + [key+':'] + [a.fmt(color=True,suf=' '+self.proto.dcoin) for a in self.data[key]] + ))) + + for key,vals in list(self.data.items()): + if key == 'TOTAL': + continue + if vals[3]: + yield red(f'Warning: this wallet contains PRIVATE KEYS for {key} outputs!') + + return '\n'.join(gen_output()).rstrip() diff --git a/mmgen/twctl.py b/mmgen/twctl.py new file mode 100755 index 00000000..20a23638 --- /dev/null +++ b/mmgen/twctl.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +twctl: Tracking wallet control class for the MMGen suite +""" + +from .globalvars import g +from .exception import WalletFileError +from .util import ( + msg, + dmsg, + check_or_create_dir, + write_data_to_file, + get_data_from_file, + write_mode, + altcoin_subclass +) +from .base_obj import AsyncInit +from .objmethods import MMGenObject +from .obj import TwComment,get_obj +from .addr import CoinAddr,is_mmgen_id,is_coin_addr +from .rpc import rpc_init +from .tw import TwMMGenID,TwLabel + +class TrackingWallet(MMGenObject,metaclass=AsyncInit): + + caps = ('rescan','batch') + data_key = 'addresses' + use_tw_file = False + aggressive_sync = False + importing = False + + def __new__(cls,proto,*args,**kwargs): + return MMGenObject.__new__(altcoin_subclass(cls,proto,'twctl')) + + async def __init__(self,proto,mode='r',token_addr=None): + + assert mode in ('r','w','i'), f"{mode!r}: wallet mode must be 'r','w' or 'i'" + if mode == 'i': + self.importing = True + mode = 'w' + + if g.debug: + print_stack_trace(f'TW INIT {mode!r} {self!r}') + + self.rpc = await rpc_init(proto) # TODO: create on demand - only certain ops require RPC + self.proto = proto + self.mode = mode + self.desc = self.base_desc = f'{self.proto.name} tracking wallet' + + if self.use_tw_file: + self.init_from_wallet_file() + else: + self.init_empty() + + if self.data['coin'] != self.proto.coin: # TODO remove? + raise WalletFileError( + 'Tracking wallet coin ({}) does not match current coin ({})!'.format( + self.data['coin'], + self.proto.coin )) + + self.conv_types(self.data[self.data_key]) + self.cur_balances = {} # cache balances to prevent repeated lookups per program invocation + + def init_empty(self): + self.data = { 'coin': self.proto.coin, 'addresses': {} } + + def init_from_wallet_file(self): + import os,json + tw_dir = ( + os.path.join(g.data_dir) if self.proto.coin == 'BTC' else + os.path.join( + g.data_dir_root, + 'altcoins', + self.proto.coin.lower(), + ('' if self.proto.network == 'mainnet' else 'testnet') + )) + self.tw_fn = os.path.join(tw_dir,'tracking-wallet.json') + + check_or_create_dir(tw_dir) + + try: + self.orig_data = get_data_from_file(self.tw_fn,quiet=True) + self.data = json.loads(self.orig_data) + except: + try: os.stat(self.tw_fn) + except: + self.orig_data = '' + self.init_empty() + self.force_write() + else: + raise WalletFileError(f'File {self.tw_fn!r} exists but does not contain valid json data') + else: + self.upgrade_wallet_maybe() + + # ensure that wallet file is written when user exits via KeyboardInterrupt: + if self.mode == 'w': + import atexit + def del_tw(tw): + dmsg(f'Running exit handler del_tw() for {tw!r}') + del tw + atexit.register(del_tw,self) + + def __del__(self): + """ + TrackingWallet instances opened in write or import mode must be explicitly destroyed + with 'del twctl', 'del twuo.wallet' and the like to ensure the instance is deleted and + wallet is written before global vars are destroyed by the interpreter at shutdown. + + Not that this code can only be debugged by examining the program output, as exceptions + are ignored within __del__(): + + /usr/share/doc/python3.6-doc/html/reference/datamodel.html#object.__del__ + + Since no exceptions are raised, errors will not be caught by the test suite. + """ + if g.debug: + print_stack_trace(f'TW DEL {self!r}') + + if getattr(self,'mode',None) == 'w': # mode attr might not exist in this state + self.write() + elif g.debug: + msg('read-only wallet, doing nothing') + + def upgrade_wallet_maybe(self): + pass + + def conv_types(self,ad): + for k,v in ad.items(): + if k not in ('params','coin'): + v['mmid'] = TwMMGenID(self.proto,v['mmid']) + v['comment'] = TwComment(v['comment']) + + @property + def data_root(self): + return self.data[self.data_key] + + @property + def data_root_desc(self): + return self.data_key + + def cache_balance(self,addr,bal,session_cache,data_root,force=False): + if force or addr not in session_cache: + session_cache[addr] = str(bal) + if addr in data_root: + data_root[addr]['balance'] = str(bal) + if self.aggressive_sync: + self.write() + + def get_cached_balance(self,addr,session_cache,data_root): + if addr in session_cache: + return self.proto.coin_amt(session_cache[addr]) + if not g.cached_balances: + return None + if addr in data_root and 'balance' in data_root[addr]: + return self.proto.coin_amt(data_root[addr]['balance']) + + async def get_balance(self,addr,force_rpc=False): + ret = None if force_rpc else self.get_cached_balance(addr,self.cur_balances,self.data_root) + if ret == None: + ret = await self.rpc_get_balance(addr) + self.cache_balance(addr,ret,self.cur_balances,self.data_root) + return ret + + async def rpc_get_balance(self,addr): + raise NotImplementedError('not implemented') + + @property + def sorted_list(self): + return sorted( + [ { 'addr':x[0], + 'mmid':x[1]['mmid'], + 'comment':x[1]['comment'] } + for x in self.data_root.items() if x[0] not in ('params','coin') ], + key=lambda x: x['mmid'].sort_key+x['addr'] ) + + @property + def mmid_ordered_dict(self): + return dict((x['mmid'],{'addr':x['addr'],'comment':x['comment']}) for x in self.sorted_list) + + @write_mode + async def import_address(self,addr,label,rescan): + return await self.rpc.call('importaddress',addr,label,rescan,timeout=(False,3600)[rescan]) + + @write_mode + def batch_import_address(self,arg_list): + return self.rpc.batch_call('importaddress',arg_list) + + def force_write(self): + mode_save = self.mode + self.mode = 'w' + self.write() + self.mode = mode_save + + @write_mode + def write_changed(self,data): + write_data_to_file( + self.tw_fn, + data, + desc = f'{self.base_desc} data', + ask_overwrite = False, + ignore_opt_outdir = True, + quiet = True, + check_data = True, + cmp_data = self.orig_data ) + + self.orig_data = data + + def write(self): # use 'check_data' to check wallet hasn't been altered by another program + if not self.use_tw_file: + dmsg("'use_tw_file' is False, doing nothing") + return + dmsg(f'write(): checking if {self.desc} data has changed') + + import json + wdata = json.dumps(self.data) + + if self.orig_data != wdata: + if g.debug: + print_stack_trace(f'TW DATA CHANGED {self!r}') + print_diff(self.orig_data,wdata,from_json=True) + self.write_changed(wdata) + elif g.debug: + msg('Data is unchanged\n') + + async def is_in_wallet(self,addr): + from .twaddrs import TwAddrList + return addr in (await TwAddrList(self.proto,[],0,True,True,True,wallet=self)).coinaddr_list() + + @write_mode + async def set_label(self,coinaddr,lbl): + # bitcoin-{abc,bchn} 'setlabel' RPC is broken, so use old 'importaddress' method to set label + # broken behavior: new label is set OK, but old label gets attached to another address + if 'label_api' in self.rpc.caps and self.proto.coin != 'BCH': + args = ('setlabel',coinaddr,lbl) + else: + # NOTE: this works because importaddress() removes the old account before + # associating the new account with the address. + # RPC args: addr,label,rescan[=true],p2sh[=none] + args = ('importaddress',coinaddr,lbl,False) + + try: + return await self.rpc.call(*args) + except Exception as e: + rmsg(e.args[0]) + return False + + # returns on failure + @write_mode + async def add_label(self,arg1,label='',addr=None,silent=False,on_fail='return'): + assert on_fail in ('return','raise'), 'add_label_chk1' + mmaddr,coinaddr = None,None + if is_coin_addr(self.proto,addr or arg1): + coinaddr = get_obj(CoinAddr,proto=self.proto,addr=addr or arg1) + if is_mmgen_id(self.proto,arg1): + mmaddr = TwMMGenID(self.proto,arg1) + + if mmaddr and not coinaddr: + from .addrdata import TwAddrData + coinaddr = (await TwAddrData(self.proto)).mmaddr2coinaddr(mmaddr) + + try: + if not is_mmgen_id(self.proto,arg1): + assert coinaddr, f'Invalid coin address for this chain: {arg1}' + assert coinaddr, f'{g.proj_name} address {mmaddr!r} not found in tracking wallet' + assert await self.is_in_wallet(coinaddr), f'Address {coinaddr!r} not found in tracking wallet' + except Exception as e: + msg(str(e)) + return False + + # Allow for the possibility that BTC addr of MMGen addr was entered. + # Do reverse lookup, so that MMGen addr will not be marked as non-MMGen. + if not mmaddr: + from .addrdata import TwAddrData + mmaddr = (await TwAddrData(proto=self.proto)).coinaddr2mmaddr(coinaddr) + + if not mmaddr: + mmaddr = f'{self.proto.base_coin.lower()}:{coinaddr}' + + mmaddr = TwMMGenID(self.proto,mmaddr) + + cmt = TwComment(label) if on_fail=='raise' else get_obj(TwComment,s=label) + if cmt in (False,None): + return False + + lbl_txt = mmaddr + (' ' + cmt if cmt else '') + lbl = ( + TwLabel(self.proto,lbl_txt) if on_fail == 'raise' else + get_obj(TwLabel,proto=self.proto,text=lbl_txt) ) + + if await self.set_label(coinaddr,lbl) == False: + if not silent: + msg( 'Label could not be {}'.format('added' if label else 'removed') ) + return False + else: + desc = '{} address {} in tracking wallet'.format( + mmaddr.type.replace('mmg','MMG'), + mmaddr.replace(self.proto.base_coin.lower()+':','') ) + if label: + msg(f'Added label {label!r} to {desc}') + else: + msg(f'Removed label from {desc}') + return True + + @write_mode + async def remove_label(self,mmaddr): + await self.add_label(mmaddr,'') + + @write_mode + async def remove_address(self,addr): + raise NotImplementedError(f'address removal not implemented for coin {self.proto.coin}') diff --git a/mmgen/twuo.py b/mmgen/twuo.py new file mode 100755 index 00000000..60234247 --- /dev/null +++ b/mmgen/twuo.py @@ -0,0 +1,525 @@ +#!/usr/bin/env python3 +# +# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution +# Copyright (C)2013-2022 The MMGen Project +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +twuo: Tracking wallet unspent outputs class for the MMGen suite +""" + +import time +from collections import namedtuple + +from .globalvars import g +from .color import red,yellow,green +from .exception import BadAgeFormat +from .util import ( + msg, + msg_r, + die, + capfirst, + suf, + fmt, + make_timestr, + keypress_confirm, + write_data_to_file, + line_input, + do_pager, + altcoin_subclass +) +from .base_obj import AsyncInit +from .objmethods import MMGenObject +from .obj import ImmutableAttr,ListItemAttr,MMGenListItem,TwComment,get_obj +from .addr import CoinAddr,MMGenID,AddrIdx +from .rpc import rpc_init +from .tw import TwCommon,TwMMGenID,get_tw_label + +class TwUnspentOutputs(MMGenObject,TwCommon,metaclass=AsyncInit): + + def __new__(cls,proto,*args,**kwargs): + return MMGenObject.__new__(altcoin_subclass(cls,proto,'twuo')) + + txid_w = 64 + disp_type = 'btc' + can_group = True + hdr_fmt = 'UNSPENT OUTPUTS (sort order: {}) Total {}: {}' + desc = 'unspent outputs' + item_desc = 'unspent output' + dump_fn_pfx = 'listunspent' + prompt_fs = 'Total to spend, excluding fees: {} {}\n\n' + prompt = """ +Sort options: [t]xid, [a]mount, a[d]dress, [A]ge, [r]everse, [M]mgen addr +Display options: toggle [D]ays/date, show [g]roup, show [m]mgen addr, r[e]draw +Actions: [q]uit view, [p]rint to file, pager [v]iew, [w]ide view, add [l]abel: +""" + key_mappings = { + 't':'s_txid','a':'s_amt','d':'s_addr','A':'s_age','r':'d_reverse','M':'s_twmmid', + 'D':'d_days','g':'d_group','m':'d_mmid','e':'d_redraw', + 'q':'a_quit','p':'a_print','v':'a_view','w':'a_view_wide','l':'a_lbl_add' } + col_adj = 38 + age_fmts_date_dependent = ('days','date','date_time') + age_fmts_interactive = ('confs','block','days','date') + _age_fmt = 'confs' + + class MMGenTwOutputList(list,MMGenObject): pass + + class MMGenTwUnspentOutput(MMGenListItem): + txid = ListItemAttr('CoinTxID') + vout = ListItemAttr(int,typeconv=False) + amt = ImmutableAttr(None) + amt2 = ListItemAttr(None) + label = ListItemAttr('TwComment',reassign_ok=True) + twmmid = ImmutableAttr(TwMMGenID,include_proto=True) + addr = ImmutableAttr(CoinAddr,include_proto=True) + confs = ImmutableAttr(int,typeconv=False) + date = ListItemAttr(int,typeconv=False,reassign_ok=True) + scriptPubKey = ImmutableAttr('HexStr') + skip = ListItemAttr(str,typeconv=False,reassign_ok=True) + + # required by gen_unspent(); setting valid_attrs explicitly is also more efficient + valid_attrs = {'txid','vout','amt','amt2','label','twmmid','addr','confs','date','scriptPubKey','skip'} + invalid_attrs = {'proto'} + + def __init__(self,proto,**kwargs): + self.__dict__['proto'] = proto + MMGenListItem.__init__(self,**kwargs) + + class conv_funcs: + def amt(self,value): + return self.proto.coin_amt(value) + def amt2(self,value): + return self.proto.coin_amt(value) + + async def __init__(self,proto,minconf=1,addrs=[]): + self.proto = proto + self.unspent = self.MMGenTwOutputList() + self.fmt_display = '' + self.fmt_print = '' + self.cols = None + self.reverse = False + self.group = False + self.show_mmid = True + self.minconf = minconf + self.addrs = addrs + self.sort_key = 'age' + self.disp_prec = self.get_display_precision() + self.rpc = await rpc_init(proto) + + from .twctl import TrackingWallet + self.wallet = await TrackingWallet(proto,mode='w') + if self.disp_type == 'token': + self.proto.tokensym = self.wallet.symbol + + @property + def age_fmt(self): + return self._age_fmt + + @age_fmt.setter + def age_fmt(self,val): + if val not in self.age_fmts: + raise BadAgeFormat(f'{val!r}: invalid age format (must be one of {self.age_fmts!r})') + self._age_fmt = val + + def get_display_precision(self): + return self.proto.coin_amt.max_prec + + @property + def total(self): + return sum(i.amt for i in self.unspent) + + async def get_unspent_rpc(self): + # bitcoin-cli help listunspent: + # Arguments: + # 1. minconf (numeric, optional, default=1) The minimum confirmations to filter + # 2. maxconf (numeric, optional, default=9999999) The maximum confirmations to filter + # 3. addresses (json array, optional, default=empty array) A json array of bitcoin addresses + # 4. include_unsafe (boolean, optional, default=true) Include outputs that are not safe to spend + # 5. query_options (json object, optional) JSON with query options + + # for now, self.addrs is just an empty list for Bitcoin and friends + add_args = (9999999,self.addrs) if self.addrs else () + return await self.rpc.call('listunspent',self.minconf,*add_args) + + async def get_unspent_data(self,sort_key=None,reverse_sort=False): + + us_raw = await self.get_unspent_rpc() + + if not us_raw: + die(0,fmt(f""" + No spendable outputs found! Import addresses with balances into your + watch-only wallet using '{g.proj_name.lower()}-addrimport' and then re-run this program. + """).strip()) + + lbl_id = ('account','label')['label_api' in self.rpc.caps] + + def gen_unspent(): + for o in us_raw: + if not lbl_id in o: + continue # coinbase outputs have no account field + l = get_tw_label(self.proto,o[lbl_id]) + if l: + o.update({ + 'twmmid': l.mmid, + 'label': l.comment or '', + 'amt': self.proto.coin_amt(o['amount']), + 'addr': CoinAddr(self.proto,o['address']), + 'confs': o['confirmations'] + }) + yield self.MMGenTwUnspentOutput( + self.proto, + **{ k:v for k,v in o.items() if k in self.MMGenTwUnspentOutput.valid_attrs } ) + + self.unspent = self.MMGenTwOutputList(gen_unspent()) + + if not self.unspent: + die(1, f'No tracked {self.item_desc}s in tracking wallet!') + + self.do_sort(key=sort_key,reverse=reverse_sort) + + def do_sort(self,key=None,reverse=False): + sort_funcs = { + 'addr': lambda i: i.addr, + 'age': lambda i: 0 - i.confs, + 'amt': lambda i: i.amt, + 'txid': lambda i: f'{i.txid} {i.vout:04}', + 'twmmid': lambda i: i.twmmid.sort_key + } + key = key or self.sort_key + if key not in sort_funcs: + die(1,f'{key!r}: invalid sort key. Valid options: {" ".join(sort_funcs.keys())}') + self.sort_key = key + assert type(reverse) == bool + self.unspent.sort(key=sort_funcs[key],reverse=reverse or self.reverse) + + def sort_info(self,include_group=True): + ret = ([],['Reverse'])[self.reverse] + ret.append(capfirst(self.sort_key).replace('Twmmid','MMGenID')) + if include_group and self.group and (self.sort_key in ('addr','txid','twmmid')): + ret.append('Grouped') + return ret + + def set_term_columns(self): + from .term import get_terminal_size + while True: + self.cols = g.terminal_width or get_terminal_size().width + if self.cols >= g.min_screen_width: + break + line_input( + 'Screen too narrow to display the tracking wallet\n' + + f'Please resize your screen to at least {g.min_screen_width} characters and hit ENTER ' ) + + def get_display_constants(self): + unsp = self.unspent + for i in unsp: + i.skip = '' + + # allow for 7-digit confirmation nums + col1_w = max(3,len(str(len(unsp)))+1) # num + ')' + mmid_w = max(len(('',i.twmmid)[i.twmmid.type=='mmgen']) for i in unsp) or 12 # DEADBEEF:S:1 + max_acct_w = max(i.label.screen_width for i in unsp) + mmid_w + 1 + max_btcaddr_w = max(len(i.addr) for i in unsp) + min_addr_w = self.cols - self.col_adj + addr_w = min(max_btcaddr_w + (0,1+max_acct_w)[self.show_mmid],min_addr_w) + acct_w = min(max_acct_w, max(24,addr_w-10)) + btaddr_w = addr_w - acct_w - 1 + label_w = acct_w - mmid_w - 1 + tx_w = min(self.txid_w,self.cols-addr_w-29-col1_w) # min=6 TODO + txdots = ('','..')[tx_w < self.txid_w] + + dc = namedtuple('display_constants',['col1_w','mmid_w','addr_w','btaddr_w','label_w','tx_w','txdots']) + return dc(col1_w,mmid_w,addr_w,btaddr_w,label_w,tx_w,txdots) + + async def format_for_display(self): + unsp = self.unspent + if self.age_fmt in self.age_fmts_date_dependent: + await self.set_dates(self.rpc,unsp) + self.set_term_columns() + + c = getattr(self,'display_constants',None) + if not c: + c = self.display_constants = self.get_display_constants() + + if self.group and (self.sort_key in ('addr','txid','twmmid')): + for a,b in [(unsp[i],unsp[i+1]) for i in range(len(unsp)-1)]: + for k in ('addr','txid','twmmid'): + if self.sort_key == k and getattr(a,k) == getattr(b,k): + b.skip = (k,'addr')[k=='twmmid'] + + def gen_output(): + yield self.hdr_fmt.format(' '.join(self.sort_info()),self.proto.dcoin,self.total.hl()) + if self.proto.chain_name != 'mainnet': + yield 'Chain: '+green(self.proto.chain_name.upper()) + fs = { 'btc': ' {n:%s} {t:%s} {v:2} {a} {A} {c:<}' % (c.col1_w,c.tx_w), + 'eth': ' {n:%s} {a} {A}' % c.col1_w, + 'token': ' {n:%s} {a} {A} {A2}' % c.col1_w }[self.disp_type] + fs_hdr = ' {n:%s} {t:%s} {a} {A} {c:<}' % (c.col1_w,c.tx_w) if self.disp_type == 'btc' else fs + date_hdr = { + 'confs': 'Confs', + 'block': 'Block', + 'days': 'Age(d)', + 'date': 'Date', + 'date_time': 'Date', + } + yield fs_hdr.format( + n = 'Num', + t = 'TXid'.ljust(c.tx_w - 2) + ' Vout', + a = 'Address'.ljust(c.addr_w), + A = f'Amt({self.proto.dcoin})'.ljust(self.disp_prec+5), + A2 = f' Amt({self.proto.coin})'.ljust(self.disp_prec+4), + c = date_hdr[self.age_fmt], + ).rstrip() + + for n,i in enumerate(unsp): + addr_dots = '|' + '.'*(c.addr_w-1) + mmid_disp = MMGenID.fmtc( + ( + '.'*c.mmid_w if i.skip == 'addr' else + i.twmmid if i.twmmid.type == 'mmgen' else + f'Non-{g.proj_name}' + ), + width = c.mmid_w, + color = True ) + + if self.show_mmid: + addr_out = '{} {}{}'.format(( + type(i.addr).fmtc(addr_dots,width=c.btaddr_w,color=True) if i.skip == 'addr' else + i.addr.fmt(width=c.btaddr_w,color=True) + ), + mmid_disp, + (' ' + i.label.fmt(width=c.label_w,color=True)) if c.label_w > 0 else '' + ) + else: + addr_out = ( + type(i.addr).fmtc(addr_dots,width=c.addr_w,color=True) if i.skip=='addr' else + i.addr.fmt(width=c.addr_w,color=True) ) + + yield fs.format( + n = str(n+1)+')', + t = ( + '' if not i.txid else + ' ' * (c.tx_w-4) + '|...' if i.skip == 'txid' else + i.txid[:c.tx_w-len(c.txdots)] + c.txdots ), + v = i.vout, + a = addr_out, + A = i.amt.fmt(color=True,prec=self.disp_prec), + A2 = (i.amt2.fmt(color=True,prec=self.disp_prec) if i.amt2 is not None else ''), + c = self.age_disp(i,self.age_fmt), + ).rstrip() + + self.fmt_display = '\n'.join(gen_output()) + '\n' + return self.fmt_display + + async def format_for_printing(self,color=False,show_confs=True): + await self.set_dates(self.rpc,self.unspent) + addr_w = max(len(i.addr) for i in self.unspent) + mmid_w = max(len(('',i.twmmid)[i.twmmid.type=='mmgen']) for i in self.unspent) or 12 # DEADBEEF:S:1 + amt_w = self.proto.coin_amt.max_prec + 5 + cfs = '{c:<8} ' if show_confs else '' + fs = { + 'btc': (' {n:4} {t:%s} {a} {m} {A:%s} ' + cfs + '{b:<8} {D:<19} {l}') % (self.txid_w+3,amt_w), + 'eth': ' {n:4} {a} {m} {A:%s} {l}' % amt_w, + 'token': ' {n:4} {a} {m} {A:%s} {A2:%s} {l}' % (amt_w,amt_w) + }[self.disp_type] + + def gen_output(): + yield fs.format( + n = 'Num', + t = 'Tx ID,Vout', + a = 'Address'.ljust(addr_w), + m = 'MMGen ID'.ljust(mmid_w), + A = f'Amount({self.proto.dcoin})', + A2 = f'Amount({self.proto.coin})', + c = 'Confs', # skipped for eth + b = 'Block', # skipped for eth + D = 'Date', + l = 'Label' ) + + max_lbl_len = max([len(i.label) for i in self.unspent if i.label] or [2]) + for n,i in enumerate(self.unspent): + yield fs.format( + n = str(n+1) + ')', + t = '{},{}'.format( + ('|'+'.'*63 if i.skip == 'txid' and self.group else i.txid), + i.vout ), + a = ( + '|'+'.' * addr_w if i.skip == 'addr' and self.group else + i.addr.fmt(color=color,width=addr_w) ), + m = MMGenID.fmtc( + (i.twmmid if i.twmmid.type == 'mmgen' else f'Non-{g.proj_name}'), + width = mmid_w, + color = color ), + A = i.amt.fmt(color=color), + A2 = ( i.amt2.fmt(color=color) if i.amt2 is not None else '' ), + c = i.confs, + b = self.rpc.blockcount - (i.confs - 1), + D = self.age_disp(i,'date_time'), + l = i.label.hl(color=color) if i.label else + TwComment.fmtc( + s = '', + color = color, + nullrepl = '-', + width = max_lbl_len ) + ).rstrip() + + fs2 = '{} (block #{}, {} UTC)\n{}Sort order: {}\n{}\n\nTotal {}: {}\n' + self.fmt_print = fs2.format( + capfirst(self.desc), + self.rpc.blockcount, + make_timestr(self.rpc.cur_date), + ('' if self.proto.chain_name == 'mainnet' else + 'Chain: {}\n'.format(green(self.proto.chain_name.upper())) ), + ' '.join(self.sort_info(include_group=False)), + '\n'.join(gen_output()), + self.proto.dcoin, + self.total.hl(color=color) ) + + return self.fmt_print + + def display_total(self): + msg('\nTotal unspent: {} {} ({} output{})'.format( + self.total.hl(), + self.proto.dcoin, + len(self.unspent), + suf(self.unspent) )) + + def get_idx_from_user(self,action): + msg('') + while True: + ret = line_input(f'Enter {self.item_desc} number (or RETURN to return to main menu): ') + if ret == '': + return (None,None) if action == 'a_lbl_add' else None + n = get_obj(AddrIdx,n=ret,silent=True) + if not n or n < 1 or n > len(self.unspent): + msg(f'Choice must be a single number between 1 and {len(self.unspent)}') + else: + if action == 'a_lbl_add': + cur_lbl = self.unspent[n-1].label + msg('Current label: {}'.format(cur_lbl.hl() if cur_lbl else '(none)')) + while True: + s = line_input( + "Enter label text (or 'q' to return to main menu): ", + insert_txt = cur_lbl ) + if s == 'q': + return None,None + elif s == '': + if keypress_confirm( + f'Removing label for {self.item_desc} #{n}. Is this what you want?'): + return n,s + elif s: + if get_obj(TwComment,s=s): + return n,s + else: + if action == 'a_addr_delete': + fs = 'Removing {} #{} from tracking wallet. Is this what you want?' + elif action == 'a_balance_refresh': + fs = 'Refreshing tracking wallet {} #{}. Is this what you want?' + if keypress_confirm(fs.format(self.item_desc,n)): + return n + + async def view_and_sort(self,tx): + from .term import get_char + prompt = self.prompt.strip() + '\b' + no_output,oneshot_msg = False,None + from .opts import opt + CUR_HOME,ERASE_ALL = '\033[H','\033[0J' + CUR_RIGHT = lambda n: f'\033[{n}C' + + while True: + msg_r('' if no_output else '\n\n' if opt.no_blank else CUR_HOME+ERASE_ALL) + reply = get_char( + '' if no_output else await self.format_for_display()+'\n'+(oneshot_msg or '')+prompt, + immed_chars=''.join(self.key_mappings.keys()) + ) + no_output = False + oneshot_msg = '' if oneshot_msg else None # tristate, saves previous state + if reply not in self.key_mappings: + msg_r('\ninvalid keypress ') + time.sleep(0.5) + continue + + action = self.key_mappings[reply] + if action[:2] == 's_': + self.do_sort(action[2:]) + if action == 's_twmmid': self.show_mmid = True + elif action == 'd_days': + af = self.age_fmts_interactive + self.age_fmt = af[(af.index(self.age_fmt) + 1) % len(af)] + elif action == 'd_mmid': + self.show_mmid = not self.show_mmid + elif action == 'd_group': + if self.can_group: + self.group = not self.group + elif action == 'd_redraw': + pass + elif action == 'd_reverse': + self.unspent.reverse() + self.reverse = not self.reverse + elif action == 'a_quit': + msg('') + return self.unspent + elif action == 'a_balance_refresh': + idx = self.get_idx_from_user(action) + if idx: + e = self.unspent[idx-1] + bal = await self.wallet.get_balance(e.addr,force_rpc=True) + await self.get_unspent_data() + oneshot_msg = yellow(f'{self.proto.dcoin} balance for account #{idx} refreshed\n\n') + self.display_constants = self.get_display_constants() + elif action == 'a_lbl_add': + idx,lbl = self.get_idx_from_user(action) + if idx: + e = self.unspent[idx-1] + if await self.wallet.add_label(e.twmmid,lbl,addr=e.addr): + await self.get_unspent_data() + oneshot_msg = yellow('Label {} {} #{}\n\n'.format( + ('added to' if lbl else 'removed from'), + self.item_desc, + idx )) + else: + oneshot_msg = red('Label could not be added\n\n') + self.display_constants = self.get_display_constants() + elif action == 'a_addr_delete': + idx = self.get_idx_from_user(action) + if idx: + e = self.unspent[idx-1] + if await self.wallet.remove_address(e.addr): + await self.get_unspent_data() + oneshot_msg = yellow(f'{capfirst(self.item_desc)} #{idx} removed\n\n') + else: + oneshot_msg = red('Address could not be removed\n\n') + self.display_constants = self.get_display_constants() + elif action == 'a_print': + of = '{}-{}[{}].out'.format( + self.dump_fn_pfx, + self.proto.dcoin, + ','.join(self.sort_info(include_group=False)).lower() ) + msg('') + try: + write_data_to_file( + of, + await self.format_for_printing(), + desc = f'{self.desc} listing' ) + except UserNonConfirmation as e: + oneshot_msg = red(f'File {of!r} not overwritten by user request\n\n') + else: + oneshot_msg = yellow(f'Data written to {of!r}\n\n') + elif action in ('a_view','a_view_wide'): + do_pager( + self.fmt_display if action == 'a_view' else + await self.format_for_printing(color=True) ) + if g.platform == 'linux' and oneshot_msg == None: + msg_r(CUR_RIGHT(len(prompt.split('\n')[-1])-2)) + no_output = True diff --git a/mmgen/tx.py b/mmgen/tx.py index f8114e9e..ce6abce9 100755 --- a/mmgen/tx.py +++ b/mmgen/tx.py @@ -881,7 +881,7 @@ class MMGenTX: assert isinstance(locktime,int),'locktime must be of type int' - from .tw import TwUnspentOutputs + from .twuo import TwUnspentOutputs if opt.comment_file: self.add_comment(opt.comment_file) @@ -1506,7 +1506,7 @@ class MMGenTX: tmp_tx = MMGenTX.Base() MMGenTxFile(tmp_tx).parse(filename,metadata_only=True) if tmp_tx.proto.tokensym: - from .tw import TrackingWallet + from .twctl import TrackingWallet return await TrackingWallet(tmp_tx.proto) else: return None diff --git a/test/objattrtest_py_d/oat_btc_mainnet.py b/test/objattrtest_py_d/oat_btc_mainnet.py index e7bd4ff6..365dc407 100755 --- a/test/objattrtest_py_d/oat_btc_mainnet.py +++ b/test/objattrtest_py_d/oat_btc_mainnet.py @@ -4,8 +4,8 @@ # Copyright (C)2013-2022 The MMGen Project """ -test.objattrtest_py_d.oat_btc_mainnet: BTC mainnet test vectors for MMGen data -objects +test.objattrtest_py_d.oat_btc_mainnet: + BTC mainnet test vectors for MMGen data objects """ from .oat_common import * @@ -108,7 +108,7 @@ tests = { [sample_objs['MasterShareIdx'], sample_objs['Seed'], 'foo', 2], {}, ), - # tw.py + # twuo.py 'TwUnspentOutputs.MMGenTwUnspentOutput': atd({ 'txid': (0b001, CoinTxID), 'vout': (0b001, int), diff --git a/test/objattrtest_py_d/oat_common.py b/test/objattrtest_py_d/oat_common.py index 35e9827d..4be91bdb 100755 --- a/test/objattrtest_py_d/oat_common.py +++ b/test/objattrtest_py_d/oat_common.py @@ -15,7 +15,7 @@ from mmgen.seedsplit import * from mmgen.protocol import * from mmgen.addr import * from mmgen.tx import * -from mmgen.tw import * +from mmgen.twuo import * from mmgen.key import * from ..include.common import getrand diff --git a/test/overlay/fakemods/tw.py b/test/overlay/fakemods/tw.py index 71f4b9f1..62547bfc 100644 --- a/test/overlay/fakemods/tw.py +++ b/test/overlay/fakemods/tw.py @@ -1,3 +1,4 @@ +import os from .tw_orig import * if os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC'): @@ -8,25 +9,12 @@ if os.getenv('MMGEN_TEST_SUITE_DETERMINISTIC'): _time_iter = _time_gen() - TwUnspentOutputs.date_formatter = { + TwCommon.date_formatter = { 'days': lambda rpc,secs: (next(_time_iter) - secs) // 86400, 'date': lambda rpc,secs: '{}-{:02}-{:02}'.format(*time.gmtime(next(_time_iter))[:3])[2:], 'date_time': lambda rpc,secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(next(_time_iter))[:5]), } - TwAddrList.date_formatter = TwUnspentOutputs.date_formatter - if os.getenv('MMGEN_BOGUS_WALLET_DATA'): # 1831006505 (09 Jan 2028) = projected time of block 1000000 - TwUnspentOutputs.date_formatter['days'] = lambda rpc,secs: (1831006505 - secs) // 86400 - - async def fake_set_dates(foo,rpc,us): - for o in us: - o.date = 1831006505 - int(9.7 * 60 * (o.confs - 1)) - - async def fake_get_unspent_rpc(foo): - from decimal import Decimal - return json.loads(get_data_from_file(os.getenv('MMGEN_BOGUS_WALLET_DATA')),parse_float=Decimal) - - TwUnspentOutputs.set_dates = fake_set_dates - TwUnspentOutputs.get_unspent_rpc = fake_get_unspent_rpc + TwCommon.date_formatter['days'] = lambda rpc,secs: (1831006505 - secs) // 86400 diff --git a/test/overlay/fakemods/twuo.py b/test/overlay/fakemods/twuo.py new file mode 100644 index 00000000..42204387 --- /dev/null +++ b/test/overlay/fakemods/twuo.py @@ -0,0 +1,17 @@ +import os +from .twuo_orig import * + +if os.getenv('MMGEN_BOGUS_WALLET_DATA'): + + async def fake_set_dates(foo,rpc,us): + for o in us: + o.date = 1831006505 - int(9.7 * 60 * (o.confs - 1)) + + async def fake_get_unspent_rpc(foo): + from decimal import Decimal + import json + from mmgen.util import get_data_from_file + return json.loads(get_data_from_file(os.getenv('MMGEN_BOGUS_WALLET_DATA')),parse_float=Decimal) + + TwUnspentOutputs.set_dates = fake_set_dates + TwUnspentOutputs.get_unspent_rpc = fake_get_unspent_rpc