Refactor trade downloading to handle exceptions only once

This commit is contained in:
Matthias 2019-08-29 12:56:10 +02:00
parent 476adf872a
commit 0d592f6c55
2 changed files with 67 additions and 67 deletions

View File

@ -364,6 +364,7 @@ def download_trades_history(datadir: Optional[Path],
logger.debug("New Start: %s", trades[0]['datetime']) logger.debug("New Start: %s", trades[0]['datetime'])
logger.debug("New End: %s", trades[-1]['datetime']) logger.debug("New End: %s", trades[-1]['datetime'])
logger.info(f"New Amount of trades: {len(trades)}") logger.info(f"New Amount of trades: {len(trades)}")
return True
except Exception as e: except Exception as e:
logger.error( logger.error(

View File

@ -751,13 +751,11 @@ class Exchange:
params: Optional[dict] = None) -> List[Dict]: params: Optional[dict] = None) -> List[Dict]:
""" """
Asyncronously gets trade history using fetch_trades. Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange.
:param pair: Pair to fetch trade data for :param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds :param since: Since as integer timestamp in milliseconds
returns tuple: (pair, ticker_interval, ohlcv_list) returns tuple: (pair, ticker_interval, ohlcv_list)
""" """
if not self.exchange_has("fetchTrades"):
# TODO: Maybe don't stop the bot ... ?
raise OperationalException("This exchange does not suport downloading Trades.")
try: try:
# fetch trades asynchronously # fetch trades asynchronously
if params: if params:
@ -787,58 +785,47 @@ class Exchange:
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]: from_id: Optional[str] = None) -> Tuple[str, List[Dict]]:
""" """
Asyncronously gets trade history using fetch_trades Asyncronously gets trade history using fetch_trades
use this when exchange doesn't use time-based pagination (e.g. Kraken) use this when exchange uses id-based iteration
:param pair: Pair to fetch trade data for :param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds :param since: Since as integer timestamp in milliseconds
:param until: Until as integer timestamp in milliseconds :param until: Until as integer timestamp in milliseconds
:param from_id: Download data starting with ID (if id is known). Ignores "since" if set. :param from_id: Download data starting with ID (if id is known). Ignores "since" if set.
returns tuple: (pair, ticker_interval, ohlcv_list) returns tuple: (pair, ticker_interval, ohlcv_list)
""" """
try: if self._trades_pagination == 'time':
if self._trades_pagination == 'time': raise OperationalException(f"Wrong method called to get trades for {self.name}")
raise OperationalException(f"Wrong method called to get trades for {self.name}")
trades: List[Dict] = [] trades: List[Dict] = []
if not from_id: if not from_id:
# Fetch first elements using timebased method to get an ID to paginate on # Fetch first elements using timebased method to get an ID to paginate on
# Depending on the Exchange, this can introduce a drift at the start of the interval # Depending on the Exchange, this can introduce a drift at the start of the interval
# of up to an hour. # of up to an hour.
# Binance returns the "last 1000" candles within a 1h time interval # Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first candles. # - so we will miss the first candles.
t = await self._async_fetch_trades(pair, since=since) t = await self._async_fetch_trades(pair, since=since)
from_id = t[-1]['id'] from_id = t[-1]['id']
trades.extend(t[:-1])
while True:
t = await self._async_fetch_trades(pair,
params={self._trades_pagination_arg: from_id})
if len(t):
# Skip last id since its the key for the next call
trades.extend(t[:-1]) trades.extend(t[:-1])
while True: if from_id == t[-1]['id'] or (until and t[-1]['timestamp'] > until):
t = await self._async_fetch_trades(pair, logger.debug(f"Stopping because from_id did not change. "
params={self._trades_pagination_arg: from_id}) f"Reached {t[-1]['timestamp']} > {until}")
if len(t):
# Skip last id since its the key for the next call
trades.extend(t[:-1])
if from_id == t[-1]['id'] or (until and t[-1]['timestamp'] > until):
logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1]['timestamp']} > {until}")
break
# Reached the end of the defined-download period
from_id = t[-1]['id']
else:
break break
return (pair, trades) # Reached the end of the defined-download period
except ccxt.NotSupported as e: from_id = t[-1]['id']
raise OperationalException( else:
f'Exchange {self._api.name} does not support fetching historical trade data.' break
f'Message: {e}') from e
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(f'Could not load trade history due to {e.__class__.__name__}. '
f'Message: {e}') from e
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
async def _async_get_trade_history_time(self, pair: str, return (pair, trades)
since: Optional[int] = None,
until: Optional[int] = None) -> Tuple[str, List]: async def _async_get_trade_history_time(self, pair: str, until: int,
since: Optional[int] = None) -> Tuple[str, List]:
""" """
Asyncronously gets trade history using fetch_trades, Asyncronously gets trade history using fetch_trades,
when the exchange uses time-based iteration when the exchange uses time-based iteration
@ -849,21 +836,41 @@ class Exchange:
""" """
if self._trades_pagination != 'time': if self._trades_pagination != 'time':
raise OperationalException(f"Wrong method called to get trades for {self.name}") raise OperationalException(f"Wrong method called to get trades for {self.name}")
try:
trades: List[Dict] = [] trades: List[Dict] = []
while True: while True:
t = await self._async_fetch_trades(pair, since=since) t = await self._async_fetch_trades(pair, since=since)
if len(t): if len(t):
since = t[-1]['timestamp'] since = t[-1]['timestamp']
trades.extend(t) trades.extend(t)
# Reached the end of the defined-download period # Reached the end of the defined-download period
if until and t[-1]['timestamp'] > until: if until and t[-1]['timestamp'] > until:
break
else:
break break
else:
break
return (pair, trades)
async def _async_get_trade_history(self, pair: str,
since: Optional[int] = None,
until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]:
"""
Async wrapper handling downloading trades using either time or id based methods.
"""
if not self.exchange_has("fetchTrades"):
# TODO: Maybe don't stop the bot ... ?
raise OperationalException("This exchange does not suport downloading Trades.")
try:
if not until:
# Current milliseconds
until = ccxt.Exchange.milliseconds()
if self._trades_pagination == 'time':
return await self._async_get_trade_history_time(pair=pair, since=since, until=until)
elif self._trades_pagination == 'id':
return await self._async_get_trade_history_id(pair=pair, since=since,
until=until, from_id=from_id)
return (pair, trades)
except ccxt.NotSupported as e: except ccxt.NotSupported as e:
raise OperationalException( raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.' f'Exchange {self._api.name} does not support fetching historical trade data.'
@ -877,7 +884,7 @@ class Exchange:
def get_historic_trades(self, pair: str, def get_historic_trades(self, pair: str,
since: Optional[int] = None, since: Optional[int] = None,
until: Optional[int] = None, until: Optional[int] = None,
from_id: Optional[str] = None) -> List: from_id: Optional[str] = None) -> Tuple[str, List]:
""" """
Gets candle history using asyncio and returns the list of candles. Gets candle history using asyncio and returns the list of candles.
Handles all async doing. Handles all async doing.
@ -889,18 +896,10 @@ class Exchange:
:param from_id: Download data starting with ID (if id is known) :param from_id: Download data starting with ID (if id is known)
:returns List of tickers :returns List of tickers
""" """
if not until:
# Current milliseconds
until = ccxt.Exchange.milliseconds()
if self._trades_pagination == 'time':
return asyncio.get_event_loop().run_until_complete(
self._async_get_trade_history_time(pair=pair, since=since, until=until))
elif self._trades_pagination == 'id': return asyncio.get_event_loop().run_until_complete(
# Use id-based trade-downloader self._async_get_trade_history(pair=pair, since=since,
return asyncio.get_event_loop().run_until_complete( until=until, from_id=from_id))
self._async_get_trade_history_id(pair=pair, since=since,
until=until, from_id=from_id))
@retrier @retrier
def cancel_order(self, order_id: str, pair: str) -> None: def cancel_order(self, order_id: str, pair: str) -> None: