merge develop into feat/freqai-rl-dev

This commit is contained in:
robcaulk
2022-10-30 10:13:03 +01:00
129 changed files with 2648 additions and 1004 deletions

View File

@@ -1,14 +1,15 @@
import collections
import json
import logging
import re
import shutil
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Tuple, TypedDict
import numpy as np
import pandas as pd
import psutil
import rapidjson
from joblib import dump, load
from joblib.externals import cloudpickle
@@ -65,6 +66,8 @@ class FreqaiDataDrawer:
self.pair_dict: Dict[str, pair_info] = {}
# dictionary holding all actively inferenced models in memory given a model filename
self.model_dictionary: Dict[str, Any] = {}
# all additional metadata that we want to keep in ram
self.meta_data_dictionary: Dict[str, Dict[str, Any]] = {}
self.model_return_values: Dict[str, DataFrame] = {}
self.historic_data: Dict[str, Dict[str, DataFrame]] = {}
self.historic_predictions: Dict[str, DataFrame] = {}
@@ -78,19 +81,49 @@ class FreqaiDataDrawer:
self.historic_predictions_bkp_path = Path(
self.full_path / "historic_predictions.backup.pkl")
self.pair_dictionary_path = Path(self.full_path / "pair_dictionary.json")
self.metric_tracker_path = Path(self.full_path / "metric_tracker.json")
self.follow_mode = follow_mode
if follow_mode:
self.create_follower_dict()
self.load_drawer_from_disk()
self.load_historic_predictions_from_disk()
self.load_metric_tracker_from_disk()
self.training_queue: Dict[str, int] = {}
self.history_lock = threading.Lock()
self.save_lock = threading.Lock()
self.pair_dict_lock = threading.Lock()
self.metric_tracker_lock = threading.Lock()
self.old_DBSCAN_eps: Dict[str, float] = {}
self.empty_pair_dict: pair_info = {
"model_filename": "", "trained_timestamp": 0,
"data_path": "", "extras": {}}
self.metric_tracker: Dict[str, Dict[str, Dict[str, list]]] = {}
def update_metric_tracker(self, metric: str, value: float, pair: str) -> None:
"""
General utility for adding and updating custom metrics. Typically used
for adding training performance, train timings, inferenc timings, cpu loads etc.
"""
with self.metric_tracker_lock:
if pair not in self.metric_tracker:
self.metric_tracker[pair] = {}
if metric not in self.metric_tracker[pair]:
self.metric_tracker[pair][metric] = {'timestamp': [], 'value': []}
timestamp = int(datetime.now(timezone.utc).timestamp())
self.metric_tracker[pair][metric]['value'].append(value)
self.metric_tracker[pair][metric]['timestamp'].append(timestamp)
def collect_metrics(self, time_spent: float, pair: str):
"""
Add metrics to the metric tracker dictionary
"""
load1, load5, load15 = psutil.getloadavg()
cpus = psutil.cpu_count()
self.update_metric_tracker('train_time', time_spent, pair)
self.update_metric_tracker('cpu_load1min', load1 / cpus, pair)
self.update_metric_tracker('cpu_load5min', load5 / cpus, pair)
self.update_metric_tracker('cpu_load15min', load15 / cpus, pair)
self.limit_ram_use = self.freqai_info.get('limit_ram_usage', False)
if 'rl_config' in self.freqai_info:
self.model_type = 'stable_baselines'
@@ -103,12 +136,12 @@ class FreqaiDataDrawer:
"""
Locate and load a previously saved data drawer full of all pair model metadata in
present model folder.
:return: bool - whether or not the drawer was located
Load any existing metric tracker that may be present.
"""
exists = self.pair_dictionary_path.is_file()
if exists:
with open(self.pair_dictionary_path, "r") as fp:
self.pair_dict = json.load(fp)
self.pair_dict = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
elif not self.follow_mode:
logger.info("Could not find existing datadrawer, starting from scratch")
else:
@@ -117,7 +150,18 @@ class FreqaiDataDrawer:
"sending null values back to strategy"
)
return exists
def load_metric_tracker_from_disk(self):
"""
Tries to load an existing metrics dictionary if the user
wants to collect metrics.
"""
if self.freqai_info.get('write_metrics_to_disk', False):
exists = self.metric_tracker_path.is_file()
if exists:
with open(self.metric_tracker_path, "r") as fp:
self.metric_tracker = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
else:
logger.info("Could not find existing metric tracker, starting from scratch")
def load_historic_predictions_from_disk(self):
"""
@@ -153,7 +197,7 @@ class FreqaiDataDrawer:
def save_historic_predictions_to_disk(self):
"""
Save data drawer full of all pair model metadata in present model folder.
Save historic predictions pickle to disk
"""
with open(self.historic_predictions_path, "wb") as fp:
cloudpickle.dump(self.historic_predictions, fp, protocol=cloudpickle.DEFAULT_PROTOCOL)
@@ -161,6 +205,15 @@ class FreqaiDataDrawer:
# create a backup
shutil.copy(self.historic_predictions_path, self.historic_predictions_bkp_path)
def save_metric_tracker_to_disk(self):
"""
Save metric tracker of all pair metrics collected.
"""
with self.save_lock:
with open(self.metric_tracker_path, 'w') as fp:
rapidjson.dump(self.metric_tracker, fp, default=self.np_encoder,
number_mode=rapidjson.NM_NATIVE)
def save_drawer_to_disk(self):
"""
Save data drawer full of all pair model metadata in present model folder.
@@ -419,9 +472,8 @@ class FreqaiDataDrawer:
def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
"""
Saves all data associated with a model for a single sub-train time range
:params:
:model: User trained model which can be reused for inferencing to generate
predictions
:param model: User trained model which can be reused for inferencing to generate
predictions
"""
if not dk.data_path.is_dir():
@@ -466,6 +518,10 @@ class FreqaiDataDrawer:
self.model_dictionary[coin] = model
self.pair_dict[coin]["model_filename"] = dk.model_filename
self.pair_dict[coin]["data_path"] = str(dk.data_path)
if coin not in self.meta_data_dictionary:
self.meta_data_dictionary[coin] = {}
self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"]
self.meta_data_dictionary[coin]["meta_data"] = dk.data
self.save_drawer_to_disk()
return
@@ -476,7 +532,7 @@ class FreqaiDataDrawer:
presaved backtesting (prediction file loading).
"""
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
dk.data = json.load(fp)
dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
dk.training_features_list = dk.data["training_features_list"]
dk.label_list = dk.data["label_list"]
@@ -502,14 +558,19 @@ class FreqaiDataDrawer:
/ dk.data_path.parts[-1]
)
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
dk.data = json.load(fp)
dk.training_features_list = dk.data["training_features_list"]
dk.label_list = dk.data["label_list"]
if coin in self.meta_data_dictionary:
dk.data = self.meta_data_dictionary[coin]["meta_data"]
dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"]
else:
with open(dk.data_path / f"{dk.model_filename}_metadata.json", "r") as fp:
dk.data = rapidjson.load(fp, number_mode=rapidjson.NM_NATIVE)
dk.data_dictionary["train_features"] = pd.read_pickle(
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
)
dk.data_dictionary["train_features"] = pd.read_pickle(
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
)
dk.training_features_list = dk.data["training_features_list"]
dk.label_list = dk.data["label_list"]
# try to access model in memory instead of loading object from disk to save time
if dk.live and coin in self.model_dictionary and not self.limit_ram_use:
@@ -549,8 +610,7 @@ class FreqaiDataDrawer:
Append new candles to our stores historic data (in memory) so that
we do not need to load candle history from disk and we dont need to
pinging exchange multiple times for the same candle.
:params:
dataframe: DataFrame = strategy provided dataframe
:param dataframe: DataFrame = strategy provided dataframe
"""
feat_params = self.freqai_info["feature_parameters"]
with self.history_lock:
@@ -596,9 +656,8 @@ class FreqaiDataDrawer:
"""
Load pair histories for all whitelist and corr_pairlist pairs.
Only called once upon startup of bot.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
:param timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
"""
history_data = self.historic_data
@@ -621,10 +680,9 @@ class FreqaiDataDrawer:
"""
Searches through our historic_data in memory and returns the dataframes relevant
to the present pair.
:params:
timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
metadata: dict = strategy furnished pair metadata
:param timerange: TimeRange = full timerange required to populate all indicators
for training according to user defined train_period_days
:param metadata: dict = strategy furnished pair metadata
"""
with self.history_lock:
corr_dataframes: Dict[Any, Any] = {}
@@ -635,7 +693,8 @@ class FreqaiDataDrawer:
)
for tf in self.freqai_info["feature_parameters"].get("include_timeframes"):
base_dataframes[tf] = dk.slice_dataframe(timerange, historic_data[pair][tf])
base_dataframes[tf] = dk.slice_dataframe(
timerange, historic_data[pair][tf]).reset_index(drop=True)
if pairs:
for p in pairs:
if pair in p:
@@ -644,6 +703,6 @@ class FreqaiDataDrawer:
corr_dataframes[p] = {}
corr_dataframes[p][tf] = dk.slice_dataframe(
timerange, historic_data[p][tf]
)
).reset_index(drop=True)
return corr_dataframes, base_dataframes