# pragma pylint: disable=unused-argument, unused-variable, protected-access, invalid-name """ This module manage Telegram communication """ import json import logging import re from dataclasses import dataclass from datetime import date, datetime, timedelta from functools import partial from html import escape from itertools import chain from math import isnan from typing import Any, Callable, Dict, List, Optional, Union import arrow from tabulate import tabulate from telegram import (CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, KeyboardButton, ParseMode, ReplyKeyboardMarkup, Update) from telegram.error import BadRequest, NetworkError, TelegramError from telegram.ext import CallbackContext, CallbackQueryHandler, CommandHandler, Updater from telegram.utils.helpers import escape_markdown from freqtrade.__init__ import __version__ from freqtrade.constants import DUST_PER_COIN from freqtrade.enums import RPCMessageType, SignalDirection, TradingMode from freqtrade.exceptions import OperationalException from freqtrade.misc import chunks, plural, round_coin_value from freqtrade.persistence import Trade from freqtrade.rpc import RPC, RPCException, RPCHandler logger = logging.getLogger(__name__) logger.debug('Included module rpc.telegram ...') MAX_TELEGRAM_MESSAGE_LENGTH = 4096 @dataclass class TimeunitMappings: header: str message: str message2: str callback: str default: int def authorized_only(command_handler: Callable[..., None]) -> Callable[..., Any]: """ Decorator to check if the message comes from the correct chat_id :param command_handler: Telegram CommandHandler :return: decorated function """ def wrapper(self, *args, **kwargs): """ Decorator logic """ update = kwargs.get('update') or args[0] # Reject unauthorized messages if update.callback_query: cchat_id = int(update.callback_query.message.chat.id) else: cchat_id = int(update.message.chat_id) chat_id = int(self._config['telegram']['chat_id']) if cchat_id != chat_id: logger.info( 'Rejected 
unauthorized message from: %s', update.message.chat_id ) return wrapper # Rollback session to avoid getting data stored in a transaction. Trade.query.session.rollback() logger.debug( 'Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id ) try: return command_handler(self, *args, **kwargs) except BaseException: logger.exception('Exception occurred within Telegram module') return wrapper class Telegram(RPCHandler): """ This class handles all telegram communication """ def __init__(self, rpc: RPC, config: Dict[str, Any]) -> None: """ Init the Telegram call, and init the super class RPCHandler :param rpc: instance of RPC Helper class :param config: Configuration object :return: None """ super().__init__(rpc, config) self._updater: Updater self._init_keyboard() self._init() def _init_keyboard(self) -> None: """ Validates the keyboard configuration from telegram config section. """ self._keyboard: List[List[Union[str, KeyboardButton]]] = [ ['/daily', '/profit', '/balance'], ['/status', '/status table', '/performance'], ['/count', '/start', '/stop', '/help'] ] # do not allow commands with mandatory arguments and critical cmds # TODO: DRY! - its not good to list all valid cmds here. But otherwise # this needs refactoring of the whole telegram module (same # problem in _help()). 
valid_keys: List[str] = [r'/start$', r'/stop$', r'/status$', r'/status table$', r'/trades$', r'/performance$', r'/buys', r'/entries', r'/sells', r'/exits', r'/mix_tags', r'/daily$', r'/daily \d+$', r'/profit$', r'/profit \d+', r'/stats$', r'/count$', r'/locks$', r'/balance$', r'/stopbuy$', r'/reload_config$', r'/show_config$', r'/logs$', r'/whitelist$', r'/blacklist$', r'/bl_delete$', r'/weekly$', r'/weekly \d+$', r'/monthly$', r'/monthly \d+$', r'/forcebuy$', r'/forcelong$', r'/forceshort$', r'/forcesell$', r'/forceexit$', r'/edge$', r'/health$', r'/help$', r'/version$'] # Create keys for generation valid_keys_print = [k.replace('$', '') for k in valid_keys] # custom keyboard specified in config.json cust_keyboard = self._config['telegram'].get('keyboard', []) if cust_keyboard: combined = "(" + ")|(".join(valid_keys) + ")" # check for valid shortcuts invalid_keys = [b for b in chain.from_iterable(cust_keyboard) if not re.match(combined, b)] if len(invalid_keys): err_msg = ('config.telegram.keyboard: Invalid commands for ' f'custom Telegram keyboard: {invalid_keys}' f'\nvalid commands are: {valid_keys_print}') raise OperationalException(err_msg) else: self._keyboard = cust_keyboard logger.info('using custom keyboard from ' f'config.json: {self._keyboard}') def _init(self) -> None: """ Initializes this module with the given config, registers all known command handlers and starts polling for message updates """ self._updater = Updater(token=self._config['telegram']['token'], workers=0, use_context=True) # Register command handler and start telegram message polling handles = [ CommandHandler('status', self._status), CommandHandler('profit', self._profit), CommandHandler('balance', self._balance), CommandHandler('start', self._start), CommandHandler('stop', self._stop), CommandHandler(['forcesell', 'forceexit', 'fx'], self._force_exit), CommandHandler(['forcebuy', 'forcelong'], partial( self._force_enter, order_side=SignalDirection.LONG)), CommandHandler('forceshort', 
partial( self._force_enter, order_side=SignalDirection.SHORT)), CommandHandler('trades', self._trades), CommandHandler('delete', self._delete_trade), CommandHandler('performance', self._performance), CommandHandler(['buys', 'entries'], self._enter_tag_performance), CommandHandler(['sells', 'exits'], self._exit_reason_performance), CommandHandler('mix_tags', self._mix_tag_performance), CommandHandler('stats', self._stats), CommandHandler('daily', self._daily), CommandHandler('weekly', self._weekly), CommandHandler('monthly', self._monthly), CommandHandler('count', self._count), CommandHandler('locks', self._locks), CommandHandler(['unlock', 'delete_locks'], self._delete_locks), CommandHandler(['reload_config', 'reload_conf'], self._reload_config), CommandHandler(['show_config', 'show_conf'], self._show_config), CommandHandler('stopbuy', self._stopbuy), CommandHandler('whitelist', self._whitelist), CommandHandler('blacklist', self._blacklist), CommandHandler(['blacklist_delete', 'bl_delete'], self._blacklist_delete), CommandHandler('logs', self._logs), CommandHandler('edge', self._edge), CommandHandler('health', self._health), CommandHandler('help', self._help), CommandHandler('version', self._version), ] callbacks = [ CallbackQueryHandler(self._status_table, pattern='update_status_table'), CallbackQueryHandler(self._daily, pattern='update_daily'), CallbackQueryHandler(self._weekly, pattern='update_weekly'), CallbackQueryHandler(self._monthly, pattern='update_monthly'), CallbackQueryHandler(self._profit, pattern='update_profit'), CallbackQueryHandler(self._balance, pattern='update_balance'), CallbackQueryHandler(self._performance, pattern='update_performance'), CallbackQueryHandler(self._enter_tag_performance, pattern='update_enter_tag_performance'), CallbackQueryHandler(self._exit_reason_performance, pattern='update_exit_reason_performance'), CallbackQueryHandler(self._mix_tag_performance, pattern='update_mix_tag_performance'), CallbackQueryHandler(self._count, 
pattern='update_count'), CallbackQueryHandler(self._force_exit_inline, pattern=r"force_exit__\S+"), CallbackQueryHandler(self._force_enter_inline, pattern=r"\S+\/\S+"), ] for handle in handles: self._updater.dispatcher.add_handler(handle) for callback in callbacks: self._updater.dispatcher.add_handler(callback) self._updater.start_polling( bootstrap_retries=-1, timeout=20, read_latency=60, # Assumed transmission latency drop_pending_updates=True, ) logger.info( 'rpc.telegram is listening for following commands: %s', [h.command for h in handles] ) def cleanup(self) -> None: """ Stops all running telegram threads. :return: None """ # This can take up to `timeout` from the call to `start_polling`. self._updater.stop() def _exchange_from_msg(self, msg: Dict[str, Any]) -> str: """ Extracts the exchange name from the given message. :param msg: The message to extract the exchange name from. :return: The exchange name. """ return f"{msg['exchange']}{' (dry)' if self._config['dry_run'] else ''}" def _add_analyzed_candle(self, pair: str) -> str: candle_val = self._config['telegram'].get( 'notification_settings', {}).get('show_candle', 'off') if candle_val != 'off': if candle_val == 'ohlc': analyzed_df, _ = self._rpc._freqtrade.dataprovider.get_analyzed_dataframe( pair, self._config['timeframe']) candle = analyzed_df.iloc[-1].squeeze() if len(analyzed_df) > 0 else None if candle is not None: return ( f"*Candle OHLC*: `{candle['open']}, {candle['high']}, " f"{candle['low']}, {candle['close']}`\n" ) return '' def _format_entry_msg(self, msg: Dict[str, Any]) -> str: if self._rpc._fiat_converter: msg['stake_amount_fiat'] = self._rpc._fiat_converter.convert_amount( msg['stake_amount'], msg['stake_currency'], msg['fiat_currency']) else: msg['stake_amount_fiat'] = 0 is_fill = msg['type'] in [RPCMessageType.ENTRY_FILL] emoji = '\N{CHECK MARK}' if is_fill else '\N{LARGE BLUE CIRCLE}' entry_side = ({'enter': 'Long', 'entered': 'Longed'} if msg['direction'] == 'Long' else {'enter': 
'Short', 'entered': 'Shorted'}) message = ( f"{emoji} *{self._exchange_from_msg(msg)}:*" f" {entry_side['entered'] if is_fill else entry_side['enter']} {msg['pair']}" f" (#{msg['trade_id']})\n" ) message += self._add_analyzed_candle(msg['pair']) message += f"*Enter Tag:* `{msg['enter_tag']}`\n" if msg.get('enter_tag') else "" message += f"*Amount:* `{msg['amount']:.8f}`\n" if msg.get('leverage') and msg.get('leverage', 1.0) != 1.0: message += f"*Leverage:* `{msg['leverage']}`\n" if msg['type'] in [RPCMessageType.ENTRY_FILL]: message += f"*Open Rate:* `{msg['open_rate']:.8f}`\n" elif msg['type'] in [RPCMessageType.ENTRY]: message += f"*Open Rate:* `{msg['limit']:.8f}`\n"\ f"*Current Rate:* `{msg['current_rate']:.8f}`\n" message += f"*Total:* `({round_coin_value(msg['stake_amount'], msg['stake_currency'])}" if msg.get('fiat_currency'): message += f", {round_coin_value(msg['stake_amount_fiat'], msg['fiat_currency'])}" message += ")`" return message def _format_exit_msg(self, msg: Dict[str, Any]) -> str: msg['amount'] = round(msg['amount'], 8) msg['profit_percent'] = round(msg['profit_ratio'] * 100, 2) msg['duration'] = msg['close_date'].replace( microsecond=0) - msg['open_date'].replace(microsecond=0) msg['duration_min'] = msg['duration'].total_seconds() / 60 msg['enter_tag'] = msg['enter_tag'] if "enter_tag" in msg.keys() else None msg['emoji'] = self._get_sell_emoji(msg) msg['leverage_text'] = (f"*Leverage:* `{msg['leverage']:.1f}`\n" if msg.get('leverage') and msg.get('leverage', 1.0) != 1.0 else "") # Check if all sell properties are available. 
# This might not be the case if the message origin is triggered by /forceexit if (all(prop in msg for prop in ['gain', 'fiat_currency', 'stake_currency']) and self._rpc._fiat_converter): msg['profit_fiat'] = self._rpc._fiat_converter.convert_amount( msg['profit_amount'], msg['stake_currency'], msg['fiat_currency']) msg['profit_extra'] = ( f" ({msg['gain']}: {msg['profit_amount']:.8f} {msg['stake_currency']}" f" / {msg['profit_fiat']:.3f} {msg['fiat_currency']})") else: msg['profit_extra'] = '' is_fill = msg['type'] == RPCMessageType.EXIT_FILL message = ( f"{msg['emoji']} *{self._exchange_from_msg(msg)}:* " f"{'Exited' if is_fill else 'Exiting'} {msg['pair']} (#{msg['trade_id']})\n" f"{self._add_analyzed_candle(msg['pair'])}" f"*{'Profit' if is_fill else 'Unrealized Profit'}:* " f"`{msg['profit_ratio']:.2%}{msg['profit_extra']}`\n" f"*Enter Tag:* `{msg['enter_tag']}`\n" f"*Exit Reason:* `{msg['exit_reason']}`\n" f"*Duration:* `{msg['duration']} ({msg['duration_min']:.1f} min)`\n" f"*Direction:* `{msg['direction']}`\n" f"{msg['leverage_text']}" f"*Amount:* `{msg['amount']:.8f}`\n" f"*Open Rate:* `{msg['open_rate']:.8f}`\n" ) if msg['type'] == RPCMessageType.EXIT: message += (f"*Current Rate:* `{msg['current_rate']:.8f}`\n" f"*Close Rate:* `{msg['limit']:.8f}`") elif msg['type'] == RPCMessageType.EXIT_FILL: message += f"*Close Rate:* `{msg['close_rate']:.8f}`" return message def compose_message(self, msg: Dict[str, Any], msg_type: RPCMessageType) -> str: if msg_type in [RPCMessageType.ENTRY, RPCMessageType.ENTRY_FILL]: message = self._format_entry_msg(msg) elif msg_type in [RPCMessageType.EXIT, RPCMessageType.EXIT_FILL]: message = self._format_exit_msg(msg) elif msg_type in (RPCMessageType.ENTRY_CANCEL, RPCMessageType.EXIT_CANCEL): msg['message_side'] = 'enter' if msg_type in [RPCMessageType.ENTRY_CANCEL] else 'exit' message = (f"\N{WARNING SIGN} *{self._exchange_from_msg(msg)}:* " f"Cancelling {msg['message_side']} Order for {msg['pair']} " f"(#{msg['trade_id']}). 
Reason: {msg['reason']}.") elif msg_type == RPCMessageType.PROTECTION_TRIGGER: message = ( f"*Protection* triggered due to {msg['reason']}. " f"`{msg['pair']}` will be locked until `{msg['lock_end_time']}`." ) elif msg_type == RPCMessageType.PROTECTION_TRIGGER_GLOBAL: message = ( f"*Protection* triggered due to {msg['reason']}. " f"*All pairs* will be locked until `{msg['lock_end_time']}`." ) elif msg_type == RPCMessageType.STATUS: message = f"*Status:* `{msg['status']}`" elif msg_type == RPCMessageType.WARNING: message = f"\N{WARNING SIGN} *Warning:* `{msg['status']}`" elif msg_type == RPCMessageType.STARTUP: message = f"{msg['status']}" else: raise NotImplementedError(f"Unknown message type: {msg_type}") return message def send_msg(self, msg: Dict[str, Any]) -> None: """ Send a message to telegram channel """ default_noti = 'on' msg_type = msg['type'] noti = '' if msg_type == RPCMessageType.EXIT: sell_noti = self._config['telegram'] \ .get('notification_settings', {}).get(str(msg_type), {}) # For backward compatibility sell still can be string if isinstance(sell_noti, str): noti = sell_noti else: noti = sell_noti.get(str(msg['exit_reason']), default_noti) else: noti = self._config['telegram'] \ .get('notification_settings', {}).get(str(msg_type), default_noti) if noti == 'off': logger.info(f"Notification '{msg_type}' not sent.") # Notification disabled return message = self.compose_message(msg, msg_type) self._send_msg(message, disable_notification=(noti == 'silent')) def _get_sell_emoji(self, msg): """ Get emoji for sell-side """ if float(msg['profit_percent']) >= 5.0: return "\N{ROCKET}" elif float(msg['profit_percent']) >= 0.0: return "\N{EIGHT SPOKED ASTERISK}" elif msg['exit_reason'] == "stop_loss": return "\N{WARNING SIGN}" else: return "\N{CROSS MARK}" def _prepare_entry_details(self, filled_orders: List, quote_currency: str, is_open: bool): """ Prepare details of trade with entry adjustment enabled """ lines: List[str] = [] if len(filled_orders) > 0: 
first_avg = filled_orders[0]["safe_price"] for x, order in enumerate(filled_orders): if not order['ft_is_entry'] or order['is_open'] is True: continue cur_entry_datetime = arrow.get(order["order_filled_date"]) cur_entry_amount = order["amount"] cur_entry_average = order["safe_price"] lines.append(" ") if x == 0: lines.append(f"*Entry #{x+1}:*") lines.append( f"*Entry Amount:* {cur_entry_amount} ({order['cost']:.8f} {quote_currency})") lines.append(f"*Average Entry Price:* {cur_entry_average}") else: sumA = 0 sumB = 0 for y in range(x): sumA += (filled_orders[y]["amount"] * filled_orders[y]["safe_price"]) sumB += filled_orders[y]["amount"] prev_avg_price = sumA / sumB price_to_1st_entry = ((cur_entry_average - first_avg) / first_avg) minus_on_entry = 0 if prev_avg_price: minus_on_entry = (cur_entry_average - prev_avg_price) / prev_avg_price dur_entry = cur_entry_datetime - arrow.get( filled_orders[x - 1]["order_filled_date"]) days = dur_entry.days hours, remainder = divmod(dur_entry.seconds, 3600) minutes, seconds = divmod(remainder, 60) lines.append(f"*Entry #{x+1}:* at {minus_on_entry:.2%} avg profit") if is_open: lines.append("({})".format(cur_entry_datetime .humanize(granularity=["day", "hour", "minute"]))) lines.append( f"*Entry Amount:* {cur_entry_amount} ({order['cost']:.8f} {quote_currency})") lines.append(f"*Average Entry Price:* {cur_entry_average} " f"({price_to_1st_entry:.2%} from 1st entry rate)") lines.append(f"*Order filled at:* {order['order_filled_date']}") lines.append(f"({days}d {hours}h {minutes}m {seconds}s from previous entry)") return lines @authorized_only def _status(self, update: Update, context: CallbackContext) -> None: """ Handler for /status. Returns the current TradeThread status :param bot: telegram bot :param update: message update :return: None """ if context.args and 'table' in context.args: self._status_table(update, context) return try: # Check if there's at least one numerical ID provided. # If so, try to get only these trades. 
trade_ids = [] if context.args and len(context.args) > 0: trade_ids = [int(i) for i in context.args if i.isnumeric()] results = self._rpc._rpc_trade_status(trade_ids=trade_ids) position_adjust = self._config.get('position_adjustment_enable', False) max_entries = self._config.get('max_entry_position_adjustment', -1) messages = [] for r in results: r['open_date_hum'] = arrow.get(r['open_date']).humanize() r['num_entries'] = len([o for o in r['orders'] if o['ft_is_entry']]) r['exit_reason'] = r.get('exit_reason', "") lines = [ "*Trade ID:* `{trade_id}`" + ("` (since {open_date_hum})`" if r['is_open'] else ""), "*Current Pair:* {pair}", "*Direction:* " + ("`Short`" if r.get('is_short') else "`Long`"), "*Leverage:* `{leverage}`" if r.get('leverage') else "", "*Amount:* `{amount} ({stake_amount} {quote_currency})`", "*Enter Tag:* `{enter_tag}`" if r['enter_tag'] else "", "*Exit Reason:* `{exit_reason}`" if r['exit_reason'] else "", ] if position_adjust: max_buy_str = (f"/{max_entries + 1}" if (max_entries > 0) else "") lines.append("*Number of Entries:* `{num_entries}`" + max_buy_str) lines.extend([ "*Open Rate:* `{open_rate:.8f}`", "*Close Rate:* `{close_rate:.8f}`" if r['close_rate'] else "", "*Open Date:* `{open_date}`", "*Close Date:* `{close_date}`" if r['close_date'] else "", "*Current Rate:* `{current_rate:.8f}`" if r['is_open'] else "", ("*Current Profit:* " if r['is_open'] else "*Close Profit: *") + "`{profit_ratio:.2%}`", ]) if r['is_open']: if (r['stop_loss_abs'] != r['initial_stop_loss_abs'] and r['initial_stop_loss_ratio'] is not None): # Adding initial stoploss only if it is different from stoploss lines.append("*Initial Stoploss:* `{initial_stop_loss_abs:.8f}` " "`({initial_stop_loss_ratio:.2%})`") # Adding stoploss and stoploss percentage only if it is not None lines.append("*Stoploss:* `{stop_loss_abs:.8f}` " + ("`({stop_loss_ratio:.2%})`" if r['stop_loss_ratio'] else "")) lines.append("*Stoploss distance:* `{stoploss_current_dist:.8f}` " 
"`({stoploss_current_dist_ratio:.2%})`") if r['open_order']: if r['exit_order_status']: lines.append("*Open Order:* `{open_order}` - `{exit_order_status}`") else: lines.append("*Open Order:* `{open_order}`") lines_detail = self._prepare_entry_details( r['orders'], r['quote_currency'], r['is_open']) lines.extend(lines_detail if lines_detail else "") # Filter empty lines using list-comprehension messages.append("\n".join([line for line in lines if line]).format(**r)) for msg in messages: self._send_msg(msg) except RPCException as e: self._send_msg(str(e)) @authorized_only def _status_table(self, update: Update, context: CallbackContext) -> None: """ Handler for /status table. Returns the current TradeThread status in table format :param bot: telegram bot :param update: message update :return: None """ try: fiat_currency = self._config.get('fiat_display_currency', '') statlist, head, fiat_profit_sum = self._rpc._rpc_status_table( self._config['stake_currency'], fiat_currency) show_total = not isnan(fiat_profit_sum) and len(statlist) > 1 max_trades_per_msg = 50 """ Calculate the number of messages of 50 trades per message 0.99 is used to make sure that there are no extra (empty) messages As an example with 50 trades, there will be int(50/50 + 0.99) = 1 message """ messages_count = max(int(len(statlist) / max_trades_per_msg + 0.99), 1) for i in range(0, messages_count): trades = statlist[i * max_trades_per_msg:(i + 1) * max_trades_per_msg] if show_total and i == messages_count - 1: # append total line trades.append(["Total", "", "", f"{fiat_profit_sum:.2f} {fiat_currency}"]) message = tabulate(trades, headers=head, tablefmt='simple') if show_total and i == messages_count - 1: # insert separators line between Total lines = message.split("\n") message = "\n".join(lines[:-1] + [lines[1]] + [lines[-1]]) self._send_msg(f"
{message}", parse_mode=ParseMode.HTML, reload_able=True, callback_path="update_status_table", query=update.callback_query) except RPCException as e: self._send_msg(str(e)) @authorized_only def _timeunit_stats(self, update: Update, context: CallbackContext, unit: str) -> None: """ Handler for /daily
{stats_tab}' ) self._send_msg(message, parse_mode=ParseMode.HTML, reload_able=True, callback_path=val.callback, query=update.callback_query) except RPCException as e: self._send_msg(str(e)) @authorized_only def _daily(self, update: Update, context: CallbackContext) -> None: """ Handler for /daily
{trades_tab}" if trades['trades_count'] > 0 else '')) self._send_msg(message, parse_mode=ParseMode.HTML) except RPCException as e: self._send_msg(str(e)) @authorized_only def _delete_trade(self, update: Update, context: CallbackContext) -> None: """ Handler for /delete
{trade['pair']}\t"
f"{round_coin_value(trade['profit_abs'], self._config['stake_currency'])} "
f"({trade['profit_ratio']:.2%}) "
f"({trade['count']})
\n")
if len(output + stat_line) >= MAX_TELEGRAM_MESSAGE_LENGTH:
self._send_msg(output, parse_mode=ParseMode.HTML)
output = stat_line
else:
output += stat_line
self._send_msg(output, parse_mode=ParseMode.HTML,
reload_able=True, callback_path="update_performance",
query=update.callback_query)
except RPCException as e:
self._send_msg(str(e))
@authorized_only
def _enter_tag_performance(self, update: Update, context: CallbackContext) -> None:
    """
    Handler for /buys PAIR (also registered as /entries).
    Sends one line per entry tag with its absolute profit, profit ratio and
    trade count, as returned by the RPC layer. The output is split into
    several messages when it would reach MAX_TELEGRAM_MESSAGE_LENGTH.
    :param update: message update; its callback_query (if any) is forwarded
                   so the final message can be refreshed in place
    :param context: callback context; the first argument, if present, is
                    passed to the RPC call as the pair filter
    :return: None
    """
    try:
        # Optional pair filter: first command argument, if one was given.
        pair = None
        if context.args and isinstance(context.args[0], str):
            pair = context.args[0]

        trades = self._rpc._rpc_enter_tag_performance(pair)
        output = "Entry Tag Performance:\n"
        for i, trade in enumerate(trades):
            # The message is sent with ParseMode.HTML and the tag is free-form
            # text, so '<', '>' and '&' must be escaped to stay literal.
            # str() keeps a missing tag (None) rendering as "None".
            tag = escape(str(trade['enter_tag']))
            stat_line = (
                f"{i+1}.\t {tag}\t"
                f"{round_coin_value(trade['profit_abs'], self._config['stake_currency'])} "
                f"({trade['profit_ratio']:.2%}) "
                f"({trade['count']})\n")

            if len(output + stat_line) >= MAX_TELEGRAM_MESSAGE_LENGTH:
                # Flush what has been collected so far and start a new message.
                self._send_msg(output, parse_mode=ParseMode.HTML)
                output = stat_line
            else:
                output += stat_line

        # Only the last message carries the reload button.
        self._send_msg(output, parse_mode=ParseMode.HTML,
                       reload_able=True, callback_path="update_enter_tag_performance",
                       query=update.callback_query)
    except RPCException as e:
        self._send_msg(str(e))
