First draft of async get_trade methods
This commit is contained in:
parent
26b3148904
commit
77c367ad1d
@ -745,6 +745,146 @@ class Exchange:
|
|||||||
except ccxt.BaseError as e:
|
except ccxt.BaseError as e:
|
||||||
raise OperationalException(f'Could not fetch ticker data. Msg: {e}') from e
|
raise OperationalException(f'Could not fetch ticker data. Msg: {e}') from e
|
||||||
|
|
||||||
|
@retrier_async
|
||||||
|
async def _async_fetch_trades(self, pair: str,
|
||||||
|
since: Optional[int] = None,
|
||||||
|
params: Optional[dict] = None) -> List[Dict]:
|
||||||
|
"""
|
||||||
|
Asyncronously gets trade history using fetch_trades.
|
||||||
|
:param pair: Pair to fetch trade data for
|
||||||
|
:param since: Since as integer timestamp in milliseconds
|
||||||
|
returns tuple: (pair, ticker_interval, ohlcv_list)
|
||||||
|
"""
|
||||||
|
if not self.exchange_has("fetchTrades"):
|
||||||
|
# TODO: Maybe don't stop the bot ... ?
|
||||||
|
raise OperationalException("This exchange does not suport downloading Trades.")
|
||||||
|
try:
|
||||||
|
# fetch trades asynchronously
|
||||||
|
if params:
|
||||||
|
logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
|
||||||
|
trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
"Fetching trades for pair %s, since %s %s...",
|
||||||
|
pair, since,
|
||||||
|
'(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else ''
|
||||||
|
)
|
||||||
|
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
|
||||||
|
return trades
|
||||||
|
except ccxt.NotSupported as e:
|
||||||
|
raise OperationalException(
|
||||||
|
f'Exchange {self._api.name} does not support fetching historical trade data.'
|
||||||
|
f'Message: {e}') from e
|
||||||
|
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
|
||||||
|
raise TemporaryError(f'Could not load trade history due to {e.__class__.__name__}. '
|
||||||
|
f'Message: {e}') from e
|
||||||
|
except ccxt.BaseError as e:
|
||||||
|
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
|
||||||
|
|
||||||
|
async def _async_get_trade_history_id(self, pair: str,
|
||||||
|
since: Optional[int] = None,
|
||||||
|
until: Optional[int] = None) -> Tuple[str, List[Dict]]:
|
||||||
|
"""
|
||||||
|
Asyncronously gets trade history using fetch_trades
|
||||||
|
use this when exchange doesn't use time-based pagination (e.g. Kraken)
|
||||||
|
:param pair: Pair to fetch trade data for
|
||||||
|
:param since: Since as integer timestamp in milliseconds
|
||||||
|
:param until: Until as integer timestamp in milliseconds
|
||||||
|
returns tuple: (pair, ticker_interval, ohlcv_list)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if self._trades_pagination == 'time':
|
||||||
|
raise OperationalException(f"Wrong method called to get trades for {self.name}")
|
||||||
|
trades: List[Dict] = []
|
||||||
|
|
||||||
|
# Fetch first elements using timebased method to get an ID to paginate on
|
||||||
|
# Depending on the Exchange, this can introduce a drift at the start of the interval
|
||||||
|
# of up to an hour.
|
||||||
|
# Binance returns the "last 1000" candles within a 1h time interval
|
||||||
|
# - so we will miss the first candles.
|
||||||
|
t = await self._async_fetch_trades(pair, since=since)
|
||||||
|
from_id = t[-1]['id']
|
||||||
|
trades.extend(t)
|
||||||
|
while True:
|
||||||
|
t = await self._async_fetch_trades(pair,
|
||||||
|
params={self._trades_pagination_arg: from_id})
|
||||||
|
if len(t):
|
||||||
|
from_id = t[-1]['id']
|
||||||
|
# TODO: eliminate duplicates (first trade = last from previous)
|
||||||
|
trades.extend(t)
|
||||||
|
# Reached the end of the defined-download period
|
||||||
|
if until and t[-1]['timestamp'] > until:
|
||||||
|
print(f"Reached {t[-1]['timestamp']} > {until}")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
return (pair, trades)
|
||||||
|
except ccxt.NotSupported as e:
|
||||||
|
raise OperationalException(
|
||||||
|
f'Exchange {self._api.name} does not support fetching historical trade data.'
|
||||||
|
f'Message: {e}') from e
|
||||||
|
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
|
||||||
|
raise TemporaryError(f'Could not load trade history due to {e.__class__.__name__}. '
|
||||||
|
f'Message: {e}') from e
|
||||||
|
except ccxt.BaseError as e:
|
||||||
|
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
|
||||||
|
|
||||||
|
async def _async_get_trade_history(self, pair: str,
|
||||||
|
since: Optional[int] = None,
|
||||||
|
until: Optional[int] = None) -> Tuple[str, List]:
|
||||||
|
"""
|
||||||
|
Asyncronously gets trade history using fetch_trades.
|
||||||
|
:param pair: Pair to fetch trade data for
|
||||||
|
:param since: Since as integer timestamp in milliseconds
|
||||||
|
:param until: Until as integer timestamp in milliseconds
|
||||||
|
returns tuple: (pair, ticker_interval, ohlcv_list)
|
||||||
|
"""
|
||||||
|
if not self.exchange_has("fetchTrades"):
|
||||||
|
# TODO: Maybe don't completey stop the bot ... ?
|
||||||
|
raise OperationalException("This exchange does not suport downloading Trades.")
|
||||||
|
try:
|
||||||
|
if self._trades_pagination != 'time':
|
||||||
|
return await self._async_get_trade_history_id(pair, since, until)
|
||||||
|
|
||||||
|
trades: List[Dict] = []
|
||||||
|
while True:
|
||||||
|
t = await self._async_fetch_trades(pair, since=since)
|
||||||
|
if len(t):
|
||||||
|
since = t[-1]['timestamp']
|
||||||
|
trades.extend(t)
|
||||||
|
# Reached the end of the defined-download period
|
||||||
|
if until and t[-1]['timestamp'] > until:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
return (pair, trades)
|
||||||
|
except ccxt.NotSupported as e:
|
||||||
|
raise OperationalException(
|
||||||
|
f'Exchange {self._api.name} does not support fetching historical trade data.'
|
||||||
|
f'Message: {e}') from e
|
||||||
|
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
|
||||||
|
raise TemporaryError(f'Could not load trade history due to {e.__class__.__name__}. '
|
||||||
|
f'Message: {e}') from e
|
||||||
|
except ccxt.BaseError as e:
|
||||||
|
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
|
||||||
|
|
||||||
|
def get_historic_trades(self, pair: str,
|
||||||
|
since: Optional[int] = None,
|
||||||
|
until: Optional[int] = None) -> List:
|
||||||
|
"""
|
||||||
|
Gets candle history using asyncio and returns the list of candles.
|
||||||
|
Handles all async doing.
|
||||||
|
Async over one pair, assuming we get `_ohlcv_candle_limit` candles per call.
|
||||||
|
:param pair: Pair to download
|
||||||
|
:param ticker_interval: Interval to get
|
||||||
|
:param since_ms: Timestamp in milliseconds to get history from
|
||||||
|
:returns List of tickers
|
||||||
|
"""
|
||||||
|
return asyncio.get_event_loop().run_until_complete(
|
||||||
|
self._async_get_trade_history(pair=pair, since=since, until=until))
|
||||||
|
|
||||||
@retrier
|
@retrier
|
||||||
def cancel_order(self, order_id: str, pair: str) -> None:
|
def cancel_order(self, order_id: str, pair: str) -> None:
|
||||||
if self._config['dry_run']:
|
if self._config['dry_run']:
|
||||||
|
Loading…
Reference in New Issue
Block a user