Seperate a few commands into specific files

This commit is contained in:
Matthias 2020-01-26 13:08:58 +01:00
parent 6e85280467
commit 926bf07df1
6 changed files with 419 additions and 362 deletions

View File

@ -1,6 +1,10 @@
from .utils import (setup_utils_configuration, start_create_userdir, # noqa: F401
start_download_data, start_hyperopt_list,
start_hyperopt_show, start_list_exchanges,
start_list_markets, start_list_strategies,
start_list_timeframes, start_new_hyperopt,
start_new_strategy, start_test_pairlist, start_trading)
from .hyperopt_commands import (start_hyperopt_list, start_hyperopt_show) # noqa: 401
from .list_commands import (start_list_exchanges, # noqa: F401
start_list_markets, start_list_strategies,
start_list_timeframes)
from .utils import setup_utils_configuration # noqa: F401
from .utils import (start_download_data, # noqa: F401
start_test_pairlist)
from .deploy_commands import (start_new_hyperopt, start_new_strategy, start_create_userdir) # noqa: F401
from .trade_commands import start_trading # noqa: F401

View File

@ -0,0 +1,113 @@
import logging
import sys
from pathlib import Path
from typing import Any, Dict
from freqtrade.configuration.directory_operations import (copy_sample_files,
create_userdata_dir)
from freqtrade.constants import USERPATH_HYPEROPTS, USERPATH_STRATEGY
from freqtrade.exceptions import OperationalException
from freqtrade.misc import render_template
from freqtrade.state import RunMode
from .utils import setup_utils_configuration
logger = logging.getLogger(__name__)
def start_create_userdir(args: Dict[str, Any]) -> None:
"""
Create "user_data" directory to contain user data strategies, hyperopt, ...)
:param args: Cli args from Arguments()
:return: None
"""
if "user_data_dir" in args and args["user_data_dir"]:
userdir = create_userdata_dir(args["user_data_dir"], create_dir=True)
copy_sample_files(userdir, overwrite=args["reset"])
else:
logger.warning("`create-userdir` requires --userdir to be set.")
sys.exit(1)
def deploy_new_strategy(strategy_name, strategy_path: Path, subtemplate: str):
"""
Deploy new strategy from template to strategy_path
"""
indicators = render_template(templatefile=f"subtemplates/indicators_{subtemplate}.j2",)
buy_trend = render_template(templatefile=f"subtemplates/buy_trend_{subtemplate}.j2",)
sell_trend = render_template(templatefile=f"subtemplates/sell_trend_{subtemplate}.j2",)
plot_config = render_template(templatefile=f"subtemplates/plot_config_{subtemplate}.j2",)
strategy_text = render_template(templatefile='base_strategy.py.j2',
arguments={"strategy": strategy_name,
"indicators": indicators,
"buy_trend": buy_trend,
"sell_trend": sell_trend,
"plot_config": plot_config,
})
logger.info(f"Writing strategy to `{strategy_path}`.")
strategy_path.write_text(strategy_text)
def start_new_strategy(args: Dict[str, Any]) -> None:
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
if "strategy" in args and args["strategy"]:
if args["strategy"] == "DefaultStrategy":
raise OperationalException("DefaultStrategy is not allowed as name.")
new_path = config['user_data_dir'] / USERPATH_STRATEGY / (args["strategy"] + ".py")
if new_path.exists():
raise OperationalException(f"`{new_path}` already exists. "
"Please choose another Strategy Name.")
deploy_new_strategy(args['strategy'], new_path, args['template'])
else:
raise OperationalException("`new-strategy` requires --strategy to be set.")
def deploy_new_hyperopt(hyperopt_name, hyperopt_path: Path, subtemplate: str):
"""
Deploys a new hyperopt template to hyperopt_path
"""
buy_guards = render_template(
templatefile=f"subtemplates/hyperopt_buy_guards_{subtemplate}.j2",)
sell_guards = render_template(
templatefile=f"subtemplates/hyperopt_sell_guards_{subtemplate}.j2",)
buy_space = render_template(
templatefile=f"subtemplates/hyperopt_buy_space_{subtemplate}.j2",)
sell_space = render_template(
templatefile=f"subtemplates/hyperopt_sell_space_{subtemplate}.j2",)
strategy_text = render_template(templatefile='base_hyperopt.py.j2',
arguments={"hyperopt": hyperopt_name,
"buy_guards": buy_guards,
"sell_guards": sell_guards,
"buy_space": buy_space,
"sell_space": sell_space,
})
logger.info(f"Writing hyperopt to `{hyperopt_path}`.")
hyperopt_path.write_text(strategy_text)
def start_new_hyperopt(args: Dict[str, Any]) -> None:
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
if "hyperopt" in args and args["hyperopt"]:
if args["hyperopt"] == "DefaultHyperopt":
raise OperationalException("DefaultHyperopt is not allowed as name.")
new_path = config['user_data_dir'] / USERPATH_HYPEROPTS / (args["hyperopt"] + ".py")
if new_path.exists():
raise OperationalException(f"`{new_path}` already exists. "
"Please choose another Strategy Name.")
deploy_new_hyperopt(args['hyperopt'], new_path, args['template'])
else:
raise OperationalException("`new-hyperopt` requires --hyperopt to be set.")

