issue resolved: opt.tell() is now called in the parent process

This commit is contained in:
hroff-1902 2019-04-16 08:24:38 +03:00
parent a8399533e2
commit f22caa78ed
2 changed files with 100 additions and 33 deletions

View File

@ -3,6 +3,7 @@
""" """
This module contains the hyperopt logic This module contains the hyperopt logic
""" """
import functools
import logging import logging
import multiprocessing import multiprocessing
import os import os
@ -16,6 +17,8 @@ from pprint import pprint
from typing import Any, Dict, List from typing import Any, Dict, List
from joblib import Parallel, delayed, dump, load, wrap_non_picklable_objects from joblib import Parallel, delayed, dump, load, wrap_non_picklable_objects
from joblib._parallel_backends import LokyBackend
from joblib import register_parallel_backend, parallel_backend
from pandas import DataFrame from pandas import DataFrame
from skopt import Optimizer from skopt import Optimizer
from skopt.space import Dimension from skopt.space import Dimension
@ -37,6 +40,9 @@ TICKERDATA_PICKLE = os.path.join('user_data', 'hyperopt_tickerdata.pkl')
EVALS_FRAME = 100 EVALS_FRAME = 100
_hyperopt = None
class Hyperopt(Backtesting): class Hyperopt(Backtesting):
""" """
Hyperopt class, this class contains all the logic to run a hyperopt simulation Hyperopt class, this class contains all the logic to run a hyperopt simulation
@ -69,6 +75,9 @@ class Hyperopt(Backtesting):
self.trials_file = os.path.join('user_data', 'hyperopt_results.pickle') self.trials_file = os.path.join('user_data', 'hyperopt_results.pickle')
self.trials: List = [] self.trials: List = []
self.opt = None
self.f_val: List = []
def get_args(self, params): def get_args(self, params):
dimensions = self.hyperopt_space() dimensions = self.hyperopt_space()
# Ensure the number of dimensions match # Ensure the number of dimensions match
@ -114,7 +123,7 @@ class Hyperopt(Backtesting):
logger.info('ROI table:') logger.info('ROI table:')
pprint(self.custom_hyperopt.generate_roi_table(best_result['params']), indent=4) pprint(self.custom_hyperopt.generate_roi_table(best_result['params']), indent=4)
def log_results_immediate(self, n) -> None:
    """Print a progress dot for evaluation *n* and flush it out immediately."""
    sys.stdout.write('.')
    sys.stdout.flush()
@ -169,7 +178,21 @@ class Hyperopt(Backtesting):
return spaces return spaces
def generate_optimizer(self, _params: Dict) -> Dict: def generate_optimizer(self, _params: Dict) -> Dict:
"""
Process params (a point in the hyper options space generated by the
optimizer.ask() optimizer method) and run backtest() for it.
This method is executed in one of the joblib worker processes.
The optimizer.tell() function is executed in the joblib parent process
in order to tell results to single instance of the optimizer rather than
its copies serialized in the joblib worker processes.
It receives result of this method as a part of the joblib batch completion
callback function.
"""
params = self.get_args(_params) params = self.get_args(_params)
if self.has_space('roi'): if self.has_space('roi'):
self.strategy.minimal_roi = self.custom_hyperopt.generate_roi_table(params) self.strategy.minimal_roi = self.custom_hyperopt.generate_roi_table(params)
@ -203,19 +226,14 @@ class Hyperopt(Backtesting):
trade_count = len(results.index) trade_count = len(results.index)
trade_duration = results.trade_duration.mean() trade_duration = results.trade_duration.mean()
if trade_count == 0: loss = MAX_LOSS if trade_count == 0 else \
return { self.calculate_loss(total_profit, trade_count, trade_duration)
'loss': MAX_LOSS,
'params': params,
'result': result_explanation,
}
loss = self.calculate_loss(total_profit, trade_count, trade_duration)
return { return {
'loss': loss, 'loss': loss,
'params': params, 'params': params,
'result': result_explanation, 'result': result_explanation,
'asked': _params,
} }
def format_results(self, results: DataFrame) -> str: def format_results(self, results: DataFrame) -> str:
@ -239,20 +257,35 @@ class Hyperopt(Backtesting):
base_estimator="ET", base_estimator="ET",
acq_optimizer="auto", acq_optimizer="auto",
n_initial_points=30, n_initial_points=30,
acq_optimizer_kwargs={'n_jobs': cpu_count} acq_optimizer_kwargs={'n_jobs': cpu_count},
random_state=777,
) )
def run_optimizer_parallel(self, parallel, tries: int, first_try: int) -> List:
    """
    Dispatch `tries` objective evaluations to the joblib workers.

    Points to evaluate are pulled lazily from opt_generator(), which also
    feeds completed results back to the optimizer in the parent process.
    """
    jobs = (
        delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, i)
        for asked, i in zip(self.opt_generator(),
                            range(first_try, first_try + tries))
    )
    return parallel(jobs)
def opt_generator(self):
    """
    Generator yielding points for the workers to evaluate.

    Runs in the joblib parent process, so `self.opt` is the single
    authoritative optimizer instance. Before asking for a new point,
    any results accumulated by parallel_callback() are told back to
    the optimizer.
    """
    while True:
        if self.f_val:
            # BUGFIX: the previous code built
            # `functools.partial(self.opt.tell, (...))` without ever
            # calling it, so tell() never ran (and the partial packed
            # both lists into a single tuple argument anyway).
            # Call tell(points, losses) directly instead.
            self.opt.tell(
                [v['asked'] for v in self.f_val],
                [v['loss'] for v in self.f_val],
            )
            self.f_val = []
        yield self.opt.ask()
