stable/freqtrade/utils.py

481 lines
18 KiB
Python

import csv
import logging
import sys
from collections import OrderedDict
from operator import itemgetter
from pathlib import Path
from typing import Any, Dict, List
import arrow
import rapidjson
from colorama import init as colorama_init
from tabulate import tabulate
from freqtrade.configuration import (Configuration, TimeRange,
remove_credentials,
validate_config_consistency)
from freqtrade.configuration.directory_operations import (copy_sample_files,
create_userdata_dir)
from freqtrade.constants import USERPATH_HYPEROPTS, USERPATH_STRATEGY
from freqtrade.data.history import (convert_trades_to_ohlcv,
refresh_backtest_ohlcv_data,
refresh_backtest_trades_data)
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import (available_exchanges, ccxt_exchanges,
market_is_active, symbol_is_pair)
from freqtrade.misc import plural, render_template
from freqtrade.resolvers import ExchangeResolver, StrategyResolver
from freqtrade.state import RunMode
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
def setup_utils_configuration(args: Dict[str, Any], method: RunMode) -> Dict[str, Any]:
    """
    Build the configuration dictionary used by the utils subcommands.

    The loaded configuration is passed through remove_credentials() and
    validate_config_consistency() before it is returned.
    :param args: Cli args from Arguments()
    :param method: RunMode handed over to Configuration
    :return: Configuration
    """
    config = Configuration(args, method).get_config()

    # Utility subcommands must not work with Exchange credentials
    remove_credentials(config)
    validate_config_consistency(config)

    return config
def start_trading(args: Dict[str, Any]) -> int:
    """
    Entry point of the trading mode: create a Worker and run it
    :param args: Cli args from Arguments()
    :return: always 0
    """
    # Imported locally, only trading mode needs the worker
    from freqtrade.worker import Worker

    trading_worker = None
    try:
        trading_worker = Worker(args)
        trading_worker.run()
    except KeyboardInterrupt:
        logger.info('SIGINT received, aborting ...')
    finally:
        # Stays None when Worker() itself failed - nothing to shut down then
        if trading_worker:
            logger.info("worker found ... calling exit")
            trading_worker.exit()

    return 0
def start_list_exchanges(args: Dict[str, Any]) -> None:
    """
    Print the exchanges, either all ccxt exchanges or only the available ones
    :param args: Cli args from Arguments()
    :return: None
    """
    list_all = args['list_exchanges_all']
    exchanges = ccxt_exchanges() if list_all else available_exchanges()

    if args['print_one_column']:
        # One exchange per line, without any headline
        print('\n'.join(exchanges))
        return

    if list_all:
        print(f"All exchanges supported by the ccxt library: {', '.join(exchanges)}")
        return

    print(f"Exchanges available for Freqtrade: {', '.join(exchanges)}")
def start_create_userdir(args: Dict[str, Any]) -> None:
    """
    Create the "user_data" directory (for user data: strategies, hyperopt, ...)
    and copy the sample files into it
    :param args: Cli args from Arguments()
    :return: None
    """
    if not args.get("user_data_dir"):
        # Nothing can be created without a target directory
        logger.warning("`create-userdir` requires --userdir to be set.")
        sys.exit(1)
    else:
        userdir = create_userdata_dir(args["user_data_dir"], create_dir=True)
        copy_sample_files(userdir, overwrite=args["reset"])
def deploy_new_strategy(strategy_name, strategy_path: Path, subtemplate: str):
    """
    Render the strategy template and write the result to strategy_path
    :param strategy_name: handed to the base template as "strategy"
    :param strategy_path: file the rendered text is written to
    :param subtemplate: selects which subtemplate files are rendered
    """
    # The subtemplates are rendered first, in the order they are listed here
    template_arguments = {
        "strategy": strategy_name,
        "indicators": render_template(
            templatefile=f"subtemplates/indicators_{subtemplate}.j2",),
        "buy_trend": render_template(
            templatefile=f"subtemplates/buy_trend_{subtemplate}.j2",),
        "sell_trend": render_template(
            templatefile=f"subtemplates/sell_trend_{subtemplate}.j2",),
    }
    strategy_text = render_template(templatefile='base_strategy.py.j2',
                                    arguments=template_arguments)

    logger.info(f"Writing strategy to `{strategy_path}`.")
    strategy_path.write_text(strategy_text)
