reworked emc tests

This commit is contained in:
Timothy Pogue 2022-09-11 00:50:18 -06:00
parent ed4ba8801f
commit 0a8b7686d6

View File

@ -3,7 +3,6 @@ Unit test file for rpc/external_message_consumer.py
""" """
import asyncio import asyncio
import functools import functools
import json
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
from unittest.mock import MagicMock from unittest.mock import MagicMock
@ -220,10 +219,11 @@ async def test_emc_create_connection_invalid(default_conf, caplog, mocker):
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start', mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock()) MagicMock())
test_producer = default_conf['external_message_consumer']['producers'][0]
lock = asyncio.Lock() lock = asyncio.Lock()
dp = DataProvider(default_conf, None, None, None) dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp) emc = ExternalMessageConsumer(default_conf, dp)
test_producer = default_conf['external_message_consumer']['producers'][0]
try: try:
# Test invalid URL # Test invalid URL
@ -267,13 +267,7 @@ async def test_emc_create_connection_error(default_conf, caplog, mocker):
emc.shutdown() emc.shutdown()
async def test_emc_receive_messages(default_conf, caplog, mocker): async def test_emc_receive_messages_valid(default_conf, caplog, mocker):
"""
Test ExternalMessageConsumer._receive_messages
Instantiates a patched ExternalMessageConsumer, creates a dummy websocket server,
and listens to the generated messages from the server for 1 second, then checks logs
"""
default_conf.update({ default_conf.update({
"external_message_consumer": { "external_message_consumer": {
"enabled": True, "enabled": True,
@ -284,9 +278,9 @@ async def test_emc_receive_messages(default_conf, caplog, mocker):
"ws_token": _TEST_WS_TOKEN "ws_token": _TEST_WS_TOKEN
} }
], ],
"wait_timeout": 60, "wait_timeout": 1,
"ping_timeout": 60, "ping_timeout": 60,
"sleep_timeout": 60 "sleep_time": 60
} }
}) })
@ -299,28 +293,161 @@ async def test_emc_receive_messages(default_conf, caplog, mocker):
dp = DataProvider(default_conf, None, None, None) dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp) emc = ExternalMessageConsumer(default_conf, dp)
# Dummy generator
async def generate_messages(websocket):
try:
for i in range(3):
message = json.dumps({"type": "whitelist", "data": ["BTC/USDT"]})
await websocket.send(message)
await asyncio.sleep(1)
except websockets.exceptions.ConnectionClosedOK:
return
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running def change_running(emc): emc._running = not emc._running
class TestChannel:
async def recv(self, *args, **kwargs):
return {"type": "whitelist", "data": ["BTC/USDT"]}
async def ping(self, *args, **kwargs):
return asyncio.Future()
try: try:
# Start the dummy websocket server change_running(emc)
async with websockets.serve(generate_messages, _TEST_WS_HOST, _TEST_WS_PORT): loop.call_soon(functools.partial(change_running, emc=emc))
# Change running to True, and call change_running in 1 second await emc._receive_messages(TestChannel(), test_producer, lock)
emc._running = True
loop.call_later(1, functools.partial(change_running, emc=emc))
# Create the connection that receives messages
await emc._create_connection(test_producer, lock)
assert log_has_re(r"Received message of type `whitelist`.+", caplog) assert log_has_re(r"Received message of type `whitelist`.+", caplog)
finally: finally:
emc.shutdown() emc.shutdown()
async def test_emc_receive_messages_invalid(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
"url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws",
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 1,
"ping_timeout": 60,
"sleep_time": 60
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
class TestChannel:
async def recv(self, *args, **kwargs):
return {"type": ["BTC/USDT"]}
async def ping(self, *args, **kwargs):
return asyncio.Future()
try:
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
await emc._receive_messages(TestChannel(), test_producer, lock)
assert log_has_re(r"Invalid message from.+", caplog)
finally:
emc.shutdown()
async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
"url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws",
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 1,
"ping_timeout": 1,
"sleep_time": 1
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
class TestChannel:
async def recv(self, *args, **kwargs):
await asyncio.sleep(10)
async def ping(self, *args, **kwargs):
return asyncio.Future()
try:
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
await emc._receive_messages(TestChannel(), test_producer, lock)
assert log_has_re(r"Ping error.+", caplog)
finally:
emc.shutdown()
async def test_emc_receive_messages_handle_error(default_conf, caplog, mocker):
default_conf.update({
"external_message_consumer": {
"enabled": True,
"producers": [
{
"name": "default",
"url": f"ws://{_TEST_WS_HOST}:{_TEST_WS_PORT}/api/v1/message/ws",
"ws_token": _TEST_WS_TOKEN
}
],
"wait_timeout": 1,
"ping_timeout": 1,
"sleep_time": 1
}
})
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
MagicMock())
lock = asyncio.Lock()
test_producer = default_conf['external_message_consumer']['producers'][0]
dp = DataProvider(default_conf, None, None, None)
emc = ExternalMessageConsumer(default_conf, dp)
emc.handle_producer_message = MagicMock(side_effect=Exception)
loop = asyncio.get_event_loop()
def change_running(emc): emc._running = not emc._running
class TestChannel:
async def recv(self, *args, **kwargs):
return {"type": "whitelist", "data": ["BTC/USDT"]}
async def ping(self, *args, **kwargs):
return asyncio.Future()
try:
change_running(emc)
loop.call_soon(functools.partial(change_running, emc=emc))
await emc._receive_messages(TestChannel(), test_producer, lock)
assert log_has_re(r"Error handling producer message.+", caplog)
finally:
emc.shutdown()