move sleep call in send, minor cleanup

This commit is contained in:
Timothy Pogue 2022-11-19 13:21:26 -07:00
parent 98d87b3ba6
commit c1a73a5512
2 changed files with 9 additions and 12 deletions

View File

@ -55,14 +55,16 @@ class WebSocketChannel:
""" """
Send a message on the wrapped websocket Send a message on the wrapped websocket
""" """
await self._wrapped_ws.send(message)
# Without this sleep, messages would send to one channel # Without this sleep, messages would send to one channel
# first then another after the first one finished. # first then another after the first one finished and prevent
# any normal Rest API calls from processing at the same time.
# With the sleep call, it gives control to the event # With the sleep call, it gives control to the event
# loop to schedule other channel send methods. # loop to schedule other channel send methods, and helps
await asyncio.sleep(0) # throttle how fast we send.
# 0.01 = 100 messages/second max throughput
return await self._wrapped_ws.send(message) await asyncio.sleep(0.01)
async def recv(self): async def recv(self):
""" """
@ -132,12 +134,10 @@ class WebSocketChannel:
] ]
try: try:
await asyncio.gather(*self._channel_tasks, **kwargs) return await asyncio.gather(*self._channel_tasks, **kwargs)
except Exception: except Exception:
# If an exception occurred, cancel the rest of the tasks and bubble up # If an exception occurred, cancel the rest of the tasks
# the error that was caught here
await self.cancel_channel_tasks() await self.cancel_channel_tasks()
raise
async def cancel_channel_tasks(self): async def cancel_channel_tasks(self):
""" """
@ -176,8 +176,6 @@ async def create_channel(websocket: WebSocketType, **kwargs):
logger.info(f"Connected to channel - {channel}") logger.info(f"Connected to channel - {channel}")
yield channel yield channel
except Exception:
pass
finally: finally:
await channel.close() await channel.close()
logger.info(f"Disconnected from channel - {channel}") logger.info(f"Disconnected from channel - {channel}")

View File

@ -31,7 +31,6 @@ class WebSocketSerializer(ABC):
async def recv(self) -> bytes: async def recv(self) -> bytes:
data = await self._websocket.recv() data = await self._websocket.recv()
return self._deserialize(data) return self._deserialize(data)