diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index 1c2a27617..ae7c1f765 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -90,16 +90,16 @@ class ExternalMessageConsumer: """ Start the main internal loop in another thread to run coroutines """ + if self._thread and self._loop: + return + + logger.info("Starting ExternalMessageConsumer") + self._loop = asyncio.new_event_loop() + self._thread = Thread(target=self._loop.run_forever) + self._thread.start() - if not self._thread: - logger.info("Starting ExternalMessageConsumer") - - self._thread = Thread(target=self._loop.run_forever) - self._thread.start() - self._running = True - else: - raise RuntimeError("A loop is already running") + self._running = True self._main_task = asyncio.run_coroutine_threadsafe(self._main(), loop=self._loop) @@ -121,6 +121,11 @@ class ExternalMessageConsumer: self._thread.join() + self._thread = None + self._loop = None + self._sub_tasks = None + self._main_task = None + async def _main(self): """ The main task coroutine diff --git a/tests/conftest.py b/tests/conftest.py index fffac8e0a..6ce767918 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -58,6 +58,11 @@ def log_has(line, logs): return any(line == message for message in logs.messages) +def log_has_when(line, logs, when): + """Check if line is found in caplog's messages during a specified stage""" + return any(line == message.message for message in logs.get_records(when)) + + def log_has_re(line, logs): """Check if line matches some caplog's message.""" return any(re.match(line, message) for message in logs.messages) diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py new file mode 100644 index 000000000..6512ff2be --- /dev/null +++ b/tests/rpc/test_rpc_emc.py @@ -0,0 +1,79 @@ +""" +Unit test file for rpc/external_message_consumer.py +""" +import pytest + +from freqtrade.data.dataprovider import DataProvider +from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer +from tests.conftest import log_has, log_has_when + + +@pytest.fixture(autouse=True) +def patched_emc(default_conf, mocker): + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [ + { + "name": "default", + "url": "ws://127.0.0.1:8080/api/v1/message/ws", + "ws_token": "secret_Ws_t0ken" + } + ] + } + }) + dataprovider = DataProvider(default_conf, None, None, None) + emc = ExternalMessageConsumer(default_conf, dataprovider) + + try: + yield emc + finally: + emc.shutdown() + + +def test_emc_start(patched_emc, caplog): + # Test if the message was printed + assert log_has_when("Starting ExternalMessageConsumer", caplog, "setup") + # Test if the thread and loop objects were created + assert patched_emc._thread and patched_emc._loop + + # Test we call start again nothing happens + prev_thread = patched_emc._thread + patched_emc.start() + assert prev_thread == patched_emc._thread + + +def test_emc_shutdown(patched_emc, caplog): + patched_emc.shutdown() + + assert log_has("Stopping ExternalMessageConsumer", caplog) + # Test the loop has stopped + assert patched_emc._loop is None + # Test if the thread has stopped + assert patched_emc._thread is None + + caplog.clear() + patched_emc.shutdown() + + # Test func didn't run again as it was called once already + assert not log_has("Stopping ExternalMessageConsumer", caplog) + + +def test_emc_init(patched_emc, default_conf, mocker, caplog): + # Test the settings were set correctly + assert patched_emc.initial_candle_limit <= 1500 + assert patched_emc.wait_timeout > 0 + assert patched_emc.sleep_time > 0 + + default_conf.update({ + "external_message_consumer": { + "enabled": True, + "producers": [] + } + }) + dataprovider = DataProvider(default_conf, None, None, None) + with pytest.raises(ValueError) as exc: + ExternalMessageConsumer(default_conf, dataprovider) + + # Make sure we failed because of no producers + assert str(exc.value) == "You must specify at least 1 Producer to connect to."