Merge pull request #4 from freqtrade/feat/freqai

Feat/freqai
This commit is contained in:
lolong
2022-08-13 10:48:07 +02:00
committed by GitHub
96 changed files with 3492 additions and 1676 deletions

View File

@@ -5,13 +5,14 @@ import re
import shutil
import threading
from pathlib import Path
from typing import Any, Dict, Tuple
from typing import Any, Dict, Tuple, TypedDict
import numpy as np
import pandas as pd
import rapidjson
from joblib import dump, load
from joblib.externals import cloudpickle
from numpy.typing import ArrayLike
from numpy.typing import ArrayLike, NDArray
from pandas import DataFrame
from freqtrade.configuration import TimeRange
@@ -24,6 +25,15 @@ from freqtrade.strategy.interface import IStrategy
logger = logging.getLogger(__name__)
class pair_info(TypedDict):
    """
    Schema of the per-pair metadata record kept in the data drawer's
    `pair_dict` / `follower_dict` and written to disk as JSON.
    """

    # filename stem of the pair's saved model ("" until one is set)
    model_filename: str
    # defaults to True for a freshly created entry
    # NOTE(review): presumably marks a pair that has not been handled yet - confirm
    first: bool
    # the last time the coin was trained (0 for a freshly created entry)
    trained_timestamp: int
    # assigned from the current size of the pair dictionary when the entry is added
    # NOTE(review): presumably the pair's position in the training queue - confirm
    priority: int
    # string form of the folder holding the pair's model artefacts
    data_path: str
    # free-form additional values (empty dict by default)
    extras: dict
