Remove data waiting; remove explicit analyzing of external dataframes

This commit is contained in:
Timothy Pogue 2022-08-31 10:40:26 -06:00
parent 115a901773
commit 510cf4f305
8 changed files with 182 additions and 207 deletions

View File

@ -7,7 +7,6 @@ Common Interface for bot and strategy to access data.
import logging
from collections import deque
from datetime import datetime, timezone
from threading import Event
from typing import Any, Dict, List, Optional, Tuple
from pandas import DataFrame
@ -15,9 +14,11 @@ from pandas import DataFrame
from freqtrade.configuration import TimeRange
from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RunMode, WaitDataPolicy
from freqtrade.enums import CandleType, RPCMessageType, RunMode
from freqtrade.exceptions import ExchangeError, OperationalException
from freqtrade.exchange import Exchange, timeframe_to_seconds
from freqtrade.misc import dataframe_to_json
from freqtrade.rpc import RPCManager
from freqtrade.util import PeriodicCache
@ -33,16 +34,18 @@ class DataProvider:
self,
config: dict,
exchange: Optional[Exchange],
rpc: Optional[RPCManager] = None,
pairlists=None
) -> None:
self._config = config
self._exchange = exchange
self._pairlists = pairlists
self.__rpc = rpc
self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
self.__slice_index: Optional[int] = None
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
self.__external_pairs_df: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
self.__external_pairs_event: Dict[PairWithTimeframe, Tuple[int, Event]] = {}
self.__producer_pairs: List[str] = []
self._msg_queue: deque = deque()
self.__msg_cache = PeriodicCache(
@ -51,10 +54,7 @@ class DataProvider:
self._num_sources = len(
self._config.get('external_message_consumer', {}).get('producers', [])
)
self._wait_data_policy = self._config.get('external_message_consumer', {}).get(
'wait_data_policy', WaitDataPolicy.all)
self._wait_data_timeout = self._config.get('external_message_consumer', {}).get(
'wait_data_timeout', 5)
self.external_data_enabled = self._num_sources > 0
def _set_dataframe_max_index(self, limit_index: int):
"""
@ -83,6 +83,46 @@ class DataProvider:
self.__cached_pairs[pair_key] = (
dataframe, datetime.now(timezone.utc))
# For multiple producers we will want to merge the pairlists instead of overwriting
def set_producer_pairs(self, pairlist: List[str]):
"""
Set the pairs received to later be used.
This only supports 1 Producer right now.
:param pairlist: List of pairs
"""
self.__producer_pairs = pairlist.copy()
def get_producer_pairs(self) -> List[str]:
"""
Get the pairs cached from the producer
:returns: List of pairs
"""
return self.__producer_pairs
def emit_df(
self,
pair_key: PairWithTimeframe,
dataframe: DataFrame
) -> None:
"""
Send this dataframe as an ANALYZED_DF message to RPC
:param pair_key: PairWithTimeframe tuple
:param data: Tuple containing the DataFrame and the datetime it was cached
"""
if self.__rpc:
self.__rpc.send_msg(
{
'type': RPCMessageType.ANALYZED_DF,
'data': {
'key': pair_key,
'value': dataframe_to_json(dataframe)
}
}
)
def add_external_df(
self,
pair: str,
@ -101,7 +141,6 @@ class DataProvider:
# For multiple leaders, if the data already exists, we'd merge
self.__external_pairs_df[pair_key] = (dataframe, datetime.now(timezone.utc))
self._set_data_event(pair_key)
def get_external_df(
self,
@ -120,59 +159,11 @@ class DataProvider:
pair_key = (pair, timeframe, candle_type)
if pair_key not in self.__external_pairs_df:
self._wait_on_data(pair_key)
if pair_key not in self.__external_pairs_df:
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
# We don't have this data yet, return empty DataFrame and datetime (01-01-1970)
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
return self.__external_pairs_df[pair_key]
def _set_data_event(self, key: PairWithTimeframe):
"""
Depending on the WaitDataPolicy, if an event exists for this PairWithTimeframe
then set the event to release main thread from waiting.
:param key: PairWithTimeframe
"""
pair_event = self.__external_pairs_event.get(key)
if pair_event:
num_concat, event = pair_event
self.__external_pairs_event[key] = (num_concat + 1, event)
if self._wait_data_policy == WaitDataPolicy.one:
logger.debug("Setting Data as policy is One")
event.set()
elif self._wait_data_policy == WaitDataPolicy.all and num_concat == self._num_sources:
logger.debug("Setting Data as policy is all, and is complete")
event.set()
del self.__external_pairs_event[key]
def _wait_on_data(self, key: PairWithTimeframe):
"""
Depending on the WaitDataPolicy, we will create and wait on an event until
set that determines the full amount of data is available
:param key: PairWithTimeframe
"""
if self._wait_data_policy is not WaitDataPolicy.none:
pair, timeframe, candle_type = key
pair_event = Event()
self.__external_pairs_event[key] = (0, pair_event)
timeout = self._wait_data_timeout \
if self._wait_data_policy is not WaitDataPolicy.all else 0
timeout_str = f"for {timeout} seconds" if timeout > 0 else "indefinitely"
logger.debug(f"Waiting for external data on {pair} for {timeout_str}")
if timeout > 0:
pair_event.wait(timeout=timeout)
else:
pair_event.wait()
def add_pairlisthandler(self, pairlists) -> None:
"""
Allow adding pairlisthandler after initialization

View File

@ -3,7 +3,6 @@ from freqtrade.enums.backteststate import BacktestState
from freqtrade.enums.candletype import CandleType
from freqtrade.enums.exitchecktuple import ExitCheckTuple
from freqtrade.enums.exittype import ExitType
from freqtrade.enums.externalmessages import WaitDataPolicy
from freqtrade.enums.marginmode import MarginMode
from freqtrade.enums.ordertypevalue import OrderTypeValues
from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType

View File

@ -1,7 +0,0 @@
from enum import Enum
class WaitDataPolicy(str, Enum):
none = "none"
one = "one"
all = "all"

View File

@ -85,21 +85,19 @@ class FreqtradeBot(LoggingMixin):
# Keep this at the end of this initialization method.
self.rpc: RPCManager = RPCManager(self)
self.dataprovider = DataProvider(self.config, self.exchange, self.pairlists)
self.dataprovider = DataProvider(self.config, self.exchange, self.rpc, self.pairlists)
# Attach Dataprovider to strategy instance
self.strategy.dp = self.dataprovider
# Attach Wallets to strategy instance
self.strategy.wallets = self.wallets
# Attach rpc to strategy instance
self.strategy.rpc = self.rpc
# Initializing Edge only if enabled
self.edge = Edge(self.config, self.exchange, self.strategy) if \
self.config.get('edge', {}).get('enabled', False) else None
# Init ExternalMessageConsumer if enabled
self.emc = ExternalMessageConsumer(self.rpc._rpc, self.config) if \
self.emc = ExternalMessageConsumer(self.config, self.dataprovider) if \
self.config.get('external_message_consumer', {}).get('enabled', False) else None
self.active_pair_whitelist = self._refresh_active_whitelist()
@ -201,11 +199,11 @@ class FreqtradeBot(LoggingMixin):
strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)()
if self.emc:
leader_pairs = self.pairlists._whitelist
self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs)
else:
self.strategy.analyze(self.active_pair_whitelist)
# This just means we won't broadcast dataframes if we're listening to a producer
# Doesn't necessarily NEED to be this way, as maybe we'd like to broadcast
# even if we are using external dataframes in the future.
self.strategy.analyze(self.active_pair_whitelist,
external_data=self.dataprovider.external_data_enabled)
with self._exit_lock:
# Check for exchange cancelations, timeouts and user requested replace

