skip darwin in RL tests, remove example scripts, improve doc

This commit is contained in:
robcaulk
2022-09-22 21:16:21 +02:00
parent ea8e34e192
commit eeebb78a5c
5 changed files with 362 additions and 518 deletions

View File

@@ -1,262 +1,262 @@
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type, Union
# import logging
# from pathlib import Path
# from typing import Any, Dict, List, Optional, Tuple, Type, Union
import gym
import torch as th
from stable_baselines3 import DQN
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3.common.policies import BasePolicy
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, FlattenExtractor
from stable_baselines3.common.type_aliases import GymEnv, Schedule
from stable_baselines3.dqn.policies import CnnPolicy, DQNPolicy, MlpPolicy, QNetwork
from torch import nn
# import gym
# import torch as th
# from stable_baselines3 import DQN
# from stable_baselines3.common.buffers import ReplayBuffer
# from stable_baselines3.common.policies import BasePolicy
# from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, FlattenExtractor
# from stable_baselines3.common.type_aliases import GymEnv, Schedule
# from stable_baselines3.dqn.policies import CnnPolicy, DQNPolicy, MlpPolicy, QNetwork
# from torch import nn
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
# from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
# from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
logger = logging.getLogger(__name__)
# logger = logging.getLogger(__name__)
class ReinforcementLearnerCustomAgent(BaseReinforcementLearningModel):
"""
User can customize agent by defining the class and using it directly.
Here the example is "TDQN"
# class ReinforcementLearnerCustomAgent(BaseReinforcementLearningModel):
# """
# User can customize agent by defining the class and using it directly.
# Here the example is "TDQN"
Warning!
This is an advanced example of how a user may create and use a highly
customized model class (which can inherit from existing classes,
similar to how the example below inherits from DQN).
This file is for example purposes only, and should not be run.
"""
# Warning!
# This is an advanced example of how a user may create and use a highly
# customized model class (which can inherit from existing classes,
# similar to how the example below inherits from DQN).
# This file is for example purposes only, and should not be run.
# """
def fit_rl(self, data_dictionary: Dict[str, Any], dk: FreqaiDataKitchen):
# def fit_rl(self, data_dictionary: Dict[str, Any], dk: FreqaiDataKitchen):
train_df = data_dictionary["train_features"]
total_timesteps = self.freqai_info["rl_config"]["train_cycles"] * len(train_df)
# train_df = data_dictionary["train_features"]
# total_timesteps = self.freqai_info["rl_config"]["train_cycles"] * len(train_df)
policy_kwargs = dict(activation_fn=th.nn.ReLU,
net_arch=[256, 256, 128])
# policy_kwargs = dict(activation_fn=th.nn.ReLU,
# net_arch=[256, 256, 128])
# TDQN is a custom agent defined below
model = TDQN(self.policy_type, self.train_env,
tensorboard_log=str(Path(dk.data_path / "tensorboard")),
policy_kwargs=policy_kwargs,
**self.freqai_info['model_training_parameters']
)
# # TDQN is a custom agent defined below
# model = TDQN(self.policy_type, self.train_env,
# tensorboard_log=str(Path(dk.data_path / "tensorboard")),
# policy_kwargs=policy_kwargs,
# **self.freqai_info['model_training_parameters']
# )
model.learn(
total_timesteps=int(total_timesteps),
callback=self.eval_callback
)
# model.learn(
# total_timesteps=int(total_timesteps),
# callback=self.eval_callback
# )
if Path(dk.data_path / "best_model.zip").is_file():
logger.info('Callback found a best model.')
best_model = self.MODELCLASS.load(dk.data_path / "best_model")
return best_model
# if Path(dk.data_path / "best_model.zip").is_file():
# logger.info('Callback found a best model.')
# best_model = self.MODELCLASS.load(dk.data_path / "best_model")
# return best_model
logger.info('Couldnt find best model, using final model instead.')
# logger.info('Couldnt find best model, using final model instead.')
return model
# return model
# User creates their custom agent and networks as shown below
# # User creates their custom agent and networks as shown below
def create_mlp_(
input_dim: int,
output_dim: int,
net_arch: List[int],
activation_fn: Type[nn.Module] = nn.ReLU,
squash_output: bool = False,
) -> List[nn.Module]:
dropout = 0.2
if len(net_arch) > 0:
number_of_neural = net_arch[0]
# def create_mlp_(
# input_dim: int,
# output_dim: int,
# net_arch: List[int],
# activation_fn: Type[nn.Module] = nn.ReLU,
# squash_output: bool = False,
# ) -> List[nn.Module]:
# dropout = 0.2
# if len(net_arch) > 0:
# number_of_neural = net_arch[0]
modules = [
nn.Linear(input_dim, number_of_neural),
nn.BatchNorm1d(number_of_neural),
nn.LeakyReLU(),
nn.Dropout(dropout),
nn.Linear(number_of_neural, number_of_neural),
nn.BatchNorm1d(number_of_neural),
nn.LeakyReLU(),
nn.Dropout(dropout),
nn.Linear(number_of_neural, number_of_neural),
nn.BatchNorm1d(number_of_neural),
nn.LeakyReLU(),
nn.Dropout(dropout),
nn.Linear(number_of_neural, number_of_neural),
nn.BatchNorm1d(number_of_neural),
nn.LeakyReLU(),
nn.Dropout(dropout),
nn.Linear(number_of_neural, output_dim)
]
return modules
# modules = [
# nn.Linear(input_dim, number_of_neural),
# nn.BatchNorm1d(number_of_neural),
# nn.LeakyReLU(),
# nn.Dropout(dropout),
# nn.Linear(number_of_neural, number_of_neural),
# nn.BatchNorm1d(number_of_neural),
# nn.LeakyReLU(),
# nn.Dropout(dropout),
# nn.Linear(number_of_neural, number_of_neural),
# nn.BatchNorm1d(number_of_neural),
# nn.LeakyReLU(),
# nn.Dropout(dropout),
# nn.Linear(number_of_neural, number_of_neural),
# nn.BatchNorm1d(number_of_neural),
# nn.LeakyReLU(),
# nn.Dropout(dropout),
# nn.Linear(number_of_neural, output_dim)
# ]
# return modules
class TDQNetwork(QNetwork):
def __init__(self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
features_extractor: nn.Module,
features_dim: int,
net_arch: Optional[List[int]] = None,
activation_fn: Type[nn.Module] = nn.ReLU,
normalize_images: bool = True
):
super().__init__(
observation_space=observation_space,
action_space=action_space,
features_extractor=features_extractor,
features_dim=features_dim,
net_arch=net_arch,
activation_fn=activation_fn,
normalize_images=normalize_images
)
action_dim = self.action_space.n
q_net = create_mlp_(self.features_dim, action_dim, self.net_arch, self.activation_fn)
self.q_net = nn.Sequential(*q_net).apply(self.init_weights)
# class TDQNetwork(QNetwork):
# def __init__(self,
# observation_space: gym.spaces.Space,
# action_space: gym.spaces.Space,
# features_extractor: nn.Module,
# features_dim: int,
# net_arch: Optional[List[int]] = None,
# activation_fn: Type[nn.Module] = nn.ReLU,
# normalize_images: bool = True
# ):
# super().__init__(
# observation_space=observation_space,
# action_space=action_space,
# features_extractor=features_extractor,
# features_dim=features_dim,
# net_arch=net_arch,
# activation_fn=activation_fn,
# normalize_images=normalize_images
# )
# action_dim = self.action_space.n
# q_net = create_mlp_(self.features_dim, action_dim, self.net_arch, self.activation_fn)
# self.q_net = nn.Sequential(*q_net).apply(self.init_weights)
def init_weights(self, m):
if type(m) == nn.Linear:
th.nn.init.kaiming_uniform_(m.weight)
# def init_weights(self, m):
# if type(m) == nn.Linear:
# th.nn.init.kaiming_uniform_(m.weight)
class TDQNPolicy(DQNPolicy):
# class TDQNPolicy(DQNPolicy):
def __init__(
self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
lr_schedule: Schedule,
net_arch: Optional[List[int]] = None,
activation_fn: Type[nn.Module] = nn.ReLU,
features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor,
features_extractor_kwargs: Optional[Dict[str, Any]] = None,
normalize_images: bool = True,
optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
optimizer_kwargs: Optional[Dict[str, Any]] = None,
):
super().__init__(
observation_space=observation_space,
action_space=action_space,
lr_schedule=lr_schedule,
net_arch=net_arch,
activation_fn=activation_fn,
features_extractor_class=features_extractor_class,
features_extractor_kwargs=features_extractor_kwargs,
normalize_images=normalize_images,
optimizer_class=optimizer_class,
optimizer_kwargs=optimizer_kwargs
)
# def __init__(
# self,
# observation_space: gym.spaces.Space,
# action_space: gym.spaces.Space,
# lr_schedule: Schedule,
# net_arch: Optional[List[int]] = None,
# activation_fn: Type[nn.Module] = nn.ReLU,
# features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor,
# features_extractor_kwargs: Optional[Dict[str, Any]] = None,
# normalize_images: bool = True,
# optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
# optimizer_kwargs: Optional[Dict[str, Any]] = None,
# ):
# super().__init__(
# observation_space=observation_space,
# action_space=action_space,
# lr_schedule=lr_schedule,
# net_arch=net_arch,
# activation_fn=activation_fn,
# features_extractor_class=features_extractor_class,
# features_extractor_kwargs=features_extractor_kwargs,
# normalize_images=normalize_images,
# optimizer_class=optimizer_class,
# optimizer_kwargs=optimizer_kwargs
# )
@staticmethod
def init_weights(module: nn.Module, gain: float = 1) -> None:
"""
Orthogonal initialization (used in PPO and A2C)
"""
if isinstance(module, (nn.Linear, nn.Conv2d)):
nn.init.kaiming_uniform_(module.weight)
if module.bias is not None:
module.bias.data.fill_(0.0)
# @staticmethod
# def init_weights(module: nn.Module, gain: float = 1) -> None:
# """
# Orthogonal initialization (used in PPO and A2C)
# """
# if isinstance(module, (nn.Linear, nn.Conv2d)):
# nn.init.kaiming_uniform_(module.weight)
# if module.bias is not None:
# module.bias.data.fill_(0.0)
def make_q_net(self) -> TDQNetwork:
# Make sure we always have separate networks for features extractors etc
net_args = self._update_features_extractor(self.net_args, features_extractor=None)
return TDQNetwork(**net_args).to(self.device)
# def make_q_net(self) -> TDQNetwork:
# # Make sure we always have separate networks for features extractors etc
# net_args = self._update_features_extractor(self.net_args, features_extractor=None)
# return TDQNetwork(**net_args).to(self.device)
class TMultiInputPolicy(TDQNPolicy):
def __init__(
self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
lr_schedule: Schedule,
net_arch: Optional[List[int]] = None,
activation_fn: Type[nn.Module] = nn.ReLU,
features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor,
features_extractor_kwargs: Optional[Dict[str, Any]] = None,
normalize_images: bool = True,
optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
optimizer_kwargs: Optional[Dict[str, Any]] = None,
):
super().__init__(
observation_space,
action_space,
lr_schedule,
net_arch,
activation_fn,
features_extractor_class,
features_extractor_kwargs,
normalize_images,
optimizer_class,
optimizer_kwargs,
)
# class TMultiInputPolicy(TDQNPolicy):
# def __init__(
# self,
# observation_space: gym.spaces.Space,
# action_space: gym.spaces.Space,
# lr_schedule: Schedule,
# net_arch: Optional[List[int]] = None,
# activation_fn: Type[nn.Module] = nn.ReLU,
# features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor,
# features_extractor_kwargs: Optional[Dict[str, Any]] = None,
# normalize_images: bool = True,
# optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam,
# optimizer_kwargs: Optional[Dict[str, Any]] = None,
# ):
# super().__init__(
# observation_space,
# action_space,
# lr_schedule,
# net_arch,
# activation_fn,
# features_extractor_class,
# features_extractor_kwargs,
# normalize_images,
# optimizer_class,
# optimizer_kwargs,
# )
class TDQN(DQN):
# class TDQN(DQN):
policy_aliases: Dict[str, Type[BasePolicy]] = {
"MlpPolicy": MlpPolicy,
"CnnPolicy": CnnPolicy,
"TMultiInputPolicy": TMultiInputPolicy,
}
# policy_aliases: Dict[str, Type[BasePolicy]] = {
# "MlpPolicy": MlpPolicy,
# "CnnPolicy": CnnPolicy,
# "TMultiInputPolicy": TMultiInputPolicy,
# }
def __init__(
self,
policy: Union[str, Type[TDQNPolicy]],
env: Union[GymEnv, str],
learning_rate: Union[float, Schedule] = 1e-4,
buffer_size: int = 1000000, # 1e6
learning_starts: int = 50000,
batch_size: int = 32,
tau: float = 1.0,
gamma: float = 0.99,
train_freq: Union[int, Tuple[int, str]] = 4,
gradient_steps: int = 1,
replay_buffer_class: Optional[ReplayBuffer] = None,
replay_buffer_kwargs: Optional[Dict[str, Any]] = None,
optimize_memory_usage: bool = False,
target_update_interval: int = 10000,
exploration_fraction: float = 0.1,
exploration_initial_eps: float = 1.0,
exploration_final_eps: float = 0.05,
max_grad_norm: float = 10,
tensorboard_log: Optional[str] = None,
create_eval_env: bool = False,
policy_kwargs: Optional[Dict[str, Any]] = None,
verbose: int = 1,
seed: Optional[int] = None,
device: Union[th.device, str] = "auto",
_init_setup_model: bool = True,
):
# def __init__(
# self,
# policy: Union[str, Type[TDQNPolicy]],
# env: Union[GymEnv, str],
# learning_rate: Union[float, Schedule] = 1e-4,
# buffer_size: int = 1000000, # 1e6
# learning_starts: int = 50000,
# batch_size: int = 32,
# tau: float = 1.0,
# gamma: float = 0.99,
# train_freq: Union[int, Tuple[int, str]] = 4,
# gradient_steps: int = 1,
# replay_buffer_class: Optional[ReplayBuffer] = None,
# replay_buffer_kwargs: Optional[Dict[str, Any]] = None,
# optimize_memory_usage: bool = False,
# target_update_interval: int = 10000,
# exploration_fraction: float = 0.1,
# exploration_initial_eps: float = 1.0,
# exploration_final_eps: float = 0.05,
# max_grad_norm: float = 10,
# tensorboard_log: Optional[str] = None,
# create_eval_env: bool = False,
# policy_kwargs: Optional[Dict[str, Any]] = None,
# verbose: int = 1,
# seed: Optional[int] = None,
# device: Union[th.device, str] = "auto",
# _init_setup_model: bool = True,
# ):
super().__init__(
policy=policy,
env=env,
learning_rate=learning_rate,
buffer_size=buffer_size,
learning_starts=learning_starts,
batch_size=batch_size,
tau=tau,
gamma=gamma,
train_freq=train_freq,
gradient_steps=gradient_steps,
replay_buffer_class=replay_buffer_class, # No action noise
replay_buffer_kwargs=replay_buffer_kwargs,
optimize_memory_usage=optimize_memory_usage,
target_update_interval=target_update_interval,
exploration_fraction=exploration_fraction,
exploration_initial_eps=exploration_initial_eps,
exploration_final_eps=exploration_final_eps,
max_grad_norm=max_grad_norm,
tensorboard_log=tensorboard_log,
create_eval_env=create_eval_env,
policy_kwargs=policy_kwargs,
verbose=verbose,
seed=seed,
device=device,
_init_setup_model=_init_setup_model
)
# super().__init__(
# policy=policy,
# env=env,
# learning_rate=learning_rate,
# buffer_size=buffer_size,
# learning_starts=learning_starts,
# batch_size=batch_size,
# tau=tau,
# gamma=gamma,
# train_freq=train_freq,
# gradient_steps=gradient_steps,
# replay_buffer_class=replay_buffer_class, # No action noise
# replay_buffer_kwargs=replay_buffer_kwargs,
# optimize_memory_usage=optimize_memory_usage,
# target_update_interval=target_update_interval,
# exploration_fraction=exploration_fraction,
# exploration_initial_eps=exploration_initial_eps,
# exploration_final_eps=exploration_final_eps,
# max_grad_norm=max_grad_norm,
# tensorboard_log=tensorboard_log,
# create_eval_env=create_eval_env,
# policy_kwargs=policy_kwargs,
# verbose=verbose,
# seed=seed,
# device=device,
# _init_setup_model=_init_setup_model
# )

