Merge branch 'develop' into backtest_live_models
This commit is contained in:
@@ -440,7 +440,7 @@ AVAILABLE_CLI_OPTIONS = {
|
||||
"dataformat_trades": Arg(
|
||||
'--data-format-trades',
|
||||
help='Storage format for downloaded trades data. (default: `jsongz`).',
|
||||
choices=constants.AVAILABLE_DATAHANDLERS,
|
||||
choices=constants.AVAILABLE_DATAHANDLERS_TRADES,
|
||||
),
|
||||
"show_timerange": Arg(
|
||||
'--show-timerange',
|
||||
|
@@ -36,7 +36,8 @@ AVAILABLE_PAIRLISTS = ['StaticPairList', 'VolumePairList',
|
||||
'PrecisionFilter', 'PriceFilter', 'RangeStabilityFilter',
|
||||
'ShuffleFilter', 'SpreadFilter', 'VolatilityFilter']
|
||||
AVAILABLE_PROTECTIONS = ['CooldownPeriod', 'LowProfitPairs', 'MaxDrawdown', 'StoplossGuard']
|
||||
AVAILABLE_DATAHANDLERS = ['json', 'jsongz', 'hdf5']
|
||||
AVAILABLE_DATAHANDLERS_TRADES = ['json', 'jsongz', 'hdf5']
|
||||
AVAILABLE_DATAHANDLERS = AVAILABLE_DATAHANDLERS_TRADES + ['feather', 'parquet']
|
||||
BACKTEST_BREAKDOWNS = ['day', 'week', 'month']
|
||||
BACKTEST_CACHE_AGE = ['none', 'day', 'week', 'month']
|
||||
BACKTEST_CACHE_DEFAULT = 'day'
|
||||
@@ -434,7 +435,7 @@ CONF_SCHEMA = {
|
||||
},
|
||||
'dataformat_trades': {
|
||||
'type': 'string',
|
||||
'enum': AVAILABLE_DATAHANDLERS,
|
||||
'enum': AVAILABLE_DATAHANDLERS_TRADES,
|
||||
'default': 'jsongz'
|
||||
},
|
||||
'position_adjustment_enable': {'type': 'boolean'},
|
||||
|
130
freqtrade/data/history/featherdatahandler.py
Normal file
130
freqtrade/data/history/featherdatahandler.py
Normal file
@@ -0,0 +1,130 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from pandas import DataFrame, read_feather, to_datetime
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, TradeList
|
||||
from freqtrade.enums import CandleType
|
||||
|
||||
from .idatahandler import IDataHandler
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FeatherDataHandler(IDataHandler):
|
||||
|
||||
_columns = DEFAULT_DATAFRAME_COLUMNS
|
||||
|
||||
def ohlcv_store(
|
||||
self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None:
|
||||
"""
|
||||
Store data in json format "values".
|
||||
format looks as follows:
|
||||
[[<date>,<open>,<high>,<low>,<close>]]
|
||||
:param pair: Pair - used to generate filename
|
||||
:param timeframe: Timeframe - used to generate filename
|
||||
:param data: Dataframe containing OHLCV data
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
:return: None
|
||||
"""
|
||||
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
|
||||
self.create_dir_if_needed(filename)
|
||||
|
||||
data.reset_index(drop=True).loc[:, self._columns].to_feather(
|
||||
filename, compression_level=9, compression='lz4')
|
||||
|
||||
def _ohlcv_load(self, pair: str, timeframe: str,
|
||||
timerange: Optional[TimeRange], candle_type: CandleType
|
||||
) -> DataFrame:
|
||||
"""
|
||||
Internal method used to load data for one pair from disk.
|
||||
Implements the loading and conversion to a Pandas dataframe.
|
||||
Timerange trimming and dataframe validation happens outside of this method.
|
||||
:param pair: Pair to load data
|
||||
:param timeframe: Timeframe (e.g. "5m")
|
||||
:param timerange: Limit data to be loaded to this timerange.
|
||||
Optionally implemented by subclasses to avoid loading
|
||||
all data where possible.
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
:return: DataFrame with ohlcv data, or empty DataFrame
|
||||
"""
|
||||
filename = self._pair_data_filename(
|
||||
self._datadir, pair, timeframe, candle_type=candle_type)
|
||||
if not filename.exists():
|
||||
# Fallback mode for 1M files
|
||||
filename = self._pair_data_filename(
|
||||
self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True)
|
||||
if not filename.exists():
|
||||
return DataFrame(columns=self._columns)
|
||||
|
||||
pairdata = read_feather(filename)
|
||||
pairdata.columns = self._columns
|
||||
pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float',
|
||||
'low': 'float', 'close': 'float', 'volume': 'float'})
|
||||
pairdata['date'] = to_datetime(pairdata['date'],
|
||||
unit='ms',
|
||||
utc=True,
|
||||
infer_datetime_format=True)
|
||||
return pairdata
|
||||
|
||||
def ohlcv_append(
|
||||
self,
|
||||
pair: str,
|
||||
timeframe: str,
|
||||
data: DataFrame,
|
||||
candle_type: CandleType
|
||||
) -> None:
|
||||
"""
|
||||
Append data to existing data structures
|
||||
:param pair: Pair
|
||||
:param timeframe: Timeframe this ohlcv data is for
|
||||
:param data: Data to append.
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def trades_store(self, pair: str, data: TradeList) -> None:
|
||||
"""
|
||||
Store trades data (list of Dicts) to file
|
||||
:param pair: Pair - used for filename
|
||||
:param data: List of Lists containing trade data,
|
||||
column sequence as in DEFAULT_TRADES_COLUMNS
|
||||
"""
|
||||
# filename = self._pair_trades_filename(self._datadir, pair)
|
||||
|
||||
raise NotImplementedError()
|
||||
# array = pa.array(data)
|
||||
# array
|
||||
# feather.write_feather(data, filename)
|
||||
|
||||
def trades_append(self, pair: str, data: TradeList):
|
||||
"""
|
||||
Append data to existing files
|
||||
:param pair: Pair - used for filename
|
||||
:param data: List of Lists containing trade data,
|
||||
column sequence as in DEFAULT_TRADES_COLUMNS
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
|
||||
"""
|
||||
Load a pair from file, either .json.gz or .json
|
||||
# TODO: respect timerange ...
|
||||
:param pair: Load trades for this pair
|
||||
:param timerange: Timerange to load trades for - currently not implemented
|
||||
:return: List of trades
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
# filename = self._pair_trades_filename(self._datadir, pair)
|
||||
# tradesdata = misc.file_load_json(filename)
|
||||
|
||||
# if not tradesdata:
|
||||
# return []
|
||||
|
||||
# return tradesdata
|
||||
|
||||
@classmethod
|
||||
def _get_file_extension(cls):
|
||||
return "feather"
|
@@ -81,6 +81,7 @@ class HDF5DataHandler(IDataHandler):
|
||||
raise ValueError("Wrong dataframe format")
|
||||
pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float',
|
||||
'low': 'float', 'close': 'float', 'volume': 'float'})
|
||||
pairdata = pairdata.reset_index(drop=True)
|
||||
return pairdata
|
||||
|
||||
def ohlcv_append(
|
||||
|
@@ -375,6 +375,12 @@ def get_datahandlerclass(datatype: str) -> Type[IDataHandler]:
|
||||
elif datatype == 'hdf5':
|
||||
from .hdf5datahandler import HDF5DataHandler
|
||||
return HDF5DataHandler
|
||||
elif datatype == 'feather':
|
||||
from .featherdatahandler import FeatherDataHandler
|
||||
return FeatherDataHandler
|
||||
elif datatype == 'parquet':
|
||||
from .parquetdatahandler import ParquetDataHandler
|
||||
return ParquetDataHandler
|
||||
else:
|
||||
raise ValueError(f"No datahandler for datatype {datatype} available.")
|
||||
|
||||
|
129
freqtrade/data/history/parquetdatahandler.py
Normal file
129
freqtrade/data/history/parquetdatahandler.py
Normal file
@@ -0,0 +1,129 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from pandas import DataFrame, read_parquet, to_datetime
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, TradeList
|
||||
from freqtrade.enums import CandleType
|
||||
|
||||
from .idatahandler import IDataHandler
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ParquetDataHandler(IDataHandler):
|
||||
|
||||
_columns = DEFAULT_DATAFRAME_COLUMNS
|
||||
|
||||
def ohlcv_store(
|
||||
self, pair: str, timeframe: str, data: DataFrame, candle_type: CandleType) -> None:
|
||||
"""
|
||||
Store data in json format "values".
|
||||
format looks as follows:
|
||||
[[<date>,<open>,<high>,<low>,<close>]]
|
||||
:param pair: Pair - used to generate filename
|
||||
:param timeframe: Timeframe - used to generate filename
|
||||
:param data: Dataframe containing OHLCV data
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
:return: None
|
||||
"""
|
||||
filename = self._pair_data_filename(self._datadir, pair, timeframe, candle_type)
|
||||
self.create_dir_if_needed(filename)
|
||||
|
||||
data.reset_index(drop=True).loc[:, self._columns].to_parquet(filename)
|
||||
|
||||
def _ohlcv_load(self, pair: str, timeframe: str,
|
||||
timerange: Optional[TimeRange], candle_type: CandleType
|
||||
) -> DataFrame:
|
||||
"""
|
||||
Internal method used to load data for one pair from disk.
|
||||
Implements the loading and conversion to a Pandas dataframe.
|
||||
Timerange trimming and dataframe validation happens outside of this method.
|
||||
:param pair: Pair to load data
|
||||
:param timeframe: Timeframe (e.g. "5m")
|
||||
:param timerange: Limit data to be loaded to this timerange.
|
||||
Optionally implemented by subclasses to avoid loading
|
||||
all data where possible.
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
:return: DataFrame with ohlcv data, or empty DataFrame
|
||||
"""
|
||||
filename = self._pair_data_filename(
|
||||
self._datadir, pair, timeframe, candle_type=candle_type)
|
||||
if not filename.exists():
|
||||
# Fallback mode for 1M files
|
||||
filename = self._pair_data_filename(
|
||||
self._datadir, pair, timeframe, candle_type=candle_type, no_timeframe_modify=True)
|
||||
if not filename.exists():
|
||||
return DataFrame(columns=self._columns)
|
||||
|
||||
pairdata = read_parquet(filename)
|
||||
pairdata.columns = self._columns
|
||||
pairdata = pairdata.astype(dtype={'open': 'float', 'high': 'float',
|
||||
'low': 'float', 'close': 'float', 'volume': 'float'})
|
||||
pairdata['date'] = to_datetime(pairdata['date'],
|
||||
unit='ms',
|
||||
utc=True,
|
||||
infer_datetime_format=True)
|
||||
return pairdata
|
||||
|
||||
def ohlcv_append(
|
||||
self,
|
||||
pair: str,
|
||||
timeframe: str,
|
||||
data: DataFrame,
|
||||
candle_type: CandleType
|
||||
) -> None:
|
||||
"""
|
||||
Append data to existing data structures
|
||||
:param pair: Pair
|
||||
:param timeframe: Timeframe this ohlcv data is for
|
||||
:param data: Data to append.
|
||||
:param candle_type: Any of the enum CandleType (must match trading mode!)
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def trades_store(self, pair: str, data: TradeList) -> None:
|
||||
"""
|
||||
Store trades data (list of Dicts) to file
|
||||
:param pair: Pair - used for filename
|
||||
:param data: List of Lists containing trade data,
|
||||
column sequence as in DEFAULT_TRADES_COLUMNS
|
||||
"""
|
||||
# filename = self._pair_trades_filename(self._datadir, pair)
|
||||
|
||||
raise NotImplementedError()
|
||||
# array = pa.array(data)
|
||||
# array
|
||||
# feather.write_feather(data, filename)
|
||||
|
||||
def trades_append(self, pair: str, data: TradeList):
|
||||
"""
|
||||
Append data to existing files
|
||||
:param pair: Pair - used for filename
|
||||
:param data: List of Lists containing trade data,
|
||||
column sequence as in DEFAULT_TRADES_COLUMNS
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _trades_load(self, pair: str, timerange: Optional[TimeRange] = None) -> TradeList:
|
||||
"""
|
||||
Load a pair from file, either .json.gz or .json
|
||||
# TODO: respect timerange ...
|
||||
:param pair: Load trades for this pair
|
||||
:param timerange: Timerange to load trades for - currently not implemented
|
||||
:return: List of trades
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
# filename = self._pair_trades_filename(self._datadir, pair)
|
||||
# tradesdata = misc.file_load_json(filename)
|
||||
|
||||
# if not tradesdata:
|
||||
# return []
|
||||
|
||||
# return tradesdata
|
||||
|
||||
@classmethod
|
||||
def _get_file_extension(cls):
|
||||
return "parquet"
|
@@ -313,6 +313,7 @@ class FreqaiDataDrawer:
|
||||
"""
|
||||
|
||||
dk.find_features(dataframe)
|
||||
dk.find_labels(dataframe)
|
||||
|
||||
full_labels = dk.label_list + dk.unique_class_list
|
||||
|
||||
@@ -376,7 +377,27 @@ class FreqaiDataDrawer:
|
||||
if self.config.get("freqai", {}).get("purge_old_models", False):
|
||||
self.purge_old_models()
|
||||
|
||||
# Functions pulled back from FreqaiDataKitchen because they relied on DataDrawer
|
||||
def save_metadata(self, dk: FreqaiDataKitchen) -> None:
|
||||
"""
|
||||
Saves only metadata for backtesting studies if user prefers
|
||||
not to save model data. This saves tremendous amounts of space
|
||||
for users generating huge studies.
|
||||
This is only active when `save_backtest_models`: false (not default)
|
||||
"""
|
||||
if not dk.data_path.is_dir():
|
||||
dk.data_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
save_path = Path(dk.data_path)
|
||||
|
||||
dk.data["data_path"] = str(dk.data_path)
|
||||
dk.data["model_filename"] = str(dk.model_filename)
|
||||
dk.data["training_features_list"] = list(dk.data_dictionary["train_features"].columns)
|
||||
dk.data["label_list"] = dk.label_list
|
||||
|
||||
with open(save_path / f"{dk.model_filename}_metadata.json", "w") as fp:
|
||||
rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)
|
||||
|
||||
return
|
||||
|
||||
def save_data(self, model: Any, coin: str, dk: FreqaiDataKitchen) -> None:
|
||||
"""
|
||||
|
@@ -858,7 +858,7 @@ class FreqaiDataKitchen:
|
||||
|
||||
inlier_metric = pd.DataFrame(
|
||||
data=inliers.sum(axis=1) / no_prev_pts,
|
||||
columns=['inlier_metric'],
|
||||
columns=['%-inlier_metric'],
|
||||
index=compute_df.index
|
||||
)
|
||||
|
||||
@@ -908,11 +908,14 @@ class FreqaiDataKitchen:
|
||||
"""
|
||||
column_names = dataframe.columns
|
||||
features = [c for c in column_names if "%" in c]
|
||||
labels = [c for c in column_names if "&" in c]
|
||||
if not features:
|
||||
raise OperationalException("Could not find any features!")
|
||||
|
||||
self.training_features_list = features
|
||||
|
||||
def find_labels(self, dataframe: DataFrame) -> None:
|
||||
column_names = dataframe.columns
|
||||
labels = [c for c in column_names if "&" in c]
|
||||
self.label_list = labels
|
||||
|
||||
def check_if_pred_in_training_spaces(self) -> None:
|
||||
@@ -1233,7 +1236,8 @@ class FreqaiDataKitchen:
|
||||
|
||||
def get_unique_classes_from_labels(self, dataframe: DataFrame) -> None:
|
||||
|
||||
self.find_features(dataframe)
|
||||
# self.find_features(dataframe)
|
||||
self.find_labels(dataframe)
|
||||
|
||||
for key in self.label_list:
|
||||
if dataframe[key].dtype == object:
|
||||
|
783
freqtrade/freqai/freqai_interface copy.py
Normal file
783
freqtrade/freqai/freqai_interface copy.py
Normal file
@@ -0,0 +1,783 @@
|
||||
import logging
|
||||
import shutil
|
||||
import threading
|
||||
import time
|
||||
from abc import ABC, abstractmethod
|
||||
from collections import deque
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from threading import Lock
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from numpy.typing import NDArray
|
||||
from pandas import DataFrame
|
||||
|
||||
from freqtrade.configuration import TimeRange
|
||||
from freqtrade.constants import DATETIME_PRINT_FORMAT, Config
|
||||
from freqtrade.enums import RunMode
|
||||
from freqtrade.exceptions import OperationalException
|
||||
from freqtrade.exchange import timeframe_to_seconds
|
||||
from freqtrade.freqai.data_drawer import FreqaiDataDrawer
|
||||
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
|
||||
from freqtrade.freqai.utils import plot_feature_importance
|
||||
from freqtrade.strategy.interface import IStrategy
|
||||
|
||||
|
||||
pd.options.mode.chained_assignment = None
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class IFreqaiModel(ABC):
|
||||
"""
|
||||
Class containing all tools for training and prediction in the strategy.
|
||||
Base*PredictionModels inherit from this class.
|
||||
|
||||
Record of contribution:
|
||||
FreqAI was developed by a group of individuals who all contributed specific skillsets to the
|
||||
project.
|
||||
|
||||
Conception and software development:
|
||||
Robert Caulk @robcaulk
|
||||
|
||||
Theoretical brainstorming:
|
||||
Elin Törnquist @th0rntwig
|
||||
|
||||
Code review, software architecture brainstorming:
|
||||
@xmatthias
|
||||
|
||||
Beta testing and bug reporting:
|
||||
@bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm
|
||||
Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
|
||||
"""
|
||||
|
||||
def __init__(self, config: Config) -> None:
|
||||
|
||||
self.config = config
|
||||
self.assert_config(self.config)
|
||||
self.freqai_info: Dict[str, Any] = config["freqai"]
|
||||
self.data_split_parameters: Dict[str, Any] = config.get("freqai", {}).get(
|
||||
"data_split_parameters", {})
|
||||
self.model_training_parameters: Dict[str, Any] = config.get("freqai", {}).get(
|
||||
"model_training_parameters", {})
|
||||
self.retrain = False
|
||||
self.first = True
|
||||
self.set_full_path()
|
||||
self.follow_mode: bool = self.freqai_info.get("follow_mode", False)
|
||||
self.save_backtest_models: bool = self.freqai_info.get("save_backtest_models", True)
|
||||
if self.save_backtest_models:
|
||||
logger.info('Backtesting module configured to save all models.')
|
||||
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
|
||||
self.identifier: str = self.freqai_info.get("identifier", "no_id_provided")
|
||||
self.scanning = False
|
||||
self.ft_params = self.freqai_info["feature_parameters"]
|
||||
self.keras: bool = self.freqai_info.get("keras", False)
|
||||
if self.keras and self.ft_params.get("DI_threshold", 0):
|
||||
self.ft_params["DI_threshold"] = 0
|
||||
logger.warning("DI threshold is not configured for Keras models yet. Deactivating.")
|
||||
self.CONV_WIDTH = self.freqai_info.get("conv_width", 2)
|
||||
if self.ft_params.get("inlier_metric_window", 0):
|
||||
self.CONV_WIDTH = self.ft_params.get("inlier_metric_window", 0) * 2
|
||||
self.pair_it = 0
|
||||
self.pair_it_train = 0
|
||||
self.total_pairs = len(self.config.get("exchange", {}).get("pair_whitelist"))
|
||||
self.train_queue = self._set_train_queue()
|
||||
self.last_trade_database_summary: DataFrame = {}
|
||||
self.current_trade_database_summary: DataFrame = {}
|
||||
self.analysis_lock = Lock()
|
||||
self.inference_time: float = 0
|
||||
self.train_time: float = 0
|
||||
self.begin_time: float = 0
|
||||
self.begin_time_train: float = 0
|
||||
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
|
||||
self.continual_learning = self.freqai_info.get('continual_learning', False)
|
||||
|
||||
self._threads: List[threading.Thread] = []
|
||||
self._stop_event = threading.Event()
|
||||
|
||||
def __getstate__(self):
|
||||
"""
|
||||
Return an empty state to be pickled in hyperopt
|
||||
"""
|
||||
return ({})
|
||||
|
||||
def assert_config(self, config: Config) -> None:
|
||||
|
||||
if not config.get("freqai", {}):
|
||||
raise OperationalException("No freqai parameters found in configuration file.")
|
||||
|
||||
def start(self, dataframe: DataFrame, metadata: dict, strategy: IStrategy) -> DataFrame:
|
||||
"""
|
||||
Entry point to the FreqaiModel from a specific pair, it will train a new model if
|
||||
necessary before making the prediction.
|
||||
|
||||
:param dataframe: Full dataframe coming from strategy - it contains entire
|
||||
backtesting timerange + additional historical data necessary to train
|
||||
the model.
|
||||
:param metadata: pair metadata coming from strategy.
|
||||
:param strategy: Strategy to train on
|
||||
"""
|
||||
|
||||
self.live = strategy.dp.runmode in (RunMode.DRY_RUN, RunMode.LIVE)
|
||||
self.dd.set_pair_dict_info(metadata)
|
||||
|
||||
if self.live:
|
||||
self.inference_timer('start')
|
||||
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
|
||||
dk = self.start_live(dataframe, metadata, strategy, self.dk)
|
||||
|
||||
# For backtesting, each pair enters and then gets trained for each window along the
|
||||
# sliding window defined by "train_period_days" (training window) and "live_retrain_hours"
|
||||
# (backtest window, i.e. window immediately following the training window).
|
||||
# FreqAI slides the window and sequentially builds the backtesting results before returning
|
||||
# the concatenated results for the full backtesting period back to the strategy.
|
||||
elif not self.follow_mode:
|
||||
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
|
||||
if(self.dk.backtest_live_models):
|
||||
logger.info(
|
||||
f"Backtesting {len(self.dk.backtesting_timeranges)} timeranges (Live Models)")
|
||||
else:
|
||||
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
|
||||
dataframe = self.dk.use_strategy_to_populate_indicators(
|
||||
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
|
||||
)
|
||||
dk = self.start_backtesting(dataframe, metadata, self.dk)
|
||||
# else:
|
||||
# dk = self.start_backtesting_live_models(dataframe, metadata, self.dk)
|
||||
|
||||
dataframe = dk.remove_features_from_df(dk.return_dataframe)
|
||||
self.clean_up()
|
||||
if self.live:
|
||||
self.inference_timer('stop')
|
||||
return dataframe
|
||||
|
||||
def clean_up(self):
|
||||
"""
|
||||
Objects that should be handled by GC already between coins, but
|
||||
are explicitly shown here to help demonstrate the non-persistence of these
|
||||
objects.
|
||||
"""
|
||||
self.model = None
|
||||
self.dk = None
|
||||
|
||||
def shutdown(self):
|
||||
"""
|
||||
Cleans up threads on Shutdown, set stop event. Join threads to wait
|
||||
for current training iteration.
|
||||
"""
|
||||
logger.info("Stopping FreqAI")
|
||||
self._stop_event.set()
|
||||
|
||||
logger.info("Waiting on Training iteration")
|
||||
for _thread in self._threads:
|
||||
_thread.join()
|
||||
|
||||
def start_scanning(self, *args, **kwargs) -> None:
|
||||
"""
|
||||
Start `self._start_scanning` in a separate thread
|
||||
"""
|
||||
_thread = threading.Thread(target=self._start_scanning, args=args, kwargs=kwargs)
|
||||
self._threads.append(_thread)
|
||||
_thread.start()
|
||||
|
||||
def _start_scanning(self, strategy: IStrategy) -> None:
|
||||
"""
|
||||
Function designed to constantly scan pairs for retraining on a separate thread (intracandle)
|
||||
to improve model youth. This function is agnostic to data preparation/collection/storage,
|
||||
it simply trains on what ever data is available in the self.dd.
|
||||
:param strategy: IStrategy = The user defined strategy class
|
||||
"""
|
||||
while not self._stop_event.is_set():
|
||||
time.sleep(1)
|
||||
pair = self.train_queue[0]
|
||||
|
||||
# ensure pair is avaialble in dp
|
||||
if pair not in strategy.dp.current_whitelist():
|
||||
self.train_queue.popleft()
|
||||
logger.warning(f'{pair} not in current whitelist, removing from train queue.')
|
||||
continue
|
||||
|
||||
(_, trained_timestamp, _) = self.dd.get_pair_dict_info(pair)
|
||||
|
||||
dk = FreqaiDataKitchen(self.config, self.live, pair)
|
||||
dk.set_paths(pair, trained_timestamp)
|
||||
(
|
||||
retrain,
|
||||
new_trained_timerange,
|
||||
data_load_timerange,
|
||||
) = dk.check_if_new_training_required(trained_timestamp)
|
||||
dk.set_paths(pair, new_trained_timerange.stopts)
|
||||
|
||||
if retrain:
|
||||
self.train_timer('start')
|
||||
try:
|
||||
self.extract_data_and_train_model(
|
||||
new_trained_timerange, pair, strategy, dk, data_load_timerange
|
||||
)
|
||||
except Exception as msg:
|
||||
logger.warning(f'Training {pair} raised exception {msg}, skipping.')
|
||||
|
||||
self.train_timer('stop')
|
||||
|
||||
# only rotate the queue after the first has been trained.
|
||||
self.train_queue.rotate(-1)
|
||||
|
||||
self.dd.save_historic_predictions_to_disk()
|
||||
|
||||
def start_backtesting(
|
||||
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
|
||||
) -> FreqaiDataKitchen:
|
||||
"""
|
||||
The main broad execution for backtesting. For backtesting, each pair enters and then gets
|
||||
trained for each window along the sliding window defined by "train_period_days"
|
||||
(training window) and "backtest_period_days" (backtest window, i.e. window immediately
|
||||
following the training window). FreqAI slides the window and sequentially builds
|
||||
the backtesting results before returning the concatenated results for the full
|
||||
backtesting period back to the strategy.
|
||||
:param dataframe: DataFrame = strategy passed dataframe
|
||||
:param metadata: Dict = pair metadata
|
||||
:param dk: FreqaiDataKitchen = Data management/analysis tool associated to present pair only
|
||||
:return:
|
||||
FreqaiDataKitchen = Data management/analysis tool associated to present pair only
|
||||
"""
|
||||
|
||||
self.pair_it += 1
|
||||
train_it = 0
|
||||
# Loop enforcing the sliding window training/backtesting paradigm
|
||||
# tr_train is the training time range e.g. 1 historical month
|
||||
# tr_backtest is the backtesting time range e.g. the week directly
|
||||
# following tr_train. Both of these windows slide through the
|
||||
# entire backtest
|
||||
for tr_train, tr_backtest in zip(dk.training_timeranges, dk.backtesting_timeranges):
|
||||
pair = metadata["pair"]
|
||||
(_, _, _) = self.dd.get_pair_dict_info(pair)
|
||||
train_it += 1
|
||||
total_trains = len(dk.backtesting_timeranges)
|
||||
self.training_timerange = tr_train
|
||||
dataframe_train = dk.slice_dataframe(tr_train, dataframe)
|
||||
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
|
||||
|
||||
trained_timestamp = tr_train
|
||||
tr_train_startts_str = datetime.fromtimestamp(
|
||||
tr_train.startts,
|
||||
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
|
||||
tr_train_stopts_str = datetime.fromtimestamp(
|
||||
tr_train.stopts,
|
||||
tz=timezone.utc).strftime(DATETIME_PRINT_FORMAT)
|
||||
if not dk.backtest_live_models:
|
||||
logger.info(
|
||||
f"Training {pair}, {self.pair_it}/{self.total_pairs} pairs"
|
||||
f" from {tr_train_startts_str}"
|
||||
f" to {tr_train_stopts_str}, {train_it}/{total_trains} "
|
||||
"trains"
|
||||
)
|
||||
|
||||
timestamp_model_id = int(trained_timestamp.stopts)
|
||||
if dk.backtest_live_models:
|
||||
timestamp_model_id = int(tr_backtest.startts)
|
||||
|
||||
dk.data_path = Path(
|
||||
dk.full_path / f"sub-train-{pair.split('/')[0]}_{timestamp_model_id}"
|
||||
)
|
||||
|
||||
dk.set_new_model_names(pair, timestamp_model_id)
|
||||
|
||||
if dk.check_if_backtest_prediction_exists():
|
||||
self.dd.load_metadata(dk)
|
||||
if not dk.backtest_live_models:
|
||||
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
|
||||
|
||||
append_df = dk.get_backtesting_prediction()
|
||||
dk.append_predictions(append_df)
|
||||
else:
|
||||
if not self.model_exists(dk):
|
||||
if dk.backtest_live_models:
|
||||
raise OperationalException(
|
||||
"Training models is not allowed "
|
||||
"in backtest_live_models backtesting "
|
||||
"mode"
|
||||
)
|
||||
dk.find_features(dataframe_train)
|
||||
self.model = self.train(dataframe_train, pair, dk)
|
||||
self.dd.pair_dict[pair]["trained_timestamp"] = int(
|
||||
trained_timestamp.stopts)
|
||||
|
||||
if self.save_backtest_models:
|
||||
logger.info('Saving backtest model to disk.')
|
||||
self.dd.save_data(self.model, pair, dk)
|
||||
else:
|
||||
self.model = self.dd.load_data(pair, dk)
|
||||
|
||||
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
|
||||
|
||||
pred_df, do_preds = self.predict(dataframe_backtest, dk)
|
||||
append_df = dk.get_predictions_to_append(pred_df, do_preds)
|
||||
dk.append_predictions(append_df)
|
||||
dk.save_backtesting_prediction(append_df)
|
||||
|
||||
dk.fill_predictions(dataframe)
|
||||
return dk
|
||||
|
||||
def start_live(
|
||||
self, dataframe: DataFrame, metadata: dict, strategy: IStrategy, dk: FreqaiDataKitchen
|
||||
) -> FreqaiDataKitchen:
|
||||
"""
|
||||
The main broad execution for dry/live. This function will check if a retraining should be
|
||||
performed, and if so, retrain and reset the model.
|
||||
:param dataframe: DataFrame = strategy passed dataframe
|
||||
:param metadata: Dict = pair metadata
|
||||
:param strategy: IStrategy = currently employed strategy
|
||||
dk: FreqaiDataKitchen = Data management/analysis tool associated to present pair only
|
||||
:returns:
|
||||
dk: FreqaiDataKitchen = Data management/analysis tool associated to present pair only
|
||||
"""
|
||||
|
||||
# update follower
|
||||
if self.follow_mode:
|
||||
self.dd.update_follower_metadata()
|
||||
|
||||
# get the model metadata associated with the current pair
|
||||
(_, trained_timestamp, return_null_array) = self.dd.get_pair_dict_info(metadata["pair"])
|
||||
|
||||
# if the metadata doesn't exist, the follower returns null arrays to strategy
|
||||
if self.follow_mode and return_null_array:
|
||||
logger.info("Returning null array from follower to strategy")
|
||||
self.dd.return_null_values_to_strategy(dataframe, dk)
|
||||
return dk
|
||||
|
||||
# append the historic data once per round
|
||||
if self.dd.historic_data:
|
||||
self.dd.update_historic_data(strategy, dk)
|
||||
logger.debug(f'Updating historic data on pair {metadata["pair"]}')
|
||||
|
||||
if not self.follow_mode:
|
||||
|
||||
(_, new_trained_timerange, data_load_timerange) = dk.check_if_new_training_required(
|
||||
trained_timestamp
|
||||
)
|
||||
dk.set_paths(metadata["pair"], new_trained_timerange.stopts)
|
||||
|
||||
# load candle history into memory if it is not yet.
|
||||
if not self.dd.historic_data:
|
||||
self.dd.load_all_pair_histories(data_load_timerange, dk)
|
||||
|
||||
if not self.scanning:
|
||||
self.scanning = True
|
||||
self.start_scanning(strategy)
|
||||
|
||||
elif self.follow_mode:
|
||||
dk.set_paths(metadata["pair"], trained_timestamp)
|
||||
logger.info(
|
||||
"FreqAI instance set to follow_mode, finding existing pair "
|
||||
f"using { self.identifier }"
|
||||
)
|
||||
|
||||
# load the model and associated data into the data kitchen
|
||||
self.model = self.dd.load_data(metadata["pair"], dk)
|
||||
|
||||
with self.analysis_lock:
|
||||
dataframe = self.dk.use_strategy_to_populate_indicators(
|
||||
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
|
||||
)
|
||||
|
||||
if not self.model:
|
||||
logger.warning(
|
||||
f"No model ready for {metadata['pair']}, returning null values to strategy."
|
||||
)
|
||||
self.dd.return_null_values_to_strategy(dataframe, dk)
|
||||
return dk
|
||||
|
||||
# ensure user is feeding the correct indicators to the model
|
||||
self.check_if_feature_list_matches_strategy(dataframe, dk)
|
||||
|
||||
self.build_strategy_return_arrays(dataframe, dk, metadata["pair"], trained_timestamp)
|
||||
|
||||
return dk
|
||||
|
||||
def build_strategy_return_arrays(
|
||||
self, dataframe: DataFrame, dk: FreqaiDataKitchen, pair: str, trained_timestamp: int
|
||||
) -> None:
|
||||
|
||||
# hold the historical predictions in memory so we are sending back
|
||||
# correct array to strategy
|
||||
|
||||
if pair not in self.dd.model_return_values:
|
||||
# first predictions are made on entire historical candle set coming from strategy. This
|
||||
# allows FreqUI to show full return values.
|
||||
pred_df, do_preds = self.predict(dataframe, dk)
|
||||
if pair not in self.dd.historic_predictions:
|
||||
self.set_initial_historic_predictions(pred_df, dk, pair)
|
||||
self.dd.set_initial_return_values(pair, pred_df)
|
||||
|
||||
dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
|
||||
return
|
||||
elif self.dk.check_if_model_expired(trained_timestamp):
|
||||
pred_df = DataFrame(np.zeros((2, len(dk.label_list))), columns=dk.label_list)
|
||||
do_preds = np.ones(2, dtype=np.int_) * 2
|
||||
dk.DI_values = np.zeros(2)
|
||||
logger.warning(
|
||||
f"Model expired for {pair}, returning null values to strategy. Strategy "
|
||||
"construction should take care to consider this event with "
|
||||
"prediction == 0 and do_predict == 2"
|
||||
)
|
||||
else:
|
||||
# remaining predictions are made only on the most recent candles for performance and
|
||||
# historical accuracy reasons.
|
||||
pred_df, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dk, first=False)
|
||||
|
||||
if self.freqai_info.get('fit_live_predictions_candles', 0) and self.live:
|
||||
self.fit_live_predictions(dk, pair)
|
||||
self.dd.append_model_predictions(pair, pred_df, do_preds, dk, len(dataframe))
|
||||
dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
|
||||
|
||||
return
|
||||
|
||||
def check_if_feature_list_matches_strategy(
|
||||
self, dataframe: DataFrame, dk: FreqaiDataKitchen
|
||||
) -> None:
|
||||
"""
|
||||
Ensure user is passing the proper feature set if they are reusing an `identifier` pointing
|
||||
to a folder holding existing models.
|
||||
:param dataframe: DataFrame = strategy provided dataframe
|
||||
:param dk: FreqaiDataKitchen = non-persistent data container/analyzer for
|
||||
current coin/bot loop
|
||||
"""
|
||||
dk.find_features(dataframe)
|
||||
if "training_features_list_raw" in dk.data:
|
||||
feature_list = dk.data["training_features_list_raw"]
|
||||
else:
|
||||
feature_list = dk.data['training_features_list']
|
||||
if dk.training_features_list != feature_list:
|
||||
raise OperationalException(
|
||||
"Trying to access pretrained model with `identifier` "
|
||||
"but found different features furnished by current strategy."
|
||||
"Change `identifier` to train from scratch, or ensure the"
|
||||
"strategy is furnishing the same features as the pretrained"
|
||||
"model. In case of --strategy-list, please be aware that FreqAI "
|
||||
"requires all strategies to maintain identical "
|
||||
"populate_any_indicator() functions"
|
||||
)
|
||||
|
||||
def data_cleaning_train(self, dk: FreqaiDataKitchen) -> None:
|
||||
"""
|
||||
Base data cleaning method for train.
|
||||
Functions here improve/modify the input data by identifying outliers,
|
||||
computing additional metrics, adding noise, reducing dimensionality etc.
|
||||
"""
|
||||
|
||||
ft_params = self.freqai_info["feature_parameters"]
|
||||
|
||||
if ft_params.get('inlier_metric_window', 0):
|
||||
dk.compute_inlier_metric(set_='train')
|
||||
if self.freqai_info["data_split_parameters"]["test_size"] > 0:
|
||||
dk.compute_inlier_metric(set_='test')
|
||||
|
||||
if ft_params.get(
|
||||
"principal_component_analysis", False
|
||||
):
|
||||
dk.principal_component_analysis()
|
||||
|
||||
if ft_params.get("use_SVM_to_remove_outliers", False):
|
||||
dk.use_SVM_to_remove_outliers(predict=False)
|
||||
|
||||
if ft_params.get("DI_threshold", 0):
|
||||
dk.data["avg_mean_dist"] = dk.compute_distances()
|
||||
|
||||
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
|
||||
if dk.pair in self.dd.old_DBSCAN_eps:
|
||||
eps = self.dd.old_DBSCAN_eps[dk.pair]
|
||||
else:
|
||||
eps = None
|
||||
dk.use_DBSCAN_to_remove_outliers(predict=False, eps=eps)
|
||||
self.dd.old_DBSCAN_eps[dk.pair] = dk.data['DBSCAN_eps']
|
||||
|
||||
if self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0):
|
||||
dk.add_noise_to_training_features()
|
||||
|
||||
def data_cleaning_predict(self, dk: FreqaiDataKitchen, dataframe: DataFrame) -> None:
|
||||
"""
|
||||
Base data cleaning method for predict.
|
||||
Functions here are complementary to the functions of data_cleaning_train.
|
||||
"""
|
||||
ft_params = self.freqai_info["feature_parameters"]
|
||||
|
||||
if ft_params.get('inlier_metric_window', 0):
|
||||
dk.compute_inlier_metric(set_='predict')
|
||||
|
||||
if ft_params.get(
|
||||
"principal_component_analysis", False
|
||||
):
|
||||
dk.pca_transform(self.dk.data_dictionary['prediction_features'])
|
||||
|
||||
if ft_params.get("use_SVM_to_remove_outliers", False):
|
||||
dk.use_SVM_to_remove_outliers(predict=True)
|
||||
|
||||
if ft_params.get("DI_threshold", 0):
|
||||
dk.check_if_pred_in_training_spaces()
|
||||
|
||||
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
|
||||
dk.use_DBSCAN_to_remove_outliers(predict=True)
|
||||
|
||||
def model_exists(
|
||||
self,
|
||||
dk: FreqaiDataKitchen,
|
||||
scanning: bool = False,
|
||||
) -> bool:
|
||||
"""
|
||||
Given a pair and path, check if a model already exists
|
||||
:param pair: pair e.g. BTC/USD
|
||||
:param path: path to model
|
||||
:return:
|
||||
:boolean: whether the model file exists or not.
|
||||
"""
|
||||
path_to_modelfile = Path(dk.data_path / f"{dk.model_filename}_model.joblib")
|
||||
file_exists = path_to_modelfile.is_file()
|
||||
if file_exists and not scanning:
|
||||
logger.info("Found model at %s", dk.data_path / dk.model_filename)
|
||||
elif not scanning:
|
||||
logger.info("Could not find model at %s", dk.data_path / dk.model_filename)
|
||||
return file_exists
|
||||
|
||||
def set_full_path(self) -> None:
|
||||
self.full_path = Path(
|
||||
self.config["user_data_dir"] / "models" / f"{self.freqai_info['identifier']}"
|
||||
)
|
||||
self.full_path.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy(
|
||||
self.config["config_files"][0],
|
||||
Path(self.full_path, Path(self.config["config_files"][0]).name),
|
||||
)
|
||||
|
||||
def extract_data_and_train_model(
|
||||
self,
|
||||
new_trained_timerange: TimeRange,
|
||||
pair: str,
|
||||
strategy: IStrategy,
|
||||
dk: FreqaiDataKitchen,
|
||||
data_load_timerange: TimeRange,
|
||||
):
|
||||
"""
|
||||
Retrieve data and train model.
|
||||
:param new_trained_timerange: TimeRange = the timerange to train the model on
|
||||
:param metadata: dict = strategy provided metadata
|
||||
:param strategy: IStrategy = user defined strategy object
|
||||
:param dk: FreqaiDataKitchen = non-persistent data container for current coin/loop
|
||||
:param data_load_timerange: TimeRange = the amount of data to be loaded
|
||||
for populate_any_indicators
|
||||
(larger than new_trained_timerange so that
|
||||
new_trained_timerange does not contain any NaNs)
|
||||
"""
|
||||
|
||||
corr_dataframes, base_dataframes = self.dd.get_base_and_corr_dataframes(
|
||||
data_load_timerange, pair, dk
|
||||
)
|
||||
|
||||
with self.analysis_lock:
|
||||
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
|
||||
strategy, corr_dataframes, base_dataframes, pair
|
||||
)
|
||||
|
||||
unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
|
||||
|
||||
# find the features indicated by strategy and store in datakitchen
|
||||
dk.find_features(unfiltered_dataframe)
|
||||
|
||||
model = self.train(unfiltered_dataframe, pair, dk)
|
||||
|
||||
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
|
||||
dk.set_new_model_names(pair, int(new_trained_timerange.stopts))
|
||||
self.dd.save_data(model, pair, dk)
|
||||
|
||||
if self.freqai_info["feature_parameters"].get("plot_feature_importance", False):
|
||||
plot_feature_importance(model, pair, dk)
|
||||
|
||||
if self.freqai_info.get("purge_old_models", False):
|
||||
self.dd.purge_old_models()
|
||||
|
||||
def set_initial_historic_predictions(
|
||||
self, pred_df: DataFrame, dk: FreqaiDataKitchen, pair: str
|
||||
) -> None:
|
||||
"""
|
||||
This function is called only if the datadrawer failed to load an
|
||||
existing set of historic predictions. In this case, it builds
|
||||
the structure and sets fake predictions off the first training
|
||||
data. After that, FreqAI will append new real predictions to the
|
||||
set of historic predictions.
|
||||
|
||||
These values are used to generate live statistics which can be used
|
||||
in the strategy for adaptive values. E.g. &*_mean/std are quantities
|
||||
that can computed based on live predictions from the set of historical
|
||||
predictions. Those values can be used in the user strategy to better
|
||||
assess prediction rarity, and thus wait for probabilistically favorable
|
||||
entries relative to the live historical predictions.
|
||||
|
||||
If the user reuses an identifier on a subsequent instance,
|
||||
this function will not be called. In that case, "real" predictions
|
||||
will be appended to the loaded set of historic predictions.
|
||||
:param: df: DataFrame = the dataframe containing the training feature data
|
||||
:param: model: Any = A model which was `fit` using a common library such as
|
||||
catboost or lightgbm
|
||||
:param: dk: FreqaiDataKitchen = object containing methods for data analysis
|
||||
:param: pair: str = current pair
|
||||
"""
|
||||
|
||||
self.dd.historic_predictions[pair] = pred_df
|
||||
hist_preds_df = self.dd.historic_predictions[pair]
|
||||
|
||||
for label in hist_preds_df.columns:
|
||||
if hist_preds_df[label].dtype == object:
|
||||
continue
|
||||
hist_preds_df[f'{label}_mean'] = 0
|
||||
hist_preds_df[f'{label}_std'] = 0
|
||||
|
||||
hist_preds_df['do_predict'] = 0
|
||||
|
||||
if self.freqai_info['feature_parameters'].get('DI_threshold', 0) > 0:
|
||||
hist_preds_df['DI_values'] = 0
|
||||
|
||||
for return_str in dk.data['extra_returns_per_train']:
|
||||
hist_preds_df[return_str] = 0
|
||||
|
||||
# # for keras type models, the conv_window needs to be prepended so
|
||||
# # viewing is correct in frequi
|
||||
if self.freqai_info.get('keras', False) or self.ft_params.get('inlier_metric_window', 0):
|
||||
n_lost_points = self.freqai_info.get('conv_width', 2)
|
||||
zeros_df = DataFrame(np.zeros((n_lost_points, len(hist_preds_df.columns))),
|
||||
columns=hist_preds_df.columns)
|
||||
self.dd.historic_predictions[pair] = pd.concat(
|
||||
[zeros_df, hist_preds_df], axis=0, ignore_index=True)
|
||||
|
||||
def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
|
||||
"""
|
||||
Fit the labels with a gaussian distribution
|
||||
"""
|
||||
import scipy as spy
|
||||
|
||||
# add classes from classifier label types if used
|
||||
full_labels = dk.label_list + dk.unique_class_list
|
||||
|
||||
num_candles = self.freqai_info.get("fit_live_predictions_candles", 100)
|
||||
dk.data["labels_mean"], dk.data["labels_std"] = {}, {}
|
||||
for label in full_labels:
|
||||
if self.dd.historic_predictions[dk.pair][label].dtype == object:
|
||||
continue
|
||||
f = spy.stats.norm.fit(self.dd.historic_predictions[dk.pair][label].tail(num_candles))
|
||||
dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
|
||||
|
||||
return
|
||||
|
||||
def inference_timer(self, do='start'):
|
||||
"""
|
||||
Timer designed to track the cumulative time spent in FreqAI for one pass through
|
||||
the whitelist. This will check if the time spent is more than 1/4 the time
|
||||
of a single candle, and if so, it will warn the user of degraded performance
|
||||
"""
|
||||
if do == 'start':
|
||||
self.pair_it += 1
|
||||
self.begin_time = time.time()
|
||||
elif do == 'stop':
|
||||
end = time.time()
|
||||
self.inference_time += (end - self.begin_time)
|
||||
if self.pair_it == self.total_pairs:
|
||||
logger.info(
|
||||
f'Total time spent inferencing pairlist {self.inference_time:.2f} seconds')
|
||||
if self.inference_time > 0.25 * self.base_tf_seconds:
|
||||
logger.warning("Inference took over 25% of the candle time. Reduce pairlist to"
|
||||
" avoid blinding open trades and degrading performance.")
|
||||
self.pair_it = 0
|
||||
self.inference_time = 0
|
||||
return
|
||||
|
||||
def train_timer(self, do='start'):
|
||||
"""
|
||||
Timer designed to track the cumulative time spent training the full pairlist in
|
||||
FreqAI.
|
||||
"""
|
||||
if do == 'start':
|
||||
self.pair_it_train += 1
|
||||
self.begin_time_train = time.time()
|
||||
elif do == 'stop':
|
||||
end = time.time()
|
||||
self.train_time += (end - self.begin_time_train)
|
||||
if self.pair_it_train == self.total_pairs:
|
||||
logger.info(
|
||||
f'Total time spent training pairlist {self.train_time:.2f} seconds')
|
||||
self.pair_it_train = 0
|
||||
self.train_time = 0
|
||||
return
|
||||
|
||||
def get_init_model(self, pair: str) -> Any:
|
||||
if pair not in self.dd.model_dictionary or not self.continual_learning:
|
||||
init_model = None
|
||||
else:
|
||||
init_model = self.dd.model_dictionary[pair]
|
||||
|
||||
return init_model
|
||||
|
||||
def _set_train_queue(self):
|
||||
"""
|
||||
Sets train queue from existing train timestamps if they exist
|
||||
otherwise it sets the train queue based on the provided whitelist.
|
||||
"""
|
||||
current_pairlist = self.config.get("exchange", {}).get("pair_whitelist")
|
||||
if not self.dd.pair_dict:
|
||||
logger.info('Set fresh train queue from whitelist. '
|
||||
f'Queue: {current_pairlist}')
|
||||
return deque(current_pairlist)
|
||||
|
||||
best_queue = deque()
|
||||
|
||||
pair_dict_sorted = sorted(self.dd.pair_dict.items(),
|
||||
key=lambda k: k[1]['trained_timestamp'])
|
||||
for pair in pair_dict_sorted:
|
||||
if pair[0] in current_pairlist:
|
||||
best_queue.append(pair[0])
|
||||
for pair in current_pairlist:
|
||||
if pair not in best_queue:
|
||||
best_queue.appendleft(pair)
|
||||
|
||||
logger.info('Set existing queue from trained timestamps. '
|
||||
f'Best approximation queue: {best_queue}')
|
||||
return best_queue
|
||||
|
||||
# Following methods which are overridden by user made prediction models.
|
||||
# See freqai/prediction_models/CatboostPredictionModel.py for an example.
|
||||
|
||||
@abstractmethod
|
||||
def train(self, unfiltered_df: DataFrame, pair: str,
|
||||
dk: FreqaiDataKitchen, **kwargs) -> Any:
|
||||
"""
|
||||
Filter the training data and train a model to it. Train makes heavy use of the datahandler
|
||||
for storing, saving, loading, and analyzing the data.
|
||||
:param unfiltered_df: Full dataframe for the current training period
|
||||
:param metadata: pair metadata from strategy.
|
||||
:return: Trained model which can be used to inference (self.predict)
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def fit(self, data_dictionary: Dict[str, Any], dk: FreqaiDataKitchen, **kwargs) -> Any:
|
||||
"""
|
||||
Most regressors use the same function names and arguments e.g. user
|
||||
can drop in LGBMRegressor in place of CatBoostRegressor and all data
|
||||
management will be properly handled by Freqai.
|
||||
:param data_dictionary: Dict = the dictionary constructed by DataHandler to hold
|
||||
all the training and test data/labels.
|
||||
"""
|
||||
|
||||
return
|
||||
|
||||
@abstractmethod
|
||||
def predict(
|
||||
self, unfiltered_df: DataFrame, dk: FreqaiDataKitchen, **kwargs
|
||||
) -> Tuple[DataFrame, NDArray[np.int_]]:
|
||||
"""
|
||||
Filter the prediction features data and predict with it.
|
||||
:param unfiltered_df: Full dataframe for the current backtest period.
|
||||
:param dk: FreqaiDataKitchen = Data management/analysis tool associated to present pair only
|
||||
:param first: boolean = whether this is the first prediction or not.
|
||||
:return:
|
||||
:predictions: np.array of predictions
|
||||
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
|
||||
data (NaNs) or felt uncertain about data (i.e. SVM and/or DI index)
|
||||
"""
|
@@ -92,6 +92,7 @@ class IFreqaiModel(ABC):
|
||||
self.begin_time_train: float = 0
|
||||
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
|
||||
self.continual_learning = self.freqai_info.get('continual_learning', False)
|
||||
self.plot_features = self.ft_params.get("plot_feature_importances", 0)
|
||||
|
||||
self._threads: List[threading.Thread] = []
|
||||
self._stop_event = threading.Event()
|
||||
@@ -143,8 +144,6 @@ class IFreqaiModel(ABC):
|
||||
strategy, prediction_dataframe=dataframe, pair=metadata["pair"]
|
||||
)
|
||||
dk = self.start_backtesting(dataframe, metadata, self.dk)
|
||||
# else:
|
||||
# dk = self.start_backtesting_live_models(dataframe, metadata, self.dk)
|
||||
|
||||
dataframe = dk.remove_features_from_df(dk.return_dataframe)
|
||||
self.clean_up()
|
||||
@@ -268,8 +267,8 @@ class IFreqaiModel(ABC):
|
||||
if not dk.backtest_live_models:
|
||||
logger.info(
|
||||
f"Training {pair}, {self.pair_it}/{self.total_pairs} pairs"
|
||||
f" from {tr_train_startts_str}"
|
||||
f" to {tr_train_stopts_str}, {train_it}/{total_trains} "
|
||||
f" from {tr_train_startts_str} "
|
||||
f"to {tr_train_stopts_str}, {train_it}/{total_trains} "
|
||||
"trains"
|
||||
)
|
||||
|
||||
@@ -285,9 +284,7 @@ class IFreqaiModel(ABC):
|
||||
|
||||
if dk.check_if_backtest_prediction_exists():
|
||||
self.dd.load_metadata(dk)
|
||||
if not dk.backtest_live_models:
|
||||
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
|
||||
|
||||
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
|
||||
append_df = dk.get_backtesting_prediction()
|
||||
dk.append_predictions(append_df)
|
||||
else:
|
||||
@@ -299,24 +296,29 @@ class IFreqaiModel(ABC):
|
||||
"mode"
|
||||
)
|
||||
dk.find_features(dataframe_train)
|
||||
dk.find_labels(dataframe_train)
|
||||
self.model = self.train(dataframe_train, pair, dk)
|
||||
self.dd.pair_dict[pair]["trained_timestamp"] = int(
|
||||
trained_timestamp.stopts)
|
||||
|
||||
if self.plot_features:
|
||||
plot_feature_importance(self.model, pair, dk, self.plot_features)
|
||||
if self.save_backtest_models:
|
||||
logger.info('Saving backtest model to disk.')
|
||||
self.dd.save_data(self.model, pair, dk)
|
||||
else:
|
||||
logger.info('Saving metadata to disk.')
|
||||
self.dd.save_metadata(dk)
|
||||
else:
|
||||
self.model = self.dd.load_data(pair, dk)
|
||||
|
||||
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
|
||||
|
||||
# self.check_if_feature_list_matches_strategy(dataframe_train, dk)
|
||||
pred_df, do_preds = self.predict(dataframe_backtest, dk)
|
||||
append_df = dk.get_predictions_to_append(pred_df, do_preds)
|
||||
dk.append_predictions(append_df)
|
||||
dk.save_backtesting_prediction(append_df)
|
||||
|
||||
dk.fill_predictions(dataframe)
|
||||
|
||||
return dk
|
||||
|
||||
def start_live(
|
||||
@@ -388,8 +390,7 @@ class IFreqaiModel(ABC):
|
||||
self.dd.return_null_values_to_strategy(dataframe, dk)
|
||||
return dk
|
||||
|
||||
# ensure user is feeding the correct indicators to the model
|
||||
self.check_if_feature_list_matches_strategy(dataframe, dk)
|
||||
dk.find_labels(dataframe)
|
||||
|
||||
self.build_strategy_return_arrays(dataframe, dk, metadata["pair"], trained_timestamp)
|
||||
|
||||
@@ -508,7 +509,7 @@ class IFreqaiModel(ABC):
|
||||
if ft_params.get(
|
||||
"principal_component_analysis", False
|
||||
):
|
||||
dk.pca_transform(self.dk.data_dictionary['prediction_features'])
|
||||
dk.pca_transform(dk.data_dictionary['prediction_features'])
|
||||
|
||||
if ft_params.get("use_SVM_to_remove_outliers", False):
|
||||
dk.use_SVM_to_remove_outliers(predict=True)
|
||||
@@ -519,11 +520,10 @@ class IFreqaiModel(ABC):
|
||||
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
|
||||
dk.use_DBSCAN_to_remove_outliers(predict=True)
|
||||
|
||||
def model_exists(
|
||||
self,
|
||||
dk: FreqaiDataKitchen,
|
||||
scanning: bool = False,
|
||||
) -> bool:
|
||||
# ensure user is feeding the correct indicators to the model
|
||||
self.check_if_feature_list_matches_strategy(dk.data_dictionary['prediction_features'], dk)
|
||||
|
||||
def model_exists(self, dk: FreqaiDataKitchen) -> bool:
|
||||
"""
|
||||
Given a pair and path, check if a model already exists
|
||||
:param pair: pair e.g. BTC/USD
|
||||
@@ -533,9 +533,9 @@ class IFreqaiModel(ABC):
|
||||
"""
|
||||
path_to_modelfile = Path(dk.data_path / f"{dk.model_filename}_model.joblib")
|
||||
file_exists = path_to_modelfile.is_file()
|
||||
if file_exists and not scanning:
|
||||
if file_exists:
|
||||
logger.info("Found model at %s", dk.data_path / dk.model_filename)
|
||||
elif not scanning:
|
||||
else:
|
||||
logger.info("Could not find model at %s", dk.data_path / dk.model_filename)
|
||||
return file_exists
|
||||
|
||||
@@ -582,6 +582,7 @@ class IFreqaiModel(ABC):
|
||||
|
||||
# find the features indicated by strategy and store in datakitchen
|
||||
dk.find_features(unfiltered_dataframe)
|
||||
dk.find_labels(unfiltered_dataframe)
|
||||
|
||||
model = self.train(unfiltered_dataframe, pair, dk)
|
||||
|
||||
@@ -589,8 +590,8 @@ class IFreqaiModel(ABC):
|
||||
dk.set_new_model_names(pair, int(new_trained_timerange.stopts))
|
||||
self.dd.save_data(model, pair, dk)
|
||||
|
||||
if self.freqai_info["feature_parameters"].get("plot_feature_importance", False):
|
||||
plot_feature_importance(model, pair, dk)
|
||||
if self.plot_features:
|
||||
plot_feature_importance(model, pair, dk, self.plot_features)
|
||||
|
||||
if self.freqai_info.get("purge_old_models", False):
|
||||
self.dd.purge_old_models()
|
||||
|
@@ -170,7 +170,7 @@ def plot_feature_importance(model: Any, pair: str, dk: FreqaiDataKitchen,
|
||||
|
||||
# Data preparation
|
||||
fi_df = pd.DataFrame({
|
||||
"feature_names": np.array(dk.training_features_list),
|
||||
"feature_names": np.array(dk.data_dictionary['train_features'].columns),
|
||||
"feature_importance": np.array(feature_importance)
|
||||
})
|
||||
fi_df_top = fi_df.nlargest(count_max, "feature_importance")[::-1]
|
||||
|
Reference in New Issue
Block a user