stable/freqtrade/freqai/prediction_models/ReinforcementLearner_multiproc.py
2022-09-28 03:06:05 +00:00

102 lines
4.3 KiB
Python

import logging
from pathlib import Path
from typing import Any, Dict # , Tuple
# import numpy.typing as npt
import torch as th
from pandas import DataFrame
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.RL.BaseReinforcementLearningModel import (BaseReinforcementLearningModel,
make_env)
logger = logging.getLogger(__name__)
class ReinforcementLearner_multiproc(BaseReinforcementLearningModel):
"""
User created Reinforcement Learning Model prediction model.
"""
def fit(self, data_dictionary: Dict[str, Any], dk: FreqaiDataKitchen, **kwargs):
train_df = data_dictionary["train_features"]
total_timesteps = self.freqai_info["rl_config"]["train_cycles"] * len(train_df)
# model arch
policy_kwargs = dict(activation_fn=th.nn.ReLU,
net_arch=[128, 128])
if dk.pair not in self.dd.model_dictionary or not self.continual_learning:
model = self.MODELCLASS(self.policy_type, self.train_env, policy_kwargs=policy_kwargs,
tensorboard_log=Path(
dk.full_path / "tensorboard" / dk.pair.split('/')[0]),
**self.freqai_info['model_training_parameters']
)
else:
logger.info('Continual learning activated - starting training from previously '
'trained agent.')
model = self.dd.model_dictionary[dk.pair]
model.set_env(self.train_env)
model.learn(
total_timesteps=int(total_timesteps),
callback=self.eval_callback
)
if Path(dk.data_path / "best_model.zip").is_file():
logger.info('Callback found a best model.')
best_model = self.MODELCLASS.load(dk.data_path / "best_model")
return best_model
logger.info('Couldnt find best model, using final model instead.')
return model
def set_train_and_eval_environments(self, data_dictionary: Dict[str, Any],
prices_train: DataFrame, prices_test: DataFrame,
dk: FreqaiDataKitchen):
"""
User can override this if they are using a custom MyRLEnv
:params:
data_dictionary: dict = common data dictionary containing train and test
features/labels/weights.
prices_train/test: DataFrame = dataframe comprised of the prices to be used in
the environment during training
or testing
dk: FreqaiDataKitchen = the datakitchen for the current pair
"""
train_df = data_dictionary["train_features"]
test_df = data_dictionary["test_features"]
env_id = "train_env"
num_cpu = int(self.freqai_info["rl_config"].get("cpu_count", 2))
self.train_env = SubprocVecEnv([make_env(self.MyRLEnv, env_id, i, 1, train_df, prices_train,
self.reward_params, self.CONV_WIDTH, monitor=True,
config=self.config) for i
in range(num_cpu)])
eval_env_id = 'eval_env'
self.eval_env = SubprocVecEnv([make_env(self.MyRLEnv, eval_env_id, i, 1,
test_df, prices_test,
self.reward_params, self.CONV_WIDTH, monitor=True,
config=self.config) for i
in range(num_cpu)])
self.eval_callback = EvalCallback(self.eval_env, deterministic=True,
render=False, eval_freq=len(train_df),
best_model_save_path=str(dk.data_path))
def _on_stop(self):
"""
Hook called on bot shutdown. Close SubprocVecEnv subprocesses for clean shutdown.
"""
if hasattr(self, "train_env") and self.train_env:
self.train_env.close()
if hasattr(self, "eval_env") and self.eval_env:
self.eval_env.close()