mmgen-wallet/mmgen/tool.py
2019-02-12 21:35:12 +03:00

748 lines
27 KiB
Python
Executable file

#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2019 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tool.py: Routines and data for the 'mmgen-tool' utility
"""
import binascii
from collections import OrderedDict
from mmgen.protocol import hash160
from mmgen.common import *
from mmgen.crypto import *
from mmgen.tx import *
from mmgen.addr import *
# Project name, interpolated into the argument descriptions below
pnm = g.proj_name

# Table of tool commands, keyed by capitalized command name.  Each value is a
# list of argument specs of the form '<description> [type]':
#   - a spec without '=' is a positional argument
#   - a spec with '=' ('name [type=default]') is a keyword argument
#   - a type of 'str-' on the first argument marks it as readable from STDIN
#     (see usage() and process_args())
#   - the bare marker 'MARGS' makes process_args() accept multiple positional
#     arguments for the command
cmd_data = OrderedDict([
    ('Help', ['<tool command> [str]']),
    ('Usage', ['<tool command> [str]']),
    ('Strtob58', ['<string> [str-]','pad [int=0]']),
    ('B58tostr', ['<b58 number> [str-]']),
    ('Hextob58', ['<hex number> [str-]','pad [int=0]']),
    ('Hextob58chk', ['<hex number> [str-]']),
    ('B58tohex', ['<b58 number> [str-]','pad [int=0]']),
    ('B58chktohex', ['<b58 number> [str-]']),
    ('B58randenc', []),
    ('B32tohex', ['<b32 num> [str-]','pad [int=0]']),
    ('Hextob32', ['<hex num> [str-]','pad [int=0]']),
    ('Randhex', ['nbytes [int=32]']),
    ('Id8', ['<infile> [str]']),
    ('Id6', ['<infile> [str]']),
    ('Hash160', ['<hexadecimal string> [str-]']),
    ('Hash256', ['<str, hexstr or filename> [str]', # TODO handle stdin
        'hex_input [bool=False]','file_input [bool=False]']),
    ('Str2id6', ['<string (spaces are ignored)> [str-]']),
    ('Hexdump', ['<infile> [str]', 'cols [int=8]', 'line_nums [bool=True]']),
    ('Unhexdump', ['<infile> [str]']),
    ('Hexreverse', ['<hexadecimal string> [str-]']),
    ('Hexlify', ['<string> [str-]']),
    ('Rand2file', ['<outfile> [str]','<nbytes> [str]','threads [int=4]','silent [bool=False]']),
    ('Randwif', []),
    ('Randpair', []),
    ('Hex2wif', ['<private key in hex format> [str-]']),
    ('Wif2hex', ['<wif> [str-]']),
    ('Wif2addr', ['<wif> [str-]']),
    ('Wif2segwit_pair',['<wif> [str-]']),
    ('Pubhash2addr', ['<coin address in hex format> [str-]']),
    ('Addr2hexaddr', ['<coin address> [str-]']),
    ('Privhex2addr', ['<private key in hex format> [str-]']),
    ('Privhex2pubhex',['<private key in hex format> [str-]']),
    ('Pubhex2addr', ['<public key in hex format> [str-]']), # new
    ('Pubhex2redeem_script',['<public key in hex format> [str-]']), # new
    ('Wif2redeem_script', ['<private key in WIF format> [str-]']), # new
    ('Hex2mn', ['<hexadecimal string> [str-]',"wordlist [str='electrum']"]),
    ('Mn2hex', ['<mnemonic> [str-]', "wordlist [str='electrum']"]),
    ('Mn_rand128', ["wordlist [str='electrum']"]),
    ('Mn_rand192', ["wordlist [str='electrum']"]),
    ('Mn_rand256', ["wordlist [str='electrum']"]),
    ('Mn_stats', ["wordlist [str='electrum']"]),
    ('Mn_printlist', ["wordlist [str='electrum']"]),
    ('Gen_addr', ['<{} ID> [str]'.format(pnm),"wallet [str='']"]),
    ('Gen_key', ['<{} ID> [str]'.format(pnm),"wallet [str='']"]),
    ('Listaddress',['<{} address> [str]'.format(pnm),'minconf [int=1]','pager [bool=False]','showempty [bool=True]','showbtcaddr [bool=True]','show_age [bool=False]','show_days [bool=True]']),
    ('Listaddresses',["addrs [str='']",'minconf [int=1]','showempty [bool=False]','pager [bool=False]','showbtcaddrs [bool=True]','all_labels [bool=False]',"sort [str=''] (options: reverse, age)",'show_age [bool=False]','show_days [bool=True]']),
    ('Getbalance', ['minconf [int=1]','quiet [bool=False]','pager [bool=False]']),
    ('Txview', ['<{} TX file(s)> [str]'.format(pnm),'pager [bool=False]','terse [bool=False]',"sort [str='mtime'] (options: ctime, atime)",'MARGS']),
    ('Twview', ["sort [str='age']",'reverse [bool=False]','show_days [bool=True]','show_mmid [bool=True]','minconf [int=1]','wide [bool=False]','pager [bool=False]']),
    ('Add_label', ['<{} or coin address> [str]'.format(pnm),'<label> [str]']),
    ('Remove_label', ['<{} or coin address> [str]'.format(pnm)]),
    ('Remove_address', ['<{} or coin address> [str]'.format(pnm)]),
    ('Addrfile_chksum', ['<{} addr file> [str]'.format(pnm),"mmtype [str='']"]),
    ('Keyaddrfile_chksum', ['<{} addr file> [str]'.format(pnm),"mmtype [str='']"]),
    ('Passwdfile_chksum', ['<{} password file> [str]'.format(pnm)]),
    ('Find_incog_data', ['<file or device name> [str]','<Incog ID> [str]','keep_searching [bool=False]']),
    ('Encrypt', ['<infile> [str]',"outfile [str='']","hash_preset [str='']"]),
    ('Decrypt', ['<infile> [str]',"outfile [str='']","hash_preset [str='']"]),
    ('Bytespec', ['<bytespec> [str]']),
    ('Keyaddrlist2monerowallets',['<{} XMR key-address file> [str]'.format(pnm),'blockheight [int=(current height)]',"addrs [str=''] (addr idx list or range)"]),
    ('Syncmonerowallets', ['<{} XMR key-address file> [str]'.format(pnm),"addrs [str=''] (addr idx list or range)"]),
])
def usage(command):
    """
    Print usage information for the tool commands and exit.

    With an empty `command`, list the argument specs of every command in
    cmd_data followed by main_tool.stdin_msg, and exit with status 0.
    Otherwise print the matching line(s) of main_tool.cmd_help plus the
    argument specs for `command` (or an error if there is no such command)
    and exit with status 1.

    Note that this rewrites cmd_data in place for display purposes.
    """
    # Prepare the specs for display: a '-]' suffix on the first argument
    # becomes ' or STDIN]' and the internal 'MARGS' marker is dropped.
    for v in cmd_data.values():
        if v and v[0][-2:] == '-]':
            v[0] = v[0][:-2] + ' or STDIN]'
        if 'MARGS' in v: v.remove('MARGS')

    if not command:
        Msg('Usage information for mmgen-tool commands:')
        for k,v in cmd_data.items():
            Msg(' {:18} {}'.format(k.lower(),' '.join(v)))
        from mmgen.main_tool import stdin_msg
        Msg('\n '+'\n '.join(stdin_msg.split('\n')))
        sys.exit(0)

    Command = command.capitalize()
    if Command in cmd_data:
        import re
        from mmgen.main_tool import cmd_help
        # process_args() calls us with the capitalized cmd_data key, so the
        # command name must be matched without regard to case.
        pat = re.compile(r'\s+{}\s+'.format(command),re.IGNORECASE)
        for line in cmd_help.split('\n'):
            if pat.match(line):
                c,h = line.split('-',1)
                Msg('MMGEN-TOOL {}: {}'.format(c.strip().upper(),h.strip()))
        cd = cmd_data[Command]
        msg('USAGE: {} {} {}'.format(g.prog_name,command.lower(),' '.join(cd)))
    else:
        msg("'{}': no such tool command".format(command))
    sys.exit(1)

# 'help' is an alias for 'usage'
Help = usage
def process_args(command,cmd_args):
    """
    Match user-supplied arguments against the specs in cmd_data[command].

    command:  key of cmd_data (capitalized command name)
    cmd_args: list of argument strings from the command line

    Returns (args, kwargs): the positional arguments as a list and the
    'key=value' arguments as a dict, each value converted to the builtin
    type named in its spec.  On any mismatch an error is printed and the
    process exits via usage() or die().
    """
    import builtins

    # 'MARGS' marks a command that takes multiple positional arguments
    if 'MARGS' in cmd_data[command]:
        cmd_data[command].remove('MARGS')
        margs = True
    else:
        margs = False

    # Positional specs -> [[description, type], ...]
    c_args = [[i.split(' [')[0],i.split(' [')[1][:-1]]
        for i in cmd_data[command] if '=' not in i]
    # Keyword specs -> {name: [type, default-text]}
    c_kwargs = dict([[
            i.split(' [')[0],
            [i.split(' [')[1].split('=')[0],i.split(' [')[1].split('=')[1][:-1]]
        ] for i in cmd_data[command] if '=' in i])

    if not margs:
        u_args = list(cmd_args[:len(c_args)])

        if c_args and c_args[0][1][-1] == '-':
            c_args[0][1] = c_args[0][1][:-1] # [str-] -> [str]
            # If we're reading from a pipe, replace '-' with output of previous command
            if u_args and u_args[0] == '-':
                if not sys.stdin.isatty():
                    u_args[0] = sys.stdin.read().strip()
                    if not u_args[0]:
                        die(2,'{}: ERROR: no output from previous command in pipe'.format(command.lower()))

    if not margs and len(u_args) < len(c_args):
        m1 = 'Command requires exactly {} non-keyword argument{}'
        msg(m1.format(len(c_args),suf(c_args,'s')))
        usage(command)

    extra_args = len(cmd_args) - len(c_args)
    u_kwargs = {}
    if margs:
        # Split on the first '=' only, so that values may themselves contain '='
        t = [a.split('=',1) for a in cmd_args if '=' in a]
        tk = [a[0] for a in t]
        tk_bad = [a for a in tk if a not in c_kwargs]
        if set(tk_bad) != set(tk[:len(tk_bad)]):
            die(1,"'{}': illegal keyword argument".format(tk_bad[-1]))
        u_kwargs = dict(t[len(tk_bad):])
        u_args = cmd_args[:-len(u_kwargs) or None]
    elif extra_args > 0:
        u_kwargs = dict([a.split('=',1) for a in cmd_args[len(c_args):] if '=' in a])
        if len(u_kwargs) != extra_args:
            msg('Command requires exactly {} non-keyword argument{}'.format(len(c_args),suf(c_args,'s')))
            usage(command)
        if len(u_kwargs) > len(c_kwargs):
            msg('Command requires exactly {} keyword argument{}'.format(len(c_kwargs),suf(c_kwargs,'s')))
            usage(command)

    # mdie(c_args,c_kwargs,u_args,u_kwargs)

    for k in u_kwargs:
        if k not in c_kwargs:
            msg("'{}': invalid keyword argument".format(k))
            usage(command)

    def conv_type(arg,arg_name,arg_type):
        """Convert the string `arg` to the builtin type named by `arg_type`."""
        if arg_type == 'bytes': pdie(arg,arg_name,arg_type)
        if arg_type == 'bool':
            if arg.lower() in ('true','yes','1','on'): arg = True
            elif arg.lower() in ('false','no','0','off'): arg = False
            else:
                msg("'{}': invalid boolean value for keyword argument".format(arg))
                usage(command)
        try:
            # Look the type up in the builtins module: the type of the
            # __builtins__ global (dict or module) is an implementation detail
            return getattr(builtins,arg_type)(arg)
        except Exception:
            die(1,"'{}': Invalid argument for argument {} ('{}' required)".format(arg,arg_name,arg_type))

    if margs:
        # Every positional argument shares the spec of the first one
        args = [conv_type(a,c_args[0][0],c_args[0][1]) for a in u_args]
    else:
        args = [conv_type(u_args[i],c_args[i][0],c_args[i][1]) for i in range(len(c_args))]
    kwargs = {k:conv_type(u_kwargs[k],k,c_kwargs[k][0]) for k in u_kwargs}

    return args,kwargs
# Individual cmd_data
def are_equal(a,b,dtype=''):
    """
    Compare `a` and `b`, ignoring leading padding characters for the data
    types 'str' (NUL), 'hex' ('0') and 'b58' ('1').  Any other `dtype`
    results in a plain equality test.
    """
    pad_char = {'str': '\0', 'hex': '0', 'b58': '1'}.get(dtype)
    if pad_char is None:
        return a == b
    return a.lstrip(pad_char) == b.lstrip(pad_char)
def print_convert_results(indata,enc,dec,dtype):
    """
    Print the result of an encode/decode round trip.

    Prints only the encoded data when the decoded data matches the input
    (per are_equal()) and verbose mode is off; otherwise prints input,
    encoded and decoded data.  Exits with status 3 on a mismatch.
    """
    mismatch = not are_equal(indata,dec,dtype)
    if not (mismatch or opt.verbose):
        Msg(enc.decode())
    else:
        for fs,val in (
                ('Input: {}',indata),
                ('Encoded data: {}',enc),
                ('Recoded data: {}',dec)):
            Msg(fs.format(val))
    if mismatch:
        die(3,"Error! Recoded data doesn't match input!")
from mmgen.obj import MMGenAddrType
# Address type for the key/address commands below: the user-supplied 'type'
# option if set, otherwise the protocol's default
at = MMGenAddrType(getattr(opt,'type',None) or g.proto.dfl_mmtype)
# Key and address generators for that address type
kg,ag = KeyGenerator(at),AddrGenerator(at)
def Hexdump(infile,cols=8,line_nums=True):
    """Print a formatted hex dump of the binary contents of `infile`."""
    raw = get_data_from_file(infile,dash=True,silent=True,binary=True)
    Msg(pretty_hexdump(raw,cols=cols,line_nums=line_nums))
def Unhexdump(infile):
    """Decode the hex dump in `infile` and write the resulting data to stdout."""
    on_windows = g.platform == 'win'
    if on_windows:
        # Put stdout into binary mode so that the output is written unmodified
        import msvcrt
        msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
    decoded = decode_pretty_hexdump(get_data_from_file(infile,dash=True,silent=True))
    os.write(g.stdout_fileno,decoded)
def B58randenc():
    """Base58-encode 32 random bytes, decode them again and print the result."""
    rand_bytes = get_random(32)
    encoded = baseconv.b58encode(rand_bytes,pad=True)
    decoded = baseconv.b58decode(encoded,pad=True)
    print_convert_results(
        rand_bytes,
        encoded.encode(),
        decoded,
        'bytes')
def Randhex(nbytes='32'):
    """Print `nbytes` random bytes as a hexadecimal string."""
    count = int(nbytes)
    hex_bytes = binascii.hexlify(get_random(count))
    Msg(hex_bytes.decode())
def Randwif():
    """Print the WIF form of a private key made from 32 random bytes."""
    key = PrivKey(
        get_random(32),
        pubkey_type=at.pubkey_type,
        compressed=at.compressed)
    Msg(key.wif)
def Randpair():
    """
    Generate a random private key and print its WIF form and address.
    The labels and the hex key are shown only via Vmsg()/Vmsg_r().
    """
    key = PrivKey(
        get_random(32),
        pubkey_type=at.pubkey_type,
        compressed=at.compressed)
    coin_addr = ag.to_addr(kg.to_pubhex(key))
    Vmsg('Key (hex): {}'.format(key))
    Vmsg_r('Key (WIF): ')
    Msg(key.wif)
    Vmsg_r('Addr: ')
    Msg(coin_addr)
def Wif2addr(wif):
    """Print the address generated from the WIF private key `wif`."""
    key = PrivKey(wif=wif)
    pubhex = kg.to_pubhex(key)
    coin_addr = ag.to_addr(pubhex)
    Vmsg_r('Addr: ')
    Msg(coin_addr)
def Wif2segwit_pair(wif):
    """Print the Segwit redeem script and the address for the WIF key `wif`, one per line."""
    key = PrivKey(wif=wif)
    pubhex = kg.to_pubhex(key)
    coin_addr = ag.to_addr(pubhex)
    redeem_script = ag.to_segwit_redeem_script(pubhex)
    Msg('{}\n{}'.format(redeem_script.decode(),coin_addr))
def Pubhash2addr(pubhash):
    """
    Print the address corresponding to the hex public key hash `pubhash`.

    A Bech32 address is produced when the 'type' option is 'bech32';
    otherwise the address is P2SH or not depending on the module-level
    address type `at`.
    """
    # 'type' may be missing from opt (the module-level address type setup
    # guards against this too), so don't access the attribute directly
    if getattr(opt,'type',None) == 'bech32':
        ret = g.proto.pubhash2bech32addr(pubhash.encode())
    else:
        ret = g.proto.pubhash2addr(pubhash.encode(),at.addr_fmt=='p2sh')
    Msg(ret)
def Addr2hexaddr(addr):
    """Print the 'hex' field of the result of verifying the coin address `addr`."""
    info = g.proto.verify_addr(addr,CoinAddr.hex_width,return_dict=True)
    Msg(info['hex'].decode())
def Hash160(pubkeyhex):
    """Print the hash160 of the hexadecimal string `pubkeyhex`."""
    digest = hash160(pubkeyhex)
    Msg(digest.decode())
def Pubhex2addr(pubkeyhex):
    """Print the address for the hex public key `pubkeyhex`."""
    pubhash = hash160(pubkeyhex.encode()).decode()
    Pubhash2addr(pubhash)
def Wif2hex(wif):
    """Print the decoded PrivKey object for the WIF key `wif`."""
    key = PrivKey(wif=wif)
    Msg(key.decode())
def Hex2wif(hexpriv):
    """Print the WIF form of the hex private key `hexpriv`."""
    wif = g.proto.hex2wif(
        hexpriv.encode(),
        pubkey_type=at.pubkey_type,
        compressed=at.compressed)
    Msg(wif)
def Privhex2addr(privhex,output_pubhex=False):
    """
    Print the address for the hex private key `privhex`, or the hex public
    key instead if `output_pubhex` is set.
    """
    key = PrivKey(
        binascii.unhexlify(privhex),
        compressed=at.compressed,
        pubkey_type=at.pubkey_type)
    pubhex = kg.to_pubhex(key)
    if output_pubhex:
        Msg(pubhex.decode())
    else:
        Msg(ag.to_addr(pubhex))
def Privhex2pubhex(privhex): # new
    """Print the hex public key for the hex private key `privhex`."""
    return Privhex2addr(privhex,output_pubhex=True) if False else Privhex2addr(privhex,output_pubhex=True) and None
def Pubhex2redeem_script(pubhex): # new
    """Print the redeem script for the hex public key `pubhex`."""
    script = g.proto.pubhex2redeem_script(pubhex)
    Msg(script.decode())
def Wif2redeem_script(wif): # new
    """Print the Segwit redeem script for the WIF private key `wif`."""
    pubhex = kg.to_pubhex(PrivKey(wif=wif))
    script = ag.to_segwit_redeem_script(pubhex)
    Msg(script.decode())
# IDs of the mnemonic wordlists; used when 'all' is requested in do_random_mn()
wordlists = ('electrum','tirosh')
# Wordlist used by the mnemonic commands when none is specified
dfl_wl_id = 'electrum'
def do_random_mn(nbytes,wordlist):
    """
    Print a mnemonic generated from `nbytes` random bytes.

    `wordlist` is a wordlist ID, or 'all' to print a labeled mnemonic of the
    same seed for every entry of the module-level `wordlists`.
    """
    hexrand = binascii.hexlify(get_random(nbytes))
    # hexlify() returns bytes: decode for display to avoid printing a b'...' repr
    Vmsg('Seed: {}'.format(hexrand.decode()))
    show_all = wordlist == 'all'
    for wl_id in (wordlists if show_all else [wordlist]):
        if show_all:
            Msg('{} mnemonic:'.format(capfirst(wl_id)))
        mn = baseconv.fromhex(hexrand,wl_id)
        Msg(' '.join(mn))
def Mn_rand128(wordlist=dfl_wl_id):
    """Print a random mnemonic generated from 16 bytes (128 bits) of random data."""
    do_random_mn(nbytes=16,wordlist=wordlist)
def Mn_rand192(wordlist=dfl_wl_id):
    """Print a random mnemonic generated from 24 bytes (192 bits) of random data."""
    do_random_mn(nbytes=24,wordlist=wordlist)
def Mn_rand256(wordlist=dfl_wl_id):
    """Print a random mnemonic generated from 32 bytes (256 bits) of random data."""
    do_random_mn(nbytes=32,wordlist=wordlist)
def Hex2mn(s,wordlist=dfl_wl_id):
    """Print the hexadecimal string `s` converted to a mnemonic of `wordlist`."""
    words = baseconv.fromhex(s.encode(),wordlist)
    Msg(' '.join(words))
def Mn2hex(s,wordlist=dfl_wl_id):
    """Print the whitespace-separated mnemonic `s` of `wordlist` converted to hexadecimal."""
    words = s.split()
    Msg(baseconv.tohex(words,wordlist))
def Strtob58(s,pad=None):
    """Print the string `s` converted to base 58."""
    hex_bytes = binascii.hexlify(s.encode())
    Msg(baseconv.fromhex(hex_bytes,'b58',pad,tostr=True))
def Hextob58(s,pad=None):
    """Print the hexadecimal string `s` converted to base 58."""
    encoded = baseconv.fromhex(s.encode(),'b58',pad,tostr=True)
    Msg(encoded)
def Hextob58chk(s):
    """Print the result of passing the hexadecimal string `s` to protocol._b58chk_encode()."""
    from mmgen.protocol import _b58chk_encode as encode
    Msg(encode(s))
def Hextob32(s,pad=None):
    """Print the hexadecimal string `s` converted to base 32."""
    encoded = baseconv.fromhex(s.encode(),'b32',pad,tostr=True)
    Msg(encoded)
def B58tostr(s):
    """Print the base 58 number `s` converted to a string."""
    hexstr = baseconv.tohex(s,'b58')
    Msg(binascii.unhexlify(hexstr).decode())
def B58tohex(s,pad=None):
    """Print the base 58 number `s` converted to hexadecimal."""
    hexstr = baseconv.tohex(s,'b58',pad)
    Msg(hexstr)
def B58chktohex(s):
    """Print the result of passing the base 58 number `s` to protocol._b58chk_decode()."""
    from mmgen.protocol import _b58chk_decode as decode
    Msg(decode(s))
def B32tohex(s,pad=None):
    """Print the base 32 number `s` (upper-cased first) converted to hexadecimal."""
    hexstr = baseconv.tohex(s.upper(),'b32',pad)
    Msg(hexstr)
from mmgen.seed import Mnemonic
def Mn_stats(wordlist=dfl_wl_id):
    """Run baseconv.check_wordlist() on `wordlist`, after checking that it exists."""
    if wordlist not in baseconv.digits:
        die(1,"'{}': not a valid wordlist".format(wordlist))
    baseconv.check_wordlist(wordlist)
def Mn_printlist(wordlist=dfl_wl_id):
    """Print the entries of `wordlist`, one per line."""
    if wordlist not in baseconv.digits:
        die(1,"'{}': not a valid wordlist".format(wordlist))
    entries = baseconv.digits[wordlist]
    Msg('\n'.join(entries))
def Id8(infile):
    """Print the make_chksum_8() checksum of the binary contents of `infile`."""
    raw = get_data_from_file(infile,dash=True,silent=True,binary=True)
    Msg(make_chksum_8(raw))
def Id6(infile):
    """Print the make_chksum_6() checksum of the binary contents of `infile`."""
    raw = get_data_from_file(infile,dash=True,silent=True,binary=True)
    Msg(make_chksum_6(raw))
def Str2id6(s): # retain ignoring of space for backwards compat
    """Print the make_chksum_6() checksum of `s` with all whitespace removed."""
    squeezed = ''.join(s.split())
    Msg(make_chksum_6(squeezed))
def Addrfile_chksum(infile,mmtype=''):
    """Load the address file `infile` with AddrList in checksum-only mode."""
    from mmgen.addr import AddrList
    addr_type = MMGenAddrType(mmtype) if mmtype else None
    AddrList(infile,chksum_only=True,mmtype=addr_type)
def Keyaddrfile_chksum(infile,mmtype=''):
    """Load the key-address file `infile` with KeyAddrList in checksum-only mode."""
    from mmgen.addr import KeyAddrList
    addr_type = MMGenAddrType(mmtype) if mmtype else None
    KeyAddrList(infile,chksum_only=True,mmtype=addr_type)
def Passwdfile_chksum(infile):
    """Load the password file `infile` with PasswordList in checksum-only mode."""
    from mmgen.addr import PasswordList as PwList
    PwList(infile=infile,chksum_only=True)
def Hexreverse(s):
    """Print the hexadecimal string `s` with its byte order reversed."""
    raw = binascii.unhexlify(s.strip())
    flipped = raw[::-1]
    Msg(binascii.hexlify(flipped).decode())
def Hexlify(s):
    """Print the hexadecimal representation of the encoded string `s`."""
    hex_bytes = binascii.hexlify(s.encode())
    Msg(hex_bytes.decode())
def Hash256(s,file_input=False,hex_input=False):
    """
    Print the double SHA256 hash of the input as a hexadecimal string.

    s:          the string to hash, or, depending on the flags below, a
                filename or a hex dump
    file_input: treat `s` as the name of a file whose contents are hashed
    hex_input:  treat `s` as a hex dump to be decoded before hashing
    """
    from hashlib import sha256
    if file_input:  b = get_data_from_file(s,binary=True)
    elif hex_input: b = decode_pretty_hexdump(s)
    else:           b = s
    # File and hex dump input is binary data, on which calling encode()
    # would fail: only text needs to be encoded before hashing
    if isinstance(b,str):
        b = b.encode()
    Msg(sha256(sha256(b).digest()).hexdigest())
def Encrypt(infile,outfile='',hash_preset=''):
    """
    Encrypt the contents of `infile` with mmgen_encrypt() and write the result
    to `outfile`.  The default output name is the base name of `infile`
    with the extension g.mmenc_ext appended.
    """
    plaintext = get_data_from_file(infile,'data for encryption',binary=True)
    ciphertext = mmgen_encrypt(plaintext,'user data',hash_preset)
    outfile = outfile or '{}.{}'.format(os.path.basename(infile),g.mmenc_ext)
    write_data_to_file(outfile,ciphertext,'encrypted data',binary=True)
def Decrypt(infile,outfile='',hash_preset=''):
    """
    Decrypt the contents of `infile` with mmgen_decrypt(), retrying until it
    returns data, and write the result to `outfile`.  The default output
    name is the base name of `infile` with the extension g.mmenc_ext removed,
    or with '.dec' appended if there was no such extension to remove.
    """
    ciphertext = get_data_from_file(infile,'encrypted data',binary=True)
    plaintext = mmgen_decrypt(ciphertext,'user data',hash_preset)
    while not plaintext:
        msg('Trying again...')
        plaintext = mmgen_decrypt(ciphertext,'user data',hash_preset)
    if not outfile:
        base = os.path.basename(infile)
        outfile = remove_extension(base,g.mmenc_ext)
        if outfile == base:
            outfile += '.dec'
    write_data_to_file(outfile,plaintext,'decrypted data',binary=True)
def Find_incog_data(filename,iv_id,keep_searching=False):
    """
    Search a file or device for data with the Incog ID `iv_id`.

    Every window of g.aesctr_iv_len bytes is hashed with SHA256, and the
    offset is reported when the first 8 hex digits of the hash (upper-cased)
    equal `iv_id`.  Exits with status 0 at the first match unless
    `keep_searching` is set.

    filename:       name of the file or device to search
    iv_id:          the Incog ID, a string of upper-case hex digits
    keep_searching: continue searching after a match
    """
    from hashlib import sha256

    # Validate the ID before opening the file, so no descriptor is leaked on error
    for ch in iv_id:
        if ch not in '0123456789ABCDEF':
            die(2,"'{}': invalid Incog ID".format(iv_id))

    ivsize,bsize,mod = g.aesctr_iv_len,4096,4096*8
    # 'carry' holds the tail of the previous block so that windows spanning a
    # block boundary are checked too.  It starts out as dummy data.
    n,carry = 0,b' '*ivsize
    flgs = os.O_RDONLY|os.O_BINARY if g.platform == 'win' else os.O_RDONLY
    f = os.open(filename,flgs)
    try:
        while True:
            d = os.read(f,bsize)
            if not d: break
            d = carry + d
            for i in range(bsize):
                if sha256(d[i:i+ivsize]).hexdigest()[:8].upper() == iv_id:
                    # Ignore windows overlapping the initial dummy carry
                    if n+i < ivsize: continue
                    msg('\rIncog data for ID {} found at offset {}'.format(iv_id,n+i-ivsize))
                    if not keep_searching: sys.exit(0)
            carry = d[len(d)-ivsize:]
            # NOTE(review): offsets assume every read but the last returns a full block
            n += bsize
            if not n % mod:
                msg_r('\rSearched: {} bytes'.format(n))
        msg('')
    finally:
        # Also runs on the sys.exit() above
        os.close(f)
def Rand2file(outfile,nbytes,threads=4,silent=False):
    """
    Write `nbytes` of random data to `outfile`.

    Data is read from Crypto.Random in 1 MiB blocks, each block is encrypted
    by a worker thread using AES in CTR mode with a random key, and a
    separate thread writes the encrypted blocks to the file.

    outfile: output filename (placed under opt.outdir if that is set)
    nbytes:  byte count specifier, parsed by parse_nbytes()
    threads: total thread count; max(1,threads-2) encryption workers are started
    silent:  suppress the progress and summary messages
    """
    nbytes = parse_nbytes(nbytes)
    from Crypto import Random
    rh = Random.new()
    from queue import Queue
    from threading import Thread
    bsize = 2**20
    roll = bsize * 4
    if opt.outdir: outfile = make_full_path(opt.outdir,outfile)
    f = open(outfile,'wb')

    from Crypto.Cipher import AES
    from Crypto.Util import Counter

    key = get_random(32)

    def encrypt_worker(wid):
        """Encrypt blocks from q1, using the block number as initial counter value."""
        while True:
            i,d = q1.get()
            c = AES.new(key,AES.MODE_CTR,counter=Counter.new(g.aesctr_iv_len*8,initial_value=i))
            enc_data = c.encrypt(d)
            q2.put(enc_data)
            q1.task_done()

    def output_worker():
        """Write encrypted blocks from q2 to the output file."""
        while True:
            data = q2.get()
            f.write(data)
            q2.task_done()

    q1 = Queue()
    for i in range(max(1,threads-2)):
        t = Thread(target=encrypt_worker,args=(i,))
        t.daemon = True
        t.start()

    q2 = Queue()
    t = Thread(target=output_worker)
    t.daemon = True
    t.start()

    # Make sure the file gets closed even if reading or queueing fails
    try:
        i = 1; rbytes = nbytes
        while rbytes > 0:
            d = rh.read(min(bsize,rbytes))
            q1.put((i,d))
            rbytes -= bsize
            i += 1
            # Progress output is subject to 'silent', like the summary below
            if not silent and not (bsize*i) % roll:
                msg_r('\rRead: {} bytes'.format(bsize*i))

        if not silent:
            msg('\rRead: {} bytes'.format(nbytes))
            qmsg("\r{} bytes of random data written to file '{}'".format(nbytes,outfile))
        # Wait until all queued blocks have been encrypted and written
        q1.join()
        q2.join()
    finally:
        f.close()
def Bytespec(s):
    """Print the result of parsing the byte specifier `s` with parse_nbytes()."""
    nbytes = parse_nbytes(s)
    Msg(str(nbytes))
def Keyaddrlist2monerowallets(infile,blockheight=None,addrs=None):
    """Run the 'create' operation of monero_wallet_ops() on the key-address file `infile`."""
    monero_wallet_ops(
        infile=infile,
        op='create',
        blockheight=blockheight,
        addrs=addrs)
def Syncmonerowallets(infile,addrs=None):
    """Run the 'sync' operation of monero_wallet_ops() on the key-address file `infile`."""
    monero_wallet_ops(
        infile=infile,
        op='sync',
        addrs=addrs)
def monero_wallet_ops(infile,op,blockheight=None,addrs=None):
    """
    Create or sync Monero wallets for the entries of a key-address file by
    driving 'monero-wallet-cli' interactively via pexpect.

    infile:      key-address file, loaded with KeyAddrList
    op:          'create' or 'sync'
    blockheight: restore height sent to the wallet on creation (negative
                 values are replaced with 0; if unset, the current height
                 reported by monerod minus one is used)
    addrs:       address index list or range selecting entries of the file
                 (all entries if None)

    Exits via die()/rdie() on any failure.
    """

    def run_cmd(cmd):
        # Start `cmd` with all standard streams piped and return the Popen object
        import subprocess as sp
        p = sp.Popen(cmd,stdin=sp.PIPE,stdout=sp.PIPE,stderr=sp.PIPE)
        return p

    def test_rpc():
        # Check that monero-wallet-cli is runnable and return the height
        # parsed from the output of 'monerod status'
        p = run_cmd(['monero-wallet-cli','--version'])
        if not b'Monero' in p.stdout.read():
            die(1,"Unable to run 'monero-wallet-cli'!")
        p = run_cmd(['monerod','status'])
        import re
        m = re.search(r'Height: (\d+)/\d+ ',p.stdout.read().decode())
        if not m:
            die(1,'Unable to connect to monerod!')
        return int(m.group(1))

    def my_expect(p,m,s,regex=False):
        # Wait for `s` (an exact string, a regex if `regex` is set, or a list
        # of alternatives) in the output of `p`, displaying the progress
        # message `m` if non-empty.  Returns the index of the match.
        if m: msg_r(' {}...'.format(m))
        ret = (p.expect_exact,p.expect)[regex](s)
        vmsg("\nexpect: '{}' => {}".format(s,ret))
        # Only the first two alternatives of a list are accepted
        if not (ret == 0 or (type(s) == list and ret in (0,1))):
            die(2,"Expect failed: '{}' (return value: {})".format(s,ret))
        if m: msg('OK')
        return ret

    def my_sendline(p,m,s,usr_ret):
        # Send the line `s` to `p`, displaying the progress message `m` if
        # non-empty.  `usr_ret` is the expected return value of sendline()
        # (presumably the number of bytes written, including the line
        # terminator -- TODO confirm).
        if m: msg_r(' {}...'.format(m))
        ret = p.sendline(s)
        if ret != usr_ret:
            die(2,"Unable to send line '{}' (return value {})".format(s,ret))
        if m: msg('OK')
        vmsg("sendline: '{}' => {}".format(s,ret))

    def create(n,d,fn):
        # Generate the wallet file `fn` from the spend key of entry `d`,
        # checking the address and view key displayed by the wallet software
        # against those of the entry
        try: os.stat(fn)
        except: pass
        else: die(1,"Wallet '{}' already exists!".format(fn))
        p = pexpect.spawn('monero-wallet-cli --generate-from-spend-key {}'.format(fn))
        if g.debug: p.logfile = sys.stdout
        my_expect(p,'Awaiting initial prompt','Secret spend key: ')
        my_sendline(p,'',d.sec.decode(),65)
        my_expect(p,'','Enter.* new.* password.*: ',regex=True)
        my_sendline(p,'Sending password',d.wallet_passwd,33)
        my_expect(p,'','Confirm password: ')
        my_sendline(p,'Sending password again',d.wallet_passwd,33)
        my_expect(p,'','of your choice: ')
        my_sendline(p,'','1',2)
        my_expect(p,'monerod generating wallet','Generated new wallet: ')
        my_expect(p,'','\n')
        if d.addr not in p.before.decode():
            die(3,'Addresses do not match!\n MMGen: {}\n Monero: {}'.format(d.addr,p.before.decode()))
        my_expect(p,'','View key: ')
        my_expect(p,'','\n')
        if d.viewkey not in p.before.decode():
            die(3,'View keys do not match!\n MMGen: {}\n Monero: {}'.format(d.viewkey,p.before.decode()))
        my_expect(p,'','(YYYY-MM-DD): ')
        # Restore height: user-supplied value, or one less than the current height
        h = str(blockheight or cur_height-1)
        my_sendline(p,'',h,len(h)+1)
        ret = my_expect(p,'',['Starting refresh','Still apply restore height? (Y/Yes/N/No): '])
        if ret == 1:
            # The wallet software asked for confirmation of the restore height
            my_sendline(p,'','Y',2)
            m = ' Warning: {}: blockheight argument is higher than current blockheight'
            ymsg(m.format(blockheight))
        elif blockheight != None:
            # Log the child's output to stderr during the sync
            p.logfile = sys.stderr
        my_expect(p,'Syncing wallet','\[wallet.*$',regex=True)
        p.logfile = None
        my_sendline(p,'Exiting','exit',5)
        p.read()

    def sync(n,d,fn):
        # Open the existing wallet file `fn`, wait for its refresh to
        # complete and record the displayed balances in `bals`
        try: os.stat(fn)
        except: die(1,"Wallet '{}' does not exist!".format(fn))
        p = pexpect.spawn('monero-wallet-cli --wallet-file={}'.format(fn))
        if g.debug: p.logfile = sys.stdout
        my_expect(p,'Awaiting password prompt','Wallet password: ')
        my_sendline(p,'Sending password',d.wallet_passwd,33)

        msg(' Starting refresh...')
        height = None
        while True:
            # Pattern 0 matches refresh progress output, pattern 1 the wallet prompt
            ret = p.expect([r' / .*',r'\[wallet.*:.*'])
            if ret == 0: # TODO: coverage
                height = p.after
                msg_r('\r Block {}{}'.format(p.before.split()[-1],height))
            elif ret == 1:
                if height:
                    height = height.split()[-1]
                    msg('\r Block {h} / {h}'.format(h=height))
                else:
                    msg(' Wallet in sync')
                # Fields of the first output line beginning with 'Balance:'
                b = [l for l in p.before.decode().splitlines() if len(l) > 7 and l[:8] == 'Balance:'][0].split()
                msg(' Balance: {} Unlocked balance: {}'.format(b[1],b[4]))
                from mmgen.obj import XMRAmt
                # The last character of the balance field is dropped
                # (presumably a trailing comma -- TODO confirm)
                bals[fn] = ( XMRAmt(b[1][:-1]), XMRAmt(b[4]) )
                my_sendline(p,'Exiting','exit',5)
                p.read()
                break
            else:
                die(2,"\nExpect failed: (return value: {})".format(ret))

    def process_wallets():
        # Run the requested operation on each selected entry of the
        # key-address file and, for 'sync', print a table of the balances.
        # Values of `m`: (word stems for the messages, handler function,
        # value to be or'ed into opt.accept_defaults)
        m = { 'create': ('Creat','Generat',create,False),
                'sync': ('Sync', 'Sync', sync, True) }
        opt.accept_defaults = opt.accept_defaults or m[op][3]
        from mmgen.protocol import init_coin
        init_coin('xmr')
        from mmgen.addr import AddrList
        al = KeyAddrList(infile)
        data = [d for d in al.data if addrs == None or d.idx in AddrIdxList(addrs)]
        dl = len(data)
        assert dl,"No addresses in addrfile within range '{}'".format(addrs)
        gmsg('\n{}ing {} wallet{}'.format(m[op][0],dl,suf(dl)))
        for n,d in enumerate(data): # [d.sec,d.wallet_passwd,d.viewkey,d.addr]
            # Wallet filename: <seed ID>-<address index>-MoneroWallet, in opt.outdir if set.
            # NOTE(review): both alternatives of the conditional below are
            # empty strings -- a non-ASCII debug suffix may have been lost
            fn = os.path.join(
                opt.outdir or '','{}-{}-MoneroWallet{}'.format(
                    al.al_id.sid,
                    d.idx,
                    '' if g.debug_utf8 else ''))
            gmsg('\n{}ing wallet {}/{} ({})'.format(m[op][1],n+1,dl,fn))
            m[op][2](n,d,fn)
        gmsg('\n{} wallet{} {}ed'.format(dl,suf(dl),m[op][0].lower()))
        if op == 'sync':
            # First column is as wide as the longest wallet filename plus one
            col1_w = max(map(len,bals)) + 1
            fs = '{:%s} {} {}' % col1_w
            msg('\n'+fs.format('Wallet','Balance ','Unlocked Balance '))
            from mmgen.obj import XMRAmt
            tbals = [XMRAmt('0'),XMRAmt('0')]
            for bal in bals:
                for i in (0,1): tbals[i] += bals[bal][i]
                msg(fs.format(bal+':',*[XMRAmt(b).fmt(fs='5.12',color=True) for b in bals[bal]]))
            msg(fs.format('-'*col1_w,'-'*18,'-'*18))
            msg(fs.format('TOTAL:',*[XMRAmt(b).fmt(fs='5.12',color=True) for b in tbals]))

    # The strings matched in the dialogue with the wallet software are in English
    os.environ['LANG'] = 'C'
    import pexpect
    if blockheight != None and int(blockheight) < 0:
        blockheight = 0 # TODO: non-zero coverage
    cur_height = test_rpc()

    # Balances found by sync(), keyed by wallet filename
    bals = OrderedDict() # locked,unlocked

    try:
        process_wallets()
    except KeyboardInterrupt:
        rdie(1,'\nUser interrupt\n')
    except EOFError:
        rdie(2,'\nEnd of file\n')
    except Exception as e:
        # Fall back to the repr of the message if it cannot be displayed as is
        try:
            die(1,'Error: {}'.format(e.args[0]))
        except:
            rdie(1,'Error: {!r}'.format(e.args[0]))
# ================ RPC commands ================== #
def Gen_addr(addr,wallet='',target='addr',return_result=False):
    """
    Generate a single address, or its key in WIF form if `target` is 'wif',
    from a wallet.

    addr:          MMGen ID of the address to generate
    wallet:        wallet file, passed on to get_seed_file() if non-empty
    return_result: return the result instead of printing it

    Exits with an error if the Seed ID of `addr` doesn't match the wallet's.
    """
    mmid = MMGenID(addr)
    seed_file = get_seed_file([wallet] if wallet else [],1)
    opt.quiet = True
    from mmgen.seed import SeedSource
    seed = SeedSource(seed_file).seed
    if seed.sid != mmid.sid:
        m = 'Seed ID of requested address ({}) does not match wallet ({})'
        die(1,m.format(mmid.sid,seed.sid))
    addr_list = AddrList(
        seed=seed,
        addr_idxs=AddrIdxList(str(mmid.idx)),
        mmtype=mmid.mmtype,
        do_chksum=False)
    entry = addr_list.data[0]
    if target == 'wif':
        ret = entry.sec.wif
    else:
        ret = entry.addr
    if return_result:
        return ret
    Msg(ret)
def Gen_key(addr,wallet=''):
    """Print the key, in WIF form, of the address with MMGen ID `addr` (see Gen_addr())."""
    result = Gen_addr(addr,wallet,target='wif')
    return result
def Listaddress(addr,minconf=1,pager=False,showempty=True,showbtcaddr=True,show_age=False,show_days=None):
    """List a single address: a wrapper around Listaddresses() with `addr` as the address list."""
    opts = {
        'addrs':        addr,
        'minconf':      minconf,
        'pager':        pager,
        'showempty':    showempty,
        'showbtcaddrs': showbtcaddr,
        'show_age':     show_age,
        'show_days':    show_days,
    }
    return Listaddresses(**opts)
# List MMGen addresses and their balances. TODO: move this code to AddrList
def Listaddresses(addrs='',minconf=1,
    showempty=False,pager=False,showbtcaddrs=True,all_labels=False,sort=None,show_age=False,show_days=None):
    """
    Display the tracked addresses and their balances.

    addrs: restrict the listing to the addresses given in the form
           <seed ID>:[<type>:]<idx list>
    sort:  comma-separated list of the sort parameters 'reverse' and 'age'
    pager: display the output with the pager

    The remaining arguments are passed on to TwAddrList and its format() method.
    """
    # user-set show_days triggers show_age
    if show_days is None:
        show_days = False
    else:
        show_age = True

    if sort:
        sort = set(sort.split(','))
        sort_params = set(['reverse','age'])
        if not sort.issubset(sort_params):
            die(1,"The sort option takes the following parameters: '{}'".format("','".join(sort_params)))

    usr_addr_list = []
    if addrs:
        # Everything up to the last colon is the address ID prefix
        parts = addrs.rsplit(':',1)
        if len(parts) != 2:
            m = "'{}': invalid address list argument (must be in form <seed ID>:[<type>:]<idx list>)"
            die(1,m.format(addrs))
        prefix,idx_spec = parts
        usr_addr_list = [MMGenID('{}:{}'.format(prefix,idx)) for idx in AddrIdxList(idx_spec)]

    from mmgen.tw import TwAddrList
    al = TwAddrList(usr_addr_list,minconf,showempty,showbtcaddrs,all_labels)
    if not al:
        if showempty:
            die(0,'No tracked addresses!')
        else:
            die(0,'No tracked addresses with balances!')
    out = al.format(showbtcaddrs,sort,show_age,show_days)
    if pager:
        return do_pager(out)
    return Msg(out)
def Getbalance(minconf=1,quiet=False,return_val=False,pager=False):
    """
    Display the formatted output of TwGetBalance, or return it instead of
    displaying it if `return_val` is set.
    """
    from mmgen.tw import TwGetBalance
    out = TwGetBalance(minconf,quiet).format()
    if return_val:
        return out
    if pager:
        return do_pager(out)
    return Msg_r(out)
def Txview(*infiles,**kwargs):
    """
    Display the contents of one or more transaction files.

    Keyword arguments:
      pager: display the output with the pager
      terse: passed on to MMGenTX.format_view()
      sort:  key for sorting the files by age (default: 'mtime')
    """
    from mmgen.filename import MMGenFileList
    pager = kwargs.get('pager',False)
    terse = kwargs.get('terse',False)
    sort_key = kwargs.get('sort','mtime')
    flist = MMGenFileList(infiles,ftype=MMGenTX)
    flist.sort_by_age(key=sort_key) # in-place sort
    from mmgen.term import get_terminal_size
    # NOTE(review): the repeated string is empty -- the separator character
    # may have been lost
    sep = ''*77+'\n'
    views = [MMGenTX(fn).format_view(terse=terse) for fn in flist.names()]
    out = sep.join(views).rstrip()
    if pager:
        do_pager(out)
    else:
        Msg(out)
def Twview(pager=False,reverse=False,wide=False,minconf=1,sort='age',show_days=True,show_mmid=True):
    """
    Display the unspent outputs of the tracking wallet.

    wide selects the output of format_for_printing() over that of
    format_for_display(); pager displays the output with the pager.
    """
    rpc_init()
    from mmgen.tw import TwUnspentOutputs
    tw = TwUnspentOutputs(minconf=minconf)
    tw.do_sort(sort,reverse=reverse)
    tw.show_days,tw.show_mmid = show_days,show_mmid
    if wide:
        out = tw.format_for_printing(color=True)
    else:
        out = tw.format_for_display()
    display = do_pager if pager else Msg_r
    display(out)
def Add_label(mmaddr_or_coin_addr,label):
    """Set the label of an address in the tracking wallet (opened in mode 'w') to `label`."""
    rpc_init()
    from mmgen.tw import TrackingWallet
    wallet = TrackingWallet(mode='w')
    wallet.add_label(mmaddr_or_coin_addr,label,on_fail='raise')
def Remove_label(mmaddr_or_coin_addr):
    """Remove the label of an address in the tracking wallet by setting it to the empty string."""
    empty_label = ''
    Add_label(mmaddr_or_coin_addr,empty_label)
def Remove_address(mmaddr_or_coin_addr):
    """
    Remove an address from the tracking wallet, reporting the deletion if
    remove_address() returns a true value.
    """
    from mmgen.tw import TrackingWallet
    removed = TrackingWallet(mode='w').remove_address(mmaddr_or_coin_addr)
    if not removed:
        return
    msg("Address '{}' deleted from tracking wallet".format(removed))