From d713af045fbd51df67825836d9fe3a17f1424622 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Mon, 14 Nov 2022 22:21:40 -0700 Subject: [PATCH] remove main queue completely --- freqtrade/rpc/api_server/api_ws.py | 3 +- freqtrade/rpc/api_server/webserver.py | 47 ++------------------------ freqtrade/rpc/api_server/ws/channel.py | 5 ++- 3 files changed, 6 insertions(+), 49 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index a9b88aadb..3f207eac3 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -148,7 +148,8 @@ async def message_endpoint( channel_broadcaster(channel, message_stream) ) await channel_tasks - + except WebSocketChannelClosed: + pass finally: logger.info(f"Channel disconnected - {channel}") channel_tasks.cancel() diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 7e2c3f39f..d0695e06d 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -1,4 +1,3 @@ -import asyncio import logging from ipaddress import IPv4Address from typing import Any, Dict @@ -7,15 +6,12 @@ import orjson import uvicorn from fastapi import Depends, FastAPI from fastapi.middleware.cors import CORSMiddleware -# Look into alternatives -from janus import Queue as ThreadedQueue from starlette.responses import JSONResponse from freqtrade.constants import Config from freqtrade.exceptions import OperationalException from freqtrade.rpc.api_server.uvicorn_threaded import UvicornServer from freqtrade.rpc.api_server.ws.message_stream import MessageStream -from freqtrade.rpc.api_server.ws_schemas import WSMessageSchemaType from freqtrade.rpc.rpc import RPC, RPCException, RPCHandler @@ -72,9 +68,6 @@ class ApiServer(RPCHandler): self._standalone: bool = standalone self._server = None - self._ws_queue = None - self._ws_publisher_task = None - ApiServer.__initialized = True api_config = self._config['api_server'] @@ -130,9 +123,8 @@ class ApiServer(RPCHandler): cls._rpc = None def send_msg(self, msg: Dict[str, Any]) -> None: - if self._ws_queue: - sync_q = self._ws_queue.sync_q - sync_q.put(msg) + if ApiServer._message_stream: + ApiServer._message_stream.publish(msg) def handle_rpc_exception(self, request, exc): logger.exception(f"API Error calling: {exc}") @@ -184,45 +176,10 @@ class ApiServer(RPCHandler): if not ApiServer._message_stream: ApiServer._message_stream = MessageStream() - if not self._ws_queue: - self._ws_queue = ThreadedQueue() - - if not self._ws_publisher_task: - self._ws_publisher_task = asyncio.create_task( - self._publish_messages() - ) - async def _api_shutdown_event(self): if ApiServer._message_stream: ApiServer._message_stream = None - if self._ws_queue: - self._ws_queue = None - - if self._ws_publisher_task: - self._ws_publisher_task.cancel() - - async def _publish_messages(self): - """ - Background task that reads messages from the queue and adds them - to the message stream - """ - try: - async_queue = self._ws_queue.async_q - message_stream = ApiServer._message_stream - - while message_stream: - message: WSMessageSchemaType = await async_queue.get() - message_stream.publish(message) - - # Make sure to throttle how fast we - # publish messages as some clients will be - # slower than others - await asyncio.sleep(0.01) - async_queue.task_done() - finally: - self._ws_queue = None - # def start_message_queue(self): # if self._ws_thread: # return diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index b98bd13c9..39c8db516 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -30,8 +30,8 @@ class WebSocketChannel: # Internal event to signify a closed websocket self._closed = asyncio.Event() - # Throttle how fast we send messages - self._throttle = 0.01 + # The subscribed message types + self._subscriptions: List[str] = [] # Wrap the WebSocket in the Serializing class self._wrapped_ws = serializer_cls(self._websocket) @@ -51,7 +51,6 @@ class WebSocketChannel: """ Send a message on the wrapped websocket """ - await asyncio.sleep(self._throttle) await self._wrapped_ws.send(message) async def recv(self):