improve data persistence/mapping for live/dry. This accommodates quick reloads after crash and handles multi-pair cleanly
This commit is contained in:
@@ -13,6 +13,7 @@ from pandas import DataFrame
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.enums import RunMode
|
||||
from freqtrade.freqai.data_drawer import FreqaiDataDrawer
|
||||
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
|
||||
from freqtrade.strategy.interface import IStrategy
|
||||
|
||||
@@ -65,11 +66,14 @@ class IFreqaiModel(ABC):
|
||||
self.training_on_separate_thread = False
|
||||
self.retrain = False
|
||||
self.first = True
|
||||
if self.freqai_info.get('live_trained_timerange'):
|
||||
self.new_trained_timerange = TimeRange.parse_timerange(
|
||||
self.freqai_info['live_trained_timerange'])
|
||||
else:
|
||||
self.new_trained_timerange = TimeRange()
|
||||
# if self.freqai_info.get('live_trained_timerange'):
|
||||
# self.new_trained_timerange = TimeRange.parse_timerange(
|
||||
# self.freqai_info['live_trained_timerange'])
|
||||
# else:
|
||||
# self.new_trained_timerange = TimeRange()
|
||||
|
||||
self.set_full_path()
|
||||
self.data_drawer = FreqaiDataDrawer(Path(self.full_path))
|
||||
|
||||
def assert_config(self, config: Dict[str, Any]) -> None:
|
||||
|
||||
@@ -86,7 +90,7 @@ class IFreqaiModel(ABC):
|
||||
|
||||
def start(self, dataframe: DataFrame, metadata: dict, strategy: IStrategy) -> DataFrame:
|
||||
"""
|
||||
Entry point to the FreqaiModel, it will train a new model if
|
||||
Entry point to the FreqaiModel from a specific pair, it will train a new model if
|
||||
necessary before making the prediction.
|
||||
The backtesting and training paradigm is a sliding training window
|
||||
with a following backtest window. Both windows slide according to the
|
||||
@@ -103,8 +107,8 @@ class IFreqaiModel(ABC):
|
||||
|
||||
self.live = strategy.dp.runmode in (RunMode.DRY_RUN, RunMode.LIVE)
|
||||
|
||||
self.pair = metadata["pair"]
|
||||
self.dh = FreqaiDataKitchen(self.config, dataframe, self.live)
|
||||
# FreqaiDataKitchen is reinstantiated for each coin
|
||||
self.dh = FreqaiDataKitchen(self.config, self.data_drawer, self.live, metadata["pair"])
|
||||
|
||||
if self.live:
|
||||
# logger.info('testing live')
|
||||
@@ -113,7 +117,7 @@ class IFreqaiModel(ABC):
|
||||
return (self.dh.full_predictions, self.dh.full_do_predict,
|
||||
self.dh.full_target_mean, self.dh.full_target_std)
|
||||
|
||||
logger.info("going to train %s timeranges", len(self.dh.training_timeranges))
|
||||
logger.info(f'Training {len(self.dh.training_timeranges)} timeranges')
|
||||
|
||||
# Loop enforcing the sliding window training/backtesting paradigm
|
||||
# tr_train is the training time range e.g. 1 historical month
|
||||
@@ -129,9 +133,12 @@ class IFreqaiModel(ABC):
|
||||
self.training_timerange = tr_train
|
||||
dataframe_train = self.dh.slice_dataframe(tr_train, dataframe)
|
||||
dataframe_backtest = self.dh.slice_dataframe(tr_backtest, dataframe)
|
||||
logger.info("training %s for %s", self.pair, tr_train)
|
||||
self.dh.model_path = Path(self.dh.full_path / str("sub-train" + "-" + str(tr_train)))
|
||||
if not self.model_exists(self.pair, training_timerange=tr_train):
|
||||
logger.info("training %s for %s", metadata["pair"], tr_train)
|
||||
trained_timestamp = TimeRange.parse_timerange(tr_train)
|
||||
self.dh.data_path = Path(self.dh.full_path /
|
||||
str("sub-train" + "-" + metadata['pair'].split("/")[0] +
|
||||
str(int(trained_timestamp.stopts))))
|
||||
if not self.model_exists(metadata["pair"], trained_timestamp=trained_timestamp.stopts):
|
||||
self.model = self.train(dataframe_train, metadata)
|
||||
self.dh.save_data(self.model)
|
||||
else:
|
||||
@@ -161,36 +168,40 @@ class IFreqaiModel(ABC):
|
||||
|
||||
"""
|
||||
|
||||
self.dh.set_paths()
|
||||
(model_filename,
|
||||
trained_timestamp,
|
||||
coin_first) = self.data_drawer.get_pair_dict_info(metadata)
|
||||
|
||||
file_exists = self.model_exists(metadata['pair'],
|
||||
training_timerange=self.freqai_info[
|
||||
'live_trained_timerange'])
|
||||
if trained_timestamp != 0:
|
||||
self.dh.set_paths(trained_timestamp)
|
||||
# data_drawer thinks the file eixts, verify here
|
||||
file_exists = self.model_exists(metadata['pair'],
|
||||
trained_timestamp=trained_timestamp,
|
||||
model_filename=model_filename)
|
||||
|
||||
if not self.training_on_separate_thread:
|
||||
# this will also prevent other pairs from trying to train simultaneously.
|
||||
(self.retrain,
|
||||
self.new_trained_timerange) = self.dh.check_if_new_training_required(
|
||||
self.new_trained_timerange,
|
||||
metadata)
|
||||
new_trained_timerange) = self.dh.check_if_new_training_required(
|
||||
trained_timestamp)
|
||||
self.dh.set_paths(new_trained_timerange.stopts)
|
||||
else:
|
||||
logger.info("FreqAI training a new model on background thread.")
|
||||
self.retrain = False
|
||||
|
||||
if self.retrain or not file_exists:
|
||||
if self.first:
|
||||
self.train_model_in_series(self.new_trained_timerange, metadata, strategy)
|
||||
self.first = False
|
||||
if coin_first:
|
||||
self.train_model_in_series(new_trained_timerange, metadata, strategy)
|
||||
else:
|
||||
self.training_on_separate_thread = True # acts like a lock
|
||||
self.retrain_model_on_separate_thread(self.new_trained_timerange,
|
||||
self.retrain_model_on_separate_thread(new_trained_timerange,
|
||||
metadata, strategy)
|
||||
|
||||
self.model = self.dh.load_data()
|
||||
self.model = self.dh.load_data(coin=metadata['pair'])
|
||||
|
||||
strategy_provided_features = self.dh.find_features(dataframe)
|
||||
if strategy_provided_features != self.dh.training_features_list:
|
||||
self.train_model_in_series(self.new_trained_timerange, metadata, strategy)
|
||||
self.train_model_in_series(new_trained_timerange, metadata, strategy)
|
||||
|
||||
preds, do_preds = self.predict(dataframe, metadata)
|
||||
self.dh.append_predictions(preds, do_preds, len(dataframe))
|
||||
@@ -252,24 +263,34 @@ class IFreqaiModel(ABC):
|
||||
if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
|
||||
self.dh.check_if_pred_in_training_spaces() # sets do_predict
|
||||
|
||||
def model_exists(self, pair: str, training_timerange: str) -> bool:
|
||||
def model_exists(self, pair: str, trained_timestamp: int = None,
|
||||
model_filename: str = '') -> bool:
|
||||
"""
|
||||
Given a pair and path, check if a model already exists
|
||||
:param pair: pair e.g. BTC/USD
|
||||
:param path: path to model
|
||||
"""
|
||||
if self.live and training_timerange == "":
|
||||
return False
|
||||
coin, _ = pair.split("/")
|
||||
self.dh.model_filename = "cb_" + coin.lower() + "_" + training_timerange
|
||||
path_to_modelfile = Path(self.dh.model_path / str(self.dh.model_filename + "_model.joblib"))
|
||||
|
||||
if self.live and trained_timestamp is None:
|
||||
self.dh.model_filename = model_filename
|
||||
else:
|
||||
self.dh.model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)
|
||||
|
||||
path_to_modelfile = Path(self.dh.data_path / str(self.dh.model_filename + "_model.joblib"))
|
||||
file_exists = path_to_modelfile.is_file()
|
||||
if file_exists:
|
||||
logger.info("Found model at %s", self.dh.model_path / self.dh.model_filename)
|
||||
logger.info("Found model at %s", self.dh.data_path / self.dh.model_filename)
|
||||
else:
|
||||
logger.info("Could not find model at %s", self.dh.model_path / self.dh.model_filename)
|
||||
logger.info("Could not find model at %s", self.dh.data_path / self.dh.model_filename)
|
||||
return file_exists
|
||||
|
||||
def set_full_path(self) -> None:
|
||||
self.full_path = Path(self.config['user_data_dir'] /
|
||||
"models" /
|
||||
str(self.freqai_info.get('live_full_backtestrange') +
|
||||
self.freqai_info.get('identifier')))
|
||||
|
||||
@threaded
|
||||
def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, metadata: dict,
|
||||
strategy: IStrategy):
|
||||
@@ -285,7 +306,13 @@ class IFreqaiModel(ABC):
|
||||
metadata)
|
||||
|
||||
self.model = self.train(unfiltered_dataframe, metadata)
|
||||
self.dh.save_data(self.model)
|
||||
|
||||
self.data_drawer.pair_dict[metadata['pair']][
|
||||
'trained_timestamp'] = new_trained_timerange.stopts
|
||||
|
||||
self.dh.set_new_model_names(metadata, new_trained_timerange)
|
||||
|
||||
self.dh.save_data(self.model, coin=metadata['pair'])
|
||||
|
||||
self.training_on_separate_thread = False
|
||||
self.retrain = False
|
||||
@@ -303,7 +330,14 @@ class IFreqaiModel(ABC):
|
||||
metadata)
|
||||
|
||||
self.model = self.train(unfiltered_dataframe, metadata)
|
||||
self.dh.save_data(self.model)
|
||||
|
||||
self.data_drawer.pair_dict[metadata['pair']][
|
||||
'trained_timestamp'] = new_trained_timerange.stopts
|
||||
|
||||
self.dh.set_new_model_names(metadata, new_trained_timerange)
|
||||
|
||||
self.data_drawer.pair_dict[metadata['pair']]['first'] = False
|
||||
self.dh.save_data(self.model, coin=metadata['pair'])
|
||||
self.retrain = False
|
||||
|
||||
# Methods which are overridden by user made prediction models.
|
||||
|
Reference in New Issue
Block a user