From 0439ae00720530f6d511bfa924995b66ec8066db Mon Sep 17 00:00:00 2001 From: Mohammad Dashti Date: Thu, 16 Sep 2021 08:48:22 -0700 Subject: [PATCH] Added intermediate files to help with progressive downloads --- .../configuration/directory_operations.py | 5 + freqtrade/data/history/history_utils.py | 1 + freqtrade/exchange/exchange.py | 98 ++++++++++++++++--- 3 files changed, 89 insertions(+), 15 deletions(-) diff --git a/freqtrade/configuration/directory_operations.py b/freqtrade/configuration/directory_operations.py index ca305c260..3dcb6acd5 100644 --- a/freqtrade/configuration/directory_operations.py +++ b/freqtrade/configuration/directory_operations.py @@ -21,6 +21,11 @@ def create_datadir(config: Dict[str, Any], datadir: Optional[str] = None) -> Pat if not folder.is_dir(): folder.mkdir(parents=True) logger.info(f'Created data directory: {datadir}') + + intermediate_dir = folder.joinpath('trades-intermediate-parts') + if not intermediate_dir.is_dir(): + intermediate_dir.mkdir(parents=True) + logger.info(f'Created intermediate data directory: {intermediate_dir}') return folder diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py index e6b8db322..48930bf30 100644 --- a/freqtrade/data/history/history_utils.py +++ b/freqtrade/data/history/history_utils.py @@ -306,6 +306,7 @@ def _download_trades_history(exchange: Exchange, since=since, until=until, from_id=from_id, + datadir=data_handler._datadir ) trades.extend(new_trades[1]) # Remove duplicates to make sure we're not storing data we don't need diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index 2b9b08d70..87596d39c 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -13,6 +13,10 @@ from typing import Any, Dict, List, Optional, Tuple import arrow import ccxt +import json +import os +from pathlib import Path + import ccxt.async_support as ccxt_async from cachetools import TTLCache from 
ccxt.base.decimal_to_precision import (ROUND_DOWN, ROUND_UP, TICK_SIZE, TRUNCATE,
@@ -1369,11 +1373,47 @@ class Exchange:
                                        f'for pair {pair}. Message: {e}') from e
 
     # Fetch historic trades
