give user ability to analyze live trade dataframe inside custom prediction model. Add documentation to explain new functionality

This commit is contained in:
robcaulk 2022-08-02 20:14:02 +02:00
parent 895ebbfd18
commit 95d3009a95
4 changed files with 147 additions and 13 deletions

View File

@ -619,6 +619,46 @@ If the user sets this value, FreqAI will initially use the predictions from the
and then subsequently begin introducing real prediction data as it is generated. FreqAI will save and then subsequently begin introducing real prediction data as it is generated. FreqAI will save
this historical data to be reloaded if the user stops and restarts with the same `identifier`. this historical data to be reloaded if the user stops and restarts with the same `identifier`.
## Extra returns per train
Users may find that there are some important metrics that they'd like to return to the strategy at the end of each retrain.
Users can include these metrics by assigining them to `dk.data['extra_returns_per_train']['my_new_value'] = XYZ` inside their custom prediction
model class. FreqAI takes the `my_new_value` assigned in this dictionary and expands it to fit the return dataframe to the strategy.
The user can then use the value in the strategy with `dataframe['my_new_value']`. An example of how this is already used in FreqAI is
the `&*_mean` and `&*_std` values, which indicate the mean and standard deviation of that particular label during the most recent training.
Another example is shown below if the user wants to use live metrics from the trade databse.
The user needs to set the standard dictionary in the config so FreqAI can return proper dataframe shapes:
```json
"freqai": {
"extra_returns_per_train": {"total_profit": 4}
}
```
These values will likely be overridden by the user prediction model, but in the case where the user model has yet to set them, or needs
a default initial value - this is the value that will be returned.
## Analyzing the trade live database
Users can analyze the live trade database by calling `analyze_trade_database()` in their custom prediction model. FreqAI already has the
database setup in a pandas dataframe and ready to be analyzed. Here is an example usecase:
```python
def analyze_trade_database(self, dk: FreqaiDataKitchen, pair: str) -> None:
"""
User analyzes the trade database here and returns summary stats which will be passed back
to the strategy for reinforcement learning or for additional adaptive metrics for use
in entry/exit signals. Store these metrics in dk.data['extra_returns_per_train'] and
they will format themselves into the dataframe as an additional column in the user
strategy. User has access to the current trade database in dk.trade_database_df.
"""
total_profit = dk.trade_database_df['close_profit_abs'].sum()
dk.data['extra_returns_per_train']['total_profit'] = total_profit
return
```
<!-- ## Dynamic target expectation <!-- ## Dynamic target expectation
The labels used for model training have a unique statistical distribution for each separate model training. The labels used for model training have a unique statistical distribution for each separate model training.

View File

