Add tests for dataprovider send-message methods
This commit is contained in:
parent
7bac054668
commit
0adfa4d9ef
@ -311,3 +311,22 @@ def test_no_exchange_mode(default_conf):
|
||||
|
||||
with pytest.raises(OperationalException, match=message):
|
||||
dp.available_pairs()
|
||||
|
||||
|
||||
def test_dp_send_msg(default_conf):
|
||||
|
||||
default_conf["runmode"] = RunMode.DRY_RUN
|
||||
|
||||
default_conf["timeframe"] = '1h'
|
||||
dp = DataProvider(default_conf, None)
|
||||
msg = 'Test message'
|
||||
dp.send_msg(msg)
|
||||
|
||||
assert msg in dp._msg_queue
|
||||
dp._msg_queue.pop()
|
||||
assert msg not in dp._msg_queue
|
||||
# Message is not resent due to caching
|
||||
dp.send_msg(msg)
|
||||
assert msg not in dp._msg_queue
|
||||
dp.send_msg(msg, always_send=True)
|
||||
assert msg in dp._msg_queue
|
||||
|
@ -1,6 +1,7 @@
|
||||
# pragma pylint: disable=missing-docstring, C0103
|
||||
import logging
|
||||
import time
|
||||
from collections import deque
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from freqtrade.enums import RPCMessageType
|
||||
@ -81,9 +82,25 @@ def test_send_msg_telegram_disabled(mocker, default_conf, caplog) -> None:
|
||||
assert telegram_mock.call_count == 0
|
||||
|
||||
|
||||
def test_process_msg_queue(mocker, default_conf, caplog) -> None:
|
||||
telegram_mock = mocker.patch('freqtrade.rpc.telegram.Telegram.send_msg')
|
||||
mocker.patch('freqtrade.rpc.telegram.Telegram._init')
|
||||
|
||||
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
|
||||
rpc_manager = RPCManager(freqtradebot)
|
||||
queue = deque()
|
||||
queue.append('Test message')
|
||||
queue.append('Test message 2')
|
||||
rpc_manager.process_msg_queue(queue)
|
||||
|
||||
assert log_has("Sending rpc message: {'type': strategy_msg, 'msg': 'Test message'}", caplog)
|
||||
assert log_has("Sending rpc message: {'type': strategy_msg, 'msg': 'Test message 2'}", caplog)
|
||||
assert telegram_mock.call_count == 2
|
||||
|
||||
|
||||
def test_send_msg_telegram_enabled(mocker, default_conf, caplog) -> None:
|
||||
telegram_mock = mocker.patch('freqtrade.rpc.telegram.Telegram.send_msg', MagicMock())
|
||||
mocker.patch('freqtrade.rpc.telegram.Telegram._init', MagicMock())
|
||||
telegram_mock = mocker.patch('freqtrade.rpc.telegram.Telegram.send_msg')
|
||||
mocker.patch('freqtrade.rpc.telegram.Telegram._init')
|
||||
|
||||
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
|
||||
rpc_manager = RPCManager(freqtradebot)
|
||||
|
@ -1994,6 +1994,16 @@ def test_startup_notification(default_conf, mocker) -> None:
|
||||
assert msg_mock.call_args[0][0] == '*Custom:* `Hello World`'
|
||||
|
||||
|
||||
def test_send_msg_strategy_msg_notification(default_conf, mocker) -> None:
|
||||
|
||||
telegram, _, msg_mock = get_telegram_testobject(mocker, default_conf)
|
||||
telegram.send_msg({
|
||||
'type': RPCMessageType.STRATEGY_MSG,
|
||||
'msg': 'hello world, Test msg'
|
||||
})
|
||||
assert msg_mock.call_args[0][0] == 'hello world, Test msg'
|
||||
|
||||
|
||||
def test_send_msg_unknown_type(default_conf, mocker) -> None:
|
||||
telegram, _, _ = get_telegram_testobject(mocker, default_conf)
|
||||
with pytest.raises(NotImplementedError, match=r'Unknown message type: None'):
|
||||
|
@ -68,6 +68,12 @@ def test_process_stopped(mocker, default_conf_usdt) -> None:
|
||||
assert coo_mock.call_count == 1
|
||||
|
||||
|
||||
def test_process_calls_sendmsg(mocker, default_conf_usdt) -> None:
|
||||
freqtrade = get_patched_freqtradebot(mocker, default_conf_usdt)
|
||||
freqtrade.process()
|
||||
assert freqtrade.rpc.process_msg_queue.call_count == 1
|
||||
|
||||
|
||||
def test_bot_cleanup(mocker, default_conf_usdt, caplog) -> None:
|
||||
mock_cleanup = mocker.patch('freqtrade.freqtradebot.cleanup_db')
|
||||
coo_mock = mocker.patch('freqtrade.freqtradebot.FreqtradeBot.cancel_all_open_orders')
|
||||
|
Loading…
Reference in New Issue
Block a user