Merge branch 'develop' into add-inlier-metric

This commit is contained in:
Robert Caulk
2022-09-06 20:40:21 +02:00
committed by GitHub
38 changed files with 664 additions and 349 deletions

View File

@@ -76,6 +76,8 @@ class FreqaiDataDrawer:
self.full_path / f"follower_dictionary-{self.follower_name}.json"
)
self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
self.historic_predictions_bkp_path = Path(
self.full_path / "historic_predictions.backup.pkl")
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
self.follow_mode = follow_mode
if follow_mode:
@@ -118,13 +120,21 @@ class FreqaiDataDrawer:
"""
exists = self.historic_predictions_path.is_file()
if exists:
with open(self.historic_predictions_path, "rb") as fp:
self.historic_predictions = cloudpickle.load(fp)
logger.info(
f"Found existing historic predictions at {self.full_path}, but beware "
"that statistics may be inaccurate if the bot has been offline for "
"an extended period of time."
)
try:
with open(self.historic_predictions_path, "rb") as fp:
self.historic_predictions = cloudpickle.load(fp)
logger.info(
f"Found existing historic predictions at {self.full_path}, but beware "
"that statistics may be inaccurate if the bot has been offline for "
"an extended period of time."
)
except EOFError:
logger.warning(
'Historical prediction file was corrupted. Trying to load backup file.')
with open(self.historic_predictions_bkp_path, "rb") as fp:
self.historic_predictions = cloudpickle.load(fp)
logger.warning('FreqAI successfully loaded the backup historical predictions file.')
elif not self.follow_mode:
logger.info("Could not find existing historic_predictions, starting from scratch")
else:
@@ -142,6 +152,9 @@ class FreqaiDataDrawer:
with open(self.historic_predictions_path, "wb") as fp:
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
# create a backup
shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)
def save_drawer_to_disk(self):
"""
Save data drawer full of all pair model metadata in present model folder.

View File

