tool.py: have tool cmds return their results instead of printing them

This commit is contained in:
The MMGen Project 2019-02-19 08:59:14 +00:00
commit a5ef33136a
Signed by: mmgen
GPG key ID: 3F8B1861E32B7DA2
5 changed files with 143 additions and 114 deletions

View file

@ -350,8 +350,9 @@ Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
chksum_rec_f = lambda foo,e: (str(e.idx), e.addr)
def __init__(self,addrfile='',al_id='',adata=[],seed='',addr_idxs='',src='',
addrlist='',keylist='',mmtype=None,do_chksum=True,chksum_only=False):
addrlist='',keylist='',mmtype=None):
do_chksum = True
self.update_msgs()
mmtype = mmtype or g.proto.dfl_mmtype
assert mmtype in MMGenAddrType.mmtypes,'{}: mmtype not in {}'.format(mmtype,repr(MMGenAddrType.mmtypes))
@ -390,12 +391,9 @@ Removed {{}} duplicate WIF key{{}} from keylist (also in {pnm} key-address file
if do_chksum:
self.chksum = AddrListChksum(self)
if chksum_only:
Msg(self.chksum)
else:
qmsg('Checksum for {} data {}: {}'.format(
self.data_desc,self.id_str.hl(),self.chksum.hl()))
qmsg(self.msgs[('check_chksum','record_chksum')[src=='gen']])
qmsg('Checksum for {} data {}: {}'.format(
self.data_desc,self.id_str.hl(),self.chksum.hl()))
qmsg(self.msgs[('check_chksum','record_chksum')[src=='gen']])
def update_msgs(self):
self.msgs = AddrList.msgs
@ -778,8 +776,9 @@ Record this checksum: it will be used to verify the password file in the future
}
chksum_rec_f = lambda foo,e: (str(e.idx), e.passwd)
def __init__(self,infile=None,seed=None,pw_idxs=None,pw_id_str=None,pw_len=None,pw_fmt=None,
chksum_only=False,chk_params_only=False):
def __init__( self,infile=None,seed=None,
pw_idxs=None,pw_id_str=None,pw_len=None,pw_fmt=None,
chk_params_only=False):
self.update_msgs()
@ -799,13 +798,10 @@ Record this checksum: it will be used to verify the password file in the future
self.fmt_data = ''
self.chksum = AddrListChksum(self)
if chksum_only:
Msg(self.chksum)
else:
fs = '{}-{}-{}-{}[{{}}]'.format(self.al_id.sid,self.pw_id_str,self.pw_fmt,self.pw_len)
self.id_str = AddrListIDStr(self,fs)
qmsg('Checksum for {} data {}: {}'.format(self.data_desc,self.id_str.hl(),self.chksum.hl()))
qmsg(self.msgs[('record_chksum','check_chksum')[bool(infile)]])
fs = '{}-{}-{}-{}[{{}}]'.format(self.al_id.sid,self.pw_id_str,self.pw_fmt,self.pw_len)
self.id_str = AddrListIDStr(self,fs)
qmsg('Checksum for {} data {}: {}'.format(self.data_desc,self.id_str.hl(),self.chksum.hl()))
qmsg(self.msgs[('record_chksum','check_chksum')[bool(infile)]])
def set_pw_fmt(self,pw_fmt):
assert pw_fmt in self.pw_info

View file

@ -155,5 +155,7 @@ if cmd not in dir(tc):
die(1,"'{}': no such command".format(cmd))
args,kwargs = tool._process_args(cmd,cmd_args)
ret = getattr(tc,cmd)(*args,**kwargs)
sys.exit((1,0)[ret in (None,True)]) # some commands die, some return False on failure
tool._print_result(ret,'pager' in kwargs and kwargs['pager'])

View file

