new.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
  4. # Copyright (C)2013-2024 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen-wallet
  9. # https://gitlab.com/mmgen/mmgen-wallet
  10. """
  11. tx.new: new transaction class
  12. """
  13. from .base import Base
  14. from ..cfg import gc
  15. from ..color import pink,yellow
  16. from ..obj import get_obj,MMGenList
  17. from ..util import msg,fmt,die,suf,remove_dups,get_extension
  18. from ..addr import (
  19. is_mmgen_id,
  20. MMGenAddrType,
  21. CoinAddr,
  22. is_mmgen_addrtype,
  23. is_coin_addr,
  24. is_addrlist_id
  25. )
  26. def mmaddr2coinaddr(cfg,mmaddr,ad_w,ad_f,proto):
  27. def wmsg(k):
  28. messages = {
  29. 'addr_in_addrfile_only': f"""
  30. Warning: output address {mmaddr} is not in the tracking wallet, which
  31. means its balance will not be tracked. You're strongly advised to import
  32. the address into your tracking wallet before broadcasting this transaction.
  33. """,
  34. 'addr_not_found': f"""
  35. No data for {gc.proj_name} address {mmaddr} could be found in either the
  36. tracking wallet or the supplied address file. Please import this address
  37. into your tracking wallet, or supply an address file on the command line.
  38. """,
  39. 'addr_not_found_no_addrfile': f"""
  40. No data for {gc.proj_name} address {mmaddr} could be found in the tracking
  41. wallet. Please import this address into your tracking wallet or supply an
  42. address file for it on the command line.
  43. """
  44. }
  45. return '\n' + fmt(messages[k],indent=' ')
  46. # assume mmaddr has already been checked
  47. coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
  48. if not coin_addr:
  49. if ad_f:
  50. coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
  51. if coin_addr:
  52. msg(wmsg('addr_in_addrfile_only'))
  53. from ..ui import keypress_confirm
  54. if not (cfg.yes or keypress_confirm( cfg, 'Continue anyway?' )):
  55. import sys
  56. sys.exit(1)
  57. else:
  58. die(2,wmsg('addr_not_found'))
  59. else:
  60. die(2,wmsg('addr_not_found_no_addrfile'))
  61. return CoinAddr(proto,coin_addr)
  62. class New(Base):
  63. fee_is_approximate = False
  64. msg_low_coin = 'Selected outputs insufficient to fund this transaction ({} {} needed)'
  65. msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'
  66. msg_no_change_output = """
  67. ERROR: No change address specified. If you wish to create a transaction with
  68. only one output, specify a single output address with no {} amount
  69. """
  70. chg_autoselected = False
  71. def update_output_amt(self,idx,amt):
  72. o = self.outputs[idx]._asdict()
  73. o['amt'] = amt
  74. self.outputs[idx] = self.Output(self.proto,**o)
  75. def add_mmaddrs_to_outputs(self,ad_w,ad_f):
  76. a = [e.addr for e in self.outputs]
  77. d = ad_w.make_reverse_dict(a)
  78. if ad_f:
  79. d.update(ad_f.make_reverse_dict(a))
  80. for e in self.outputs:
  81. if e.addr and e.addr in d:
  82. e.mmid,f = d[e.addr]
  83. if f:
  84. e.comment = f
  85. def check_dup_addrs(self,io_str):
  86. assert io_str in ('inputs','outputs')
  87. addrs = [e.addr for e in getattr(self,io_str)]
  88. if len(addrs) != len(set(addrs)):
  89. die(2,f'{addrs}: duplicate address in transaction {io_str}')
  90. # given tx size and absolute fee or fee spec, return absolute fee
  91. # relative fee is N+<first letter of unit name>
  92. def feespec2abs(self,fee_arg,tx_size):
  93. fee = get_obj(self.proto.coin_amt,num=fee_arg,silent=True)
  94. if fee:
  95. return fee
  96. else:
  97. import re
  98. units = {u[0]:u for u in self.proto.coin_amt.units}
  99. pat = re.compile(r'([1-9][0-9]*)({})'.format('|'.join(units)))
  100. if pat.match(fee_arg):
  101. amt,unit = pat.match(fee_arg).groups()
  102. return self.fee_rel2abs(tx_size,units,int(amt),unit)
  103. return False
  104. def get_usr_fee_interactive(self,fee=None,desc='Starting'):
  105. abs_fee = None
  106. from ..ui import line_input
  107. while True:
  108. if fee:
  109. abs_fee = self.convert_and_check_fee(fee,desc)
  110. if abs_fee:
  111. prompt = '{a} TX fee{b}: {c}{d} {e} ({f} {g})\n'.format(
  112. a = desc,
  113. b = (f' (after {self.cfg.fee_adjust:.2f}X adjustment)'
  114. if self.cfg.fee_adjust != 1 and desc.startswith('Network-estimated')
  115. else ''),
  116. c = ('','≈')[self.fee_is_approximate],
  117. d = abs_fee.hl(),
  118. e = self.coin,
  119. f = pink(str(self.fee_abs2rel(abs_fee))),
  120. g = self.rel_fee_disp)
  121. from ..ui import keypress_confirm
  122. if self.cfg.yes or keypress_confirm( self.cfg, prompt+'OK?', default_yes=True ):
  123. if self.cfg.yes:
  124. msg(prompt)
  125. return abs_fee
  126. fee = line_input( self.cfg, self.usr_fee_prompt )
  127. desc = 'User-selected'
  128. # we don't know fee yet, so perform preliminary check with fee == 0
  129. async def precheck_sufficient_funds(self,inputs_sum,sel_unspent,outputs_sum):
  130. if self.twuo.total < outputs_sum:
  131. msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
  132. return False
  133. if inputs_sum < outputs_sum:
  134. msg(self.msg_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
  135. return False
  136. return True
  137. async def get_fee_from_user(self,have_estimate_fail=[]):
  138. if self.cfg.fee:
  139. desc = 'User-selected'
  140. start_fee = self.cfg.fee
  141. else:
  142. desc = 'Network-estimated ({}, {} conf{})'.format(
  143. self.cfg.fee_estimate_mode.upper(),
  144. pink(str(self.cfg.fee_estimate_confs)),
  145. suf(self.cfg.fee_estimate_confs) )
  146. fee_per_kb,fe_type = await self.get_rel_fee_from_network()
  147. if fee_per_kb < 0:
  148. if not have_estimate_fail:
  149. msg(self.fee_fail_fs.format(c=self.cfg.fee_estimate_confs,t=fe_type))
  150. have_estimate_fail.append(True)
  151. start_fee = None
  152. else:
  153. start_fee = self.fee_est2abs(fee_per_kb,fe_type)
  154. return self.get_usr_fee_interactive(start_fee,desc=desc)
  155. def add_output(self,coinaddr,amt,is_chg=None):
  156. self.outputs.append(self.Output(self.proto,addr=coinaddr,amt=amt,is_chg=is_chg))
  157. async def process_cmd_arg(self,arg_in,ad_f,ad_w):
  158. arg,amt = arg_in.split(',',1) if ',' in arg_in else (arg_in,None)
  159. if is_mmgen_id(self.proto,arg):
  160. coin_addr = mmaddr2coinaddr(self.cfg,arg,ad_w,ad_f,self.proto)
  161. elif is_coin_addr(self.proto,arg):
  162. coin_addr = CoinAddr(self.proto,arg)
  163. elif is_mmgen_addrtype(self.proto,arg) or is_addrlist_id(self.proto,arg):
  164. if self.proto.base_proto_coin != 'BTC':
  165. die(2,f'Change addresses not supported for {self.proto.name} protocol')
  166. from ..tw.addresses import TwAddresses
  167. al = await TwAddresses(self.cfg,self.proto,get_data=True)
  168. if is_mmgen_addrtype(self.proto,arg):
  169. arg = MMGenAddrType(self.proto,arg)
  170. res = al.get_change_address_by_addrtype(arg)
  171. desc = 'of address type'
  172. else:
  173. res = al.get_change_address(arg)
  174. desc = 'from address list'
  175. if res:
  176. coin_addr = res.addr
  177. self.chg_autoselected = True
  178. else:
  179. die(2,'Tracking wallet contains no {t}addresses {d} {a!r}'.format(
  180. t = ('unused ','')[res is None],
  181. d = desc,
  182. a = arg ))
  183. else:
  184. die(2,f'{arg_in}: invalid command-line argument')
  185. if not (amt or self.chg_idx is None):
  186. die(2,'ERROR: More than one change address {} on command line'.format(
  187. 'requested' if self.chg_autoselected else 'listed'))
  188. self.add_output(coin_addr,self.proto.coin_amt(amt or '0'),is_chg=not amt)
  189. async def process_cmd_args(self,cmd_args,ad_f,ad_w):
  190. for a in cmd_args:
  191. await self.process_cmd_arg(a,ad_f,ad_w)
  192. if self.chg_idx is None:
  193. die(2,(
  194. fmt( self.msg_no_change_output.format(self.dcoin) ).strip()
  195. if len(self.outputs) == 1 else
  196. 'ERROR: No change output specified' ))
  197. if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
  198. die(2,f'{gc.proj_name} Segwit address requested on the command line, '
  199. + 'but Segwit is not active on this chain')
  200. if not self.outputs:
  201. die(2,'At least one output must be specified on the command line')
  202. async def get_outputs_from_cmdline(self,cmd_args):
  203. from ..addrdata import AddrData,TwAddrData
  204. from ..addrlist import AddrList
  205. from ..addrfile import AddrFile
  206. addrfiles = remove_dups(
  207. tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
  208. desc = 'command line',
  209. edesc = 'argument',
  210. )
  211. cmd_args = remove_dups(
  212. tuple(a for a in cmd_args if a not in addrfiles),
  213. desc = 'command line',
  214. edesc = 'argument',
  215. )
  216. ad_f = AddrData(self.proto)
  217. from ..fileutil import check_infile
  218. for addrfile in addrfiles:
  219. check_infile(addrfile)
  220. ad_f.add(AddrList( self.cfg, self.proto, addrfile ))
  221. ad_w = await TwAddrData(self.cfg,self.proto,twctl=self.twctl)
  222. await self.process_cmd_args(cmd_args,ad_f,ad_w)
  223. self.add_mmaddrs_to_outputs(ad_w,ad_f)
  224. self.check_dup_addrs('outputs')
  225. if self.chg_output is not None:
  226. if self.chg_autoselected:
  227. self.confirm_autoselected_addr(self.chg_output)
  228. elif len(self.outputs) > 1:
  229. await self.warn_chg_addr_used(self.chg_output)
  230. def confirm_autoselected_addr(self,chg):
  231. from ..ui import keypress_confirm
  232. if not keypress_confirm(
  233. self.cfg,
  234. 'Using {a} as {b} address. OK?'.format(
  235. a = chg.mmid.hl(),
  236. b = 'single output' if len(self.outputs) == 1 else 'change' ),
  237. default_yes = True ):
  238. die(1,'Exiting at user request')
  239. async def warn_chg_addr_used(self,chg):
  240. from ..tw.addresses import TwAddresses
  241. if (await TwAddresses(self.cfg,self.proto,get_data=True)).is_used(chg.addr):
  242. from ..ui import keypress_confirm
  243. if not keypress_confirm(
  244. self.cfg,
  245. '{a} {b} {c}\n{d}'.format(
  246. a = yellow('Requested change address'),
  247. b = (chg.mmid or chg.addr).hl(),
  248. c = yellow('is already used!'),
  249. d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
  250. ),
  251. complete_prompt = True,
  252. default_yes = False ):
  253. die(1,'Exiting at user request')
  254. # inputs methods
  255. def select_unspent(self,unspent):
  256. prompt = 'Enter a range or space-separated list of outputs to spend: '
  257. from ..ui import line_input
  258. while True:
  259. reply = line_input( self.cfg, prompt ).strip()
  260. if reply:
  261. from ..addrlist import AddrIdxList
  262. selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
  263. if selected:
  264. if selected[-1] <= len(unspent):
  265. return selected
  266. msg(f'Unspent output number must be <= {len(unspent)}')
  267. def select_unspent_cmdline(self,unspent):
  268. def idx2num(idx):
  269. uo = unspent[idx]
  270. mmid_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
  271. msg(f'Adding input: {idx + 1} {uo.addr}{mmid_disp}')
  272. return idx + 1
  273. def get_uo_nums():
  274. for addr in self.cfg.inputs.split(','):
  275. if is_mmgen_id(self.proto,addr):
  276. attr = 'twmmid'
  277. elif is_coin_addr(self.proto,addr):
  278. attr = 'addr'
  279. else:
  280. die(1,f'{addr!r}: not an MMGen ID or {self.coin} address')
  281. found = False
  282. for idx,us in enumerate(unspent):
  283. if getattr(us,attr) == addr:
  284. yield idx2num(idx)
  285. found = True
  286. if not found:
  287. die(1,f'{addr!r}: address not found in tracking wallet')
  288. return set(get_uo_nums()) # silently discard duplicates
  289. def copy_inputs_from_tw(self,tw_unspent_data):
  290. def gen_inputs():
  291. for d in tw_unspent_data:
  292. i = self.Input(
  293. self.proto,
  294. **{attr:getattr(d,attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs} )
  295. if d.twmmid.type == 'mmgen':
  296. i.mmid = d.twmmid # twmmid -> mmid
  297. yield i
  298. self.inputs = type(self.inputs)(self,list(gen_inputs()))
  299. def warn_insufficient_funds(self,funds_left):
  300. msg(self.msg_low_coin.format(self.proto.coin_amt(-funds_left).hl(),self.coin))
  301. async def get_funds_left(self,fee,outputs_sum):
  302. return self.sum_inputs() - outputs_sum - fee
  303. async def get_inputs_from_user(self,outputs_sum):
  304. while True:
  305. us_f = self.select_unspent_cmdline if self.cfg.inputs else self.select_unspent
  306. sel_nums = us_f(self.twuo.data)
  307. msg(f'Selected output{suf(sel_nums)}: {{}}'.format(' '.join(str(n) for n in sel_nums)))
  308. sel_unspent = MMGenList(self.twuo.data[i-1] for i in sel_nums)
  309. inputs_sum = sum(s.amt for s in sel_unspent)
  310. if not await self.precheck_sufficient_funds(inputs_sum,sel_unspent,outputs_sum):
  311. continue
  312. self.copy_inputs_from_tw(sel_unspent) # makes self.inputs
  313. self.usr_fee = await self.get_fee_from_user()
  314. funds_left = await self.get_funds_left(self.usr_fee,outputs_sum)
  315. if funds_left >= 0:
  316. p = self.final_inputs_ok_msg(funds_left)
  317. from ..ui import keypress_confirm
  318. if self.cfg.yes or keypress_confirm( self.cfg, p+'. OK?', default_yes=True ):
  319. if self.cfg.yes:
  320. msg(p)
  321. return funds_left
  322. else:
  323. self.warn_insufficient_funds(funds_left)
  324. async def create(self,cmd_args,locktime=None,do_info=False,caller='txcreate'):
  325. assert isinstance( locktime, (int,type(None)) ), 'locktime must be of type int'
  326. from ..tw.unspent import TwUnspentOutputs
  327. if self.cfg.comment_file:
  328. self.add_comment(self.cfg.comment_file)
  329. twuo_addrs = await self.get_input_addrs_from_cmdline()
  330. self.twuo = await TwUnspentOutputs(self.cfg,self.proto,minconf=self.cfg.minconf,addrs=twuo_addrs)
  331. await self.twuo.get_data()
  332. if not do_info:
  333. await self.get_outputs_from_cmdline(cmd_args)
  334. from ..ui import do_license_msg
  335. do_license_msg(self.cfg)
  336. if not self.cfg.inputs:
  337. await self.twuo.view_filter_and_sort()
  338. self.twuo.display_total()
  339. if do_info:
  340. del self.twuo.twctl
  341. import sys
  342. sys.exit(0)
  343. outputs_sum = self.sum_outputs()
  344. msg('Total amount to spend: {}'.format(
  345. f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'
  346. ))
  347. funds_left = await self.get_inputs_from_user(outputs_sum)
  348. self.check_non_mmgen_inputs(caller)
  349. self.update_change_output(funds_left)
  350. if not self.cfg.yes:
  351. self.add_comment() # edits an existing comment
  352. await self.create_serialized(locktime=locktime) # creates self.txid too
  353. self.add_timestamp()
  354. self.add_blockcount()
  355. self.chain = self.proto.chain_name
  356. self.check_fee()
  357. self.cfg._util.qmsg('Transaction successfully created')
  358. from . import UnsignedTX
  359. new = UnsignedTX(cfg=self.cfg,data=self.__dict__)
  360. if not self.cfg.yes:
  361. new.info.view_with_prompt('View transaction details?')
  362. del new.twuo.twctl
  363. return new