log warning if channel too far behind, add docstrings to message stream

This commit is contained in:
Timothy Pogue 2022-11-25 12:48:57 -07:00
parent bcc8063eeb
commit afc00bc30a
3 changed files with 31 additions and 13 deletions

View File

@ -1,4 +1,5 @@
import logging
import time
from typing import Any, Dict
from fastapi import APIRouter, Depends
@ -33,8 +34,16 @@ async def channel_broadcaster(channel: WebSocketChannel, message_stream: Message
"""
Iterate over messages in the message stream and send them
"""
async for message in message_stream:
async for message, ts in message_stream:
if channel.subscribed_to(message.get('type')):
# Log a warning if this channel is behind
# on the message stream by a lot
if (time.time() - ts) > 60:
logger.warning("Channel {channel} is behind MessageStream by 1 minute,"
" this can cause a memory leak if you see this message"
" often, consider reducing pair list size or amount of"
" consumers.")
await channel.send(message, timeout=True)

View File

@ -59,6 +59,10 @@ class WebSocketChannel:
def remote_addr(self):
return self._websocket.remote_addr
@property
def avg_send_time(self):
return sum(self._send_times) / len(self._send_times)
def _calc_send_limit(self):
"""
Calculate the send high limit for this channel
@ -66,11 +70,9 @@ class WebSocketChannel:
# Only update if we have enough data
if len(self._send_times) == self._send_times.maxlen:
# At least 1s or twice the average of send times
self._send_high_limit = max(
(sum(self._send_times) / len(self._send_times)) * 2,
1
)
# At least 1s or twice the average of send times, with a
# maximum of 3 seconds per message
self._send_high_limit = min(max(self.avg_send_time * 2, 1), 3)
async def send(
self,

View File

@ -1,4 +1,5 @@
import asyncio
import time
class MessageStream:
@ -11,14 +12,20 @@ class MessageStream:
self._waiter = self._loop.create_future()
def publish(self, message):
waiter, self._waiter = self._waiter, self._loop.create_future()
waiter.set_result((message, self._waiter))
"""
Publish a message to this MessageStream
async def subscribe(self):
:param message: The message to publish
"""
waiter, self._waiter = self._waiter, self._loop.create_future()
waiter.set_result((message, time.time(), self._waiter))
async def __aiter__(self):
"""
Iterate over the messages in the message stream
"""
waiter = self._waiter
while True:
# Shield the future from being cancelled by a task waiting on it
message, waiter = await asyncio.shield(waiter)
yield message
__aiter__ = subscribe
message, ts, waiter = await asyncio.shield(waiter)
yield message, ts