View File

@ -1,22 +1,48 @@
import logging
from typing import Any, Dict
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc_optional
from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.rpc.api_server.deps import get_channel_manager
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
from freqtrade.rpc.api_server.ws.utils import is_websocket_alive
# from typing import Any, Dict
logger = logging.getLogger(__name__)
# Private router, protected by API Key authentication
router = APIRouter()
# We are passed a Channel object, so we can only call sync functions on that channel object
def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel):
type, data = request.get('type'), request.get('data')
# If the request is empty, do nothing
if not data:
return
# If we have a request of type SUBSCRIBE, set the topics in this channel
if type == RPCRequestType.SUBSCRIBE:
if isinstance(data, list):
logger.error(f"Improper request from channel: {channel} - {request}")
return
# If all topics passed are a valid RPCMessageType, set subscriptions on channel
if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
logger.debug(f"{channel} subscribed to topics: {data}")
channel.set_subscriptions(data)
@router.websocket("/message/ws")
async def message_endpoint(
ws: WebSocket,
channel_manager=Depends(get_channel_manager),
rpc=Depends(get_rpc_optional)
channel_manager=Depends(get_channel_manager)
):
try:
if is_websocket_alive(ws):
@ -32,9 +58,8 @@ async def message_endpoint(
request = await channel.recv()
# Process the request here. Should this be a method of RPC?
if rpc:
logger.info(f"Request: {request}")
rpc._process_consumer_request(request, channel)
logger.info(f"Request: {request}")
_process_consumer_request(request, channel)
except WebSocketDisconnect:
# Handle client disconnects

View File

@ -12,9 +12,9 @@ from typing import Any, Dict
import websockets
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import RPCMessageType, RPCRequestType
from freqtrade.misc import json_to_dataframe, remove_entry_exit_signals
from freqtrade.rpc import RPC
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
@ -29,11 +29,11 @@ class ExternalMessageConsumer:
def __init__(
self,
rpc: RPC,
config: Dict[str, Any],
dataprovider: DataProvider
):
self._rpc = rpc
self._config = config
self._dp = dataprovider
self._running = False
self._thread = None
@ -99,12 +99,12 @@ class ExternalMessageConsumer:
"""
The main task coroutine
"""
rpc_lock = asyncio.Lock()
lock = asyncio.Lock()
try:
# Create a connection to each producer
self._sub_tasks = [
self._loop.create_task(self._handle_producer_connection(producer, rpc_lock))
self._loop.create_task(self._handle_producer_connection(producer, lock))
for producer in self.producers
]
@ -115,73 +115,90 @@ class ExternalMessageConsumer:
# Stop the loop once we are done
self._loop.stop()
async def _handle_producer_connection(self, producer, lock):
async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
"""
Main connection loop for the consumer
:param producer: Dictionary containing producer info: {'url': '', 'ws_token': ''}
:param lock: An asyncio Lock
"""
try:
while True:
try:
url, token = producer['url'], producer['ws_token']
ws_url = f"{url}?token={token}"
async with websockets.connect(ws_url) as ws:
logger.info("Connection successful")
channel = WebSocketChannel(ws)
# Tell the producer we only want these topics
# Should always be the first thing we send
await channel.send(
self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics)
)
# Now receive data, if none is within the time limit, ping
while True:
try:
message = await asyncio.wait_for(
channel.recv(),
timeout=5
)
async with lock:
# Handle the data here
# We use a lock because it will call RPC methods
self.handle_producer_message(message)
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
ping = await channel.ping()
await asyncio.wait_for(ping, timeout=self.ping_timeout)
logger.debug(f"Connection to {url} still alive...")
continue
except Exception:
logger.info(
f"Ping error {url} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except Exception as e:
logger.exception(e)
continue
except (
socket.gaierror,
ConnectionRefusedError,
websockets.exceptions.InvalidStatusCode
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
continue
await self._create_connection(producer, lock)
except asyncio.CancelledError:
# Exit silently
pass
def compose_consumer_request(self, type_: str, data: Any) -> Dict[str, Any]:
async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
"""
Actually creates and handles the websocket connection, pinging on timeout
and handling connection errors.
:param producer: Dictionary containing producer info: {'url': '', 'ws_token': ''}
:param lock: An asyncio Lock
"""
while self._running:
try:
url, token = producer['url'], producer['ws_token']
ws_url = f"{url}?token={token}"
# This will raise InvalidURI if the url is bad
async with websockets.connect(ws_url) as ws:
logger.info("Connection successful")
channel = WebSocketChannel(ws)
# Tell the producer we only want these topics
# Should always be the first thing we send
await channel.send(
self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics)
)
# Now receive data, if none is within the time limit, ping
while True:
try:
message = await asyncio.wait_for(
channel.recv(),
timeout=self.reply_timeout
)
async with lock:
# Handle the message
self.handle_producer_message(message)
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
ping = await channel.ping()
await asyncio.wait_for(ping, timeout=self.ping_timeout)
logger.debug(f"Connection to {url} still alive...")
continue
except Exception:
logger.info(
f"Ping error {url} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except Exception as e:
logger.exception(e)
continue
except (
socket.gaierror,
ConnectionRefusedError,
websockets.exceptions.InvalidStatusCode
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
continue
# Catch invalid ws_url, and break the loop
except websockets.exceptions.InvalidURI as e:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break
def compose_consumer_request(self, type_: RPCRequestType, data: Any) -> Dict[str, Any]:
"""
Create a request for sending to a producer
@ -211,9 +228,8 @@ class ExternalMessageConsumer:
if message_type == RPCMessageType.WHITELIST:
pairlist = message_data
# Add the pairlist data to the ExternalPairlist plugin
external_pairlist = self._rpc._freqtrade.pairlists._pairlist_handlers[0]
external_pairlist.add_pairlist_data(pairlist)
# Add the pairlist data to the DataProvider
self._dp.set_producer_pairs(pairlist)
# Handle analyzed dataframes
elif message_type == RPCMessageType.ANALYZED_DF:
@ -230,5 +246,4 @@ class ExternalMessageConsumer:
dataframe = remove_entry_exit_signals(dataframe)
# Add the dataframe to the dataprovider
dataprovider = self._rpc._freqtrade.dataprovider
dataprovider.add_external_df(pair, timeframe, dataframe, candle_type)
self._dp.add_external_df(pair, timeframe, dataframe, candle_type)

View File

@ -19,8 +19,8 @@ from freqtrade.configuration.timerange import TimeRange
from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT
from freqtrade.data.history import load_data
from freqtrade.data.metrics import calculate_max_drawdown
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, RPCRequestType,
SignalDirection, State, TradingMode)
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, State,
TradingMode)
from freqtrade.exceptions import ExchangeError, PricingError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs
from freqtrade.loggers import bufferHandler
@ -1089,13 +1089,3 @@ class RPC:
'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT),
'last_process_ts': int(last_p.timestamp()),
}
# We are passed a Channel object, we can only do sync functions on that channel object
def _process_consumer_request(self, request, channel):
# Should we ensure that request is Dict[str, Any]?
type, data = request.get('type'), request.get('data')
if type == RPCRequestType.SUBSCRIBE:
if all([any(x.value == topic for x in RPCMessageType) for topic in data]):
logger.debug(f"{channel} subscribed to topics: {data}")
channel.set_subscriptions(data)

View File

@ -12,14 +12,13 @@ from pandas import DataFrame
from freqtrade.constants import ListPairsWithTimeframes
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, SignalDirection,
SignalTagType, SignalType, TradingMode)
from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, SignalTagType,
SignalType, TradingMode)
from freqtrade.enums.runmode import RunMode
from freqtrade.exceptions import OperationalException, StrategyError
from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds
from freqtrade.misc import dataframe_to_json, remove_entry_exit_signals
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.persistence import Order, PairLocks, Trade
from freqtrade.rpc import RPCManager
from freqtrade.strategy.hyper import HyperStrategyMixin
from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators,
_create_and_merge_informative_pair,
@ -113,7 +112,6 @@ class IStrategy(ABC, HyperStrategyMixin):
# and wallets - access to the current balance.
dp: DataProvider
wallets: Optional[Wallets] = None
rpc: RPCManager
# Filled from configuration
stake_currency: str
# container variable for strategy source code
@ -731,16 +729,8 @@ class IStrategy(ABC, HyperStrategyMixin):
candle_type = self.config.get('candle_type_def', CandleType.SPOT)
self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type)
if not external_data:
self.rpc.send_msg(
{
'type': RPCMessageType.ANALYZED_DF,
'data': {
'key': (pair, self.timeframe, candle_type),
'value': dataframe_to_json(dataframe)
}
}
)
if populate_indicators:
self.dp.emit_df((pair, self.timeframe, candle_type), dataframe)
else:
logger.debug("Skipping TA Analysis for already analyzed candle")
@ -763,10 +753,7 @@ class IStrategy(ABC, HyperStrategyMixin):
"""
candle_type = self.config.get('candle_type_def', CandleType.SPOT)
if not external_data:
dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type)
else:
dataframe, _ = self.dp.get_external_df(pair, self.timeframe, candle_type)
dataframe = self.dp.ohlcv(pair, self.timeframe, candle_type)
if not isinstance(dataframe, DataFrame) or dataframe.empty:
logger.warning('Empty candle (OHLCV) data for pair %s', pair)
@ -790,38 +777,15 @@ class IStrategy(ABC, HyperStrategyMixin):
def analyze(
self,
pairs: List[str]
pairs: List[str],
external_data: bool = False
) -> None:
"""
Analyze all pairs using analyze_pair().
:param pairs: List of pairs to analyze
"""
for pair in pairs:
self.analyze_pair(pair)
def analyze_external(self, pairs: List[str], leader_pairs: List[str]) -> None:
"""
Analyze the pre-populated dataframes from the Leader
:param pairs: The active pair whitelist
:param leader_pairs: The list of pairs from the Leaders
"""
# Get the extra pairs not listed in Leader pairs, and process
# them normally.
# List order is not preserved when doing this!
# We use ^ instead of - for symmetric difference
extra_pairs = list(set(pairs) ^ set(leader_pairs))
# These would be the pairs that we have trades in, which means
# we would have to analyze them normally
# Eventually maybe request data from the Leader if we don't have it?
for pair in leader_pairs:
# Analyze the pairs, but get the dataframe from the external data
self.analyze_pair(pair, external_data=True)
for pair in extra_pairs:
self.analyze_pair(pair)
self.analyze_pair(pair, external_data)
@ staticmethod
def preserve_df(dataframe: DataFrame) -> Tuple[int, float, datetime]: