From 9660e445b89c15c732b276d380f3ef1a27618d46 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 25 Nov 2022 18:09:47 -0700 Subject: [PATCH 01/22] use new channel apis in emc, extend analyzed df to include list of dates for candles --- freqtrade/data/dataprovider.py | 78 ++++++++++++- freqtrade/rpc/api_server/ws_schemas.py | 2 +- freqtrade/rpc/external_message_consumer.py | 128 ++++++++++++++++----- freqtrade/rpc/rpc.py | 46 ++++++-- 4 files changed, 212 insertions(+), 42 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 6b220c8b4..d6eb217a8 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -9,7 +9,7 @@ from collections import deque from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple -from pandas import DataFrame +from pandas import DataFrame, concat, date_range from freqtrade.configuration import TimeRange from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe @@ -120,7 +120,7 @@ class DataProvider: 'type': RPCMessageType.ANALYZED_DF, 'data': { 'key': pair_key, - 'df': dataframe, + 'df': dataframe.tail(1), 'la': datetime.now(timezone.utc) } } @@ -157,6 +157,80 @@ class DataProvider: self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed) logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.") + def _add_external_candle( + self, + pair: str, + dataframe: DataFrame, + last_analyzed: datetime, + timeframe: str, + candle_type: CandleType, + producer_name: str = "default" + ) -> Tuple[bool, Optional[List[str]]]: + """ + Append a candle to the existing external dataframe + + :param pair: pair to get the data for + :param timeframe: Timeframe to get data for + :param candle_type: Any of the enum CandleType (must match trading mode!) 
+ :returns: A tuple with a boolean value signifying if the candle was correctly appended, + and a list of datetimes missing from the candle if it finds some. + Will return false if has no data for `producer_name`. + Will return false if no existing data for (pair, timeframe, candle_type). + Will return false if there's missing candles, and a list of datetimes of + the missing candles. + """ + pair_key = (pair, timeframe, candle_type) + + if producer_name not in self.__producer_pairs_df: + # We don't have data from this producer yet, + # so we can't append a candle + return (False, None) + + if pair_key not in self.__producer_pairs_df[producer_name]: + # We don't have data for this pair_key, + # so we can't append a candle + return (False, None) + + # CHECK FOR MISSING CANDLES + + existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] + appended_df = self._append_candle_to_dataframe(existing_df, dataframe) + + # Everything is good, we appended + self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed + return (True, None) + + def _append_candle_to_dataframe(self, existing: DataFrame, new: DataFrame) -> DataFrame: + """ + Append the `new` dataframe to the `existing` dataframe + + :param existing: The full dataframe you want appended to + :param new: The new dataframe containing the data you want appended + :returns: The dataframe with the new data in it + """ + if existing.iloc[-1]['date'] != new.iloc[-1]['date']: + existing = concat([existing, new]) + + # Only keep the last 1000 candles in memory + # TODO: Do this better + existing = existing[-1000:] if len(existing) > 1000 else existing + + return existing + + def _is_missing_candles(self, dataframe: DataFrame) -> bool: + """ + Check if the dataframe is missing any candles + + :param dataframe: The DataFrame to check + """ + logger.info(dataframe.index) + return len( + date_range( + dataframe.index.min(), + dataframe.index.max() + ).difference(dataframe.index) + ) > 0 + def 
get_producer_df( self, pair: str, diff --git a/freqtrade/rpc/api_server/ws_schemas.py b/freqtrade/rpc/api_server/ws_schemas.py index 877232213..292672b60 100644 --- a/freqtrade/rpc/api_server/ws_schemas.py +++ b/freqtrade/rpc/api_server/ws_schemas.py @@ -47,7 +47,7 @@ class WSWhitelistRequest(WSRequestSchema): class WSAnalyzedDFRequest(WSRequestSchema): type: RPCRequestType = RPCRequestType.ANALYZED_DF - data: Dict[str, Any] = {"limit": 1500} + data: Dict[str, Any] = {"limit": 1500, "pair": None} # ------------------------------ MESSAGE SCHEMAS ---------------------------- diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 6078efd07..24731ef4f 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -8,7 +8,7 @@ import asyncio import logging import socket from threading import Thread -from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict +from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union import websockets from pydantic import ValidationError @@ -16,7 +16,8 @@ from pydantic import ValidationError from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import RPCMessageType from freqtrade.misc import remove_entry_exit_signals -from freqtrade.rpc.api_server.ws import WebSocketChannel +from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel +from freqtrade.rpc.api_server.ws.message_stream import MessageStream from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest, WSMessageSchema, WSRequestSchema, WSSubscribeRequest, WSWhitelistMessage, @@ -38,6 +39,14 @@ class Producer(TypedDict): logger = logging.getLogger(__name__) +def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]): + return schema.dict(exclude_none=True) + + +# def parse_message(message: Dict[str, Any], message_schema: Type[WSMessageSchema]): +# return 
message_schema.parse_obj(message) + + class ExternalMessageConsumer: """ The main controller class for consuming external messages from @@ -92,6 +101,8 @@ class ExternalMessageConsumer: RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message, } + self._channel_streams: Dict[str, MessageStream] = {} + self.start() def start(self): @@ -118,6 +129,8 @@ class ExternalMessageConsumer: logger.info("Stopping ExternalMessageConsumer") self._running = False + self._channel_streams = {} + if self._sub_tasks: # Cancel sub tasks for task in self._sub_tasks: @@ -175,7 +188,6 @@ class ExternalMessageConsumer: :param producer: Dictionary containing producer info :param lock: An asyncio Lock """ - channel = None while self._running: try: host, port = producer['host'], producer['port'] @@ -190,19 +202,17 @@ class ExternalMessageConsumer: max_size=self.message_size_limit, ping_interval=None ) as ws: - channel = WebSocketChannel(ws, channel_id=name) + async with create_channel(ws, channel_id=name) as channel: - logger.info(f"Producer connection success - {channel}") + # Create the message stream for this channel + self._channel_streams[name] = MessageStream() - # Now request the initial data from this Producer - for request in self._initial_requests: - await channel.send( - request.dict(exclude_none=True) + # Run the channel tasks while connected + await channel.run_channel_tasks( + self._receive_messages(channel, producer, lock), + self._send_requests(channel, self._channel_streams[name]) ) - # Now receive data, if none is within the time limit, ping - await self._receive_messages(channel, producer, lock) - except (websockets.exceptions.InvalidURI, ValueError) as e: logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") break @@ -214,26 +224,33 @@ class ExternalMessageConsumer: websockets.exceptions.InvalidMessage ) as e: logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - continue except ( 
websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK ): # Just keep trying to connect again indefinitely - await asyncio.sleep(self.sleep_time) - continue + pass except Exception as e: # An unforseen error has occurred, log and continue logger.error("Unexpected error has occurred:") logger.exception(e) - continue finally: - if channel: - await channel.close() + await asyncio.sleep(self.sleep_time) + continue + + async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream): + # Send the initial requests + for init_request in self._initial_requests: + await channel.send(schema_to_dict(init_request)) + + # Now send any subsequent requests published to + # this channel's stream + async for request in channel_stream: + logger.info(f"Sending request to channel - {channel} - {request}") + await channel.send(request) async def _receive_messages( self, @@ -270,20 +287,39 @@ class ExternalMessageConsumer: latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000) logger.info(f"Connection to {channel} still alive, latency: {latency}ms") - continue + except (websockets.exceptions.ConnectionClosed): # Just eat the error and continue reconnecting logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - break + except Exception as e: + # Just eat the error and continue reconnecting logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) - await asyncio.sleep(self.sleep_time) + finally: + await asyncio.sleep(self.sleep_time) break + def send_producer_request( + self, + producer_name: str, + request: Union[WSRequestSchema, Dict[str, Any]] + ): + """ + Publish a message to the producer's message stream to be + sent by the channel task. 
+ + :param producer_name: The name of the producer to publish the message to + :param request: The request to send to the producer + """ + if isinstance(request, WSRequestSchema): + request = schema_to_dict(request) + + if channel_stream := self._channel_streams.get(producer_name): + channel_stream.publish(request) + def handle_producer_message(self, producer: Producer, message: Dict[str, Any]): """ Handles external messages from a Producer @@ -340,12 +376,44 @@ class ExternalMessageConsumer: if self._emc_config.get('remove_entry_exit_signals', False): df = remove_entry_exit_signals(df) - # Add the dataframe to the dataprovider - self._dp._add_external_df(pair, df, - last_analyzed=la, - timeframe=timeframe, - candle_type=candle_type, - producer_name=producer_name) + if len(df) >= 999: + # This is a full dataframe + # Add the dataframe to the dataprovider + self._dp._add_external_df( + pair, + df, + last_analyzed=la, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name + ) - logger.debug( + elif len(df) == 1: + # This is just a single candle + # Have dataprovider append it to + # the full datafame. 
If it can't, + # request the missing candles + if not self._dp._add_external_candle( + pair, + df, + last_analyzed=la, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name + ): + logger.info("Holes in data or no existing df, " + f"requesting data for {key} from `{producer_name}`") + + self.send_producer_request( + producer_name, + WSAnalyzedDFRequest( + data={ + "limit": 1000, + "pair": pair + } + ) + ) + return + + logger.info( f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`") diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 334e18dc7..8b23d33e7 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1058,23 +1058,46 @@ class RPC: return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'], pair, timeframe, _data, last_analyzed) - def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str, - limit: Optional[int]) -> Tuple[DataFrame, datetime]: - """ Get the dataframe and last analyze from the dataprovider """ + def __rpc_analysed_dataframe_raw( + self, + pair: str, + timeframe: str, + limit: Optional[Union[int, List[str]]] = None + ) -> Tuple[DataFrame, datetime]: + """ + Get the dataframe and last analyze from the dataprovider + + :param pair: The pair to get + :param timeframe: The timeframe of data to get + :param limit: If an integer, limits the size of dataframe + If a list of string date times, only returns those candles + """ _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe( pair, timeframe) _data = _data.copy() - if limit: + if limit and isinstance(limit, int): _data = _data.iloc[-limit:] + elif limit and isinstance(limit, str): + _data = _data.iloc[_data['date'].isin(limit)] + return _data, last_analyzed def _ws_all_analysed_dataframes( self, pairlist: List[str], - limit: Optional[int] + limit: Optional[Union[int, List[str]]] = None ) -> Generator[Dict[str, Any], None, None]: - """ Get the analysed dataframes of each pair in the 
pairlist """ + """ + Get the analysed dataframes of each pair in the pairlist. + Limit size of dataframe if specified. + If candles, only return the candles specified. + + :param pairlist: A list of pairs to get + :param limit: If an integer, limits the size of dataframe + If a list of string date times, only returns those candles + :returns: A generator of dictionaries with the key, dataframe, and last analyzed timestamp + """ timeframe = self._freqtrade.config['timeframe'] candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT) @@ -1087,10 +1110,15 @@ class RPC: "la": last_analyzed } - def _ws_request_analyzed_df(self, limit: Optional[int]): + def _ws_request_analyzed_df( + self, + pair: Optional[str], + limit: Optional[Union[int, List[str]]] = None, + ): """ Historical Analyzed Dataframes for WebSocket """ - whitelist = self._freqtrade.active_pair_whitelist - return self._ws_all_analysed_dataframes(whitelist, limit) + pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist + + return self._ws_all_analysed_dataframes(pairlist, limit) def _ws_request_whitelist(self): """ Whitelist data for WebSocket """ From 4cbb3341d7160e21a55b86738100a1f49bfc7a6b Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 25 Nov 2022 19:04:51 -0700 Subject: [PATCH 02/22] change how missing candles will be handled --- freqtrade/data/dataprovider.py | 35 +++++----------------- freqtrade/rpc/external_message_consumer.py | 4 +-- freqtrade/rpc/rpc.py | 13 ++++---- 3 files changed, 15 insertions(+), 37 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index d6eb217a8..07999fc90 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -7,9 +7,9 @@ Common Interface for bot and strategy to access data. 
import logging from collections import deque from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union -from pandas import DataFrame, concat, date_range +from pandas import DataFrame, concat from freqtrade.configuration import TimeRange from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe @@ -165,40 +165,36 @@ class DataProvider: timeframe: str, candle_type: CandleType, producer_name: str = "default" - ) -> Tuple[bool, Optional[List[str]]]: + ) -> Union[bool, int]: """ Append a candle to the existing external dataframe :param pair: pair to get the data for :param timeframe: Timeframe to get data for :param candle_type: Any of the enum CandleType (must match trading mode!) - :returns: A tuple with a boolean value signifying if the candle was correctly appended, - and a list of datetimes missing from the candle if it finds some. - Will return false if has no data for `producer_name`. - Will return false if no existing data for (pair, timeframe, candle_type). - Will return false if there's missing candles, and a list of datetimes of - the missing candles. + :returns: False if the candle could not be appended, or the int number of missing candles. 
""" pair_key = (pair, timeframe, candle_type) if producer_name not in self.__producer_pairs_df: # We don't have data from this producer yet, # so we can't append a candle - return (False, None) + return False if pair_key not in self.__producer_pairs_df[producer_name]: # We don't have data for this pair_key, # so we can't append a candle - return (False, None) + return False # CHECK FOR MISSING CANDLES + # return int existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] appended_df = self._append_candle_to_dataframe(existing_df, dataframe) # Everything is good, we appended self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed - return (True, None) + return True def _append_candle_to_dataframe(self, existing: DataFrame, new: DataFrame) -> DataFrame: """ @@ -212,25 +208,10 @@ class DataProvider: existing = concat([existing, new]) # Only keep the last 1000 candles in memory - # TODO: Do this better existing = existing[-1000:] if len(existing) > 1000 else existing return existing - def _is_missing_candles(self, dataframe: DataFrame) -> bool: - """ - Check if the dataframe is missing any candles - - :param dataframe: The DataFrame to check - """ - logger.info(dataframe.index) - return len( - date_range( - dataframe.index.min(), - dataframe.index.max() - ).difference(dataframe.index) - ) > 0 - def get_producer_df( self, pair: str, diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 24731ef4f..231642142 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -388,8 +388,8 @@ class ExternalMessageConsumer: producer_name=producer_name ) - elif len(df) == 1: - # This is just a single candle + elif len(df) < 999: + # This is n single candles # Have dataprovider append it to # the full datafame. 
If it can't, # request the missing candles diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 8b23d33e7..2452a61b8 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1062,31 +1062,28 @@ class RPC: self, pair: str, timeframe: str, - limit: Optional[Union[int, List[str]]] = None + limit: Optional[int] = None ) -> Tuple[DataFrame, datetime]: """ Get the dataframe and last analyze from the dataprovider :param pair: The pair to get :param timeframe: The timeframe of data to get - :param limit: If an integer, limits the size of dataframe - If a list of string date times, only returns those candles + :param limit: The amount of candles in the dataframe """ _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe( pair, timeframe) _data = _data.copy() - if limit and isinstance(limit, int): + if limit: _data = _data.iloc[-limit:] - elif limit and isinstance(limit, str): - _data = _data.iloc[_data['date'].isin(limit)] return _data, last_analyzed def _ws_all_analysed_dataframes( self, pairlist: List[str], - limit: Optional[Union[int, List[str]]] = None + limit: Optional[int] = None ) -> Generator[Dict[str, Any], None, None]: """ Get the analysed dataframes of each pair in the pairlist. 
@@ -1113,7 +1110,7 @@ class RPC: def _ws_request_analyzed_df( self, pair: Optional[str], - limit: Optional[Union[int, List[str]]] = None, + limit: Optional[int] = None, ): """ Historical Analyzed Dataframes for WebSocket """ pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist From 36a00e8de08b47900c5dbaea70c035e51f036571 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 27 Nov 2022 12:17:26 -0700 Subject: [PATCH 03/22] update add_external_candle returns --- freqtrade/data/dataprovider.py | 12 ++++++------ freqtrade/rpc/external_message_consumer.py | 8 +++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 07999fc90..19b5df652 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -7,7 +7,7 @@ Common Interface for bot and strategy to access data. import logging from collections import deque from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple from pandas import DataFrame, concat @@ -165,7 +165,7 @@ class DataProvider: timeframe: str, candle_type: CandleType, producer_name: str = "default" - ) -> Union[bool, int]: + ) -> Tuple[bool, int]: """ Append a candle to the existing external dataframe @@ -179,22 +179,22 @@ class DataProvider: if producer_name not in self.__producer_pairs_df: # We don't have data from this producer yet, # so we can't append a candle - return False + return (False, 0) if pair_key not in self.__producer_pairs_df[producer_name]: # We don't have data for this pair_key, # so we can't append a candle - return False + return (False, 0) # CHECK FOR MISSING CANDLES - # return int + # return (False, int > 0) existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] appended_df = self._append_candle_to_dataframe(existing_df, dataframe) # Everything is good, we appended 
self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed - return True + return (True, 0) def _append_candle_to_dataframe(self, existing: DataFrame, new: DataFrame) -> DataFrame: """ diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 231642142..17c4e1aa0 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -393,14 +393,16 @@ class ExternalMessageConsumer: # Have dataprovider append it to # the full datafame. If it can't, # request the missing candles - if not self._dp._add_external_candle( + did_append, n_missing = self._dp._add_external_candle( pair, df, last_analyzed=la, timeframe=timeframe, candle_type=candle_type, producer_name=producer_name - ): + ) + + if not did_append: logger.info("Holes in data or no existing df, " f"requesting data for {key} from `{producer_name}`") @@ -408,7 +410,7 @@ class ExternalMessageConsumer: producer_name, WSAnalyzedDFRequest( data={ - "limit": 1000, + "limit": n_missing if n_missing > 0 else 1000, "pair": pair } ) From fce1e9d6d0636c42d1ce19fdc6ebc8acce75e147 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 27 Nov 2022 12:18:41 -0700 Subject: [PATCH 04/22] update analyzed df request to allow specifying a single pair --- freqtrade/rpc/api_server/api_ws.py | 3 ++- freqtrade/rpc/rpc.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index e183cd7e7..18714f15f 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -91,9 +91,10 @@ async def _process_consumer_request( elif type == RPCRequestType.ANALYZED_DF: # Limit the amount of candles per dataframe to 'limit' or 1500 limit = min(data.get('limit', 1500), 1500) if data else None + pair = data.get('pair', None) if data else None # For every pair in the generator, send a separate message - for message in 
rpc._ws_request_analyzed_df(limit): + for message in rpc._ws_request_analyzed_df(limit, pair): # Format response response = WSAnalyzedDFMessage(data=message) await channel.send(response.dict(exclude_none=True)) diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 2452a61b8..4ebedd6c4 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1109,8 +1109,8 @@ class RPC: def _ws_request_analyzed_df( self, - pair: Optional[str], limit: Optional[int] = None, + pair: Optional[str] = None ): """ Historical Analyzed Dataframes for WebSocket """ pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist From d2c8487ecf01b90fab34dd55cc8d76bdd9bf5c2d Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 27 Nov 2022 13:11:43 -0700 Subject: [PATCH 05/22] update add_external_candle, fix breaking on ping error, handle empty dataframes --- freqtrade/data/dataprovider.py | 14 +++++++++----- freqtrade/rpc/external_message_consumer.py | 20 ++++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 19b5df652..42fe2f603 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -179,15 +179,19 @@ class DataProvider: if producer_name not in self.__producer_pairs_df: # We don't have data from this producer yet, # so we can't append a candle - return (False, 0) + return (False, 999) if pair_key not in self.__producer_pairs_df[producer_name]: # We don't have data for this pair_key, # so we can't append a candle - return (False, 0) + return (False, 999) # CHECK FOR MISSING CANDLES - # return (False, int > 0) + # Calculate difference between last candle in local dataframe + # and first candle in incoming dataframe. Take difference and divide + # by timeframe to find out how many candles we still need. If 1 + # then the incoming candle is the right candle. 
If more than 1, + # return (False, missing candles - 1) existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] appended_df = self._append_candle_to_dataframe(existing_df, dataframe) @@ -207,8 +211,8 @@ class DataProvider: if existing.iloc[-1]['date'] != new.iloc[-1]['date']: existing = concat([existing, new]) - # Only keep the last 1000 candles in memory - existing = existing[-1000:] if len(existing) > 1000 else existing + # Only keep the last 1500 candles in memory + existing = existing[-1500:] if len(existing) > 1000 else existing return existing diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 17c4e1aa0..13c2e5fb3 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -248,7 +248,7 @@ class ExternalMessageConsumer: # Now send any subsequent requests published to # this channel's stream - async for request in channel_stream: + async for request, _ in channel_stream: logger.info(f"Sending request to channel - {channel} - {request}") await channel.send(request) @@ -292,13 +292,13 @@ class ExternalMessageConsumer: except (websockets.exceptions.ConnectionClosed): # Just eat the error and continue reconnecting logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + break except Exception as e: # Just eat the error and continue reconnecting logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) - - finally: await asyncio.sleep(self.sleep_time) break @@ -372,10 +372,16 @@ class ExternalMessageConsumer: pair, timeframe, candle_type = key + if df.empty: + logger.info(f"Received Empty Dataframe for {key}") + return + # If set, remove the Entry and Exit signals from the Producer if self._emc_config.get('remove_entry_exit_signals', False): df = remove_entry_exit_signals(df) + logger.info(f"Received {len(df)} candle(s) for {key}") + if 
len(df) >= 999: # This is a full dataframe # Add the dataframe to the dataprovider @@ -404,13 +410,14 @@ class ExternalMessageConsumer: if not did_append: logger.info("Holes in data or no existing df, " - f"requesting data for {key} from `{producer_name}`") + f"requesting {n_missing} candles " + f"for {key} from `{producer_name}`") self.send_producer_request( producer_name, WSAnalyzedDFRequest( data={ - "limit": n_missing if n_missing > 0 else 1000, + "limit": n_missing, "pair": pair } ) @@ -418,4 +425,5 @@ class ExternalMessageConsumer: return logger.info( - f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`") + f"Consumed message from `{producer_name}` " + f"of type `RPCMessageType.ANALYZED_DF` for {key}") From 89338fa677185b70528d2f74609ced74f84f7274 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 27 Nov 2022 13:14:49 -0700 Subject: [PATCH 06/22] allow specifying channel send throttle --- freqtrade/rpc/api_server/ws/channel.py | 7 +++++-- freqtrade/rpc/external_message_consumer.py | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index c50aff8be..3c0a833d8 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -27,7 +27,8 @@ class WebSocketChannel: self, websocket: WebSocketType, channel_id: Optional[str] = None, - serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer + serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer, + send_throttle: float = 0.01 ): self.channel_id = channel_id if channel_id else uuid4().hex[:8] self._websocket = WebSocketProxy(websocket) @@ -41,6 +42,7 @@ class WebSocketChannel: self._send_times: Deque[float] = deque([], maxlen=10) # High limit defaults to 3 to start self._send_high_limit = 3 + self._send_throttle = send_throttle # The subscribed message types self._subscriptions: List[str] = [] @@ -106,7 +108,8 @@ 
class WebSocketChannel: # Explicitly give control back to event loop as # websockets.send does not - await asyncio.sleep(0.01) + # Also throttles how fast we send + await asyncio.sleep(self._send_throttle) async def recv(self): """ diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 13c2e5fb3..aed5d9fb9 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -202,7 +202,11 @@ class ExternalMessageConsumer: max_size=self.message_size_limit, ping_interval=None ) as ws: - async with create_channel(ws, channel_id=name) as channel: + async with create_channel( + ws, + channel_id=name, + send_throttle=0.5 + ) as channel: # Create the message stream for this channel self._channel_streams[name] = MessageStream() From c050eb8b8b372c280b43ea0c2eecbe683ef083d9 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Mon, 28 Nov 2022 11:02:03 -0700 Subject: [PATCH 07/22] add candle difference calculation to dataprovider --- freqtrade/data/dataprovider.py | 40 +++++++++++++++++++--------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 42fe2f603..e34a428eb 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -9,7 +9,7 @@ from collections import deque from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple -from pandas import DataFrame, concat +from pandas import DataFrame, concat, to_timedelta from freqtrade.configuration import TimeRange from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe @@ -176,24 +176,30 @@ class DataProvider: """ pair_key = (pair, timeframe, candle_type) - if producer_name not in self.__producer_pairs_df: + if (producer_name not in self.__producer_pairs_df) \ + or (pair_key not in self.__producer_pairs_df[producer_name]): # We don't have data from this producer yet, - # 
so we can't append a candle - return (False, 999) - - if pair_key not in self.__producer_pairs_df[producer_name]: - # We don't have data for this pair_key, - # so we can't append a candle - return (False, 999) - - # CHECK FOR MISSING CANDLES - # Calculate difference between last candle in local dataframe - # and first candle in incoming dataframe. Take difference and divide - # by timeframe to find out how many candles we still need. If 1 - # then the incoming candle is the right candle. If more than 1, - # return (False, missing candles - 1) + # sor we don't have data for this pair_key + # return False and 1000 for the full df + return (False, 1000) existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] + + # CHECK FOR MISSING CANDLES + timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas + local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data + incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data + + candle_difference = (incoming_first - local_last) / timeframe_delta + + # If the difference divided by the timeframe is 1, then this + # is the candle we want and the incoming data isn't missing any. + # If the candle_difference is more than 1, that means + # we missed some candles between our data and the incoming + # so return False and candle_difference. 
+ if candle_difference > 1: + return (False, candle_difference) + appended_df = self._append_candle_to_dataframe(existing_df, dataframe) # Everything is good, we appended @@ -212,7 +218,7 @@ class DataProvider: existing = concat([existing, new]) # Only keep the last 1500 candles in memory - existing = existing[-1500:] if len(existing) > 1000 else existing + existing = existing[-1500:] if len(existing) > 1500 else existing return existing From ccd1aa70a2f5b1ecfcc202e20250b2d79a11a6cc Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 29 Nov 2022 11:21:36 -0700 Subject: [PATCH 08/22] change log calls to debug, handle already received candle --- freqtrade/data/dataprovider.py | 9 ++++++++- freqtrade/rpc/external_message_consumer.py | 14 +++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index e34a428eb..657d96df1 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -179,7 +179,7 @@ class DataProvider: if (producer_name not in self.__producer_pairs_df) \ or (pair_key not in self.__producer_pairs_df[producer_name]): # We don't have data from this producer yet, - # sor we don't have data for this pair_key + # or we don't have data for this pair_key # return False and 1000 for the full df return (False, 1000) @@ -190,6 +190,13 @@ class DataProvider: local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data + # We have received this candle before, update our copy + # and return True, 0 + if local_last == incoming_first: + existing_df.iloc[-1] = dataframe.iloc[0] + existing_df = existing_df.reset_index(drop=True) + return (True, 0) + candle_difference = (incoming_first - local_last) / timeframe_delta # If the difference divided by the timeframe is 1, then this diff --git a/freqtrade/rpc/external_message_consumer.py 
b/freqtrade/rpc/external_message_consumer.py index aed5d9fb9..d028bc006 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -253,7 +253,7 @@ class ExternalMessageConsumer: # Now send any subsequent requests published to # this channel's stream async for request, _ in channel_stream: - logger.info(f"Sending request to channel - {channel} - {request}") + logger.debug(f"Sending request to channel - {channel} - {request}") await channel.send(request) async def _receive_messages( @@ -377,14 +377,14 @@ class ExternalMessageConsumer: pair, timeframe, candle_type = key if df.empty: - logger.info(f"Received Empty Dataframe for {key}") + logger.debug(f"Received Empty Dataframe for {key}") return # If set, remove the Entry and Exit signals from the Producer if self._emc_config.get('remove_entry_exit_signals', False): df = remove_entry_exit_signals(df) - logger.info(f"Received {len(df)} candle(s) for {key}") + logger.debug(f"Received {len(df)} candle(s) for {key}") if len(df) >= 999: # This is a full dataframe @@ -413,9 +413,9 @@ class ExternalMessageConsumer: ) if not did_append: - logger.info("Holes in data or no existing df, " - f"requesting {n_missing} candles " - f"for {key} from `{producer_name}`") + logger.debug("Holes in data or no existing df, " + f"requesting {n_missing} candles " + f"for {key} from `{producer_name}`") self.send_producer_request( producer_name, @@ -428,6 +428,6 @@ class ExternalMessageConsumer: ) return - logger.info( + logger.debug( f"Consumed message from `{producer_name}` " f"of type `RPCMessageType.ANALYZED_DF` for {key}") From d376bf4052f56dcceedf2d30121a1419a7369702 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 29 Nov 2022 12:22:06 -0700 Subject: [PATCH 09/22] fix indefinite reconnecting --- freqtrade/rpc/external_message_consumer.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py 
b/freqtrade/rpc/external_message_consumer.py index d028bc006..05effb783 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -293,18 +293,11 @@ class ExternalMessageConsumer: logger.info(f"Connection to {channel} still alive, latency: {latency}ms") continue - except (websockets.exceptions.ConnectionClosed): - # Just eat the error and continue reconnecting - logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") - await asyncio.sleep(self.sleep_time) - break - except Exception as e: # Just eat the error and continue reconnecting logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) - await asyncio.sleep(self.sleep_time) - break + raise def send_producer_request( self, From 0d5b2eed942922bffae0676d7870f2487f18ccec Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 2 Dec 2022 12:07:48 -0700 Subject: [PATCH 10/22] fix same candle handling --- freqtrade/data/dataprovider.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 657d96df1..78d73b07d 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -194,7 +194,9 @@ class DataProvider: # and return True, 0 if local_last == incoming_first: existing_df.iloc[-1] = dataframe.iloc[0] - existing_df = existing_df.reset_index(drop=True) + existing_data = (existing_df.reset_index(drop=True), _) + + self.__producer_pairs_df[producer_name][pair_key] = existing_data return (True, 0) candle_difference = (incoming_first - local_last) / timeframe_delta From 49f6f40662d46bdfc2ca5006c96577df3db593b1 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 2 Dec 2022 12:08:42 -0700 Subject: [PATCH 11/22] remove comment --- freqtrade/rpc/external_message_consumer.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py 
b/freqtrade/rpc/external_message_consumer.py index 05effb783..15312ba10 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -43,10 +43,6 @@ def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]): return schema.dict(exclude_none=True) -# def parse_message(message: Dict[str, Any], message_schema: Type[WSMessageSchema]): -# return message_schema.parse_obj(message) - - class ExternalMessageConsumer: """ The main controller class for consuming external messages from From f1ebaf4730606498d928f3f02ab5fcddfe87310d Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 2 Dec 2022 12:28:27 -0700 Subject: [PATCH 12/22] fix tests --- freqtrade/rpc/external_message_consumer.py | 7 ++++--- tests/rpc/test_rpc_emc.py | 14 ++++++++------ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 15312ba10..743698b24 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -224,20 +224,21 @@ class ExternalMessageConsumer: websockets.exceptions.InvalidMessage ) as e: logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + continue except ( websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK ): # Just keep trying to connect again indefinitely - pass + await asyncio.sleep(self.sleep_time) + continue except Exception as e: # An unforseen error has occurred, log and continue logger.error("Unexpected error has occurred:") logger.exception(e) - - finally: await asyncio.sleep(self.sleep_time) continue diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 93ae829d5..155239e94 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -94,7 +94,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): assert log_has( f"Consumed 
message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog) - # Test handle analyzed_df message + # Test handle analyzed_df single candle message df_message = { "type": "analyzed_df", "data": { @@ -106,8 +106,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): patched_emc.handle_producer_message(test_producer, df_message) assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) - assert log_has( - f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`", caplog) + assert log_has_re(r"Holes in data or no existing df,.+", caplog) # Test unhandled message unhandled_message = {"type": "status", "data": "RUNNING"} @@ -183,7 +182,7 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker): async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT): await emc._create_connection(test_producer, lock) - assert log_has_re(r"Producer connection success.+", caplog) + assert log_has_re(r"Connected to channel.+", caplog) finally: emc.shutdown() @@ -212,7 +211,8 @@ async def test_emc_create_connection_invalid_url(default_conf, caplog, mocker, h dp = DataProvider(default_conf, None, None, None) # Handle start explicitly to avoid messing with threading in tests - mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start",) + mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start") + mocker.patch("freqtrade.rpc.api_server.ws.channel.create_channel") emc = ExternalMessageConsumer(default_conf, dp) try: @@ -390,7 +390,9 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): try: change_running(emc) loop.call_soon(functools.partial(change_running, emc=emc)) - await emc._receive_messages(TestChannel(), test_producer, lock) + + with pytest.raises(asyncio.TimeoutError): + await emc._receive_messages(TestChannel(), test_producer, lock) assert log_has_re(r"Ping error.+", caplog) finally: From 
0602479f7d328094401ebe454fd4d33962b09a19 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Mon, 5 Dec 2022 13:11:07 -0700 Subject: [PATCH 13/22] minor changes, update candle appending to support overlaps --- freqtrade/data/dataprovider.py | 31 +++++++++++++--------- freqtrade/rpc/external_message_consumer.py | 22 ++++++++++----- freqtrade/rpc/rpc.py | 4 +-- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 78d73b07d..b889da17f 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -167,7 +167,8 @@ class DataProvider: producer_name: str = "default" ) -> Tuple[bool, int]: """ - Append a candle to the existing external dataframe + Append a candle to the existing external dataframe. The incoming dataframe + must have at least 1 candle. :param pair: pair to get the data for :param timeframe: Timeframe to get data for @@ -176,29 +177,32 @@ class DataProvider: """ pair_key = (pair, timeframe, candle_type) - if (producer_name not in self.__producer_pairs_df) \ - or (pair_key not in self.__producer_pairs_df[producer_name]): + if dataframe.empty: + # The incoming dataframe must have at least 1 candle + return (False, 0) + + if (producer_name not in self.__producer_pairs_df + or pair_key not in self.__producer_pairs_df[producer_name]): # We don't have data from this producer yet, # or we don't have data for this pair_key # return False and 1000 for the full df return (False, 1000) - existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] + existing_df, la = self.__producer_pairs_df[producer_name][pair_key] + + # Iterate over any overlapping candles and update the values + for idx, candle in dataframe.iterrows(): + existing_df.iloc[ + existing_df['date'] == candle['date'] + ] = candle + + existing_df.reset_index(drop=True, inplace=True) # CHECK FOR MISSING CANDLES timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas 
local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data - # We have received this candle before, update our copy - # and return True, 0 - if local_last == incoming_first: - existing_df.iloc[-1] = dataframe.iloc[0] - existing_data = (existing_df.reset_index(drop=True), _) - - self.__producer_pairs_df[producer_name][pair_key] = existing_data - return (True, 0) - candle_difference = (incoming_first - local_last) / timeframe_delta # If the difference divided by the timeframe is 1, then this @@ -228,6 +232,7 @@ class DataProvider: # Only keep the last 1500 candles in memory existing = existing[-1500:] if len(existing) > 1500 else existing + existing.reset_index(drop=True, inplace=True) return existing diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 743698b24..278f04a8e 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -36,6 +36,9 @@ class Producer(TypedDict): ws_token: str +FULL_DATAFRAME_THRESHOLD = 100 + + logger = logging.getLogger(__name__) @@ -376,8 +379,8 @@ class ExternalMessageConsumer: logger.debug(f"Received {len(df)} candle(s) for {key}") - if len(df) >= 999: - # This is a full dataframe + if len(df) >= FULL_DATAFRAME_THRESHOLD: + # This is likely a full dataframe # Add the dataframe to the dataprovider self._dp._add_external_df( pair, @@ -388,8 +391,8 @@ class ExternalMessageConsumer: producer_name=producer_name ) - elif len(df) < 999: - # This is n single candles + elif len(df) < FULL_DATAFRAME_THRESHOLD: + # This is likely n single candles # Have dataprovider append it to # the full datafame. 
If it can't, # request the missing candles @@ -403,9 +406,14 @@ class ExternalMessageConsumer: ) if not did_append: - logger.debug("Holes in data or no existing df, " - f"requesting {n_missing} candles " - f"for {key} from `{producer_name}`") + # We want an overlap in candles incase some data has changed + n_missing += 1 + # Set to None for all candles if we missed a full df's worth of candles + n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500 + + logger.warning("Holes in data or no existing df, " + f"requesting {n_missing} candles " + f"for {key} from `{producer_name}`") self.send_producer_request( producer_name, diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 4ebedd6c4..331569de3 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1062,7 +1062,7 @@ class RPC: self, pair: str, timeframe: str, - limit: Optional[int] = None + limit: Optional[int] ) -> Tuple[DataFrame, datetime]: """ Get the dataframe and last analyze from the dataprovider @@ -1083,7 +1083,7 @@ class RPC: def _ws_all_analysed_dataframes( self, pairlist: List[str], - limit: Optional[int] = None + limit: Optional[int] ) -> Generator[Dict[str, Any], None, None]: """ Get the analysed dataframes of each pair in the pairlist. 
From 6717dff19bb75015ff8ad8624fa0a82d3a961952 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 6 Dec 2022 16:00:28 -0700 Subject: [PATCH 14/22] update overlapping candle handling, move append to misc --- freqtrade/data/dataprovider.py | 48 ++++++++++++++++------------------ freqtrade/misc.py | 18 +++++++++++++ 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index b889da17f..8d81221b6 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -17,6 +17,7 @@ from freqtrade.data.history import load_pair_history from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exchange import Exchange, timeframe_to_seconds +from freqtrade.misc import append_candles_to_dataframe from freqtrade.rpc import RPCManager from freqtrade.util import PeriodicCache @@ -190,18 +191,30 @@ class DataProvider: existing_df, la = self.__producer_pairs_df[producer_name][pair_key] - # Iterate over any overlapping candles and update the values - for idx, candle in dataframe.iterrows(): - existing_df.iloc[ - existing_df['date'] == candle['date'] - ] = candle + # Handle overlapping candles + old_candles = existing_df[ + ~existing_df['date'].isin( + dataframe['date'] + ) + ] + overlapping_candles = existing_df[ + existing_df['date'].isin( + dataframe['date'] + ) + ] + new_candles = dataframe[ + ~dataframe['date'].isin( + existing_df['date'] + ) + ] - existing_df.reset_index(drop=True, inplace=True) + if overlapping_candles: + existing_df = concat([old_candles, overlapping_candles], axis=0) # CHECK FOR MISSING CANDLES timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas - local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data - incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data + local_last = 
existing_df.iloc[-1]['date'] # We want the last date from our copy + incoming_first = new_candles.iloc[0]['date'] # We want the first date from the incoming candle_difference = (incoming_first - local_last) / timeframe_delta @@ -213,29 +226,12 @@ class DataProvider: if candle_difference > 1: return (False, candle_difference) - appended_df = self._append_candle_to_dataframe(existing_df, dataframe) + appended_df = append_candles_to_dataframe(existing_df, dataframe) # Everything is good, we appended self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed return (True, 0) - def _append_candle_to_dataframe(self, existing: DataFrame, new: DataFrame) -> DataFrame: - """ - Append the `new` dataframe to the `existing` dataframe - - :param existing: The full dataframe you want appended to - :param new: The new dataframe containing the data you want appended - :returns: The dataframe with the new data in it - """ - if existing.iloc[-1]['date'] != new.iloc[-1]['date']: - existing = concat([existing, new]) - - # Only keep the last 1500 candles in memory - existing = existing[-1500:] if len(existing) > 1500 else existing - existing.reset_index(drop=True, inplace=True) - - return existing - def get_producer_df( self, pair: str, diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 2d2c7513a..93e8da6dd 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -301,3 +301,21 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame): dataframe[SignalTagType.EXIT_TAG.value] = None return dataframe + + +def append_candles_to_dataframe(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: + """ + Append the `right` dataframe to the `left` dataframe + + :param left: The full dataframe you want appended to + :param right: The new dataframe containing the data you want appended + :returns: The dataframe with the right data in it + """ + if left.iloc[-1]['date'] != right.iloc[-1]['date']: + left = pd.concat([left, right]) + + # Only keep the last 1500 candles 
in memory + left = left[-1500:] if len(left) > 1500 else left + left.reset_index(drop=True, inplace=True) + + return left From 414c0ce050e520855a6440176b89e4c76797a6e1 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 6 Dec 2022 16:02:28 -0700 Subject: [PATCH 15/22] change unused var --- freqtrade/data/dataprovider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 8d81221b6..3a6f74b97 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -189,7 +189,7 @@ class DataProvider: # return False and 1000 for the full df return (False, 1000) - existing_df, la = self.__producer_pairs_df[producer_name][pair_key] + existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] # Handle overlapping candles old_candles = existing_df[ From 96edd31458e20237d65f98642c198b1cb13f8c4b Mon Sep 17 00:00:00 2001 From: Matthias Date: Sat, 10 Dec 2022 20:03:42 +0100 Subject: [PATCH 16/22] Test add_external_candle --- tests/data/test_dataprovider.py | 68 ++++++++++++++++++++++++++++++++- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/tests/data/test_dataprovider.py b/tests/data/test_dataprovider.py index 025e6d08a..862abfa0b 100644 --- a/tests/data/test_dataprovider.py +++ b/tests/data/test_dataprovider.py @@ -2,13 +2,13 @@ from datetime import datetime, timezone from unittest.mock import MagicMock import pytest -from pandas import DataFrame +from pandas import DataFrame, Timestamp from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import CandleType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.plugins.pairlistmanager import PairListManager -from tests.conftest import get_patched_exchange +from tests.conftest import generate_test_data, get_patched_exchange @pytest.mark.parametrize('candle_type', [ @@ -412,3 +412,67 @@ def test_dp_send_msg(default_conf): dp = 
DataProvider(default_conf, None) dp.send_msg(msg, always_send=True) assert msg not in dp._msg_queue + + +def test_dp__add_external_candle(default_conf_usdt): + timeframe = '1h' + default_conf_usdt["timeframe"] = timeframe + dp = DataProvider(default_conf_usdt, None) + df = generate_test_data(timeframe, 24, '2022-01-01 00:00:00+00:00') + last_analyzed = datetime.now(timezone.utc) + + res = dp._add_external_candle('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + # Why 1000 ?? + assert res[1] == 1000 + + dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + # BTC is not stored yet + res = dp._add_external_candle('BTC/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df) == 24 + + # Add the same dataframe again - dataframe size shall not change. + res = dp._add_external_candle('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is True + assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df) == 24 + + # Add a new day. + df2 = generate_test_data(timeframe, 24, '2022-01-02 00:00:00+00:00') + + res = dp._add_external_candle('ETH/USDT', df2, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is True + assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df) == 48 + + # Add a dataframe with a 12 hour offset - so 12 candles are overlapping, and 12 valid. + df3 = generate_test_data(timeframe, 24, '2022-01-02 12:00:00+00:00') + + res = dp._add_external_candle('ETH/USDT', df3, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is True + assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + # New length = 48 + 12 (since we have a 12 hour offset). 
+ assert len(df) == 60 + assert df.iloc[-1]['date'] == df3.iloc[-1]['date'] + assert df.iloc[-1]['date'] == Timestamp('2022-01-03 11:00:00+00:00') + + # Generate 1 new candle + df4 = generate_test_data(timeframe, 1, '2022-01-03 12:00:00+00:00') + res = dp._add_external_candle('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + # assert res[0] is True + # assert res[1] == 0 + df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + # New length = 61 + 1 + assert len(df) == 61 + + # Gap in the data ... + df4 = generate_test_data(timeframe, 1, '2022-01-05 00:00:00+00:00') + res = dp._add_external_candle('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + assert res[0] is False + # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00 + assert res[1] == 36 From a693495a6d599fd7bdbec75337db3c44dc39c5b7 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 11 Dec 2022 08:42:13 +0100 Subject: [PATCH 17/22] Improve external_candle aggregation --- freqtrade/data/dataprovider.py | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 3a6f74b97..10569e7c7 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -9,7 +9,7 @@ from collections import deque from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Tuple -from pandas import DataFrame, concat, to_timedelta +from pandas import DataFrame, to_timedelta from freqtrade.configuration import TimeRange from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe @@ -191,30 +191,13 @@ class DataProvider: existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] - # Handle overlapping candles - old_candles = existing_df[ - ~existing_df['date'].isin( - dataframe['date'] - ) - ] - overlapping_candles = existing_df[ - existing_df['date'].isin( - dataframe['date'] - ) - ] - new_candles 
= dataframe[ - ~dataframe['date'].isin( - existing_df['date'] - ) - ] - - if overlapping_candles: - existing_df = concat([old_candles, overlapping_candles], axis=0) - # CHECK FOR MISSING CANDLES timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy - incoming_first = new_candles.iloc[0]['date'] # We want the first date from the incoming + incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming + + # Remove existing candles that are newer than the incoming first candle + existing_df1 = existing_df[existing_df['date'] < incoming_first] candle_difference = (incoming_first - local_last) / timeframe_delta @@ -225,8 +208,10 @@ class DataProvider: # so return False and candle_difference. if candle_difference > 1: return (False, candle_difference) - - appended_df = append_candles_to_dataframe(existing_df, dataframe) + if existing_df1.empty: + appended_df = dataframe + else: + appended_df = append_candles_to_dataframe(existing_df1, dataframe) # Everything is good, we appended self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed From 1c0c4fd4206bcafc59ad70a5bb5890cf657a928d Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 11 Dec 2022 08:49:35 +0100 Subject: [PATCH 18/22] Improve test --- tests/data/test_dataprovider.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/data/test_dataprovider.py b/tests/data/test_dataprovider.py index 862abfa0b..cce483c07 100644 --- a/tests/data/test_dataprovider.py +++ b/tests/data/test_dataprovider.py @@ -469,6 +469,8 @@ def test_dp__add_external_candle(default_conf_usdt): df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) # New length = 61 + 1 assert len(df) == 61 + assert df.iloc[-2]['date'] == Timestamp('2022-01-03 11:00:00+00:00') + assert df.iloc[-1]['date'] == Timestamp('2022-01-03 12:00:00+00:00') # Gap in the data ... 
df4 = generate_test_data(timeframe, 1, '2022-01-05 00:00:00+00:00')
@@ -476,3 +478,13 @@ def test_dp__add_external_candle(default_conf_usdt):
     assert res[0] is False
     # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00
     assert res[1] == 36
