Merge pull request #3 from xmatthias/ccxt-async_xmatt

ccxt async download
This commit is contained in:
misagh 2018-08-14 13:21:13 +02:00 committed by GitHub
commit 0b44dda7b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 97 additions and 29 deletions

View File

@ -54,7 +54,7 @@ class Exchange(object):
_cached_ticker: Dict[str, Any] = {} _cached_ticker: Dict[str, Any] = {}
# Holds last candle refreshed time of each pair # Holds last candle refreshed time of each pair
_pairs_last_refreshed_time = {} _pairs_last_refresh_time: Dict[str, int] = {}
# Holds candles # Holds candles
_cached_klines: Dict[str, Any] = {} _cached_klines: Dict[str, Any] = {}
@ -132,6 +132,15 @@ class Exchange(object):
"Please check your config.json") "Please check your config.json")
raise OperationalException(f'Exchange {name} does not provide a sandbox api') raise OperationalException(f'Exchange {name} does not provide a sandbox api')
def _load_async_markets(self) -> None:
try:
if self._api_async:
asyncio.get_event_loop().run_until_complete(self._api_async.load_markets())
except ccxt.BaseError as e:
logger.warning('Could not load async markets. Reason: %s', e)
return
def validate_pairs(self, pairs: List[str]) -> None: def validate_pairs(self, pairs: List[str]) -> None:
""" """
Checks if all given pairs are tradable on the current exchange. Checks if all given pairs are tradable on the current exchange.
@ -142,6 +151,7 @@ class Exchange(object):
try: try:
markets = self._api.load_markets() markets = self._api.load_markets()
self._load_async_markets()
except ccxt.BaseError as e: except ccxt.BaseError as e:
logger.warning('Unable to validate pairs (assuming they are correct). Reason: %s', e) logger.warning('Unable to validate pairs (assuming they are correct). Reason: %s', e)
return return
@ -341,11 +351,43 @@ class Exchange(object):
logger.info("returning cached ticker-data for %s", pair) logger.info("returning cached ticker-data for %s", pair)
return self._cached_ticker[pair] return self._cached_ticker[pair]
async def async_get_candles_history(self, pairs, tick_interval) -> List[Tuple[str, List]]: def get_history(self, pair: str, tick_interval: str,
since_ms: int) -> List:
"""
Gets candle history using asyncio and returns the list of candles.
Handles all async doing.
"""
return asyncio.get_event_loop().run_until_complete(
self._async_get_history(pair=pair, tick_interval=tick_interval,
since_ms=since_ms))
async def _async_get_history(self, pair: str,
tick_interval: str,
since_ms: int) -> List:
# Assume exchange returns 500 candles
_LIMIT = 500
one_call = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 * _LIMIT * 1000
logger.debug("one_call: %s", one_call)
input_coroutines = [self.async_get_candle_history(
pair, tick_interval, since) for since in
range(since_ms, int(time.time() * 1000), one_call)]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
# Combine tickers
data: List = []
for tick in tickers:
if tick[0] == pair:
data.extend(tick[1])
logger.info("downloaded %s with length %s.", pair, len(data))
return data
async def async_get_candles_history(self, pairs: List[str],
tick_interval: str) -> List[Tuple[str, List]]:
# COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ? # COMMENTED CODE IS FOR DISCUSSION: where should we close the loop on async ?
# loop = asyncio.new_event_loop() # loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop) # asyncio.set_event_loop(loop)
await self._api_async.load_markets() # await self._api_async.load_markets()
input_coroutines = [self.async_get_candle_history( input_coroutines = [self.async_get_candle_history(
symbol, tick_interval) for symbol in pairs] symbol, tick_interval) for symbol in pairs]
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True) tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
@ -356,21 +398,30 @@ class Exchange(object):
since_ms: Optional[int] = None) -> Tuple[str, List]: since_ms: Optional[int] = None) -> Tuple[str, List]:
try: try:
# fetch ohlcv asynchronously # fetch ohlcv asynchronously
logger.debug("fetching %s ...", pair) logger.debug("fetching %s since %s ...", pair, since_ms)
# Calculating ticker interval in second # Calculating ticker interval in second
interval_in_seconds = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60 interval_in_sec = constants.TICKER_INTERVAL_MINUTES[tick_interval] * 60
# If (last update time) + (interval in second) + (1 second) is greater than now # If (last update time) + (interval in second) is greater or equal than now
# that means we don't have to hit the API as there is no new candle # that means we don't have to hit the API as there is no new candle
# so we fetch it from local cache # so we fetch it from local cache
if self._pairs_last_refreshed_time.get(pair, 0) + interval_in_seconds + 1 > round(time.time()): if (not since_ms and
self._pairs_last_refresh_time.get(pair, 0) + interval_in_sec >=
int(time.time())):
data = self._cached_klines[pair] data = self._cached_klines[pair]
logger.debug("Using cached klines data for %s ...", pair)
else: else:
data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval, since=since_ms) data = await self._api_async.fetch_ohlcv(pair, timeframe=tick_interval,
since=since_ms)
# Because some exchange sort Tickers ASC and other DESC.
# Ex: Bittrex returns a list of tickers ASC (oldest first, newest last)
# when GDAX returns a list of tickers DESC (newest first, oldest last)
data = sorted(data, key=lambda x: x[0])
# keeping last candle time as last refreshed time of the pair # keeping last candle time as last refreshed time of the pair
self._pairs_last_refreshed_time[pair] = data[-1][0] / 1000 self._pairs_last_refresh_time[pair] = data[-1][0] // 1000
# keeping candles in cache # keeping candles in cache
self._cached_klines[pair] = data self._cached_klines[pair] = data

