From 9660e445b89c15c732b276d380f3ef1a27618d46 Mon Sep 17 00:00:00 2001
From: Timothy Pogue
Date: Fri, 25 Nov 2022 18:09:47 -0700
Subject: [PATCH] use new channel apis in emc, extend analyzed df to include list of dates for candles

---
 freqtrade/data/dataprovider.py             |  78 ++++++++++++-
 freqtrade/rpc/api_server/ws_schemas.py     |   2 +-
 freqtrade/rpc/external_message_consumer.py | 128 ++++++++++++++++-----
 freqtrade/rpc/rpc.py                       |  46 ++++++--
 4 files changed, 212 insertions(+), 42 deletions(-)

diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py
index 6b220c8b4..d6eb217a8 100644
--- a/freqtrade/data/dataprovider.py
+++ b/freqtrade/data/dataprovider.py
@@ -9,7 +9,7 @@ from collections import deque
 from datetime import datetime, timezone
 from typing import Any, Dict, List, Optional, Tuple
 
-from pandas import DataFrame
+from pandas import DataFrame, concat, date_range
 
 from freqtrade.configuration import TimeRange
 from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe
@@ -120,7 +120,7 @@ class DataProvider:
             'type': RPCMessageType.ANALYZED_DF,
             'data': {
                 'key': pair_key,
-                'df': dataframe,
+                'df': dataframe.tail(1),
                 'la': datetime.now(timezone.utc)
             }
         }
@@ -157,6 +157,80 @@ class DataProvider:
         self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
         logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
 
+    def _add_external_candle(
+        self,
+        pair: str,
+        dataframe: DataFrame,
+        last_analyzed: datetime,
+        timeframe: str,
+        candle_type: CandleType,
+        producer_name: str = "default"
+    ) -> Tuple[bool, Optional[List[str]]]:
+        """
+        Append a candle to the existing external dataframe
+
+        :param pair: pair to get the data for
+        :param dataframe: The single-candle dataframe to append
+        :param last_analyzed: The datetime the dataframe was last analyzed
+        :param timeframe: Timeframe to get data for
+        :param candle_type: Any of the enum CandleType (must match trading mode!)
+        :param producer_name: The name of the producer the candle came from
+        :returns: A tuple with a boolean signifying whether the candle was
+                  correctly appended, and a list of datetimes missing from the
+                  dataframe if any were found.
+                  Returns (False, None) if there is no data for `producer_name`,
+                  or no existing data for (pair, timeframe, candle_type).
+                  Returns False together with the datetimes of the missing
+                  candles if candles are missing.
+        """
+        pair_key = (pair, timeframe, candle_type)
+
+        if producer_name not in self.__producer_pairs_df:
+            # We don't have data from this producer yet,
+            # so we can't append a candle
+            return (False, None)
+
+        if pair_key not in self.__producer_pairs_df[producer_name]:
+            # We don't have data for this pair_key,
+            # so we can't append a candle
+            return (False, None)
+
+        # TODO: Check for missing candles and return their datetimes
+
+        existing_df, _ = self.__producer_pairs_df[producer_name][pair_key]
+        appended_df = self._append_candle_to_dataframe(existing_df, dataframe)
+
+        # Everything is good, we appended
+        self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed
+        return (True, None)
+
+    def _append_candle_to_dataframe(self, existing: DataFrame, new: DataFrame) -> DataFrame:
+        """
+        Append the `new` dataframe to the `existing` dataframe
+
+        :param existing: The full dataframe you want appended to
+        :param new: The new dataframe containing the data you want appended
+        :returns: The dataframe with the new data in it
+        """
+        if existing.iloc[-1]['date'] != new.iloc[-1]['date']:
+            existing = concat([existing, new], ignore_index=True)
+
+        # Only keep the last 1000 candles in memory
+        # (slicing is a no-op for shorter dataframes)
+        # TODO: Do this better
+        existing = existing[-1000:]
+
+        return existing
+
+    def _is_missing_candles(self, dataframe: DataFrame) -> bool:
+        """
+        Check if the dataframe is missing any candles
+
+        :param dataframe: The DataFrame to check (assumed to have a DatetimeIndex)
+        :returns: True if any dates in the index range are absent from the index
+        """
+        # TODO: pass the timeframe so date_range uses the correct frequency
+        # instead of the default daily one
+        return len(
+            date_range(
+                dataframe.index.min(),
+                dataframe.index.max()
+            ).difference(dataframe.index)
+        ) > 0
+
     def get_producer_df(
         self,
         pair: str,
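For reference, a minimal sketch of how a consumer is expected to use the
return tuple of `_add_external_candle` (variable names and values here are
illustrative, not part of the patch):

    # `dp` is a DataProvider; `candle_df` is a single-row dataframe
    # received over the websocket (both assumed for this sketch).
    appended, missing = dp._add_external_candle(
        "BTC/USDT",
        candle_df,
        last_analyzed=datetime.now(timezone.utc),
        timeframe="5m",
        candle_type=CandleType.SPOT,
        producer_name="default"
    )
    if not appended:
        # Either no prior dataframe exists for this key, or candles are
        # missing (in which case `missing` lists their datetimes); the
        # consumer falls back to requesting a full dataframe.
        ...
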
diff --git a/freqtrade/rpc/api_server/ws_schemas.py b/freqtrade/rpc/api_server/ws_schemas.py
index 877232213..292672b60 100644
--- a/freqtrade/rpc/api_server/ws_schemas.py
+++ b/freqtrade/rpc/api_server/ws_schemas.py
@@ -47,7 +47,7 @@ class WSWhitelistRequest(WSRequestSchema):
 
 class WSAnalyzedDFRequest(WSRequestSchema):
     type: RPCRequestType = RPCRequestType.ANALYZED_DF
-    data: Dict[str, Any] = {"limit": 1500}
+    data: Dict[str, Any] = {"limit": 1500, "pair": None}
 
 
 # ------------------------------ MESSAGE SCHEMAS ----------------------------
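A quick sketch of how the extended request schema can target a single pair
(values are illustrative; omitting "pair" keeps the old whitelist-wide
behavior, since None values are excluded on serialization):

    request = WSAnalyzedDFRequest(data={"limit": 1000, "pair": "BTC/USDT"})
    payload = request.dict(exclude_none=True)  # what goes over the wire
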
diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py
index 6078efd07..24731ef4f 100644
--- a/freqtrade/rpc/external_message_consumer.py
+++ b/freqtrade/rpc/external_message_consumer.py
@@ -8,7 +8,7 @@ import asyncio
 import logging
 import socket
 from threading import Thread
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
 
 import websockets
 from pydantic import ValidationError
@@ -16,7 +16,8 @@ from pydantic import ValidationError
 from freqtrade.data.dataprovider import DataProvider
 from freqtrade.enums import RPCMessageType
 from freqtrade.misc import remove_entry_exit_signals
-from freqtrade.rpc.api_server.ws import WebSocketChannel
+from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
+from freqtrade.rpc.api_server.ws.message_stream import MessageStream
 from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
                                                  WSMessageSchema, WSRequestSchema,
                                                  WSSubscribeRequest, WSWhitelistMessage,
@@ -38,6 +39,14 @@ class Producer(TypedDict):
 logger = logging.getLogger(__name__)
 
 
+def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]) -> Dict[str, Any]:
+    """Convert a message schema to a dict, excluding unset fields"""
+    return schema.dict(exclude_none=True)
+
+
 class ExternalMessageConsumer:
     """
     The main controller class for consuming external messages from
@@ -92,6 +101,8 @@ class ExternalMessageConsumer:
             RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
         }
 
+        self._channel_streams: Dict[str, MessageStream] = {}
+
         self.start()
 
     def start(self):
@@ -118,6 +129,8 @@ class ExternalMessageConsumer:
             logger.info("Stopping ExternalMessageConsumer")
             self._running = False
 
+            self._channel_streams = {}
+
             if self._sub_tasks:
                 # Cancel sub tasks
                 for task in self._sub_tasks:
@@ -175,7 +188,6 @@ class ExternalMessageConsumer:
         :param producer: Dictionary containing producer info
         :param lock: An asyncio Lock
         """
-        channel = None
         while self._running:
             try:
                 host, port = producer['host'], producer['port']
@@ -190,19 +202,17 @@ class ExternalMessageConsumer:
                     max_size=self.message_size_limit,
                     ping_interval=None
                 ) as ws:
-                    channel = WebSocketChannel(ws, channel_id=name)
+                    async with create_channel(ws, channel_id=name) as channel:
 
-                    logger.info(f"Producer connection success - {channel}")
+                        # Create the message stream for this channel
+                        self._channel_streams[name] = MessageStream()
 
-                    # Now request the initial data from this Producer
-                    for request in self._initial_requests:
-                        await channel.send(
-                            request.dict(exclude_none=True)
-                        )
-
-                    # Now receive data, if none is within the time limit, ping
-                    await self._receive_messages(channel, producer, lock)
+                        # Run the channel tasks while connected
+                        await channel.run_channel_tasks(
+                            self._receive_messages(channel, producer, lock),
+                            self._send_requests(channel, self._channel_streams[name])
+                        )
 
             except (websockets.exceptions.InvalidURI, ValueError) as e:
                 logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
                 break
@@ -214,26 +224,33 @@ class ExternalMessageConsumer:
                 websockets.exceptions.InvalidMessage
             ) as e:
                 logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
-                await asyncio.sleep(self.sleep_time)
-                continue
 
             except (
                 websockets.exceptions.ConnectionClosedError,
                 websockets.exceptions.ConnectionClosedOK
            ):
                 # Just keep trying to connect again indefinitely
-                await asyncio.sleep(self.sleep_time)
-                continue
+                pass
 
             except Exception as e:
                 # An unforeseen error has occurred, log and continue
                 logger.error("Unexpected error has occurred:")
                 logger.exception(e)
-                continue
 
             finally:
-                if channel:
-                    await channel.close()
+                await asyncio.sleep(self.sleep_time)
+                continue
+
+    async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
+        # Send the initial requests
+        for init_request in self._initial_requests:
+            await channel.send(schema_to_dict(init_request))
+
+        # Now send any subsequent requests published to
+        # this channel's stream
+        async for request in channel_stream:
+            logger.info(f"Sending request to channel - {channel} - {request}")
+            await channel.send(request)
 
     async def _receive_messages(
         self,
@@ -270,20 +287,39 @@ class ExternalMessageConsumer:
                     latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)
 
                     logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
-
                     continue
+
                 except (websockets.exceptions.ConnectionClosed):
                     # Just eat the error and continue reconnecting
                     logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s")
-                    await asyncio.sleep(self.sleep_time)
-                    break
+
                 except Exception as e:
+                    # Just eat the error and continue reconnecting
                     logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
                     logger.debug(e, exc_info=e)
-                    await asyncio.sleep(self.sleep_time)
+
+                finally:
+                    await asyncio.sleep(self.sleep_time)
                     break
 
+    def send_producer_request(
+        self,
+        producer_name: str,
+        request: Union[WSRequestSchema, Dict[str, Any]]
+    ):
+        """
+        Publish a message to the producer's message stream to be
+        sent by the channel task.
+
+        :param producer_name: The name of the producer to publish the message to
+        :param request: The request to send to the producer
+        """
+        if isinstance(request, WSRequestSchema):
+            request = schema_to_dict(request)
+
+        if channel_stream := self._channel_streams.get(producer_name):
+            channel_stream.publish(request)
+
     def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
         """
         Handles external messages from a Producer
@@ -340,12 +376,44 @@ class ExternalMessageConsumer:
         if self._emc_config.get('remove_entry_exit_signals', False):
             df = remove_entry_exit_signals(df)
 
-        # Add the dataframe to the dataprovider
-        self._dp._add_external_df(pair, df,
-                                  last_analyzed=la,
-                                  timeframe=timeframe,
-                                  candle_type=candle_type,
-                                  producer_name=producer_name)
-
-        logger.debug(
+        if len(df) >= 999:
+            # This is a full dataframe
+            # Add the dataframe to the dataprovider
+            self._dp._add_external_df(
+                pair,
+                df,
+                last_analyzed=la,
+                timeframe=timeframe,
+                candle_type=candle_type,
+                producer_name=producer_name
+            )
+
+        elif len(df) == 1:
+            # This is just a single candle
+            # Have the dataprovider append it to
+            # the full dataframe. If it can't,
+            # request the missing candles
+            appended, _ = self._dp._add_external_candle(
+                pair,
+                df,
+                last_analyzed=la,
+                timeframe=timeframe,
+                candle_type=candle_type,
+                producer_name=producer_name
+            )
+            if not appended:
+                logger.info("Holes in data or no existing df, "
+                            f"requesting data for {key} from `{producer_name}`")
+
+                self.send_producer_request(
+                    producer_name,
+                    WSAnalyzedDFRequest(
+                        data={
+                            "limit": 1000,
+                            "pair": pair
+                        }
+                    )
+                )
+                return
+
+        logger.info(
             f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`")
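To illustrate the new request path, a sketch (here `emc` stands in for a
running ExternalMessageConsumer and "default" for a configured producer):

    # The request is published to the producer's MessageStream and sent by
    # the _send_requests() channel task, rather than being written to the
    # websocket directly from the calling thread.
    emc.send_producer_request(
        "default",
        WSAnalyzedDFRequest(data={"limit": 1000, "pair": "BTC/USDT"})
    )
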
diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py
index 334e18dc7..8b23d33e7 100644
--- a/freqtrade/rpc/rpc.py
+++ b/freqtrade/rpc/rpc.py
@@ -1058,23 +1058,46 @@ class RPC:
         return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'],
                                                pair, timeframe, _data, last_analyzed)
 
-    def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str,
-                                     limit: Optional[int]) -> Tuple[DataFrame, datetime]:
-        """ Get the dataframe and last analyze from the dataprovider """
+    def __rpc_analysed_dataframe_raw(
+        self,
+        pair: str,
+        timeframe: str,
+        limit: Optional[Union[int, List[str]]] = None
+    ) -> Tuple[DataFrame, datetime]:
+        """
+        Get the dataframe and last analyzed timestamp from the dataprovider
+
+        :param pair: The pair to get
+        :param timeframe: The timeframe of data to get
+        :param limit: If an integer, limits the size of the dataframe
+                      If a list of string datetimes, only returns those candles
+        """
         _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe(
             pair, timeframe)
         _data = _data.copy()
 
-        if limit:
+        if limit and isinstance(limit, int):
             _data = _data.iloc[-limit:]
+        elif limit and isinstance(limit, list):
+            _data = _data.loc[_data['date'].isin(limit)]
+
         return _data, last_analyzed
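A minimal sketch of the two `limit` modes handled above (`df` stands in for
an analyzed dataframe with a 'date' column; the dates are made up):

    last_30 = df.iloc[-30:]                        # limit=30 (integer)
    wanted = ["2022-11-25 18:00:00+00:00"]         # limit as a list of dates
    only_those = df.loc[df['date'].isin(wanted)]   # just the missing candles
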
 
     def _ws_all_analysed_dataframes(
         self,
         pairlist: List[str],
-        limit: Optional[int]
+        limit: Optional[Union[int, List[str]]] = None
     ) -> Generator[Dict[str, Any], None, None]:
-        """ Get the analysed dataframes of each pair in the pairlist """
+        """
+        Get the analysed dataframes of each pair in the pairlist.
+        If an integer limit is given, each dataframe is truncated to that
+        many candles; if a list of candle dates is given, only those
+        candles are returned.
+
+        :param pairlist: A list of pairs to get
+        :param limit: If an integer, limits the size of the dataframe
+                      If a list of string datetimes, only returns those candles
+        :returns: A generator of dictionaries with the key, dataframe, and last analyzed timestamp
+        """
         timeframe = self._freqtrade.config['timeframe']
         candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT)
 
@@ -1087,10 +1110,15 @@ class RPC:
                 "la": last_analyzed
             }
 
-    def _ws_request_analyzed_df(self, limit: Optional[int]):
+    def _ws_request_analyzed_df(
+        self,
+        pair: Optional[str],
+        limit: Optional[Union[int, List[str]]] = None,
+    ):
         """ Historical Analyzed Dataframes for WebSocket """
-        whitelist = self._freqtrade.active_pair_whitelist
-        return self._ws_all_analysed_dataframes(whitelist, limit)
+        pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist
+
+        return self._ws_all_analysed_dataframes(pairlist, limit)
 
     def _ws_request_whitelist(self):
         """ Whitelist data for WebSocket """
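Putting the producer side together, a rough sketch of the round trip this
patch enables (pair and limit values are illustrative): when a consumer
reports a gap, the producer can now serve a single pair on demand instead
of the whole whitelist.

    # `rpc` stands in for the producer's RPC instance.
    for message in rpc._ws_request_analyzed_df(pair="BTC/USDT", limit=1000):
        # each dict carries the pair key, the dataframe, and the
        # last-analyzed timestamp ("key", "df", "la")
        ...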