View File

@ -0,0 +1,114 @@
import logging
from operator import itemgetter
from typing import Any, Dict, List
from colorama import init as colorama_init
from .utils import setup_utils_configuration
from freqtrade.exceptions import OperationalException
from freqtrade.state import RunMode
logger = logging.getLogger(__name__)
def start_hyperopt_list(args: Dict[str, Any]) -> None:
"""
List hyperopt epochs previously evaluated
"""
from freqtrade.optimize.hyperopt import Hyperopt
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
only_best = config.get('hyperopt_list_best', False)
only_profitable = config.get('hyperopt_list_profitable', False)
print_colorized = config.get('print_colorized', False)
print_json = config.get('print_json', False)
no_details = config.get('hyperopt_list_no_details', False)
no_header = False
trials_file = (config['user_data_dir'] /
'hyperopt_results' / 'hyperopt_results.pickle')
# Previous evaluations
trials = Hyperopt.load_previous_results(trials_file)
total_epochs = len(trials)
trials = _hyperopt_filter_trials(trials, only_best, only_profitable)
# TODO: fetch the interval for epochs to print from the cli option
epoch_start, epoch_stop = 0, None
if print_colorized:
colorama_init(autoreset=True)
try:
# Human-friendly indexes used here (starting from 1)
for val in trials[epoch_start:epoch_stop]:
Hyperopt.print_results_explanation(val, total_epochs, not only_best, print_colorized)
except KeyboardInterrupt:
print('User interrupted..')
if trials and not no_details:
sorted_trials = sorted(trials, key=itemgetter('loss'))
results = sorted_trials[0]
Hyperopt.print_epoch_details(results, total_epochs, print_json, no_header)
def start_hyperopt_show(args: Dict[str, Any]) -> None:
"""
Show details of a hyperopt epoch previously evaluated
"""
from freqtrade.optimize.hyperopt import Hyperopt
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
only_best = config.get('hyperopt_list_best', False)
only_profitable = config.get('hyperopt_list_profitable', False)
no_header = config.get('hyperopt_show_no_header', False)
trials_file = (config['user_data_dir'] /
'hyperopt_results' / 'hyperopt_results.pickle')
# Previous evaluations
trials = Hyperopt.load_previous_results(trials_file)
total_epochs = len(trials)
trials = _hyperopt_filter_trials(trials, only_best, only_profitable)
trials_epochs = len(trials)
n = config.get('hyperopt_show_index', -1)
if n > trials_epochs:
raise OperationalException(
f"The index of the epoch to show should be less than {trials_epochs + 1}.")
if n < -trials_epochs:
raise OperationalException(
f"The index of the epoch to show should be greater than {-trials_epochs - 1}.")
# Translate epoch index from human-readable format to pythonic
if n > 0:
n -= 1
print_json = config.get('print_json', False)
if trials:
val = trials[n]
Hyperopt.print_epoch_details(val, total_epochs, print_json, no_header,
header_str="Epoch details")
def _hyperopt_filter_trials(trials: List, only_best: bool, only_profitable: bool) -> List:
"""
Filter our items from the list of hyperopt results
"""
if only_best:
trials = [x for x in trials if x['is_best']]
if only_profitable:
trials = [x for x in trials if x['results_metrics']['profit'] > 0]
logger.info(f"{len(trials)} " +
("best " if only_best else "") +
("profitable " if only_profitable else "") +
"epochs found.")
return trials

View File

