From eaa43337d2d7c13eeeb8c809d212e047f5935470 Mon Sep 17 00:00:00 2001 From: robcaulk Date: Sun, 18 Sep 2022 17:00:55 +0200 Subject: [PATCH] improve train queue system, ensure crash resilience in train queue. --- freqtrade/freqai/data_drawer.py | 14 +--- freqtrade/freqai/freqai_interface.py | 71 +++++++++++++------- freqtrade/templates/FreqaiExampleStrategy.py | 2 +- 3 files changed, 50 insertions(+), 37 deletions(-) diff --git a/freqtrade/freqai/data_drawer.py b/freqtrade/freqai/data_drawer.py index 1c091f1be..67daa626d 100644 --- a/freqtrade/freqai/data_drawer.py +++ b/freqtrade/freqai/data_drawer.py @@ -27,9 +27,7 @@ logger = logging.getLogger(__name__) class pair_info(TypedDict): model_filename: str - first: bool trained_timestamp: int - priority: int data_path: str extras: dict @@ -91,7 +89,7 @@ class FreqaiDataDrawer: self.old_DBSCAN_eps: Dict[str, float] = {} self.empty_pair_dict: pair_info = { "model_filename": "", "trained_timestamp": 0, - "priority": 1, "first": True, "data_path": "", "extras": {}} + "data_path": "", "extras": {}} def load_drawer_from_disk(self): """ @@ -216,7 +214,6 @@ class FreqaiDataDrawer: self.pair_dict[pair] = self.empty_pair_dict.copy() model_filename = "" trained_timestamp = 0 - self.pair_dict[pair]["priority"] = len(self.pair_dict) if not data_path_set and self.follow_mode: logger.warning( @@ -236,18 +233,9 @@ class FreqaiDataDrawer: return else: self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy() - self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict) return - def pair_to_end_of_training_queue(self, pair: str) -> None: - # march all pairs up in the queue - with self.pair_dict_lock: - for p in self.pair_dict: - self.pair_dict[p]["priority"] -= 1 - # send pair to end of queue - self.pair_dict[pair]["priority"] = len(self.pair_dict) - def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None: """ Set the initial return values to the historical predictions dataframe. This avoids needing diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index 78931bed4..85768fcf8 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -3,6 +3,7 @@ import shutil import threading import time from abc import ABC, abstractmethod +from collections import deque from datetime import datetime, timezone from pathlib import Path from threading import Lock @@ -80,6 +81,7 @@ class IFreqaiModel(ABC): self.pair_it = 0 self.pair_it_train = 0 self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist")) + self.train_queue = self._set_train_queue() self.last_trade_database_summary: DataFrame = {} self.current_trade_database_summary: DataFrame = {} self.analysis_lock = Lock() @@ -180,30 +182,36 @@ class IFreqaiModel(ABC): :param strategy: IStrategy = The user defined strategy class """ while not self._stop_event.is_set(): - time.sleep(1) - for pair in self.config.get("exchange", {}).get("pair_whitelist"): + pair = self.train_queue[0] - (_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair) + # ensure pair is avaialble in dp + if pair not in strategy.dp.current_whitelist(): + self.train_queue.popleft() + logger.warning(f'{pair} not in current whitelist, removing from train queue.') + continue - if self.dd.pair_dict[pair]["priority"] != 1: - continue - dk = FreqaiDataKitchen(self.config, self.live, pair) - dk.set_paths(pair, trained_timestamp) - ( - retrain, - new_trained_timerange, - data_load_timerange, - ) = dk.check_if_new_training_required(trained_timestamp) - dk.set_paths(pair, new_trained_timerange.stopts) + (_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair) - if retrain: - self.train_timer('start') - self.extract_data_and_train_model( - new_trained_timerange, pair, strategy, dk, data_load_timerange - ) - self.train_timer('stop') + dk = FreqaiDataKitchen(self.config, self.live, pair) + dk.set_paths(pair, trained_timestamp) + ( + retrain, + new_trained_timerange, + data_load_timerange, + ) = dk.check_if_new_training_required(trained_timestamp) + dk.set_paths(pair, new_trained_timerange.stopts) - self.dd.save_historic_predictions_to_disk() + if retrain: + self.train_timer('start') + self.extract_data_and_train_model( + new_trained_timerange, pair, strategy, dk, data_load_timerange + ) + self.train_timer('stop') + + # only rotate the queue after the first has been trained. + self.train_queue.rotate(-1) + + self.dd.save_historic_predictions_to_disk() def start_backtesting( self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen @@ -557,9 +565,6 @@ class IFreqaiModel(ABC): self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts dk.set_new_model_names(pair, new_trained_timerange) - self.dd.pair_dict[pair]["first"] = False - if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning: - self.dd.pair_to_end_of_training_queue(pair) self.dd.save_data(model, pair, dk) if self.freqai_info.get("purge_old_models", False): @@ -685,6 +690,26 @@ class IFreqaiModel(ABC): return init_model + def _set_train_queue(self): + """ + Sets train queue from existing train timestamps if they exist + otherwise it sets the train queue based on the provided whitelist. + """ + current_pairlist = self.config.get("exchange", {}).get("pair_whitelist") + if not self.dd.pair_dict: + logger.info('Set fresh train queue from whitelist.') + return deque(current_pairlist) + + best_queue = deque() + + pair_dict_sorted = sorted(self.dd.pair_dict.items(), + key=lambda k: k[1]['trained_timestamp']) + for pair in pair_dict_sorted: + if pair[0] in current_pairlist: + best_queue.appendleft(pair[0]) + logger.info('Set existing queue from trained timestamps.') + return best_queue + # Following methods which are overridden by user made prediction models. # See freqai/prediction_models/CatboostPredictionModel.py for an example. diff --git a/freqtrade/templates/FreqaiExampleStrategy.py b/freqtrade/templates/FreqaiExampleStrategy.py index 15b2c6c83..0498ea564 100644 --- a/freqtrade/templates/FreqaiExampleStrategy.py +++ b/freqtrade/templates/FreqaiExampleStrategy.py @@ -45,7 +45,7 @@ class FreqaiExampleStrategy(IStrategy): std_dev_multiplier_buy = CategoricalParameter( [0.75, 1, 1.25, 1.5, 1.75], default=1.25, space="buy", optimize=True) std_dev_multiplier_sell = CategoricalParameter( - [0.1, 0.25, 0.4], space="sell", default=0.2, optimize=True) + [0.75, 1, 1.25, 1.5, 1.75], space="sell", default=1.25, optimize=True) def informative_pairs(self): whitelist_pairs = self.dp.current_whitelist()