diff --git a/docs/configuration.md b/docs/configuration.md
index b3dbcd817..556414e21 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -225,7 +225,7 @@ Mandatory parameters are marked as **Required**, which means that they are requi
| `webhook.webhookexitcancel` | Payload to send on exit order cancel. Only required if `webhook.enabled` is `true`. See the [webhook documentation](webhook-config.md) for more details.
**Datatype:** String
| `webhook.webhookexitfill` | Payload to send on exit order filled. Only required if `webhook.enabled` is `true`. See the [webhook documentation](webhook-config.md) for more details.
**Datatype:** String
| `webhook.webhookstatus` | Payload to send on status calls. Only required if `webhook.enabled` is `true`. See the [webhook documentation](webhook-config.md) for more details.
**Datatype:** String
-| | **Rest API / FreqUI / External Signals**
+| | **Rest API / FreqUI / Producer-Consumer**
| `api_server.enabled` | Enable usage of API Server. See the [API Server documentation](rest-api.md) for more details.
**Datatype:** Boolean
| `api_server.listen_ip_address` | Bind IP address. See the [API Server documentation](rest-api.md) for more details.
**Datatype:** IPv4
| `api_server.listen_port` | Bind Port. See the [API Server documentation](rest-api.md) for more details.
**Datatype:** Integer between 1024 and 65535
diff --git a/freqtrade/freqai/base_models/BaseClassifierModel.py b/freqtrade/freqai/base_models/BaseClassifierModel.py
index 5142ffb0d..70f212d2a 100644
--- a/freqtrade/freqai/base_models/BaseClassifierModel.py
+++ b/freqtrade/freqai/base_models/BaseClassifierModel.py
@@ -1,4 +1,5 @@
import logging
+from time import time
from typing import Any, Tuple
import numpy as np
@@ -32,7 +33,9 @@ class BaseClassifierModel(IFreqaiModel):
:model: Trained model which can be used to inference (self.predict)
"""
- logger.info("-------------------- Starting training " f"{pair} --------------------")
+ logger.info(f"-------------------- Starting training {pair} --------------------")
+
+ start_time = time()
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
@@ -45,10 +48,10 @@ class BaseClassifierModel(IFreqaiModel):
start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
logger.info(f"-------------------- Training on data from {start_date} to "
- f"{end_date}--------------------")
+ f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
- if not self.freqai_info.get('fit_live_predictions', 0) or not self.live:
+ if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -57,13 +60,16 @@ class BaseClassifierModel(IFreqaiModel):
self.data_cleaning_train(dk)
logger.info(
- f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
+ f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
- logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
+ logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
model = self.fit(data_dictionary, dk)
- logger.info(f"--------------------done training {pair}--------------------")
+ end_time = time()
+
+ logger.info(f"-------------------- Done training {pair} "
+ f"({end_time - start_time:.2f} secs) --------------------")
return model
diff --git a/freqtrade/freqai/base_models/BaseRegressionModel.py b/freqtrade/freqai/base_models/BaseRegressionModel.py
index 1d87e42c0..2450bf305 100644
--- a/freqtrade/freqai/base_models/BaseRegressionModel.py
+++ b/freqtrade/freqai/base_models/BaseRegressionModel.py
@@ -1,4 +1,5 @@
import logging
+from time import time
from typing import Any, Tuple
import numpy as np
@@ -31,7 +32,9 @@ class BaseRegressionModel(IFreqaiModel):
:model: Trained model which can be used to inference (self.predict)
"""
- logger.info("-------------------- Starting training " f"{pair} --------------------")
+ logger.info(f"-------------------- Starting training {pair} --------------------")
+
+ start_time = time()
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
@@ -44,10 +47,10 @@ class BaseRegressionModel(IFreqaiModel):
start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
logger.info(f"-------------------- Training on data from {start_date} to "
- f"{end_date}--------------------")
+ f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
- if not self.freqai_info.get('fit_live_predictions', 0) or not self.live:
+ if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -56,13 +59,16 @@ class BaseRegressionModel(IFreqaiModel):
self.data_cleaning_train(dk)
logger.info(
- f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
+ f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
- logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
+ logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
model = self.fit(data_dictionary, dk)
- logger.info(f"--------------------done training {pair}--------------------")
+ end_time = time()
+
+ logger.info(f"-------------------- Done training {pair} "
+ f"({end_time - start_time:.2f} secs) --------------------")
return model
diff --git a/freqtrade/freqai/base_models/BaseTensorFlowModel.py b/freqtrade/freqai/base_models/BaseTensorFlowModel.py
index eea80f3a2..00f9d6cba 100644
--- a/freqtrade/freqai/base_models/BaseTensorFlowModel.py
+++ b/freqtrade/freqai/base_models/BaseTensorFlowModel.py
@@ -1,4 +1,5 @@
import logging
+from time import time
from typing import Any
from pandas import DataFrame
@@ -28,7 +29,9 @@ class BaseTensorFlowModel(IFreqaiModel):
:model: Trained model which can be used to inference (self.predict)
"""
- logger.info("-------------------- Starting training " f"{pair} --------------------")
+ logger.info(f"-------------------- Starting training {pair} --------------------")
+
+ start_time = time()
# filter the features requested by user in the configuration file and elegantly handle NaNs
features_filtered, labels_filtered = dk.filter_features(
@@ -41,10 +44,10 @@ class BaseTensorFlowModel(IFreqaiModel):
start_date = unfiltered_df["date"].iloc[0].strftime("%Y-%m-%d")
end_date = unfiltered_df["date"].iloc[-1].strftime("%Y-%m-%d")
logger.info(f"-------------------- Training on data from {start_date} to "
- f"{end_date}--------------------")
+ f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
- if not self.freqai_info.get('fit_live_predictions', 0) or not self.live:
+ if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
@@ -53,12 +56,15 @@ class BaseTensorFlowModel(IFreqaiModel):
self.data_cleaning_train(dk)
logger.info(
- f'Training model on {len(dk.data_dictionary["train_features"].columns)}' " features"
+ f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
- logger.info(f'Training model on {len(data_dictionary["train_features"])} data points')
+ logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
model = self.fit(data_dictionary, dk)
- logger.info(f"--------------------done training {pair}--------------------")
+ end_time = time()
+
+ logger.info(f"-------------------- Done training {pair} "
+ f"({end_time - start_time:.2f} secs) --------------------")
return model
diff --git a/freqtrade/freqai/base_models/FreqaiMultiOutputRegressor.py b/freqtrade/freqai/base_models/FreqaiMultiOutputRegressor.py
index a9db81e31..54136d5e0 100644
--- a/freqtrade/freqai/base_models/FreqaiMultiOutputRegressor.py
+++ b/freqtrade/freqai/base_models/FreqaiMultiOutputRegressor.py
@@ -1,4 +1,3 @@
-
from joblib import Parallel
from sklearn.multioutput import MultiOutputRegressor, _fit_estimator
from sklearn.utils.fixes import delayed
diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py
index 5850cdeb3..bdc418083 100644
--- a/freqtrade/freqai/freqai_interface.py
+++ b/freqtrade/freqai/freqai_interface.py
@@ -244,7 +244,8 @@ class IFreqaiModel(ABC):
# following tr_train. Both of these windows slide through the
# entire backtest
for tr_train, tr_backtest in zip(dk.training_timeranges, dk.backtesting_timeranges):
- (_, _, _) = self.dd.get_pair_dict_info(metadata["pair"])
+ pair = metadata["pair"]
+ (_, _, _) = self.dd.get_pair_dict_info(pair)
train_it += 1
total_trains = len(dk.backtesting_timeranges)
self.training_timerange = tr_train
@@ -266,12 +267,10 @@ class IFreqaiModel(ABC):
trained_timestamp_int = int(trained_timestamp.stopts)
dk.data_path = Path(
- dk.full_path
- /
- f"sub-train-{metadata['pair'].split('/')[0]}_{trained_timestamp_int}"
+ dk.full_path / f"sub-train-{pair.split('/')[0]}_{trained_timestamp_int}"
)
- dk.set_new_model_names(metadata["pair"], trained_timestamp)
+ dk.set_new_model_names(pair, trained_timestamp)
if dk.check_if_backtest_prediction_exists():
append_df = dk.get_backtesting_prediction()
@@ -281,15 +280,15 @@ class IFreqaiModel(ABC):
metadata["pair"], dk, trained_timestamp=trained_timestamp_int
):
dk.find_features(dataframe_train)
- self.model = self.train(dataframe_train, metadata["pair"], dk)
- self.dd.pair_dict[metadata["pair"]]["trained_timestamp"] = int(
+ self.model = self.train(dataframe_train, pair, dk)
+ self.dd.pair_dict[pair]["trained_timestamp"] = int(
trained_timestamp.stopts)
if self.save_backtest_models:
logger.info('Saving backtest model to disk.')
- self.dd.save_data(self.model, metadata["pair"], dk)
+ self.dd.save_data(self.model, pair, dk)
else:
- self.model = self.dd.load_data(metadata["pair"], dk)
+ self.model = self.dd.load_data(pair, dk)
self.check_if_feature_list_matches_strategy(dataframe_train, dk)
diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py
index a57fac144..99ba39f76 100644
--- a/freqtrade/rpc/external_message_consumer.py
+++ b/freqtrade/rpc/external_message_consumer.py
@@ -217,11 +217,14 @@ class ExternalMessageConsumer:
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
-
continue
- except websockets.exceptions.ConnectionClosedOK:
- # Successfully closed, just keep trying to connect again indefinitely
+ except (
+ websockets.exceptions.ConnectionClosedError,
+ websockets.exceptions.ConnectionClosedOK
+ ):
+ # Just keep trying to connect again indefinitely
+ await asyncio.sleep(self.sleep_time)
continue
except Exception as e:
diff --git a/tests/rpc/test_rpc_emc.py b/tests/rpc/test_rpc_emc.py
index 9aca88b4a..41faaf249 100644
--- a/tests/rpc/test_rpc_emc.py
+++ b/tests/rpc/test_rpc_emc.py
@@ -200,43 +200,60 @@ async def test_emc_create_connection_success(default_conf, caplog, mocker):
emc.shutdown()
-# async def test_emc_create_connection_invalid(default_conf, caplog, mocker):
-# default_conf.update({
-# "external_message_consumer": {
-# "enabled": True,
-# "producers": [
-# {
-# "name": "default",
-# "host": _TEST_WS_HOST,
-# "port": _TEST_WS_PORT,
-# "ws_token": _TEST_WS_TOKEN
-# }
-# ],
-# "wait_timeout": 60,
-# "ping_timeout": 60,
-# "sleep_timeout": 60
-# }
-# })
-#
-# mocker.patch('freqtrade.rpc.external_message_consumer.ExternalMessageConsumer.start',
-# MagicMock())
-#
-# test_producer = default_conf['external_message_consumer']['producers'][0]
-# lock = asyncio.Lock()
-#
-# dp = DataProvider(default_conf, None, None, None)
-# emc = ExternalMessageConsumer(default_conf, dp)
-#
-# try:
-# # Test invalid URL
-# test_producer['url'] = "tcp://null:8080/api/v1/message/ws"
-# emc._running = True
-# await emc._create_connection(test_producer, lock)
-# emc._running = False
-#
-# assert log_has_re(r".+is an invalid WebSocket URL.+", caplog)
-# finally:
-# emc.shutdown()
+async def test_emc_create_connection_invalid_port(default_conf, caplog, mocker):
+ default_conf.update({
+ "external_message_consumer": {
+ "enabled": True,
+ "producers": [
+ {
+ "name": "default",
+ "host": _TEST_WS_HOST,
+ "port": -1,
+ "ws_token": _TEST_WS_TOKEN
+ }
+ ],
+ "wait_timeout": 60,
+ "ping_timeout": 60,
+ "sleep_timeout": 60
+ }
+ })
+
+ dp = DataProvider(default_conf, None, None, None)
+ emc = ExternalMessageConsumer(default_conf, dp)
+
+ try:
+ await asyncio.sleep(0.01)
+ assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
+ finally:
+ emc.shutdown()
+
+
+async def test_emc_create_connection_invalid_host(default_conf, caplog, mocker):
+ default_conf.update({
+ "external_message_consumer": {
+ "enabled": True,
+ "producers": [
+ {
+ "name": "default",
+ "host": "10000.1241..2121/",
+ "port": _TEST_WS_PORT,
+ "ws_token": _TEST_WS_TOKEN
+ }
+ ],
+ "wait_timeout": 60,
+ "ping_timeout": 60,
+ "sleep_timeout": 60
+ }
+ })
+
+ dp = DataProvider(default_conf, None, None, None)
+ emc = ExternalMessageConsumer(default_conf, dp)
+
+ try:
+ await asyncio.sleep(0.01)
+ assert log_has_re(r".+ is an invalid WebSocket URL .+", caplog)
+ finally:
+ emc.shutdown()
async def test_emc_create_connection_error(default_conf, caplog, mocker):
@@ -376,7 +393,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
"ws_token": _TEST_WS_TOKEN
}
],
- "wait_timeout": 1,
+ "wait_timeout": 0.1,
"ping_timeout": 1,
"sleep_time": 1
}
@@ -396,7 +413,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
class TestChannel:
async def recv(self, *args, **kwargs):
- await asyncio.sleep(10)
+ await asyncio.sleep(0.2)
async def ping(self, *args, **kwargs):
return asyncio.Future()