diff --git a/docs/freqai-parameter-table.md b/docs/freqai-parameter-table.md index 7d9864fc0..28a15913b 100644 --- a/docs/freqai-parameter-table.md +++ b/docs/freqai-parameter-table.md @@ -37,6 +37,7 @@ Mandatory parameters are marked as **Required** and have to be set in one of the | `noise_standard_deviation` | If set, FreqAI adds noise to the training features with the aim of preventing overfitting. FreqAI generates random deviates from a gaussian distribution with a standard deviation of `noise_standard_deviation` and adds them to all data points. `noise_standard_deviation` should be kept relative to the normalized space, i.e., between -1 and 1. In other words, since data in FreqAI is always normalized to be between -1 and 1, `noise_standard_deviation: 0.05` would result in 32% of the data being randomly increased/decreased by more than 2.5% (i.e., the percent of data falling within the first standard deviation).
**Datatype:** Integer.
Default: `0`. | `outlier_protection_percentage` | Enable to prevent outlier detection methods from discarding too much data. If more than `outlier_protection_percentage` % of points are detected as outliers by the SVM or DBSCAN, FreqAI will log a warning message and ignore outlier detection, i.e., the original dataset will be kept intact. If the outlier protection is triggered, no predictions will be made based on the training dataset.
**Datatype:** Float.
Default: `30`. | `reverse_train_test_order` | Split the feature dataset (see below) and use the latest data split for training and test on historical split of the data. This allows the model to be trained up to the most recent data point, while avoiding overfitting. However, you should be careful to understand the unorthodox nature of this parameter before employing it.
**Datatype:** Boolean.
Default: `False` (no reversal). +| `write_metrics_to_disk` | Collect train timings, inference timings and cpu usage in json file.
**Datatype:** Boolean.
Default: `False` | | **Data split parameters** | `data_split_parameters` | Include any additional parameters available from Scikit-learn `test_train_split()`, which are shown [here](https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html) (external website).
**Datatype:** Dictionary. | `test_size` | The fraction of data that should be used for testing instead of training.
**Datatype:** Positive float < 1. diff --git a/freqtrade/constants.py b/freqtrade/constants.py index 2fdb091a7..70f60867b 100644 --- a/freqtrade/constants.py +++ b/freqtrade/constants.py @@ -540,6 +540,8 @@ CONF_SCHEMA = { "properties": { "enabled": {"type": "boolean", "default": False}, "keras": {"type": "boolean", "default": False}, + "write_metrics_to_disk": {"type": "boolean", "default": False}, + "purge_old_models": {"type": "boolean", "default": True}, "conv_width": {"type": "integer", "default": 2}, "train_period_days": {"type": "integer", "default": 0}, "backtest_period_days": {"type": "number", "default": 7}, diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py index 465ba27f5..0e9d2e605 100644 --- a/freqtrade/freqai/data_drawer.py +++ b/freqtrade/freqai/data_drawer.py @@ -1,14 +1,15 @@ import collections -import json import logging import re import shutil import threading +from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, Tuple, TypedDict import numpy as np import pandas as pd +import psutil import rapidjson from joblib import dump, load from joblib.externals import cloudpickle @@ -65,6 +66,8 @@ class FreqaiDataDrawer: self.pair_dict: Dict[str, pair_info] = {} # dictionary holding all actively inferenced models in memory given a model filename self.model_dictionary: Dict[str, Any] = {} + # all additional metadata that we want to keep in ram + self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {} self.model_return_values: Dict[str, DataFrame] = {} self.historic_data: Dict[str, Dict[str, DataFrame]] = {} self.historic_predictions: Dict[str, DataFrame] = {} @@ -78,30 +81,60 @@ class FreqaiDataDrawer: self.historic_predictions_bkp_path = Path( self.full_path / "historic_predictions.backup.pkl") self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json") + self.metric_tracker_path = Path(self.full_path / "metric_tracker.json") self.follow_mode = follow_mode if follow_mode: self.create_follower_dict() self.load_drawer_from_disk() self.load_historic_predictions_from_disk() + self.load_metric_tracker_from_disk() self.training_queue: Dict[str, int] = {} self.history_lock = threading.Lock() self.save_lock = threading.Lock() self.pair_dict_lock = threading.Lock() + self.metric_tracker_lock = threading.Lock() self.old_DBSCAN_eps: Dict[str, float] = {} self.empty_pair_dict: pair_info = { "model_filename": "", "trained_timestamp": 0, "data_path": "", "extras": {}} + self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {} + + def update_metric_tracker(self, metric: str, value: float, pair: str) -> None: + """ + General utility for adding and updating custom metrics. Typically used + for adding training performance, train timings, inferenc timings, cpu loads etc. + """ + with self.metric_tracker_lock: + if pair not in self.metric_tracker: + self.metric_tracker[pair] = {} + if metric not in self.metric_tracker[pair]: + self.metric_tracker[pair][metric] = {'timestamp': [], 'value': []} + + timestamp = int(datetime.now(timezone.utc).timestamp()) + self.metric_tracker[pair][metric]['value'].append(value) + self.metric_tracker[pair][metric]['timestamp'].append(timestamp) + + def collect_metrics(self, time_spent: float, pair: str): + """ + Add metrics to the metric tracker dictionary + """ + load1, load5, load15 = psutil.getloadavg() + cpus = psutil.cpu_count() + self.update_metric_tracker('train_time', time_spent, pair) + self.update_metric_tracker('cpu_load1min', load1 / cpus, pair) + self.update_metric_tracker('cpu_load5min', load5 / cpus, pair) + self.update_metric_tracker('cpu_load15min', load15 / cpus, pair) def load_drawer_from_disk(self): """ Locate and load a previously saved data drawer full of all pair model metadata in present model folder. - :return: bool - whether or not the drawer was located + Load any existing metric tracker that may be present. """ exists = self.pair_dictionary_path.is_file() if exists: with open(self.pair_dictionary_path, "r") as fp: - self.pair_dict = json.load(fp) + self.pair_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE) elif not self.follow_mode: logger.info("Could not find existing datadrawer, starting from scratch") else: @@ -110,7 +143,18 @@ class FreqaiDataDrawer: "sending null values back to strategy" ) - return exists + def load_metric_tracker_from_disk(self): + """ + Tries to load an existing metrics dictionary if the user + wants to collect metrics. + """ + if self.freqai_info.get('write_metrics_to_disk', False): + exists = self.metric_tracker_path.is_file() + if exists: + with open(self.metric_tracker_path, "r") as fp: + self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE) + else: + logger.info("Could not find existing metric tracker, starting from scratch") def load_historic_predictions_from_disk(self): """ @@ -146,7 +190,7 @@ class FreqaiDataDrawer: def save_historic_predictions_to_disk(self): """ - Save data drawer full of all pair model metadata in present model folder. + Save historic predictions pickle to disk """ with open(self.historic_predictions_path, "wb") as fp: cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL) @@ -154,6 +198,15 @@ class FreqaiDataDrawer: # create a backup shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path) + def save_metric_tracker_to_disk(self): + """ + Save metric tracker of all pair metrics collected. + """ + with self.save_lock: + with open(self.metric_tracker_path, 'w') as fp: + rapidjson.dump(self.metric_tracker, fp, default=self.np_encoder, + number_mode=rapidjson.NM_NATIVE) + def save_drawer_to_disk(self): """ Save data drawer full of all pair model metadata in present model folder. @@ -453,9 +506,14 @@ class FreqaiDataDrawer: ) # if self.live: + # store as much in ram as possible to increase performance self.model_dictionary[coin] = model self.pair_dict[coin]["model_filename"] = dk.model_filename self.pair_dict[coin]["data_path"] = str(dk.data_path) + if coin not in self.meta_data_dictionary: + self.meta_data_dictionary[coin] = {} + self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"] + self.meta_data_dictionary[coin]["meta_data"] = dk.data self.save_drawer_to_disk() return @@ -466,7 +524,7 @@ class FreqaiDataDrawer: presaved backtesting (prediction file loading). """ with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp: - dk.data = json.load(fp) + dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE) dk.training_features_list = dk.data["training_features_list"] dk.label_list = dk.data["label_list"] @@ -492,14 +550,19 @@ class FreqaiDataDrawer: / dk.data_path.parts[-1] ) - with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp: - dk.data = json.load(fp) - dk.training_features_list = dk.data["training_features_list"] - dk.label_list = dk.data["label_list"] + if coin in self.meta_data_dictionary: + dk.data = self.meta_data_dictionary[coin]["meta_data"] + dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"] + else: + with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp: + dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE) - dk.data_dictionary["train_features"] = pd.read_pickle( - dk.data_path / f"{dk.model_filename}_trained_df.pkl" - ) + dk.data_dictionary["train_features"] = pd.read_pickle( + dk.data_path / f"{dk.model_filename}_trained_df.pkl" + ) + + dk.training_features_list = dk.data["training_features_list"] + dk.label_list = dk.data["label_list"] # try to access model in memory instead of loading object from disk to save time if dk.live and coin in self.model_dictionary: @@ -627,22 +690,3 @@ class FreqaiDataDrawer: ).reset_index(drop=True) return corr_dataframes, base_dataframes - - # to be used if we want to send predictions directly to the follower instead of forcing - # follower to load models and inference - # def save_model_return_values_to_disk(self) -> None: - # with open(self.full_path / str('model_return_values.json'), "w") as fp: - # json.dump(self.model_return_values, fp, default=self.np_encoder) - - # def load_model_return_values_from_disk(self, dk: FreqaiDataKitchen) -> FreqaiDataKitchen: - # exists = Path(self.full_path / str('model_return_values.json')).resolve().exists() - # if exists: - # with open(self.full_path / str('model_return_values.json'), "r") as fp: - # self.model_return_values = json.load(fp) - # elif not self.follow_mode: - # logger.info("Could not find existing datadrawer, starting from scratch") - # else: - # logger.warning(f'Follower could not find pair_dictionary at {self.full_path} ' - # 'sending null values back to strategy') - - # return exists, dk diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index 9907a5f9f..db0d4c379 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -7,7 +7,7 @@ from collections import deque from datetime import datetime, timezone from pathlib import Path from threading import Lock -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Literal, Tuple import numpy as np import pandas as pd @@ -144,7 +144,7 @@ class IFreqaiModel(ABC): dataframe = dk.remove_features_from_df(dk.return_dataframe) self.clean_up() if self.live: - self.inference_timer('stop') + self.inference_timer('stop', metadata["pair"]) return dataframe def clean_up(self): @@ -213,12 +213,14 @@ class IFreqaiModel(ABC): logger.warning(f"Training {pair} raised exception {msg.__class__.__name__}. " f"Message: {msg}, skipping.") - self.train_timer('stop') + self.train_timer('stop', pair) # only rotate the queue after the first has been trained. self.train_queue.rotate(-1) self.dd.save_historic_predictions_to_disk() + if self.freqai_info.get('write_metrics_to_disk', False): + self.dd.save_metric_tracker_to_disk() def start_backtesting( self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen @@ -655,7 +657,7 @@ class IFreqaiModel(ABC): return - def inference_timer(self, do='start'): + def inference_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''): """ Timer designed to track the cumulative time spent in FreqAI for one pass through the whitelist. This will check if the time spent is more than 1/4 the time @@ -666,7 +668,10 @@ class IFreqaiModel(ABC): self.begin_time = time.time() elif do == 'stop': end = time.time() - self.inference_time += (end - self.begin_time) + time_spent = (end - self.begin_time) + if self.freqai_info.get('write_metrics_to_disk', False): + self.dd.update_metric_tracker('inference_time', time_spent, pair) + self.inference_time += time_spent if self.pair_it == self.total_pairs: logger.info( f'Total time spent inferencing pairlist {self.inference_time:.2f} seconds') @@ -677,7 +682,7 @@ class IFreqaiModel(ABC): self.inference_time = 0 return - def train_timer(self, do='start'): + def train_timer(self, do: Literal['start', 'stop'] = 'start', pair: str = ''): """ Timer designed to track the cumulative time spent training the full pairlist in FreqAI. @@ -687,7 +692,11 @@ class IFreqaiModel(ABC): self.begin_time_train = time.time() elif do == 'stop': end = time.time() - self.train_time += (end - self.begin_time_train) + time_spent = (end - self.begin_time_train) + if self.freqai_info.get('write_metrics_to_disk', False): + self.dd.collect_metrics(time_spent, pair) + + self.train_time += time_spent if self.pair_it_train == self.total_pairs: logger.info( f'Total time spent training pairlist {self.train_time:.2f} seconds') diff --git a/tests/freqai/test_freqai_interface.py b/tests/freqai/test_freqai_interface.py index 3581cc50c..b619c0611 100644 --- a/tests/freqai/test_freqai_interface.py +++ b/tests/freqai/test_freqai_interface.py @@ -58,9 +58,15 @@ def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model): new_timerange = TimeRange.parse_timerange("20180127-20180130") freqai.dk.set_paths('ADA/BTC', None) + freqai.train_timer("start", "ADA/BTC") freqai.extract_data_and_train_model( new_timerange, "ADA/BTC", strategy, freqai.dk, data_load_timerange) + freqai.train_timer("stop", "ADA/BTC") + freqai.dd.save_metric_tracker_to_disk() + freqai.dd.save_drawer_to_disk() + assert Path(freqai.dk.full_path / "metric_tracker.json").is_file() + assert Path(freqai.dk.full_path / "pair_dictionary.json").is_file() assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_model.{model_save_ext}").is_file() assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_metadata.json").is_file()