Change all eval() calls to ast.literal_eval()

- closes an exploit whereby an infected online MMGen installation could craft
  a special TX file to trick an offline signing MMGen into executing an
  arbitrary expression
- update TX file format (to v3) to permit calling literal_eval() on unmodified
  inputs and outputs data (old v2 file format continues to be supported)
- new TX file conversion script: `scripts/tx-v2-to-v3.py`
- `scripts/tx-old2new.py` modified and renamed to `scripts/tx-v1-to-v3.py`
This commit is contained in:
MMGen 2018-02-17 15:35:45 +03:00
commit 6b9df0ea44
Signed by untrusted user who does not match committer: mmgen
GPG key ID: 62DBE9E5212F05BE
7 changed files with 105 additions and 42 deletions

View file

@ -349,10 +349,12 @@ def check_opts(usr_opts): # Returns false if any check fails
(val,desc,n,sepword))
return False
def opt_compares(val,op,target,desc,what=''):
def opt_compares(val,op_str,target,desc,what=''):
import operator as o
op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str]
if what: what += ' '
if not eval('%s %s %s' % (val, op, target)):
msg('%s: invalid %s (%snot %s %s)' % (val,desc,what,op,target))
if not op_f(val,target):
msg('{}: invalid {} ({}not {} {})'.format(val,desc,what,op_str,target))
return False
return True

View file

@ -158,7 +158,8 @@ def send(addr,amt):
def show_mempool():
p = start_cmd('cli','getrawmempool')
from pprint import pformat
msg(pformat(eval(p.stdout.read())))
from ast import literal_eval
msg(pformat(literal_eval(p.stdout.read())))
p.wait()
def cli(*args):
@ -291,7 +292,8 @@ def generate(blocks=1,silent=False):
wait_for_daemon('ready',silent=True)
p = start_cmd('cli','generate',str(blocks))
out = process_output(p,silent=silent)[0]
if len(eval(out)) != blocks:
from ast import literal_eval
if len(literal_eval(out)) != blocks:
rdie(1,'Error generating blocks')
p.wait()
gmsg('Mined {} block{}'.format(blocks,suf(blocks,'s')))

View file

@ -70,7 +70,7 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
def get_unspent_data(self):
if g.bogus_wallet_data: # for debugging purposes only
us_rpc = eval(get_data_from_file(g.bogus_wallet_data))
us_rpc = eval(get_data_from_file(g.bogus_wallet_data)) # testing, so ok
else:
us_rpc = g.rpch.listunspent(self.minconf)
# write_data_to_file('bogus_unspent.json', repr(us), 'bogus unspent data')

View file

@ -557,6 +557,8 @@ class MMGenTX(MMGenObject):
def format(self):
self.inputs.check_coin_mismatch()
self.outputs.check_coin_mismatch()
def amt_to_str(d):
return dict([(k,str(d[k]) if k == 'amt' else d[k]) for k in d])
lines = [
'{}{} {} {} {} {}{}'.format(
(g.coin+' ','')[g.coin=='BTC'],
@ -568,8 +570,8 @@ class MMGenTX(MMGenObject):
('',' LT={}'.format(self.locktime))[bool(self.locktime)]
),
self.hex,
repr([e.__dict__ for e in self.inputs]),
repr([e.__dict__ for e in self.outputs])
repr([amt_to_str(e.__dict__) for e in self.inputs]),
repr([amt_to_str(e.__dict__) for e in self.outputs])
]
if self.label:
lines.append(baseconv.b58encode(self.label.encode('utf8')))
@ -971,6 +973,20 @@ class MMGenTX(MMGenObject):
tx_data = get_lines_from_file(infile,self.desc+' data',silent=silent_open)
def eval_io_data(raw_data,desc):
from ast import literal_eval
try:
d = literal_eval(raw_data)
except:
if desc == 'inputs' and not silent_open:
ymsg('Warning: transaction data appears to be in old format')
import re
d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
assert type(d) == list,'{} data not a list!'.format(desc)
assert len(d),'no {}!'.format(desc)
for e in d: e['amt'] = g.proto.coin_amt(e['amt'])
return self.decode_io(desc,d)
try:
desc = 'data'
assert len(tx_data) >= 5,'number of lines less than 5'
@ -1017,11 +1033,9 @@ class MMGenTX(MMGenObject):
desc = 'coin type in metadata'
assert self.coin == g.coin,'invalid coin type'
desc = 'inputs data'
self.inputs = self.decode_io('inputs',eval(inputs_data))
assert len(self.inputs),'no inputs!'
desc = '{}-to-MMGen address map data'.format(g.coin)
self.outputs = self.decode_io('outputs',eval(outputs_data))
assert len(self.outputs),'no outputs!'
self.inputs = eval_io_data(inputs_data,'inputs')
desc = 'outputs data'
self.outputs = eval_io_data(outputs_data,'outputs')
except Exception as e:
die(2,'Invalid {} in transaction file: {}'.format(desc,e[0]))

View file

@ -1,25 +1,39 @@
#!/usr/bin/env python
# Convert MMGen 'v1' transaction file (extension '.raw' or '.sig')
# to MMGen 'v3' ('.rawtx' or '.sigtx' + amounts as strings)
import sys,os
repo_root = os.path.split(os.path.abspath(os.path.dirname(sys.argv[0])))[0]
sys.path = [repo_root] + sys.path
from mmgen.common import *
from mmgen.tx import *
opts_data = lambda: {
'desc': "Convert MMGen transaction file from old format to new format",
'desc': "Convert MMGen transaction file from v1 format to v3 format",
'usage': "<tx file>",
'options': """
-h, --help Print this help message
-S, --stdout Write data to STDOUT instead of file
-h, --help Print this help message
-d, --outdir=d Output files to directory 'd' instead of working dir
-q, --quiet Write (and overwrite) files without prompting
-S, --stdout Write data to STDOUT instead of file
"""
}
cmd_args = opts.init(opts_data)
from mmgen.tx import *
if len(cmd_args) != 1: opts.usage()
def parse_tx_file(infile):
from ast import literal_eval
def eval_io_data(raw_data,desc):
import re
d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
assert type(d) == list,'{} data not a list!'.format(desc)
assert len(d),'no {}!'.format(desc)
for e in d: e['amount'] = g.proto.coin_amt(e['amount'])
return d
err_fmt = 'Invalid {} in transaction file'
tx_data = get_lines_from_file(infile)
@ -37,9 +51,9 @@ def parse_tx_file(infile):
err_str = 'hex data'
unhexlify(tx_hex)
err_str = 'inputs data'
inputs = eval(inputs)
inputs = eval_io_data(inputs,'inputs')
err_str = 'btc-to-mmgen address map data'
outputs = eval(outputs)
outputs = literal_eval(outputs)
if comment:
from mmgen.bitcoin import b58decode
comment = b58decode(comment)
@ -53,17 +67,17 @@ def parse_tx_file(infile):
else:
return metadata.split(),tx_hex,inputs,outputs,comment
def find_block_by_time(c,timestamp):
def find_block_by_time(timestamp):
secs = decode_timestamp(timestamp)
block_num = c.getblockcount()
block_num = g.rpch.getblockcount()
# print 'secs:',secs, 'last block:',last_block
top,bot = block_num,0
m = 'Searching for block'
msg_r(m)
for i in range(40):
msg_r('.')
bhash = c.getblockhash(block_num)
block = c.getblock(bhash)
bhash = g.rpch.getblockhash(block_num)
block = g.rpch.getblock(bhash)
# print 'block_num:',block_num, 'mediantime:',block['mediantime'], 'target:',secs
cur_secs = block['mediantime']
if cur_secs > secs:
@ -79,33 +93,33 @@ def find_block_by_time(c,timestamp):
tx = MMGenTX()
metadata,tx.hex,inputs,b2m_map,tx.label = parse_tx_file(cmd_args[0])
metadata,tx.hex,inputs,outputs,tx.label = parse_tx_file(cmd_args[0])
tx.txid,send_amt,tx.timestamp = metadata
tx.send_amt = Decimal(send_amt)
g.testnet = False
g.rpc_host = 'localhost'
c = bitcoin_connection()
rpc_init()
for i in inputs:
if not 'mmid' in i and 'account' in i:
from mmgen.tw import parse_tw_acct_label
a,b = parse_tw_acct_label(i['account'])
if a:
i['mmid'] = a.decode('utf8')
if b: i['comment'] = b.decode('utf8')
lbl = TwLabel(i['account'])
i['mmid'] = lbl.mmid
i['comment'] = lbl.comment
tx.inputs = tx.decode_io_oldfmt(inputs)
tx.inputs = tx.MMGenTxInputList(tx.decode_io_oldfmt(inputs))
if tx.marked_signed(c):
if tx.marked_signed():
msg('Transaction is signed')
dec_tx = c.decoderawtransaction(tx.hex)
tx.outputs = MMGenList(MMGenTX.MMGenTxOutput(addr=i['scriptPubKey']['addresses'][0],amt=i['value'])
for i in dec_tx['vout'])
dec_tx = g.rpch.decoderawtransaction(tx.hex)
tx.outputs = tx.MMGenTxOutputList(
MMGenTX.MMGenTxOutput(addr=i['scriptPubKey']['addresses'][0],
amt=g.proto.coin_amt(i['value']))
for i in dec_tx['vout'])
for e in tx.outputs:
if e.addr in b2m_map:
f = b2m_map[e.addr]
if e.addr in outputs:
f = outputs[e.addr]
e.mmid = f[0]
if f[1]: e.label = f[1].decode('utf8')
else:
@ -114,5 +128,5 @@ for e in tx.outputs:
e.mmid = f.mmid
if f.label: e.label = f.label.decode('utf8')
tx.blockcount = find_block_by_time(c,tx.timestamp)
tx.write_to_file(ask_tty=False)
tx.blockcount = find_block_by_time(tx.timestamp)
tx.write_to_file(ask_tty=False,ask_overwrite=not opt.quiet,ask_write=not opt.quiet)

