""" This module contains class to manage RPC communications (Telegram, API, ...) """ import logging from collections import deque from typing import Any, Dict, List from freqtrade.enums import RPCMessageType from freqtrade.rpc import RPC, RPCHandler logger = logging.getLogger(__name__) class RPCManager: """ Class to manage RPC objects (Telegram, API, ...) """ def __init__(self, freqtrade) -> None: """ Initializes all enabled rpc modules """ self.registered_modules: List[RPCHandler] = [] self._freqtrade = freqtrade self._rpc = RPC(freqtrade) config = freqtrade.config # Enable telegram if config.get('telegram', {}).get('enabled', False): logger.info('Enabling rpc.telegram ...') from freqtrade.rpc.telegram import Telegram self.registered_modules.append(Telegram(self._rpc, config)) # Enable discord if config.get('discord', {}).get('enabled', False): logger.info('Enabling rpc.discord ...') from freqtrade.rpc.discord import Discord self.registered_modules.append(Discord(self._rpc, config)) # Enable Webhook if config.get('webhook', {}).get('enabled', False): logger.info('Enabling rpc.webhook ...') from freqtrade.rpc.webhook import Webhook self.registered_modules.append(Webhook(self._rpc, config)) # Enable local rest api server for cmd line control if config.get('api_server', {}).get('enabled', False): logger.info('Enabling rpc.api_server') from freqtrade.rpc.api_server import ApiServer apiserver = ApiServer(config) apiserver.add_rpc_handler(self._rpc) self.registered_modules.append(apiserver) # Enable External Signals mode # For this to be enabled, the API server must also be enabled # if config.get('external_signal', {}).get('enabled', False): # logger.info('Enabling RPC.ExternalSignalController') # from freqtrade.rpc.external_signal import ExternalSignalController # external_signals = ExternalSignalController(self._rpc, config, apiserver) # self.registered_modules.append(external_signals) # # # Attach the controller to FreqTrade # freqtrade.external_signal_controller = external_signals def cleanup(self) -> None: """ Stops all enabled rpc modules """ logger.info('Cleaning up rpc modules ...') while self.registered_modules: mod = self.registered_modules.pop() # popleft to cleanup API server last? logger.info('Cleaning up rpc.%s ...', mod.name) mod.cleanup() del mod def send_msg(self, msg: Dict[str, Any]) -> None: """ Send given message to all registered rpc modules. A message consists of one or more key value pairs of strings. e.g.: { 'status': 'stopping bot' } """ # Removed actually showing the message because the logs would be # completely spammed of the json dataframe logger.info('Sending rpc message of type: %s', msg.get('type')) if 'pair' in msg: msg.update({ 'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair']) }) for mod in self.registered_modules: logger.debug('Forwarding message to rpc.%s', mod.name) try: mod.send_msg(msg) except NotImplementedError: logger.error(f"Message type '{msg['type']}' not implemented by handler {mod.name}.") def process_msg_queue(self, queue: deque) -> None: """ Process all messages in the queue. """ while queue: msg = queue.popleft() self.send_msg({ 'type': RPCMessageType.STRATEGY_MSG, 'msg': msg, }) def startup_messages(self, config: Dict[str, Any], pairlist, protections) -> None: if config['dry_run']: self.send_msg({ 'type': RPCMessageType.WARNING, 'status': 'Dry run is enabled. All trades are simulated.' }) stake_currency = config['stake_currency'] stake_amount = config['stake_amount'] minimal_roi = config['minimal_roi'] stoploss = config['stoploss'] trailing_stop = config['trailing_stop'] timeframe = config['timeframe'] exchange_name = config['exchange']['name'] strategy_name = config.get('strategy', '') pos_adjust_enabled = 'On' if config['position_adjustment_enable'] else 'Off' self.send_msg({ 'type': RPCMessageType.STARTUP, 'status': f'*Exchange:* `{exchange_name}`\n' f'*Stake per trade:* `{stake_amount} {stake_currency}`\n' f'*Minimum ROI:* `{minimal_roi}`\n' f'*{"Trailing " if trailing_stop else ""}Stoploss:* `{stoploss}`\n' f'*Position adjustment:* `{pos_adjust_enabled}`\n' f'*Timeframe:* `{timeframe}`\n' f'*Strategy:* `{strategy_name}`' }) self.send_msg({ 'type': RPCMessageType.STARTUP, 'status': f'Searching for {stake_currency} pairs to buy and sell ' f'based on {pairlist.short_desc()}' }) if len(protections.name_list) > 0: prots = '\n'.join([p for prot in protections.short_desc() for k, p in prot.items()]) self.send_msg({ 'type': RPCMessageType.STARTUP, 'status': f'Using Protections: \n{prots}' })