diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 2454646ea..618490ec8 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -34,7 +34,7 @@ async def channel_broadcaster(channel: WebSocketChannel, message_stream: Message Iterate over messages in the message stream and send them """ async for message in message_stream: - await channel.send(message) + await channel.send(message, timeout=True) async def _process_consumer_request( diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 80b2ec220..5424d7440 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -1,7 +1,9 @@ import asyncio import logging +import time +from collections import deque from contextlib import asynccontextmanager -from typing import Any, Dict, List, Optional, Type, Union +from typing import Any, Deque, Dict, List, Optional, Type, Union from uuid import uuid4 from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy @@ -29,7 +31,13 @@ class WebSocketChannel: # Internal event to signify a closed websocket self._closed = asyncio.Event() - self._send_timeout_high_limit = 2 + # The async tasks created for the channel + self._channel_tasks: List[asyncio.Task] = [] + + # Deque for average send times + self._send_times: Deque[float] = deque([], maxlen=10) + # High limit defaults to 3 to start + self._send_high_limit = 3 # The subscribed message types self._subscriptions: List[str] = [] @@ -37,9 +45,6 @@ class WebSocketChannel: # Wrap the WebSocket in the Serializing class self._wrapped_ws = serializer_cls(self._websocket) - # The async tasks created for the channel - self._channel_tasks: List[asyncio.Task] = [] - def __repr__(self): return f"WebSocketChannel({self.channel_id}, {self.remote_addr})" @@ -51,20 +56,48 @@ class WebSocketChannel: def remote_addr(self): return self._websocket.remote_addr - async def send(self, message: Union[WSMessageSchemaType, Dict[str, Any]]): + def _calc_send_limit(self): """ - Send a message on the wrapped websocket + Calculate the send high limit for this channel """ - await self._wrapped_ws.send(message) - # Without this sleep, messages would send to one channel - # first then another after the first one finished and prevent - # any normal Rest API calls from processing at the same time. - # With the sleep call, it gives control to the event - # loop to schedule other channel send methods, and helps - # throttle how fast we send. - # 0.005 = 200 messages/second max throughput - await asyncio.sleep(0.005) + # Only update if we have enough data + if len(self._send_times) == self._send_times.maxlen: + # At least 1s or twice the average of send times + self._send_high_limit = max( + (sum(self._send_times) / len(self._send_times)) * 2, + 1 + ) + + async def send( + self, + message: Union[WSMessageSchemaType, Dict[str, Any]], + timeout: bool = False + ): + """ + Send a message on the wrapped websocket. If the sending + takes too long, it will raise a TimeoutError and + disconnect the connection. + + :param message: The message to send + :param timeout: Enforce send high limit, defaults to False + """ + try: + _ = time.time() + # If the send times out, it will raise + # a TimeoutError and bubble up to the + # message_endpoint to close the connection + await asyncio.wait_for( + self._wrapped_ws.send(message), + timeout=self._send_high_limit if timeout else None + ) + total_time = time.time() - _ + self._send_times.append(total_time) + + self._calc_send_limit() + except asyncio.TimeoutError: + logger.info(f"Connection for {self} is too far behind, disconnecting") + raise async def recv(self): """