new.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. # Licensed under the GNU General Public License, Version 3:
  6. # https://www.gnu.org/licenses
  7. # Public project repositories:
  8. # https://github.com/mmgen/mmgen
  9. # https://gitlab.com/mmgen/mmgen
  10. """
  11. tx.new: new transaction class
  12. """
  13. from ..globalvars import *
  14. from ..opts import opt
  15. from .base import Base
  16. from ..color import pink,yellow
  17. from ..obj import get_obj,MMGenList
  18. from ..util import msg,qmsg,fmt,die,suf,remove_dups,get_extension
  19. from ..addr import (
  20. is_mmgen_id,
  21. CoinAddr,
  22. is_coin_addr,
  23. is_addrlist_id
  24. )
  25. def mmaddr2coinaddr(mmaddr,ad_w,ad_f,proto):
  26. def wmsg(k):
  27. messages = {
  28. 'addr_in_addrfile_only': f"""
  29. Warning: output address {mmaddr} is not in the tracking wallet, which
  30. means its balance will not be tracked. You're strongly advised to import
  31. the address into your tracking wallet before broadcasting this transaction.
  32. """,
  33. 'addr_not_found': f"""
  34. No data for {g.proj_name} address {mmaddr} could be found in either the
  35. tracking wallet or the supplied address file. Please import this address
  36. into your tracking wallet, or supply an address file on the command line.
  37. """,
  38. 'addr_not_found_no_addrfile': f"""
  39. No data for {g.proj_name} address {mmaddr} could be found in the tracking
  40. wallet. Please import this address into your tracking wallet or supply an
  41. address file for it on the command line.
  42. """
  43. }
  44. return '\n' + fmt(messages[k],indent=' ')
  45. # assume mmaddr has already been checked
  46. coin_addr = ad_w.mmaddr2coinaddr(mmaddr)
  47. if not coin_addr:
  48. if ad_f:
  49. coin_addr = ad_f.mmaddr2coinaddr(mmaddr)
  50. if coin_addr:
  51. msg(wmsg('addr_in_addrfile_only'))
  52. from ..ui import keypress_confirm
  53. if not (opt.yes or keypress_confirm('Continue anyway?')):
  54. sys.exit(1)
  55. else:
  56. die(2,wmsg('addr_not_found'))
  57. else:
  58. die(2,wmsg('addr_not_found_no_addrfile'))
  59. return CoinAddr(proto,coin_addr)
