merge develop into feat/freqai-rl-dev

This commit is contained in:
robcaulk
2022-11-12 10:54:34 +01:00
60 changed files with 1314 additions and 264 deletions

View File

@@ -0,0 +1,93 @@
import numpy as np
from joblib import Parallel
from sklearn.base import is_classifier
from sklearn.multioutput import MultiOutputClassifier, _fit_estimator
from sklearn.utils.fixes import delayed
from sklearn.utils.multiclass import check_classification_targets
from sklearn.utils.validation import has_fit_parameter
from freqtrade.exceptions import OperationalException
class FreqaiMultiOutputClassifier(MultiOutputClassifier):
def fit(self, X, y, sample_weight=None, fit_params=None):
"""Fit the model to data, separately for each output variable.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
The input data.
y : {array-like, sparse matrix} of shape (n_samples, n_outputs)
Multi-output targets. An indicator matrix turns on multilabel
estimation.
sample_weight : array-like of shape (n_samples,), default=None
Sample weights. If `None`, then samples are equally weighted.
Only supported if the underlying classifier supports sample
weights.
fit_params : A list of dicts for the fit_params
Parameters passed to the ``estimator.fit`` method of each step.
Each dict may contain same or different values (e.g. different
eval_sets or init_models)
.. versionadded:: 0.23
Returns
-------
self : object
Returns a fitted instance.
"""
if not hasattr(self.estimator, "fit"):
raise ValueError("The base estimator should implement a fit method")
y = self._validate_data(X="no_validation", y=y, multi_output=True)
if is_classifier(self):
check_classification_targets(y)
if y.ndim == 1:
raise ValueError(
"y must have at least two dimensions for "
"multi-output regression but has only one."
)
if sample_weight is not None and not has_fit_parameter(
self.estimator, "sample_weight"
):
raise ValueError("Underlying estimator does not support sample weights.")
if not fit_params:
fit_params = [None] * y.shape[1]
self.estimators_ = Parallel(n_jobs=self.n_jobs)(
delayed(_fit_estimator)(
self.estimator, X, y[:, i], sample_weight, **fit_params[i]
)
for i in range(y.shape[1])
)
self.classes_ = []
for estimator in self.estimators_:
self.classes_.extend(estimator.classes_)
if len(set(self.classes_)) != len(self.classes_):
raise OperationalException(f"Class labels must be unique across targets: "
f"{self.classes_}")
if hasattr(self.estimators_[0], "n_features_in_"):
self.n_features_in_ = self.estimators_[0].n_features_in_
if hasattr(self.estimators_[0], "feature_names_in_"):
self.feature_names_in_ = self.estimators_[0].feature_names_in_
return self
def predict_proba(self, X):
"""
Get predict_proba and stack arrays horizontally
"""
results = np.hstack(super().predict_proba(X))
return np.squeeze(results)
def predict(self, X):
"""
Get predict and squeeze into 2D array
"""
results = super().predict(X)
return np.squeeze(results)

View File

