From 3e8d8fd1b08e28f8ec231de9ee3be57a539b266e Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 9 Oct 2022 15:04:52 -0600 Subject: [PATCH 1/7] refactor broadcasting to a queue per client --- freqtrade/rpc/api_server/api_ws.py | 24 ++++++++++++------- freqtrade/rpc/api_server/webserver.py | 1 + freqtrade/rpc/api_server/ws/channel.py | 32 ++++++++++++++++++++------ 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index f55b2dbd3..b60210143 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -1,6 +1,7 @@ import logging from typing import Any, Dict +import websockets from fastapi import APIRouter, Depends, WebSocketDisconnect from fastapi.websockets import WebSocket, WebSocketState from pydantic import ValidationError @@ -102,7 +103,6 @@ async def message_endpoint( """ try: channel = await channel_manager.on_connect(ws) - if await is_websocket_alive(ws): logger.info(f"Consumer connected - {channel}") @@ -115,26 +115,34 @@ async def message_endpoint( # Process the request here await _process_consumer_request(request, channel, rpc) - except WebSocketDisconnect: + except ( + WebSocketDisconnect, + websockets.exceptions.WebSocketException + ): # Handle client disconnects logger.info(f"Consumer disconnected - {channel}") - await channel_manager.on_disconnect(ws) - except Exception as e: - logger.info(f"Consumer connection failed - {channel}") - logger.exception(e) + except RuntimeError: # Handle cases like - # RuntimeError('Cannot call "send" once a closed message has been sent') + pass + except Exception as e: + logger.info(f"Consumer connection failed - {channel}") + logger.debug(e, exc_info=e) + finally: await channel_manager.on_disconnect(ws) else: + if channel: + await channel_manager.on_disconnect(ws) await ws.close() except RuntimeError: # WebSocket was closed - await channel_manager.on_disconnect(ws) - + # Do nothing + pass except Exception as e: logger.error(f"Failed to serve - {ws.client}") # Log tracebacks to keep track of what errors are happening logger.exception(e) + finally: await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 53af91477..4a09fd78e 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -245,6 +245,7 @@ class ApiServer(RPCHandler): use_colors=False, log_config=None, access_log=True if verbosity != 'error' else False, + ws_ping_interval=None # We do this explicitly ourselves ) try: self._server = UvicornServer(uvconfig) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 69a32e266..8c6c56d6e 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -1,6 +1,7 @@ +import asyncio import logging from threading import RLock -from typing import List, Optional, Type +from typing import Any, Dict, List, Optional, Type from uuid import uuid4 from fastapi import WebSocket as FastAPIWebSocket @@ -34,6 +35,8 @@ class WebSocketChannel: self._serializer_cls = serializer_cls self._subscriptions: List[str] = [] + self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + self._relay_task = asyncio.create_task(self.relay()) # Internal event to signify a closed websocket self._closed = False @@ -72,6 +75,7 @@ class WebSocketChannel: """ self._closed = True + self._relay_task.cancel() def is_closed(self) -> bool: """ @@ -95,6 +99,20 @@ class WebSocketChannel: """ return message_type in self._subscriptions + async def relay(self): + """ + Relay messages from the channel's queue and send them out. This is started + as a task. + """ + while True: + message = await self.queue.get() + try: + await self.send(message) + self.queue.task_done() + except RuntimeError: + # The connection was closed, just exit the task + return + class ChannelManager: def __init__(self): @@ -155,12 +173,12 @@ class ChannelManager: with self._lock: message_type = data.get('type') for websocket, channel in self.channels.copy().items(): - try: - if channel.subscribed_to(message_type): - await channel.send(data) - except RuntimeError: - # Handle cannot send after close cases - await self.on_disconnect(websocket) + if channel.subscribed_to(message_type): + if not channel.queue.full(): + channel.queue.put_nowait(data) + else: + logger.info(f"Channel {channel} is too far behind, disconnecting") + await self.on_disconnect(websocket) async def send_direct(self, channel, data): """ From 2f64a086238b383312dd2db9aea104690aac5a8a Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 9 Oct 2022 15:11:58 -0600 Subject: [PATCH 2/7] set channel queue maxsize to 32 --- freqtrade/rpc/api_server/ws/channel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 8c6c56d6e..91875abfb 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -35,7 +35,7 @@ class WebSocketChannel: self._serializer_cls = serializer_cls self._subscriptions: List[str] = [] - self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32) self._relay_task = asyncio.create_task(self.relay()) # Internal event to signify a closed websocket From 2c76dd9e39c8742dfc39b82eae407f11c9be7c91 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 9 Oct 2022 15:23:56 -0600 Subject: [PATCH 3/7] change wait timeout to 30 seconds to better support reverse proxies --- freqtrade/rpc/external_message_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index f5ba4b490..a182ddc57 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -62,7 +62,7 @@ class ExternalMessageConsumer: self.enabled = self._emc_config.get('enabled', False) self.producers: List[Producer] = self._emc_config.get('producers', []) - self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds + self.wait_timeout = self._emc_config.get('wait_timeout', 30) # in seconds self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds From 71bbffd10a0fcac0ca5992d7ab4c62e0ed6e9bf8 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 9 Oct 2022 18:49:04 -0600 Subject: [PATCH 4/7] update ws channel send to add data to queue --- freqtrade/rpc/api_server/ws/channel.py | 12 +++++++++--- freqtrade/rpc/external_message_consumer.py | 5 +++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 91875abfb..7cee95e6d 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -51,12 +51,18 @@ class WebSocketChannel: def remote_addr(self): return self._websocket.remote_addr - async def send(self, data): + async def _send(self, data): """ Send data on the wrapped websocket """ await self._wrapped_ws.send(data) + async def send(self, data): + """ + Add the data to the queue to be sent + """ + self.queue.put_nowait(data) + async def recv(self): """ Receive data on the wrapped websocket @@ -107,7 +113,7 @@ class WebSocketChannel: while True: message = await self.queue.get() try: - await self.send(message) + await self._send(message) self.queue.task_done() except RuntimeError: # The connection was closed, just exit the task @@ -175,7 +181,7 @@ class ChannelManager: for websocket, channel in self.channels.copy().items(): if channel.subscribed_to(message_type): if not channel.queue.full(): - channel.queue.put_nowait(data) + await channel.send(data) else: logger.info(f"Channel {channel} is too far behind, disconnecting") await self.on_disconnect(websocket) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index a182ddc57..88ade185e 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -174,6 +174,7 @@ class ExternalMessageConsumer: :param producer: Dictionary containing producer info :param lock: An asyncio Lock """ + channel = None while self._running: try: host, port = producer['host'], producer['port'] @@ -224,6 +225,10 @@ class ExternalMessageConsumer: logger.exception(e) continue + finally: + if channel: + await channel.close() + async def _receive_messages( self, channel: WebSocketChannel, From db8cf6c957cdf56b3c1bb7ec8f27c445a0933070 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 9 Oct 2022 18:51:52 -0600 Subject: [PATCH 5/7] disable ping interval in client --- freqtrade/rpc/external_message_consumer.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 88ade185e..01bc974ad 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -183,7 +183,11 @@ class ExternalMessageConsumer: ws_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}" # This will raise InvalidURI if the url is bad - async with websockets.connect(ws_url, max_size=self.message_size_limit) as ws: + async with websockets.connect( + ws_url, + max_size=self.message_size_limit, + ping_interval=None + ) as ws: channel = WebSocketChannel(ws, channel_id=name) logger.info(f"Producer connection success - {channel}") From 5ada5eb54001724775cace6335fd9be64d4ec31e Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Mon, 10 Oct 2022 23:30:43 -0600 Subject: [PATCH 6/7] fix error message, update exception import --- freqtrade/rpc/api_server/api_ws.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index b60210143..46909955d 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -1,10 +1,10 @@ import logging from typing import Any, Dict -import websockets from fastapi import APIRouter, Depends, WebSocketDisconnect from fastapi.websockets import WebSocket, WebSocketState from pydantic import ValidationError +from websockets.exceptions import WebSocketException from freqtrade.enums import RPCMessageType, RPCRequestType from freqtrade.rpc.api_server.api_auth import validate_ws_token @@ -115,10 +115,7 @@ async def message_endpoint( # Process the request here await _process_consumer_request(request, channel, rpc) - except ( - WebSocketDisconnect, - websockets.exceptions.WebSocketException - ): + except (WebSocketDisconnect, WebSocketException): # Handle client disconnects logger.info(f"Consumer disconnected - {channel}") except RuntimeError: @@ -126,7 +123,7 @@ async def message_endpoint( # RuntimeError('Cannot call "send" once a closed message has been sent') pass except Exception as e: - logger.info(f"Consumer connection failed - {channel}") + logger.info(f"Consumer connection failed - {channel}: {e}") logger.debug(e, exc_info=e) finally: await channel_manager.on_disconnect(ws) From eb8c89fe3188fefc284d0c825ab29e98c59096ab Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Mon, 10 Oct 2022 23:32:10 -0600 Subject: [PATCH 7/7] move send delay to relay --- freqtrade/rpc/api_server/webserver.py | 4 ---- freqtrade/rpc/api_server/ws/channel.py | 6 ++++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 4a09fd78e..c6639f1a6 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -198,10 +198,6 @@ class ApiServer(RPCHandler): logger.debug(f"Found message of type: {message.get('type')}") # Broadcast it await self._ws_channel_manager.broadcast(message) - # Limit messages per sec. - # Could cause problems with queue size if too low, and - # problems with network traffik if too high. - await asyncio.sleep(0.001) except asyncio.CancelledError: pass diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 7cee95e6d..e9dbd63be 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -115,6 +115,12 @@ class WebSocketChannel: try: await self._send(message) self.queue.task_done() + + # Limit messages per sec. + # Could cause problems with queue size if too low, and + # problems with network traffik if too high. + # 0.001 = 1000/s + await asyncio.sleep(0.001) except RuntimeError: # The connection was closed, just exit the task return