Added intermediate files to help with progressive downloads

This commit is contained in:
Mohammad Dashti 2021-09-16 08:48:22 -07:00
parent 90ad178932
commit 0439ae0072
3 changed files with 89 additions and 15 deletions

View File

@ -21,6 +21,11 @@ def create_datadir(config: Dict[str, Any], datadir: Optional[str] = None) -> Pat
if not folder.is_dir():
folder.mkdir(parents=True)
logger.info(f'Created data directory: {datadir}')
intermediate_dir = folder.joinpath('trades-intermediate-parts')
if not intermediate_dir.is_dir():
intermediate_dir.mkdir(parents=True)
logger.info(f'Created intermediate data directory: {intermediate_dir}')
return folder

View File

@ -306,6 +306,7 @@ def _download_trades_history(exchange: Exchange,
since=since,
until=until,
from_id=from_id,
datadir=data_handler._datadir
)
trades.extend(new_trades[1])
# Remove duplicates to make sure we're not storing data we don't need

View File

@ -13,6 +13,10 @@ from typing import Any, Dict, List, Optional, Tuple
import arrow
import ccxt
import json
import os
from pathlib import Path
import ccxt.async_support as ccxt_async
from cachetools import TTLCache
from ccxt.base.decimal_to_precision import (ROUND_DOWN, ROUND_UP, TICK_SIZE, TRUNCATE,
@ -1369,11 +1373,32 @@ class Exchange:
f'for pair {pair}. Message: {e}') from e
# Fetch historic trades
def _intermediate_trades_dir(self, datadir: str, pair: str, from_id: int) -> str:
tmpdata_file = os.path.join(datadir, "trades-intermediate-parts")
tmpdata_file = os.path.join(tmpdata_file, pair.replace("/","-"))
tmpdata_file = os.path.join(tmpdata_file, str(int(from_id)//1000000))
Path(tmpdata_file).mkdir(parents=True, exist_ok=True)
tmpdata_file = os.path.join(tmpdata_file, pair.replace("/","-")+"_"+from_id+".json")
return tmpdata_file
@retrier_async
async def _async_fetch_trades_from_file(self, datafile: str) -> List[List]:
# Open a file: file
file = open(datafile, mode='r')
# read all lines at once
json_string = file.read()
# close the file
file.close()
trades_list = json.loads(json_string)
return trades_list
@retrier_async
async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None,
params: Optional[dict] = None) -> List[List]:
params: Optional[dict] = None,
datadir: Optional[str] = None) -> List[List]:
"""
Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange.
@ -1385,15 +1410,30 @@ class Exchange:
# fetch trades asynchronously
if params:
logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
trades = await self._api_async.fetch_trades(pair, params=params, limit=self.batch_size())
else:
logger.debug(
"Fetching trades for pair %s, since %s %s...",
pair, since,
'(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else ''
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
return trades_dict_to_list(trades)
trades = await self._api_async.fetch_trades(pair, since=since, limit=self.batch_size())
trades_list = trades_dict_to_list(trades)
if trades_list and len(trades_list) == self.batch_size() and datadir:
from_id = trades_list[0][1]
tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
json_string = json.dumps(trades_list)
with open(tmpdata_file, "w") as text_file:
text_file.write(json_string)
logger.debug("Cached the intermediate trades in %s", tmpdata_file)
else:
from_id = trades_list[0][1] if trades_list else 0
tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
logger.debug("DID NOT CACHE the intermediate trades in %s with len=%s", tmpdata_file, len(trades_list))
return trades_list
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.'
@ -1406,10 +1446,15 @@ class Exchange:
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
def batch_size(self) -> int:
return 1000
async def _async_get_trade_history_id(self, pair: str,
until: int,
since: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[List]]:
from_id: Optional[str] = None,
datadir: Optional[str] = None
) -> Tuple[str, List[List]]:
"""
Asyncronously gets trade history using fetch_trades
use this when exchange uses id-based iteration (check `self._trades_pagination`)
@ -1428,14 +1473,32 @@ class Exchange:
# of up to an hour.
# e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades.
t = await self._async_fetch_trades(pair, since=since)
t = await self._async_fetch_trades(pair, since=since, datadir=datadir)
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
from_id = t[-1][1]
trades.extend(t[:-1])
while True:
t = await self._async_fetch_trades(pair,
params={self._trades_pagination_arg: from_id})
tmpdata_file = self._intermediate_trades_dir(datadir, pair, from_id)
t = []
success_cache_read = False
if os.path.isfile(tmpdata_file):
t = await self._async_fetch_trades_from_file(tmpdata_file)
if len(t) < self.batch_size():
success_cache_read = False
logger.debug("Read from cache %s", tmpdata_file)
else:
success_cache_read = True
from_id = t[0][1]
to_id = t[-1][1]
logger.debug("Read from cache %s from %s to %s", tmpdata_file, from_id, to_id)
if not success_cache_read:
t = await self._async_fetch_trades(pair,
params={self._trades_pagination_arg: from_id},
datadir=datadir)
if t:
# Skip last id since its the key for the next call
trades.extend(t[:-1])
@ -1453,7 +1516,9 @@ class Exchange:
return (pair, trades)
async def _async_get_trade_history_time(self, pair: str, until: int,
since: Optional[int] = None) -> Tuple[str, List[List]]:
since: Optional[int] = None,
datadir: Optional[str] = None
) -> Tuple[str, List[List]]:
"""
Asyncronously gets trade history using fetch_trades,
when the exchange uses time-based iteration (check `self._trades_pagination`)
@ -1467,7 +1532,7 @@ class Exchange:
# DEFAULT_TRADES_COLUMNS: 0 -> timestamp
# DEFAULT_TRADES_COLUMNS: 1 -> id
while True:
t = await self._async_fetch_trades(pair, since=since)
t = await self._async_fetch_trades(pair, since=since, datadir=datadir)
if t:
since = t[-1][0]
trades.extend(t)
@ -1484,7 +1549,9 @@ class Exchange:
async def _async_get_trade_history(self, pair: str,
since: Optional[int] = None,
until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[List]]:
from_id: Optional[str] = None,
datadir: Optional[str] = None
) -> Tuple[str, List[List]]:
"""
Async wrapper handling downloading trades using either time or id based methods.
"""
@ -1498,10 +1565,10 @@ class Exchange:
if self._trades_pagination == 'time':
return await self._async_get_trade_history_time(
pair=pair, since=since, until=until)
pair=pair, since=since, until=until, datadir=datadir)
elif self._trades_pagination == 'id':
return await self._async_get_trade_history_id(
pair=pair, since=since, until=until, from_id=from_id
pair=pair, since=since, until=until, from_id=from_id, datadir=datadir
)
else:
raise OperationalException(f"Exchange {self.name} does use neither time, "
@ -1510,7 +1577,8 @@ class Exchange:
def get_historic_trades(self, pair: str,
since: Optional[int] = None,
until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List]:
from_id: Optional[str] = None,
datadir: Optional[str] = None) -> Tuple[str, List]:
"""
Get trade history data using asyncio.
Handles all async work and returns the list of candles.
@ -1526,7 +1594,7 @@ class Exchange:
return asyncio.get_event_loop().run_until_complete(
self._async_get_trade_history(pair=pair, since=since,
until=until, from_id=from_id))
until=until, from_id=from_id, datadir=datadir))
def is_exchange_known_ccxt(exchange_name: str, ccxt_module: CcxtModuleType = None) -> bool: