refactor channel.send to avoid queue.put
This commit is contained in:
parent
2dc55e89e6
commit
cbede2e27d
@ -1,5 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from threading import RLock
|
from threading import RLock
|
||||||
from typing import Any, Dict, List, Optional, Type, Union
|
from typing import Any, Dict, List, Optional, Type, Union
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
@ -73,16 +74,23 @@ class WebSocketChannel:
|
|||||||
Add the data to the queue to be sent.
|
Add the data to the queue to be sent.
|
||||||
:returns: True if data added to queue, False otherwise
|
:returns: True if data added to queue, False otherwise
|
||||||
"""
|
"""
|
||||||
try:
|
|
||||||
await asyncio.wait_for(
|
|
||||||
self.queue.put(data),
|
|
||||||
timeout=self.drain_timeout
|
|
||||||
)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
return False
|
|
||||||
except RuntimeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
# This block only runs if the queue is full, it will wait
|
||||||
|
# until self.drain_timeout for the relay to drain the outgoing
|
||||||
|
# queue
|
||||||
|
start = time.time()
|
||||||
|
while self.queue.full():
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
if (time.time() - start) > self.drain_timeout:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# If for some reason the queue is still full, just return False
|
||||||
|
try:
|
||||||
|
self.queue.put_nowait(data)
|
||||||
|
except asyncio.QueueFull:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# If we got here everything is ok
|
||||||
return True
|
return True
|
||||||
|
|
||||||
async def recv(self):
|
async def recv(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user