add freqao backend machinery, user interface, documentation

This commit is contained in:
robcaulk
2022-05-03 10:14:17 +02:00
parent ebab02fce3
commit fc837c4daa
19 changed files with 1405 additions and 3 deletions

View File

@@ -19,6 +19,7 @@ from freqtrade.commands.list_commands import (start_list_exchanges, start_list_m
start_show_trades)
from freqtrade.commands.optimize_commands import (start_backtesting, start_backtesting_show,
start_edge, start_hyperopt)
from freqtrade.commands.freqai_commands import (start_training)
from freqtrade.commands.pairlist_commands import start_test_pairlist
from freqtrade.commands.plot_commands import start_plot_dataframe, start_plot_profit
from freqtrade.commands.trade_commands import start_trading

View File

@@ -12,7 +12,7 @@ from freqtrade.constants import DEFAULT_CONFIG
ARGS_COMMON = ["verbosity", "logfile", "version", "config", "datadir", "user_data_dir"]
ARGS_STRATEGY = ["strategy", "strategy_path", "recursive_strategy_search"]
ARGS_STRATEGY = ["strategy", "strategy_path", "recursive_strategy_search", "freqaimodel", "freqaimodel_path"]
ARGS_TRADE = ["db_url", "sd_notify", "dry_run", "dry_run_wallet", "fee"]
@@ -190,7 +190,8 @@ class Arguments:
start_list_markets, start_list_strategies,
start_list_timeframes, start_new_config, start_new_strategy,
start_plot_dataframe, start_plot_profit, start_show_trades,
start_test_pairlist, start_trading, start_webserver)
start_test_pairlist, start_trading, start_webserver,
start_training)
subparsers = self.parser.add_subparsers(dest='command',
# Use custom message when no subhandler is added

View File

@@ -614,4 +614,16 @@ AVAILABLE_CLI_OPTIONS = {
"that do not contain any parameters."),
action="store_true",
),
"freqaimodel": Arg(
'--freqaimodel',
help='Specify a custom freqaimodels.',
metavar='NAME',
),
"freqaimodel_path": Arg(
'--freqaimodel-path',
help='Specify additional lookup path for freqaimodels.',
metavar='PATH',
),
}

View File

@@ -0,0 +1,24 @@
import logging
from typing import Any, Dict
from freqtrade import constants
from freqtrade.configuration import setup_utils_configuration
from freqtrade.enums import RunMode
from freqtrade.exceptions import OperationalException
from freqtrade.misc import round_coin_value
logger = logging.getLogger(__name__)
def start_training(args: Dict[str, Any]) -> None:
"""
Train a model for predicting signals
:param args: Cli args from Arguments()
:return: None
"""
from freqtrade.freqai.training import Training
config = setup_utils_configuration(args, RunMode.FREQAI)
training = Training(config)
training.start()

View File

@@ -95,6 +95,8 @@ class Configuration:
self._process_data_options(config)
self._process_freqai_options(config)
# Check if the exchange set by the user is supported
check_exchange(config, config.get('experimental', {}).get('block_bad_exchanges', True))
@@ -446,6 +448,16 @@ class Configuration:
config.update({'runmode': self.runmode})
def _process_freqai_options(self, config: Dict[str, Any]) -> None:
self._args_to_config(config, argname='freqaimodel',
logstring='Using freqaimodel class name: {}')
self._args_to_config(config, argname='freqaimodel_path',
logstring='Using freqaimodel path: {}')
return
def _args_to_config(self, config: Dict[str, Any], argname: str,
logstring: str, logfun: Optional[Callable] = None,
deprecated_msg: Optional[str] = None) -> None:

View File

@@ -55,6 +55,7 @@ FTHYPT_FILEVERSION = 'fthypt_fileversion'
USERPATH_HYPEROPTS = 'hyperopts'
USERPATH_STRATEGIES = 'strategies'
USERPATH_NOTEBOOKS = 'notebooks'
USERPATH_FREQAIMODELS = 'freqaimodels'
TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']

View File

@@ -15,9 +15,10 @@ class RunMode(Enum):
UTIL_NO_EXCHANGE = "util_no_exchange"
PLOT = "plot"
WEBSERVER = "webserver"
FREQAI = "freqai"
OTHER = "other"
TRADING_MODES = [RunMode.LIVE, RunMode.DRY_RUN]
OPTIMIZE_MODES = [RunMode.BACKTEST, RunMode.EDGE, RunMode.HYPEROPT]
OPTIMIZE_MODES = [RunMode.BACKTEST, RunMode.EDGE, RunMode.HYPEROPT, RunMode.FREQAI]
NON_UTIL_MODES = TRADING_MODES + OPTIMIZE_MODES

View File

