diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index 90302f88e..ef3067f38 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -17,7 +17,6 @@ from freqtrade.data.history import load_pair_history from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exchange import Exchange, timeframe_to_seconds -from freqtrade.misc import dataframe_to_json from freqtrade.rpc import RPCManager from freqtrade.util import PeriodicCache @@ -119,7 +118,7 @@ class DataProvider: 'type': RPCMessageType.ANALYZED_DF, 'data': { 'key': pair_key, - 'value': dataframe_to_json(dataframe) + 'value': dataframe } } ) diff --git a/freqtrade/misc.py b/freqtrade/misc.py index ceace4ed8..6a93b6f26 100644 --- a/freqtrade/misc.py +++ b/freqtrade/misc.py @@ -269,7 +269,8 @@ def json_to_dataframe(data: str) -> pandas.DataFrame: :returns: A pandas DataFrame from the JSON string """ dataframe = pandas.read_json(data) - dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) + if 'date' in dataframe.columns: + dataframe['date'] = pandas.to_datetime(dataframe['date'], unit='ms', utc=True) return dataframe diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 6bc5b9d6b..b47fe7550 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -3,7 +3,7 @@ from threading import RLock from typing import List, Type from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy -from freqtrade.rpc.api_server.ws.serializer import ORJSONWebSocketSerializer, WebSocketSerializer +from freqtrade.rpc.api_server.ws.serializer import RapidJSONWebSocketSerializer, WebSocketSerializer from freqtrade.rpc.api_server.ws.types import WebSocketType @@ -18,7 +18,7 @@ class WebSocketChannel: def __init__( self, websocket: WebSocketType, - serializer_cls: Type[WebSocketSerializer] = ORJSONWebSocketSerializer + serializer_cls: Type[WebSocketSerializer] = RapidJSONWebSocketSerializer ): # The WebSocket object self._websocket = WebSocketProxy(websocket) diff --git a/freqtrade/rpc/api_server/ws/serializer.py b/freqtrade/rpc/api_server/ws/serializer.py index ae2857f0b..c11ca9a99 100644 --- a/freqtrade/rpc/api_server/ws/serializer.py +++ b/freqtrade/rpc/api_server/ws/serializer.py @@ -3,8 +3,10 @@ import logging from abc import ABC, abstractmethod import msgpack -import orjson +import rapidjson +from pandas import DataFrame +from freqtrade.misc import dataframe_to_json, json_to_dataframe from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy @@ -34,27 +36,23 @@ class WebSocketSerializer(ABC): async def close(self, code: int = 1000): await self._websocket.close(code) -# Going to explore using MsgPack as the serialization, -# as that might be the best method for sending pandas -# dataframes over the wire - class JSONWebSocketSerializer(WebSocketSerializer): def _serialize(self, data): - return json.dumps(data) + return json.dumps(data, default=_json_default) def _deserialize(self, data): - return json.loads(data) + return json.loads(data, object_hook=_json_object_hook) -class ORJSONWebSocketSerializer(WebSocketSerializer): - ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY +# ORJSON does not support .loads(object_hook=x) parameter, so we must use RapidJSON +class RapidJSONWebSocketSerializer(WebSocketSerializer): def _serialize(self, data): - return orjson.dumps(data, option=self.ORJSON_OPTIONS) + return rapidjson.dumps(data, default=_json_default) def _deserialize(self, data): - return orjson.loads(data) + return rapidjson.loads(data, object_hook=_json_object_hook) class MsgPackWebSocketSerializer(WebSocketSerializer): @@ -63,3 +61,20 @@ class MsgPackWebSocketSerializer(WebSocketSerializer): def _deserialize(self, data): return msgpack.unpackb(data, raw=False) + + +# Support serializing pandas DataFrames +def _json_default(z): + if isinstance(z, DataFrame): + return { + '__type__': 'dataframe', + '__value__': dataframe_to_json(z) + } + raise TypeError + + +# Support deserializing JSON to pandas DataFrames +def _json_object_hook(z): + if z.get('__type__') == 'dataframe': + return json_to_dataframe(z.get('__value__')) + return z diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 4544afc29..4c7f6570d 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -10,11 +10,12 @@ import socket from threading import Thread from typing import Any, Dict, Optional +import pandas import websockets from freqtrade.data.dataprovider import DataProvider from freqtrade.enums import RPCMessageType, RPCRequestType -from freqtrade.misc import json_to_dataframe, remove_entry_exit_signals +from freqtrade.misc import remove_entry_exit_signals from freqtrade.rpc.api_server.ws.channel import WebSocketChannel @@ -262,11 +263,9 @@ class ExternalMessageConsumer: key, value = message_data.get('key'), message_data.get('value') - if key and value: + if key and isinstance(value, pandas.DataFrame): pair, timeframe, candle_type = key - - # Convert the JSON to a pandas DataFrame - dataframe = json_to_dataframe(value) + dataframe = value # If set, remove the Entry and Exit signals from the Producer if self._emc_config.get('remove_entry_exit_signals', False): diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index c4752c570..96b43f36b 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -24,7 +24,7 @@ from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirecti from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler -from freqtrade.misc import dataframe_to_json, decimals_per_coin, shorten_date +from freqtrade.misc import decimals_per_coin, shorten_date from freqtrade.persistence import PairLocks, Trade from freqtrade.persistence.models import PairLock from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist @@ -1064,10 +1064,7 @@ class RPC: for pair in pairlist: dataframe, last_analyzed = self.__rpc_analysed_dataframe_raw(pair, timeframe, limit) - _data[pair] = { - "key": (pair, timeframe, candle_type), - "value": dataframe_to_json(dataframe) - } + _data[pair] = {"key": (pair, timeframe, candle_type), "value": dataframe} return _data