def start_new_strategy(args: Dict[str, Any]) -> None:
    """
    Create a new strategy file from the templates
    :param args: Cli args from Arguments(), "strategy" and "template" are used
    :return: None
    """
    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    name = args.get("strategy")
    if not name:
        raise OperationalException("`new-strategy` requires --strategy to be set.")
    if name == "DefaultStrategy":
        raise OperationalException("DefaultStrategy is not allowed as name.")

    new_path = config['user_data_dir'] / USERPATH_STRATEGY / (name + ".py")
    if new_path.exists():
        # Existing files are never overwritten
        raise OperationalException(f"`{new_path}` already exists. "
                                   "Please choose another Strategy Name.")

    deploy_new_strategy(name, new_path, args['template'])
def deploy_new_hyperopt(hyperopt_name, hyperopt_path: Path, subtemplate: str):
    """
    Render the hyperopt template and write the result to hyperopt_path
    :param hyperopt_name: handed to the base template as "hyperopt"
    :param hyperopt_path: file the rendered text is written to
    :param subtemplate: selects which subtemplate files are rendered
    """
    # The subtemplates are rendered first, in the order they are listed here
    template_arguments = {
        "hyperopt": hyperopt_name,
        "buy_guards": render_template(
            templatefile=f"subtemplates/hyperopt_buy_guards_{subtemplate}.j2",),
        "sell_guards": render_template(
            templatefile=f"subtemplates/hyperopt_sell_guards_{subtemplate}.j2",),
        "buy_space": render_template(
            templatefile=f"subtemplates/hyperopt_buy_space_{subtemplate}.j2",),
        "sell_space": render_template(
            templatefile=f"subtemplates/hyperopt_sell_space_{subtemplate}.j2",),
    }
    hyperopt_text = render_template(templatefile='base_hyperopt.py.j2',
                                    arguments=template_arguments)

    logger.info(f"Writing hyperopt to `{hyperopt_path}`.")
    hyperopt_path.write_text(hyperopt_text)
def start_new_hyperopt(args: Dict[str, Any]) -> None:
    """
    Create a new hyperopt file from the templates
    :param args: Cli args from Arguments(), "hyperopt" and "template" are used
    :return: None
    :raises OperationalException: if no hyperopt name is given, the name is
        "DefaultHyperopt", or the target file already exists
    """
    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    if "hyperopt" in args and args["hyperopt"]:
        if args["hyperopt"] == "DefaultHyperopt":
            raise OperationalException("DefaultHyperopt is not allowed as name.")

        new_path = config['user_data_dir'] / USERPATH_HYPEROPTS / (args["hyperopt"] + ".py")

        if new_path.exists():
            # Existing files are never overwritten. The message names the
            # hyperopt (it used to say "Strategy Name" by mistake).
            raise OperationalException(f"`{new_path}` already exists. "
                                       "Please choose another Hyperopt Name.")
        deploy_new_hyperopt(args['hyperopt'], new_path, args['template'])
    else:
        raise OperationalException("`new-hyperopt` requires --hyperopt to be set.")