@ -0,0 +1,157 @@
import csv
import logging
import sys
from collections import OrderedDict
from pathlib import Path
from typing import Any, Dict
import rapidjson
from tabulate import tabulate
from freqtrade.constants import USERPATH_STRATEGY
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import (available_exchanges, ccxt_exchanges,
market_is_active, symbol_is_pair)
from freqtrade.misc import plural
from freqtrade.resolvers import ExchangeResolver, StrategyResolver
from freqtrade.state import RunMode
from .utils import setup_utils_configuration
logger = logging.getLogger(__name__)
def start_list_exchanges(args: Dict[str, Any]) -> None:
"""
Print available exchanges
:param args: Cli args from Arguments()
:return: None
"""
exchanges = ccxt_exchanges() if args['list_exchanges_all'] else available_exchanges()
if args['print_one_column']:
print('\n'.join(exchanges))
else:
if args['list_exchanges_all']:
print(f"All exchanges supported by the ccxt library: {', '.join(exchanges)}")
else:
print(f"Exchanges available for Freqtrade: {', '.join(exchanges)}")
def start_list_strategies(args: Dict[str, Any]) -> None:
"""
Print Strategies available in a directory
"""
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
directory = Path(config.get('strategy_path', config['user_data_dir'] / USERPATH_STRATEGY))
strategies = StrategyResolver.search_all_objects(directory)
# Sort alphabetically
strategies = sorted(strategies, key=lambda x: x['name'])
strats_to_print = [{'name': s['name'], 'location': s['location'].name} for s in strategies]
if args['print_one_column']:
print('\n'.join([s['name'] for s in strategies]))
else:
print(tabulate(strats_to_print, headers='keys', tablefmt='pipe'))
def start_list_timeframes(args: Dict[str, Any]) -> None:
"""
Print ticker intervals (timeframes) available on Exchange
"""
config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)
# Do not use ticker_interval set in the config
config['ticker_interval'] = None
# Init exchange
exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)
if args['print_one_column']:
print('\n'.join(exchange.timeframes))
else:
print(f"Timeframes available for the exchange `{exchange.name}`: "
f"{', '.join(exchange.timeframes)}")
def start_list_markets(args: Dict[str, Any], pairs_only: bool = False) -> None:
"""
Print pairs/markets on the exchange
:param args: Cli args from Arguments()
:param pairs_only: if True print only pairs, otherwise print all instruments (markets)
:return: None
"""
config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)
# Init exchange
exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)
# By default only active pairs/markets are to be shown
active_only = not args.get('list_pairs_all', False)
base_currencies = args.get('base_currencies', [])
quote_currencies = args.get('quote_currencies', [])
try:
pairs = exchange.get_markets(base_currencies=base_currencies,
quote_currencies=quote_currencies,
pairs_only=pairs_only,
active_only=active_only)
# Sort the pairs/markets by symbol
pairs = OrderedDict(sorted(pairs.items()))
except Exception as e:
raise OperationalException(f"Cannot get markets. Reason: {e}") from e
else:
summary_str = ((f"Exchange {exchange.name} has {len(pairs)} ") +
("active " if active_only else "") +
(plural(len(pairs), "pair" if pairs_only else "market")) +
(f" with {', '.join(base_currencies)} as base "
f"{plural(len(base_currencies), 'currency', 'currencies')}"
if base_currencies else "") +
(" and" if base_currencies and quote_currencies else "") +
(f" with {', '.join(quote_currencies)} as quote "
f"{plural(len(quote_currencies), 'currency', 'currencies')}"
if quote_currencies else ""))
headers = ["Id", "Symbol", "Base", "Quote", "Active",
*(['Is pair'] if not pairs_only else [])]
tabular_data = []
for _, v in pairs.items():
tabular_data.append({'Id': v['id'], 'Symbol': v['symbol'],
'Base': v['base'], 'Quote': v['quote'],
'Active': market_is_active(v),
**({'Is pair': symbol_is_pair(v['symbol'])}
if not pairs_only else {})})
if (args.get('print_one_column', False) or
args.get('list_pairs_print_json', False) or
args.get('print_csv', False)):
# Print summary string in the log in case of machine-readable
# regular formats.
logger.info(f"{summary_str}.")
else:
# Print empty string separating leading logs and output in case of
# human-readable formats.
print()
if len(pairs):
if args.get('print_list', False):
# print data as a list, with human-readable summary
print(f"{summary_str}: {', '.join(pairs.keys())}.")
elif args.get('print_one_column', False):
print('\n'.join(pairs.keys()))
elif args.get('list_pairs_print_json', False):
print(rapidjson.dumps(list(pairs.keys()), default=str))
elif args.get('print_csv', False):
writer = csv.DictWriter(sys.stdout, fieldnames=headers)
writer.writeheader()
writer.writerows(tabular_data)
else:
# print data as a table, with the human-readable summary
print(f"{summary_str}:")
print(tabulate(tabular_data, headers='keys', tablefmt='pipe'))
elif not (args.get('print_one_column', False) or
args.get('list_pairs_print_json', False) or
args.get('print_csv', False)):
print(f"{summary_str}.")

