From 0ccbaa8c966aa8dcf2f7784536d61a74213e7af4 Mon Sep 17 00:00:00 2001
From: orehunt
Date: Fri, 20 Mar 2020 15:42:25 +0100
Subject: [PATCH] - refactoring
 - fixes to prevent stalling

---
 freqtrade/optimize/hyperopt.py | 236 +++++++++++++++++++--------------
 1 file changed, 137 insertions(+), 99 deletions(-)

diff --git a/freqtrade/optimize/hyperopt.py b/freqtrade/optimize/hyperopt.py
index f1ddaed5b..876978a1b 100644
--- a/freqtrade/optimize/hyperopt.py
+++ b/freqtrade/optimize/hyperopt.py
@@ -247,7 +247,7 @@ class Hyperopt:
             self.num_trials_saved = num_trials
             self.save_opts()
             if final:
-                logger.info(f"\n{num_trials} {plural(num_trials, 'epoch')} "
+                logger.info(f"{num_trials} {plural(num_trials, 'epoch')} "
                             f"saved to '{self.trials_file}'.")
 
     def save_opts(self) -> None:
@@ -659,6 +659,7 @@ class Hyperopt:
         to_ask: deque = deque()
         evald: Set[Tuple] = set()
         opt = self.opt
+
         def point():
             if self.ask_points:
                 if to_ask:
@@ -683,23 +684,67 @@ class Hyperopt:
                 del vals[:], void_filtered[:]
 
             a = point()
-            while a in evald:
+            if a in evald:
                 logger.debug("this point was evaluated before...")
                 if not fit:
                     opt.update_next()
                 a = point()
+                if a in evald:
+                    break
             evald.add(a)
             yield a
 
     @staticmethod
-    def opt_get_past_points(asked: dict, results_board: Queue) -> dict:
+    def opt_get_past_points(asked: dict, results_board: Queue) -> Tuple[dict, int]:
         """ fetch shared results between optimizers """
         results = results_board.get()
         results_board.put(results)
         for a in asked:
             if a in results:
                 asked[a] = results[a]
-        return asked
+        return asked, len(results)
+
+    @staticmethod
+    def opt_state(shared: bool, optimizers: Queue) -> Tuple[Optimizer, int]:
+        """ fetch an optimizer in multi opt mode """
+        # get an optimizer instance
+        opt = optimizers.get()
+        # this is the counter used by the optimizer internally to track
+        # the initial points evaluated so far
+        initial_points = opt._n_initial_points
+        if shared:
+            # get a random number before putting it back to avoid
+            # replication with other workers and keep reproducibility
+            rand = opt.rng.randint(0, VOID_LOSS)
+            optimizers.put(opt)
+            # switch the seed to get a different point
+            opt.rng.seed(rand)
+            opt, opt.void_loss = opt.copy(random_state=opt.rng), opt.void_loss
+        # a model is only fit after initial points
+        elif initial_points < 1:
+            opt.tell(opt.Xi, opt.yi)
+        # we have to get a new point anyway
+        else:
+            opt.update_next()
+        return opt, initial_points
+
+    @staticmethod
+    def opt_results(void: bool, void_filtered: list,
+                    initial_points: int, results_board: Queue) -> list:
+        # update the board used to skip already computed points
+        # NOTE: some results at the beginning won't be published
+        # because they are removed by filter_void_losses
+        if not void:
+            results = results_board.get()
+            for v in void_filtered:
+                a = tuple(v['params_dict'].values())
+                if a not in results:
+                    results[a] = v['loss']
+            results_board.put(results)
+        # set the initial point flag
+        for n, v in enumerate(void_filtered):
+            v['is_initial_point'] = initial_points - n > 0
+        return void_filtered
 
     def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
         """
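
Note on the two hunks above: the "results board" is just a dict shuttled
through a multiprocessing Queue, so the get()/put() round-trip doubles as a
lock, and each worker gets back a pickled private snapshot of the board. A
minimal stand-alone sketch of that pattern (function and variable names are
illustrative, not the module's API):

    from multiprocessing import Manager

    def get_past_points(asked: dict, results_board) -> dict:
        # take the board out and put it straight back: the queue serializes
        # access, and `results` stays a private snapshot we can read freely
        results = results_board.get()
        results_board.put(results)
        for point in asked:
            if point in results:
                asked[point] = results[point]
        return asked

    if __name__ == '__main__':
        manager = Manager()
        board = manager.Queue()
        board.put({(1, 2): 0.5})  # a point evaluated by some other worker
        asked = {(1, 2): None, (3, 4): None}
        print(get_past_points(asked, board))
        # -> {(1, 2): 0.5, (3, 4): None}
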
@@ -707,47 +752,39 @@ class Hyperopt:
         """
         self.log_results_immediate(n)
         is_shared = self.shared
-        # get an optimizer instance
-        opt = optimizers.get()
-        # this is the counter used by the optimizer internally to track the initial
-        # points evaluated so far..
-        initial_points = opt._n_initial_points
-
-        if is_shared:
-            # get a random number before putting it back to avoid
-            # replication with other workers and keep reproducibility
-            rand = opt.rng.randint(0, VOID_LOSS)
-            optimizers.put(opt)
-            # switch the seed to get a different point
-            opt.rng.seed(rand)
-            opt, opt.void_loss = opt.copy(random_state=opt.rng), opt.void_loss
-        # we have to get a new point if the last batch was all void
-        elif opt.void:
-            opt.update_next()
-        # a model is only fit after initial points
-        elif initial_points < 1:
-            opt.tell(opt.Xi, opt.yi)
+        id = optimizers.qsize()
+        opt, initial_points = self.opt_state(is_shared, optimizers)
+        sss = self.search_space_size
+        asked = {None: None}
+        asked_d = {}
+        told = 0  # points told to the optimizer so far
 
         Xi_d = []  # done
         yi_d = []
         Xi_t = []  # to do
-        # ask for points according to config
-        while True:
+        while asked != asked_d and len(opt.Xi) < sss:
+            asked_d = asked
             asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
             if not self.ask_points:
                 asked = {tuple(asked): None}
             else:
                 asked = {tuple(a): None for a in asked}
             # check if some points have been evaluated by other optimizers
-            p_asked = self.opt_get_past_points(asked, results_board)
+            p_asked, _ = self.opt_get_past_points(asked, results_board)
             for a in p_asked:
                 if p_asked[a] is not None:
-                    Xi_d.append(a)
-                    yi_d.append(p_asked[a])
+                    if a not in Xi_d:
+                        Xi_d.append(a)
+                        yi_d.append(p_asked[a])
                 else:
                     Xi_t.append(a)
             if len(Xi_t) < self.n_points:
-                opt.update_next()
+                len_Xi_d = len(Xi_d)
+                if len_Xi_d > told:  # tell new points
+                    opt.tell(Xi_d[told:], yi_d[told:])
+                    told = len_Xi_d
+                else:
+                    opt.update_next()
             else:
                 break
         # run the backtest for each point to do (Xi_t)
@@ -773,20 +810,8 @@ class Hyperopt:
             # don't pickle models
             del opt.models[:]
             optimizers.put(opt)
-        # update the board used to skip already computed points
-        # NOTE: some results at the beginning won't be published
-        # because they are removed by the filter_void_losses
-        if not void:
-            results = results_board.get()
-            for v in void_filtered:
-                a = tuple(v['params_dict'].values())
-                if a not in results:
-                    results[a] = v['loss']
-            results_board.put(results)
-        # set initial point flag
-        for n, v in enumerate(void_filtered):
-            v['is_initial_point'] = initial_points - n > 0
-        return void_filtered
+
+        return self.opt_results(void, void_filtered, initial_points, results_board)
 
     def parallel_objective(self, asked, results: Queue = None, n=0):
         """ objective run in single opt mode, run the backtest, store the results into a queue """
@@ -892,9 +917,12 @@ class Hyperopt:
             n_parameters += len(d.bounds)
         # guess the size of the search space as the count of the
        # unordered combination of the dimensions entries
-        search_space_size = int(
-            (factorial(n_parameters) /
-             (factorial(n_parameters - n_dimensions) * factorial(n_dimensions))))
+        try:
+            search_space_size = int(
+                (factorial(n_parameters) /
+                 (factorial(n_parameters - n_dimensions) * factorial(n_dimensions))))
+        except OverflowError:
+            search_space_size = VOID_LOSS
         # logger.info(f'Search space size: {search_space_size}')
         log_opt = int(log(opt_points, 2)) if opt_points > 4 else 2
         if search_space_size < opt_points:
@@ -913,7 +941,7 @@ class Hyperopt:
             n_initial_points = min(log_sss, search_space_size // 3)
         # it shall run for this much, I say
         min_epochs = int(max(n_initial_points, opt_points) * (1 + effort) + n_initial_points)
-        return n_initial_points, min_epochs, search_space_size
+        return n_initial_points or 1, min_epochs, search_space_size
 
     def update_max_epoch(self, val: Dict, current: int):
         """
         calculate max epochs: store the number of non best epochs
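
For reference, the estimate guarded above is the binomial coefficient
C(n_parameters, n_dimensions) = n! / ((n - k)! * k!), and it is the float
division inside int(...) that raises OverflowError once the quotient no
longer fits in a double. A toy reproduction (the function name and the
sentinel value are stand-ins, not the module's own):

    from math import factorial

    VOID_LOSS = 2 ** 31  # stand-in for the module's sentinel

    def guess_search_space_size(n_parameters: int, n_dimensions: int) -> int:
        # C(n, k) = n! / ((n - k)! * k!)
        try:
            return int(factorial(n_parameters) /
                       (factorial(n_parameters - n_dimensions) *
                        factorial(n_dimensions)))
        except OverflowError:
            # the quotient exceeded float range: fall back to the sentinel
            return VOID_LOSS

    print(guess_search_space_size(10, 3))      # 120
    print(guess_search_space_size(2000, 500))  # 2147483648 (the sentinel)
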
@@ -987,11 +1015,65 @@ class Hyperopt:
         # initialize average best occurrence
         self.avg_best_occurrence = self.min_epochs // self.n_jobs
 
+    def main_loop(self, jobs_scheduler):
+        """ main parallel loop """
+        try:
+            if self.multi:
+                jobs_scheduler = self.run_multi_backtest_parallel
+            else:
+                jobs_scheduler = self.run_backtest_parallel
+            with parallel_backend('loky', inner_max_num_threads=2):
+                with Parallel(n_jobs=self.n_jobs, verbose=0, backend='loky') as parallel:
+                    jobs = parallel._effective_n_jobs()
+                    logger.info(f'Effective number of parallel workers used: {jobs}')
+                    # update epochs count
+                    n_points = self.n_points
+                    prev_batch = -1
+                    epochs_so_far = len(self.trials)
+                    epochs_limit = self.epochs_limit
+                    columns, _ = os.get_terminal_size()
+                    columns -= 1
+                    while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
+                        prev_batch = epochs_so_far
+                        occurrence = int(self.avg_best_occurrence * (1 + self.effort))
+                        # pad the batch length to the number of jobs to avoid desaturation
+                        batch_len = (occurrence + jobs -
+                                     occurrence % jobs)
+                        # when using multiple optimizers each worker performs
+                        # n_points (epochs) in 1 dispatch but this reduces the batch len too much
+                        # if self.multi: batch_len = batch_len // self.n_points
+                        # don't go over the limit
+                        if epochs_so_far + batch_len * n_points >= epochs_limit():
+                            q, r = divmod(epochs_limit() - epochs_so_far, n_points)
+                            batch_len = q + r
+                        print(
+                            f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
+                            f"/{epochs_limit()}: ",
+                            end='')
+                        f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
+                        print(end='\r')
+                        saved = self.log_results(f_val, epochs_so_far, epochs_limit())
+                        print('\r', ' ' * columns, end='\r')
+                        # stop if no epochs have been evaluated
+                        if len(f_val) < batch_len:
+                            logger.warning("Some evaluated epochs were void, "
+                                           "check the loss function and the search space.")
+                        if (not saved and len(f_val) > 1) or batch_len < 1 or \
+                                (not saved and self.search_space_size < batch_len + epochs_limit()):
+                            break
+                        # add the epochs saved by log_results
+                        epochs_so_far += saved
+                        if self.max_epoch_reached:
+                            logger.info("Max epoch reached, terminating.")
+                            break
+        except KeyboardInterrupt:
+            print('User interrupted.')
+
     def start(self) -> None:
         """ Broom Broom """
         self.random_state = self._set_random_state(self.config.get('hyperopt_random_state', None))
         logger.info(f"Using optimizer random state: {self.random_state}")
-
+        self.hyperopt_table_header = -1
         data, timerange = self.backtesting.load_bt_data()
         preprocessed = self.backtesting.strategy.tickerdata_to_dataframe(data)
@@ -1024,57 +1106,13 @@ class Hyperopt:
         colorama_init(autoreset=True)
 
         self.setup_optimizers()
-        try:
-            if self.multi:
-                jobs_scheduler = self.run_multi_backtest_parallel
-            else:
-                jobs_scheduler = self.run_backtest_parallel
-            with parallel_backend('loky', inner_max_num_threads=2):
-                with Parallel(n_jobs=self.n_jobs, verbose=0, backend='loky') as parallel:
-                    jobs = parallel._effective_n_jobs()
-                    logger.info(f'Effective number of parallel workers used: {jobs}')
-                    # update epochs count
-                    n_points = self.n_points
-                    prev_batch = -1
-                    epochs_so_far = len(self.trials)
-                    epochs_limit = self.epochs_limit
-                    columns, _ = os.get_terminal_size()
-                    columns -= 1
-                    while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
-                        prev_batch = epochs_so_far
-                        occurrence = int(self.avg_best_occurrence * (1 + self.effort))
-                        # pad the batch length to the number of jobs to avoid desaturation
-                        batch_len = (occurrence + jobs -
-                                     occurrence % jobs)
-                        # when using multiple optimizers each worker performs
-                        # n_points (epochs) in 1 dispatch but this reduces the batch len too much
-                        # if self.multi: batch_len = batch_len // self.n_points
-                        # don't go over the limit
-                        if epochs_so_far + batch_len * n_points >= epochs_limit():
-                            q, r = divmod(epochs_limit() - epochs_so_far, n_points)
-                            batch_len = q + r
-                        print(
-                            f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
-                            f"/{epochs_limit()}: ",
-                            end='')
-                        f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
-                        print(end='\r')
-                        saved = self.log_results(f_val, epochs_so_far, epochs_limit())
-                        print('\r', ' ' * columns, end='\r')
-                        # stop if no epochs have been evaluated
-                        if len(f_val) < batch_len:
-                            logger.warning("Some evaluated epochs were void, "
-                                           "check the loss function and the search space.")
-                        if (not saved and len(f_val) > 1) or batch_len < 1:
-                            break
-                        # log_results add
-                        epochs_so_far += saved
-                        if self.max_epoch_reached:
-                            logger.info("Max epoch reached, terminating.")
-                            break
-        except KeyboardInterrupt:
-            print('User interrupted..')
+        if self.multi:
+            jobs_scheduler = self.run_multi_backtest_parallel
+        else:
+            jobs_scheduler = self.run_backtest_parallel
+
+        self.main_loop(jobs_scheduler)
 
         self.save_trials(final=True)
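
A final note on the batch sizing that main_loop carries over from the old
start() body: the batch is padded upward so a dispatch divides evenly across
the workers (no worker sits idle), then clamped so batch_len * n_points
cannot run past the epoch limit. A self-contained check of that arithmetic
(the helper name and the numbers are made up for the example):

    def batch_length(occurrence: int, jobs: int, epochs_so_far: int,
                     epochs_limit: int, n_points: int) -> int:
        # pad the batch length to the number of jobs to avoid desaturation
        batch_len = occurrence + jobs - occurrence % jobs
        # don't go over the limit
        if epochs_so_far + batch_len * n_points >= epochs_limit:
            q, r = divmod(epochs_limit - epochs_so_far, n_points)
            batch_len = q + r
        return batch_len

    print(batch_length(10, 4, 0, 1000, 1))    # 12: padded to a multiple of 4
    print(batch_length(10, 4, 995, 1000, 1))  # 5: clamped to what remains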