Download using asyncio
This commit is contained in:
parent
74d6816a1a
commit
a107c4c7b4
@ -142,6 +142,7 @@ class Exchange(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
markets = self._api.load_markets()
|
markets = self._api.load_markets()
|
||||||
|
asyncio.get_event_loop().run_until_complete(self._api_async.load_markets())
|
||||||
except ccxt.BaseError as e:
|
except ccxt.BaseError as e:
|
||||||
logger.warning('Unable to validate pairs (assuming they are correct). Reason: %s', e)
|
logger.warning('Unable to validate pairs (assuming they are correct). Reason: %s', e)
|
||||||
return
|
return
|
||||||
@ -341,12 +342,43 @@ class Exchange(object):
|
|||||||
logger.info("returning cached ticker-data for %s", pair)
|
logger.info("returning cached ticker-data for %s", pair)
|
||||||
return self._cached_ticker[pair]
|
return self._cached_ticker[pair]
|
||||||
|
|
||||||
|
def get_history(self, pair: str, tick_interval: str,
|
||||||
|
since_ms: int) -> List:
|
||||||
|
"""
|
||||||
|
Gets candle history using asyncio and returns the list of candles.
|
||||||
|
Handles all async doing.
|
||||||
|
"""
|
||||||
|
return asyncio.get_event_loop().run_until_complete(
|
||||||
|
self._async_get_history(pair=pair, tick_interval=tick_interval,
|
||||||
|
since_ms=since_ms))
|
||||||
|
|
||||||
|
async def _async_get_history(self, pair: str,
|
||||||
|
tick_interval: str,
|
||||||
|
since_ms: int) -> List:
|
||||||
|
# Assume exchange returns 500 candles
|
||||||
|
_LIMIT = 500
|
||||||
|
|
||||||
|
one_call = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 * _LIMIT * 1000
|
||||||
|
logger.debug("one_call: %s", one_call)
|
||||||
|
input_coroutines = [self.async_get_candle_history(
|
||||||
|
pair, tick_interval, since) for since in
|
||||||
|
range(since_ms, int(time.time() * 1000), one_call)]
|
||||||
|
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
|
||||||
|
|
||||||
|
# Combine tickers
|
||||||
|
data = []
|
||||||
|
for tick in tickers:
|
||||||
|
if tick[0] == pair:
|
||||||
|
data.extend(tick[1])
|
||||||
|
logger.info("downloaded %s with length %s.", pair, len(data))
|
||||||
|
return data
|
||||||
|
|
||||||
async def async_get_candles_history(self, pairs: List[str],
|
async def async_get_candles_history(self, pairs: List[str],
|
||||||
tick_interval: str) -> List[Tuple[str, List]]:
|
tick_interval: str) -> List[Tuple[str, List]]:
|
||||||
# COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ?
|
# COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ?
|
||||||
# loop = asyncio.new_event_loop()
|
# loop = asyncio.new_event_loop()
|
||||||
# asyncio.set_event_loop(loop)
|
# asyncio.set_event_loop(loop)
|
||||||
await self._api_async.load_markets()
|
# await self._api_async.load_markets()
|
||||||
input_coroutines = [self.async_get_candle_history(
|
input_coroutines = [self.async_get_candle_history(
|
||||||
symbol, tick_interval) for symbol in pairs]
|
symbol, tick_interval) for symbol in pairs]
|
||||||
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
|
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
|
||||||
@ -357,7 +389,7 @@ class Exchange(object):
|
|||||||
since_ms: Optional[int] = None) -> Tuple[str, List]:
|
since_ms: Optional[int] = None) -> Tuple[str, List]:
|
||||||
try:
|
try:
|
||||||
# fetch ohlcv asynchronously
|
# fetch ohlcv asynchronously
|
||||||
logger.debug("fetching %s ...", pair)
|
logger.debug("fetching %s since %s ...", pair, since_ms)
|
||||||
|
|
||||||
# Calculating ticker interval in second
|
# Calculating ticker interval in second
|
||||||
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
|
interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
|
||||||
|
@ -218,8 +218,8 @@ def download_backtesting_testdata(datadir: str,
|
|||||||
logger.debug("Current Start: %s", misc.format_ms_time(data[1][0]) if data else 'None')
|
logger.debug("Current Start: %s", misc.format_ms_time(data[1][0]) if data else 'None')
|
||||||
logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None')
|
logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None')
|
||||||
|
|
||||||
new_data = exchange.get_candle_history(pair=pair, tick_interval=tick_interval,
|
new_data = exchange.get_history(pair=pair, tick_interval=tick_interval,
|
||||||
since_ms=since_ms)
|
since_ms=since_ms)
|
||||||
data.extend(new_data)
|
data.extend(new_data)
|
||||||
|
|
||||||
logger.debug("New Start: %s", misc.format_ms_time(data[0][0]))
|
logger.debug("New Start: %s", misc.format_ms_time(data[0][0]))
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
"""This script generate json data from bittrex"""
|
"""This script generate json data"""
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@ -52,9 +52,10 @@ exchange = Exchange({'key': '',
|
|||||||
'stake_currency': '',
|
'stake_currency': '',
|
||||||
'dry_run': True,
|
'dry_run': True,
|
||||||
'exchange': {
|
'exchange': {
|
||||||
'name': args.exchange,
|
'name': args.exchange,
|
||||||
'pair_whitelist': []
|
'pair_whitelist': [],
|
||||||
}
|
'ccxt_rate_limit': False
|
||||||
|
}
|
||||||
})
|
})
|
||||||
pairs_not_available = []
|
pairs_not_available = []
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user