ccxt async POC

This commit is contained in:
misagh
2018-07-31 12:47:32 +02:00
parent 336cd524a3
commit c8f125dbb9
2 changed files with 65 additions and 10 deletions

View File

@@ -6,7 +6,9 @@ from typing import List, Dict, Any, Optional
from datetime import datetime
import ccxt
import ccxt.async_support as ccxt_async
import arrow
import asyncio
from freqtrade import constants, OperationalException, DependencyException, TemporaryError
@@ -44,6 +46,7 @@ class Exchange(object):
# Current selected exchange
_api: ccxt.Exchange = None
_api_async: ccxt_async.Exchange = None
_conf: Dict = {}
_cached_ticker: Dict[str, Any] = {}
@@ -64,6 +67,7 @@ class Exchange(object):
exchange_config = config['exchange']
self._api = self._init_ccxt(exchange_config)
self._api_async = self._init_ccxt(exchange_config, ccxt_async)
logger.info('Using Exchange "%s"', self.name)
@@ -74,7 +78,7 @@ class Exchange(object):
# Check if timeframe is available
self.validate_timeframes(config['ticker_interval'])
def _init_ccxt(self, exchange_config: dict) -> ccxt.Exchange:
def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt) -> ccxt.Exchange:
"""
Initialize ccxt with given config and return valid
ccxt instance.
@@ -82,15 +86,16 @@ class Exchange(object):
# Find matching class for the given exchange name
name = exchange_config['name']
if name not in ccxt.exchanges:
if name not in ccxt_module.exchanges:
raise OperationalException(f'Exchange {name} is not supported')
try:
api = getattr(ccxt, name.lower())({
api = getattr(ccxt_module, name.lower())({
'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
'uid': exchange_config.get('uid', ''),
'enableRateLimit': True,
#'enableRateLimit': True,
'enableRateLimit': False,
})
except (KeyError, AttributeError):
raise OperationalException(f'Exchange {name} is not supported')
@@ -286,6 +291,35 @@ class Exchange(object):
logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair]
async def async_get_tickers_history(self, pairs, tick_interval):
# COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ?
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
input_coroutines = [self.async_get_ticker_history(symbol, tick_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
#await self._api_async.close()
return tickers
async def async_get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]:
try:
# fetch ohlcv asynchronously
print("fetching %s ..." % pair)
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, since=since_ms)
print("done fetching %s ..." % pair)
return pair, data
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical candlestick data.'
f'Message: {e}')
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(
f'Could not load ticker history due to {e.__class__.__name__}. Message: {e}')
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch ticker data. Msg: {e}')
@retrier
def get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]: