update add_external_candle, fix breaking on ping error, handle empty dataframes

This commit is contained in:
Timothy Pogue 2022-11-27 13:11:43 -07:00
parent fce1e9d6d0
commit d2c8487ecf
2 changed files with 23 additions and 11 deletions

View File

@ -179,15 +179,19 @@ class DataProvider:
if producer_name not in self.__producer_pairs_df: if producer_name not in self.__producer_pairs_df:
# We don't have data from this producer yet, # We don't have data from this producer yet,
# so we can't append a candle # so we can't append a candle
return (False, 0) return (False, 999)
if pair_key not in self.__producer_pairs_df[producer_name]: if pair_key not in self.__producer_pairs_df[producer_name]:
# We don't have data for this pair_key, # We don't have data for this pair_key,
# so we can't append a candle # so we can't append a candle
return (False, 0) return (False, 999)
# CHECK FOR MISSING CANDLES # CHECK FOR MISSING CANDLES
# return (False, int > 0) # Calculate difference between last candle in local dataframe
# and first candle in incoming dataframe. Take difference and divide
# by timeframe to find out how many candles we still need. If 1
# then the incoming candle is the right candle. If more than 1,
# return (False, missing candles - 1)
existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] existing_df, _ = self.__producer_pairs_df[producer_name][pair_key]
appended_df = self._append_candle_to_dataframe(existing_df, dataframe) appended_df = self._append_candle_to_dataframe(existing_df, dataframe)
@ -207,8 +211,8 @@ class DataProvider:
if existing.iloc[-1]['date'] != new.iloc[-1]['date']: if existing.iloc[-1]['date'] != new.iloc[-1]['date']:
existing = concat([existing, new]) existing = concat([existing, new])
# Only keep the last 1000 candles in memory # Only keep the last 1500 candles in memory
existing = existing[-1000:] if len(existing) > 1000 else existing existing = existing[-1500:] if len(existing) > 1000 else existing
return existing return existing

View File

@ -248,7 +248,7 @@ class ExternalMessageConsumer:
# Now send any subsequent requests published to # Now send any subsequent requests published to
# this channel's stream # this channel's stream
async for request in channel_stream: async for request, _ in channel_stream:
logger.info(f"Sending request to channel - {channel} - {request}") logger.info(f"Sending request to channel - {channel} - {request}")
await channel.send(request) await channel.send(request)
@ -292,13 +292,13 @@ class ExternalMessageConsumer:
except (websockets.exceptions.ConnectionClosed): except (websockets.exceptions.ConnectionClosed):
# Just eat the error and continue reconnecting # Just eat the error and continue reconnecting
logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s") logger.warning(f"Disconnection in {channel} - retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
break
except Exception as e: except Exception as e:
# Just eat the error and continue reconnecting # Just eat the error and continue reconnecting
logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s") logger.warning(f"Ping error {channel} - {e} - retrying in {self.sleep_time}s")
logger.debug(e, exc_info=e) logger.debug(e, exc_info=e)
finally:
await asyncio.sleep(self.sleep_time) await asyncio.sleep(self.sleep_time)
break break
@ -372,10 +372,16 @@ class ExternalMessageConsumer:
pair, timeframe, candle_type = key pair, timeframe, candle_type = key
if df.empty:
logger.info(f"Received Empty Dataframe for {key}")
return
# If set, remove the Entry and Exit signals from the Producer # If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False): if self._emc_config.get('remove_entry_exit_signals', False):
df = remove_entry_exit_signals(df) df = remove_entry_exit_signals(df)
logger.info(f"Received {len(df)} candle(s) for {key}")
if len(df) >= 999: if len(df) >= 999:
# This is a full dataframe # This is a full dataframe
# Add the dataframe to the dataprovider # Add the dataframe to the dataprovider
@ -404,13 +410,14 @@ class ExternalMessageConsumer:
if not did_append: if not did_append:
logger.info("Holes in data or no existing df, " logger.info("Holes in data or no existing df, "
f"requesting data for {key} from `{producer_name}`") f"requesting {n_missing} candles "
f"for {key} from `{producer_name}`")
self.send_producer_request( self.send_producer_request(
producer_name, producer_name,
WSAnalyzedDFRequest( WSAnalyzedDFRequest(
data={ data={
"limit": n_missing if n_missing > 0 else 1000, "limit": n_missing,
"pair": pair "pair": pair
} }
) )
@ -418,4 +425,5 @@ class ExternalMessageConsumer:
return return
logger.info( logger.info(
f"Consumed message from `{producer_name}` of type `RPCMessageType.ANALYZED_DF`") f"Consumed message from `{producer_name}` "
f"of type `RPCMessageType.ANALYZED_DF` for {key}")