Rehaul organization of return values

robcaulk
2022-07-02 18:09:38 +02:00
parent 93e1410ed9
commit 106131ff0f
7 changed files with 429 additions and 292 deletions


@@ -59,9 +59,7 @@ class IFreqaiModel(ABC):
         self.update_historic_data = 0
         self.set_full_path()
         self.follow_mode = self.freqai_info.get('follow_mode', False)
-        self.data_drawer = FreqaiDataDrawer(Path(self.full_path),
-                                            self.config,
-                                            self.follow_mode)
+        self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
         self.lock = threading.Lock()
         self.follow_mode = self.freqai_info.get('follow_mode', False)
         self.identifier = self.freqai_info.get('identifier', 'no_id_provided')
@@ -91,12 +89,12 @@ class IFreqaiModel(ABC):
"""
self.live = strategy.dp.runmode in (RunMode.DRY_RUN, RunMode.LIVE)
self.data_drawer.set_pair_dict_info(metadata)
self.dd.set_pair_dict_info(metadata)
if self.live:
self.dh = FreqaiDataKitchen(self.config, self.data_drawer,
self.dk = FreqaiDataKitchen(self.config, self.dd,
self.live, metadata["pair"])
dh = self.start_live(dataframe, metadata, strategy, self.dh)
dk = self.start_live(dataframe, metadata, strategy, self.dk)
# For backtesting, each pair enters and then gets trained for each window along the
# sliding window defined by "train_period" (training window) and "backtest_period"
@@ -104,19 +102,19 @@ class IFreqaiModel(ABC):
         # FreqAI slides the window and sequentially builds the backtesting results before returning
         # the concatenated results for the full backtesting period back to the strategy.
         elif not self.follow_mode:
-            self.dh = FreqaiDataKitchen(self.config, self.data_drawer, self.live, metadata["pair"])
-            logger.info(f'Training {len(self.dh.training_timeranges)} timeranges')
-            dh = self.start_backtesting(dataframe, metadata, self.dh)
+            self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
+            logger.info(f'Training {len(self.dk.training_timeranges)} timeranges')
+            dk = self.start_backtesting(dataframe, metadata, self.dk)
-        dataframe = self.remove_features_from_df(dataframe)
-        return self.return_values(dataframe, dh)
+        dataframe = self.remove_features_from_df(dk.return_dataframe)
+        return self.return_values(dataframe, dk)
     @threaded
     def start_scanning(self, strategy: IStrategy) -> None:
         """
         Function designed to constantly scan pairs for retraining on a separate thread (intracandle)
         to improve model youth. This function is agnostic to data preparation/collection/storage,
-        it simply trains on what ever data is available in the self.data_drawer.
+        it simply trains on what ever data is available in the self.dd.
         :params:
         strategy: IStrategy = The user defined strategy class
         """
@@ -124,33 +122,33 @@ class IFreqaiModel(ABC):
             time.sleep(1)
             for pair in self.config.get('exchange', {}).get('pair_whitelist'):
-                (_, trained_timestamp, _, _) = self.data_drawer.get_pair_dict_info(pair)
+                (_, trained_timestamp, _, _) = self.dd.get_pair_dict_info(pair)
-                if self.data_drawer.pair_dict[pair]['priority'] != 1:
+                if self.dd.pair_dict[pair]['priority'] != 1:
                     continue
-                dh = FreqaiDataKitchen(self.config, self.data_drawer,
+                dk = FreqaiDataKitchen(self.config, self.dd,
                                        self.live, pair)
                 # file_exists = False
-                dh.set_paths(pair, trained_timestamp)
+                dk.set_paths(pair, trained_timestamp)
                 # file_exists = self.model_exists(pair,
-                #                                 dh,
+                #                                 dk,
                 #                                 trained_timestamp=trained_timestamp,
                 #                                 model_filename=model_filename,
                 #                                 scanning=True)
                 (retrain,
                  new_trained_timerange,
-                 data_load_timerange) = dh.check_if_new_training_required(trained_timestamp)
-                dh.set_paths(pair, new_trained_timerange.stopts)
+                 data_load_timerange) = dk.check_if_new_training_required(trained_timestamp)
+                dk.set_paths(pair, new_trained_timerange.stopts)
                 if retrain: # or not file_exists:
                     self.train_model_in_series(new_trained_timerange, pair,
-                                               strategy, dh, data_load_timerange)
+                                               strategy, dk, data_load_timerange)
     def start_backtesting(self, dataframe: DataFrame, metadata: dict,
-                          dh: FreqaiDataKitchen) -> FreqaiDataKitchen:
+                          dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
         """
         The main broad execution for backtesting. For backtesting, each pair enters and then gets
         trained for each window along the sliding window defined by "train_period" (training window)
@@ -161,9 +159,9 @@ class IFreqaiModel(ABC):
         :params:
         dataframe: DataFrame = strategy passed dataframe
         metadata: Dict = pair metadata
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         :returns:
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         """
         # Loop enforcing the sliding window training/backtesting paradigm
@@ -172,15 +170,15 @@ class IFreqaiModel(ABC):
         # following tr_train. Both of these windows slide through the
         # entire backtest
         for tr_train, tr_backtest in zip(
-            dh.training_timeranges, dh.backtesting_timeranges
+            dk.training_timeranges, dk.backtesting_timeranges
         ):
-            (_, _, _, _) = self.data_drawer.get_pair_dict_info(metadata['pair'])
+            (_, _, _, _) = self.dd.get_pair_dict_info(metadata['pair'])
             gc.collect()
-            dh.data = {} # clean the pair specific data between training window sliding
+            dk.data = {} # clean the pair specific data between training window sliding
             self.training_timerange = tr_train
             # self.training_timerange_timerange = tr_train
-            dataframe_train = dh.slice_dataframe(tr_train, dataframe)
-            dataframe_backtest = dh.slice_dataframe(tr_backtest, dataframe)
+            dataframe_train = dk.slice_dataframe(tr_train, dataframe)
+            dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
             trained_timestamp = tr_train # TimeRange.parse_timerange(tr_train)
             tr_train_startts_str = datetime.datetime.utcfromtimestamp(
@@ -190,33 +188,33 @@ class IFreqaiModel(ABC):
logger.info("Training %s", metadata["pair"])
logger.info(f'Training {tr_train_startts_str} to {tr_train_stopts_str}')
dh.data_path = Path(dh.full_path /
dk.data_path = Path(dk.full_path /
str("sub-train" + "-" + metadata['pair'].split("/")[0] +
str(int(trained_timestamp.stopts))))
if not self.model_exists(metadata["pair"], dh,
if not self.model_exists(metadata["pair"], dk,
trained_timestamp=trained_timestamp.stopts):
self.model = self.train(dataframe_train, metadata['pair'], dh)
self.data_drawer.pair_dict[metadata['pair']][
self.model = self.train(dataframe_train, metadata['pair'], dk)
self.dd.pair_dict[metadata['pair']][
'trained_timestamp'] = trained_timestamp.stopts
dh.set_new_model_names(metadata['pair'], trained_timestamp)
dh.save_data(self.model, metadata['pair'], keras=self.keras)
dk.set_new_model_names(metadata['pair'], trained_timestamp)
dk.save_data(self.model, metadata['pair'], keras_model=self.keras)
else:
self.model = dh.load_data(metadata['pair'], keras=self.keras)
self.model = dk.load_data(metadata['pair'], keras_model=self.keras)
self.check_if_feature_list_matches_strategy(dataframe_train, dh)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
preds, do_preds = self.predict(dataframe_backtest, dh)
preds, do_preds = self.predict(dataframe_backtest, dk)
dh.append_predictions(preds, do_preds, len(dataframe_backtest))
print('predictions', len(dh.full_predictions),
'do_predict', len(dh.full_do_predict))
dk.append_predictions(preds, do_preds, len(dataframe_backtest))
print('predictions', len(dk.full_predictions),
'do_predict', len(dk.full_do_predict))
dh.fill_predictions(len(dataframe))
dk.fill_predictions(len(dataframe))
return dh
return dk
def start_live(self, dataframe: DataFrame, metadata: dict,
strategy: IStrategy, dh: FreqaiDataKitchen) -> FreqaiDataKitchen:
strategy: IStrategy, dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
"""
The main broad execution for dry/live. This function will check if a retraining should be
performed, and if so, retrain and reset the model.
@@ -224,30 +222,30 @@ class IFreqaiModel(ABC):
         dataframe: DataFrame = strategy passed dataframe
         metadata: Dict = pair metadata
         strategy: IStrategy = currently employed strategy
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         :returns:
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         """
         # update follower
         if self.follow_mode:
-            self.data_drawer.update_follower_metadata()
+            self.dd.update_follower_metadata()
         # get the model metadata associated with the current pair
         (_,
          trained_timestamp,
          _,
-         return_null_array) = self.data_drawer.get_pair_dict_info(metadata['pair'])
+         return_null_array) = self.dd.get_pair_dict_info(metadata['pair'])
         # if the metadata doesnt exist, the follower returns null arrays to strategy
         if self.follow_mode and return_null_array:
             logger.info('Returning null array from follower to strategy')
-            self.data_drawer.return_null_values_to_strategy(dataframe, dh)
-            return dh
+            self.dd.return_null_values_to_strategy(dataframe, dk)
+            return dk
         # append the historic data once per round
-        if self.data_drawer.historic_data:
-            dh.update_historic_data(strategy)
+        if self.dd.historic_data:
+            dk.update_historic_data(strategy)
             logger.debug(f'Updating historic data on pair {metadata["pair"]}')
         # if trainable, check if model needs training, if so compute new timerange,
@@ -257,95 +255,100 @@ class IFreqaiModel(ABC):
             (_,
              new_trained_timerange,
-             data_load_timerange) = dh.check_if_new_training_required(trained_timestamp)
-            dh.set_paths(metadata['pair'], new_trained_timerange.stopts)
+             data_load_timerange) = dk.check_if_new_training_required(trained_timestamp)
+            dk.set_paths(metadata['pair'], new_trained_timerange.stopts)
             # download candle history if it is not already in memory
-            if not self.data_drawer.historic_data:
+            if not self.dd.historic_data:
                 logger.info('Downloading all training data for all pairs in whitelist and '
                             'corr_pairlist, this may take a while if you do not have the '
                             'data saved')
-                dh.download_all_data_for_training(data_load_timerange)
-                dh.load_all_pair_histories(data_load_timerange)
+                dk.download_all_data_for_training(data_load_timerange)
+                dk.load_all_pair_histories(data_load_timerange)
             if not self.scanning:
                 self.scanning = True
                 self.start_scanning(strategy)
         elif self.follow_mode:
-            dh.set_paths(metadata['pair'], trained_timestamp)
+            dk.set_paths(metadata['pair'], trained_timestamp)
             logger.info('FreqAI instance set to follow_mode, finding existing pair'
                         f'using { self.identifier }')
         # load the model and associated data into the data kitchen
-        self.model = dh.load_data(coin=metadata['pair'], keras=self.keras)
+        self.model = dk.load_data(coin=metadata['pair'], keras_model=self.keras)
         if not self.model:
             logger.warning('No model ready, returning null values to strategy.')
-            self.data_drawer.return_null_values_to_strategy(dataframe, dh)
-            return dh
+            self.dd.return_null_values_to_strategy(dataframe, dk)
+            return dk
         # ensure user is feeding the correct indicators to the model
-        self.check_if_feature_list_matches_strategy(dataframe, dh)
+        self.check_if_feature_list_matches_strategy(dataframe, dk)
-        self.build_strategy_return_arrays(dataframe, dh, metadata['pair'], trained_timestamp)
+        self.build_strategy_return_arrays(dataframe, dk, metadata['pair'], trained_timestamp)
-        return dh
+        return dk
     def build_strategy_return_arrays(self, dataframe: DataFrame,
-                                     dh: FreqaiDataKitchen, pair: str,
+                                     dk: FreqaiDataKitchen, pair: str,
                                      trained_timestamp: int) -> None:
         # hold the historical predictions in memory so we are sending back
         # correct array to strategy
-        if pair not in self.data_drawer.model_return_values:
-            preds, do_preds = self.predict(dataframe, dh)
+        if pair not in self.dd.model_return_values:
+            pred_df, do_preds = self.predict(dataframe, dk)
             # mypy doesnt like the typing in else statement, so we need to explicitly add to
             # dataframe separately
-            dataframe['prediction'], dataframe['do_predict'] = preds, do_preds
-            # dh.append_predictions(preds, do_preds, len(dataframe))
-            # dh.fill_predictions(len(dataframe))
-            self.data_drawer.set_initial_return_values(pair, dh, dataframe)
+            # for label in dk.label_list:
+            #     dataframe[label] = pred_df[label]
+            # dataframe['do_predict'] = do_preds
+            # dk.append_predictions(preds, do_preds, len(dataframe))
+            # dk.fill_predictions(len(dataframe))
+            self.dd.set_initial_return_values(pair, dk, pred_df, do_preds)
+            dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
             return
-        elif self.dh.check_if_model_expired(trained_timestamp):
-            preds, do_preds, dh.DI_values = np.zeros(2), np.ones(2) * 2, np.zeros(2)
+        elif self.dk.check_if_model_expired(trained_timestamp):
+            pred_df = DataFrame(np.zeros((2, len(dk.label_list))), columns=dk.label_list)
+            do_preds, dk.DI_values = np.ones(2) * 2, np.zeros(2)
             logger.warning('Model expired, returning null values to strategy. Strategy '
                            'construction should take care to consider this event with '
                            'prediction == 0 and do_predict == 2')
         else:
             # Only feed in the most recent candle for prediction in live scenario
-            preds, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dh, first=False)
+            pred_df, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dk, first=False)
+        self.dd.append_model_predictions(pair, pred_df, do_preds, dk, len(dataframe))
+        dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
-        self.data_drawer.append_model_predictions(pair, preds, do_preds,
-                                                  dh.data["target_mean"],
-                                                  dh.data["target_std"],
-                                                  dh,
-                                                  len(dataframe))
         return
     def check_if_feature_list_matches_strategy(self, dataframe: DataFrame,
-                                               dh: FreqaiDataKitchen) -> None:
+                                               dk: FreqaiDataKitchen) -> None:
         """
         Ensure user is passing the proper feature set if they are reusing an `identifier` pointing
         to a folder holding existing models.
         :params:
         dataframe: DataFrame = strategy provided dataframe
-        dh: FreqaiDataKitchen = non-persistent data container/analyzer for current coin/bot loop
+        dk: FreqaiDataKitchen = non-persistent data container/analyzer for current coin/bot loop
         """
-        strategy_provided_features = dh.find_features(dataframe)
-        if 'training_features_list_raw' in dh.data:
-            feature_list = dh.data['training_features_list_raw']
+        dk.find_features(dataframe)
+        if 'training_features_list_raw' in dk.data:
+            feature_list = dk.data['training_features_list_raw']
         else:
-            feature_list = dh.training_features_list
-        if strategy_provided_features != feature_list:
+            feature_list = dk.training_features_list
+        if dk.training_features_list != feature_list:
             raise OperationalException("Trying to access pretrained model with `identifier` "
                                        "but found different features furnished by current strategy."
                                        "Change `identifer` to train from scratch, or ensure the"
                                        "strategy is furnishing the same features as the pretrained"
                                        "model")
-    def data_cleaning_train(self, dh: FreqaiDataKitchen) -> None:
+    def data_cleaning_train(self, dk: FreqaiDataKitchen) -> None:
         """
         Base data cleaning method for train
         Any function inside this method should drop training data points from the filtered_dataframe
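The build_strategy_return_arrays() hunk above is the heart of the reorganization: predictions now arrive as a DataFrame with one column per entry in dk.label_list instead of a single 'prediction' array, and the data drawer (self.dd) stores them and attaches them, together with do_predict, to the dataframe handed back to the strategy via dk.return_dataframe. A rough sketch of what that attachment amounts to, assuming one prediction row per strategy candle (the first-call case); the helper below is illustrative and not the FreqaiDataDrawer implementation:

import pandas as pd

def attach_return_values(strategy_df: pd.DataFrame, pred_df: pd.DataFrame,
                         do_predict) -> pd.DataFrame:
    out = strategy_df.copy()
    for label in pred_df.columns:      # one column per entry in dk.label_list
        out[label] = pred_df[label].values
    out['do_predict'] = do_predict     # flag convention described in the docstrings below
    return out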
@@ -354,23 +357,23 @@ class IFreqaiModel(ABC):
"""
if self.freqai_info.get('feature_parameters', {}).get('principal_component_analysis'):
dh.principal_component_analysis()
dk.principal_component_analysis()
if self.freqai_info.get('feature_parameters', {}).get('use_SVM_to_remove_outliers'):
dh.use_SVM_to_remove_outliers(predict=False)
dk.use_SVM_to_remove_outliers(predict=False)
if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
dh.data["avg_mean_dist"] = dh.compute_distances()
dk.data["avg_mean_dist"] = dk.compute_distances()
# if self.feature_parameters["determine_statistical_distributions"]:
# dh.determine_statistical_distributions()
# dk.determine_statistical_distributions()
# if self.feature_parameters["remove_outliers"]:
# dh.remove_outliers(predict=False)
# dk.remove_outliers(predict=False)
def data_cleaning_predict(self, dh: FreqaiDataKitchen, dataframe: DataFrame) -> None:
def data_cleaning_predict(self, dk: FreqaiDataKitchen, dataframe: DataFrame) -> None:
"""
Base data cleaning method for predict.
These functions each modify dh.do_predict, which is a dataframe with equal length
These functions each modify dk.do_predict, which is a dataframe with equal length
to the number of candles coming from and returning to the strategy. Inside do_predict,
1 allows prediction and < 0 signals to the strategy that the model is not confident in
the prediction.
@@ -379,20 +382,20 @@ class IFreqaiModel(ABC):
         for buy signals.
         """
         if self.freqai_info.get('feature_parameters', {}).get('principal_component_analysis'):
-            dh.pca_transform(dataframe)
+            dk.pca_transform(dataframe)
         if self.freqai_info.get('feature_parameters', {}).get('use_SVM_to_remove_outliers'):
-            dh.use_SVM_to_remove_outliers(predict=True)
+            dk.use_SVM_to_remove_outliers(predict=True)
         if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
-            dh.check_if_pred_in_training_spaces()
+            dk.check_if_pred_in_training_spaces()
         # if self.feature_parameters["determine_statistical_distributions"]:
-        #     dh.determine_statistical_distributions()
+        #     dk.determine_statistical_distributions()
         # if self.feature_parameters["remove_outliers"]:
-        #     dh.remove_outliers(predict=True) # creates dropped index
+        #     dk.remove_outliers(predict=True) # creates dropped index
-    def model_exists(self, pair: str, dh: FreqaiDataKitchen, trained_timestamp: int = None,
+    def model_exists(self, pair: str, dk: FreqaiDataKitchen, trained_timestamp: int = None,
                      model_filename: str = '', scanning: bool = False) -> bool:
         """
         Given a pair and path, check if a model already exists
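The data_cleaning_train()/data_cleaning_predict() branches above are switched on by entries under feature_parameters in the freqai configuration. A minimal sketch of the keys the code reads; the exact nesting under "freqai" and the values shown are illustrative assumptions, not shipped defaults:

config = {
    "freqai": {
        "feature_parameters": {
            "principal_component_analysis": True,  # dk.principal_component_analysis() / dk.pca_transform()
            "use_SVM_to_remove_outliers": True,    # SVM outlier removal at train and predict time
            "DI_threshold": 0.9,                   # dissimilarity check via dk.check_if_pred_in_training_spaces()
        },
    },
}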
@@ -402,14 +405,14 @@ class IFreqaiModel(ABC):
         coin, _ = pair.split("/")
         if not self.live:
-            dh.model_filename = model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)
+            dk.model_filename = model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)
-        path_to_modelfile = Path(dh.data_path / str(model_filename + "_model.joblib"))
+        path_to_modelfile = Path(dk.data_path / str(model_filename + "_model.joblib"))
         file_exists = path_to_modelfile.is_file()
         if file_exists and not scanning:
-            logger.info("Found model at %s", dh.data_path / dh.model_filename)
+            logger.info("Found model at %s", dk.data_path / dk.model_filename)
         elif not scanning:
-            logger.info("Could not find model at %s", dh.data_path / dh.model_filename)
+            logger.info("Could not find model at %s", dk.data_path / dk.model_filename)
         return file_exists
     def set_full_path(self) -> None:
@@ -430,7 +433,7 @@ class IFreqaiModel(ABC):
         return dataframe[to_keep]
     def train_model_in_series(self, new_trained_timerange: TimeRange, pair: str,
-                              strategy: IStrategy, dh: FreqaiDataKitchen,
+                              strategy: IStrategy, dk: FreqaiDataKitchen,
                               data_load_timerange: TimeRange):
         """
         Retreive data and train model in single threaded mode (only used if model directory is empty
@@ -439,41 +442,43 @@ class IFreqaiModel(ABC):
         new_trained_timerange: TimeRange = the timerange to train the model on
         metadata: dict = strategy provided metadata
         strategy: IStrategy = user defined strategy object
-        dh: FreqaiDataKitchen = non-persistent data container for current coin/loop
+        dk: FreqaiDataKitchen = non-persistent data container for current coin/loop
         data_load_timerange: TimeRange = the amount of data to be loaded for populate_any_indicators
         (larger than new_trained_timerange so that new_trained_timerange does not contain any NaNs)
         """
-        corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange,
+        corr_dataframes, base_dataframes = dk.get_base_and_corr_dataframes(data_load_timerange,
                                                                            pair)
-        unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
+        unfiltered_dataframe = dk.use_strategy_to_populate_indicators(strategy,
                                                                       corr_dataframes,
                                                                       base_dataframes,
                                                                       pair)
-        unfiltered_dataframe = dh.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
+        unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
-        model = self.train(unfiltered_dataframe, pair, dh)
+        # find the features indicated by strategy and store in datakitchen
+        dk.find_features(unfiltered_dataframe)
-        self.data_drawer.pair_dict[pair][
-            'trained_timestamp'] = new_trained_timerange.stopts
-        dh.set_new_model_names(pair, new_trained_timerange)
-        self.data_drawer.pair_dict[pair]['first'] = False
-        if self.data_drawer.pair_dict[pair]['priority'] == 1 and self.scanning:
+        model = self.train(unfiltered_dataframe, pair, dk)
+        self.dd.pair_dict[pair]['trained_timestamp'] = new_trained_timerange.stopts
+        dk.set_new_model_names(pair, new_trained_timerange)
+        self.dd.pair_dict[pair]['first'] = False
+        if self.dd.pair_dict[pair]['priority'] == 1 and self.scanning:
             with self.lock:
-                self.data_drawer.pair_to_end_of_training_queue(pair)
-        dh.save_data(model, coin=pair, keras=self.keras)
+                self.dd.pair_to_end_of_training_queue(pair)
+        dk.save_data(model, coin=pair, keras_model=self.keras)
         if self.freqai_info.get('purge_old_models', False):
-            self.data_drawer.purge_old_models()
+            self.dd.purge_old_models()
         # self.retrain = False
     # Following methods which are overridden by user made prediction models.
     # See freqai/prediction_models/CatboostPredictionModlel.py for an example.
     @abstractmethod
-    def train(self, unfiltered_dataframe: DataFrame, pair: str, dh: FreqaiDataKitchen) -> Any:
+    def train(self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen) -> Any:
         """
         Filter the training data and train a model to it. Train makes heavy use of the datahandler
         for storing, saving, loading, and analyzing the data.
@@ -499,37 +504,36 @@ class IFreqaiModel(ABC):
     @abstractmethod
     def predict(self, dataframe: DataFrame,
-                dh: FreqaiDataKitchen, first: bool = True) -> Tuple[npt.ArrayLike, npt.ArrayLike]:
+                dk: FreqaiDataKitchen, first: bool = True) -> Tuple[DataFrame, npt.ArrayLike]:
         """
         Filter the prediction features data and predict with it.
         :param:
         unfiltered_dataframe: Full dataframe for the current backtest period.
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         :return:
         :predictions: np.array of predictions
         :do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
         data (NaNs) or felt uncertain about data (i.e. SVM and/or DI index)
         """
     @abstractmethod
-    def make_labels(self, dataframe: DataFrame, dh: FreqaiDataKitchen) -> DataFrame:
+    def make_labels(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
         """
         User defines the labels here (target values).
         :params:
         dataframe: DataFrame = the full dataframe for the present training period
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         """
         return
     @abstractmethod
-    def return_values(self, dataframe: DataFrame, dh: FreqaiDataKitchen) -> DataFrame:
+    def return_values(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
         """
         User defines the dataframe to be returned to strategy here.
         :params:
         dataframe: DataFrame = the full dataframe for the current prediction (live)
         or --timerange (backtesting)
-        dh: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
+        dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
         :returns:
         dataframe: DataFrame = dataframe filled with user defined data
         """