diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 21cead77f..3de73bb0d 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -7,6 +7,7 @@ Common Interface for bot and strategy to access data. import logging from collections import deque from datetime import datetime, timezone +from threading import Event from typing import Any, Dict, List, Optional, Tuple from pandas import DataFrame @@ -28,13 +29,16 @@ MAX_DATAFRAME_CANDLES = 1000 class DataProvider: - def __init__(self, config: dict, exchange: Optional[Exchange], pairlists=None) -> None: + def __init__(self, config: dict, exchange: Optional[Exchange], + pairlists=None, replicate_controller=None) -> None: self._config = config self._exchange = exchange self._pairlists = pairlists self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} self.__slice_index: Optional[int] = None self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {} + self.__external_pairs_df: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} + self.__external_pairs_event: Dict[str, Event] = {} self._msg_queue: deque = deque() self.__msg_cache = PeriodicCache( @@ -63,9 +67,58 @@ class DataProvider: :param dataframe: analyzed dataframe :param candle_type: Any of the enum CandleType (must match trading mode!) """ - self.__cached_pairs[(pair, timeframe, candle_type)] = ( + pair_key = (pair, timeframe, candle_type) + self.__cached_pairs[pair_key] = ( dataframe, datetime.now(timezone.utc)) + def add_external_df( + self, + pair: str, + timeframe: str, + dataframe: DataFrame, + candle_type: CandleType + ) -> None: + """ + Add the DataFrame to the __external_pairs_df. If a pair event exists, + set it to release the main thread from waiting. + """ + pair_key = (pair, timeframe, candle_type) + + # Delete stale data + if pair_key in self.__external_pairs_df: + del self.__external_pairs_df[pair_key] + + self.__external_pairs_df[pair_key] = (dataframe, datetime.now(timezone.utc)) + + pair_event = self.__external_pairs_event.get(pair) + if pair_event: + logger.debug(f"Leader data for pair {pair_key} has been added") + pair_event.set() + + def get_external_df( + self, + pair: str, + timeframe: str, + candle_type: CandleType + ) -> DataFrame: + """ + If the pair exists in __external_pairs_df, return it. If it doesn't, + create a new threading Event in __external_pairs_event and wait on it. + """ + pair_key = (pair, timeframe, candle_type) + if pair_key not in self.__external_pairs_df: + pair_event = Event() + self.__external_pairs_event[pair] = pair_event + + logger.debug(f"Waiting on Leader data for: {pair_key}") + self.__external_pairs_event[pair].wait() + + if pair_key in self.__external_pairs_df: + return self.__external_pairs_df[pair_key] + + # Because of the waiting mechanism, this should never return + return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc)) + def add_pairlisthandler(self, pairlists) -> None: """ Allow adding pairlisthandler after initialization diff --git a/freqtrade/enums/replicate.py b/freqtrade/enums/replicate.py index 73be996c0..8d036f0b9 100644 --- a/freqtrade/enums/replicate.py +++ b/freqtrade/enums/replicate.py @@ -8,3 +8,4 @@ class ReplicateModeType(str, Enum): class LeaderMessageType(str, Enum): pairlist = "pairlist" + analyzed_df = "analyzed_df" diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index b2ec1448e..3b850dd4e 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -23,7 +23,7 @@ from freqtrade.exceptions import (DependencyException, ExchangeError, Insufficie InvalidOrderException, PricingError) from freqtrade.exchange import timeframe_to_minutes, timeframe_to_seconds from freqtrade.exchange.exchange import timeframe_to_next_date -from freqtrade.misc import safe_value_fallback, safe_value_fallback2 +from freqtrade.misc import dataframe_to_json, safe_value_fallback, safe_value_fallback2 from freqtrade.mixins import LoggingMixin from freqtrade.persistence import Order, PairLocks, Trade, init_db from freqtrade.plugins.pairlistmanager import PairListManager @@ -77,6 +77,8 @@ class FreqtradeBot(LoggingMixin): self.replicate_controller = None + self.pairlists = PairListManager(self.exchange, self.config) + # RPC runs in separate threads, can start handling external commands just after # initialization, even before Freqtradebot has a chance to start its throttling, # so anything in the Freqtradebot instance should be ready (initialized), including @@ -84,8 +86,6 @@ class FreqtradeBot(LoggingMixin): # Keep this at the end of this initialization method. self.rpc: RPCManager = RPCManager(self) - self.pairlists = PairListManager(self.exchange, self.config) - self.dataprovider = DataProvider(self.config, self.exchange, self.pairlists) # Attach Dataprovider to strategy instance @@ -93,6 +93,9 @@ class FreqtradeBot(LoggingMixin): # Attach Wallets to strategy instance self.strategy.wallets = self.wallets + # Attach ReplicateController to the strategy + # self.strategy.replicate_controller = self.replicate_controller + # Initializing Edge only if enabled self.edge = Edge(self.config, self.exchange, self.strategy) if \ self.config.get('edge', {}).get('enabled', False) else None @@ -194,7 +197,28 @@ class FreqtradeBot(LoggingMixin): strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)() - self.strategy.analyze(self.active_pair_whitelist) + if self.replicate_controller: + if not self.replicate_controller.is_leader(): + # Run Follower mode analyzing + leader_pairs = self.pairlists._whitelist + self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs) + else: + # We are leader, make sure to pass callback func to emit data + def emit_on_finish(pair, dataframe, timeframe, candle_type): + logger.debug(f"Emitting dataframe for {pair}") + return self.rpc.emit_data( + { + "data_type": LeaderMessageType.analyzed_df, + "data": { + "key": (pair, timeframe, candle_type), + "value": dataframe_to_json(dataframe) + } + } + ) + + self.strategy.analyze(self.active_pair_whitelist, finish_callback=emit_on_finish) + else: + self.strategy.analyze(self.active_pair_whitelist) with self._exit_lock: # Check for exchange cancelations, timeouts and user requested replace @@ -264,14 +288,13 @@ class FreqtradeBot(LoggingMixin): # Or should this class be made available to the PairListManager and ran # when filter_pairlist is called? + if self.replicate_controller: if self.replicate_controller.is_leader(): - self.replicate_controller.send_message( - { - "data_type": LeaderMessageType.pairlist, - "data": _whitelist - } - ) + self.rpc.emit_data({ + "data_type": LeaderMessageType.pairlist, + "data": _whitelist + }) # Calculating Edge positioning if self.edge: diff --git a/freqtrade/misc.py b/freqtrade/misc.py index c3968e61c..bc644a7ec 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -10,6 +10,7 @@ from typing import Any, Iterator, List from typing.io import IO from urllib.parse import urlparse +import pandas import rapidjson from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN @@ -249,3 +250,24 @@ def parse_db_uri_for_logging(uri: str): return uri pwd = parsed_db_uri.netloc.split(':')[1].split('@')[0] return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@') + + +def dataframe_to_json(dataframe: pandas.DataFrame) -> str: + """ + Serialize a DataFrame for transmission over the wire using JSON + :param dataframe: A pandas DataFrame + :returns: A JSON string of the pandas DataFrame + """ + return dataframe.to_json(orient='records') + + +def json_to_dataframe(data: str) -> pandas.DataFrame: + """ + Deserialize JSON into a DataFrame + :param data: A JSON string + :returns: A pandas DataFrame from the JSON string + """ + dataframe = pandas.read_json(data) + dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) + + return dataframe diff --git a/freqtrade/plugins/pairlist/ExternalPairList.py b/freqtrade/plugins/pairlist/ExternalPairList.py index 82fc12ff9..bd36c7cf3 100644 --- a/freqtrade/plugins/pairlist/ExternalPairList.py +++ b/freqtrade/plugins/pairlist/ExternalPairList.py @@ -81,11 +81,10 @@ class ExternalPairList(IPairList): seen = set(self._leader_pairs) for pair in pairlist: if pair in seen: - logger.debug(f"Encountered already existing pair {pair}") continue self._leader_pairs.append(pair) - if not self._has_data.is_set(): + if not self._has_data.is_set() and len(self._leader_pairs) > 0: self._has_data.set() def gen_pairlist(self, tickers: Dict) -> List[str]: diff --git a/freqtrade/rpc/replicate/__init__.py b/freqtrade/rpc/replicate/__init__.py index fd718197e..5cc2ae6a9 100644 --- a/freqtrade/rpc/replicate/__init__.py +++ b/freqtrade/rpc/replicate/__init__.py @@ -5,6 +5,7 @@ import asyncio import logging import secrets import socket +import traceback from threading import Event, Thread from typing import Any, Coroutine, Dict, Union @@ -17,6 +18,7 @@ from freqtrade.enums import LeaderMessageType, ReplicateModeType, RPCMessageType from freqtrade.rpc import RPC, RPCHandler from freqtrade.rpc.replicate.channel import ChannelManager from freqtrade.rpc.replicate.thread_queue import Queue as ThreadedQueue +from freqtrade.rpc.replicate.types import MessageType from freqtrade.rpc.replicate.utils import is_websocket_alive @@ -79,11 +81,11 @@ class ReplicateController(RPCHandler): self.mode = ReplicateModeType[self.replicate_config.get('mode', 'leader').lower()] self.leaders_list = self.replicate_config.get('leaders', []) - self.push_throttle_secs = self.replicate_config.get('push_throttle_secs', 1) + self.push_throttle_secs = self.replicate_config.get('push_throttle_secs', 0.1) self.reply_timeout = self.replicate_config.get('follower_reply_timeout', 10) self.ping_timeout = self.replicate_config.get('follower_ping_timeout', 2) - self.sleep_time = self.replicate_config.get('follower_sleep_time', 1) + self.sleep_time = self.replicate_config.get('follower_sleep_time', 5) if self.mode == ReplicateModeType.follower and len(self.leaders_list) == 0: raise ValueError("You must specify at least 1 leader in follower mode.") @@ -143,6 +145,8 @@ class ReplicateController(RPCHandler): except asyncio.CancelledError: pass + except Exception: + pass finally: self._loop.stop() @@ -170,22 +174,19 @@ class ReplicateController(RPCHandler): self._thread.join() - def send_msg(self, msg: Dict[str, Any]) -> None: + def send_msg(self, msg: MessageType) -> None: """ Support RPC calls """ if msg["type"] == RPCMessageType.EMIT_DATA: - self.send_message( - { - "data_type": msg.get("data_type"), - "data": msg.get("data") - } - ) + message = msg.get("message") + if message: + self.send_message(message) + else: + logger.error(f"Message is empty! {msg}") - def send_message(self, msg: Dict[str, Any]) -> None: - """ Push message through """ - - # We should probably do some type of schema validation here + def send_message(self, msg: MessageType) -> None: + """ Broadcast message over all channels if there are any """ if self.channel_manager.has_channels(): self._send_message(msg) @@ -193,12 +194,11 @@ class ReplicateController(RPCHandler): logger.debug("No listening followers, skipping...") pass - def _send_message(self, msg: Dict[Any, Any]): + def _send_message(self, msg: MessageType): """ Add data to the internal queue to be broadcasted. This func will block if the queue is full. This is meant to be called in the main thread. """ - if self._queue: queue = self._queue.sync_q queue.put(msg) # This will block if the queue is full @@ -226,7 +226,6 @@ class ReplicateController(RPCHandler): This starts all of the leader coros and registers the endpoint on the ApiServer """ - logger.info("Running rpc.replicate in Leader mode") logger.info("-" * 15) logger.info(f"API_KEY: {self.secret_api_key}") @@ -253,16 +252,17 @@ class ReplicateController(RPCHandler): # Get data from queue data = await async_queue.get() - logger.info(f"Found data - broadcasting: {data}") - # Broadcast it to everyone await self.channel_manager.broadcast(data) # Sleep await asyncio.sleep(self.push_throttle_secs) + except asyncio.CancelledError: # Silently stop pass + except Exception as e: + logger.exception(e) async def get_api_token( self, @@ -285,7 +285,6 @@ class ReplicateController(RPCHandler): :param path: The endpoint path """ - if not self.api_server: raise RuntimeError("The leader needs the ApiServer to be active") @@ -312,10 +311,13 @@ class ReplicateController(RPCHandler): # we may not have to send initial data at all. Further testing # required. + await self.send_initial_data(channel) + # Keep connection open until explicitly closed, and sleep try: while not channel.is_closed(): - await channel.recv() + request = await channel.recv() + logger.info(f"Follower request - {request}") except WebSocketDisconnect: # Handle client disconnects @@ -332,6 +334,17 @@ class ReplicateController(RPCHandler): logger.error(f"Failed to serve - {websocket.client}") await self.channel_manager.on_disconnect(websocket) + async def send_initial_data(self, channel): + logger.info("Sending initial data through channel") + + # We first send pairlist data + initial_data = { + "data_type": LeaderMessageType.pairlist, + "data": self.freqtrade.pairlists.whitelist + } + + await channel.send(initial_data) + # -------------------------------FOLLOWER LOGIC---------------------------- async def follower_loop(self): @@ -340,18 +353,27 @@ class ReplicateController(RPCHandler): This starts all of the follower connection coros """ - logger.info("Starting rpc.replicate in Follower mode") - try: - results = await self._connect_to_leaders() - except Exception as e: - logger.error("Exception occurred in Follower loop: ") - logger.exception(e) - finally: - for result in results: - if isinstance(result, Exception): - logger.debug(f"Exception in Follower loop: {result}") + responses = await self._connect_to_leaders() + + # Eventually add the ability to send requests to the Leader + # await self._send_requests() + + for result in responses: + if isinstance(result, Exception): + logger.debug(f"Exception in Follower loop: {result}") + traceback_message = ''.join(traceback.format_tb(result.__traceback__)) + logger.error(traceback_message) + + async def _handle_leader_message(self, message: MessageType): + """ + Handle message received from a Leader + """ + type = message.get("data_type") + data = message.get("data") + + self._rpc._handle_emitted_data(type, data) async def _connect_to_leaders(self): """ @@ -375,7 +397,6 @@ class ReplicateController(RPCHandler): """ try: url, token = leader["url"], leader["token"] - websocket_url = f"{url}?token={token}" logger.info(f"Attempting to connect to Leader at: {url}") @@ -384,6 +405,7 @@ class ReplicateController(RPCHandler): try: async with websockets.connect(websocket_url) as ws: channel = await self.channel_manager.on_connect(ws) + logger.info(f"Connection to Leader at {url} successful") while True: try: data = await asyncio.wait_for( @@ -420,13 +442,3 @@ class ReplicateController(RPCHandler): except asyncio.CancelledError: pass - - async def _handle_leader_message(self, message: Dict[str, Any]): - type = message.get('data_type') - data = message.get('data') - - logger.info(f"Received message from Leader: {type} - {data}") - - if type == LeaderMessageType.pairlist: - # Add the data to the ExternalPairlist - self.freqtrade.pairlists._pairlist_handlers[0].add_pairlist_data(data) diff --git a/freqtrade/rpc/replicate/channel.py b/freqtrade/rpc/replicate/channel.py index 7aa316ff5..62ed3e025 100644 --- a/freqtrade/rpc/replicate/channel.py +++ b/freqtrade/rpc/replicate/channel.py @@ -2,7 +2,7 @@ import logging from typing import Type from freqtrade.rpc.replicate.proxy import WebSocketProxy -from freqtrade.rpc.replicate.serializer import JSONWebSocketSerializer, WebSocketSerializer +from freqtrade.rpc.replicate.serializer import MsgPackWebSocketSerializer, WebSocketSerializer from freqtrade.rpc.replicate.types import WebSocketType @@ -17,7 +17,7 @@ class WebSocketChannel: def __init__( self, websocket: WebSocketType, - serializer_cls: Type[WebSocketSerializer] = JSONWebSocketSerializer + serializer_cls: Type[WebSocketSerializer] = MsgPackWebSocketSerializer ): # The WebSocket object self._websocket = WebSocketProxy(websocket) @@ -34,6 +34,7 @@ class WebSocketChannel: """ Send data on the wrapped websocket """ + # logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}") await self._wrapped_ws.send(data) async def recv(self): @@ -116,6 +117,17 @@ class ChannelManager: # Handle cannot send after close cases await self.on_disconnect(websocket) + async def send_direct(self, channel, data): + """ + Send data directly through direct_channel only + + :param direct_channel: The WebSocketChannel object to send data through + :param data: The data to send + """ + # We iterate over the channels to get reference to the websocket object + # so we can disconnect incase of failure + await channel.send(data) + def has_channels(self): """ Flag for more than 0 channels diff --git a/freqtrade/rpc/replicate/serializer.py b/freqtrade/rpc/replicate/serializer.py index 717458f09..98bdc8934 100644 --- a/freqtrade/rpc/replicate/serializer.py +++ b/freqtrade/rpc/replicate/serializer.py @@ -1,9 +1,16 @@ import json +import logging from abc import ABC, abstractmethod +import msgpack +import orjson + from freqtrade.rpc.replicate.proxy import WebSocketProxy +logger = logging.getLogger(__name__) + + class WebSocketSerializer(ABC): def __init__(self, websocket: WebSocketProxy): self._websocket: WebSocketProxy = websocket @@ -34,9 +41,25 @@ class WebSocketSerializer(ABC): class JSONWebSocketSerializer(WebSocketSerializer): def _serialize(self, data): - # json expects string not bytes return json.dumps(data) def _deserialize(self, data): - # The WebSocketSerializer gives bytes not string return json.loads(data) + + +class ORJSONWebSocketSerializer(WebSocketSerializer): + ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY + + def _serialize(self, data): + return orjson.dumps(data, option=self.ORJSON_OPTIONS) + + def _deserialize(self, data): + return orjson.loads(data, option=self.ORJSON_OPTIONS) + + +class MsgPackWebSocketSerializer(WebSocketSerializer): + def _serialize(self, data): + return msgpack.packb(data, use_bin_type=True) + + def _deserialize(self, data): + return msgpack.unpackb(data, raw=False) diff --git a/freqtrade/rpc/replicate/types.py b/freqtrade/rpc/replicate/types.py index 763147196..814fe6649 100644 --- a/freqtrade/rpc/replicate/types.py +++ b/freqtrade/rpc/replicate/types.py @@ -1,7 +1,8 @@ -from typing import TypeVar +from typing import Any, Dict, TypeVar from fastapi import WebSocket as FastAPIWebSocket from websockets import WebSocketClientProtocol as WebSocket WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket) +MessageType = Dict[str, Any] diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index ed7f13a96..2c7b2ec72 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -19,12 +19,12 @@ from freqtrade.configuration.timerange import TimeRange from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT from freqtrade.data.history import load_data from freqtrade.data.metrics import calculate_max_drawdown -from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, State, - TradingMode) +from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, LeaderMessageType, + SignalDirection, State, TradingMode) from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler -from freqtrade.misc import decimals_per_coin, shorten_date +from freqtrade.misc import decimals_per_coin, json_to_dataframe, shorten_date from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence.models import PairLock from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist @@ -1089,3 +1089,36 @@ class RPC: 'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT), 'last_process_ts': int(last_p.timestamp()), } + + def _handle_emitted_data(self, type, data): + """ + Handles the emitted data from the Leaders + + :param type: The data_type of the data + :param data: The data + """ + logger.debug(f"Handling emitted data of type ({type})") + + if type == LeaderMessageType.pairlist: + pairlist = data + + logger.debug(pairlist) + + # Add the pairlist data to the ExternalPairList object + external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0] + external_pairlist.add_pairlist_data(pairlist) + + elif type == LeaderMessageType.analyzed_df: + # Convert the dataframe back from json + key, value = data["key"], data["value"] + + pair, timeframe, candle_type = key + dataframe = json_to_dataframe(value) + + dataprovider = self._freqtrade.dataprovider + + logger.debug(f"Received analyzed dataframe for {pair}") + logger.debug(dataframe.tail()) + + # Add the dataframe to the dataprovider + dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index 8eaec21ea..3d561cc8e 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -20,6 +20,7 @@ class RPCManager: def __init__(self, freqtrade) -> None: """ Initializes all enabled rpc modules """ self.registered_modules: List[RPCHandler] = [] + self._freqtrade = freqtrade self._rpc = RPC(freqtrade) config = freqtrade.config # Enable telegram @@ -82,7 +83,8 @@ class RPCManager: 'status': 'stopping bot' } """ - logger.info('Sending rpc message: %s', msg) + if msg.get("type") != RPCMessageType.EMIT_DATA: + logger.info('Sending rpc message: %s', msg) if 'pair' in msg: msg.update({ 'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair']) @@ -141,3 +143,12 @@ class RPCManager: 'type': RPCMessageType.STARTUP, 'status': f'Using Protections: \n{prots}' }) + + def emit_data(self, data: Dict[str, Any]): + """ + Send a message via RPC with type RPCMessageType.EMIT_DATA + """ + self.send_msg({ + "type": RPCMessageType.EMIT_DATA, + "message": data + }) diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 79dbd4c69..ddd10dd8e 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -5,7 +5,7 @@ This module defines the interface to apply for strategies import logging from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone -from typing import Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Union import arrow from pandas import DataFrame @@ -18,6 +18,7 @@ from freqtrade.enums.runmode import RunMode from freqtrade.exceptions import OperationalException, StrategyError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds from freqtrade.persistence import Order, PairLocks, Trade +from freqtrade.rpc.replicate import ReplicateController from freqtrade.strategy.hyper import HyperStrategyMixin from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators, _create_and_merge_informative_pair, @@ -110,6 +111,7 @@ class IStrategy(ABC, HyperStrategyMixin): # the dataprovider (dp) (access to other candles, historic data, ...) # and wallets - access to the current balance. dp: DataProvider + replicate_controller: Optional[ReplicateController] wallets: Optional[Wallets] = None # Filled from configuration stake_currency: str @@ -123,6 +125,7 @@ class IStrategy(ABC, HyperStrategyMixin): self.config = config # Dict to determine if analysis is necessary self._last_candle_seen_per_pair: Dict[str, datetime] = {} + self._last_candle_seen_external: Dict[str, datetime] = {} super().__init__(config) # Gather informative pairs from @informative-decorated methods. @@ -678,7 +681,12 @@ class IStrategy(ABC, HyperStrategyMixin): lock_time = timeframe_to_next_date(self.timeframe, candle_date) return PairLocks.is_pair_locked(pair, lock_time, side=side) - def analyze_ticker(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + def analyze_ticker( + self, + dataframe: DataFrame, + metadata: dict, + populate_indicators: bool = True + ) -> DataFrame: """ Parses the given candle (OHLCV) data and returns a populated DataFrame add several TA indicators and entry order signal to it @@ -687,12 +695,19 @@ class IStrategy(ABC, HyperStrategyMixin): :return: DataFrame of candle (OHLCV) data with indicator data and signals added """ logger.debug("TA Analysis Launched") - dataframe = self.advise_indicators(dataframe, metadata) + if populate_indicators: + dataframe = self.advise_indicators(dataframe, metadata) dataframe = self.advise_entry(dataframe, metadata) dataframe = self.advise_exit(dataframe, metadata) return dataframe - def _analyze_ticker_internal(self, dataframe: DataFrame, metadata: dict) -> DataFrame: + def _analyze_ticker_internal( + self, + dataframe: DataFrame, + metadata: dict, + external_data: bool = False, + finish_callback: Optional[Callable] = None, + ) -> DataFrame: """ Parses the given candle (OHLCV) data and returns a populated DataFrame add several TA indicators and buy signal to it @@ -707,12 +722,19 @@ class IStrategy(ABC, HyperStrategyMixin): # always run if process_only_new_candles is set to false if (not self.process_only_new_candles or self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]['date']): + + populate_indicators = not external_data # Defs that only make change on new candle data. - dataframe = self.analyze_ticker(dataframe, metadata) + dataframe = self.analyze_ticker(dataframe, metadata, populate_indicators) + self._last_candle_seen_per_pair[pair] = dataframe.iloc[-1]['date'] - self.dp._set_cached_df( - pair, self.timeframe, dataframe, - candle_type=self.config.get('candle_type_def', CandleType.SPOT)) + + candle_type = self.config.get('candle_type_def', CandleType.SPOT) + self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) + + if finish_callback: + finish_callback(pair, dataframe, self.timeframe, candle_type) + else: logger.debug("Skipping TA Analysis for already analyzed candle") dataframe[SignalType.ENTER_LONG.value] = 0 @@ -726,16 +748,25 @@ class IStrategy(ABC, HyperStrategyMixin): return dataframe - def analyze_pair(self, pair: str) -> None: + def analyze_pair( + self, + pair: str, + external_data: bool = False, + finish_callback: Optional[Callable] = None, + ) -> None: """ Fetch data for this pair from dataprovider and analyze. Stores the dataframe into the dataprovider. The analyzed dataframe is then accessible via `dp.get_analyzed_dataframe()`. :param pair: Pair to analyze. """ - dataframe = self.dp.ohlcv( - pair, self.timeframe, candle_type=self.config.get('candle_type_def', CandleType.SPOT) - ) + candle_type = self.config.get('candle_type_def', CandleType.SPOT) + + if not external_data: + dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type) + else: + dataframe, last_analyzed = self.dp.get_external_df(pair, self.timeframe, candle_type) + if not isinstance(dataframe, DataFrame) or dataframe.empty: logger.warning('Empty candle (OHLCV) data for pair %s', pair) return @@ -745,7 +776,7 @@ class IStrategy(ABC, HyperStrategyMixin): dataframe = strategy_safe_wrapper( self._analyze_ticker_internal, message="" - )(dataframe, {'pair': pair}) + )(dataframe, {'pair': pair}, external_data, finish_callback) self.assert_df(dataframe, df_len, df_close, df_date) except StrategyError as error: @@ -756,15 +787,43 @@ class IStrategy(ABC, HyperStrategyMixin): logger.warning('Empty dataframe for pair %s', pair) return - def analyze(self, pairs: List[str]) -> None: + def analyze( + self, + pairs: List[str], + finish_callback: Optional[Callable] = None + ) -> None: """ Analyze all pairs using analyze_pair(). :param pairs: List of pairs to analyze """ for pair in pairs: + self.analyze_pair(pair, finish_callback=finish_callback) + + def analyze_external(self, pairs: List[str], leader_pairs: List[str]) -> None: + """ + Analyze the pre-populated dataframes from the Leader + + :param pairs: The active pair whitelist + :param leader_pairs: The list of pairs from the Leaders + """ + + # Get the extra pairs not listed in Leader pairs, and process + # them normally. + # List order is not preserved when doing this! + # We use ^ instead of - for symmetric difference + # What do we do with these? + extra_pairs = list(set(pairs) ^ set(leader_pairs)) + # These would be the pairs that we have trades in, which means + # we would have to analyze them normally + + for pair in leader_pairs: + # Analyze the pairs, but get the dataframe from the external data + self.analyze_pair(pair, external_data=True) + + for pair in extra_pairs: self.analyze_pair(pair) - @staticmethod + @ staticmethod def preserve_df(dataframe: DataFrame) -> Tuple[int, float, datetime]: """ keep some data for dataframes """ return len(dataframe), dataframe["close"].iloc[-1], dataframe["date"].iloc[-1] @@ -1185,6 +1244,9 @@ class IStrategy(ABC, HyperStrategyMixin): dataframe = _create_and_merge_informative_pair( self, dataframe, metadata, inf_data, populate_fn) + # If in follower mode, get analyzed dataframe from leader df's in dp + # otherise run populate_indicators + return self.populate_indicators(dataframe, metadata) def advise_entry(self, dataframe: DataFrame, metadata: dict) -> DataFrame: diff --git a/requirements-replicate.txt b/requirements-replicate.txt index 7ee351d9d..2c994ea2f 100644 --- a/requirements-replicate.txt +++ b/requirements-replicate.txt @@ -3,3 +3,4 @@ # Required for follower websockets +msgpack