@@ -18,8 +18,6 @@ from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from freqtrade.configuration import TimeRange
from freqtrade.data.dataprovider import DataProvider
from freqtrade.data.history.history_utils import refresh_backtest_ohlcv_data
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.strategy.interface import IStrategy
@@ -73,6 +71,8 @@ class FreqaiDataKitchen:
self.label_list: List = []
self.training_features_list: List = []
self.model_filename: str = ""
self.backtesting_results_path = Path()
self.backtest_predictions_folder: str = "backtesting_predictions"
self.live = live
self.pair = pair
@@ -291,6 +291,7 @@ class FreqaiDataKitchen:
:returns:
:data_dictionary: updated dictionary with standardized values.
"""
# standardize the data by training stats
train_max = data_dictionary["train_features"].max()
train_min = data_dictionary["train_features"].min()
@@ -324,10 +325,24 @@ class FreqaiDataKitchen:
- 1
)
self.data[f"{item}_max"] = train_labels_max # .to_dict()
self.data[f"{item}_min"] = train_labels_min # .to_dict()
self.data[f"{item}_max"] = train_labels_max
self.data[f"{item}_min"] = train_labels_min
return data_dictionary
def normalize_single_dataframe(self, df: DataFrame) -> DataFrame:
train_max = df.max()
train_min = df.min()
df = (
2 * (df - train_min) / (train_max - train_min) - 1
)
for item in train_max.keys():
self.data[item + "_max"] = train_max[item]
self.data[item + "_min"] = train_min[item]
return df
def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
"""
Normalize a set of data using the mean and standard deviation from
@@ -441,7 +456,8 @@ class FreqaiDataKitchen:
start = datetime.fromtimestamp(timerange.startts, tz=timezone.utc)
stop = datetime.fromtimestamp(timerange.stopts, tz=timezone.utc)
df = df.loc[df["date"] >= start, :]
df = df.loc[df["date"] <= stop, :]
if not self.live:
df = df.loc[df["date"] < stop, :]
return df
@@ -454,22 +470,23 @@ class FreqaiDataKitchen:
from sklearn.decomposition import PCA # avoid importing if we dont need it
n_components = self.data_dictionary["train_features"].shape[1]
pca = PCA(n_components=n_components)
pca = PCA(0.999)
pca = pca.fit(self.data_dictionary["train_features"])
n_keep_components = np.argmin(pca.explained_variance_ratio_.cumsum() < 0.999)
pca2 = PCA(n_components=n_keep_components)
n_keep_components = pca.n_components_
self.data["n_kept_components"] = n_keep_components
pca2 = pca2.fit(self.data_dictionary["train_features"])
n_components = self.data_dictionary["train_features"].shape[1]
logger.info("reduced feature dimension by %s", n_components - n_keep_components)
logger.info("explained variance %f", np.sum(pca2.explained_variance_ratio_))
train_components = pca2.transform(self.data_dictionary["train_features"])
logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))
train_components = pca.transform(self.data_dictionary["train_features"])
self.data_dictionary["train_features"] = pd.DataFrame(
data=train_components,
columns=["PC" + str(i) for i in range(0, n_keep_components)],
index=self.data_dictionary["train_features"].index,
)
# normalsing transformed training features
self.data_dictionary["train_features"] = self.normalize_single_dataframe(
self.data_dictionary["train_features"])
# keeping a copy of the non-transformed features so we can check for errors during
# model load from disk
@@ -477,15 +494,18 @@ class FreqaiDataKitchen:
self.training_features_list = self.data_dictionary["train_features"].columns
if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
test_components = pca2.transform(self.data_dictionary["test_features"])
test_components = pca.transform(self.data_dictionary["test_features"])
self.data_dictionary["test_features"] = pd.DataFrame(
data=test_components,
columns=["PC" + str(i) for i in range(0, n_keep_components)],
index=self.data_dictionary["test_features"].index,
)
# normalise transformed test feature to transformed training features
self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
self.data_dictionary["test_features"])
self.data["n_kept_components"] = n_keep_components
self.pca = pca2
self.pca = pca
logger.info(f"PCA reduced total features from {n_components} to {n_keep_components}")
@@ -506,6 +526,9 @@ class FreqaiDataKitchen:
columns=["PC" + str(i) for i in range(0, self.data["n_kept_components"])],
index=filtered_dataframe.index,
)
# normalise transformed predictions to transformed training features
self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
self.data_dictionary["prediction_features"])
def compute_distances(self) -> float:
"""
@@ -885,9 +908,10 @@ class FreqaiDataKitchen:
weights = np.exp(-np.arange(num_weights) / (wfactor * num_weights))[::-1]
return weights
def append_predictions(self, predictions: DataFrame, do_predict: npt.ArrayLike) -> None:
def get_predictions_to_append(self, predictions: DataFrame,
do_predict: npt.ArrayLike) -> DataFrame:
"""
Append backtest prediction from current backtest period to all previous periods
Get backtest prediction from current backtest period
"""
append_df = DataFrame()
@@ -902,13 +926,18 @@ class FreqaiDataKitchen:
if self.freqai_config["feature_parameters"].get("DI_threshold", 0) > 0:
append_df["DI_values"] = self.DI_values
return append_df
def append_predictions(self, append_df: DataFrame) -> None:
"""
Append backtest prediction from current backtest period to all previous periods
"""
if self.full_df.empty:
self.full_df = append_df
else:
self.full_df = pd.concat([self.full_df, append_df], axis=0)
return
def fill_predictions(self, dataframe):
"""
Back fill values to before the backtesting range so that the dataframe matches size
@@ -1008,9 +1037,7 @@ class FreqaiDataKitchen:
# We notice that users like to use exotic indicators where
# they do not know the required timeperiod. Here we include a factor
# of safety by multiplying the user considered "max" by 2.
max_period = self.freqai_config["feature_parameters"].get(
"indicator_max_period_candles", 20
) * 2
max_period = self.config.get('startup_candle_count', 20) * 2
additional_seconds = max_period * max_tf_seconds
if trained_timestamp != 0:
@@ -1056,31 +1083,6 @@ class FreqaiDataKitchen:
self.model_filename = f"cb_{coin.lower()}_{int(trained_timerange.stopts)}"
def download_all_data_for_training(self, timerange: TimeRange, dp: DataProvider) -> None:
"""
Called only once upon start of bot to download the necessary data for
populating indicators and training the model.
:param timerange: TimeRange = The full data timerange for populating the indicators
and training the model.
:param dp: DataProvider instance attached to the strategy
"""
new_pairs_days = int((timerange.stopts - timerange.startts) / SECONDS_IN_DAY)
if not dp._exchange:
# Not realistic - this is only called in live mode.
raise OperationalException("Dataprovider did not have an exchange attached.")
refresh_backtest_ohlcv_data(
dp._exchange,
pairs=self.all_pairs,
timeframes=self.freqai_config["feature_parameters"].get("include_timeframes"),
datadir=self.config["datadir"],
timerange=timerange,
new_pairs_days=new_pairs_days,
erase=False,
data_format=self.config.get("dataformat_ohlcv", "json"),
trading_mode=self.config.get("trading_mode", "spot"),
prepend=self.config.get("prepend_data", False),
)
def set_all_pairs(self) -> None:
self.all_pairs = copy.deepcopy(
@@ -1194,3 +1196,50 @@ class FreqaiDataKitchen:
if self.unique_classes:
for label in self.unique_classes:
self.unique_class_list += list(self.unique_classes[label])
def save_backtesting_prediction(
self, append_df: DataFrame
) -> None:
"""
Save prediction dataframe from backtesting to h5 file format
:param append_df: dataframe for backtesting period
"""
full_predictions_folder = Path(self.full_path / self.backtest_predictions_folder)
if not full_predictions_folder.is_dir():
full_predictions_folder.mkdir(parents=True, exist_ok=True)
append_df.to_hdf(self.backtesting_results_path, key='append_df', mode='w')
def get_backtesting_prediction(
self
) -> DataFrame:
"""
Get prediction dataframe from h5 file format
"""
append_df = pd.read_hdf(self.backtesting_results_path)
return append_df
def check_if_backtest_prediction_exists(
self
) -> bool:
"""
Check if a backtesting prediction already exists
:param dk: FreqaiDataKitchen
:return:
:boolean: whether the prediction file exists or not.
"""
path_to_predictionfile = Path(self.full_path /
self.backtest_predictions_folder /
f"{self.model_filename}_prediction.h5")
self.backtesting_results_path = path_to_predictionfile
file_exists = path_to_predictionfile.is_file()
if file_exists:
logger.info(f"Found backtesting prediction file at {path_to_predictionfile}")
else:
logger.info(
f"Could not find backtesting prediction file at {path_to_predictionfile}"
)
return file_exists

View File

@@ -6,7 +6,7 @@ from abc import ABC, abstractmethod
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, Tuple
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
@@ -26,13 +26,6 @@ pd.options.mode.chained_assignment = None
logger = logging.getLogger(__name__)
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs).start()
return wrapper
class IFreqaiModel(ABC):
"""
Class containing all tools for training and prediction in the strategy.
@@ -69,6 +62,9 @@ class IFreqaiModel(ABC):
self.first = True
self.set_full_path()
self.follow_mode: bool = self.freqai_info.get("follow_mode", False)
self.save_backtest_models: bool = self.freqai_info.get("save_backtest_models", False)
if self.save_backtest_models:
logger.info('Backtesting module configured to save all models.')
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
self.identifier: str = self.freqai_info.get("identifier", "no_id_provided")
self.scanning = False
@@ -92,6 +88,9 @@ class IFreqaiModel(ABC):
self.begin_time_train: float = 0
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
self._threads: List[threading.Thread] = []
self._stop_event = threading.Event()
def assert_config(self, config: Dict[str, Any]) -> None:
if not config.get("freqai", {}):
@@ -125,10 +124,9 @@ class IFreqaiModel(ABC):
elif not self.follow_mode:
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
with self.analysis_lock:
dataframe = self.dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
)
dataframe = self.dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
)
dk = self.start_backtesting(dataframe, metadata, self.dk)
dataframe = dk.remove_features_from_df(dk.return_dataframe)
@@ -146,15 +144,34 @@ class IFreqaiModel(ABC):
self.model = None
self.dk = None
@threaded
def start_scanning(self, strategy: IStrategy) -> None:
def shutdown(self):
"""
Cleans up threads on Shutdown, set stop event. Join threads to wait
for current training iteration.
"""
logger.info("Stopping FreqAI")
self._stop_event.set()
logger.info("Waiting on Training iteration")
for _thread in self._threads:
_thread.join()
def start_scanning(self, *args, **kwargs) -> None:
"""
Start `self._start_scanning` in a separate thread
"""
_thread = threading.Thread(target=self._start_scanning, args=args, kwargs=kwargs)
self._threads.append(_thread)
_thread.start()
def _start_scanning(self, strategy: IStrategy) -> None:
"""
Function designed to constantly scan pairs for retraining on a separate thread (intracandle)
to improve model youth. This function is agnostic to data preparation/collection/storage,
it simply trains on what ever data is available in the self.dd.
:param strategy: IStrategy = The user defined strategy class
"""
while 1:
while not self._stop_event.is_set():
time.sleep(1)
for pair in self.config.get("exchange", {}).get("pair_whitelist"):
@@ -225,28 +242,39 @@ class IFreqaiModel(ABC):
"trains"
)
trained_timestamp_int = int(trained_timestamp.stopts)
dk.data_path = Path(
dk.full_path
/
f"sub-train-{metadata['pair'].split('/')[0]}_{int(trained_timestamp.stopts)}"
f"sub-train-{metadata['pair'].split('/')[0]}_{trained_timestamp_int}"
)
if not self.model_exists(
metadata["pair"], dk, trained_timestamp=int(trained_timestamp.stopts)
):
dk.find_features(dataframe_train)
self.model = self.train(dataframe_train, metadata["pair"], dk)
self.dd.pair_dict[metadata["pair"]]["trained_timestamp"] = int(
trained_timestamp.stopts)
dk.set_new_model_names(metadata["pair"], trained_timestamp)
self.dd.save_data(self.model, metadata["pair"], dk)
dk.set_new_model_names(metadata["pair"], trained_timestamp)
if dk.check_if_backtest_prediction_exists():
append_df = dk.get_backtesting_prediction()
dk.append_predictions(append_df)
else:
self.model = self.dd.load_data(metadata["pair"], dk)
if not self.model_exists(
metadata["pair"], dk, trained_timestamp=trained_timestamp_int
):
dk.find_features(dataframe_train)
self.model = self.train(dataframe_train, metadata["pair"], dk)
self.dd.pair_dict[metadata["pair"]]["trained_timestamp"] = int(
trained_timestamp.stopts)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
if self.save_backtest_models:
logger.info('Saving backtest model to disk.')
self.dd.save_data(self.model, metadata["pair"], dk)
else:
self.model = self.dd.load_data(metadata["pair"], dk)
pred_df, do_preds = self.predict(dataframe_backtest, dk)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
dk.append_predictions(pred_df, do_preds)
pred_df, do_preds = self.predict(dataframe_backtest, dk)
append_df = dk.get_predictions_to_append(pred_df, do_preds)
dk.append_predictions(append_df)
dk.save_backtesting_prediction(append_df)
dk.fill_predictions(dataframe)
@@ -291,14 +319,8 @@ class IFreqaiModel(ABC):
)
dk.set_paths(metadata["pair"], new_trained_timerange.stopts)
# download candle history if it is not already in memory
# load candle history into memory if it is not yet.
if not self.dd.historic_data:
logger.info(
"Downloading all training data for all pairs in whitelist and "
"corr_pairlist, this may take a while if you do not have the "
"data saved"
)
dk.download_all_data_for_training(data_load_timerange, strategy.dp)
self.dd.load_all_pair_histories(data_load_timerange, dk)
if not self.scanning:
@@ -471,11 +493,6 @@ class IFreqaiModel(ABC):
:return:
:boolean: whether the model file exists or not.
"""
coin, _ = pair.split("/")
if not self.live:
dk.model_filename = model_filename = f"cb_{coin.lower()}_{trained_timestamp}"
path_to_modelfile = Path(dk.data_path / f"{model_filename}_model.joblib")
file_exists = path_to_modelfile.is_file()
if file_exists and not scanning:
@@ -628,8 +645,8 @@ class IFreqaiModel(ABC):
logger.info(
f'Total time spent inferencing pairlist {self.inference_time:.2f} seconds')
if self.inference_time > 0.25 * self.base_tf_seconds:
logger.warning('Inference took over 25/% of the candle time. Reduce pairlist to'
' avoid blinding open trades and degrading performance.')
logger.warning("Inference took over 25% of the candle time. Reduce pairlist to"
" avoid blinding open trades and degrading performance.")
self.pair_it = 0
self.inference_time = 0
return

