- alembic.ini + migrations/env.py: async SQLAlchemy migration setup using asyncpg
- migrations/versions/001_initial_schema.py: creates tenants, agents, channel_connections, portal_users
- ENABLE + FORCE ROW LEVEL SECURITY on agents and channel_connections
- RLS policy: tenant_id = current_setting('app.current_tenant', TRUE)::uuid
- konstruct_app role created with SELECT/INSERT/UPDATE/DELETE on all tables
- packages/shared/shared/rls.py: idempotent configure_rls_hook, UUID-sanitized SET LOCAL
- tests/conftest.py: test_db_name (session-scoped), db_engine + db_session as konstruct_app
- tests/unit/test_normalize.py: 11 tests for KonstructMessage Slack normalization (CHAN-01)
- tests/unit/test_tenant_resolution.py: 7 tests for workspace_id → tenant resolution (TNNT-02)
- tests/unit/test_redis_namespacing.py: 15 tests for Redis key namespace isolation (TNNT-03)
- tests/integration/test_tenant_isolation.py: 7 tests proving RLS tenant isolation (TNNT-01)
- tenant_b cannot see tenant_a's agents or channel_connections
- FORCE ROW LEVEL SECURITY verified via pg_class.relforcerowsecurity
274 lines
9.9 KiB
Python
274 lines
9.9 KiB
Python
"""
|
|
Integration tests for PostgreSQL Row Level Security (RLS) tenant isolation.
|
|
|
|
Tests TNNT-01: All tenant data is isolated via PostgreSQL Row Level Security.
|
|
|
|
THIS IS THE MOST CRITICAL TEST IN PHASE 1.
|
|
|
|
These tests prove that Tenant A cannot see Tenant B's data through PostgreSQL
|
|
RLS — not through application-layer filtering, but at the database level.
|
|
|
|
Critical verification points:
|
|
1. tenant_b cannot see tenant_a's agents (even by primary key lookup)
|
|
2. tenant_a can see its own agents
|
|
3. FORCE ROW LEVEL SECURITY is active (relforcerowsecurity = TRUE)
|
|
4. Same isolation holds for channel_connections table
|
|
5. All tests connect as konstruct_app (not superuser)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from typing import Any
|
|
|
|
import pytest
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.rls import current_tenant_id
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestAgentRLSIsolation:
|
|
"""Prove that Agent rows are invisible across tenant boundaries."""
|
|
|
|
async def test_tenant_b_cannot_see_tenant_a_agent(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
tenant_b: dict[str, Any],
|
|
) -> None:
|
|
"""
|
|
Core RLS test: tenant_b must see ZERO rows from agents owned by tenant_a.
|
|
|
|
If this test passes trivially (e.g., because the connection is superuser),
|
|
the test_rls_is_forced test below will catch it.
|
|
"""
|
|
agent_id = uuid.uuid4()
|
|
|
|
# Create an agent for tenant_a (using superuser-like direct insert)
|
|
# We need to temporarily bypass RLS to seed data
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
await db_session.execute(
|
|
text(
|
|
"INSERT INTO agents (id, tenant_id, name, role) "
|
|
"VALUES (:id, :tenant_id, :name, :role)"
|
|
),
|
|
{
|
|
"id": str(agent_id),
|
|
"tenant_id": str(tenant_a["id"]),
|
|
"name": "Alice",
|
|
"role": "Support Lead",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
# Now set tenant_b context and try to query the agent
|
|
token = current_tenant_id.set(tenant_b["id"])
|
|
try:
|
|
result = await db_session.execute(text("SELECT id FROM agents"))
|
|
rows = result.fetchall()
|
|
assert len(rows) == 0, (
|
|
f"RLS FAILURE: tenant_b can see {len(rows)} agent(s) belonging to tenant_a. "
|
|
"This is a critical security violation — check that FORCE ROW LEVEL SECURITY "
|
|
"is applied and that the connection uses konstruct_app role (not superuser)."
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
async def test_tenant_a_can_see_own_agents(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
tenant_b: dict[str, Any],
|
|
) -> None:
|
|
"""Tenant A must be able to see its own agents — RLS must not block legitimate access."""
|
|
agent_id = uuid.uuid4()
|
|
|
|
# Insert agent as tenant_a
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
await db_session.execute(
|
|
text(
|
|
"INSERT INTO agents (id, tenant_id, name, role) "
|
|
"VALUES (:id, :tenant_id, :name, :role)"
|
|
),
|
|
{
|
|
"id": str(agent_id),
|
|
"tenant_id": str(tenant_a["id"]),
|
|
"name": "Bob",
|
|
"role": "Sales Rep",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
# Query as tenant_a — must see the agent
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
result = await db_session.execute(text("SELECT id FROM agents WHERE id = :id"), {"id": str(agent_id)})
|
|
rows = result.fetchall()
|
|
assert len(rows) == 1, (
|
|
f"Tenant A cannot see its own agent. Found {len(rows)} rows. "
|
|
"RLS policy may be too restrictive."
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestChannelConnectionRLSIsolation:
|
|
"""Prove that ChannelConnection rows are invisible across tenant boundaries."""
|
|
|
|
async def test_tenant_b_cannot_see_tenant_a_channel_connection(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
tenant_b: dict[str, Any],
|
|
) -> None:
|
|
"""tenant_b must see ZERO channel_connections owned by tenant_a."""
|
|
conn_id = uuid.uuid4()
|
|
|
|
# Create a channel connection for tenant_a
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
await db_session.execute(
|
|
text(
|
|
"INSERT INTO channel_connections (id, tenant_id, channel_type, workspace_id, config) "
|
|
"VALUES (:id, :tenant_id, 'slack', :workspace_id, :config)"
|
|
),
|
|
{
|
|
"id": str(conn_id),
|
|
"tenant_id": str(tenant_a["id"]),
|
|
"workspace_id": "T-ALPHA-WORKSPACE",
|
|
"config": "{}",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
# Query as tenant_b — must see zero rows
|
|
token = current_tenant_id.set(tenant_b["id"])
|
|
try:
|
|
result = await db_session.execute(text("SELECT id FROM channel_connections"))
|
|
rows = result.fetchall()
|
|
assert len(rows) == 0, (
|
|
f"RLS FAILURE: tenant_b can see {len(rows)} channel_connection(s) belonging to tenant_a."
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
async def test_tenant_a_can_see_own_channel_connections(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
) -> None:
|
|
"""Tenant A must see its own channel connections."""
|
|
conn_id = uuid.uuid4()
|
|
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
await db_session.execute(
|
|
text(
|
|
"INSERT INTO channel_connections (id, tenant_id, channel_type, workspace_id, config) "
|
|
"VALUES (:id, :tenant_id, 'telegram', :workspace_id, :config)"
|
|
),
|
|
{
|
|
"id": str(conn_id),
|
|
"tenant_id": str(tenant_a["id"]),
|
|
"workspace_id": "tg-alpha-chat",
|
|
"config": "{}",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
# Query as tenant_a — must see the connection
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
result = await db_session.execute(
|
|
text("SELECT id FROM channel_connections WHERE id = :id"),
|
|
{"id": str(conn_id)},
|
|
)
|
|
rows = result.fetchall()
|
|
assert len(rows) == 1, f"Tenant A cannot see its own channel connection. Found {len(rows)} rows."
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestRLSPolicyConfiguration:
|
|
"""Verify PostgreSQL RLS configuration is correct at the schema level."""
|
|
|
|
async def test_agents_force_row_level_security_is_active(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
) -> None:
|
|
"""
|
|
FORCE ROW LEVEL SECURITY must be TRUE for agents table.
|
|
|
|
Without FORCE RLS, the table owner (postgres) bypasses RLS.
|
|
This would mean our isolation tests passed trivially and provide
|
|
zero real protection.
|
|
"""
|
|
# We need to query pg_class as superuser to check this
|
|
# Using the session (which is konstruct_app) — pg_class is readable by all
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
result = await db_session.execute(
|
|
text("SELECT relforcerowsecurity FROM pg_class WHERE relname = 'agents'")
|
|
)
|
|
row = result.fetchone()
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
assert row is not None, "agents table not found in pg_class"
|
|
assert row[0] is True, (
|
|
"FORCE ROW LEVEL SECURITY is NOT active on agents table. "
|
|
"This is a critical security misconfiguration — the table owner "
|
|
"can bypass RLS and cross-tenant data leakage is possible."
|
|
)
|
|
|
|
async def test_channel_connections_force_row_level_security_is_active(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
) -> None:
|
|
"""FORCE ROW LEVEL SECURITY must be TRUE for channel_connections table."""
|
|
token = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
result = await db_session.execute(
|
|
text("SELECT relforcerowsecurity FROM pg_class WHERE relname = 'channel_connections'")
|
|
)
|
|
row = result.fetchone()
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
assert row is not None, "channel_connections table not found in pg_class"
|
|
assert row[0] is True, (
|
|
"FORCE ROW LEVEL SECURITY is NOT active on channel_connections table."
|
|
)
|
|
|
|
async def test_tenants_table_exists_and_is_accessible(
|
|
self,
|
|
db_session: AsyncSession,
|
|
tenant_a: dict[str, Any],
|
|
) -> None:
|
|
"""
|
|
tenants table must be accessible without tenant context.
|
|
|
|
RLS is NOT applied to tenants — the Router needs to look up all tenants
|
|
during message routing, before tenant context is set.
|
|
"""
|
|
result = await db_session.execute(text("SELECT id, slug FROM tenants LIMIT 10"))
|
|
rows = result.fetchall()
|
|
# Should be accessible (no RLS) — we just care it doesn't raise
|
|
assert rows is not None
|