Merge pull request #7460 from wizrds/bugfix-emc

Bug fix in External Message Consumer
This commit is contained in:
Matthias 2022-09-24 06:49:18 +02:00 committed by GitHub
commit d9c8e7157b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 42 deletions

View File

@ -217,11 +217,14 @@ class ExternalMessageConsumer:
) as e: ) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s") logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time) await asyncio.sleep(self.sleep_time)
continue continue
except websockets.exceptions.ConnectionClosedOK: except (
# Successfully closed, just keep trying to connect again indefinitely websockets.exceptions.ConnectionClosedError,
websockets.exceptions.ConnectionClosedOK
):
# Just keep trying to connect again indefinitely
await asyncio.sleep(self.sleep_time)
continue continue
except Exception as e: except Exception as e:

View File

@ -200,43 +200,60 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker):
emc.shutdown() emc.shutdown()
# async def test_emc_create_connection_invalid(default_conf, caplog, mocker): async def test_emc_create_connection_invalid_port(default_conf, caplog, mocker):
# default_conf.update({ default_conf.update({
# "external_message_consumer": { "external_message_consumer": {
# "enabled": True, "enabled": True,
# "producers": [ "producers": [
# { {
# "name": "default", "name": "default",
# "host": _TEST_WS_HOST, "host": _TEST_WS_HOST,
# "port": _TEST_WS_PORT, "port": -1,
# "ws_token": _TEST_WS_TOKEN "ws_token": _TEST_WS_TOKEN
# } }
# ], ],
# "wait_timeout": 60, "wait_timeout": 60,
# "ping_timeout": 60, "ping_timeout": 60,
# "sleep_timeout": 60 "sleep_timeout": 60
# } }
# }) })
#
# mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', dp = DataProvider(default_conf, None, None, None)
# MagicMock()) emc = ExternalMessageConsumer(default_conf, dp)
#
# test_producer = default_conf['external_message_consumer']['producers'][0] try:
# lock = asyncio.Lock() await asyncio.sleep(0.01)
# assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
# dp = DataProvider(default_conf, None, None, None) finally:
# emc = ExternalMessageConsumer(default_conf, dp) emc.shutdown()
#
# try:
# # Test invalid URL async def test_emc_create_connection_invalid_host(default_conf, caplog, mocker):
# test_producer['url'] = "tcp://null:8080/api/v1/message/ws" default_conf.update({
# emc._running = True "external_message_consumer": {
# await emc._create_connection(test_producer, lock) "enabled": True,
# emc._running = False "producers": [
# {
# assert log_has_re(r".+is an invalid WebSocket URL.+", caplog) "name": "default",
# finally: "host": "10000.1241..2121/",
# emc.shutdown() "port": _TEST_WS_PORT,
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 60,
"ping_timeout": 60,
"sleep_timeout": 60
}
})
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
try:
await asyncio.sleep(0.01)
assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
finally:
emc.shutdown()
async def test_emc_create_connection_error(default_conf, caplog, mocker): async def test_emc_create_connection_error(default_conf, caplog, mocker):
@ -376,7 +393,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
"ws_token": _TEST_WS_TOKEN "ws_token": _TEST_WS_TOKEN
} }
], ],
"wait_timeout": 1, "wait_timeout": 0.1,
"ping_timeout": 1, "ping_timeout": 1,
"sleep_time": 1 "sleep_time": 1
} }
@ -396,7 +413,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
class TestChannel: class TestChannel:
async def recv(self, *args, **kwargs): async def recv(self, *args, **kwargs):
await asyncio.sleep(10) await asyncio.sleep(0.2)
async def ping(self, *args, **kwargs): async def ping(self, *args, **kwargs):
return asyncio.Future() return asyncio.Future()