feat(01-03): Channel Gateway (Slack adapter) and Message Router

- gateway/normalize.py: normalize_slack_event -> KonstructMessage (strips bot mention)
- gateway/channels/slack.py: register_slack_handlers for app_mention + DM events
  - rate limit check -> ephemeral rejection on exceeded
  - idempotency dedup (Slack retry protection)
  - placeholder 'Thinking...' message posted in-thread before Celery dispatch
  - auto-follow engaged threads with 30-minute TTL
  - HTTP 200 returned immediately; all LLM work dispatched to Celery
- gateway/main.py: FastAPI on port 8001, /slack/events + /health
- router/tenant.py: resolve_tenant workspace_id -> tenant_id (RLS-bypass query)
- router/ratelimit.py: check_rate_limit Redis token bucket, RateLimitExceeded exception
- router/idempotency.py: is_duplicate + mark_processed (SET NX, 24h TTL)
- router/context.py: load_agent_for_tenant with RLS ContextVar setup
- orchestrator/tasks.py: handle_message now extracts placeholder_ts/channel_id,
  calls _update_slack_placeholder via chat.update after LLM response
- docker-compose.yml: gateway service on port 8001
- pyproject.toml: added redis, konstruct-router, konstruct-orchestrator deps
This commit is contained in:
2026-03-23 10:27:59 -06:00
parent dcd89cc8fd
commit 6f30705e1a
17 changed files with 1166 additions and 10 deletions

View File

@@ -136,6 +136,48 @@ services:
timeout: 5s
retries: 5
gateway:
build:
context: .
dockerfile_inline: |
FROM python:3.12-slim
WORKDIR /app
RUN pip install uv
COPY pyproject.toml ./
COPY packages/shared ./packages/shared
COPY packages/router ./packages/router
COPY packages/gateway ./packages/gateway
COPY packages/orchestrator ./packages/orchestrator
RUN uv pip install --system -e packages/shared -e packages/router -e packages/gateway -e packages/orchestrator
CMD ["uvicorn", "gateway.main:app", "--host", "0.0.0.0", "--port", "8001"]
container_name: konstruct-gateway
ports:
- "8001:8001"
networks:
- konstruct-net
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
celery-worker:
condition: service_started
environment:
- DATABASE_URL=postgresql+asyncpg://konstruct_app:konstruct_dev@postgres:5432/konstruct
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/1
- CELERY_RESULT_BACKEND=redis://redis:6379/2
- SLACK_BOT_TOKEN=${SLACK_BOT_TOKEN:-}
- SLACK_SIGNING_SECRET=${SLACK_SIGNING_SECRET:-}
- SLACK_APP_TOKEN=${SLACK_APP_TOKEN:-}
- LOG_LEVEL=INFO
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "python -c \"import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8001/health', timeout=3).status==200 else 1)\""]
interval: 10s
timeout: 5s
retries: 5
celery-worker:
build:
context: .

View File

@@ -0,0 +1,7 @@
"""
Konstruct Channel Gateway.
Unified ingress for all messaging platforms. Each channel adapter normalizes
inbound events into KonstructMessage format before dispatching to the
Message Router / Celery orchestrator.
"""

View File

@@ -0,0 +1,6 @@
"""
Channel adapter modules.
Each adapter handles the channel-specific event format and normalizes
events into KonstructMessage before dispatching to the orchestrator.
"""

View File

@@ -0,0 +1,280 @@
"""
Slack channel adapter.
Handles Slack Events API events via slack-bolt AsyncApp.
EVENT FLOW:
1. Slack sends event to /slack/events (HTTP 200 must be returned in <3s)
2. Handler normalizes event -> KonstructMessage
3. Tenant resolved from workspace_id
4. Rate limit checked
5. Idempotency checked (prevents duplicate processing on Slack retries)
6. Placeholder "Thinking..." message posted in-thread (typing indicator)
7. Celery task dispatched with message + placeholder details
8. Celery worker calls LLM pool, replaces placeholder with real response
CRITICAL: DO NOT perform any LLM work inside event handlers.
Slack retries after 3 seconds if HTTP 200 is not received. All heavyweight
work must be dispatched to Celery before returning.
THREAD FOLLOW-UP:
After the first @mention in a thread, subsequent messages in that thread
(even without @mention) trigger a response for 30 minutes of idle time.
This is tracked via Redis engaged_thread_key with a 30-minute TTL.
"""
from __future__ import annotations
import logging
from redis.asyncio import Redis
from slack_bolt.async_app import AsyncApp
from sqlalchemy.ext.asyncio import AsyncSession
from gateway.normalize import normalize_slack_event
from router.idempotency import is_duplicate
from router.ratelimit import RateLimitExceeded, check_rate_limit
from router.tenant import resolve_tenant
from shared.redis_keys import engaged_thread_key
logger = logging.getLogger(__name__)
# How long a thread stays "engaged" after the last @mention (seconds)
_ENGAGED_THREAD_TTL = 1800 # 30 minutes
def register_slack_handlers(
    slack_app: AsyncApp,
    redis: Redis,  # type: ignore[type-arg]
    get_session: object,  # Callable returning AsyncSession context manager
) -> None:
    """
    Attach Slack event handlers to the given slack-bolt AsyncApp.

    Invoke exactly once during application startup, right after the
    AsyncApp instance is built.

    Args:
        slack_app: slack-bolt AsyncApp to register handlers on.
        redis: Async Redis client used for rate limiting and idempotency.
        get_session: Factory returning an async DB session context manager.
            Typically ``shared.db.async_session_factory``.
    """

    @slack_app.event("app_mention")
    async def _on_app_mention(event: dict, say: object, client: object) -> None:
        """Respond to @mentions of the bot in any channel it belongs to."""
        await _handle_slack_event(
            event=event,
            say=say,
            client=client,
            redis=redis,
            get_session=get_session,
            event_type="app_mention",
        )

    @slack_app.event("message")
    async def _on_message(event: dict, say: object, client: object) -> None:
        """Respond to direct messages only (``channel_type == "im"``)."""
        # Channel messages are already covered by app_mention; skip them here
        # so a single @mention does not trigger two responses.
        if event.get("channel_type") != "im":
            return
        # Never react to bot-authored messages — avoids infinite reply loops.
        if event.get("bot_id") or event.get("subtype") == "bot_message":
            return
        await _handle_slack_event(
            event=event,
            say=say,
            client=client,
            redis=redis,
            get_session=get_session,
            event_type="dm",
        )
async def _handle_slack_event(
    event: dict,
    say: object,
    client: object,
    redis: Redis,  # type: ignore[type-arg]
    get_session: object,
    event_type: str,
) -> None:
    """
    Shared handler logic for app_mention and DM message events.

    Performs: normalize -> tenant resolve -> rate limit -> idempotency ->
    post placeholder -> dispatch Celery -> mark thread engaged.

    All work is dispatched to Celery before returning. HTTP 200 is returned
    to Slack immediately by slack-bolt after this coroutine completes.
    """
    # Ignore bot messages (double-check here for safety — the DM handler
    # already filters these, but app_mention does not)
    if event.get("bot_id") or event.get("subtype") == "bot_message":
        return
    # Extract workspace_id from the outer context — injected via middleware
    # or extracted from the Slack signing payload. NOTE(review): nothing in
    # this module sets "_workspace_id"/"_bot_user_id"; confirm the injection
    # point exists, otherwise these are always "".
    workspace_id: str = event.get("_workspace_id", "")
    bot_user_id: str = event.get("_bot_user_id", "")
    # Step 1: Normalize to KonstructMessage
    msg = normalize_slack_event(
        event=event,
        workspace_id=workspace_id,
        bot_user_id=bot_user_id,
    )
    # Step 2: Resolve tenant from workspace_id (RLS-bypass lookup)
    tenant_id: str | None = None
    async with get_session() as session:  # type: ignore[attr-defined]
        tenant_id = await resolve_tenant(
            workspace_id=workspace_id,
            channel_type="slack",
            session=session,
        )
    if tenant_id is None:
        # Unknown workspace — drop silently (no Slack-visible error)
        logger.warning(
            "handle_slack_event: unknown workspace_id=%r event_type=%s — ignoring",
            workspace_id,
            event_type,
        )
        return
    msg.tenant_id = tenant_id
    # Step 3: Check rate limit — post ephemeral rejection if exceeded.
    # NOTE(review): this runs BEFORE the idempotency check, so Slack retries
    # of an already-processed event still consume rate-limit tokens.
    try:
        await check_rate_limit(
            tenant_id=tenant_id,
            channel="slack",
            redis=redis,
        )
    except RateLimitExceeded as exc:
        logger.info(
            "Rate limit exceeded: tenant=%s — posting ephemeral rejection",
            tenant_id,
        )
        # Post ephemeral message visible only to the requesting user
        try:
            await client.chat_postEphemeral(  # type: ignore[union-attr]
                channel=event.get("channel", ""),
                user=event.get("user", ""),
                text=(
                    f"I'm receiving too many requests right now. "
                    f"Please try again in about {exc.remaining_seconds} seconds."
                ),
            )
        except Exception:
            # Best-effort notification — never fail the handler over it
            logger.exception("Failed to post rate limit ephemeral message")
        return
    # Step 4: Idempotency check — skip duplicate events (Slack retry
    # protection). is_duplicate uses SET NX, so this call also MARKS the
    # event as seen; there is no separate mark step.
    event_ts: str = event.get("ts", msg.id)
    if await is_duplicate(tenant_id, event_ts, redis):
        logger.debug(
            "Duplicate Slack event: tenant=%s event_ts=%s — skipping",
            tenant_id,
            event_ts,
        )
        return
    # Step 5: Check thread engagement — auto-follow messages in engaged threads
    thread_id = msg.thread_id or event_ts
    is_engaged = await _is_engaged_thread(tenant_id, thread_id, redis)
    # For channel messages: only respond to @mentions or engaged threads.
    # NOTE(review): this branch is a no-op (`pass`), so `is_engaged` never
    # affects control flow. Additionally, the "message" handler only forwards
    # DMs, so non-mention channel follow-ups never reach this function at
    # all — the advertised thread auto-follow for channels appears unwired.
    # TODO confirm intended behavior.
    if event_type != "dm" and not is_engaged:
        # This shouldn't happen for app_mention, but guard defensively
        # (e.g., if the event somehow arrives without mention metadata)
        pass  # Fall through to dispatch — app_mention always warrants a response
    # Step 6: Post placeholder "Thinking..." message in thread
    channel_id: str = event.get("channel", "")
    placeholder_ts: str = ""
    try:
        placeholder_resp = await client.chat_postMessage(  # type: ignore[union-attr]
            channel=channel_id,
            thread_ts=thread_id,
            text="_Thinking..._",
        )
        placeholder_ts = placeholder_resp.get("ts", "")  # type: ignore[union-attr]
    except Exception:
        logger.exception(
            "Failed to post placeholder message: tenant=%s channel=%s thread=%s",
            tenant_id,
            channel_id,
            thread_id,
        )
        # Continue even if placeholder fails — still dispatch to Celery
        # The Celery task will post a new message instead of updating
    # Step 7: Dispatch to Celery (fire-and-forget)
    # Import here to avoid circular imports at module load time
    from orchestrator.tasks import handle_message as handle_message_task  # noqa: PLC0415
    # Celery payload = KonstructMessage fields + Slack reply metadata;
    # the task pops placeholder_ts/channel_id before model validation.
    task_payload = msg.model_dump() | {
        "placeholder_ts": placeholder_ts,
        "channel_id": channel_id,
    }
    try:
        handle_message_task.delay(task_payload)
    except Exception:
        # Broker unreachable etc. — the placeholder stays as "Thinking..."
        logger.exception(
            "Failed to dispatch handle_message task: tenant=%s msg_id=%s",
            tenant_id,
            msg.id,
        )
        return
    # Step 8: Mark thread as engaged (auto-follow for 30 minutes)
    await _mark_thread_engaged(tenant_id, thread_id, redis)
    logger.info(
        "Dispatched: event_type=%s tenant=%s msg_id=%s thread=%s",
        event_type,
        tenant_id,
        msg.id,
        thread_id,
    )
