Merge branch 'develop' into backtest_fitlivepredictions

This commit is contained in:
Wagner Costa Santos 2022-11-18 17:53:34 -03:00
commit 3d3195847c
3 changed files with 36 additions and 21 deletions

View File

@ -10,7 +10,8 @@ from typing import Any, Dict, Iterator, List, Mapping, Union
from typing.io import IO from typing.io import IO
from urllib.parse import urlparse from urllib.parse import urlparse
import pandas import orjson
import pandas as pd
import rapidjson import rapidjson
from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN from freqtrade.constants import DECIMAL_PER_COIN_FALLBACK, DECIMALS_PER_COIN
@ -256,29 +257,37 @@ def parse_db_uri_for_logging(uri: str):
return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@') return parsed_db_uri.geturl().replace(f':{pwd}@', ':*****@')
def dataframe_to_json(dataframe: pandas.DataFrame) -> str: def dataframe_to_json(dataframe: pd.DataFrame) -> str:
""" """
Serialize a DataFrame for transmission over the wire using JSON Serialize a DataFrame for transmission over the wire using JSON
:param dataframe: A pandas DataFrame :param dataframe: A pandas DataFrame
:returns: A JSON string of the pandas DataFrame :returns: A JSON string of the pandas DataFrame
""" """
return dataframe.to_json(orient='split') # https://github.com/pandas-dev/pandas/issues/24889
# https://github.com/pandas-dev/pandas/issues/40443
# We need to convert to a dict to avoid mem leak
def default(z):
if isinstance(z, pd.Timestamp):
return z.timestamp() * 1e3
raise TypeError
return str(orjson.dumps(dataframe.to_dict(orient='split'), default=default), 'utf-8')
def json_to_dataframe(data: str) -> pandas.DataFrame: def json_to_dataframe(data: str) -> pd.DataFrame:
""" """
Deserialize JSON into a DataFrame Deserialize JSON into a DataFrame
:param data: A JSON string :param data: A JSON string
:returns: A pandas DataFrame from the JSON string :returns: A pandas DataFrame from the JSON string
""" """
dataframe = pandas.read_json(data, orient='split') dataframe = pd.read_json(data, orient='split')
if 'date' in dataframe.columns: if 'date' in dataframe.columns:
dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) dataframe['date'] = pd.to_datetime(dataframe['date'], unit='ms', utc=True)
return dataframe return dataframe
def remove_entry_exit_signals(dataframe: pandas.DataFrame): def remove_entry_exit_signals(dataframe: pd.DataFrame):
""" """
Remove Entry and Exit signals from a DataFrame Remove Entry and Exit signals from a DataFrame

View File

@ -194,6 +194,9 @@ class ApiServer(RPCHandler):
try: try:
while True: while True:
logger.debug("Getting queue messages...") logger.debug("Getting queue messages...")
if (qsize := async_queue.qsize()) > 20:
# If the queue becomes too big for too long, this may indicate a problem.
logger.warning(f"Queue size now {qsize}")
# Get data from queue # Get data from queue
message: WSMessageSchemaType = await async_queue.get() message: WSMessageSchemaType = await async_queue.get()
logger.debug(f"Found message of type: {message.get('type')}") logger.debug(f"Found message of type: {message.get('type')}")

View File

@ -77,21 +77,24 @@ class WebSocketChannel:
# until self.drain_timeout for the relay to drain the outgoing queue # until self.drain_timeout for the relay to drain the outgoing queue
# We can't use asyncio.wait_for here because the queue may have been created with a # We can't use asyncio.wait_for here because the queue may have been created with a
# different eventloop # different eventloop
start = time.time() if not self.is_closed():
while self.queue.full(): start = time.time()
await asyncio.sleep(1) while self.queue.full():
if (time.time() - start) > self.drain_timeout: await asyncio.sleep(1)
if (time.time() - start) > self.drain_timeout:
return False
# If for some reason the queue is still full, just return False
try:
self.queue.put_nowait(data)
except asyncio.QueueFull:
return False return False
# If for some reason the queue is still full, just return False # If we got here everything is ok
try: return True
self.queue.put_nowait(data) else:
except asyncio.QueueFull:
return False return False
# If we got here everything is ok
return True
async def recv(self): async def recv(self):
""" """
Receive data on the wrapped websocket Receive data on the wrapped websocket
@ -109,14 +112,14 @@ class WebSocketChannel:
Close the WebSocketChannel Close the WebSocketChannel
""" """
self._closed.set()
self._relay_task.cancel()
try: try:
await self.raw_websocket.close() await self.raw_websocket.close()
except Exception: except Exception:
pass pass
self._closed.set()
self._relay_task.cancel()
def is_closed(self) -> bool: def is_closed(self) -> bool:
""" """
Closed flag Closed flag