- gateway/normalize.py: normalize_slack_event -> KonstructMessage (strips bot mention) - gateway/channels/slack.py: register_slack_handlers for app_mention + DM events - rate limit check -> ephemeral rejection on exceeded - idempotency dedup (Slack retry protection) - placeholder 'Thinking...' message posted in-thread before Celery dispatch - auto-follow engaged threads with 30-minute TTL - HTTP 200 returned immediately; all LLM work dispatched to Celery - gateway/main.py: FastAPI on port 8001, /slack/events + /health - router/tenant.py: resolve_tenant workspace_id -> tenant_id (RLS-bypass query) - router/ratelimit.py: check_rate_limit Redis token bucket, RateLimitExceeded exception - router/idempotency.py: is_duplicate + mark_processed (SET NX, 24h TTL) - router/context.py: load_agent_for_tenant with RLS ContextVar setup - orchestrator/tasks.py: handle_message now extracts placeholder_ts/channel_id, calls _update_slack_placeholder via chat.update after LLM response - docker-compose.yml: gateway service on port 8001 - pyproject.toml: added redis, konstruct-router, konstruct-orchestrator deps
88 lines
2.7 KiB
Python
88 lines
2.7 KiB
Python
"""
|
|
Message deduplication (idempotency).
|
|
|
|
Slack (and other channels) retry event delivery when the gateway does not
|
|
respond with HTTP 200 within 3 seconds. This module tracks which message
|
|
IDs have already been dispatched to Celery, preventing duplicate processing.
|
|
|
|
Design:
|
|
- Key: {tenant_id}:dedup:{message_id} (from shared.redis_keys)
|
|
- TTL: 24 hours (Slack retries stop after ~1 hour; 24h is conservative)
|
|
- Op: SET NX (atomic check-and-set)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from redis.asyncio import Redis
|
|
|
|
from shared.redis_keys import idempotency_key
|
|
|
|
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)

# How long to remember a message ID (seconds).
# Slack retries for up to ~1 hour; 24h gives plenty of buffer.
_DEDUP_TTL_SECONDS = 86400  # 24 hours
|
|
|
|
|
|
async def is_duplicate(
    tenant_id: str,
    message_id: str,
    redis: Redis,  # type: ignore[type-arg]
) -> bool:
    """
    Atomically check whether this message was already dispatched, marking it
    as seen in the same operation.

    Relies on Redis ``SET NX EX`` (set-if-not-exists with expiry) so the
    check and the mark happen as a single atomic command. On the first
    sighting the key is created with a 24-hour TTL and ``False`` is returned
    (not a duplicate — process it). On any later sighting within the TTL the
    set is a no-op and ``True`` is returned (duplicate — skip it).

    Args:
        tenant_id: Konstruct tenant identifier.
        message_id: Unique message identifier (e.g. Slack event_ts or UUID).
        redis: Async Redis client.

    Returns:
        True if this message is a duplicate (already dispatched).
        False if this is the first time we've seen this message.
    """
    dedup_key = idempotency_key(tenant_id, message_id)

    # SET NX returns a truthy value only when the key was freshly created;
    # it returns None when the key already existed.
    first_sighting = await redis.set(
        dedup_key, "1", nx=True, ex=_DEDUP_TTL_SECONDS
    )

    if not first_sighting:
        # Key was already present — Slack (or another channel) re-delivered
        # an event we have already handed off to Celery.
        logger.info(
            "Duplicate message detected: tenant=%s message_id=%s — skipping",
            tenant_id,
            message_id,
        )
        return True

    return False
|
|
|
|
|
|
async def mark_processed(
    tenant_id: str,
    message_id: str,
    redis: Redis,  # type: ignore[type-arg]
) -> None:
    """
    Unconditionally record a message as seen (no duplicate check).

    Writes the dedup key with the standard 24-hour TTL, overwriting any
    existing entry. Prefer ``is_duplicate`` in normal flows — it performs
    the check and the mark atomically; this helper exists for the rare case
    where you only want the mark.

    Args:
        tenant_id: Konstruct tenant identifier.
        message_id: Unique message identifier.
        redis: Async Redis client.
    """
    # Plain SET (no NX): always (re)writes the key and refreshes its TTL.
    await redis.set(
        idempotency_key(tenant_id, message_id), "1", ex=_DEDUP_TTL_SECONDS
    )
|