new.py 18 KB


  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2026 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.new: new transaction class
  12. """
  13. from collections import namedtuple
  14. from .base import Base
  15. from ..cfg import gc
  16. from ..color import pink, yellow
  17. from ..obj import get_obj, MMGenList
  18. from ..util import msg, fmt, die, suf, remove_dups, get_extension
  19. from ..addr import (
  20. is_mmgen_id,
  21. MMGenAddrType,
  22. MMGenID,
  23. CoinAddr,
  24. is_mmgen_addrtype,
  25. is_coin_addr,
  26. is_addrlist_id
  27. )
  28. def mmaddr2coinaddr(cfg, mmaddr, ad_w, ad_f, proto):
  29. def wmsg(k):
  30. messages = {
  31. 'addr_in_addrfile_only': f"""
  32. Warning: output address {mmaddr} is not in the tracking wallet, which
  33. means its balance will not be tracked. You're strongly advised to import
  34. the address into your tracking wallet before broadcasting this transaction.
  35. """,
  36. 'addr_not_found': f"""
  37. No data for {gc.proj_name} address {mmaddr} could be found in either the
  38. tracking wallet or the supplied address file. Please import this address
  39. into your tracking wallet, or supply an address file on the command line.
  40. """,
  41. 'addr_not_found_no_addrfile': f"""
  42. No data for {gc.proj_name} address {mmaddr} could be found in the tracking
  43. wallet. Please import this address into your tracking wallet or supply an
  44. address file for it on the command line.
  45. """}
  46. return '\n' + fmt(messages[k], indent=' ')
  47. # assume mmaddr has already been checked
  48. coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
  49. if not coin_addr:
  50. if ad_f:
  51. coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
  52. if coin_addr:
  53. msg(wmsg('addr_in_addrfile_only'))
  54. from ..ui import keypress_confirm
  55. if not (cfg.yes or keypress_confirm(cfg, 'Continue anyway?')):
  56. import sys
  57. sys.exit(1)
  58. else:
  59. die(2, wmsg('addr_not_found'))
  60. else:
  61. die(2, wmsg('addr_not_found_no_addrfile'))
  62. return CoinAddr(proto, coin_addr)
  63. def parse_fee_spec(proto, fee_arg):
  64. import re
  65. units = {u[0]: u for u in proto.coin_amt.units}
  66. pat = re.compile(r'((?:[1-9][0-9]*)|(?:[0-9]+\.[0-9]+))({})'.format('|'.join(units)))
  67. if m := pat.match(fee_arg):
  68. return namedtuple('parsed_fee_spec', ['amt', 'unit'])(m[1], units[m[2]])
  69. class New(Base):
  70. fee_is_approximate = False
  71. is_sweep = False
  72. msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'
  73. msg_no_change_output = """
  74. ERROR: No change address specified. If you wish to create a transaction with
  75. only one output, specify a single output address with no {} amount
  76. """
  77. chg_autoselected = False
  78. _funds_available = namedtuple('funds_available', ['is_positive', 'amt'])
  79. _net_fee = namedtuple('network_fee_estimate', ['fee', 'type'])
  80. def warn_insufficient_funds(self, amt, coin):
  81. msg(self.msg_insufficient_funds.format(amt.hl(), coin))
  82. def update_output_amt(self, idx, amt):
  83. o = self.outputs[idx]._asdict()
  84. o['amt'] = amt
  85. self.outputs[idx] = self.Output(self.proto, **o)
  86. def add_mmaddrs_to_outputs(self, ad_f, ad_w):
  87. a = [e.addr for e in self.outputs]
  88. d = ad_w.make_reverse_dict(a)
  89. if ad_f:
  90. d.update(ad_f.make_reverse_dict(a))
  91. for e in self.outputs:
  92. if e.addr and e.addr in d:
  93. e.mmid, f = d[e.addr]
  94. if f:
  95. e.comment = f
  96. def check_dup_addrs(self, io_desc):
  97. assert io_desc in ('inputs', 'outputs')
  98. addrs = [e.addr for e in getattr(self, io_desc) if e.addr]
  99. if len(addrs) != len(set(addrs)):
  100. die(2, f'{addrs}: duplicate address in transaction {io_desc}')
  101. # given tx size and absolute fee or fee spec, return absolute fee
  102. # relative fee is N+<first letter of unit name>
  103. def feespec2abs(self, fee_arg, tx_size):
  104. if type(fee_arg) is self.proto.coin_amt:
  105. return fee_arg
  106. if fee := get_obj(self.proto.coin_amt, num=fee_arg, silent=True):
  107. return fee
  108. if res := parse_fee_spec(self.proto, fee_arg):
  109. return self.fee_rel2abs(tx_size, float(res.amt), res.unit)
  110. return False
  111. def get_usr_fee_interactive(self, fee=None, *, desc='Starting'):
  112. abs_fee = None
  113. from ..ui import line_input
  114. while True:
  115. if fee:
  116. abs_fee = self.convert_and_check_fee(fee, desc)
  117. if abs_fee:
  118. if self.is_bump and not self.check_bumped_fee_ok(abs_fee):
  119. pass
  120. else:
  121. prompt = '{a} TX fee{b}: {c}{d} {e} ({f} {g})\n'.format(
  122. a = desc,
  123. b = (f' (after {self.cfg.fee_adjust:.2f}X adjustment)'
  124. if self.cfg.fee_adjust != 1 and desc.startswith('Network-estimated')
  125. else ''),
  126. c = ('', '≈')[self.fee_is_approximate],
  127. d = abs_fee.hl(),
  128. e = self.coin,
  129. f = pink(self.fee_abs2rel(abs_fee)),
  130. g = self.rel_fee_disp)
  131. from ..ui import keypress_confirm
  132. if self.cfg.yes or keypress_confirm(self.cfg, prompt+'OK?', default_yes=True):
  133. if self.cfg.yes:
  134. msg(prompt)
  135. return abs_fee
  136. fee = line_input(self.cfg, self.usr_fee_prompt)
  137. desc = 'User-selected'
  138. # we don't know fee yet, so perform preliminary check with fee == 0
  139. async def precheck_sufficient_funds(self, inputs_sum, sel_unspent, outputs_sum):
  140. if self.twuo.total < outputs_sum:
  141. msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum, self.dcoin))
  142. return False
  143. if inputs_sum < outputs_sum:
  144. self.warn_insufficient_funds(outputs_sum - inputs_sum, self.dcoin)
  145. return False
  146. return True
  147. def add_output(self, coinaddr, amt, *, is_chg=False, is_vault=False, data=None):
  148. self.outputs.append(
  149. self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg, is_vault=is_vault, data=data))
  150. def process_data_output_arg(self, arg):
  151. return None
  152. def parse_cmdline_arg(self, proto, arg_in, ad_f, ad_w):
  153. _pa = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data', 'is_vault'])
  154. if data := self.process_data_output_arg(arg_in):
  155. return _pa(arg_in, None, None, None, data, False)
  156. arg, amt = arg_in.split(',', 1) if ',' in arg_in else (arg_in, None)
  157. coin_addr, mmid, is_vault = (None, None, False)
  158. if arg == 'vault' and self.is_swap:
  159. is_vault = True
  160. elif mmid := get_obj(MMGenID, proto=proto, id_str=arg, silent=True):
  161. coin_addr = mmaddr2coinaddr(self.cfg, arg, ad_w, ad_f, proto)
  162. elif is_coin_addr(proto, arg):
  163. coin_addr = CoinAddr(proto, arg)
  164. elif is_mmgen_addrtype(proto, arg) or is_addrlist_id(proto, arg):
  165. if proto.base_proto_coin != 'BTC':
  166. die(2, f'Change addresses not supported for {proto.name} protocol')
  167. self.chg_autoselected = True
  168. else:
  169. die(2, f'{arg_in}: invalid command-line argument')
  170. return _pa(arg, mmid, coin_addr, amt, None, is_vault)
  171. async def get_autochg_addr(self, proto, arg, *, exclude, desc, all_addrtypes=False):
  172. from ..tw.addresses import TwAddresses
  173. al = await TwAddresses(self.cfg, proto, get_data=True)
  174. if all_addrtypes:
  175. res = al.get_change_address_by_addrtype(None, exclude=exclude, desc=desc)
  176. req_desc = 'of any allowed address type'
  177. elif obj := get_obj(MMGenAddrType, proto=proto, id_str=arg, silent=True):
  178. res = al.get_change_address_by_addrtype(obj, exclude=exclude, desc=desc)
  179. req_desc = f'of address type {arg!r}'
  180. else:
  181. res = al.get_change_address(arg, exclude=exclude, desc=desc)
  182. req_desc = f'from address list {arg!r}'
  183. if res:
  184. return res
  185. die(2, 'Tracking wallet contains no {t}addresses {d}'.format(
  186. t = '' if res is None else 'unused ',
  187. d = req_desc))
  188. async def process_cmdline_args(self, cmd_args, ad_f, ad_w):
  189. parsed_args = [self.parse_cmdline_arg(self.proto, arg, ad_f, ad_w) for arg in cmd_args]
  190. chg_args = [a for a in parsed_args if not (a.amt or a.data)]
  191. if len(chg_args) > 1:
  192. desc = 'requested' if self.chg_autoselected else 'listed'
  193. die(2, f'ERROR: More than one change address {desc} on command line')
  194. for a in parsed_args:
  195. if a.data:
  196. self.add_output(None, self.proto.coin_amt('0'), data=a.data)
  197. else:
  198. self.add_output(
  199. coinaddr = None if a.is_vault else a.addr or (
  200. await self.get_autochg_addr(
  201. self.proto,
  202. a.arg,
  203. exclude = [a.mmid for a in parsed_args if a.mmid],
  204. desc = 'change address')).addr,
  205. amt = self.proto.coin_amt(a.amt or '0'),
  206. is_chg = not a.amt,
  207. is_vault = a.is_vault)
  208. if self.is_compat:
  209. return
  210. if self.chg_idx is None:
  211. die(2,
  212. fmt(self.msg_no_change_output.format(self.dcoin)).strip()
  213. if len(self.outputs) == 1 else
  214. 'ERROR: No change output specified')
  215. if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
  216. die(2,
  217. f'{gc.proj_name} Segwit address requested on the command line, '
  218. 'but Segwit is not active on this chain')
  219. if not self.nondata_outputs:
  220. die(2, 'At least one spending output must be specified on the command line')
  221. self.add_mmaddrs_to_outputs(ad_f, ad_w)
  222. self.check_dup_addrs('outputs')
  223. if self.chg_output is not None:
  224. if self.chg_autoselected and not self.is_swap: # swap TX, so user has already confirmed
  225. self.confirm_autoselected_addr(self.chg_output.mmid, 'change address')
  226. elif len(self.nondata_outputs) > 1:
  227. await self.warn_addr_used(self.proto, self.chg_output, 'change address')
  228. def get_addrfiles_from_cmdline(self, cmd_args):
  229. from ..addrfile import AddrFile
  230. addrfile_args = remove_dups(
  231. tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
  232. desc = 'command line',
  233. edesc = 'argument',
  234. )
  235. cmd_args = tuple(a for a in cmd_args if a not in addrfile_args)
  236. if not self.is_swap:
  237. cmd_args = remove_dups(cmd_args, desc='command line', edesc='argument')
  238. return cmd_args, addrfile_args
  239. def get_addrdata_from_files(self, proto, addrfiles):
  240. from ..addrdata import AddrData
  241. from ..addrlist import AddrList
  242. from ..fileutil import check_infile
  243. ad_f = AddrData(proto)
  244. for addrfile in addrfiles:
  245. check_infile(addrfile)
  246. try:
  247. ad_f.add(AddrList(self.cfg, proto, infile=addrfile))
  248. except Exception as e:
  249. msg(f'{type(e).__name__}: {e}')
  250. return ad_f
  251. def confirm_autoselected_addr(self, mmid, desc):
  252. from ..ui import keypress_confirm
  253. keypress_confirm(
  254. self.cfg,
  255. 'Using {a} as {b}. OK?'.format(
  256. a = mmid.hl(),
  257. b = 'single output address' if len(self.nondata_outputs) == 1 else desc),
  258. default_yes = True,
  259. do_exit = True)
  260. async def warn_addr_used(self, proto, chg, desc):
  261. if proto.address_reuse_ok:
  262. return
  263. from ..tw.addresses import TwAddresses
  264. if (await TwAddresses(self.cfg, proto, get_data=True)).is_used(chg.addr):
  265. from ..ui import keypress_confirm
  266. keypress_confirm(
  267. self.cfg,
  268. '{a} {b} {c}\n{d}'.format(
  269. a = yellow(f'Requested {desc}'),
  270. b = chg.mmid.hl() if chg.mmid else chg.addr.hl(chg.addr.view_pref),
  271. c = yellow('is already used!'),
  272. d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
  273. ),
  274. complete_prompt = True,
  275. default_yes = False,
  276. do_exit = True)
  277. # inputs methods
  278. def get_unspent_nums_from_user(self, unspent):
  279. prompt = 'Enter a range or space-separated list of outputs to spend: '
  280. from ..ui import line_input
  281. while True:
  282. reply = line_input(self.cfg, prompt).strip()
  283. if reply:
  284. from ..addrlist import AddrIdxList
  285. selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()))
  286. if selected:
  287. if selected[-1] <= len(unspent):
  288. return selected
  289. msg(f'Unspent output number must be <= {len(unspent)}')
  290. def get_unspent_nums_from_inputs_opt(self, unspent):
  291. def do_add_msg(idx):
  292. uo = unspent[idx]
  293. mm_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
  294. msg('Adding input: {} {}{}'.format(idx + 1, uo.addr, mm_disp))
  295. def get_uo_nums():
  296. for addr in self.cfg.inputs.split(','):
  297. if is_mmgen_id(self.proto, addr):
  298. attr = 'twmmid'
  299. elif is_coin_addr(self.proto, addr):
  300. attr = 'addr'
  301. else:
  302. die(1, f'{addr!r}: not an MMGen ID or {self.coin} address')
  303. found = False
  304. for idx, e in enumerate(unspent):
  305. if getattr(e, attr) == addr:
  306. do_add_msg(idx)
  307. yield idx + 1
  308. found = True
  309. if not found:
  310. die(1, f'{addr!r}: address not found in tracking wallet')
  311. return set(get_uo_nums()) # silently discard duplicates
  312. def copy_inputs_from_tw(self, tw_unspent_data):
  313. def gen_inputs():
  314. for d in tw_unspent_data:
  315. i = self.Input(
  316. self.proto,
  317. **{attr: getattr(d, attr) for attr in d.__dict__
  318. if attr in self.Input.tw_copy_attrs})
  319. if d.twmmid.type == 'mmgen':
  320. i.mmid = d.twmmid # twmmid -> mmid
  321. yield i
  322. self.inputs = type(self.inputs)(self, list(gen_inputs()))
  323. async def get_funds_available(self, fee, outputs_sum):
  324. in_sum = self.sum_inputs()
  325. out_sum = outputs_sum + fee
  326. return self._funds_available(
  327. in_sum >= out_sum,
  328. # CoinAmt must be non-negative, so cannot use abs():
  329. in_sum - out_sum if in_sum >= out_sum else out_sum - in_sum)
  330. async def get_inputs(self, outputs_sum):
  331. data = self.twuo.accts_data if self.twuo.is_account_based else self.twuo.data
  332. sel_nums = (
  333. self.get_unspent_nums_from_inputs_opt if self.cfg.inputs else
  334. self.get_unspent_nums_from_user
  335. )(data)
  336. msg('Selected {}{}: {}'.format(
  337. self.twuo.item_desc,
  338. suf(sel_nums),
  339. ' '.join(str(n) for n in sel_nums)))
  340. sel_unspent = MMGenList(data[i-1] for i in sel_nums)
  341. if not (self.is_compat or await self.precheck_sufficient_funds(
  342. sum(s.amt for s in sel_unspent),
  343. sel_unspent,
  344. outputs_sum)):
  345. return False
  346. self.copy_inputs_from_tw(sel_unspent) # makes self.inputs
  347. return True
  348. async def network_fee_disp(self):
  349. res = await self.get_rel_fee_from_network()
  350. return pink(
  351. 'N/A' if res.fee is None else
  352. self.network_fee_to_unit_disp(res))
  353. async def get_fee(self, fee, outputs_sum, start_fee_desc):
  354. if fee:
  355. self.usr_fee = self.get_usr_fee_interactive(fee, desc=start_fee_desc)
  356. else:
  357. res = await self.get_rel_fee_from_network()
  358. self.usr_fee = self.get_usr_fee_interactive(
  359. None if res.fee is None else self.fee_est2abs(res),
  360. desc = self.network_estimated_fee_label)
  361. funds = await self.get_funds_available(self.usr_fee, outputs_sum)
  362. if funds.is_positive:
  363. p = self.final_inputs_ok_msg(funds.amt)
  364. from ..ui import keypress_confirm
  365. if self.cfg.yes or keypress_confirm(self.cfg, p+'. OK?', default_yes=True):
  366. if self.cfg.yes:
  367. msg(p)
  368. return funds.amt
  369. else:
  370. self.warn_insufficient_funds(funds.amt, self.coin)
  371. def _non_wallet_addr_confirm(self, message):
  372. from ..ui import confirm_or_raise
  373. confirm_or_raise(
  374. cfg = self.cfg,
  375. message = yellow(message),
  376. action = 'Are you sure this is what you want?')
  377. async def create(self, cmd_args, *, locktime=None, do_info=False, caller='txcreate'):
  378. assert isinstance(locktime, int | type(None)), 'locktime must be of type int'
  379. from ..tw.unspent import TwUnspentOutputs
  380. if self.cfg.comment_file:
  381. self.add_comment(infile=self.cfg.comment_file)
  382. if not (do_info or self.is_sweep):
  383. cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
  384. if self.is_swap:
  385. cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
  386. if self.is_compat:
  387. await self.process_cmdline_args(cmd_args, None, None)
  388. else:
  389. from ..rpc import rpc_init
  390. self.rpc = await rpc_init(self.cfg, self.proto)
  391. from ..addrdata import TwAddrData
  392. await self.process_cmdline_args(
  393. cmd_args,
  394. self.get_addrdata_from_files(self.proto, addrfile_args),
  395. await TwAddrData(self.cfg, self.proto, twctl=self.twctl))
  396. if not self.is_bump:
  397. self.twuo = await TwUnspentOutputs(
  398. self.cfg,
  399. self.proto,
  400. minconf = self.cfg.minconf,
  401. addrs = await self.get_input_addrs_from_inputs_opt(),
  402. tx = self if self.is_sweep else None)
  403. await self.twuo.get_data()
  404. self.twctl = self.twuo.twctl
  405. from ..ui import do_license_msg
  406. do_license_msg(self.cfg)
  407. if not (self.is_bump or self.cfg.inputs):
  408. await self.twuo.view_filter_and_sort()
  409. if self.is_sweep:
  410. del self.twctl
  411. del self.twuo.twctl
  412. return await self.compat_create()
  413. if not self.is_bump:
  414. self.twuo.display_total()
  415. if do_info:
  416. del self.twctl
  417. del self.twuo.twctl
  418. import sys
  419. sys.exit(0)
  420. outputs_sum = self.sum_outputs()
  421. msg('Total amount to spend: {}'.format(
  422. f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'))
  423. while True:
  424. if not await self.get_inputs(outputs_sum):
  425. continue
  426. if self.is_swap:
  427. fee_hint = await self.update_vault_output(
  428. self.vault_output.amt or self.sum_inputs(),
  429. deduct_est_fee = self.vault_output == self.chg_output)
  430. else:
  431. await self.set_gas()
  432. fee_hint = None
  433. desc = 'User-selected' if self.cfg.fee else 'Recommended' if fee_hint else None
  434. if (funds_left := await self.get_fee(
  435. self.cfg.fee or fee_hint,
  436. outputs_sum,
  437. desc)) is not None:
  438. break
  439. if not self.is_compat:
  440. self.check_non_mmgen_inputs(caller=caller)
  441. self.update_change_output(funds_left)
  442. self.check_chg_addr_is_wallet_addr()
  443. if self.has_comment and not self.cfg.yes:
  444. self.add_comment() # edits an existing comment
  445. if self.is_swap:
  446. import time
  447. if time.time() > self.swap_quote_refresh_time + self.swap_quote_refresh_timeout:
  448. await self.update_vault_output(self.vault_output.amt)
  449. if self.is_compat:
  450. del self.twctl
  451. del self.twuo.twctl
  452. return await self.compat_create()
  453. await self.create_serialized(locktime=locktime) # creates self.txid too
  454. self.add_timestamp()
  455. self.add_blockcount()
  456. self.chain = self.proto.chain_name
  457. self.check_fee()
  458. self.cfg._util.qmsg('Transaction successfully created')
  459. if self.is_bump:
  460. return
  461. from . import UnsignedTX
  462. new = UnsignedTX(cfg=self.cfg, data=self.__dict__, automount=self.cfg.autosign)
  463. if not self.cfg.yes:
  464. new.info.view_with_prompt('View transaction details?')
  465. del new.twuo.twctl
  466. return new