Merge pull request #7557 from freqtrade/add-metric-tracker
Add metric tracker to FreqAI
This commit is contained in:
@@ -540,6 +540,8 @@ CONF_SCHEMA = {
|
||||
"properties": {
|
||||
"enabled": {"type": "boolean", "default": False},
|
||||
"keras": {"type": "boolean", "default": False},
|
||||
"write_metrics_to_disk": {"type": "boolean", "default": False},
|
||||
"purge_old_models": {"type": "boolean", "default": True},
|
||||
"conv_width": {"type": "integer", "default": 2},
|
||||
"train_period_days": {"type": "integer", "default": 0},
|
||||
"backtest_period_days": {"type": "number", "default": 7},
|
||||
|
@@ -1,14 +1,15 @@
|
||||
import collections
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
import threading
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple, TypedDict
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import psutil
|
||||
import rapidjson
|
||||
from joblib import dump, load
|
||||
from joblib.externals import cloudpickle
|
||||
@@ -65,6 +66,8 @@ class FreqaiDataDrawer:
|
||||
self.pair_dict: Dict[str, pair_info] = {}
|
||||
# dictionary holding all actively inferenced models in memory given a model filename
|
||||
self.model_dictionary: Dict[str, Any] = {}
|
||||
# all additional metadata that we want to keep in ram
|
||||
self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {}
|
||||
self.model_return_values: Dict[str, DataFrame] = {}
|
||||
self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
|
||||
self.historic_predictions: Dict[str, DataFrame] = {}
|
||||
@@ -78,30 +81,60 @@ class FreqaiDataDrawer:
|
||||
self.historic_predictions_bkp_path = Path(
|
||||
self.full_path / "historic_predictions.backup.pkl")
|
||||
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
|
||||
self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")
|
||||
self.follow_mode = follow_mode
|
||||
if follow_mode:
|
||||
self.create_follower_dict()
|
||||
self.load_drawer_from_disk()
|
||||
self.load_historic_predictions_from_disk()
|
||||
self.load_metric_tracker_from_disk()
|
||||
self.training_queue: Dict[str, int] = {}
|
||||
self.history_lock = threading.Lock()
|
||||
self.save_lock = threading.Lock()
|
||||
self.pair_dict_lock = threading.Lock()
|
||||
self.metric_tracker_lock = threading.Lock()
|
||||
self.old_DBSCAN_eps: Dict[str, float] = {}
|
||||
self.empty_pair_dict: pair_info = {
|
||||
"model_filename": "", "trained_timestamp": 0,
|
||||
"data_path": "", "extras": {}}
|
||||
self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
|
||||
|
||||
def update_metric_tracker(self, metric: str, value: float, pair: str) -> None:
|
||||
"""
|
||||
General utility for adding and updating custom metrics. Typically used
|
||||
for adding training performance, train timings, inferenc timings, cpu loads etc.
|
||||
"""
|
||||
with self.metric_tracker_lock:
|
||||
if pair not in self.metric_tracker:
|
||||
self.metric_tracker[pair] = {}
|
||||
if metric not in self.metric_tracker[pair]:
|
||||
self.metric_tracker[pair][metric] = {'timestamp': [], 'value': []}
|
||||
|
||||
timestamp = int(datetime.now(timezone.utc).timestamp())
|
||||
self.metric_tracker[pair][metric]['value'].append(value)
|
||||
self.metric_tracker[pair][metric]['timestamp'].append(timestamp)
|
||||
|
||||
def collect_metrics(self, time_spent: float, pair: str):
|
||||
"""
|
||||
Add metrics to the metric tracker dictionary
|
||||
"""
|
||||
load1, load5, load15 = psutil.getloadavg()
|
||||
cpus = psutil.cpu_count()
|
||||
self.update_metric_tracker('train_time', time_spent, pair)
|
||||
self.update_metric_tracker('cpu_load1min', load1 / cpus, pair)
|
||||
self.update_metric_tracker('cpu_load5min', load5 / cpus, pair)
|
||||
self.update_metric_tracker('cpu_load15min', load15 / cpus, pair)
|
||||
|
||||
def load_drawer_from_disk(self):
|
||||
"""
|
||||
Locate and load a previously saved data drawer full of all pair model metadata in
|
||||
present model folder.
|
||||
:return: bool - whether or not the drawer was located
|
||||
Load any existing metric tracker that may be present.
|
||||
"""
|
||||
exists = self.pair_dictionary_path.is_file()
|
||||
if exists:
|
||||
with open(self.pair_dictionary_path, "r") as fp:
|
||||
self.pair_dict = json.load(fp)
|
||||
self.pair_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
elif not self.follow_mode:
|
||||
logger.info("Could not find existing datadrawer, starting from scratch")
|
||||
else:
|
||||
@@ -110,7 +143,18 @@ class FreqaiDataDrawer:
|
||||
"sending null values back to strategy"
|
||||
)
|
||||
|
||||
return exists
|
||||
def load_metric_tracker_from_disk(self):
|
||||
"""
|
||||
Tries to load an existing metrics dictionary if the user
|
||||
wants to collect metrics.
|
||||
"""
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
exists = self.metric_tracker_path.is_file()
|
||||
if exists:
|
||||
with open(self.metric_tracker_path, "r") as fp:
|
||||
self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
else:
|
||||
logger.info("Could not find existing metric tracker, starting from scratch")
|
||||
|
||||
def load_historic_predictions_from_disk(self):
|
||||
"""
|
||||
@@ -146,7 +190,7 @@ class FreqaiDataDrawer:
|
||||
|
||||
def save_historic_predictions_to_disk(self):
|
||||
"""
|
||||
Save data drawer full of all pair model metadata in present model folder.
|
||||
Save historic predictions pickle to disk
|
||||
"""
|
||||
with open(self.historic_predictions_path, "wb") as fp:
|
||||
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
|
||||
@@ -154,6 +198,15 @@ class FreqaiDataDrawer:
|
||||
# create a backup
|
||||
shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)
|
||||
|
||||
def save_metric_tracker_to_disk(self):
|
||||
"""
|
||||
Save metric tracker of all pair metrics collected.
|
||||
"""
|
||||
with self.save_lock:
|
||||
with open(self.metric_tracker_path, 'w') as fp:
|
||||
rapidjson.dump(self.metric_tracker, fp, default=self.np_encoder,
|
||||
number_mode=rapidjson.NM_NATIVE)
|
||||
|
||||
def save_drawer_to_disk(self):
|
||||
"""
|
||||
Save data drawer full of all pair model metadata in present model folder.
|
||||
@@ -453,9 +506,14 @@ class FreqaiDataDrawer:
|
||||
)
|
||||
|
||||
# if self.live:
|
||||
# store as much in ram as possible to increase performance
|
||||
self.model_dictionary[coin] = model
|
||||
self.pair_dict[coin]["model_filename"] = dk.model_filename
|
||||
self.pair_dict[coin]["data_path"] = str(dk.data_path)
|
||||
if coin not in self.meta_data_dictionary:
|
||||
self.meta_data_dictionary[coin] = {}
|
||||
self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"]
|
||||
self.meta_data_dictionary[coin]["meta_data"] = dk.data
|
||||
self.save_drawer_to_disk()
|
||||
|
||||
return
|
||||
@@ -466,7 +524,7 @@ class FreqaiDataDrawer:
|
||||
presaved backtesting (prediction file loading).
|
||||
"""
|
||||
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
||||
dk.data = json.load(fp)
|
||||
dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
dk.training_features_list = dk.data["training_features_list"]
|
||||
dk.label_list = dk.data["label_list"]
|
||||
|
||||
@@ -492,14 +550,19 @@ class FreqaiDataDrawer:
|
||||
/ dk.data_path.parts[-1]
|
||||
)
|
||||
|
||||
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
||||
dk.data = json.load(fp)
|
||||
dk.training_features_list = dk.data["training_features_list"]
|
||||
dk.label_list = dk.data["label_list"]
|
||||
if coin in self.meta_data_dictionary:
|
||||
dk.data = self.meta_data_dictionary[coin]["meta_data"]
|
||||
dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"]
|
||||
else:
|
||||
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
|
||||
dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
|
||||
|
||||
dk.data_dictionary["train_features"] = pd.read_pickle(
|
||||
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
|
||||
)
|
||||
dk.data_dictionary["train_features"] = pd.read_pickle(
|
||||
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
|
||||
)
|
||||
|
||||
dk.training_features_list = dk.data["training_features_list"]
|
||||
dk.label_list = dk.data["label_list"]
|
||||
|
||||
# try to access model in memory instead of loading object from disk to save time
|
||||
if dk.live and coin in self.model_dictionary:
|
||||
@@ -627,22 +690,3 @@ class FreqaiDataDrawer:
|
||||
).reset_index(drop=True)
|
||||
|
||||
return corr_dataframes, base_dataframes
|
||||
|
||||
# to be used if we want to send predictions directly to the follower instead of forcing
|
||||
# follower to load models and inference
|
||||
# def save_model_return_values_to_disk(self) -> None:
|
||||
# with open(self.full_path / str('model_return_values.json'), "w") as fp:
|
||||
# json.dump(self.model_return_values, fp, default=self.np_encoder)
|
||||
|
||||
# def load_model_return_values_from_disk(self, dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
|
||||
# exists = Path(self.full_path / str('model_return_values.json')).resolve().exists()
|
||||
# if exists:
|
||||
# with open(self.full_path / str('model_return_values.json'), "r") as fp:
|
||||
# self.model_return_values = json.load(fp)
|
||||
# elif not self.follow_mode:
|
||||
# logger.info("Could not find existing datadrawer, starting from scratch")
|
||||
# else:
|
||||
# logger.warning(f'Follower could not find pair_dictionary at {self.full_path} '
|
||||
# 'sending null values back to strategy')
|
||||
|
||||
# return exists, dk
|
||||
|
@@ -7,7 +7,7 @@ from collections import deque
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from typing import Any, Dict, List, Literal, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@@ -144,7 +144,7 @@ class IFreqaiModel(ABC):
|
||||
dataframe = dk.remove_features_from_df(dk.return_dataframe)
|
||||
self.clean_up()
|
||||
if self.live:
|
||||
self.inference_timer('stop')
|
||||
self.inference_timer('stop', metadata["pair"])
|
||||
return dataframe
|
||||
|
||||
def clean_up(self):
|
||||
@@ -213,12 +213,14 @@ class IFreqaiModel(ABC):
|
||||
logger.warning(f"Training {pair} raised exception {msg.__class__.__name__}. "
|
||||
f"Message: {msg}, skipping.")
|
||||
|
||||
self.train_timer('stop')
|
||||
self.train_timer('stop', pair)
|
||||
|
||||
# only rotate the queue after the first has been trained.
|
||||
self.train_queue.rotate(-1)
|
||||
|
||||
self.dd.save_historic_predictions_to_disk()
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
self.dd.save_metric_tracker_to_disk()
|
||||
|
||||
def start_backtesting(
|
||||
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
|
||||
@@ -655,7 +657,7 @@ class IFreqaiModel(ABC):
|
||||
|
||||
return
|
||||
|
||||
def inference_timer(self, do='start'):
|
||||
def inference_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''):
|
||||
"""
|
||||
Timer designed to track the cumulative time spent in FreqAI for one pass through
|
||||
the whitelist. This will check if the time spent is more than 1/4 the time
|
||||
@@ -666,7 +668,10 @@ class IFreqaiModel(ABC):
|
||||
self.begin_time = time.time()
|
||||
elif do == 'stop':
|
||||
end = time.time()
|
||||
self.inference_time += (end - self.begin_time)
|
||||
time_spent = (end - self.begin_time)
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
self.dd.update_metric_tracker('inference_time', time_spent, pair)
|
||||
self.inference_time += time_spent
|
||||
if self.pair_it == self.total_pairs:
|
||||
logger.info(
|
||||
f'Total time spent inferencing pairlist {self.inference_time:.2f} seconds')
|
||||
@@ -677,7 +682,7 @@ class IFreqaiModel(ABC):
|
||||
self.inference_time = 0
|
||||
return
|
||||
|
||||
def train_timer(self, do='start'):
|
||||
def train_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''):
|
||||
"""
|
||||
Timer designed to track the cumulative time spent training the full pairlist in
|
||||
FreqAI.
|
||||
@@ -687,7 +692,11 @@ class IFreqaiModel(ABC):
|
||||
self.begin_time_train = time.time()
|
||||
elif do == 'stop':
|
||||
end = time.time()
|
||||
self.train_time += (end - self.begin_time_train)
|
||||
time_spent = (end - self.begin_time_train)
|
||||
if self.freqai_info.get('write_metrics_to_disk', False):
|
||||
self.dd.collect_metrics(time_spent, pair)
|
||||
|
||||
self.train_time += time_spent
|
||||
if self.pair_it_train == self.total_pairs:
|
||||
logger.info(
|
||||
f'Total time spent training pairlist {self.train_time:.2f} seconds')
|
||||
|
Reference in New Issue
Block a user