From f93b6eec63a9c0dbfd3d34c6592e86f51d9a56c8 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 23 Oct 2022 14:56:51 +0200 Subject: [PATCH] Improve timing for worker throttling --- freqtrade/worker.py | 23 ++++++++++++++++++++--- tests/test_worker.py | 10 ++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/freqtrade/worker.py b/freqtrade/worker.py index a8bb931f8..a407de0d7 100755 --- a/freqtrade/worker.py +++ b/freqtrade/worker.py @@ -14,6 +14,7 @@ from freqtrade.configuration import Configuration from freqtrade.constants import PROCESS_THROTTLE_SECS, RETRY_TIMEOUT, Config from freqtrade.enums import State from freqtrade.exceptions import OperationalException, TemporaryError +from freqtrade.exchange import timeframe_to_next_date from freqtrade.freqtradebot import FreqtradeBot @@ -111,7 +112,10 @@ class Worker: # Ping systemd watchdog before throttling self._notify("WATCHDOG=1\nSTATUS=State: RUNNING.") - self._throttle(func=self._process_running, throttle_secs=self._throttle_secs) + # Use an offset of 1s to ensure a new candle has been issued + self._throttle(func=self._process_running, throttle_secs=self._throttle_secs, + timeframe=self._config['timeframe'] if self._config else None, + timeframe_offset=1) if self._heartbeat_interval: now = time.time() @@ -126,19 +130,32 @@ class Worker: return state - def _throttle(self, func: Callable[..., Any], throttle_secs: float, *args, **kwargs) -> Any: + def _throttle(self, func: Callable[..., Any], throttle_secs: float, + timeframe: Optional[str] = None, timeframe_offset: float = 1.0, + *args, **kwargs) -> Any: """ Throttles the given callable that it takes at least `min_secs` to finish execution. :param func: Any callable :param throttle_secs: throttling interation execution time limit in seconds + :param timeframe: ensure iteration is executed at the beginning of the next candle. + :param timeframe_offset: offset in seconds to apply to the next candle time. :return: Any (result of execution of func) """ last_throttle_start_time = time.time() logger.debug("========================================") result = func(*args, **kwargs) time_passed = time.time() - last_throttle_start_time - sleep_duration = max(throttle_secs - time_passed, 0.0) + sleep_duration = throttle_secs - time_passed + if timeframe: + next_tf = timeframe_to_next_date(timeframe) + # Maximum throttling should be until new candle arrives + # Offset of 0.2s is added to ensure a new candle has been issued. + next_tf_with_offset = next_tf.timestamp() - time.time() + timeframe_offset + sleep_duration = min(sleep_duration, next_tf_with_offset) + sleep_duration = max(sleep_duration, 0.0) + # next_iter = datetime.now(timezone.utc) + timedelta(seconds=sleep_duration) + logger.debug(f"Throttling with '{func.__name__}()': sleep for {sleep_duration:.2f} s, " f"last iteration took {time_passed:.2f} s.") self._sleep(sleep_duration) diff --git a/tests/test_worker.py b/tests/test_worker.py index 8e17f7bc1..2237e7f4c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -103,6 +103,16 @@ def test_throttle_sleep_time(mocker, default_conf, caplog) -> None: assert sleep_mock.call_count == 1 assert 394.8 < sleep_mock.call_args[0][0] < 395.1 + t.move_to("2022-09-01 05:01:00 +00:00") + + sleep_mock.reset_mock() + # Throttle for more than 5m (1 timeframe) + assert worker._throttle(throttled_func, throttle_secs=400, timeframe='5m', + timeframe_offset=0.4, x=5) == 42 + assert sleep_mock.call_count == 1 + # 300 (5m) - 60 (1m - see set time above) - 5 (duration of throttled_func) = 235 + assert 235 < sleep_mock.call_args[0][0] < 235.4 + def test_throttle_with_assets(mocker, default_conf) -> None: def throttled_func(nb_assets=-1):