Merge remote-tracking branch 'origin/develop' into feat/convolutional-neural-net

This commit is contained in:
robcaulk
2022-12-16 12:24:35 +01:00
60 changed files with 969 additions and 403 deletions

View File

@@ -355,6 +355,13 @@ def _validate_freqai_include_timeframes(conf: Dict[str, Any]) -> None:
f"Main timeframe of {main_tf} must be smaller or equal to FreqAI "
f"`include_timeframes`.Offending include-timeframes: {', '.join(offending_lines)}")
# Ensure that the base timeframe is included in the include_timeframes list
if main_tf not in freqai_include_timeframes:
feature_parameters = conf.get('freqai', {}).get('feature_parameters', {})
include_timeframes = [main_tf] + freqai_include_timeframes
conf.get('freqai', {}).get('feature_parameters', {}) \
.update({**feature_parameters, 'include_timeframes': include_timeframes})
def _validate_freqai_backtest(conf: Dict[str, Any]) -> None:
if conf.get('runmode', RunMode.OTHER) == RunMode.BACKTEST:

View File

@@ -61,6 +61,7 @@ USERPATH_FREQAIMODELS = 'freqaimodels'
TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
FULL_DATAFRAME_THRESHOLD = 100
ENV_VAR_PREFIX = 'FREQTRADE__'
@@ -608,9 +609,8 @@ CONF_SCHEMA = {
"backtest_period_days",
"identifier",
"feature_parameters",
"data_split_parameters",
"model_training_parameters"
]
"data_split_parameters"
]
},
},
}

View File

