use new channel apis in emc, extend analyzed df to include list of dates for candles

This commit is contained in:
Timothy Pogue
2022-11-25 18:09:47 -07:00
parent 3e4e6bb114
commit 9660e445b8
4 changed files with 212 additions and 42 deletions

View File

@@ -8,7 +8,7 @@ import asyncio
import logging
import socket
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
import websockets
from pydantic import ValidationError
@@ -16,7 +16,8 @@ from pydantic import ValidationError
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import RPCMessageType
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.rpc.api_server.ws import WebSocketChannel
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
WSMessageSchema, WSRequestSchema,
WSSubscribeRequest, WSWhitelistMessage,
@@ -38,6 +39,14 @@ class Producer(TypedDict):
logger = logging.getLogger(__name__)
def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]):
return schema.dict(exclude_none=True)
# def parse_message(message: Dict[str, Any], message_schema: Type[WSMessageSchema]):
# return message_schema.parse_obj(message)
class ExternalMessageConsumer:
"""
The main controller class for consuming external messages from
@@ -92,6 +101,8 @@ class ExternalMessageConsumer:
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
self._channel_streams: Dict[str, MessageStream] = {}
self.start()
def start(self):
@@ -118,6 +129,8 @@ class ExternalMessageConsumer:
logger.info("Stopping ExternalMessageConsumer")
self._running = False
self._channel_streams = {}
if self._sub_tasks:
# Cancel sub tasks
for task in self._sub_tasks:
@@ -175,7 +188,6 @@ class ExternalMessageConsumer:
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
channel = None
while self._running:
try:
host, port = producer['host'], producer['port']
@@ -190,19 +202,17 @@ class ExternalMessageConsumer:
max_size=self.message_size_limit,
ping_interval=None
) as ws:
channel = WebSocketChannel(ws, channel_id=name)
async with create_channel(ws, channel_id=name) as channel:
logger.info(f"Producer connection success - {channel}")
# Create the message stream for this channel
self._channel_streams[name] = MessageStream()
# Now request the initial data from this Producer
for request in self._initial_requests:
await channel.send(
request.dict(exclude_none=True)
# Run the channel tasks while connected
await channel.run_channel_tasks(
self._receive_messages(channel, producer, lock),
self._send_requests(channel, self._channel_streams[name])
)
# Now receive data, if none is within the time limit, ping
await self._receive_messages(channel, producer, lock)
except (websockets.exceptions.InvalidURI, ValueError) as e:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break
@@ -214,26 +224,33 @@ class ExternalMessageConsumer:
websockets.exceptions.InvalidMessage
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
continue
except (
websockets.exceptions.ConnectionClosedError,
websockets.exceptions.ConnectionClosedOK
):
# Just keep trying to connect again indefinitely
await asyncio.sleep(self.sleep_time)
continue
pass
except Exception as e:
# An unforseen error has occurred, log and continue
logger.error("Unexpected error has occurred:")
logger.exception(e)
continue
finally:
if channel:
await channel.close()
await asyncio.sleep(self.sleep_time)
continue
async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
# Send the initial requests
for init_request in self._initial_requests:
await channel.send(schema_to_dict(init_request))
# Now send any subsequent requests published to
# this channel's stream
async for request in channel_stream:
logger.info(f"Sending request to channel - {channel} - {request}")
await channel.send(request)
async def _receive_messages(
self,
@@ -270,20 +287,39 @@ class ExternalMessageConsumer:
latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)
logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
continue
except (websockets.exceptions.ConnectionClosed):
# Just eat the error and continue reconnecting
logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except Exception as e:
# Just eat the error and continue reconnecting
logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
logger.debug(e, exc_info=e)
await asyncio.sleep(self.sleep_time)
finally:
await asyncio.sleep(self.sleep_time)
break
def send_producer_request(
self,
producer_name: str,
request: Union[WSRequestSchema, Dict[str, Any]]
):
"""
Publish a message to the producer's message stream to be
sent by the channel task.
:param producer_name: The name of the producer to publish the message to
:param request: The request to send to the producer
"""
if isinstance(request, WSRequestSchema):
request = schema_to_dict(request)
if channel_stream := self._channel_streams.get(producer_name):
channel_stream.publish(request)
def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
"""
Handles external messages from a Producer
@@ -340,12 +376,44 @@ class ExternalMessageConsumer:
if self._emc_config.get('remove_entry_exit_signals', False):
df = remove_entry_exit_signals(df)
# Add the dataframe to the dataprovider
self._dp._add_external_df(pair, df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name)
if len(df) >= 999:
# This is a full dataframe
# Add the dataframe to the dataprovider
self._dp._add_external_df(
pair,
df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
logger.debug(
elif len(df) == 1:
# This is just a single candle
# Have dataprovider append it to
# the full datafame. If it can't,
# request the missing candles
if not self._dp._add_external_candle(
pair,
df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
):
logger.info("Holes in data or no existing df, "
f"requesting data for {key} from `{producer_name}`")
self.send_producer_request(
producer_name,
WSAnalyzedDFRequest(
data={
"limit": 1000,
"pair": pair
}
)
)
return
logger.info(
f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`")