@authorized_only
def _exit_reason_performance(self, update: Update, context: CallbackContext) -> None:
    """
    Handler for /sells (also registered as /exits).
    Sends one line per exit reason with its absolute profit, profit ratio
    and trade count, as returned by the RPC layer. The output is split into
    several messages when it would reach MAX_TELEGRAM_MESSAGE_LENGTH.
    :param update: message update; its callback_query (if any) is forwarded
                   so the final message can be refreshed in place
    :param context: callback context; the first argument, if present, is
                    passed to the RPC call as the pair filter
    :return: None
    """
    try:
        # Optional pair filter: first command argument, if one was given.
        pair = None
        if context.args and isinstance(context.args[0], str):
            pair = context.args[0]

        trades = self._rpc._rpc_exit_reason_performance(pair)
        output = "Exit Reason Performance:\n"
        for i, trade in enumerate(trades):
            # The message is sent with ParseMode.HTML, so '<', '>' and '&' in
            # the reason text must be escaped to stay literal.
            # str() keeps a missing reason (None) rendering as "None".
            reason = escape(str(trade['exit_reason']))
            stat_line = (
                f"{i+1}.\t {reason}\t"
                f"{round_coin_value(trade['profit_abs'], self._config['stake_currency'])} "
                f"({trade['profit_ratio']:.2%}) "
                f"({trade['count']})\n")

            if len(output + stat_line) >= MAX_TELEGRAM_MESSAGE_LENGTH:
                # Flush what has been collected so far and start a new message.
                self._send_msg(output, parse_mode=ParseMode.HTML)
                output = stat_line
            else:
                output += stat_line

        # Only the last message carries the reload button.
        self._send_msg(output, parse_mode=ParseMode.HTML,
                       reload_able=True, callback_path="update_exit_reason_performance",
                       query=update.callback_query)
    except RPCException as e:
        self._send_msg(str(e))
