Merge feat/freqai into develop to get new features

This commit is contained in:
robcaulk
2022-07-03 16:17:13 +02:00
29 changed files with 3894 additions and 8 deletions

View File

@@ -12,7 +12,8 @@ from freqtrade.constants import DEFAULT_CONFIG
ARGS_COMMON = ["verbosity", "logfile", "version", "config", "datadir", "user_data_dir"]
ARGS_STRATEGY = ["strategy", "strategy_path", "recursive_strategy_search"]
ARGS_STRATEGY = ["strategy", "strategy_path", "recursive_strategy_search", "freqaimodel",
"freqaimodel_path"]
ARGS_TRADE = ["db_url", "sd_notify", "dry_run", "dry_run_wallet", "fee"]

View File

@@ -647,4 +647,14 @@ AVAILABLE_CLI_OPTIONS = {
nargs='+',
default=[],
),
"freqaimodel": Arg(
'--freqaimodel',
help='Specify a custom freqaimodels.',
metavar='NAME',
),
"freqaimodel_path": Arg(
'--freqaimodel-path',
help='Specify additional lookup path for freqaimodels.',
metavar='PATH',
),
}

View File

@@ -12,7 +12,7 @@ from freqtrade.enums import CandleType, RunMode, TradingMode
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_minutes
from freqtrade.exchange.exchange import market_is_active
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
from freqtrade.plugins.pairlist.pairlist_helpers import dynamic_expand_pairlist, expand_pairlist
from freqtrade.resolvers import ExchangeResolver
@@ -50,7 +50,8 @@ def start_download_data(args: Dict[str, Any]) -> None:
exchange = ExchangeResolver.load_exchange(config['exchange']['name'], config, validate=False)
markets = [p for p, m in exchange.markets.items() if market_is_active(m)
or config.get('include_inactive')]
expanded_pairs = expand_pairlist(config['pairs'], markets)
expanded_pairs = dynamic_expand_pairlist(config, markets)
# Manual validations of relevant settings
if not config['exchange'].get('skip_pair_validation', False):

View File

@@ -85,6 +85,7 @@ def validate_config_consistency(conf: Dict[str, Any], preliminary: bool = False)
_validate_unlimited_amount(conf)
_validate_ask_orderbook(conf)
validate_migrated_strategy_settings(conf)
_validate_freqai(conf)
# validate configuration before returning
logger.info('Validating configuration ...')
@@ -163,6 +164,21 @@ def _validate_edge(conf: Dict[str, Any]) -> None:
)
def _validate_freqai(conf: Dict[str, Any]) -> None:
"""
Freqai param validator
"""
if not conf.get('freqai', {}):
return
for param in constants.SCHEMA_FREQAI_REQUIRED:
if param not in conf.get('freqai', {}):
raise OperationalException(
f'{param} not found in Freqai config'
)
def _validate_whitelist(conf: Dict[str, Any]) -> None:
"""
Dynamic whitelist does not require pair_whitelist to be set - however StaticWhitelist does.

View File

@@ -97,6 +97,8 @@ class Configuration:
self._process_analyze_options(config)
self._process_freqai_options(config)
# Check if the exchange set by the user is supported
check_exchange(config, config.get('experimental', {}).get('block_bad_exchanges', True))
@@ -461,6 +463,16 @@ class Configuration:
config.update({'runmode': self.runmode})
def _process_freqai_options(self, config: Dict[str, Any]) -> None:
self._args_to_config(config, argname='freqaimodel',
logstring='Using freqaimodel class name: {}')
self._args_to_config(config, argname='freqaimodel_path',
logstring='Using freqaimodel path: {}')
return
def _args_to_config(self, config: Dict[str, Any], argname: str,
logstring: str, logfun: Optional[Callable] = None,
deprecated_msg: Optional[str] = None) -> None:

View File

@@ -55,6 +55,7 @@ FTHYPT_FILEVERSION = 'fthypt_fileversion'
USERPATH_HYPEROPTS = 'hyperopts'
USERPATH_STRATEGIES = 'strategies'
USERPATH_NOTEBOOKS = 'notebooks'
USERPATH_FREQAIMODELS = 'freqaimodels'
TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
@@ -472,7 +473,44 @@ CONF_SCHEMA = {
'remove_pumps': {'type': 'boolean'}
},
'required': ['process_throttle_secs', 'allowed_risk']
}
},
"freqai": {
"type": "object",
"properties": {
"timeframes": {"type": "list"},
"train_period": {"type": "integer", "default": 0},
"backtest_period": {"type": "float", "default": 7},
"identifier": {"type": "str", "default": "example"},
"corr_pairlist": {"type": "list"},
"feature_parameters": {
"type": "object",
"properties": {
"period": {"type": "integer"},
"shift": {"type": "integer", "default": 0},
"DI_threshold": {"type": "float", "default": 0},
"weight_factor": {"type": "number", "default": 0},
"principal_component_analysis": {"type": "boolean", "default": False},
"use_SVM_to_remove_outliers": {"type": "boolean", "default": False},
},
},
"data_split_parameters": {
"type": "object",
"properties": {
"test_size": {"type": "number"},
"random_state": {"type": "integer"},
},
},
"model_training_parameters": {
"type": "object",
"properties": {
"n_estimators": {"type": "integer", "default": 2000},
"random_state": {"type": "integer", "default": 1},
"learning_rate": {"type": "number", "default": 0.02},
"task_type": {"type": "string", "default": "CPU"},
},
},
},
},
},
}
@@ -516,6 +554,17 @@ SCHEMA_MINIMAL_REQUIRED = [
'dataformat_trades',
]
SCHEMA_FREQAI_REQUIRED = [
'timeframes',
'train_period',
'backtest_period',
'identifier',
'corr_pairlist',
'feature_parameters',
'data_split_parameters',
'model_training_parameters'
]
CANCEL_REASON = {
"TIMEOUT": "cancelled due to timeout",
"PARTIALLY_FILLED_KEEP_OPEN": "partially filled - keeping order open",

View File

@@ -86,7 +86,7 @@ class Exchange:
# TradingMode.SPOT always supported and not required in this list
]
def __init__(self, config: Dict[str, Any], validate: bool = True) -> None:
def __init__(self, config: Dict[str, Any], validate: bool = True, freqai: bool = False) -> None:
"""
Initializes this module with the given config,
it does basic validation whether the specified exchange and pairs are valid.
@@ -196,7 +196,7 @@ class Exchange:
self.markets_refresh_interval: int = exchange_config.get(
"markets_refresh_interval", 60) * 60
if self.trading_mode != TradingMode.SPOT:
if self.trading_mode != TradingMode.SPOT and freqai is False:
self.fill_leverage_tiers()
self.additional_exchange_init()

View File

