- refactoring
- fixes to prevent stalling
parent 9e0b07b2fd
commit 0ccbaa8c96
@@ -247,7 +247,7 @@ class Hyperopt:
         self.num_trials_saved = num_trials
         self.save_opts()
         if final:
-            logger.info(f"\n{num_trials} {plural(num_trials, 'epoch')} "
+            logger.info(f"{num_trials} {plural(num_trials, 'epoch')} "
                         f"saved to '{self.trials_file}'.")

     def save_opts(self) -> None:
@@ -659,6 +659,7 @@ class Hyperopt:
         to_ask: deque = deque()
         evald: Set[Tuple] = set()
         opt = self.opt

         def point():
             if self.ask_points:
                 if to_ask:
@@ -683,23 +684,67 @@ class Hyperopt:
                     del vals[:], void_filtered[:]

             a = point()
-            while a in evald:
+            if a in evald:
                 logger.debug("this point was evaluated before...")
                 if not fit:
                     opt.update_next()
                 a = point()
+                if a in evald:
+                    break
             evald.add(a)
             yield a

     @staticmethod
-    def opt_get_past_points(asked: dict, results_board: Queue) -> dict:
+    def opt_get_past_points(asked: dict, results_board: Queue) -> Tuple[dict, int]:
         """ fetch shared results between optimizers """
         results = results_board.get()
         results_board.put(results)
         for a in asked:
             if a in results:
                 asked[a] = results[a]
-        return asked
+        return asked, len(results)
+
+    @staticmethod
+    def opt_state(shared: bool, optimizers: Queue) -> Tuple[Optimizer, int]:
+        """ fetch an optimizer in multi opt mode """
+        # get an optimizer instance
+        opt = optimizers.get()
+        # this is the counter used by the optimizer internally to track the initial
+        # points evaluated so far..
+        initial_points = opt._n_initial_points
+        if shared:
+            # get a random number before putting it back to avoid
+            # replication with other workers and keep reproducibility
+            rand = opt.rng.randint(0, VOID_LOSS)
+            optimizers.put(opt)
+            # switch the seed to get a different point
+            opt.rng.seed(rand)
+            opt, opt.void_loss = opt.copy(random_state=opt.rng), opt.void_loss
+        # a model is only fit after initial points
+        elif initial_points < 1:
+            opt.tell(opt.Xi, opt.yi)
+        # we have to get a new point anyway
+        else:
+            opt.update_next()
+        return opt, initial_points
+
+    @staticmethod
+    def opt_results(void: bool, void_filtered: list,
+                    initial_points: int, results_board: Queue) -> list:
+        # update the board used to skip already computed points
+        # NOTE: some results at the beginning won't be published
+        # because they are removed by the filter_void_losses
+        if not void:
+            results = results_board.get()
+            for v in void_filtered:
+                a = tuple(v['params_dict'].values())
+                if a not in results:
+                    results[a] = v['loss']
+            results_board.put(results)
+        # set initial point flag
+        for n, v in enumerate(void_filtered):
+            v['is_initial_point'] = initial_points - n > 0
+        return void_filtered

     def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
         """
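The `while` → `if` change above is the heart of the anti-stalling fix: when every candidate the optimizer proposes has already been evaluated, the old unbounded loop could spin forever. The new guard retries once, then bails out. A minimal, self-contained sketch of that pattern (dedup_stream and its budget argument are illustrative, not the module's API):

    def dedup_stream(point, evald, budget=100):
        """Yield unseen points from a point() supplier without stalling."""
        for _ in range(budget):
            a = point()
            if a in evald:
                # one retry instead of the old unbounded `while` loop
                a = point()
                if a in evald:
                    break  # still a duplicate: stop instead of spinning
            evald.add(a)
            yield a

    points = iter([(1,), (2,), (3,)])
    print(list(dedup_stream(lambda: next(points), {(1,)}, budget=2)))  # [(2,), (3,)]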
@@ -707,47 +752,39 @@ class Hyperopt:
         """
         self.log_results_immediate(n)
         is_shared = self.shared
-        # get an optimizer instance
-        opt = optimizers.get()
-        # this is the counter used by the optimizer internally to track the initial
-        # points evaluated so far..
-        initial_points = opt._n_initial_points
+        id = optimizers.qsize()
+        opt, initial_points = self.opt_state(is_shared, optimizers)
+        sss = self.search_space_size
+        asked = {None: None}
+        asked_d = {}

-        if is_shared:
-            # get a random number before putting it back to avoid
-            # replication with other workers and keep reproducibility
-            rand = opt.rng.randint(0, VOID_LOSS)
-            optimizers.put(opt)
-            # switch the seed to get a different point
-            opt.rng.seed(rand)
-            opt, opt.void_loss = opt.copy(random_state=opt.rng), opt.void_loss
-        # we have to get a new point if the last batch was all void
-        elif opt.void:
-            opt.update_next()
-        # a model is only fit after initial points
-        elif initial_points < 1:
-            opt.tell(opt.Xi, opt.yi)
+        told = 0  # told
         Xi_d = []  # done
         yi_d = []
         Xi_t = []  # to do
-        # ask for points according to config
-        while True:
+        while asked != asked_d and len(opt.Xi) < sss:
+            asked_d = asked
             asked = opt.ask(n_points=self.ask_points, strategy=self.lie_strat())
             if not self.ask_points:
                 asked = {tuple(asked): None}
             else:
                 asked = {tuple(a): None for a in asked}
             # check if some points have been evaluated by other optimizers
-            p_asked = self.opt_get_past_points(asked, results_board)
+            p_asked, _ = self.opt_get_past_points(asked, results_board)
             for a in p_asked:
                 if p_asked[a] is not None:
-                    Xi_d.append(a)
-                    yi_d.append(p_asked[a])
+                    if a not in Xi_d:
+                        Xi_d.append(a)
+                        yi_d.append(p_asked[a])
                 else:
                     Xi_t.append(a)
             if len(Xi_t) < self.n_points:
-                opt.update_next()
+                len_Xi_d = len(Xi_d)
+                if len_Xi_d > told:  # tell new points
+                    opt.tell(Xi_d[told:], yi_d[told:])
+                    told = len_Xi_d
+                else:
+                    opt.update_next()
             else:
                 break
         # run the backtest for each point to do (Xi_t)
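The rewritten ask loop drops `while True:` for two explicit exit conditions: stop when an ask round returns exactly the batch it returned last time (a fixed point, so the optimizer has nothing new to offer), or when the evaluated points already cover the search space. A sketch of the same convergence check (ask_batch, n_wanted and space_size are stand-ins for opt.ask, self.n_points and self.search_space_size):

    def collect_new_points(ask_batch, n_wanted, space_size, seen):
        """Gather unseen candidate points, with the two stall guards."""
        todo = []
        asked = {None: None}  # sentinel: never equal to a real batch
        asked_d = {}
        while asked != asked_d and len(seen) < space_size:
            asked_d = asked
            asked = {tuple(p): None for p in ask_batch()}
            for p in asked:
                if p not in seen:
                    seen.add(p)
                    todo.append(p)
            if len(todo) >= n_wanted:
                break
        return todo

    batches = iter([[(1,), (2,)], [(2,), (3,)]])
    print(collect_new_points(lambda: next(batches), 3, 10, set()))  # [(1,), (2,), (3,)]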
@@ -773,20 +810,8 @@ class Hyperopt:
         # don't pickle models
         del opt.models[:]
         optimizers.put(opt)
-        # update the board used to skip already computed points
-        # NOTE: some results at the beginning won't be published
-        # because they are removed by the filter_void_losses
-        if not void:
-            results = results_board.get()
-            for v in void_filtered:
-                a = tuple(v['params_dict'].values())
-                if a not in results:
-                    results[a] = v['loss']
-            results_board.put(results)
-        # set initial point flag
-        for n, v in enumerate(void_filtered):
-            v['is_initial_point'] = initial_points - n > 0
-        return void_filtered
+        return self.opt_results(void, void_filtered, initial_points, results_board)

     def parallel_objective(self, asked, results: Queue = None, n=0):
         """ objective run in single opt mode, run the backtest, store the results into a queue """
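The deleted block now lives in opt_results (added above), which keeps the read-modify-write discipline on the shared results board in one place: the board is a queue holding a single dict, so whoever takes it must put it back or every other worker blocks. A minimal sketch of that pattern, using queue.Queue for a deterministic single-process demo where the real code shares a queue between workers:

    from queue import Queue

    def publish_results(board: Queue, new_items: dict) -> None:
        """Merge (params_tuple -> loss) pairs into the shared board dict."""
        results = board.get()  # take the one shared dict...
        for params, loss in new_items.items():
            results.setdefault(params, loss)  # first write wins, like `if a not in results`
        board.put(results)     # ...and always put it back

    board = Queue()
    board.put({})              # seed the board with the single shared dict
    publish_results(board, {(1, 2): 0.5})
    print(board.get())         # {(1, 2): 0.5}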
@@ -892,9 +917,12 @@ class Hyperopt:
             n_parameters += len(d.bounds)
         # guess the size of the search space as the count of the
         # unordered combination of the dimensions entries
-        search_space_size = int(
-            (factorial(n_parameters) /
-             (factorial(n_parameters - n_dimensions) * factorial(n_dimensions))))
+        try:
+            search_space_size = int(
+                (factorial(n_parameters) /
+                 (factorial(n_parameters - n_dimensions) * factorial(n_dimensions))))
+        except OverflowError:
+            search_space_size = VOID_LOSS
         # logger.info(f'Search space size: {search_space_size}')
         log_opt = int(log(opt_points, 2)) if opt_points > 4 else 2
         if search_space_size < opt_points:
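The new try/except matters because factorial grows so fast that the true division of two big integers raises OverflowError once the quotient no longer fits in a float, which previously crashed the epoch estimate. A sketch of the guarded computation (the VOID_LOSS value here is an assumed stand-in for the module's sentinel; on Python 3.8+, math.comb(n_parameters, n_dimensions) computes the same count exactly, without floats):

    from math import factorial

    VOID_LOSS = 2 ** 63 - 1  # assumption: stands in for the module's sentinel

    def estimate_space_size(n_parameters: int, n_dimensions: int) -> int:
        """Count unordered combinations, falling back when floats overflow."""
        try:
            return int(factorial(n_parameters) /
                       (factorial(n_parameters - n_dimensions) * factorial(n_dimensions)))
        except OverflowError:
            # int / int true division fails once the quotient exceeds float range
            return VOID_LOSS

    print(estimate_space_size(10, 3))       # 120
    print(estimate_space_size(2000, 1000))  # quotient ~1e600: falls back to the sentinel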
@@ -913,7 +941,7 @@ class Hyperopt:
             n_initial_points = min(log_sss, search_space_size // 3)
         # it shall run for this much, I say
         min_epochs = int(max(n_initial_points, opt_points) * (1 + effort) + n_initial_points)
-        return n_initial_points, min_epochs, search_space_size
+        return n_initial_points or 1, min_epochs, search_space_size

     def update_max_epoch(self, val: Dict, current: int):
         """ calculate max epochs: store the number of non best epochs
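`n_initial_points or 1` closes a quieter stall path: for a tiny search space, `search_space_size // 3` can floor the result to zero, and an optimizer configured with zero initial random points never produces a first candidate. Since 0 is falsy, the `or 1` idiom promotes it to the minimum viable value, as this small check illustrates:

    def safe_initial_points(log_sss: int, search_space_size: int) -> int:
        """Never hand the optimizer zero initial points."""
        n_initial_points = min(log_sss, search_space_size // 3)
        return n_initial_points or 1  # 0 is falsy, so degenerate spaces yield 1

    assert safe_initial_points(5, 2) == 1    # 2 // 3 == 0, promoted to 1
    assert safe_initial_points(5, 30) == 5   # unaffected when nonzero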
@@ -987,11 +1015,65 @@ class Hyperopt:
         # initialize average best occurrence
         self.avg_best_occurrence = self.min_epochs // self.n_jobs

+    def main_loop(self, jobs_scheduler):
+        """ main parallel loop """
+        try:
+            if self.multi:
+                jobs_scheduler = self.run_multi_backtest_parallel
+            else:
+                jobs_scheduler = self.run_backtest_parallel
+            with parallel_backend('loky', inner_max_num_threads=2):
+                with Parallel(n_jobs=self.n_jobs, verbose=0, backend='loky') as parallel:
+                    jobs = parallel._effective_n_jobs()
+                    logger.info(f'Effective number of parallel workers used: {jobs}')
+                    # update epochs count
+                    n_points = self.n_points
+                    prev_batch = -1
+                    epochs_so_far = len(self.trials)
+                    epochs_limit = self.epochs_limit
+                    columns, _ = os.get_terminal_size()
+                    columns -= 1
+                    while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
+                        prev_batch = epochs_so_far
+                        occurrence = int(self.avg_best_occurrence * (1 + self.effort))
+                        # pad the batch length to the number of jobs to avoid desaturation
+                        batch_len = (occurrence + jobs -
+                                     occurrence % jobs)
+                        # when using multiple optimizers each worker performs
+                        # n_points (epochs) in 1 dispatch but this reduces the batch len too much
+                        # if self.multi: batch_len = batch_len // self.n_points
+                        # don't go over the limit
+                        if epochs_so_far + batch_len * n_points >= epochs_limit():
+                            q, r = divmod(epochs_limit() - epochs_so_far, n_points)
+                            batch_len = q + r
+                        print(
+                            f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
+                            f"/{epochs_limit()}: ",
+                            end='')
+                        f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
+                        print(end='\r')
+                        saved = self.log_results(f_val, epochs_so_far, epochs_limit())
+                        print('\r', ' ' * columns, end='\r')
+                        # stop if no epochs have been evaluated
+                        if len(f_val) < batch_len:
+                            logger.warning("Some evaluated epochs were void, "
+                                           "check the loss function and the search space.")
+                        if (not saved and len(f_val) > 1) or batch_len < 1 or \
+                                (not saved and self.search_space_size < batch_len + epochs_limit()):
+                            break
+                        # log_results add
+                        epochs_so_far += saved
+                        if self.max_epoch_reached:
+                            logger.info("Max epoch reached, terminating.")
+                            break
+        except KeyboardInterrupt:
+            print('User interrupted..')

     def start(self) -> None:
         """ Broom Broom """
         self.random_state = self._set_random_state(self.config.get('hyperopt_random_state', None))
         logger.info(f"Using optimizer random state: {self.random_state}")
+        self.hyperopt_table_header = -1
         data, timerange = self.backtesting.load_bt_data()

         preprocessed = self.backtesting.strategy.tickerdata_to_dataframe(data)
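Inside main_loop the batch length is first rounded up to a multiple of the worker count so no job sits idle ("desaturation"), then clamped so the run never overshoots the epoch limit. The same arithmetic in isolation, with epochs_limit taken as a plain int where the method calls a function:

    def next_batch_len(occurrence: int, jobs: int, epochs_so_far: int,
                       epochs_limit: int, n_points: int) -> int:
        """Pad the batch to the job count, then clamp it to the epoch budget."""
        # round up to a multiple of `jobs` so every worker gets a dispatch
        batch_len = occurrence + jobs - occurrence % jobs
        # each dispatched job evaluates n_points epochs; don't go over the limit
        if epochs_so_far + batch_len * n_points >= epochs_limit:
            q, r = divmod(epochs_limit - epochs_so_far, n_points)
            batch_len = q + r
        return batch_len

    # an occurrence of 10 padded for 4 jobs gives 12; with 90 of 100 epochs
    # done and 2 points per dispatch, the clamp brings it down to 5.
    print(next_batch_len(10, 4, 90, 100, 2))  # 5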
@@ -1024,57 +1106,13 @@ class Hyperopt:
         colorama_init(autoreset=True)

         self.setup_optimizers()
-        try:
-            if self.multi:
-                jobs_scheduler = self.run_multi_backtest_parallel
-            else:
-                jobs_scheduler = self.run_backtest_parallel
-            with parallel_backend('loky', inner_max_num_threads=2):
-                with Parallel(n_jobs=self.n_jobs, verbose=0, backend='loky') as parallel:
-                    jobs = parallel._effective_n_jobs()
-                    logger.info(f'Effective number of parallel workers used: {jobs}')
-                    # update epochs count
-                    n_points = self.n_points
-                    prev_batch = -1
-                    epochs_so_far = len(self.trials)
-                    epochs_limit = self.epochs_limit
-                    columns, _ = os.get_terminal_size()
-                    columns -= 1
-                    while epochs_so_far > prev_batch or epochs_so_far < self.min_epochs:
-                        prev_batch = epochs_so_far
-                        occurrence = int(self.avg_best_occurrence * (1 + self.effort))
-                        # pad the batch length to the number of jobs to avoid desaturation
-                        batch_len = (occurrence + jobs -
-                                     occurrence % jobs)
-                        # when using multiple optimizers each worker performs
-                        # n_points (epochs) in 1 dispatch but this reduces the batch len too much
-                        # if self.multi: batch_len = batch_len // self.n_points
-                        # don't go over the limit
-                        if epochs_so_far + batch_len * n_points >= epochs_limit():
-                            q, r = divmod(epochs_limit() - epochs_so_far, n_points)
-                            batch_len = q + r
-                        print(
-                            f"{epochs_so_far+1}-{epochs_so_far+batch_len*n_points}"
-                            f"/{epochs_limit()}: ",
-                            end='')
-                        f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, jobs)
-                        print(end='\r')
-                        saved = self.log_results(f_val, epochs_so_far, epochs_limit())
-                        print('\r', ' ' * columns, end='\r')
-                        # stop if no epochs have been evaluated
-                        if len(f_val) < batch_len:
-                            logger.warning("Some evaluated epochs were void, "
-                                           "check the loss function and the search space.")
-                        if (not saved and len(f_val) > 1) or batch_len < 1:
-                            break
-                        # log_results add
-                        epochs_so_far += saved
-                        if self.max_epoch_reached:
-                            logger.info("Max epoch reached, terminating.")
-                            break
-
-        except KeyboardInterrupt:
-            print('User interrupted..')
+        if self.multi:
+            jobs_scheduler = self.run_multi_backtest_parallel
+        else:
+            jobs_scheduler = self.run_backtest_parallel
+
+        self.main_loop(jobs_scheduler)

         self.save_trials(final=True)