diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index f3f6b852d..cafbaefcb 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -73,7 +73,7 @@ async def _process_consumer_request( whitelist = rpc._ws_request_whitelist() # Format response - response = WSWhitelistMessage(data=whitelist).dict(exclude_none=True) + response = WSWhitelistMessage(data=whitelist) # Send it back await channel_manager.send_direct(channel, response) @@ -89,7 +89,7 @@ async def _process_consumer_request( # For every dataframe, send as a separate message for _, message in analyzed_df.items(): - response = WSAnalyzedDFMessage(data=message).dict(exclude_none=True) + response = WSAnalyzedDFMessage(data=message) await channel_manager.send_direct(channel, response) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 92cb6dedc..942a3df70 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -10,6 +10,7 @@ from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer, WebSocketSerializer) from freqtrade.rpc.api_server.ws.types import WebSocketType +from freqtrade.rpc.api_server.ws_schemas import WSMessageSchema logger = logging.getLogger(__name__) @@ -54,8 +55,8 @@ class WebSocketChannel: return f"WebSocketChannel({self.channel_id}, {self.remote_addr})" @property - def raw(self): - return self._websocket.raw + def raw_websocket(self): + return self._websocket.raw_websocket @property def remote_addr(self): @@ -192,29 +193,26 @@ class ChannelManager: for websocket in self.channels.copy().keys(): await self.on_disconnect(websocket) - self.channels = dict() - - async def broadcast(self, data): + async def broadcast(self, message: WSMessageSchema): """ - Broadcast data on all Channels + Broadcast a message on all Channels - :param data: The data to send + :param message: The message to send """ with self._lock: - message_type = data.get('type') for channel in self.channels.copy().values(): - if channel.subscribed_to(message_type): - await self.send_direct(channel, data) + if channel.subscribed_to(message.type): + await self.send_direct(channel, message) - async def send_direct(self, channel, data): + async def send_direct(self, channel: WebSocketChannel, message: WSMessageSchema): """ - Send data directly through direct_channel only + Send a message directly through direct_channel only - :param direct_channel: The WebSocketChannel object to send data through - :param data: The data to send + :param direct_channel: The WebSocketChannel object to send the message through + :param message: The message to send """ - if not await channel.send(data): - await self.on_disconnect(channel.raw) + if not await channel.send(message.dict(exclude_none=True)): + await self.on_disconnect(channel.raw_websocket) def has_channels(self): """ diff --git a/freqtrade/rpc/api_server/ws/proxy.py b/freqtrade/rpc/api_server/ws/proxy.py index 8518709aa..ae123dd2d 100644 --- a/freqtrade/rpc/api_server/ws/proxy.py +++ b/freqtrade/rpc/api_server/ws/proxy.py @@ -16,7 +16,7 @@ class WebSocketProxy: self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket @property - def raw(self): + def raw_websocket(self): return self._websocket @property