Batch ohlcv requests to not overwelm ccxt's async throttler
closes #6003
This commit is contained in:
parent
231b1e2f57
commit
f0abe218a2
@ -1317,27 +1317,30 @@ class Exchange:
|
|||||||
)
|
)
|
||||||
cached_pairs.append((pair, timeframe))
|
cached_pairs.append((pair, timeframe))
|
||||||
|
|
||||||
results = asyncio.get_event_loop().run_until_complete(
|
|
||||||
asyncio.gather(*input_coroutines, return_exceptions=True))
|
|
||||||
|
|
||||||
results_df = {}
|
results_df = {}
|
||||||
# handle caching
|
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
|
||||||
for res in results:
|
for input_coro in chunks(input_coroutines, 100):
|
||||||
if isinstance(res, Exception):
|
results = asyncio.get_event_loop().run_until_complete(
|
||||||
logger.warning(f"Async code raised an exception: {repr(res)}")
|
asyncio.gather(*input_coro, return_exceptions=True))
|
||||||
continue
|
|
||||||
# Deconstruct tuple (has 3 elements)
|
# handle caching
|
||||||
pair, timeframe, ticks = res
|
for res in results:
|
||||||
# keeping last candle time as last refreshed time of the pair
|
if isinstance(res, Exception):
|
||||||
if ticks:
|
logger.warning(f"Async code raised an exception: {repr(res)}")
|
||||||
self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000
|
continue
|
||||||
# keeping parsed dataframe in cache
|
# Deconstruct tuple (has 3 elements)
|
||||||
ohlcv_df = ohlcv_to_dataframe(
|
pair, timeframe, ticks = res
|
||||||
ticks, timeframe, pair=pair, fill_missing=True,
|
# keeping last candle time as last refreshed time of the pair
|
||||||
drop_incomplete=self._ohlcv_partial_candle)
|
if ticks:
|
||||||
results_df[(pair, timeframe)] = ohlcv_df
|
self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000
|
||||||
if cache:
|
# keeping parsed dataframe in cache
|
||||||
self._klines[(pair, timeframe)] = ohlcv_df
|
ohlcv_df = ohlcv_to_dataframe(
|
||||||
|
ticks, timeframe, pair=pair, fill_missing=True,
|
||||||
|
drop_incomplete=self._ohlcv_partial_candle)
|
||||||
|
results_df[(pair, timeframe)] = ohlcv_df
|
||||||
|
if cache:
|
||||||
|
self._klines[(pair, timeframe)] = ohlcv_df
|
||||||
|
|
||||||
# Return cached klines
|
# Return cached klines
|
||||||
for pair, timeframe in cached_pairs:
|
for pair, timeframe in cached_pairs:
|
||||||
results_df[(pair, timeframe)] = self.klines((pair, timeframe), copy=False)
|
results_df[(pair, timeframe)] = self.klines((pair, timeframe), copy=False)
|
||||||
|
Loading…
Reference in New Issue
Block a user