base.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.base: base transaction class
  12. """
  13. from ..cfg import gc
  14. from ..objmethods import MMGenObject
  15. from ..obj import (
  16. ImmutableAttr,
  17. ListItemAttr,
  18. MMGenListItem,
  19. MMGenTxComment,
  20. TwComment,
  21. CoinTxID,
  22. HexStr,
  23. NonNegativeInt
  24. )
  25. from ..amt import CoinAmtChk
  26. from ..addr import MMGenID, CoinAddr
  27. from ..util import msg, ymsg, fmt, remove_dups, make_timestamp, die, cached_property
  28. class MMGenTxIO(MMGenListItem):
  29. vout = ListItemAttr(NonNegativeInt)
  30. amt = ImmutableAttr(CoinAmtChk, include_proto=True)
  31. comment = ListItemAttr(TwComment, reassign_ok=True)
  32. mmid = ListItemAttr(MMGenID, include_proto=True)
  33. addr = ImmutableAttr(CoinAddr, include_proto=True)
  34. confs = ListItemAttr(int) # confs of type long exist in the wild, so convert
  35. txid = ListItemAttr(CoinTxID)
  36. have_wif = ListItemAttr(bool, typeconv=False, delete_ok=True)
  37. invalid_attrs = {'proto', 'tw_copy_attrs'}
  38. def __init__(self, proto, **kwargs):
  39. self.__dict__['proto'] = proto
  40. MMGenListItem.__init__(self, **kwargs)
  41. @property
  42. def mmtype(self):
  43. """
  44. Attempt to determine input or output’s MMGenAddrType. For non-MMGen
  45. addresses, infer the type from the address format, returning None for
  46. P2PKH, which could be either 'L' or 'C'.
  47. """
  48. return (
  49. str(self.mmid.mmtype) if self.mmid else
  50. 'B' if self.addr.addr_fmt == 'bech32' else
  51. 'S' if self.addr.addr_fmt == 'p2sh' else
  52. None
  53. ) if self.addr else None
  54. class MMGenTxIOList(list, MMGenObject):
  55. def __init__(self, parent, data=None):
  56. self.parent = parent
  57. if data:
  58. assert isinstance(data, list), 'MMGenTxIOList_check1'
  59. else:
  60. data = []
  61. list.__init__(self, data)
  62. class Base(MMGenObject):
  63. desc = 'transaction'
  64. comment = None
  65. txid = None
  66. coin_txid = None
  67. timestamp = None
  68. sent_timestamp = None
  69. blockcount = None
  70. locktime = None
  71. chain = None
  72. signed = False
  73. is_bump = False
  74. is_swap = False
  75. swap_attrs = {
  76. 'swap_proto': None,
  77. 'swap_quote_expiry': None,
  78. 'swap_recv_addr_mmid': None,
  79. 'swap_recv_asset_spec': None,
  80. 'swap_memo': None,
  81. 'token_vault_addr': None,
  82. 'serialized2': None,
  83. 'coin_txid2': CoinTxID}
  84. file_format = 'json'
  85. non_mmgen_inputs_msg = f"""
  86. This transaction includes inputs with non-{gc.proj_name} addresses. When
  87. signing the transaction, private keys for the addresses listed below must
  88. be supplied using the --keys-from-file option. The key file must contain
  89. one key per line.
  90. Non-{gc.proj_name} addresses found in inputs:
  91. {{}}
  92. """
  93. class Input(MMGenTxIO):
  94. scriptPubKey = ListItemAttr(HexStr)
  95. sequence = ListItemAttr(int, typeconv=False)
  96. tw_copy_attrs = {'scriptPubKey', 'vout', 'amt', 'comment', 'mmid', 'addr', 'confs', 'txid'}
  97. class Output(MMGenTxIO):
  98. addr = ListItemAttr(CoinAddr, include_proto=True) # ImmutableAttr in parent cls
  99. is_chg = ListItemAttr(bool, typeconv=False)
  100. is_vault = ListItemAttr(bool, typeconv=False)
  101. data = ListItemAttr(None, typeconv=False) # placeholder
  102. class InputList(MMGenTxIOList):
  103. desc = 'transaction inputs'
  104. class OutputList(MMGenTxIOList):
  105. desc = 'transaction outputs'
  106. def __init__(self, *args, **kwargs):
  107. self.cfg = kwargs['cfg']
  108. self.inputs = self.InputList(self)
  109. self.outputs = self.OutputList(self)
  110. self.name = type(self).__name__
  111. self.proto = kwargs['proto']
  112. self.twctl = kwargs.get('twctl')
  113. self.is_token = 'Token' in self.name
  114. @property
  115. def coin(self):
  116. return self.proto.coin
  117. @property
  118. def dcoin(self):
  119. return self.proto.dcoin
  120. @property
  121. def info(self):
  122. from .info import init_info
  123. return init_info(self.cfg, self)
  124. def check_correct_chain(self):
  125. if hasattr(self, 'rpc'):
  126. if self.chain != self.rpc.chain:
  127. die('TransactionChainMismatch',
  128. f'Transaction is for {self.chain}, but coin daemon chain is {self.rpc.chain}!')
  129. def sum_inputs(self):
  130. return sum(e.amt for e in self.inputs)
  131. def sum_outputs(self, *, exclude=None):
  132. if exclude is None:
  133. olist = self.outputs
  134. else:
  135. olist = self.outputs[:exclude] + self.outputs[exclude+1:]
  136. if not olist:
  137. return self.proto.coin_amt('0')
  138. return sum(e.amt for e in olist)
  139. def _chg_output_ops(self, op, attr):
  140. is_chgs = [getattr(x, attr) for x in self.outputs]
  141. if is_chgs.count(True) == 1:
  142. return is_chgs.index(True) if op == 'idx' else self.outputs[is_chgs.index(True)]
  143. elif is_chgs.count(True) == 0:
  144. return None
  145. else:
  146. raise ValueError('more than one change output!')
  147. @property
  148. def chg_idx(self):
  149. return self._chg_output_ops('idx', 'is_chg')
  150. @property
  151. def chg_output(self):
  152. return self._chg_output_ops('output', 'is_chg')
  153. def add_timestamp(self):
  154. self.timestamp = make_timestamp()
  155. def add_sent_timestamp(self):
  156. self.sent_timestamp = make_timestamp()
  157. def add_blockcount(self):
  158. self.blockcount = self.rpc.blockcount
  159. # returns True if comment added or changed, False otherwise
  160. def add_comment(self, *, infile=None):
  161. if infile:
  162. from ..fileutil import get_data_from_file
  163. self.comment = MMGenTxComment(
  164. get_data_from_file(self.cfg, infile, desc='transaction comment'))
  165. else:
  166. from ..ui import keypress_confirm, line_input
  167. if keypress_confirm(
  168. self.cfg,
  169. prompt = 'Edit transaction comment?' if self.comment else 'Add a comment to transaction?',
  170. default_yes = False):
  171. res = MMGenTxComment(line_input(self.cfg, 'Comment: ', insert_txt=self.comment))
  172. if not res:
  173. ymsg('Warning: comment is empty')
  174. changed = res != self.comment
  175. self.comment = res
  176. return changed
  177. else:
  178. return False
  179. def get_non_mmaddrs(self, desc):
  180. return remove_dups(
  181. (i.addr for i in getattr(self, desc) if not i.mmid),
  182. edesc = 'non-MMGen address',
  183. quiet = True)
  184. def check_non_mmgen_inputs(self, *, caller, non_mmaddrs=None):
  185. assert caller in ('txcreate', 'txdo', 'txsign', 'autosign')
  186. non_mmaddrs = non_mmaddrs or self.get_non_mmaddrs('inputs')
  187. if non_mmaddrs:
  188. indent = ' '
  189. fs = fmt(self.non_mmgen_inputs_msg, strip_char='\t', indent=indent).strip()
  190. m = fs.format('\n '.join(non_mmaddrs))
  191. if caller in ('txdo', 'txsign'):
  192. if not self.cfg.keys_from_file:
  193. die('UserOptError', f'\n{indent}ERROR: {m}\n')
  194. else:
  195. msg(f'\n{indent}WARNING: {m}\n')
  196. if not (caller == 'autosign' or self.cfg.yes):
  197. from ..ui import keypress_confirm
  198. keypress_confirm(self.cfg, 'Continue?', default_yes=True, do_exit=True)
  199. # swap methods:
  200. @cached_property
  201. def swap_proto_mod(self):
  202. from .new_swap import get_swap_proto_mod
  203. return get_swap_proto_mod(self.swap_proto)
  204. @cached_property
  205. def send_asset(self):
  206. spec = self.proto.coin + (f'.{self.proto.tokensym}' if self.proto.tokensym else '')
  207. return self.swap_proto_mod.SwapAsset(spec, 'send')
  208. @cached_property
  209. def recv_asset(self):
  210. if hasattr(self, 'swap_recv_asset_spec'):
  211. return self.swap_proto_mod.SwapAsset(self.swap_recv_asset_spec, 'recv')
  212. else: # backwards-compatibility workaround
  213. from ..swap.asset import SwapAsset
  214. x = '[unknown]'
  215. return SwapAsset._ad(x, x, x, x, x)
  216. # token methods:
  217. @property
  218. def token_op(self):
  219. return 'approve' if self.is_swap else 'transfer'