From a5b44de0f6fb5774381a8e458d6745a15937b97e Mon Sep 17 00:00:00 2001
From: orehunt
Date: Sat, 14 Mar 2020 20:53:56 +0100
Subject: [PATCH] hyperopt shared mode

- shared mode uses one optimizer with shared results
- multi mode runs as many optimizers as jobs; results are only shared on ask
- a flag to override the lie strategy used when asking for more points
  (--lie-strat)
- make sure to ask with n_points `None` to avoid computing more points than
  needed in shared mode
- reduce the number of models to 1 in multi mode
- don't load more optimizers than the specified number of jobs when
  restoring previous ones
- stretch the batch length to reach the epochs limit
- a warning for when evaluated epochs are void
---
 freqtrade/commands/arguments.py          |   2 +-
 freqtrade/commands/cli_options.py        |  24 ++-
 freqtrade/configuration/configuration.py |   9 +-
 freqtrade/constants.py                   |   4 +-
 freqtrade/optimize/hyperopt.py           | 211 ++++++++++++++---------
 freqtrade/optimize/hyperopt_backend.py   |   2 +-
 6 files changed, 155 insertions(+), 97 deletions(-)

diff --git a/freqtrade/commands/arguments.py b/freqtrade/commands/arguments.py
index 323a556f8..6afb9ea96 100644
--- a/freqtrade/commands/arguments.py
+++ b/freqtrade/commands/arguments.py
@@ -26,7 +26,7 @@ ARGS_HYPEROPT = ARGS_COMMON_OPTIMIZE + [
     "hyperopt", "hyperopt_path", "position_stacking", "epochs", "spaces",
     "use_max_market_positions", "print_all", "print_colorized", "print_json",
     "hyperopt_jobs", "hyperopt_random_state", "hyperopt_min_trades",
     "hyperopt_continue", "hyperopt_loss", "effort",
-    "multi_opt", "points_per_opt"
+    "mode", "n_points", "lie_strat"
 ]

 ARGS_EDGE = ARGS_COMMON_OPTIMIZE + ["stoploss_range"]
diff --git a/freqtrade/commands/cli_options.py b/freqtrade/commands/cli_options.py
index 0aac462dd..b49f893e3 100644
--- a/freqtrade/commands/cli_options.py
+++ b/freqtrade/commands/cli_options.py
@@ -204,20 +204,26 @@ AVAILABLE_CLI_OPTIONS = {
         metavar='FLOAT',
         default=constants.HYPEROPT_EFFORT,
     ),
-    "multi_opt":
-    Arg('--multi',
-        help=('Switches hyperopt to use one optimizer per job, use it',
-              'when backtesting iterations are cheap (default: %(default)d).'),
-        action='store_true',
-        default=False),
-    "points_per_opt":
-    Arg('--points-per-opt',
-        help=('Controls how many points to ask at each job dispatch to each',
-              'optimizer in multi opt mode, increase if cpu usage of each core',
-              'appears low (default: %(default)d).'),
-        type=int,
-        metavar='INT',
-        default=constants.HYPEROPT_POINTS_PER_OPT),
+    "mode":
+    Arg('--mode',
+        help=('Switches hyperopt between modes: "single" runs one optimizer, '
+              '"multi" runs one optimizer per job, "shared" runs one optimizer '
+              'with results shared between jobs; use multi or shared when '
+              'backtesting iterations are cheap (default: %(default)s).'),
+        metavar='NAME',
+        default=constants.HYPEROPT_MODE),
+    "n_points":
+    Arg('--n-points',
+        help=('Controls how many points to ask the optimizer for at each '
+              'dispatch; increase it if the cpu usage of each core '
+              'appears low (default: %(default)d).'),
+        type=int,
+        metavar='INT',
+        default=constants.HYPEROPT_N_POINTS),
+    "lie_strat":
+    Arg('--lie-strat',
+        help=('Sets the strategy the optimizer uses to lie '
+              'when asking for more than one point; '
+              'has no effect if n_points is 1 (default: %(default)s).'),
+        default=constants.HYPEROPT_LIE_STRAT),
     "spaces":
     Arg(
         '--spaces',
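
Reviewer note: the values accepted by --lie-strat map onto scikit-optimize's
constant liar strategies ('cl_min', 'cl_mean', 'cl_max'), which ask() uses as
fake losses for still-pending points so that a batch of n_points is
diversified. A minimal sketch of the underlying skopt call the flag feeds
into; the dimension and the dummy objective here are made up for
illustration:

    from skopt import Optimizer

    opt = Optimizer([(-2.0, 2.0)], base_estimator='GP', acq_optimizer='sampling')
    # with n_points > 1 the optimizer "lies" about pending points;
    # the strategy chooses the constant used for the lie
    batch = opt.ask(n_points=4, strategy='cl_min')
    opt.tell(batch, [x[0] ** 2 for x in batch])  # dummy objective
    # with n_points=None a single point is returned and no lie is needed,
    # which is what shared mode relies on to avoid computing extra points
    single = opt.ask()
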
diff --git a/freqtrade/configuration/configuration.py b/freqtrade/configuration/configuration.py
index 7c9c01237..40db823de 100644
--- a/freqtrade/configuration/configuration.py
+++ b/freqtrade/configuration/configuration.py
@@ -270,10 +270,13 @@ class Configuration:
                              logstring='Parameter --effort detected ... '
                              'Parameter --effort detected: {}')
         self._args_to_config(config,
-                             argname='multi_opt',
-                             logstring='Hyperopt will use multiple optimizers ...')
+                             argname='mode',
+                             logstring='Hyperopt will run in {} mode ...')
         self._args_to_config(config,
-                             argname='points_per_opt',
+                             argname='lie_strat',
+                             logstring='Lie strategy set to {} ...')
+        self._args_to_config(config,
+                             argname='n_points',
                              logstring='Optimizers will be asked for {} points...')
         self._args_to_config(config,
                              argname='spaces',
diff --git a/freqtrade/constants.py b/freqtrade/constants.py
index 2c24fd01e..48a0700df 100644
--- a/freqtrade/constants.py
+++ b/freqtrade/constants.py
@@ -8,7 +8,9 @@ DEFAULT_EXCHANGE = 'bittrex'
 PROCESS_THROTTLE_SECS = 5  # sec
 HYPEROPT_EPOCH = 0  # epochs
 HYPEROPT_EFFORT = 0.  # tune max epoch count
-HYPEROPT_POINTS_PER_OPT = 2  # tune iterations between estimations
+HYPEROPT_N_POINTS = 2  # tune iterations between estimations
+HYPEROPT_MODE = 'single'
+HYPEROPT_LIE_STRAT = 'default'
 RETRY_TIMEOUT = 30  # sec
 DEFAULT_HYPEROPT_LOSS = 'DefaultHyperOptLoss'
 DEFAULT_DB_PROD_URL = 'sqlite:///tradesv3.sqlite'
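
Reviewer note: the hyperopt.py changes below coordinate workers through
module-level queues created by a multiprocessing Manager; results_board is a
Queue(maxsize=1) whose single slot holds a dict, so get()/put() double as
lock/unlock around it. A standalone sketch of that pattern (record_result is
a hypothetical name, not part of the patch):

    from multiprocessing import Manager

    manager = Manager()
    results_board = manager.Queue(maxsize=1)
    results_board.put({})  # the single slot holds the shared dict

    def record_result(point: tuple, loss: float) -> None:
        results = results_board.get()   # take the dict; other workers block
        results[point] = loss
        results_board.put(results)      # put it back, releasing the "lock"
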
diff --git a/freqtrade/optimize/hyperopt.py b/freqtrade/optimize/hyperopt.py
index 06dbfb5cb..006cc230d 100644
--- a/freqtrade/optimize/hyperopt.py
+++ b/freqtrade/optimize/hyperopt.py
@@ -97,7 +97,7 @@ class Hyperopt:
         # a guessed number extracted by the space dimensions
         self.search_space_size = 0
         # total number of candles being backtested
-        self.n_samples = 0
+        self.n_candles = 0

         self.current_best_loss = VOID_LOSS
         self.current_best_epoch = 0
@@ -113,37 +113,9 @@
         # evaluations
         self.trials: List = []

-        # optimizers
-        self.opts: List[Optimizer] = []
-        self.opt: Optimizer = None
-        backend.manager = Manager()
-        if 'multi_opt' in self.config and self.config['multi_opt']:
-            self.multi = True
-            backend.optimizers = backend.manager.Queue()
-            backend.results_board = backend.manager.Queue(maxsize=1)
-            backend.results_board.put([])
-            self.opt_base_estimator = 'GBRT'
-            self.opt_acq_optimizer = 'sampling'
-            default_n_points = 2
-        else:
-            self.multi = False
-            backend.results = backend.manager.Queue()
-            self.opt_base_estimator = 'GP'
-            self.opt_acq_optimizer = 'lbfgs'
-            default_n_points = 1
-
-        # in single opt assume runs are expensive so default to 1 point per ask
-        self.n_points = self.config.get('points_per_opt', default_n_points)
-        # if 0 n_points are given, don't use any base estimator (akin to random search)
-        if self.n_points < 1:
-            self.n_points = 1
-            self.opt_base_estimator = "DUMMY"
-            self.opt_acq_optimizer = "sampling"
-        # var used in epochs and batches calculation
-        self.opt_points = self.n_jobs * self.n_points
-        # models are only needed for posterior eval
-        self.n_models = max(16, self.n_jobs)
+        # configure multi mode
+        self.setup_multi()

         # Populate functions here (hasattr is slow so should not be run during "regular" operations)
         if hasattr(self.custom_hyperopt, 'populate_indicators'):
@@ -174,6 +146,60 @@
         self.print_colorized = self.config.get('print_colorized', False)
         self.print_json = self.config.get('print_json', False)

+    def setup_multi(self):
+        # optimizers
+        self.opts: List[Optimizer] = []
+        self.opt: Optimizer = None
+
+        backend.manager = Manager()
+        self.mode = self.config.get('mode', 'single')
+        self.shared = False
+        if self.mode in ('multi', 'shared'):
+            self.multi = True
+            if self.mode == 'shared':
+                self.shared = True
+            backend.optimizers = backend.manager.Queue()
+            backend.results_board = backend.manager.Queue(maxsize=1)
+            backend.results_board.put({})
+            self.opt_base_estimator = 'GBRT'
+            self.opt_acq_optimizer = 'sampling'
+            # in multi opt one model is enough
+            self.n_models = 1
+            default_n_points = 2
+        else:
+            self.multi = False
+            backend.results = backend.manager.Queue()
+            self.opt_base_estimator = 'GP'
+            self.opt_acq_optimizer = 'lbfgs'
+            # models are only needed for posterior eval
+            self.n_models = min(16, self.n_jobs)
+            default_n_points = 1
+
+        # in single opt assume runs are expensive so default to 1 point per ask
+        self.n_points = self.config.get('n_points', default_n_points)
+        # if n_points is 0, don't use any base estimator (akin to random search)
+        if self.n_points < 1:
+            self.n_points = 1
+            self.opt_base_estimator = "DUMMY"
+            self.opt_acq_optimizer = "sampling"
+        if self.n_points < 2:
+            # ask_points is the value passed to each ask call;
+            # asking with n_points=None returns a single point without
+            # wasting time generating a whole batch
+            self.ask_points = None
+        else:
+            self.ask_points = self.n_points
+        # var used in epochs and batches calculation
+        self.opt_points = self.n_jobs * (self.n_points or 1)
+        # lie strategy
+        lie_strat = self.config.get('lie_strat', 'default')
+        if lie_strat == 'default':
+            self.lie_strat = lambda: 'cl_min'
+        elif lie_strat == 'random':
+            self.lie_strat = self.get_next_point_strategy
+        else:
+            self.lie_strat = lambda: lie_strat
+
     @staticmethod
     def get_lock_filename(config: Dict[str, Any]) -> str:
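
Reviewer note: with --lie-strat random, lie_strat above resolves to
self.get_next_point_strategy, which this patch does not show; presumably it
picks one of skopt's constant liar strategies per call. A hypothetical
sketch of such a picker, purely illustrative:

    import random

    def get_next_point_strategy() -> str:
        # vary the lie between dispatches to diversify the asked batches
        return random.choice(('cl_min', 'cl_mean', 'cl_max'))
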
@@ -627,11 +653,11 @@
             if void_filtered:  # again if all are filtered
                 opt.tell([list(v['params_dict'].values()) for v in void_filtered],
                          [v['loss'] for v in void_filtered],
-                         fit=(len(to_ask) < 1))  # only fit when out of points
+                         fit=(len(to_ask) < 1))  # only fit when out of points
                 del vals[:], void_filtered[:]

             if not to_ask:
-                to_ask.extend(opt.ask(n_points=self.n_points))
+                to_ask.extend(opt.ask(n_points=self.n_points, strategy=self.lie_strat()))
             a = tuple(to_ask.popleft())
             while a in evald:
                 logger.info("this point was evaluated before...")
                 if to_ask:
                     a = tuple(to_ask.popleft())
                 else:
                     break
             evald.add(a)
             yield a

@@ -642,55 +668,70 @@
+    @staticmethod
+    def opt_get_past_points(asked: dict, results_board: Queue) -> dict:
+        """ fetch shared results between optimizers """
+        results = results_board.get()
+        results_board.put(results)
+        for a in asked:
+            if a in results:
+                asked[a] = results[a]
+        return asked
+
     def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
         """
         objective run in multi opt mode, optimizers share the results as soon as they are completed
         """
         self.log_results_immediate(n)
-        # fetch an optimizer instance
+        is_shared = self.shared
+        # get an optimizer instance
         opt = optimizers.get()
-        # tell new points if any
-        results = results_board.get()
-        past_Xi = []
-        past_yi = []
-        for idx, res in enumerate(results):
-            unsubscribe = False
-            vals = res[0]  # res[1] is the counter
-            for v in vals:
-                if list(v['params_dict'].values()) not in opt.Xi:
-                    past_Xi.append(list(v['params_dict'].values()))
-                    past_yi.append(v['loss'])
-                    # decrease counter
-                    if not unsubscribe:
-                        unsubscribe = True
-            if unsubscribe:
-                results[idx][1] -= 1
-                if results[idx][1] < 1:
-                    del results[idx]
-        # put back the updated results
-        results_board.put(results)
-        if len(past_Xi) > 0:
-            opt.tell(past_Xi, past_yi, fit=True)
+
+        if is_shared:
+            # draw a random number before putting the optimizer back, to avoid
+            # replicating the points asked by other workers
+            rand = opt.rng.randint(0, VOID_LOSS)
+            optimizers.put(opt)
+            # switch the seed to get a different point
+            opt.rng.seed(rand)
+            opt.update_next()

         # ask for points according to config
-        asked = opt.ask(n_points=self.n_points, strategy=self.get_next_point_strategy())
-        # run the backtest for each point
-        f_val = [self.backtest_params(e) for e in asked]
+        asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
+        # check if some points have already been evaluated by other optimizers
+        p_asked = self.opt_get_past_points({tuple(a): None for a in asked}, results_board)
+        Xi_d = []  # done
+        Xi_t = []  # to do
+        for a in p_asked:
+            if p_asked[a] is not None:
+                Xi_d.append(a)
+            else:
+                Xi_t.append(a)
+        # run the backtest for each point still to do (Xi_t)
+        f_val = [self.backtest_params(a) for a in Xi_t]
         # filter losses
         void_filtered = self.filter_void_losses(f_val, opt)
-        # tell the optimizer the results
+        # add points of the current dispatch if any
         if opt.void_loss != VOID_LOSS or len(void_filtered) > 0:
-            Xi = [list(v['params_dict'].values()) for v in void_filtered]
-            yi = [v['loss'] for v in void_filtered]
-            # because we fit with points from other runs
-            # only fit if at the current dispatch there were no points
-            opt.tell(Xi, yi, fit=(len(past_Xi) < 1))
-            # update the board with the new results
-            results = results_board.get()
-            results.append([void_filtered, jobs - 1])
-            results_board.put(results)
-        # send back the updated optimizer
-        optimizers.put(opt)
+            Xi = [*Xi_d, *[list(v['params_dict'].values()) for v in void_filtered]]
+            yi = [*[p_asked[a] for a in Xi_d], *[v['loss'] for v in void_filtered]]
+            void = False
+            if is_shared:
+                # refresh the optimizer that stores all the points
+                opt = optimizers.get()
+            opt.tell(Xi, yi, fit=False)
+        else:
+            void = True
+        if not void or not is_shared:
+            # in shared mode the optimizer was already put back after the ask
+            # and is only fetched again when there are results to tell, so
+            # skip the put here when everything was void
+            optimizers.put(opt)
+        # update the board used to skip already computed points
+        results = results_board.get()
+        for v in void_filtered:
+            results[tuple(v['params_dict'].values())] = v['loss']
+        results_board.put(results)
         return void_filtered

     def parallel_objective(self, asked, results: Queue, n=0):
@@ -839,14 +880,19 @@
         """ Setup the optimizers objects, try to load from disk, or create new ones """
         # try to load previous optimizers
         opts = self.load_previous_optimizers(self.opts_file)
+        n_opts = len(opts)
+        max_opts = self.n_jobs

         if self.multi:
-            if len(opts) > 0:
-                # put the restored optimizers in the queue and clear them from the object
-                for opt in opts:
-                    backend.optimizers.put(opt)
+            # when sharing results there is only one optimizer that gets copied
+            if self.shared:
+                max_opts = 1
+            # put the restored optimizers in the queue, but no more
+            # than the number of jobs that will be dispatched
+            if n_opts > 0:
+                for n in range(min(n_opts, max_opts)):
+                    backend.optimizers.put(opts[n])
             # generate as many optimizers as are still needed to fill the job count
             remaining = max_opts - backend.optimizers.qsize()
             if remaining > 0:
                 opt = self.get_optimizer(self.dimensions, self.n_jobs, self.opt_n_initial_points)
                 for _ in range(remaining):  # generate optimizers
@@ -859,7 +905,7 @@
         else:
             # if we have more than 1 optimizer but are using single opt,
             # pick one discard the rest...
-            if len(opts) > 0:
+            if n_opts > 0:
                 self.opt = opts[-1]
             else:
                 self.opt = self.get_optimizer(
@@ -897,7 +943,7 @@
         # Trim startup period from analyzed dataframe
         for pair, df in preprocessed.items():
             preprocessed[pair] = trim_dataframe(df, timerange)
-            self.n_samples += len(preprocessed[pair])
+            self.n_candles += len(preprocessed[pair])

         min_date, max_date = get_timerange(data)

         logger.info(
@@ -944,7 +990,8 @@
                 # if self.multi: batch_len = batch_len // self.n_points
                 # don't go over the limit
                 if epochs_so_far + batch_len * n_points > epochs_limit():
-                    batch_len = (epochs_limit() - epochs_so_far) // n_points
+                    q, r = divmod(epochs_limit() - epochs_so_far, n_points)
+                    batch_len = q + r
                 print(
                     f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
                     f"/{epochs_limit()}: ",
@@ -952,9 +999,9 @@
             f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, self.n_jobs)
             saved = self.log_results(f_val, epochs_so_far, epochs_limit())
-            # stop if no epochs have been evaluated
-            if len(f_val) < 1:
-                logger.warning("All epochs evaluated were void, "
-                               "check the loss function and the search space.")
+            # warn if some epochs of the last batch were void
+            if len(f_val) < batch_len:
+                logger.warning("Some evaluated epochs were void, "
+                               "check the loss function and the search space.")
             if (not saved and len(f_val) > 1) or batch_len < 1:
                 break
             # log_results add
diff --git a/freqtrade/optimize/hyperopt_backend.py b/freqtrade/optimize/hyperopt_backend.py
index 4871c44ca..7357fb4ee 100644
--- a/freqtrade/optimize/hyperopt_backend.py
+++ b/freqtrade/optimize/hyperopt_backend.py
@@ -7,7 +7,7 @@ manager: SyncManager
 # stores the optimizers in multi opt mode
 optimizers: Queue
-# stores a list of the results to share between optimizers
-# each result is a tuple of the params_dict and a decreasing counter
+# stores the results shared between optimizers,
+# in the form of dict[tuple(Xi)] = yi
 results_board: Queue
 # store the results in single opt mode
 results: Queue
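
Reviewer note: the divmod change in the batching loop stretches the last
batch so the run can actually reach the epochs limit when n_points > 1; a
quick check of the arithmetic with made-up numbers:

    # 10 epochs remaining, n_points = 4
    q, r = divmod(10, 4)  # q = 2, r = 2
    batch_len = q + r     # 4 points per job dispatch
    # the old `(limit - so_far) // n_points` would schedule 2 * 4 = 8 epochs,
    # leave 2 remaining, then compute batch_len = 0 and break short of the
    # limit; q + r always covers at least the remaining number of epochs,
    # overshooting slightly instead of stopping early.
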