- batched hyperopt

- auto epochs
orehunt 2020-02-21 10:40:26 +01:00
parent 43add0b159
commit 0a49dcb712
4 changed files with 229 additions and 105 deletions

View File

@ -54,6 +54,7 @@ def start_hyperopt(args: Dict[str, Any]) -> None:
try:
from filelock import FileLock, Timeout
from freqtrade.optimize.hyperopt import Hyperopt
from freqtrade.optimize import hyperopt_backend as backend
except ImportError as e:
raise OperationalException(
f"{e}. Please ensure that the hyperopt dependencies are installed.") from e
@ -72,8 +73,8 @@ def start_hyperopt(args: Dict[str, Any]) -> None:
logging.getLogger('filelock').setLevel(logging.WARNING)
# Initialize backtesting object
hyperopt = Hyperopt(config)
hyperopt.start()
backend.hyperopt = Hyperopt(config)
backend.hyperopt.start()
except Timeout:
logger.info("Another running instance of freqtrade Hyperopt detected.")

View File

@ -1,15 +1,17 @@
# pragma pylint: disable=too-many-instance-attributes, pointless-string-statement
"""
This module contains the hyperopt logic
"""
import os
import functools
import locale
import logging
import random
import sys
import warnings
from collections import OrderedDict
from math import factorial, log
from operator import itemgetter
from pathlib import Path
from pprint import pprint
@ -18,9 +20,6 @@ from typing import Any, Dict, List, Optional
import rapidjson
from colorama import Fore, Style
from colorama import init as colorama_init
from joblib import (Parallel, cpu_count, delayed, dump, load,
wrap_non_picklable_objects)
from pandas import DataFrame
from freqtrade.data.converter import trim_dataframe
from freqtrade.data.history import get_timerange
@ -28,10 +27,14 @@ from freqtrade.exceptions import OperationalException
from freqtrade.misc import plural, round_dict
from freqtrade.optimize.backtesting import Backtesting
# Import IHyperOpt and IHyperOptLoss to allow unpickling classes from these modules
from freqtrade.optimize.hyperopt_backend import CustomImmediateResultBackend
from freqtrade.optimize.hyperopt_interface import IHyperOpt # noqa: F401
from freqtrade.optimize.hyperopt_loss_interface import IHyperOptLoss # noqa: F401
from freqtrade.resolvers.hyperopt_resolver import (HyperOptLossResolver,
HyperOptResolver)
from freqtrade.resolvers.hyperopt_resolver import (HyperOptLossResolver, HyperOptResolver)
from joblib import (Parallel, cpu_count, delayed, dump, load, wrap_non_picklable_objects)
from joblib._parallel_backends import LokyBackend
from joblib import register_parallel_backend, parallel_backend
from pandas import DataFrame
# Suppress scikit-learn FutureWarnings from skopt
with warnings.catch_warnings():
@ -39,10 +42,8 @@ with warnings.catch_warnings():
from skopt import Optimizer
from skopt.space import Dimension
logger = logging.getLogger(__name__)
INITIAL_POINTS = 30
# Keep no more than 2*SKOPT_MODELS_MAX_NUM models
@ -60,7 +61,6 @@ class Hyperopt:
hyperopt = Hyperopt(config)
hyperopt.start()
"""
def __init__(self, config: Dict[str, Any]) -> None:
self.config = config
@ -71,13 +71,21 @@ class Hyperopt:
self.custom_hyperoptloss = HyperOptLossResolver.load_hyperoptloss(self.config)
self.calculate_loss = self.custom_hyperoptloss.hyperopt_loss_function
self.trials_file = (self.config['user_data_dir'] /
'hyperopt_results' / 'hyperopt_results.pickle')
self.tickerdata_pickle = (self.config['user_data_dir'] /
'hyperopt_results' / 'hyperopt_tickerdata.pkl')
self.total_epochs = config.get('epochs', 0)
self.trials_file = (self.config['user_data_dir'] / 'hyperopt_results' /
'hyperopt_results.pickle')
self.tickerdata_pickle = (self.config['user_data_dir'] / 'hyperopt_results' /
'hyperopt_tickerdata.pkl')
self.effort = config.get('epochs', 0) or 1
self.total_epochs = 9999
self.max_epoch = 9999
self.search_space_size = 0
self.max_epoch_reached = False
self.min_epochs = INITIAL_POINTS
self.current_best_loss = 100
self.current_best_epoch = 0
self.epochs_since_last_best = []
self.avg_best_occurrence = 0
if not self.config.get('hyperopt_continue'):
self.clean_hyperopt()
@ -89,6 +97,10 @@ class Hyperopt:
# Previous evaluations
self.trials: List = []
self.opt: Optimizer
self.opt = None
self.f_val: List = []
# Populate functions here (hasattr is slow so should not be run during "regular" operations)
if hasattr(self.custom_hyperopt, 'populate_indicators'):
self.backtesting.strategy.advise_indicators = \
@ -175,24 +187,27 @@ class Hyperopt:
result: Dict = {}
if self.has_space('buy'):
result['buy'] = {p.name: params.get(p.name)
for p in self.hyperopt_space('buy')}
result['buy'] = {p.name: params.get(p.name) for p in self.hyperopt_space('buy')}
if self.has_space('sell'):
result['sell'] = {p.name: params.get(p.name)
for p in self.hyperopt_space('sell')}
result['sell'] = {p.name: params.get(p.name) for p in self.hyperopt_space('sell')}
if self.has_space('roi'):
result['roi'] = self.custom_hyperopt.generate_roi_table(params)
if self.has_space('stoploss'):
result['stoploss'] = {p.name: params.get(p.name)
for p in self.hyperopt_space('stoploss')}
result['stoploss'] = {
p.name: params.get(p.name)
for p in self.hyperopt_space('stoploss')
}
if self.has_space('trailing'):
result['trailing'] = self.custom_hyperopt.generate_trailing_params(params)
return result
@staticmethod
def print_epoch_details(results, total_epochs: int, print_json: bool,
no_header: bool = False, header_str: str = None) -> None:
def print_epoch_details(results,
total_epochs: int,
print_json: bool,
no_header: bool = False,
header_str: str = None) -> None:
"""
Display details of the hyperopt result
"""
@ -231,8 +246,7 @@ class Hyperopt:
# OrderedDict is used to keep the numeric order of the items
# in the dict.
result_dict['minimal_roi'] = OrderedDict(
(str(k), v) for k, v in space_params.items()
)
(str(k), v) for k, v in space_params.items())
else: # 'stoploss', 'trailing'
result_dict.update(space_params)
@ -261,16 +275,7 @@ class Hyperopt:
Log results if it is better than any previous evaluation
"""
is_best = results['is_best']
if not self.print_all:
# Print '\n' after each 100th epoch to separate dots from the log messages.
# Otherwise output is messy on a terminal.
print('.', end='' if results['current_epoch'] % 100 != 0 else None) # type: ignore
sys.stdout.flush()
if self.print_all or is_best:
if not self.print_all:
# Separate the results explanation string from dots
print("\n")
self.print_results_explanation(results, self.total_epochs, self.print_all,
self.print_colorized)
@ -291,10 +296,9 @@ class Hyperopt:
@staticmethod
def _format_explanation_string(results, total_epochs) -> str:
return (("*" if results['is_initial_point'] else " ") +
return (("*" if 'is_initial_point' in results and results['is_initial_point'] else " ") +
f"{results['current_epoch']:5d}/{total_epochs}: " +
f"{results['results_explanation']} " +
f"Objective: {results['loss']:.5f}")
f"{results['results_explanation']} " + f"Objective: {results['loss']:.5f}")
def has_space(self, space: str) -> bool:
"""
@ -381,11 +385,11 @@ class Hyperopt:
max_open_trades=self.max_open_trades,
position_stacking=self.position_stacking,
)
return self._get_results_dict(backtesting_results, min_date, max_date,
params_dict, params_details)
return self._get_results_dict(backtesting_results, min_date, max_date, params_dict,
params_details)
def _get_results_dict(self, backtesting_results, min_date, max_date,
params_dict, params_details):
def _get_results_dict(self, backtesting_results, min_date, max_date, params_dict,
params_details):
results_metrics = self._calculate_results_metrics(backtesting_results)
results_explanation = self._format_results_explanation_string(results_metrics)
@ -398,8 +402,10 @@ class Hyperopt:
# path. We do not want to optimize 'hodl' strategies.
loss: float = MAX_LOSS
if trade_count >= self.config['hyperopt_min_trades']:
loss = self.calculate_loss(results=backtesting_results, trade_count=trade_count,
min_date=min_date.datetime, max_date=max_date.datetime)
loss = self.calculate_loss(results=backtesting_results,
trade_count=trade_count,
min_date=min_date.datetime,
max_date=max_date.datetime)
return {
'loss': loss,
'params_dict': params_dict,
@ -427,39 +433,75 @@ class Hyperopt:
f"Avg profit {results_metrics['avg_profit']: 6.2f}%. "
f"Total profit {results_metrics['total_profit']: 11.8f} {stake_cur} "
f"({results_metrics['profit']: 7.2f}\N{GREEK CAPITAL LETTER SIGMA}%). "
f"Avg duration {results_metrics['duration']:5.1f} min."
).encode(locale.getpreferredencoding(), 'replace').decode('utf-8')
f"Avg duration {results_metrics['duration']:5.1f} min.").encode(
locale.getpreferredencoding(), 'replace').decode('utf-8')
def get_optimizer(self, dimensions: List[Dimension], cpu_count) -> Optimizer:
def get_optimizer(self, dimensions: List[Dimension], cpu_count,
n_initial_points=INITIAL_POINTS) -> Optimizer:
return Optimizer(
dimensions,
base_estimator="ET",
acq_optimizer="auto",
n_initial_points=INITIAL_POINTS,
n_initial_points=n_initial_points,
acq_optimizer_kwargs={'n_jobs': cpu_count},
model_queue_size=SKOPT_MODELS_MAX_NUM,
random_state=self.random_state,
)
def fix_optimizer_models_list(self) -> None:
"""
WORKAROUND: Since skopt is not actively supported, this resolves problems with skopt
memory usage, see also: https://github.com/scikit-optimize/scikit-optimize/pull/746
This may cease working when skopt updates if implementation of this intrinsic
part changes.
"""
n = len(self.opt.models) - SKOPT_MODELS_MAX_NUM
# Keep no more than 2*SKOPT_MODELS_MAX_NUM models in the skopt models list,
# remove the old ones. These are actually of no use, the current model
# from the estimator is the only one used in the skopt optimizer.
# Freqtrade code also does not inspect details of the models.
if n >= SKOPT_MODELS_MAX_NUM:
logger.debug(f"Fixing skopt models list, removing {n} old items...")
del self.opt.models[0:n]
def run_optimizer_parallel(self, parallel, tries: int, first_try: int) -> List:
result = parallel(
delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, i)
for asked, i in zip(self.opt_generator(), range(first_try, first_try + tries)))
return result
def opt_generator(self):
while True:
if self.f_val:
# print("opt.tell(): ",
# [v['params_dict'] for v in self.f_val], [v['loss'] for v in self.f_val])
functools.partial(self.opt.tell,
([v['params_dict']
for v in self.f_val], [v['loss'] for v in self.f_val]))
self.f_val = []
yield self.opt.ask()
def run_optimizer_parallel(self, parallel, asked, i) -> List:
return parallel(delayed(
wrap_non_picklable_objects(self.generate_optimizer))(v, i) for v in asked)
def parallel_objective(self, asked, n):
self.log_results_immediate(n)
return self.generate_optimizer(asked)
def parallel_callback(self, f_val):
self.f_val.extend(f_val)
def log_results_immediate(self, n) -> None:
print('.', end='')
sys.stdout.flush()
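
The methods above form a streaming ask/tell loop: opt_generator() keeps asking the optimizer for new points, parallel_objective() evaluates them in the joblib workers, and the results collected by the custom backend via parallel_callback() are told back before the next ask. Below is a minimal, self-contained sketch of that flow using plain skopt and a toy objective in place of freqtrade's backtest; all names (rsi_period, stoploss, pending) are illustrative only.

    from skopt import Optimizer
    from skopt.space import Integer, Real

    opt = Optimizer([Integer(10, 60), Real(0.01, 0.35)], n_initial_points=4)
    pending = []  # plays the role of self.f_val, filled by the backend callback


    def point_generator():
        # mirrors opt_generator(): tell collected results, then yield a fresh ask
        while True:
            if pending:
                opt.tell([x for x, _ in pending], [y for _, y in pending])
                pending.clear()
            yield opt.ask()


    def objective(point):
        # stands in for parallel_objective() / the backtest loss
        rsi_period, stoploss = point
        return (rsi_period - 30) ** 2 + stoploss


    gen = point_generator()
    for _ in range(8):  # one small "frame"
        x = next(gen)
        pending.append((x, objective(x)))  # in the real code this append happens in
                                           # CustomImmediateResultBackend.callback()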
def log_results(self, f_val, frame_start, max_epoch) -> None:
"""
Process a batch of results: mark and print new bests, append them to trials and save
"""
for i, v in enumerate(f_val):
is_best = self.is_best_loss(v, self.current_best_loss)
current = frame_start + i + 1
v['is_best'] = is_best
v['current_epoch'] = current
v['is_initial_point'] = current <= self.n_initial_points
logger.debug(f"Optimizer epoch evaluated: {v}")
if is_best:
self.current_best_loss = v['loss']
self.update_max_epoch(v, current)
self.print_results(v)
self.trials.append(v)
# Save results after every batch
print('\n')
self.save_trials()
# give up once the current epoch exceeds the estimated max epoch without a new best
if current > self.max_epoch:
self.max_epoch_reached = True
# escape hatch used by the test suite to short-circuit the hyperopt loop
if os.getenv('FQT_HYPEROPT_TRAP'):
logger.debug('bypassing hyperopt loop')
self.max_epoch = 1
@staticmethod
def load_previous_results(trials_file: Path) -> List:
@ -479,6 +521,55 @@ class Hyperopt:
def _set_random_state(self, random_state: Optional[int]) -> int:
return random_state or random.randint(1, 2**16 - 1)
@staticmethod
def calc_epochs(dimensions: List[Dimension], config_jobs: int, effort: int):
""" Compute a reasonable number of initial points and
a minimum number of epochs to evaluate """
n_dimensions = len(dimensions)
n_parameters = 0
# sum the discrete size of every dimension, enforcing a minimum contribution per type
for d in dimensions:
if type(d).__name__ == 'Integer':
n_parameters += max(1, d.high - d.low)
elif type(d).__name__ == 'Real':
n_parameters += max(10, int(d.high - d.low))
else:
n_parameters += len(d.bounds)
# estimate the search space size as the number of unordered combinations
# of the parameter entries taken n_dimensions at a time
search_space_size = (factorial(n_parameters) /
(factorial(n_parameters - n_dimensions) * factorial(n_dimensions)))
# logger.info(f'Search space size: {search_space_size}')
if search_space_size < config_jobs:
# the space is small: seed one initial point per job
n_initial_points = config_jobs
else:
# derive scaling coefficients from the search space size and the job count
log_sss = int(log(search_space_size, 10))
log_jobs = int(log(config_jobs, 2))
log_jobs = 2 if log_jobs < 0 else log_jobs
jobs_ip = log_jobs * log_sss
# never request more initial points than the search space can hold
n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip
# require at least this many epochs before giving up
min_epochs = max(2 * n_initial_points, 3 * config_jobs) * effort
return n_initial_points, min_epochs, search_space_size
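
As a sanity check of the arithmetic above, here is a hypothetical walk-through with made-up dimensions (one Integer, one Real, one Categorical) and 4 jobs; the numbers are illustrative only:

    from math import factorial, log

    # Integer(10, 60) -> 50, Real(0.01, 0.35) -> max(10, 0) = 10, Categorical of 3 -> 3
    n_parameters, n_dimensions, config_jobs, effort = 63, 3, 4, 1

    search_space_size = factorial(n_parameters) / (
        factorial(n_parameters - n_dimensions) * factorial(n_dimensions))  # C(63, 3) = 39711
    log_sss = int(log(search_space_size, 10))  # 4
    log_jobs = int(log(config_jobs, 2))        # 2
    jobs_ip = log_jobs * log_sss               # 8
    n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip  # 8
    min_epochs = max(2 * n_initial_points, 3 * config_jobs) * effort        # 16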
def update_max_epoch(self, val: Dict, current: int):
""" calculate max epochs: store the number of non best epochs
between each best, and get the mean of that value """
if val['is_initial_point'] is not True:
self.epochs_since_last_best.append(current - self.current_best_epoch)
self.avg_best_occurrence = (sum(self.epochs_since_last_best) //
len(self.epochs_since_last_best))
self.current_best_epoch = current
self.max_epoch = (self.current_best_epoch + self.avg_best_occurrence +
self.min_epochs) * self.effort
if self.max_epoch > self.search_space_size:
self.max_epoch = self.search_space_size
print('\n')
logger.info(f'Max epochs set to: {self.max_epoch}')
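
A minimal sketch of that arithmetic, assuming min_epochs == 16 and effort == 1 (numbers are made up): with bests found at epochs 12 and 30, the cutoff moves to epoch 61.

    epochs_since_last_best = [12, 30 - 12]  # [12, 18]
    avg_best_occurrence = sum(epochs_since_last_best) // len(epochs_since_last_best)  # 15
    max_epoch = (30 + avg_best_occurrence + 16) * 1  # (best epoch + avg gap + min_epochs) * effort = 61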
def start(self) -> None:
self.random_state = self._set_random_state(self.config.get('hyperopt_random_state', None))
logger.info(f"Using optimizer random state: {self.random_state}")
@ -492,10 +583,8 @@ class Hyperopt:
preprocessed[pair] = trim_dataframe(df, timerange)
min_date, max_date = get_timerange(data)
logger.info(
'Hyperopting with data from %s up to %s (%s days)..',
min_date.isoformat(), max_date.isoformat(), (max_date - min_date).days
)
logger.info('Hyperopting with data from %s up to %s (%s days)..', min_date.isoformat(),
max_date.isoformat(), (max_date - min_date).days)
dump(preprocessed, self.tickerdata_pickle)
# We don't need exchange instance anymore while running hyperopt
@ -509,46 +598,41 @@ class Hyperopt:
logger.info(f'Number of parallel jobs set as: {config_jobs}')
self.dimensions: List[Dimension] = self.hyperopt_space()
self.opt = self.get_optimizer(self.dimensions, config_jobs)
self.n_initial_points, self.min_epochs, self.search_space_size = self.calc_epochs(
self.dimensions, config_jobs, self.effort)
logger.info(f"Min epochs set to: {self.min_epochs}")
self.max_epoch = self.min_epochs
self.avg_best_occurrence = self.max_epoch
logger.info(f'Initial points: {self.n_initial_points}')
self.opt = self.get_optimizer(self.dimensions, config_jobs, self.n_initial_points)
# last_frame_len = (self.total_epochs - 1) % self.avg_best_occurrence
if self.print_colorized:
colorama_init(autoreset=True)
try:
with Parallel(n_jobs=config_jobs) as parallel:
jobs = parallel._effective_n_jobs()
logger.info(f'Effective number of parallel workers used: {jobs}')
EVALS = max(self.total_epochs // jobs, 1)
for i in range(EVALS):
asked = self.opt.ask(n_points=jobs)
f_val = self.run_optimizer_parallel(parallel, asked, i)
self.opt.tell(asked, [v['loss'] for v in f_val])
self.fix_optimizer_models_list()
for j in range(jobs):
# Use human-friendly indexes here (starting from 1)
current = i * jobs + j + 1
val = f_val[j]
val['current_epoch'] = current
val['is_initial_point'] = current <= INITIAL_POINTS
logger.debug(f"Optimizer epoch evaluated: {val}")
is_best = self.is_best_loss(val, self.current_best_loss)
# This value is assigned here and not in the optimization method
# to keep proper order in the list of results. That's because
# evaluations can take different time. Here they are aligned in the
# order they will be shown to the user.
val['is_best'] = is_best
self.print_results(val)
if is_best:
self.current_best_loss = val['loss']
self.trials.append(val)
# Save results after each best epoch and every 100 epochs
if is_best or current % 100 == 0:
self.save_trials()
except KeyboardInterrupt:
print('User interrupted..')
try:
register_parallel_backend('custom', CustomImmediateResultBackend)
with parallel_backend('custom'):
with Parallel(n_jobs=config_jobs, verbose=0) as parallel:
for frame in range(self.total_epochs):
epochs_so_far = len(self.trials)
# pad the frame length to the number of jobs to avoid desaturation
frame_len = (self.avg_best_occurrence + config_jobs -
self.avg_best_occurrence % config_jobs)
print(
f"{epochs_so_far+1}-{epochs_so_far+self.avg_best_occurrence}"
f"/{self.total_epochs}: ",
end='')
f_val = self.run_optimizer_parallel(parallel, frame_len, epochs_so_far)
self.log_results(f_val, epochs_so_far, self.total_epochs)
if self.max_epoch_reached:
logger.info("Max epoch reached, terminating.")
break
except KeyboardInterrupt:
print("User interrupted..")
self.save_trials(final=True)
@ -560,3 +644,8 @@ class Hyperopt:
# This is printed when Ctrl+C is pressed quickly, before first epochs have
# a chance to be evaluated.
print("No epochs evaluated yet, no best result.")
def __getstate__(self):
state = self.__dict__.copy()
del state['trials']
return state
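
A small, self-contained illustration of what __getstate__ above achieves (the class and numbers are hypothetical): the accumulated trials list is excluded whenever the object is pickled, for example when joblib ships it to worker processes, keeping the serialized payload small.

    import pickle


    class Holder:
        def __init__(self):
            self.config = {'epochs': 100}
            self.trials = [{'loss': 0.1}] * 10_000  # grows after every batch

        def __getstate__(self):
            state = self.__dict__.copy()
            del state['trials']
            return state


    restored = pickle.loads(pickle.dumps(Holder()))
    assert 'trials' not in restored.__dict__ and restored.config == {'epochs': 100}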

View File

@ -0,0 +1,30 @@
from joblib._parallel_backends import LokyBackend
hyperopt = None
class MultiCallback:
def __init__(self, *callbacks):
self.callbacks = [cb for cb in callbacks if cb]
def __call__(self, out):
for cb in self.callbacks:
cb(out)
class CustomImmediateResultBackend(LokyBackend):
def callback(self, result):
"""
Our custom completion callback. Executed in the parent process.
Use it to run Optimizer.tell() with immediate results of the backtest()
evaluated in the joblib worker process.
"""
if not result.exception():
# Fetch results from the Future object passed to us.
# Future object is assumed to be 'done' already.
f_val = result.result().copy()
hyperopt.parallel_callback(f_val)
def apply_async(self, func, callback=None):
cbs = MultiCallback(callback, self.callback)
return super().apply_async(func, cbs)
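
A hedged, self-contained sketch of how a backend like the one above is registered and used, mirroring the register_parallel_backend/parallel_backend calls in start(); the collector and class names here are illustrative, not freqtrade's, and the joblib internals relied on are those current at the time of this commit.

    from joblib import Parallel, delayed, parallel_backend, register_parallel_backend
    from joblib._parallel_backends import LokyBackend

    collected = []  # stands in for Hyperopt.f_val / parallel_callback()


    class ImmediateResultBackend(LokyBackend):
        def callback(self, future):
            # Runs in the parent process as soon as a dispatched batch finishes.
            if not future.exception():
                collected.extend(future.result())

        def apply_async(self, func, callback=None):
            # Chain joblib's own completion callback with ours, as MultiCallback does.
            def chained(out):
                if callback:
                    callback(out)
                self.callback(out)
            return super().apply_async(func, chained)


    if __name__ == '__main__':
        register_parallel_backend('immediate', ImmediateResultBackend)
        with parallel_backend('immediate'):
            Parallel(n_jobs=2)(delayed(pow)(i, 2) for i in range(8))
        print(collected)  # results also arrive through the immediate callback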

View File

@ -13,3 +13,7 @@ ignore_missing_imports = True
[mypy-tests.*]
ignore_errors = True
[yapf]
based_on_style = pep8
column_limit = 100