fixes for single mode

This commit is contained in:
orehunt 2020-03-18 08:06:50 +01:00
parent 8d03887b02
commit cf76be6845

View File

@ -2,7 +2,6 @@
""" """
This module contains the hyperopt logic This module contains the hyperopt logic
""" """
import functools import functools
import locale import locale
import logging import logging
@ -52,8 +51,12 @@ with warnings.catch_warnings():
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# supported strategies when asking for multiple points to the optimizer # supported strategies when asking for multiple points to the optimizer
NEXT_POINT_METHODS = ["cl_min", "cl_mean", "cl_max"] LIE_STRATS = ["cl_min", "cl_mean", "cl_max"]
NEXT_POINT_METHODS_LENGTH = 3 LIE_STRATS_N = len(LIE_STRATS)
# supported estimators
ESTIMATORS = ["GBRT", "ET", "RF"]
ESTIMATORS_N = len(ESTIMATORS)
VOID_LOSS = iinfo(int32).max # just a big enough number to be bad result in loss optimization VOID_LOSS = iinfo(int32).max # just a big enough number to be bad result in loss optimization
@ -154,23 +157,25 @@ class Hyperopt:
backend.manager = Manager() backend.manager = Manager()
self.mode = self.config.get('mode', 'single') self.mode = self.config.get('mode', 'single')
self.shared = False self.shared = False
# models are only needed for posterior eval # in multi opt one model is enough
self.n_models = 1 self.n_models = 1
if self.mode in ('multi', 'shared'): if self.mode in ('multi', 'shared'):
self.multi = True self.multi = True
if self.mode == 'shared': if self.mode == 'shared':
self.shared = True self.shared = True
self.opt_base_estimator = lambda: 'GBRT'
else:
self.opt_base_estimator = self.estimators
self.opt_acq_optimizer = 'sampling'
backend.optimizers = backend.manager.Queue() backend.optimizers = backend.manager.Queue()
backend.results_board = backend.manager.Queue(maxsize=1) backend.results_board = backend.manager.Queue(maxsize=1)
backend.results_board.put({}) backend.results_board.put({})
default_n_points = 2 default_n_points = 2
self.opt_base_estimator = 'GBRT'
self.opt_acq_optimizer = 'sampling'
else: else:
self.multi = False self.multi = False
backend.results = backend.manager.Queue() backend.results = backend.manager.Queue()
self.opt_base_estimator = 'ET'
self.opt_acq_optimizer = 'sampling' self.opt_acq_optimizer = 'sampling'
self.opt_base_estimator = lambda: 'ET'
default_n_points = 1 default_n_points = 1
# The GaussianProcessRegressor is heavy, which makes it not a good default # The GaussianProcessRegressor is heavy, which makes it not a good default
# however longer backtests might make it a better tradeoff # however longer backtests might make it a better tradeoff
@ -198,7 +203,7 @@ class Hyperopt:
if lie_strat == 'default': if lie_strat == 'default':
self.lie_strat = lambda: 'cl_min' self.lie_strat = lambda: 'cl_min'
elif lie_strat == 'random': elif lie_strat == 'random':
self.lie_strat = self.get_next_point_strategy self.lie_strat = self.lie_strategy
else: else:
self.lie_strat = lambda: lie_strat self.lie_strat = lambda: lie_strat
@ -232,17 +237,17 @@ class Hyperopt:
def save_trials(self, final: bool = False) -> None: def save_trials(self, final: bool = False) -> None:
""" """
Save hyperopt trials to file Save hyperopt trials
""" """
num_trials = len(self.trials) num_trials = len(self.trials)
print()
if num_trials > self.num_trials_saved: if num_trials > self.num_trials_saved:
logger.debug(f"Saving {num_trials} {plural(num_trials, 'epoch')}.") logger.debug(f"\nSaving {num_trials} {plural(num_trials, 'epoch')}.")
# save_trials(self.trials, trials_path, self.num_trials_saved)
dump(self.trials, self.trials_file) dump(self.trials, self.trials_file)
self.num_trials_saved = num_trials self.num_trials_saved = num_trials
self.save_opts() self.save_opts()
if final: if final:
logger.debug(f"{num_trials} {plural(num_trials, 'epoch')} " logger.info(f"\n{num_trials} {plural(num_trials, 'epoch')} "
f"saved to '{self.trials_file}'.") f"saved to '{self.trials_file}'.")
def save_opts(self) -> None: def save_opts(self) -> None:
@ -597,10 +602,13 @@ class Hyperopt:
void_filtered = vals void_filtered = vals
return void_filtered return void_filtered
def get_next_point_strategy(self): def lie_strategy(self):
""" Choose a strategy randomly among the supported ones, used in multi opt mode """ Choose a strategy randomly among the supported ones, used in multi opt mode
to increase the diversification of the searches of each optimizer """ to increase the diversification of the searches of each optimizer """
return NEXT_POINT_METHODS[random.randrange(0, NEXT_POINT_METHODS_LENGTH)] return LIE_STRATS[random.randrange(0, LIE_STRATS_N)]
def estimators(self):
return ESTIMATORS[random.randrange(0, ESTIMATORS_N)]
def get_optimizer(self, dimensions: List[Dimension], n_jobs: int, def get_optimizer(self, dimensions: List[Dimension], n_jobs: int,
n_initial_points: int) -> Optimizer: n_initial_points: int) -> Optimizer:
@ -612,7 +620,7 @@ class Hyperopt:
n_jobs = 1 n_jobs = 1
return Optimizer( return Optimizer(
dimensions, dimensions,
base_estimator=self.opt_base_estimator, base_estimator=self.opt_base_estimator(),
acq_optimizer=self.opt_acq_optimizer, acq_optimizer=self.opt_acq_optimizer,
n_initial_points=n_initial_points, n_initial_points=n_initial_points,
acq_optimizer_kwargs={'n_jobs': n_jobs}, acq_optimizer_kwargs={'n_jobs': n_jobs},
@ -647,10 +655,13 @@ class Hyperopt:
points become invalid. points become invalid.
""" """
vals = [] vals = []
fit = False
to_ask: deque = deque() to_ask: deque = deque()
evald: Set[Tuple] = set() evald: Set[Tuple] = set()
opt = self.opt opt = self.opt
ask = lambda: to_ask.extend(opt.ask(n_points=self.n_points, strategy=self.lie_strat()))
for r in range(tries): for r in range(tries):
fit = (len(to_ask) < 1)
while not backend.results.empty(): while not backend.results.empty():
vals.append(backend.results.get()) vals.append(backend.results.get())
if vals: if vals:
@ -659,18 +670,20 @@ class Hyperopt:
if void_filtered: # again if all are filtered if void_filtered: # again if all are filtered
opt.tell([list(v['params_dict'].values()) for v in void_filtered], opt.tell([list(v['params_dict'].values()) for v in void_filtered],
[v['loss'] for v in void_filtered], [v['loss'] for v in void_filtered],
fit=(len(to_ask) < 1)) # only fit when out of points fit=fit) # only fit when out of points
del vals[:], void_filtered[:] del vals[:], void_filtered[:]
if not to_ask: if fit:
to_ask.extend(opt.ask(n_points=self.n_points, strategy=self.lie_strat())) ask()
a = tuple(to_ask.popleft()) a = tuple(to_ask.popleft())
while a in evald: while a in evald:
logger.info("this point was evaluated before...") logger.debug("this point was evaluated before...")
if len(to_ask) > 0: if len(to_ask) > 0:
a = tuple(to_ask.popleft()) a = tuple(to_ask.popleft())
else: else:
break opt.update_next()
ask()
a = tuple(to_ask.popleft())
evald.add(a) evald.add(a)
yield a yield a
@ -700,8 +713,9 @@ class Hyperopt:
optimizers.put(opt) optimizers.put(opt)
# switch the seed to get a different point # switch the seed to get a different point
opt.rng.seed(rand) opt.rng.seed(rand)
opt.update_next()
# always update the next point because we never fit on tell
opt.update_next()
# ask for points according to config # ask for points according to config
asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat()) asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
# check if some points have been evaluated by other optimizers # check if some points have been evaluated by other optimizers
@ -728,10 +742,11 @@ class Hyperopt:
opt.tell(Xi, yi, fit=False) opt.tell(Xi, yi, fit=False)
else: else:
void = True void = True
if not void or not is_shared:
# send back the updated optimizer only in non shared mode # send back the updated optimizer only in non shared mode
# because in shared mode if all results are void we don't # because in shared mode if all results are void we don't
# fetch it at all # fetch it at all
if not void or not is_shared:
del opt.models[:]
optimizers.put(opt) optimizers.put(opt)
# update the board used to skip already computed points # update the board used to skip already computed points
results = results_board.get() results = results_board.get()
@ -740,11 +755,14 @@ class Hyperopt:
results_board.put(results) results_board.put(results)
return void_filtered return void_filtered
def parallel_objective(self, asked, results: Queue, n=0): def parallel_objective(self, asked, results: Queue = None, n=0):
""" objective run in single opt mode, run the backtest, store the results into a queue """ """ objective run in single opt mode, run the backtest, store the results into a queue """
self.log_results_immediate(n) self.log_results_immediate(n)
v = self.backtest_params(asked) v = self.backtest_params(asked)
if results:
results.put(v) results.put(v)
# the results logged won't be filtered
# the loss score will be == VOID_LOSS
return v return v
def log_results_immediate(self, n) -> None: def log_results_immediate(self, n) -> None:
@ -756,7 +774,6 @@ class Hyperopt:
""" """
Log results if it is better than any previous evaluation Log results if it is better than any previous evaluation
""" """
print()
current = frame_start + 1 current = frame_start + 1
i = 0 i = 0
for i, v in enumerate(f_val, 1): for i, v in enumerate(f_val, 1):
@ -778,7 +795,7 @@ class Hyperopt:
self.max_epoch_reached = True self.max_epoch_reached = True
return i return i
def setup_best_epochs(self) -> bool: def setup_epochs(self) -> bool:
""" used to resume the best epochs state from previous trials """ """ used to resume the best epochs state from previous trials """
len_trials = len(self.trials) len_trials = len(self.trials)
if len_trials > 0: if len_trials > 0:
@ -879,8 +896,7 @@ class Hyperopt:
(1 + self.effort)) (1 + self.effort))
if self.max_epoch > self.search_space_size: if self.max_epoch > self.search_space_size:
self.max_epoch = self.search_space_size self.max_epoch = self.search_space_size
print() logger.debug(f'\nMax epoch set to: {self.epochs_limit()}')
logger.debug(f'Max epoch set to: {self.epochs_limit()}')
def setup_optimizers(self): def setup_optimizers(self):
""" Setup the optimizers objects, try to load from disk, or create new ones """ """ Setup the optimizers objects, try to load from disk, or create new ones """
@ -962,7 +978,7 @@ class Hyperopt:
self.backtesting.exchange = None # type: ignore self.backtesting.exchange = None # type: ignore
self.trials = self.load_previous_results(self.trials_file) self.trials = self.load_previous_results(self.trials_file)
self.setup_best_epochs() self.setup_epochs()
logger.info(f"Found {cpu_count()} CPU cores. Let's make them scream!") logger.info(f"Found {cpu_count()} CPU cores. Let's make them scream!")
logger.info(f'Number of parallel jobs set as: {self.n_jobs}') logger.info(f'Number of parallel jobs set as: {self.n_jobs}')
@ -1003,6 +1019,7 @@ class Hyperopt:
f"/{epochs_limit()}: ", f"/{epochs_limit()}: ",
end='') end='')
f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, self.n_jobs) f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, self.n_jobs)
print(' ' * batch_len * n_points, end='\r')
saved = self.log_results(f_val, epochs_so_far, epochs_limit()) saved = self.log_results(f_val, epochs_so_far, epochs_limit())
# stop if no epochs have been evaluated # stop if no epochs have been evaluated
if len(f_val) < batch_len: if len(f_val) < batch_len: