From 2b6d00dde449934db8789c860d5e0e9dc9c528ab Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sat, 22 Oct 2022 09:30:18 -0600 Subject: [PATCH 1/7] initial channel api change --- freqtrade/rpc/api_server/ws/channel.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index e9dbd63be..a1334bce9 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -24,6 +24,7 @@ class WebSocketChannel: self, websocket: WebSocketType, channel_id: Optional[str] = None, + drain_timeout: int = 3, serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer ): @@ -34,6 +35,8 @@ class WebSocketChannel: # The Serializing class for the WebSocket object self._serializer_cls = serializer_cls + self.drain_timeout = drain_timeout + self._subscriptions: List[str] = [] self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32) self._relay_task = asyncio.create_task(self.relay()) @@ -57,11 +60,19 @@ class WebSocketChannel: """ await self._wrapped_ws.send(data) - async def send(self, data): + async def send(self, data) -> bool: """ - Add the data to the queue to be sent + Add the data to the queue to be sent. + :returns: True if data added to queue, False otherwise """ - self.queue.put_nowait(data) + try: + await asyncio.wait_for( + self.queue.put(data), + timeout=self.drain_timeout + ) + return True + except asyncio.TimeoutError: + return False async def recv(self): """ @@ -119,8 +130,8 @@ class WebSocketChannel: # Limit messages per sec. # Could cause problems with queue size if too low, and # problems with network traffik if too high. - # 0.001 = 1000/s - await asyncio.sleep(0.001) + # 0.01 = 100/s + await asyncio.sleep(0.01) except RuntimeError: # The connection was closed, just exit the task return @@ -186,9 +197,7 @@ class ChannelManager: message_type = data.get('type') for websocket, channel in self.channels.copy().items(): if channel.subscribed_to(message_type): - if not channel.queue.full(): - await channel.send(data) - else: + if not await channel.send(data): logger.info(f"Channel {channel} is too far behind, disconnecting") await self.on_disconnect(websocket) From 3d7a311caa3c58dbc4cfcfbe2e29a15abcfeffae Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sat, 22 Oct 2022 19:02:05 -0600 Subject: [PATCH 2/7] removed sleep calls, better channel sending --- freqtrade/rpc/api_server/api_ws.py | 17 ++++++++-------- freqtrade/rpc/api_server/webserver.py | 4 ---- freqtrade/rpc/api_server/ws/channel.py | 23 +++++++++++++--------- freqtrade/rpc/api_server/ws/proxy.py | 4 ++++ freqtrade/rpc/external_message_consumer.py | 5 +++++ 5 files changed, 31 insertions(+), 22 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 2f490b8a8..f3f6b852d 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -1,4 +1,3 @@ -import asyncio import logging from typing import Any, Dict @@ -11,6 +10,7 @@ from freqtrade.enums import RPCMessageType, RPCRequestType from freqtrade.rpc.api_server.api_auth import validate_ws_token from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc from freqtrade.rpc.api_server.ws import WebSocketChannel +from freqtrade.rpc.api_server.ws.channel import ChannelManager from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSMessageSchema, WSRequestSchema, WSWhitelistMessage) from freqtrade.rpc.rpc import RPC @@ -37,7 +37,8 @@ async def is_websocket_alive(ws: WebSocket) -> bool: async def _process_consumer_request( request: Dict[str, Any], channel: WebSocketChannel, - rpc: RPC + rpc: RPC, + channel_manager: ChannelManager ): """ Validate and handle a request from a websocket consumer @@ -72,9 +73,9 @@ async def _process_consumer_request( whitelist = rpc._ws_request_whitelist() # Format response - response = WSWhitelistMessage(data=whitelist) + response = WSWhitelistMessage(data=whitelist).dict(exclude_none=True) # Send it back - await channel.send(response.dict(exclude_none=True)) + await channel_manager.send_direct(channel, response) elif type == RPCRequestType.ANALYZED_DF: limit = None @@ -88,10 +89,8 @@ async def _process_consumer_request( # For every dataframe, send as a separate message for _, message in analyzed_df.items(): - response = WSAnalyzedDFMessage(data=message) - await channel.send(response.dict(exclude_none=True)) - # Throttle the messages to 50/s - await asyncio.sleep(0.02) + response = WSAnalyzedDFMessage(data=message).dict(exclude_none=True) + await channel_manager.send_direct(channel, response) @router.websocket("/message/ws") @@ -116,7 +115,7 @@ async def message_endpoint( request = await channel.recv() # Process the request here - await _process_consumer_request(request, channel, rpc) + await _process_consumer_request(request, channel, rpc, channel_manager) except (WebSocketDisconnect, WebSocketException): # Handle client disconnects diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 4a09fd78e..c6639f1a6 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -198,10 +198,6 @@ class ApiServer(RPCHandler): logger.debug(f"Found message of type: {message.get('type')}") # Broadcast it await self._ws_channel_manager.broadcast(message) - # Limit messages per sec. - # Could cause problems with queue size if too low, and - # problems with network traffik if too high. - await asyncio.sleep(0.001) except asyncio.CancelledError: pass diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index a1334bce9..4afca0d33 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -25,6 +25,7 @@ class WebSocketChannel: websocket: WebSocketType, channel_id: Optional[str] = None, drain_timeout: int = 3, + throttle: float = 0.01, serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer ): @@ -36,6 +37,7 @@ class WebSocketChannel: self._serializer_cls = serializer_cls self.drain_timeout = drain_timeout + self.throttle = throttle self._subscriptions: List[str] = [] self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32) @@ -50,6 +52,10 @@ class WebSocketChannel: def __repr__(self): return f"WebSocketChannel({self.channel_id}, {self.remote_addr})" + @property + def raw(self): + return self._websocket.raw + @property def remote_addr(self): return self._websocket.remote_addr @@ -131,7 +137,7 @@ class WebSocketChannel: # Could cause problems with queue size if too low, and # problems with network traffik if too high. # 0.01 = 100/s - await asyncio.sleep(0.01) + await asyncio.sleep(self.throttle) except RuntimeError: # The connection was closed, just exit the task return @@ -171,6 +177,7 @@ class ChannelManager: with self._lock: channel = self.channels.get(websocket) if channel: + logger.info(f"Disconnecting channel {channel}") if not channel.is_closed(): await channel.close() @@ -181,9 +188,8 @@ class ChannelManager: Disconnect all Channels """ with self._lock: - for websocket, channel in self.channels.copy().items(): - if not channel.is_closed(): - await channel.close() + for websocket in self.channels.copy().keys(): + await self.on_disconnect(websocket) self.channels = dict() @@ -195,11 +201,9 @@ class ChannelManager: """ with self._lock: message_type = data.get('type') - for websocket, channel in self.channels.copy().items(): + for channel in self.channels.copy().values(): if channel.subscribed_to(message_type): - if not await channel.send(data): - logger.info(f"Channel {channel} is too far behind, disconnecting") - await self.on_disconnect(websocket) + await self.send_direct(channel, data) async def send_direct(self, channel, data): """ @@ -208,7 +212,8 @@ class ChannelManager: :param direct_channel: The WebSocketChannel object to send data through :param data: The data to send """ - await channel.send(data) + if not await channel.send(data): + await self.on_disconnect(channel.raw) def has_channels(self): """ diff --git a/freqtrade/rpc/api_server/ws/proxy.py b/freqtrade/rpc/api_server/ws/proxy.py index 2e5a59f05..8518709aa 100644 --- a/freqtrade/rpc/api_server/ws/proxy.py +++ b/freqtrade/rpc/api_server/ws/proxy.py @@ -15,6 +15,10 @@ class WebSocketProxy: def __init__(self, websocket: WebSocketType): self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket + @property + def raw(self): + return self._websocket + @property def remote_addr(self) -> Tuple[Any, ...]: if isinstance(self._websocket, WebSocket): diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 01bc974ad..e86f44c17 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -270,6 +270,11 @@ class ExternalMessageConsumer: logger.debug(f"Connection to {channel} still alive...") continue + except (websockets.exceptions.ConnectionClosed): + # Just eat the error and continue reconnecting + logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + break except Exception as e: logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) From 9cffa3ca2b68ff66b5e6cfd8106b41cdb430cf79 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sat, 22 Oct 2022 21:03:57 -0600 Subject: [PATCH 3/7] add comment in channel --- freqtrade/rpc/api_server/ws/channel.py | 1 + 1 file changed, 1 insertion(+) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 4afca0d33..92cb6dedc 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -40,6 +40,7 @@ class WebSocketChannel: self.throttle = throttle self._subscriptions: List[str] = [] + # 32 is the size of the receiving queue in websockets package self.queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue(maxsize=32) self._relay_task = asyncio.create_task(self.relay()) From 94b65a007a6e2e163f18eb88647412e8f6b04860 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sun, 23 Oct 2022 11:42:59 -0600 Subject: [PATCH 4/7] fix message typing in channel manager, minor improvements --- freqtrade/rpc/api_server/api_ws.py | 4 ++-- freqtrade/rpc/api_server/ws/channel.py | 30 ++++++++++++-------------- freqtrade/rpc/api_server/ws/proxy.py | 2 +- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index f3f6b852d..cafbaefcb 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -73,7 +73,7 @@ async def _process_consumer_request( whitelist = rpc._ws_request_whitelist() # Format response - response = WSWhitelistMessage(data=whitelist).dict(exclude_none=True) + response = WSWhitelistMessage(data=whitelist) # Send it back await channel_manager.send_direct(channel, response) @@ -89,7 +89,7 @@ async def _process_consumer_request( # For every dataframe, send as a separate message for _, message in analyzed_df.items(): - response = WSAnalyzedDFMessage(data=message).dict(exclude_none=True) + response = WSAnalyzedDFMessage(data=message) await channel_manager.send_direct(channel, response) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 92cb6dedc..942a3df70 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -10,6 +10,7 @@ from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer, WebSocketSerializer) from freqtrade.rpc.api_server.ws.types import WebSocketType +from freqtrade.rpc.api_server.ws_schemas import WSMessageSchema logger = logging.getLogger(__name__) @@ -54,8 +55,8 @@ class WebSocketChannel: return f"WebSocketChannel({self.channel_id}, {self.remote_addr})" @property - def raw(self): - return self._websocket.raw + def raw_websocket(self): + return self._websocket.raw_websocket @property def remote_addr(self): @@ -192,29 +193,26 @@ class ChannelManager: for websocket in self.channels.copy().keys(): await self.on_disconnect(websocket) - self.channels = dict() - - async def broadcast(self, data): + async def broadcast(self, message: WSMessageSchema): """ - Broadcast data on all Channels + Broadcast a message on all Channels - :param data: The data to send + :param message: The message to send """ with self._lock: - message_type = data.get('type') for channel in self.channels.copy().values(): - if channel.subscribed_to(message_type): - await self.send_direct(channel, data) + if channel.subscribed_to(message.type): + await self.send_direct(channel, message) - async def send_direct(self, channel, data): + async def send_direct(self, channel: WebSocketChannel, message: WSMessageSchema): """ - Send data directly through direct_channel only + Send a message directly through direct_channel only - :param direct_channel: The WebSocketChannel object to send data through - :param data: The data to send + :param direct_channel: The WebSocketChannel object to send the message through + :param message: The message to send """ - if not await channel.send(data): - await self.on_disconnect(channel.raw) + if not await channel.send(message.dict(exclude_none=True)): + await self.on_disconnect(channel.raw_websocket) def has_channels(self): """ diff --git a/freqtrade/rpc/api_server/ws/proxy.py b/freqtrade/rpc/api_server/ws/proxy.py index 8518709aa..ae123dd2d 100644 --- a/freqtrade/rpc/api_server/ws/proxy.py +++ b/freqtrade/rpc/api_server/ws/proxy.py @@ -16,7 +16,7 @@ class WebSocketProxy: self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket @property - def raw(self): + def raw_websocket(self): return self._websocket @property From 32600a113f7aed46e241a6de905ceb7be2f45f25 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Mon, 24 Oct 2022 12:21:17 -0600 Subject: [PATCH 5/7] fix broadcast --- freqtrade/rpc/api_server/webserver.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index c6639f1a6..22a05f07b 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -16,6 +16,7 @@ from freqtrade.constants import Config from freqtrade.exceptions import OperationalException from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer from freqtrade.rpc.api_server.ws import ChannelManager +from freqtrade.rpc.api_server.ws_schemas import WSMessageSchema from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler @@ -127,10 +128,10 @@ class ApiServer(RPCHandler): cls._has_rpc = False cls._rpc = None - def send_msg(self, msg: Dict[str, str]) -> None: + def send_msg(self, msg: Dict[str, Any]) -> None: if self._ws_queue: sync_q = self._ws_queue.sync_q - sync_q.put(msg) + sync_q.put(WSMessageSchema(**msg)) def handle_rpc_exception(self, request, exc): logger.exception(f"API Error calling: {exc}") @@ -194,8 +195,8 @@ class ApiServer(RPCHandler): while True: logger.debug("Getting queue messages...") # Get data from queue - message = await async_queue.get() - logger.debug(f"Found message of type: {message.get('type')}") + message: WSMessageSchema = await async_queue.get() + logger.debug(f"Found message of type: {message.type}") # Broadcast it await self._ws_channel_manager.broadcast(message) except asyncio.CancelledError: From 3fa50077c945d9273ca98eef8f4e4f1ec31f6238 Mon Sep 17 00:00:00 2001 From: Matthias Date: Tue, 25 Oct 2022 19:36:40 +0200 Subject: [PATCH 6/7] Don't use pydantic to type-verify outgoing messages --- freqtrade/rpc/api_server/api_ws.py | 4 ++-- freqtrade/rpc/api_server/webserver.py | 8 ++++---- freqtrade/rpc/api_server/ws/channel.py | 13 +++++++------ freqtrade/rpc/api_server/ws_schemas.py | 8 +++++++- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index cafbaefcb..c33f9c730 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -75,7 +75,7 @@ async def _process_consumer_request( # Format response response = WSWhitelistMessage(data=whitelist) # Send it back - await channel_manager.send_direct(channel, response) + await channel_manager.send_direct(channel, response.dict(exclude_none=True)) elif type == RPCRequestType.ANALYZED_DF: limit = None @@ -90,7 +90,7 @@ async def _process_consumer_request( # For every dataframe, send as a separate message for _, message in analyzed_df.items(): response = WSAnalyzedDFMessage(data=message) - await channel_manager.send_direct(channel, response) + await channel_manager.send_direct(channel, response.dict(exclude_none=True)) @router.websocket("/message/ws") diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 22a05f07b..1d0192a89 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -16,7 +16,7 @@ from freqtrade.constants import Config from freqtrade.exceptions import OperationalException from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer from freqtrade.rpc.api_server.ws import ChannelManager -from freqtrade.rpc.api_server.ws_schemas import WSMessageSchema +from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler @@ -131,7 +131,7 @@ class ApiServer(RPCHandler): def send_msg(self, msg: Dict[str, Any]) -> None: if self._ws_queue: sync_q = self._ws_queue.sync_q - sync_q.put(WSMessageSchema(**msg)) + sync_q.put(msg) def handle_rpc_exception(self, request, exc): logger.exception(f"API Error calling: {exc}") @@ -195,8 +195,8 @@ class ApiServer(RPCHandler): while True: logger.debug("Getting queue messages...") # Get data from queue - message: WSMessageSchema = await async_queue.get() - logger.debug(f"Found message of type: {message.type}") + message: WSMessageSchemaType = await async_queue.get() + logger.debug(f"Found message of type: {message.get('type')}") # Broadcast it await self._ws_channel_manager.broadcast(message) except asyncio.CancelledError: diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 942a3df70..34f03f0c4 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -1,7 +1,7 @@ import asyncio import logging from threading import RLock -from typing import Any, Dict, List, Optional, Type +from typing import Any, Dict, List, Optional, Type, Union from uuid import uuid4 from fastapi import WebSocket as FastAPIWebSocket @@ -10,7 +10,7 @@ from freqtrade.rpc.api_server.ws.proxy import WebSocketProxy from freqtrade.rpc.api_server.ws.serializer import (HybridJSONWebSocketSerializer, WebSocketSerializer) from freqtrade.rpc.api_server.ws.types import WebSocketType -from freqtrade.rpc.api_server.ws_schemas import WSMessageSchema +from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType logger = logging.getLogger(__name__) @@ -193,7 +193,7 @@ class ChannelManager: for websocket in self.channels.copy().keys(): await self.on_disconnect(websocket) - async def broadcast(self, message: WSMessageSchema): + async def broadcast(self, message: WSMessageSchemaType): """ Broadcast a message on all Channels @@ -201,17 +201,18 @@ class ChannelManager: """ with self._lock: for channel in self.channels.copy().values(): - if channel.subscribed_to(message.type): + if channel.subscribed_to(message.get('type')): await self.send_direct(channel, message) - async def send_direct(self, channel: WebSocketChannel, message: WSMessageSchema): + async def send_direct( + self, channel: WebSocketChannel, message: Union[WSMessageSchemaType, Dict[str, Any]]): """ Send a message directly through direct_channel only :param direct_channel: The WebSocketChannel object to send the message through :param message: The message to send """ - if not await channel.send(message.dict(exclude_none=True)): + if not await channel.send(message): await self.on_disconnect(channel.raw_websocket) def has_channels(self): diff --git a/freqtrade/rpc/api_server/ws_schemas.py b/freqtrade/rpc/api_server/ws_schemas.py index 255226d84..877232213 100644 --- a/freqtrade/rpc/api_server/ws_schemas.py +++ b/freqtrade/rpc/api_server/ws_schemas.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, TypedDict from pandas import DataFrame from pydantic import BaseModel @@ -18,6 +18,12 @@ class WSRequestSchema(BaseArbitraryModel): data: Optional[Any] = None +class WSMessageSchemaType(TypedDict): + # Type for typing to avoid doing pydantic typechecks. + type: RPCMessageType + data: Optional[Dict[str, Any]] + + class WSMessageSchema(BaseArbitraryModel): type: RPCMessageType data: Optional[Any] = None From fd5f31368c4335fb5fd61a82d60d7e910e71394e Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 25 Oct 2022 14:08:28 -0600 Subject: [PATCH 7/7] fix indent in initial df send --- freqtrade/rpc/api_server/api_ws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index c33f9c730..b230cbe2b 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -90,7 +90,7 @@ async def _process_consumer_request( # For every dataframe, send as a separate message for _, message in analyzed_df.items(): response = WSAnalyzedDFMessage(data=message) - await channel_manager.send_direct(channel, response.dict(exclude_none=True)) + await channel_manager.send_direct(channel, response.dict(exclude_none=True)) @router.websocket("/message/ws")