def parallel_objective(self, asked, n):
    """Worker-side objective: log progress for try *n*, then backtest *asked*."""
    self.log_results_immediate(n)
    result = self.generate_optimizer(asked)
    return result
def parallel_callback(self, f_val):
    """Collect results of a completed joblib batch (runs in the parent process)."""
    self.f_val += f_val
def load_previous_results(self): def load_previous_results(self):
""" read trials file if we have one """ """ read trials file if we have one """
@ -283,25 +316,27 @@ class Hyperopt(Backtesting):
cpus = multiprocessing.cpu_count() cpus = multiprocessing.cpu_count()
logger.info(f'Found {cpus} CPU cores. Let\'s make them scream!') logger.info(f'Found {cpus} CPU cores. Let\'s make them scream!')
opt = self.get_optimizer(cpus) self.opt = self.get_optimizer(cpus)
frames = self.total_tries // EVALS_FRAME frames = self.total_tries // EVALS_FRAME
last_frame_len = self.total_tries % EVALS_FRAME last_frame_len = self.total_tries % EVALS_FRAME
try: try:
with Parallel(n_jobs=cpus, verbose=0) as parallel: register_parallel_backend('custom', CustomImmediateResultBackend)
for frame in range(frames + 1): with parallel_backend('custom'):
frame_start = frame * EVALS_FRAME with Parallel(n_jobs=cpus, verbose=0) as parallel:
frame_len = last_frame_len if frame == frames else EVALS_FRAME for frame in range(frames + 1):
print(f"\n{frame_start+1}-{frame_start+frame_len}" frame_start = frame * EVALS_FRAME
f"/{self.total_tries}: ", end='') frame_len = last_frame_len if frame == frames else EVALS_FRAME
f_val = self.run_optimizer_parallel( print(f"\n{frame_start+1}-{frame_start+frame_len}"
parallel, opt, f"/{self.total_tries}: ", end='')
f_val = self.run_optimizer_parallel(
parallel,
frame_len, frame_len,
frame_start frame_start
) )
self.trials += f_val self.trials += f_val
self.log_results(f_val, frame_start, self.total_tries) self.log_results(f_val, frame_start, self.total_tries)
except KeyboardInterrupt: except KeyboardInterrupt:
print('User interrupted..') print('User interrupted..')
@ -337,5 +372,32 @@ def start(args: Namespace) -> None:
"to understand how to configure hyperopt.") "to understand how to configure hyperopt.")
raise ValueError("--strategy configured but not supported for hyperopt") raise ValueError("--strategy configured but not supported for hyperopt")
# Initialize backtesting object # Initialize backtesting object
hyperopt = Hyperopt(config) global _hyperopt
hyperopt.start() _hyperopt = Hyperopt(config)
_hyperopt.start()
class MultiCallback:
    """
    Callable that fans a single completion result out to several callbacks.

    Falsy entries (e.g. None) are dropped at construction time so the
    call loop never has to guard against them.
    """

    def __init__(self, *callbacks):
        self.callbacks = list(filter(None, callbacks))

    def __call__(self, out):
        for callback in self.callbacks:
            callback(out)
class CustomImmediateResultBackend(LokyBackend):
    # Joblib execution backend whose batch-completion callback hands each
    # finished batch to the Hyperopt instance in the PARENT process, so a
    # single optimizer sees all results (workers only hold copies).
    # NOTE(review): relies on the module-level `_hyperopt` global being set
    # by start() before any batch completes — confirm ordering.

    def callback(self, result):
        """
        Our custom completion callback. Executed in the parent process.
        Use it to run Optimizer.tell() with immediate results of the backtest()
        evaluated in the joblib worker process.
        """
        # Fetch results from the Future object passed to us.
        # Future object is assumed to be 'done' already.
        f_val = result.result().copy()
        _hyperopt.parallel_callback(f_val)

    def apply_async(self, func, callback=None):
        # Chain joblib's own completion callback with ours so both fire
        # when the batch future resolves.
        cbs = MultiCallback(callback, self.callback)
        return super().apply_async(func, cbs)

View File

@ -358,7 +358,12 @@ def test_generate_optimizer(mocker, default_conf) -> None:
'loss': 1.9840569076926293, 'loss': 1.9840569076926293,
'result': ' 1 trades. Avg profit 2.31%. Total profit 0.00023300 BTC ' 'result': ' 1 trades. Avg profit 2.31%. Total profit 0.00023300 BTC '
'(0.0231Σ%). Avg duration 100.0 mins.', '(0.0231Σ%). Avg duration 100.0 mins.',
'params': optimizer_param 'params': optimizer_param,
'asked': [
0, 35, 0, 0, False, True, False, False, 'macd_cross_signal', 0, 75, 0, 0,
False, True, False, False, 'macd_cross_signal', 60.0, 30.0, 20.0,
0.01, 0.01, 0.1, -0.4
]
} }
hyperopt = Hyperopt(default_conf) hyperopt = Hyperopt(default_conf)