From 53c8e0923fe2fc2d0048f004c89a452057c01a8b Mon Sep 17 00:00:00 2001 From: Matthias Date: Sat, 24 Sep 2022 16:10:42 +0200 Subject: [PATCH] Improve typing in message_consumer --- freqtrade/rpc/external_message_consumer.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 99ba39f76..bf71c24ea 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -8,7 +8,7 @@ import asyncio import logging import socket from threading import Thread -from typing import TYPE_CHECKING, Any, Callable, Dict, List +from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict import websockets from pydantic import ValidationError @@ -29,6 +29,13 @@ if TYPE_CHECKING: import websockets.exceptions +class Producer(TypedDict): + name: str + host: str + port: int + ws_token: str + + logger = logging.getLogger(__name__) @@ -55,7 +62,7 @@ class ExternalMessageConsumer: self._emc_config = self._config.get('external_message_consumer', {}) self.enabled = self._emc_config.get('enabled', False) - self.producers = self._emc_config.get('producers', []) + self.producers: List[Producer] = self._emc_config.get('producers', []) self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds @@ -162,7 +169,7 @@ class ExternalMessageConsumer: # Stop the loop once we are done self._loop.stop() - async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock): + async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock): """ Main connection loop for the consumer @@ -175,7 +182,7 @@ class ExternalMessageConsumer: # Exit silently pass - async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock): + async def _create_connection(self, producer: Producer, lock: asyncio.Lock): """ Actually creates and handles the websocket connection, pinging on timeout and handling connection errors. @@ -236,7 +243,7 @@ class ExternalMessageConsumer: async def _receive_messages( self, channel: WebSocketChannel, - producer: Dict[str, Any], + producer: Producer, lock: asyncio.Lock ): """ @@ -277,7 +284,7 @@ class ExternalMessageConsumer: break - def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]): + def handle_producer_message(self, producer: Producer, message: Dict[str, Any]): """ Handles external messages from a Producer """