def start_download_data(args: Dict[str, Any]) -> None:
    """
    Download data (former download_backtest_data.py script)
    :param args: Cli args from Arguments()
    :return: None
    """
    config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)

    if 'days' in config:
        # Open-ended timerange starting config['days'] days before now (UTC)
        time_since = arrow.utcnow().shift(days=-config['days']).strftime("%Y%m%d")
        timerange = TimeRange.parse_timerange(f'{time_since}-')
    else:
        timerange = TimeRange()

    if 'pairs' not in config:
        raise OperationalException(
            "Downloading data requires a list of pairs. "
            "Please check the documentation on how to configure this.")

    logger.info(f'About to download pairs: {config["pairs"]}, '
                f'intervals: {config["timeframes"]} to {config["datadir"]}')

    pairs_not_available: List[str] = []

    # Init exchange
    exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config)

    # Keyword arguments shared by all download / conversion calls below
    shared_kwargs = {
        'pairs': config["pairs"],
        'datadir': config['datadir'],
        'timerange': timerange,
        'erase': config.get("erase"),
    }
    try:
        if config.get('download_trades'):
            pairs_not_available = refresh_backtest_trades_data(exchange, **shared_kwargs)

            # Convert downloaded trade data to different timeframes
            convert_trades_to_ohlcv(timeframes=config["timeframes"], **shared_kwargs)
        else:
            pairs_not_available = refresh_backtest_ohlcv_data(
                exchange, timeframes=config["timeframes"], **shared_kwargs)

    except KeyboardInterrupt:
        sys.exit("SIGINT received, aborting ...")

    finally:
        # Reported on every exit path, including the SIGINT one
        if pairs_not_available:
            logger.info(f"Pairs [{','.join(pairs_not_available)}] not available "
                        f"on exchange {exchange.name}.")
def start_list_strategies(args: Dict[str, Any]) -> None:
    """
    Print the strategies found in a directory, sorted by name
    :param args: Cli args from Arguments()
    :return: None
    """
    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    # "strategy_path" from the config wins over the directory below user_data_dir
    fallback_directory = config['user_data_dir'] / USERPATH_STRATEGY
    directory = Path(config.get('strategy_path', fallback_directory))

    # Sort alphabetically
    strategies = sorted(StrategyResolver.search_all_objects(directory),
                        key=itemgetter('name'))

    strats_to_print = []
    for strategy in strategies:
        strats_to_print.append({'name': strategy['name'],
                                'location': strategy['location'].name})

    if args['print_one_column']:
        print('\n'.join([strategy['name'] for strategy in strategies]))
        return

    print(tabulate(strats_to_print, headers='keys', tablefmt='pipe'))
def start_list_timeframes(args: Dict[str, Any]) -> None:
    """
    Print the timeframes (ticker intervals) the Exchange object reports
    :param args: Cli args from Arguments()
    :return: None
    """
    config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)

    # Do not use ticker_interval set in the config
    config['ticker_interval'] = None

    # Init exchange
    exchange_name = config['exchange']['name']
    exchange = ExchangeResolver.load_exchange(exchange_name, config, validate=False)

    if args['print_one_column']:
        print('\n'.join(exchange.timeframes))
        return

    print(f"Timeframes available for the exchange `{exchange.name}`: "
          f"{', '.join(exchange.timeframes)}")
