file.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. tx.file: Transaction file operations for the MMGen suite
  20. """
  21. import os, json
  22. from ..util import ymsg, make_chksum_6, die
  23. from ..obj import MMGenObject, HexStr, MMGenTxID, CoinTxID, MMGenTxComment
  24. def get_monero_proto(tx, data):
  25. from ..protocol import init_proto
  26. return init_proto(tx.cfg, 'XMR', network=data['MoneroMMGenTX']['data']['network'])
  27. class txdata_json_encoder(json.JSONEncoder):
  28. def default(self, o):
  29. if type(o).__name__.endswith('Amt'):
  30. return str(o)
  31. elif type(o).__name__ == 'OpReturnData':
  32. return repr(o)
  33. else:
  34. return json.JSONEncoder.default(self, o)
  35. def json_dumps(data):
  36. return json.dumps(data, separators = (',', ':'), cls=txdata_json_encoder)
  37. def get_proto_from_coin_id(tx, coin_id, chain):
  38. coin, tokensym = coin_id.split(':') if ':' in coin_id else (coin_id, None)
  39. from ..protocol import CoinProtocol, init_proto
  40. network = CoinProtocol.Base.chain_name_to_network(tx.cfg, coin, chain)
  41. return init_proto(tx.cfg, coin, network=network, need_amt=True, tokensym=tokensym)
  42. def eval_io_data(tx, data, *, desc):
  43. if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs
  44. assert len(data), f'no {desc}!'
  45. for d in data:
  46. d['amt'] = tx.proto.coin_amt(d['amt'])
  47. io, io_list = {
  48. 'inputs': (tx.Input, tx.InputList),
  49. 'outputs': (tx.Output, tx.OutputList),
  50. }[desc]
  51. return io_list(parent=tx, data=[io(tx.proto, **d) for d in data])
  52. class MMGenTxFile(MMGenObject):
  53. data_label = 'MMGenTransaction'
  54. attrs = {
  55. 'chain': None,
  56. 'txid': MMGenTxID,
  57. 'send_amt': 'skip',
  58. 'timestamp': None,
  59. 'blockcount': None,
  60. 'serialized': None}
  61. extra_attrs = {
  62. 'locktime': None,
  63. 'comment': MMGenTxComment,
  64. 'coin_txid': CoinTxID,
  65. 'sent_timestamp': None,
  66. 'is_swap': None}
  67. def __init__(self, tx):
  68. self.tx = tx
  69. self.fmt_data = None
  70. self.filename = None
  71. def parse(self, infile, *, metadata_only=False, quiet_open=False):
  72. tx = self.tx
  73. from ..fileutil import get_data_from_file
  74. data = get_data_from_file(tx.cfg, infile, desc=f'{tx.desc} data', quiet=quiet_open)
  75. if len(data) > tx.cfg.max_tx_file_size:
  76. die('MaxFileSizeExceeded',
  77. f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)')
  78. return (self.parse_data_json if data[0] == '{' else self.parse_data_legacy)(data, metadata_only)
  79. def parse_data_json(self, data, metadata_only):
  80. tx = self.tx
  81. tx.file_format = 'json'
  82. outer_data = json.loads(data)
  83. if 'MoneroMMGenTX' in outer_data:
  84. tx.proto = get_monero_proto(tx, outer_data)
  85. return None
  86. data = outer_data[self.data_label]
  87. if outer_data['chksum'] != make_chksum_6(json_dumps(data)):
  88. chk = make_chksum_6(json_dumps(data))
  89. die(3, f'{self.data_label}: invalid checksum for TxID {data["txid"]} ({chk} != {outer_data["chksum"]})')
  90. tx.proto = get_proto_from_coin_id(tx, data['coin_id'], data['chain'])
  91. for k, v in self.attrs.items():
  92. if v != 'skip':
  93. setattr(tx, k, v(data[k]) if v else data[k])
  94. if metadata_only:
  95. return
  96. for k, v in self.extra_attrs.items():
  97. if k in data:
  98. setattr(tx, k, v(data[k]) if v else data[k])
  99. if tx.is_swap:
  100. for k, v in tx.swap_attrs.items():
  101. if k in data:
  102. setattr(tx, k, v(data[k]) if v else data[k])
  103. for k in ('inputs', 'outputs'):
  104. setattr(tx, k, eval_io_data(tx, data[k], desc=k))
  105. tx.check_txfile_hex_data()
  106. tx.parse_txfile_serialized_data() # Ethereum RLP or JSON data
  107. assert tx.proto.coin_amt(data['send_amt']) == tx.send_amt, f'{data["send_amt"]} != {tx.send_amt}'
  108. def parse_data_legacy(self, data, metadata_only):
  109. tx = self.tx
  110. tx.file_format = 'legacy'
  111. def deserialize(raw_data, *, desc):
  112. from ast import literal_eval
  113. try:
  114. return literal_eval(raw_data)
  115. except:
  116. if desc == 'inputs':
  117. ymsg('Warning: transaction data appears to be in old format')
  118. import re
  119. return literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)", r'\1', raw_data))
  120. desc = 'data'
  121. try:
  122. tx_data = data.splitlines()
  123. assert len(tx_data) >= 5, 'number of lines less than 5'
  124. assert len(tx_data[0]) == 6, 'invalid length of first line'
  125. assert HexStr(tx_data.pop(0)) == make_chksum_6(' '.join(tx_data)), 'file data does not match checksum'
  126. if len(tx_data) == 7:
  127. desc = 'sent timestamp'
  128. (_, tx.sent_timestamp) = tx_data.pop(-1).split()
  129. assert _ == 'Sent', 'invalid sent timestamp line'
  130. if len(tx_data) == 6:
  131. assert len(tx_data[-1]) == 64, 'invalid coin TxID length'
  132. desc = 'coin TxID'
  133. tx.coin_txid = CoinTxID(tx_data.pop(-1))
  134. if len(tx_data) == 5:
  135. # rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up))
  136. assert len(tx_data[-1]) < MMGenTxComment.max_len*6, 'invalid comment length'
  137. c = tx_data.pop(-1)
  138. if c != '-':
  139. desc = 'encoded comment (not base58)'
  140. from ..baseconv import baseconv
  141. comment = baseconv('b58').tobytes(c).decode()
  142. assert comment is not False, 'invalid comment'
  143. desc = 'comment'
  144. tx.comment = MMGenTxComment(comment)
  145. desc = 'number of lines' # four required lines
  146. io_data = {}
  147. (metadata, tx.serialized, io_data['inputs'], io_data['outputs']) = tx_data
  148. assert len(metadata) < 100, 'invalid metadata length' # rough check
  149. metadata = metadata.split()
  150. if metadata[-1].startswith('LT='):
  151. desc = 'locktime'
  152. tx.locktime = int(metadata.pop()[3:])
  153. desc = 'coin token in metadata'
  154. coin_id = metadata.pop(0) if len(metadata) == 6 else 'BTC'
  155. desc = 'chain token in metadata'
  156. tx.chain = metadata.pop(0).lower() if len(metadata) == 5 else 'mainnet'
  157. desc = 'coin_id or chain'
  158. tx.proto = get_proto_from_coin_id(tx, coin_id, tx.chain)
  159. desc = 'metadata (4 items)'
  160. (txid, send_amt, tx.timestamp, blockcount) = metadata
  161. desc = 'TxID in metadata'
  162. tx.txid = MMGenTxID(txid)
  163. desc = 'block count in metadata'
  164. tx.blockcount = int(blockcount)
  165. if metadata_only:
  166. return
  167. desc = 'transaction file hex data'
  168. tx.check_txfile_hex_data()
  169. desc = 'Ethereum RLP or JSON data'
  170. tx.parse_txfile_serialized_data()
  171. for k in ('inputs', 'outputs'):
  172. desc = f'{k} data'
  173. res = deserialize(io_data[k], desc=k)
  174. for d in res:
  175. if 'label' in d:
  176. d['comment'] = d['label']
  177. del d['label']
  178. setattr(tx, k, eval_io_data(tx, res, desc=k))
  179. desc = 'send amount in metadata'
  180. assert tx.proto.coin_amt(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}'
  181. except Exception as e:
  182. die(2, f'Invalid {desc} in transaction file: {e!s}')
  183. def make_filename(self):
  184. tx = self.tx
  185. def gen_filename():
  186. yield tx.txid
  187. if tx.coin != 'BTC':
  188. yield '-' + tx.dcoin
  189. yield f'[{tx.send_amt!s}'
  190. if tx.is_replaceable():
  191. yield ',{}'.format(tx.fee_abs2rel(tx.fee, to_unit=tx.fn_fee_unit))
  192. if tx.get_serialized_locktime():
  193. yield f',tl={tx.get_serialized_locktime()}'
  194. yield ']'
  195. if tx.proto.testnet:
  196. yield '.' + tx.proto.network
  197. yield '.' + tx.ext
  198. return ''.join(gen_filename())
  199. def format(self):
  200. tx = self.tx
  201. coin_id = tx.coin + ('' if tx.coin == tx.dcoin else ':'+tx.dcoin)
  202. def format_data_legacy():
  203. def amt_to_str(d):
  204. return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d}
  205. lines = [
  206. '{}{} {} {} {} {}{}'.format(
  207. (f'{coin_id} ' if coin_id and tx.coin != 'BTC' else ''),
  208. tx.chain.upper(),
  209. tx.txid,
  210. tx.send_amt,
  211. tx.timestamp,
  212. tx.blockcount,
  213. (f' LT={tx.locktime}' if tx.locktime else ''),
  214. ),
  215. tx.serialized,
  216. ascii([amt_to_str(e._asdict()) for e in tx.inputs]),
  217. ascii([amt_to_str(e._asdict()) for e in tx.outputs])
  218. ]
  219. if tx.comment:
  220. from ..baseconv import baseconv
  221. lines.append(baseconv('b58').frombytes(tx.comment.encode(), tostr=True))
  222. if tx.coin_txid:
  223. if not tx.comment:
  224. lines.append('-') # keep old tx files backwards compatible
  225. lines.append(tx.coin_txid)
  226. if tx.sent_timestamp:
  227. lines.append(f'Sent {tx.sent_timestamp}')
  228. return '\n'.join([make_chksum_6(' '.join(lines))] + lines) + '\n'
  229. def format_data_json():
  230. data = json_dumps({
  231. 'coin_id': coin_id
  232. } | {
  233. k: getattr(tx, k) for k in self.attrs
  234. } | {
  235. 'inputs': [e._asdict() for e in tx.inputs],
  236. 'outputs': [{k: v for k,v in e._asdict().items()
  237. if not (type(v) is bool and v is False)} for e in tx.outputs]
  238. } | {
  239. k: getattr(tx, k) for k in self.extra_attrs if getattr(tx, k)
  240. } | ({
  241. k: getattr(tx, k) for k in tx.swap_attrs if getattr(tx, k, None)
  242. } if tx.is_swap else {}))
  243. return '{{"{}":{},"chksum":"{}"}}'.format(self.data_label, data, make_chksum_6(data))
  244. fmt_data = {'json': format_data_json, 'legacy': format_data_legacy}[tx.file_format]()
  245. if len(fmt_data) > tx.cfg.max_tx_file_size:
  246. die('MaxFileSizeExceeded', f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)')
  247. return fmt_data
  248. def write(self, *,
  249. add_desc = '',
  250. outdir = None,
  251. ask_write = True,
  252. ask_write_default_yes = False,
  253. ask_tty = True,
  254. ask_overwrite = True):
  255. if ask_write is False:
  256. ask_write_default_yes = True
  257. if not self.filename:
  258. self.filename = self.make_filename()
  259. if not self.fmt_data:
  260. self.fmt_data = self.format()
  261. from ..fileutil import write_data_to_file
  262. write_data_to_file(
  263. cfg = self.tx.cfg,
  264. outfile = os.path.join((outdir or ''), self.filename),
  265. data = self.fmt_data,
  266. desc = self.tx.desc + add_desc,
  267. ask_overwrite = ask_overwrite,
  268. ask_write = ask_write,
  269. ask_tty = ask_tty,
  270. ask_write_default_yes = ask_write_default_yes,
  271. ignore_opt_outdir = outdir)
  272. @classmethod
  273. def get_proto(cls, cfg, filename, *, quiet_open=False):
  274. from . import BaseTX
  275. tmp_tx = BaseTX(cfg=cfg)
  276. cls(tmp_tx).parse(filename, metadata_only=True, quiet_open=quiet_open)
  277. return tmp_tx.proto