@@ -0,0 +1,434 @@
import json
import os
import copy
import numpy as np
import pandas as pd
from pandas import DataFrame
from joblib import dump
from joblib import load
from sklearn.model_selection import train_test_split
from sklearn.metrics.pairwise import pairwise_distances
import datetime
from typing import Any, Dict, List, Tuple
import pickle as pk
from freqtrade.configuration import TimeRange
SECONDS_IN_DAY = 86400
class DataHandler:
"""
Class designed to handle all the data for the IFreqaiModel class model.
Functionalities include holding, saving, loading, and analyzing the data.
"""
def __init__(self, config: Dict[str, Any], dataframe: DataFrame, data: List):
self.full_dataframe = dataframe
(self.training_timeranges,
self.backtesting_timeranges) = self.split_timerange(
config['freqai']['full_timerange'],
config['freqai']['train_period'],
config['freqai']['backtest_period'])
self.data = data
self.data_dictionary = {}
self.config = config
self.freq_config = config['freqai']
def save_data(self, model: Any) -> None:
"""
Saves all data associated with a model for a single sub-train time range
:params:
:model: User trained model which can be reused for inferencing to generate
predictions
"""
if not os.path.exists(self.model_path): os.mkdir(self.model_path)
save_path = self.model_path + self.model_filename
# Save the trained model
dump(model, save_path+"_model.joblib")
self.data['model_path'] = self.model_path
self.data['model_filename'] = self.model_filename
self.data['training_features_list'] = list(self.data_dictionary['train_features'].columns)
# store the metadata
with open(save_path+"_metadata.json", 'w') as fp:
json.dump(self.data, fp, default=self.np_encoder)
# save the train data to file so we can check preds for area of applicability later
self.data_dictionary['train_features'].to_pickle(save_path+"_trained_df.pkl")
return
def load_data(self) -> Any:
"""
loads all data required to make a prediction on a sub-train time range
:returns:
:model: User trained model which can be inferenced for new predictions
"""
model = load(self.model_path+self.model_filename+"_model.joblib")
with open(self.model_path+self.model_filename+"_metadata.json", 'r') as fp:
self.data = json.load(fp)
if self.data.get('training_features_list'):
self.training_features_list = [*self.data.get('training_features_list')]
self.data_dictionary['train_features'] = pd.read_pickle(self.model_path+
self.model_filename+"_trained_df.pkl")
self.model_path = self.data['model_path']
self.model_filename = self.data['model_filename']
if self.config['freqai']['feature_parameters']['principal_component_analysis']:
self.pca = pk.load(open(self.model_path+self.model_filename+"_pca_object.pkl","rb"))
return model
def make_train_test_datasets(self, filtered_dataframe: DataFrame, labels: DataFrame) -> None:
'''
Given the dataframe for the full history for training, split the data into
training and test data according to user specified parameters in configuration
file.
:filtered_dataframe: cleaned dataframe ready to be split.
:labels: cleaned labels ready to be split.
'''
if self.config['freqai']['feature_parameters']['weight_factor'] > 0:
weights = self.set_weights_higher_recent(len(filtered_dataframe))
else: weights = np.ones(len(filtered_dataframe))
(train_features, test_features, train_labels,
test_labels, train_weights, test_weights) = train_test_split(
filtered_dataframe[:filtered_dataframe.shape[0]],
labels,
weights,
**self.config['freqai']['data_split_parameters']
)
return self.build_data_dictionary(
train_features,test_features,
train_labels,test_labels,
train_weights,test_weights)
def filter_features(self, unfiltered_dataframe: DataFrame, training_feature_list: List,
labels: DataFrame = None, training_filter: bool=True) -> Tuple[DataFrame, DataFrame]:
'''
Filter the unfiltered dataframe to extract the user requested features and properly
remove all NaNs. Any row with a NaN is removed from training dataset or replaced with
0s in the prediction dataset. However, prediction dataset do_predict will reflect any
row that had a NaN and will shield user from that prediction.
:params:
:unfiltered_dataframe: the full dataframe for the present training period
:training_feature_list: list, the training feature list constructed by self.build_feature_list()
according to user specified parameters in the configuration file.
:labels: the labels for the dataset
:training_filter: boolean which lets the function know if it is training data or
prediction data to be filtered.
:returns:
:filtered_dataframe: dataframe cleaned of NaNs and only containing the user
requested feature set.
:labels: labels cleaned of NaNs.
'''
filtered_dataframe = unfiltered_dataframe.filter(training_feature_list, axis=1)
drop_index = pd.isnull(filtered_dataframe).any(1) # get the rows that have NaNs,
if training_filter: # we don't care about total row number (total no. datapoints) in training, we only care about removing any row with NaNs
drop_index_labels = pd.isnull(labels)
filtered_dataframe = filtered_dataframe[(drop_index==False) & (drop_index_labels==False)] # dropping values
labels = labels[(drop_index==False) & (drop_index_labels==False)] # assuming the labels depend entirely on the dataframe here.
print('dropped',len(unfiltered_dataframe)-len(filtered_dataframe),
'training data points due to NaNs, ensure you have downloaded all historical training data')
self.data['filter_drop_index_training'] = drop_index
else: # we are backtesting so we need to preserve row number to send back to strategy, so now we use do_predict to avoid any prediction based on a NaN
drop_index = pd.isnull(filtered_dataframe).any(1)
self.data['filter_drop_index_prediction'] = drop_index
filtered_dataframe.fillna(0, inplace=True) # replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction that was based on a single NaN is ultimately protected from buys with do_predict
drop_index = ~drop_index
self.do_predict = np.array(drop_index.replace(True,1).replace(False,0))
print('dropped',len(self.do_predict) - self.do_predict.sum(),'of',len(filtered_dataframe),
'prediction data points due to NaNs. These are protected from prediction with do_predict vector returned to strategy.')
return filtered_dataframe, labels
def build_data_dictionary(self, train_df: DataFrame, test_df: DataFrame,
train_labels: DataFrame, test_labels: DataFrame,
train_weights: Any, test_weights: Any) -> Dict:
self.data_dictionary = {'train_features': train_df,
'test_features': test_df,
'train_labels': train_labels,
'test_labels': test_labels,
'train_weights': train_weights,
'test_weights': test_weights}
return self.data_dictionary
def standardize_data(self, data_dictionary: Dict) -> None:
'''
Standardize all data in the data_dictionary according to the training dataset
:params:
:data_dictionary: dictionary containing the cleaned and split training/test data/labels
:returns:
:data_dictionary: updated dictionary with standardized values.
'''
# standardize the data by training stats
train_mean = data_dictionary['train_features'].mean()
train_std = data_dictionary['train_features'].std()
data_dictionary['train_features'] = (data_dictionary['train_features'] - train_mean) / train_std
data_dictionary['test_features'] = (data_dictionary['test_features'] - train_mean) / train_std
train_labels_std = data_dictionary['train_labels'].std()
train_labels_mean = data_dictionary['train_labels'].mean()
data_dictionary['train_labels'] = (data_dictionary['train_labels'] - train_labels_mean) / train_labels_std
data_dictionary['test_labels'] = (data_dictionary['test_labels'] - train_labels_mean) / train_labels_std
for item in train_std.keys():
self.data[item+'_std'] = train_std[item]
self.data[item+'_mean'] = train_mean[item]
self.data['labels_std'] = train_labels_std
self.data['labels_mean'] = train_labels_mean
return data_dictionary
def standardize_data_from_metadata(self, df: DataFrame) -> DataFrame:
'''
Standardizes a set of data using the mean and standard deviation from
the associated training data.
:params:
:df: Dataframe to be standardized
'''
for item in df.keys():
df[item] = (df[item] - self.data[item+'_mean']) / self.data[item+'_std']
return df
def split_timerange(self, tr: Dict, train_split: int=28, bt_split: int=7) -> list:
'''
Function which takes a single time range (tr) and splits it
into sub timeranges to train and backtest on based on user input
tr: str, full timerange to train on
train_split: the period length for the each training (days). Specified in user
configuration file
bt_split: the backtesting length (dats). Specified in user configuration file
'''
train_period = train_split * SECONDS_IN_DAY
bt_period = bt_split * SECONDS_IN_DAY
full_timerange = TimeRange.parse_timerange(tr)
timerange_train = copy.deepcopy(full_timerange)
timerange_backtest = copy.deepcopy(full_timerange)
tr_training_list = []
tr_backtesting_list = []
first = True
while True:
if not first: timerange_train.startts = timerange_train.startts + bt_period
timerange_train.stopts = timerange_train.startts + train_period
# if a full training period doesnt fit, we stop
if timerange_train.stopts > full_timerange.stopts: break
first = False
start = datetime.datetime.utcfromtimestamp(timerange_train.startts)
stop = datetime.datetime.utcfromtimestamp(timerange_train.stopts)
tr_training_list.append(start.strftime("%Y%m%d")+'-'+stop.strftime("%Y%m%d"))
## associated backtest period
timerange_backtest.startts = timerange_train.stopts
timerange_backtest.stopts = timerange_backtest.startts + bt_period
start = datetime.datetime.utcfromtimestamp(timerange_backtest.startts)
stop = datetime.datetime.utcfromtimestamp(timerange_backtest.stopts)
tr_backtesting_list.append(start.strftime("%Y%m%d")+'-'+stop.strftime("%Y%m%d"))
return tr_training_list, tr_backtesting_list
def slice_dataframe(self, tr: str, df: DataFrame) -> DataFrame:
"""
Given a full dataframe, extract the user desired window
:params:
:tr: timerange string that we wish to extract from df
:df: Dataframe containing all candles to run the entire backtest. Here
it is sliced down to just the present training period.
"""
timerange = TimeRange.parse_timerange(tr)
start = datetime.datetime.fromtimestamp(timerange.startts, tz=datetime.timezone.utc)
stop = datetime.datetime.fromtimestamp(timerange.stopts, tz=datetime.timezone.utc)
df = df.loc[df['date'] >= start, :]
df = df.loc[df['date'] <= stop, :]
return df
def principal_component_analysis(self) -> None:
"""
Performs Principal Component Analysis on the data for dimensionality reduction
and outlier detection (see self.remove_outliers())
No parameters or returns, it acts on the data_dictionary held by the DataHandler.
"""
from sklearn.decomposition import PCA # avoid importing if we dont need it
n_components = self.data_dictionary['train_features'].shape[1]
pca = PCA(n_components=n_components)
pca = pca.fit(self.data_dictionary['train_features'])
n_keep_components = np.argmin(pca.explained_variance_ratio_.cumsum() < 0.999)
pca2 = PCA(n_components=n_keep_components)
self.data['n_kept_components'] = n_keep_components
pca2 = pca2.fit(self.data_dictionary['train_features'])
print('reduced feature dimension by',n_components-n_keep_components)
print("explained variance",np.sum(pca2.explained_variance_ratio_))
train_components = pca2.transform(self.data_dictionary['train_features'])
test_components = pca2.transform(self.data_dictionary['test_features'])
self.data_dictionary['train_features'] = pd.DataFrame(data=train_components,
columns = ['PC'+str(i) for i in range(0,n_keep_components)],
index = self.data_dictionary['train_features'].index)
self.data_dictionary['test_features'] = pd.DataFrame(data=test_components,
columns = ['PC'+str(i) for i in range(0,n_keep_components)],
index = self.data_dictionary['test_features'].index)
self.data['n_kept_components'] = n_keep_components
self.pca = pca2
if not os.path.exists(self.model_path): os.mkdir(self.model_path)
pk.dump(pca2, open(self.model_path + self.model_filename+"_pca_object.pkl","wb"))
return None
def compute_distances(self) -> float:
print('computing average mean distance for all training points')
pairwise = pairwise_distances(self.data_dictionary['train_features'],n_jobs=-1)
avg_mean_dist = pairwise.mean(axis=1).mean()
print('avg_mean_dist',avg_mean_dist)
return avg_mean_dist
def remove_outliers(self,predict: bool) -> None:
"""
Remove data that looks like an outlier based on the distribution of each
variable.
:params:
:predict: boolean which tells the function if this is prediction data or
training data coming in.
"""
lower_quantile = self.data_dictionary['train_features'].quantile(0.001)
upper_quantile = self.data_dictionary['train_features'].quantile(0.999)
if predict:
df = self.data_dictionary['prediction_features'][(self.data_dictionary['prediction_features']<upper_quantile) & (self.data_dictionary['prediction_features']>lower_quantile)]
drop_index = pd.isnull(df).any(1)
self.data_dictionary['prediction_features'].fillna(0,inplace=True)
drop_index = ~drop_index
do_predict = np.array(drop_index.replace(True,1).replace(False,0))
print('remove_outliers() tossed',len(do_predict)-do_predict.sum(),'predictions because they were beyond 3 std deviations from training data.')
self.do_predict += do_predict
self.do_predict -= 1
else:
filter_train_df = self.data_dictionary['train_features'][(self.data_dictionary['train_features']<upper_quantile) & (self.data_dictionary['train_features']>lower_quantile)]
drop_index = pd.isnull(filter_train_df).any(1)
self.data_dictionary['train_features'] = self.data_dictionary['train_features'][(drop_index==False)]
self.data_dictionary['train_labels'] = self.data_dictionary['train_labels'][(drop_index==False)]
self.data_dictionary['train_weights'] = self.data_dictionary['train_weights'][(drop_index==False)]
# do the same for the test data
filter_test_df = self.data_dictionary['test_features'][(self.data_dictionary['test_features']<upper_quantile) & (self.data_dictionary['test_features']>lower_quantile)]
drop_index = pd.isnull(filter_test_df).any(1)
#pdb.set_trace()
self.data_dictionary['test_labels'] = self.data_dictionary['test_labels'][(drop_index==False)]
self.data_dictionary['test_features'] = self.data_dictionary['test_features'][(drop_index==False)]
self.data_dictionary['test_weights'] = self.data_dictionary['test_weights'][(drop_index==False)]
return
def build_feature_list(self, config: dict) -> int:
"""
Build the list of features that will be used to filter
the full dataframe. Feature list is construced from the
user configuration file.
:params:
:config: Canonical freqtrade config file containing all
user defined input in config['freqai] dictionary.
"""
features = []
for tf in config['freqai']['timeframes']:
for ft in config['freqai']['base_features']:
for n in range(config['freqai']['feature_parameters']['shift']+1):
shift=''
if n>0: shift = '_shift-'+str(n)
features.append(ft+shift+'_'+tf)
for p in config['freqai']['corr_pairlist']:
features.append(p.split("/")[0]+'-'+ft+shift+'_'+tf)
print('number of features',len(features))
return features
def check_if_pred_in_training_spaces(self) -> None:
"""
Compares the distance from each prediction point to each training data
point. It uses this information to estimate a Dissimilarity Index (DI)
and avoid making predictions on any points that are too far away
from the training data set.
"""
print('checking if prediction features are in AOA')
distance = pairwise_distances(self.data_dictionary['train_features'],
self.data_dictionary['prediction_features'],n_jobs=-1)
do_predict = np.where(distance.min(axis=0) /
self.data['avg_mean_dist'] < self.config['freqai']['feature_parameters']['DI_threshold'],1,0)
print('Distance checker tossed',len(do_predict)-do_predict.sum(),
'predictions for being too far from training data')
self.do_predict += do_predict
self.do_predict -= 1
def set_weights_higher_recent(self, num_weights: int) -> int:
"""
Set weights so that recent data is more heavily weighted during
training than older data.
"""
weights = np.zeros(num_weights)
for i in range(1, len(weights)):
weights[len(weights) - i] = np.exp(-i/
(self.config['freqai']['feature_parameters']['weight_factor']*num_weights))
return weights
def append_predictions(self, predictions, do_predict, len_dataframe):
"""
Append backtest prediction from current backtest period to all previous periods
"""
ones = np.ones(len_dataframe)
s_mean, s_std = ones*self.data['s_mean'], ones*self.data['s_std']
self.predictions = np.append(self.predictions,predictions)
self.do_predict = np.append(self.do_predict,do_predict)
self.target_mean = np.append(self.target_mean,s_mean)
self.target_std = np.append(self.target_std,s_std)
return
def fill_predictions(self, len_dataframe):
"""
Back fill values to before the backtesting range so that the dataframe matches size
when it goes back to the strategy. These rows are not included in the backtest.
"""
filler = np.zeros(len_dataframe -len(self.predictions)) # startup_candle_count
self.predictions = np.append(filler,self.predictions)
self.do_predict = np.append(filler,self.do_predict)
self.target_mean = np.append(filler,self.target_mean)
self.target_std = np.append(filler,self.target_std)
return
def np_encoder(self, object):
if isinstance(object, np.generic):
return object.item()

