stable/tests/rpc/test_rpc_emc.py

468 lines
14 KiB
Python
Raw Normal View History

"""
Unit test file for rpc/external_message_consumer.py
"""
2022-09-11 05:57:17 +00:00
import asyncio
import functools
2022-09-10 19:44:27 +00:00
import logging
from datetime import datetime, timezone
2022-09-11 05:57:17 +00:00
from unittest.mock import MagicMock
2022-09-10 19:44:27 +00:00
import pytest
2022-09-11 05:57:17 +00:00
import websockets
from freqtrade.data.dataprovider import DataProvider
from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer
2022-09-10 19:44:27 +00:00
from tests.conftest import log_has, log_has_re, log_has_when
2022-09-11 05:57:17 +00:00
_TEST_WS_TOKEN = "secret_Ws_t0ken"
_TEST_WS_HOST = "127.0.0.1"
2022-09-11 05:57:17 +00:00
_TEST_WS_PORT = 9989
2022-09-10 19:50:36 +00:00
@pytest.fixture
def patched_emc(default_conf, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": "null",
"port": 9891,
2022-09-11 05:57:17 +00:00
"ws_token": _TEST_WS_TOKEN
}
]
}
})
dataprovider = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dataprovider)
try:
yield emc
finally:
emc.shutdown()
def test_emc_start(patched_emc, caplog):
# Test if the message was printed
assert log_has_when("Starting ExternalMessageConsumer", caplog, "setup")
# Test if the thread and loop objects were created
assert patched_emc._thread and patched_emc._loop
# Test we call start again nothing happens
prev_thread = patched_emc._thread
patched_emc.start()
assert prev_thread == patched_emc._thread
def test_emc_shutdown(patched_emc, caplog):
patched_emc.shutdown()
assert log_has("Stopping ExternalMessageConsumer", caplog)
# Test the loop has stopped
assert patched_emc._loop is None
# Test if the thread has stopped
assert patched_emc._thread is None
caplog.clear()
patched_emc.shutdown()
# Test func didn't run again as it was called once already
assert not log_has("Stopping ExternalMessageConsumer", caplog)
2022-09-24 14:38:56 +00:00
def test_emc_init(patched_emc):
# Test the settings were set correctly
assert patched_emc.initial_candle_limit <= 1500
assert patched_emc.wait_timeout > 0
assert patched_emc.sleep_time > 0
2022-09-10 19:44:27 +00:00
2022-09-10 20:29:15 +00:00
# Parametrize this?
2022-09-10 19:44:27 +00:00
def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"}
2022-09-10 20:29:15 +00:00
producer_name = test_producer['name']
2022-12-14 06:22:41 +00:00
invalid_msg = r"Invalid message .+"
2022-09-10 20:29:15 +00:00
2022-09-10 19:44:27 +00:00
caplog.set_level(logging.DEBUG)
# Test handle whitelist message
whitelist_message = {"type": "whitelist", "data": ["BTC/USDT"]}
patched_emc.handle_producer_message(test_producer, whitelist_message)
2022-09-10 20:29:15 +00:00
assert log_has(f"Received message of type `whitelist` from `{producer_name}`", caplog)
assert log_has(
f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog)
2022-09-10 19:44:27 +00:00
2022-12-02 19:28:27 +00:00
# Test handle analyzed_df single candle message
2022-09-10 19:44:27 +00:00
df_message = {
"type": "analyzed_df",
"data": {
"key": ("BTC/USDT", "5m", "spot"),
"df": ohlcv_history,
"la": datetime.now(timezone.utc)
}
}
patched_emc.handle_producer_message(test_producer, df_message)
2022-09-10 20:29:15 +00:00
assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
2022-12-02 19:28:27 +00:00
assert log_has_re(r"Holes in data or no existing df,.+", caplog)
2022-09-10 19:44:27 +00:00
# Test unhandled message
unhandled_message = {"type": "status", "data": "RUNNING"}
patched_emc.handle_producer_message(test_producer, unhandled_message)
assert log_has_re(r"Received unhandled message\: .*", caplog)
2022-09-10 20:29:15 +00:00
# Test malformed messages
2022-09-10 19:44:27 +00:00
caplog.clear()
malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}}
patched_emc.handle_producer_message(test_producer, malformed_message)
2022-12-14 06:22:41 +00:00
assert log_has_re(invalid_msg, caplog)
caplog.clear()
2022-09-10 19:44:27 +00:00
malformed_message = {
"type": "analyzed_df",
"data": {
"key": "BTC/USDT",
"df": ohlcv_history,
"la": datetime.now(timezone.utc)
}
}
patched_emc.handle_producer_message(test_producer, malformed_message)
2022-09-10 20:29:15 +00:00
assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
2022-12-14 06:22:41 +00:00
assert log_has_re(invalid_msg, caplog)
caplog.clear()
# Empty dataframe
malformed_message = {
"type": "analyzed_df",
"data": {
"key": ("BTC/USDT", "5m", "spot"),
"df": ohlcv_history.loc[ohlcv_history['open'] < 0],
"la": datetime.now(timezone.utc)
}
}
patched_emc.handle_producer_message(test_producer, malformed_message)
assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
assert not log_has_re(invalid_msg, caplog)
assert log_has_re(r"Received Empty Dataframe for.+", caplog)
2022-09-10 20:29:15 +00:00
caplog.clear()
malformed_message = {"some": "stuff"}
patched_emc.handle_producer_message(test_producer, malformed_message)
2022-12-14 06:22:41 +00:00
assert log_has_re(invalid_msg, caplog)
caplog.clear()
2022-09-10 20:29:15 +00:00
caplog.clear()
malformed_message = {"type": "whitelist", "data": None}
patched_emc.handle_producer_message(test_producer, malformed_message)
2022-09-12 20:12:39 +00:00
assert log_has_re(r"Empty message .+", caplog)
2022-09-11 05:57:17 +00:00
async def test_emc_create_connection_success(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": _TEST_WS_HOST,
"port": _TEST_WS_PORT,
2022-09-11 05:57:17 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 60,
"ping_timeout": 60,
"sleep_timeout": 60
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
test_producer = default_conf['external_message_consumer']['producers'][0]
lock = asyncio.Lock()
2022-09-13 19:36:21 +00:00
emc._running = True
2022-09-11 05:57:17 +00:00
async def eat(websocket):
2022-09-13 19:36:21 +00:00
emc._running = False
2022-09-11 05:57:17 +00:00
try:
async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT):
await emc._create_connection(test_producer, lock)
2022-12-02 19:28:27 +00:00
assert log_has_re(r"Connected to channel.+", caplog)
2022-09-11 05:57:17 +00:00
finally:
emc.shutdown()
2022-10-03 04:39:20 +00:00
@pytest.mark.parametrize('host,port', [
(_TEST_WS_HOST, -1),
("10000.1241..2121/", _TEST_WS_PORT),
])
async def test_emc_create_connection_invalid_url(default_conf, caplog, mocker, host, port):
2022-09-23 19:37:46 +00:00
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-10-03 04:39:20 +00:00
"host": host,
"port": port,
2022-09-23 19:37:46 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 60,
"ping_timeout": 60,
"sleep_timeout": 60
}
})
dp = DataProvider(default_conf, None, None, None)
2022-10-02 06:12:03 +00:00
# Handle start explicitly to avoid messing with threading in tests
2022-12-02 19:28:27 +00:00
mocker.patch("freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start")
mocker.patch("freqtrade.rpc.api_server.ws.channel.create_channel")
2022-09-23 19:37:46 +00:00
emc = ExternalMessageConsumer(default_conf, dp)
try:
2022-10-02 06:37:37 +00:00
emc._running = True
2022-10-02 06:12:03 +00:00
await emc._create_connection(emc.producers[0], asyncio.Lock())
2022-09-23 19:37:46 +00:00
assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
finally:
emc.shutdown()
2022-09-11 05:57:17 +00:00
async def test_emc_create_connection_error(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": _TEST_WS_HOST,
"port": _TEST_WS_PORT,
2022-09-11 05:57:17 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 60,
"ping_timeout": 60,
"sleep_timeout": 60
}
})
# Test unexpected error
mocker.patch('websockets.connect', side_effect=RuntimeError)
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
try:
2022-09-15 23:54:31 +00:00
await asyncio.sleep(0.01)
2022-09-11 05:57:17 +00:00
assert log_has("Unexpected error has occurred:", caplog)
finally:
emc.shutdown()
2022-09-11 06:50:18 +00:00
async def test_emc_receive_messages_valid(default_conf, caplog, mocker):
2022-09-29 05:19:16 +00:00
caplog.set_level(logging.DEBUG)
2022-09-11 06:50:18 +00:00
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": _TEST_WS_HOST,
"port": _TEST_WS_PORT,
2022-09-11 06:50:18 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 1,
"ping_timeout": 60,
"sleep_time": 60
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
2022-09-11 05:57:17 +00:00
2022-09-11 06:50:18 +00:00
class TestChannel:
async def recv(self, *args, **kwargs):
return {"type": "whitelist", "data": ["BTC/USDT"]}
async def ping(self, *args, **kwargs):
return asyncio.Future()
try:
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
await emc._receive_messages(TestChannel(), test_producer, lock)
assert log_has_re(r"Received message of type `whitelist`.+", caplog)
finally:
emc.shutdown()
async def test_emc_receive_messages_invalid(default_conf, caplog, mocker):
2022-09-11 05:57:17 +00:00
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": _TEST_WS_HOST,
"port": _TEST_WS_PORT,
2022-09-11 05:57:17 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
2022-09-11 06:50:18 +00:00
"wait_timeout": 1,
2022-09-11 05:57:17 +00:00
"ping_timeout": 60,
2022-09-11 06:50:18 +00:00
"sleep_time": 60
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
class TestChannel:
async def recv(self, *args, **kwargs):
return {"type": ["BTC/USDT"]}
async def ping(self, *args, **kwargs):
return asyncio.Future()
try:
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
await emc._receive_messages(TestChannel(), test_producer, lock)
assert log_has_re(r"Invalid message from.+", caplog)
finally:
emc.shutdown()
async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": _TEST_WS_HOST,
"port": _TEST_WS_PORT,
2022-09-11 06:50:18 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
2022-09-23 19:37:46 +00:00
"wait_timeout": 0.1,
2022-09-11 06:50:18 +00:00
"ping_timeout": 1,
"sleep_time": 1
2022-09-11 05:57:17 +00:00
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
2022-09-11 06:50:18 +00:00
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
class TestChannel:
async def recv(self, *args, **kwargs):
2022-09-23 19:37:46 +00:00
await asyncio.sleep(0.2)
2022-09-11 06:50:18 +00:00
async def ping(self, *args, **kwargs):
return asyncio.Future()
try:
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
2022-12-02 19:28:27 +00:00
with pytest.raises(asyncio.TimeoutError):
await emc._receive_messages(TestChannel(), test_producer, lock)
2022-09-11 06:50:18 +00:00
assert log_has_re(r"Ping error.+", caplog)
finally:
emc.shutdown()
async def test_emc_receive_messages_handle_error(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
2022-09-15 23:54:31 +00:00
"host": _TEST_WS_HOST,
"port": _TEST_WS_PORT,
2022-09-11 06:50:18 +00:00
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 1,
"ping_timeout": 1,
"sleep_time": 1
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
emc.handle_producer_message = MagicMock(side_effect=Exception)
2022-09-11 05:57:17 +00:00
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
2022-09-11 06:50:18 +00:00
class TestChannel:
async def recv(self, *args, **kwargs):
return {"type": "whitelist", "data": ["BTC/USDT"]}
async def ping(self, *args, **kwargs):
return asyncio.Future()
2022-09-11 05:57:17 +00:00
try:
2022-09-11 06:50:18 +00:00
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
await emc._receive_messages(TestChannel(), test_producer, lock)
2022-09-11 05:57:17 +00:00
2022-09-11 06:50:18 +00:00
assert log_has_re(r"Error handling producer message.+", caplog)
2022-09-11 05:57:17 +00:00
finally:
emc.shutdown()