From f22caa78ed01903da1512a892d69081f9d87d445 Mon Sep 17 00:00:00 2001 From: hroff-1902 Date: Tue, 16 Apr 2019 08:24:38 +0300 Subject: [PATCH] issue resolved: opt.tell() is now called in the parent process --- freqtrade/optimize/hyperopt.py | 126 ++++++++++++++++------ freqtrade/tests/optimize/test_hyperopt.py | 7 +- 2 files changed, 100 insertions(+), 33 deletions(-) diff --git a/freqtrade/optimize/hyperopt.py b/freqtrade/optimize/hyperopt.py index ff25e6e84..a65b87b19 100644 --- a/freqtrade/optimize/hyperopt.py +++ b/freqtrade/optimize/hyperopt.py @@ -3,6 +3,7 @@ """ This module contains the hyperopt logic """ +import functools import logging import multiprocessing import os @@ -16,6 +17,8 @@ from pprint import pprint from typing import Any, Dict, List from joblib import Parallel, delayed, dump, load, wrap_non_picklable_objects +from joblib._parallel_backends import LokyBackend +from joblib import register_parallel_backend, parallel_backend from pandas import DataFrame from skopt import Optimizer from skopt.space import Dimension @@ -37,6 +40,9 @@ TICKERDATA_PICKLE = os.path.join('user_data', 'hyperopt_tickerdata.pkl') EVALS_FRAME = 100 +_hyperopt = None + + class Hyperopt(Backtesting): """ Hyperopt class, this class contains all the logic to run a hyperopt simulation @@ -69,6 +75,9 @@ class Hyperopt(Backtesting): self.trials_file = os.path.join('user_data', 'hyperopt_results.pickle') self.trials: List = [] + self.opt = None + self.f_val: List = [] + def get_args(self, params): dimensions = self.hyperopt_space() # Ensure the number of dimensions match @@ -114,7 +123,7 @@ class Hyperopt(Backtesting): logger.info('ROI table:') pprint(self.custom_hyperopt.generate_roi_table(best_result['params']), indent=4) - def log_results_immediate(self) -> None: + def log_results_immediate(self, n) -> None: print('.', end='') sys.stdout.flush() @@ -169,7 +178,21 @@ class Hyperopt(Backtesting): return spaces def generate_optimizer(self, _params: Dict) -> Dict: + """ + Process 
params (a point in the hyper options space generated by the + optimizer.ask() optimizer method) and run backtest() for it. + + This method is executed in one of the joblib worker processes. + + The optimizer.tell() function is executed in the joblib parent process + in order to tell results to single instance of the optimizer rather than + its copies serialized in the joblib worker processes. + + It receives result of this method as a part of the joblib batch completion + callback function. + """ params = self.get_args(_params) + if self.has_space('roi'): self.strategy.minimal_roi = self.custom_hyperopt.generate_roi_table(params) @@ -203,19 +226,14 @@ class Hyperopt(Backtesting): trade_count = len(results.index) trade_duration = results.trade_duration.mean() - if trade_count == 0: - return { - 'loss': MAX_LOSS, - 'params': params, - 'result': result_explanation, - } - - loss = self.calculate_loss(total_profit, trade_count, trade_duration) + loss = MAX_LOSS if trade_count == 0 else \ + self.calculate_loss(total_profit, trade_count, trade_duration) return { 'loss': loss, 'params': params, 'result': result_explanation, + 'asked': _params, } def format_results(self, results: DataFrame) -> str: @@ -239,20 +257,35 @@ class Hyperopt(Backtesting): base_estimator="ET", acq_optimizer="auto", n_initial_points=30, - acq_optimizer_kwargs={'n_jobs': cpu_count} + acq_optimizer_kwargs={'n_jobs': cpu_count}, + random_state=777, ) - def run_optimizer_parallel(self, parallel, opt, tries: int, first_try: int) -> List: + def run_optimizer_parallel(self, parallel, tries: int, first_try: int) -> List: result = parallel(delayed( - wrap_non_picklable_objects(self.parallel_tell_and_log)) - (opt, i, opt.ask()) for i in range(first_try, first_try + tries)) + wrap_non_picklable_objects(self.parallel_objective)) + (asked, i) for asked, i in + zip(self.opt_generator(), range(first_try, first_try + tries))) return result - def parallel_tell_and_log(self, opt, i, asked): - f_val = 
self.generate_optimizer(asked)
-        opt.tell(asked, f_val['loss'])
-        self.log_results_immediate()
-        return f_val
+    def opt_generator(self):
+        while True:
+            if self.f_val:
+                # print("opt.tell(): ",
+                #       [v['asked'] for v in self.f_val], [v['loss'] for v in self.f_val])
+                self.opt.tell(
+                    [v['asked'] for v in self.f_val],
+                    [v['loss'] for v in self.f_val]
+                )
+                self.f_val = []
+            yield self.opt.ask()
+
+    def parallel_objective(self, asked, n):
+        self.log_results_immediate(n)
+        return self.generate_optimizer(asked)
+
+    def parallel_callback(self, f_val):
+        self.f_val.extend(f_val)
 
     def load_previous_results(self):
         """ read trials file if we have one """
@@ -283,25 +316,27 @@
         cpus = multiprocessing.cpu_count()
         logger.info(f'Found {cpus} CPU cores. Let\'s make them scream!')
 
-        opt = self.get_optimizer(cpus)
+        self.opt = self.get_optimizer(cpus)
 
         frames = self.total_tries // EVALS_FRAME
         last_frame_len = self.total_tries % EVALS_FRAME
 
         try:
-            with Parallel(n_jobs=cpus, verbose=0) as parallel:
-                for frame in range(frames + 1):
-                    frame_start = frame * EVALS_FRAME
-                    frame_len = last_frame_len if frame == frames else EVALS_FRAME
-                    print(f"\n{frame_start+1}-{frame_start+frame_len}"
-                          f"/{self.total_tries}: ", end='')
-                    f_val = self.run_optimizer_parallel(
-                        parallel, opt,
+            register_parallel_backend('custom', CustomImmediateResultBackend)
+            with parallel_backend('custom'):
+                with Parallel(n_jobs=cpus, verbose=0) as parallel:
+                    for frame in range(frames + 1):
+                        frame_start = frame * EVALS_FRAME
+                        frame_len = last_frame_len if frame == frames else EVALS_FRAME
+                        print(f"\n{frame_start+1}-{frame_start+frame_len}"
+                              f"/{self.total_tries}: ", end='')
+                        f_val = self.run_optimizer_parallel(
+                            parallel,
                             frame_len,
                             frame_start
-                    )
-                    self.trials += f_val
-                    self.log_results(f_val, frame_start, self.total_tries)
+                        )
+                        self.trials += f_val
+                        self.log_results(f_val, frame_start, self.total_tries)
         except KeyboardInterrupt:
             print('User interrupted..')
@@ -337,5 +372,32 @@ def start(args: Namespace) -> None: "to understand how to configure hyperopt.") raise ValueError("--strategy configured but not supported for hyperopt") # Initialize backtesting object - hyperopt = Hyperopt(config) - hyperopt.start() + global _hyperopt + _hyperopt = Hyperopt(config) + _hyperopt.start() + + +class MultiCallback: + def __init__(self, *callbacks): + self.callbacks = [cb for cb in callbacks if cb] + + def __call__(self, out): + for cb in self.callbacks: + cb(out) + + +class CustomImmediateResultBackend(LokyBackend): + def callback(self, result): + """ + Our custom completion callback. Executed in the parent process. + Use it to run Optimizer.tell() with immediate results of the backtest() + evaluated in the joblib worker process. + """ + # Fetch results from the Future object passed to us. + # Future object is assumed to be 'done' already. + f_val = result.result().copy() + _hyperopt.parallel_callback(f_val) + + def apply_async(self, func, callback=None): + cbs = MultiCallback(callback, self.callback) + return super().apply_async(func, cbs) diff --git a/freqtrade/tests/optimize/test_hyperopt.py b/freqtrade/tests/optimize/test_hyperopt.py index 986e62861..2fdecb2bb 100644 --- a/freqtrade/tests/optimize/test_hyperopt.py +++ b/freqtrade/tests/optimize/test_hyperopt.py @@ -358,7 +358,12 @@ def test_generate_optimizer(mocker, default_conf) -> None: 'loss': 1.9840569076926293, 'result': ' 1 trades. Avg profit 2.31%. Total profit 0.00023300 BTC ' '(0.0231Σ%). Avg duration 100.0 mins.', - 'params': optimizer_param + 'params': optimizer_param, + 'asked': [ + 0, 35, 0, 0, False, True, False, False, 'macd_cross_signal', 0, 75, 0, 0, + False, True, False, False, 'macd_cross_signal', 60.0, 30.0, 20.0, + 0.01, 0.01, 0.1, -0.4 + ] } hyperopt = Hyperopt(default_conf)