isolate data_drawer functions from data_kitchen, accommodate tests, add new test

This commit is contained in:
robcaulk
2022-07-26 10:24:14 +02:00
parent 56b17e6f3c
commit e213d0ad55
12 changed files with 606 additions and 376 deletions

View File

@@ -7,11 +7,17 @@ import threading
from pathlib import Path
from typing import Any, Dict, Tuple
import numpy.typing as npt
import numpy as np
import pandas as pd
from joblib.externals import cloudpickle
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from joblib import dump, load
from freqtrade.strategy.interface import IStrategy
from freqtrade.exceptions import OperationalException
from freqtrade.data.history import load_pair_history
from freqtrade.configuration import TimeRange
logger = logging.getLogger(__name__)
@@ -214,7 +220,8 @@ class FreqaiDataDrawer:
# send pair to end of queue
self.pair_dict[pair]["priority"] = len(self.pair_dict)
def set_initial_return_values(self, pair: str, dk, pred_df, do_preds) -> None:
def set_initial_return_values(self, pair: str, dk: FreqaiDataKitchen,
pred_df: DataFrame, do_preds: npt.ArrayLike) -> None:
"""
Set the initial return values to a persistent dataframe. This avoids needing to repredict on
historical candles, and also stores historical predictions despite retrainings (so stored
@@ -351,6 +358,217 @@ class FreqaiDataDrawer:
if self.config.get("freqai", {}).get("purge_old_models", False):
self.purge_old_models()
# Functions pulled back from FreqaiDataKitchen because they relied on DataDrawer
def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
"""
Saves all data associated with a model for a single sub-train time range
:params:
:model: User trained model which can be reused for inferencing to generate
predictions
"""
if not dk.data_path.is_dir():
dk.data_path.mkdir(parents=True, exist_ok=True)
save_path = Path(dk.data_path)
# Save the trained model
if not dk.keras:
dump(model, save_path / f"{dk.model_filename}_model.joblib")
else:
model.save(save_path / f"{dk.model_filename}_model.h5")
if dk.svm_model is not None:
dump(dk.svm_model, save_path / str(dk.model_filename + "_svm_model.joblib"))
dk.data["data_path"] = str(dk.data_path)
dk.data["model_filename"] = str(dk.model_filename)
dk.data["training_features_list"] = list(dk.data_dictionary["train_features"].columns)
dk.data["label_list"] = dk.label_list
# store the metadata
with open(save_path / str(dk.model_filename + "_metadata.json"), "w") as fp:
json.dump(dk.data, fp, default=dk.np_encoder)
# save the train data to file so we can check preds for area of applicability later
dk.data_dictionary["train_features"].to_pickle(
save_path / str(dk.model_filename + "_trained_df.pkl")
)
if self.freqai_info.get("feature_parameters", {}).get("principal_component_analysis"):
cloudpickle.dump(
dk.pca, open(dk.data_path / str(dk.model_filename + "_pca_object.pkl"), "wb")
)
# if self.live:
self.model_dictionary[dk.model_filename] = model
self.pair_dict[coin]["model_filename"] = dk.model_filename
self.pair_dict[coin]["data_path"] = str(dk.data_path)
self.save_drawer_to_disk()
return
def load_data(self, coin: str, dk: FreqaiDataKitchen) -> Any:
"""
loads all data required to make a prediction on a sub-train time range
:returns:
:model: User trained model which can be inferenced for new predictions
"""
if not self.pair_dict[coin]["model_filename"]:
return None
if dk.live:
dk.model_filename = self.pair_dict[coin]["model_filename"]
dk.data_path = Path(self.pair_dict[coin]["data_path"])
if self.freqai_info.get("follow_mode", False):
# follower can be on a different system which is rsynced to the leader:
dk.data_path = Path(
self.config["user_data_dir"]
/ "models"
/ dk.data_path.parts[-2]
/ dk.data_path.parts[-1]
)
with open(dk.data_path / str(dk.model_filename + "_metadata.json"), "r") as fp:
dk.data = json.load(fp)
dk.training_features_list = dk.data["training_features_list"]
dk.label_list = dk.data["label_list"]
dk.data_dictionary["train_features"] = pd.read_pickle(
dk.data_path / str(dk.model_filename + "_trained_df.pkl")
)
# try to access model in memory instead of loading object from disk to save time
if dk.live and dk.model_filename in self.model_dictionary:
model = self.model_dictionary[dk.model_filename]
elif not dk.keras:
model = load(dk.data_path / str(dk.model_filename + "_model.joblib"))
else:
from tensorflow import keras
model = keras.models.load_model(dk.data_path / str(dk.model_filename + "_model.h5"))
if Path(dk.data_path / str(dk.model_filename + "_svm_model.joblib")).resolve().exists():
dk.svm_model = load(dk.data_path / str(dk.model_filename + "_svm_model.joblib"))
if not model:
raise OperationalException(
f"Unable to load model, ensure model exists at " f"{dk.data_path} "
)
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
dk.pca = cloudpickle.load(
open(dk.data_path / str(dk.model_filename + "_pca_object.pkl"), "rb")
)
return model
def update_historic_data(self, strategy: IStrategy, dk: FreqaiDataKitchen) -> None:
"""
Append new candles to our stores historic data (in memory) so that
we do not need to load candle history from disk and we dont need to
pinging exchange multiple times for the same candle.
:params:
dataframe: DataFrame = strategy provided dataframe
"""
feat_params = self.freqai_info.get("feature_parameters", {})
with self.history_lock:
history_data = self.historic_data
for pair in dk.all_pairs:
for tf in feat_params.get("include_timeframes"):
# check if newest candle is already appended
df_dp = strategy.dp.get_pair_dataframe(pair, tf)
if len(df_dp.index) == 0:
continue
if str(history_data[pair][tf].iloc[-1]["date"]) == str(
df_dp.iloc[-1:]["date"].iloc[-1]
):
continue
try:
index = (
df_dp.loc[
df_dp["date"] == history_data[pair][tf].iloc[-1]["date"]
].index[0]
+ 1
)
except IndexError:
logger.warning(
f"Unable to update pair history for {pair}. "
"If this does not resolve itself after 1 additional candle, "
"please report the error to #freqai discord channel"
)
return
history_data[pair][tf] = pd.concat(
[
history_data[pair][tf],
strategy.dp.get_pair_dataframe(pair, tf).iloc[index:],
],
ignore_index=True,
axis=0,
)
def load_all_pair_histories(self, timerange: TimeRange, dk: FreqaiDataKitchen) -> None:
"""
Load pair histories for all whitelist and corr_pairlist pairs.
Only called once upon startup of bot.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
"""
history_data = self.historic_data
for pair in dk.all_pairs:
if pair not in history_data:
history_data[pair] = {}
for tf in self.freqai_info.get("feature_parameters", {}).get("include_timeframes"):
history_data[pair][tf] = load_pair_history(
datadir=self.config["datadir"],
timeframe=tf,
pair=pair,
timerange=timerange,
data_format=self.config.get("dataformat_ohlcv", "json"),
candle_type=self.config.get("trading_mode", "spot"),
)
def get_base_and_corr_dataframes(
self, timerange: TimeRange, pair: str, dk: FreqaiDataKitchen
) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
"""
Searches through our historic_data in memory and returns the dataframes relevant
to the present pair.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
metadata: dict = strategy furnished pair metadata
"""
with self.history_lock:
corr_dataframes: Dict[Any, Any] = {}
base_dataframes: Dict[Any, Any] = {}
historic_data = self.historic_data
pairs = self.freqai_info.get("feature_parameters", {}).get(
"include_corr_pairlist", []
)
for tf in self.freqai_info.get("feature_parameters", {}).get("include_timeframes"):
base_dataframes[tf] = dk.slice_dataframe(timerange, historic_data[pair][tf])
if pairs:
for p in pairs:
if pair in p:
continue # dont repeat anything from whitelist
if p not in corr_dataframes:
corr_dataframes[p] = {}
corr_dataframes[p][tf] = dk.slice_dataframe(
timerange, historic_data[p][tf]
)
return corr_dataframes, base_dataframes
# to be used if we want to send predictions directly to the follower instead of forcing
# follower to load models and inference
# def save_model_return_values_to_disk(self) -> None:

View File

@@ -1,6 +1,5 @@
import copy
import datetime
import json
import logging
import shutil
from pathlib import Path
@@ -9,18 +8,14 @@ from typing import Any, Dict, List, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
from joblib import dump, load # , Parallel, delayed # used for auto distribution assignment
from joblib.externals import cloudpickle
from pandas import DataFrame
from sklearn import linear_model
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from freqtrade.configuration import TimeRange
from freqtrade.data.history import load_pair_history
from freqtrade.data.history.history_utils import refresh_backtest_ohlcv_data
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_drawer import FreqaiDataDrawer
from freqtrade.resolvers import ExchangeResolver
from freqtrade.strategy.interface import IStrategy
@@ -57,7 +52,6 @@ class FreqaiDataKitchen:
def __init__(
self,
config: Dict[str, Any],
data_drawer: FreqaiDataDrawer,
live: bool = False,
pair: str = "",
):
@@ -69,6 +63,7 @@ class FreqaiDataKitchen:
self.append_df: DataFrame = DataFrame()
self.data_path = Path()
self.label_list: List = []
self.training_features_list: List = []
self.model_filename: str = ""
self.live = live
self.pair = pair
@@ -89,8 +84,6 @@ class FreqaiDataKitchen:
config["freqai"]["backtest_period_days"],
)
self.dd = data_drawer
def set_paths(
self,
pair: str,
@@ -113,110 +106,6 @@ class FreqaiDataKitchen:
return
def save_data(self, model: Any, coin: str = "", label=None) -> None:
"""
Saves all data associated with a model for a single sub-train time range
:params:
:model: User trained model which can be reused for inferencing to generate
predictions
"""
if not self.data_path.is_dir():
self.data_path.mkdir(parents=True, exist_ok=True)
save_path = Path(self.data_path)
# Save the trained model
if not self.keras:
dump(model, save_path / f"{self.model_filename}_model.joblib")
else:
model.save(save_path / f"{self.model_filename}_model.h5")
if self.svm_model is not None:
dump(self.svm_model, save_path / str(self.model_filename + "_svm_model.joblib"))
self.data["data_path"] = str(self.data_path)
self.data["model_filename"] = str(self.model_filename)
self.data["training_features_list"] = list(self.data_dictionary["train_features"].columns)
self.data["label_list"] = self.label_list
# store the metadata
with open(save_path / str(self.model_filename + "_metadata.json"), "w") as fp:
json.dump(self.data, fp, default=self.np_encoder)
# save the train data to file so we can check preds for area of applicability later
self.data_dictionary["train_features"].to_pickle(
save_path / str(self.model_filename + "_trained_df.pkl")
)
if self.freqai_config.get("feature_parameters", {}).get("principal_component_analysis"):
cloudpickle.dump(
self.pca, open(self.data_path / str(self.model_filename + "_pca_object.pkl"), "wb")
)
# if self.live:
self.dd.model_dictionary[self.model_filename] = model
self.dd.pair_dict[coin]["model_filename"] = self.model_filename
self.dd.pair_dict[coin]["data_path"] = str(self.data_path)
self.dd.save_drawer_to_disk()
return
def load_data(self, coin: str = "") -> Any:
"""
loads all data required to make a prediction on a sub-train time range
:returns:
:model: User trained model which can be inferenced for new predictions
"""
if not self.dd.pair_dict[coin]["model_filename"]:
return None
if self.live:
self.model_filename = self.dd.pair_dict[coin]["model_filename"]
self.data_path = Path(self.dd.pair_dict[coin]["data_path"])
if self.freqai_config.get("follow_mode", False):
# follower can be on a different system which is rsynced to the leader:
self.data_path = Path(
self.config["user_data_dir"]
/ "models"
/ self.data_path.parts[-2]
/ self.data_path.parts[-1]
)
with open(self.data_path / str(self.model_filename + "_metadata.json"), "r") as fp:
self.data = json.load(fp)
self.training_features_list = self.data["training_features_list"]
self.label_list = self.data["label_list"]
self.data_dictionary["train_features"] = pd.read_pickle(
self.data_path / str(self.model_filename + "_trained_df.pkl")
)
# try to access model in memory instead of loading object from disk to save time
if self.live and self.model_filename in self.dd.model_dictionary:
model = self.dd.model_dictionary[self.model_filename]
elif not self.keras:
model = load(self.data_path / str(self.model_filename + "_model.joblib"))
else:
from tensorflow import keras
model = keras.models.load_model(self.data_path / str(self.model_filename + "_model.h5"))
if Path(self.data_path / str(self.model_filename + "_svm_model.joblib")).resolve().exists():
self.svm_model = load(self.data_path / str(self.model_filename + "_svm_model.joblib"))
if not model:
raise OperationalException(
f"Unable to load model, ensure model exists at " f"{self.data_path} "
)
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
self.pca = cloudpickle.load(
open(self.data_path / str(self.model_filename + "_pca_object.pkl"), "rb")
)
return model
def make_train_test_datasets(
self, filtered_dataframe: DataFrame, labels: DataFrame
) -> Dict[Any, Any]:
@@ -953,56 +842,6 @@ class FreqaiDataKitchen:
prepend=self.config.get("prepend_data", False),
)
def update_historic_data(self, strategy: IStrategy) -> None:
"""
Append new candles to our stores historic data (in memory) so that
we do not need to load candle history from disk and we dont need to
pinging exchange multiple times for the same candle.
:params:
dataframe: DataFrame = strategy provided dataframe
"""
feat_params = self.freqai_config.get("feature_parameters", {})
with self.dd.history_lock:
history_data = self.dd.historic_data
for pair in self.all_pairs:
for tf in feat_params.get("include_timeframes"):
# check if newest candle is already appended
df_dp = strategy.dp.get_pair_dataframe(pair, tf)
if len(df_dp.index) == 0:
continue
if str(history_data[pair][tf].iloc[-1]["date"]) == str(
df_dp.iloc[-1:]["date"].iloc[-1]
):
continue
try:
index = (
df_dp.loc[
df_dp["date"] == history_data[pair][tf].iloc[-1]["date"]
].index[0]
+ 1
)
except IndexError:
logger.warning(
f"Unable to update pair history for {pair}. "
"If this does not resolve itself after 1 additional candle, "
"please report the error to #freqai discord channel"
)
return
history_data[pair][tf] = pd.concat(
[
history_data[pair][tf],
strategy.dp.get_pair_dataframe(pair, tf).iloc[index:],
],
ignore_index=True,
axis=0,
)
# logger.info(f'Length of history data {len(history_data[pair][tf])}')
def set_all_pairs(self) -> None:
self.all_pairs = copy.deepcopy(
@@ -1012,63 +851,6 @@ class FreqaiDataKitchen:
if pair not in self.all_pairs:
self.all_pairs.append(pair)
def load_all_pair_histories(self, timerange: TimeRange) -> None:
"""
Load pair histories for all whitelist and corr_pairlist pairs.
Only called once upon startup of bot.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
"""
history_data = self.dd.historic_data
for pair in self.all_pairs:
if pair not in history_data:
history_data[pair] = {}
for tf in self.freqai_config.get("feature_parameters", {}).get("include_timeframes"):
history_data[pair][tf] = load_pair_history(
datadir=self.config["datadir"],
timeframe=tf,
pair=pair,
timerange=timerange,
data_format=self.config.get("dataformat_ohlcv", "json"),
candle_type=self.config.get("trading_mode", "spot"),
)
def get_base_and_corr_dataframes(
self, timerange: TimeRange, pair: str
) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
"""
Searches through our historic_data in memory and returns the dataframes relevant
to the present pair.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
metadata: dict = strategy furnished pair metadata
"""
with self.dd.history_lock:
corr_dataframes: Dict[Any, Any] = {}
base_dataframes: Dict[Any, Any] = {}
historic_data = self.dd.historic_data
pairs = self.freqai_config.get("feature_parameters", {}).get(
"include_corr_pairlist", []
)
for tf in self.freqai_config.get("feature_parameters", {}).get("include_timeframes"):
base_dataframes[tf] = self.slice_dataframe(timerange, historic_data[pair][tf])
if pairs:
for p in pairs:
if pair in p:
continue # dont repeat anything from whitelist
if p not in corr_dataframes:
corr_dataframes[p] = {}
corr_dataframes[p][tf] = self.slice_dataframe(
timerange, historic_data[p][tf]
)
return corr_dataframes, base_dataframes
def use_strategy_to_populate_indicators(
self,
strategy: IStrategy,
@@ -1134,20 +916,6 @@ class FreqaiDataKitchen:
return dataframe
def fit_live_predictions(self) -> None:
"""
Fit the labels with a gaussian distribution
"""
import scipy as spy
num_candles = self.freqai_config.get("fit_live_predictions_candles", 100)
self.data["labels_mean"], self.data["labels_std"] = {}, {}
for label in self.label_list:
f = spy.stats.norm.fit(self.dd.historic_predictions[self.pair][label].tail(num_candles))
self.data["labels_mean"][label], self.data["labels_std"][label] = f[0], f[1]
return
def fit_labels(self) -> None:
"""
Fit the labels with a gaussian distribution

