Fix race condition for loop

This commit is contained in:
Matthias 2022-04-28 06:29:14 +02:00
parent 220927289d
commit ca49821df0

View File

@ -9,6 +9,7 @@ import logging
from copy import deepcopy from copy import deepcopy
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from math import ceil from math import ceil
from threading import Lock
from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union
import arrow import arrow
@ -96,6 +97,9 @@ class Exchange:
self._markets: Dict = {} self._markets: Dict = {}
self._trading_fees: Dict[str, Any] = {} self._trading_fees: Dict[str, Any] = {}
self._leverage_tiers: Dict[str, List[Dict]] = {} self._leverage_tiers: Dict[str, List[Dict]] = {}
# Lock event loop. This is necessary to avoid race-conditions when using force* commands
# Due to funding fee fetching.
self._loop_lock = Lock()
self.loop = asyncio.new_event_loop() self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) asyncio.set_event_loop(self.loop)
self._config: Dict = {} self._config: Dict = {}
@ -1775,7 +1779,8 @@ class Exchange:
async def gather_stuff(): async def gather_stuff():
return await asyncio.gather(*input_coro, return_exceptions=True) return await asyncio.gather(*input_coro, return_exceptions=True)
results = self.loop.run_until_complete(gather_stuff()) with self._loop_lock:
results = self.loop.run_until_complete(gather_stuff())
for res in results: for res in results:
if isinstance(res, Exception): if isinstance(res, Exception):
@ -2032,9 +2037,10 @@ class Exchange:
if not self.exchange_has("fetchTrades"): if not self.exchange_has("fetchTrades"):
raise OperationalException("This exchange does not support downloading Trades.") raise OperationalException("This exchange does not support downloading Trades.")
return self.loop.run_until_complete( with self._loop_lock:
self._async_get_trade_history(pair=pair, since=since, return self.loop.run_until_complete(
until=until, from_id=from_id)) self._async_get_trade_history(pair=pair, since=since,
until=until, from_id=from_id))
@retrier @retrier
def _get_funding_fees_from_exchange(self, pair: str, since: Union[datetime, int]) -> float: def _get_funding_fees_from_exchange(self, pair: str, since: Union[datetime, int]) -> float: