Allow multiple calls to get more candles in live-run

This commit is contained in:
Matthias 2021-11-04 20:00:02 +01:00
parent fb6ba62158
commit 9fa64c2647
4 changed files with 47 additions and 22 deletions

View File

@ -1,6 +1,6 @@
""" Binance exchange subclass """ """ Binance exchange subclass """
import logging import logging
from typing import Dict, List from typing import Dict, List, Tuple
import arrow import arrow
import ccxt import ccxt
@ -93,8 +93,9 @@ class Binance(Exchange):
raise OperationalException(e) from e raise OperationalException(e) from e
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, is_new_pair: bool = False since_ms: int, is_new_pair: bool = False,
) -> List: raise_: bool = False
) -> Tuple[str, str, List]:
""" """
Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date
Does not work for other exchanges, which don't return the earliest data when called with "0" Does not work for other exchanges, which don't return the earliest data when called with "0"
@ -107,4 +108,5 @@ class Binance(Exchange):
logger.info(f"Candle-data for {pair} available starting with " logger.info(f"Candle-data for {pair} available starting with "
f"{arrow.get(since_ms // 1000).isoformat()}.") f"{arrow.get(since_ms // 1000).isoformat()}.")
return await super()._async_get_historic_ohlcv( return await super()._async_get_historic_ohlcv(
pair=pair, timeframe=timeframe, since_ms=since_ms, is_new_pair=is_new_pair) pair=pair, timeframe=timeframe, since_ms=since_ms, is_new_pair=is_new_pair,
raise_=raise_)

View File

