split initial data into separate requests
This commit is contained in:
		| @@ -33,4 +33,6 @@ class RPCMessageType(str, Enum): | ||||
| # Enum for parsing requests from ws consumers | ||||
| class RPCRequestType(str, Enum): | ||||
|     SUBSCRIBE = 'subscribe' | ||||
|     INITIAL_DATA = 'initial_data' | ||||
|  | ||||
|     WHITELIST = 'whitelist' | ||||
|     ANALYZED_DF = 'analyzed_df' | ||||
|   | ||||
| @@ -33,7 +33,7 @@ async def _process_consumer_request( | ||||
|             return | ||||
|  | ||||
|         if not isinstance(data, list): | ||||
|             logger.error(f"Improper request from channel: {channel} - {request}") | ||||
|             logger.error(f"Improper subscribe request from channel: {channel} - {request}") | ||||
|             return | ||||
|  | ||||
|         # If all topics passed are a valid RPCMessageType, set subscriptions on channel | ||||
| @@ -42,19 +42,19 @@ async def _process_consumer_request( | ||||
|             logger.debug(f"{channel} subscribed to topics: {data}") | ||||
|             channel.set_subscriptions(data) | ||||
|  | ||||
|     elif type == RPCRequestType.INITIAL_DATA: | ||||
|         # Acquire the data | ||||
|         initial_data = rpc._ws_initial_data() | ||||
|     elif type == RPCRequestType.WHITELIST: | ||||
|         # They requested the whitelist | ||||
|         whitelist = rpc._ws_request_whitelist() | ||||
|  | ||||
|         # We now loop over it sending it in pieces | ||||
|         whitelist_data, analyzed_df = initial_data.get('whitelist'), initial_data.get('analyzed_df') | ||||
|         await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist}) | ||||
|  | ||||
|         if whitelist_data: | ||||
|             await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist_data}) | ||||
|     elif type == RPCRequestType.ANALYZED_DF: | ||||
|         # They requested the full historical analyzed dataframes | ||||
|         analyzed_df = rpc._ws_request_analyzed_df() | ||||
|  | ||||
|         if analyzed_df: | ||||
|             for pair, message in analyzed_df.items(): | ||||
|                 await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message}) | ||||
|         # For every dataframe, send as a separate message | ||||
|         for _, message in analyzed_df.items(): | ||||
|             await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message}) | ||||
|  | ||||
|  | ||||
| @router.websocket("/message/ws") | ||||
|   | ||||
| @@ -158,10 +158,12 @@ class ExternalMessageConsumer: | ||||
|                         self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) | ||||
|                     ) | ||||
|  | ||||
|                     # Now request the initial data from this Producer | ||||
|                     await channel.send( | ||||
|                         self.compose_consumer_request(RPCRequestType.INITIAL_DATA) | ||||
|                     ) | ||||
|                     # Now request the initial data from this Producer for every topic | ||||
|                     # we've subscribed to | ||||
|                     for topic in self.topics: | ||||
|                         # without .upper() we get KeyError | ||||
|                         request_type = RPCRequestType[topic.upper()] | ||||
|                         await channel.send(self.compose_consumer_request(request_type)) | ||||
|  | ||||
|                     # Now receive data, if none is within the time limit, ping | ||||
|                     while True: | ||||
| @@ -191,9 +193,12 @@ class ExternalMessageConsumer: | ||||
|                                 await asyncio.sleep(self.sleep_time) | ||||
|  | ||||
|                                 break | ||||
|                         except Exception as e: | ||||
|                             logger.exception(e) | ||||
|                             continue | ||||
|  | ||||
|             # Catch invalid ws_url, and break the loop | ||||
|             except websockets.exceptions.InvalidURI as e: | ||||
|                 logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") | ||||
|                 break | ||||
|  | ||||
|             except ( | ||||
|                 socket.gaierror, | ||||
|                 ConnectionRefusedError, | ||||
| @@ -204,9 +209,9 @@ class ExternalMessageConsumer: | ||||
|  | ||||
|                 continue | ||||
|  | ||||
|             # Catch invalid ws_url, and break the loop | ||||
|             except websockets.exceptions.InvalidURI as e: | ||||
|                 logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") | ||||
|             except Exception as e: | ||||
|                 # An unforseen error has occurred, log and stop | ||||
|                 logger.exception(e) | ||||
|                 break | ||||
|  | ||||
|     def compose_consumer_request( | ||||
|   | ||||
| @@ -1068,13 +1068,14 @@ class RPC: | ||||
|  | ||||
|         return _data | ||||
|  | ||||
|     def _ws_initial_data(self): | ||||
|         """ Websocket friendly initial data, whitelists and all analyzed dataframes """ | ||||
|     def _ws_request_analyzed_df(self): | ||||
|         """ Historical Analyzed Dataframes for WebSocket """ | ||||
|         whitelist = self._freqtrade.active_pair_whitelist | ||||
|         # We only get the last 500 candles, should we remove the limit? | ||||
|         analyzed_df = self._ws_all_analysed_dataframes(whitelist, 500) | ||||
|         return self._ws_all_analysed_dataframes(whitelist, 500) | ||||
|  | ||||
|         return {"whitelist": whitelist, "analyzed_df": analyzed_df} | ||||
|     def _ws_request_whitelist(self): | ||||
|         """ Whitelist data for WebSocket """ | ||||
|         return self._freqtrade.active_pair_whitelist | ||||
|  | ||||
|     @ staticmethod | ||||
|     def _rpc_analysed_history_full(config, pair: str, timeframe: str, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user