@@ -0,0 +1,314 @@
import collections
import json
import logging
import re
import shutil
import threading
from pathlib import Path
from typing import Any, Dict, Tuple
# import pickle as pk
import numpy as np
import pandas as pd
from pandas import DataFrame
# from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
logger = logging.getLogger(__name__)
class FreqaiDataDrawer:
"""
Class aimed at holding all pair models/info in memory for better inferencing/retrainig/saving
/loading to/from disk.
This object remains persistent throughout live/dry, unlike FreqaiDataKitchen, which is
reinstantiated for each coin.
"""
def __init__(self, full_path: Path, config: dict, follow_mode: bool = False):
self.config = config
self.freqai_info = config.get("freqai", {})
# dictionary holding all pair metadata necessary to load in from disk
self.pair_dict: Dict[str, Any] = {}
# dictionary holding all actively inferenced models in memory given a model filename
self.model_dictionary: Dict[str, Any] = {}
self.model_return_values: Dict[str, Any] = {}
self.pair_data_dict: Dict[str, Any] = {}
self.historic_data: Dict[str, Any] = {}
self.follower_dict: Dict[str, Any] = {}
self.full_path = full_path
self.follow_mode = follow_mode
if follow_mode:
self.create_follower_dict()
self.load_drawer_from_disk()
self.training_queue: Dict[str, int] = {}
self.history_lock = threading.Lock()
def load_drawer_from_disk(self):
"""
Locate and load a previously saved data drawer full of all pair model metadata in
present model folder.
:returns:
exists: bool = whether or not the drawer was located
"""
exists = Path(self.full_path / str("pair_dictionary.json")).resolve().exists()
if exists:
with open(self.full_path / str("pair_dictionary.json"), "r") as fp:
self.pair_dict = json.load(fp)
elif not self.follow_mode:
logger.info("Could not find existing datadrawer, starting from scratch")
else:
logger.warning(
f"Follower could not find pair_dictionary at {self.full_path} "
"sending null values back to strategy"
)
return exists
def save_drawer_to_disk(self):
"""
Save data drawer full of all pair model metadata in present model folder.
"""
with open(self.full_path / str("pair_dictionary.json"), "w") as fp:
json.dump(self.pair_dict, fp, default=self.np_encoder)
def save_follower_dict_to_disk(self):
"""
Save follower dictionary to disk (used by strategy for persistent prediction targets)
"""
follower_name = self.config.get("bot_name", "follower1")
with open(
self.full_path / str("follower_dictionary-" + follower_name + ".json"), "w"
) as fp:
json.dump(self.follower_dict, fp, default=self.np_encoder)
def create_follower_dict(self):
"""
Create or dictionary for each follower to maintain unique persistent prediction targets
"""
follower_name = self.config.get("bot_name", "follower1")
whitelist_pairs = self.config.get("exchange", {}).get("pair_whitelist")
exists = (
Path(self.full_path / str("follower_dictionary-" + follower_name + ".json"))
.resolve()
.exists()
)
if exists:
logger.info("Found an existing follower dictionary")
for pair in whitelist_pairs:
self.follower_dict[pair] = {}
with open(
self.full_path / str("follower_dictionary-" + follower_name + ".json"), "w"
) as fp:
json.dump(self.follower_dict, fp, default=self.np_encoder)
def np_encoder(self, object):
if isinstance(object, np.generic):
return object.item()
def get_pair_dict_info(self, pair: str) -> Tuple[str, int, bool, bool]:
"""
Locate and load existing model metadata from persistent storage. If not located,
create a new one and append the current pair to it and prepare it for its first
training
:params:
metadata: dict = strategy furnished pair metadata
:returns:
model_filename: str = unique filename used for loading persistent objects from disk
trained_timestamp: int = the last time the coin was trained
coin_first: bool = If the coin is fresh without metadata
return_null_array: bool = Follower could not find pair metadata
"""
pair_in_dict = self.pair_dict.get(pair)
data_path_set = self.pair_dict.get(pair, {}).get("data_path", None)
return_null_array = False
if pair_in_dict:
model_filename = self.pair_dict[pair]["model_filename"]
trained_timestamp = self.pair_dict[pair]["trained_timestamp"]
coin_first = self.pair_dict[pair]["first"]
elif not self.follow_mode:
self.pair_dict[pair] = {}
model_filename = self.pair_dict[pair]["model_filename"] = ""
coin_first = self.pair_dict[pair]["first"] = True
trained_timestamp = self.pair_dict[pair]["trained_timestamp"] = 0
self.pair_dict[pair]["priority"] = len(self.pair_dict)
if not data_path_set and self.follow_mode:
logger.warning(
f"Follower could not find current pair {pair} in "
f"pair_dictionary at path {self.full_path}, sending null values "
"back to strategy."
)
return_null_array = True
return model_filename, trained_timestamp, coin_first, return_null_array
def set_pair_dict_info(self, metadata: dict) -> None:
pair_in_dict = self.pair_dict.get(metadata["pair"])
if pair_in_dict:
return
else:
self.pair_dict[metadata["pair"]] = {}
self.pair_dict[metadata["pair"]]["model_filename"] = ""
self.pair_dict[metadata["pair"]]["first"] = True
self.pair_dict[metadata["pair"]]["trained_timestamp"] = 0
self.pair_dict[metadata["pair"]]["priority"] = len(self.pair_dict)
return
def pair_to_end_of_training_queue(self, pair: str) -> None:
# march all pairs up in the queue
for p in self.pair_dict:
self.pair_dict[p]["priority"] -= 1
# send pair to end of queue
self.pair_dict[pair]["priority"] = len(self.pair_dict)
def set_initial_return_values(self, pair: str, dk, pred_df, do_preds) -> None:
"""
Set the initial return values to a persistent dataframe. This avoids needing to repredict on
historical candles, and also stores historical predictions despite retrainings (so stored
predictions are true predictions, not just inferencing on trained data)
"""
self.model_return_values[pair] = pd.DataFrame()
for label in dk.label_list:
self.model_return_values[pair][label] = pred_df[label]
self.model_return_values[pair][f"{label}_mean"] = dk.data["labels_mean"][label]
self.model_return_values[pair][f"{label}_std"] = dk.data["labels_std"][label]
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
self.model_return_values[pair]["DI_values"] = dk.DI_values
self.model_return_values[pair]["do_predict"] = do_preds
def append_model_predictions(self, pair: str, predictions, do_preds, dk, len_df) -> None:
# strat seems to feed us variable sized dataframes - and since we are trying to build our
# own return array in the same shape, we need to figure out how the size has changed
# and adapt our stored/returned info accordingly.
length_difference = len(self.model_return_values[pair]) - len_df
i = 0
if length_difference == 0:
i = 1
elif length_difference > 0:
i = length_difference + 1
df = self.model_return_values[pair] = self.model_return_values[pair].shift(-i)
for label in dk.label_list:
df[label].iloc[-1] = predictions[label].iloc[-1]
df[f"{label}_mean"].iloc[-1] = dk.data["labels_mean"][label]
df[f"{label}_std"].iloc[-1] = dk.data["labels_std"][label]
# df['prediction'].iloc[-1] = predictions[-1]
df["do_predict"].iloc[-1] = do_preds[-1]
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
df["DI_values"].iloc[-1] = dk.DI_values[-1]
if length_difference < 0:
prepend_df = pd.DataFrame(
np.zeros((abs(length_difference) - 1, len(df.columns))), columns=df.columns
)
df = pd.concat([prepend_df, df], axis=0)
def attach_return_values_to_return_dataframe(self, pair: str, dataframe) -> DataFrame:
"""
Attach the return values to the strat dataframe
:params:
dataframe: DataFrame = strat dataframe
:returns:
dataframe: DataFrame = strat dataframe with return values attached
"""
df = self.model_return_values[pair]
to_keep = [col for col in dataframe.columns if not col.startswith("&")]
dataframe = pd.concat([dataframe[to_keep], df], axis=1)
return dataframe
def return_null_values_to_strategy(self, dataframe: DataFrame, dk) -> None:
"""
Build 0 filled dataframe to return to strategy
"""
dk.find_features(dataframe)
for label in dk.label_list:
dataframe[label] = 0
dataframe[f"{label}_mean"] = 0
dataframe[f"{label}_std"] = 0
# dataframe['prediction'] = 0
dataframe["do_predict"] = 0
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold", 0) > 0:
dataframe["DI_value"] = 0
dk.return_dataframe = dataframe
def purge_old_models(self) -> None:
model_folders = [x for x in self.full_path.iterdir() if x.is_dir()]
pattern = re.compile(r"sub-train-(\w+)(\d{10})")
delete_dict: Dict[str, Any] = {}
for dir in model_folders:
result = pattern.match(str(dir.name))
if result is None:
break
coin = result.group(1)
timestamp = result.group(2)
if coin not in delete_dict:
delete_dict[coin] = {}
delete_dict[coin]["num_folders"] = 1
delete_dict[coin]["timestamps"] = {int(timestamp): dir}
else:
delete_dict[coin]["num_folders"] += 1
delete_dict[coin]["timestamps"][int(timestamp)] = dir
for coin in delete_dict:
if delete_dict[coin]["num_folders"] > 2:
sorted_dict = collections.OrderedDict(
sorted(delete_dict[coin]["timestamps"].items())
)
num_delete = len(sorted_dict) - 2
deleted = 0
for k, v in sorted_dict.items():
if deleted >= num_delete:
break
logger.info(f"Freqai purging old model file {v}")
shutil.rmtree(v)
deleted += 1
def update_follower_metadata(self):
# follower needs to load from disk to get any changes made by leader to pair_dict
self.load_drawer_from_disk()
if self.config.get("freqai", {}).get("purge_old_models", False):
self.purge_old_models()
# to be used if we want to send predictions directly to the follower instead of forcing
# follower to load models and inference
# def save_model_return_values_to_disk(self) -> None:
# with open(self.full_path / str('model_return_values.json'), "w") as fp:
# json.dump(self.model_return_values, fp, default=self.np_encoder)
# def load_model_return_values_from_disk(self, dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
# exists = Path(self.full_path / str('model_return_values.json')).resolve().exists()
# if exists:
# with open(self.full_path / str('model_return_values.json'), "r") as fp:
# self.model_return_values = json.load(fp)
# elif not self.follow_mode:
# logger.info("Could not find existing datadrawer, starting from scratch")
# else:
# logger.warning(f'Follower could not find pair_dictionary at {self.full_path} '
# 'sending null values back to strategy')
# return exists, dk

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,559 @@
# import contextlib
import datetime
import gc
import logging
import shutil
import threading
import time
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Tuple
import numpy as np
import numpy.typing as npt
import pandas as pd
from pandas import DataFrame
from freqtrade.configuration import TimeRange
from freqtrade.enums import RunMode
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_drawer import FreqaiDataDrawer
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.strategy.interface import IStrategy
pd.options.mode.chained_assignment = None
logger = logging.getLogger(__name__)
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs).start()
return wrapper
class IFreqaiModel(ABC):
"""
Class containing all tools for training and prediction in the strategy.
User models should inherit from this class as shown in
templates/ExamplePredictionModel.py where the user overrides
train(), predict(), fit(), and make_labels().
Author: Robert Caulk, rob.caulk@gmail.com
"""
def __init__(self, config: Dict[str, Any]) -> None:
self.config = config
self.assert_config(self.config)
self.freqai_info = config["freqai"]
self.data_split_parameters = config.get("freqai", {}).get("data_split_parameters")
self.model_training_parameters = config.get("freqai", {}).get("model_training_parameters")
self.feature_parameters = config.get("freqai", {}).get("feature_parameters")
self.time_last_trained = None
self.current_time = None
self.model = None
self.predictions = None
self.training_on_separate_thread = False
self.retrain = False
self.first = True
self.update_historic_data = 0
self.set_full_path()
self.follow_mode = self.freqai_info.get("follow_mode", False)
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
self.lock = threading.Lock()
self.follow_mode = self.freqai_info.get("follow_mode", False)
self.identifier = self.freqai_info.get("identifier", "no_id_provided")
self.scanning = False
self.ready_to_scan = False
self.first = True
self.keras = self.freqai_info.get("keras", False)
self.CONV_WIDTH = self.freqai_info.get("conv_width", 2)
def assert_config(self, config: Dict[str, Any]) -> None:
if not config.get("freqai", {}):
raise OperationalException("No freqai parameters found in configuration file.")
def start(self, dataframe: DataFrame, metadata: dict, strategy: IStrategy) -> DataFrame:
"""
Entry point to the FreqaiModel from a specific pair, it will train a new model if
necessary before making the prediction.
:params:
:dataframe: Full dataframe coming from strategy - it contains entire
backtesting timerange + additional historical data necessary to train
the model.
:metadata: pair metadata coming from strategy.
"""
self.live = strategy.dp.runmode in (RunMode.DRY_RUN, RunMode.LIVE)
self.dd.set_pair_dict_info(metadata)
if self.live:
self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
dk = self.start_live(dataframe, metadata, strategy, self.dk)
# For backtesting, each pair enters and then gets trained for each window along the
# sliding window defined by "train_period" (training window) and "backtest_period"
# (backtest window, i.e. window immediately following the training window).
# FreqAI slides the window and sequentially builds the backtesting results before returning
# the concatenated results for the full backtesting period back to the strategy.
elif not self.follow_mode:
self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
dk = self.start_backtesting(dataframe, metadata, self.dk)
dataframe = self.remove_features_from_df(dk.return_dataframe)
return self.return_values(dataframe, dk)
@threaded
def start_scanning(self, strategy: IStrategy) -> None:
"""
Function designed to constantly scan pairs for retraining on a separate thread (intracandle)
to improve model youth. This function is agnostic to data preparation/collection/storage,
it simply trains on what ever data is available in the self.dd.
:params:
strategy: IStrategy = The user defined strategy class
"""
while 1:
time.sleep(1)
for pair in self.config.get("exchange", {}).get("pair_whitelist"):
(_, trained_timestamp, _, _) = self.dd.get_pair_dict_info(pair)
if self.dd.pair_dict[pair]["priority"] != 1:
continue
dk = FreqaiDataKitchen(self.config, self.dd, self.live, pair)
# file_exists = False
dk.set_paths(pair, trained_timestamp)
# file_exists = self.model_exists(pair,
# dk,
# trained_timestamp=trained_timestamp,
# model_filename=model_filename,
# scanning=True)
(
retrain,
new_trained_timerange,
data_load_timerange,
) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(pair, new_trained_timerange.stopts)
if retrain: # or not file_exists:
self.train_model_in_series(
new_trained_timerange, pair, strategy, dk, data_load_timerange
)
def start_backtesting(
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
) -> FreqaiDataKitchen:
"""
The main broad execution for backtesting. For backtesting, each pair enters and then gets
trained for each window along the sliding window defined by "train_period" (training window)
and "backtest_period" (backtest window, i.e. window immediately following the
training window). FreqAI slides the window and sequentially builds the backtesting results
before returning the concatenated results for the full backtesting period back to the
strategy.
:params:
dataframe: DataFrame = strategy passed dataframe
metadata: Dict = pair metadata
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
:returns:
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
"""
# Loop enforcing the sliding window training/backtesting paradigm
# tr_train is the training time range e.g. 1 historical month
# tr_backtest is the backtesting time range e.g. the week directly
# following tr_train. Both of these windows slide through the
# entire backtest
for tr_train, tr_backtest in zip(dk.training_timeranges, dk.backtesting_timeranges):
(_, _, _, _) = self.dd.get_pair_dict_info(metadata["pair"])
gc.collect()
dk.data = {} # clean the pair specific data between training window sliding
self.training_timerange = tr_train
# self.training_timerange_timerange = tr_train
dataframe_train = dk.slice_dataframe(tr_train, dataframe)
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
trained_timestamp = tr_train # TimeRange.parse_timerange(tr_train)
tr_train_startts_str = datetime.datetime.utcfromtimestamp(tr_train.startts).strftime(
"%Y-%m-%d %H:%M:%S"
)
tr_train_stopts_str = datetime.datetime.utcfromtimestamp(tr_train.stopts).strftime(
"%Y-%m-%d %H:%M:%S"
)
logger.info("Training %s", metadata["pair"])
logger.info(f"Training {tr_train_startts_str} to {tr_train_stopts_str}")
dk.data_path = Path(
dk.full_path
/ str(
"sub-train"
+ "-"
+ metadata["pair"].split("/")[0]
+ str(int(trained_timestamp.stopts))
)
)
if not self.model_exists(
metadata["pair"], dk, trained_timestamp=trained_timestamp.stopts
):
self.model = self.train(dataframe_train, metadata["pair"], dk)
self.dd.pair_dict[metadata["pair"]]["trained_timestamp"] = trained_timestamp.stopts
dk.set_new_model_names(metadata["pair"], trained_timestamp)
dk.save_data(self.model, metadata["pair"], keras_model=self.keras)
else:
self.model = dk.load_data(metadata["pair"], keras_model=self.keras)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
preds, do_preds = self.predict(dataframe_backtest, dk)
dk.append_predictions(preds, do_preds, len(dataframe_backtest))
print("predictions", len(dk.full_predictions), "do_predict", len(dk.full_do_predict))
dk.fill_predictions(len(dataframe))
return dk
def start_live(
self, dataframe: DataFrame, metadata: dict, strategy: IStrategy, dk: FreqaiDataKitchen
) -> FreqaiDataKitchen:
"""
The main broad execution for dry/live. This function will check if a retraining should be
performed, and if so, retrain and reset the model.
:params:
dataframe: DataFrame = strategy passed dataframe
metadata: Dict = pair metadata
strategy: IStrategy = currently employed strategy
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
:returns:
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
"""
# update follower
if self.follow_mode:
self.dd.update_follower_metadata()
# get the model metadata associated with the current pair
(_, trained_timestamp, _, return_null_array) = self.dd.get_pair_dict_info(metadata["pair"])
# if the metadata doesnt exist, the follower returns null arrays to strategy
if self.follow_mode and return_null_array:
logger.info("Returning null array from follower to strategy")
self.dd.return_null_values_to_strategy(dataframe, dk)
return dk
# append the historic data once per round
if self.dd.historic_data:
dk.update_historic_data(strategy)
logger.debug(f'Updating historic data on pair {metadata["pair"]}')
# if trainable, check if model needs training, if so compute new timerange,
# then save model and metadata.
# if not trainable, load existing data
if not self.follow_mode:
(_, new_trained_timerange, data_load_timerange) = dk.check_if_new_training_required(
trained_timestamp
)
dk.set_paths(metadata["pair"], new_trained_timerange.stopts)
# download candle history if it is not already in memory
if not self.dd.historic_data:
logger.info(
"Downloading all training data for all pairs in whitelist and "
"corr_pairlist, this may take a while if you do not have the "
"data saved"
)
dk.download_all_data_for_training(data_load_timerange)
dk.load_all_pair_histories(data_load_timerange)
if not self.scanning:
self.scanning = True
self.start_scanning(strategy)
elif self.follow_mode:
dk.set_paths(metadata["pair"], trained_timestamp)
logger.info(
"FreqAI instance set to follow_mode, finding existing pair"
f"using { self.identifier }"
)
# load the model and associated data into the data kitchen
self.model = dk.load_data(coin=metadata["pair"], keras_model=self.keras)
if not self.model:
logger.warning("No model ready, returning null values to strategy.")
self.dd.return_null_values_to_strategy(dataframe, dk)
return dk
# ensure user is feeding the correct indicators to the model
self.check_if_feature_list_matches_strategy(dataframe, dk)
self.build_strategy_return_arrays(dataframe, dk, metadata["pair"], trained_timestamp)
return dk
def build_strategy_return_arrays(
self, dataframe: DataFrame, dk: FreqaiDataKitchen, pair: str, trained_timestamp: int
) -> None:
# hold the historical predictions in memory so we are sending back
# correct array to strategy
if pair not in self.dd.model_return_values:
pred_df, do_preds = self.predict(dataframe, dk)
self.dd.set_initial_return_values(pair, dk, pred_df, do_preds)
dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
return
elif self.dk.check_if_model_expired(trained_timestamp):
pred_df = DataFrame(np.zeros((2, len(dk.label_list))), columns=dk.label_list)
do_preds, dk.DI_values = np.ones(2) * 2, np.zeros(2)
logger.warning(
"Model expired, returning null values to strategy. Strategy "
"construction should take care to consider this event with "
"prediction == 0 and do_predict == 2"
)
else:
# Only feed in the most recent candle for prediction in live scenario
pred_df, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dk, first=False)
self.dd.append_model_predictions(pair, pred_df, do_preds, dk, len(dataframe))
dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
return
def check_if_feature_list_matches_strategy(
self, dataframe: DataFrame, dk: FreqaiDataKitchen
) -> None:
"""
Ensure user is passing the proper feature set if they are reusing an `identifier` pointing
to a folder holding existing models.
:params:
dataframe: DataFrame = strategy provided dataframe
dk: FreqaiDataKitchen = non-persistent data container/analyzer for current coin/bot loop
"""
dk.find_features(dataframe)
if "training_features_list_raw" in dk.data:
feature_list = dk.data["training_features_list_raw"]
else:
feature_list = dk.training_features_list
if dk.training_features_list != feature_list:
raise OperationalException(
"Trying to access pretrained model with `identifier` "
"but found different features furnished by current strategy."
"Change `identifer` to train from scratch, or ensure the"
"strategy is furnishing the same features as the pretrained"
"model"
)
def data_cleaning_train(self, dk: FreqaiDataKitchen) -> None:
"""
Base data cleaning method for train
Any function inside this method should drop training data points from the filtered_dataframe
based on user decided logic. See FreqaiDataKitchen::remove_outliers() for an example
of how outlier data points are dropped from the dataframe used for training.
"""
if self.freqai_info.get("feature_parameters", {}).get("principal_component_analysis"):
dk.principal_component_analysis()
if self.freqai_info.get("feature_parameters", {}).get("use_SVM_to_remove_outliers"):
dk.use_SVM_to_remove_outliers(predict=False)
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold"):
dk.data["avg_mean_dist"] = dk.compute_distances()
# if self.feature_parameters["determine_statistical_distributions"]:
# dk.determine_statistical_distributions()
# if self.feature_parameters["remove_outliers"]:
# dk.remove_outliers(predict=False)
def data_cleaning_predict(self, dk: FreqaiDataKitchen, dataframe: DataFrame) -> None:
"""
Base data cleaning method for predict.
These functions each modify dk.do_predict, which is a dataframe with equal length
to the number of candles coming from and returning to the strategy. Inside do_predict,
1 allows prediction and < 0 signals to the strategy that the model is not confident in
the prediction.
See FreqaiDataKitchen::remove_outliers() for an example
of how the do_predict vector is modified. do_predict is ultimately passed back to strategy
for buy signals.
"""
if self.freqai_info.get("feature_parameters", {}).get("principal_component_analysis"):
dk.pca_transform(dataframe)
if self.freqai_info.get("feature_parameters", {}).get("use_SVM_to_remove_outliers"):
dk.use_SVM_to_remove_outliers(predict=True)
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold"):
dk.check_if_pred_in_training_spaces()
# if self.feature_parameters["determine_statistical_distributions"]:
# dk.determine_statistical_distributions()
# if self.feature_parameters["remove_outliers"]:
# dk.remove_outliers(predict=True) # creates dropped index
def model_exists(
self,
pair: str,
dk: FreqaiDataKitchen,
trained_timestamp: int = None,
model_filename: str = "",
scanning: bool = False,
) -> bool:
"""
Given a pair and path, check if a model already exists
:param pair: pair e.g. BTC/USD
:param path: path to model
"""
coin, _ = pair.split("/")
if not self.live:
dk.model_filename = model_filename = "cb_" + coin.lower() + "_" + str(trained_timestamp)
path_to_modelfile = Path(dk.data_path / str(model_filename + "_model.joblib"))
file_exists = path_to_modelfile.is_file()
if file_exists and not scanning:
logger.info("Found model at %s", dk.data_path / dk.model_filename)
elif not scanning:
logger.info("Could not find model at %s", dk.data_path / dk.model_filename)
return file_exists
def set_full_path(self) -> None:
self.full_path = Path(
self.config["user_data_dir"] / "models" / str(self.freqai_info.get("identifier"))
)
self.full_path.mkdir(parents=True, exist_ok=True)
shutil.copy(
self.config["config_files"][0],
Path(self.full_path, Path(self.config["config_files"][0]).name),
)
def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
"""
Remove the features from the dataframe before returning it to strategy. This keeps it
compact for Frequi purposes.
"""
to_keep = [
col for col in dataframe.columns if not col.startswith("%") or col.startswith("%%")
]
return dataframe[to_keep]
def train_model_in_series(
self,
new_trained_timerange: TimeRange,
pair: str,
strategy: IStrategy,
dk: FreqaiDataKitchen,
data_load_timerange: TimeRange,
):
"""
Retreive data and train model in single threaded mode (only used if model directory is empty
upon startup for dry/live )
:params:
new_trained_timerange: TimeRange = the timerange to train the model on
metadata: dict = strategy provided metadata
strategy: IStrategy = user defined strategy object
dk: FreqaiDataKitchen = non-persistent data container for current coin/loop
data_load_timerange: TimeRange = the amount of data to be loaded for populate_any_indicators
(larger than new_trained_timerange so that new_trained_timerange does not contain any NaNs)
"""
corr_dataframes, base_dataframes = dk.get_base_and_corr_dataframes(
data_load_timerange, pair
)
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
strategy, corr_dataframes, base_dataframes, pair
)
unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
# find the features indicated by strategy and store in datakitchen
dk.find_features(unfiltered_dataframe)
model = self.train(unfiltered_dataframe, pair, dk)
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
dk.set_new_model_names(pair, new_trained_timerange)
self.dd.pair_dict[pair]["first"] = False
if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning:
with self.lock:
self.dd.pair_to_end_of_training_queue(pair)
dk.save_data(model, coin=pair, keras_model=self.keras)
if self.freqai_info.get("purge_old_models", False):
self.dd.purge_old_models()
# self.retrain = False
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModlel.py for an example.
@abstractmethod
def train(self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahandler
for storing, saving, loading, and analyzing the data.
:params:
:unfiltered_dataframe: Full dataframe for the current training period
:metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
@abstractmethod
def fit(self) -> Any:
"""
Most regressors use the same function names and arguments e.g. user
can drop in LGBMRegressor in place of CatBoostRegressor and all data
management will be properly handled by Freqai.
:params:
data_dictionary: Dict = the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
return
@abstractmethod
def predict(
self, dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = True
) -> Tuple[DataFrame, npt.ArrayLike]:
"""
Filter the prediction features data and predict with it.
:param:
unfiltered_dataframe: Full dataframe for the current backtest period.
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
:return:
:predictions: np.array of predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (i.e. SVM and/or DI index)
"""
def make_labels(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
User defines the labels here (target values).
:params:
dataframe: DataFrame = the full dataframe for the present training period
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
"""
return
@abstractmethod
def return_values(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
User defines the dataframe to be returned to strategy here.
:params:
dataframe: DataFrame = the full dataframe for the current prediction (live)
or --timerange (backtesting)
dk: FreqaiDataKitchen = Data management/analysis tool assoicated to present pair only
:returns:
dataframe: DataFrame = dataframe filled with user defined data
"""
return

View File

@@ -0,0 +1,154 @@
import logging
from typing import Any, Dict, Tuple
from catboost import CatBoostRegressor, Pool
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
logger = logging.getLogger(__name__)
class CatboostPredictionModel(IFreqaiModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which
has its own DataHandler where data is held, saved, loaded, and managed.
"""
def return_values(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
User uses this function to add any additional return values to the dataframe.
e.g.
dataframe['volatility'] = dk.volatility_values
"""
return dataframe
def make_labels(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
User defines the labels here (target values).
:params:
:dataframe: the full dataframe for the present training period
"""
dataframe["s"] = (
dataframe["close"]
.shift(-self.feature_parameters["period"])
.rolling(self.feature_parameters["period"])
.mean()
/ dataframe["close"]
- 1
)
return dataframe["s"]
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahkitchen
for storing, saving, loading, and analyzing the data.
:params:
:unfiltered_dataframe: Full dataframe for the current training period
:metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info("--------------------Starting training " f"{pair} --------------------")
# unfiltered_labels = self.make_labels(unfiltered_dataframe, dk)
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_dataframe,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
dk.fit_labels() # fit labels to a cauchy distribution so we know what to expect in strategy
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
)
logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
model = self.fit(data_dictionary)
logger.info(f"--------------------done training {pair}--------------------")
return model
def fit(self, data_dictionary: Dict) -> Any:
"""
User sets up the training and test data to fit their desired model here
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
train_data = Pool(
data=data_dictionary["train_features"],
label=data_dictionary["train_labels"],
weight=data_dictionary["train_weights"],
)
test_data = Pool(
data=data_dictionary["test_features"],
label=data_dictionary["test_labels"],
weight=data_dictionary["test_weights"],
)
model = CatBoostRegressor(
allow_writing_files=False,
verbose=100,
early_stopping_rounds=400,
**self.model_training_parameters,
)
model.fit(X=train_data, eval_set=test_data)
return model
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = False
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:pred_df: dataframe containing the predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, dk.training_features_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk, filtered_dataframe)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
pred_df = DataFrame(predictions, columns=dk.label_list)
for label in dk.label_list:
pred_df[label] = (
(pred_df[label] + 1)
* (dk.data["labels_max"][label] - dk.data["labels_min"][label])
/ 2
) + dk.data["labels_min"][label]
return (pred_df, dk.do_predict)

View File

@@ -0,0 +1,133 @@
import logging
from typing import Any, Dict, Tuple
from catboost import CatBoostRegressor # , Pool
from pandas import DataFrame
from sklearn.multioutput import MultiOutputRegressor
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
logger = logging.getLogger(__name__)
class CatboostPredictionMultiModel(IFreqaiModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which
has its own DataHandler where data is held, saved, loaded, and managed.
"""
def return_values(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
User uses this function to add any additional return values to the dataframe.
e.g.
dataframe['volatility'] = dk.volatility_values
"""
return dataframe
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahkitchen
for storing, saving, loading, and analyzing the data.
:params:
:unfiltered_dataframe: Full dataframe for the current training period
:metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info("--------------------Starting training " f"{pair} --------------------")
# unfiltered_labels = self.make_labels(unfiltered_dataframe, dk)
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_dataframe,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
dk.fit_labels() # fit labels to a cauchy distribution so we know what to expect in strategy
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
)
logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
model = self.fit(data_dictionary)
logger.info(f"--------------------done training {pair}--------------------")
return model
def fit(self, data_dictionary: Dict) -> Any:
"""
User sets up the training and test data to fit their desired model here
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
cbr = CatBoostRegressor(
allow_writing_files=False,
gpu_ram_part=0.5,
verbose=100,
early_stopping_rounds=400,
**self.model_training_parameters,
)
X = data_dictionary["train_features"]
y = data_dictionary["train_labels"]
# eval_set = (data_dictionary["test_features"], data_dictionary["test_labels"])
sample_weight = data_dictionary["train_weights"]
model = MultiOutputRegressor(estimator=cbr)
model.fit(X=X, y=y, sample_weight=sample_weight) # , eval_set=eval_set)
return model
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = False
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:pred_df: dataframe containing the predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, dk.training_features_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk, filtered_dataframe)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
pred_df = DataFrame(predictions, columns=dk.label_list)
for label in dk.label_list:
pred_df[label] = (
(pred_df[label] + 1)
* (dk.data["labels_max"][label] - dk.data["labels_min"][label])
/ 2
) + dk.data["labels_min"][label]
return (pred_df, dk.do_predict)

View File

@@ -0,0 +1,127 @@
import logging
from typing import Any, Dict, Tuple
from lightgbm import LGBMRegressor
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.freqai_interface import IFreqaiModel
logger = logging.getLogger(__name__)
class LightGBMPredictionModel(IFreqaiModel):
"""
User created prediction model. The class needs to override three necessary
functions, predict(), train(), fit(). The class inherits ModelHandler which
has its own DataHandler where data is held, saved, loaded, and managed.
"""
def return_values(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
"""
User uses this function to add any additional return values to the dataframe.
e.g.
dataframe['volatility'] = dk.volatility_values
"""
return dataframe
def train(
self, unfiltered_dataframe: DataFrame, pair: str, dk: FreqaiDataKitchen
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahkitchen
for storing, saving, loading, and analyzing the data.
:params:
:unfiltered_dataframe: Full dataframe for the current training period
:metadata: pair metadata from strategy.
:returns:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info("--------------------Starting training " f"{pair} --------------------")
# unfiltered_labels = self.make_labels(unfiltered_dataframe, dk)
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
unfiltered_dataframe,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
dk.fit_labels() # fit labels to a cauchy distribution so we know what to expect in strategy
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
)
logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
model = self.fit(data_dictionary)
logger.info(f"--------------------done training {pair}--------------------")
return model
def fit(self, data_dictionary: Dict) -> Any:
"""
Most regressors use the same function names and arguments e.g. user
can drop in LGBMRegressor in place of CatBoostRegressor and all data
management will be properly handled by Freqai.
:params:
:data_dictionary: the dictionary constructed by DataHandler to hold
all the training and test data/labels.
"""
eval_set = (data_dictionary["test_features"], data_dictionary["test_labels"])
X = data_dictionary["train_features"]
y = data_dictionary["train_labels"]
model = LGBMRegressor(seed=42, n_estimators=2000, verbosity=1, force_col_wise=True)
model.fit(X=X, y=y, eval_set=eval_set)
return model
def predict(
self, unfiltered_dataframe: DataFrame, dk: FreqaiDataKitchen
) -> Tuple[DataFrame, DataFrame]:
"""
Filter the prediction features data and predict with it.
:param: unfiltered_dataframe: Full dataframe for the current backtest period.
:return:
:predictions: np.array of predictions
:do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
data (NaNs) or felt uncertain about data (PCA and DI index)
"""
# logger.info("--------------------Starting prediction--------------------")
original_feature_list = dk.find_features(unfiltered_dataframe)
filtered_dataframe, _ = dk.filter_features(
unfiltered_dataframe, original_feature_list, training_filter=False
)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk, filtered_dataframe)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
pred_df = DataFrame(predictions, columns=dk.label_list)
for label in dk.label_list:
pred_df[label] = (
(pred_df[label] + 1)
* (dk.data["labels_max"][label] - dk.data["labels_min"][label])
/ 2
) + dk.data["labels_min"][label]
return (pred_df, dk.do_predict)

View File

@@ -0,0 +1,12 @@
from freqtrade.resolvers.freqaimodel_resolver import FreqaiModelResolver
class CustomModel:
"""
A bridge between the user defined IFreqaiModel class
and the strategy.
"""
def __init__(self, config):
self.bridge = FreqaiModelResolver.load_freqaimodel(config)

View File

@@ -206,6 +206,11 @@ class Backtesting:
"""
self.progress.init_step(BacktestState.DATALOAD, 1)
if self.config.get('freqai') is not None:
self.required_startup += int(self.config.get('freqai', {}).get('startup_candles', 1000))
logger.info(f'Increasing startup_candle_count for freqai to {self.required_startup}')
self.config['startup_candle_count'] = self.required_startup
data = history.load_data(
datadir=self.config['datadir'],
pairs=self.pairlists.whitelist,

View File

@@ -40,3 +40,14 @@ def expand_pairlist(wildcardpl: List[str], available_pairs: List[str],
except re.error as err:
raise ValueError(f"Wildcard error in {pair_wc}, {err}")
return result
def dynamic_expand_pairlist(config: dict, markets: list) -> List[str]:
if config.get('freqai', {}):
full_pairs = config['pairs'] + [pair for pair in config['freqai']['corr_pairlist']
if pair not in config['pairs']]
expanded_pairs = expand_pairlist(full_pairs, markets)
else:
expanded_pairs = expand_pairlist(config['pairs'], markets)
return expanded_pairs

View File

@@ -18,7 +18,8 @@ class ExchangeResolver(IResolver):
object_type = Exchange
@staticmethod
def load_exchange(exchange_name: str, config: dict, validate: bool = True) -> Exchange:
def load_exchange(exchange_name: str, config: dict, validate: bool = True,
freqai: bool = False) -> Exchange:
"""
Load the custom class from config parameter
:param exchange_name: name of the Exchange to load
@@ -31,7 +32,8 @@ class ExchangeResolver(IResolver):
try:
exchange = ExchangeResolver._load_exchange(exchange_name,
kwargs={'config': config,
'validate': validate})
'validate': validate,
'freqai': freqai})
except ImportError:
logger.info(
f"No {exchange_name} specific subclass found. Using the generic class instead.")

View File

@@ -0,0 +1,50 @@
# pragma pylint: disable=attribute-defined-outside-init
"""
This module load a custom model for freqai
"""
import logging
from pathlib import Path
from typing import Dict
from freqtrade.constants import USERPATH_FREQAIMODELS
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.resolvers import IResolver
logger = logging.getLogger(__name__)
class FreqaiModelResolver(IResolver):
"""
This class contains all the logic to load custom hyperopt loss class
"""
object_type = IFreqaiModel
object_type_str = "FreqaiModel"
user_subdir = USERPATH_FREQAIMODELS
initial_search_path = Path(__file__).parent.parent.joinpath(
"freqai/prediction_models").resolve()
@staticmethod
def load_freqaimodel(config: Dict) -> IFreqaiModel:
"""
Load the custom class from config parameter
:param config: configuration dictionary
"""
freqaimodel_name = config.get("freqaimodel")
if not freqaimodel_name:
raise OperationalException(
"No freqaimodel set. Please use `--freqaimodel` to "
"specify the FreqaiModel class to use.\n"
)
freqaimodel = FreqaiModelResolver.load_object(
freqaimodel_name,
config,
kwargs={"config": config},
extra_dir=config.get("freqaimodel_path"),
)
return freqaimodel

View File

@@ -546,6 +546,23 @@ class IStrategy(ABC, HyperStrategyMixin):
"""
return None
def populate_any_indicators(self, basepair: str, pair: str, df: DataFrame, tf: str,
informative: DataFrame = None, coin: str = "",
set_generalized_indicators: bool = False) -> DataFrame:
"""
Function designed to automatically generate, name and merge features
from user indicated timeframes in the configuration file. User can add
additional features here, but must follow the naming convention.
Defined in IStrategy because Freqai needs to know it exists.
:params:
:pair: pair to be used as informative
:df: strategy dataframe which will receive merges from informatives
:tf: timeframe of the dataframe which will modify the feature names
:informative: the dataframe associated with the informative pair
:coin: the name of the coin which will modify the feature names.
"""
return df
###
# END - Intended to be overridden by strategy
###

View File

@@ -0,0 +1,342 @@
import logging
from functools import reduce
import pandas as pd
import talib.abstract as ta
from pandas import DataFrame
from technical import qtpylib
from freqtrade.exchange import timeframe_to_prev_date
from freqtrade.freqai.strategy_bridge import CustomModel
from freqtrade.persistence import Trade
from freqtrade.strategy import DecimalParameter, IntParameter, merge_informative_pair
from freqtrade.strategy.interface import IStrategy
logger = logging.getLogger(__name__)
class FreqaiExampleStrategy(IStrategy):
"""
Example strategy showing how the user connects their own
IFreqaiModel to the strategy. Namely, the user uses:
self.model = CustomModel(self.config)
self.model.bridge.start(dataframe, metadata)
to make predictions on their data. populate_any_indicators() automatically
generates the variety of features indicated by the user in the
canonical freqtrade configuration file under config['freqai'].
"""
minimal_roi = {"0": 0.1, "240": -1}
plot_config = {
"main_plot": {},
"subplots": {
"prediction": {"prediction": {"color": "blue"}},
"target_roi": {
"target_roi": {"color": "brown"},
},
"do_predict": {
"do_predict": {"color": "brown"},
},
},
}
process_only_new_candles = True
stoploss = -0.05
use_exit_signal = True
startup_candle_count: int = 300
can_short = False
linear_roi_offset = DecimalParameter(
0.00, 0.02, default=0.005, space="sell", optimize=False, load=True
)
max_roi_time_long = IntParameter(0, 800, default=400, space="sell", optimize=False, load=True)
def informative_pairs(self):
whitelist_pairs = self.dp.current_whitelist()
corr_pairs = self.config["freqai"]["corr_pairlist"]
informative_pairs = []
for tf in self.config["freqai"]["timeframes"]:
for pair in whitelist_pairs:
informative_pairs.append((pair, tf))
for pair in corr_pairs:
if pair in whitelist_pairs:
continue # avoid duplication
informative_pairs.append((pair, tf))
return informative_pairs
def bot_start(self):
self.model = CustomModel(self.config)
def populate_any_indicators(
self, metadata, pair, df, tf, informative=None, coin="", set_generalized_indicators=False
):
"""
Function designed to automatically generate, name and merge features
from user indicated timeframes in the configuration file. User controls the indicators
passed to the training/prediction by prepending indicators with `'%-' + coin `
(see convention below). I.e. user should not prepend any supporting metrics
(e.g. bb_lowerband below) with % unless they explicitly want to pass that metric to the
model.
:params:
:pair: pair to be used as informative
:df: strategy dataframe which will receive merges from informatives
:tf: timeframe of the dataframe which will modify the feature names
:informative: the dataframe associated with the informative pair
:coin: the name of the coin which will modify the feature names.
"""
with self.model.bridge.lock:
if informative is None:
informative = self.dp.get_pair_dataframe(pair, tf)
# first loop is automatically duplicating indicators for time periods
for t in self.freqai_info["feature_parameters"]["indicator_periods"]:
t = int(t)
informative[f"%-{coin}rsi-period_{t}"] = ta.RSI(informative, timeperiod=t)
informative[f"%-{coin}mfi-period_{t}"] = ta.MFI(informative, timeperiod=t)
informative[f"%-{coin}adx-period_{t}"] = ta.ADX(informative, window=t)
informative[f"{coin}20sma-period_{t}"] = ta.SMA(informative, timeperiod=t)
informative[f"{coin}21ema-period_{t}"] = ta.EMA(informative, timeperiod=t)
informative[f"%-{coin}close_over_20sma-period_{t}"] = (
informative["close"] / informative[f"{coin}20sma-period_{t}"]
)
informative[f"%-{coin}mfi-period_{t}"] = ta.MFI(informative, timeperiod=t)
bollinger = qtpylib.bollinger_bands(
qtpylib.typical_price(informative), window=t, stds=2.2
)
informative[f"{coin}bb_lowerband-period_{t}"] = bollinger["lower"]
informative[f"{coin}bb_middleband-period_{t}"] = bollinger["mid"]
informative[f"{coin}bb_upperband-period_{t}"] = bollinger["upper"]
informative[f"%-{coin}bb_width-period_{t}"] = (
informative[f"{coin}bb_upperband-period_{t}"]
- informative[f"{coin}bb_lowerband-period_{t}"]
) / informative[f"{coin}bb_middleband-period_{t}"]
informative[f"%-{coin}close-bb_lower-period_{t}"] = (
informative["close"] / informative[f"{coin}bb_lowerband-period_{t}"]
)
informative[f"%-{coin}roc-period_{t}"] = ta.ROC(informative, timeperiod=t)
macd = ta.MACD(informative, timeperiod=t)
informative[f"%-{coin}macd-period_{t}"] = macd["macd"]
informative[f"%-{coin}relative_volume-period_{t}"] = (
informative["volume"] / informative["volume"].rolling(t).mean()
)
informative[f"%-{coin}pct-change"] = informative["close"].pct_change()
informative[f"%-{coin}raw_volume"] = informative["volume"]
informative[f"%-{coin}raw_price"] = informative["close"]
indicators = [col for col in informative if col.startswith("%")]
# This loop duplicates and shifts all indicators to add a sense of recency to data
for n in range(self.freqai_info["feature_parameters"]["shift"] + 1):
if n == 0:
continue
informative_shift = informative[indicators].shift(n)
informative_shift = informative_shift.add_suffix("_shift-" + str(n))
informative = pd.concat((informative, informative_shift), axis=1)
df = merge_informative_pair(df, informative, self.config["timeframe"], tf, ffill=True)
skip_columns = [
(s + "_" + tf) for s in ["date", "open", "high", "low", "close", "volume"]
]
df = df.drop(columns=skip_columns)
# Add generalized indicators here (because in live, it will call this
# function to populate indicators during training). Notice how we ensure not to
# add them multiple times
if set_generalized_indicators:
df["%-day_of_week"] = (df["date"].dt.dayofweek + 1) / 7
df["%-hour_of_day"] = (df["date"].dt.hour + 1) / 25
# user adds targets here by prepending them with &- (see convention below)
# If user wishes to use multiple targets, a multioutput prediction model
# needs to be used such as templates/CatboostPredictionMultiModel.py
df["&-s_close"] = (
df["close"]
.shift(-self.freqai_info["feature_parameters"]["period"])
.rolling(self.freqai_info["feature_parameters"]["period"])
.mean()
/ df["close"]
- 1
)
return df
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
self.freqai_info = self.config["freqai"]
self.pair = metadata["pair"]
sgi = True
# the following loops are necessary for building the features
# indicated by the user in the configuration file.
# All indicators must be populated by populate_any_indicators() for live functionality
# to work correctly.
for tf in self.freqai_info["timeframes"]:
dataframe = self.populate_any_indicators(
metadata,
self.pair,
dataframe.copy(),
tf,
coin=self.pair.split("/")[0] + "-",
set_generalized_indicators=sgi,
)
sgi = False
for pair in self.freqai_info["corr_pairlist"]:
if metadata["pair"] in pair:
continue # do not include whitelisted pair twice if it is in corr_pairlist
dataframe = self.populate_any_indicators(
metadata, pair, dataframe.copy(), tf, coin=pair.split("/")[0] + "-"
)
# the model will return 4 values, its prediction, an indication of whether or not the
# prediction should be accepted, the target mean/std values from the labels used during
# each training period.
dataframe = self.model.bridge.start(dataframe, metadata, self)
dataframe["target_roi"] = dataframe["&-s_close_mean"] + dataframe["&-s_close_std"] * 1.25
dataframe["sell_roi"] = dataframe["&-s_close_mean"] - dataframe["&-s_close_std"] * 1.25
return dataframe
def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
enter_long_conditions = [df["do_predict"] == 1, df["&-s_close"] > df["target_roi"]]
if enter_long_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_long_conditions), ["enter_long", "enter_tag"]
] = (1, "long")
enter_short_conditions = [df["do_predict"] == 1, df["&-s_close"] < df["sell_roi"]]
if enter_short_conditions:
df.loc[
reduce(lambda x, y: x & y, enter_short_conditions), ["enter_short", "enter_tag"]
] = (1, "short")
return df
def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
exit_long_conditions = [df["do_predict"] == 1, df["&-s_close"] < df["sell_roi"] * 0.25]
if exit_long_conditions:
df.loc[reduce(lambda x, y: x & y, exit_long_conditions), "exit_long"] = 1
exit_short_conditions = [df["do_predict"] == 1, df["&-s_close"] > df["target_roi"] * 0.25]
if exit_short_conditions:
df.loc[reduce(lambda x, y: x & y, exit_short_conditions), "exit_short"] = 1
return df
def get_ticker_indicator(self):
return int(self.config["timeframe"][:-1])
def custom_exit(
self, pair: str, trade: Trade, current_time, current_rate, current_profit, **kwargs
):
dataframe, _ = self.dp.get_analyzed_dataframe(pair=pair, timeframe=self.timeframe)
trade_date = timeframe_to_prev_date(self.config["timeframe"], trade.open_date_utc)
trade_candle = dataframe.loc[(dataframe["date"] == trade_date)]
if trade_candle.empty:
return None
trade_candle = trade_candle.squeeze()
follow_mode = self.config.get("freqai", {}).get("follow_mode", False)
if not follow_mode:
pair_dict = self.model.bridge.dd.pair_dict
else:
pair_dict = self.model.bridge.dd.follower_dict
entry_tag = trade.enter_tag
if (
"prediction" + entry_tag not in pair_dict[pair]
or pair_dict[pair]["prediction" + entry_tag] > 0
):
with self.model.bridge.lock:
pair_dict[pair]["prediction" + entry_tag] = abs(trade_candle["&-s_close"])
if not follow_mode:
self.model.bridge.dd.save_drawer_to_disk()
else:
self.model.bridge.dd.save_follower_dict_to_disk()
roi_price = pair_dict[pair]["prediction" + entry_tag]
roi_time = self.max_roi_time_long.value
roi_decay = roi_price * (
1 - ((current_time - trade.open_date_utc).seconds) / (roi_time * 60)
)
if roi_decay < 0:
roi_decay = self.linear_roi_offset.value
else:
roi_decay += self.linear_roi_offset.value
if current_profit > roi_decay:
return "roi_custom_win"
if current_profit < -roi_decay:
return "roi_custom_loss"
def confirm_trade_exit(
self,
pair: str,
trade: Trade,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
exit_reason: str,
current_time,
**kwargs,
) -> bool:
entry_tag = trade.enter_tag
follow_mode = self.config.get("freqai", {}).get("follow_mode", False)
if not follow_mode:
pair_dict = self.model.bridge.dd.pair_dict
else:
pair_dict = self.model.bridge.dd.follower_dict
with self.model.bridge.lock:
pair_dict[pair]["prediction" + entry_tag] = 0
if not follow_mode:
self.model.bridge.dd.save_drawer_to_disk()
else:
self.model.bridge.dd.save_follower_dict_to_disk()
return True
def confirm_trade_entry(
self,
pair: str,
order_type: str,
amount: float,
rate: float,
time_in_force: str,
current_time,
entry_tag,
side: str,
**kwargs,
) -> bool:
df, _ = self.dp.get_analyzed_dataframe(pair, self.timeframe)
last_candle = df.iloc[-1].squeeze()
if side == "long":
if rate > (last_candle["close"] * (1 + 0.0025)):
return False
else:
if rate < (last_candle["close"] * (1 - 0.0025)):
return False
return True