+    df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    # Length unchanged (still 61) - the candle after the gap was not appended
+    assert len(df) == 61
+
+    # Empty dataframe
+    df4 = generate_test_data(timeframe, 0, '2022-01-05 00:00:00+00:00')
+    res = dp._add_external_candle('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is False
+    # Empty dataframes are rejected immediately with (False, 0) - nothing requested
+    assert res[1] == 0

From 0dd3836cc7a6c3ac8b5863b8267db889c7666d14 Mon Sep 17 00:00:00 2001
From: Timothy Pogue
Date: Mon, 12 Dec 2022 22:46:19 -0700
Subject: [PATCH 19/22] fix rpc method docstring

---
 freqtrade/rpc/rpc.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py
index 331569de3..ceb791b46 100644
--- a/freqtrade/rpc/rpc.py
+++ b/freqtrade/rpc/rpc.py
@@ -1087,8 +1087,8 @@ class RPC:
     ) -> Generator[Dict[str, Any], None, None]:
         """
         Get the analysed dataframes of each pair in the pairlist.
-        Limit size of dataframe if specified.
-        If candles, only return the candles specified.
+        If specified, only return the most recent `limit` candles for
+        each dataframe.
:param pairlist: A list of pairs to get :param limit: If an integer, limits the size of dataframe From 97fee37072dd28a8981131523711cfc7cbb9a3b6 Mon Sep 17 00:00:00 2001 From: Matthias Date: Wed, 14 Dec 2022 07:22:41 +0100 Subject: [PATCH 20/22] Improve emc test --- freqtrade/rpc/external_message_consumer.py | 3 +-- tests/rpc/test_rpc_emc.py | 25 +++++++++++++++++++--- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 278f04a8e..67b323fb2 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -411,8 +411,7 @@ class ExternalMessageConsumer: # Set to None for all candles if we missed a full df's worth of candles n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500 - logger.warning("Holes in data or no existing df, " - f"requesting {n_missing} candles " + logger.warning("Holes in data or no existing df, requesting {n_missing} candles " f"for {key} from `{producer_name}`") self.send_producer_request( diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 155239e94..e1537ec9e 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -83,6 +83,7 @@ def test_emc_init(patched_emc): def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"} producer_name = test_producer['name'] + invalid_msg = r"Invalid message .+" caplog.set_level(logging.DEBUG) @@ -119,7 +120,8 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}} patched_emc.handle_producer_message(test_producer, malformed_message) - assert log_has_re(r"Invalid message .+", caplog) + assert log_has_re(invalid_msg, caplog) + caplog.clear() malformed_message = { "type": "analyzed_df", @@ -132,13 +134,30 @@ def 
test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): patched_emc.handle_producer_message(test_producer, malformed_message) assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) - assert log_has_re(r"Invalid message .+", caplog) + assert log_has_re(invalid_msg, caplog) + caplog.clear() + + # Empty dataframe + malformed_message = { + "type": "analyzed_df", + "data": { + "key": ("BTC/USDT", "5m", "spot"), + "df": ohlcv_history.loc[ohlcv_history['open'] < 0], + "la": datetime.now(timezone.utc) + } + } + patched_emc.handle_producer_message(test_producer, malformed_message) + + assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) + assert not log_has_re(invalid_msg, caplog) + assert log_has_re(r"Received Empty Dataframe for.+", caplog) caplog.clear() malformed_message = {"some": "stuff"} patched_emc.handle_producer_message(test_producer, malformed_message) - assert log_has_re(r"Invalid message .+", caplog) + assert log_has_re(invalid_msg, caplog) + caplog.clear() caplog.clear() malformed_message = {"type": "whitelist", "data": None} From fa260e6560591d848189197362e69806396eb1bb Mon Sep 17 00:00:00 2001 From: Matthias Date: Wed, 14 Dec 2022 19:56:54 +0100 Subject: [PATCH 21/22] Move "replace or append" decision to dataprovider --- freqtrade/constants.py | 1 + freqtrade/data/dataprovider.py | 29 ++++++++-- freqtrade/rpc/external_message_consumer.py | 67 ++++++++-------------- 3 files changed, 50 insertions(+), 47 deletions(-) diff --git a/freqtrade/constants.py b/freqtrade/constants.py index ca1be1d6a..ff6cc7c67 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -61,6 +61,7 @@ USERPATH_FREQAIMODELS = 'freqaimodels' TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] +FULL_DATAFRAME_THRESHOLD = 100 ENV_VAR_PREFIX = 'FREQTRADE__' diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 
10569e7c7..b46f4e881 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -12,7 +12,8 @@ from typing import Any, Dict, List, Optional, Tuple from pandas import DataFrame, to_timedelta from freqtrade.configuration import TimeRange -from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe +from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes, + PairWithTimeframe) from freqtrade.data.history import load_pair_history from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException @@ -132,7 +133,7 @@ class DataProvider: 'data': pair_key, }) - def _add_external_df( + def _replace_external_df( self, pair: str, dataframe: DataFrame, @@ -158,7 +159,7 @@ class DataProvider: self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed) logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.") - def _add_external_candle( + def _add_external_df( self, pair: str, dataframe: DataFrame, @@ -182,6 +183,19 @@ class DataProvider: # The incoming dataframe must have at least 1 candle return (False, 0) + if len(dataframe) >= FULL_DATAFRAME_THRESHOLD: + # This is likely a full dataframe + # Add the dataframe to the dataprovider + self._add_external_df( + pair, + dataframe, + last_analyzed=last_analyzed, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name + ) + return (True, 0) + if (producer_name not in self.__producer_pairs_df or pair_key not in self.__producer_pairs_df[producer_name]): # We don't have data from this producer yet, @@ -214,7 +228,14 @@ class DataProvider: appended_df = append_candles_to_dataframe(existing_df1, dataframe) # Everything is good, we appended - self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed + self._add_external_df( + pair, + appended_df, + last_analyzed=last_analyzed, + timeframe=timeframe, + 
candle_type=candle_type, + producer_name=producer_name + ) return (True, 0) def get_producer_df( diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 67b323fb2..e888191ea 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union import websockets from pydantic import ValidationError +from freqtrade.constants import FULL_DATAFRAME_THRESHOLD from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import RPCMessageType from freqtrade.misc import remove_entry_exit_signals @@ -36,9 +37,6 @@ class Producer(TypedDict): ws_token: str -FULL_DATAFRAME_THRESHOLD = 100 - - logger = logging.getLogger(__name__) @@ -379,51 +377,34 @@ class ExternalMessageConsumer: logger.debug(f"Received {len(df)} candle(s) for {key}") - if len(df) >= FULL_DATAFRAME_THRESHOLD: - # This is likely a full dataframe - # Add the dataframe to the dataprovider - self._dp._add_external_df( - pair, - df, - last_analyzed=la, - timeframe=timeframe, - candle_type=candle_type, - producer_name=producer_name + did_append, n_missing = self._dp._add_external_df( + pair, + df, + last_analyzed=la, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name ) - elif len(df) < FULL_DATAFRAME_THRESHOLD: - # This is likely n single candles - # Have dataprovider append it to - # the full datafame. 
If it can't, - # request the missing candles - did_append, n_missing = self._dp._add_external_candle( - pair, - df, - last_analyzed=la, - timeframe=timeframe, - candle_type=candle_type, - producer_name=producer_name - ) + if not did_append: + # We want an overlap in candles incase some data has changed + n_missing += 1 + # Set to None for all candles if we missed a full df's worth of candles + n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500 - if not did_append: - # We want an overlap in candles incase some data has changed - n_missing += 1 - # Set to None for all candles if we missed a full df's worth of candles - n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500 + logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles " + f"for {key} from `{producer_name}`") - logger.warning("Holes in data or no existing df, requesting {n_missing} candles " - f"for {key} from `{producer_name}`") - - self.send_producer_request( - producer_name, - WSAnalyzedDFRequest( - data={ - "limit": n_missing, - "pair": pair - } - ) + self.send_producer_request( + producer_name, + WSAnalyzedDFRequest( + data={ + "limit": n_missing, + "pair": pair + } ) - return + ) + return logger.debug( f"Consumed message from `{producer_name}` " From ca2a878b86b32d5c81abd4276c7de7c907f25a69 Mon Sep 17 00:00:00 2001 From: Matthias Date: Wed, 14 Dec 2022 19:58:45 +0100 Subject: [PATCH 22/22] Update test naming --- freqtrade/data/dataprovider.py | 4 ++-- tests/data/test_dataprovider.py | 29 +++++++++++++++-------------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index b46f4e881..df4a4c898 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -186,7 +186,7 @@ class DataProvider: if len(dataframe) >= FULL_DATAFRAME_THRESHOLD: # This is likely a full dataframe # Add the dataframe to the dataprovider - self._add_external_df( + 
self._replace_external_df( pair, dataframe, last_analyzed=last_analyzed, @@ -228,7 +228,7 @@ class DataProvider: appended_df = append_candles_to_dataframe(existing_df1, dataframe) # Everything is good, we appended - self._add_external_df( + self._replace_external_df( pair, appended_df, last_analyzed=last_analyzed, diff --git a/tests/data/test_dataprovider.py b/tests/data/test_dataprovider.py index cce483c07..7d61a22be 100644 --- a/tests/data/test_dataprovider.py +++ b/tests/data/test_dataprovider.py @@ -161,9 +161,9 @@ def test_producer_pairs(mocker, default_conf, ohlcv_history): assert dataprovider.get_producer_pairs("bad") == [] -def test_get_producer_df(mocker, default_conf, ohlcv_history): +def test_get_producer_df(mocker, default_conf): dataprovider = DataProvider(default_conf, None) - + ohlcv_history = generate_test_data('5m', 150) pair = 'BTC/USDT' timeframe = default_conf['timeframe'] candle_type = CandleType.SPOT @@ -414,27 +414,28 @@ def test_dp_send_msg(default_conf): assert msg not in dp._msg_queue -def test_dp__add_external_candle(default_conf_usdt): +def test_dp__add_external_df(default_conf_usdt): timeframe = '1h' default_conf_usdt["timeframe"] = timeframe dp = DataProvider(default_conf_usdt, None) df = generate_test_data(timeframe, 24, '2022-01-01 00:00:00+00:00') last_analyzed = datetime.now(timezone.utc) - res = dp._add_external_candle('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is False # Why 1000 ?? 
assert res[1] == 1000 - dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + # Hard add dataframe + dp._replace_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) # BTC is not stored yet - res = dp._add_external_candle('BTC/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('BTC/USDT', df, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is False - df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) - assert len(df) == 24 + df_res, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) + assert len(df_res) == 24 # Add the same dataframe again - dataframe size shall not change. - res = dp._add_external_candle('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is True assert res[1] == 0 df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) @@ -443,7 +444,7 @@ def test_dp__add_external_candle(default_conf_usdt): # Add a new day. df2 = generate_test_data(timeframe, 24, '2022-01-02 00:00:00+00:00') - res = dp._add_external_candle('ETH/USDT', df2, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df2, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is True assert res[1] == 0 df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) @@ -452,7 +453,7 @@ def test_dp__add_external_candle(default_conf_usdt): # Add a dataframe with a 12 hour offset - so 12 candles are overlapping, and 12 valid. 
df3 = generate_test_data(timeframe, 24, '2022-01-02 12:00:00+00:00') - res = dp._add_external_candle('ETH/USDT', df3, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df3, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is True assert res[1] == 0 df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) @@ -463,7 +464,7 @@ def test_dp__add_external_candle(default_conf_usdt): # Generate 1 new candle df4 = generate_test_data(timeframe, 1, '2022-01-03 12:00:00+00:00') - res = dp._add_external_candle('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) # assert res[0] is True # assert res[1] == 0 df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT) @@ -474,7 +475,7 @@ def test_dp__add_external_candle(default_conf_usdt): # Gap in the data ... df4 = generate_test_data(timeframe, 1, '2022-01-05 00:00:00+00:00') - res = dp._add_external_candle('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is False # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00 assert res[1] == 36 @@ -484,7 +485,7 @@ def test_dp__add_external_candle(default_conf_usdt): # Empty dataframe df4 = generate_test_data(timeframe, 0, '2022-01-05 00:00:00+00:00') - res = dp._add_external_candle('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) + res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT) assert res[0] is False # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00 assert res[1] == 0