class FreqaiDataDrawer:
"""
Class aimed at holding all pair models/info in memory for better inferencing/retraining/saving
@@ -39,7 +49,7 @@ class FreqaiDataDrawer:
Robert Caulk @robcaulk
Theoretical brainstorming:
Elin Törnquist @thorntwig
Elin Törnquist @th0rntwig
Code review, software architecture brainstorming:
@xmatthias
@@ -54,14 +64,13 @@ class FreqaiDataDrawer:
self.config = config
self.freqai_info = config.get("freqai", {})
# dictionary holding all pair metadata necessary to load in from disk
self.pair_dict: Dict[str, Any] = {}
self.pair_dict: Dict[str, pair_info] = {}
# dictionary holding all actively inferenced models in memory given a model filename
self.model_dictionary: Dict[str, Any] = {}
self.model_return_values: Dict[str, Any] = {}
self.pair_data_dict: Dict[str, Any] = {}
self.historic_data: Dict[str, Any] = {}
self.historic_predictions: Dict[str, Any] = {}
self.follower_dict: Dict[str, Any] = {}
self.model_return_values: Dict[str, DataFrame] = {}
self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
self.historic_predictions: Dict[str, DataFrame] = {}
self.follower_dict: Dict[str, pair_info] = {}
self.full_path = full_path
self.follower_name: str = self.config.get("bot_name", "follower1")
self.follower_dict_path = Path(
@@ -76,6 +85,10 @@ class FreqaiDataDrawer:
self.load_historic_predictions_from_disk()
self.training_queue: Dict[str, int] = {}
self.history_lock = threading.Lock()
self.old_DBSCAN_eps: Dict[str, float] = {}
self.empty_pair_dict: pair_info = {
"model_filename": "", "trained_timestamp": 0,
"priority": 1, "first": True, "data_path": "", "extras": {}}
def load_drawer_from_disk(self):
"""
@@ -132,15 +145,17 @@ class FreqaiDataDrawer:
"""
Save data drawer full of all pair model metadata in present model folder.
"""
with open(self.pair_dictionary_path, "w") as fp:
json.dump(self.pair_dict, fp, default=self.np_encoder)
with open(self.pair_dictionary_path, 'w') as fp:
rapidjson.dump(self.pair_dict, fp, default=self.np_encoder,
number_mode=rapidjson.NM_NATIVE)
def save_follower_dict_to_disk(self):
"""
Save follower dictionary to disk (used by strategy for persistent prediction targets)
"""
with open(self.follower_dict_path, "w") as fp:
json.dump(self.follower_dict, fp, default=self.np_encoder)
rapidjson.dump(self.follower_dict, fp, default=self.np_encoder,
number_mode=rapidjson.NM_NATIVE)
def create_follower_dict(self):
"""
@@ -174,18 +189,19 @@ class FreqaiDataDrawer:
trained_timestamp: int = the last time the coin was trained
return_null_array: bool = Follower could not find pair metadata
"""
pair_dict = self.pair_dict.get(pair)
data_path_set = self.pair_dict.get(pair, {}).get("data_path", None)
data_path_set = self.pair_dict.get(pair, self.empty_pair_dict).get("data_path", "")
return_null_array = False
if pair_dict:
model_filename = pair_dict["model_filename"]
trained_timestamp = pair_dict["trained_timestamp"]
elif not self.follow_mode:
pair_dict = self.pair_dict[pair] = {}
model_filename = pair_dict["model_filename"] = ""
trained_timestamp = pair_dict["trained_timestamp"] = 0
pair_dict["priority"] = len(self.pair_dict)
self.pair_dict[pair] = self.empty_pair_dict.copy()
model_filename = ""
trained_timestamp = 0
self.pair_dict[pair]["priority"] = len(self.pair_dict)
if not data_path_set and self.follow_mode:
logger.warning(
@@ -204,11 +220,9 @@ class FreqaiDataDrawer:
if pair_in_dict:
return
else:
self.pair_dict[metadata["pair"]] = {}
self.pair_dict[metadata["pair"]]["model_filename"] = ""
self.pair_dict[metadata["pair"]]["first"] = True
self.pair_dict[metadata["pair"]]["trained_timestamp"] = 0
self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy()
self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict)
return
def pair_to_end_of_training_queue(self, pair: str) -> None:
@@ -225,25 +239,59 @@ class FreqaiDataDrawer:
historical candles, and also stores historical predictions despite retrainings (so stored
predictions are true predictions, not just inferencing on trained data)
"""
# dynamic df returned to strategy and plotted in frequi
mrv_df = self.model_return_values[pair] = pd.DataFrame()
for label in dk.label_list:
mrv_df[label] = pred_df[label]
mrv_df[f"{label}_mean"] = dk.data["labels_mean"][label]
mrv_df[f"{label}_std"] = dk.data["labels_std"][label]
# if user reused `identifier` in config and has historical predictions collected, load them
# so that frequi remains uninterrupted after a crash
hist_df = self.historic_predictions
if pair in hist_df:
len_diff = len(hist_df[pair].index) - len(pred_df.index)
if len_diff < 0:
df_concat = pd.concat([pred_df.iloc[:abs(len_diff)], hist_df[pair]],
ignore_index=True, keys=hist_df[pair].keys())
else:
df_concat = hist_df[pair].tail(len(pred_df.index)).reset_index(drop=True)
df_concat = df_concat.fillna(0)
self.model_return_values[pair] = df_concat
logger.info(f'Setting initial FreqUI plots from historical data for {pair}.')
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
mrv_df["DI_values"] = dk.DI_values
else:
for label in pred_df.columns:
mrv_df[label] = pred_df[label]
if mrv_df[label].dtype == object:
continue
mrv_df[f"{label}_mean"] = dk.data["labels_mean"][label]
mrv_df[f"{label}_std"] = dk.data["labels_std"][label]
mrv_df["do_predict"] = do_preds
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
mrv_df["DI_values"] = dk.DI_values
def append_model_predictions(self, pair: str, predictions: DataFrame, do_preds: ArrayLike,
mrv_df["do_predict"] = do_preds
if dk.data['extra_returns_per_train']:
rets = dk.data['extra_returns_per_train']
for return_str in rets:
mrv_df[return_str] = rets[return_str]
# for keras type models, the conv_window needs to be prepended so
# viewing is correct in frequi
if self.freqai_info.get('keras', False):
n_lost_points = self.freqai_info.get('conv_width', 2)
zeros_df = DataFrame(np.zeros((n_lost_points, len(mrv_df.columns))),
columns=mrv_df.columns)
self.model_return_values[pair] = pd.concat(
[zeros_df, mrv_df], axis=0, ignore_index=True)
def append_model_predictions(self, pair: str, predictions: DataFrame,
do_preds: NDArray[np.int_],
dk: FreqaiDataKitchen, len_df: int) -> None:
# strat seems to feed us variable sized dataframes - and since we are trying to build our
# own return array in the same shape, we need to figure out how the size has changed
# and adapt our stored/returned info accordingly.
length_difference = len(self.model_return_values[pair]) - len_df
i = 0
@@ -262,19 +310,28 @@ class FreqaiDataDrawer:
hp_df = pd.concat([hp_df, nan_df], ignore_index=True, axis=0)
self.historic_predictions[pair] = hp_df[:-1]
for label in dk.label_list:
# in case user adds additional "predictions" e.g. predict_proba output:
for label in predictions.columns:
df[label].iloc[-1] = predictions[label].iloc[-1]
if df[label].dtype == object:
continue
df[f"{label}_mean"].iloc[-1] = dk.data["labels_mean"][label]
df[f"{label}_std"].iloc[-1] = dk.data["labels_std"][label]
# df['prediction'].iloc[-1] = predictions[-1]
df["do_predict"].iloc[-1] = do_preds[-1]
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
df["DI_values"].iloc[-1] = dk.DI_values[-1]
if dk.data['extra_returns_per_train']:
rets = dk.data['extra_returns_per_train']
for return_str in rets:
df[return_str].iloc[-1] = rets[return_str]
# append the new predictions to persistent storage
if pair in self.historic_predictions:
self.historic_predictions[pair].iloc[-1] = df[label].iloc[-1]
for key in df.keys():
self.historic_predictions[pair][key].iloc[-1] = df[key].iloc[-1]
if length_difference < 0:
prepend_df = pd.DataFrame(
@@ -301,16 +358,25 @@ class FreqaiDataDrawer:
dk.find_features(dataframe)
for label in dk.label_list:
if self.freqai_info.get('predict_proba', []):
full_labels = dk.label_list + self.freqai_info['predict_proba']
else:
full_labels = dk.label_list
for label in full_labels:
dataframe[label] = 0
dataframe[f"{label}_mean"] = 0
dataframe[f"{label}_std"] = 0
# dataframe['prediction'] = 0
dataframe["do_predict"] = 0
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
dataframe["DI_value"] = 0
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
dataframe["DI_values"] = 0
if dk.data['extra_returns_per_train']:
rets = dk.data['extra_returns_per_train']
for return_str in rets:
dataframe[return_str] = 0
dk.return_dataframe = dataframe
@@ -379,24 +445,28 @@ class FreqaiDataDrawer:
model.save(save_path / f"{dk.model_filename}_model.h5")
if dk.svm_model is not None:
dump(dk.svm_model, save_path / str(dk.model_filename + "_svm_model.joblib"))
dump(dk.svm_model, save_path / f"{dk.model_filename}_svm_model.joblib")
dk.data["data_path"] = str(dk.data_path)
dk.data["model_filename"] = str(dk.model_filename)
dk.data["training_features_list"] = list(dk.data_dictionary["train_features"].columns)
dk.data["label_list"] = dk.label_list
# store the metadata
with open(save_path / str(dk.model_filename + "_metadata.json"), "w") as fp:
json.dump(dk.data, fp, default=dk.np_encoder)
with open(save_path / f"{dk.model_filename}_metadata.json", "w") as fp:
rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)
# save the train data to file so we can check preds for area of applicability later
dk.data_dictionary["train_features"].to_pickle(
save_path / str(dk.model_filename + "_trained_df.pkl")
save_path / f"{dk.model_filename}_trained_df.pkl"
)
if self.freqai_info.get("feature_parameters", {}).get("principal_component_analysis"):
dk.data_dictionary["train_dates"].to_pickle(
save_path / f"{dk.model_filename}_trained_dates_df.pkl"
)
if self.freqai_info["feature_parameters"].get("principal_component_analysis"):
cloudpickle.dump(
dk.pca, open(dk.data_path / str(dk.model_filename + "_pca_object.pkl"), "wb")
dk.pca, open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "wb")
)
# if self.live:
@@ -429,27 +499,27 @@ class FreqaiDataDrawer:
/ dk.data_path.parts[-1]
)
with open(dk.data_path / str(dk.model_filename + "_metadata.json"), "r") as fp:
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
dk.data = json.load(fp)
dk.training_features_list = dk.data["training_features_list"]
dk.label_list = dk.data["label_list"]
dk.data_dictionary["train_features"] = pd.read_pickle(
dk.data_path / str(dk.model_filename + "_trained_df.pkl")
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
)
# try to access model in memory instead of loading object from disk to save time
if dk.live and dk.model_filename in self.model_dictionary:
model = self.model_dictionary[dk.model_filename]
elif not dk.keras:
model = load(dk.data_path / str(dk.model_filename + "_model.joblib"))
model = load(dk.data_path / f"{dk.model_filename}_model.joblib")
else:
from tensorflow import keras
model = keras.models.load_model(dk.data_path / str(dk.model_filename + "_model.h5"))
model = keras.models.load_model(dk.data_path / f"{dk.model_filename}_model.h5")
if Path(dk.data_path / str(dk.model_filename + "_svm_model.joblib")).resolve().exists():
dk.svm_model = load(dk.data_path / str(dk.model_filename + "_svm_model.joblib"))
if Path(dk.data_path / f"{dk.model_filename}_svm_model.joblib").is_file():
dk.svm_model = load(dk.data_path / f"{dk.model_filename}_svm_model.joblib")
if not model:
raise OperationalException(
@@ -458,7 +528,7 @@ class FreqaiDataDrawer:
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
dk.pca = cloudpickle.load(
open(dk.data_path / str(dk.model_filename + "_pca_object.pkl"), "rb")
open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "rb")
)
return model
@@ -471,7 +541,7 @@ class FreqaiDataDrawer:
:params:
dataframe: DataFrame = strategy provided dataframe
"""
feat_params = self.freqai_info.get("feature_parameters", {})
feat_params = self.freqai_info["feature_parameters"]
with self.history_lock:
history_data = self.historic_data
@@ -524,7 +594,7 @@ class FreqaiDataDrawer:
for pair in dk.all_pairs:
if pair not in history_data:
history_data[pair] = {}
for tf in self.freqai_info.get("feature_parameters", {}).get("include_timeframes"):
for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
history_data[pair][tf] = load_pair_history(
datadir=self.config["datadir"],
timeframe=tf,
@@ -550,11 +620,11 @@ class FreqaiDataDrawer:
corr_dataframes: Dict[Any, Any] = {}
base_dataframes: Dict[Any, Any] = {}
historic_data = self.historic_data
pairs = self.freqai_info.get("feature_parameters", {}).get(
pairs = self.freqai_info["feature_parameters"].get(
"include_corr_pairlist", []
)
for tf in self.freqai_info.get("feature_parameters", {}).get("include_timeframes"):
for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
base_dataframes[tf] = dk.slice_dataframe(timerange, historic_data[pair][tf])
if pairs:
for p in pairs:

View File

@@ -10,13 +10,16 @@ import numpy.typing as npt
import pandas as pd
from pandas import DataFrame
from sklearn import linear_model
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from freqtrade.configuration import TimeRange
from freqtrade.data.dataprovider import DataProvider
from freqtrade.data.history.history_utils import refresh_backtest_ohlcv_data
from freqtrade.exceptions import OperationalException
from freqtrade.resolvers import ExchangeResolver
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.strategy.interface import IStrategy
@@ -39,7 +42,7 @@ class FreqaiDataKitchen:
Robert Caulk @robcaulk
Theoretical brainstorming:
Elin Törnquist @thorntwig
Elin Törnquist @th0rntwig
Code review, software architecture brainstorming:
@xmatthias
@@ -55,10 +58,10 @@ class FreqaiDataKitchen:
live: bool = False,
pair: str = "",
):
self.data: Dict[Any, Any] = {}
self.data_dictionary: Dict[Any, Any] = {}
self.data: Dict[str, Any] = {}
self.data_dictionary: Dict[str, DataFrame] = {}
self.config = config
self.freqai_config = config["freqai"]
self.freqai_config: Dict[str, Any] = config["freqai"]
self.full_df: DataFrame = DataFrame()
self.append_df: DataFrame = DataFrame()
self.data_path = Path()
@@ -68,14 +71,14 @@ class FreqaiDataKitchen:
self.live = live
self.pair = pair
self.svm_model: linear_model.SGDOneClassSVM = None
self.keras = self.freqai_config.get("keras", False)
self.keras: bool = self.freqai_config.get("keras", False)
self.set_all_pairs()
if not self.live:
if not self.config["timerange"]:
raise OperationalException(
'Please pass --timerange if you intend to use FreqAI for backtesting.')
self.full_timerange = self.create_fulltimerange(
self.config["timerange"], self.freqai_config.get("train_period_days")
self.config["timerange"], self.freqai_config.get("train_period_days", 0)
)
(self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
@@ -84,6 +87,10 @@ class FreqaiDataKitchen:
config["freqai"]["backtest_period_days"],
)
self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
self.thread_count = self.freqai_config.get("data_kitchen_thread_count", -1)
self.train_dates: DataFrame = pd.DataFrame()
def set_paths(
self,
pair: str,
@@ -101,7 +108,7 @@ class FreqaiDataKitchen:
self.data_path = Path(
self.full_path
/ str("sub-train" + "-" + pair.split("/")[0] + "_" + str(trained_timestamp))
/ f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
)
return
@@ -116,7 +123,7 @@ class FreqaiDataKitchen:
:filtered_dataframe: cleaned dataframe ready to be split.
:labels: cleaned labels ready to be split.
"""
feat_dict = self.freqai_config.get("feature_parameters", {})
feat_dict = self.freqai_config["feature_parameters"]
weights: npt.ArrayLike
if feat_dict.get("weight_factor", 0) > 0:
@@ -188,20 +195,23 @@ class FreqaiDataKitchen:
drop_index = pd.isnull(filtered_dataframe).any(1) # get the rows that have NaNs,
drop_index = drop_index.replace(True, 1).replace(False, 0) # pep8 requirement.
if (
training_filter
): # we don't care about total row number (total no. datapoints) in training, we only care
if (training_filter):
# we don't care about total row number (total no. datapoints) in training, we only care
# about removing any row with NaNs
# if labels has multiple columns (user wants to train multiple models), we detect here
# if labels has multiple columns (user wants to train multiple models), we detect here
labels = unfiltered_dataframe.filter(label_list, axis=1)
drop_index_labels = pd.isnull(labels).any(1)
drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0)
dates = unfiltered_dataframe['date']
filtered_dataframe = filtered_dataframe[
(drop_index == 0) & (drop_index_labels == 0)
] # dropping values
labels = labels[
(drop_index == 0) & (drop_index_labels == 0)
] # assuming the labels depend entirely on the dataframe here.
self.train_dates = dates[
(drop_index == 0) & (drop_index_labels == 0)
]
logger.info(
f"dropped {len(unfiltered_dataframe) - len(filtered_dataframe)} training points"
f" due to NaNs in populated dataset {len(unfiltered_dataframe)}."
@@ -252,6 +262,7 @@ class FreqaiDataKitchen:
"test_labels": test_labels,
"train_weights": train_weights,
"test_weights": test_weights,
"train_dates": self.train_dates
}
return self.data_dictionary
@@ -279,7 +290,7 @@ class FreqaiDataKitchen:
self.data[item + "_min"] = train_min[item]
for item in data_dictionary["train_labels"].keys():
if data_dictionary["train_labels"][item].dtype == str:
if data_dictionary["train_labels"][item].dtype == object:
continue
train_labels_max = data_dictionary["train_labels"][item].max()
train_labels_min = data_dictionary["train_labels"][item].min()
@@ -305,8 +316,7 @@ class FreqaiDataKitchen:
"""
Normalize a set of data using the mean and standard deviation from
the associated training data.
:params:
:df: Dataframe to be standardized
:param df: Dataframe to be standardized
"""
for item in df.keys():
@@ -323,12 +333,11 @@ class FreqaiDataKitchen:
"""
Normalize a set of data using the mean and standard deviation from
the associated training data.
:params:
:df: Dataframe of predictions to be denormalized
:param df: Dataframe of predictions to be denormalized
"""
for label in self.label_list:
if df[label].dtype == str:
for label in df.columns:
if df[label].dtype == object:
continue
df[label] = (
(df[label] + 1)
@@ -339,7 +348,7 @@ class FreqaiDataKitchen:
return df
def split_timerange(
self, tr: str, train_split: int = 28, bt_split: int = 7
self, tr: str, train_split: int = 28, bt_split: float = 7
) -> Tuple[list, list]:
"""
Function which takes a single time range (tr) and splits it
@@ -347,12 +356,12 @@ class FreqaiDataKitchen:
tr: str, full timerange to train on
train_split: the period length for the each training (days). Specified in user
configuration file
bt_split: the backtesting length (dats). Specified in user configuration file
bt_split: the backtesting length (days). Specified in user configuration file
"""
if not isinstance(train_split, int) or train_split < 1:
raise OperationalException(
"train_period_days must be an integer greater than 0. " f"Got {train_split}."
f"train_period_days must be an integer greater than 0. Got {train_split}."
)
train_period_days = train_split * SECONDS_IN_DAY
bt_period = bt_split * SECONDS_IN_DAY
@@ -374,7 +383,7 @@ class FreqaiDataKitchen:
while True:
if not first:
timerange_train.startts = timerange_train.startts + bt_period
timerange_train.startts = timerange_train.startts + int(bt_period)
timerange_train.stopts = timerange_train.startts + train_period_days
first = False
@@ -387,7 +396,7 @@ class FreqaiDataKitchen:
timerange_backtest.startts = timerange_train.stopts
timerange_backtest.stopts = timerange_backtest.startts + bt_period
timerange_backtest.stopts = timerange_backtest.startts + int(bt_period)
if timerange_backtest.stopts > config_timerange.stopts:
timerange_backtest.stopts = config_timerange.stopts
@@ -408,10 +417,9 @@ class FreqaiDataKitchen:
def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
"""
Given a full dataframe, extract the user desired window
:params:
:tr: timerange string that we wish to extract from df
:df: Dataframe containing all candles to run the entire backtest. Here
it is sliced down to just the present training period.
:param tr: timerange string that we wish to extract from df
:param df: Dataframe containing all candles to run the entire backtest. Here
it is sliced down to just the present training period.
"""
start = datetime.datetime.fromtimestamp(timerange.startts, tz=datetime.timezone.utc)
@@ -489,11 +497,10 @@ class FreqaiDataKitchen:
point. This metric defines the neighborhood of trained data and is used
for prediction confidence in the Dissimilarity Index
"""
logger.info("computing average mean distance for all training points")
tc = self.freqai_config.get("model_training_parameters", {}).get("thread_count", -1)
pairwise = pairwise_distances(self.data_dictionary["train_features"], n_jobs=tc)
# logger.info("computing average mean distance for all training points")
pairwise = pairwise_distances(
self.data_dictionary["train_features"], n_jobs=self.thread_count)
avg_mean_dist = pairwise.mean(axis=1).mean()
logger.info(f"avg_mean_dist {avg_mean_dist:.2f}")
return avg_mean_dist
@@ -515,21 +522,22 @@ class FreqaiDataKitchen:
return
if predict:
assert self.svm_model, "No svm model available for outlier removal"
if not self.svm_model:
logger.warning("No svm model available for outlier removal")
return
y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
do_predict = np.where(y_pred == -1, 0, y_pred)
if (len(do_predict) - do_predict.sum()) > 0:
logger.info(
f"svm_remove_outliers() tossed {len(do_predict) - do_predict.sum()} predictions"
)
logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")
self.do_predict += do_predict
self.do_predict -= 1
else:
# use SGDOneClassSVM to increase speed?
nu = self.freqai_config.get("feature_parameters", {}).get("svm_nu", 0.2)
self.svm_model = linear_model.SGDOneClassSVM(nu=nu).fit(
svm_params = self.freqai_config["feature_parameters"].get(
"svm_params", {"shuffle": False, "nu": 0.1})
self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
self.data_dictionary["train_features"]
)
y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
@@ -546,12 +554,14 @@ class FreqaiDataKitchen:
]
logger.info(
f"svm_remove_outliers() tossed {len(y_pred) - dropped_points.sum()}"
f" train points from {len(y_pred)}"
f"SVM tossed {len(y_pred) - dropped_points.sum()}"
f" train points from {len(y_pred)} total points."
)
# same for test data
if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
# TODO: This (and the part above) could be refactored into a separate function
# to reduce code duplication
if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
dropped_points = np.where(y_pred == -1, 0, y_pred)
self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
@@ -564,8 +574,77 @@ class FreqaiDataKitchen:
]
logger.info(
f"svm_remove_outliers() tossed {len(y_pred) - dropped_points.sum()}"
f" test points from {len(y_pred)}"
f"SVM tossed {len(y_pred) - dropped_points.sum()}"
f" test points from {len(y_pred)} total points."
)
return
def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
    """
    Use DBSCAN to cluster feature rows and treat every row that DBSCAN labels
    as noise (cluster label -1) as an outlier.

    Training (`predict=False`): `min_samples` is set to twice the number of
    feature columns and `eps` is derived from the nearest-neighbour distances
    of the training features. Noise rows are removed from `train_features`,
    `train_labels` and `train_weights`, and the chosen parameters are stored
    in `self.data['DBSCAN_eps']` / `self.data['DBSCAN_min_samples']`.

    Prediction (`predict=True`): the stored parameters are used to cluster the
    training rows together with the prediction rows, and `self.do_predict` is
    updated so that prediction rows labelled as noise are penalised.

    NOTE(review): no outlier-percentage config value is read by this method;
    `eps` is derived purely from the training data.

    :param predict: bool = False while training (fit parameters and filter the
        training data), True while predicting (flag outlying prediction rows).
    :param eps: not read by this method; kept so existing callers keep working.
    """

    if predict:
        train_ft_df = self.data_dictionary['train_features']
        pred_ft_df = self.data_dictionary['prediction_features']
        num_train = len(train_ft_df)
        df = pd.concat([train_ft_df, pred_ft_df], axis=0, ignore_index=True)
        clustering = DBSCAN(eps=self.data['DBSCAN_eps'],
                            min_samples=self.data['DBSCAN_min_samples'],
                            n_jobs=self.thread_count
                            ).fit(df)
        # Prediction rows were appended after the training rows, so slice from
        # the end of the training block. A negative slice (`[-num_preds:]`)
        # would return *every* label when there are zero prediction rows.
        do_predict = np.where(clustering.labels_[num_train:] == -1, 0, 1)

        num_tossed = len(do_predict) - do_predict.sum()
        if num_tossed > 0:
            logger.info(f"DBSCAN tossed {num_tossed} predictions")

        # adding the 0/1 flags and subtracting 1 leaves inliers unchanged and
        # lowers the entry of every outlier by one
        self.do_predict += do_predict
        self.do_predict -= 1
        return

    train_features = self.data_dictionary['train_features']
    MinPts = len(train_features.columns) * 2
    # measure pairwise distances to train_features.shape[1]*2 nearest neighbours
    neighbors = NearestNeighbors(
        n_neighbors=MinPts, n_jobs=self.thread_count)
    neighbors_fit = neighbors.fit(train_features)
    distances, _ = neighbors_fit.kneighbors(train_features)
    # each column is sorted independently, ascending
    distances = np.sort(distances, axis=0)
    # NOTE(review): dropping the smallest 10% of an ascending column does not
    # change its last (largest) element, so epsilon is the maximum of column 1
    index_ten_pct = int(len(distances[:, 1]) * 0.1)
    distances = distances[index_ten_pct:, 1]
    epsilon = distances[-1]

    clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
                        n_jobs=int(self.thread_count)).fit(train_features)

    logger.info(f'DBSCAN found eps of {epsilon}.')

    # remembered so the prediction branch can reuse the same parameters
    self.data['DBSCAN_eps'] = epsilon
    self.data['DBSCAN_min_samples'] = MinPts

    is_noise = clustering.labels_ == -1
    keep = ~is_noise
    for key in ('train_features', 'train_labels', 'train_weights'):
        self.data_dictionary[key] = self.data_dictionary[key][keep]

    logger.info(
        f"DBSCAN tossed {is_noise.sum()}"
        f" train points from {len(clustering.labels_)}"
    )

    return
