- alembic.ini + migrations/env.py: async SQLAlchemy migration setup using asyncpg
- migrations/versions/001_initial_schema.py: creates tenants, agents, channel_connections, portal_users
- ENABLE + FORCE ROW LEVEL SECURITY on agents and channel_connections
- RLS policy: tenant_id = current_setting('app.current_tenant', TRUE)::uuid
- konstruct_app role created with SELECT/INSERT/UPDATE/DELETE on all tables
- packages/shared/shared/rls.py: idempotent configure_rls_hook, UUID-sanitized SET LOCAL
- tests/conftest.py: test_db_name (session-scoped), db_engine + db_session as konstruct_app
- tests/unit/test_normalize.py: 11 tests for KonstructMessage Slack normalization (CHAN-01)
- tests/unit/test_tenant_resolution.py: 7 tests for workspace_id → tenant resolution (TNNT-02)
- tests/unit/test_redis_namespacing.py: 15 tests for Redis key namespace isolation (TNNT-03)
- tests/integration/test_tenant_isolation.py: 7 tests proving RLS tenant isolation (TNNT-01)
- tenant_b cannot see tenant_a's agents or channel_connections
- FORCE ROW LEVEL SECURITY verified via pg_class.relforcerowsecurity
90 lines
3.4 KiB
Python
90 lines
3.4 KiB
Python
"""
|
|
PostgreSQL Row Level Security (RLS) integration.
|
|
|
|
How it works:
|
|
1. `current_tenant_id` is a ContextVar — set once per request/task.
|
|
2. `configure_rls_hook(engine)` registers a SQLAlchemy event listener that
|
|
fires before every cursor execute.
|
|
3. When `current_tenant_id` is set, the listener injects:
|
|
SET LOCAL app.current_tenant = '<tenant_id>'
|
|
into the current transaction.
|
|
4. PostgreSQL evaluates this setting in every RLS policy via:
|
|
current_setting('app.current_tenant', TRUE)::uuid
|
|
|
|
CRITICAL: The application MUST connect as `konstruct_app` (not postgres
|
|
superuser). Superuser connections bypass RLS entirely — isolation tests
|
|
would pass trivially but provide zero real protection.
|
|
|
|
IMPORTANT: SET LOCAL is transaction-scoped. The tenant context resets
|
|
automatically when each transaction ends — no manual cleanup required.
|
|
|
|
NOTE ON SQL INJECTION: PostgreSQL's SET LOCAL does not support parameterized
|
|
placeholders. We protect against injection by passing the tenant_id value
|
|
through uuid.UUID() — any non-UUID string raises ValueError before it reaches
|
|
the database. The resulting string is always in canonical UUID format:
|
|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx with only hex chars and hyphens.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from contextvars import ContextVar
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import event
|
|
from sqlalchemy.ext.asyncio import AsyncEngine
|
|
|
|
# ---------------------------------------------------------------------------
# Per-request tenant context — set in middleware or request handling code
# ---------------------------------------------------------------------------
current_tenant_id: ContextVar[UUID | None] = ContextVar(
    "current_tenant_id",
    default=None,
)

# id() values of the sync engines that already carry the RLS listener;
# configure_rls_hook() consults this set so the hook is not registered twice.
_configured_engines: set[int] = set()
|
|
|
|
|
|
def configure_rls_hook(engine: AsyncEngine) -> None:
    """
    Register the tenant-scoping ``before_cursor_execute`` listener on an engine.

    Call once at application startup, after the engine is created. Calling it
    again with the same engine is a no-op, so the listener is never registered
    twice on one engine.

    Example:
        from shared.db import engine
        from shared.rls import configure_rls_hook
        configure_rls_hook(engine)

    Args:
        engine: Async engine whose underlying sync engine receives the listener.
    """
    # Function-scope import: weakref is only needed for the bookkeeping below.
    import weakref

    sync_engine = engine.sync_engine

    # Idempotent — skip if already configured for this engine.
    engine_id = id(sync_engine)
    if engine_id in _configured_engines:
        return

    @event.listens_for(sync_engine, "before_cursor_execute")
    def _set_rls_tenant(
        conn: Any,
        cursor: Any,
        statement: str,
        parameters: Any,
        context: Any,
        executemany: bool,
    ) -> None:
        """
        Inject SET LOCAL app.current_tenant before every statement.

        PostgreSQL SET LOCAL does not support parameterized placeholders.
        We prevent SQL injection by validating the tenant_id value through
        uuid.UUID() — any non-UUID string raises ValueError before it reaches
        the database. The resulting string contains only hex characters and
        hyphens in canonical UUID format.

        SET LOCAL is transaction-scoped and resets on commit/rollback.

        Raises:
            ValueError: If the context value is not a valid UUID.
        """
        tenant_id = current_tenant_id.get()
        if tenant_id is not None:
            # Sanitize: round-trip through UUID raises ValueError on invalid input.
            # UUID.__str__ always produces canonical xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
            safe_id = str(UUID(str(tenant_id)))
            cursor.execute(f"SET LOCAL app.current_tenant = '{safe_id}'")

    # Record the engine only after the listener is attached, so a failed
    # registration does not leave the engine marked as configured.
    _configured_engines.add(engine_id)

    # id() values are unique only among live objects. Forget this id once the
    # engine is garbage-collected; otherwise a later engine that happens to
    # reuse the same id would be skipped and never get the RLS listener.
    finalizer = weakref.finalize(sync_engine, _configured_engines.discard, engine_id)
    finalizer.atexit = False  # nothing worth cleaning up at interpreter shutdown