From 02e238a944d7a87a207d6f552e97f6e2b3771eac Mon Sep 17 00:00:00 2001 From: Matthias Date: Mon, 3 Oct 2022 20:49:54 +0200 Subject: [PATCH] Combine ohlcv data in exchange class for live mode --- freqtrade/exchange/exchange.py | 49 +++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/freqtrade/exchange/exchange.py b/freqtrade/exchange/exchange.py index cb9cbebbd..4f869f994 100644 --- a/freqtrade/exchange/exchange.py +++ b/freqtrade/exchange/exchange.py @@ -18,12 +18,12 @@ import ccxt.async_support as ccxt_async from cachetools import TTLCache from ccxt import ROUND_DOWN, ROUND_UP, TICK_SIZE, TRUNCATE, decimal_to_precision from dateutil import parser -from pandas import DataFrame +from pandas import DataFrame, concat from freqtrade.constants import (DEFAULT_AMOUNT_RESERVE_PERCENT, NON_OPEN_EXCHANGE_STATES, BuySell, Config, EntryExit, ListPairsWithTimeframes, MakerTaker, PairWithTimeframe) -from freqtrade.data.converter import ohlcv_to_dataframe, trades_dict_to_list +from freqtrade.data.converter import clean_ohlcv_dataframe, ohlcv_to_dataframe, trades_dict_to_list from freqtrade.enums import OPTIMIZE_MODES, CandleType, MarginMode, TradingMode from freqtrade.exceptions import (DDosProtection, ExchangeError, InsufficientFundsError, InvalidOrderException, OperationalException, PricingError, @@ -1850,10 +1850,14 @@ class Exchange: return pair, timeframe, candle_type, data def _build_coroutine(self, pair: str, timeframe: str, candle_type: CandleType, - since_ms: Optional[int]) -> Coroutine: + since_ms: Optional[int], cache: bool) -> Coroutine: + not_all_data = self.required_candle_call_count > 1 + if cache and (pair, timeframe, candle_type) in self._klines: + # Not in cache - force multi-calls + not_all_data = False if (not since_ms - and (self._ft_has["ohlcv_require_since"] or self.required_candle_call_count > 1)): + and (self._ft_has["ohlcv_require_since"] or not_all_data)): # Multiple calls for one pair - to get more history one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit( timeframe, candle_type, since_ms) @@ -1890,8 +1894,9 @@ class Exchange: if ((pair, timeframe, candle_type) not in self._klines or not cache or self._now_is_time_to_refresh(pair, timeframe, candle_type)): - input_coroutines.append(self._build_coroutine( - pair, timeframe, candle_type=candle_type, since_ms=since_ms)) + + input_coroutines.append( + self._build_coroutine(pair, timeframe, candle_type, since_ms, cache)) else: logger.debug( @@ -1901,6 +1906,25 @@ class Exchange: return input_coroutines, cached_pairs + def _process_ohlcv_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List], + cache: bool, drop_incomplete: bool) -> DataFrame: + # keeping last candle time as last refreshed time of the pair + if ticks: + self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000 + # keeping parsed dataframe in cache + ohlcv_df = ohlcv_to_dataframe(ticks, timeframe, pair=pair, fill_missing=True, + drop_incomplete=drop_incomplete) + if cache: + if (pair, timeframe, c_type) in self._klines: + old = self._klines[(pair, timeframe, c_type)] + # Reassign so we return the updated, combined df + ohlcv_df = clean_ohlcv_dataframe(concat([old, ohlcv_df], axis=0), timeframe, pair, + fill_missing=True, drop_incomplete=False) + self._klines[(pair, timeframe, c_type)] = ohlcv_df + else: + self._klines[(pair, timeframe, c_type)] = ohlcv_df + return ohlcv_df + def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *, since_ms: Optional[int] = None, cache: bool = True, drop_incomplete: Optional[bool] = None @@ -1937,16 +1961,11 @@ class Exchange: continue # Deconstruct tuple (has 4 elements) pair, timeframe, c_type, ticks = res - # keeping last candle time as last refreshed time of the pair - if ticks: - self._pairs_last_refresh_time[(pair, timeframe, c_type)] = ticks[-1][0] // 1000 - # keeping parsed dataframe in cache - ohlcv_df = ohlcv_to_dataframe( - ticks, timeframe, pair=pair, fill_missing=True, - drop_incomplete=drop_incomplete) + ohlcv_df = self._process_ohlcv_df( + pair, timeframe, c_type, ticks, cache, drop_incomplete) + results_df[(pair, timeframe, c_type)] = ohlcv_df - if cache: - self._klines[(pair, timeframe, c_type)] = ohlcv_df + # Return cached klines for pair, timeframe, c_type in cached_pairs: results_df[(pair, timeframe, c_type)] = self.klines(