Merge branch 'develop' into backtest_live_models
This commit is contained in:
@@ -1,14 +1,15 @@
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
import threading
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import psutil
|
||||
import rapidjson
|
||||
from joblib import dump, load
|
||||
from joblib.externals import cloudpickle
|
||||
@@ -65,6 +66,8 @@ class FreqaiDataDrawer:
|
||||
self.pair_dict: Dict[str, pair_info] = {}
|
||||
# dictionary holding all actively inferenced models in memory given a model filename
|
||||
self.model_dictionary: Dict[str, Any] = {}
|
||||
# all additional metadata that we want to keep in ram
|
||||
self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {}
|
||||
self.model_return_values: Dict[str, DataFrame] = {}
|
||||
self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
|
||||
self.historic_predictions: Dict[str, DataFrame] = {}
|
||||
@@ -78,30 +81,60 @@ class FreqaiDataDrawer:
|
||||
self.historic_predictions_bkp_path = Path(
|
||||
self.full_path / "historic_predictions.backup.pkl")
|
||||
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
|
||||
self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")
|
||||
self.follow_mode = follow_mode
|
||||
if follow_mode:
|
||||
self.create_follower_dict()
|
||||
self.load_drawer_from_disk()
|
||||
self.load_historic_predictions_from_disk()
|
||||
self.load_metric_tracker_from_disk()
|
||||
self.training_queue: Dict[str, int] = {}
|
||||
self.history_lock = threading.Lock()
|
||||
self.save_lock = threading.Lock()
|
||||
self.pair_dict_lock = threading.Lock()
|
||||
self.metric_tracker_lock = threading.Lock()
|
||||
self.old_DBSCAN_eps: Dict[str, float] = {}
|
||||
self.empty_pair_dict: pair_info = {
|
||||
"model_filename": "", "trained_timestamp": 0,
|
||||
"data_path": "", "extras": {}}
|
||||
self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
|
||||
|
||||
def update_metric_tracker(self, metric: str, value: float, pair: str) -> None:
|
||||
"""
|
||||
General utility for adding and updating custom metrics. Typically used
|
||||
for adding training performance, train timings, inferenc timings, cpu loads etc.
|
||||
"""
|
||||
with self.metric_tracker_lock:
|
||||
if pair not in self.metric_tracker:
|
||||
self.metric_tracker[pair] = {}
|
||||
if metric not in self.metric_tracker[pair]:
|
||||
self.metric_tracker[pair][metric] = {'timestamp': [], 'value': []}
|
||||
|
||||
timestamp = int(datetime.now(timezone.utc).timestamp())
|
||||
self.metric_tracker[pair][metric]['value'].append(value)
|
||||
self.metric_tracker[pair][metric]['timestamp'].append(timestamp)
|
||||
|
||||
def collect_metrics(self, time_spent: float, pair: str):
|
||||
"""
|
||||
Add metrics to the metric tracker dictionary
|
||||
"""
|
||||
load1, load5, load15 = psutil.getloadavg()
|
||||
cpus = psutil.cpu_count()
|
||||
self.update_metric_tracker('train_time', time_spent, pair)
|
||||
self.update_metric_tracker('cpu_load1min', load1 / cpus, pair)
|
||||
self.update_metric_tracker('cpu_load5min', load5 / cpus, pair)
|
||||
self.update_metric_tracker('cpu_load15min', load15 / cpus, pair)
|
||||
|
||||
def load_drawer_from_disk(self):
|
||||
"""
|
||||
Locate and load a previously saved data drawer full of all pair model metadata in
|
||||
present model folder.
|
||||
:return: bool - whether or not the drawer was located
|
||||
Load any existing metric tracker that may be present.
|
||||
"""
|
||||
exists = self.pair_dictionary_path.is_file()
|
||||
if exists:
|
||||
with open(self.pair_dictionary_path, "r") as fp:
|
||||
self.pair_dict = json.load(fp)
|
||||
self.pair_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
elif not self.follow_mode:
|
||||
logger.info("Could not find existing datadrawer, starting from scratch")
|
||||
else:
|
||||
@@ -110,7 +143,18 @@ class FreqaiDataDrawer:
|
||||
"sending null values back to strategy"
|
||||
)
|
||||
|
||||
return exists
|
||||
def load_metric_tracker_from_disk(self):
|
||||
"""
|
||||
Tries to load an existing metrics dictionary if the user
|
||||
wants to collect metrics.
|
||||
"""
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
exists = self.metric_tracker_path.is_file()
|
||||
if exists:
|
||||
with open(self.metric_tracker_path, "r") as fp:
|
||||
self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
else:
|
||||
logger.info("Could not find existing metric tracker, starting from scratch")
|
||||
|
||||
def load_historic_predictions_from_disk(self):
|
||||
"""
|
||||
@@ -146,7 +190,7 @@ class FreqaiDataDrawer:
|
||||
|
||||
def save_historic_predictions_to_disk(self):
|
||||
"""
|
||||
Save data drawer full of all pair model metadata in present model folder.
|
||||
Save historic predictions pickle to disk
|
||||
"""
|
||||
with open(self.historic_predictions_path, "wb") as fp:
|
||||
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
|
||||
@@ -154,6 +198,15 @@ class FreqaiDataDrawer:
|
||||
# create a backup
|
||||
shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)
|
||||
|
||||
def save_metric_tracker_to_disk(self):
|
||||
"""
|
||||
Save metric tracker of all pair metrics collected.
|
||||
"""
|
||||
with self.save_lock:
|
||||
with open(self.metric_tracker_path, 'w') as fp:
|
||||
rapidjson.dump(self.metric_tracker, fp, default=self.np_encoder,
|
||||
number_mode=rapidjson.NM_NATIVE)
|
||||
|
||||
def save_drawer_to_disk(self):
|
||||
"""
|
||||
Save data drawer full of all pair model metadata in present model folder.
|
||||
@@ -453,9 +506,14 @@ class FreqaiDataDrawer:
|
||||
)
|
||||
|
||||
# if self.live:
|
||||
# store as much in ram as possible to increase performance
|
||||
self.model_dictionary[coin] = model
|
||||
self.pair_dict[coin]["model_filename"] = dk.model_filename
|
||||
self.pair_dict[coin]["data_path"] = str(dk.data_path)
|
||||
if coin not in self.meta_data_dictionary:
|
||||
self.meta_data_dictionary[coin] = {}
|
||||
self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"]
|
||||
self.meta_data_dictionary[coin]["meta_data"] = dk.data
|
||||
self.save_drawer_to_disk()
|
||||
|
||||
return
|
||||
@@ -466,7 +524,7 @@ class FreqaiDataDrawer:
|
||||
presaved backtesting (prediction file loading).
|
||||
"""
|
||||
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
||||
dk.data = json.load(fp)
|
||||
dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
dk.training_features_list = dk.data["training_features_list"]
|
||||
dk.label_list = dk.data["label_list"]
|
||||
|
||||
@@ -492,14 +550,19 @@ class FreqaiDataDrawer:
|
||||
/ dk.data_path.parts[-1]
|
||||
)
|
||||
|
||||
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
||||
dk.data = json.load(fp)
|
||||
dk.training_features_list = dk.data["training_features_list"]
|
||||
dk.label_list = dk.data["label_list"]
|
||||
if coin in self.meta_data_dictionary:
|
||||
dk.data = self.meta_data_dictionary[coin]["meta_data"]
|
||||
dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"]
|
||||
else:
|
||||
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
||||
dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
|
||||
dk.data_dictionary["train_features"] = pd.read_pickle(
|
||||
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
|
||||
)
|
||||
dk.data_dictionary["train_features"] = pd.read_pickle(
|
||||
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
|
||||
)
|
||||
|
||||
dk.training_features_list = dk.data["training_features_list"]
|
||||
dk.label_list = dk.data["label_list"]
|
||||
|
||||
# try to access model in memory instead of loading object from disk to save time
|
||||
if dk.live and coin in self.model_dictionary:
|
||||
@@ -627,22 +690,3 @@ class FreqaiDataDrawer:
|
||||
).reset_index(drop=True)
|
||||
|
||||
return corr_dataframes, base_dataframes
|
||||
|
||||
# to be used if we want to send predictions directly to the follower instead of forcing
|
||||
# follower to load models and inference
|
||||
# def save_model_return_values_to_disk(self) -> None:
|
||||
# with open(self.full_path / str('model_return_values.json'), "w") as fp:
|
||||
# json.dump(self.model_return_values, fp, default=self.np_encoder)
|
||||
|
||||
# def load_model_return_values_from_disk(self, dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
|
||||
# exists = Path(self.full_path / str('model_return_values.json')).resolve().exists()
|
||||
# if exists:
|
||||
# with open(self.full_path / str('model_return_values.json'), "r") as fp:
|
||||
# self.model_return_values = json.load(fp)
|
||||
# elif not self.follow_mode:
|
||||
# logger.info("Could not find existing datadrawer, starting from scratch")
|
||||
# else:
|
||||
# logger.warning(f'Follower could not find pair_dictionary at {self.full_path} '
|
||||
# 'sending null values back to strategy')
|
||||
|
||||
# return exists, dk
|
||||
|
@@ -7,7 +7,7 @@ from collections import deque
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from typing import Any, Dict, List, Literal, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@@ -148,7 +148,7 @@ class IFreqaiModel(ABC):
|
||||
dataframe = dk.remove_features_from_df(dk.return_dataframe)
|
||||
self.clean_up()
|
||||
if self.live:
|
||||
self.inference_timer('stop')
|
||||
self.inference_timer('stop', metadata["pair"])
|
||||
return dataframe
|
||||
|
||||
def clean_up(self):
|
||||
@@ -217,12 +217,14 @@ class IFreqaiModel(ABC):
|
||||
logger.warning(f"Training {pair} raised exception {msg.__class__.__name__}. "
|
||||
f"Message: {msg}, skipping.")
|
||||
|
||||
self.train_timer('stop')
|
||||
self.train_timer('stop', pair)
|
||||
|
||||
# only rotate the queue after the first has been trained.
|
||||
self.train_queue.rotate(-1)
|
||||
|
||||
self.dd.save_historic_predictions_to_disk()
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
self.dd.save_metric_tracker_to_disk()
|
||||
|
||||
def start_backtesting(
|
||||
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
|
||||
@@ -677,7 +679,7 @@ class IFreqaiModel(ABC):
|
||||
|
||||
return
|
||||
|
||||
def inference_timer(self, do='start'):
|
||||
def inference_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''):
|
||||
"""
|
||||
Timer designed to track the cumulative time spent in FreqAI for one pass through
|
||||
the whitelist. This will check if the time spent is more than 1/4 the time
|
||||
@@ -688,7 +690,10 @@ class IFreqaiModel(ABC):
|
||||
self.begin_time = time.time()
|
||||
elif do == 'stop':
|
||||
end = time.time()
|
||||
self.inference_time += (end - self.begin_time)
|
||||
time_spent = (end - self.begin_time)
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
self.dd.update_metric_tracker('inference_time', time_spent, pair)
|
||||
self.inference_time += time_spent
|
||||
if self.pair_it == self.total_pairs:
|
||||
logger.info(
|
||||
f'Total time spent inferencing pairlist {self.inference_time:.2f} seconds')
|
||||
@@ -699,7 +704,7 @@ class IFreqaiModel(ABC):
|
||||
self.inference_time = 0
|
||||
return
|
||||
|
||||
def train_timer(self, do='start'):
|
||||
def train_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''):
|
||||
"""
|
||||
Timer designed to track the cumulative time spent training the full pairlist in
|
||||
FreqAI.
|
||||
@@ -709,7 +714,11 @@ class IFreqaiModel(ABC):
|
||||
self.begin_time_train = time.time()
|
||||
elif do == 'stop':
|
||||
end = time.time()
|
||||
self.train_time += (end - self.begin_time_train)
|
||||
time_spent = (end - self.begin_time_train)
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
self.dd.collect_metrics(time_spent, pair)
|
||||
|
||||
self.train_time += time_spent
|
||||
if self.pair_it_train == self.total_pairs:
|
||||
logger.info(
|
||||
f'Total time spent training pairlist {self.train_time:.2f} seconds')
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
@@ -30,6 +31,14 @@ class CatboostClassifier(BaseClassifierModel):
|
||||
label=data_dictionary["train_labels"],
|
||||
weight=data_dictionary["train_weights"],
|
||||
)
|
||||
if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) == 0:
|
||||
test_data = None
|
||||
else:
|
||||
test_data = Pool(
|
||||
data=data_dictionary["test_features"],
|
||||
label=data_dictionary["test_labels"],
|
||||
weight=data_dictionary["test_weights"],
|
||||
)
|
||||
|
||||
cbr = CatBoostClassifier(
|
||||
allow_writing_files=True,
|
||||
@@ -40,6 +49,7 @@ class CatboostClassifier(BaseClassifierModel):
|
||||
|
||||
init_model = self.get_init_model(dk.pair)
|
||||
|
||||
cbr.fit(train_data, init_model=init_model)
|
||||
cbr.fit(X=train_data, eval_set=test_data, init_model=init_model,
|
||||
log_cout=sys.stdout, log_cerr=sys.stderr)
|
||||
|
||||
return cbr
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
@@ -47,6 +48,7 @@ class CatboostRegressor(BaseRegressionModel):
|
||||
**self.model_training_parameters,
|
||||
)
|
||||
|
||||
model.fit(X=train_data, eval_set=test_data, init_model=init_model)
|
||||
model.fit(X=train_data, eval_set=test_data, init_model=init_model,
|
||||
log_cout=sys.stdout, log_cerr=sys.stderr)
|
||||
|
||||
return model
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
@@ -58,8 +59,10 @@ class CatboostRegressorMultiTarget(BaseRegressionModel):
|
||||
|
||||
fit_params = []
|
||||
for i in range(len(eval_sets)):
|
||||
fit_params.append(
|
||||
{'eval_set': eval_sets[i], 'init_model': init_models[i]})
|
||||
fit_params.append({
|
||||
'eval_set': eval_sets[i], 'init_model': init_models[i],
|
||||
'log_cout': sys.stdout, 'log_cerr': sys.stderr,
|
||||
})
|
||||
|
||||
model = FreqaiMultiOutputRegressor(estimator=cbr)
|
||||
thread_training = self.freqai_info.get('multitarget_parallel_training', False)
|
||||
|
85
freqtrade/freqai/prediction_models/XGBoostRFClassifier.py
Normal file
85
freqtrade/freqai/prediction_models/XGBoostRFClassifier.py
Normal file
@@ -0,0 +1,85 @@
|
||||
import logging
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
import numpy as np
|
||||
import numpy.typing as npt
|
||||
import pandas as pd
|
||||
from pandas import DataFrame
|
||||
from pandas.api.types import is_integer_dtype
|
||||
from sklearn.preprocessing import LabelEncoder
|
||||
from xgboost import XGBRFClassifier
|
||||
|
||||
from freqtrade.freqai.base_models.BaseClassifierModel import BaseClassifierModel
|
||||
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class XGBoostRFClassifier(BaseClassifierModel):
|
||||
"""
|
||||
User created prediction model. The class needs to override three necessary
|
||||
functions, predict(), train(), fit(). The class inherits ModelHandler which
|
||||
has its own DataHandler where data is held, saved, loaded, and managed.
|
||||
"""
|
||||
|
||||
def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
|
||||
"""
|
||||
User sets up the training and test data to fit their desired model here
|
||||
:params:
|
||||
:data_dictionary: the dictionary constructed by DataHandler to hold
|
||||
all the training and test data/labels.
|
||||
"""
|
||||
|
||||
X = data_dictionary["train_features"].to_numpy()
|
||||
y = data_dictionary["train_labels"].to_numpy()[:, 0]
|
||||
|
||||
le = LabelEncoder()
|
||||
if not is_integer_dtype(y):
|
||||
y = pd.Series(le.fit_transform(y), dtype="int64")
|
||||
|
||||
if self.freqai_info.get('data_split_parameters', {}).get('test_size', 0.1) == 0:
|
||||
eval_set = None
|
||||
else:
|
||||
test_features = data_dictionary["test_features"].to_numpy()
|
||||
test_labels = data_dictionary["test_labels"].to_numpy()[:, 0]
|
||||
|
||||
if not is_integer_dtype(test_labels):
|
||||
test_labels = pd.Series(le.transform(test_labels), dtype="int64")
|
||||
|
||||
eval_set = [(test_features, test_labels)]
|
||||
|
||||
train_weights = data_dictionary["train_weights"]
|
||||
|
||||
init_model = self.get_init_model(dk.pair)
|
||||
|
||||
model = XGBRFClassifier(**self.model_training_parameters)
|
||||
|
||||
model.fit(X=X, y=y, eval_set=eval_set, sample_weight=train_weights,
|
||||
xgb_model=init_model)
|
||||
|
||||
return model
|
||||
|
||||
def predict(
|
||||
self, unfiltered_df: DataFrame, dk: FreqaiDataKitchen, **kwargs
|
||||
) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
|
||||
"""
|
||||
Filter the prediction features data and predict with it.
|
||||
:param: unfiltered_df: Full dataframe for the current backtest period.
|
||||
:return:
|
||||
:pred_df: dataframe containing the predictions
|
||||
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
|
||||
data (NaNs) or felt uncertain about data (PCA and DI index)
|
||||
"""
|
||||
|
||||
(pred_df, dk.do_predict) = super().predict(unfiltered_df, dk, **kwargs)
|
||||
|
||||
le = LabelEncoder()
|
||||
label = dk.label_list[0]
|
||||
labels_before = list(dk.data['labels_std'].keys())
|
||||
labels_after = le.fit_transform(labels_before).tolist()
|
||||
pred_df[label] = le.inverse_transform(pred_df[label])
|
||||
pred_df = pred_df.rename(
|
||||
columns={labels_after[i]: labels_before[i] for i in range(len(labels_before))})
|
||||
|
||||
return (pred_df, dk.do_predict)
|
45
freqtrade/freqai/prediction_models/XGBoostRFRegressor.py
Normal file
45
freqtrade/freqai/prediction_models/XGBoostRFRegressor.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
|
||||
from xgboost import XGBRFRegressor
|
||||
|
||||
from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
|
||||
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class XGBoostRFRegressor(BaseRegressionModel):
|
||||
"""
|
||||
User created prediction model. The class needs to override three necessary
|
||||
functions, predict(), train(), fit(). The class inherits ModelHandler which
|
||||
has its own DataHandler where data is held, saved, loaded, and managed.
|
||||
"""
|
||||
|
||||
def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
|
||||
"""
|
||||
User sets up the training and test data to fit their desired model here
|
||||
:param data_dictionary: the dictionary constructed by DataHandler to hold
|
||||
all the training and test data/labels.
|
||||
"""
|
||||
|
||||
X = data_dictionary["train_features"]
|
||||
y = data_dictionary["train_labels"]
|
||||
|
||||
if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) == 0:
|
||||
eval_set = None
|
||||
else:
|
||||
eval_set = [(data_dictionary["test_features"], data_dictionary["test_labels"])]
|
||||
eval_weights = [data_dictionary['test_weights']]
|
||||
|
||||
sample_weight = data_dictionary["train_weights"]
|
||||
|
||||
xgb_model = self.get_init_model(dk.pair)
|
||||
|
||||
model = XGBRFRegressor(**self.model_training_parameters)
|
||||
|
||||
model.fit(X=X, y=y, sample_weight=sample_weight, eval_set=eval_set,
|
||||
sample_weight_eval_set=eval_weights, xgb_model=xgb_model)
|
||||
|
||||
return model
|
Reference in New Issue
Block a user