comments and small fixes

orehunt 2020-03-04 17:00:56 +01:00
parent d6c66a54fd
commit 3f8ee76b24


@@ -10,7 +10,7 @@ import random
 import sys
 import warnings
 from collections import OrderedDict, deque
-from math import factorial, log, inf
+from math import factorial, log
 from numpy import iinfo, int32
 from operator import itemgetter
 from pathlib import Path
@@ -55,7 +55,7 @@ logger = logging.getLogger(__name__)
 NEXT_POINT_METHODS = ["cl_min", "cl_mean", "cl_max"]
 NEXT_POINT_METHODS_LENGTH = 3
 
-MAX_LOSS = 10000  # just a big enough number to be bad result in loss optimization
+MAX_LOSS = iinfo(int32).max  # just a big enough number to be bad result in loss optimization
 
 
 class Hyperopt:
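
The sentinel swap above replaces the old fixed ceiling with the largest 32-bit integer, so the starting "best" loss is finite but still worse than anything a backtest can return. A minimal sketch of how the constant behaves as an initial best value (the first_loss figure is hypothetical):

    from numpy import iinfo, int32

    MAX_LOSS = iinfo(int32).max    # 2147483647, finite but worse than any real loss
    current_best_loss = MAX_LOSS   # any loss returned by a backtest beats this
    first_loss = -1.37             # hypothetical loss of the first evaluated epoch
    if first_loss < current_best_loss:
        current_best_loss = first_loss   # becomes the new best on epoch 1
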
@ -83,6 +83,7 @@ class Hyperopt:
'hyperopt_optimizers.pickle') 'hyperopt_optimizers.pickle')
self.tickerdata_pickle = (self.config['user_data_dir'] / 'hyperopt_results' / self.tickerdata_pickle = (self.config['user_data_dir'] / 'hyperopt_results' /
'hyperopt_tickerdata.pkl') 'hyperopt_tickerdata.pkl')
self.n_jobs = self.config.get('hyperopt_jobs', -1) self.n_jobs = self.config.get('hyperopt_jobs', -1)
if self.n_jobs < 0: if self.n_jobs < 0:
self.n_jobs = cpu_count() // 2 or 1 self.n_jobs = cpu_count() // 2 or 1
@@ -98,7 +99,7 @@ class Hyperopt:
         # total number of candles being backtested
         self.n_samples = 0
-        self.current_best_loss = inf
+        self.current_best_loss = MAX_LOSS
         self.current_best_epoch = 0
         self.epochs_since_last_best: List = []
         self.avg_best_occurrence = 0
@@ -135,10 +136,12 @@ class Hyperopt:
             # in single opt assume runs are expensive so default to 1 point per ask
             self.n_points = self.config.get('points_per_opt', default_n_points)
+            # if 0 n_points are given, don't use any base estimator (akin to random search)
             if self.n_points < 1:
                 self.n_points = 1
                 self.opt_base_estimator = 'DUMMY'
                 self.opt_acq_optimizer = 'sampling'
+        # models are only needed for posterior eval
         self.n_models = max(16, self.n_jobs)
 
         # Populate functions here (hasattr is slow so should not be run during "regular" operations)
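
For context, a 'DUMMY' base estimator together with 'sampling' acquisition makes a scikit-optimize Optimizer skip the surrogate model entirely, which is why the zero points_per_opt branch above is described as random search. A minimal sketch with a hypothetical two-dimensional space:

    from skopt import Optimizer
    from skopt.space import Real

    # hypothetical search space; 'DUMMY' + 'sampling' means no surrogate model is fit
    space = [Real(0.01, 0.10, name='roi'), Real(-0.35, -0.02, name='stoploss')]
    opt = Optimizer(space, base_estimator='DUMMY', acq_optimizer='sampling')
    points = opt.ask(n_points=4)   # purely random draws from the space
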
@@ -496,10 +499,10 @@ class Hyperopt:
             position_stacking=self.position_stacking,
         )
         return self._get_results_dict(backtesting_results, min_date, max_date, params_dict,
-                                      params_details, raw_params)
+                                      params_details)
 
     def _get_results_dict(self, backtesting_results, min_date, max_date, params_dict,
-                          params_details, raw_params):
+                          params_details):
         results_metrics = self._calculate_results_metrics(backtesting_results)
         results_explanation = self._format_results_explanation_string(results_metrics)
@@ -569,6 +572,7 @@ class Hyperopt:
 
     def run_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
                               jobs: int) -> List:
+        """ launch parallel in single opt mode, return the evaluated epochs """
         result = parallel(
             delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, backend.results, i)
             for asked, i in zip(self.opt_ask_and_tell(jobs, tries),
@@ -577,14 +581,21 @@ class Hyperopt:
 
     def run_multi_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
                                     jobs: int) -> List:
+        """ launch parallel in multi opt mode, return the evaluated epochs """
         results = parallel(
             delayed(wrap_non_picklable_objects(self.parallel_opt_objective))(
                 i, backend.optimizers, jobs, backend.results_board)
             for i in range(first_try, first_try + tries))
+        # each worker will return a list containing n_points, so compact into a single list
        return functools.reduce(lambda x, y: [*x, *y], results)
 
     def opt_ask_and_tell(self, jobs: int, tries: int):
-        """ loop to manager optimizer state in single optimizer mode """
+        """
+        loop to manage optimizer state in single optimizer mode; every time a job is
+        dispatched, check the optimizer for points to ask and to tell, if any,
+        but only fit a new model every n_points, because fitting at every result
+        would invalidate the previous points.
+        """
         vals = []
         to_ask: deque = deque()
         evald: List[List] = []
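
The batching idea in that docstring can be illustrated outside the class: ask the optimizer for n_points at a time, hand them out one by one, and refit the surrogate only once the batch is exhausted. A minimal sketch with a toy objective (names and space are illustrative, not the actual Hyperopt code):

    from skopt import Optimizer
    from skopt.space import Real

    def objective(x):
        # toy stand-in for a backtest loss
        return (x[0] - 0.3) ** 2

    opt = Optimizer([Real(0.0, 1.0)], base_estimator='GP')
    n_points = 4
    to_ask: list = []
    for epoch in range(20):
        if not to_ask:
            # one batch per model fit; asking again later triggers the next fit
            to_ask = opt.ask(n_points=n_points)
        x = to_ask.pop(0)
        # refit the surrogate only when the whole batch has been told
        opt.tell(x, objective(x), fit=len(to_ask) == 0)
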
@@ -605,12 +616,16 @@ class Hyperopt:
                 to_ask.extend(self.opt.ask(n_points=self.n_points))
                 fit = True
             a = to_ask.popleft()
-            if a in evald:
+            while a in evald and len(to_ask) > 0:
                 logger.info('this point was evaluated before...')
+                a = to_ask.popleft()
             evald.append(a)
             yield a
 
     def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
+        """
+        objective run in multi opt mode, optimizers share the results as soon as they are completed
+        """
         self.log_results_immediate(n)
         # fetch an optimizer instance
         opt = optimizers.get()
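
The sharing that docstring mentions goes through the results_board queue: a worker takes the board, tells its optimizer about the observations other workers already published, then adds its own. A rough sketch of that pattern, with hypothetical names rather than the actual implementation:

    def share_results(opt, own_results, results_board):
        # take the board (a queue holding one shared list of (Xi, yi) pairs)
        board = results_board.get()
        for Xi, yi in board:
            # absorb other workers' observations without refitting every time
            opt.tell(Xi, yi, fit=False)
        # publish this worker's finished evaluations for the other optimizers
        board.extend(own_results)
        results_board.put(board)
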
@@ -655,15 +670,12 @@ class Hyperopt:
         return f_val
 
     def parallel_objective(self, asked, results: Queue, n=0):
+        """ objective run in single opt mode, run the backtest, store the results into a queue """
         self.log_results_immediate(n)
         v = self.backtest_params(asked)
         results.put(v)
         return v
 
-    def parallel_callback(self, f_val):
-        """ Executed after each epoch evaluation to collect the results """
-        self.f_val.extend(f_val)
-
     def log_results_immediate(self, n) -> None:
         """ Signals that a new job has been scheduled """
         print('.', end='')
@@ -750,6 +762,7 @@ class Hyperopt:
         if search_space_size < n_jobs:
             # don't waste if the space is small
             n_initial_points = n_jobs
+            min_epochs = n_jobs
         elif total_epochs > 0:
             n_initial_points = total_epochs // 3 if total_epochs > n_jobs * 3 else n_jobs
             min_epochs = n_initial_points
@@ -783,33 +796,34 @@ class Hyperopt:
     def setup_optimizers(self):
         """ Setup the optimizers objects, try to load from disk, or create new ones """
         # try to load previous optimizers
-        self.opts = self.load_previous_optimizers(self.opts_file)
+        opts = self.load_previous_optimizers(self.opts_file)
 
         if self.multi:
-            if len(self.opts) == self.n_jobs:
+            if len(opts) > 0:
                 # put the restored optimizers in the queue and clear them from the object
-                for opt in self.opts:
+                for opt in opts:
                     backend.optimizers.put(opt)
-            else:  # or generate new optimizers
+            # generate as many optimizers as are still needed to fill the job count
+            remaining = self.n_jobs - backend.optimizers.qsize()
+            if remaining > 0:
                 opt = self.get_optimizer(self.dimensions, self.n_jobs, self.n_initial_points)
-                # reduce random points by the number of optimizers
-                self.n_initial_points = self.n_initial_points // self.n_jobs
-                for _ in range(self.n_jobs):  # generate optimizers
+                for _ in range(remaining):  # generate optimizers
                     # random state is preserved
                     backend.optimizers.put(
                         opt.copy(random_state=opt.rng.randint(0,
                                                               iinfo(int32).max)))
                 del opt
         else:
             # if we have more than 1 optimizer but are using single opt,
             # pick one and discard the rest...
-            if len(self.opts) > 0:
-                self.opt = self.opts[-1]
-                del self.opts
+            if len(opts) > 0:
+                self.opt = opts[-1]
             else:
                 self.opt = self.get_optimizer(self.dimensions, self.n_jobs, self.n_initial_points)
+        del opts[:]
 
     def start(self) -> None:
+        """ Broom Broom """
         self.random_state = self._set_random_state(self.config.get('hyperopt_random_state', None))
         logger.info(f"Using optimizer random state: {self.random_state}")
@@ -840,11 +854,14 @@ class Hyperopt:
         self.dimensions: List[Dimension] = self.hyperopt_space()
         self.n_initial_points, self.min_epochs, self.search_space_size = self.calc_epochs(
             self.dimensions, self.n_jobs, self.effort, self.total_epochs)
+        # reduce random points by the number of optimizers in multi mode
+        if self.multi:
+            self.n_initial_points = self.n_initial_points // self.n_jobs
         logger.info(f"Min epochs set to: {self.min_epochs}")
+        # if total epochs are not set, max_epoch takes its place
         if self.total_epochs < 1:
             self.max_epoch = int(self.min_epochs + len(self.trials))
-        else:
-            self.max_epoch = self.n_initial_points
+        # initialize average best occurrence
         self.avg_best_occurrence = self.min_epochs // self.n_jobs
 
         logger.info(f'Initial points: {self.n_initial_points}')
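
A quick worked example of the split introduced above, with hypothetical numbers: with 96 initial random points and 8 jobs in multi mode, each optimizer starts from 12 random points before its model takes over.

    n_initial_points, n_jobs, multi = 96, 8, True     # hypothetical values
    if multi:
        n_initial_points = n_initial_points // n_jobs  # 96 // 8 == 12 per optimizer
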
@@ -896,7 +913,7 @@ class Hyperopt:
         if self.trials:
             sorted_trials = sorted(self.trials, key=itemgetter('loss'))
             results = sorted_trials[0]
-            self.print_epoch_details(results, self.max_epoch, self.print_json)
+            self.print_epoch_details(results, self.epochs_limit(), self.print_json)
         else:
             # This is printed when Ctrl+C is pressed quickly, before first epochs have
             # a chance to be evaluated.