Files
konstruct/packages/router/router/context.py
Adolfo Delorenzo 6f30705e1a feat(01-03): Channel Gateway (Slack adapter) and Message Router
- gateway/normalize.py: normalize_slack_event -> KonstructMessage (strips bot mention)
- gateway/channels/slack.py: register_slack_handlers for app_mention + DM events
  - rate limit check -> ephemeral rejection on exceeded
  - idempotency dedup (Slack retry protection)
  - placeholder 'Thinking...' message posted in-thread before Celery dispatch
  - auto-follow engaged threads with 30-minute TTL
  - HTTP 200 returned immediately; all LLM work dispatched to Celery
- gateway/main.py: FastAPI on port 8001, /slack/events + /health
- router/tenant.py: resolve_tenant workspace_id -> tenant_id (RLS-bypass query)
- router/ratelimit.py: check_rate_limit Redis token bucket, RateLimitExceeded exception
- router/idempotency.py: is_duplicate + mark_processed (SET NX, 24h TTL)
- router/context.py: load_agent_for_tenant with RLS ContextVar setup
- orchestrator/tasks.py: handle_message now extracts placeholder_ts/channel_id,
  calls _update_slack_placeholder via chat.update after LLM response
- docker-compose.yml: gateway service on port 8001
- pyproject.toml: added redis, konstruct-router, konstruct-orchestrator deps
2026-03-23 10:27:59 -06:00

77 lines
2.2 KiB
Python

"""
Agent context loading.
Loads the active agent for a tenant before message processing. Phase 1 supports
a single agent per tenant. The RLS context variable must be set before calling
any function here so that PostgreSQL RLS filters correctly.
"""
from __future__ import annotations
import logging
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.tenant import Agent
from shared.rls import current_tenant_id
logger = logging.getLogger(__name__)
async def load_agent_for_tenant(
tenant_id: str,
session: AsyncSession,
) -> Agent | None:
"""
Load the active agent for a tenant.
Sets the ``current_tenant_id`` ContextVar so that PostgreSQL RLS policies
correctly filter the agents table to only return rows belonging to this
tenant.
Phase 1: Returns the first active agent for the tenant (single-agent model).
Phase 2+: Will support agent selection based on message content and routing
rules.
Args:
tenant_id: Konstruct tenant ID as a UUID string.
session: Async SQLAlchemy session.
Returns:
The active Agent ORM instance, or None if no active agent is configured.
"""
try:
tenant_uuid = uuid.UUID(tenant_id)
except (ValueError, AttributeError):
logger.error("load_agent_for_tenant: invalid tenant_id=%r", tenant_id)
return None
# Set RLS context so the DB query is correctly scoped to this tenant
token = current_tenant_id.set(tenant_uuid)
try:
stmt = (
select(Agent)
.where(Agent.tenant_id == tenant_uuid)
.where(Agent.is_active.is_(True))
.limit(1)
)
result = await session.execute(stmt)
agent = result.scalars().first()
except Exception:
logger.exception(
"load_agent_for_tenant: DB error for tenant=%s", tenant_id
)
return None
finally:
# Always reset the RLS context var after DB work completes
current_tenant_id.reset(token)
if agent is None:
logger.warning(
"load_agent_for_tenant: no active agent for tenant=%s", tenant_id
)
return agent