@ -7,7 +7,7 @@ import http
import inspect import inspect
import logging import logging
from copy import deepcopy from copy import deepcopy
from datetime import datetime, timezone from datetime import datetime, timedelta, timezone
from math import ceil from math import ceil
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
@ -1205,9 +1205,11 @@ class Exchange:
:param since_ms: Timestamp in milliseconds to get history from :param since_ms: Timestamp in milliseconds to get history from
:return: List with candle (OHLCV) data :return: List with candle (OHLCV) data
""" """
return asyncio.get_event_loop().run_until_complete( pair, timeframe, data = asyncio.get_event_loop().run_until_complete(
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe, self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
since_ms=since_ms, is_new_pair=is_new_pair)) since_ms=since_ms, is_new_pair=is_new_pair))
logger.info(f"Downloaded data for {pair} with length {len(data)}.")
return data
def get_historic_ohlcv_as_df(self, pair: str, timeframe: str, def get_historic_ohlcv_as_df(self, pair: str, timeframe: str,
since_ms: int) -> DataFrame: since_ms: int) -> DataFrame:
@ -1223,7 +1225,8 @@ class Exchange:
drop_incomplete=self._ohlcv_partial_candle) drop_incomplete=self._ohlcv_partial_candle)
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, async def _async_get_historic_ohlcv(self, pair: str, timeframe: str,
since_ms: int, is_new_pair: bool = False since_ms: int, is_new_pair: bool = False,
raise_: bool = False
) -> Tuple[str, str, List]: ) -> Tuple[str, str, List]:
""" """
Download historic ohlcv Download historic ohlcv
@ -1248,15 +1251,17 @@ class Exchange:
for res in results: for res in results:
if isinstance(res, Exception): if isinstance(res, Exception):
logger.warning("Async code raised an exception: %s", res.__class__.__name__) logger.warning("Async code raised an exception: %s", res.__class__.__name__)
if raise_:
raise
continue continue
# Deconstruct tuple if it's not an exception else:
p, _, new_data = res # Deconstruct tuple if it's not an exception
if p == pair: p, _, new_data = res
data.extend(new_data) if p == pair:
data.extend(new_data)
# Sort data again after extending the result - above calls return in "async order" # Sort data again after extending the result - above calls return in "async order"
data = sorted(data, key=lambda x: x[0]) data = sorted(data, key=lambda x: x[0])
logger.info(f"Downloaded data for {pair} with length {len(data)}.") return pair, timeframe, data
return data
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None, cache: bool = True since_ms: Optional[int] = None, cache: bool = True
@ -1276,10 +1281,23 @@ class Exchange:
cached_pairs = [] cached_pairs = []
# Gather coroutines to run # Gather coroutines to run
for pair, timeframe in set(pair_list): for pair, timeframe in set(pair_list):
if (((pair, timeframe) not in self._klines) if ((pair, timeframe) not in self._klines
or self._now_is_time_to_refresh(pair, timeframe)): or self._now_is_time_to_refresh(pair, timeframe)):
input_coroutines.append(self._async_get_candle_history(pair, timeframe, call_count = self._ft_has.get('ohlcv_candle_call_count', 1)
since_ms=since_ms)) if not since_ms and call_count > 1:
# Multiple calls for one pair - to get more history
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(timeframe)
move_to = one_call * call_count
now = timeframe_to_next_date(timeframe)
since_ms = int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
if since_ms:
input_coroutines.append(self._async_get_historic_ohlcv(
pair, timeframe, since_ms=since_ms, raise_=True))
else:
# One call ... "regular" refresh
input_coroutines.append(self._async_get_candle_history(
pair, timeframe, since_ms=since_ms))
else: else:
logger.debug( logger.debug(
"Using cached candle (OHLCV) data for pair %s, timeframe %s ...", "Using cached candle (OHLCV) data for pair %s, timeframe %s ...",

View File

@ -126,13 +126,16 @@ async def test__async_get_historic_ohlcv_binance(default_conf, mocker, caplog):
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv) exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
pair = 'ETH/BTC' pair = 'ETH/BTC'
res = await exchange._async_get_historic_ohlcv(pair, "5m", respair, restf, res = await exchange._async_get_historic_ohlcv(
1500000000000, is_new_pair=False) pair, "5m", 1500000000000, is_new_pair=False)
assert respair == pair
assert restf == '5m'
# Call with very old timestamp - causes tons of requests # Call with very old timestamp - causes tons of requests
assert exchange._api_async.fetch_ohlcv.call_count > 400 assert exchange._api_async.fetch_ohlcv.call_count > 400
# assert res == ohlcv # assert res == ohlcv
exchange._api_async.fetch_ohlcv.reset_mock() exchange._api_async.fetch_ohlcv.reset_mock()
res = await exchange._async_get_historic_ohlcv(pair, "5m", 1500000000000, is_new_pair=True) _, _, res = await exchange._async_get_historic_ohlcv(
pair, "5m", 1500000000000, is_new_pair=True)
# Called twice - one "init" call - and one to get the actual data. # Called twice - one "init" call - and one to get the actual data.
assert exchange._api_async.fetch_ohlcv.call_count == 2 assert exchange._api_async.fetch_ohlcv.call_count == 2

View File

@ -1506,6 +1506,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name):
assert exchange._async_get_candle_history.call_count == 2 assert exchange._async_get_candle_history.call_count == 2
# Returns twice the above OHLCV data # Returns twice the above OHLCV data
assert len(ret) == 2 assert len(ret) == 2
assert log_has_re(r'Downloaded data for .* with length .*\.', caplog)
caplog.clear() caplog.clear()
@ -1587,12 +1588,13 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv) exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
pair = 'ETH/USDT' pair = 'ETH/USDT'
res = await exchange._async_get_historic_ohlcv(pair, "5m", respair, restf, res = await exchange._async_get_historic_ohlcv(
1500000000000, is_new_pair=False) pair, "5m", 1500000000000, is_new_pair=False)
assert respair == pair
assert restf == '5m'
# Call with very old timestamp - causes tons of requests # Call with very old timestamp - causes tons of requests
assert exchange._api_async.fetch_ohlcv.call_count > 200 assert exchange._api_async.fetch_ohlcv.call_count > 200
assert res[0] == ohlcv[0] assert res[0] == ohlcv[0]
assert log_has_re(r'Downloaded data for .* with length .*\.', caplog)
def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None: def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None: