diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index ae7c1f765..c571ac510 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -261,19 +261,19 @@ class ExternalMessageConsumer: try: producer_message = WSMessageSchema.parse_obj(message) except ValidationError as e: - logger.error(f"Invalid message from {producer_name}: {e}") + logger.error(f"Invalid message from `{producer_name}`: {e}") return # We shouldn't get empty messages if producer_message.data is None: return - logger.info(f"Received message of type {producer_message.type} from `{producer_name}`") + logger.info(f"Received message of type `{producer_message.type}` from `{producer_name}`") message_handler = self._message_handlers.get(producer_message.type) if not message_handler: - logger.info(f"Received unhandled message: {producer_message.data}, ignoring...") + logger.info(f"Received unhandled message: `{producer_message.data}`, ignoring...") return message_handler(producer_name, producer_message) @@ -288,7 +288,7 @@ class ExternalMessageConsumer: # Add the pairlist data to the DataProvider self._dp._set_producer_pairs(message.data, producer_name=producer_name) - logger.debug(f"Consumed message from {producer_name} of type `RPCMessageType.WHITELIST`") + logger.debug(f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`") def _consume_analyzed_df_message(self, producer_name: str, message: Any): try: @@ -314,4 +314,4 @@ class ExternalMessageConsumer: producer_name=producer_name) logger.debug( - f"Consumed message from {producer_name} of type RPCMessageType.ANALYZED_DF") + f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`") diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py index 6512ff2be..f33e80018 100644 --- a/tests/rpc/test_rpc_emc.py +++ b/tests/rpc/test_rpc_emc.py @@ -1,11 +1,14 @@ """ Unit test file for rpc/external_message_consumer.py """ +import logging +from datetime import datetime, timezone + import pytest from freqtrade.data.dataprovider import DataProvider from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer -from tests.conftest import log_has, log_has_when +from tests.conftest import log_has, log_has_re, log_has_when @pytest.fixture(autouse=True) @@ -77,3 +80,56 @@ def test_emc_init(patched_emc, default_conf, mocker, caplog): # Make sure we failed because of no producers assert str(exc.value) == "You must specify at least 1 Producer to connect to." + + +def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): + test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"} + caplog.set_level(logging.DEBUG) + + # Test handle whitelist message + whitelist_message = {"type": "whitelist", "data": ["BTC/USDT"]} + patched_emc.handle_producer_message(test_producer, whitelist_message) + + assert log_has("Received message of type `whitelist` from `test`", caplog) + assert log_has("Consumed message from `test` of type `RPCMessageType.WHITELIST`", caplog) + + # Test handle analyzed_df message + df_message = { + "type": "analyzed_df", + "data": { + "key": ("BTC/USDT", "5m", "spot"), + "df": ohlcv_history, + "la": datetime.now(timezone.utc) + } + } + patched_emc.handle_producer_message(test_producer, df_message) + + assert log_has("Received message of type `analyzed_df` from `test`", caplog) + assert log_has("Consumed message from `test` of type `RPCMessageType.ANALYZED_DF`", caplog) + + # Test unhandled message + unhandled_message = {"type": "status", "data": "RUNNING"} + patched_emc.handle_producer_message(test_producer, unhandled_message) + + assert log_has_re(r"Received unhandled message\: .*", caplog) + + # Test malformed message + caplog.clear() + malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}} + patched_emc.handle_producer_message(test_producer, malformed_message) + + assert log_has("Received message of type `whitelist` from `test`", caplog) + assert not log_has("Consumed message from `test` of type `RPCMessageType.WHITELIST`", caplog) + + malformed_message = { + "type": "analyzed_df", + "data": { + "key": "BTC/USDT", + "df": ohlcv_history, + "la": datetime.now(timezone.utc) + } + } + patched_emc.handle_producer_message(test_producer, malformed_message) + + assert log_has("Received message of type `analyzed_df` from `test`", caplog) + assert not log_has("Consumed message from `test` of type `RPCMessageType.ANALYZED_DF`", caplog)