From e25dea7e0e1963ac4cf367cddd6a1bb76ac83016 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 13:26:27 -0600 Subject: [PATCH] update channel disconnecting --- freqtrade/rpc/api_server/api_ws.py | 10 ++-------- freqtrade/rpc/api_server/ws/channel.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index b230cbe2b..118d70d78 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -127,13 +127,6 @@ async def message_endpoint( except Exception as e: logger.info(f"Consumer connection failed - {channel}: {e}") logger.debug(e, exc_info=e) - finally: - await channel_manager.on_disconnect(ws) - - else: - if channel: - await channel_manager.on_disconnect(ws) - await ws.close() except RuntimeError: # WebSocket was closed @@ -144,4 +137,5 @@ async def message_endpoint( # Log tracebacks to keep track of what errors are happening logger.exception(e) finally: - await channel_manager.on_disconnect(ws) + if channel: + await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 34f03f0c4..ec1b4c639 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -46,7 +46,7 @@ class WebSocketChannel: self._relay_task = asyncio.create_task(self.relay()) # Internal event to signify a closed websocket - self._closed = False + self._closed = asyncio.Event() # Wrap the WebSocket in the Serializing class self._wrapped_ws = self._serializer_cls(self._websocket) @@ -99,14 +99,19 @@ class WebSocketChannel: Close the WebSocketChannel """ - self._closed = True + try: + await self.raw_websocket.close() + except Exception: + pass + + self._closed.set() self._relay_task.cancel() def is_closed(self) -> bool: """ Closed flag """ - return self._closed + return self._closed.is_set() def set_subscriptions(self, subscriptions: List[str] = []) -> None: """ @@ -129,7 +134,7 @@ class WebSocketChannel: Relay messages from the channel's queue and send them out. This is started as a task. """ - while True: + while not self._closed.is_set(): message = await self.queue.get() try: await self._send(message)