diff --git a/freqtrade/optimize/hyperopt.py b/freqtrade/optimize/hyperopt.py
index 547ba502d..aa0a01638 100644
--- a/freqtrade/optimize/hyperopt.py
+++ b/freqtrade/optimize/hyperopt.py
@@ -4,7 +4,6 @@
 This module contains the hyperopt logic
 """
 import os
-import functools
 import locale
 import logging
 import random
@@ -92,7 +91,9 @@ class Hyperopt:
         self.n_jobs = self.config.get('hyperopt_jobs', -1)
         if self.n_jobs < 0:
             self.n_jobs = cpu_count() // 2 or 1
-        self.effort = self.config['effort'] if 'effort' in self.config else 0
+        self.effort = max(0.01,
+                          self.config['effort'] if 'effort' in self.config else 1
+                          )
         self.total_epochs = self.config['epochs'] if 'epochs' in self.config else 0
         self.max_epoch = 0
         self.max_epoch_reached = False
@@ -155,6 +156,8 @@ class Hyperopt:
         # optimizers
         self.opts: List[Optimizer] = []
         self.opt: Optimizer = None
+        self.Xi: Dict = {}
+        self.yi: Dict = {}

         backend.manager = Manager()
         self.mode = self.config.get('mode', 'single')
@@ -170,11 +173,14 @@ class Hyperopt:
                 self.opt_base_estimator = self.estimators
             self.opt_acq_optimizer = 'sampling'
             backend.optimizers = backend.manager.Queue()
-            backend.results_board = backend.manager.Queue(maxsize=1)
-            backend.results_board.put({})
+            backend.results_batch = backend.manager.Queue()
         else:
             self.multi = False
-            backend.results = backend.manager.Queue()
+            backend.results_list = backend.manager.list([])
+            # this is where opt_ask_and_tell stores the results after points are
+            # used for fit and predict, to avoid additional pickling
+            self.batch_results = []
+            # self.opt_base_estimator = lambda: BayesianRidge(n_iter=100, normalize=True)
             self.opt_acq_optimizer = 'sampling'
             self.opt_base_estimator = lambda: 'ET'
             # The GaussianProcessRegressor is heavy, which makes it not a good default
@@ -262,14 +268,20 @@ class Hyperopt:
         n_opts = 0
         if self.multi:
             while not backend.optimizers.empty():
-                opts.append(backend.optimizers.get())
+                opt = backend.optimizers.get()
+                opt = Hyperopt.opt_clear(opt)
+                opts.append(opt)
             n_opts = len(opts)
             for opt in opts:
                 backend.optimizers.put(opt)
         else:
+            # when we clear the object for saving we have to make a copy to preserve state
+            opt = Hyperopt.opt_rand(self.opt, seed=False)
             if self.opt:
                 n_opts = 1
-                opts = [self.opt]
+                opts = [Hyperopt.opt_clear(self.opt)]
+                # (the optimizer copy function also fits a new model with the known points)
+                self.opt = opt

         logger.debug(f"Saving {n_opts} {plural(n_opts, 'optimizer')}.")
         dump(opts, self.opts_file)
@@ -610,42 +622,41 @@ class Hyperopt:
     def estimators(self):
         return ESTIMATORS[random.randrange(0, ESTIMATORS_N)]

-    def get_optimizer(self, dimensions: List[Dimension], n_jobs: int,
-                      n_initial_points: int) -> Optimizer:
+    def get_optimizer(self, random_state: int = None) -> Optimizer:
         " Construct an optimizer object "
         # https://github.com/scikit-learn/scikit-learn/issues/14265
         # lbfgs uses joblib threading backend so n_jobs has to be reduced
         # to avoid oversubscription
         if self.opt_acq_optimizer == 'lbfgs':
             n_jobs = 1
+        else:
+            n_jobs = self.n_jobs
         return Optimizer(
-            dimensions,
+            self.dimensions,
             base_estimator=self.opt_base_estimator(),
             acq_optimizer=self.opt_acq_optimizer,
-            n_initial_points=n_initial_points,
+            n_initial_points=self.opt_n_initial_points,
             acq_optimizer_kwargs={'n_jobs': n_jobs},
             model_queue_size=self.n_models,
-            random_state=self.random_state,
+            random_state=random_state or self.random_state,
         )

     def run_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
-                              jobs: int) -> List:
+                              jobs: int):
         """ launch parallel in single opt mode,
         return the evaluated epochs """
-        result = parallel(
-            delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, backend.results, i)
+        parallel(
+            delayed(wrap_non_picklable_objects(self.parallel_objective))
+            (asked, backend.results_list, i)
             for asked, i in zip(self.opt_ask_and_tell(jobs, tries),
                                 range(first_try, first_try + tries)))
-        return result

     def run_multi_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
-                                    jobs: int) -> List:
+                                    jobs: int):
         """ launch parallel in multi opt mode,
         return the evaluated epochs"""
-        results = parallel(
+        parallel(
             delayed(wrap_non_picklable_objects(self.parallel_opt_objective))(
-                i, backend.optimizers, jobs, backend.results_board)
+                i, backend.optimizers, jobs, backend.results_shared, backend.results_batch)
             for i in range(first_try, first_try + tries))
-        # each worker will return a list containing n_points, so compact into a single list
-        return functools.reduce(lambda x, y: [*x, *y], results, [])

     def opt_ask_and_tell(self, jobs: int, tries: int):
         """
@@ -660,34 +671,38 @@ class Hyperopt:
         evald: Set[Tuple] = set()
         opt = self.opt

-        def point():
-            if self.ask_points:
+        # needed because when ask_points is None the optimizer returns a single point, not a list
+        if self.ask_points:
+            def point():
                 if to_ask:
                     return tuple(to_ask.popleft())
                 else:
                     to_ask.extend(opt.ask(n_points=self.ask_points, strategy=self.lie_strat()))
                     return tuple(to_ask.popleft())
-            else:
+        else:
+            def point():
                 return tuple(opt.ask(strategy=self.lie_strat()))

         for r in range(tries):
             fit = (len(to_ask) < 1)
-            while not backend.results.empty():
-                vals.append(backend.results.get())
+            if len(backend.results_list) > 0:
+                vals.extend(backend.results_list)
+                del backend.results_list[:]
             if vals:
                 # filter losses
-                void_filtered = self.filter_void_losses(vals, opt)
+                void_filtered = Hyperopt.filter_void_losses(vals, opt)
                 if void_filtered:  # again if all are filtered
-                    opt.tell([list(v['params_dict'].values()) for v in void_filtered],
+                    opt.tell([Hyperopt.params_Xi(v) for v in void_filtered],
                              [v['loss'] for v in void_filtered],
                              fit=fit)  # only fit when out of points
-                del vals[:], void_filtered[:]
+                    self.batch_results.extend(void_filtered)
+                del vals[:], void_filtered[:]

             a = point()
+            # this usually happens at the start when trying to fit before the initial points
             if a in evald:
                 logger.debug("this point was evaluated before...")
-                if not fit:
-                    opt.update_next()
+                opt.update_next()
                 a = point()
                 if a in evald:
                     break
@@ -695,90 +710,111 @@ class Hyperopt:
             yield a

     @staticmethod
-    def opt_get_past_points(asked: dict, results_board: Queue) -> Tuple[dict, int]:
+    def opt_get_past_points(is_shared: bool, asked: dict, results_shared: Dict) -> Tuple[dict, int]:
         """ fetch shared results between optimizers """
-        results = results_board.get()
-        results_board.put(results)
+        # a result is (y, counter)
         for a in asked:
-            if a in results:
-                asked[a] = results[a]
-        return asked, len(results)
+            if a in results_shared:
+                y, counter = results_shared[a]
+                asked[a] = y
+                counter -= 1
+                if counter < 1:
+                    del results_shared[a]
+        return asked, len(results_shared)

     @staticmethod
-    def opt_state(shared: bool, optimizers: Queue) -> Tuple[Optimizer, int]:
+    def opt_rand(opt: Optimizer, rand: int = None, seed: bool = True) -> Optimizer:
+        """ return a new instance of the optimizer with modified rng """
+        if seed:
+            if not rand:
+                rand = opt.rng.randint(0, VOID_LOSS)
+            opt.rng.seed(rand)
+        opt, opt.void_loss, opt.void, opt.rs = (
+            opt.copy(random_state=opt.rng), opt.void_loss, opt.void, opt.rs
+        )
+        return opt
+
+    @staticmethod
+    def opt_state(shared: bool, optimizers: Queue) -> Optimizer:
         """ fetch an optimizer in multi opt mode """
         # get an optimizer instance
         opt = optimizers.get()
-        # this is the counter used by the optimizer internally to track the initial
-        # points evaluated so far..
-        initial_points = opt._n_initial_points
         if shared:
             # get a random number before putting it back to avoid
             # replication with other workers and keep reproducibility
             rand = opt.rng.randint(0, VOID_LOSS)
             optimizers.put(opt)
             # switch the seed to get a different point
-            opt.rng.seed(rand)
-            opt, opt.void_loss, opt.void = opt.copy(random_state=opt.rng), opt.void_loss, opt.void
-        # a model is only fit after initial points
-        elif initial_points < 1:
-            opt.tell(opt.Xi, opt.yi)
-        # we have to get a new point anyway
-        else:
-            opt.update_next()
-        return opt, initial_points
+            opt = Hyperopt.opt_rand(opt, rand)
+        return opt

     @staticmethod
-    def opt_results(opt: Optimizer, void_filtered: list,
-                    Xi_d: list, yi_d: list, initial_points: int, is_shared: bool,
-                    results_board: Queue, optimizers: Queue) -> list:
+    def opt_clear(opt: Optimizer):
+        """ clear state from an optimizer object """
+        del opt.models[:], opt.Xi[:], opt.yi[:]
+        return opt
+
+    @staticmethod
+    def opt_results(opt: Optimizer, void_filtered: list, jobs: int, is_shared: bool,
+                    results_shared: Dict, results_batch: Queue, optimizers: Queue):
         """
         update the board used to skip already computed points,
         set the initial point status
         """
         # add points of the current dispatch if any
         if opt.void_loss != VOID_LOSS or len(void_filtered) > 0:
-            Xi = [*Xi_d, *[list(v['params_dict'].values()) for v in void_filtered]]
-            yi = [*yi_d, *[v['loss'] for v in void_filtered]]
-            if is_shared:
-                # refresh the optimizer that stores all the points
-                opt = optimizers.get()
-            opt.tell(Xi, yi, fit=False)
-            opt.void = False
+            void = False
         else:
-            opt.void = True
+            void = True
         # send back the updated optimizer only in non shared mode
-        # because in shared mode if all results are void we don't
-        # fetch it at all
-        if not opt.void or not is_shared:
-            # don't pickle models
-            del opt.models[:]
+        if not is_shared:
+            opt = Hyperopt.opt_clear(opt)
+            # (unlike in shared mode, this is not a replica)
             optimizers.put(opt)
         # NOTE: some results at the beginning won't be published
-        # because they are removed by the filter_void_losses
-        if not opt.void:
-            results = results_board.get()
-            for v in void_filtered:
-                a = tuple(v['params_dict'].values())
-                if a not in results:
-                    results[a] = v['loss']
-            results_board.put(results)
-        # set initial point flag
+        # because they are removed by filter_void_losses
+        rs = opt.rs
+        if not void:
+            # the tuple keys are used to avoid computation of done points by any optimizer
+            results_shared.update({tuple(Hyperopt.params_Xi(v)): (v["loss"], jobs - 1)
+                                   for v in void_filtered})
+            # in multi opt mode (non shared) also track results for each optimizer (using rs as ID)
+            # these keys should be cleared after each batch
+            Xi, yi = results_shared[rs]
+            Xi = Xi + tuple((Hyperopt.params_Xi(v)) for v in void_filtered)
+            yi = yi + tuple(v["loss"] for v in void_filtered)
+            results_shared[rs] = (Xi, yi)
+        # this is the counter used by the optimizer internally to track the initial
+        # points evaluated so far..
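+        # (skopt decrements this counter on each tell, so a positive remainder below
+        # flags the point as belonging to the random/initial phase)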
+        initial_points = opt._n_initial_points
+        # set initial point flag and optimizer random state
         for n, v in enumerate(void_filtered):
             v['is_initial_point'] = initial_points - n > 0
-        return void_filtered
+            v['random_state'] = rs
+        results_batch.put(void_filtered)

-    def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
+    def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int,
+                               results_shared: Dict, results_batch: Queue):
         """
         objective run in multi opt mode, optimizers share the results as soon as they are completed
         """
         self.log_results_immediate(n)
         is_shared = self.shared
-        opt, initial_points = self.opt_state(is_shared, optimizers)
+        opt = self.opt_state(is_shared, optimizers)
         sss = self.search_space_size
         asked: Dict[Tuple, Any] = {tuple([]): None}
         asked_d: Dict[Tuple, Any] = {}
+        # fit a model with the known points (the optimizer has no points here since
+        # it was just fetched from the queue)
+        rs = opt.rs
+        Xi, yi = self.Xi[rs], self.yi[rs]
+        # add the points discovered within this batch
+        bXi, byi = results_shared[rs]
+        Xi.extend(list(bXi))
+        yi.extend(list(byi))
+        if Xi:
+            opt.tell(Xi, yi)
         told = 0  # told
         Xi_d = []  # done
         yi_d = []
@@ -794,7 +830,7 @@ class Hyperopt:
             else:
                 asked = {tuple(a): None for a in asked}
             # check if some points have been evaluated by other optimizers
-            p_asked, _ = self.opt_get_past_points(asked, results_board)
+            p_asked, _ = Hyperopt.opt_get_past_points(is_shared, asked, results_shared)
             for a in p_asked:
                 if p_asked[a] is not None:
                     if a not in Xi_d:
@@ -802,51 +838,55 @@ class Hyperopt:
                         Xi_d.append(a)
                         yi_d.append(p_asked[a])
                 else:
                     Xi_t.append(a)
+            # no points to do?
             if len(Xi_t) < self.n_points:
                 len_Xi_d = len(Xi_d)
-                if len_Xi_d > told:  # tell new points
+                # did other workers backtest some points?
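+                # ('told' counts how many of the done points were already passed to opt.tell)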
+                if len_Xi_d > told:
+                    # if yes fit a new model with the new points
                     opt.tell(Xi_d[told:], yi_d[told:])
                     told = len_Xi_d
-                else:
-                    opt.update_next()
+                else:  # or get new points from a different random state
+                    opt = Hyperopt.opt_rand(opt)
             else:
                 break
         # return early if there is nothing to backtest
         if len(Xi_t) < 1:
-            if not is_shared:
-                opt.void = -1
-                del opt.models[:]
-                optimizers.put(opt)
+            if is_shared:
+                opt = optimizers.get()
+            opt.void = -1
+            opt = Hyperopt.opt_clear(opt)
+            optimizers.put(opt)
             return []
         # run the backtest for each point to do (Xi_t)
-        f_val = [self.backtest_params(a) for a in Xi_t]
+        results = [self.backtest_params(a) for a in Xi_t]
         # filter losses
-        void_filtered = self.filter_void_losses(f_val, opt)
+        void_filtered = Hyperopt.filter_void_losses(results, opt)

-        return self.opt_results(opt, void_filtered, Xi_d, yi_d, initial_points, is_shared,
-                                results_board, optimizers)
+        Hyperopt.opt_results(opt, void_filtered, jobs, is_shared,
+                             results_shared, results_batch, optimizers)

-    def parallel_objective(self, asked, results: Queue = None, n=0):
+    def parallel_objective(self, asked, results_list: List = [], n=0):
         """ objective run in single opt mode, run the backtest, store the results into a queue """
         self.log_results_immediate(n)
         v = self.backtest_params(asked)
-        if results:
-            results.put(v)
+        v['is_initial_point'] = n < self.opt_n_initial_points
-        return v
+        v['random_state'] = self.random_state
+        results_list.append(v)

     def log_results_immediate(self, n) -> None:
         """ Signals that a new job has been scheduled"""
         print('.', end='')
         sys.stdout.flush()

-    def log_results(self, f_val, frame_start, total_epochs: int) -> int:
+    def log_results(self, batch_results, frame_start, total_epochs: int) -> int:
         """
         Log results if it is better than any previous evaluation
         """
         current = frame_start + 1
         i = 0
-        for i, v in enumerate(f_val, 1):
+        for i, v in enumerate(batch_results, 1):
             is_best = self.is_best_loss(v, self.current_best_loss)
             current = frame_start + i
             v['is_best'] = is_best
@@ -857,8 +897,13 @@ class Hyperopt:
             self.update_max_epoch(v, current)
             self.print_results(v)
             self.trials.append(v)
-        # Save results and optimizersafter every batch
+        # Save results and optimizers after every batch
         self.save_trials()
+        # track new points if in multi mode
+        if self.multi:
+            self.track_points(trials=self.trials[frame_start:])
+            # clear points used by optimizers intra batch
+            backend.results_shared.update(self.opt_empty_tuple())
         # give up if no best since max epochs
         if current + 1 > self.epochs_limit():
             self.max_epoch_reached = True
@@ -953,8 +998,8 @@ class Hyperopt:
         # never waste
         n_initial_points = min(log_sss, search_space_size // 3)
         # it shall run for this much, I say
-        min_epochs = int(max(n_initial_points, opt_points) * (1 + effort) + n_initial_points)
-        return n_initial_points or 1, min_epochs, search_space_size
+        min_epochs = int(max(n_initial_points, opt_points) + 2 * n_initial_points)
+        return int(n_initial_points * effort) or 1, int(min_epochs * effort), search_space_size

     def update_max_epoch(self, val: Dict, current: int):
         """ calculate max epochs: store the number of non best epochs
@@ -966,49 +1011,108 @@ class Hyperopt:
             self.current_best_epoch = current
             self.max_epoch = int(
                 (self.current_best_epoch + self.avg_best_occurrence + self.min_epochs) *
-                (1 + self.effort))
+                max(1, self.effort))
         if self.max_epoch > self.search_space_size:
             self.max_epoch = self.search_space_size
         logger.debug(f'\nMax epoch set to: {self.epochs_limit()}')

+    @staticmethod
+    def params_Xi(v: dict):
+        return list(v["params_dict"].values())
+
+    def track_points(self, trials: List = None):
+        """
+        keep tracking of the evaluated points per optimizer random state
+        """
+        # if no trials are given, use saved trials
+        if not trials:
+            if len(self.trials) > 0:
+                if self.config.get('hyperopt_continue_filtered', False):
+                    trials = filter_trials(self.trials, self.config)
+                else:
+                    trials = self.trials
+            else:
+                return
+        for v in trials:
+            rs = v["random_state"]
+            try:
+                self.Xi[rs].append(Hyperopt.params_Xi(v))
+                self.yi[rs].append(v["loss"])
+            except KeyError:  # Hyperopt was started with a different random_state or number of jobs
+                pass
+
     def setup_optimizers(self):
         """ Setup the optimizers objects, try to load from disk, or create new ones """
         # try to load previous optimizers
         opts = self.load_previous_optimizers(self.opts_file)
         n_opts = len(opts)
-        max_opts = self.n_jobs
         if self.multi:
+            max_opts = self.n_jobs
+            rngs = []
             # when sharing results there is only one optimizer that gets copied
             if self.shared:
                 max_opts = 1
             # put the restored optimizers in the queue
-            if n_opts > 0:
+            # only if they match the current number of jobs
+            if n_opts == max_opts:
                 for n in range(n_opts):
-                    backend.optimizers.put(opts[n])
+                    rngs.append(opts[n].rs)
+                    # make sure to not store points and models in the optimizer
+                    backend.optimizers.put(Hyperopt.opt_clear(opts[n]))
             # generate as many optimizers as are still needed to fill the job count
             remaining = max_opts - backend.optimizers.qsize()
             if remaining > 0:
-                opt = self.get_optimizer(self.dimensions, self.n_jobs, self.opt_n_initial_points)
+                opt = self.get_optimizer()
+                rngs = []
                 for _ in range(remaining):  # generate optimizers
                     # random state is preserved
-                    opt_copy = opt.copy(random_state=opt.rng.randint(0,
-                                                                     iinfo(int32).max))
+                    rs = opt.rng.randint(0, iinfo(int32).max)
+                    opt_copy = opt.copy(random_state=rs)
                     opt_copy.void_loss = VOID_LOSS
                     opt_copy.void = False
+                    opt_copy.rs = rs
+                    rngs.append(rs)
                     backend.optimizers.put(opt_copy)
                 del opt, opt_copy
+            # reconstruct observed points from epochs
+            # in shared mode each worker will remove the results once all the workers
+            # have read it (counter < 1)
+            counter = self.n_jobs
+
+            def empty_dict():
+                return {rs: [] for rs in rngs}
+            self.opt_empty_tuple = lambda: {rs: ((), ()) for rs in rngs}
+            self.Xi.update(empty_dict())
+            self.yi.update(empty_dict())
+            self.track_points()
+            # this is needed to keep track of results discovered within the same batch
+            # by each optimizer, use tuples! as the SyncManager doesn't handle nested dicts
+            Xi, yi = self.Xi, self.yi
+            results = {tuple(X): [yi[r][n], counter] for r in Xi for n, X in enumerate(Xi[r])}
+            results.update(self.opt_empty_tuple())
+            backend.results_shared = backend.manager.dict(results)
         else:
             # if we have more than 1 optimizer but are using single opt,
             # pick one discard the rest...
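+            # (the single surviving optimizer keeps its state in the main process)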
             if n_opts > 0:
                 self.opt = opts[-1]
             else:
-                self.opt = self.get_optimizer(
-                    self.dimensions, self.n_jobs, self.opt_n_initial_points
-                )
+                self.opt = self.get_optimizer()
                 self.opt.void_loss = VOID_LOSS
                 self.opt.void = False
+                self.opt.rs = self.random_state
+            # in single mode restore the points directly to the optimizer
+            # but delete first in case we have filtered the starting list of points
+            self.opt = Hyperopt.opt_clear(self.opt)
+            rs = self.random_state
+            self.Xi[rs] = []
+            self.track_points()
+            if len(self.Xi[rs]) > 0:
+                self.opt.tell(self.Xi[rs], self.yi[rs], fit=False)
+            # delete points since in single mode the optimizer state sits in the main
+            # process and is not discarded
+            self.Xi, self.yi = {}, {}
         del opts[:]

     def setup_points(self):
@@ -1028,6 +1132,20 @@ class Hyperopt:
         # initialize average best occurrence
         self.avg_best_occurrence = self.min_epochs // self.n_jobs

+    def return_results(self):
+        """
+        results are passed by queue in multi mode, or stored by ask_and_tell in single mode
+        """
+        batch_results = []
+        if self.multi:
+            while not backend.results_batch.empty():
+                worker_results = backend.results_batch.get()
+                batch_results.extend(worker_results)
+        else:
+            batch_results.extend(self.batch_results)
+            del self.batch_results[:]
+        return batch_results
+
     def main_loop(self, jobs_scheduler):
         """ main parallel loop """
         try:
@@ -1036,7 +1154,7 @@ class Hyperopt:
             jobs = parallel._effective_n_jobs()
             logger.info(f'Effective number of parallel workers used: {jobs}')
             # update epochs count
-            n_points = self.n_points
+            opt_points = self.opt_points
             prev_batch = -1
             epochs_so_far = len(self.trials)
             epochs_limit = self.epochs_limit
@@ -1044,7 +1162,7 @@ class Hyperopt:
                 columns -= 1
             while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
                 prev_batch = epochs_so_far
-                occurrence = int(self.avg_best_occurrence * (1 + self.effort))
+                occurrence = int(self.avg_best_occurrence * max(1, self.effort))
                 # pad the batch length to the number of jobs to avoid desaturation
                 batch_len = (occurrence + jobs -
                              occurrence % jobs)
@@ -1052,22 +1170,23 @@ class Hyperopt:
                 # n_points (epochs) in 1 dispatch but this reduces the batch len too much
                 # if self.multi: batch_len = batch_len // self.n_points
                 # don't go over the limit
-                if epochs_so_far + batch_len * n_points >= epochs_limit():
-                    q, r = divmod(epochs_limit() - epochs_so_far, n_points)
+                if epochs_so_far + batch_len * opt_points >= epochs_limit():
+                    q, r = divmod(epochs_limit() - epochs_so_far, opt_points)
                     batch_len = q + r
                 print(
-                    f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
+                    f"{epochs_so_far+1}-{epochs_so_far+batch_len}"
                     f"/{epochs_limit()}: ", end='')
-                f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
+                jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
+                batch_results = self.return_results()
                 print(end='\r')
-                saved = self.log_results(f_val, epochs_so_far, epochs_limit())
+                saved = self.log_results(batch_results, epochs_so_far, epochs_limit())
                 print('\r', ' ' * columns, end='\r')
                 # stop if no epochs have been evaluated
-                if len(f_val) < batch_len:
+                if len(batch_results) < batch_len:
                     logger.warning("Some evaluated epochs were void, "
                                    "check the loss function and the search space.")
-                if (not saved and len(f_val) > 1) or batch_len < 1 or \
+                if (not saved and len(batch_results) > 1) or batch_len < 1 or \
                         (not saved and self.search_space_size < batch_len + epochs_limit()):
                     break
                 # log_results add
diff --git a/freqtrade/optimize/hyperopt_backend.py b/freqtrade/optimize/hyperopt_backend.py
index 7357fb4ee..1416f358d 100644
--- a/freqtrade/optimize/hyperopt_backend.py
+++ b/freqtrade/optimize/hyperopt_backend.py
@@ -1,4 +1,4 @@
-from typing import Any
+from typing import Any, Dict, List, Tuple
 from queue import Queue
 from multiprocessing.managers import SyncManager

@@ -6,8 +6,13 @@ hyperopt: Any = None
 manager: SyncManager
 # stores the optimizers in multi opt mode
 optimizers: Queue
-# stores a list of the results to share between optimizers
-# in the form of dict[tuple(Xi)] = yi
-results_board: Queue
-# store the results in single opt mode
-results: Queue
+# stores the results to share between optimizers
+# in the form of key = Tuple[Xi], value = Tuple[float, int]
+# where float is the loss and int is a decreasing counter of optimizers
+# that have registered the result
+results_shared: Dict[Tuple, Tuple]
+# in single mode the results_list is used to pass the results to the optimizer
+# to fit new models
+results_list: List
+# results_batch keeps results per batch that are eventually logged and stored
+results_batch: Queue