diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py
index 1aecddb9d..f8002a45f 100644
--- a/freqtrade/freqai/data_drawer.py
+++ b/freqtrade/freqai/data_drawer.py
@@ -5,10 +5,11 @@ import re
 import shutil
 import threading
 from pathlib import Path
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, Tuple, TypedDict
 
 import numpy as np
 import pandas as pd
+import rapidjson
 from joblib import dump, load
 from joblib.externals import cloudpickle
 from numpy.typing import ArrayLike, NDArray
@@ -24,6 +25,14 @@ from freqtrade.strategy.interface import IStrategy
 logger = logging.getLogger(__name__)
 
 
+class pair_info(TypedDict):
+    model_filename: str
+    first: bool
+    trained_timestamp: int
+    priority: int
+    data_path: str
+
+
 class FreqaiDataDrawer:
     """
     Class aimed at holding all pair models/info in memory for better inferencing/retrainig/saving
@@ -54,14 +63,13 @@
         self.config = config
         self.freqai_info = config.get("freqai", {})
         # dictionary holding all pair metadata necessary to load in from disk
-        self.pair_dict: Dict[str, Any] = {}
+        self.pair_dict: Dict[str, pair_info] = {}
         # dictionary holding all actively inferenced models in memory given a model filename
         self.model_dictionary: Dict[str, Any] = {}
-        self.model_return_values: Dict[str, Any] = {}
-        self.pair_data_dict: Dict[str, Any] = {}
-        self.historic_data: Dict[str, Any] = {}
-        self.historic_predictions: Dict[str, Any] = {}
-        self.follower_dict: Dict[str, Any] = {}
+        self.model_return_values: Dict[str, DataFrame] = {}
+        self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
+        self.historic_predictions: Dict[str, DataFrame] = {}
+        self.follower_dict: Dict[str, pair_info] = {}
         self.full_path = full_path
         self.follower_name: str = self.config.get("bot_name", "follower1")
         self.follower_dict_path = Path(
@@ -77,6 +85,9 @@
         self.training_queue: Dict[str, int] = {}
         self.history_lock = threading.Lock()
         self.old_DBSCAN_eps: Dict[str, float] = {}
+        self.empty_pair_dict: pair_info = {
+            "model_filename": "", "trained_timestamp": 0,
+            "priority": 1, "first": True, "data_path": ""}
 
     def load_drawer_from_disk(self):
         """
@@ -133,15 +144,17 @@
         """
         Save data drawer full of all pair model metadata in present model folder.
         """
-        with open(self.pair_dictionary_path, "w") as fp:
-            json.dump(self.pair_dict, fp, default=self.np_encoder)
+        with open(self.pair_dictionary_path, 'w') as fp:
+            rapidjson.dump(self.pair_dict, fp, default=self.np_encoder,
+                           number_mode=rapidjson.NM_NATIVE)
 
     def save_follower_dict_to_disk(self):
         """
         Save follower dictionary to disk (used by strategy for persistent prediction targets)
         """
         with open(self.follower_dict_path, "w") as fp:
-            json.dump(self.follower_dict, fp, default=self.np_encoder)
+            rapidjson.dump(self.follower_dict, fp, default=self.np_encoder,
+                           number_mode=rapidjson.NM_NATIVE)
 
     def create_follower_dict(self):
         """
@@ -175,18 +188,19 @@
             trained_timestamp: int = the last time the coin was trained
             return_null_array: bool = Follower could not find pair metadata
         """
+
         pair_dict = self.pair_dict.get(pair)
-        data_path_set = self.pair_dict.get(pair, {}).get("data_path", None)
+        data_path_set = self.pair_dict.get(pair, self.empty_pair_dict).get("data_path", "")
         return_null_array = False
 
         if pair_dict:
             model_filename = pair_dict["model_filename"]
             trained_timestamp = pair_dict["trained_timestamp"]
         elif not self.follow_mode:
-            pair_dict = self.pair_dict[pair] = {}
-            model_filename = pair_dict["model_filename"] = ""
-            trained_timestamp = pair_dict["trained_timestamp"] = 0
-            pair_dict["priority"] = len(self.pair_dict)
+            self.pair_dict[pair] = self.empty_pair_dict.copy()
+            model_filename = ""
+            trained_timestamp = 0
+            self.pair_dict[pair]["priority"] = len(self.pair_dict)
 
         if not data_path_set and self.follow_mode:
             logger.warning(
@@ -205,11 +219,9 @@
         if pair_in_dict:
             return
         else:
-            self.pair_dict[metadata["pair"]] = {}
-            self.pair_dict[metadata["pair"]]["model_filename"] = ""
-            self.pair_dict[metadata["pair"]]["first"] = True
-            self.pair_dict[metadata["pair"]]["trained_timestamp"] = 0
+            self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy()
             self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict)
+
         return
 
     def pair_to_end_of_training_queue(self, pair: str) -> None:
@@ -440,13 +452,17 @@
         dk.data["label_list"] = dk.label_list
         # store the metadata
         with open(save_path / f"{dk.model_filename}_metadata.json", "w") as fp:
-            json.dump(dk.data, fp, default=dk.np_encoder)
+            rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)
 
         # save the train data to file so we can check preds for area of applicability later
         dk.data_dictionary["train_features"].to_pickle(
             save_path / f"{dk.model_filename}_trained_df.pkl"
         )
 
+        dk.data_dictionary["train_dates"].to_pickle(
+            save_path / f"{dk.model_filename}_trained_dates_df.pkl"
+        )
+
        if self.freqai_info["feature_parameters"].get("principal_component_analysis"):
             cloudpickle.dump(
                 dk.pca, open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "wb")
diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py
index 6735df02b..96479b87c 100644
--- a/freqtrade/freqai/data_kitchen.py
+++ b/freqtrade/freqai/data_kitchen.py
@@ -20,6 +20,7 @@
 from freqtrade.configuration import TimeRange
 from freqtrade.data.dataprovider import DataProvider
 from freqtrade.data.history.history_utils import refresh_backtest_ohlcv_data
 from freqtrade.exceptions import OperationalException
+from freqtrade.exchange import timeframe_to_seconds
 from freqtrade.strategy.interface import IStrategy
 
@@ -58,8 +59,8 @@
         live: bool = False,
         pair: str = "",
     ):
-        self.data: Dict[Any, Any] = {}
-        self.data_dictionary: Dict[Any, Any] = {}
+        self.data: Dict[str, Any] = {}
+        self.data_dictionary: Dict[str, DataFrame] = {}
         self.config = config
         self.freqai_config: Dict[str, Any] = config["freqai"]
         self.full_df: DataFrame = DataFrame()
@@ -98,6 +99,7 @@
 
         self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
         self.thread_count = self.freqai_config.get("data_kitchen_thread_count", -1)
+        self.train_dates: DataFrame = pd.DataFrame()
 
     def set_paths(
         self,
@@ -206,16 +208,20 @@
         if (training_filter):
             # we don't care about total row number (total no. datapoints) in training, we only care
             # about removing any row with NaNs
             # if labels has multiple columns (user wants to train multiple models), we detect here
             labels = unfiltered_dataframe.filter(label_list, axis=1)
             drop_index_labels = pd.isnull(labels).any(1)
             drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0)
+            dates = unfiltered_dataframe.filter('date', axis=1)
             filtered_dataframe = filtered_dataframe[
                 (drop_index == 0) & (drop_index_labels == 0)
             ]  # dropping values
             labels = labels[
                 (drop_index == 0) & (drop_index_labels == 0)
             ]  # assuming the labels depend entirely on the dataframe here.
+            self.train_dates = dates[
+                (drop_index == 0) & (drop_index_labels == 0)
+            ]
             logger.info(
                 f"dropped {len(unfiltered_dataframe) - len(filtered_dataframe)} training points"
                 f" due to NaNs in populated dataset {len(unfiltered_dataframe)}."
@@ -266,6 +272,7 @@
             "test_labels": test_labels,
             "train_weights": train_weights,
             "test_weights": test_weights,
+            "train_dates": self.train_dates
         }
 
         return self.data_dictionary
@@ -351,7 +358,7 @@
         return df
 
     def split_timerange(
-        self, tr: str, train_split: int = 28, bt_split: int = 7
+        self, tr: str, train_split: int = 28, bt_split: float = 7
     ) -> Tuple[list, list]:
         """
         Function which takes a single time range (tr) and splits it
@@ -359,7 +366,7 @@
         tr: str, full timerange to train on
         train_split: the period length for the each training (days). Specified in user
         configuration file
-        bt_split: the backtesting length (dats). Specified in user configuration file
+        bt_split: the backtesting length (days). Specified in user configuration file
         """
 
         if not isinstance(train_split, int) or train_split < 1:
@@ -386,7 +393,7 @@
         while True:
 
             if not first:
-                timerange_train.startts = timerange_train.startts + bt_period
+                timerange_train.startts = timerange_train.startts + int(bt_period)
             timerange_train.stopts = timerange_train.startts + train_period_days
 
             first = False
@@ -399,7 +406,7 @@
 
             timerange_backtest.startts = timerange_train.stopts
 
-            timerange_backtest.stopts = timerange_backtest.startts + bt_period
+            timerange_backtest.stopts = timerange_backtest.startts + int(bt_period)
             if timerange_backtest.stopts > config_timerange.stopts:
                 timerange_backtest.stopts = config_timerange.stopts
 
@@ -820,30 +827,21 @@
         trained_timerange = TimeRange()
         data_load_timerange = TimeRange()
 
-        # find the max indicator length required
-        max_timeframe_chars = self.freqai_config["feature_parameters"].get(
-            "include_timeframes"
-        )[-1]
-        max_period = self.freqai_config["feature_parameters"].get(
-            "indicator_max_period_candles", 50
-        )
-        additional_seconds = 0
-        if max_timeframe_chars[-1] == "d":
-            additional_seconds = max_period * SECONDS_IN_DAY * int(max_timeframe_chars[-2])
-        elif max_timeframe_chars[-1] == "h":
-            additional_seconds = max_period * 3600 * int(max_timeframe_chars[-2])
-        elif max_timeframe_chars[-1] == "m":
-            if len(max_timeframe_chars) == 2:
-                additional_seconds = max_period * 60 * int(max_timeframe_chars[-2])
-            elif len(max_timeframe_chars) == 3:
-                additional_seconds = max_period * 60 * int(float(max_timeframe_chars[0:2]))
-        else:
-            logger.warning(
-                "FreqAI could not detect max timeframe and therefore may not "
-                "download the proper amount of data for training"
-            )
+        timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")
 
-        # logger.info(f'Extending data download by {additional_seconds/SECONDS_IN_DAY:.2f} days')
+        max_tf_seconds = 0
+        for tf in timeframes:
+            secs = timeframe_to_seconds(tf)
+            if secs > max_tf_seconds:
+                max_tf_seconds = secs
+
+        # We notice that users like to use exotic indicators where
+        # they do not know the required timeperiod. Here we include a factor
+        # of safety by multiplying the user considered "max" by 2.
+        max_period = self.freqai_config["feature_parameters"].get(
+            "indicator_max_period_candles", 20
+        ) * 2
+        additional_seconds = max_period * max_tf_seconds
 
         if trained_timestamp != 0:
             elapsed_time = (time - trained_timestamp) / SECONDS_IN_HOUR
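Note on the download-window hunk above (@@ -820,30 +827,21 @@): the hand-rolled timeframe string parsing is replaced by freqtrade's `timeframe_to_seconds`, and the default `indicator_max_period_candles` drops to 20 but is doubled as a safety factor. A small worked example of the new arithmetic, using a hypothetical config that is not part of the diff (assumes freqtrade is importable):

```python
# Rough worked example of the new window arithmetic: the widest informative
# timeframe drives how much extra history gets downloaded before training.
from freqtrade.exchange import timeframe_to_seconds

freqai_config = {
    "feature_parameters": {
        "include_timeframes": ["5m", "1h", "4h"],   # hypothetical user config
        "indicator_max_period_candles": 20,
    }
}

timeframes = freqai_config["feature_parameters"]["include_timeframes"]
max_tf_seconds = max(timeframe_to_seconds(tf) for tf in timeframes)  # 4h -> 14400

# Safety factor of 2 on the user-declared max indicator period, as in the diff.
max_period = freqai_config["feature_parameters"].get("indicator_max_period_candles", 20) * 2
additional_seconds = max_period * max_tf_seconds

# 40 candles * 14400 s = 576000 s, i.e. roughly 6.7 days of extra history,
# replacing the old character-by-character timeframe parsing branch.
print(additional_seconds / 86400)
```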
diff --git a/tests/strategy/strats/freqai_test_classifier.py b/tests/strategy/strats/freqai_test_classifier.py
index c333ac818..0a2ce793a 100644
--- a/tests/strategy/strats/freqai_test_classifier.py
+++ b/tests/strategy/strats/freqai_test_classifier.py
@@ -1,10 +1,11 @@
 import logging
 from functools import reduce
 
+import numpy as np
 import pandas as pd
 import talib.abstract as ta
 from pandas import DataFrame
-import numpy as np
+
 
 
 from freqtrade.strategy import DecimalParameter, IntParameter, IStrategy, merge_informative_pair
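Stepping back from the individual hunks: the data_kitchen.py and data_drawer.py changes together thread training dates through to disk — the feature-filtering hunk keeps the `date` column under the same NaN mask as features and labels, stores it as `train_dates` in the data dictionary, and the save routine in data_drawer.py pickles it next to the trained features. A self-contained pandas sketch of that flow, with made-up column names, values, and save path (it mirrors, rather than copies, the freqtrade code):

```python
# Standalone pandas sketch (not freqtrade code) of the train_dates flow added above.
from pathlib import Path

import numpy as np
import pandas as pd

unfiltered_dataframe = pd.DataFrame({
    "date": pd.date_range("2022-08-01", periods=5, freq="1h"),
    "%-feature": [1.0, np.nan, 3.0, 4.0, 5.0],   # hypothetical feature column
    "&-label": [0.1, 0.2, np.nan, 0.4, 0.5],     # hypothetical label column
})

# Same idea as the filtering hunk: drop any row with a NaN in features or labels,
# and keep the matching dates so they can be stored alongside the training data.
features = unfiltered_dataframe.filter(["%-feature"], axis=1)
labels = unfiltered_dataframe.filter(["&-label"], axis=1)
dates = unfiltered_dataframe.filter(["date"], axis=1)

keep = ~(features.isnull().any(axis=1) | labels.isnull().any(axis=1))
train_features = features[keep]
train_dates = dates[keep]

data_dictionary = {"train_features": train_features, "train_dates": train_dates}

save_path = Path("/tmp/freqai_example")          # hypothetical model folder
save_path.mkdir(parents=True, exist_ok=True)
data_dictionary["train_dates"].to_pickle(save_path / "example_trained_dates_df.pkl")
```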