Improve timing for worker throttling
This commit is contained in:
parent
e969479525
commit
f93b6eec63
@ -14,6 +14,7 @@ from freqtrade.configuration import Configuration
|
|||||||
from freqtrade.constants import PROCESS_THROTTLE_SECS, RETRY_TIMEOUT, Config
|
from freqtrade.constants import PROCESS_THROTTLE_SECS, RETRY_TIMEOUT, Config
|
||||||
from freqtrade.enums import State
|
from freqtrade.enums import State
|
||||||
from freqtrade.exceptions import OperationalException, TemporaryError
|
from freqtrade.exceptions import OperationalException, TemporaryError
|
||||||
|
from freqtrade.exchange import timeframe_to_next_date
|
||||||
from freqtrade.freqtradebot import FreqtradeBot
|
from freqtrade.freqtradebot import FreqtradeBot
|
||||||
|
|
||||||
|
|
||||||
@ -111,7 +112,10 @@ class Worker:
|
|||||||
# Ping systemd watchdog before throttling
|
# Ping systemd watchdog before throttling
|
||||||
self._notify("WATCHDOG=1\nSTATUS=State: RUNNING.")
|
self._notify("WATCHDOG=1\nSTATUS=State: RUNNING.")
|
||||||
|
|
||||||
self._throttle(func=self._process_running, throttle_secs=self._throttle_secs)
|
# Use an offset of 1s to ensure a new candle has been issued
|
||||||
|
self._throttle(func=self._process_running, throttle_secs=self._throttle_secs,
|
||||||
|
timeframe=self._config['timeframe'] if self._config else None,
|
||||||
|
timeframe_offset=1)
|
||||||
|
|
||||||
if self._heartbeat_interval:
|
if self._heartbeat_interval:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
@ -126,19 +130,32 @@ class Worker:
|
|||||||
|
|
||||||
return state
|
return state
|
||||||
|
|
||||||
def _throttle(self, func: Callable[..., Any], throttle_secs: float, *args, **kwargs) -> Any:
|
def _throttle(self, func: Callable[..., Any], throttle_secs: float,
|
||||||
|
timeframe: Optional[str] = None, timeframe_offset: float = 1.0,
|
||||||
|
*args, **kwargs) -> Any:
|
||||||
"""
|
"""
|
||||||
Throttles the given callable that it
|
Throttles the given callable that it
|
||||||
takes at least `min_secs` to finish execution.
|
takes at least `min_secs` to finish execution.
|
||||||
:param func: Any callable
|
:param func: Any callable
|
||||||
:param throttle_secs: throttling interation execution time limit in seconds
|
:param throttle_secs: throttling interation execution time limit in seconds
|
||||||
|
:param timeframe: ensure iteration is executed at the beginning of the next candle.
|
||||||
|
:param timeframe_offset: offset in seconds to apply to the next candle time.
|
||||||
:return: Any (result of execution of func)
|
:return: Any (result of execution of func)
|
||||||
"""
|
"""
|
||||||
last_throttle_start_time = time.time()
|
last_throttle_start_time = time.time()
|
||||||
logger.debug("========================================")
|
logger.debug("========================================")
|
||||||
result = func(*args, **kwargs)
|
result = func(*args, **kwargs)
|
||||||
time_passed = time.time() - last_throttle_start_time
|
time_passed = time.time() - last_throttle_start_time
|
||||||
sleep_duration = max(throttle_secs - time_passed, 0.0)
|
sleep_duration = throttle_secs - time_passed
|
||||||
|
if timeframe:
|
||||||
|
next_tf = timeframe_to_next_date(timeframe)
|
||||||
|
# Maximum throttling should be until new candle arrives
|
||||||
|
# Offset of 0.2s is added to ensure a new candle has been issued.
|
||||||
|
next_tf_with_offset = next_tf.timestamp() - time.time() + timeframe_offset
|
||||||
|
sleep_duration = min(sleep_duration, next_tf_with_offset)
|
||||||
|
sleep_duration = max(sleep_duration, 0.0)
|
||||||
|
# next_iter = datetime.now(timezone.utc) + timedelta(seconds=sleep_duration)
|
||||||
|
|
||||||
logger.debug(f"Throttling with '{func.__name__}()': sleep for {sleep_duration:.2f} s, "
|
logger.debug(f"Throttling with '{func.__name__}()': sleep for {sleep_duration:.2f} s, "
|
||||||
f"last iteration took {time_passed:.2f} s.")
|
f"last iteration took {time_passed:.2f} s.")
|
||||||
self._sleep(sleep_duration)
|
self._sleep(sleep_duration)
|
||||||
|
@ -103,6 +103,16 @@ def test_throttle_sleep_time(mocker, default_conf, caplog) -> None:
|
|||||||
assert sleep_mock.call_count == 1
|
assert sleep_mock.call_count == 1
|
||||||
assert 394.8 < sleep_mock.call_args[0][0] < 395.1
|
assert 394.8 < sleep_mock.call_args[0][0] < 395.1
|
||||||
|
|
||||||
|
t.move_to("2022-09-01 05:01:00 +00:00")
|
||||||
|
|
||||||
|
sleep_mock.reset_mock()
|
||||||
|
# Throttle for more than 5m (1 timeframe)
|
||||||
|
assert worker._throttle(throttled_func, throttle_secs=400, timeframe='5m',
|
||||||
|
timeframe_offset=0.4, x=5) == 42
|
||||||
|
assert sleep_mock.call_count == 1
|
||||||
|
# 300 (5m) - 60 (1m - see set time above) - 5 (duration of throttled_func) = 235
|
||||||
|
assert 235 < sleep_mock.call_args[0][0] < 235.4
|
||||||
|
|
||||||
|
|
||||||
def test_throttle_with_assets(mocker, default_conf) -> None:
|
def test_throttle_with_assets(mocker, default_conf) -> None:
|
||||||
def throttled_func(nb_assets=-1):
|
def throttled_func(nb_assets=-1):
|
||||||
|
Loading…
Reference in New Issue
Block a user