30
scripts/tx-v2-to-v3.py Executable file
View file

@ -0,0 +1,30 @@
#!/usr/bin/env python
# Convert MMGen 'v2' transaction file (amounts as BTCAmt())
# to MMGen 'v3' (amounts as strings)
# v3 tx files were introduced with MMGen version 0.9.7
import sys,os
repo_root = os.path.split(os.path.abspath(os.path.dirname(sys.argv[0])))[0]
sys.path = [repo_root] + sys.path
from mmgen.common import *
opts_data = lambda: {
'desc': "Convert MMGen transaction file from v2 format to v3 format",
'usage': "<tx file>",
'options': """
-h, --help Print this help message
-d, --outdir=d Output files to directory 'd' instead of working dir
-q, --quiet Write (and overwrite) files without prompting
-S, --stdout Write data to STDOUT instead of file
"""
}
cmd_args = opts.init(opts_data)
from mmgen.tx import *
if len(cmd_args) != 1: opts.usage()
tx = MMGenTX(cmd_args[0],silent_open=True)
tx.write_to_file(ask_tty=False,ask_overwrite=not opt.quiet,ask_write=not opt.quiet)

View file

@ -2421,7 +2421,8 @@ class MMGenTestSuite(object):
def regtest_get_mempool(self,name):
t = MMGenExpect(name,'mmgen-regtest',['show_mempool'])
return eval(t.read())
from ast import literal_eval
return literal_eval(t.read())
def regtest_get_mempool1(self,name):
mp = self.regtest_get_mempool(name)
@ -2681,10 +2682,10 @@ class MMGenTestSuite(object):
with open(fn) as f:
lines = f.read().splitlines()
from mmgen.obj import BTCAmt,LTCAmt,BCHAmt,B2XAmt
tx = {}
from ast import literal_eval
for k,i in (('in',3),('out',4)):
tx[k] = eval(lines[i])
tx[k] = literal_eval(lines[i])
tx[k+'_addrs'] = [i['addr'] for i in tx[k]]
psave = g.proto