
931 lines
40 KiB
Raw Normal View History

import copy
import datetime
import json
2022-05-04 15:53:40 +00:00
import logging
import pickle as pk
import shutil
from pathlib import Path
from typing import Any, Dict, List, Tuple
import numpy as np
2022-05-06 14:20:52 +00:00
import numpy.typing as npt
import pandas as pd
from joblib import dump, load # , Parallel, delayed # used for auto distribution assignment
from pandas import DataFrame
from sklearn import linear_model
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from freqtrade.configuration import TimeRange
from import load_pair_history
from import refresh_backtest_ohlcv_data
from freqtrade.resolvers import ExchangeResolver
from freqtrade.strategy.interface import IStrategy
# import scipy as spy # used for auto distribution assignment
2022-05-04 15:53:40 +00:00
logger = logging.getLogger(__name__)
class FreqaiDataKitchen:
Class designed to handle all the data for the IFreqaiModel class model.
Functionalities include holding, saving, loading, and analyzing the data.
2022-05-03 08:28:13 +00:00
author: Robert Caulk,
def __init__(self, config: Dict[str, Any], dataframe: DataFrame, live: bool = False):
self.full_dataframe = dataframe Dict[Any, Any] = {}
self.data_dictionary: Dict[Any, Any] = {}
self.config = config
self.freqai_config = config["freqai"]
2022-05-06 14:20:52 +00:00
self.predictions: npt.ArrayLike = np.array([])
self.do_predict: npt.ArrayLike = np.array([])
self.target_mean: npt.ArrayLike = np.array([])
self.target_std: npt.ArrayLike = np.array([])
self.full_predictions: npt.ArrayLike = np.array([])
self.full_do_predict: npt.ArrayLike = np.array([])
self.full_target_mean: npt.ArrayLike = np.array([])
self.full_target_std: npt.ArrayLike = np.array([])
self.model_path = Path()
self.model_filename: str = ""
self.model_dictionary: Dict[Any, Any] = {} = live
self.svm_model: linear_model.SGDOneClassSVM = None
if not
self.full_timerange = self.create_fulltimerange(self.config["timerange"],
(self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
def set_paths(self) -> None:
self.full_path = Path(self.config['user_data_dir'] /
"models" /
str(self.freqai_config['live_full_backtestrange'] +
self.model_path = Path(self.full_path / str("sub-train" + "-" +
def save_data(self, model: Any) -> None:
Saves all data associated with a model for a single sub-train time range
:model: User trained model which can be reused for inferencing to generate
if not self.model_path.is_dir():
self.model_path.mkdir(parents=True, exist_ok=True)
save_path = Path(self.model_path)
# Save the trained model
dump(model, save_path / str(self.model_filename + "_model.joblib"))
if self.svm_model is not None:
dump(self.svm_model, save_path / str(self.model_filename + "_svm_model.joblib"))["model_path"] = str(self.model_path)["model_filename"] = str(self.model_filename)["training_features_list"] = list(self.data_dictionary["train_features"].columns)
# store the metadata
with open(save_path / str(self.model_filename + "_metadata.json"), "w") as fp:
json.dump(, fp, default=self.np_encoder)
# save the train data to file so we can check preds for area of applicability later
save_path / str(self.model_filename + "_trained_df.pkl")
self.model_dictionary[self.model_filename] = model
# TODO add a helper function to let user save/load any data they are custom adding. We
# do not want them having to edit the default save/load methods here. Below is an example
# of what we do NOT want.
# if self.freqai_config['feature_parameters']['determine_statistical_distributions']:
# self.data_dictionary["upper_quantiles"].to_pickle(
# save_path / str(self.model_filename + "_upper_quantiles.pkl")
# )
# self.data_dictionary["lower_quantiles"].to_pickle(
# save_path / str(self.model_filename + "_lower_quantiles.pkl")
# )
def load_data(self) -> Any:
loads all data required to make a prediction on a sub-train time range
:model: User trained model which can be inferenced for new predictions
with open(self.model_path / str(self.model_filename + "_metadata.json"), "r") as fp: = json.load(fp)
self.training_features_list =["training_features_list"]
self.data_dictionary["train_features"] = pd.read_pickle(
self.model_path / str(self.model_filename + "_trained_df.pkl")
# TODO add a helper function to let user save/load any data they are custom adding. We
# do not want them having to edit the default save/load methods here. Below is an example
# of what we do NOT want.
# if self.freqai_config['feature_parameters']['determine_statistical_distributions']:
# self.data_dictionary["upper_quantiles"] = pd.read_pickle(
# self.model_path / str(self.model_filename + "_upper_quantiles.pkl")
# )
# self.data_dictionary["lower_quantiles"] = pd.read_pickle(
# self.model_path / str(self.model_filename + "_lower_quantiles.pkl")
# )
self.model_path = Path(["model_path"])
self.model_filename =["model_filename"]
# try to access model in memory instead of loading object from disk to save time
if and self.model_filename in self.model_dictionary:
model = self.model_dictionary[self.model_filename]
model = load(self.model_path / str(self.model_filename + "_model.joblib"))
if Path(self.model_path / str(self.model_filename +
self.svm_model = load(self.model_path / str(self.model_filename + "_svm_model.joblib"))
assert model, (
f"Unable to load model, ensure model exists at "
f"{self.model_path} "
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
self.pca = pk.load(
open(self.model_path / str(self.model_filename + "_pca_object.pkl"), "rb")
return model
def make_train_test_datasets(
self, filtered_dataframe: DataFrame, labels: DataFrame
) -> Dict[Any, Any]:
Given the dataframe for the full history for training, split the data into
training and test data according to user specified parameters in configuration
:filtered_dataframe: cleaned dataframe ready to be split.
:labels: cleaned labels ready to be split.
2022-05-06 14:20:52 +00:00
weights: npt.ArrayLike
if self.config["freqai"]["feature_parameters"]["weight_factor"] > 0:
weights = self.set_weights_higher_recent(len(filtered_dataframe))
weights = np.ones(len(filtered_dataframe))
if self.config["freqai"]["feature_parameters"]["stratify"] > 0:
stratification = np.zeros(len(filtered_dataframe))
for i in range(1, len(stratification)):
if i % self.config["freqai"]["feature_parameters"]["stratify"] == 0:
stratification[i] = 1
) = train_test_split(
filtered_dataframe[: filtered_dataframe.shape[0]],
# shuffle=False,
return self.build_data_dictionary(
train_features, test_features, train_labels, test_labels, train_weights, test_weights
def filter_features(
unfiltered_dataframe: DataFrame,
training_feature_list: List,
labels: DataFrame = pd.DataFrame(),
training_filter: bool = True,
) -> Tuple[DataFrame, DataFrame]:
Filter the unfiltered dataframe to extract the user requested features and properly
remove all NaNs. Any row with a NaN is removed from training dataset or replaced with
0s in the prediction dataset. However, prediction dataset do_predict will reflect any
row that had a NaN and will shield user from that prediction.
:unfiltered_dataframe: the full dataframe for the present training period
:training_feature_list: list, the training feature list constructed by
self.build_feature_list() according to user specified parameters in the configuration file.
:labels: the labels for the dataset
:training_filter: boolean which lets the function know if it is training data or
prediction data to be filtered.
:filtered_dataframe: dataframe cleaned of NaNs and only containing the user
requested feature set.
:labels: labels cleaned of NaNs.
filtered_dataframe = unfiltered_dataframe.filter(training_feature_list, axis=1)
drop_index = pd.isnull(filtered_dataframe).any(1) # get the rows that have NaNs,
drop_index = drop_index.replace(True, 1).replace(False, 0) # pep8 requirement.
if (
): # we don't care about total row number (total no. datapoints) in training, we only care
# about removing any row with NaNs
drop_index_labels = pd.isnull(labels)
drop_index_labels = drop_index_labels.replace(True, 1).replace(False, 0)
filtered_dataframe = filtered_dataframe[
(drop_index == 0) & (drop_index_labels == 0)
] # dropping values
labels = labels[
(drop_index == 0) & (drop_index_labels == 0)
] # assuming the labels depend entirely on the dataframe here.
# "dropped %s training points due to NaNs, ensure all historical data downloaded",
# len(unfiltered_dataframe) - len(filtered_dataframe),
# )["filter_drop_index_training"] = drop_index
# we are backtesting so we need to preserve row number to send back to strategy,
# so now we use do_predict to avoid any prediction based on a NaN
drop_index = pd.isnull(filtered_dataframe).any(1)["filter_drop_index_prediction"] = drop_index
filtered_dataframe.fillna(0, inplace=True)
# replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction
# that was based on a single NaN is ultimately protected from buys with do_predict
drop_index = ~drop_index
self.do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
2022-05-04 15:53:40 +00:00
"dropped %s of %s prediction data points due to NaNs.",
len(self.do_predict) - self.do_predict.sum(),
return filtered_dataframe, labels
def build_data_dictionary(
train_df: DataFrame,
test_df: DataFrame,
train_labels: DataFrame,
test_labels: DataFrame,
train_weights: Any,
test_weights: Any,
) -> Dict:
self.data_dictionary = {
"train_features": train_df,
"test_features": test_df,
"train_labels": train_labels,
"test_labels": test_labels,
"train_weights": train_weights,
"test_weights": test_weights,
return self.data_dictionary
def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
Normalize all data in the data_dictionary according to the training dataset
:data_dictionary: dictionary containing the cleaned and split training/test data/labels
:data_dictionary: updated dictionary with standardized values.
# standardize the data by training stats
train_mean = data_dictionary["train_features"].mean()
train_std = data_dictionary["train_features"].std()
data_dictionary["train_features"] = (
data_dictionary["train_features"] - train_mean
) / train_std
data_dictionary["test_features"] = (
data_dictionary["test_features"] - train_mean
) / train_std
train_labels_std = data_dictionary["train_labels"].std()
train_labels_mean = data_dictionary["train_labels"].mean()
data_dictionary["train_labels"] = (
data_dictionary["train_labels"] - train_labels_mean
) / train_labels_std
data_dictionary["test_labels"] = (
data_dictionary["test_labels"] - train_labels_mean
) / train_labels_std
for item in train_std.keys():[item + "_std"] = train_std[item][item + "_mean"] = train_mean[item]["labels_std"] = train_labels_std["labels_mean"] = train_labels_mean
return data_dictionary
def standardize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
Standardize all data in the data_dictionary according to the training dataset
:data_dictionary: dictionary containing the cleaned and split training/test data/labels
:data_dictionary: updated dictionary with standardized values.
# standardize the data by training stats
train_max = data_dictionary["train_features"].max()
train_min = data_dictionary["train_features"].min()
data_dictionary["train_features"] = 2 * (
data_dictionary["train_features"] - train_min
) / (train_max - train_min) - 1
data_dictionary["test_features"] = 2 * (
data_dictionary["test_features"] - train_min
) / (train_max - train_min) - 1
train_labels_max = data_dictionary["train_labels"].max()
train_labels_min = data_dictionary["train_labels"].min()
data_dictionary["train_labels"] = 2 * (
data_dictionary["train_labels"] - train_labels_min
) / (train_labels_max - train_labels_min) - 1
data_dictionary["test_labels"] = 2 * (
data_dictionary["test_labels"] - train_labels_min
) / (train_labels_max - train_labels_min) - 1
for item in train_max.keys():[item + "_max"] = train_max[item][item + "_min"] = train_min[item]["labels_max"] = train_labels_max["labels_min"] = train_labels_min
return data_dictionary
def standardize_data_from_metadata(self, df: DataFrame) -> DataFrame:
Standardizes a set of data using the mean and standard deviation from
the associated training data.
:df: Dataframe to be standardized
for item in df.keys():
df[item] = 2 * (df[item] -[item + "_min"]) / ([item + "_max"] -[item + '_min']) - 1
return df
def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
Normalizes a set of data using the mean and standard deviation from
the associated training data.
:df: Dataframe to be standardized
for item in df.keys():
df[item] = (df[item] -[item + "_mean"]) /[item + "_std"]
return df
def split_timerange(
self, tr: str, train_split: int = 28, bt_split: int = 7
) -> Tuple[list, list]:
Function which takes a single time range (tr) and splits it
into sub timeranges to train and backtest on based on user input
tr: str, full timerange to train on
train_split: the period length for the each training (days). Specified in user
configuration file
bt_split: the backtesting length (dats). Specified in user configuration file
train_period = train_split * SECONDS_IN_DAY
bt_period = bt_split * SECONDS_IN_DAY
full_timerange = TimeRange.parse_timerange(tr)
config_timerange = TimeRange.parse_timerange(self.config["timerange"])
2022-05-17 17:50:06 +00:00
if config_timerange.stopts == 0:
config_timerange.stopts = int(
timerange_train = copy.deepcopy(full_timerange)
timerange_backtest = copy.deepcopy(full_timerange)
tr_training_list = []
tr_backtesting_list = []
first = True
# within_config_timerange = True
while True:
if not first:
timerange_train.startts = timerange_train.startts + bt_period
timerange_train.stopts = timerange_train.startts + train_period
first = False
start = datetime.datetime.utcfromtimestamp(timerange_train.startts)
stop = datetime.datetime.utcfromtimestamp(timerange_train.stopts)
tr_training_list.append(start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d"))
# associated backtest period
2022-05-06 11:06:54 +00:00
timerange_backtest.startts = timerange_train.stopts
timerange_backtest.stopts = timerange_backtest.startts + bt_period
if timerange_backtest.stopts > config_timerange.stopts:
timerange_backtest.stopts = config_timerange.stopts
start = datetime.datetime.utcfromtimestamp(timerange_backtest.startts)
stop = datetime.datetime.utcfromtimestamp(timerange_backtest.stopts)
tr_backtesting_list.append(start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d"))
# ensure we are predicting on exactly same amount of data as requested by user defined
# --timerange
if timerange_backtest.stopts == config_timerange.stopts:
print(tr_training_list, tr_backtesting_list)
return tr_training_list, tr_backtesting_list
def slice_dataframe(self, tr: str, df: DataFrame) -> DataFrame:
Given a full dataframe, extract the user desired window
:tr: timerange string that we wish to extract from df
:df: Dataframe containing all candles to run the entire backtest. Here
it is sliced down to just the present training period.
timerange = TimeRange.parse_timerange(tr)
start = datetime.datetime.fromtimestamp(timerange.startts, tz=datetime.timezone.utc)
stop = datetime.datetime.fromtimestamp(timerange.stopts, tz=datetime.timezone.utc)
df = df.loc[df["date"] >= start, :]
df = df.loc[df["date"] <= stop, :]
return df
def principal_component_analysis(self) -> None:
Performs Principal Component Analysis on the data for dimensionality reduction
and outlier detection (see self.remove_outliers())
No parameters or returns, it acts on the data_dictionary held by the DataHandler.
from sklearn.decomposition import PCA # avoid importing if we dont need it
n_components = self.data_dictionary["train_features"].shape[1]
pca = PCA(n_components=n_components)
pca =["train_features"])
n_keep_components = np.argmin(pca.explained_variance_ratio_.cumsum() < 0.999)
pca2 = PCA(n_components=n_keep_components)["n_kept_components"] = n_keep_components
pca2 =["train_features"])"reduced feature dimension by %s", n_components - n_keep_components)"explained variance %f", np.sum(pca2.explained_variance_ratio_))
train_components = pca2.transform(self.data_dictionary["train_features"])
test_components = pca2.transform(self.data_dictionary["test_features"])
self.data_dictionary["train_features"] = pd.DataFrame(
columns=["PC" + str(i) for i in range(0, n_keep_components)],
self.data_dictionary["test_features"] = pd.DataFrame(
columns=["PC" + str(i) for i in range(0, n_keep_components)],
)["n_kept_components"] = n_keep_components
self.pca = pca2'PCA reduced total features from {n_components} to {n_keep_components}')
if not self.model_path.is_dir():
self.model_path.mkdir(parents=True, exist_ok=True)
pk.dump(pca2, open(self.model_path / str(self.model_filename + "_pca_object.pkl"), "wb"))
return None
def compute_distances(self) -> float:
2022-05-04 15:53:40 +00:00"computing average mean distance for all training points")
pairwise = pairwise_distances(self.data_dictionary["train_features"], n_jobs=-1)
avg_mean_dist = pairwise.mean(axis=1).mean()"avg_mean_dist %s", avg_mean_dist)
return avg_mean_dist
def use_SVM_to_remove_outliers(self, predict: bool) -> None:
if predict:
assert self.svm_model, "No svm model available for outlier removal"
y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
do_predict = np.where(y_pred == -1, 0, y_pred)
2022-05-04 15:53:40 +00:00
f'svm_remove_outliers() tossed {len(do_predict) - do_predict.sum()} predictions'
self.do_predict += do_predict
self.do_predict -= 1
# use SGDOneClassSVM to increase speed?
self.svm_model = linear_model.SGDOneClassSVM(nu=0.1).fit(
y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
dropped_points = np.where(y_pred == -1, 0, y_pred)
# keep_index = np.where(y_pred == 1)
self.data_dictionary["train_features"] = self.data_dictionary[
"train_features"][(y_pred == 1)]
self.data_dictionary["train_labels"] = self.data_dictionary[
"train_labels"][(y_pred == 1)]
self.data_dictionary["train_weights"] = self.data_dictionary[
"train_weights"][(y_pred == 1)]
f'svm_remove_outliers() tossed {len(y_pred) - dropped_points.sum()}'
f' train points from {len(y_pred)}'
# same for test data
y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
dropped_points = np.where(y_pred == -1, 0, y_pred)
self.data_dictionary["test_features"] = self.data_dictionary[
"test_features"][(y_pred == 1)]
self.data_dictionary["test_labels"] = self.data_dictionary[
"test_labels"][(y_pred == 1)]
self.data_dictionary["test_weights"] = self.data_dictionary[
"test_weights"][(y_pred == 1)]
f'svm_remove_outliers() tossed {len(y_pred) - dropped_points.sum()}'
f' test points from {len(y_pred)}'
def find_features(self, dataframe: DataFrame) -> list:
column_names = dataframe.columns
features = [c for c in column_names if '%' in c]
assert features, ("Could not find any features!")
return features
def check_if_pred_in_training_spaces(self) -> None:
Compares the distance from each prediction point to each training data
point. It uses this information to estimate a Dissimilarity Index (DI)
and avoid making predictions on any points that are too far away
from the training data set.
distance = pairwise_distances(
do_predict = np.where(
distance.min(axis=0) /["avg_mean_dist"]
< self.config["freqai"]["feature_parameters"]["DI_threshold"],
# "Distance checker tossed %s predictions for being too far from training data",
# len(do_predict) - do_predict.sum(),
# )
self.do_predict += do_predict
self.do_predict -= 1
2022-05-06 14:20:52 +00:00
def set_weights_higher_recent(self, num_weights: int) -> npt.ArrayLike:
Set weights so that recent data is more heavily weighted during
training than older data.
2022-05-06 14:20:52 +00:00
weights = np.zeros(num_weights)
for i in range(1, len(weights)):
weights[len(weights) - i] = np.exp(
-i / (self.config["freqai"]["feature_parameters"]["weight_factor"] * num_weights)
return weights
def append_predictions(self, predictions, do_predict, len_dataframe):
Append backtest prediction from current backtest period to all previous periods
ones = np.ones(len_dataframe)
s_mean, s_std = ones *["s_mean"], ones *["s_std"]
self.full_predictions = np.append(self.full_predictions, predictions)
self.full_do_predict = np.append(self.full_do_predict, do_predict)
self.full_target_mean = np.append(self.full_target_mean, s_mean)
self.full_target_std = np.append(self.full_target_std, s_std)
def fill_predictions(self, len_dataframe):
Back fill values to before the backtesting range so that the dataframe matches size
when it goes back to the strategy. These rows are not included in the backtest.
filler = np.zeros(len_dataframe - len(self.full_predictions)) # startup_candle_count
self.full_predictions = np.append(filler, self.full_predictions)
self.full_do_predict = np.append(filler, self.full_do_predict)
self.full_target_mean = np.append(filler, self.full_target_mean)
self.full_target_std = np.append(filler, self.full_target_std)
def create_fulltimerange(self, backtest_tr: str, backtest_period: int) -> str:
backtest_timerange = TimeRange.parse_timerange(backtest_tr)
2022-05-17 17:50:06 +00:00
if backtest_timerange.stopts == 0:
backtest_timerange.stopts = int(
backtest_timerange.startts = backtest_timerange.startts - backtest_period * SECONDS_IN_DAY
start = datetime.datetime.utcfromtimestamp(backtest_timerange.startts)
stop = datetime.datetime.utcfromtimestamp(backtest_timerange.stopts)
full_timerange = start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d")
self.full_path = Path(
/ "models"
/ str(full_timerange + self.freqai_config["identifier"])
config_path = Path(self.config["config_files"][0])
if not self.full_path.is_dir():
self.full_path.mkdir(parents=True, exist_ok=True)
Path(self.full_path /[-1]),
return full_timerange
def check_if_new_training_required(self, trained_timerange: TimeRange,
2022-05-23 07:55:58 +00:00
metadata: dict) -> Tuple[bool, TimeRange]:
time =
if trained_timerange.startts != 0:
elapsed_time = (time - trained_timerange.stopts) / SECONDS_IN_DAY
retrain = elapsed_time > self.freqai_config['backtest_period']
if retrain:
trained_timerange.startts += self.freqai_config['backtest_period'] * SECONDS_IN_DAY
trained_timerange.stopts += self.freqai_config['backtest_period'] * SECONDS_IN_DAY
else: # user passed no live_trained_timerange in config
2022-05-22 22:10:36 +00:00
trained_timerange = TimeRange()
trained_timerange.startts = int(time - self.freqai_config['train_period'] *
trained_timerange.stopts = int(time)
retrain = True
if retrain:
coin, _ = metadata['pair'].split("/")
# set the new model_path
self.model_path = Path(self.full_path / str("sub-train" + "-" +
2022-05-23 07:55:58 +00:00
2022-05-23 07:55:58 +00:00
self.model_filename = "cb_" + coin.lower() + "_" + str(int(trained_timerange.stopts))
# this is not persistent at the moment TODO
2022-05-23 07:55:58 +00:00
self.freqai_config['live_trained_timerange'] = str(int(trained_timerange.stopts))
# enables persistence, but not fully implemented into save/load data yer
2022-05-23 07:55:58 +00:00['live_trained_timerange'] = str(int(trained_timerange.stopts))
2022-05-23 07:55:58 +00:00
return retrain, trained_timerange
def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict) -> None:
exchange = ExchangeResolver.load_exchange(self.config['exchange']['name'],
self.config, validate=False)
pairs = self.freqai_config['corr_pairlist']
if metadata['pair'] not in pairs:
pairs += metadata['pair'] # dont include pair twice
# timerange = TimeRange.parse_timerange(new_timerange)
exchange, pairs=pairs, timeframes=self.freqai_config['timeframes'],
datadir=self.config['datadir'], timerange=timerange,
erase=False, data_format=self.config['dataformat_ohlcv'],
trading_mode=self.config.get('trading_mode', 'spot'),
prepend=self.config.get('prepend_data', False)
def load_pairs_histories(self, timerange: TimeRange, metadata: dict) -> Tuple[Dict[Any, Any],
corr_dataframes: Dict[Any, Any] = {}
base_dataframes: Dict[Any, Any] = {}
pairs = self.freqai_config['corr_pairlist'] # + [metadata['pair']]
# timerange = TimeRange.parse_timerange(new_timerange)
for tf in self.freqai_config['timeframes']:
base_dataframes[tf] = load_pair_history(datadir=self.config['datadir'],
pair=metadata['pair'], timerange=timerange)
for p in pairs:
if metadata['pair'] in p:
continue # dont repeat anything from whitelist
if p not in corr_dataframes:
corr_dataframes[p] = {}
corr_dataframes[p][tf] = load_pair_history(datadir=self.config['datadir'],
pair=p, timerange=timerange)
return corr_dataframes, base_dataframes
def use_strategy_to_populate_indicators(self, strategy: IStrategy,
corr_dataframes: dict,
base_dataframes: dict,
metadata: dict) -> DataFrame:
dataframe = base_dataframes[self.config['timeframe']]
for tf in self.freqai_config["timeframes"]:
dataframe = strategy.populate_any_indicators(metadata['pair'],
coin=metadata['pair'].split("/")[0] + "-"
for i in self.freqai_config["corr_pairlist"]:
if metadata['pair'] in i:
continue # dont repeat anything from whitelist
dataframe = strategy.populate_any_indicators(i,
coin=i.split("/")[0] + "-"
return dataframe
def np_encoder(self, object):
if isinstance(object, np.generic):
return object.item()
# Functions containing useful data manpulation examples. but not actively in use.
# def build_feature_list(self, config: dict, metadata: dict) -> list:
# """
# SUPERCEDED BY self.find_features()
# Build the list of features that will be used to filter
# the full dataframe. Feature list is construced from the
# user configuration file.
# :params:
# :config: Canonical freqtrade config file containing all
# user defined input in config['freqai] dictionary.
# """
# features = []
# for tf in config["freqai"]["timeframes"]:
# for ft in config["freqai"]["base_features"]:
# for n in range(config["freqai"]["feature_parameters"]["shift"] + 1):
# shift = ""
# if n > 0:
# shift = "_shift-" + str(n)
# features.append(metadata['pair'].split("/")[0] + "-" + ft + shift + "_" + tf)
# for p in config["freqai"]["corr_pairlist"]:
# if metadata['pair'] in p:
# continue # avoid duplicate features
# features.append(p.split("/")[0] + "-" + ft + shift + "_" + tf)
# #"number of features %s", len(features))
# return features
# Possibly phasing these outlier removal methods below out in favor of
# use_SVM_to_remove_outliers (computationally more efficient and apparently higher performance).
# But these have good data manipulation examples, so keep them commented here for now.
# def determine_statistical_distributions(self) -> None:
# from fitter import Fitter
#'Determining best model for all features, may take some time')
# def compute_quantiles(ft):
# f = Fitter(self.data_dictionary["train_features"][ft],
# distributions=['gamma', 'cauchy', 'laplace',
# 'beta', 'uniform', 'lognorm'])
# # f.summary()
# dist = list(f.get_best().items())[0][0]
# params = f.get_best()[dist]
# upper_q = getattr(spy.stats, list(f.get_best().items())[0][0]).ppf(0.999, **params)
# lower_q = getattr(spy.stats, list(f.get_best().items())[0][0]).ppf(0.001, **params)
# return ft, upper_q, lower_q, dist
# quantiles_tuple = Parallel(n_jobs=-1)(
# delayed(compute_quantiles)(ft) for ft in self.data_dictionary[
# 'train_features'].columns)
# df = pd.DataFrame(quantiles_tuple, columns=['features', 'upper_quantiles',
# 'lower_quantiles', 'dist'])
# self.data_dictionary['upper_quantiles'] = df['upper_quantiles']
# self.data_dictionary['lower_quantiles'] = df['lower_quantiles']
# return
# def remove_outliers(self, predict: bool) -> None:
# """
# Remove data that looks like an outlier based on the distribution of each
# variable.
# :params:
# :predict: boolean which tells the function if this is prediction data or
# training data coming in.
# """
# lower_quantile = self.data_dictionary["lower_quantiles"].to_numpy()
# upper_quantile = self.data_dictionary["upper_quantiles"].to_numpy()
# if predict:
# df = self.data_dictionary["prediction_features"][
# (self.data_dictionary["prediction_features"] < upper_quantile)
# & (self.data_dictionary["prediction_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(df).any(1)
# self.data_dictionary["prediction_features"].fillna(0, inplace=True)
# drop_index = ~drop_index
# do_predict = np.array(drop_index.replace(True, 1).replace(False, 0))
# "remove_outliers() tossed %s predictions",
# len(do_predict) - do_predict.sum(),
# )
# self.do_predict += do_predict
# self.do_predict -= 1
# else:
# filter_train_df = self.data_dictionary["train_features"][
# (self.data_dictionary["train_features"] < upper_quantile)
# & (self.data_dictionary["train_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(filter_train_df).any(1)
# drop_index = drop_index.replace(True, 1).replace(False, 0)
# self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
# (drop_index == 0)
# ]
# self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
# (drop_index == 0)
# ]
# self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
# (drop_index == 0)
# ]
# f'remove_outliers() tossed {drop_index.sum()}'
# f' training points from {len(filter_train_df)}'
# )
# # do the same for the test data
# filter_test_df = self.data_dictionary["test_features"][
# (self.data_dictionary["test_features"] < upper_quantile)
# & (self.data_dictionary["test_features"] > lower_quantile)
# ]
# drop_index = pd.isnull(filter_test_df).any(1)
# drop_index = drop_index.replace(True, 1).replace(False, 0)
# self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][
# (drop_index == 0)
# ]
# self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
# (drop_index == 0)
# ]
# self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
# (drop_index == 0)
# ]
# f'remove_outliers() tossed {drop_index.sum()}'
# f' test points from {len(filter_test_df)}'
# )
# return