add pytorch data convertor

Yinon Polak 2023-04-03 15:19:10 +03:00
parent 5a7ca35c6b
commit bd3b70293f
9 changed files with 168 additions and 40 deletions

freqtrade/freqai/base_models/BasePyTorchClassifier.py

@@ -69,12 +69,11 @@ class BasePyTorchClassifier(BasePyTorchModel):
         )
         filtered_df = dk.normalize_data_from_metadata(filtered_df)
         dk.data_dictionary["prediction_features"] = filtered_df
         self.data_cleaning_predict(dk)
-        x = torch.from_numpy(dk.data_dictionary["prediction_features"].values)\
-            .float()\
-            .to(self.device)
+        x = self.data_convertor.convert_x(
+            dk.data_dictionary["prediction_features"],
+            device=self.device
+        )
         logits = self.model.model(x)
         probs = F.softmax(logits, dim=-1)
         predicted_classes = torch.argmax(probs, dim=-1)

freqtrade/freqai/base_models/BasePyTorchModel.py

@@ -1,4 +1,5 @@
 import logging
+from abc import ABC, abstractmethod
 from time import time
 from typing import Any

@@ -7,15 +8,17 @@ from pandas import DataFrame
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
 from freqtrade.freqai.freqai_interface import IFreqaiModel
+from freqtrade.freqai.torch.PyTorchDataConvertor import PyTorchDataConvertor

 logger = logging.getLogger(__name__)


-class BasePyTorchModel(IFreqaiModel):
+class BasePyTorchModel(IFreqaiModel, ABC):
     """
     Base class for PyTorch type models.
-    User *must* inherit from this class and set fit() and predict().
+    User *must* inherit from this class and set fit(), predict(), and the
+    data_convertor property.
     """

     def __init__(self, **kwargs):

@@ -69,3 +72,8 @@ class BasePyTorchModel(IFreqaiModel):
                 f"({end_time - start_time:.2f} secs) --------------------")

         return model
+
+    @property
+    @abstractmethod
+    def data_convertor(self) -> PyTorchDataConvertor:
+        raise NotImplementedError("Abstract property")

freqtrade/freqai/base_models/BasePyTorchRegressor.py

@@ -3,7 +3,6 @@ from typing import Tuple
 import numpy as np
 import numpy.typing as npt
-import torch
 from pandas import DataFrame

 from freqtrade.freqai.base_models.BasePyTorchModel import BasePyTorchModel

@@ -41,9 +40,12 @@ class BasePyTorchRegressor(BasePyTorchModel):
         dk.data_dictionary["prediction_features"] = filtered_df

         self.data_cleaning_predict(dk)
-        x = torch.from_numpy(dk.data_dictionary["prediction_features"].values)\
-            .float()\
-            .to(self.device)
+        x = self.data_convertor.convert_x(
+            dk.data_dictionary["prediction_features"],
+            device=self.device
+        )
+        logger.info(self.model.model)
         y = self.model.model(x)
         pred_df = DataFrame(y.detach().numpy(), columns=[dk.label_list[0]])

freqtrade/freqai/prediction_models/PyTorchMLPClassifier.py

@@ -4,6 +4,8 @@ import torch
 from freqtrade.freqai.base_models.BasePyTorchClassifier import BasePyTorchClassifier
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+from freqtrade.freqai.torch.PyTorchDataConvertor import (DefaultPyTorchDataConvertor,
+                                                         PyTorchDataConvertor)
 from freqtrade.freqai.torch.PyTorchMLPModel import PyTorchMLPModel
 from freqtrade.freqai.torch.PyTorchModelTrainer import PyTorchModelTrainer

@@ -38,6 +40,10 @@ class PyTorchMLPClassifier(BasePyTorchClassifier):
         }
     """

+    @property
+    def data_convertor(self) -> PyTorchDataConvertor:
+        return DefaultPyTorchDataConvertor(target_tensor_type=torch.long,
+                                           squeeze_target_tensor=True)
+
     def __init__(self, **kwargs) -> None:
         super().__init__(**kwargs)
         config = self.freqai_info.get("model_training_parameters", {})

@@ -72,8 +78,7 @@ class PyTorchMLPClassifier(BasePyTorchClassifier):
             model_meta_data={"class_names": class_names},
             device=self.device,
             init_model=init_model,
-            target_tensor_type=torch.long,
-            squeeze_target_tensor=True,
+            data_convertor=self.data_convertor,
             **self.trainer_kwargs,
         )
         trainer.fit(data_dictionary, self.splits)
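The classifier's convertor settings line up with what nn.CrossEntropyLoss expects: float logits of shape (N, C) and long-typed class indices of shape (N,), which is exactly what target_tensor_type=torch.long plus squeeze_target_tensor=True produce. A minimal standalone sketch of that contract (the values here are illustrative, not from this commit):

import torch
import torch.nn as nn

logits = torch.randn(4, 3)                      # batch of 4 samples, 3 classes
labels_2d = torch.tensor([[0], [2], [1], [2]])  # labels as read from a dataframe: shape (N, 1)
labels = labels_2d.squeeze().long()             # squeeze + cast -> shape (N,), dtype long

# Without the squeeze and the long dtype, CrossEntropyLoss raises a shape/dtype error.
loss = nn.CrossEntropyLoss()(logits, labels)
print(loss.item())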

freqtrade/freqai/prediction_models/PyTorchMLPRegressor.py

@@ -4,6 +4,8 @@ import torch
 from freqtrade.freqai.base_models.BasePyTorchRegressor import BasePyTorchRegressor
 from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
+from freqtrade.freqai.torch.PyTorchDataConvertor import (DefaultPyTorchDataConvertor,
+                                                         PyTorchDataConvertor)
 from freqtrade.freqai.torch.PyTorchMLPModel import PyTorchMLPModel
 from freqtrade.freqai.torch.PyTorchModelTrainer import PyTorchModelTrainer

@@ -39,6 +41,10 @@ class PyTorchMLPRegressor(BasePyTorchRegressor):
         }
     """

+    @property
+    def data_convertor(self) -> PyTorchDataConvertor:
+        return DefaultPyTorchDataConvertor(target_tensor_type=torch.float)
+
     def __init__(self, **kwargs) -> None:
         super().__init__(**kwargs)
         config = self.freqai_info.get("model_training_parameters", {})

@@ -69,7 +75,7 @@ class PyTorchMLPRegressor(BasePyTorchRegressor):
             criterion=criterion,
             device=self.device,
             init_model=init_model,
-            target_tensor_type=torch.float,
+            data_convertor=self.data_convertor,
             **self.trainer_kwargs,
         )
         trainer.fit(data_dictionary, self.splits)

freqtrade/freqai/torch/PyTorchDataConvertor.py

@@ -0,0 +1,56 @@
+from abc import ABC, abstractmethod
+from typing import Optional, Tuple
+
+import pandas as pd
+import torch
+
+
+class PyTorchDataConvertor(ABC):
+
+    @abstractmethod
+    def convert_x(self, df: pd.DataFrame, device: Optional[str] = None) -> Tuple[torch.Tensor, ...]:
+        """
+        :param df: "*_features" dataframe.
+        :param device: cpu/gpu.
+        :returns: tuple of tensors.
+        """
+
+    @abstractmethod
+    def convert_y(self, df: pd.DataFrame, device: Optional[str] = None) -> Tuple[torch.Tensor, ...]:
+        """
+        :param df: "*_labels" dataframe.
+        :param device: cpu/gpu.
+        :returns: tuple of tensors.
+        """
+
+
+class DefaultPyTorchDataConvertor(PyTorchDataConvertor):
+
+    def __init__(
+            self,
+            target_tensor_type: Optional[torch.dtype] = None,
+            squeeze_target_tensor: bool = False
+    ):
+        self._target_tensor_type = target_tensor_type
+        self._squeeze_target_tensor = squeeze_target_tensor
+
+    def convert_x(self, df: pd.DataFrame, device: Optional[str] = None) -> Tuple[torch.Tensor, ...]:
+        x = torch.from_numpy(df.values).float()
+        if device:
+            x = x.to(device)
+
+        return x,
+
+    def convert_y(self, df: pd.DataFrame, device: Optional[str] = None) -> Tuple[torch.Tensor, ...]:
+        y = torch.from_numpy(df.values)
+
+        if self._target_tensor_type:
+            y = y.to(self._target_tensor_type)
+
+        if self._squeeze_target_tensor:
+            y = y.squeeze()
+
+        if device:
+            y = y.to(device)
+
+        return y,
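The Tuple[torch.Tensor, ...] return type is what lets a convertor hand more than one tensor to the model. A hypothetical sketch, not part of this commit, of a convertor feeding a two-input model (TwoHeadDataConvertor and split_at are made-up names):

from typing import Optional, Tuple

import pandas as pd
import torch

from freqtrade.freqai.torch.PyTorchDataConvertor import PyTorchDataConvertor


class TwoHeadDataConvertor(PyTorchDataConvertor):
    """Split the feature dataframe into two tensors, e.g. for a two-head model."""

    def __init__(self, split_at: int):
        self._split_at = split_at  # column index separating the two feature groups

    def convert_x(self, df: pd.DataFrame, device: Optional[str] = None) -> Tuple[torch.Tensor, ...]:
        left = torch.from_numpy(df.iloc[:, :self._split_at].values).float()
        right = torch.from_numpy(df.iloc[:, self._split_at:].values).float()
        if device:
            left, right = left.to(device), right.to(device)
        return left, right

    def convert_y(self, df: pd.DataFrame, device: Optional[str] = None) -> Tuple[torch.Tensor, ...]:
        y = torch.from_numpy(df.values).float()
        if device:
            y = y.to(device)
        return y,

Because the trainer builds TensorDataset(*x, *y) and slices batch_data[:-1], both feature tensors would reach the model's forward() unchanged.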

freqtrade/freqai/torch/PyTorchMLPModel.py

@@ -1,4 +1,5 @@
 import logging
+from typing import Tuple, List

 import torch
 import torch.nn as nn

@@ -46,7 +47,8 @@ class PyTorchMLPModel(nn.Module):
         self.relu = nn.ReLU()
         self.dropout = nn.Dropout(p=dropout_percent)

-    def forward(self, x: torch.Tensor) -> torch.Tensor:
+    def forward(self, x: List[torch.Tensor]) -> torch.Tensor:
+        x, = x
         x = self.relu(self.input_layer(x))
         x = self.dropout(x)
         x = self.blocks(x)
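With forward() now taking a sequence, the model is called with the tuple of feature tensors the trainer slices from each batch; `x, = x` unpacks the single tensor a one-input model expects and fails loudly if a convertor supplied more. A minimal sketch with a made-up module (TinyModel is illustrative, not from this commit):

import torch
import torch.nn as nn


class TinyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer = nn.Linear(10, 3)

    def forward(self, x):
        x, = x                  # expect exactly one feature tensor
        return self.layer(x)


out = TinyModel()((torch.rand(32, 10),))  # called with a one-element tuple
assert out.shape == (32, 3)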

freqtrade/freqai/torch/PyTorchModelTrainer.py

@@ -9,11 +9,13 @@ import torch.nn as nn
 from torch.optim import Optimizer
 from torch.utils.data import DataLoader, TensorDataset

+from freqtrade.freqai.torch.PyTorchDataConvertor import PyTorchDataConvertor
+from freqtrade.freqai.torch.PyTorchTrainerInterface import PyTorchTrainerInterface
+
 logger = logging.getLogger(__name__)


-class PyTorchModelTrainer:
+class PyTorchModelTrainer(PyTorchTrainerInterface):
     def __init__(
         self,
         model: nn.Module,

@@ -21,8 +23,7 @@ class PyTorchModelTrainer:
         criterion: nn.Module,
         device: str,
         init_model: Dict,
-        target_tensor_type: torch.dtype,
-        squeeze_target_tensor: bool = False,
+        data_convertor: PyTorchDataConvertor,
         model_meta_data: Dict[str, Any] = {},
         **kwargs
     ):

@@ -33,11 +34,7 @@ class PyTorchModelTrainer:
         :param device: The device to use for training (e.g. 'cpu', 'cuda').
         :param init_model: A dictionary containing the initial model/optimizer
             state_dict and model_meta_data saved by self.save() method.
-        :param target_tensor_type: type of target tensor, for classification usually
-            torch.long, for regressor usually torch.float.
         :param model_meta_data: Additional metadata about the model (optional).
-        :param squeeze_target_tensor: controls the target shape, used for loss functions
-            that requires 0D or 1D.
         :param max_iters: The number of training iterations to run.
             iteration here refers to the number of times we call
             self.optimizer.step(). used to calculate n_epochs.

@@ -49,11 +46,10 @@ class PyTorchModelTrainer:
         self.criterion = criterion
         self.model_meta_data = model_meta_data
         self.device = device
-        self.target_tensor_type = target_tensor_type
         self.max_iters: int = kwargs.get("max_iters", 100)
         self.batch_size: int = kwargs.get("batch_size", 64)
         self.max_n_eval_batches: Optional[int] = kwargs.get("max_n_eval_batches", None)
-        self.squeeze_target_tensor = squeeze_target_tensor
+        self.data_convertor = data_convertor

         if init_model:
             self.load_from_checkpoint(init_model)

@@ -81,9 +77,12 @@ class PyTorchModelTrainer:
             # training
             losses = []
             for i, batch_data in enumerate(data_loaders_dictionary["train"]):
-                xb, yb = batch_data
-                xb = xb.to(self.device)
-                yb = yb.to(self.device)
+                # Tensor.to() returns a new tensor, so rebind rather than
+                # calling it for side effects.
+                batch_data = [t.to(self.device) for t in batch_data]
+                xb = batch_data[:-1]
+                yb = batch_data[-1]

                 yb_pred = self.model(xb)
                 loss = self.criterion(yb_pred, yb)

@@ -115,14 +114,16 @@ class PyTorchModelTrainer:
         self.model.eval()
         n_batches = 0
         losses = []
-        for i, batch in enumerate(data_loader_dictionary[split]):
+        for i, batch_data in enumerate(data_loader_dictionary[split]):
             if max_n_eval_batches and i > max_n_eval_batches:
                 n_batches += 1
                 break

-            xb, yb = batch
-            xb = xb.to(self.device)
-            yb = yb.to(self.device)
+            batch_data = [t.to(self.device) for t in batch_data]
+            xb = batch_data[:-1]
+            yb = batch_data[-1]

             yb_pred = self.model(xb)
             loss = self.criterion(yb_pred, yb)
             losses.append(loss.item())

@@ -140,14 +141,9 @@ class PyTorchModelTrainer:
         """
         data_loader_dictionary = {}
         for split in splits:
-            x = torch.from_numpy(data_dictionary[f"{split}_features"].values).float()
-            y = torch.from_numpy(data_dictionary[f"{split}_labels"].values)\
-                .to(self.target_tensor_type)
-
-            if self.squeeze_target_tensor:
-                y = y.squeeze()
-
-            dataset = TensorDataset(x, y)
+            x = self.data_convertor.convert_x(data_dictionary[f"{split}_features"])
+            y = self.data_convertor.convert_y(data_dictionary[f"{split}_labels"])
+            dataset = TensorDataset(*x, *y)
             data_loader = DataLoader(
                 dataset,
                 batch_size=self.batch_size,

@@ -186,7 +182,7 @@ class PyTorchModelTrainer:
             "model_meta_data": self.model_meta_data,
         }, path)

-    def load_from_file(self, path: Path):
+    def load(self, path: Path):
         checkpoint = torch.load(path)
         return self.load_from_checkpoint(checkpoint)
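The convertor's tuple returns are star-unpacked into one flat TensorDataset, so each batch comes back as (feature tensors..., target) with the target last; that is the invariant behind the batch_data[:-1] / batch_data[-1] slicing above. A self-contained illustration:

import torch
from torch.utils.data import DataLoader, TensorDataset

x = (torch.rand(8, 10),)   # what convert_x() returns: a tuple of feature tensors
y = (torch.rand(8, 1),)    # what convert_y() returns: a one-element tuple
dataset = TensorDataset(*x, *y)

for batch_data in DataLoader(dataset, batch_size=4):
    xb = batch_data[:-1]   # all feature tensors (here, one)
    yb = batch_data[-1]    # the target tensor is always last
    assert len(xb) == 1 and yb.shape == (4, 1)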

freqtrade/freqai/torch/PyTorchTrainerInterface.py

@@ -0,0 +1,54 @@
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Dict, List
+
+import pandas as pd
+import torch
+import torch.nn as nn
+
+
+class PyTorchTrainerInterface(ABC):
+
+    @abstractmethod
+    def fit(self, data_dictionary: Dict[str, pd.DataFrame], splits: List[str]) -> None:
+        """
+        :param data_dictionary: the dictionary constructed by DataHandler to hold
+            all the training and test data/labels.
+        :param splits: splits to use in training, splits must contain "train",
+            optional "test" could be added by setting freqai.data_split_parameters.test_size > 0
+            in the config file.
+
+        - Calculates the predicted output for the batch using the PyTorch model.
+        - Calculates the loss between the predicted and actual output using a loss function.
+        - Computes the gradients of the loss with respect to the model's parameters using
+          backpropagation.
+        - Updates the model's parameters using an optimizer.
+        """
+
+    @abstractmethod
+    def save(self, path: Path) -> None:
+        """
+        - Saving any nn.Module state_dict
+        - Saving model_meta_data, this dict should contain any additional data that the
+          user needs to store, e.g. class_names for classification models.
+        """
+
+    def load(self, path: Path) -> nn.Module:
+        """
+        :param path: path to zip file.
+        :returns: pytorch model.
+        """
+        checkpoint = torch.load(path)
+        return self.load_from_checkpoint(checkpoint)
+
+    @abstractmethod
+    def load_from_checkpoint(self, checkpoint: Dict) -> nn.Module:
+        """
+        When using continual_learning, DataDrawer will load the dictionary
+        (containing state dicts and model_meta_data) by calling torch.load(path).
+        You can access this dict from any class that inherits IFreqaiModel by calling
+        the get_init_model method.
+        :param checkpoint: dict containing the model & optimizer state dicts,
+            model_meta_data, etc.
+        """