Exchange ticker_interval with timeframe in some more places
This commit is contained in:
parent
e4bdb92521
commit
08aedc18e1
@ -37,7 +37,7 @@ class DataProvider:
|
|||||||
@property
|
@property
|
||||||
def available_pairs(self) -> List[Tuple[str, str]]:
|
def available_pairs(self) -> List[Tuple[str, str]]:
|
||||||
"""
|
"""
|
||||||
Return a list of tuples containing pair, ticker_interval for which data is currently cached.
|
Return a list of tuples containing (pair, timeframe) for which data is currently cached.
|
||||||
Should be whitelist + open trades.
|
Should be whitelist + open trades.
|
||||||
"""
|
"""
|
||||||
return list(self._exchange._klines.keys())
|
return list(self._exchange._klines.keys())
|
||||||
@ -68,21 +68,22 @@ class DataProvider:
|
|||||||
datadir=Path(self._config['datadir'])
|
datadir=Path(self._config['datadir'])
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_pair_dataframe(self, pair: str, ticker_interval: str = None) -> DataFrame:
|
def get_pair_dataframe(self, pair: str, timeframe: str = None) -> DataFrame:
|
||||||
"""
|
"""
|
||||||
Return pair ohlcv data, either live or cached historical -- depending
|
Return pair ohlcv data, either live or cached historical -- depending
|
||||||
on the runmode.
|
on the runmode.
|
||||||
:param pair: pair to get the data for
|
:param pair: pair to get the data for
|
||||||
:param ticker_interval: ticker interval to get data for
|
:param timeframe: ticker interval to get data for
|
||||||
|
:return: Dataframe for this pair
|
||||||
"""
|
"""
|
||||||
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
|
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
|
||||||
# Get live ohlcv data.
|
# Get live ohlcv data.
|
||||||
data = self.ohlcv(pair=pair, timeframe=ticker_interval)
|
data = self.ohlcv(pair=pair, timeframe=timeframe)
|
||||||
else:
|
else:
|
||||||
# Get historic ohlcv data (cached on disk).
|
# Get historic ohlcv data (cached on disk).
|
||||||
data = self.historic_ohlcv(pair=pair, timeframe=ticker_interval)
|
data = self.historic_ohlcv(pair=pair, timeframe=timeframe)
|
||||||
if len(data) == 0:
|
if len(data) == 0:
|
||||||
logger.warning(f"No data found for ({pair}, {ticker_interval}).")
|
logger.warning(f"No data found for ({pair}, {timeframe}).")
|
||||||
return data
|
return data
|
||||||
|
|
||||||
def market(self, pair: str) -> Optional[Dict[str, Any]]:
|
def market(self, pair: str) -> Optional[Dict[str, Any]]:
|
||||||
|
@ -308,7 +308,7 @@ def download_pair_history(datadir: Path,
|
|||||||
logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None')
|
logger.debug("Current End: %s", misc.format_ms_time(data[-1][0]) if data else 'None')
|
||||||
|
|
||||||
# Default since_ms to 30 days if nothing is given
|
# Default since_ms to 30 days if nothing is given
|
||||||
new_data = exchange.get_historic_ohlcv(pair=pair, ticker_interval=timeframe,
|
new_data = exchange.get_historic_ohlcv(pair=pair, timeframe=timeframe,
|
||||||
since_ms=since_ms if since_ms
|
since_ms=since_ms if since_ms
|
||||||
else
|
else
|
||||||
int(arrow.utcnow().shift(
|
int(arrow.utcnow().shift(
|
||||||
|
@ -536,40 +536,40 @@ class Exchange:
|
|||||||
logger.info("returning cached ticker-data for %s", pair)
|
logger.info("returning cached ticker-data for %s", pair)
|
||||||
return self._cached_ticker[pair]
|
return self._cached_ticker[pair]
|
||||||
|
|
||||||
def get_historic_ohlcv(self, pair: str, ticker_interval: str,
|
def get_historic_ohlcv(self, pair: str, timeframe: str,
|
||||||
since_ms: int) -> List:
|
since_ms: int) -> List:
|
||||||
"""
|
"""
|
||||||
Gets candle history using asyncio and returns the list of candles.
|
Gets candle history using asyncio and returns the list of candles.
|
||||||
Handles all async doing.
|
Handles all async doing.
|
||||||
Async over one pair, assuming we get `_ohlcv_candle_limit` candles per call.
|
Async over one pair, assuming we get `_ohlcv_candle_limit` candles per call.
|
||||||
:param pair: Pair to download
|
:param pair: Pair to download
|
||||||
:param ticker_interval: Interval to get
|
:param timeframe: Ticker Timeframe to get
|
||||||
:param since_ms: Timestamp in milliseconds to get history from
|
:param since_ms: Timestamp in milliseconds to get history from
|
||||||
:returns List of tickers
|
:returns List of tickers
|
||||||
"""
|
"""
|
||||||
return asyncio.get_event_loop().run_until_complete(
|
return asyncio.get_event_loop().run_until_complete(
|
||||||
self._async_get_historic_ohlcv(pair=pair, ticker_interval=ticker_interval,
|
self._async_get_historic_ohlcv(pair=pair, timeframe=timeframe,
|
||||||
since_ms=since_ms))
|
since_ms=since_ms))
|
||||||
|
|
||||||
async def _async_get_historic_ohlcv(self, pair: str,
|
async def _async_get_historic_ohlcv(self, pair: str,
|
||||||
ticker_interval: str,
|
timeframe: str,
|
||||||
since_ms: int) -> List:
|
since_ms: int) -> List:
|
||||||
|
|
||||||
one_call = timeframe_to_msecs(ticker_interval) * self._ohlcv_candle_limit
|
one_call = timeframe_to_msecs(timeframe) * self._ohlcv_candle_limit
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"one_call: %s msecs (%s)",
|
"one_call: %s msecs (%s)",
|
||||||
one_call,
|
one_call,
|
||||||
arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
|
arrow.utcnow().shift(seconds=one_call // 1000).humanize(only_distance=True)
|
||||||
)
|
)
|
||||||
input_coroutines = [self._async_get_candle_history(
|
input_coroutines = [self._async_get_candle_history(
|
||||||
pair, ticker_interval, since) for since in
|
pair, timeframe, since) for since in
|
||||||
range(since_ms, arrow.utcnow().timestamp * 1000, one_call)]
|
range(since_ms, arrow.utcnow().timestamp * 1000, one_call)]
|
||||||
|
|
||||||
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
|
tickers = await asyncio.gather(*input_coroutines, return_exceptions=True)
|
||||||
|
|
||||||
# Combine tickers
|
# Combine tickers
|
||||||
data: List = []
|
data: List = []
|
||||||
for p, ticker_interval, ticker in tickers:
|
for p, timeframe, ticker in tickers:
|
||||||
if p == pair:
|
if p == pair:
|
||||||
data.extend(ticker)
|
data.extend(ticker)
|
||||||
# Sort data again after extending the result - above calls return in "async order"
|
# Sort data again after extending the result - above calls return in "async order"
|
||||||
@ -589,14 +589,14 @@ class Exchange:
|
|||||||
input_coroutines = []
|
input_coroutines = []
|
||||||
|
|
||||||
# Gather coroutines to run
|
# Gather coroutines to run
|
||||||
for pair, ticker_interval in set(pair_list):
|
for pair, timeframe in set(pair_list):
|
||||||
if (not ((pair, ticker_interval) in self._klines)
|
if (not ((pair, timeframe) in self._klines)
|
||||||
or self._now_is_time_to_refresh(pair, ticker_interval)):
|
or self._now_is_time_to_refresh(pair, timeframe)):
|
||||||
input_coroutines.append(self._async_get_candle_history(pair, ticker_interval))
|
input_coroutines.append(self._async_get_candle_history(pair, timeframe))
|
||||||
else:
|
else:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Using cached ohlcv data for pair %s, interval %s ...",
|
"Using cached ohlcv data for pair %s, timeframe %s ...",
|
||||||
pair, ticker_interval
|
pair, timeframe
|
||||||
)
|
)
|
||||||
|
|
||||||
tickers = asyncio.get_event_loop().run_until_complete(
|
tickers = asyncio.get_event_loop().run_until_complete(
|
||||||
@ -608,40 +608,40 @@ class Exchange:
|
|||||||
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
|
logger.warning("Async code raised an exception: %s", res.__class__.__name__)
|
||||||
continue
|
continue
|
||||||
pair = res[0]
|
pair = res[0]
|
||||||
ticker_interval = res[1]
|
timeframe = res[1]
|
||||||
ticks = res[2]
|
ticks = res[2]
|
||||||
# keeping last candle time as last refreshed time of the pair
|
# keeping last candle time as last refreshed time of the pair
|
||||||
if ticks:
|
if ticks:
|
||||||
self._pairs_last_refresh_time[(pair, ticker_interval)] = ticks[-1][0] // 1000
|
self._pairs_last_refresh_time[(pair, timeframe)] = ticks[-1][0] // 1000
|
||||||
# keeping parsed dataframe in cache
|
# keeping parsed dataframe in cache
|
||||||
self._klines[(pair, ticker_interval)] = parse_ticker_dataframe(
|
self._klines[(pair, timeframe)] = parse_ticker_dataframe(
|
||||||
ticks, ticker_interval, pair=pair, fill_missing=True,
|
ticks, timeframe, pair=pair, fill_missing=True,
|
||||||
drop_incomplete=self._ohlcv_partial_candle)
|
drop_incomplete=self._ohlcv_partial_candle)
|
||||||
return tickers
|
return tickers
|
||||||
|
|
||||||
def _now_is_time_to_refresh(self, pair: str, ticker_interval: str) -> bool:
|
def _now_is_time_to_refresh(self, pair: str, timeframe: str) -> bool:
|
||||||
# Calculating ticker interval in seconds
|
# Calculating ticker interval in seconds
|
||||||
interval_in_sec = timeframe_to_seconds(ticker_interval)
|
interval_in_sec = timeframe_to_seconds(timeframe)
|
||||||
|
|
||||||
return not ((self._pairs_last_refresh_time.get((pair, ticker_interval), 0)
|
return not ((self._pairs_last_refresh_time.get((pair, timeframe), 0)
|
||||||
+ interval_in_sec) >= arrow.utcnow().timestamp)
|
+ interval_in_sec) >= arrow.utcnow().timestamp)
|
||||||
|
|
||||||
@retrier_async
|
@retrier_async
|
||||||
async def _async_get_candle_history(self, pair: str, ticker_interval: str,
|
async def _async_get_candle_history(self, pair: str, timeframe: str,
|
||||||
since_ms: Optional[int] = None) -> Tuple[str, str, List]:
|
since_ms: Optional[int] = None) -> Tuple[str, str, List]:
|
||||||
"""
|
"""
|
||||||
Asynchronously gets candle histories using fetch_ohlcv
|
Asynchronously gets candle histories using fetch_ohlcv
|
||||||
returns tuple: (pair, ticker_interval, ohlcv_list)
|
returns tuple: (pair, timeframe, ohlcv_list)
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# fetch ohlcv asynchronously
|
# fetch ohlcv asynchronously
|
||||||
s = '(' + arrow.get(since_ms // 1000).isoformat() + ') ' if since_ms is not None else ''
|
s = '(' + arrow.get(since_ms // 1000).isoformat() + ') ' if since_ms is not None else ''
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Fetching pair %s, interval %s, since %s %s...",
|
"Fetching pair %s, interval %s, since %s %s...",
|
||||||
pair, ticker_interval, since_ms, s
|
pair, timeframe, since_ms, s
|
||||||
)
|
)
|
||||||
|
|
||||||
data = await self._api_async.fetch_ohlcv(pair, timeframe=ticker_interval,
|
data = await self._api_async.fetch_ohlcv(pair, timeframe=timeframe,
|
||||||
since=since_ms)
|
since=since_ms)
|
||||||
|
|
||||||
# Because some exchange sort Tickers ASC and other DESC.
|
# Because some exchange sort Tickers ASC and other DESC.
|
||||||
@ -653,9 +653,9 @@ class Exchange:
|
|||||||
data = sorted(data, key=lambda x: x[0])
|
data = sorted(data, key=lambda x: x[0])
|
||||||
except IndexError:
|
except IndexError:
|
||||||
logger.exception("Error loading %s. Result was %s.", pair, data)
|
logger.exception("Error loading %s. Result was %s.", pair, data)
|
||||||
return pair, ticker_interval, []
|
return pair, timeframe, []
|
||||||
logger.debug("Done fetching pair %s, interval %s ...", pair, ticker_interval)
|
logger.debug("Done fetching pair %s, interval %s ...", pair, timeframe)
|
||||||
return pair, ticker_interval, data
|
return pair, timeframe, data
|
||||||
|
|
||||||
except ccxt.NotSupported as e:
|
except ccxt.NotSupported as e:
|
||||||
raise OperationalException(
|
raise OperationalException(
|
||||||
@ -802,7 +802,6 @@ class Exchange:
|
|||||||
Handles all async doing.
|
Handles all async doing.
|
||||||
Async over one pair, assuming we get `_ohlcv_candle_limit` candles per call.
|
Async over one pair, assuming we get `_ohlcv_candle_limit` candles per call.
|
||||||
:param pair: Pair to download
|
:param pair: Pair to download
|
||||||
:param ticker_interval: Interval to get
|
|
||||||
:param since: Timestamp in milliseconds to get history from
|
:param since: Timestamp in milliseconds to get history from
|
||||||
:param until: Timestamp in milliseconds. Defaults to current timestamp if not defined.
|
:param until: Timestamp in milliseconds. Defaults to current timestamp if not defined.
|
||||||
:param from_id: Download data starting with ID (if id is known)
|
:param from_id: Download data starting with ID (if id is known)
|
||||||
@ -958,27 +957,27 @@ def available_exchanges(ccxt_module=None) -> List[str]:
|
|||||||
return [x for x in exchanges if not is_exchange_bad(x)]
|
return [x for x in exchanges if not is_exchange_bad(x)]
|
||||||
|
|
||||||
|
|
||||||
def timeframe_to_seconds(ticker_interval: str) -> int:
|
def timeframe_to_seconds(timeframe: str) -> int:
|
||||||
"""
|
"""
|
||||||
Translates the timeframe interval value written in the human readable
|
Translates the timeframe interval value written in the human readable
|
||||||
form ('1m', '5m', '1h', '1d', '1w', etc.) to the number
|
form ('1m', '5m', '1h', '1d', '1w', etc.) to the number
|
||||||
of seconds for one timeframe interval.
|
of seconds for one timeframe interval.
|
||||||
"""
|
"""
|
||||||
return ccxt.Exchange.parse_timeframe(ticker_interval)
|
return ccxt.Exchange.parse_timeframe(timeframe)
|
||||||
|
|
||||||
|
|
||||||
def timeframe_to_minutes(ticker_interval: str) -> int:
|
def timeframe_to_minutes(timeframe: str) -> int:
|
||||||
"""
|
"""
|
||||||
Same as timeframe_to_seconds, but returns minutes.
|
Same as timeframe_to_seconds, but returns minutes.
|
||||||
"""
|
"""
|
||||||
return ccxt.Exchange.parse_timeframe(ticker_interval) // 60
|
return ccxt.Exchange.parse_timeframe(timeframe) // 60
|
||||||
|
|
||||||
|
|
||||||
def timeframe_to_msecs(ticker_interval: str) -> int:
|
def timeframe_to_msecs(timeframe: str) -> int:
|
||||||
"""
|
"""
|
||||||
Same as timeframe_to_seconds, but returns milliseconds.
|
Same as timeframe_to_seconds, but returns milliseconds.
|
||||||
"""
|
"""
|
||||||
return ccxt.Exchange.parse_timeframe(ticker_interval) * 1000
|
return ccxt.Exchange.parse_timeframe(timeframe) * 1000
|
||||||
|
|
||||||
|
|
||||||
def timeframe_to_prev_date(timeframe: str, date: datetime = None) -> datetime:
|
def timeframe_to_prev_date(timeframe: str, date: datetime = None) -> datetime:
|
||||||
|
@ -1107,7 +1107,7 @@ def test_refresh_latest_ohlcv(mocker, default_conf, caplog) -> None:
|
|||||||
exchange.refresh_latest_ohlcv([('IOTA/ETH', '5m'), ('XRP/ETH', '5m')])
|
exchange.refresh_latest_ohlcv([('IOTA/ETH', '5m'), ('XRP/ETH', '5m')])
|
||||||
|
|
||||||
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
assert exchange._api_async.fetch_ohlcv.call_count == 2
|
||||||
assert log_has(f"Using cached ohlcv data for pair {pairs[0][0]}, interval {pairs[0][1]} ...",
|
assert log_has(f"Using cached ohlcv data for pair {pairs[0][0]}, timeframe {pairs[0][1]} ...",
|
||||||
caplog)
|
caplog)
|
||||||
|
|
||||||
|
|
||||||
@ -1143,7 +1143,7 @@ async def test__async_get_candle_history(default_conf, mocker, caplog, exchange_
|
|||||||
# exchange = Exchange(default_conf)
|
# exchange = Exchange(default_conf)
|
||||||
await async_ccxt_exception(mocker, default_conf, MagicMock(),
|
await async_ccxt_exception(mocker, default_conf, MagicMock(),
|
||||||
"_async_get_candle_history", "fetch_ohlcv",
|
"_async_get_candle_history", "fetch_ohlcv",
|
||||||
pair='ABCD/BTC', ticker_interval=default_conf['ticker_interval'])
|
pair='ABCD/BTC', timeframe=default_conf['ticker_interval'])
|
||||||
|
|
||||||
api_mock = MagicMock()
|
api_mock = MagicMock()
|
||||||
with pytest.raises(OperationalException, match=r'Could not fetch ticker data*'):
|
with pytest.raises(OperationalException, match=r'Could not fetch ticker data*'):
|
||||||
|
Loading…
Reference in New Issue
Block a user