Merge pull request #2959 from hroff-1902/throttling

Better throttling
This commit is contained in:
Matthias 2020-02-24 06:54:52 +01:00 committed by GitHub
commit 83959f0e56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 67 deletions

View File

@ -6,7 +6,6 @@ import logging
import traceback import traceback
from datetime import datetime from datetime import datetime
from math import isclose from math import isclose
from os import getpid
from threading import Lock from threading import Lock
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
@ -52,10 +51,6 @@ class FreqtradeBot:
# Init objects # Init objects
self.config = config self.config = config
self._heartbeat_msg = 0
self.heartbeat_interval = self.config.get('internals', {}).get('heartbeat_interval', 60)
self.strategy: IStrategy = StrategyResolver.load_strategy(self.config) self.strategy: IStrategy = StrategyResolver.load_strategy(self.config)
# Check config consistency here since strategies can set certain options # Check config consistency here since strategies can set certain options
@ -159,11 +154,6 @@ class FreqtradeBot:
self.check_handle_timedout() self.check_handle_timedout()
Trade.session.flush() Trade.session.flush()
if (self.heartbeat_interval
and (arrow.utcnow().timestamp - self._heartbeat_msg > self.heartbeat_interval)):
logger.info(f"Bot heartbeat. PID={getpid()}")
self._heartbeat_msg = arrow.utcnow().timestamp
def _refresh_whitelist(self, trades: List[Trade] = []) -> List[str]: def _refresh_whitelist(self, trades: List[Trade] = []) -> List[str]:
""" """
Refresh whitelist from pairlist or edge and extend it with trades. Refresh whitelist from pairlist or edge and extend it with trades.

View File

@ -4,6 +4,7 @@ Main Freqtrade worker class.
import logging import logging
import time import time
import traceback import traceback
from os import getpid
from typing import Any, Callable, Dict, Optional from typing import Any, Callable, Dict, Optional
import sdnotify import sdnotify
@ -26,12 +27,15 @@ class Worker:
""" """
Init all variables and objects the bot needs to work Init all variables and objects the bot needs to work
""" """
logger.info('Starting worker %s', __version__) logger.info(f"Starting worker {__version__}")
self._args = args self._args = args
self._config = config self._config = config
self._init(False) self._init(False)
self.last_throttle_start_time: float = 0
self._heartbeat_msg: float = 0
# Tell systemd that we completed initialization phase # Tell systemd that we completed initialization phase
if self._sd_notify: if self._sd_notify:
logger.debug("sd_notify: READY=1") logger.debug("sd_notify: READY=1")
@ -48,10 +52,10 @@ class Worker:
# Init the instance of the bot # Init the instance of the bot
self.freqtrade = FreqtradeBot(self._config) self.freqtrade = FreqtradeBot(self._config)
self._throttle_secs = self._config.get('internals', {}).get( internals_config = self._config.get('internals', {})
'process_throttle_secs', self._throttle_secs = internals_config.get('process_throttle_secs',
constants.PROCESS_THROTTLE_SECS constants.PROCESS_THROTTLE_SECS)
) self._heartbeat_interval = internals_config.get('heartbeat_interval', 60)
self._sd_notify = sdnotify.SystemdNotifier() if \ self._sd_notify = sdnotify.SystemdNotifier() if \
self._config.get('internals', {}).get('sd_notify', False) else None self._config.get('internals', {}).get('sd_notify', False) else None
@ -63,31 +67,33 @@ class Worker:
if state == State.RELOAD_CONF: if state == State.RELOAD_CONF:
self._reconfigure() self._reconfigure()
def _worker(self, old_state: Optional[State], throttle_secs: Optional[float] = None) -> State: def _worker(self, old_state: Optional[State]) -> State:
""" """
Trading routine that must be run at each loop The main routine that runs each throttling iteration and handles the states.
:param old_state: the previous service state from the previous call :param old_state: the previous service state from the previous call
:return: current service state :return: current service state
""" """
state = self.freqtrade.state state = self.freqtrade.state
if throttle_secs is None:
throttle_secs = self._throttle_secs
# Log state transition # Log state transition
if state != old_state: if state != old_state:
self.freqtrade.notify_status(f'{state.name.lower()}') self.freqtrade.notify_status(f'{state.name.lower()}')
logger.info('Changing state to: %s', state.name) logger.info(f"Changing state to: {state.name}")
if state == State.RUNNING: if state == State.RUNNING:
self.freqtrade.startup() self.freqtrade.startup()
# Reset heartbeat timestamp to log the heartbeat message at
# first throttling iteration when the state changes
self._heartbeat_msg = 0
if state == State.STOPPED: if state == State.STOPPED:
# Ping systemd watchdog before sleeping in the stopped state # Ping systemd watchdog before sleeping in the stopped state
if self._sd_notify: if self._sd_notify:
logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.") logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: STOPPED.")
self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.") self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: STOPPED.")
time.sleep(throttle_secs) self._throttle(func=self._process_stopped, throttle_secs=self._throttle_secs)
elif state == State.RUNNING: elif state == State.RUNNING:
# Ping systemd watchdog before throttling # Ping systemd watchdog before throttling
@ -95,28 +101,40 @@ class Worker:
logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.") logger.debug("sd_notify: WATCHDOG=1\\nSTATUS=State: RUNNING.")
self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.") self._sd_notify.notify("WATCHDOG=1\nSTATUS=State: RUNNING.")
self._throttle(func=self._process, min_secs=throttle_secs) self._throttle(func=self._process_running, throttle_secs=self._throttle_secs)
if self._heartbeat_interval:
now = time.time()
if (now - self._heartbeat_msg) > self._heartbeat_interval:
logger.info(f"Bot heartbeat. PID={getpid()}, "
f"version='{__version__}', state='{state.name}'")
self._heartbeat_msg = now
return state return state
def _throttle(self, func: Callable[..., Any], min_secs: float, *args, **kwargs) -> Any: def _throttle(self, func: Callable[..., Any], throttle_secs: float, *args, **kwargs) -> Any:
""" """
Throttles the given callable that it Throttles the given callable that it
takes at least `min_secs` to finish execution. takes at least `min_secs` to finish execution.
:param func: Any callable :param func: Any callable
:param min_secs: minimum execution time in seconds :param throttle_secs: throttling interation execution time limit in seconds
:return: Any :return: Any (result of execution of func)
""" """
start = time.time() self.last_throttle_start_time = time.time()
logger.debug("========================================")
result = func(*args, **kwargs) result = func(*args, **kwargs)
end = time.time() time_passed = time.time() - self.last_throttle_start_time
duration = max(min_secs - (end - start), 0.0) sleep_duration = max(throttle_secs - time_passed, 0.0)
logger.debug('Throttling %s for %.2f seconds', func.__name__, duration) logger.debug(f"Throttling with '{func.__name__}()': sleep for {sleep_duration:.2f} s, "
time.sleep(duration) f"last iteration took {time_passed:.2f} s.")
time.sleep(sleep_duration)
return result return result
def _process(self) -> None: def _process_stopped(self) -> None:
logger.debug("========================================") # Maybe do here something in the future...
pass
def _process_running(self) -> None:
try: try:
self.freqtrade.process() self.freqtrade.process()
except TemporaryError as error: except TemporaryError as error:

