From 426f8f37e9f45a8dc7df3bce11b1a0b3ace6c5b3 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 9 Sep 2022 10:45:49 -0600 Subject: [PATCH] change var names --- freqtrade/constants.py | 2 +- freqtrade/rpc/api_server/webserver.py | 52 +++++++++++----------- freqtrade/rpc/external_message_consumer.py | 8 ++-- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 2279acc13..e77940b3c 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -502,7 +502,7 @@ CONF_SCHEMA = { 'required': ['name', 'url', 'ws_token'] } }, - 'reply_timeout': {'type': 'integer'}, + 'wait_timeout': {'type': 'integer'}, 'sleep_time': {'type': 'integer'}, 'ping_timeout': {'type': 'integer'}, 'remove_signals_analyzed_df': {'type': 'boolean', 'default': False}, diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 73e80dd48..557857ecc 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -49,9 +49,9 @@ class ApiServer(RPCHandler): # Exchange - only available in webserver mode. _exchange = None # websocket message queue stuff - _channel_manager = None - _thread = None - _loop = None + _ws_channel_manager = None + _ws_thread = None + _ws_loop = None def __new__(cls, *args, **kwargs): """ @@ -69,14 +69,14 @@ class ApiServer(RPCHandler): return self._standalone: bool = standalone self._server = None - self._queue = None - self._background_task = None + self._ws_queue = None + self._ws_background_task = None ApiServer.__initialized = True api_config = self._config['api_server'] - ApiServer._channel_manager = ChannelManager() + ApiServer._ws_channel_manager = ChannelManager() self.app = FastAPI(title="Freqtrade API", docs_url='/docs' if api_config.get('enable_openapi', False) else None, @@ -105,18 +105,18 @@ class ApiServer(RPCHandler): logger.info("Stopping API Server") self._server.cleanup() - if self._thread and self._loop: + if self._ws_thread and self._ws_loop: logger.info("Stopping API Server background tasks") - if self._background_task: + if self._ws_background_task: # Cancel the queue task - self._background_task.cancel() + self._ws_background_task.cancel() - self._thread.join() + self._ws_thread.join() - self._thread = None - self._loop = None - self._background_task = None + self._ws_thread = None + self._ws_loop = None + self._ws_background_task = None @classmethod def shutdown(cls): @@ -127,8 +127,8 @@ class ApiServer(RPCHandler): cls._rpc = None def send_msg(self, msg: Dict[str, str]) -> None: - if self._queue: - sync_q = self._queue.sync_q + if self._ws_queue: + sync_q = self._ws_queue.sync_q sync_q.put(msg) def handle_rpc_exception(self, request, exc): @@ -170,24 +170,24 @@ class ApiServer(RPCHandler): app.add_exception_handler(RPCException, self.handle_rpc_exception) def start_message_queue(self): - if self._thread: + if self._ws_thread: return # Create a new loop, as it'll be just for the background thread - self._loop = asyncio.new_event_loop() + self._ws_loop = asyncio.new_event_loop() # Start the thread - self._thread = Thread(target=self._loop.run_forever) - self._thread.start() + self._ws_thread = Thread(target=self._ws_loop.run_forever) + self._ws_thread.start() # Finally, submit the coro to the thread - self._background_task = asyncio.run_coroutine_threadsafe( - self._broadcast_queue_data(), loop=self._loop) + self._ws_background_task = asyncio.run_coroutine_threadsafe( + self._broadcast_queue_data(), loop=self._ws_loop) async def _broadcast_queue_data(self): # Instantiate the queue in this coroutine so it's attached to our loop - self._queue = ThreadedQueue() - async_queue = self._queue.async_q + self._ws_queue = ThreadedQueue() + async_queue = self._ws_queue.async_q try: while True: @@ -196,13 +196,13 @@ class ApiServer(RPCHandler): message = await async_queue.get() logger.debug(f"Found message of type: {message.get('type')}") # Broadcast it - await self._channel_manager.broadcast(message) + await self._ws_channel_manager.broadcast(message) # Sleep, make this configurable? await asyncio.sleep(0.1) except asyncio.CancelledError: # Disconnect channels and stop the loop on cancel - await self._channel_manager.disconnect_all() - self._loop.stop() + await self._ws_channel_manager.disconnect_all() + self._ws_loop.stop() # For testing, shouldn't happen when stable except Exception as e: diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index abeedb0a4..1c2a27617 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -59,9 +59,9 @@ class ExternalMessageConsumer: if self.enabled and len(self.producers) < 1: raise ValueError("You must specify at least 1 Producer to connect to.") - self.reply_timeout = self._emc_config.get('reply_timeout', 30) - self.ping_timeout = self._emc_config.get('ping_timeout', 5) - self.sleep_time = self._emc_config.get('sleep_time', 10) + self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds + self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds + self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds # The amount of candles per dataframe on the initial request self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500) @@ -220,7 +220,7 @@ class ExternalMessageConsumer: try: message = await asyncio.wait_for( channel.recv(), - timeout=self.reply_timeout + timeout=self.wait_timeout ) try: