update channel disconnecting

This commit is contained in:
Timothy Pogue 2022-11-02 13:26:27 -06:00
parent 09e0a8d4df
commit e25dea7e0e
2 changed files with 11 additions and 12 deletions

View File

@ -127,13 +127,6 @@ async def message_endpoint(
except Exception as e: except Exception as e:
logger.info(f"Consumer connection failed - {channel}: {e}") logger.info(f"Consumer connection failed - {channel}: {e}")
logger.debug(e, exc_info=e) logger.debug(e, exc_info=e)
finally:
await channel_manager.on_disconnect(ws)
else:
if channel:
await channel_manager.on_disconnect(ws)
await ws.close()
except RuntimeError: except RuntimeError:
# WebSocket was closed # WebSocket was closed
@ -144,4 +137,5 @@ async def message_endpoint(
# Log tracebacks to keep track of what errors are happening # Log tracebacks to keep track of what errors are happening
logger.exception(e) logger.exception(e)
finally: finally:
await channel_manager.on_disconnect(ws) if channel:
await channel_manager.on_disconnect(ws)

View File

@ -46,7 +46,7 @@ class WebSocketChannel:
self._relay_task = asyncio.create_task(self.relay()) self._relay_task = asyncio.create_task(self.relay())
# Internal event to signify a closed websocket # Internal event to signify a closed websocket
self._closed = False self._closed = asyncio.Event()
# Wrap the WebSocket in the Serializing class # Wrap the WebSocket in the Serializing class
self._wrapped_ws = self._serializer_cls(self._websocket) self._wrapped_ws = self._serializer_cls(self._websocket)
@ -99,14 +99,19 @@ class WebSocketChannel:
Close the WebSocketChannel Close the WebSocketChannel
""" """
self._closed = True try:
await self.raw_websocket.close()
except Exception:
pass
self._closed.set()
self._relay_task.cancel() self._relay_task.cancel()
def is_closed(self) -> bool: def is_closed(self) -> bool:
""" """
Closed flag Closed flag
""" """
return self._closed return self._closed.is_set()
def set_subscriptions(self, subscriptions: List[str] = []) -> None: def set_subscriptions(self, subscriptions: List[str] = []) -> None:
""" """
@ -129,7 +134,7 @@ class WebSocketChannel:
Relay messages from the channel's queue and send them out. This is started Relay messages from the channel's queue and send them out. This is started
as a task. as a task.
""" """
while True: while not self._closed.is_set():
message = await self.queue.get() message = await self.queue.get()
try: try:
await self._send(message) await self._send(message)