- gateway/normalize.py: normalize_slack_event -> KonstructMessage (strips bot mention) - gateway/channels/slack.py: register_slack_handlers for app_mention + DM events - rate limit check -> ephemeral rejection on exceeded - idempotency dedup (Slack retry protection) - placeholder 'Thinking...' message posted in-thread before Celery dispatch - auto-follow engaged threads with 30-minute TTL - HTTP 200 returned immediately; all LLM work dispatched to Celery - gateway/main.py: FastAPI on port 8001, /slack/events + /health - router/tenant.py: resolve_tenant workspace_id -> tenant_id (RLS-bypass query) - router/ratelimit.py: check_rate_limit Redis token bucket, RateLimitExceeded exception - router/idempotency.py: is_duplicate + mark_processed (SET NX, 24h TTL) - router/context.py: load_agent_for_tenant with RLS ContextVar setup - orchestrator/tasks.py: handle_message now extracts placeholder_ts/channel_id, calls _update_slack_placeholder via chat.update after LLM response - docker-compose.yml: gateway service on port 8001 - pyproject.toml: added redis, konstruct-router, konstruct-orchestrator deps
77 lines
2.2 KiB
Python
77 lines
2.2 KiB
Python
"""
|
|
Agent context loading.
|
|
|
|
Loads the active agent for a tenant before message processing. Phase 1 supports
|
|
a single agent per tenant. The RLS context variable must be set before calling
|
|
any function here so that PostgreSQL RLS filters correctly.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.models.tenant import Agent
|
|
from shared.rls import current_tenant_id
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def load_agent_for_tenant(
|
|
tenant_id: str,
|
|
session: AsyncSession,
|
|
) -> Agent | None:
|
|
"""
|
|
Load the active agent for a tenant.
|
|
|
|
Sets the ``current_tenant_id`` ContextVar so that PostgreSQL RLS policies
|
|
correctly filter the agents table to only return rows belonging to this
|
|
tenant.
|
|
|
|
Phase 1: Returns the first active agent for the tenant (single-agent model).
|
|
Phase 2+: Will support agent selection based on message content and routing
|
|
rules.
|
|
|
|
Args:
|
|
tenant_id: Konstruct tenant ID as a UUID string.
|
|
session: Async SQLAlchemy session.
|
|
|
|
Returns:
|
|
The active Agent ORM instance, or None if no active agent is configured.
|
|
"""
|
|
try:
|
|
tenant_uuid = uuid.UUID(tenant_id)
|
|
except (ValueError, AttributeError):
|
|
logger.error("load_agent_for_tenant: invalid tenant_id=%r", tenant_id)
|
|
return None
|
|
|
|
# Set RLS context so the DB query is correctly scoped to this tenant
|
|
token = current_tenant_id.set(tenant_uuid)
|
|
try:
|
|
stmt = (
|
|
select(Agent)
|
|
.where(Agent.tenant_id == tenant_uuid)
|
|
.where(Agent.is_active.is_(True))
|
|
.limit(1)
|
|
)
|
|
result = await session.execute(stmt)
|
|
agent = result.scalars().first()
|
|
except Exception:
|
|
logger.exception(
|
|
"load_agent_for_tenant: DB error for tenant=%s", tenant_id
|
|
)
|
|
return None
|
|
finally:
|
|
# Always reset the RLS context var after DB work completes
|
|
current_tenant_id.reset(token)
|
|
|
|
if agent is None:
|
|
logger.warning(
|
|
"load_agent_for_tenant: no active agent for tenant=%s", tenant_id
|
|
)
|
|
|
|
return agent
|