def start_list_markets(args: Dict[str, Any], pairs_only: bool = False) -> None:
    """
    Print pairs/markets on the exchange, in the output format selected by args
    :param args: Cli args from Arguments()
    :param pairs_only: if True print only pairs, otherwise print all instruments (markets)
    :return: None
    """
    config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)

    # Init exchange
    exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)

    # Inactive pairs/markets are only shown when 'list_pairs_all' is set
    active_only = not args.get('list_pairs_all', False)

    base_currencies = args.get('base_currencies', [])
    quote_currencies = args.get('quote_currencies', [])

    try:
        markets = exchange.get_markets(base_currencies=base_currencies,
                                       quote_currencies=quote_currencies,
                                       pairs_only=pairs_only,
                                       active_only=active_only)
        # Sort the pairs/markets by symbol
        pairs = OrderedDict(sorted(markets.items()))
    except Exception as e:
        raise OperationalException(f"Cannot get markets. Reason: {e}") from e

    # Human-readable summary, assembled piece by piece
    summary_parts = [f"Exchange {exchange.name} has {len(pairs)} "]
    if active_only:
        summary_parts.append("active ")
    summary_parts.append(plural(len(pairs), "pair" if pairs_only else "market"))
    if base_currencies:
        summary_parts.append(f" with {', '.join(base_currencies)} as base "
                             f"{plural(len(base_currencies), 'currency', 'currencies')}")
    if base_currencies and quote_currencies:
        summary_parts.append(" and")
    if quote_currencies:
        summary_parts.append(f" with {', '.join(quote_currencies)} as quote "
                             f"{plural(len(quote_currencies), 'currency', 'currencies')}")
    summary_str = ''.join(summary_parts)

    # The 'Is pair' column only makes sense when non-pair markets are listed too
    headers = ["Id", "Symbol", "Base", "Quote", "Active"]
    if not pairs_only:
        headers.append('Is pair')

    tabular_data = []
    for market in pairs.values():
        row = {'Id': market['id'], 'Symbol': market['symbol'],
               'Base': market['base'], 'Quote': market['quote'],
               'Active': market_is_active(market)}
        if not pairs_only:
            row['Is pair'] = symbol_is_pair(market['symbol'])
        tabular_data.append(row)

    machine_readable = bool(args.get('print_one_column', False) or
                            args.get('list_pairs_print_json', False) or
                            args.get('print_csv', False))

    if machine_readable:
        # Print summary string in the log in case of machine-readable
        # regular formats.
        logger.info(f"{summary_str}.")
    else:
        # Print empty string separating leading logs and output in case of
        # human-readable formats.
        print()

    if not pairs:
        # Nothing to list: only the human-readable formats print the summary
        if not machine_readable:
            print(f"{summary_str}.")
        return

    if args.get('print_list', False):
        # print data as a list, with human-readable summary
        print(f"{summary_str}: {', '.join(pairs.keys())}.")
    elif args.get('print_one_column', False):
        print('\n'.join(pairs.keys()))
    elif args.get('list_pairs_print_json', False):
        print(rapidjson.dumps(list(pairs.keys()), default=str))
    elif args.get('print_csv', False):
        csv_writer = csv.DictWriter(sys.stdout, fieldnames=headers)
        csv_writer.writeheader()
        csv_writer.writerows(tabular_data)
    else:
        # print data as a table, with the human-readable summary
        print(f"{summary_str}:")
        print(tabulate(tabular_data, headers='keys', tablefmt='pipe'))
def start_test_pairlist(args: Dict[str, Any]) -> None:
    """
    Test the Pairlist configuration: build and print the whitelist for each
    requested quote currency
    :param args: Cli args from Arguments()
    :return: None
    """
    from freqtrade.pairlist.pairlistmanager import PairListManager

    config = setup_utils_configuration(args, RunMode.UTIL_EXCHANGE)
    exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)

    # Without explicit quote currencies the configured stake currency is used
    quote_currencies = args.get('quote_currencies') or [config.get('stake_currency')]

    results = {}
    for curr in quote_currencies:
        # Each currency in turn becomes the stake currency of the pairlist
        config['stake_currency'] = curr
        manager = PairListManager(exchange, config)
        manager.refresh_pairlist()
        results[curr] = manager.whitelist

    one_column = args.get('print_one_column', False)
    as_json = args.get('list_pairs_print_json', False)

    for curr, pairlist in results.items():
        if one_column:
            # No headline in one-column mode
            print('\n'.join(pairlist))
            continue

        print(f"Pairs for {curr}: ")
        if as_json:
            print(rapidjson.dumps(list(pairlist), default=str))
        else:
            print(pairlist)
