Mutex TTL Cache accesses which can be accessed by multiple threads

Apparently, cachetools is (intentionally) not threadsafe
when using the Caches directly.
It's therefore recommended to wrap these with an explicit lock to avoid
problems.

source: https://github.com/tkem/cachetools/issues/245

closes #7215
This commit is contained in:
Matthias 2022-08-16 19:43:41 +02:00
parent ea6bc47d7a
commit 0b0e7eaf96
2 changed files with 17 additions and 8 deletions

View File

@ -116,6 +116,7 @@ class Exchange:
self._last_markets_refresh: int = 0
# Cache for 10 minutes ...
self._cache_lock = Lock()
self._fetch_tickers_cache: TTLCache = TTLCache(maxsize=2, ttl=60 * 10)
# Cache values for 1800 to avoid frequent polling of the exchange for prices
# Caching only applies to RPC methods, so prices for open trades are still
@ -1388,12 +1389,14 @@ class Exchange:
if not self.exchange_has('fetchBidsAsks'):
return {}
if cached:
tickers = self._fetch_tickers_cache.get('fetch_bids_asks')
with self._cache_lock:
tickers = self._fetch_tickers_cache.get('fetch_bids_asks')
if tickers:
return tickers
try:
tickers = self._api.fetch_bids_asks(symbols)
self._fetch_tickers_cache['fetch_bids_asks'] = tickers
with self._cache_lock:
self._fetch_tickers_cache['fetch_bids_asks'] = tickers
return tickers
except ccxt.NotSupported as e:
raise OperationalException(
@ -1414,12 +1417,14 @@ class Exchange:
:return: fetch_tickers result
"""
if cached:
tickers = self._fetch_tickers_cache.get('fetch_tickers')
with self._cache_lock:
tickers = self._fetch_tickers_cache.get('fetch_tickers')
if tickers:
return tickers
try:
tickers = self._api.fetch_tickers(symbols)
self._fetch_tickers_cache['fetch_tickers'] = tickers
with self._cache_lock:
self._fetch_tickers_cache['fetch_tickers'] = tickers
return tickers
except ccxt.NotSupported as e:
raise OperationalException(
@ -1528,7 +1533,8 @@ class Exchange:
cache_rate: TTLCache = self._entry_rate_cache if side == "entry" else self._exit_rate_cache
if not refresh:
rate = cache_rate.get(pair)
with self._cache_lock:
rate = cache_rate.get(pair)
# Check if cache has been invalidated
if rate:
logger.debug(f"Using cached {side} rate for {pair}.")
@ -1573,7 +1579,8 @@ class Exchange:
if rate is None:
raise PricingError(f"{name}-Rate for {pair} was empty.")
cache_rate[pair] = rate
with self._cache_lock:
cache_rate[pair] = rate
return rate
@ -1581,8 +1588,9 @@ class Exchange:
entry_rate = None
exit_rate = None
if not refresh:
entry_rate = self._entry_rate_cache.get(pair)
exit_rate = self._exit_rate_cache.get(pair)
with self._cache_lock:
entry_rate = self._entry_rate_cache.get(pair)
exit_rate = self._exit_rate_cache.get(pair)
if entry_rate:
logger.debug(f"Using cached buy rate for {pair}.")
if exit_rate:

View File

@ -483,6 +483,7 @@ class Hyperopt:
self.backtesting.exchange._api_async = None
self.backtesting.exchange.loop = None # type: ignore
self.backtesting.exchange._loop_lock = None # type: ignore
self.backtesting.exchange._cache_lock = None # type: ignore
# self.backtesting.exchange = None # type: ignore
self.backtesting.pairlists = None # type: ignore