Files
konstruct/packages/router/router/tenant.py
Adolfo Delorenzo 6f30705e1a feat(01-03): Channel Gateway (Slack adapter) and Message Router
- gateway/normalize.py: normalize_slack_event -> KonstructMessage (strips bot mention)
- gateway/channels/slack.py: register_slack_handlers for app_mention + DM events
  - rate limit check -> ephemeral rejection on exceeded
  - idempotency dedup (Slack retry protection)
  - placeholder 'Thinking...' message posted in-thread before Celery dispatch
  - auto-follow engaged threads with 30-minute TTL
  - HTTP 200 returned immediately; all LLM work dispatched to Celery
- gateway/main.py: FastAPI on port 8001, /slack/events + /health
- router/tenant.py: resolve_tenant workspace_id -> tenant_id (RLS-bypass query)
- router/ratelimit.py: check_rate_limit Redis token bucket, RateLimitExceeded exception
- router/idempotency.py: is_duplicate + mark_processed (SET NX, 24h TTL)
- router/context.py: load_agent_for_tenant with RLS ContextVar setup
- orchestrator/tasks.py: handle_message now extracts placeholder_ts/channel_id,
  calls _update_slack_placeholder via chat.update after LLM response
- docker-compose.yml: gateway service on port 8001
- pyproject.toml: added redis, konstruct-router, konstruct-orchestrator deps
2026-03-23 10:27:59 -06:00

103 lines
3.6 KiB
Python

"""
Tenant resolution — maps channel workspace IDs to Konstruct tenant IDs.
This is the ONE pre-RLS query in the system. Tenant resolution must work
across all tenants because we don't know which tenant owns a message until
after we resolve it. The query bypasses RLS by using the admin/superuser
connection for this specific lookup only.
Design:
- Query `channel_connections` for matching workspace_id + channel_type
- Returns the tenant_id UUID as a string, or None if not found
- Uses a raw SELECT without RLS context (intentional — pre-resolution)
"""
from __future__ import annotations
import logging
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.message import ChannelType
from shared.models.tenant import ChannelConnection, ChannelTypeEnum
logger = logging.getLogger(__name__)
# Map ChannelType (StrEnum from message.py) to ChannelTypeEnum (ORM enum from tenant.py)
_CHANNEL_TYPE_MAP: dict[str, ChannelTypeEnum] = {
"slack": ChannelTypeEnum.SLACK,
"whatsapp": ChannelTypeEnum.WHATSAPP,
"mattermost": ChannelTypeEnum.MATTERMOST,
"rocketchat": ChannelTypeEnum.ROCKETCHAT,
"teams": ChannelTypeEnum.TEAMS,
"telegram": ChannelTypeEnum.TELEGRAM,
"signal": ChannelTypeEnum.SIGNAL,
}
async def resolve_tenant(
workspace_id: str,
channel_type: ChannelType | str,
session: AsyncSession,
) -> str | None:
"""
Resolve a channel workspace ID to a Konstruct tenant ID.
This is deliberately a RLS-bypass query — we cannot know which tenant to
set in `app.current_tenant` until after we resolve the tenant. The session
passed here should use the admin connection (postgres superuser) or the
konstruct_app role with RLS disabled for this specific query.
In practice, for this single lookup, we disable the RLS SET LOCAL by
temporarily not setting `current_tenant_id` — the ContextVar defaults to
None, so the RLS hook does not inject SET LOCAL, and the query sees all
rows in `channel_connections`.
Args:
workspace_id: Channel-native workspace identifier (e.g. Slack T12345).
channel_type: Channel type as ChannelType enum or string.
session: Async SQLAlchemy session.
Returns:
Tenant ID as a string (UUID), or None if no matching connection found.
"""
channel_str = str(channel_type).lower()
orm_channel = _CHANNEL_TYPE_MAP.get(channel_str)
if orm_channel is None:
logger.warning("resolve_tenant: unknown channel_type=%r", channel_type)
return None
try:
# Bypass RLS for this query — disable RLS row filtering at the session level
# by setting app.current_tenant to empty (no policy match = all rows visible
# to konstruct_app for SELECT on channel_connections).
# We use a raw SET LOCAL here to ensure the tenant policy is not applied.
await session.execute(text("SET LOCAL app.current_tenant = ''"))
stmt = (
select(ChannelConnection.tenant_id)
.where(ChannelConnection.channel_type == orm_channel)
.where(ChannelConnection.workspace_id == workspace_id)
.limit(1)
)
result = await session.execute(stmt)
row = result.scalar_one_or_none()
except Exception:
logger.exception(
"resolve_tenant: DB error workspace_id=%r channel=%r",
workspace_id,
channel_type,
)
return None
if row is None:
logger.debug(
"resolve_tenant: no match workspace_id=%r channel=%r",
workspace_id,
channel_type,
)
return None
return str(row)