From 346e73dd75503f9170d6f6759a517d0f421e6fc6 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Tue, 30 Aug 2022 19:21:34 -0600 Subject: [PATCH] client implementation, minor fixes --- config_examples/config_full.example.json | 13 +- freqtrade/constants.py | 26 +- freqtrade/data/dataprovider.py | 10 +- freqtrade/enums/__init__.py | 2 +- freqtrade/enums/externalmessages.py | 7 + freqtrade/enums/externalsignal.py | 18 - freqtrade/freqtradebot.py | 17 +- freqtrade/rpc/api_server/api_ws.py | 2 +- freqtrade/rpc/api_server/webserver.py | 29 +- freqtrade/rpc/api_server/ws/channel.py | 3 - freqtrade/rpc/emc.py | 229 ++++++++++ freqtrade/rpc/external_signal/__init__.py | 5 - freqtrade/rpc/external_signal/channel.py | 145 ------- freqtrade/rpc/external_signal/controller.py | 449 -------------------- freqtrade/rpc/external_signal/proxy.py | 61 --- freqtrade/rpc/external_signal/serializer.py | 65 --- freqtrade/rpc/external_signal/types.py | 8 - freqtrade/rpc/external_signal/utils.py | 10 - freqtrade/rpc/rpc.py | 62 --- freqtrade/rpc/rpc_manager.py | 4 +- freqtrade/strategy/interface.py | 43 +- scripts/test_ws_client.py | 74 ---- 22 files changed, 323 insertions(+), 959 deletions(-) create mode 100644 freqtrade/enums/externalmessages.py delete mode 100644 freqtrade/enums/externalsignal.py create mode 100644 freqtrade/rpc/emc.py delete mode 100644 freqtrade/rpc/external_signal/__init__.py delete mode 100644 freqtrade/rpc/external_signal/channel.py delete mode 100644 freqtrade/rpc/external_signal/controller.py delete mode 100644 freqtrade/rpc/external_signal/proxy.py delete mode 100644 freqtrade/rpc/external_signal/serializer.py delete mode 100644 freqtrade/rpc/external_signal/types.py delete mode 100644 freqtrade/rpc/external_signal/utils.py delete mode 100644 scripts/test_ws_client.py diff --git a/config_examples/config_full.example.json b/config_examples/config_full.example.json index 74457d2b6..ec988687f 100644 --- a/config_examples/config_full.example.json +++ b/config_examples/config_full.example.json @@ -172,7 +172,18 @@ "jwt_secret_key": "somethingrandom", "CORS_origins": [], "username": "freqtrader", - "password": "SuperSecurePassword" + "password": "SuperSecurePassword", + "ws_token": "a_secret_ws_token", + "enable_message_ws": false + }, + "external_message_consumer": { + "enabled": false, + "producers": [ + { + "url": "ws://some.freqtrade.bot/api/v1/message/ws", + "ws_token": "a_secret_ws_token" + } + ] }, "bot_name": "freqtrade", "db_url": "sqlite:///tradesv3.sqlite", diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 96f8413b0..c7f2acc84 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -61,7 +61,6 @@ USERPATH_FREQAIMODELS = 'freqaimodels' TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] -FOLLOWER_MODE_OPTIONS = ['follower', 'leader'] WAIT_DATA_POLICY_OPTIONS = ['none', 'first', 'all'] ENV_VAR_PREFIX = 'FREQTRADE__' @@ -246,7 +245,7 @@ CONF_SCHEMA = { 'exchange': {'$ref': '#/definitions/exchange'}, 'edge': {'$ref': '#/definitions/edge'}, 'freqai': {'$ref': '#/definitions/freqai'}, - 'external_signal': {'$ref': '#/definitions/external_signal'}, + 'external_message_consumer': {'$ref': '#/definitions/external_message_consumer'}, 'experimental': { 'type': 'object', 'properties': { @@ -404,7 +403,8 @@ CONF_SCHEMA = { }, 'username': {'type': 'string'}, 'password': {'type': 'string'}, - 'api_token': {'type': 'string'}, + 'ws_token': {'type': 'string'}, + 'enable_message_ws': {'type': 'boolean', 'default': False}, 'jwt_secret_key': {'type': 'string'}, 'CORS_origins': {'type': 'array', 'items': {'type': 'string'}}, 'verbosity': {'type': 'string', 'enum': ['error', 'info']}, @@ -489,35 +489,31 @@ CONF_SCHEMA = { }, 'required': ['process_throttle_secs', 'allowed_risk'] }, - 'external_signal': { + 'external_message_consumer': { 'type': 'object', 'properties': { 'enabled': {'type': 'boolean', 'default': False}, - 'mode': { - 'type': 'string', - 'enum': FOLLOWER_MODE_OPTIONS - }, - 'api_token': {'type': 'string', 'default': ''}, - 'leaders': { + 'producers': { 'type': 'array', 'items': { 'type': 'object', 'properties': { 'url': {'type': 'string', 'default': ''}, - 'api_token': {'type': 'string', 'default': ''}, + 'ws_token': {'type': 'string', 'default': ''}, } } }, - 'follower_reply_timeout': {'type': 'integer'}, - 'follower_sleep_time': {'type': 'integer'}, - 'follower_ping_timeout': {'type': 'integer'}, + 'reply_timeout': {'type': 'integer'}, + 'sleep_time': {'type': 'integer'}, + 'ping_timeout': {'type': 'integer'}, 'wait_data_policy': { 'type': 'string', 'enum': WAIT_DATA_POLICY_OPTIONS }, + 'wait_data_timeout': {'type': 'integer', 'default': 5}, 'remove_signals_analyzed_df': {'type': 'boolean', 'default': False} }, - 'required': ['mode'] + 'required': ['producers'] }, "freqai": { "type": "object", diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index cd70db9a3..430ee0932 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -48,11 +48,13 @@ class DataProvider: self.__msg_cache = PeriodicCache( maxsize=1000, ttl=timeframe_to_seconds(self._config.get('timeframe', '1h'))) - self._num_sources = len(self._config.get('external_signal', {}).get('leader_list', [])) - self._wait_data_policy = self._config.get('external_signal', {}).get( + self._num_sources = len( + self._config.get('external_message_consumer', {}).get('producers', []) + ) + self._wait_data_policy = self._config.get('external_message_consumer', {}).get( 'wait_data_policy', WaitDataPolicy.all) - self._wait_data_timeout = self._config.get( - 'external_signal', {}).get('wait_data_timeout', 5) + self._wait_data_timeout = self._config.get('external_message_consumer', {}).get( + 'wait_data_timeout', 5) def _set_dataframe_max_index(self, limit_index: int): """ diff --git a/freqtrade/enums/__init__.py b/freqtrade/enums/__init__.py index 406d847e6..229d770ce 100644 --- a/freqtrade/enums/__init__.py +++ b/freqtrade/enums/__init__.py @@ -3,7 +3,7 @@ from freqtrade.enums.backteststate import BacktestState from freqtrade.enums.candletype import CandleType from freqtrade.enums.exitchecktuple import ExitCheckTuple from freqtrade.enums.exittype import ExitType -from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy +from freqtrade.enums.externalmessages import WaitDataPolicy from freqtrade.enums.marginmode import MarginMode from freqtrade.enums.ordertypevalue import OrderTypeValues from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType diff --git a/freqtrade/enums/externalmessages.py b/freqtrade/enums/externalmessages.py new file mode 100644 index 000000000..e43899ab5 --- /dev/null +++ b/freqtrade/enums/externalmessages.py @@ -0,0 +1,7 @@ +from enum import Enum + + +class WaitDataPolicy(str, Enum): + none = "none" + one = "one" + all = "all" diff --git a/freqtrade/enums/externalsignal.py b/freqtrade/enums/externalsignal.py deleted file mode 100644 index 05dc604a2..000000000 --- a/freqtrade/enums/externalsignal.py +++ /dev/null @@ -1,18 +0,0 @@ -from enum import Enum - - -class ExternalSignalModeType(str, Enum): - leader = "leader" - follower = "follower" - - -class LeaderMessageType(str, Enum): - default = "default" - pairlist = "pairlist" - analyzed_df = "analyzed_df" - - -class WaitDataPolicy(str, Enum): - none = "none" - one = "one" - all = "all" diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index c9caaace6..c0d658c61 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -30,6 +30,7 @@ from freqtrade.plugins.pairlistmanager import PairListManager from freqtrade.plugins.protectionmanager import ProtectionManager from freqtrade.resolvers import ExchangeResolver, StrategyResolver from freqtrade.rpc import RPCManager +from freqtrade.rpc.emc import ExternalMessageConsumer from freqtrade.strategy.interface import IStrategy from freqtrade.strategy.strategy_wrapper import strategy_safe_wrapper from freqtrade.util import FtPrecise @@ -90,11 +91,17 @@ class FreqtradeBot(LoggingMixin): self.strategy.dp = self.dataprovider # Attach Wallets to strategy instance self.strategy.wallets = self.wallets + # Attach rpc to strategy instance + self.strategy.rpc = self.rpc # Initializing Edge only if enabled self.edge = Edge(self.config, self.exchange, self.strategy) if \ self.config.get('edge', {}).get('enabled', False) else None + # Init ExternalMessageConsumer if enabled + self.emc = ExternalMessageConsumer(self.rpc._rpc, self.config) if \ + self.config.get('external_message_consumer', {}).get('enabled', False) else None + self.active_pair_whitelist = self._refresh_active_whitelist() # Set initial bot state from config @@ -150,6 +157,8 @@ class FreqtradeBot(LoggingMixin): self.check_for_open_trades() self.rpc.cleanup() + if self.emc: + self.emc.shutdown() Trade.commit() self.exchange.close() @@ -192,7 +201,11 @@ class FreqtradeBot(LoggingMixin): strategy_safe_wrapper(self.strategy.bot_loop_start, supress_error=True)() - self.strategy.analyze(self.active_pair_whitelist) + if self.emc: + leader_pairs = self.pairlists._whitelist + self.strategy.analyze_external(self.active_pair_whitelist, leader_pairs) + else: + self.strategy.analyze(self.active_pair_whitelist) with self._exit_lock: # Check for exchange cancelations, timeouts and user requested replace @@ -255,7 +268,7 @@ class FreqtradeBot(LoggingMixin): self.pairlists.refresh_pairlist() _whitelist = self.pairlists.whitelist - self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'msg': _whitelist}) + self.rpc.send_msg({'type': RPCMessageType.WHITELIST, 'data': _whitelist}) # Calculating Edge positioning if self.edge: diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index c8d1b70fa..88bae099a 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -26,7 +26,7 @@ async def message_endpoint( # Return a channel ID, pass that instead of ws to the rest of the methods channel = await channel_manager.on_connect(ws) - # Keep connection open until explicitly closed, and sleep + # Keep connection open until explicitly closed, and process requests try: while not channel.is_closed(): request = await channel.recv() diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index f4af8c8ed..e391e66af 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -112,9 +112,6 @@ class ApiServer(RPCHandler): # Cancel the queue task self._background_task.cancel() - # Finally stop the loop - self._loop.call_soon_threadsafe(self._loop.stop) - self._thread.join() @classmethod @@ -127,7 +124,6 @@ class ApiServer(RPCHandler): def send_msg(self, msg: Dict[str, str]) -> None: if self._queue: - logger.info(f"Adding message to queue: {msg}") sync_q = self._queue.sync_q sync_q.put(msg) @@ -155,9 +151,11 @@ class ApiServer(RPCHandler): app.include_router(api_backtest, prefix="/api/v1", dependencies=[Depends(http_basic_or_jwt_token)], ) - app.include_router(ws_router, prefix="/api/v1", - dependencies=[Depends(get_ws_token)] - ) + if self._config.get('api_server', {}).get('enable_message_ws', False): + logger.info("Enabling Message WebSocket") + app.include_router(ws_router, prefix="/api/v1", + dependencies=[Depends(get_ws_token)] + ) app.include_router(router_login, prefix="/api/v1", tags=["auth"]) # UI Router MUST be last! app.include_router(router_ui, prefix='') @@ -194,17 +192,19 @@ class ApiServer(RPCHandler): try: while True: - logger.debug("Getting queue data...") + logger.debug("Getting queue messages...") # Get data from queue - data = await async_queue.get() - logger.debug(f"Found data: {data}") + message = await async_queue.get() + logger.debug(f"Found message of type: {message.get('type')}") # Broadcast it - await self._channel_manager.broadcast(data) + await self._channel_manager.broadcast(message) # Sleep, make this configurable? await asyncio.sleep(0.1) except asyncio.CancelledError: - # Silently stop - pass + # Disconnect channels and stop the loop on cancel + await self._channel_manager.disconnect_all() + self._loop.stop() + # For testing, shouldn't happen when stable except Exception as e: logger.info(f"Exception happened in background task: {e}") @@ -246,7 +246,8 @@ class ApiServer(RPCHandler): if self._standalone: self._server.run() else: - self.start_message_queue() + if self._config.get('api_server', {}).get('enable_message_ws', False): + self.start_message_queue() self._server.run_in_thread() except Exception: logger.exception("Api server failed to start.") diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index f24713a77..6bc5b9d6b 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -116,8 +116,6 @@ class ChannelManager: with self._lock: channel = self.channels.get(websocket) if channel: - logger.debug(f"Disconnecting channel - {channel}") - if not channel.is_closed(): await channel.close() @@ -142,7 +140,6 @@ class ChannelManager: """ with self._lock: message_type = data.get('type') - logger.debug(f"Broadcasting data: {message_type} - {data}") for websocket, channel in self.channels.items(): try: if channel.subscribed_to(message_type): diff --git a/freqtrade/rpc/emc.py b/freqtrade/rpc/emc.py new file mode 100644 index 000000000..48ad78266 --- /dev/null +++ b/freqtrade/rpc/emc.py @@ -0,0 +1,229 @@ +""" +ExternalMessageConsumer module + +Main purpose is to connect to external bot's message websocket to consume data +from it +""" +import asyncio +import logging +import socket +from threading import Thread +from typing import Any, Dict + +import websockets + +from freqtrade.enums import RPCMessageType, RPCRequestType +from freqtrade.misc import json_to_dataframe, remove_entry_exit_signals +from freqtrade.rpc import RPC +from freqtrade.rpc.api_server.ws.channel import WebSocketChannel + + +logger = logging.getLogger(__name__) + + +class ExternalMessageConsumer: + """ + The main controller class for consuming external messages from + other FreqTrade bot's + """ + + def __init__( + self, + rpc: RPC, + config: Dict[str, Any], + ): + self._rpc = rpc + self._config = config + + self._running = False + self._thread = None + self._loop = None + self._main_task = None + self._sub_tasks = None + + self._emc_config = self._config.get('external_message_consumer', {}) + + self.enabled = self._emc_config.get('enabled', False) + self.producers = self._emc_config.get('producers', []) + + if self.enabled and len(self.producers) < 1: + raise ValueError("You must specify at least 1 Producer to connect to.") + + self.reply_timeout = self._emc_config.get('reply_timeout', 10) + self.ping_timeout = self._emc_config.get('ping_timeout', 2) + self.sleep_time = self._emc_config.get('sleep_time', 5) + + # Setting these explicitly as they probably shouldn't be changed by a user + # Unless we somehow integrate this with the strategy to allow creating + # callbacks for the messages + self.topics = [RPCMessageType.WHITELIST, RPCMessageType.ANALYZED_DF] + + self.start() + + def start(self): + """ + Start the main internal loop in another thread to run coroutines + """ + self._loop = asyncio.new_event_loop() + + if not self._thread: + logger.info("Starting ExternalMessageConsumer") + + self._thread = Thread(target=self._loop.run_forever) + self._thread.start() + self._running = True + else: + raise RuntimeError("A loop is already running") + + self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop) + + def shutdown(self): + """ + Shutdown the loop, thread, and tasks + """ + if self._thread and self._loop: + logger.info("Stopping ExternalMessageConsumer") + + if self._sub_tasks: + # Cancel sub tasks + for task in self._sub_tasks: + task.cancel() + + if self._main_task: + # Cancel the main task + self._main_task.cancel() + + self._thread.join() + + async def _main(self): + """ + The main task coroutine + """ + rpc_lock = asyncio.Lock() + + try: + # Create a connection to each producer + self._sub_tasks = [ + self._loop.create_task(self._handle_producer_connection(producer, rpc_lock)) + for producer in self.producers + ] + + await asyncio.gather(*self._sub_tasks) + except asyncio.CancelledError: + pass + finally: + # Stop the loop once we are done + self._loop.stop() + + async def _handle_producer_connection(self, producer, lock): + """ + Main connection loop for the consumer + """ + try: + while True: + try: + url, token = producer['url'], producer['ws_token'] + ws_url = f"{url}?token={token}" + + async with websockets.connect(ws_url) as ws: + logger.info("Connection successful") + channel = WebSocketChannel(ws) + + # Tell the producer we only want these topics + # Should always be the first thing we send + await channel.send( + self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) + ) + + # Now receive data, if none is within the time limit, ping + while True: + try: + message = await asyncio.wait_for( + channel.recv(), + timeout=5 + ) + + async with lock: + # Handle the data here + # We use a lock because it will call RPC methods + self.handle_producer_message(message) + + except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): + # We haven't received data yet. Check the connection and continue. + try: + # ping + ping = await channel.ping() + + await asyncio.wait_for(ping, timeout=self.ping_timeout) + logger.debug(f"Connection to {url} still alive...") + + continue + except Exception: + logger.info( + f"Ping error {url} - retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + + break + except ( + socket.gaierror, + ConnectionRefusedError, + websockets.exceptions.InvalidStatusCode + ) as e: + logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") + await asyncio.sleep(self.sleep_time) + + continue + + except asyncio.CancelledError: + # Exit silently + pass + + def compose_consumer_request(self, type_: str, data: Any) -> Dict[str, Any]: + """ + Create a request for sending to a producer + + :param type_: The RPCRequestType + :param data: The data to send + :returns: Dict[str, Any] + """ + return {'type': type_, 'data': data} + + # How we do things here isn't set in stone. There seems to be some interest + # in figuring out a better way, but we shall do this for now. + def handle_producer_message(self, message: Dict[str, Any]): + """ + Handles external messages from a Producer + """ + # Should we have a default message type? + message_type = message.get('type', RPCMessageType.STATUS) + message_data = message.get('data') + + logger.debug(f"Received message of type {message_type}") + + # Handle Whitelists + if message_type == RPCMessageType.WHITELIST: + pairlist = message_data + + # Add the pairlist data to the ExternalPairlist plugin + external_pairlist = self._rpc._freqtrade.pairlists._pairlist_handlers[0] + external_pairlist.add_pairlist_data(pairlist) + + # Handle analyzed dataframes + elif message_type == RPCMessageType.ANALYZED_DF: + # This shouldn't happen + if message_data is None: + return + + key, value = message_data.get('key'), message_data.get('data') + pair, timeframe, candle_type = key + + # Convert the JSON to a pandas DataFrame + dataframe = json_to_dataframe(value) + + # If set, remove the Entry and Exit signals from the Producer + if self._emc_config.get('remove_entry_exit_signals', False): + dataframe = remove_entry_exit_signals(dataframe) + + # Add the dataframe to the dataprovider + dataprovider = self._rpc._freqtrade.dataprovider + dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) diff --git a/freqtrade/rpc/external_signal/__init__.py b/freqtrade/rpc/external_signal/__init__.py deleted file mode 100644 index decc51551..000000000 --- a/freqtrade/rpc/external_signal/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -# # flake8: noqa: F401 -# from freqtrade.rpc.external_signal.controller import ExternalSignalController -# -# -# __all__ = ('ExternalSignalController') diff --git a/freqtrade/rpc/external_signal/channel.py b/freqtrade/rpc/external_signal/channel.py deleted file mode 100644 index 5b278dfed..000000000 --- a/freqtrade/rpc/external_signal/channel.py +++ /dev/null @@ -1,145 +0,0 @@ -# import logging -# from threading import RLock -# from typing import Type -# -# from freqtrade.rpc.external_signal.proxy import WebSocketProxy -# from freqtrade.rpc.external_signal.serializer import MsgPackWebSocketSerializer -# from freqtrade.rpc.external_signal.types import WebSocketType -# -# -# logger = logging.getLogger(__name__) -# -# -# class WebSocketChannel: -# """ -# Object to help facilitate managing a websocket connection -# """ -# -# def __init__( -# self, -# websocket: WebSocketType, -# serializer_cls: Type[WebSocketSerializer] = MsgPackWebSocketSerializer -# ): -# # The WebSocket object -# self._websocket = WebSocketProxy(websocket) -# # The Serializing class for the WebSocket object -# self._serializer_cls = serializer_cls -# -# # Internal event to signify a closed websocket -# self._closed = False -# -# # Wrap the WebSocket in the Serializing class -# self._wrapped_ws = self._serializer_cls(self._websocket) -# -# async def send(self, data): -# """ -# Send data on the wrapped websocket -# """ -# # logger.info(f"Serialized Send - {self._wrapped_ws._serialize(data)}") -# await self._wrapped_ws.send(data) -# -# async def recv(self): -# """ -# Receive data on the wrapped websocket -# """ -# return await self._wrapped_ws.recv() -# -# async def ping(self): -# """ -# Ping the websocket -# """ -# return await self._websocket.ping() -# -# async def close(self): -# """ -# Close the WebSocketChannel -# """ -# -# self._closed = True -# -# def is_closed(self): -# return self._closed -# -# -# class ChannelManager: -# def __init__(self): -# self.channels = dict() -# self._lock = RLock() # Re-entrant Lock -# -# async def on_connect(self, websocket: WebSocketType): -# """ -# Wrap websocket connection into Channel and add to list -# -# :param websocket: The WebSocket object to attach to the Channel -# """ -# if hasattr(websocket, "accept"): -# try: -# await websocket.accept() -# except RuntimeError: -# # The connection was closed before we could accept it -# return -# -# ws_channel = WebSocketChannel(websocket) -# -# with self._lock: -# self.channels[websocket] = ws_channel -# -# return ws_channel -# -# async def on_disconnect(self, websocket: WebSocketType): -# """ -# Call close on the channel if it's not, and remove from channel list -# -# :param websocket: The WebSocket objet attached to the Channel -# """ -# with self._lock: -# channel = self.channels.get(websocket) -# if channel: -# logger.debug(f"Disconnecting channel - {channel}") -# -# if not channel.is_closed(): -# await channel.close() -# -# del self.channels[websocket] -# -# async def disconnect_all(self): -# """ -# Disconnect all Channels -# """ -# with self._lock: -# for websocket, channel in self.channels.items(): -# if not channel.is_closed(): -# await channel.close() -# -# self.channels = dict() -# -# async def broadcast(self, data): -# """ -# Broadcast data on all Channels -# -# :param data: The data to send -# """ -# with self._lock: -# for websocket, channel in self.channels.items(): -# try: -# await channel.send(data) -# except RuntimeError: -# # Handle cannot send after close cases -# await self.on_disconnect(websocket) -# -# async def send_direct(self, channel, data): -# """ -# Send data directly through direct_channel only -# -# :param direct_channel: The WebSocketChannel object to send data through -# :param data: The data to send -# """ -# # We iterate over the channels to get reference to the websocket object -# # so we can disconnect incase of failure -# await channel.send(data) -# -# def has_channels(self): -# """ -# Flag for more than 0 channels -# """ -# return len(self.channels) > 0 diff --git a/freqtrade/rpc/external_signal/controller.py b/freqtrade/rpc/external_signal/controller.py deleted file mode 100644 index 616ea7801..000000000 --- a/freqtrade/rpc/external_signal/controller.py +++ /dev/null @@ -1,449 +0,0 @@ -# """ -# This module manages replicate mode communication -# """ -# import asyncio -# import logging -# import secrets -# import socket -# from threading import Thread -# from typing import Any, Callable, Coroutine, Dict, Union -# -# import websockets -# from fastapi import Depends -# from fastapi import WebSocket as FastAPIWebSocket -# from fastapi import WebSocketDisconnect, status -# from janus import Queue as ThreadedQueue -# -# from freqtrade.enums import ExternalSignalModeType, LeaderMessageType, RPCMessageType -# from freqtrade.rpc import RPC, RPCHandler -# from freqtrade.rpc.external_signal.channel import ChannelManager -# from freqtrade.rpc.external_signal.types import MessageType -# from freqtrade.rpc.external_signal.utils import is_websocket_alive -# -# -# logger = logging.getLogger(__name__) -# -# -# class ExternalSignalController(RPCHandler): -# """ This class handles all websocket communication """ -# -# def __init__( -# self, -# rpc: RPC, -# config: Dict[str, Any], -# api_server: Union[Any, None] = None -# ) -> None: -# """ -# Init the ExternalSignalController class, and init the super class RPCHandler -# :param rpc: instance of RPC Helper class -# :param config: Configuration object -# :param api_server: The ApiServer object -# :return: None -# """ -# super().__init__(rpc, config) -# -# self.freqtrade = rpc._freqtrade -# self.api_server = api_server -# -# if not self.api_server: -# raise RuntimeError("The API server must be enabled for external signals to work") -# -# self._loop = None -# self._running = False -# self._thread = None -# self._queue = None -# -# self._main_task = None -# self._sub_tasks = None -# -# self._message_handlers = { -# LeaderMessageType.pairlist: self._rpc._handle_pairlist_message, -# LeaderMessageType.analyzed_df: self._rpc._handle_analyzed_df_message, -# LeaderMessageType.default: self._rpc._handle_default_message -# } -# -# self.channel_manager = ChannelManager() -# self.external_signal_config = config.get('external_signal', {}) -# -# # What the config should look like -# # "external_signal": { -# # "enabled": true, -# # "mode": "follower", -# # "leaders": [ -# # { -# # "url": "ws://localhost:8080/signals/ws", -# # "api_token": "test" -# # } -# # ] -# # } -# -# # "external_signal": { -# # "enabled": true, -# # "mode": "leader", -# # "api_token": "test" -# # } -# -# self.mode = ExternalSignalModeType[ -# self.external_signal_config.get('mode', 'leader').lower() -# ] -# -# self.leaders_list = self.external_signal_config.get('leaders', []) -# self.push_throttle_secs = self.external_signal_config.get('push_throttle_secs', 0.1) -# -# self.reply_timeout = self.external_signal_config.get('follower_reply_timeout', 10) -# self.ping_timeout = self.external_signal_config.get('follower_ping_timeout', 2) -# self.sleep_time = self.external_signal_config.get('follower_sleep_time', 5) -# -# # Validate external_signal_config here? -# -# if self.mode == ExternalSignalModeType.follower and len(self.leaders_list) == 0: -# raise ValueError("You must specify at least 1 leader in follower mode.") -# -# # This is only used by the leader, the followers use the tokens specified -# # in each of the leaders -# # If you do not specify an API key in the config, one will be randomly -# # generated and logged on startup -# default_api_key = secrets.token_urlsafe(16) -# self.secret_api_key = self.external_signal_config.get('api_token', default_api_key) -# -# self.start() -# -# def is_leader(self): -# """ -# Leader flag -# """ -# return self.enabled() and self.mode == ExternalSignalModeType.leader -# -# def enabled(self): -# """ -# Enabled flag -# """ -# return self.external_signal_config.get('enabled', False) -# -# def num_leaders(self): -# """ -# The number of leaders we should be connected to -# """ -# return len(self.leaders_list) -# -# def start_threaded_loop(self): -# """ -# Start the main internal loop in another thread to run coroutines -# """ -# self._loop = asyncio.new_event_loop() -# -# if not self._thread: -# self._thread = Thread(target=self._loop.run_forever) -# self._thread.start() -# self._running = True -# else: -# raise RuntimeError("A loop is already running") -# -# def submit_coroutine(self, coroutine: Coroutine): -# """ -# Submit a coroutine to the threaded loop -# """ -# if not self._running: -# raise RuntimeError("Cannot schedule new futures after shutdown") -# -# if not self._loop or not self._loop.is_running(): -# raise RuntimeError("Loop must be started before any function can" -# " be submitted") -# -# return asyncio.run_coroutine_threadsafe(coroutine, self._loop) -# -# def start(self): -# """ -# Start the controller main loop -# """ -# self.start_threaded_loop() -# self._main_task = self.submit_coroutine(self.main()) -# -# async def shutdown(self): -# """ -# Shutdown all tasks and close up -# """ -# logger.info("Stopping rpc.externalsignalcontroller") -# -# # Flip running flag -# self._running = False -# -# # Cancel sub tasks -# for task in self._sub_tasks: -# task.cancel() -# -# # Then disconnect all channels -# await self.channel_manager.disconnect_all() -# -# def cleanup(self) -> None: -# """ -# Cleanup pending module resources. -# """ -# if self._thread: -# if self._loop.is_running(): -# self._main_task.cancel() -# self._thread.join() -# -# async def main(self): -# """ -# Main coro -# -# Start the loop based on what mode we're in -# """ -# try: -# if self.mode == ExternalSignalModeType.leader: -# logger.info("Starting rpc.externalsignalcontroller in Leader mode") -# -# await self.run_leader_mode() -# elif self.mode == ExternalSignalModeType.follower: -# logger.info("Starting rpc.externalsignalcontroller in Follower mode") -# -# await self.run_follower_mode() -# -# except asyncio.CancelledError: -# # We're cancelled -# await self.shutdown() -# except Exception as e: -# # Log the error -# logger.error(f"Exception occurred in main task: {e}") -# logger.exception(e) -# finally: -# # This coroutine is the last thing to be ended, so it should stop the loop -# self._loop.stop() -# -# def log_api_token(self): -# """ -# Log the API token -# """ -# logger.info("-" * 15) -# logger.info(f"API_KEY: {self.secret_api_key}") -# logger.info("-" * 15) -# -# def send_msg(self, msg: MessageType) -> None: -# """ -# Support RPC calls -# """ -# if msg["type"] == RPCMessageType.EMIT_DATA: -# message = msg.get("message") -# if message: -# self.send_message(message) -# else: -# logger.error(f"Message is empty! {msg}") -# -# def send_message(self, msg: MessageType) -> None: -# """ -# Broadcast message over all channels if there are any -# """ -# -# if self.channel_manager.has_channels(): -# self._send_message(msg) -# else: -# logger.debug("No listening followers, skipping...") -# pass -# -# def _send_message(self, msg: MessageType): -# """ -# Add data to the internal queue to be broadcasted. This func will block -# if the queue is full. This is meant to be called in the main thread. -# """ -# if self._queue: -# queue = self._queue.sync_q -# queue.put(msg) # This will block if the queue is full -# else: -# logger.warning("Can not send data, leader loop has not started yet!") -# -# async def send_initial_data(self, channel): -# logger.info("Sending initial data through channel") -# -# data = self._rpc._initial_leader_data() -# -# for message in data: -# await channel.send(message) -# -# async def _handle_leader_message(self, message: MessageType): -# """ -# Handle message received from a Leader -# """ -# type = message.get("data_type", LeaderMessageType.default) -# data = message.get("data") -# -# handler: Callable = self._message_handlers[type] -# handler(type, data) -# -# # ---------------------------------------------------------------------- -# -# async def run_leader_mode(self): -# """ -# Main leader coroutine -# -# This starts all of the leader coros and registers the endpoint on -# the ApiServer -# """ -# self.register_leader_endpoint() -# self.log_api_token() -# -# self._sub_tasks = [ -# self._loop.create_task(self._broadcast_queue_data()) -# ] -# -# return await asyncio.gather(*self._sub_tasks) -# -# async def run_follower_mode(self): -# """ -# Main follower coroutine -# -# This starts all of the follower connection coros -# """ -# -# rpc_lock = asyncio.Lock() -# -# self._sub_tasks = [ -# self._loop.create_task(self._handle_leader_connection(leader, rpc_lock)) -# for leader in self.leaders_list -# ] -# -# return await asyncio.gather(*self._sub_tasks) -# -# async def _broadcast_queue_data(self): -# """ -# Loop over queue data and broadcast it -# """ -# # Instantiate the queue in this coroutine so it's attached to our loop -# self._queue = ThreadedQueue() -# async_queue = self._queue.async_q -# -# try: -# while self._running: -# # Get data from queue -# data = await async_queue.get() -# -# # Broadcast it to everyone -# await self.channel_manager.broadcast(data) -# -# # Sleep -# await asyncio.sleep(self.push_throttle_secs) -# -# except asyncio.CancelledError: -# # Silently stop -# pass -# -# async def get_api_token( -# self, -# websocket: FastAPIWebSocket, -# token: Union[str, None] = None -# ): -# """ -# Extract the API key from query param. Must match the -# set secret_api_key or the websocket connection will be closed. -# """ -# if token == self.secret_api_key: -# return token -# else: -# logger.info("Denying websocket request...") -# await websocket.close(code=status.WS_1008_POLICY_VIOLATION) -# -# def register_leader_endpoint(self, path: str = "/signals/ws"): -# """ -# Attach and start the main leader loop to the ApiServer -# -# :param path: The endpoint path -# """ -# if not self.api_server: -# raise RuntimeError("The leader needs the ApiServer to be active") -# -# # The endpoint function for running the main leader loop -# @self.api_server.app.websocket(path) -# async def leader_endpoint( -# websocket: FastAPIWebSocket, -# api_key: str = Depends(self.get_api_token) -# ): -# await self.leader_endpoint_loop(websocket) -# -# async def leader_endpoint_loop(self, websocket: FastAPIWebSocket): -# """ -# The WebSocket endpoint served by the ApiServer. This handles connections, -# and adding them to the channel manager. -# """ -# try: -# if is_websocket_alive(websocket): -# logger.info(f"Follower connected - {websocket.client}") -# channel = await self.channel_manager.on_connect(websocket) -# -# # Send initial data here -# # Data is being broadcasted right away as soon as startup, -# # we may not have to send initial data at all. Further testing -# # required. -# await self.send_initial_data(channel) -# -# # Keep connection open until explicitly closed, and sleep -# try: -# while not channel.is_closed(): -# request = await channel.recv() -# logger.info(f"Follower request - {request}") -# -# except WebSocketDisconnect: -# # Handle client disconnects -# logger.info(f"Follower disconnected - {websocket.client}") -# await self.channel_manager.on_disconnect(websocket) -# except Exception as e: -# logger.info(f"Follower connection failed - {websocket.client}") -# logger.exception(e) -# # Handle cases like - -# # RuntimeError('Cannot call "send" once a closed message has been sent') -# await self.channel_manager.on_disconnect(websocket) -# -# except Exception: -# logger.error(f"Failed to serve - {websocket.client}") -# await self.channel_manager.on_disconnect(websocket) -# -# async def _handle_leader_connection(self, leader, lock): -# """ -# Given a leader, connect and wait on data. If connection is lost, -# it will attempt to reconnect. -# """ -# try: -# url, token = leader["url"], leader["api_token"] -# websocket_url = f"{url}?token={token}" -# -# logger.info(f"Attempting to connect to Leader at: {url}") -# while True: -# try: -# async with websockets.connect(websocket_url) as ws: -# channel = await self.channel_manager.on_connect(ws) -# logger.info(f"Connection to Leader at {url} successful") -# while True: -# try: -# data = await asyncio.wait_for( -# channel.recv(), -# timeout=self.reply_timeout -# ) -# except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): -# # We haven't received data yet. Check the connection and continue. -# try: -# # ping -# ping = await channel.ping() -# await asyncio.wait_for(ping, timeout=self.ping_timeout) -# logger.debug(f"Connection to {url} still alive...") -# continue -# except Exception: -# logger.info( -# f"Ping error {url} - retrying in {self.sleep_time}s") -# asyncio.sleep(self.sleep_time) -# break -# -# async with lock: -# # Acquire lock so only 1 coro handling at a time -# # as we call the RPC module in the main thread -# await self._handle_leader_message(data) -# -# except (socket.gaierror, ConnectionRefusedError): -# logger.info(f"Connection Refused - retrying connection in {self.sleep_time}s") -# await asyncio.sleep(self.sleep_time) -# continue -# except websockets.exceptions.InvalidStatusCode as e: -# logger.error(f"Connection Refused - {e}") -# await asyncio.sleep(self.sleep_time) -# continue -# -# except asyncio.CancelledError: -# pass diff --git a/freqtrade/rpc/external_signal/proxy.py b/freqtrade/rpc/external_signal/proxy.py deleted file mode 100644 index df2a07da0..000000000 --- a/freqtrade/rpc/external_signal/proxy.py +++ /dev/null @@ -1,61 +0,0 @@ -# from typing import Union -# -# from fastapi import WebSocket as FastAPIWebSocket -# from websockets import WebSocketClientProtocol as WebSocket -# -# from freqtrade.rpc.external_signal.types import WebSocketType -# -# -# class WebSocketProxy: -# """ -# WebSocketProxy object to bring the FastAPIWebSocket and websockets.WebSocketClientProtocol -# under the same API -# """ -# -# def __init__(self, websocket: WebSocketType): -# self._websocket: Union[FastAPIWebSocket, WebSocket] = websocket -# -# async def send(self, data): -# """ -# Send data on the wrapped websocket -# """ -# if isinstance(data, str): -# data = data.encode() -# -# if hasattr(self._websocket, "send_bytes"): -# await self._websocket.send_bytes(data) -# else: -# await self._websocket.send(data) -# -# async def recv(self): -# """ -# Receive data on the wrapped websocket -# """ -# if hasattr(self._websocket, "receive_bytes"): -# return await self._websocket.receive_bytes() -# else: -# return await self._websocket.recv() -# -# async def ping(self): -# """ -# Ping the websocket, not supported by FastAPI WebSockets -# """ -# if hasattr(self._websocket, "ping"): -# return await self._websocket.ping() -# return False -# -# async def close(self, code: int = 1000): -# """ -# Close the websocket connection, only supported by FastAPI WebSockets -# """ -# if hasattr(self._websocket, "close"): -# return await self._websocket.close(code) -# pass -# -# async def accept(self): -# """ -# Accept the WebSocket connection, only support by FastAPI WebSockets -# """ -# if hasattr(self._websocket, "accept"): -# return await self._websocket.accept() -# pass diff --git a/freqtrade/rpc/external_signal/serializer.py b/freqtrade/rpc/external_signal/serializer.py deleted file mode 100644 index a23469ef4..000000000 --- a/freqtrade/rpc/external_signal/serializer.py +++ /dev/null @@ -1,65 +0,0 @@ -# import json -# import logging -# from abc import ABC, abstractmethod -# -# import msgpack -# import orjson -# -# from freqtrade.rpc.external_signal.proxy import WebSocketProxy -# -# -# logger = logging.getLogger(__name__) -# -# -# class WebSocketSerializer(ABC): -# def __init__(self, websocket: WebSocketProxy): -# self._websocket: WebSocketProxy = websocket -# -# @abstractmethod -# def _serialize(self, data): -# raise NotImplementedError() -# -# @abstractmethod -# def _deserialize(self, data): -# raise NotImplementedError() -# -# async def send(self, data: bytes): -# await self._websocket.send(self._serialize(data)) -# -# async def recv(self) -> bytes: -# data = await self._websocket.recv() -# -# return self._deserialize(data) -# -# async def close(self, code: int = 1000): -# await self._websocket.close(code) -# -# # Going to explore using MsgPack as the serialization, -# # as that might be the best method for sending pandas -# # dataframes over the wire -# -# -# class JSONWebSocketSerializer(WebSocketSerializer): -# def _serialize(self, data): -# return json.dumps(data) -# -# def _deserialize(self, data): -# return json.loads(data) -# -# -# class ORJSONWebSocketSerializer(WebSocketSerializer): -# ORJSON_OPTIONS = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY -# -# def _serialize(self, data): -# return orjson.dumps(data, option=self.ORJSON_OPTIONS) -# -# def _deserialize(self, data): -# return orjson.loads(data, option=self.ORJSON_OPTIONS) -# -# -# class MsgPackWebSocketSerializer(WebSocketSerializer): -# def _serialize(self, data): -# return msgpack.packb(data, use_bin_type=True) -# -# def _deserialize(self, data): -# return msgpack.unpackb(data, raw=False) diff --git a/freqtrade/rpc/external_signal/types.py b/freqtrade/rpc/external_signal/types.py deleted file mode 100644 index 38e43f667..000000000 --- a/freqtrade/rpc/external_signal/types.py +++ /dev/null @@ -1,8 +0,0 @@ -# from typing import Any, Dict, TypeVar -# -# from fastapi import WebSocket as FastAPIWebSocket -# from websockets import WebSocketClientProtocol as WebSocket -# -# -# WebSocketType = TypeVar("WebSocketType", FastAPIWebSocket, WebSocket) -# MessageType = Dict[str, Any] diff --git a/freqtrade/rpc/external_signal/utils.py b/freqtrade/rpc/external_signal/utils.py deleted file mode 100644 index 72c8d2ef8..000000000 --- a/freqtrade/rpc/external_signal/utils.py +++ /dev/null @@ -1,10 +0,0 @@ -# from starlette.websockets import WebSocket, WebSocketState -# -# -# async def is_websocket_alive(ws: WebSocket) -> bool: -# if ( -# ws.application_state == WebSocketState.CONNECTED and -# ws.client_state == WebSocketState.CONNECTED -# ): -# return True -# return False diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index f684c7783..a41d08d55 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1099,65 +1099,3 @@ class RPC: if all([any(x.value == topic for x in RPCMessageType) for topic in data]): logger.debug(f"{channel} subscribed to topics: {data}") channel.set_subscriptions(data) - # - # # ------------------------------ EXTERNAL SIGNALS ----------------------- - # - # def _initial_leader_data(self): - # # We create a list of Messages to send to the follower on connect - # data = [] - # - # # Send Pairlist data - # data.append({ - # "data_type": LeaderMessageType.pairlist, - # "data": self._freqtrade.pairlists._whitelist - # }) - # - # return data - # - # def _handle_pairlist_message(self, type, data): - # """ - # Handles the emitted pairlists from the Leaders - # - # :param type: The data_type of the data - # :param data: The data - # """ - # pairlist = data - # - # logger.debug(f"Handling Pairlist message: {pairlist}") - # - # external_pairlist = self._freqtrade.pairlists._pairlist_handlers[0] - # external_pairlist.add_pairlist_data(pairlist) - # - # def _handle_analyzed_df_message(self, type, data): - # """ - # Handles the analyzed dataframes from the Leaders - # - # :param type: The data_type of the data - # :param data: The data - # """ - # key, value = data["key"], data["value"] - # pair, timeframe, candle_type = key - # - # # Skip any pairs that we don't have in the pairlist? - # # leader_pairlist = self._freqtrade.pairlists._whitelist - # # if pair not in leader_pairlist: - # # return - # - # dataframe = json_to_dataframe(value) - # - # if self._config.get('external_signal', {}).get('remove_signals_analyzed_df', False): - # dataframe = remove_entry_exit_signals(dataframe) - # - # logger.debug(f"Handling analyzed dataframe for {pair}") - # logger.debug(dataframe.tail()) - # - # # Add the dataframe to the dataprovider - # dataprovider = self._freqtrade.dataprovider - # dataprovider.add_external_df(pair, timeframe, dataframe, candle_type) - # - # def _handle_default_message(self, type, data): - # """ - # Default leader message handler, just logs it. We should never have to - # run this unless the leader sends us some weird message. - # """ - # logger.debug(f"Received message from Leader of type {type}: {data}") diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index b3cd5604c..3488a6e3c 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -78,7 +78,9 @@ class RPCManager: 'status': 'stopping bot' } """ - logger.info('Sending rpc message: %s', msg) + # Removed actually showing the message because the logs would be + # completely spammed of the json dataframe + logger.info('Sending rpc message of type: %s', msg.get('type')) if 'pair' in msg: msg.update({ 'base_currency': self._rpc._freqtrade.exchange.get_pair_base_currency(msg['pair']) diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 22a10b4d3..7120928ff 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -5,19 +5,21 @@ This module defines the interface to apply for strategies import logging from abc import ABC, abstractmethod from datetime import datetime, timedelta, timezone -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union import arrow from pandas import DataFrame from freqtrade.constants import ListPairsWithTimeframes from freqtrade.data.dataprovider import DataProvider -from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, SignalTagType, - SignalType, TradingMode) +from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, SignalDirection, + SignalTagType, SignalType, TradingMode) from freqtrade.enums.runmode import RunMode from freqtrade.exceptions import OperationalException, StrategyError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_next_date, timeframe_to_seconds +from freqtrade.misc import dataframe_to_json, remove_entry_exit_signals from freqtrade.persistence import Order, PairLocks, Trade +from freqtrade.rpc import RPCManager from freqtrade.strategy.hyper import HyperStrategyMixin from freqtrade.strategy.informative_decorator import (InformativeData, PopulateIndicators, _create_and_merge_informative_pair, @@ -111,6 +113,7 @@ class IStrategy(ABC, HyperStrategyMixin): # and wallets - access to the current balance. dp: DataProvider wallets: Optional[Wallets] = None + rpc: RPCManager # Filled from configuration stake_currency: str # container variable for strategy source code @@ -702,8 +705,7 @@ class IStrategy(ABC, HyperStrategyMixin): self, dataframe: DataFrame, metadata: dict, - external_data: bool = False, - finish_callback: Optional[Callable] = None, + external_data: bool = False ) -> DataFrame: """ Parses the given candle (OHLCV) data and returns a populated DataFrame @@ -729,17 +731,20 @@ class IStrategy(ABC, HyperStrategyMixin): candle_type = self.config.get('candle_type_def', CandleType.SPOT) self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type) - if finish_callback: - finish_callback(pair, dataframe, self.timeframe, candle_type) + if not external_data: + self.rpc.send_msg( + { + 'type': RPCMessageType.ANALYZED_DF, + 'data': { + 'key': (pair, self.timeframe, candle_type), + 'value': dataframe_to_json(dataframe) + } + } + ) else: logger.debug("Skipping TA Analysis for already analyzed candle") - dataframe[SignalType.ENTER_LONG.value] = 0 - dataframe[SignalType.EXIT_LONG.value] = 0 - dataframe[SignalType.ENTER_SHORT.value] = 0 - dataframe[SignalType.EXIT_SHORT.value] = 0 - dataframe[SignalTagType.ENTER_TAG.value] = None - dataframe[SignalTagType.EXIT_TAG.value] = None + dataframe = remove_entry_exit_signals(dataframe) logger.debug("Loop Analysis Launched") @@ -748,8 +753,7 @@ class IStrategy(ABC, HyperStrategyMixin): def analyze_pair( self, pair: str, - external_data: bool = False, - finish_callback: Optional[Callable] = None, + external_data: bool = False ) -> None: """ Fetch data for this pair from dataprovider and analyze. @@ -773,7 +777,7 @@ class IStrategy(ABC, HyperStrategyMixin): dataframe = strategy_safe_wrapper( self._analyze_ticker_internal, message="" - )(dataframe, {'pair': pair}, external_data, finish_callback) + )(dataframe, {'pair': pair}, external_data) self.assert_df(dataframe, df_len, df_close, df_date) except StrategyError as error: @@ -786,15 +790,14 @@ class IStrategy(ABC, HyperStrategyMixin): def analyze( self, - pairs: List[str], - finish_callback: Optional[Callable] = None + pairs: List[str] ) -> None: """ Analyze all pairs using analyze_pair(). :param pairs: List of pairs to analyze """ for pair in pairs: - self.analyze_pair(pair, finish_callback=finish_callback) + self.analyze_pair(pair) def analyze_external(self, pairs: List[str], leader_pairs: List[str]) -> None: """ @@ -808,10 +811,10 @@ class IStrategy(ABC, HyperStrategyMixin): # them normally. # List order is not preserved when doing this! # We use ^ instead of - for symmetric difference - # What do we do with these? extra_pairs = list(set(pairs) ^ set(leader_pairs)) # These would be the pairs that we have trades in, which means # we would have to analyze them normally + # Eventually maybe request data from the Leader if we don't have it? for pair in leader_pairs: # Analyze the pairs, but get the dataframe from the external data diff --git a/scripts/test_ws_client.py b/scripts/test_ws_client.py deleted file mode 100644 index 872ff3ccf..000000000 --- a/scripts/test_ws_client.py +++ /dev/null @@ -1,74 +0,0 @@ -import asyncio -import logging -import socket -from typing import Any - -import websockets - -from freqtrade.enums import RPCMessageType -from freqtrade.rpc.api_server.ws.channel import WebSocketChannel - - -logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger(__name__) - - -def compose_consumer_request(type_: str, data: Any): - return {"type": type_, "data": data} - - -async def _client(): - # Trying to recreate multiple topic issue. Wait until first whitelist message, - # then CTRL-C to get the status message. - topics = [RPCMessageType.WHITELIST, RPCMessageType.STATUS] - try: - while True: - try: - url = "ws://localhost:8080/api/v1/message/ws?token=testtoken" - async with websockets.connect(url) as ws: - channel = WebSocketChannel(ws) - - logger.info("Connection successful") - # Tell the producer we only want these topics - await channel.send(compose_consumer_request("subscribe", topics)) - - while True: - try: - data = await asyncio.wait_for( - channel.recv(), - timeout=5 - ) - logger.info(f"Data received - {data}") - except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed): - # We haven't received data yet. Check the connection and continue. - try: - # ping - ping = await channel.ping() - await asyncio.wait_for(ping, timeout=2) - logger.debug(f"Connection to {url} still alive...") - continue - except Exception: - logger.info( - f"Ping error {url} - retrying in 5s") - await asyncio.sleep(2) - break - - except (socket.gaierror, ConnectionRefusedError): - logger.info("Connection Refused - retrying connection in 5s") - await asyncio.sleep(2) - continue - except websockets.exceptions.InvalidStatusCode as e: - logger.error(f"Connection Refused - {e}") - await asyncio.sleep(2) - continue - - except (asyncio.CancelledError, KeyboardInterrupt): - pass - - -def main(): - asyncio.run(_client()) - - -if __name__ == "__main__": - main()