134
freqtrade/freqai/utils.py Normal file
View File

@@ -0,0 +1,134 @@
import logging
from datetime import datetime, timezone
from freqtrade.configuration import TimeRange
from freqtrade.data.dataprovider import DataProvider
from freqtrade.data.history.history_utils import refresh_backtest_ohlcv_data
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.exchange.exchange import market_is_active
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist
logger = logging.getLogger(__name__)
def download_all_data_for_training(dp: DataProvider, config: dict) -> None:
"""
Called only once upon start of bot to download the necessary data for
populating indicators and training the model.
:param timerange: TimeRange = The full data timerange for populating the indicators
and training the model.
:param dp: DataProvider instance attached to the strategy
"""
if dp._exchange is None:
raise OperationalException('No exchange object found.')
markets = [p for p, m in dp._exchange.markets.items() if market_is_active(m)
or config.get('include_inactive')]
all_pairs = dynamic_expand_pairlist(config, markets)
timerange = get_required_data_timerange(config)
new_pairs_days = int((timerange.stopts - timerange.startts) / 86400)
refresh_backtest_ohlcv_data(
dp._exchange,
pairs=all_pairs,
timeframes=config["freqai"]["feature_parameters"].get("include_timeframes"),
datadir=config["datadir"],
timerange=timerange,
new_pairs_days=new_pairs_days,
erase=False,
data_format=config.get("dataformat_ohlcv", "json"),
trading_mode=config.get("trading_mode", "spot"),
prepend=config.get("prepend_data", False),
)
def get_required_data_timerange(
config: dict
) -> TimeRange:
"""
Used to compute the required data download time range
for auto data-download in FreqAI
"""
time = datetime.now(tz=timezone.utc).timestamp()
timeframes = config["freqai"]["feature_parameters"].get("include_timeframes")
max_tf_seconds = 0
for tf in timeframes:
secs = timeframe_to_seconds(tf)
if secs > max_tf_seconds:
max_tf_seconds = secs
startup_candles = config.get('startup_candle_count', 0)
indicator_periods = config["freqai"]["feature_parameters"]["indicator_periods_candles"]
# factor the max_period as a factor of safety.
max_period = int(max(startup_candles, max(indicator_periods)) * 1.5)
config['startup_candle_count'] = max_period
logger.info(f'FreqAI auto-downloader using {max_period} startup candles.')
additional_seconds = max_period * max_tf_seconds
startts = int(
time
- config["freqai"].get("train_period_days", 0) * 86400
- additional_seconds
)
stopts = int(time)
data_load_timerange = TimeRange('date', 'date', startts, stopts)
return data_load_timerange
# Keep below for when we wish to download heterogeneously lengthed data for FreqAI.
# def download_all_data_for_training(dp: DataProvider, config: dict) -> None:
# """
# Called only once upon start of bot to download the necessary data for
# populating indicators and training a FreqAI model.
# :param timerange: TimeRange = The full data timerange for populating the indicators
# and training the model.
# :param dp: DataProvider instance attached to the strategy
# """
# if dp._exchange is not None:
# markets = [p for p, m in dp._exchange.markets.items() if market_is_active(m)
# or config.get('include_inactive')]
# else:
# # This should not occur:
# raise OperationalException('No exchange object found.')
# all_pairs = dynamic_expand_pairlist(config, markets)
# if not dp._exchange:
# # Not realistic - this is only called in live mode.
# raise OperationalException("Dataprovider did not have an exchange attached.")
# time = datetime.now(tz=timezone.utc).timestamp()
# for tf in config["freqai"]["feature_parameters"].get("include_timeframes"):
# timerange = TimeRange()
# timerange.startts = int(time)
# timerange.stopts = int(time)
# startup_candles = dp.get_required_startup(str(tf))
# tf_seconds = timeframe_to_seconds(str(tf))
# timerange.subtract_start(tf_seconds * startup_candles)
# new_pairs_days = int((timerange.stopts - timerange.startts) / 86400)
# # FIXME: now that we are looping on `refresh_backtest_ohlcv_data`, the function
# # redownloads the funding rate for each pair.
# refresh_backtest_ohlcv_data(
# dp._exchange,
# pairs=all_pairs,
# timeframes=[tf],
# datadir=config["datadir"],
# timerange=timerange,
# new_pairs_days=new_pairs_days,
# erase=False,
# data_format=config.get("dataformat_ohlcv", "json"),
# trading_mode=config.get("trading_mode", "spot"),
# prepend=config.get("prepend_data", False),
# )