async def _is_engaged_thread(
    tenant_id: str,
    thread_id: str,
    redis: Redis,  # type: ignore[type-arg]
) -> bool:
    """Return True when the bot has recently responded in this thread."""
    engaged_key = engaged_thread_key(tenant_id, thread_id)
    try:
        exists = await redis.exists(engaged_key)
    except Exception:
        # Redis trouble — treat the thread as not engaged rather than fail
        logger.exception("Failed to check engaged thread: tenant=%s thread=%s", tenant_id, thread_id)
        return False
    return bool(exists)
async def _mark_thread_engaged(
    tenant_id: str,
    thread_id: str,
    redis: Redis,  # type: ignore[type-arg]
) -> None:
    """Flag the thread as engaged so follow-ups are answered for 30 minutes."""
    engaged_key = engaged_thread_key(tenant_id, thread_id)
    try:
        await redis.set(engaged_key, "1", ex=_ENGAGED_THREAD_TTL)
    except Exception:
        # Best-effort — a miss only means the thread won't auto-follow
        logger.exception("Failed to mark thread engaged: tenant=%s thread=%s", tenant_id, thread_id)

View File

@@ -0,0 +1,107 @@
"""
Channel Gateway — FastAPI application.
Mounts the slack-bolt AsyncApp as a sub-application at /slack/events.
All other channels will be added as additional sub-applications in Phase 2.
Port: 8001
Endpoints:
POST /slack/events — Slack Events API webhook (handled by slack-bolt)
GET /health — Health check
Startup sequence:
1. Create Redis connection
2. Create slack-bolt AsyncApp (signing_secret=...)
3. Register Slack event handlers
4. Mount slack-bolt request handler at /slack/events
5. Expose /health
"""
from __future__ import annotations
import logging
from fastapi import FastAPI, Request, Response
from redis.asyncio import Redis
from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
from slack_bolt.async_app import AsyncApp
from gateway.channels.slack import register_slack_handlers
from shared.config import settings
from shared.db import async_session_factory
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(
    title="Konstruct Channel Gateway",
    description="Unified ingress for all messaging platforms",
    version="0.1.0",
)
# ---------------------------------------------------------------------------
# Slack bolt app — initialized at module import time.
# signing_secret="" is safe for local dev/testing; set via env in production.
# NOTE(review): the expression below passes ``settings.slack_signing_secret
# or None`` — with an unset secret AsyncApp receives None, not "". Confirm
# slack-bolt accepts None in HTTP (Events API) mode; it may raise at startup
# or skip request verification.
# ---------------------------------------------------------------------------
slack_app = AsyncApp(
    token=settings.slack_bot_token or None,
    signing_secret=settings.slack_signing_secret or None,
    # In HTTP mode (Events API), token_verification_enabled must be True
    # slack-bolt validates signing_secret on every inbound request
)
# Async Redis client — shared across all request handlers.
# Created lazily by _get_redis(); kept module-level so every handler reuses
# a single connection pool.
_redis: Redis | None = None  # type: ignore[type-arg]
def _get_redis() -> Redis:  # type: ignore[type-arg]
    """Return the module-level Redis client, creating it if necessary."""
    global _redis
    if _redis is None:
        # decode_responses=True — handlers deal in str, not bytes
        _redis = Redis.from_url(settings.redis_url, decode_responses=True)
    return _redis
