Merge branch 'develop' into backtest_live_models

This commit is contained in:
Wagner Costa Santos
2022-11-03 13:29:25 -03:00
68 changed files with 1232 additions and 606 deletions

View File

@@ -51,7 +51,7 @@ class BaseClassifierModel(IFreqaiModel):
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)

View File

@@ -50,7 +50,7 @@ class BaseRegressionModel(IFreqaiModel):
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)

View File

@@ -47,7 +47,7 @@ class BaseTensorFlowModel(IFreqaiModel):
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)

View File

@@ -214,7 +214,10 @@ class FreqaiDataKitchen:
const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
if const_cols:
filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
self.data['constant_features_list'] = const_cols
logger.warning(f"Removed features {const_cols} with constant values.")
else:
self.data['constant_features_list'] = []
# we don't care about total row number (total no. datapoints) in training, we only care
# about removing any row with NaNs
# if labels have multiple columns (user wants to train multiple models), we detect here
@@ -245,7 +248,8 @@ class FreqaiDataKitchen:
self.data["filter_drop_index_training"] = drop_index
else:
filtered_df = self.check_pred_labels(filtered_df)
if len(self.data['constant_features_list']):
filtered_df = self.check_pred_labels(filtered_df)
# we are backtesting so we need to preserve row number to send back to strategy,
# so now we use do_predict to avoid any prediction based on a NaN
drop_index = pd.isnull(filtered_df).any(axis=1)
@@ -354,13 +358,19 @@ class FreqaiDataKitchen:
:param df: Dataframe to be standardized
"""
for item in df.keys():
df[item] = (
2
* (df[item] - self.data[f"{item}_min"])
/ (self.data[f"{item}_max"] - self.data[f"{item}_min"])
- 1
)
train_max = [None] * len(df.keys())
train_min = [None] * len(df.keys())
for i, item in enumerate(df.keys()):
train_max[i] = self.data[f"{item}_max"]
train_min[i] = self.data[f"{item}_min"]
train_max_series = pd.Series(train_max, index=df.keys())
train_min_series = pd.Series(train_min, index=df.keys())
df = (
2 * (df - train_min_series) / (train_max_series - train_min_series) - 1
)
return df
@@ -491,18 +501,16 @@ class FreqaiDataKitchen:
def check_pred_labels(self, df_predictions: DataFrame) -> DataFrame:
"""
Check that prediction feature labels match training feature labels.
:params:
:df_predictions: incoming predictions
:param df_predictions: incoming predictions
"""
train_labels = self.data_dictionary["train_features"].columns
pred_labels = df_predictions.columns
num_diffs = len(pred_labels.difference(train_labels))
if num_diffs != 0:
df_predictions = df_predictions[train_labels]
logger.warning(
f"Removed {num_diffs} features from prediction features, "
f"these were likely considered constant values during most recent training."
)
constant_labels = self.data['constant_features_list']
df_predictions = df_predictions.filter(
df_predictions.columns.difference(constant_labels)
)
logger.warning(
f"Removed {len(constant_labels)} features from prediction features, "
f"these were considered constant values during most recent training."
)
return df_predictions
@@ -986,6 +994,9 @@ class FreqaiDataKitchen:
if "labels_std" in self.data:
append_df[f"{label}_std"] = self.data["labels_std"][label]
for extra_col in self.data["extra_returns_per_train"]:
append_df[f"{extra_col}"] = self.data["extra_returns_per_train"][extra_col]
append_df["do_predict"] = do_predict
if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
append_df["DI_values"] = self.DI_values
@@ -1150,6 +1161,51 @@ class FreqaiDataKitchen:
if pair not in self.all_pairs:
self.all_pairs.append(pair)
def extract_corr_pair_columns_from_populated_indicators(
    self,
    dataframe: DataFrame
) -> Dict[str, DataFrame]:
    """
    Find the columns of the dataframe corresponding to the corr_pairlist, save them
    in a dictionary to be reused and attached to other pairs.

    A column belongs to a pair when its name contains `%-{pair}`, `%{pair}` or
    `%_{pair}`. Pairs for which no column matches are left out of the result, so an
    empty dictionary means that nothing could be cached.

    :param dataframe: fully populated dataframe (current pair + corr_pairs)
    :return: corr_dataframes, dictionary keyed by corr pair. Each value holds the
             'date' column followed by the columns matched for that pair.
    """
    corr_dataframes: Dict[str, DataFrame] = {}
    pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])

    for pair in pairs:
        valid_strs = (f"%-{pair}", f"%{pair}", f"%_{pair}")
        pair_cols = [col for col in dataframe.columns
                     if any(substr in col for substr in valid_strs)]
        if not pair_cols:
            # No feature column names this pair. Storing a frame that only holds
            # 'date' would make the cache look populated although it carries no
            # features, so the pair is skipped instead.
            continue
        # 'date' goes first, it is the key used to merge the cached frame later on
        corr_dataframes[pair] = dataframe.filter(['date', *pair_cols], axis=1)

    return corr_dataframes
def attach_corr_pair_columns(self, dataframe: DataFrame,
                             corr_dataframes: Dict[str, DataFrame],
                             current_pair: str) -> DataFrame:
    """
    Left-merge the saved corr_pair dataframes onto the current pair dataframe, using
    the 'date' column as key. The entry for `current_pair` itself is never merged.

    :param dataframe: current pair strategy dataframe, indicators populated already
    :param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
    :param current_pair: current pair to which we will attach corr pair dataframe
    :return: current pair dataframe with the columns of every other pair listed in
             `include_corr_pairlist` merged in
    """
    merged = dataframe
    corr_pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])

    for corr_pair in corr_pairs:
        if corr_pair == current_pair:
            # the current pair already carries its own columns
            continue
        merged = merged.merge(corr_dataframes[corr_pair], how='left', on='date')

    return merged
def use_strategy_to_populate_indicators(
self,
strategy: IStrategy,
@@ -1157,6 +1213,7 @@ class FreqaiDataKitchen:
base_dataframes: dict = {},
pair: str = "",
prediction_dataframe: DataFrame = pd.DataFrame(),
do_corr_pairs: bool = True,
) -> DataFrame:
"""
Use the user defined strategy for populating indicators during retrain
@@ -1166,15 +1223,15 @@ class FreqaiDataKitchen:
:param base_dataframes: dict = dict containing the current pair dataframes
(for user defined timeframes)
:param metadata: dict = strategy furnished pair metadata
:returns:
:return:
dataframe: DataFrame = dataframe containing populated indicators
"""
# for prediction dataframe creation, we let dataprovider handle everything in the strategy
# so we create empty dictionaries, which allows us to pass None to
# `populate_any_indicators()`. Signaling we want the dp to give us the live dataframe.
tfs = self.freqai_config["feature_parameters"].get("include_timeframes")
pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
pairs: List[str] = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
if not prediction_dataframe.empty:
dataframe = prediction_dataframe.copy()
for tf in tfs:
@@ -1197,15 +1254,18 @@ class FreqaiDataKitchen:
informative=base_dataframes[tf],
set_generalized_indicators=sgi
)
if pairs:
for i in pairs:
if pair in i:
continue # dont repeat anything from whitelist
# ensure corr pairs are always last
for corr_pair in pairs:
if pair == corr_pair:
continue # dont repeat anything from whitelist
for tf in tfs:
if pairs and do_corr_pairs:
dataframe = strategy.populate_any_indicators(
i,
corr_pair,
dataframe.copy(),
tf,
informative=corr_dataframes[i][tf]
informative=corr_dataframes[corr_pair][tf]
)
self.get_unique_classes_from_labels(dataframe)

View File

@@ -1,12 +1,10 @@
import logging
import shutil
import threading
import time
from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Literal, Tuple
import numpy as np
@@ -21,7 +19,7 @@ from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.freqai.data_drawer import FreqaiDataDrawer
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.utils import plot_feature_importance
from freqtrade.freqai.utils import plot_feature_importance, record_params
from freqtrade.strategy.interface import IStrategy
@@ -61,6 +59,7 @@ class IFreqaiModel(ABC):
"data_split_parameters", {})
self.model_training_parameters: Dict[str, Any] = config.get("freqai", {}).get(
"model_training_parameters", {})
self.identifier: str = self.freqai_info.get("identifier", "no_id_provided")
self.retrain = False
self.first = True
self.set_full_path()
@@ -69,9 +68,9 @@ class IFreqaiModel(ABC):
if self.save_backtest_models:
logger.info('Backtesting module configured to save all models.')
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
self.identifier: str = self.freqai_info.get("identifier", "no_id_provided")
self.scanning = False
self.ft_params = self.freqai_info["feature_parameters"]
self.corr_pairlist: List[str] = self.ft_params.get("include_corr_pairlist", [])
self.keras: bool = self.freqai_info.get("keras", False)
if self.keras and self.ft_params.get("DI_threshold", 0):
self.ft_params["DI_threshold"] = 0
@@ -83,9 +82,6 @@ class IFreqaiModel(ABC):
self.pair_it_train = 0
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
self.train_queue = self._set_train_queue()
self.last_trade_database_summary: DataFrame = {}
self.current_trade_database_summary: DataFrame = {}
self.analysis_lock = Lock()
self.inference_time: float = 0
self.train_time: float = 0
self.begin_time: float = 0
@@ -93,10 +89,16 @@ class IFreqaiModel(ABC):
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
self.continual_learning = self.freqai_info.get('continual_learning', False)
self.plot_features = self.ft_params.get("plot_feature_importances", 0)
self.corr_dataframes: Dict[str, DataFrame] = {}
# get_corr_dataframes is controlling the caching of corr_dataframes
# for improved performance. Careful with this boolean.
self.get_corr_dataframes: bool = True
self._threads: List[threading.Thread] = []
self._stop_event = threading.Event()
record_params(config, self.full_path)
def __getstate__(self):
"""
Return an empty state to be pickled in hyperopt
@@ -385,10 +387,10 @@ class IFreqaiModel(ABC):
# load the model and associated data into the data kitchen
self.model = self.dd.load_data(metadata["pair"], dk)
with self.analysis_lock:
dataframe = self.dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
)
dataframe = dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"],
do_corr_pairs=self.get_corr_dataframes
)
if not self.model:
logger.warning(
@@ -397,6 +399,9 @@ class IFreqaiModel(ABC):
self.dd.return_null_values_to_strategy(dataframe, dk)
return dk
if self.corr_pairlist:
dataframe = self.cache_corr_pairlist_dfs(dataframe, dk)
dk.find_labels(dataframe)
self.build_strategy_return_arrays(dataframe, dk, metadata["pair"], trained_timestamp)
@@ -548,14 +553,13 @@ class IFreqaiModel(ABC):
return file_exists
def set_full_path(self) -> None:
"""
Creates and sets the full path for the identifier
"""
self.full_path = Path(
self.config["user_data_dir"] / "models" / f"{self.freqai_info['identifier']}"
self.config["user_data_dir"] / "models" / f"{self.identifier}"
)
self.full_path.mkdir(parents=True, exist_ok=True)
shutil.copy(
self.config["config_files"][0],
Path(self.full_path, Path(self.config["config_files"][0]).name),
)
def extract_data_and_train_model(
self,
@@ -581,10 +585,9 @@ class IFreqaiModel(ABC):
data_load_timerange, pair, dk
)
with self.analysis_lock:
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
strategy, corr_dataframes, base_dataframes, pair
)
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
strategy, corr_dataframes, base_dataframes, pair
)
unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
@@ -702,6 +705,8 @@ class IFreqaiModel(ABC):
" avoid blinding open trades and degrading performance.")
self.pair_it = 0
self.inference_time = 0
if self.corr_pairlist:
self.get_corr_dataframes = True
return
def train_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''):
@@ -760,6 +765,29 @@ class IFreqaiModel(ABC):
f'Best approximation queue: {best_queue}')
return best_queue
def cache_corr_pairlist_dfs(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
    """
    Either fill the corr_pairlist cache from `dataframe` or attach the already cached
    corr_pair dataframes to it, depending on `self.get_corr_dataframes`.
    :param dataframe: strategy fed dataframe
    :param dk: datakitchen object for current asset
    :return: the unchanged dataframe when the cache was (re)built from it, otherwise the
             dataframe returned by `dk.attach_corr_pair_columns()`.
    """
    if not self.get_corr_dataframes:
        # the cache was filled earlier, reuse it for this pair
        return dk.attach_corr_pair_columns(dataframe, self.corr_dataframes, dk.pair)

    extracted = dk.extract_corr_pair_columns_from_populated_indicators(dataframe)
    self.corr_dataframes = extracted
    nothing_cached = not extracted
    if nothing_cached:
        logger.warning("Couldn't cache corr_pair dataframes for improved performance. "
                       "Consider ensuring that the full coin/stake, e.g. XYZ/USD, "
                       "is included in the column names when you are creating features "
                       "in `populate_any_indicators()`.")
    # keep extracting on the next call for as long as nothing could be cached
    self.get_corr_dataframes = nothing_cached

    return dataframe
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModel.py for an example.

View File

@@ -26,9 +26,8 @@ class XGBoostRFClassifier(BaseClassifierModel):
def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
User sets up the training and test data to fit their desired model here
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
:param data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
X = data_dictionary["train_features"].to_numpy()
@@ -65,7 +64,7 @@ class XGBoostRFClassifier(BaseClassifierModel):
) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_df: Full dataframe for the current backtest period.
:param unfiltered_df: Full dataframe for the current backtest period.
:return:
:pred_df: dataframe containing the predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove

