allow specifying channel send throttle

This commit is contained in:
Timothy Pogue 2022-11-27 13:14:49 -07:00
parent d2c8487ecf
commit 89338fa677
2 changed files with 10 additions and 3 deletions

View File

@ -27,7 +27,8 @@ class WebSocketChannel:
self, self,
websocket: WebSocketType, websocket: WebSocketType,
channel_id: Optional[str] = None, channel_id: Optional[str] = None,
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
send_throttle: float = 0.01
): ):
self.channel_id = channel_id if channel_id else uuid4().hex[:8] self.channel_id = channel_id if channel_id else uuid4().hex[:8]
self._websocket = WebSocketProxy(websocket) self._websocket = WebSocketProxy(websocket)
@ -41,6 +42,7 @@ class WebSocketChannel:
self._send_times: Deque[float] = deque([], maxlen=10) self._send_times: Deque[float] = deque([], maxlen=10)
# High limit defaults to 3 to start # High limit defaults to 3 to start
self._send_high_limit = 3 self._send_high_limit = 3
self._send_throttle = send_throttle
# The subscribed message types # The subscribed message types
self._subscriptions: List[str] = [] self._subscriptions: List[str] = []
@ -106,7 +108,8 @@ class WebSocketChannel:
# Explicitly give control back to event loop as # Explicitly give control back to event loop as
# websockets.send does not # websockets.send does not
await asyncio.sleep(0.01) # Also throttles how fast we send
await asyncio.sleep(self._send_throttle)
async def recv(self): async def recv(self):
""" """

View File

@ -202,7 +202,11 @@ class ExternalMessageConsumer:
max_size=self.message_size_limit, max_size=self.message_size_limit,
ping_interval=None ping_interval=None
) as ws: ) as ws:
async with create_channel(ws, channel_id=name) as channel: async with create_channel(
ws,
channel_id=name,
send_throttle=0.5
) as channel:
# Create the message stream for this channel # Create the message stream for this channel
self._channel_streams[name] = MessageStream() self._channel_streams[name] = MessageStream()