improve train queue system, ensure crash resilience in train queue.

This commit is contained in:
robcaulk 2022-09-18 17:00:55 +02:00
parent e5368f5a14
commit eaa43337d2
3 changed files with 50 additions and 37 deletions

View File

@ -27,9 +27,7 @@ logger = logging.getLogger(__name__)
class pair_info(TypedDict): class pair_info(TypedDict):
model_filename: str model_filename: str
first: bool
trained_timestamp: int trained_timestamp: int
priority: int
data_path: str data_path: str
extras: dict extras: dict
@ -91,7 +89,7 @@ class FreqaiDataDrawer:
self.old_DBSCAN_eps: Dict[str, float] = {} self.old_DBSCAN_eps: Dict[str, float] = {}
self.empty_pair_dict: pair_info = { self.empty_pair_dict: pair_info = {
"model_filename": "", "trained_timestamp": 0, "model_filename": "", "trained_timestamp": 0,
"priority": 1, "first": True, "data_path": "", "extras": {}} "data_path": "", "extras": {}}
def load_drawer_from_disk(self): def load_drawer_from_disk(self):
""" """
@ -216,7 +214,6 @@ class FreqaiDataDrawer:
self.pair_dict[pair] = self.empty_pair_dict.copy() self.pair_dict[pair] = self.empty_pair_dict.copy()
model_filename = "" model_filename = ""
trained_timestamp = 0 trained_timestamp = 0
self.pair_dict[pair]["priority"] = len(self.pair_dict)
if not data_path_set and self.follow_mode: if not data_path_set and self.follow_mode:
logger.warning( logger.warning(
@ -236,18 +233,9 @@ class FreqaiDataDrawer:
return return
else: else:
self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy() self.pair_dict[metadata["pair"]] = self.empty_pair_dict.copy()
self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict)
return return
def pair_to_end_of_training_queue(self, pair: str) -> None:
# march all pairs up in the queue
with self.pair_dict_lock:
for p in self.pair_dict:
self.pair_dict[p]["priority"] -= 1
# send pair to end of queue
self.pair_dict[pair]["priority"] = len(self.pair_dict)
def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None: def set_initial_return_values(self, pair: str, pred_df: DataFrame) -> None:
""" """
Set the initial return values to the historical predictions dataframe. This avoids needing Set the initial return values to the historical predictions dataframe. This avoids needing

View File

@ -3,6 +3,7 @@ import shutil
import threading import threading
import time import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from threading import Lock from threading import Lock
@ -80,6 +81,7 @@ class IFreqaiModel(ABC):
self.pair_it = 0 self.pair_it = 0
self.pair_it_train = 0 self.pair_it_train = 0
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist")) self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
self.train_queue = self._set_train_queue()
self.last_trade_database_summary: DataFrame = {} self.last_trade_database_summary: DataFrame = {}
self.current_trade_database_summary: DataFrame = {} self.current_trade_database_summary: DataFrame = {}
self.analysis_lock = Lock() self.analysis_lock = Lock()
@ -180,30 +182,36 @@ class IFreqaiModel(ABC):
:param strategy: IStrategy = The user defined strategy class :param strategy: IStrategy = The user defined strategy class
""" """
while not self._stop_event.is_set(): while not self._stop_event.is_set():
time.sleep(1) pair = self.train_queue[0]
for pair in self.config.get("exchange", {}).get("pair_whitelist"):
(_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair) # ensure pair is avaialble in dp
if pair not in strategy.dp.current_whitelist():
self.train_queue.popleft()
logger.warning(f'{pair} not in current whitelist, removing from train queue.')
continue
if self.dd.pair_dict[pair]["priority"] != 1: (_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair)
continue
dk = FreqaiDataKitchen(self.config, self.live, pair)
dk.set_paths(pair, trained_timestamp)
(
retrain,
new_trained_timerange,
data_load_timerange,
) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(pair, new_trained_timerange.stopts)
if retrain: dk = FreqaiDataKitchen(self.config, self.live, pair)
self.train_timer('start') dk.set_paths(pair, trained_timestamp)
self.extract_data_and_train_model( (
new_trained_timerange, pair, strategy, dk, data_load_timerange retrain,
) new_trained_timerange,
self.train_timer('stop') data_load_timerange,
) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(pair, new_trained_timerange.stopts)
self.dd.save_historic_predictions_to_disk() if retrain:
self.train_timer('start')
self.extract_data_and_train_model(
new_trained_timerange, pair, strategy, dk, data_load_timerange
)
self.train_timer('stop')
# only rotate the queue after the first has been trained.
self.train_queue.rotate(-1)
self.dd.save_historic_predictions_to_disk()
def start_backtesting( def start_backtesting(
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
@ -557,9 +565,6 @@ class IFreqaiModel(ABC):
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
dk.set_new_model_names(pair, new_trained_timerange) dk.set_new_model_names(pair, new_trained_timerange)
self.dd.pair_dict[pair]["first"] = False
if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning:
self.dd.pair_to_end_of_training_queue(pair)
self.dd.save_data(model, pair, dk) self.dd.save_data(model, pair, dk)
if self.freqai_info.get("purge_old_models", False): if self.freqai_info.get("purge_old_models", False):
@ -685,6 +690,26 @@ class IFreqaiModel(ABC):
return init_model return init_model
def _set_train_queue(self):
"""
Sets train queue from existing train timestamps if they exist
otherwise it sets the train queue based on the provided whitelist.
"""
current_pairlist = self.config.get("exchange", {}).get("pair_whitelist")
if not self.dd.pair_dict:
logger.info('Set fresh train queue from whitelist.')
return deque(current_pairlist)
best_queue = deque()
pair_dict_sorted = sorted(self.dd.pair_dict.items(),
key=lambda k: k[1]['trained_timestamp'])
for pair in pair_dict_sorted:
if pair[0] in current_pairlist:
best_queue.appendleft(pair[0])
logger.info('Set existing queue from trained timestamps.')
return best_queue
# Following methods which are overridden by user made prediction models. # Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModel.py for an example. # See freqai/prediction_models/CatboostPredictionModel.py for an example.

View File

@ -45,7 +45,7 @@ class FreqaiExampleStrategy(IStrategy):
std_dev_multiplier_buy = CategoricalParameter( std_dev_multiplier_buy = CategoricalParameter(
[0.75, 1, 1.25, 1.5, 1.75], default=1.25, space="buy", optimize=True) [0.75, 1, 1.25, 1.5, 1.75], default=1.25, space="buy", optimize=True)
std_dev_multiplier_sell = CategoricalParameter( std_dev_multiplier_sell = CategoricalParameter(
[0.1, 0.25, 0.4], space="sell", default=0.2, optimize=True) [0.75, 1, 1.25, 1.5, 1.75], space="sell", default=1.25, optimize=True)
def informative_pairs(self): def informative_pairs(self):
whitelist_pairs = self.dp.current_whitelist() whitelist_pairs = self.dp.current_whitelist()