Merge pull request #5560 from freqtrade/fix/5548_1mdl
Don't blindly create coroutines, but fire them off in batches
This commit is contained in:
commit
50479d0b44
@ -28,7 +28,7 @@ from freqtrade.exceptions import (DDosProtection, ExchangeError, InsufficientFun
|
|||||||
from freqtrade.exchange.common import (API_FETCH_ORDER_RETRY_COUNT, BAD_EXCHANGES,
|
from freqtrade.exchange.common import (API_FETCH_ORDER_RETRY_COUNT, BAD_EXCHANGES,
|
||||||
EXCHANGE_HAS_OPTIONAL, EXCHANGE_HAS_REQUIRED, retrier,
|
EXCHANGE_HAS_OPTIONAL, EXCHANGE_HAS_REQUIRED, retrier,
|
||||||
retrier_async)
|
retrier_async)
|
||||||
from freqtrade.misc import deep_merge_dicts, safe_value_fallback2
|
from freqtrade.misc import chunks, deep_merge_dicts, safe_value_fallback2
|
||||||
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
|
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
|
||||||
|
|
||||||
|
|
||||||
@ -1239,21 +1239,22 @@ class Exchange:
|
|||||||
pair, timeframe, since) for since in
|
pair, timeframe, since) for since in
|
||||||
range(since_ms, arrow.utcnow().int_timestamp * 1000, one_call)]
|
range(since_ms, arrow.utcnow().int_timestamp * 1000, one_call)]
|
||||||
|
|
||||||
results = await asyncio.gather(*input_coroutines, return_exceptions=True)
|
|
||||||
|
|
||||||
# Combine gathered results
|
|
||||||
data: List = []
|
data: List = []
|
||||||
for res in results:
|
# Chunk requests into batches of 100 to avoid overwelming ccxt Throttling
|
||||||
if isinstance(res, Exception):
|
for input_coro in chunks(input_coroutines, 100):
|
||||||
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
|
|
||||||
continue
|
results = await asyncio.gather(*input_coro, return_exceptions=True)
|
||||||
# Deconstruct tuple if it's not an exception
|
for res in results:
|
||||||
p, _, new_data = res
|
if isinstance(res, Exception):
|
||||||
if p == pair:
|
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
|
||||||
data.extend(new_data)
|
continue
|
||||||
|
# Deconstruct tuple if it's not an exception
|
||||||
|
p, _, new_data = res
|
||||||
|
if p == pair:
|
||||||
|
data.extend(new_data)
|
||||||
# Sort data again after extending the result - above calls return in "async order"
|
# Sort data again after extending the result - above calls return in "async order"
|
||||||
data = sorted(data, key=lambda x: x[0])
|
data = sorted(data, key=lambda x: x[0])
|
||||||
logger.info("Downloaded data for %s with length %s.", pair, len(data))
|
logger.info(f"Downloaded data for {pair} with length {len(data)}.")
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
|
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
|
||||||
|
@ -1551,6 +1551,32 @@ def test_get_historic_ohlcv_as_df(default_conf, mocker, exchange_name):
|
|||||||
assert 'high' in ret.columns
|
assert 'high' in ret.columns
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.parametrize("exchange_name", EXCHANGES)
|
||||||
|
async def test__async_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name):
|
||||||
|
ohlcv = [
|
||||||
|
[
|
||||||
|
int((datetime.now(timezone.utc).timestamp() - 1000) * 1000),
|
||||||
|
1, # open
|
||||||
|
2, # high
|
||||||
|
3, # low
|
||||||
|
4, # close
|
||||||
|
5, # volume (in quote currency)
|
||||||
|
]
|
||||||
|
]
|
||||||
|
exchange = get_patched_exchange(mocker, default_conf, id=exchange_name)
|
||||||
|
# Monkey-patch async function
|
||||||
|
exchange._api_async.fetch_ohlcv = get_mock_coro(ohlcv)
|
||||||
|
|
||||||
|
pair = 'ETH/USDT'
|
||||||
|
res = await exchange._async_get_historic_ohlcv(pair, "5m",
|
||||||
|
1500000000000, is_new_pair=False)
|
||||||
|
# Call with very old timestamp - causes tons of requests
|
||||||
|
assert exchange._api_async.fetch_ohlcv.call_count > 200
|
||||||
|
assert res[0] == ohlcv[0]
|
||||||
|
assert log_has_re(r'Downloaded data for .* with length .*\.', caplog)
|
||||||
|
|
||||||
|
|
||||||
def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None:
|
def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None:
|
||||||
ohlcv = [
|
ohlcv = [
|
||||||
[
|
[
|
||||||
|
Loading…
Reference in New Issue
Block a user