diff --git a/freqtrade/misc.py b/freqtrade/misc.py index 49d33d46f..2d2c7513a 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -10,7 +10,8 @@ from typing import Any, Dict, Iterator, List, Mapping, Union from typing.io import IO from urllib.parse import urlparse -import pandas +import orjson +import pandas as pd import rapidjson from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN @@ -256,29 +257,37 @@ def parse_db_uri_for_logging(uri: str): return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@') -def dataframe_to_json(dataframe: pandas.DataFrame) -> str: +def dataframe_to_json(dataframe: pd.DataFrame) -> str: """ Serialize a DataFrame for transmission over the wire using JSON :param dataframe: A pandas DataFrame :returns: A JSON string of the pandas DataFrame """ - return dataframe.to_json(orient='split') + # https://github.com/pandas-dev/pandas/issues/24889 + # https://github.com/pandas-dev/pandas/issues/40443 + # We need to convert to a dict to avoid mem leak + def default(z): + if isinstance(z, pd.Timestamp): + return z.timestamp() * 1e3 + raise TypeError + + return str(orjson.dumps(dataframe.to_dict(orient='split'), default=default), 'utf-8') -def json_to_dataframe(data: str) -> pandas.DataFrame: +def json_to_dataframe(data: str) -> pd.DataFrame: """ Deserialize JSON into a DataFrame :param data: A JSON string :returns: A pandas DataFrame from the JSON string """ - dataframe = pandas.read_json(data, orient='split') + dataframe = pd.read_json(data, orient='split') if 'date' in dataframe.columns: - dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) + dataframe['date'] = pd.to_datetime(dataframe['date'], unit='ms', utc=True) return dataframe -def remove_entry_exit_signals(dataframe: pandas.DataFrame): +def remove_entry_exit_signals(dataframe: pd.DataFrame): """ Remove Entry and Exit signals from a DataFrame diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 6464ae44e..ec4907e67 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -194,6 +194,9 @@ class ApiServer(RPCHandler): try: while True: logger.debug("Getting queue messages...") + if (qsize := async_queue.qsize()) > 20: + # If the queue becomes too big for too long, this may indicate a problem. + logger.warning(f"Queue size now {qsize}") # Get data from queue message: WSMessageSchemaType = await async_queue.get() logger.debug(f"Found message of type: {message.get('type')}") diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 88b4db9ba..4eef738d4 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -77,21 +77,24 @@ class WebSocketChannel: # until self.drain_timeout for the relay to drain the outgoing queue # We can't use asyncio.wait_for here because the queue may have been created with a # different eventloop - start = time.time() - while self.queue.full(): - await asyncio.sleep(1) - if (time.time() - start) > self.drain_timeout: + if not self.is_closed(): + start = time.time() + while self.queue.full(): + await asyncio.sleep(1) + if (time.time() - start) > self.drain_timeout: + return False + + # If for some reason the queue is still full, just return False + try: + self.queue.put_nowait(data) + except asyncio.QueueFull: return False - # If for some reason the queue is still full, just return False - try: - self.queue.put_nowait(data) - except asyncio.QueueFull: + # If we got here everything is ok + return True + else: return False - # If we got here everything is ok - return True - async def recv(self): """ Receive data on the wrapped websocket @@ -109,14 +112,14 @@ class WebSocketChannel: Close the WebSocketChannel """ + self._closed.set() + self._relay_task.cancel() + try: await self.raw_websocket.close() except Exception: pass - self._closed.set() - self._relay_task.cancel() - def is_closed(self) -> bool: """ Closed flag