EthereumTrackingWallet: addrimport,listaddresses,add_label

This commit is contained in:
The MMGen Project 2018-05-25 14:28:12 +00:00
commit 50e2e9b486
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
10 changed files with 215 additions and 15 deletions

View file

@ -862,6 +862,12 @@ re-import your addresses.
""".strip().format(pnm=pnm)
}
def __new__(cls,source=None):
if g.coin == 'ETH':
from mmgen.altcoins.eth.tw import EthereumAddrData
cls = EthereumAddrData
return MMGenObject.__new__(cls,source)
def __init__(self,source=None):
self.al_ids = {}
if source == 'tw': self.add_tw_data()

0
mmgen/altcoins/__init__.py Executable file
View file

0
mmgen/altcoins/eth/__init__.py Executable file
View file

137
mmgen/altcoins/eth/tw.py Executable file
View file

@ -0,0 +1,137 @@
#!/usr/bin/env python
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2018 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
altcoins.eth.tw: ETH tracking wallet functions and methods for the MMGen suite
"""
from mmgen.common import *
from mmgen.obj import *
import json
# No file locking - 2 processes accessing the wallet at the same time will corrupt it
class EthereumTrackingWallet(object):
data_dir = os.path.join(g.altcoin_data_dir,'eth')
tw_file = os.path.join(data_dir,'tracking-wallet.json')
def __init__(self):
check_or_create_dir(self.data_dir)
try:
self.data = json.loads(get_data_from_file(self.tw_file,silent=True))
except:
try: os.stat(self.tw_file)
except: self.data = {}
else: die(2,"File '{}' exists but does not contain valid json data")
else:
for d in self.data:
self.data[d]['mmid'] = TwMMGenID(self.data[d]['mmid'],on_fail='raise')
self.data[d]['comment'] = TwComment(self.data[d]['comment'],on_fail='raise')
def import_address(self,addr,label):
if addr in self.data:
if not self.data[addr]['mmid'] and label.mmid:
msg("Warning: MMGen ID '{}' was missing in tracking wallet!".format(label.mmid))
elif self.data[addr]['mmid'] != label.mmid:
die(3,"MMGen ID '{}' does not match tracking wallet!".format(label.mmid))
self.data[addr] = { 'mmid': label.mmid, 'comment': label.comment }
def write(self):
write_data_to_file(
self.tw_file,
json.dumps(self.data),
'Ethereum address data',
ask_overwrite=False,
silent=True)
def delete_all(self):
self.data = {}
self.write()
def delete(self,addr):
if is_coin_addr(addr):
have_match = lambda k: k == addr
elif is_mmgen_id(addr):
have_match = lambda k: self.data[k]['mmid'] == addr
else:
die(1,"'{}' is not an Ethereum address or MMGen ID".format(addr))
for k in self.data:
if have_match(k):
del self.data[k]
break
else:
die(1,"Address '{}' not found in tracking wallet".format(addr))
self.write()
def sorted_list(self):
return sorted(
map(lambda x: {'addr':x[0], 'mmid':x[1]['mmid'], 'comment':x[1]['comment'] }, self.data.items()),
key=lambda x: x['mmid'].sort_key+x['addr']
)
def mmid_ordered_dict(self):
from collections import OrderedDict
return OrderedDict(map(lambda x: (x['mmid'],{'addr':x['addr'],'comment':x['comment']}), self.sorted_list()))
@classmethod
def import_label(cls,coinaddr,lbl):
tw = cls()
for addr,d in tw.data.items():
if addr == coinaddr:
d['comment'] = lbl.comment
tw.write()
return None
else: # emulate RPC library
return ('rpcfail',(None,2,"Address '{}' not found in tracking wallet".format(coinaddr)))
from mmgen.tw import TwAddrList
class EthereumTwAddrList(TwAddrList):
def __init__(self,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels):
tw = EthereumTrackingWallet().mmid_ordered_dict()
self.total = g.proto.coin_amt('0')
rpc_init()
# cur_blk = int(g.rpch.eth_blockNumber(),16)
for mmid,d in tw.items():
# if d['confirmations'] < minconf: continue
label = TwLabel(mmid+' '+d['comment'],on_fail='raise')
if usr_addr_list and (label.mmid not in usr_addr_list): continue
bal = ETHAmt(int(g.rpch.eth_getBalance('0x'+d['addr']),16),fromWei=True)
if bal == 0 and not showempty:
if not label.comment: continue
if not all_labels: continue
self[label.mmid] = {'amt': g.proto.coin_amt('0'), 'lbl': label }
if showbtcaddrs:
self[label.mmid]['addr'] = CoinAddr(d['addr'])
self[label.mmid]['lbl'].mmid.confs = 9999 # TODO
self[label.mmid]['amt'] += bal
self.total += bal
from mmgen.addr import AddrData
class EthereumAddrData(AddrData):
@classmethod
def get_tw_data(cls):
vmsg('Getting address data from tracking wallet')
tw = EthereumTrackingWallet().mmid_ordered_dict()
# emulate the output of RPC 'listaccounts' and 'getaddressesbyaccount'
return [(mmid+' '+d['comment'],[d['addr']]) for mmid,d in tw.items()]

