diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 1d917577a..89fa90c8e 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -203,8 +203,13 @@ class ExternalMessageConsumer: continue + except websockets.exceptions.ConnectionClosedOK: + # Successfully closed, just end + return + except Exception as e: # An unforseen error has occurred, log and stop + logger.error("Unexpected error has occurred:") logger.exception(e) break diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 9b14c2039..a074334c5 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -1,16 +1,26 @@ """ Unit test file for rpc/external_message_consumer.py """ +import asyncio +import functools +import json import logging from datetime import datetime, timezone +from unittest.mock import MagicMock import pytest +import websockets from freqtrade.data.dataprovider import DataProvider from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer from tests.conftest import log_has, log_has_re, log_has_when +_TEST_WS_TOKEN = "secret_Ws_t0ken" +_TEST_WS_HOST = "localhost" +_TEST_WS_PORT = 9989 + + @pytest.fixture def patched_emc(default_conf, mocker): default_conf.update({ @@ -20,7 +30,7 @@ def patched_emc(default_conf, mocker): { "name": "default", "url": "ws://127.0.0.1:8080/api/v1/message/ws", - "ws_token": "secret_Ws_t0ken" + "ws_token": _TEST_WS_TOKEN } ] } @@ -149,3 +159,168 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): patched_emc.handle_producer_message(test_producer, malformed_message) assert log_has_re(r"Invalid message .+", caplog) + + +async def test_emc_create_connection_success(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 60, + "ping_timeout": 60, + "sleep_timeout": 60 + } + }) + + mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', + MagicMock()) + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + test_producer = default_conf['external_message_consumer']['producers'][0] + lock = asyncio.Lock() + + async def eat(websocket): + pass + + try: + async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT): + emc._running = True + await emc._create_connection(test_producer, lock) + emc._running = False + + assert log_has_re(r"Producer connection success.+", caplog) + finally: + emc.shutdown() + + +async def test_emc_create_connection_invalid(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": "ws://localhost:8080/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 60, + "ping_timeout": 60, + "sleep_timeout": 60 + } + }) + + mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', + MagicMock()) + + lock = asyncio.Lock() + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + test_producer = default_conf['external_message_consumer']['producers'][0] + + try: + # Test invalid URL + test_producer['url'] = "tcp://localhost:8080/api/v1/message/ws" + emc._running = True + await emc._create_connection(test_producer, lock) + emc._running = False + + assert log_has_re(r".+is an invalid WebSocket URL.+", caplog) + finally: + emc.shutdown() + + +async def test_emc_create_connection_error(default_conf, caplog, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": "ws://localhost:8080/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 60, + "ping_timeout": 60, + "sleep_timeout": 60 + } + }) + + # Test unexpected error + mocker.patch('websockets.connect', side_effect=RuntimeError) + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + try: + await asyncio.sleep(1) + assert log_has("Unexpected error has occurred:", caplog) + finally: + emc.shutdown() + + +async def test_emc_receive_messages(default_conf, caplog, mocker): + """ + Test ExternalMessageConsumer._receive_messages + + Instantiates a patched ExternalMessageConsumer, creates a dummy websocket server, + and listens to the generated messages from the server for 1 second, then checks logs + """ + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws", + "ws_token": _TEST_WS_TOKEN + } + ], + "wait_timeout": 60, + "ping_timeout": 60, + "sleep_timeout": 60 + } + }) + + mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', + MagicMock()) + + lock = asyncio.Lock() + test_producer = default_conf['external_message_consumer']['producers'][0] + + dp = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dp) + + # Dummy generator + async def generate_messages(websocket): + try: + for i in range(3): + message = json.dumps({"type": "whitelist", "data": ["BTC/USDT"]}) + await websocket.send(message) + await asyncio.sleep(1) + except websockets.exceptions.ConnectionClosedOK: + return + + loop = asyncio.get_event_loop() + def change_running(emc): emc._running = not emc._running + + try: + # Start the dummy websocket server + async with websockets.serve(generate_messages, _TEST_WS_HOST, _TEST_WS_PORT): + # Change running to True, and call change_running in 1 second + emc._running = True + loop.call_later(1, functools.partial(change_running, emc=emc)) + # Create the connection that receives messages + await emc._create_connection(test_producer, lock) + + assert log_has_re(r"Received message of type `whitelist`.+", caplog) + finally: + emc.shutdown()