From 3535aa7724c4a202007841a1efa18c70fe728ab5 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 6 Sep 2022 12:12:05 -0600 Subject: [PATCH] add last_analyzed to emitted dataframe --- freqtrade/data/dataprovider.py | 10 ++++++++-- freqtrade/rpc/external_message_consumer.py | 21 +++++++++++++-------- freqtrade/rpc/rpc.py | 3 ++- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 76e184296..44296ab40 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -121,7 +121,7 @@ class DataProvider: 'type': RPCMessageType.ANALYZED_DF, 'data': { 'key': pair_key, - 'value': dataframe + 'value': (dataframe, datetime.now(timezone.utc)) } } ) @@ -130,6 +130,7 @@ class DataProvider: self, pair: str, dataframe: DataFrame, + last_analyzed: Optional[str] = None, timeframe: Optional[str] = None, candle_type: Optional[CandleType] = None, producer_name: str = "default" @@ -149,7 +150,12 @@ class DataProvider: if producer_name not in self.__producer_pairs_df: self.__producer_pairs_df[producer_name] = {} - self.__producer_pairs_df[producer_name][pair_key] = (dataframe, datetime.now(timezone.utc)) + if not last_analyzed: + _last_analyzed = datetime.now(timezone.utc) + else: + _last_analyzed = datetime.fromisoformat(last_analyzed) + + self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed) logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.") def get_external_df( diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index f0b177647..28628c1f6 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -10,7 +10,6 @@ import socket from threading import Thread from typing import Any, Dict, List, Optional -import pandas import websockets from freqtrade.data.dataprovider import DataProvider @@ -225,9 +224,12 @@ class ExternalMessageConsumer: timeout=self.reply_timeout ) - async with lock: - # Handle the message - self.handle_producer_message(producer, message) + try: + async with lock: + # Handle the message + self.handle_producer_message(producer, message) + except Exception as e: + logger.exception(f"Error handling producer message: {e}") except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): # We haven't received data yet. Check the connection and continue. @@ -300,17 +302,20 @@ class ExternalMessageConsumer: key, value = message_data.get('key'), message_data.get('value') - if key and isinstance(value, pandas.DataFrame): + if key and value: pair, timeframe, candle_type = key - dataframe = value + dataframe, last_analyzed = value # If set, remove the Entry and Exit signals from the Producer if self._emc_config.get('remove_entry_exit_signals', False): dataframe = remove_entry_exit_signals(dataframe) # Add the dataframe to the dataprovider - self._dp._add_external_df(pair, dataframe, timeframe, - candle_type, producer_name=producer_name) + self._dp._add_external_df(pair, dataframe, + last_analyzed=last_analyzed, + timeframe=timeframe, + candle_type=candle_type, + producer_name=producer_name) logger.debug( f"Consumed message from {producer_name} of type RPCMessageType.ANALYZED_DF") diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 3757c58c2..98dad278f 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1064,7 +1064,8 @@ class RPC: for pair in pairlist: dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit) - _data[pair] = {"key": (pair, timeframe, candle_type), "value": dataframe} + _data[pair] = {"key": (pair, timeframe, candle_type), + "value": (dataframe, last_analyzed)} return _data