From eb4cd6ba82b8d348b02a37d65c1567f0678f056d Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Thu, 1 Sep 2022 23:52:13 -0600 Subject: [PATCH] split initial data into separate requests --- freqtrade/enums/rpcmessagetype.py | 4 +++- freqtrade/rpc/api_server/api_ws.py | 22 +++++++++---------- freqtrade/rpc/external_message_consumer.py | 25 +++++++++++++--------- freqtrade/rpc/rpc.py | 11 +++++----- 4 files changed, 35 insertions(+), 27 deletions(-) diff --git a/freqtrade/enums/rpcmessagetype.py b/freqtrade/enums/rpcmessagetype.py index c213826ae..929f6d083 100644 --- a/freqtrade/enums/rpcmessagetype.py +++ b/freqtrade/enums/rpcmessagetype.py @@ -33,4 +33,6 @@ class RPCMessageType(str, Enum): # Enum for parsing requests from ws consumers class RPCRequestType(str, Enum): SUBSCRIBE = 'subscribe' - INITIAL_DATA = 'initial_data' + + WHITELIST = 'whitelist' + ANALYZED_DF = 'analyzed_df' diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index 52507106d..cf5b6cde0 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -33,7 +33,7 @@ async def _process_consumer_request( return if not isinstance(data, list): - logger.error(f"Improper request from channel: {channel} - {request}") + logger.error(f"Improper subscribe request from channel: {channel} - {request}") return # If all topics passed are a valid RPCMessageType, set subscriptions on channel @@ -42,19 +42,19 @@ async def _process_consumer_request( logger.debug(f"{channel} subscribed to topics: {data}") channel.set_subscriptions(data) - elif type == RPCRequestType.INITIAL_DATA: - # Acquire the data - initial_data = rpc._ws_initial_data() + elif type == RPCRequestType.WHITELIST: + # They requested the whitelist + whitelist = rpc._ws_request_whitelist() - # We now loop over it sending it in pieces - whitelist_data, analyzed_df = initial_data.get('whitelist'), initial_data.get('analyzed_df') + await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist}) - if whitelist_data: - await channel.send({"type": RPCMessageType.WHITELIST, "data": whitelist_data}) + elif type == RPCRequestType.ANALYZED_DF: + # They requested the full historical analyzed dataframes + analyzed_df = rpc._ws_request_analyzed_df() - if analyzed_df: - for pair, message in analyzed_df.items(): - await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message}) + # For every dataframe, send as a separate message + for _, message in analyzed_df.items(): + await channel.send({"type": RPCMessageType.ANALYZED_DF, "data": message}) @router.websocket("/message/ws") diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 4c7f6570d..c925624f8 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -158,10 +158,12 @@ class ExternalMessageConsumer: self.compose_consumer_request(RPCRequestType.SUBSCRIBE, self.topics) ) - # Now request the initial data from this Producer - await channel.send( - self.compose_consumer_request(RPCRequestType.INITIAL_DATA) - ) + # Now request the initial data from this Producer for every topic + # we've subscribed to + for topic in self.topics: + # without .upper() we get KeyError + request_type = RPCRequestType[topic.upper()] + await channel.send(self.compose_consumer_request(request_type)) # Now receive data, if none is within the time limit, ping while True: @@ -191,9 +193,12 @@ class ExternalMessageConsumer: await asyncio.sleep(self.sleep_time) break - except Exception as e: - logger.exception(e) - continue + + # Catch invalid ws_url, and break the loop + except websockets.exceptions.InvalidURI as e: + logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") + break + except ( socket.gaierror, ConnectionRefusedError, @@ -204,9 +209,9 @@ class ExternalMessageConsumer: continue - # Catch invalid ws_url, and break the loop - except websockets.exceptions.InvalidURI as e: - logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") + except Exception as e: + # An unforseen error has occurred, log and stop + logger.exception(e) break def compose_consumer_request( diff --git a/freqtrade/rpc/rpc.py b/freqtrade/rpc/rpc.py index 96b43f36b..378677e44 100644 --- a/freqtrade/rpc/rpc.py +++ b/freqtrade/rpc/rpc.py @@ -1068,13 +1068,14 @@ class RPC: return _data - def _ws_initial_data(self): - """ Websocket friendly initial data, whitelists and all analyzed dataframes """ + def _ws_request_analyzed_df(self): + """ Historical Analyzed Dataframes for WebSocket """ whitelist = self._freqtrade.active_pair_whitelist - # We only get the last 500 candles, should we remove the limit? - analyzed_df = self._ws_all_analysed_dataframes(whitelist, 500) + return self._ws_all_analysed_dataframes(whitelist, 500) - return {"whitelist": whitelist, "analyzed_df": analyzed_df} + def _ws_request_whitelist(self): + """ Whitelist data for WebSocket """ + return self._freqtrade.active_pair_whitelist @ staticmethod def _rpc_analysed_history_full(config, pair: str, timeframe: str,