diff --git a/freqtrade/exchange/__init__.py b/freqtrade/exchange/__init__.py index 810957902..ecd8e3615 100644 --- a/freqtrade/exchange/__init__.py +++ b/freqtrade/exchange/__init__.py @@ -7,7 +7,9 @@ from datetime import datetime from math import floor, ceil import ccxt +import ccxt.async_support as ccxt_async import arrow +import asyncio from freqtrade import constants, OperationalException, DependencyException, TemporaryError @@ -41,10 +43,21 @@ def retrier(f): return wrapper +def run_async_task(f, **kwargs): + """ + Run async task and its kwargs. + Usage: + run_async_task(async_f, arg1='foo', arg2='bar') + """ + logger.info('Running async task method %s with args %s', f, kwargs) + return asyncio.get_event_loop().run_until_complete(f(**kwargs)) + + class Exchange(object): # Current selected exchange _api: ccxt.Exchange = None + _api_async: ccxt_async.Exchange = None _conf: Dict = {} _cached_ticker: Dict[str, Any] = {} @@ -65,6 +78,7 @@ class Exchange(object): exchange_config = config['exchange'] self._api = self._init_ccxt(exchange_config) + self._api_async = self._init_ccxt(exchange_config, ccxt_async) logger.info('Using Exchange "%s"', self.name) @@ -75,7 +89,25 @@ class Exchange(object): # Check if timeframe is available self.validate_timeframes(config['ticker_interval']) - def _init_ccxt(self, exchange_config: dict) -> ccxt.Exchange: + self.__loop = None + self._ticker_history = {} + self._fetching_progress = False + for _pair in config['exchange']['pair_whitelist']: + print(f'initting {_pair} for history cache') + self._ticker_history[_pair] = {} + logger.info('Ticker_history cache for given pairs%s', self._ticker_history) + + @property + def loop(self): + return self.__loop + + @loop.setter + def loop(self, loop): + if not self.__loop: + logger.info('** Setting exchange asyncio loop **') + self.__loop = loop + + def _init_ccxt(self, exchange_config: dict, ccxt_module=ccxt) -> ccxt.Exchange: """ Initialize ccxt with given config and return valid ccxt instance. @@ -83,10 +115,10 @@ class Exchange(object): # Find matching class for the given exchange name name = exchange_config['name'] - if name not in ccxt.exchanges: + if name not in ccxt_module.exchanges: raise OperationalException(f'Exchange {name} is not supported') try: - api = getattr(ccxt, name.lower())({ + api = getattr(ccxt_module, name.lower())({ 'apiKey': exchange_config.get('key'), 'secret': exchange_config.get('secret'), 'password': exchange_config.get('password'), @@ -329,6 +361,23 @@ class Exchange(object): logger.info("returning cached ticker-data for %s", pair) return self._cached_ticker[pair] + async def get_ticker_history_async(self, tick_interval: str, + since_ms: Optional[int] = None): + """Fetch exchange ticker history for configured pairs""" + if self._fetching_progress: + return self._ticker_history + + data = [] + self._fetching_progress = True + for _pair in self._ticker_history: + logger.info('*'*20) + logger.info('Async: Fetching history for pair %s', _pair) + logger.info('*'*20) + data = await self._api_async.fetch_ohlcv(_pair, timeframe=tick_interval) + logger.info('Async data fetched len %s', len(data)) + print(f'***{_pair} Some data appears...... {data[0]}') + return data + @retrier def get_ticker_history(self, pair: str, tick_interval: str, since_ms: Optional[int] = None) -> List[Dict]: diff --git a/freqtrade/freqtradebot.py b/freqtrade/freqtradebot.py index 46fbb3a38..e927152c9 100644 --- a/freqtrade/freqtradebot.py +++ b/freqtrade/freqtradebot.py @@ -12,10 +12,11 @@ from typing import Any, Callable, Dict, List, Optional import arrow import requests from cachetools import TTLCache, cached +import asyncio from freqtrade import (DependencyException, OperationalException, TemporaryError, __version__, constants, persistence) -from freqtrade.exchange import Exchange +from freqtrade.exchange import Exchange, run_async_task from freqtrade.persistence import Trade from freqtrade.rpc import RPCManager, RPCMessageType from freqtrade.state import State @@ -43,6 +44,8 @@ class FreqtradeBot(object): __version__, ) + self.loop = asyncio.get_event_loop() + # Init bot states self.state = State.STOPPED @@ -52,6 +55,7 @@ class FreqtradeBot(object): self.rpc: RPCManager = RPCManager(self) self.persistence = None self.exchange = Exchange(self.config) + self.exchange.loop = self.loop self._init_modules() def _init_modules(self) -> None: @@ -146,6 +150,10 @@ class FreqtradeBot(object): final_list = sanitized_list[:nb_assets] if nb_assets else sanitized_list self.config['exchange']['pair_whitelist'] = final_list + # async update ticker history + run_async_task(self.exchange.get_ticker_history_async, + tick_interval = self.strategy.ticker_interval) + # Query trades from persistence layer trades = Trade.query.filter(Trade.is_open.is_(True)).all()