make base 3ac and base 5ac environments. TDQN defaults to 3AC.

This commit is contained in:
robcaulk 2022-08-15 12:13:37 +02:00
parent 096533bcb9
commit 926023935f
5 changed files with 417 additions and 223 deletions

View File

@ -31,7 +31,7 @@ def mean_over_std(x):
return mean / std if std > 0 else 0
class BaseRLEnv(gym.Env):
class Base3ActionRLEnv(gym.Env):
metadata = {'render.modes': ['human']}

View File

@ -0,0 +1,364 @@
import logging
from enum import Enum
# from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import gym
import numpy as np
from gym import spaces
from gym.utils import seeding
logger = logging.getLogger(__name__)
class Actions(Enum):
Neutral = 0
Long_buy = 1
Long_sell = 2
Short_buy = 3
Short_sell = 4
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
def mean_over_std(x):
std = np.std(x, ddof=1)
mean = np.mean(x)
return mean / std if std > 0 else 0
class Base5ActionRLEnv(gym.Env):
"""
Base class for a 5 action environment
"""
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee = 0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
# self.A_t, self.B_t = 0.000639, 0.00001954
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point is True:
self._position_history = (self._start_tick * [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self.calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long_buy.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short_buy.value:
self._position = Positions.Short
trade_type = "short"
elif action == Actions.Long_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Short_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type is not None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick,
'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick=self._current_tick,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
# def processState(self, state):
# return state.to_numpy()
# def convert_mlp_Policy(self, obs_):
# pass
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick is None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price) / last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price) / last_trade_price
else:
return 0.
def is_tradesignal(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral) or
(action == Actions.Short_buy.value and self._position == Positions.Short) or
(action == Actions.Short_sell.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Short_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Long) or
(action == Actions.Long_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Long_sell.value and self._position == Positions.Short))
def _is_trade(self, action: Actions):
return ((action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short) or
(action == Actions.Neutral.Short_sell and self._position == Positions.Long) or
(action == Actions.Neutral.Long_sell and self._position == Positions.Short)
)
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
def _update_profit(self, action):
# if self._is_trade(action) or self._done:
if self._is_trade(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
self._total_profit = self._total_profit + self._total_profit * pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
if self._position == Positions.Short:
self._total_profit = self._total_profit + self._total_profit * pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
def most_recent_return(self, action):
"""
We support Long, Neutral and Short positions.
Return is generated from rising prices in Long
and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
# Long positions
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
# if action == Actions.Short.value or action == Actions.Neutral.value:
if action == Actions.Short_buy.value or action == Actions.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_buy_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
# Short positions
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
# if action == Actions.Long.value or action == Actions.Neutral.value:
if action == Actions.Long_buy.value or action == Actions.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_sell_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
return 0
def get_portfolio_log_returns(self):
return self.portfolio_log_returns[1:self._current_tick + 1]
def get_trading_log_return(self):
return self.portfolio_log_returns[self._start_tick:]
def update_portfolio_log_returns(self, action):
self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action)
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def prev_price(self) -> float:
return self.prices.iloc[self._current_tick - 1].open
def sharpe_ratio(self):
if len(self.close_trade_profit) == 0:
return 0.
returns = np.array(self.close_trade_profit)
reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9)
return reward
def get_bnh_log_return(self):
return np.diff(np.log(self.prices['open'][self._start_tick:]))
def calculate_reward(self, action):
if self._last_trade_tick is None:
return 0.
# close long
if action == Actions.Long_sell.value and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
if action == Actions.Long_sell.value and self._position == Positions.Long:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(current_price) - np.log(last_trade_price)) * 2)
# close short
if action == Actions.Short_buy.value and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
if action == Actions.Short_buy.value and self._position == Positions.Short:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(last_trade_price) - np.log(current_price)) * 2)
return 0.

View File

@ -8,7 +8,7 @@ from pandas import DataFrame
from abc import abstractmethod
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv, Actions, Positions
from freqtrade.freqai.RL.Base3ActionRLEnv import Base3ActionRLEnv, Actions, Positions
from freqtrade.persistence import Trade
logger = logging.getLogger(__name__)
@ -165,7 +165,7 @@ class BaseReinforcementLearningModel(IFreqaiModel):
hist_preds_df[return_str] = 0
class MyRLEnv(BaseRLEnv):
class MyRLEnv(Base3ActionRLEnv):
def step(self, action):
self._done = False

View File

@ -10,7 +10,7 @@ from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
# from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv, Actions, Positions
from freqtrade.freqai.RL.Base3ActionRLEnv import Base3ActionRLEnv, Actions, Positions
from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
@ -67,7 +67,7 @@ class ReinforcementLearningPPO(BaseReinforcementLearningModel):
return model
class MyRLEnv(BaseRLEnv):
class MyRLEnv(Base3ActionRLEnv):
"""
User can override any function in BaseRLEnv and gym.Env
"""

View File

@ -1,17 +1,14 @@
import logging
from typing import Any, Dict # Optional
from enum import Enum
import numpy as np
import torch as th
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
# from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv
from freqtrade.freqai.RL.Base3ActionRLEnv import Base3ActionRLEnv, Actions, Positions
from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
from freqtrade.freqai.RL.TDQNagent import TDQN
from stable_baselines3.common.buffers import ReplayBuffer
from gym import spaces
from gym.utils import seeding
import numpy as np
logger = logging.getLogger(__name__)
@ -71,233 +68,66 @@ class ReinforcementLearningTDQN(BaseReinforcementLearningModel):
return model
class Actions(Enum):
Neutral = 0
Long_buy = 1
Long_sell = 2
Short_buy = 3
Short_sell = 4
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
class MyRLEnv(BaseRLEnv):
class MyRLEnv(Base3ActionRLEnv):
"""
User can override any function in BaseRLEnv and gym.Env. Here the user
Adds 5 actions.
User can override any function in BaseRLEnv and gym.Env
"""
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee = 0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
# self.A_t, self.B_t = 0.000639, 0.00001954
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point is True:
self._position_history = (self._start_tick * [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self.calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long_buy.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short_buy.value:
self._position = Positions.Short
trade_type = "short"
elif action == Actions.Long_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Short_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type is not None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick,
'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick=self._current_tick,
total_reward=self.total_reward,
total_profit=self._total_profit,
position=self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
def calculate_reward(self, action):
if self._last_trade_tick is None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price) / last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
# close long
if (action == Actions.Short.value or
action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price) / last_trade_price
else:
return 0.
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
def is_tradesignal(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral) or
(action == Actions.Short_buy.value and self._position == Positions.Short) or
(action == Actions.Short_sell.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Short_sell.value and self._position == Positions.Long) or
# close short
if (action == Actions.Long.value or
action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
(action == Actions.Long_buy.value and self._position == Positions.Long) or
(action == Actions.Long_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Long_sell.value and self._position == Positions.Short))
return 0.
def _is_trade(self, action):
return ((action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short) or
# User can inherit and customize 5 action environment
# class MyRLEnv(Base5ActionRLEnv):
# """
# User can override any function in BaseRLEnv and gym.Env. Here the user
# Adds 5 actions.
# """
(action == Actions.Neutral.Short_sell and self._position == Positions.Long) or
(action == Actions.Neutral.Long_sell and self._position == Positions.Short)
)
# def calculate_reward(self, action):
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
# if self._last_trade_tick is None:
# return 0.
def add_buy_fee(self, price):
return price * (1 + self.fee)
# # close long
# if action == Actions.Long_sell.value and self._position == Positions.Long:
# last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
# current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
# return float(np.log(current_price) - np.log(last_trade_price))
def add_sell_fee(self, price):
return price / (1 + self.fee)
# if action == Actions.Long_sell.value and self._position == Positions.Long:
# if self.close_trade_profit[-1] > self.profit_aim * self.rr:
# last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
# current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
# return float((np.log(current_price) - np.log(last_trade_price)) * 2)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
# # close short
# if action == Actions.Short_buy.value and self._position == Positions.Short:
# last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
# current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
# return float(np.log(last_trade_price) - np.log(current_price))
for key, value in info.items():
self.history[key].append(value)
# if action == Actions.Short_buy.value and self._position == Positions.Short:
# if self.close_trade_profit[-1] > self.profit_aim * self.rr:
# last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
# current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
# return float((np.log(last_trade_price) - np.log(current_price)) * 2)
# return 0.