- gateway/normalize.py: normalize_slack_event -> KonstructMessage (strips bot mention) - gateway/channels/slack.py: register_slack_handlers for app_mention + DM events - rate limit check -> ephemeral rejection on exceeded - idempotency dedup (Slack retry protection) - placeholder 'Thinking...' message posted in-thread before Celery dispatch - auto-follow engaged threads with 30-minute TTL - HTTP 200 returned immediately; all LLM work dispatched to Celery - gateway/main.py: FastAPI on port 8001, /slack/events + /health - router/tenant.py: resolve_tenant workspace_id -> tenant_id (RLS-bypass query) - router/ratelimit.py: check_rate_limit Redis token bucket, RateLimitExceeded exception - router/idempotency.py: is_duplicate + mark_processed (SET NX, 24h TTL) - router/context.py: load_agent_for_tenant with RLS ContextVar setup - orchestrator/tasks.py: handle_message now extracts placeholder_ts/channel_id, calls _update_slack_placeholder via chat.update after LLM response - docker-compose.yml: gateway service on port 8001 - pyproject.toml: added redis, konstruct-router, konstruct-orchestrator deps
122 lines · 3.7 KiB · Python
"""
|
|
Redis token bucket rate limiter.
|
|
|
|
Implements a sliding window token bucket using Redis atomic operations.
|
|
|
|
Design:
|
|
- Key: {tenant_id}:ratelimit:{channel} (from shared.redis_keys)
|
|
- Window: configurable (default 60s)
|
|
- Tokens: configurable (default 30 per window per tenant per channel)
|
|
- Storage: INCR + EXPIRE (atomic via pipeline)
|
|
|
|
The token bucket approach:
|
|
1. INCR the counter key
|
|
2. If count == 1, set EXPIRE (first request in window — starts the clock)
|
|
3. If count > limit: raise RateLimitExceeded
|
|
4. Otherwise: return True (request allowed)
|
|
|
|
This is NOT a sliding window (it's a fixed window with INCR/EXPIRE) — it's
|
|
simple, Redis-atomic, and correct enough for Phase 1. A true sliding window
|
|
can be implemented with ZADD/ZREMRANGEBYSCORE later if needed.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from redis.asyncio import Redis
|
|
|
|
from shared.redis_keys import rate_limit_key
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Default rate limit configuration — override per-tenant in Phase 2
|
|
_DEFAULT_LIMIT = 30 # Max requests per window
|
|
_DEFAULT_WINDOW = 60 # Window duration in seconds
|
|
|
|
|
|
class RateLimitExceeded(Exception):
    """
    Signals that a tenant has used up its per-channel request budget.

    Carries enough context for the caller to build a useful rejection
    message (who hit the limit, where, and roughly when it resets).

    Attributes:
        tenant_id: Tenant that exhausted its budget.
        channel: Channel on which the limit was hit.
        remaining_seconds: Approximate seconds left on the rate limit key
            before the current window expires.
    """

    def __init__(self, tenant_id: str, channel: str, remaining_seconds: int = 60) -> None:
        self.tenant_id = tenant_id
        self.channel = channel
        self.remaining_seconds = remaining_seconds
        # Human-readable summary becomes the Exception args/str().
        message = (
            f"Rate limit exceeded for tenant={tenant_id} channel={channel}. "
            f"Resets in ~{remaining_seconds}s."
        )
        super().__init__(message)
|
|
|
|
|
|
async def check_rate_limit(
    tenant_id: str,
    channel: str,
    redis: Redis,  # type: ignore[type-arg]
    limit: int = _DEFAULT_LIMIT,
    window_seconds: int = _DEFAULT_WINDOW,
) -> bool:
    """
    Verify the tenant-channel pair has budget left in its current window.

    Runs INCR + TTL atomically in a Redis transaction. The first request of
    a window (count == 1) has no expiry yet, so one is set there, starting
    the window clock. Once the counter passes ``limit`` the request is
    rejected via RateLimitExceeded, carrying the remaining window TTL.

    Args:
        tenant_id: Konstruct tenant identifier.
        channel: Channel string (e.g. "slack").
        redis: Async Redis client.
        limit: Maximum requests per window (default 30).
        window_seconds: Window duration in seconds (default 60).

    Returns:
        True if the request is allowed.

    Raises:
        RateLimitExceeded: If the request exceeds the limit.
    """
    bucket = rate_limit_key(tenant_id, channel)

    # INCR and TTL run inside one MULTI/EXEC transaction, so the TTL we
    # read corresponds to the counter value we just produced.
    pipe = redis.pipeline(transaction=True)
    pipe.incr(bucket)
    pipe.ttl(bucket)
    count, ttl = await pipe.execute()

    # A fresh key (count == 1) carries no expiry; ttl == -1 additionally
    # repairs an orphaned key left behind if a prior EXPIRE never ran.
    if count == 1 or ttl == -1:
        await redis.expire(bucket, window_seconds)
        ttl = window_seconds

    # Guard clause: within budget — allow immediately.
    if count <= limit:
        return True

    remaining = max(ttl, 0)
    logger.warning(
        "Rate limit exceeded: tenant=%s channel=%s count=%d limit=%d ttl=%d",
        tenant_id,
        channel,
        count,
        limit,
        remaining,
    )
    raise RateLimitExceeded(tenant_id, channel, remaining_seconds=remaining)
|