add async method

This commit is contained in:
Matthias 2018-08-14 13:21:15 +02:00
parent 721fb3e326
commit 2602cbe683
5 changed files with 428 additions and 448 deletions

View File

@ -161,14 +161,6 @@ class Arguments(object):
dest='exportfilename',
metavar='PATH',
)
parser.add_argument(
'--backslap',
help="Utilize the Backslapping approach instead of the default Backtesting. This should provide more "
"accurate results, unless you are utilizing Min/Max function in your strategy.",
required=False,
dest='backslap',
action='store_true'
)
@staticmethod
def optimizer_shared_options(parser: argparse.ArgumentParser) -> None:
@ -236,7 +228,7 @@ class Arguments(object):
Builds and attaches all subcommands
:return: None
"""
from freqtrade.optimize import backtesting, hyperopt
from freqtrade.optimize import backtesting, backslapping, hyperopt
subparsers = self.parser.add_subparsers(dest='subparser')
@ -246,6 +238,12 @@ class Arguments(object):
self.optimizer_shared_options(backtesting_cmd)
self.backtesting_options(backtesting_cmd)
# Add backslapping subcommand
backslapping_cmd = subparsers.add_parser('backslapping', help='backslapping module')
backslapping_cmd.set_defaults(func=backslapping.start)
self.optimizer_shared_options(backslapping_cmd)
self.backtesting_options(backslapping_cmd)
# Add hyperopt subcommand
hyperopt_cmd = subparsers.add_parser('hyperopt', help='hyperopt module')
hyperopt_cmd.set_defaults(func=hyperopt.start)

View File

@ -1,48 +1,35 @@
import timeit
from argparse import Namespace
import logging
from typing import Dict, Any
from pandas import DataFrame
from freqtrade.exchange import Exchange
from freqtrade.optimize.optimize import IOptimize, BacktestResult, setup_configuration
from freqtrade.strategy import IStrategy
from freqtrade.strategy.interface import SellType
from freqtrade.strategy.resolver import StrategyResolver
logger = logging.getLogger(__name__)
class Backslapping:
class Backslapping(IOptimize):
"""
provides a quick way to evaluate strategies over a longer term of time
"""
def __init__(self, config: Dict[str, Any], exchange = None) -> None:
def __init__(self, config: Dict[str, Any]) -> None:
"""
constructor
"""
self.config = config
self.strategy: IStrategy = StrategyResolver(self.config).strategy
self.ticker_interval = self.strategy.ticker_interval
self.tickerdata_to_dataframe = self.strategy.tickerdata_to_dataframe
self.populate_buy_trend = self.strategy.populate_buy_trend
self.populate_sell_trend = self.strategy.populate_sell_trend
###
#
###
if exchange is None:
self.config['exchange']['secret'] = ''
self.config['exchange']['password'] = ''
self.config['exchange']['uid'] = ''
self.config['dry_run'] = True
self.exchange = Exchange(self.config)
else:
self.exchange = exchange
super().__init__(config)
self.fee = self.exchange.get_fee()
self.stop_loss_value = self.strategy.stoploss
#### backslap config
# backslap config
'''
Numpy arrays are used for 100x speed up
We requires setting Int values for
@ -96,8 +83,8 @@ class Backslapping:
if self.debug_timing: # Start timer
fl = self.s()
ticker_data = self.populate_sell_trend(
self.populate_buy_trend(pair_data))[headers].copy()
ticker_data = self.advise_sell(self.advise_buy(pair_data, {'pair': pair}),
{'pair': pair})[headers].copy()
if self.debug_timing: # print time taken
flt = self.f(fl)
@ -132,7 +119,7 @@ class Backslapping:
bslap_results_df = self.vector_fill_results_table(bslap_results_df, pair)
else:
from freqtrade.optimize.backtesting import BacktestResult
bslap_results_df = []
bslap_results_df = DataFrame.from_records(bslap_results_df, columns=BacktestResult._fields)
@ -787,3 +774,18 @@ class Backslapping:
# Send back List of trade dicts
return bslap_pair_results
def start(args: Namespace) -> None:
"""
Start Backtesting script
:param args: Cli args from Arguments()
:return: None
"""
# Initialize configuration
config = setup_configuration(args)
logger.info('Starting freqtrade in Backtesting mode')
# Initialize backtesting object
backslapping = Backslapping(config)
backslapping.start()

View File

@ -4,51 +4,21 @@
This module contains the backtesting logic
"""
import logging
import operator
from argparse import Namespace
from datetime import datetime, timedelta
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
from typing import Any, Dict, List, Optional
import arrow
from pandas import DataFrame, to_datetime
from tabulate import tabulate
from pandas import DataFrame
import freqtrade.optimize as optimize
from freqtrade import DependencyException, constants
from freqtrade.optimize.optimize import IOptimize, BacktestResult, setup_configuration
from freqtrade.arguments import Arguments
from freqtrade.configuration import Configuration
from freqtrade.exchange import Exchange
from freqtrade.misc import file_dump_json
from freqtrade.optimize.backslapping import Backslapping
from freqtrade.persistence import Trade
from freqtrade.strategy.interface import SellType
from freqtrade.strategy.resolver import IStrategy, StrategyResolver
from collections import OrderedDict
import timeit
from time import sleep
logger = logging.getLogger(__name__)
class BacktestResult(NamedTuple):
"""
NamedTuple Defining BacktestResults inputs.
"""
pair: str
profit_percent: float
profit_abs: float
open_time: datetime
close_time: datetime
open_index: int
close_index: int
trade_duration: float
open_at_end: bool
open_rate: float
close_rate: float
sell_reason: SellType
class Backtesting(object):
class Backtesting(IOptimize):
"""
Backtesting class, this class contains all the logic to run a backtest
@ -58,139 +28,7 @@ class Backtesting(object):
"""
def __init__(self, config: Dict[str, Any]) -> None:
self.config = config
self.strategy: IStrategy = StrategyResolver(self.config).strategy
self.ticker_interval = self.strategy.ticker_interval
self.tickerdata_to_dataframe = self.strategy.tickerdata_to_dataframe
self.advise_buy = self.strategy.advise_buy
self.advise_sell = self.strategy.advise_sell
# Reset keys for backtesting
self.config['exchange']['key'] = ''
self.config['exchange']['secret'] = ''
self.config['exchange']['password'] = ''
self.config['exchange']['uid'] = ''
self.config['dry_run'] = True
self.exchange = Exchange(self.config)
self.fee = self.exchange.get_fee()
self.stop_loss_value = self.strategy.stoploss
#### backslap config
'''
Numpy arrays are used for 100x speed up
We requires setting Int values for
buy stop triggers and stop calculated on
# buy 0 - open 1 - close 2 - sell 3 - high 4 - low 5 - stop 6
'''
self.np_buy: int = 0
self.np_open: int = 1
self.np_close: int = 2
self.np_sell: int = 3
self.np_high: int = 4
self.np_low: int = 5
self.np_stop: int = 6
self.np_bto: int = self.np_close # buys_triggered_on - should be close
self.np_bco: int = self.np_open # buys calculated on - open of the next candle.
self.np_sto: int = self.np_low # stops_triggered_on - Should be low, FT uses close
self.np_sco: int = self.np_stop # stops_calculated_on - Should be stop, FT uses close
# self.np_sto: int = self.np_close # stops_triggered_on - Should be low, FT uses close
# self.np_sco: int = self.np_close # stops_calculated_on - Should be stop, FT uses close
if 'backslap' in config:
self.use_backslap = config['backslap'] # Enable backslap - if false Orginal code is executed.
else:
self.use_backslap = False
logger.info("using backslap: {}".format(self.use_backslap))
self.debug = False # Main debug enable, very print heavy, enable 2 loops recommended
self.debug_timing = False # Stages within Backslap
self.debug_2loops = False # Limit each pair to two loops, useful when debugging
self.debug_vector = False # Debug vector calcs
self.debug_timing_main_loop = False # print overall timing per pair - works in Backtest and Backslap
self.backslap_show_trades = False # prints trades in addition to summary report
self.backslap_save_trades = True # saves trades as a pretty table to backslap.txt
self.stop_stops: int = 9999 # stop back testing any pair with this many stops, set to 999999 to not hit
self.backslap = Backslapping(config)
@staticmethod
def get_timeframe(data: Dict[str, DataFrame]) -> Tuple[arrow.Arrow, arrow.Arrow]:
"""
Get the maximum timeframe for the given backtest data
:param data: dictionary with preprocessed backtesting data
:return: tuple containing min_date, max_date
"""
timeframe = [
(arrow.get(frame['date'].min()), arrow.get(frame['date'].max()))
for frame in data.values()
]
return min(timeframe, key=operator.itemgetter(0))[0], \
max(timeframe, key=operator.itemgetter(1))[1]
def _generate_text_table(self, data: Dict[str, Dict], results: DataFrame) -> str:
"""
Generates and returns a text table for the given backtest data and the results dataframe
:return: pretty printed table with tabulate as str
"""
stake_currency = str(self.config.get('stake_currency'))
floatfmt = ('s', 'd', '.2f', '.2f', '.8f', 'd', '.1f', '.1f')
tabular_data = []
headers = ['pair', 'buy count', 'avg profit %', 'cum profit %',
'total profit ' + stake_currency, 'avg duration', 'profit', 'loss']
for pair in data:
result = results[results.pair == pair]
tabular_data.append([
pair,
len(result.index),
result.profit_percent.mean() * 100.0,
result.profit_percent.sum() * 100.0,
result.profit_abs.sum(),
str(timedelta(
minutes=round(result.trade_duration.mean()))) if not result.empty else '0:00',
len(result[result.profit_abs > 0]),
len(result[result.profit_abs < 0])
])
# Append Total
tabular_data.append([
'TOTAL',
len(results.index),
results.profit_percent.mean() * 100.0,
results.profit_percent.sum() * 100.0,
results.profit_abs.sum(),
str(timedelta(
minutes=round(results.trade_duration.mean()))) if not results.empty else '0:00',
len(results[results.profit_abs > 0]),
len(results[results.profit_abs < 0])
])
return tabulate(tabular_data, headers=headers, floatfmt=floatfmt, tablefmt="pipe")
def _generate_text_table_sell_reason(self, data: Dict[str, Dict], results: DataFrame) -> str:
"""
Generate small table outlining Backtest results
"""
tabular_data = []
headers = ['Sell Reason', 'Count']
for reason, count in results['sell_reason'].value_counts().iteritems():
tabular_data.append([reason.value, count])
return tabulate(tabular_data, headers=headers, tablefmt="pipe")
def _store_backtest_result(self, recordfilename: Optional[str], results: DataFrame) -> None:
records = [(t.pair, t.profit_percent, t.open_time.timestamp(),
t.close_time.timestamp(), t.open_index - 1, t.trade_duration,
t.open_rate, t.close_rate, t.open_at_end, t.sell_reason.value)
for index, t in results.iterrows()]
if records:
logger.info('Dumping backtest results to %s', recordfilename)
file_dump_json(recordfilename, records)
super().__init__(config)
def _get_sell_trade_entry(
self, pair: str, buy_row: DataFrame,
@ -217,6 +55,7 @@ class Backtesting(object):
sell = self.strategy.should_sell(trade, sell_row.open, sell_row.date, buy_signal,
sell_row.sell)
if sell.sell_flag:
return BacktestResult(pair=pair,
profit_percent=trade.calc_profit_percent(rate=sell_row.open),
profit_abs=trade.calc_profit(rate=sell_row.open),
@ -253,14 +92,7 @@ class Backtesting(object):
return btr
return None
def s(self):
st = timeit.default_timer()
return st
def f(self, st):
return (timeit.default_timer() - st)
def backtest(self, args: Dict) -> DataFrame:
def run(self, args: Dict) -> DataFrame:
"""
Implements backtesting functionality
@ -275,26 +107,13 @@ class Backtesting(object):
position_stacking: do we allow position stacking? (default: False)
:return: DataFrame
"""
use_backslap = self.use_backslap
debug_timing = self.debug_timing_main_loop
if use_backslap: # Use Back Slap code
return self.backslap.run(args)
else: # use Original Back test code
########################## Original BT loop
headers = ['date', 'buy', 'open', 'close', 'sell']
processed = args['processed']
max_open_trades = args.get('max_open_trades', 0)
position_stacking = args.get('position_stacking', False)
trades = []
trade_count_lock: Dict = {}
for pair, pair_data in processed.items():
if debug_timing: # Start timer
fl = self.s()
pair_data['buy'], pair_data['sell'] = 0, 0 # cleanup from previous run
ticker_data = self.advise_sell(
@ -306,11 +125,6 @@ class Backtesting(object):
ticker_data.drop(ticker_data.head(1).index, inplace=True)
if debug_timing: # print time taken
flt = self.f(fl)
# print("populate_buy_trend:", pair, round(flt, 10))
st = self.s()
# Convert from Pandas to list for performance reasons
# (Looping Pandas is slow.)
ticker = [x for x in ticker_data.itertuples()]
@ -341,165 +155,9 @@ class Backtesting(object):
# This happens only if the buy-signal was with the last candle
lock_pair_until = ticker_data.iloc[-1].date
if debug_timing: # print time taken
tt = self.f(st)
print("Time to BackTest :", pair, round(tt, 10))
print("-----------------------")
return DataFrame.from_records(trades, columns=BacktestResult._fields)
####################### Original BT loop end
def start(self) -> None:
"""
Run a backtesting end-to-end
:return: None
"""
data = {}
pairs = self.config['exchange']['pair_whitelist']
logger.info('Using stake_currency: %s ...', self.config['stake_currency'])
logger.info('Using stake_amount: %s ...', self.config['stake_amount'])
if self.config.get('live'):
logger.info('Downloading data for all pairs in whitelist ...')
for pair in pairs:
data[pair] = self.exchange.get_ticker_history(pair, self.ticker_interval)
else:
logger.info('Using local backtesting data (using whitelist in given config) ...')
timerange = Arguments.parse_timerange(None if self.config.get(
'timerange') is None else str(self.config.get('timerange')))
data = optimize.load_data(
self.config['datadir'],
pairs=pairs,
ticker_interval=self.ticker_interval,
refresh_pairs=self.config.get('refresh_pairs', False),
exchange=self.exchange,
timerange=timerange
)
ld_files = self.s()
if not data:
logger.critical("No data found. Terminating.")
return
# Use max_open_trades in backtesting, except --disable-max-market-positions is set
if self.config.get('use_max_market_positions', True):
max_open_trades = self.config['max_open_trades']
else:
logger.info('Ignoring max_open_trades (--disable-max-market-positions was used) ...')
max_open_trades = 0
preprocessed = self.tickerdata_to_dataframe(data)
t_t = self.f(ld_files)
print("Load from json to file to df in mem took", t_t)
# Print timeframe
min_date, max_date = self.get_timeframe(preprocessed)
logger.info(
'Measuring data from %s up to %s (%s days)..',
min_date.isoformat(),
max_date.isoformat(),
(max_date - min_date).days
)
# Execute backtest and print results
results = self.backtest(
{
'stake_amount': self.config.get('stake_amount'),
'processed': preprocessed,
'max_open_trades': max_open_trades,
'position_stacking': self.config.get('position_stacking', False),
}
)
if self.config.get('export', False):
self._store_backtest_result(self.config.get('exportfilename'), results)
if self.use_backslap:
logger.info(
'\n====================================================== '
'BackSLAP REPORT'
' =======================================================\n'
'%s',
self._generate_text_table(
data,
results
)
)
# optional print trades
if self.backslap_show_trades:
TradesFrame = results.filter(['open_time', 'pair', 'exit_type', 'profit_percent', 'profit_abs',
'buy_spend', 'sell_take', 'trade_duration', 'close_time'], axis=1)
def to_fwf(df, fname):
content = tabulate(df.values.tolist(), list(df.columns), floatfmt=".8f", tablefmt='psql')
print(content)
DataFrame.to_fwf = to_fwf(TradesFrame, "backslap.txt")
# optional save trades
if self.backslap_save_trades:
TradesFrame = results.filter(['open_time', 'pair', 'exit_type', 'profit_percent', 'profit_abs',
'buy_spend', 'sell_take', 'trade_duration', 'close_time'], axis=1)
def to_fwf(df, fname):
content = tabulate(df.values.tolist(), list(df.columns), floatfmt=".8f", tablefmt='psql')
open(fname, "w").write(content)
DataFrame.to_fwf = to_fwf(TradesFrame, "backslap.txt")
else:
logger.info(
'\n================================================= '
'BACKTEST REPORT'
' ==================================================\n'
'%s',
self._generate_text_table(
data,
results
)
)
if 'sell_reason' in results.columns:
logger.info(
'\n' +
' SELL READON STATS '.center(119, '=') +
'\n%s \n',
self._generate_text_table_sell_reason(data, results)
)
else:
logger.info("no sell reasons available!")
logger.info(
'\n' +
' LEFT OPEN TRADES REPORT '.center(119, '=') +
'\n%s',
self._generate_text_table(
data,
results.loc[results.open_at_end]
)
)
def setup_configuration(args: Namespace) -> Dict[str, Any]:
"""
Prepare the configuration for the backtesting
:param args: Cli args from Arguments()
:return: Configuration
"""
configuration = Configuration(args)
config = configuration.get_config()
# Ensure we do not use Exchange credentials
config['exchange']['key'] = ''
config['exchange']['secret'] = ''
config['backslap'] = args.backslap
if config['stake_amount'] == constants.UNLIMITED_STAKE_AMOUNT:
raise DependencyException('stake amount could not be "%s" for backtesting' %
constants.UNLIMITED_STAKE_AMOUNT)
return config
def start(args: Namespace) -> None:

View File

@ -276,7 +276,7 @@ class Hyperopt(Backtesting):
self.strategy.stoploss = params['stoploss']
processed = load(TICKERDATA_PICKLE)
results = self.backtest(
results = self.run(
{
'stake_amount': self.config['stake_amount'],
'processed': processed,

View File

@ -0,0 +1,322 @@
# pragma pylint: disable=missing-docstring, W0212, too-many-arguments
"""
This module contains the backtesting logic
"""
import logging
import operator
from abc import ABC, abstractmethod
from argparse import Namespace
from copy import deepcopy
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, NamedTuple, Optional, Tuple
import arrow
from pandas import DataFrame
from tabulate import tabulate
from freqtrade import DependencyException, constants
from freqtrade.arguments import Arguments
from freqtrade.configuration import Configuration
from freqtrade.exchange import Exchange
from freqtrade.misc import file_dump_json
import freqtrade.optimize as optimize
from freqtrade.strategy.interface import SellType
from freqtrade.strategy.resolver import IStrategy, StrategyResolver
logger = logging.getLogger(__name__)
class BacktestResult(NamedTuple):
"""
NamedTuple Defining BacktestResults inputs.
"""
pair: str
profit_percent: float
profit_abs: float
open_time: datetime
close_time: datetime
open_index: int
close_index: int
trade_duration: float
open_at_end: bool
open_rate: float
close_rate: float
sell_reason: SellType
class IOptimize(ABC):
"""
Backtesting Abstract class, this class contains all the logic to run a backtest
To run a backtest:
backtesting = Backtesting(config)
backtesting.start()
"""
def __init__(self, config: Dict[str, Any]) -> None:
self.config = config
# Reset keys for backtesting
self.config['exchange']['key'] = ''
self.config['exchange']['secret'] = ''
self.config['exchange']['password'] = ''
self.config['exchange']['uid'] = ''
self.config['dry_run'] = True
self.strategylist: List[IStrategy] = []
if self.config.get('strategy_list', None):
# Force one interval
self.ticker_interval = str(self.config.get('ticker_interval'))
for strat in list(self.config['strategy_list']):
stratconf = deepcopy(self.config)
stratconf['strategy'] = strat
self.strategylist.append(StrategyResolver(stratconf).strategy)
else:
# only one strategy
strat = StrategyResolver(self.config).strategy
self.strategylist.append(StrategyResolver(self.config).strategy)
# Load one strategy
self._set_strategy(self.strategylist[0])
self.exchange = Exchange(self.config)
self.fee = self.exchange.get_fee()
def _set_strategy(self, strategy):
"""
Load strategy into backtesting
"""
self.strategy = strategy
self.ticker_interval = self.config.get('ticker_interval')
self.tickerdata_to_dataframe = strategy.tickerdata_to_dataframe
self.advise_buy = strategy.advise_buy
self.advise_sell = strategy.advise_sell
def _get_timeframe(self, data: Dict[str, DataFrame]) -> Tuple[arrow.Arrow, arrow.Arrow]:
"""
Get the maximum timeframe for the given backtest data
:param data: dictionary with preprocessed backtesting data
:return: tuple containing min_date, max_date
"""
timeframe = [
(arrow.get(frame['date'].min()), arrow.get(frame['date'].max()))
for frame in data.values()
]
return min(timeframe, key=operator.itemgetter(0))[0], \
max(timeframe, key=operator.itemgetter(1))[1]
def _generate_text_table(self, data: Dict[str, Dict], results: DataFrame) -> str:
"""
Generates and returns a text table for the given backtest data and the results dataframe
:return: pretty printed table with tabulate as str
"""
stake_currency = str(self.config.get('stake_currency'))
floatfmt = ('s', 'd', '.2f', '.2f', '.8f', 'd', '.1f', '.1f')
tabular_data = []
headers = ['pair', 'buy count', 'avg profit %', 'cum profit %',
'total profit ' + stake_currency, 'avg duration', 'profit', 'loss']
for pair in data:
result = results[results.pair == pair]
tabular_data.append([
pair,
len(result.index),
result.profit_percent.mean() * 100.0,
result.profit_percent.sum() * 100.0,
result.profit_abs.sum(),
str(timedelta(
minutes=round(result.trade_duration.mean()))) if not result.empty else '0:00',
len(result[result.profit_abs > 0]),
len(result[result.profit_abs < 0])
])
# Append Total
tabular_data.append([
'TOTAL',
len(results.index),
results.profit_percent.mean() * 100.0,
results.profit_percent.sum() * 100.0,
results.profit_abs.sum(),
str(timedelta(
minutes=round(results.trade_duration.mean()))) if not results.empty else '0:00',
len(results[results.profit_abs > 0]),
len(results[results.profit_abs < 0])
])
return tabulate(tabular_data, headers=headers, floatfmt=floatfmt, tablefmt="pipe")
def _generate_text_table_sell_reason(self, data: Dict[str, Dict], results: DataFrame) -> str:
"""
Generate small table outlining Backtest results
"""
tabular_data = []
headers = ['Sell Reason', 'Count']
for reason, count in results['sell_reason'].value_counts().iteritems():
tabular_data.append([reason.value, count])
return tabulate(tabular_data, headers=headers, tablefmt="pipe")
def _generate_text_table_strategy(self, all_results: dict) -> str:
"""
Generate summary table per strategy
"""
stake_currency = str(self.config.get('stake_currency'))
floatfmt = ('s', 'd', '.2f', '.2f', '.8f', 'd', '.1f', '.1f')
tabular_data = []
headers = ['Strategy', 'buy count', 'avg profit %', 'cum profit %',
'total profit ' + stake_currency, 'avg duration', 'profit', 'loss']
for strategy, results in all_results.items():
tabular_data.append([
strategy,
len(results.index),
results.profit_percent.mean() * 100.0,
results.profit_percent.sum() * 100.0,
results.profit_abs.sum(),
str(timedelta(
minutes=round(results.trade_duration.mean()))) if not results.empty else '0:00',
len(results[results.profit_abs > 0]),
len(results[results.profit_abs < 0])
])
return tabulate(tabular_data, headers=headers, floatfmt=floatfmt, tablefmt="pipe")
def _store_backtest_result(self, recordfilename: str, results: DataFrame,
strategyname: Optional[str] = None) -> None:
records = [(t.pair, t.profit_percent, t.open_time.timestamp(),
t.close_time.timestamp(), t.open_index - 1, t.trade_duration,
t.open_rate, t.close_rate, t.open_at_end, t.sell_reason.value)
for index, t in results.iterrows()]
if records:
if strategyname:
# Inject strategyname to filename
recname = Path(recordfilename)
recordfilename = str(Path.joinpath(
recname.parent, f'{recname.stem}-{strategyname}').with_suffix(recname.suffix))
logger.info('Dumping backtest results to %s', recordfilename)
file_dump_json(recordfilename, records)
def start(self) -> None:
"""
Run a backtesting end-to-end
:return: None
"""
data = {}
pairs = self.config['exchange']['pair_whitelist']
logger.info('Using stake_currency: %s ...', self.config['stake_currency'])
logger.info('Using stake_amount: %s ...', self.config['stake_amount'])
if self.config.get('live'):
logger.info('Downloading data for all pairs in whitelist ...')
for pair in pairs:
data[pair] = self.exchange.get_candle_history(pair, self.ticker_interval)
else:
logger.info('Using local backtesting data (using whitelist in given config) ...')
timerange = Arguments.parse_timerange(None if self.config.get(
'timerange') is None else str(self.config.get('timerange')))
data = optimize.load_data(
self.config['datadir'],
pairs=pairs,
ticker_interval=self.ticker_interval,
refresh_pairs=self.config.get('refresh_pairs', False),
exchange=self.exchange,
timerange=timerange
)
if not data:
logger.critical("No data found. Terminating.")
return
# Use max_open_trades in backtesting, except --disable-max-market-positions is set
if self.config.get('use_max_market_positions', True):
max_open_trades = self.config['max_open_trades']
else:
logger.info('Ignoring max_open_trades (--disable-max-market-positions was used) ...')
max_open_trades = 0
all_results = {}
for strat in self.strategylist:
logger.info("Running backtesting for Strategy %s", strat.get_strategy_name())
self._set_strategy(strat)
# need to reprocess data every time to populate signals
preprocessed = self.tickerdata_to_dataframe(data)
# Print timeframe
min_date, max_date = self._get_timeframe(preprocessed)
logger.info(
'Measuring data from %s up to %s (%s days)..',
min_date.isoformat(),
max_date.isoformat(),
(max_date - min_date).days
)
# Execute backtest and print results
all_results[self.strategy.get_strategy_name()] = self.run(
{
'stake_amount': self.config.get('stake_amount'),
'processed': preprocessed,
'max_open_trades': max_open_trades,
'position_stacking': self.config.get('position_stacking', False),
}
)
for strategy, results in all_results.items():
if self.config.get('export', False):
self._store_backtest_result(self.config['exportfilename'], results,
strategy if len(self.strategylist) > 1 else None)
print(f"Result for strategy {strategy}")
print(' BACKTESTING REPORT '.center(119, '='))
print(self._generate_text_table(data, results))
print(' SELL REASON STATS '.center(119, '='))
print(self._generate_text_table_sell_reason(data, results))
print(' LEFT OPEN TRADES REPORT '.center(119, '='))
print(self._generate_text_table(data, results.loc[results.open_at_end]))
print()
if len(all_results) > 1:
# Print Strategy summary table
print(' Strategy Summary '.center(119, '='))
print(self._generate_text_table_strategy(all_results))
print('\nFor more details, please look at the detail tables above')
@abstractmethod
def run(self, args: Dict) -> DataFrame:
"""
Runs backtesting functionality.
NOTE: This method is used by Hyperopt at each iteration. Please keep it optimized.
Of course try to not have ugly code. By some accessor are sometime slower than functions.
Avoid, logging on this method
:param args: a dict containing:
stake_amount: btc amount to use for each trade
processed: a processed dictionary with format {pair, data}
max_open_trades: maximum number of concurrent trades (default: 0, disabled)
position_stacking: do we allow position stacking? (default: False)
:return: DataFrame
"""
def setup_configuration(args: Namespace) -> Dict[str, Any]:
"""
Prepare the configuration for the backtesting
:param args: Cli args from Arguments()
:return: Configuration
"""
configuration = Configuration(args)
config = configuration.get_config()
# Ensure we do not use Exchange credentials
config['exchange']['key'] = ''
config['exchange']['secret'] = ''
if config['stake_amount'] == constants.UNLIMITED_STAKE_AMOUNT:
raise DependencyException('stake amount could not be "%s" for backtesting' %
constants.UNLIMITED_STAKE_AMOUNT)
return config