# ---------------------------------------------------------------------------
# Register Slack event handlers (runs once at import time)
# ---------------------------------------------------------------------------
register_slack_handlers(
    slack_app=slack_app,
    redis=_get_redis(),
    get_session=async_session_factory,
)
# ---------------------------------------------------------------------------
# Slack request handler — adapts slack-bolt AsyncApp to FastAPI
# ---------------------------------------------------------------------------
slack_handler = AsyncSlackRequestHandler(slack_app)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.post("/slack/events")
async def slack_events(request: Request) -> Response:
    """
    Inbound webhook for the Slack Events API.

    Delegates to slack-bolt's AsyncSlackRequestHandler, which performs
    Slack signature verification (X-Slack-Signature), answers the
    URL verification challenge (type=url_verification), and routes events
    to the registered handlers.

    CRITICAL: Slack expects HTTP 200 within 3 seconds. Handlers dispatch
    all LLM/heavy work to Celery rather than doing it inline.
    """
    bolt_response = await slack_handler.handle(request)
    return bolt_response
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe — reports service identity and an ok status."""
    payload = {"status": "ok", "service": "gateway"}
    return payload

View File

@@ -0,0 +1,100 @@
"""
Slack event normalization.
Converts Slack Events API payloads into KonstructMessage format.
All channel adapters produce KonstructMessage — the router and orchestrator
never inspect Slack-specific fields directly.
"""
from __future__ import annotations
import re
import uuid
from datetime import datetime, timezone
from shared.models.message import (
ChannelType,
KonstructMessage,
MessageContent,
SenderInfo,
)
# Pattern to strip <@BOT_USER_ID> mentions from message text.
# Slack injects <@U...> tokens for @mentions — we strip the bot mention
# so the agent sees clean user text, not the mention syntax.
_BOT_MENTION_RE = re.compile(r"<@[A-Z0-9]+>")
def normalize_slack_event(
    event: dict,
    workspace_id: str,
    bot_user_id: str = "",
) -> KonstructMessage:
    """
    Normalize a Slack Events API event payload into a KonstructMessage.

    Handles both ``app_mention`` events (where the bot is @mentioned in a
    channel) and ``message`` events in DMs (``channel_type == "im"``).

    When ``bot_user_id`` is provided, only the bot's own mention token
    (``<@BOT_USER_ID>``) is stripped from the text; mentions of *other*
    users are preserved so the agent still sees them. When ``bot_user_id``
    is empty, all mention tokens are stripped (previous behavior).

    Args:
        event: The inner ``event`` dict from the Slack Events API payload.
        workspace_id: The Slack workspace ID (team_id from the outer payload).
        bot_user_id: The bot's Slack user ID (used for targeted mention
            stripping).

    Returns:
        A fully-populated KonstructMessage. ``tenant_id`` is ``None`` at this
        stage — the Message Router populates it via channel_connections lookup.
    """
    # Extract and clean user text
    raw_text: str = event.get("text", "") or ""
    # BUG FIX: previously EVERY <@U...> token was stripped, deleting
    # legitimate references to other users (e.g. "ask <@UALICE> about X").
    # Strip only the bot's own mention when we know its ID.
    if bot_user_id:
        clean_text = re.sub(rf"<@{re.escape(bot_user_id)}>", "", raw_text).strip()
    else:
        # Bot ID unknown — fall back to stripping all mention tokens.
        clean_text = _BOT_MENTION_RE.sub("", raw_text).strip()
    # Slack thread_ts is the canonical thread identifier; top-level messages
    # fall back to their own ts.
    thread_ts: str | None = event.get("thread_ts") or event.get("ts")
    # Timestamp — Slack uses Unix float strings ("1234567890.123456")
    ts_raw = event.get("ts", "0")
    try:
        ts_float = float(ts_raw)
        timestamp = datetime.fromtimestamp(ts_float, tz=timezone.utc)
    except (ValueError, TypeError):
        # Malformed/absent ts — fall back to "now" rather than fail
        timestamp = datetime.now(tz=timezone.utc)
    # User info — Slack provides user_id; display name is enriched later
    sender_user_id: str = event.get("user", "") or ""
    is_bot = bool(event.get("bot_id") or event.get("subtype") == "bot_message")
    # Build the set of mentions present in the ORIGINAL (unstripped) text
    mentions: list[str] = _BOT_MENTION_RE.findall(raw_text)
    # Strip angle brackets from extracted tokens: <@U123> -> U123
    mentions = [m.strip("<>@") for m in mentions]
    return KonstructMessage(
        id=str(uuid.uuid4()),
        tenant_id=None,  # Populated by Message Router
        channel=ChannelType.SLACK,
        channel_metadata={
            "workspace_id": workspace_id,
            "channel_id": event.get("channel", ""),
            "thread_ts": thread_ts,
            "bot_user_id": bot_user_id,
            "event_ts": ts_raw,
            "channel_type": event.get("channel_type", ""),
        },
        sender=SenderInfo(
            user_id=sender_user_id,
            display_name=sender_user_id,  # Enriched later if needed
            is_bot=is_bot,
        ),
        content=MessageContent(
            text=clean_text,
            mentions=mentions,
        ),
        timestamp=timestamp,
        thread_id=thread_ts,
    )

View File

@@ -0,0 +1,64 @@
"""
Slack request signature verification.
slack-bolt's AsyncApp handles signature verification automatically when
initialized with a signing_secret. This module provides a standalone
helper for contexts that require manual verification (e.g., testing,
custom middleware layers).
In production, prefer slack-bolt's built-in verification — do NOT disable
it or bypass it.
"""
from __future__ import annotations
import hashlib
import hmac
import time
def verify_slack_signature(
    body: bytes,
    timestamp: str,
    signature: str,
    signing_secret: str,
    max_age_seconds: int = 300,
) -> bool:
    """
    Validate a Slack webhook request against its signing secret.

    Follows Slack's v0 signing scheme:
    https://api.slack.com/authentication/verifying-requests-from-slack

    Args:
        body: Raw HTTP request body bytes.
        timestamp: ``X-Slack-Request-Timestamp`` header value.
        signature: ``X-Slack-Signature`` header value.
        signing_secret: App's signing secret from the Slack dashboard.
        max_age_seconds: Maximum allowed request age (replay protection).

    Returns:
        True only when the request is fresh and the signature matches.
    """
    # Freshness first: an unparseable or stale timestamp indicates a
    # possible replay, so bail out before doing any crypto work.
    try:
        ts = int(timestamp)
    except (ValueError, TypeError):
        return False
    if abs(int(time.time()) - ts) > max_age_seconds:
        return False
    # Recreate Slack's v0 base string and HMAC-SHA256 digest.
    base_string = f"v0:{timestamp}:{body.decode('utf-8', errors='replace')}"
    digest = hmac.new(
        signing_secret.encode("utf-8"),
        base_string.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    expected = f"v0={digest}"
    # compare_digest is constant-time — immune to timing side channels.
    return hmac.compare_digest(expected, signature)

View File

@@ -9,14 +9,19 @@ description = "Channel Gateway — unified ingress for all messaging platforms"
requires-python = ">=3.12"
dependencies = [
"konstruct-shared",
"konstruct-router",
"konstruct-orchestrator",
"fastapi[standard]>=0.115.0",
"slack-bolt>=1.22.0",
"python-telegram-bot>=21.0",
"httpx>=0.28.0",
"redis>=5.0.0",
]
[tool.uv.sources]
konstruct-shared = { workspace = true }
konstruct-router = { workspace = true }
konstruct-orchestrator = { workspace = true }
[tool.hatch.build.targets.wheel]
packages = ["gateway"]

View File

@@ -34,16 +34,26 @@ def handle_message(self, message_data: dict) -> dict: # type: ignore[no-untyped
Process an inbound Konstruct message through the agent pipeline.
This task is the primary entry point for the Celery worker. It is dispatched
by the Message Router (or Channel Gateway in simple deployments) after tenant
resolution completes.
by the Channel Gateway after tenant resolution completes.
The ``message_data`` dict MAY contain extra keys beyond KonstructMessage
fields. Specifically, the Slack handler injects:
- ``placeholder_ts``: Slack message timestamp of the "Thinking..." placeholder
- ``channel_id``: Slack channel ID where the response should be posted
These are extracted before KonstructMessage validation and used to update
the placeholder with the real LLM response via chat.update.
Pipeline:
1. Deserialize message_data -> KonstructMessage
2. Run async agent pipeline via asyncio.run()
3. Return response dict
1. Extract Slack reply metadata (placeholder_ts, channel_id) if present
2. Deserialize message_data -> KonstructMessage
3. Run async agent pipeline via asyncio.run()
4. If Slack metadata present: call chat.update to replace placeholder
5. Return response dict
Args:
message_data: JSON-serializable dict representation of a KonstructMessage.
message_data: JSON-serializable dict. Must contain KonstructMessage
fields plus optional ``placeholder_ts`` and ``channel_id``.
Returns:
Dict with keys:
@@ -51,25 +61,40 @@ def handle_message(self, message_data: dict) -> dict: # type: ignore[no-untyped
- response (str): Agent's response text
- tenant_id (str | None): Tenant that handled the message
"""
# Extract Slack-specific reply metadata before model validation
# (KonstructMessage doesn't know about these fields)
placeholder_ts: str = message_data.pop("placeholder_ts", "") or ""
channel_id: str = message_data.pop("channel_id", "") or ""
try:
msg = KonstructMessage.model_validate(message_data)
except Exception as exc:
logger.exception("Failed to deserialize KonstructMessage: %s", message_data)
raise self.retry(exc=exc)
result = asyncio.run(_process_message(msg))
result = asyncio.run(_process_message(msg, placeholder_ts=placeholder_ts, channel_id=channel_id))
return result
async def _process_message(msg: KonstructMessage) -> dict:
async def _process_message(
msg: KonstructMessage,
placeholder_ts: str = "",
channel_id: str = "",
) -> dict:
"""
Async agent pipeline — load agent config, build prompt, call LLM pool.
After getting the LLM response, if Slack placeholder metadata is present,
updates the "Thinking..." placeholder message with the real response using
Slack's chat.update API.
This function is called from the synchronous handle_message task via
asyncio.run(). It must not be called directly from Celery task code.
Args:
msg: The deserialized KonstructMessage.
msg: The deserialized KonstructMessage.
placeholder_ts: Slack message timestamp of the "Thinking..." placeholder.
channel_id: Slack channel ID for the chat.update call.
Returns:
Dict with message_id, response, and tenant_id.
@@ -94,6 +119,8 @@ async def _process_message(msg: KonstructMessage) -> dict:
tenant_uuid = uuid.UUID(msg.tenant_id)
token = current_tenant_id.set(tenant_uuid)
slack_bot_token: str = ""
try:
agent: Agent | None = None
async with async_session_factory() as session:
@@ -107,6 +134,21 @@ async def _process_message(msg: KonstructMessage) -> dict:
)
result = await session.execute(stmt)
agent = result.scalars().first()
# Load the bot token for this tenant from channel_connections config
if agent is not None and placeholder_ts and channel_id:
from shared.models.tenant import ChannelConnection, ChannelTypeEnum
conn_stmt = (
select(ChannelConnection)
.where(ChannelConnection.tenant_id == tenant_uuid)
.where(ChannelConnection.channel_type == ChannelTypeEnum.SLACK)
.limit(1)
)
conn_result = await session.execute(conn_stmt)
conn = conn_result.scalars().first()
if conn and conn.config:
slack_bot_token = conn.config.get("bot_token", "")
finally:
# Always reset the RLS context var after DB work is done
current_tenant_id.reset(token)
@@ -117,9 +159,17 @@ async def _process_message(msg: KonstructMessage) -> dict:
msg.tenant_id,
msg.id,
)
no_agent_response = "No active agent is configured for your workspace. Please contact your administrator."
if placeholder_ts and channel_id:
await _update_slack_placeholder(
bot_token=slack_bot_token,
channel_id=channel_id,
placeholder_ts=placeholder_ts,
text=no_agent_response,
)
return {
"message_id": msg.id,
"response": "No active agent is configured for your workspace. Please contact your administrator.",
"response": no_agent_response,
"tenant_id": msg.tenant_id,
}
@@ -132,8 +182,78 @@ async def _process_message(msg: KonstructMessage) -> dict:
msg.tenant_id,
)
# Replace the "Thinking..." placeholder with the real response
if placeholder_ts and channel_id:
await _update_slack_placeholder(
bot_token=slack_bot_token,
channel_id=channel_id,
placeholder_ts=placeholder_ts,
text=response_text,
)
return {
"message_id": msg.id,
"response": response_text,
"tenant_id": msg.tenant_id,
}
async def _update_slack_placeholder(
    bot_token: str,
    channel_id: str,
    placeholder_ts: str,
    text: str,
) -> None:
    """
    Swap the "Thinking..." placeholder for the final agent response.

    Calls Slack's chat.update endpoint directly over httpx, so the
    orchestrator carries no slack-bolt dependency (clean service boundary).
    Per user decision, responses always land in-thread: the placeholder
    being replaced was itself posted in-thread.

    Args:
        bot_token: Tenant's Slack bot token (xoxb-...).
        channel_id: Channel containing the placeholder message.
        placeholder_ts: Timestamp (message ID) of the placeholder to replace.
        text: Final LLM response text to display.
    """
    import httpx

    if not bot_token:
        # Without a token we cannot call the Slack API; the placeholder
        # stays as "Thinking...". Occurs when channel_connections config
        # has no bot_token entry for this tenant.
        logger.warning(
            "No Slack bot token for channel=%s placeholder_ts=%s — cannot update placeholder",
            channel_id,
            placeholder_ts,
        )
        return

    payload = {
        "channel": channel_id,
        "ts": placeholder_ts,
        "text": text,
    }
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
            api_response = await client.post(
                "https://slack.com/api/chat.update",
                headers={"Authorization": f"Bearer {bot_token}"},
                json=payload,
            )
            result = api_response.json()
        # Slack signals API-level failure via ok=false in the JSON body,
        # not via HTTP status — check explicitly.
        if not result.get("ok"):
            logger.error(
                "chat.update failed: channel=%s ts=%s error=%r",
                channel_id,
                placeholder_ts,
                result.get("error"),
            )
    except Exception:
        # Network/JSON errors — log only; the reply pipeline already ran
        logger.exception(
            "Failed to update Slack placeholder: channel=%s ts=%s",
            channel_id,
            placeholder_ts,
        )

View File

@@ -11,6 +11,7 @@ dependencies = [
"konstruct-shared",
"fastapi[standard]>=0.115.0",
"httpx>=0.28.0",
"redis>=5.0.0",
]
[tool.uv.sources]

View File

@@ -0,0 +1,6 @@
"""
Konstruct Message Router.
Handles tenant resolution, rate limiting, idempotency deduplication,
and context loading before dispatching to the Agent Orchestrator.
"""

View File

@@ -0,0 +1,76 @@
"""
Agent context loading.
Loads the active agent for a tenant before message processing. Phase 1 supports
a single agent per tenant. The RLS context variable must be set before calling
any function here so that PostgreSQL RLS filters correctly.
"""
from __future__ import annotations
import logging
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.tenant import Agent
from shared.rls import current_tenant_id
logger = logging.getLogger(__name__)
async def load_agent_for_tenant(
    tenant_id: str,
    session: AsyncSession,
) -> Agent | None:
    """
    Fetch the active agent for a tenant.

    Temporarily sets the ``current_tenant_id`` ContextVar so PostgreSQL RLS
    policies scope the agents table to this tenant, and always resets it
    afterwards — even on error paths.

    Phase 1 uses a single-agent model: the first active agent wins.
    Phase 2+ will add agent selection based on message content and routing
    rules.

    Args:
        tenant_id: Konstruct tenant ID as a UUID string.
        session: Async SQLAlchemy session.

    Returns:
        The active Agent ORM instance, or None when the tenant_id is
        malformed, the query fails, or no active agent is configured.
    """
    try:
        tenant_uuid = uuid.UUID(tenant_id)
    except (ValueError, AttributeError):
        logger.error("load_agent_for_tenant: invalid tenant_id=%r", tenant_id)
        return None

    # Scope the query to this tenant via the RLS ContextVar.
    token = current_tenant_id.set(tenant_uuid)
    try:
        query = (
            select(Agent)
            .where(Agent.tenant_id == tenant_uuid)
            .where(Agent.is_active.is_(True))
            .limit(1)
        )
        agent = (await session.execute(query)).scalars().first()
    except Exception:
        logger.exception(
            "load_agent_for_tenant: DB error for tenant=%s", tenant_id
        )
        return None
    finally:
        # Reset the RLS ContextVar no matter how the query ended.
        current_tenant_id.reset(token)

    if agent is None:
        logger.warning(
            "load_agent_for_tenant: no active agent for tenant=%s", tenant_id
        )
    return agent

View File

@@ -0,0 +1,87 @@
"""
Message deduplication (idempotency).
Slack (and other channels) retry event delivery when the gateway does not
respond with HTTP 200 within 3 seconds. This module tracks which message
IDs have already been dispatched to Celery, preventing duplicate processing.
Design:
- Key: {tenant_id}:dedup:{message_id} (from shared.redis_keys)
- TTL: 24 hours (Slack retries stop after ~1 hour; 24h is conservative)
- Op: SET NX (atomic check-and-set)
"""
from __future__ import annotations
import logging
from redis.asyncio import Redis
from shared.redis_keys import idempotency_key
logger = logging.getLogger(__name__)
# How long to remember a message ID (seconds).
# Slack retries for up to ~1 hour; 24h gives plenty of buffer.
_DEDUP_TTL_SECONDS = 86400 # 24 hours
async def is_duplicate(
    tenant_id: str,
    message_id: str,
    redis: Redis,  # type: ignore[type-arg]
) -> bool:
    """
    Atomically check-and-mark a message ID for deduplication.

    Relies on Redis SET NX: when the key is absent it is created with a
    24-hour TTL and the message counts as new; when it already exists the
    message is a duplicate delivery (e.g. a Slack retry) and must be
    skipped.

    Args:
        tenant_id: Konstruct tenant identifier.
        message_id: Unique message identifier (e.g. Slack event_ts or UUID).
        redis: Async Redis client.

    Returns:
        True for a duplicate (already dispatched); False on first sight.
    """
    dedup_key = idempotency_key(tenant_id, message_id)
    # SET ... NX EX — truthy result means the key was freshly created.
    newly_created = await redis.set(dedup_key, "1", nx=True, ex=_DEDUP_TTL_SECONDS)
    if newly_created:
        return False
    logger.info(
        "Duplicate message detected: tenant=%s message_id=%s — skipping",
        tenant_id,
        message_id,
    )
    return True
async def mark_processed(
    tenant_id: str,
    message_id: str,
    redis: Redis,  # type: ignore[type-arg]
) -> None:
    """
    Unconditionally record a message ID as seen (24-hour TTL).

    Unlike ``is_duplicate`` this does not report whether the key already
    existed; prefer ``is_duplicate`` for the combined check-and-mark.

    Args:
        tenant_id: Konstruct tenant identifier.
        message_id: Unique message identifier.
        redis: Async Redis client.
    """
    dedup_key = idempotency_key(tenant_id, message_id)
    await redis.set(dedup_key, "1", ex=_DEDUP_TTL_SECONDS)

View File

@@ -0,0 +1,24 @@
"""
Message Router — FastAPI application.
The router is an internal service. In the current architecture (Phase 1),
routing logic is embedded directly in the channel gateway handlers rather
than as a separate HTTP call. This FastAPI app provides a health endpoint
and is a placeholder for future standalone router deployments.
"""
from __future__ import annotations
from fastapi import FastAPI
app = FastAPI(
title="Konstruct Message Router",
description="Tenant resolution, rate limiting, context loading",
version="0.1.0",
)
@app.get("/health")
async def health() -> dict[str, str]:
    """Liveness probe for the router service."""
    payload = {"status": "ok", "service": "router"}
    return payload

View File

@@ -0,0 +1,121 @@
"""
Redis token bucket rate limiter.
Implements a fixed-window token bucket using Redis atomic operations.
Design:
- Key: {tenant_id}:ratelimit:{channel} (from shared.redis_keys)
- Window: configurable (default 60s)
- Tokens: configurable (default 30 per window per tenant per channel)
- Storage: INCR + EXPIRE (atomic via pipeline)
The token bucket approach:
1. INCR the counter key
2. If count == 1, set EXPIRE (first request in window — starts the clock)
3. If count > limit: raise RateLimitExceeded
4. Otherwise: return True (request allowed)
This is NOT a sliding window (it's a fixed window with INCR/EXPIRE) — it's
simple, Redis-atomic, and correct enough for Phase 1. A true sliding window
can be implemented with ZADD/ZREMRANGEBYSCORE later if needed.
"""
from __future__ import annotations
import logging
from redis.asyncio import Redis
from shared.redis_keys import rate_limit_key
logger = logging.getLogger(__name__)
# Default rate limit configuration — override per-tenant in Phase 2
_DEFAULT_LIMIT = 30 # Max requests per window
_DEFAULT_WINDOW = 60 # Window duration in seconds
class RateLimitExceeded(Exception):
    """
    Signals that a tenant has exhausted its per-channel request budget.

    Attributes:
        tenant_id: The tenant that exceeded the limit.
        channel: The channel that hit the limit.
        remaining_seconds: Approximate TTL on the rate limit key (how long
            until the window resets).
    """

    def __init__(
        self,
        tenant_id: str,
        channel: str,
        remaining_seconds: int = 60,
    ) -> None:
        # Build the human-readable message first, then stash the structured
        # fields so callers can render channel-appropriate error replies.
        message = (
            f"Rate limit exceeded for tenant={tenant_id} channel={channel}. "
            f"Resets in ~{remaining_seconds}s."
        )
        super().__init__(message)
        self.tenant_id = tenant_id
        self.channel = channel
        self.remaining_seconds = remaining_seconds
async def check_rate_limit(
    tenant_id: str,
    channel: str,
    redis: Redis,  # type: ignore[type-arg]
    limit: int = _DEFAULT_LIMIT,
    window_seconds: int = _DEFAULT_WINDOW,
) -> bool:
    """
    Verify that this tenant/channel pair still has request budget left.

    Fixed-window counter: an atomic INCR+TTL pipeline bumps the counter,
    the window TTL is (re)applied whenever the counter is fresh or the key
    is missing an expiry, and the call raises once the counter passes
    ``limit``.

    Args:
        tenant_id: Konstruct tenant identifier.
        channel: Channel string (e.g. "slack").
        redis: Async Redis client.
        limit: Maximum requests per window (default 30).
        window_seconds: Window duration in seconds (default 60).

    Returns:
        True if the request is allowed.

    Raises:
        RateLimitExceeded: If the request exceeds the limit.
    """
    bucket = rate_limit_key(tenant_id, channel)

    # INCR and TTL in a single MULTI/EXEC round trip.
    pipe = redis.pipeline(transaction=True)
    pipe.incr(bucket)
    pipe.ttl(bucket)
    counter, expiry = await pipe.execute()

    # A fresh counter (first hit in the window) or a key that lost its
    # expiry (INCR ran but EXPIRE never did) both need the window clock
    # (re)started.
    if counter == 1 or expiry == -1:
        await redis.expire(bucket, window_seconds)
        expiry = window_seconds

    if counter <= limit:
        return True

    remaining = max(expiry, 0)
    logger.warning(
        "Rate limit exceeded: tenant=%s channel=%s count=%d limit=%d ttl=%d",
        tenant_id,
        channel,
        counter,
        limit,
        remaining,
    )
    raise RateLimitExceeded(tenant_id, channel, remaining_seconds=remaining)

View File

@@ -0,0 +1,102 @@
"""
Tenant resolution — maps channel workspace IDs to Konstruct tenant IDs.
This is the ONE pre-RLS query in the system. Tenant resolution must work
across all tenants because we don't know which tenant owns a message until
after we resolve it. The query bypasses RLS by using the admin/superuser
connection for this specific lookup only.
Design:
- Query `channel_connections` for matching workspace_id + channel_type
- Returns the tenant_id UUID as a string, or None if not found
- Uses a raw SELECT without RLS context (intentional — pre-resolution)
"""
from __future__ import annotations
import logging
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.message import ChannelType
from shared.models.tenant import ChannelConnection, ChannelTypeEnum
logger = logging.getLogger(__name__)
# Map ChannelType (StrEnum from message.py) to ChannelTypeEnum (ORM enum from tenant.py).
# Keys are lowercase channel strings; resolve_tenant lowercases its input before
# looking up here, so every key in this table must stay lowercase.
_CHANNEL_TYPE_MAP: dict[str, ChannelTypeEnum] = {
    "slack": ChannelTypeEnum.SLACK,
    "whatsapp": ChannelTypeEnum.WHATSAPP,
    "mattermost": ChannelTypeEnum.MATTERMOST,
    "rocketchat": ChannelTypeEnum.ROCKETCHAT,
    "teams": ChannelTypeEnum.TEAMS,
    "telegram": ChannelTypeEnum.TELEGRAM,
    "signal": ChannelTypeEnum.SIGNAL,
}
async def resolve_tenant(
    workspace_id: str,
    channel_type: ChannelType | str,
    session: AsyncSession,
) -> str | None:
    """
    Resolve a channel workspace ID to a Konstruct tenant ID.

    This is deliberately an RLS-bypass lookup: the tenant is unknown until
    this query answers, so no tenant context can be set beforehand. A
    ``SET LOCAL app.current_tenant = ''`` is issued first so the tenant
    policy does not filter ``channel_connections`` rows for this single
    SELECT.

    Args:
        workspace_id: Channel-native workspace identifier (e.g. Slack T12345).
        channel_type: Channel type as ChannelType enum or string.
        session: Async SQLAlchemy session.

    Returns:
        Tenant ID as a string (UUID), or None if no matching connection found.
    """
    normalized = str(channel_type).lower()
    mapped_channel = _CHANNEL_TYPE_MAP.get(normalized)
    if mapped_channel is None:
        logger.warning("resolve_tenant: unknown channel_type=%r", channel_type)
        return None

    try:
        # Neutralize the tenant RLS policy for this one pre-resolution query:
        # with app.current_tenant set to '', no policy matches and the query
        # sees every row in channel_connections.
        await session.execute(text("SET LOCAL app.current_tenant = ''"))
        query = (
            select(ChannelConnection.tenant_id)
            .where(ChannelConnection.channel_type == mapped_channel)
            .where(ChannelConnection.workspace_id == workspace_id)
            .limit(1)
        )
        tenant_id = (await session.execute(query)).scalar_one_or_none()
    except Exception:
        logger.exception(
            "resolve_tenant: DB error workspace_id=%r channel=%r",
            workspace_id,
            channel_type,
        )
        return None

    if tenant_id is None:
        logger.debug(
            "resolve_tenant: no match workspace_id=%r channel=%r",
            workspace_id,
            channel_type,
        )
        return None
    return str(tenant_id)

8
uv.lock generated
View File

@@ -1167,8 +1167,11 @@ source = { editable = "packages/gateway" }
dependencies = [
{ name = "fastapi", extra = ["standard"] },
{ name = "httpx" },
{ name = "konstruct-orchestrator" },
{ name = "konstruct-router" },
{ name = "konstruct-shared" },
{ name = "python-telegram-bot" },
{ name = "redis" },
{ name = "slack-bolt" },
]
@@ -1176,8 +1179,11 @@ dependencies = [
requires-dist = [
{ name = "fastapi", extras = ["standard"], specifier = ">=0.115.0" },
{ name = "httpx", specifier = ">=0.28.0" },
{ name = "konstruct-orchestrator", editable = "packages/orchestrator" },
{ name = "konstruct-router", editable = "packages/router" },
{ name = "konstruct-shared", editable = "packages/shared" },
{ name = "python-telegram-bot", specifier = ">=21.0" },
{ name = "redis", specifier = ">=5.0.0" },
{ name = "slack-bolt", specifier = ">=1.22.0" },
]
@@ -1227,6 +1233,7 @@ dependencies = [
{ name = "fastapi", extra = ["standard"] },
{ name = "httpx" },
{ name = "konstruct-shared" },
{ name = "redis" },
]
[package.metadata]
@@ -1234,6 +1241,7 @@ requires-dist = [
{ name = "fastapi", extras = ["standard"], specifier = ">=0.115.0" },
{ name = "httpx", specifier = ">=0.28.0" },
{ name = "konstruct-shared", editable = "packages/shared" },
{ name = "redis", specifier = ">=5.0.0" },
]
[[package]]