+
+    def _intermediate_trades_dir(self, datadir: str, pair: str, from_id: int) -> str:
+        """
+        Build the path of the intermediate (cache) file for the batch of trades starting at
+        `from_id` and make sure its parent directory exists.
+        Layout: <datadir>/trades-intermediate-parts/<pair>/<from_id // 1000000>/
+                <pair>_<from_id>.json
+        :param datadir: Base data directory.
+        :param pair: Pair the trades belong to ("/" is replaced by "-" in the path).
+        :param from_id: Id of the first trade of the batch - int or numeric string.
+        :return: Path of the intermediate file.
+        :raises ValueError: If from_id is a non-numeric string.
+        """
+        pair_s = pair.replace("/", "-")
+        # One sub-directory per million trade ids
+        parts_dir = Path(datadir, "trades-intermediate-parts", pair_s,
+                         str(int(from_id) // 1000000))
+        parts_dir.mkdir(parents=True, exist_ok=True)
+        # Format from_id instead of concatenating it - ids arrive as str as well as int
+        return str(parts_dir / f"{pair_s}_{from_id}.json")
+
+    @retrier_async
+    async def _async_fetch_trades_from_file(self, datafile: str) -> List[List]:
+        """
+        Read one batch of trades from an intermediate (cache) file.
+        :param datafile: Path of a JSON file containing a list of trades.
+        :return: List of trades - empty if the file cannot be read or parsed.
+        """
+        try:
+            with open(datafile, mode='r') as file:
+                return json.load(file)
+        except (OSError, ValueError) as e:
+            # json.JSONDecodeError is a ValueError - raised e.g. for a truncated file
+            logger.warning("Could not read intermediate trades from %s: %s", datafile, e)
+            return []
 
     @retrier_async
     async def _async_fetch_trades(self, pair: str,
                                   since: Optional[int] = None,
-                                  params: Optional[dict] = None) -> List[List]:
+                                  params: Optional[dict] = None,
+                                  datadir: Optional[str] = None) -> List[List]:
         """
         Asyncronously gets trade history using fetch_trades.
         Handles exchange errors, does one call to the exchange.
@@ -1385,15 +1410,38 @@ class Exchange:
             # fetch trades asynchronously
             if params:
                 logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
-                trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
+                trades = await self._api_async.fetch_trades(pair, params=params,
+                                                            limit=self.batch_size())
             else:
                 logger.debug(
                     "Fetching trades for pair %s, since %s %s...",
                     pair, since,
                     '(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else ''
                 )
-                trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
-            return trades_dict_to_list(trades)
+                trades = await self._api_async.fetch_trades(pair, since=since,
+                                                            limit=self.batch_size())
+
+            trades_list = trades_dict_to_list(trades)
+            # Only full batches are written to the intermediate directory, and only when
+            # a datadir was passed (it is optional).
+            if datadir and trades_list and len(trades_list) == self.batch_size():
+                try:
+                    # DEFAULT_TRADES_COLUMNS: 1 -> id
+                    tmpdata_file = self._intermediate_trades_dir(datadir, pair, trades_list[0][1])
+                    # Write to a temporary file and rename it, so an interrupted download
+                    # cannot leave a truncated cache file behind.
+                    tmpdata_part = tmpdata_file + ".part"
+                    with open(tmpdata_part, "w") as text_file:
+                        json.dump(trades_list, text_file)
+                    os.replace(tmpdata_part, tmpdata_file)
+                    logger.debug("Cached the intermediate trades in %s", tmpdata_file)
+                except (OSError, TypeError, ValueError) as e:
+                    # Caching is best effort - e.g. non-numeric trade ids cannot be cached.
+                    logger.warning("Could not cache the intermediate trades for %s: %s", pair, e)
+            else:
+                logger.debug("DID NOT CACHE the intermediate trades for %s with len=%s",
+                             pair, len(trades_list))
+            return trades_list
         except ccxt.NotSupported as e:
             raise OperationalException(
                 f'Exchange {self._api.name} does not support fetching historical trade data.'
@@ -1406,10 +1446,15 @@ class Exchange:
         except ccxt.BaseError as e:
             raise OperationalException(f'Could not fetch trade data. 
Msg: {e}') from e
 
+    def batch_size(self) -> int:
+        return 1000
+
     async def _async_get_trade_history_id(self, pair: str,
                                           until: int,
                                           since: Optional[int] = None,
-                                          from_id: Optional[str] = None) -> Tuple[str, List[List]]:
+                                          from_id: Optional[str] = None,
+                                          datadir: Optional[str] = None
+                                          ) -> Tuple[str, List[List]]:
         """
         Asyncronously gets trade history using fetch_trades
         use this when exchange uses id-based iteration (check `self._trades_pagination`)
@@ -1428,14 +1473,26 @@ class Exchange:
             # of up to an hour.
             # e.g. Binance returns the "last 1000" candles within a 1h time interval
             # - so we will miss the first trades.
-            t = await self._async_fetch_trades(pair, since=since)
+            t = await self._async_fetch_trades(pair, since=since, datadir=datadir)
             # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
             # DEFAULT_TRADES_COLUMNS: 1 -> id
             from_id = t[-1][1]
             trades.extend(t[:-1])
         while True:
-            t = await self._async_fetch_trades(pair,
-                                               params={self._trades_pagination_arg: from_id})
+            t = []
+            # The intermediate cache is optional - only consult it when a datadir was passed
+            if datadir:
+                tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
+                if os.path.isfile(tmpdata_file):
+                    t = await self._async_fetch_trades_from_file(tmpdata_file)
+                    logger.debug("Read %s trades from cache %s", len(t), tmpdata_file)
+
+            # Anything shorter than a full batch (no file, partial file) is a cache miss.
+            # from_id is left untouched here - it is the pagination key for the next call.
+            if len(t) < self.batch_size():
+                t = await self._async_fetch_trades(pair,
+                                                   params={self._trades_pagination_arg: from_id},
+                                                   datadir=datadir)
             if t:
                 # Skip last id since its the key for the next call
                 trades.extend(t[:-1])
@@ -1453,7 +1516,9 @@ class Exchange:
         return (pair, trades)
 
     async def _async_get_trade_history_time(self, pair: str, until: int,
-                                            since: Optional[int] = None) -> Tuple[str, List[List]]:
+                                            since: Optional[int] = None,
+                                            datadir: Optional[str] = None
+                                            ) -> Tuple[str, List[List]]:
         """
         Asyncronously gets trade history using 
fetch_trades, when the exchange uses time-based iteration (check `self._trades_pagination`) @@ -1467,7 +1532,7 @@ class Exchange: # DEFAULT_TRADES_COLUMNS: 0 -> timestamp # DEFAULT_TRADES_COLUMNS: 1 -> id while True: - t = await self._async_fetch_trades(pair, since=since) + t = await self._async_fetch_trades(pair, since=since, datadir=datadir) if t: since = t[-1][0] trades.extend(t) @@ -1484,7 +1549,9 @@ class Exchange: async def _async_get_trade_history(self, pair: str, since: Optional[int] = None, until: Optional[int] = None, - from_id: Optional[str] = None) -> Tuple[str, List[List]]: + from_id: Optional[str] = None, + datadir: Optional[str] = None + ) -> Tuple[str, List[List]]: """ Async wrapper handling downloading trades using either time or id based methods. """ @@ -1498,10 +1565,10 @@ class Exchange: if self._trades_pagination == 'time': return await self._async_get_trade_history_time( - pair=pair, since=since, until=until) + pair=pair, since=since, until=until, datadir=datadir) elif self._trades_pagination == 'id': return await self._async_get_trade_history_id( - pair=pair, since=since, until=until, from_id=from_id + pair=pair, since=since, until=until, from_id=from_id, datadir=datadir ) else: raise OperationalException(f"Exchange {self.name} does use neither time, " @@ -1510,7 +1577,8 @@ class Exchange: def get_historic_trades(self, pair: str, since: Optional[int] = None, until: Optional[int] = None, - from_id: Optional[str] = None) -> Tuple[str, List]: + from_id: Optional[str] = None, + datadir: Optional[str] = None) -> Tuple[str, List]: """ Get trade history data using asyncio. Handles all async work and returns the list of candles. @@ -1526,7 +1594,7 @@ class Exchange: return asyncio.get_event_loop().run_until_complete( self._async_get_trade_history(pair=pair, since=since, - until=until, from_id=from_id)) + until=until, from_id=from_id, datadir=datadir)) def is_exchange_known_ccxt(exchange_name: str, ccxt_module: CcxtModuleType = None) -> bool: