Aggregated commit. Adding support vector machine for outlier detection, improve user interface to dry/live, better standardization, fix various other bugs

This commit is contained in:
robcaulk
2022-05-22 17:51:49 +02:00
parent c5ecf94177
commit 42d95af829
7 changed files with 404 additions and 300 deletions

View File

@@ -10,8 +10,9 @@ from typing import Any, Dict, List, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
from joblib import dump, load
from joblib import dump, load # , Parallel, delayed # used for auto distribution assignment
from pandas import DataFrame
from sklearn import linear_model
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
@@ -22,6 +23,9 @@ from freqtrade.resolvers import ExchangeResolver
from freqtrade.strategy.interface import IStrategy
# import scipy as spy # used for auto distribution assignment
SECONDS_IN_DAY = 86400
logger = logging.getLogger(__name__)
@@ -52,6 +56,7 @@ class FreqaiDataKitchen:
self.model_filename: str = ""
self.model_dictionary: Dict[Any, Any] = {}
self.live = live
self.svm_model: linear_model.SGDOneClassSVM = None
if not self.live:
self.full_timerange = self.create_fulltimerange(self.config["timerange"],
self.freqai_config["train_period"]
@@ -89,6 +94,10 @@ class FreqaiDataKitchen:
# Save the trained model
dump(model, save_path / str(self.model_filename + "_model.joblib"))
if self.svm_model is not None:
dump(self.svm_model, save_path / str(self.model_filename + "_svm_model.joblib"))
self.data["model_path"] = str(self.model_path)
self.data["model_filename"] = str(self.model_filename)
self.data["training_features_list"] = list(self.data_dictionary["train_features"].columns)
@@ -104,6 +113,19 @@ class FreqaiDataKitchen:
if self.live:
self.model_dictionary[self.model_filename] = model
# TODO add a helper function to let user save/load any data they are custom adding. We
# do not want them having to edit the default save/load methods here. Below is an example
# of what we do NOT want.
# if self.freqai_config['feature_parameters']['determine_statistical_distributions']:
# self.data_dictionary["upper_quantiles"].to_pickle(
# save_path / str(self.model_filename + "_upper_quantiles.pkl")
# )
# self.data_dictionary["lower_quantiles"].to_pickle(
# save_path / str(self.model_filename + "_lower_quantiles.pkl")
# )
return
def load_data(self) -> Any:
@@ -121,6 +143,19 @@ class FreqaiDataKitchen:
self.model_path / str(self.model_filename + "_trained_df.pkl")
)
# TODO add a helper function to let user save/load any data they are custom adding. We
# do not want them having to edit the default save/load methods here. Below is an example
# of what we do NOT want.
# if self.freqai_config['feature_parameters']['determine_statistical_distributions']:
# self.data_dictionary["upper_quantiles"] = pd.read_pickle(
# self.model_path / str(self.model_filename + "_upper_quantiles.pkl")
# )
# self.data_dictionary["lower_quantiles"] = pd.read_pickle(
# self.model_path / str(self.model_filename + "_lower_quantiles.pkl")
# )
self.model_path = Path(self.data["model_path"])
self.model_filename = self.data["model_filename"]
@@ -130,6 +165,10 @@ class FreqaiDataKitchen:
else:
model = load(self.model_path / str(self.model_filename + "_model.joblib"))
if Path(self.model_path / str(self.model_filename +
"_svm_model.joblib")).resolve().exists():
self.svm_model = load(self.model_path / str(self.model_filename + "_svm_model.joblib"))
assert model, (
f"Unable to load model, ensure model exists at "
f"{self.model_path} "
@@ -159,6 +198,12 @@ class FreqaiDataKitchen:
else:
weights = np.ones(len(filtered_dataframe))
if self.config["freqai"]["feature_parameters"]["stratify"] > 0:
stratification = np.zeros(len(filtered_dataframe))
for i in range(1, len(stratification)):
if i % self.config["freqai"]["feature_parameters"]["stratify"] == 0:
stratification[i] = 1
(
train_features,
test_features,
@@ -170,6 +215,8 @@ class FreqaiDataKitchen:
filtered_dataframe[: filtered_dataframe.shape[0]],
labels,
weights,
stratify=stratification,
# shuffle=False,
**self.config["freqai"]["data_split_parameters"]
)
@@ -261,9 +308,9 @@ class FreqaiDataKitchen:
return self.data_dictionary
def standardize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
"""
Standardize all data in the data_dictionary according to the training dataset
Normalize all data in the data_dictionary according to the training dataset
:params:
:data_dictionary: dictionary containing the cleaned and split training/test data/labels
:returns:
@@ -297,6 +344,42 @@ class FreqaiDataKitchen:
return data_dictionary
def standardize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
"""
Standardize all data in the data_dictionary according to the training dataset
:params:
:data_dictionary: dictionary containing the cleaned and split training/test data/labels
:returns:
:data_dictionary: updated dictionary with standardized values.
"""
# standardize the data by training stats
train_max = data_dictionary["train_features"].max()
train_min = data_dictionary["train_features"].min()
data_dictionary["train_features"] = 2 * (
data_dictionary["train_features"] - train_min
) / (train_max - train_min) - 1
data_dictionary["test_features"] = 2 * (
data_dictionary["test_features"] - train_min
) / (train_max - train_min) - 1
train_labels_max = data_dictionary["train_labels"].max()
train_labels_min = data_dictionary["train_labels"].min()
data_dictionary["train_labels"] = 2 * (
data_dictionary["train_labels"] - train_labels_min
) / (train_labels_max - train_labels_min) - 1
data_dictionary["test_labels"] = 2 * (
data_dictionary["test_labels"] - train_labels_min
) / (train_labels_max - train_labels_min) - 1
for item in train_max.keys():
self.data[item + "_max"] = train_max[item]
self.data[item + "_min"] = train_min[item]
self.data["labels_max"] = train_labels_max
self.data["labels_min"] = train_labels_min
return data_dictionary
def standardize_data_from_metadata(self, df: DataFrame) -> DataFrame:
"""
Standardizes a set of data using the mean and standard deviation from
@@ -305,6 +388,20 @@ class FreqaiDataKitchen:
:df: Dataframe to be standardized
"""
for item in df.keys():
df[item] = 2 * (df[item] - self.data[item + "_min"]) / (self.data[item + "_max"] -
self.data[item + '_min']) - 1
return df
def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
"""
Normalizes a set of data using the mean and standard deviation from
the associated training data.
:params:
:df: Dataframe to be standardized
"""
for item in df.keys():
df[item] = (df[item] - self.data[item + "_mean"]) / self.data[item + "_std"]
@@ -420,6 +517,8 @@ class FreqaiDataKitchen:
self.data["n_kept_components"] = n_keep_components
self.pca = pca2
logger.info(f'PCA reduced total features from {n_components} to {n_keep_components}')
if not self.model_path.is_dir():
self.model_path.mkdir(parents=True, exist_ok=True)
pk.dump(pca2, open(self.model_path / str(self.model_filename + "_pca_object.pkl"), "wb"))
@@ -434,70 +533,53 @@ class FreqaiDataKitchen:
return avg_mean_dist
def remove_outliers(self, predict: bool) -> None:
"""
Remove data that looks like an outlier based on the distribution of each
variable.
:params:
:predict: boolean which tells the function if this is prediction data or
training data coming in.
"""
lower_quantile = self.data_dictionary["train_features"].quantile(0.001)
upper_quantile = self.data_dictionary["train_features"].quantile(0.999)
def use_SVM_to_remove_outliers(self, predict: bool) -> None:
if predict:
df = self.data_dictionary["prediction_features"][
(self.data_dictionary["prediction_features"] < upper_quantile)
& (self.data_dictionary["prediction_features"] > lower_quantile)
]
drop_index = pd.isnull(df).any(1)
self.data_dictionary["prediction_features"].fillna(0, inplace=True)
drop_index = ~drop_index
do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
assert self.svm_model, "No svm model available for outlier removal"
y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
do_predict = np.where(y_pred == -1, 0, y_pred)
logger.info(
"remove_outliers() tossed %s predictions",
len(do_predict) - do_predict.sum(),
f'svm_remove_outliers() tossed {len(do_predict) - do_predict.sum()} predictions'
)
self.do_predict += do_predict
self.do_predict -= 1
else:
# use SGDOneClassSVM to increase speed?
self.svm_model = linear_model.SGDOneClassSVM(nu=0.1).fit(
self.data_dictionary["train_features"]
)
y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
dropped_points = np.where(y_pred == -1, 0, y_pred)
# keep_index = np.where(y_pred == 1)
self.data_dictionary["train_features"] = self.data_dictionary[
"train_features"][(y_pred == 1)]
self.data_dictionary["train_labels"] = self.data_dictionary[
"train_labels"][(y_pred == 1)]
self.data_dictionary["train_weights"] = self.data_dictionary[
"train_weights"][(y_pred == 1)]
filter_train_df = self.data_dictionary["train_features"][
(self.data_dictionary["train_features"] < upper_quantile)
& (self.data_dictionary["train_features"] > lower_quantile)
]
drop_index = pd.isnull(filter_train_df).any(1)
drop_index = drop_index.replace(True, 1).replace(False, 0)
self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
(drop_index == 0)
]
self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
(drop_index == 0)
]
self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
(drop_index == 0)
]
logger.info(
f'svm_remove_outliers() tossed {len(y_pred) - dropped_points.sum()}'
f' train points from {len(y_pred)}'
)
# do the same for the test data
filter_test_df = self.data_dictionary["test_features"][
(self.data_dictionary["test_features"] < upper_quantile)
& (self.data_dictionary["test_features"] > lower_quantile)
]
drop_index = pd.isnull(filter_test_df).any(1)
drop_index = drop_index.replace(True, 1).replace(False, 0)
self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][
(drop_index == 0)
]
self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
(drop_index == 0)
]
self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
(drop_index == 0)
]
# same for test data
y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
dropped_points = np.where(y_pred == -1, 0, y_pred)
self.data_dictionary["test_features"] = self.data_dictionary[
"test_features"][(y_pred == 1)]
self.data_dictionary["test_labels"] = self.data_dictionary[
"test_labels"][(y_pred == 1)]
self.data_dictionary["test_weights"] = self.data_dictionary[
"test_weights"][(y_pred == 1)]
logger.info(
f'svm_remove_outliers() tossed {len(y_pred) - dropped_points.sum()}'
f' test points from {len(y_pred)}'
)
return
@@ -507,32 +589,6 @@ class FreqaiDataKitchen:
assert features, ("Could not find any features!")
return features
# def build_feature_list(self, config: dict, metadata: dict) -> list:
# """
# SUPERCEDED BY self.find_features()
# Build the list of features that will be used to filter
# the full dataframe. Feature list is construced from the
# user configuration file.
# :params:
# :config: Canonical freqtrade config file containing all
# user defined input in config['freqai] dictionary.
# """
# features = []
# for tf in config["freqai"]["timeframes"]:
# for ft in config["freqai"]["base_features"]:
# for n in range(config["freqai"]["feature_parameters"]["shift"] + 1):
# shift = ""
# if n > 0:
# shift = "_shift-" + str(n)
# features.append(metadata['pair'].split("/")[0] + "-" + ft + shift + "_" + tf)
# for p in config["freqai"]["corr_pairlist"]:
# if metadata['pair'] in p:
# continue # avoid duplicate features
# features.append(p.split("/")[0] + "-" + ft + shift + "_" + tf)
# # logger.info("number of features %s", len(features))
# return features
def check_if_pred_in_training_spaces(self) -> None:
"""
Compares the distance from each prediction point to each training data
@@ -568,7 +624,7 @@ class FreqaiDataKitchen:
training than older data.
"""
weights = np.zeros_like(num_weights)
weights = np.zeros(num_weights)
for i in range(1, len(weights)):
weights[len(weights) - i] = np.exp(
-i / (self.config["freqai"]["feature_parameters"]["weight_factor"] * num_weights)
@@ -638,19 +694,23 @@ class FreqaiDataKitchen:
time = datetime.datetime.now(tz=datetime.timezone.utc).timestamp()
trained_timerange = TimeRange.parse_timerange(training_timerange)
if training_timerange: # user passed no live_trained_timerange in config
trained_timerange = TimeRange.parse_timerange(training_timerange)
elapsed_time = (time - trained_timerange.stopts) / SECONDS_IN_DAY
trained_timerange.startts += self.freqai_config['backtest_period'] * SECONDS_IN_DAY
trained_timerange.stopts += self.freqai_config['backtest_period'] * SECONDS_IN_DAY
retrain = elapsed_time > self.freqai_config['backtest_period']
else:
trained_timerange = TimeRange.parse_timerange("20000101-20000201")
trained_timerange.startts = int(time - self.freqai_config['train_period'] *
SECONDS_IN_DAY)
trained_timerange.stopts = int(time)
retrain = True
elapsed_time = (time - trained_timerange.stopts) / SECONDS_IN_DAY
trained_timerange.startts += self.freqai_config['backtest_period'] * SECONDS_IN_DAY
trained_timerange.stopts += self.freqai_config['backtest_period'] * SECONDS_IN_DAY
start = datetime.datetime.utcfromtimestamp(trained_timerange.startts)
stop = datetime.datetime.utcfromtimestamp(trained_timerange.stopts)
new_trained_timerange = start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d")
retrain = elapsed_time > self.freqai_config['backtest_period']
if retrain:
coin, _ = metadata['pair'].split("/")
# set the new model_path
@@ -738,3 +798,141 @@ class FreqaiDataKitchen:
def np_encoder(self, object):
if isinstance(object, np.generic):
return object.item()
# Functions containing useful data manpulation examples. but not actively in use.
# def build_feature_list(self, config: dict, metadata: dict) -> list:
# """
# SUPERCEDED BY self.find_features()
# Build the list of features that will be used to filter
# the full dataframe. Feature list is construced from the
# user configuration file.
# :params:
# :config: Canonical freqtrade config file containing all
# user defined input in config['freqai] dictionary.
# """
# features = []
# for tf in config["freqai"]["timeframes"]:
# for ft in config["freqai"]["base_features"]:
# for n in range(config["freqai"]["feature_parameters"]["shift"] + 1):
# shift = ""
# if n > 0:
# shift = "_shift-" + str(n)
# features.append(metadata['pair'].split("/")[0] + "-" + ft + shift + "_" + tf)
# for p in config["freqai"]["corr_pairlist"]:
# if metadata['pair'] in p:
# continue # avoid duplicate features
# features.append(p.split("/")[0] + "-" + ft + shift + "_" + tf)
# # logger.info("number of features %s", len(features))
# return features
# Possibly phasing these outlier removal methods below out in favor of
# use_SVM_to_remove_outliers (computationally more efficient and apparently higher performance).
# But these have good data manipulation examples, so keep them commented here for now.
# def determine_statistical_distributions(self) -> None:
# from fitter import Fitter
# logger.info('Determining best model for all features, may take some time')
# def compute_quantiles(ft):
# f = Fitter(self.data_dictionary["train_features"][ft],
# distributions=['gamma', 'cauchy', 'laplace',
# 'beta', 'uniform', 'lognorm'])
# f.fit()
# # f.summary()
# dist = list(f.get_best().items())[0][0]
# params = f.get_best()[dist]
# upper_q = getattr(spy.stats, list(f.get_best().items())[0][0]).ppf(0.999, **params)
# lower_q = getattr(spy.stats, list(f.get_best().items())[0][0]).ppf(0.001, **params)
# return ft, upper_q, lower_q, dist
# quantiles_tuple = Parallel(n_jobs=-1)(
# delayed(compute_quantiles)(ft) for ft in self.data_dictionary[
# 'train_features'].columns)
# df = pd.DataFrame(quantiles_tuple, columns=['features', 'upper_quantiles',
# 'lower_quantiles', 'dist'])
# self.data_dictionary['upper_quantiles'] = df['upper_quantiles']
# self.data_dictionary['lower_quantiles'] = df['lower_quantiles']
# return
# def remove_outliers(self, predict: bool) -> None:
# """
# Remove data that looks like an outlier based on the distribution of each
# variable.
# :params:
# :predict: boolean which tells the function if this is prediction data or
# training data coming in.
# """
# lower_quantile = self.data_dictionary["lower_quantiles"].to_numpy()
# upper_quantile = self.data_dictionary["upper_quantiles"].to_numpy()
# if predict:
# df = self.data_dictionary["prediction_features"][
# (self.data_dictionary["prediction_features"] < upper_quantile)
# & (self.data_dictionary["prediction_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(df).any(1)
# self.data_dictionary["prediction_features"].fillna(0, inplace=True)
# drop_index = ~drop_index
# do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
# logger.info(
# "remove_outliers() tossed %s predictions",
# len(do_predict) - do_predict.sum(),
# )
# self.do_predict += do_predict
# self.do_predict -= 1
# else:
# filter_train_df = self.data_dictionary["train_features"][
# (self.data_dictionary["train_features"] < upper_quantile)
# & (self.data_dictionary["train_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(filter_train_df).any(1)
# drop_index = drop_index.replace(True, 1).replace(False, 0)
# self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
# (drop_index == 0)
# ]
# self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
# (drop_index == 0)
# ]
# self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
# (drop_index == 0)
# ]
# logger.info(
# f'remove_outliers() tossed {drop_index.sum()}'
# f' training points from {len(filter_train_df)}'
# )
# # do the same for the test data
# filter_test_df = self.data_dictionary["test_features"][
# (self.data_dictionary["test_features"] < upper_quantile)
# & (self.data_dictionary["test_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(filter_test_df).any(1)
# drop_index = drop_index.replace(True, 1).replace(False, 0)
# self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][
# (drop_index == 0)
# ]
# self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
# (drop_index == 0)
# ]
# self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
# (drop_index == 0)
# ]
# logger.info(
# f'remove_outliers() tossed {drop_index.sum()}'
# f' test points from {len(filter_test_df)}'
# )
# return