diff --git a/freqtrade/edge/__init__.py b/freqtrade/edge/__init__.py index 49acbd3e7..9bd43e37f 100644 --- a/freqtrade/edge/__init__.py +++ b/freqtrade/edge/__init__.py @@ -144,20 +144,6 @@ class Edge(): self._cached_pairs = self._process_expectancy(trades_df) self._last_updated = arrow.utcnow().timestamp - # Not a nice hack but probably simplest solution: - # When backtest load data it loads the delta between disk and exchange - # The problem is that exchange consider that recent. - # it is but it is incomplete (c.f. _async_get_candle_history) - # So it causes get_signal to exit cause incomplete ticker_hist - # A patch to that would be update _pairs_last_refresh_time of exchange - # so it will download again all pairs - # Another solution is to add new data to klines instead of reassigning it: - # self.klines[pair].update(data) instead of self.klines[pair] = data in exchange package. - # But that means indexing timestamp and having a verification so that - # there is no empty range between two timestaps (recently added and last - # one) - self.exchange._pairs_last_refresh_time = {} - return True def stake_amount(self, pair: str, free_capital: float, diff --git a/freqtrade/exchange/__init__.py b/freqtrade/exchange/__init__.py index 65e912a1f..c0fb6629f 100644 --- a/freqtrade/exchange/__init__.py +++ b/freqtrade/exchange/__init__.py @@ -64,14 +64,8 @@ def retrier(f): class Exchange(object): - # Current selected exchange - _api: ccxt.Exchange = None - _api_async: ccxt_async.Exchange = None _conf: Dict = {} - # Holds all open sell orders for dry_run - _dry_run_open_orders: Dict[str, Any] = {} - def __init__(self, config: dict) -> None: """ Initializes this module with the given config, @@ -89,13 +83,17 @@ class Exchange(object): # Holds candles self.klines: Dict[str, Any] = {} + # Holds all open sell orders for dry_run + self._dry_run_open_orders: Dict[str, Any] = {} + if config['dry_run']: logger.info('Instance is running with dry_run enabled') exchange_config = config['exchange'] - self._api = self._init_ccxt(exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config')) - self._api_async = self._init_ccxt(exchange_config, ccxt_async, - ccxt_kwargs=exchange_config.get('ccxt_async_config')) + self._api: ccxt.Exchange = self._init_ccxt( + exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config')) + self._api_async: ccxt_async.Exchange = self._init_ccxt( + exchange_config, ccxt_async, ccxt_kwargs=exchange_config.get('ccxt_async_config')) logger.info('Using Exchange "%s"', self.name) @@ -128,12 +126,12 @@ class Exchange(object): raise OperationalException(f'Exchange {name} is not supported') ex_config = { - 'apiKey': exchange_config.get('key'), - 'secret': exchange_config.get('secret'), - 'password': exchange_config.get('password'), - 'uid': exchange_config.get('uid', ''), - 'enableRateLimit': exchange_config.get('ccxt_rate_limit', True) - } + 'apiKey': exchange_config.get('key'), + 'secret': exchange_config.get('secret'), + 'password': exchange_config.get('password'), + 'uid': exchange_config.get('uid', ''), + 'enableRateLimit': exchange_config.get('ccxt_rate_limit', True) + } if ccxt_kwargs: logger.info('Applying additional ccxt config: %s', ccxt_kwargs) ex_config.update(ccxt_kwargs) @@ -491,9 +489,9 @@ class Exchange(object): # Combine tickers data: List = [] - for tick in tickers: - if tick[0] == pair: - data.extend(tick[1]) + for p, ticker in tickers: + if p == pair: + data.extend(ticker) # Sort data again after extending the result - above calls return in "async order" order data = sorted(data, key=lambda x: x[0]) logger.info("downloaded %s with length %s.", pair, len(data)) @@ -501,7 +499,7 @@ class Exchange(object): def refresh_tickers(self, pair_list: List[str], ticker_interval: str) -> None: """ - Refresh tickers asyncronously and return the result. + Refresh tickers asyncronously and set `klines` of this object with the result """ logger.debug("Refreshing klines for %d pairs", len(pair_list)) asyncio.get_event_loop().run_until_complete( @@ -510,9 +508,27 @@ class Exchange(object): async def async_get_candles_history(self, pairs: List[str], tick_interval: str) -> List[Tuple[str, List]]: """Download ohlcv history for pair-list asyncronously """ - input_coroutines = [self._async_get_candle_history( - symbol, tick_interval) for symbol in pairs] + # Calculating ticker interval in second + interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 + input_coroutines = [] + + # Gather corotines to run + for pair in pairs: + if not (self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >= + arrow.utcnow().timestamp and pair in self.klines): + input_coroutines.append(self._async_get_candle_history(pair, tick_interval)) + else: + logger.debug("Using cached klines data for %s ...", pair) + tickers = await asyncio.gather(*input_coroutines, return_exceptions=True) + + # handle caching + for pair, ticks in tickers: + # keeping last candle time as last refreshed time of the pair + if ticks: + self._pairs_last_refresh_time[pair] = ticks[-1][0] // 1000 + # keeping parsed dataframe in cache + self.klines[pair] = ticks return tickers @retrier_async @@ -522,20 +538,8 @@ class Exchange(object): # fetch ohlcv asynchronously logger.debug("fetching %s since %s ...", pair, since_ms) - # Calculating ticker interval in second - interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 - - # If (last update time) + (interval in second) is greater or equal than now - # that means we don't have to hit the API as there is no new candle - # so we fetch it from local cache - if (not since_ms and - self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >= - arrow.utcnow().timestamp): - data = self.klines[pair] - logger.debug("Using cached klines data for %s ...", pair) - else: - data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, - since=since_ms) + data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, + since=since_ms) # Because some exchange sort Tickers ASC and other DESC. # Ex: Bittrex returns a list of tickers ASC (oldest first, newest last) @@ -544,13 +548,6 @@ class Exchange(object): if data and data[0][0] > data[-1][0]: data = sorted(data, key=lambda x: x[0]) - # keeping last candle time as last refreshed time of the pair - if data: - self._pairs_last_refresh_time[pair] = data[-1][0] // 1000 - - # keeping candles in cache - self.klines[pair] = data - logger.debug("done fetching %s ...", pair) return pair, data diff --git a/freqtrade/tests/exchange/test_exchange.py b/freqtrade/tests/exchange/test_exchange.py index d1f391266..b711dd3ab 100644 --- a/freqtrade/tests/exchange/test_exchange.py +++ b/freqtrade/tests/exchange/test_exchange.py @@ -737,7 +737,7 @@ def test_get_history(default_conf, mocker, caplog): def test_refresh_tickers(mocker, default_conf, caplog) -> None: tick = [ [ - 1511686200000, # unix timestamp ms + arrow.utcnow().timestamp * 1000, # unix timestamp ms 1, # open 2, # high 3, # low @@ -757,9 +757,16 @@ def test_refresh_tickers(mocker, default_conf, caplog) -> None: assert log_has(f'Refreshing klines for {len(pairs)} pairs', caplog.record_tuples) assert exchange.klines + assert exchange._api_async.fetch_ohlcv.call_count == 2 for pair in pairs: assert exchange.klines[pair] + # test caching + exchange.refresh_tickers(['IOTA/ETH', 'XRP/ETH'], '5m') + + assert exchange._api_async.fetch_ohlcv.call_count == 2 + assert log_has(f"Using cached klines data for {pairs[0]} ...", caplog.record_tuples) + @pytest.mark.asyncio async def test__async_get_candle_history(default_conf, mocker, caplog): @@ -788,10 +795,6 @@ async def test__async_get_candle_history(default_conf, mocker, caplog): assert res[1] == tick assert exchange._api_async.fetch_ohlcv.call_count == 1 assert not log_has(f"Using cached klines data for {pair} ...", caplog.record_tuples) - # test caching - res = await exchange._async_get_candle_history(pair, "5m") - assert exchange._api_async.fetch_ohlcv.call_count == 1 - assert log_has(f"Using cached klines data for {pair} ...", caplog.record_tuples) # exchange = Exchange(default_conf) await async_ccxt_exception(mocker, default_conf, MagicMock(), diff --git a/freqtrade/tests/optimize/test_optimize.py b/freqtrade/tests/optimize/test_optimize.py index d73f31ad5..970041a4f 100644 --- a/freqtrade/tests/optimize/test_optimize.py +++ b/freqtrade/tests/optimize/test_optimize.py @@ -283,13 +283,15 @@ def test_download_pairs_exception(ticker_history, mocker, caplog, default_conf) def test_download_backtesting_testdata(ticker_history, mocker, default_conf) -> None: mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history) exchange = get_patched_exchange(mocker, default_conf) - + # Tst that pairs-cached is not touched. + assert not exchange._pairs_last_refresh_time # Download a 1 min ticker file file1 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'XEL_BTC-1m.json') _backup_file(file1) download_backtesting_testdata(None, exchange, pair="XEL/BTC", tick_interval='1m') assert os.path.isfile(file1) is True _clean_test_file(file1) + assert not exchange._pairs_last_refresh_time # Download a 5 min ticker file file2 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'STORJ_BTC-5m.json') @@ -298,6 +300,7 @@ def test_download_backtesting_testdata(ticker_history, mocker, default_conf) -> download_backtesting_testdata(None, exchange, pair="STORJ/BTC", tick_interval='5m') assert os.path.isfile(file2) is True _clean_test_file(file2) + assert not exchange._pairs_last_refresh_time def test_download_backtesting_testdata2(mocker, default_conf) -> None: