Browse Source

Change all eval() calls to ast.literal_eval()
- closes an exploit whereby an infected online MMGen installation could craft
a special TX file to trick an offline signing MMGen into executing an
arbitrary expression
- update TX file format (to v3) to permit calling literal_eval() on unmodified
inputs and outputs data (old v2 file format continues to be supported)
- new TX file conversion script: `scripts/tx-v2-to-v3.py`
- `scripts/tx-old2new.py` modified and renamed to `scripts/tx-v1-to-v3.py`

MMGen 7 years ago
parent
commit
6b9df0e
7 changed files with 105 additions and 42 deletions
  1. 5 3
      mmgen/opts.py
  2. 4 2
      mmgen/regtest.py
  3. 1 1
      mmgen/tw.py
  4. 21 7
      mmgen/tx.py
  5. 40 26
      scripts/tx-v1-to-v3.py
  6. 30 0
      scripts/tx-v2-to-v3.py
  7. 4 3
      test/test.py

+ 5 - 3
mmgen/opts.py

@@ -349,10 +349,12 @@ def check_opts(usr_opts):       # Returns false if any check fails
 					(val,desc,n,sepword))
 			return False
 
-	def opt_compares(val,op,target,desc,what=''):
+	def opt_compares(val,op_str,target,desc,what=''):
+		import operator as o
+		op_f = { '<':o.lt, '<=':o.le, '>':o.gt, '>=':o.ge, '=':o.eq }[op_str]
 		if what: what += ' '
-		if not eval('%s %s %s' % (val, op, target)):
-			msg('%s: invalid %s (%snot %s %s)' % (val,desc,what,op,target))
+		if not op_f(val,target):
+			msg('{}: invalid {} ({}not {} {})'.format(val,desc,what,op_str,target))
 			return False
 		return True
 

+ 4 - 2
mmgen/regtest.py

@@ -158,7 +158,8 @@ def send(addr,amt):
 def show_mempool():
 	p = start_cmd('cli','getrawmempool')
 	from pprint import pformat
-	msg(pformat(eval(p.stdout.read())))
+	from ast import literal_eval
+	msg(pformat(literal_eval(p.stdout.read())))
 	p.wait()
 
 def cli(*args):
@@ -291,7 +292,8 @@ def generate(blocks=1,silent=False):
 	wait_for_daemon('ready',silent=True)
 	p = start_cmd('cli','generate',str(blocks))
 	out = process_output(p,silent=silent)[0]
-	if len(eval(out)) != blocks:
+	from ast import literal_eval
+	if len(literal_eval(out)) != blocks:
 		rdie(1,'Error generating blocks')
 	p.wait()
 	gmsg('Mined {} block{}'.format(blocks,suf(blocks,'s')))

+ 1 - 1
mmgen/tw.py

@@ -70,7 +70,7 @@ watch-only wallet using '{}-addrimport' and then re-run this program.
 
 	def get_unspent_data(self):
 		if g.bogus_wallet_data: # for debugging purposes only
-			us_rpc = eval(get_data_from_file(g.bogus_wallet_data))
+			us_rpc = eval(get_data_from_file(g.bogus_wallet_data)) # testing, so ok
 		else:
 			us_rpc = g.rpch.listunspent(self.minconf)
 #		write_data_to_file('bogus_unspent.json', repr(us), 'bogus unspent data')

+ 21 - 7
mmgen/tx.py

@@ -557,6 +557,8 @@ class MMGenTX(MMGenObject):
 	def format(self):
 		self.inputs.check_coin_mismatch()
 		self.outputs.check_coin_mismatch()
+		def amt_to_str(d):
+			return dict([(k,str(d[k]) if k == 'amt' else d[k]) for k in d])
 		lines = [
 			'{}{} {} {} {} {}{}'.format(
 				(g.coin+' ','')[g.coin=='BTC'],
@@ -568,8 +570,8 @@ class MMGenTX(MMGenObject):
 				('',' LT={}'.format(self.locktime))[bool(self.locktime)]
 			),
 			self.hex,
-			repr([e.__dict__ for e in self.inputs]),
-			repr([e.__dict__ for e in self.outputs])
+			repr([amt_to_str(e.__dict__) for e in self.inputs]),
+			repr([amt_to_str(e.__dict__) for e in self.outputs])
 		]
 		if self.label:
 			lines.append(baseconv.b58encode(self.label.encode('utf8')))
@@ -971,6 +973,20 @@ class MMGenTX(MMGenObject):
 
 		tx_data = get_lines_from_file(infile,self.desc+' data',silent=silent_open)
 
