Improve typing in message_consumer

This commit is contained in:
Matthias 2022-09-24 16:10:42 +02:00
parent 98ba57ffaa
commit 53c8e0923f

View File

@ -8,7 +8,7 @@ import asyncio
import logging
import socket
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict
import websockets
from pydantic import ValidationError
@ -29,6 +29,13 @@ if TYPE_CHECKING:
import websockets.exceptions
class Producer(TypedDict):
name: str
host: str
port: int
ws_token: str
logger = logging.getLogger(__name__)
@ -55,7 +62,7 @@ class ExternalMessageConsumer:
self._emc_config = self._config.get('external_message_consumer', {})
self.enabled = self._emc_config.get('enabled', False)
self.producers = self._emc_config.get('producers', [])
self.producers: List[Producer] = self._emc_config.get('producers', [])
self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds
self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds
@ -162,7 +169,7 @@ class ExternalMessageConsumer:
# Stop the loop once we are done
self._loop.stop()
async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock):
"""
Main connection loop for the consumer
@ -175,7 +182,7 @@ class ExternalMessageConsumer:
# Exit silently
pass
async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
async def _create_connection(self, producer: Producer, lock: asyncio.Lock):
"""
Actually creates and handles the websocket connection, pinging on timeout
and handling connection errors.
@ -236,7 +243,7 @@ class ExternalMessageConsumer:
async def _receive_messages(
self,
channel: WebSocketChannel,
producer: Dict[str, Any],
producer: Producer,
lock: asyncio.Lock
):
"""
@ -277,7 +284,7 @@ class ExternalMessageConsumer:
break
def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]):
def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
"""
Handles external messages from a Producer
"""