174 lines
7.4 KiB
Python
174 lines
7.4 KiB
Python
|
import logging
|
||
|
from typing import Dict, Tuple
|
||
|
|
||
|
import numpy as np
|
||
|
import pandas as pd
|
||
|
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
def calculate_market_change(data: Dict[str, pd.DataFrame], column: str = "close") -> float:
|
||
|
"""
|
||
|
Calculate market change based on "column".
|
||
|
Calculation is done by taking the first non-null and the last non-null element of each column
|
||
|
and calculating the pctchange as "(last - first) / first".
|
||
|
Then the results per pair are combined as mean.
|
||
|
|
||
|
:param data: Dict of Dataframes, dict key should be pair.
|
||
|
:param column: Column in the original dataframes to use
|
||
|
:return:
|
||
|
"""
|
||
|
tmp_means = []
|
||
|
for pair, df in data.items():
|
||
|
start = df[column].dropna().iloc[0]
|
||
|
end = df[column].dropna().iloc[-1]
|
||
|
tmp_means.append((end - start) / start)
|
||
|
|
||
|
return float(np.mean(tmp_means))
|
||
|
|
||
|
|
||
|
def combine_dataframes_with_mean(data: Dict[str, pd.DataFrame],
|
||
|
column: str = "close") -> pd.DataFrame:
|
||
|
"""
|
||
|
Combine multiple dataframes "column"
|
||
|
:param data: Dict of Dataframes, dict key should be pair.
|
||
|
:param column: Column in the original dataframes to use
|
||
|
:return: DataFrame with the column renamed to the dict key, and a column
|
||
|
named mean, containing the mean of all pairs.
|
||
|
:raise: ValueError if no data is provided.
|
||
|
"""
|
||
|
df_comb = pd.concat([data[pair].set_index('date').rename(
|
||
|
{column: pair}, axis=1)[pair] for pair in data], axis=1)
|
||
|
|
||
|
df_comb['mean'] = df_comb.mean(axis=1)
|
||
|
|
||
|
return df_comb
|
||
|
|
||
|
|
||
|
def create_cum_profit(df: pd.DataFrame, trades: pd.DataFrame, col_name: str,
|
||
|
timeframe: str) -> pd.DataFrame:
|
||
|
"""
|
||
|
Adds a column `col_name` with the cumulative profit for the given trades array.
|
||
|
:param df: DataFrame with date index
|
||
|
:param trades: DataFrame containing trades (requires columns close_date and profit_abs)
|
||
|
:param col_name: Column name that will be assigned the results
|
||
|
:param timeframe: Timeframe used during the operations
|
||
|
:return: Returns df with one additional column, col_name, containing the cumulative profit.
|
||
|
:raise: ValueError if trade-dataframe was found empty.
|
||
|
"""
|
||
|
if len(trades) == 0:
|
||
|
raise ValueError("Trade dataframe empty.")
|
||
|
from freqtrade.exchange import timeframe_to_minutes
|
||
|
timeframe_minutes = timeframe_to_minutes(timeframe)
|
||
|
# Resample to timeframe to make sure trades match candles
|
||
|
_trades_sum = trades.resample(f'{timeframe_minutes}min', on='close_date'
|
||
|
)[['profit_abs']].sum()
|
||
|
df.loc[:, col_name] = _trades_sum['profit_abs'].cumsum()
|
||
|
# Set first value to 0
|
||
|
df.loc[df.iloc[0].name, col_name] = 0
|
||
|
# FFill to get continuous
|
||
|
df[col_name] = df[col_name].ffill()
|
||
|
return df
|
||
|
|
||
|
|
||
|
def _calc_drawdown_series(profit_results: pd.DataFrame, *, date_col: str, value_col: str
|
||
|
) -> pd.DataFrame:
|
||
|
max_drawdown_df = pd.DataFrame()
|
||
|
max_drawdown_df['cumulative'] = profit_results[value_col].cumsum()
|
||
|
max_drawdown_df['high_value'] = max_drawdown_df['cumulative'].cummax()
|
||
|
max_drawdown_df['drawdown'] = max_drawdown_df['cumulative'] - max_drawdown_df['high_value']
|
||
|
max_drawdown_df['date'] = profit_results.loc[:, date_col]
|
||
|
return max_drawdown_df
|
||
|
|
||
|
|
||
|
def calculate_underwater(trades: pd.DataFrame, *, date_col: str = 'close_date',
|
||
|
value_col: str = 'profit_ratio'
|
||
|
):
|
||
|
"""
|
||
|
Calculate max drawdown and the corresponding close dates
|
||
|
:param trades: DataFrame containing trades (requires columns close_date and profit_ratio)
|
||
|
:param date_col: Column in DataFrame to use for dates (defaults to 'close_date')
|
||
|
:param value_col: Column in DataFrame to use for values (defaults to 'profit_ratio')
|
||
|
:return: Tuple (float, highdate, lowdate, highvalue, lowvalue) with absolute max drawdown,
|
||
|
high and low time and high and low value.
|
||
|
:raise: ValueError if trade-dataframe was found empty.
|
||
|
"""
|
||
|
if len(trades) == 0:
|
||
|
raise ValueError("Trade dataframe empty.")
|
||
|
profit_results = trades.sort_values(date_col).reset_index(drop=True)
|
||
|
max_drawdown_df = _calc_drawdown_series(profit_results, date_col=date_col, value_col=value_col)
|
||
|
|
||
|
return max_drawdown_df
|
||
|
|
||
|
|
||
|
def calculate_max_drawdown(trades: pd.DataFrame, *, date_col: str = 'close_date',
|
||
|
value_col: str = 'profit_abs', starting_balance: float = 0
|
||
|
) -> Tuple[float, pd.Timestamp, pd.Timestamp, float, float, float]:
|
||
|
"""
|
||
|
Calculate max drawdown and the corresponding close dates
|
||
|
:param trades: DataFrame containing trades (requires columns close_date and profit_ratio)
|
||
|
:param date_col: Column in DataFrame to use for dates (defaults to 'close_date')
|
||
|
:param value_col: Column in DataFrame to use for values (defaults to 'profit_abs')
|
||
|
:param starting_balance: Portfolio starting balance - properly calculate relative drawdown.
|
||
|
:return: Tuple (float, highdate, lowdate, highvalue, lowvalue, relative_drawdown)
|
||
|
with absolute max drawdown, high and low time and high and low value,
|
||
|
and the relative account drawdown
|
||
|
:raise: ValueError if trade-dataframe was found empty.
|
||
|
"""
|
||
|
if len(trades) == 0:
|
||
|
raise ValueError("Trade dataframe empty.")
|
||
|
profit_results = trades.sort_values(date_col).reset_index(drop=True)
|
||
|
max_drawdown_df = _calc_drawdown_series(profit_results, date_col=date_col, value_col=value_col)
|
||
|
|
||
|
idxmin = max_drawdown_df['drawdown'].idxmin()
|
||
|
if idxmin == 0:
|
||
|
raise ValueError("No losing trade, therefore no drawdown.")
|
||
|
high_date = profit_results.loc[max_drawdown_df.iloc[:idxmin]['high_value'].idxmax(), date_col]
|
||
|
low_date = profit_results.loc[idxmin, date_col]
|
||
|
high_val = max_drawdown_df.loc[max_drawdown_df.iloc[:idxmin]
|
||
|
['high_value'].idxmax(), 'cumulative']
|
||
|
low_val = max_drawdown_df.loc[idxmin, 'cumulative']
|
||
|
max_drawdown_rel = 0.0
|
||
|
if high_val + starting_balance != 0:
|
||
|
max_drawdown_rel = (high_val - low_val) / (high_val + starting_balance)
|
||
|
|
||
|
return (
|
||
|
abs(min(max_drawdown_df['drawdown'])),
|
||
|
high_date,
|
||
|
low_date,
|
||
|
high_val,
|
||
|
low_val,
|
||
|
max_drawdown_rel
|
||
|
)
|
||
|
|
||
|
|
||
|
def calculate_csum(trades: pd.DataFrame, starting_balance: float = 0) -> Tuple[float, float]:
|
||
|
"""
|
||
|
Calculate min/max cumsum of trades, to show if the wallet/stake amount ratio is sane
|
||
|
:param trades: DataFrame containing trades (requires columns close_date and profit_percent)
|
||
|
:param starting_balance: Add starting balance to results, to show the wallets high / low points
|
||
|
:return: Tuple (float, float) with cumsum of profit_abs
|
||
|
:raise: ValueError if trade-dataframe was found empty.
|
||
|
"""
|
||
|
if len(trades) == 0:
|
||
|
raise ValueError("Trade dataframe empty.")
|
||
|
|
||
|
csum_df = pd.DataFrame()
|
||
|
csum_df['sum'] = trades['profit_abs'].cumsum()
|
||
|
csum_min = csum_df['sum'].min() + starting_balance
|
||
|
csum_max = csum_df['sum'].max() + starting_balance
|
||
|
|
||
|
return csum_min, csum_max
|
||
|
|
||
|
|
||
|
def calculate_cagr(days_passed: int, starting_balance: float, final_balance: float) -> float:
|
||
|
"""
|
||
|
Calculate CAGR
|
||
|
:param days_passed: Days passed between start and ending balance
|
||
|
:param starting_balance: Starting balance
|
||
|
:param final_balance: Final balance to calculate CAGR against
|
||
|
:return: CAGR
|
||
|
"""
|
||
|
return (final_balance / starting_balance) ** (1 / (days_passed / 365)) - 1
|