View File

@@ -102,7 +102,7 @@ class IFreqaiModel(ABC):
self.dd.set_pair_dict_info(metadata)
if self.live:
self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
dk = self.start_live(dataframe, metadata, strategy, self.dk)
# For backtesting, each pair enters and then gets trained for each window along the
@@ -111,7 +111,7 @@ class IFreqaiModel(ABC):
# FreqAI slides the window and sequentially builds the backtesting results before returning
# the concatenated results for the full backtesting period back to the strategy.
elif not self.follow_mode:
self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
dataframe = self.dk.use_strategy_to_populate_indicators(
@@ -138,7 +138,7 @@ class IFreqaiModel(ABC):
if self.dd.pair_dict[pair]["priority"] != 1:
continue
dk = FreqaiDataKitchen(self.config, self.dd, self.live, pair)
dk = FreqaiDataKitchen(self.config, self.live, pair)
dk.set_paths(pair, trained_timestamp)
(
retrain,
@@ -217,9 +217,9 @@ class IFreqaiModel(ABC):
self.dd.pair_dict[metadata["pair"]]["trained_timestamp"] = int(
trained_timestamp.stopts)
dk.set_new_model_names(metadata["pair"], trained_timestamp)
dk.save_data(self.model, metadata["pair"])
self.dd.save_data(self.model, metadata["pair"], dk)
else:
self.model = dk.load_data(metadata["pair"])
self.model = self.dd.load_data(metadata["pair"], dk)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
@@ -260,7 +260,7 @@ class IFreqaiModel(ABC):
# append the historic data once per round
if self.dd.historic_data:
dk.update_historic_data(strategy)
self.dd.update_historic_data(strategy, dk)
logger.debug(f'Updating historic data on pair {metadata["pair"]}')
if not self.follow_mode:
@@ -278,7 +278,7 @@ class IFreqaiModel(ABC):
"data saved"
)
dk.download_all_data_for_training(data_load_timerange)
dk.load_all_pair_histories(data_load_timerange)
self.dd.load_all_pair_histories(data_load_timerange, dk)
if not self.scanning:
self.scanning = True
@@ -292,7 +292,7 @@ class IFreqaiModel(ABC):
)
# load the model and associated data into the data kitchen
self.model = dk.load_data(coin=metadata["pair"])
self.model = self.dd.load_data(metadata["pair"], dk)
dataframe = self.dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
@@ -468,8 +468,8 @@ class IFreqaiModel(ABC):
new_trained_timerange does not contain any NaNs)
"""
corr_dataframes, base_dataframes = dk.get_base_and_corr_dataframes(
data_load_timerange, pair
corr_dataframes, base_dataframes = self.dd.get_base_and_corr_dataframes(
data_load_timerange, pair, dk
)
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
@@ -489,7 +489,7 @@ class IFreqaiModel(ABC):
if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning:
with self.lock:
self.dd.pair_to_end_of_training_queue(pair)
dk.save_data(model, coin=pair)
self.dd.save_data(model, pair, dk)
if self.freqai_info.get("purge_old_models", False):
self.dd.purge_old_models()
@@ -505,6 +505,20 @@ class IFreqaiModel(ABC):
self.dd.historic_predictions[pair] = pd.DataFrame()
self.dd.historic_predictions[pair] = copy.deepcopy(pred_df)
def fit_live_predictions(self, dk: FreqaiDataKitchen) -> None:
"""
Fit the labels with a gaussian distribution
"""
import scipy as spy
num_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
for label in dk.label_list:
f = spy.stats.norm.fit(self.dd.historic_predictions[dk.pair][label].tail(num_candles))
dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
return
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModlel.py for an example.

View File

@@ -55,8 +55,6 @@ class BaseRegressionModel(IFreqaiModel):
f"{end_date}--------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get('fit_live_predictions', 0):
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -73,8 +71,11 @@ class BaseRegressionModel(IFreqaiModel):
if pair not in self.dd.historic_predictions:
self.set_initial_historic_predictions(
data_dictionary['train_features'], model, dk, pair)
elif self.freqai_info.get('fit_live_predictions_candles', 0):
dk.fit_live_predictions()
if self.freqai_info.get('fit_live_predictions_candles', 0) and self.live:
self.fit_live_predictions(dk)
else:
dk.fit_labels()
self.dd.save_historic_predictions_to_disk()

View File

@@ -49,8 +49,7 @@ class BaseTensorFlowModel(IFreqaiModel):
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get('fit_live_predictions', 0):
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -67,8 +66,11 @@ class BaseTensorFlowModel(IFreqaiModel):
if pair not in self.dd.historic_predictions:
self.set_initial_historic_predictions(
data_dictionary['train_features'], model, dk, pair)
elif self.freqai_info.get('fit_live_predictions_candles', 0):
dk.fit_live_predictions()
if self.freqai_info.get('fit_live_predictions_candles', 0) and self.live:
self.fit_live_predictions(dk)
else:
dk.fit_labels()
self.dd.save_historic_predictions_to_disk()