Merge branch 'develop' into pr/xataxxx/6079

Author: Matthias
Date:   2022-01-02 22:21:41 +01:00
42 changed files with 300 additions and 134 deletions

View File

@@ -1,6 +1,6 @@
from datetime import datetime, timezone
from cachetools.ttl import TTLCache
from cachetools import TTLCache
class PeriodicCache(TTLCache):
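
The old import pulled TTLCache from the private cachetools.ttl submodule, which newer cachetools releases no longer ship; the package-root import works on both old and new versions. A minimal illustration (not part of the commit), assuming only that cachetools is installed:

from cachetools import TTLCache

cache = TTLCache(maxsize=100, ttl=60)  # up to 100 entries, each valid for 60 seconds
cache['BTC/USDT'] = 42000.0
assert cache['BTC/USDT'] == 42000.0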

View File

@@ -325,6 +325,7 @@ def combine_dataframes_with_mean(data: Dict[str, pd.DataFrame],
:param column: Column in the original dataframes to use
:return: DataFrame with the column renamed to the dict key, and a column
named mean, containing the mean of all pairs.
:raise: ValueError if no data is provided.
"""
df_comb = pd.concat([data[pair].set_index('date').rename(
{column: pair}, axis=1)[pair] for pair in data], axis=1)
@@ -360,6 +361,36 @@ def create_cum_profit(df: pd.DataFrame, trades: pd.DataFrame, col_name: str,
return df
def _calc_drawdown_series(profit_results: pd.DataFrame, *, date_col: str, value_col: str
) -> pd.DataFrame:
max_drawdown_df = pd.DataFrame()
max_drawdown_df['cumulative'] = profit_results[value_col].cumsum()
max_drawdown_df['high_value'] = max_drawdown_df['cumulative'].cummax()
max_drawdown_df['drawdown'] = max_drawdown_df['cumulative'] - max_drawdown_df['high_value']
max_drawdown_df['date'] = profit_results.loc[:, date_col]
return max_drawdown_df
def calculate_underwater(trades: pd.DataFrame, *, date_col: str = 'close_date',
value_col: str = 'profit_ratio'
):
"""
Calculate the underwater (drawdown) curve for the given trades
:param trades: DataFrame containing trades (requires columns close_date and profit_ratio)
:param date_col: Column in DataFrame to use for dates (defaults to 'close_date')
:param value_col: Column in DataFrame to use for values (defaults to 'profit_ratio')
:return: DataFrame with the columns 'cumulative', 'high_value', 'drawdown' and 'date',
one row per trade.
:raise: ValueError if the trade dataframe is empty.
"""
if len(trades) == 0:
raise ValueError("Trade dataframe empty.")
profit_results = trades.sort_values(date_col).reset_index(drop=True)
max_drawdown_df = _calc_drawdown_series(profit_results, date_col=date_col, value_col=value_col)
return max_drawdown_df
def calculate_max_drawdown(trades: pd.DataFrame, *, date_col: str = 'close_date',
value_col: str = 'profit_ratio'
) -> Tuple[float, pd.Timestamp, pd.Timestamp, float, float]:
@@ -375,10 +406,7 @@ def calculate_max_drawdown(trades: pd.DataFrame, *, date_col: str = 'close_date'
if len(trades) == 0:
raise ValueError("Trade dataframe empty.")
profit_results = trades.sort_values(date_col).reset_index(drop=True)
max_drawdown_df = pd.DataFrame()
max_drawdown_df['cumulative'] = profit_results[value_col].cumsum()
max_drawdown_df['high_value'] = max_drawdown_df['cumulative'].cummax()
max_drawdown_df['drawdown'] = max_drawdown_df['cumulative'] - max_drawdown_df['high_value']
max_drawdown_df = _calc_drawdown_series(profit_results, date_col=date_col, value_col=value_col)
idxmin = max_drawdown_df['drawdown'].idxmin()
if idxmin == 0:
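
As an illustration (toy data, not part of the commit), the extracted _calc_drawdown_series helper boils down to a cumsum/cummax pair over the trade profits:

import pandas as pd

trades = pd.DataFrame({
    'close_date': pd.to_datetime(['2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04']),
    'profit_ratio': [0.05, -0.02, -0.04, 0.03],
})
profit_results = trades.sort_values('close_date').reset_index(drop=True)
df = pd.DataFrame()
df['cumulative'] = profit_results['profit_ratio'].cumsum()    # 0.05, 0.03, -0.01, 0.02
df['high_value'] = df['cumulative'].cummax()                   # 0.05 throughout
df['drawdown'] = df['cumulative'] - df['high_value']           # 0.00, -0.02, -0.06, -0.03
# calculate_max_drawdown() then uses idxmin() on 'drawdown'; the deepest point here is -0.06.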

View File

@@ -201,7 +201,7 @@ class IDataHandler(ABC):
enddate = pairdf.iloc[-1]['date']
if timerange_startup:
self._validate_pairdata(pair, pairdf, timerange_startup)
self._validate_pairdata(pair, pairdf, timeframe, timerange_startup)
pairdf = trim_dataframe(pairdf, timerange_startup)
if self._check_empty_df(pairdf, pair, timeframe, warn_no_data):
return pairdf
@@ -228,7 +228,7 @@ class IDataHandler(ABC):
return True
return False
def _validate_pairdata(self, pair, pairdata: DataFrame, timerange: TimeRange):
def _validate_pairdata(self, pair, pairdata: DataFrame, timeframe: str, timerange: TimeRange):
"""
Validates pairdata for missing data at start and end, and logs warnings.
:param pairdata: Dataframe to validate
@@ -238,12 +238,12 @@ class IDataHandler(ABC):
if timerange.starttype == 'date':
start = datetime.fromtimestamp(timerange.startts, tz=timezone.utc)
if pairdata.iloc[0]['date'] > start:
logger.warning(f"Missing data at start for pair {pair}, "
logger.warning(f"Missing data at start for pair {pair} at {timeframe}, "
f"data starts at {pairdata.iloc[0]['date']:%Y-%m-%d %H:%M:%S}")
if timerange.stoptype == 'date':
stop = datetime.fromtimestamp(timerange.stopts, tz=timezone.utc)
if pairdata.iloc[-1]['date'] < stop:
logger.warning(f"Missing data at end for pair {pair}, "
logger.warning(f"Missing data at end for pair {pair} at {timeframe}, "
f"data ends at {pairdata.iloc[-1]['date']:%Y-%m-%d %H:%M:%S}")

View File

@@ -4,9 +4,20 @@ import time
from functools import wraps
from freqtrade.exceptions import DDosProtection, RetryableOrderError, TemporaryError
from freqtrade.mixins import LoggingMixin
logger = logging.getLogger(__name__)
__logging_mixin = None
def _get_logging_mixin():
# Logging mixin to de-duplicate repeated kucoin log messages (via log_once)
# Only to be used in the retrier
global __logging_mixin
if not __logging_mixin:
__logging_mixin = LoggingMixin(logger)
return __logging_mixin
# Maximum default retry count.
@@ -72,28 +83,33 @@ def calculate_backoff(retrycount, max_retries):
def retrier_async(f):
async def wrapper(*args, **kwargs):
count = kwargs.pop('count', API_RETRY_COUNT)
kucoin = args[0].name == "Kucoin" # Check if the exchange is KuCoin.
try:
return await f(*args, **kwargs)
except TemporaryError as ex:
logger.warning('%s() returned exception: "%s"', f.__name__, ex)
msg = f'{f.__name__}() returned exception: "{ex}". '
if count > 0:
logger.warning('retrying %s() still for %s times', f.__name__, count)
msg += f'Retrying still for {count} times.'
count -= 1
kwargs.update({'count': count})
kwargs['count'] = count
if isinstance(ex, DDosProtection):
if "kucoin" in str(ex) and "429000" in str(ex):
if kucoin and "429000" in str(ex):
# Temporary fix for 429000 error on kucoin
# see https://github.com/freqtrade/freqtrade/issues/5700 for details.
logger.warning(
_get_logging_mixin().log_once(
f"Kucoin 429 error, avoid triggering DDosProtection backoff delay. "
f"{count} tries left before giving up")
f"{count} tries left before giving up", logmethod=logger.warning)
# Reset msg to avoid logging too many times.
msg = ''
else:
backoff_delay = calculate_backoff(count + 1, API_RETRY_COUNT)
logger.info(f"Applying DDosProtection backoff delay: {backoff_delay}")
await asyncio.sleep(backoff_delay)
if msg:
logger.warning(msg)
return await wrapper(*args, **kwargs)
else:
logger.warning('Giving up retrying: %s()', f.__name__)
logger.warning(msg + 'Giving up.')
raise ex
return wrapper
@@ -106,9 +122,9 @@ def retrier(_func=None, retries=API_RETRY_COUNT):
try:
return f(*args, **kwargs)
except (TemporaryError, RetryableOrderError) as ex:
logger.warning('%s() returned exception: "%s"', f.__name__, ex)
msg = f'{f.__name__}() returned exception: "{ex}". '
if count > 0:
logger.warning('retrying %s() still for %s times', f.__name__, count)
logger.warning(msg + f'Retrying still for {count} times.')
count -= 1
kwargs.update({'count': count})
if isinstance(ex, (DDosProtection, RetryableOrderError)):
@@ -118,7 +134,7 @@ def retrier(_func=None, retries=API_RETRY_COUNT):
time.sleep(backoff_delay)
return wrapper(*args, **kwargs)
else:
logger.warning('Giving up retrying: %s()', f.__name__)
logger.warning(msg + 'Giving up.')
raise ex
return wrapper
# Support both @retrier and @retrier(retries=2) syntax
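
For context, a stripped-down sketch of the retry-with-backoff pattern the decorator implements (the backoff formula, default retry count and caught exception type below are illustrative assumptions, not taken from the commit):

import asyncio
import logging

logger = logging.getLogger(__name__)
API_RETRY_COUNT = 4  # assumed default, for illustration only


def calculate_backoff(retrycount: int, max_retries: int) -> float:
    # Illustrative quadratic backoff: the fewer retries remain, the longer the wait.
    return (max_retries - retrycount) ** 2 + 1


async def fetch_with_retry(coro_factory, count: int = API_RETRY_COUNT):
    """Retry an async call, sleeping between attempts - a simplified retrier_async."""
    try:
        return await coro_factory()
    except Exception as ex:  # the real decorator only catches specific temporary errors
        if count > 0:
            logger.warning('%s - retrying %s more times', ex, count)
            await asyncio.sleep(calculate_backoff(count, API_RETRY_COUNT))
            return await fetch_with_retry(coro_factory, count - 1)
        logger.warning('%s - giving up', ex)
        raise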

View File

@@ -83,6 +83,8 @@ class Exchange:
self._api: ccxt.Exchange = None
self._api_async: ccxt_async.Exchange = None
self._markets: Dict = {}
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._config.update(config)
@@ -170,8 +172,10 @@ class Exchange:
def close(self):
logger.debug("Exchange object destroyed, closing async loop")
if self._api_async and inspect.iscoroutinefunction(self._api_async.close):
asyncio.get_event_loop().run_until_complete(self._api_async.close())
if (self._api_async and inspect.iscoroutinefunction(self._api_async.close)
and self._api_async.session):
logger.info("Closing async ccxt session.")
self.loop.run_until_complete(self._api_async.close())
def _init_ccxt(self, exchange_config: Dict[str, Any], ccxt_module: CcxtModuleType = ccxt,
ccxt_kwargs: Dict = {}) -> ccxt.Exchange:
@@ -326,7 +330,7 @@ class Exchange:
def _load_async_markets(self, reload: bool = False) -> None:
try:
if self._api_async:
asyncio.get_event_loop().run_until_complete(
self.loop.run_until_complete(
self._api_async.load_markets(reload=reload))
except (asyncio.TimeoutError, ccxt.BaseError) as e:
@@ -1227,7 +1231,7 @@ class Exchange:
:param since_ms: Timestamp in milliseconds to get history from
:return: List with candle (OHLCV) data
"""
pair, timeframe, data = asyncio.get_event_loop().run_until_complete(
pair, timeframe, data = self.loop.run_until_complete(
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
since_ms=since_ms, is_new_pair=is_new_pair))
logger.info(f"Downloaded data for {pair} with length {len(data)}.")
@@ -1329,8 +1333,10 @@ class Exchange:
results_df = {}
# Chunk requests into batches of 100 to avoid overwhelming the ccxt throttler
for input_coro in chunks(input_coroutines, 100):
results = asyncio.get_event_loop().run_until_complete(
asyncio.gather(*input_coro, return_exceptions=True))
async def gather_stuff():
return await asyncio.gather(*input_coro, return_exceptions=True)
results = self.loop.run_until_complete(gather_stuff())
# handle caching
for res in results:
@@ -1566,7 +1572,7 @@ class Exchange:
if not self.exchange_has("fetchTrades"):
raise OperationalException("This exchange does not support downloading Trades.")
return asyncio.get_event_loop().run_until_complete(
return self.loop.run_until_complete(
self._async_get_trade_history(pair=pair, since=since,
until=until, from_id=from_id))
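
The common thread in this file is that every synchronous entry point now drives a loop owned by the Exchange instance instead of calling asyncio.get_event_loop(), whose implicit loop creation is deprecated in recent Python versions. A stripped-down sketch of the pattern (class and method names hypothetical, not freqtrade code):

import asyncio


class AsyncClientWrapper:
    """Own one event loop for the object's lifetime."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def run(self, coro):
        # Every synchronous entry point funnels through the same loop.
        return self.loop.run_until_complete(coro)

    def close(self):
        if not self.loop.is_closed():
            self.loop.close()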

View File

@@ -126,6 +126,7 @@ class FreqtradeBot(LoggingMixin):
self.rpc.cleanup()
cleanup_db()
self.exchange.close()
def startup(self) -> None:
"""

View File

@@ -246,6 +246,9 @@ class Backtesting:
Helper function to convert processed dataframes into lists for performance reasons.
Used by backtest() - so keep this optimized for performance.
:param processed: a processed dictionary with format {pair, data}, which gets cleared to
optimize memory usage!
"""
# Every change to this headers list must be checked against further usages of the resulting tuple
# and may require updating the index constants at the top
@@ -254,7 +257,8 @@ class Backtesting:
self.progress.init_step(BacktestState.CONVERT, len(processed))
# Create dict with data
for pair, pair_data in processed.items():
for pair in processed.keys():
pair_data = processed[pair]
self.check_abort()
self.progress.increment()
if not pair_data.empty:
@@ -283,6 +287,9 @@ class Backtesting:
# Convert from Pandas to list for performance reasons
# (Looping Pandas is slow.)
data[pair] = df_analyzed[headers].values.tolist()
# Do not hold on to old data to reduce memory usage
processed[pair] = pair_data = None
return data
def _get_close_rate(self, sell_row: Tuple, trade: LocalTrade, sell: SellCheckTuple,
@@ -571,7 +578,8 @@ class Backtesting:
Of course, try not to write ugly code; however, some accessors are sometimes slower than functions.
Avoid extensive logging in this method and functions it calls.
:param processed: a processed dictionary with format {pair, data}
:param processed: a processed dictionary with format {pair, data}, which gets cleared to
optimize memory usage!
:param start_date: backtesting timerange start datetime
:param end_date: backtesting timerange end datetime
:param max_open_trades: maximum number of concurrent trades, <= 0 means unlimited
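
As a toy illustration of the memory optimization above (sample data, not freqtrade code): each DataFrame is converted to plain lists once, and the dict entry is cleared immediately so the garbage collector can reclaim the DataFrame before the next pair is processed.

import pandas as pd

processed = {
    'BTC/USDT': pd.DataFrame({'open': [1.0], 'high': [2.0], 'low': [0.5], 'close': [1.5]}),
    'ETH/USDT': pd.DataFrame({'open': [2.0], 'high': [3.0], 'low': [1.5], 'close': [2.5]}),
}
data = {}
for pair in processed.keys():
    pair_data = processed[pair]
    # Plain lists are cheap to iterate in the backtest loop ...
    data[pair] = pair_data[['open', 'high', 'low', 'close']].values.tolist()
    # ... and clearing the dict entry lets the DataFrame be freed early.
    processed[pair] = pair_data = None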

View File

@@ -422,6 +422,7 @@ class Hyperopt:
self.backtesting.exchange.close()
self.backtesting.exchange._api = None # type: ignore
self.backtesting.exchange._api_async = None # type: ignore
self.backtesting.exchange.loop = None # type: ignore
# self.backtesting.exchange = None # type: ignore
self.backtesting.pairlists = None # type: ignore
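
Most likely these attributes are reset because they cannot be pickled when the backtesting object is handed to hyperopt worker processes; the new Exchange.loop needs the same treatment as the ccxt clients. A quick illustration of the underlying limitation (assuming plain pickle semantics):

import asyncio
import pickle

loop = asyncio.new_event_loop()
try:
    pickle.dumps(loop)
except Exception as err:
    # Event loops hold OS resources (selectors, sockets) and cannot be serialized.
    print(f"event loop is not picklable: {err}")
finally:
    loop.close()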

View File

@@ -5,7 +5,8 @@ from typing import Any, Dict, List
import pandas as pd
from freqtrade.configuration import TimeRange
from freqtrade.data.btanalysis import (calculate_max_drawdown, combine_dataframes_with_mean,
from freqtrade.data.btanalysis import (analyze_trade_parallelism, calculate_max_drawdown,
calculate_underwater, combine_dataframes_with_mean,
create_cum_profit, extract_trades_of_period, load_trades)
from freqtrade.data.converter import trim_dataframe
from freqtrade.data.dataprovider import DataProvider
@@ -185,6 +186,48 @@ def add_max_drawdown(fig, row, trades: pd.DataFrame, df_comb: pd.DataFrame,
return fig
def add_underwater(fig, row, trades: pd.DataFrame) -> make_subplots:
"""
Add underwater plot
"""
try:
underwater = calculate_underwater(trades, value_col="profit_abs")
underwater = go.Scatter(
x=underwater['date'],
y=underwater['drawdown'],
name="Underwater Plot",
fill='tozeroy',
fillcolor='#cc362b',
line={'color': '#cc362b'},
)
fig.add_trace(underwater, row, 1)
except ValueError:
logger.warning("No trades found - not plotting underwater plot")
return fig
def add_parallelism(fig, row, trades: pd.DataFrame, timeframe: str) -> make_subplots:
"""
Add Chart showing trade parallelism
"""
try:
result = analyze_trade_parallelism(trades, timeframe)
drawdown = go.Scatter(
x=result.index,
y=result['open_trades'],
name="Parallel trades",
fill='tozeroy',
fillcolor='#242222',
line={'color': '#242222'},
)
fig.add_trace(drawdown, row, 1)
except ValueError:
logger.warning("No trades found - not plotting Parallelism.")
return fig
def plot_trades(fig, trades: pd.DataFrame) -> make_subplots:
"""
Add trades to "fig"
@@ -460,7 +503,12 @@ def generate_candlestick_graph(pair: str, data: pd.DataFrame, trades: pd.DataFra
def generate_profit_graph(pairs: str, data: Dict[str, pd.DataFrame],
trades: pd.DataFrame, timeframe: str, stake_currency: str) -> go.Figure:
# Combine close-values for all pairs, rename columns to "pair"
df_comb = combine_dataframes_with_mean(data, "close")
try:
df_comb = combine_dataframes_with_mean(data, "close")
except ValueError:
raise OperationalException(
"No data found. Please make sure that data is available for "
"the timerange and pairs selected.")
# Trim trades to available OHLCV data
trades = extract_trades_of_period(df_comb, trades, date_index=True)
@@ -477,20 +525,30 @@ def generate_profit_graph(pairs: str, data: Dict[str, pd.DataFrame],
name='Avg close price',
)
fig = make_subplots(rows=3, cols=1, shared_xaxes=True,
row_width=[1, 1, 1],
fig = make_subplots(rows=5, cols=1, shared_xaxes=True,
row_heights=[1, 1, 1, 0.5, 1],
vertical_spacing=0.05,
subplot_titles=["AVG Close Price", "Combined Profit", "Profit per pair"])
subplot_titles=[
"AVG Close Price",
"Combined Profit",
"Profit per pair",
"Parallelism",
"Underwater",
])
fig['layout'].update(title="Freqtrade Profit plot")
fig['layout']['yaxis1'].update(title='Price')
fig['layout']['yaxis2'].update(title=f'Profit {stake_currency}')
fig['layout']['yaxis3'].update(title=f'Profit {stake_currency}')
fig['layout']['yaxis4'].update(title='Trade count')
fig['layout']['yaxis5'].update(title='Underwater Plot')
fig['layout']['xaxis']['rangeslider'].update(visible=False)
fig.update_layout(modebar_add=["v1hovermode", "toggleSpikeLines"])
fig.add_trace(avgclose, 1, 1)
fig = add_profit(fig, 2, df_comb, 'cum_profit', 'Profit')
fig = add_max_drawdown(fig, 2, trades, df_comb, timeframe)
fig = add_parallelism(fig, 4, trades, timeframe)
fig = add_underwater(fig, 5, trades)
for pair in pairs:
profit_col = f'cum_profit_{pair}'

View File

@@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional
import arrow
import numpy as np
from cachetools.ttl import TTLCache
from cachetools import TTLCache
from pandas import DataFrame
from freqtrade.exceptions import OperationalException

View File

@@ -8,7 +8,7 @@ from functools import partial
from typing import Any, Dict, List
import arrow
from cachetools.ttl import TTLCache
from cachetools import TTLCache
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_minutes

View File

@@ -6,7 +6,7 @@ from copy import deepcopy
from typing import Any, Dict, List, Optional
import arrow
from cachetools.ttl import TTLCache
from cachetools import TTLCache
from pandas import DataFrame
from freqtrade.exceptions import OperationalException

View File

@@ -47,7 +47,7 @@ class UvicornServer(uvicorn.Server):
else:
asyncio.set_event_loop(uvloop.new_event_loop())
try:
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
except RuntimeError:
# When running in a thread, we'll not have an eventloop yet.
loop = asyncio.new_event_loop()
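
asyncio.get_event_loop() warns (and will eventually fail) when no loop is running, so the replacement relies on asyncio.get_running_loop() and falls back to creating a loop explicitly, as the surrounding code does for threads. A self-contained sketch of that fallback (illustrative, not the commit's code):

import asyncio


def get_or_create_loop() -> asyncio.AbstractEventLoop:
    try:
        # Only succeeds while a loop is actually running (e.g. inside a coroutine).
        return asyncio.get_running_loop()
    except RuntimeError:
        # When called from a fresh thread there is no loop yet - create one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop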

View File

@@ -7,7 +7,7 @@ import datetime
import logging
from typing import Dict, List
from cachetools.ttl import TTLCache
from cachetools import TTLCache
from pycoingecko import CoinGeckoAPI
from requests.exceptions import RequestException

View File

@@ -199,8 +199,8 @@ class Telegram(RPCHandler):
self._updater.start_polling(
bootstrap_retries=-1,
timeout=30,
read_latency=60,
timeout=20,
read_latency=60, # Assumed transmission latency
drop_pending_updates=True,
)
logger.info(
@@ -213,6 +213,7 @@ class Telegram(RPCHandler):
Stops all running telegram threads.
:return: None
"""
# This can take up to `timeout` from the call to `start_polling`.
self._updater.stop()
def _format_buy_msg(self, msg: Dict[str, Any]) -> str:

View File

@@ -727,23 +727,21 @@ class IStrategy(ABC, HyperStrategyMixin):
custom_reason = custom_reason[:CUSTOM_SELL_MAX_LENGTH]
else:
custom_reason = None
# TODO: return here if sell-signal should be favored over ROI
if sell_signal in (SellType.CUSTOM_SELL, SellType.SELL_SIGNAL):
logger.debug(f"{trade.pair} - Sell signal received. "
f"sell_type=SellType.{sell_signal.name}" +
(f", custom_reason={custom_reason}" if custom_reason else ""))
return SellCheckTuple(sell_type=sell_signal, sell_reason=custom_reason)
# Start evaluations
# Sequence:
# ROI (if not stoploss)
# Sell-signal
# ROI (if not stoploss)
# Stoploss
if roi_reached and stoplossflag.sell_type != SellType.STOP_LOSS:
logger.debug(f"{trade.pair} - Required profit reached. sell_type=SellType.ROI")
return SellCheckTuple(sell_type=SellType.ROI)
if sell_signal != SellType.NONE:
logger.debug(f"{trade.pair} - Sell signal received. "
f"sell_type=SellType.{sell_signal.name}" +
(f", custom_reason={custom_reason}" if custom_reason else ""))
return SellCheckTuple(sell_type=sell_signal, sell_reason=custom_reason)
if stoplossflag.sell_flag:
logger.debug(f"{trade.pair} - Stoploss hit. sell_type={stoplossflag.sell_type}")