Merge pull request #7842 from wizrds/feat/refactor-emc

Change to broadcasting single candles in Producer/Consumer

Commit: 2828255435
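In short: producers now broadcast only the newest analyzed candle (`dataframe.tail(1)`) instead of the full dataframe on every update. Consumers append incoming candles to their cached copy via the new `DataProvider._add_external_df()`, detect holes in the data, and request just the missing candles for that pair, falling back to a full dataframe once the gap reaches `FULL_DATAFRAME_THRESHOLD`. The WebSocket channel gains a configurable `send_throttle`, and follow-up requests to producers are published through a per-channel `MessageStream`.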
freqtrade/constants.py

@@ -61,6 +61,7 @@ USERPATH_FREQAIMODELS = 'freqaimodels'

 TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
 WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
+FULL_DATAFRAME_THRESHOLD = 100

 ENV_VAR_PREFIX = 'FREQTRADE__'

freqtrade/data/dataprovider.py

@@ -9,14 +9,16 @@ from collections import deque
 from datetime import datetime, timezone
 from typing import Any, Dict, List, Optional, Tuple

-from pandas import DataFrame
+from pandas import DataFrame, to_timedelta

 from freqtrade.configuration import TimeRange
-from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe
+from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes,
+                                 PairWithTimeframe)
 from freqtrade.data.history import load_pair_history
 from freqtrade.enums import CandleType, RPCMessageType, RunMode
 from freqtrade.exceptions import ExchangeError, OperationalException
 from freqtrade.exchange import Exchange, timeframe_to_seconds
+from freqtrade.misc import append_candles_to_dataframe
 from freqtrade.rpc import RPCManager
 from freqtrade.util import PeriodicCache

@@ -120,7 +122,7 @@ class DataProvider:
             'type': RPCMessageType.ANALYZED_DF,
             'data': {
                 'key': pair_key,
-                'df': dataframe,
+                'df': dataframe.tail(1),
                 'la': datetime.now(timezone.utc)
             }
         }
@@ -131,7 +133,7 @@ class DataProvider:
             'data': pair_key,
         })

-    def _add_external_df(
+    def _replace_external_df(
         self,
         pair: str,
         dataframe: DataFrame,
@@ -157,6 +159,85 @@ class DataProvider:
         self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
         logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")

+    def _add_external_df(
+        self,
+        pair: str,
+        dataframe: DataFrame,
+        last_analyzed: datetime,
+        timeframe: str,
+        candle_type: CandleType,
+        producer_name: str = "default"
+    ) -> Tuple[bool, int]:
+        """
+        Append a candle to the existing external dataframe. The incoming dataframe
+        must have at least 1 candle.
+
+        :param pair: pair to get the data for
+        :param timeframe: Timeframe to get data for
+        :param candle_type: Any of the enum CandleType (must match trading mode!)
+        :returns: False if the candle could not be appended, or the int number of missing candles.
+        """
+        pair_key = (pair, timeframe, candle_type)
+
+        if dataframe.empty:
+            # The incoming dataframe must have at least 1 candle
+            return (False, 0)
+
+        if len(dataframe) >= FULL_DATAFRAME_THRESHOLD:
+            # This is likely a full dataframe
+            # Add the dataframe to the dataprovider
+            self._replace_external_df(
+                pair,
+                dataframe,
+                last_analyzed=last_analyzed,
+                timeframe=timeframe,
+                candle_type=candle_type,
+                producer_name=producer_name
+            )
+            return (True, 0)
+
+        if (producer_name not in self.__producer_pairs_df
+           or pair_key not in self.__producer_pairs_df[producer_name]):
+            # We don't have data from this producer yet,
+            # or we don't have data for this pair_key
+            # return False and 1000 for the full df
+            return (False, 1000)
+
+        existing_df, _ = self.__producer_pairs_df[producer_name][pair_key]
+
+        # CHECK FOR MISSING CANDLES
+        timeframe_delta = to_timedelta(timeframe)  # Convert the timeframe to a timedelta for pandas
+        local_last = existing_df.iloc[-1]['date']  # We want the last date from our copy
+        incoming_first = dataframe.iloc[0]['date']  # We want the first date from the incoming
+
+        # Remove existing candles that are newer than the incoming first candle
+        existing_df1 = existing_df[existing_df['date'] < incoming_first]
+
+        candle_difference = (incoming_first - local_last) / timeframe_delta
+
+        # If the difference divided by the timeframe is 1, then this
+        # is the candle we want and the incoming data isn't missing any.
+        # If the candle_difference is more than 1, that means
+        # we missed some candles between our data and the incoming
+        # so return False and candle_difference.
+        if candle_difference > 1:
+            return (False, candle_difference)
+        if existing_df1.empty:
+            appended_df = dataframe
+        else:
+            appended_df = append_candles_to_dataframe(existing_df1, dataframe)
+
+        # Everything is good, we appended
+        self._replace_external_df(
+            pair,
+            appended_df,
+            last_analyzed=last_analyzed,
+            timeframe=timeframe,
+            candle_type=candle_type,
+            producer_name=producer_name
+        )
+        return (True, 0)
+
     def get_producer_df(
         self,
         pair: str,
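The gap check above is plain timestamp arithmetic. A minimal sketch with hypothetical values, matching the 36-hour case exercised in the tests further down:

```python
from pandas import Timestamp, to_timedelta

# last candle we hold locally vs. first candle of the incoming broadcast
local_last = Timestamp('2022-01-03 12:00:00+00:00')
incoming_first = Timestamp('2022-01-05 00:00:00+00:00')

candle_difference = (incoming_first - local_last) / to_timedelta('1h')
assert candle_difference == 36  # > 1, so _add_external_df returns (False, 36)
```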
freqtrade/misc.py

@@ -301,3 +301,21 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame):
     dataframe[SignalTagType.EXIT_TAG.value] = None

     return dataframe
+
+
+def append_candles_to_dataframe(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
+    """
+    Append the `right` dataframe to the `left` dataframe
+
+    :param left: The full dataframe you want appended to
+    :param right: The new dataframe containing the data you want appended
+    :returns: The dataframe with the right data in it
+    """
+    if left.iloc[-1]['date'] != right.iloc[-1]['date']:
+        left = pd.concat([left, right])
+
+    # Only keep the last 1500 candles in memory
+    left = left[-1500:] if len(left) > 1500 else left
+    left.reset_index(drop=True, inplace=True)
+
+    return left
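A quick usage sketch with hypothetical frames; note that `_add_external_df` trims the overlap before calling this helper, so `left` never extends past the start of `right`:

```python
import pandas as pd

from freqtrade.misc import append_candles_to_dataframe

left = pd.DataFrame({
    'date': pd.to_datetime(['2022-01-01 00:00', '2022-01-01 01:00'], utc=True),
    'close': [100.0, 101.0],
})
right = pd.DataFrame({
    'date': pd.to_datetime(['2022-01-01 02:00'], utc=True),
    'close': [102.0],
})

# Last dates differ, so the new candle is concatenated and the index rebuilt
merged = append_candles_to_dataframe(left, right)
assert list(merged['close']) == [100.0, 101.0, 102.0]
```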
freqtrade/rpc/api_server/api_ws.py

@@ -91,9 +91,10 @@ async def _process_consumer_request(
     elif type == RPCRequestType.ANALYZED_DF:
         # Limit the amount of candles per dataframe to 'limit' or 1500
         limit = min(data.get('limit', 1500), 1500) if data else None
+        pair = data.get('pair', None) if data else None

         # For every pair in the generator, send a separate message
-        for message in rpc._ws_request_analyzed_df(limit):
+        for message in rpc._ws_request_analyzed_df(limit, pair):
             # Format response
             response = WSAnalyzedDFMessage(data=message)
             await channel.send(response.dict(exclude_none=True))
freqtrade/rpc/api_server/ws/channel.py

@@ -27,7 +27,8 @@ class WebSocketChannel:
         self,
         websocket: WebSocketType,
         channel_id: Optional[str] = None,
-        serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer
+        serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
+        send_throttle: float = 0.01
     ):
         self.channel_id = channel_id if channel_id else uuid4().hex[:8]
         self._websocket = WebSocketProxy(websocket)
@@ -41,6 +42,7 @@ class WebSocketChannel:
         self._send_times: Deque[float] = deque([], maxlen=10)
         # High limit defaults to 3 to start
         self._send_high_limit = 3
+        self._send_throttle = send_throttle

         # The subscribed message types
         self._subscriptions: List[str] = []
@@ -106,7 +108,8 @@ class WebSocketChannel:

         # Explicitly give control back to event loop as
         # websockets.send does not
-        await asyncio.sleep(0.01)
+        # Also throttles how fast we send
+        await asyncio.sleep(self._send_throttle)

     async def recv(self):
         """
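The throttle is simply the sleep between consecutive sends, so it bounds the send rate. A back-of-the-envelope check:

```python
# upper bound on messages per second is 1 / send_throttle
assert 1 / 0.01 == 100.0  # API-server default set above
assert 1 / 0.5 == 2.0     # value the consumer passes to create_channel below
```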
freqtrade/rpc/api_server/ws_schemas.py

@@ -47,7 +47,7 @@ class WSWhitelistRequest(WSRequestSchema):

 class WSAnalyzedDFRequest(WSRequestSchema):
     type: RPCRequestType = RPCRequestType.ANALYZED_DF
-    data: Dict[str, Any] = {"limit": 1500}
+    data: Dict[str, Any] = {"limit": 1500, "pair": None}


 # ------------------------------ MESSAGE SCHEMAS ----------------------------
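For reference, a sketch of the follow-up request a consumer would build with this schema (hypothetical values):

```python
from freqtrade.rpc.api_server.ws_schemas import WSAnalyzedDFRequest

# Request 37 candles for a single pair after detecting a hole in the data
req = WSAnalyzedDFRequest(data={"limit": 37, "pair": "ETH/USDT"})
payload = req.dict(exclude_none=True)  # what schema_to_dict() produces
# payload["data"] -> {"limit": 37, "pair": "ETH/USDT"}
```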
freqtrade/rpc/external_message_consumer.py

@@ -8,15 +8,17 @@ import asyncio
 import logging
 import socket
 from threading import Thread
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union

 import websockets
 from pydantic import ValidationError

+from freqtrade.constants import FULL_DATAFRAME_THRESHOLD
 from freqtrade.data.dataprovider import DataProvider
 from freqtrade.enums import RPCMessageType
 from freqtrade.misc import remove_entry_exit_signals
-from freqtrade.rpc.api_server.ws import WebSocketChannel
+from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
+from freqtrade.rpc.api_server.ws.message_stream import MessageStream
 from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
                                                  WSMessageSchema, WSRequestSchema,
                                                  WSSubscribeRequest, WSWhitelistMessage,
@@ -38,6 +40,10 @@ class Producer(TypedDict):
 logger = logging.getLogger(__name__)


+def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]):
+    return schema.dict(exclude_none=True)
+
+
 class ExternalMessageConsumer:
     """
     The main controller class for consuming external messages from
@@ -92,6 +98,8 @@ class ExternalMessageConsumer:
             RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
         }

+        self._channel_streams: Dict[str, MessageStream] = {}
+
         self.start()

     def start(self):
@@ -118,6 +126,8 @@ class ExternalMessageConsumer:
         logger.info("Stopping ExternalMessageConsumer")
         self._running = False

+        self._channel_streams = {}
+
         if self._sub_tasks:
             # Cancel sub tasks
             for task in self._sub_tasks:
@@ -175,7 +185,6 @@ class ExternalMessageConsumer:
         :param producer: Dictionary containing producer info
         :param lock: An asyncio Lock
         """
-        channel = None
         while self._running:
             try:
                 host, port = producer['host'], producer['port']
@@ -190,19 +199,21 @@ class ExternalMessageConsumer:
                     max_size=self.message_size_limit,
                     ping_interval=None
                 ) as ws:
-                    channel = WebSocketChannel(ws, channel_id=name)
+                    async with create_channel(
+                        ws,
+                        channel_id=name,
+                        send_throttle=0.5
+                    ) as channel:

-                    logger.info(f"Producer connection success - {channel}")
+                        # Create the message stream for this channel
+                        self._channel_streams[name] = MessageStream()

-                    # Now request the initial data from this Producer
-                    for request in self._initial_requests:
-                        await channel.send(
-                            request.dict(exclude_none=True)
-                        )
-
-                    # Now receive data, if none is within the time limit, ping
-                    await self._receive_messages(channel, producer, lock)
+                        # Run the channel tasks while connected
+                        await channel.run_channel_tasks(
+                            self._receive_messages(channel, producer, lock),
+                            self._send_requests(channel, self._channel_streams[name])
+                        )

             except (websockets.exceptions.InvalidURI, ValueError) as e:
                 logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
                 break
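The reader and writer now run side by side for the lifetime of the connection. A rough sketch of the pattern in plain asyncio (hypothetical names, not the PR's `run_channel_tasks` implementation):

```python
import asyncio


async def _receive(channel_id: str) -> None:
    # stand-in for _receive_messages: read until the connection drops
    while True:
        await asyncio.sleep(1)  # pretend to await the next message


async def _send(channel_id: str) -> None:
    # stand-in for _send_requests: drain a request stream
    while True:
        await asyncio.sleep(1)  # pretend to await the next request


async def run_channel(channel_id: str) -> None:
    # both coroutines run concurrently; the first exception propagates here
    await asyncio.gather(_receive(channel_id), _send(channel_id))
```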
@@ -229,11 +240,19 @@ class ExternalMessageConsumer:
                 # An unforseen error has occurred, log and continue
                 logger.error("Unexpected error has occurred:")
                 logger.exception(e)
+                await asyncio.sleep(self.sleep_time)
                 continue

-            finally:
-                if channel:
-                    await channel.close()
+    async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
+        # Send the initial requests
+        for init_request in self._initial_requests:
+            await channel.send(schema_to_dict(init_request))
+
+        # Now send any subsequent requests published to
+        # this channel's stream
+        async for request, _ in channel_stream:
+            logger.debug(f"Sending request to channel - {channel} - {request}")
+            await channel.send(request)

     async def _receive_messages(
         self,
@@ -270,19 +289,31 @@ class ExternalMessageConsumer:
                     latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)

                     logger.info(f"Connection to {channel} still alive, latency: {latency}ms")

                     continue
-                except (websockets.exceptions.ConnectionClosed):
-                    # Just eat the error and continue reconnecting
-                    logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s")
-                    await asyncio.sleep(self.sleep_time)
-                    break
                 except Exception as e:
+                    # Just eat the error and continue reconnecting
                     logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
                     logger.debug(e, exc_info=e)
-                    await asyncio.sleep(self.sleep_time)
+                    raise

-                    break
+    def send_producer_request(
+        self,
+        producer_name: str,
+        request: Union[WSRequestSchema, Dict[str, Any]]
+    ):
+        """
+        Publish a message to the producer's message stream to be
+        sent by the channel task.
+
+        :param producer_name: The name of the producer to publish the message to
+        :param request: The request to send to the producer
+        """
+        if isinstance(request, WSRequestSchema):
+            request = schema_to_dict(request)
+
+        if channel_stream := self._channel_streams.get(producer_name):
+            channel_stream.publish(request)

     def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
         """
@@ -336,16 +367,45 @@ class ExternalMessageConsumer:

         pair, timeframe, candle_type = key

+        if df.empty:
+            logger.debug(f"Received Empty Dataframe for {key}")
+            return
+
         # If set, remove the Entry and Exit signals from the Producer
         if self._emc_config.get('remove_entry_exit_signals', False):
             df = remove_entry_exit_signals(df)

-        # Add the dataframe to the dataprovider
-        self._dp._add_external_df(pair, df,
-                                  last_analyzed=la,
-                                  timeframe=timeframe,
-                                  candle_type=candle_type,
-                                  producer_name=producer_name)
+        logger.debug(f"Received {len(df)} candle(s) for {key}")
+
+        did_append, n_missing = self._dp._add_external_df(
+            pair,
+            df,
+            last_analyzed=la,
+            timeframe=timeframe,
+            candle_type=candle_type,
+            producer_name=producer_name
+        )
+
+        if not did_append:
+            # We want an overlap in candles incase some data has changed
+            n_missing += 1
+            # Set to None for all candles if we missed a full df's worth of candles
+            n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
+
+            logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles "
+                           f"for {key} from `{producer_name}`")
+
+            self.send_producer_request(
+                producer_name,
+                WSAnalyzedDFRequest(
+                    data={
+                        "limit": n_missing,
+                        "pair": pair
+                    }
+                )
+            )
+            return

         logger.debug(
-            f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`")
+            f"Consumed message from `{producer_name}` "
+            f"of type `RPCMessageType.ANALYZED_DF` for {key}")
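The re-request sizing above reduces to a small function; a paraphrased sketch of that logic:

```python
FULL_DATAFRAME_THRESHOLD = 100  # from freqtrade.constants


def candles_to_request(n_missing: int) -> int:
    # Ask for one extra candle of overlap in case some data has changed;
    # fall back to a full 1500-candle dataframe for large holes
    n_missing += 1
    return n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500


assert candles_to_request(1) == 2       # one missed candle -> request 2
assert candles_to_request(99) == 1500   # hole reaches threshold -> full dataframe
```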
freqtrade/rpc/rpc.py

@@ -1062,15 +1062,26 @@ class RPC:
         return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'],
                                                pair, timeframe, _data, last_analyzed)

-    def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str,
-                                     limit: Optional[int]) -> Tuple[DataFrame, datetime]:
-        """ Get the dataframe and last analyze from the dataprovider """
+    def __rpc_analysed_dataframe_raw(
+        self,
+        pair: str,
+        timeframe: str,
+        limit: Optional[int]
+    ) -> Tuple[DataFrame, datetime]:
+        """
+        Get the dataframe and last analyze from the dataprovider
+
+        :param pair: The pair to get
+        :param timeframe: The timeframe of data to get
+        :param limit: The amount of candles in the dataframe
+        """
         _data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe(
             pair, timeframe)
         _data = _data.copy()

         if limit:
             _data = _data.iloc[-limit:]

         return _data, last_analyzed

     def _ws_all_analysed_dataframes(
@@ -1078,7 +1089,16 @@ class RPC:
         pairlist: List[str],
         limit: Optional[int]
     ) -> Generator[Dict[str, Any], None, None]:
-        """ Get the analysed dataframes of each pair in the pairlist """
+        """
+        Get the analysed dataframes of each pair in the pairlist.
+        If specified, only return the most recent `limit` candles for
+        each dataframe.
+
+        :param pairlist: A list of pairs to get
+        :param limit: If an integer, limits the size of dataframe
+                      If a list of string date times, only returns those candles
+        :returns: A generator of dictionaries with the key, dataframe, and last analyzed timestamp
+        """
         timeframe = self._freqtrade.config['timeframe']
         candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT)

@@ -1091,10 +1111,15 @@ class RPC:
                 "la": last_analyzed
             }

-    def _ws_request_analyzed_df(self, limit: Optional[int]):
+    def _ws_request_analyzed_df(
+        self,
+        limit: Optional[int] = None,
+        pair: Optional[str] = None
+    ):
         """ Historical Analyzed Dataframes for WebSocket """
-        whitelist = self._freqtrade.active_pair_whitelist
-        return self._ws_all_analysed_dataframes(whitelist, limit)
+        pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist
+
+        return self._ws_all_analysed_dataframes(pairlist, limit)

     def _ws_request_whitelist(self):
         """ Whitelist data for WebSocket """
tests/data/test_dataprovider.py

@@ -2,13 +2,13 @@ from datetime import datetime, timezone
 from unittest.mock import MagicMock

 import pytest
-from pandas import DataFrame
+from pandas import DataFrame, Timestamp

 from freqtrade.data.dataprovider import DataProvider
 from freqtrade.enums import CandleType, RunMode
 from freqtrade.exceptions import ExchangeError, OperationalException
 from freqtrade.plugins.pairlistmanager import PairListManager
-from tests.conftest import get_patched_exchange
+from tests.conftest import generate_test_data, get_patched_exchange


 @pytest.mark.parametrize('candle_type', [
@@ -161,9 +161,9 @@ def test_producer_pairs(mocker, default_conf, ohlcv_history):
     assert dataprovider.get_producer_pairs("bad") == []


-def test_get_producer_df(mocker, default_conf, ohlcv_history):
+def test_get_producer_df(mocker, default_conf):
     dataprovider = DataProvider(default_conf, None)
+    ohlcv_history = generate_test_data('5m', 150)
     pair = 'BTC/USDT'
     timeframe = default_conf['timeframe']
     candle_type = CandleType.SPOT
@@ -412,3 +412,80 @@ def test_dp_send_msg(default_conf):
     dp = DataProvider(default_conf, None)
     dp.send_msg(msg, always_send=True)
     assert msg not in dp._msg_queue
+
+
+def test_dp__add_external_df(default_conf_usdt):
+    timeframe = '1h'
+    default_conf_usdt["timeframe"] = timeframe
+    dp = DataProvider(default_conf_usdt, None)
+    df = generate_test_data(timeframe, 24, '2022-01-01 00:00:00+00:00')
+    last_analyzed = datetime.now(timezone.utc)
+
+    res = dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is False
+    # Why 1000 ??
+    assert res[1] == 1000
+
+    # Hard add dataframe
+    dp._replace_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT)
+    # BTC is not stored yet
+    res = dp._add_external_df('BTC/USDT', df, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is False
+    df_res, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    assert len(df_res) == 24
+
+    # Add the same dataframe again - dataframe size shall not change.
+    res = dp._add_external_df('ETH/USDT', df, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is True
+    assert res[1] == 0
+    df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    assert len(df) == 24
+
+    # Add a new day.
+    df2 = generate_test_data(timeframe, 24, '2022-01-02 00:00:00+00:00')
+
+    res = dp._add_external_df('ETH/USDT', df2, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is True
+    assert res[1] == 0
+    df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    assert len(df) == 48
+
+    # Add a dataframe with a 12 hour offset - so 12 candles are overlapping, and 12 valid.
+    df3 = generate_test_data(timeframe, 24, '2022-01-02 12:00:00+00:00')
+
+    res = dp._add_external_df('ETH/USDT', df3, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is True
+    assert res[1] == 0
+    df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    # New length = 48 + 12 (since we have a 12 hour offset).
+    assert len(df) == 60
+    assert df.iloc[-1]['date'] == df3.iloc[-1]['date']
+    assert df.iloc[-1]['date'] == Timestamp('2022-01-03 11:00:00+00:00')
+
+    # Generate 1 new candle
+    df4 = generate_test_data(timeframe, 1, '2022-01-03 12:00:00+00:00')
+    res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT)
+    # assert res[0] is True
+    # assert res[1] == 0
+    df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    # New length = 61 + 1
+    assert len(df) == 61
+    assert df.iloc[-2]['date'] == Timestamp('2022-01-03 11:00:00+00:00')
+    assert df.iloc[-1]['date'] == Timestamp('2022-01-03 12:00:00+00:00')
+
+    # Gap in the data ...
+    df4 = generate_test_data(timeframe, 1, '2022-01-05 00:00:00+00:00')
+    res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is False
+    # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00
+    assert res[1] == 36
+    df, _ = dp.get_producer_df('ETH/USDT', timeframe, CandleType.SPOT)
+    # New length = 61 + 1
+    assert len(df) == 61
+
+    # Empty dataframe
+    df4 = generate_test_data(timeframe, 0, '2022-01-05 00:00:00+00:00')
+    res = dp._add_external_df('ETH/USDT', df4, last_analyzed, timeframe, CandleType.SPOT)
+    assert res[0] is False
+    # 36 hours - from 2022-01-03 12:00:00+00:00 to 2022-01-05 00:00:00+00:00
+    assert res[1] == 0
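Note on the `# Why 1000 ??` comment above: the value is the hard-coded `return (False, 1000)` in `_add_external_df` when no dataframe is cached yet for that producer/pair; the consumer treats it as a hole larger than `FULL_DATAFRAME_THRESHOLD` and converts it into a full 1500-candle request.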
tests/rpc/test_rpc_emc.py

@@ -83,6 +83,7 @@ def test_emc_init(patched_emc):
 def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
     test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"}
     producer_name = test_producer['name']
+    invalid_msg = r"Invalid message .+"

     caplog.set_level(logging.DEBUG)

@@ -94,7 +95,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
     assert log_has(
         f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog)

-    # Test handle analyzed_df message
+    # Test handle analyzed_df single candle message
     df_message = {
         "type": "analyzed_df",
         "data": {
@@ -106,8 +107,7 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
     patched_emc.handle_producer_message(test_producer, df_message)

     assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
-    assert log_has(
-        f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`", caplog)
+    assert log_has_re(r"Holes in data or no existing df,.+", caplog)

     # Test unhandled message
     unhandled_message = {"type": "status", "data": "RUNNING"}
@@ -120,7 +120,8 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
     malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}}
     patched_emc.handle_producer_message(test_producer, malformed_message)

-    assert log_has_re(r"Invalid message .+", caplog)
+    assert log_has_re(invalid_msg, caplog)
+    caplog.clear()

     malformed_message = {
         "type": "analyzed_df",
@@ -133,13 +134,30 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
     patched_emc.handle_producer_message(test_producer, malformed_message)

     assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
-    assert log_has_re(r"Invalid message .+", caplog)
+    assert log_has_re(invalid_msg, caplog)
+    caplog.clear()
+
+    # Empty dataframe
+    malformed_message = {
+        "type": "analyzed_df",
+        "data": {
+            "key": ("BTC/USDT", "5m", "spot"),
+            "df": ohlcv_history.loc[ohlcv_history['open'] < 0],
+            "la": datetime.now(timezone.utc)
+        }
+    }
+    patched_emc.handle_producer_message(test_producer, malformed_message)
+
+    assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
+    assert not log_has_re(invalid_msg, caplog)
+    assert log_has_re(r"Received Empty Dataframe for.+", caplog)

     caplog.clear()
     malformed_message = {"some": "stuff"}
     patched_emc.handle_producer_message(test_producer, malformed_message)

-    assert log_has_re(r"Invalid message .+", caplog)
+    assert log_has_re(invalid_msg, caplog)
+    caplog.clear()

     caplog.clear()
     malformed_message = {"type": "whitelist", "data": None}
@@ -183,7 +201,7 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker):
         async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT):
             await emc._create_connection(test_producer, lock)

-        assert log_has_re(r"Producer connection success.+", caplog)
+        assert log_has_re(r"Connected to channel.+", caplog)
     finally:
         emc.shutdown()

@@ -212,7 +230,8 @@ async def test_emc_create_connection_invalid_url(default_conf, caplog, mocker, h

     dp = DataProvider(default_conf, None, None, None)
     # Handle start explicitly to avoid messing with threading in tests
-    mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start",)
+    mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start")
+    mocker.patch("freqtrade.rpc.api_server.ws.channel.create_channel")
     emc = ExternalMessageConsumer(default_conf, dp)

     try:
@@ -390,7 +409,9 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
     try:
         change_running(emc)
         loop.call_soon(functools.partial(change_running, emc=emc))
-        await emc._receive_messages(TestChannel(), test_producer, lock)
+
+        with pytest.raises(asyncio.TimeoutError):
+            await emc._receive_messages(TestChannel(), test_producer, lock)

         assert log_has_re(r"Ping error.+", caplog)
     finally: