rehaul of backend data management - increasing performance by holding history in memory, reducing load on the ratelimit by only pinging exchange once per candle. Improve code readability.

This commit is contained in:
robcaulk
2022-06-03 15:19:46 +02:00
parent 4ac6ef2972
commit 16b4a5b71f
5 changed files with 342 additions and 70 deletions

View File

@@ -25,9 +25,6 @@ from freqtrade.resolvers import ExchangeResolver
from freqtrade.strategy.interface import IStrategy
# import scipy as spy # used for auto distribution assignment
SECONDS_IN_DAY = 86400
logger = logging.getLogger(__name__)
@@ -52,6 +49,7 @@ class FreqaiDataKitchen:
self.target_std: npt.ArrayLike = np.array([])
self.full_predictions: npt.ArrayLike = np.array([])
self.full_do_predict: npt.ArrayLike = np.array([])
self.full_DI_values: npt.ArrayLike = np.array([])
self.full_target_mean: npt.ArrayLike = np.array([])
self.full_target_std: npt.ArrayLike = np.array([])
self.data_path = Path()
@@ -59,6 +57,7 @@ class FreqaiDataKitchen:
self.live = live
self.pair = pair
self.svm_model: linear_model.SGDOneClassSVM = None
self.set_all_pairs()
if not self.live:
self.full_timerange = self.create_fulltimerange(self.config["timerange"],
self.freqai_config.get("train_period")
@@ -73,6 +72,12 @@ class FreqaiDataKitchen:
self.data_drawer = data_drawer
def set_paths(self, metadata: dict, trained_timestamp: int = None,) -> None:
"""
Set the paths to the data for the present coin/botloop
:params:
metadata: dict = strategy furnished pair metadata
trained_timestamp: int = timestamp of most recent training
"""
self.full_path = Path(self.config['user_data_dir'] /
"models" /
str(self.freqai_config.get('identifier')))
@@ -514,6 +519,11 @@ class FreqaiDataKitchen:
return None
def pca_transform(self, filtered_dataframe: DataFrame) -> None:
"""
Use an existing pca transform to transform data into components
:params:
filtered_dataframe: DataFrame = the cleaned dataframe
"""
pca_components = self.pca.transform(filtered_dataframe)
self.data_dictionary["prediction_features"] = pd.DataFrame(
data=pca_components,
@@ -522,6 +532,11 @@ class FreqaiDataKitchen:
)
def compute_distances(self) -> float:
"""
Compute distances between each training point and every other training
point. This metric defines the neighborhood of trained data and is used
for prediction confidence in the Dissimilarity Index
"""
logger.info("computing average mean distance for all training points")
pairwise = pairwise_distances(self.data_dictionary["train_features"], n_jobs=-1)
avg_mean_dist = pairwise.mean(axis=1).mean()
@@ -530,6 +545,12 @@ class FreqaiDataKitchen:
return avg_mean_dist
def use_SVM_to_remove_outliers(self, predict: bool) -> None:
"""
Build/inference a Support Vector Machine to detect outliers
in training data and prediction
:params:
predict: bool = If true, inference an existing SVM model, else construct one
"""
if predict:
assert self.svm_model, "No svm model available for outlier removal"
@@ -580,6 +601,13 @@ class FreqaiDataKitchen:
return
def find_features(self, dataframe: DataFrame) -> list:
"""
Find features in the strategy provided dataframe
:params:
dataframe: DataFrame = strategy provided dataframe
:returns:
features: list = the features to be used for training/prediction
"""
column_names = dataframe.columns
features = [c for c in column_names if '%' in c]
if not features:
@@ -600,17 +628,19 @@ class FreqaiDataKitchen:
n_jobs=-1,
)
self.DI_values = distance.min(axis=0) / self.data["avg_mean_dist"]
do_predict = np.where(
distance.min(axis=0) / self.data["avg_mean_dist"]
self.DI_values
< self.freqai_config.get("feature_parameters", {}).get("DI_threshold"),
1,
0,
)
# logger.info(
# "Distance checker tossed %s predictions for being too far from training data",
# len(do_predict) - do_predict.sum(),
# )
logger.info(
"DI tossed %s predictions for being too far from training data",
len(do_predict) - do_predict.sum(),
)
self.do_predict += do_predict
self.do_predict -= 1
@@ -638,6 +668,7 @@ class FreqaiDataKitchen:
self.full_predictions = np.append(self.full_predictions, predictions)
self.full_do_predict = np.append(self.full_do_predict, do_predict)
self.full_DI_values = np.append(self.full_DI_values, self.DI_values)
self.full_target_mean = np.append(self.full_target_mean, target_mean)
self.full_target_std = np.append(self.full_target_std, target_std)
@@ -652,6 +683,7 @@ class FreqaiDataKitchen:
filler = np.zeros(len_dataframe - len(self.full_predictions)) # startup_candle_count
self.full_predictions = np.append(filler, self.full_predictions)
self.full_do_predict = np.append(filler, self.full_do_predict)
self.full_DI_values = np.append(filler, self.full_DI_values)
self.full_target_mean = np.append(filler, self.full_target_mean)
self.full_target_std = np.append(filler, self.full_target_std)
@@ -711,6 +743,8 @@ class FreqaiDataKitchen:
logger.warning('FreqAI could not detect max timeframe and therefore may not '
'download the proper amount of data for training')
# logger.info(f'Extending data download by {additional_seconds/SECONDS_IN_DAY} days')
if trained_timestamp != 0:
elapsed_time = (time - trained_timestamp) / SECONDS_IN_DAY
retrain = elapsed_time > self.freqai_config.get('backtest_period')
@@ -764,61 +798,176 @@ class FreqaiDataKitchen:
# enables persistence, but not fully implemented into save/load data yer
# self.data['live_trained_timerange'] = str(int(trained_timerange.stopts))
def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict,
strategy: IStrategy) -> None:
# SUPERCEDED
# def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict,
# strategy: IStrategy) -> None:
# exchange = ExchangeResolver.load_exchange(self.config['exchange']['name'],
# self.config, validate=False, freqai=True)
# # exchange = strategy.dp._exchange # closes ccxt session
# pairs = copy.deepcopy(self.freqai_config.get('corr_pairlist', []))
# if str(metadata['pair']) not in pairs:
# pairs.append(str(metadata['pair']))
# refresh_backtest_ohlcv_data(
# exchange, pairs=pairs, timeframes=self.freqai_config.get('timeframes'),
# datadir=self.config['datadir'], timerange=timerange,
# new_pairs_days=self.config['new_pairs_days'],
# erase=False, data_format=self.config.get('dataformat_ohlcv', 'json'),
# trading_mode=self.config.get('trading_mode', 'spot'),
# prepend=self.config.get('prepend_data', False)
# )
def download_all_data_for_training(self, timerange: TimeRange) -> None:
"""
Called only once upon start of bot to download the necessary data for
populating indicators and training the model.
:params:
timerange: TimeRange = The full data timerange for populating the indicators
and training the model.
"""
exchange = ExchangeResolver.load_exchange(self.config['exchange']['name'],
self.config, validate=False, freqai=True)
# exchange = strategy.dp._exchange # closes ccxt session
pairs = copy.deepcopy(self.freqai_config.get('corr_pairlist', []))
if str(metadata['pair']) not in pairs:
pairs.append(str(metadata['pair']))
new_pairs_days = int((timerange.stopts - timerange.startts) / SECONDS_IN_DAY)
refresh_backtest_ohlcv_data(
exchange, pairs=pairs, timeframes=self.freqai_config.get('timeframes'),
exchange, pairs=self.all_pairs,
timeframes=self.freqai_config.get('timeframes'),
datadir=self.config['datadir'], timerange=timerange,
new_pairs_days=self.config['new_pairs_days'],
new_pairs_days=new_pairs_days,
erase=False, data_format=self.config.get('dataformat_ohlcv', 'json'),
trading_mode=self.config.get('trading_mode', 'spot'),
prepend=self.config.get('prepend_data', False)
)
def load_pairs_histories(self, timerange: TimeRange, metadata: dict) -> Tuple[Dict[Any, Any],
DataFrame]:
def update_historic_data(self, strategy: IStrategy) -> None:
"""
Append new candles to our stores historic data (in memory) so that
we do not need to load candle history from disk and we dont need to
pinging exchange multiple times for the same candle.
:params:
dataframe: DataFrame = strategy provided dataframe
"""
history_data = self.data_drawer.historic_data
for pair in self.all_pairs:
for tf in self.freqai_config.get('timeframes'):
history_data[pair][tf] = pd.concat(
[history_data[pair][tf],
strategy.dp.get_pair_dataframe(pair, tf).iloc[-1]],
axis=0
)
def set_all_pairs(self) -> None:
self.all_pairs = copy.deepcopy(self.freqai_config.get('corr_pairlist', []))
for pair in self.config.get('exchange', '').get('pair_whitelist'):
if pair not in self.all_pairs:
self.all_pairs.append(pair)
def load_all_pair_histories(self, timerange: TimeRange) -> None:
"""
Load pair histories for all whitelist and corr_pairlist pairs.
Only called once upon startup of bot.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period
"""
history_data = self.data_drawer.historic_data
for pair in self.all_pairs:
if pair not in history_data:
history_data[pair] = {}
for tf in self.freqai_config.get('timeframes'):
history_data[pair][tf] = load_pair_history(datadir=self.config['datadir'],
timeframe=tf,
pair=pair, timerange=timerange,
data_format=self.config.get(
'dataformat_ohlcv', 'json'),
candle_type=self.config.get(
'trading_mode', 'spot'))
def get_base_and_corr_dataframes(self, timerange: TimeRange,
metadata: dict) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
"""
Searches through our historic_data in memory and returns the dataframes relevant
to the present pair.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period
metadata: dict = strategy furnished pair metadata
"""
corr_dataframes: Dict[Any, Any] = {}
base_dataframes: Dict[Any, Any] = {}
pairs = self.freqai_config.get('corr_pairlist', []) # + [metadata['pair']]
# timerange = TimeRange.parse_timerange(new_timerange)
historic_data = self.data_drawer.historic_data
pairs = self.freqai_config.get('corr_pairlist', [])
for tf in self.freqai_config.get('timeframes'):
base_dataframes[tf] = load_pair_history(datadir=self.config['datadir'],
timeframe=tf,
pair=metadata['pair'], timerange=timerange,
data_format=self.config.get(
'dataformat_ohlcv', 'json'),
candle_type=self.config.get(
'trading_mode', 'spot'))
base_dataframes[tf] = self.slice_dataframe(
timerange,
historic_data[metadata['pair']][tf]
)
if pairs:
for p in pairs:
if metadata['pair'] in p:
continue # dont repeat anything from whitelist
if p not in corr_dataframes:
corr_dataframes[p] = {}
corr_dataframes[p][tf] = load_pair_history(datadir=self.config['datadir'],
timeframe=tf,
pair=p, timerange=timerange,
data_format=self.config.get(
'dataformat_ohlcv', 'json'),
candle_type=self.config.get(
'trading_mode', 'spot'))
corr_dataframes[p][tf] = self.slice_dataframe(timerange, historic_data[p][tf])
return corr_dataframes, base_dataframes
# SUPERCEDED
# def load_pairs_histories(self, timerange: TimeRange, metadata: dict) -> Tuple[Dict[Any, Any],
# DataFrame]:
# corr_dataframes: Dict[Any, Any] = {}
# base_dataframes: Dict[Any, Any] = {}
# pairs = self.freqai_config.get('corr_pairlist', []) # + [metadata['pair']]
# # timerange = TimeRange.parse_timerange(new_timerange)
# for tf in self.freqai_config.get('timeframes'):
# base_dataframes[tf] = load_pair_history(datadir=self.config['datadir'],
# timeframe=tf,
# pair=metadata['pair'], timerange=timerange,
# data_format=self.config.get(
# 'dataformat_ohlcv', 'json'),
# candle_type=self.config.get(
# 'trading_mode', 'spot'))
# if pairs:
# for p in pairs:
# if metadata['pair'] in p:
# continue # dont repeat anything from whitelist
# if p not in corr_dataframes:
# corr_dataframes[p] = {}
# corr_dataframes[p][tf] = load_pair_history(datadir=self.config['datadir'],
# timeframe=tf,
# pair=p, timerange=timerange,
# data_format=self.config.get(
# 'dataformat_ohlcv', 'json'),
# candle_type=self.config.get(
# 'trading_mode', 'spot'))
# return corr_dataframes, base_dataframes
def use_strategy_to_populate_indicators(self, strategy: IStrategy,
corr_dataframes: dict,
base_dataframes: dict,
metadata: dict) -> DataFrame:
"""
Use the user defined strategy for populating indicators during
retrain
:params:
strategy: IStrategy = user defined strategy object
corr_dataframes: dict = dict containing the informative pair dataframes
(for user defined timeframes)
base_dataframes: dict = dict containing the current pair dataframes
(for user defined timeframes)
metadata: dict = strategy furnished pair metadata
:returns:
dataframe: DataFrame = dataframe containing populated indicators
"""
dataframe = base_dataframes[self.config['timeframe']].copy()
pairs = self.freqai_config.get("corr_pairlist", [])
@@ -847,6 +996,9 @@ class FreqaiDataKitchen:
return dataframe
def fit_labels(self) -> None:
"""
Fit the labels with a gaussian distribution
"""
import scipy as spy
f = spy.stats.norm.fit(self.data_dictionary["train_labels"])