new.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.new: new transaction class
  12. """
  13. from collections import namedtuple
  14. from .base import Base
  15. from ..cfg import gc
  16. from ..color import pink, yellow
  17. from ..obj import get_obj, MMGenList
  18. from ..util import msg, fmt, die, suf, remove_dups, get_extension
  19. from ..addr import (
  20. is_mmgen_id,
  21. MMGenAddrType,
  22. MMGenID,
  23. CoinAddr,
  24. is_mmgen_addrtype,
  25. is_coin_addr,
  26. is_addrlist_id
  27. )
  def mmaddr2coinaddr(cfg, mmaddr, ad_w, ad_f, proto):
      """
      Look up the coin address belonging to the MMGen address ID ``mmaddr``.

      ``ad_w`` is queried first.  When it yields nothing and ``ad_f`` is
      truthy, ``ad_f`` is queried as well; an address found only there
      produces a warning and, unless ``cfg.yes`` is set, a confirmation
      prompt (declining exits with status 1).  When no address can be found,
      ``die(2, ...)`` is called with an explanatory message.

      Returns ``CoinAddr(proto, coin_addr)``.
      """

      def wmsg(key):
          # message bodies are written with explicit newlines; fmt() lays them out
          texts = {
              'addr_in_addrfile_only': (
                  '\n'
                  f'Warning: output address {mmaddr} is not in the tracking wallet, which\n'
                  "means its balance will not be tracked. You're strongly advised to import\n"
                  'the address into your tracking wallet before broadcasting this transaction.\n'
              ),
              'addr_not_found': (
                  '\n'
                  f'No data for {gc.proj_name} address {mmaddr} could be found in either the\n'
                  'tracking wallet or the supplied address file. Please import this address\n'
                  'into your tracking wallet, or supply an address file on the command line.\n'
              ),
              'addr_not_found_no_addrfile': (
                  '\n'
                  f'No data for {gc.proj_name} address {mmaddr} could be found in the tracking\n'
                  'wallet. Please import this address into your tracking wallet or supply an\n'
                  'address file for it on the command line.\n'
              ),
          }
          return '\n' + fmt(texts[key], indent=' ')

      # mmaddr is assumed to have been validated by the caller
      coin_addr = ad_w.mmaddr2coinaddr(mmaddr)

      if not coin_addr:
          if not ad_f:
              die(2, wmsg('addr_not_found_no_addrfile'))
          else:
              coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
              if not coin_addr:
                  die(2, wmsg('addr_not_found'))
              else:
                  msg(wmsg('addr_in_addrfile_only'))
                  from ..ui import keypress_confirm
                  confirmed = cfg.yes or keypress_confirm(cfg, 'Continue anyway?')
                  if not confirmed:
                      raise SystemExit(1)

      return CoinAddr(proto, coin_addr)
  def parse_fee_spec(proto, fee_arg):
      """
      Parse a relative fee specification of the form ``<number><unit letter>``.

      The number is either an integer without leading zeros or a decimal with
      digits on both sides of the point.  The unit letter is the first
      character of one of the names in ``proto.coin_amt.units``.

      Returns a ``parsed_fee_spec(amt, unit)`` namedtuple, where ``amt`` is the
      number as a string and ``unit`` the full unit name, or ``None`` when
      ``fee_arg`` is not a valid specification.
      """
      import re
      units = {u[0]: u for u in proto.coin_amt.units}
      pat = re.compile(
          r'((?:[1-9][0-9]*)|(?:[0-9]+\.[0-9]+))({})'.format('|'.join(map(re.escape, units))))
      # The whole argument must match: a prefix-only match() would silently
      # accept trailing garbage such as '10sx' and treat it as '10s'.
      if m := pat.fullmatch(fee_arg):
          return namedtuple('parsed_fee_spec', ['amt', 'unit'])(m[1], units[m[2]])
      return None
  70. class New(Base):
  71. fee_is_approximate = False
  72. msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'
  73. msg_no_change_output = """
  74. ERROR: No change address specified. If you wish to create a transaction with
  75. only one output, specify a single output address with no {} amount
  76. """
  77. chg_autoselected = False
  78. _funds_available = namedtuple('funds_available', ['is_positive', 'amt'])
  def warn_insufficient_funds(self, amt, coin):
      """Print ``self.msg_insufficient_funds`` filled in with the highlighted amount and the coin."""
      warning = self.msg_insufficient_funds.format(amt.hl(), coin)
      msg(warning)
  def update_output_amt(self, idx, amt):
      """Replace output number ``idx`` with a new ``self.Output`` that differs only in ``amt``."""
      fields = {**self.outputs[idx]._asdict(), 'amt': amt}
      self.outputs[idx] = self.Output(self.proto, **fields)
  def add_mmaddrs_to_outputs(self, ad_f, ad_w):
      """
      Fill in ``mmid`` (and ``comment``, when non-empty) on each output whose
      address appears in the reverse dictionaries built from ``ad_w`` and,
      if truthy, ``ad_f``.  Entries from ``ad_f`` override those from ``ad_w``.
      """
      addrs = [out.addr for out in self.outputs]
      rev = ad_w.make_reverse_dict(addrs)
      if ad_f:
          rev.update(ad_f.make_reverse_dict(addrs))

      for out in self.outputs:
          if not (out.addr and out.addr in rev):
              continue
          out.mmid, comment = rev[out.addr]
          if comment:
              out.comment = comment
  def check_dup_addrs(self, io_desc):
      """
      Call ``die(2, ...)`` if the same address occurs more than once among the
      transaction's ``inputs`` or ``outputs`` (selected by ``io_desc``).
      Entries with a falsy ``addr`` are ignored.
      """
      assert io_desc in ('inputs', 'outputs')
      addrs = list(filter(None, (entry.addr for entry in getattr(self, io_desc))))
      unique_count = len(set(addrs))
      if unique_count != len(addrs):
          die(2, f'{addrs}: duplicate address in transaction {io_desc}')
  100. # given tx size and absolute fee or fee spec, return absolute fee
  101. # relative fee is N+<first letter of unit name>
  def feespec2abs(self, fee_arg, tx_size):
      """
      Convert ``fee_arg`` to an absolute fee.

      ``fee_arg`` is returned unchanged when it is exactly of type
      ``self.proto.coin_amt``.  Otherwise it is tried as an absolute amount,
      then as a relative fee spec (converted via ``self.fee_rel2abs`` using
      ``tx_size``).  Returns ``False`` when neither interpretation works.
      """
      amt_type = self.proto.coin_amt

      if type(fee_arg) is amt_type:
          return fee_arg

      abs_fee = get_obj(amt_type, num=fee_arg, silent=True)
      if abs_fee:
          return abs_fee

      spec = parse_fee_spec(self.proto, fee_arg)
      if not spec:
          return False

      return self.fee_rel2abs(tx_size, float(spec.amt), spec.unit)
  def get_usr_fee_interactive(self, fee=None, *, desc='Starting'):
      """
      Loop until an acceptable absolute fee has been chosen, then return it.

      On each pass a truthy ``fee`` is converted with
      ``self.convert_and_check_fee``.  A truthy result (which, for a bump
      transaction, must also pass ``self.check_bumped_fee_ok``) is shown to
      the user for confirmation, or accepted directly when ``self.cfg.yes``
      is set.  Otherwise a new fee is read from the user with
      ``self.usr_fee_prompt`` and the description becomes 'User-selected'.
      """
      abs_fee = None
      from ..ui import line_input

      while True:
          if fee:
              abs_fee = self.convert_and_check_fee(fee, desc)

          if abs_fee and not (self.is_bump and not self.check_bumped_fee_ok(abs_fee)):
              show_adjustment = self.cfg.fee_adjust != 1 and desc.startswith('Network-estimated')
              prompt = '{a} TX fee{b}: {c}{d} {e} ({f} {g})\n'.format(
                  a = desc,
                  b = f' (after {self.cfg.fee_adjust:.2f}X adjustment)' if show_adjustment else '',
                  c = ('', '≈')[self.fee_is_approximate],
                  d = abs_fee.hl(),
                  e = self.coin,
                  f = pink(self.fee_abs2rel(abs_fee)),
                  g = self.rel_fee_disp)
              from ..ui import keypress_confirm
              if self.cfg.yes:
                  # non-interactive: just display the fee that is being used
                  msg(prompt)
                  return abs_fee
              if keypress_confirm(self.cfg, prompt+'OK?', default_yes=True):
                  return abs_fee

          fee = line_input(self.cfg, self.usr_fee_prompt)
          desc = 'User-selected'
  137. # we don't know fee yet, so perform preliminary check with fee == 0
  async def precheck_sufficient_funds(self, inputs_sum, sel_unspent, outputs_sum):
      """
      Preliminary funds check, made without taking any fee into account.

      Returns ``False`` after printing a message when either the wallet total
      (``self.twuo.total``) or the sum of the selected inputs is less than
      ``outputs_sum``; returns ``True`` otherwise.  ``sel_unspent`` is not
      used here.
      """
      if self.twuo.total < outputs_sum:
          msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum, self.dcoin))
      elif inputs_sum < outputs_sum:
          self.warn_insufficient_funds(outputs_sum - inputs_sum, self.dcoin)
      else:
          return True
      return False
  def add_output(self, coinaddr, amt, *, is_chg=False, is_vault=False, data=None):
      """Build a ``self.Output`` from the given fields and append it to ``self.outputs``."""
      new_output = self.Output(
          self.proto,
          addr = coinaddr,
          amt = amt,
          is_chg = is_chg,
          is_vault = is_vault,
          data = data)
      self.outputs.append(new_output)
  def process_data_output_arg(self, arg):
      """
      Return the data payload encoded in command-line argument ``arg``.

      This implementation recognizes no data arguments and always returns
      ``None``.
      """
      return None
  def parse_cmdline_arg(self, proto, arg_in, ad_f, ad_w):
      """
      Parse one output argument from the command line.

      Returns a ``txcreate_cmdline_output`` namedtuple with the fields
      ``arg, mmid, addr, amt, data, is_vault``.  A data argument (as decided by
      ``self.process_data_output_arg``) yields a tuple carrying only ``arg``
      and ``data``.  Otherwise the argument is split at the first comma into
      address part and amount, and the address part is tried in turn as the
      word 'vault' (swap transactions only), an MMGen ID, a coin address, and
      an address type or address list ID (which sets
      ``self.chg_autoselected``).  Anything else ends in ``die(2, ...)``.
      """
      parsed = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data', 'is_vault'])

      data = self.process_data_output_arg(arg_in)
      if data:
          return parsed(arg=arg_in, mmid=None, addr=None, amt=None, data=data, is_vault=False)

      # everything after the first comma is the amount; no comma means no amount
      arg, comma, remainder = arg_in.partition(',')
      amt = remainder if comma else None

      coin_addr = None
      mmid = None
      is_vault = False

      if arg == 'vault' and self.is_swap:
          is_vault = True
      else:
          mmid = get_obj(MMGenID, proto=proto, id_str=arg, silent=True)
          if mmid:
              coin_addr = mmaddr2coinaddr(self.cfg, arg, ad_w, ad_f, proto)
          elif is_coin_addr(proto, arg):
              coin_addr = CoinAddr(proto, arg)
          elif is_mmgen_addrtype(proto, arg) or is_addrlist_id(proto, arg):
              if proto.base_proto_coin != 'BTC':
                  die(2, f'Change addresses not supported for {proto.name} protocol')
              self.chg_autoselected = True
          else:
              die(2, f'{arg_in}: invalid command-line argument')

      return parsed(arg=arg, mmid=mmid, addr=coin_addr, amt=amt, data=None, is_vault=is_vault)
  async def get_autochg_addr(self, proto, arg, *, exclude, desc, all_addrtypes=False):
      """
      Pick an address automatically from the ``TwAddresses`` data.

      With ``all_addrtypes`` the lookup is not restricted to an address type;
      otherwise ``arg`` is interpreted as an address type if it parses as
      one, else it is passed to ``get_change_address`` as is.  A truthy
      lookup result is returned; a falsy one ends in ``die(2, ...)``.
      """
      from ..tw.addresses import TwAddresses
      tw_addrs = await TwAddresses(self.cfg, proto, get_data=True)

      if all_addrtypes:
          res = tw_addrs.get_change_address_by_addrtype(None, exclude=exclude, desc=desc)
          req_desc = 'of any allowed address type'
      else:
          addrtype = get_obj(MMGenAddrType, proto=proto, id_str=arg, silent=True)
          if addrtype:
              res = tw_addrs.get_change_address_by_addrtype(addrtype, exclude=exclude, desc=desc)
              req_desc = f'of address type {arg!r}'
          else:
              res = tw_addrs.get_change_address(arg, exclude=exclude, desc=desc)
              req_desc = f'from address list {arg!r}'

      if res:
          return res

      # the message distinguishes a None result from any other falsy result
      qualifier = '' if res is None else 'unused '
      die(2, 'Tracking wallet contains no {t}addresses {d}'.format(t=qualifier, d=req_desc))
  async def process_cmdline_args(self, cmd_args, ad_f, ad_w):
      """
      Turn the command-line output arguments into transaction outputs.

      Every argument is parsed with ``self.parse_cmdline_arg`` and added via
      ``self.add_output``.  An argument with neither amount nor data is the
      change output; more than one of these is an error.  Where a change
      argument has no address, one is chosen with ``self.get_autochg_addr``.
      The resulting outputs are then validated, and the change address is
      confirmed with the user or checked for prior use.
      """
      parsed_args = [self.parse_cmdline_arg(self.proto, arg, ad_f, ad_w) for arg in cmd_args]

      chg_count = sum(1 for p in parsed_args if not (p.amt or p.data))
      if chg_count > 1:
          desc = 'requested' if self.chg_autoselected else 'listed'
          die(2, f'ERROR: More than one change address {desc} on command line')

      for p in parsed_args:
          if p.data:
              self.add_output(None, self.proto.coin_amt('0'), data=p.data)
              continue

          if p.is_vault:
              coinaddr = None
          elif p.addr:
              coinaddr = p.addr
          else:
              autochg = await self.get_autochg_addr(
                  self.proto,
                  p.arg,
                  exclude = [q.mmid for q in parsed_args if q.mmid],
                  desc = 'change address')
              coinaddr = autochg.addr

          self.add_output(
              coinaddr = coinaddr,
              amt = self.proto.coin_amt(p.amt or '0'),
              is_chg = not p.amt,
              is_vault = p.is_vault)

      if self.chg_idx is None:
          if len(self.outputs) == 1:
              errmsg = fmt(self.msg_no_change_output.format(self.dcoin)).strip()
          else:
              errmsg = 'ERROR: No change output specified'
          die(2, errmsg)

      if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
          die(2,
              f'{gc.proj_name} Segwit address requested on the command line, '
              'but Segwit is not active on this chain')

      if not self.nondata_outputs:
          die(2, 'At least one spending output must be specified on the command line')

      self.add_mmaddrs_to_outputs(ad_f, ad_w)

      self.check_dup_addrs('outputs')

      chg = self.chg_output
      if chg is not None:
          # for a swap TX the user has already confirmed the address
          if self.chg_autoselected and not self.is_swap:
              self.confirm_autoselected_addr(chg.mmid, 'change address')
          elif len(self.nondata_outputs) > 1:
              await self.warn_addr_used(self.proto, chg, 'change address')
  def get_addrfiles_from_cmdline(self, cmd_args):
      """
      Separate address-file arguments from the other command-line arguments.

      An argument counts as an address file when its extension equals
      ``AddrFile.ext``.  Duplicates are removed from the address files, and
      from the remaining arguments too unless this is a swap transaction.

      Returns the tuple ``(cmd_args, addrfile_args)``.
      """
      from ..addrfile import AddrFile

      addrfiles = remove_dups(
          tuple(arg for arg in cmd_args if get_extension(arg) == AddrFile.ext),
          desc = 'command line',
          edesc = 'argument',
      )

      remaining = tuple(arg for arg in cmd_args if arg not in addrfiles)

      if self.is_swap:
          return remaining, addrfiles

      return remove_dups(remaining, desc='command line', edesc='argument'), addrfiles
  def get_addrdata_from_files(self, proto, addrfiles):
      """
      Load the given address files into a new ``AddrData`` object and return it.

      Each file is first checked with ``check_infile``.  An exception raised
      while reading or adding a file is reported with ``msg`` and the file is
      skipped.
      """
      from ..addrdata import AddrData
      from ..addrlist import AddrList
      from ..fileutil import check_infile

      addr_data = AddrData(proto)

      for fn in addrfiles:
          check_infile(fn)
          try:
              addr_list = AddrList(self.cfg, proto, infile=fn)
              addr_data.add(addr_list)
          except Exception as exc:
              msg(f'{type(exc).__name__}: {exc}')

      return addr_data
  def confirm_autoselected_addr(self, mmid, desc):
      """
      Ask the user to approve the automatically selected address ``mmid``.

      The address is described as 'single output address' when the
      transaction has exactly one non-data output, else as ``desc``.  If the
      user declines, ``die(1, ...)`` is called.
      """
      from ..ui import keypress_confirm

      prompt = 'Using {a} as {b}. OK?'.format(
          a = mmid.hl(),
          b = 'single output address' if len(self.nondata_outputs) == 1 else desc)

      if keypress_confirm(self.cfg, prompt, default_yes=True):
          return

      die(1, 'Exiting at user request')
  async def warn_addr_used(self, proto, chg, desc):
      """
      Warn about reuse of the address of output ``chg``.

      Nothing happens unless ``TwAddresses`` reports the address as used.
      In that case the user is asked whether to continue (default: no), and
      ``die(1, ...)`` is called on refusal.
      """
      from ..tw.addresses import TwAddresses

      tw_addrs = await TwAddresses(self.cfg, proto, get_data=True)
      if not tw_addrs.is_used(chg.addr):
          return

      from ..ui import keypress_confirm

      prompt = '{a} {b} {c}\n{d}'.format(
          a = yellow(f'Requested {desc}'),
          b = chg.mmid.hl() if chg.mmid else chg.addr.hl(chg.addr.view_pref),
          c = yellow('is already used!'),
          d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
      )

      if not keypress_confirm(self.cfg, prompt, complete_prompt=True, default_yes=False):
          die(1, 'Exiting at user request')
  272. # inputs methods
  def get_unspent_nums_from_user(self, unspent):
      """
      Prompt until the user enters a valid selection of unspent outputs.

      The reply is split on whitespace, joined with commas and parsed as an
      ``AddrIdxList``.  The parsed list is returned once its last element
      does not exceed ``len(unspent)``; empty or unparsable replies simply
      cause another prompt.
      """
      from ..ui import line_input
      prompt = 'Enter a range or space-separated list of outputs to spend: '

      while True:
          reply = line_input(self.cfg, prompt).strip()
          if not reply:
              continue

          from ..addrlist import AddrIdxList
          selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()))
          if not selected:
              continue

          if selected[-1] <= len(unspent):
              return selected

          msg(f'Unspent output number must be <= {len(unspent)}')
  def get_unspent_nums_from_inputs_opt(self, unspent):
      """
      Translate the comma-separated ``self.cfg.inputs`` value into a set of
      1-based indexes into ``unspent``.

      Each item must be an MMGen ID (compared with ``twmmid``) or a coin
      address (compared with ``addr``).  Every matching entry is announced
      with ``msg`` and selected.  An item of neither kind, or one matching no
      entry, ends in ``die(1, ...)``.
      """
      selected = set() # a set, so duplicates are silently discarded

      for spec in self.cfg.inputs.split(','):
          if is_mmgen_id(self.proto, spec):
              attr = 'twmmid'
          elif is_coin_addr(self.proto, spec):
              attr = 'addr'
          else:
              die(1, f'{spec!r}: not an MMGen ID or {self.coin} address')

          matched = False
          for idx, uo in enumerate(unspent):
              if getattr(uo, attr) == spec:
                  mm_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
                  msg('Adding input: {} {}{}'.format(idx + 1, uo.addr, mm_disp))
                  selected.add(idx + 1)
                  matched = True

          if not matched:
              die(1, f'{spec!r}: address not found in tracking wallet')

      return selected
  def copy_inputs_from_tw(self, tw_unspent_data):
      """
      Rebuild ``self.inputs`` from tracking-wallet unspent-output entries.

      For every entry a ``self.Input`` is created from those instance
      attributes of the entry that are named in ``self.Input.tw_copy_attrs``.
      When the entry's ``twmmid`` is of type 'mmgen', it becomes the input's
      ``mmid``.
      """
      copy_attrs = self.Input.tw_copy_attrs
      new_inputs = []

      for entry in tw_unspent_data:
          fields = {name: getattr(entry, name) for name in entry.__dict__ if name in copy_attrs}
          inp = self.Input(self.proto, **fields)
          if entry.twmmid.type == 'mmgen':
              inp.mmid = entry.twmmid # the tracking wallet's twmmid is the input's mmid
          new_inputs.append(inp)

      inputs_cls = type(self.inputs)
      self.inputs = inputs_cls(self, new_inputs)
  async def get_funds_available(self, fee, outputs_sum):
      """
      Compare the sum of the inputs with outputs plus fee.

      Returns ``self._funds_available(is_positive, amt)``, where
      ``is_positive`` tells whether the inputs cover ``outputs_sum + fee``
      and ``amt`` is the size of the surplus or shortfall.
      """
      available = self.sum_inputs()
      required = outputs_sum + fee
      is_positive = available >= required
      # the larger value is always the minuend, so the difference is never negative
      difference = available - required if is_positive else required - available
      return self._funds_available(is_positive, difference)
  async def get_inputs(self, outputs_sum):
      """
      Select the unspent outputs to spend and install them as ``self.inputs``.

      The selection comes from ``self.cfg.inputs`` when that is set, else
      from the user.  Returns ``False``, leaving ``self.inputs`` alone, when
      the selection fails ``self.precheck_sufficient_funds``; returns
      ``True`` otherwise.
      """
      if self.cfg.inputs:
          select = self.get_unspent_nums_from_inputs_opt
      else:
          select = self.get_unspent_nums_from_user
      sel_nums = select(self.twuo.data)

      nums_disp = ' '.join(str(n) for n in sel_nums)
      msg('Selected {}{}: {}'.format(self.twuo.item_desc, suf(sel_nums), nums_disp))

      # the selected numbers are 1-based
      sel_unspent = MMGenList(self.twuo.data[i-1] for i in sel_nums)
      inputs_sum = sum(s.amt for s in sel_unspent)

      funds_ok = await self.precheck_sufficient_funds(inputs_sum, sel_unspent, outputs_sum)
      if not funds_ok:
          return False

      self.copy_inputs_from_tw(sel_unspent) # this creates self.inputs
      return True
  async def get_fee(self, fee, outputs_sum, start_fee_desc):
      """
      Determine the transaction fee and check that the inputs can pay for it.

      A truthy ``fee`` is the starting value for the interactive fee
      selection; otherwise the starting value is derived from the
      network's relative fee, or is ``None`` if the network supplies none.  The
      chosen fee is stored in ``self.usr_fee``.

      Returns the amount left over after outputs and fee once the user (or
      ``self.cfg.yes``) has approved it.  Returns ``None`` if funds are
      insufficient or the user declines.
      """
      if fee:
          start_fee = fee
          label = start_fee_desc
      else:
          fee_per_kb, fe_type = await self.get_rel_fee_from_network()
          start_fee = None if fee_per_kb is None else self.fee_est2abs(fee_per_kb, fe_type=fe_type)
          label = self.network_estimated_fee_label

      self.usr_fee = self.get_usr_fee_interactive(start_fee, desc=label)

      funds = await self.get_funds_available(self.usr_fee, outputs_sum)

      if not funds.is_positive:
          self.warn_insufficient_funds(funds.amt, self.coin)
          return None

      p = self.final_inputs_ok_msg(funds.amt)
      from ..ui import keypress_confirm
      if self.cfg.yes:
          msg(p)
          return funds.amt
      if keypress_confirm(self.cfg, p+'. OK?', default_yes=True):
          return funds.amt
      return None
  async def create(self, cmd_args, *, locktime=None, do_info=False, caller='txcreate'):
      """
      Create a new transaction from the command-line arguments.

      cmd_args: output arguments, possibly mixed with address file names
      locktime: an int or None; passed on to self.create_serialized()
      do_info:  if true, skip output processing, display the unspent outputs
                and their total, then exit with status 0
      caller:   passed on to self.check_non_mmgen_inputs()

      Returns None for a bump transaction (self.is_bump), otherwise a new
      UnsignedTX built from this object's attributes.
      """

      assert isinstance(locktime, (int, type(None))), 'locktime must be of type int'

      from ..tw.unspent import TwUnspentOutputs

      if self.cfg.comment_file:
          self.add_comment(infile=self.cfg.comment_file)

      # Parse the command line and create the outputs
      if not do_info:
          cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
          if self.is_swap:
              cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
              self.process_swap_options()
              self.proto = self.send_proto # updating self.proto!
          # NOTE(review): the nesting of the following statements was reconstructed
          # from a listing without indentation -- confirm that rpc_init() is meant to
          # run for non-swap transactions as well
          from ..rpc import rpc_init
          self.rpc = await rpc_init(self.cfg, self.proto)
          from ..addrdata import TwAddrData
          await self.process_cmdline_args(
              cmd_args,
              self.get_addrdata_from_files(self.proto, addrfile_args),
              await TwAddrData(self.cfg, self.proto, twctl=self.twctl))

      # Fetch the unspent outputs (not done here for a bump transaction)
      if not self.is_bump:
          self.twuo = await TwUnspentOutputs(
              self.cfg,
              self.proto,
              minconf = self.cfg.minconf,
              addrs = await self.get_input_addrs_from_inputs_opt())
          await self.twuo.get_data()

      from ..ui import do_license_msg
      do_license_msg(self.cfg)

      # The unspent outputs view is shown only when inputs were not preselected
      if not (self.is_bump or self.cfg.inputs):
          await self.twuo.view_filter_and_sort()

      if not self.is_bump:
          self.twuo.display_total()

      if do_info:
          del self.twuo.twctl
          import sys
          sys.exit(0)

      outputs_sum = self.sum_outputs()

      msg('Total amount to spend: {}'.format(
          f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'
      ))

      # Repeat input and fee selection until get_fee() returns an amount.
      # NOTE(review): when self.cfg.inputs is set, get_inputs() does not prompt, so
      # a selection with insufficient funds looks like it would be retried forever
      # -- confirm
      while True:
          if not await self.get_inputs(outputs_sum):
              continue
          fee_hint = None
          if self.is_swap:
              fee_hint = self.update_vault_output(
                  self.vault_output.amt or self.sum_inputs(),
                  deduct_est_fee = self.vault_output == self.chg_output)
          desc = 'User-selected' if self.cfg.fee else 'Recommended' if fee_hint else None
          if (funds_left := await self.get_fee(
                  self.cfg.fee or fee_hint,
                  outputs_sum,
                  desc)) is not None:
              break

      self.check_non_mmgen_inputs(caller)
      self.update_change_output(funds_left)
      self.check_chg_addr_is_wallet_addr()

      if not self.cfg.yes:
          self.add_comment() # edits an existing comment

      if self.is_swap:
          self.update_vault_output(self.vault_output.amt)

      await self.create_serialized(locktime=locktime) # creates self.txid too

      self.add_timestamp()
      self.add_blockcount()
      self.chain = self.proto.chain_name
      self.check_fee()

      self.cfg._util.qmsg('Transaction successfully created')

      # For a bump transaction nothing is returned
      if self.is_bump:
          return

      from . import UnsignedTX
      new = UnsignedTX(cfg=self.cfg, data=self.__dict__, automount=self.cfg.autosign)

      if not self.cfg.yes:
          new.info.view_with_prompt('View transaction details?')

      del new.twuo.twctl
      return new