stable/freqtrade/exchange/exchange.py

787 lines
31 KiB
Python
Raw Normal View History

2017-11-18 07:52:28 +00:00
# pragma pylint: disable=W0603
"""
Cryptocurrency Exchanges support
"""
2019-06-09 12:05:36 +00:00
import asyncio
import inspect
2019-06-09 12:05:36 +00:00
import logging
from copy import deepcopy
2018-04-15 17:39:11 +00:00
from datetime import datetime
2019-06-09 12:05:36 +00:00
from math import ceil, floor
from random import randint
from typing import Any, Dict, List, Optional, Tuple
2017-09-01 19:11:46 +00:00
2018-12-11 18:47:48 +00:00
import arrow
import ccxt
2018-07-31 10:47:32 +00:00
import ccxt.async_support as ccxt_async
2018-12-11 18:47:48 +00:00
from pandas import DataFrame
2019-06-09 12:05:36 +00:00
from freqtrade import (DependencyException, InvalidOrderException,
OperationalException, TemporaryError, constants)
2018-12-12 19:16:03 +00:00
from freqtrade.data.converter import parse_ticker_dataframe
2019-06-09 12:05:36 +00:00
from freqtrade.misc import deep_merge_dicts
2017-05-12 17:11:56 +00:00
2017-05-14 12:14:16 +00:00
logger = logging.getLogger(__name__)
2017-05-12 17:11:56 +00:00
API_RETRY_COUNT = 4
2017-05-12 17:11:56 +00:00
2017-11-05 14:21:16 +00:00
2018-08-18 19:05:38 +00:00
def retrier_async(f):
async def wrapper(*args, **kwargs):
count = kwargs.pop('count', API_RETRY_COUNT)
try:
return await f(*args, **kwargs)
except (TemporaryError, DependencyException) as ex:
logger.warning('%s() returned exception: "%s"', f.__name__, ex)
if count > 0:
count -= 1
kwargs.update({'count': count})
logger.warning('retrying %s() still for %s times', f.__name__, count)
return await wrapper(*args, **kwargs)
else:
logger.warning('Giving up retrying: %s()', f.__name__)
raise ex
return wrapper
def retrier(f):
def wrapper(*args, **kwargs):
count = kwargs.pop('count', API_RETRY_COUNT)
try:
return f(*args, **kwargs)
2018-04-21 20:37:27 +00:00
except (TemporaryError, DependencyException) as ex:
logger.warning('%s() returned exception: "%s"', f.__name__, ex)
if count > 0:
count -= 1
kwargs.update({'count': count})
2018-04-21 20:37:27 +00:00
logger.warning('retrying %s() still for %s times', f.__name__, count)
return wrapper(*args, **kwargs)
else:
logger.warning('Giving up retrying: %s()', f.__name__)
raise ex
return wrapper
2018-06-17 10:41:33 +00:00
class Exchange(object):
2019-02-28 23:13:16 +00:00
_config: Dict = {}
2019-02-17 22:34:15 +00:00
_params: Dict = {}
2019-02-24 18:35:29 +00:00
# Dict to specify which options each exchange implements
2019-06-09 12:05:36 +00:00
# This defines defaults, which can be selectively overridden by subclasses using _ft_has
# or by specifying them in the configuration.
_ft_has_default: Dict = {
2019-02-24 18:35:29 +00:00
"stoploss_on_exchange": False,
"order_time_in_force": ["gtc"],
"ohlcv_candle_limit": 500,
2019-06-09 12:35:58 +00:00
"ohlcv_partial_candle": True,
2019-02-24 18:35:29 +00:00
}
2019-06-09 12:05:36 +00:00
_ft_has: Dict = {}
2019-02-24 18:35:29 +00:00
2018-06-17 10:41:33 +00:00
def __init__(self, config: dict) -> None:
"""
Initializes this module with the given config,
2019-02-24 18:35:29 +00:00
it does basic validation whether the specified exchange and pairs are valid.
2018-06-17 10:41:33 +00:00
:return: None
"""
2019-07-03 02:02:44 +00:00
self._api: ccxt.Exchange = None
self._api_async: ccxt_async.Exchange = None
2019-02-28 23:13:16 +00:00
self._config.update(config)
2018-08-19 17:37:48 +00:00
self._cached_ticker: Dict[str, Any] = {}
# Holds last candle refreshed time of each pair
self._pairs_last_refresh_time: Dict[Tuple[str, str], int] = {}
2019-03-10 12:30:45 +00:00
# Timestamp of last markets refresh
self._last_markets_refresh: int = 0
2018-08-19 17:37:48 +00:00
# Holds candles
self._klines: Dict[Tuple[str, str], DataFrame] = {}
2018-08-19 17:37:48 +00:00
2018-12-10 18:54:43 +00:00
# Holds all open sell orders for dry_run
self._dry_run_open_orders: Dict[str, Any] = {}
2018-08-19 17:37:48 +00:00
2018-06-17 10:41:33 +00:00
if config['dry_run']:
logger.info('Instance is running with dry_run enabled')
2017-10-01 21:28:09 +00:00
2018-06-17 10:41:33 +00:00
exchange_config = config['exchange']
2019-06-09 12:05:36 +00:00
# Deep merge ft_has with default ft_has options
self._ft_has = deep_merge_dicts(self._ft_has, deepcopy(self._ft_has_default))
if exchange_config.get("_ft_has_params"):
self._ft_has = deep_merge_dicts(exchange_config.get("_ft_has_params"),
self._ft_has)
2019-06-09 12:35:58 +00:00
logger.info("Overriding exchange._ft_has with config params, result: %s", self._ft_has)
2019-06-09 12:05:36 +00:00
2019-06-09 12:35:58 +00:00
# Assign this directly for easy access
self._ohlcv_candle_limit = self._ft_has['ohlcv_candle_limit']
self._ohlcv_partial_candle = self._ft_has['ohlcv_partial_candle']
2019-06-09 12:35:58 +00:00
# Initialize ccxt objects
2019-07-03 02:02:44 +00:00
self._api = self._init_ccxt(
2018-12-10 18:54:43 +00:00
exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config'))
2019-07-03 02:02:44 +00:00
self._api_async = self._init_ccxt(
2018-12-10 18:54:43 +00:00
exchange_config, ccxt_async, ccxt_kwargs=exchange_config.get('ccxt_async_config'))
2017-10-01 21:28:09 +00:00
logger.info('Using Exchange "%s"', self.name)
2019-03-10 12:30:45 +00:00
# Converts the interval provided in minutes in config to seconds
self.markets_refresh_interval: int = exchange_config.get(
"markets_refresh_interval", 60) * 60
# Initial markets load
2019-03-04 22:59:08 +00:00
self._load_markets()
2019-03-10 12:30:45 +00:00
2018-06-17 10:41:33 +00:00
# Check if all pairs are available
self.validate_pairs(config['exchange']['pair_whitelist'])
self.validate_ordertypes(config.get('order_types', {}))
2018-11-25 20:09:35 +00:00
self.validate_order_time_in_force(config.get('order_time_in_force', {}))
2018-07-09 20:11:12 +00:00
if config.get('ticker_interval'):
# Check if timeframe is available
self.validate_timeframes(config['ticker_interval'])
def __del__(self):
"""
Destructor - clean up async stuff
"""
logger.debug("Exchange object destroyed, closing async loop")
if self._api_async and inspect.iscoroutinefunction(self._api_async.close):
asyncio.get_event_loop().run_until_complete(self._api_async.close())
def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt,
ccxt_kwargs: dict = None) -> ccxt.Exchange:
2018-06-17 11:09:23 +00:00
"""
Initialize ccxt with given config and return valid
ccxt instance.
"""
# Find matching class for the given exchange name
name = exchange_config['name']
2019-06-12 19:37:43 +00:00
if not is_exchange_available(name, ccxt_module):
2019-06-11 10:43:29 +00:00
raise OperationalException(f'Exchange {name} is not supported by ccxt')
ex_config = {
'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''),
}
if ccxt_kwargs:
logger.info('Applying additional ccxt config: %s', ccxt_kwargs)
ex_config.update(ccxt_kwargs)
try:
api = getattr(ccxt_module, name.lower())(ex_config)
2018-06-17 11:09:23 +00:00
except (KeyError, AttributeError):
raise OperationalException(f'Exchange {name} is not supported')
2019-07-03 02:13:41 +00:00
except ccxt.BaseError as e:
2019-07-03 02:02:44 +00:00
raise OperationalException(f"Initialization of ccxt failed. Reason: {e}")
2018-06-17 11:09:23 +00:00
self.set_sandbox(api, exchange_config, name)
2018-07-27 08:55:36 +00:00
2018-06-17 11:09:23 +00:00
return api
@property
def name(self) -> str:
"""exchange Name (from ccxt)"""
2018-06-18 20:07:15 +00:00
return self._api.name
@property
def id(self) -> str:
"""exchange ccxt id"""
2018-06-18 20:07:15 +00:00
return self._api.id
2017-10-06 10:22:04 +00:00
2019-03-04 22:59:08 +00:00
@property
def markets(self) -> Dict:
"""exchange ccxt markets"""
2019-03-06 21:57:31 +00:00
if not self._api.markets:
logger.warning("Markets were not loaded. Loading them now..")
self._load_markets()
2019-03-04 22:59:08 +00:00
return self._api.markets
def klines(self, pair_interval: Tuple[str, str], copy=True) -> DataFrame:
if pair_interval in self._klines:
return self._klines[pair_interval].copy() if copy else self._klines[pair_interval]
2018-12-11 18:47:48 +00:00
else:
2018-12-29 12:00:50 +00:00
return DataFrame()
2018-12-11 18:47:48 +00:00
def set_sandbox(self, api, exchange_config: dict, name: str):
if exchange_config.get('sandbox'):
if api.urls.get('test'):
api.urls['api'] = api.urls['test']
logger.info("Enabled Sandbox API on %s", name)
else:
2018-08-05 13:08:07 +00:00
logger.warning(name, "No Sandbox URL in CCXT, exiting. "
2018-07-29 08:10:55 +00:00
"Please check your config.json")
raise OperationalException(f'Exchange {name} does not provide a sandbox api')
2019-03-06 21:48:04 +00:00
def _load_async_markets(self, reload=False) -> None:
2018-08-10 11:04:43 +00:00
try:
if self._api_async:
2019-03-06 21:48:04 +00:00
asyncio.get_event_loop().run_until_complete(
self._api_async.load_markets(reload=reload))
2018-08-10 11:04:43 +00:00
except ccxt.BaseError as e:
logger.warning('Could not load async markets. Reason: %s', e)
return
2019-03-12 15:35:32 +00:00
def _load_markets(self) -> None:
""" Initialize markets both sync and async """
try:
2019-03-12 15:35:32 +00:00
self._api.load_markets()
self._load_async_markets()
2019-03-10 12:30:45 +00:00
self._last_markets_refresh = arrow.utcnow().timestamp
except ccxt.BaseError as e:
logger.warning('Unable to initialize markets. Reason: %s', e)
2019-03-10 12:30:45 +00:00
def _reload_markets(self) -> None:
"""Reload markets both sync and async, if refresh interval has passed"""
# Check whether markets have to be reloaded
if (self._last_markets_refresh > 0) and (
self._last_markets_refresh + self.markets_refresh_interval
> arrow.utcnow().timestamp):
return None
2019-03-10 15:36:25 +00:00
logger.debug("Performing scheduled market reload..")
2019-04-24 19:56:24 +00:00
try:
self._api.load_markets(reload=True)
self._last_markets_refresh = arrow.utcnow().timestamp
except ccxt.BaseError:
2019-04-24 19:56:24 +00:00
logger.exception("Could not reload markets.")
2018-06-17 10:41:33 +00:00
def validate_pairs(self, pairs: List[str]) -> None:
"""
Checks if all given pairs are tradable on the current exchange.
Raises OperationalException if one pair is not available.
:param pairs: list of pairs
:return: None
"""
2017-10-06 10:22:04 +00:00
if not self.markets:
logger.warning('Unable to validate pairs (assuming they are correct).')
# return
2018-06-17 10:41:33 +00:00
for pair in pairs:
# Note: ccxt has BaseCurrency/QuoteCurrency format for pairs
# TODO: add a support for having coins in BTC/USDT format
if self.markets and pair not in self.markets:
2018-06-17 10:41:33 +00:00
raise OperationalException(
2019-03-05 18:45:42 +00:00
f'Pair {pair} is not available on {self.name}. '
f'Please remove {pair} from your whitelist.')
2018-06-17 10:41:33 +00:00
2019-07-07 04:36:35 +00:00
def get_valid_pair_combination(self, curr_1, curr_2) -> str:
"""
2019-07-07 04:36:35 +00:00
Get valid pair combination of curr_1 and curr_2 by trying both combinations.
"""
2019-07-07 04:36:35 +00:00
for pair in [f"{curr_1}/{curr_2}", f"{curr_2}/{curr_1}"]:
2019-07-03 18:20:12 +00:00
if pair in self.markets and self.markets[pair].get('active'):
return pair
2019-07-07 04:36:35 +00:00
raise DependencyException(f"Could not combine {curr_1} and {curr_2} to get a valid pair.")
def validate_timeframes(self, timeframe: List[str]) -> None:
"""
Checks if ticker interval from config is a supported timeframe on the exchange
"""
2019-07-02 22:03:38 +00:00
if not hasattr(self._api, "timeframes") or self._api.timeframes is None:
# If timeframes attribute is missing (or is None), the exchange probably
# has no fetchOHLCV method.
# Therefore we also show that.
raise OperationalException(
2019-07-02 22:03:38 +00:00
f"The ccxt library does not provide the list of timeframes "
f"for the exchange \"{self.name}\" and this exchange "
f"is therefore not supported. ccxt fetchOHLCV: {self.exchange_has('fetchOHLCV')}")
2018-07-05 12:05:31 +00:00
timeframes = self._api.timeframes
if timeframe not in timeframes:
2018-07-05 12:05:31 +00:00
raise OperationalException(
f'Invalid ticker {timeframe}, this Exchange supports {timeframes}')
def validate_ordertypes(self, order_types: Dict) -> None:
"""
Checks if order-types configured in strategy/config are supported
"""
if any(v == 'market' for k, v in order_types.items()):
if not self.exchange_has('createMarketOrder'):
raise OperationalException(
f'Exchange {self.name} does not support market orders.')
2019-02-24 18:35:29 +00:00
if (order_types.get("stoploss_on_exchange")
and not self._ft_has.get("stoploss_on_exchange", False)):
2019-02-24 19:18:41 +00:00
raise OperationalException(
'On exchange stoploss is not supported for %s.' % self.name
)
2018-11-25 20:09:35 +00:00
def validate_order_time_in_force(self, order_time_in_force: Dict) -> None:
"""
Checks if order time in force configured in strategy/config are supported
"""
if any(v not in self._ft_has["order_time_in_force"]
for k, v in order_time_in_force.items()):
2019-03-21 18:12:15 +00:00
raise OperationalException(
f'Time in force policies are not supported for {self.name} yet.')
2018-11-25 20:09:35 +00:00
2018-06-17 10:41:33 +00:00
def exchange_has(self, endpoint: str) -> bool:
"""
Checks if exchange implements a specific API endpoint.
Wrapper around ccxt 'has' attribute
:param endpoint: Name of endpoint (e.g. 'fetchOHLCV', 'fetchTickers')
:return: bool
"""
2018-06-18 20:07:15 +00:00
return endpoint in self._api.has and self._api.has[endpoint]
2018-06-17 10:41:33 +00:00
def symbol_amount_prec(self, pair, amount: float):
'''
Returns the amount to buy or sell to a precision the Exchange accepts
Rounded down
'''
if self.markets[pair]['precision']['amount']:
symbol_prec = self.markets[pair]['precision']['amount']
big_amount = amount * pow(10, symbol_prec)
amount = floor(big_amount) / pow(10, symbol_prec)
return amount
def symbol_price_prec(self, pair, price: float):
'''
Returns the price buying or selling with to the precision the Exchange accepts
Rounds up
'''
if self.markets[pair]['precision']['price']:
symbol_prec = self.markets[pair]['precision']['price']
big_price = price * pow(10, symbol_prec)
price = ceil(big_price) / pow(10, symbol_prec)
return price
def dry_run_order(self, pair: str, ordertype: str, side: str, amount: float,
rate: float, params: Dict = {}) -> Dict[str, Any]:
order_id = f'dry_run_{side}_{randint(0, 10**6)}'
dry_order = { # TODO: additional entry should be added for stoploss limit
"id": order_id,
'pair': pair,
'price': rate,
'amount': amount,
2019-02-23 15:03:15 +00:00
"cost": amount * rate,
'type': ordertype,
2019-03-08 20:17:12 +00:00
'side': side,
2019-02-23 15:03:15 +00:00
'remaining': amount,
'datetime': arrow.utcnow().isoformat(),
2019-02-23 15:03:15 +00:00
'status': "open",
'fee': None,
"info": {}
}
2019-02-23 15:28:13 +00:00
self._store_dry_order(dry_order)
return dry_order
2019-02-23 15:28:13 +00:00
def _store_dry_order(self, dry_order: Dict) -> None:
2019-02-23 15:03:15 +00:00
closed_order = dry_order.copy()
if closed_order["type"] in ["market", "limit"]:
closed_order.update({
"status": "closed",
"filled": closed_order["amount"],
"remaining": 0
})
self._dry_run_open_orders[closed_order["id"]] = closed_order
def create_order(self, pair: str, ordertype: str, side: str, amount: float,
rate: float, params: Dict = {}) -> Dict:
2018-06-06 18:18:16 +00:00
try:
# Set the precision for amount and price(rate) as accepted by the exchange
amount = self.symbol_amount_prec(pair, amount)
needs_price = (ordertype != 'market'
or self._api.options.get("createMarketBuyOrderRequiresPrice", False))
rate = self.symbol_price_prec(pair, rate) if needs_price else None
return self._api.create_order(pair, ordertype, side,
2019-02-17 22:34:15 +00:00
amount, rate, params)
2018-06-17 10:41:33 +00:00
except ccxt.InsufficientFunds as e:
raise DependencyException(
f'Insufficient funds to create {ordertype} {side} order on market {pair}.'
f'Tried to {side} amount {amount} at rate {rate} (total {rate * amount}).'
2018-06-17 10:41:33 +00:00
f'Message: {e}')
except ccxt.InvalidOrder as e:
raise DependencyException(
f'Could not create {ordertype} {side} order on market {pair}.'
f'Tried to {side} amount {amount} at rate {rate} (total {rate * amount}).'
2018-06-17 10:41:33 +00:00
f'Message: {e}')
2018-06-06 18:18:16 +00:00
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not place {side} order due to {e.__class__.__name__}. Message: {e}')
2018-06-06 18:18:16 +00:00
except ccxt.BaseError as e:
raise OperationalException(e)
2017-10-06 10:22:04 +00:00
def buy(self, pair: str, ordertype: str, amount: float,
rate: float, time_in_force) -> Dict:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
dry_order = self.dry_run_order(pair, ordertype, "buy", amount, rate)
2019-02-23 15:03:15 +00:00
return dry_order
2017-10-06 10:22:04 +00:00
params = self._params.copy()
if time_in_force != 'gtc' and ordertype != 'market':
params.update({'timeInForce': time_in_force})
return self.create_order(pair, ordertype, 'buy', amount, rate, params)
2019-02-17 22:34:15 +00:00
def sell(self, pair: str, ordertype: str, amount: float,
rate: float, time_in_force='gtc') -> Dict:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
dry_order = self.dry_run_order(pair, ordertype, "sell", amount, rate)
2019-02-23 15:03:15 +00:00
return dry_order
params = self._params.copy()
if time_in_force != 'gtc' and ordertype != 'market':
params.update({'timeInForce': time_in_force})
return self.create_order(pair, ordertype, 'sell', amount, rate, params)
2017-10-06 10:22:04 +00:00
2018-11-22 15:24:40 +00:00
def stoploss_limit(self, pair: str, amount: float, stop_price: float, rate: float) -> Dict:
"""
creates a stoploss limit order.
NOTICE: it is not supported by all exchanges. only binance is tested for now.
2019-02-24 19:13:38 +00:00
TODO: implementation maybe needs to be moved to the binance subclass
"""
ordertype = "stop_loss_limit"
2018-11-22 15:24:40 +00:00
stop_price = self.symbol_price_prec(pair, stop_price)
# Ensure rate is less than stop price
2018-11-22 19:30:31 +00:00
if stop_price <= rate:
2018-11-22 15:24:40 +00:00
raise OperationalException(
'In stoploss limit order, stop price should be more than limit price')
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
dry_order = self.dry_run_order(
pair, ordertype, "sell", amount, stop_price)
return dry_order
params = self._params.copy()
params.update({'stopPrice': stop_price})
order = self.create_order(pair, ordertype, 'sell', amount, rate, params)
logger.info('stoploss limit order added for %s. '
'stop price: %s. limit: %s' % (pair, stop_price, rate))
return order
2018-06-17 10:41:33 +00:00
@retrier
def get_balance(self, currency: str) -> float:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
return constants.DRY_RUN_WALLET
2018-04-15 17:39:11 +00:00
2018-06-17 10:41:33 +00:00
# ccxt exception is already handled by get_balances
balances = self.get_balances()
balance = balances.get(currency)
if balance is None:
raise TemporaryError(
f'Could not get {currency} balance due to malformed exchange response: {balances}')
return balance['free']
2018-04-15 17:39:11 +00:00
2018-06-17 10:41:33 +00:00
@retrier
def get_balances(self) -> dict:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
2018-06-17 10:41:33 +00:00
return {}
2018-06-17 10:41:33 +00:00
try:
2018-06-18 20:07:15 +00:00
balances = self._api.fetch_balance()
2018-06-17 10:41:33 +00:00
# Remove additional info from ccxt results
balances.pop("info", None)
balances.pop("free", None)
balances.pop("total", None)
balances.pop("used", None)
return balances
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not get balance due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
2018-06-17 10:41:33 +00:00
@retrier
def get_tickers(self) -> Dict:
try:
2018-06-18 20:07:15 +00:00
return self._api.fetch_tickers()
2018-06-17 10:41:33 +00:00
except ccxt.NotSupported as e:
raise OperationalException(
2018-06-18 20:07:15 +00:00
f'Exchange {self._api.name} does not support fetching tickers in batch.'
2018-06-17 10:41:33 +00:00
f'Message: {e}')
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not load tickers due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
2018-06-17 10:41:33 +00:00
@retrier
def get_ticker(self, pair: str, refresh: Optional[bool] = True) -> dict:
2018-06-18 20:09:46 +00:00
if refresh or pair not in self._cached_ticker.keys():
2018-06-17 10:41:33 +00:00
try:
if pair not in self._api.markets or not self._api.markets[pair].get('active'):
raise DependencyException(f"Pair {pair} not available")
2018-06-18 20:07:15 +00:00
data = self._api.fetch_ticker(pair)
2018-06-17 10:41:33 +00:00
try:
2018-06-18 20:09:46 +00:00
self._cached_ticker[pair] = {
2018-06-17 10:41:33 +00:00
'bid': float(data['bid']),
'ask': float(data['ask']),
}
except KeyError:
logger.debug("Could not cache ticker data for %s", pair)
return data
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
2018-08-08 19:55:48 +00:00
f'Could not load ticker due to {e.__class__.__name__}. Message: {e}')
2018-06-17 10:41:33 +00:00
except ccxt.BaseError as e:
raise OperationalException(e)
else:
logger.info("returning cached ticker-data for %s", pair)
2018-06-18 20:09:46 +00:00
return self._cached_ticker[pair]
2018-06-17 10:41:33 +00:00
def get_history(self, pair: str, ticker_interval: str,
2018-08-10 09:08:28 +00:00
since_ms: int) -> List:
"""
Gets candle history using asyncio and returns the list of candles.
Handles all async doing.
"""
return asyncio.get_event_loop().run_until_complete(
self._async_get_history(pair=pair, ticker_interval=ticker_interval,
2018-08-10 09:08:28 +00:00
since_ms=since_ms))
async def _async_get_history(self, pair: str,
ticker_interval: str,
2018-08-10 09:08:28 +00:00
since_ms: int) -> List:
one_call = timeframe_to_msecs(ticker_interval) * self._ohlcv_candle_limit
logger.debug(
"one_call: %s msecs (%s)",
one_call,
arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
)
input_coroutines = [self._async_get_candle_history(
pair, ticker_interval, since) for since in
range(since_ms, arrow.utcnow().timestamp * 1000, one_call)]
2018-08-10 09:08:28 +00:00
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
# Combine tickers
data: List = []
for p, ticker_interval, ticker in tickers:
if p == pair:
data.extend(ticker)
# Sort data again after extending the result - above calls return in "async order"
2018-08-18 19:08:59 +00:00
data = sorted(data, key=lambda x: x[0])
2018-08-10 09:08:28 +00:00
logger.info("downloaded %s with length %s.", pair, len(data))
return data
def refresh_latest_ohlcv(self, pair_list: List[Tuple[str, str]]) -> List[Tuple[str, List]]:
"""
Refresh in-memory ohlcv asyncronously and set `_klines` with the result
"""
logger.debug("Refreshing ohlcv data for %d pairs", len(pair_list))
input_coroutines = []
2019-02-20 21:46:35 +00:00
# Gather coroutines to run
2019-01-21 18:56:15 +00:00
for pair, ticker_interval in set(pair_list):
if (not ((pair, ticker_interval) in self._klines)
or self._now_is_time_to_refresh(pair, ticker_interval)):
2018-12-29 12:07:22 +00:00
input_coroutines.append(self._async_get_candle_history(pair, ticker_interval))
else:
logger.debug(
"Using cached ohlcv data for pair %s, interval %s ...",
pair, ticker_interval
)
2018-12-29 12:07:22 +00:00
tickers = asyncio.get_event_loop().run_until_complete(
asyncio.gather(*input_coroutines, return_exceptions=True))
# handle caching
2019-01-19 19:02:37 +00:00
for res in tickers:
if isinstance(res, Exception):
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
continue
pair = res[0]
ticker_interval = res[1]
ticks = res[2]
# keeping last candle time as last refreshed time of the pair
if ticks:
self._pairs_last_refresh_time[(pair, ticker_interval)] = ticks[-1][0] // 1000
# keeping parsed dataframe in cache
self._klines[(pair, ticker_interval)] = parse_ticker_dataframe(
ticks, ticker_interval, pair=pair, fill_missing=True,
drop_incomplete=self._ohlcv_partial_candle)
2018-07-31 10:47:32 +00:00
return tickers
2019-02-20 22:20:24 +00:00
def _now_is_time_to_refresh(self, pair: str, ticker_interval: str) -> bool:
# Calculating ticker interval in seconds
interval_in_sec = timeframe_to_seconds(ticker_interval)
2019-02-20 22:20:24 +00:00
return not ((self._pairs_last_refresh_time.get((pair, ticker_interval), 0)
+ interval_in_sec) >= arrow.utcnow().timestamp)
2019-02-20 22:20:24 +00:00
2018-08-18 19:05:38 +00:00
@retrier_async
async def _async_get_candle_history(self, pair: str, ticker_interval: str,
since_ms: Optional[int] = None) -> Tuple[str, str, List]:
"""
Asyncronously gets candle histories using fetch_ohlcv
returns tuple: (pair, ticker_interval, ohlcv_list)
"""
2018-07-31 10:47:32 +00:00
try:
# fetch ohlcv asynchronously
s = '(' + arrow.get(since_ms // 1000).isoformat() + ') ' if since_ms is not None else ''
logger.debug(
"Fetching pair %s, interval %s, since %s %s...",
pair, ticker_interval, since_ms, s
)
data = await self._api_async.fetch_ohlcv(pair, timeframe=ticker_interval,
since=since_ms)
# Because some exchange sort Tickers ASC and other DESC.
# Ex: Bittrex returns a list of tickers ASC (oldest first, newest last)
# when GDAX returns a list of tickers DESC (newest first, oldest last)
2018-11-25 14:00:50 +00:00
# Only sort if necessary to save computing time
2019-01-19 19:02:37 +00:00
try:
if data and data[0][0] > data[-1][0]:
data = sorted(data, key=lambda x: x[0])
except IndexError:
logger.exception("Error loading %s. Result was %s.", pair, data)
return pair, ticker_interval, []
logger.debug("Done fetching pair %s, interval %s ...", pair, ticker_interval)
return pair, ticker_interval, data
2018-07-31 10:47:32 +00:00
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical candlestick data.'
f'Message: {e}')
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not load ticker history due to {e.__class__.__name__}. Message: {e}')
2018-06-17 10:41:33 +00:00
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch ticker data. Msg: {e}')
2018-06-17 10:41:33 +00:00
@retrier
def cancel_order(self, order_id: str, pair: str) -> None:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
2018-06-17 10:41:33 +00:00
return
try:
2018-06-18 20:07:15 +00:00
return self._api.cancel_order(order_id, pair)
2018-06-17 10:41:33 +00:00
except ccxt.InvalidOrder as e:
2019-04-02 16:45:18 +00:00
raise InvalidOrderException(
2018-06-17 10:41:33 +00:00
f'Could not cancel order. Message: {e}')
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not cancel order due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
@retrier
def get_order(self, order_id: str, pair: str) -> Dict:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
2018-06-18 20:09:46 +00:00
order = self._dry_run_open_orders[order_id]
2018-06-17 10:41:33 +00:00
return order
try:
2018-06-18 20:07:15 +00:00
return self._api.fetch_order(order_id, pair)
2018-06-17 10:41:33 +00:00
except ccxt.InvalidOrder as e:
raise InvalidOrderException(
f'Tried to get an invalid order (id: {order_id}). Message: {e}')
2018-06-17 10:41:33 +00:00
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not get order due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
2017-11-11 18:20:16 +00:00
2018-08-05 04:41:06 +00:00
@retrier
def get_order_book(self, pair: str, limit: int = 100) -> dict:
2018-08-07 10:29:37 +00:00
"""
get order book level 2 from exchange
Notes:
20180619: bittrex doesnt support limits -.-
"""
2018-08-05 04:41:06 +00:00
try:
return self._api.fetch_l2_order_book(pair, limit)
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching order book.'
f'Message: {e}')
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not get order book due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
2018-06-17 10:41:33 +00:00
@retrier
def get_trades_for_order(self, order_id: str, pair: str, since: datetime) -> List:
2019-02-28 23:13:16 +00:00
if self._config['dry_run']:
2018-06-17 10:41:33 +00:00
return []
if not self.exchange_has('fetchMyTrades'):
return []
try:
2018-09-15 18:28:36 +00:00
# Allow 5s offset to catch slight time offsets (discovered in #1185)
my_trades = self._api.fetch_my_trades(pair, since.timestamp() - 5)
2018-06-17 10:41:33 +00:00
matched_trades = [trade for trade in my_trades if trade['order'] == order_id]
2017-11-11 18:20:16 +00:00
2018-06-17 10:41:33 +00:00
return matched_trades
2018-06-17 10:41:33 +00:00
except ccxt.NetworkError as e:
raise TemporaryError(
f'Could not get trades due to networking error. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
2018-06-17 10:41:33 +00:00
@retrier
def get_fee(self, symbol='ETH/BTC', type='', side='', amount=1,
price=1, taker_or_maker='maker') -> float:
try:
# validate that markets are loaded before trying to get fee
2018-06-18 20:07:15 +00:00
if self._api.markets is None or len(self._api.markets) == 0:
self._api.load_markets()
2018-04-15 17:39:11 +00:00
2018-06-18 20:07:15 +00:00
return self._api.calculate_fee(symbol=symbol, type=type, side=side, amount=amount,
2018-06-17 10:41:33 +00:00
price=price, takerOrMaker=taker_or_maker)['rate']
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not get fee info due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(e)
2019-06-11 10:18:35 +00:00
def is_exchange_bad(exchange: str) -> bool:
return exchange in ['bitmex', 'bitstamp']
2019-06-11 10:18:35 +00:00
2019-06-12 19:37:43 +00:00
def is_exchange_available(exchange: str, ccxt_module=None) -> bool:
return exchange in available_exchanges(ccxt_module)
2019-06-11 10:18:35 +00:00
def is_exchange_officially_supported(exchange: str) -> bool:
return exchange in ['bittrex', 'binance']
2019-06-12 19:37:43 +00:00
def available_exchanges(ccxt_module=None) -> List[str]:
2019-04-10 21:07:27 +00:00
return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges
def timeframe_to_seconds(ticker_interval: str) -> int:
"""
Translates the timeframe interval value written in the human readable
form ('1m', '5m', '1h', '1d', '1w', etc.) to the number
of seconds for one timeframe interval.
"""
return ccxt.Exchange.parse_timeframe(ticker_interval)
def timeframe_to_minutes(ticker_interval: str) -> int:
"""
Same as above, but returns minutes.
"""
return ccxt.Exchange.parse_timeframe(ticker_interval) // 60
def timeframe_to_msecs(ticker_interval: str) -> int:
"""
Same as above, but returns milliseconds.
"""
return ccxt.Exchange.parse_timeframe(ticker_interval) * 1000