From ddc45ce2ebe57c670a6419a724cc10b7b127e7ad Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 30 Aug 2022 19:30:14 -0600 Subject: [PATCH] message handling fix, data waiting fix --- freqtrade/data/dataprovider.py | 5 ++++- freqtrade/rpc/emc.py | 25 +++++++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 430ee0932..2d473683c 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -168,7 +168,10 @@ class DataProvider: timeout_str = f"for {timeout} seconds" if timeout > 0 else "indefinitely" logger.debug(f"Waiting for external data on {pair} for {timeout_str}") - pair_event.wait(timeout=timeout) + if timeout > 0: + pair_event.wait(timeout=timeout) + else: + pair_event.wait() def add_pairlisthandler(self, pairlists) -> None: """ diff --git a/freqtrade/rpc/emc.py b/freqtrade/rpc/emc.py index 48ad78266..ee4d9e6b8 100644 --- a/freqtrade/rpc/emc.py +++ b/freqtrade/rpc/emc.py @@ -164,6 +164,9 @@ class ExternalMessageConsumer: await asyncio.sleep(self.sleep_time) break + except Exception as e: + logger.exception(e) + continue except ( socket.gaierror, ConnectionRefusedError, @@ -214,16 +217,18 @@ class ExternalMessageConsumer: if message_data is None: return - key, value = message_data.get('key'), message_data.get('data') - pair, timeframe, candle_type = key + key, value = message_data.get('key'), message_data.get('value') - # Convert the JSON to a pandas DataFrame - dataframe = json_to_dataframe(value) + if key and value: + pair, timeframe, candle_type = key - # If set, remove the Entry and Exit signals from the Producer - if self._emc_config.get('remove_entry_exit_signals', False): - dataframe = remove_entry_exit_signals(dataframe) + # Convert the JSON to a pandas DataFrame + dataframe = json_to_dataframe(value) - # Add the dataframe to the dataprovider - dataprovider = self._rpc._freqtrade.dataprovider - dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) + # If set, remove the Entry and Exit signals from the Producer + if self._emc_config.get('remove_entry_exit_signals', False): + dataframe = remove_entry_exit_signals(dataframe) + + # Add the dataframe to the dataprovider + dataprovider = self._rpc._freqtrade.dataprovider + dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)