exchange: proof-of-concept ccxt async support

This commit is contained in:
Samuel Husso 2018-07-27 10:13:58 +03:00
parent a3d870ad3e
commit ab8993d13f

View File

@ -6,7 +6,9 @@ from typing import List, Dict, Any, Optional
from datetime import datetime
import ccxt
import ccxt.async_support as ccxt_async
import arrow
import asyncio
from freqtrade import constants, OperationalException, DependencyException, TemporaryError
@ -40,10 +42,21 @@ def retrier(f):
return wrapper
def run_async_task(f, **kwargs):
"""
Run async task and its kwargs.
Usage:
run_async_task(async_f, arg1='foo', arg2='bar')
"""
logger.info('Running async task method %s with args %s', f, kwargs)
return asyncio.get_event_loop().run_until_complete(f(**kwargs))
class Exchange(object):
# Current selected exchange
_api: ccxt.Exchange = None
_api_async: ccxt_async.Exchange = None
_conf: Dict = {}
_cached_ticker: Dict[str, Any] = {}
@ -64,6 +77,7 @@ class Exchange(object):
exchange_config = config['exchange']
self._api = self._init_ccxt(exchange_config)
self._api_async = self._init_ccxt(exchange_config, ccxt_async)
logger.info('Using Exchange "%s"', self.name)
@ -74,7 +88,25 @@ class Exchange(object):
# Check if timeframe is available
self.validate_timeframes(config['ticker_interval'])
def _init_ccxt(self, exchange_config: dict) -> ccxt.Exchange:
self.__loop = None
self._ticker_history = {}
self._fetching_progress = False
for _pair in config['exchange']['pair_whitelist']:
print(f'initting {_pair} for history cache')
self._ticker_history[_pair] = {}
logger.info('Ticker_history cache for given pairs%s', self._ticker_history)
@property
def loop(self):
return self.__loop
@loop.setter
def loop(self, loop):
if not self.__loop:
logger.info('** Setting exchange asyncio loop **')
self.__loop = loop
def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt) -> ccxt.Exchange:
"""
Initialize ccxt with given config and return valid
ccxt instance.
@ -82,10 +114,10 @@ class Exchange(object):
# Find matching class for the given exchange name
name = exchange_config['name']
if name not in ccxt.exchanges:
if name not in ccxt_module.exchanges:
raise OperationalException(f'Exchange {name} is not supported')
try:
api = getattr(ccxt, name.lower())({
api = getattr(ccxt_module, name.lower())({
'apiKey': exchange_config.get('key'),
'secret': exchange_config.get('secret'),
'password': exchange_config.get('password'),
@ -286,6 +318,23 @@ class Exchange(object):
logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair]
async def get_ticker_history_async(self, tick_interval: str,
since_ms: Optional[int] = None):
"""Fetch exchange ticker history for configured pairs"""
if self._fetching_progress:
return self._ticker_history
data = []
self._fetching_progress = True
for _pair in self._ticker_history:
logger.info('*'*20)
logger.info('Async: Fetching history for pair %s', _pair)
logger.info('*'*20)
data = await self._api_async.fetch_ohlcv(_pair, timeframe=tick_interval)
logger.info('Async data fetched len %s', len(data))
print(f'***{_pair} Some data appears...... {data[0]}')
return data
@retrier
def get_ticker_history(self, pair: str, tick_interval: str,
since_ms: Optional[int] = None) -> List[Dict]: