diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py index 4ba55a0ec..7a0ddd7ff 100644 --- a/freqtrade/freqai/data_drawer.py +++ b/freqtrade/freqai/data_drawer.py @@ -38,8 +38,7 @@ class FreqaiDataDrawer: """ Class aimed at holding all pair models/info in memory for better inferencing/retrainig/saving /loading to/from disk. - This object remains persistent throughout live/dry, unlike FreqaiDataKitchen, which is - reinstantiated for each coin. + This object remains persistent throughout live/dry. Record of contribution: FreqAI was developed by a group of individuals who all contributed specific skillsets to the @@ -56,7 +55,7 @@ class FreqaiDataDrawer: Beta testing and bug reporting: @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm - Juha Nykänen @suikula, Wagner Costa @wagnercosta + Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert """ def __init__(self, full_path: Path, config: dict, follow_mode: bool = False): @@ -85,6 +84,7 @@ class FreqaiDataDrawer: self.load_historic_predictions_from_disk() self.training_queue: Dict[str, int] = {} self.history_lock = threading.Lock() + self.save_lock = threading.Lock() self.old_DBSCAN_eps: Dict[str, float] = {} self.empty_pair_dict: pair_info = { "model_filename": "", "trained_timestamp": 0, @@ -145,9 +145,10 @@ class FreqaiDataDrawer: """ Save data drawer full of all pair model metadata in present model folder. """ - with open(self.pair_dictionary_path, 'w') as fp: - rapidjson.dump(self.pair_dict, fp, default=self.np_encoder, - number_mode=rapidjson.NM_NATIVE) + with self.save_lock: + with open(self.pair_dictionary_path, 'w') as fp: + rapidjson.dump(self.pair_dict, fp, default=self.np_encoder, + number_mode=rapidjson.NM_NATIVE) def save_follower_dict_to_disk(self): """ diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py index 730ef2767..cf0af6c08 100644 --- a/freqtrade/freqai/data_kitchen.py +++ b/freqtrade/freqai/data_kitchen.py @@ -3,7 +3,6 @@ import datetime import logging import shutil from pathlib import Path -from threading import Lock from typing import Any, Dict, List, Tuple import numpy as np @@ -35,6 +34,9 @@ class FreqaiDataKitchen: Class designed to analyze data for a single pair. Employed by the IFreqaiModel class. Functionalities include holding, saving, loading, and analyzing the data. + This object is not persistent, it is reinstantiated for each coin, each time the coin + model needs to be inferenced or trained. + Record of contribution: FreqAI was developed by a group of individuals who all contributed specific skillsets to the project. @@ -50,7 +52,7 @@ class FreqaiDataKitchen: Beta testing and bug reporting: @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm - Juha Nykänen @suikula, Wagner Costa @wagnercosta + Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert """ def __init__( @@ -71,7 +73,6 @@ class FreqaiDataKitchen: self.model_filename: str = "" self.live = live self.pair = pair - self.analysis_lock = Lock() self.svm_model: linear_model.SGDOneClassSVM = None self.keras: bool = self.freqai_config.get("keras", False) @@ -964,7 +965,6 @@ class FreqaiDataKitchen: for tf in tfs: if tf == tfs[-1]: sgi = True # doing this last allows user to use all tf raw prices in labels - with self.analysis_lock: dataframe = strategy.populate_any_indicators( pair, dataframe.copy(), @@ -972,7 +972,6 @@ class FreqaiDataKitchen: informative=base_dataframes[tf], set_generalized_indicators=sgi ) - with self.analysis_lock: if pairs: for i in pairs: if pair in i: diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index e48de1fb8..902604989 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -12,7 +12,7 @@ import numpy as np import pandas as pd from numpy.typing import NDArray from pandas import DataFrame - +from threading import Lock from freqtrade.configuration import TimeRange from freqtrade.enums import RunMode from freqtrade.exceptions import OperationalException @@ -52,7 +52,7 @@ class IFreqaiModel(ABC): Beta testing and bug reporting: @bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm - Juha Nykänen @suikula, Wagner Costa @wagnercosta + Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert """ def __init__(self, config: Dict[str, Any]) -> None: @@ -81,6 +81,7 @@ class IFreqaiModel(ABC): self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist")) self.last_trade_database_summary: DataFrame = {} self.current_trade_database_summary: DataFrame = {} + self.analysis_lock = Lock() def assert_config(self, config: Dict[str, Any]) -> None: @@ -114,10 +115,10 @@ class IFreqaiModel(ABC): elif not self.follow_mode: self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"]) logger.info(f"Training {len(self.dk.training_timeranges)} timeranges") - - dataframe = self.dk.use_strategy_to_populate_indicators( - strategy, prediction_dataframe=dataframe, pair=metadata["pair"] - ) + with self.analysis_lock: + dataframe = self.dk.use_strategy_to_populate_indicators( + strategy, prediction_dataframe=dataframe, pair=metadata["pair"] + ) dk = self.start_backtesting(dataframe, metadata, self.dk) dataframe = dk.remove_features_from_df(dk.return_dataframe) @@ -289,9 +290,10 @@ class IFreqaiModel(ABC): # load the model and associated data into the data kitchen self.model = self.dd.load_data(metadata["pair"], dk) - dataframe = self.dk.use_strategy_to_populate_indicators( - strategy, prediction_dataframe=dataframe, pair=metadata["pair"] - ) + with self.analysis_lock: + dataframe = self.dk.use_strategy_to_populate_indicators( + strategy, prediction_dataframe=dataframe, pair=metadata["pair"] + ) if not self.model: logger.warning( @@ -485,9 +487,10 @@ class IFreqaiModel(ABC): data_load_timerange, pair, dk ) - unfiltered_dataframe = dk.use_strategy_to_populate_indicators( - strategy, corr_dataframes, base_dataframes, pair - ) + with self.analysis_lock: + unfiltered_dataframe = dk.use_strategy_to_populate_indicators( + strategy, corr_dataframes, base_dataframes, pair + ) unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe) @@ -500,7 +503,7 @@ class IFreqaiModel(ABC): dk.set_new_model_names(pair, new_trained_timerange) self.dd.pair_dict[pair]["first"] = False if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning: - with dk.analysis_lock: + with self.analysis_lock: self.dd.pair_to_end_of_training_queue(pair) self.dd.save_data(model, pair, dk)