From 0a49dcb7124b3e4ba96bbbea1faeff27c6801994 Mon Sep 17 00:00:00 2001
From: orehunt
Date: Fri, 21 Feb 2020 10:40:26 +0100
Subject: [PATCH] - batched hyperopt - auto epochs

---
 freqtrade/commands/optimize_commands.py |   5 +-
 freqtrade/optimize/hyperopt.py          | 295 +++++++++++++++---
 freqtrade/optimize/hyperopt_backend.py  |  30 +++
 setup.cfg                               |   4 +
 4 files changed, 229 insertions(+), 105 deletions(-)
 create mode 100644 freqtrade/optimize/hyperopt_backend.py

diff --git a/freqtrade/commands/optimize_commands.py b/freqtrade/commands/optimize_commands.py
index a2d1b4601..e1db5ec9f 100644
--- a/freqtrade/commands/optimize_commands.py
+++ b/freqtrade/commands/optimize_commands.py
@@ -54,6 +54,7 @@ def start_hyperopt(args: Dict[str, Any]) -> None:
     try:
         from filelock import FileLock, Timeout
         from freqtrade.optimize.hyperopt import Hyperopt
+        from freqtrade.optimize import hyperopt_backend as backend
     except ImportError as e:
         raise OperationalException(
             f"{e}. Please ensure that the hyperopt dependencies are installed.") from e
@@ -72,8 +73,8 @@ def start_hyperopt(args: Dict[str, Any]) -> None:
             logging.getLogger('filelock').setLevel(logging.WARNING)
 
             # Initialize backtesting object
-            hyperopt = Hyperopt(config)
-            hyperopt.start()
+            backend.hyperopt = Hyperopt(config)
+            backend.hyperopt.start()
 
     except Timeout:
         logger.info("Another running instance of freqtrade Hyperopt detected.")
diff --git a/freqtrade/optimize/hyperopt.py b/freqtrade/optimize/hyperopt.py
index 0f9076770..14a77cdf5 100644
--- a/freqtrade/optimize/hyperopt.py
+++ b/freqtrade/optimize/hyperopt.py
@@ -1,15 +1,17 @@
 # pragma pylint: disable=too-many-instance-attributes, pointless-string-statement
-
 """
 This module contains the hyperopt logic
 """
 
+import os
+import functools
 import locale
 import logging
 import random
 import sys
 import warnings
 from collections import OrderedDict
+from math import factorial, log
 from operator import itemgetter
 from pathlib import Path
 from pprint import pprint
@@ -18,9 +20,6 @@ from typing import Any, Dict, List, Optional
 import rapidjson
 from colorama import Fore, Style
 from colorama import init as colorama_init
-from joblib import (Parallel, cpu_count, delayed, dump, load,
-                    wrap_non_picklable_objects)
-from pandas import DataFrame
 
 from freqtrade.data.converter import trim_dataframe
 from freqtrade.data.history import get_timerange
@@ -28,10 +27,14 @@ from freqtrade.exceptions import OperationalException
 from freqtrade.misc import plural, round_dict
 from freqtrade.optimize.backtesting import Backtesting
 # Import IHyperOpt and IHyperOptLoss to allow unpickling classes from these modules
+from freqtrade.optimize.hyperopt_backend import CustomImmediateResultBackend
 from freqtrade.optimize.hyperopt_interface import IHyperOpt  # noqa: F401
 from freqtrade.optimize.hyperopt_loss_interface import IHyperOptLoss  # noqa: F401
-from freqtrade.resolvers.hyperopt_resolver import (HyperOptLossResolver,
-                                                   HyperOptResolver)
+from freqtrade.resolvers.hyperopt_resolver import (HyperOptLossResolver, HyperOptResolver)
+from joblib import (Parallel, cpu_count, delayed, dump, load, wrap_non_picklable_objects)
+from joblib._parallel_backends import LokyBackend
+from joblib import register_parallel_backend, parallel_backend
+from pandas import DataFrame
 
 # Suppress scikit-learn FutureWarnings from skopt
 with warnings.catch_warnings():
@@ -39,10 +42,8 @@ with warnings.catch_warnings():
     from skopt import Optimizer
     from skopt.space import Dimension
 
-
 logger = logging.getLogger(__name__)
 
-
 INITIAL_POINTS = 30
 
 # Keep no more than 2*SKOPT_MODELS_MAX_NUM models
@@ -60,7 +61,6 @@ class Hyperopt:
     hyperopt = Hyperopt(config)
     hyperopt.start()
     """
-
     def __init__(self, config: Dict[str, Any]) -> None:
         self.config = config
 
@@ -71,13 +71,21 @@ class Hyperopt:
         self.custom_hyperoptloss = HyperOptLossResolver.load_hyperoptloss(self.config)
         self.calculate_loss = self.custom_hyperoptloss.hyperopt_loss_function
-        self.trials_file = (self.config['user_data_dir'] /
-                            'hyperopt_results' / 'hyperopt_results.pickle')
-        self.tickerdata_pickle = (self.config['user_data_dir'] /
-                                  'hyperopt_results' / 'hyperopt_tickerdata.pkl')
-        self.total_epochs = config.get('epochs', 0)
+        self.trials_file = (self.config['user_data_dir'] / 'hyperopt_results' /
+                            'hyperopt_results.pickle')
+        self.tickerdata_pickle = (self.config['user_data_dir'] / 'hyperopt_results' /
+                                  'hyperopt_tickerdata.pkl')
+        self.effort = config.get('epochs', 0) or 1
+        self.total_epochs = 9999
+        self.max_epoch = 9999
+        self.search_space_size = 0
+        self.max_epoch_reached = False
+        self.min_epochs = INITIAL_POINTS
 
         self.current_best_loss = 100
+        self.current_best_epoch = 0
+        self.epochs_since_last_best = []
+        self.avg_best_occurrence = 0
 
         if not self.config.get('hyperopt_continue'):
             self.clean_hyperopt()
@@ -89,6 +97,10 @@ class Hyperopt:
         # Previous evaluations
         self.trials: List = []
 
+        self.opt: Optimizer
+        self.opt = None
+        self.f_val: List = []
+
         # Populate functions here (hasattr is slow so should not be run during "regular" operations)
         if hasattr(self.custom_hyperopt, 'populate_indicators'):
             self.backtesting.strategy.advise_indicators = \
@@ -175,24 +187,27 @@ class Hyperopt:
         result: Dict = {}
 
         if self.has_space('buy'):
-            result['buy'] = {p.name: params.get(p.name)
-                             for p in self.hyperopt_space('buy')}
+            result['buy'] = {p.name: params.get(p.name) for p in self.hyperopt_space('buy')}
         if self.has_space('sell'):
-            result['sell'] = {p.name: params.get(p.name)
-                              for p in self.hyperopt_space('sell')}
+            result['sell'] = {p.name: params.get(p.name) for p in self.hyperopt_space('sell')}
         if self.has_space('roi'):
             result['roi'] = self.custom_hyperopt.generate_roi_table(params)
         if self.has_space('stoploss'):
-            result['stoploss'] = {p.name: params.get(p.name)
-                                  for p in self.hyperopt_space('stoploss')}
+            result['stoploss'] = {
+                p.name: params.get(p.name)
+                for p in self.hyperopt_space('stoploss')
+            }
         if self.has_space('trailing'):
             result['trailing'] = self.custom_hyperopt.generate_trailing_params(params)
 
         return result
 
     @staticmethod
-    def print_epoch_details(results, total_epochs: int, print_json: bool,
-                            no_header: bool = False, header_str: str = None) -> None:
+    def print_epoch_details(results,
+                            total_epochs: int,
+                            print_json: bool,
+                            no_header: bool = False,
+                            header_str: str = None) -> None:
         """
         Display details of the hyperopt result
         """
@@ -231,8 +246,7 @@ class Hyperopt:
                 # OrderedDict is used to keep the numeric order of the items
                 # in the dict.
                 result_dict['minimal_roi'] = OrderedDict(
-                    (str(k), v) for k, v in space_params.items()
-                )
+                    (str(k), v) for k, v in space_params.items())
             else:  # 'stoploss', 'trailing'
                 result_dict.update(space_params)
 
@@ -261,16 +275,7 @@ class Hyperopt:
         Log results if it is better than any previous evaluation
         """
         is_best = results['is_best']
-        if not self.print_all:
-            # Print '\n' after each 100th epoch to separate dots from the log messages.
-            # Otherwise output is messy on a terminal.
-            print('.', end='' if results['current_epoch'] % 100 != 0 else None)  # type: ignore
-            sys.stdout.flush()
+
         if self.print_all or is_best:
-            if not self.print_all:
-                # Separate the results explanation string from dots
-                print("\n")
             self.print_results_explanation(results, self.total_epochs, self.print_all,
                                            self.print_colorized)
@@ -291,10 +296,9 @@
     @staticmethod
     def _format_explanation_string(results, total_epochs) -> str:
-        return (("*" if results['is_initial_point'] else " ") +
+        return (("*" if 'is_initial_point' in results and results['is_initial_point'] else " ") +
                 f"{results['current_epoch']:5d}/{total_epochs}: " +
-                f"{results['results_explanation']} " +
-                f"Objective: {results['loss']:.5f}")
+                f"{results['results_explanation']} " + f"Objective: {results['loss']:.5f}")
 
     def has_space(self, space: str) -> bool:
         """
@@ -381,11 +385,11 @@
             max_open_trades=self.max_open_trades,
             position_stacking=self.position_stacking,
         )
-        return self._get_results_dict(backtesting_results, min_date, max_date,
-                                      params_dict, params_details)
+        return self._get_results_dict(backtesting_results, min_date, max_date, params_dict,
+                                      params_details)
 
-    def _get_results_dict(self, backtesting_results, min_date, max_date,
-                          params_dict, params_details):
+    def _get_results_dict(self, backtesting_results, min_date, max_date, params_dict,
+                          params_details):
         results_metrics = self._calculate_results_metrics(backtesting_results)
         results_explanation = self._format_results_explanation_string(results_metrics)
 
@@ -398,8 +402,10 @@
         # path. We do not want to optimize 'hodl' strategies.
         loss: float = MAX_LOSS
         if trade_count >= self.config['hyperopt_min_trades']:
-            loss = self.calculate_loss(results=backtesting_results, trade_count=trade_count,
-                                       min_date=min_date.datetime, max_date=max_date.datetime)
+            loss = self.calculate_loss(results=backtesting_results,
+                                       trade_count=trade_count,
+                                       min_date=min_date.datetime,
+                                       max_date=max_date.datetime)
         return {
             'loss': loss,
             'params_dict': params_dict,
@@ -427,39 +433,75 @@
                 f"Avg profit {results_metrics['avg_profit']: 6.2f}%. "
                 f"Total profit {results_metrics['total_profit']: 11.8f} {stake_cur} "
                 f"({results_metrics['profit']: 7.2f}\N{GREEK CAPITAL LETTER SIGMA}%). "
-                f"Avg duration {results_metrics['duration']:5.1f} min."
-                ).encode(locale.getpreferredencoding(), 'replace').decode('utf-8')
+                f"Avg duration {results_metrics['duration']:5.1f} min.").encode(
+                    locale.getpreferredencoding(), 'replace').decode('utf-8')
 
-    def get_optimizer(self, dimensions: List[Dimension], cpu_count) -> Optimizer:
+    def get_optimizer(self, dimensions: List[Dimension], cpu_count,
+                      n_initial_points=INITIAL_POINTS) -> Optimizer:
         return Optimizer(
             dimensions,
             base_estimator="ET",
             acq_optimizer="auto",
-            n_initial_points=INITIAL_POINTS,
+            n_initial_points=n_initial_points,
             acq_optimizer_kwargs={'n_jobs': cpu_count},
+            model_queue_size=SKOPT_MODELS_MAX_NUM,
             random_state=self.random_state,
         )
 
-    def fix_optimizer_models_list(self) -> None:
-        """
-        WORKAROUND: Since skopt is not actively supported, this resolves problems with skopt
-        memory usage, see also: https://github.com/scikit-optimize/scikit-optimize/pull/746
+    def run_optimizer_parallel(self, parallel, tries: int, first_try: int) -> List:
+        result = parallel(
+            delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, i)
+            for asked, i in zip(self.opt_generator(), range(first_try, first_try + tries)))
+        return result
 
-        This may cease working when skopt updates if implementation of this intrinsic
-        part changes.
-        """
-        n = len(self.opt.models) - SKOPT_MODELS_MAX_NUM
-        # Keep no more than 2*SKOPT_MODELS_MAX_NUM models in the skopt models list,
-        # remove the old ones. These are actually of no use, the current model
-        # from the estimator is the only one used in the skopt optimizer.
-        # Freqtrade code also does not inspect details of the models.
-        if n >= SKOPT_MODELS_MAX_NUM:
-            logger.debug(f"Fixing skopt models list, removing {n} old items...")
-            del self.opt.models[0:n]
+    def opt_generator(self):
+        while True:
+            if self.f_val:
+                # print("opt.tell(): ",
+                #       [v['params_dict'] for v in self.f_val], [v['loss'] for v in self.f_val])
+                functools.partial(self.opt.tell,
+                                  ([v['params_dict']
+                                    for v in self.f_val], [v['loss'] for v in self.f_val]))
+                self.f_val = []
+            yield self.opt.ask()
 
-    def run_optimizer_parallel(self, parallel, asked, i) -> List:
-        return parallel(delayed(
-                        wrap_non_picklable_objects(self.generate_optimizer))(v, i) for v in asked)
+    def parallel_objective(self, asked, n):
+        self.log_results_immediate(n)
+        return self.generate_optimizer(asked)
+
+    def parallel_callback(self, f_val):
+        self.f_val.extend(f_val)
+
+    def log_results_immediate(self, n) -> None:
+        print('.', end='')
+        sys.stdout.flush()
+
+    def log_results(self, f_val, frame_start, max_epoch) -> None:
+        """
+        Log results if it is better than any previous evaluation
+        """
+        for i, v in enumerate(f_val):
+            is_best = self.is_best_loss(v, self.current_best_loss)
+            current = frame_start + i + 1
+            v['is_best'] = is_best
+            v['current_epoch'] = current
+            v['is_initial_point'] = current <= self.n_initial_points
+            logger.debug(f"Optimizer epoch evaluated: {v}")
+            if is_best:
+                self.current_best_loss = v['loss']
+                self.update_max_epoch(v, current)
+            self.print_results(v)
+            self.trials.append(v)
+        # Save results after every batch
+        print('\n')
+        self.save_trials()
+        # give up if no best since max epochs
+        if current > self.max_epoch:
+            self.max_epoch_reached = True
+        # testing trapdoor
+        if os.getenv('FQT_HYPEROPT_TRAP'):
+            logger.debug('bypassing hyperopt loop')
+            self.max_epoch = 1
 
     @staticmethod
     def load_previous_results(trials_file: Path) -> List:
@@ -479,6 +521,55 @@
     def _set_random_state(self, random_state: Optional[int]) -> int:
         return random_state or random.randint(1, 2**16 - 1)
 
+
+    @staticmethod
+    def calc_epochs(dimensions: List[Dimension], config_jobs: int, effort: int):
+        """ Compute a reasonable number of initial points and
+        a minimum number of epochs to evaluate """
+        n_dimensions = len(dimensions)
+        n_parameters = 0
+        # sum all the dimensions discretely, granting minimum values
+        for d in dimensions:
+            if type(d).__name__ == 'Integer':
+                n_parameters += max(1, d.high - d.low)
+            elif type(d).__name__ == 'Real':
+                n_parameters += max(10, int(d.high - d.low))
+            else:
+                n_parameters += len(d.bounds)
+        # guess the size of the search space as the count of the
+        # unordered combination of the dimensions entries
+        search_space_size = (factorial(n_parameters) /
+                             (factorial(n_parameters - n_dimensions) * factorial(n_dimensions)))
+        # logger.info(f'Search space size: {search_space_size}')
+        if search_space_size < config_jobs:
+            # don't waste if the space is small
+            n_initial_points = config_jobs
+        else:
+            # extract coefficients from the search space and the jobs count
+            log_sss = int(log(search_space_size, 10))
+            log_jobs = int(log(config_jobs, 2))
+            log_jobs = 2 if log_jobs < 0 else log_jobs
+            jobs_ip = log_jobs * log_sss
+            # never waste
+            n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip
+        # it shall run for this much, I say
+        min_epochs = max(2 * n_initial_points, 3 * config_jobs) * effort
+        return n_initial_points, min_epochs, search_space_size
+
+    def update_max_epoch(self, val: Dict, current: int):
+        """ calculate max epochs: store the number of non-best epochs
+        between each best, and get the mean of that value """
+        if val['is_initial_point'] is not True:
+            self.epochs_since_last_best.append(current - self.current_best_epoch)
+            self.avg_best_occurrence = (sum(self.epochs_since_last_best) //
+                                        len(self.epochs_since_last_best))
+        self.current_best_epoch = current
+        self.max_epoch = (self.current_best_epoch + self.avg_best_occurrence +
+                          self.min_epochs) * self.effort
+        if self.max_epoch > self.search_space_size:
+            self.max_epoch = self.search_space_size
+        print('\n')
+        logger.info(f'Max epochs set to: {self.max_epoch}')
+
     def start(self) -> None:
         self.random_state = self._set_random_state(self.config.get('hyperopt_random_state', None))
         logger.info(f"Using optimizer random state: {self.random_state}")
@@ -492,10 +583,8 @@
             preprocessed[pair] = trim_dataframe(df, timerange)
         min_date, max_date = get_timerange(data)
 
-        logger.info(
-            'Hyperopting with data from %s up to %s (%s days)..',
-            min_date.isoformat(), max_date.isoformat(), (max_date - min_date).days
-        )
+        logger.info('Hyperopting with data from %s up to %s (%s days)..', min_date.isoformat(),
+                    max_date.isoformat(), (max_date - min_date).days)
         dump(preprocessed, self.tickerdata_pickle)
 
         # We don't need exchange instance anymore while running hyperopt
@@ -509,46 +598,41 @@
         logger.info(f'Number of parallel jobs set as: {config_jobs}')
 
         self.dimensions: List[Dimension] = self.hyperopt_space()
-        self.opt = self.get_optimizer(self.dimensions, config_jobs)
+        self.n_initial_points, self.min_epochs, self.search_space_size = self.calc_epochs(
+            self.dimensions, config_jobs, self.effort)
+        logger.info(f"Min epochs set to: {self.min_epochs}")
+        self.max_epoch = self.min_epochs
+        self.avg_best_occurrence = self.max_epoch
+
+        logger.info(f'Initial points: {self.n_initial_points}')
+        self.opt = self.get_optimizer(self.dimensions, config_jobs, self.n_initial_points)
+
+        # last_frame_len = (self.total_epochs - 1) % self.avg_best_occurrence
 
         if self.print_colorized:
             colorama_init(autoreset=True)
 
-        try:
-            with Parallel(n_jobs=config_jobs) as parallel:
-                jobs = parallel._effective_n_jobs()
-                logger.info(f'Effective number of parallel workers used: {jobs}')
-                EVALS = max(self.total_epochs // jobs, 1)
-                for i in range(EVALS):
-                    asked = self.opt.ask(n_points=jobs)
-                    f_val = self.run_optimizer_parallel(parallel, asked, i)
-                    self.opt.tell(asked, [v['loss'] for v in f_val])
-                    self.fix_optimizer_models_list()
-                    for j in range(jobs):
-                        # Use human-friendly indexes here (starting from 1)
-                        current = i * jobs + j + 1
-                        val = f_val[j]
-                        val['current_epoch'] = current
-                        val['is_initial_point'] = current <= INITIAL_POINTS
-                        logger.debug(f"Optimizer epoch evaluated: {val}")
+        try:
+            register_parallel_backend('custom', CustomImmediateResultBackend)
+            with parallel_backend('custom'):
+                with Parallel(n_jobs=config_jobs, verbose=0) as parallel:
+                    for frame in range(self.total_epochs):
+                        epochs_so_far = len(self.trials)
+                        # pad the frame length to the number of jobs to avoid desaturation
+                        frame_len = (self.avg_best_occurrence + config_jobs -
+                                     self.avg_best_occurrence % config_jobs)
+                        print(
+                            f"{epochs_so_far+1}-{epochs_so_far+self.avg_best_occurrence}"
+                            f"/{self.total_epochs}: ",
+                            end='')
+                        f_val = self.run_optimizer_parallel(parallel, frame_len, epochs_so_far)
+                        self.log_results(f_val, epochs_so_far, self.total_epochs)
+                        if self.max_epoch_reached:
+                            logger.info("Max epoch reached, terminating.")
+                            break
 
-                        is_best = self.is_best_loss(val, self.current_best_loss)
-                        # This value is assigned here and not in the optimization method
-                        # to keep proper order in the list of results. That's because
-                        # evaluations can take different time. Here they are aligned in the
-                        # order they will be shown to the user.
-                        val['is_best'] = is_best
-
-                        self.print_results(val)
-
-                        if is_best:
-                            self.current_best_loss = val['loss']
-                        self.trials.append(val)
-                        # Save results after each best epoch and every 100 epochs
-                        if is_best or current % 100 == 0:
-                            self.save_trials()
-        except KeyboardInterrupt:
-            print('User interrupted..')
+        except KeyboardInterrupt:
+            print("User interrupted..")
 
         self.save_trials(final=True)
@@ -560,3 +644,8 @@
             # This is printed when Ctrl+C is pressed quickly, before first epochs have
             # a chance to be evaluated.
             print("No epochs evaluated yet, no best result.")
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state['trials']
+        return state
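The batching above relies on joblib's pluggable parallel backends: the `CustomImmediateResultBackend` added in the next file wraps joblib's own completion callback so the parent process sees every finished batch right away and forwards it to `Hyperopt.parallel_callback()`. The following is a minimal, standalone sketch of that pattern outside freqtrade, assuming a joblib version that ships the loky backend; the names `ImmediateResultBackend`, `collected` and `square` are illustrative only, and `_parallel_backends` is a private joblib module whose callback signature may vary between releases.

    from joblib import Parallel, delayed, parallel_backend, register_parallel_backend
    from joblib._parallel_backends import LokyBackend

    collected = []  # filled by the parent process as each batch of tasks completes


    class MultiCallback:
        """Chain joblib's internal completion callback with a custom one."""

        def __init__(self, *callbacks):
            self.callbacks = [cb for cb in callbacks if cb]

        def __call__(self, out):
            for cb in self.callbacks:
                cb(out)


    class ImmediateResultBackend(LokyBackend):
        """Loky backend that reports every finished batch to the parent right away."""

        def callback(self, future):
            # Runs in the parent process once a worker finishes a batch of tasks.
            if not future.exception():
                collected.extend(future.result())

        def apply_async(self, func, callback=None):
            # Keep joblib's own callback working while adding ours on top.
            return super().apply_async(func, MultiCallback(callback, self.callback))


    def square(x):
        return x * x


    if __name__ == '__main__':
        register_parallel_backend('immediate', ImmediateResultBackend)
        with parallel_backend('immediate'):
            results = Parallel(n_jobs=2)(delayed(square)(i) for i in range(8))
        print(results)    # ordered results, as returned by Parallel
        print(collected)  # same values, gathered incrementally by the callback
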
diff --git a/freqtrade/optimize/hyperopt_backend.py b/freqtrade/optimize/hyperopt_backend.py
new file mode 100644
index 000000000..d7a8544cc
--- /dev/null
+++ b/freqtrade/optimize/hyperopt_backend.py
@@ -0,0 +1,30 @@
+from joblib._parallel_backends import LokyBackend
+
+hyperopt = None
+
+
+class MultiCallback:
+    def __init__(self, *callbacks):
+        self.callbacks = [cb for cb in callbacks if cb]
+
+    def __call__(self, out):
+        for cb in self.callbacks:
+            cb(out)
+
+
+class CustomImmediateResultBackend(LokyBackend):
+    def callback(self, result):
+        """
+        Our custom completion callback. Executed in the parent process.
+        Use it to run Optimizer.tell() with immediate results of the backtest()
+        evaluated in the joblib worker process.
+        """
+        if not result.exception():
+            # Fetch results from the Future object passed to us.
+            # Future object is assumed to be 'done' already.
+            f_val = result.result().copy()
+            hyperopt.parallel_callback(f_val)
+
+    def apply_async(self, func, callback=None):
+        cbs = MultiCallback(callback, self.callback)
+        return super().apply_async(func, cbs)
diff --git a/setup.cfg b/setup.cfg
index 34f25482b..9853c99d9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -13,3 +13,7 @@ ignore_missing_imports = True
 
 [mypy-tests.*]
 ignore_errors = True
+
+[yapf]
+based_on_style = pep8
+column_limit = 100
\ No newline at end of file
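As a quick sanity check on the auto-epoch heuristics introduced in `calc_epochs()`, the arithmetic can be traced by hand. The sketch below reproduces it with made-up dimension sizes; the values 10, 10 and 4, the job count and the effort are illustrative only, not taken from a real strategy.

    from math import factorial, log

    # Three hypothetical dimensions: an Integer spanning 10 values, a Real
    # (counted as at least 10) and a Categorical with 4 choices.
    n_dimensions = 3
    n_parameters = 10 + 10 + 4
    config_jobs, effort = 4, 1

    # Search space estimated as unordered combinations: C(24, 3) = 2024
    search_space_size = (factorial(n_parameters) /
                         (factorial(n_parameters - n_dimensions) * factorial(n_dimensions)))

    log_sss = int(log(search_space_size, 10))                                # 3
    log_jobs = int(log(config_jobs, 2))                                      # 2
    log_jobs = 2 if log_jobs < 0 else log_jobs
    jobs_ip = log_jobs * log_sss                                             # 6
    n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip   # 6
    min_epochs = max(2 * n_initial_points, 3 * config_jobs) * effort         # 12

    print(search_space_size, n_initial_points, min_epochs)  # 2024.0 6 12

With these inputs a run would start from 6 random points and be expected to last at least 12 epochs; `update_max_epoch()` then raises the ceiling as new bests keep appearing, capped at the estimated search space size.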