View File

@ -0,0 +1,25 @@
import logging
from typing import Any, Dict
logger = logging.getLogger(__name__)
def start_trading(args: Dict[str, Any]) -> int:
"""
Main entry point for trading mode
"""
from freqtrade.worker import Worker
# Load and run worker
worker = None
try:
worker = Worker(args)
worker.run()
except KeyboardInterrupt:
logger.info('SIGINT received, aborting ...')
finally:
if worker:
logger.info("worker found ... calling exit")
worker.exit()
return 0

View File

@ -46,139 +46,6 @@ def setup_utils_configuration(args: Dict[str, Any], method: RunMode) -> Dict[str
return config
def start_trading(args: Dict[str, Any]) -> int:
"""
Main entry point for trading mode
"""
from freqtrade.worker import Worker
# Load and run worker
worker = None
try:
worker = Worker(args)
worker.run()
except KeyboardInterrupt:
logger.info('SIGINT received, aborting ...')
finally:
if worker:
logger.info("worker found ... calling exit")
worker.exit()
return 0
def start_list_exchanges(args: Dict[str, Any]) -> None:
"""
Print available exchanges
:param args: Cli args from Arguments()
:return: None
"""
exchanges = ccxt_exchanges() if args['list_exchanges_all'] else available_exchanges()
if args['print_one_column']:
print('\n'.join(exchanges))
else:
if args['list_exchanges_all']:
print(f"All exchanges supported by the ccxt library: {', '.join(exchanges)}")
else:
print(f"Exchanges available for Freqtrade: {', '.join(exchanges)}")
def start_create_userdir(args: Dict[str, Any]) -> None:
"""
Create "user_data" directory to contain user data strategies, hyperopt, ...)
:param args: Cli args from Arguments()
:return: None
"""
if "user_data_dir" in args and args["user_data_dir"]:
userdir = create_userdata_dir(args["user_data_dir"], create_dir=True)
copy_sample_files(userdir, overwrite=args["reset"])
else:
logger.warning("`create-userdir` requires --userdir to be set.")
sys.exit(1)
def deploy_new_strategy(strategy_name, strategy_path: Path, subtemplate: str):
"""
Deploy new strategy from template to strategy_path
"""
indicators = render_template(templatefile=f"subtemplates/indicators_{subtemplate}.j2",)
buy_trend = render_template(templatefile=f"subtemplates/buy_trend_{subtemplate}.j2",)
sell_trend = render_template(templatefile=f"subtemplates/sell_trend_{subtemplate}.j2",)
plot_config = render_template(templatefile=f"subtemplates/plot_config_{subtemplate}.j2",)
strategy_text = render_template(templatefile='base_strategy.py.j2',
arguments={"strategy": strategy_name,
"indicators": indicators,
"buy_trend": buy_trend,
"sell_trend": sell_trend,
"plot_config": plot_config,
})
logger.info(f"Writing strategy to `{strategy_path}`.")
strategy_path.write_text(strategy_text)
def start_new_strategy(args: Dict[str, Any]) -> None:
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
if "strategy" in args and args["strategy"]:
if args["strategy"] == "DefaultStrategy":
raise OperationalException("DefaultStrategy is not allowed as name.")
new_path = config['user_data_dir'] / USERPATH_STRATEGY / (args["strategy"] + ".py")
if new_path.exists():
raise OperationalException(f"`{new_path}` already exists. "
"Please choose another Strategy Name.")
deploy_new_strategy(args['strategy'], new_path, args['template'])
else:
raise OperationalException("`new-strategy` requires --strategy to be set.")
def deploy_new_hyperopt(hyperopt_name, hyperopt_path: Path, subtemplate: str):
"""
Deploys a new hyperopt template to hyperopt_path
"""
buy_guards = render_template(
templatefile=f"subtemplates/hyperopt_buy_guards_{subtemplate}.j2",)
sell_guards = render_template(
templatefile=f"subtemplates/hyperopt_sell_guards_{subtemplate}.j2",)
buy_space = render_template(
templatefile=f"subtemplates/hyperopt_buy_space_{subtemplate}.j2",)
sell_space = render_template(
templatefile=f"subtemplates/hyperopt_sell_space_{subtemplate}.j2",)
strategy_text = render_template(templatefile='base_hyperopt.py.j2',
arguments={"hyperopt": hyperopt_name,
"buy_guards": buy_guards,
"sell_guards": sell_guards,
"buy_space": buy_space,
"sell_space": sell_space,
})
logger.info(f"Writing hyperopt to `{hyperopt_path}`.")
hyperopt_path.write_text(strategy_text)
def start_new_hyperopt(args: Dict[str, Any]) -> None:
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
if "hyperopt" in args and args["hyperopt"]:
if args["hyperopt"] == "DefaultHyperopt":
raise OperationalException("DefaultHyperopt is not allowed as name.")
new_path = config['user_data_dir'] / USERPATH_HYPEROPTS / (args["hyperopt"] + ".py")
if new_path.exists():
raise OperationalException(f"`{new_path}` already exists. "
"Please choose another Strategy Name.")
deploy_new_hyperopt(args['hyperopt'], new_path, args['template'])
else:
raise OperationalException("`new-hyperopt` requires --hyperopt to be set.")
def start_download_data(args: Dict[str, Any]) -> None:
"""
Download data (former download_backtest_data.py script)
@ -227,126 +94,6 @@ def start_download_data(args: Dict[str, Any]) -> None:
f"on exchange {exchange.name}.")
def start_list_strategies(args: Dict[str, Any]) -> None:
"""
Print Strategies available in a directory
"""
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
directory = Path(config.get('strategy_path', config['user_data_dir'] / USERPATH_STRATEGY))
strategies = StrategyResolver.search_all_objects(directory)
# Sort alphabetically
strategies = sorted(strategies, key=lambda x: x['name'])
strats_to_print = [{'name': s['name'], 'location': s['location'].name} for s in strategies]
if args['print_one_column']:
print('\n'.join([s['name'] for s in strategies]))
else:
print(tabulate(strats_to_print, headers='keys', tablefmt='pipe'))
def start_list_timeframes(args: Dict[str, Any]) -> None:
"""
Print ticker intervals (timeframes) available on Exchange
"""
config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)
# Do not use ticker_interval set in the config
config['ticker_interval'] = None
# Init exchange
exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)
if args['print_one_column']:
print('\n'.join(exchange.timeframes))
else:
print(f"Timeframes available for the exchange `{exchange.name}`: "
f"{', '.join(exchange.timeframes)}")
def start_list_markets(args: Dict[str, Any], pairs_only: bool = False) -> None:
"""
Print pairs/markets on the exchange
:param args: Cli args from Arguments()
:param pairs_only: if True print only pairs, otherwise print all instruments (markets)
:return: None
"""
config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)
# Init exchange
exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)
# By default only active pairs/markets are to be shown
active_only = not args.get('list_pairs_all', False)
base_currencies = args.get('base_currencies', [])
quote_currencies = args.get('quote_currencies', [])
try:
pairs = exchange.get_markets(base_currencies=base_currencies,
quote_currencies=quote_currencies,
pairs_only=pairs_only,
active_only=active_only)
# Sort the pairs/markets by symbol
pairs = OrderedDict(sorted(pairs.items()))
except Exception as e:
raise OperationalException(f"Cannot get markets. Reason: {e}") from e
else:
summary_str = ((f"Exchange {exchange.name} has {len(pairs)} ") +
("active " if active_only else "") +
(plural(len(pairs), "pair" if pairs_only else "market")) +
(f" with {', '.join(base_currencies)} as base "
f"{plural(len(base_currencies), 'currency', 'currencies')}"
if base_currencies else "") +
(" and" if base_currencies and quote_currencies else "") +
(f" with {', '.join(quote_currencies)} as quote "
f"{plural(len(quote_currencies), 'currency', 'currencies')}"
if quote_currencies else ""))
headers = ["Id", "Symbol", "Base", "Quote", "Active",
*(['Is pair'] if not pairs_only else [])]
tabular_data = []
for _, v in pairs.items():
tabular_data.append({'Id': v['id'], 'Symbol': v['symbol'],
'Base': v['base'], 'Quote': v['quote'],
'Active': market_is_active(v),
**({'Is pair': symbol_is_pair(v['symbol'])}
if not pairs_only else {})})
if (args.get('print_one_column', False) or
args.get('list_pairs_print_json', False) or
args.get('print_csv', False)):
# Print summary string in the log in case of machine-readable
# regular formats.
logger.info(f"{summary_str}.")
else:
# Print empty string separating leading logs and output in case of
# human-readable formats.
print()
if len(pairs):
if args.get('print_list', False):
# print data as a list, with human-readable summary
print(f"{summary_str}: {', '.join(pairs.keys())}.")
elif args.get('print_one_column', False):
print('\n'.join(pairs.keys()))
elif args.get('list_pairs_print_json', False):
print(rapidjson.dumps(list(pairs.keys()), default=str))
elif args.get('print_csv', False):
writer = csv.DictWriter(sys.stdout, fieldnames=headers)
writer.writeheader()
writer.writerows(tabular_data)
else:
# print data as a table, with the human-readable summary
print(f"{summary_str}:")
print(tabulate(tabular_data, headers='keys', tablefmt='pipe'))
elif not (args.get('print_one_column', False) or
args.get('list_pairs_print_json', False) or
args.get('print_csv', False)):
print(f"{summary_str}.")
def start_test_pairlist(args: Dict[str, Any]) -> None:
"""
Test Pairlist configuration
@ -377,106 +124,3 @@ def start_test_pairlist(args: Dict[str, Any]) -> None:
print(rapidjson.dumps(list(pairlist), default=str))
else:
print(pairlist)
def start_hyperopt_list(args: Dict[str, Any]) -> None:
"""
List hyperopt epochs previously evaluated
"""
from freqtrade.optimize.hyperopt import Hyperopt
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
only_best = config.get('hyperopt_list_best', False)
only_profitable = config.get('hyperopt_list_profitable', False)
print_colorized = config.get('print_colorized', False)
print_json = config.get('print_json', False)
no_details = config.get('hyperopt_list_no_details', False)
no_header = False
trials_file = (config['user_data_dir'] /
'hyperopt_results' / 'hyperopt_results.pickle')
# Previous evaluations
trials = Hyperopt.load_previous_results(trials_file)
total_epochs = len(trials)
trials = _hyperopt_filter_trials(trials, only_best, only_profitable)
# TODO: fetch the interval for epochs to print from the cli option
epoch_start, epoch_stop = 0, None
if print_colorized:
colorama_init(autoreset=True)
try:
# Human-friendly indexes used here (starting from 1)
for val in trials[epoch_start:epoch_stop]:
Hyperopt.print_results_explanation(val, total_epochs, not only_best, print_colorized)
except KeyboardInterrupt:
print('User interrupted..')
if trials and not no_details:
sorted_trials = sorted(trials, key=itemgetter('loss'))
results = sorted_trials[0]
Hyperopt.print_epoch_details(results, total_epochs, print_json, no_header)
def start_hyperopt_show(args: Dict[str, Any]) -> None:
"""
Show details of a hyperopt epoch previously evaluated
"""
from freqtrade.optimize.hyperopt import Hyperopt
config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)
only_best = config.get('hyperopt_list_best', False)
only_profitable = config.get('hyperopt_list_profitable', False)
no_header = config.get('hyperopt_show_no_header', False)
trials_file = (config['user_data_dir'] /
'hyperopt_results' / 'hyperopt_results.pickle')
# Previous evaluations
trials = Hyperopt.load_previous_results(trials_file)
total_epochs = len(trials)
trials = _hyperopt_filter_trials(trials, only_best, only_profitable)
trials_epochs = len(trials)
n = config.get('hyperopt_show_index', -1)
if n > trials_epochs:
raise OperationalException(
f"The index of the epoch to show should be less than {trials_epochs + 1}.")
if n < -trials_epochs:
raise OperationalException(
f"The index of the epoch to show should be greater than {-trials_epochs - 1}.")
# Translate epoch index from human-readable format to pythonic
if n > 0:
n -= 1
print_json = config.get('print_json', False)
if trials:
val = trials[n]
Hyperopt.print_epoch_details(val, total_epochs, print_json, no_header,
header_str="Epoch details")
def _hyperopt_filter_trials(trials: List, only_best: bool, only_profitable: bool) -> List:
"""
Filter our items from the list of hyperopt results
"""
if only_best:
trials = [x for x in trials if x['is_best']]
if only_profitable:
trials = [x for x in trials if x['results_metrics']['profit'] > 0]
logger.info(f"{len(trials)} " +
("best " if only_best else "") +
("profitable " if only_profitable else "") +
"epochs found.")
return trials