From 875e9ab447a6d3d165b13a64f95eb79f94daec74 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Thu, 17 Nov 2022 11:59:03 -0700 Subject: [PATCH 1/5] change df serialization to avoid mem leak --- freqtrade/misc.py | 7 +++++-- freqtrade/rpc/api_server/ws/serializer.py | 7 ++++++- scripts/ws_client.py | 2 +- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 49d33d46f..308f0b32d 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -262,7 +262,10 @@ def dataframe_to_json(dataframe: pandas.DataFrame) -> str: :param dataframe: A pandas DataFrame :returns: A JSON string of the pandas DataFrame """ - return dataframe.to_json(orient='split') + # https://github.com/pandas-dev/pandas/issues/24889 + # https://github.com/pandas-dev/pandas/issues/40443 + # We need to convert to a dict to avoid mem leak + return dataframe.to_dict(orient='tight') def json_to_dataframe(data: str) -> pandas.DataFrame: @@ -271,7 +274,7 @@ def json_to_dataframe(data: str) -> pandas.DataFrame: :param data: A JSON string :returns: A pandas DataFrame from the JSON string """ - dataframe = pandas.read_json(data, orient='split') + dataframe = pandas.DataFrame.from_dict(data, orient='tight') if 'date' in dataframe.columns: dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) diff --git a/freqtrade/rpc/api_server/ws/serializer.py b/freqtrade/rpc/api_server/ws/serializer.py index 6c402a100..8d06746f7 100644 --- a/freqtrade/rpc/api_server/ws/serializer.py +++ b/freqtrade/rpc/api_server/ws/serializer.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod import orjson import rapidjson -from pandas import DataFrame +from pandas import DataFrame, Timestamp from freqtrade.misc import dataframe_to_json, json_to_dataframe from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy @@ -52,6 +52,11 @@ def _json_default(z): '__type__': 'dataframe', '__value__': dataframe_to_json(z) } + # Pandas returns a Timestamp object, we need to + # convert it to a timestamp int (with ms) for orjson + # to handle it + if isinstance(z, Timestamp): + return z.timestamp() * 1e3 raise TypeError diff --git a/scripts/ws_client.py b/scripts/ws_client.py index 090039cde..ff437e63e 100644 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -101,7 +101,7 @@ def json_deserialize(message): :param message: The message to deserialize """ def json_to_dataframe(data: str) -> pandas.DataFrame: - dataframe = pandas.read_json(data, orient='split') + dataframe = pandas.DataFrame.from_dict(data, orient='tight') if 'date' in dataframe.columns: dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) From ce43fa5f431092bff5a23b1586f3381d1ba4cfac Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Thu, 17 Nov 2022 12:03:11 -0700 Subject: [PATCH 2/5] small fix to websocketchannel and relay --- freqtrade/rpc/api_server/ws/channel.py | 31 ++++++++++++++------------ 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 88b4db9ba..4eef738d4 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -77,21 +77,24 @@ class WebSocketChannel: # until self.drain_timeout for the relay to drain the outgoing queue # We can't use asyncio.wait_for here because the queue may have been created with a # different eventloop - start = time.time() - while self.queue.full(): - await asyncio.sleep(1) - if (time.time() - start) > self.drain_timeout: + if not self.is_closed(): + start = time.time() + while self.queue.full(): + await asyncio.sleep(1) + if (time.time() - start) > self.drain_timeout: + return False + + # If for some reason the queue is still full, just return False + try: + self.queue.put_nowait(data) + except asyncio.QueueFull: return False - # If for some reason the queue is still full, just return False - try: - self.queue.put_nowait(data) - except asyncio.QueueFull: + # If we got here everything is ok + return True + else: return False - # If we got here everything is ok - return True - async def recv(self): """ Receive data on the wrapped websocket @@ -109,14 +112,14 @@ class WebSocketChannel: Close the WebSocketChannel """ + self._closed.set() + self._relay_task.cancel() + try: await self.raw_websocket.close() except Exception: pass - self._closed.set() - self._relay_task.cancel() - def is_closed(self) -> bool: """ Closed flag From 4c7bb79c86d6167659e8dca643f7bf9cd55519af Mon Sep 17 00:00:00 2001 From: Matthias Date: Fri, 18 Nov 2022 13:59:29 +0100 Subject: [PATCH 3/5] Restore prior data transfer format --- freqtrade/misc.py | 20 +++++++++++++------- scripts/ws_client.py | 2 +- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 308f0b32d..2d2c7513a 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -10,7 +10,8 @@ from typing import Any, Dict, Iterator, List, Mapping, Union from typing.io import IO from urllib.parse import urlparse -import pandas +import orjson +import pandas as pd import rapidjson from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN @@ -256,7 +257,7 @@ def parse_db_uri_for_logging(uri: str): return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@') -def dataframe_to_json(dataframe: pandas.DataFrame) -> str: +def dataframe_to_json(dataframe: pd.DataFrame) -> str: """ Serialize a DataFrame for transmission over the wire using JSON :param dataframe: A pandas DataFrame @@ -265,23 +266,28 @@ def dataframe_to_json(dataframe: pandas.DataFrame) -> str: # https://github.com/pandas-dev/pandas/issues/24889 # https://github.com/pandas-dev/pandas/issues/40443 # We need to convert to a dict to avoid mem leak - return dataframe.to_dict(orient='tight') + def default(z): + if isinstance(z, pd.Timestamp): + return z.timestamp() * 1e3 + raise TypeError + + return str(orjson.dumps(dataframe.to_dict(orient='split'), default=default), 'utf-8') -def json_to_dataframe(data: str) -> pandas.DataFrame: +def json_to_dataframe(data: str) -> pd.DataFrame: """ Deserialize JSON into a DataFrame :param data: A JSON string :returns: A pandas DataFrame from the JSON string """ - dataframe = pandas.DataFrame.from_dict(data, orient='tight') + dataframe = pd.read_json(data, orient='split') if 'date' in dataframe.columns: - dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) + dataframe['date'] = pd.to_datetime(dataframe['date'], unit='ms', utc=True) return dataframe -def remove_entry_exit_signals(dataframe: pandas.DataFrame): +def remove_entry_exit_signals(dataframe: pd.DataFrame): """ Remove Entry and Exit signals from a DataFrame diff --git a/scripts/ws_client.py b/scripts/ws_client.py index ff437e63e..090039cde 100644 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -101,7 +101,7 @@ def json_deserialize(message): :param message: The message to deserialize """ def json_to_dataframe(data: str) -> pandas.DataFrame: - dataframe = pandas.DataFrame.from_dict(data, orient='tight') + dataframe = pandas.read_json(data, orient='split') if 'date' in dataframe.columns: dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) From 12cd83453c18b9962ba80f6a6a13434748917456 Mon Sep 17 00:00:00 2001 From: Matthias Date: Fri, 18 Nov 2022 14:03:56 +0100 Subject: [PATCH 4/5] Add warning when queue websocket queue becomes too full --- freqtrade/rpc/api_server/webserver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 6464ae44e..ec4907e67 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -194,6 +194,9 @@ class ApiServer(RPCHandler): try: while True: logger.debug("Getting queue messages...") + if (qsize := async_queue.qsize()) > 20: + # If the queue becomes too big for too long, this may indicate a problem. + logger.warning(f"Queue size now {qsize}") # Get data from queue message: WSMessageSchemaType = await async_queue.get() logger.debug(f"Found message of type: {message.get('type')}") From b6a8e421f1b2ddb7112773c686867cc6debacf88 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 18 Nov 2022 09:43:39 -0700 Subject: [PATCH 5/5] remove redundant timestamp conversion in ws serializer --- freqtrade/rpc/api_server/ws/serializer.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/serializer.py b/freqtrade/rpc/api_server/ws/serializer.py index 8d06746f7..6c402a100 100644 --- a/freqtrade/rpc/api_server/ws/serializer.py +++ b/freqtrade/rpc/api_server/ws/serializer.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod import orjson import rapidjson -from pandas import DataFrame, Timestamp +from pandas import DataFrame from freqtrade.misc import dataframe_to_json, json_to_dataframe from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy @@ -52,11 +52,6 @@ def _json_default(z): '__type__': 'dataframe', '__value__': dataframe_to_json(z) } - # Pandas returns a Timestamp object, we need to - # convert it to a timestamp int (with ms) for orjson - # to handle it - if isinstance(z, Timestamp): - return z.timestamp() * 1e3 raise TypeError