Merge branch 'develop' into feat/short

This commit is contained in:
Matthias
2022-01-01 19:16:24 +01:00
75 changed files with 593 additions and 314 deletions

View File

@@ -89,6 +89,8 @@ class Exchange:
self._api_async: ccxt_async.Exchange = None
self._markets: Dict = {}
self._leverage_brackets: Dict = {}
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self._config.update(config)
@@ -188,8 +190,10 @@ class Exchange:
def close(self):
logger.debug("Exchange object destroyed, closing async loop")
if self._api_async and inspect.iscoroutinefunction(self._api_async.close):
asyncio.get_event_loop().run_until_complete(self._api_async.close())
if (self._api_async and inspect.iscoroutinefunction(self._api_async.close)
and self._api_async.session):
logger.info("Closing async ccxt session.")
self.loop.run_until_complete(self._api_async.close())
def _init_ccxt(self, exchange_config: Dict[str, Any], ccxt_module: CcxtModuleType = ccxt,
ccxt_kwargs: Dict = {}) -> ccxt.Exchange:
@@ -379,7 +383,7 @@ class Exchange:
def _load_async_markets(self, reload: bool = False) -> None:
try:
if self._api_async:
asyncio.get_event_loop().run_until_complete(
self.loop.run_until_complete(
self._api_async.load_markets(reload=reload))
except (asyncio.TimeoutError, ccxt.BaseError) as e:
@@ -1194,7 +1198,8 @@ class Exchange:
# Fee handling
@retrier
def get_trades_for_order(self, order_id: str, pair: str, since: datetime) -> List:
def get_trades_for_order(self, order_id: str, pair: str, since: datetime,
params: Optional[Dict] = None) -> List:
"""
Fetch Orders using the "fetch_my_trades" endpoint and filter them by order-id.
The "since" argument passed in is coming from the database and is in UTC,
@@ -1218,8 +1223,10 @@ class Exchange:
try:
# Allow 5s offset to catch slight time offsets (discovered in #1185)
# since needs to be int in milliseconds
_params = params if params else {}
my_trades = self._api.fetch_my_trades(
pair, int((since.replace(tzinfo=timezone.utc).timestamp() - 5) * 1000))
pair, int((since.replace(tzinfo=timezone.utc).timestamp() - 5) * 1000),
params=_params)
matched_trades = [trade for trade in my_trades if trade['order'] == order_id]
self._log_exchange_response('get_trades_for_order', matched_trades)
@@ -1297,9 +1304,11 @@ class Exchange:
tick = self.fetch_ticker(comb)
fee_to_quote_rate = safe_value_fallback2(tick, tick, 'last', 'ask')
return round((order['fee']['cost'] * fee_to_quote_rate) / order['cost'], 8)
except ExchangeError:
return None
fee_to_quote_rate = self._config['exchange'].get('unknown_fee_rate', None)
if not fee_to_quote_rate:
return None
return round((order['fee']['cost'] * fee_to_quote_rate) / order['cost'], 8)
def extract_cost_curr_rate(self, order: Dict) -> Tuple[float, str, Optional[float]]:
"""
@@ -1327,7 +1336,7 @@ class Exchange:
:param candle_type: '', mark, index, premiumIndex, or funding_rate
:return: List with candle (OHLCV) data
"""
pair, _, _, data = asyncio.get_event_loop().run_until_complete(
pair, _, _, data = self.loop.run_until_complete(
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
since_ms=since_ms, is_new_pair=is_new_pair,
candle_type=candle_type))
@@ -1436,8 +1445,10 @@ class Exchange:
results_df = {}
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
for input_coro in chunks(input_coroutines, 100):
results = asyncio.get_event_loop().run_until_complete(
asyncio.gather(*input_coro, return_exceptions=True))
async def gather_stuff():
return await asyncio.gather(*input_coro, return_exceptions=True)
results = self.loop.run_until_complete(gather_stuff())
for res in results:
if isinstance(res, Exception):
@@ -1692,7 +1703,7 @@ class Exchange:
if not self.exchange_has("fetchTrades"):
raise OperationalException("This exchange does not support downloading Trades.")
return asyncio.get_event_loop().run_until_complete(
return self.loop.run_until_complete(
self._async_get_trade_history(pair=pair, since=since,
until=until, from_id=from_id))