black formatting on freqai files

This commit is contained in:
robcaulk
2022-07-03 10:59:38 +02:00
parent 106131ff0f
commit ffb39a5029
7 changed files with 508 additions and 427 deletions

View File

@@ -29,6 +29,7 @@ logger = logging.getLogger(__name__)
def threaded(fn):
def wrapper(*args, **kwargs):
threading.Thread(target=fn, args=args, kwargs=kwargs).start()
return wrapper
@@ -46,7 +47,7 @@ class IFreqaiModel(ABC):
self.config = config
self.assert_config(self.config)
self.freqai_info = config["freqai"]
self.data_split_parameters = config.get('freqai', {}).get("data_split_parameters")
self.data_split_parameters = config.get("freqai", {}).get("data_split_parameters")
self.model_training_parameters = config.get("freqai", {}).get("model_training_parameters")
self.feature_parameters = config.get("freqai", {}).get("feature_parameters")
self.time_last_trained = None
@@ -58,23 +59,21 @@ class IFreqaiModel(ABC):
self.first = True
self.update_historic_data = 0
self.set_full_path()
self.follow_mode = self.freqai_info.get('follow_mode', False)
self.follow_mode = self.freqai_info.get("follow_mode", False)
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config, self.follow_mode)
self.lock = threading.Lock()
self.follow_mode = self.freqai_info.get('follow_mode', False)
self.identifier = self.freqai_info.get('identifier', 'no_id_provided')
self.follow_mode = self.freqai_info.get("follow_mode", False)
self.identifier = self.freqai_info.get("identifier", "no_id_provided")
self.scanning = False
self.ready_to_scan = False
self.first = True
self.keras = self.freqai_info.get('keras', False)
self.CONV_WIDTH = self.freqai_info.get('conv_width', 2)
self.keras = self.freqai_info.get("keras", False)
self.CONV_WIDTH = self.freqai_info.get("conv_width", 2)
def assert_config(self, config: Dict[str, Any]) -> None:
if not config.get('freqai', {}):
raise OperationalException(
"No freqai parameters found in configuration file."
)
if not config.get("freqai", {}):
raise OperationalException("No freqai parameters found in configuration file.")
def start(self, dataframe: DataFrame, metadata: dict, strategy: IStrategy) -> DataFrame:
"""
@@ -92,8 +91,7 @@ class IFreqaiModel(ABC):
self.dd.set_pair_dict_info(metadata)
if self.live:
self.dk = FreqaiDataKitchen(self.config, self.dd,
self.live, metadata["pair"])
self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
dk = self.start_live(dataframe, metadata, strategy, self.dk)
# For backtesting, each pair enters and then gets trained for each window along the
@@ -103,7 +101,7 @@ class IFreqaiModel(ABC):
# the concatenated results for the full backtesting period back to the strategy.
elif not self.follow_mode:
self.dk = FreqaiDataKitchen(self.config, self.dd, self.live, metadata["pair"])
logger.info(f'Training {len(self.dk.training_timeranges)} timeranges')
logger.info(f"Training {len(self.dk.training_timeranges)} timeranges")
dk = self.start_backtesting(dataframe, metadata, self.dk)
dataframe = self.remove_features_from_df(dk.return_dataframe)
@@ -120,14 +118,13 @@ class IFreqaiModel(ABC):
"""
while 1:
time.sleep(1)
for pair in self.config.get('exchange', {}).get('pair_whitelist'):
for pair in self.config.get("exchange", {}).get("pair_whitelist"):
(_, trained_timestamp, _, _) = self.dd.get_pair_dict_info(pair)
if self.dd.pair_dict[pair]['priority'] != 1:
if self.dd.pair_dict[pair]["priority"] != 1:
continue
dk = FreqaiDataKitchen(self.config, self.dd,
self.live, pair)
dk = FreqaiDataKitchen(self.config, self.dd, self.live, pair)
# file_exists = False
@@ -138,17 +135,21 @@ class IFreqaiModel(ABC):
# model_filename=model_filename,
# scanning=True)
(retrain,
new_trained_timerange,
data_load_timerange) = dk.check_if_new_training_required(trained_timestamp)
(
retrain,
new_trained_timerange,
data_load_timerange,
) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(pair, new_trained_timerange.stopts)
if retrain: # or not file_exists:
self.train_model_in_series(new_trained_timerange, pair,
strategy, dk, data_load_timerange)
self.train_model_in_series(
new_trained_timerange, pair, strategy, dk, data_load_timerange
)
def start_backtesting(self, dataframe: DataFrame, metadata: dict,
dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
def start_backtesting(
self, dataframe: DataFrame, metadata: dict, dk: FreqaiDataKitchen
) -> FreqaiDataKitchen:
"""
The main broad execution for backtesting. For backtesting, each pair enters and then gets
trained for each window along the sliding window defined by "train_period" (training window)
@@ -169,10 +170,8 @@ class IFreqaiModel(ABC):
# tr_backtest is the backtesting time range e.g. the week directly
# following tr_train. Both of these windows slide through the
# entire backtest
for tr_train, tr_backtest in zip(
dk.training_timeranges, dk.backtesting_timeranges
):
(_, _, _, _) = self.dd.get_pair_dict_info(metadata['pair'])
for tr_train, tr_backtest in zip(dk.training_timeranges, dk.backtesting_timeranges):
(_, _, _, _) = self.dd.get_pair_dict_info(metadata["pair"])
gc.collect()
dk.data = {} # clean the pair specific data between training window sliding
self.training_timerange = tr_train
@@ -181,40 +180,48 @@ class IFreqaiModel(ABC):
dataframe_backtest = dk.slice_dataframe(tr_backtest, dataframe)
trained_timestamp = tr_train # TimeRange.parse_timerange(tr_train)
tr_train_startts_str = datetime.datetime.utcfromtimestamp(
tr_train.startts).strftime('%Y-%m-%d %H:%M:%S')
tr_train_stopts_str = datetime.datetime.utcfromtimestamp(
tr_train.stopts).strftime('%Y-%m-%d %H:%M:%S')
tr_train_startts_str = datetime.datetime.utcfromtimestamp(tr_train.startts).strftime(
"%Y-%m-%d %H:%M:%S"
)
tr_train_stopts_str = datetime.datetime.utcfromtimestamp(tr_train.stopts).strftime(
"%Y-%m-%d %H:%M:%S"
)
logger.info("Training %s", metadata["pair"])
logger.info(f'Training {tr_train_startts_str} to {tr_train_stopts_str}')
logger.info(f"Training {tr_train_startts_str} to {tr_train_stopts_str}")
dk.data_path = Path(dk.full_path /
str("sub-train" + "-" + metadata['pair'].split("/")[0] +
str(int(trained_timestamp.stopts))))
if not self.model_exists(metadata["pair"], dk,
trained_timestamp=trained_timestamp.stopts):
self.model = self.train(dataframe_train, metadata['pair'], dk)
self.dd.pair_dict[metadata['pair']][
'trained_timestamp'] = trained_timestamp.stopts
dk.set_new_model_names(metadata['pair'], trained_timestamp)
dk.save_data(self.model, metadata['pair'], keras_model=self.keras)
dk.data_path = Path(
dk.full_path
/ str(
"sub-train"
+ "-"
+ metadata["pair"].split("/")[0]
+ str(int(trained_timestamp.stopts))
)
)
if not self.model_exists(
metadata["pair"], dk, trained_timestamp=trained_timestamp.stopts
):
self.model = self.train(dataframe_train, metadata["pair"], dk)
self.dd.pair_dict[metadata["pair"]]["trained_timestamp"] = trained_timestamp.stopts
dk.set_new_model_names(metadata["pair"], trained_timestamp)
dk.save_data(self.model, metadata["pair"], keras_model=self.keras)
else:
self.model = dk.load_data(metadata['pair'], keras_model=self.keras)
self.model = dk.load_data(metadata["pair"], keras_model=self.keras)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
preds, do_preds = self.predict(dataframe_backtest, dk)
dk.append_predictions(preds, do_preds, len(dataframe_backtest))
print('predictions', len(dk.full_predictions),
'do_predict', len(dk.full_do_predict))
print("predictions", len(dk.full_predictions), "do_predict", len(dk.full_do_predict))
dk.fill_predictions(len(dataframe))
return dk
def start_live(self, dataframe: DataFrame, metadata: dict,
strategy: IStrategy, dk: FreqaiDataKitchen) -> FreqaiDataKitchen:
def start_live(
self, dataframe: DataFrame, metadata: dict, strategy: IStrategy, dk: FreqaiDataKitchen
) -> FreqaiDataKitchen:
"""
The main broad execution for dry/live. This function will check if a retraining should be
performed, and if so, retrain and reset the model.
@@ -232,14 +239,11 @@ class IFreqaiModel(ABC):
self.dd.update_follower_metadata()
# get the model metadata associated with the current pair
(_,
trained_timestamp,
_,
return_null_array) = self.dd.get_pair_dict_info(metadata['pair'])
(_, trained_timestamp, _, return_null_array) = self.dd.get_pair_dict_info(metadata["pair"])
# if the metadata doesnt exist, the follower returns null arrays to strategy
if self.follow_mode and return_null_array:
logger.info('Returning null array from follower to strategy')
logger.info("Returning null array from follower to strategy")
self.dd.return_null_values_to_strategy(dataframe, dk)
return dk
@@ -253,16 +257,18 @@ class IFreqaiModel(ABC):
# if not trainable, load existing data
if not self.follow_mode:
(_,
new_trained_timerange,
data_load_timerange) = dk.check_if_new_training_required(trained_timestamp)
dk.set_paths(metadata['pair'], new_trained_timerange.stopts)
(_, new_trained_timerange, data_load_timerange) = dk.check_if_new_training_required(
trained_timestamp
)
dk.set_paths(metadata["pair"], new_trained_timerange.stopts)
# download candle history if it is not already in memory
if not self.dd.historic_data:
logger.info('Downloading all training data for all pairs in whitelist and '
'corr_pairlist, this may take a while if you do not have the '
'data saved')
logger.info(
"Downloading all training data for all pairs in whitelist and "
"corr_pairlist, this may take a while if you do not have the "
"data saved"
)
dk.download_all_data_for_training(data_load_timerange)
dk.load_all_pair_histories(data_load_timerange)
@@ -271,53 +277,47 @@ class IFreqaiModel(ABC):
self.start_scanning(strategy)
elif self.follow_mode:
dk.set_paths(metadata['pair'], trained_timestamp)
logger.info('FreqAI instance set to follow_mode, finding existing pair'
f'using { self.identifier }')
dk.set_paths(metadata["pair"], trained_timestamp)
logger.info(
"FreqAI instance set to follow_mode, finding existing pair"
f"using { self.identifier }"
)
# load the model and associated data into the data kitchen
self.model = dk.load_data(coin=metadata['pair'], keras_model=self.keras)
self.model = dk.load_data(coin=metadata["pair"], keras_model=self.keras)
if not self.model:
logger.warning('No model ready, returning null values to strategy.')
logger.warning("No model ready, returning null values to strategy.")
self.dd.return_null_values_to_strategy(dataframe, dk)
return dk
# ensure user is feeding the correct indicators to the model
self.check_if_feature_list_matches_strategy(dataframe, dk)
self.build_strategy_return_arrays(dataframe, dk, metadata['pair'], trained_timestamp)
self.build_strategy_return_arrays(dataframe, dk, metadata["pair"], trained_timestamp)
return dk
def build_strategy_return_arrays(self, dataframe: DataFrame,
dk: FreqaiDataKitchen, pair: str,
trained_timestamp: int) -> None:
def build_strategy_return_arrays(
self, dataframe: DataFrame, dk: FreqaiDataKitchen, pair: str, trained_timestamp: int
) -> None:
# hold the historical predictions in memory so we are sending back
# correct array to strategy
if pair not in self.dd.model_return_values:
pred_df, do_preds = self.predict(dataframe, dk)
# mypy doesnt like the typing in else statement, so we need to explicitly add to
# dataframe separately
# for label in dk.label_list:
# dataframe[label] = pred_df[label]
# dataframe['do_predict'] = do_preds
# dk.append_predictions(preds, do_preds, len(dataframe))
# dk.fill_predictions(len(dataframe))
self.dd.set_initial_return_values(pair, dk, pred_df, do_preds)
dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
return
elif self.dk.check_if_model_expired(trained_timestamp):
pred_df = DataFrame(np.zeros((2, len(dk.label_list))), columns=dk.label_list)
do_preds, dk.DI_values = np.ones(2) * 2, np.zeros(2)
logger.warning('Model expired, returning null values to strategy. Strategy '
'construction should take care to consider this event with '
'prediction == 0 and do_predict == 2')
logger.warning(
"Model expired, returning null values to strategy. Strategy "
"construction should take care to consider this event with "
"prediction == 0 and do_predict == 2"
)
else:
# Only feed in the most recent candle for prediction in live scenario
pred_df, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dk, first=False)
@@ -327,8 +327,9 @@ class IFreqaiModel(ABC):
return
def check_if_feature_list_matches_strategy(self, dataframe: DataFrame,
dk: FreqaiDataKitchen) -> None:
def check_if_feature_list_matches_strategy(
self, dataframe: DataFrame, dk: FreqaiDataKitchen
) -> None:
"""
Ensure user is passing the proper feature set if they are reusing an `identifier` pointing
to a folder holding existing models.
@@ -337,16 +338,18 @@ class IFreqaiModel(ABC):
dk: FreqaiDataKitchen = non-persistent data container/analyzer for current coin/bot loop
"""
dk.find_features(dataframe)
if 'training_features_list_raw' in dk.data:
feature_list = dk.data['training_features_list_raw']
if "training_features_list_raw" in dk.data:
feature_list = dk.data["training_features_list_raw"]
else:
feature_list = dk.training_features_list
if dk.training_features_list != feature_list:
raise OperationalException("Trying to access pretrained model with `identifier` "
"but found different features furnished by current strategy."
"Change `identifer` to train from scratch, or ensure the"
"strategy is furnishing the same features as the pretrained"
"model")
raise OperationalException(
"Trying to access pretrained model with `identifier` "
"but found different features furnished by current strategy."
"Change `identifer` to train from scratch, or ensure the"
"strategy is furnishing the same features as the pretrained"
"model"
)
def data_cleaning_train(self, dk: FreqaiDataKitchen) -> None:
"""
@@ -356,13 +359,13 @@ class IFreqaiModel(ABC):
of how outlier data points are dropped from the dataframe used for training.
"""
if self.freqai_info.get('feature_parameters', {}).get('principal_component_analysis'):
if self.freqai_info.get("feature_parameters", {}).get("principal_component_analysis"):
dk.principal_component_analysis()
if self.freqai_info.get('feature_parameters', {}).get('use_SVM_to_remove_outliers'):
if self.freqai_info.get("feature_parameters", {}).get("use_SVM_to_remove_outliers"):
dk.use_SVM_to_remove_outliers(predict=False)
if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold"):
dk.data["avg_mean_dist"] = dk.compute_distances()
# if self.feature_parameters["determine_statistical_distributions"]:
@@ -381,13 +384,13 @@ class IFreqaiModel(ABC):
of how the do_predict vector is modified. do_predict is ultimately passed back to strategy
for buy signals.
"""
if self.freqai_info.get('feature_parameters', {}).get('principal_component_analysis'):
if self.freqai_info.get("feature_parameters", {}).get("principal_component_analysis"):
dk.pca_transform(dataframe)
if self.freqai_info.get('feature_parameters', {}).get('use_SVM_to_remove_outliers'):
if self.freqai_info.get("feature_parameters", {}).get("use_SVM_to_remove_outliers"):
dk.use_SVM_to_remove_outliers(predict=True)
if self.freqai_info.get('feature_parameters', {}).get('DI_threshold'):
if self.freqai_info.get("feature_parameters", {}).get("DI_threshold"):
dk.check_if_pred_in_training_spaces()
# if self.feature_parameters["determine_statistical_distributions"]:
@@ -395,8 +398,14 @@ class IFreqaiModel(ABC):
# if self.feature_parameters["remove_outliers"]:
# dk.remove_outliers(predict=True) # creates dropped index
def model_exists(self, pair: str, dk: FreqaiDataKitchen, trained_timestamp: int = None,
model_filename: str = '', scanning: bool = False) -> bool:
def model_exists(
self,
pair: str,
dk: FreqaiDataKitchen,
trained_timestamp: int = None,
model_filename: str = "",
scanning: bool = False,
) -> bool:
"""
Given a pair and path, check if a model already exists
:param pair: pair e.g. BTC/USD
@@ -416,25 +425,33 @@ class IFreqaiModel(ABC):
return file_exists
def set_full_path(self) -> None:
self.full_path = Path(self.config['user_data_dir'] /
"models" /
str(self.freqai_info.get('identifier')))
self.full_path = Path(
self.config["user_data_dir"] / "models" / str(self.freqai_info.get("identifier"))
)
self.full_path.mkdir(parents=True, exist_ok=True)
shutil.copy(self.config['config_files'][0], Path(self.full_path,
Path(self.config['config_files'][0]).name))
shutil.copy(
self.config["config_files"][0],
Path(self.full_path, Path(self.config["config_files"][0]).name),
)
def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
"""
Remove the features from the dataframe before returning it to strategy. This keeps it
compact for Frequi purposes.
"""
to_keep = [col for col in dataframe.columns
if not col.startswith('%') or col.startswith('%%')]
to_keep = [
col for col in dataframe.columns if not col.startswith("%") or col.startswith("%%")
]
return dataframe[to_keep]
def train_model_in_series(self, new_trained_timerange: TimeRange, pair: str,
strategy: IStrategy, dk: FreqaiDataKitchen,
data_load_timerange: TimeRange):
def train_model_in_series(
self,
new_trained_timerange: TimeRange,
pair: str,
strategy: IStrategy,
dk: FreqaiDataKitchen,
data_load_timerange: TimeRange,
):
"""
Retreive data and train model in single threaded mode (only used if model directory is empty
upon startup for dry/live )
@@ -447,13 +464,13 @@ class IFreqaiModel(ABC):
(larger than new_trained_timerange so that new_trained_timerange does not contain any NaNs)
"""
corr_dataframes, base_dataframes = dk.get_base_and_corr_dataframes(data_load_timerange,
pair)
corr_dataframes, base_dataframes = dk.get_base_and_corr_dataframes(
data_load_timerange, pair
)
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(strategy,
corr_dataframes,
base_dataframes,
pair)
unfiltered_dataframe = dk.use_strategy_to_populate_indicators(
strategy, corr_dataframes, base_dataframes, pair
)
unfiltered_dataframe = dk.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
@@ -462,15 +479,15 @@ class IFreqaiModel(ABC):
model = self.train(unfiltered_dataframe, pair, dk)
self.dd.pair_dict[pair]['trained_timestamp'] = new_trained_timerange.stopts
self.dd.pair_dict[pair]["trained_timestamp"] = new_trained_timerange.stopts
dk.set_new_model_names(pair, new_trained_timerange)
self.dd.pair_dict[pair]['first'] = False
if self.dd.pair_dict[pair]['priority'] == 1 and self.scanning:
self.dd.pair_dict[pair]["first"] = False
if self.dd.pair_dict[pair]["priority"] == 1 and self.scanning:
with self.lock:
self.dd.pair_to_end_of_training_queue(pair)
dk.save_data(model, coin=pair, keras_model=self.keras)
if self.freqai_info.get('purge_old_models', False):
if self.freqai_info.get("purge_old_models", False):
self.dd.purge_old_models()
# self.retrain = False
@@ -503,8 +520,9 @@ class IFreqaiModel(ABC):
return
@abstractmethod
def predict(self, dataframe: DataFrame,
dk: FreqaiDataKitchen, first: bool = True) -> Tuple[DataFrame, npt.ArrayLike]:
def predict(
self, dataframe: DataFrame, dk: FreqaiDataKitchen, first: bool = True
) -> Tuple[DataFrame, npt.ArrayLike]:
"""
Filter the prediction features data and predict with it.
:param: