From e25dea7e0e1963ac4cf367cddd6a1bb76ac83016 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 13:26:27 -0600 Subject: [PATCH 01/10] update channel disconnecting --- freqtrade/rpc/api_server/api_ws.py | 10 ++-------- freqtrade/rpc/api_server/ws/channel.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index b230cbe2b..118d70d78 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -127,13 +127,6 @@ async def message_endpoint( except Exception as e: logger.info(f"Consumer connection failed - {channel}: {e}") logger.debug(e, exc_info=e) - finally: - await channel_manager.on_disconnect(ws) - - else: - if channel: - await channel_manager.on_disconnect(ws) - await ws.close() except RuntimeError: # WebSocket was closed @@ -144,4 +137,5 @@ async def message_endpoint( # Log tracebacks to keep track of what errors are happening logger.exception(e) finally: - await channel_manager.on_disconnect(ws) + if channel: + await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 34f03f0c4..ec1b4c639 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -46,7 +46,7 @@ class WebSocketChannel: self._relay_task = asyncio.create_task(self.relay()) # Internal event to signify a closed websocket - self._closed = False + self._closed = asyncio.Event() # Wrap the WebSocket in the Serializing class self._wrapped_ws = self._serializer_cls(self._websocket) @@ -99,14 +99,19 @@ class WebSocketChannel: Close the WebSocketChannel """ - self._closed = True + try: + await self.raw_websocket.close() + except Exception: + pass + + self._closed.set() self._relay_task.cancel() def is_closed(self) -> bool: """ Closed flag """ - return self._closed + return self._closed.is_set() def set_subscriptions(self, subscriptions: 
List[str] = []) -> None: """ @@ -129,7 +134,7 @@ class WebSocketChannel: Relay messages from the channel's queue and send them out. This is started as a task. """ - while True: + while not self._closed.is_set(): message = await self.queue.get() try: await self._send(message) From d848c2728347410e4be65db6e7733bd2bdbcefd5 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 13:30:42 -0600 Subject: [PATCH 02/10] add task done to broadcast queue method --- freqtrade/rpc/api_server/webserver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 1d0192a89..51dceb64b 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -197,6 +197,7 @@ class ApiServer(RPCHandler): # Get data from queue message: WSMessageSchemaType = await async_queue.get() logger.debug(f"Found message of type: {message.get('type')}") + async_queue.task_done() # Broadcast it await self._ws_channel_manager.broadcast(message) except asyncio.CancelledError: From c2bdaea84a8928c420f36b6e59e5e7fff2362417 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 14:19:08 -0600 Subject: [PATCH 03/10] change exception handling in channel send --- freqtrade/rpc/api_server/ws/channel.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index ec1b4c639..f9de1c6a0 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -79,7 +79,9 @@ class WebSocketChannel: timeout=self.drain_timeout ) return True - except asyncio.TimeoutError: + except Exception: + # We must catch any exception here to prevent an exception bubbling + # up and stalling the broadcast thread return False async def recv(self): @@ -135,11 +137,14 @@ class WebSocketChannel: as a task. 
""" while not self._closed.is_set(): + logger.info(f"{self} Relay - queue.get") message = await self.queue.get() try: + logger.info(f"{self} Relay - sending message") await self._send(message) self.queue.task_done() + logger.info(f"{self} Relay - QSize: {self.queue.qsize()}") # Limit messages per sec. # Could cause problems with queue size if too low, and # problems with network traffik if too high. From 55bf195bfbffea4c91684ca87a63bdad8addc98b Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 14:21:34 -0600 Subject: [PATCH 04/10] remove debugging log calls --- freqtrade/rpc/api_server/ws/channel.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index f9de1c6a0..e69e51e86 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -137,14 +137,11 @@ class WebSocketChannel: as a task. """ while not self._closed.is_set(): - logger.info(f"{self} Relay - queue.get") message = await self.queue.get() try: - logger.info(f"{self} Relay - sending message") await self._send(message) self.queue.task_done() - logger.info(f"{self} Relay - QSize: {self.queue.qsize()}") # Limit messages per sec. # Could cause problems with queue size if too low, and # problems with network traffik if too high. 
From 2dc55e89e6a1fb1baba7631020941120efeea586 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 15:25:39 -0600 Subject: [PATCH 05/10] better error handling channel send --- freqtrade/rpc/api_server/ws/channel.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index e69e51e86..417b7725a 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -78,11 +78,12 @@ class WebSocketChannel: self.queue.put(data), timeout=self.drain_timeout ) - return True - except Exception: - # We must catch any exception here to prevent an exception bubbling - # up and stalling the broadcast thread + except asyncio.TimeoutError: return False + except RuntimeError: + pass + + return True async def recv(self): """ From cbede2e27dae6ae24b69e37e01c157c8969f7c53 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 17:57:11 -0600 Subject: [PATCH 06/10] refactor channel.send to avoid queue.put --- freqtrade/rpc/api_server/ws/channel.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 417b7725a..3a929ac26 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -1,5 +1,6 @@ import asyncio import logging +import time from threading import RLock from typing import Any, Dict, List, Optional, Type, Union from uuid import uuid4 @@ -73,16 +74,23 @@ class WebSocketChannel: Add the data to the queue to be sent. 
:returns: True if data added to queue, False otherwise """ - try: - await asyncio.wait_for( - self.queue.put(data), - timeout=self.drain_timeout - ) - except asyncio.TimeoutError: - return False - except RuntimeError: - pass + # This block only runs if the queue is full, it will wait + # until self.drain_timeout for the relay to drain the outgoing + # queue + start = time.time() + while self.queue.full(): + await asyncio.sleep(1) + if (time.time() - start) > self.drain_timeout: + return False + + # If for some reason the queue is still full, just return False + try: + self.queue.put_nowait(data) + except asyncio.QueueFull: + return False + + # If we got here everything is ok return True async def recv(self): From 000b0c2198397f30298b505e789b29be7189e549 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 18:00:10 -0600 Subject: [PATCH 07/10] prevent memory leaks from error in _broadcast_queue_data --- freqtrade/rpc/api_server/webserver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 51dceb64b..e9a12e4df 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -211,6 +211,9 @@ class ApiServer(RPCHandler): # Disconnect channels and stop the loop on cancel await self._ws_channel_manager.disconnect_all() self._ws_loop.stop() + # Avoid adding more items to the queue if they aren't + # going to get broadcasted. 
+ self._ws_queue = None def start_api(self): """ From a0965606a58b5e3ba5dd46699395eadb68db2d59 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 18:49:11 -0600 Subject: [PATCH 08/10] update ws_client more verbosity, better readable time delta --- scripts/ws_client.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/scripts/ws_client.py b/scripts/ws_client.py index 23ad9296d..40b5cf466 100644 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -18,7 +18,6 @@ import orjson import pandas import rapidjson import websockets -from dateutil.relativedelta import relativedelta logger = logging.getLogger("WebSocketClient") @@ -28,7 +27,7 @@ logger = logging.getLogger("WebSocketClient") def setup_logging(filename: str): logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(filename), @@ -75,16 +74,15 @@ def load_config(configfile): def readable_timedelta(delta): """ - Convert a dateutil.relativedelta to a readable format + Convert a millisecond delta to a readable format - :param delta: A dateutil.relativedelta + :param delta: A delta between two timestamps in milliseconds :returns: The readable time difference string """ - attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds', 'microseconds'] - return ", ".join([ - '%d %s' % (getattr(delta, attr), attr if getattr(delta, attr) > 0 else attr[:-1]) - for attr in attrs if getattr(delta, attr) - ]) + seconds, milliseconds = divmod(delta, 1000) + minutes, seconds = divmod(seconds, 60) + + return f"{int(minutes)}:{int(seconds)}.{int(milliseconds)}" # ---------------------------------------------------------------------------- @@ -170,8 +168,8 @@ class ClientProtocol: def _calculate_time_difference(self): old_last_received_at = self._LAST_RECEIVED_AT - self._LAST_RECEIVED_AT = time.time() * 1e6 - time_delta = 
relativedelta(microseconds=(self._LAST_RECEIVED_AT - old_last_received_at)) + self._LAST_RECEIVED_AT = time.time() * 1e3 + time_delta = self._LAST_RECEIVED_AT - old_last_received_at return readable_timedelta(time_delta) @@ -272,6 +270,7 @@ async def create_client( websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK ): + logger.info("Connection was closed") # Just keep trying to connect again indefinitely await asyncio.sleep(sleep_time) From b749f3edd65412c6bee998c58dc5d9413518ef06 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 19:30:35 -0600 Subject: [PATCH 09/10] add latency measure from ping in emc and ws_client --- freqtrade/rpc/external_message_consumer.py | 8 ++++---- scripts/ws_client.py | 10 ++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index e86f44c17..b978407e4 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -264,10 +264,10 @@ class ExternalMessageConsumer: # We haven't received data yet. Check the connection and continue. 
try: # ping - ping = await channel.ping() + pong = await channel.ping() + latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000) - await asyncio.wait_for(ping, timeout=self.ping_timeout) - logger.debug(f"Connection to {channel} still alive...") + logger.info(f"Connection to {channel} still alive, latency: {latency}ms") continue except (websockets.exceptions.ConnectionClosed): @@ -276,7 +276,7 @@ class ExternalMessageConsumer: await asyncio.sleep(self.sleep_time) break except Exception as e: - logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s") + logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) await asyncio.sleep(self.sleep_time) diff --git a/scripts/ws_client.py b/scripts/ws_client.py index 40b5cf466..090039cde 100644 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -240,12 +240,10 @@ async def create_client( ): # Try pinging try: - pong = ws.ping() - await asyncio.wait_for( - pong, - timeout=ping_timeout - ) - logger.info("Connection still alive...") + pong = await ws.ping() + latency = (await asyncio.wait_for(pong, timeout=ping_timeout) * 1000) + + logger.info(f"Connection still alive, latency: {latency}ms") continue From ff619edebf2d981294aa1252ce605e3032b8a6ec Mon Sep 17 00:00:00 2001 From: Matthias Date: Thu, 3 Nov 2022 06:50:18 +0100 Subject: [PATCH 10/10] Improve explanation comment as to why we're waiting ourselves --- freqtrade/rpc/api_server/ws/channel.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 3a929ac26..3c97d05b1 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -76,8 +76,9 @@ class WebSocketChannel: """ # This block only runs if the queue is full, it will wait - # until self.drain_timeout for the relay to drain the outgoing - # queue + # until self.drain_timeout for the relay to 
drain the outgoing queue + # We can't use asyncio.wait_for here because the queue may have been created with a + # different eventloop start = time.time() while self.queue.full(): await asyncio.sleep(1)