update message parsing, tests
This commit is contained in:
		| @@ -264,10 +264,6 @@ class ExternalMessageConsumer: | |||||||
|             logger.error(f"Invalid message from `{producer_name}`: {e}") |             logger.error(f"Invalid message from `{producer_name}`: {e}") | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         # We shouldn't get empty messages |  | ||||||
|         if producer_message.data is None: |  | ||||||
|             return |  | ||||||
|  |  | ||||||
|         logger.info(f"Received message of type `{producer_message.type}` from `{producer_name}`") |         logger.info(f"Received message of type `{producer_message.type}` from `{producer_name}`") | ||||||
|  |  | ||||||
|         message_handler = self._message_handlers.get(producer_message.type) |         message_handler = self._message_handlers.get(producer_message.type) | ||||||
| @@ -282,7 +278,8 @@ class ExternalMessageConsumer: | |||||||
|         try: |         try: | ||||||
|             # Validate the message |             # Validate the message | ||||||
|             message = WSWhitelistMessage.parse_obj(message) |             message = WSWhitelistMessage.parse_obj(message) | ||||||
|         except ValidationError: |         except ValidationError as e: | ||||||
|  |             logger.error(f"Invalid message from `{producer_name}`: {e}") | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         # Add the pairlist data to the DataProvider |         # Add the pairlist data to the DataProvider | ||||||
| @@ -293,7 +290,8 @@ class ExternalMessageConsumer: | |||||||
|     def _consume_analyzed_df_message(self, producer_name: str, message: Any): |     def _consume_analyzed_df_message(self, producer_name: str, message: Any): | ||||||
|         try: |         try: | ||||||
|             message = WSAnalyzedDFMessage.parse_obj(message) |             message = WSAnalyzedDFMessage.parse_obj(message) | ||||||
|         except ValidationError: |         except ValidationError as e: | ||||||
|  |             logger.error(f"Invalid message from `{producer_name}`: {e}") | ||||||
|             return |             return | ||||||
|  |  | ||||||
|         key = message.data.key |         key = message.data.key | ||||||
|   | |||||||
| @@ -82,16 +82,20 @@ def test_emc_init(patched_emc, default_conf, mocker, caplog): | |||||||
|     assert str(exc.value) == "You must specify at least 1 Producer to connect to." |     assert str(exc.value) == "You must specify at least 1 Producer to connect to." | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # Parametrize this? | ||||||
| def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): | def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): | ||||||
|     test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"} |     test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"} | ||||||
|  |     producer_name = test_producer['name'] | ||||||
|  |  | ||||||
|     caplog.set_level(logging.DEBUG) |     caplog.set_level(logging.DEBUG) | ||||||
|  |  | ||||||
|     # Test handle whitelist message |     # Test handle whitelist message | ||||||
|     whitelist_message = {"type": "whitelist", "data": ["BTC/USDT"]} |     whitelist_message = {"type": "whitelist", "data": ["BTC/USDT"]} | ||||||
|     patched_emc.handle_producer_message(test_producer, whitelist_message) |     patched_emc.handle_producer_message(test_producer, whitelist_message) | ||||||
|  |  | ||||||
|     assert log_has("Received message of type `whitelist` from `test`", caplog) |     assert log_has(f"Received message of type `whitelist` from `{producer_name}`", caplog) | ||||||
|     assert log_has("Consumed message from `test` of type `RPCMessageType.WHITELIST`", caplog) |     assert log_has( | ||||||
|  |         f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog) | ||||||
|  |  | ||||||
|     # Test handle analyzed_df message |     # Test handle analyzed_df message | ||||||
|     df_message = { |     df_message = { | ||||||
| @@ -104,8 +108,9 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): | |||||||
|     } |     } | ||||||
|     patched_emc.handle_producer_message(test_producer, df_message) |     patched_emc.handle_producer_message(test_producer, df_message) | ||||||
|  |  | ||||||
|     assert log_has("Received message of type `analyzed_df` from `test`", caplog) |     assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) | ||||||
|     assert log_has("Consumed message from `test` of type `RPCMessageType.ANALYZED_DF`", caplog) |     assert log_has( | ||||||
|  |         f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`", caplog) | ||||||
|  |  | ||||||
|     # Test unhandled message |     # Test unhandled message | ||||||
|     unhandled_message = {"type": "status", "data": "RUNNING"} |     unhandled_message = {"type": "status", "data": "RUNNING"} | ||||||
| @@ -113,13 +118,12 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): | |||||||
|  |  | ||||||
|     assert log_has_re(r"Received unhandled message\: .*", caplog) |     assert log_has_re(r"Received unhandled message\: .*", caplog) | ||||||
|  |  | ||||||
|     # Test malformed message |     # Test malformed messages | ||||||
|     caplog.clear() |     caplog.clear() | ||||||
|     malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}} |     malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}} | ||||||
|     patched_emc.handle_producer_message(test_producer, malformed_message) |     patched_emc.handle_producer_message(test_producer, malformed_message) | ||||||
|  |  | ||||||
|     assert log_has("Received message of type `whitelist` from `test`", caplog) |     assert log_has_re(r"Invalid message .+", caplog) | ||||||
|     assert not log_has("Consumed message from `test` of type `RPCMessageType.WHITELIST`", caplog) |  | ||||||
|  |  | ||||||
|     malformed_message = { |     malformed_message = { | ||||||
|         "type": "analyzed_df", |         "type": "analyzed_df", | ||||||
| @@ -131,5 +135,17 @@ def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history): | |||||||
|     } |     } | ||||||
|     patched_emc.handle_producer_message(test_producer, malformed_message) |     patched_emc.handle_producer_message(test_producer, malformed_message) | ||||||
|  |  | ||||||
|     assert log_has("Received message of type `analyzed_df` from `test`", caplog) |     assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog) | ||||||
|     assert not log_has("Consumed message from `test` of type `RPCMessageType.ANALYZED_DF`", caplog) |     assert log_has_re(r"Invalid message .+", caplog) | ||||||
|  |  | ||||||
|  |     caplog.clear() | ||||||
|  |     malformed_message = {"some": "stuff"} | ||||||
|  |     patched_emc.handle_producer_message(test_producer, malformed_message) | ||||||
|  |  | ||||||
|  |     assert log_has_re(r"Invalid message .+", caplog) | ||||||
|  |  | ||||||
|  |     caplog.clear() | ||||||
|  |     malformed_message = {"type": "whitelist", "data": None} | ||||||
|  |     patched_emc.handle_producer_message(test_producer, malformed_message) | ||||||
|  |  | ||||||
|  |     assert log_has_re(r"Invalid message .+", caplog) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user