Merge pull request #7135 from freqtrade/rpc/sendmsg

Strategy allow rpc messages
This commit is contained in:
Matthias
2022-07-30 16:15:00 +02:00
committed by GitHub
12 changed files with 123 additions and 4 deletions

View File

@@ -1,6 +1,7 @@
# pragma pylint: disable=missing-docstring, C0103
import logging
import time
from collections import deque
from unittest.mock import MagicMock
from freqtrade.enums import RPCMessageType
@@ -81,9 +82,25 @@ def test_send_msg_telegram_disabled(mocker, default_conf, caplog) -> None:
assert telegram_mock.call_count == 0
def test_process_msg_queue(mocker, default_conf, caplog) -> None:
telegram_mock = mocker.patch('freqtrade.rpc.telegram.Telegram.send_msg')
mocker.patch('freqtrade.rpc.telegram.Telegram._init')
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
rpc_manager = RPCManager(freqtradebot)
queue = deque()
queue.append('Test message')
queue.append('Test message 2')
rpc_manager.process_msg_queue(queue)
assert log_has("Sending rpc message: {'type': strategy_msg, 'msg': 'Test message'}", caplog)
assert log_has("Sending rpc message: {'type': strategy_msg, 'msg': 'Test message 2'}", caplog)
assert telegram_mock.call_count == 2
def test_send_msg_telegram_enabled(mocker, default_conf, caplog) -> None:
telegram_mock = mocker.patch('freqtrade.rpc.telegram.Telegram.send_msg', MagicMock())
mocker.patch('freqtrade.rpc.telegram.Telegram._init', MagicMock())
telegram_mock = mocker.patch('freqtrade.rpc.telegram.Telegram.send_msg')
mocker.patch('freqtrade.rpc.telegram.Telegram._init')
freqtradebot = get_patched_freqtradebot(mocker, default_conf)
rpc_manager = RPCManager(freqtradebot)

View File

@@ -1994,6 +1994,16 @@ def test_startup_notification(default_conf, mocker) -> None:
assert msg_mock.call_args[0][0] == '*Custom:* `Hello World`'
def test_send_msg_strategy_msg_notification(default_conf, mocker) -> None:
telegram, _, msg_mock = get_telegram_testobject(mocker, default_conf)
telegram.send_msg({
'type': RPCMessageType.STRATEGY_MSG,
'msg': 'hello world, Test msg'
})
assert msg_mock.call_args[0][0] == 'hello world, Test msg'
def test_send_msg_unknown_type(default_conf, mocker) -> None:
telegram, _, _ = get_telegram_testobject(mocker, default_conf)
with pytest.raises(NotImplementedError, match=r'Unknown message type: None'):