add optional Cache arguments to refresh_pairs method
This commit is contained in:
parent
ca9036ee1d
commit
39fec25ae0
@ -733,13 +733,17 @@ class Exchange:
|
|||||||
logger.info("Downloaded data for %s with length %s.", pair, len(data))
|
logger.info("Downloaded data for %s with length %s.", pair, len(data))
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes) -> List[Tuple[str, List]]:
|
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
|
||||||
|
since_ms: Optional[int] = None, cache: bool = True
|
||||||
|
) -> Dict[str, DataFrame]:
|
||||||
"""
|
"""
|
||||||
Refresh in-memory OHLCV asynchronously and set `_klines` with the result
|
Refresh in-memory OHLCV asynchronously and set `_klines` with the result
|
||||||
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
|
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
|
||||||
Only used in the dataprovider.refresh() method.
|
Only used in the dataprovider.refresh() method.
|
||||||
:param pair_list: List of 2 element tuples containing pair, interval to refresh
|
:param pair_list: List of 2 element tuples containing pair, interval to refresh
|
||||||
:return: TODO: return value is only used in the tests, get rid of it
|
:param since_ms: time since when to download, in milliseconds
|
||||||
|
:param cache: Assign result to _klines. Usefull for one-off downloads like for pairlists
|
||||||
|
:return: Dict of [{(pair, timeframe): Dataframe}]
|
||||||
"""
|
"""
|
||||||
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
|
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
|
||||||
|
|
||||||
@ -749,7 +753,8 @@ class Exchange:
|
|||||||
for pair, timeframe in set(pair_list):
|
for pair, timeframe in set(pair_list):
|
||||||
if (not ((pair, timeframe) in self._klines)
|
if (not ((pair, timeframe) in self._klines)
|
||||||
or self._now_is_time_to_refresh(pair, timeframe)):
|
or self._now_is_time_to_refresh(pair, timeframe)):
|
||||||
input_coroutines.append(self._async_get_candle_history(pair, timeframe))
|
input_coroutines.append(self._async_get_candle_history(pair, timeframe,
|
||||||
|
since_ms=since_ms))
|
||||||
else:
|
else:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Using cached candle (OHLCV) data for pair %s, timeframe %s ...",
|
"Using cached candle (OHLCV) data for pair %s, timeframe %s ...",
|
||||||
@ -759,6 +764,7 @@ class Exchange:
|
|||||||
results = asyncio.get_event_loop().run_until_complete(
|
results = asyncio.get_event_loop().run_until_complete(
|
||||||
asyncio.gather(*input_coroutines, return_exceptions=True))
|
asyncio.gather(*input_coroutines, return_exceptions=True))
|
||||||
|
|
||||||
|
results_df = {}
|
||||||
# handle caching
|
# handle caching
|
||||||
for res in results:
|
for res in results:
|
||||||
if isinstance(res, Exception):
|
if isinstance(res, Exception):
|
||||||
@ -770,11 +776,13 @@ class Exchange:
|
|||||||
if ticks:
|
if ticks:
|
||||||
self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000
|
self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000
|
||||||
# keeping parsed dataframe in cache
|
# keeping parsed dataframe in cache
|
||||||
self._klines[(pair, timeframe)] = ohlcv_to_dataframe(
|
ohlcv_df = ohlcv_to_dataframe(
|
||||||
ticks, timeframe, pair=pair, fill_missing=True,
|
ticks, timeframe, pair=pair, fill_missing=True,
|
||||||
drop_incomplete=self._ohlcv_partial_candle)
|
drop_incomplete=self._ohlcv_partial_candle)
|
||||||
|
results_df[(pair, timeframe)] = ohlcv_df
|
||||||
return results
|
if cache:
|
||||||
|
self._klines[(pair, timeframe)] = ohlcv_df
|
||||||
|
return results_df
|
||||||
|
|
||||||
def _now_is_time_to_refresh(self, pair: str, timeframe: str) -> bool:
|
def _now_is_time_to_refresh(self, pair: str, timeframe: str) -> bool:
|
||||||
# Timeframe in seconds
|
# Timeframe in seconds
|
||||||
|
@ -1385,6 +1385,12 @@ def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None:
|
|||||||
pairs = [('IOTA/ETH', '5m'), ('XRP/ETH', '5m')]
|
pairs = [('IOTA/ETH', '5m'), ('XRP/ETH', '5m')]
|
||||||
# empty dicts
|
# empty dicts
|
||||||
assert not exchange._klines
|
assert not exchange._klines
|
||||||
|
exchange.refresh_latest_ohlcv(pairs, cache=False)
|
||||||
|
# No caching
|
||||||
|
assert not exchange._klines
|
||||||
|
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||||
|
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||||
|
|
||||||
exchange.refresh_latest_ohlcv(pairs)
|
exchange.refresh_latest_ohlcv(pairs)
|
||||||
|
|
||||||
assert log_has(f'Refreshing candle (OHLCV) data for {len(pairs)} pairs', caplog)
|
assert log_has(f'Refreshing candle (OHLCV) data for {len(pairs)} pairs', caplog)
|
||||||
@ -1499,11 +1505,9 @@ def test_refresh_latest_ohlcv_inv_result(default_conf, mocker, caplog):
|
|||||||
assert exchange._klines
|
assert exchange._klines
|
||||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||||
|
|
||||||
assert type(res) is list
|
assert type(res) is dict
|
||||||
assert len(res) == 2
|
assert len(res) == 1
|
||||||
# Test that each is in list at least once as order is not guaranteed
|
# Test that each is in list at least once as order is not guaranteed
|
||||||
assert type(res[0]) is tuple or type(res[1]) is tuple
|
|
||||||
assert type(res[0]) is TypeError or type(res[1]) is TypeError
|
|
||||||
assert log_has("Error loading ETH/BTC. Result was [[]].", caplog)
|
assert log_has("Error loading ETH/BTC. Result was [[]].", caplog)
|
||||||
assert log_has("Async code raised an exception: TypeError", caplog)
|
assert log_has("Async code raised an exception: TypeError", caplog)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user