View File

@ -191,19 +191,18 @@ def download_backtesting_testdata(datadir: str,
timerange: Optional[TimeRange] = None) -> None: timerange: Optional[TimeRange] = None) -> None:
""" """
Download the latest ticker intervals from the exchange for the pairs passed in parameters Download the latest ticker intervals from the exchange for the pair passed in parameters
The data is downloaded starting from the last correct ticker interval data that The data is downloaded starting from the last correct ticker interval data that
esists in a cache. If timerange starts earlier than the data in the cache, exists in a cache. If timerange starts earlier than the data in the cache,
the full data will be redownloaded the full data will be redownloaded
Based on @Rybolov work: https://github.com/rybolov/freqtrade-data Based on @Rybolov work: https://github.com/rybolov/freqtrade-data
:param pairs: list of pairs to download :param pair: pair to download
:param tick_interval: ticker interval :param tick_interval: ticker interval
:param timerange: range of time to download :param timerange: range of time to download
:return: None :return: None
""" """
path = make_testdata_path(datadir) path = make_testdata_path(datadir)
filepair = pair.replace("/", "_") filepair = pair.replace("/", "_")
filename = os.path.join(path, f'{filepair}-{tick_interval}.json') filename = os.path.join(path, f'{filepair}-{tick_interval}.json')
@ -219,8 +218,11 @@ def download_backtesting_testdata(datadir: str,
logger.debug("Current Start: %s", misc.format_ms_time(data[1][0]) if data else 'None') logger.debug("Current Start: %s", misc.format_ms_time(data[1][0]) if data else 'None')
logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None') logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None')
new_data = exchange.get_candle_history(pair=pair, tick_interval=tick_interval, # Default since_ms to 30 days if nothing is given
since_ms=since_ms) new_data = exchange.get_history(pair=pair, tick_interval=tick_interval,
since_ms=since_ms if since_ms
else
int(arrow.utcnow().shift(days=-30).float_timestamp) * 1000)
data.extend(new_data) data.extend(new_data)
logger.debug("New Start: %s", misc.format_ms_time(data[0][0])) logger.debug("New Start: %s", misc.format_ms_time(data[0][0]))

View File

