merge datarehaul into main freqai branch
This commit is contained in:
@@ -25,9 +25,6 @@ from freqtrade.resolvers import ExchangeResolver
|
||||
from freqtrade.strategy.interface import IStrategy
|
||||
|
||||
|
||||
# import scipy as spy # used for auto distribution assignment
|
||||
|
||||
|
||||
SECONDS_IN_DAY = 86400
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -52,6 +49,7 @@ class FreqaiDataKitchen:
|
||||
self.target_std: npt.ArrayLike = np.array([])
|
||||
self.full_predictions: npt.ArrayLike = np.array([])
|
||||
self.full_do_predict: npt.ArrayLike = np.array([])
|
||||
self.full_DI_values: npt.ArrayLike = np.array([])
|
||||
self.full_target_mean: npt.ArrayLike = np.array([])
|
||||
self.full_target_std: npt.ArrayLike = np.array([])
|
||||
self.data_path = Path()
|
||||
@@ -59,6 +57,7 @@ class FreqaiDataKitchen:
|
||||
self.live = live
|
||||
self.pair = pair
|
||||
self.svm_model: linear_model.SGDOneClassSVM = None
|
||||
self.set_all_pairs()
|
||||
if not self.live:
|
||||
self.full_timerange = self.create_fulltimerange(self.config["timerange"],
|
||||
self.freqai_config.get("train_period")
|
||||
@@ -73,6 +72,12 @@ class FreqaiDataKitchen:
|
||||
self.data_drawer = data_drawer
|
||||
|
||||
def set_paths(self, metadata: dict, trained_timestamp: int = None,) -> None:
|
||||
"""
|
||||
Set the paths to the data for the present coin/botloop
|
||||
:params:
|
||||
metadata: dict = strategy furnished pair metadata
|
||||
trained_timestamp: int = timestamp of most recent training
|
||||
"""
|
||||
self.full_path = Path(self.config['user_data_dir'] /
|
||||
"models" /
|
||||
str(self.freqai_config.get('identifier')))
|
||||
@@ -293,7 +298,7 @@ class FreqaiDataKitchen:
|
||||
)
|
||||
if (1 - len(filtered_dataframe) / len(unfiltered_dataframe)) > 0.1 and self.live:
|
||||
logger.warning(
|
||||
f' {(1 - len(filtered_dataframe)/len(unfiltered_dataframe)) * 100} percent'
|
||||
f' {(1 - len(filtered_dataframe)/len(unfiltered_dataframe)) * 100:.2f} percent'
|
||||
' of training data dropped due to NaNs, model may perform inconsistent'
|
||||
'with expectations'
|
||||
)
|
||||
@@ -515,6 +520,11 @@ class FreqaiDataKitchen:
|
||||
return None
|
||||
|
||||
def pca_transform(self, filtered_dataframe: DataFrame) -> None:
|
||||
"""
|
||||
Use an existing pca transform to transform data into components
|
||||
:params:
|
||||
filtered_dataframe: DataFrame = the cleaned dataframe
|
||||
"""
|
||||
pca_components = self.pca.transform(filtered_dataframe)
|
||||
self.data_dictionary["prediction_features"] = pd.DataFrame(
|
||||
data=pca_components,
|
||||
@@ -523,14 +533,26 @@ class FreqaiDataKitchen:
|
||||
)
|
||||
|
||||
def compute_distances(self) -> float:
    """
    Compute distances between each training point and every other training
    point. This metric defines the neighborhood of trained data and is used
    for prediction confidence in the Dissimilarity Index
    :returns:
    avg_mean_dist: float = mean of the per-row means of the pairwise
    distance matrix built from the training features
    """
    logger.info("computing average mean distance for all training points")
    # Honour the user configured thread count; -1 is the fallback when the
    # key is absent from model_training_parameters.
    tc = self.freqai_config.get('model_training_parameters', {}).get('thread_count', -1)
    # The distance matrix is built exactly once (a leftover call with a
    # hard-coded n_jobs=-1 used to compute it a second time for nothing).
    pairwise = pairwise_distances(self.data_dictionary["train_features"], n_jobs=tc)
    avg_mean_dist = pairwise.mean(axis=1).mean()
    logger.info(f'avg_mean_dist {avg_mean_dist:.2f}')

    return avg_mean_dist
|
||||
|
||||
def use_SVM_to_remove_outliers(self, predict: bool) -> None:
|
||||
"""
|
||||
Build/inference a Support Vector Machine to detect outliers
|
||||
in training data and prediction
|
||||
:params:
|
||||
predict: bool = If true, inference an existing SVM model, else construct one
|
||||
"""
|
||||
|
||||
if predict:
|
||||
assert self.svm_model, "No svm model available for outlier removal"
|
||||
@@ -581,6 +603,13 @@ class FreqaiDataKitchen:
|
||||
return
|
||||
|
||||
def find_features(self, dataframe: DataFrame) -> list:
|
||||
"""
|
||||
Find features in the strategy provided dataframe
|
||||
:params:
|
||||
dataframe: DataFrame = strategy provided dataframe
|
||||
:returns:
|
||||
features: list = the features to be used for training/prediction
|
||||
"""
|
||||
column_names = dataframe.columns
|
||||
features = [c for c in column_names if '%' in c]
|
||||
if not features:
|
||||
@@ -601,17 +630,19 @@ class FreqaiDataKitchen:
|
||||
n_jobs=-1,
|
||||
)
|
||||
|
||||
self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"]
|
||||
|
||||
do_predict = np.where(
|
||||
distance.min(axis=0) / self.data["avg_mean_dist"]
|
||||
self.DI_values
|
||||
< self.freqai_config.get("feature_parameters", {}).get("DI_threshold"),
|
||||
1,
|
||||
0,
|
||||
)
|
||||
|
||||
# logger.info(
|
||||
# "Distance checker tossed %s predictions for being too far from training data",
|
||||
# len(do_predict) - do_predict.sum(),
|
||||
# )
|
||||
logger.info(
|
||||
f'DI tossed {len(do_predict) - do_predict.sum():.2f} predictions for '
|
||||
'being too far from training data'
|
||||
)
|
||||
|
||||
self.do_predict += do_predict
|
||||
self.do_predict -= 1
|
||||
@@ -639,6 +670,8 @@ class FreqaiDataKitchen:
|
||||
|
||||
self.full_predictions = np.append(self.full_predictions, predictions)
|
||||
self.full_do_predict = np.append(self.full_do_predict, do_predict)
|
||||
if self.freqai_config.get('feature_parameters', {}).get('DI-threshold', 0) > 0:
|
||||
self.full_DI_values = np.append(self.full_DI_values, self.DI_values)
|
||||
self.full_target_mean = np.append(self.full_target_mean, target_mean)
|
||||
self.full_target_std = np.append(self.full_target_std, target_std)
|
||||
|
||||
@@ -653,6 +686,8 @@ class FreqaiDataKitchen:
|
||||
filler = np.zeros(len_dataframe - len(self.full_predictions)) # startup_candle_count
|
||||
self.full_predictions = np.append(filler, self.full_predictions)
|
||||
self.full_do_predict = np.append(filler, self.full_do_predict)
|
||||
if self.freqai_config.get('feature_parameters', {}).get('DI-threshold', 0) > 0:
|
||||
self.full_DI_values = np.append(filler, self.full_DI_values)
|
||||
self.full_target_mean = np.append(filler, self.full_target_mean)
|
||||
self.full_target_std = np.append(filler, self.full_target_std)
|
||||
|
||||
@@ -697,7 +732,7 @@ class FreqaiDataKitchen:
|
||||
# find the max indicator length required
|
||||
max_timeframe_chars = self.freqai_config.get('timeframes')[-1]
|
||||
max_period = self.freqai_config.get('feature_parameters', {}).get(
|
||||
'indicator_max_period', 20)
|
||||
'indicator_max_period', 50)
|
||||
additional_seconds = 0
|
||||
if max_timeframe_chars[-1] == 'd':
|
||||
additional_seconds = max_period * SECONDS_IN_DAY * int(max_timeframe_chars[-2])
|
||||
@@ -712,6 +747,8 @@ class FreqaiDataKitchen:
|
||||
logger.warning('FreqAI could not detect max timeframe and therefore may not '
|
||||
'download the proper amount of data for training')
|
||||
|
||||
logger.info(f'Extending data download by {additional_seconds/SECONDS_IN_DAY:.2f} days')
|
||||
|
||||
if trained_timestamp != 0:
|
||||
elapsed_time = (time - trained_timestamp) / SECONDS_IN_DAY
|
||||
retrain = elapsed_time > self.freqai_config.get('backtest_period')
|
||||
@@ -737,6 +774,14 @@ class FreqaiDataKitchen:
|
||||
data_load_timerange.stopts = int(time)
|
||||
retrain = True
|
||||
|
||||
# logger.info(
|
||||
# f'Total data download needed '
|
||||
# f'{(data_load_timerange.stopts - data_load_timerange.startts)/SECONDS_IN_DAY:.2f}'
|
||||
# ' days')
|
||||
# logger.info(f'Total training timerange '
|
||||
# f'{(trained_timerange.stopts - trained_timerange.startts)/SECONDS_IN_DAY} '
|
||||
# ' days')
|
||||
|
||||
# if retrain:
|
||||
# coin, _ = metadata['pair'].split("/")
|
||||
# # set the new data_path
|
||||
@@ -765,61 +810,194 @@ class FreqaiDataKitchen:
|
||||
# enables persistence, but not fully implemented into save/load data yet
|
||||
# self.data['live_trained_timerange'] = str(int(trained_timerange.stopts))
|
||||
|
||||
def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict,
|
||||
strategy: IStrategy) -> None:
|
||||
# SUPERSEDED
|
||||
# def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict,
|
||||
# strategy: IStrategy) -> None:
|
||||
|
||||
# exchange = ExchangeResolver.load_exchange(self.config['exchange']['name'],
|
||||
# self.config, validate=False, freqai=True)
|
||||
# # exchange = strategy.dp._exchange # closes ccxt session
|
||||
# pairs = copy.deepcopy(self.freqai_config.get('corr_pairlist', []))
|
||||
# if str(metadata['pair']) not in pairs:
|
||||
# pairs.append(str(metadata['pair']))
|
||||
|
||||
# refresh_backtest_ohlcv_data(
|
||||
# exchange, pairs=pairs, timeframes=self.freqai_config.get('timeframes'),
|
||||
# datadir=self.config['datadir'], timerange=timerange,
|
||||
# new_pairs_days=self.config['new_pairs_days'],
|
||||
# erase=False, data_format=self.config.get('dataformat_ohlcv', 'json'),
|
||||
# trading_mode=self.config.get('trading_mode', 'spot'),
|
||||
# prepend=self.config.get('prepend_data', False)
|
||||
# )
|
||||
|
||||
def download_all_data_for_training(self, timerange: TimeRange) -> None:
    """
    Called only once upon start of bot to download the necessary data for
    populating indicators and training the model.
    :params:
    timerange: TimeRange = The full data timerange for populating the indicators
    and training the model.
    """
    exchange = ExchangeResolver.load_exchange(self.config['exchange']['name'],
                                              self.config, validate=False, freqai=True)

    # Length of the requested window in whole days; passed on as
    # new_pairs_days instead of the static config value.
    new_pairs_days = int((timerange.stopts - timerange.startts) / SECONDS_IN_DAY)

    # self.all_pairs replaces the per-pair list that used to be assembled
    # here from `metadata`, which is not a parameter of this method.
    refresh_backtest_ohlcv_data(
        exchange, pairs=self.all_pairs,
        timeframes=self.freqai_config.get('timeframes'),
        datadir=self.config['datadir'], timerange=timerange,
        new_pairs_days=new_pairs_days,
        erase=False, data_format=self.config.get('dataformat_ohlcv', 'json'),
        trading_mode=self.config.get('trading_mode', 'spot'),
        prepend=self.config.get('prepend_data', False)
    )
