diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index 558a4eb0f..7c93198d3 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -1390,8 +1390,12 @@ class Exchange: # Returns the intermediate trade file name for a given pair starting with `from_id` trade ID def _intermediate_trades_file(self, datadir: Path, pair: str, from_id: str, mkdir: bool = True) -> str: + def get_full_path(tmpdata_file: str, dir_id: int): + if dir_id == 0: + return tmpdata_file + return os.path.join(get_full_path(tmpdata_file, dir_id//1000), str(dir_id % 1000)) tmpdata_file = self._intermediate_trades_dir_for_pair(datadir, pair) - tmpdata_file = os.path.join(tmpdata_file, str(int(from_id)//1000000)) + tmpdata_file = get_full_path(tmpdata_file, int(from_id)//1000000) if mkdir: Path(tmpdata_file).mkdir(parents=True, exist_ok=True) tmpdata_file = os.path.join(tmpdata_file, self._pair_dir(pair)+"_"+str(from_id)+".json") @@ -1411,9 +1415,37 @@ class Exchange: trades_list = json.loads(json_string) return trades_list + @retrier_async + async def _async_read_interval_tree_for_pair(self, tmpdata_dir: Path, pair: str, + int_tree: IntervalTree): + if os.path.isdir(tmpdata_dir): + tmpdata_subdirs = sorted(os.listdir(tmpdata_dir)) + for tmpdata_subdir in tmpdata_subdirs: + if tmpdata_subdir.isnumeric(): + tmpdata_subdir = os.path.join(tmpdata_dir, tmpdata_subdir) + if os.path.isdir(tmpdata_subdir): + await self._async_read_interval_tree_for_pair(tmpdata_subdir, pair, + int_tree) + elif tmpdata_subdir.startswith(self._pair_dir(pair)): + tmpdata_file = os.path.join(tmpdata_dir, tmpdata_subdir) + if os.path.isfile(tmpdata_file): + trades_list = await self._async_fetch_trades_from_file(tmpdata_file) + trades_list_from_id = trades_list[0][1] + trades_list_to_id = trades_list[-1][1] # the last one is repeated in next + + # if it's not paginated by_id + if not trades_list_from_id: + # pagination by_since + trades_list_from_id = trades_list[0][0] + trades_list_to_id = trades_list[-1][0] + 1 # +1 as it's exclusive + int_tree.add(Interval( + int(trades_list_from_id), # inclusive + int(trades_list_to_id), # exclusive + tmpdata_file)) + # Builds the interval tree (from the intermediate trade files) for a given pair @retrier_async - async def _get_interval_tree_for_pair(self, datadir: Path, pair: str) -> IntervalTree: + async def _async_get_interval_tree_for_pair(self, datadir: Path, pair: str) -> IntervalTree: cached_res = self._intermediate_data_cache.get(pair, None) if cached_res: return cached_res @@ -1421,23 +1453,7 @@ class Exchange: logger.debug("Caching intervals for pair %s", pair) cache_interval_tree = IntervalTree() tmpdata_dir = self._intermediate_trades_dir_for_pair(datadir, pair) - if os.path.isdir(tmpdata_dir): - tmpdata_subdirs = sorted(os.listdir(tmpdata_dir)) - for tmpdata_subdir in tmpdata_subdirs: - if tmpdata_subdir.isnumeric(): - tmpdata_subdir = os.path.join(tmpdata_dir, tmpdata_subdir) - tmpdata_files = sorted(os.listdir(tmpdata_subdir)) - for tmpdata_file in tmpdata_files: - if tmpdata_file.startswith(self._pair_dir(pair)): - tmpdata_file = os.path.join(tmpdata_subdir, tmpdata_file) - if os.path.isfile(tmpdata_file): - trades_list = await self._async_fetch_trades_from_file(tmpdata_file) - trades_list_from_id = trades_list[0][1] - trades_list_to_id = trades_list[-1][1] - cache_interval_tree.add(Interval( - int(trades_list_from_id), # inclusive - int(trades_list_to_id), # exclusive - tmpdata_file)) + await self._async_read_interval_tree_for_pair(tmpdata_dir, pair, cache_interval_tree) logger.debug("Cached intervals for pair %s: %s intervals", pair, len(cache_interval_tree)) self._intermediate_data_cache[pair] = cache_interval_tree @@ -1446,10 +1462,10 @@ class Exchange: # Checks whether the given trade id is cached in the intermediate trade files. # If it exists, returns the `Interval` for the cache file. Otherwise, returns `None`. @retrier_async - async def _is_id_cached_in_intermediate_data( + async def _async_is_id_cached_in_intermediate_data( self, datadir: Path, pair: str, id: int ) -> Optional[Interval]: - int_tree = await self._get_interval_tree_for_pair(datadir, pair) + int_tree = await self._async_get_interval_tree_for_pair(datadir, pair) intervals = sorted(int_tree[int(id)]) return intervals[0] if len(intervals) > 0 else None @@ -1482,58 +1498,75 @@ class Exchange: trades_list = trades_dict_to_list(trades) if trades_list and datadir and len(trades_list) == self.batch_size(): - from_id = trades_list[0][1] - to_id = trades_list[-1][1] + pagination_method = "by_id" + from_pg_id = trades_list[0][1] + to_pg_id = trades_list[-1][1] - cached_from_id_interval = await self._is_id_cached_in_intermediate_data( - datadir, pair, from_id) - cached_to_id_interval = await self._is_id_cached_in_intermediate_data( - datadir, pair, to_id) + if not from_pg_id: + from_pg_id = trades_list[0][0] + to_pg_id = trades_list[-1][0] + pagination_method = "by_since" - # If `from_id` exists in a cached interval, we return everything before the + pagination_col_index = 1 if pagination_method == "by_id" else 0 + + cached_from_pg_id_interval = await self._async_is_id_cached_in_intermediate_data( + datadir, pair, from_pg_id) + cached_to_pg_id_interval = await self._async_is_id_cached_in_intermediate_data( + datadir, pair, to_pg_id) + + # If `from_pg_id` exists in a cached interval, we return everything before the # beginning of this cached interval. In the next round, the cached trades (starting - # from `from_id`) will be requested and the response will be handled using a + # from `from_pg_id`) will be requested and the response will be handled using a # cached intermediate trade file. - if cached_from_id_interval: - # If the cached interval starts from `from_id`, then it's already cached and we - # return it. Otherwise, we filter all trades before the beginning of this cached - # interval. - if int(from_id) != cached_from_id_interval.begin: + if cached_from_pg_id_interval: + # If the cached interval starts from `from_pg_id`, then it's already cached and + # we return it. Otherwise, we filter all trades before the beginning of this + # cached interval. + if int(from_pg_id) != cached_from_pg_id_interval.begin: trades_list = list(filter( - lambda trade: int(trade[1]) < cached_from_id_interval.begin, trades_list + lambda trade: (int(trade[pagination_col_index]) < + cached_from_pg_id_interval.begin), + trades_list )) logger.debug("The result was partially cached in the intermediate result " + - "(using from_id). Returned %s elements without caching them.", + "(using from_pg_id). Returned %s " + + "elements without caching them.", len(trades_list)) return trades_list - # If `to_id` exists in a cached interval, we return everything before the + # If `to_pg_id` exists in a cached interval, we return everything before the # beginning of this cached interval. In the next round, the cached trades (starting - # from `to_id`) will be requested and the response will be handled using a + # from `to_pg_id`) will be requested and the response will be handled using a # cached intermediate trade file. - if cached_to_id_interval: + if cached_to_pg_id_interval: trades_list = list(filter( - lambda trade: int(trade[1]) < cached_to_id_interval.begin, trades_list + lambda trade: (int(trade[pagination_col_index]) < + cached_to_pg_id_interval.begin), + trades_list )) logger.debug("The result was partially cached in the intermediate result " + - "(using to_id). Returned %s elements without caching them.", + "(using to_pg_id). Returned %s elements without caching them.", len(trades_list)) return trades_list - # If neither `from_id` nor `to_id` are cached, we cache the trades in an + # If neither `from_pg_id` nor `to_pg_id` are cached, we cache the trades in an # intermediate trade file. assert datadir is not None - tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id) + tmpdata_file = self._intermediate_trades_file(datadir, pair, from_pg_id) json_string = json.dumps(trades_list) - self._intermediate_data_cache[pair].addi(int(from_id), int(to_id), tmpdata_file) + self._intermediate_data_cache[pair].addi( + int(from_pg_id), + int(to_pg_id), + tmpdata_file) with open(tmpdata_file, "w") as text_file: text_file.write(json_string) logger.debug("Cached the intermediate trades in %s", tmpdata_file) else: - from_id = trades_list[0][1] if trades_list else 0 + from_pg_id = trades_list[0][1] if trades_list else -1 + from_pg_id = from_pg_id if from_pg_id else trades_list[0][0] datadir = datadir or Path(".") - tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id, False) + tmpdata_file = self._intermediate_trades_file(datadir, pair, from_pg_id, False) logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s", tmpdata_file, len(trades_list)) return trades_list @@ -1589,7 +1622,7 @@ class Exchange: t = [] success_cache_read = False if datadir: - tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id) + tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id, False) if os.path.isfile(tmpdata_file): t = await self._async_fetch_trades_from_file(tmpdata_file) diff --git a/tests/exchange/test_exchange.py b/tests/exchange/test_exchange.py index ef9cd8801..10c16720f 100644 --- a/tests/exchange/test_exchange.py +++ b/tests/exchange/test_exchange.py @@ -2274,6 +2274,12 @@ def test__intermediate_trades_file(default_conf, mocker, caplog, exchange_name, expected_file = ("user_data/data/" + exchange_name + "/trades-intermediate-parts/ETH-BTC/254/ETH-BTC_254648140.json") assert expected_file == exchange._intermediate_trades_file(datadir, pair, "254648140", False) + expected_file = ( + "user_data/data/" + exchange_name + + "/trades-intermediate-parts/ETH-BTC/1/623/216/788/143/ETH-BTC_1623216788143201587.json" + ) + assert expected_file == exchange._intermediate_trades_file( + datadir, pair, "1623216788143201587", False) @pytest.mark.asyncio @@ -2298,7 +2304,7 @@ async def test__async_fetch_trades_from_file(default_conf, mocker, caplog, excha expected_trades_list = await exchange._async_fetch_trades_from_file(expected_file) assert expected_trades_list == json.loads(trades_list_string) - expected_interval_tree = await exchange._get_interval_tree_for_pair(datadir, pair) + expected_interval_tree = await exchange._async_get_interval_tree_for_pair(datadir, pair) expected_interval = Interval(254648140, 254648143, expected_file) assert len(expected_interval_tree) == 1