642 lines
26 KiB
Python
642 lines
26 KiB
Python
import collections
|
|
import json
|
|
import logging
|
|
import re
|
|
import shutil
|
|
import threading
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Tuple
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from joblib import dump, load
|
|
from joblib.externals import cloudpickle
|
|
from numpy.typing import ArrayLike, NDArray
|
|
from pandas import DataFrame
|
|
|
|
from freqtrade.configuration import TimeRange
|
|
from freqtrade.data.history import load_pair_history
|
|
from freqtrade.exceptions import OperationalException
|
|
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
|
|
from freqtrade.strategy.interface import IStrategy
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FreqaiDataDrawer:
|
|
"""
|
|
Class aimed at holding all pair models/info in memory for better inferencing/retrainig/saving
|
|
/loading to/from disk.
|
|
This object remains persistent throughout live/dry, unlike FreqaiDataKitchen, which is
|
|
reinstantiated for each coin.
|
|
|
|
Record of contribution:
|
|
FreqAI was developed by a group of individuals who all contributed specific skillsets to the
|
|
project.
|
|
|
|
Conception and software development:
|
|
Robert Caulk @robcaulk
|
|
|
|
Theoretical brainstorming:
|
|
Elin Törnquist @th0rntwig
|
|
|
|
Code review, software architecture brainstorming:
|
|
@xmatthias
|
|
|
|
Beta testing and bug reporting:
|
|
@bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
|
|
Juha Nykänen @suikula, Wagner Costa @wagnercosta
|
|
"""
|
|
|
|
def __init__(self, full_path: Path, config: dict, follow_mode: bool = False):
|
|
|
|
self.config = config
|
|
self.freqai_info = config.get("freqai", {})
|
|
# dictionary holding all pair metadata necessary to load in from disk
|
|
self.pair_dict: Dict[str, Any] = {}
|
|
# dictionary holding all actively inferenced models in memory given a model filename
|
|
self.model_dictionary: Dict[str, Any] = {}
|
|
self.model_return_values: Dict[str, Any] = {}
|
|
self.pair_data_dict: Dict[str, Any] = {}
|
|
self.historic_data: Dict[str, Any] = {}
|
|
self.historic_predictions: Dict[str, Any] = {}
|
|
self.follower_dict: Dict[str, Any] = {}
|
|
self.full_path = full_path
|
|
self.follower_name: str = self.config.get("bot_name", "follower1")
|
|
self.follower_dict_path = Path(
|
|
self.full_path / f"follower_dictionary-{self.follower_name}.json"
|
|
)
|
|
self.historic_predictions_path = Path(self.full_path / "historic_predictions.pkl")
|
|
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
|
|
self.follow_mode = follow_mode
|
|
if follow_mode:
|
|
self.create_follower_dict()
|
|
self.load_drawer_from_disk()
|
|
self.load_historic_predictions_from_disk()
|
|
self.training_queue: Dict[str, int] = {}
|
|
self.history_lock = threading.Lock()
|
|
self.old_DBSCAN_eps: Dict[str, float] = {}
|
|
|
|
def load_drawer_from_disk(self):
|
|
"""
|
|
Locate and load a previously saved data drawer full of all pair model metadata in
|
|
present model folder.
|
|
:return: bool - whether or not the drawer was located
|
|
"""
|
|
exists = self.pair_dictionary_path.is_file()
|
|
if exists:
|
|
with open(self.pair_dictionary_path, "r") as fp:
|
|
self.pair_dict = json.load(fp)
|
|
elif not self.follow_mode:
|
|
logger.info("Could not find existing datadrawer, starting from scratch")
|
|
else:
|
|
logger.warning(
|
|
f"Follower could not find pair_dictionary at {self.full_path} "
|
|
"sending null values back to strategy"
|
|
)
|
|
|
|
return exists
|
|
|
|
def load_historic_predictions_from_disk(self):
|
|
"""
|
|
Locate and load a previously saved historic predictions.
|
|
:return: bool - whether or not the drawer was located
|
|
"""
|
|
exists = self.historic_predictions_path.is_file()
|
|
if exists:
|
|
with open(self.historic_predictions_path, "rb") as fp:
|
|
self.historic_predictions = cloudpickle.load(fp)
|
|
logger.info(
|
|
f"Found existing historic predictions at {self.full_path}, but beware "
|
|
"that statistics may be inaccurate if the bot has been offline for "
|
|
"an extended period of time."
|
|
)
|
|
elif not self.follow_mode:
|
|
logger.info("Could not find existing historic_predictions, starting from scratch")
|
|
else:
|
|
logger.warning(
|
|
f"Follower could not find historic predictions at {self.full_path} "
|
|
"sending null values back to strategy"
|
|
)
|
|
|
|
return exists
|
|
|
|
def save_historic_predictions_to_disk(self):
|
|
"""
|
|
Save data drawer full of all pair model metadata in present model folder.
|
|
"""
|
|
with open(self.historic_predictions_path, "wb") as fp:
|
|
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
|
|
|
|
def save_drawer_to_disk(self):
|
|
"""
|
|
Save data drawer full of all pair model metadata in present model folder.
|
|
"""
|
|
with open(self.pair_dictionary_path, "w") as fp:
|
|
json.dump(self.pair_dict, fp, default=self.np_encoder)
|
|
|
|
def save_follower_dict_to_disk(self):
|
|
"""
|
|
Save follower dictionary to disk (used by strategy for persistent prediction targets)
|
|
"""
|
|
with open(self.follower_dict_path, "w") as fp:
|
|
json.dump(self.follower_dict, fp, default=self.np_encoder)
|
|
|
|
def create_follower_dict(self):
|
|
"""
|
|
Create or dictionary for each follower to maintain unique persistent prediction targets
|
|
"""
|
|
|
|
whitelist_pairs = self.config.get("exchange", {}).get("pair_whitelist")
|
|
|
|
exists = self.follower_dict_path.is_file()
|
|
|
|
if exists:
|
|
logger.info("Found an existing follower dictionary")
|
|
|
|
for pair in whitelist_pairs:
|
|
self.follower_dict[pair] = {}
|
|
|
|
self.save_follower_dict_to_disk()
|
|
|
|
def np_encoder(self, object):
|
|
if isinstance(object, np.generic):
|
|
return object.item()
|
|
|
|
def get_pair_dict_info(self, pair: str) -> Tuple[str, int, bool]:
|
|
"""
|
|
Locate and load existing model metadata from persistent storage. If not located,
|
|
create a new one and append the current pair to it and prepare it for its first
|
|
training
|
|
:param pair: str: pair to lookup
|
|
:return:
|
|
model_filename: str = unique filename used for loading persistent objects from disk
|
|
trained_timestamp: int = the last time the coin was trained
|
|
return_null_array: bool = Follower could not find pair metadata
|
|
"""
|
|
pair_dict = self.pair_dict.get(pair)
|
|
data_path_set = self.pair_dict.get(pair, {}).get("data_path", None)
|
|
return_null_array = False
|
|
|
|
if pair_dict:
|
|
model_filename = pair_dict["model_filename"]
|
|
trained_timestamp = pair_dict["trained_timestamp"]
|
|
elif not self.follow_mode:
|
|
pair_dict = self.pair_dict[pair] = {}
|
|
model_filename = pair_dict["model_filename"] = ""
|
|
trained_timestamp = pair_dict["trained_timestamp"] = 0
|
|
pair_dict["priority"] = len(self.pair_dict)
|
|
|
|
if not data_path_set and self.follow_mode:
|
|
logger.warning(
|
|
f"Follower could not find current pair {pair} in "
|
|
f"pair_dictionary at path {self.full_path}, sending null values "
|
|
"back to strategy."
|
|
)
|
|
trained_timestamp = 0
|
|
model_filename = ''
|
|
return_null_array = True
|
|
|
|
return model_filename, trained_timestamp, return_null_array
|
|
|
|
def set_pair_dict_info(self, metadata: dict) -> None:
|
|
pair_in_dict = self.pair_dict.get(metadata["pair"])
|
|
if pair_in_dict:
|
|
return
|
|
else:
|
|
self.pair_dict[metadata["pair"]] = {}
|
|
self.pair_dict[metadata["pair"]]["model_filename"] = ""
|
|
self.pair_dict[metadata["pair"]]["first"] = True
|
|
self.pair_dict[metadata["pair"]]["trained_timestamp"] = 0
|
|
self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict)
|
|
return
|
|
|
|
def pair_to_end_of_training_queue(self, pair: str) -> None:
|
|
# march all pairs up in the queue
|
|
for p in self.pair_dict:
|
|
self.pair_dict[p]["priority"] -= 1
|
|
# send pair to end of queue
|
|
self.pair_dict[pair]["priority"] = len(self.pair_dict)
|
|
|
|
def set_initial_return_values(self, pair: str, dk: FreqaiDataKitchen,
|
|
pred_df: DataFrame, do_preds: ArrayLike) -> None:
|
|
"""
|
|
Set the initial return values to a persistent dataframe. This avoids needing to repredict on
|
|
historical candles, and also stores historical predictions despite retrainings (so stored
|
|
predictions are true predictions, not just inferencing on trained data)
|
|
"""
|
|
|
|
# dynamic df returned to strategy and plotted in frequi
|
|
mrv_df = self.model_return_values[pair] = pd.DataFrame()
|
|
|
|
# if user reused `identifier` in config and has historical predictions collected, load them
|
|
# so that frequi remains uninterrupted after a crash
|
|
hist_df = self.historic_predictions
|
|
if pair in hist_df:
|
|
len_diff = len(hist_df[pair].index) - len(pred_df.index)
|
|
if len_diff < 0:
|
|
df_concat = pd.concat([pred_df.iloc[:abs(len_diff)], hist_df[pair]],
|
|
ignore_index=True, keys=hist_df[pair].keys())
|
|
else:
|
|
df_concat = hist_df[pair].tail(len(pred_df.index)).reset_index(drop=True)
|
|
df_concat = df_concat.fillna(0)
|
|
self.model_return_values[pair] = df_concat
|
|
logger.info(f'Setting initial FreqUI plots from historical data for {pair}.')
|
|
|
|
else:
|
|
for label in pred_df.columns:
|
|
mrv_df[label] = pred_df[label]
|
|
if mrv_df[label].dtype == object:
|
|
continue
|
|
mrv_df[f"{label}_mean"] = dk.data["labels_mean"][label]
|
|
mrv_df[f"{label}_std"] = dk.data["labels_std"][label]
|
|
|
|
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
|
|
mrv_df["DI_values"] = dk.DI_values
|
|
|
|
mrv_df["do_predict"] = do_preds
|
|
|
|
if dk.data['extra_returns_per_train']:
|
|
rets = dk.data['extra_returns_per_train']
|
|
for return_str in rets:
|
|
mrv_df[return_str] = rets[return_str]
|
|
|
|
# for keras type models, the conv_window needs to be prepended so
|
|
# viewing is correct in frequi
|
|
if self.freqai_info.get('keras', False):
|
|
n_lost_points = self.freqai_info.get('conv_width', 2)
|
|
zeros_df = DataFrame(np.zeros((n_lost_points, len(mrv_df.columns))),
|
|
columns=mrv_df.columns)
|
|
self.model_return_values[pair] = pd.concat(
|
|
[zeros_df, mrv_df], axis=0, ignore_index=True)
|
|
|
|
def append_model_predictions(self, pair: str, predictions: DataFrame,
|
|
do_preds: NDArray[np.int_],
|
|
dk: FreqaiDataKitchen, len_df: int) -> None:
|
|
|
|
# strat seems to feed us variable sized dataframes - and since we are trying to build our
|
|
# own return array in the same shape, we need to figure out how the size has changed
|
|
# and adapt our stored/returned info accordingly.
|
|
|
|
length_difference = len(self.model_return_values[pair]) - len_df
|
|
i = 0
|
|
|
|
if length_difference == 0:
|
|
i = 1
|
|
elif length_difference > 0:
|
|
i = length_difference + 1
|
|
|
|
df = self.model_return_values[pair] = self.model_return_values[pair].shift(-i)
|
|
|
|
if pair in self.historic_predictions:
|
|
hp_df = self.historic_predictions[pair]
|
|
# here are some pandas hula hoops to accommodate the possibility of a series
|
|
# or dataframe depending number of labels requested by user
|
|
nan_df = pd.DataFrame(np.nan, index=hp_df.index[-2:] + 2, columns=hp_df.columns)
|
|
hp_df = pd.concat([hp_df, nan_df], ignore_index=True, axis=0)
|
|
self.historic_predictions[pair] = hp_df[:-1]
|
|
|
|
# incase user adds additional "predictions" e.g. predict_proba output:
|
|
for label in predictions.columns:
|
|
df[label].iloc[-1] = predictions[label].iloc[-1]
|
|
if df[label].dtype == object:
|
|
continue
|
|
df[f"{label}_mean"].iloc[-1] = dk.data["labels_mean"][label]
|
|
df[f"{label}_std"].iloc[-1] = dk.data["labels_std"][label]
|
|
|
|
df["do_predict"].iloc[-1] = do_preds[-1]
|
|
|
|
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
|
|
df["DI_values"].iloc[-1] = dk.DI_values[-1]
|
|
|
|
if dk.data['extra_returns_per_train']:
|
|
rets = dk.data['extra_returns_per_train']
|
|
for return_str in rets:
|
|
df[return_str].iloc[-1] = rets[return_str]
|
|
|
|
# append the new predictions to persistent storage
|
|
if pair in self.historic_predictions:
|
|
for key in df.keys():
|
|
self.historic_predictions[pair][key].iloc[-1] = df[key].iloc[-1]
|
|
|
|
if length_difference < 0:
|
|
prepend_df = pd.DataFrame(
|
|
np.zeros((abs(length_difference) - 1, len(df.columns))), columns=df.columns
|
|
)
|
|
df = pd.concat([prepend_df, df], axis=0)
|
|
|
|
def attach_return_values_to_return_dataframe(
|
|
self, pair: str, dataframe: DataFrame) -> DataFrame:
|
|
"""
|
|
Attach the return values to the strat dataframe
|
|
:param dataframe: DataFrame = strategy dataframe
|
|
:return: DataFrame = strat dataframe with return values attached
|
|
"""
|
|
df = self.model_return_values[pair]
|
|
to_keep = [col for col in dataframe.columns if not col.startswith("&")]
|
|
dataframe = pd.concat([dataframe[to_keep], df], axis=1)
|
|
return dataframe
|
|
|
|
def return_null_values_to_strategy(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> None:
|
|
"""
|
|
Build 0 filled dataframe to return to strategy
|
|
"""
|
|
|
|
dk.find_features(dataframe)
|
|
|
|
if self.freqai_info.get('predict_proba', []):
|
|
full_labels = dk.label_list + self.freqai_info['predict_proba']
|
|
else:
|
|
full_labels = dk.label_list
|
|
|
|
for label in full_labels:
|
|
dataframe[label] = 0
|
|
dataframe[f"{label}_mean"] = 0
|
|
dataframe[f"{label}_std"] = 0
|
|
|
|
dataframe["do_predict"] = 0
|
|
|
|
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
|
|
dataframe["DI_values"] = 0
|
|
|
|
if dk.data['extra_returns_per_train']:
|
|
rets = dk.data['extra_returns_per_train']
|
|
for return_str in rets:
|
|
dataframe[return_str] = 0
|
|
|
|
dk.return_dataframe = dataframe
|
|
|
|
def purge_old_models(self) -> None:
|
|
|
|
model_folders = [x for x in self.full_path.iterdir() if x.is_dir()]
|
|
|
|
pattern = re.compile(r"sub-train-(\w+)_(\d{10})")
|
|
|
|
delete_dict: Dict[str, Any] = {}
|
|
|
|
for dir in model_folders:
|
|
result = pattern.match(str(dir.name))
|
|
if result is None:
|
|
break
|
|
coin = result.group(1)
|
|
timestamp = result.group(2)
|
|
|
|
if coin not in delete_dict:
|
|
delete_dict[coin] = {}
|
|
delete_dict[coin]["num_folders"] = 1
|
|
delete_dict[coin]["timestamps"] = {int(timestamp): dir}
|
|
else:
|
|
delete_dict[coin]["num_folders"] += 1
|
|
delete_dict[coin]["timestamps"][int(timestamp)] = dir
|
|
|
|
for coin in delete_dict:
|
|
if delete_dict[coin]["num_folders"] > 2:
|
|
sorted_dict = collections.OrderedDict(
|
|
sorted(delete_dict[coin]["timestamps"].items())
|
|
)
|
|
num_delete = len(sorted_dict) - 2
|
|
deleted = 0
|
|
for k, v in sorted_dict.items():
|
|
if deleted >= num_delete:
|
|
break
|
|
logger.info(f"Freqai purging old model file {v}")
|
|
shutil.rmtree(v)
|
|
deleted += 1
|
|
|
|
def update_follower_metadata(self):
|
|
# follower needs to load from disk to get any changes made by leader to pair_dict
|
|
self.load_drawer_from_disk()
|
|
if self.config.get("freqai", {}).get("purge_old_models", False):
|
|
self.purge_old_models()
|
|
|
|
# Functions pulled back from FreqaiDataKitchen because they relied on DataDrawer
|
|
|
|
def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
|
|
"""
|
|
Saves all data associated with a model for a single sub-train time range
|
|
:params:
|
|
:model: User trained model which can be reused for inferencing to generate
|
|
predictions
|
|
"""
|
|
|
|
if not dk.data_path.is_dir():
|
|
dk.data_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
save_path = Path(dk.data_path)
|
|
|
|
# Save the trained model
|
|
if not dk.keras:
|
|
dump(model, save_path / f"{dk.model_filename}_model.joblib")
|
|
else:
|
|
model.save(save_path / f"{dk.model_filename}_model.h5")
|
|
|
|
if dk.svm_model is not None:
|
|
dump(dk.svm_model, save_path / f"{dk.model_filename}_svm_model.joblib")
|
|
|
|
dk.data["data_path"] = str(dk.data_path)
|
|
dk.data["model_filename"] = str(dk.model_filename)
|
|
dk.data["training_features_list"] = list(dk.data_dictionary["train_features"].columns)
|
|
dk.data["label_list"] = dk.label_list
|
|
# store the metadata
|
|
with open(save_path / f"{dk.model_filename}_metadata.json", "w") as fp:
|
|
json.dump(dk.data, fp, default=dk.np_encoder)
|
|
|
|
# save the train data to file so we can check preds for area of applicability later
|
|
dk.data_dictionary["train_features"].to_pickle(
|
|
save_path / f"{dk.model_filename}_trained_df.pkl"
|
|
)
|
|
|
|
if self.freqai_info["feature_parameters"].get("principal_component_analysis"):
|
|
cloudpickle.dump(
|
|
dk.pca, open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "wb")
|
|
)
|
|
|
|
# if self.live:
|
|
self.model_dictionary[dk.model_filename] = model
|
|
self.pair_dict[coin]["model_filename"] = dk.model_filename
|
|
self.pair_dict[coin]["data_path"] = str(dk.data_path)
|
|
self.save_drawer_to_disk()
|
|
|
|
return
|
|
|
|
def load_data(self, coin: str, dk: FreqaiDataKitchen) -> Any:
|
|
"""
|
|
loads all data required to make a prediction on a sub-train time range
|
|
:returns:
|
|
:model: User trained model which can be inferenced for new predictions
|
|
"""
|
|
|
|
if not self.pair_dict[coin]["model_filename"]:
|
|
return None
|
|
|
|
if dk.live:
|
|
dk.model_filename = self.pair_dict[coin]["model_filename"]
|
|
dk.data_path = Path(self.pair_dict[coin]["data_path"])
|
|
if self.freqai_info.get("follow_mode", False):
|
|
# follower can be on a different system which is rsynced from the leader:
|
|
dk.data_path = Path(
|
|
self.config["user_data_dir"]
|
|
/ "models"
|
|
/ dk.data_path.parts[-2]
|
|
/ dk.data_path.parts[-1]
|
|
)
|
|
|
|
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
|
dk.data = json.load(fp)
|
|
dk.training_features_list = dk.data["training_features_list"]
|
|
dk.label_list = dk.data["label_list"]
|
|
|
|
dk.data_dictionary["train_features"] = pd.read_pickle(
|
|
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
|
|
)
|
|
|
|
# try to access model in memory instead of loading object from disk to save time
|
|
if dk.live and dk.model_filename in self.model_dictionary:
|
|
model = self.model_dictionary[dk.model_filename]
|
|
elif not dk.keras:
|
|
model = load(dk.data_path / f"{dk.model_filename}_model.joblib")
|
|
else:
|
|
from tensorflow import keras
|
|
|
|
model = keras.models.load_model(dk.data_path / f"{dk.model_filename}_model.h5")
|
|
|
|
if Path(dk.data_path / f"{dk.model_filename}_svm_model.joblib").is_file():
|
|
dk.svm_model = load(dk.data_path / f"{dk.model_filename}_svm_model.joblib")
|
|
|
|
if not model:
|
|
raise OperationalException(
|
|
f"Unable to load model, ensure model exists at " f"{dk.data_path} "
|
|
)
|
|
|
|
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
|
|
dk.pca = cloudpickle.load(
|
|
open(dk.data_path / f"{dk.model_filename}_pca_object.pkl", "rb")
|
|
)
|
|
|
|
return model
|
|
|
|
def update_historic_data(self, strategy: IStrategy, dk: FreqaiDataKitchen) -> None:
|
|
"""
|
|
Append new candles to our stores historic data (in memory) so that
|
|
we do not need to load candle history from disk and we dont need to
|
|
pinging exchange multiple times for the same candle.
|
|
:params:
|
|
dataframe: DataFrame = strategy provided dataframe
|
|
"""
|
|
feat_params = self.freqai_info["feature_parameters"]
|
|
with self.history_lock:
|
|
history_data = self.historic_data
|
|
|
|
for pair in dk.all_pairs:
|
|
for tf in feat_params.get("include_timeframes"):
|
|
|
|
# check if newest candle is already appended
|
|
df_dp = strategy.dp.get_pair_dataframe(pair, tf)
|
|
if len(df_dp.index) == 0:
|
|
continue
|
|
if str(history_data[pair][tf].iloc[-1]["date"]) == str(
|
|
df_dp.iloc[-1:]["date"].iloc[-1]
|
|
):
|
|
continue
|
|
|
|
try:
|
|
index = (
|
|
df_dp.loc[
|
|
df_dp["date"] == history_data[pair][tf].iloc[-1]["date"]
|
|
].index[0]
|
|
+ 1
|
|
)
|
|
except IndexError:
|
|
logger.warning(
|
|
f"Unable to update pair history for {pair}. "
|
|
"If this does not resolve itself after 1 additional candle, "
|
|
"please report the error to #freqai discord channel"
|
|
)
|
|
return
|
|
|
|
history_data[pair][tf] = pd.concat(
|
|
[
|
|
history_data[pair][tf],
|
|
strategy.dp.get_pair_dataframe(pair, tf).iloc[index:],
|
|
],
|
|
ignore_index=True,
|
|
axis=0,
|
|
)
|
|
|
|
def load_all_pair_histories(self, timerange: TimeRange, dk: FreqaiDataKitchen) -> None:
|
|
"""
|
|
Load pair histories for all whitelist and corr_pairlist pairs.
|
|
Only called once upon startup of bot.
|
|
:params:
|
|
timerange: TimeRange = full timerange required to populate all indicators
|
|
for training according to user defined train_period_days
|
|
"""
|
|
history_data = self.historic_data
|
|
|
|
for pair in dk.all_pairs:
|
|
if pair not in history_data:
|
|
history_data[pair] = {}
|
|
for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
|
|
history_data[pair][tf] = load_pair_history(
|
|
datadir=self.config["datadir"],
|
|
timeframe=tf,
|
|
pair=pair,
|
|
timerange=timerange,
|
|
data_format=self.config.get("dataformat_ohlcv", "json"),
|
|
candle_type=self.config.get("trading_mode", "spot"),
|
|
)
|
|
|
|
def get_base_and_corr_dataframes(
|
|
self, timerange: TimeRange, pair: str, dk: FreqaiDataKitchen
|
|
) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
|
|
"""
|
|
Searches through our historic_data in memory and returns the dataframes relevant
|
|
to the present pair.
|
|
:params:
|
|
timerange: TimeRange = full timerange required to populate all indicators
|
|
for training according to user defined train_period_days
|
|
metadata: dict = strategy furnished pair metadata
|
|
"""
|
|
|
|
with self.history_lock:
|
|
corr_dataframes: Dict[Any, Any] = {}
|
|
base_dataframes: Dict[Any, Any] = {}
|
|
historic_data = self.historic_data
|
|
pairs = self.freqai_info["feature_parameters"].get(
|
|
"include_corr_pairlist", []
|
|
)
|
|
|
|
for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
|
|
base_dataframes[tf] = dk.slice_dataframe(timerange, historic_data[pair][tf])
|
|
if pairs:
|
|
for p in pairs:
|
|
if pair in p:
|
|
continue # dont repeat anything from whitelist
|
|
if p not in corr_dataframes:
|
|
corr_dataframes[p] = {}
|
|
corr_dataframes[p][tf] = dk.slice_dataframe(
|
|
timerange, historic_data[p][tf]
|
|
)
|
|
|
|
return corr_dataframes, base_dataframes
|
|
|
|
# to be used if we want to send predictions directly to the follower instead of forcing
|
|
# follower to load models and inference
|
|
# def save_model_return_values_to_disk(self) -> None:
|
|
# with open(self.full_path / str('model_return_values.json'), "w") as fp:
|
|
# json.dump(self.model_return_values, fp, default=self.np_encoder)
|
|
|
|
# def load_model_return_values_from_disk(self, dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
|
|
# exists = Path(self.full_path / str('model_return_values.json')).resolve().exists()
|
|
# if exists:
|
|
# with open(self.full_path / str('model_return_values.json'), "r") as fp:
|
|
# self.model_return_values = json.load(fp)
|
|
# elif not self.follow_mode:
|
|
# logger.info("Could not find existing datadrawer, starting from scratch")
|
|
# else:
|
|
# logger.warning(f'Follower could not find pair_dictionary at {self.full_path} '
|
|
# 'sending null values back to strategy')
|
|
|
|
# return exists, dk
|