Merge pull request #2372 from xmatthias/kraken_ohlcv_emulate

download tick-based data to emulate candles
This commit is contained in:
hroff-1902
2019-10-19 19:32:37 +03:00
committed by GitHub
19 changed files with 799 additions and 15 deletions

View File

@@ -16,6 +16,8 @@ class Binance(Exchange):
_ft_has: Dict = {
"stoploss_on_exchange": True,
"order_time_in_force": ['gtc', 'fok', 'ioc'],
"trades_pagination": "id",
"trades_pagination_arg": "fromId",
}
def get_order_book(self, pair: str, limit: int = 100) -> dict:

View File

@@ -147,6 +147,8 @@ def retrier(f):
class Exchange:
_config: Dict = {}
# Parameters to add directly to buy/sell calls (like agreeing to trading agreement)
_params: Dict = {}
# Dict to specify which options each exchange implements
@@ -157,6 +159,9 @@ class Exchange:
"order_time_in_force": ["gtc"],
"ohlcv_candle_limit": 500,
"ohlcv_partial_candle": True,
"trades_pagination": "time", # Possible are "time" or "id"
"trades_pagination_arg": "since",
}
_ft_has: Dict = {}
@@ -200,6 +205,9 @@ class Exchange:
self._ohlcv_candle_limit = self._ft_has['ohlcv_candle_limit']
self._ohlcv_partial_candle = self._ft_has['ohlcv_partial_candle']
self._trades_pagination = self._ft_has['trades_pagination']
self._trades_pagination_arg = self._ft_has['trades_pagination_arg']
# Initialize ccxt objects
self._api = self._init_ccxt(
exchange_config, ccxt_kwargs=exchange_config.get('ccxt_config'))
@@ -742,6 +750,154 @@ class Exchange:
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch ticker data. Msg: {e}') from e
@retrier_async
async def _async_fetch_trades(self, pair: str,
since: Optional[int] = None,
params: Optional[dict] = None) -> List[Dict]:
"""
Asyncronously gets trade history using fetch_trades.
Handles exchange errors, does one call to the exchange.
:param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds
returns: List of dicts containing trades
"""
try:
# fetch trades asynchronously
if params:
logger.debug("Fetching trades for pair %s, params: %s ", pair, params)
trades = await self._api_async.fetch_trades(pair, params=params, limit=1000)
else:
logger.debug(
"Fetching trades for pair %s, since %s %s...",
pair, since,
'(' + arrow.get(since // 1000).isoformat() + ') ' if since is not None else ''
)
trades = await self._api_async.fetch_trades(pair, since=since, limit=1000)
return trades
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical trade data.'
f'Message: {e}') from e
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(f'Could not load trade history due to {e.__class__.__name__}. '
f'Message: {e}') from e
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch trade data. Msg: {e}') from e
async def _async_get_trade_history_id(self, pair: str,
until: int,
since: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]:
"""
Asyncronously gets trade history using fetch_trades
use this when exchange uses id-based iteration (check `self._trades_pagination`)
:param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds
:param until: Until as integer timestamp in milliseconds
:param from_id: Download data starting with ID (if id is known). Ignores "since" if set.
returns tuple: (pair, trades-list)
"""
trades: List[Dict] = []
if not from_id:
# Fetch first elements using timebased method to get an ID to paginate on
# Depending on the Exchange, this can introduce a drift at the start of the interval
# of up to an hour.
# e.g. Binance returns the "last 1000" candles within a 1h time interval
# - so we will miss the first trades.
t = await self._async_fetch_trades(pair, since=since)
from_id = t[-1]['id']
trades.extend(t[:-1])
while True:
t = await self._async_fetch_trades(pair,
params={self._trades_pagination_arg: from_id})
if len(t):
# Skip last id since its the key for the next call
trades.extend(t[:-1])
if from_id == t[-1]['id'] or t[-1]['timestamp'] > until:
logger.debug(f"Stopping because from_id did not change. "
f"Reached {t[-1]['timestamp']} > {until}")
# Reached the end of the defined-download period - add last trade as well.
trades.extend(t[-1:])
break
from_id = t[-1]['id']
else:
break
return (pair, trades)
async def _async_get_trade_history_time(self, pair: str, until: int,
since: Optional[int] = None) -> Tuple[str, List]:
"""
Asyncronously gets trade history using fetch_trades,
when the exchange uses time-based iteration (check `self._trades_pagination`)
:param pair: Pair to fetch trade data for
:param since: Since as integer timestamp in milliseconds
:param until: Until as integer timestamp in milliseconds
returns tuple: (pair, trades-list)
"""
trades: List[Dict] = []
while True:
t = await self._async_fetch_trades(pair, since=since)
if len(t):
since = t[-1]['timestamp']
trades.extend(t)
# Reached the end of the defined-download period
if until and t[-1]['timestamp'] > until:
logger.debug(
f"Stopping because until was reached. {t[-1]['timestamp']} > {until}")
break
else:
break
return (pair, trades)
async def _async_get_trade_history(self, pair: str,
since: Optional[int] = None,
until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List[Dict]]:
"""
Async wrapper handling downloading trades using either time or id based methods.
"""
if self._trades_pagination == 'time':
return await self._async_get_trade_history_time(
pair=pair, since=since,
until=until or ccxt.Exchange.milliseconds())
elif self._trades_pagination == 'id':
return await self._async_get_trade_history_id(
pair=pair, since=since,
until=until or ccxt.Exchange.milliseconds(), from_id=from_id
)
else:
raise OperationalException(f"Exchange {self.name} does use neither time, "
f"nor id based pagination")
def get_historic_trades(self, pair: str,
since: Optional[int] = None,
until: Optional[int] = None,
from_id: Optional[str] = None) -> Tuple[str, List]:
"""
Gets candle history using asyncio and returns the list of candles.
Handles all async doing.
Async over one pair, assuming we get `_ohlcv_candle_limit` candles per call.
:param pair: Pair to download
:param ticker_interval: Interval to get
:param since: Timestamp in milliseconds to get history from
:param until: Timestamp in milliseconds. Defaults to current timestamp if not defined.
:param from_id: Download data starting with ID (if id is known)
:returns List of tickers
"""
if not self.exchange_has("fetchTrades"):
raise OperationalException("This exchange does not suport downloading Trades.")
return asyncio.get_event_loop().run_until_complete(
self._async_get_trade_history(pair=pair, since=since,
until=until, from_id=from_id))
@retrier
def cancel_order(self, order_id: str, pair: str) -> None:
if self._config['dry_run']:

View File

@@ -14,6 +14,10 @@ logger = logging.getLogger(__name__)
class Kraken(Exchange):
_params: Dict = {"trading_agreement": "agree"}
_ft_has: Dict = {
"trades_pagination": "id",
"trades_pagination_arg": "since",
}
@retrier
def get_balances(self) -> dict: