Added support for pagination by since
This commit is contained in:
parent
9a3de2b701
commit
ddba69af71
@ -1390,8 +1390,12 @@ class Exchange:
|
|||||||
# Returns the intermediate trade file name for a given pair starting with `from_id` trade ID
|
# Returns the intermediate trade file name for a given pair starting with `from_id` trade ID
|
||||||
def _intermediate_trades_file(self, datadir: Path, pair: str, from_id: str,
|
def _intermediate_trades_file(self, datadir: Path, pair: str, from_id: str,
|
||||||
mkdir: bool = True) -> str:
|
mkdir: bool = True) -> str:
|
||||||
|
def get_full_path(tmpdata_file: str, dir_id: int):
    """Nest ``dir_id`` under ``tmpdata_file`` as one directory per 3-digit group.

    The id is split into base-1000 groups, most significant first, and each
    group becomes one path component, e.g. ``1623216`` -> ``<base>/1/623/216``.
    Groups are not zero-padded (``1005`` -> ``<base>/1/5``).

    :param tmpdata_file: base directory the groups are appended to
    :param dir_id: numeric directory id
    :return: ``tmpdata_file`` unchanged when ``dir_id`` is zero or negative,
        otherwise the nested path as a string
    """
    # Negative ids (e.g. a -1 "no trades" fallback) must stop here: floor
    # division never brings them to 0 (-1 // 1000 == -1), so recursing or
    # looping until `dir_id == 0` would never terminate.
    if dir_id <= 0:
        return tmpdata_file

    groups = []
    while dir_id > 0:
        dir_id, group = divmod(dir_id, 1000)
        groups.append(str(group))
    # Groups were collected least-significant first; the path needs the reverse.
    return os.path.join(tmpdata_file, *reversed(groups))
