From a323acf343ba1f5622cd9e47ea13905eb97df4f4 Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 30 Oct 2022 09:46:12 +0100 Subject: [PATCH 01/15] Improve ShufflePairlist to shuffle only once per candle --- docs/includes/pairlists.md | 12 ++++++++++++ freqtrade/plugins/pairlist/IPairList.py | 1 - freqtrade/plugins/pairlist/ShuffleFilter.py | 17 +++++++++++++++-- tests/plugins/test_pairlist.py | 21 +++++++++++++++++---- 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/docs/includes/pairlists.md b/docs/includes/pairlists.md index 7dff75a02..5c1bf8274 100644 --- a/docs/includes/pairlists.md +++ b/docs/includes/pairlists.md @@ -286,6 +286,18 @@ Min price precision for SHITCOIN/BTC is 8 decimals. If its price is 0.00000011 - Shuffles (randomizes) pairs in the pairlist. It can be used for preventing the bot from trading some of the pairs more frequently then others when you want all pairs be treated with the same priority. +By default, ShuffleFilter will shuffle pairs once per candle. +To shuffle on every iteration, set `"shuffle"` to `"iteration"` instead of the default of `"candle"`. + +``` json + { + "method": "ShuffleFilter", + "shuffle": "candle", + "seed": 42 + } + +``` + !!! Tip You may set the `seed` value for this Pairlist to obtain reproducible results, which can be useful for repeated backtesting sessions. If `seed` is not set, the pairs are shuffled in the non-repeatable random order. ShuffleFilter will automatically detect runmodes and apply the `seed` only for backtesting modes - if a `seed` value is set. diff --git a/freqtrade/plugins/pairlist/IPairList.py b/freqtrade/plugins/pairlist/IPairList.py index 660d6228c..d0382c778 100644 --- a/freqtrade/plugins/pairlist/IPairList.py +++ b/freqtrade/plugins/pairlist/IPairList.py @@ -36,7 +36,6 @@ class IPairList(LoggingMixin, ABC): self._pairlistconfig = pairlistconfig self._pairlist_pos = pairlist_pos self.refresh_period = self._pairlistconfig.get('refresh_period', 1800) - self._last_refresh = 0 LoggingMixin.__init__(self, logger, self.refresh_period) @property diff --git a/freqtrade/plugins/pairlist/ShuffleFilter.py b/freqtrade/plugins/pairlist/ShuffleFilter.py index 1bc114d4e..e67ef52e8 100644 --- a/freqtrade/plugins/pairlist/ShuffleFilter.py +++ b/freqtrade/plugins/pairlist/ShuffleFilter.py @@ -3,16 +3,20 @@ Shuffle pair list filter """ import logging import random -from typing import Any, Dict, List +from typing import Any, Dict, List, Literal from freqtrade.constants import Config from freqtrade.enums import RunMode +from freqtrade.exchange import timeframe_to_seconds from freqtrade.exchange.types import Tickers from freqtrade.plugins.pairlist.IPairList import IPairList +from freqtrade.util.periodic_cache import PeriodicCache logger = logging.getLogger(__name__) +ShuffleValues = Literal['candle', 'iteration'] + class ShuffleFilter(IPairList): @@ -31,6 +35,9 @@ class ShuffleFilter(IPairList): logger.info(f"Backtesting mode detected, applying seed value: {self._seed}") self._random = random.Random(self._seed) + self._shuffle: ShuffleValues = pairlistconfig.get('shuffle', 'candle') + self.__pairlist_cache = PeriodicCache( + maxsize=1000, ttl=timeframe_to_seconds(self._config['timeframe'])) @property def needstickers(self) -> bool: @@ -45,7 +52,7 @@ class ShuffleFilter(IPairList): """ Short whitelist method description - used for startup-messages """ - return (f"{self.name} - Shuffling pairs" + + return (f"{self.name} - Shuffling pairs every {self._shuffle}" + (f", seed = {self._seed}." 
if self._seed is not None else ".")) def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]: @@ -56,7 +63,13 @@ class ShuffleFilter(IPairList): :param tickers: Tickers (from exchange.get_tickers). May be cached. :return: new whitelist """ + pairlist_bef = tuple(pairlist) + pairlist_new = self.__pairlist_cache.get(pairlist_bef) + if pairlist_new and self._shuffle == 'candle': + # Use cached pairlist. + return pairlist_new # Shuffle is done inplace self._random.shuffle(pairlist) + self.__pairlist_cache[pairlist_bef] = pairlist return pairlist diff --git a/tests/plugins/test_pairlist.py b/tests/plugins/test_pairlist.py index f0b983063..359291476 100644 --- a/tests/plugins/test_pairlist.py +++ b/tests/plugins/test_pairlist.py @@ -2,6 +2,8 @@ import logging import time +from copy import deepcopy +from datetime import timedelta from unittest.mock import MagicMock, PropertyMock import pandas as pd @@ -719,15 +721,26 @@ def test_PerformanceFilter_error(mocker, whitelist_conf, caplog) -> None: def test_ShuffleFilter_init(mocker, whitelist_conf, caplog) -> None: whitelist_conf['pairlists'] = [ {"method": "StaticPairList"}, - {"method": "ShuffleFilter", "seed": 42} + {"method": "ShuffleFilter", "seed": 43} ] exchange = get_patched_exchange(mocker, whitelist_conf) - PairListManager(exchange, whitelist_conf) - assert log_has("Backtesting mode detected, applying seed value: 42", caplog) + plm = PairListManager(exchange, whitelist_conf) + assert log_has("Backtesting mode detected, applying seed value: 43", caplog) + + with time_machine.travel("2021-09-01 05:01:00 +00:00") as t: + plm.refresh_pairlist() + pl1 = deepcopy(plm.whitelist) + plm.refresh_pairlist() + assert plm.whitelist == pl1 + + t.shift(timedelta(minutes=10)) + plm.refresh_pairlist() + assert plm.whitelist != pl1 + caplog.clear() whitelist_conf['runmode'] = RunMode.DRY_RUN - PairListManager(exchange, whitelist_conf) + plm = PairListManager(exchange, whitelist_conf) assert not log_has("Backtesting mode detected, applying seed value: 42", caplog) assert log_has("Live mode detected, not applying seed.", caplog) From 5013351143489d3eb48507340a651eadf6b5abed Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 30 Oct 2022 09:48:55 +0100 Subject: [PATCH 02/15] Rename "shuffle" parameter to "shuffle_freq" --- docs/includes/pairlists.md | 4 ++-- freqtrade/plugins/pairlist/ShuffleFilter.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/includes/pairlists.md b/docs/includes/pairlists.md index 5c1bf8274..dd3149e98 100644 --- a/docs/includes/pairlists.md +++ b/docs/includes/pairlists.md @@ -287,12 +287,12 @@ Min price precision for SHITCOIN/BTC is 8 decimals. If its price is 0.00000011 - Shuffles (randomizes) pairs in the pairlist. It can be used for preventing the bot from trading some of the pairs more frequently then others when you want all pairs be treated with the same priority. By default, ShuffleFilter will shuffle pairs once per candle. -To shuffle on every iteration, set `"shuffle"` to `"iteration"` instead of the default of `"candle"`. +To shuffle on every iteration, set `"shuffle_frequency"` to `"iteration"` instead of the default of `"candle"`. 
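For example, a configuration that reshuffles on every iteration could look like this (illustrative only, using the `shuffle_frequency` values introduced by this patch):

``` json
    {
        "method": "ShuffleFilter",
        "shuffle_frequency": "iteration",
        "seed": 42
    }
```

The default behaviour, shuffling once per candle, is shown in the example below.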
``` json { "method": "ShuffleFilter", - "shuffle": "candle", + "shuffle_frequency": "candle", "seed": 42 } diff --git a/freqtrade/plugins/pairlist/ShuffleFilter.py b/freqtrade/plugins/pairlist/ShuffleFilter.py index e67ef52e8..76d7600d2 100644 --- a/freqtrade/plugins/pairlist/ShuffleFilter.py +++ b/freqtrade/plugins/pairlist/ShuffleFilter.py @@ -35,7 +35,7 @@ class ShuffleFilter(IPairList): logger.info(f"Backtesting mode detected, applying seed value: {self._seed}") self._random = random.Random(self._seed) - self._shuffle: ShuffleValues = pairlistconfig.get('shuffle', 'candle') + self._shuffle_freq: ShuffleValues = pairlistconfig.get('shuffle_frequency', 'candle') self.__pairlist_cache = PeriodicCache( maxsize=1000, ttl=timeframe_to_seconds(self._config['timeframe'])) @@ -52,7 +52,7 @@ class ShuffleFilter(IPairList): """ Short whitelist method description - used for startup-messages """ - return (f"{self.name} - Shuffling pairs every {self._shuffle}" + + return (f"{self.name} - Shuffling pairs every {self._shuffle_freq}" + (f", seed = {self._seed}." if self._seed is not None else ".")) def filter_pairlist(self, pairlist: List[str], tickers: Tickers) -> List[str]: @@ -65,7 +65,7 @@ class ShuffleFilter(IPairList): """ pairlist_bef = tuple(pairlist) pairlist_new = self.__pairlist_cache.get(pairlist_bef) - if pairlist_new and self._shuffle == 'candle': + if pairlist_new and self._shuffle_freq == 'candle': # Use cached pairlist. return pairlist_new # Shuffle is done inplace From 2afa185dc61142d686d834cf592ab72e174297b9 Mon Sep 17 00:00:00 2001 From: robcaulk Date: Wed, 2 Nov 2022 18:34:56 +0100 Subject: [PATCH 03/15] add integrated tests for PCA and DBSCAN --- tests/freqai/test_freqai_interface.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/freqai/test_freqai_interface.py b/tests/freqai/test_freqai_interface.py index c46f9e815..2bc65d52e 100644 --- a/tests/freqai/test_freqai_interface.py +++ b/tests/freqai/test_freqai_interface.py @@ -27,13 +27,13 @@ def is_mac() -> bool: return "Darwin" in machine -@pytest.mark.parametrize('model', [ - 'LightGBMRegressor', - 'XGBoostRegressor', - 'XGBoostRFRegressor', - 'CatboostRegressor', +@pytest.mark.parametrize('model, pca, dbscan', [ + ('LightGBMRegressor', True, False), + ('XGBoostRegressor', False, True), + ('XGBoostRFRegressor', False, False), + ('CatboostRegressor', False, False), ]) -def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model): +def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model, pca, dbscan): if is_arm() and model == 'CatboostRegressor': pytest.skip("CatBoost is not supported on ARM") @@ -41,6 +41,8 @@ def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model): freqai_conf.update({"freqaimodel": model}) freqai_conf.update({"timerange": "20180110-20180130"}) freqai_conf.update({"strategy": "freqai_test_strat"}) + freqai_conf['freqai']['feature_parameters'].update({"principal_component_analysis": pca}) + freqai_conf['freqai']['feature_parameters'].update({"use_DBSCAN_to_remove_outliers": dbscan}) strategy = get_patched_freqai_strategy(mocker, freqai_conf) exchange = get_patched_exchange(mocker, freqai_conf) From 23b6915dde7364ec8387655380d9d3d7c46c3fba Mon Sep 17 00:00:00 2001 From: Wagner Costa Santos Date: Wed, 2 Nov 2022 15:49:51 -0300 Subject: [PATCH 04/15] fix issue with different backtesting prediction size --- freqtrade/freqai/data_kitchen.py | 24 +++++++++++++++++------- freqtrade/freqai/freqai_interface.py | 2 +- 2 files changed, 
18 insertions(+), 8 deletions(-) diff --git a/freqtrade/freqai/data_kitchen.py b/freqtrade/freqai/data_kitchen.py index f0e24dd80..581ab78c0 100644 --- a/freqtrade/freqai/data_kitchen.py +++ b/freqtrade/freqai/data_kitchen.py @@ -1312,14 +1312,16 @@ class FreqaiDataKitchen: append_df = pd.read_hdf(self.backtesting_results_path) return append_df - def check_if_backtest_prediction_exists( - self + def check_if_backtest_prediction_is_valid( + self, + length_backtesting_dataframe: int ) -> bool: """ - Check if a backtesting prediction already exists - :param dk: FreqaiDataKitchen + Check if a backtesting prediction already exists and if the predictions + to append has the same sime of backtesting dataframe slice + :param length_backtesting_dataframe: Length of backtesting dataframe slice :return: - :boolean: whether the prediction file exists or not. + :boolean: whether the prediction file is valid. """ path_to_predictionfile = Path(self.full_path / self.backtest_predictions_folder / @@ -1327,10 +1329,18 @@ class FreqaiDataKitchen: self.backtesting_results_path = path_to_predictionfile file_exists = path_to_predictionfile.is_file() + if file_exists: - logger.info(f"Found backtesting prediction file at {path_to_predictionfile}") + append_df = self.get_backtesting_prediction() + if len(append_df) == length_backtesting_dataframe: + logger.info(f"Found backtesting prediction file at {path_to_predictionfile}") + return True + else: + logger.info("A new backtesting prediction file is required. " + "(Number of predictions is different of dataframe length).") + return False else: logger.info( f"Could not find backtesting prediction file at {path_to_predictionfile}" ) - return file_exists + return False diff --git a/freqtrade/freqai/freqai_interface.py b/freqtrade/freqai/freqai_interface.py index dcf902954..af158990b 100644 --- a/freqtrade/freqai/freqai_interface.py +++ b/freqtrade/freqai/freqai_interface.py @@ -275,7 +275,7 @@ class IFreqaiModel(ABC): dk.set_new_model_names(pair, trained_timestamp) - if dk.check_if_backtest_prediction_exists(): + if dk.check_if_backtest_prediction_is_valid(len(dataframe_backtest)): self.dd.load_metadata(dk) dk.find_features(dataframe_train) self.check_if_feature_list_matches_strategy(dk) From e25dea7e0e1963ac4cf367cddd6a1bb76ac83016 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 13:26:27 -0600 Subject: [PATCH 05/15] update channel disconnecting --- freqtrade/rpc/api_server/api_ws.py | 10 ++-------- freqtrade/rpc/api_server/ws/channel.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/freqtrade/rpc/api_server/api_ws.py b/freqtrade/rpc/api_server/api_ws.py index b230cbe2b..118d70d78 100644 --- a/freqtrade/rpc/api_server/api_ws.py +++ b/freqtrade/rpc/api_server/api_ws.py @@ -127,13 +127,6 @@ async def message_endpoint( except Exception as e: logger.info(f"Consumer connection failed - {channel}: {e}") logger.debug(e, exc_info=e) - finally: - await channel_manager.on_disconnect(ws) - - else: - if channel: - await channel_manager.on_disconnect(ws) - await ws.close() except RuntimeError: # WebSocket was closed @@ -144,4 +137,5 @@ async def message_endpoint( # Log tracebacks to keep track of what errors are happening logger.exception(e) finally: - await channel_manager.on_disconnect(ws) + if channel: + await channel_manager.on_disconnect(ws) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 34f03f0c4..ec1b4c639 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ 
b/freqtrade/rpc/api_server/ws/channel.py @@ -46,7 +46,7 @@ class WebSocketChannel: self._relay_task = asyncio.create_task(self.relay()) # Internal event to signify a closed websocket - self._closed = False + self._closed = asyncio.Event() # Wrap the WebSocket in the Serializing class self._wrapped_ws = self._serializer_cls(self._websocket) @@ -99,14 +99,19 @@ class WebSocketChannel: Close the WebSocketChannel """ - self._closed = True + try: + await self.raw_websocket.close() + except Exception: + pass + + self._closed.set() self._relay_task.cancel() def is_closed(self) -> bool: """ Closed flag """ - return self._closed + return self._closed.is_set() def set_subscriptions(self, subscriptions: List[str] = []) -> None: """ @@ -129,7 +134,7 @@ class WebSocketChannel: Relay messages from the channel's queue and send them out. This is started as a task. """ - while True: + while not self._closed.is_set(): message = await self.queue.get() try: await self._send(message) From d848c2728347410e4be65db6e7733bd2bdbcefd5 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 13:30:42 -0600 Subject: [PATCH 06/15] add task done to broadcast queue method --- freqtrade/rpc/api_server/webserver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 1d0192a89..51dceb64b 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -197,6 +197,7 @@ class ApiServer(RPCHandler): # Get data from queue message: WSMessageSchemaType = await async_queue.get() logger.debug(f"Found message of type: {message.get('type')}") + async_queue.task_done() # Broadcast it await self._ws_channel_manager.broadcast(message) except asyncio.CancelledError: From c2bdaea84a8928c420f36b6e59e5e7fff2362417 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 14:19:08 -0600 Subject: [PATCH 07/15] change exception handling in channel send --- freqtrade/rpc/api_server/ws/channel.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index ec1b4c639..f9de1c6a0 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -79,7 +79,9 @@ class WebSocketChannel: timeout=self.drain_timeout ) return True - except asyncio.TimeoutError: + except Exception: + # We must catch any exception here to prevent an exception bubbling + # up and stalling the broadcast thread return False async def recv(self): @@ -135,11 +137,14 @@ class WebSocketChannel: as a task. """ while not self._closed.is_set(): + logger.info(f"{self} Relay - queue.get") message = await self.queue.get() try: + logger.info(f"{self} Relay - sending message") await self._send(message) self.queue.task_done() + logger.info(f"{self} Relay - QSize: {self.queue.qsize()}") # Limit messages per sec. # Could cause problems with queue size if too low, and # problems with network traffik if too high. 
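Taken together, patches 05 to 07 move the channel toward an event-based closed flag, a relay loop that exits once that flag is set, and a `send()` that reports failure instead of stalling the broadcaster. A minimal, self-contained sketch of that pattern follows; the class and names are illustrative, not freqtrade's actual `WebSocketChannel`:

```python
import asyncio


class BoundedChannel:
    """Illustrative channel with a bounded outgoing queue and an event-based closed flag."""

    def __init__(self, maxsize: int = 32, drain_timeout: float = 3.0) -> None:
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.drain_timeout = drain_timeout
        # An Event instead of a plain bool lets the relay loop observe closure
        self._closed = asyncio.Event()

    async def send(self, data) -> bool:
        """Queue data for the relay task; return False instead of raising if it cannot be queued."""
        # Patches 05-09 keep the wait_for() form shown here; patch 10 later swaps it for
        # put_nowait() plus a polling loop, because the queue may live on another event loop.
        try:
            await asyncio.wait_for(self.queue.put(data), timeout=self.drain_timeout)
        except asyncio.TimeoutError:
            # Queue stayed full for too long - drop the message rather than stall the caller
            return False
        return True

    def close(self) -> None:
        self._closed.set()

    def is_closed(self) -> bool:
        return self._closed.is_set()

    async def relay(self, sink) -> None:
        """Drain the queue until the channel is closed (the real code also cancels this task)."""
        while not self._closed.is_set():
            message = await self.queue.get()
            await sink(message)
            self.queue.task_done()
```

In use, the relay would be started with `asyncio.create_task(channel.relay(ws.send))`, and a `False` return from `send()` is how the broadcaster can detect a consumer that is not keeping up.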
From 55bf195bfbffea4c91684ca87a63bdad8addc98b Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 14:21:34 -0600 Subject: [PATCH 08/15] remove debugging log calls --- freqtrade/rpc/api_server/ws/channel.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index f9de1c6a0..e69e51e86 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -137,14 +137,11 @@ class WebSocketChannel: as a task. """ while not self._closed.is_set(): - logger.info(f"{self} Relay - queue.get") message = await self.queue.get() try: - logger.info(f"{self} Relay - sending message") await self._send(message) self.queue.task_done() - logger.info(f"{self} Relay - QSize: {self.queue.qsize()}") # Limit messages per sec. # Could cause problems with queue size if too low, and # problems with network traffik if too high. From 2dc55e89e6a1fb1baba7631020941120efeea586 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 15:25:39 -0600 Subject: [PATCH 09/15] better error handling channel send --- freqtrade/rpc/api_server/ws/channel.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index e69e51e86..417b7725a 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -78,11 +78,12 @@ class WebSocketChannel: self.queue.put(data), timeout=self.drain_timeout ) - return True - except Exception: - # We must catch any exception here to prevent an exception bubbling - # up and stalling the broadcast thread + except asyncio.TimeoutError: return False + except RuntimeError: + pass + + return True async def recv(self): """ From cbede2e27dae6ae24b69e37e01c157c8969f7c53 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 17:57:11 -0600 Subject: [PATCH 10/15] refactor channel.send to avoid queue.put --- freqtrade/rpc/api_server/ws/channel.py | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 417b7725a..3a929ac26 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -1,5 +1,6 @@ import asyncio import logging +import time from threading import RLock from typing import Any, Dict, List, Optional, Type, Union from uuid import uuid4 @@ -73,16 +74,23 @@ class WebSocketChannel: Add the data to the queue to be sent. 
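        If the queue is full, wait up to drain_timeout seconds for the relay task to drain it before giving up.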
:returns: True if data added to queue, False otherwise """ - try: - await asyncio.wait_for( - self.queue.put(data), - timeout=self.drain_timeout - ) - except asyncio.TimeoutError: - return False - except RuntimeError: - pass + # This block only runs if the queue is full, it will wait + # until self.drain_timeout for the relay to drain the outgoing + # queue + start = time.time() + while self.queue.full(): + await asyncio.sleep(1) + if (time.time() - start) > self.drain_timeout: + return False + + # If for some reason the queue is still full, just return False + try: + self.queue.put_nowait(data) + except asyncio.QueueFull: + return False + + # If we got here everything is ok return True async def recv(self): From 000b0c2198397f30298b505e789b29be7189e549 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 18:00:10 -0600 Subject: [PATCH 11/15] prevent memory leaks from error in _broadcast_queue_data --- freqtrade/rpc/api_server/webserver.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/freqtrade/rpc/api_server/webserver.py b/freqtrade/rpc/api_server/webserver.py index 51dceb64b..e9a12e4df 100644 --- a/freqtrade/rpc/api_server/webserver.py +++ b/freqtrade/rpc/api_server/webserver.py @@ -211,6 +211,9 @@ class ApiServer(RPCHandler): # Disconnect channels and stop the loop on cancel await self._ws_channel_manager.disconnect_all() self._ws_loop.stop() + # Avoid adding more items to the queue if they aren't + # going to get broadcasted. + self._ws_queue = None def start_api(self): """ From a0965606a58b5e3ba5dd46699395eadb68db2d59 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 18:49:11 -0600 Subject: [PATCH 12/15] update ws_client more verbosity, better readable time delta --- scripts/ws_client.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/scripts/ws_client.py b/scripts/ws_client.py index 23ad9296d..40b5cf466 100644 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -18,7 +18,6 @@ import orjson import pandas import rapidjson import websockets -from dateutil.relativedelta import relativedelta logger = logging.getLogger("WebSocketClient") @@ -28,7 +27,7 @@ logger = logging.getLogger("WebSocketClient") def setup_logging(filename: str): logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(filename), @@ -75,16 +74,15 @@ def load_config(configfile): def readable_timedelta(delta): """ - Convert a dateutil.relativedelta to a readable format + Convert a millisecond delta to a readable format - :param delta: A dateutil.relativedelta + :param delta: A delta between two timestamps in milliseconds :returns: The readable time difference string """ - attrs = ['years', 'months', 'days', 'hours', 'minutes', 'seconds', 'microseconds'] - return ", ".join([ - '%d %s' % (getattr(delta, attr), attr if getattr(delta, attr) > 0 else attr[:-1]) - for attr in attrs if getattr(delta, attr) - ]) + seconds, milliseconds = divmod(delta, 1000) + minutes, seconds = divmod(seconds, 60) + + return f"{int(minutes)}:{int(seconds)}.{int(milliseconds)}" # ---------------------------------------------------------------------------- @@ -170,8 +168,8 @@ class ClientProtocol: def _calculate_time_difference(self): old_last_received_at = self._LAST_RECEIVED_AT - self._LAST_RECEIVED_AT = time.time() * 1e6 - time_delta = relativedelta(microseconds=(self._LAST_RECEIVED_AT - old_last_received_at)) + self._LAST_RECEIVED_AT = time.time() * 1e3 + 
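+        # Timestamps are now tracked in milliseconds, so this delta is already in ms for readable_timedelta()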
time_delta = self._LAST_RECEIVED_AT - old_last_received_at return readable_timedelta(time_delta) @@ -272,6 +270,7 @@ async def create_client( websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK ): + logger.info("Connection was closed") # Just keep trying to connect again indefinitely await asyncio.sleep(sleep_time) From b749f3edd65412c6bee998c58dc5d9413518ef06 Mon Sep 17 00:00:00 2001 From: Timothy Pogue Date: Wed, 2 Nov 2022 19:30:35 -0600 Subject: [PATCH 13/15] add latency measure from ping in emc and ws_client --- freqtrade/rpc/external_message_consumer.py | 8 ++++---- scripts/ws_client.py | 10 ++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/freqtrade/rpc/external_message_consumer.py b/freqtrade/rpc/external_message_consumer.py index e86f44c17..b978407e4 100644 --- a/freqtrade/rpc/external_message_consumer.py +++ b/freqtrade/rpc/external_message_consumer.py @@ -264,10 +264,10 @@ class ExternalMessageConsumer: # We haven't received data yet. Check the connection and continue. try: # ping - ping = await channel.ping() + pong = await channel.ping() + latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000) - await asyncio.wait_for(ping, timeout=self.ping_timeout) - logger.debug(f"Connection to {channel} still alive...") + logger.info(f"Connection to {channel} still alive, latency: {latency}ms") continue except (websockets.exceptions.ConnectionClosed): @@ -276,7 +276,7 @@ class ExternalMessageConsumer: await asyncio.sleep(self.sleep_time) break except Exception as e: - logger.warning(f"Ping error {channel} - retrying in {self.sleep_time}s") + logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.debug(e, exc_info=e) await asyncio.sleep(self.sleep_time) diff --git a/scripts/ws_client.py b/scripts/ws_client.py index 40b5cf466..090039cde 100644 --- a/scripts/ws_client.py +++ b/scripts/ws_client.py @@ -240,12 +240,10 @@ async def create_client( ): # Try pinging try: - pong = ws.ping() - await asyncio.wait_for( - pong, - timeout=ping_timeout - ) - logger.info("Connection still alive...") + pong = await ws.ping() + latency = (await asyncio.wait_for(pong, timeout=ping_timeout) * 1000) + + logger.info(f"Connection still alive, latency: {latency}ms") continue From ff619edebf2d981294aa1252ce605e3032b8a6ec Mon Sep 17 00:00:00 2001 From: Matthias Date: Thu, 3 Nov 2022 06:50:18 +0100 Subject: [PATCH 14/15] Improve explanation comment as to why we're waiting ourselfs --- freqtrade/rpc/api_server/ws/channel.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/freqtrade/rpc/api_server/ws/channel.py b/freqtrade/rpc/api_server/ws/channel.py index 3a929ac26..3c97d05b1 100644 --- a/freqtrade/rpc/api_server/ws/channel.py +++ b/freqtrade/rpc/api_server/ws/channel.py @@ -76,8 +76,9 @@ class WebSocketChannel: """ # This block only runs if the queue is full, it will wait - # until self.drain_timeout for the relay to drain the outgoing - # queue + # until self.drain_timeout for the relay to drain the outgoing queue + # We can't use asyncio.wait_for here because the queue may have been created with a + # different eventloop start = time.time() while self.queue.full(): await asyncio.sleep(1) From 3ba1e221eb5f12686f1a48ace1cebd90705beacd Mon Sep 17 00:00:00 2001 From: robcaulk Date: Thu, 3 Nov 2022 19:08:33 +0100 Subject: [PATCH 15/15] fix typo --- freqtrade/freqai/data_kitchen.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/freqtrade/freqai/data_kitchen.py 
b/freqtrade/freqai/data_kitchen.py index 581ab78c0..e06709b2c 100644 --- a/freqtrade/freqai/data_kitchen.py +++ b/freqtrade/freqai/data_kitchen.py @@ -1318,7 +1318,7 @@ class FreqaiDataKitchen: ) -> bool: """ Check if a backtesting prediction already exists and if the predictions - to append has the same sime of backtesting dataframe slice + to append has the same size of backtesting dataframe slice :param length_backtesting_dataframe: Length of backtesting dataframe slice :return: :boolean: whether the prediction file is valid. @@ -1337,7 +1337,7 @@ class FreqaiDataKitchen: return True else: logger.info("A new backtesting prediction file is required. " - "(Number of predictions is different of dataframe length).") + "(Number of predictions is different from dataframe length).") return False else: logger.info(
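The check that this last patch polishes (introduced in patch 04) reduces to one rule: reuse a cached prediction file only when its row count matches the backtest slice it will be appended to. A stripped-down sketch of that idea, with illustrative names rather than freqtrade's actual `FreqaiDataKitchen` API:

```python
from pathlib import Path
from typing import Optional

import pandas as pd


def load_valid_cached_predictions(path: Path, expected_rows: int) -> Optional[pd.DataFrame]:
    """Return cached predictions only if they match the expected backtest slice length."""
    if not path.is_file():
        # No cache yet - the caller has to train and predict from scratch
        return None
    cached = pd.read_hdf(path)
    if len(cached) != expected_rows:
        # The backtest slice changed size, so the cached predictions are stale
        return None
    return cached
```

A caller would fall through to a fresh training and prediction pass whenever this returns `None`, which is what `check_if_backtest_prediction_is_valid` signals with its boolean.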