stable/freqtrade/data/dataprovider.py

427 lines
17 KiB
Python
Raw Normal View History

2018-11-30 19:42:16 +00:00
"""
Dataprovider
Responsible to provide data to the bot
including ticker and orderbook data, live and historical candle (OHLCV) data
2018-11-30 19:42:16 +00:00
Common Interface for bot and strategy to access data.
"""
import logging
from collections import deque
2020-06-14 09:51:20 +00:00
from datetime import datetime, timezone
2020-06-12 12:02:21 +00:00
from typing import Any, Dict, List, Optional, Tuple
2018-11-30 19:42:16 +00:00
2018-12-02 08:16:35 +00:00
from pandas import DataFrame
2021-07-18 21:47:51 +00:00
from freqtrade.configuration import TimeRange
2020-06-12 12:12:33 +00:00
from freqtrade.constants import ListPairsWithTimeframes, PairWithTimeframe
2018-12-17 05:52:13 +00:00
from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode
2020-06-28 14:01:40 +00:00
from freqtrade.exceptions import ExchangeError, OperationalException
from freqtrade.exchange import Exchange, timeframe_to_seconds
from freqtrade.rpc import RPCManager
2022-08-10 08:57:19 +00:00
from freqtrade.util import PeriodicCache
2020-09-28 17:39:41 +00:00
2018-11-30 19:42:16 +00:00
logger = logging.getLogger(__name__)
2021-05-05 18:08:31 +00:00
NO_EXCHANGE_EXCEPTION = 'Exchange is not available to DataProvider.'
MAX_DATAFRAME_CANDLES = 1000
2021-05-05 18:08:31 +00:00
2018-11-30 19:42:16 +00:00
2019-09-12 09:13:20 +00:00
class DataProvider:
2018-11-30 19:42:16 +00:00
def __init__(
self,
config: dict,
exchange: Optional[Exchange],
2022-08-31 17:43:02 +00:00
pairlists=None,
rpc: Optional[RPCManager] = None
) -> None:
2018-12-02 08:16:35 +00:00
self._config = config
self._exchange = exchange
self._pairlists = pairlists
self.__rpc = rpc
2020-06-15 12:08:57 +00:00
self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
2021-05-09 07:56:36 +00:00
self.__slice_index: Optional[int] = None
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
2022-08-31 17:43:02 +00:00
self.__producer_pairs_df: Dict[str,
Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {}
self.__producer_pairs: Dict[str, List[str]] = {}
self._msg_queue: deque = deque()
self._default_candle_type = self._config.get('candle_type_def', CandleType.SPOT)
self._default_timeframe = self._config.get('timeframe', '1h')
self.__msg_cache = PeriodicCache(
maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe))
self.producers = self._config.get('external_message_consumer', {}).get('producers', [])
self.external_data_enabled = len(self.producers) > 0
def _set_dataframe_max_index(self, limit_index: int):
"""
Limit analyzed dataframe to max specified index.
:param limit_index: dataframe index.
"""
self.__slice_index = limit_index
2020-06-12 12:02:21 +00:00
def _set_cached_df(
self,
pair: str,
timeframe: str,
dataframe: DataFrame,
2021-12-03 12:04:31 +00:00
candle_type: CandleType
) -> None:
2020-06-12 12:02:21 +00:00
"""
Store cached Dataframe.
Using private method as this should never be used by a user
(but the class is exposed via `self.dp` to the strategy)
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param dataframe: analyzed dataframe
2021-12-03 12:04:31 +00:00
:param candle_type: Any of the enum CandleType (must match trading mode!)
2020-06-12 12:02:21 +00:00
"""
pair_key = (pair, timeframe, candle_type)
self.__cached_pairs[pair_key] = (
dataframe, datetime.now(timezone.utc))
2018-11-30 19:42:16 +00:00
# For multiple producers we will want to merge the pairlists instead of overwriting
def _set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"):
"""
Set the pairs received to later be used.
This only supports 1 Producer right now.
:param pairlist: List of pairs
"""
2022-08-31 17:43:02 +00:00
self.__producer_pairs[producer_name] = pairlist.copy()
2022-08-31 17:43:02 +00:00
def get_producer_pairs(self, producer_name: str = "default") -> List[str]:
"""
Get the pairs cached from the producer
:returns: List of pairs
"""
2022-08-31 17:43:02 +00:00
return self.__producer_pairs.get(producer_name, [])
def _emit_df(
self,
pair_key: PairWithTimeframe,
dataframe: DataFrame
) -> None:
"""
Send this dataframe as an ANALYZED_DF message to RPC
:param pair_key: PairWithTimeframe tuple
:param data: Tuple containing the DataFrame and the datetime it was cached
"""
if self.__rpc:
self.__rpc.send_msg(
{
'type': RPCMessageType.ANALYZED_DF,
'data': {
'key': pair_key,
2022-09-07 21:08:01 +00:00
'df': dataframe,
'la': datetime.now(timezone.utc)
}
}
)
def _add_external_df(
self,
pair: str,
dataframe: DataFrame,
2022-09-07 21:08:01 +00:00
last_analyzed: Optional[datetime] = None,
timeframe: Optional[str] = None,
candle_type: Optional[CandleType] = None,
2022-08-31 17:43:02 +00:00
producer_name: str = "default"
) -> None:
"""
Add the pair data to this class from an external source.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
_timeframe = self._default_timeframe if not timeframe else timeframe
_candle_type = self._default_candle_type if not candle_type else candle_type
pair_key = (pair, _timeframe, _candle_type)
2022-08-31 17:43:02 +00:00
if producer_name not in self.__producer_pairs_df:
self.__producer_pairs_df[producer_name] = {}
2022-09-07 21:08:01 +00:00
_last_analyzed = datetime.now(timezone.utc) if not last_analyzed else last_analyzed
2022-09-06 18:12:05 +00:00
self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
def get_external_df(
self,
pair: str,
timeframe: Optional[str] = None,
candle_type: Optional[CandleType] = None,
2022-08-31 17:43:02 +00:00
producer_name: str = "default"
) -> Tuple[DataFrame, datetime]:
"""
Get the pair data from the external sources. Will wait if the policy is
set to, and data is not available.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
:param candle_type: Any of the enum CandleType (must match trading mode!)
"""
_timeframe = self._default_timeframe if not timeframe else timeframe
_candle_type = self._default_candle_type if not candle_type else candle_type
pair_key = (pair, _timeframe, _candle_type)
# If we have no data from this Producer yet
2022-08-31 17:43:02 +00:00
if producer_name not in self.__producer_pairs_df:
# We don't have this data yet, return empty DataFrame and datetime (01-01-1970)
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
# If we do have data from that Producer, but no data on this pair_key
if pair_key not in self.__producer_pairs_df[producer_name]:
# We don't have this data yet, return empty DataFrame and datetime (01-01-1970)
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
# We have it, return this data
2022-08-31 17:43:02 +00:00
return self.__producer_pairs_df[producer_name][pair_key]
def add_pairlisthandler(self, pairlists) -> None:
"""
Allow adding pairlisthandler after initialization
"""
self._pairlists = pairlists
def historic_ohlcv(
self,
pair: str,
timeframe: str = None,
candle_type: str = ''
) -> DataFrame:
2018-11-30 19:42:16 +00:00
"""
Get stored historical candle (OHLCV) data
:param pair: pair to get the data for
2019-11-13 10:28:26 +00:00
:param timeframe: timeframe to get data for
2021-11-27 08:55:42 +00:00
:param candle_type: '', mark, index, premiumIndex, or funding_rate
2018-11-30 19:42:16 +00:00
"""
2022-01-28 18:18:03 +00:00
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
saved_pair = (pair, str(timeframe), _candle_type)
if saved_pair not in self.__cached_pairs_backtesting:
timerange = TimeRange.parse_timerange(None if self._config.get(
'timerange') is None else str(self._config.get('timerange')))
# Move informative start time respecting startup_candle_count
startup_candles = self.get_required_startup(str(timeframe))
tf_seconds = timeframe_to_seconds(str(timeframe))
timerange.subtract_start(tf_seconds * startup_candles)
self.__cached_pairs_backtesting[saved_pair] = load_pair_history(
pair=pair,
timeframe=timeframe or self._config['timeframe'],
datadir=self._config['datadir'],
timerange=timerange,
data_format=self._config.get('dataformat_ohlcv', 'json'),
2022-01-28 18:18:03 +00:00
candle_type=_candle_type,
2021-12-03 12:04:31 +00:00
)
return self.__cached_pairs_backtesting[saved_pair].copy()
2018-11-30 19:42:16 +00:00
def get_required_startup(self, timeframe: str) -> int:
freqai_config = self._config.get('freqai', {})
if not freqai_config.get('enabled', False):
return self._config.get('startup_candle_count', 0)
else:
startup_candles = self._config.get('startup_candle_count', 0)
indicator_periods = freqai_config['feature_parameters']['indicator_periods_candles']
# make sure the startupcandles is at least the set maximum indicator periods
self._config['startup_candle_count'] = max(startup_candles, max(indicator_periods))
tf_seconds = timeframe_to_seconds(timeframe)
train_candles = freqai_config['train_period_days'] * 86400 / tf_seconds
total_candles = int(self._config['startup_candle_count'] + train_candles)
logger.info(f'Increasing startup_candle_count for freqai to {total_candles}')
return total_candles
def get_pair_dataframe(
self,
pair: str,
timeframe: str = None,
candle_type: str = ''
) -> DataFrame:
2019-08-17 08:43:36 +00:00
"""
Return pair candle (OHLCV) data, either live or cached historical -- depending
2019-08-17 08:43:36 +00:00
on the runmode.
2021-11-28 14:53:13 +00:00
Only combinations in the pairlist or which have been specified as informative pairs
will be available.
2019-08-17 08:43:36 +00:00
:param pair: pair to get the data for
2019-11-13 10:28:26 +00:00
:param timeframe: timeframe to get data for
:return: Dataframe for this pair
2021-11-27 08:55:42 +00:00
:param candle_type: '', mark, index, premiumIndex, or funding_rate
2019-08-17 08:43:36 +00:00
"""
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
# Get live OHLCV data.
data = self.ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
2019-08-17 08:43:36 +00:00
else:
# Get historical OHLCV data (cached on disk).
data = self.historic_ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
2019-08-17 08:43:36 +00:00
if len(data) == 0:
logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).")
2019-08-17 08:43:36 +00:00
return data
2021-11-28 14:53:13 +00:00
def get_analyzed_dataframe(self, pair: str, timeframe: str) -> Tuple[DataFrame, datetime]:
2020-06-12 12:02:21 +00:00
"""
2021-05-09 07:56:36 +00:00
Retrieve the analyzed dataframe. Returns the full dataframe in trade mode (live / dry),
and the last 1000 candles (up to the time evaluated at this moment) in all other modes.
2020-06-12 12:02:21 +00:00
:param pair: pair to get the data for
:param timeframe: timeframe to get data for
2020-06-12 12:12:33 +00:00
:return: Tuple of (Analyzed Dataframe, lastrefreshed) for the requested pair / timeframe
2020-06-14 09:51:20 +00:00
combination.
Returns empty dataframe and Epoch 0 (1970-01-01) if no dataframe was cached.
2020-06-12 12:02:21 +00:00
"""
pair_key = (pair, timeframe, self._config.get('candle_type_def', CandleType.SPOT))
if pair_key in self.__cached_pairs:
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
df, date = self.__cached_pairs[pair_key]
else:
df, date = self.__cached_pairs[pair_key]
2021-05-09 07:56:36 +00:00
if self.__slice_index is not None:
max_index = self.__slice_index
df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES):max_index]
return df, date
2020-06-12 12:02:21 +00:00
else:
2020-06-14 09:51:20 +00:00
return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
2020-06-12 12:02:21 +00:00
2021-05-03 06:47:58 +00:00
@property
def runmode(self) -> RunMode:
"""
Get runmode of the bot
can be "live", "dry-run", "backtest", "edgecli", "hyperopt" or "other".
"""
return RunMode(self._config.get('runmode', RunMode.OTHER))
def current_whitelist(self) -> List[str]:
"""
fetch latest available whitelist.
Useful when you have a large whitelist and need to call each pair as an informative pair.
As available pairs does not show whitelist until after informative pairs have been cached.
:return: list of pairs in whitelist
"""
if self._pairlists:
return self._pairlists.whitelist.copy()
else:
raise OperationalException("Dataprovider was not initialized with a pairlist provider.")
def clear_cache(self):
"""
Clear pair dataframe cache.
"""
self.__cached_pairs = {}
self.__cached_pairs_backtesting = {}
self.__slice_index = 0
2021-05-03 06:47:58 +00:00
# Exchange functions
def refresh(self,
pairlist: ListPairsWithTimeframes,
helping_pairs: ListPairsWithTimeframes = None) -> None:
"""
Refresh data, called with each cycle
"""
if self._exchange is None:
2021-05-05 18:08:31 +00:00
raise OperationalException(NO_EXCHANGE_EXCEPTION)
2021-05-03 06:47:58 +00:00
if helping_pairs:
self._exchange.refresh_latest_ohlcv(pairlist + helping_pairs)
else:
self._exchange.refresh_latest_ohlcv(pairlist)
@property
def available_pairs(self) -> ListPairsWithTimeframes:
"""
Return a list of tuples containing (pair, timeframe) for which data is currently cached.
Should be whitelist + open trades.
"""
if self._exchange is None:
2021-05-05 18:08:31 +00:00
raise OperationalException(NO_EXCHANGE_EXCEPTION)
2021-05-03 06:47:58 +00:00
return list(self._exchange._klines.keys())
def ohlcv(
self,
pair: str,
timeframe: str = None,
copy: bool = True,
candle_type: str = ''
) -> DataFrame:
2021-05-03 06:47:58 +00:00
"""
Get candle (OHLCV) data for the given pair as DataFrame
Please use the `available_pairs` method to verify which pairs are currently cached.
:param pair: pair to get the data for
:param timeframe: Timeframe to get data for
2021-11-27 08:55:42 +00:00
:param candle_type: '', mark, index, premiumIndex, or funding_rate
2021-05-03 06:47:58 +00:00
:param copy: copy dataframe before returning if True.
Use False only for read-only operations (where the dataframe is not modified)
"""
2021-05-05 18:08:31 +00:00
if self._exchange is None:
raise OperationalException(NO_EXCHANGE_EXCEPTION)
2021-05-03 06:47:58 +00:00
if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
2022-01-28 18:18:03 +00:00
_candle_type = CandleType.from_string(
candle_type) if candle_type != '' else self._config['candle_type_def']
return self._exchange.klines(
2022-01-28 18:18:03 +00:00
(pair, timeframe or self._config['timeframe'], _candle_type),
copy=copy
)
2021-05-03 06:47:58 +00:00
else:
return DataFrame()
def market(self, pair: str) -> Optional[Dict[str, Any]]:
2019-10-02 23:58:45 +00:00
"""
Return market data for the pair
:param pair: Pair to get the data for
:return: Market data dict from ccxt or None if market info is not available for the pair
"""
2021-05-03 06:47:58 +00:00
if self._exchange is None:
2021-05-05 18:08:31 +00:00
raise OperationalException(NO_EXCHANGE_EXCEPTION)
2019-10-02 23:58:45 +00:00
return self._exchange.markets.get(pair)
2018-12-02 08:16:35 +00:00
def ticker(self, pair: str):
"""
Return last ticker data from exchange
:param pair: Pair to get the data for
:return: Ticker dict from exchange or empty dict if ticker is not available for the pair
2018-12-02 08:16:35 +00:00
"""
2021-05-03 06:47:58 +00:00
if self._exchange is None:
2021-05-05 18:08:31 +00:00
raise OperationalException(NO_EXCHANGE_EXCEPTION)
try:
return self._exchange.fetch_ticker(pair)
2020-06-28 14:01:40 +00:00
except ExchangeError:
return {}
2018-11-30 19:42:16 +00:00
def orderbook(self, pair: str, maximum: int) -> Dict[str, List]:
2018-12-02 08:16:35 +00:00
"""
Fetch latest l2 orderbook data
Warning: Does a network request - so use with common sense.
:param pair: pair to get the data for
:param maximum: Maximum number of orderbook entries to query
:return: dict including bids/asks with a total of `maximum` entries.
2018-12-02 08:16:35 +00:00
"""
2021-05-03 06:47:58 +00:00
if self._exchange is None:
2021-05-05 18:08:31 +00:00
raise OperationalException(NO_EXCHANGE_EXCEPTION)
return self._exchange.fetch_l2_order_book(pair, maximum)
def send_msg(self, message: str, *, always_send: bool = False) -> None:
"""
Send custom RPC Notifications from your bot.
Will not send any bot in modes other than Dry-run or Live.
:param message: Message to be sent. Must be below 4096.
:param always_send: If False, will send the message only once per candle, and surpress
identical messages.
Careful as this can end up spaming your chat.
Defaults to False
"""
if self.runmode not in (RunMode.DRY_RUN, RunMode.LIVE):
return
if always_send or message not in self.__msg_cache:
self._msg_queue.append(message)
self.__msg_cache[message] = True