stable/freqtrade/rpc/external_message_consumer.py

327 lines
11 KiB
Python
Raw Normal View History

2022-08-31 01:21:34 +00:00
"""
ExternalMessageConsumer module
Main purpose is to connect to external bot's message websocket to consume data
from it
"""
import asyncio
import logging
import socket
from threading import Thread
2022-09-06 18:40:58 +00:00
from typing import TYPE_CHECKING, Any, Dict, List, Optional
2022-08-31 01:21:34 +00:00
import websockets
from freqtrade.data.dataprovider import DataProvider
2022-08-31 01:21:34 +00:00
from freqtrade.enums import RPCMessageType, RPCRequestType
2022-09-02 05:15:03 +00:00
from freqtrade.misc import remove_entry_exit_signals
2022-08-31 01:21:34 +00:00
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel
2022-09-06 18:40:58 +00:00
if TYPE_CHECKING:
import websockets.connect
import websockets.exceptions
2022-08-31 01:21:34 +00:00
logger = logging.getLogger(__name__)
class ExternalMessageConsumer:
"""
The main controller class for consuming external messages from
other FreqTrade bot's
"""
def __init__(
self,
config: Dict[str, Any],
dataprovider: DataProvider
2022-08-31 01:21:34 +00:00
):
self._config = config
self._dp = dataprovider
2022-08-31 01:21:34 +00:00
self._running = False
self._thread = None
self._loop = None
self._main_task = None
self._sub_tasks = None
self._emc_config = self._config.get('external_message_consumer', {})
self.enabled = self._emc_config.get('enabled', False)
self.producers = self._emc_config.get('producers', [])
if self.enabled and len(self.producers) < 1:
raise ValueError("You must specify at least 1 Producer to connect to.")
self.reply_timeout = self._emc_config.get('reply_timeout', 10)
self.ping_timeout = self._emc_config.get('ping_timeout', 2)
self.sleep_time = self._emc_config.get('sleep_time', 5)
2022-09-02 22:01:33 +00:00
# The amount of candles per dataframe on the initial request
self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500)
2022-08-31 01:21:34 +00:00
# Setting these explicitly as they probably shouldn't be changed by a user
# Unless we somehow integrate this with the strategy to allow creating
# callbacks for the messages
self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF]
# Allow setting data for each initial request
self._initial_requests: List[Dict[str, Any]] = [
{
"type": RPCRequestType.WHITELIST,
"data": None
},
{
"type": RPCRequestType.ANALYZED_DF,
"data": {"limit": self.initial_candle_limit}
}
]
2022-09-02 22:01:33 +00:00
# Specify which function to use for which RPCMessageType
self._message_handlers = {
RPCMessageType.WHITELIST: self._consume_whitelist_message,
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
2022-08-31 01:21:34 +00:00
self.start()
def start(self):
"""
Start the main internal loop in another thread to run coroutines
"""
self._loop = asyncio.new_event_loop()
if not self._thread:
logger.info("Starting ExternalMessageConsumer")
self._thread = Thread(target=self._loop.run_forever)
self._thread.start()
self._running = True
else:
raise RuntimeError("A loop is already running")
self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop)
def shutdown(self):
"""
Shutdown the loop, thread, and tasks
"""
if self._thread and self._loop:
logger.info("Stopping ExternalMessageConsumer")
if self._sub_tasks:
# Cancel sub tasks
for task in self._sub_tasks:
task.cancel()
if self._main_task:
# Cancel the main task
self._main_task.cancel()
self._thread.join()
async def _main(self):
"""
The main task coroutine
"""
lock = asyncio.Lock()
2022-08-31 01:21:34 +00:00
try:
# Create a connection to each producer
self._sub_tasks = [
self._loop.create_task(self._handle_producer_connection(producer, lock))
2022-08-31 01:21:34 +00:00
for producer in self.producers
]
await asyncio.gather(*self._sub_tasks)
except asyncio.CancelledError:
pass
finally:
# Stop the loop once we are done
self._loop.stop()
async def _handle_producer_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
2022-08-31 01:21:34 +00:00
"""
Main connection loop for the consumer
2022-09-02 22:01:33 +00:00
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
2022-08-31 01:21:34 +00:00
"""
try:
await self._create_connection(producer, lock)
2022-08-31 01:21:34 +00:00
except asyncio.CancelledError:
# Exit silently
pass
async def _create_connection(self, producer: Dict[str, Any], lock: asyncio.Lock):
"""
Actually creates and handles the websocket connection, pinging on timeout
and handling connection errors.
2022-09-02 22:01:33 +00:00
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
while self._running:
try:
url, token = producer['url'], producer['ws_token']
name = producer["name"]
ws_url = f"{url}?token={token}"
# This will raise InvalidURI if the url is bad
async with websockets.connect(ws_url) as ws:
channel = WebSocketChannel(ws, channel_id=name)
logger.info(f"Producer connection success - {channel}")
# Tell the producer we only want these topics
# Should always be the first thing we send
await channel.send(
self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics)
)
# Now request the initial data from this Producer
for request in self._initial_requests:
await channel.send(
2022-09-02 22:01:33 +00:00
self.compose_consumer_request(request['type'], request['data'])
)
# Now receive data, if none is within the time limit, ping
await self._receive_messages(channel, producer, lock)
# Catch invalid ws_url, and break the loop
except websockets.exceptions.InvalidURI as e:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break
except (
socket.gaierror,
ConnectionRefusedError,
websockets.exceptions.InvalidStatusCode
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
continue
except Exception as e:
# An unforseen error has occurred, log and stop
logger.exception(e)
break
async def _receive_messages(
self,
channel: WebSocketChannel,
producer: Dict[str, Any],
lock: asyncio.Lock
):
"""
Loop to handle receiving messages from a Producer
:param channel: The WebSocketChannel object for the WebSocket
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
2022-09-02 22:01:33 +00:00
while self._running:
try:
message = await asyncio.wait_for(
channel.recv(),
timeout=self.reply_timeout
)
2022-09-06 18:12:05 +00:00
try:
async with lock:
# Handle the message
self.handle_producer_message(producer, message)
except Exception as e:
logger.exception(f"Error handling producer message: {e}")
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
# We haven't received data yet. Check the connection and continue.
try:
# ping
ping = await channel.ping()
await asyncio.wait_for(ping, timeout=self.ping_timeout)
logger.debug(f"Connection to {channel} still alive...")
continue
except Exception:
logger.info(
f"Ping error {channel} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
def compose_consumer_request(
self,
type_: RPCRequestType,
data: Optional[Any] = None
) -> Dict[str, Any]:
2022-08-31 01:21:34 +00:00
"""
Create a request for sending to a producer
:param type_: The RPCRequestType
:param data: The data to send
:returns: Dict[str, Any]
"""
return {'type': type_, 'data': data}
2022-08-31 17:43:02 +00:00
def handle_producer_message(self, producer: Dict[str, Any], message: Dict[str, Any]):
2022-08-31 01:21:34 +00:00
"""
Handles external messages from a Producer
"""
2022-08-31 17:43:02 +00:00
producer_name = producer.get('name', 'default')
2022-08-31 01:21:34 +00:00
# Should we have a default message type?
message_type = message.get('type', RPCMessageType.STATUS)
message_data = message.get('data')
# We shouldn't get empty messages
if message_data is None:
return
2022-09-02 22:01:33 +00:00
logger.info(f"Received message of type {message_type} from `{producer_name}`")
2022-08-31 01:21:34 +00:00
message_handler = self._message_handlers.get(message_type)
if not message_handler:
logger.info(f"Received unhandled message: {message_data}, ignoring...")
return
message_handler(producer_name, message_data)
def _consume_whitelist_message(self, producer_name: str, message_data: Any):
# We expect List[str]
if not isinstance(message_data, list):
return
# Add the pairlist data to the DataProvider
self._dp._set_producer_pairs(message_data, producer_name=producer_name)
logger.debug(f"Consumed message from {producer_name} of type RPCMessageType.WHITELIST")
def _consume_analyzed_df_message(self, producer_name: str, message_data: Any):
# We expect a Dict[str, Any]
if not isinstance(message_data, dict):
return
2022-08-31 01:21:34 +00:00
key, value = message_data.get('key'), message_data.get('value')
2022-08-31 01:21:34 +00:00
2022-09-06 18:12:05 +00:00
if key and value:
pair, timeframe, candle_type = key
2022-09-06 18:12:05 +00:00
dataframe, last_analyzed = value
2022-08-31 01:21:34 +00:00
# If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False):
dataframe = remove_entry_exit_signals(dataframe)
2022-08-31 01:21:34 +00:00
# Add the dataframe to the dataprovider
2022-09-06 18:12:05 +00:00
self._dp._add_external_df(pair, dataframe,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name)
2022-08-31 01:30:14 +00:00
logger.debug(
f"Consumed message from {producer_name} of type RPCMessageType.ANALYZED_DF")