From d2c8487ecf01b90fab34dd55cc8d76bdd9bf5c2d Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 27 Nov 2022 13:11:43 -0700 Subject: [PATCH] update add_external_candle, fix breaking on ping error, handle empty dataframes --- freqtrade/data/dataprovider.py | 14 +++++++++----- freqtrade/rpc/external_message_consumer.py | 20 ++++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 19b5df652..42fe2f603 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -179,15 +179,19 @@ class DataProvider: if producer_name not in self.__producer_pairs_df: # We don't have data from this producer yet, # so we can't append a candle - return (False, 0) + return (False, 999) if pair_key not in self.__producer_pairs_df[producer_name]: # We don't have data for this pair_key, # so we can't append a candle - return (False, 0) + return (False, 999) # CHECK FOR MISSING CANDLES - # return (False, int > 0) + # Calculate difference between last candle in local dataframe + # and first candle in incoming dataframe. Take difference and divide + # by timeframe to find out how many candles we still need. If 1 + # then the incoming candle is the right candle. If more than 1, + # return (False, missing candles - 1) existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] appended_df = self._append_candle_to_dataframe(existing_df, dataframe) @@ -207,8 +211,8 @@ class DataProvider: if existing.iloc[-1]['date'] != new.iloc[-1]['date']: existing = concat([existing, new]) - # Only keep the last 1000 candles in memory - existing = existing[-1000:] if len(existing) > 1000 else existing + # Only keep the last 1500 candles in memory + existing = existing[-1500:] if len(existing) > 1000 else existing return existing diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 17c4e1aa0..13c2e5fb3 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -248,7 +248,7 @@ class ExternalMessageConsumer: # Now send any subsequent requests published to # this channel's stream - async for request in channel_stream: + async for request, _ in channel_stream: logger.info(f"Sending request to channel - {channel} - {request}") await channel.send(request) @@ -292,13 +292,13 @@ class ExternalMessageConsumer: except (websockets.exceptions.ConnectionClosed): # Just eat the error and continue reconnecting logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + break except Exception as e: # Just eat the error and continue reconnecting logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) - - finally: await asyncio.sleep(self.sleep_time) break @@ -372,10 +372,16 @@ class ExternalMessageConsumer: pair, timeframe, candle_type = key + if df.empty: + logger.info(f"Received Empty Dataframe for {key}") + return + # If set, remove the Entry and Exit signals from the Producer if self._emc_config.get('remove_entry_exit_signals', False): df = remove_entry_exit_signals(df) + logger.info(f"Received {len(df)} candle(s) for {key}") + if len(df) >= 999: # This is a full dataframe # Add the dataframe to the dataprovider @@ -404,13 +410,14 @@ class ExternalMessageConsumer: if not did_append: logger.info("Holes in data or no existing df, " - f"requesting data for {key} from `{producer_name}`") + f"requesting {n_missing} candles " + f"for {key} from `{producer_name}`") self.send_producer_request( producer_name, WSAnalyzedDFRequest( data={ - "limit": n_missing if n_missing > 0 else 1000, + "limit": n_missing, "pair": pair } ) @@ -418,4 +425,5 @@ class ExternalMessageConsumer: return logger.info( - f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`") + f"Consumed message from `{producer_name}` " + f"of type `RPCMessageType.ANALYZED_DF` for {key}")