diff --git a/config_examples/config_full.example.json b/config_examples/config_full.example.json index d8d552814..5a5096f81 100644 --- a/config_examples/config_full.example.json +++ b/config_examples/config_full.example.json @@ -175,22 +175,21 @@ "password": "SuperSecurePassword", "ws_token": "secret_ws_t0ken." }, - // The ExternalMessageConsumer config should only be enabled on an instance - // that listens to outside data from another instance. This should not be enabled - // in your producer of data. "external_message_consumer": { "enabled": false, "producers": [ { "name": "default", - "url": "ws://localhost:8081/api/v1/message/ws", + "host": "127.0.0.2", + "port": 8080, "ws_token": "secret_ws_t0ken." } ], - "poll_timeout": 300, + "wait_timeout": 300, "ping_timeout": 10, "sleep_time": 10, - "remove_entry_exit_signals": false + "remove_entry_exit_signals": false, + "message_size_limit": 8 }, "bot_name": "freqtrade", "db_url": "sqlite:///tradesv3.sqlite", diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 371cb9578..2fc855fbd 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -496,23 +496,24 @@ CONF_SCHEMA = { 'type': 'object', 'properties': { 'name': {'type': 'string'}, - 'url': {'type': 'string', 'default': ''}, - 'ws_token': {'type': 'string', 'default': ''}, + 'host': {'type': 'string'}, + 'port': {'type': 'integer', 'default': 8080}, + 'ws_token': {'type': 'string'}, }, - 'required': ['name', 'url', 'ws_token'] + 'required': ['name', 'host', 'ws_token'] } }, 'wait_timeout': {'type': 'integer', 'minimum': 0}, 'sleep_time': {'type': 'integer', 'minimum': 0}, 'ping_timeout': {'type': 'integer', 'minimum': 0}, - 'remove_signals_analyzed_df': {'type': 'boolean', 'default': False}, + 'remove_entry_exit_signals': {'type': 'boolean', 'default': False}, 'initial_candle_limit': { 'type': 'integer', 'minimum': 0, 'maximum': 1500, 'default': 1500 }, - 'max_message_size': { # In megabytes + 'message_size_limit': { # In megabytes 'type': 'integer', 'minimum': 1, 'maxmium': 20, diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index c4a3e9d4a..f55b2dbd3 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -101,8 +101,6 @@ async def message_endpoint( Message WebSocket endpoint, facilitates sending RPC messages """ try: - # TODO: - # Return a channel ID, pass that instead of ws to the rest of the methods channel = await channel_manager.on_connect(ws) if await is_websocket_alive(ws): diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 95031488d..6a8faef81 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -57,9 +57,6 @@ class ExternalMessageConsumer: self.enabled = self._emc_config.get('enabled', False) self.producers = self._emc_config.get('producers', []) - if self.enabled and len(self.producers) < 1: - raise OperationalException("You must specify at least 1 Producer to connect to.") - self.wait_timeout = self._emc_config.get('wait_timeout', 300) # in seconds self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds @@ -71,6 +68,8 @@ class ExternalMessageConsumer: # as the websockets client expects bytes. self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20) + self.validate_config() + # Setting these explicitly as they probably shouldn't be changed by a user # Unless we somehow integrate this with the strategy to allow creating # callbacks for the messages @@ -91,6 +90,18 @@ class ExternalMessageConsumer: self.start() + def validate_config(self): + """ + Make sure values are what they are supposed to be + """ + if self.enabled and len(self.producers) < 1: + raise OperationalException("You must specify at least 1 Producer to connect to.") + + if self.enabled and self._config.get('process_only_new_candles', True): + # Warning here or require it? + logger.warning("To receive best performance with external data," + "please set `process_only_new_candles` to False") + def start(self): """ Start the main internal loop in another thread to run coroutines @@ -174,12 +185,10 @@ class ExternalMessageConsumer: """ while self._running: try: - url, token = producer['url'], producer['ws_token'] - name = producer["name"] - ws_url = f"{url}?token={token}" - - logger.info( - f"Connecting to {name} @ {url}, max message size: {self.message_size_limit}") + host, port = producer['host'], producer['port'] + token = producer['ws_token'] + name = producer['name'] + ws_url = f"ws://{host}:{port}/api/v1/message/ws?token={token}" # This will raise InvalidURI if the url is bad async with websockets.connect(ws_url, max_size=self.message_size_limit) as ws: