diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 6a8faef81..e4b3c2609 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -99,7 +99,7 @@ class ExternalMessageConsumer: if self.enabled and self._config.get('process_only_new_candles', True): # Warning here or require it? - logger.warning("To receive best performance with external data," + logger.warning("To receive best performance with external data, " "please set `process_only_new_candles` to False") def start(self): @@ -205,11 +205,6 @@ class ExternalMessageConsumer: # Now receive data, if none is within the time limit, ping await self._receive_messages(channel, producer, lock) - # Catch invalid ws_url, and break the loop - except websockets.exceptions.InvalidURI as e: - logger.error(f"{ws_url} is an invalid WebSocket URL - {e}") - break - except ( socket.gaierror, ConnectionRefusedError, diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 7bb727810..83276aabe 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -29,7 +29,8 @@ def patched_emc(default_conf, mocker): "producers": [ { "name": "default", - "url": "ws://null:9891/api/v1/message/ws", + "host": "null", + "port": 9891, "ws_token": _TEST_WS_TOKEN } ] @@ -166,7 +167,8 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker): "producers": [ { "name": "default", - "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "host": _TEST_WS_HOST, + "port": _TEST_WS_PORT, "ws_token": _TEST_WS_TOKEN } ], @@ -198,42 +200,43 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker): emc.shutdown() -async def test_emc_create_connection_invalid(default_conf, caplog, mocker): - default_conf.update({ - "external_message_consumer": { - "enabled": True, - "producers": [ - { - "name": "default", - "url": "ws://localhost:8080/api/v1/message/ws", - "ws_token": _TEST_WS_TOKEN - } - ], - "wait_timeout": 60, - "ping_timeout": 60, - "sleep_timeout": 60 - } - }) - - mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', - MagicMock()) - - test_producer = default_conf['external_message_consumer']['producers'][0] - lock = asyncio.Lock() - - dp = DataProvider(default_conf, None, None, None) - emc = ExternalMessageConsumer(default_conf, dp) - - try: - # Test invalid URL - test_producer['url'] = "tcp://localhost:8080/api/v1/message/ws" - emc._running = True - await emc._create_connection(test_producer, lock) - emc._running = False - - assert log_has_re(r".+is an invalid WebSocket URL.+", caplog) - finally: - emc.shutdown() +# async def test_emc_create_connection_invalid(default_conf, caplog, mocker): +# default_conf.update({ +# "external_message_consumer": { +# "enabled": True, +# "producers": [ +# { +# "name": "default", +# "host": _TEST_WS_HOST, +# "port": _TEST_WS_PORT, +# "ws_token": _TEST_WS_TOKEN +# } +# ], +# "wait_timeout": 60, +# "ping_timeout": 60, +# "sleep_timeout": 60 +# } +# }) +# +# mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', +# MagicMock()) +# +# test_producer = default_conf['external_message_consumer']['producers'][0] +# lock = asyncio.Lock() +# +# dp = DataProvider(default_conf, None, None, None) +# emc = ExternalMessageConsumer(default_conf, dp) +# +# try: +# # Test invalid URL +# test_producer['url'] = "tcp://null:8080/api/v1/message/ws" +# emc._running = True +# await emc._create_connection(test_producer, lock) +# emc._running = False +# +# assert log_has_re(r".+is an invalid WebSocket URL.+", caplog) +# finally: +# emc.shutdown() async def test_emc_create_connection_error(default_conf, caplog, mocker): @@ -243,7 +246,8 @@ async def test_emc_create_connection_error(default_conf, caplog, mocker): "producers": [ { "name": "default", - "url": "ws://localhost:8080/api/v1/message/ws", + "host": _TEST_WS_HOST, + "port": _TEST_WS_PORT, "ws_token": _TEST_WS_TOKEN } ], @@ -260,7 +264,7 @@ async def test_emc_create_connection_error(default_conf, caplog, mocker): emc = ExternalMessageConsumer(default_conf, dp) try: - await asyncio.sleep(1) + await asyncio.sleep(0.01) assert log_has("Unexpected error has occurred:", caplog) finally: emc.shutdown() @@ -273,7 +277,8 @@ async def test_emc_receive_messages_valid(default_conf, caplog, mocker): "producers": [ { "name": "default", - "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "host": _TEST_WS_HOST, + "port": _TEST_WS_PORT, "ws_token": _TEST_WS_TOKEN } ], @@ -319,7 +324,8 @@ async def test_emc_receive_messages_invalid(default_conf, caplog, mocker): "producers": [ { "name": "default", - "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "host": _TEST_WS_HOST, + "port": _TEST_WS_PORT, "ws_token": _TEST_WS_TOKEN } ], @@ -365,7 +371,8 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): "producers": [ { "name": "default", - "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "host": _TEST_WS_HOST, + "port": _TEST_WS_PORT, "ws_token": _TEST_WS_TOKEN } ], @@ -411,7 +418,8 @@ async def test_emc_receive_messages_handle_error(default_conf, caplog, mocker): "producers": [ { "name": "default", - "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "host": _TEST_WS_HOST, + "port": _TEST_WS_PORT, "ws_token": _TEST_WS_TOKEN } ],