initial commit - new dev branch

This commit is contained in:
MukavaValkku 2022-08-14 16:24:20 +03:00 committed by robcaulk
parent cd3fe44424
commit 9b895500b3
6 changed files with 810 additions and 920 deletions

View File

@@ -1,11 +1,15 @@
# common library
import gym
import numpy as np
from stable_baselines3 import A2C, DDPG, PPO, SAC, TD3
from stable_baselines3.common.callbacks import BaseCallback, EvalCallback
from stable_baselines3.common.callbacks import (BaseCallback, CallbackList, CheckpointCallback,
EvalCallback, StopTrainingOnRewardThreshold)
from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
from freqtrade.freqai.prediction_models.RL import config
#from freqtrade.freqai.prediction_models.RL.RLPrediction_agent_v2 import TDQN
from freqtrade.freqai.prediction_models.RL.RLPrediction_env import DEnv
# from stable_baselines3.common.vec_env import DummyVecEnv
@@ -106,12 +110,30 @@ class RLPrediction_agent:
return model
def train_model(self, model, tb_log_name, model_kwargs):
def train_model(self, model, tb_log_name, model_kwargs, train_df, test_df, price, price_test, window_size):
agent_params = self.freqai_info['model_training_parameters']
reward_params = self.freqai_info['model_reward_parameters']
train_env = DEnv(df=train_df, prices=price, window_size=window_size, reward_kwargs=reward_params)
eval_env = DEnv(df=test_df, prices=price_test, window_size=window_size, reward_kwargs=reward_params)
# checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/',
# name_prefix='rl_model')
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/')
eval_callback = EvalCallback(eval_env, best_model_save_path='./logs/best_model', log_path='./logs/results', eval_freq=500)
#callback_on_best = StopTrainingOnRewardThreshold(reward_threshold=-200, verbose=1)
# Create the callback list
callback = CallbackList([checkpoint_callback, eval_callback])
model = model.learn(
total_timesteps=model_kwargs["total_timesteps"],
tb_log_name=tb_log_name,
#callback=eval_callback,
callback=TensorboardCallback(),
callback=callback,
#callback=TensorboardCallback(),
)
return model
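For reference, a minimal standalone sketch of the callback wiring used in train_model above; the PPO policy choice, paths, frequencies and timestep count are illustrative, and train_env / eval_env are assumed to be environments built the same way as in the hunk:

from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import CallbackList, CheckpointCallback, EvalCallback

# train_env / eval_env: gym environments, e.g. the DEnv instances created above (assumption)
checkpoint_callback = CheckpointCallback(save_freq=1000, save_path='./logs/')
eval_callback = EvalCallback(eval_env, best_model_save_path='./logs/best_model',
                             log_path='./logs/results', eval_freq=500)
callback = CallbackList([checkpoint_callback, eval_callback])

model = PPO('MlpPolicy', train_env, verbose=1)
model.learn(total_timesteps=10_000, tb_log_name='example_run', callback=callback)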

View File

@@ -1,23 +1,18 @@
import torch as th
from torch import nn
from typing import Dict, List, Tuple, Type, Optional, Any, Union
from typing import Any, Dict, List, Optional, Tuple, Type, Union
import gym
from stable_baselines3.common.type_aliases import GymEnv, Schedule
from stable_baselines3.common.torch_layers import (
BaseFeaturesExtractor,
FlattenExtractor,
CombinedExtractor
)
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3 import DQN
from stable_baselines3.common.policies import BasePolicy
#from stable_baselines3.common.policies import register_policy
from stable_baselines3.dqn.policies import (
QNetwork, DQNPolicy, MultiInputPolicy,
CnnPolicy, DQNPolicy, MlpPolicy)
import torch
import torch as th
from stable_baselines3 import DQN
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3.common.policies import BasePolicy
from stable_baselines3.common.torch_layers import (BaseFeaturesExtractor, CombinedExtractor,
FlattenExtractor)
from stable_baselines3.common.type_aliases import GymEnv, Schedule
#from stable_baselines3.common.policies import register_policy
from stable_baselines3.dqn.policies import (CnnPolicy, DQNPolicy, MlpPolicy, MultiInputPolicy,
QNetwork)
from torch import nn
def create_mlp_(
@@ -30,7 +25,7 @@ def create_mlp_(
dropout = 0.2
if len(net_arch) > 0:
number_of_neural = net_arch[0]
modules = [
nn.Linear(input_dim, number_of_neural),
nn.BatchNorm1d(number_of_neural),
@@ -69,19 +64,19 @@ class TDQNetwork(QNetwork):
features_dim=features_dim,
net_arch=net_arch,
activation_fn=activation_fn,
normalize_images=normalize_images
normalize_images=normalize_images
)
action_dim = self.action_space.n
q_net = create_mlp_(self.features_dim, action_dim, self.net_arch, self.activation_fn)
self.q_net = nn.Sequential(*q_net).apply(self.init_weights)
def init_weights(self, m):
if type(m) == nn.Linear:
torch.nn.init.kaiming_uniform_(m.weight)
class TDQNPolicy(DQNPolicy):
def __init__(
self,
observation_space: gym.spaces.Space,
@@ -107,7 +102,7 @@ class TDQNPolicy(DQNPolicy):
optimizer_class=optimizer_class,
optimizer_kwargs=optimizer_kwargs
)
@staticmethod
def init_weights(module: nn.Module, gain: float = 1) -> None:
"""
@@ -117,13 +112,13 @@ class TDQNPolicy(DQNPolicy):
nn.init.kaiming_uniform_(module.weight)
if module.bias is not None:
module.bias.data.fill_(0.0)
def make_q_net(self) -> TDQNetwork:
# Make sure we always have separate networks for features extractors etc
net_args = self._update_features_extractor(self.net_args, features_extractor=None)
return TDQNetwork(**net_args).to(self.device)
class TMultiInputPolicy(TDQNPolicy):
def __init__(
self,
@@ -150,8 +145,8 @@ class TMultiInputPolicy(TDQNPolicy):
optimizer_class,
optimizer_kwargs,
)
class TDQN(DQN):
policy_aliases: Dict[str, Type[BasePolicy]] = {
@@ -216,10 +211,10 @@ class TDQN(DQN):
device=device,
_init_setup_model=_init_setup_model
)
# try:
# register_policy("TMultiInputPolicy", TMultiInputPolicy)
# except:
# print("already registered")
# print("already registered")

View File

@@ -0,0 +1,513 @@
import logging
import random
from collections import deque
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import gym
import matplotlib.pylab as plt
import numpy as np
import pandas as pd
from gym import spaces
from gym.utils import seeding
logger = logging.getLogger(__name__)
class Actions(Enum):
Short = 0
Long = 1
Neutral = 2
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
def mean_over_std(x):
std = np.std(x, ddof=1)
mean = np.mean(x)
return mean / std if std > 0 else 0
class DEnv(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee=0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
# self.A_t, self.B_t = 0.000639, 0.00001954
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point == True:
self._position_history = (self._start_tick* [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
#self._last_trade_tick = self._current_tick - 1
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
temp_position = self._position
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions.Long.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions.Short.value:
self._position = Positions.Short
trade_type = "short"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type != None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick, 'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick = self._current_tick,
total_reward = self.total_reward,
total_profit = self._total_profit,
position = self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
# def processState(self, state):
# return state.to_numpy()
# def convert_mlp_Policy(self, obs_):
# pass
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick == None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price)/last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price)/last_trade_price
else:
return 0.
def is_tradesignal(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral)
or (action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def _is_trade(self, action: Actions):
return ((action == Actions.Long.value and self._position == Positions.Short) or
(action == Actions.Short.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short)
)
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
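# Worked check of the two fee helpers above (illustrative price; fee = 0.0015 as set in __init__):
#   add_buy_fee(100.0)  = 100.0 * (1 + 0.0015) = 100.15
#   add_sell_fee(100.0) = 100.0 / (1 + 0.0015) ≈ 99.85
# A round trip at an unchanged price therefore costs roughly 2 * fee ≈ 0.3%.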
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)
# def render(self, mode='human'):
# def _plot_position(position, tick):
# color = None
# if position == Positions.Short:
# color = 'red'
# elif position == Positions.Long:
# color = 'green'
# if color:
# plt.scatter(tick, self.prices.loc[tick].open, color=color)
# if self._first_rendering:
# self._first_rendering = False
# plt.cla()
# plt.plot(self.prices)
# start_position = self._position_history[self._start_tick]
# _plot_position(start_position, self._start_tick)
# plt.cla()
# plt.plot(self.prices)
# _plot_position(self._position, self._current_tick)
# plt.suptitle("Total Reward: %.6f" % self.total_reward + ' ~ ' + "Total Profit: %.6f" % self._total_profit)
# plt.pause(0.01)
# def render_all(self):
# plt.figure()
# window_ticks = np.arange(len(self._position_history))
# plt.plot(self.prices['open'], alpha=0.5)
# short_ticks = []
# long_ticks = []
# neutral_ticks = []
# for i, tick in enumerate(window_ticks):
# if self._position_history[i] == Positions.Short:
# short_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Long:
# long_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Neutral:
# neutral_ticks.append(tick - 1)
# plt.plot(neutral_ticks, self.prices.loc[neutral_ticks].open,
# 'o', color='grey', ms=3, alpha=0.1)
# plt.plot(short_ticks, self.prices.loc[short_ticks].open,
# 'o', color='r', ms=3, alpha=0.8)
# plt.plot(long_ticks, self.prices.loc[long_ticks].open,
# 'o', color='g', ms=3, alpha=0.8)
# plt.suptitle("Generalising")
# fig = plt.gcf()
# fig.set_size_inches(15, 10)
# def close_trade_report(self):
# small_trade = 0
# positive_big_trade = 0
# negative_big_trade = 0
# small_profit = 0.003
# for i in self.close_trade_profit:
# if i < small_profit and i > -small_profit:
# small_trade+=1
# elif i > small_profit:
# positive_big_trade += 1
# elif i < -small_profit:
# negative_big_trade += 1
# print(f"small trade={small_trade/len(self.close_trade_profit)}; positive_big_trade={positive_big_trade/len(self.close_trade_profit)}; negative_big_trade={negative_big_trade/len(self.close_trade_profit)}")
# def report(self):
# # get total trade
# long_trade = 0
# short_trade = 0
# neutral_trade = 0
# for trade in self.trade_history:
# if trade['type'] == 'long':
# long_trade += 1
# elif trade['type'] == 'short':
# short_trade += 1
# else:
# neutral_trade += 1
# negative_trade = 0
# positive_trade = 0
# for tr in self.close_trade_profit:
# if tr < 0.:
# negative_trade += 1
# if tr > 0.:
# positive_trade += 1
# total_trade_lr = negative_trade+positive_trade
# total_trade = long_trade + short_trade
# sharp_ratio = self.sharpe_ratio()
# sharp_log = self.get_sharpe_ratio()
# from tabulate import tabulate
# headers = ["Performance", ""]
# performanceTable = [["Total Trade", "{0:.2f}".format(total_trade)],
# ["Total reward", "{0:.3f}".format(self.total_reward)],
# ["Start profit(unit)", "{0:.2f}".format(1.)],
# ["End profit(unit)", "{0:.3f}".format(self._total_profit)],
# ["Sharp ratio", "{0:.3f}".format(sharp_ratio)],
# ["Sharp log", "{0:.3f}".format(sharp_log)],
# # ["Sortino ratio", "{0:.2f}".format(0) + '%'],
# ["winrate", "{0:.2f}".format(positive_trade*100/total_trade_lr) + '%']
# ]
# tabulation = tabulate(performanceTable, headers, tablefmt="fancy_grid", stralign="center")
# print(tabulation)
# result = {
# "Start": "{0:.2f}".format(1.),
# "End": "{0:.2f}".format(self._total_profit),
# "Sharp": "{0:.3f}".format(sharp_ratio),
# "Winrate": "{0:.2f}".format(positive_trade*100/total_trade_lr)
# }
# return result
# def close(self):
# plt.close()
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
# def save_rendering(self, filepath):
# plt.savefig(filepath)
# def pause_rendering(self):
# plt.show()
def _calculate_reward(self, action):
# rw = self.transaction_profit_reward(action)
#rw = self.reward_rr_profit_config(action)
rw = self.profit_only_when_close_reward(action)
#rw = self.profit_only_when_close_reward_aim(action)
return rw
def _update_profit(self, action):
if self._is_trade(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
if self._position == Positions.Short:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
def most_recent_return(self, action):
"""
We support Long, Neutral and Short positions.
Return is generated from rising prices in Long
and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
# Long positions
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
if action == Actions.Short.value or action == Actions.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_buy_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
# Short positions
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
if action == Actions.Long.value or action == Actions.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_sell_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
return 0
def get_portfolio_log_returns(self):
return self.portfolio_log_returns[1:self._current_tick + 1]
# def get_trading_log_return(self):
# return self.portfolio_log_returns[self._start_tick:]
def update_portfolio_log_returns(self, action):
self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action)
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def prev_price(self) -> float:
return self.prices.iloc[self._current_tick-1].open
def sharpe_ratio(self):
if len(self.close_trade_profit) == 0:
return 0.
returns = np.array(self.close_trade_profit)
reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9)
return reward
# def get_bnh_log_return(self):
# return np.diff(np.log(self.prices['open'][self._start_tick:]))
def transaction_profit_reward(self, action):
rw = 0.
pt = self.prev_price()
pt_1 = self.current_price()
if self._position == Positions.Long:
a_t = 1
elif self._position == Positions.Short:
a_t = -1
else:
a_t = 0
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
# close short
elif (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
pt_1 = self.add_buy_fee(self.current_price())
po = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
else:
rw = a_t*(pt_1 - pt)/pt
return np.clip(rw, 0, 1)
def profit_only_when_close_reward_aim(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(current_price) - np.log(last_trade_price)) * 2)
# close short
if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(last_trade_price) - np.log(current_price)) * 2)
return 0.
def profit_only_when_close_reward(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
# close short
if (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
return 0.
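The close-only rewards above reduce to a fee-adjusted log return of the closed trade. A small worked example for the long case (the prices and the 2% move are illustrative; fee = 0.0015 as in the environment):

import numpy as np

fee = 0.0015
entry_open, exit_open = 100.0, 102.0          # illustrative open prices
last_trade_price = entry_open * (1 + fee)     # add_buy_fee at entry
current_price = exit_open / (1 + fee)         # add_sell_fee at exit
reward = float(np.log(current_price) - np.log(last_trade_price))
print(reward)  # ≈ 0.0168: a 2% move minus roughly 0.3% of round-trip fees, in log terms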

View File

@@ -2,6 +2,7 @@ import logging
import random
from collections import deque
from enum import Enum
#from sklearn.decomposition import PCA, KernelPCA
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
import gym
@@ -10,7 +11,6 @@ import numpy as np
import pandas as pd
from gym import spaces
from gym.utils import seeding
from sklearn.decomposition import PCA, KernelPCA
logger = logging.getLogger(__name__)
@@ -29,12 +29,8 @@ logger = logging.getLogger(__name__)
# Label, LabelSet
# )
class Actions(Enum):
Short = 0
Long = 1
Neutral = 2
class Actions_v2(Enum):
class Actions(Enum):
Neutral = 0
Long_buy = 1
Long_sell = 2
@@ -75,7 +71,7 @@ class DEnv(gym.Env):
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions_v2))
self.action_space = spaces.Discrete(len(Actions))
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
@@ -152,7 +148,7 @@ class DEnv(gym.Env):
trade_type = None
if self.is_tradesignal_v2(action): # exclude 3 case not trade
if self.is_tradesignal(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
@@ -167,19 +163,19 @@ class DEnv(gym.Env):
temp_position = self._position
if action == Actions_v2.Neutral.value:
if action == Actions.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions_v2.Long_buy.value:
elif action == Actions.Long_buy.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions_v2.Short_buy.value:
elif action == Actions.Short_buy.value:
self._position = Positions.Short
trade_type = "short"
elif action == Actions_v2.Long_sell.value:
elif action == Actions.Long_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions_v2.Short_sell.value:
elif action == Actions.Short_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
else:
@@ -208,11 +204,11 @@ class DEnv(gym.Env):
return observation, step_reward, self._done, info
def processState(self, state):
return state.to_numpy()
# def processState(self, state):
# return state.to_numpy()
def convert_mlp_Policy(self, obs_):
pass
# def convert_mlp_Policy(self, obs_):
# pass
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
@@ -245,46 +241,26 @@ class DEnv(gym.Env):
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral)
or (action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def is_tradesignal_v2(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions_v2.Neutral.value and self._position == Positions.Neutral) or
(action == Actions_v2.Short_buy.value and self._position == Positions.Short) or
(action == Actions_v2.Short_sell.value and self._position == Positions.Short) or
(action == Actions_v2.Short_buy.value and self._position == Positions.Long) or
(action == Actions_v2.Short_sell.value and self._position == Positions.Long) or
(action == Actions_v2.Long_buy.value and self._position == Positions.Long) or
(action == Actions_v2.Long_sell.value and self._position == Positions.Long) or
(action == Actions_v2.Long_buy.value and self._position == Positions.Short) or
(action == Actions_v2.Long_sell.value and self._position == Positions.Short))
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral) or
(action == Actions.Short_buy.value and self._position == Positions.Short) or
(action == Actions.Short_sell.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Short_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Long) or
(action == Actions.Long_sell.value and self._position == Positions.Long) or
(action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Long_sell.value and self._position == Positions.Short))
def _is_trade(self, action: Actions):
return ((action == Actions.Long.value and self._position == Positions.Short) or
(action == Actions.Short.value and self._position == Positions.Long) or
return ((action == Actions.Long_buy.value and self._position == Positions.Short) or
(action == Actions.Short_buy.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short)
)
(action == Actions.Neutral.value and self._position == Positions.Short) or
def _is_trade_v2(self, action: Actions_v2):
return ((action == Actions_v2.Long_buy.value and self._position == Positions.Short) or
(action == Actions_v2.Short_buy.value and self._position == Positions.Long) or
(action == Actions_v2.Neutral.value and self._position == Positions.Long) or
(action == Actions_v2.Neutral.value and self._position == Positions.Short) or
(action == Actions_v2.Neutral.Short_sell and self._position == Positions.Long) or
(action == Actions_v2.Neutral.Long_sell and self._position == Positions.Short)
(action == Actions.Short_sell.value and self._position == Positions.Long) or
(action == Actions.Long_sell.value and self._position == Positions.Short)
)
@@ -292,9 +268,6 @@ class DEnv(gym.Env):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def is_hold_v2(self, action):
return ((action == Actions_v2.Short_buy.value and self._position == Positions.Short)
or (action == Actions_v2.Long_buy.value and self._position == Positions.Long))
def add_buy_fee(self, price):
@@ -311,156 +284,158 @@ class DEnv(gym.Env):
self.history[key].append(value)
def render(self, mode='human'):
# def render(self, mode='human'):
def _plot_position(position, tick):
color = None
if position == Positions.Short:
color = 'red'
elif position == Positions.Long:
color = 'green'
if color:
plt.scatter(tick, self.prices.loc[tick].open, color=color)
# def _plot_position(position, tick):
# color = None
# if position == Positions.Short:
# color = 'red'
# elif position == Positions.Long:
# color = 'green'
# if color:
# plt.scatter(tick, self.prices.loc[tick].open, color=color)
if self._first_rendering:
self._first_rendering = False
plt.cla()
plt.plot(self.prices)
start_position = self._position_history[self._start_tick]
_plot_position(start_position, self._start_tick)
# if self._first_rendering:
# self._first_rendering = False
# plt.cla()
# plt.plot(self.prices)
# start_position = self._position_history[self._start_tick]
# _plot_position(start_position, self._start_tick)
plt.cla()
plt.plot(self.prices)
_plot_position(self._position, self._current_tick)
# plt.cla()
# plt.plot(self.prices)
# _plot_position(self._position, self._current_tick)
plt.suptitle("Total Reward: %.6f" % self.total_reward + ' ~ ' + "Total Profit: %.6f" % self._total_profit)
plt.pause(0.01)
# plt.suptitle("Total Reward: %.6f" % self.total_reward + ' ~ ' + "Total Profit: %.6f" % self._total_profit)
# plt.pause(0.01)
def render_all(self):
plt.figure()
window_ticks = np.arange(len(self._position_history))
plt.plot(self.prices['open'], alpha=0.5)
# def render_all(self):
# plt.figure()
# window_ticks = np.arange(len(self._position_history))
# plt.plot(self.prices['open'], alpha=0.5)
short_ticks = []
long_ticks = []
neutral_ticks = []
for i, tick in enumerate(window_ticks):
if self._position_history[i] == Positions.Short:
short_ticks.append(tick - 1)
elif self._position_history[i] == Positions.Long:
long_ticks.append(tick - 1)
elif self._position_history[i] == Positions.Neutral:
neutral_ticks.append(tick - 1)
# short_ticks = []
# long_ticks = []
# neutral_ticks = []
# for i, tick in enumerate(window_ticks):
# if self._position_history[i] == Positions.Short:
# short_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Long:
# long_ticks.append(tick - 1)
# elif self._position_history[i] == Positions.Neutral:
# neutral_ticks.append(tick - 1)
plt.plot(neutral_ticks, self.prices.loc[neutral_ticks].open,
'o', color='grey', ms=3, alpha=0.1)
plt.plot(short_ticks, self.prices.loc[short_ticks].open,
'o', color='r', ms=3, alpha=0.8)
plt.plot(long_ticks, self.prices.loc[long_ticks].open,
'o', color='g', ms=3, alpha=0.8)
# plt.plot(neutral_ticks, self.prices.loc[neutral_ticks].open,
# 'o', color='grey', ms=3, alpha=0.1)
# plt.plot(short_ticks, self.prices.loc[short_ticks].open,
# 'o', color='r', ms=3, alpha=0.8)
# plt.plot(long_ticks, self.prices.loc[long_ticks].open,
# 'o', color='g', ms=3, alpha=0.8)
plt.suptitle("Generalising")
fig = plt.gcf()
fig.set_size_inches(15, 10)
# plt.suptitle("Generalising")
# fig = plt.gcf()
# fig.set_size_inches(15, 10)
def close_trade_report(self):
small_trade = 0
positive_big_trade = 0
negative_big_trade = 0
small_profit = 0.003
for i in self.close_trade_profit:
if i < small_profit and i > -small_profit:
small_trade+=1
elif i > small_profit:
positive_big_trade += 1
elif i < -small_profit:
negative_big_trade += 1
print(f"small trade={small_trade/len(self.close_trade_profit)}; positive_big_trade={positive_big_trade/len(self.close_trade_profit)}; negative_big_trade={negative_big_trade/len(self.close_trade_profit)}")
# def close_trade_report(self):
# small_trade = 0
# positive_big_trade = 0
# negative_big_trade = 0
# small_profit = 0.003
# for i in self.close_trade_profit:
# if i < small_profit and i > -small_profit:
# small_trade+=1
# elif i > small_profit:
# positive_big_trade += 1
# elif i < -small_profit:
# negative_big_trade += 1
# print(f"small trade={small_trade/len(self.close_trade_profit)}; positive_big_trade={positive_big_trade/len(self.close_trade_profit)}; negative_big_trade={negative_big_trade/len(self.close_trade_profit)}")
def report(self):
# def report(self):
# get total trade
long_trade = 0
short_trade = 0
neutral_trade = 0
for trade in self.trade_history:
if trade['type'] == 'long':
long_trade += 1
# # get total trade
# long_trade = 0
# short_trade = 0
# neutral_trade = 0
# for trade in self.trade_history:
# if trade['type'] == 'long':
# long_trade += 1
elif trade['type'] == 'short':
short_trade += 1
else:
neutral_trade += 1
# elif trade['type'] == 'short':
# short_trade += 1
# else:
# neutral_trade += 1
negative_trade = 0
positive_trade = 0
for tr in self.close_trade_profit:
if tr < 0.:
negative_trade += 1
# negative_trade = 0
# positive_trade = 0
# for tr in self.close_trade_profit:
# if tr < 0.:
# negative_trade += 1
if tr > 0.:
positive_trade += 1
# if tr > 0.:
# positive_trade += 1
total_trade_lr = negative_trade+positive_trade
# total_trade_lr = negative_trade+positive_trade
total_trade = long_trade + short_trade
sharp_ratio = self.sharpe_ratio()
sharp_log = self.get_sharpe_ratio()
# total_trade = long_trade + short_trade
# sharp_ratio = self.sharpe_ratio()
# sharp_log = self.get_sharpe_ratio()
from tabulate import tabulate
# from tabulate import tabulate
headers = ["Performance", ""]
performanceTable = [["Total Trade", "{0:.2f}".format(total_trade)],
["Total reward", "{0:.3f}".format(self.total_reward)],
["Start profit(unit)", "{0:.2f}".format(1.)],
["End profit(unit)", "{0:.3f}".format(self._total_profit)],
["Sharp ratio", "{0:.3f}".format(sharp_ratio)],
["Sharp log", "{0:.3f}".format(sharp_log)],
# ["Sortino ratio", "{0:.2f}".format(0) + '%'],
["winrate", "{0:.2f}".format(positive_trade*100/total_trade_lr) + '%']
]
tabulation = tabulate(performanceTable, headers, tablefmt="fancy_grid", stralign="center")
print(tabulation)
# headers = ["Performance", ""]
# performanceTable = [["Total Trade", "{0:.2f}".format(total_trade)],
# ["Total reward", "{0:.3f}".format(self.total_reward)],
# ["Start profit(unit)", "{0:.2f}".format(1.)],
# ["End profit(unit)", "{0:.3f}".format(self._total_profit)],
# ["Sharp ratio", "{0:.3f}".format(sharp_ratio)],
# ["Sharp log", "{0:.3f}".format(sharp_log)],
# # ["Sortino ratio", "{0:.2f}".format(0) + '%'],
# ["winrate", "{0:.2f}".format(positive_trade*100/total_trade_lr) + '%']
# ]
# tabulation = tabulate(performanceTable, headers, tablefmt="fancy_grid", stralign="center")
# print(tabulation)
result = {
"Start": "{0:.2f}".format(1.),
"End": "{0:.2f}".format(self._total_profit),
"Sharp": "{0:.3f}".format(sharp_ratio),
"Winrate": "{0:.2f}".format(positive_trade*100/total_trade_lr)
}
return result
# result = {
# "Start": "{0:.2f}".format(1.),
# "End": "{0:.2f}".format(self._total_profit),
# "Sharp": "{0:.3f}".format(sharp_ratio),
# "Winrate": "{0:.2f}".format(positive_trade*100/total_trade_lr)
# }
# return result
def close(self):
plt.close()
# def close(self):
# plt.close()
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
def save_rendering(self, filepath):
plt.savefig(filepath)
# def save_rendering(self, filepath):
# plt.savefig(filepath)
def pause_rendering(self):
plt.show()
# def pause_rendering(self):
# plt.show()
def _calculate_reward(self, action):
# rw = self.transaction_profit_reward(action)
#rw = self.reward_rr_profit_config(action)
rw = self.reward_rr_profit_config_v2(action)
#rw = self.reward_rr_profit_config(action) # main
#rw = self.profit_only_when_close_reward(action)
rw = self.profit_only_when_close_reward_aim(action)
return rw
def _update_profit(self, action):
#if self._is_trade(action) or self._done:
if self._is_trade_v2(action) or self._done:
if self._is_trade(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
@@ -485,7 +460,7 @@ class DEnv(gym.Env):
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
#if action == Actions.Short.value or action == Actions.Neutral.value:
if action == Actions_v2.Short_buy.value or action == Actions_v2.Neutral.value:
if action == Actions.Short_buy.value or action == Actions.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
@@ -500,7 +475,7 @@ class DEnv(gym.Env):
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
#if action == Actions.Long.value or action == Actions.Neutral.value:
if action == Actions_v2.Long_buy.value or action == Actions_v2.Neutral.value:
if action == Actions.Long_buy.value or action == Actions.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
@@ -574,8 +549,57 @@ class DEnv(gym.Env):
return np.clip(rw, 0, 1)
def profit_only_when_close_reward(self, action):
def reward_rr_profit_config_v2(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if action == Actions.Long_sell.value and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
# close short
if action == Actions.Short_buy.value and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
return 0.
def profit_only_when_close_reward_aim(self, action):
if self._last_trade_tick == None:
return 0.
# close long
if action == Actions.Long_sell.value and self._position == Positions.Long:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(current_price) - np.log(last_trade_price))
if action == Actions.Long_sell.value and self._position == Positions.Long:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(current_price) - np.log(last_trade_price)) * 2)
# close short
if action == Actions.Short_buy.value and self._position == Positions.Short:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float(np.log(last_trade_price) - np.log(current_price))
if action == Actions.Short_buy.value and self._position == Positions.Short:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
return float((np.log(last_trade_price) - np.log(current_price)) * 2)
return 0.
def reward_rr_profit_config(self, action):
rw = 0.
pt_1 = self.current_price()
@@ -587,61 +611,61 @@ class DEnv(gym.Env):
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
if action == Actions_v2.Short_buy.value:
if action == Actions.Short_buy.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 2
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = 10 * 1 * 1
rw = 15
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
rw = -10
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
rw = -15
if action == Actions_v2.Long_sell.value:
if action == Actions.Long_sell.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 5
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = 10 * 1 * 3
rw = 20
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
rw = -15
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
rw = -25
if action == Actions_v2.Neutral.value:
if self.close_trade_profit[-1] > 0:
rw = 2
if action == Actions.Neutral.value:
if self.close_trade_profit[-1] > 0.005:
rw = 0
elif self.close_trade_profit[-1] < 0:
rw = 2 * -1
rw = 0
# short
if self._position == Positions.Short:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
if action == Actions_v2.Long_buy.value:
if action == Actions.Long_buy.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 2
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 1 * 1
rw = 15
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
rw = -10
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
rw = -25
if action == Actions_v2.Short_sell.value:
if action == Actions.Short_sell.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 5
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 1 * 3
rw = 20
elif self.close_trade_profit[-1] > 0.01 and self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
rw = -15
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
rw = -25
if action == Actions_v2.Neutral.value:
if self.close_trade_profit[-1] > 0:
rw = 2
if action == Actions.Neutral.value:
if self.close_trade_profit[-1] > 0.005:
rw = 0
elif self.close_trade_profit[-1] < 0:
rw = 2 * -1
rw = 0
return np.clip(rw, 0, 1)
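To make the reward parameters concrete, a hypothetical instantiation of this environment; the random DataFrames and reward values are placeholders, and the module path is assumed from the import in the last file of this commit:

import numpy as np
import pandas as pd
from freqtrade.freqai.prediction_models.RL.RLPrediction_env_TDQN_5ac import DEnv

signal_df = pd.DataFrame(np.random.randn(500, 4), columns=["a", "b", "c", "d"])   # placeholder features
prices = pd.DataFrame({"open": 100 + np.random.randn(500).cumsum()})              # placeholder prices

env = DEnv(df=signal_df, prices=prices, window_size=10,
           reward_kwargs={"rr": 2, "profit_aim": 0.01})
obs = env.reset()                                   # window of the last 10 feature rows
obs, reward, done, info = env.step(env.action_space.sample())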

View File

@@ -1,645 +0,0 @@
import gym
from gym import spaces
from gym.utils import seeding
from enum import Enum
from sklearn.decomposition import PCA, KernelPCA
import random
import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pylab as plt
from typing import Dict, List, Tuple, Type, Optional, Any, Union, Callable
import logging
logger = logging.getLogger(__name__)
# from bokeh.io import output_notebook
# from bokeh.plotting import figure, show
# from bokeh.models import (
# CustomJS,
# ColumnDataSource,
# NumeralTickFormatter,
# Span,
# HoverTool,
# Range1d,
# DatetimeTickFormatter,
# Scatter,
# Label, LabelSet
# )
class Actions(Enum):
Short = 0
Long = 1
Neutral = 2
class Actions_v2(Enum):
Neutral = 0
Long_buy = 1
Long_sell = 2
Short_buy = 3
Short_sell = 4
class Positions(Enum):
Short = 0
Long = 1
Neutral = 0.5
def opposite(self):
return Positions.Short if self == Positions.Long else Positions.Long
def mean_over_std(x):
std = np.std(x, ddof=1)
mean = np.mean(x)
return mean / std if std > 0 else 0
class DEnv(gym.Env):
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
assert df.ndim == 2
self.seed()
self.df = df
self.signal_features = self.df
self.prices = prices
self.window_size = window_size
self.starting_point = starting_point
self.rr = reward_kwargs["rr"]
self.profit_aim = reward_kwargs["profit_aim"]
self.fee=0.0015
# # spaces
self.shape = (window_size, self.signal_features.shape[1])
self.action_space = spaces.Discrete(len(Actions_v2))
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=self.shape, dtype=np.float32)
# episode
self._start_tick = self.window_size
self._end_tick = len(self.prices) - 1
self._done = None
self._current_tick = None
self._last_trade_tick = None
self._position = Positions.Neutral
self._position_history = None
self.total_reward = None
self._total_profit = None
self._first_rendering = None
self.history = None
self.trade_history = []
# self.A_t, self.B_t = 0.000639, 0.00001954
self.r_t_change = 0.
self.returns_report = []
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self._done = False
if self.starting_point == True:
self._position_history = (self._start_tick* [None]) + [self._position]
else:
self._position_history = (self.window_size * [None]) + [self._position]
self._current_tick = self._start_tick
self._last_trade_tick = None
#self._last_trade_tick = self._current_tick - 1
self._position = Positions.Neutral
self.total_reward = 0.
self._total_profit = 1. # unit
self._first_rendering = True
self.history = {}
self.trade_history = []
self.portfolio_log_returns = np.zeros(len(self.prices))
self._profits = [(self._start_tick, 1)]
self.close_trade_profit = []
self.r_t_change = 0.
self.returns_report = []
return self._get_observation()
def step(self, action):
self._done = False
self._current_tick += 1
if self._current_tick == self._end_tick:
self._done = True
self.update_portfolio_log_returns(action)
self._update_profit(action)
step_reward = self._calculate_reward(action)
self.total_reward += step_reward
trade_type = None
if self.is_tradesignal_v2(action): # exclude 3 case not trade
# Update position
"""
Action: Neutral, position: Long -> Close Long
Action: Neutral, position: Short -> Close Short
Action: Long, position: Neutral -> Open Long
Action: Long, position: Short -> Close Short and Open Long
Action: Short, position: Neutral -> Open Short
Action: Short, position: Long -> Close Long and Open Short
"""
temp_position = self._position
if action == Actions_v2.Neutral.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions_v2.Long_buy.value:
self._position = Positions.Long
trade_type = "long"
elif action == Actions_v2.Short_buy.value:
self._position = Positions.Short
trade_type = "short"
elif action == Actions_v2.Long_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
elif action == Actions_v2.Short_sell.value:
self._position = Positions.Neutral
trade_type = "neutral"
else:
print("case not defined")
# Update last trade tick
self._last_trade_tick = self._current_tick
if trade_type != None:
self.trade_history.append(
{'price': self.current_price(), 'index': self._current_tick, 'type': trade_type})
if self._total_profit < 0.2:
self._done = True
self._position_history.append(self._position)
observation = self._get_observation()
info = dict(
tick = self._current_tick,
total_reward = self.total_reward,
total_profit = self._total_profit,
position = self._position.value
)
self._update_history(info)
return observation, step_reward, self._done, info
def processState(self, state):
return state.to_numpy()
def convert_mlp_Policy(self, obs_):
pass
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick == None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price)/last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price)/last_trade_price
else:
return 0.
def is_tradesignal(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions.Neutral.value and self._position == Positions.Neutral)
or (action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def is_tradesignal_v2(self, action):
# trade signal
"""
not trade signal is :
Action: Neutral, position: Neutral -> Nothing
Action: Long, position: Long -> Hold Long
Action: Short, position: Short -> Hold Short
"""
return not ((action == Actions_v2.Neutral.value and self._position == Positions.Neutral) or
(action == Actions_v2.Short_buy.value and self._position == Positions.Short) or
(action == Actions_v2.Short_sell.value and self._position == Positions.Short) or
(action == Actions_v2.Short_buy.value and self._position == Positions.Long) or
(action == Actions_v2.Short_sell.value and self._position == Positions.Long) or
(action == Actions_v2.Long_buy.value and self._position == Positions.Long) or
(action == Actions_v2.Long_sell.value and self._position == Positions.Long) or
(action == Actions_v2.Long_buy.value and self._position == Positions.Short) or
(action == Actions_v2.Long_sell.value and self._position == Positions.Short))
def _is_trade(self, action: Actions):
return ((action == Actions.Long.value and self._position == Positions.Short) or
(action == Actions.Short.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Long) or
(action == Actions.Neutral.value and self._position == Positions.Short)
)
def _is_trade_v2(self, action: Actions_v2):
return ((action == Actions_v2.Long_buy.value and self._position == Positions.Short) or
(action == Actions_v2.Short_buy.value and self._position == Positions.Long) or
(action == Actions_v2.Neutral.value and self._position == Positions.Long) or
(action == Actions_v2.Neutral.value and self._position == Positions.Short) or
(action == Actions_v2.Neutral.Short_sell and self._position == Positions.Long) or
(action == Actions_v2.Neutral.Long_sell and self._position == Positions.Short)
)
def is_hold(self, action):
return ((action == Actions.Short.value and self._position == Positions.Short)
or (action == Actions.Long.value and self._position == Positions.Long))
def is_hold_v2(self, action):
return ((action == Actions_v2.Short_buy.value and self._position == Positions.Short)
or (action == Actions_v2.Long_buy.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)
def render(self, mode='human'):
def _plot_position(position, tick):
color = None
if position == Positions.Short:
color = 'red'
elif position == Positions.Long:
color = 'green'
if color:
plt.scatter(tick, self.prices.loc[tick].open, color=color)
if self._first_rendering:
self._first_rendering = False
plt.cla()
plt.plot(self.prices)
start_position = self._position_history[self._start_tick]
_plot_position(start_position, self._start_tick)
plt.cla()
plt.plot(self.prices)
_plot_position(self._position, self._current_tick)
plt.suptitle("Total Reward: %.6f" % self.total_reward + ' ~ ' + "Total Profit: %.6f" % self._total_profit)
plt.pause(0.01)
def render_all(self):
plt.figure()
window_ticks = np.arange(len(self._position_history))
plt.plot(self.prices['open'], alpha=0.5)
short_ticks = []
long_ticks = []
neutral_ticks = []
for i, tick in enumerate(window_ticks):
if self._position_history[i] == Positions.Short:
short_ticks.append(tick - 1)
elif self._position_history[i] == Positions.Long:
long_ticks.append(tick - 1)
elif self._position_history[i] == Positions.Neutral:
neutral_ticks.append(tick - 1)
plt.plot(neutral_ticks, self.prices.loc[neutral_ticks].open,
'o', color='grey', ms=3, alpha=0.1)
plt.plot(short_ticks, self.prices.loc[short_ticks].open,
'o', color='r', ms=3, alpha=0.8)
plt.plot(long_ticks, self.prices.loc[long_ticks].open,
'o', color='g', ms=3, alpha=0.8)
plt.suptitle("Generalising")
fig = plt.gcf()
fig.set_size_inches(15, 10)
def close_trade_report(self):
small_trade = 0
positive_big_trade = 0
negative_big_trade = 0
small_profit = 0.003
for i in self.close_trade_profit:
if i < small_profit and i > -small_profit:
small_trade+=1
elif i > small_profit:
positive_big_trade += 1
elif i < -small_profit:
negative_big_trade += 1
print(f"small trade={small_trade/len(self.close_trade_profit)}; positive_big_trade={positive_big_trade/len(self.close_trade_profit)}; negative_big_trade={negative_big_trade/len(self.close_trade_profit)}")
def report(self):
# get total trade
long_trade = 0
short_trade = 0
neutral_trade = 0
for trade in self.trade_history:
if trade['type'] == 'long':
long_trade += 1
elif trade['type'] == 'short':
short_trade += 1
else:
neutral_trade += 1
negative_trade = 0
positive_trade = 0
for tr in self.close_trade_profit:
if tr < 0.:
negative_trade += 1
if tr > 0.:
positive_trade += 1
total_trade_lr = negative_trade+positive_trade
total_trade = long_trade + short_trade
sharp_ratio = self.sharpe_ratio()
sharp_log = self.get_sharpe_ratio()
from tabulate import tabulate
headers = ["Performance", ""]
performanceTable = [["Total Trade", "{0:.2f}".format(total_trade)],
["Total reward", "{0:.3f}".format(self.total_reward)],
["Start profit(unit)", "{0:.2f}".format(1.)],
["End profit(unit)", "{0:.3f}".format(self._total_profit)],
["Sharp ratio", "{0:.3f}".format(sharp_ratio)],
["Sharp log", "{0:.3f}".format(sharp_log)],
# ["Sortino ratio", "{0:.2f}".format(0) + '%'],
["winrate", "{0:.2f}".format(positive_trade*100/total_trade_lr) + '%']
]
tabulation = tabulate(performanceTable, headers, tablefmt="fancy_grid", stralign="center")
print(tabulation)
result = {
"Start": "{0:.2f}".format(1.),
"End": "{0:.2f}".format(self._total_profit),
"Sharp": "{0:.3f}".format(sharp_ratio),
"Winrate": "{0:.2f}".format(positive_trade*100/total_trade_lr)
}
return result
def close(self):
plt.close()
def get_sharpe_ratio(self):
return mean_over_std(self.get_portfolio_log_returns())
def save_rendering(self, filepath):
plt.savefig(filepath)
def pause_rendering(self):
plt.show()
def _calculate_reward(self, action):
# rw = self.transaction_profit_reward(action)
#rw = self.reward_rr_profit_config(action)
rw = self.reward_rr_profit_config_v2(action)
return rw
def _update_profit(self, action):
#if self._is_trade(action) or self._done:
if self._is_trade_v2(action) or self._done:
pnl = self.get_unrealized_profit()
if self._position == Positions.Long:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
if self._position == Positions.Short:
self._total_profit = self._total_profit + self._total_profit*pnl
self._profits.append((self._current_tick, self._total_profit))
self.close_trade_profit.append(pnl)
def most_recent_return(self, action):
"""
We support Long, Neutral and Short positions.
Return is generated from rising prices in Long
and falling prices in Short positions.
The actions Sell/Buy or Hold during a Long position trigger the sell/buy-fee.
"""
# Long positions
if self._position == Positions.Long:
current_price = self.prices.iloc[self._current_tick].open
#if action == Actions.Short.value or action == Actions.Neutral.value:
if action == Actions_v2.Short_buy.value or action == Actions_v2.Neutral.value:
current_price = self.add_sell_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Short
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_buy_fee(previous_price)
return np.log(current_price) - np.log(previous_price)
# Short positions
if self._position == Positions.Short:
current_price = self.prices.iloc[self._current_tick].open
#if action == Actions.Long.value or action == Actions.Neutral.value:
if action == Actions_v2.Long_buy.value or action == Actions_v2.Neutral.value:
current_price = self.add_buy_fee(current_price)
previous_price = self.prices.iloc[self._current_tick - 1].open
if (self._position_history[self._current_tick - 1] == Positions.Long
or self._position_history[self._current_tick - 1] == Positions.Neutral):
previous_price = self.add_sell_fee(previous_price)
return np.log(previous_price) - np.log(current_price)
return 0
def get_portfolio_log_returns(self):
return self.portfolio_log_returns[1:self._current_tick + 1]
def get_trading_log_return(self):
return self.portfolio_log_returns[self._start_tick:]
def update_portfolio_log_returns(self, action):
self.portfolio_log_returns[self._current_tick] = self.most_recent_return(action)
def current_price(self) -> float:
return self.prices.iloc[self._current_tick].open
def prev_price(self) -> float:
return self.prices.iloc[self._current_tick-1].open
def sharpe_ratio(self):
if len(self.close_trade_profit) == 0:
return 0.
returns = np.array(self.close_trade_profit)
reward = (np.mean(returns) - 0. + 1e-9) / (np.std(returns) + 1e-9)
return reward
def get_bnh_log_return(self):
return np.diff(np.log(self.prices['open'][self._start_tick:]))
def transaction_profit_reward(self, action):
rw = 0.
pt = self.prev_price()
pt_1 = self.current_price()
if self._position == Positions.Long:
a_t = 1
elif self._position == Positions.Short:
a_t = -1
else:
a_t = 0
# close long
if (action == Actions.Short.value or action == Actions.Neutral.value) and self._position == Positions.Long:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
# close short
elif (action == Actions.Long.value or action == Actions.Neutral.value) and self._position == Positions.Short:
pt_1 = self.add_buy_fee(self.current_price())
po = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
rw = a_t*(pt_1 - po)/po
#rw = rw*2
else:
rw = a_t*(pt_1 - pt)/pt
return np.clip(rw, 0, 1)
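# Note: the closing branches above reward the position-signed return of the whole trade,
# a_t * (exit - entry) / entry, with fees applied via add_buy_fee/add_sell_fee. For example,
# a Long (a_t = 1) opened at 100 and closed at 103 gives roughly 0.03 before fees; negative
# outcomes are clipped to 0 by the final np.clip(rw, 0, 1).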
def reward_rr_profit_config_v2(self, action):
rw = 0.
pt_1 = self.current_price()
if len(self.close_trade_profit) > 0:
# long
if self._position == Positions.Long:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
if action == Actions_v2.Short_buy.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 2
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = 10 * 1 * 1
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
if action == Actions_v2.Long_sell.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 5
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = 10 * 1 * 3
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
if action == Actions_v2.Neutral.value:
if self.close_trade_profit[-1] > 0:
rw = 2
elif self.close_trade_profit[-1] < 0:
rw = 2 * -1
# short
if self._position == Positions.Short:
pt_1 = self.add_sell_fee(self.current_price())
po = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
if action == Actions_v2.Long_buy.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 2
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = 10 * 1 * 1
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
if action == Actions_v2.Short_sell.value:
if self.close_trade_profit[-1] > self.profit_aim * self.rr:
rw = 10 * 5
elif self.close_trade_profit[-1] > 0 and self.close_trade_profit[-1] < self.profit_aim * self.rr:
rw = 10 * 1 * 3
elif self.close_trade_profit[-1] < (self.profit_aim * -1) * self.rr:
rw = 10 * 3 * -1
elif self.close_trade_profit[-1] < 0:
rw = 10 * -1
if action == Actions_v2.Neutral.value:
if self.close_trade_profit[-1] > 0:
rw = 2
elif self.close_trade_profit[-1] < 0:
rw = 2 * -1
return np.clip(rw, 0, 1)
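# Illustrative note: profit_aim and rr are attributes of the environment; assuming they come
# from the reward_kwargs handed to DEnv, a hypothetical example would be
# reward_kwargs = {"rr": 1, "profit_aim": 0.02}, so the branches above compare the last closed
# trade against profit_aim * rr and -(profit_aim * rr).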

View File

@@ -4,29 +4,23 @@ from typing import Any, Dict, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
import torch as th
from pandas import DataFrame
from stable_baselines3 import PPO
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.prediction_models.RL.RLPrediction_agent import RLPrediction_agent
from freqtrade.freqai.prediction_models.RL.RLPrediction_agent_v2 import TDQN
#from freqtrade.freqai.prediction_models.RL.RLPrediction_env import GymAnytrading
from freqtrade.freqai.prediction_models.RL.RLPrediction_env import DEnv
from freqtrade.freqai.prediction_models.RL.RLPrediction_agent_TDQN import TDQN
from freqtrade.freqai.prediction_models.RL.RLPrediction_env_TDQN_5ac import DEnv
#from freqtrade.freqai.prediction_models.RL.RLPrediction_env_TDQN_3ac import DEnv
from freqtrade.persistence import Trade
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.monitor import Monitor
import torch as th
from stable_baselines3.common.callbacks import CallbackList, CheckpointCallback, EvalCallback, StopTrainingOnRewardThreshold
from stable_baselines3.common.buffers import ReplayBuffer
from stable_baselines3 import PPO
logger = logging.getLogger(__name__)
class ReinforcementLearningModel(IFreqaiModel):
"""
User created Reinforcement Learning Model prediction model.
@@ -87,30 +81,22 @@ class ReinforcementLearningModel(IFreqaiModel):
# # train_labels = data_dictionary["train_labels"]
# test_df = data_dictionary["test_features"]
# # test_labels = data_dictionary["test_labels"]
# # sep = '/'
# # coin = pair.split(sep, 1)[0]
# # price = train_df[f"%-{coin}raw_price_{self.config['timeframe']}"]
# # price.reset_index(inplace=True, drop=True)
# # price = price.to_frame()
# price = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(train_df.index))
# price_test = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(test_df.index))
# #train_env = GymAnytrading(train_df, price, self.CONV_WIDTH)
# agent_params = self.freqai_info['model_training_parameters']
# reward_params = self.freqai_info['model_reward_parameters']
# train_env = DEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
# #eval_env = DEnv(df=test_df, prices=price_test, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
# #env_instance = SubprocVecEnv([DEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)])
# #train_env.reset()
# #eval_env.reset()
# # model
# #policy_kwargs = dict(net_arch=[512, 512, 512])
# policy_kwargs = dict(activation_fn=th.nn.Tanh,
# net_arch=[256, 256, 256])
@@ -124,27 +110,22 @@ class ReinforcementLearningModel(IFreqaiModel):
# tb_log_name=model_name,
# model_kwargs=agent_params,
# train_df=train_df,
# test_df=test_df,
# price=price,
# price_test=price_test,
# window_size=self.CONV_WIDTH)
# # best_model = eval_agent.train_model(model=model,
# # tb_log_name=model_name,
# # model_kwargs=agent_params,
# # eval=eval_env)
# # TDQN
# # model_name = 'TDQN'
# # model = TDQN('TMultiInputPolicy', train_env, policy_kwargs=policy_kwargs, tensorboard_log='./tensorboard_log/',
# # learning_rate=agent_params["learning_rate"], gamma=0.9,
# # target_update_interval=5000, buffer_size=50000,
# # exploration_initial_eps=1, exploration_final_eps=0.1,
# # replay_buffer_class=ReplayBuffer
# # )
# # trained_model = agent.train_model(model=model,
# # tb_log_name=model_name,
# # model_kwargs=agent_params)
@@ -157,11 +138,13 @@ class ReinforcementLearningModel(IFreqaiModel):
reward_params = self.freqai_info['model_reward_parameters']
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
eval_freq = agent_params["eval_cycles"] * len(test_df)
total_timesteps = agent_params["train_cycles"] * len(train_df)
# price data for model training and evaluation
price = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(train_df.index))
price_test = self.dd.historic_data[pair][f"{self.config['timeframe']}"].tail(len(test_df.index))
# environments
train_env = DEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
eval = DEnv(df=test_df, prices=price_test, window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
@@ -173,19 +156,17 @@ class ReinforcementLearningModel(IFreqaiModel):
path = self.dk.data_path
eval_callback = EvalCallback(eval_env, best_model_save_path=f"{path}/",
log_path=f"{path}/{agent_type}/logs/", eval_freq=10000,
log_path=f"{path}/{agent_type}/logs/", eval_freq=int(eval_freq),
deterministic=True, render=False)
# model arch
policy_kwargs = dict(activation_fn=th.nn.Tanh,
net_arch=[512, 512, 512])
policy_kwargs = dict(activation_fn=th.nn.ReLU,
net_arch=[256, 256, 128])
if agent_type == 'tdqn':
model = TDQN('TMultiInputPolicy', train_env, policy_kwargs=policy_kwargs, tensorboard_log=f"{path}/{agent_type}/tensorboard/",
learning_rate=0.00025, gamma=0.9,
target_update_interval=5000, buffer_size=50000,
exploration_initial_eps=1, exploration_final_eps=0.1,
replay_buffer_class=ReplayBuffer
)
@@ -193,9 +174,9 @@ class ReinforcementLearningModel(IFreqaiModel):
model = PPO('MultiInputPolicy', train_env, policy_kwargs=policy_kwargs, tensorboard_log=f"{path}/{agent_type}/tensorboard/",
learning_rate=0.00025, gamma=0.9
)
model.learn(
total_timesteps=agent_params["total_timesteps"],
total_timesteps=int(total_timesteps),
callback=eval_callback
)
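# Illustrative note (hypothetical values, keys taken from the code above): with
# eval_freq = agent_params["eval_cycles"] * len(test_df) and
# total_timesteps = agent_params["train_cycles"] * len(train_df),
# a config such as "model_training_parameters": {"train_cycles": 10, "eval_cycles": 3}
# trains for 10 passes over the training window and evaluates every 3 test-window lengths of steps.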