stable/docs/producer-consumer.md
2023-03-29 20:43:23 +02:00

Producer / Consumer mode

freqtrade provides a mechanism whereby an instance (the consumer) can listen to messages from an upstream freqtrade instance (the producer) using the message websocket, mainly analyzed_df and whitelist messages. This allows computed indicators (and signals) for pairs to be reused across multiple bots without computing them multiple times.

See Message Websocket in the Rest API docs for setting up the api_server configuration for your message websocket (this will be your producer).

!!! Note
    We strongly recommend setting ws_token to something random and known only to yourself to avoid unauthorized access to your bot.
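
One way to produce such a token is Python's standard secrets module. This is a minimal sketch: the helper name generate_ws_token and the length of 32 bytes are illustrative choices, not freqtrade requirements.

```python
import secrets


def generate_ws_token(num_bytes: int = 32) -> str:
    """Return a URL-safe random token that can be used as a ws_token value."""
    # token_urlsafe draws from the operating system's secure random source.
    return secrets.token_urlsafe(num_bytes)


if __name__ == "__main__":
    # Paste the printed value into the producer's api_server.ws_token and
    # into the matching producers entry on the consumer.
    print(generate_ws_token())
```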

Configuration

Enable subscribing to an instance by adding the external_message_consumer section to the consumer's config file.

{
    //...
    "external_message_consumer": {
        "enabled": true,
        "producers": [
            {
                "name": "default", // This can be any name you'd like, default is "default"
                "host": "127.0.0.1", // The host from your producer's api_server config
                "port": 8080, // The port from your producer's api_server config
                "secure": false, // Use a secure websockets connection, default false
                "ws_token": "secret_Ws_t0ken" // The ws_token from your producer's api_server config
            }
        ],
        // The following configurations are optional, and usually not required
        // "wait_timeout": 300,
        // "ping_timeout": 10,
        // "sleep_time": 10,
        // "remove_entry_exit_signals": false,
        // "message_size_limit": 8
    }
    //...
}
| Parameter | Description |
|-----------|-------------|
| enabled | Required. Enable consumer mode. If set to false, all other settings in this section are ignored.<br>Defaults to false.<br>Datatype: Boolean. |
| producers | Required. List of producers.<br>Datatype: Array. |
| producers.name | Required. Name of this producer. This name must be used in calls to get_producer_pairs() and get_producer_df() if more than one producer is used.<br>Datatype: String. |
| producers.host | Required. The hostname or IP address of your producer.<br>Datatype: String. |
| producers.port | Required. The port matching the above host.<br>Defaults to 8080.<br>Datatype: Integer. |
| producers.secure | Optional. Use SSL in the websocket connection.<br>Defaults to false.<br>Datatype: Boolean. |
| producers.ws_token | Required. ws_token as configured on the producer.<br>Datatype: String. |
| | Optional settings |
| wait_timeout | Timeout until we ping again if no message is received.<br>Defaults to 300.<br>Datatype: Integer - in seconds. |
| ping_timeout | Ping timeout.<br>Defaults to 10.<br>Datatype: Integer - in seconds. |
| sleep_time | Sleep time before retrying to connect.<br>Defaults to 10.<br>Datatype: Integer - in seconds. |
| remove_entry_exit_signals | Remove signal columns from the dataframe (set them to 0) on dataframe receipt.<br>Defaults to false.<br>Datatype: Boolean. |
| message_size_limit | Size limit per message.<br>Defaults to 8.<br>Datatype: Integer - Megabytes. |
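
The interplay of required keys and defaults described in the parameter list can be sketched in plain Python. The helper resolve_consumer_config and the constant names are hypothetical and not part of freqtrade; the sketch only mirrors the documented defaults and treats every producer key other than secure as required.

```python
from typing import Any

# Defaults for the optional settings, as documented in the parameter list.
OPTIONAL_DEFAULTS: dict[str, Any] = {
    "wait_timeout": 300,                # seconds
    "ping_timeout": 10,                 # seconds
    "sleep_time": 10,                   # seconds
    "remove_entry_exit_signals": False,
    "message_size_limit": 8,            # megabytes
}

# Keys each producer entry is expected to carry ("secure" is optional).
REQUIRED_PRODUCER_KEYS = ("name", "host", "port", "ws_token")


def resolve_consumer_config(section: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: validate the section and fill in documented defaults."""
    if not section.get("enabled", False):
        # With enabled set to false, all other settings are ignored.
        return {"enabled": False}

    producers = section.get("producers") or []
    if not producers:
        raise ValueError("at least one producer is required when enabled is true")

    resolved_producers = []
    for producer in producers:
        missing = [key for key in REQUIRED_PRODUCER_KEYS if key not in producer]
        if missing:
            raise ValueError(f"producer entry is missing keys: {missing}")
        # "secure" falls back to False unless the entry sets it explicitly.
        resolved_producers.append({"secure": False, **producer})

    # Explicit settings in the section override the defaults.
    resolved = {**OPTIONAL_DEFAULTS, **section}
    resolved["producers"] = resolved_producers
    return resolved
```

Calling the helper with only the required keys returns a section in which wait_timeout is 300 and the producer's secure flag is False, which is how the commented-out optional settings in the configuration example behave when left out.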

Instead of (or in addition to) calculating indicators in populate_indicators(), the consumer instance listens on the connection to a producer instance's messages (or to multiple producer instances in advanced configurations) and requests the producer's most recently analyzed dataframes for each pair in the active whitelist.

A consumer instance will then have a full copy of the analyzed dataframes without the need to calculate them itself.
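
As a mental model only, the consumer side can be pictured as a small cache that keeps the latest whitelist and the latest analyzed dataframe per producer and pair. The class ProducerCache and its method names are hypothetical and are not freqtrade's implementation; rows are plain dictionaries here to keep the sketch dependency-free.

```python
from datetime import datetime, timezone
from typing import Any

# A dataframe is identified by its pair and timeframe.
PairKey = tuple[str, str]
Rows = list[dict[str, Any]]


class ProducerCache:
    """Hypothetical stand-in for the consumer-side store of producer data."""

    def __init__(self) -> None:
        self._whitelists: dict[str, list[str]] = {}
        self._dataframes: dict[str, dict[PairKey, tuple[Rows, datetime]]] = {}

    def on_whitelist(self, producer: str, pairs: list[str]) -> None:
        # A whitelist message replaces the previously known pair list.
        self._whitelists[producer] = list(pairs)

    def on_analyzed_df(self, producer: str, pair: str, timeframe: str,
                       rows: Rows, analyzed_at: datetime) -> None:
        # Only the most recent analyzed dataframe per pair/timeframe is kept.
        self._dataframes.setdefault(producer, {})[(pair, timeframe)] = (rows, analyzed_at)

    def get_pairs(self, producer: str = "default") -> list[str]:
        return list(self._whitelists.get(producer, []))

    def get_df(self, pair: str, timeframe: str,
               producer: str = "default") -> tuple[Rows, datetime]:
        # Unknown pairs yield an empty result, so callers can test for emptiness.
        epoch = datetime.fromtimestamp(0, tz=timezone.utc)
        return self._dataframes.get(producer, {}).get((pair, timeframe), ([], epoch))
```

The strategy then only reads from this store, which is why a consumer has to handle the case where no data has arrived yet for a pair.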

Examples

Example - Producer Strategy

A simple strategy with multiple indicators. No special considerations are required in the strategy itself.

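import talib.abstract as ta
from pandas import DataFrame

import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.strategy import IStrategy


class ProducerStrategy(IStrategy):
    #...
    # The omitted part is assumed to define the buy_rsi parameter used below.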
    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Calculate indicators in the standard freqtrade way which can then be broadcast to other instances
        """
        dataframe['rsi'] = ta.RSI(dataframe)
        bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(dataframe), window=20, stds=2)
        dataframe['bb_lowerband'] = bollinger['lower']
        dataframe['bb_middleband'] = bollinger['mid']
        dataframe['bb_upperband'] = bollinger['upper']
        dataframe['tema'] = ta.TEMA(dataframe, timeperiod=9)

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Populates the entry signal for the given dataframe
        """
        dataframe.loc[
            (
                (qtpylib.crossed_above(dataframe['rsi'], self.buy_rsi.value)) &
                (dataframe['tema'] <= dataframe['bb_middleband']) &
                (dataframe['tema'] > dataframe['tema'].shift(1)) &
                (dataframe['volume'] > 0)
            ),
            'enter_long'] = 1

        return dataframe

!!! Tip "FreqAI"
    You can use this to set up FreqAI on a powerful machine while running consumers on simple machines such as Raspberry Pis, which can interpret the signals generated by the producer in different ways.

Example - Consumer Strategy

The consumer strategy below is logically equivalent to the producer strategy but calculates no indicators itself. It has the same analyzed dataframes available and makes trading decisions based on the indicators calculated in the producer. In this example the consumer uses the same entry criteria, but this is not required: the consumer may use different logic to enter or exit trades and use only the indicators it needs.

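from pandas import DataFrame

import freqtrade.vendor.qtpylib.indicators as qtpylib
from freqtrade.enums import CandleType  # only needed for the commented-out example
from freqtrade.strategy import IStrategy, merge_informative_pair


class ConsumerStrategy(IStrategy):
    #...
    # The omitted part is assumed to define the buy_rsi parameter used below.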
    process_only_new_candles = False # required for consumers

    _columns_to_expect = ['rsi_default', 'tema_default', 'bb_middleband_default']

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Use the websocket api to get pre-populated indicators from another freqtrade instance.
        Use `self.dp.get_producer_df(pair)` to get the dataframe
        """
        pair = metadata['pair']
        timeframe = self.timeframe

        producer_pairs = self.dp.get_producer_pairs()
        # You can specify which producer to get pairs from via:
        # self.dp.get_producer_pairs("my_other_producer")

        # This function returns the analyzed dataframe and the time it was analyzed
        producer_dataframe, _ = self.dp.get_producer_df(pair)
        # You can get other data if the producer makes it available:
        # self.dp.get_producer_df(
        #   pair,
        #   timeframe="1h",
        #   candle_type=CandleType.SPOT,
        #   producer_name="my_other_producer"
        # )

        if not producer_dataframe.empty:
            # If you plan on passing the producer's entry/exit signal directly,
            # specify ffill=False or it will have unintended results
            merged_dataframe = merge_informative_pair(dataframe, producer_dataframe,
                                                      timeframe, timeframe,
                                                      append_timeframe=False,
                                                      suffix="default")
            return merged_dataframe
        else:
            dataframe[self._columns_to_expect] = 0

        return dataframe

    def populate_entry_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Populates the entry signal for the given dataframe
        """
        # Use the dataframe columns as if we calculated them ourselves
        dataframe.loc[
            (
                (qtpylib.crossed_above(dataframe['rsi_default'], self.buy_rsi.value)) &
                (dataframe['tema_default'] <= dataframe['bb_middleband_default']) &
                (dataframe['tema_default'] > dataframe['tema_default'].shift(1)) &
                (dataframe['volume'] > 0)
            ),
            'enter_long'] = 1

        return dataframe

!!! Tip "Using upstream signals"
    By setting remove_entry_exit_signals=false, you can also use the producer's signals directly. They should be available as enter_long_default (assuming suffix="default" was used) and can be used either as a signal directly or as an additional indicator.
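
A minimal sketch of passing the producer's entry signal straight through follows. It assumes the producer dataframe was already merged with suffix="default" (and with ffill=False, as the comment in the consumer example advises), so the signal arrives in a column named enter_long_default. The helper use_upstream_entry is hypothetical and uses only pandas.

```python
import pandas as pd


def use_upstream_entry(dataframe: pd.DataFrame, suffix: str = "default") -> pd.DataFrame:
    """Copy the producer's entry signal into the consumer's own enter_long column."""
    signal_column = f"enter_long_{suffix}"

    # Start without any entry signal; this also covers the case where no
    # producer data has been merged yet.
    dataframe["enter_long"] = 0
    if signal_column not in dataframe.columns:
        return dataframe

    # Enter only where the producer signalled an entry and the candle has volume.
    # Missing values (NaN) in the producer column never compare equal to 1.
    dataframe.loc[
        (dataframe[signal_column] == 1) & (dataframe["volume"] > 0),
        "enter_long",
    ] = 1
    return dataframe
```

Inside a strategy, the body of populate_entry_trend could call such a helper instead of re-evaluating the indicator conditions, or combine the upstream column with additional conditions of its own.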