refactor refresh_latest_ohlcv

This commit is contained in:
Matthias 2022-10-03 20:00:56 +02:00
parent dc5c3a0ed2
commit 7f475e37d7

View File

@ -1870,6 +1870,38 @@ class Exchange:
return self._async_get_candle_history( return self._async_get_candle_history(
pair, timeframe, since_ms=since_ms, candle_type=candle_type) pair, timeframe, since_ms=since_ms, candle_type=candle_type)
def _build_ohlcv_dl_jobs(
self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int],
cache: bool) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
"""
Build Coroutines to execute as part of refresh_latest_ohlcv
"""
input_coroutines = []
cached_pairs = []
for pair, timeframe, candle_type in set(pair_list):
if (
timeframe not in self.timeframes
and candle_type in (CandleType.SPOT, CandleType.FUTURES)
):
logger.warning(
f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
f"not available on {self.name}. Available timeframes are "
f"{', '.join(self.timeframes)}.")
continue
if ((pair, timeframe, candle_type) not in self._klines or not cache
or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
input_coroutines.append(self._build_coroutine(
pair, timeframe, candle_type=candle_type, since_ms=since_ms))
else:
logger.debug(
f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
)
cached_pairs.append((pair, timeframe, candle_type))
return input_coroutines, cached_pairs
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None, cache: bool = True, since_ms: Optional[int] = None, cache: bool = True,
drop_incomplete: Optional[bool] = None drop_incomplete: Optional[bool] = None
@ -1887,27 +1919,9 @@ class Exchange:
""" """
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list)) logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
drop_incomplete = self._ohlcv_partial_candle if drop_incomplete is None else drop_incomplete drop_incomplete = self._ohlcv_partial_candle if drop_incomplete is None else drop_incomplete
input_coroutines = []
cached_pairs = []
# Gather coroutines to run
for pair, timeframe, candle_type in set(pair_list):
if (timeframe not in self.timeframes
and candle_type in (CandleType.SPOT, CandleType.FUTURES)):
logger.warning(
f"Cannot download ({pair}, {timeframe}) combination as this timeframe is "
f"not available on {self.name}. Available timeframes are "
f"{', '.join(self.timeframes)}.")
continue
if ((pair, timeframe, candle_type) not in self._klines or not cache
or self._now_is_time_to_refresh(pair, timeframe, candle_type)):
input_coroutines.append(self._build_coroutine(
pair, timeframe, candle_type=candle_type, since_ms=since_ms))
else: # Gather coroutines to run
logger.debug( input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
f"Using cached candle (OHLCV) data for {pair}, {timeframe}, {candle_type} ..."
)
cached_pairs.append((pair, timeframe, candle_type))
results_df = {} results_df = {}
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling