diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 6bacaf961..63222f2ff 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -61,8 +61,6 @@ USERPATH_FREQAIMODELS = 'freqaimodels' TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] -WAIT_DATA_POLICY_OPTIONS = ['none', 'first', 'all'] - ENV_VAR_PREFIX = 'FREQTRADE__' NON_OPEN_EXCHANGE_STATES = ('cancelled', 'canceled', 'closed', 'expired') @@ -404,7 +402,6 @@ CONF_SCHEMA = { 'username': {'type': 'string'}, 'password': {'type': 'string'}, 'ws_token': {'type': 'string'}, - 'enable_message_ws': {'type': 'boolean', 'default': False}, 'jwt_secret_key': {'type': 'string'}, 'CORS_origins': {'type': 'array', 'items': {'type': 'string'}}, 'verbosity': {'type': 'string', 'enum': ['error', 'info']}, diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 947387f75..90302f88e 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -144,7 +144,6 @@ class DataProvider: if producer_name not in self.__producer_pairs_df: self.__producer_pairs_df[producer_name] = {} - # For multiple leaders, if the data already exists, we'd merge self.__producer_pairs_df[producer_name][pair_key] = (dataframe, datetime.now(timezone.utc)) def get_external_df( diff --git a/freqtrade/enums/rpcmessagetype.py b/freqtrade/enums/rpcmessagetype.py index 6283fb7cc..c213826ae 100644 --- a/freqtrade/enums/rpcmessagetype.py +++ b/freqtrade/enums/rpcmessagetype.py @@ -33,3 +33,4 @@ class RPCMessageType(str, Enum): # Enum for parsing requests from ws consumers class RPCRequestType(str, Enum): SUBSCRIBE = 'subscribe' + INITIAL_DATA = 'initial_data' diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 19c77d403..888994ffb 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -203,8 +203,7 @@ class FreqtradeBot(LoggingMixin): # Doesn't necessarily NEED to be this way, as maybe we'd like to broadcast # even if we are using external dataframes in the future. - self.strategy.analyze(self.active_pair_whitelist, - emit_df=self.dataprovider.external_data_enabled) + self.strategy.analyze(self.active_pair_whitelist) with self._exit_lock: # Check for exchange cancelations, timeouts and user requested replace @@ -264,11 +263,10 @@ class FreqtradeBot(LoggingMixin): pairs that have open trades. """ # Refresh whitelist + _prev_whitelist = self.pairlists.whitelist self.pairlists.refresh_pairlist() _whitelist = self.pairlists.whitelist - self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist}) - # Calculating Edge positioning if self.edge: self.edge.calculate(_whitelist) @@ -279,6 +277,10 @@ class FreqtradeBot(LoggingMixin): # It ensures that candle (OHLCV) data are downloaded for open trades as well _whitelist.extend([trade.pair for trade in trades if trade.pair not in _whitelist]) + # Called last to include the included pairs + if _prev_whitelist != _whitelist: + self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist}) + return _whitelist def get_free_open_trades(self) -> int: diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index d7c7239d1..52507106d 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -4,9 +4,10 @@ from typing import Any, Dict from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect from freqtrade.enums import RPCMessageType, RPCRequestType -from freqtrade.rpc.api_server.deps import get_channel_manager +from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc from freqtrade.rpc.api_server.ws.channel import WebSocketChannel from freqtrade.rpc.api_server.ws.utils import is_websocket_alive +from freqtrade.rpc.rpc import RPC # from typing import Any, Dict @@ -18,17 +19,20 @@ logger = logging.getLogger(__name__) router = APIRouter() -# We are passed a Channel object, we can only do sync functions on that channel object -def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel): +async def _process_consumer_request( + request: Dict[str, Any], + channel: WebSocketChannel, + rpc: RPC +): type, data = request.get('type'), request.get('data') - # If the request is empty, do nothing - if not data: - return - # If we have a request of type SUBSCRIBE, set the topics in this channel if type == RPCRequestType.SUBSCRIBE: - if isinstance(data, list): + # If the request is empty, do nothing + if not data: + return + + if not isinstance(data, list): logger.error(f"Improper request from channel: {channel} - {request}") return @@ -38,11 +42,26 @@ def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel logger.debug(f"{channel} subscribed to topics: {data}") channel.set_subscriptions(data) + elif type == RPCRequestType.INITIAL_DATA: + # Acquire the data + initial_data = rpc._ws_initial_data() + + # We now loop over it sending it in pieces + whitelist_data, analyzed_df = initial_data.get('whitelist'), initial_data.get('analyzed_df') + + if whitelist_data: + await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist_data}) + + if analyzed_df: + for pair, message in analyzed_df.items(): + await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message}) + @router.websocket("/message/ws") async def message_endpoint( ws: WebSocket, - channel_manager=Depends(get_channel_manager) + rpc: RPC = Depends(get_rpc), + channel_manager=Depends(get_channel_manager), ): try: if is_websocket_alive(ws): @@ -59,7 +78,7 @@ async def message_endpoint( # Process the request here. Should this be a method of RPC? logger.info(f"Request: {request}") - _process_consumer_request(request, channel) + await _process_consumer_request(request, channel, rpc) except WebSocketDisconnect: # Handle client disconnects diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index e391e66af..150c83890 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -151,11 +151,9 @@ class ApiServer(RPCHandler): app.include_router(api_backtest, prefix="/api/v1", dependencies=[Depends(http_basic_or_jwt_token)], ) - if self._config.get('api_server', {}).get('enable_message_ws', False): - logger.info("Enabling Message WebSocket") - app.include_router(ws_router, prefix="/api/v1", - dependencies=[Depends(get_ws_token)] - ) + app.include_router(ws_router, prefix="/api/v1", + dependencies=[Depends(get_ws_token)] + ) app.include_router(router_login, prefix="/api/v1", tags=["auth"]) # UI Router MUST be last! app.include_router(router_ui, prefix='') diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index ae72089b5..4544afc29 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -8,7 +8,7 @@ import asyncio import logging import socket from threading import Thread -from typing import Any, Dict +from typing import Any, Dict, Optional import websockets @@ -58,6 +58,11 @@ class ExternalMessageConsumer: # callbacks for the messages self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF] + self._message_handlers = { + RPCMessageType.WHITELIST: self._consume_whitelist_message, + RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message, + } + self.start() def start(self): @@ -152,6 +157,11 @@ class ExternalMessageConsumer: self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) ) + # Now request the initial data from this Producer + await channel.send( + self.compose_consumer_request(RPCRequestType.INITIAL_DATA) + ) + # Now receive data, if none is within the time limit, ping while True: try: @@ -198,7 +208,11 @@ class ExternalMessageConsumer: logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") break - def compose_consumer_request(self, type_: RPCRequestType, data: Any) -> Dict[str, Any]: + def compose_consumer_request( + self, + type_: RPCRequestType, + data: Optional[Any] = None + ) -> Dict[str, Any]: """ Create a request for sending to a producer @@ -208,8 +222,6 @@ class ExternalMessageConsumer: """ return {'type': type_, 'data': data} - # How we do things here isn't set in stone. There seems to be some interest - # in figuring out a better way, but we shall do this for now. def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]): """ Handles external messages from a Producer @@ -225,27 +237,44 @@ class ExternalMessageConsumer: logger.debug(f"Received message of type {message_type}") - # Handle Whitelists - if message_type == RPCMessageType.WHITELIST: - pairlist = message_data + message_handler = self._message_handlers.get(message_type) - # Add the pairlist data to the DataProvider - self._dp.set_producer_pairs(pairlist, producer_name=producer_name) + if not message_handler: + logger.info(f"Received unhandled message: {message_data}, ignoring...") + return - # Handle analyzed dataframes - elif message_type == RPCMessageType.ANALYZED_DF: - key, value = message_data.get('key'), message_data.get('value') + message_handler(producer_name, message_data) - if key and value: - pair, timeframe, candle_type = key + def _consume_whitelist_message(self, producer_name: str, message_data: Any): + # We expect List[str] + if not isinstance(message_data, list): + return - # Convert the JSON to a pandas DataFrame - dataframe = json_to_dataframe(value) + # Add the pairlist data to the DataProvider + self._dp.set_producer_pairs(message_data, producer_name=producer_name) - # If set, remove the Entry and Exit signals from the Producer - if self._emc_config.get('remove_entry_exit_signals', False): - dataframe = remove_entry_exit_signals(dataframe) + logger.debug(f"Consumed message from {producer_name} of type RPCMessageType.WHITELIST") - # Add the dataframe to the dataprovider - self._dp.add_external_df(pair, timeframe, dataframe, - candle_type, producer_name=producer_name) + def _consume_analyzed_df_message(self, producer_name: str, message_data: Any): + # We expect a Dict[str, Any] + if not isinstance(message_data, dict): + return + + key, value = message_data.get('key'), message_data.get('value') + + if key and value: + pair, timeframe, candle_type = key + + # Convert the JSON to a pandas DataFrame + dataframe = json_to_dataframe(value) + + # If set, remove the Entry and Exit signals from the Producer + if self._emc_config.get('remove_entry_exit_signals', False): + dataframe = remove_entry_exit_signals(dataframe) + + # Add the dataframe to the dataprovider + self._dp.add_external_df(pair, timeframe, dataframe, + candle_type, producer_name=producer_name) + + logger.debug( + f"Consumed message from {producer_name} of type RPCMessageType.ANALYZED_DF") diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index ed7f13a96..c4752c570 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -24,7 +24,7 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirecti from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler -from freqtrade.misc import decimals_per_coin, shorten_date +from freqtrade.misc import dataframe_to_json, decimals_per_coin, shorten_date from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence.models import PairLock from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist @@ -1035,16 +1035,51 @@ class RPC: def _rpc_analysed_dataframe(self, pair: str, timeframe: str, limit: Optional[int]) -> Dict[str, Any]: + """ Analyzed dataframe in Dict form """ - _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe( - pair, timeframe) - _data = _data.copy() - if limit: - _data = _data.iloc[-limit:] + _data, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit) return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'], pair, timeframe, _data, last_analyzed) - @staticmethod + def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str, + limit: Optional[int]) -> Tuple[DataFrame, datetime]: + """ Get the dataframe and last analyze from the dataprovider """ + _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe( + pair, timeframe) + _data = _data.copy() + + if limit: + _data = _data.iloc[-limit:] + return _data, last_analyzed + + def _ws_all_analysed_dataframes( + self, + pairlist: List[str], + limit: Optional[int] + ) -> Dict[str, Any]: + """ Get the analysed dataframes of each pair in the pairlist """ + timeframe = self._freqtrade.config['timeframe'] + candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT) + _data = {} + + for pair in pairlist: + dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit) + _data[pair] = { + "key": (pair, timeframe, candle_type), + "value": dataframe_to_json(dataframe) + } + + return _data + + def _ws_initial_data(self): + """ Websocket friendly initial data, whitelists and all analyzed dataframes """ + whitelist = self._freqtrade.active_pair_whitelist + # We only get the last 500 candles, should we remove the limit? + analyzed_df = self._ws_all_analysed_dataframes(whitelist, 500) + + return {"whitelist": whitelist, "analyzed_df": analyzed_df} + + @ staticmethod def _rpc_analysed_history_full(config, pair: str, timeframe: str, timerange: str, exchange) -> Dict[str, Any]: timerange_parsed = TimeRange.parse_timerange(timerange) @@ -1075,7 +1110,7 @@ class RPC: self._freqtrade.strategy.plot_config['subplots'] = {} return self._freqtrade.strategy.plot_config - @staticmethod + @ staticmethod def _rpc_sysinfo() -> Dict[str, Any]: return { "cpu_pct": psutil.cpu_percent(interval=1, percpu=True), diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index 3488a6e3c..d6374566c 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -81,6 +81,8 @@ class RPCManager: # Removed actually showing the message because the logs would be # completely spammed of the json dataframe logger.info('Sending rpc message of type: %s', msg.get('type')) + # Log actual message in debug? + # logger.debug(msg) if 'pair' in msg: msg.update({ 'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair']) diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 34e475ed7..7fcae870a 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -725,9 +725,7 @@ class IStrategy(ABC, HyperStrategyMixin): candle_type = self.config.get('candle_type_def', CandleType.SPOT) self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) - - if emit_df: - self.dp.emit_df((pair, self.timeframe, candle_type), dataframe) + self.dp.emit_df((pair, self.timeframe, candle_type), dataframe) else: logger.debug("Skipping TA Analysis for already analyzed candle")