@@ -573,9 +652,8 @@ class FreqaiDataKitchen:
def find_features(self, dataframe: DataFrame) -> None:
"""
Find features in the strategy provided dataframe
:params:
dataframe: DataFrame = strategy provided dataframe
:returns:
:param dataframe: DataFrame = strategy provided dataframe
:return:
features: list = the features to be used for training/prediction
"""
column_names = dataframe.columns
@@ -586,7 +664,6 @@ class FreqaiDataKitchen:
self.training_features_list = features
self.label_list = labels
# return features, labels
def check_if_pred_in_training_spaces(self) -> None:
"""
@@ -599,13 +676,13 @@ class FreqaiDataKitchen:
distance = pairwise_distances(
self.data_dictionary["train_features"],
self.data_dictionary["prediction_features"],
n_jobs=-1,
n_jobs=self.thread_count,
)
self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"]
do_predict = np.where(
self.DI_values < self.freqai_config.get("feature_parameters", {}).get("DI_threshold"),
self.DI_values < self.freqai_config["feature_parameters"]["DI_threshold"],
1,
0,
)
@@ -628,25 +705,27 @@ class FreqaiDataKitchen:
weights = np.exp(-np.arange(num_weights) / (wfactor * num_weights))[::-1]
return weights
def append_predictions(self, predictions, do_predict, len_dataframe):
def append_predictions(self, predictions: DataFrame, do_predict: npt.ArrayLike) -> None:
"""
Append backtest prediction from current backtest period to all previous periods
"""
self.append_df = DataFrame()
for label in self.label_list:
self.append_df[label] = predictions[label]
self.append_df[f"{label}_mean"] = self.data["labels_mean"][label]
self.append_df[f"{label}_std"] = self.data["labels_std"][label]
append_df = DataFrame()
for label in predictions.columns:
append_df[label] = predictions[label]
if append_df[label].dtype == object:
continue
append_df[f"{label}_mean"] = self.data["labels_mean"][label]
append_df[f"{label}_std"] = self.data["labels_std"][label]
self.append_df["do_predict"] = do_predict
if self.freqai_config.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
self.append_df["DI_values"] = self.DI_values
append_df["do_predict"] = do_predict
if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
append_df["DI_values"] = self.DI_values
if self.full_df.empty:
self.full_df = self.append_df
self.full_df = append_df
else:
self.full_df = pd.concat([self.full_df, self.append_df], axis=0)
self.full_df = pd.concat([self.full_df, append_df], axis=0)
return
@@ -666,7 +745,6 @@ class FreqaiDataKitchen:
to_keep = [col for col in dataframe.columns if not col.startswith("&")]
self.return_dataframe = pd.concat([dataframe[to_keep], self.full_df], axis=1)
self.append_df = DataFrame()
self.full_df = DataFrame()
return
@@ -683,7 +761,7 @@ class FreqaiDataKitchen:
if backtest_timerange.stopts == 0:
# typically open ended time ranges do work, however, there are some edge cases where
# it does not. accomodating these kinds of edge cases just to allow open-ended
# it does not. accommodating these kinds of edge cases just to allow open-ended
# timerange is not high enough priority to warrant the effort. It is safer for now
# to simply ask user to add their end date
raise OperationalException("FreqAI backtesting does not allow open ended timeranges. "
@@ -701,7 +779,7 @@ class FreqaiDataKitchen:
full_timerange = start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d")
self.full_path = Path(
self.config["user_data_dir"] / "models" / str(self.freqai_config.get("identifier"))
self.config["user_data_dir"] / "models" / f"{self.freqai_config['identifier']}"
)
config_path = Path(self.config["config_files"][0])
@@ -719,10 +797,9 @@ class FreqaiDataKitchen:
"""
A model age checker to determine if the model is trustworthy based on user defined
`expiration_hours` in the configuration file.
:params:
trained_timestamp: int = The time of training for the most recent model.
:returns:
bool = If the model is expired or not.
:param trained_timestamp: int = The time of training for the most recent model.
:return:
bool = If the model is expired or not.
"""
time = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
elapsed_time = (time - trained_timestamp) / 3600 # hours
@@ -740,30 +817,21 @@ class FreqaiDataKitchen:
trained_timerange = TimeRange()
data_load_timerange = TimeRange()
# find the max indicator length required
max_timeframe_chars = self.freqai_config.get("feature_parameters", {}).get(
"include_timeframes"
)[-1]
max_period = self.freqai_config.get("feature_parameters", {}).get(
"indicator_max_period_candles", 50
)
additional_seconds = 0
if max_timeframe_chars[-1] == "d":
additional_seconds = max_period * SECONDS_IN_DAY * int(max_timeframe_chars[-2])
elif max_timeframe_chars[-1] == "h":
additional_seconds = max_period * 3600 * int(max_timeframe_chars[-2])
elif max_timeframe_chars[-1] == "m":
if len(max_timeframe_chars) == 2:
additional_seconds = max_period * 60 * int(max_timeframe_chars[-2])
elif len(max_timeframe_chars) == 3:
additional_seconds = max_period * 60 * int(float(max_timeframe_chars[0:2]))
else:
logger.warning(
"FreqAI could not detect max timeframe and therefore may not "
"download the proper amount of data for training"
)
timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")
# logger.info(f'Extending data download by {additional_seconds/SECONDS_IN_DAY:.2f} days')
max_tf_seconds = 0
for tf in timeframes:
secs = timeframe_to_seconds(tf)
if secs > max_tf_seconds:
max_tf_seconds = secs
# We notice that users like to use exotic indicators where
# they do not know the required timeperiod. Here we include a factor
# of safety by multiplying the user considered "max" by 2.
max_period = self.freqai_config["feature_parameters"].get(
"indicator_max_period_candles", 20
) * 2
additional_seconds = max_period * max_tf_seconds
if trained_timestamp != 0:
elapsed_time = (time - trained_timestamp) / SECONDS_IN_HOUR
@@ -784,7 +852,7 @@ class FreqaiDataKitchen:
data_load_timerange.stopts = int(time)
else: # user passed no live_trained_timerange in config
trained_timerange.startts = int(
time - self.freqai_config.get("train_period_days") * SECONDS_IN_DAY
time - self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
)
trained_timerange.stopts = int(time)
@@ -815,24 +883,22 @@ class FreqaiDataKitchen:
self.model_filename = f"cb_{coin.lower()}_{int(trained_timerange.stopts)}"
def download_all_data_for_training(self, timerange: TimeRange) -> None:
def download_all_data_for_training(self, timerange: TimeRange, dp: DataProvider) -> None:
"""
Called only once upon start of bot to download the necessary data for
populating indicators and training the model.
:params:
timerange: TimeRange = The full data timerange for populating the indicators
and training the model.
:param timerange: TimeRange = The full data timerange for populating the indicators
and training the model.
:param dp: DataProvider instance attached to the strategy
"""
exchange = ExchangeResolver.load_exchange(
self.config["exchange"]["name"], self.config, validate=False, load_leverage_tiers=False
)
new_pairs_days = int((timerange.stopts - timerange.startts) / SECONDS_IN_DAY)
if not dp._exchange:
# Not realistic - this is only called in live mode.
raise OperationalException("Dataprovider did not have an exchange attached.")
refresh_backtest_ohlcv_data(
exchange,
dp._exchange,
pairs=self.all_pairs,
timeframes=self.freqai_config.get("feature_parameters", {}).get("include_timeframes"),
timeframes=self.freqai_config["feature_parameters"].get("include_timeframes"),
datadir=self.config["datadir"],
timerange=timerange,
new_pairs_days=new_pairs_days,
@@ -845,7 +911,7 @@ class FreqaiDataKitchen:
def set_all_pairs(self) -> None:
self.all_pairs = copy.deepcopy(
self.freqai_config.get("feature_parameters", {}).get("include_corr_pairlist", [])
self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
)
for pair in self.config.get("exchange", "").get("pair_whitelist"):
if pair not in self.all_pairs:
@@ -876,8 +942,8 @@ class FreqaiDataKitchen:
# for prediction dataframe creation, we let dataprovider handle everything in the strategy
# so we create empty dictionaries, which allows us to pass None to
# `populate_any_indicators()`. Signaling we want the dp to give us the live dataframe.
tfs = self.freqai_config.get("feature_parameters", {}).get("include_timeframes")
pairs = self.freqai_config.get("feature_parameters", {}).get("include_corr_pairlist", [])
tfs = self.freqai_config["feature_parameters"].get("include_timeframes")
pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
if not prediction_dataframe.empty:
dataframe = prediction_dataframe.copy()
for tf in tfs:
@@ -889,29 +955,26 @@ class FreqaiDataKitchen:
else:
dataframe = base_dataframes[self.config["timeframe"]].copy()
sgi = True
sgi = False
for tf in tfs:
if tf == tfs[-1]:
sgi = True # doing this last allows user to use all tf raw prices in labels
dataframe = strategy.populate_any_indicators(
pair,
pair,
dataframe.copy(),
tf,
informative=base_dataframes[tf],
coin=pair.split("/")[0] + "-",
set_generalized_indicators=sgi,
set_generalized_indicators=sgi
)
sgi = False
if pairs:
for i in pairs:
if pair in i:
continue # dont repeat anything from whitelist
dataframe = strategy.populate_any_indicators(
pair,
i,
dataframe.copy(),
tf,
informative=corr_dataframes[i][tf],
coin=i.split("/")[0] + "-",
informative=corr_dataframes[i][tf]
)
return dataframe
@@ -923,17 +986,12 @@ class FreqaiDataKitchen:
import scipy as spy
self.data["labels_mean"], self.data["labels_std"] = {}, {}
for label in self.label_list:
for label in self.data_dictionary["train_labels"].columns:
if self.data_dictionary["train_labels"][label].dtype == object:
continue
f = spy.stats.norm.fit(self.data_dictionary["train_labels"][label])
self.data["labels_mean"][label], self.data["labels_std"][label] = f[0], f[1]
# KEEPME incase we want to let user start to grab quantiles.
# upper_q = spy.stats.norm.ppf(self.freqai_config['feature_parameters'][
# 'target_quantile'], *f)
# lower_q = spy.stats.norm.ppf(1 - self.freqai_config['feature_parameters'][
# 'target_quantile'], *f)
# self.data["upper_quantile"] = upper_q
# self.data["lower_quantile"] = lower_q
return
def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
@@ -945,168 +1003,3 @@ class FreqaiDataKitchen:
col for col in dataframe.columns if not col.startswith("%") or col.startswith("%%")
]
return dataframe[to_keep]
def np_encoder(self, object):
if isinstance(object, np.generic):
return object.item()
# Functions containing useful data manipulation examples, but not actively in use.
# Possibly phasing these outlier removal methods below out in favor of
# use_SVM_to_remove_outliers (computationally more efficient and apparently higher performance).
# But these have good data manipulation examples, so keep them commented here for now.
# def determine_statistical_distributions(self) -> None:
# from fitter import Fitter
# logger.info('Determining best model for all features, may take some time')
# def compute_quantiles(ft):
# f = Fitter(self.data_dictionary["train_features"][ft],
# distributions=['gamma', 'cauchy', 'laplace',
# 'beta', 'uniform', 'lognorm'])
# f.fit()
# # f.summary()
# dist = list(f.get_best().items())[0][0]
# params = f.get_best()[dist]
# upper_q = getattr(spy.stats, list(f.get_best().items())[0][0]).ppf(0.999, **params)
# lower_q = getattr(spy.stats, list(f.get_best().items())[0][0]).ppf(0.001, **params)
# return ft, upper_q, lower_q, dist
# quantiles_tuple = Parallel(n_jobs=-1)(
# delayed(compute_quantiles)(ft) for ft in self.data_dictionary[
# 'train_features'].columns)
# df = pd.DataFrame(quantiles_tuple, columns=['features', 'upper_quantiles',
# 'lower_quantiles', 'dist'])
# self.data_dictionary['upper_quantiles'] = df['upper_quantiles']
# self.data_dictionary['lower_quantiles'] = df['lower_quantiles']
# return
# def remove_outliers(self, predict: bool) -> None:
# """
# Remove data that looks like an outlier based on the distribution of each
# variable.
# :params:
# :predict: boolean which tells the function if this is prediction data or
# training data coming in.
# """
# lower_quantile = self.data_dictionary["lower_quantiles"].to_numpy()
# upper_quantile = self.data_dictionary["upper_quantiles"].to_numpy()
# if predict:
# df = self.data_dictionary["prediction_features"][
# (self.data_dictionary["prediction_features"] < upper_quantile)
# & (self.data_dictionary["prediction_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(df).any(1)
# self.data_dictionary["prediction_features"].fillna(0, inplace=True)
# drop_index = ~drop_index
# do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
# logger.info(
# "remove_outliers() tossed %s predictions",
# len(do_predict) - do_predict.sum(),
# )
# self.do_predict += do_predict
# self.do_predict -= 1
# else:
# filter_train_df = self.data_dictionary["train_features"][
# (self.data_dictionary["train_features"] < upper_quantile)
# & (self.data_dictionary["train_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(filter_train_df).any(1)
# drop_index = drop_index.replace(True, 1).replace(False, 0)
# self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
# (drop_index == 0)
# ]
# self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
# (drop_index == 0)
# ]
# self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
# (drop_index == 0)
# ]
# logger.info(
# f'remove_outliers() tossed {drop_index.sum()}'
# f' training points from {len(filter_train_df)}'
# )
# # do the same for the test data
# filter_test_df = self.data_dictionary["test_features"][
# (self.data_dictionary["test_features"] < upper_quantile)
# & (self.data_dictionary["test_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(filter_test_df).any(1)
# drop_index = drop_index.replace(True, 1).replace(False, 0)
# self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][
# (drop_index == 0)
# ]
# self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
# (drop_index == 0)
# ]
# self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
# (drop_index == 0)
# ]
# logger.info(
# f'remove_outliers() tossed {drop_index.sum()}'
# f' test points from {len(filter_test_df)}'
# )
# return
# def standardize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
# """
# standardize all data in the data_dictionary according to the training dataset
# :params:
# :data_dictionary: dictionary containing the cleaned and split training/test data/labels
# :returns:
# :data_dictionary: updated dictionary with standardized values.
# """
# # standardize the data by training stats
# train_mean = data_dictionary["train_features"].mean()
# train_std = data_dictionary["train_features"].std()
# data_dictionary["train_features"] = (
# data_dictionary["train_features"] - train_mean
# ) / train_std
# data_dictionary["test_features"] = (
# data_dictionary["test_features"] - train_mean
# ) / train_std
# train_labels_std = data_dictionary["train_labels"].std()
# train_labels_mean = data_dictionary["train_labels"].mean()
# data_dictionary["train_labels"] = (
# data_dictionary["train_labels"] - train_labels_mean
# ) / train_labels_std
# data_dictionary["test_labels"] = (
# data_dictionary["test_labels"] - train_labels_mean
# ) / train_labels_std
# for item in train_std.keys():
# self.data[item + "_std"] = train_std[item]
# self.data[item + "_mean"] = train_mean[item]
# self.data["labels_std"] = train_labels_std
# self.data["labels_mean"] = train_labels_mean
# return data_dictionary
# def standardize_data_from_metadata(self, df: DataFrame) -> DataFrame:
# """
# Normalizes a set of data using the mean and standard deviation from
# the associated training data.
# :params:
# :df: Dataframe to be standardized
# """
# for item in df.keys():
# df[item] = (df[item] - self.data[item + "_mean"]) / self.data[item + "_std"]
# return df

View File

@@ -1,7 +1,5 @@
# import contextlib
import copy
import datetime
import gc
import logging
import shutil
import threading
@@ -12,7 +10,7 @@ from typing import Any, Dict, Tuple
import numpy as np
import pandas as pd
from numpy.typing import ArrayLike
from numpy.typing import NDArray
from pandas import DataFrame
from freqtrade.configuration import TimeRange
@@ -47,7 +45,7 @@ class IFreqaiModel(ABC):
Robert Caulk @robcaulk
Theoretical brainstorming:
Elin Törnquist @thorntwig
Elin Törnquist @th0rntwig
Code review, software architecture brainstorming:
@xmatthias
@@ -82,6 +80,8 @@ class IFreqaiModel(ABC):
self.CONV_WIDTH = self.freqai_info.get("conv_width", 2)
self.pair_it = 0
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
self.last_trade_database_summary: DataFrame = {}
self.current_trade_database_summary: DataFrame = {}
def assert_config(self, config: Dict[str, Any]) -> None:
@@ -123,7 +123,7 @@ class IFreqaiModel(ABC):
dataframe = dk.remove_features_from_df(dk.return_dataframe)
del dk
return self.return_values(dataframe)
return dataframe
@threaded
def start_scanning(self, strategy: IStrategy) -> None:
@@ -183,8 +183,6 @@ class IFreqaiModel(ABC):
(_, _, _) = self.dd.get_pair_dict_info(metadata["pair"])
train_it += 1
total_trains = len(dk.backtesting_timeranges)
gc.collect()
dk.data = {} # clean the pair specific data between training window sliding
self.training_timerange = tr_train
dataframe_train = dk.slice_dataframe(tr_train, dataframe)
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
@@ -204,14 +202,9 @@ class IFreqaiModel(ABC):
dk.data_path = Path(
dk.full_path
/ str(
"sub-train"
+ "-"
+ metadata["pair"].split("/")[0]
+ "_"
+ str(int(trained_timestamp.stopts))
/
f"sub-train-{metadata['pair'].split('/')[0]}_{int(trained_timestamp.stopts)}"
)
)
if not self.model_exists(
metadata["pair"], dk, trained_timestamp=int(trained_timestamp.stopts)
):
@@ -228,7 +221,7 @@ class IFreqaiModel(ABC):
pred_df, do_preds = self.predict(dataframe_backtest, dk)
dk.append_predictions(pred_df, do_preds, len(dataframe_backtest))
dk.append_predictions(pred_df, do_preds)
dk.fill_predictions(dataframe)
@@ -280,7 +273,7 @@ class IFreqaiModel(ABC):
"corr_pairlist, this may take a while if you do not have the "
"data saved"
)
dk.download_all_data_for_training(data_load_timerange)
dk.download_all_data_for_training(data_load_timerange, strategy.dp)
self.dd.load_all_pair_histories(data_load_timerange, dk)
if not self.scanning:
@@ -331,7 +324,8 @@ class IFreqaiModel(ABC):
return
elif self.dk.check_if_model_expired(trained_timestamp):
pred_df = DataFrame(np.zeros((2, len(dk.label_list))), columns=dk.label_list)
do_preds, dk.DI_values = np.ones(2) * 2, np.zeros(2)
do_preds = np.ones(2, dtype=np.int_) * 2
dk.DI_values = np.zeros(2)
logger.warning(
f"Model expired for {pair}, returning null values to strategy. Strategy "
"construction should take care to consider this event with "
@@ -379,17 +373,25 @@ class IFreqaiModel(ABC):
example of how outlier data points are dropped from the dataframe used for training.
"""
if self.freqai_info.get("feature_parameters", {}).get(
if self.freqai_info["feature_parameters"].get(
"principal_component_analysis", False
):
dk.principal_component_analysis()
if self.freqai_info.get("feature_parameters", {}).get("use_SVM_to_remove_outliers", False):
if self.freqai_info["feature_parameters"].get("use_SVM_to_remove_outliers", False):
dk.use_SVM_to_remove_outliers(predict=False)
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0):
if self.freqai_info["feature_parameters"].get("DI_threshold", 0):
dk.data["avg_mean_dist"] = dk.compute_distances()
if self.freqai_info["feature_parameters"].get("use_DBSCAN_to_remove_outliers", False):
if dk.pair in self.dd.old_DBSCAN_eps:
eps = self.dd.old_DBSCAN_eps[dk.pair]
else:
eps = None
dk.use_DBSCAN_to_remove_outliers(predict=False, eps=eps)
self.dd.old_DBSCAN_eps[dk.pair] = dk.data['DBSCAN_eps']
def data_cleaning_predict(self, dk: FreqaiDataKitchen, dataframe: DataFrame) -> None:
"""
Base data cleaning method for predict.
@@ -401,17 +403,20 @@ class IFreqaiModel(ABC):
of how the do_predict vector is modified. do_predict is ultimately passed back to strategy
for buy signals.
"""
if self.freqai_info.get("feature_parameters", {}).get(
if self.freqai_info["feature_parameters"].get(
"principal_component_analysis", False
):
dk.pca_transform(dataframe)
if self.freqai_info.get("feature_parameters", {}).get("use_SVM_to_remove_outliers", False):
if self.freqai_info["feature_parameters"].get("use_SVM_to_remove_outliers", False):
dk.use_SVM_to_remove_outliers(predict=True)
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0):
if self.freqai_info["feature_parameters"].get("DI_threshold", 0):
dk.check_if_pred_in_training_spaces()
if self.freqai_info["feature_parameters"].get("use_DBSCAN_to_remove_outliers", False):
dk.use_DBSCAN_to_remove_outliers(predict=True)
def model_exists(
self,
pair: str,
@@ -430,9 +435,9 @@ class IFreqaiModel(ABC):
coin, _ = pair.split("/")
if not self.live:
dk.model_filename = model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)
dk.model_filename = model_filename = f"cb_{coin.lower()}_{trained_timestamp}"
path_to_modelfile = Path(dk.data_path / str(model_filename + "_model.joblib"))
path_to_modelfile = Path(dk.data_path / f"{model_filename}_model.joblib")
file_exists = path_to_modelfile.is_file()
if file_exists and not scanning:
logger.info("Found model at %s", dk.data_path / dk.model_filename)
@@ -442,7 +447,7 @@ class IFreqaiModel(ABC):
def set_full_path(self) -> None:
self.full_path = Path(
self.config["user_data_dir"] / "models" / str(self.freqai_info.get("identifier"))
self.config["user_data_dir"] / "models" / f"{self.freqai_info['identifier']}"
)
self.full_path.mkdir(parents=True, exist_ok=True)
shutil.copy(
@@ -500,13 +505,54 @@ class IFreqaiModel(ABC):
def set_initial_historic_predictions(
self, df: DataFrame, model: Any, dk: FreqaiDataKitchen, pair: str
) -> None:
trained_predictions = model.predict(df)
"""
This function is called only if the datadrawer failed to load an
existing set of historic predictions. In this case, it builds
the structure and sets fake predictions off the first training
data. After that, FreqAI will append new real predictions to the
set of historic predictions.
These values are used to generate live statistics which can be used
in the strategy for adaptive values. E.g. &*_mean/std are quantities
that can be computed based on live predictions from the set of historical
predictions. Those values can be used in the user strategy to better
assess prediction rarity, and thus wait for probabilistically favorable
entries relative to the live historical predictions.
If the user reuses an identifier on a subsequent instance,
this function will not be called. In that case, "real" predictions
will be appended to the loaded set of historic predictions.
:param: df: DataFrame = the dataframe containing the training feature data
:param: model: Any = A model which was `fit` using a common library such as
catboost or lightgbm
:param: dk: FreqaiDataKitchen = object containing methods for data analysis
:param: pair: str = current pair
"""
num_candles = self.freqai_info.get('fit_live_predictions_candles', 600)
if not num_candles:
num_candles = 600
df_tail = df.tail(num_candles)
trained_predictions = model.predict(df_tail)
pred_df = DataFrame(trained_predictions, columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df)
self.dd.historic_predictions[pair] = pd.DataFrame()
self.dd.historic_predictions[pair] = copy.deepcopy(pred_df)
self.dd.historic_predictions[pair] = pred_df
hist_preds_df = self.dd.historic_predictions[pair]
for label in hist_preds_df.columns:
if hist_preds_df[label].dtype == object:
continue
hist_preds_df[f'{label}_mean'] = 0
hist_preds_df[f'{label}_std'] = 0
hist_preds_df['do_predict'] = 0
if self.freqai_info['feature_parameters'].get('DI_threshold', 0) > 0:
hist_preds_df['DI_values'] = 0
for return_str in dk.data['extra_returns_per_train']:
hist_preds_df[return_str] = 0
def fit_live_predictions(self, dk: FreqaiDataKitchen) -> None:
"""
@@ -517,13 +563,15 @@ class IFreqaiModel(ABC):
num_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
for label in dk.label_list:
if self.dd.historic_predictions[dk.pair][label].dtype == object:
continue
f = spy.stats.norm.fit(self.dd.historic_predictions[dk.pair][label].tail(num_candles))
dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
return
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModlel.py for an example.
# See freqai/prediction_models/CatboostPredictionModel.py for an example.
@abstractmethod
def train(self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen) -> Any:
@@ -550,7 +598,7 @@ class IFreqaiModel(ABC):
@abstractmethod
def predict(
self, dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = True
) -> Tuple[DataFrame, ArrayLike]:
) -> Tuple[DataFrame, NDArray[np.int_]]:
"""
Filter the prediction features data and predict with it.
:param unfiltered_dataframe: Full dataframe for the current backtest period.
@@ -561,14 +609,3 @@ class IFreqaiModel(ABC):
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (i.e. SVM and/or DI index)
"""
@abstractmethod
def return_values(self, dataframe: DataFrame) -> DataFrame:
"""
User defines the dataframe to be returned to strategy here.
:param dataframe: DataFrame = the full dataframe for the current prediction (live)
or --timerange (backtesting)
:return: dataframe: DataFrame = dataframe filled with user defined data
"""
return