View file

@ -101,16 +101,25 @@ else:
m = ' from Seed ID {}'.format(al.al_id.sid) if hasattr(al.al_id,'sid') else ''
qmsg('OK. {} addresses{}'.format(al.num_addrs,m))
if not opt.quiet: confirm_or_exit(ai_msgs('rescan'),'continue',expect='YES')
err_flag = False
def import_address(addr,label,rescan):
try:
g.rpch.importaddress(addr,label,rescan,timeout=(False,3600)[rescan])
except:
global err_flag
err_flag = True
if g.coin == 'ETH':
if opt.rescan:
die('--rescan option meaningless for coin {}'.format(g.coin))
from mmgen.altcoins.eth.tw import EthereumTrackingWallet
eth_tw = EthereumTrackingWallet()
def import_address(addr,label,rescan):
eth_tw.import_address(addr,label)
else:
if not opt.quiet: confirm_or_exit(ai_msgs('rescan'),'continue',expect='YES')
def import_address(addr,label,rescan):
try:
g.rpch.importaddress(addr,label,rescan,timeout=(False,3600)[rescan])
except:
global err_flag
err_flag = True
w_n_of_m = len(str(al.num_addrs)) * 2 + 2
w_mmid = 1 if opt.addrlist or opt.address else len(str(max(al.idxs()))) + 13
@ -168,3 +177,6 @@ for n,e in enumerate(al.data):
if opt.batch:
ret = g.rpch.importaddress(arg_list,batch=True)
msg('OK: {} addresses imported'.format(len(ret)))
if g.coin == 'ETH':
eth_tw.write()

View file

@ -369,6 +369,25 @@ class BTCAmt(Decimal,Hilite,InitErrors):
class BCHAmt(BTCAmt): pass
class B2XAmt(BTCAmt): pass
class LTCAmt(BTCAmt): max_amt = 84000000
class ETHAmt(BTCAmt):
max_prec = 18
max_amt = 999999999 # TODO
min_coin_unit = Decimal('0.000000000000000001') # wei
def __new__(cls,num,on_fail='die',fromWei=False):
if type(num) == cls: return num
cls.arg_chk(cls,on_fail)
try:
if fromWei:
assert type(num) in (int,long),'value is not an integer or long integer'
return super(cls,cls).__new__(cls,num * cls.min_coin_unit)
return super(cls,cls).__new__(cls,num)
except Exception as e:
m = "{!r}: value cannot be converted to {} ({})"
return cls.init_fail(m.format(num,cls.__name__,e[0]),on_fail)
def toWei(self):
return int(Decimal(self) / self.min_coin_unit)
class CoinAddr(str,Hilite,InitErrors,MMGenObject):
color = 'cyan'
@ -403,13 +422,16 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
def is_for_chain(self,chain):
from mmgen.globalvars import g
if g.coin in ('ETH','ETC'):
return True
def pfx_ok(pfx):
if type(pfx) == tuple:
if self[0] in pfx: return True
elif self[:len(pfx)] == pfx: return True
return False
from mmgen.globalvars import g
proto = g.proto.get_protocol_by_chain(chain)
vn = proto.addr_ver_num
@ -421,6 +443,12 @@ class CoinAddr(str,Hilite,InitErrors,MMGenObject):
return pfx_ok(vn[self.addr_fmt][1])
def is_in_tracking_wallet(self):
from mmgen.globalvars import g
if g.coin in ('ETH','ETC'):
from mmgen.altcoins.eth.tw import EthereumTrackingWallet
return self in EthereumTrackingWallet().data.keys()
from mmgen.rpc import rpc_init
d = rpc_init().validateaddress(self)
return d['iswatchonly'] and 'account' in d

View file

@ -331,6 +331,7 @@ def init(opts_f,add_opts=[],opt_filter=None):
for k in ('prog_name','desc','usage','options','notes'):
if k in opts_data: del opts_data[k]
g.altcoin_data_dir = os.path.join(g.data_dir,'altcoins')
warn_altcoins(altcoin_trust_level)
return args

View file

@ -23,7 +23,7 @@ protocol.py: Coin protocol functions, classes and methods
import sys,os,hashlib
from binascii import unhexlify
from mmgen.util import msg,pmsg,Msg,pdie
from mmgen.obj import MMGenObject,BTCAmt,LTCAmt,BCHAmt,B2XAmt
from mmgen.obj import MMGenObject,BTCAmt,LTCAmt,BCHAmt,B2XAmt,ETHAmt
from mmgen.globalvars import g
import mmgen.bech32 as bech32
@ -297,6 +297,7 @@ class EthereumProtocol(DummyWIF,BitcoinProtocolAddrgen):
daemon_name = 'parity'
rpc_port = 8545
mmcaps = ('key','addr','rpc')
coin_amt = ETHAmt
@classmethod
def verify_addr(cls,addr,hex_width,return_dict=False):

View file

@ -312,6 +312,14 @@ Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
self.display()
msg(prompt)
@classmethod
def import_label(cls,coinaddr,lbl):
# NOTE: this works because importaddress() removes the old account before
# associating the new account with the address.
# Will be replaced by setlabel() with new RPC label API
# RPC args: addr,label,rescan[=true],p2sh[=none]
return g.rpch.importaddress(coinaddr,lbl,False,on_fail='return')
# returns on failure
@classmethod
def add_label(cls,arg1,label='',addr=None,silent=False,on_fail='return'):
@ -350,11 +358,11 @@ Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
lbl = TwLabel(mmaddr + ('',' '+cmt)[bool(cmt)],on_fail=on_fail)
# NOTE: this works because importaddress() removes the old account before
# associating the new account with the address.
# Will be replaced by setlabel() with new RPC label API
# RPC args: addr,label,rescan[=true],p2sh[=none]
ret = g.rpch.importaddress(coinaddr,lbl,False,on_fail='return')
if g.coin == 'ETH':
from mmgen.altcoins.eth.tw import EthereumTrackingWallet
cls = EthereumTrackingWallet
ret = cls.import_label(coinaddr,lbl)
from mmgen.rpc import rpc_error,rpc_errmsg
if rpc_error(ret):
@ -375,6 +383,12 @@ Display options: show [D]ays, [g]roup, show [m]mgen addr, r[e]draw screen
class TwAddrList(MMGenDict):
def __new__(cls,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels):
if g.coin == 'ETH':
from mmgen.altcoins.eth.tw import EthereumTwAddrList
cls = EthereumTwAddrList
return MMGenDict.__new__(cls,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels)
def __init__(self,usr_addr_list,minconf,showempty,showbtcaddrs,all_labels):
def check_dup_mmid(acct_labels):

View file

@ -846,6 +846,7 @@ def rpc_init(reinit=False):
auth=False)
if not g.daemon_version: # First call
g.daemon_version = conn.parity_versionInfo()['version'] # fail immediately if daemon is geth
g.chain = conn.parity_chain()
else:
cfg = get_daemon_cfg_options(('rpcuser','rpcpassword'))
conn = mmgen.rpc.CoinDaemonRPCConnection(