- make sure dispatches always perform the given number of epochs

- fix printed output
This commit is contained in:
orehunt 2020-03-18 18:36:10 +01:00
parent a982eae622
commit 9e0b07b2fd

View File

@ -2,6 +2,8 @@
"""
This module contains the hyperopt logic
"""
import os
import functools
import locale
import logging
@ -175,7 +177,6 @@ class Hyperopt:
backend.results = backend.manager.Queue()
self.opt_acq_optimizer = 'sampling'
self.opt_base_estimator = lambda: 'ET'
default_n_points = 1
# The GaussianProcessRegressor is heavy, which makes it not a good default
# however longer backtests might make it a better tradeoff
# self.opt_base_estimator = lambda: 'GP'
@ -727,20 +728,28 @@ class Hyperopt:
elif initial_points < 1:
opt.tell(opt.Xi, opt.yi)
# ask for points according to config
asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
# wrap in a list when asked for 1 point
if not self.ask_points:
asked = [asked]
# check if some points have been evaluated by other optimizers
p_asked = self.opt_get_past_points({tuple(a): None for a in asked}, results_board)
Xi_d = [] # done
yi_d = []
Xi_t = [] # to do
for a in p_asked:
if p_asked[a] is not None:
Xi_d.append(a)
# ask for points according to config
while True:
asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
if not self.ask_points:
asked = {tuple(asked): None}
else:
Xi_t.append(a)
asked = {tuple(a): None for a in asked}
# check if some points have been evaluated by other optimizers
p_asked = self.opt_get_past_points(asked, results_board)
for a in p_asked:
if p_asked[a] is not None:
Xi_d.append(a)
yi_d.append(p_asked[a])
else:
Xi_t.append(a)
if len(Xi_t) < self.n_points:
opt.update_next()
else:
break
# run the backtest for each point to do (Xi_t)
f_val = [self.backtest_params(a) for a in Xi_t]
# filter losses
@ -748,7 +757,7 @@ class Hyperopt:
# add points of the current dispatch if any
if opt.void_loss != VOID_LOSS or len(void_filtered) > 0:
Xi = [*Xi_d, *[list(v['params_dict'].values()) for v in void_filtered]]
yi = [*[p_asked[a] for a in Xi_d], *[v['loss'] for v in void_filtered]]
yi = [*yi_d, *[v['loss'] for v in void_filtered]]
void = False
if is_shared:
# refresh the optimizer that stores all the points
@ -827,6 +836,7 @@ class Hyperopt:
# sorting from lowest to highest, the first value is the current best
best = sorted(best_epochs, key=lambda k: k["loss"])[0]
self.current_best_epoch = best["current_epoch"]
self.current_best_loss = best["loss"]
self.avg_best_occurrence = len_trials // len_best
return True
return False
@ -1021,30 +1031,36 @@ class Hyperopt:
jobs_scheduler = self.run_backtest_parallel
with parallel_backend('loky', inner_max_num_threads=2):
with Parallel(n_jobs=self.n_jobs, verbose=0, backend='loky') as parallel:
jobs = parallel._effective_n_jobs()
logger.info(f'Effective number of parallel workers used: {jobs}')
# update epochs count
n_points = self.n_points
prev_batch = -1
epochs_so_far = len(self.trials)
epochs_limit = self.epochs_limit
columns, _ = os.get_terminal_size()
columns -= 1
while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
prev_batch = epochs_so_far
occurrence = int(self.avg_best_occurrence * (1 + self.effort))
# pad the batch length to the number of jobs to avoid desaturation
batch_len = (self.avg_best_occurrence + self.n_jobs -
self.avg_best_occurrence % self.n_jobs)
batch_len = (occurrence + jobs -
occurrence % jobs)
# when using multiple optimizers each worker performs
# n_points (epochs) in 1 dispatch but this reduces the batch len too much
# if self.multi: batch_len = batch_len // self.n_points
# don't go over the limit
if epochs_so_far + batch_len * n_points > epochs_limit():
if epochs_so_far + batch_len * n_points >= epochs_limit():
q, r = divmod(epochs_limit() - epochs_so_far, n_points)
batch_len = q + r
print(
f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
f"/{epochs_limit()}: ",
end='')
f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, self.n_jobs)
print(' ' * batch_len * n_points, end='\r')
f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
print(end='\r')
saved = self.log_results(f_val, epochs_so_far, epochs_limit())
print('\r', ' ' * columns, end='\r')
# stop if no epochs have been evaluated
if len(f_val) < batch_len:
logger.warning("Some evaluated epochs were void, "