|
||||
|
||||
def load_pairs_histories(self, timerange: TimeRange, metadata: dict) -> Tuple[Dict[Any, Any],
|
||||
DataFrame]:
|
||||
corr_dataframes: Dict[Any, Any] = {}
|
||||
base_dataframes: Dict[Any, Any] = {}
|
||||
pairs = self.freqai_config.get('corr_pairlist', []) # + [metadata['pair']]
|
||||
# timerange = TimeRange.parse_timerange(new_timerange)
|
||||
def update_historic_data(self, strategy: IStrategy) -> None:
    """
    Append new candles to our stored historic data (in memory) so that
    we do not need to load candle history from disk and do not need to
    ping the exchange multiple times for the same candle.
    :params:
    strategy: IStrategy = user defined strategy object; its data provider
    (strategy.dp) supplies the most recent candles for each pair/timeframe
    """

    # All reads and writes of the shared history happen under the lock.
    with self.data_drawer.history_lock:
        history_data = self.data_drawer.historic_data

        for pair in self.all_pairs:
            for tf in self.freqai_config.get('timeframes'):

                # check if newest candle is already appended
                df_dp = strategy.dp.get_pair_dataframe(pair, tf)
                last_stored_date = history_data[pair][tf].iloc[-1]['date']
                if str(last_stored_date) == str(df_dp.iloc[-1:]['date'].iloc[-1]):
                    continue

                # First row after the last stored candle.
                # NOTE(review): raises IndexError when the last stored candle
                # is absent from the provider dataframe, and uses an index
                # label as an iloc position - confirm df_dp has a default
                # RangeIndex.
                index = df_dp.loc[df_dp['date'] == last_stored_date].index[0] + 1

                # Reuse the dataframe fetched above instead of asking the
                # data provider for it a second time.
                history_data[pair][tf] = pd.concat(
                    [history_data[pair][tf], df_dp.iloc[index:]],
                    ignore_index=True, axis=0
                )

                logger.info(f'Length of history data {len(history_data[pair][tf])}')
