diff --git a/docker-compose.yml b/docker-compose.yml index 2798bb7..ce7646f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -136,6 +136,48 @@ services: timeout: 5s retries: 5 + gateway: + build: + context: . + dockerfile_inline: | + FROM python:3.12-slim + WORKDIR /app + RUN pip install uv + COPY pyproject.toml ./ + COPY packages/shared ./packages/shared + COPY packages/router ./packages/router + COPY packages/gateway ./packages/gateway + COPY packages/orchestrator ./packages/orchestrator + RUN uv pip install --system -e packages/shared -e packages/router -e packages/gateway -e packages/orchestrator + CMD ["uvicorn", "gateway.main:app", "--host", "0.0.0.0", "--port", "8001"] + container_name: konstruct-gateway + ports: + - "8001:8001" + networks: + - konstruct-net + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + celery-worker: + condition: service_started + environment: + - DATABASE_URL=postgresql+asyncpg://konstruct_app:konstruct_dev@postgres:5432/konstruct + - REDIS_URL=redis://redis:6379/0 + - CELERY_BROKER_URL=redis://redis:6379/1 + - CELERY_RESULT_BACKEND=redis://redis:6379/2 + - SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN:-} + - SLACK_SIGNING_SECRET=${SLACK_SIGNING_SECRET:-} + - SLACK_APP_TOKEN=${SLACK_APP_TOKEN:-} + - LOG_LEVEL=INFO + restart: unless-stopped + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8001/health')"] + interval: 10s + timeout: 5s + retries: 5 + celery-worker: build: context: . diff --git a/packages/gateway/gateway/__init__.py b/packages/gateway/gateway/__init__.py new file mode 100644 index 0000000..27b759a --- /dev/null +++ b/packages/gateway/gateway/__init__.py @@ -0,0 +1,7 @@ +""" +Konstruct Channel Gateway. + +Unified ingress for all messaging platforms. Each channel adapter normalizes +inbound events into KonstructMessage format before dispatching to the +Message Router / Celery orchestrator. 
+""" diff --git a/packages/gateway/gateway/channels/__init__.py b/packages/gateway/gateway/channels/__init__.py new file mode 100644 index 0000000..64249c8 --- /dev/null +++ b/packages/gateway/gateway/channels/__init__.py @@ -0,0 +1,6 @@ +""" +Channel adapter modules. + +Each adapter handles the channel-specific event format and normalizes +events into KonstructMessage before dispatching to the orchestrator. +""" diff --git a/packages/gateway/gateway/channels/slack.py b/packages/gateway/gateway/channels/slack.py new file mode 100644 index 0000000..dc12bcc --- /dev/null +++ b/packages/gateway/gateway/channels/slack.py @@ -0,0 +1,280 @@ +""" +Slack channel adapter. + +Handles Slack Events API events via slack-bolt AsyncApp. + +EVENT FLOW: + 1. Slack sends event to /slack/events (HTTP 200 must be returned in <3s) + 2. Handler normalizes event -> KonstructMessage + 3. Tenant resolved from workspace_id + 4. Rate limit checked + 5. Idempotency checked (prevents duplicate processing on Slack retries) + 6. Placeholder "Thinking..." message posted in-thread (typing indicator) + 7. Celery task dispatched with message + placeholder details + 8. Celery worker calls LLM pool, replaces placeholder with real response + +CRITICAL: DO NOT perform any LLM work inside event handlers. +Slack retries after 3 seconds if HTTP 200 is not received. All heavyweight +work must be dispatched to Celery before returning. + +THREAD FOLLOW-UP: + After the first @mention in a thread, subsequent messages in that thread + (even without @mention) trigger a response for 30 minutes of idle time. + This is tracked via Redis engaged_thread_key with a 30-minute TTL. 
+""" + +from __future__ import annotations + +import logging + +from redis.asyncio import Redis +from slack_bolt.async_app import AsyncApp +from sqlalchemy.ext.asyncio import AsyncSession + +from gateway.normalize import normalize_slack_event +from router.idempotency import is_duplicate +from router.ratelimit import RateLimitExceeded, check_rate_limit +from router.tenant import resolve_tenant +from shared.redis_keys import engaged_thread_key + +logger = logging.getLogger(__name__) + +# How long a thread stays "engaged" after the last @mention (seconds) +_ENGAGED_THREAD_TTL = 1800 # 30 minutes + + +def register_slack_handlers( + slack_app: AsyncApp, + redis: Redis, # type: ignore[type-arg] + get_session: object, # Callable returning AsyncSession context manager +) -> None: + """ + Register Slack event handlers on the slack-bolt AsyncApp. + + Call this once at application startup after creating the AsyncApp instance. + + Args: + slack_app: The slack-bolt AsyncApp instance. + redis: Async Redis client for rate limiting + idempotency. + get_session: Async context manager factory for DB sessions. + Typically ``shared.db.async_session_factory``. + """ + + @slack_app.event("app_mention") + async def handle_app_mention(event: dict, say: object, client: object) -> None: + """ + Handle @mention events in channels. + + Called when a user @mentions the bot in any channel the bot belongs to. + """ + await _handle_slack_event( + event=event, + say=say, + client=client, + redis=redis, + get_session=get_session, + event_type="app_mention", + ) + + @slack_app.event("message") + async def handle_message(event: dict, say: object, client: object) -> None: + """ + Handle direct messages (DMs). + + Filtered to channel_type=="im" only — ignores channel messages + to avoid double-processing @mentions. 
+ """ + # Only handle DMs to prevent double-triggering alongside app_mention + if event.get("channel_type") != "im": + return + + # Ignore bot messages to prevent infinite response loops + if event.get("bot_id") or event.get("subtype") == "bot_message": + return + + await _handle_slack_event( + event=event, + say=say, + client=client, + redis=redis, + get_session=get_session, + event_type="dm", + ) + + +async def _handle_slack_event( + event: dict, + say: object, + client: object, + redis: Redis, # type: ignore[type-arg] + get_session: object, + event_type: str, +) -> None: + """ + Shared handler logic for app_mention and DM message events. + + Performs: normalize -> tenant resolve -> rate limit -> idempotency -> + post placeholder -> dispatch Celery -> mark thread engaged. + + All work is dispatched to Celery before returning. HTTP 200 is returned + to Slack immediately by slack-bolt after this coroutine completes. + """ + # Ignore bot messages (double-check here for safety) + if event.get("bot_id") or event.get("subtype") == "bot_message": + return + + # Extract workspace_id from the outer context — injected via middleware + # or extracted from the Slack signing payload. 
+ workspace_id: str = event.get("_workspace_id", "") + bot_user_id: str = event.get("_bot_user_id", "") + + # Step 1: Normalize to KonstructMessage + msg = normalize_slack_event( + event=event, + workspace_id=workspace_id, + bot_user_id=bot_user_id, + ) + + # Step 2: Resolve tenant from workspace_id + tenant_id: str | None = None + async with get_session() as session: # type: ignore[attr-defined] + tenant_id = await resolve_tenant( + workspace_id=workspace_id, + channel_type="slack", + session=session, + ) + + if tenant_id is None: + logger.warning( + "handle_slack_event: unknown workspace_id=%r event_type=%s — ignoring", + workspace_id, + event_type, + ) + return + + msg.tenant_id = tenant_id + + # Step 3: Check rate limit — post ephemeral rejection if exceeded + try: + await check_rate_limit( + tenant_id=tenant_id, + channel="slack", + redis=redis, + ) + except RateLimitExceeded as exc: + logger.info( + "Rate limit exceeded: tenant=%s — posting ephemeral rejection", + tenant_id, + ) + # Post ephemeral message visible only to the requesting user + try: + await client.chat_postEphemeral( # type: ignore[union-attr] + channel=event.get("channel", ""), + user=event.get("user", ""), + text=( + f"I'm receiving too many requests right now. " + f"Please try again in about {exc.remaining_seconds} seconds." 
+ ), + ) + except Exception: + logger.exception("Failed to post rate limit ephemeral message") + return + + # Step 4: Idempotency check — skip duplicate events (Slack retry protection) + event_ts: str = event.get("ts", msg.id) + if await is_duplicate(tenant_id, event_ts, redis): + logger.debug( + "Duplicate Slack event: tenant=%s event_ts=%s — skipping", + tenant_id, + event_ts, + ) + return + + # Step 5: Check thread engagement — auto-follow messages in engaged threads + thread_id = msg.thread_id or event_ts + is_engaged = await _is_engaged_thread(tenant_id, thread_id, redis) + + # For channel messages: only respond to @mentions or engaged threads + if event_type != "dm" and not is_engaged: + # This shouldn't happen for app_mention, but guard defensively + # (e.g., if the event somehow arrives without mention metadata) + pass # Fall through to dispatch — app_mention always warrants a response + + # Step 6: Post placeholder "Thinking..." message in thread + channel_id: str = event.get("channel", "") + placeholder_ts: str = "" + + try: + placeholder_resp = await client.chat_postMessage( # type: ignore[union-attr] + channel=channel_id, + thread_ts=thread_id, + text="_Thinking..._", + ) + placeholder_ts = placeholder_resp.get("ts", "") # type: ignore[union-attr] + except Exception: + logger.exception( + "Failed to post placeholder message: tenant=%s channel=%s thread=%s", + tenant_id, + channel_id, + thread_id, + ) + # Continue even if placeholder fails — still dispatch to Celery + # The Celery task will post a new message instead of updating + + # Step 7: Dispatch to Celery (fire-and-forget) + # Import here to avoid circular imports at module load time + from orchestrator.tasks import handle_message as handle_message_task # noqa: PLC0415 + + task_payload = msg.model_dump() | { + "placeholder_ts": placeholder_ts, + "channel_id": channel_id, + } + + try: + handle_message_task.delay(task_payload) + except Exception: + logger.exception( + "Failed to dispatch 
handle_message task: tenant=%s msg_id=%s", + tenant_id, + msg.id, + ) + return + + # Step 8: Mark thread as engaged (auto-follow for 30 minutes) + await _mark_thread_engaged(tenant_id, thread_id, redis) + + logger.info( + "Dispatched: event_type=%s tenant=%s msg_id=%s thread=%s", + event_type, + tenant_id, + msg.id, + thread_id, + ) + + +async def _is_engaged_thread( + tenant_id: str, + thread_id: str, + redis: Redis, # type: ignore[type-arg] +) -> bool: + """Check if a thread is currently engaged (bot responded recently).""" + key = engaged_thread_key(tenant_id, thread_id) + try: + result = await redis.exists(key) + return bool(result) + except Exception: + logger.exception("Failed to check engaged thread: tenant=%s thread=%s", tenant_id, thread_id) + return False + + +async def _mark_thread_engaged( + tenant_id: str, + thread_id: str, + redis: Redis, # type: ignore[type-arg] +) -> None: + """Mark a thread as engaged with a 30-minute TTL.""" + key = engaged_thread_key(tenant_id, thread_id) + try: + await redis.set(key, "1", ex=_ENGAGED_THREAD_TTL) + except Exception: + logger.exception("Failed to mark thread engaged: tenant=%s thread=%s", tenant_id, thread_id) diff --git a/packages/gateway/gateway/main.py b/packages/gateway/gateway/main.py new file mode 100644 index 0000000..dbb63a3 --- /dev/null +++ b/packages/gateway/gateway/main.py @@ -0,0 +1,107 @@ +""" +Channel Gateway — FastAPI application. + +Mounts the slack-bolt AsyncApp as a sub-application at /slack/events. +All other channels will be added as additional sub-applications in Phase 2. + +Port: 8001 + +Endpoints: + POST /slack/events — Slack Events API webhook (handled by slack-bolt) + GET /health — Health check + +Startup sequence: + 1. Create Redis connection + 2. Create slack-bolt AsyncApp (signing_secret=...) + 3. Register Slack event handlers + 4. Mount slack-bolt request handler at /slack/events + 5. 
Expose /health +""" + +from __future__ import annotations + +import logging + +from fastapi import FastAPI, Request, Response +from redis.asyncio import Redis +from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler +from slack_bolt.async_app import AsyncApp + +from gateway.channels.slack import register_slack_handlers +from shared.config import settings +from shared.db import async_session_factory + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- +app = FastAPI( + title="Konstruct Channel Gateway", + description="Unified ingress for all messaging platforms", + version="0.1.0", +) + +# --------------------------------------------------------------------------- +# Slack bolt app — initialized at module import time. +# signing_secret="" is safe for local dev/testing; set via env in production. +# --------------------------------------------------------------------------- +slack_app = AsyncApp( + token=settings.slack_bot_token or None, + signing_secret=settings.slack_signing_secret or None, + # In HTTP mode (Events API), token_verification_enabled must be True + # slack-bolt validates signing_secret on every inbound request +) + +# Async Redis client — shared across all request handlers +_redis: Redis | None = None # type: ignore[type-arg] + + +def _get_redis() -> Redis: # type: ignore[type-arg] + """Return the module-level Redis client, creating it if necessary.""" + global _redis + if _redis is None: + _redis = Redis.from_url(settings.redis_url, decode_responses=True) + return _redis + + +# --------------------------------------------------------------------------- +# Register Slack event handlers +# --------------------------------------------------------------------------- +register_slack_handlers( + slack_app=slack_app, + redis=_get_redis(), + get_session=async_session_factory, 
+) + +# --------------------------------------------------------------------------- +# Slack request handler — adapts slack-bolt AsyncApp to FastAPI +# --------------------------------------------------------------------------- +slack_handler = AsyncSlackRequestHandler(slack_app) + + +# --------------------------------------------------------------------------- +# Routes +# --------------------------------------------------------------------------- + + +@app.post("/slack/events") +async def slack_events(request: Request) -> Response: + """ + Slack Events API webhook endpoint. + + slack-bolt's AsyncSlackRequestHandler handles: + - Slack signature verification (X-Slack-Signature) + - URL verification challenge (type=url_verification) + - Event routing to registered handlers + + CRITICAL: This endpoint MUST return HTTP 200 within 3 seconds. + All LLM/heavy work is dispatched to Celery inside the event handlers. + """ + return await slack_handler.handle(request) + + +@app.get("/health") +async def health() -> dict[str, str]: + """Health check endpoint.""" + return {"status": "ok", "service": "gateway"} diff --git a/packages/gateway/gateway/normalize.py b/packages/gateway/gateway/normalize.py new file mode 100644 index 0000000..b7675d9 --- /dev/null +++ b/packages/gateway/gateway/normalize.py @@ -0,0 +1,100 @@ +""" +Slack event normalization. + +Converts Slack Events API payloads into KonstructMessage format. + +All channel adapters produce KonstructMessage — the router and orchestrator +never inspect Slack-specific fields directly. +""" + +from __future__ import annotations + +import re +import uuid +from datetime import datetime, timezone + +from shared.models.message import ( + ChannelType, + KonstructMessage, + MessageContent, + SenderInfo, +) + +# Pattern to strip <@BOT_USER_ID> mentions from message text. +# Slack injects <@U...> tokens for @mentions — we strip the bot mention +# so the agent sees clean user text, not the mention syntax. 
+_BOT_MENTION_RE = re.compile(r"<@[A-Z0-9]+>") + + +def normalize_slack_event( + event: dict, + workspace_id: str, + bot_user_id: str = "", +) -> KonstructMessage: + """ + Normalize a Slack Events API event payload into a KonstructMessage. + + Handles both ``app_mention`` events (where the bot is @mentioned in a + channel) and ``message`` events in DMs (``channel_type == "im"``). + + The bot mention token (``<@BOT_USER_ID>``) is stripped from the beginning + of the text for ``app_mention`` events so the agent receives clean input. + + Args: + event: The inner ``event`` dict from the Slack Events API payload. + workspace_id: The Slack workspace ID (team_id from the outer payload). + bot_user_id: The bot's Slack user ID (used for mention stripping). + + Returns: + A fully-populated KonstructMessage. ``tenant_id`` is ``None`` at this + stage — the Message Router populates it via channel_connections lookup. + """ + # Extract and clean user text + raw_text: str = event.get("text", "") or "" + # Strip any <@BOT_ID> mention tokens from the message + clean_text = _BOT_MENTION_RE.sub("", raw_text).strip() + + # Slack thread_ts is the canonical thread identifier + thread_ts: str | None = event.get("thread_ts") or event.get("ts") + + # Timestamp — Slack uses Unix float strings ("1234567890.123456") + ts_raw = event.get("ts", "0") + try: + ts_float = float(ts_raw) + timestamp = datetime.fromtimestamp(ts_float, tz=timezone.utc) + except (ValueError, TypeError): + timestamp = datetime.now(tz=timezone.utc) + + # User info — Slack provides user_id; display name is enriched later + sender_user_id: str = event.get("user", "") or "" + is_bot = bool(event.get("bot_id") or event.get("subtype") == "bot_message") + + # Build the set of mentions present in the original text + mentions: list[str] = _BOT_MENTION_RE.findall(raw_text) + # Strip angle brackets from extracted tokens: <@U123> -> U123 + mentions = [m.strip("<>@") for m in mentions] + + return KonstructMessage( + 
id=str(uuid.uuid4()), + tenant_id=None, # Populated by Message Router + channel=ChannelType.SLACK, + channel_metadata={ + "workspace_id": workspace_id, + "channel_id": event.get("channel", ""), + "thread_ts": thread_ts, + "bot_user_id": bot_user_id, + "event_ts": ts_raw, + "channel_type": event.get("channel_type", ""), + }, + sender=SenderInfo( + user_id=sender_user_id, + display_name=sender_user_id, # Enriched later if needed + is_bot=is_bot, + ), + content=MessageContent( + text=clean_text, + mentions=mentions, + ), + timestamp=timestamp, + thread_id=thread_ts, + ) diff --git a/packages/gateway/gateway/verify.py b/packages/gateway/gateway/verify.py new file mode 100644 index 0000000..0f6a72e --- /dev/null +++ b/packages/gateway/gateway/verify.py @@ -0,0 +1,64 @@ +""" +Slack request signature verification. + +slack-bolt's AsyncApp handles signature verification automatically when +initialized with a signing_secret. This module provides a standalone +helper for contexts that require manual verification (e.g., testing, +custom middleware layers). + +In production, prefer slack-bolt's built-in verification — do NOT disable +it or bypass it. +""" + +from __future__ import annotations + +import hashlib +import hmac +import time + + +def verify_slack_signature( + body: bytes, + timestamp: str, + signature: str, + signing_secret: str, + max_age_seconds: int = 300, +) -> bool: + """ + Verify a Slack webhook request signature. + + Implements Slack's signing secret verification algorithm: + https://api.slack.com/authentication/verifying-requests-from-slack + + Args: + body: Raw request body bytes. + timestamp: Value of the ``X-Slack-Request-Timestamp`` header. + signature: Value of the ``X-Slack-Signature`` header. + signing_secret: App's signing secret from Slack dashboard. + max_age_seconds: Reject requests older than this (replay protection). + + Returns: + True if signature is valid and request is fresh, False otherwise. 
+ """ + # Replay attack prevention — reject stale requests + try: + request_age = abs(int(time.time()) - int(timestamp)) + except (ValueError, TypeError): + return False + + if request_age > max_age_seconds: + return False + + # Compute expected signature + sig_basestring = f"v0:{timestamp}:{body.decode('utf-8', errors='replace')}" + computed = ( + "v0=" + + hmac.new( + signing_secret.encode("utf-8"), + sig_basestring.encode("utf-8"), + hashlib.sha256, + ).hexdigest() + ) + + # Constant-time comparison to prevent timing attacks + return hmac.compare_digest(computed, signature) diff --git a/packages/gateway/pyproject.toml b/packages/gateway/pyproject.toml index d87b4f5..641ca2c 100644 --- a/packages/gateway/pyproject.toml +++ b/packages/gateway/pyproject.toml @@ -9,14 +9,19 @@ description = "Channel Gateway — unified ingress for all messaging platforms" requires-python = ">=3.12" dependencies = [ "konstruct-shared", + "konstruct-router", + "konstruct-orchestrator", "fastapi[standard]>=0.115.0", "slack-bolt>=1.22.0", "python-telegram-bot>=21.0", "httpx>=0.28.0", + "redis>=5.0.0", ] [tool.uv.sources] konstruct-shared = { workspace = true } +konstruct-router = { workspace = true } +konstruct-orchestrator = { workspace = true } [tool.hatch.build.targets.wheel] packages = ["gateway"] diff --git a/packages/orchestrator/orchestrator/tasks.py b/packages/orchestrator/orchestrator/tasks.py index 63f893b..a4f9d1c 100644 --- a/packages/orchestrator/orchestrator/tasks.py +++ b/packages/orchestrator/orchestrator/tasks.py @@ -34,16 +34,26 @@ def handle_message(self, message_data: dict) -> dict: # type: ignore[no-untyped Process an inbound Konstruct message through the agent pipeline. This task is the primary entry point for the Celery worker. It is dispatched - by the Message Router (or Channel Gateway in simple deployments) after tenant - resolution completes. + by the Channel Gateway after tenant resolution completes. 
+ + The ``message_data`` dict MAY contain extra keys beyond KonstructMessage + fields. Specifically, the Slack handler injects: + - ``placeholder_ts``: Slack message timestamp of the "Thinking..." placeholder + - ``channel_id``: Slack channel ID where the response should be posted + + These are extracted before KonstructMessage validation and used to update + the placeholder with the real LLM response via chat.update. Pipeline: - 1. Deserialize message_data -> KonstructMessage - 2. Run async agent pipeline via asyncio.run() - 3. Return response dict + 1. Extract Slack reply metadata (placeholder_ts, channel_id) if present + 2. Deserialize message_data -> KonstructMessage + 3. Run async agent pipeline via asyncio.run() + 4. If Slack metadata present: call chat.update to replace placeholder + 5. Return response dict Args: - message_data: JSON-serializable dict representation of a KonstructMessage. + message_data: JSON-serializable dict. Must contain KonstructMessage + fields plus optional ``placeholder_ts`` and ``channel_id``. 
Returns: Dict with keys: @@ -51,25 +61,40 @@ def handle_message(self, message_data: dict) -> dict: # type: ignore[no-untyped - response (str): Agent's response text - tenant_id (str | None): Tenant that handled the message """ + # Extract Slack-specific reply metadata before model validation + # (KonstructMessage doesn't know about these fields) + placeholder_ts: str = message_data.pop("placeholder_ts", "") or "" + channel_id: str = message_data.pop("channel_id", "") or "" + try: msg = KonstructMessage.model_validate(message_data) except Exception as exc: logger.exception("Failed to deserialize KonstructMessage: %s", message_data) raise self.retry(exc=exc) - result = asyncio.run(_process_message(msg)) + result = asyncio.run(_process_message(msg, placeholder_ts=placeholder_ts, channel_id=channel_id)) return result -async def _process_message(msg: KonstructMessage) -> dict: +async def _process_message( + msg: KonstructMessage, + placeholder_ts: str = "", + channel_id: str = "", +) -> dict: """ Async agent pipeline — load agent config, build prompt, call LLM pool. + After getting the LLM response, if Slack placeholder metadata is present, + updates the "Thinking..." placeholder message with the real response using + Slack's chat.update API. + This function is called from the synchronous handle_message task via asyncio.run(). It must not be called directly from Celery task code. Args: - msg: The deserialized KonstructMessage. + msg: The deserialized KonstructMessage. + placeholder_ts: Slack message timestamp of the "Thinking..." placeholder. + channel_id: Slack channel ID for the chat.update call. Returns: Dict with message_id, response, and tenant_id. 
@@ -94,6 +119,8 @@ async def _process_message(msg: KonstructMessage) -> dict: tenant_uuid = uuid.UUID(msg.tenant_id) token = current_tenant_id.set(tenant_uuid) + slack_bot_token: str = "" + try: agent: Agent | None = None async with async_session_factory() as session: @@ -107,6 +134,21 @@ ) result = await session.execute(stmt) agent = result.scalars().first() + + # Load the bot token for this tenant from channel_connections config + if placeholder_ts and channel_id: + from shared.models.tenant import ChannelConnection, ChannelTypeEnum + + conn_stmt = ( + select(ChannelConnection) + .where(ChannelConnection.tenant_id == tenant_uuid) + .where(ChannelConnection.channel_type == ChannelTypeEnum.SLACK) + .limit(1) + ) + conn_result = await session.execute(conn_stmt) + conn = conn_result.scalars().first() + if conn and conn.config: + slack_bot_token = conn.config.get("bot_token", "") finally: # Always reset the RLS context var after DB work is done current_tenant_id.reset(token) @@ -117,9 +159,17 @@ msg.tenant_id, msg.id, ) + no_agent_response = "No active agent is configured for your workspace. Please contact your administrator." + if placeholder_ts and channel_id: + await _update_slack_placeholder( + bot_token=slack_bot_token, + channel_id=channel_id, + placeholder_ts=placeholder_ts, + text=no_agent_response, + ) return { "message_id": msg.id, - "response": "No active agent is configured for your workspace. Please contact your administrator.", + "response": no_agent_response, "tenant_id": msg.tenant_id, } @@ -132,8 +182,78 @@ msg.tenant_id, ) + # Replace the "Thinking..." 
placeholder with the real response + if placeholder_ts and channel_id: + await _update_slack_placeholder( + bot_token=slack_bot_token, + channel_id=channel_id, + placeholder_ts=placeholder_ts, + text=response_text, + ) + return { "message_id": msg.id, "response": response_text, "tenant_id": msg.tenant_id, } + + +async def _update_slack_placeholder( + bot_token: str, + channel_id: str, + placeholder_ts: str, + text: str, +) -> None: + """ + Replace the "Thinking..." placeholder message with the real agent response. + + Uses Slack's chat.update API via httpx (no slack-bolt dependency in + orchestrator — keeps the service boundary clean). + + Per user decision: responses are always posted in threads (thread_ts is + set to placeholder_ts — the placeholder was posted in-thread). + + Args: + bot_token: Slack bot token (xoxb-...) for this tenant. + channel_id: Slack channel ID where the placeholder was posted. + placeholder_ts: Slack message timestamp of the placeholder to replace. + text: The real LLM response to replace the placeholder with. + """ + import httpx + + if not bot_token: + # No bot token available — cannot update via Slack API. + # This happens when channel_connections has no bot_token in config. + # Log and continue — the placeholder will remain as "Thinking...". 
+ logger.warning( + "No Slack bot token for channel=%s placeholder_ts=%s — cannot update placeholder", + channel_id, + placeholder_ts, + ) + return + + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + response = await client.post( + "https://slack.com/api/chat.update", + headers={"Authorization": f"Bearer {bot_token}"}, + json={ + "channel": channel_id, + "ts": placeholder_ts, + "text": text, + }, + ) + data = response.json() + if not data.get("ok"): + logger.error( + "chat.update failed: channel=%s ts=%s error=%r", + channel_id, + placeholder_ts, + data.get("error"), + ) + except Exception: + logger.exception( + "Failed to update Slack placeholder: channel=%s ts=%s", + channel_id, + placeholder_ts, + ) diff --git a/packages/router/pyproject.toml b/packages/router/pyproject.toml index 7704478..d73d2b9 100644 --- a/packages/router/pyproject.toml +++ b/packages/router/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "konstruct-shared", "fastapi[standard]>=0.115.0", "httpx>=0.28.0", + "redis>=5.0.0", ] [tool.uv.sources] diff --git a/packages/router/router/__init__.py b/packages/router/router/__init__.py new file mode 100644 index 0000000..5581cf3 --- /dev/null +++ b/packages/router/router/__init__.py @@ -0,0 +1,6 @@ +""" +Konstruct Message Router. + +Handles tenant resolution, rate limiting, idempotency deduplication, +and context loading before dispatching to the Agent Orchestrator. +""" diff --git a/packages/router/router/context.py b/packages/router/router/context.py new file mode 100644 index 0000000..5f932e8 --- /dev/null +++ b/packages/router/router/context.py @@ -0,0 +1,76 @@ +""" +Agent context loading. + +Loads the active agent for a tenant before message processing. Phase 1 supports +a single agent per tenant. The RLS context variable must be set before calling +any function here so that PostgreSQL RLS filters correctly. 
+""" + +from __future__ import annotations + +import logging +import uuid + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.models.tenant import Agent +from shared.rls import current_tenant_id + +logger = logging.getLogger(__name__) + + +async def load_agent_for_tenant( + tenant_id: str, + session: AsyncSession, +) -> Agent | None: + """ + Load the active agent for a tenant. + + Sets the ``current_tenant_id`` ContextVar so that PostgreSQL RLS policies + correctly filter the agents table to only return rows belonging to this + tenant. + + Phase 1: Returns the first active agent for the tenant (single-agent model). + Phase 2+: Will support agent selection based on message content and routing + rules. + + Args: + tenant_id: Konstruct tenant ID as a UUID string. + session: Async SQLAlchemy session. + + Returns: + The active Agent ORM instance, or None if no active agent is configured. + """ + try: + tenant_uuid = uuid.UUID(tenant_id) + except (ValueError, AttributeError): + logger.error("load_agent_for_tenant: invalid tenant_id=%r", tenant_id) + return None + + # Set RLS context so the DB query is correctly scoped to this tenant + token = current_tenant_id.set(tenant_uuid) + try: + stmt = ( + select(Agent) + .where(Agent.tenant_id == tenant_uuid) + .where(Agent.is_active.is_(True)) + .limit(1) + ) + result = await session.execute(stmt) + agent = result.scalars().first() + except Exception: + logger.exception( + "load_agent_for_tenant: DB error for tenant=%s", tenant_id + ) + return None + finally: + # Always reset the RLS context var after DB work completes + current_tenant_id.reset(token) + + if agent is None: + logger.warning( + "load_agent_for_tenant: no active agent for tenant=%s", tenant_id + ) + + return agent diff --git a/packages/router/router/idempotency.py b/packages/router/router/idempotency.py new file mode 100644 index 0000000..6c1ba4e --- /dev/null +++ b/packages/router/router/idempotency.py @@ -0,0 +1,87 @@ 
+""" +Message deduplication (idempotency). + +Slack (and other channels) retry event delivery when the gateway does not +respond with HTTP 200 within 3 seconds. This module tracks which message +IDs have already been dispatched to Celery, preventing duplicate processing. + +Design: + - Key: {tenant_id}:dedup:{message_id} (from shared.redis_keys) + - TTL: 24 hours (Slack retries stop after ~1 hour; 24h is conservative) + - Op: SET NX (atomic check-and-set) +""" + +from __future__ import annotations + +import logging + +from redis.asyncio import Redis + +from shared.redis_keys import idempotency_key + +logger = logging.getLogger(__name__) + +# How long to remember a message ID (seconds). +# Slack retries for up to ~1 hour; 24h gives plenty of buffer. +_DEDUP_TTL_SECONDS = 86400 # 24 hours + + +async def is_duplicate( + tenant_id: str, + message_id: str, + redis: Redis, # type: ignore[type-arg] +) -> bool: + """ + Check if this message has already been dispatched for processing. + + Uses SET NX (set-if-not-exists) as an atomic check-and-mark operation. + If the key did not exist, it is created with a 24-hour TTL and this + function returns False (not a duplicate — process it). + If the key already existed, this function returns True (duplicate — skip). + + Args: + tenant_id: Konstruct tenant identifier. + message_id: Unique message identifier (e.g. Slack event_ts or UUID). + redis: Async Redis client. + + Returns: + True if this message is a duplicate (already dispatched). + False if this is the first time we've seen this message. 
+ """ + key = idempotency_key(tenant_id, message_id) + + # SET key "1" NX EX ttl — returns True if key was set (new), None if key existed + was_set = await redis.set(key, "1", nx=True, ex=_DEDUP_TTL_SECONDS) + + if was_set: + # Key was freshly created — this is NOT a duplicate + return False + + # Key already existed — this IS a duplicate + logger.info( + "Duplicate message detected: tenant=%s message_id=%s — skipping", + tenant_id, + message_id, + ) + return True + + +async def mark_processed( + tenant_id: str, + message_id: str, + redis: Redis, # type: ignore[type-arg] +) -> None: + """ + Explicitly mark a message as processed (without the duplicate check). + + Use this when you want to mark a message as seen without the + check-and-mark semantics of ``is_duplicate``. Typically you'll use + ``is_duplicate`` instead (which does both). + + Args: + tenant_id: Konstruct tenant identifier. + message_id: Unique message identifier. + redis: Async Redis client. + """ + key = idempotency_key(tenant_id, message_id) + await redis.set(key, "1", ex=_DEDUP_TTL_SECONDS) diff --git a/packages/router/router/main.py b/packages/router/router/main.py new file mode 100644 index 0000000..b933322 --- /dev/null +++ b/packages/router/router/main.py @@ -0,0 +1,24 @@ +""" +Message Router — FastAPI application. + +The router is an internal service. In the current architecture (Phase 1), +routing logic is embedded directly in the channel gateway handlers rather +than as a separate HTTP call. This FastAPI app provides a health endpoint +and is a placeholder for future standalone router deployments. 
+""" + +from __future__ import annotations + +from fastapi import FastAPI + +app = FastAPI( + title="Konstruct Message Router", + description="Tenant resolution, rate limiting, context loading", + version="0.1.0", +) + + +@app.get("/health") +async def health() -> dict[str, str]: + """Health check endpoint.""" + return {"status": "ok", "service": "router"} diff --git a/packages/router/router/ratelimit.py b/packages/router/router/ratelimit.py new file mode 100644 index 0000000..076e0c9 --- /dev/null +++ b/packages/router/router/ratelimit.py @@ -0,0 +1,121 @@ +""" +Redis token bucket rate limiter. + +Implements a sliding window token bucket using Redis atomic operations. + +Design: + - Key: {tenant_id}:ratelimit:{channel} (from shared.redis_keys) + - Window: configurable (default 60s) + - Tokens: configurable (default 30 per window per tenant per channel) + - Storage: INCR + EXPIRE (atomic via pipeline) + +The token bucket approach: + 1. INCR the counter key + 2. If count == 1, set EXPIRE (first request in window — starts the clock) + 3. If count > limit: raise RateLimitExceeded + 4. Otherwise: return True (request allowed) + +This is NOT a sliding window (it's a fixed window with INCR/EXPIRE) — it's +simple, Redis-atomic, and correct enough for Phase 1. A true sliding window +can be implemented with ZADD/ZREMRANGEBYSCORE later if needed. +""" + +from __future__ import annotations + +import logging + +from redis.asyncio import Redis + +from shared.redis_keys import rate_limit_key + +logger = logging.getLogger(__name__) + +# Default rate limit configuration — override per-tenant in Phase 2 +_DEFAULT_LIMIT = 30 # Max requests per window +_DEFAULT_WINDOW = 60 # Window duration in seconds + + +class RateLimitExceeded(Exception): + """ + Raised when a tenant's per-channel rate limit is exceeded. + + Attributes: + tenant_id: The tenant that exceeded the limit. + channel: The channel that hit the limit. 
# Default rate limit configuration — override per-tenant in Phase 2.
_DEFAULT_LIMIT = 30  # Max requests per window
_DEFAULT_WINDOW = 60  # Window duration in seconds


class RateLimitExceeded(Exception):
    """
    Raised when a tenant's per-channel rate limit is exceeded.

    Attributes:
        tenant_id: The tenant that exceeded the limit.
        channel: The channel that hit the limit.
        remaining_seconds: Approximate TTL on the rate limit key (how long
            until the window resets).
    """

    def __init__(
        self,
        tenant_id: str,
        channel: str,
        remaining_seconds: int = 60,
    ) -> None:
        self.tenant_id = tenant_id
        self.channel = channel
        self.remaining_seconds = remaining_seconds
        detail = (
            f"Rate limit exceeded for tenant={tenant_id} channel={channel}. "
            f"Resets in ~{remaining_seconds}s."
        )
        super().__init__(detail)


async def check_rate_limit(
    tenant_id: str,
    channel: str,
    redis: Redis,  # type: ignore[type-arg]
    limit: int = _DEFAULT_LIMIT,
    window_seconds: int = _DEFAULT_WINDOW,
) -> bool:
    """
    Check whether the tenant-channel combination is within its rate limit.

    Fixed-window counter: INCR and TTL run atomically in one MULTI/EXEC
    transaction. The first request in a new window starts the TTL clock;
    once the counter exceeds ``limit``, RateLimitExceeded is raised with
    the remaining window TTL.

    Args:
        tenant_id: Konstruct tenant identifier.
        channel: Channel string (e.g. "slack").
        redis: Async Redis client.
        limit: Maximum requests per window (default 30).
        window_seconds: Window duration in seconds (default 60).

    Returns:
        True if the request is allowed.

    Raises:
        RateLimitExceeded: If the request exceeds the limit.
    """
    bucket_key = rate_limit_key(tenant_id, channel)

    # INCR + TTL executed atomically in a single transactional pipeline.
    pipe = redis.pipeline(transaction=True)
    pipe.incr(bucket_key)
    pipe.ttl(bucket_key)
    hits, window_ttl = await pipe.execute()

    # First hit of a fresh window — or a key that lost its expiry (e.g. a
    # crash between a previous INCR and EXPIRE) — start the clock now.
    if hits == 1 or window_ttl == -1:
        await redis.expire(bucket_key, window_seconds)
        window_ttl = window_seconds

    if hits <= limit:
        return True

    remaining = max(window_ttl, 0)
    logger.warning(
        "Rate limit exceeded: tenant=%s channel=%s count=%d limit=%d ttl=%d",
        tenant_id,
        channel,
        hits,
        limit,
        remaining,
    )
    raise RateLimitExceeded(tenant_id, channel, remaining_seconds=remaining)
async def resolve_tenant(
    workspace_id: str,
    channel_type: ChannelType | str,
    session: AsyncSession,
) -> str | None:
    """
    Resolve a channel workspace ID to a Konstruct tenant ID.

    This is deliberately an RLS-bypass lookup — the tenant cannot be set in
    ``app.current_tenant`` until after it has been resolved. Because
    ``current_tenant_id`` defaults to None here, the RLS hook injects no
    SET LOCAL, and a raw ``SET LOCAL app.current_tenant = ''`` is issued as
    well to make sure no tenant policy filters ``channel_connections``.

    Args:
        workspace_id: Channel-native workspace identifier (e.g. Slack T12345).
        channel_type: Channel type as ChannelType enum or string.
        session: Async SQLAlchemy session.

    Returns:
        Tenant ID as a string (UUID), or None if no matching connection found.
    """
    normalized = str(channel_type).lower()
    orm_channel = _CHANNEL_TYPE_MAP.get(normalized)
    if orm_channel is None:
        logger.warning("resolve_tenant: unknown channel_type=%r", channel_type)
        return None

    try:
        # Clear app.current_tenant for this transaction so the tenant policy
        # matches nothing and the cross-tenant lookup sees all rows.
        await session.execute(text("SET LOCAL app.current_tenant = ''"))

        lookup = (
            select(ChannelConnection.tenant_id)
            .where(
                ChannelConnection.channel_type == orm_channel,
                ChannelConnection.workspace_id == workspace_id,
            )
            .limit(1)
        )
        tenant_uuid = (await session.execute(lookup)).scalar_one_or_none()
    except Exception:
        logger.exception(
            "resolve_tenant: DB error workspace_id=%r channel=%r",
            workspace_id,
            channel_type,
        )
        return None

    if tenant_uuid is None:
        logger.debug(
            "resolve_tenant: no match workspace_id=%r channel=%r",
            workspace_id,
            channel_type,
        )
        return None

    return str(tenant_uuid)
= ">=5.0.0" }, ] [[package]]