stable/freqtrade/rpc/external_message_consumer.py

412 lines
14 KiB
Python
Raw Permalink Normal View History

2022-08-31 01:21:34 +00:00
"""
ExternalMessageConsumer module
Main purpose is to connect to external bot's message websocket to consume data
from it
"""
import asyncio
import logging
import socket
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
2022-08-31 01:21:34 +00:00
import websockets
2022-09-08 19:58:28 +00:00
from pydantic import ValidationError
2022-08-31 01:21:34 +00:00
from freqtrade.constants import FULL_DATAFRAME_THRESHOLD
from freqtrade.data.dataprovider import DataProvider
2022-09-07 21:08:01 +00:00
from freqtrade.enums import RPCMessageType
2022-09-02 05:15:03 +00:00
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
2022-09-08 19:58:28 +00:00
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
WSMessageSchema, WSRequestSchema,
WSSubscribeRequest, WSWhitelistMessage,
WSWhitelistRequest)
2022-08-31 01:21:34 +00:00
2022-09-06 18:40:58 +00:00
if TYPE_CHECKING:
import websockets.connect
2022-09-24 14:10:42 +00:00
class Producer(TypedDict):
name: str
host: str
port: int
2022-11-16 05:26:54 +00:00
secure: bool
2022-09-24 14:10:42 +00:00
ws_token: str
2022-08-31 01:21:34 +00:00
logger = logging.getLogger(__name__)
def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]):
return schema.dict(exclude_none=True)
2022-08-31 01:21:34 +00:00
class ExternalMessageConsumer:
"""
The main controller class for consuming external messages from
2022-09-22 17:58:38 +00:00
other freqtrade bot's
2022-08-31 01:21:34 +00:00
"""
def __init__(
self,
config: Dict[str, Any],
dataprovider: DataProvider
2022-08-31 01:21:34 +00:00
):
self._config = config
self._dp = dataprovider
2022-08-31 01:21:34 +00:00
self._running = False
self._thread = None
self._loop = None
self._main_task = None
self._sub_tasks = None
self._emc_config = self._config.get('external_message_consumer', {})
self.enabled = self._emc_config.get('enabled', False)
2022-09-24 14:10:42 +00:00
self.producers: List[Producer] = self._emc_config.get('producers', [])
2022-08-31 01:21:34 +00:00
self.wait_timeout = self._emc_config.get('wait_timeout', 30) # in seconds
2022-09-09 16:45:49 +00:00
self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds
self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds
2022-08-31 01:21:34 +00:00
2022-09-02 22:01:33 +00:00
# The amount of candles per dataframe on the initial request
self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500)
# Message size limit, in megabytes. Default 8mb, Use bitwise operator << 20 to convert
# as the websockets client expects bytes.
self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20)
2022-08-31 01:21:34 +00:00
# Setting these explicitly as they probably shouldn't be changed by a user
# Unless we somehow integrate this with the strategy to allow creating
# callbacks for the messages
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
# Allow setting data for each initial request
2022-09-07 21:08:01 +00:00
self._initial_requests: List[WSRequestSchema] = [
WSSubscribeRequest(data=self.topics),
WSWhitelistRequest(),
WSAnalyzedDFRequest()
]
2022-09-02 22:01:33 +00:00
# Specify which function to use for which RPCMessageType
2022-09-12 18:00:01 +00:00
self._message_handlers: Dict[str, Callable[[str, WSMessageSchema], None]] = {
RPCMessageType.WHITELIST: self._consume_whitelist_message,
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
self._channel_streams: Dict[str, MessageStream] = {}
2022-08-31 01:21:34 +00:00
self.start()
def start(self):
"""
Start the main internal loop in another thread to run coroutines
"""
if self._thread and self._loop:
return
2022-08-31 01:21:34 +00:00
logger.info("Starting ExternalMessageConsumer")
2022-08-31 01:21:34 +00:00
self._loop = asyncio.new_event_loop()
self._thread = Thread(target=self._loop.run_forever)
self._running = True
self._thread.start()
2022-08-31 01:21:34 +00:00
self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)
def shutdown(self):
"""
Shutdown the loop, thread, and tasks
"""
if self._thread and self._loop:
logger.info("Stopping ExternalMessageConsumer")
self._running = False
2022-08-31 01:21:34 +00:00
self._channel_streams = {}
2022-08-31 01:21:34 +00:00
if self._sub_tasks:
# Cancel sub tasks
for task in self._sub_tasks:
task.cancel()
if self._main_task:
# Cancel the main task
self._main_task.cancel()
self._thread.join()
self._thread = None
self._loop = None
self._sub_tasks = None
self._main_task = None
2022-08-31 01:21:34 +00:00
async def _main(self):
"""
The main task coroutine
"""
lock = asyncio.Lock()
2022-08-31 01:21:34 +00:00
try:
# Create a connection to each producer
self._sub_tasks = [
self._loop.create_task(self._handle_producer_connection(producer, lock))
2022-08-31 01:21:34 +00:00
for producer in self.producers
]
await asyncio.gather(*self._sub_tasks)
except asyncio.CancelledError:
pass
finally:
# Stop the loop once we are done
self._loop.stop()
2022-09-24 14:10:42 +00:00
async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock):
2022-08-31 01:21:34 +00:00
"""
Main connection loop for the consumer
2022-09-02 22:01:33 +00:00
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
2022-08-31 01:21:34 +00:00
"""
try:
await self._create_connection(producer, lock)
2022-08-31 01:21:34 +00:00
except asyncio.CancelledError:
# Exit silently
pass
2022-09-24 14:10:42 +00:00
async def _create_connection(self, producer: Producer, lock: asyncio.Lock):
"""
Actually creates and handles the websocket connection, pinging on timeout
and handling connection errors.
2022-09-02 22:01:33 +00:00
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
while self._running:
try:
host, port = producer['host'], producer['port']
token = producer['ws_token']
name = producer['name']
2022-11-20 23:36:22 +00:00
scheme = 'wss' if producer.get('secure', False) else 'ws'
2022-11-16 05:26:54 +00:00
ws_url = f"{scheme}://{host}:{port}/api/v1/message/ws?token={token}"
# This will raise InvalidURI if the url is bad
2022-10-10 00:51:52 +00:00
async with websockets.connect(
ws_url,
max_size=self.message_size_limit,
ping_interval=None
) as ws:
2022-11-27 20:14:49 +00:00
async with create_channel(
ws,
channel_id=name,
send_throttle=0.5
) as channel:
# Create the message stream for this channel
self._channel_streams[name] = MessageStream()
# Run the channel tasks while connected
await channel.run_channel_tasks(
self._receive_messages(channel, producer, lock),
self._send_requests(channel, self._channel_streams[name])
)
except (websockets.exceptions.InvalidURI, ValueError) as e:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break
except (
socket.gaierror,
ConnectionRefusedError,
2022-09-21 22:04:25 +00:00
websockets.exceptions.InvalidStatusCode,
websockets.exceptions.InvalidMessage
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
2022-12-02 19:28:27 +00:00
await asyncio.sleep(self.sleep_time)
continue
2022-09-23 18:36:05 +00:00
except (
websockets.exceptions.ConnectionClosedError,
2022-09-23 18:58:26 +00:00
websockets.exceptions.ConnectionClosedOK
2022-09-23 18:36:05 +00:00
):
# Just keep trying to connect again indefinitely
2022-12-02 19:28:27 +00:00
await asyncio.sleep(self.sleep_time)
continue
2022-09-11 05:57:17 +00:00
except Exception as e:
2022-09-21 22:04:25 +00:00
# An unforseen error has occurred, log and continue
2022-09-11 05:57:17 +00:00
logger.error("Unexpected error has occurred:")
logger.exception(e)
await asyncio.sleep(self.sleep_time)
continue
async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
# Send the initial requests
for init_request in self._initial_requests:
await channel.send(schema_to_dict(init_request))
# Now send any subsequent requests published to
# this channel's stream
async for request, _ in channel_stream:
logger.debug(f"Sending request to channel - {channel} - {request}")
await channel.send(request)
async def _receive_messages(
self,
channel: WebSocketChannel,
2022-09-24 14:10:42 +00:00
producer: Producer,
lock: asyncio.Lock
):
"""
Loop to handle receiving messages from a Producer
:param channel: The WebSocketChannel object for the WebSocket
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
2022-09-02 22:01:33 +00:00
while self._running:
try:
message = await asyncio.wait_for(
channel.recv(),
2022-09-09 16:45:49 +00:00
timeout=self.wait_timeout
)
2022-09-06 18:12:05 +00:00
try:
async with lock:
# Handle the message
self.handle_producer_message(producer, message)
except Exception as e:
logger.exception(f"Error handling producer message: {e}")
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
pong = await channel.ping()
latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)
logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
continue
2022-09-13 22:06:25 +00:00
except Exception as e:
# Just eat the error and continue reconnecting
logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
2022-09-13 22:06:25 +00:00
logger.debug(e, exc_info=e)
2022-11-29 19:22:06 +00:00
raise
def send_producer_request(
self,
producer_name: str,
request: Union[WSRequestSchema, Dict[str, Any]]
):
"""
Publish a message to the producer's message stream to be
sent by the channel task.
:param producer_name: The name of the producer to publish the message to
:param request: The request to send to the producer
"""
if isinstance(request, WSRequestSchema):
request = schema_to_dict(request)
if channel_stream := self._channel_streams.get(producer_name):
channel_stream.publish(request)
2022-09-24 14:10:42 +00:00
def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
2022-08-31 01:21:34 +00:00
"""
Handles external messages from a Producer
"""
2022-08-31 17:43:02 +00:00
producer_name = producer.get('name', 'default')
2022-09-07 21:08:01 +00:00
try:
producer_message = WSMessageSchema.parse_obj(message)
except ValidationError as e:
2022-09-10 19:44:27 +00:00
logger.error(f"Invalid message from `{producer_name}`: {e}")
2022-09-07 21:08:01 +00:00
return
2022-08-31 01:21:34 +00:00
if not producer_message.data:
logger.error(f"Empty message received from `{producer_name}`")
return
2022-09-29 05:10:00 +00:00
logger.debug(f"Received message of type `{producer_message.type}` from `{producer_name}`")
2022-08-31 01:21:34 +00:00
2022-09-07 21:08:01 +00:00
message_handler = self._message_handlers.get(producer_message.type)
if not message_handler:
2022-09-10 19:44:27 +00:00
logger.info(f"Received unhandled message: `{producer_message.data}`, ignoring...")
return
2022-09-07 21:08:01 +00:00
message_handler(producer_name, producer_message)
2022-09-12 18:00:01 +00:00
def _consume_whitelist_message(self, producer_name: str, message: WSMessageSchema):
2022-09-07 21:08:01 +00:00
try:
# Validate the message
whitelist_message = WSWhitelistMessage.parse_obj(message)
2022-09-10 20:29:15 +00:00
except ValidationError as e:
logger.error(f"Invalid message from `{producer_name}`: {e}")
return
# Add the pairlist data to the DataProvider
2022-09-12 20:09:12 +00:00
self._dp._set_producer_pairs(whitelist_message.data, producer_name=producer_name)
2022-09-10 19:44:27 +00:00
logger.debug(f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`")
2022-09-12 18:00:01 +00:00
def _consume_analyzed_df_message(self, producer_name: str, message: WSMessageSchema):
2022-09-07 21:08:01 +00:00
try:
df_message = WSAnalyzedDFMessage.parse_obj(message)
2022-09-10 20:29:15 +00:00
except ValidationError as e:
logger.error(f"Invalid message from `{producer_name}`: {e}")
return
2022-08-31 01:21:34 +00:00
key = df_message.data.key
df = df_message.data.df
la = df_message.data.la
2022-08-31 01:21:34 +00:00
2022-09-07 21:08:01 +00:00
pair, timeframe, candle_type = key
2022-08-31 01:21:34 +00:00
if df.empty:
logger.debug(f"Received Empty Dataframe for {key}")
return
2022-09-07 21:08:01 +00:00
# If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False):
df = remove_entry_exit_signals(df)
2022-08-31 01:21:34 +00:00
logger.debug(f"Received {len(df)} candle(s) for {key}")
did_append, n_missing = self._dp._add_external_df(
pair,
df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
2022-11-27 19:17:26 +00:00
)
if not did_append:
# We want an overlap in candles incase some data has changed
n_missing += 1
# Set to None for all candles if we missed a full df's worth of candles
n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles "
f"for {key} from `{producer_name}`")
self.send_producer_request(
producer_name,
WSAnalyzedDFRequest(
data={
"limit": n_missing,
"pair": pair
}
)
)
return
2022-08-31 01:30:14 +00:00
logger.debug(
f"Consumed message from `{producer_name}` "
f"of type `RPCMessageType.ANALYZED_DF` for {key}")