|
||||
|
||||
def set_all_pairs(self) -> None:
    """
    Build self.all_pairs: the corr_pairlist from the freqai config followed
    by every exchange pair_whitelist entry that is not already present.
    A missing 'exchange' section or 'pair_whitelist' is treated as empty.
    """
    # Deep copy so that appending whitelist pairs never mutates the config.
    self.all_pairs = copy.deepcopy(self.freqai_config.get('corr_pairlist', []))

    # The fallback must be a mapping (the previous '' default had no .get),
    # and a missing whitelist must not be iterated as None.
    whitelist = self.config.get('exchange', {}).get('pair_whitelist') or []
    for pair in whitelist:
        if pair not in self.all_pairs:
            self.all_pairs.append(pair)
|
||||
|
||||
def load_all_pair_histories(self, timerange: TimeRange) -> None:
    """
    Fill the shared in-memory history store with candle data for every pair
    in self.all_pairs (whitelist and corr_pairlist) and every configured
    timeframe. Only called once upon startup of bot.
    :params:
    timerange: TimeRange = full timerange required to populate all indicators
    for training according to user defined train_period
    """
    store = self.data_drawer.historic_data

    for symbol in self.all_pairs:
        # Fetch the per-pair mapping, creating it on first sight of the pair.
        per_timeframe = store.setdefault(symbol, {})
        for timeframe in self.freqai_config.get('timeframes'):
            ohlcv_format = self.config.get('dataformat_ohlcv', 'json')
            market_type = self.config.get('trading_mode', 'spot')
            per_timeframe[timeframe] = load_pair_history(
                datadir=self.config['datadir'],
                timeframe=timeframe,
                pair=symbol,
                timerange=timerange,
                data_format=ohlcv_format,
                candle_type=market_type,
            )
