From f0abe218a2cbdcd0a7296b07894d23a5233ed39e Mon Sep 17 00:00:00 2001 From: Matthias Date: Tue, 30 Nov 2021 07:02:49 +0100 Subject: [PATCH] Batch ohlcv requests to not overwelm ccxt's async throttler closes #6003 --- freqtrade/exchange/exchange.py | 43 ++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index e25be9ae1..0ae78cf1b 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -1317,27 +1317,30 @@ class Exchange: ) cached_pairs.append((pair, timeframe)) - results = asyncio.get_event_loop().run_until_complete( - asyncio.gather(*input_coroutines, return_exceptions=True)) - results_df = {} - # handle caching - for res in results: - if isinstance(res, Exception): - logger.warning(f"Async code raised an exception: {repr(res)}") - continue - # Deconstruct tuple (has 3 elements) - pair, timeframe, ticks = res - # keeping last candle time as last refreshed time of the pair - if ticks: - self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000 - # keeping parsed dataframe in cache - ohlcv_df = ohlcv_to_dataframe( - ticks, timeframe, pair=pair, fill_missing=True, - drop_incomplete=self._ohlcv_partial_candle) - results_df[(pair, timeframe)] = ohlcv_df - if cache: - self._klines[(pair, timeframe)] = ohlcv_df + # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling + for input_coro in chunks(input_coroutines, 100): + results = asyncio.get_event_loop().run_until_complete( + asyncio.gather(*input_coro, return_exceptions=True)) + + # handle caching + for res in results: + if isinstance(res, Exception): + logger.warning(f"Async code raised an exception: {repr(res)}") + continue + # Deconstruct tuple (has 3 elements) + pair, timeframe, ticks = res + # keeping last candle time as last refreshed time of the pair + if ticks: + self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000 + # keeping parsed dataframe in cache + ohlcv_df = ohlcv_to_dataframe( + ticks, timeframe, pair=pair, fill_missing=True, + drop_incomplete=self._ohlcv_partial_candle) + results_df[(pair, timeframe)] = ohlcv_df + if cache: + self._klines[(pair, timeframe)] = ohlcv_df + # Return cached klines for pair, timeframe in cached_pairs: results_df[(pair, timeframe)] = self.klines((pair, timeframe), copy=False)