Move "replace or append" decision to dataprovider

This commit is contained in:
Matthias 2022-12-14 19:56:54 +01:00
parent 97fee37072
commit fa260e6560
3 changed files with 50 additions and 47 deletions

View File

@ -61,6 +61,7 @@ USERPATH_FREQAIMODELS = 'freqaimodels'
TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent'] TELEGRAM_SETTING_OPTIONS = ['on', 'off', 'silent']
WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw'] WEBHOOK_FORMAT_OPTIONS = ['form', 'json', 'raw']
FULL_DATAFRAME_THRESHOLD = 100
ENV_VAR_PREFIX = 'FREQTRADE__' ENV_VAR_PREFIX = 'FREQTRADE__'

View File

@ -12,7 +12,8 @@ from typing import Any, Dict, List, Optional, Tuple
from pandas import DataFrame, to_timedelta from pandas import DataFrame, to_timedelta
from freqtrade.configuration import TimeRange from freqtrade.configuration import TimeRange
from freqtrade.constants import Config, ListPairsWithTimeframes, PairWithTimeframe from freqtrade.constants import (FULL_DATAFRAME_THRESHOLD, Config, ListPairsWithTimeframes,
PairWithTimeframe)
from freqtrade.data.history import load_pair_history from freqtrade.data.history import load_pair_history
from freqtrade.enums import CandleType, RPCMessageType, RunMode from freqtrade.enums import CandleType, RPCMessageType, RunMode
from freqtrade.exceptions import ExchangeError, OperationalException from freqtrade.exceptions import ExchangeError, OperationalException
@ -132,7 +133,7 @@ class DataProvider:
'data': pair_key, 'data': pair_key,
}) })
def _add_external_df( def _replace_external_df(
self, self,
pair: str, pair: str,
dataframe: DataFrame, dataframe: DataFrame,
@ -158,7 +159,7 @@ class DataProvider:
self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed) self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.") logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
def _add_external_candle( def _add_external_df(
self, self,
pair: str, pair: str,
dataframe: DataFrame, dataframe: DataFrame,
@ -182,6 +183,19 @@ class DataProvider:
# The incoming dataframe must have at least 1 candle # The incoming dataframe must have at least 1 candle
return (False, 0) return (False, 0)
if len(dataframe) >= FULL_DATAFRAME_THRESHOLD:
# This is likely a full dataframe
# Add the dataframe to the dataprovider
self._add_external_df(
pair,
dataframe,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
return (True, 0)
if (producer_name not in self.__producer_pairs_df if (producer_name not in self.__producer_pairs_df
or pair_key not in self.__producer_pairs_df[producer_name]): or pair_key not in self.__producer_pairs_df[producer_name]):
# We don't have data from this producer yet, # We don't have data from this producer yet,
@ -214,7 +228,14 @@ class DataProvider:
appended_df = append_candles_to_dataframe(existing_df1, dataframe) appended_df = append_candles_to_dataframe(existing_df1, dataframe)
# Everything is good, we appended # Everything is good, we appended
self.__producer_pairs_df[producer_name][pair_key] = appended_df, last_analyzed self._add_external_df(
pair,
appended_df,
last_analyzed=last_analyzed,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
return (True, 0) return (True, 0)
def get_producer_df( def get_producer_df(

View File

@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, TypedDict, Union
import websockets import websockets
from pydantic import ValidationError from pydantic import ValidationError
from freqtrade.constants import FULL_DATAFRAME_THRESHOLD
from freqtrade.data.dataprovider import DataProvider from freqtrade.data.dataprovider import DataProvider
from freqtrade.enums import RPCMessageType from freqtrade.enums import RPCMessageType
from freqtrade.misc import remove_entry_exit_signals from freqtrade.misc import remove_entry_exit_signals
@ -36,9 +37,6 @@ class Producer(TypedDict):
ws_token: str ws_token: str
FULL_DATAFRAME_THRESHOLD = 100
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -379,51 +377,34 @@ class ExternalMessageConsumer:
logger.debug(f"Received {len(df)} candle(s) for {key}") logger.debug(f"Received {len(df)} candle(s) for {key}")
if len(df) >= FULL_DATAFRAME_THRESHOLD: did_append, n_missing = self._dp._add_external_df(
# This is likely a full dataframe pair,
# Add the dataframe to the dataprovider df,
self._dp._add_external_df( last_analyzed=la,
pair, timeframe=timeframe,
df, candle_type=candle_type,
last_analyzed=la, producer_name=producer_name
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
) )
elif len(df) < FULL_DATAFRAME_THRESHOLD: if not did_append:
# This is likely n single candles # We want an overlap in candles incase some data has changed
# Have dataprovider append it to n_missing += 1
# the full datafame. If it can't, # Set to None for all candles if we missed a full df's worth of candles
# request the missing candles n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
did_append, n_missing = self._dp._add_external_candle(
pair,
df,
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
if not did_append: logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles "
# We want an overlap in candles incase some data has changed f"for {key} from `{producer_name}`")
n_missing += 1
# Set to None for all candles if we missed a full df's worth of candles
n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
logger.warning("Holes in data or no existing df, requesting {n_missing} candles " self.send_producer_request(
f"for {key} from `{producer_name}`") producer_name,
WSAnalyzedDFRequest(
self.send_producer_request( data={
producer_name, "limit": n_missing,
WSAnalyzedDFRequest( "pair": pair
data={ }
"limit": n_missing,
"pair": pair
}
)
) )
return )
return
logger.debug( logger.debug(
f"Consumed message from `{producer_name}` " f"Consumed message from `{producer_name}` "