Added price as param to fetch_ohlcv

This commit is contained in:
Sam Germain 2021-10-23 21:10:36 -06:00
parent aad37bb8f3
commit c8162479d6
3 changed files with 82 additions and 29 deletions

View File

@ -200,24 +200,36 @@ class Binance(Exchange):
except ccxt.BaseError as e: except ccxt.BaseError as e:
raise OperationalException(e) from e raise OperationalException(e) from e
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, async def _async_get_historic_ohlcv(
since_ms: int, is_new_pair: bool = False, self,
raise_: bool = False pair: str,
) -> Tuple[str, str, List]: timeframe: str,
since_ms: int,
is_new_pair: bool,
raise_: bool = False,
price: Optional[str] = None
) -> Tuple[str, str, List]:
""" """
Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date Overwrite to introduce "fast new pair" functionality by detecting the pair's listing date
Does not work for other exchanges, which don't return the earliest data when called with "0" Does not work for other exchanges, which don't return the earliest data when called with "0"
:param price: "mark" if retrieving the mark price cnadles
""" """
if is_new_pair: if is_new_pair:
x = await self._async_get_candle_history(pair, timeframe, 0) x = await self._async_get_candle_history(pair, timeframe, 0, price)
if x and x[2] and x[2][0] and x[2][0][0] > since_ms: if x and x[2] and x[2][0] and x[2][0][0] > since_ms:
# Set starting date to first available candle. # Set starting date to first available candle.
since_ms = x[2][0][0] since_ms = x[2][0][0]
logger.info(f"Candle-data for {pair} available starting with " logger.info(f"Candle-data for {pair} available starting with "
f"{arrow.get(since_ms // 1000).isoformat()}.") f"{arrow.get(since_ms // 1000).isoformat()}.")
return await super()._async_get_historic_ohlcv( return await super()._async_get_historic_ohlcv(
pair=pair, timeframe=timeframe, since_ms=since_ms, is_new_pair=is_new_pair, pair=pair,
raise_=raise_) timeframe=timeframe,
since_ms=since_ms,
is_new_pair=is_new_pair,
raise_=raise_,
price=price
)
def funding_fee_cutoff(self, open_date: datetime): def funding_fee_cutoff(self, open_date: datetime):
""" """

View File