View File

@@ -1,143 +0,0 @@
import logging
from functools import reduce
import pandas as pd
import talib.abstract as ta
from pandas import DataFrame
from freqtrade.strategy import DecimalParameter, IntParameter, IStrategy, merge_informative_pair
logger = logging.getLogger(__name__)
class ReinforcementLearningExample4ac(IStrategy):
"""
Test strategy - used for testing freqAI functionalities.
DO not use in production.
"""
minimal_roi = {"0": 0.1, "240": -1}
plot_config = {
"main_plot": {},
"subplots": {
"prediction": {"prediction": {"color": "blue"}},
"target_roi": {
"target_roi": {"color": "brown"},
},
"do_predict": {
"do_predict": {"color": "brown"},
},
},
}
process_only_new_candles = True
stoploss = -0.05
use_exit_signal = True
startup_candle_count: int = 300
can_short = True
linear_roi_offset = DecimalParameter(
0.00, 0.02, default=0.005, space="sell", optimize=False, load=True
)
max_roi_time_long = IntParameter(0, 800, default=400, space="sell", optimize=False, load=True)
def informative_pairs(self):
whitelist_pairs = self.dp.current_whitelist()
corr_pairs = self.config["freqai"]["feature_parameters"]["include_corr_pairlist"]
informative_pairs = []
for tf in self.config["freqai"]["feature_parameters"]["include_timeframes"]:
for pair in whitelist_pairs:
informative_pairs.append((pair, tf))
for pair in corr_pairs:
if pair in whitelist_pairs:
continue # avoid duplication
informative_pairs.append((pair, tf))
return informative_pairs
def populate_any_indicators(
self, pair, df, tf, informative=None, set_generalized_indicators=False
):
coin = pair.split('/')[0]
if informative is None:
informative = self.dp.get_pair_dataframe(pair, tf)
# first loop is automatically duplicating indicators for time periods
for t in self.freqai_info["feature_parameters"]["indicator_periods_candles"]:
t = int(t)
informative[f"%-{coin}rsi-period_{t}"] = ta.RSI(informative, timeperiod=t)
informative[f"%-{coin}mfi-period_{t}"] = ta.MFI(informative, timeperiod=t)
informative[f"%-{coin}adx-period_{t}"] = ta.ADX(informative, window=t)
informative[f"%-{coin}pct-change"] = informative["close"].pct_change()
informative[f"%-{coin}raw_volume"] = informative["volume"]
# The following features are necessary for RL models
informative[f"%-{coin}raw_close"] = informative["close"]
informative[f"%-{coin}raw_open"] = informative["open"]
informative[f"%-{coin}raw_high"] = informative["high"]
informative[f"%-{coin}raw_low"] = informative["low"]
indicators = [col for col in informative if col.startswith("%")]
# This loop duplicates and shifts all indicators to add a sense of recency to data
for n in range(self.freqai_info["feature_parameters"]["include_shifted_candles"] + 1):
if n == 0:
continue
informative_shift = informative[indicators].shift(n)
informative_shift = informative_shift.add_suffix("_shift-" + str(n))
informative = pd.concat((informative, informative_shift), axis=1)
df = merge_informative_pair(df, informative, self.config["timeframe"], tf, ffill=True)
skip_columns = [
(s + "_" + tf) for s in ["date", "open", "high", "low", "close", "volume"]
]
df = df.drop(columns=skip_columns)
# Add generalized indicators here (because in live, it will call this
# function to populate indicators during training). Notice how we ensure not to
# add them multiple times
if set_generalized_indicators:
df["%-day_of_week"] = (df["date"].dt.dayofweek + 1) / 7
df["%-hour_of_day"] = (df["date"].dt.hour + 1) / 25
# For RL, this is not a target, it is simply a filler until actions come out
# of the model.
# for Base4ActionEnv, 0 is netural (hold)
df["&-action"] = 0
return df
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe = self.freqai.start(dataframe, metadata, self)
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [df["do_predict"] == 1, df["&-action"] == 2]
if enter_long_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions), ["enter_long", "enter_tag"]
] = (1, "long")
enter_short_conditions = [df["do_predict"] == 1, df["&-action"] == 3]
if enter_short_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_short_conditions), ["enter_short", "enter_tag"]
] = (1, "short")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
exit_long_conditions = [df["do_predict"] == 1, df["&-action"] == 1]
if exit_long_conditions:
df.loc[reduce(lambda x, y: x & y, exit_long_conditions), "exit"] = 1
return df