View File

@@ -0,0 +1,158 @@
import os
import numpy as np
import pandas as pd
from pandas import DataFrame
import shutil
import gc
from typing import Any, Dict, Optional, Tuple
from abc import ABC
from freqtrade.freqai.data_handler import DataHandler
pd.options.mode.chained_assignment = None
class IFreqaiModel(ABC):
"""
Class containing all tools for training and prediction in the strategy.
User models should inherit from this class as shown in
templates/ExamplePredictionModel.py where the user overrides
train(), predict(), fit(), and make_labels().
"""
def __init__(self, config: Dict[str, Any]) -> None:
self.config = config
self.freqai_info = config['freqai']
self.data_split_parameters = config['freqai']['data_split_parameters']
self.model_training_parameters = config['freqai']['model_training_parameters']
self.feature_parameters = config['freqai']['feature_parameters']
self.full_path = (str(config['user_data_dir'])+
"/models/"+self.freqai_info['full_timerange']+
'-'+self.freqai_info['identifier'])
self.metadata = {}
self.data = {}
self.time_last_trained = None
self.current_time = None
self.model = None
self.predictions = None
if not os.path.exists(self.full_path):
os.mkdir(self.full_path)
shutil.copy(self.config['config_files'][0],self.full_path+"/"+self.config['config_files'][0])
def start(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""
Entry point to the FreqaiModel, it will train a new model if
necesssary before making the prediction.
The backtesting and training paradigm is a sliding training window
with a following backtest window. Both windows slide according to the
length of the backtest window. This function is not intended to be
overridden by children of IFreqaiModel, but technically, it can be
if the user wishes to make deeper changes to the sliding window
logic.
:params:
:dataframe: Full dataframe coming from strategy - it contains entire
backtesting timerange + additional historical data necessary to train
the model.
:metadata: pair metadataa coming from strategy.
"""
self.pair = metadata['pair']
self.dh = DataHandler(self.config, dataframe, self.data)
print('going to train',len(self.dh.training_timeranges),
'timeranges:',self.dh.training_timeranges)
predictions = np.array([])
do_predict = np.array([])
target_mean = np.array([])
target_std = np.array([])
# Loop enforcing the sliding window training/backtesting paragigm
# tr_train is the training time range e.g. 1 historical month
# tr_backtest is the backtesting time range e.g. the week directly
# following tr_train. Both of these windows slide through the
# entire backtest
for tr_train, tr_backtest in zip(self.dh.training_timeranges,
self.dh.backtesting_timeranges):
gc.collect()
#self.config['timerange'] = tr_train
self.dh.data = {} # clean the pair specific data between models
self.freqai_info['training_timerange'] = tr_train
dataframe_train = self.dh.slice_dataframe(tr_train, dataframe)
dataframe_backtest = self.dh.slice_dataframe(tr_backtest, dataframe)
print("training",self.pair,"for",tr_train)
self.dh.model_path = self.full_path+"/"+ 'sub-train'+'-'+str(tr_train)+'/'
if not self.model_exists(self.pair, training_timerange=tr_train):
self.model = self.train(dataframe_train, metadata)
self.dh.save_data(self.model)
else:
self.model = self.dh.load_data(self.dh.model_path)
preds, do_preds = self.predict(dataframe_backtest)
self.dh.append_predictions(preds,do_preds,len(dataframe_backtest))
self.dh.fill_predictions(len(dataframe))
return self.dh.predictions, self.dh.do_predict, self.dh.target_mean, self.dh.target_std
def make_labels(self, dataframe: DataFrame) -> DataFrame:
"""
User defines the labels here (target values).
:params:
:dataframe: the full dataframe for the present training period
"""
return dataframe
def train(self, unfiltered_dataframe: DataFrame, metadata: dict) -> Tuple[DataFrame, DataFrame]:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahandler
for storing, saving, loading, and managed.
:params:
:unfiltered_dataframe: Full dataframe for the current training period
:metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
return unfiltered_dataframe, unfiltered_dataframe
def fit(self) -> Any:
"""
Most regressors use the same function names and arguments e.g. user
can drop in LGBMRegressor in place of CatBoostRegressor and all data
management will be properly handled by Freqai.
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
return None
def predict(self) -> Optional[Tuple[DataFrame, DataFrame]]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:predictions: np.array of predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
return None
def model_exists(self, pair: str, training_timerange: str = None) -> bool:
"""
Given a pair and path, check if a model already exists
:param pair: pair e.g. BTC/USD
:param path: path to model
"""
coin,_ = pair.split('/')
self.dh.model_filename = f"cb_"+coin.lower()+"_"+self.freqai_info['trained_stake']+"_"+training_timerange
file_exists = os.path.isfile(self.dh.model_path+
self.dh.model_filename+"_model.joblib")
if file_exists:
print("Found model at", self.dh.model_path+self.dh.model_filename)
else: print("Could not find model at",
self.dh.model_path+self.dh.model_filename)
return file_exists

View File

@@ -0,0 +1,12 @@
from freqtrade.resolvers.freqaimodel_resolver import FreqaiModelResolver
class CustomModel:
"""
A bridge between the user defined IFreqaiModel class
and the strategy.
"""
def __init__(self,config):
self.bridge = FreqaiModelResolver.load_freqaimodel(config)

View File

@@ -204,6 +204,12 @@ class Backtesting:
"""
self.progress.init_step(BacktestState.DATALOAD, 1)
if self.config['freqaimodel']:
self.required_startup += int((self.config['freqai']['train_period']*86400) /
timeframe_to_seconds(self.config['timeframe']))
self.config['startup_candle_count'] = self.required_startup
data = history.load_data(
datadir=self.config['datadir'],
pairs=self.pairlists.whitelist,

View File

@@ -0,0 +1,45 @@
# pragma pylint: disable=attribute-defined-outside-init
"""
This module load a custom model for freqai
"""
import logging
from pathlib import Path
from typing import Dict
from freqtrade.constants import USERPATH_FREQAIMODELS
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.resolvers import IResolver
logger = logging.getLogger(__name__)
class FreqaiModelResolver(IResolver):
"""
This class contains all the logic to load custom hyperopt loss class
"""
object_type = IFreqaiModel
object_type_str = "FreqaiModel"
user_subdir = USERPATH_FREQAIMODELS
initial_search_path = Path(__file__).parent.parent.joinpath('optimize').resolve()
@staticmethod
def load_freqaimodel(config: Dict) -> IFreqaiModel:
"""
Load the custom class from config parameter
:param config: configuration dictionary
"""
freqaimodel_name = config.get('freqaimodel')
if not freqaimodel_name:
raise OperationalException(
"No freqaimodel set. Please use `--freqaimodel` to "
"specify the FreqaiModel class to use.\n"
)
freqaimodel = FreqaiModelResolver.load_object(freqaimodel_name,
config, kwargs={'config': config},
extra_dir=config.get('freqaimodel_path'))
return freqaimodel

View File

@@ -0,0 +1,139 @@
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor, Pool
from pandas import DataFrame
from typing import Any, Dict, Tuple
from freqtrade.freqai.freqai_interface import IFreqaiModel
class ExamplePredictionModel(IFreqaiModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which
has its own DataHandler where data is held, saved, loaded, and managed.
"""
def make_labels(self, dataframe: DataFrame) -> DataFrame:
"""
User defines the labels here (target values).
:params:
:dataframe: the full dataframe for the present training period
"""
dataframe['s'] = (dataframe['close'].shift(-self.feature_parameters['period']).rolling(
self.feature_parameters['period']).max() / dataframe['close'] - 1)
self.dh.data['s_mean'] = dataframe['s'].mean()
self.dh.data['s_std'] = dataframe['s'].std()
print('label mean',self.dh.data['s_mean'],'label std',self.dh.data['s_std'])
return dataframe['s']
def train(self, unfiltered_dataframe: DataFrame, metadata: dict) -> Tuple[DataFrame, DataFrame]:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahandler
for storing, saving, loading, and managed.
:params:
:unfiltered_dataframe: Full dataframe for the current training period
:metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
print("--------------------Starting training--------------------")
# create the full feature list based on user config info
self.dh.training_features_list = self.dh.build_feature_list(self.config)
unfiltered_labels = self.make_labels(unfiltered_dataframe)
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = self.dh.filter_features(unfiltered_dataframe,
self.dh.training_features_list, unfiltered_labels, training_filter=True)
# split data into train/test data.
data_dictionary = self.dh.make_train_test_datasets(features_filtered, labels_filtered)
# standardize all data based on train_dataset only
data_dictionary = self.dh.standardize_data(data_dictionary)
# optional additional data cleaning
if self.feature_parameters['principal_component_analysis']:
self.dh.principal_component_analysis()
if self.feature_parameters["remove_outliers"]:
self.dh.remove_outliers(predict=False)
if self.feature_parameters['DI_threshold']:
self.dh.data['avg_mean_dist'] = self.dh.compute_distances()
print("length of train data", len(data_dictionary['train_features']))
model = self.fit(data_dictionary)
print('Finished training')
print(f'--------------------done training {metadata["pair"]}--------------------')
return model
def fit(self, data_dictionary: Dict) -> Any:
"""
Most regressors use the same function names and arguments e.g. user
can drop in LGBMRegressor in place of CatBoostRegressor and all data
management will be properly handled by Freqai.
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
train_data = Pool(
data=data_dictionary['train_features'],
label=data_dictionary['train_labels'],
weight=data_dictionary['train_weights']
)
test_data = Pool(
data=data_dictionary['test_features'],
label=data_dictionary['test_labels'],
weight=data_dictionary['test_weights']
)
model = CatBoostRegressor(verbose=100, early_stopping_rounds=400,
**self.model_training_parameters)
model.fit(X=train_data, eval_set=test_data)
return model
def predict(self, unfiltered_dataframe: DataFrame) -> Tuple[DataFrame, DataFrame]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:predictions: np.array of predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
print("--------------------Starting prediction--------------------")
original_feature_list = self.dh.build_feature_list(self.config)
filtered_dataframe, _ = self.dh.filter_features(unfiltered_dataframe, original_feature_list, training_filter=False)
filtered_dataframe = self.dh.standardize_data_from_metadata(filtered_dataframe)
self.dh.data_dictionary['prediction_features'] = filtered_dataframe
# optional additional data cleaning
if self.feature_parameters['principal_component_analysis']:
pca_components = self.dh.pca.transform(filtered_dataframe)
self.dh.data_dictionary['prediction_features'] = pd.DataFrame(data=pca_components,
columns = ['PC'+str(i) for i in range(0,self.dh.data['n_kept_components'])],
index = filtered_dataframe.index)
if self.feature_parameters["remove_outliers"]:
self.dh.remove_outliers(predict=True) # creates dropped index
if self.feature_parameters['DI_threshold']:
self.dh.check_if_pred_in_training_spaces() # sets do_predict
predictions = self.model.predict(self.dh.data_dictionary['prediction_features'])
# compute the non-standardized predictions
predictions = predictions * self.dh.data['labels_std'] + self.dh.data['labels_mean']
print("--------------------Finished prediction--------------------")
return (predictions, self.dh.do_predict)

View File

@@ -0,0 +1,179 @@
import logging
import talib.abstract as ta
from pandas import DataFrame
import pandas as pd
from technical import qtpylib
import numpy as np
from freqtrade.strategy import (merge_informative_pair)
from freqtrade.strategy.interface import IStrategy
from freqtrade.freqai.strategy_bridge import CustomModel
from functools import reduce
logger = logging.getLogger(__name__)
class FreqaiExampleStrategy(IStrategy):
"""
Example strategy showing how the user connects their own
IFreqaiModel to the strategy. Namely, the user uses:
self.model = CustomModel(self.config)
self.model.bridge.start(dataframe, metadata)
to make predictions on their data. populate_any_indicators() automatically
generates the variety of features indicated by the user in the
canonical freqtrade configuration file under config['freqai'].
"""
minimal_roi = {
"0": 0.01,
"240": -1
}
plot_config = {
'main_plot': {
},
'subplots': {
"prediction":{
'prediction':{'color':'blue'}
},
"target_roi":{
'target_roi':{'color':'brown'},
},
"do_predict":{
'do_predict':{'color':'brown'},
},
}
}
stoploss = -0.05
use_sell_signal = True
startup_candle_count: int = 1000
def informative_pairs(self):
pairs = self.freqai_info['corr_pairlist']
informative_pairs = []
for tf in self.timeframes:
informative_pairs.append([(pair, tf) for pair in pairs])
return informative_pairs
def populate_any_indicators(self, pair, df, tf, informative=None,coin=''):
"""
Function designed to automatically generate, name and merge features
from user indicated timeframes in the configuration file. User can add
additional features here, but must follow the naming convention.
:params:
:pair: pair to be used as informative
:df: strategy dataframe which will receive merges from informatives
:tf: timeframe of the dataframe which will modify the feature names
:informative: the dataframe associated with the informative pair
:coin: the name of the coin which will modify the feature names.
"""
if informative is None:
informative = self.dp.get_pair_dataframe(pair, tf)
informative[coin+'rsi'] = ta.RSI(informative, timeperiod=14)
informative[coin+'mfi'] = ta.MFI(informative, timeperiod=25)
informative[coin+'adx'] = ta.ADX(informative, window=20)
informative[coin+'20sma'] = ta.SMA(informative,timeperiod=20)
informative[coin+'21ema'] = ta.EMA(informative,timeperiod=21)
informative[coin+'bmsb'] = np.where(informative[coin+'20sma'].lt(informative[coin+'21ema']),1,0)
informative[coin+'close_over_20sma'] = informative['close']/informative[coin+'20sma']
informative[coin+'mfi'] = ta.MFI(informative, timeperiod=25)
informative[coin+'ema21'] = ta.EMA(informative, timeperiod=21)
informative[coin+'sma20'] = ta.SMA(informative, timeperiod=20)
stoch = ta.STOCHRSI(informative, 15, 20, 2, 2)
informative[coin+'srsi-fk'] = stoch['fastk']
informative[coin+'srsi-fd'] = stoch['fastd']
bollinger = qtpylib.bollinger_bands(qtpylib.typical_price(informative), window=14, stds=2.2)
informative[coin+'bb_lowerband'] = bollinger['lower']
informative[coin+'bb_middleband'] = bollinger['mid']
informative[coin+'bb_upperband'] = bollinger['upper']
informative[coin+'bb_width'] = ((informative[coin+"bb_upperband"] - informative[coin+"bb_lowerband"]) / informative[coin+"bb_middleband"])
informative[coin+'close-bb_lower'] = informative['close'] / informative[coin+'bb_lowerband']
informative[coin+'roc'] = ta.ROC(informative, timeperiod=3)
informative[coin+'adx'] = ta.ADX(informative, window=14)
macd = ta.MACD(informative)
informative[coin+'macd'] = macd['macd']
informative[coin+'pct-change'] = informative['close'].pct_change()
informative[coin+'relative_volume'] = informative['volume'] / informative['volume'].rolling(10).mean()
informative[coin+'pct-change'] = informative['close'].pct_change()
indicators = [col for col in informative if col.startswith(coin)]
for n in range(self.freqai_info['feature_parameters']['shift']+1):
if n==0: continue
informative_shift = informative[indicators].shift(n)
informative_shift = informative_shift.add_suffix('_shift-'+str(n))
informative = pd.concat((informative,informative_shift),axis=1)
df = merge_informative_pair(df, informative, self.config['timeframe'], tf, ffill=True)
skip_columns = [(s + '_'+tf) for s in
['date', 'open', 'high', 'low', 'close', 'volume']]
df = df.drop(columns=skip_columns)
return df
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# the configuration file parameters are stored here
self.freqai_info = self.config['freqai']
# the model is instantiated here
self.model = CustomModel(self.config)
print('Populating indicators...')
# the following loops are necessary for building the features
# indicated by the user in the configuration file.
for tf in self.freqai_info['timeframes']:
dataframe = self.populate_any_indicators(metadata['pair'],
dataframe.copy(), tf)
for i in self.freqai_info['corr_pairlist']:
dataframe = self.populate_any_indicators(i,
dataframe.copy(), tf, coin=i.split("/")[0]+'-')
# the model will return 4 values, its prediction, an indication of whether or not the prediction
# should be accepted, the target mean/std values from the labels used during each training period.
(dataframe['prediction'], dataframe['do_predict'],
dataframe['target_mean'], dataframe['target_std']) = self.model.bridge.start(dataframe, metadata)
dataframe['target_roi'] = dataframe['target_mean']+dataframe['target_std']*0.5
dataframe['sell_roi'] = dataframe['target_mean']-dataframe['target_std']*1.5
return dataframe
def populate_buy_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
buy_conditions = [
(dataframe['prediction'] > dataframe['target_roi'])
&
(dataframe['do_predict'] == 1)
]
if buy_conditions:
dataframe.loc[reduce(lambda x, y: x | y, buy_conditions), 'buy'] = 1
return dataframe
def populate_sell_trend(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# sell_goal = eval('self.'+metadata['pair'].split("/")[0]+'_sell_goal.value')
sell_conditions = [
(dataframe['prediction'] < dataframe['sell_roi'])
&
(dataframe['do_predict'] == 1)
]
if sell_conditions:
dataframe.loc[reduce(lambda x, y: x | y, sell_conditions), 'sell'] = 1
return dataframe
def get_ticker_indicator(self):
return int(self.config['timeframe'][:-1])