diff --git a/freqtrade/constants.py b/freqtrade/constants.py index c7f2acc84..2e580acf5 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -498,6 +498,7 @@ CONF_SCHEMA = { 'items': { 'type': 'object', 'properties': { + 'name': {'type': 'string'}, 'url': {'type': 'string', 'default': ''}, 'ws_token': {'type': 'string', 'default': ''}, } diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 9376c0b33..947387f75 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -34,8 +34,8 @@ class DataProvider: self, config: dict, exchange: Optional[Exchange], - rpc: Optional[RPCManager] = None, - pairlists=None + pairlists=None, + rpc: Optional[RPCManager] = None ) -> None: self._config = config self._exchange = exchange @@ -44,8 +44,9 @@ class DataProvider: self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} self.__slice_index: Optional[int] = None self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {} - self.__external_pairs_df: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} - self.__producer_pairs: List[str] = [] + self.__producer_pairs_df: Dict[str, + Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {} + self.__producer_pairs: Dict[str, List[str]] = {} self._msg_queue: deque = deque() self.__msg_cache = PeriodicCache( @@ -84,22 +85,22 @@ class DataProvider: dataframe, datetime.now(timezone.utc)) # For multiple producers we will want to merge the pairlists instead of overwriting - def set_producer_pairs(self, pairlist: List[str]): + def set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"): """ Set the pairs received to later be used. This only supports 1 Producer right now. :param pairlist: List of pairs """ - self.__producer_pairs = pairlist.copy() + self.__producer_pairs[producer_name] = pairlist.copy() - def get_producer_pairs(self) -> List[str]: + def get_producer_pairs(self, producer_name: str = "default") -> List[str]: """ Get the pairs cached from the producer :returns: List of pairs """ - return self.__producer_pairs + return self.__producer_pairs.get(producer_name, []) def emit_df( self, @@ -129,6 +130,7 @@ class DataProvider: timeframe: str, dataframe: DataFrame, candle_type: CandleType, + producer_name: str = "default" ) -> None: """ Add the pair data to this class from an external source. @@ -139,15 +141,19 @@ class DataProvider: """ pair_key = (pair, timeframe, candle_type) + if producer_name not in self.__producer_pairs_df: + self.__producer_pairs_df[producer_name] = {} + # For multiple leaders, if the data already exists, we'd merge - self.__external_pairs_df[pair_key] = (dataframe, datetime.now(timezone.utc)) + self.__producer_pairs_df[producer_name][pair_key] = (dataframe, datetime.now(timezone.utc)) def get_external_df( self, pair: str, timeframe: str, - candle_type: CandleType - ) -> DataFrame: + candle_type: CandleType, + producer_name: str = "default" + ) -> Tuple[DataFrame, datetime]: """ Get the pair data from the external sources. Will wait if the policy is set to, and data is not available. @@ -158,11 +164,15 @@ class DataProvider: """ pair_key = (pair, timeframe, candle_type) - if pair_key not in self.__external_pairs_df: + if producer_name not in self.__producer_pairs_df: # We don't have this data yet, return empty DataFrame and datetime (01-01-1970) return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) - return self.__external_pairs_df[pair_key] + if pair_key not in self.__producer_pairs_df: + # We don't have this data yet, return empty DataFrame and datetime (01-01-1970) + return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) + + return self.__producer_pairs_df[producer_name][pair_key] def add_pairlisthandler(self, pairlists) -> None: """ diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index f7b7ad80b..19c77d403 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -30,7 +30,7 @@ from freqtrade.plugins.pairlistmanager import PairListManager from freqtrade.plugins.protectionmanager import ProtectionManager from freqtrade.resolvers import ExchangeResolver, StrategyResolver from freqtrade.rpc import RPCManager -from freqtrade.rpc.emc import ExternalMessageConsumer +from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer from freqtrade.strategy.interface import IStrategy from freqtrade.strategy.strategy_wrapper import strategy_safe_wrapper from freqtrade.util import FtPrecise @@ -85,7 +85,7 @@ class FreqtradeBot(LoggingMixin): # Keep this at the end of this initialization method. self.rpc: RPCManager = RPCManager(self) - self.dataprovider = DataProvider(self.config, self.exchange, self.rpc, self.pairlists) + self.dataprovider = DataProvider(self.config, self.exchange, self.pairlists, self.rpc) # Attach Dataprovider to strategy instance self.strategy.dp = self.dataprovider @@ -202,8 +202,9 @@ class FreqtradeBot(LoggingMixin): # This just means we won't broadcast dataframes if we're listening to a producer # Doesn't necessarily NEED to be this way, as maybe we'd like to broadcast # even if we are using external dataframes in the future. + self.strategy.analyze(self.active_pair_whitelist, - external_data=self.dataprovider.external_data_enabled) + emit_df=self.dataprovider.external_data_enabled) with self._exit_lock: # Check for exchange cancelations, timeouts and user requested replace diff --git a/freqtrade/rpc/emc.py b/freqtrade/rpc/external_message_consumer.py similarity index 95% rename from freqtrade/rpc/emc.py rename to freqtrade/rpc/external_message_consumer.py index 3d78bc257..ae72089b5 100644 --- a/freqtrade/rpc/emc.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -162,7 +162,7 @@ class ExternalMessageConsumer: async with lock: # Handle the message - self.handle_producer_message(message) + self.handle_producer_message(producer, message) except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): # We haven't received data yet. Check the connection and continue. @@ -210,10 +210,11 @@ class ExternalMessageConsumer: # How we do things here isn't set in stone. There seems to be some interest # in figuring out a better way, but we shall do this for now. - def handle_producer_message(self, message: Dict[str, Any]): + def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]): """ Handles external messages from a Producer """ + producer_name = producer.get('name', 'default') # Should we have a default message type? message_type = message.get('type', RPCMessageType.STATUS) message_data = message.get('data') @@ -229,7 +230,7 @@ class ExternalMessageConsumer: pairlist = message_data # Add the pairlist data to the DataProvider - self._dp.set_producer_pairs(pairlist) + self._dp.set_producer_pairs(pairlist, producer_name=producer_name) # Handle analyzed dataframes elif message_type == RPCMessageType.ANALYZED_DF: @@ -246,4 +247,5 @@ class ExternalMessageConsumer: dataframe = remove_entry_exit_signals(dataframe) # Add the dataframe to the dataprovider - self._dp.add_external_df(pair, timeframe, dataframe, candle_type) + self._dp.add_external_df(pair, timeframe, dataframe, + candle_type, producer_name=producer_name) diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index a06b6506e..34e475ed7 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -682,8 +682,7 @@ class IStrategy(ABC, HyperStrategyMixin): def analyze_ticker( self, dataframe: DataFrame, - metadata: dict, - populate_indicators: bool = True + metadata: dict ) -> DataFrame: """ Parses the given candle (OHLCV) data and returns a populated DataFrame @@ -693,8 +692,7 @@ class IStrategy(ABC, HyperStrategyMixin): :return: DataFrame of candle (OHLCV) data with indicator data and signals added """ logger.debug("TA Analysis Launched") - if populate_indicators: - dataframe = self.advise_indicators(dataframe, metadata) + dataframe = self.advise_indicators(dataframe, metadata) dataframe = self.advise_entry(dataframe, metadata) dataframe = self.advise_exit(dataframe, metadata) return dataframe @@ -703,7 +701,7 @@ class IStrategy(ABC, HyperStrategyMixin): self, dataframe: DataFrame, metadata: dict, - external_data: bool = False + emit_df: bool = False ) -> DataFrame: """ Parses the given candle (OHLCV) data and returns a populated DataFrame @@ -720,16 +718,15 @@ class IStrategy(ABC, HyperStrategyMixin): if (not self.process_only_new_candles or self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]['date']): - populate_indicators = not external_data # Defs that only make change on new candle data. - dataframe = self.analyze_ticker(dataframe, metadata, populate_indicators) + dataframe = self.analyze_ticker(dataframe, metadata) self._last_candle_seen_per_pair[pair] = dataframe.iloc[-1]['date'] candle_type = self.config.get('candle_type_def', CandleType.SPOT) self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) - if populate_indicators: + if emit_df: self.dp.emit_df((pair, self.timeframe, candle_type), dataframe) else: @@ -743,7 +740,7 @@ class IStrategy(ABC, HyperStrategyMixin): def analyze_pair( self, pair: str, - external_data: bool = False + emit_df: bool = False ) -> None: """ Fetch data for this pair from dataprovider and analyze. @@ -764,7 +761,7 @@ class IStrategy(ABC, HyperStrategyMixin): dataframe = strategy_safe_wrapper( self._analyze_ticker_internal, message="" - )(dataframe, {'pair': pair}, external_data) + )(dataframe, {'pair': pair}, emit_df) self.assert_df(dataframe, df_len, df_close, df_date) except StrategyError as error: @@ -778,14 +775,14 @@ class IStrategy(ABC, HyperStrategyMixin): def analyze( self, pairs: List[str], - external_data: bool = False + emit_df: bool = False ) -> None: """ Analyze all pairs using analyze_pair(). :param pairs: List of pairs to analyze """ for pair in pairs: - self.analyze_pair(pair, external_data) + self.analyze_pair(pair, emit_df) @ staticmethod def preserve_df(dataframe: DataFrame) -> Tuple[int, float, datetime]: diff --git a/tests/rpc/test_rpc_telegram.py b/tests/rpc/test_rpc_telegram.py index a30115bd9..c7ae7cb74 100644 --- a/tests/rpc/test_rpc_telegram.py +++ b/tests/rpc/test_rpc_telegram.py @@ -2138,10 +2138,9 @@ def test_send_msg_strategy_msg_notification(default_conf, mocker) -> None: def test_send_msg_unknown_type(default_conf, mocker) -> None: telegram, _, _ = get_telegram_testobject(mocker, default_conf) - with pytest.raises(NotImplementedError, match=r'Unknown message type: None'): - telegram.send_msg({ - 'type': None, - }) + telegram.send_msg({ + 'type': None, + }) @pytest.mark.parametrize('message_type,enter,enter_signal,leverage', [