View File

@@ -1,6 +1,7 @@
import logging
from typing import Any, Tuple
import numpy as np
import numpy.typing as npt
from pandas import DataFrame
@@ -18,15 +19,6 @@ class BaseRegressionModel(IFreqaiModel):
such as prediction_models/CatboostPredictionModel.py for guidance.
"""
def return_values(self, dataframe: DataFrame) -> DataFrame:
"""
User uses this function to add any additional return values to the dataframe.
e.g.
dataframe['volatility'] = dk.volatility_values
"""
return dataframe
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Any:
@@ -55,6 +47,8 @@ class BaseRegressionModel(IFreqaiModel):
f"{end_date}--------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get('fit_live_predictions', 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -74,8 +68,6 @@ class BaseRegressionModel(IFreqaiModel):
if self.freqai_info.get('fit_live_predictions_candles', 0) and self.live:
self.fit_live_predictions(dk)
else:
dk.fit_labels()
self.dd.save_historic_predictions_to_disk()
@@ -85,7 +77,7 @@ class BaseRegressionModel(IFreqaiModel):
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = False
) -> Tuple[DataFrame, npt.ArrayLike]:
) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.

View File

@@ -16,15 +16,6 @@ class BaseTensorFlowModel(IFreqaiModel):
User *must* inherit from this class and set fit() and predict().
"""
def return_values(self, dataframe: DataFrame) -> DataFrame:
"""
User uses this function to add any additional return values to the dataframe.
e.g.
dataframe['volatility'] = dk.volatility_values
"""
return dataframe
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Any:

View File

@@ -0,0 +1,41 @@
import logging
from typing import Any, Dict
from catboost import CatBoostClassifier, Pool
from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressionModel
logger = logging.getLogger(__name__)
class CatboostClassifier(BaseRegressionModel):
    """
    Classification model built on catboost's `CatBoostClassifier`.
    The class derives from BaseRegressionModel and only overrides fit();
    everything else is taken from the base class.
    """

    def fit(self, data_dictionary: Dict) -> Any:
        """
        Fit a multi-class CatBoost classifier on the training split.
        :param data_dictionary: dictionary holding the training data. This method
            reads the "train_features", "train_labels" and "train_weights" entries.
        :return: the fitted `CatBoostClassifier` instance.
        """
        # Bundle features, labels and per-sample weights into a single catboost Pool.
        features = data_dictionary["train_features"]
        labels = data_dictionary["train_labels"]
        weights = data_dictionary["train_weights"]
        training_pool = Pool(data=features, label=labels, weight=weights)

        # Additional constructor arguments come from self.model_training_parameters.
        classifier = CatBoostClassifier(
            allow_writing_files=False,
            loss_function='MultiClass',
            **self.model_training_parameters,
        )
        classifier.fit(training_pool)

        return classifier

View File

@@ -1,6 +1,7 @@
import gc
import logging
from typing import Any, Dict
import gc
from catboost import CatBoostRegressor, Pool
from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressionModel
@@ -9,7 +10,7 @@ from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressio
logger = logging.getLogger(__name__)
class CatboostPredictionModel(BaseRegressionModel):
class CatboostRegressor(BaseRegressionModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which

View File

@@ -10,7 +10,7 @@ from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressio
logger = logging.getLogger(__name__)
class CatboostPredictionMultiModel(BaseRegressionModel):
class CatboostRegressorMultiTarget(BaseRegressionModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which

View File

@@ -0,0 +1,38 @@
import logging
from typing import Any, Dict
from lightgbm import LGBMClassifier
from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressionModel
logger = logging.getLogger(__name__)
class LightGBMClassifier(BaseRegressionModel):
    """
    Classification model built on lightgbm's `LGBMClassifier`.
    The class derives from BaseRegressionModel and only overrides fit();
    everything else is taken from the base class.
    """

    def fit(self, data_dictionary: Dict) -> Any:
        """
        Fit an `LGBMClassifier` on the training split.
        :param data_dictionary: dictionary holding the training and test data. This
            method reads "train_features" and "train_labels", and additionally
            "test_features" and "test_labels" when an evaluation set is used.
        :return: the fitted `LGBMClassifier` instance.
        """
        split_parameters = self.freqai_info.get('data_split_parameters', {})
        # A configured test_size of exactly 0 means no evaluation set is passed to fit().
        eval_set = (
            None
            if split_parameters.get('test_size', 0.1) == 0
            else (data_dictionary["test_features"], data_dictionary["test_labels"])
        )
        train_features = data_dictionary["train_features"]
        train_labels = data_dictionary["train_labels"]

        # Constructor arguments come from self.model_training_parameters.
        classifier = LGBMClassifier(**self.model_training_parameters)
        classifier.fit(X=train_features, y=train_labels, eval_set=eval_set)

        return classifier

View File

@@ -9,7 +9,7 @@ from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressio
logger = logging.getLogger(__name__)
class LightGBMPredictionModel(BaseRegressionModel):
class LightGBMRegressor(BaseRegressionModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which

View File

@@ -10,7 +10,7 @@ from freqtrade.freqai.prediction_models.BaseRegressionModel import BaseRegressio
logger = logging.getLogger(__name__)
class LightGBMPredictionMultiModel(BaseRegressionModel):
class LightGBMRegressorMultiTarget(BaseRegressionModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which