Added a feature to facilitate caching data between multiple sessions
This commit is contained in:
parent
0439ae0072
commit
62ee82a15d
@ -17,6 +17,7 @@ dependencies:
|
|||||||
- requests
|
- requests
|
||||||
- urllib3
|
- urllib3
|
||||||
- wrapt
|
- wrapt
|
||||||
|
- intervaltree
|
||||||
- jsonschema
|
- jsonschema
|
||||||
- TA-Lib
|
- TA-Lib
|
||||||
- tabulate
|
- tabulate
|
||||||
|
@ -16,6 +16,7 @@ import ccxt
|
|||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from intervaltree import Interval, IntervalTree
|
||||||
|
|
||||||
import ccxt.async_support as ccxt_async
|
import ccxt.async_support as ccxt_async
|
||||||
from cachetools import TTLCache
|
from cachetools import TTLCache
|
||||||
@ -78,6 +79,11 @@ class Exchange:
|
|||||||
}
|
}
|
||||||
_ft_has: Dict = {}
|
_ft_has: Dict = {}
|
||||||
|
|
||||||
|
# For each pair, we cache the intermediate data files stored for trades if they exist
|
||||||
|
# This dictionary maps each pair name (e.g., `ETH/BTC`) to an `IntervalTree`. This interval tree
|
||||||
|
# contains the intervals of trade IDs that are already cached in the intermediate files.
|
||||||
|
_intermediate_data_cache: Dict = {}
|
||||||
|
|
||||||
def __init__(self, config: Dict[str, Any], validate: bool = True) -> None:
|
def __init__(self, config: Dict[str, Any], validate: bool = True) -> None:
|
||||||
"""
|
"""
|
||||||
Initializes this module with the given config,
|
Initializes this module with the given config,
|
||||||
@ -1372,15 +1378,25 @@ class Exchange:
|
|||||||
raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
|
raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
|
||||||
f'for pair {pair}. Message: {e}') from e
|
f'for pair {pair}. Message: {e}') from e
|
||||||
|
|
||||||
# Fetch historic trades
|
# Directory name for a pair, e.g., given `ETH/BTC` returns `ETH-BTC`.
|
||||||
def _intermediate_trades_dir(self, datadir: str, pair: str, from_id: int) -> str:
|
def _pair_dir(self, pair: str) -> str:
|
||||||
tmpdata_file = os.path.join(datadir, "trades-intermediate-parts")
|
return pair.replace("/","-")
|
||||||
tmpdata_file = os.path.join(tmpdata_file, pair.replace("/","-"))
|
|
||||||
|
# Returns the directory path that contains the intermediate trade files for a given pair.
|
||||||
|
def _intermediate_trades_dir_for_pair(self, datadir: str, pair: str) -> str:
|
||||||
|
tmpdata_dir = os.path.join(datadir, "trades-intermediate-parts")
|
||||||
|
tmpdata_dir = os.path.join(tmpdata_dir, self._pair_dir(pair))
|
||||||
|
return tmpdata_dir
|
||||||
|
|
||||||
|
# Returns the intermediate trade file name for a given pair starting with `from_id` trade ID
|
||||||
|
def _intermediate_trades_file(self, datadir: str, pair: str, from_id: int) -> str:
|
||||||
|
tmpdata_file = self._intermediate_trades_dir_for_pair(datadir, pair)
|
||||||
tmpdata_file = os.path.join(tmpdata_file, str(int(from_id)//1000000))
|
tmpdata_file = os.path.join(tmpdata_file, str(int(from_id)//1000000))
|
||||||
Path(tmpdata_file).mkdir(parents=True, exist_ok=True)
|
Path(tmpdata_file).mkdir(parents=True, exist_ok=True)
|
||||||
tmpdata_file = os.path.join(tmpdata_file, pair.replace("/","-")+"_"+from_id+".json")
|
tmpdata_file = os.path.join(tmpdata_file, self._pair_dir(pair)+"_"+from_id+".json")
|
||||||
return tmpdata_file
|
return tmpdata_file
|
||||||
|
|
||||||
|
# Fetch historic trades
|
||||||
@retrier_async
|
@retrier_async
|
||||||
async def _async_fetch_trades_from_file(self, datafile: str) -> List[List]:
|
async def _async_fetch_trades_from_file(self, datafile: str) -> List[List]:
|
||||||
# Open a file: file
|
# Open a file: file
|
||||||
@ -1394,6 +1410,48 @@ class Exchange:
|
|||||||
trades_list = json.loads(json_string)
|
trades_list = json.loads(json_string)
|
||||||
return trades_list
|
return trades_list
|
||||||
|
|
||||||
|
# Builds the interval tree (from the intermediate trade files) for a given pair
|
||||||
|
@retrier_async
async def _get_interval_tree_for_pair(self, datadir: str, pair: str) -> IntervalTree:
    """
    Return the interval tree of trade-ID ranges already cached on disk for a pair.

    The tree is built on first use by scanning the intermediate trade files below
    `self._intermediate_trades_dir_for_pair(datadir, pair)` and is then stored in
    `self._intermediate_data_cache` under the key `pair`, so later calls return the
    stored tree without touching the file system.

    Every file contributes one interval `[first_id, last_id + 1)`, where the IDs are
    read from column 1 of the first and last trade in the file. The interval's data
    is the path of the file.

    :param datadir: Base data directory.
    :param pair: Pair name; also the key of the in-memory cache entry.
    :return: The (possibly empty) interval tree for `pair`.
    """
    cached_tree = self._intermediate_data_cache.get(pair)
    # Compare against None rather than relying on truthiness: an empty IntervalTree
    # is falsy, so `if cached_tree:` would rescan the directory on every call for a
    # pair that has no intermediate files yet.
    if cached_tree is not None:
        return cached_tree

    logger.debug("Caching intervals for pair %s", pair)
    interval_tree = IntervalTree()
    pair_root = self._intermediate_trades_dir_for_pair(datadir, pair)
    # Loop-invariant: every candidate file name is checked against this prefix.
    file_prefix = self._pair_dir(pair)

    subdir_names = os.listdir(pair_root) if os.path.isdir(pair_root) else []
    for subdir_name in subdir_names:
        # Only entries with a purely numeric name are scanned.
        if not subdir_name.isnumeric():
            continue
        subdir_path = os.path.join(pair_root, subdir_name)
        for file_name in os.listdir(subdir_path):
            if not file_name.startswith(file_prefix):
                continue
            file_path = os.path.join(subdir_path, file_name)
            if not os.path.isfile(file_path):
                continue
            trades_list = await self._async_fetch_trades_from_file(file_path)
            if not trades_list:
                # A file without trades has no first/last ID to index; skip it
                # instead of failing on trades_list[0].
                continue
            first_id = int(trades_list[0][1])
            last_id = int(trades_list[-1][1])
            # Interval ends are exclusive, hence the +1 to cover `last_id` itself.
            interval_tree.addi(first_id, last_id + 1, file_path)

    logger.debug("Cached intervals for pair %s: %s intervals", pair, len(interval_tree))
    self._intermediate_data_cache[pair] = interval_tree
    return interval_tree
|
||||||
|
|
||||||
|
# Checks whether the given trade id is cached in the intermediate trade files.
|
||||||
|
# If it exists, returns the `Interval` for the cache file. Otherwise, returns `None`.
|
||||||
|
@retrier_async
async def _is_id_cached_in_intermediate_data(
    self, datadir: str, pair: str, id: int
) -> Optional[Interval]:
    """
    Look up a trade ID in the interval tree of cached intermediate files for a pair.

    :param datadir: Base data directory, forwarded to `_get_interval_tree_for_pair`.
    :param pair: Pair name, forwarded to `_get_interval_tree_for_pair`.
    :param id: Trade ID to look up; converted with `int()` before the query.
    :return: The first (in sorted order) `Interval` containing the ID, or `None`
        when no interval contains it.
    """
    pair_tree = await self._get_interval_tree_for_pair(datadir, pair)
    # Point query: all intervals of the tree that contain this trade ID.
    matches = pair_tree[int(id)]
    ordered_matches = sorted(matches)
    if not ordered_matches:
        return None
    return ordered_matches[0]
|
||||||
|
|
||||||
@retrier_async
|
@retrier_async
|
||||||
async def _async_fetch_trades(self, pair: str,
|
async def _async_fetch_trades(self, pair: str,
|
||||||
since: Optional[int] = None,
|
since: Optional[int] = None,
|
||||||
@ -1421,18 +1479,58 @@ class Exchange:
|
|||||||
|
|
||||||
|
|
||||||
trades_list = trades_dict_to_list(trades)
|
trades_list = trades_dict_to_list(trades)
|
||||||
if trades_list and len(trades_list) == self.batch_size() and datadir:
|
if trades_list and datadir and len(trades_list) == self.batch_size():
|
||||||
from_id = trades_list[0][1]
|
from_id = trades_list[0][1]
|
||||||
tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
|
to_id = trades_list[-1][1]
|
||||||
|
|
||||||
|
cached_from_id_interval = await self._is_id_cached_in_intermediate_data(
|
||||||
|
datadir, pair, from_id)
|
||||||
|
cached_to_id_interval = await self._is_id_cached_in_intermediate_data(
|
||||||
|
datadir, pair, to_id)
|
||||||
|
|
||||||
|
# If `from_id` exists in a cached interval, we return everything before the
|
||||||
|
# beginning of this cached interval. In the next round, the cached trades (starting
|
||||||
|
# from `from_id`) will be requested and the response will be handled using a
|
||||||
|
# cached intermediate trade file.
|
||||||
|
if cached_from_id_interval:
|
||||||
|
# If the cached interval starts from `from_id`, then it's already cached and we
|
||||||
|
# return it. Otherwise, we filter all trades before the beginning of this cached
|
||||||
|
# interval.
|
||||||
|
if int(from_id) != cached_from_id_interval.begin:
|
||||||
|
trades_list = list(filter(
|
||||||
|
lambda trade: int(trade[1]) < cached_from_id_interval.begin, trades_list
|
||||||
|
))
|
||||||
|
logger.debug("The result was partially cached in the intermediate result " +
|
||||||
|
"(using from_id). Returned %s elements without caching them.",
|
||||||
|
len(trades_list))
|
||||||
|
return trades_list
|
||||||
|
|
||||||
|
# If `to_id` exists in a cached interval, we return everything before the
|
||||||
|
# beginning of this cached interval. In the next round, the cached trades (starting
|
||||||
|
# from `to_id`) will be requested and the response will be handled using a
|
||||||
|
# cached intermediate trade file.
|
||||||
|
if cached_to_id_interval:
|
||||||
|
trades_list = list(filter(
|
||||||
|
lambda trade: int(trade[1]) < cached_to_id_interval.begin, trades_list
|
||||||
|
))
|
||||||
|
logger.debug("The result was partially cached in the intermediate result " +
|
||||||
|
"(using to_id). Returned %s elements without caching them.",
|
||||||
|
len(trades_list))
|
||||||
|
return trades_list
|
||||||
|
|
||||||
|
|
||||||
|
# If neither `from_id` nor `to_id` are cached, we cache the trades in an
|
||||||
|
# intermediate trade file.
|
||||||
|
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
|
||||||
json_string = json.dumps(trades_list)
|
json_string = json.dumps(trades_list)
|
||||||
with open(tmpdata_file, "w") as text_file:
|
with open(tmpdata_file, "w") as text_file:
|
||||||
text_file.write(json_string)
|
text_file.write(json_string)
|
||||||
logger.debug("Cached the intermediate trades in %s", tmpdata_file)
|
logger.debug("Cached the intermediate trades in %s", tmpdata_file)
|
||||||
else:
|
else:
|
||||||
from_id = trades_list[0][1] if trades_list else 0
|
from_id = trades_list[0][1] if trades_list else 0
|
||||||
tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
|
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
|
||||||
logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s", tmpdata_file, len(trades_list))
|
logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s",
|
||||||
|
tmpdata_file, len(trades_list))
|
||||||
return trades_list
|
return trades_list
|
||||||
except ccxt.NotSupported as e:
|
except ccxt.NotSupported as e:
|
||||||
raise OperationalException(
|
raise OperationalException(
|
||||||
@ -1477,9 +1575,12 @@ class Exchange:
|
|||||||
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
|
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
|
||||||
# DEFAULT_TRADES_COLUMNS: 1 -> id
|
# DEFAULT_TRADES_COLUMNS: 1 -> id
|
||||||
from_id = t[-1][1]
|
from_id = t[-1][1]
|
||||||
|
logger.debug(
|
||||||
|
"Did not cache the intermediate trades since %s with len=%s and from_id=%s", since,
|
||||||
|
len(t), from_id)
|
||||||
trades.extend(t[:-1])
|
trades.extend(t[:-1])
|
||||||
while True:
|
while True:
|
||||||
tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
|
tmpdata_file = self._intermediate_trades_file(datadir, pair, from_id)
|
||||||
|
|
||||||
t = []
|
t = []
|
||||||
success_cache_read = False
|
success_cache_read = False
|
||||||
|
@ -12,6 +12,7 @@ cachetools==4.2.2
|
|||||||
requests==2.26.0
|
requests==2.26.0
|
||||||
urllib3==1.26.6
|
urllib3==1.26.6
|
||||||
wrapt==1.12.1
|
wrapt==1.12.1
|
||||||
|
intervaltree==3.1.0
|
||||||
jsonschema==3.2.0
|
jsonschema==3.2.0
|
||||||
TA-Lib==0.4.21
|
TA-Lib==0.4.21
|
||||||
technical==1.3.0
|
technical==1.3.0
|
||||||
|
Loading…
Reference in New Issue
Block a user