From b8e1d29a1be5af7e4a3950005c1aa79f9a7eefb2 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 23 Sep 2022 12:36:05 -0600 Subject: [PATCH 1/4] catch connectionclosederror --- freqtrade/rpc/external_message_consumer.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index a57fac144..0aab5ba05 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -220,8 +220,11 @@ class ExternalMessageConsumer: continue - except websockets.exceptions.ConnectionClosedOK: - # Successfully closed, just keep trying to connect again indefinitely + except ( + websockets.exceptions.ConnectionClosedError, + websockets.exceptions.ConnectionClosedOk + ): + # Just keep trying to connect again indefinitely continue except Exception as e: From 4c7cef570f54c00f4debfb3b68bad8f93ac0fe14 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 23 Sep 2022 12:58:26 -0600 Subject: [PATCH 2/4] typo in exception --- freqtrade/rpc/external_message_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 0aab5ba05..f60948202 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -222,7 +222,7 @@ class ExternalMessageConsumer: except ( websockets.exceptions.ConnectionClosedError, - websockets.exceptions.ConnectionClosedOk + websockets.exceptions.ConnectionClosedOK ): # Just keep trying to connect again indefinitely continue From 6b5d71049e514c4d6e1dc2584910b1dcc5184d5f Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 23 Sep 2022 13:10:45 -0600 Subject: [PATCH 3/4] add sleep --- freqtrade/rpc/external_message_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index f60948202..99ba39f76 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -217,7 +217,6 @@ class ExternalMessageConsumer: ) as e: logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") await asyncio.sleep(self.sleep_time) - continue except ( @@ -225,6 +224,7 @@ class ExternalMessageConsumer: websockets.exceptions.ConnectionClosedOK ): # Just keep trying to connect again indefinitely + await asyncio.sleep(self.sleep_time) continue except Exception as e: From af974443cdbd146f6e5e4e7a302e33ee5226012d Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Fri, 23 Sep 2022 13:37:46 -0600 Subject: [PATCH 4/4] add test --- tests/rpc/test_rpc_emc.py | 95 +++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 39 deletions(-) diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 9aca88b4a..41faaf249 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -200,43 +200,60 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker): emc.shutdown() -# async def test_emc_create_connection_invalid(default_conf, caplog, mocker): -# default_conf.update({ -# "external_message_consumer": { -# "enabled": True, -# "producers": [ -# { -# "name": "default", -# "host": _TEST_WS_HOST, -# "port": _TEST_WS_PORT, -# "ws_token": _TEST_WS_TOKEN -# } -# ], -# "wait_timeout": 60, -# "ping_timeout": 60, -# "sleep_timeout": 60 -# } -# }) -# -# mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', -# MagicMock()) -# -# test_producer = default_conf['external_message_consumer']['producers'][0] -# lock = asyncio.Lock() -# -# dp = DataProvider(default_conf, None, None, None) -# emc = ExternalMessageConsumer(default_conf, dp) -# -# try: -# # Test invalid URL -# test_producer['url'] = "tcp://null:8080/api/v1/message/ws" -# emc._running = True -# await emc._create_connection(test_producer, lock) -# emc._running = False -# -# assert log_has_re(r".+is an invalid WebSocket URL.+", caplog) -# finally: -# emc.shutdown() +async def test_emc_create_connection_invalid_port(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "host": _TEST_WS_HOST, + "port": -1, + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 60, + "ping_timeout": 60, + "sleep_timeout": 60 + } + }) + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + try: + await asyncio.sleep(0.01) + assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog) + finally: + emc.shutdown() + + +async def test_emc_create_connection_invalid_host(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "host": "10000.1241..2121/", + "port": _TEST_WS_PORT, + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 60, + "ping_timeout": 60, + "sleep_timeout": 60 + } + }) + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + try: + await asyncio.sleep(0.01) + assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog) + finally: + emc.shutdown() async def test_emc_create_connection_error(default_conf, caplog, mocker): @@ -376,7 +393,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): "ws_token": _TEST_WS_TOKEN } ], - "wait_timeout": 1, + "wait_timeout": 0.1, "ping_timeout": 1, "sleep_time": 1 } @@ -396,7 +413,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker): class TestChannel: async def recv(self, *args, **kwargs): - await asyncio.sleep(10) + await asyncio.sleep(0.2) async def ping(self, *args, **kwargs): return asyncio.Future()