@ -209,24 +209,38 @@ def _process_args(cmd,cmd_args):
return args,kwargs
# Individual cmd_data
def _get_result(ret): # returns a string or string subclass
if issubclass(type(ret),str):
return ret
elif type(ret) == tuple:
return '\n'.join([r.decode() if issubclass(type(r),bytes) else r for r in ret])
elif issubclass(type(ret),bytes):
try: return ret.decode()
except: return repr(ret)
elif ret == True:
return ''
elif ret in (False,None):
ydie(1,"tool command returned '{}'".format(ret))
else:
ydie(1,"tool.py: can't handle return value of type '{}'".format(type(ret).__name__))
def _print_convert_results(indata,enc,dec,dtype):
def are_equal(a,b,dtype=''):
if dtype == 'str': return a.lstrip('\0') == b.lstrip('\0')
if dtype == 'hex': return a.lstrip('0') == b.lstrip('0')
if dtype == 'b58': return a.lstrip('1') == b.lstrip('1')
else: return a == b
error = (True,False)[are_equal(indata,dec,dtype)]
if error or opt.verbose:
Msg('Input: {}'.format(indata))
Msg('Encoded data: {}'.format(enc))
Msg('Recoded data: {}'.format(dec))
else: Msg(enc.decode())
if error:
die(3,"Error! Recoded data doesn't match input!")
def _print_result(ret,pager):
if issubclass(type(ret),str):
do_pager(ret) if pager else Msg(ret)
elif type(ret) == tuple:
o = '\n'.join([r.decode() if issubclass(type(r),bytes) else r for r in ret])
do_pager(o) if pager else Msg(o)
elif issubclass(type(ret),bytes):
try:
o = ret.decode()
do_pager(o) if pager else Msg(o)
except: os.write(1,ret)
elif ret == True:
pass
elif ret in (False,None):
ydie(1,"tool command returned '{}'".format(ret))
else:
ydie(1,"tool.py: can't handle return value of type '{}'".format(type(ret).__name__))
from mmgen.obj import MMGenAddrType
at = MMGenAddrType((hasattr(opt,'type') and opt.type) or g.proto.dfl_mmtype)
@ -244,167 +258,183 @@ class MMGenToolCmd(object):
_usage(cmd,exit_val=0)
def hexdump(self,infile,cols=8,line_nums=True):
Msg(pretty_hexdump(
return pretty_hexdump(
get_data_from_file(infile,dash=True,silent=True,binary=True),
cols=cols,line_nums=line_nums))
cols=cols,line_nums=line_nums)
def unhexdump(self,infile):
if g.platform == 'win':
import msvcrt
msvcrt.setmode(sys.stdout.fileno(),os.O_BINARY)
hexdata = get_data_from_file(infile,dash=True,silent=True)
ret = decode_pretty_hexdump(hexdata)
os.write(g.stdout_fileno,ret)
return decode_pretty_hexdump(hexdata)
def b58randenc(self):
r = get_random(32)
enc = baseconv.b58encode(r,pad=True)
dec = baseconv.b58decode(enc,pad=True)
_print_convert_results(r,enc.encode(),dec,'bytes')
return baseconv.b58encode(r,pad=True)
def randhex(self,nbytes='32'):
Msg(binascii.hexlify(get_random(int(nbytes))).decode())
return binascii.hexlify(get_random(int(nbytes)))
def randwif(self):
Msg(PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed).wif)
return PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed).wif
def randpair(self):
privhex = PrivKey(get_random(32),pubkey_type=at.pubkey_type,compressed=at.compressed)
addr = ag.to_addr(kg.to_pubhex(privhex))
Vmsg('Key (hex): {}'.format(privhex))
Vmsg_r('Key (WIF): '); Msg(privhex.wif)
Vmsg_r('Addr: '); Msg(addr)
return (privhex.wif,addr)
def wif2addr(self,wif):
privhex = PrivKey(wif=wif)
addr = ag.to_addr(kg.to_pubhex(privhex))
Vmsg_r('Addr: '); Msg(addr)
return addr
def wif2segwit_pair(self,wif):
pubhex = kg.to_pubhex(PrivKey(wif=wif))
addr = ag.to_addr(pubhex)
rs = ag.to_segwit_redeem_script(pubhex)
Msg('{}\n{}'.format(rs.decode(),addr))
return (rs,addr)
def pubhash2addr(self,pubhash):
if opt.type == 'bech32':
ret = g.proto.pubhash2bech32addr(pubhash.encode())
return g.proto.pubhash2bech32addr(pubhash.encode())
else:
ret = g.proto.pubhash2addr(pubhash.encode(),at.addr_fmt=='p2sh')
Msg(ret)
return g.proto.pubhash2addr(pubhash.encode(),at.addr_fmt=='p2sh')
def addr2hexaddr(self,addr):
Msg(g.proto.verify_addr(addr,CoinAddr.hex_width,return_dict=True)['hex'].decode())
return g.proto.verify_addr(addr,CoinAddr.hex_width,return_dict=True)['hex']
def hash160(self,pubkeyhex):
Msg(hash160(pubkeyhex).decode())
return hash160(pubkeyhex)
def pubhex2addr(self,pubkeyhex):
self.pubhash2addr(hash160(pubkeyhex.encode()).decode())
return self.pubhash2addr(hash160(pubkeyhex.encode()).decode())
def wif2hex(self,wif):
Msg(PrivKey(wif=wif).decode())
return PrivKey(wif=wif)
def hex2wif(self,hexpriv):
Msg(g.proto.hex2wif(hexpriv.encode(),pubkey_type=at.pubkey_type,compressed=at.compressed))
return g.proto.hex2wif(hexpriv.encode(),pubkey_type=at.pubkey_type,compressed=at.compressed)
def privhex2addr(self,privhex,output_pubhex=False):
pk = PrivKey(binascii.unhexlify(privhex),compressed=at.compressed,pubkey_type=at.pubkey_type)
ph = kg.to_pubhex(pk)
Msg(ph.decode() if output_pubhex else ag.to_addr(ph))
return ph if output_pubhex else ag.to_addr(ph)
def privhex2pubhex(self,privhex): # new
self.privhex2addr(privhex,output_pubhex=True)
return self.privhex2addr(privhex,output_pubhex=True)
def pubhex2redeem_script(self,pubhex): # new
Msg(g.proto.pubhex2redeem_script(pubhex).decode())
return g.proto.pubhex2redeem_script(pubhex)
def wif2redeem_script(self,wif): # new
privhex = PrivKey(wif=wif)
Msg(ag.to_segwit_redeem_script(kg.to_pubhex(privhex)).decode())
return ag.to_segwit_redeem_script(kg.to_pubhex(privhex))
def do_random_mn(self,nbytes,wordlist):
hexrand = binascii.hexlify(get_random(nbytes))
Vmsg('Seed: {}'.format(hexrand))
for wl_id in ([wordlist],wordlists)[wordlist=='all']:
if wordlist == 'all':
if wordlist == 'all': # TODO
Msg('{} mnemonic:'.format(capfirst(wl_id)))
mn = baseconv.fromhex(hexrand,wl_id)
Msg(' '.join(mn))
return ' '.join(mn)
def mn_rand128(self,wordlist=dfl_wl_id): self.do_random_mn(16,wordlist)
def mn_rand192(self,wordlist=dfl_wl_id): self.do_random_mn(24,wordlist)
def mn_rand256(self,wordlist=dfl_wl_id): self.do_random_mn(32,wordlist)
def mn_rand128(self,wordlist=dfl_wl_id):
return self.do_random_mn(16,wordlist)
def hex2mn(self,s,wordlist=dfl_wl_id): Msg(' '.join(baseconv.fromhex(s.encode(),wordlist)))
def mn2hex(self,s,wordlist=dfl_wl_id): Msg(baseconv.tohex(s.split(),wordlist))
def mn_rand192(self,wordlist=dfl_wl_id):
return self.do_random_mn(24,wordlist)
def mn_rand256(self,wordlist=dfl_wl_id):
return self.do_random_mn(32,wordlist)
def hex2mn(self,s,wordlist=dfl_wl_id):
return ' '.join(baseconv.fromhex(s.encode(),wordlist))
def mn2hex(self,s,wordlist=dfl_wl_id):
return baseconv.tohex(s.split(),wordlist)
def strtob58(self,s,pad=None):
return baseconv.fromhex(binascii.hexlify(s.encode()),'b58',pad,tostr=True)
def hextob58(self,s,pad=None):
return baseconv.fromhex(s.encode(),'b58',pad,tostr=True)
def strtob58(self,s,pad=None): Msg(baseconv.fromhex(binascii.hexlify(s.encode()),'b58',pad,tostr=True))
def hextob58(self,s,pad=None): Msg(baseconv.fromhex(s.encode(),'b58',pad,tostr=True))
def hextob58chk(self,s):
from mmgen.protocol import _b58chk_encode
Msg(_b58chk_encode(s.encode()))
def hextob32(self,s,pad=None): Msg(baseconv.fromhex(s.encode(),'b32',pad,tostr=True))
return _b58chk_encode(s.encode())
def hextob32(self,s,pad=None):
return baseconv.fromhex(s.encode(),'b32',pad,tostr=True)
def b58tostr(self,s):
return binascii.unhexlify(baseconv.tohex(s,'b58'))
def b58tohex(self,s,pad=None):
return baseconv.tohex(s,'b58',pad)
def b58tostr(self,s): Msg(binascii.unhexlify(baseconv.tohex(s,'b58')).decode())
def b58tohex(self,s,pad=None): Msg(baseconv.tohex(s,'b58',pad))
def b58chktohex(self,s):
from mmgen.protocol import _b58chk_decode
Msg(_b58chk_decode(s).decode())
def b32tohex(self,s,pad=None): Msg(baseconv.tohex(s.upper(),'b32',pad))
return _b58chk_decode(s)
def b32tohex(self,s,pad=None):
return baseconv.tohex(s.upper(),'b32',pad)
def mn_stats(self,wordlist=dfl_wl_id):
wordlist in baseconv.digits or die(1,"'{}': not a valid wordlist".format(wordlist))
baseconv.check_wordlist(wordlist)
return True
def mn_printlist(self,wordlist=dfl_wl_id):
wordlist in baseconv.digits or die(1,"'{}': not a valid wordlist".format(wordlist))
Msg('\n'.join(baseconv.digits[wordlist]))
return '\n'.join(baseconv.digits[wordlist])
def id8(self,infile):
Msg(make_chksum_8(
get_data_from_file(infile,dash=True,silent=True,binary=True)
))
return make_chksum_8(
get_data_from_file(infile,dash=True,silent=True,binary=True))
def id6(self,infile):
Msg(make_chksum_6(
get_data_from_file(infile,dash=True,silent=True,binary=True)
))
return make_chksum_6(
get_data_from_file(infile,dash=True,silent=True,binary=True))
def str2id6(self,s): # retain ignoring of space for backwards compat
Msg(make_chksum_6(''.join(s.split())))
return make_chksum_6(''.join(s.split()))
def addrfile_chksum(self,infile,mmtype=''):
from mmgen.addr import AddrList
mmtype = None if not mmtype else MMGenAddrType(mmtype)
AddrList(infile,chksum_only=True,mmtype=mmtype)
return AddrList(infile,mmtype=mmtype).chksum
def keyaddrfile_chksum(self,infile,mmtype=''):
from mmgen.addr import KeyAddrList
mmtype = None if not mmtype else MMGenAddrType(mmtype)
KeyAddrList(infile,chksum_only=True,mmtype=mmtype)
return KeyAddrList(infile,mmtype=mmtype).chksum
def passwdfile_chksum(self,infile):
from mmgen.addr import PasswordList
PasswordList(infile=infile,chksum_only=True)
return PasswordList(infile=infile).chksum
def hexreverse(self,s):
Msg(binascii.hexlify(binascii.unhexlify(s.strip())[::-1]).decode())
return binascii.hexlify(binascii.unhexlify(s.strip())[::-1])
def hexlify(self,s):
Msg(binascii.hexlify(s.encode()).decode())
return binascii.hexlify(s.encode())
def hash256(self,s,file_input=False,hex_input=False):
from hashlib import sha256
if file_input: b = get_data_from_file(s,binary=True)
elif hex_input: b = decode_pretty_hexdump(s)
else: b = s
Msg(sha256(sha256(b.encode()).digest()).hexdigest())
return sha256(sha256(b.encode()).digest()).hexdigest()
def encrypt(self,infile,outfile='',hash_preset=''):
data = get_data_from_file(infile,'data for encryption',binary=True)
enc_d = mmgen_encrypt(data,'user data',hash_preset)
if not outfile:
outfile = '{}.{}'.format(os.path.basename(infile),g.mmenc_ext)
write_data_to_file(outfile,enc_d,'encrypted data',binary=True)
return True
def decrypt(self,infile,outfile='',hash_preset=''):
enc_d = get_data_from_file(infile,'encrypted data',binary=True)
@ -412,13 +442,12 @@ class MMGenToolCmd(object):
dec_d = mmgen_decrypt(enc_d,'user data',hash_preset)
if dec_d: break
msg('Trying again...')
if not outfile:
o = os.path.basename(infile)
outfile = remove_extension(o,g.mmenc_ext)
if outfile == o: outfile += '.dec'
write_data_to_file(outfile,dec_d,'decrypted data',binary=True)
return True
def find_incog_data(self,filename,iv_id,keep_searching=False):
ivsize,bsize,mod = g.aesctr_iv_len,4096,4096*8
@ -444,6 +473,7 @@ class MMGenToolCmd(object):
msg('')
os.close(f)
return True
def rand2file(self,outfile,nbytes,threads=4,silent=False):
nbytes = parse_nbytes(nbytes)
@ -501,14 +531,16 @@ class MMGenToolCmd(object):
q1.join()
q2.join()
f.close()
return True
def bytespec(self,s): Msg(str(parse_nbytes(s)))
def bytespec(self,s):
return str(parse_nbytes(s))
def keyaddrlist2monerowallets(self,infile,blockheight=None,addrs=None):
monero_wallet_ops(infile=infile,op='create',blockheight=blockheight,addrs=addrs)
return self.monero_wallet_ops(infile=infile,op='create',blockheight=blockheight,addrs=addrs)
def syncmonerowallets(self,infile,addrs=None):
monero_wallet_ops(infile=infile,op='sync',addrs=addrs)
return self.monero_wallet_ops(infile=infile,op='sync',addrs=addrs)
def monero_wallet_ops(self,infile,op,blockheight=None,addrs=None):
@ -665,9 +697,11 @@ class MMGenToolCmd(object):
except:
rdie(1,'Error: {!r}'.format(e.args[0]))
return True
# ================ RPC commands ================== #
def gen_addr(self,addr,wallet='',target='addr',return_result=False):
def gen_addr(self,addr,wallet='',target='addr'):
addr = MMGenID(addr)
sf = get_seed_file([wallet] if wallet else [],1)
opt.quiet = True
@ -676,11 +710,10 @@ class MMGenToolCmd(object):
if ss.seed.sid != addr.sid:
m = 'Seed ID of requested address ({}) does not match wallet ({})'
die(1,m.format(addr.sid,ss.seed.sid))
al = AddrList(seed=ss.seed,addr_idxs=AddrIdxList(str(addr.idx)),mmtype=addr.mmtype,do_chksum=False)
al = AddrList(seed=ss.seed,addr_idxs=AddrIdxList(str(addr.idx)),mmtype=addr.mmtype)
d = al.data[0]
ret = d.sec.wif if target=='wif' else d.addr
if return_result: return ret
else: Msg(ret)
return ret
def gen_key(self,addr,wallet=''):
return self.gen_addr(addr,wallet,target='wif')
@ -713,25 +746,21 @@ class MMGenToolCmd(object):
al = TwAddrList(usr_addr_list,minconf,showempty,showbtcaddrs,all_labels)
if not al:
die(0,('No tracked addresses with balances!','No tracked addresses!')[showempty])
o = al.format(showbtcaddrs,sort,show_age,show_days)
return do_pager(o) if pager else Msg(o)
return al.format(showbtcaddrs,sort,show_age,show_days)
def getbalance(self,minconf=1,quiet=False,return_val=False,pager=False):
def getbalance(self,minconf=1,quiet=False,pager=False):
from mmgen.tw import TwGetBalance
o = TwGetBalance(minconf,quiet).format()
return o if return_val else do_pager(o) if pager else Msg_r(o)
return TwGetBalance(minconf,quiet).format()
def txview(self,*infiles,**kwargs):
from mmgen.filename import MMGenFileList
pager = 'pager' in kwargs and kwargs['pager']
terse = 'terse' in kwargs and kwargs['terse']
sort_key = kwargs['sort'] if 'sort' in kwargs else 'mtime'
flist = MMGenFileList(infiles,ftype=MMGenTX)
flist.sort_by_age(key=sort_key) # in-place sort
from mmgen.term import get_terminal_size
sep = ''*77+'\n'
out = sep.join([MMGenTX(fn).format_view(terse=terse) for fn in flist.names()])
(Msg,do_pager)[pager](out.rstrip())
return sep.join([MMGenTX(fn).format_view(terse=terse) for fn in flist.names()]).rstrip()
def twview(self,pager=False,reverse=False,wide=False,minconf=1,sort='age',show_days=True,show_mmid=True):
rpc_init()
@ -740,20 +769,22 @@ class MMGenToolCmd(object):
tw.do_sort(sort,reverse=reverse)
tw.show_days = show_days
tw.show_mmid = show_mmid
out = tw.format_for_printing(color=True) if wide else tw.format_for_display()
(Msg_r,do_pager)[pager](out)
return tw.format_for_printing(color=True) if wide else tw.format_for_display()
def add_label(self,mmaddr_or_coin_addr,label):
rpc_init()
from mmgen.tw import TrackingWallet
TrackingWallet(mode='w').add_label(mmaddr_or_coin_addr,label,on_fail='raise')
return True
def remove_label(self,mmaddr_or_coin_addr):
self.add_label(mmaddr_or_coin_addr,'')
return True
def remove_address(self,mmaddr_or_coin_addr):
from mmgen.tw import TrackingWallet
tw = TrackingWallet(mode='w')
ret = tw.remove_address(mmaddr_or_coin_addr)
ret = tw.remove_address(mmaddr_or_coin_addr) # returns None on failure
if ret:
msg("Address '{}' deleted from tracking wallet".format(ret))
return ret

