- added multi optimizer mode

- tweaked optimizer config
- dump optimizer state to disk
This commit is contained in:
orehunt 2020-03-02 07:47:34 +01:00
parent d96e842a21
commit f797413c80
7 changed files with 897 additions and 578 deletions

View File

@ -24,7 +24,8 @@ ARGS_BACKTEST = ARGS_COMMON_OPTIMIZE + [
ARGS_HYPEROPT = ARGS_COMMON_OPTIMIZE + [
"hyperopt", "hyperopt_path", "position_stacking", "epochs", "spaces",
"use_max_market_positions", "print_all", "print_colorized", "print_json", "hyperopt_jobs",
"hyperopt_random_state", "hyperopt_min_trades", "hyperopt_continue", "hyperopt_loss", "effort"
"hyperopt_random_state", "hyperopt_min_trades", "hyperopt_continue", "hyperopt_loss", "effort",
"multi_opt", "points_per_opt"
]
ARGS_EDGE = ARGS_COMMON_OPTIMIZE + ["stoploss_range"]

View File

@ -234,10 +234,24 @@ AVAILABLE_CLI_OPTIONS = {
'--effort',
help=('The higher the number, the longer the search will run if '
'no epochs are defined (default: %(default)d).'),
type=check_int_positive,
metavar='INT',
type=float,
metavar='FLOAT',
default=constants.HYPEROPT_EFFORT,
),
"multi_opt":
Arg('--multi',
help=('Switches hyperopt to use one optimizer per job; use it '
'when backtesting iterations are cheap (default: %(default)s).'),
action='store_true',
default=False),
"points_per_opt":
Arg('--points-per-opt',
help=('Controls how many points to ask of each optimizer at every '
'job dispatch in multi opt mode; increase it if the CPU usage of '
'each core appears low (default: %(default)d).'),
type=int,
metavar='INT',
default=constants.HYPEROPT_POINTS_PER_OPT),
"spaces":
Arg(
'--spaces',

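The two new options above behave like a plain boolean flag and an integer flag. For reference only (not part of this diff), a minimal standalone sketch of the assumed semantics, with the Arg wrapper replaced by argparse directly and dest names mirroring the config keys:

    # minimal sketch, assuming Arg maps onto argparse.add_argument (names are illustrative)
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--multi', dest='multi_opt', action='store_true', default=False,
                        help='one optimizer per job (multi opt mode)')
    parser.add_argument('--points-per-opt', dest='points_per_opt', type=int, metavar='INT',
                        default=2, help='points asked per dispatch in multi opt mode')
    args = parser.parse_args(['--multi', '--points-per-opt', '4'])
    assert args.multi_opt is True and args.points_per_opt == 4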
View File

@ -10,8 +10,7 @@ from typing import Any, Callable, Dict, List, Optional
from freqtrade import constants
from freqtrade.configuration.check_exchange import check_exchange
from freqtrade.configuration.deprecated_settings import process_temporary_deprecated_settings
from freqtrade.configuration.directory_operations import (create_datadir,
create_userdata_dir)
from freqtrade.configuration.directory_operations import (create_datadir, create_userdata_dir)
from freqtrade.configuration.load_config import load_config_file
from freqtrade.exceptions import OperationalException
from freqtrade.loggers import setup_logging
@ -26,7 +25,6 @@ class Configuration:
Class to read and init the bot configuration
Reuse this class for the bot, backtesting, hyperopt and every script that required configuration
"""
def __init__(self, args: Dict[str, Any], runmode: RunMode = None) -> None:
self.args = args
self.config: Optional[Dict[str, Any]] = None
@ -152,11 +150,12 @@ class Configuration:
if self.args.get("strategy") or not config.get('strategy'):
config.update({'strategy': self.args.get("strategy")})
self._args_to_config(config, argname='strategy_path',
self._args_to_config(config,
argname='strategy_path',
logstring='Using additional Strategy lookup path: {}')
if ('db_url' in self.args and self.args["db_url"] and
self.args["db_url"] != constants.DEFAULT_DB_PROD_URL):
if ('db_url' in self.args and self.args["db_url"]
and self.args["db_url"] != constants.DEFAULT_DB_PROD_URL):
config.update({'db_url': self.args["db_url"]})
logger.info('Parameter --db-url detected ...')
@ -167,7 +166,8 @@ class Configuration:
if 'sd_notify' in self.args and self.args["sd_notify"]:
config['internals'].update({'sd_notify': True})
self._args_to_config(config, argname='dry_run',
self._args_to_config(config,
argname='dry_run',
logstring='Parameter --dry-run detected, '
'overriding dry_run to: {} ...')
@ -198,20 +198,23 @@ class Configuration:
logger.info('Using data directory: %s ...', config.get('datadir'))
if self.args.get('exportfilename'):
self._args_to_config(config, argname='exportfilename',
self._args_to_config(config,
argname='exportfilename',
logstring='Storing backtest results to {} ...')
else:
config['exportfilename'] = (config['user_data_dir']
/ 'backtest_results/backtest-result.json')
config['exportfilename'] = (config['user_data_dir'] /
'backtest_results/backtest-result.json')
def _process_optimize_options(self, config: Dict[str, Any]) -> None:
# This will override the strategy configuration
self._args_to_config(config, argname='ticker_interval',
self._args_to_config(config,
argname='ticker_interval',
logstring='Parameter -i/--ticker-interval detected ... '
'Using ticker_interval: {} ...')
self._args_to_config(config, argname='position_stacking',
self._args_to_config(config,
argname='position_stacking',
logstring='Parameter --enable-position-stacking detected ...')
# Setting max_open_trades to infinite if -1
@ -224,31 +227,39 @@ class Configuration:
logger.info('max_open_trades set to unlimited ...')
elif 'max_open_trades' in self.args and self.args["max_open_trades"]:
config.update({'max_open_trades': self.args["max_open_trades"]})
logger.info('Parameter --max-open-trades detected, '
logger.info(
'Parameter --max-open-trades detected, '
'overriding max_open_trades to: %s ...', config.get('max_open_trades'))
elif config['runmode'] in NON_UTIL_MODES:
logger.info('Using max_open_trades: %s ...', config.get('max_open_trades'))
self._args_to_config(config, argname='stake_amount',
self._args_to_config(config,
argname='stake_amount',
logstring='Parameter --stake-amount detected, '
'overriding stake_amount to: {} ...')
self._args_to_config(config, argname='fee',
self._args_to_config(config,
argname='fee',
logstring='Parameter --fee detected, '
'setting fee to: {} ...')
self._args_to_config(config, argname='timerange',
self._args_to_config(config,
argname='timerange',
logstring='Parameter --timerange detected: {} ...')
self._process_datadir_options(config)
self._args_to_config(config, argname='strategy_list',
logstring='Using strategy list of {} strategies', logfun=len)
self._args_to_config(config,
argname='strategy_list',
logstring='Using strategy list of {} strategies',
logfun=len)
self._args_to_config(config, argname='ticker_interval',
self._args_to_config(config,
argname='ticker_interval',
logstring='Overriding ticker interval with Command line argument')
self._args_to_config(config, argname='export',
self._args_to_config(config,
argname='export',
logstring='Parameter --export detected: {} ...')
# Edge section:
@ -260,21 +271,32 @@ class Configuration:
logger.info('Parameter --stoplosses detected: %s ...', self.args["stoploss_range"])
# Hyperopt section
self._args_to_config(config, argname='hyperopt',
logstring='Using Hyperopt class name: {}')
self._args_to_config(config, argname='hyperopt', logstring='Using Hyperopt class name: {}')
self._args_to_config(config, argname='hyperopt_path',
self._args_to_config(config,
argname='hyperopt_path',
logstring='Using additional Hyperopt lookup path: {}')
self._args_to_config(config, argname='epochs',
self._args_to_config(config,
argname='epochs',
logstring='Parameter --epochs detected ... '
'Will run Hyperopt for {} epochs ...'
)
self._args_to_config(config, argname='spaces',
'Will run Hyperopt for {} epochs ...')
self._args_to_config(config,
argname='effort',
logstring='Parameter --effort detected: {}')
self._args_to_config(config,
argname='multi_opt',
logstring='Hyperopt will use multiple optimizers ...')
self._args_to_config(config,
argname='points_per_opt',
logstring='Optimizers will be asked for {} points...')
self._args_to_config(config,
argname='spaces',
logstring='Parameter -s/--spaces detected: {}')
self._args_to_config(config, argname='print_all',
self._args_to_config(config,
argname='print_all',
logstring='Parameter --print-all detected ...')
if 'print_colorized' in self.args and not self.args["print_colorized"]:
@ -283,95 +305,109 @@ class Configuration:
else:
config.update({'print_colorized': True})
self._args_to_config(config, argname='print_json',
self._args_to_config(config,
argname='print_json',
logstring='Parameter --print-json detected ...')
self._args_to_config(config, argname='hyperopt_jobs',
self._args_to_config(config,
argname='hyperopt_jobs',
logstring='Parameter -j/--job-workers detected: {}')
self._args_to_config(config, argname='hyperopt_random_state',
self._args_to_config(config,
argname='hyperopt_random_state',
logstring='Parameter --random-state detected: {}')
self._args_to_config(config, argname='hyperopt_min_trades',
self._args_to_config(config,
argname='hyperopt_min_trades',
logstring='Parameter --min-trades detected: {}')
self._args_to_config(config, argname='hyperopt_continue',
logstring='Hyperopt continue: {}')
self._args_to_config(config, argname='hyperopt_continue', logstring='Hyperopt continue: {}')
self._args_to_config(config, argname='hyperopt_loss',
self._args_to_config(config,
argname='hyperopt_loss',
logstring='Using Hyperopt loss class name: {}')
self._args_to_config(config, argname='hyperopt_show_index',
self._args_to_config(config,
argname='hyperopt_show_index',
logstring='Parameter -n/--index detected: {}')
self._args_to_config(config, argname='hyperopt_list_best',
self._args_to_config(config,
argname='hyperopt_list_best',
logstring='Parameter --best detected: {}')
self._args_to_config(config, argname='hyperopt_list_profitable',
self._args_to_config(config,
argname='hyperopt_list_profitable',
logstring='Parameter --profitable detected: {}')
self._args_to_config(config, argname='hyperopt_list_min_trades',
self._args_to_config(config,
argname='hyperopt_list_min_trades',
logstring='Parameter --min-trades detected: {}')
self._args_to_config(config, argname='hyperopt_list_max_trades',
self._args_to_config(config,
argname='hyperopt_list_max_trades',
logstring='Parameter --max-trades detected: {}')
self._args_to_config(config, argname='hyperopt_list_min_avg_time',
self._args_to_config(config,
argname='hyperopt_list_min_avg_time',
logstring='Parameter --min-avg-time detected: {}')
self._args_to_config(config, argname='hyperopt_list_max_avg_time',
self._args_to_config(config,
argname='hyperopt_list_max_avg_time',
logstring='Parameter --max-avg-time detected: {}')
self._args_to_config(config, argname='hyperopt_list_min_avg_profit',
self._args_to_config(config,
argname='hyperopt_list_min_avg_profit',
logstring='Parameter --min-avg-profit detected: {}')
self._args_to_config(config, argname='hyperopt_list_max_avg_profit',
self._args_to_config(config,
argname='hyperopt_list_max_avg_profit',
logstring='Parameter --max-avg-profit detected: {}')
self._args_to_config(config, argname='hyperopt_list_min_total_profit',
self._args_to_config(config,
argname='hyperopt_list_min_total_profit',
logstring='Parameter --min-total-profit detected: {}')
self._args_to_config(config, argname='hyperopt_list_max_total_profit',
self._args_to_config(config,
argname='hyperopt_list_max_total_profit',
logstring='Parameter --max-total-profit detected: {}')
self._args_to_config(config, argname='hyperopt_list_no_details',
self._args_to_config(config,
argname='hyperopt_list_no_details',
logstring='Parameter --no-details detected: {}')
self._args_to_config(config, argname='hyperopt_show_no_header',
self._args_to_config(config,
argname='hyperopt_show_no_header',
logstring='Parameter --no-header detected: {}')
def _process_plot_options(self, config: Dict[str, Any]) -> None:
self._args_to_config(config, argname='pairs',
logstring='Using pairs {}')
self._args_to_config(config, argname='pairs', logstring='Using pairs {}')
self._args_to_config(config, argname='indicators1',
logstring='Using indicators1: {}')
self._args_to_config(config, argname='indicators1', logstring='Using indicators1: {}')
self._args_to_config(config, argname='indicators2',
logstring='Using indicators2: {}')
self._args_to_config(config, argname='indicators2', logstring='Using indicators2: {}')
self._args_to_config(config, argname='plot_limit',
logstring='Limiting plot to: {}')
self._args_to_config(config, argname='trade_source',
logstring='Using trades from: {}')
self._args_to_config(config, argname='plot_limit', logstring='Limiting plot to: {}')
self._args_to_config(config, argname='trade_source', logstring='Using trades from: {}')
self._args_to_config(config, argname='erase',
self._args_to_config(config,
argname='erase',
logstring='Erase detected. Deleting existing data.')
self._args_to_config(config, argname='timeframes',
logstring='timeframes --timeframes: {}')
self._args_to_config(config, argname='timeframes', logstring='timeframes --timeframes: {}')
self._args_to_config(config, argname='days',
logstring='Detected --days: {}')
self._args_to_config(config, argname='days', logstring='Detected --days: {}')
self._args_to_config(config, argname='download_trades',
self._args_to_config(config,
argname='download_trades',
logstring='Detected --dl-trades: {}')
self._args_to_config(config, argname='dataformat_ohlcv',
self._args_to_config(config,
argname='dataformat_ohlcv',
logstring='Using "{}" to store OHLCV data.')
self._args_to_config(config, argname='dataformat_trades',
self._args_to_config(config,
argname='dataformat_trades',
logstring='Using "{}" to store trades data.')
def _process_runmode(self, config: Dict[str, Any]) -> None:
@ -383,8 +419,11 @@ class Configuration:
config.update({'runmode': self.runmode})
def _args_to_config(self, config: Dict[str, Any], argname: str,
logstring: str, logfun: Optional[Callable] = None,
def _args_to_config(self,
config: Dict[str, Any],
argname: str,
logstring: str,
logfun: Optional[Callable] = None,
deprecated_msg: Optional[str] = None) -> None:
"""
:param config: Configuration dictionary

View File

@ -6,7 +6,8 @@ DEFAULT_CONFIG = 'config.json'
DEFAULT_EXCHANGE = 'bittrex'
PROCESS_THROTTLE_SECS = 5 # sec
HYPEROPT_EPOCH = 0 # epochs
HYPEROPT_EFFORT = 0 # /10
HYPEROPT_EFFORT = 0. # tune max epoch count
HYPEROPT_POINTS_PER_OPT = 2 # tune iterations between estimations
RETRY_TIMEOUT = 30 # sec
DEFAULT_HYPEROPT_LOSS = 'DefaultHyperOptLoss'
DEFAULT_DB_PROD_URL = 'sqlite:///tradesv3.sqlite'

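HYPEROPT_EFFORT is now a float multiplier rather than a /10 integer: 0 leaves the computed epoch budget unchanged, while for instance 1.0 doubles it. A quick arithmetic sketch using the update_max_epoch formula that appears later in this diff (all input numbers are made up):

    # made-up inputs; formula taken from Hyperopt.update_max_epoch further down in this commit
    current_best_epoch, avg_best_occurrence, min_epochs = 40, 12, 36
    for effort in (0.0, 0.5, 1.0):
        max_epoch = int((current_best_epoch + avg_best_occurrence + min_epochs) * (1 + effort))
        # effort 0.0 -> 88, 0.5 -> 132, 1.0 -> 176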
View File

@ -2,7 +2,7 @@
"""
This module contains the hyperopt logic
"""
import os
import functools
import locale
import logging
@ -10,11 +10,11 @@ import random
import sys
import warnings
from collections import OrderedDict, deque
from math import factorial, log
from math import factorial, log, inf
from operator import itemgetter
from pathlib import Path
from pprint import pprint
from typing import Any, Dict, List, Optional, Callable
from typing import Any, Dict, List, Optional
import rapidjson
from colorama import Fore, Style
@ -26,19 +26,28 @@ from freqtrade.exceptions import OperationalException
from freqtrade.misc import plural, round_dict
from freqtrade.optimize.backtesting import Backtesting
# Import IHyperOpt and IHyperOptLoss to allow unpickling classes from these modules
from freqtrade.optimize.hyperopt_backend import CustomImmediateResultBackend
import freqtrade.optimize.hyperopt_backend as backend
from freqtrade.optimize.hyperopt_interface import IHyperOpt # noqa: F401
from freqtrade.optimize.hyperopt_loss_interface import IHyperOptLoss # noqa: F401
from freqtrade.resolvers.hyperopt_resolver import (HyperOptLossResolver, HyperOptResolver)
from joblib import (Parallel, cpu_count, delayed, dump, load, wrap_non_picklable_objects)
from joblib import register_parallel_backend, parallel_backend
from joblib import parallel_backend
from multiprocessing import Manager
from queue import Queue
from pandas import DataFrame
from numpy import iinfo, int32
# Suppress scikit-learn FutureWarnings from skopt
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
from skopt import Optimizer
from skopt.space import Dimension
# Additional regressors already pluggable into the optimizer
# from sklearn.linear_model import ARDRegression, BayesianRidge
# possibly interesting regressors that need predict method override
# from sklearn.ensemble import HistGradientBoostingRegressor
# from xgboost import XGBRegressor
logger = logging.getLogger(__name__)
@ -48,6 +57,10 @@ INITIAL_POINTS = 30
# in the skopt models list
SKOPT_MODELS_MAX_NUM = 10
# supported strategies when asking the optimizer for multiple points
NEXT_POINT_METHODS = ["cl_min", "cl_mean", "cl_max"]
NEXT_POINT_METHODS_LENGTH = 3
MAX_LOSS = 100000 # just a big enough number to be bad result in loss optimization
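NEXT_POINT_METHODS lists skopt's constant-liar strategies, which Optimizer.ask() accepts when more than one point is requested at a time. A minimal sketch with a toy one-dimensional space (the dimension and the loss function are invented):

    # toy example of batched ask/tell with a constant-liar strategy (skopt API)
    from skopt import Optimizer
    from skopt.space import Real

    opt = Optimizer([Real(-2.0, 2.0)], base_estimator="GBRT",
                    acq_optimizer="sampling", random_state=0)
    points = opt.ask(n_points=3, strategy="cl_mean")    # three candidate parameter sets
    opt.tell(points, [p[0] ** 2 for p in points])       # report the (toy) losses back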
@ -72,16 +85,24 @@ class Hyperopt:
self.trials_file = (self.config['user_data_dir'] / 'hyperopt_results' /
'hyperopt_results.pickle')
self.opts_file = (self.config['user_data_dir'] / 'hyperopt_results' /
'hyperopt_optimizers.pickle')
self.tickerdata_pickle = (self.config['user_data_dir'] / 'hyperopt_results' /
'hyperopt_tickerdata.pkl')
self.total_epochs = config['epochs'] if 'epochs' in config else 0
self.effort = config['effort'] if 'effort' in config else -1
self.n_jobs = self.config.get('hyperopt_jobs', -1)
self.effort = self.config['effort'] if 'effort' in self.config else 0
self.total_epochs = self.config['epochs'] if 'epochs' in self.config else 0
self.max_epoch = 0
self.search_space_size = 0
self.max_epoch_reached = False
self.min_epochs = 0
self.epochs_limit = lambda: self.total_epochs or self.max_epoch
self.min_epochs = INITIAL_POINTS
self.current_best_loss = 100
# a guessed number derived from the space dimensions
self.search_space_size = 0
# total number of candles being backtested
self.n_samples = 0
self.current_best_loss = inf
self.current_best_epoch = 0
self.epochs_since_last_best: List = []
self.avg_best_occurrence = 0
@ -93,16 +114,35 @@ class Hyperopt:
self.num_trials_saved = 0
# Previous evaluations
# evaluations
self.trials: List = []
# optimizers
self.opts: List[Optimizer] = []
self.opt: Optimizer = None
self.opt: Optimizer
self.opt = None
self.f_val: List = []
self.to_ask: deque
self.to_ask = deque()
self.tell: Callable
self.tell = None
if 'multi_opt' in self.config and self.config['multi_opt']:
self.multi = True
backend.manager = Manager()
backend.optimizers = backend.manager.Queue()
backend.results_board = backend.manager.Queue(maxsize=1)
backend.results_board.put([])
self.opt_base_estimator = 'GBRT'
self.opt_acq_optimizer = 'sampling'
default_n_points = 2
else:
backend.manager = Manager()
backend.results = backend.manager.Queue()
self.multi = False
self.opt_base_estimator = 'GP'
self.opt_acq_optimizer = 'lbfgs'
default_n_points = 1
# in single opt assume runs are expensive so default to 1 point per ask
self.n_points = self.config.get('points_per_opt', default_n_points)
if self.n_points < 1:
self.n_points = 1
self.opt_base_estimator = 'DUMMY'
self.opt_acq_optimizer = 'sampling'
# Populate functions here (hasattr is slow so should not be run during "regular" operations)
if hasattr(self.custom_hyperopt, 'populate_indicators'):
@ -142,7 +182,7 @@ class Hyperopt:
"""
Remove hyperopt pickle files to restart hyperopt.
"""
for f in [self.tickerdata_pickle, self.trials_file]:
for f in [self.tickerdata_pickle, self.trials_file, self.opts_file]:
p = Path(f)
if p.is_file():
logger.info(f"Removing `{p}`.")
@ -171,10 +211,30 @@ class Hyperopt:
logger.info(f"Saving {num_trials} {plural(num_trials, 'epoch')}.")
dump(self.trials, self.trials_file)
self.num_trials_saved = num_trials
self.save_opts()
if final:
logger.info(f"{num_trials} {plural(num_trials, 'epoch')} "
f"saved to '{self.trials_file}'.")
def save_opts(self) -> None:
""" Save optimizers state to disk. The minimum required state could also be constructed
from the attributes [ models, space, rng ] with Xi, yi loaded from trials """
# synchronize with saved trials
opts = []
n_opts = 0
if self.multi:
while not backend.optimizers.empty():
opts.append(backend.optimizers.get())
n_opts = len(opts)
for opt in opts:
backend.optimizers.put(opt)
else:
if self.opt:
n_opts = 1
opts = [self.opt]
logger.info(f"Saving {n_opts} {plural(n_opts, 'optimizer')}.")
dump(opts, self.opts_file)
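As the save_opts docstring notes, pickling the whole Optimizer is a convenience; one could instead persist only the surrogate models, the space and the RNG, and rebuild the rest from the saved trials. A rough sketch of that alternative, not what this commit does (saved, trials and template are hypothetical names):

    # hypothetical reconstruction of an optimizer from minimal state plus the trials file
    from skopt import Optimizer

    def rebuild_optimizer(saved: dict, trials: list, template: Optimizer) -> Optimizer:
        opt = template.copy(random_state=saved['rng'])          # restore the random generator
        Xi = [list(t['params_dict'].values()) for t in trials]  # points already evaluated
        yi = [t['loss'] for t in trials]
        if Xi:
            opt.tell(Xi, yi)                                    # refits the surrogate model
        return opt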
@staticmethod
def _read_trials(trials_file: Path) -> List:
"""
@ -280,8 +340,8 @@ class Hyperopt:
"""
is_best = results['is_best']
if self.print_all or is_best:
self.print_results_explanation(results, self.total_epochs or self.max_epoch,
self.print_all, self.print_colorized)
self.print_results_explanation(results, self.epochs_limit(), self.print_all,
self.print_colorized)
@staticmethod
def print_results_explanation(results, total_epochs, highlight_best: bool,
@ -345,7 +405,7 @@ class Hyperopt:
return spaces
def generate_optimizer(self, raw_params: List[Any], iteration=None) -> Dict:
def backtest_params(self, raw_params: List[Any], iteration=None) -> Dict:
"""
Used Optimize function. Called once per epoch to optimize whatever is configured.
Keep this function as optimized as possible!
@ -417,7 +477,6 @@ class Hyperopt:
'results_metrics': results_metrics,
'results_explanation': results_explanation,
'total_profit': total_profit,
'asked': raw_params,
}
def _calculate_results_metrics(self, backtesting_results: DataFrame) -> Dict:
@ -441,54 +500,134 @@ class Hyperopt:
f"Avg duration {results_metrics['duration']:5.1f} min.").encode(
locale.getpreferredencoding(), 'replace').decode('utf-8')
def get_optimizer(self, dimensions: List[Dimension], cpu_count,
def get_next_point_strategy(self):
""" Choose a strategy randomly among the supported ones, used in multi opt mode
to increase the diversion of the searches of each optimizer """
return NEXT_POINT_METHODS[random.randrange(0, NEXT_POINT_METHODS_LENGTH)]
def get_optimizer(self,
dimensions: List[Dimension],
n_jobs: int,
n_initial_points=INITIAL_POINTS) -> Optimizer:
" Construct an optimizer object "
# https://github.com/scikit-learn/scikit-learn/issues/14265
# lbfgs uses joblib threading backend so n_jobs has to be reduced
# to avoid oversubscription
if self.opt_acq_optimizer == 'lbfgs':
n_jobs = 1
return Optimizer(
dimensions,
base_estimator="ET",
acq_optimizer="auto",
base_estimator=self.opt_base_estimator,
acq_optimizer=self.opt_acq_optimizer,
n_initial_points=n_initial_points,
acq_optimizer_kwargs={'n_jobs': cpu_count},
acq_optimizer_kwargs={'n_jobs': n_jobs},
acq_func_kwargs={
'xi': 0.00001,
'kappa': 0.00001
},
model_queue_size=SKOPT_MODELS_MAX_NUM,
random_state=self.random_state,
)
def run_optimizer_parallel(self, parallel: Parallel, tries: int, first_try: int,
def run_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
jobs: int) -> List:
result = parallel(
delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, i)
for asked, i in zip(self.opt_generator(jobs, tries), range(
first_try, first_try + tries)))
delayed(wrap_non_picklable_objects(self.parallel_objective))(asked, backend.results, i)
for asked, i in zip(self.opt_ask_and_tell(jobs, tries),
range(first_try, first_try + tries)))
return result
def opt_generator(self, jobs: int, tries: int):
while True:
if self.f_val:
# print("opt.tell(): ", [v['asked'] for v in self.f_val],
# [v['loss'] for v in self.f_val])
self.tell = functools.partial(self.opt.tell, [v['asked'] for v in self.f_val],
[v['loss'] for v in self.f_val])
self.f_val = []
def run_multi_backtest_parallel(self, parallel: Parallel, tries: int, first_try: int,
jobs: int) -> List:
results = parallel(
delayed(wrap_non_picklable_objects(self.parallel_opt_objective))(
i, backend.optimizers, jobs, backend.results_board)
for i in range(first_try, first_try + tries))
return functools.reduce(lambda x, y: [*x, *y], results)
if not self.to_ask:
def opt_ask_and_tell(self, jobs: int, tries: int):
""" loop to manager optimizer state in single optimizer mode """
vals = []
to_ask: deque = deque()
evald: List[List] = []
fit = False
for r in range(tries):
while not backend.results.empty():
vals.append(backend.results.get())
if vals:
self.opt.tell([list(v['params_dict'].values()) for v in vals],
[v['loss'] for v in vals],
fit=fit)
if fit:
fit = False
vals = []
if not to_ask:
self.opt.update_next()
self.to_ask.extend(self.opt.ask(n_points=tries))
self.fit = True
yield self.to_ask.popleft()
# yield self.opt.ask()
to_ask.extend(self.opt.ask(n_points=self.n_points))
fit = True
a = to_ask.popleft()
if a in evald:
logger.info('this point was evaluated before...')
evald.append(a)
yield a
def parallel_objective(self, asked, n):
def parallel_opt_objective(self, n: int, optimizers: Queue, jobs: int, results_board: Queue):
self.log_results_immediate(n)
return self.generate_optimizer(asked)
# fetch an optimizer instance
opt = optimizers.get()
# tell new points if any
results = results_board.get()
past_Xi = []
past_yi = []
for idx, res in enumerate(results):
unsubscribe = False
vals = res[0] # res[1] is the counter
for v in vals:
if list(v['params_dict'].values()) not in opt.Xi:
past_Xi.append(list(v['params_dict'].values()))
past_yi.append(v['loss'])
# decrease counter
if not unsubscribe:
unsubscribe = True
if unsubscribe:
results[idx][1] -= 1
if results[idx][1] < 1:
del results[idx]
# put back the updated results
results_board.put(results)
if len(past_Xi) > 0:
opt.tell(past_Xi, past_yi, fit=False)
opt.update_next()
# ask for points according to config
asked = opt.ask(n_points=self.n_points, strategy=self.get_next_point_strategy())
# run the backtest for each point
f_val = [self.backtest_params(e) for e in asked]
# tell the optimizer the results
Xi = [list(v['params_dict'].values()) for v in f_val]
yi = [v['loss'] for v in f_val]
opt.tell(Xi, yi, fit=False)
# update the board with the new results
results = results_board.get()
results.append([f_val, jobs - 1])
results_board.put(results)
# send back the updated optimizer
optimizers.put(opt)
return f_val
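results_board works as a small shared ledger: each worker appends its freshly evaluated batch paired with a countdown of jobs - 1, every other worker folds the batch into its own optimizer once and decrements the counter, and the entry is dropped when the counter reaches zero. A simplified, single-process mimic of that countdown protocol (plain lists instead of the managed queue):

    # single-process toy model of the results-board countdown used in parallel_opt_objective
    def publish(board, batch, jobs):
        board.append([batch, jobs - 1])           # every *other* job should read it once

    def consume(board, already_known):
        fresh = []
        for idx in reversed(range(len(board))):   # iterate backwards so deletion stays safe
            batch, _counter = board[idx]
            fresh.extend(p for p in batch if p not in already_known)
            board[idx][1] -= 1
            if board[idx][1] < 1:
                del board[idx]
        return fresh

    board: list = []
    publish(board, [{'loss': 0.5}], jobs=2)
    assert consume(board, []) == [{'loss': 0.5}] and board == []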
def parallel_objective(self, asked, results: Queue, n=0):
self.log_results_immediate(n)
v = self.backtest_params(asked)
results.put(v)
return v
def parallel_callback(self, f_val):
if self.tell:
self.tell(fit=self.fit)
self.tell = None
self.fit = False
""" Executed after each epoch evaluation to collect the results """
self.f_val.extend(f_val)
def log_results_immediate(self, n) -> None:
""" Signals that a new job has been scheduled"""
print('.', end='')
sys.stdout.flush()
@ -510,10 +649,10 @@ class Hyperopt:
self.update_max_epoch(v, current)
self.print_results(v)
self.trials.append(v)
# Save results after every batch
# Save results and optimizers after every batch
self.save_trials()
# give up if no best since max epochs
if current + 1 > (total_epochs or self.max_epoch):
if current + 1 > self.epochs_limit():
self.max_epoch_reached = True
@staticmethod
@ -531,11 +670,25 @@ class Hyperopt:
logger.info(f"Loaded {len(trials)} previous evaluations from disk.")
return trials
@staticmethod
def load_previous_optimizers(opts_file: Path) -> List:
""" Load the state of previous optimizers from file """
opts: List[Optimizer] = []
if opts_file.is_file() and opts_file.stat().st_size > 0:
opts = load(opts_file)
n_opts = len(opts)
if n_opts > 0 and type(opts[-1]) != Optimizer:
raise OperationalException("The file storing optimizers state might be corrupted "
"and cannot be loaded.")
else:
logger.info(f"Loaded {n_opts} previous {plural(n_opts, 'optimizer')} from disk.")
return opts
def _set_random_state(self, random_state: Optional[int]) -> int:
return random_state or random.randint(1, 2**16 - 1)
@staticmethod
def calc_epochs(dimensions: List[Dimension], config_jobs: int, effort: int, total_epochs: int):
def calc_epochs(dimensions: List[Dimension], n_jobs: int, effort: float, total_epochs: int):
""" Compute a reasonable number of initial points and
a minimum number of epochs to evaluate """
n_dimensions = len(dimensions)
@ -553,21 +706,21 @@ class Hyperopt:
search_space_size = (factorial(n_parameters) /
(factorial(n_parameters - n_dimensions) * factorial(n_dimensions)))
# logger.info(f'Search space size: {search_space_size}')
if search_space_size < config_jobs:
if search_space_size < n_jobs:
# don't waste if the space is small
n_initial_points = config_jobs
n_initial_points = n_jobs
elif total_epochs > 0:
n_initial_points = total_epochs // 3 if total_epochs > config_jobs * 3 else config_jobs
n_initial_points = total_epochs // 3 if total_epochs > n_jobs * 3 else n_jobs
min_epochs = n_initial_points
else:
# extract coefficients from the search space and the jobs count
log_sss = int(log(search_space_size, 10))
log_jobs = int(log(config_jobs, 2)) if config_jobs > 4 else 2
log_jobs = int(log(n_jobs, 2)) if n_jobs > 4 else 2
jobs_ip = log_jobs * log_sss
# never waste
n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip
# it shall run for this much, I say
min_epochs = int(max(2 * n_initial_points, 3 * config_jobs) * (1 + effort / 10))
min_epochs = int(max(n_initial_points, n_jobs) * (1 + effort) + n_initial_points)
return n_initial_points, min_epochs, search_space_size
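For a feel of the numbers calc_epochs produces on the large-search-space branch, here is the same arithmetic with invented inputs (the real search space size depends on the strategy's dimensions):

    # invented inputs for the last branch of calc_epochs above
    from math import log
    search_space_size, n_jobs, effort = 3 * 10 ** 6, 12, 0.0
    log_sss = int(log(search_space_size, 10))                                # 6
    log_jobs = int(log(n_jobs, 2)) if n_jobs > 4 else 2                      # 3
    jobs_ip = log_jobs * log_sss                                             # 18
    n_initial_points = log_sss if jobs_ip > search_space_size else jobs_ip   # 18
    min_epochs = int(max(n_initial_points, n_jobs) * (1 + effort) + n_initial_points)  # 36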
def update_max_epoch(self, val: Dict, current: int):
@ -580,11 +733,40 @@ class Hyperopt:
self.current_best_epoch = current
self.max_epoch = int(
(self.current_best_epoch + self.avg_best_occurrence + self.min_epochs) *
(1 + self.effort / 10))
(1 + self.effort))
if self.max_epoch > self.search_space_size:
self.max_epoch = self.search_space_size
print()
logger.info(f'Max epochs set to: {self.max_epoch}')
logger.info(f'Max epochs set to: {self.epochs_limit()}')
def setup_optimizers(self):
""" Setup the optimizers objects, try to load from disk, or create new ones """
# try to load previous optimizers
self.opts = self.load_previous_optimizers(self.opts_file)
if self.multi:
if len(self.opts) == self.n_jobs:
# put the restored optimizers in the queue and clear them from the object
for opt in self.opts:
backend.optimizers.put(opt)
else: # or generate new optimizers
opt = self.get_optimizer(self.dimensions, self.n_jobs, self.n_initial_points)
# reduce random points by the number of optimizers
self.n_initial_points = self.n_initial_points // self.n_jobs
for _ in range(self.n_jobs): # generate optimizers
# random state is preserved
backend.optimizers.put(
opt.copy(random_state=opt.rng.randint(0,
iinfo(int32).max)))
del opt
else:
# if we have more than 1 optimizer but are using single opt,
# pick one and discard the rest...
if len(self.opts) > 0:
self.opt = self.opts[-1]
del self.opts
else:
self.opt = self.get_optimizer(self.dimensions, self.n_jobs, self.n_initial_points)
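In multi mode the template optimizer is copied once per job, each copy seeded from the template's own RNG, so the workers do not all propose the same initial points. A condensed sketch of that fan-out with a toy dimension (n_jobs and the space are made up):

    # toy fan-out of one template optimizer into per-job copies, mirroring the branch above
    from numpy import iinfo, int32
    from skopt import Optimizer
    from skopt.space import Integer

    n_jobs, n_initial_points = 4, 32
    template = Optimizer([Integer(0, 100)], base_estimator="GBRT", acq_optimizer="sampling",
                         n_initial_points=n_initial_points // n_jobs, random_state=0)
    copies = [template.copy(random_state=template.rng.randint(0, iinfo(int32).max))
              for _ in range(n_jobs)]
    assert len(copies) == n_jobs and all(c is not template for c in copies)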
def start(self) -> None:
self.random_state = self._set_random_state(self.config.get('hyperopt_random_state', None))
@ -597,6 +779,7 @@ class Hyperopt:
# Trim startup period from analyzed dataframe
for pair, df in preprocessed.items():
preprocessed[pair] = trim_dataframe(df, timerange)
self.n_samples += len(preprocessed[pair])
min_date, max_date = get_timerange(data)
logger.info('Hyperopting with data from %s up to %s (%s days)..', min_date.isoformat(),
@ -608,47 +791,49 @@ class Hyperopt:
self.trials = self.load_previous_results(self.trials_file)
cpus = cpu_count()
logger.info(f"Found {cpus} CPU cores. Let's make them scream!")
config_jobs = self.config.get('hyperopt_jobs', -1)
logger.info(f'Number of parallel jobs set as: {config_jobs}')
logger.info(f"Found {cpu_count()} CPU cores. Let's make them scream!")
logger.info(f'Number of parallel jobs set as: {self.n_jobs}')
self.dimensions: List[Dimension] = self.hyperopt_space()
self.n_initial_points, self.min_epochs, self.search_space_size = self.calc_epochs(
self.dimensions, config_jobs, self.effort, self.total_epochs)
self.dimensions, self.n_jobs, self.effort, self.total_epochs)
logger.info(f"Min epochs set to: {self.min_epochs}")
if self.total_epochs < 1:
self.max_epoch = int(self.min_epochs + len(self.trials))
else:
self.max_epoch = self.n_initial_points
self.avg_best_occurrence = self.min_epochs
self.avg_best_occurrence = self.min_epochs // self.n_jobs
logger.info(f'Initial points: {self.n_initial_points}')
self.opt = self.get_optimizer(self.dimensions, config_jobs, self.n_initial_points)
if self.print_colorized:
colorama_init(autoreset=True)
self.setup_optimizers()
try:
register_parallel_backend('custom', CustomImmediateResultBackend)
with parallel_backend('custom'):
with Parallel(n_jobs=config_jobs, verbose=0) as parallel:
if self.multi:
jobs_scheduler = self.run_multi_backtest_parallel
else:
jobs_scheduler = self.run_backtest_parallel
with parallel_backend('loky', inner_max_num_threads=2):
with Parallel(n_jobs=self.n_jobs, verbose=0, backend='loky') as parallel:
while True:
# update epochs count
epochs_so_far = len(self.trials)
# pad the frame length to the number of jobs to avoid desaturation
frame_len = (self.avg_best_occurrence + config_jobs -
self.avg_best_occurrence % config_jobs)
# pad the batch length to a multiple of the number of jobs to avoid desaturating the workers
batch_len = (self.avg_best_occurrence + self.n_jobs -
self.avg_best_occurrence % self.n_jobs)
# when using multiple optimizers each worker performs
# n_points (epochs) in 1 dispatch but this reduces the batch len too much
# if self.multi: batch_len = batch_len // self.n_points
# don't go over the limit
if epochs_so_far + frame_len > (self.total_epochs or self.max_epoch):
frame_len = (self.total_epochs or self.max_epoch) - epochs_so_far
if epochs_so_far + batch_len > self.epochs_limit():
batch_len = self.epochs_limit() - epochs_so_far
print(
f"{epochs_so_far+1}-{epochs_so_far+frame_len}"
f"/{self.total_epochs}: ",
f"{epochs_so_far+1}-{epochs_so_far+batch_len}"
f"/{self.epochs_limit()}: ",
end='')
f_val = self.run_optimizer_parallel(parallel, frame_len, epochs_so_far,
config_jobs)
self.log_results(f_val, epochs_so_far, self.total_epochs or self.max_epoch)
f_val = jobs_scheduler(parallel, batch_len, epochs_so_far, self.n_jobs)
self.log_results(f_val, epochs_so_far, self.epochs_limit())
if self.max_epoch_reached:
logger.info("Max epoch reached, terminating.")
break

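The batch sizing above pads avg_best_occurrence up to a multiple of the worker count so that no core sits idle at the tail of a batch, then clamps the batch to whatever is left of the epoch budget; a couple of lines of arithmetic with invented values:

    # invented values for the batch padding and the epoch-budget clamp above
    avg_best_occurrence, n_jobs = 10, 4
    batch_len = avg_best_occurrence + n_jobs - avg_best_occurrence % n_jobs   # 10 + 4 - 2 = 12
    epochs_limit, epochs_so_far = 50, 40
    if epochs_so_far + batch_len > epochs_limit:
        batch_len = epochs_limit - epochs_so_far                              # clamped to 10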
View File

@ -1,31 +1,13 @@
from joblib._parallel_backends import LokyBackend
from typing import Any
from queue import Queue
from multiprocessing.managers import SyncManager
hyperopt: Any = None
class MultiCallback:
def __init__(self, *callbacks):
self.callbacks = [cb for cb in callbacks if cb]
def __call__(self, out):
for cb in self.callbacks:
cb(out)
class CustomImmediateResultBackend(LokyBackend):
def callback(self, result):
"""
Our custom completion callback. Executed in the parent process.
Use it to run Optimizer.tell() with immediate results of the backtest()
evaluated in the joblib worker process.
"""
if not result.exception():
# Fetch results from the Future object passed to us.
# Future object is assumed to be 'done' already.
f_val = result.result().copy()
hyperopt.parallel_callback(f_val)
def apply_async(self, func, callback=None):
cbs = MultiCallback(callback, self.callback)
return super().apply_async(func, cbs)
manager: SyncManager
# stores the optimizers in multi opt mode
optimizers: Queue
# stores a list of the results to share between optimizers
# each entry pairs a batch of results with a decreasing counter
results_board: Queue
# store the results in single opt mode
results: Queue
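This module is a plain namespace for the Manager-backed queues that the parent process creates and then hands to the workers (the queue proxies pickle cleanly across processes). A standalone mimic of that setup and of the borrow-and-return pattern used in parallel_opt_objective (queue names mirror the module above; the borrow helper is illustrative):

    # standalone mimic of hyperopt_backend's shared state (illustrative only)
    from multiprocessing import Manager

    def borrow(queue, fn):
        item = queue.get()           # take a shared object, e.g. an optimizer in multi opt mode
        try:
            return fn(item)
        finally:
            queue.put(item)          # always hand it back for the next worker

    if __name__ == '__main__':
        manager = Manager()
        optimizers = manager.Queue()                # multi opt mode: one optimizer per job
        results_board = manager.Queue(maxsize=1)    # shared list of result batches
        results_board.put([])
        results = manager.Queue()                   # single opt mode: raw epoch results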

File diff suppressed because it is too large