This commit is contained in:
Robert Caulk
2022-12-19 00:55:51 +01:00
committed by GitHub
6 changed files with 261 additions and 8 deletions

View File

@@ -2,6 +2,8 @@ import logging
from time import time
from typing import Any
import numpy as np
import tensorflow as tf
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
@@ -17,6 +19,14 @@ class BaseTensorFlowModel(IFreqaiModel):
User *must* inherit from this class and set fit() and predict().
"""
def __init__(self, **kwargs):
super().__init__(config=kwargs['config'])
self.keras = True
# if self.ft_params.get("DI_threshold", 0):
# self.ft_params["DI_threshold"] = 0
# logger.warning("DI threshold is not configured for Keras models yet. Deactivating.")
self.dd.model_type = 'keras'
def train(
self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
) -> Any:
@@ -33,7 +43,6 @@ class BaseTensorFlowModel(IFreqaiModel):
start_time = time()
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_df,
dk.training_features_list,
@@ -41,13 +50,9 @@ class BaseTensorFlowModel(IFreqaiModel):
training_filter=True,
)
start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
logger.info(f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -68,3 +73,76 @@ class BaseTensorFlowModel(IFreqaiModel):
f"({end_time - start_time:.2f} secs) --------------------")
return model
class WindowGenerator:
def __init__(
self,
input_width,
label_width,
shift,
train_df=None,
val_df=None,
test_df=None,
train_labels=None,
val_labels=None,
test_labels=None,
batch_size=None,
):
# Store the raw data.
self.train_df = train_df
self.val_df = val_df
self.test_df = test_df
self.train_labels = train_labels
self.val_labels = val_labels
self.test_labels = test_labels
self.batch_size = batch_size
self.input_width = input_width
self.label_width = label_width
self.shift = shift
self.total_window_size = input_width + shift
self.input_slice = slice(0, input_width)
self.input_indices = np.arange(self.total_window_size)[self.input_slice]
def make_dataset(self, data, labels=None):
data = np.array(data, dtype=np.float32)
if labels is not None:
labels = np.array(labels, dtype=np.float32)
ds = tf.keras.preprocessing.timeseries_dataset_from_array(
data=data,
targets=labels,
sequence_length=self.total_window_size,
sequence_stride=1,
sampling_rate=1,
shuffle=False,
batch_size=self.batch_size,
)
return ds
@property
def train(self):
return self.make_dataset(self.train_df, self.train_labels)
@property
def val(self):
return self.make_dataset(self.val_df, self.val_labels)
@property
def test(self):
return self.make_dataset(self.test_df, self.test_labels)
@property
def inference(self):
return self.make_dataset(self.test_df)
@property
def example(self):
"""Get and cache an example batch of `inputs, labels` for plotting."""
result = getattr(self, "_example", None)
if result is None:
# No example batch was found, so get one from the `.train` dataset
result = next(iter(self.train))
# And cache it for next time
self._example = result
return result

View File

@@ -0,0 +1,152 @@
import logging
from typing import Any, Dict, Tuple
import numpy as np
import tensorflow as tf
from pandas import DataFrame
from tensorflow.keras.layers import Conv1D, Dense, Input
from tensorflow.keras.models import Model
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.base_models.BaseTensorFlowModel import BaseTensorFlowModel, WindowGenerator
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
logger = logging.getLogger(__name__)
class CNNPredictionModel(BaseTensorFlowModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), fit().
"""
def fit(self, data_dictionary: Dict[str, Any], dk: FreqaiDataKitchen) -> Any:
"""
User sets up the training and test data to fit their desired model here
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
train_df = data_dictionary["train_features"]
train_labels = data_dictionary["train_labels"]
test_df = data_dictionary["test_features"]
test_labels = data_dictionary["test_labels"]
n_labels = len(train_labels.columns)
if n_labels > 1:
raise OperationalException(
"Neural Net not yet configured for multi-targets. Please "
" reduce number of targets to 1 in strategy."
)
n_features = len(data_dictionary["train_features"].columns)
BATCH_SIZE = self.model_training_parameters.get("batch_size", 64)
# we need to remove batch_size from the model_training_params because
# we dont want fit() to get the incorrect assignment (we use the WindowGenerator)
# to handle our batches.
if 'batch_size' in self.model_training_parameters:
self.model_training_parameters.pop('batch_size')
input_dims = [BATCH_SIZE, self.CONV_WIDTH, n_features]
w1 = WindowGenerator(
input_width=self.CONV_WIDTH,
label_width=1,
shift=1,
train_df=train_df,
val_df=test_df,
train_labels=train_labels,
val_labels=test_labels,
batch_size=BATCH_SIZE,
)
model = self.create_model(input_dims, n_labels)
steps_per_epoch = np.ceil(len(test_df) / BATCH_SIZE)
lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
0.001, decay_steps=steps_per_epoch * 1000, decay_rate=1, staircase=False
)
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor="loss", patience=3, mode="min", min_delta=0.0001
)
model.compile(
loss=tf.losses.MeanSquaredError(),
optimizer=tf.optimizers.Adam(lr_schedule),
metrics=[tf.metrics.MeanAbsoluteError()],
)
if self.freqai_info.get('data_split_parameters', {}).get('test_size', 0.1) == 0:
val_data = None
else:
val_data = w1.val
model.fit(
w1.train,
validation_data=val_data,
callbacks=[early_stopping],
**self.model_training_parameters,
)
return model
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first=True
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:predictions: np.array of predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, dk.training_features_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk)
if first:
full_df = dk.data_dictionary["prediction_features"]
w1 = WindowGenerator(
input_width=self.CONV_WIDTH,
label_width=1,
shift=1,
test_df=full_df,
batch_size=len(full_df),
)
predictions = self.model.predict(w1.inference)
len_diff = len(dk.do_predict) - len(predictions)
if len_diff > 0:
dk.do_predict = dk.do_predict[len_diff:]
else:
data = dk.data_dictionary["prediction_features"]
data = tf.expand_dims(data, axis=0)
data = tf.convert_to_tensor(data)
predictions = self.model(data, training=False)
predictions = predictions[:, 0, 0]
pred_df = DataFrame(predictions, columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df)
return (pred_df, np.ones(len(pred_df)))
def create_model(self, input_dims, n_labels) -> Any:
input_layer = Input(shape=(input_dims[1], input_dims[2]))
Layer_1 = Conv1D(filters=32, kernel_size=(self.CONV_WIDTH,), activation="relu")(input_layer)
Layer_3 = Dense(units=32, activation="relu")(Layer_1)
output_layer = Dense(units=n_labels)(Layer_3)
return Model(inputs=input_layer, outputs=output_layer)