catch errors occuring on background thread, and make sure to keep the ball rolling. Improve pair retraining queue.

This commit is contained in:
robcaulk 2022-05-28 18:26:19 +02:00
parent e54614fa2f
commit 83dd453723
3 changed files with 31 additions and 22 deletions

View File

@ -28,7 +28,7 @@ class FreqaiDataDrawer:
self.full_path = full_path
self.load_drawer_from_disk()
self.training_queue: Dict[str, int] = {}
self.create_training_queue(pair_whitelist)
# self.create_training_queue(pair_whitelist)
def load_drawer_from_disk(self):
exists = Path(self.full_path / str('pair_dictionary.json')).resolve().exists()
@ -58,7 +58,6 @@ class FreqaiDataDrawer:
model_filename = self.pair_dict[metadata['pair']]['model_filename'] = ''
coin_first = self.pair_dict[metadata['pair']]['first'] = True
trained_timestamp = self.pair_dict[metadata['pair']]['trained_timestamp'] = 0
self.pair_dict[metadata['pair']]['priority'] = 1
return model_filename, trained_timestamp, coin_first
@ -71,17 +70,16 @@ class FreqaiDataDrawer:
self.pair_dict[metadata['pair']]['model_filename'] = ''
self.pair_dict[metadata['pair']]['first'] = True
self.pair_dict[metadata['pair']]['trained_timestamp'] = 0
self.pair_dict[metadata['pair']]['priority'] = 1
self.pair_dict[metadata['pair']]['priority'] = len(self.pair_dict)
return
def create_training_queue(self, pairs: list) -> None:
for i, pair in enumerate(pairs):
self.training_queue[pair] = i + 1
# def create_training_queue(self, pairs: list) -> None:
# for i, pair in enumerate(pairs):
# self.training_queue[pair] = i + 1
def pair_to_end_of_training_queue(self, pair: str) -> None:
# march all pairs up in the queue
for p in self.training_queue:
self.training_queue[p] -= 1
for p in self.pair_dict:
self.pair_dict[p]['priority'] -= 1
# send pair to end of queue
self.training_queue[pair] = len(self.training_queue)
self.pair_dict[pair]['priority'] = len(self.pair_dict)

View File

@ -717,10 +717,12 @@ class FreqaiDataKitchen:
# enables persistence, but not fully implemented into save/load data yer
# self.data['live_trained_timerange'] = str(int(trained_timerange.stopts))
def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict) -> None:
def download_new_data_for_retraining(self, timerange: TimeRange, metadata: dict,
strategy: IStrategy) -> None:
exchange = ExchangeResolver.load_exchange(self.config['exchange']['name'],
self.config, validate=False, freqai=True)
# exchange = strategy.dp._exchange # closes ccxt session
pairs = copy.deepcopy(self.freqai_config.get('corr_pairlist', []))
if str(metadata['pair']) not in pairs:
pairs.append(str(metadata['pair']))
@ -766,7 +768,7 @@ class FreqaiDataKitchen:
base_dataframes: dict,
metadata: dict) -> DataFrame:
dataframe = base_dataframes[self.config['timeframe']]
dataframe = base_dataframes[self.config['timeframe']].copy()
pairs = self.freqai_config.get("corr_pairlist", [])
for tf in self.freqai_config.get("timeframes"):

View File

@ -85,7 +85,7 @@ class IFreqaiModel(ABC):
# determine what the current pair will do
if self.live:
if (not self.training_on_separate_thread and
self.data_drawer.training_queue[metadata['pair']] == 1):
self.data_drawer.pair_dict[metadata['pair']]['priority'] == 1):
self.dh = FreqaiDataKitchen(self.config, self.data_drawer,
self.live, metadata["pair"])
@ -313,13 +313,22 @@ class IFreqaiModel(ABC):
strategy: IStrategy, dh: FreqaiDataKitchen):
# with nostdout():
dh.download_new_data_for_retraining(new_trained_timerange, metadata)
dh.download_new_data_for_retraining(new_trained_timerange, metadata, strategy)
corr_dataframes, base_dataframes = dh.load_pairs_histories(new_trained_timerange,
metadata)
unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
corr_dataframes,
base_dataframes,
metadata)
# protecting from common benign errors associated with grabbing new data from exchange:
try:
unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
corr_dataframes,
base_dataframes,
metadata)
except Exception:
logger.warning('Mismatched sizes encountered in strategy')
self.data_drawer.pair_to_end_of_training_queue(metadata['pair'])
self.training_on_separate_thread = False
self.retrain = False
return
try:
model = self.train(unfiltered_dataframe, metadata, dh)
@ -333,8 +342,8 @@ class IFreqaiModel(ABC):
self.data_drawer.pair_dict[metadata['pair']][
'trained_timestamp'] = new_trained_timerange.stopts
dh.set_new_model_names(metadata, new_trained_timerange)
logger.info('Training queue'
f'{sorted(self.data_drawer.training_queue.items(), key=lambda item: item[1])}')
# logger.info('Training queue'
# f'{sorted(self.data_drawer.pair_dict.items(), key=lambda item: item[1])}')
dh.save_data(model, coin=metadata['pair'])
self.data_drawer.pair_to_end_of_training_queue(metadata['pair'])
@ -345,7 +354,7 @@ class IFreqaiModel(ABC):
def train_model_in_series(self, new_trained_timerange: TimeRange, metadata: dict,
strategy: IStrategy, dh: FreqaiDataKitchen):
dh.download_new_data_for_retraining(new_trained_timerange, metadata)
dh.download_new_data_for_retraining(new_trained_timerange, metadata, strategy)
corr_dataframes, base_dataframes = dh.load_pairs_histories(new_trained_timerange,
metadata)
@ -363,7 +372,7 @@ class IFreqaiModel(ABC):
dh.save_data(model, coin=metadata['pair'])
self.retrain = False
# Methods which are overridden by user made prediction models.
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModlel.py for an example.
@abstractmethod