From 2b5f0678772bea0abaf4abe93efc55de43ea3e0e Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 26 Aug 2022 23:40:13 -0600 Subject: [PATCH] Refactoring, minor improvements, data provider improvements --- freqtrade/constants.py | 6 + freqtrade/data/dataprovider.py | 106 ++++++++++++------ freqtrade/enums/__init__.py | 2 +- freqtrade/enums/externalsignal.py | 7 ++ freqtrade/freqtradebot.py | 3 - freqtrade/misc.py | 17 +++ freqtrade/rpc/api_server/webserver.py | 1 + freqtrade/rpc/external_signal/channel.py | 40 ++++--- freqtrade/rpc/external_signal/controller.py | 34 ++++-- freqtrade/rpc/external_signal/utils.py | 12 ++ freqtrade/rpc/rpc.py | 72 ++++++++---- freqtrade/rpc/rpc_manager.py | 13 +-- ...te.txt => requirements-externalsignals.txt | 1 + tests/rpc/test_rpc_apiserver.py | 2 - 14 files changed, 218 insertions(+), 98 deletions(-) rename requirements-replicate.txt => requirements-externalsignals.txt (94%) diff --git a/freqtrade/constants.py b/freqtrade/constants.py index ad0758e22..b1f189093 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -62,6 +62,7 @@ TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] FOLLOWER_MODE_OPTIONS = ['follower', 'leader'] +WAIT_DATA_POLICY_OPTIONS = ['none', 'first', 'all'] ENV_VAR_PREFIX = 'FREQTRADE__' @@ -509,6 +510,11 @@ CONF_SCHEMA = { 'follower_reply_timeout': {'type': 'integer'}, 'follower_sleep_time': {'type': 'integer'}, 'follower_ping_timeout': {'type': 'integer'}, + 'wait_data_policy': { + 'type': 'string', + 'enum': WAIT_DATA_POLICY_OPTIONS + }, + 'remove_signals_analyzed_df': {'type': 'boolean', 'default': False} }, 'required': ['mode'] }, diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 036005c84..cd70db9a3 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -15,7 +15,7 @@ from pandas import DataFrame from freqtrade.configuration import TimeRange from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe from freqtrade.data.history import load_pair_history -from freqtrade.enums import CandleType, RunMode +from freqtrade.enums import CandleType, RunMode, WaitDataPolicy from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exchange import Exchange, timeframe_to_seconds from freqtrade.util import PeriodicCache @@ -29,7 +29,12 @@ MAX_DATAFRAME_CANDLES = 1000 class DataProvider: - def __init__(self, config: dict, exchange: Optional[Exchange], pairlists=None) -> None: + def __init__( + self, + config: dict, + exchange: Optional[Exchange], + pairlists=None + ) -> None: self._config = config self._exchange = exchange self._pairlists = pairlists @@ -37,12 +42,18 @@ class DataProvider: self.__slice_index: Optional[int] = None self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {} self.__external_pairs_df: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} - self.__external_pairs_event: Dict[str, Event] = {} + self.__external_pairs_event: Dict[PairWithTimeframe, Tuple[int, Event]] = {} self._msg_queue: deque = deque() self.__msg_cache = PeriodicCache( maxsize=1000, ttl=timeframe_to_seconds(self._config.get('timeframe', '1h'))) + self._num_sources = len(self._config.get('external_signal', {}).get('leader_list', [])) + self._wait_data_policy = self._config.get('external_signal', {}).get( + 'wait_data_policy', WaitDataPolicy.all) + self._wait_data_timeout = self._config.get( + 'external_signal', {}).get('wait_data_timeout', 5) + def _set_dataframe_max_index(self, limit_index: int): """ Limit analyzed dataframe to max specified index. @@ -75,57 +86,88 @@ class DataProvider: pair: str, timeframe: str, dataframe: DataFrame, - candle_type: CandleType + candle_type: CandleType, ) -> None: """ - Add the DataFrame to the __external_pairs_df. If a pair event exists, - set it to release the main thread from waiting. + Add the pair data to this class from an external source. + + :param pair: pair to get the data for + :param timeframe: Timeframe to get data for + :param candle_type: Any of the enum CandleType (must match trading mode!) """ pair_key = (pair, timeframe, candle_type) - # Delete stale data - if pair_key in self.__external_pairs_df: - del self.__external_pairs_df[pair_key] - + # For multiple leaders, if the data already exists, we'd merge self.__external_pairs_df[pair_key] = (dataframe, datetime.now(timezone.utc)) - - pair_event = self.__external_pairs_event.get(pair) - if pair_event: - logger.debug(f"Leader data for pair {pair_key} has been added") - pair_event.set() + self._set_data_event(pair_key) def get_external_df( self, pair: str, timeframe: str, - candle_type: CandleType, - wait: bool = True + candle_type: CandleType ) -> DataFrame: """ - If the pair exists in __external_pairs_df, return it. - If it doesn't, and wait is False, then return an empty df with the columns filled. - If it doesn't, and wait is True (default) create a new threading Event - in __external_pairs_event and wait on it. + Get the pair data from the external sources. Will wait if the policy is + set to, and data is not available. + + :param pair: pair to get the data for + :param timeframe: Timeframe to get data for + :param candle_type: Any of the enum CandleType (must match trading mode!) """ pair_key = (pair, timeframe, candle_type) if pair_key not in self.__external_pairs_df: - if wait: - pair_event = Event() - self.__external_pairs_event[pair] = pair_event + self._wait_on_data(pair_key) - logger.debug(f"Waiting on Leader data for: {pair_key}") - self.__external_pairs_event[pair].wait(timeout=5) - - if pair_key not in self.__external_pairs_df: - # Return empty dataframe but with expected columns merged and filled with NaN - return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) - else: - # Return empty dataframe but with expected columns merged and filled with NaN + if pair_key not in self.__external_pairs_df: return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) return self.__external_pairs_df[pair_key] + def _set_data_event(self, key: PairWithTimeframe): + """ + Depending on the WaitDataPolicy, if an event exists for this PairWithTimeframe + then set the event to release main thread from waiting. + + :param key: PairWithTimeframe + """ + pair_event = self.__external_pairs_event.get(key) + + if pair_event: + num_concat, event = pair_event + self.__external_pairs_event[key] = (num_concat + 1, event) + + if self._wait_data_policy == WaitDataPolicy.one: + logger.debug("Setting Data as policy is One") + event.set() + elif self._wait_data_policy == WaitDataPolicy.all and num_concat == self._num_sources: + logger.debug("Setting Data as policy is all, and is complete") + event.set() + + del self.__external_pairs_event[key] + + def _wait_on_data(self, key: PairWithTimeframe): + """ + Depending on the WaitDataPolicy, we will create and wait on an event until + set that determines the full amount of data is available + + :param key: PairWithTimeframe + """ + if self._wait_data_policy is not WaitDataPolicy.none: + pair, timeframe, candle_type = key + + pair_event = Event() + self.__external_pairs_event[key] = (0, pair_event) + + timeout = self._wait_data_timeout \ + if self._wait_data_policy is not WaitDataPolicy.all else 0 + + timeout_str = f"for {timeout} seconds" if timeout > 0 else "indefinitely" + logger.debug(f"Waiting for external data on {pair} for {timeout_str}") + + pair_event.wait(timeout=timeout) + def add_pairlisthandler(self, pairlists) -> None: """ Allow adding pairlisthandler after initialization diff --git a/freqtrade/enums/__init__.py b/freqtrade/enums/__init__.py index 913ef82dd..ffeb8cc12 100644 --- a/freqtrade/enums/__init__.py +++ b/freqtrade/enums/__init__.py @@ -3,7 +3,7 @@ from freqtrade.enums.backteststate import BacktestState from freqtrade.enums.candletype import CandleType from freqtrade.enums.exitchecktuple import ExitCheckTuple from freqtrade.enums.exittype import ExitType -from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType +from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy from freqtrade.enums.marginmode import MarginMode from freqtrade.enums.ordertypevalue import OrderTypeValues from freqtrade.enums.rpcmessagetype import RPCMessageType diff --git a/freqtrade/enums/externalsignal.py b/freqtrade/enums/externalsignal.py index 4695a4eab..05dc604a2 100644 --- a/freqtrade/enums/externalsignal.py +++ b/freqtrade/enums/externalsignal.py @@ -7,5 +7,12 @@ class ExternalSignalModeType(str, Enum): class LeaderMessageType(str, Enum): + default = "default" pairlist = "pairlist" analyzed_df = "analyzed_df" + + +class WaitDataPolicy(str, Enum): + none = "none" + one = "one" + all = "all" diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 9704b7e08..6aee3d104 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -281,9 +281,6 @@ class FreqtradeBot(LoggingMixin): # If external signal leader, broadcast whitelist data # Should we broadcast before trade pairs are added? - # Or should this class be made available to the PairListManager and ran - # when filter_pairlist is called? - if self.external_signal_controller: if self.external_signal_controller.is_leader(): self.rpc.emit_data({ diff --git a/freqtrade/misc.py b/freqtrade/misc.py index bc644a7ec..ceace4ed8 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -14,6 +14,7 @@ import pandas import rapidjson from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN +from freqtrade.enums.signaltype import SignalTagType, SignalType logger = logging.getLogger(__name__) @@ -271,3 +272,19 @@ def json_to_dataframe(data: str) -> pandas.DataFrame: dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) return dataframe + + +def remove_entry_exit_signals(dataframe: pandas.DataFrame): + """ + Remove Entry and Exit signals from a DataFrame + + :param dataframe: The DataFrame to remove signals from + """ + dataframe[SignalType.ENTER_LONG.value] = 0 + dataframe[SignalType.EXIT_LONG.value] = 0 + dataframe[SignalType.ENTER_SHORT.value] = 0 + dataframe[SignalType.EXIT_SHORT.value] = 0 + dataframe[SignalTagType.ENTER_TAG.value] = None + dataframe[SignalTagType.EXIT_TAG.value] = None + + return dataframe diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index c98fb9fd4..049e7dbc2 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -74,6 +74,7 @@ class ApiServer(RPCHandler): default_response_class=FTJSONResponse, ) self.configure_app(self.app, self._config) + self.start_api() def add_rpc_handler(self, rpc: RPC): """ diff --git a/freqtrade/rpc/external_signal/channel.py b/freqtrade/rpc/external_signal/channel.py index 585b6bae5..4ccb2d864 100644 --- a/freqtrade/rpc/external_signal/channel.py +++ b/freqtrade/rpc/external_signal/channel.py @@ -1,4 +1,5 @@ import logging +from threading import RLock from typing import Type from freqtrade.rpc.external_signal.proxy import WebSocketProxy @@ -63,6 +64,7 @@ class WebSocketChannel: class ChannelManager: def __init__(self): self.channels = dict() + self._lock = RLock() # Re-entrant Lock async def on_connect(self, websocket: WebSocketType): """ @@ -78,7 +80,9 @@ class ChannelManager: return ws_channel = WebSocketChannel(websocket) - self.channels[websocket] = ws_channel + + with self._lock: + self.channels[websocket] = ws_channel return ws_channel @@ -88,21 +92,26 @@ class ChannelManager: :param websocket: The WebSocket objet attached to the Channel """ - if websocket in self.channels.keys(): - channel = self.channels[websocket] + with self._lock: + channel = self.channels.get(websocket) + if channel: + logger.debug(f"Disconnecting channel - {channel}") - logger.debug(f"Disconnecting channel - {channel}") + if not channel.is_closed(): + await channel.close() - if not channel.is_closed(): - await channel.close() - del self.channels[websocket] + del self.channels[websocket] async def disconnect_all(self): """ Disconnect all Channels """ - for websocket in self.channels.keys(): - await self.on_disconnect(websocket) + with self._lock: + for websocket, channel in self.channels.items(): + if not channel.is_closed(): + await channel.close() + + self.channels = dict() async def broadcast(self, data): """ @@ -110,12 +119,13 @@ class ChannelManager: :param data: The data to send """ - for websocket, channel in self.channels.items(): - try: - await channel.send(data) - except RuntimeError: - # Handle cannot send after close cases - await self.on_disconnect(websocket) + with self._lock: + for websocket, channel in self.channels.items(): + try: + await channel.send(data) + except RuntimeError: + # Handle cannot send after close cases + await self.on_disconnect(websocket) async def send_direct(self, channel, data): """ diff --git a/freqtrade/rpc/external_signal/controller.py b/freqtrade/rpc/external_signal/controller.py index af91a67b7..01c15fc15 100644 --- a/freqtrade/rpc/external_signal/controller.py +++ b/freqtrade/rpc/external_signal/controller.py @@ -6,7 +6,7 @@ import logging import secrets import socket from threading import Thread -from typing import Any, Coroutine, Dict, Union +from typing import Any, Callable, Coroutine, Dict, Union import websockets from fastapi import Depends @@ -56,8 +56,13 @@ class ExternalSignalController(RPCHandler): self._main_task = None self._sub_tasks = None - self.channel_manager = ChannelManager() + self._message_handlers = { + LeaderMessageType.pairlist: self._rpc._handle_pairlist_message, + LeaderMessageType.analyzed_df: self._rpc._handle_analyzed_df_message, + LeaderMessageType.default: self._rpc._handle_default_message + } + self.channel_manager = ChannelManager() self.external_signal_config = config.get('external_signal', {}) # What the config should look like @@ -89,6 +94,8 @@ class ExternalSignalController(RPCHandler): self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2) self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5) + # Validate external_signal_config here? + if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0: raise ValueError("You must specify at least 1 leader in follower mode.") @@ -99,7 +106,6 @@ class ExternalSignalController(RPCHandler): default_api_key = secrets.token_urlsafe(16) self.secret_api_key = self.external_signal_config.get('api_token', default_api_key) - self.start_threaded_loop() self.start() def is_leader(self): @@ -114,6 +120,12 @@ class ExternalSignalController(RPCHandler): """ return self.external_signal_config.get('enabled', False) + def num_leaders(self): + """ + The number of leaders we should be connected to + """ + return len(self.leaders_list) + def start_threaded_loop(self): """ Start the main internal loop in another thread to run coroutines @@ -144,6 +156,7 @@ class ExternalSignalController(RPCHandler): """ Start the controller main loop """ + self.start_threaded_loop() self._main_task = self.submit_coroutine(self.main()) async def shutdown(self): @@ -242,23 +255,20 @@ class ExternalSignalController(RPCHandler): async def send_initial_data(self, channel): logger.info("Sending initial data through channel") - # We first send pairlist data - # We should move this to a func in the RPC object - initial_data = { - "data_type": LeaderMessageType.pairlist, - "data": self.freqtrade.pairlists.whitelist - } + data = self._rpc._initial_leader_data() - await channel.send(initial_data) + for message in data: + await channel.send(message) async def _handle_leader_message(self, message: MessageType): """ Handle message received from a Leader """ - type = message.get("data_type") + type = message.get("data_type", LeaderMessageType.default) data = message.get("data") - self._rpc._handle_emitted_data(type, data) + handler: Callable = self._message_handlers[type] + handler(type, data) # ---------------------------------------------------------------------- diff --git a/freqtrade/rpc/external_signal/utils.py b/freqtrade/rpc/external_signal/utils.py index 7b703810e..e5469dce3 100644 --- a/freqtrade/rpc/external_signal/utils.py +++ b/freqtrade/rpc/external_signal/utils.py @@ -1,5 +1,8 @@ +from pandas import DataFrame from starlette.websockets import WebSocket, WebSocketState +from freqtrade.enums.signaltype import SignalTagType, SignalType + async def is_websocket_alive(ws: WebSocket) -> bool: if ( @@ -8,3 +11,12 @@ async def is_websocket_alive(ws: WebSocket) -> bool: ): return True return False + + +def remove_entry_exit_signals(dataframe: DataFrame): + dataframe[SignalType.ENTER_LONG.value] = 0 + dataframe[SignalType.EXIT_LONG.value] = 0 + dataframe[SignalType.ENTER_SHORT.value] = 0 + dataframe[SignalType.EXIT_SHORT.value] = 0 + dataframe[SignalTagType.ENTER_TAG.value] = None + dataframe[SignalTagType.EXIT_TAG.value] = None diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 68871a15a..82d50f33c 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -24,7 +24,8 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, LeaderMessage from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler -from freqtrade.misc import decimals_per_coin, json_to_dataframe, shorten_date +from freqtrade.misc import (decimals_per_coin, json_to_dataframe, remove_entry_exit_signals, + shorten_date) from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence.models import PairLock from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist @@ -1090,41 +1091,64 @@ class RPC: 'last_process_ts': int(last_p.timestamp()), } - def _handle_emitted_data(self, type, data): + # ------------------------------ EXTERNAL SIGNALS ----------------------- + + def _initial_leader_data(self): + # We create a list of Messages to send to the follower on connect + data = [] + + # Send Pairlist data + data.append({ + "data_type": LeaderMessageType.pairlist, + "data": self._freqtrade.pairlists._whitelist + }) + + return data + + def _handle_pairlist_message(self, type, data): """ - Handles the emitted data from the Leaders + Handles the emitted pairlists from the Leaders :param type: The data_type of the data :param data: The data """ - logger.debug(f"Handling emitted data of type ({type})") + pairlist = data - if type == LeaderMessageType.pairlist: - pairlist = data + logger.debug(f"Handling Pairlist message: {pairlist}") - logger.debug(pairlist) + external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0] + external_pairlist.add_pairlist_data(pairlist) - # Add the pairlist data to the ExternalPairList object - external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0] - external_pairlist.add_pairlist_data(pairlist) + def _handle_analyzed_df_message(self, type, data): + """ + Handles the analyzed dataframes from the Leaders - elif type == LeaderMessageType.analyzed_df: + :param type: The data_type of the data + :param data: The data + """ + key, value = data["key"], data["value"] + pair, timeframe, candle_type = key - # Convert the dataframe back from json - key, value = data["key"], data["value"] + # Skip any pairs that we don't have in the pairlist? + # leader_pairlist = self._freqtrade.pairlists._whitelist + # if pair not in leader_pairlist: + # return - pair, timeframe, candle_type = key + dataframe = json_to_dataframe(value) - # Skip any pairs that we don't have in the pairlist? - # leader_pairlist = self._freqtrade.pairlists._whitelist - # if pair not in leader_pairlist: - # return + if self._config.get('external_signal', {}).get('remove_signals_analyzed_df', False): + dataframe = remove_entry_exit_signals(dataframe) - dataframe = json_to_dataframe(value) + logger.debug(f"Handling analyzed dataframe for {pair}") + logger.debug(dataframe.tail()) - logger.debug(f"Received analyzed dataframe for {pair}") - logger.debug(dataframe.tail()) + # Add the dataframe to the dataprovider + dataprovider = self._freqtrade.dataprovider + dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) - # Add the dataframe to the dataprovider - dataprovider = self._freqtrade.dataprovider - dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) + def _handle_default_message(self, type, data): + """ + Default leader message handler, just logs it. We should never have to + run this unless the leader sends us some weird message. + """ + logger.debug(f"Received message from Leader of type {type}: {data}") diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index 0a0e285a4..11e21da6f 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -45,25 +45,20 @@ class RPCManager: if config.get('api_server', {}).get('enabled', False): logger.info('Enabling rpc.api_server') from freqtrade.rpc.api_server import ApiServer - - # Pass replicate_rpc as param or defer starting api_server - # until we register the replicate rpc enpoint? apiserver = ApiServer(config) apiserver.add_rpc_handler(self._rpc) self.registered_modules.append(apiserver) - # Enable Replicate mode + # Enable External Signals mode # For this to be enabled, the API server must also be enabled if config.get('external_signal', {}).get('enabled', False): logger.info('Enabling RPC.ExternalSignalController') from freqtrade.rpc.external_signal import ExternalSignalController - external_signal_rpc = ExternalSignalController(self._rpc, config, apiserver) - self.registered_modules.append(external_signal_rpc) + external_signals = ExternalSignalController(self._rpc, config, apiserver) + self.registered_modules.append(external_signals) # Attach the controller to FreqTrade - freqtrade.external_signal_controller = external_signal_rpc - - apiserver.start_api() + freqtrade.external_signal_controller = external_signals def cleanup(self) -> None: """ Stops all enabled rpc modules """ diff --git a/requirements-replicate.txt b/requirements-externalsignals.txt similarity index 94% rename from requirements-replicate.txt rename to requirements-externalsignals.txt index 2c994ea2f..7920b34f6 100644 --- a/requirements-replicate.txt +++ b/requirements-externalsignals.txt @@ -4,3 +4,4 @@ # Required for follower websockets msgpack +janus diff --git a/tests/rpc/test_rpc_apiserver.py b/tests/rpc/test_rpc_apiserver.py index 9a7bdfef6..af9f9d248 100644 --- a/tests/rpc/test_rpc_apiserver.py +++ b/tests/rpc/test_rpc_apiserver.py @@ -52,7 +52,6 @@ def botclient(default_conf, mocker): try: apiserver = ApiServer(default_conf) apiserver.add_rpc_handler(rpc) - apiserver.start_api() yield ftbot, TestClient(apiserver.app) # Cleanup ... ? finally: @@ -333,7 +332,6 @@ def test_api_run(default_conf, mocker, caplog): apiserver = ApiServer(default_conf) apiserver.add_rpc_handler(RPC(get_patched_freqtradebot(mocker, default_conf))) - apiserver.start_api() assert server_mock.call_count == 1 assert apiserver._config == default_conf apiserver.start_api()