new.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.new: new transaction class
  12. """
  13. from collections import namedtuple
  14. from .base import Base
  15. from ..cfg import gc
  16. from ..color import pink, yellow
  17. from ..obj import get_obj, MMGenList
  18. from ..util import msg, fmt, die, suf, remove_dups, get_extension
  19. from ..addr import (
  20. is_mmgen_id,
  21. MMGenAddrType,
  22. MMGenID,
  23. CoinAddr,
  24. is_mmgen_addrtype,
  25. is_coin_addr,
  26. is_addrlist_id
  27. )
  28. def mmaddr2coinaddr(cfg, mmaddr, ad_w, ad_f, proto):
  29. def wmsg(k):
  30. messages = {
  31. 'addr_in_addrfile_only': f"""
  32. Warning: output address {mmaddr} is not in the tracking wallet, which
  33. means its balance will not be tracked. You're strongly advised to import
  34. the address into your tracking wallet before broadcasting this transaction.
  35. """,
  36. 'addr_not_found': f"""
  37. No data for {gc.proj_name} address {mmaddr} could be found in either the
  38. tracking wallet or the supplied address file. Please import this address
  39. into your tracking wallet, or supply an address file on the command line.
  40. """,
  41. 'addr_not_found_no_addrfile': f"""
  42. No data for {gc.proj_name} address {mmaddr} could be found in the tracking
  43. wallet. Please import this address into your tracking wallet or supply an
  44. address file for it on the command line.
  45. """
  46. }
  47. return '\n' + fmt(messages[k], indent=' ')
  48. # assume mmaddr has already been checked
  49. coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
  50. if not coin_addr:
  51. if ad_f:
  52. coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
  53. if coin_addr:
  54. msg(wmsg('addr_in_addrfile_only'))
  55. from ..ui import keypress_confirm
  56. if not (cfg.yes or keypress_confirm(cfg, 'Continue anyway?')):
  57. import sys
  58. sys.exit(1)
  59. else:
  60. die(2, wmsg('addr_not_found'))
  61. else:
  62. die(2, wmsg('addr_not_found_no_addrfile'))
  63. return CoinAddr(proto, coin_addr)
  64. def parse_fee_spec(proto, fee_arg):
  65. import re
  66. units = {u[0]:u for u in proto.coin_amt.units}
  67. pat = re.compile(r'((?:[1-9][0-9]*)|(?:[0-9]+\.[0-9]+))({})'.format('|'.join(units)))
  68. if m := pat.match(fee_arg):
  69. return namedtuple('parsed_fee_spec', ['amt', 'unit'])(m[1], units[m[2]])
class New(Base):
    """
    New transaction class: collects outputs from the command line, selects
    inputs from the tracking wallet's unspent outputs, determines the fee and
    produces an unsigned transaction (see create()).
    """

    # when true, the fee shown by get_usr_fee_interactive() is prefixed with '≈'
    fee_is_approximate = False

    # message templates; the placeholders are filled with an amount and a coin symbol
    msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'
    # NOTE(review): leading whitespace inside this literal is presumably
    # normalised by fmt() before display -- confirm
    msg_no_change_output = """
        ERROR: No change address specified. If you wish to create a transaction with
        only one output, specify a single output address with no {} amount
    """
    msg_insufficient_funds = 'Selected outputs insufficient to fund this transaction ({} {} needed)'

    # set by parse_cmdline_arg() when the change address was requested by
    # address type or address list ID instead of being given explicitly
    chg_autoselected = False

    # result type of get_funds_available()
    _funds_available = namedtuple('funds_available', ['is_positive', 'amt'])
  80. def warn_insufficient_funds(self, amt, coin):
  81. msg(self.msg_insufficient_funds.format(amt.hl(), coin))
  82. def update_output_amt(self, idx, amt):
  83. o = self.outputs[idx]._asdict()
  84. o['amt'] = amt
  85. self.outputs[idx] = self.Output(self.proto, **o)
  86. def add_mmaddrs_to_outputs(self, ad_f, ad_w):
  87. a = [e.addr for e in self.outputs]
  88. d = ad_w.make_reverse_dict(a)
  89. if ad_f:
  90. d.update(ad_f.make_reverse_dict(a))
  91. for e in self.outputs:
  92. if e.addr and e.addr in d:
  93. e.mmid, f = d[e.addr]
  94. if f:
  95. e.comment = f
  96. def check_dup_addrs(self, io_desc):
  97. assert io_desc in ('inputs', 'outputs')
  98. addrs = [e.addr for e in getattr(self, io_desc) if e.addr]
  99. if len(addrs) != len(set(addrs)):
  100. die(2, f'{addrs}: duplicate address in transaction {io_desc}')
  101. # given tx size and absolute fee or fee spec, return absolute fee
  102. # relative fee is N+<first letter of unit name>
  103. def feespec2abs(self, fee_arg, tx_size):
  104. if type(fee_arg) is self.proto.coin_amt:
  105. return fee_arg
  106. if fee := get_obj(self.proto.coin_amt, num=fee_arg, silent=True):
  107. return fee
  108. if res := parse_fee_spec(self.proto, fee_arg):
  109. return self.fee_rel2abs(tx_size, float(res.amt), res.unit)
  110. return False
  111. def get_usr_fee_interactive(self, fee=None, *, desc='Starting'):
  112. abs_fee = None
  113. from ..ui import line_input
  114. while True:
  115. if fee:
  116. abs_fee = self.convert_and_check_fee(fee, desc)
  117. if abs_fee:
  118. if self.is_bump and not self.check_bumped_fee_ok(abs_fee):
  119. pass
  120. else:
  121. prompt = '{a} TX fee{b}: {c}{d} {e} ({f} {g})\n'.format(
  122. a = desc,
  123. b = (f' (after {self.cfg.fee_adjust:.2f}X adjustment)'
  124. if self.cfg.fee_adjust != 1 and desc.startswith('Network-estimated')
  125. else ''),
  126. c = ('', '≈')[self.fee_is_approximate],
  127. d = abs_fee.hl(),
  128. e = self.coin,
  129. f = pink(self.fee_abs2rel(abs_fee)),
  130. g = self.rel_fee_disp)
  131. from ..ui import keypress_confirm
  132. if self.cfg.yes or keypress_confirm(self.cfg, prompt+'OK?', default_yes=True):
  133. if self.cfg.yes:
  134. msg(prompt)
  135. return abs_fee
  136. fee = line_input(self.cfg, self.usr_fee_prompt)
  137. desc = 'User-selected'
  138. # we don't know fee yet, so perform preliminary check with fee == 0
  139. async def precheck_sufficient_funds(self, inputs_sum, sel_unspent, outputs_sum):
  140. if self.twuo.total < outputs_sum:
  141. msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum, self.dcoin))
  142. return False
  143. if inputs_sum < outputs_sum:
  144. self.warn_insufficient_funds(outputs_sum - inputs_sum, self.dcoin)
  145. return False
  146. return True
  147. def add_output(self, coinaddr, amt, *, is_chg=False, is_vault=False, data=None):
  148. self.outputs.append(
  149. self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg, is_vault=is_vault, data=data))
    def process_data_output_arg(self, arg):
        """
        Return the data payload encoded by command-line argument `arg`, if any.

        This implementation never recognizes a data argument and always
        returns None; parse_cmdline_arg() treats a truthy return value as a
        data output.  Presumably overridden where data outputs are supported
        -- confirm against subclasses.
        """
        return None
  152. def parse_cmdline_arg(self, proto, arg_in, ad_f, ad_w):
  153. _pa = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data', 'is_vault'])
  154. if data := self.process_data_output_arg(arg_in):
  155. return _pa(arg_in, None, None, None, data, False)
  156. arg, amt = arg_in.split(',', 1) if ',' in arg_in else (arg_in, None)
  157. coin_addr, mmid, is_vault = (None, None, False)
  158. if arg == 'vault' and self.is_swap:
  159. is_vault = True
  160. elif mmid := get_obj(MMGenID, proto=proto, id_str=arg, silent=True):
  161. coin_addr = mmaddr2coinaddr(self.cfg, arg, ad_w, ad_f, proto)
  162. elif is_coin_addr(proto, arg):
  163. coin_addr = CoinAddr(proto, arg)
  164. elif is_mmgen_addrtype(proto, arg) or is_addrlist_id(proto, arg):
  165. if proto.base_proto_coin != 'BTC':
  166. die(2, f'Change addresses not supported for {proto.name} protocol')
  167. self.chg_autoselected = True
  168. else:
  169. die(2, f'{arg_in}: invalid command-line argument')
  170. return _pa(arg, mmid, coin_addr, amt, None, is_vault)
  171. async def get_autochg_addr(self, proto, arg, *, exclude, desc, all_addrtypes=False):
  172. from ..tw.addresses import TwAddresses
  173. al = await TwAddresses(self.cfg, proto, get_data=True)
  174. if all_addrtypes:
  175. res = al.get_change_address_by_addrtype(None, exclude=exclude, desc=desc)
  176. req_desc = 'of any allowed address type'
  177. elif obj := get_obj(MMGenAddrType, proto=proto, id_str=arg, silent=True):
  178. res = al.get_change_address_by_addrtype(obj, exclude=exclude, desc=desc)
  179. req_desc = f'of address type {arg!r}'
  180. else:
  181. res = al.get_change_address(arg, exclude=exclude, desc=desc)
  182. req_desc = f'from address list {arg!r}'
  183. if res:
  184. return res
  185. die(2, 'Tracking wallet contains no {t}addresses {d}'.format(
  186. t = '' if res is None else 'unused ',
  187. d = req_desc))
  188. async def process_cmdline_args(self, cmd_args, ad_f, ad_w):
  189. parsed_args = [self.parse_cmdline_arg(self.proto, arg, ad_f, ad_w) for arg in cmd_args]
  190. chg_args = [a for a in parsed_args if not (a.amt or a.data)]
  191. if len(chg_args) > 1:
  192. desc = 'requested' if self.chg_autoselected else 'listed'
  193. die(2, f'ERROR: More than one change address {desc} on command line')
  194. for a in parsed_args:
  195. if a.data:
  196. self.add_output(None, self.proto.coin_amt('0'), data=a.data)
  197. else:
  198. self.add_output(
  199. coinaddr = None if a.is_vault else a.addr or (
  200. await self.get_autochg_addr(
  201. self.proto,
  202. a.arg,
  203. exclude = [a.mmid for a in parsed_args if a.mmid],
  204. desc = 'change address')).addr,
  205. amt = self.proto.coin_amt(a.amt or '0'),
  206. is_chg = not a.amt,
  207. is_vault = a.is_vault)
  208. if self.chg_idx is None:
  209. die(2,
  210. fmt(self.msg_no_change_output.format(self.dcoin)).strip()
  211. if len(self.outputs) == 1 else
  212. 'ERROR: No change output specified')
  213. if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
  214. die(2,
  215. f'{gc.proj_name} Segwit address requested on the command line, '
  216. 'but Segwit is not active on this chain')
  217. if not self.nondata_outputs:
  218. die(2, 'At least one spending output must be specified on the command line')
  219. self.add_mmaddrs_to_outputs(ad_f, ad_w)
  220. self.check_dup_addrs('outputs')
  221. if self.chg_output is not None:
  222. if self.chg_autoselected and not self.is_swap: # swap TX, so user has already confirmed
  223. self.confirm_autoselected_addr(self.chg_output.mmid, 'change address')
  224. elif len(self.nondata_outputs) > 1:
  225. await self.warn_addr_used(self.proto, self.chg_output, 'change address')
  226. def get_addrfiles_from_cmdline(self, cmd_args):
  227. from ..addrfile import AddrFile
  228. addrfile_args = remove_dups(
  229. tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
  230. desc = 'command line',
  231. edesc = 'argument',
  232. )
  233. cmd_args = tuple(a for a in cmd_args if a not in addrfile_args)
  234. if not self.is_swap:
  235. cmd_args = remove_dups(cmd_args, desc='command line', edesc='argument')
  236. return cmd_args, addrfile_args
  237. def get_addrdata_from_files(self, proto, addrfiles):
  238. from ..addrdata import AddrData
  239. from ..addrlist import AddrList
  240. from ..fileutil import check_infile
  241. ad_f = AddrData(proto)
  242. for addrfile in addrfiles:
  243. check_infile(addrfile)
  244. try:
  245. ad_f.add(AddrList(self.cfg, proto, infile=addrfile))
  246. except Exception as e:
  247. msg(f'{type(e).__name__}: {e}')
  248. return ad_f
  249. def confirm_autoselected_addr(self, mmid, desc):
  250. from ..ui import keypress_confirm
  251. if not keypress_confirm(
  252. self.cfg,
  253. 'Using {a} as {b}. OK?'.format(
  254. a = mmid.hl(),
  255. b = 'single output address' if len(self.nondata_outputs) == 1 else desc),
  256. default_yes = True):
  257. die(1, 'Exiting at user request')
  258. async def warn_addr_used(self, proto, chg, desc):
  259. from ..tw.addresses import TwAddresses
  260. if (await TwAddresses(self.cfg, proto, get_data=True)).is_used(chg.addr):
  261. from ..ui import keypress_confirm
  262. if not keypress_confirm(
  263. self.cfg,
  264. '{a} {b} {c}\n{d}'.format(
  265. a = yellow(f'Requested {desc}'),
  266. b = chg.mmid.hl() if chg.mmid else chg.addr.hl(chg.addr.view_pref),
  267. c = yellow('is already used!'),
  268. d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
  269. ),
  270. complete_prompt = True,
  271. default_yes = False):
  272. die(1, 'Exiting at user request')
  273. # inputs methods
  274. def get_unspent_nums_from_user(self, unspent):
  275. prompt = 'Enter a range or space-separated list of outputs to spend: '
  276. from ..ui import line_input
  277. while True:
  278. reply = line_input(self.cfg, prompt).strip()
  279. if reply:
  280. from ..addrlist import AddrIdxList
  281. selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()))
  282. if selected:
  283. if selected[-1] <= len(unspent):
  284. return selected
  285. msg(f'Unspent output number must be <= {len(unspent)}')
  286. def get_unspent_nums_from_inputs_opt(self, unspent):
  287. def do_add_msg(idx):
  288. uo = unspent[idx]
  289. mm_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
  290. msg('Adding input: {} {}{}'.format(idx + 1, uo.addr, mm_disp))
  291. def get_uo_nums():
  292. for addr in self.cfg.inputs.split(','):
  293. if is_mmgen_id(self.proto, addr):
  294. attr = 'twmmid'
  295. elif is_coin_addr(self.proto, addr):
  296. attr = 'addr'
  297. else:
  298. die(1, f'{addr!r}: not an MMGen ID or {self.coin} address')
  299. found = False
  300. for idx, e in enumerate(unspent):
  301. if getattr(e, attr) == addr:
  302. do_add_msg(idx)
  303. yield idx + 1
  304. found = True
  305. if not found:
  306. die(1, f'{addr!r}: address not found in tracking wallet')
  307. return set(get_uo_nums()) # silently discard duplicates
  308. def copy_inputs_from_tw(self, tw_unspent_data):
  309. def gen_inputs():
  310. for d in tw_unspent_data:
  311. i = self.Input(
  312. self.proto,
  313. **{attr:getattr(d, attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs})
  314. if d.twmmid.type == 'mmgen':
  315. i.mmid = d.twmmid # twmmid -> mmid
  316. yield i
  317. self.inputs = type(self.inputs)(self, list(gen_inputs()))
  318. async def get_funds_available(self, fee, outputs_sum):
  319. in_sum = self.sum_inputs()
  320. out_sum = outputs_sum + fee
  321. return self._funds_available(
  322. in_sum >= out_sum,
  323. # CoinAmt must be non-negative, so cannot use abs():
  324. in_sum - out_sum if in_sum >= out_sum else out_sum - in_sum)
  325. async def get_inputs(self, outputs_sum):
  326. sel_nums = (
  327. self.get_unspent_nums_from_inputs_opt if self.cfg.inputs else
  328. self.get_unspent_nums_from_user
  329. )(self.twuo.data)
  330. msg(f'Selected output{suf(sel_nums)}: {{}}'.format(' '.join(str(n) for n in sel_nums)))
  331. sel_unspent = MMGenList(self.twuo.data[i-1] for i in sel_nums)
  332. if not await self.precheck_sufficient_funds(
  333. sum(s.amt for s in sel_unspent),
  334. sel_unspent,
  335. outputs_sum):
  336. return False
  337. self.copy_inputs_from_tw(sel_unspent) # makes self.inputs
  338. return True
  339. async def get_fee(self, fee, outputs_sum, start_fee_desc):
  340. if fee:
  341. self.usr_fee = self.get_usr_fee_interactive(fee, desc=start_fee_desc)
  342. else:
  343. fee_per_kb, fe_type = await self.get_rel_fee_from_network()
  344. self.usr_fee = self.get_usr_fee_interactive(
  345. None if fee_per_kb is None else self.fee_est2abs(fee_per_kb, fe_type=fe_type),
  346. desc = self.network_estimated_fee_label)
  347. funds = await self.get_funds_available(self.usr_fee, outputs_sum)
  348. if funds.is_positive:
  349. p = self.final_inputs_ok_msg(funds.amt)
  350. from ..ui import keypress_confirm
  351. if self.cfg.yes or keypress_confirm(self.cfg, p+'. OK?', default_yes=True):
  352. if self.cfg.yes:
  353. msg(p)
  354. return funds.amt
  355. else:
  356. self.warn_insufficient_funds(funds.amt, self.coin)
  357. async def create(self, cmd_args, *, locktime=None, do_info=False, caller='txcreate'):
  358. assert isinstance(locktime, (int, type(None))), 'locktime must be of type int'
  359. from ..tw.unspent import TwUnspentOutputs
  360. if self.cfg.comment_file:
  361. self.add_comment(infile=self.cfg.comment_file)
  362. if not do_info:
  363. cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
  364. if self.is_swap:
  365. cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
  366. self.process_swap_options()
  367. self.proto = self.send_proto # updating self.proto!
  368. from ..rpc import rpc_init
  369. self.rpc = await rpc_init(self.cfg, self.proto)
  370. from ..addrdata import TwAddrData
  371. await self.process_cmdline_args(
  372. cmd_args,
  373. self.get_addrdata_from_files(self.proto, addrfile_args),
  374. await TwAddrData(self.cfg, self.proto, twctl=self.twctl))
  375. if not self.is_bump:
  376. self.twuo = await TwUnspentOutputs(
  377. self.cfg,
  378. self.proto,
  379. minconf = self.cfg.minconf,
  380. addrs = await self.get_input_addrs_from_inputs_opt())
  381. await self.twuo.get_data()
  382. from ..ui import do_license_msg
  383. do_license_msg(self.cfg)
  384. if not (self.is_bump or self.cfg.inputs):
  385. await self.twuo.view_filter_and_sort()
  386. if not self.is_bump:
  387. self.twuo.display_total()
  388. if do_info:
  389. del self.twuo.twctl
  390. import sys
  391. sys.exit(0)
  392. outputs_sum = self.sum_outputs()
  393. msg('Total amount to spend: {}'.format(
  394. f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'
  395. ))
  396. while True:
  397. if not await self.get_inputs(outputs_sum):
  398. continue
  399. fee_hint = None
  400. if self.is_swap:
  401. fee_hint = self.update_vault_output(
  402. self.vault_output.amt or self.sum_inputs(),
  403. deduct_est_fee = self.vault_output == self.chg_output)
  404. if funds_left := await self.get_fee(
  405. self.cfg.fee or fee_hint,
  406. outputs_sum,
  407. 'User-selected' if self.cfg.fee else 'Recommended' if fee_hint else None):
  408. break
  409. self.check_non_mmgen_inputs(caller)
  410. self.update_change_output(funds_left)
  411. self.check_chg_addr_is_wallet_addr()
  412. if not self.cfg.yes:
  413. self.add_comment() # edits an existing comment
  414. if self.is_swap:
  415. self.update_vault_output(self.vault_output.amt)
  416. await self.create_serialized(locktime=locktime) # creates self.txid too
  417. self.add_timestamp()
  418. self.add_blockcount()
  419. self.chain = self.proto.chain_name
  420. self.check_fee()
  421. self.cfg._util.qmsg('Transaction successfully created')
  422. if self.is_bump:
  423. return
  424. from . import UnsignedTX
  425. new = UnsignedTX(cfg=self.cfg, data=self.__dict__, automount=self.cfg.autosign)
  426. if not self.cfg.yes:
  427. new.info.view_with_prompt('View transaction details?')
  428. del new.twuo.twctl
  429. return new