Move "drop_candle" decision to coroutine
This commit is contained in:
parent
bdf6537c60
commit
787d292ba0
@ -1813,7 +1813,7 @@ class Exchange:
|
|||||||
:param candle_type: '', mark, index, premiumIndex, or funding_rate
|
:param candle_type: '', mark, index, premiumIndex, or funding_rate
|
||||||
:return: List with candle (OHLCV) data
|
:return: List with candle (OHLCV) data
|
||||||
"""
|
"""
|
||||||
pair, _, _, data = self.loop.run_until_complete(
|
pair, _, _, data, _ = self.loop.run_until_complete(
|
||||||
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
|
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
|
||||||
since_ms=since_ms, until_ms=until_ms,
|
since_ms=since_ms, until_ms=until_ms,
|
||||||
is_new_pair=is_new_pair, candle_type=candle_type))
|
is_new_pair=is_new_pair, candle_type=candle_type))
|
||||||
@ -1855,12 +1855,12 @@ class Exchange:
|
|||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
# Deconstruct tuple if it's not an exception
|
# Deconstruct tuple if it's not an exception
|
||||||
p, _, c, new_data = res
|
p, _, c, new_data, _ = res
|
||||||
if p == pair and c == candle_type:
|
if p == pair and c == candle_type:
|
||||||
data.extend(new_data)
|
data.extend(new_data)
|
||||||
# Sort data again after extending the result - above calls return in "async order"
|
# Sort data again after extending the result - above calls return in "async order"
|
||||||
data = sorted(data, key=lambda x: x[0])
|
data = sorted(data, key=lambda x: x[0])
|
||||||
return pair, timeframe, candle_type, data
|
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
|
||||||
|
|
||||||
def _build_coroutine(
|
def _build_coroutine(
|
||||||
self, pair: str, timeframe: str, candle_type: CandleType,
|
self, pair: str, timeframe: str, candle_type: CandleType,
|
||||||
@ -1965,7 +1965,6 @@ class Exchange:
|
|||||||
:return: Dict of [{(pair, timeframe): Dataframe}]
|
:return: Dict of [{(pair, timeframe): Dataframe}]
|
||||||
"""
|
"""
|
||||||
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
|
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
|
||||||
drop_incomplete = self._ohlcv_partial_candle if drop_incomplete is None else drop_incomplete
|
|
||||||
|
|
||||||
# Gather coroutines to run
|
# Gather coroutines to run
|
||||||
input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
|
input_coroutines, cached_pairs = self._build_ohlcv_dl_jobs(pair_list, since_ms, cache)
|
||||||
@ -1983,8 +1982,9 @@ class Exchange:
|
|||||||
if isinstance(res, Exception):
|
if isinstance(res, Exception):
|
||||||
logger.warning(f"Async code raised an exception: {repr(res)}")
|
logger.warning(f"Async code raised an exception: {repr(res)}")
|
||||||
continue
|
continue
|
||||||
# Deconstruct tuple (has 4 elements)
|
# Deconstruct tuple (has 5 elements)
|
||||||
pair, timeframe, c_type, ticks = res
|
pair, timeframe, c_type, ticks, drop_hint = res
|
||||||
|
drop_incomplete = drop_hint if drop_incomplete is None else drop_incomplete
|
||||||
ohlcv_df = self._process_ohlcv_df(
|
ohlcv_df = self._process_ohlcv_df(
|
||||||
pair, timeframe, c_type, ticks, cache, drop_incomplete)
|
pair, timeframe, c_type, ticks, cache, drop_incomplete)
|
||||||
|
|
||||||
@ -2052,9 +2052,9 @@ class Exchange:
|
|||||||
data = sorted(data, key=lambda x: x[0])
|
data = sorted(data, key=lambda x: x[0])
|
||||||
except IndexError:
|
except IndexError:
|
||||||
logger.exception("Error loading %s. Result was %s.", pair, data)
|
logger.exception("Error loading %s. Result was %s.", pair, data)
|
||||||
return pair, timeframe, candle_type, []
|
return pair, timeframe, candle_type, [], self._ohlcv_partial_candle
|
||||||
logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe)
|
logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe)
|
||||||
return pair, timeframe, candle_type, data
|
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
|
||||||
|
|
||||||
except ccxt.NotSupported as e:
|
except ccxt.NotSupported as e:
|
||||||
raise OperationalException(
|
raise OperationalException(
|
||||||
|
@ -17,5 +17,5 @@ class Ticker(TypedDict):
|
|||||||
|
|
||||||
Tickers = Dict[str, Ticker]
|
Tickers = Dict[str, Ticker]
|
||||||
|
|
||||||
# pair, timeframe, candleType, OHLCV
|
# pair, timeframe, candleType, OHLCV, drop last?,
|
||||||
OHLCVResponse = Tuple[str, str, CandleType, List]
|
OHLCVResponse = Tuple[str, str, CandleType, List, bool]
|
||||||
|
@ -557,7 +557,7 @@ async def test__async_get_historic_ohlcv_binance(default_conf, mocker, caplog, c
|
|||||||
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
||||||
|
|
||||||
pair = 'ETH/BTC'
|
pair = 'ETH/BTC'
|
||||||
respair, restf, restype, res = await exchange._async_get_historic_ohlcv(
|
respair, restf, restype, res, _ = await exchange._async_get_historic_ohlcv(
|
||||||
pair, "5m", 1500000000000, is_new_pair=False, candle_type=candle_type)
|
pair, "5m", 1500000000000, is_new_pair=False, candle_type=candle_type)
|
||||||
assert respair == pair
|
assert respair == pair
|
||||||
assert restf == '5m'
|
assert restf == '5m'
|
||||||
@ -566,7 +566,7 @@ async def test__async_get_historic_ohlcv_binance(default_conf, mocker, caplog, c
|
|||||||
assert exchange._api_async.fetch_ohlcv.call_count > 400
|
assert exchange._api_async.fetch_ohlcv.call_count > 400
|
||||||
# assert res == ohlcv
|
# assert res == ohlcv
|
||||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||||
_, _, _, res = await exchange._async_get_historic_ohlcv(
|
_, _, _, res, _ = await exchange._async_get_historic_ohlcv(
|
||||||
pair, "5m", 1500000000000, is_new_pair=True, candle_type=candle_type)
|
pair, "5m", 1500000000000, is_new_pair=True, candle_type=candle_type)
|
||||||
|
|
||||||
# Called twice - one "init" call - and one to get the actual data.
|
# Called twice - one "init" call - and one to get the actual data.
|
||||||
|
@ -1955,7 +1955,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name, candle_
|
|||||||
pair = 'ETH/BTC'
|
pair = 'ETH/BTC'
|
||||||
|
|
||||||
async def mock_candle_hist(pair, timeframe, candle_type, since_ms):
|
async def mock_candle_hist(pair, timeframe, candle_type, since_ms):
|
||||||
return pair, timeframe, candle_type, ohlcv
|
return pair, timeframe, candle_type, ohlcv, True
|
||||||
|
|
||||||
exchange._async_get_candle_history = Mock(wraps=mock_candle_hist)
|
exchange._async_get_candle_history = Mock(wraps=mock_candle_hist)
|
||||||
# one_call calculation * 1.8 should do 2 calls
|
# one_call calculation * 1.8 should do 2 calls
|
||||||
@ -2007,7 +2007,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
|
|||||||
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
||||||
|
|
||||||
pair = 'ETH/USDT'
|
pair = 'ETH/USDT'
|
||||||
respair, restf, _, res = await exchange._async_get_historic_ohlcv(
|
respair, restf, _, res, _ = await exchange._async_get_historic_ohlcv(
|
||||||
pair, "5m", 1500000000000, candle_type=candle_type, is_new_pair=False)
|
pair, "5m", 1500000000000, candle_type=candle_type, is_new_pair=False)
|
||||||
assert respair == pair
|
assert respair == pair
|
||||||
assert restf == '5m'
|
assert restf == '5m'
|
||||||
@ -2018,7 +2018,7 @@ async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_
|
|||||||
exchange._api_async.fetch_ohlcv.reset_mock()
|
exchange._api_async.fetch_ohlcv.reset_mock()
|
||||||
end_ts = 1_500_500_000_000
|
end_ts = 1_500_500_000_000
|
||||||
start_ts = 1_500_000_000_000
|
start_ts = 1_500_000_000_000
|
||||||
respair, restf, _, res = await exchange._async_get_historic_ohlcv(
|
respair, restf, _, res, _ = await exchange._async_get_historic_ohlcv(
|
||||||
pair, "5m", since_ms=start_ts, candle_type=candle_type, is_new_pair=False,
|
pair, "5m", since_ms=start_ts, candle_type=candle_type, is_new_pair=False,
|
||||||
until_ms=end_ts
|
until_ms=end_ts
|
||||||
)
|
)
|
||||||
@ -2250,7 +2250,7 @@ async def test__async_get_candle_history(default_conf, mocker, caplog, exchange_
|
|||||||
pair = 'ETH/BTC'
|
pair = 'ETH/BTC'
|
||||||
res = await exchange._async_get_candle_history(pair, "5m", CandleType.SPOT)
|
res = await exchange._async_get_candle_history(pair, "5m", CandleType.SPOT)
|
||||||
assert type(res) is tuple
|
assert type(res) is tuple
|
||||||
assert len(res) == 4
|
assert len(res) == 5
|
||||||
assert res[0] == pair
|
assert res[0] == pair
|
||||||
assert res[1] == "5m"
|
assert res[1] == "5m"
|
||||||
assert res[2] == CandleType.SPOT
|
assert res[2] == CandleType.SPOT
|
||||||
@ -2337,7 +2337,7 @@ async def test__async_get_candle_history_empty(default_conf, mocker, caplog):
|
|||||||
pair = 'ETH/BTC'
|
pair = 'ETH/BTC'
|
||||||
res = await exchange._async_get_candle_history(pair, "5m", CandleType.SPOT)
|
res = await exchange._async_get_candle_history(pair, "5m", CandleType.SPOT)
|
||||||
assert type(res) is tuple
|
assert type(res) is tuple
|
||||||
assert len(res) == 4
|
assert len(res) == 5
|
||||||
assert res[0] == pair
|
assert res[0] == pair
|
||||||
assert res[1] == "5m"
|
assert res[1] == "5m"
|
||||||
assert res[2] == CandleType.SPOT
|
assert res[2] == CandleType.SPOT
|
||||||
|
Loading…
Reference in New Issue
Block a user