import logging from enum import Enum import gym import numpy as np import pandas as pd from gym import spaces from gym.utils import seeding from pandas import DataFrame # from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union logger = logging.getLogger(__name__) class Actions(Enum): Short = 0 Long = 1 Neutral = 2 class Positions(Enum): Short = 0 Long = 1 Neutral = 0.5 def opposite(self): return Positions.Short if self == Positions.Long else Positions.Long def mean_over_std(x): std = np.std(x, ddof=1) mean = np.mean(x) return mean / std if std > 0 else 0 class Base3ActionRLEnv(gym.Env): metadata = {'render.modes': ['human']} def __init__(self, df: DataFrame = DataFrame(), prices: DataFrame = DataFrame(), reward_kwargs: dict = {}, window_size=10, starting_point=True, id: str = 'baseenv-1', seed: int = 1): assert df.ndim == 2 self.id = id self.seed(seed) self.reset_env(df, prices, window_size, reward_kwargs, starting_point) def reset_env(self, df, prices, window_size, reward_kwargs, starting_point=True): self.df = df self.signal_features = self.df self.prices = prices self.window_size = window_size self.starting_point = starting_point self.rr = reward_kwargs["rr"] self.profit_aim = reward_kwargs["profit_aim"] self.fee = 0.0015 # # spaces self.shape = (window_size, self.signal_features.shape[1] + 2) self.action_space = spaces.Discrete(len(Actions)) self.observation_space = spaces.Box( low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32) # episode self._start_tick = self.window_size self._end_tick = len(self.prices) - 1 self._done = None self._current_tick = None self._last_trade_tick = None self._position = Positions.Neutral self._position_history = None self.total_reward = None self._total_profit = None self._first_rendering = None self.history = None self.trade_history = [] def seed(self, seed: int = 1): self.np_random, seed = seeding.np_random(seed) return [seed] def reset(self): self._done = False if self.starting_point is True: self._position_history = (self._start_tick * [None]) + [self._position] else: self._position_history = (self.window_size * [None]) + [self._position] self._current_tick = self._start_tick self._last_trade_tick = None self._position = Positions.Neutral self.total_reward = 0. self._total_profit = 1. # unit self._first_rendering = True self.history = {} self.trade_history = [] self.portfolio_log_returns = np.zeros(len(self.prices)) self._profits = [(self._start_tick, 1)] self.close_trade_profit = [] return self._get_observation() def step(self, action: int): self._done = False self._current_tick += 1 if self._current_tick == self._end_tick: self._done = True self.update_portfolio_log_returns(action) self._update_profit(action) step_reward = self.calculate_reward(action) self.total_reward += step_reward trade_type = None if self.is_tradesignal(action): # exclude 3 case not trade # Update position """ Action: Neutral, position: Long -> Close Long Action: Neutral, position: Short -> Close Short Action: Long, position: Neutral -> Open Long Action: Long, position: Short -> Close Short and Open Long Action: Short, position: Neutral -> Open Short Action: Short, position: Long -> Close Long and Open Short """ if action == Actions.Neutral.value: self._position = Positions.Neutral trade_type = "neutral" elif action == Actions.Long.value: self._position = Positions.Long trade_type = "long" elif action == Actions.Short.value: self._position = Positions.Short trade_type = "short" else: print("case not defined") # Update last trade tick self._last_trade_tick = self._current_tick if trade_type is not None: self.trade_history.append( {'price': self.current_price(), 'index': self._current_tick, 'type': trade_type}) if self._total_profit < 0.2: self._done = True self._position_history.append(self._position) observation = self._get_observation() info = dict( tick=self._current_tick, total_reward=self.total_reward, total_profit=self._total_profit, position=self._position.value ) self._update_history(info) return observation, step_reward, self._done, info def _get_observation(self): features_window = self.signal_features[( self._current_tick - self.window_size):self._current_tick] features_and_state = DataFrame(np.zeros((len(features_window), 2)), columns=['current_profit_pct', 'position'], index=features_window.index) features_and_state['current_profit_pct'] = self.get_unrealized_profit() features_and_state['position'] = self._position.value features_and_state = pd.concat([features_window, features_and_state], axis=1) return features_and_state def get_unrealized_profit(self): if self._last_trade_tick is None: return 0. if self._position == Positions.Neutral: return 0. elif self._position == Positions.Short: current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open) last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open) return (last_trade_price - current_price) / last_trade_price elif self._position == Positions.Long: current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open) last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open) return (current_price - last_trade_price) / last_trade_price else: return 0. def is_tradesignal(self, action: int): # trade signal """ not trade signal is : Action: Neutral, position: Neutral -> Nothing Action: Long, position: Long -> Hold Long Action: Short, position: Short -> Hold Short """ return not ((action == Actions.Neutral.value and self._position == Positions.Neutral) or (action == Actions.Short.value and self._position == Positions.Short) or (action == Actions.Long.value and self._position == Positions.Long)) def _is_trade(self, action: Actions): return ((action == Actions.Long.value and self._position == Positions.Short) or (action == Actions.Short.value and self._position == Positions.Long) or (action == Actions.Neutral.value and self._position == Positions.Long) or (action == Actions.Neutral.value and self._position == Positions.Short) ) def is_hold(self, action): return ((action == Actions.Short.value and self._position == Positions.Short) or (action == Actions.Long.value and self._position == Positions.Long)) def add_buy_fee(self, price): return price * (1 + self.fee) def add_sell_fee(self, price): return price / (1 + self.fee) def _update_history(self, info): if not self.history: self.history = {key: [] for key in info.keys()} for key, value in info.items(): self.history[key].append(value) def get_sharpe_ratio(self): return mean_over_std(self.get_portfolio_log_returns()) def calculate_reward(self, action): if self._last_trade_tick is None: return 0. # close long if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long: last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open) current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open) return float(np.log(current_price) - np.log(last_trade_price)) # close short if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short: last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open) current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open) return float(np.log(last_trade_price) - np.log(current_price)) return 0. def _update_profit(self, action): if self._is_trade(action) or self._done: pnl = self.get_unrealized_profit() if self._position == Positions.Long: self._total_profit = self._total_profit + self._total_profit * pnl self._profits.append((self._current_tick, self._total_profit)) self.close_trade_profit.append(pnl) if self._position == Positions.Short: self._total_profit = self._total_profit + self._total_profit * pnl self._profits.append((self._current_tick, self._total_profit)) self.close_trade_profit.append(pnl) def most_recent_return(self, action: int): """ We support Long, Neutral and Short positions. Return is generated from rising prices in Long and falling prices in Short positions. The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee. """ # Long positions if self._position == Positions.Long: current_price = self.prices.iloc[self._current_tick].open if action == Actions.Short.value or action == Actions.Neutral.value: current_price = self.add_sell_fee(current_price) previous_price = self.prices.iloc[self._current_tick - 1].open if (self._position_history[self._current_tick - 1] == Positions.Short or self._position_history[self._current_tick - 1] == Positions.Neutral): previous_price = self.add_buy_fee(previous_price) return np.log(current_price) - np.log(previous_price) # Short positions if self._position == Positions.Short: current_price = self.prices.iloc[self._current_tick].open if action == Actions.Long.value or action == Actions.Neutral.value: current_price = self.add_buy_fee(current_price) previous_price = self.prices.iloc[self._current_tick - 1].open if (self._position_history[self._current_tick - 1] == Positions.Long or self._position_history[self._current_tick - 1] == Positions.Neutral): previous_price = self.add_sell_fee(previous_price) return np.log(previous_price) - np.log(current_price) return 0 def get_portfolio_log_returns(self): return self.portfolio_log_returns[1:self._current_tick + 1] def update_portfolio_log_returns(self, action): self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action) def current_price(self) -> float: return self.prices.iloc[self._current_tick].open def prev_price(self) -> float: return self.prices.iloc[self._current_tick - 1].open def sharpe_ratio(self) -> float: if len(self.close_trade_profit) == 0: return 0. returns = np.array(self.close_trade_profit) reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9) return reward