update ws channel send to add data to queue

This commit is contained in:
Timothy Pogue 2022-10-09 18:49:04 -06:00
parent 2c76dd9e39
commit 71bbffd10a
2 changed files with 14 additions and 3 deletions

View File

@ -51,12 +51,18 @@ class WebSocketChannel:
def remote_addr(self): def remote_addr(self):
return self._websocket.remote_addr return self._websocket.remote_addr
async def send(self, data): async def _send(self, data):
""" """
Send data on the wrapped websocket Send data on the wrapped websocket
""" """
await self._wrapped_ws.send(data) await self._wrapped_ws.send(data)
async def send(self, data):
"""
Add the data to the queue to be sent
"""
self.queue.put_nowait(data)
async def recv(self): async def recv(self):
""" """
Receive data on the wrapped websocket Receive data on the wrapped websocket
@ -107,7 +113,7 @@ class WebSocketChannel:
while True: while True:
message = await self.queue.get() message = await self.queue.get()
try: try:
await self.send(message) await self._send(message)
self.queue.task_done() self.queue.task_done()
except RuntimeError: except RuntimeError:
# The connection was closed, just exit the task # The connection was closed, just exit the task
@ -175,7 +181,7 @@ class ChannelManager:
for websocket, channel in self.channels.copy().items(): for websocket, channel in self.channels.copy().items():
if channel.subscribed_to(message_type): if channel.subscribed_to(message_type):
if not channel.queue.full(): if not channel.queue.full():
channel.queue.put_nowait(data) await channel.send(data)
else: else:
logger.info(f"Channel {channel} is too far behind, disconnecting") logger.info(f"Channel {channel} is too far behind, disconnecting")
await self.on_disconnect(websocket) await self.on_disconnect(websocket)

View File

@ -174,6 +174,7 @@ class ExternalMessageConsumer:
:param producer: Dictionary containing producer info :param producer: Dictionary containing producer info
:param lock: An asyncio Lock :param lock: An asyncio Lock
""" """
channel = None
while self._running: while self._running:
try: try:
host, port = producer['host'], producer['port'] host, port = producer['host'], producer['port']
@ -224,6 +225,10 @@ class ExternalMessageConsumer:
logger.exception(e) logger.exception(e)
continue continue
finally:
if channel:
await channel.close()
async def _receive_messages( async def _receive_messages(
self, self,
channel: WebSocketChannel, channel: WebSocketChannel,