@ -1309,8 +1309,14 @@ class Exchange:
# Historic data # Historic data
def get_historic_ohlcv(self, pair: str, timeframe: str, def get_historic_ohlcv(
since_ms: int, is_new_pair: bool = False) -> List: self,
pair: str,
timeframe: str,
since_ms: int,
is_new_pair: bool = False,
price: Optional[str] = None
) -> List:
""" """
Get candle history using asyncio and returns the list of candles. Get candle history using asyncio and returns the list of candles.
Handles all async work for this. Handles all async work for this.
@ -1318,34 +1324,52 @@ class Exchange:
:param pair: Pair to download :param pair: Pair to download
:param timeframe: Timeframe to get data for :param timeframe: Timeframe to get data for
:param since_ms: Timestamp in milliseconds to get history from :param since_ms: Timestamp in milliseconds to get history from
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
:return: List with candle (OHLCV) data :return: List with candle (OHLCV) data
""" """
pair, timeframe, data = asyncio.get_event_loop().run_until_complete( pair, timeframe, data = asyncio.get_event_loop().run_until_complete(
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe, self._async_get_historic_ohlcv(
since_ms=since_ms, is_new_pair=is_new_pair)) pair=pair,
timeframe=timeframe,
since_ms=since_ms,
is_new_pair=is_new_pair,
price=price
))
logger.info(f"Downloaded data for {pair} with length {len(data)}.") logger.info(f"Downloaded data for {pair} with length {len(data)}.")
return data return data
def get_historic_ohlcv_as_df(self, pair: str, timeframe: str, def get_historic_ohlcv_as_df(
since_ms: int) -> DataFrame: self,
pair: str,
timeframe: str,
since_ms: int,
price: Optional[str] = None
) -> DataFrame:
""" """
Minimal wrapper around get_historic_ohlcv - converting the result into a dataframe Minimal wrapper around get_historic_ohlcv - converting the result into a dataframe
:param pair: Pair to download :param pair: Pair to download
:param timeframe: Timeframe to get data for :param timeframe: Timeframe to get data for
:param since_ms: Timestamp in milliseconds to get history from :param since_ms: Timestamp in milliseconds to get history from
:param price: "mark" if retrieving the mark price candles, "index" for index price candles
:return: OHLCV DataFrame :return: OHLCV DataFrame
""" """
ticks = self.get_historic_ohlcv(pair, timeframe, since_ms=since_ms) ticks = self.get_historic_ohlcv(pair, timeframe, since_ms=since_ms, price=price)
return ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True, return ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True,
drop_incomplete=self._ohlcv_partial_candle) drop_incomplete=self._ohlcv_partial_candle)
async def _async_get_historic_ohlcv(self, pair: str, timeframe: str, async def _async_get_historic_ohlcv(
since_ms: int, is_new_pair: bool = False, self,
raise_: bool = False pair: str,
) -> Tuple[str, str, List]: timeframe: str,
since_ms: int,
is_new_pair: bool,
raise_: bool = False,
price: Optional[str] = None
) -> Tuple[str, str, List]:
""" """
Download historic ohlcv Download historic ohlcv
:param is_new_pair: used by binance subclass to allow "fast" new pair downloading :param is_new_pair: used by binance subclass to allow "fast" new pair downloading
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
""" """
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(timeframe) one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(timeframe)
@ -1354,8 +1378,13 @@ class Exchange:
one_call, one_call,
arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True) arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
) )
input_coroutines = [self._async_get_candle_history( input_coroutines = [
pair, timeframe, since) for since in self._async_get_candle_history(
pair,
timeframe,
since,
price
) for since in
range(since_ms, arrow.utcnow().int_timestamp * 1000, one_call)] range(since_ms, arrow.utcnow().int_timestamp * 1000, one_call)]
data: List = [] data: List = []
@ -1378,9 +1407,13 @@ class Exchange:
data = sorted(data, key=lambda x: x[0]) data = sorted(data, key=lambda x: x[0])
return pair, timeframe, data return pair, timeframe, data
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, def refresh_latest_ohlcv(
since_ms: Optional[int] = None, cache: bool = True self,
) -> Dict[Tuple[str, str], DataFrame]: pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None,
cache: bool = True,
price: Optional[str] = None
) -> Dict[Tuple[str, str], DataFrame]:
""" """
Refresh in-memory OHLCV asynchronously and set `_klines` with the result Refresh in-memory OHLCV asynchronously and set `_klines` with the result
Loops asynchronously over pair_list and downloads all pairs async (semi-parallel). Loops asynchronously over pair_list and downloads all pairs async (semi-parallel).
@ -1388,6 +1421,7 @@ class Exchange:
:param pair_list: List of 2 element tuples containing pair, interval to refresh :param pair_list: List of 2 element tuples containing pair, interval to refresh
:param since_ms: time since when to download, in milliseconds :param since_ms: time since when to download, in milliseconds
:param cache: Assign result to _klines. Usefull for one-off downloads like for pairlists :param cache: Assign result to _klines. Usefull for one-off downloads like for pairlists
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
:return: Dict of [{(pair, timeframe): Dataframe}] :return: Dict of [{(pair, timeframe): Dataframe}]
""" """
logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list)) logger.debug("Refreshing candle (OHLCV) data for %d pairs", len(pair_list))
@ -1411,7 +1445,7 @@ class Exchange:
else: else:
# One call ... "regular" refresh # One call ... "regular" refresh
input_coroutines.append(self._async_get_candle_history( input_coroutines.append(self._async_get_candle_history(
pair, timeframe, since_ms=since_ms)) pair, timeframe, since_ms=since_ms, price=price))
else: else:
logger.debug( logger.debug(
"Using cached candle (OHLCV) data for pair %s, timeframe %s ...", "Using cached candle (OHLCV) data for pair %s, timeframe %s ...",
@ -1454,10 +1488,16 @@ class Exchange:
+ interval_in_sec) >= arrow.utcnow().int_timestamp) + interval_in_sec) >= arrow.utcnow().int_timestamp)
@retrier_async @retrier_async
async def _async_get_candle_history(self, pair: str, timeframe: str, async def _async_get_candle_history(
since_ms: Optional[int] = None) -> Tuple[str, str, List]: self,
pair: str,
timeframe: str,
since_ms: Optional[int] = None,
price: Optional[str] = None
) -> Tuple[str, str, List]:
""" """
Asynchronously get candle history data using fetch_ohlcv Asynchronously get candle history data using fetch_ohlcv
:param price: "mark" if retrieving the mark price cnadles, "index" for index price candles
returns tuple: (pair, timeframe, ohlcv_list) returns tuple: (pair, timeframe, ohlcv_list)
""" """
try: try:
@ -1467,7 +1507,8 @@ class Exchange:
"Fetching pair %s, interval %s, since %s %s...", "Fetching pair %s, interval %s, since %s %s...",
pair, timeframe, since_ms, s pair, timeframe, since_ms, s
) )
params = self._ft_has.get('ohlcv_params', {}) # TODO-lev: Does this put price into params correctly?
params = self._ft_has.get('ohlcv_params', {price: price})
data = await self._api_async.fetch_ohlcv(pair, timeframe=timeframe, data = await self._api_async.fetch_ohlcv(pair, timeframe=timeframe,
since=since_ms, since=since_ms,
limit=self.ohlcv_candle_limit(timeframe), limit=self.ohlcv_candle_limit(timeframe),

View File

@ -1568,7 +1568,7 @@ def test_get_historic_ohlcv(default_conf, mocker, caplog, exchange_name):
] ]
pair = 'ETH/BTC' pair = 'ETH/BTC'
async def mock_candle_hist(pair, timeframe, since_ms): async def mock_candle_hist(pair, timeframe, since_ms, price=None):
return pair, timeframe, ohlcv return pair, timeframe, ohlcv
exchange._async_get_candle_history = Mock(wraps=mock_candle_hist) exchange._async_get_candle_history = Mock(wraps=mock_candle_hist)
@ -1625,7 +1625,7 @@ def test_get_historic_ohlcv_as_df(default_conf, mocker, exchange_name):
] ]
pair = 'ETH/BTC' pair = 'ETH/BTC'
async def mock_candle_hist(pair, timeframe, since_ms): async def mock_candle_hist(pair, timeframe, since_ms, price=None):
return pair, timeframe, ohlcv return pair, timeframe, ohlcv
exchange._async_get_candle_history = Mock(wraps=mock_candle_hist) exchange._async_get_candle_history = Mock(wraps=mock_candle_hist)