+		def eval_io_data(raw_data,desc):
+			from ast import literal_eval
+			try:
+				d = literal_eval(raw_data)
+			except:
+				if desc == 'inputs' and not silent_open:
+					ymsg('Warning: transaction data appears to be in old format')
+				import re
+				d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
+			assert type(d) == list,'{} data not a list!'.format(desc)
+			assert len(d),'no {}!'.format(desc)
+			for e in d: e['amt'] = g.proto.coin_amt(e['amt'])
+			return self.decode_io(desc,d)
+
 		try:
 			desc = 'data'
 			assert len(tx_data) >= 5,'number of lines less than 5'
@@ -1017,11 +1033,9 @@ class MMGenTX(MMGenObject):
 			desc = 'coin type in metadata'
 			assert self.coin == g.coin,'invalid coin type'
 			desc = 'inputs data'
-			self.inputs = self.decode_io('inputs',eval(inputs_data))
-			assert len(self.inputs),'no inputs!'
-			desc = '{}-to-MMGen address map data'.format(g.coin)
-			self.outputs = self.decode_io('outputs',eval(outputs_data))
-			assert len(self.outputs),'no outputs!'
+			self.inputs  = eval_io_data(inputs_data,'inputs')
+			desc = 'outputs data'
+			self.outputs = eval_io_data(outputs_data,'outputs')
 		except Exception as e:
 			die(2,'Invalid {} in transaction file: {}'.format(desc,e[0]))
 

+ 40 - 26
scripts/tx-old2new.py → scripts/tx-v1-to-v3.py

@@ -1,25 +1,39 @@
 #!/usr/bin/env python
+# Convert MMGen 'v1' transaction file (extension '.raw' or '.sig')
+# to MMGen 'v3' ('.rawtx' or '.sigtx' + amounts as strings)
 
 import sys,os
 repo_root = os.path.split(os.path.abspath(os.path.dirname(sys.argv[0])))[0]
 sys.path = [repo_root] + sys.path
 
 from mmgen.common import *
-from mmgen.tx import *
 
 opts_data = lambda: {
-	'desc':    "Convert MMGen transaction file from old format to new format",
+	'desc':    "Convert MMGen transaction file from v1 format to v3 format",
 	'usage':   "<tx file>",
 	'options': """
--h, --help    Print this help message
--S, --stdout  Write data to STDOUT instead of file
+-h, --help     Print this help message
+-d, --outdir=d Output files to directory 'd' instead of working dir
+-q, --quiet    Write (and overwrite) files without prompting
+-S, --stdout   Write data to STDOUT instead of file
 """
 }
 
 cmd_args = opts.init(opts_data)
 
+from mmgen.tx import *
+
 if len(cmd_args) != 1: opts.usage()
 def parse_tx_file(infile):
+	from ast import literal_eval
+
+	def eval_io_data(raw_data,desc):
+		import re
+		d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
+		assert type(d) == list,'{} data not a list!'.format(desc)
+		assert len(d),'no {}!'.format(desc)
+		for e in d: e['amount'] = g.proto.coin_amt(e['amount'])
+		return d
 
 	err_fmt = 'Invalid {} in transaction file'
 	tx_data = get_lines_from_file(infile)
@@ -37,9 +51,9 @@ def parse_tx_file(infile):
 		err_str = 'hex data'
 		unhexlify(tx_hex)
 		err_str = 'inputs data'
-		inputs = eval(inputs)
+		inputs = eval_io_data(inputs,'inputs')
 		err_str = 'btc-to-mmgen address map data'
-		outputs = eval(outputs)
+		outputs = literal_eval(outputs)
 		if comment:
 			from mmgen.bitcoin import b58decode
 			comment = b58decode(comment)
@@ -53,17 +67,17 @@ def parse_tx_file(infile):
 	else:
 		return metadata.split(),tx_hex,inputs,outputs,comment
 
-def find_block_by_time(c,timestamp):
+def find_block_by_time(timestamp):
 	secs = decode_timestamp(timestamp)
-	block_num = c.getblockcount()
+	block_num = g.rpch.getblockcount()
 #	print 'secs:',secs, 'last block:',last_block
 	top,bot = block_num,0
 	m = 'Searching for block'
 	msg_r(m)
 	for i in range(40):
 		msg_r('.')
-		bhash = c.getblockhash(block_num)
-		block = c.getblock(bhash)
+		bhash = g.rpch.getblockhash(block_num)
+		block = g.rpch.getblock(bhash)
 #		print 'block_num:',block_num, 'mediantime:',block['mediantime'], 'target:',secs
 		cur_secs = block['mediantime']
 		if cur_secs > secs:
@@ -79,33 +93,33 @@ def find_block_by_time(c,timestamp):
 
 tx = MMGenTX()
 
-metadata,tx.hex,inputs,b2m_map,tx.label = parse_tx_file(cmd_args[0])
+metadata,tx.hex,inputs,outputs,tx.label = parse_tx_file(cmd_args[0])
 tx.txid,send_amt,tx.timestamp = metadata
 tx.send_amt = Decimal(send_amt)
 
 g.testnet = False
 g.rpc_host = 'localhost'
-c = bitcoin_connection()
+rpc_init()
 
 for i in inputs:
 	if not 'mmid' in i and 'account' in i:
-		from mmgen.tw import parse_tw_acct_label
-		a,b = parse_tw_acct_label(i['account'])
-		if a:
-			i['mmid'] = a.decode('utf8')
-			if b: i['comment'] = b.decode('utf8')
+		lbl = TwLabel(i['account'])
+		i['mmid'] = lbl.mmid
+		i['comment'] = lbl.comment
 
-tx.inputs = tx.decode_io_oldfmt(inputs)
+tx.inputs = tx.MMGenTxInputList(tx.decode_io_oldfmt(inputs))
 
-if tx.marked_signed(c):
+if tx.marked_signed():
 	msg('Transaction is signed')
 
-dec_tx = c.decoderawtransaction(tx.hex)
-tx.outputs = MMGenList(MMGenTX.MMGenTxOutput(addr=i['scriptPubKey']['addresses'][0],amt=i['value'])
-				for i in dec_tx['vout'])
+dec_tx = g.rpch.decoderawtransaction(tx.hex)
+tx.outputs = tx.MMGenTxOutputList(
+				MMGenTX.MMGenTxOutput(addr=i['scriptPubKey']['addresses'][0],
+						amt=g.proto.coin_amt(i['value']))
+			for i in dec_tx['vout'])
 for e in tx.outputs:
-	if e.addr in b2m_map:
-		f = b2m_map[e.addr]
+	if e.addr in outputs:
+		f = outputs[e.addr]
 		e.mmid = f[0]
 		if f[1]: e.label = f[1].decode('utf8')
 	else:
@@ -114,5 +128,5 @@ for e in tx.outputs:
 				e.mmid = f.mmid
 				if f.label: e.label = f.label.decode('utf8')
 
-tx.blockcount = find_block_by_time(c,tx.timestamp)
-tx.write_to_file(ask_tty=False)
+tx.blockcount = find_block_by_time(tx.timestamp)
+tx.write_to_file(ask_tty=False,ask_overwrite=not opt.quiet,ask_write=not opt.quiet)

+ 30 - 0
scripts/tx-v2-to-v3.py

@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# Convert MMGen 'v2' transaction file (amounts as BTCAmt())
+# to MMGen 'v3' (amounts as strings)
+# v3 tx files were introduced with MMGen version 0.9.7
+
+import sys,os
+repo_root = os.path.split(os.path.abspath(os.path.dirname(sys.argv[0])))[0]
+sys.path = [repo_root] + sys.path
+
+from mmgen.common import *
+
+opts_data = lambda: {
+	'desc':    "Convert MMGen transaction file from v2 format to v3 format",
+	'usage':   "<tx file>",
+	'options': """
+-h, --help     Print this help message
+-d, --outdir=d Output files to directory 'd' instead of working dir
+-q, --quiet    Write (and overwrite) files without prompting
+-S, --stdout   Write data to STDOUT instead of file
+"""
+}
+
+cmd_args = opts.init(opts_data)
+
+from mmgen.tx import *
+
+if len(cmd_args) != 1: opts.usage()
+
+tx = MMGenTX(cmd_args[0],silent_open=True)
+tx.write_to_file(ask_tty=False,ask_overwrite=not opt.quiet,ask_write=not opt.quiet)

+ 4 - 3
test/test.py

@@ -2421,7 +2421,8 @@ class MMGenTestSuite(object):
 
 	def regtest_get_mempool(self,name):
 		t = MMGenExpect(name,'mmgen-regtest',['show_mempool'])
-		return eval(t.read())
+		from ast import literal_eval
+		return literal_eval(t.read())
 
 	def regtest_get_mempool1(self,name):
 		mp = self.regtest_get_mempool(name)
@@ -2681,10 +2682,10 @@ class MMGenTestSuite(object):
 		with open(fn) as f:
 			lines = f.read().splitlines()
 
-		from mmgen.obj import BTCAmt,LTCAmt,BCHAmt,B2XAmt
 		tx = {}
+		from ast import literal_eval
 		for k,i in (('in',3),('out',4)):
-			tx[k] = eval(lines[i])
+			tx[k] = literal_eval(lines[i])
 			tx[k+'_addrs'] = [i['addr'] for i in tx[k]]
 
 		psave = g.proto