diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index e9dbd63be..a1334bce9 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -24,6 +24,7 @@ class WebSocketChannel: self, websocket: WebSocketType, channel_id: Optional[str] = None, + drain_timeout: int = 3, serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer ): @@ -34,6 +35,8 @@ class WebSocketChannel: # The Serializing class for the WebSocket object self._serializer_cls = serializer_cls + self.drain_timeout = drain_timeout + self._subscriptions: List[str] = [] self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32) self._relay_task = asyncio.create_task(self.relay()) @@ -57,11 +60,19 @@ class WebSocketChannel: """ await self._wrapped_ws.send(data) - async def send(self, data): + async def send(self, data) -> bool: """ - Add the data to the queue to be sent + Add the data to the queue to be sent. + :returns: True if data added to queue, False otherwise """ - self.queue.put_nowait(data) + try: + await asyncio.wait_for( + self.queue.put(data), + timeout=self.drain_timeout + ) + return True + except asyncio.TimeoutError: + return False async def recv(self): """ @@ -119,8 +130,8 @@ class WebSocketChannel: # Limit messages per sec. # Could cause problems with queue size if too low, and # problems with network traffik if too high. - # 0.001 = 1000/s - await asyncio.sleep(0.001) + # 0.01 = 100/s + await asyncio.sleep(0.01) except RuntimeError: # The connection was closed, just exit the task return @@ -186,9 +197,7 @@ class ChannelManager: message_type = data.get('type') for websocket, channel in self.channels.copy().items(): if channel.subscribed_to(message_type): - if not channel.queue.full(): - await channel.send(data) - else: + if not await channel.send(data): logger.info(f"Channel {channel} is too far behind, disconnecting") await self.on_disconnect(websocket)