From 62ee82a15d20e642ef2cc10e9bb19377588f622c Mon Sep 17 00:00:00 2001
From: Mohammad Dashti
Date: Thu, 16 Sep 2021 12:45:43 -0700
Subject: [PATCH] Added a feature to facilitate caching data between multiple sessions

---
 environment.yml                |   1 +
 freqtrade/exchange/exchange.py | 127 ++++++++++++++++++++++++++++++---
 requirements.txt               |   1 +
 setup.py                       |   1 +
 4 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/environment.yml b/environment.yml
index f58434c15..e62495bba 100644
--- a/environment.yml
+++ b/environment.yml
@@ -17,6 +17,7 @@ dependencies:
     - requests
     - urllib3
     - wrapt
+    - intervaltree
     - jsonschema
     - TA-Lib
     - tabulate
diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py
index 87596d39c..9b55d621b 100644
--- a/freqtrade/exchange/exchange.py
+++ b/freqtrade/exchange/exchange.py
@@ -16,6 +16,7 @@ import ccxt
 import json
 import os
 from pathlib import Path
+from intervaltree import Interval, IntervalTree
 
 import ccxt.async_support as ccxt_async
 from cachetools import TTLCache
@@ -78,6 +79,11 @@ class Exchange:
     }
     _ft_has: Dict = {}
 
+    # Cache of the intermediate trade files found on disk. Maps `(datadir, pair)` to an
+    # `IntervalTree` whose intervals are the trade-ID ranges already stored in those files.
+    # NOTE: this is a class attribute, so it is shared by all `Exchange` instances.
+    _intermediate_data_cache: Dict = {}
+
     def __init__(self, config: Dict[str, Any], validate: bool = True) -> None:
         """
         Initializes this module with the given config,
@@ -1372,15 +1378,25 @@ class Exchange:
             raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
                                        f'for pair {pair}. Message: {e}') from e
 
-    # Fetch historic trades
-    def _intermediate_trades_dir(self, datadir: str, pair: str, from_id: int) -> str:
-        tmpdata_file = os.path.join(datadir, "trades-intermediate-parts")
-        tmpdata_file = os.path.join(tmpdata_file, pair.replace("/","-"))
+    # Directory name for a pair, e.g., given `ETH/BTC` returns `ETH-BTC`.
+    def _pair_dir(self, pair: str) -> str:
+        return pair.replace("/","-")
+
+    # Returns the directory path that contains the intermediate trade files for a given pair.
+    def _intermediate_trades_dir_for_pair(self, datadir: str, pair: str) -> str:
+        tmpdata_dir = os.path.join(datadir, "trades-intermediate-parts")
+        tmpdata_dir = os.path.join(tmpdata_dir, self._pair_dir(pair))
+        return tmpdata_dir
+
+    # Returns the intermediate trade file name for a given pair starting with `from_id` trade ID
+    def _intermediate_trades_file(self, datadir: str, pair: str, from_id: int) -> str:
+        tmpdata_file = self._intermediate_trades_dir_for_pair(datadir, pair)
         tmpdata_file = os.path.join(tmpdata_file, str(int(from_id)//1000000))
         Path(tmpdata_file).mkdir(parents=True, exist_ok=True)
-        tmpdata_file = os.path.join(tmpdata_file, pair.replace("/","-")+"_"+from_id+".json")
+        tmpdata_file = os.path.join(tmpdata_file, self._pair_dir(pair)+"_"+str(from_id)+".json")
         return tmpdata_file
 
+    # Fetch historic trades
     @retrier_async
     async def _async_fetch_trades_from_file(self, datafile: str) -> List[List]:
         # Open a file: file
@@ -1394,6 +1410,52 @@ class Exchange:
         trades_list = json.loads(json_string)
         return trades_list
 
+    # Builds the interval tree (from the intermediate trade files) for a given pair
+    @retrier_async
+    async def _get_interval_tree_for_pair(self, datadir: str, pair: str) -> IntervalTree:
+        cached_res = self._intermediate_data_cache.get((datadir, pair))
+        # Compare against `None`: an empty `IntervalTree` is falsy but is still a valid result.
+        if cached_res is not None:
+            return cached_res
+
+        logger.debug("Caching intervals for pair %s", pair)
+        cache_interval_tree = IntervalTree()
+        tmpdata_dir = self._intermediate_trades_dir_for_pair(datadir, pair)
+        if os.path.isdir(tmpdata_dir):
+            tmpdata_subdirs = os.listdir(tmpdata_dir)
+            for tmpdata_subdir in tmpdata_subdirs:
+                if tmpdata_subdir.isnumeric():
+                    tmpdata_subdir = os.path.join(tmpdata_dir, tmpdata_subdir)
+                    tmpdata_files = os.listdir(tmpdata_subdir)
+                    for tmpdata_file in tmpdata_files:
+                        if tmpdata_file.startswith(self._pair_dir(pair)):
+                            tmpdata_file = os.path.join(tmpdata_subdir, tmpdata_file)
+                            if os.path.isfile(tmpdata_file):
+                                trades_list = await self._async_fetch_trades_from_file(tmpdata_file)
+                                # Skip empty files: they hold no trade IDs to index.
+                                if not trades_list:
+                                    continue
+                                trades_list_from_id = trades_list[0][1]
+                                trades_list_to_id = trades_list[-1][1]
+                                cache_interval_tree.addi(
+                                    int(trades_list_from_id),
+                                    int(trades_list_to_id)+1,
+                                    tmpdata_file)
+
+        logger.debug("Cached intervals for pair %s: %s intervals", pair, len(cache_interval_tree))
+        self._intermediate_data_cache[(datadir, pair)] = cache_interval_tree
+        return cache_interval_tree
+
+    # Checks whether the given trade id is cached in the intermediate trade files.
+    # If it exists, returns the `Interval` for the cache file. Otherwise, returns `None`.
+    @retrier_async
+    async def _is_id_cached_in_intermediate_data(
+        self, datadir: str, pair: str, id: int
+    ) -> Optional[Interval]:
+        int_tree = await self._get_interval_tree_for_pair(datadir, pair)
+        intervals = sorted(int_tree[int(id)])
+        return intervals[0] if len(intervals) > 0 else None
+
     @retrier_async
     async def _async_fetch_trades(self, pair: str,
                                   since: Optional[int] = None,
@@ -1421,18 +1483,60 @@ class Exchange:
 
             trades_list = trades_dict_to_list(trades)
 
-            if trades_list and len(trades_list) == self.batch_size() and datadir:
+            if trades_list and datadir and len(trades_list) == self.batch_size():
                 from_id = trades_list[0][1]
-                tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
+                to_id = trades_list[-1][1]
+                cached_from_id_interval = await self._is_id_cached_in_intermediate_data(
+                    datadir, pair, from_id)
+                cached_to_id_interval = await self._is_id_cached_in_intermediate_data(
+                    datadir, pair, to_id)
+
+                # If `from_id` exists in a cached interval, we return everything before the
+                # beginning of this cached interval. In the next round, the cached trades (starting
+                # from `from_id`) will be requested and the response will be handled using a
+                # cached intermediate trade file.
+                if cached_from_id_interval:
+                    # If the cached interval starts from `from_id`, then it's already cached and we
+                    # return it. Otherwise, we filter all trades before the beginning of this cached
+                    # interval.
+                    # NOTE(review): if trade IDs ascend within a batch, every ID is > `begin` here,
+                    # so this filter yields an empty list - confirm the caller handles that.
+                    if int(from_id) != cached_from_id_interval.begin:
+                        trades_list = list(filter(
+                            lambda trade: int(trade[1]) < cached_from_id_interval.begin, trades_list
+                        ))
+                        logger.debug("The result was partially cached in the intermediate result " +
+                                     "(using from_id). Returned %s elements without caching them.",
+                                     len(trades_list))
+                    return trades_list
+
+                # If `to_id` exists in a cached interval, we return everything before the
+                # beginning of this cached interval. In the next round, the cached trades (starting
+                # from `to_id`) will be requested and the response will be handled using a
+                # cached intermediate trade file.
+                if cached_to_id_interval:
+                    trades_list = list(filter(
+                        lambda trade: int(trade[1]) < cached_to_id_interval.begin, trades_list
+                    ))
+                    logger.debug("The result was partially cached in the intermediate result " +
+                                 "(using to_id). Returned %s elements without caching them.",
+                                 len(trades_list))
+                    return trades_list
+
+
+                # If neither `from_id` nor `to_id` is cached, we cache the trades in an
+                # intermediate trade file.
+                tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
                 json_string = json.dumps(trades_list)
                 with open(tmpdata_file, "w") as text_file:
                     text_file.write(json_string)
                 logger.debug("Cached the intermediate trades in %s", tmpdata_file)
             else:
                 from_id = trades_list[0][1] if trades_list else 0
-                tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
-                logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s", tmpdata_file, len(trades_list))
+                # Log the ID only: building the file path creates directories and needs `datadir`.
+                logger.debug("DID NOT CACHE the intermediate trades from id %s with len=%s",
+                             from_id, len(trades_list))
             return trades_list
         except ccxt.NotSupported as e:
             raise OperationalException(
@@ -1477,9 +1581,12 @@ class Exchange:
             # DEFAULT_TRADES_COLUMNS: 0 -> timestamp
             # DEFAULT_TRADES_COLUMNS: 1 -> id
             from_id = t[-1][1]
+            logger.debug(
+                "Did not cache the intermediate trades since %s with len=%s and from_id=%s", since,
+                len(t), from_id)
             trades.extend(t[:-1])
         while True:
-            tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
+            tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
             t = []
             success_cache_read = False
 
diff --git a/requirements.txt b/requirements.txt
index aa729dd9f..d33da7557 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -12,6 +12,7 @@ cachetools==4.2.2
 requests==2.26.0
 urllib3==1.26.6
 wrapt==1.12.1
+intervaltree==3.1.0
 jsonschema==3.2.0
 TA-Lib==0.4.21
 technical==1.3.0
diff --git a/setup.py b/setup.py
index 727c40c7c..b9e20a8e1 100644
--- a/setup.py
+++ b/setup.py
@@ -52,6 +52,7 @@ setup(
         'requests',
         'urllib3',
         'wrapt',
+        'intervaltree',
         'jsonschema',
         'TA-Lib',
         'technical',