initial commit

This commit is contained in:
Yinon Polak 2023-03-05 16:59:24 +02:00
parent 108a578772
commit 751b205618
5 changed files with 254 additions and 1 deletions

View File

@ -0,0 +1,69 @@
import logging
from time import time
from typing import Any, Dict
import torch
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
logger = logging.getLogger(__name__)
class BasePytorchModel(IFreqaiModel):
    """
    Base class for PyTorch type models.
    User *must* inherit from this class and set fit() and predict().
    """

    def __init__(self, **kwargs):
        """
        Initialise the FreqAI interface and tag the data drawer as 'pytorch'.

        :param kwargs: must contain a 'config' entry, which is forwarded to
            IFreqaiModel.
        """
        super().__init__(config=kwargs['config'])
        self.dd.model_type = 'pytorch'
        # Use the GPU when torch reports that CUDA is available.
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'

    def train(
        self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
    ) -> Any:
        """
        Filter the training data, prepare it through the datakitchen and fit a
        model to it by delegating to self.fit().

        :param unfiltered_df: Full dataframe for the current training period
        :param pair: pair being trained; only used for the log messages here
        :param dk: datakitchen that filters, splits and normalizes the data
        :return:
        :model: whatever self.fit() returns for the prepared data
        """
        logger.info(f"-------------------- Starting training {pair} --------------------")
        start_time = time()

        features, labels = dk.filter_features(
            unfiltered_df, dk.training_features_list, dk.label_list, training_filter=True
        )

        # Build the train/test split from the filtered frames.
        data_dictionary = dk.make_train_test_datasets(features, labels)

        # Labels are fit unless live label fitting is configured *and* we run live.
        if not (self.freqai_info.get("fit_live_predictions", 0) and self.live):
            dk.fit_labels()

        # Normalization happens after the split has been made.
        data_dictionary = dk.normalize_data(data_dictionary)

        # Optional extra cleaning/analysis hook.
        self.data_cleaning_train(dk)

        logger.info(
            f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
        )
        logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")

        model = self.fit(data_dictionary, dk)

        end_time = time()
        logger.info(f"-------------------- Done training {pair} "
                    f"({end_time - start_time:.2f} secs) --------------------")
        return model

View File

@ -0,0 +1,51 @@
import logging
from pathlib import Path
from typing import Dict
import torch
import torch.nn as nn
logger = logging.getLogger(__name__)
class PytorchModelTrainer:
    """
    Minimal training loop around a torch module and its optimizer.

    The wrapped model is called as ``model(features, targets)`` and must return
    a ``(logits, loss)`` tuple.
    """

    def __init__(self, model: nn.Module, optimizer, init_model: Dict):
        """
        :param model: module to train.
        :param optimizer: optimizer whose zero_grad()/step() drive the updates.
        :param init_model: checkpoint dict in the layout written by save();
            when truthy, its state is loaded into model and optimizer.
        """
        self.model = model
        self.optimizer = optimizer
        if init_model:
            self.load_from_checkpoint(init_model)

    def fit(self, tensor_dictionary, max_iters, batch_size):
        """
        Run ``max_iters`` optimisation steps on random batches of the train split.

        :param tensor_dictionary: dict holding 'train_features' and 'train_labels'.
        :param max_iters: number of optimizer steps to perform.
        :param batch_size: number of samples drawn (with replacement) per step.
        """
        # Make sure mode-dependent layers (e.g. dropout) are in training mode,
        # even if the model was switched to eval mode before this call.
        self.model.train()
        for _ in range(max_iters):
            # todo add validation evaluation here
            xb, yb = self.get_batch(tensor_dictionary, 'train', batch_size)
            _, loss = self.model(xb, yb)
            self.optimizer.zero_grad(set_to_none=True)
            loss.backward()
            self.optimizer.step()

    def save(self, path):
        """
        Write the model and optimizer state dicts to ``path`` via torch.save.
        """
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
        }, path)

    def load_from_file(self, path: Path):
        """
        Load a checkpoint written by save() and apply it to this trainer.

        :param path: location of the checkpoint.
        :return: self
        """
        # Remap stored tensors onto the device the model currently lives on, so
        # a checkpoint saved on a GPU can still be read on a CPU-only host.
        first_param = next(self.model.parameters(), None)
        map_location = first_param.device if first_param is not None else None
        checkpoint = torch.load(path, map_location=map_location)
        return self.load_from_checkpoint(checkpoint)

    def load_from_checkpoint(self, checkpoint: Dict):
        """
        Restore model and optimizer state from an in-memory checkpoint dict.

        :param checkpoint: dict with 'model_state_dict' and 'optimizer_state_dict'.
        :return: self
        """
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        return self

    @staticmethod
    def get_batch(tensor_dictionary: Dict, split: str, batch_size: int):
        """
        Draw a random batch (sampling with replacement) from the given split.

        :param tensor_dictionary: dict holding f'{split}_features' and f'{split}_labels'.
        :param split: split name used to build the dictionary keys, e.g. 'train'.
        :param batch_size: number of rows to sample.
        :return: (features, labels) indexed by the same random row indices.
        """
        ix = torch.randint(len(tensor_dictionary[f'{split}_labels']), (batch_size,))
        x = tensor_dictionary[f'{split}_features'][ix]
        y = tensor_dictionary[f'{split}_labels'][ix]
        return x, y