View File

@@ -1,147 +0,0 @@
import logging
from functools import reduce
import pandas as pd
import talib.abstract as ta
from pandas import DataFrame
from freqtrade.strategy import DecimalParameter, IntParameter, IStrategy, merge_informative_pair
logger = logging.getLogger(__name__)
class ReinforcementLearningExample5ac(IStrategy):
"""
Test strategy - used for testing freqAI functionalities.
DO not use in production.
"""
minimal_roi = {"0": 0.1, "240": -1}
plot_config = {
"main_plot": {},
"subplots": {
"prediction": {"prediction": {"color": "blue"}},
"target_roi": {
"target_roi": {"color": "brown"},
},
"do_predict": {
"do_predict": {"color": "brown"},
},
},
}
process_only_new_candles = True
stoploss = -0.05
use_exit_signal = True
startup_candle_count: int = 300
can_short = True
linear_roi_offset = DecimalParameter(
0.00, 0.02, default=0.005, space="sell", optimize=False, load=True
)
max_roi_time_long = IntParameter(0, 800, default=400, space="sell", optimize=False, load=True)
def informative_pairs(self):
whitelist_pairs = self.dp.current_whitelist()
corr_pairs = self.config["freqai"]["feature_parameters"]["include_corr_pairlist"]
informative_pairs = []
for tf in self.config["freqai"]["feature_parameters"]["include_timeframes"]:
for pair in whitelist_pairs:
informative_pairs.append((pair, tf))
for pair in corr_pairs:
if pair in whitelist_pairs:
continue # avoid duplication
informative_pairs.append((pair, tf))
return informative_pairs
def populate_any_indicators(
self, pair, df, tf, informative=None, set_generalized_indicators=False
):
coin = pair.split('/')[0]
if informative is None:
informative = self.dp.get_pair_dataframe(pair, tf)
# first loop is automatically duplicating indicators for time periods
for t in self.freqai_info["feature_parameters"]["indicator_periods_candles"]:
t = int(t)
informative[f"%-{coin}rsi-period_{t}"] = ta.RSI(informative, timeperiod=t)
informative[f"%-{coin}mfi-period_{t}"] = ta.MFI(informative, timeperiod=t)
informative[f"%-{coin}adx-period_{t}"] = ta.ADX(informative, window=t)
informative[f"%-{coin}pct-change"] = informative["close"].pct_change()
informative[f"%-{coin}raw_volume"] = informative["volume"]
# FIXME: add these outside the user strategy?
# The following columns are necessary for RL models.
informative[f"%-{coin}raw_close"] = informative["close"]
informative[f"%-{coin}raw_open"] = informative["open"]
informative[f"%-{coin}raw_high"] = informative["high"]
informative[f"%-{coin}raw_low"] = informative["low"]
indicators = [col for col in informative if col.startswith("%")]
# This loop duplicates and shifts all indicators to add a sense of recency to data
for n in range(self.freqai_info["feature_parameters"]["include_shifted_candles"] + 1):
if n == 0:
continue
informative_shift = informative[indicators].shift(n)
informative_shift = informative_shift.add_suffix("_shift-" + str(n))
informative = pd.concat((informative, informative_shift), axis=1)
df = merge_informative_pair(df, informative, self.config["timeframe"], tf, ffill=True)
skip_columns = [
(s + "_" + tf) for s in ["date", "open", "high", "low", "close", "volume"]
]
df = df.drop(columns=skip_columns)
# Add generalized indicators here (because in live, it will call this
# function to populate indicators during training). Notice how we ensure not to
# add them multiple times
if set_generalized_indicators:
df["%-day_of_week"] = (df["date"].dt.dayofweek + 1) / 7
df["%-hour_of_day"] = (df["date"].dt.hour + 1) / 25
# For RL, there are no direct targets to set. This is filler (neutral)
# until the agent sends an action.
df["&-action"] = 0
return df
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dataframe = self.freqai.start(dataframe, metadata, self)
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [df["do_predict"] == 1, df["&-action"] == 1]
if enter_long_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions), ["enter_long", "enter_tag"]
] = (1, "long")
enter_short_conditions = [df["do_predict"] == 1, df["&-action"] == 3]
if enter_short_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_short_conditions), ["enter_short", "enter_tag"]
] = (1, "short")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
exit_long_conditions = [df["do_predict"] == 1, df["&-action"] == 2]
if exit_long_conditions:
df.loc[reduce(lambda x, y: x & y, exit_long_conditions), "exit_long"] = 1
exit_short_conditions = [df["do_predict"] == 1, df["&-action"] == 4]
if exit_short_conditions:
df.loc[reduce(lambda x, y: x & y, exit_short_conditions), "exit_short"] = 1
return df