434 lines
18 KiB
Python
434 lines
18 KiB
Python
"""
|
|
This module contains the configuration class
|
|
"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from argparse import Namespace
|
|
from typing import Any, Callable, Dict, Optional
|
|
|
|
from jsonschema import Draft4Validator, validators
|
|
from jsonschema.exceptions import ValidationError, best_match
|
|
|
|
from freqtrade import OperationalException, constants
|
|
from freqtrade.exchange import (is_exchange_bad, is_exchange_available,
|
|
is_exchange_officially_supported, available_exchanges)
|
|
from freqtrade.loggers import setup_logging
|
|
from freqtrade.misc import deep_merge_dicts
|
|
from freqtrade.state import RunMode
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _extend_validator(validator_class):
|
|
"""
|
|
Extended validator for the Freqtrade configuration JSON Schema.
|
|
Currently it only handles defaults for subschemas.
|
|
"""
|
|
validate_properties = validator_class.VALIDATORS['properties']
|
|
|
|
def set_defaults(validator, properties, instance, schema):
|
|
for prop, subschema in properties.items():
|
|
if 'default' in subschema:
|
|
instance.setdefault(prop, subschema['default'])
|
|
|
|
for error in validate_properties(
|
|
validator, properties, instance, schema,
|
|
):
|
|
yield error
|
|
|
|
return validators.extend(
|
|
validator_class, {'properties': set_defaults}
|
|
)
|
|
|
|
|
|
FreqtradeValidator = _extend_validator(Draft4Validator)
|
|
|
|
|
|
class Configuration(object):
|
|
"""
|
|
Class to read and init the bot configuration
|
|
Reuse this class for the bot, backtesting, hyperopt and every script that required configuration
|
|
"""
|
|
|
|
def __init__(self, args: Namespace, runmode: RunMode = None) -> None:
|
|
self.args = args
|
|
self.config: Optional[Dict[str, Any]] = None
|
|
self.runmode = runmode
|
|
|
|
def load_config(self) -> Dict[str, Any]:
|
|
"""
|
|
Extract information for sys.argv and load the bot configuration
|
|
:return: Configuration dictionary
|
|
"""
|
|
config: Dict[str, Any] = {}
|
|
# Now expecting a list of config filenames here, not a string
|
|
for path in self.args.config:
|
|
logger.info('Using config: %s ...', path)
|
|
|
|
# Merge config options, overwriting old values
|
|
config = deep_merge_dicts(self._load_config_file(path), config)
|
|
|
|
if 'internals' not in config:
|
|
config['internals'] = {}
|
|
|
|
logger.info('Validating configuration ...')
|
|
self._validate_config_schema(config)
|
|
self._validate_config_consistency(config)
|
|
|
|
# Set strategy if not specified in config and or if it's non default
|
|
if self.args.strategy != constants.DEFAULT_STRATEGY or not config.get('strategy'):
|
|
config.update({'strategy': self.args.strategy})
|
|
|
|
if self.args.strategy_path:
|
|
config.update({'strategy_path': self.args.strategy_path})
|
|
|
|
# Load Common configuration
|
|
config = self._load_common_config(config)
|
|
|
|
# Load Optimize configurations
|
|
config = self._load_optimize_config(config)
|
|
|
|
# Add plotting options if available
|
|
config = self._load_plot_config(config)
|
|
|
|
# Set runmode
|
|
if not self.runmode:
|
|
# Handle real mode, infer dry/live from config
|
|
self.runmode = RunMode.DRY_RUN if config.get('dry_run', True) else RunMode.LIVE
|
|
|
|
config.update({'runmode': self.runmode})
|
|
|
|
return config
|
|
|
|
def _load_config_file(self, path: str) -> Dict[str, Any]:
|
|
"""
|
|
Loads a config file from the given path
|
|
:param path: path as str
|
|
:return: configuration as dictionary
|
|
"""
|
|
try:
|
|
# Read config from stdin if requested in the options
|
|
with open(path) if path != '-' else sys.stdin as file:
|
|
conf = json.load(file)
|
|
except FileNotFoundError:
|
|
raise OperationalException(
|
|
f'Config file "{path}" not found!'
|
|
' Please create a config file or check whether it exists.')
|
|
|
|
return conf
|
|
|
|
def _load_logging_config(self, config: Dict[str, Any]) -> None:
|
|
"""
|
|
Extract information for sys.argv and load logging configuration:
|
|
the -v/--verbose, --logfile options
|
|
"""
|
|
# Log level
|
|
if 'verbosity' in self.args and self.args.verbosity:
|
|
config.update({'verbosity': self.args.verbosity})
|
|
else:
|
|
config.update({'verbosity': 0})
|
|
|
|
if 'logfile' in self.args and self.args.logfile:
|
|
config.update({'logfile': self.args.logfile})
|
|
|
|
setup_logging(config)
|
|
|
|
def _load_common_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Extract information for sys.argv and load common configuration
|
|
:return: configuration as dictionary
|
|
"""
|
|
self._load_logging_config(config)
|
|
|
|
# Support for sd_notify
|
|
if 'sd_notify' in self.args and self.args.sd_notify:
|
|
config['internals'].update({'sd_notify': True})
|
|
|
|
# Add dynamic_whitelist if found
|
|
if 'dynamic_whitelist' in self.args and self.args.dynamic_whitelist:
|
|
# Update to volumePairList (the previous default)
|
|
config['pairlist'] = {'method': 'VolumePairList',
|
|
'config': {'number_assets': self.args.dynamic_whitelist}
|
|
}
|
|
logger.warning(
|
|
'Parameter --dynamic-whitelist has been deprecated, '
|
|
'and will be completely replaced by the whitelist dict in the future. '
|
|
'For now: using dynamically generated whitelist based on VolumePairList. '
|
|
'(not applicable with Backtesting and Hyperopt)'
|
|
)
|
|
|
|
if ('db_url' in self.args and self.args.db_url and
|
|
self.args.db_url != constants.DEFAULT_DB_PROD_URL):
|
|
config.update({'db_url': self.args.db_url})
|
|
logger.info('Parameter --db-url detected ...')
|
|
|
|
if config.get('dry_run', False):
|
|
logger.info('Dry run is enabled')
|
|
if config.get('db_url') in [None, constants.DEFAULT_DB_PROD_URL]:
|
|
# Default to in-memory db for dry_run if not specified
|
|
config['db_url'] = constants.DEFAULT_DB_DRYRUN_URL
|
|
else:
|
|
if not config.get('db_url', None):
|
|
config['db_url'] = constants.DEFAULT_DB_PROD_URL
|
|
logger.info('Dry run is disabled')
|
|
|
|
if config.get('forcebuy_enable', False):
|
|
logger.warning('`forcebuy` RPC message enabled.')
|
|
|
|
# Setting max_open_trades to infinite if -1
|
|
if config.get('max_open_trades') == -1:
|
|
config['max_open_trades'] = float('inf')
|
|
|
|
logger.info(f'Using DB: "{config["db_url"]}"')
|
|
|
|
# Check if the exchange set by the user is supported
|
|
self.check_exchange(config)
|
|
|
|
return config
|
|
|
|
def _create_datadir(self, config: Dict[str, Any], datadir: Optional[str] = None) -> str:
|
|
if not datadir:
|
|
# set datadir
|
|
exchange_name = config.get('exchange', {}).get('name').lower()
|
|
datadir = os.path.join('user_data', 'data', exchange_name)
|
|
|
|
if not os.path.isdir(datadir):
|
|
os.makedirs(datadir)
|
|
logger.info(f'Created data directory: {datadir}')
|
|
return datadir
|
|
|
|
def _args_to_config(self, config: Dict[str, Any], argname: str,
|
|
logstring: str, logfun: Optional[Callable] = None) -> None:
|
|
"""
|
|
:param config: Configuration dictionary
|
|
:param argname: Argumentname in self.args - will be copied to config dict.
|
|
:param logstring: Logging String
|
|
:param logfun: logfun is applied to the configuration entry before passing
|
|
that entry to the log string using .format().
|
|
sample: logfun=len (prints the length of the found
|
|
configuration instead of the content)
|
|
"""
|
|
if argname in self.args and getattr(self.args, argname):
|
|
|
|
config.update({argname: getattr(self.args, argname)})
|
|
if logfun:
|
|
logger.info(logstring.format(logfun(config[argname])))
|
|
else:
|
|
logger.info(logstring.format(config[argname]))
|
|
|
|
def _load_datadir_config(self, config: Dict[str, Any]) -> None:
|
|
"""
|
|
Extract information for sys.argv and load datadir configuration:
|
|
the --datadir option
|
|
"""
|
|
if 'datadir' in self.args and self.args.datadir:
|
|
config.update({'datadir': self._create_datadir(config, self.args.datadir)})
|
|
else:
|
|
config.update({'datadir': self._create_datadir(config, None)})
|
|
logger.info('Using data directory: %s ...', config.get('datadir'))
|
|
|
|
def _load_optimize_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Extract information for sys.argv and load Optimize configuration
|
|
:return: configuration as dictionary
|
|
"""
|
|
|
|
# This will override the strategy configuration
|
|
self._args_to_config(config, argname='ticker_interval',
|
|
logstring='Parameter -i/--ticker-interval detected ... '
|
|
'Using ticker_interval: {} ...')
|
|
|
|
self._args_to_config(config, argname='live',
|
|
logstring='Parameter -l/--live detected ...')
|
|
|
|
self._args_to_config(config, argname='position_stacking',
|
|
logstring='Parameter --enable-position-stacking detected ...')
|
|
|
|
if 'use_max_market_positions' in self.args and not self.args.use_max_market_positions:
|
|
config.update({'use_max_market_positions': False})
|
|
logger.info('Parameter --disable-max-market-positions detected ...')
|
|
logger.info('max_open_trades set to unlimited ...')
|
|
elif 'max_open_trades' in self.args and self.args.max_open_trades:
|
|
config.update({'max_open_trades': self.args.max_open_trades})
|
|
logger.info('Parameter --max_open_trades detected, '
|
|
'overriding max_open_trades to: %s ...', config.get('max_open_trades'))
|
|
else:
|
|
logger.info('Using max_open_trades: %s ...', config.get('max_open_trades'))
|
|
|
|
self._args_to_config(config, argname='stake_amount',
|
|
logstring='Parameter --stake_amount detected, '
|
|
'overriding stake_amount to: {} ...')
|
|
|
|
self._args_to_config(config, argname='timerange',
|
|
logstring='Parameter --timerange detected: {} ...')
|
|
|
|
self._load_datadir_config(config)
|
|
|
|
self._args_to_config(config, argname='refresh_pairs',
|
|
logstring='Parameter -r/--refresh-pairs-cached detected ...')
|
|
|
|
self._args_to_config(config, argname='strategy_list',
|
|
logstring='Using strategy list of {} Strategies', logfun=len)
|
|
|
|
self._args_to_config(config, argname='ticker_interval',
|
|
logstring='Overriding ticker interval with Command line argument')
|
|
|
|
self._args_to_config(config, argname='export',
|
|
logstring='Parameter --export detected: {} ...')
|
|
|
|
self._args_to_config(config, argname='exportfilename',
|
|
logstring='Storing backtest results to {} ...')
|
|
|
|
# Edge section:
|
|
if 'stoploss_range' in self.args and self.args.stoploss_range:
|
|
txt_range = eval(self.args.stoploss_range)
|
|
config['edge'].update({'stoploss_range_min': txt_range[0]})
|
|
config['edge'].update({'stoploss_range_max': txt_range[1]})
|
|
config['edge'].update({'stoploss_range_step': txt_range[2]})
|
|
logger.info('Parameter --stoplosses detected: %s ...', self.args.stoploss_range)
|
|
|
|
# Hyperopt section
|
|
self._args_to_config(config, argname='hyperopt',
|
|
logstring='Using Hyperopt file {}')
|
|
|
|
self._args_to_config(config, argname='epochs',
|
|
logstring='Parameter --epochs detected ... '
|
|
'Will run Hyperopt with for {} epochs ...'
|
|
)
|
|
|
|
self._args_to_config(config, argname='spaces',
|
|
logstring='Parameter -s/--spaces detected: {}')
|
|
|
|
self._args_to_config(config, argname='print_all',
|
|
logstring='Parameter --print-all detected ...')
|
|
|
|
self._args_to_config(config, argname='hyperopt_jobs',
|
|
logstring='Parameter -j/--job-workers detected: {}')
|
|
|
|
self._args_to_config(config, argname='hyperopt_random_state',
|
|
logstring='Parameter --random-state detected: {}')
|
|
|
|
self._args_to_config(config, argname='hyperopt_min_trades',
|
|
logstring='Parameter --min-trades detected: {}')
|
|
|
|
return config
|
|
|
|
def _load_plot_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Extract information for sys.argv Plotting configuration
|
|
:return: configuration as dictionary
|
|
"""
|
|
|
|
self._args_to_config(config, argname='pairs',
|
|
logstring='Using pairs {}')
|
|
|
|
self._args_to_config(config, argname='indicators1',
|
|
logstring='Using indicators1: {}')
|
|
|
|
self._args_to_config(config, argname='indicators2',
|
|
logstring='Using indicators2: {}')
|
|
|
|
self._args_to_config(config, argname='plot_limit',
|
|
logstring='Limiting plot to: {}')
|
|
self._args_to_config(config, argname='trade_source',
|
|
logstring='Using trades from: {}')
|
|
return config
|
|
|
|
def _validate_config_schema(self, conf: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Validate the configuration follow the Config Schema
|
|
:param conf: Config in JSON format
|
|
:return: Returns the config if valid, otherwise throw an exception
|
|
"""
|
|
try:
|
|
FreqtradeValidator(constants.CONF_SCHEMA).validate(conf)
|
|
return conf
|
|
except ValidationError as exception:
|
|
logger.critical(
|
|
'Invalid configuration. See config.json.example. Reason: %s',
|
|
exception
|
|
)
|
|
raise ValidationError(
|
|
best_match(Draft4Validator(constants.CONF_SCHEMA).iter_errors(conf)).message
|
|
)
|
|
|
|
def _validate_config_consistency(self, conf: Dict[str, Any]) -> None:
|
|
"""
|
|
Validate the configuration consistency
|
|
:param conf: Config in JSON format
|
|
:return: Returns None if everything is ok, otherwise throw an OperationalException
|
|
"""
|
|
|
|
# validating trailing stoploss
|
|
self._validate_trailing_stoploss(conf)
|
|
|
|
def _validate_trailing_stoploss(self, conf: Dict[str, Any]) -> None:
|
|
# Skip if trailing stoploss is not activated
|
|
if not conf.get('trailing_stop', False):
|
|
return
|
|
|
|
tsl_positive = float(conf.get('trailing_stop_positive', 0))
|
|
tsl_offset = float(conf.get('trailing_stop_positive_offset', 0))
|
|
tsl_only_offset = conf.get('trailing_only_offset_is_reached', False)
|
|
|
|
if tsl_only_offset:
|
|
if tsl_positive == 0.0:
|
|
raise OperationalException(
|
|
f'The config trailing_only_offset_is_reached needs '
|
|
'trailing_stop_positive_offset to be more than 0 in your config.')
|
|
if tsl_positive > 0 and 0 < tsl_offset <= tsl_positive:
|
|
raise OperationalException(
|
|
f'The config trailing_stop_positive_offset needs '
|
|
'to be greater than trailing_stop_positive_offset in your config.')
|
|
|
|
def get_config(self) -> Dict[str, Any]:
|
|
"""
|
|
Return the config. Use this method to get the bot config
|
|
:return: Dict: Bot config
|
|
"""
|
|
if self.config is None:
|
|
self.config = self.load_config()
|
|
|
|
return self.config
|
|
|
|
def check_exchange(self, config: Dict[str, Any], check_for_bad: bool = True) -> bool:
|
|
"""
|
|
Check if the exchange name in the config file is supported by Freqtrade
|
|
:param check_for_bad: if True, check the exchange against the list of known 'bad'
|
|
exchanges
|
|
:return: False if exchange is 'bad', i.e. is known to work with the bot with
|
|
critical issues or does not work at all, crashes, etc. True otherwise.
|
|
raises an exception if the exchange if not supported by ccxt
|
|
and thus is not known for the Freqtrade at all.
|
|
"""
|
|
logger.info("Checking exchange...")
|
|
|
|
exchange = config.get('exchange', {}).get('name').lower()
|
|
if not is_exchange_available(exchange):
|
|
raise OperationalException(
|
|
f'Exchange "{exchange}" is not supported by ccxt '
|
|
f'and therefore not available for the bot.\n'
|
|
f'The following exchanges are supported by ccxt: '
|
|
f'{", ".join(available_exchanges())}'
|
|
)
|
|
|
|
if check_for_bad and is_exchange_bad(exchange):
|
|
logger.warning(f'Exchange "{exchange}" is known to not work with the bot yet. '
|
|
f'Use it only for development and testing purposes.')
|
|
return False
|
|
|
|
if is_exchange_officially_supported(exchange):
|
|
logger.info(f'Exchange "{exchange}" is officially supported '
|
|
f'by the Freqtrade development team.')
|
|
else:
|
|
logger.warning(f'Exchange "{exchange}" is supported by ccxt '
|
|
f'and therefore available for the bot but not officially supported '
|
|
f'by the Freqtrade development team. '
|
|
f'It may work flawlessly (please report back) or have serious issues. '
|
|
f'Use it at your own discretion.')
|
|
|
|
return True
|