View File

@@ -29,6 +29,7 @@ class XGBoostRFRegressor(BaseRegressionModel):
if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) == 0:
eval_set = None
eval_weights = None
else:
eval_set = [(data_dictionary["test_features"], data_dictionary["test_labels"])]
eval_weights = [data_dictionary['test_weights']]

View File

@@ -29,6 +29,7 @@ class XGBoostRegressor(BaseRegressionModel):
if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) == 0:
eval_set = None
eval_weights = None
else:
eval_set = [(data_dictionary["test_features"], data_dictionary["test_labels"])]
eval_weights = [data_dictionary['test_weights']]

View File

@@ -1,9 +1,11 @@
import logging
from datetime import datetime, timezone
from typing import Any
from pathlib import Path
from typing import Any, Dict
import numpy as np
import pandas as pd
import rapidjson
from freqtrade.configuration import TimeRange
from freqtrade.constants import Config
@@ -193,6 +195,31 @@ def plot_feature_importance(model: Any, pair: str, dk: FreqaiDataKitchen,
store_plot_file(fig, f"{dk.model_filename}-{label}.html", dk.data_path)
def record_params(config: Dict[str, Any], full_path: Path) -> None:
    """
    Write a snapshot of the run parameters to `run_params.json` inside `full_path`,
    for reproducibility.

    The snapshot holds the complete `freqai` section plus the timeframe, stake amount,
    stake currency, max open trades and the exchange pair whitelist taken from `config`.

    :param config: configuration dictionary the parameters are read from
    :param full_path: directory the json file is written to
    """
    top_level_keys = ("timeframe", "stake_amount", "stake_currency", "max_open_trades")

    run_params: Dict[str, Any] = {"freqai": config.get('freqai', {})}
    run_params.update({key: config.get(key) for key in top_level_keys})
    run_params["pairs"] = config.get('exchange', {}).get('pair_whitelist')

    with (full_path / "run_params.json").open("w") as handle:
        # default=str: values the encoder cannot handle itself are written as str(value)
        rapidjson.dump(
            run_params,
            handle,
            indent=4,
            default=str,
            number_mode=rapidjson.NM_NATIVE | rapidjson.NM_NAN
        )
def get_timerange_backtest_live_models(config: Config):
"""
Returns a formatted timerange for backtest live/ready models