""" Integration tests for RBAC enforcement on all portal API endpoints. Tests every endpoint against the full role matrix: - platform_admin: unrestricted access to all endpoints - customer_admin (own tenant): access to own-tenant endpoints, 403 on others - customer_admin (other tenant): 403 on all tenant-scoped endpoints - customer_operator: 200 on GET and test-message, 403 on POST/PUT/DELETE - Missing role headers: 422 (FastAPI Header() validation) Also tests: - Impersonation endpoint creates AuditEvent row - Billing/channels/llm_keys/usage RBAC representative tests """ from __future__ import annotations import uuid from typing import Any import pytest import pytest_asyncio from fastapi import FastAPI from httpx import ASGITransport, AsyncClient from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from shared.api.billing import billing_router from shared.api.channels import channels_router from shared.api.llm_keys import llm_keys_router from shared.api.portal import portal_router from shared.api.usage import usage_router from shared.db import get_session from shared.models.auth import PortalUser, UserTenantRole from shared.models.tenant import Agent, Tenant # --------------------------------------------------------------------------- # App factory # --------------------------------------------------------------------------- def make_app(session: AsyncSession) -> FastAPI: """Build a FastAPI test app with all portal routers and session override.""" app = FastAPI() app.include_router(portal_router) app.include_router(billing_router) app.include_router(channels_router) app.include_router(llm_keys_router) app.include_router(usage_router) async def override_get_session(): # type: ignore[return] yield session app.dependency_overrides[get_session] = override_get_session return app # --------------------------------------------------------------------------- # RBAC header helpers # --------------------------------------------------------------------------- def platform_admin_headers(user_id: uuid.UUID) -> dict[str, str]: """Headers for a platform_admin caller.""" return { "X-Portal-User-Id": str(user_id), "X-Portal-User-Role": "platform_admin", } def customer_admin_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]: """Headers for a customer_admin caller with an active tenant.""" return { "X-Portal-User-Id": str(user_id), "X-Portal-User-Role": "customer_admin", "X-Portal-Tenant-Id": str(tenant_id), } def customer_operator_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]: """Headers for a customer_operator caller with an active tenant.""" return { "X-Portal-User-Id": str(user_id), "X-Portal-User-Role": "customer_operator", "X-Portal-Tenant-Id": str(tenant_id), } # --------------------------------------------------------------------------- # DB setup helpers # --------------------------------------------------------------------------- async def _create_tenant(session: AsyncSession, name: str | None = None) -> Tenant: """Create a tenant directly in DB and return it.""" suffix = uuid.uuid4().hex[:8] tenant = Tenant( id=uuid.uuid4(), name=name or f"RBAC Test Tenant {suffix}", slug=f"rbac-test-{suffix}", settings={}, ) session.add(tenant) await session.flush() return tenant async def _create_user( session: AsyncSession, role: str = "customer_admin" ) -> PortalUser: """Create a portal user directly in DB and return it.""" import bcrypt suffix = uuid.uuid4().hex[:8] hashed = bcrypt.hashpw(b"testpassword123", bcrypt.gensalt()).decode() user = PortalUser( id=uuid.uuid4(), email=f"test-{suffix}@example.com", hashed_password=hashed, name=f"Test User {suffix}", role=role, ) session.add(user) await session.flush() return user async def _grant_membership( session: AsyncSession, user: PortalUser, tenant: Tenant, role: str, ) -> UserTenantRole: """Grant a user membership in a tenant.""" membership = UserTenantRole( id=uuid.uuid4(), user_id=user.id, tenant_id=tenant.id, role=role, ) session.add(membership) await session.flush() return membership async def _create_agent(session: AsyncSession, tenant: Tenant) -> Agent: """Create an agent for a tenant.""" agent = Agent( id=uuid.uuid4(), tenant_id=tenant.id, name="Test Agent", role="Support", persona="", system_prompt="", model_preference="quality", tool_assignments=[], escalation_rules=[], is_active=True, ) session.add(agent) await session.flush() return agent # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture async def rbac_setup(db_session: AsyncSession) -> dict[str, Any]: """ Set up test tenants, users, and memberships for RBAC tests. Returns a dict with: - tenant: primary test tenant - other_tenant: second tenant for cross-tenant tests - platform_admin: platform_admin user (no tenant membership needed) - customer_admin: customer_admin user with membership in tenant - operator: customer_operator user with membership in tenant - agent: an agent belonging to tenant """ tenant = await _create_tenant(db_session, "Primary RBAC Tenant") other_tenant = await _create_tenant(db_session, "Other RBAC Tenant") platform_admin = await _create_user(db_session, role="platform_admin") customer_admin = await _create_user(db_session, role="customer_admin") operator = await _create_user(db_session, role="customer_operator") # Grant memberships only in primary tenant (not other_tenant) await _grant_membership(db_session, customer_admin, tenant, "customer_admin") await _grant_membership(db_session, operator, tenant, "customer_operator") agent = await _create_agent(db_session, tenant) await db_session.commit() return { "tenant": tenant, "other_tenant": other_tenant, "platform_admin": platform_admin, "customer_admin": customer_admin, "operator": operator, "agent": agent, } @pytest_asyncio.fixture async def rbac_client(db_session: AsyncSession) -> AsyncClient: """HTTP client with all portal routers mounted.""" app = make_app(db_session) async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: yield c # --------------------------------------------------------------------------- # Tests: Tenant CRUD endpoint RBAC # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestTenantEndpointRBAC: async def test_platform_admin_can_list_tenants( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants — platform_admin gets 200.""" resp = await rbac_client.get( "/api/portal/tenants", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 async def test_customer_admin_cannot_list_tenants( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants — customer_admin gets 403 (not platform admin).""" resp = await rbac_client.get( "/api/portal/tenants", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_cannot_list_tenants( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants — customer_operator gets 403.""" resp = await rbac_client.get( "/api/portal/tenants", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_platform_admin_can_create_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants — platform_admin gets 201.""" suffix = uuid.uuid4().hex[:8] resp = await rbac_client.post( "/api/portal/tenants", json={"name": f"New Tenant {suffix}", "slug": f"new-tenant-{suffix}"}, headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 201 async def test_customer_admin_cannot_create_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants — customer_admin gets 403.""" suffix = uuid.uuid4().hex[:8] resp = await rbac_client.post( "/api/portal/tenants", json={"name": f"Sneaky Tenant {suffix}", "slug": f"sneaky-{suffix}"}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_cannot_create_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants — customer_operator gets 403.""" suffix = uuid.uuid4().hex[:8] resp = await rbac_client.post( "/api/portal/tenants", json={"name": f"Op Tenant {suffix}", "slug": f"op-tenant-{suffix}"}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_platform_admin_can_get_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{id} — platform_admin gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 async def test_customer_admin_can_get_own_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{id} — customer_admin with membership gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_customer_admin_cannot_get_other_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{id} — customer_admin without membership in that tenant gets 403.""" other_tid = rbac_setup["other_tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{other_tid}", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_can_get_own_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{id} — operator with membership gets 200 (read-only).""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_platform_admin_can_update_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """PUT /tenants/{id} — platform_admin gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.put( f"/api/portal/tenants/{tid}", json={"settings": {"tier": "team"}}, headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 async def test_customer_admin_cannot_update_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """PUT /tenants/{id} — customer_admin gets 403 (only platform admin can update tenant).""" tid = rbac_setup["tenant"].id resp = await rbac_client.put( f"/api/portal/tenants/{tid}", json={"settings": {"tier": "enterprise"}}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_cannot_update_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """PUT /tenants/{id} — customer_operator gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.put( f"/api/portal/tenants/{tid}", json={"settings": {}}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_platform_admin_can_delete_tenant( self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] ) -> None: """DELETE /tenants/{id} — platform_admin gets 204.""" # Create a disposable tenant disposable = await _create_tenant(db_session, "Disposable Tenant") await db_session.commit() resp = await rbac_client.delete( f"/api/portal/tenants/{disposable.id}", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 204 async def test_customer_admin_cannot_delete_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """DELETE /tenants/{id} — customer_admin gets 403.""" tid = rbac_setup["other_tenant"].id resp = await rbac_client.delete( f"/api/portal/tenants/{tid}", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_cannot_delete_tenant( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """DELETE /tenants/{id} — customer_operator gets 403.""" tid = rbac_setup["other_tenant"].id resp = await rbac_client.delete( f"/api/portal/tenants/{tid}", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_missing_headers_returns_422( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """Requests without RBAC headers get 422 from FastAPI header validation.""" resp = await rbac_client.get("/api/portal/tenants") assert resp.status_code == 422 # --------------------------------------------------------------------------- # Tests: Agent CRUD endpoint RBAC # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestAgentEndpointRBAC: async def test_platform_admin_can_list_agents( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/agents — platform_admin gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}/agents", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 async def test_customer_admin_can_list_own_tenant_agents( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/agents — customer_admin with membership gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}/agents", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_customer_admin_cannot_list_other_tenant_agents( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/agents — customer_admin without membership gets 403.""" other_tid = rbac_setup["other_tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{other_tid}/agents", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_can_list_own_tenant_agents( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/agents — customer_operator with membership gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}/agents", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_platform_admin_can_create_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/agents — platform_admin gets 201.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents", json={"name": "New Agent", "role": "Support"}, headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 201 async def test_customer_admin_can_create_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/agents — customer_admin with membership gets 201.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents", json={"name": "Admin Created Agent", "role": "Support"}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 201 async def test_operator_cannot_create_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/agents — customer_operator gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents", json={"name": "Op Agent", "role": "Support"}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_platform_admin_can_update_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """PUT /tenants/{tid}/agents/{aid} — platform_admin gets 200.""" tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.put( f"/api/portal/tenants/{tid}/agents/{aid}", json={"persona": "Updated persona"}, headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 async def test_customer_admin_can_update_own_tenant_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """PUT /tenants/{tid}/agents/{aid} — customer_admin with membership gets 200.""" tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.put( f"/api/portal/tenants/{tid}/agents/{aid}", json={"persona": "Admin updated persona"}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_operator_cannot_update_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """PUT /tenants/{tid}/agents/{aid} — customer_operator gets 403.""" tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.put( f"/api/portal/tenants/{tid}/agents/{aid}", json={"persona": "Op trying to update"}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_customer_admin_cannot_update_other_tenant_agent( self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] ) -> None: """PUT — customer_admin without membership in other tenant gets 403.""" other_tenant = rbac_setup["other_tenant"] other_agent = await _create_agent(db_session, other_tenant) await db_session.commit() resp = await rbac_client.put( f"/api/portal/tenants/{other_tenant.id}/agents/{other_agent.id}", json={"persona": "Cross-tenant attack"}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_platform_admin_can_delete_agent( self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] ) -> None: """DELETE /tenants/{tid}/agents/{aid} — platform_admin gets 204.""" tid = rbac_setup["tenant"].id disposable_agent = await _create_agent(db_session, rbac_setup["tenant"]) await db_session.commit() resp = await rbac_client.delete( f"/api/portal/tenants/{tid}/agents/{disposable_agent.id}", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 204 async def test_operator_cannot_delete_agent( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """DELETE /tenants/{tid}/agents/{aid} — customer_operator gets 403.""" tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.delete( f"/api/portal/tenants/{tid}/agents/{aid}", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 # --------------------------------------------------------------------------- # Tests: Test-message endpoint — operators CAN send test messages # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestAgentTestMessageEndpoint: async def test_platform_admin_can_send_test_message( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/agents/{aid}/test — platform_admin gets 200.""" tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents/{aid}/test", json={"message": "Hello agent!"}, headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 data = resp.json() assert data["agent_id"] == str(aid) assert data["message"] == "Hello agent!" assert "response" in data async def test_customer_admin_can_send_test_message( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/agents/{aid}/test — customer_admin with membership gets 200.""" tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents/{aid}/test", json={"message": "Test from admin"}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_operator_can_send_test_message( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/agents/{aid}/test — customer_operator CAN send test messages. This is the key locked decision: operators can send test messages even though they cannot modify agents. """ tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents/{aid}/test", json={"message": "Operator test message"}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_operator_cannot_send_test_message_to_other_tenant( self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] ) -> None: """POST /test — operator without membership in other tenant gets 403.""" other_tenant = rbac_setup["other_tenant"] other_agent = await _create_agent(db_session, other_tenant) await db_session.commit() resp = await rbac_client.post( f"/api/portal/tenants/{other_tenant.id}/agents/{other_agent.id}/test", json={"message": "Cross-tenant attempt"}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_cannot_create_agent_but_can_send_test_message( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """ Critical test: operator is blocked from POST /agents (create) but allowed on POST /agents/{id}/test (test message). """ tid = rbac_setup["tenant"].id aid = rbac_setup["agent"].id op_headers = customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ) # Cannot create agent create_resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents", json={"name": "Sneaky Agent", "role": "Hacker"}, headers=op_headers, ) assert create_resp.status_code == 403 # CAN send test message to existing agent test_resp = await rbac_client.post( f"/api/portal/tenants/{tid}/agents/{aid}/test", json={"message": "Legit test message"}, headers=op_headers, ) assert test_resp.status_code == 200 # --------------------------------------------------------------------------- # Tests: User listing endpoints # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestUserListingEndpoints: async def test_platform_admin_can_list_tenant_users( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/users — platform_admin gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}/users", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 data = resp.json() assert "users" in data assert "pending_invitations" in data # Customer admin and operator should be in the user list user_ids = [u["id"] for u in data["users"]] assert str(rbac_setup["customer_admin"].id) in user_ids async def test_customer_admin_can_list_own_tenant_users( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/users — customer_admin with membership gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}/users", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_operator_cannot_list_tenant_users( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/users — customer_operator gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{tid}/users", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_customer_admin_cannot_list_other_tenant_users( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /tenants/{tid}/users — customer_admin without membership in other tenant gets 403.""" other_tid = rbac_setup["other_tenant"].id resp = await rbac_client.get( f"/api/portal/tenants/{other_tid}/users", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_platform_admin_can_list_all_users( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /admin/users — platform_admin gets 200.""" resp = await rbac_client.get( "/api/portal/admin/users", headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 data = resp.json() assert isinstance(data, list) async def test_customer_admin_cannot_access_admin_users( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /admin/users — customer_admin gets 403.""" resp = await rbac_client.get( "/api/portal/admin/users", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 # --------------------------------------------------------------------------- # Tests: Impersonation endpoint # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestImpersonationEndpoint: async def test_platform_admin_can_impersonate( self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] ) -> None: """POST /admin/impersonate — platform_admin gets 200 and creates AuditEvent.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( "/api/portal/admin/impersonate", json={"tenant_id": str(tid)}, headers=platform_admin_headers(rbac_setup["platform_admin"].id), ) assert resp.status_code == 200 data = resp.json() assert data["tenant_id"] == str(tid) # Verify AuditEvent was logged result = await db_session.execute( text( "SELECT * FROM audit_events WHERE action_type = 'impersonation' " "AND tenant_id = :tenant_id ORDER BY created_at DESC LIMIT 1" ), {"tenant_id": str(tid)}, ) row = result.mappings().first() assert row is not None assert row["action_type"] == "impersonation" async def test_customer_admin_cannot_impersonate( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /admin/impersonate — customer_admin gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( "/api/portal/admin/impersonate", json={"tenant_id": str(tid)}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_cannot_impersonate( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /admin/impersonate — customer_operator gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( "/api/portal/admin/impersonate", json={"tenant_id": str(tid)}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 # --------------------------------------------------------------------------- # Tests: Billing/channels/llm_keys/usage RBAC representative tests # --------------------------------------------------------------------------- @pytest.mark.asyncio class TestOtherRouterRBAC: async def test_operator_cannot_checkout_billing( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /billing/checkout — customer_operator gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( "/api/portal/billing/checkout", json={"tenant_id": str(tid), "agent_count": 1}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_customer_admin_can_access_billing( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /billing/checkout — customer_admin gets past RBAC (may fail for Stripe config, not 403).""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( "/api/portal/billing/checkout", json={"tenant_id": str(tid), "agent_count": 1}, headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) # Should not be 403 — may be 500 (Stripe not configured) but RBAC passes assert resp.status_code != 403 async def test_operator_cannot_create_llm_key( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """POST /tenants/{tid}/llm-keys — customer_operator gets 403.""" tid = rbac_setup["tenant"].id resp = await rbac_client.post( f"/api/portal/tenants/{tid}/llm-keys", json={"provider": "openai", "label": "My Key", "api_key": "sk-test"}, headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 403 async def test_operator_can_view_usage( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /usage/{tid}/summary — customer_operator gets 200 (read-only allowed).""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/usage/{tid}/summary", headers=customer_operator_headers( rbac_setup["operator"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200 async def test_customer_admin_can_view_usage( self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] ) -> None: """GET /usage/{tid}/summary — customer_admin gets 200.""" tid = rbac_setup["tenant"].id resp = await rbac_client.get( f"/api/portal/usage/{tid}/summary", headers=customer_admin_headers( rbac_setup["customer_admin"].id, rbac_setup["tenant"].id ), ) assert resp.status_code == 200