@ -39,7 +39,7 @@ class FreqaiDataDrawer:
Robert Caulk @robcaulk Robert Caulk @robcaulk
Theoretical brainstorming: Theoretical brainstorming:
Elin Törnquist @thorntwig Elin Törnquist @th0rntwig
Code review, software architecture brainstorming: Code review, software architecture brainstorming:
@xmatthias @xmatthias
@ -238,6 +238,11 @@ class FreqaiDataDrawer:
mrv_df["do_predict"] = do_preds mrv_df["do_predict"] = do_preds
if dk.data['extra_returns_per_train']:
rets = dk.data['extra_returns_per_train']
for return_str in rets:
mrv_df[return_str] = rets[return_str]
# for keras type models, the conv_window needs to be prepended so # for keras type models, the conv_window needs to be prepended so
# viewing is correct in frequi # viewing is correct in frequi
if self.freqai_info.get('keras', False): if self.freqai_info.get('keras', False):
@ -282,9 +287,15 @@ class FreqaiDataDrawer:
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0: if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
df["DI_values"].iloc[-1] = dk.DI_values[-1] df["DI_values"].iloc[-1] = dk.DI_values[-1]
if dk.data['extra_returns_per_train']:
rets = dk.data['extra_returns_per_train']
for return_str in rets:
df[return_str].iloc[-1] = rets[return_str]
# append the new predictions to persistent storage # append the new predictions to persistent storage
if pair in self.historic_predictions: if pair in self.historic_predictions:
self.historic_predictions[pair].iloc[-1] = df[label].iloc[-1] for key in df.keys():
self.historic_predictions[pair][key].iloc[-1] = df[key].iloc[-1]
if length_difference < 0: if length_difference < 0:
prepend_df = pd.DataFrame( prepend_df = pd.DataFrame(
@ -320,7 +331,12 @@ class FreqaiDataDrawer:
dataframe["do_predict"] = 0 dataframe["do_predict"] = 0
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0: if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
dataframe["DI_value"] = 0 dataframe["DI_values"] = 0
if dk.data['extra_returns_per_train']:
rets = dk.data['extra_returns_per_train']
for return_str in rets:
dataframe[return_str] = 0
dk.return_dataframe = dataframe dk.return_dataframe = dataframe

View File

@ -2,6 +2,7 @@ import copy
import datetime import datetime
import logging import logging
import shutil import shutil
import sqlite3
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
@ -39,7 +40,7 @@ class FreqaiDataKitchen:
Robert Caulk @robcaulk Robert Caulk @robcaulk
Theoretical brainstorming: Theoretical brainstorming:
Elin Törnquist @thorntwig Elin Törnquist @th0rntwig
Code review, software architecture brainstorming: Code review, software architecture brainstorming:
@xmatthias @xmatthias
@ -84,6 +85,12 @@ class FreqaiDataKitchen:
config["freqai"]["backtest_period_days"], config["freqai"]["backtest_period_days"],
) )
db_url = self.config.get('db_url', None)
self.database_path = '' if db_url == 'sqlite://' else str(db_url).split('///')[1]
self.trade_database_df: DataFrame = pd.DataFrame()
self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
def set_paths( def set_paths(
self, self,
pair: str, pair: str,
@ -101,7 +108,7 @@ class FreqaiDataKitchen:
self.data_path = Path( self.data_path = Path(
self.full_path self.full_path
/ str("sub-train" + "-" + pair.split("/")[0] + "_" + str(trained_timestamp)) / f"sub-train-{pair.split('/')[0]}_{trained_timestamp}"
) )
return return
@ -328,7 +335,7 @@ class FreqaiDataKitchen:
""" """
for label in self.label_list: for label in self.label_list:
if df[label].dtype == str: if df[label].dtype == object:
continue continue
df[label] = ( df[label] = (
(df[label] + 1) (df[label] + 1)
@ -493,7 +500,6 @@ class FreqaiDataKitchen:
tc = self.freqai_config.get("model_training_parameters", {}).get("thread_count", -1) tc = self.freqai_config.get("model_training_parameters", {}).get("thread_count", -1)
pairwise = pairwise_distances(self.data_dictionary["train_features"], n_jobs=tc) pairwise = pairwise_distances(self.data_dictionary["train_features"], n_jobs=tc)
avg_mean_dist = pairwise.mean(axis=1).mean() avg_mean_dist = pairwise.mean(axis=1).mean()
logger.info(f"avg_mean_dist {avg_mean_dist:.2f}")
return avg_mean_dist return avg_mean_dist
@ -599,10 +605,11 @@ class FreqaiDataKitchen:
from the training data set. from the training data set.
""" """
tc = self.freqai_config.get("model_training_parameters", {}).get("thread_count", -1)
distance = pairwise_distances( distance = pairwise_distances(
self.data_dictionary["train_features"], self.data_dictionary["train_features"],
self.data_dictionary["prediction_features"], self.data_dictionary["prediction_features"],
n_jobs=-1, n_jobs=tc,
) )
self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"] self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"]
@ -946,6 +953,19 @@ class FreqaiDataKitchen:
] ]
return dataframe[to_keep] return dataframe[to_keep]
def get_current_trade_database(self) -> None:
if self.database_path == '':
logger.warning('No trade databse found. Skipping analysis.')
return
data = sqlite3.connect(self.database_path)
query = data.execute("SELECT * From trades")
cols = [column[0] for column in query.description]
df = pd.DataFrame.from_records(data=query.fetchall(), columns=cols)
self.trade_database_df = df.dropna(subset='close_date')
data.close()
def np_encoder(self, object): def np_encoder(self, object):
if isinstance(object, np.generic): if isinstance(object, np.generic):
return object.item() return object.item()

View File

@ -1,5 +1,4 @@
# import contextlib # import contextlib
import copy
import datetime import datetime
import logging import logging
import shutil import shutil
@ -46,7 +45,7 @@ class IFreqaiModel(ABC):
Robert Caulk @robcaulk Robert Caulk @robcaulk
Theoretical brainstorming: Theoretical brainstorming:
Elin Törnquist @thorntwig Elin Törnquist @th0rntwig
Code review, software architecture brainstorming: Code review, software architecture brainstorming:
@xmatthias @xmatthias
@ -81,6 +80,8 @@ class IFreqaiModel(ABC):
self.CONV_WIDTH = self.freqai_info.get("conv_width", 2) self.CONV_WIDTH = self.freqai_info.get("conv_width", 2)
self.pair_it = 0 self.pair_it = 0
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist")) self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
self.last_trade_database_summary: DataFrame = {}
self.current_trade_database_summary: DataFrame = {}
def assert_config(self, config: Dict[str, Any]) -> None: def assert_config(self, config: Dict[str, Any]) -> None:
@ -479,6 +480,9 @@ class IFreqaiModel(ABC):
model = self.train(unfiltered_dataframe, pair, dk) model = self.train(unfiltered_dataframe, pair, dk)
dk.get_current_trade_database()
self.analyze_trade_database(dk, pair)
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
dk.set_new_model_names(pair, new_trained_timerange) dk.set_new_model_names(pair, new_trained_timerange)
self.dd.pair_dict[pair]["first"] = False self.dd.pair_dict[pair]["first"] = False
@ -493,13 +497,50 @@ class IFreqaiModel(ABC):
def set_initial_historic_predictions( def set_initial_historic_predictions(
self, df: DataFrame, model: Any, dk: FreqaiDataKitchen, pair: str self, df: DataFrame, model: Any, dk: FreqaiDataKitchen, pair: str
) -> None: ) -> None:
trained_predictions = model.predict(df) """
This function is called only if the datadrawer failed to load an
existing set of historic predictions. In this case, it builds
the structure and sets fake predictions off the first training
data. After that, FreqAI will append new real predictions to the
set of historic predictions.
These values are used to generate live statistics which can be used
in the strategy for adaptive values. E.g. &*_mean/std are quantities
that can computed based on live predictions from the set of historical
predictions. Those values can be used in the user strategy to better
assess prediction rarity, and thus wait for probabilistically favorable
entries relative to the live historical predictions.
If the user reuses an identifier on a subsequent instance,
this function will not be called. In that case, "real" predictions
will be appended to the loaded set of historic predictions.
:param: df: DataFrame = the dataframe containing the training feature data
:param: model: Any = A model which was `fit` using a common librariy such as
catboost or lightgbm
:param: dk: FreqaiDataKitchen = object containing methods for data analysis
:param: pair: str = current pair
"""
num_candles = self.freqai_info.get('fit_live_predictions_candles', 600)
df_tail = df.tail(num_candles)
trained_predictions = model.predict(df_tail)
pred_df = DataFrame(trained_predictions, columns=dk.label_list) pred_df = DataFrame(trained_predictions, columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df) pred_df = dk.denormalize_labels_from_metadata(pred_df)
self.dd.historic_predictions[pair] = pd.DataFrame() self.dd.historic_predictions[pair] = pred_df
self.dd.historic_predictions[pair] = copy.deepcopy(pred_df) hist_preds_df = self.dd.historic_predictions[pair]
hist_preds_df['do_predict'] = 0
if self.freqai_info['feature_parameters'].get('DI_threshold', 0) > 0:
hist_preds_df['DI_values'] = 0
for label in dk.data['labels_mean']:
hist_preds_df[f'{label}_mean'] = 0
hist_preds_df[f'{label}_std'] = 0
for return_str in dk.data['extra_returns_per_train']:
hist_preds_df[return_str] = 0
def fit_live_predictions(self, dk: FreqaiDataKitchen) -> None: def fit_live_predictions(self, dk: FreqaiDataKitchen) -> None:
""" """
@ -565,3 +606,20 @@ class IFreqaiModel(ABC):
""" """
return return
def analyze_trade_database(self, dk: FreqaiDataKitchen, pair: str) -> None:
"""
User analyzes the trade database here and returns summary stats which will be passed back
to the strategy for reinforcement learning or for additional adaptive metrics for use
in entry/exit signals. Store these metrics in dk.data['extra_returns_per_train'] and
they will format themselves into the dataframe as an additional column in the user
strategy. User has access to the current trade database in dk.trade_database_df.
"""
if dk.trade_database_df.empty:
logger.warning(f'No trades found for {pair} to analyze DB')
return
total_profit = dk.trade_database_df['close_profit_abs'].sum()
dk.data['extra_returns_per_train']['total_profit'] = total_profit
return