diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 63222f2ff..352e48148 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -505,7 +505,13 @@ CONF_SCHEMA = { 'reply_timeout': {'type': 'integer'}, 'sleep_time': {'type': 'integer'}, 'ping_timeout': {'type': 'integer'}, - 'remove_signals_analyzed_df': {'type': 'boolean', 'default': False} + 'remove_signals_analyzed_df': {'type': 'boolean', 'default': False}, + 'initial_candle_limit': { + 'type': 'integer', + 'minimum': 100, + 'maximum': 1500, + 'default': 500 + } }, 'required': ['producers'] }, diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index cf5b6cde0..95cfd031a 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -26,6 +26,8 @@ async def _process_consumer_request( ): type, data = request.get('type'), request.get('data') + logger.debug(f"Request of type {type} from {channel}") + # If we have a request of type SUBSCRIBE, set the topics in this channel if type == RPCRequestType.SUBSCRIBE: # If the request is empty, do nothing @@ -49,8 +51,16 @@ async def _process_consumer_request( await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist}) elif type == RPCRequestType.ANALYZED_DF: + limit = None + + if data: + # Limit the amount of candles per dataframe to 'limit' or 1500 + limit = max(data.get('limit', 500), 1500) + # They requested the full historical analyzed dataframes - analyzed_df = rpc._ws_request_analyzed_df() + analyzed_df = rpc._ws_request_analyzed_df(limit) + + logger.debug(f"ANALYZED_DF RESULT: {analyzed_df}") # For every dataframe, send as a separate message for _, message in analyzed_df.items(): @@ -65,32 +75,33 @@ async def message_endpoint( ): try: if is_websocket_alive(ws): - logger.info(f"Consumer connected - {ws.client}") - # TODO: # Return a channel ID, pass that instead of ws to the rest of the methods channel = await channel_manager.on_connect(ws) + logger.info(f"Consumer connected - {channel}") + # Keep connection open until explicitly closed, and process requests try: while not channel.is_closed(): request = await channel.recv() - # Process the request here. Should this be a method of RPC? - logger.info(f"Request: {request}") + # Process the request here await _process_consumer_request(request, channel, rpc) except WebSocketDisconnect: # Handle client disconnects - logger.info(f"Consumer disconnected - {ws.client}") + logger.info(f"Consumer disconnected - {channel}") await channel_manager.on_disconnect(ws) except Exception as e: - logger.info(f"Consumer connection failed - {ws.client}") + logger.info(f"Consumer connection failed - {channel}") logger.exception(e) # Handle cases like - # RuntimeError('Cannot call "send" once a closed message has been sent') await channel_manager.on_disconnect(ws) - except Exception: + except Exception as e: logger.error(f"Failed to serve - {ws.client}") + # Log tracebacks to keep track of what errors are happening + logger.exception(e) await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 8891d3296..1f0cd9c7a 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -1,6 +1,7 @@ import logging from threading import RLock -from typing import List, Type +from typing import List, Optional, Type +from uuid import uuid4 from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer, @@ -19,8 +20,12 @@ class WebSocketChannel: def __init__( self, websocket: WebSocketType, + channel_id: Optional[str] = None, serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer ): + + self.channel_id = channel_id if channel_id else uuid4().hex[:8] + # The WebSocket object self._websocket = WebSocketProxy(websocket) # The Serializing class for the WebSocket object @@ -34,6 +39,13 @@ class WebSocketChannel: # Wrap the WebSocket in the Serializing class self._wrapped_ws = self._serializer_cls(self._websocket) + def __repr__(self): + return f"WebSocketChannel({self.channel_id}, {self.remote_addr})" + + @property + def remote_addr(self): + return self._websocket.remote_addr + async def send(self, data): """ Send data on the wrapped websocket diff --git a/freqtrade/rpc/api_server/ws/proxy.py b/freqtrade/rpc/api_server/ws/proxy.py index 6acc1d363..73d1481b9 100644 --- a/freqtrade/rpc/api_server/ws/proxy.py +++ b/freqtrade/rpc/api_server/ws/proxy.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import Any, Tuple, Union from fastapi import WebSocket as FastAPIWebSocket from websockets import WebSocketClientProtocol as WebSocket @@ -15,6 +15,14 @@ class WebSocketProxy: def __init__(self, websocket: WebSocketType): self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket + @property + def remote_addr(self) -> Tuple[Any, ...]: + if hasattr(self._websocket, "remote_address"): + return self._websocket.remote_address + elif hasattr(self._websocket, "client"): + return tuple(self._websocket.client) + return ("unknown", 0) + async def send(self, data): """ Send data on the wrapped websocket diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index c925624f8..3b39b02c8 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -8,7 +8,7 @@ import asyncio import logging import socket from threading import Thread -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import pandas import websockets @@ -54,11 +54,25 @@ class ExternalMessageConsumer: self.ping_timeout = self._emc_config.get('ping_timeout', 2) self.sleep_time = self._emc_config.get('sleep_time', 5) + self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 500) + # Setting these explicitly as they probably shouldn't be changed by a user # Unless we somehow integrate this with the strategy to allow creating # callbacks for the messages self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF] + # Allow setting data for each initial request + self._initial_requests: List[Dict[str, Any]] = [ + { + "type": RPCRequestType.WHITELIST, + "data": None + }, + { + "type": RPCRequestType.ANALYZED_DF, + "data": {"limit": self.initial_candle_limit} + } + ] + self._message_handlers = { RPCMessageType.WHITELIST: self._consume_whitelist_message, RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message, @@ -145,12 +159,14 @@ class ExternalMessageConsumer: while self._running: try: url, token = producer['url'], producer['ws_token'] + name = producer["name"] ws_url = f"{url}?token={token}" # This will raise InvalidURI if the url is bad async with websockets.connect(ws_url) as ws: - logger.info("Connection successful") - channel = WebSocketChannel(ws) + channel = WebSocketChannel(ws, channel_id=name) + + logger.info(f"Producer connection success - {channel}") # Tell the producer we only want these topics # Should always be the first thing we send @@ -158,41 +174,16 @@ class ExternalMessageConsumer: self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) ) - # Now request the initial data from this Producer for every topic - # we've subscribed to - for topic in self.topics: - # without .upper() we get KeyError - request_type = RPCRequestType[topic.upper()] - await channel.send(self.compose_consumer_request(request_type)) + # Now request the initial data from this Producer + for request in self._initial_requests: + request_type = request.get('type', 'none') # Default to string + request_data = request.get('data') + await channel.send( + self.compose_consumer_request(request_type, request_data) + ) # Now receive data, if none is within the time limit, ping - while True: - try: - message = await asyncio.wait_for( - channel.recv(), - timeout=self.reply_timeout - ) - - async with lock: - # Handle the message - self.handle_producer_message(producer, message) - - except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): - # We haven't received data yet. Check the connection and continue. - try: - # ping - ping = await channel.ping() - - await asyncio.wait_for(ping, timeout=self.ping_timeout) - logger.debug(f"Connection to {url} still alive...") - - continue - except Exception: - logger.info( - f"Ping error {url} - retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - - break + await self._receive_messages(channel, producer, lock) # Catch invalid ws_url, and break the loop except websockets.exceptions.InvalidURI as e: @@ -214,6 +205,47 @@ class ExternalMessageConsumer: logger.exception(e) break + async def _receive_messages( + self, + channel: WebSocketChannel, + producer: Dict[str, Any], + lock: asyncio.Lock + ): + """ + Loop to handle receiving messages from a Producer + + :param channel: The WebSocketChannel object for the WebSocket + :param producer: Dictionary containing producer info + :param lock: An asyncio Lock + """ + while True: + try: + message = await asyncio.wait_for( + channel.recv(), + timeout=self.reply_timeout + ) + + async with lock: + # Handle the message + self.handle_producer_message(producer, message) + + except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): + # We haven't received data yet. Check the connection and continue. + try: + # ping + ping = await channel.ping() + + await asyncio.wait_for(ping, timeout=self.ping_timeout) + logger.debug(f"Connection to {channel} still alive...") + + continue + except Exception: + logger.info( + f"Ping error {channel} - retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + + break + def compose_consumer_request( self, type_: RPCRequestType, @@ -241,7 +273,7 @@ class ExternalMessageConsumer: if message_data is None: return - logger.debug(f"Received message of type {message_type}") + logger.debug(f"Received message of type {message_type} from `{producer_name}`") message_handler = self._message_handlers.get(message_type) diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 378677e44..7b29665eb 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1068,10 +1068,10 @@ class RPC: return _data - def _ws_request_analyzed_df(self): + def _ws_request_analyzed_df(self, limit: Optional[int]): """ Historical Analyzed Dataframes for WebSocket """ whitelist = self._freqtrade.active_pair_whitelist - return self._ws_all_analysed_dataframes(whitelist, 500) + return self._ws_all_analysed_dataframes(whitelist, limit) def _ws_request_whitelist(self): """ Whitelist data for WebSocket """