From 9fa64c264768c4156c7cfc84b5a86badd7ee62ef Mon Sep 17 00:00:00 2001 From: Matthias Date: Thu, 4 Nov 2021 20:00:02 +0100 Subject: [PATCH] Allow multiple calls to get more candles in live-run --- freqtrade/exchange/binance.py | 10 ++++---- freqtrade/exchange/exchange.py | 42 +++++++++++++++++++++++---------- tests/exchange/test_binance.py | 9 ++++--- tests/exchange/test_exchange.py | 8 ++++--- 4 files changed, 47 insertions(+), 22 deletions(-) diff --git a/freqtrade/exchange/binance.py b/freqtrade/exchange/binance.py index 06d64999d..4ba30b626 100644 --- a/freqtrade/exchange/binance.py +++ b/freqtrade/exchange/binance.py @@ -1,6 +1,6 @@ """ Binance exchange subclass """ import logging -from typing import Dict, List +from typing import Dict, List, Tuple import arrow import ccxt @@ -93,8 +93,9 @@ class Binance(Exchange): raise OperationalException(e) from e async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, - since_ms: int, is_new_pair: bool = False - ) -> List: + since_ms: int, is_new_pair: bool = False, + raise_: bool = False + ) -> Tuple[str, str, List]: """ Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date Does not work for other exchanges, which don't return the earliest data when called with "0" @@ -107,4 +108,5 @@ class Binance(Exchange): logger.info(f"Candle-data for {pair} available starting with " f"{arrow.get(since_ms // 1000).isoformat()}.") return await super()._async_get_historic_ohlcv( - pair=pair, timeframe=timeframe, since_ms=since_ms, is_new_pair=is_new_pair) + pair=pair, timeframe=timeframe, since_ms=since_ms, is_new_pair=is_new_pair, + raise_=raise_) diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index b22556f60..01f0864c4 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -7,7 +7,7 @@ import http import inspect import logging from copy import deepcopy -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from math import ceil from typing import Any, Dict, List, Optional, Tuple @@ -1205,9 +1205,11 @@ class Exchange: :param since_ms: Timestamp in milliseconds to get history from :return: List with candle (OHLCV) data """ - return asyncio.get_event_loop().run_until_complete( + pair, timeframe, data = asyncio.get_event_loop().run_until_complete( self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe, since_ms=since_ms, is_new_pair=is_new_pair)) + logger.info(f"Downloaded data for {pair} with length {len(data)}.") + return data def get_historic_ohlcv_as_df(self, pair: str, timeframe: str, since_ms: int) -> DataFrame: @@ -1223,7 +1225,8 @@ class Exchange: drop_incomplete=self._ohlcv_partial_candle) async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, - since_ms: int, is_new_pair: bool = False + since_ms: int, is_new_pair: bool = False, + raise_: bool = False ) -> Tuple[str, str, List]: """ Download historic ohlcv @@ -1248,15 +1251,17 @@ class Exchange: for res in results: if isinstance(res, Exception): logger.warning("Async code raised an exception: %s", res.__class__.__name__) + if raise_: + raise continue - # Deconstruct tuple if it's not an exception - p, _, new_data = res - if p == pair: - data.extend(new_data) + else: + # Deconstruct tuple if it's not an exception + p, _, new_data = res + if p == pair: + data.extend(new_data) # Sort data again after extending the result - above calls return in "async order" data = sorted(data, key=lambda x: x[0]) - logger.info(f"Downloaded data for {pair} with length {len(data)}.") - return data + return pair, timeframe, data def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, since_ms: Optional[int] = None, cache: bool = True @@ -1276,10 +1281,23 @@ class Exchange: cached_pairs = [] # Gather coroutines to run for pair, timeframe in set(pair_list): - if (((pair, timeframe) not in self._klines) + if ((pair, timeframe) not in self._klines or self._now_is_time_to_refresh(pair, timeframe)): - input_coroutines.append(self._async_get_candle_history(pair, timeframe, - since_ms=since_ms)) + call_count = self._ft_has.get('ohlcv_candle_call_count', 1) + if not since_ms and call_count > 1: + # Multiple calls for one pair - to get more history + one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(timeframe) + move_to = one_call * call_count + now = timeframe_to_next_date(timeframe) + since_ms = int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000) + + if since_ms: + input_coroutines.append(self._async_get_historic_ohlcv( + pair, timeframe, since_ms=since_ms, raise_=True)) + else: + # One call ... "regular" refresh + input_coroutines.append(self._async_get_candle_history( + pair, timeframe, since_ms=since_ms)) else: logger.debug( "Using cached candle (OHLCV) data for pair %s, timeframe %s ...", diff --git a/tests/exchange/test_binance.py b/tests/exchange/test_binance.py index dd85c3abe..d88ae9b1d 100644 --- a/tests/exchange/test_binance.py +++ b/tests/exchange/test_binance.py @@ -126,13 +126,16 @@ async def test__async_get_historic_ohlcv_binance(default_conf, mocker, caplog): exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv) pair = 'ETH/BTC' - res = await exchange._async_get_historic_ohlcv(pair, "5m", - 1500000000000, is_new_pair=False) + respair, restf, res = await exchange._async_get_historic_ohlcv( + pair, "5m", 1500000000000, is_new_pair=False) + assert respair == pair + assert restf == '5m' # Call with very old timestamp - causes tons of requests assert exchange._api_async.fetch_ohlcv.call_count > 400 # assert res == ohlcv exchange._api_async.fetch_ohlcv.reset_mock() - res = await exchange._async_get_historic_ohlcv(pair, "5m", 1500000000000, is_new_pair=True) + _, _, res = await exchange._async_get_historic_ohlcv( + pair, "5m", 1500000000000, is_new_pair=True) # Called twice - one "init" call - and one to get the actual data. assert exchange._api_async.fetch_ohlcv.call_count == 2 diff --git a/tests/exchange/test_exchange.py b/tests/exchange/test_exchange.py index e3369182d..34e2b04ab 100644 --- a/tests/exchange/test_exchange.py +++ b/tests/exchange/test_exchange.py @@ -1506,6 +1506,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name): assert exchange._async_get_candle_history.call_count == 2 # Returns twice the above OHLCV data assert len(ret) == 2 + assert log_has_re(r'Downloaded data for .* with length .*\.', caplog) caplog.clear() @@ -1587,12 +1588,13 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_ exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv) pair = 'ETH/USDT' - res = await exchange._async_get_historic_ohlcv(pair, "5m", - 1500000000000, is_new_pair=False) + respair, restf, res = await exchange._async_get_historic_ohlcv( + pair, "5m", 1500000000000, is_new_pair=False) + assert respair == pair + assert restf == '5m' # Call with very old timestamp - causes tons of requests assert exchange._api_async.fetch_ohlcv.call_count > 200 assert res[0] == ohlcv[0] - assert log_has_re(r'Downloaded data for .* with length .*\.', caplog) def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None: