file.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
# Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
tx.file: Transaction file operations for the MMGen suite
"""
  21. import os
  22. from ..util import ymsg,make_chksum_6,die
  23. from ..obj import MMGenObject,HexStr,MMGenTxID,CoinTxID,MMGenTxComment
  24. class MMGenTxFile(MMGenObject):
  25. def __init__(self,tx):
  26. self.tx = tx
  27. self.chksum = None
  28. self.fmt_data = None
  29. self.filename = None
  30. def parse(self,infile,metadata_only=False,quiet_open=False):
  31. tx = self.tx
  32. def eval_io_data(raw_data,desc):
  33. from ast import literal_eval
  34. try:
  35. d = literal_eval(raw_data)
  36. except:
  37. if desc == 'inputs' and not quiet_open:
  38. ymsg('Warning: transaction data appears to be in old format')
  39. import re
  40. d = literal_eval(re.sub(r"[A-Za-z]+?\(('.+?')\)",r'\1',raw_data))
  41. assert isinstance(d,list), f'{desc} data not a list!'
  42. if not (desc == 'outputs' and tx.proto.base_coin == 'ETH'): # ETH txs can have no outputs
  43. assert len(d), f'no {desc}!'
  44. for e in d:
  45. e['amt'] = tx.proto.coin_amt(e['amt'])
  46. if 'label' in e:
  47. e['comment'] = e['label']
  48. del e['label']
  49. io,io_list = {
  50. 'inputs': ( tx.Input, tx.InputList ),
  51. 'outputs': ( tx.Output, tx.OutputList ),
  52. }[desc]
  53. return io_list( parent=tx, data=[io(tx.proto,**e) for e in d] )
  54. from ..fileutil import get_data_from_file
  55. tx_data = get_data_from_file( tx.cfg, infile, tx.desc+' data', quiet=quiet_open )
  56. desc = 'data'
  57. try:
  58. if len(tx_data) > tx.cfg.max_tx_file_size:
  59. die('MaxFileSizeExceeded',
  60. f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)')
  61. tx_data = tx_data.splitlines()
  62. assert len(tx_data) >= 5,'number of lines less than 5'
  63. assert len(tx_data[0]) == 6,'invalid length of first line'
  64. self.chksum = HexStr(tx_data.pop(0))
  65. assert self.chksum == make_chksum_6(' '.join(tx_data)),'file data does not match checksum'
  66. if len(tx_data) == 7:
  67. desc = 'sent timestamp'
  68. (_, tx.sent_timestamp) = tx_data.pop(-1).split()
  69. assert _ == 'Sent', 'invalid sent timestamp line'
  70. if len(tx_data) == 6:
  71. assert len(tx_data[-1]) == 64,'invalid coin TxID length'
  72. desc = 'coin TxID'
  73. tx.coin_txid = CoinTxID(tx_data.pop(-1))
  74. if len(tx_data) == 5:
  75. # rough check: allow for 4-byte utf8 characters + base58 (4 * 11 / 8 = 6 (rounded up))
  76. assert len(tx_data[-1]) < MMGenTxComment.max_len*6,'invalid comment length'
  77. c = tx_data.pop(-1)
  78. if c != '-':
  79. desc = 'encoded comment (not base58)'
  80. from ..baseconv import baseconv
  81. comment = baseconv('b58').tobytes(c).decode()
  82. assert comment is not False,'invalid comment'
  83. desc = 'comment'
  84. tx.comment = MMGenTxComment(comment)
  85. desc = 'number of lines' # four required lines
  86. ( metadata, tx.serialized, inputs_data, outputs_data ) = tx_data
  87. assert len(metadata) < 100,'invalid metadata length' # rough check
  88. metadata = metadata.split()
  89. if metadata[-1].startswith('LT='):
  90. desc = 'locktime'
  91. tx.locktime = int(metadata.pop()[3:])
  92. desc = 'coin token in metadata'
  93. coin = metadata.pop(0) if len(metadata) == 6 else 'BTC'
  94. coin,tokensym = coin.split(':') if ':' in coin else (coin,None)
  95. desc = 'chain token in metadata'
  96. tx.chain = metadata.pop(0).lower() if len(metadata) == 5 else 'mainnet'
  97. from ..protocol import CoinProtocol,init_proto
  98. network = CoinProtocol.Base.chain_name_to_network(tx.cfg,coin,tx.chain)
  99. desc = 'initialization of protocol'
  100. tx.proto = init_proto( tx.cfg, coin, network=network, need_amt=True )
  101. if tokensym:
  102. tx.proto.tokensym = tokensym
  103. desc = 'metadata (4 items)'
  104. (txid, send_amt, tx.timestamp, blockcount) = metadata
  105. desc = 'TxID in metadata'
  106. tx.txid = MMGenTxID(txid)
  107. desc = 'block count in metadata'
  108. tx.blockcount = int(blockcount)
  109. if metadata_only:
  110. return
  111. desc = 'transaction file hex data'
  112. tx.check_txfile_hex_data()
  113. desc = 'Ethereum RLP or JSON data'
  114. tx.parse_txfile_serialized_data()
  115. desc = 'inputs data'
  116. tx.inputs = eval_io_data(inputs_data,'inputs')
  117. desc = 'outputs data'
  118. tx.outputs = eval_io_data(outputs_data,'outputs')
  119. desc = 'send amount in metadata'
  120. from decimal import Decimal
  121. assert Decimal(send_amt) == tx.send_amt, f'{send_amt} != {tx.send_amt}'
  122. except Exception as e:
  123. die(2,f'Invalid {desc} in transaction file: {e!s}')
  124. def make_filename(self):
  125. tx = self.tx
  126. def gen_filename():
  127. yield tx.txid
  128. if tx.coin != 'BTC':
  129. yield '-' + tx.dcoin
  130. yield f'[{tx.send_amt!s}'
  131. if tx.is_replaceable():
  132. yield ',{}'.format(tx.fee_abs2rel(tx.fee,to_unit=tx.fn_fee_unit))
  133. if tx.get_serialized_locktime():
  134. yield f',tl={tx.get_serialized_locktime()}'
  135. yield ']'
  136. if tx.proto.testnet:
  137. yield '.' + tx.proto.network
  138. yield '.' + tx.ext
  139. return ''.join(gen_filename())
  140. def format(self):
  141. tx = self.tx
  142. def amt_to_str(d):
  143. return {k: (str(d[k]) if k == 'amt' else d[k]) for k in d}
  144. coin_id = '' if tx.coin == 'BTC' else tx.coin + ('' if tx.coin == tx.dcoin else ':'+tx.dcoin)
  145. lines = [
  146. '{}{} {} {} {} {}{}'.format(
  147. (coin_id+' ' if coin_id else ''),
  148. tx.chain.upper(),
  149. tx.txid,
  150. tx.send_amt,
  151. tx.timestamp,
  152. tx.blockcount,
  153. (f' LT={tx.locktime}' if tx.locktime else ''),
  154. ),
  155. tx.serialized,
  156. ascii([amt_to_str(e._asdict()) for e in tx.inputs]),
  157. ascii([amt_to_str(e._asdict()) for e in tx.outputs])
  158. ]
  159. if tx.comment:
  160. from ..baseconv import baseconv
  161. lines.append(baseconv('b58').frombytes(tx.comment.encode(),tostr=True))
  162. if tx.coin_txid:
  163. if not tx.comment:
  164. lines.append('-') # keep old tx files backwards compatible
  165. lines.append(tx.coin_txid)
  166. if tx.sent_timestamp:
  167. lines.append(f'Sent {tx.sent_timestamp}')
  168. self.chksum = make_chksum_6(' '.join(lines))
  169. fmt_data = '\n'.join([self.chksum] + lines) + '\n'
  170. if len(fmt_data) > tx.cfg.max_tx_file_size:
  171. die( 'MaxFileSizeExceeded', f'Transaction file size exceeds limit ({tx.cfg.max_tx_file_size} bytes)' )
  172. return fmt_data
  173. def write(self,
  174. add_desc = '',
  175. outdir = None,
  176. ask_write = True,
  177. ask_write_default_yes = False,
  178. ask_tty = True,
  179. ask_overwrite = True ):
  180. if ask_write is False:
  181. ask_write_default_yes = True
  182. if not self.filename:
  183. self.filename = self.make_filename()
  184. if not self.fmt_data:
  185. self.fmt_data = self.format()
  186. from ..fileutil import write_data_to_file
  187. write_data_to_file(
  188. cfg = self.tx.cfg,
  189. outfile = os.path.join((outdir or ''), self.filename),
  190. data = self.fmt_data,
  191. desc = self.tx.desc + add_desc,
  192. ask_overwrite = ask_overwrite,
  193. ask_write = ask_write,
  194. ask_tty = ask_tty,
  195. ask_write_default_yes = ask_write_default_yes,
  196. ignore_opt_outdir = outdir)
  197. @classmethod
  198. def get_proto(cls,cfg,filename,quiet_open=False):
  199. from . import BaseTX
  200. tmp_tx = BaseTX(cfg=cfg)
  201. cls(tmp_tx).parse(filename,metadata_only=True,quiet_open=quiet_open)
  202. return tmp_tx.proto