@@ -9,14 +9,16 @@ from collections import deque
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from pandas import DataFrame
from pandas import DataFrame, to_timedelta
from freqtrade.configuration import TimeRange
from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe
from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes,
PairWithTimeframe)
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode
from freqtrade.exceptions import ExchangeError, OperationalException
from freqtrade.exchange import Exchange, timeframe_to_seconds
from freqtrade.misc import append_candles_to_dataframe
from freqtrade.rpc import RPCManager
from freqtrade.util import PeriodicCache
@@ -104,13 +106,15 @@ class DataProvider:
def _emit_df(
self,
pair_key: PairWithTimeframe,
dataframe: DataFrame
dataframe: DataFrame,
new_candle: bool
) -> None:
"""
Send this dataframe as an ANALYZED_DF message to RPC
:param pair_key: PairWithTimeframe tuple
:param data: Tuple containing the DataFrame and the datetime it was cached
:param dataframe: Dataframe to emit
:param new_candle: This is a new candle
"""
if self.__rpc:
self.__rpc.send_msg(
@@ -118,13 +122,18 @@ class DataProvider:
'type': RPCMessageType.ANALYZED_DF,
'data': {
'key': pair_key,
'df': dataframe,
'df': dataframe.tail(1),
'la': datetime.now(timezone.utc)
}
}
)
if new_candle:
self.__rpc.send_msg({
'type': RPCMessageType.NEW_CANDLE,
'data': pair_key,
})
def _add_external_df(
def _replace_external_df(
self,
pair: str,
dataframe: DataFrame,
@@ -150,6 +159,85 @@ class DataProvider:
self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
def _add_external_df(
self,
pair: str,
dataframe: DataFrame,
last_analyzed: datetime,
timeframe: str,
candle_type: CandleType,
producer_name: str = "default"
) -> Tuple[bool, int]:
"""
Append a candle to the existing external dataframe. The incoming dataframe
must have at least 1 candle.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param candle_type: Any of the enum CandleType (must match trading mode!)
:returns: False if the candle could not be appended, or the int number of missing candles.
"""
pair_key = (pair, timeframe, candle_type)
if dataframe.empty:
# The incoming dataframe must have at least 1 candle
return (False, 0)
if len(dataframe) >= FULL_DATAFRAME_THRESHOLD:
# This is likely a full dataframe
# Add the dataframe to the dataprovider
self._replace_external_df(
pair,
dataframe,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
return (True, 0)
if (producer_name not in self.__producer_pairs_df
or pair_key not in self.__producer_pairs_df[producer_name]):
# We don't have data from this producer yet,
# or we don't have data for this pair_key
# return False and 1000 for the full df
return (False, 1000)
existing_df, _ = self.__producer_pairs_df[producer_name][pair_key]
# CHECK FOR MISSING CANDLES
timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas
local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy
incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming
# Remove existing candles that are newer than the incoming first candle
existing_df1 = existing_df[existing_df['date'] < incoming_first]
candle_difference = (incoming_first - local_last) / timeframe_delta
# If the difference divided by the timeframe is 1, then this
# is the candle we want and the incoming data isn't missing any.
# If the candle_difference is more than 1, that means
# we missed some candles between our data and the incoming
# so return False and candle_difference.
if candle_difference > 1:
return (False, candle_difference)
if existing_df1.empty:
appended_df = dataframe
else:
appended_df = append_candles_to_dataframe(existing_df1, dataframe)
# Everything is good, we appended
self._replace_external_df(
pair,
appended_df,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
return (True, 0)
def get_producer_df(
self,
pair: str,

View File

@@ -6,7 +6,7 @@ from freqtrade.enums.exittype import ExitType
from freqtrade.enums.hyperoptstate import HyperoptState
from freqtrade.enums.marginmode import MarginMode
from freqtrade.enums.ordertypevalue import OrderTypeValues
from freqtrade.enums.rpcmessagetype import RPCMessageType, RPCRequestType
from freqtrade.enums.rpcmessagetype import NO_ECHO_MESSAGES, RPCMessageType, RPCRequestType
from freqtrade.enums.runmode import NON_UTIL_MODES, OPTIMIZE_MODES, TRADING_MODES, RunMode
from freqtrade.enums.signaltype import SignalDirection, SignalTagType, SignalType
from freqtrade.enums.state import State

View File

@@ -21,6 +21,7 @@ class RPCMessageType(str, Enum):
WHITELIST = 'whitelist'
ANALYZED_DF = 'analyzed_df'
NEW_CANDLE = 'new_candle'
def __repr__(self):
return self.value
@@ -35,3 +36,6 @@ class RPCRequestType(str, Enum):
WHITELIST = 'whitelist'
ANALYZED_DF = 'analyzed_df'
NO_ECHO_MESSAGES = (RPCMessageType.ANALYZED_DF, RPCMessageType.WHITELIST, RPCMessageType.NEW_CANDLE)

View File

@@ -3,7 +3,6 @@
from freqtrade.exchange.common import remove_credentials, MAP_EXCHANGE_CHILDCLASS
from freqtrade.exchange.exchange import Exchange
# isort: on
from freqtrade.exchange.bibox import Bibox
from freqtrade.exchange.binance import Binance
from freqtrade.exchange.bitpanda import Bitpanda
from freqtrade.exchange.bittrex import Bittrex

View File

@@ -1,28 +0,0 @@
""" Bibox exchange subclass """
import logging
from typing import Dict
from freqtrade.exchange import Exchange
logger = logging.getLogger(__name__)
class Bibox(Exchange):
"""
Bibox exchange class. Contains adjustments needed for Freqtrade to work
with this exchange.
Please note that this exchange is not included in the list of exchanges
officially supported by the Freqtrade development team. So some features
may still not work as expected.
"""
# fetchCurrencies API point requires authentication for Bibox,
# so switch it off for Freqtrade load_markets()
@property
def _ccxt_config(self) -> Dict:
# Parameters to add directly to ccxt sync/async initialization.
config = {"has": {"fetchCurrencies": False}}
config.update(super()._ccxt_config)
return config

View File

@@ -20,6 +20,9 @@ class Base4ActionRLEnv(BaseEnvironment):
"""
Base class for a 4 action environment
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.actions = Actions
def set_action_space(self):
self.action_space = spaces.Discrete(len(Actions))
@@ -43,9 +46,9 @@ class Base4ActionRLEnv(BaseEnvironment):
self._done = True
self._update_unrealized_total_profit()
step_reward = self.calculate_reward(action)
self.total_reward += step_reward
self.tensorboard_log(self.actions._member_names_[action])
trade_type = None
if self.is_tradesignal(action):
@@ -92,9 +95,12 @@ class Base4ActionRLEnv(BaseEnvironment):
info = dict(
tick=self._current_tick,
action=action,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
position=self._position.value,
trade_duration=self.get_trade_duration(),
current_profit_pct=self.get_unrealized_profit()
)
observation = self._get_observation()

View File

@@ -21,6 +21,9 @@ class Base5ActionRLEnv(BaseEnvironment):
"""
Base class for a 5 action environment
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.actions = Actions
def set_action_space(self):
self.action_space = spaces.Discrete(len(Actions))
@@ -46,6 +49,7 @@ class Base5ActionRLEnv(BaseEnvironment):
self._update_unrealized_total_profit()
step_reward = self.calculate_reward(action)
self.total_reward += step_reward
self.tensorboard_log(self.actions._member_names_[action])
trade_type = None
if self.is_tradesignal(action):
@@ -98,9 +102,12 @@ class Base5ActionRLEnv(BaseEnvironment):
info = dict(
tick=self._current_tick,
action=action,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
position=self._position.value,
trade_duration=self.get_trade_duration(),
current_profit_pct=self.get_unrealized_profit()
)
observation = self._get_observation()

View File

@@ -2,7 +2,7 @@ import logging
import random
from abc import abstractmethod
from enum import Enum
from typing import Optional
from typing import Optional, Type, Union
import gym
import numpy as np
@@ -11,12 +11,21 @@ from gym import spaces
from gym.utils import seeding
from pandas import DataFrame
from freqtrade.data.dataprovider import DataProvider
logger = logging.getLogger(__name__)
class BaseActions(Enum):
"""
Default action space, mostly used for type handling.
"""
Neutral = 0
Long_enter = 1
Long_exit = 2
Short_enter = 3
Short_exit = 4
class Positions(Enum):
Short = 0
Long = 1
@@ -35,8 +44,8 @@ class BaseEnvironment(gym.Env):
def __init__(self, df: DataFrame = DataFrame(), prices: DataFrame = DataFrame(),
reward_kwargs: dict = {}, window_size=10, starting_point=True,
id: str = 'baseenv-1', seed: int = 1, config: dict = {},
dp: Optional[DataProvider] = None):
id: str = 'baseenv-1', seed: int = 1, config: dict = {}, live: bool = False,
fee: float = 0.0015):
"""
Initializes the training/eval environment.
:param df: dataframe of features
@@ -47,22 +56,29 @@ class BaseEnvironment(gym.Env):
:param id: string id of the environment (used in backend for multiprocessed env)
:param seed: Sets the seed of the environment higher in the gym.Env object
:param config: Typical user configuration file
:param dp: dataprovider from freqtrade
:param live: Whether or not this environment is active in dry/live/backtesting
:param fee: The fee to use for environmental interactions.
"""
self.config = config
self.rl_config = config['freqai']['rl_config']
self.add_state_info = self.rl_config.get('add_state_info', False)
self.id = id
self.seed(seed)
self.reset_env(df, prices, window_size, reward_kwargs, starting_point)
self.max_drawdown = 1 - self.rl_config.get('max_training_drawdown_pct', 0.8)
self.compound_trades = config['stake_amount'] == 'unlimited'
if self.config.get('fee', None) is not None:
self.fee = self.config['fee']
elif dp is not None:
self.fee = dp._exchange.get_fee(symbol=dp.current_whitelist()[0]) # type: ignore
else:
self.fee = 0.0015
self.fee = fee
# set here to default 5Ac, but all children envs can override this
self.actions: Type[Enum] = BaseActions
self.tensorboard_metrics: dict = {}
self.live = live
if not self.live and self.add_state_info:
self.add_state_info = False
logger.warning("add_state_info is not available in backtesting. Deactivating.")
self.seed(seed)
self.reset_env(df, prices, window_size, reward_kwargs, starting_point)
def reset_env(self, df: DataFrame, prices: DataFrame, window_size: int,
reward_kwargs: dict, starting_point=True):
@@ -117,7 +133,38 @@ class BaseEnvironment(gym.Env):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def tensorboard_log(self, metric: str, value: Union[int, float] = 1, inc: bool = True):
"""
Function builds the tensorboard_metrics dictionary
to be parsed by the TensorboardCallback. This
function is designed for tracking incremented objects,
events, actions inside the training environment.
For example, a user can call this to track the
frequency of occurence of an `is_valid` call in
their `calculate_reward()`:
def calculate_reward(self, action: int) -> float:
if not self._is_valid(action):
self.tensorboard_log("is_valid")
return -2
:param metric: metric to be tracked and incremented
:param value: value to increment `metric` by
:param inc: sets whether the `value` is incremented or not
"""
if not inc or metric not in self.tensorboard_metrics:
self.tensorboard_metrics[metric] = value
else:
self.tensorboard_metrics[metric] += value
def reset_tensorboard_log(self):
self.tensorboard_metrics = {}
def reset(self):
"""
Reset is called at the beginning of every episode
"""
self.reset_tensorboard_log()
self._done = False
@@ -271,6 +318,13 @@ class BaseEnvironment(gym.Env):
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def get_actions(self) -> Type[Enum]:
"""
Used by SubprocVecEnv to get actions from
initialized env for tensorboard callback
"""
return self.actions
# Keeping around incase we want to start building more complex environment
# templates in the future.
# def most_recent_return(self):

View File

@@ -21,7 +21,8 @@ from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.RL.Base5ActionRLEnv import Actions, Base5ActionRLEnv
from freqtrade.freqai.RL.BaseEnvironment import Positions
from freqtrade.freqai.RL.BaseEnvironment import BaseActions, Positions
from freqtrade.freqai.RL.TensorboardCallback import TensorboardCallback
from freqtrade.persistence import Trade
@@ -44,8 +45,8 @@ class BaseReinforcementLearningModel(IFreqaiModel):
'cpu_count', 1), max(int(self.max_system_threads / 2), 1))
th.set_num_threads(self.max_threads)
self.reward_params = self.freqai_info['rl_config']['model_reward_parameters']
self.train_env: Union[SubprocVecEnv, gym.Env] = None
self.eval_env: Union[SubprocVecEnv, gym.Env] = None
self.train_env: Union[SubprocVecEnv, Type[gym.Env]] = gym.Env()
self.eval_env: Union[SubprocVecEnv, Type[gym.Env]] = gym.Env()
self.eval_callback: Optional[EvalCallback] = None
self.model_type = self.freqai_info['rl_config']['model_type']
self.rl_config = self.freqai_info['rl_config']
@@ -65,6 +66,8 @@ class BaseReinforcementLearningModel(IFreqaiModel):
self.unset_outlier_removal()
self.net_arch = self.rl_config.get('net_arch', [128, 128])
self.dd.model_type = import_str
self.tensorboard_callback: TensorboardCallback = \
TensorboardCallback(verbose=1, actions=BaseActions)
def unset_outlier_removal(self):
"""
@@ -140,22 +143,35 @@ class BaseReinforcementLearningModel(IFreqaiModel):
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
env_info = self.pack_env_dict()
self.train_env = self.MyRLEnv(df=train_df,
prices=prices_train,
window_size=self.CONV_WIDTH,
reward_kwargs=self.reward_params,
config=self.config,
dp=self.data_provider)
**env_info)
self.eval_env = Monitor(self.MyRLEnv(df=test_df,
prices=prices_test,
window_size=self.CONV_WIDTH,
reward_kwargs=self.reward_params,
config=self.config,
dp=self.data_provider))
**env_info))
self.eval_callback = EvalCallback(self.eval_env, deterministic=True,
render=False, eval_freq=len(train_df),
best_model_save_path=str(dk.data_path))
actions = self.train_env.get_actions()
self.tensorboard_callback = TensorboardCallback(verbose=1, actions=actions)
def pack_env_dict(self) -> Dict[str, Any]:
"""
Create dictionary of environment arguments
"""
env_info = {"window_size": self.CONV_WIDTH,
"reward_kwargs": self.reward_params,
"config": self.config,
"live": self.live}
if self.data_provider:
env_info["fee"] = self.data_provider._exchange \
.get_fee(symbol=self.data_provider.current_whitelist()[0]) # type: ignore
return env_info
@abstractmethod
def fit(self, data_dictionary: Dict[str, Any], dk: FreqaiDataKitchen, **kwargs):
"""
@@ -377,8 +393,8 @@ class BaseReinforcementLearningModel(IFreqaiModel):
def make_env(MyRLEnv: Type[gym.Env], env_id: str, rank: int,
seed: int, train_df: DataFrame, price: DataFrame,
reward_params: Dict[str, int], window_size: int, monitor: bool = False,
config: Dict[str, Any] = {}) -> Callable:
monitor: bool = False,
env_info: Dict[str, Any] = {}) -> Callable:
"""
Utility function for multiprocessed env.
@@ -386,13 +402,14 @@ def make_env(MyRLEnv: Type[gym.Env], env_id: str, rank: int,
:param num_env: (int) the number of environment you wish to have in subprocesses
:param seed: (int) the inital seed for RNG
:param rank: (int) index of the subprocess
:param env_info: (dict) all required arguments to instantiate the environment.
:return: (Callable)
"""
def _init() -> gym.Env:
env = MyRLEnv(df=train_df, prices=price, window_size=window_size,
reward_kwargs=reward_params, id=env_id, seed=seed + rank, config=config)
env = MyRLEnv(df=train_df, prices=price, id=env_id, seed=seed + rank,
**env_info)
if monitor:
env = Monitor(env)
return env

View File

@@ -0,0 +1,59 @@
from enum import Enum
from typing import Any, Dict, Type, Union
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.logger import HParam
from freqtrade.freqai.RL.BaseEnvironment import BaseActions, BaseEnvironment
class TensorboardCallback(BaseCallback):
"""
Custom callback for plotting additional values in tensorboard and
episodic summary reports.
"""
def __init__(self, verbose=1, actions: Type[Enum] = BaseActions):
super(TensorboardCallback, self).__init__(verbose)
self.model: Any = None
self.logger = None # type: Any
self.training_env: BaseEnvironment = None # type: ignore
self.actions: Type[Enum] = actions
def _on_training_start(self) -> None:
hparam_dict = {
"algorithm": self.model.__class__.__name__,
"learning_rate": self.model.learning_rate,
# "gamma": self.model.gamma,
# "gae_lambda": self.model.gae_lambda,
# "batch_size": self.model.batch_size,
# "n_steps": self.model.n_steps,
}
metric_dict: Dict[str, Union[float, int]] = {
"eval/mean_reward": 0,
"rollout/ep_rew_mean": 0,
"rollout/ep_len_mean": 0,
"train/value_loss": 0,
"train/explained_variance": 0,
}
self.logger.record(
"hparams",
HParam(hparam_dict, metric_dict),
exclude=("stdout", "log", "json", "csv"),
)
def _on_step(self) -> bool:
local_info = self.locals["infos"][0]
tensorboard_metrics = self.training_env.get_attr("tensorboard_metrics")[0]
for info in local_info:
if info not in ["episode", "terminal_observation"]:
self.logger.record(f"_info/{info}", local_info[info])
for info in tensorboard_metrics:
if info in [action.name for action in self.actions]:
self.logger.record(f"_actions/{info}", tensorboard_metrics[info])
else:
self.logger.record(f"_custom/{info}", tensorboard_metrics[info])
return True

View File

@@ -95,9 +95,14 @@ class BaseClassifierModel(IFreqaiModel):
self.data_cleaning_predict(dk)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
if self.CONV_WIDTH == 1:
predictions = np.reshape(predictions, (-1, len(dk.label_list)))
pred_df = DataFrame(predictions, columns=dk.label_list)
predictions_prob = self.model.predict_proba(dk.data_dictionary["prediction_features"])
if self.CONV_WIDTH == 1:
predictions_prob = np.reshape(predictions_prob, (-1, len(self.model.classes_)))
pred_df_prob = DataFrame(predictions_prob, columns=self.model.classes_)
pred_df = pd.concat([pred_df, pred_df_prob], axis=1)

View File

@@ -95,6 +95,9 @@ class BaseRegressionModel(IFreqaiModel):
self.data_cleaning_predict(dk)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
if self.CONV_WIDTH == 1:
predictions = np.reshape(predictions, (-1, len(dk.label_list)))
pred_df = DataFrame(predictions, columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df)

View File

@@ -462,10 +462,10 @@ class FreqaiDataKitchen:
:param df: Dataframe containing all candles to run the entire backtest. Here
it is sliced down to just the present training period.
"""
df = df.loc[df["date"] >= timerange.startdt, :]
if not self.live:
df = df.loc[df["date"] < timerange.stopdt, :]
df = df.loc[(df["date"] >= timerange.startdt) & (df["date"] < timerange.stopdt), :]
else:
df = df.loc[df["date"] >= timerange.startdt, :]
return df

View File

@@ -282,10 +282,10 @@ class IFreqaiModel(ABC):
train_it += 1
total_trains = len(dk.backtesting_timeranges)
self.training_timerange = tr_train
dataframe_train = dk.slice_dataframe(tr_train, dataframe)
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
len_backtest_df = len(dataframe.loc[(dataframe["date"] >= tr_backtest.startdt) & (
dataframe["date"] < tr_backtest.stopdt), :])
if not self.ensure_data_exists(dataframe_backtest, tr_backtest, pair):
if not self.ensure_data_exists(len_backtest_df, tr_backtest, pair):
continue
self.log_backtesting_progress(tr_train, pair, train_it, total_trains)
@@ -298,13 +298,15 @@ class IFreqaiModel(ABC):
dk.set_new_model_names(pair, timestamp_model_id)
if dk.check_if_backtest_prediction_is_valid(len(dataframe_backtest)):
if dk.check_if_backtest_prediction_is_valid(len_backtest_df):
self.dd.load_metadata(dk)
dk.find_features(dataframe_train)
dk.find_features(dataframe)
self.check_if_feature_list_matches_strategy(dk)
append_df = dk.get_backtesting_prediction()
dk.append_predictions(append_df)
else:
dataframe_train = dk.slice_dataframe(tr_train, dataframe)
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
if not self.model_exists(dk):
dk.find_features(dataframe_train)
dk.find_labels(dataframe_train)
@@ -804,16 +806,16 @@ class IFreqaiModel(ABC):
self.pair_it = 1
self.current_candle = self.dd.current_candle
def ensure_data_exists(self, dataframe_backtest: DataFrame,
def ensure_data_exists(self, len_dataframe_backtest: int,
tr_backtest: TimeRange, pair: str) -> bool:
"""
Check if the dataframe is empty, if not, report useful information to user.
:param dataframe_backtest: the backtesting dataframe, maybe empty.
:param len_dataframe_backtest: the len of backtesting dataframe
:param tr_backtest: current backtesting timerange.
:param pair: current pair
:return: if the data exists or not
"""
if self.config.get("freqai_backtest_live_models", False) and len(dataframe_backtest) == 0:
if self.config.get("freqai_backtest_live_models", False) and len_dataframe_backtest == 0:
logger.info(f"No data found for pair {pair} from "
f"from { tr_backtest.start_fmt} to {tr_backtest.stop_fmt}. "
"Probably more than one training within the same candle period.")

View File

@@ -61,7 +61,7 @@ class ReinforcementLearner(BaseReinforcementLearningModel):
model = self.MODELCLASS(self.policy_type, self.train_env, policy_kwargs=policy_kwargs,
tensorboard_log=Path(
dk.full_path / "tensorboard" / dk.pair.split('/')[0]),
**self.freqai_info['model_training_parameters']
**self.freqai_info.get('model_training_parameters', {})
)
else:
logger.info('Continual training activated - starting training from previously '
@@ -71,7 +71,7 @@ class ReinforcementLearner(BaseReinforcementLearningModel):
model.learn(
total_timesteps=int(total_timesteps),
callback=self.eval_callback
callback=[self.eval_callback, self.tensorboard_callback]
)
if Path(dk.data_path / "best_model.zip").is_file():
@@ -100,13 +100,17 @@ class ReinforcementLearner(BaseReinforcementLearningModel):
"""
# first, penalize if the action is not valid
if not self._is_valid(action):
self.tensorboard_log("is_valid")
return -2
pnl = self.get_unrealized_profit()
factor = 100.
# reward agent for entering trades
if (action in (Actions.Long_enter.value, Actions.Short_enter.value)
if (action == Actions.Long_enter.value
and self._position == Positions.Neutral):
return 25
if (action == Actions.Short_enter.value
and self._position == Positions.Neutral):
return 25
# discourage agent from not entering trades

View File

@@ -1,7 +1,6 @@
import logging
from typing import Any, Dict # , Tuple
from typing import Any, Dict
# import numpy.typing as npt
from pandas import DataFrame
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.vec_env import SubprocVecEnv
@@ -9,6 +8,7 @@ from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.prediction_models.ReinforcementLearner import ReinforcementLearner
from freqtrade.freqai.RL.BaseReinforcementLearningModel import make_env
from freqtrade.freqai.RL.TensorboardCallback import TensorboardCallback
logger = logging.getLogger(__name__)
@@ -34,18 +34,24 @@ class ReinforcementLearner_multiproc(ReinforcementLearner):
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
env_info = self.pack_env_dict()
env_id = "train_env"
self.train_env = SubprocVecEnv([make_env(self.MyRLEnv, env_id, i, 1, train_df, prices_train,
self.reward_params, self.CONV_WIDTH, monitor=True,
config=self.config) for i
self.train_env = SubprocVecEnv([make_env(self.MyRLEnv, env_id, i, 1,
train_df, prices_train,
monitor=True,
env_info=env_info) for i
in range(self.max_threads)])
eval_env_id = 'eval_env'
self.eval_env = SubprocVecEnv([make_env(self.MyRLEnv, eval_env_id, i, 1,
test_df, prices_test,
self.reward_params, self.CONV_WIDTH, monitor=True,
config=self.config) for i
monitor=True,
env_info=env_info) for i
in range(self.max_threads)])
self.eval_callback = EvalCallback(self.eval_env, deterministic=True,
render=False, eval_freq=len(train_df),
best_model_save_path=str(dk.data_path))
actions = self.train_env.env_method("get_actions")[0]
self.tensorboard_callback = TensorboardCallback(verbose=1, actions=actions)

View File

@@ -155,6 +155,8 @@ class FreqtradeBot(LoggingMixin):
self.cancel_all_open_orders()
self.check_for_open_trades()
except Exception as e:
logger.warning(f'Exception during cleanup: {e.__class__.__name__} {e}')
finally:
self.strategy.ft_bot_cleanup()
@@ -162,8 +164,13 @@ class FreqtradeBot(LoggingMixin):
self.rpc.cleanup()
if self.emc:
self.emc.shutdown()
Trade.commit()
self.exchange.close()
try:
Trade.commit()
except Exception:
# Exeptions here will be happening if the db disappeared.
# At which point we can no longer commit anyway.
pass
def startup(self) -> None:
"""

View File

@@ -301,3 +301,21 @@ def remove_entry_exit_signals(dataframe: pd.DataFrame):
dataframe[SignalTagType.EXIT_TAG.value] = None
return dataframe
def append_candles_to_dataframe(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
"""
Append the `right` dataframe to the `left` dataframe
:param left: The full dataframe you want appended to
:param right: The new dataframe containing the data you want appended
:returns: The dataframe with the right data in it
"""
if left.iloc[-1]['date'] != right.iloc[-1]['date']:
left = pd.concat([left, right])
# Only keep the last 1500 candles in memory
left = left[-1500:] if len(left) > 1500 else left
left.reset_index(drop=True, inplace=True)
return left

View File

@@ -218,7 +218,7 @@ class VolumePairList(IPairList):
else:
filtered_tickers[i]['quoteVolume'] = 0
else:
# Tickers mode - filter based on incomming pairlist.
# Tickers mode - filter based on incoming pairlist.
filtered_tickers = [v for k, v in tickers.items() if k in pairlist]
if self._min_value > 0:

View File

@@ -37,7 +37,8 @@ logger = logging.getLogger(__name__)
# 2.16: Additional daily metrics
# 2.17: Forceentry - leverage, partial force_exit
# 2.20: Add websocket endpoints
API_VERSION = 2.20
# 2.21: Add new_candle messagetype
API_VERSION = 2.21
# Public API, requires no auth.
router_public = APIRouter()

View File

@@ -91,9 +91,10 @@ async def _process_consumer_request(
elif type == RPCRequestType.ANALYZED_DF:
# Limit the amount of candles per dataframe to 'limit' or 1500
limit = min(data.get('limit', 1500), 1500) if data else None
pair = data.get('pair', None) if data else None
# For every pair in the generator, send a separate message
for message in rpc._ws_request_analyzed_df(limit):
for message in rpc._ws_request_analyzed_df(limit, pair):
# Format response
response = WSAnalyzedDFMessage(data=message)
await channel.send(response.dict(exclude_none=True))

View File

@@ -27,7 +27,8 @@ class WebSocketChannel:
self,
websocket: WebSocketType,
channel_id: Optional[str] = None,
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
send_throttle: float = 0.01
):
self.channel_id = channel_id if channel_id else uuid4().hex[:8]
self._websocket = WebSocketProxy(websocket)
@@ -41,6 +42,7 @@ class WebSocketChannel:
self._send_times: Deque[float] = deque([], maxlen=10)
# High limit defaults to 3 to start
self._send_high_limit = 3
self._send_throttle = send_throttle
# The subscribed message types
self._subscriptions: List[str] = []
@@ -106,7 +108,8 @@ class WebSocketChannel:
# Explicitly give control back to event loop as
# websockets.send does not
await asyncio.sleep(0.01)
# Also throttles how fast we send
await asyncio.sleep(self._send_throttle)
async def recv(self):
"""

View File

@@ -47,7 +47,7 @@ class WSWhitelistRequest(WSRequestSchema):
class WSAnalyzedDFRequest(WSRequestSchema):
type: RPCRequestType = RPCRequestType.ANALYZED_DF
data: Dict[str, Any] = {"limit": 1500}
data: Dict[str, Any] = {"limit": 1500, "pair": None}
# ------------------------------ MESSAGE SCHEMAS ----------------------------

View File

@@ -8,15 +8,17 @@ import asyncio
import logging
import socket
from threading import Thread
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict
from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
import websockets
from pydantic import ValidationError
from freqtrade.constants import FULL_DATAFRAME_THRESHOLD
from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import RPCMessageType
from freqtrade.misc import remove_entry_exit_signals
from freqtrade.rpc.api_server.ws import WebSocketChannel
from freqtrade.rpc.api_server.ws.channel import WebSocketChannel, create_channel
from freqtrade.rpc.api_server.ws.message_stream import MessageStream
from freqtrade.rpc.api_server.ws_schemas import (WSAnalyzedDFMessage, WSAnalyzedDFRequest,
WSMessageSchema, WSRequestSchema,
WSSubscribeRequest, WSWhitelistMessage,
@@ -38,6 +40,10 @@ class Producer(TypedDict):
logger = logging.getLogger(__name__)
def schema_to_dict(schema: Union[WSMessageSchema, WSRequestSchema]):
return schema.dict(exclude_none=True)
class ExternalMessageConsumer:
"""
The main controller class for consuming external messages from
@@ -92,6 +98,8 @@ class ExternalMessageConsumer:
RPCMessageType.ANALYZED_DF: self._consume_analyzed_df_message,
}
self._channel_streams: Dict[str, MessageStream] = {}
self.start()
def start(self):
@@ -118,6 +126,8 @@ class ExternalMessageConsumer:
logger.info("Stopping ExternalMessageConsumer")
self._running = False
self._channel_streams = {}
if self._sub_tasks:
# Cancel sub tasks
for task in self._sub_tasks:
@@ -175,7 +185,6 @@ class ExternalMessageConsumer:
:param producer: Dictionary containing producer info
:param lock: An asyncio Lock
"""
channel = None
while self._running:
try:
host, port = producer['host'], producer['port']
@@ -190,19 +199,21 @@ class ExternalMessageConsumer:
max_size=self.message_size_limit,
ping_interval=None
) as ws:
channel = WebSocketChannel(ws, channel_id=name)
async with create_channel(
ws,
channel_id=name,
send_throttle=0.5
) as channel:
logger.info(f"Producer connection success - {channel}")
# Create the message stream for this channel
self._channel_streams[name] = MessageStream()
# Now request the initial data from this Producer
for request in self._initial_requests:
await channel.send(
request.dict(exclude_none=True)
# Run the channel tasks while connected
await channel.run_channel_tasks(
self._receive_messages(channel, producer, lock),
self._send_requests(channel, self._channel_streams[name])
)
# Now receive data, if none is within the time limit, ping
await self._receive_messages(channel, producer, lock)
except (websockets.exceptions.InvalidURI, ValueError) as e:
logger.error(f"{ws_url} is an invalid WebSocket URL - {e}")
break
@@ -229,11 +240,19 @@ class ExternalMessageConsumer:
# An unforseen error has occurred, log and continue
logger.error("Unexpected error has occurred:")
logger.exception(e)
await asyncio.sleep(self.sleep_time)
continue
finally:
if channel:
await channel.close()
async def _send_requests(self, channel: WebSocketChannel, channel_stream: MessageStream):
# Send the initial requests
for init_request in self._initial_requests:
await channel.send(schema_to_dict(init_request))
# Now send any subsequent requests published to
# this channel's stream
async for request, _ in channel_stream:
logger.debug(f"Sending request to channel - {channel} - {request}")
await channel.send(request)
async def _receive_messages(
self,
@@ -270,19 +289,31 @@ class ExternalMessageConsumer:
latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)
logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
continue
except (websockets.exceptions.ConnectionClosed):
# Just eat the error and continue reconnecting
logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except Exception as e:
# Just eat the error and continue reconnecting
logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
logger.debug(e, exc_info=e)
await asyncio.sleep(self.sleep_time)
raise
break
def send_producer_request(
self,
producer_name: str,
request: Union[WSRequestSchema, Dict[str, Any]]
):
"""
Publish a message to the producer's message stream to be
sent by the channel task.
:param producer_name: The name of the producer to publish the message to
:param request: The request to send to the producer
"""
if isinstance(request, WSRequestSchema):
request = schema_to_dict(request)
if channel_stream := self._channel_streams.get(producer_name):
channel_stream.publish(request)
def handle_producer_message(self, producer: Producer, message: Dict[str, Any]):
"""
@@ -336,16 +367,45 @@ class ExternalMessageConsumer:
pair, timeframe, candle_type = key
if df.empty:
logger.debug(f"Received Empty Dataframe for {key}")
return
# If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False):
df = remove_entry_exit_signals(df)
# Add the dataframe to the dataprovider
self._dp._add_external_df(pair, df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name)
logger.debug(f"Received {len(df)} candle(s) for {key}")
did_append, n_missing = self._dp._add_external_df(
pair,
df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
if not did_append:
# We want an overlap in candles incase some data has changed
n_missing += 1
# Set to None for all candles if we missed a full df's worth of candles
n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles "
f"for {key} from `{producer_name}`")
self.send_producer_request(
producer_name,
WSAnalyzedDFRequest(
data={
"limit": n_missing,
"pair": pair
}
)
)
return
logger.debug(
f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`")
f"Consumed message from `{producer_name}` "
f"of type `RPCMessageType.ANALYZED_DF` for {key}")

View File

@@ -167,6 +167,7 @@ class RPC:
results = []
for trade in trades:
order: Optional[Order] = None
current_profit_fiat: Optional[float] = None
if trade.open_order_id:
order = trade.select_order_by_order_id(trade.open_order_id)
# calculate profit and send message to user
@@ -176,23 +177,26 @@ class RPC:
trade.pair, side='exit', is_short=trade.is_short, refresh=False)
except (ExchangeError, PricingError):
current_rate = NAN
if len(trade.select_filled_orders(trade.entry_side)) > 0:
current_profit = trade.calc_profit_ratio(
current_rate) if not isnan(current_rate) else NAN
current_profit_abs = trade.calc_profit(
current_rate) if not isnan(current_rate) else NAN
else:
current_profit = current_profit_abs = current_profit_fiat = 0.0
else:
# Closed trade ...
current_rate = trade.close_rate
if len(trade.select_filled_orders(trade.entry_side)) > 0:
current_profit = trade.calc_profit_ratio(
current_rate) if not isnan(current_rate) else NAN
current_profit_abs = trade.calc_profit(
current_rate) if not isnan(current_rate) else NAN
current_profit_fiat: Optional[float] = None
# Calculate fiat profit
if self._fiat_converter:
current_profit_fiat = self._fiat_converter.convert_amount(
current_profit_abs,
self._freqtrade.config['stake_currency'],
self._freqtrade.config['fiat_display_currency']
)
else:
current_profit = current_profit_abs = current_profit_fiat = 0.0
current_profit = trade.close_profit
current_profit_abs = trade.close_profit_abs
# Calculate fiat profit
if not isnan(current_profit_abs) and self._fiat_converter:
current_profit_fiat = self._fiat_converter.convert_amount(
current_profit_abs,
self._freqtrade.config['stake_currency'],
self._freqtrade.config['fiat_display_currency']
)
# Calculate guaranteed profit (in case of trailing stop)
stoploss_entry_dist = trade.calc_profit(trade.stop_loss)
@@ -1058,15 +1062,26 @@ class RPC:
return self._convert_dataframe_to_dict(self._freqtrade.config['strategy'],
pair, timeframe, _data, last_analyzed)
def __rpc_analysed_dataframe_raw(self, pair: str, timeframe: str,
limit: Optional[int]) -> Tuple[DataFrame, datetime]:
""" Get the dataframe and last analyze from the dataprovider """
def __rpc_analysed_dataframe_raw(
self,
pair: str,
timeframe: str,
limit: Optional[int]
) -> Tuple[DataFrame, datetime]:
"""
Get the dataframe and last analyze from the dataprovider
:param pair: The pair to get
:param timeframe: The timeframe of data to get
:param limit: The amount of candles in the dataframe
"""
_data, last_analyzed = self._freqtrade.dataprovider.get_analyzed_dataframe(
pair, timeframe)
_data = _data.copy()
if limit:
_data = _data.iloc[-limit:]
return _data, last_analyzed
def _ws_all_analysed_dataframes(
@@ -1074,7 +1089,16 @@ class RPC:
pairlist: List[str],
limit: Optional[int]
) -> Generator[Dict[str, Any], None, None]:
""" Get the analysed dataframes of each pair in the pairlist """
"""
Get the analysed dataframes of each pair in the pairlist.
If specified, only return the most recent `limit` candles for
each dataframe.
:param pairlist: A list of pairs to get
:param limit: If an integer, limits the size of dataframe
If a list of string date times, only returns those candles
:returns: A generator of dictionaries with the key, dataframe, and last analyzed timestamp
"""
timeframe = self._freqtrade.config['timeframe']
candle_type = self._freqtrade.config.get('candle_type_def', CandleType.SPOT)
@@ -1087,10 +1111,15 @@ class RPC:
"la": last_analyzed
}
def _ws_request_analyzed_df(self, limit: Optional[int]):
def _ws_request_analyzed_df(
self,
limit: Optional[int] = None,
pair: Optional[str] = None
):
""" Historical Analyzed Dataframes for WebSocket """
whitelist = self._freqtrade.active_pair_whitelist
return self._ws_all_analysed_dataframes(whitelist, limit)
pairlist = [pair] if pair else self._freqtrade.active_pair_whitelist
return self._ws_all_analysed_dataframes(pairlist, limit)
def _ws_request_whitelist(self):
""" Whitelist data for WebSocket """

View File

@@ -6,7 +6,7 @@ from collections import deque
from typing import Any, Dict, List
from freqtrade.constants import Config
from freqtrade.enums import RPCMessageType
from freqtrade.enums import NO_ECHO_MESSAGES, RPCMessageType
from freqtrade.rpc import RPC, RPCHandler
@@ -67,7 +67,7 @@ class RPCManager:
'status': 'stopping bot'
}
"""
if msg.get('type') not in (RPCMessageType.ANALYZED_DF, RPCMessageType.WHITELIST):
if msg.get('type') not in NO_ECHO_MESSAGES:
logger.info('Sending rpc message: %s', msg)
if 'pair' in msg:
msg.update({

View File

@@ -68,6 +68,7 @@ class Webhook(RPCHandler):
RPCMessageType.PROTECTION_TRIGGER_GLOBAL,
RPCMessageType.WHITELIST,
RPCMessageType.ANALYZED_DF,
RPCMessageType.NEW_CANDLE,
RPCMessageType.STRATEGY_MSG):
# Don't fail for non-implemented types
return None

View File

@@ -739,10 +739,10 @@ class IStrategy(ABC, HyperStrategyMixin):
"""
pair = str(metadata.get('pair'))
new_candle = self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]['date']
# Test if seen this pair and last candle before.
# always run if process_only_new_candles is set to false
if (not self.process_only_new_candles or
self._last_candle_seen_per_pair.get(pair, None) != dataframe.iloc[-1]['date']):
if not self.process_only_new_candles or new_candle:
# Defs that only make change on new candle data.
dataframe = self.analyze_ticker(dataframe, metadata)
@@ -751,7 +751,7 @@ class IStrategy(ABC, HyperStrategyMixin):
candle_type = self.config.get('candle_type_def', CandleType.SPOT)
self.dp._set_cached_df(pair, self.timeframe, dataframe, candle_type=candle_type)
self.dp._emit_df((pair, self.timeframe, candle_type), dataframe)
self.dp._emit_df((pair, self.timeframe, candle_type), dataframe, new_candle)
else:
logger.debug("Skipping TA Analysis for already analyzed candle")

View File

@@ -7,14 +7,17 @@
"# Strategy analysis example\n",
"\n",
"Debugging a strategy can be time-consuming. Freqtrade offers helper functions to visualize raw data.\n",
"The following assumes you work with SampleStrategy, data for 5m timeframe from Binance and have downloaded them into the data directory in the default location."
"The following assumes you work with SampleStrategy, data for 5m timeframe from Binance and have downloaded them into the data directory in the default location.\n",
"Please follow the [documentation](https://www.freqtrade.io/en/stable/data-download/) for more details."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Setup"
"## Setup\n",
"\n",
"### Change Working directory to repository root"
]
},
{
@@ -23,7 +26,38 @@
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"from pathlib import Path\n",
"\n",
"# Change directory\n",
"# Modify this cell to insure that the output shows the correct path.\n",
"# Define all paths relative to the project root shown in the cell output\n",
"project_root = \"somedir/freqtrade\"\n",
"i=0\n",
"try:\n",
" os.chdirdir(project_root)\n",
" assert Path('LICENSE').is_file()\n",
"except:\n",
" while i<4 and (not Path('LICENSE').is_file()):\n",
" os.chdir(Path(Path.cwd(), '../'))\n",
" i+=1\n",
" project_root = Path.cwd()\n",
"print(Path.cwd())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Configure Freqtrade environment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from freqtrade.configuration import Configuration\n",
"\n",
"# Customize these according to your needs.\n",
@@ -31,14 +65,14 @@
"# Initialize empty configuration object\n",
"config = Configuration.from_files([])\n",
"# Optionally (recommended), use existing configuration file\n",
"# config = Configuration.from_files([\"config.json\"])\n",
"# config = Configuration.from_files([\"user_data/config.json\"])\n",
"\n",
"# Define some constants\n",
"config[\"timeframe\"] = \"5m\"\n",
"# Name of the strategy class\n",
"config[\"strategy\"] = \"SampleStrategy\"\n",
"# Location of the data\n",
"data_location = config['datadir']\n",
"data_location = config[\"datadir\"]\n",
"# Pair to analyze - Only use one pair here\n",
"pair = \"BTC/USDT\""
]
@@ -56,12 +90,12 @@
"candles = load_pair_history(datadir=data_location,\n",
" timeframe=config[\"timeframe\"],\n",
" pair=pair,\n",
" data_format = \"hdf5\",\n",
" data_format = \"json\", # Make sure to update this to your data\n",
" candle_type=CandleType.SPOT,\n",
" )\n",
"\n",
"# Confirm success\n",
"print(\"Loaded \" + str(len(candles)) + f\" rows of data for {pair} from {data_location}\")\n",
"print(f\"Loaded {len(candles)} rows of data for {pair} from {data_location}\")\n",
"candles.head()"
]
},
@@ -365,7 +399,7 @@
"metadata": {
"file_extension": ".py",
"kernelspec": {
"display_name": "Python 3.9.7 64-bit ('trade_397')",
"display_name": "Python 3.9.7 64-bit",
"language": "python",
"name": "python3"
},