- reduction of pickling time by using epochs to load points

- use object state just for rng and init points status, don't save models or points
- other counting edge-case fixes
This commit is contained in:
orehunt 2020-03-24 12:06:35 +01:00
parent cc47f3e1e4
commit 6b9bc7c83f
2 changed files with 248 additions and 124 deletions
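In outline, the commit stops pickling optimizer models and observed points: only the rng state and initial-point status travel with the optimizer, and the points are rebuilt from the saved epochs on load. A minimal sketch of that idea (it mirrors the opt_clear helper and trial layout seen in the diff below; the restore_points name is illustrative, not the actual API):

    # sketch: strip heavy state before pickling, rebuild points from saved epochs
    def clear_optimizer(opt):
        # models and observed points are dropped; rng and flags survive pickling
        del opt.models[:], opt.Xi[:], opt.yi[:]
        return opt

    def restore_points(opt, trials):
        # each saved epoch carries its parameters and loss, so the observed
        # points can be told back to a freshly loaded optimizer
        Xi = [list(t["params_dict"].values()) for t in trials]
        yi = [t["loss"] for t in trials]
        if Xi:
            opt.tell(Xi, yi, fit=False)
        return opt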

View File

@@ -4,7 +4,6 @@ This module contains the hyperopt logic
 """
 import os
-import functools
 import locale
 import logging
 import random
@@ -92,7 +91,9 @@ class Hyperopt:
         self.n_jobs = self.config.get('hyperopt_jobs', -1)
         if self.n_jobs < 0:
             self.n_jobs = cpu_count() // 2 or 1
-        self.effort = self.config['effort'] if 'effort' in self.config else 0
+        self.effort = max(0.01,
+                          self.config['effort'] if 'effort' in self.config else 1
+                          )
         self.total_epochs = self.config['epochs'] if 'epochs' in self.config else 0
         self.max_epoch = 0
         self.max_epoch_reached = False
@@ -155,6 +156,8 @@ class Hyperopt:
         # optimizers
         self.opts: List[Optimizer] = []
         self.opt: Optimizer = None
+        self.Xi: Dict = {}
+        self.yi: Dict = {}
         backend.manager = Manager()
         self.mode = self.config.get('mode', 'single')
@@ -170,11 +173,14 @@ class Hyperopt:
             self.opt_base_estimator = self.estimators
             self.opt_acq_optimizer = 'sampling'
             backend.optimizers = backend.manager.Queue()
-            backend.results_board = backend.manager.Queue(maxsize=1)
-            backend.results_board.put({})
+            backend.results_batch = backend.manager.Queue()
         else:
             self.multi = False
-            backend.results = backend.manager.Queue()
+            backend.results_list = backend.manager.list([])
+            # this is where opt_ask_and_tell stores the results after points are
+            # used for fit and predict, to avoid additional pickling
+            self.batch_results = []
+            # self.opt_base_estimator = lambda: BayesianRidge(n_iter=100, normalize=True)
             self.opt_acq_optimizer = 'sampling'
             self.opt_base_estimator = lambda: 'ET'
             # The GaussianProcessRegressor is heavy, which makes it not a good default
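The two modes now rely on different shared structures: multi mode passes optimizers and finished batches through manager queues, while single mode lets workers append raw results to a manager list that the scheduler drains between dispatches. A self-contained sketch of the same setup:

    from multiprocessing import Manager

    manager = Manager()
    # multi mode: optimizers rotate through one queue, finished batches
    # are published on another
    optimizers = manager.Queue()
    results_batch = manager.Queue()
    # single mode: workers append results here instead of returning them
    results_list = manager.list([])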
@@ -262,14 +268,20 @@ class Hyperopt:
         n_opts = 0
         if self.multi:
             while not backend.optimizers.empty():
-                opts.append(backend.optimizers.get())
+                opt = backend.optimizers.get()
+                opt = Hyperopt.opt_clear(opt)
+                opts.append(opt)
             n_opts = len(opts)
             for opt in opts:
                 backend.optimizers.put(opt)
         else:
+            # when we clear the object for saving we have to make a copy to preserve state
+            opt = Hyperopt.opt_rand(self.opt, seed=False)
             if self.opt:
                 n_opts = 1
-                opts = [self.opt]
+                opts = [Hyperopt.opt_clear(self.opt)]
+            # (the optimizer copy function also fits a new model with the known points)
+            self.opt = opt
         logger.debug(f"Saving {n_opts} {plural(n_opts, 'optimizer')}.")
         dump(opts, self.opts_file)
@@ -610,42 +622,41 @@ class Hyperopt:
     def estimators(self):
         return ESTIMATORS[random.randrange(0, ESTIMATORS_N)]

-    def get_optimizer(self, dimensions: List[Dimension], n_jobs: int,
-                      n_initial_points: int) -> Optimizer:
+    def get_optimizer(self, random_state: int = None) -> Optimizer:
         " Construct an optimizer object "
         # https://github.com/scikit-learn/scikit-learn/issues/14265
         # lbfgs uses joblib threading backend so n_jobs has to be reduced
         # to avoid oversubscription
         if self.opt_acq_optimizer == 'lbfgs':
             n_jobs = 1
+        else:
+            n_jobs = self.n_jobs
         return Optimizer(
-            dimensions,
+            self.dimensions,
             base_estimator=self.opt_base_estimator(),
             acq_optimizer=self.opt_acq_optimizer,
-            n_initial_points=n_initial_points,
+            n_initial_points=self.opt_n_initial_points,
             acq_optimizer_kwargs={'n_jobs': n_jobs},
             model_queue_size=self.n_models,
-            random_state=self.random_state,
+            random_state=random_state or self.random_state,
         )

     def run_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
-                              jobs: int) -> List:
+                              jobs: int):
         """ launch parallel in single opt mode, return the evaluated epochs """
-        result = parallel(
-            delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, backend.results, i)
+        parallel(
+            delayed(wrap_non_picklable_objects(self.parallel_objective))
+            (asked, backend.results_list, i)
             for asked, i in zip(self.opt_ask_and_tell(jobs, tries),
                                 range(first_try, first_try + tries)))
-        return result

     def run_multi_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
-                                    jobs: int) -> List:
+                                    jobs: int):
         """ launch parallel in multi opt mode, return the evaluated epochs"""
-        results = parallel(
+        parallel(
             delayed(wrap_non_picklable_objects(self.parallel_opt_objective))(
-                i, backend.optimizers, jobs, backend.results_board)
+                i, backend.optimizers, jobs, backend.results_shared, backend.results_batch)
             for i in range(first_try, first_try + tries))
-        # each worker will return a list containing n_points, so compact into a single list
-        return functools.reduce(lambda x, y: [*x, *y], results, [])

     def opt_ask_and_tell(self, jobs: int, tries: int):
         """
@@ -660,33 +671,37 @@ class Hyperopt:
         evald: Set[Tuple] = set()
         opt = self.opt

-        def point():
-            if self.ask_points:
+        # this is needed because when we ask None points, the optimizer doesn't return a list
+        if self.ask_points:
+            def point():
                 if to_ask:
                     return tuple(to_ask.popleft())
                 else:
                     to_ask.extend(opt.ask(n_points=self.ask_points, strategy=self.lie_strat()))
                     return tuple(to_ask.popleft())
-            else:
-                return tuple(opt.ask(strategy=self.lie_strat()))
+        else:
+            def point():
+                return tuple(opt.ask(strategy=self.lie_strat()))

         for r in range(tries):
             fit = (len(to_ask) < 1)
-            while not backend.results.empty():
-                vals.append(backend.results.get())
+            if len(backend.results_list) > 0:
+                vals.extend(backend.results_list)
+                del backend.results_list[:]
             if vals:
                 # filter losses
-                void_filtered = self.filter_void_losses(vals, opt)
+                void_filtered = Hyperopt.filter_void_losses(vals, opt)
                 if void_filtered:  # again if all are filtered
-                    opt.tell([list(v['params_dict'].values()) for v in void_filtered],
+                    opt.tell([Hyperopt.params_Xi(v) for v in void_filtered],
                              [v['loss'] for v in void_filtered],
                              fit=fit)  # only fit when out of points
+                    self.batch_results.extend(void_filtered)
                 del vals[:], void_filtered[:]

             a = point()
+            # this usually happens at the start when trying to fit before the initial points
             if a in evald:
                 logger.debug("this point was evaluated before...")
-                opt.update_next()
+                if not fit:
+                    opt.update_next()
                 a = point()
                 if a in evald:
@@ -695,90 +710,111 @@ class Hyperopt:
             yield a

     @staticmethod
-    def opt_get_past_points(asked: dict, results_board: Queue) -> Tuple[dict, int]:
+    def opt_get_past_points(is_shared: bool, asked: dict, results_shared: Dict) -> Tuple[dict, int]:
         """ fetch shared results between optimizers """
-        results = results_board.get()
-        results_board.put(results)
+        # a result is (y, counter)
         for a in asked:
-            if a in results:
-                asked[a] = results[a]
-        return asked, len(results)
+            if a in results_shared:
+                y, counter = results_shared[a]
+                asked[a] = y
+                counter -= 1
+                if counter < 1:
+                    del results_shared[a]
+        return asked, len(results_shared)

     @staticmethod
-    def opt_state(shared: bool, optimizers: Queue) -> Tuple[Optimizer, int]:
+    def opt_rand(opt: Optimizer, rand: int = None, seed: bool = True) -> Optimizer:
+        """ return a new instance of the optimizer with modified rng """
+        if seed:
+            if not rand:
+                rand = opt.rng.randint(0, VOID_LOSS)
+            opt.rng.seed(rand)
+        opt, opt.void_loss, opt.void, opt.rs = (
+            opt.copy(random_state=opt.rng), opt.void_loss, opt.void, opt.rs
+        )
+        return opt
+
+    @staticmethod
+    def opt_state(shared: bool, optimizers: Queue) -> Optimizer:
         """ fetch an optimizer in multi opt mode """
         # get an optimizer instance
         opt = optimizers.get()
-        # this is the counter used by the optimizer internally to track the initial
-        # points evaluated so far..
-        initial_points = opt._n_initial_points
         if shared:
             # get a random number before putting it back to avoid
             # replication with other workers and keep reproducibility
             rand = opt.rng.randint(0, VOID_LOSS)
             optimizers.put(opt)
             # switch the seed to get a different point
-            opt.rng.seed(rand)
-            opt, opt.void_loss, opt.void = opt.copy(random_state=opt.rng), opt.void_loss, opt.void
-        # a model is only fit after initial points
-        elif initial_points < 1:
-            opt.tell(opt.Xi, opt.yi)
-        # we have to get a new point anyway
-        else:
-            opt.update_next()
-        return opt, initial_points
+            opt = Hyperopt.opt_rand(opt, rand)
+        return opt

     @staticmethod
-    def opt_results(opt: Optimizer, void_filtered: list,
-                    Xi_d: list, yi_d: list, initial_points: int, is_shared: bool,
-                    results_board: Queue, optimizers: Queue) -> list:
+    def opt_clear(opt: Optimizer):
+        """ clear state from an optimizer object """
+        del opt.models[:], opt.Xi[:], opt.yi[:]
+        return opt
+
+    @staticmethod
+    def opt_results(opt: Optimizer, void_filtered: list, jobs: int, is_shared: bool,
+                    results_shared: Dict, results_batch: Queue, optimizers: Queue):
         """
         update the board used to skip already computed points,
         set the initial point status
         """
         # add points of the current dispatch if any
         if opt.void_loss != VOID_LOSS or len(void_filtered) > 0:
-            Xi = [*Xi_d, *[list(v['params_dict'].values()) for v in void_filtered]]
-            yi = [*yi_d, *[v['loss'] for v in void_filtered]]
-            if is_shared:
-                # refresh the optimizer that stores all the points
-                opt = optimizers.get()
-                opt.tell(Xi, yi, fit=False)
-                opt.void = False
+            void = False
         else:
-            opt.void = True
+            void = True
         # send back the updated optimizer only in non shared mode
-        # because in shared mode if all results are void we don't
-        # fetch it at all
-        if not opt.void or not is_shared:  # is not a replica in shared mode
-            # don't pickle models
-            del opt.models[:]
+        if not is_shared:
+            opt = Hyperopt.opt_clear(opt)
             optimizers.put(opt)
         # NOTE: some results at the beginning won't be published
-        # because they are removed by the filter_void_losses
-        if not opt.void:
-            results = results_board.get()
-            for v in void_filtered:
-                a = tuple(v['params_dict'].values())
-                if a not in results:
-                    results[a] = v['loss']
-            results_board.put(results)
-        # set initial point flag
+        # because they are removed by filter_void_losses
+        rs = opt.rs
+        if not void:
+            # the tuple keys are used to avoid computation of done points by any optimizer
+            results_shared.update({tuple(Hyperopt.params_Xi(v)): (v["loss"], jobs - 1)
+                                   for v in void_filtered})
+            # in multi opt mode (non shared) also track results for each optimizer (using rs as ID)
+            # this keys should be cleared after each batch
+            Xi, yi = results_shared[rs]
+            Xi = Xi + tuple((Hyperopt.params_Xi(v)) for v in void_filtered)
+            yi = yi + tuple(v["loss"] for v in void_filtered)
+            results_shared[rs] = (Xi, yi)
+        # this is the counter used by the optimizer internally to track the initial
+        # points evaluated so far..
+        initial_points = opt._n_initial_points
+        # set initial point flag and optimizer random state
         for n, v in enumerate(void_filtered):
             v['is_initial_point'] = initial_points - n > 0
-        return void_filtered
+            v['random_state'] = rs
+        results_batch.put(void_filtered)

-    def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
+    def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int,
+                               results_shared: Dict, results_batch: Queue):
         """
         objective run in multi opt mode, optimizers share the results as soon as they are completed
         """
         self.log_results_immediate(n)
         is_shared = self.shared
-        opt, initial_points = self.opt_state(is_shared, optimizers)
+        opt = self.opt_state(is_shared, optimizers)
         sss = self.search_space_size
         asked: Dict[Tuple, Any] = {tuple([]): None}
         asked_d: Dict[Tuple, Any] = {}
+        # fit a model with the known points, (the optimizer has no points here since
+        # it was just fetched from the queue)
+        rs = opt.rs
+        Xi, yi = self.Xi[rs], self.yi[rs]
+        # add the points discovered within this batch
+        bXi, byi = results_shared[rs]
+        Xi.extend(list(bXi))
+        yi.extend(list(byi))
+        if Xi:
+            opt.tell(Xi, yi)
         told = 0  # told
         Xi_d = []  # done
         yi_d = []
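The results board is replaced by a reference-counted dict: each published point is keyed by its parameter tuple, with the loss and a countdown of jobs - 1 readers, and the last optimizer to consume an entry deletes it, so no get/put round trip over the whole board is needed. A sketch of that bookkeeping (unlike opt_get_past_points above, this version also writes the decremented counter back; the commit instead relies on the per-batch cleanup):

    # sketch: a shared dict entry is (loss, remaining_readers)
    def publish(results_shared, point, loss, jobs):
        # every other worker may consume this result exactly once
        results_shared[tuple(point)] = (loss, jobs - 1)

    def consume(results_shared, point):
        key = tuple(point)
        if key not in results_shared:
            return None
        loss, counter = results_shared[key]
        counter -= 1
        if counter < 1:
            del results_shared[key]  # last reader drops the entry
        else:
            results_shared[key] = (loss, counter)  # persist the countdown
        return loss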
@@ -794,7 +830,7 @@ class Hyperopt:
             else:
                 asked = {tuple(a): None for a in asked}
             # check if some points have been evaluated by other optimizers
-            p_asked, _ = self.opt_get_past_points(asked, results_board)
+            p_asked, _ = Hyperopt.opt_get_past_points(is_shared, asked, results_shared)
             for a in p_asked:
                 if p_asked[a] is not None:
                     if a not in Xi_d:
@@ -802,51 +838,55 @@ class Hyperopt:
                         yi_d.append(p_asked[a])
                 else:
                     Xi_t.append(a)
+            # no points to do?
             if len(Xi_t) < self.n_points:
                 len_Xi_d = len(Xi_d)
-                if len_Xi_d > told:  # tell new points
+                # did other workers backtest some points?
+                if len_Xi_d > told:
+                    # if yes fit a new model with the new points
                     opt.tell(Xi_d[told:], yi_d[told:])
                     told = len_Xi_d
-                else:
-                    opt.update_next()
+                else:  # or get new points from a different random state
+                    opt = Hyperopt.opt_rand(opt)
             else:
                 break
         # return early if there is nothing to backtest
         if len(Xi_t) < 1:
-            if not is_shared:
-                opt = optimizers.get()
+            if is_shared:
                 opt.void = -1
-            del opt.models[:]
+            opt = Hyperopt.opt_clear(opt)
             optimizers.put(opt)
             return []
         # run the backtest for each point to do (Xi_t)
-        f_val = [self.backtest_params(a) for a in Xi_t]
+        results = [self.backtest_params(a) for a in Xi_t]
         # filter losses
-        void_filtered = self.filter_void_losses(f_val, opt)
-        return self.opt_results(opt, void_filtered, Xi_d, yi_d, initial_points, is_shared,
-                                results_board, optimizers)
+        void_filtered = Hyperopt.filter_void_losses(results, opt)
+        Hyperopt.opt_results(opt, void_filtered, jobs, is_shared,
+                             results_shared, results_batch, optimizers)

-    def parallel_objective(self, asked, results: Queue = None, n=0):
+    def parallel_objective(self, asked, results_list: List = [], n=0):
         """ objective run in single opt mode, run the backtest, store the results into a queue """
         self.log_results_immediate(n)
         v = self.backtest_params(asked)
-        if results:
-            results.put(v)
         v['is_initial_point'] = n < self.opt_n_initial_points
-        return v
+        v['random_state'] = self.random_state
+        results_list.append(v)

     def log_results_immediate(self, n) -> None:
         """ Signals that a new job has been scheduled"""
         print('.', end='')
         sys.stdout.flush()

-    def log_results(self, f_val, frame_start, total_epochs: int) -> int:
+    def log_results(self, batch_results, frame_start, total_epochs: int) -> int:
         """
         Log results if it is better than any previous evaluation
         """
         current = frame_start + 1
         i = 0
-        for i, v in enumerate(f_val, 1):
+        for i, v in enumerate(batch_results, 1):
             is_best = self.is_best_loss(v, self.current_best_loss)
             current = frame_start + i
             v['is_best'] = is_best
@@ -857,8 +897,13 @@ class Hyperopt:
             self.update_max_epoch(v, current)
             self.print_results(v)
             self.trials.append(v)
-        # Save results and optimizersafter every batch
+        # Save results and optimizers after every batch
         self.save_trials()
+        # track new points if in multi mode
+        if self.multi:
+            self.track_points(trials=self.trials[frame_start:])
+            # clear points used by optimizers intra batch
+            backend.results_shared.update(self.opt_empty_tuple())
         # give up if no best since max epochs
         if current + 1 > self.epochs_limit():
             self.max_epoch_reached = True
@@ -953,8 +998,8 @@ class Hyperopt:
         # never waste
         n_initial_points = min(log_sss, search_space_size // 3)
         # it shall run for this much, I say
-        min_epochs = int(max(n_initial_points, opt_points) * (1 + effort) + n_initial_points)
-        return n_initial_points or 1, min_epochs, search_space_size
+        min_epochs = int(max(n_initial_points, opt_points) + 2 * n_initial_points)
+        return int(n_initial_points * effort) or 1, int(min_epochs * effort), search_space_size

     def update_max_epoch(self, val: Dict, current: int):
         """ calculate max epochs: store the number of non best epochs
@@ -966,49 +1011,108 @@ class Hyperopt:
         self.current_best_epoch = current
         self.max_epoch = int(
             (self.current_best_epoch + self.avg_best_occurrence + self.min_epochs) *
-            (1 + self.effort))
+            max(1, self.effort))
         if self.max_epoch > self.search_space_size:
             self.max_epoch = self.search_space_size
         logger.debug(f'\nMax epoch set to: {self.epochs_limit()}')

+    @staticmethod
+    def params_Xi(v: dict):
+        return list(v["params_dict"].values())
+
+    def track_points(self, trials: List = None):
+        """
+        keep tracking of the evaluated points per optimizer random state
+        """
+        # if no trials are given, use saved trials
+        if not trials:
+            if len(self.trials) > 0:
+                if self.config.get('hyperopt_continue_filtered', False):
+                    trials = filter_trials(self.trials, self.config)
+                else:
+                    trials = self.trials
+            else:
+                return
+        for v in trials:
+            rs = v["random_state"]
+            try:
+                self.Xi[rs].append(Hyperopt.params_Xi(v))
+                self.yi[rs].append(v["loss"])
+            except IndexError:  # Hyperopt was started with different random_state or number of jobs
+                pass
+
     def setup_optimizers(self):
         """ Setup the optimizers objects, try to load from disk, or create new ones """
         # try to load previous optimizers
         opts = self.load_previous_optimizers(self.opts_file)
         n_opts = len(opts)
-        max_opts = self.n_jobs
         if self.multi:
+            max_opts = self.n_jobs
+            rngs = []
             # when sharing results there is only one optimizer that gets copied
             if self.shared:
                 max_opts = 1
             # put the restored optimizers in the queue
-            if n_opts > 0:
+            # only if they match the current number of jobs
+            if n_opts == max_opts:
                 for n in range(n_opts):
-                    backend.optimizers.put(opts[n])
+                    rngs.append(opts[n].rs)
+                    # make sure to not store points and models in the optimizer
+                    backend.optimizers.put(Hyperopt.opt_clear(opts[n]))
             # generate as many optimizers as are still needed to fill the job count
             remaining = max_opts - backend.optimizers.qsize()
             if remaining > 0:
-                opt = self.get_optimizer(self.dimensions, self.n_jobs, self.opt_n_initial_points)
-                rngs = []
+                opt = self.get_optimizer()
                 for _ in range(remaining):  # generate optimizers
                     # random state is preserved
-                    opt_copy = opt.copy(random_state=opt.rng.randint(0,
-                                                                     iinfo(int32).max))
+                    rs = opt.rng.randint(0, iinfo(int32).max)
+                    opt_copy = opt.copy(random_state=rs)
                     opt_copy.void_loss = VOID_LOSS
                     opt_copy.void = False
+                    opt_copy.rs = rs
+                    rngs.append(rs)
                     backend.optimizers.put(opt_copy)
                 del opt, opt_copy
+            # reconstruct observed points from epochs
+            # in shared mode each worker will remove the results once all the workers
+            # have read it (counter < 1)
+            counter = self.n_jobs
+
+            def empty_dict():
+                return {rs: [] for rs in rngs}
+            self.opt_empty_tuple = lambda: {rs: ((), ()) for rs in rngs}
+            self.Xi.update(empty_dict())
+            self.yi.update(empty_dict())
+            self.track_points()
+            # this is needed to keep track of results discovered within the same batch
+            # by each optimizer, use tuples! as the SyncManager doesn't handle nested dicts
+            Xi, yi = self.Xi, self.yi
+            results = {tuple(X): [yi[r][n], counter] for r in Xi for n, X in enumerate(Xi[r])}
+            results.update(self.opt_empty_tuple())
+            backend.results_shared = backend.manager.dict(results)
         else:
             # if we have more than 1 optimizer but are using single opt,
             # pick one discard the rest...
             if n_opts > 0:
                 self.opt = opts[-1]
             else:
-                self.opt = self.get_optimizer(
-                    self.dimensions, self.n_jobs, self.opt_n_initial_points
-                )
+                self.opt = self.get_optimizer()
             self.opt.void_loss = VOID_LOSS
             self.opt.void = False
+            self.opt.rs = self.random_state
+            # in single mode restore the points directly to the optimizer
+            # but delete first in case we have filtered the starting list of points
+            self.opt = Hyperopt.opt_clear(self.opt)
+            rs = self.random_state
+            self.Xi[rs] = []
+            self.track_points()
+            if len(self.Xi[rs]) > 0:
+                self.opt.tell(self.Xi[rs], self.yi[rs], fit=False)
+            # delete points since in single mode the optimizer state sits in the main
+            # process and is not discarded
+            self.Xi, self.yi = {}, {}
         del opts[:]

     def setup_points(self):
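Since every epoch now records the random_state of the optimizer that produced it (see parallel_objective and opt_results above), the per-optimizer observations can be regrouped from the trials file instead of being pickled inside each optimizer. A reduced sketch of that reconstruction, using made-up trial dicts:

    # sketch: regroup saved epochs per optimizer random state (rs)
    trials = [
        {"random_state": 1, "params_dict": {"a": 1, "b": 2}, "loss": 0.5},
        {"random_state": 2, "params_dict": {"a": 3, "b": 4}, "loss": 0.7},
    ]
    Xi = {1: [], 2: []}
    yi = {1: [], 2: []}
    for v in trials:
        rs = v["random_state"]
        Xi[rs].append(list(v["params_dict"].values()))
        yi[rs].append(v["loss"])
    # the shared board wants flat tuple keys: SyncManager dicts don't
    # propagate mutations of nested containers
    counter = 2  # n_jobs: how many workers may consume each point
    results = {tuple(X): [yi[r][n], counter] for r in Xi for n, X in enumerate(Xi[r])}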
@@ -1028,6 +1132,20 @@ class Hyperopt:
         # initialize average best occurrence
         self.avg_best_occurrence = self.min_epochs // self.n_jobs

+    def return_results(self):
+        """
+        results are passed by queue in multi mode, or stored by ask_and_tell in single mode
+        """
+        batch_results = []
+        if self.multi:
+            while not backend.results_batch.empty():
+                worker_results = backend.results_batch.get()
+                batch_results.extend(worker_results)
+        else:
+            batch_results.extend(self.batch_results)
+            del self.batch_results[:]
+        return batch_results
+
     def main_loop(self, jobs_scheduler):
         """ main parallel loop """
         try:
@@ -1036,7 +1154,7 @@ class Hyperopt:
             jobs = parallel._effective_n_jobs()
             logger.info(f'Effective number of parallel workers used: {jobs}')
             # update epochs count
-            n_points = self.n_points
+            opt_points = self.opt_points
             prev_batch = -1
             epochs_so_far = len(self.trials)
             epochs_limit = self.epochs_limit
@@ -1044,7 +1162,7 @@ class Hyperopt:
             columns -= 1
             while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
                 prev_batch = epochs_so_far
-                occurrence = int(self.avg_best_occurrence * (1 + self.effort))
+                occurrence = int(self.avg_best_occurrence * max(1, self.effort))
                 # pad the batch length to the number of jobs to avoid desaturation
                 batch_len = (occurrence + jobs -
                              occurrence % jobs)
@@ -1052,22 +1170,23 @@ class Hyperopt:
                 # n_points (epochs) in 1 dispatch but this reduces the batch len too much
                 # if self.multi: batch_len = batch_len // self.n_points
                 # don't go over the limit
-                if epochs_so_far + batch_len * n_points >= epochs_limit():
-                    q, r = divmod(epochs_limit() - epochs_so_far, n_points)
+                if epochs_so_far + batch_len * opt_points >= epochs_limit():
+                    q, r = divmod(epochs_limit() - epochs_so_far, opt_points)
                     batch_len = q + r
                 print(
-                    f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
+                    f"{epochs_so_far+1}-{epochs_so_far+batch_len}"
                     f"/{epochs_limit()}: ",
                     end='')
-                f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
+                jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
+                batch_results = self.return_results()
                 print(end='\r')
-                saved = self.log_results(f_val, epochs_so_far, epochs_limit())
+                saved = self.log_results(batch_results, epochs_so_far, epochs_limit())
                 print('\r', ' ' * columns, end='\r')
                 # stop if no epochs have been evaluated
-                if len(f_val) < batch_len:
+                if len(batch_results) < batch_len:
                     logger.warning("Some evaluated epochs were void, "
                                    "check the loss function and the search space.")
-                if (not saved and len(f_val) > 1) or batch_len < 1 or \
+                if (not saved and len(batch_results) > 1) or batch_len < 1 or \
                         (not saved and self.search_space_size < batch_len + epochs_limit()):
                     break
                 # log_results add
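The batch arithmetic above is worth unpacking: the batch is first padded up to a multiple of the worker count so no job idles, then clipped with divmod so that batch_len * opt_points cannot overshoot the epoch limit. For example:

    jobs, occurrence = 8, 21
    batch_len = occurrence + jobs - occurrence % jobs  # 21 + 8 - 5 = 24, a multiple of 8

    epochs_limit, epochs_so_far, opt_points = 100, 90, 3
    if epochs_so_far + batch_len * opt_points >= epochs_limit:
        q, r = divmod(epochs_limit - epochs_so_far, opt_points)  # (3, 1)
        batch_len = q + r  # 4 dispatches remain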

View File

@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Dict, List, Tuple
 from queue import Queue
 from multiprocessing.managers import SyncManager
@@ -6,8 +6,13 @@ hyperopt: Any = None
 manager: SyncManager
 # stores the optimizers in multi opt mode
 optimizers: Queue
-# stores a list of the results to share between optimizers
-# in the form of dict[tuple(Xi)] = yi
-results_board: Queue
-# store the results in single opt mode
-results: Queue
+# stores the results to share between optimizers
+# in the form of key = Tuple[Xi], value = Tuple[float, int]
+# where float is the loss and int is a decreasing counter of optimizers
+# that have registered the result
+results_shared: Dict[Tuple, Tuple]
+# in single mode the results_list is used to pass the results to the optimizer
+# to fit new models
+results_list: List
+# results_batch stores keeps results per batch that are eventually logged and stored
+results_batch: Queue