Merge pull request #7434 from freqtrade/improve-train-queue

improve train queue system in FreqAI
This commit is contained in:
Robert Caulk 2022-09-19 10:55:53 +02:00 committed by GitHub
commit f9460c80c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 36 deletions

View File

@ -28,9 +28,7 @@ logger = logging.getLogger(__name__)
class pair_info(TypedDict): class pair_info(TypedDict):
model_filename: str model_filename: str
first: bool
trained_timestamp: int trained_timestamp: int
priority: int
data_path: str data_path: str
extras: dict extras: dict
@ -92,7 +90,7 @@ class FreqaiDataDrawer:
self.old_DBSCAN_eps: Dict[str, float] = {} self.old_DBSCAN_eps: Dict[str, float] = {}
self.empty_pair_dict: pair_info = { self.empty_pair_dict: pair_info = {
"model_filename": "", "trained_timestamp": 0, "model_filename": "", "trained_timestamp": 0,
"priority": 1, "first": True, "data_path": "", "extras": {}} "data_path": "", "extras": {}}
def load_drawer_from_disk(self): def load_drawer_from_disk(self):
""" """
@ -217,7 +215,6 @@ class FreqaiDataDrawer:
self.pair_dict[pair] = self.empty_pair_dict.copy() self.pair_dict[pair] = self.empty_pair_dict.copy()
model_filename = "" model_filename = ""
trained_timestamp = 0 trained_timestamp = 0
self.pair_dict[pair]["priority"] = len(self.pair_dict)
if not data_path_set and self.follow_mode: if not data_path_set and self.follow_mode:
logger.warning( logger.warning(
@ -237,18 +234,9 @@ class FreqaiDataDrawer:
return return
else: else:
self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy() self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy()
self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict)
return return
def pair_to_end_of_training_queue(self, pair: str) -> None:
# march all pairs up in the queue
with self.pair_dict_lock:
for p in self.pair_dict:
self.pair_dict[p]["priority"] -= 1
# send pair to end of queue
self.pair_dict[pair]["priority"] = len(self.pair_dict)
def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None: def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None:
""" """
Set the initial return values to the historical predictions dataframe. This avoids needing Set the initial return values to the historical predictions dataframe. This avoids needing

View File

@ -3,6 +3,7 @@ import shutil
import threading import threading
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from threading import Lock from threading import Lock
@ -81,6 +82,7 @@ class IFreqaiModel(ABC):
self.pair_it = 0 self.pair_it = 0
self.pair_it_train = 0 self.pair_it_train = 0
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist")) self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
self.train_queue = self._set_train_queue()
self.last_trade_database_summary: DataFrame = {} self.last_trade_database_summary: DataFrame = {}
self.current_trade_database_summary: DataFrame = {} self.current_trade_database_summary: DataFrame = {}
self.analysis_lock = Lock() self.analysis_lock = Lock()
@ -182,29 +184,36 @@ class IFreqaiModel(ABC):
""" """
while not self._stop_event.is_set(): while not self._stop_event.is_set():
time.sleep(1) time.sleep(1)
for pair in self.config.get("exchange", {}).get("pair_whitelist"): pair = self.train_queue[0]
(_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair) # ensure pair is avaialble in dp
if pair not in strategy.dp.current_whitelist():
self.train_queue.popleft()
logger.warning(f'{pair} not in current whitelist, removing from train queue.')
continue
if self.dd.pair_dict[pair]["priority"] != 1: (_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair)
continue
dk = FreqaiDataKitchen(self.config, self.live, pair)
dk.set_paths(pair, trained_timestamp)
(
retrain,
new_trained_timerange,
data_load_timerange,
) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(pair, new_trained_timerange.stopts)
if retrain: dk = FreqaiDataKitchen(self.config, self.live, pair)
self.train_timer('start') dk.set_paths(pair, trained_timestamp)
self.extract_data_and_train_model( (
new_trained_timerange, pair, strategy, dk, data_load_timerange retrain,
) new_trained_timerange,
self.train_timer('stop') data_load_timerange,
) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(pair, new_trained_timerange.stopts)
self.dd.save_historic_predictions_to_disk() if retrain:
self.train_timer('start')
self.extract_data_and_train_model(
new_trained_timerange, pair, strategy, dk, data_load_timerange
)
self.train_timer('stop')
# only rotate the queue after the first has been trained.
self.train_queue.rotate(-1)
self.dd.save_historic_predictions_to_disk()
def start_backtesting( def start_backtesting(
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
@ -558,9 +567,6 @@ class IFreqaiModel(ABC):
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
dk.set_new_model_names(pair, new_trained_timerange) dk.set_new_model_names(pair, new_trained_timerange)
self.dd.pair_dict[pair]["first"] = False
if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning:
self.dd.pair_to_end_of_training_queue(pair)
self.dd.save_data(model, pair, dk) self.dd.save_data(model, pair, dk)
if self.freqai_info["feature_parameters"].get("plot_feature_importance", False): if self.freqai_info["feature_parameters"].get("plot_feature_importance", False):
@ -689,6 +695,30 @@ class IFreqaiModel(ABC):
return init_model return init_model
def _set_train_queue(self):
"""
Sets train queue from existing train timestamps if they exist
otherwise it sets the train queue based on the provided whitelist.
"""
current_pairlist = self.config.get("exchange", {}).get("pair_whitelist")
if not self.dd.pair_dict:
logger.info('Set fresh train queue from whitelist.')
return deque(current_pairlist)
best_queue = deque()
pair_dict_sorted = sorted(self.dd.pair_dict.items(),
key=lambda k: k[1]['trained_timestamp'])
for pair in pair_dict_sorted:
if pair[0] in current_pairlist:
best_queue.appendleft(pair[0])
for pair in current_pairlist:
if pair not in best_queue:
best_queue.appendleft(pair)
logger.info('Set existing queue from trained timestamps.')
return best_queue
# Following methods which are overridden by user made prediction models. # Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModel.py for an example. # See freqai/prediction_models/CatboostPredictionModel.py for an example.

View File

@ -45,7 +45,7 @@ class FreqaiExampleStrategy(IStrategy):
std_dev_multiplier_buy = CategoricalParameter( std_dev_multiplier_buy = CategoricalParameter(
[0.75, 1, 1.25, 1.5, 1.75], default=1.25, space="buy", optimize=True) [0.75, 1, 1.25, 1.5, 1.75], default=1.25, space="buy", optimize=True)
std_dev_multiplier_sell = CategoricalParameter( std_dev_multiplier_sell = CategoricalParameter(
[0.1, 0.25, 0.4], space="sell", default=0.2, optimize=True) [0.75, 1, 1.25, 1.5, 1.75], space="sell", default=1.25, optimize=True)
def populate_any_indicators( def populate_any_indicators(
self, pair, df, tf, informative=None, set_generalized_indicators=False self, pair, df, tf, informative=None, set_generalized_indicators=False