Merge remote-tracking branch 'origin/develop' into dev-merge-rl
This commit is contained in:
@@ -45,7 +45,6 @@ def test_rpc_trade_status(default_conf, ticker, fee, mocker) -> None:
|
||||
|
||||
freqtradebot.enter_positions()
|
||||
trades = Trade.get_open_trades()
|
||||
trades[0].open_order_id = None
|
||||
freqtradebot.exit_positions(trades)
|
||||
|
||||
results = rpc._rpc_trade_status()
|
||||
@@ -1031,6 +1030,7 @@ def test_rpc_count(mocker, default_conf, ticker, fee) -> None:
|
||||
|
||||
def test_rpc_force_entry(mocker, default_conf, ticker, fee, limit_buy_order_open) -> None:
|
||||
default_conf['force_entry_enable'] = True
|
||||
default_conf['max_open_trades'] = 0
|
||||
mocker.patch('freqtrade.rpc.telegram.Telegram', MagicMock())
|
||||
buy_mm = MagicMock(return_value=limit_buy_order_open)
|
||||
mocker.patch.multiple(
|
||||
@@ -1045,6 +1045,10 @@ def test_rpc_force_entry(mocker, default_conf, ticker, fee, limit_buy_order_open
|
||||
patch_get_signal(freqtradebot)
|
||||
rpc = RPC(freqtradebot)
|
||||
pair = 'ETH/BTC'
|
||||
with pytest.raises(RPCException, match='Maximum number of trades is reached.'):
|
||||
rpc._rpc_force_entry(pair, None)
|
||||
freqtradebot.config['max_open_trades'] = 5
|
||||
|
||||
trade = rpc._rpc_force_entry(pair, None)
|
||||
assert isinstance(trade, Trade)
|
||||
assert trade.pair == pair
|
||||
|
@@ -3,6 +3,8 @@ Unit test file for rpc/api_server.py
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from unittest.mock import ANY, MagicMock, PropertyMock
|
||||
@@ -10,7 +12,7 @@ from unittest.mock import ANY, MagicMock, PropertyMock
|
||||
import pandas as pd
|
||||
import pytest
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, WebSocketDisconnect
|
||||
from fastapi.exceptions import HTTPException
|
||||
from fastapi.testclient import TestClient
|
||||
from requests.auth import _basic_auth_str
|
||||
@@ -31,6 +33,7 @@ from tests.conftest import (CURRENT_TEST_STRATEGY, create_mock_trades, get_mock_
|
||||
BASE_URI = "/api/v1"
|
||||
_TEST_USER = "FreqTrader"
|
||||
_TEST_PASS = "SuperSecurePassword1!"
|
||||
_TEST_WS_TOKEN = "secret_Ws_t0ken"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -44,17 +47,21 @@ def botclient(default_conf, mocker):
|
||||
"CORS_origins": ['http://example.com'],
|
||||
"username": _TEST_USER,
|
||||
"password": _TEST_PASS,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}})
|
||||
|
||||
ftbot = get_patched_freqtradebot(mocker, default_conf)
|
||||
rpc = RPC(ftbot)
|
||||
mocker.patch('freqtrade.rpc.api_server.ApiServer.start_api', MagicMock())
|
||||
apiserver = None
|
||||
try:
|
||||
apiserver = ApiServer(default_conf)
|
||||
apiserver.add_rpc_handler(rpc)
|
||||
yield ftbot, TestClient(apiserver.app)
|
||||
# Cleanup ... ?
|
||||
finally:
|
||||
if apiserver:
|
||||
apiserver.cleanup()
|
||||
ApiServer.shutdown()
|
||||
|
||||
|
||||
@@ -154,6 +161,25 @@ def test_api_auth():
|
||||
get_user_from_token(b'not_a_token', 'secret1234')
|
||||
|
||||
|
||||
def test_api_ws_auth(botclient):
|
||||
ftbot, client = botclient
|
||||
def url(token): return f"/api/v1/message/ws?token={token}"
|
||||
|
||||
bad_token = "bad-ws_token"
|
||||
with pytest.raises(WebSocketDisconnect):
|
||||
with client.websocket_connect(url(bad_token)) as websocket:
|
||||
websocket.receive()
|
||||
|
||||
good_token = _TEST_WS_TOKEN
|
||||
with client.websocket_connect(url(good_token)) as websocket:
|
||||
pass
|
||||
|
||||
jwt_secret = ftbot.config['api_server'].get('jwt_secret_key', 'super-secret')
|
||||
jwt_token = create_token({'identity': {'u': 'Freqtrade'}}, jwt_secret)
|
||||
with client.websocket_connect(url(jwt_token)) as websocket:
|
||||
pass
|
||||
|
||||
|
||||
def test_api_unauthorized(botclient):
|
||||
ftbot, client = botclient
|
||||
rc = client.get(f"{BASE_URI}/ping")
|
||||
@@ -261,6 +287,7 @@ def test_api__init__(default_conf, mocker):
|
||||
with pytest.raises(OperationalException, match="RPC Handler already attached."):
|
||||
apiserver.add_rpc_handler(RPC(get_patched_freqtradebot(mocker, default_conf)))
|
||||
|
||||
apiserver.cleanup()
|
||||
ApiServer.shutdown()
|
||||
|
||||
|
||||
@@ -388,6 +415,7 @@ def test_api_run(default_conf, mocker, caplog):
|
||||
MagicMock(side_effect=Exception))
|
||||
apiserver.start_api()
|
||||
assert log_has("Api server failed to start.", caplog)
|
||||
apiserver.cleanup()
|
||||
ApiServer.shutdown()
|
||||
|
||||
|
||||
@@ -410,6 +438,7 @@ def test_api_cleanup(default_conf, mocker, caplog):
|
||||
apiserver.cleanup()
|
||||
assert apiserver._server.cleanup.call_count == 1
|
||||
assert log_has("Stopping API Server", caplog)
|
||||
assert log_has("Stopping API Server background tasks", caplog)
|
||||
ApiServer.shutdown()
|
||||
|
||||
|
||||
@@ -1449,6 +1478,10 @@ def test_api_strategy(botclient):
|
||||
rc = client_get(client, f"{BASE_URI}/strategy/NoStrat")
|
||||
assert_response(rc, 404)
|
||||
|
||||
# Disallow base64 strategies
|
||||
rc = client_get(client, f"{BASE_URI}/strategy/xx:cHJpbnQoImhlbGxvIHdvcmxkIik=")
|
||||
assert_response(rc, 500)
|
||||
|
||||
|
||||
def test_list_available_pairs(botclient):
|
||||
ftbot, client = botclient
|
||||
@@ -1622,6 +1655,11 @@ def test_api_backtesting(botclient, mocker, fee, caplog, tmpdir):
|
||||
assert not result['running']
|
||||
assert result['status_msg'] == 'Backtest reset'
|
||||
|
||||
# Disallow base64 strategies
|
||||
data['strategy'] = "xx:cHJpbnQoImhlbGxvIHdvcmxkIik="
|
||||
rc = client_post(client, f"{BASE_URI}/backtest", data=json.dumps(data))
|
||||
assert_response(rc, 500)
|
||||
|
||||
|
||||
def test_api_backtest_history(botclient, mocker, testdatadir):
|
||||
ftbot, client = botclient
|
||||
@@ -1664,3 +1702,93 @@ def test_health(botclient):
|
||||
ret = rc.json()
|
||||
assert ret['last_process_ts'] == 0
|
||||
assert ret['last_process'] == '1970-01-01T00:00:00+00:00'
|
||||
|
||||
|
||||
def test_api_ws_subscribe(botclient, mocker):
|
||||
ftbot, client = botclient
|
||||
ws_url = f"/api/v1/message/ws?token={_TEST_WS_TOKEN}"
|
||||
|
||||
sub_mock = mocker.patch('freqtrade.rpc.api_server.ws.WebSocketChannel.set_subscriptions')
|
||||
|
||||
with client.websocket_connect(ws_url) as ws:
|
||||
ws.send_json({'type': 'subscribe', 'data': ['whitelist']})
|
||||
|
||||
# Check call count is now 1 as we sent a valid subscribe request
|
||||
assert sub_mock.call_count == 1
|
||||
|
||||
with client.websocket_connect(ws_url) as ws:
|
||||
ws.send_json({'type': 'subscribe', 'data': 'whitelist'})
|
||||
|
||||
# Call count hasn't changed as the subscribe request was invalid
|
||||
assert sub_mock.call_count == 1
|
||||
|
||||
|
||||
def test_api_ws_requests(botclient, mocker, caplog):
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
ftbot, client = botclient
|
||||
ws_url = f"/api/v1/message/ws?token={_TEST_WS_TOKEN}"
|
||||
|
||||
# Test whitelist request
|
||||
with client.websocket_connect(ws_url) as ws:
|
||||
ws.send_json({"type": "whitelist", "data": None})
|
||||
response = ws.receive_json()
|
||||
|
||||
assert log_has_re(r"Request of type whitelist from.+", caplog)
|
||||
assert response['type'] == "whitelist"
|
||||
|
||||
# Test analyzed_df request
|
||||
with client.websocket_connect(ws_url) as ws:
|
||||
ws.send_json({"type": "analyzed_df", "data": {}})
|
||||
response = ws.receive_json()
|
||||
|
||||
assert log_has_re(r"Request of type analyzed_df from.+", caplog)
|
||||
assert response['type'] == "analyzed_df"
|
||||
|
||||
caplog.clear()
|
||||
# Test analyzed_df request with data
|
||||
with client.websocket_connect(ws_url) as ws:
|
||||
ws.send_json({"type": "analyzed_df", "data": {"limit": 100}})
|
||||
response = ws.receive_json()
|
||||
|
||||
assert log_has_re(r"Request of type analyzed_df from.+", caplog)
|
||||
assert response['type'] == "analyzed_df"
|
||||
|
||||
|
||||
def test_api_ws_send_msg(default_conf, mocker, caplog):
|
||||
try:
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
default_conf.update({"api_server": {"enabled": True,
|
||||
"listen_ip_address": "127.0.0.1",
|
||||
"listen_port": 8080,
|
||||
"CORS_origins": ['http://example.com'],
|
||||
"username": _TEST_USER,
|
||||
"password": _TEST_PASS,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}})
|
||||
mocker.patch('freqtrade.rpc.telegram.Updater')
|
||||
mocker.patch('freqtrade.rpc.api_server.ApiServer.start_api')
|
||||
apiserver = ApiServer(default_conf)
|
||||
apiserver.add_rpc_handler(RPC(get_patched_freqtradebot(mocker, default_conf)))
|
||||
apiserver.start_message_queue()
|
||||
# Give the queue thread time to start
|
||||
time.sleep(0.2)
|
||||
|
||||
# Test message_queue coro receives the message
|
||||
test_message = {"type": "status", "data": "test"}
|
||||
apiserver.send_msg(test_message)
|
||||
time.sleep(0.1) # Not sure how else to wait for the coro to receive the data
|
||||
assert log_has("Found message of type: status", caplog)
|
||||
|
||||
# Test if exception logged when error occurs in sending
|
||||
mocker.patch('freqtrade.rpc.api_server.ws.channel.ChannelManager.broadcast',
|
||||
side_effect=Exception)
|
||||
|
||||
apiserver.send_msg(test_message)
|
||||
time.sleep(0.1) # Not sure how else to wait for the coro to receive the data
|
||||
assert log_has_re(r"Exception happened in background task.*", caplog)
|
||||
|
||||
finally:
|
||||
apiserver.cleanup()
|
||||
ApiServer.shutdown()
|
||||
|
467
tests/rpc/test_rpc_emc.py
Normal file
467
tests/rpc/test_rpc_emc.py
Normal file
@@ -0,0 +1,467 @@
|
||||
"""
|
||||
Unit test file for rpc/external_message_consumer.py
|
||||
"""
|
||||
import asyncio
|
||||
import functools
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
import websockets
|
||||
|
||||
from freqtrade.data.dataprovider import DataProvider
|
||||
from freqtrade.rpc.external_message_consumer import ExternalMessageConsumer
|
||||
from tests.conftest import log_has, log_has_re, log_has_when
|
||||
|
||||
|
||||
_TEST_WS_TOKEN = "secret_Ws_t0ken"
|
||||
_TEST_WS_HOST = "127.0.0.1"
|
||||
_TEST_WS_PORT = 9989
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_emc(default_conf, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": "null",
|
||||
"port": 9891,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
]
|
||||
}
|
||||
})
|
||||
dataprovider = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dataprovider)
|
||||
|
||||
try:
|
||||
yield emc
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
def test_emc_start(patched_emc, caplog):
|
||||
# Test if the message was printed
|
||||
assert log_has_when("Starting ExternalMessageConsumer", caplog, "setup")
|
||||
# Test if the thread and loop objects were created
|
||||
assert patched_emc._thread and patched_emc._loop
|
||||
|
||||
# Test we call start again nothing happens
|
||||
prev_thread = patched_emc._thread
|
||||
patched_emc.start()
|
||||
assert prev_thread == patched_emc._thread
|
||||
|
||||
|
||||
def test_emc_shutdown(patched_emc, caplog):
|
||||
patched_emc.shutdown()
|
||||
|
||||
assert log_has("Stopping ExternalMessageConsumer", caplog)
|
||||
# Test the loop has stopped
|
||||
assert patched_emc._loop is None
|
||||
# Test if the thread has stopped
|
||||
assert patched_emc._thread is None
|
||||
|
||||
caplog.clear()
|
||||
patched_emc.shutdown()
|
||||
|
||||
# Test func didn't run again as it was called once already
|
||||
assert not log_has("Stopping ExternalMessageConsumer", caplog)
|
||||
|
||||
|
||||
def test_emc_init(patched_emc):
|
||||
# Test the settings were set correctly
|
||||
assert patched_emc.initial_candle_limit <= 1500
|
||||
assert patched_emc.wait_timeout > 0
|
||||
assert patched_emc.sleep_time > 0
|
||||
|
||||
|
||||
# Parametrize this?
|
||||
def test_emc_handle_producer_message(patched_emc, caplog, ohlcv_history):
|
||||
test_producer = {"name": "test", "url": "ws://test", "ws_token": "test"}
|
||||
producer_name = test_producer['name']
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
# Test handle whitelist message
|
||||
whitelist_message = {"type": "whitelist", "data": ["BTC/USDT"]}
|
||||
patched_emc.handle_producer_message(test_producer, whitelist_message)
|
||||
|
||||
assert log_has(f"Received message of type `whitelist` from `{producer_name}`", caplog)
|
||||
assert log_has(
|
||||
f"Consumed message from `{producer_name}` of type `RPCMessageType.WHITELIST`", caplog)
|
||||
|
||||
# Test handle analyzed_df message
|
||||
df_message = {
|
||||
"type": "analyzed_df",
|
||||
"data": {
|
||||
"key": ("BTC/USDT", "5m", "spot"),
|
||||
"df": ohlcv_history,
|
||||
"la": datetime.now(timezone.utc)
|
||||
}
|
||||
}
|
||||
patched_emc.handle_producer_message(test_producer, df_message)
|
||||
|
||||
assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
|
||||
assert log_has(
|
||||
f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`", caplog)
|
||||
|
||||
# Test unhandled message
|
||||
unhandled_message = {"type": "status", "data": "RUNNING"}
|
||||
patched_emc.handle_producer_message(test_producer, unhandled_message)
|
||||
|
||||
assert log_has_re(r"Received unhandled message\: .*", caplog)
|
||||
|
||||
# Test malformed messages
|
||||
caplog.clear()
|
||||
malformed_message = {"type": "whitelist", "data": {"pair": "BTC/USDT"}}
|
||||
patched_emc.handle_producer_message(test_producer, malformed_message)
|
||||
|
||||
assert log_has_re(r"Invalid message .+", caplog)
|
||||
|
||||
malformed_message = {
|
||||
"type": "analyzed_df",
|
||||
"data": {
|
||||
"key": "BTC/USDT",
|
||||
"df": ohlcv_history,
|
||||
"la": datetime.now(timezone.utc)
|
||||
}
|
||||
}
|
||||
patched_emc.handle_producer_message(test_producer, malformed_message)
|
||||
|
||||
assert log_has(f"Received message of type `analyzed_df` from `{producer_name}`", caplog)
|
||||
assert log_has_re(r"Invalid message .+", caplog)
|
||||
|
||||
caplog.clear()
|
||||
malformed_message = {"some": "stuff"}
|
||||
patched_emc.handle_producer_message(test_producer, malformed_message)
|
||||
|
||||
assert log_has_re(r"Invalid message .+", caplog)
|
||||
|
||||
caplog.clear()
|
||||
malformed_message = {"type": "whitelist", "data": None}
|
||||
patched_emc.handle_producer_message(test_producer, malformed_message)
|
||||
|
||||
assert log_has_re(r"Empty message .+", caplog)
|
||||
|
||||
|
||||
async def test_emc_create_connection_success(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 60,
|
||||
"ping_timeout": 60,
|
||||
"sleep_timeout": 60
|
||||
}
|
||||
})
|
||||
|
||||
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
|
||||
MagicMock())
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
test_producer = default_conf['external_message_consumer']['producers'][0]
|
||||
lock = asyncio.Lock()
|
||||
|
||||
emc._running = True
|
||||
|
||||
async def eat(websocket):
|
||||
emc._running = False
|
||||
|
||||
try:
|
||||
async with websockets.serve(eat, _TEST_WS_HOST, _TEST_WS_PORT):
|
||||
await emc._create_connection(test_producer, lock)
|
||||
|
||||
assert log_has_re(r"Producer connection success.+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_create_connection_invalid_port(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": -1,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 60,
|
||||
"ping_timeout": 60,
|
||||
"sleep_timeout": 60
|
||||
}
|
||||
})
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(0.01)
|
||||
assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_create_connection_invalid_host(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": "10000.1241..2121/",
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 60,
|
||||
"ping_timeout": 60,
|
||||
"sleep_timeout": 60
|
||||
}
|
||||
})
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(0.01)
|
||||
assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_create_connection_error(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 60,
|
||||
"ping_timeout": 60,
|
||||
"sleep_timeout": 60
|
||||
}
|
||||
})
|
||||
|
||||
# Test unexpected error
|
||||
mocker.patch('websockets.connect', side_effect=RuntimeError)
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(0.01)
|
||||
assert log_has("Unexpected error has occurred:", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_receive_messages_valid(default_conf, caplog, mocker):
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 1,
|
||||
"ping_timeout": 60,
|
||||
"sleep_time": 60
|
||||
}
|
||||
})
|
||||
|
||||
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
|
||||
MagicMock())
|
||||
|
||||
lock = asyncio.Lock()
|
||||
test_producer = default_conf['external_message_consumer']['producers'][0]
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
def change_running(emc): emc._running = not emc._running
|
||||
|
||||
class TestChannel:
|
||||
async def recv(self, *args, **kwargs):
|
||||
return {"type": "whitelist", "data": ["BTC/USDT"]}
|
||||
|
||||
async def ping(self, *args, **kwargs):
|
||||
return asyncio.Future()
|
||||
|
||||
try:
|
||||
change_running(emc)
|
||||
loop.call_soon(functools.partial(change_running, emc=emc))
|
||||
await emc._receive_messages(TestChannel(), test_producer, lock)
|
||||
|
||||
assert log_has_re(r"Received message of type `whitelist`.+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_receive_messages_invalid(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 1,
|
||||
"ping_timeout": 60,
|
||||
"sleep_time": 60
|
||||
}
|
||||
})
|
||||
|
||||
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
|
||||
MagicMock())
|
||||
|
||||
lock = asyncio.Lock()
|
||||
test_producer = default_conf['external_message_consumer']['producers'][0]
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
def change_running(emc): emc._running = not emc._running
|
||||
|
||||
class TestChannel:
|
||||
async def recv(self, *args, **kwargs):
|
||||
return {"type": ["BTC/USDT"]}
|
||||
|
||||
async def ping(self, *args, **kwargs):
|
||||
return asyncio.Future()
|
||||
|
||||
try:
|
||||
change_running(emc)
|
||||
loop.call_soon(functools.partial(change_running, emc=emc))
|
||||
await emc._receive_messages(TestChannel(), test_producer, lock)
|
||||
|
||||
assert log_has_re(r"Invalid message from.+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 0.1,
|
||||
"ping_timeout": 1,
|
||||
"sleep_time": 1
|
||||
}
|
||||
})
|
||||
|
||||
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
|
||||
MagicMock())
|
||||
|
||||
lock = asyncio.Lock()
|
||||
test_producer = default_conf['external_message_consumer']['producers'][0]
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
def change_running(emc): emc._running = not emc._running
|
||||
|
||||
class TestChannel:
|
||||
async def recv(self, *args, **kwargs):
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
async def ping(self, *args, **kwargs):
|
||||
return asyncio.Future()
|
||||
|
||||
try:
|
||||
change_running(emc)
|
||||
loop.call_soon(functools.partial(change_running, emc=emc))
|
||||
await emc._receive_messages(TestChannel(), test_producer, lock)
|
||||
|
||||
assert log_has_re(r"Ping error.+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
||||
|
||||
|
||||
async def test_emc_receive_messages_handle_error(default_conf, caplog, mocker):
|
||||
default_conf.update({
|
||||
"external_message_consumer": {
|
||||
"enabled": True,
|
||||
"producers": [
|
||||
{
|
||||
"name": "default",
|
||||
"host": _TEST_WS_HOST,
|
||||
"port": _TEST_WS_PORT,
|
||||
"ws_token": _TEST_WS_TOKEN
|
||||
}
|
||||
],
|
||||
"wait_timeout": 1,
|
||||
"ping_timeout": 1,
|
||||
"sleep_time": 1
|
||||
}
|
||||
})
|
||||
|
||||
mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
|
||||
MagicMock())
|
||||
|
||||
lock = asyncio.Lock()
|
||||
test_producer = default_conf['external_message_consumer']['producers'][0]
|
||||
|
||||
dp = DataProvider(default_conf, None, None, None)
|
||||
emc = ExternalMessageConsumer(default_conf, dp)
|
||||
|
||||
emc.handle_producer_message = MagicMock(side_effect=Exception)
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
def change_running(emc): emc._running = not emc._running
|
||||
|
||||
class TestChannel:
|
||||
async def recv(self, *args, **kwargs):
|
||||
return {"type": "whitelist", "data": ["BTC/USDT"]}
|
||||
|
||||
async def ping(self, *args, **kwargs):
|
||||
return asyncio.Future()
|
||||
|
||||
try:
|
||||
change_running(emc)
|
||||
loop.call_soon(functools.partial(change_running, emc=emc))
|
||||
await emc._receive_messages(TestChannel(), test_producer, lock)
|
||||
|
||||
assert log_has_re(r"Error handling producer message.+", caplog)
|
||||
finally:
|
||||
emc.shutdown()
|
@@ -365,6 +365,14 @@ def test_exception_send_msg(default_conf, mocker, caplog):
|
||||
with pytest.raises(NotImplementedError):
|
||||
webhook.send_msg(msg)
|
||||
|
||||
# Test no failure for not implemented but known messagetypes
|
||||
for e in RPCMessageType:
|
||||
msg = {
|
||||
'type': e,
|
||||
'status': 'whatever'
|
||||
}
|
||||
webhook.send_msg(msg)
|
||||
|
||||
|
||||
def test__send_msg(default_conf, mocker, caplog):
|
||||
default_conf["webhook"] = get_webhook_dict()
|
||||
|
Reference in New Issue
Block a user