From dae3b3d86adde0f6c7065ce1d083d9ceac62e5ef Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sat, 3 Sep 2022 13:24:14 -0600 Subject: [PATCH 1/3] support shutting down freqai --- freqtrade/freqai/freqai_interface.py | 37 +++++++++++++++++++--------- freqtrade/freqtradebot.py | 2 ++ freqtrade/strategy/interface.py | 10 ++++++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index eac5cba07..a9c21fb65 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -7,7 +7,7 @@ import time from abc import ABC, abstractmethod from pathlib import Path from threading import Lock -from typing import Any, Dict, Tuple +from typing import Any, Dict, List, Tuple import numpy as np import pandas as pd @@ -27,13 +27,6 @@ pd.options.mode.chained_assignment = None logger = logging.getLogger(__name__) -def threaded(fn): - def wrapper(*args, **kwargs): - threading.Thread(target=fn, args=args, kwargs=kwargs).start() - - return wrapper - - class IFreqaiModel(ABC): """ Class containing all tools for training and prediction in the strategy. @@ -94,6 +87,9 @@ class IFreqaiModel(ABC): self.begin_time_train: float = 0 self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe']) + self._threads: List[threading.Thread] = [] + self._stop_event = threading.Event() + def assert_config(self, config: Dict[str, Any]) -> None: if not config.get("freqai", {}): @@ -147,15 +143,34 @@ class IFreqaiModel(ABC): self.model = None self.dk = None - @threaded - def start_scanning(self, strategy: IStrategy) -> None: + def shutdown(self): + """ + Cleans up threads on Shutdown, set stop event. Join threads to wait + for current training iteration. + """ + logger.info("Stopping FreqAI") + self._stop_event.set() + + logger.info("Waiting on Training iteration") + for _thread in self._threads: + _thread.join() + + def start_scanning(self, *args, **kwargs) -> None: + """ + Start `self._start_scanning` in a separate thread + """ + _thread = threading.Thread(target=self._start_scanning, args=args, kwargs=kwargs) + self._threads.append(_thread) + _thread.start() + + def _start_scanning(self, strategy: IStrategy) -> None: """ Function designed to constantly scan pairs for retraining on a separate thread (intracandle) to improve model youth. This function is agnostic to data preparation/collection/storage, it simply trains on what ever data is available in the self.dd. :param strategy: IStrategy = The user defined strategy class """ - while 1: + while not self._stop_event.is_set(): time.sleep(1) for pair in self.config.get("exchange", {}).get("pair_whitelist"): diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 37bc6dfed..883417219 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -148,6 +148,8 @@ class FreqtradeBot(LoggingMixin): self.check_for_open_trades() + self.strategy.ft_bot_cleanup() + self.rpc.cleanup() Trade.commit() self.exchange.close() diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index c7ea95bda..562a16b18 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -168,6 +168,10 @@ class IStrategy(ABC, HyperStrategyMixin): raise OperationalException( 'freqAI is not enabled. ' 'Please enable it in your config to use this strategy.') + + def cleanup(self, *args, **kwargs): + pass + self.freqai = DummyClass() # type: ignore def ft_bot_start(self, **kwargs) -> None: @@ -181,6 +185,12 @@ class IStrategy(ABC, HyperStrategyMixin): self.ft_load_hyper_params(self.config.get('runmode') == RunMode.HYPEROPT) + def ft_bot_cleanup(self) -> None: + """ + Clean up FreqAI and child threads + """ + self.freqai.shutdown() + @abstractmethod def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame: """ From 3b5e5fc57b7a11402c76f13bddede071bd9eaf9e Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Sat, 3 Sep 2022 14:10:23 -0600 Subject: [PATCH 2/3] fix method name in dummy class --- freqtrade/strategy/interface.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 562a16b18..70cc7fdb3 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -169,7 +169,7 @@ class IStrategy(ABC, HyperStrategyMixin): 'freqAI is not enabled. ' 'Please enable it in your config to use this strategy.') - def cleanup(self, *args, **kwargs): + def shutdown(self, *args, **kwargs): pass self.freqai = DummyClass() # type: ignore From ec76214d023a6c53ffab0af8d43bc5b72b1d66af Mon Sep 17 00:00:00 2001 From: robcaulk Date: Sun, 4 Sep 2022 15:56:07 +0200 Subject: [PATCH 3/3] backup historical predictions pickle and load the backup in case of corruption --- freqtrade/freqai/data_drawer.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py index b6a1a15d7..9eeabef8f 100644 --- a/freqtrade/freqai/data_drawer.py +++ b/freqtrade/freqai/data_drawer.py @@ -76,6 +76,8 @@ class FreqaiDataDrawer: self.full_path / f"follower_dictionary-{self.follower_name}.json" ) self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl") + self.historic_predictions_bkp_path = Path( + self.full_path / "historic_predictions.backup.pkl") self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json") self.follow_mode = follow_mode if follow_mode: @@ -118,13 +120,21 @@ class FreqaiDataDrawer: """ exists = self.historic_predictions_path.is_file() if exists: - with open(self.historic_predictions_path, "rb") as fp: - self.historic_predictions = cloudpickle.load(fp) - logger.info( - f"Found existing historic predictions at {self.full_path}, but beware " - "that statistics may be inaccurate if the bot has been offline for " - "an extended period of time." - ) + try: + with open(self.historic_predictions_path, "rb") as fp: + self.historic_predictions = cloudpickle.load(fp) + logger.info( + f"Found existing historic predictions at {self.full_path}, but beware " + "that statistics may be inaccurate if the bot has been offline for " + "an extended period of time." + ) + except EOFError: + logger.warning( + 'Historical prediction file was corrupted. Trying to load backup file.') + with open(self.historic_predictions_bkp_path, "rb") as fp: + self.historic_predictions = cloudpickle.load(fp) + logger.warning('FreqAI successfully loaded the backup historical predictions file.') + elif not self.follow_mode: logger.info("Could not find existing historic_predictions, starting from scratch") else: @@ -142,6 +152,9 @@ class FreqaiDataDrawer: with open(self.historic_predictions_path, "wb") as fp: cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL) + # create a backup + shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path) + def save_drawer_to_disk(self): """ Save data drawer full of all pair model metadata in present model folder.