common.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. #!/usr/bin/env python3
  2. #
  3. # mmgen = Multi-Mode GENerator, command-line Bitcoin cold storage solution
  4. # Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
  5. #
  6. # This program is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # This program is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. """
  19. tw: Tracking wallet dependency classes and helper functions
  20. """
  21. import sys,time
  22. from ..globalvars import g
  23. from ..objmethods import Hilite,InitErrors,MMGenObject
  24. from ..obj import TwComment,get_obj,MMGenIdx,MMGenList
  25. from ..color import nocolor,yellow,green
  26. from ..util import msg,msg_r,fmt,die,line_input,do_pager,capfirst,make_timestr
  27. from ..addr import MMGenID
  28. # mixin class for TwUnspentOutputs,TwAddrList,TwTxHistory:
  29. class TwCommon:
  30. cols = None
  31. reverse = False
  32. group = False
  33. sort_key = 'age'
  34. interactive = False
  35. age_fmts = ('confs','block','days','date','date_time')
  36. age_fmts_date_dependent = ('days','date','date_time')
  37. age_fmts_interactive = ('confs','block','days','date')
  38. _age_fmt = 'confs'
  39. age_col_params = {
  40. 'confs': (7, 'Confs'),
  41. 'block': (8, 'Block'),
  42. 'days': (6, 'Age(d)'),
  43. 'date': (8, 'Date'),
  44. 'date_time': (16, 'Date/Time'),
  45. }
  46. date_formatter = {
  47. 'days': lambda rpc,secs: (rpc.cur_date - secs) // 86400 if secs else 0,
  48. 'date': (
  49. lambda rpc,secs: '{}-{:02}-{:02}'.format(*time.gmtime(secs)[:3])[2:]
  50. if secs else '--------' ),
  51. 'date_time': (
  52. lambda rpc,secs: '{}-{:02}-{:02} {:02}:{:02}'.format(*time.gmtime(secs)[:5])
  53. if secs else '---------- -----' ),
  54. }
  55. def age_disp(self,o,age_fmt):
  56. if age_fmt == 'confs':
  57. return o.confs
  58. elif age_fmt == 'block':
  59. return self.rpc.blockcount - (o.confs - 1)
  60. else:
  61. return self.date_formatter[age_fmt](self.rpc,o.date)
  62. async def get_data(self,sort_key=None,reverse_sort=False):
  63. rpc_data = await self.get_rpc_data()
  64. if not rpc_data:
  65. die(0,fmt(self.no_rpcdata_errmsg).strip())
  66. lbl_id = ('account','label')['label_api' in self.rpc.caps]
  67. res = self.gen_data(rpc_data,lbl_id)
  68. self.data = MMGenList(await res if type(res).__name__ == 'coroutine' else res)
  69. if not self.data:
  70. die(1,self.no_data_errmsg)
  71. self.do_sort(key=sort_key,reverse=reverse_sort)
  72. @staticmethod
  73. async def set_dates(rpc,us):
  74. if us and us[0].date is None:
  75. # 'blocktime' differs from 'time', is same as getblockheader['time']
  76. dates = [ o.get('blocktime',0)
  77. for o in await rpc.gathered_icall('gettransaction',[(o.txid,True,False) for o in us]) ]
  78. for idx,o in enumerate(us):
  79. o.date = dates[idx]
  80. @property
  81. def age_w(self):
  82. return self.age_col_params[self.age_fmt][0]
  83. @property
  84. def age_hdr(self):
  85. return self.age_col_params[self.age_fmt][1]
  86. @property
  87. def age_fmt(self):
  88. return self._age_fmt
  89. @age_fmt.setter
  90. def age_fmt(self,val):
  91. ok_vals,op_desc = (
  92. (self.age_fmts_interactive,'interactive') if self.interactive else
  93. (self.age_fmts,'non-interactive') )
  94. if val not in ok_vals:
  95. die('BadAgeFormat',
  96. f'{val!r}: invalid age format for {op_desc} operation (must be one of {ok_vals!r})' )
  97. self._age_fmt = val
  98. @property
  99. def disp_prec(self):
  100. return self.proto.coin_amt.max_prec
  101. def get_term_columns(self,min_cols):
  102. from ..term import get_terminal_size,get_char_raw
  103. while True:
  104. cols = g.columns or get_terminal_size().width
  105. if cols >= min_cols:
  106. return cols
  107. if sys.stdout.isatty():
  108. if g.columns:
  109. die(1,
  110. f'\n--columns or MMGEN_COLUMNS value ({g.columns}) is too small to display the {self.desc}.\n'
  111. + f'Minimum value for this configuration: {min_cols}' )
  112. else:
  113. get_char_raw(
  114. f'\nScreen is too narrow to display the {self.desc}\n'
  115. + f'Please resize your screen to at least {min_cols} characters and hit any key: ' )
  116. else:
  117. return min_cols
  118. sort_disp = {
  119. 'addr': 'Addr',
  120. 'age': 'Age',
  121. 'amt': 'Amt',
  122. 'txid': 'TxID',
  123. 'twmmid': 'MMGenID',
  124. }
  125. def sort_info(self,include_group=True):
  126. ret = ([],['Reverse'])[self.reverse]
  127. ret.append(self.sort_disp[self.sort_key])
  128. if include_group and self.group and (self.sort_key in ('addr','txid','twmmid')):
  129. ret.append('Grouped')
  130. return ret
  131. sort_funcs = {
  132. 'addr': lambda i: i.addr,
  133. 'age': lambda i: 0 - i.confs,
  134. 'amt': lambda i: i.amt,
  135. 'txid': lambda i: f'{i.txid} {i.vout:04}',
  136. 'twmmid': lambda i: i.twmmid.sort_key
  137. }
  138. def do_sort(self,key=None,reverse=False):
  139. key = key or self.sort_key
  140. if key not in self.sort_funcs:
  141. die(1,f'{key!r}: invalid sort key. Valid options: {" ".join(self.sort_funcs)}')
  142. self.sort_key = key
  143. assert type(reverse) == bool
  144. self.data.sort(key=self.sort_funcs[key],reverse=reverse or self.reverse)
  145. async def format_squeezed(self,color=True,cached=False):
  146. if not cached:
  147. data = self.data
  148. if self.has_age and self.age_fmt in self.age_fmts_date_dependent:
  149. await self.set_dates(self.rpc,data)
  150. if not getattr(self,'column_params',None):
  151. self.set_column_params()
  152. if self.group and (self.sort_key in ('addr','txid','twmmid')):
  153. for a,b in [(data[i],data[i+1]) for i in range(len(data)-1)]:
  154. for k in ('addr','txid','twmmid'):
  155. if self.sort_key == k and getattr(a,k) == getattr(b,k):
  156. b.skip = (k,'addr')[k=='twmmid']
  157. self._format_squeezed_display_data = (
  158. self.hdr_fmt.format(
  159. a = ' '.join(self.sort_info()),
  160. b = self.proto.dcoin,
  161. c = self.total.hl() if hasattr(self,'total') else None )
  162. + '\nNetwork: {}'.format((nocolor,green)[color](
  163. self.proto.coin + ' ' +
  164. self.proto.chain_name.upper() ))
  165. + '\n' + '\n'.join(self.gen_squeezed_display(self.column_params,color=color))
  166. + '\n'
  167. )
  168. return self._format_squeezed_display_data
  169. async def format_detail(self,color):
  170. if self.has_age:
  171. await self.set_dates(self.rpc,self.data)
  172. sep = self.detail_display_separator
  173. return self.print_hdr_fs.format(
  174. a = capfirst(self.desc),
  175. b = self.rpc.blockcount,
  176. c = make_timestr(self.rpc.cur_date),
  177. d = 'Network: {}\n'.format((nocolor,green)[color](
  178. self.proto.coin + ' ' +
  179. self.proto.chain_name.upper() )),
  180. e = ' '.join(self.sort_info(include_group=False)),
  181. f = sep.join(self.gen_detail_display(color)),
  182. g = self.proto.dcoin,
  183. h = self.total.hl(color=color) if hasattr(self,'total') else None )
  184. async def view_and_sort(self):
  185. from ..opts import opt
  186. from ..term import get_char
  187. self.prompt = type(self).prompt.strip() + '\b'
  188. self.no_output = False
  189. self.oneshot_msg = None
  190. self.interactive = True
  191. CUR_HOME = '\033[H'
  192. ERASE_ALL = '\033[0J'
  193. while True:
  194. msg_r('' if self.no_output else '\n\n' if (opt.no_blank or g.test_suite) else CUR_HOME+ERASE_ALL)
  195. reply = get_char(
  196. '' if self.no_output else (
  197. await self.format_squeezed()
  198. + '\n'
  199. + (self.oneshot_msg or '')
  200. + self.prompt
  201. ),
  202. immed_chars = ''.join(self.key_mappings.keys())
  203. )
  204. self.no_output = False
  205. self.oneshot_msg = '' if self.oneshot_msg else None # tristate, saves previous state
  206. if reply not in self.key_mappings:
  207. msg_r('\ninvalid keypress ')
  208. time.sleep(0.5)
  209. continue
  210. action = self.key_mappings[reply]
  211. if hasattr(self.action,action):
  212. await self.action().run(self,action)
  213. elif action.startswith('s_'): # put here to allow overriding by action method
  214. self.do_sort(action[2:])
  215. elif hasattr(self.item_action,action):
  216. await self.item_action().run(self,action)
  217. self.set_column_params()
  218. elif action == 'a_quit':
  219. msg('')
  220. return self.data
  221. class action:
  222. async def run(self,parent,action):
  223. ret = getattr(self,action)(parent)
  224. if type(ret).__name__ == 'coroutine':
  225. await ret
  226. def d_days(self,parent):
  227. af = parent.age_fmts_interactive
  228. parent.age_fmt = af[(af.index(parent.age_fmt) + 1) % len(af)]
  229. if parent.update_params_on_age_toggle:
  230. parent.set_column_params()
  231. def d_redraw(self,parent):
  232. parent.set_column_params()
  233. def d_reverse(self,parent):
  234. parent.data.reverse()
  235. parent.reverse = not parent.reverse
  236. async def a_print_detail(self,parent):
  237. return await self._print(parent,output_type='detail')
  238. async def a_print_squeezed(self,parent):
  239. return await self._print(parent,output_type='squeezed')
  240. async def _print(self,parent,output_type):
  241. outfile = '{}{}-{}{}[{}].out'.format(
  242. parent.dump_fn_pfx,
  243. f'-{output_type}' if len(parent.print_output_types) > 1 else '',
  244. parent.proto.dcoin,
  245. ('' if parent.proto.network == 'mainnet' else '-'+parent.proto.network.upper()),
  246. ','.join(parent.sort_info(include_group=False)).replace(' ','') )
  247. msg('')
  248. from ..fileutil import write_data_to_file
  249. from ..exception import UserNonConfirmation
  250. hdr = {
  251. 'squeezed': f'[screen print truncated to width {parent.cols}]\n',
  252. 'detail': '',
  253. }[output_type]
  254. try:
  255. write_data_to_file(
  256. outfile = outfile,
  257. data = hdr + await getattr(parent,f'format_{output_type}')(color=False),
  258. desc = f'{parent.desc} listing' )
  259. except UserNonConfirmation as e:
  260. parent.oneshot_msg = yellow(f'File {outfile!r} not overwritten by user request\n\n')
  261. else:
  262. parent.oneshot_msg = green(f'Data written to {outfile!r}\n\n')
  263. async def a_view(self,parent):
  264. do_pager( await parent.format_squeezed(color=True,cached=True) )
  265. self.post_view(parent)
  266. async def a_view_detail(self,parent):
  267. do_pager( await parent.format_detail(color=True) )
  268. self.post_view(parent)
  269. def post_view(self,parent):
  270. if g.platform == 'linux' and parent.oneshot_msg == None:
  271. CUR_RIGHT = lambda n: f'\033[{n}C'
  272. msg_r(CUR_RIGHT(len(parent.prompt.split('\n')[-1])-2))
  273. parent.no_output = True
  274. class item_action:
  275. async def run(self,parent,action):
  276. msg('')
  277. while True:
  278. ret = line_input(f'Enter {parent.item_desc} number (or RETURN to return to main menu): ')
  279. if ret == '':
  280. return None
  281. idx = get_obj(MMGenIdx,n=ret,silent=True)
  282. if not idx or idx < 1 or idx > len(parent.data):
  283. msg(f'Choice must be a single number between 1 and {len(parent.data)}')
  284. elif (await getattr(self,action)(parent,idx)) != 'redo':
  285. break
  286. class TwMMGenID(str,Hilite,InitErrors,MMGenObject):
  287. color = 'orange'
  288. width = 0
  289. trunc_ok = False
  290. def __new__(cls,proto,id_str):
  291. if type(id_str) == cls:
  292. return id_str
  293. ret = None
  294. try:
  295. ret = MMGenID(proto,id_str)
  296. sort_key,idtype = ret.sort_key,'mmgen'
  297. except Exception as e:
  298. try:
  299. assert id_str.split(':',1)[0] == proto.base_coin.lower(),(
  300. f'not a string beginning with the prefix {proto.base_coin.lower()!r}:' )
  301. assert id_str.isascii() and id_str[4:].isalnum(), 'not an ASCII alphanumeric string'
  302. assert len(id_str) > 4,'not more that four characters long'
  303. ret,sort_key,idtype = str(id_str),'z_'+id_str,'non-mmgen'
  304. except Exception as e2:
  305. return cls.init_fail(e,id_str,e2=e2)
  306. me = str.__new__(cls,ret)
  307. me.obj = ret
  308. me.sort_key = sort_key
  309. me.type = idtype
  310. me.proto = proto
  311. return me
  312. # non-displaying container for TwMMGenID,TwComment
  313. class TwLabel(str,InitErrors,MMGenObject):
  314. exc = 'BadTwLabel'
  315. passthru_excs = ('BadTwComment',)
  316. def __new__(cls,proto,text):
  317. if type(text) == cls:
  318. return text
  319. try:
  320. ts = text.split(None,1)
  321. mmid = TwMMGenID(proto,ts[0])
  322. comment = TwComment(ts[1] if len(ts) == 2 else '')
  323. me = str.__new__( cls, mmid + (' ' + comment if comment else '') )
  324. me.mmid = mmid
  325. me.comment = comment
  326. me.proto = proto
  327. return me
  328. except Exception as e:
  329. return cls.init_fail(e,text)
  330. def get_tw_label(proto,s):
  331. """
  332. raise an exception on a malformed comment, return None on an empty or invalid label
  333. """
  334. try:
  335. return TwLabel(proto,s)
  336. except Exception as e:
  337. if type(e).__name__ == 'BadTwComment': # do it this way to avoid importing .exception
  338. raise
  339. else:
  340. return None