View File

@ -782,7 +782,7 @@ def test_process_exchange_failures(default_conf, ticker, mocker) -> None:
worker = Worker(args=None, config=default_conf) worker = Worker(args=None, config=default_conf)
patch_get_signal(worker.freqtrade) patch_get_signal(worker.freqtrade)
worker._process() worker._process_running()
assert sleep_mock.has_calls() assert sleep_mock.has_calls()
@ -799,7 +799,7 @@ def test_process_operational_exception(default_conf, ticker, mocker) -> None:
assert worker.freqtrade.state == State.RUNNING assert worker.freqtrade.state == State.RUNNING
worker._process() worker._process_running()
assert worker.freqtrade.state == State.STOPPED assert worker.freqtrade.state == State.STOPPED
assert 'OperationalException' in msg_mock.call_args_list[-1][0][0]['status'] assert 'OperationalException' in msg_mock.call_args_list[-1][0][0]['status']
@ -3665,30 +3665,6 @@ def test_startup_trade_reinit(default_conf, edge_conf, mocker):
assert reinit_mock.call_count == 0 assert reinit_mock.call_count == 0
def test_process_i_am_alive(default_conf, mocker, caplog):
patch_RPCManager(mocker)
patch_exchange(mocker)
mocker.patch('freqtrade.exchange.Exchange.exchange_has', MagicMock(return_value=True))
ftbot = get_patched_freqtradebot(mocker, default_conf)
message = r"Bot heartbeat\. PID=.*"
ftbot.process()
assert log_has_re(message, caplog)
assert ftbot._heartbeat_msg != 0
caplog.clear()
# Message is not shown before interval is up
ftbot.process()
assert not log_has_re(message, caplog)
caplog.clear()
# Set clock - 70 seconds
ftbot._heartbeat_msg -= 70
ftbot.process()
assert log_has_re(message, caplog)
@pytest.mark.usefixtures("init_persistence") @pytest.mark.usefixtures("init_persistence")
def test_sync_wallet_dry_run(mocker, default_conf, ticker, fee, limit_buy_order, caplog): def test_sync_wallet_dry_run(mocker, default_conf, ticker, fee, limit_buy_order, caplog):
default_conf['dry_run'] = True default_conf['dry_run'] = True

View File

@ -5,7 +5,7 @@ from unittest.mock import MagicMock, PropertyMock
from freqtrade.data.dataprovider import DataProvider from freqtrade.data.dataprovider import DataProvider
from freqtrade.state import State from freqtrade.state import State
from freqtrade.worker import Worker from freqtrade.worker import Worker
from tests.conftest import get_patched_worker, log_has from tests.conftest import get_patched_worker, log_has, log_has_re
def test_worker_state(mocker, default_conf, markets) -> None: def test_worker_state(mocker, default_conf, markets) -> None:
@ -38,15 +38,13 @@ def test_worker_running(mocker, default_conf, caplog) -> None:
def test_worker_stopped(mocker, default_conf, caplog) -> None: def test_worker_stopped(mocker, default_conf, caplog) -> None:
mock_throttle = MagicMock() mock_throttle = MagicMock()
mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle) mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle)
mock_sleep = mocker.patch('time.sleep', return_value=None)
worker = get_patched_worker(mocker, default_conf) worker = get_patched_worker(mocker, default_conf)
worker.freqtrade.state = State.STOPPED worker.freqtrade.state = State.STOPPED
state = worker._worker(old_state=State.RUNNING) state = worker._worker(old_state=State.RUNNING)
assert state is State.STOPPED assert state is State.STOPPED
assert log_has('Changing state to: STOPPED', caplog) assert log_has('Changing state to: STOPPED', caplog)
assert mock_throttle.call_count == 0 assert mock_throttle.call_count == 1
assert mock_sleep.call_count == 1
def test_throttle(mocker, default_conf, caplog) -> None: def test_throttle(mocker, default_conf, caplog) -> None:
@ -57,14 +55,14 @@ def test_throttle(mocker, default_conf, caplog) -> None:
worker = get_patched_worker(mocker, default_conf) worker = get_patched_worker(mocker, default_conf)
start = time.time() start = time.time()
result = worker._throttle(throttled_func, min_secs=0.1) result = worker._throttle(throttled_func, throttle_secs=0.1)
end = time.time() end = time.time()
assert result == 42 assert result == 42
assert end - start > 0.1 assert end - start > 0.1
assert log_has('Throttling throttled_func for 0.10 seconds', caplog) assert log_has_re(r"Throttling with 'throttled_func\(\)': sleep for 0\.10 s.*", caplog)
result = worker._throttle(throttled_func, min_secs=-1) result = worker._throttle(throttled_func, throttle_secs=-1)
assert result == 42 assert result == 42
@ -74,8 +72,54 @@ def test_throttle_with_assets(mocker, default_conf) -> None:
worker = get_patched_worker(mocker, default_conf) worker = get_patched_worker(mocker, default_conf)
result = worker._throttle(throttled_func, min_secs=0.1, nb_assets=666) result = worker._throttle(throttled_func, throttle_secs=0.1, nb_assets=666)
assert result == 666 assert result == 666
result = worker._throttle(throttled_func, min_secs=0.1) result = worker._throttle(throttled_func, throttle_secs=0.1)
assert result == -1 assert result == -1
def test_worker_heartbeat_running(default_conf, mocker, caplog):
message = r"Bot heartbeat\. PID=.*state='RUNNING'"
mock_throttle = MagicMock()
mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle)
worker = get_patched_worker(mocker, default_conf)
worker.freqtrade.state = State.RUNNING
worker._worker(old_state=State.STOPPED)
assert log_has_re(message, caplog)
caplog.clear()
# Message is not shown before interval is up
worker._worker(old_state=State.RUNNING)
assert not log_has_re(message, caplog)
caplog.clear()
# Set clock - 70 seconds
worker._heartbeat_msg -= 70
worker._worker(old_state=State.RUNNING)
assert log_has_re(message, caplog)
def test_worker_heartbeat_stopped(default_conf, mocker, caplog):
message = r"Bot heartbeat\. PID=.*state='STOPPED'"
mock_throttle = MagicMock()
mocker.patch('freqtrade.worker.Worker._throttle', mock_throttle)
worker = get_patched_worker(mocker, default_conf)
worker.freqtrade.state = State.STOPPED
worker._worker(old_state=State.RUNNING)
assert log_has_re(message, caplog)
caplog.clear()
# Message is not shown before interval is up
worker._worker(old_state=State.STOPPED)
assert not log_has_re(message, caplog)
caplog.clear()
# Set clock - 70 seconds
worker._heartbeat_msg -= 70
worker._worker(old_state=State.STOPPED)
assert log_has_re(message, caplog)