View file

@ -71,7 +71,7 @@ def generate_kals_for_mmgen_addrs(need_keys,infiles,saved_seeds):
idx_list = [i.idx for i in mmids if i.sid == sid and i.mmtype == t]
if idx_list:
addr_idxs = AddrIdxList(idx_list=idx_list)
d.append(KeyAddrList(seed=seed,addr_idxs=addr_idxs,do_chksum=False,mmtype=MMGenAddrType(t)))
d.append(KeyAddrList(seed=seed,addr_idxs=addr_idxs,mmtype=MMGenAddrType(t)))
return d
def add_keys(tx,src,infiles=None,saved_seeds=None,keyaddr_list=None):
@ -139,7 +139,7 @@ def txsign(tx,seed_files,kl,kal,tx_num_str=''):
if non_mm_addrs:
if not kl:
die(2,'Transaction has non-{} inputs, but no flat key list is present'.format(g.proj_name))
tmp = KeyAddrList(addrlist=non_mm_addrs,do_chksum=False)
tmp = KeyAddrList(addrlist=non_mm_addrs)
tmp.add_wifs(kl)
m = tmp.list_missing('sec')
if m: die(2,wmsg['missing_keys_error'].format(suf(m,'es'),'\n '.join(m)))

View file

@ -497,7 +497,7 @@ def get_seed_file(cmd_args,nargs,invoked_as=None):
elif len(cmd_args) > nargs:
opts.usage()
elif len(cmd_args) == nargs and wf and invoked_as != 'gen':
msg('Warning: overriding default wallet with user-supplied wallet')
qmsg('Warning: overriding default wallet with user-supplied wallet')
if cmd_args or wf:
check_infile(cmd_args[0] if cmd_args else wf)