def start_hyperopt_list(args: Dict[str, Any]) -> None:
    """
    List the hyperopt epochs evaluated previously and, unless disabled,
    print the details of the epoch with the lowest loss
    :param args: Cli args from Arguments()
    :return: None
    """
    from freqtrade.optimize.hyperopt import Hyperopt

    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    only_best = config.get('hyperopt_list_best', False)
    only_profitable = config.get('hyperopt_list_profitable', False)
    print_colorized = config.get('print_colorized', False)
    print_json = config.get('print_json', False)
    no_details = config.get('hyperopt_list_no_details', False)
    no_header = False

    trials_file = config['user_data_dir'] / 'hyperopt_results' / 'hyperopt_results.pickle'

    # Previous evaluations
    all_trials = Hyperopt.load_previous_results(trials_file)
    # The total is taken before filtering
    total_epochs = len(all_trials)

    trials = _hyperopt_filter_trials(all_trials, only_best, only_profitable)

    # TODO: fetch the interval for epochs to print from the cli option
    epoch_start, epoch_stop = 0, None

    if print_colorized:
        colorama_init(autoreset=True)

    try:
        for trial in trials[epoch_start:epoch_stop]:
            Hyperopt.print_results_explanation(trial, total_epochs, not only_best,
                                               print_colorized)
    except KeyboardInterrupt:
        print('User interrupted..')

    if not trials or no_details:
        return

    # Details are shown for the trial with the lowest loss
    best_trial = sorted(trials, key=itemgetter('loss'))[0]
    Hyperopt.print_epoch_details(best_trial, total_epochs, print_json, no_header)
def start_hyperopt_show(args: Dict[str, Any]) -> None:
    """
    Show the details of one hyperopt epoch evaluated previously
    :param args: Cli args from Arguments()
    :return: None
    """
    from freqtrade.optimize.hyperopt import Hyperopt

    config = setup_utils_configuration(args, RunMode.UTIL_NO_EXCHANGE)

    only_best = config.get('hyperopt_list_best', False)
    only_profitable = config.get('hyperopt_list_profitable', False)
    no_header = config.get('hyperopt_show_no_header', False)

    trials_file = config['user_data_dir'] / 'hyperopt_results' / 'hyperopt_results.pickle'

    # Previous evaluations
    all_trials = Hyperopt.load_previous_results(trials_file)
    # The total is taken before filtering
    total_epochs = len(all_trials)

    trials = _hyperopt_filter_trials(all_trials, only_best, only_profitable)
    trials_epochs = len(trials)

    n = config.get('hyperopt_show_index', -1)
    if n > trials_epochs:
        raise OperationalException(
            f"The index of the epoch to show should be less than {trials_epochs + 1}.")
    if n < -trials_epochs:
        raise OperationalException(
            f"The index of the epoch to show should be greater than {-trials_epochs - 1}.")

    # Translate epoch index from human-readable format to pythonic
    # (positive values are shifted by one, zero and negative values are kept)
    index = n - 1 if n > 0 else n

    print_json = config.get('print_json', False)

    if not trials:
        return

    Hyperopt.print_epoch_details(trials[index], total_epochs, print_json, no_header,
                                 header_str="Epoch details")
def _hyperopt_filter_trials(trials: List, only_best: bool, only_profitable: bool) -> List:
    """
    Filter out items from the list of hyperopt results
    :param trials: list of hyperopt results (dict-like items)
    :param only_best: keep only the items with a truthy 'is_best'
    :param only_profitable: keep only the items whose
        ['results_metrics']['profit'] is greater than 0
    :return: the remaining items; the list passed in when no filter is active
    """
    if only_best:
        trials = list(filter(itemgetter('is_best'), trials))
    if only_profitable:
        trials = list(filter(lambda trial: trial['results_metrics']['profit'] > 0, trials))

    message_parts = (
        f"{len(trials)} ",
        "best " if only_best else "",
        "profitable " if only_profitable else "",
        "epochs found.",
    )
    logger.info(''.join(message_parts))

    return trials