|
||||
|
||||
def get_base_and_corr_dataframes(self, timerange: TimeRange,
                                 metadata: dict) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
    """
    Look up the in-memory historic data and return the slices relevant to
    the present pair.
    :params:
    timerange: TimeRange = full timerange required to populate all indicators
    for training according to user defined train_period
    metadata: dict = strategy furnished pair metadata
    :returns:
    corr_dataframes: dict = sliced dataframes keyed by correlated pair, then timeframe
    base_dataframes: dict = sliced dataframes of the present pair keyed by timeframe
    """
    with self.data_drawer.history_lock:
        correlated: Dict[Any, Any] = {}
        base: Dict[Any, Any] = {}
        store = self.data_drawer.historic_data
        corr_pairlist = self.freqai_config.get('corr_pairlist', [])

        for timeframe in self.freqai_config.get('timeframes'):
            base[timeframe] = self.slice_dataframe(
                timerange, store[metadata['pair']][timeframe]
            )
            # An empty/None corr_pairlist simply yields no iterations.
            for other in corr_pairlist or ():
                # dont repeat anything from whitelist
                # (containment test: skipped when the present pair's name
                # occurs inside the correlated pair's name)
                if metadata['pair'] in other:
                    continue
                sliced = self.slice_dataframe(timerange, store[other][timeframe])
                correlated.setdefault(other, {})[timeframe] = sliced

    return correlated, base
|
||||
|
||||
# SUPERSEDED
|
||||
# def load_pairs_histories(self, timerange: TimeRange, metadata: dict) -> Tuple[Dict[Any, Any],
|
||||
# DataFrame]:
|
||||
# corr_dataframes: Dict[Any, Any] = {}
|
||||
# base_dataframes: Dict[Any, Any] = {}
|
||||
# pairs = self.freqai_config.get('corr_pairlist', []) # + [metadata['pair']]
|
||||
# # timerange = TimeRange.parse_timerange(new_timerange)
|
||||
|
||||
# for tf in self.freqai_config.get('timeframes'):
|
||||
# base_dataframes[tf] = load_pair_history(datadir=self.config['datadir'],
|
||||
# timeframe=tf,
|
||||
# pair=metadata['pair'], timerange=timerange,
|
||||
# data_format=self.config.get(
|
||||
# 'dataformat_ohlcv', 'json'),
|
||||
# candle_type=self.config.get(
|
||||
# 'trading_mode', 'spot'))
|
||||
# if pairs:
|
||||
# for p in pairs:
|
||||
# if metadata['pair'] in p:
|
||||
# continue # dont repeat anything from whitelist
|
||||
# if p not in corr_dataframes:
|
||||
# corr_dataframes[p] = {}
|
||||
# corr_dataframes[p][tf] = load_pair_history(datadir=self.config['datadir'],
|
||||
# timeframe=tf,
|
||||
# pair=p, timerange=timerange,
|
||||
# data_format=self.config.get(
|
||||
# 'dataformat_ohlcv', 'json'),
|
||||
# candle_type=self.config.get(
|
||||
# 'trading_mode', 'spot'))
|
||||
|
||||
# return corr_dataframes, base_dataframes
|
||||
|
||||
def use_strategy_to_populate_indicators(self, strategy: IStrategy,
|
||||
corr_dataframes: dict,
|
||||
base_dataframes: dict,
|
||||
metadata: dict) -> DataFrame:
|
||||
|
||||
"""
|
||||
Use the user defined strategy for populating indicators during
|
||||
retrain
|
||||
:params:
|
||||
strategy: IStrategy = user defined strategy object
|
||||
corr_dataframes: dict = dict containing the informative pair dataframes
|
||||
(for user defined timeframes)
|
||||
base_dataframes: dict = dict containing the current pair dataframes
|
||||
(for user defined timeframes)
|
||||
metadata: dict = strategy furnished pair metadata
|
||||
:returns:
|
||||
dataframe: DataFrame = dataframe containing populated indicators
|
||||
"""
|
||||
dataframe = base_dataframes[self.config['timeframe']].copy()
|
||||
pairs = self.freqai_config.get("corr_pairlist", [])
|
||||
|
||||
@@ -848,6 +1026,9 @@ class FreqaiDataKitchen:
|
||||
return dataframe
|
||||
|
||||
def fit_labels(self) -> None:
|
||||
"""
|
||||
Fit the labels with a gaussian distribution
|
||||
"""
|
||||
import scipy as spy
|
||||
|
||||
f = spy.stats.norm.fit(self.data_dictionary["train_labels"])
|
||||
|
Reference in New Issue
Block a user