""" Integration tests for PostgreSQL Row Level Security (RLS) tenant isolation. Tests TNNT-01: All tenant data is isolated via PostgreSQL Row Level Security. THIS IS THE MOST CRITICAL TEST IN PHASE 1. These tests prove that Tenant A cannot see Tenant B's data through PostgreSQL RLS — not through application-layer filtering, but at the database level. Critical verification points: 1. tenant_b cannot see tenant_a's agents (even by primary key lookup) 2. tenant_a can see its own agents 3. FORCE ROW LEVEL SECURITY is active (relforcerowsecurity = TRUE) 4. Same isolation holds for channel_connections table 5. All tests connect as konstruct_app (not superuser) """ from __future__ import annotations import uuid from typing import Any import pytest from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from shared.rls import current_tenant_id @pytest.mark.asyncio class TestAgentRLSIsolation: """Prove that Agent rows are invisible across tenant boundaries.""" async def test_tenant_b_cannot_see_tenant_a_agent( self, db_session: AsyncSession, tenant_a: dict[str, Any], tenant_b: dict[str, Any], ) -> None: """ Core RLS test: tenant_b must see ZERO rows from agents owned by tenant_a. If this test passes trivially (e.g., because the connection is superuser), the test_rls_is_forced test below will catch it. """ agent_id = uuid.uuid4() # Create an agent for tenant_a (using superuser-like direct insert) # We need to temporarily bypass RLS to seed data token = current_tenant_id.set(tenant_a["id"]) try: await db_session.execute( text( "INSERT INTO agents (id, tenant_id, name, role) " "VALUES (:id, :tenant_id, :name, :role)" ), { "id": str(agent_id), "tenant_id": str(tenant_a["id"]), "name": "Alice", "role": "Support Lead", }, ) await db_session.commit() finally: current_tenant_id.reset(token) # Now set tenant_b context and try to query the agent token = current_tenant_id.set(tenant_b["id"]) try: result = await db_session.execute(text("SELECT id FROM agents")) rows = result.fetchall() assert len(rows) == 0, ( f"RLS FAILURE: tenant_b can see {len(rows)} agent(s) belonging to tenant_a. " "This is a critical security violation — check that FORCE ROW LEVEL SECURITY " "is applied and that the connection uses konstruct_app role (not superuser)." ) finally: current_tenant_id.reset(token) async def test_tenant_a_can_see_own_agents( self, db_session: AsyncSession, tenant_a: dict[str, Any], tenant_b: dict[str, Any], ) -> None: """Tenant A must be able to see its own agents — RLS must not block legitimate access.""" agent_id = uuid.uuid4() # Insert agent as tenant_a token = current_tenant_id.set(tenant_a["id"]) try: await db_session.execute( text( "INSERT INTO agents (id, tenant_id, name, role) " "VALUES (:id, :tenant_id, :name, :role)" ), { "id": str(agent_id), "tenant_id": str(tenant_a["id"]), "name": "Bob", "role": "Sales Rep", }, ) await db_session.commit() finally: current_tenant_id.reset(token) # Query as tenant_a — must see the agent token = current_tenant_id.set(tenant_a["id"]) try: result = await db_session.execute(text("SELECT id FROM agents WHERE id = :id"), {"id": str(agent_id)}) rows = result.fetchall() assert len(rows) == 1, ( f"Tenant A cannot see its own agent. Found {len(rows)} rows. " "RLS policy may be too restrictive." ) finally: current_tenant_id.reset(token) @pytest.mark.asyncio class TestChannelConnectionRLSIsolation: """Prove that ChannelConnection rows are invisible across tenant boundaries.""" async def test_tenant_b_cannot_see_tenant_a_channel_connection( self, db_session: AsyncSession, tenant_a: dict[str, Any], tenant_b: dict[str, Any], ) -> None: """tenant_b must see ZERO channel_connections owned by tenant_a.""" conn_id = uuid.uuid4() # Create a channel connection for tenant_a token = current_tenant_id.set(tenant_a["id"]) try: await db_session.execute( text( "INSERT INTO channel_connections (id, tenant_id, channel_type, workspace_id, config) " "VALUES (:id, :tenant_id, 'slack', :workspace_id, :config)" ), { "id": str(conn_id), "tenant_id": str(tenant_a["id"]), "workspace_id": "T-ALPHA-WORKSPACE", "config": "{}", }, ) await db_session.commit() finally: current_tenant_id.reset(token) # Query as tenant_b — must see zero rows token = current_tenant_id.set(tenant_b["id"]) try: result = await db_session.execute(text("SELECT id FROM channel_connections")) rows = result.fetchall() assert len(rows) == 0, ( f"RLS FAILURE: tenant_b can see {len(rows)} channel_connection(s) belonging to tenant_a." ) finally: current_tenant_id.reset(token) async def test_tenant_a_can_see_own_channel_connections( self, db_session: AsyncSession, tenant_a: dict[str, Any], ) -> None: """Tenant A must see its own channel connections.""" conn_id = uuid.uuid4() token = current_tenant_id.set(tenant_a["id"]) try: await db_session.execute( text( "INSERT INTO channel_connections (id, tenant_id, channel_type, workspace_id, config) " "VALUES (:id, :tenant_id, 'telegram', :workspace_id, :config)" ), { "id": str(conn_id), "tenant_id": str(tenant_a["id"]), "workspace_id": "tg-alpha-chat", "config": "{}", }, ) await db_session.commit() finally: current_tenant_id.reset(token) # Query as tenant_a — must see the connection token = current_tenant_id.set(tenant_a["id"]) try: result = await db_session.execute( text("SELECT id FROM channel_connections WHERE id = :id"), {"id": str(conn_id)}, ) rows = result.fetchall() assert len(rows) == 1, f"Tenant A cannot see its own channel connection. Found {len(rows)} rows." finally: current_tenant_id.reset(token) @pytest.mark.asyncio class TestRLSPolicyConfiguration: """Verify PostgreSQL RLS configuration is correct at the schema level.""" async def test_agents_force_row_level_security_is_active( self, db_session: AsyncSession, tenant_a: dict[str, Any], ) -> None: """ FORCE ROW LEVEL SECURITY must be TRUE for agents table. Without FORCE RLS, the table owner (postgres) bypasses RLS. This would mean our isolation tests passed trivially and provide zero real protection. """ # We need to query pg_class as superuser to check this # Using the session (which is konstruct_app) — pg_class is readable by all token = current_tenant_id.set(tenant_a["id"]) try: result = await db_session.execute( text("SELECT relforcerowsecurity FROM pg_class WHERE relname = 'agents'") ) row = result.fetchone() finally: current_tenant_id.reset(token) assert row is not None, "agents table not found in pg_class" assert row[0] is True, ( "FORCE ROW LEVEL SECURITY is NOT active on agents table. " "This is a critical security misconfiguration — the table owner " "can bypass RLS and cross-tenant data leakage is possible." ) async def test_channel_connections_force_row_level_security_is_active( self, db_session: AsyncSession, tenant_a: dict[str, Any], ) -> None: """FORCE ROW LEVEL SECURITY must be TRUE for channel_connections table.""" token = current_tenant_id.set(tenant_a["id"]) try: result = await db_session.execute( text("SELECT relforcerowsecurity FROM pg_class WHERE relname = 'channel_connections'") ) row = result.fetchone() finally: current_tenant_id.reset(token) assert row is not None, "channel_connections table not found in pg_class" assert row[0] is True, ( "FORCE ROW LEVEL SECURITY is NOT active on channel_connections table." ) async def test_tenants_table_exists_and_is_accessible( self, db_session: AsyncSession, tenant_a: dict[str, Any], ) -> None: """ tenants table must be accessible without tenant context. RLS is NOT applied to tenants — the Router needs to look up all tenants during message routing, before tenant context is set. """ result = await db_session.execute(text("SELECT id, slug FROM tenants LIMIT 10")) rows = result.fetchall() # Should be accessible (no RLS) — we just care it doesn't raise assert rows is not None