import logging
from typing import Any, Dict, List, Optional, Tuple

import ccxt

from freqtrade.constants import BuySell
from freqtrade.enums import CandleType, MarginMode, TradingMode
from freqtrade.enums.pricetype import PriceType
from freqtrade.exceptions import (DDosProtection, OperationalException, RetryableOrderError,
                                  TemporaryError)
from freqtrade.exchange import Exchange, date_minus_candles
from freqtrade.exchange.common import retrier
from freqtrade.misc import safe_value_fallback2


logger = logging.getLogger(__name__)


class Okx(Exchange):
    """Okx exchange class.

    Contains adjustments needed for Freqtrade to work with this exchange.
    """

    _ft_has: Dict = {
        "ohlcv_candle_limit": 100,  # Warning, special case with data prior to X months
        "mark_ohlcv_timeframe": "4h",
        "funding_fee_timeframe": "8h",
        "stoploss_order_types": {"limit": "limit"},
        "stoploss_on_exchange": True,
    }
    _ft_has_futures: Dict = {
        "tickers_have_quoteVolume": False,
        "fee_cost_in_contracts": True,
        "stop_price_type_field": "slTriggerPxType",
        # OKX's slTriggerPxType takes "last", "index" or "mark"; each PriceType must map
        # to the identically named value (MARK and INDEX were previously swapped).
        "stop_price_type_value_mapping": {
            PriceType.LAST: "last",
            PriceType.MARK: "mark",
            PriceType.INDEX: "index",
        },
    }

    _supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [
        # TradingMode.SPOT always supported and not required in this list
        # (TradingMode.MARGIN, MarginMode.CROSS),
        # (TradingMode.FUTURES, MarginMode.CROSS),
        (TradingMode.FUTURES, MarginMode.ISOLATED),
    ]

    # Whether the account uses net position mode; refreshed in additional_exchange_init.
    net_only = True

    _ccxt_params: Dict = {'options': {'brokerId': 'ffb5405ad327SUDE'}}

    def ohlcv_candle_limit(
            self, timeframe: str, candle_type: CandleType, since_ms: Optional[int] = None) -> int:
        """
        Exchange ohlcv candle limit
        OKX has the following behaviour:
        * 300 candles for uptodate data
        * 100 candles for historic data
        * 100 candles for additional candles (not futures or spot).
        :param timeframe: Timeframe to check
        :param candle_type: Candle-type
        :param since_ms: Starting timestamp
        :return: Candle limit as integer
        """
        if (
            candle_type in (CandleType.FUTURES, CandleType.SPOT) and
            (not since_ms or since_ms > (date_minus_candles(timeframe, 300).timestamp() * 1000))
        ):
            # No start given, or the start lies within the most recent 300 candles.
            return 300

        return super().ohlcv_candle_limit(timeframe, candle_type, since_ms)

    @retrier
    def additional_exchange_init(self) -> None:
        """
        Additional exchange initialization logic.
        .api will be available at this point.
        Must be overridden in child methods if required.

        For live (non dry-run) futures trading, fetches the accounts and sets
        `net_only` from the first account's `posMode`.
        :raises DDosProtection: on ccxt.DDoSProtection
        :raises TemporaryError: on ccxt.NetworkError or ccxt.ExchangeError
        :raises OperationalException: on any other ccxt.BaseError
        """
        try:
            if self.trading_mode == TradingMode.FUTURES and not self._config['dry_run']:
                accounts = self._api.fetch_accounts()
                self._log_exchange_response('fetch_accounts', accounts)
                # Truthiness check also tolerates a None response.
                if accounts:
                    self.net_only = accounts[0].get('info', {}).get('posMode') == 'net_mode'
        except ccxt.DDoSProtection as e:
            raise DDosProtection(e) from e
        except (ccxt.NetworkError, ccxt.ExchangeError) as e:
            raise TemporaryError(
                f'Error in additional_exchange_init due to {e.__class__.__name__}. Message: {e}'
            ) from e
        except ccxt.BaseError as e:
            raise OperationalException(e) from e

    def _get_posSide(self, side: BuySell, reduceOnly: bool) -> str:
        """
        Determine the `posSide` value for an order.
        :param side: Order side ('buy' or 'sell')
        :param reduceOnly: True for exit orders, False for entry orders
        :return: 'net' when `net_only` is set, otherwise 'long' or 'short'
        """
        if self.net_only:
            return 'net'
        if not reduceOnly:
            # Enter
            return 'long' if side == 'buy' else 'short'
        else:
            # Exit
            return 'long' if side == 'sell' else 'short'

    def _get_params(
        self,
        side: BuySell,
        ordertype: str,
        leverage: float,
        reduceOnly: bool,
        time_in_force: str = 'GTC',
    ) -> Dict:
        """
        Build order parameters, extending the parent's parameters with
        `tdMode` and `posSide` for futures trading with a margin mode set.
        """
        params = super()._get_params(
            side=side,
            ordertype=ordertype,
            leverage=leverage,
            reduceOnly=reduceOnly,
            time_in_force=time_in_force,
        )
        if self.trading_mode == TradingMode.FUTURES and self.margin_mode:
            params['tdMode'] = self.margin_mode.value
            params['posSide'] = self._get_posSide(side, reduceOnly)
        return params

    @retrier
    def _lev_prep(self, pair: str, leverage: float, side: BuySell) -> None:
        """
        Set the leverage for a pair on the exchange.
        Only acts for non-spot trading modes with a margin mode configured.
        :raises DDosProtection: on ccxt.DDoSProtection
        :raises TemporaryError: on ccxt.NetworkError or ccxt.ExchangeError
        :raises OperationalException: on any other ccxt.BaseError
        """
        if self.trading_mode != TradingMode.SPOT and self.margin_mode is not None:
            try:
                # TODO-lev: Test me properly (check mgnMode passed)
                res = self._api.set_leverage(
                    leverage=leverage,
                    symbol=pair,
                    params={
                        "mgnMode": self.margin_mode.value,
                        "posSide": self._get_posSide(side, False),
                    })
                self._log_exchange_response('set_leverage', res)

            except ccxt.DDoSProtection as e:
                raise DDosProtection(e) from e
            except (ccxt.NetworkError, ccxt.ExchangeError) as e:
                raise TemporaryError(
                    f'Could not set leverage due to {e.__class__.__name__}. Message: {e}') from e
            except ccxt.BaseError as e:
                raise OperationalException(e) from e

    def get_max_pair_stake_amount(
        self,
        pair: str,
        price: float,
        leverage: float = 1.0
    ) -> float:
        """
        Maximum stake amount for a pair.
        :param pair: Pair to look up in the leverage tiers
        :param price: Not used by this implementation
        :param leverage: Divisor applied to the last tier's `maxNotional`
        :return: `inf` for spot or when no tiers are known for the pair, otherwise
            the last tier's `maxNotional` divided by leverage
        """
        if self.trading_mode == TradingMode.SPOT:
            return float('inf')  # Not actually inf, but this probably won't matter for SPOT

        if pair not in self._leverage_tiers:
            return float('inf')

        pair_tiers = self._leverage_tiers[pair]
        if not pair_tiers:
            # An empty tier list carries no limit information - treat like a missing pair.
            return float('inf')
        return pair_tiers[-1]['maxNotional'] / leverage

    def _get_stop_params(self, side: BuySell, ordertype: str, stop_price: float) -> Dict:
        """
        Build the parameters for a stoploss order.
        Starts from a copy of `self._params`, sets `stopLossPrice` and, for futures
        with a margin mode set, adds `tdMode` and the exit `posSide`.
        """
        params = self._params.copy()
        # Verify if stopPrice works for your exchange!
        params.update({'stopLossPrice': stop_price})

        if self.trading_mode == TradingMode.FUTURES and self.margin_mode:
            params['tdMode'] = self.margin_mode.value
            params['posSide'] = self._get_posSide(side, True)
        return params

    def stoploss_adjust(self, stop_loss: float, order: Dict, side: str) -> bool:
        """
        OKX uses non-default stoploss price naming.
        :return: True if the order has no `stopLossPrice`, or if `stop_loss` is above
            it for side "sell" / below it for side "buy"
        :raises OperationalException: if stoploss on exchange is not enabled in `_ft_has`
        """
        if not self._ft_has.get('stoploss_on_exchange'):
            raise OperationalException(f"stoploss is not implemented for {self.name}.")

        return (
            order.get('stopLossPrice', None) is None
            or ((side == "sell" and stop_loss > float(order['stopLossPrice'])) or
                (side == "buy" and stop_loss < float(order['stopLossPrice'])))
        )

    def fetch_stoploss_order(self, order_id: str, pair: str, params: Dict = {}) -> Dict:
        """
        Fetch a stoploss order by id.
        First tries a direct fetch with `stop=True`; if the order is not found, searches
        the open, closed and canceled conditional orders for a matching id.
        A matching conditional order with status 'closed' and an `ordId` in its info is
        replaced by the regular follow-up order, re-labelled with the stoploss order id.
        :param order_id: Id of the stoploss order
        :param pair: Pair the order belongs to
        :param params: Not used by this implementation
        :return: Order dict
        :raises RetryableOrderError: if the order could not be found
        """
        if self._config['dry_run']:
            return self.fetch_dry_run_order(order_id)

        try:
            params1 = {'stop': True}
            order_reg = self._api.fetch_order(order_id, pair, params=params1)
            self._log_exchange_response('fetch_stoploss_order', order_reg)
            return order_reg
        except ccxt.OrderNotFound:
            pass
        params2 = {'stop': True, 'ordType': 'conditional'}
        for method in (self._api.fetch_open_orders, self._api.fetch_closed_orders,
                       self._api.fetch_canceled_orders):
            try:
                orders = method(pair, params=params2)
                orders_f = [order for order in orders if order['id'] == order_id]
                if orders_f:
                    order = orders_f[0]
                    if (order['status'] == 'closed'
                            and (real_order_id := order.get('info', {}).get('ordId')) is not None):
                        # Once a order triggered, we fetch the regular followup order.
                        order_reg = self.fetch_order(real_order_id, pair)
                        self._log_exchange_response('fetch_stoploss_order1', order_reg)
                        order_reg['id_stop'] = order_reg['id']
                        order_reg['id'] = order_id
                        order_reg['type'] = 'stoploss'
                        order_reg['status_stop'] = 'triggered'
                        return order_reg
                    order['type'] = 'stoploss'
                    return order
            except ccxt.BaseError as e:
                # Best effort: keep trying the remaining lookups, but leave a trace
                # instead of discarding the error silently.
                logger.debug(
                    "Conditional order lookup failed (pair: %s id: %s): %s: %s",
                    pair, order_id, e.__class__.__name__, e)
        raise RetryableOrderError(
            f'StoplossOrder not found (pair: {pair} id: {order_id}).')

    def get_order_id_conditional(self, order: Dict[str, Any]) -> str:
        """
        Return the order id to use for an order.
        For orders of type 'stop', `id_stop` is used, falling back to `id`.
        """
        if order['type'] == 'stop':
            return safe_value_fallback2(order, order, 'id_stop', 'id')
        return order['id']

    def cancel_stoploss_order(self, order_id: str, pair: str, params: Dict = {}) -> Dict:
        """
        Cancel a stoploss order by delegating to `cancel_order` with `stop=True`.
        :param order_id: Id of the stoploss order
        :param pair: Pair the order belongs to
        :param params: Not used by this implementation
        :return: Result of `cancel_order`
        """
        params1 = {'stop': True}
        # 'ordType': 'conditional'
        #
        return self.cancel_order(
            order_id=order_id,
            pair=pair,
            params=params1,
        )