diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index eac5cba07..a9c21fb65 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -7,7 +7,7 @@ import time from abc import ABC, abstractmethod from pathlib import Path from threading import Lock -from typing import Any, Dict, Tuple +from typing import Any, Dict, List, Tuple import numpy as np import pandas as pd @@ -27,13 +27,6 @@ pd.options.mode.chained_assignment = None logger = logging.getLogger(__name__) -def threaded(fn): - def wrapper(*args, **kwargs): - threading.Thread(target=fn, args=args, kwargs=kwargs).start() - - return wrapper - - class IFreqaiModel(ABC): """ Class containing all tools for training and prediction in the strategy. @@ -94,6 +87,9 @@ class IFreqaiModel(ABC): self.begin_time_train: float = 0 self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe']) + self._threads: List[threading.Thread] = [] + self._stop_event = threading.Event() + def assert_config(self, config: Dict[str, Any]) -> None: if not config.get("freqai", {}): @@ -147,15 +143,34 @@ class IFreqaiModel(ABC): self.model = None self.dk = None - @threaded - def start_scanning(self, strategy: IStrategy) -> None: + def shutdown(self): + """ + Cleans up threads on Shutdown, set stop event. Join threads to wait + for current training iteration. + """ + logger.info("Stopping FreqAI") + self._stop_event.set() + + logger.info("Waiting on Training iteration") + for _thread in self._threads: + _thread.join() + + def start_scanning(self, *args, **kwargs) -> None: + """ + Start `self._start_scanning` in a separate thread + """ + _thread = threading.Thread(target=self._start_scanning, args=args, kwargs=kwargs) + self._threads.append(_thread) + _thread.start() + + def _start_scanning(self, strategy: IStrategy) -> None: """ Function designed to constantly scan pairs for retraining on a separate thread (intracandle) to improve model youth. This function is agnostic to data preparation/collection/storage, it simply trains on what ever data is available in the self.dd. :param strategy: IStrategy = The user defined strategy class """ - while 1: + while not self._stop_event.is_set(): time.sleep(1) for pair in self.config.get("exchange", {}).get("pair_whitelist"): diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 37bc6dfed..883417219 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -148,6 +148,8 @@ class FreqtradeBot(LoggingMixin): self.check_for_open_trades() + self.strategy.ft_bot_cleanup() + self.rpc.cleanup() Trade.commit() self.exchange.close() diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index c7ea95bda..562a16b18 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -168,6 +168,10 @@ class IStrategy(ABC, HyperStrategyMixin): raise OperationalException( 'freqAI is not enabled. ' 'Please enable it in your config to use this strategy.') + + def cleanup(self, *args, **kwargs): + pass + self.freqai = DummyClass() # type: ignore def ft_bot_start(self, **kwargs) -> None: @@ -181,6 +185,12 @@ class IStrategy(ABC, HyperStrategyMixin): self.ft_load_hyper_params(self.config.get('runmode') == RunMode.HYPEROPT) + def ft_bot_cleanup(self) -> None: + """ + Clean up FreqAI and child threads + """ + self.freqai.shutdown() + @abstractmethod def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """