stable/freqtrade/rpc/external_message_consumer.py

345 lines
12 KiB
Python
Raw Normal View History

2022-08-31 01:21:34 +00:00
"""
ExternalMessageConsumer module
Its main purpose is to connect to an external bot's message websocket and consume
data from it
"""
import asyncio
import logging
import socket
from threading import Thread
2022-09-12 18:00:01 +00:00
from typing import TYPE_CHECKING, Any, Callable, Dict, List
2022-08-31 01:21:34 +00:00
import websockets
2022-09-08 19:58:28 +00:00
from pydantic import ValidationError
2022-08-31 01:21:34 +00:00
from freqtrade.data.dataprovider import DataProvider
2022-09-07 21:08:01 +00:00
from freqtrade.enums import RPCMessageType
from freqtrade.exceptions import OperationalException
2022-09-02 05:15:03 +00:00
from freqtrade.misc import remove_entry_exit_signals
2022-09-13 18:42:24 +00:00
from freqtrade.rpc.api_server.ws import WebSocketChannel
2022-09-08 19:58:28 +00:00
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
WSMessageSchema, WSRequestSchema,
WSSubscribeRequest, WSWhitelistMessage,
WSWhitelistRequest)
2022-08-31 01:21:34 +00:00
2022-09-06 18:40:58 +00:00
if TYPE_CHECKING:
import websockets.connect
import websockets.exceptions
2022-08-31 01:21:34 +00:00
# Module-level logger, named after this module via __name__
logger = logging.getLogger(__name__)
class ExternalMessageConsumer:
    """
    The main controller class for consuming external messages from
    other freqtrade bots.

    On construction it reads the `external_message_consumer` section of the
    config, validates it, and starts a background thread running its own
    asyncio event loop. One task per configured producer keeps a websocket
    connection open and forwards every received message to the DataProvider.
    """

    def __init__(
        self,
        config: Dict[str, Any],
        dataprovider: DataProvider
    ):
        """
        Read the consumer settings from the config and start consuming.

        :param config: The bot configuration; the `external_message_consumer`
            key holds the settings used here
        :param dataprovider: The DataProvider that receives the producer's
            pairlists and dataframes
        :raises OperationalException: If enabled without any producer configured
        """
        self._config = config
        self._dp = dataprovider

        # Runtime state, populated by start() / _main() and reset by shutdown()
        self._running = False
        self._thread = None
        self._loop = None
        self._main_task = None
        self._sub_tasks = None

        self._emc_config = self._config.get('external_message_consumer', {})

        self.enabled = self._emc_config.get('enabled', False)
        self.producers = self._emc_config.get('producers', [])

        self.wait_timeout = self._emc_config.get('wait_timeout', 300)  # in seconds
        self.ping_timeout = self._emc_config.get('ping_timeout', 10)  # in seconds
        self.sleep_time = self._emc_config.get('sleep_time', 10)  # in seconds

        # The amount of candles per dataframe on the initial request
        self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500)

        # Message size limit, in megabytes. Default 8mb, Use bitwise operator << 20 to convert
        # as the websockets client expects bytes.
        self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20)

        self.validate_config()

        # Setting these explicitly as they probably shouldn't be changed by a user
        # Unless we somehow integrate this with the strategy to allow creating
        # callbacks for the messages
        self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]

        # Allow setting data for each initial request
        self._initial_requests: List[WSRequestSchema] = [
            WSSubscribeRequest(data=self.topics),
            WSWhitelistRequest(),
            WSAnalyzedDFRequest()
        ]

        # Specify which function to use for which RPCMessageType
        self._message_handlers: Dict[str, Callable[[str, WSMessageSchema], None]] = {
            RPCMessageType.WHITELIST: self._consume_whitelist_message,
            RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
        }

        self.start()

    def validate_config(self):
        """
        Make sure values are what they are supposed to be

        :raises OperationalException: If the consumer is enabled but no
            producers are configured
        """
        if self.enabled and len(self.producers) < 1:
            raise OperationalException("You must specify at least 1 Producer to connect to.")

        if self.enabled and self._config.get('process_only_new_candles', True):
            # Warning here or require it?
            logger.warning("To receive best performance with external data, "
                           "please set `process_only_new_candles` to False")

    def start(self):
        """
        Start the main internal loop in another thread to run coroutines

        Does nothing if a thread and loop already exist.
        """
        if self._thread and self._loop:
            return

        logger.info("Starting ExternalMessageConsumer")

        self._loop = asyncio.new_event_loop()
        self._thread = Thread(target=self._loop.run_forever)
        self._running = True
        self._thread.start()

        # Schedule the main coroutine on the loop that runs in the other thread
        self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)

    def shutdown(self):
        """
        Shutdown the loop, thread, and tasks

        Blocks until the background thread has finished. Does nothing if the
        consumer was not started.
        """
        if self._thread and self._loop:
            logger.info("Stopping ExternalMessageConsumer")
            self._running = False

            if self._sub_tasks:
                # Cancel sub tasks. They are asyncio Tasks owned by the loop
                # running in the other thread, and asyncio Tasks are not
                # thread-safe, so hand the cancellation over to that loop
                # instead of calling cancel() directly from this thread.
                for task in self._sub_tasks:
                    self._loop.call_soon_threadsafe(task.cancel)

            if self._main_task:
                # Cancel the main task. This is the concurrent.futures.Future
                # returned by run_coroutine_threadsafe, so it is safe to
                # cancel from this thread.
                self._main_task.cancel()

            self._thread.join()

            self._thread = None
            self._loop = None
            self._sub_tasks = None
            self._main_task = None

    async def _main(self):
        """
        The main task coroutine

        Creates one connection task per producer, waits for all of them and
        stops the event loop once they are finished or cancelled.
        """
        # A single lock shared by all producer tasks, so only one message is
        # handled at a time
        lock = asyncio.Lock()

        try:
            # Create a connection to each producer
            self._sub_tasks = [
                self._loop.create_task(self._handle_producer_connection(producer, lock))
                for producer in self.producers
            ]

            await asyncio.gather(*self._sub_tasks)
        except asyncio.CancelledError:
            pass
        finally:
            # Stop the loop once we are done
            self._loop.stop()

    async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
        """
        Main connection loop for the consumer

        :param producer: Dictionary containing producer info
        :param lock: An asyncio Lock
        """
        try:
            await self._create_connection(producer, lock)
        except asyncio.CancelledError:
            # Exit silently
            pass

    async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
        """
        Actually creates and handles the websocket connection, pinging on timeout
        and handling connection errors.

        Keeps reconnecting while the consumer is running. Only an invalid URL
        ends the loop; every other error is retried after `sleep_time` seconds.

        :param producer: Dictionary containing producer info
            (reads the `host`, `port`, `ws_token` and `name` keys)
        :param lock: An asyncio Lock
        """
        while self._running:
            try:
                host, port = producer['host'], producer['port']
                token = producer['ws_token']
                name = producer['name']
                ws_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}"

                # This will raise InvalidURI if the url is bad
                async with websockets.connect(ws_url, max_size=self.message_size_limit) as ws:
                    channel = WebSocketChannel(ws, channel_id=name)

                    logger.info(f"Producer connection success - {channel}")

                    # Now request the initial data from this Producer
                    for request in self._initial_requests:
                        await channel.send(
                            request.dict(exclude_none=True)
                        )

                    # Now receive data, if none is within the time limit, ping
                    await self._receive_messages(channel, producer, lock)

            except (websockets.exceptions.InvalidURI, ValueError) as e:
                # Retrying cannot fix a bad URL, so give up on this producer.
                # NOTE(review): ws_url embeds the producer's ws_token, so this
                # log line exposes it - consider masking the token.
                logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
                break

            except (
                socket.gaierror,
                ConnectionRefusedError,
                websockets.exceptions.InvalidStatusCode,
                websockets.exceptions.InvalidMessage
            ) as e:
                logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
                await asyncio.sleep(self.sleep_time)
                continue

            except (
                websockets.exceptions.ConnectionClosedError,
                websockets.exceptions.ConnectionClosedOK
            ):
                # Just keep trying to connect again indefinitely
                await asyncio.sleep(self.sleep_time)
                continue

            except Exception as e:
                # An unforeseen error has occurred, log and continue
                logger.error("Unexpected error has occurred:")
                logger.exception(e)
                # Back off before retrying. Without this a persistent error
                # (e.g. a producer entry missing a required key) would spin
                # this loop at full speed and flood the log.
                await asyncio.sleep(self.sleep_time)
                continue

    async def _receive_messages(
        self,
        channel: WebSocketChannel,
        producer: Dict[str, Any],
        lock: asyncio.Lock
    ):
        """
        Loop to handle receiving messages from a Producer

        Waits up to `wait_timeout` seconds for each message. If none arrives,
        or the connection is reported closed, the channel is pinged; a failed
        ping ends the loop so the caller can reconnect.

        :param channel: The WebSocketChannel object for the WebSocket
        :param producer: Dictionary containing producer info
        :param lock: An asyncio Lock
        """
        while self._running:
            try:
                message = await asyncio.wait_for(
                    channel.recv(),
                    timeout=self.wait_timeout
                )

                try:
                    async with lock:
                        # Handle the message
                        self.handle_producer_message(producer, message)
                except Exception as e:
                    # A bad message must not take the connection down
                    logger.exception(f"Error handling producer message: {e}")

            except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                # We haven't received data yet. Check the connection and continue.
                try:
                    # ping
                    ping = await channel.ping()

                    await asyncio.wait_for(ping, timeout=self.ping_timeout)
                    logger.debug(f"Connection to {channel} still alive...")

                    continue
                except Exception as e:
                    logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s")
                    logger.debug(e, exc_info=e)
                    await asyncio.sleep(self.sleep_time)

                    # Leave the receive loop; the caller's loop reconnects
                    break

    def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]):
        """
        Handles external messages from a Producer

        Validates the message against WSMessageSchema and dispatches it to the
        handler registered for its type. Invalid, empty and unhandled messages
        are logged and dropped.

        :param producer: Dictionary containing producer info
        :param message: The raw message received from the producer
        """
        producer_name = producer.get('name', 'default')

        try:
            producer_message = WSMessageSchema.parse_obj(message)
        except ValidationError as e:
            logger.error(f"Invalid message from `{producer_name}`: {e}")
            return

        if not producer_message.data:
            logger.error(f"Empty message received from `{producer_name}`")
            return

        logger.info(f"Received message of type `{producer_message.type}` from `{producer_name}`")

        message_handler = self._message_handlers.get(producer_message.type)

        if not message_handler:
            logger.info(f"Received unhandled message: `{producer_message.data}`, ignoring...")
            return

        message_handler(producer_name, producer_message)

    def _consume_whitelist_message(self, producer_name: str, message: WSMessageSchema):
        """
        Validate a whitelist message and store its pairlist in the DataProvider

        :param producer_name: Name of the producer the message came from
        :param message: The already schema-validated message
        """
        try:
            # Validate the message
            whitelist_message = WSWhitelistMessage.parse_obj(message)
        except ValidationError as e:
            logger.error(f"Invalid message from `{producer_name}`: {e}")
            return

        # Add the pairlist data to the DataProvider
        self._dp._set_producer_pairs(whitelist_message.data, producer_name=producer_name)

        logger.debug(f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`")

    def _consume_analyzed_df_message(self, producer_name: str, message: WSMessageSchema):
        """
        Validate an analyzed dataframe message and add its dataframe to the
        DataProvider

        :param producer_name: Name of the producer the message came from
        :param message: The already schema-validated message
        """
        try:
            df_message = WSAnalyzedDFMessage.parse_obj(message)
        except ValidationError as e:
            logger.error(f"Invalid message from `{producer_name}`: {e}")
            return

        key = df_message.data.key
        df = df_message.data.df
        la = df_message.data.la

        pair, timeframe, candle_type = key

        # If set, remove the Entry and Exit signals from the Producer
        if self._emc_config.get('remove_entry_exit_signals', False):
            df = remove_entry_exit_signals(df)

        # Add the dataframe to the dataprovider
        self._dp._add_external_df(pair, df,
                                  last_analyzed=la,
                                  timeframe=timeframe,
                                  candle_type=candle_type,
                                  producer_name=producer_name)

        logger.debug(
            f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`")