View File

@ -446,7 +446,9 @@ class FreqaiDataDrawer:
dump(model, save_path / f"{dk.model_filename}_model.joblib")
elif self.model_type == 'keras':
model.save(save_path / f"{dk.model_filename}_model.h5")
elif 'stable_baselines' in self.model_type or 'sb3_contrib' == self.model_type:
elif 'stable_baselines' in self.model_type or\
'sb3_contrib' == self.model_type or\
'pytorch' == self.model_type:
model.save(save_path / f"{dk.model_filename}_model.zip")
if dk.svm_model is not None:
@ -537,6 +539,9 @@ class FreqaiDataDrawer:
self.model_type, self.freqai_info['rl_config']['model_type'])
MODELCLASS = getattr(mod, self.freqai_info['rl_config']['model_type'])
model = MODELCLASS.load(dk.data_path / f"{dk.model_filename}_model")
elif self.model_type == 'pytorch':
import torch
model = torch.load(dk.data_path / f"{dk.model_filename}_model.zip")
if Path(dk.data_path / f"{dk.model_filename}_svm_model.joblib").is_file():
dk.svm_model = load(dk.data_path / f"{dk.model_filename}_svm_model.joblib")

View File

@ -0,0 +1,97 @@
import logging
from typing import Dict
from typing import Any, Dict, Tuple
import numpy.typing as npt
import numpy as np
import pandas as pd
import torch
from pandas import DataFrame
from torch.nn import functional as F
from freqtrade.freqai.base_models.BasePytorchModel import BasePytorchModel
from freqtrade.freqai.base_models.PytorchModelTrainer import PytorchModelTrainer
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.prediction_models.PytorchMLPModel import MLP
logger = logging.getLogger(__name__)
class PytorchClassifierMultiTarget(BasePytorchModel):
    """
    Classifier that trains an MLP through PytorchModelTrainer and predicts one
    of the class labels listed in self.labels.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # todo move to config
        self.n_hidden = 1024
        self.labels = ['0.0', '1.0', '2.0']
        self.max_iters = 100
        self.batch_size = 64
        self.learning_rate = 3e-4

    def fit(self, data_dictionary: Dict, dk: FreqaiDataKitchen, **kwargs) -> Any:
        """
        User sets up the training and test data to fit their desired model here
        :param data_dictionary: the dictionary holding the train/test
            features and labels as dataframes.
        :param dk: datakitchen for the current pair; only dk.pair is read here.
        :return: the PytorchModelTrainer wrapping the trained model.
        """
        n_features = data_dictionary['train_features'].shape[-1]
        tensor_dictionary = self.convert_data_to_tensors(data_dictionary)
        model = MLP(
            input_dim=n_features,
            hidden_dim=self.n_hidden,
            output_dim=len(self.labels)
        )
        model.to(self.device)
        optimizer = torch.optim.AdamW(model.parameters(), lr=self.learning_rate)
        # NOTE(review): get_init_model is defined outside this file; the trainer
        # expects either a falsy value or a checkpoint dict here - confirm.
        init_model = self.get_init_model(dk.pair)
        trainer = PytorchModelTrainer(model, optimizer, init_model=init_model)
        trainer.fit(tensor_dictionary, self.max_iters, self.batch_size)
        return trainer

    def predict(
        self, unfiltered_df: DataFrame, dk: FreqaiDataKitchen, **kwargs
    ) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
        """
        Filter the prediction features data and predict with it.
        :param unfiltered_df: Full dataframe for the current backtest period.
        :return:
        :pred_df: dataframe containing the predicted label followed by one
                  probability column per entry of self.labels
        :do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
                     data (NaNs) or felt uncertain about data (PCA and DI index)
        """
        dk.find_features(unfiltered_df)
        filtered_df, _ = dk.filter_features(
            unfiltered_df, dk.training_features_list, training_filter=False
        )
        filtered_df = dk.normalize_data_from_metadata(filtered_df)
        dk.data_dictionary["prediction_features"] = filtered_df
        self.data_cleaning_predict(dk)

        # Cast to float32 so the input dtype matches the default dtype of the
        # network weights (pandas frames are float64 by default).
        dk.data_dictionary["prediction_features"] = torch.tensor(
            dk.data_dictionary["prediction_features"].values
        ).float().to(self.device)

        # self.model is the trainer returned by fit(); the network is its .model.
        network = self.model.model
        # Inference must run with dropout disabled and without building a graph.
        network.eval()
        with torch.no_grad():
            logits, _ = network(dk.data_dictionary["prediction_features"])
            probs = F.softmax(logits, dim=-1)
            label_ints = torch.argmax(probs, dim=-1)

        # Tensors may live on the GPU; numpy conversion requires host memory.
        pred_df_prob = DataFrame(probs.cpu().numpy(), columns=self.labels)
        pred_df = DataFrame(
            label_ints.cpu().numpy(), columns=dk.label_list
        ).astype(float).astype(str)
        pred_df = pd.concat([pred_df, pred_df_prob], axis=1)
        return (pred_df, dk.do_predict)

    def convert_data_to_tensors(self, data_dictionary: Dict) -> Dict:
        """
        Convert the train/test dataframes into tensors on self.device.

        :param data_dictionary: dict with '<split>_features' and '<split>_labels'
            dataframes for the 'train' and 'test' splits.
        :return: dict with the same keys holding float32 feature tensors and
            int64 label tensors.
        """
        tensor_dictionary = {}
        for split in ['train', 'test']:
            # float32 matches the default dtype of torch.nn.Linear parameters.
            tensor_dictionary[f'{split}_features'] = torch.tensor(
                data_dictionary[f'{split}_features'].values
            ).float().to(self.device)
            # Labels go through float first, then are truncated to class indices.
            tensor_dictionary[f'{split}_labels'] = torch.tensor(
                data_dictionary[f'{split}_labels'].astype(float).values
            ).long().to(self.device)
        return tensor_dictionary

View File

@ -0,0 +1,31 @@
import logging
import torch
import torch.nn as nn
from torch.nn import functional as F
logger = logging.getLogger(__name__)
class MLP(nn.Module):
    """
    Two-hidden-layer perceptron with ReLU activations and dropout (p=0.2).

    forward() returns ``(logits, loss)``; the loss is the cross entropy against
    ``targets`` and is ``None`` when no targets are given.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        :param input_dim: number of input features.
        :param hidden_dim: width of both hidden layers.
        :param output_dim: number of output logits (classes).
        """
        super().__init__()
        self.input_layer = nn.Linear(input_dim, hidden_dim)
        self.hidden_layer = nn.Linear(hidden_dim, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x, targets=None):
        """
        :param x: input features; the last dimension must equal input_dim.
        :param targets: optional class targets passed to F.cross_entropy after
            singleton dimensions are squeezed out.
        :return: (logits, loss) where loss is None if targets is None.
        """
        x = self.relu(self.input_layer(x))
        x = self.dropout(x)
        x = self.relu(self.hidden_layer(x))
        x = self.dropout(x)
        logits = self.output_layer(x)
        if targets is None:
            return logits, None

        squeezed = targets.squeeze()
        # A batch of one, e.g. targets of shape (1, 1), squeezes down to a
        # 0-dim tensor, which cross_entropy rejects for batched logits.
        # Restore the batch dimension in that case only.
        if squeezed.dim() == 0 and logits.dim() > 1:
            squeezed = squeezed.reshape(1)
        loss = F.cross_entropy(logits, squeezed)
        return logits, loss