@authorized_only
def _mix_tag_performance(self, update: Update, context: CallbackContext) -> None:
    """
    Handler for /mix_tags.
    Sends one line per mix tag with its absolute profit, profit ratio and
    trade count, as returned by the RPC layer. The output is split into
    several messages when it would reach MAX_TELEGRAM_MESSAGE_LENGTH.
    :param update: message update; its callback_query (if any) is forwarded
                   so the final message can be refreshed in place
    :param context: callback context; the first argument, if present, is
                    passed to the RPC call as the pair filter
    :return: None
    """
    try:
        # Optional pair filter: first command argument, if one was given.
        pair = None
        if context.args and isinstance(context.args[0], str):
            pair = context.args[0]

        trades = self._rpc._rpc_mix_tag_performance(pair)
        output = "Mix Tag Performance:\n"
        for i, trade in enumerate(trades):
            # The message is sent with ParseMode.HTML, so '<', '>' and '&' in
            # the tag text must be escaped to stay literal.
            # str() keeps a missing tag (None) rendering as "None".
            mix_tag = escape(str(trade['mix_tag']))
            # NOTE(review): the enter-tag and exit-reason handlers read
            # 'profit_ratio' while this one reads 'profit' - confirm the key
            # against what _rpc_mix_tag_performance actually returns.
            stat_line = (
                f"{i+1}.\t {mix_tag}\t"
                f"{round_coin_value(trade['profit_abs'], self._config['stake_currency'])} "
                f"({trade['profit']:.2%}) "
                f"({trade['count']})\n")

            if len(output + stat_line) >= MAX_TELEGRAM_MESSAGE_LENGTH:
                # Flush what has been collected so far and start a new message.
                self._send_msg(output, parse_mode=ParseMode.HTML)
                output = stat_line
            else:
                output += stat_line

        # Only the last message carries the reload button.
        self._send_msg(output, parse_mode=ParseMode.HTML,
                       reload_able=True, callback_path="update_mix_tag_performance",
                       query=update.callback_query)
    except RPCException as e:
        self._send_msg(str(e))