class New(Base):
    """
    New-transaction class: gathers outputs from the command line, lets the
    user select inputs and a fee, and produces an unsigned transaction via
    create().
    """

    # when true, the fee shown in the fee confirmation prompt is prefixed with '≈'
    fee_is_approximate = False

    # templates formatted with (amount, coin symbol) when funds are insufficient
    msg_low_coin = 'Selected outputs insufficient to fund this transaction ({} {} needed)'
    msg_wallet_low_coin = 'Wallet has insufficient funds for this transaction ({} {} needed)'

    # shown by process_cmd_args() when exactly one output was given and none is a
    # change output; formatted with the coin symbol
    msg_no_change_output = """
ERROR: No change address specified. If you wish to create a transaction with
only one output, specify a single output address with no {} amount
"""

    # set to True by process_cmd_arg() when the change address was picked from an
    # address list rather than given explicitly
    chg_autoselected = False
  69. def update_output_amt(self,idx,amt):
  70. o = self.outputs[idx]._asdict()
  71. o['amt'] = amt
  72. self.outputs[idx] = self.Output(self.proto,**o)
  73. def add_mmaddrs_to_outputs(self,ad_w,ad_f):
  74. a = [e.addr for e in self.outputs]
  75. d = ad_w.make_reverse_dict(a)
  76. if ad_f:
  77. d.update(ad_f.make_reverse_dict(a))
  78. for e in self.outputs:
  79. if e.addr and e.addr in d:
  80. e.mmid,f = d[e.addr]
  81. if f:
  82. e.comment = f
  83. def check_dup_addrs(self,io_str):
  84. assert io_str in ('inputs','outputs')
  85. addrs = [e.addr for e in getattr(self,io_str)]
  86. if len(addrs) != len(set(addrs)):
  87. die(2,f'{addrs}: duplicate address in transaction {io_str}')
  88. # given tx size and absolute fee or fee spec, return absolute fee
  89. # relative fee is N+<first letter of unit name>
  90. def feespec2abs(self,fee_arg,tx_size):
  91. fee = get_obj(self.proto.coin_amt,num=fee_arg,silent=True)
  92. if fee:
  93. return fee
  94. else:
  95. import re
  96. units = {u[0]:u for u in self.proto.coin_amt.units}
  97. pat = re.compile(r'([1-9][0-9]*)({})'.format('|'.join(units)))
  98. if pat.match(fee_arg):
  99. amt,unit = pat.match(fee_arg).groups()
  100. return self.fee_rel2abs(tx_size,units,int(amt),unit)
  101. return False
  102. def get_usr_fee_interactive(self,fee=None,desc='Starting'):
  103. abs_fee = None
  104. from ..ui import line_input
  105. while True:
  106. if fee:
  107. abs_fee = self.convert_and_check_fee(fee,desc)
  108. if abs_fee:
  109. prompt = '{} TX fee{}: {}{} {} ({} {})\n'.format(
  110. desc,
  111. (f' (after {opt.fee_adjust:.2f}X adjustment)'
  112. if opt.fee_adjust != 1 and desc.startswith('Network-estimated')
  113. else ''),
  114. ('','≈')[self.fee_is_approximate],
  115. abs_fee.hl(),
  116. self.coin,
  117. pink(str(self.fee_abs2rel(abs_fee))),
  118. self.rel_fee_disp)
  119. from ..ui import keypress_confirm
  120. if opt.yes or keypress_confirm(prompt+'OK?',default_yes=True):
  121. if opt.yes:
  122. msg(prompt)
  123. return abs_fee
  124. fee = line_input(self.usr_fee_prompt)
  125. desc = 'User-selected'
  126. # we don't know fee yet, so perform preliminary check with fee == 0
  127. async def precheck_sufficient_funds(self,inputs_sum,sel_unspent,outputs_sum):
  128. if self.twuo.total < outputs_sum:
  129. msg(self.msg_wallet_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
  130. return False
  131. if inputs_sum < outputs_sum:
  132. msg(self.msg_low_coin.format(outputs_sum-inputs_sum,self.dcoin))
  133. return False
  134. return True
  135. async def get_fee_from_user(self,have_estimate_fail=[]):
  136. if opt.fee:
  137. desc = 'User-selected'
  138. start_fee = opt.fee
  139. else:
  140. desc = 'Network-estimated ({}, {} conf{})'.format(
  141. opt.fee_estimate_mode.upper(),
  142. pink(str(opt.fee_estimate_confs)),
  143. suf(opt.fee_estimate_confs) )
  144. fee_per_kb,fe_type = await self.get_rel_fee_from_network()
  145. if fee_per_kb < 0:
  146. if not have_estimate_fail:
  147. msg(self.fee_fail_fs.format(c=opt.fee_estimate_confs,t=fe_type))
  148. have_estimate_fail.append(True)
  149. start_fee = None
  150. else:
  151. start_fee = self.fee_est2abs(fee_per_kb,fe_type)
  152. return self.get_usr_fee_interactive(start_fee,desc=desc)
  153. def add_output(self,coinaddr,amt,is_chg=None):
  154. self.outputs.append(self.Output(self.proto,addr=coinaddr,amt=amt,is_chg=is_chg))
  155. async def process_cmd_arg(self,arg_in,ad_f,ad_w):
  156. arg,amt = arg_in.split(',',1) if ',' in arg_in else (arg_in,None)
  157. if is_mmgen_id(self.proto,arg):
  158. coin_addr = mmaddr2coinaddr(arg,ad_w,ad_f,self.proto)
  159. elif is_coin_addr(self.proto,arg):
  160. coin_addr = CoinAddr(self.proto,arg)
  161. elif is_addrlist_id(self.proto,arg):
  162. if self.proto.base_proto_coin != 'BTC':
  163. die(2,f'Change addresses not supported for {self.proto.name} protocol')
  164. from ..tw.addresses import TwAddresses
  165. al = await TwAddresses(self.proto,get_data=True)
  166. al.reverse = False
  167. al.do_sort('twmmid')
  168. res = al.get_change_address(arg)
  169. if res:
  170. coin_addr = res.addr
  171. self.chg_autoselected = True
  172. else:
  173. die(2,'Tracking wallet contains no {t}addresses from address list {a!r}'.format(
  174. t = ('unused ','')[res is None],
  175. a = arg ))
  176. else:
  177. die(2,f'{arg_in}: invalid command-line argument')
  178. if not (amt or self.chg_idx is None):
  179. die(2,'ERROR: More than one change address {} on command line'.format(
  180. 'requested' if self.chg_autoselected else 'listed'))
  181. self.add_output(coin_addr,self.proto.coin_amt(amt or '0'),is_chg=not amt)
  182. async def process_cmd_args(self,cmd_args,ad_f,ad_w):
  183. for a in cmd_args:
  184. await self.process_cmd_arg(a,ad_f,ad_w)
  185. if self.chg_idx is None:
  186. die(2,(
  187. fmt( self.msg_no_change_output.format(self.dcoin) ).strip()
  188. if len(self.outputs) == 1 else
  189. 'ERROR: No change output specified' ))
  190. if self.has_segwit_outputs() and not self.rpc.info('segwit_is_active'):
  191. die(2,f'{g.proj_name} Segwit address requested on the command line, '
  192. + 'but Segwit is not active on this chain')
  193. if not self.outputs:
  194. die(2,'At least one output must be specified on the command line')
  195. async def get_outputs_from_cmdline(self,cmd_args):
  196. from ..addrdata import AddrData,TwAddrData
  197. from ..addrlist import AddrList
  198. from ..addrfile import AddrFile
  199. addrfiles = remove_dups(
  200. tuple(a for a in cmd_args if get_extension(a) == AddrFile.ext),
  201. desc = 'command line',
  202. edesc = 'argument',
  203. )
  204. cmd_args = remove_dups(
  205. tuple(a for a in cmd_args if a not in addrfiles),
  206. desc = 'command line',
  207. edesc = 'argument',
  208. )
  209. ad_f = AddrData(self.proto)
  210. from ..fileutil import check_infile
  211. for a in addrfiles:
  212. check_infile(a)
  213. ad_f.add(AddrList(self.proto,a))
  214. ad_w = await TwAddrData(self.proto,twctl=self.twctl)
  215. await self.process_cmd_args(cmd_args,ad_f,ad_w)
  216. self.add_mmaddrs_to_outputs(ad_w,ad_f)
  217. self.check_dup_addrs('outputs')
  218. if self.chg_output is not None:
  219. if self.chg_autoselected:
  220. self.confirm_autoselected_addr(self.chg_output)
  221. elif len(self.outputs) > 1:
  222. await self.warn_chg_addr_used(self.chg_output)
  223. def confirm_autoselected_addr(self,chg):
  224. from ..ui import keypress_confirm
  225. if not keypress_confirm(
  226. 'Using {a} as {b} address. OK?'.format(
  227. a = chg.mmid.hl(),
  228. b = 'single output' if len(self.outputs) == 1 else 'change' ),
  229. default_yes = True ):
  230. die(1,'Exiting at user request')
  231. async def warn_chg_addr_used(self,chg):
  232. from ..tw.addresses import TwAddresses
  233. if (await TwAddresses(self.proto,get_data=True)).is_used(chg.addr):
  234. from ..ui import keypress_confirm
  235. if not keypress_confirm(
  236. '{a} {b} {c}\n{d}'.format(
  237. a = yellow(f'Requested change address'),
  238. b = (chg.mmid or chg.addr).hl(),
  239. c = yellow('is already used!'),
  240. d = yellow('Address reuse harms your privacy and security. Continue anyway? (y/N): ')
  241. ),
  242. complete_prompt = True,
  243. default_yes = False ):
  244. die(1,'Exiting at user request')
  245. # inputs methods
  246. def select_unspent(self,unspent):
  247. prompt = 'Enter a range or space-separated list of outputs to spend: '
  248. from ..ui import line_input
  249. while True:
  250. reply = line_input(prompt).strip()
  251. if reply:
  252. from ..addrlist import AddrIdxList
  253. selected = get_obj(AddrIdxList, fmt_str=','.join(reply.split()) )
  254. if selected:
  255. if selected[-1] <= len(unspent):
  256. return selected
  257. msg(f'Unspent output number must be <= {len(unspent)}')
  258. def select_unspent_cmdline(self,unspent):
  259. def idx2num(idx):
  260. uo = unspent[idx]
  261. mmid_disp = f' ({uo.twmmid})' if uo.twmmid.type == 'mmgen' else ''
  262. msg(f'Adding input: {idx + 1} {uo.addr}{mmid_disp}')
  263. return idx + 1
  264. def get_uo_nums():
  265. for addr in opt.inputs.split(','):
  266. if is_mmgen_id(self.proto,addr):
  267. attr = 'twmmid'
  268. elif is_coin_addr(self.proto,addr):
  269. attr = 'addr'
  270. else:
  271. die(1,f'{addr!r}: not an MMGen ID or {self.coin} address')
  272. found = False
  273. for idx in range(len(unspent)):
  274. if getattr(unspent[idx],attr) == addr:
  275. yield idx2num(idx)
  276. found = True
  277. if not found:
  278. die(1,f'{addr!r}: address not found in tracking wallet')
  279. return set(get_uo_nums()) # silently discard duplicates
  280. def copy_inputs_from_tw(self,tw_unspent_data):
  281. def gen_inputs():
  282. for d in tw_unspent_data:
  283. i = self.Input(
  284. self.proto,
  285. **{attr:getattr(d,attr) for attr in d.__dict__ if attr in self.Input.tw_copy_attrs} )
  286. if d.twmmid.type == 'mmgen':
  287. i.mmid = d.twmmid # twmmid -> mmid
  288. yield i
  289. self.inputs = type(self.inputs)(self,list(gen_inputs()))
  290. def warn_insufficient_funds(self,funds_left):
  291. msg(self.msg_low_coin.format(self.proto.coin_amt(-funds_left).hl(),self.coin))
  292. async def get_funds_left(self,fee,outputs_sum):
  293. return self.sum_inputs() - outputs_sum - fee
  294. async def get_inputs_from_user(self,outputs_sum):
  295. while True:
  296. us_f = self.select_unspent_cmdline if opt.inputs else self.select_unspent
  297. sel_nums = us_f(self.twuo.data)
  298. msg(f'Selected output{suf(sel_nums)}: {{}}'.format(' '.join(str(n) for n in sel_nums)))
  299. sel_unspent = MMGenList(self.twuo.data[i-1] for i in sel_nums)
  300. inputs_sum = sum(s.amt for s in sel_unspent)
  301. if not await self.precheck_sufficient_funds(inputs_sum,sel_unspent,outputs_sum):
  302. continue
  303. self.copy_inputs_from_tw(sel_unspent) # makes self.inputs
  304. self.usr_fee = await self.get_fee_from_user()
  305. funds_left = await self.get_funds_left(self.usr_fee,outputs_sum)
  306. if funds_left >= 0:
  307. p = self.final_inputs_ok_msg(funds_left)
  308. from ..ui import keypress_confirm
  309. if opt.yes or keypress_confirm(p+'. OK?',default_yes=True):
  310. if opt.yes:
  311. msg(p)
  312. return funds_left
  313. else:
  314. self.warn_insufficient_funds(funds_left)
    async def create(self,cmd_args,locktime=None,do_info=False,caller='txcreate'):
        """
        Create a new transaction from the command-line arguments and return it
        as an UnsignedTX.

        cmd_args: output arguments (and address files) from the command line
        locktime: int or None; passed on to create_serialized()
        do_info:  if true, only display the unspent outputs and exit with
                  status 0, without processing any outputs
        caller:   passed to check_non_mmgen_inputs()
        """
        assert isinstance( locktime, (int,type(None)) ), 'locktime must be of type int'

        from ..tw.unspent import TwUnspentOutputs

        if opt.comment_file:
            self.add_comment(opt.comment_file)

        # restrict the unspent outputs to the addresses given on the command line, if any
        twuo_addrs = await self.get_input_addrs_from_cmdline()

        self.twuo = await TwUnspentOutputs(self.proto,minconf=opt.minconf,addrs=twuo_addrs)
        await self.twuo.get_data()

        if not do_info:
            await self.get_outputs_from_cmdline(cmd_args)

        from ..ui import do_license_msg
        do_license_msg()

        # the interactive view is skipped when inputs were given on the command line
        if not opt.inputs:
            await self.twuo.view_filter_and_sort()

        self.twuo.display_total()

        if do_info:
            del self.twuo.twctl
            sys.exit(0)

        outputs_sum = self.sum_outputs()

        # a falsy sum is reported as 'Unknown'
        msg('Total amount to spend: {}'.format(
            f'{outputs_sum.hl()} {self.dcoin}' if outputs_sum else 'Unknown'
        ))

        funds_left = await self.get_inputs_from_user(outputs_sum)

        self.check_non_mmgen_inputs(caller)

        self.update_change_output(funds_left)

        if not opt.yes:
            self.add_comment() # edits an existing comment

        await self.create_serialized(locktime=locktime) # creates self.txid too

        self.add_timestamp()
        self.add_blockcount()
        self.chain = self.proto.chain_name
        self.check_fee()

        qmsg('Transaction successfully created')

        # the unsigned transaction is built from this object's attribute dict
        from . import UnsignedTX
        new = UnsignedTX(data=self.__dict__)

        if not opt.yes:
            new.info.view_with_prompt('View transaction details?')

        del new.twuo.twctl
        return new
  351. new.info.view_with_prompt('View transaction details?')
  352. del new.twuo.twctl
  353. return new