base.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.base: base transaction class
  12. """
  13. from ..cfg import gc
  14. from ..objmethods import MMGenObject
  15. from ..obj import (
  16. ImmutableAttr,
  17. ListItemAttr,
  18. MMGenListItem,
  19. MMGenTxComment,
  20. TwComment,
  21. CoinTxID,
  22. HexStr,
  23. NonNegativeInt
  24. )
  25. from ..amt import CoinAmtChk
  26. from ..addr import MMGenID, CoinAddr
  27. from ..util import msg, ymsg, fmt, remove_dups, make_timestamp, die, cached_property
  28. class MMGenTxIO(MMGenListItem):
  29. vout = ListItemAttr(NonNegativeInt)
  30. amt = ImmutableAttr(CoinAmtChk, include_proto=True)
  31. comment = ListItemAttr(TwComment, reassign_ok=True)
  32. mmid = ListItemAttr(MMGenID, include_proto=True)
  33. addr = ImmutableAttr(CoinAddr, include_proto=True)
  34. confs = ListItemAttr(int) # confs of type long exist in the wild, so convert
  35. txid = ListItemAttr(CoinTxID)
  36. have_wif = ListItemAttr(bool, typeconv=False, delete_ok=True)
  37. invalid_attrs = {'proto', 'tw_copy_attrs'}
  38. def __init__(self, proto, **kwargs):
  39. self.__dict__['proto'] = proto
  40. MMGenListItem.__init__(self, **kwargs)
  41. @property
  42. def mmtype(self):
  43. """
  44. Attempt to determine input or output’s MMGenAddrType. For non-MMGen
  45. addresses, infer the type from the address format, returning None for
  46. P2PKH, which could be either 'L' or 'C'.
  47. """
  48. return (
  49. str(self.mmid.mmtype) if self.mmid else
  50. 'B' if self.addr.addr_fmt == 'bech32' else
  51. 'S' if self.addr.addr_fmt == 'p2sh' else
  52. None
  53. ) if self.addr else None
  54. class MMGenTxIOList(list, MMGenObject):
  55. def __init__(self, parent, data=None):
  56. self.parent = parent
  57. if data:
  58. assert isinstance(data, list), 'MMGenTxIOList_check1'
  59. else:
  60. data = []
  61. list.__init__(self, data)
  62. class Base(MMGenObject):
  63. desc = 'transaction'
  64. comment = None
  65. txid = None
  66. coin_txid = None
  67. timestamp = None
  68. sent_timestamp = None
  69. blockcount = None
  70. locktime = None
  71. chain = None
  72. signed = False
  73. is_bump = False
  74. is_swap = False
  75. is_compat = False
  76. has_comment = True
  77. swap_attrs = {
  78. 'swap_proto': None,
  79. 'swap_quote_expiry': None,
  80. 'swap_recv_addr_mmid': None,
  81. 'swap_recv_asset_spec': None,
  82. 'swap_memo': None,
  83. 'token_vault_addr': None,
  84. 'serialized2': None,
  85. 'coin_txid2': CoinTxID}
  86. file_format = 'json'
  87. non_mmgen_inputs_msg = f"""
  88. This transaction includes inputs with non-{gc.proj_name} addresses. When
  89. signing the transaction, private keys for the addresses listed below must
  90. be supplied using the --keys-from-file option. The key file must contain
  91. one key per line.
  92. Non-{gc.proj_name} addresses found in inputs:
  93. {{}}
  94. """
  95. class Input(MMGenTxIO):
  96. scriptPubKey = ListItemAttr(HexStr)
  97. sequence = ListItemAttr(int, typeconv=False)
  98. tw_copy_attrs = {'scriptPubKey', 'vout', 'amt', 'comment', 'mmid', 'addr', 'confs', 'txid'}
  99. class Output(MMGenTxIO):
  100. addr = ListItemAttr(CoinAddr, include_proto=True) # ImmutableAttr in parent cls
  101. is_chg = ListItemAttr(bool, typeconv=False)
  102. is_vault = ListItemAttr(bool, typeconv=False)
  103. data = ListItemAttr(None, typeconv=False) # placeholder
  104. class InputList(MMGenTxIOList):
  105. desc = 'transaction inputs'
  106. class OutputList(MMGenTxIOList):
  107. desc = 'transaction outputs'
  108. def __init__(self, *args, **kwargs):
  109. self.cfg = kwargs['cfg']
  110. self.inputs = self.InputList(self)
  111. self.outputs = self.OutputList(self)
  112. self.name = type(self).__name__
  113. self.proto = kwargs['proto']
  114. self.twctl = kwargs.get('twctl')
  115. self.is_token = 'Token' in self.name
  116. @property
  117. def coin(self):
  118. return self.proto.coin
  119. @property
  120. def dcoin(self):
  121. return self.proto.dcoin
  122. @property
  123. def info(self):
  124. from .info import init_info
  125. return init_info(self.cfg, self)
  126. def check_correct_chain(self):
  127. if hasattr(self, 'rpc'):
  128. if self.chain != self.rpc.chain:
  129. die('TransactionChainMismatch',
  130. f'Transaction is for {self.chain}, but coin daemon chain is {self.rpc.chain}!')
  131. def sum_inputs(self):
  132. return sum(e.amt for e in self.inputs)
  133. def sum_outputs(self, *, exclude=None):
  134. if exclude is None:
  135. olist = self.outputs
  136. else:
  137. olist = self.outputs[:exclude] + self.outputs[exclude+1:]
  138. if not olist:
  139. return self.proto.coin_amt('0')
  140. return sum(e.amt for e in olist)
  141. def _chg_output_ops(self, op, attr):
  142. is_chgs = [getattr(x, attr) for x in self.outputs]
  143. if is_chgs.count(True) == 1:
  144. return is_chgs.index(True) if op == 'idx' else self.outputs[is_chgs.index(True)]
  145. elif is_chgs.count(True) == 0:
  146. return None
  147. else:
  148. raise ValueError('more than one change output!')
  149. @property
  150. def chg_idx(self):
  151. return self._chg_output_ops('idx', 'is_chg')
  152. @property
  153. def chg_output(self):
  154. return self._chg_output_ops('output', 'is_chg')
  155. def add_timestamp(self):
  156. self.timestamp = make_timestamp()
  157. def add_sent_timestamp(self):
  158. self.sent_timestamp = make_timestamp()
  159. def add_blockcount(self):
  160. self.blockcount = self.rpc.blockcount
  161. # returns True if comment added or changed, False otherwise
  162. def add_comment(self, *, infile=None):
  163. if infile:
  164. from ..fileutil import get_data_from_file
  165. self.comment = MMGenTxComment(
  166. get_data_from_file(self.cfg, infile, desc='transaction comment'))
  167. else:
  168. from ..ui import keypress_confirm, line_input
  169. if keypress_confirm(
  170. self.cfg,
  171. prompt = 'Edit transaction comment?' if self.comment else 'Add a comment to transaction?',
  172. default_yes = False):
  173. res = MMGenTxComment(line_input(self.cfg, 'Comment: ', insert_txt=self.comment))
  174. if not res:
  175. ymsg('Warning: comment is empty')
  176. changed = res != self.comment
  177. self.comment = res
  178. return changed
  179. else:
  180. return False
  181. def get_non_mmaddrs(self, desc):
  182. return remove_dups(
  183. (i.addr for i in getattr(self, desc) if not i.mmid),
  184. edesc = 'non-MMGen address',
  185. quiet = True)
  186. def check_non_mmgen_inputs(self, *, caller, non_mmaddrs=None):
  187. assert caller in ('txcreate', 'txdo', 'txsign', 'autosign')
  188. non_mmaddrs = non_mmaddrs or self.get_non_mmaddrs('inputs')
  189. if non_mmaddrs:
  190. indent = ' '
  191. fs = fmt(self.non_mmgen_inputs_msg, strip_char='\t', indent=indent).strip()
  192. m = fs.format('\n '.join(non_mmaddrs))
  193. if caller in ('txdo', 'txsign'):
  194. if not self.cfg.keys_from_file:
  195. die('UserOptError', f'\n{indent}ERROR: {m}\n')
  196. else:
  197. msg(f'\n{indent}WARNING: {m}\n')
  198. if not (caller == 'autosign' or self.cfg.yes):
  199. from ..ui import keypress_confirm
  200. keypress_confirm(self.cfg, 'Continue?', default_yes=True, do_exit=True)
  201. # swap methods:
  202. @cached_property
  203. def swap_proto_mod(self):
  204. from .new_swap import get_swap_proto_mod
  205. return get_swap_proto_mod(self.swap_proto)
  206. @cached_property
  207. def send_asset(self):
  208. spec = self.proto.coin + (f'.{self.proto.tokensym}' if self.proto.tokensym else '')
  209. return self.swap_proto_mod.SwapAsset(spec, 'send')
  210. @cached_property
  211. def recv_asset(self):
  212. if hasattr(self, 'swap_recv_asset_spec'):
  213. return self.swap_proto_mod.SwapAsset(self.swap_recv_asset_spec, 'recv')
  214. else: # backwards-compatibility workaround
  215. from ..swap.asset import SwapAsset
  216. x = '[unknown]'
  217. return SwapAsset._ad(x, x, x, x, x)
  218. # token methods:
  219. @property
  220. def token_op(self):
  221. return 'approve' if self.is_swap else 'transfer'