restructure RL so that user can customize environment

This commit is contained in:
robcaulk 2022-08-15 10:26:44 +02:00
parent ecd1f55abc
commit 91683e1dca
13 changed files with 882 additions and 1904 deletions

View File

@ -0,0 +1,318 @@
import logging
from enum import Enum
# from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import gym
import numpy as np
from gym import spaces
from gym.utils import seeding
logger = logging.getLogger(__name__)
class Actions(Enum):
Short = 0
Long = 1
Neutral = 2
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
def mean_over_std(x):
std = np.std(x, ddof=1)
mean = np.mean(x)
return mean / std if std > 0 else 0
class BaseRLEnv(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee = 0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed: int = 1):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point is True:
self._position_history = (self._start_tick * [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action: int):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self.calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short.value:
self._position = Positions.Short
trade_type = "short"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type is not None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick,
'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick=self._current_tick,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick is None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price) / last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price) / last_trade_price
else:
return 0.
def is_tradesignal(self, action: int):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral)
or (action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def _is_trade(self, action: Actions):
return ((action == Actions.Long.value and self._position == Positions.Short) or
(action == Actions.Short.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short)
)
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
def calculate_reward(self, action):
if self._last_trade_tick is None:
return 0.
# close long
if (action == Actions.Short.value or
action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
# close short
if (action == Actions.Long.value or
action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
return 0.
def _update_profit(self, action):
if self._is_trade(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
self._total_profit = self._total_profit + self._total_profit * pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
if self._position == Positions.Short:
self._total_profit = self._total_profit + self._total_profit * pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
def most_recent_return(self, action: int):
"""
We support Long, Neutral and Short positions.
Return is generated from rising prices in Long
and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
# Long positions
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
if action == Actions.Short.value or action == Actions.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_buy_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
# Short positions
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
if action == Actions.Long.value or action == Actions.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_sell_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
return 0
def get_portfolio_log_returns(self):
return self.portfolio_log_returns[1:self._current_tick + 1]
def update_portfolio_log_returns(self, action):
self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action)
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def prev_price(self) -> float:
return self.prices.iloc[self._current_tick - 1].open
def sharpe_ratio(self):
if len(self.close_trade_profit) == 0:
return 0.
returns = np.array(self.close_trade_profit)
reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9)
return reward

View File

@ -0,0 +1,230 @@
import logging
from typing import Any, Dict, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
from pandas import DataFrame
from abc import abstractmethod
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv, Actions, Positions
from freqtrade.persistence import Trade
logger = logging.getLogger(__name__)
class BaseReinforcementLearningModel(IFreqaiModel):
"""
User created Reinforcement Learning Model prediction model.
"""
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datakitchen
for storing, saving, loading, and analyzing the data.
:param unfiltered_dataframe: Full dataframe for the current training period
:param metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info("--------------------Starting training " f"{pair} --------------------")
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_dataframe,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
data_dictionary: Dict[str, Any] = dk.make_train_test_datasets(
features_filtered, labels_filtered)
dk.fit_labels() # useless for now, but just satiating append methods
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
)
logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
model = self.fit(data_dictionary, pair)
if pair not in self.dd.historic_predictions:
self.set_initial_historic_predictions(
data_dictionary['train_features'], model, dk, pair)
self.dd.save_historic_predictions_to_disk()
logger.info(f"--------------------done training {pair}--------------------")
return model
@abstractmethod
def fit(self, data_dictionary: Dict[str, Any], pair: str = ''):
"""
Agent customizations and abstract Reinforcement Learning customizations
go in here. Abstract method, so this function must be overridden by
user class.
"""
return
def get_state_info(self, pair):
open_trades = Trade.get_trades(trade_filter=Trade.is_open.is_(True))
market_side = 0.5
current_profit = 0
for trade in open_trades:
if trade.pair == pair:
current_value = trade.open_trade_value
openrate = trade.open_rate
if 'long' in trade.enter_tag:
market_side = 1
else:
market_side = 0
current_profit = current_value / openrate - 1
total_profit = 0
closed_trades = Trade.get_trades(
trade_filter=[Trade.is_open.is_(False), Trade.pair == pair])
for trade in closed_trades:
total_profit += trade.close_profit
return market_side, current_profit, total_profit
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = False
) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:pred_df: dataframe containing the predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, dk.training_features_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk, filtered_dataframe)
pred_df = self.rl_model_predict(dk.data_dictionary["prediction_features"], dk, self.model)
pred_df.fillna(0, inplace=True)
return (pred_df, dk.do_predict)
def rl_model_predict(self, dataframe: DataFrame,
dk: FreqaiDataKitchen, model: Any) -> DataFrame:
output = pd.DataFrame(np.full((len(dataframe), 1), 2), columns=dk.label_list)
def _predict(window):
observations = dataframe.iloc[window.index]
res, _ = model.predict(observations, deterministic=True)
return res
output = output.rolling(window=self.CONV_WIDTH).apply(_predict)
return output
def set_initial_historic_predictions(
self, df: DataFrame, model: Any, dk: FreqaiDataKitchen, pair: str
) -> None:
pred_df = self.rl_model_predict(df, dk, model)
pred_df.fillna(0, inplace=True)
self.dd.historic_predictions[pair] = pred_df
hist_preds_df = self.dd.historic_predictions[pair]
for label in hist_preds_df.columns:
if hist_preds_df[label].dtype == object:
continue
hist_preds_df[f'{label}_mean'] = 0
hist_preds_df[f'{label}_std'] = 0
hist_preds_df['do_predict'] = 0
if self.freqai_info['feature_parameters'].get('DI_threshold', 0) > 0:
hist_preds_df['DI_values'] = 0
for return_str in dk.data['extra_returns_per_train']:
hist_preds_df[return_str] = 0
class MyRLEnv(BaseRLEnv):
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short.value:
self._position = Positions.Short
trade_type = "short"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type is not None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick,
'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick=self._current_tick,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info

View File

@ -6,11 +6,10 @@ import torch as th
from stable_baselines3 import DQN
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3.common.policies import BasePolicy
from stable_baselines3.common.torch_layers import (BaseFeaturesExtractor, CombinedExtractor,
from stable_baselines3.common.torch_layers import (BaseFeaturesExtractor,
FlattenExtractor)
from stable_baselines3.common.type_aliases import GymEnv, Schedule
#from stable_baselines3.common.policies import register_policy
from stable_baselines3.dqn.policies import (CnnPolicy, DQNPolicy, MlpPolicy, MultiInputPolicy,
from stable_baselines3.dqn.policies import (CnnPolicy, DQNPolicy, MlpPolicy,
QNetwork)
from torch import nn
@ -47,16 +46,17 @@ def create_mlp_(
]
return modules
class TDQNetwork(QNetwork):
def __init__(self,
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
features_extractor: nn.Module,
features_dim: int,
net_arch: Optional[List[int]] = None,
activation_fn: Type[nn.Module] = nn.ReLU,
normalize_images: bool = True
):
observation_space: gym.spaces.Space,
action_space: gym.spaces.Space,
features_extractor: nn.Module,
features_dim: int,
net_arch: Optional[List[int]] = None,
activation_fn: Type[nn.Module] = nn.ReLU,
normalize_images: bool = True
):
super().__init__(
observation_space=observation_space,
action_space=action_space,
@ -211,10 +211,3 @@ class TDQN(DQN):
device=device,
_init_setup_model=_init_setup_model
)
# try:
# register_policy("TMultiInputPolicy", TMultiInputPolicy)
# except:
# print("already registered")

View File

@ -1,139 +0,0 @@
# common library
import gym
import numpy as np
from stable_baselines3 import A2C, DDPG, PPO, SAC, TD3
from stable_baselines3.common.callbacks import (BaseCallback, CallbackList, CheckpointCallback,
EvalCallback, StopTrainingOnRewardThreshold)
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
from freqtrade.freqai.prediction_models.RL import config
#from freqtrade.freqai.prediction_models.RL.RLPrediction_agent_v2 import TDQN
from freqtrade.freqai.prediction_models.RL.RLPrediction_env import DEnv
# from stable_baselines3.common.vec_env import DummyVecEnv
# from meta.env_stock_trading.env_stock_trading import StockTradingEnv
# RL models from stable-baselines
MODELS = {"a2c": A2C, "ddpg": DDPG, "td3": TD3, "sac": SAC, "ppo": PPO}
MODEL_KWARGS = {x: config.__dict__[f"{x.upper()}_PARAMS"] for x in MODELS.keys()}
NOISE = {
"normal": NormalActionNoise,
"ornstein_uhlenbeck": OrnsteinUhlenbeckActionNoise,
}
class TensorboardCallback(BaseCallback):
"""
Custom callback for plotting additional values in tensorboard.
"""
def __init__(self, verbose=0):
super(TensorboardCallback, self).__init__(verbose)
def _on_step(self) -> bool:
try:
self.logger.record(key="train/reward", value=self.locals["rewards"][0])
except BaseException:
self.logger.record(key="train/reward", value=self.locals["reward"][0])
return True
class RLPrediction_agent:
"""Provides implementations for DRL algorithms
Based on:
https://github.com/AI4Finance-Foundation/FinRL-Meta/blob/master/agents/stablebaselines3_models.py
Attributes
----------
env: gym environment class
user-defined class
Methods
-------
get_model()
setup DRL algorithms
train_model()
train DRL algorithms in a train dataset
and output the trained model
DRL_prediction()
make a prediction in a test dataset and get results
"""
def __init__(self, env):
self.env = env
def get_model(
self,
model_name,
policy="MlpPolicy",
policy_kwargs=None,
model_kwargs=None,
reward_kwargs=None,
#total_timesteps=None,
verbose=1,
seed=None
):
if model_name not in MODELS:
raise NotImplementedError("NotImplementedError")
if model_kwargs is None:
model_kwargs = MODEL_KWARGS[model_name]
if "action_noise" in model_kwargs:
n_actions = self.env.action_space.shape[-1]
model_kwargs["action_noise"] = NOISE[model_kwargs["action_noise"]](
mean=np.zeros(n_actions), sigma=0.1 * np.ones(n_actions)
)
print(model_kwargs)
model = MODELS[model_name](
policy=policy,
env=self.env,
tensorboard_log=f"{config.TENSORBOARD_LOG_DIR}/{model_name}",
verbose=verbose,
policy_kwargs=policy_kwargs,
#model_kwargs=model_kwargs,
#total_timesteps=model_kwargs["total_timesteps"],
seed=seed
#**model_kwargs,
)
return model
def train_model(self, model, tb_log_name, model_kwargs, train_df, test_df, price, price_test, window_size):
agent_params = self.freqai_info['model_training_parameters']
reward_params = self.freqai_info['model_reward_parameters']
train_env = DEnv(df=train_df, prices=price, window_size=window_size, reward_kwargs=reward_params)
eval_env = DEnv(df=test_df, prices=price_test, window_size=window_size, reward_kwargs=reward_params)
# checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/',
# name_prefix='rl_model')
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/')
eval_callback = EvalCallback(eval_env, best_model_save_path='./logs/best_model', log_path='./logs/results', eval_freq=500)
#callback_on_best = StopTrainingOnRewardThreshold(reward_threshold=-200, verbose=1)
# Create the callback list
callback = CallbackList([checkpoint_callback, eval_callback])
model = model.learn(
total_timesteps=model_kwargs["total_timesteps"],
tb_log_name=tb_log_name,
callback=callback,
#callback=TensorboardCallback(),
)
return model

View File

@ -1,513 +0,0 @@
import logging
import random
from collections import deque
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import gym
import matplotlib.pylab as plt
import numpy as np
import pandas as pd
from gym import spaces
from gym.utils import seeding
logger = logging.getLogger(__name__)
class Actions(Enum):
Short = 0
Long = 1
Neutral = 2
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
def mean_over_std(x):
std = np.std(x, ddof=1)
mean = np.mean(x)
return mean / std if std > 0 else 0
class DEnv(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee=0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
# self.A_t, self.B_t = 0.000639, 0.00001954
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point == True:
self._position_history = (self._start_tick* [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
#self._last_trade_tick = self._current_tick - 1
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
temp_position = self._position
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short.value:
self._position = Positions.Short
trade_type = "short"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type != None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick, 'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick = self._current_tick,
total_reward = self.total_reward,
total_profit = self._total_profit,
position = self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
# def processState(self, state):
# return state.to_numpy()
# def convert_mlp_Policy(self, obs_):
# pass
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick == None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price)/last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price)/last_trade_price
else:
return 0.
def is_tradesignal(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral)
or (action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def _is_trade(self, action: Actions):
return ((action == Actions.Long.value and self._position == Positions.Short) or
(action == Actions.Short.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short)
)
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)
# def render(self, mode='human'):
# def _plot_position(position, tick):
# color = None
# if position == Positions.Short:
# color = 'red'
# elif position == Positions.Long:
# color = 'green'
# if color:
# plt.scatter(tick, self.prices.loc[tick].open, color=color)
# if self._first_rendering:
# self._first_rendering = False
# plt.cla()
# plt.plot(self.prices)
# start_position = self._position_history[self._start_tick]
# _plot_position(start_position, self._start_tick)
# plt.cla()
# plt.plot(self.prices)
# _plot_position(self._position, self._current_tick)
# plt.suptitle("Total Reward: %.6f" % self.total_reward + ' ~ ' + "Total Profit: %.6f" % self._total_profit)
# plt.pause(0.01)
# def render_all(self):
# plt.figure()
# window_ticks = np.arange(len(self._position_history))
# plt.plot(self.prices['open'], alpha=0.5)
# short_ticks = []
# long_ticks = []
# neutral_ticks = []
# for i, tick in enumerate(window_ticks):
# if self._position_history[i] == Positions.Short:
# short_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Long:
# long_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Neutral:
# neutral_ticks.append(tick - 1)
# plt.plot(neutral_ticks, self.prices.loc[neutral_ticks].open,
# 'o', color='grey', ms=3, alpha=0.1)
# plt.plot(short_ticks, self.prices.loc[short_ticks].open,
# 'o', color='r', ms=3, alpha=0.8)
# plt.plot(long_ticks, self.prices.loc[long_ticks].open,
# 'o', color='g', ms=3, alpha=0.8)
# plt.suptitle("Generalising")
# fig = plt.gcf()
# fig.set_size_inches(15, 10)
# def close_trade_report(self):
# small_trade = 0
# positive_big_trade = 0
# negative_big_trade = 0
# small_profit = 0.003
# for i in self.close_trade_profit:
# if i < small_profit and i > -small_profit:
# small_trade+=1
# elif i > small_profit:
# positive_big_trade += 1
# elif i < -small_profit:
# negative_big_trade += 1
# print(f"small trade={small_trade/len(self.close_trade_profit)}; positive_big_trade={positive_big_trade/len(self.close_trade_profit)}; negative_big_trade={negative_big_trade/len(self.close_trade_profit)}")
# def report(self):
# # get total trade
# long_trade = 0
# short_trade = 0
# neutral_trade = 0
# for trade in self.trade_history:
# if trade['type'] == 'long':
# long_trade += 1
# elif trade['type'] == 'short':
# short_trade += 1
# else:
# neutral_trade += 1
# negative_trade = 0
# positive_trade = 0
# for tr in self.close_trade_profit:
# if tr < 0.:
# negative_trade += 1
# if tr > 0.:
# positive_trade += 1
# total_trade_lr = negative_trade+positive_trade
# total_trade = long_trade + short_trade
# sharp_ratio = self.sharpe_ratio()
# sharp_log = self.get_sharpe_ratio()
# from tabulate import tabulate
# headers = ["Performance", ""]
# performanceTable = [["Total Trade", "{0:.2f}".format(total_trade)],
# ["Total reward", "{0:.3f}".format(self.total_reward)],
# ["Start profit(unit)", "{0:.2f}".format(1.)],
# ["End profit(unit)", "{0:.3f}".format(self._total_profit)],
# ["Sharp ratio", "{0:.3f}".format(sharp_ratio)],
# ["Sharp log", "{0:.3f}".format(sharp_log)],
# # ["Sortino ratio", "{0:.2f}".format(0) + '%'],
# ["winrate", "{0:.2f}".format(positive_trade*100/total_trade_lr) + '%']
# ]
# tabulation = tabulate(performanceTable, headers, tablefmt="fancy_grid", stralign="center")
# print(tabulation)
# result = {
# "Start": "{0:.2f}".format(1.),
# "End": "{0:.2f}".format(self._total_profit),
# "Sharp": "{0:.3f}".format(sharp_ratio),
# "Winrate": "{0:.2f}".format(positive_trade*100/total_trade_lr)
# }
# return result
# def close(self):
# plt.close()
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
# def save_rendering(self, filepath):
# plt.savefig(filepath)
# def pause_rendering(self):
# plt.show()
def _calculate_reward(self, action):
# rw = self.transaction_profit_reward(action)
#rw = self.reward_rr_profit_config(action)
rw = self.profit_only_when_close_reward(action)
#rw = self.profit_only_when_close_reward_aim(action)
return rw
def _update_profit(self, action):
if self._is_trade(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
if self._position == Positions.Short:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
def most_recent_return(self, action):
"""
We support Long, Neutral and Short positions.
Return is generated from rising prices in Long
and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
# Long positions
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
if action == Actions.Short.value or action == Actions.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_buy_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
# Short positions
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
if action == Actions.Long.value or action == Actions.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_sell_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
return 0
def get_portfolio_log_returns(self):
return self.portfolio_log_returns[1:self._current_tick + 1]
# def get_trading_log_return(self):
# return self.portfolio_log_returns[self._start_tick:]
def update_portfolio_log_returns(self, action):
self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action)
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def prev_price(self) -> float:
return self.prices.iloc[self._current_tick-1].open
def sharpe_ratio(self):
if len(self.close_trade_profit) == 0:
return 0.
returns = np.array(self.close_trade_profit)
reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9)
return reward
# def get_bnh_log_return(self):
# return np.diff(np.log(self.prices['open'][self._start_tick:]))
def transaction_profit_reward(self, action):
rw = 0.
pt = self.prev_price()
pt_1 = self.current_price()
if self._position == Positions.Long:
a_t = 1
elif self._position == Positions.Short:
a_t = -1
else:
a_t = 0
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
# close short
elif (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
pt_1 = self.add_buy_fee(self.current_price())
po = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
else:
rw = a_t*(pt_1 - pt)/pt
return np.clip(rw, 0, 1)
def profit_only_when_close_reward_aim(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(current_price) - np.log(last_trade_price)) * 2)
# close short
if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(last_trade_price) - np.log(current_price)) * 2)
return 0.
def profit_only_when_close_reward(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
# close short
if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
return 0.

View File

@ -1,671 +0,0 @@
import logging
import random
from collections import deque
from enum import Enum
#from sklearn.decomposition import PCA, KernelPCA
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import gym
import matplotlib.pylab as plt
import numpy as np
import pandas as pd
from gym import spaces
from gym.utils import seeding
logger = logging.getLogger(__name__)
# from bokeh.io import output_notebook
# from bokeh.plotting import figure, show
# from bokeh.models import (
# CustomJS,
# ColumnDataSource,
# NumeralTickFormatter,
# Span,
# HoverTool,
# Range1d,
# DatetimeTickFormatter,
# Scatter,
# Label, LabelSet
# )
class Actions(Enum):
Neutral = 0
Long_buy = 1
Long_sell = 2
Short_buy = 3
Short_sell = 4
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
def mean_over_std(x):
std = np.std(x, ddof=1)
mean = np.mean(x)
return mean / std if std > 0 else 0
class DEnv(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee=0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
# self.A_t, self.B_t = 0.000639, 0.00001954
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point == True:
self._position_history = (self._start_tick* [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
#self._last_trade_tick = self._current_tick - 1
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
temp_position = self._position
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long_buy.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short_buy.value:
self._position = Positions.Short
trade_type = "short"
elif action == Actions.Long_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Short_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type != None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick, 'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick = self._current_tick,
total_reward = self.total_reward,
total_profit = self._total_profit,
position = self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
# def processState(self, state):
# return state.to_numpy()
# def convert_mlp_Policy(self, obs_):
# pass
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick == None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price)/last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price)/last_trade_price
else:
return 0.
def is_tradesignal(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral) or
(action == Actions.Short_buy.value and self._position == Positions.Short) or
(action == Actions.Short_sell.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Short_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Long) or
(action == Actions.Long_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Long_sell.value and self._position == Positions.Short))
def _is_trade(self, action: Actions):
return ((action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short) or
(action == Actions.Neutral.Short_sell and self._position == Positions.Long) or
(action == Actions.Neutral.Long_sell and self._position == Positions.Short)
)
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)
# def render(self, mode='human'):
# def _plot_position(position, tick):
# color = None
# if position == Positions.Short:
# color = 'red'
# elif position == Positions.Long:
# color = 'green'
# if color:
# plt.scatter(tick, self.prices.loc[tick].open, color=color)
# if self._first_rendering:
# self._first_rendering = False
# plt.cla()
# plt.plot(self.prices)
# start_position = self._position_history[self._start_tick]
# _plot_position(start_position, self._start_tick)
# plt.cla()
# plt.plot(self.prices)
# _plot_position(self._position, self._current_tick)
# plt.suptitle("Total Reward: %.6f" % self.total_reward + ' ~ ' + "Total Profit: %.6f" % self._total_profit)
# plt.pause(0.01)
# def render_all(self):
# plt.figure()
# window_ticks = np.arange(len(self._position_history))
# plt.plot(self.prices['open'], alpha=0.5)
# short_ticks = []
# long_ticks = []
# neutral_ticks = []
# for i, tick in enumerate(window_ticks):
# if self._position_history[i] == Positions.Short:
# short_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Long:
# long_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Neutral:
# neutral_ticks.append(tick - 1)
# plt.plot(neutral_ticks, self.prices.loc[neutral_ticks].open,
# 'o', color='grey', ms=3, alpha=0.1)
# plt.plot(short_ticks, self.prices.loc[short_ticks].open,
# 'o', color='r', ms=3, alpha=0.8)
# plt.plot(long_ticks, self.prices.loc[long_ticks].open,
# 'o', color='g', ms=3, alpha=0.8)
# plt.suptitle("Generalising")
# fig = plt.gcf()
# fig.set_size_inches(15, 10)
# def close_trade_report(self):
# small_trade = 0
# positive_big_trade = 0
# negative_big_trade = 0
# small_profit = 0.003
# for i in self.close_trade_profit:
# if i < small_profit and i > -small_profit:
# small_trade+=1
# elif i > small_profit:
# positive_big_trade += 1
# elif i < -small_profit:
# negative_big_trade += 1
# print(f"small trade={small_trade/len(self.close_trade_profit)}; positive_big_trade={positive_big_trade/len(self.close_trade_profit)}; negative_big_trade={negative_big_trade/len(self.close_trade_profit)}")
# def report(self):
# # get total trade
# long_trade = 0
# short_trade = 0
# neutral_trade = 0
# for trade in self.trade_history:
# if trade['type'] == 'long':
# long_trade += 1
# elif trade['type'] == 'short':
# short_trade += 1
# else:
# neutral_trade += 1
# negative_trade = 0
# positive_trade = 0
# for tr in self.close_trade_profit:
# if tr < 0.:
# negative_trade += 1
# if tr > 0.:
# positive_trade += 1
# total_trade_lr = negative_trade+positive_trade
# total_trade = long_trade + short_trade
# sharp_ratio = self.sharpe_ratio()
# sharp_log = self.get_sharpe_ratio()
# from tabulate import tabulate
# headers = ["Performance", ""]
# performanceTable = [["Total Trade", "{0:.2f}".format(total_trade)],
# ["Total reward", "{0:.3f}".format(self.total_reward)],
# ["Start profit(unit)", "{0:.2f}".format(1.)],
# ["End profit(unit)", "{0:.3f}".format(self._total_profit)],
# ["Sharp ratio", "{0:.3f}".format(sharp_ratio)],
# ["Sharp log", "{0:.3f}".format(sharp_log)],
# # ["Sortino ratio", "{0:.2f}".format(0) + '%'],
# ["winrate", "{0:.2f}".format(positive_trade*100/total_trade_lr) + '%']
# ]
# tabulation = tabulate(performanceTable, headers, tablefmt="fancy_grid", stralign="center")
# print(tabulation)
# result = {
# "Start": "{0:.2f}".format(1.),
# "End": "{0:.2f}".format(self._total_profit),
# "Sharp": "{0:.3f}".format(sharp_ratio),
# "Winrate": "{0:.2f}".format(positive_trade*100/total_trade_lr)
# }
# return result
# def close(self):
# plt.close()
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
# def save_rendering(self, filepath):
# plt.savefig(filepath)
# def pause_rendering(self):
# plt.show()
def _calculate_reward(self, action):
# rw = self.transaction_profit_reward(action)
#rw = self.reward_rr_profit_config(action)
#rw = self.reward_rr_profit_config(action) # main
#rw = self.profit_only_when_close_reward(action)
rw = self.profit_only_when_close_reward_aim(action)
return rw
def _update_profit(self, action):
#if self._is_trade(action) or self._done:
if self._is_trade(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
if self._position == Positions.Short:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
def most_recent_return(self, action):
"""
We support Long, Neutral and Short positions.
Return is generated from rising prices in Long
and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
# Long positions
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
#if action == Actions.Short.value or action == Actions.Neutral.value:
if action == Actions.Short_buy.value or action == Actions.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_buy_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
# Short positions
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
#if action == Actions.Long.value or action == Actions.Neutral.value:
if action == Actions.Long_buy.value or action == Actions.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_sell_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
return 0
def get_portfolio_log_returns(self):
return self.portfolio_log_returns[1:self._current_tick + 1]
def get_trading_log_return(self):
return self.portfolio_log_returns[self._start_tick:]
def update_portfolio_log_returns(self, action):
self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action)
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def prev_price(self) -> float:
return self.prices.iloc[self._current_tick-1].open
def sharpe_ratio(self):
if len(self.close_trade_profit) == 0:
return 0.
returns = np.array(self.close_trade_profit)
reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9)
return reward
def get_bnh_log_return(self):
return np.diff(np.log(self.prices['open'][self._start_tick:]))
def transaction_profit_reward(self, action):
rw = 0.
pt = self.prev_price()
pt_1 = self.current_price()
if self._position == Positions.Long:
a_t = 1
elif self._position == Positions.Short:
a_t = -1
else:
a_t = 0
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
# close short
elif (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
pt_1 = self.add_buy_fee(self.current_price())
po = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
else:
rw = a_t*(pt_1 - pt)/pt
return np.clip(rw, 0, 1)
def profit_only_when_close_reward(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if action == Actions.Long_sell.value and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
# close short
if action == Actions.Short_buy.value and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
return 0.
def profit_only_when_close_reward_aim(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if action == Actions.Long_sell.value and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
if action == Actions.Long_sell.value and self._position == Positions.Long:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(current_price) - np.log(last_trade_price)) * 2)
# close short
if action == Actions.Short_buy.value and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
if action == Actions.Short_buy.value and self._position == Positions.Short:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(last_trade_price) - np.log(current_price)) * 2)
return 0.
def reward_rr_profit_config(self, action):
rw = 0.
pt_1 = self.current_price()
if len(self.close_trade_profit) > 0:
# long
if self._position == Positions.Long:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
if action == Actions.Short_buy.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 15
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = -10
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -15
if action == Actions.Long_sell.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 20
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = -15
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -25
if action == Actions.Neutral.value:
if self.close_trade_profit[-1] > 0.005:
rw = 0
elif self.close_trade_profit[-1] < 0:
rw = 0
# short
if self._position == Positions.Short:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
if action == Actions.Long_buy.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 15
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = -10
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw =- -25
if action == Actions.Short_sell.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 20
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = -15
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -25
if action == Actions.Neutral.value:
if self.close_trade_profit[-1] > 0.005:
rw = 0
elif self.close_trade_profit[-1] < 0:
rw = 0
return np.clip(rw, 0, 1)

View File

@ -1,37 +0,0 @@
# dir
DATA_SAVE_DIR = "datasets"
TRAINED_MODEL_DIR = "trained_models"
TENSORBOARD_LOG_DIR = "tensorboard_log"
RESULTS_DIR = "results"
# Model Parameters
A2C_PARAMS = {"n_steps": 5, "ent_coef": 0.01, "learning_rate": 0.0007}
PPO_PARAMS = {
"n_steps": 2048,
"ent_coef": 0.01,
"learning_rate": 0.00025,
"batch_size": 64,
}
DDPG_PARAMS = {"batch_size": 128, "buffer_size": 50000, "learning_rate": 0.001}
TD3_PARAMS = {
"batch_size": 100,
"buffer_size": 1000000,
"learning_rate": 0.001,
}
SAC_PARAMS = {
"batch_size": 64,
"buffer_size": 100000,
"learning_rate": 0.0001,
"learning_starts": 100,
"ent_coef": "auto_0.1",
}
ERL_PARAMS = {
"learning_rate": 3e-5,
"batch_size": 2048,
"gamma": 0.985,
"seed": 312,
"net_dimension": 512,
"target_step": 5000,
"eval_gap": 30,
}
RLlib_PARAMS = {"lr": 5e-5, "train_batch_size": 500, "gamma": 0.99}

View File

@ -1,253 +0,0 @@
import logging
from typing import Any, Dict, Tuple
#from matplotlib.colors import DivergingNorm
from pandas import DataFrame
import pandas as pd
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
import tensorflow as tf
from freqtrade.freqai.prediction_models.BaseTensorFlowModel import BaseTensorFlowModel
from freqtrade.freqai.freqai_interface import IFreqaiModel
from tensorflow.keras.layers import Input, Conv1D, Dense, MaxPooling1D, Flatten, Dropout
from tensorflow.keras.models import Model
import numpy as np
import copy
from keras.layers import *
import random
logger = logging.getLogger(__name__)
# tf.config.run_functions_eagerly(True)
# tf.data.experimental.enable_debug_mode()
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
MAX_EPOCHS = 10
LOOKBACK = 8
class RLPredictionModel_v2(IFreqaiModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), fit().
"""
def fit(self, data_dictionary: Dict, pair) -> Any:
"""
User sets up the training and test data to fit their desired model here
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
train_df = data_dictionary["train_features"]
train_labels = data_dictionary["train_labels"]
test_df = data_dictionary["test_features"]
test_labels = data_dictionary["test_labels"]
n_labels = len(train_labels.columns)
if n_labels > 1:
raise OperationalException(
"Neural Net not yet configured for multi-targets. Please "
" reduce number of targets to 1 in strategy."
)
n_features = len(data_dictionary["train_features"].columns)
BATCH_SIZE = self.freqai_info.get("batch_size", 64)
input_dims = [BATCH_SIZE, self.CONV_WIDTH, n_features]
w1 = WindowGenerator(
input_width=self.CONV_WIDTH,
label_width=1,
shift=1,
train_df=train_df,
val_df=test_df,
train_labels=train_labels,
val_labels=test_labels,
batch_size=BATCH_SIZE,
)
# train_agent()
#pair = self.dd.historical_data[pair]
#gym_env = FreqtradeEnv(data=train_df, prices=0.01, windows_size=100, pair=pair, stake_amount=100)
# sep = '/'
# coin = pair.split(sep, 1)[0]
# # df1 = train_df.filter(regex='price')
# # df2 = df1.filter(regex='raw')
# # df3 = df2.filter(regex=f"{coin}")
# # print(df3)
# price = train_df[f"%-{coin}raw_price_5m"]
# gym_env = RLPrediction_GymAnytrading(signal_features=train_df, prices=price, window_size=100)
# sac = RLPrediction_Agent(gym_env)
# print(sac)
# return 0
return model
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first=True
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:predictions: np.array of predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, dk.training_features_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk, filtered_dataframe)
if first:
full_df = dk.data_dictionary["prediction_features"]
w1 = WindowGenerator(
input_width=self.CONV_WIDTH,
label_width=1,
shift=1,
test_df=full_df,
batch_size=len(full_df),
)
predictions = self.model.predict(w1.inference)
len_diff = len(dk.do_predict) - len(predictions)
if len_diff > 0:
dk.do_predict = dk.do_predict[len_diff:]
else:
data = dk.data_dictionary["prediction_features"]
data = tf.expand_dims(data, axis=0)
predictions = self.model(data, training=False)
predictions = predictions[:, 0]
pred_df = DataFrame(predictions, columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df)
return (pred_df, np.ones(len(pred_df)))
def set_initial_historic_predictions(
self, df: DataFrame, model: Any, dk: FreqaiDataKitchen, pair: str
) -> None:
pass
# w1 = WindowGenerator(
# input_width=self.CONV_WIDTH, label_width=1, shift=1, test_df=df, batch_size=len(df)
# )
# trained_predictions = model.predict(w1.inference)
# #trained_predictions = trained_predictions[:, 0, 0]
# trained_predictions = trained_predictions[:, 0]
# n_lost_points = len(df) - len(trained_predictions)
# pred_df = DataFrame(trained_predictions, columns=dk.label_list)
# zeros_df = DataFrame(np.zeros((n_lost_points, len(dk.label_list))), columns=dk.label_list)
# pred_df = pd.concat([zeros_df, pred_df], axis=0)
# pred_df = dk.denormalize_labels_from_metadata(pred_df)
# self.dd.historic_predictions[pair] = DataFrame()
# self.dd.historic_predictions[pair] = copy.deepcopy(pred_df)
class WindowGenerator:
def __init__(
self,
input_width,
label_width,
shift,
train_df=None,
val_df=None,
test_df=None,
train_labels=None,
val_labels=None,
test_labels=None,
batch_size=None,
):
# Store the raw data.
self.train_df = train_df
self.val_df = val_df
self.test_df = test_df
self.train_labels = train_labels
self.val_labels = val_labels
self.test_labels = test_labels
self.batch_size = batch_size
self.input_width = input_width
self.label_width = label_width
self.shift = shift
self.total_window_size = input_width + shift
self.input_slice = slice(0, input_width)
self.input_indices = np.arange(self.total_window_size)[self.input_slice]
def make_dataset(self, data, labels=None):
data = np.array(data, dtype=np.float32)
if labels is not None:
labels = np.array(labels, dtype=np.float32)
ds = tf.keras.preprocessing.timeseries_dataset_from_array(
data=data,
targets=labels,
sequence_length=self.total_window_size,
sequence_stride=1,
sampling_rate=1,
shuffle=False,
batch_size=self.batch_size,
)
return ds
@property
def train(self):
return self.make_dataset(self.train_df, self.train_labels)
@property
def val(self):
return self.make_dataset(self.val_df, self.val_labels)
@property
def test(self):
return self.make_dataset(self.test_df, self.test_labels)
@property
def inference(self):
return self.make_dataset(self.test_df)
@property
def example(self):
"""Get and cache an example batch of `inputs, labels` for plotting."""
result = getattr(self, "_example", None)
if result is None:
# No example batch was found, so get one from the `.train` dataset
result = next(iter(self.train))
# And cache it for next time
self._example = result
return result

View File

@ -1,273 +0,0 @@
import logging
from typing import Any, Dict, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
import torch as th
from pandas import DataFrame
from stable_baselines3 import PPO
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.prediction_models.RL.RLPrediction_agent_TDQN import TDQN
from freqtrade.freqai.prediction_models.RL.RLPrediction_env_TDQN_5ac import DEnv
#from freqtrade.freqai.prediction_models.RL.RLPrediction_env_TDQN_3ac import DEnv
from freqtrade.persistence import Trade
logger = logging.getLogger(__name__)
class ReinforcementLearning(IFreqaiModel):
"""
User created Reinforcement Learning Model prediction model.
"""
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datakitchen
for storing, saving, loading, and analyzing the data.
:param unfiltered_dataframe: Full dataframe for the current training period
:param metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info("--------------------Starting training " f"{pair} --------------------")
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_dataframe,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
data_dictionary: Dict[str, Any] = dk.make_train_test_datasets(
features_filtered, labels_filtered)
dk.fit_labels() # useless for now, but just satiating append methods
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
)
logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
model = self.fit(data_dictionary, pair)
if pair not in self.dd.historic_predictions:
self.set_initial_historic_predictions(
data_dictionary['train_features'], model, dk, pair)
self.dd.save_historic_predictions_to_disk()
logger.info(f"--------------------done training {pair}--------------------")
return model
def fit(self, data_dictionary: Dict[str, Any], pair: str = ''):
# train_df = data_dictionary["train_features"]
# # train_labels = data_dictionary["train_labels"]
# test_df = data_dictionary["test_features"]
# # test_labels = data_dictionary["test_labels"]
# # sep = '/'
# # coin = pair.split(sep, 1)[0]
# # price = train_df[f"%-{coin}raw_price_{self.config['timeframe']}"]
# # price.reset_index(inplace=True, drop=True)
# # price = price.to_frame()
# price = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(train_df.index))
# price_test = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(test_df.index))
# #train_env = GymAnytrading(train_df, price, self.CONV_WIDTH)
# agent_params = self.freqai_info['model_training_parameters']
# reward_params = self.freqai_info['model_reward_parameters']
# train_env = DEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
# #eval_env = DEnv(df=test_df, prices=price_test, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
# #env_instance = SubprocVecEnv([DEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)])
# #train_env.reset()
# #eval_env.reset()
# # model
# #policy_kwargs = dict(net_arch=[512, 512, 512])
# policy_kwargs = dict(activation_fn=th.nn.Tanh,
# net_arch=[256, 256, 256])
# agent = RLPrediction_agent(train_env)
# #eval_agent = RLPrediction_agent(eval_env)
# # PPO
# model_name = 'ppo'
# model = agent.get_model(model_name, model_kwargs=agent_params, policy_kwargs=policy_kwargs)
# trained_model = agent.train_model(model=model,
# tb_log_name=model_name,
# model_kwargs=agent_params,
# train_df=train_df,
# test_df=test_df,
# price=price,
# price_test=price_test,
# window_size=self.CONV_WIDTH)
# # best_model = eval_agent.train_model(model=model,
# # tb_log_name=model_name,
# # model_kwargs=agent_params,
# # eval=eval_env)
# # TDQN
# # model_name = 'TDQN'
# # model = TDQN('TMultiInputPolicy', train_env, policy_kwargs=policy_kwargs, tensorboard_log='./tensorboard_log/',
# # learning_rate=agent_params["learning_rate"], gamma=0.9,
# # target_update_interval=5000, buffer_size=50000,
# # exploration_initial_eps=1, exploration_final_eps=0.1,
# # replay_buffer_class=ReplayBuffer
# # )
# # trained_model = agent.train_model(model=model,
# # tb_log_name=model_name,
# # model_kwargs=agent_params)
# #model.learn(
# # total_timesteps=5000,
# # callback=callback
# # )
agent_params = self.freqai_info['model_training_parameters']
reward_params = self.freqai_info['model_reward_parameters']
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
eval_freq = agent_params["eval_cycles"] * len(test_df)
total_timesteps = agent_params["train_cycles"] * len(train_df)
# price data for model training and evaluation
price = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(train_df.index))
price_test = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(test_df.index))
# environments
train_env = DEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
eval = DEnv(df=test_df, prices=price_test, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
eval_env = Monitor(eval, ".")
eval_env.reset()
# this should be in config - TODO
agent_type = 'tdqn'
path = self.dk.data_path
eval_callback = EvalCallback(eval_env, best_model_save_path=f"{path}/",
log_path=f"{path}/{agent_type}/logs/", eval_freq=int(eval_freq),
deterministic=True, render=False)
# model arch
policy_kwargs = dict(activation_fn=th.nn.ReLU,
net_arch=[256, 256, 128])
if agent_type == 'tdqn':
model = TDQN('TMultiInputPolicy', train_env, policy_kwargs=policy_kwargs, tensorboard_log=f"{path}/{agent_type}/tensorboard/",
learning_rate=0.00025, gamma=0.9,
target_update_interval=5000, buffer_size=50000,
exploration_initial_eps=1, exploration_final_eps=0.1,
replay_buffer_class=ReplayBuffer
)
elif agent_type == 'ppo':
model = PPO('MultiInputPolicy', train_env, policy_kwargs=policy_kwargs, tensorboard_log=f"{path}/{agent_type}/tensorboard/",
learning_rate=0.00025, gamma=0.9
)
model.learn(
total_timesteps=int(total_timesteps),
callback=eval_callback
)
print('Training finished!')
return model
def get_state_info(self, pair):
open_trades = Trade.get_trades(trade_filter=Trade.is_open.is_(True))
market_side = 0.5
current_profit = 0
for trade in open_trades:
if trade.pair == pair:
current_value = trade.open_trade_value
openrate = trade.open_rate
if 'long' in trade.enter_tag:
market_side = 1
else:
market_side = 0
current_profit = current_value / openrate -1
total_profit = 0
closed_trades = Trade.get_trades(trade_filter=[Trade.is_open.is_(False), Trade.pair == pair])
for trade in closed_trades:
total_profit += trade.close_profit
return market_side, current_profit, total_profit
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = False
) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:pred_df: dataframe containing the predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, dk.training_features_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk, filtered_dataframe)
pred_df = self.rl_model_predict(dk.data_dictionary["prediction_features"], dk, self.model)
pred_df.fillna(0, inplace=True)
return (pred_df, dk.do_predict)
def rl_model_predict(self, dataframe: DataFrame,
dk: FreqaiDataKitchen, model: Any) -> DataFrame:
output = pd.DataFrame(np.full((len(dataframe), 1), 2), columns=dk.label_list)
def _predict(window):
observations = dataframe.iloc[window.index]
res, _ = model.predict(observations, deterministic=True)
return res
output = output.rolling(window=self.CONV_WIDTH).apply(_predict)
return output
def set_initial_historic_predictions(
self, df: DataFrame, model: Any, dk: FreqaiDataKitchen, pair: str
) -> None:
pred_df = self.rl_model_predict(df, dk, model)
pred_df.fillna(0, inplace=True)
self.dd.historic_predictions[pair] = pred_df
hist_preds_df = self.dd.historic_predictions[pair]
for label in hist_preds_df.columns:
if hist_preds_df[label].dtype == object:
continue
hist_preds_df[f'{label}_mean'] = 0
hist_preds_df[f'{label}_std'] = 0
hist_preds_df['do_predict'] = 0
if self.freqai_info['feature_parameters'].get('DI_threshold', 0) > 0:
hist_preds_df['DI_values'] = 0
for return_str in dk.data['extra_returns_per_train']:
hist_preds_df[return_str] = 0

View File

@ -0,0 +1,155 @@
import logging
from typing import Any, Dict # , Tuple
import numpy as np
# import numpy.typing as npt
# import pandas as pd
import torch as th
# from pandas import DataFrame
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
# from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv, Actions, Positions
from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
logger = logging.getLogger(__name__)
class ReinforcementLearningPPO(BaseReinforcementLearningModel):
"""
User created Reinforcement Learning Model prediction model.
"""
def fit(self, data_dictionary: Dict[str, Any], pair: str = ''):
agent_params = self.freqai_info['model_training_parameters']
reward_params = self.freqai_info['model_reward_parameters']
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
eval_freq = agent_params["eval_cycles"] * len(test_df)
total_timesteps = agent_params["train_cycles"] * len(train_df)
# price data for model training and evaluation
price = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(train_df.index))
price_test = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(
len(test_df.index))
# environments
train_env = MyRLEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH,
reward_kwargs=reward_params)
eval = MyRLEnv(df=test_df, prices=price_test,
window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
eval_env = Monitor(eval, ".")
eval_env.reset()
path = self.dk.data_path
eval_callback = EvalCallback(eval_env, best_model_save_path=f"{path}/",
log_path=f"{path}/ppo/logs/", eval_freq=int(eval_freq),
deterministic=True, render=False)
# model arch
policy_kwargs = dict(activation_fn=th.nn.ReLU,
net_arch=[256, 256, 128])
model = PPO('MultiInputPolicy', train_env, policy_kwargs=policy_kwargs,
tensorboard_log=f"{path}/ppo/tensorboard/", learning_rate=0.00025, gamma=0.9
)
model.learn(
total_timesteps=int(total_timesteps),
callback=eval_callback
)
print('Training finished!')
return model
class MyRLEnv(BaseRLEnv):
"""
User can override any function in BaseRLEnv and gym.Env
"""
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action):
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short.value:
self._position = Positions.Short
trade_type = "short"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type is not None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick,
'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick=self._current_tick,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
def calculate_reward(self, action):
if self._last_trade_tick is None:
return 0.
# close long
if (action == Actions.Short.value or
action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
# close short
if (action == Actions.Long.value or
action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
return 0.

View File

@ -0,0 +1,168 @@
import logging
from typing import Any, Dict, Optional
import numpy as np
import torch as th
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
# from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv, Actions, Positions
from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
from freqtrade.freqai.RL.TDQNagent import TDQN
from stable_baselines3.common.buffers import ReplayBuffer
logger = logging.getLogger(__name__)
class ReinforcementLearningPPO(BaseReinforcementLearningModel):
"""
User created Reinforcement Learning Model prediction model.
"""
def fit(self, data_dictionary: Dict[str, Any], pair: str = ''):
agent_params = self.freqai_info['model_training_parameters']
reward_params = self.freqai_info['model_reward_parameters']
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
eval_freq = agent_params["eval_cycles"] * len(test_df)
total_timesteps = agent_params["train_cycles"] * len(train_df)
# price data for model training and evaluation
price = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(train_df.index))
price_test = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(
len(test_df.index))
# environments
train_env = MyRLEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH,
reward_kwargs=reward_params)
eval = MyRLEnv(df=test_df, prices=price_test,
window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
eval_env = Monitor(eval, ".")
eval_env.reset()
path = self.dk.data_path
eval_callback = EvalCallback(eval_env, best_model_save_path=f"{path}/",
log_path=f"{path}/tdqn/logs/", eval_freq=int(eval_freq),
deterministic=True, render=False)
# model arch
policy_kwargs = dict(activation_fn=th.nn.ReLU,
net_arch=[256, 256, 128])
model = TDQN('TMultiInputPolicy', train_env,
policy_kwargs=policy_kwargs,
tensorboard_log=f"{path}/tdqn/tensorboard/",
learning_rate=0.00025, gamma=0.9,
target_update_interval=5000, buffer_size=50000,
exploration_initial_eps=1, exploration_final_eps=0.1,
replay_buffer_class=Optional(ReplayBuffer)
)
model.learn(
total_timesteps=int(total_timesteps),
callback=eval_callback
)
print('Training finished!')
return model
class MyRLEnv(BaseRLEnv):
"""
User can override any function in BaseRLEnv and gym.Env
"""
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action):
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short.value:
self._position = Positions.Short
trade_type = "short"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type is not None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick,
'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick=self._current_tick,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
def calculate_reward(self, action):
if self._last_trade_tick is None:
return 0.
# close long
if action == Actions.Long_sell.value and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
if action == Actions.Long_sell.value and self._position == Positions.Long:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(current_price) - np.log(last_trade_price)) * 2)
# close short
if action == Actions.Short_buy.value and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
if action == Actions.Short_buy.value and self._position == Positions.Short:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(last_trade_price) - np.log(current_price)) * 2)
return 0.