initial candle request limit, better error reporting, split up _handle_producer_connection
This commit is contained in:
@@ -8,7 +8,7 @@ import asyncio
|
||||
import logging
|
||||
import socket
|
||||
from threading import Thread
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import pandas
|
||||
import websockets
|
||||
@@ -54,11 +54,25 @@ class ExternalMessageConsumer:
|
||||
self.ping_timeout = self._emc_config.get('ping_timeout', 2)
|
||||
self.sleep_time = self._emc_config.get('sleep_time', 5)
|
||||
|
||||
self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 500)
|
||||
|
||||
# Setting these explicitly as they probably shouldn't be changed by a user
|
||||
# Unless we somehow integrate this with the strategy to allow creating
|
||||
# callbacks for the messages
|
||||
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
|
||||
|
||||
# Allow setting data for each initial request
|
||||
self._initial_requests: List[Dict[str, Any]] = [
|
||||
{
|
||||
"type": RPCRequestType.WHITELIST,
|
||||
"data": None
|
||||
},
|
||||
{
|
||||
"type": RPCRequestType.ANALYZED_DF,
|
||||
"data": {"limit": self.initial_candle_limit}
|
||||
}
|
||||
]
|
||||
|
||||
self._message_handlers = {
|
||||
RPCMessageType.WHITELIST: self._consume_whitelist_message,
|
||||
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
|
||||
@@ -145,12 +159,14 @@ class ExternalMessageConsumer:
|
||||
while self._running:
|
||||
try:
|
||||
url, token = producer['url'], producer['ws_token']
|
||||
name = producer["name"]
|
||||
ws_url = f"{url}?token={token}"
|
||||
|
||||
# This will raise InvalidURI if the url is bad
|
||||
async with websockets.connect(ws_url) as ws:
|
||||
logger.info("Connection successful")
|
||||
channel = WebSocketChannel(ws)
|
||||
channel = WebSocketChannel(ws, channel_id=name)
|
||||
|
||||
logger.info(f"Producer connection success - {channel}")
|
||||
|
||||
# Tell the producer we only want these topics
|
||||
# Should always be the first thing we send
|
||||
@@ -158,41 +174,16 @@ class ExternalMessageConsumer:
|
||||
self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics)
|
||||
)
|
||||
|
||||
# Now request the initial data from this Producer for every topic
|
||||
# we've subscribed to
|
||||
for topic in self.topics:
|
||||
# without .upper() we get KeyError
|
||||
request_type = RPCRequestType[topic.upper()]
|
||||
await channel.send(self.compose_consumer_request(request_type))
|
||||
# Now request the initial data from this Producer
|
||||
for request in self._initial_requests:
|
||||
request_type = request.get('type', 'none') # Default to string
|
||||
request_data = request.get('data')
|
||||
await channel.send(
|
||||
self.compose_consumer_request(request_type, request_data)
|
||||
)
|
||||
|
||||
# Now receive data, if none is within the time limit, ping
|
||||
while True:
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
channel.recv(),
|
||||
timeout=self.reply_timeout
|
||||
)
|
||||
|
||||
async with lock:
|
||||
# Handle the message
|
||||
self.handle_producer_message(producer, message)
|
||||
|
||||
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
|
||||
# We haven't received data yet. Check the connection and continue.
|
||||
try:
|
||||
# ping
|
||||
ping = await channel.ping()
|
||||
|
||||
await asyncio.wait_for(ping, timeout=self.ping_timeout)
|
||||
logger.debug(f"Connection to {url} still alive...")
|
||||
|
||||
continue
|
||||
except Exception:
|
||||
logger.info(
|
||||
f"Ping error {url} - retrying in {self.sleep_time}s")
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
|
||||
break
|
||||
await self._receive_messages(channel, producer, lock)
|
||||
|
||||
# Catch invalid ws_url, and break the loop
|
||||
except websockets.exceptions.InvalidURI as e:
|
||||
@@ -214,6 +205,47 @@ class ExternalMessageConsumer:
|
||||
logger.exception(e)
|
||||
break
|
||||
|
||||
async def _receive_messages(
|
||||
self,
|
||||
channel: WebSocketChannel,
|
||||
producer: Dict[str, Any],
|
||||
lock: asyncio.Lock
|
||||
):
|
||||
"""
|
||||
Loop to handle receiving messages from a Producer
|
||||
|
||||
:param channel: The WebSocketChannel object for the WebSocket
|
||||
:param producer: Dictionary containing producer info
|
||||
:param lock: An asyncio Lock
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
message = await asyncio.wait_for(
|
||||
channel.recv(),
|
||||
timeout=self.reply_timeout
|
||||
)
|
||||
|
||||
async with lock:
|
||||
# Handle the message
|
||||
self.handle_producer_message(producer, message)
|
||||
|
||||
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
|
||||
# We haven't received data yet. Check the connection and continue.
|
||||
try:
|
||||
# ping
|
||||
ping = await channel.ping()
|
||||
|
||||
await asyncio.wait_for(ping, timeout=self.ping_timeout)
|
||||
logger.debug(f"Connection to {channel} still alive...")
|
||||
|
||||
continue
|
||||
except Exception:
|
||||
logger.info(
|
||||
f"Ping error {channel} - retrying in {self.sleep_time}s")
|
||||
await asyncio.sleep(self.sleep_time)
|
||||
|
||||
break
|
||||
|
||||
def compose_consumer_request(
|
||||
self,
|
||||
type_: RPCRequestType,
|
||||
@@ -241,7 +273,7 @@ class ExternalMessageConsumer:
|
||||
if message_data is None:
|
||||
return
|
||||
|
||||
logger.debug(f"Received message of type {message_type}")
|
||||
logger.debug(f"Received message of type {message_type} from `{producer_name}`")
|
||||
|
||||
message_handler = self._message_handlers.get(message_type)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user