Store tickers by pair / ticker_interval

This commit is contained in:
Matthias
2018-12-30 07:15:21 +01:00
parent 5f61da30ed
commit 0aa0b1d4fe
4 changed files with 43 additions and 26 deletions

View File

@@ -83,7 +83,7 @@ class Exchange(object):
self._pairs_last_refresh_time: Dict[str, int] = {}
# Holds candles
self._klines: Dict[str, DataFrame] = {}
self._klines: Dict[Tuple[str, str], DataFrame] = {}
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
@@ -158,9 +158,10 @@ class Exchange(object):
"""exchange ccxt id"""
return self._api.id
def klines(self, pair: str, copy=True) -> DataFrame:
if pair in self._klines:
return self._klines[pair].copy() if copy else self._klines[pair]
def klines(self, pair_interval: Tuple[str, str], copy=True) -> DataFrame:
# create key tuple
if pair_interval in self._klines:
return self._klines[pair_interval].copy() if copy else self._klines[pair_interval]
else:
return DataFrame()
@@ -531,24 +532,24 @@ class Exchange(object):
logger.info("downloaded %s with length %s.", pair, len(data))
return data
def refresh_latest_ohlcv(self, pair_list: List[str],
ticker_interval: str) -> List[Tuple[str, List]]:
def refresh_latest_ohlcv(self, pair_list: List[Tuple[str, str]]) -> List[Tuple[str, List]]:
"""
Refresh in-memory ohlcv asyncronously and set `_klines` with the result
Refresh in-memory ohlcv asyncronously and set `_klines` with the result
"""
logger.debug("Refreshing ohlcv data for %d pairs", len(pair_list))
# Calculating ticker interval in second
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[ticker_interval] * 60
input_coroutines = []
# Gather corotines to run
for pair in pair_list:
for pair, ticker_interval in pair_list:
# Calculating ticker interval in second
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[ticker_interval] * 60
if not (self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
arrow.utcnow().timestamp and pair in self._klines):
arrow.utcnow().timestamp and (pair, ticker_interval) in self._klines):
input_coroutines.append(self._async_get_candle_history(pair, ticker_interval))
else:
logger.debug("Using cached ohlcv data for %s ...", pair)
logger.debug("Using cached ohlcv data for %s, %s ...", pair, ticker_interval)
tickers = asyncio.get_event_loop().run_until_complete(
asyncio.gather(*input_coroutines, return_exceptions=True))
@@ -559,13 +560,14 @@ class Exchange(object):
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
continue
pair = res[0]
tick_interval[1]
tick_interval = res[1]
ticks = res[2]
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[pair] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
self._klines[pair] = parse_ticker_dataframe(ticks, tick_interval, fill_missing=True)
# keeping parsed dataframe in cache
self._klines[(pair, tick_interval)] = parse_ticker_dataframe(
ticks, tick_interval, fill_missing=True)
return tickers
@retrier_async