hyperopt shared mode

- shared mode uses one optimizer with shared results
- multi mode runs as many optimizers as there are jobs; results are only shared on ask
- a flag (--lie-strat) to override the lie strategy used when asking for more than one point
- make sure to ask with n_points `None` to avoid computing more points than needed in shared mode (see the sketch after this list)
- reduce the number of models to 1 in multi mode
- don't load more optimizers than the specified number of jobs when loading previous optimizers
- stretch the batch length to reach the epochs limit
- a warning for when no epochs are logged
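A minimal sketch (not this commit's code) of the two ask() usages the list refers to, assuming scikit-optimize's Optimizer API; the single float dimension and toy objective are made up for illustration:

from skopt import Optimizer

# assumed: scikit-optimize installed; GBRT estimator and sampling acquisition
# mirror the multi/shared defaults set in the diff below
opt = Optimizer([(-5.0, 5.0)], base_estimator="GBRT", acq_optimizer="sampling")

# multi/shared mode: ask for several points at once; the lie strategy
# (constant liar, e.g. "cl_min") stands in for the still-pending results
points = opt.ask(n_points=2, strategy="cl_min")
opt.tell(points, [p[0] ** 2 for p in points])

# n_points < 2: ask with n_points=None so the optimizer returns a single
# point and does not spend time generating extra candidates
single = opt.ask()
opt.tell(single, single[0] ** 2)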
orehunt 2020-03-14 20:53:56 +01:00
parent ef6efb7117
commit a5b44de0f6
6 changed files with 155 additions and 97 deletions

View File

@@ -26,7 +26,7 @@ ARGS_HYPEROPT = ARGS_COMMON_OPTIMIZE + [
"hyperopt", "hyperopt_path", "position_stacking", "epochs", "spaces",
"use_max_market_positions", "print_all", "print_colorized", "print_json", "hyperopt_jobs",
"hyperopt_random_state", "hyperopt_min_trades", "hyperopt_continue", "hyperopt_loss", "effort",
"multi_opt", "points_per_opt"
"mode", "n_points", "lie_strat"
]
ARGS_EDGE = ARGS_COMMON_OPTIMIZE + ["stoploss_range"]

View File

@@ -204,20 +204,26 @@ AVAILABLE_CLI_OPTIONS = {
metavar='FLOAT',
default=constants.HYPEROPT_EFFORT,
),
"multi_opt":
Arg('--multi',
"mode":
Arg('--mode',
help=('Switches hyperopt to use one optimizer per job, use it',
'when backtesting iterations are cheap (default: %(default)d).'),
action='store_true',
default=False),
"points_per_opt":
Arg('--points-per-opt',
help=('Controls how many points to ask at each job dispatch to each',
'optimizer in multi opt mode, increase if cpu usage of each core',
metavar='NAME',
default=constants.HYPEROPT_MODE),
"n_points":
Arg('--n-points',
help=('Controls how many points to ask to the optimizer',
'increase if cpu usage of each core',
'appears low (default: %(default)d).'),
type=int,
metavar='INT',
default=constants.HYPEROPT_POINTS_PER_OPT),
default=constants.HYPEROPT_N_POINTS),
"lie_strat":
Arg('--lie-strat',
help=('Sets the strategy that the optimizer uses to lie',
'when asking for more than one point, ',
'no effect if n_point is one (default: %(default)d).'),
default=constants.HYPEROPT_LIE_STRAT),
"spaces":
Arg(
'--spaces',

View File

@@ -270,10 +270,13 @@ class Configuration:
logstring='Parameter --effort detected ... '
'Parameter --effort detected: {}')
self._args_to_config(config,
argname='multi_opt',
logstring='Hyperopt will use multiple optimizers ...')
argname='mode',
logstring='Hyperopt will run in {} mode ...')
self._args_to_config(config,
argname='points_per_opt',
argname='explore',
logstring='Acquisition strategy set to random {}...')
self._args_to_config(config,
argname='n_points',
logstring='Optimizers will be asked for {} points...')
self._args_to_config(config,
argname='spaces',

View File

@@ -8,7 +8,9 @@ DEFAULT_EXCHANGE = 'bittrex'
PROCESS_THROTTLE_SECS = 5 # sec
HYPEROPT_EPOCH = 0 # epochs
HYPEROPT_EFFORT = 0. # tune max epoch count
HYPEROPT_POINTS_PER_OPT = 2 # tune iterations between estimations
HYPEROPT_N_POINTS = 2 # tune iterations between estimations
HYPEROPT_MODE = 'single'
HYPEROPT_LIE_STRAT = 'default'
RETRY_TIMEOUT = 30 # sec
DEFAULT_HYPEROPT_LOSS = 'DefaultHyperOptLoss'
DEFAULT_DB_PROD_URL = 'sqlite:///tradesv3.sqlite'

View File

@@ -97,7 +97,7 @@ class Hyperopt:
# a guessed number extracted by the space dimensions
self.search_space_size = 0
# total number of candles being backtested
self.n_samples = 0
self.n_candles = 0
self.current_best_loss = VOID_LOSS
self.current_best_epoch = 0
@@ -113,37 +113,9 @@
# evaluations
self.trials: List = []
# optimizers
self.opts: List[Optimizer] = []
self.opt: Optimizer = None
backend.manager = Manager()
if 'multi_opt' in self.config and self.config['multi_opt']:
self.multi = True
backend.optimizers = backend.manager.Queue()
backend.results_board = backend.manager.Queue(maxsize=1)
backend.results_board.put([])
self.opt_base_estimator = 'GBRT'
self.opt_acq_optimizer = 'sampling'
default_n_points = 2
else:
self.multi = False
backend.results = backend.manager.Queue()
self.opt_base_estimator = 'GP'
self.opt_acq_optimizer = 'lbfgs'
default_n_points = 1
# in single opt assume runs are expensive so default to 1 point per ask
self.n_points = self.config.get('points_per_opt', default_n_points)
# if 0 n_points are given, don't use any base estimator (akin to random search)
if self.n_points < 1:
self.n_points = 1
self.opt_base_estimator = "DUMMY"
self.opt_acq_optimizer = "sampling"
# var used in epochs and batches calculation
self.opt_points = self.n_jobs * self.n_points
# models are only needed for posterior eval
self.n_models = max(16, self.n_jobs)
# configure multi mode
self.setup_multi()
# Populate functions here (hasattr is slow so should not be run during "regular" operations)
if hasattr(self.custom_hyperopt, 'populate_indicators'):
@@ -174,6 +146,60 @@
self.print_colorized = self.config.get('print_colorized', False)
self.print_json = self.config.get('print_json', False)
def setup_multi(self):
# optimizers
self.opts: List[Optimizer] = []
self.opt: Optimizer = None
backend.manager = Manager()
self.mode = self.config.get('mode', 'single')
self.shared = False
if self.mode in ('multi', 'shared'):
self.multi = True
if self.mode == 'shared':
self.shared = True
backend.optimizers = backend.manager.Queue()
backend.results_board = backend.manager.Queue(maxsize=1)
backend.results_board.put({})
self.opt_base_estimator = 'GBRT'
self.opt_acq_optimizer = 'sampling'
# in multi opt one model is enough
self.n_models = 1
default_n_points = 2
else:
self.multi = False
backend.results = backend.manager.Queue()
self.opt_base_estimator = 'GP'
self.opt_acq_optimizer = 'lbfgs'
# models are only needed for posterior eval
self.n_models = min(16, self.n_jobs)
default_n_points = 1
# in single opt assume runs are expensive so default to 1 point per ask
self.n_points = self.config.get('n_points', default_n_points)
# if 0 n_points are given, don't use any base estimator (akin to random search)
if self.n_points < 1:
self.n_points = 1
self.opt_base_estimator = "DUMMY"
self.opt_acq_optimizer = "sampling"
if self.n_points < 2:
# ask_points is what is used in the ask call
# because when n_points is None, it doesn't
# waste time generating new points
self.ask_points = None
else:
self.ask_points = self.n_points
# var used in epochs and batches calculation
self.opt_points = self.n_jobs * (self.n_points or 1)
# lie strategy
lie_strat = self.config.get('lie_strat', 'default')
if lie_strat == 'default':
self.lie_strat = lambda: 'cl_min'
elif lie_strat == 'random':
self.lie_strat = self.get_next_point_strategy
else:
self.lie_strat = lambda: lie_strat
@staticmethod
def get_lock_filename(config: Dict[str, Any]) -> str:
@@ -627,11 +653,11 @@ class Hyperopt:
if void_filtered: # again if all are filtered
opt.tell([list(v['params_dict'].values()) for v in void_filtered],
[v['loss'] for v in void_filtered],
fit=(len(to_ask) < 1)) # only fit when out of points
fit=(len(to_ask) < 1)) # only fit when out of points
del vals[:], void_filtered[:]
if not to_ask:
to_ask.extend(opt.ask(n_points=self.n_points))
to_ask.extend(opt.ask(n_points=self.n_points, strategy=self.lie_strat()))
a = tuple(to_ask.popleft())
while a in evald:
logger.info("this point was evaluated before...")
@@ -642,55 +668,70 @@
evald.add(a)
yield a
@staticmethod
def opt_get_past_points(asked: dict, results_board: Queue) -> dict:
""" fetch shared results between optimizers """
results = results_board.get()
results_board.put(results)
for a in asked:
if a in results:
asked[a] = results[a]
return asked
def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
"""
objective run in multi opt mode, optimizers share the results as soon as they are completed
"""
self.log_results_immediate(n)
# fetch an optimizer instance
is_shared = self.shared
# get an optimizer instance
opt = optimizers.get()
# tell new points if any
results = results_board.get()
past_Xi = []
past_yi = []
for idx, res in enumerate(results):
unsubscribe = False
vals = res[0] # res[1] is the counter
for v in vals:
if list(v['params_dict'].values()) not in opt.Xi:
past_Xi.append(list(v['params_dict'].values()))
past_yi.append(v['loss'])
# decrease counter
if not unsubscribe:
unsubscribe = True
if unsubscribe:
results[idx][1] -= 1
if results[idx][1] < 1:
del results[idx]
# put back the updated results
results_board.put(results)
if len(past_Xi) > 0:
opt.tell(past_Xi, past_yi, fit=True)
if is_shared:
# get a random number before putting it back to avoid
# replication with other workers
rand = opt.rng.randint(0, VOID_LOSS)
optimizers.put(opt)
# switch the seed to get a different point
opt.rng.seed(rand)
opt.update_next()
# ask for points according to config
asked = opt.ask(n_points=self.n_points, strategy=self.get_next_point_strategy())
# run the backtest for each point
f_val = [self.backtest_params(e) for e in asked]
asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
# check if some points have been evaluated by other optimizers
p_asked = self.opt_get_past_points({tuple(a): None for a in asked}, results_board)
Xi_d = [] # done
Xi_t = [] # to do
for a in p_asked:
if p_asked[a] is not None:
Xi_d.append(a)
else:
Xi_t.append(a)
# run the backtest for each point to do (Xi_t)
f_val = [self.backtest_params(a) for a in Xi_t]
# filter losses
void_filtered = self.filter_void_losses(f_val, opt)
# tell the optimizer the results
# add points of the current dispatch if any
if opt.void_loss != VOID_LOSS or len(void_filtered) > 0:
Xi = [list(v['params_dict'].values()) for v in void_filtered]
yi = [v['loss'] for v in void_filtered]
# because we fit with points from other runs
# only fit if at the current dispatch there were no points
opt.tell(Xi, yi, fit=(len(past_Xi) < 1))
# update the board with the new results
results = results_board.get()
results.append([void_filtered, jobs - 1])
results_board.put(results)
# send back the updated optimizer
optimizers.put(opt)
Xi = [*Xi_d, *[list(v['params_dict'].values()) for v in void_filtered]]
yi = [*[p_asked[a] for a in Xi_d], *[v['loss'] for v in void_filtered]]
void = False
if is_shared:
# refresh the optimizer that stores all the points
opt = optimizers.get()
opt.tell(Xi, yi, fit=False)
else:
void = True
if not void or not is_shared:
# send back the updated optimizer only in non shared mode
# because in shared mode if all results are void we don't
# fetch it at all
optimizers.put(opt)
# update the board used to skip already computed points
results = results_board.get()
for v in void_filtered:
results[tuple(v['params_dict'].values())] = v['loss']
results_board.put(results)
return void_filtered
def parallel_objective(self, asked, results: Queue, n=0):
@@ -839,14 +880,19 @@
""" Setup the optimizers objects, try to load from disk, or create new ones """
# try to load previous optimizers
opts = self.load_previous_optimizers(self.opts_file)
n_opts = len(opts)
max_opts = self.n_jobs
if self.multi:
if len(opts) > 0:
# put the restored optimizers in the queue and clear them from the object
for opt in opts:
backend.optimizers.put(opt)
# when sharing results there is only one optimizer that gets copied
if self.shared:
max_opts = 1
# put the restored optimizers in the queue
if n_opts > 0:
for n in range(n_opts):
backend.optimizers.put(opts[n])
# generate as many optimizers as are still needed to fill the job count
remaining = self.n_jobs - backend.optimizers.qsize()
remaining = max_opts - backend.optimizers.qsize()
if remaining > 0:
opt = self.get_optimizer(self.dimensions, self.n_jobs, self.opt_n_initial_points)
for _ in range(remaining): # generate optimizers
@@ -859,7 +905,7 @@
else:
# if we have more than 1 optimizer but are using single opt,
# pick one discard the rest...
if len(opts) > 0:
if n_opts > 0:
self.opt = opts[-1]
else:
self.opt = self.get_optimizer(
@@ -897,7 +943,7 @@
# Trim startup period from analyzed dataframe
for pair, df in preprocessed.items():
preprocessed[pair] = trim_dataframe(df, timerange)
self.n_samples += len(preprocessed[pair])
self.n_candles += len(preprocessed[pair])
min_date, max_date = get_timerange(data)
logger.info(
@@ -944,7 +990,8 @@
# if self.multi: batch_len = batch_len // self.n_points
# don't go over the limit
if epochs_so_far + batch_len * n_points > epochs_limit():
batch_len = (epochs_limit() - epochs_so_far) // n_points
q, r = divmod(epochs_limit() - epochs_so_far, n_points)
batch_len = q + r
print(
f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
f"/{epochs_limit()}: ",
@@ -952,9 +999,9 @@
f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, self.n_jobs)
saved = self.log_results(f_val, epochs_so_far, epochs_limit())
# stop if no epochs have been evaluated
if len(f_val) < 1:
logger.warning("All epochs evaluated were void, "
"check the loss function and the search space.")
if len(f_val) < batch_len:
logger.warning("Some evaluated epochs were void, "
"check the loss function and the search space.")
if (not saved and len(f_val) > 1) or batch_len < 1:
break
# log_results add

View File

@@ -7,7 +7,7 @@ manager: SyncManager
# stores the optimizers in multi opt mode
optimizers: Queue
# stores a list of the results to share between optimizers
# each result is a tuple of the params_dict and a decreasing counter
# in the form of dict[tuple(Xi)] = yi
results_board: Queue
# store the results in single opt mode
results: Queue
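A minimal sketch (assumed helper names) of how a board shaped as dict[tuple(Xi)] = yi lets a worker skip points another optimizer already evaluated; it mirrors the lookup and publish steps in the hyperopt diff above, using a single-slot manager Queue as the shared store:

from multiprocessing import Manager

# on platforms that spawn processes, create the Manager under a __main__ guard
manager = Manager()
results_board = manager.Queue(maxsize=1)
results_board.put({})  # dict[tuple(Xi)] = yi

def lookup_past_points(asked, board):
    # peek at the shared dict and return known losses (None when unknown)
    results = board.get()
    board.put(results)
    return {point: results.get(point) for point in asked}

def publish_results(evaluated, board):
    # merge freshly computed {tuple(Xi): yi} pairs into the shared dict
    results = board.get()
    results.update(evaluated)
    board.put(results)

# worker-side usage: only backtest the points with no known loss
known = lookup_past_points([(0.5, 3), (0.1, 7)], results_board)
todo = [point for point, loss in known.items() if loss is None]
publish_results({point: 0.42 for point in todo}, results_board)  # losses would come from backtests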