@ -13,6 +13,10 @@ from freqtrade.exchange import API_RETRY_COUNT, Exchange
from freqtrade.tests.conftest import get_patched_exchange, log_has from freqtrade.tests.conftest import get_patched_exchange, log_has
async def async_load_markets():
return {}
def ccxt_exceptionhandlers(mocker, default_conf, api_mock, fun, mock_ccxt_fun, **kwargs): def ccxt_exceptionhandlers(mocker, default_conf, api_mock, fun, mock_ccxt_fun, **kwargs):
with pytest.raises(TemporaryError): with pytest.raises(TemporaryError):
api_mock.__dict__[mock_ccxt_fun] = MagicMock(side_effect=ccxt.NetworkError) api_mock.__dict__[mock_ccxt_fun] = MagicMock(side_effect=ccxt.NetworkError)
@ -78,6 +82,7 @@ def test_symbol_amount_prec(default_conf, mocker):
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
exchange = Exchange(default_conf) exchange = Exchange(default_conf)
amount = 2.34559 amount = 2.34559
@ -101,6 +106,7 @@ def test_symbol_price_prec(default_conf, mocker):
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
exchange = Exchange(default_conf) exchange = Exchange(default_conf)
price = 2.34559 price = 2.34559
@ -122,6 +128,7 @@ def test_set_sandbox(default_conf, mocker):
type(api_mock).urls = url_mock type(api_mock).urls = url_mock
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
exchange = Exchange(default_conf) exchange = Exchange(default_conf)
liveurl = exchange._api.urls['api'] liveurl = exchange._api.urls['api']
@ -143,6 +150,7 @@ def test_set_sandbox_exception(default_conf, mocker):
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
with pytest.raises(OperationalException, match=r'does not provide a sandbox api'): with pytest.raises(OperationalException, match=r'does not provide a sandbox api'):
exchange = Exchange(default_conf) exchange = Exchange(default_conf)
@ -160,6 +168,7 @@ def test_validate_pairs(default_conf, mocker):
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
Exchange(default_conf) Exchange(default_conf)
@ -168,6 +177,7 @@ def test_validate_pairs_not_available(default_conf, mocker):
api_mock.load_markets = MagicMock(return_value={}) api_mock.load_markets = MagicMock(return_value={})
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
with pytest.raises(OperationalException, match=r'not available'): with pytest.raises(OperationalException, match=r'not available'):
Exchange(default_conf) Exchange(default_conf)
@ -181,6 +191,7 @@ def test_validate_pairs_not_compatible(default_conf, mocker):
default_conf['stake_currency'] = 'ETH' default_conf['stake_currency'] = 'ETH'
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock)) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', MagicMock(return_value=api_mock))
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
with pytest.raises(OperationalException, match=r'not compatible'): with pytest.raises(OperationalException, match=r'not compatible'):
Exchange(default_conf) Exchange(default_conf)
@ -193,6 +204,7 @@ def test_validate_pairs_exception(default_conf, mocker, caplog):
api_mock.load_markets = MagicMock(return_value={}) api_mock.load_markets = MagicMock(return_value={})
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', api_mock) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', api_mock)
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
with pytest.raises(OperationalException, match=r'Pair ETH/BTC is not available at Binance'): with pytest.raises(OperationalException, match=r'Pair ETH/BTC is not available at Binance'):
Exchange(default_conf) Exchange(default_conf)
@ -212,6 +224,7 @@ def test_validate_pairs_stake_exception(default_conf, mocker, caplog):
api_mock.name = MagicMock(return_value='binance') api_mock.name = MagicMock(return_value='binance')
mocker.patch('freqtrade.exchange.Exchange._init_ccxt', api_mock) mocker.patch('freqtrade.exchange.Exchange._init_ccxt', api_mock)
mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock()) mocker.patch('freqtrade.exchange.Exchange.validate_timeframes', MagicMock())
mocker.patch('freqtrade.exchange.Exchange._load_async_markets', MagicMock())
with pytest.raises( with pytest.raises(
OperationalException, OperationalException,
@ -588,7 +601,8 @@ async def test_async_get_candles_history(default_conf, mocker):
# Monkey-patch async function # Monkey-patch async function
exchange._api_async.fetch_ohlcv = async_fetch_ohlcv exchange._api_async.fetch_ohlcv = async_fetch_ohlcv
exchange = Exchange(default_conf) exchange._api_async.load_markets = async_load_markets
pairs = ['ETH/BTC', 'XRP/BTC'] pairs = ['ETH/BTC', 'XRP/BTC']
res = await exchange.async_get_candles_history(pairs, "5m") res = await exchange.async_get_candles_history(pairs, "5m")
assert type(res) is list assert type(res) is list

View File

@ -53,7 +53,7 @@ def _clean_test_file(file: str) -> None:
def test_load_data_30min_ticker(ticker_history, mocker, caplog, default_conf) -> None: def test_load_data_30min_ticker(ticker_history, mocker, caplog, default_conf) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'UNITTEST_BTC-30m.json') file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'UNITTEST_BTC-30m.json')
_backup_file(file, copy_file=True) _backup_file(file, copy_file=True)
optimize.load_data(None, pairs=['UNITTEST/BTC'], ticker_interval='30m') optimize.load_data(None, pairs=['UNITTEST/BTC'], ticker_interval='30m')
@ -63,7 +63,7 @@ def test_load_data_30min_ticker(ticker_history, mocker, caplog, default_conf) ->
def test_load_data_5min_ticker(ticker_history, mocker, caplog, default_conf) -> None: def test_load_data_5min_ticker(ticker_history, mocker, caplog, default_conf) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'UNITTEST_BTC-5m.json') file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'UNITTEST_BTC-5m.json')
_backup_file(file, copy_file=True) _backup_file(file, copy_file=True)
@ -74,7 +74,7 @@ def test_load_data_5min_ticker(ticker_history, mocker, caplog, default_conf) ->
def test_load_data_1min_ticker(ticker_history, mocker, caplog) -> None: def test_load_data_1min_ticker(ticker_history, mocker, caplog) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'UNITTEST_BTC-1m.json') file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'UNITTEST_BTC-1m.json')
_backup_file(file, copy_file=True) _backup_file(file, copy_file=True)
optimize.load_data(None, ticker_interval='1m', pairs=['UNITTEST/BTC']) optimize.load_data(None, ticker_interval='1m', pairs=['UNITTEST/BTC'])
@ -87,7 +87,7 @@ def test_load_data_with_new_pair_1min(ticker_history, mocker, caplog, default_co
""" """
Test load_data() with 1 min ticker Test load_data() with 1 min ticker
""" """
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
exchange = get_patched_exchange(mocker, default_conf) exchange = get_patched_exchange(mocker, default_conf)
file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'MEME_BTC-1m.json') file = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'MEME_BTC-1m.json')
@ -118,7 +118,7 @@ def test_testdata_path() -> None:
def test_download_pairs(ticker_history, mocker, default_conf) -> None: def test_download_pairs(ticker_history, mocker, default_conf) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
exchange = get_patched_exchange(mocker, default_conf) exchange = get_patched_exchange(mocker, default_conf)
file1_1 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'MEME_BTC-1m.json') file1_1 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'MEME_BTC-1m.json')
file1_5 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'MEME_BTC-5m.json') file1_5 = os.path.join(os.path.dirname(__file__), '..', 'testdata', 'MEME_BTC-5m.json')
@ -261,7 +261,7 @@ def test_load_cached_data_for_updating(mocker) -> None:
def test_download_pairs_exception(ticker_history, mocker, caplog, default_conf) -> None: def test_download_pairs_exception(ticker_history, mocker, caplog, default_conf) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
mocker.patch('freqtrade.optimize.__init__.download_backtesting_testdata', mocker.patch('freqtrade.optimize.__init__.download_backtesting_testdata',
side_effect=BaseException('File Error')) side_effect=BaseException('File Error'))
exchange = get_patched_exchange(mocker, default_conf) exchange = get_patched_exchange(mocker, default_conf)
@ -279,7 +279,7 @@ def test_download_pairs_exception(ticker_history, mocker, caplog, default_conf)
def test_download_backtesting_testdata(ticker_history, mocker, default_conf) -> None: def test_download_backtesting_testdata(ticker_history, mocker, default_conf) -> None:
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=ticker_history) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=ticker_history)
exchange = get_patched_exchange(mocker, default_conf) exchange = get_patched_exchange(mocker, default_conf)
# Download a 1 min ticker file # Download a 1 min ticker file
@ -304,7 +304,7 @@ def test_download_backtesting_testdata2(mocker, default_conf) -> None:
[1509836580000, 0.00161, 0.00161, 0.00161, 0.00161, 82.390199] [1509836580000, 0.00161, 0.00161, 0.00161, 0.00161, 82.390199]
] ]
json_dump_mock = mocker.patch('freqtrade.misc.file_dump_json', return_value=None) json_dump_mock = mocker.patch('freqtrade.misc.file_dump_json', return_value=None)
mocker.patch('freqtrade.exchange.Exchange.get_candle_history', return_value=tick) mocker.patch('freqtrade.exchange.Exchange.get_history', return_value=tick)
exchange = get_patched_exchange(mocker, default_conf) exchange = get_patched_exchange(mocker, default_conf)
download_backtesting_testdata(None, exchange, pair="UNITTEST/BTC", tick_interval='1m') download_backtesting_testdata(None, exchange, pair="UNITTEST/BTC", tick_interval='1m')
download_backtesting_testdata(None, exchange, pair="UNITTEST/BTC", tick_interval='3m') download_backtesting_testdata(None, exchange, pair="UNITTEST/BTC", tick_interval='3m')

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""This script generate json data from bittrex""" """This script generate json data"""
import json import json
import sys import sys
from pathlib import Path from pathlib import Path
@ -53,7 +53,8 @@ exchange = Exchange({'key': '',
'dry_run': True, 'dry_run': True,
'exchange': { 'exchange': {
'name': args.exchange, 'name': args.exchange,
'pair_whitelist': [] 'pair_whitelist': [],
'ccxt_rate_limit': False
} }
}) })
pairs_not_available = [] pairs_not_available = []