diff --git a/freqtrade/enums/__init__.py b/freqtrade/enums/__init__.py index ffeb8cc12..406d847e6 100644 --- a/freqtrade/enums/__init__.py +++ b/freqtrade/enums/__init__.py @@ -6,7 +6,7 @@ from freqtrade.enums.exittype import ExitType from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy from freqtrade.enums.marginmode import MarginMode from freqtrade.enums.ordertypevalue import OrderTypeValues -from freqtrade.enums.rpcmessagetype import RPCMessageType +from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType from freqtrade.enums.state import State diff --git a/freqtrade/enums/rpcmessagetype.py b/freqtrade/enums/rpcmessagetype.py index 8e4182b33..6283fb7cc 100644 --- a/freqtrade/enums/rpcmessagetype.py +++ b/freqtrade/enums/rpcmessagetype.py @@ -21,9 +21,15 @@ class RPCMessageType(str, Enum): STRATEGY_MSG = 'strategy_msg' WHITELIST = 'whitelist' + ANALYZED_DF = 'analyzed_df' def __repr__(self): return self.value def __str__(self): return self.value + + +# Enum for parsing requests from ws consumers +class RPCRequestType(str, Enum): + SUBSCRIBE = 'subscribe' diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 405beed79..c8d1b70fa 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -2,8 +2,7 @@ import logging from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect -from freqtrade.enums import RPCMessageType -from freqtrade.rpc.api_server.deps import get_channel_manager +from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc_optional from freqtrade.rpc.api_server.ws.utils import is_websocket_alive @@ -16,7 +15,8 @@ router = APIRouter() @router.websocket("/message/ws") async def message_endpoint( ws: WebSocket, - channel_manager=Depends(get_channel_manager) + channel_manager=Depends(get_channel_manager), + rpc=Depends(get_rpc_optional) ): try: if is_websocket_alive(ws): @@ -31,19 +31,10 @@ async def message_endpoint( while not channel.is_closed(): request = await channel.recv() - # This is where we'd parse the request. For now this should only - # be a list of topics to subscribe too. List[str] - # Maybe allow the consumer to update the topics subscribed - # during runtime? - - # If the request isn't a list then skip it - if not isinstance(request, list): - continue - - # Check if all topics listed are an RPCMessageType - if all([any(x.value == topic for x in RPCMessageType) for topic in request]): - logger.debug(f"{ws.client} subscribed to topics: {request}") - channel.set_subscriptions(request) + # Process the request here. Should this be a method of RPC? + if rpc: + logger.info(f"Request: {request}") + rpc._process_consumer_request(request, channel) except WebSocketDisconnect: # Handle client disconnects diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 94cb8cd45..f4af8c8ed 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -63,11 +63,7 @@ class ApiServer(RPCHandler): ApiServer.__initialized = False return ApiServer.__instance - def __init__( - self, - config: Dict[str, Any], - standalone: bool = False, - ) -> None: + def __init__(self, config: Dict[str, Any], standalone: bool = False) -> None: ApiServer._config = config if self.__initialized and (standalone or self._standalone): return diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 3c7558158..f684c7783 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -19,8 +19,8 @@ from freqtrade.configuration.timerange import TimeRange from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT from freqtrade.data.history import load_data from freqtrade.data.metrics import calculate_max_drawdown -from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, State, - TradingMode) +from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, RPCRequestType, + SignalDirection, State, TradingMode) from freqtrade.exceptions import ExchangeError, PricingError from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs from freqtrade.loggers import bufferHandler @@ -1089,6 +1089,16 @@ class RPC: 'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT), 'last_process_ts': int(last_p.timestamp()), } + + # We are passed a Channel object, we can only do sync functions on that channel object + def _process_consumer_request(self, request, channel): + # Should we ensure that request is Dict[str, Any]? + type, data = request.get('type'), request.get('data') + + if type == RPCRequestType.SUBSCRIBE: + if all([any(x.value == topic for x in RPCMessageType) for topic in data]): + logger.debug(f"{channel} subscribed to topics: {data}") + channel.set_subscriptions(data) # # # ------------------------------ EXTERNAL SIGNALS ----------------------- # diff --git a/freqtrade/rpc/rpc_manager.py b/freqtrade/rpc/rpc_manager.py index c5e93e3b4..b3cd5604c 100644 --- a/freqtrade/rpc/rpc_manager.py +++ b/freqtrade/rpc/rpc_manager.py @@ -64,7 +64,7 @@ class RPCManager: """ Stops all enabled rpc modules """ logger.info('Cleaning up rpc modules ...') while self.registered_modules: - mod = self.registered_modules.pop() + mod = self.registered_modules.pop() # popleft to cleanup API server last? logger.info('Cleaning up rpc.%s ...', mod.name) mod.cleanup() del mod diff --git a/freqtrade/rpc/telegram.py b/freqtrade/rpc/telegram.py index 8ce2fa2e4..141368769 100644 --- a/freqtrade/rpc/telegram.py +++ b/freqtrade/rpc/telegram.py @@ -436,9 +436,13 @@ class Telegram(RPCHandler): # Notification disabled return - message = self.compose_message(msg, msg_type) - - self._send_msg(message, disable_notification=(noti == 'silent')) + # Would this be better than adding un-needed if statements to compose_message? + try: + message = self.compose_message(msg, msg_type) + self._send_msg(message, disable_notification=(noti == 'silent')) + except NotImplementedError: + # just skip it + return def _get_sell_emoji(self, msg): """ diff --git a/requirements-externalsignals.txt b/requirements-externalsignals.txt deleted file mode 100644 index 7920b34f6..000000000 --- a/requirements-externalsignals.txt +++ /dev/null @@ -1,7 +0,0 @@ -# Include all requirements to run the bot. --r requirements.txt - -# Required for follower -websockets -msgpack -janus diff --git a/requirements.txt b/requirements.txt index 77925f98b..4d97f500a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -50,3 +50,8 @@ python-dateutil==2.8.2 #Futures schedule==1.1.0 + +#WS Messages +websockets~=10.3 +msgpack~=1.0.4 +janus==1.0.0 diff --git a/scripts/test_ws_client.py b/scripts/test_ws_client.py index 2c64ae867..872ff3ccf 100644 --- a/scripts/test_ws_client.py +++ b/scripts/test_ws_client.py @@ -1,6 +1,7 @@ import asyncio import logging import socket +from typing import Any import websockets @@ -12,8 +13,14 @@ logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) +def compose_consumer_request(type_: str, data: Any): + return {"type": type_, "data": data} + + async def _client(): - subscribe_topics = [RPCMessageType.WHITELIST] + # Trying to recreate multiple topic issue. Wait until first whitelist message, + # then CTRL-C to get the status message. + topics = [RPCMessageType.WHITELIST, RPCMessageType.STATUS] try: while True: try: @@ -23,7 +30,7 @@ async def _client(): logger.info("Connection successful") # Tell the producer we only want these topics - await channel.send(subscribe_topics) + await channel.send(compose_consumer_request("subscribe", topics)) while True: try: diff --git a/setup.py b/setup.py index 8f04e75f7..c7b1f1c7c 100644 --- a/setup.py +++ b/setup.py @@ -79,7 +79,10 @@ setup( 'psutil', 'pyjwt', 'aiofiles', - 'schedule' + 'schedule', + 'websockets', + 'msgpack', + 'janus' ], extras_require={ 'dev': all_extra,