From d474111a65a07c3133d7e2502be648b362fb72ce Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 24 Aug 2022 22:42:29 -0600 Subject: [PATCH] Renamed to external signals, controller class refactored --- freqtrade/constants.py | 8 +- freqtrade/data/dataprovider.py | 33 +- freqtrade/enums/__init__.py | 2 +- .../enums/{replicate.py => externalsignal.py} | 2 +- freqtrade/freqtradebot.py | 19 +- .../plugins/pairlist/ExternalPairList.py | 4 +- freqtrade/rpc/external_signal/__init__.py | 5 + .../{replicate => external_signal}/channel.py | 6 +- .../controller.py} | 288 +++++++++--------- .../{replicate => external_signal}/proxy.py | 2 +- .../serializer.py | 2 +- .../thread_queue.py | 0 .../{replicate => external_signal}/types.py | 0 .../{replicate => external_signal}/utils.py | 0 freqtrade/rpc/rpc.py | 10 +- freqtrade/rpc/rpc_manager.py | 12 +- freqtrade/strategy/interface.py | 4 +- 17 files changed, 203 insertions(+), 194 deletions(-) rename freqtrade/enums/{replicate.py => externalsignal.py} (80%) create mode 100644 freqtrade/rpc/external_signal/__init__.py rename freqtrade/rpc/{replicate => external_signal}/channel.py (94%) rename freqtrade/rpc/{replicate/__init__.py => external_signal/controller.py} (74%) rename freqtrade/rpc/{replicate => external_signal}/proxy.py (96%) rename freqtrade/rpc/{replicate => external_signal}/serializer.py (96%) rename freqtrade/rpc/{replicate => external_signal}/thread_queue.py (100%) rename freqtrade/rpc/{replicate => external_signal}/types.py (100%) rename freqtrade/rpc/{replicate => external_signal}/utils.py (100%) diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 55363cca1..ad0758e22 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -245,7 +245,7 @@ CONF_SCHEMA = { 'exchange': {'$ref': '#/definitions/exchange'}, 'edge': {'$ref': '#/definitions/edge'}, 'freqai': {'$ref': '#/definitions/freqai'}, - 'replicate': {'$ref': '#/definitions/replicate'}, + 'external_signal': {'$ref': '#/definitions/external_signal'}, 'experimental': { 'type': 'object', 'properties': { @@ -487,7 +487,7 @@ CONF_SCHEMA = { }, 'required': ['process_throttle_secs', 'allowed_risk'] }, - 'replicate': { + 'external_signal': { 'type': 'object', 'properties': { 'enabled': {'type': 'boolean', 'default': False}, @@ -495,14 +495,14 @@ CONF_SCHEMA = { 'type': 'string', 'enum': FOLLOWER_MODE_OPTIONS }, - 'api_key': {'type': 'string', 'default': ''}, + 'api_token': {'type': 'string', 'default': ''}, 'leaders': { 'type': 'array', 'items': { 'type': 'object', 'properties': { 'url': {'type': 'string', 'default': ''}, - 'token': {'type': 'string', 'default': ''}, + 'api_token': {'type': 'string', 'default': ''}, } } }, diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 3de73bb0d..036005c84 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -29,8 +29,7 @@ MAX_DATAFRAME_CANDLES = 1000 class DataProvider: - def __init__(self, config: dict, exchange: Optional[Exchange], - pairlists=None, replicate_controller=None) -> None: + def __init__(self, config: dict, exchange: Optional[Exchange], pairlists=None) -> None: self._config = config self._exchange = exchange self._pairlists = pairlists @@ -99,25 +98,33 @@ class DataProvider: self, pair: str, timeframe: str, - candle_type: CandleType + candle_type: CandleType, + wait: bool = True ) -> DataFrame: """ - If the pair exists in __external_pairs_df, return it. If it doesn't, - create a new threading Event in __external_pairs_event and wait on it. + If the pair exists in __external_pairs_df, return it. + If it doesn't, and wait is False, then return an empty df with the columns filled. + If it doesn't, and wait is True (default) create a new threading Event + in __external_pairs_event and wait on it. """ pair_key = (pair, timeframe, candle_type) + if pair_key not in self.__external_pairs_df: - pair_event = Event() - self.__external_pairs_event[pair] = pair_event + if wait: + pair_event = Event() + self.__external_pairs_event[pair] = pair_event - logger.debug(f"Waiting on Leader data for: {pair_key}") - self.__external_pairs_event[pair].wait() + logger.debug(f"Waiting on Leader data for: {pair_key}") + self.__external_pairs_event[pair].wait(timeout=5) - if pair_key in self.__external_pairs_df: - return self.__external_pairs_df[pair_key] + if pair_key not in self.__external_pairs_df: + # Return empty dataframe but with expected columns merged and filled with NaN + return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) + else: + # Return empty dataframe but with expected columns merged and filled with NaN + return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) - # Because of the waiting mechanism, this should never return - return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) + return self.__external_pairs_df[pair_key] def add_pairlisthandler(self, pairlists) -> None: """ diff --git a/freqtrade/enums/__init__.py b/freqtrade/enums/__init__.py index e1057208a..913ef82dd 100644 --- a/freqtrade/enums/__init__.py +++ b/freqtrade/enums/__init__.py @@ -3,9 +3,9 @@ from freqtrade.enums.backteststate import BacktestState from freqtrade.enums.candletype import CandleType from freqtrade.enums.exitchecktuple import ExitCheckTuple from freqtrade.enums.exittype import ExitType +from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType from freqtrade.enums.marginmode import MarginMode from freqtrade.enums.ordertypevalue import OrderTypeValues -from freqtrade.enums.replicate import LeaderMessageType, ReplicateModeType from freqtrade.enums.rpcmessagetype import RPCMessageType from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType diff --git a/freqtrade/enums/replicate.py b/freqtrade/enums/externalsignal.py similarity index 80% rename from freqtrade/enums/replicate.py rename to freqtrade/enums/externalsignal.py index 8d036f0b9..4695a4eab 100644 --- a/freqtrade/enums/replicate.py +++ b/freqtrade/enums/externalsignal.py @@ -1,7 +1,7 @@ from enum import Enum -class ReplicateModeType(str, Enum): +class ExternalSignalModeType(str, Enum): leader = "leader" follower = "follower" diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 3b850dd4e..9704b7e08 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -75,7 +75,7 @@ class FreqtradeBot(LoggingMixin): PairLocks.timeframe = self.config['timeframe'] - self.replicate_controller = None + self.external_signal_controller = None self.pairlists = PairListManager(self.exchange, self.config) @@ -93,9 +93,6 @@ class FreqtradeBot(LoggingMixin): # Attach Wallets to strategy instance self.strategy.wallets = self.wallets - # Attach ReplicateController to the strategy - # self.strategy.replicate_controller = self.replicate_controller - # Initializing Edge only if enabled self.edge = Edge(self.config, self.exchange, self.strategy) if \ self.config.get('edge', {}).get('enabled', False) else None @@ -197,8 +194,8 @@ class FreqtradeBot(LoggingMixin): strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)() - if self.replicate_controller: - if not self.replicate_controller.is_leader(): + if self.external_signal_controller: + if not self.external_signal_controller.is_leader(): # Run Follower mode analyzing leader_pairs = self.pairlists._whitelist self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs) @@ -281,16 +278,14 @@ class FreqtradeBot(LoggingMixin): self.pairlists.refresh_pairlist() _whitelist = self.pairlists.whitelist - # If replicate leader, broadcast whitelist data - # Should we broadcast before trade pairs are added? What if - # the follower doesn't have trades with those pairs. They would be added for - # no reason. + # If external signal leader, broadcast whitelist data + # Should we broadcast before trade pairs are added? # Or should this class be made available to the PairListManager and ran # when filter_pairlist is called? - if self.replicate_controller: - if self.replicate_controller.is_leader(): + if self.external_signal_controller: + if self.external_signal_controller.is_leader(): self.rpc.emit_data({ "data_type": LeaderMessageType.pairlist, "data": _whitelist diff --git a/freqtrade/plugins/pairlist/ExternalPairList.py b/freqtrade/plugins/pairlist/ExternalPairList.py index 40e3f9a7f..27a328060 100644 --- a/freqtrade/plugins/pairlist/ExternalPairList.py +++ b/freqtrade/plugins/pairlist/ExternalPairList.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) class ExternalPairList(IPairList): """ - PairList plugin for use with replicate follower mode. + PairList plugin for use with external signal follower mode. Will use pairs given from leader data. Usage: @@ -67,6 +67,8 @@ class ExternalPairList(IPairList): def add_pairlist_data(self, pairlist: List[str]): """ Add pairs from Leader + + :param pairlist: List of pairs """ # If some pairs were removed on Leader, remove them here diff --git a/freqtrade/rpc/external_signal/__init__.py b/freqtrade/rpc/external_signal/__init__.py new file mode 100644 index 000000000..c1b05b3f0 --- /dev/null +++ b/freqtrade/rpc/external_signal/__init__.py @@ -0,0 +1,5 @@ +# flake8: noqa: F401 +from freqtrade.rpc.external_signal.controller import ExternalSignalController + + +__all__ = ('ExternalSignalController') diff --git a/freqtrade/rpc/replicate/channel.py b/freqtrade/rpc/external_signal/channel.py similarity index 94% rename from freqtrade/rpc/replicate/channel.py rename to freqtrade/rpc/external_signal/channel.py index 62ed3e025..585b6bae5 100644 --- a/freqtrade/rpc/replicate/channel.py +++ b/freqtrade/rpc/external_signal/channel.py @@ -1,9 +1,9 @@ import logging from typing import Type -from freqtrade.rpc.replicate.proxy import WebSocketProxy -from freqtrade.rpc.replicate.serializer import MsgPackWebSocketSerializer, WebSocketSerializer -from freqtrade.rpc.replicate.types import WebSocketType +from freqtrade.rpc.external_signal.proxy import WebSocketProxy +from freqtrade.rpc.external_signal.serializer import MsgPackWebSocketSerializer, WebSocketSerializer +from freqtrade.rpc.external_signal.types import WebSocketType logger = logging.getLogger(__name__) diff --git a/freqtrade/rpc/replicate/__init__.py b/freqtrade/rpc/external_signal/controller.py similarity index 74% rename from freqtrade/rpc/replicate/__init__.py rename to freqtrade/rpc/external_signal/controller.py index 5cc2ae6a9..af91a67b7 100644 --- a/freqtrade/rpc/replicate/__init__.py +++ b/freqtrade/rpc/external_signal/controller.py @@ -5,8 +5,7 @@ import asyncio import logging import secrets import socket -import traceback -from threading import Event, Thread +from threading import Thread from typing import Any, Coroutine, Dict, Union import websockets @@ -14,18 +13,18 @@ from fastapi import Depends from fastapi import WebSocket as FastAPIWebSocket from fastapi import WebSocketDisconnect, status -from freqtrade.enums import LeaderMessageType, ReplicateModeType, RPCMessageType +from freqtrade.enums import ExternalSignalModeType, LeaderMessageType, RPCMessageType from freqtrade.rpc import RPC, RPCHandler -from freqtrade.rpc.replicate.channel import ChannelManager -from freqtrade.rpc.replicate.thread_queue import Queue as ThreadedQueue -from freqtrade.rpc.replicate.types import MessageType -from freqtrade.rpc.replicate.utils import is_websocket_alive +from freqtrade.rpc.external_signal.channel import ChannelManager +from freqtrade.rpc.external_signal.thread_queue import Queue as ThreadedQueue +from freqtrade.rpc.external_signal.types import MessageType +from freqtrade.rpc.external_signal.utils import is_websocket_alive logger = logging.getLogger(__name__) -class ReplicateController(RPCHandler): +class ExternalSignalController(RPCHandler): """ This class handles all websocket communication """ def __init__( @@ -35,9 +34,10 @@ class ReplicateController(RPCHandler): api_server: Union[Any, None] = None ) -> None: """ - Init the ReplicateRPC class, and init the super class RPCHandler + Init the ExternalSignalController class, and init the super class RPCHandler :param rpc: instance of RPC Helper class :param config: Configuration object + :param api_server: The ApiServer object :return: None """ super().__init__(rpc, config) @@ -46,48 +46,50 @@ class ReplicateController(RPCHandler): self.api_server = api_server if not self.api_server: - raise RuntimeError("The API server must be enabled for replicate to work") + raise RuntimeError("The API server must be enabled for external signals to work") self._loop = None self._running = False self._thread = None self._queue = None - self._stop_event = Event() - self._follower_tasks = None + self._main_task = None + self._sub_tasks = None self.channel_manager = ChannelManager() - self.replicate_config = config.get('replicate', {}) + self.external_signal_config = config.get('external_signal', {}) # What the config should look like - # "replicate": { + # "external_signal": { # "enabled": true, # "mode": "follower", # "leaders": [ # { - # "url": "ws://localhost:8080/replicate/ws", - # "token": "test" + # "url": "ws://localhost:8080/signals/ws", + # "api_token": "test" # } # ] # } - # "replicate": { + # "external_signal": { # "enabled": true, # "mode": "leader", - # "api_key": "test" + # "api_token": "test" # } - self.mode = ReplicateModeType[self.replicate_config.get('mode', 'leader').lower()] + self.mode = ExternalSignalModeType[ + self.external_signal_config.get('mode', 'leader').lower() + ] - self.leaders_list = self.replicate_config.get('leaders', []) - self.push_throttle_secs = self.replicate_config.get('push_throttle_secs', 0.1) + self.leaders_list = self.external_signal_config.get('leaders', []) + self.push_throttle_secs = self.external_signal_config.get('push_throttle_secs', 0.1) - self.reply_timeout = self.replicate_config.get('follower_reply_timeout', 10) - self.ping_timeout = self.replicate_config.get('follower_ping_timeout', 2) - self.sleep_time = self.replicate_config.get('follower_sleep_time', 5) + self.reply_timeout = self.external_signal_config.get('follower_reply_timeout', 10) + self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2) + self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5) - if self.mode == ReplicateModeType.follower and len(self.leaders_list) == 0: + if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0: raise ValueError("You must specify at least 1 leader in follower mode.") # This is only used by the leader, the followers use the tokens specified @@ -95,12 +97,23 @@ class ReplicateController(RPCHandler): # If you do not specify an API key in the config, one will be randomly # generated and logged on startup default_api_key = secrets.token_urlsafe(16) - self.secret_api_key = self.replicate_config.get('api_key', default_api_key) + self.secret_api_key = self.external_signal_config.get('api_token', default_api_key) self.start_threaded_loop() - self.start() + def is_leader(self): + """ + Leader flag + """ + return self.enabled() and self.mode == ExternalSignalModeType.leader + + def enabled(self): + """ + Enabled flag + """ + return self.external_signal_config.get('enabled', False) + def start_threaded_loop(self): """ Start the main internal loop in another thread to run coroutines @@ -125,36 +138,29 @@ class ReplicateController(RPCHandler): raise RuntimeError("Loop must be started before any function can" " be submitted") - try: - return asyncio.run_coroutine_threadsafe(coroutine, self._loop) - except Exception as e: - logger.error(f"Error running coroutine - {str(e)}") - return None - - async def main_loop(self): - """ - Main loop coro - - Start the loop based on what mode we're in - """ - try: - if self.mode == ReplicateModeType.leader: - await self.leader_loop() - elif self.mode == ReplicateModeType.follower: - await self.follower_loop() - - except asyncio.CancelledError: - pass - except Exception: - pass - finally: - self._loop.stop() + return asyncio.run_coroutine_threadsafe(coroutine, self._loop) def start(self): """ Start the controller main loop """ - self.submit_coroutine(self.main_loop()) + self._main_task = self.submit_coroutine(self.main()) + + async def shutdown(self): + """ + Shutdown all tasks and close up + """ + logger.info("Stopping rpc.externalsignalcontroller") + + # Flip running flag + self._running = False + + # Cancel sub tasks + for task in self._sub_tasks: + task.cancel() + + # Then disconnect all channels + await self.channel_manager.disconnect_all() def cleanup(self) -> None: """ @@ -162,18 +168,44 @@ class ReplicateController(RPCHandler): """ if self._thread: if self._loop.is_running(): - - self._running = False - - # Tell all coroutines submitted to the loop they're cancelled - pending = asyncio.all_tasks(loop=self._loop) - for task in pending: - task.cancel() - - self._loop.call_soon_threadsafe(self.channel_manager.disconnect_all) - + self._main_task.cancel() self._thread.join() + async def main(self): + """ + Main coro + + Start the loop based on what mode we're in + """ + try: + if self.mode == ExternalSignalModeType.leader: + logger.info("Starting rpc.externalsignalcontroller in Leader mode") + + await self.run_leader_mode() + elif self.mode == ExternalSignalModeType.follower: + logger.info("Starting rpc.externalsignalcontroller in Follower mode") + + await self.run_follower_mode() + + except asyncio.CancelledError: + # We're cancelled + await self.shutdown() + except Exception as e: + # Log the error + logger.error(f"Exception occurred in main task: {e}") + logger.exception(e) + finally: + # This coroutine is the last thing to be ended, so it should stop the loop + self._loop.stop() + + def log_api_token(self): + """ + Log the API token + """ + logger.info("-" * 15) + logger.info(f"API_KEY: {self.secret_api_key}") + logger.info("-" * 15) + def send_msg(self, msg: MessageType) -> None: """ Support RPC calls @@ -186,7 +218,9 @@ class ReplicateController(RPCHandler): logger.error(f"Message is empty! {msg}") def send_message(self, msg: MessageType) -> None: - """ Broadcast message over all channels if there are any """ + """ + Broadcast message over all channels if there are any + """ if self.channel_manager.has_channels(): self._send_message(msg) @@ -205,39 +239,60 @@ class ReplicateController(RPCHandler): else: logger.warning("Can not send data, leader loop has not started yet!") - def is_leader(self): - """ - Leader flag - """ - return self.enabled() and self.mode == ReplicateModeType.leader + async def send_initial_data(self, channel): + logger.info("Sending initial data through channel") - def enabled(self): - """ - Enabled flag - """ - return self.replicate_config.get('enabled', False) + # We first send pairlist data + # We should move this to a func in the RPC object + initial_data = { + "data_type": LeaderMessageType.pairlist, + "data": self.freqtrade.pairlists.whitelist + } - # ----------------------- LEADER LOGIC ------------------------------ + await channel.send(initial_data) - async def leader_loop(self): + async def _handle_leader_message(self, message: MessageType): + """ + Handle message received from a Leader + """ + type = message.get("data_type") + data = message.get("data") + + self._rpc._handle_emitted_data(type, data) + + # ---------------------------------------------------------------------- + + async def run_leader_mode(self): """ Main leader coroutine This starts all of the leader coros and registers the endpoint on the ApiServer """ - logger.info("Running rpc.replicate in Leader mode") - logger.info("-" * 15) - logger.info(f"API_KEY: {self.secret_api_key}") - logger.info("-" * 15) - self.register_leader_endpoint() + self.log_api_token() - try: - await self._broadcast_queue_data() - except Exception as e: - logger.error("Exception occurred in Leader loop: ") - logger.exception(e) + self._sub_tasks = [ + self._loop.create_task(self._broadcast_queue_data()) + ] + + return await asyncio.gather(*self._sub_tasks) + + async def run_follower_mode(self): + """ + Main follower coroutine + + This starts all of the follower connection coros + """ + + rpc_lock = asyncio.Lock() + + self._sub_tasks = [ + self._loop.create_task(self._handle_leader_connection(leader, rpc_lock)) + for leader in self.leaders_list + ] + + return await asyncio.gather(*self._sub_tasks) async def _broadcast_queue_data(self): """ @@ -261,8 +316,6 @@ class ReplicateController(RPCHandler): except asyncio.CancelledError: # Silently stop pass - except Exception as e: - logger.exception(e) async def get_api_token( self, @@ -279,7 +332,7 @@ class ReplicateController(RPCHandler): logger.info("Denying websocket request...") await websocket.close(code=status.WS_1008_POLICY_VIOLATION) - def register_leader_endpoint(self, path: str = "/replicate/ws"): + def register_leader_endpoint(self, path: str = "/signals/ws"): """ Attach and start the main leader loop to the ApiServer @@ -334,73 +387,16 @@ class ReplicateController(RPCHandler): logger.error(f"Failed to serve - {websocket.client}") await self.channel_manager.on_disconnect(websocket) - async def send_initial_data(self, channel): - logger.info("Sending initial data through channel") - - # We first send pairlist data - initial_data = { - "data_type": LeaderMessageType.pairlist, - "data": self.freqtrade.pairlists.whitelist - } - - await channel.send(initial_data) - - # -------------------------------FOLLOWER LOGIC---------------------------- - - async def follower_loop(self): - """ - Main follower coroutine - - This starts all of the follower connection coros - """ - logger.info("Starting rpc.replicate in Follower mode") - - responses = await self._connect_to_leaders() - - # Eventually add the ability to send requests to the Leader - # await self._send_requests() - - for result in responses: - if isinstance(result, Exception): - logger.debug(f"Exception in Follower loop: {result}") - traceback_message = ''.join(traceback.format_tb(result.__traceback__)) - logger.error(traceback_message) - - async def _handle_leader_message(self, message: MessageType): - """ - Handle message received from a Leader - """ - type = message.get("data_type") - data = message.get("data") - - self._rpc._handle_emitted_data(type, data) - - async def _connect_to_leaders(self): - """ - For each leader in `self.leaders_list` create a connection and - listen for data. - """ - rpc_lock = asyncio.Lock() - - logger.info("Starting connections to Leaders...") - - self.follower_tasks = [ - self._loop.create_task(self._handle_leader_connection(leader, rpc_lock)) - for leader in self.leaders_list - ] - return await asyncio.gather(*self.follower_tasks, return_exceptions=True) - async def _handle_leader_connection(self, leader, lock): """ Given a leader, connect and wait on data. If connection is lost, it will attempt to reconnect. """ try: - url, token = leader["url"], leader["token"] + url, token = leader["url"], leader["api_token"] websocket_url = f"{url}?token={token}" logger.info(f"Attempting to connect to Leader at: {url}") - # TODO: limit the amount of connection retries while True: try: async with websockets.connect(websocket_url) as ws: diff --git a/freqtrade/rpc/replicate/proxy.py b/freqtrade/rpc/external_signal/proxy.py similarity index 96% rename from freqtrade/rpc/replicate/proxy.py rename to freqtrade/rpc/external_signal/proxy.py index aae536b6d..36ff4a74e 100644 --- a/freqtrade/rpc/replicate/proxy.py +++ b/freqtrade/rpc/external_signal/proxy.py @@ -3,7 +3,7 @@ from typing import Union from fastapi import WebSocket as FastAPIWebSocket from websockets import WebSocketClientProtocol as WebSocket -from freqtrade.rpc.replicate.types import WebSocketType +from freqtrade.rpc.external_signal.types import WebSocketType class WebSocketProxy: diff --git a/freqtrade/rpc/replicate/serializer.py b/freqtrade/rpc/external_signal/serializer.py similarity index 96% rename from freqtrade/rpc/replicate/serializer.py rename to freqtrade/rpc/external_signal/serializer.py index 98bdc8934..2a0f53037 100644 --- a/freqtrade/rpc/replicate/serializer.py +++ b/freqtrade/rpc/external_signal/serializer.py @@ -5,7 +5,7 @@ from abc import ABC, abstractmethod import msgpack import orjson -from freqtrade.rpc.replicate.proxy import WebSocketProxy +from freqtrade.rpc.external_signal.proxy import WebSocketProxy logger = logging.getLogger(__name__) diff --git a/freqtrade/rpc/replicate/thread_queue.py b/freqtrade/rpc/external_signal/thread_queue.py similarity index 100% rename from freqtrade/rpc/replicate/thread_queue.py rename to freqtrade/rpc/external_signal/thread_queue.py diff --git a/freqtrade/rpc/replicate/types.py b/freqtrade/rpc/external_signal/types.py similarity index 100% rename from freqtrade/rpc/replicate/types.py rename to freqtrade/rpc/external_signal/types.py diff --git a/freqtrade/rpc/replicate/utils.py b/freqtrade/rpc/external_signal/utils.py similarity index 100% rename from freqtrade/rpc/replicate/utils.py rename to freqtrade/rpc/external_signal/utils.py diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 2c7b2ec72..68871a15a 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1109,16 +1109,22 @@ class RPC: external_pairlist.add_pairlist_data(pairlist) elif type == LeaderMessageType.analyzed_df: + # Convert the dataframe back from json key, value = data["key"], data["value"] pair, timeframe, candle_type = key - dataframe = json_to_dataframe(value) - dataprovider = self._freqtrade.dataprovider + # Skip any pairs that we don't have in the pairlist? + # leader_pairlist = self._freqtrade.pairlists._whitelist + # if pair not in leader_pairlist: + # return + + dataframe = json_to_dataframe(value) logger.debug(f"Received analyzed dataframe for {pair}") logger.debug(dataframe.tail()) # Add the dataframe to the dataprovider + dataprovider = self._freqtrade.dataprovider dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index 3d561cc8e..0a0e285a4 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -54,14 +54,14 @@ class RPCManager: # Enable Replicate mode # For this to be enabled, the API server must also be enabled - if config.get('replicate', {}).get('enabled', False): - logger.info('Enabling rpc.replicate') - from freqtrade.rpc.replicate import ReplicateController - replicate_rpc = ReplicateController(self._rpc, config, apiserver) - self.registered_modules.append(replicate_rpc) + if config.get('external_signal', {}).get('enabled', False): + logger.info('Enabling RPC.ExternalSignalController') + from freqtrade.rpc.external_signal import ExternalSignalController + external_signal_rpc = ExternalSignalController(self._rpc, config, apiserver) + self.registered_modules.append(external_signal_rpc) # Attach the controller to FreqTrade - freqtrade.replicate_controller = replicate_rpc + freqtrade.external_signal_controller = external_signal_rpc apiserver.start_api() diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 1084838ec..22a10b4d3 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -18,7 +18,6 @@ from freqtrade.enums.runmode import RunMode from freqtrade.exceptions import OperationalException, StrategyError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds from freqtrade.persistence import Order, PairLocks, Trade -from freqtrade.rpc.replicate import ReplicateController from freqtrade.strategy.hyper import HyperStrategyMixin from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators, _create_and_merge_informative_pair, @@ -111,7 +110,6 @@ class IStrategy(ABC, HyperStrategyMixin): # the dataprovider (dp) (access to other candles, historic data, ...) # and wallets - access to the current balance. dp: DataProvider - replicate_controller: Optional[ReplicateController] wallets: Optional[Wallets] = None # Filled from configuration stake_currency: str @@ -764,7 +762,7 @@ class IStrategy(ABC, HyperStrategyMixin): if not external_data: dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type) else: - dataframe, last_analyzed = self.dp.get_external_df(pair, self.timeframe, candle_type) + dataframe, _ = self.dp.get_external_df(pair, self.timeframe, candle_type) if not isinstance(dataframe, DataFrame) or dataframe.empty: logger.warning('Empty candle (OHLCV) data for pair %s', pair)