@@ -87,6 +87,7 @@ class FreqaiDataDrawer:
self.create_follower_dict()
self.load_drawer_from_disk()
self.load_historic_predictions_from_disk()
self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
self.load_metric_tracker_from_disk()
self.training_queue: Dict[str, int] = {}
self.history_lock = threading.Lock()
@@ -97,7 +98,6 @@ class FreqaiDataDrawer:
self.empty_pair_dict: pair_info = {
"model_filename": "", "trained_timestamp": 0,
"data_path": "", "extras": {}}
self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
self.limit_ram_use = self.freqai_info.get('limit_ram_usage', False)
if 'rl_config' in self.freqai_info:
self.model_type = 'stable_baselines'
@@ -160,6 +160,7 @@ class FreqaiDataDrawer:
if exists:
with open(self.metric_tracker_path, "r") as fp:
self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
logger.info("Loading existing metric tracker from disk.")
else:
logger.info("Could not find existing metric tracker, starting from scratch")
@@ -549,14 +550,6 @@ class FreqaiDataDrawer:
if dk.live:
dk.model_filename = self.pair_dict[coin]["model_filename"]
dk.data_path = Path(self.pair_dict[coin]["data_path"])
if self.freqai_info.get("follow_mode", False):
# follower can be on a different system which is rsynced from the leader:
dk.data_path = Path(
self.config["user_data_dir"]
/ "models"
/ dk.data_path.parts[-2]
/ dk.data_path.parts[-1]
)
if coin in self.meta_data_dictionary:
dk.data = self.meta_data_dictionary[coin]["meta_data"]
@@ -652,6 +645,8 @@ class FreqaiDataDrawer:
axis=0,
)
self.current_candle = history_data[dk.pair][self.config['timeframe']].iloc[-1]['date']
def load_all_pair_histories(self, timerange: TimeRange, dk: FreqaiDataKitchen) -> None:
"""
Load pair histories for all whitelist and corr_pairlist pairs.

View File

@@ -1,7 +1,7 @@
import copy
import logging
import shutil
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from math import cos, sin
from pathlib import Path
from typing import Any, Dict, List, Tuple
@@ -81,19 +81,25 @@ class FreqaiDataKitchen:
self.svm_model: linear_model.SGDOneClassSVM = None
self.keras: bool = self.freqai_config.get("keras", False)
self.set_all_pairs()
if not self.live:
if not self.config["timerange"]:
raise OperationalException(
'Please pass --timerange if you intend to use FreqAI for backtesting.')
self.full_timerange = self.create_fulltimerange(
self.config["timerange"], self.freqai_config.get("train_period_days", 0)
)
self.backtest_live_models = config.get("freqai_backtest_live_models", False)
(self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
self.full_timerange,
config["freqai"]["train_period_days"],
config["freqai"]["backtest_period_days"],
)
if not self.live:
self.full_path = self.get_full_models_path(self.config)
if self.backtest_live_models:
if self.pair:
self.set_timerange_from_ready_models()
(self.training_timeranges,
self.backtesting_timeranges) = self.split_timerange_live_models()
else:
self.full_timerange = self.create_fulltimerange(
self.config["timerange"], self.freqai_config.get("train_period_days", 0)
)
(self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
self.full_timerange,
config["freqai"]["train_period_days"],
config["freqai"]["backtest_period_days"],
)
self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
if not self.freqai_config.get("data_kitchen_thread_count", 0):
@@ -103,6 +109,7 @@ class FreqaiDataKitchen:
self.train_dates: DataFrame = pd.DataFrame()
self.unique_classes: Dict[str, list] = {}
self.unique_class_list: list = []
self.backtest_live_models_data: Dict[str, Any] = {}
def set_paths(
self,
@@ -114,10 +121,7 @@ class FreqaiDataKitchen:
:param metadata: dict = strategy furnished pair metadata
:param trained_timestamp: int = timestamp of most recent training
"""
self.full_path = Path(
self.config["user_data_dir"] / "models" / str(self.freqai_config.get("identifier"))
)
self.full_path = self.get_full_models_path(self.config)
self.data_path = Path(
self.full_path
/ f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
@@ -248,7 +252,7 @@ class FreqaiDataKitchen:
self.data["filter_drop_index_training"] = drop_index
else:
if len(self.data['constant_features_list']):
if 'constant_features_list' in self.data and len(self.data['constant_features_list']):
filtered_df = self.check_pred_labels(filtered_df)
# we are backtesting so we need to preserve row number to send back to strategy,
# so now we use do_predict to avoid any prediction based on a NaN
@@ -459,6 +463,29 @@ class FreqaiDataKitchen:
# print(tr_training_list, tr_backtesting_list)
return tr_training_list_timerange, tr_backtesting_list_timerange
def split_timerange_live_models(
self
) -> Tuple[list, list]:
tr_backtesting_list_timerange = []
asset = self.pair.split("/")[0]
if asset not in self.backtest_live_models_data["assets_end_dates"]:
raise OperationalException(
f"Model not available for pair {self.pair}. "
"Please, try again after removing this pair from the configuration file."
)
asset_data = self.backtest_live_models_data["assets_end_dates"][asset]
backtesting_timerange = self.backtest_live_models_data["backtesting_timerange"]
model_end_dates = [x for x in asset_data]
model_end_dates.append(backtesting_timerange.stopts)
model_end_dates.sort()
for index, item in enumerate(model_end_dates):
if len(model_end_dates) > (index + 1):
tr_to_add = TimeRange("date", "date", item, model_end_dates[index + 1])
tr_backtesting_list_timerange.append(tr_to_add)
return tr_backtesting_list_timerange, tr_backtesting_list_timerange
def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
"""
Given a full dataframe, extract the user desired window
@@ -966,11 +993,13 @@ class FreqaiDataKitchen:
append_df[label] = predictions[label]
if append_df[label].dtype == object:
continue
append_df[f"{label}_mean"] = self.data["labels_mean"][label]
append_df[f"{label}_std"] = self.data["labels_std"][label]
if "labels_mean" in self.data:
append_df[f"{label}_mean"] = self.data["labels_mean"][label]
if "labels_std" in self.data:
append_df[f"{label}_std"] = self.data["labels_std"][label]
for extra_col in self.data["extra_returns_per_train"]:
append_df["{extra_col}"] = self.data["extra_returns_per_train"][extra_col]
append_df[f"{extra_col}"] = self.data["extra_returns_per_train"][extra_col]
append_df["do_predict"] = do_predict
if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
@@ -1035,11 +1064,6 @@ class FreqaiDataKitchen:
start = datetime.fromtimestamp(backtest_timerange.startts, tz=timezone.utc)
stop = datetime.fromtimestamp(backtest_timerange.stopts, tz=timezone.utc)
full_timerange = start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d")
self.full_path = Path(
self.config["user_data_dir"] / "models" / f"{self.freqai_config['identifier']}"
)
config_path = Path(self.config["config_files"][0])
if not self.full_path.is_dir():
@@ -1122,15 +1146,15 @@ class FreqaiDataKitchen:
return retrain, trained_timerange, data_load_timerange
def set_new_model_names(self, pair: str, trained_timerange: TimeRange):
def set_new_model_names(self, pair: str, timestamp_id: int):
coin, _ = pair.split("/")
self.data_path = Path(
self.full_path
/ f"sub-train-{pair.split('/')[0]}_{int(trained_timerange.stopts)}"
/ f"sub-train-{pair.split('/')[0]}_{timestamp_id}"
)
self.model_filename = f"cb_{coin.lower()}_{int(trained_timerange.stopts)}"
self.model_filename = f"cb_{coin.lower()}_{timestamp_id}"
def set_all_pairs(self) -> None:
@@ -1141,6 +1165,54 @@ class FreqaiDataKitchen:
if pair not in self.all_pairs:
self.all_pairs.append(pair)
def extract_corr_pair_columns_from_populated_indicators(
self,
dataframe: DataFrame
) -> Dict[str, DataFrame]:
"""
Find the columns of the dataframe corresponding to the corr_pairlist, save them
in a dictionary to be reused and attached to other pairs.
:param dataframe: fully populated dataframe (current pair + corr_pairs)
:return: corr_dataframes, dictionary of dataframes to be attached
to other pairs in same candle.
"""
corr_dataframes: Dict[str, DataFrame] = {}
pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
for pair in pairs:
pair = pair.replace(':', '') # lightgbm doesnt like colons
valid_strs = [f"%-{pair}", f"%{pair}", f"%_{pair}"]
pair_cols = [col for col in dataframe.columns if
any(substr in col for substr in valid_strs)]
if pair_cols:
pair_cols.insert(0, 'date')
corr_dataframes[pair] = dataframe.filter(pair_cols, axis=1)
return corr_dataframes
def attach_corr_pair_columns(self, dataframe: DataFrame,
corr_dataframes: Dict[str, DataFrame],
current_pair: str) -> DataFrame:
"""
Attach the existing corr_pair dataframes to the current pair dataframe before training
:param dataframe: current pair strategy dataframe, indicators populated already
:param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
:param current_pair: current pair to which we will attach corr pair dataframe
:return:
:dataframe: current pair dataframe of populated indicators, concatenated with corr_pairs
ready for training
"""
pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
current_pair = current_pair.replace(':', '')
for pair in pairs:
pair = pair.replace(':', '') # lightgbm doesnt work with colons
if current_pair != pair:
dataframe = dataframe.merge(corr_dataframes[pair], how='left', on='date')
return dataframe
def use_strategy_to_populate_indicators(
self,
strategy: IStrategy,
@@ -1148,6 +1220,7 @@ class FreqaiDataKitchen:
base_dataframes: dict = {},
pair: str = "",
prediction_dataframe: DataFrame = pd.DataFrame(),
do_corr_pairs: bool = True,
) -> DataFrame:
"""
Use the user defined strategy for populating indicators during retrain
@@ -1157,15 +1230,15 @@ class FreqaiDataKitchen:
:param base_dataframes: dict = dict containing the current pair dataframes
(for user defined timeframes)
:param metadata: dict = strategy furnished pair metadata
:returns:
:return:
dataframe: DataFrame = dataframe containing populated indicators
"""
# for prediction dataframe creation, we let dataprovider handle everything in the strategy
# so we create empty dictionaries, which allows us to pass None to
# `populate_any_indicators()`. Signaling we want the dp to give us the live dataframe.
tfs = self.freqai_config["feature_parameters"].get("include_timeframes")
pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
pairs: List[str] = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
if not prediction_dataframe.empty:
dataframe = prediction_dataframe.copy()
for tf in tfs:
@@ -1188,19 +1261,24 @@ class FreqaiDataKitchen:
informative=base_dataframes[tf],
set_generalized_indicators=sgi
)
if pairs:
for i in pairs:
if pair in i:
continue # dont repeat anything from whitelist
# ensure corr pairs are always last
for corr_pair in pairs:
if pair == corr_pair:
continue # dont repeat anything from whitelist
for tf in tfs:
if pairs and do_corr_pairs:
dataframe = strategy.populate_any_indicators(
i,
corr_pair,
dataframe.copy(),
tf,
informative=corr_dataframes[i][tf]
informative=corr_dataframes[corr_pair][tf]
)
self.get_unique_classes_from_labels(dataframe)
dataframe = self.remove_special_chars_from_feature_names(dataframe)
return dataframe
def fit_labels(self) -> None:
@@ -1267,14 +1345,16 @@ class FreqaiDataKitchen:
append_df = pd.read_hdf(self.backtesting_results_path)
return append_df
def check_if_backtest_prediction_exists(
self
def check_if_backtest_prediction_is_valid(
self,
len_backtest_df: int
) -> bool:
"""
Check if a backtesting prediction already exists
:param dk: FreqaiDataKitchen
Check if a backtesting prediction already exists and if the predictions
to append have the same size as the backtesting dataframe slice
:param length_backtesting_dataframe: Length of backtesting dataframe slice
:return:
:boolean: whether the prediction file exists or not.
:boolean: whether the prediction file is valid.
"""
path_to_predictionfile = Path(self.full_path /
self.backtest_predictions_folder /
@@ -1282,10 +1362,134 @@ class FreqaiDataKitchen:
self.backtesting_results_path = path_to_predictionfile
file_exists = path_to_predictionfile.is_file()
if file_exists:
logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
append_df = self.get_backtesting_prediction()
if len(append_df) == len_backtest_df:
logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
return True
else:
logger.info("A new backtesting prediction file is required. "
"(Number of predictions is different from dataframe length).")
return False
else:
logger.info(
f"Could not find backtesting prediction file at {path_to_predictionfile}"
)
return file_exists
return False
def set_timerange_from_ready_models(self):
backtesting_timerange, \
assets_end_dates = (
self.get_timerange_and_assets_end_dates_from_ready_models(self.full_path))
self.backtest_live_models_data = {
"backtesting_timerange": backtesting_timerange,
"assets_end_dates": assets_end_dates
}
return
def get_full_models_path(self, config: Config) -> Path:
"""
Returns default FreqAI model path
:param config: Configuration dictionary
"""
freqai_config: Dict[str, Any] = config["freqai"]
return Path(
config["user_data_dir"] / "models" / str(freqai_config.get("identifier"))
)
def get_timerange_and_assets_end_dates_from_ready_models(
self, models_path: Path) -> Tuple[TimeRange, Dict[str, Any]]:
"""
Returns timerange information based on a FreqAI model directory
:param models_path: FreqAI model path
:return: a Tuple with (Timerange calculated from directory and
a Dict with pair and model end training dates info)
"""
all_models_end_dates = []
assets_end_dates: Dict[str, Any] = self.get_assets_timestamps_training_from_ready_models(
models_path)
for key in assets_end_dates:
for model_end_date in assets_end_dates[key]:
if model_end_date not in all_models_end_dates:
all_models_end_dates.append(model_end_date)
if len(all_models_end_dates) == 0:
raise OperationalException(
'At least 1 saved model is required to '
'run backtest with the freqai-backtest-live-models option'
)
if len(all_models_end_dates) == 1:
logger.warning(
"Only 1 model was found. Backtesting will run with the "
"timerange from the end of the training date to the current date"
)
finish_timestamp = int(datetime.now(tz=timezone.utc).timestamp())
if len(all_models_end_dates) > 1:
# After last model end date, use the same period from previous model
# to finish the backtest
all_models_end_dates.sort(reverse=True)
finish_timestamp = all_models_end_dates[0] + \
(all_models_end_dates[0] - all_models_end_dates[1])
all_models_end_dates.append(finish_timestamp)
all_models_end_dates.sort()
start_date = (datetime(*datetime.fromtimestamp(min(all_models_end_dates),
timezone.utc).timetuple()[:3], tzinfo=timezone.utc))
end_date = (datetime(*datetime.fromtimestamp(max(all_models_end_dates),
timezone.utc).timetuple()[:3], tzinfo=timezone.utc))
# add 1 day to string timerange to ensure BT module will load all dataframe data
end_date = end_date + timedelta(days=1)
backtesting_timerange = TimeRange(
'date', 'date', int(start_date.timestamp()), int(end_date.timestamp())
)
return backtesting_timerange, assets_end_dates
def get_assets_timestamps_training_from_ready_models(
self, models_path: Path) -> Dict[str, Any]:
"""
Scan the models path and returns all assets end training dates (timestamp)
:param models_path: FreqAI model path
:return: a Dict with asset and model end training dates info
"""
assets_end_dates: Dict[str, Any] = {}
if not models_path.is_dir():
raise OperationalException(
'Model folders not found. Saved models are required '
'to run backtest with the freqai-backtest-live-models option'
)
for model_dir in models_path.iterdir():
if str(model_dir.name).startswith("sub-train"):
model_end_date = int(model_dir.name.split("_")[1])
asset = model_dir.name.split("_")[0].replace("sub-train-", "")
model_file_name = (
f"cb_{str(model_dir.name).replace('sub-train-', '').lower()}"
"_model.joblib"
)
model_path_file = Path(model_dir / model_file_name)
if model_path_file.is_file():
if asset not in assets_end_dates:
assets_end_dates[asset] = []
assets_end_dates[asset].append(model_end_date)
return assets_end_dates
def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
"""
Remove all special characters from feature strings (:)
:param dataframe: the dataframe that just finished indicator population. (unfiltered)
:return: dataframe with cleaned featrue names
"""
spec_chars = [':']
for c in spec_chars:
dataframe.columns = dataframe.columns.str.replace(c, "")
return dataframe

View File

@@ -5,8 +5,7 @@ from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional, Literal, Tuple
from typing import Any, Dict, List, Literal, Optional, Tuple
import numpy as np
import pandas as pd
@@ -70,22 +69,23 @@ class IFreqaiModel(ABC):
if self.save_backtest_models:
logger.info('Backtesting module configured to save all models.')
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
# set current candle to arbitrary historical date
self.current_candle: datetime = datetime.fromtimestamp(637887600, tz=timezone.utc)
self.dd.current_candle = self.current_candle
self.scanning = False
self.ft_params = self.freqai_info["feature_parameters"]
self.corr_pairlist: List[str] = self.ft_params.get("include_corr_pairlist", [])
self.keras: bool = self.freqai_info.get("keras", False)
if self.keras and self.ft_params.get("DI_threshold", 0):
self.ft_params["DI_threshold"] = 0
logger.warning("DI threshold is not configured for Keras models yet. Deactivating.")
self.CONV_WIDTH = self.freqai_info.get("conv_width", 2)
self.CONV_WIDTH = self.freqai_info.get('conv_width', 1)
if self.ft_params.get("inlier_metric_window", 0):
self.CONV_WIDTH = self.ft_params.get("inlier_metric_window", 0) * 2
self.pair_it = 0
self.pair_it_train = 0
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
self.train_queue = self._set_train_queue()
self.last_trade_database_summary: DataFrame = {}
self.current_trade_database_summary: DataFrame = {}
self.analysis_lock = Lock()
self.inference_time: float = 0
self.train_time: float = 0
self.begin_time: float = 0
@@ -93,7 +93,10 @@ class IFreqaiModel(ABC):
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
self.continual_learning = self.freqai_info.get('continual_learning', False)
self.plot_features = self.ft_params.get("plot_feature_importances", 0)
self.corr_dataframes: Dict[str, DataFrame] = {}
# get_corr_dataframes is controlling the caching of corr_dataframes
# for improved performance. Careful with this boolean.
self.get_corr_dataframes: bool = True
self._threads: List[threading.Thread] = []
self._stop_event = threading.Event()
self.strategy: Optional[IStrategy] = None
@@ -140,7 +143,11 @@ class IFreqaiModel(ABC):
# the concatenated results for the full backtesting period back to the strategy.
elif not self.follow_mode:
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
if self.dk.backtest_live_models:
logger.info(
f"Backtesting {len(self.dk.backtesting_timeranges)} timeranges (live models)")
else:
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
dataframe = self.dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
)
@@ -269,25 +276,20 @@ class IFreqaiModel(ABC):
dataframe_train = dk.slice_dataframe(tr_train, dataframe)
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
trained_timestamp = tr_train
tr_train_startts_str = datetime.fromtimestamp(
tr_train.startts,
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
tr_train_stopts_str = datetime.fromtimestamp(
tr_train.stopts,
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
logger.info(
f"Training {pair}, {self.pair_it}/{self.total_pairs} pairs"
f" from {tr_train_startts_str} to {tr_train_stopts_str}, {train_it}/{total_trains} "
"trains"
)
if not self.ensure_data_exists(dataframe_backtest, tr_backtest, pair):
continue
trained_timestamp_int = int(trained_timestamp.stopts)
dk.set_paths(pair, trained_timestamp_int)
self.log_backtesting_progress(tr_train, pair, train_it, total_trains)
dk.set_new_model_names(pair, trained_timestamp)
timestamp_model_id = int(tr_train.stopts)
if dk.backtest_live_models:
timestamp_model_id = int(tr_backtest.startts)
if dk.check_if_backtest_prediction_exists():
dk.set_paths(pair, timestamp_model_id)
dk.set_new_model_names(pair, timestamp_model_id)
if dk.check_if_backtest_prediction_is_valid(len(dataframe_backtest)):
self.dd.load_metadata(dk)
dk.find_features(dataframe_train)
self.check_if_feature_list_matches_strategy(dk)
@@ -299,7 +301,7 @@ class IFreqaiModel(ABC):
dk.find_labels(dataframe_train)
self.model = self.train(dataframe_train, pair, dk)
self.dd.pair_dict[pair]["trained_timestamp"] = int(
trained_timestamp.stopts)
tr_train.stopts)
if self.plot_features:
plot_feature_importance(self.model, pair, dk, self.plot_features)
if self.save_backtest_models:
@@ -351,6 +353,7 @@ class IFreqaiModel(ABC):
if self.dd.historic_data:
self.dd.update_historic_data(strategy, dk)
logger.debug(f'Updating historic data on pair {metadata["pair"]}')
self.track_current_candle()
if not self.follow_mode:
@@ -377,10 +380,10 @@ class IFreqaiModel(ABC):
# load the model and associated data into the data kitchen
self.model = self.dd.load_data(metadata["pair"], dk)
with self.analysis_lock:
dataframe = self.dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
)
dataframe = dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"],
do_corr_pairs=self.get_corr_dataframes
)
if not self.model:
logger.warning(
@@ -389,6 +392,9 @@ class IFreqaiModel(ABC):
self.dd.return_null_values_to_strategy(dataframe, dk)
return dk
if self.corr_pairlist:
dataframe = self.cache_corr_pairlist_dfs(dataframe, dk)
dk.find_labels(dataframe)
self.build_strategy_return_arrays(dataframe, dk, metadata["pair"], trained_timestamp)
@@ -572,10 +578,9 @@ class IFreqaiModel(ABC):
data_load_timerange, pair, dk
)
with self.analysis_lock:
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
strategy, corr_dataframes, base_dataframes, pair
)
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
strategy, corr_dataframes, base_dataframes, pair
)
unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
@@ -586,7 +591,7 @@ class IFreqaiModel(ABC):
model = self.train(unfiltered_dataframe, pair, dk)
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
dk.set_new_model_names(pair, new_trained_timerange)
dk.set_new_model_names(pair, new_trained_timerange.stopts)
self.dd.save_data(model, pair, dk)
if self.plot_features:
@@ -751,6 +756,87 @@ class IFreqaiModel(ABC):
f'Best approximation queue: {best_queue}')
return best_queue
def cache_corr_pairlist_dfs(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
Cache the corr_pairlist dfs to speed up performance for subsequent pairs during the
current candle.
:param dataframe: strategy fed dataframe
:param dk: datakitchen object for current asset
:return: dataframe to attach/extract cached corr_pair dfs to/from.
"""
if self.get_corr_dataframes:
self.corr_dataframes = dk.extract_corr_pair_columns_from_populated_indicators(dataframe)
if not self.corr_dataframes:
logger.warning("Couldn't cache corr_pair dataframes for improved performance. "
"Consider ensuring that the full coin/stake, e.g. XYZ/USD, "
"is included in the column names when you are creating features "
"in `populate_any_indicators()`.")
self.get_corr_dataframes = not bool(self.corr_dataframes)
elif self.corr_dataframes:
dataframe = dk.attach_corr_pair_columns(
dataframe, self.corr_dataframes, dk.pair)
return dataframe
def track_current_candle(self):
"""
Checks if the latest candle appended by the datadrawer is
equivalent to the latest candle seen by FreqAI. If not, it
asks to refresh the cached corr_dfs, and resets the pair
counter.
"""
if self.dd.current_candle > self.current_candle:
self.get_corr_dataframes = True
self.pair_it = 1
self.current_candle = self.dd.current_candle
def ensure_data_exists(self, dataframe_backtest: DataFrame,
tr_backtest: TimeRange, pair: str) -> bool:
"""
Check if the dataframe is empty, if not, report useful information to user.
:param dataframe_backtest: the backtesting dataframe, maybe empty.
:param tr_backtest: current backtesting timerange.
:param pair: current pair
:return: if the data exists or not
"""
if self.config.get("freqai_backtest_live_models", False) and len(dataframe_backtest) == 0:
tr_backtest_startts_str = datetime.fromtimestamp(
tr_backtest.startts,
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
tr_backtest_stopts_str = datetime.fromtimestamp(
tr_backtest.stopts,
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
logger.info(f"No data found for pair {pair} from {tr_backtest_startts_str} "
f" from {tr_backtest_startts_str} to {tr_backtest_stopts_str}. "
"Probably more than one training within the same candle period.")
return False
return True
def log_backtesting_progress(self, tr_train: TimeRange, pair: str,
train_it: int, total_trains: int):
"""
Log the backtesting progress so user knows how many pairs have been trained and
how many more pairs/trains remain.
:param tr_train: the training timerange
:param train_it: the train iteration for the current pair (the sliding window progress)
:param pair: the current pair
:param total_trains: total trains (total number of slides for the sliding window)
"""
tr_train_startts_str = datetime.fromtimestamp(
tr_train.startts,
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
tr_train_stopts_str = datetime.fromtimestamp(
tr_train.stopts,
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
if not self.config.get("freqai_backtest_live_models", False):
logger.info(
f"Training {pair}, {self.pair_it}/{self.total_pairs} pairs"
f" from {tr_train_startts_str} "
f"to {tr_train_stopts_str}, {train_it}/{total_trains} "
"trains"
)
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModel.py for an example.

View File

@@ -0,0 +1,74 @@
import logging
import sys
from pathlib import Path
from typing import Any, Dict
from catboost import CatBoostClassifier, Pool
from freqtrade.freqai.base_models.BaseClassifierModel import BaseClassifierModel
from freqtrade.freqai.base_models.FreqaiMultiOutputClassifier import FreqaiMultiOutputClassifier
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
logger = logging.getLogger(__name__)
class CatboostClassifierMultiTarget(BaseClassifierModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which
has its own DataHandler where data is held, saved, loaded, and managed.
"""
def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
User sets up the training and test data to fit their desired model here
:param data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
cbc = CatBoostClassifier(
allow_writing_files=True,
loss_function='MultiClass',
train_dir=Path(dk.data_path),
**self.model_training_parameters,
)
X = data_dictionary["train_features"]
y = data_dictionary["train_labels"]
sample_weight = data_dictionary["train_weights"]
eval_sets = [None] * y.shape[1]
if self.freqai_info.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
eval_sets = [None] * data_dictionary['test_labels'].shape[1]
for i in range(data_dictionary['test_labels'].shape[1]):
eval_sets[i] = Pool(
data=data_dictionary["test_features"],
label=data_dictionary["test_labels"].iloc[:, i],
weight=data_dictionary["test_weights"],
)
init_model = self.get_init_model(dk.pair)
if init_model:
init_models = init_model.estimators_
else:
init_models = [None] * y.shape[1]
fit_params = []
for i in range(len(eval_sets)):
fit_params.append({
'eval_set': eval_sets[i], 'init_model': init_models[i],
'log_cout': sys.stdout, 'log_cerr': sys.stderr,
})
model = FreqaiMultiOutputClassifier(estimator=cbc)
thread_training = self.freqai_info.get('multitarget_parallel_training', False)
if thread_training:
model.n_jobs = y.shape[1]
model.fit(X=X, y=y, sample_weight=sample_weight, fit_params=fit_params)
return model

View File

@@ -0,0 +1,64 @@
import logging
from typing import Any, Dict
from lightgbm import LGBMClassifier
from freqtrade.freqai.base_models.BaseClassifierModel import BaseClassifierModel
from freqtrade.freqai.base_models.FreqaiMultiOutputClassifier import FreqaiMultiOutputClassifier
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
logger = logging.getLogger(__name__)
class LightGBMClassifierMultiTarget(BaseClassifierModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which
has its own DataHandler where data is held, saved, loaded, and managed.
"""
def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
User sets up the training and test data to fit their desired model here
:param data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
lgb = LGBMClassifier(**self.model_training_parameters)
X = data_dictionary["train_features"]
y = data_dictionary["train_labels"]
sample_weight = data_dictionary["train_weights"]
eval_weights = None
eval_sets = [None] * y.shape[1]
if self.freqai_info.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
eval_weights = [data_dictionary["test_weights"]]
eval_sets = [(None, None)] * data_dictionary['test_labels'].shape[1] # type: ignore
for i in range(data_dictionary['test_labels'].shape[1]):
eval_sets[i] = ( # type: ignore
data_dictionary["test_features"],
data_dictionary["test_labels"].iloc[:, i]
)
init_model = self.get_init_model(dk.pair)
if init_model:
init_models = init_model.estimators_
else:
init_models = [None] * y.shape[1]
fit_params = []
for i in range(len(eval_sets)):
fit_params.append(
{'eval_set': eval_sets[i], 'eval_sample_weight': eval_weights,
'init_model': init_models[i]})
model = FreqaiMultiOutputClassifier(estimator=lgb)
thread_training = self.freqai_info.get('multitarget_parallel_training', False)
if thread_training:
model.n_jobs = y.shape[1]
model.fit(X=X, y=y, sample_weight=sample_weight, fit_params=fit_params)
return model

View File

@@ -218,3 +218,19 @@ def record_params(config: Dict[str, Any], full_path: Path) -> None:
default=str,
number_mode=rapidjson.NM_NATIVE | rapidjson.NM_NAN
)
def get_timerange_backtest_live_models(config: Config) -> str:
"""
Returns a formated timerange for backtest live/ready models
:param config: Configuration dictionary
:return: a string timerange (format example: '20220801-20220822')
"""
dk = FreqaiDataKitchen(config)
models_path = dk.get_full_models_path(config)
timerange, _ = dk.get_timerange_and_assets_end_dates_from_ready_models(models_path)
start_date = datetime.fromtimestamp(timerange.startts, tz=timezone.utc)
end_date = datetime.fromtimestamp(timerange.stopts, tz=timezone.utc)
tr = f"{start_date.strftime('%Y%m%d')}-{end_date.strftime('%Y%m%d')}"
return tr