mmgen-node-tools/mmgen_node_tools/Ticker.py

1122 lines
34 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
#
# mmgen = Multi-Mode GENerator, a command-line cryptocurrency wallet
# Copyright (C)2013-2022 The MMGen Project <mmgen@tuta.io>
# Licensed under the GNU General Public License, Version 3:
# https://www.gnu.org/licenses
# Public project repositories:
2023-11-20 14:45:15 +00:00
# https://github.com/mmgen/mmgen-wallet https://github.com/mmgen/mmgen-node-tools
# https://gitlab.com/mmgen/mmgen-wallet https://gitlab.com/mmgen/mmgen-node-tools
"""
mmgen_node_tools.Ticker: Display price information for cryptocurrency and other assets
"""
# v3.2.dev4: switch to new coinpaprika ‘tickers’ API call (supports ‘limit’ parameter, more historical data)
# Old ‘ticker’ API (/v1/ticker): data['BTC']['price_usd']
# New ‘tickers’ API (/v1/tickers): data['BTC']['quotes']['USD']['price']
# Possible alternatives:
# - https://min-api.cryptocompare.com/data/pricemultifull?fsyms=BTC,LTC&tsyms=USD,EUR
2025-10-01 15:26:41 +00:00
import sys, os, re, time, datetime, json, yaml, random
from subprocess import run, PIPE, CalledProcessError
from decimal import Decimal
from collections import namedtuple
2025-10-01 15:26:41 +00:00
from mmgen.color import red, yellow, green, blue, orange, gray
from mmgen.util import msg, msg_r, Msg, Msg_r, die, fmt, fmt_list, fmt_dict, list_gen
2024-03-10 14:44:57 +00:00
from mmgen.ui import do_pager
homedir = os.getenv('HOME')
2025-10-04 09:56:55 +00:00
dfl_cachedir = os.path.join(homedir, '.cache', 'mmgen-node-tools')
cfg_fn = 'ticker-cfg.yaml'
portfolio_fn = 'ticker-portfolio.yaml'
2025-10-04 09:56:55 +00:00
asset_tuple = namedtuple('asset_tuple', ['symbol', 'id', 'source'])
2023-11-05 13:40:22 +00:00
last_api_host = None
percent_cols = {
'd': 'day',
'w': 'week',
'm': 'month',
'y': 'year',
}
class DataSource:
2023-11-05 13:40:22 +00:00
source_groups = [
{
'cc': 'coinpaprika'
}, {
'fi': 'yahoospot',
'hi': 'yahoohist',
2025-10-04 09:56:55 +00:00
}]
2023-11-05 13:40:22 +00:00
@classmethod
2025-10-04 09:56:55 +00:00
def get_sources(cls, randomize=False):
g = random.sample(cls.source_groups, k=len(cls.source_groups)) if randomize else cls.source_groups
return {k: v for a in g for k, v in a.items()}
class base:
2023-11-05 13:40:22 +00:00
def fetch_delay(self):
global last_api_host
if not gcfg.testing and last_api_host and last_api_host != self.api_host:
2025-10-04 09:56:55 +00:00
delay = 1 + random.randrange(1, 5000) / 1000
2023-11-05 13:40:22 +00:00
msg_r(f'Waiting {delay:.3f} seconds...')
time.sleep(delay)
msg('')
last_api_host = self.api_host
def get_data_from_network(self):
curl_cmd = list_gen(
2025-10-04 09:56:55 +00:00
['curl', '--tr-encoding', '--header', 'Accept: application/json', True],
['--compressed'], # adds 'Accept-Encoding: gzip'
2025-10-04 09:56:55 +00:00
['--proxy', cfg.proxy, isinstance(cfg.proxy, str)],
['--silent', not cfg.verbose],
['--connect-timeout', str(gcfg.http_timeout), gcfg.http_timeout],
2025-10-04 09:56:55 +00:00
[self.api_url])
if gcfg.testing:
2025-10-04 09:56:55 +00:00
Msg(fmt_list(curl_cmd, fmt='bare'))
return
try:
2025-10-04 09:56:55 +00:00
return run(curl_cmd, check=True, stdout=PIPE).stdout.decode()
except CalledProcessError as e:
msg('')
from .Misc import curl_exit_codes
msg(red(curl_exit_codes[e.returncode]))
2025-10-01 15:26:41 +00:00
msg(red('Command line:\n {}'.format(
' '.join((repr(i) if ' ' in i else i) for i in e.cmd))))
from mmgen.exception import MMGenCalledProcessError
2025-10-01 15:26:41 +00:00
raise MMGenCalledProcessError(
f'Subprocess returned non-zero exit status {e.returncode}')
def get_data(self):
if not os.path.exists(cfg.cachedir):
os.makedirs(cfg.cachedir)
if not os.path.exists(self.json_fn):
2025-10-04 09:56:55 +00:00
open(self.json_fn, 'w').write('{}')
use_cached_data = cfg.cached_data and not gcfg.download
if use_cached_data:
data_type = 'json'
data_in = open(self.json_fn).read()
else:
data_type = self.net_data_type
elapsed = int(time.time() - os.stat(self.json_fn).st_mtime)
2023-11-05 13:40:22 +00:00
if elapsed >= self.timeout or gcfg.testing:
if gcfg.testing:
msg('')
2023-11-05 13:40:22 +00:00
self.fetch_delay()
msg_r(f'Fetching {self.data_desc} from {self.api_host}...')
if self.has_verbose and cfg.verbose:
msg('')
data_in = self.get_data_from_network()
msg('done')
if gcfg.testing:
return {}
else:
2025-10-04 09:56:55 +00:00
die(1, self.rate_limit_errmsg(elapsed))
2025-10-01 15:26:42 +00:00
match data_type:
case 'json':
try:
data = json.loads(data_in)
except:
self.json_data_error_msg(data_in)
2025-10-04 09:56:55 +00:00
die(2, 'Retrieved data is not valid JSON, exiting')
2025-10-01 15:26:42 +00:00
json_text = data_in
case 'python':
data = data_in
json_text = json.dumps(data_in)
if not data:
if use_cached_data:
die(1,
f'No cached {self.data_desc}! Run command without the --cached-data option, '
'or use --download to retrieve data from remote host')
else:
2025-10-04 09:56:55 +00:00
die(2, 'Remote host returned no data!')
elif 'error' in data:
2025-10-04 09:56:55 +00:00
die(1, data['error'])
if use_cached_data:
if not cfg.quiet:
msg(f'Using cached data from ~/{self.json_fn_rel}')
else:
if os.path.exists(self.json_fn):
os.rename(self.json_fn, self.json_fn + '.bak')
with open(self.json_fn, 'w') as fh:
fh.write(json_text)
if not cfg.quiet:
msg(f'JSON data cached to ~/{self.json_fn_rel}')
if gcfg.download:
sys.exit(0)
return self.postprocess_data(data)
2025-10-04 09:56:55 +00:00
def json_data_error_msg(self, json_text):
pass
2025-10-04 09:56:55 +00:00
def postprocess_data(self, data):
return data
@property
def json_fn_rel(self):
2025-10-04 09:56:55 +00:00
return os.path.relpath(self.json_fn, start=homedir)
class coinpaprika(base):
desc = 'CoinPaprika'
2023-11-05 13:40:22 +00:00
data_desc = 'cryptocurrency data'
api_host = 'api.coinpaprika.com'
2025-10-12 10:01:51 +00:00
api_proto = 'https'
ratelimit = 240
btc_ratelimit = 10
net_data_type = 'json'
has_verbose = True
dfl_asset_limit = 2000
def __init__(self):
self.asset_limit = int(cfg.asset_limit or self.dfl_asset_limit)
2025-10-04 09:56:55 +00:00
def rate_limit_errmsg(self, elapsed):
return (
f'Rate limit exceeded! Retry in {self.timeout-elapsed} seconds' +
2025-10-04 09:56:55 +00:00
('' if cfg.btc_only else ', or use --cached-data or --btc'))
@property
def api_url(self):
return (
2025-10-12 10:01:51 +00:00
f'{self.api_proto}://{self.api_host}/v1/tickers/btc-bitcoin'
2025-10-12 10:01:47 +00:00
if cfg.btc_only else
2025-10-12 10:01:51 +00:00
f'{self.api_proto}://{self.api_host}/v1/tickers?limit={self.asset_limit}'
2025-10-12 10:01:47 +00:00
if self.asset_limit else
2025-10-12 10:01:51 +00:00
f'{self.api_proto}://{self.api_host}/v1/tickers')
@property
def json_fn(self):
return os.path.join(
cfg.cachedir,
2025-10-04 09:56:55 +00:00
'ticker-btc.json' if cfg.btc_only else 'ticker.json')
@property
def timeout(self):
return 0 if gcfg.test_suite else self.btc_ratelimit if cfg.btc_only else self.ratelimit
2025-10-04 09:56:55 +00:00
def json_data_error_msg(self, json_text):
tor_captcha_msg = f"""
If youre using Tor, the API request may have failed due to Captcha protection.
A workaround for this issue is to retrieve the JSON data with a browser from
the following URL:
{self.api_url}
and save it to:
{cfg.cachedir}/ticker.json
Then invoke the program with --cached-data and without --btc
"""
msg(json_text[:1024] + '...')
2025-10-04 09:56:55 +00:00
msg(orange(fmt(tor_captcha_msg, strip_char='\t')))
2025-10-04 09:56:55 +00:00
def postprocess_data(self, data):
return [data] if cfg.btc_only else data
@staticmethod
2025-10-04 09:56:55 +00:00
def parse_asset_id(s, require_label):
sym, label = (*s.split('-', 1), None)[:2]
if require_label and not label:
2025-10-04 09:56:55 +00:00
die(1, f'{s!r}: asset label is missing')
return asset_tuple(
symbol = sym.upper(),
id = (s.lower() if label else None),
2025-10-04 09:56:55 +00:00
source = 'cc')
2023-11-05 13:40:22 +00:00
class yahoospot(base):
desc = 'Yahoo Finance'
2023-11-05 13:40:22 +00:00
data_desc = 'spot financial data'
api_host = 'finance.yahoo.com'
ratelimit = 30
net_data_type = 'python'
has_verbose = False
asset_id_pat = r'^\^.*|.*=[xf]$'
2023-11-05 13:40:22 +00:00
json_fn_basename = 'ticker-finance.json'
@staticmethod
2025-10-04 09:56:55 +00:00
def get_id(sym, data):
return sym.lower()
@staticmethod
2025-10-04 09:56:55 +00:00
def conv_data(sym, data, btcusd):
price_usd = Decimal(data['regularMarketPrice']['raw'])
return {
'id': sym,
'name': data['shortName'],
'symbol': sym.upper(),
'price_usd': price_usd,
'price_btc': price_usd / btcusd,
'percent_change_1y': data['pct_chg_1y'],
'percent_change_30d': data['pct_chg_4wks'],
'percent_change_7d': data['pct_chg_1wk'],
'percent_change_24h': data['regularMarketChangePercent']['raw'] * 100,
2025-10-04 09:56:55 +00:00
'last_updated': data['regularMarketTime']}
2025-10-04 09:56:55 +00:00
def rate_limit_errmsg(self, elapsed):
return f'Rate limit exceeded! Retry in {self.timeout-elapsed} seconds, or use --cached-data'
@property
def json_fn(self):
2025-10-04 09:56:55 +00:00
return os.path.join(cfg.cachedir, self.json_fn_basename)
@property
def timeout(self):
return 0 if gcfg.test_suite else self.ratelimit
2023-11-05 13:40:22 +00:00
@property
def symbols(self):
2025-10-04 09:56:55 +00:00
return [r.symbol for r in cfg.rows if isinstance(r, tuple) and r.source == 'fi']
2023-11-05 13:40:22 +00:00
def get_data_from_network(self):
2023-11-05 13:40:22 +00:00
kwargs = {
'formatted': True,
'asynchronous': True,
2025-10-04 09:56:55 +00:00
'proxies': {'https': cfg.proxy2}}
if gcfg.test_suite:
2025-10-04 09:56:55 +00:00
kwargs.update({'timeout': 1, 'retry': 0})
if gcfg.http_timeout:
kwargs.update({'timeout': gcfg.http_timeout})
if gcfg.testing:
Msg('\nyahooquery.Ticker(\n {},\n {}\n)'.format(
2023-11-05 13:40:22 +00:00
self.symbols,
2025-10-04 09:56:55 +00:00
fmt_dict(kwargs, fmt='kwargs')))
return
from yahooquery import Ticker
2025-10-04 09:56:55 +00:00
return self.process_network_data(Ticker(self.symbols,**kwargs))
2023-11-05 13:40:22 +00:00
2025-10-04 09:56:55 +00:00
def process_network_data(self, ticker):
2023-11-05 13:40:22 +00:00
return ticker.price
@staticmethod
2025-10-04 09:56:55 +00:00
def parse_asset_id(s, require_label):
return asset_tuple(
symbol = s.upper(),
id = s.lower(),
2025-10-04 09:56:55 +00:00
source = 'fi')
class yahoohist(yahoospot):
json_fn_basename = 'ticker-finance-history.json'
data_desc = 'historical financial data'
net_data_type = 'json'
period = '1y'
interval = '1wk'
2025-10-04 09:56:55 +00:00
def process_network_data(self, ticker):
return ticker.history(
period = self.period,
interval = self.interval).to_json(orient='index')
2025-10-04 09:56:55 +00:00
def postprocess_data(self, data):
def gen():
keys = set()
2025-10-01 15:26:41 +00:00
d = {}
for key, val in data.items():
2025-10-04 09:56:55 +00:00
if m := re.match(r"\('(.*?)', datetime\.date\((.*)\)\)$", key):
date = '{}-{:>02}-{:>02}'.format(*m[2].split(', '))
if (sym := m[1]) in keys:
d[date] = val
else:
keys.add(sym)
2025-10-01 15:26:41 +00:00
d = {date: val}
yield (sym, d)
return dict(gen())
def assets_list_gen(cfg_in):
2025-10-04 09:56:55 +00:00
for k, v in cfg_in.cfg['assets'].items():
2023-10-03 14:35:57 +00:00
yield ''
yield k.upper()
for e in v:
2025-10-04 09:56:55 +00:00
out = e.split('-', 1)
yield ' {:5s} {}'.format(out[0], out[1] if len(out) == 2 else '')
def gen_data(data):
"""
Filter the raw data and return it as a dict keyed by the IDs of the assets
we want to display.
Add dummy entry for USD and entry for user-specified asset, if any.
Since symbols in source data are not guaranteed to be unique (e.g. XAG), we
must search the data twice: first for unique IDs, then for symbols while
checking for duplicates.
"""
def dup_sym_errmsg(dup_sym):
return (
f'The symbol {dup_sym!r} is shared by the following assets:\n' +
'\n ' + '\n '.join(d['id'] for d in data['cc'] if d['symbol'] == dup_sym) +
'\n\nPlease specify the asset by one of the full IDs listed above\n' +
2025-10-04 09:56:55 +00:00
f'instead of {dup_sym!r}')
2025-10-04 09:56:55 +00:00
def check_assets_found(wants, found, keys=['symbol', 'id']):
error = False
for k in keys:
missing = wants[k] - found[k]
if missing:
msg(
('The following IDs were not found in source data:\n{}' if k == 'id' else
'The following symbols could not be resolved:\n{}').format(
2025-10-04 09:56:55 +00:00
fmt_list(missing, fmt='col', indent=' ')))
error = True
if error:
2025-10-04 09:56:55 +00:00
die(1, 'Missing data, exiting')
rows_want = {
2025-10-04 09:56:55 +00:00
'id': {r.id for r in cfg.rows if isinstance(r, tuple) and r.id} - {'usd-us-dollar'},
'symbol': {r.symbol for r in cfg.rows if isinstance(r, tuple) and r.id is None} - {'USD'}}
usr_rate_assets = tuple(u.rate_asset for u in cfg.usr_rows + cfg.usr_columns if u.rate_asset)
usr_rate_assets_want = {
'id': {a.id for a in usr_rate_assets if a.id},
2025-10-04 09:56:55 +00:00
'symbol': {a.symbol for a in usr_rate_assets if not a.id}}
usr_assets = cfg.usr_rows + cfg.usr_columns + tuple(c for c in (cfg.query or ()) if c)
usr_wants = {
'id': (
{a.id for a in usr_assets + usr_rate_assets if a.id} -
2025-10-04 09:56:55 +00:00
{a.id for a in usr_assets if a.rate and a.id} - {'usd-us-dollar'})
,
'symbol': (
{a.symbol for a in usr_assets + usr_rate_assets if not a.id} -
2025-10-04 09:56:55 +00:00
{a.symbol for a in usr_assets if a.rate} - {'USD'})}
2025-10-04 09:56:55 +00:00
found = {'id': set(), 'symbol': set()}
rate_assets = {}
2025-10-04 09:56:55 +00:00
wants = {k: rows_want[k] | usr_wants[k] for k in ('id', 'symbol')}
for d in data['cc']:
if d['id'] == 'btc-bitcoin':
btcusd = Decimal(str(d['quotes']['USD']['price']))
break
get_id = src_cls['fi'].get_id
conv_func = src_cls['fi'].conv_data
2025-10-04 09:56:55 +00:00
for k, v in data['fi'].items():
id = get_id(k, v)
if wants['id']:
if id in wants['id']:
2025-10-04 09:56:55 +00:00
if not isinstance(v, dict):
2024-03-10 14:44:57 +00:00
die(2, str(v))
if id in found['id']:
2025-10-04 09:56:55 +00:00
die(1, dup_sym_errmsg(id))
if m := data['hi'].get(k):
spot = v['regularMarketPrice']['raw']
hist = tuple(m.values())
v['pct_chg_1wk'], v['pct_chg_4wks'], v['pct_chg_1y'] = (
(spot / hist[-2]['close'] - 1) * 100,
(spot / hist[-5]['close'] - 1) * 100, # 4 weeks ≈ 1 month
2025-10-04 09:56:55 +00:00
(spot / hist[0]['close'] - 1) * 100)
else:
v['pct_chg_1wk'] = v['pct_chg_4wks'] = v['pct_chg_1y'] = None
2025-10-04 09:56:55 +00:00
yield (id, conv_func(id, v, btcusd))
found['id'].add(id)
wants['id'].remove(id)
if id in usr_rate_assets_want['id']:
2025-10-04 09:56:55 +00:00
rate_assets[k] = conv_func(id, v, btcusd) # NB: using symbol instead of ID for key
else:
break
2025-10-04 09:56:55 +00:00
for k in ('id', 'symbol'):
for d in data['cc']:
if wants[k]:
if d[k] in wants[k]:
if d[k] in found[k]:
2025-10-04 09:56:55 +00:00
die(1, dup_sym_errmsg(d[k]))
if not 'price_usd' in d:
d['price_usd'] = Decimal(str(d['quotes']['USD']['price']))
d['price_btc'] = Decimal(str(d['quotes']['USD']['price'])) / btcusd
d['percent_change_24h'] = d['quotes']['USD']['percent_change_24h']
d['percent_change_7d'] = d['quotes']['USD']['percent_change_7d']
d['percent_change_30d'] = d['quotes']['USD']['percent_change_30d']
d['percent_change_1y'] = d['quotes']['USD']['percent_change_1y']
# .replace('Z','+00:00') -- Python 3.9 backport
2025-10-04 09:56:55 +00:00
d['last_updated'] = int(datetime.datetime.fromisoformat(
d['last_updated'].replace('Z', '+00:00')).timestamp())
yield (d['id'], d)
found[k].add(d[k])
wants[k].remove(d[k])
if d[k] in usr_rate_assets_want[k]:
rate_assets[d['symbol']] = d # NB: using symbol instead of ID for key
else:
break
2025-10-04 09:56:55 +00:00
check_assets_found(usr_wants, found)
for asset in (cfg.usr_rows + cfg.usr_columns):
2022-08-06 09:48:48 +00:00
if asset.rate:
"""
User-supplied rate overrides rate from source data.
"""
_id = asset.id or f'{asset.symbol}-user-asset-{asset.symbol}'.lower()
ra_rate = rate_assets[asset.rate_asset.symbol]['price_usd'] if asset.rate_asset else 1
2025-10-04 09:56:55 +00:00
yield (_id, {
'symbol': asset.symbol,
'id': _id,
'name': ' '.join(_id.split('-')[1:]),
'price_usd': ra_rate / asset.rate,
'price_btc': ra_rate / asset.rate / btcusd,
2025-10-04 09:56:55 +00:00
'last_updated': None})
yield ('usd-us-dollar', {
'symbol': 'USD',
'id': 'usd-us-dollar',
'name': 'US Dollar',
'price_usd': Decimal(1),
'price_btc': Decimal(1) / btcusd,
2025-10-04 09:56:55 +00:00
'last_updated': None})
def main():
def update_sample_file(usr_cfg_file):
2025-10-04 09:56:55 +00:00
usr_data = files('mmgen_node_tools').joinpath('data', os.path.basename(usr_cfg_file)).read_text()
sample_file = usr_cfg_file + '.sample'
sample_data = open(sample_file).read() if os.path.exists(sample_file) else None
if usr_data != sample_data:
2025-10-04 09:56:55 +00:00
os.makedirs(os.path.dirname(sample_file), exist_ok=True)
msg('{} {}'.format(
2025-10-04 09:56:55 +00:00
('Updating', 'Creating')[sample_data is None],
sample_file))
open(sample_file, 'w').write(usr_data)
try:
from importlib.resources import files # Python 3.9
except ImportError:
from importlib_resources import files
update_sample_file(cfg_in.cfg_file)
update_sample_file(cfg_in.portfolio_file)
2023-03-28 18:16:33 +00:00
if gcfg.portfolio and not cfg_in.portfolio:
2025-10-04 09:56:55 +00:00
die(1, 'No portfolio configured!\nTo configure a portfolio, edit the file ~/{}'.format(
os.path.relpath(cfg_in.portfolio_file, start=homedir)))
if gcfg.list_ids:
src_ids = ['cc']
elif gcfg.download:
2023-11-05 13:40:22 +00:00
if not gcfg.download in DataSource.get_sources():
2025-10-04 09:56:55 +00:00
die(1, f'{gcfg.download!r}: invalid data source')
src_ids = [gcfg.download]
else:
2023-11-05 13:40:22 +00:00
src_ids = DataSource.get_sources(randomize=True)
2025-10-04 09:56:55 +00:00
src_data = {k: src_cls[k]().get_data() for k in src_ids}
if gcfg.testing:
return
2023-03-28 18:16:33 +00:00
if gcfg.list_ids:
do_pager('\n'.join(e['id'] for e in src_data['cc']))
return
global now
2023-03-28 18:16:33 +00:00
now = 1659465400 if gcfg.test_suite else time.time() # 1659524400 1659445900
data = dict(gen_data(src_data))
(do_pager if cfg.pager else Msg_r)(
2025-10-04 09:56:55 +00:00
'\n'.join(getattr(Ticker, cfg.clsname)(data).gen_output()) + '\n')
2023-10-13 09:50:15 +00:00
def make_cfg(gcfg_arg):
2025-10-04 09:56:55 +00:00
query_tuple = namedtuple('query', ['asset', 'to_asset'])
asset_data = namedtuple('asset_data', ['symbol', 'id', 'amount', 'rate', 'rate_asset', 'source'])
2023-10-03 14:35:57 +00:00
2025-10-04 09:56:55 +00:00
def parse_asset_id(s, require_label=False):
return src_cls['fi' if re.match(fi_pat, s) else 'cc'].parse_asset_id(s, require_label)
2023-10-03 14:35:57 +00:00
def get_rows_from_cfg(add_data=None):
def gen():
2025-10-04 09:56:55 +00:00
for n, (k, v) in enumerate(cfg_in.cfg['assets'].items()):
2023-10-03 14:35:57 +00:00
yield k
if add_data and k in add_data:
v += tuple(add_data[k])
for e in v:
2025-10-04 09:56:55 +00:00
yield parse_asset_id(e, require_label=True)
return tuple(gen())
def parse_percent_cols(arg):
if arg is None:
return []
res = arg.lower().split(',')
for s in res:
if s not in percent_cols:
2025-10-04 09:56:55 +00:00
die(1, '{!r}: invalid --percent-cols parameter (valid letters: {})'.format(
arg,
fmt_list(percent_cols)))
return res
2025-10-04 09:56:55 +00:00
def parse_usr_asset_arg(key, use_cf_file=False):
2022-08-06 09:48:48 +00:00
"""
asset_id[:rate[:rate_asset]]
2022-08-06 09:48:48 +00:00
"""
def parse_parm(s):
ss = s.split(':')
2025-10-04 09:56:55 +00:00
assert len(ss) in (1, 2, 3), f'{s}: malformed argument'
asset_id, rate, rate_asset = (*ss, None, None)[:3]
2022-08-06 09:48:48 +00:00
parsed_id = parse_asset_id(asset_id)
return asset_data(
symbol = parsed_id.symbol,
id = parsed_id.id,
amount = None,
rate = (
None if rate is None else
1 / Decimal(rate[:-1]) if rate.lower().endswith('r') else
2025-10-04 09:56:55 +00:00
Decimal(rate)),
rate_asset = parse_asset_id(rate_asset) if rate_asset else None,
2025-10-04 09:56:55 +00:00
source = parsed_id.source)
2022-08-06 09:48:48 +00:00
2025-10-04 09:56:55 +00:00
cl_opt = getattr(gcfg, key)
cf_opt = cfg_in.cfg.get(key,[]) if use_cf_file else []
2025-10-04 09:56:55 +00:00
return tuple(parse_parm(s) for s in (cl_opt.split(',') if cl_opt else cf_opt))
def parse_query_arg(s):
2022-08-06 09:48:48 +00:00
"""
asset_id:amount[:to_asset_id[:to_amount]]
"""
2025-10-04 09:56:55 +00:00
def parse_query_asset(asset_id, amount):
2022-08-06 09:48:48 +00:00
parsed_id = parse_asset_id(asset_id)
return asset_data(
symbol = parsed_id.symbol,
id = parsed_id.id,
amount = None if amount is None else Decimal(amount),
rate = None,
rate_asset = None,
2025-10-04 09:56:55 +00:00
source = parsed_id.source)
2022-08-06 09:48:48 +00:00
ss = s.split(':')
2025-10-04 09:56:55 +00:00
assert len(ss) in (2, 3, 4), f'{s}: malformed argument'
asset_id, amount, to_asset_id, to_amount = (*ss, None, None)[:4]
2022-08-06 09:48:48 +00:00
return query_tuple(
2025-10-04 09:56:55 +00:00
asset = parse_query_asset(asset_id, amount),
to_asset = parse_query_asset(to_asset_id, to_amount) if to_asset_id else None)
2025-10-04 09:56:55 +00:00
def gen_uniq(obj_list, key, preload=None):
found = set([getattr(obj, key) for obj in preload if hasattr(obj, key)] if preload else ())
for obj in obj_list:
2025-10-04 09:56:55 +00:00
id = getattr(obj, key)
if id not in found:
yield obj
found.add(id)
def get_usr_assets():
return (
'user_added',
usr_rows +
(tuple(asset for asset in query if asset) if query else ()) +
2025-10-04 09:56:55 +00:00
usr_columns)
def get_portfolio_assets(ret=()):
2023-03-28 18:16:33 +00:00
if cfg_in.portfolio and gcfg.portfolio:
2025-10-04 09:56:55 +00:00
ret = (parse_asset_id(e, require_label=True) for e in cfg_in.portfolio)
return ('portfolio', tuple(e for e in ret if (not gcfg.btc) or e.symbol == 'BTC'))
def get_portfolio():
2025-10-04 09:56:55 +00:00
return {k: Decimal(v) for k, v in cfg_in.portfolio.items()
if (not gcfg.btc) or k == 'btc-bitcoin'}
2024-03-10 14:44:57 +00:00
def parse_add_precision(arg):
if not arg:
return 0
2024-03-10 14:44:57 +00:00
s = str(arg)
if not (s.isdigit() and s.isascii()):
2025-10-04 09:56:55 +00:00
die(1, f'{s}: invalid parameter for --add-precision (not an integer)')
if int(s) > 30:
2025-10-04 09:56:55 +00:00
die(1, f'{s}: invalid parameter for --add-precision (value >30)')
return int(s)
def create_rows():
rows = (
('trade_pair',) + query if (query and query.to_asset) else
2025-10-04 09:56:55 +00:00
('bitcoin', parse_asset_id('btc-bitcoin')) if gcfg.btc else
get_rows_from_cfg(add_data={'fiat':['usd-us-dollar']} if gcfg.add_columns else None))
for hdr, data in (
(get_usr_assets(),) if query else
(get_usr_assets(), get_portfolio_assets())):
if data:
2025-10-04 09:56:55 +00:00
uniq_data = tuple(gen_uniq(data, 'symbol', preload=rows))
if uniq_data:
rows += (hdr,) + uniq_data
return rows
cfg_tuple = namedtuple('global_cfg',[
'rows',
'usr_rows',
'usr_columns',
'query',
'adjust',
'clsname',
'btc_only',
'add_prec',
'cachedir',
'proxy',
'proxy2',
'portfolio',
'percent_cols',
'asset_limit',
'cached_data',
'elapsed',
'name_labels',
'pager',
'thousands_comma',
'update_time',
'quiet',
'verbose'])
2025-10-04 09:56:55 +00:00
global gcfg, cfg_in, src_cls, cfg
2023-10-13 09:50:15 +00:00
gcfg = gcfg_arg
2025-10-04 09:56:55 +00:00
src_cls = {k: getattr(DataSource, v) for k, v in DataSource.get_sources().items()}
fi_pat = src_cls['fi'].asset_id_pat
cmd_args = gcfg._args
cfg_in = get_cfg_in()
2025-10-12 10:01:51 +00:00
if gcfg.test_suite: # required for testing with overlay
from . import Ticker as this_mod
this_mod.src_cls = src_cls
this_mod.cfg_in = cfg_in
usr_rows = parse_usr_asset_arg('add_rows')
2025-10-04 09:56:55 +00:00
usr_columns = parse_usr_asset_arg('add_columns', use_cf_file=True)
query = parse_query_arg(cmd_args[0]) if cmd_args else None
def get_proxy(name):
2025-10-04 09:56:55 +00:00
proxy = getattr(gcfg, name)
return (
'' if proxy == '' else 'none' if (proxy and proxy.lower() == 'none')
2025-10-04 09:56:55 +00:00
else (proxy or cfg_in.cfg.get(name)))
proxy = get_proxy('proxy')
proxy = None if proxy == 'none' else proxy
proxy2 = get_proxy('proxy2')
cfg = cfg_tuple(
rows = create_rows(),
usr_rows = usr_rows,
usr_columns = usr_columns,
query = query,
2025-10-04 09:56:55 +00:00
adjust = (lambda x: (100 + x) / 100 if x else 1)(Decimal(gcfg.adjust or 0)),
clsname = 'trading' if query else 'overview',
btc_only = gcfg.btc or cfg_in.cfg.get('btc'),
add_prec = parse_add_precision(gcfg.add_precision or cfg_in.cfg.get('add_precision')),
cachedir = gcfg.cachedir or cfg_in.cfg.get('cachedir') or dfl_cachedir,
proxy = proxy,
proxy2 = None if proxy2 == 'none' else '' if proxy2 == '' else (proxy2 or proxy),
2024-03-10 14:44:57 +00:00
portfolio =
get_portfolio()
if cfg_in.portfolio
and (gcfg.portfolio or cfg_in.cfg.get('portfolio'))
2024-03-10 14:44:57 +00:00
and not query
else None,
percent_cols = parse_percent_cols(gcfg.percent_cols or cfg_in.cfg.get('percent_cols')),
asset_limit = gcfg.asset_limit or cfg_in.cfg.get('asset_limit'),
cached_data = gcfg.cached_data or cfg_in.cfg.get('cached_data'),
elapsed = gcfg.elapsed or cfg_in.cfg.get('elapsed'),
name_labels = gcfg.name_labels or cfg_in.cfg.get('name_labels'),
pager = gcfg.pager or cfg_in.cfg.get('pager'),
thousands_comma = gcfg.thousands_comma or cfg_in.cfg.get('thousands_comma'),
update_time = gcfg.update_time or cfg_in.cfg.get('update_time'),
quiet = gcfg.quiet or cfg_in.cfg.get('quiet'),
2025-10-04 09:56:55 +00:00
verbose = gcfg.verbose or cfg_in.cfg.get('verbose'))
def get_cfg_in():
2025-10-04 09:56:55 +00:00
ret = namedtuple('cfg_in_data', ['cfg', 'portfolio', 'cfg_file', 'portfolio_file'])
cfg_file, portfolio_file = (
[os.path.join(gcfg.data_dir_root, 'node_tools', fn)
for fn in (cfg_fn, portfolio_fn)])
cfg_data, portfolio_data = (
[yaml.safe_load(open(fn).read()) if os.path.exists(fn) else None
for fn in (cfg_file, portfolio_file)])
return ret(
cfg = cfg_data or {
'assets': {
'coin': [ 'btc-bitcoin', 'eth-ethereum', 'xmr-monero' ],
# gold futures, silver futures, Brent futures
'commodity': [ 'gc=f', 'si=f', 'bz=f' ],
# Pound Sterling, Euro, Swiss Franc
'fiat': [ 'gbpusd=x', 'eurusd=x', 'chfusd=x' ],
# Dow Jones Industrials, Nasdaq 100, S&P 500
2025-10-04 09:56:55 +00:00
'index': [ '^dji', '^ixic', '^gspc' ]},
'proxy': 'http://vpn-gw:8118'},
portfolio = portfolio_data,
cfg_file = cfg_file,
portfolio_file = portfolio_file)
class Ticker:
class base:
offer = None
to_asset = None
2025-10-04 09:56:55 +00:00
def __init__(self, data):
self.comma = ',' if cfg.thousands_comma else ''
2025-10-04 09:56:55 +00:00
self.col1_wid = max(len('TOTAL'), (
max(len(self.create_label(d['id'])) for d in data.values()) if cfg.name_labels else
2025-10-04 09:56:55 +00:00
max(len(d['symbol']) for d in data.values()))) + 1
2025-10-04 09:56:55 +00:00
self.rows = [row._replace(id=self.get_id(row)) if isinstance(row, tuple) else row
for row in cfg.rows]
self.col_usd_prices = {k: self.data[k]['price_usd'] for k in self.col_ids}
2025-10-04 09:56:55 +00:00
self.prices = {row.id: self.get_row_prices(row.id)
for row in self.rows if isinstance(row, tuple) and row.id in data}
self.prices['usd-us-dollar'] = self.get_row_prices('usd-us-dollar')
2025-10-04 09:56:55 +00:00
def format_last_update_col(self, cross_assets=()):
if cfg.elapsed:
2022-10-17 18:35:55 +00:00
from mmgen.util2 import format_elapsed_hr
fmt_func = format_elapsed_hr
else:
2025-10-04 09:56:55 +00:00
fmt_func = lambda t, now: time.strftime('%F %X', time.gmtime(t))
d = self.data
max_w = 0
if cross_assets:
last_updated_x = [d[a.id]['last_updated'] for a in cross_assets]
2025-10-04 09:56:55 +00:00
min_t = min((int(n) for n in last_updated_x if isinstance(n, int)), default=None)
else:
min_t = None
for row in self.rows:
2025-10-04 09:56:55 +00:00
if isinstance(row, tuple):
try:
2025-10-04 09:56:55 +00:00
t = int(d[row.id]['last_updated'])
except TypeError as e:
d[row.id]['last_updated_fmt'] = gray('--' if 'NoneType' in str(e) else str(e))
except KeyError as e:
msg(str(e))
pass
else:
2025-03-15 18:23:16 +00:00
t_fmt = d[row.id]['last_updated_fmt'] = fmt_func(
2025-10-04 09:56:55 +00:00
(min(t, min_t) if min_t else t),
2025-03-15 18:23:16 +00:00
now = now)
max_w = max(len(t_fmt), max_w)
self.upd_w = max_w
def init_prec(self):
2025-03-15 18:23:16 +00:00
exp = [(a.id, self.prices[a.id]['usd-us-dollar'].adjusted()) for a in self.usr_col_assets]
self.uprec = {k: max(0, v+4) + cfg.add_prec for k, v in exp}
self.uwid = {k: 12 + max(0, abs(v)-6) + cfg.add_prec for k, v in exp}
2025-10-04 09:56:55 +00:00
def get_id(self, asset):
if asset.id:
return asset.id
else:
for d in self.data.values():
if d['symbol'] == asset.symbol:
return d['id']
2025-10-04 09:56:55 +00:00
def create_label(self, id):
return self.data[id]['name'].upper()
def gen_output(self):
2025-10-04 09:56:55 +00:00
yield 'Current time: {} UTC'.format(time.strftime('%F %X', time.gmtime(now)))
for asset in self.usr_col_assets:
if asset.symbol != 'USD':
usdprice = self.data[asset.id]['price_usd']
yield '{} ({}) = {:{}.{}f} USD'.format(
asset.symbol,
self.create_label(asset.id),
usdprice,
self.comma,
2025-10-04 09:56:55 +00:00
max(2, 4-usdprice.adjusted()))
2025-10-04 09:56:55 +00:00
if hasattr(self, 'subhdr'):
yield self.subhdr
if self.show_adj:
yield (
('Offered price differs from spot' if self.offer else 'Adjusting prices')
+ ' by '
2025-10-04 09:56:55 +00:00
+ yellow('{:+.2f}%'.format((self.adjust-1) * 100)))
yield ''
if cfg.portfolio:
yield blue('PRICES')
if self.table_hdr:
yield self.table_hdr
for row in self.rows:
2025-10-04 09:56:55 +00:00
if isinstance(row, str):
yield ('-' * self.hl_wid)
else:
try:
yield self.fmt_row(self.data[row.id])
except KeyError:
yield gray(f'(no data for {row.id})')
yield '-' * self.hl_wid
if cfg.portfolio:
self.fs_num = self.fs_num2
self.fs_str = self.fs_str2
yield ''
yield blue('PORTFOLIO')
yield self.table_hdr
yield '-' * self.hl_wid
2025-10-04 09:56:55 +00:00
for sym, amt in cfg.portfolio.items():
try:
2025-10-04 09:56:55 +00:00
yield self.fmt_row(self.data[sym], amt=amt)
except KeyError:
yield gray(f'(no data for {sym})')
yield '-' * self.hl_wid
if not cfg.btc_only:
yield self.fs_num.format(
lbl = 'TOTAL', pc3='', pc4='', pc1='', pc2='', upd='', amt='',
2025-10-04 09:56:55 +00:00
**{k.replace('-', '_'): v for k, v in self.prices['total'].items()})
class overview(base):
2025-10-04 09:56:55 +00:00
def __init__(self, data):
self.data = data
self.adjust = cfg.adjust
self.show_adj = self.adjust != 1
self.usr_col_assets = [asset._replace(id=self.get_id(asset)) for asset in cfg.usr_columns]
self.col_ids = ('usd-us-dollar',) + tuple(a.id for a in self.usr_col_assets) + ('btc-bitcoin',)
super().__init__(data)
self.format_last_update_col()
if cfg.portfolio:
2025-10-04 09:56:55 +00:00
self.prices['total'] = {col_id: sum(self.prices[row.id][col_id] * cfg.portfolio[row.id]
for row in self.rows
if isinstance(row, tuple) and row.id in cfg.portfolio and row.id in data)
for col_id in self.col_ids}
self.init_prec()
self.init_fs()
2025-10-04 09:56:55 +00:00
def get_row_prices(self, id):
if id in self.data:
d = self.data[id]
2025-10-04 09:56:55 +00:00
return {k: (
d['price_btc'] if k == 'btc-bitcoin' else
d['price_usd'] / self.col_usd_prices[k]
2025-10-04 09:56:55 +00:00
) * self.adjust for k in self.col_ids}
2025-10-04 09:56:55 +00:00
def fmt_row(self, d, amt=None, amt_fmt=None):
def fmt_pct(n):
2025-10-04 09:56:55 +00:00
return gray(' --') if n is None else (red, green)[n>=0](f'{n:+7.2f}')
p = self.prices[d['id']]
if amt is not None:
amt_fmt = f'{amt:{19+cfg.add_prec}{self.comma}.{8+cfg.add_prec}f}'
if '.' in amt_fmt:
amt_fmt = amt_fmt.rstrip('0').rstrip('.')
return self.fs_num.format(
lbl = self.create_label(d['id']) if cfg.name_labels else d['symbol'],
pc1 = fmt_pct(d.get('percent_change_7d')),
pc2 = fmt_pct(d.get('percent_change_24h')),
pc3 = fmt_pct(d.get('percent_change_1y')),
pc4 = fmt_pct(d.get('percent_change_30d')),
upd = d.get('last_updated_fmt'),
amt = amt_fmt,
2025-10-04 09:56:55 +00:00
**{k.replace('-', '_'): v * (1 if amt is None else amt) for k, v in p.items()})
def init_fs(self):
2025-10-04 09:56:55 +00:00
col_prec = {'usd-us-dollar': 2+cfg.add_prec, 'btc-bitcoin': 8+cfg.add_prec} | self.uprec
max_row = max(
2025-10-04 09:56:55 +00:00
((k, v['btc-bitcoin']) for k, v in self.prices.items()),
key = lambda a: a[1])
widths = {k: len('{:{}.{}f}'.format(self.prices[max_row[0]][k], self.comma, col_prec[k]))
for k in self.col_ids}
2025-10-04 09:56:55 +00:00
fd = namedtuple('format_str_data', ['fs_str', 'fs_num', 'wid'])
col_fs_data = {
2025-10-04 09:56:55 +00:00
'label': fd(f'{{lbl:{self.col1_wid}}}', f'{{lbl:{self.col1_wid}}}', self.col1_wid),
'pct1y': fd(' {pc3:7}', ' {pc3:7}', 8),
'pct1m': fd(' {pc4:7}', ' {pc4:7}', 8),
'pct1w': fd(' {pc1:7}', ' {pc1:7}', 8),
'pct1d': fd(' {pc2:7}', ' {pc2:7}', 8),
2025-10-04 09:56:55 +00:00
'update_time': fd(' {upd}', ' {upd}',
max((19 if cfg.portfolio else 0), self.upd_w) + 2),
'amt': fd(' {amt}', ' {amt}', 21)
} | {k: fd(
' {{{}:>{}}}'.format(k.replace('-', '_'), widths[k]),
' {{{}:{}{}.{}f}}'.format(k.replace('-', '_'), widths[k], self.comma, col_prec[k]),
widths[k] + 2
) for k in self.col_ids}
cols = (
2025-10-04 09:56:55 +00:00
['label', 'usd-us-dollar'] +
[asset.id for asset in self.usr_col_assets] +
2025-10-04 09:56:55 +00:00
[a for a, b in (
('btc-bitcoin', not cfg.btc_only),
('pct1y', 'y' in cfg.percent_cols),
('pct1m', 'm' in cfg.percent_cols),
('pct1w', 'w' in cfg.percent_cols),
('pct1d', 'd' in cfg.percent_cols),
('update_time', cfg.update_time))
if b])
cols2 = list(cols)
if cfg.update_time:
cols2.pop()
cols2.append('amt')
self.fs_str = ''.join(col_fs_data[c].fs_str for c in cols)
self.fs_num = ''.join(col_fs_data[c].fs_num for c in cols)
self.hl_wid = sum(col_fs_data[c].wid for c in cols)
self.fs_str2 = ''.join(col_fs_data[c].fs_str for c in cols2)
self.fs_num2 = ''.join(col_fs_data[c].fs_num for c in cols2)
self.hl_wid2 = sum(col_fs_data[c].wid for c in cols2)
@property
def table_hdr(self):
return self.fs_str.format(
lbl = '',
pc1 = ' CHG_7d',
pc2 = 'CHG_24h',
pc3 = 'CHG_1y',
pc4 = 'CHG_30d',
upd = 'UPDATED',
amt = ' AMOUNT',
usd_us_dollar = 'USD',
btc_bitcoin = ' BTC',
2025-10-04 09:56:55 +00:00
**{a.id.replace('-', '_'): a.symbol for a in self.usr_col_assets})
class trading(base):
2025-10-04 09:56:55 +00:00
def __init__(self, data):
self.data = data
self.asset = cfg.query.asset._replace(id=self.get_id(cfg.query.asset))
self.to_asset = (
cfg.query.to_asset._replace(id=self.get_id(cfg.query.to_asset))
2025-10-04 09:56:55 +00:00
if cfg.query.to_asset else None)
self.col_ids = [self.asset.id]
self.adjust = cfg.adjust
if self.to_asset:
self.offer = self.to_asset.amount
if self.offer:
real_price = (
self.asset.amount
* data[self.asset.id]['price_usd']
2025-10-04 09:56:55 +00:00
/ data[self.to_asset.id]['price_usd'])
if self.adjust != 1:
2025-10-04 09:56:55 +00:00
die(1,
'the --adjust option may not be combined with TO_AMOUNT '
'in the trade specifier')
self.adjust = self.offer / real_price
2025-10-04 09:56:55 +00:00
self.hl_ids = [self.asset.id, self.to_asset.id]
else:
self.hl_ids = [self.asset.id]
self.show_adj = self.adjust != 1 or self.offer
super().__init__(data)
self.usr_col_assets = [self.asset] + ([self.to_asset] if self.to_asset else [])
for a in self.usr_col_assets:
self.prices[a.id]['usd-us-dollar'] = data[a.id]['price_usd']
self.format_last_update_col(cross_assets=self.usr_col_assets)
self.init_prec()
self.init_fs()
2025-10-04 09:56:55 +00:00
def get_row_prices(self, id):
if id in self.data:
d = self.data[id]
2025-10-04 09:56:55 +00:00
return {k: self.col_usd_prices[self.asset.id] / d['price_usd'] for k in self.col_ids}
def init_fs(self):
self.max_wid = max(
len('{:{}{}.{}f}'.format(
v[self.asset.id] * self.asset.amount,
16 + cfg.add_prec,
self.comma,
2025-10-04 09:56:55 +00:00
8 + cfg.add_prec))
for v in self.prices.values())
self.fs_str = '{lbl:%s} {p_spot}' % self.col1_wid
self.hl_wid = self.col1_wid + self.max_wid + 1
if self.show_adj:
self.fs_str += ' {p_adj}'
self.hl_wid += self.max_wid + 1
if cfg.update_time:
self.fs_str += ' {upd}'
self.hl_wid += self.upd_w + 2
2025-10-04 09:56:55 +00:00
def fmt_row(self, d):
id = d['id']
p = self.prices[id][self.asset.id] * self.asset.amount
2025-10-04 09:56:55 +00:00
p_spot = '{:{}{}.{}f}'.format(p, self.max_wid, self.comma, 8+cfg.add_prec)
p_adj = (
2025-10-04 09:56:55 +00:00
'{:{}{}.{}f}'.format(p*self.adjust, self.max_wid, self.comma, 8+cfg.add_prec)
if self.show_adj else '')
return self.fs_str.format(
lbl = self.create_label(id) if cfg.name_labels else d['symbol'],
p_spot = green(p_spot) if id in self.hl_ids else p_spot,
p_adj = yellow(p_adj) if id in self.hl_ids else p_adj,
2025-10-04 09:56:55 +00:00
upd = d.get('last_updated_fmt'))
@property
def table_hdr(self):
return self.fs_str.format(
lbl = '',
p_spot = '{t:>{w}}'.format(
t = 'SPOT PRICE',
2025-10-04 09:56:55 +00:00
w = self.max_wid),
p_adj = '{t:>{w}}'.format(
t = ('OFFERED' if self.offer else 'ADJUSTED') + ' PRICE',
2025-10-04 09:56:55 +00:00
w = self.max_wid),
upd = 'UPDATED')
@property
def subhdr(self):
return (
'{a}: {b:{c}} {d}'.format(
a = 'Offer' if self.offer else 'Amount',
b = self.asset.amount,
c = self.comma,
d = self.asset.symbol
) + (
(
' =>' +
2025-10-04 09:56:55 +00:00
(' {:{}}'.format(self.offer, self.comma) if self.offer else '') +
' {} ({})'.format(
self.to_asset.symbol,
2025-10-04 09:56:55 +00:00
self.create_label(self.to_asset.id))
) if self.to_asset else ''))