change from bytes to text in websocket, remove old logs
This commit is contained in:
parent
8a08f8ff8d
commit
8f261d8edf
@ -67,8 +67,6 @@ async def _process_consumer_request(
|
|||||||
# They requested the full historical analyzed dataframes
|
# They requested the full historical analyzed dataframes
|
||||||
analyzed_df = rpc._ws_request_analyzed_df(limit)
|
analyzed_df = rpc._ws_request_analyzed_df(limit)
|
||||||
|
|
||||||
logger.debug(f"ANALYZED_DF RESULT: {analyzed_df}")
|
|
||||||
|
|
||||||
# For every dataframe, send as a separate message
|
# For every dataframe, send as a separate message
|
||||||
for _, message in analyzed_df.items():
|
for _, message in analyzed_df.items():
|
||||||
await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message})
|
await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message})
|
||||||
|
@ -50,7 +50,6 @@ class WebSocketChannel:
|
|||||||
"""
|
"""
|
||||||
Send data on the wrapped websocket
|
Send data on the wrapped websocket
|
||||||
"""
|
"""
|
||||||
# logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}")
|
|
||||||
await self._wrapped_ws.send(data)
|
await self._wrapped_ws.send(data)
|
||||||
|
|
||||||
async def recv(self):
|
async def recv(self):
|
||||||
@ -168,8 +167,6 @@ class ChannelManager:
|
|||||||
:param direct_channel: The WebSocketChannel object to send data through
|
:param direct_channel: The WebSocketChannel object to send data through
|
||||||
:param data: The data to send
|
:param data: The data to send
|
||||||
"""
|
"""
|
||||||
# We iterate over the channels to get reference to the websocket object
|
|
||||||
# so we can disconnect incase of failure
|
|
||||||
await channel.send(data)
|
await channel.send(data)
|
||||||
|
|
||||||
def has_channels(self):
|
def has_channels(self):
|
||||||
|
@ -27,11 +27,14 @@ class WebSocketProxy:
|
|||||||
"""
|
"""
|
||||||
Send data on the wrapped websocket
|
Send data on the wrapped websocket
|
||||||
"""
|
"""
|
||||||
if isinstance(data, str):
|
|
||||||
data = data.encode()
|
|
||||||
|
|
||||||
if hasattr(self._websocket, "send_bytes"):
|
if not isinstance(data, str):
|
||||||
await self._websocket.send_bytes(data)
|
# We use HybridJSONWebSocketSerializer, which when serialized returns
|
||||||
|
# bytes because of ORJSON, so we explicitly decode into a string
|
||||||
|
data = str(data, "utf-8")
|
||||||
|
|
||||||
|
if hasattr(self._websocket, "send_text"):
|
||||||
|
await self._websocket.send_text(data)
|
||||||
else:
|
else:
|
||||||
await self._websocket.send(data)
|
await self._websocket.send(data)
|
||||||
|
|
||||||
@ -39,8 +42,8 @@ class WebSocketProxy:
|
|||||||
"""
|
"""
|
||||||
Receive data on the wrapped websocket
|
Receive data on the wrapped websocket
|
||||||
"""
|
"""
|
||||||
if hasattr(self._websocket, "receive_bytes"):
|
if hasattr(self._websocket, "receive_text"):
|
||||||
return await self._websocket.receive_bytes()
|
return await self._websocket.receive_text()
|
||||||
else:
|
else:
|
||||||
return await self._websocket.recv()
|
return await self._websocket.recv()
|
||||||
|
|
||||||
|
@ -58,9 +58,11 @@ class RapidJSONWebSocketSerializer(WebSocketSerializer):
|
|||||||
|
|
||||||
class HybridJSONWebSocketSerializer(WebSocketSerializer):
|
class HybridJSONWebSocketSerializer(WebSocketSerializer):
|
||||||
def _serialize(self, data):
|
def _serialize(self, data):
|
||||||
|
# ORJSON returns bytes
|
||||||
return orjson.dumps(data, default=_json_default)
|
return orjson.dumps(data, default=_json_default)
|
||||||
|
|
||||||
def _deserialize(self, data):
|
def _deserialize(self, data):
|
||||||
|
# RapidJSON expects strings
|
||||||
return rapidjson.loads(data, object_hook=_json_object_hook)
|
return rapidjson.loads(data, object_hook=_json_object_hook)
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user