diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py index 8839317f8..17f3fc2ee 100644 --- a/freqtrade/freqai/data_drawer.py +++ b/freqtrade/freqai/data_drawer.py @@ -1,6 +1,5 @@ import collections -import copy import json import logging import re @@ -11,6 +10,7 @@ from typing import Any, Dict, Tuple # import pickle as pk import numpy as np +import pandas as pd from pandas import DataFrame @@ -163,18 +163,13 @@ class FreqaiDataDrawer: # send pair to end of queue self.pair_dict[pair]['priority'] = len(self.pair_dict) - def set_initial_return_values(self, pair: str, dh): + def set_initial_return_values(self, pair: str, dh, dataframe: DataFrame) -> None: - self.model_return_values[pair] = {} - self.model_return_values[pair]['predictions'] = dh.full_predictions - self.model_return_values[pair]['do_preds'] = dh.full_do_predict - self.model_return_values[pair]['target_mean'] = dh.full_target_mean - self.model_return_values[pair]['target_std'] = dh.full_target_std + self.model_return_values[pair] = dataframe + self.model_return_values[pair]['target_mean'] = dh.data['target_mean'] + self.model_return_values[pair]['target_std'] = dh.data['target_std'] if self.freqai_info.get('feature_parameters', {}).get('DI_threshold', 0) > 0: - self.model_return_values[pair]['DI_values'] = dh.full_DI_values - - # if not self.follow_mode: - # self.save_model_return_values_to_disk() + self.model_return_values[pair]['DI_values'] = dh.DI_values def append_model_predictions(self, pair: str, predictions, do_preds, target_mean, target_std, dh, len_df) -> None: @@ -182,7 +177,7 @@ class FreqaiDataDrawer: # strat seems to feed us variable sized dataframes - and since we are trying to build our # own return array in the same shape, we need to figure out how the size has changed # and adapt our stored/returned info accordingly. - length_difference = len(self.model_return_values[pair]['predictions']) - len_df + length_difference = len(self.model_return_values[pair]['prediction']) - len_df i = 0 if length_difference == 0: @@ -190,51 +185,29 @@ class FreqaiDataDrawer: elif length_difference > 0: i = length_difference + 1 - self.model_return_values[pair]['predictions'] = np.append( - self.model_return_values[pair]['predictions'][i:], predictions[-1]) + df = self.model_return_values[pair].shift(-i) + + df['prediction'].iloc[-1] = predictions[-1] + df['do_predict'].iloc[-1] = do_preds[-1] + df['target_mean'].iloc[-1] = target_mean + df['target_std'].iloc[-1] = target_std if self.freqai_info.get('feature_parameters', {}).get('DI_threshold', 0) > 0: - self.model_return_values[pair]['DI_values'] = np.append( - self.model_return_values[pair]['DI_values'][i:], dh.DI_values[-1]) - self.model_return_values[pair]['do_preds'] = np.append( - self.model_return_values[pair]['do_preds'][i:], do_preds[-1]) - self.model_return_values[pair]['target_mean'] = np.append( - self.model_return_values[pair]['target_mean'][i:], target_mean) - self.model_return_values[pair]['target_std'] = np.append( - self.model_return_values[pair]['target_std'][i:], target_std) + df['DI_values'].iloc[-1] = dh.DI_values[-1] if length_difference < 0: - prepend = np.zeros(abs(length_difference) - 1) - self.model_return_values[pair]['predictions'] = np.insert( - self.model_return_values[pair]['predictions'], 0, prepend) - if self.freqai_info.get('feature_parameters', {}).get('DI_threshold', 0) > 0: - self.model_return_values[pair]['DI_values'] = np.insert( - self.model_return_values[pair]['DI_values'], 0, prepend) - self.model_return_values[pair]['do_preds'] = np.insert( - self.model_return_values[pair]['do_preds'], 0, prepend) - self.model_return_values[pair]['target_mean'] = np.insert( - self.model_return_values[pair]['target_mean'], 0, prepend) - self.model_return_values[pair]['target_std'] = np.insert( - self.model_return_values[pair]['target_std'], 0, prepend) - - dh.full_predictions = copy.deepcopy(self.model_return_values[pair]['predictions']) - dh.full_do_predict = copy.deepcopy(self.model_return_values[pair]['do_preds']) - dh.full_target_mean = copy.deepcopy(self.model_return_values[pair]['target_mean']) - dh.full_target_std = copy.deepcopy(self.model_return_values[pair]['target_std']) - if self.freqai_info.get('feature_parameters', {}).get('DI_threshold', 0) > 0: - dh.full_DI_values = copy.deepcopy(self.model_return_values[pair]['DI_values']) - - # if not self.follow_mode: - # self.save_model_return_values_to_disk() + prepend_df = pd.DataFrame(np.zeros((abs(length_difference) - 1, len(df.columns))), + columns=df.columns) + df = pd.concat([prepend_df, df], axis=0) def return_null_values_to_strategy(self, dataframe: DataFrame, dh) -> None: - len_df = len(dataframe) - dh.full_predictions = np.zeros(len_df) - dh.full_do_predict = np.zeros(len_df) - dh.full_target_mean = np.zeros(len_df) - dh.full_target_std = np.zeros(len_df) + dataframe['prediction'] = 0 + dataframe['do_predict'] = 0 + dataframe['target_mean'] = 0 + dataframe['target_std'] = 0 + if self.freqai_info.get('feature_parameters', {}).get('DI_threshold', 0) > 0: - dh.full_DI_values = np.zeros(len_df) + dataframe['DI_value'] = 0 def purge_old_models(self) -> None: diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py index 7f3d80c5e..599706636 100644 --- a/freqtrade/freqai/data_kitchen.py +++ b/freqtrade/freqai/data_kitchen.py @@ -88,7 +88,7 @@ class FreqaiDataKitchen: return - def save_data(self, model: Any, coin: str = '') -> None: + def save_data(self, model: Any, coin: str = '', keras_model=False) -> None: """ Saves all data associated with a model for a single sub-train time range :params: @@ -102,7 +102,10 @@ class FreqaiDataKitchen: save_path = Path(self.data_path) # Save the trained model - dump(model, save_path / str(self.model_filename + "_model.joblib")) + if not keras_model: + dump(model, save_path / str(self.model_filename + "_model.joblib")) + else: + model.save(save_path / str(self.model_filename + "_model.h5")) if self.svm_model is not None: dump(self.svm_model, save_path / str(self.model_filename + "_svm_model.joblib")) @@ -144,7 +147,7 @@ class FreqaiDataKitchen: return - def load_data(self, coin: str = '') -> Any: + def load_data(self, coin: str = '', keras_model=False) -> Any: """ loads all data required to make a prediction on a sub-train time range :returns: @@ -190,8 +193,11 @@ class FreqaiDataKitchen: # try to access model in memory instead of loading object from disk to save time if self.live and self.model_filename in self.data_drawer.model_dictionary: model = self.data_drawer.model_dictionary[self.model_filename] - else: + elif not keras_model: model = load(self.data_path / str(self.model_filename + "_model.joblib")) + else: + from tensorflow import keras + model = keras.models.load_model(self.data_path / str(self.model_filename + "_model.h5")) if Path(self.data_path / str(self.model_filename + "_svm_model.joblib")).resolve().exists(): @@ -287,7 +293,11 @@ class FreqaiDataKitchen: training_filter ): # we don't care about total row number (total no. datapoints) in training, we only care # about removing any row with NaNs - drop_index_labels = pd.isnull(labels) + # if labels has multiple columns (user wants to train multiple models), we detect here + if labels.shape[1] == 1: + drop_index_labels = pd.isnull(labels) + else: + drop_index_labels = pd.isnull(labels).any(1) drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0) filtered_dataframe = filtered_dataframe[ (drop_index == 0) & (drop_index_labels == 0) diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index 7062947a8..64a13c802 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -68,6 +68,8 @@ class IFreqaiModel(ABC): self.scanning = False self.ready_to_scan = False self.first = True + self.keras = self.freqai_info.get('keras', False) + self.CONV_WIDTH = self.freqai_info.get('conv_width', 2) def assert_config(self, config: Dict[str, Any]) -> None: @@ -122,30 +124,28 @@ class IFreqaiModel(ABC): time.sleep(1) for pair in self.config.get('exchange', {}).get('pair_whitelist'): - (model_filename, - trained_timestamp, - _, _) = self.data_drawer.get_pair_dict_info(pair) + (_, trained_timestamp, _, _) = self.data_drawer.get_pair_dict_info(pair) if self.data_drawer.pair_dict[pair]['priority'] != 1: continue dh = FreqaiDataKitchen(self.config, self.data_drawer, self.live, pair) - file_exists = False + # file_exists = False dh.set_paths(pair, trained_timestamp) - file_exists = self.model_exists(pair, - dh, - trained_timestamp=trained_timestamp, - model_filename=model_filename, - scanning=True) + # file_exists = self.model_exists(pair, + # dh, + # trained_timestamp=trained_timestamp, + # model_filename=model_filename, + # scanning=True) (retrain, new_trained_timerange, data_load_timerange) = dh.check_if_new_training_required(trained_timestamp) dh.set_paths(pair, new_trained_timerange.stopts) - if retrain or not file_exists: + if retrain: # or not file_exists: self.train_model_in_series(new_trained_timerange, pair, strategy, dh, data_load_timerange) @@ -199,9 +199,9 @@ class IFreqaiModel(ABC): self.data_drawer.pair_dict[metadata['pair']][ 'trained_timestamp'] = trained_timestamp.stopts dh.set_new_model_names(metadata['pair'], trained_timestamp) - dh.save_data(self.model, metadata['pair']) + dh.save_data(self.model, metadata['pair'], keras=self.keras) else: - self.model = dh.load_data(metadata['pair']) + self.model = dh.load_data(metadata['pair'], keras=self.keras) self.check_if_feature_list_matches_strategy(dataframe_train, dh) @@ -278,7 +278,7 @@ class IFreqaiModel(ABC): f'using { self.identifier }') # load the model and associated data into the data kitchen - self.model = dh.load_data(coin=metadata['pair']) + self.model = dh.load_data(coin=metadata['pair'], keras=self.keras) if not self.model: logger.warning('No model ready, returning null values to strategy.') @@ -297,14 +297,16 @@ class IFreqaiModel(ABC): trained_timestamp: int) -> None: # hold the historical predictions in memory so we are sending back - # correct array to strategy FIXME currently broken, but only affecting - # Frequi reporting. Signals remain unaffeted. + # correct array to strategy if pair not in self.data_drawer.model_return_values: preds, do_preds = self.predict(dataframe, dh) - dh.append_predictions(preds, do_preds, len(dataframe)) - dh.fill_predictions(len(dataframe)) - self.data_drawer.set_initial_return_values(pair, dh) + # mypy doesnt like the typing in else statement, so we need to explicitly add to + # dataframe separately + dataframe['prediction'], dataframe['do_predict'] = preds, do_preds + # dh.append_predictions(preds, do_preds, len(dataframe)) + # dh.fill_predictions(len(dataframe)) + self.data_drawer.set_initial_return_values(pair, dh, dataframe) return elif self.dh.check_if_model_expired(trained_timestamp): preds, do_preds, dh.DI_values = np.zeros(2), np.ones(2) * 2, np.zeros(2) @@ -312,7 +314,8 @@ class IFreqaiModel(ABC): 'construction should take care to consider this event with ' 'prediction == 0 and do_predict == 2') else: - preds, do_preds = self.predict(dataframe.iloc[-2:], dh) + # Only feed in the most recent candle for prediction in live scenario + preds, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dh, first=False) self.data_drawer.append_model_predictions(pair, preds, do_preds, dh.data["target_mean"], @@ -426,71 +429,6 @@ class IFreqaiModel(ABC): if not col.startswith('%') or col.startswith('%%')] return dataframe[to_keep] - @threaded - def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, pair: str, - strategy: IStrategy, dh: FreqaiDataKitchen, - data_load_timerange: TimeRange): - """ - Retreive data and train model on separate thread. Always called if the model folder already - contains a full set of trained models. - :params: - new_trained_timerange: TimeRange = the timerange to train the model on - metadata: dict = strategy provided metadata - strategy: IStrategy = user defined strategy object - dh: FreqaiDataKitchen = non-persistent data container for current coin/loop - data_load_timerange: TimeRange = the amount of data to be loaded for populate_any_indicators - (larger than new_trained_timerange so that new_trained_timerange does not contain any NaNs) - """ - - # with nostdout(): - # dh.download_new_data_for_retraining(data_load_timerange, metadata, strategy) - # corr_dataframes, base_dataframes = dh.load_pairs_histories(data_load_timerange, - # metadata) - - corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange, - pair) - - # protecting from common benign errors associated with grabbing new data from exchange: - try: - unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy, - corr_dataframes, - base_dataframes, - pair) - unfiltered_dataframe = dh.slice_dataframe(new_trained_timerange, unfiltered_dataframe) - - except Exception as err: - logger.exception(err) - self.training_on_separate_thread = False - self.retrain = False - return - - try: - model = self.train(unfiltered_dataframe, pair, dh) - except ValueError: - logger.warning('Value error encountered during training') - self.training_on_separate_thread = False - self.retrain = False - return - - self.data_drawer.pair_dict[pair][ - 'trained_timestamp'] = new_trained_timerange.stopts - dh.set_new_model_names(pair, new_trained_timerange) - # logger.info('Training queue' - # f'{sorted(self.data_drawer.pair_dict.items(), key=lambda item: item[1])}') - - if self.data_drawer.pair_dict[pair]['priority'] == 1: - with self.lock: - self.data_drawer.pair_to_end_of_training_queue(pair) - dh.save_data(model, coin=pair) - # self.training_on_separate_thread = False - # self.retrain = False - - # each time we finish a training, we check the directory to purge old models. - if self.freqai_info.get('purge_old_models', False): - self.data_drawer.purge_old_models() - - return - def train_model_in_series(self, new_trained_timerange: TimeRange, pair: str, strategy: IStrategy, dh: FreqaiDataKitchen, data_load_timerange: TimeRange): @@ -505,9 +443,7 @@ class IFreqaiModel(ABC): data_load_timerange: TimeRange = the amount of data to be loaded for populate_any_indicators (larger than new_trained_timerange so that new_trained_timerange does not contain any NaNs) """ - # dh.download_new_data_for_retraining(data_load_timerange, metadata, strategy) - # corr_dataframes, base_dataframes = dh.load_pairs_histories(data_load_timerange, - # metadata) + corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange, pair) @@ -527,7 +463,7 @@ class IFreqaiModel(ABC): if self.data_drawer.pair_dict[pair]['priority'] == 1 and self.scanning: with self.lock: self.data_drawer.pair_to_end_of_training_queue(pair) - dh.save_data(model, coin=pair) + dh.save_data(model, coin=pair, keras=self.keras) if self.freqai_info.get('purge_old_models', False): self.data_drawer.purge_old_models() @@ -563,7 +499,7 @@ class IFreqaiModel(ABC): @abstractmethod def predict(self, dataframe: DataFrame, - dh: FreqaiDataKitchen) -> Tuple[npt.ArrayLike, npt.ArrayLike]: + dh: FreqaiDataKitchen, first: bool = True) -> Tuple[npt.ArrayLike, npt.ArrayLike]: """ Filter the prediction features data and predict with it. :param: