Better handling of the interaction between n_points and jobs

commit 60e519eac4
parent 15749d3427
Author: orehunt
Date:   2020-03-09 13:34:10 +01:00

@@ -138,8 +138,10 @@ class Hyperopt:
         # if 0 n_points are given, don't use any base estimator (akin to random search)
         if self.n_points < 1:
             self.n_points = 1
-            self.opt_base_estimator = 'DUMMY'
-            self.opt_acq_optimizer = 'sampling'
+            self.opt_base_estimator = "DUMMY"
+            self.opt_acq_optimizer = "sampling"
+        # var used in epochs and batches calculation
+        self.opt_points = self.n_jobs * self.n_points

         # models are only needed for posterior eval
         self.n_models = max(16, self.n_jobs)
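
The new opt_points attribute is the total number of candidate points in flight across all workers, and the epoch and batch math further down is rebased on it. A quick worked example, with hypothetical settings rather than values from the commit:

    n_jobs, n_points = 4, 2          # hypothetical settings
    opt_points = n_jobs * n_points   # 8 candidates accounted for per batch

With 4 jobs each asking for 2 points, every batch now represents 8 evaluations instead of 4.
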
@@ -597,8 +599,9 @@ class Hyperopt:
         """
         vals = []
         to_ask: deque = deque()
-        evald: List[List] = []
+        evald: Set[Tuple] = set()
         fit = False
+        opt = self.opt
         for r in range(tries):
             while not backend.results.empty():
                 vals.append(backend.results.get())
@@ -607,7 +610,7 @@ class Hyperopt:
             # to reduce noise
             vals = list(filter(lambda v: v['loss'] != MAX_LOSS, vals))
             if vals:  # again if all are filtered
-                self.opt.tell([list(v['params_dict'].values()) for v in vals],
+                opt.tell([list(v['params_dict'].values()) for v in vals],
                          [v['loss'] for v in vals],
                          fit=fit)
                 if fit:
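
Binding opt = self.opt once keeps the hot loop free of repeated attribute lookups, and the batched tell() reports every surviving observation in one call. A self-contained sketch of that conversion; StubOpt and the MAX_LOSS value are illustrative stand-ins, not from the codebase:

    MAX_LOSS = 100000  # hypothetical sentinel for failed evaluations

    class StubOpt:
        """Stand-in optimizer that records what it is told."""
        def __init__(self):
            self.Xi, self.yi = [], []

        def tell(self, X, y, fit=False):
            self.Xi.extend(X)
            self.yi.extend(y)

    vals = [{"params_dict": {"a": 1, "b": 2}, "loss": 0.5},
            {"params_dict": {"a": 3, "b": 4}, "loss": MAX_LOSS}]
    opt = StubOpt()
    vals = list(filter(lambda v: v["loss"] != MAX_LOSS, vals))  # drop noisy losses
    if vals:
        opt.tell([list(v["params_dict"].values()) for v in vals],
                 [v["loss"] for v in vals],
                 fit=False)
    print(opt.Xi, opt.yi)  # -> [[1, 2]] [0.5]
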
@@ -615,14 +618,17 @@ class Hyperopt:
                 vals = []
             if not to_ask:
-                self.opt.update_next()
-                to_ask.extend(self.opt.ask(n_points=self.n_points))
+                opt.update_next()
+                to_ask.extend(opt.ask(n_points=self.n_points))
                 fit = True
-            a = to_ask.popleft()
-            while a in evald and len(to_ask) > 0:
-                logger.info('this point was evaluated before...')
-                a = to_ask.popleft()
-            evald.append(a)
+            a = tuple(to_ask.popleft())
+            while a in evald:
+                logger.info("this point was evaluated before...")
+                if len(to_ask) > 0:
+                    a = tuple(to_ask.popleft())
+                else:
+                    break
+            evald.add(a)
             yield a

     def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
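
Switching evald from a list to a set of tuples makes the duplicate check O(1), and tuple() makes each asked point hashable. The reworked loop also keeps draining the queue instead of giving up after the first duplicate. A standalone sketch of the same dedup logic; ask_dedup is an illustrative name, not the project's:

    from collections import deque
    from typing import Iterable, List, Set, Tuple

    def ask_dedup(candidates: Iterable[List], evald: Set[Tuple]):
        """Yield candidate points, skipping any already evaluated."""
        to_ask = deque(candidates)
        while to_ask:
            a = tuple(to_ask.popleft())
            while a in evald:
                if len(to_ask) > 0:
                    a = tuple(to_ask.popleft())
                else:
                    break  # queue exhausted: the duplicate falls through, as in the commit
            evald.add(a)
            yield a

    seen: Set[Tuple] = set()
    print(list(ask_dedup([[1, 2], [1, 2], [3, 4]], seen)))  # -> [(1, 2), (3, 4)]
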
@@ -666,7 +672,8 @@ class Hyperopt:
             opt.tell(Xi, yi, fit=False)
         # update the board with the new results
         results = results_board.get()
-        results.append([f_val, jobs - 1])
+        no_max_loss_results = list(filter(lambda v: v["loss"] != MAX_LOSS, f_val))
+        results.append([no_max_loss_results, jobs - 1])
         results_board.put(results)
         # send back the updated optimizer
         optimizers.put(opt)
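
Workers now publish only non-MAX_LOSS results to the shared board, paired with the jobs - 1 counter, so other workers never see failure placeholders. A sketch of the board round trip using a plain queue; all values are hypothetical:

    from queue import Queue

    MAX_LOSS = 100000  # hypothetical sentinel for failed evaluations
    jobs = 4
    f_val = [{"loss": 0.3}, {"loss": MAX_LOSS}]

    results_board: Queue = Queue()
    results_board.put([])          # start with an empty board
    results = results_board.get()  # take the board
    no_max_loss_results = list(filter(lambda v: v["loss"] != MAX_LOSS, f_val))
    results.append([no_max_loss_results, jobs - 1])
    results_board.put(results)     # publish the update
    print(results_board.get())     # -> [[[{'loss': 0.3}], 3]]
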
@@ -743,11 +750,14 @@ class Hyperopt:
         return random_state or random.randint(1, 2**16 - 1)

     @staticmethod
-    def calc_epochs(dimensions: List[Dimension], n_jobs: int, effort: float, total_epochs: int):
+    def calc_epochs(
+        dimensions: List[Dimension], n_jobs: int, effort: float, total_epochs: int, n_points: int
+    ):
         """ Compute a reasonable number of initial points and
         a minimum number of epochs to evaluate """
         n_dimensions = len(dimensions)
         n_parameters = 0
+        opt_points = n_jobs * n_points
         # sum all the dimensions discretely, granting minimum values
         for d in dimensions:
             if type(d).__name__ == 'Integer':
@@ -762,22 +772,22 @@ class Hyperopt:
                 (factorial(n_parameters) /
                  (factorial(n_parameters - n_dimensions) * factorial(n_dimensions))))
         # logger.info(f'Search space size: {search_space_size}')
-        if search_space_size < n_jobs:
+        if search_space_size < opt_points:
             # don't waste if the space is small
-            n_initial_points = n_jobs
-            min_epochs = n_jobs
+            n_initial_points = opt_points // 3
+            min_epochs = opt_points
         elif total_epochs > 0:
-            n_initial_points = total_epochs // 3 if total_epochs > n_jobs * 3 else n_jobs
-            min_epochs = n_initial_points
+            n_initial_points = total_epochs // 3 if total_epochs > opt_points * 3 else opt_points
+            min_epochs = total_epochs
         else:
             # extract coefficients from the search space and the jobs count
             log_sss = int(log(search_space_size, 10))
-            log_jobs = int(log(n_jobs, 2)) if n_jobs > 4 else 2
-            jobs_ip = log_jobs * log_sss
+            log_opt = int(log(opt_points, 2)) if opt_points > 4 else 2
+            opt_ip = log_opt * log_sss
             # never waste
-            n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip
+            n_initial_points = log_sss if opt_ip > search_space_size else opt_ip
             # it shall run for this much, I say
-            min_epochs = int(max(n_initial_points, n_jobs) * (1 + effort) + n_initial_points)
+            min_epochs = int(max(n_initial_points, opt_points) * (1 + effort) + n_initial_points)
         return n_initial_points, min_epochs, search_space_size

     def update_max_epoch(self, val: Dict, current: int):
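
To see how rebasing the heuristics on opt_points changes the budgets, here is the log-based branch evaluated by hand; all inputs are hypothetical:

    from math import log

    search_space_size, n_jobs, n_points, effort = 2_000_000, 4, 2, 1.0
    opt_points = n_jobs * n_points                                        # 8
    log_sss = int(log(search_space_size, 10))                             # 6
    log_opt = int(log(opt_points, 2)) if opt_points > 4 else 2            # 3
    opt_ip = log_opt * log_sss                                            # 18
    n_initial_points = log_sss if opt_ip > search_space_size else opt_ip  # 18
    min_epochs = int(max(n_initial_points, opt_points) * (1 + effort) + n_initial_points)
    print(n_initial_points, min_epochs)  # -> 18 54

With the old n_jobs-based coefficients the same inputs gave log_jobs = 2 (since n_jobs is not above 4), hence 12 initial points and 36 minimum epochs, so batching points per job raises both budgets.
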
@@ -809,7 +819,7 @@ class Hyperopt:
         # generate as many optimizers as are still needed to fill the job count
         remaining = self.n_jobs - backend.optimizers.qsize()
         if remaining > 0:
-            opt = self.get_optimizer(self.dimensions, self.n_jobs, self.n_initial_points)
+            opt = self.get_optimizer(self.dimensions, self.n_jobs, self.opt_n_initial_points)
             for _ in range(remaining):  # generate optimizers
                 # random state is preserved
                 backend.optimizers.put(
@@ -822,7 +832,9 @@ class Hyperopt:
             if len(opts) > 0:
                 self.opt = opts[-1]
             else:
-                self.opt = self.get_optimizer(self.dimensions, self.n_jobs, self.n_initial_points)
+                self.opt = self.get_optimizer(
+                    self.dimensions, self.n_jobs, self.opt_n_initial_points
+                )
             del opts[:]

     def start(self) -> None:
@@ -856,10 +868,13 @@ class Hyperopt:
         self.dimensions: List[Dimension] = self.hyperopt_space()
         self.n_initial_points, self.min_epochs, self.search_space_size = self.calc_epochs(
-            self.dimensions, self.n_jobs, self.effort, self.total_epochs)
-        # reduce random points by the number of optimizers in multi mode
+            self.dimensions, self.n_jobs, self.effort, self.total_epochs, self.n_points
+        )
+        # reduce random points by n_points in multi mode because asks are per job
         if self.multi:
-            self.n_initial_points = self.n_initial_points // self.n_jobs
+            self.opt_n_initial_points = self.n_initial_points // self.n_points
+        else:
+            self.opt_n_initial_points = self.n_initial_points
         logger.info(f"Min epochs set to: {self.min_epochs}")
         # if total epochs are not set, max_epoch takes its place
         if self.total_epochs < 1:
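
Since every ask in multi mode returns n_points candidates per job, the random startup budget is divided by n_points per optimizer, while single mode keeps the whole budget for the one shared optimizer. A small sketch with hypothetical numbers:

    n_initial_points, n_points = 24, 2

    for multi in (True, False):
        if multi:
            opt_n_initial_points = n_initial_points // n_points  # 12 per optimizer
        else:
            opt_n_initial_points = n_initial_points  # 24 for the single optimizer
        print(multi, opt_n_initial_points)  # -> True 12, then False 24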