minor fixes, rework consumer request, update requirements.txt
This commit is contained in:
		| @@ -6,7 +6,7 @@ from freqtrade.enums.exittype import ExitType | |||||||
| from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy | from freqtrade.enums.externalsignal import ExternalSignalModeType, LeaderMessageType, WaitDataPolicy | ||||||
| from freqtrade.enums.marginmode import MarginMode | from freqtrade.enums.marginmode import MarginMode | ||||||
| from freqtrade.enums.ordertypevalue import OrderTypeValues | from freqtrade.enums.ordertypevalue import OrderTypeValues | ||||||
| from freqtrade.enums.rpcmessagetype import RPCMessageType | from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType | ||||||
| from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode | from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode | ||||||
| from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType | from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType | ||||||
| from freqtrade.enums.state import State | from freqtrade.enums.state import State | ||||||
|   | |||||||
| @@ -21,9 +21,15 @@ class RPCMessageType(str, Enum): | |||||||
|     STRATEGY_MSG = 'strategy_msg' |     STRATEGY_MSG = 'strategy_msg' | ||||||
|  |  | ||||||
|     WHITELIST = 'whitelist' |     WHITELIST = 'whitelist' | ||||||
|  |     ANALYZED_DF = 'analyzed_df' | ||||||
|  |  | ||||||
|     def __repr__(self): |     def __repr__(self): | ||||||
|         return self.value |         return self.value | ||||||
|  |  | ||||||
|     def __str__(self): |     def __str__(self): | ||||||
|         return self.value |         return self.value | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # Enum for parsing requests from ws consumers | ||||||
|  | class RPCRequestType(str, Enum): | ||||||
|  |     SUBSCRIBE = 'subscribe' | ||||||
|   | |||||||
| @@ -2,8 +2,7 @@ import logging | |||||||
|  |  | ||||||
| from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect | from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect | ||||||
|  |  | ||||||
| from freqtrade.enums import RPCMessageType | from freqtrade.rpc.api_server.deps import get_channel_manager, get_rpc_optional | ||||||
| from freqtrade.rpc.api_server.deps import get_channel_manager |  | ||||||
| from freqtrade.rpc.api_server.ws.utils import is_websocket_alive | from freqtrade.rpc.api_server.ws.utils import is_websocket_alive | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -16,7 +15,8 @@ router = APIRouter() | |||||||
| @router.websocket("/message/ws") | @router.websocket("/message/ws") | ||||||
| async def message_endpoint( | async def message_endpoint( | ||||||
|     ws: WebSocket, |     ws: WebSocket, | ||||||
|     channel_manager=Depends(get_channel_manager) |     channel_manager=Depends(get_channel_manager), | ||||||
|  |     rpc=Depends(get_rpc_optional) | ||||||
| ): | ): | ||||||
|     try: |     try: | ||||||
|         if is_websocket_alive(ws): |         if is_websocket_alive(ws): | ||||||
| @@ -31,19 +31,10 @@ async def message_endpoint( | |||||||
|                 while not channel.is_closed(): |                 while not channel.is_closed(): | ||||||
|                     request = await channel.recv() |                     request = await channel.recv() | ||||||
|  |  | ||||||
|                     # This is where we'd parse the request. For now this should only |                     # Process the request here. Should this be a method of RPC? | ||||||
|                     # be a list of topics to subscribe too. List[str] |                     if rpc: | ||||||
|                     # Maybe allow the consumer to update the topics subscribed |                         logger.info(f"Request: {request}") | ||||||
|                     # during runtime? |                         rpc._process_consumer_request(request, channel) | ||||||
|  |  | ||||||
|                     # If the request isn't a list then skip it |  | ||||||
|                     if not isinstance(request, list): |  | ||||||
|                         continue |  | ||||||
|  |  | ||||||
|                     # Check if all topics listed are an RPCMessageType |  | ||||||
|                     if all([any(x.value == topic for x in RPCMessageType) for topic in request]): |  | ||||||
|                         logger.debug(f"{ws.client} subscribed to topics: {request}") |  | ||||||
|                         channel.set_subscriptions(request) |  | ||||||
|  |  | ||||||
|             except WebSocketDisconnect: |             except WebSocketDisconnect: | ||||||
|                 # Handle client disconnects |                 # Handle client disconnects | ||||||
|   | |||||||
| @@ -63,11 +63,7 @@ class ApiServer(RPCHandler): | |||||||
|             ApiServer.__initialized = False |             ApiServer.__initialized = False | ||||||
|         return ApiServer.__instance |         return ApiServer.__instance | ||||||
|  |  | ||||||
|     def __init__( |     def __init__(self, config: Dict[str, Any], standalone: bool = False) -> None: | ||||||
|         self, |  | ||||||
|         config: Dict[str, Any], |  | ||||||
|         standalone: bool = False, |  | ||||||
|     ) -> None: |  | ||||||
|         ApiServer._config = config |         ApiServer._config = config | ||||||
|         if self.__initialized and (standalone or self._standalone): |         if self.__initialized and (standalone or self._standalone): | ||||||
|             return |             return | ||||||
|   | |||||||
| @@ -19,8 +19,8 @@ from freqtrade.configuration.timerange import TimeRange | |||||||
| from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT | from freqtrade.constants import CANCEL_REASON, DATETIME_PRINT_FORMAT | ||||||
| from freqtrade.data.history import load_data | from freqtrade.data.history import load_data | ||||||
| from freqtrade.data.metrics import calculate_max_drawdown | from freqtrade.data.metrics import calculate_max_drawdown | ||||||
| from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, SignalDirection, State, | from freqtrade.enums import (CandleType, ExitCheckTuple, ExitType, RPCMessageType, RPCRequestType, | ||||||
|                              TradingMode) |                              SignalDirection, State, TradingMode) | ||||||
| from freqtrade.exceptions import ExchangeError, PricingError | from freqtrade.exceptions import ExchangeError, PricingError | ||||||
| from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs | from freqtrade.exchange import timeframe_to_minutes, timeframe_to_msecs | ||||||
| from freqtrade.loggers import bufferHandler | from freqtrade.loggers import bufferHandler | ||||||
| @@ -1089,6 +1089,16 @@ class RPC: | |||||||
|             'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT), |             'last_process_loc': last_p.astimezone(tzlocal()).strftime(DATETIME_PRINT_FORMAT), | ||||||
|             'last_process_ts': int(last_p.timestamp()), |             'last_process_ts': int(last_p.timestamp()), | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |     # We are passed a Channel object, we can only do sync functions on that channel object | ||||||
|  |     def _process_consumer_request(self, request, channel): | ||||||
|  |         # Should we ensure that request is Dict[str, Any]? | ||||||
|  |         type, data = request.get('type'), request.get('data') | ||||||
|  |  | ||||||
|  |         if type == RPCRequestType.SUBSCRIBE: | ||||||
|  |             if all([any(x.value == topic for x in RPCMessageType) for topic in data]): | ||||||
|  |                 logger.debug(f"{channel} subscribed to topics: {data}") | ||||||
|  |                 channel.set_subscriptions(data) | ||||||
|     # |     # | ||||||
|     # # ------------------------------ EXTERNAL SIGNALS ----------------------- |     # # ------------------------------ EXTERNAL SIGNALS ----------------------- | ||||||
|     # |     # | ||||||
|   | |||||||
| @@ -64,7 +64,7 @@ class RPCManager: | |||||||
|         """ Stops all enabled rpc modules """ |         """ Stops all enabled rpc modules """ | ||||||
|         logger.info('Cleaning up rpc modules ...') |         logger.info('Cleaning up rpc modules ...') | ||||||
|         while self.registered_modules: |         while self.registered_modules: | ||||||
|             mod = self.registered_modules.pop() |             mod = self.registered_modules.pop()  # popleft to cleanup API server last? | ||||||
|             logger.info('Cleaning up rpc.%s ...', mod.name) |             logger.info('Cleaning up rpc.%s ...', mod.name) | ||||||
|             mod.cleanup() |             mod.cleanup() | ||||||
|             del mod |             del mod | ||||||
|   | |||||||
| @@ -436,9 +436,13 @@ class Telegram(RPCHandler): | |||||||
|             # Notification disabled |             # Notification disabled | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         message = self.compose_message(msg, msg_type) |         # Would this be better than adding un-needed if statements to compose_message? | ||||||
|  |         try: | ||||||
|         self._send_msg(message, disable_notification=(noti == 'silent')) |             message = self.compose_message(msg, msg_type) | ||||||
|  |             self._send_msg(message, disable_notification=(noti == 'silent')) | ||||||
|  |         except NotImplementedError: | ||||||
|  |             # just skip it | ||||||
|  |             return | ||||||
|  |  | ||||||
|     def _get_sell_emoji(self, msg): |     def _get_sell_emoji(self, msg): | ||||||
|         """ |         """ | ||||||
|   | |||||||
| @@ -1,7 +0,0 @@ | |||||||
| # Include all requirements to run the bot. |  | ||||||
| -r requirements.txt |  | ||||||
|  |  | ||||||
| # Required for follower |  | ||||||
| websockets |  | ||||||
| msgpack |  | ||||||
| janus |  | ||||||
| @@ -50,3 +50,8 @@ python-dateutil==2.8.2 | |||||||
|  |  | ||||||
| #Futures | #Futures | ||||||
| schedule==1.1.0 | schedule==1.1.0 | ||||||
|  |  | ||||||
|  | #WS Messages | ||||||
|  | websockets~=10.3 | ||||||
|  | msgpack~=1.0.4 | ||||||
|  | janus==1.0.0 | ||||||
|   | |||||||
| @@ -1,6 +1,7 @@ | |||||||
| import asyncio | import asyncio | ||||||
| import logging | import logging | ||||||
| import socket | import socket | ||||||
|  | from typing import Any | ||||||
|  |  | ||||||
| import websockets | import websockets | ||||||
|  |  | ||||||
| @@ -12,8 +13,14 @@ logging.basicConfig(level=logging.DEBUG) | |||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def compose_consumer_request(type_: str, data: Any): | ||||||
|  |     return {"type": type_, "data": data} | ||||||
|  |  | ||||||
|  |  | ||||||
| async def _client(): | async def _client(): | ||||||
|     subscribe_topics = [RPCMessageType.WHITELIST] |     # Trying to recreate multiple topic issue. Wait until first whitelist message, | ||||||
|  |     # then CTRL-C to get the status message. | ||||||
|  |     topics = [RPCMessageType.WHITELIST, RPCMessageType.STATUS] | ||||||
|     try: |     try: | ||||||
|         while True: |         while True: | ||||||
|             try: |             try: | ||||||
| @@ -23,7 +30,7 @@ async def _client(): | |||||||
|  |  | ||||||
|                     logger.info("Connection successful") |                     logger.info("Connection successful") | ||||||
|                     # Tell the producer we only want these topics |                     # Tell the producer we only want these topics | ||||||
|                     await channel.send(subscribe_topics) |                     await channel.send(compose_consumer_request("subscribe", topics)) | ||||||
|  |  | ||||||
|                     while True: |                     while True: | ||||||
|                         try: |                         try: | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user