diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 77950923d..a80250c1b 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -1,4 +1,5 @@ import logging +import time from typing import Any, Dict from fastapi import APIRouter, Depends @@ -33,8 +34,16 @@ async def channel_broadcaster(channel: WebSocketChannel, message_stream: Message """ Iterate over messages in the message stream and send them """ - async for message in message_stream: + async for message, ts in message_stream: if channel.subscribed_to(message.get('type')): + # Log a warning if this channel is behind + # on the message stream by a lot + if (time.time() - ts) > 60: + logger.warning("Channel {channel} is behind MessageStream by 1 minute," + " this can cause a memory leak if you see this message" + " often, consider reducing pair list size or amount of" + " consumers.") + await channel.send(message, timeout=True) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 7343bc306..a5f3b6216 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -59,6 +59,10 @@ class WebSocketChannel: def remote_addr(self): return self._websocket.remote_addr + @property + def avg_send_time(self): + return sum(self._send_times) / len(self._send_times) + def _calc_send_limit(self): """ Calculate the send high limit for this channel @@ -66,11 +70,9 @@ class WebSocketChannel: # Only update if we have enough data if len(self._send_times) == self._send_times.maxlen: - # At least 1s or twice the average of send times - self._send_high_limit = max( - (sum(self._send_times) / len(self._send_times)) * 2, - 1 - ) + # At least 1s or twice the average of send times, with a + # maximum of 3 seconds per message + self._send_high_limit = min(max(self.avg_send_time * 2, 1), 3) async def send( self, diff --git a/freqtrade/rpc/api_server/ws/message_stream.py b/freqtrade/rpc/api_server/ws/message_stream.py index 9592908ab..a55a0da3c 100644 --- a/freqtrade/rpc/api_server/ws/message_stream.py +++ b/freqtrade/rpc/api_server/ws/message_stream.py @@ -1,4 +1,5 @@ import asyncio +import time class MessageStream: @@ -11,14 +12,20 @@ class MessageStream: self._waiter = self._loop.create_future() def publish(self, message): - waiter, self._waiter = self._waiter, self._loop.create_future() - waiter.set_result((message, self._waiter)) + """ + Publish a message to this MessageStream - async def subscribe(self): + :param message: The message to publish + """ + waiter, self._waiter = self._waiter, self._loop.create_future() + waiter.set_result((message, time.time(), self._waiter)) + + async def __aiter__(self): + """ + Iterate over the messages in the message stream + """ waiter = self._waiter while True: # Shield the future from being cancelled by a task waiting on it - message, waiter = await asyncio.shield(waiter) - yield message - - __aiter__ = subscribe + message, ts, waiter = await asyncio.shield(waiter) + yield message, ts