change log calls to debug, handle already received candle
This commit is contained in:
parent
c050eb8b8b
commit
ccd1aa70a2
@ -179,7 +179,7 @@ class DataProvider:
|
|||||||
if (producer_name not in self.__producer_pairs_df) \
|
if (producer_name not in self.__producer_pairs_df) \
|
||||||
or (pair_key not in self.__producer_pairs_df[producer_name]):
|
or (pair_key not in self.__producer_pairs_df[producer_name]):
|
||||||
# We don't have data from this producer yet,
|
# We don't have data from this producer yet,
|
||||||
# sor we don't have data for this pair_key
|
# or we don't have data for this pair_key
|
||||||
# return False and 1000 for the full df
|
# return False and 1000 for the full df
|
||||||
return (False, 1000)
|
return (False, 1000)
|
||||||
|
|
||||||
@ -190,6 +190,13 @@ class DataProvider:
|
|||||||
local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data
|
local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data
|
||||||
incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data
|
incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data
|
||||||
|
|
||||||
|
# We have received this candle before, update our copy
|
||||||
|
# and return True, 0
|
||||||
|
if local_last == incoming_first:
|
||||||
|
existing_df.iloc[-1] = dataframe.iloc[0]
|
||||||
|
existing_df = existing_df.reset_index(drop=True)
|
||||||
|
return (True, 0)
|
||||||
|
|
||||||
candle_difference = (incoming_first - local_last) / timeframe_delta
|
candle_difference = (incoming_first - local_last) / timeframe_delta
|
||||||
|
|
||||||
# If the difference divided by the timeframe is 1, then this
|
# If the difference divided by the timeframe is 1, then this
|
||||||
|
@ -253,7 +253,7 @@ class ExternalMessageConsumer:
|
|||||||
# Now send any subsequent requests published to
|
# Now send any subsequent requests published to
|
||||||
# this channel's stream
|
# this channel's stream
|
||||||
async for request, _ in channel_stream:
|
async for request, _ in channel_stream:
|
||||||
logger.info(f"Sending request to channel - {channel} - {request}")
|
logger.debug(f"Sending request to channel - {channel} - {request}")
|
||||||
await channel.send(request)
|
await channel.send(request)
|
||||||
|
|
||||||
async def _receive_messages(
|
async def _receive_messages(
|
||||||
@ -377,14 +377,14 @@ class ExternalMessageConsumer:
|
|||||||
pair, timeframe, candle_type = key
|
pair, timeframe, candle_type = key
|
||||||
|
|
||||||
if df.empty:
|
if df.empty:
|
||||||
logger.info(f"Received Empty Dataframe for {key}")
|
logger.debug(f"Received Empty Dataframe for {key}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# If set, remove the Entry and Exit signals from the Producer
|
# If set, remove the Entry and Exit signals from the Producer
|
||||||
if self._emc_config.get('remove_entry_exit_signals', False):
|
if self._emc_config.get('remove_entry_exit_signals', False):
|
||||||
df = remove_entry_exit_signals(df)
|
df = remove_entry_exit_signals(df)
|
||||||
|
|
||||||
logger.info(f"Received {len(df)} candle(s) for {key}")
|
logger.debug(f"Received {len(df)} candle(s) for {key}")
|
||||||
|
|
||||||
if len(df) >= 999:
|
if len(df) >= 999:
|
||||||
# This is a full dataframe
|
# This is a full dataframe
|
||||||
@ -413,7 +413,7 @@ class ExternalMessageConsumer:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if not did_append:
|
if not did_append:
|
||||||
logger.info("Holes in data or no existing df, "
|
logger.debug("Holes in data or no existing df, "
|
||||||
f"requesting {n_missing} candles "
|
f"requesting {n_missing} candles "
|
||||||
f"for {key} from `{producer_name}`")
|
f"for {key} from `{producer_name}`")
|
||||||
|
|
||||||
@ -428,6 +428,6 @@ class ExternalMessageConsumer:
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.info(
|
logger.debug(
|
||||||
f"Consumed message from `{producer_name}` "
|
f"Consumed message from `{producer_name}` "
|
||||||
f"of type `RPCMessageType.ANALYZED_DF` for {key}")
|
f"of type `RPCMessageType.ANALYZED_DF` for {key}")
|
||||||
|
Loading…
Reference in New Issue
Block a user