|
||||||
tmpdata_file = self._intermediate_trades_dir_for_pair(datadir, pair)
|
tmpdata_file = self._intermediate_trades_dir_for_pair(datadir, pair)
|
||||||
tmpdata_file = os.path.join(tmpdata_file, str(int(from_id)//1000000))
|
tmpdata_file = get_full_path(tmpdata_file, int(from_id)//1000000)
|
||||||
if mkdir:
|
if mkdir:
|
||||||
Path(tmpdata_file).mkdir(parents=True, exist_ok=True)
|
Path(tmpdata_file).mkdir(parents=True, exist_ok=True)
|
||||||
tmpdata_file = os.path.join(tmpdata_file, self._pair_dir(pair)+"_"+str(from_id)+".json")
|
tmpdata_file = os.path.join(tmpdata_file, self._pair_dir(pair)+"_"+str(from_id)+".json")
|
||||||
@ -1411,9 +1415,37 @@ class Exchange:
|
|||||||
trades_list = json.loads(json_string)
|
trades_list = json.loads(json_string)
|
||||||
return trades_list
|
return trades_list
|
||||||
|
|
||||||
|
@retrier_async
async def _async_read_interval_tree_for_pair(self, tmpdata_dir: Path, pair: str,
                                             int_tree: IntervalTree):
    """Recursively load the cached intermediate trade files below ``tmpdata_dir``.

    Entries with a purely numeric name are treated as nested cache
    directories and descended into. Entries whose name starts with the
    pair's directory name are read as trade files, and one ``Interval`` per
    file is added to ``int_tree``, with the file path as the interval data.

    :param tmpdata_dir: directory to scan; nothing happens if it is not a directory
    :param pair: pair whose intermediate trade files are collected
    :param int_tree: interval tree that is filled in place
    """
    if not os.path.isdir(tmpdata_dir):
        return

    pair_prefix = self._pair_dir(pair)
    for entry in sorted(os.listdir(tmpdata_dir)):
        entry_path = os.path.join(tmpdata_dir, entry)

        if entry.isnumeric():
            # Numeric names are the nested id-group directories.
            if os.path.isdir(entry_path):
                await self._async_read_interval_tree_for_pair(entry_path, pair, int_tree)
            continue

        if not entry.startswith(pair_prefix) or not os.path.isfile(entry_path):
            continue

        trades_list = await self._async_fetch_trades_from_file(entry_path)
        if not trades_list:
            # An empty cache file has no first/last trade to index; skip it
            # rather than letting an IndexError abort the whole scan.
            logger.debug("Skipping empty intermediate trades file %s", entry_path)
            continue

        first_trade, last_trade = trades_list[0], trades_list[-1]
        # Column 1 is the key used for pagination by id. The last trade is
        # repeated at the start of the next file, so its id is the exclusive end.
        interval_begin = first_trade[1]
        interval_end = last_trade[1]

        if not interval_begin:
            # No id present: the file was paginated by `since`, so use column 0
            # (presumably the trade timestamp -- confirm against the trade
            # list layout). +1 because the interval end is exclusive.
            interval_begin = first_trade[0]
            interval_end = last_trade[0] + 1

        int_tree.add(Interval(
            int(interval_begin),  # inclusive
            int(interval_end),  # exclusive
            entry_path))
|
||||||
|
|
||||||
# Builds the interval tree (from the intermediate trade files) for a given pair
|
# Builds the interval tree (from the intermediate trade files) for a given pair
|
||||||
@retrier_async
|
@retrier_async
|
||||||
async def _get_interval_tree_for_pair(self, datadir: Path, pair: str) -> IntervalTree:
|
async def _async_get_interval_tree_for_pair(self, datadir: Path, pair: str) -> IntervalTree:
|
||||||
cached_res = self._intermediate_data_cache.get(pair, None)
|
cached_res = self._intermediate_data_cache.get(pair, None)
|
||||||
if cached_res:
|
if cached_res:
|
||||||
return cached_res
|
return cached_res
|
||||||
@ -1421,23 +1453,7 @@ class Exchange:
|
|||||||
logger.debug("Caching intervals for pair %s", pair)
|
logger.debug("Caching intervals for pair %s", pair)
|
||||||
cache_interval_tree = IntervalTree()
|
cache_interval_tree = IntervalTree()
|
||||||
tmpdata_dir = self._intermediate_trades_dir_for_pair(datadir, pair)
|
tmpdata_dir = self._intermediate_trades_dir_for_pair(datadir, pair)
|
||||||
if os.path.isdir(tmpdata_dir):
|
await self._async_read_interval_tree_for_pair(tmpdata_dir, pair, cache_interval_tree)
|
||||||
tmpdata_subdirs = sorted(os.listdir(tmpdata_dir))
|
|
||||||
for tmpdata_subdir in tmpdata_subdirs:
|
|
||||||
if tmpdata_subdir.isnumeric():
|
|
||||||
tmpdata_subdir = os.path.join(tmpdata_dir, tmpdata_subdir)
|
|
||||||
tmpdata_files = sorted(os.listdir(tmpdata_subdir))
|
|
||||||
for tmpdata_file in tmpdata_files:
|
|
||||||
if tmpdata_file.startswith(self._pair_dir(pair)):
|
|
||||||
tmpdata_file = os.path.join(tmpdata_subdir, tmpdata_file)
|
|
||||||
if os.path.isfile(tmpdata_file):
|
|
||||||
trades_list = await self._async_fetch_trades_from_file(tmpdata_file)
|
|
||||||
trades_list_from_id = trades_list[0][1]
|
|
||||||
trades_list_to_id = trades_list[-1][1]
|
|
||||||
cache_interval_tree.add(Interval(
|
|
||||||
int(trades_list_from_id), # inclusive
|
|
||||||
int(trades_list_to_id), # exclusive
|
|
||||||
tmpdata_file))
|
|
||||||
|
|
||||||
logger.debug("Cached intervals for pair %s: %s intervals", pair, len(cache_interval_tree))
|
logger.debug("Cached intervals for pair %s: %s intervals", pair, len(cache_interval_tree))
|
||||||
self._intermediate_data_cache[pair] = cache_interval_tree
|
self._intermediate_data_cache[pair] = cache_interval_tree
|
||||||
@ -1446,10 +1462,10 @@ class Exchange:
|
|||||||
# Checks whether the given trade id is cached in the intermediate trade files.
|
# Checks whether the given trade id is cached in the intermediate trade files.
|
||||||
# If it exists, returns the `Interval` for the cache file. Otherwise, returns `None`.
|
# If it exists, returns the `Interval` for the cache file. Otherwise, returns `None`.
|
||||||
@retrier_async
async def _async_is_id_cached_in_intermediate_data(
    self, datadir: Path, pair: str, id: int
) -> Optional[Interval]:
    """Look up the cached intermediate trade file that covers a pagination id.

    :param datadir: data directory holding the intermediate trade files
    :param pair: pair to look up
    :param id: pagination id to search for; converted with ``int()`` first
    :return: the lowest-sorting ``Interval`` containing ``id``, or ``None``
        when no cached interval contains it
    """
    tree = await self._async_get_interval_tree_for_pair(datadir, pair)
    # A point query yields every interval containing the id; sort them so the
    # choice is deterministic when several match.
    candidates = sorted(tree[int(id)])
    if not candidates:
        return None
    return candidates[0]
|
||||||
|
|
||||||
@ -1482,58 +1498,75 @@ class Exchange:
|
|||||||
|
|
||||||
trades_list = trades_dict_to_list(trades)
|
trades_list = trades_dict_to_list(trades)
|
||||||
if trades_list and datadir and len(trades_list) == self.batch_size():
|
if trades_list and datadir and len(trades_list) == self.batch_size():
|
||||||
from_id = trades_list[0][1]
|
pagination_method = "by_id"
|
||||||
to_id = trades_list[-1][1]
|
from_pg_id = trades_list[0][1]
|
||||||
|
to_pg_id = trades_list[-1][1]
|
||||||
|
|
||||||
cached_from_id_interval = await self._is_id_cached_in_intermediate_data(
|
if not from_pg_id:
|
||||||
datadir, pair, from_id)
|
from_pg_id = trades_list[0][0]
|
||||||
cached_to_id_interval = await self._is_id_cached_in_intermediate_data(
|
to_pg_id = trades_list[-1][0]
|
||||||
datadir, pair, to_id)
|
pagination_method = "by_since"
|
||||||
|
|
||||||
# If `from_id` exists in a cached interval, we return everything before the
|
pagination_col_index = 1 if pagination_method == "by_id" else 0
|
||||||
|
|
||||||
|
cached_from_pg_id_interval = await self._async_is_id_cached_in_intermediate_data(
|
||||||
|
datadir, pair, from_pg_id)
|
||||||
|
cached_to_pg_id_interval = await self._async_is_id_cached_in_intermediate_data(
|
||||||
|
datadir, pair, to_pg_id)
|
||||||
|
|
||||||
|
# If `from_pg_id` exists in a cached interval, we return everything before the
|
||||||
# beginning of this cached interval. In the next round, the cached trades (starting
|
# beginning of this cached interval. In the next round, the cached trades (starting
|
||||||
# from `from_id`) will be requested and the response will be handled using a
|
# from `from_pg_id`) will be requested and the response will be handled using a
|
||||||
# cached intermediate trade file.
|
# cached intermediate trade file.
|
||||||
if cached_from_id_interval:
|
if cached_from_pg_id_interval:
|
||||||
# If the cached interval starts from `from_id`, then it's already cached and we
|
# If the cached interval starts from `from_pg_id`, then it's already cached and
|
||||||
# return it. Otherwise, we filter all trades before the beginning of this cached
|
# we return it. Otherwise, we filter all trades before the beginning of this
|
||||||
# interval.
|
# cached interval.
|
||||||
if int(from_id) != cached_from_id_interval.begin:
|
if int(from_pg_id) != cached_from_pg_id_interval.begin:
|
||||||
trades_list = list(filter(
|
trades_list = list(filter(
|
||||||
lambda trade: int(trade[1]) < cached_from_id_interval.begin, trades_list
|
lambda trade: (int(trade[pagination_col_index]) <
|
||||||
|
cached_from_pg_id_interval.begin),
|
||||||
|
trades_list
|
||||||
))
|
))
|
||||||
logger.debug("The result was partially cached in the intermediate result " +
|
logger.debug("The result was partially cached in the intermediate result " +
|
||||||
"(using from_id). Returned %s elements without caching them.",
|
"(using from_pg_id). Returned %s " +
|
||||||
|
"elements without caching them.",
|
||||||
len(trades_list))
|
len(trades_list))
|
||||||
return trades_list
|
return trades_list
|
||||||
|
|
||||||
# If `to_id` exists in a cached interval, we return everything before the
|
# If `to_pg_id` exists in a cached interval, we return everything before the
|
||||||
# beginning of this cached interval. In the next round, the cached trades (starting
|
# beginning of this cached interval. In the next round, the cached trades (starting
|
||||||
# from `to_id`) will be requested and the response will be handled using a
|
# from `to_pg_id`) will be requested and the response will be handled using a
|
||||||
# cached intermediate trade file.
|
# cached intermediate trade file.
|
||||||
if cached_to_id_interval:
|
if cached_to_pg_id_interval:
|
||||||
trades_list = list(filter(
|
trades_list = list(filter(
|
||||||
lambda trade: int(trade[1]) < cached_to_id_interval.begin, trades_list
|
lambda trade: (int(trade[pagination_col_index]) <
|
||||||
|
cached_to_pg_id_interval.begin),
|
||||||
|
trades_list
|
||||||
))
|
))
|
||||||
logger.debug("The result was partially cached in the intermediate result " +
|
logger.debug("The result was partially cached in the intermediate result " +
|
||||||
"(using to_id). Returned %s elements without caching them.",
|
"(using to_pg_id). Returned %s elements without caching them.",
|
||||||
len(trades_list))
|
len(trades_list))
|
||||||
return trades_list
|
return trades_list
|
||||||
|
|
||||||
# If neither `from_id` nor `to_id` are cached, we cache the trades in an
|
# If neither `from_pg_id` nor `to_pg_id` are cached, we cache the trades in an
|
||||||
# intermediate trade file.
|
# intermediate trade file.
|
||||||
assert datadir is not None
|
assert datadir is not None
|
||||||
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
|
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_pg_id)
|
||||||
json_string = json.dumps(trades_list)
|
json_string = json.dumps(trades_list)
|
||||||
self._intermediate_data_cache[pair].addi(int(from_id), int(to_id), tmpdata_file)
|
self._intermediate_data_cache[pair].addi(
|
||||||
|
int(from_pg_id),
|
||||||
|
int(to_pg_id),
|
||||||
|
tmpdata_file)
|
||||||
|
|
||||||
with open(tmpdata_file, "w") as text_file:
|
with open(tmpdata_file, "w") as text_file:
|
||||||
text_file.write(json_string)
|
text_file.write(json_string)
|
||||||
logger.debug("Cached the intermediate trades in %s", tmpdata_file)
|
logger.debug("Cached the intermediate trades in %s", tmpdata_file)
|
||||||
else:
|
else:
|
||||||
from_id = trades_list[0][1] if trades_list else 0
|
from_pg_id = trades_list[0][1] if trades_list else -1
|
||||||
|
from_pg_id = from_pg_id if from_pg_id else trades_list[0][0]
|
||||||
datadir = datadir or Path(".")
|
datadir = datadir or Path(".")
|
||||||
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id, False)
|
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_pg_id, False)
|
||||||
logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s",
|
logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s",
|
||||||
tmpdata_file, len(trades_list))
|
tmpdata_file, len(trades_list))
|
||||||
return trades_list
|
return trades_list
|
||||||
@ -1589,7 +1622,7 @@ class Exchange:
|
|||||||
t = []
|
t = []
|
||||||
success_cache_read = False
|
success_cache_read = False
|
||||||
if datadir:
|
if datadir:
|
||||||
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
|
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id, False)
|
||||||
if os.path.isfile(tmpdata_file):
|
if os.path.isfile(tmpdata_file):
|
||||||
t = await self._async_fetch_trades_from_file(tmpdata_file)
|
t = await self._async_fetch_trades_from_file(tmpdata_file)
|
||||||
|
|
||||||
|
@ -2274,6 +2274,12 @@ def test__intermediate_trades_file(default_conf, mocker, caplog, exchange_name,
|
|||||||
expected_file = ("user_data/data/" + exchange_name +
|
expected_file = ("user_data/data/" + exchange_name +
|
||||||
"/trades-intermediate-parts/ETH-BTC/254/ETH-BTC_254648140.json")
|
"/trades-intermediate-parts/ETH-BTC/254/ETH-BTC_254648140.json")
|
||||||
assert expected_file == exchange._intermediate_trades_file(datadir, pair, "254648140", False)
|
assert expected_file == exchange._intermediate_trades_file(datadir, pair, "254648140", False)
|
||||||
|
expected_file = (
|
||||||
|
"user_data/data/" + exchange_name +
|
||||||
|
"/trades-intermediate-parts/ETH-BTC/1/623/216/788/143/ETH-BTC_1623216788143201587.json"
|
||||||
|
)
|
||||||
|
assert expected_file == exchange._intermediate_trades_file(
|
||||||
|
datadir, pair, "1623216788143201587", False)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -2298,7 +2304,7 @@ async def test__async_fetch_trades_from_file(default_conf, mocker, caplog, excha
|
|||||||
expected_trades_list = await exchange._async_fetch_trades_from_file(expected_file)
|
expected_trades_list = await exchange._async_fetch_trades_from_file(expected_file)
|
||||||
assert expected_trades_list == json.loads(trades_list_string)
|
assert expected_trades_list == json.loads(trades_list_string)
|
||||||
|
|
||||||
expected_interval_tree = await exchange._get_interval_tree_for_pair(datadir, pair)
|
expected_interval_tree = await exchange._async_get_interval_tree_for_pair(datadir, pair)
|
||||||
expected_interval = Interval(254648140, 254648143, expected_file)
|
expected_interval = Interval(254648140, 254648143, expected_file)
|
||||||
assert len(expected_interval_tree) == 1
|
assert len(expected_interval_tree) == 1
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user