Extract caching logic from lowestlevel fetch_ohlcv function

This commit is contained in:
Matthias 2018-12-11 07:11:43 +01:00
parent 523dea4a04
commit 8c1901ad1e

View File

@ -126,12 +126,12 @@ class Exchange(object):
raise OperationalException(f'Exchange {name} is not supported') raise OperationalException(f'Exchange {name} is not supported')
ex_config = { ex_config = {
'apiKey': exchange_config.get('key'), 'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'), 'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'), 'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''), 'uid': exchange_config.get('uid', ''),
'enableRateLimit': exchange_config.get('ccxt_rate_limit', True) 'enableRateLimit': exchange_config.get('ccxt_rate_limit', True)
} }
if ccxt_kwargs: if ccxt_kwargs:
logger.info('Applying additional ccxt config: %s', ccxt_kwargs) logger.info('Applying additional ccxt config: %s', ccxt_kwargs)
ex_config.update(ccxt_kwargs) ex_config.update(ccxt_kwargs)
@ -499,25 +499,36 @@ class Exchange(object):
def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None: def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None:
""" """
Refresh tickers asyncronously and return the result. Refresh tickers asyncronously and set `klines` of this object with the result
""" """
logger.debug("Refreshing klines for %d pairs", len(pair_list)) logger.debug("Refreshing klines for %d pairs", len(pair_list))
ticklist = asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
self.async_get_candles_history(pair_list, ticker_interval)) self.async_get_candles_history(pair_list, ticker_interval))
for pair, ticks in ticklist:
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[pair] = ticks[-1][0] // 1000
# keeping candles in cache
self.klines[pair] = ticks
async def async_get_candles_history(self, pairs: List[str], async def async_get_candles_history(self, pairs: List[str],
tick_interval: str) -> List[Tuple[str, List]]: tick_interval: str) -> List[Tuple[str, List]]:
"""Download ohlcv history for pair-list asyncronously """ """Download ohlcv history for pair-list asyncronously """
input_coroutines = [self._async_get_candle_history( # Calculating ticker interval in second
symbol, tick_interval) for symbol in pairs] interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
input_coroutines = []
# Gather corotines to run
for pair in pairs:
if not (self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp and pair in self.klines):
input_coroutines.append(self._async_get_candle_history(pair, tick_interval))
else:
logger.debug("Using cached klines data for %s ...", pair)
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True) tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
# handle caching
for pair, ticks in tickers:
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[pair] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
self.klines[pair] = ticks
return tickers return tickers
@retrier_async @retrier_async
@ -527,20 +538,8 @@ class Exchange(object):
# fetch ohlcv asynchronously # fetch ohlcv asynchronously
logger.debug("fetching %s since %s ...", pair, since_ms) logger.debug("fetching %s since %s ...", pair, since_ms)
# Calculating ticker interval in second data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 since=since_ms)
# If (last update time) + (interval in second) is greater or equal than now
# that means we don't have to hit the API as there is no new candle
# so we fetch it from local cache
if (not since_ms and
self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp and pair in self.klines):
data = self.klines[pair]
logger.debug("Using cached klines data for %s ...", pair)
else:
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
since=since_ms)
# Because some exchange sort Tickers ASC and other DESC. # Because some exchange sort Tickers ASC and other DESC.
# Ex: Bittrex returns a list of tickers ASC (oldest first, newest last) # Ex: Bittrex returns a list of tickers ASC (oldest first, newest last)