don't use deprecated asyncio.get_event_loop()

This commit is contained in:
Matthias 2021-12-31 16:34:15 +01:00
parent c9296dc9a0
commit 0277d93a64

View File

@ -83,6 +83,8 @@ class Exchange:
self._api: ccxt.Exchange = None self._api: ccxt.Exchange = None
self._api_async: ccxt_async.Exchange = None self._api_async: ccxt_async.Exchange = None
self._markets: Dict = {} self._markets: Dict = {}
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._config.update(config) self._config.update(config)
@ -173,7 +175,7 @@ class Exchange:
if (self._api_async and inspect.iscoroutinefunction(self._api_async.close) if (self._api_async and inspect.iscoroutinefunction(self._api_async.close)
and self._api_async.session): and self._api_async.session):
logger.info("Closing async ccxt session.") logger.info("Closing async ccxt session.")
asyncio.get_event_loop().run_until_complete(self._api_async.close()) self.loop.run_until_complete(self._api_async.close())
def _init_ccxt(self, exchange_config: Dict[str, Any], ccxt_module: CcxtModuleType = ccxt, def _init_ccxt(self, exchange_config: Dict[str, Any], ccxt_module: CcxtModuleType = ccxt,
ccxt_kwargs: Dict = {}) -> ccxt.Exchange: ccxt_kwargs: Dict = {}) -> ccxt.Exchange:
@ -328,7 +330,7 @@ class Exchange:
def _load_async_markets(self, reload: bool = False) -> None: def _load_async_markets(self, reload: bool = False) -> None:
try: try:
if self._api_async: if self._api_async:
asyncio.get_event_loop().run_until_complete( self.loop.run_until_complete(
self._api_async.load_markets(reload=reload)) self._api_async.load_markets(reload=reload))
except (asyncio.TimeoutError, ccxt.BaseError) as e: except (asyncio.TimeoutError, ccxt.BaseError) as e:
@ -1229,7 +1231,7 @@ class Exchange:
:param since_ms: Timestamp in milliseconds to get history from :param since_ms: Timestamp in milliseconds to get history from
:return: List with candle (OHLCV) data :return: List with candle (OHLCV) data
""" """
pair, timeframe, data = asyncio.get_event_loop().run_until_complete( pair, timeframe, data = self.loop.run_until_complete(
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe, self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
since_ms=since_ms, is_new_pair=is_new_pair)) since_ms=since_ms, is_new_pair=is_new_pair))
logger.info(f"Downloaded data for {pair} with length {len(data)}.") logger.info(f"Downloaded data for {pair} with length {len(data)}.")
@ -1331,8 +1333,10 @@ class Exchange:
results_df = {} results_df = {}
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling # Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100): for input_coro in chunks(input_coroutines, 100):
results = asyncio.get_event_loop().run_until_complete( async def gather_stuff():
asyncio.gather(*input_coro, return_exceptions=True)) return await asyncio.gather(*input_coro, return_exceptions=True)
results = self.loop.run_until_complete(gather_stuff())
# handle caching # handle caching
for res in results: for res in results:
@ -1568,7 +1572,7 @@ class Exchange:
if not self.exchange_has("fetchTrades"): if not self.exchange_has("fetchTrades"):
raise OperationalException("This exchange does not support downloading Trades.") raise OperationalException("This exchange does not support downloading Trades.")
return asyncio.get_event_loop().run_until_complete( return self.loop.run_until_complete(
self._async_get_trade_history(pair=pair, since=since, self._async_get_trade_history(pair=pair, since=since,
until=until, from_id=from_id)) until=until, from_id=from_id))