diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 78d73b07d..b889da17f 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -167,7 +167,8 @@ class DataProvider: producer_name: str = "default" ) -> Tuple[bool, int]: """ - Append a candle to the existing external dataframe + Append a candle to the existing external dataframe. The incoming dataframe + must have at least 1 candle. :param pair: pair to get the data for :param timeframe: Timeframe to get data for @@ -176,29 +177,32 @@ class DataProvider: """ pair_key = (pair, timeframe, candle_type) - if (producer_name not in self.__producer_pairs_df) \ - or (pair_key not in self.__producer_pairs_df[producer_name]): + if dataframe.empty: + # The incoming dataframe must have at least 1 candle + return (False, 0) + + if (producer_name not in self.__producer_pairs_df + or pair_key not in self.__producer_pairs_df[producer_name]): # We don't have data from this producer yet, # or we don't have data for this pair_key # return False and 1000 for the full df return (False, 1000) - existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] + existing_df, la = self.__producer_pairs_df[producer_name][pair_key] + + # Iterate over any overlapping candles and update the values + for idx, candle in dataframe.iterrows(): + existing_df.iloc[ + existing_df['date'] == candle['date'] + ] = candle + + existing_df.reset_index(drop=True, inplace=True) # CHECK FOR MISSING CANDLES timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data - # We have received this candle before, update our copy - # and return True, 0 - if local_last == incoming_first: - existing_df.iloc[-1] = dataframe.iloc[0] - existing_data = (existing_df.reset_index(drop=True), _) - - self.__producer_pairs_df[producer_name][pair_key] = existing_data - return (True, 0) - candle_difference = (incoming_first - local_last) / timeframe_delta # If the difference divided by the timeframe is 1, then this @@ -228,6 +232,7 @@ class DataProvider: # Only keep the last 1500 candles in memory existing = existing[-1500:] if len(existing) > 1500 else existing + existing.reset_index(drop=True, inplace=True) return existing diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 743698b24..278f04a8e 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -36,6 +36,9 @@ class Producer(TypedDict): ws_token: str +FULL_DATAFRAME_THRESHOLD = 100 + + logger = logging.getLogger(__name__) @@ -376,8 +379,8 @@ class ExternalMessageConsumer: logger.debug(f"Received {len(df)} candle(s) for {key}") - if len(df) >= 999: - # This is a full dataframe + if len(df) >= FULL_DATAFRAME_THRESHOLD: + # This is likely a full dataframe # Add the dataframe to the dataprovider self._dp._add_external_df( pair, @@ -388,8 +391,8 @@ class ExternalMessageConsumer: producer_name=producer_name ) - elif len(df) < 999: - # This is n single candles + elif len(df) < FULL_DATAFRAME_THRESHOLD: + # This is likely n single candles # Have dataprovider append it to # the full datafame. If it can't, # request the missing candles @@ -403,9 +406,14 @@ class ExternalMessageConsumer: ) if not did_append: - logger.debug("Holes in data or no existing df, " - f"requesting {n_missing} candles " - f"for {key} from `{producer_name}`") + # We want an overlap in candles incase some data has changed + n_missing += 1 + # Set to None for all candles if we missed a full df's worth of candles + n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500 + + logger.warning("Holes in data or no existing df, " + f"requesting {n_missing} candles " + f"for {key} from `{producer_name}`") self.send_producer_request( producer_name, diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 4ebedd6c4..331569de3 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1062,7 +1062,7 @@ class RPC: self, pair: str, timeframe: str, - limit: Optional[int] = None + limit: Optional[int] ) -> Tuple[DataFrame, datetime]: """ Get the dataframe and last analyze from the dataprovider @@ -1083,7 +1083,7 @@ class RPC: def _ws_all_analysed_dataframes( self, pairlist: List[str], - limit: Optional[int] = None + limit: Optional[int] ) -> Generator[Dict[str, Any], None, None]: """ Get the analysed dataframes of each pair in the pairlist.