@authorized_only
def _count(self, update: Update, context: CallbackContext) -> None:
"""
Handler for /count.
Returns the number of trades running
:param bot: telegram bot
:param update: message update
:return: None
"""
try:
counts = self._rpc._rpc_count()
message = tabulate({k: [v] for k, v in counts.items()},
headers=['current', 'max', 'total stake'],
tablefmt='simple')
message = "{}".format(message) logger.debug(message) self._send_msg(message, parse_mode=ParseMode.HTML, reload_able=True, callback_path="update_count", query=update.callback_query) except RPCException as e: self._send_msg(str(e)) @authorized_only def _locks(self, update: Update, context: CallbackContext) -> None: """ Handler for /locks. Returns the currently active locks """ rpc_locks = self._rpc._rpc_locks() if not rpc_locks['locks']: self._send_msg('No active locks.', parse_mode=ParseMode.HTML) for locks in chunks(rpc_locks['locks'], 25): message = tabulate([[ lock['id'], lock['pair'], lock['lock_end_time'], lock['reason']] for lock in locks], headers=['ID', 'Pair', 'Until', 'Reason'], tablefmt='simple') message = f"
{escape(message)}" logger.debug(message) self._send_msg(message, parse_mode=ParseMode.HTML) @authorized_only def _delete_locks(self, update: Update, context: CallbackContext) -> None: """ Handler for /delete_locks. Returns the currently active locks """ arg = context.args[0] if context.args and len(context.args) > 0 else None lockid = None pair = None if arg: try: lockid = int(arg) except ValueError: pair = arg self._rpc._rpc_delete_lock(lockid=lockid, pair=pair) self._locks(update, context) @authorized_only def _whitelist(self, update: Update, context: CallbackContext) -> None: """ Handler for /whitelist Shows the currently active whitelist """ try: whitelist = self._rpc._rpc_whitelist() message = f"Using whitelist `{whitelist['method']}` with {whitelist['length']} pairs\n" message += f"`{', '.join(whitelist['whitelist'])}`" logger.debug(message) self._send_msg(message) except RPCException as e: self._send_msg(str(e)) @authorized_only def _blacklist(self, update: Update, context: CallbackContext) -> None: """ Handler for /blacklist Shows the currently active blacklist """ self.send_blacklist_msg(self._rpc._rpc_blacklist(context.args)) def send_blacklist_msg(self, blacklist: Dict): errmsgs = [] for pair, error in blacklist['errors'].items(): errmsgs.append(f"Error adding `{pair}` to blacklist: `{error['error_msg']}`") if errmsgs: self._send_msg('\n'.join(errmsgs)) message = f"Blacklist contains {blacklist['length']} pairs\n" message += f"`{', '.join(blacklist['blacklist'])}`" logger.debug(message) self._send_msg(message) @authorized_only def _blacklist_delete(self, update: Update, context: CallbackContext) -> None: """ Handler for /bl_delete Deletes pair(s) from current blacklist """ self.send_blacklist_msg(self._rpc._rpc_blacklist_delete(context.args or [])) @authorized_only def _logs(self, update: Update, context: CallbackContext) -> None: """ Handler for /logs Shows the latest logs """ try: try: limit = int(context.args[0]) if context.args else 10 except 
(TypeError, ValueError, IndexError): limit = 10 logs = RPC._rpc_get_logs(limit)['logs'] msgs = '' msg_template = "*{}* {}: {} \\- `{}`" for logrec in logs: msg = msg_template.format(escape_markdown(logrec[0], version=2), escape_markdown(logrec[2], version=2), escape_markdown(logrec[3], version=2), escape_markdown(logrec[4], version=2)) if len(msgs + msg) + 10 >= MAX_TELEGRAM_MESSAGE_LENGTH: # Send message immediately if it would become too long self._send_msg(msgs, parse_mode=ParseMode.MARKDOWN_V2) msgs = msg + '\n' else: # Append message to messages to send msgs += msg + '\n' if msgs: self._send_msg(msgs, parse_mode=ParseMode.MARKDOWN_V2) except RPCException as e: self._send_msg(str(e)) @authorized_only def _edge(self, update: Update, context: CallbackContext) -> None: """ Handler for /edge Shows information related to Edge """ try: edge_pairs = self._rpc._rpc_edge() if not edge_pairs: message = 'Edge only validated following pairs:' self._send_msg(message, parse_mode=ParseMode.HTML) for chunk in chunks(edge_pairs, 25): edge_pairs_tab = tabulate(chunk, headers='keys', tablefmt='simple') message = (f'Edge only validated following pairs:\n' f'
{edge_pairs_tab}') self._send_msg(message, parse_mode=ParseMode.HTML) except RPCException as e: self._send_msg(str(e)) @authorized_only def _help(self, update: Update, context: CallbackContext) -> None: """ Handler for /help. Show commands of the bot :param bot: telegram bot :param update: message update :return: None """ force_enter_text = ("*/forcelong