new.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. #!/usr/bin/env python3
  2. #
  3. # MMGen Wallet, a terminal-based cryptocurrency wallet
  4. # Copyright (C)2013-2025 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.new: new transaction class
  12. """
  13. from collections import namedtuple
  14. from .base import Base
  15. from ..cfg import gc
  16. from ..color import pink, yellow
  17. from ..obj import get_obj, MMGenList
  18. from ..util import msg, fmt, die, suf, remove_dups, get_extension
  19. from ..addr import (
  20. is_mmgen_id,
  21. MMGenAddrType,
  22. MMGenID,
  23. CoinAddr,
  24. is_mmgen_addrtype,
  25. is_coin_addr,
  26. is_addrlist_id
  27. )
  28. def mmaddr2coinaddr(cfg, mmaddr, ad_w, ad_f, proto):
  29. def wmsg(k):
  30. messages = {
  31. 'addr_in_addrfile_only': f"""
  32. Warning: output address {mmaddr} is not in the tracking wallet, which
  33. means its balance will not be tracked. You're strongly advised to import
  34. the address into your tracking wallet before broadcasting this transaction.
  35. """,
  36. 'addr_not_found': f"""
  37. No data for {gc.proj_name} address {mmaddr} could be found in either the
  38. tracking wallet or the supplied address file. Please import this address
  39. into your tracking wallet, or supply an address file on the command line.
  40. """,
  41. 'addr_not_found_no_addrfile': f"""
  42. No data for {gc.proj_name} address {mmaddr} could be found in the tracking
  43. wallet. Please import this address into your tracking wallet or supply an
  44. address file for it on the command line.
  45. """
  46. }
  47. return '\n' + fmt(messages[k], indent=' ')
  48. # assume mmaddr has already been checked
  49. coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
  50. if not coin_addr:
  51. if ad_f:
  52. coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
  53. if coin_addr:
  54. msg(wmsg('addr_in_addrfile_only'))
  55. from ..ui import keypress_confirm
  56. if not (cfg.yes or keypress_confirm(cfg, 'Continue anyway?')):
  57. import sys
  58. sys.exit(1)
  59. else:
  60. die(2, wmsg('addr_not_found'))
  61. else:
  62. die(2, wmsg('addr_not_found_no_addrfile'))
  63. return CoinAddr(proto, coin_addr)
  64. class New(Base):
  65. fee_is_approximate = False
  66. msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'
  67. msg_no_change_output = """
  68. ERROR: No change address specified. If you wish to create a transaction with
  69. only one output, specify a single output address with no {} amount
  70. """
  71. msg_insufficient_funds = 'Selected outputs insufficient to fund this transaction ({} {} needed)'
  72. chg_autoselected = False
  73. _funds_available = namedtuple('funds_available', ['is_positive', 'amt'])
  74. def __init__(self, *args, target=None, **kwargs):
  75. self.is_swap = target == 'swaptx'
  76. super().__init__(*args, **kwargs)
  77. def warn_insufficient_funds(self, amt, coin):
  78. msg(self.msg_insufficient_funds.format(amt.hl(), coin))
  79. def update_output_amt(self, idx, amt):
  80. o = self.outputs[idx]._asdict()
  81. o['amt'] = amt
  82. self.outputs[idx] = self.Output(self.proto, **o)
  83. def add_mmaddrs_to_outputs(self, ad_f, ad_w):
  84. a = [e.addr for e in self.outputs]
  85. d = ad_w.make_reverse_dict(a)
  86. if ad_f:
  87. d.update(ad_f.make_reverse_dict(a))
  88. for e in self.outputs:
  89. if e.addr and e.addr in d:
  90. e.mmid, f = d[e.addr]
  91. if f:
  92. e.comment = f
  93. def check_dup_addrs(self, io_desc):
  94. assert io_desc in ('inputs', 'outputs')
  95. addrs = [e.addr for e in getattr(self, io_desc) if e.addr]
  96. if len(addrs) != len(set(addrs)):
  97. die(2, f'{addrs}: duplicate address in transaction {io_desc}')
  98. # given tx size and absolute fee or fee spec, return absolute fee
  99. # relative fee is N+<first letter of unit name>
  100. def feespec2abs(self, fee_arg, tx_size):
  101. if type(fee_arg) is self.proto.coin_amt:
  102. return fee_arg
  103. if fee := get_obj(self.proto.coin_amt, num=fee_arg, silent=True):
  104. return fee
  105. import re
  106. units = {u[0]:u for u in self.proto.coin_amt.units}
  107. pat = re.compile(r'([1-9][0-9]*)({})'.format('|'.join(units)))
  108. if pat.match(fee_arg):
  109. amt, unit = pat.match(fee_arg).groups()
  110. return self.fee_rel2abs(tx_size, units, int(amt), unit)
  111. return False
  112. def get_usr_fee_interactive(self, fee=None, desc='Starting'):
  113. abs_fee = None
  114. from ..ui import line_input
  115. while True:
  116. if fee:
  117. abs_fee = self.convert_and_check_fee(fee, desc)
  118. if abs_fee:
  119. if self.is_bump and not self.check_bumped_fee_ok(abs_fee):
  120. pass
  121. else:
  122. prompt = '{a} TX fee{b}: {c}{d} {e} ({f} {g})\n'.format(
  123. a = desc,
  124. b = (f' (after {self.cfg.fee_adjust:.2f}X adjustment)'
  125. if self.cfg.fee_adjust != 1 and desc.startswith('Network-estimated')
  126. else ''),
  127. c = ('', '≈')[self.fee_is_approximate],
  128. d = abs_fee.hl(),
  129. e = self.coin,
  130. f = pink(self.fee_abs2rel(abs_fee)),
  131. g = self.rel_fee_disp)
  132. from ..ui import keypress_confirm
  133. if self.cfg.yes or keypress_confirm(self.cfg, prompt+'OK?', default_yes=True):
  134. if self.cfg.yes:
  135. msg(prompt)
  136. return abs_fee
  137. fee = line_input(self.cfg, self.usr_fee_prompt)
  138. desc = 'User-selected'
  139. # we don't know fee yet, so perform preliminary check with fee == 0
  140. async def precheck_sufficient_funds(self, inputs_sum, sel_unspent, outputs_sum):
  141. if self.twuo.total < outputs_sum:
  142. msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum, self.dcoin))
  143. return False
  144. if inputs_sum < outputs_sum:
  145. self.warn_insufficient_funds(outputs_sum - inputs_sum, self.dcoin)
  146. return False
  147. return True
  148. def add_output(self, coinaddr, amt, is_chg=False, data=None):
  149. self.outputs.append(self.Output(self.proto, addr=coinaddr, amt=amt, is_chg=is_chg, data=data))
  150. def process_data_output_arg(self, arg):
  151. return None
  152. def parse_cmdline_arg(self, proto, arg_in, ad_f, ad_w):
  153. _pa = namedtuple('txcreate_cmdline_output', ['arg', 'mmid', 'addr', 'amt', 'data'])
  154. if data := self.process_data_output_arg(arg_in):
  155. return _pa(arg_in, None, None, None, data)
  156. arg, amt = arg_in.split(',', 1) if ',' in arg_in else (arg_in, None)
  157. if mmid := get_obj(MMGenID, proto=proto, id_str=arg, silent=True):
  158. coin_addr = mmaddr2coinaddr(self.cfg, arg, ad_w, ad_f, proto)
  159. elif is_coin_addr(proto, arg):
  160. coin_addr = CoinAddr(proto, arg)
  161. elif is_mmgen_addrtype(proto, arg) or is_addrlist_id(proto, arg):
  162. if proto.base_proto_coin != 'BTC':
  163. die(2, f'Change addresses not supported for {proto.name} protocol')
  164. self.chg_autoselected = True
  165. coin_addr = None
  166. else:
  167. die(2, f'{arg_in}: invalid command-line argument')
  168. return _pa(arg, mmid, coin_addr, amt, None)
  169. async def get_autochg_addr(self, proto, arg, exclude, desc):
  170. from ..tw.addresses import TwAddresses
  171. al = await TwAddresses(self.cfg, proto, get_data=True)
  172. if obj := get_obj(MMGenAddrType, proto=proto, id_str=arg, silent=True):
  173. res = al.get_change_address_by_addrtype(obj, exclude=exclude, desc=desc)
  174. req_desc = f'of address type {arg!r}'
  175. else:
  176. res = al.get_change_address(arg, exclude=exclude, desc=desc)
  177. req_desc = f'from address list {arg!r}'
  178. if res:
  179. return res
  180. die(2, 'Tracking wallet contains no {t}addresses {d}'.format(
  181. t = '' if res is None else 'unused ',
  182. d = req_desc))
  183. async def process_cmdline_args(self, cmd_args, ad_f, ad_w):
  184. parsed_args = [self.parse_cmdline_arg(self.proto, arg, ad_f, ad_w) for arg in cmd_args]
  185. chg_args = [a for a in parsed_args if not ((a.amt and a.addr) or a.data)]
  186. if len(chg_args) > 1:
  187. desc = 'requested' if self.chg_autoselected else 'listed'
  188. die(2, f'ERROR: More than one change address {desc} on command line')
  189. for a in parsed_args:
  190. if a.data:
  191. self.add_output(None, self.proto.coin_amt('0'), data=a.data)
  192. else:
  193. exclude = [a.mmid for a in parsed_args if a.mmid]
  194. self.add_output(
  195. coinaddr = a.addr or (
  196. await self.get_autochg_addr(self.proto, a.arg, exclude=exclude, desc='change address')
  197. ).addr,
  198. amt = self.proto.coin_amt(a.amt or '0'),
  199. is_chg = not a.amt)
  200. if self.chg_idx is None:
  201. die(2,
  202. fmt(self.msg_no_change_output.format(self.dcoin)).strip()
  203. if len(self.outputs) == 1 else
  204. 'ERROR: No change output specified')
  205. if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
  206. die(2,
  207. f'{gc.proj_name} Segwit address requested on the command line, '
  208. 'but Segwit is not active on this chain')
  209. if not self.nondata_outputs:
  210. die(2, 'At least one spending output must be specified on the command line')
  211. self.add_mmaddrs_to_outputs(ad_f, ad_w)
  212. self.check_dup_addrs('outputs')
  213. if self.chg_output is not None:
  214. if self.chg_autoselected:
  215. self.confirm_autoselected_addr(self.chg_output.mmid, 'change address')
  216. elif len(self.nondata_outputs) > 1:
  217. await self.warn_addr_used(self.proto, self.chg_output, 'change address')
  218. def get_addrfiles_from_cmdline(self, cmd_args):
  219. from ..addrfile import AddrFile
  220. addrfile_args = remove_dups(
  221. tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
  222. desc = 'command line',
  223. edesc = 'argument',
  224. )
  225. cmd_args = tuple(a for a in cmd_args if a not in addrfile_args)
  226. if not self.is_swap:
  227. cmd_args = remove_dups(cmd_args, desc='command line', edesc='argument')
  228. return cmd_args, addrfile_args
  229. def get_addrdata_from_files(self, proto, addrfiles):
  230. from ..addrdata import AddrData
  231. from ..addrlist import AddrList
  232. from ..fileutil import check_infile
  233. ad_f = AddrData(proto)
  234. for addrfile in addrfiles:
  235. check_infile(addrfile)
  236. try:
  237. ad_f.add(AddrList(self.cfg, proto, addrfile))
  238. except Exception as e:
  239. msg(f'{type(e).__name__}: {e}')
  240. return ad_f
  241. def confirm_autoselected_addr(self, mmid, desc):
  242. from ..ui import keypress_confirm
  243. if not keypress_confirm(
  244. self.cfg,
  245. 'Using {a} as {b}. OK?'.format(
  246. a = mmid.hl(),
  247. b = 'single output address' if len(self.nondata_outputs) == 1 else desc),
  248. default_yes = True):
  249. die(1, 'Exiting at user request')
  250. async def warn_addr_used(self, proto, chg, desc):
  251. from ..tw.addresses import TwAddresses
  252. if (await TwAddresses(self.cfg, proto, get_data=True)).is_used(chg.addr):
  253. from ..ui import keypress_confirm
  254. if not keypress_confirm(
  255. self.cfg,
  256. '{a} {b} {c}\n{d}'.format(
  257. a = yellow(f'Requested {desc}'),
  258. b = chg.mmid.hl() if chg.mmid else chg.addr.hl(chg.addr.view_pref),
  259. c = yellow('is already used!'),
  260. d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
  261. ),
  262. complete_prompt = True,
  263. default_yes = False):
  264. die(1, 'Exiting at user request')
  265. # inputs methods
  266. def get_unspent_nums_from_user(self, unspent):
  267. prompt = 'Enter a range or space-separated list of outputs to spend: '
  268. from ..ui import line_input
  269. while True:
  270. reply = line_input(self.cfg, prompt).strip()
  271. if reply:
  272. from ..addrlist import AddrIdxList
  273. selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()))
  274. if selected:
  275. if selected[-1] <= len(unspent):
  276. return selected
  277. msg(f'Unspent output number must be <= {len(unspent)}')
  278. def get_unspent_nums_from_inputs_opt(self, unspent):
  279. def do_add_msg(idx):
  280. uo = unspent[idx]
  281. mm_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
  282. msg('Adding input: {} {}{}'.format(idx + 1, uo.addr, mm_disp))
  283. def get_uo_nums():
  284. for addr in self.cfg.inputs.split(','):
  285. if is_mmgen_id(self.proto, addr):
  286. attr = 'twmmid'
  287. elif is_coin_addr(self.proto, addr):
  288. attr = 'addr'
  289. else:
  290. die(1, f'{addr!r}: not an MMGen ID or {self.coin} address')
  291. found = False
  292. for idx, e in enumerate(unspent):
  293. if getattr(e, attr) == addr:
  294. do_add_msg(idx)
  295. yield idx + 1
  296. found = True
  297. if not found:
  298. die(1, f'{addr!r}: address not found in tracking wallet')
  299. return set(get_uo_nums()) # silently discard duplicates
  300. def copy_inputs_from_tw(self, tw_unspent_data):
  301. def gen_inputs():
  302. for d in tw_unspent_data:
  303. i = self.Input(
  304. self.proto,
  305. **{attr:getattr(d, attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs})
  306. if d.twmmid.type == 'mmgen':
  307. i.mmid = d.twmmid # twmmid -> mmid
  308. yield i
  309. self.inputs = type(self.inputs)(self, list(gen_inputs()))
  310. async def get_funds_available(self, fee, outputs_sum):
  311. in_sum = self.sum_inputs()
  312. out_sum = outputs_sum + fee
  313. return self._funds_available(
  314. in_sum >= out_sum,
  315. # CoinAmt must be non-negative, so cannot use abs():
  316. in_sum - out_sum if in_sum >= out_sum else out_sum - in_sum)
  317. async def get_inputs(self, outputs_sum):
  318. sel_nums = (
  319. self.get_unspent_nums_from_inputs_opt if self.cfg.inputs else
  320. self.get_unspent_nums_from_user
  321. )(self.twuo.data)
  322. msg(f'Selected output{suf(sel_nums)}: {{}}'.format(' '.join(str(n) for n in sel_nums)))
  323. sel_unspent = MMGenList(self.twuo.data[i-1] for i in sel_nums)
  324. if not await self.precheck_sufficient_funds(
  325. sum(s.amt for s in sel_unspent),
  326. sel_unspent,
  327. outputs_sum):
  328. return False
  329. self.copy_inputs_from_tw(sel_unspent) # makes self.inputs
  330. return True
  331. async def get_fee(self, fee, outputs_sum):
  332. if fee:
  333. self.usr_fee = self.get_usr_fee_interactive(fee, 'User-selected')
  334. else:
  335. fee_per_kb, fe_type = await self.get_rel_fee_from_network()
  336. self.usr_fee = self.get_usr_fee_interactive(
  337. None if fee_per_kb is None else self.fee_est2abs(fee_per_kb, fe_type),
  338. self.network_estimated_fee_label)
  339. funds = await self.get_funds_available(self.usr_fee, outputs_sum)
  340. if funds.is_positive:
  341. p = self.final_inputs_ok_msg(funds.amt)
  342. from ..ui import keypress_confirm
  343. if self.cfg.yes or keypress_confirm(self.cfg, p+'. OK?', default_yes=True):
  344. if self.cfg.yes:
  345. msg(p)
  346. return funds.amt
  347. else:
  348. self.warn_insufficient_funds(funds.amt, self.coin)
  349. async def create(self, cmd_args, locktime=None, do_info=False, caller='txcreate'):
  350. assert isinstance(locktime, (int, type(None))), 'locktime must be of type int'
  351. from ..tw.unspent import TwUnspentOutputs
  352. if self.cfg.comment_file:
  353. self.add_comment(self.cfg.comment_file)
  354. if not do_info:
  355. cmd_args, addrfile_args = self.get_addrfiles_from_cmdline(cmd_args)
  356. if self.is_swap:
  357. # updates self.proto!
  358. self.proto, cmd_args = await self.process_swap_cmdline_args(cmd_args, addrfile_args)
  359. from ..rpc import rpc_init
  360. self.rpc = await rpc_init(self.cfg, self.proto)
  361. from ..addrdata import TwAddrData
  362. await self.process_cmdline_args(
  363. cmd_args,
  364. self.get_addrdata_from_files(self.proto, addrfile_args),
  365. await TwAddrData(self.cfg, self.proto, twctl=self.twctl))
  366. if not self.is_bump:
  367. self.twuo = await TwUnspentOutputs(
  368. self.cfg,
  369. self.proto,
  370. minconf = self.cfg.minconf,
  371. addrs = await self.get_input_addrs_from_inputs_opt())
  372. await self.twuo.get_data()
  373. from ..ui import do_license_msg
  374. do_license_msg(self.cfg)
  375. if not (self.is_bump or self.cfg.inputs):
  376. await self.twuo.view_filter_and_sort()
  377. if not self.is_bump:
  378. self.twuo.display_total()
  379. if do_info:
  380. del self.twuo.twctl
  381. import sys
  382. sys.exit(0)
  383. outputs_sum = self.sum_outputs()
  384. msg('Total amount to spend: {}'.format(
  385. f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'
  386. ))
  387. while True:
  388. if not await self.get_inputs(outputs_sum):
  389. continue
  390. fee_hint = None
  391. if self.is_swap:
  392. fee_hint = self.update_vault_output(self.vault_output.amt or self.sum_inputs())
  393. if funds_left := await self.get_fee(fee_hint or self.cfg.fee, outputs_sum):
  394. break
  395. self.check_non_mmgen_inputs(caller)
  396. self.update_change_output(funds_left)
  397. self.check_chg_addr_is_wallet_addr()
  398. if not self.cfg.yes:
  399. self.add_comment() # edits an existing comment
  400. await self.create_serialized(locktime=locktime) # creates self.txid too
  401. self.add_timestamp()
  402. self.add_blockcount()
  403. self.chain = self.proto.chain_name
  404. self.check_fee()
  405. self.cfg._util.qmsg('Transaction successfully created')
  406. if self.is_bump:
  407. return
  408. from . import UnsignedTX
  409. new = UnsignedTX(cfg=self.cfg, data=self.__dict__, automount=self.cfg.autosign)
  410. if not self.cfg.yes:
  411. new.info.view_with_prompt('View transaction details?')
  412. del new.twuo.twctl
  413. return new