From 4d472a0ea178b1619bb1bd42b0a4ad147d04ece8 Mon Sep 17 00:00:00 2001 From: robcaulk Date: Tue, 7 Jun 2022 22:14:01 -0600 Subject: [PATCH] merging datarehaul into scanning branch --- freqtrade/freqai/data_kitchen.py | 27 +++--- freqtrade/freqai/freqai_interface.py | 139 +++++++++++++++------------ freqtrade/strategy/interface.py | 2 +- 3 files changed, 94 insertions(+), 74 deletions(-) diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py index e7ef69903..afc55a1a2 100644 --- a/freqtrade/freqai/data_kitchen.py +++ b/freqtrade/freqai/data_kitchen.py @@ -71,7 +71,7 @@ class FreqaiDataKitchen: self.data_drawer = data_drawer - def set_paths(self, metadata: dict, trained_timestamp: int = None,) -> None: + def set_paths(self, pair: str, trained_timestamp: int = None,) -> None: """ Set the paths to the data for the present coin/botloop :params: @@ -83,7 +83,7 @@ class FreqaiDataKitchen: str(self.freqai_config.get('identifier'))) self.data_path = Path(self.full_path / str("sub-train" + "-" + - metadata['pair'].split("/")[0] + + pair.split("/")[0] + str(trained_timestamp))) return @@ -796,12 +796,12 @@ class FreqaiDataKitchen: return retrain, trained_timerange, data_load_timerange - def set_new_model_names(self, metadata: dict, trained_timerange: TimeRange): + def set_new_model_names(self, pair: str, trained_timerange: TimeRange): - coin, _ = metadata['pair'].split("/") + coin, _ = pair.split("/") # set the new data_path self.data_path = Path(self.full_path / str("sub-train" + "-" + - metadata['pair'].split("/")[0] + + pair.split("/")[0] + str(int(trained_timerange.stopts)))) self.model_filename = "cb_" + coin.lower() + "_" + str(int(trained_timerange.stopts)) @@ -918,7 +918,7 @@ class FreqaiDataKitchen: 'trading_mode', 'spot')) def get_base_and_corr_dataframes(self, timerange: TimeRange, - metadata: dict) -> Tuple[Dict[Any, Any], Dict[Any, Any]]: + pair: str) -> Tuple[Dict[Any, Any], Dict[Any, Any]]: """ Searches through our historic_data in memory and returns the dataframes relevant to the present pair. @@ -927,6 +927,7 @@ class FreqaiDataKitchen: for training according to user defined train_period metadata: dict = strategy furnished pair metadata """ + with self.data_drawer.history_lock: corr_dataframes: Dict[Any, Any] = {} base_dataframes: Dict[Any, Any] = {} @@ -940,7 +941,7 @@ class FreqaiDataKitchen: ) if pairs: for p in pairs: - if metadata['pair'] in p: + if pair in p: continue # dont repeat anything from whitelist if p not in corr_dataframes: corr_dataframes[p] = {} @@ -984,7 +985,7 @@ class FreqaiDataKitchen: def use_strategy_to_populate_indicators(self, strategy: IStrategy, corr_dataframes: dict, base_dataframes: dict, - metadata: dict) -> DataFrame: + pair: str) -> DataFrame: """ Use the user defined strategy for populating indicators during retrain @@ -1003,19 +1004,19 @@ class FreqaiDataKitchen: for tf in self.freqai_config.get("timeframes"): dataframe = strategy.populate_any_indicators( - metadata, - metadata['pair'], + pair, + pair, dataframe.copy(), tf, base_dataframes[tf], - coin=metadata['pair'].split("/")[0] + "-" + coin=pair.split("/")[0] + "-" ) if pairs: for i in pairs: - if metadata['pair'] in i: + if pair in i: continue # dont repeat anything from whitelist dataframe = strategy.populate_any_indicators( - metadata, + pair, i, dataframe.copy(), tf, diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index 1f194860d..dc7b3a750 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -63,6 +63,8 @@ class IFreqaiModel(ABC): self.lock = threading.Lock() self.follow_mode = self.freqai_info.get('follow_mode', False) self.identifier = self.freqai_info.get('identifier', 'no_id_provided') + self.scanning = False + self.ready_to_scan = False def assert_config(self, config: Dict[str, Any]) -> None: @@ -91,17 +93,9 @@ class IFreqaiModel(ABC): # and we keep the flag self.training_on_separate_threaad in the current object to help # determine what the current pair will do if self.live: - if (not self.training_on_separate_thread and - self.data_drawer.pair_dict[metadata['pair']]['priority'] == 1): - - self.dh = FreqaiDataKitchen(self.config, self.data_drawer, - self.live, metadata["pair"]) - dh = self.start_live(dataframe, metadata, strategy, self.dh, trainable=True) - else: - # we will have at max 2 separate instances of the kitchen at once. - self.dh_fg = FreqaiDataKitchen(self.config, self.data_drawer, - self.live, metadata["pair"]) - dh = self.start_live(dataframe, metadata, strategy, self.dh_fg, trainable=False) + self.dh = FreqaiDataKitchen(self.config, self.data_drawer, + self.live, metadata["pair"]) + dh = self.start_live(dataframe, metadata, strategy, self.dh) # For backtesting, each pair enters and then gets trained for each window along the # sliding window defined by "train_period" (training window) and "backtest_period" @@ -114,8 +108,36 @@ class IFreqaiModel(ABC): dh = self.start_backtesting(dataframe, metadata, self.dh) return self.return_values(dataframe, dh) - # return (dh.full_predictions, dh.full_do_predict, - # dh.full_target_mean, dh.full_target_std) + + @threaded + def start_scanning(self, strategy: IStrategy) -> None: + while 1: + for pair in self.config.get('exchange', {}).get('pair_whitelist'): + if self.data_drawer.pair_dict[pair]['priority'] != 1: + continue + dh = FreqaiDataKitchen(self.config, self.data_drawer, + self.live, pair) + + (model_filename, + trained_timestamp, + _, _) = self.data_drawer.get_pair_dict_info(pair) + + file_exists = False + + # dh.set_paths(pair, trained_timestamp) + file_exists = self.model_exists(pair, + dh, + trained_timestamp=trained_timestamp, + model_filename=model_filename) + + (self.retrain, + new_trained_timerange, + data_load_timerange) = dh.check_if_new_training_required(trained_timestamp) + dh.set_paths(pair, new_trained_timerange.stopts) + + if self.retrain or not file_exists: + self.train_model_in_series(new_trained_timerange, pair, + strategy, dh, data_load_timerange) def start_backtesting(self, dataframe: DataFrame, metadata: dict, dh: FreqaiDataKitchen) -> FreqaiDataKitchen: @@ -142,7 +164,7 @@ class IFreqaiModel(ABC): for tr_train, tr_backtest in zip( dh.training_timeranges, dh.backtesting_timeranges ): - (_, _, _, _) = self.data_drawer.get_pair_dict_info(metadata) + (_, _, _, _) = self.data_drawer.get_pair_dict_info(metadata['pair']) gc.collect() dh.data = {} # clean the pair specific data between training window sliding self.training_timerange = tr_train @@ -163,7 +185,7 @@ class IFreqaiModel(ABC): str(int(trained_timestamp.stopts)))) if not self.model_exists(metadata["pair"], dh, trained_timestamp=trained_timestamp.stopts): - self.model = self.train(dataframe_train, metadata, dh) + self.model = self.train(dataframe_train, metadata['pair'], dh) self.data_drawer.pair_dict[metadata['pair']][ 'trained_timestamp'] = trained_timestamp.stopts dh.set_new_model_names(metadata, trained_timestamp) @@ -184,8 +206,7 @@ class IFreqaiModel(ABC): return dh def start_live(self, dataframe: DataFrame, metadata: dict, - strategy: IStrategy, dh: FreqaiDataKitchen, - trainable: bool) -> FreqaiDataKitchen: + strategy: IStrategy, dh: FreqaiDataKitchen) -> FreqaiDataKitchen: """ The main broad execution for dry/live. This function will check if a retraining should be performed, and if so, retrain and reset the model. @@ -203,10 +224,10 @@ class IFreqaiModel(ABC): self.data_drawer.update_follower_metadata() # get the model metadata associated with the current pair - (model_filename, + (_, trained_timestamp, coin_first, - return_null_array) = self.data_drawer.get_pair_dict_info(metadata) + return_null_array) = self.data_drawer.get_pair_dict_info(metadata['pair']) # if the metadata doesnt exist, the follower returns null arrays to strategy if self.follow_mode and return_null_array: @@ -222,20 +243,18 @@ class IFreqaiModel(ABC): # if trainable, check if model needs training, if so compute new timerange, # then save model and metadata. # if not trainable, load existing data - if (trainable or coin_first) and not self.follow_mode: - file_exists = False - - if trained_timestamp != 0: # historical model available - dh.set_paths(metadata, trained_timestamp) - file_exists = self.model_exists(metadata['pair'], - dh, - trained_timestamp=trained_timestamp, - model_filename=model_filename) + if not self.follow_mode: + # if trained_timestamp != 0: # historical model available + # dh.set_paths(metadata['pair'], trained_timestamp) + # # file_exists = self.model_exists(metadata['pair'], + # # dh, + # # trained_timestamp=trained_timestamp, + # # model_filename=model_filename) (self.retrain, new_trained_timerange, data_load_timerange) = dh.check_if_new_training_required(trained_timestamp) - dh.set_paths(metadata, new_trained_timerange.stopts) + dh.set_paths(metadata['pair'], new_trained_timerange.stopts) # download candle history if it is not already in memory if not self.data_drawer.historic_data: @@ -246,20 +265,17 @@ class IFreqaiModel(ABC): dh.load_all_pair_histories(data_load_timerange) # train the model on the trained timerange - if self.retrain or not file_exists: - if coin_first: - self.train_model_in_series(new_trained_timerange, metadata, - strategy, dh, data_load_timerange) - else: - self.training_on_separate_thread = True # acts like a lock - self.retrain_model_on_separate_thread(new_trained_timerange, - metadata, strategy, - dh, data_load_timerange) + if coin_first and not self.scanning: + self.train_model_in_series(new_trained_timerange, metadata['pair'], + strategy, dh, data_load_timerange) + elif not coin_first and not self.scanning: + self.scanning = True + self.start_scanning(strategy) - elif not trainable and not self.follow_mode: - logger.info(f'{metadata["pair"]} holds spot ' - f'{self.data_drawer.pair_dict[metadata["pair"]]["priority"]} ' - 'in training queue') + # elif not trainable and not self.follow_mode: + # logger.info(f'{metadata["pair"]} holds spot ' + # f'{self.data_drawer.pair_dict[metadata["pair"]]["priority"]} ' + # 'in training queue') elif self.follow_mode: dh.set_paths(metadata, trained_timestamp) logger.info('FreqAI instance set to follow_mode, finding existing pair' @@ -382,7 +398,7 @@ class IFreqaiModel(ABC): str(self.freqai_info.get('identifier'))) @threaded - def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, metadata: dict, + def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, pair: str, strategy: IStrategy, dh: FreqaiDataKitchen, data_load_timerange: TimeRange): """ @@ -403,14 +419,14 @@ class IFreqaiModel(ABC): # metadata) corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange, - metadata) + pair) # protecting from common benign errors associated with grabbing new data from exchange: try: unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy, corr_dataframes, base_dataframes, - metadata) + pair) unfiltered_dataframe = dh.slice_dataframe(new_trained_timerange, unfiltered_dataframe) except Exception as err: @@ -420,23 +436,23 @@ class IFreqaiModel(ABC): return try: - model = self.train(unfiltered_dataframe, metadata, dh) + model = self.train(unfiltered_dataframe, pair, dh) except ValueError: logger.warning('Value error encountered during training') self.training_on_separate_thread = False self.retrain = False return - self.data_drawer.pair_dict[metadata['pair']][ + self.data_drawer.pair_dict[pair][ 'trained_timestamp'] = new_trained_timerange.stopts - dh.set_new_model_names(metadata, new_trained_timerange) + dh.set_new_model_names(pair, new_trained_timerange) # logger.info('Training queue' # f'{sorted(self.data_drawer.pair_dict.items(), key=lambda item: item[1])}') - if self.data_drawer.pair_dict[metadata['pair']]['priority'] == 1: + if self.data_drawer.pair_dict[pair]['priority'] == 1: with self.lock: - self.data_drawer.pair_to_end_of_training_queue(metadata['pair']) - dh.save_data(model, coin=metadata['pair']) + self.data_drawer.pair_to_end_of_training_queue(pair) + dh.save_data(model, coin=pair) self.training_on_separate_thread = False self.retrain = False @@ -446,7 +462,7 @@ class IFreqaiModel(ABC): return - def train_model_in_series(self, new_trained_timerange: TimeRange, metadata: dict, + def train_model_in_series(self, new_trained_timerange: TimeRange, pair: str, strategy: IStrategy, dh: FreqaiDataKitchen, data_load_timerange: TimeRange): """ @@ -464,29 +480,32 @@ class IFreqaiModel(ABC): # corr_dataframes, base_dataframes = dh.load_pairs_histories(data_load_timerange, # metadata) corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange, - metadata) + pair) unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy, corr_dataframes, base_dataframes, - metadata) + pair) unfiltered_dataframe = dh.slice_dataframe(new_trained_timerange, unfiltered_dataframe) - model = self.train(unfiltered_dataframe, metadata, dh) + model = self.train(unfiltered_dataframe, pair, dh) - self.data_drawer.pair_dict[metadata['pair']][ + self.data_drawer.pair_dict[pair][ 'trained_timestamp'] = new_trained_timerange.stopts - dh.set_new_model_names(metadata, new_trained_timerange) - self.data_drawer.pair_dict[metadata['pair']]['first'] = False - dh.save_data(model, coin=metadata['pair']) + dh.set_new_model_names(pair, new_trained_timerange) + self.data_drawer.pair_dict[pair]['first'] = False + if self.data_drawer.pair_dict[pair]['priority'] == 1 and self.scanning: + with self.lock: + self.data_drawer.pair_to_end_of_training_queue(pair) + dh.save_data(model, coin=pair) self.retrain = False # Following methods which are overridden by user made prediction models. # See freqai/prediction_models/CatboostPredictionModlel.py for an example. @abstractmethod - def train(self, unfiltered_dataframe: DataFrame, metadata: dict, dh: FreqaiDataKitchen) -> Any: + def train(self, unfiltered_dataframe: DataFrame, pair: str, dh: FreqaiDataKitchen) -> Any: """ Filter the training data and train a model to it. Train makes heavy use of the datahandler for storing, saving, loading, and analyzing the data. diff --git a/freqtrade/strategy/interface.py b/freqtrade/strategy/interface.py index 6237e3397..bfecad495 100644 --- a/freqtrade/strategy/interface.py +++ b/freqtrade/strategy/interface.py @@ -532,7 +532,7 @@ class IStrategy(ABC, HyperStrategyMixin): """ return None - def populate_any_indicators(self, metadata: dict, pair: str, df: DataFrame, tf: str, + def populate_any_indicators(self, basepair: str, pair: str, df: DataFrame, tf: str, informative: DataFrame = None, coin: str = "") -> DataFrame: """ Function designed to automatically generate, name and merge features