From 9515c5374ae634850898c1f9f3ae5b4958d16dd1 Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Tue, 24 Mar 2026 17:16:13 -0600 Subject: [PATCH] test(04-rbac-03): add failing integration tests for RBAC enforcement and invite flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RED phase — tests are written, will pass when connected to live DB. Tests cover: - Full RBAC matrix: platform_admin/customer_admin/operator on all endpoints - Operator can POST /test but not POST /agents (create) - Missing headers return 422 - Impersonation creates AuditEvent row - Full invite flow: create -> accept -> login with correct role - Expired invite rejection - Resend generates new token and extends expiry - Double-accept prevention --- tests/integration/test_invite_flow.py | 484 +++++++++++++ tests/integration/test_portal_rbac.py | 949 ++++++++++++++++++++++++++ 2 files changed, 1433 insertions(+) create mode 100644 tests/integration/test_invite_flow.py create mode 100644 tests/integration/test_portal_rbac.py diff --git a/tests/integration/test_invite_flow.py b/tests/integration/test_invite_flow.py new file mode 100644 index 0000000..d7853a5 --- /dev/null +++ b/tests/integration/test_invite_flow.py @@ -0,0 +1,484 @@ +""" +End-to-end integration tests for the portal invitation flow. + +Tests: + 1. Full flow: admin creates invite -> token generated -> accept with password -> + new user created with correct role and tenant membership + 2. Expired invite acceptance returns error + 3. Resend generates new token and extends expiry + 4. Double-accept prevented (status no longer 'pending') + 5. Login works after acceptance: /auth/verify returns correct role +""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone + +import bcrypt +import pytest +import pytest_asyncio +from fastapi import FastAPI +from httpx import ASGITransport, AsyncClient +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.api.invitations import invitations_router +from shared.api.portal import portal_router +from shared.db import get_session +from shared.invite_token import generate_invite_token, token_to_hash +from shared.models.auth import PortalInvitation, PortalUser, UserTenantRole +from shared.models.tenant import Tenant + + +# --------------------------------------------------------------------------- +# App factory +# --------------------------------------------------------------------------- + + +def make_app(session: AsyncSession) -> FastAPI: + """Build a FastAPI test app with portal and invitations routers.""" + app = FastAPI() + app.include_router(portal_router) + app.include_router(invitations_router) + + async def override_get_session(): # type: ignore[return] + yield session + + app.dependency_overrides[get_session] = override_get_session + return app + + +# --------------------------------------------------------------------------- +# Header helpers +# --------------------------------------------------------------------------- + + +def admin_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]: + return { + "X-Portal-User-Id": str(user_id), + "X-Portal-User-Role": "customer_admin", + "X-Portal-Tenant-Id": str(tenant_id), + } + + +def platform_admin_headers(user_id: uuid.UUID) -> dict[str, str]: + return { + "X-Portal-User-Id": str(user_id), + "X-Portal-User-Role": "platform_admin", + } + + +# --------------------------------------------------------------------------- +# DB setup helpers +# --------------------------------------------------------------------------- + + +async def _create_tenant_direct(session: AsyncSession) -> Tenant: + suffix = uuid.uuid4().hex[:8] + tenant = Tenant( + id=uuid.uuid4(), + name=f"Invite Test Tenant {suffix}", + slug=f"invite-test-{suffix}", + settings={}, + ) + session.add(tenant) + await session.flush() + return tenant + + +async def _create_admin_user(session: AsyncSession, tenant: Tenant) -> PortalUser: + hashed = bcrypt.hashpw(b"adminpassword123", bcrypt.gensalt()).decode() + suffix = uuid.uuid4().hex[:8] + user = PortalUser( + id=uuid.uuid4(), + email=f"admin-{suffix}@example.com", + hashed_password=hashed, + name=f"Admin User {suffix}", + role="customer_admin", + ) + session.add(user) + await session.flush() + + membership = UserTenantRole( + id=uuid.uuid4(), + user_id=user.id, + tenant_id=tenant.id, + role="customer_admin", + ) + session.add(membership) + await session.flush() + return user + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest_asyncio.fixture +async def invite_client(db_session: AsyncSession) -> AsyncClient: + """HTTP client with portal and invitations routers.""" + app = make_app(db_session) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + yield c + + +@pytest_asyncio.fixture +async def invite_setup(db_session: AsyncSession) -> dict: + """Create a tenant and admin user for invitation tests.""" + tenant = await _create_tenant_direct(db_session) + admin = await _create_admin_user(db_session, tenant) + await db_session.commit() + return {"tenant": tenant, "admin": admin} + + +# --------------------------------------------------------------------------- +# Tests: Full invite flow +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestInviteFlow: + async def test_full_invite_flow_operator( + self, invite_client: AsyncClient, db_session: AsyncSession, invite_setup: dict + ) -> None: + """ + Full end-to-end invite flow for a customer_operator: + + 1. Admin creates invitation + 2. Token is included in response + 3. Invitee accepts invitation with password + 4. PortalUser created with role=customer_operator + 5. UserTenantRole created linking user to tenant + 6. Invitation status updated to 'accepted' + 7. Login returns correct role + """ + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"operator-{uuid.uuid4().hex[:8]}@example.com" + + # Step 1: Admin creates invitation + resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "New Operator", + "role": "customer_operator", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + assert resp.status_code == 201 + invite_data = resp.json() + assert invite_data["email"] == invitee_email + assert invite_data["role"] == "customer_operator" + assert invite_data["status"] == "pending" + token = invite_data["token"] + assert token is not None and len(token) > 0 + + # Step 2: Invitee accepts the invitation + accept_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": token, "password": "securepass123"}, + ) + assert accept_resp.status_code == 200 + accept_data = accept_resp.json() + assert accept_data["email"] == invitee_email + assert accept_data["role"] == "customer_operator" + new_user_id = accept_data["id"] + + # Step 3: Verify PortalUser was created correctly + user_result = await db_session.execute( + select(PortalUser).where(PortalUser.id == uuid.UUID(new_user_id)) + ) + new_user = user_result.scalar_one_or_none() + assert new_user is not None + assert new_user.email == invitee_email + assert new_user.role == "customer_operator" + + # Step 4: Verify UserTenantRole was created + membership_result = await db_session.execute( + select(UserTenantRole).where( + UserTenantRole.user_id == uuid.UUID(new_user_id), + UserTenantRole.tenant_id == tenant.id, + ) + ) + membership = membership_result.scalar_one_or_none() + assert membership is not None + assert membership.role == "customer_operator" + + # Step 5: Verify invitation status is 'accepted' + inv_result = await db_session.execute( + select(PortalInvitation).where( + PortalInvitation.id == uuid.UUID(invite_data["id"]) + ) + ) + invitation = inv_result.scalar_one_or_none() + assert invitation is not None + assert invitation.status == "accepted" + + # Step 6: Verify login works with correct role + login_resp = await invite_client.post( + "/api/portal/auth/verify", + json={"email": invitee_email, "password": "securepass123"}, + ) + assert login_resp.status_code == 200 + login_data = login_resp.json() + assert login_data["role"] == "customer_operator" + assert str(tenant.id) in login_data["tenant_ids"] + + async def test_full_invite_flow_customer_admin( + self, invite_client: AsyncClient, invite_setup: dict + ) -> None: + """Full flow for a customer_admin invitee.""" + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"new-admin-{uuid.uuid4().hex[:8]}@example.com" + + # Create invitation + resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "New Admin", + "role": "customer_admin", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + assert resp.status_code == 201 + token = resp.json()["token"] + + # Accept invitation + accept_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": token, "password": "adminpass123"}, + ) + assert accept_resp.status_code == 200 + assert accept_resp.json()["role"] == "customer_admin" + + +# --------------------------------------------------------------------------- +# Tests: Expired invitation +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestExpiredInvite: + async def test_expired_invite_acceptance_returns_error( + self, invite_client: AsyncClient, db_session: AsyncSession, invite_setup: dict + ) -> None: + """Accepting an expired invitation returns a 400 error.""" + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"expired-{uuid.uuid4().hex[:8]}@example.com" + + # Create invitation normally + resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "Expired User", + "role": "customer_operator", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + assert resp.status_code == 201 + token = resp.json()["token"] + invite_id = uuid.UUID(resp.json()["id"]) + + # Manually set expires_at to the past + inv_result = await db_session.execute( + select(PortalInvitation).where(PortalInvitation.id == invite_id) + ) + invitation = inv_result.scalar_one() + invitation.expires_at = datetime.now(tz=timezone.utc) - timedelta(hours=1) + await db_session.commit() + + # Attempt to accept — should fail with 400 + accept_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": token, "password": "somepass123"}, + ) + assert accept_resp.status_code == 400 + assert "expired" in accept_resp.json()["detail"].lower() + + +# --------------------------------------------------------------------------- +# Tests: Resend invitation +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestResendInvite: + async def test_resend_generates_new_token_and_extends_expiry( + self, invite_client: AsyncClient, db_session: AsyncSession, invite_setup: dict + ) -> None: + """Resending an invitation generates a new token and extends expiry.""" + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"resend-{uuid.uuid4().hex[:8]}@example.com" + + # Create initial invitation + create_resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "Resend User", + "role": "customer_operator", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + assert create_resp.status_code == 201 + invite_id = create_resp.json()["id"] + original_token = create_resp.json()["token"] + + # Record original expiry + inv_result = await db_session.execute( + select(PortalInvitation).where(PortalInvitation.id == uuid.UUID(invite_id)) + ) + original_invitation = inv_result.scalar_one() + original_expiry = original_invitation.expires_at + + # Resend the invitation + resend_resp = await invite_client.post( + f"/api/portal/invitations/{invite_id}/resend", + headers=admin_headers(admin.id, tenant.id), + ) + assert resend_resp.status_code == 200 + new_token = resend_resp.json()["token"] + + # Token should be different + assert new_token != original_token + assert new_token is not None and len(new_token) > 0 + + # Verify new token hash in DB and extended expiry + await db_session.refresh(original_invitation) + assert original_invitation.token_hash == token_to_hash(new_token) + + # Expiry should be extended (refreshed from "now + 48h") + new_expiry = original_invitation.expires_at + assert new_expiry > original_expiry or ( + # Allow if original was already far in future (within 1 second tolerance) + abs((new_expiry - original_expiry).total_seconds()) < 5 + ) + + async def test_resend_new_token_can_be_used_to_accept( + self, invite_client: AsyncClient, invite_setup: dict + ) -> None: + """New token from resend works for acceptance.""" + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"resend-accept-{uuid.uuid4().hex[:8]}@example.com" + + # Create invitation + create_resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "Resend Accept User", + "role": "customer_operator", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + invite_id = create_resp.json()["id"] + + # Resend to get new token + resend_resp = await invite_client.post( + f"/api/portal/invitations/{invite_id}/resend", + headers=admin_headers(admin.id, tenant.id), + ) + new_token = resend_resp.json()["token"] + + # Accept with new token + accept_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": new_token, "password": "securepass123"}, + ) + assert accept_resp.status_code == 200 + assert accept_resp.json()["email"] == invitee_email + + +# --------------------------------------------------------------------------- +# Tests: Double-accept prevention +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestDoubleAcceptPrevention: + async def test_double_accept_returns_conflict( + self, invite_client: AsyncClient, invite_setup: dict + ) -> None: + """Attempting to accept an already-accepted invitation returns 409.""" + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"double-accept-{uuid.uuid4().hex[:8]}@example.com" + + # Create invitation + create_resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "Double Accept User", + "role": "customer_operator", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + token = create_resp.json()["token"] + + # First accept — should succeed + first_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": token, "password": "firstpass123"}, + ) + assert first_resp.status_code == 200 + + # Second accept with same token — should fail + second_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": token, "password": "secondpass123"}, + ) + # Either 409 (already accepted) or 400 (email already registered) + assert second_resp.status_code in (400, 409) + + async def test_only_pending_invites_can_be_accepted( + self, invite_client: AsyncClient, db_session: AsyncSession, invite_setup: dict + ) -> None: + """Invitations with status != 'pending' cannot be accepted.""" + tenant = invite_setup["tenant"] + admin = invite_setup["admin"] + invitee_email = f"non-pending-{uuid.uuid4().hex[:8]}@example.com" + + # Create invitation + create_resp = await invite_client.post( + "/api/portal/invitations", + json={ + "email": invitee_email, + "name": "Non Pending User", + "role": "customer_operator", + "tenant_id": str(tenant.id), + }, + headers=admin_headers(admin.id, tenant.id), + ) + invite_id = uuid.UUID(create_resp.json()["id"]) + token = create_resp.json()["token"] + + # Manually set status to 'revoked' + inv_result = await db_session.execute( + select(PortalInvitation).where(PortalInvitation.id == invite_id) + ) + invitation = inv_result.scalar_one() + invitation.status = "revoked" + await db_session.commit() + + # Attempt to accept revoked invitation + accept_resp = await invite_client.post( + "/api/portal/invitations/accept", + json={"token": token, "password": "somepass123"}, + ) + assert accept_resp.status_code == 409 diff --git a/tests/integration/test_portal_rbac.py b/tests/integration/test_portal_rbac.py new file mode 100644 index 0000000..0370468 --- /dev/null +++ b/tests/integration/test_portal_rbac.py @@ -0,0 +1,949 @@ +""" +Integration tests for RBAC enforcement on all portal API endpoints. + +Tests every endpoint against the full role matrix: + - platform_admin: unrestricted access to all endpoints + - customer_admin (own tenant): access to own-tenant endpoints, 403 on others + - customer_admin (other tenant): 403 on all tenant-scoped endpoints + - customer_operator: 200 on GET and test-message, 403 on POST/PUT/DELETE + - Missing role headers: 422 (FastAPI Header() validation) + +Also tests: + - Impersonation endpoint creates AuditEvent row + - Billing/channels/llm_keys/usage RBAC representative tests +""" + +from __future__ import annotations + +import uuid +from typing import Any + +import pytest +import pytest_asyncio +from fastapi import FastAPI +from httpx import ASGITransport, AsyncClient +from sqlalchemy import select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from shared.api.billing import billing_router +from shared.api.channels import channels_router +from shared.api.llm_keys import llm_keys_router +from shared.api.portal import portal_router +from shared.api.usage import usage_router +from shared.db import get_session +from shared.models.auth import PortalUser, UserTenantRole +from shared.models.tenant import Agent, Tenant + + +# --------------------------------------------------------------------------- +# App factory +# --------------------------------------------------------------------------- + + +def make_app(session: AsyncSession) -> FastAPI: + """Build a FastAPI test app with all portal routers and session override.""" + app = FastAPI() + app.include_router(portal_router) + app.include_router(billing_router) + app.include_router(channels_router) + app.include_router(llm_keys_router) + app.include_router(usage_router) + + async def override_get_session(): # type: ignore[return] + yield session + + app.dependency_overrides[get_session] = override_get_session + return app + + +# --------------------------------------------------------------------------- +# RBAC header helpers +# --------------------------------------------------------------------------- + + +def platform_admin_headers(user_id: uuid.UUID) -> dict[str, str]: + """Headers for a platform_admin caller.""" + return { + "X-Portal-User-Id": str(user_id), + "X-Portal-User-Role": "platform_admin", + } + + +def customer_admin_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]: + """Headers for a customer_admin caller with an active tenant.""" + return { + "X-Portal-User-Id": str(user_id), + "X-Portal-User-Role": "customer_admin", + "X-Portal-Tenant-Id": str(tenant_id), + } + + +def customer_operator_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]: + """Headers for a customer_operator caller with an active tenant.""" + return { + "X-Portal-User-Id": str(user_id), + "X-Portal-User-Role": "customer_operator", + "X-Portal-Tenant-Id": str(tenant_id), + } + + +# --------------------------------------------------------------------------- +# DB setup helpers +# --------------------------------------------------------------------------- + + +async def _create_tenant(session: AsyncSession, name: str | None = None) -> Tenant: + """Create a tenant directly in DB and return it.""" + suffix = uuid.uuid4().hex[:8] + tenant = Tenant( + id=uuid.uuid4(), + name=name or f"RBAC Test Tenant {suffix}", + slug=f"rbac-test-{suffix}", + settings={}, + ) + session.add(tenant) + await session.flush() + return tenant + + +async def _create_user( + session: AsyncSession, role: str = "customer_admin" +) -> PortalUser: + """Create a portal user directly in DB and return it.""" + import bcrypt + + suffix = uuid.uuid4().hex[:8] + hashed = bcrypt.hashpw(b"testpassword123", bcrypt.gensalt()).decode() + user = PortalUser( + id=uuid.uuid4(), + email=f"test-{suffix}@example.com", + hashed_password=hashed, + name=f"Test User {suffix}", + role=role, + ) + session.add(user) + await session.flush() + return user + + +async def _grant_membership( + session: AsyncSession, + user: PortalUser, + tenant: Tenant, + role: str, +) -> UserTenantRole: + """Grant a user membership in a tenant.""" + membership = UserTenantRole( + id=uuid.uuid4(), + user_id=user.id, + tenant_id=tenant.id, + role=role, + ) + session.add(membership) + await session.flush() + return membership + + +async def _create_agent(session: AsyncSession, tenant: Tenant) -> Agent: + """Create an agent for a tenant.""" + agent = Agent( + id=uuid.uuid4(), + tenant_id=tenant.id, + name="Test Agent", + role="Support", + persona="", + system_prompt="", + model_preference="quality", + tool_assignments=[], + escalation_rules=[], + is_active=True, + ) + session.add(agent) + await session.flush() + return agent + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest_asyncio.fixture +async def rbac_setup(db_session: AsyncSession) -> dict[str, Any]: + """ + Set up test tenants, users, and memberships for RBAC tests. + + Returns a dict with: + - tenant: primary test tenant + - other_tenant: second tenant for cross-tenant tests + - platform_admin: platform_admin user (no tenant membership needed) + - customer_admin: customer_admin user with membership in tenant + - operator: customer_operator user with membership in tenant + - agent: an agent belonging to tenant + """ + tenant = await _create_tenant(db_session, "Primary RBAC Tenant") + other_tenant = await _create_tenant(db_session, "Other RBAC Tenant") + + platform_admin = await _create_user(db_session, role="platform_admin") + customer_admin = await _create_user(db_session, role="customer_admin") + operator = await _create_user(db_session, role="customer_operator") + + # Grant memberships only in primary tenant (not other_tenant) + await _grant_membership(db_session, customer_admin, tenant, "customer_admin") + await _grant_membership(db_session, operator, tenant, "customer_operator") + + agent = await _create_agent(db_session, tenant) + + await db_session.commit() + + return { + "tenant": tenant, + "other_tenant": other_tenant, + "platform_admin": platform_admin, + "customer_admin": customer_admin, + "operator": operator, + "agent": agent, + } + + +@pytest_asyncio.fixture +async def rbac_client(db_session: AsyncSession) -> AsyncClient: + """HTTP client with all portal routers mounted.""" + app = make_app(db_session) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + yield c + + +# --------------------------------------------------------------------------- +# Tests: Tenant CRUD endpoint RBAC +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestTenantEndpointRBAC: + async def test_platform_admin_can_list_tenants( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants — platform_admin gets 200.""" + resp = await rbac_client.get( + "/api/portal/tenants", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + + async def test_customer_admin_cannot_list_tenants( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants — customer_admin gets 403 (not platform admin).""" + resp = await rbac_client.get( + "/api/portal/tenants", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_cannot_list_tenants( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants — customer_operator gets 403.""" + resp = await rbac_client.get( + "/api/portal/tenants", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_platform_admin_can_create_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants — platform_admin gets 201.""" + suffix = uuid.uuid4().hex[:8] + resp = await rbac_client.post( + "/api/portal/tenants", + json={"name": f"New Tenant {suffix}", "slug": f"new-tenant-{suffix}"}, + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 201 + + async def test_customer_admin_cannot_create_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants — customer_admin gets 403.""" + suffix = uuid.uuid4().hex[:8] + resp = await rbac_client.post( + "/api/portal/tenants", + json={"name": f"Sneaky Tenant {suffix}", "slug": f"sneaky-{suffix}"}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_cannot_create_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants — customer_operator gets 403.""" + suffix = uuid.uuid4().hex[:8] + resp = await rbac_client.post( + "/api/portal/tenants", + json={"name": f"Op Tenant {suffix}", "slug": f"op-tenant-{suffix}"}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_platform_admin_can_get_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{id} — platform_admin gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + + async def test_customer_admin_can_get_own_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{id} — customer_admin with membership gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_customer_admin_cannot_get_other_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{id} — customer_admin without membership in that tenant gets 403.""" + other_tid = rbac_setup["other_tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{other_tid}", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_can_get_own_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{id} — operator with membership gets 200 (read-only).""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_platform_admin_can_update_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """PUT /tenants/{id} — platform_admin gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.put( + f"/api/portal/tenants/{tid}", + json={"settings": {"tier": "team"}}, + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + + async def test_customer_admin_cannot_update_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """PUT /tenants/{id} — customer_admin gets 403 (only platform admin can update tenant).""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.put( + f"/api/portal/tenants/{tid}", + json={"settings": {"tier": "enterprise"}}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_cannot_update_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """PUT /tenants/{id} — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.put( + f"/api/portal/tenants/{tid}", + json={"settings": {}}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_platform_admin_can_delete_tenant( + self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] + ) -> None: + """DELETE /tenants/{id} — platform_admin gets 204.""" + # Create a disposable tenant + disposable = await _create_tenant(db_session, "Disposable Tenant") + await db_session.commit() + resp = await rbac_client.delete( + f"/api/portal/tenants/{disposable.id}", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 204 + + async def test_customer_admin_cannot_delete_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """DELETE /tenants/{id} — customer_admin gets 403.""" + tid = rbac_setup["other_tenant"].id + resp = await rbac_client.delete( + f"/api/portal/tenants/{tid}", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_cannot_delete_tenant( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """DELETE /tenants/{id} — customer_operator gets 403.""" + tid = rbac_setup["other_tenant"].id + resp = await rbac_client.delete( + f"/api/portal/tenants/{tid}", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_missing_headers_returns_422( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """Requests without RBAC headers get 422 from FastAPI header validation.""" + resp = await rbac_client.get("/api/portal/tenants") + assert resp.status_code == 422 + + +# --------------------------------------------------------------------------- +# Tests: Agent CRUD endpoint RBAC +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestAgentEndpointRBAC: + async def test_platform_admin_can_list_agents( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/agents — platform_admin gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}/agents", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + + async def test_customer_admin_can_list_own_tenant_agents( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/agents — customer_admin with membership gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}/agents", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_customer_admin_cannot_list_other_tenant_agents( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/agents — customer_admin without membership gets 403.""" + other_tid = rbac_setup["other_tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{other_tid}/agents", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_can_list_own_tenant_agents( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/agents — customer_operator with membership gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}/agents", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_platform_admin_can_create_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/agents — platform_admin gets 201.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents", + json={"name": "New Agent", "role": "Support"}, + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 201 + + async def test_customer_admin_can_create_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/agents — customer_admin with membership gets 201.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents", + json={"name": "Admin Created Agent", "role": "Support"}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 201 + + async def test_operator_cannot_create_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/agents — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents", + json={"name": "Op Agent", "role": "Support"}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_platform_admin_can_update_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """PUT /tenants/{tid}/agents/{aid} — platform_admin gets 200.""" + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.put( + f"/api/portal/tenants/{tid}/agents/{aid}", + json={"persona": "Updated persona"}, + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + + async def test_customer_admin_can_update_own_tenant_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """PUT /tenants/{tid}/agents/{aid} — customer_admin with membership gets 200.""" + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.put( + f"/api/portal/tenants/{tid}/agents/{aid}", + json={"persona": "Admin updated persona"}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_operator_cannot_update_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """PUT /tenants/{tid}/agents/{aid} — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.put( + f"/api/portal/tenants/{tid}/agents/{aid}", + json={"persona": "Op trying to update"}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_customer_admin_cannot_update_other_tenant_agent( + self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] + ) -> None: + """PUT — customer_admin without membership in other tenant gets 403.""" + other_tenant = rbac_setup["other_tenant"] + other_agent = await _create_agent(db_session, other_tenant) + await db_session.commit() + + resp = await rbac_client.put( + f"/api/portal/tenants/{other_tenant.id}/agents/{other_agent.id}", + json={"persona": "Cross-tenant attack"}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_platform_admin_can_delete_agent( + self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] + ) -> None: + """DELETE /tenants/{tid}/agents/{aid} — platform_admin gets 204.""" + tid = rbac_setup["tenant"].id + disposable_agent = await _create_agent(db_session, rbac_setup["tenant"]) + await db_session.commit() + resp = await rbac_client.delete( + f"/api/portal/tenants/{tid}/agents/{disposable_agent.id}", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 204 + + async def test_operator_cannot_delete_agent( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """DELETE /tenants/{tid}/agents/{aid} — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.delete( + f"/api/portal/tenants/{tid}/agents/{aid}", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# Tests: Test-message endpoint — operators CAN send test messages +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestAgentTestMessageEndpoint: + async def test_platform_admin_can_send_test_message( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/agents/{aid}/test — platform_admin gets 200.""" + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents/{aid}/test", + json={"message": "Hello agent!"}, + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["agent_id"] == str(aid) + assert data["message"] == "Hello agent!" + assert "response" in data + + async def test_customer_admin_can_send_test_message( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/agents/{aid}/test — customer_admin with membership gets 200.""" + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents/{aid}/test", + json={"message": "Test from admin"}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_operator_can_send_test_message( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/agents/{aid}/test — customer_operator CAN send test messages. + + This is the key locked decision: operators can send test messages + even though they cannot modify agents. + """ + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents/{aid}/test", + json={"message": "Operator test message"}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_operator_cannot_send_test_message_to_other_tenant( + self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] + ) -> None: + """POST /test — operator without membership in other tenant gets 403.""" + other_tenant = rbac_setup["other_tenant"] + other_agent = await _create_agent(db_session, other_tenant) + await db_session.commit() + + resp = await rbac_client.post( + f"/api/portal/tenants/{other_tenant.id}/agents/{other_agent.id}/test", + json={"message": "Cross-tenant attempt"}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_cannot_create_agent_but_can_send_test_message( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """ + Critical test: operator is blocked from POST /agents (create) + but allowed on POST /agents/{id}/test (test message). + """ + tid = rbac_setup["tenant"].id + aid = rbac_setup["agent"].id + op_headers = customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ) + + # Cannot create agent + create_resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents", + json={"name": "Sneaky Agent", "role": "Hacker"}, + headers=op_headers, + ) + assert create_resp.status_code == 403 + + # CAN send test message to existing agent + test_resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/agents/{aid}/test", + json={"message": "Legit test message"}, + headers=op_headers, + ) + assert test_resp.status_code == 200 + + +# --------------------------------------------------------------------------- +# Tests: User listing endpoints +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestUserListingEndpoints: + async def test_platform_admin_can_list_tenant_users( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/users — platform_admin gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}/users", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + data = resp.json() + assert "users" in data + assert "pending_invitations" in data + # Customer admin and operator should be in the user list + user_ids = [u["id"] for u in data["users"]] + assert str(rbac_setup["customer_admin"].id) in user_ids + + async def test_customer_admin_can_list_own_tenant_users( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/users — customer_admin with membership gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}/users", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_operator_cannot_list_tenant_users( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/users — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{tid}/users", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_customer_admin_cannot_list_other_tenant_users( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /tenants/{tid}/users — customer_admin without membership in other tenant gets 403.""" + other_tid = rbac_setup["other_tenant"].id + resp = await rbac_client.get( + f"/api/portal/tenants/{other_tid}/users", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_platform_admin_can_list_all_users( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /admin/users — platform_admin gets 200.""" + resp = await rbac_client.get( + "/api/portal/admin/users", + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + data = resp.json() + assert isinstance(data, list) + + async def test_customer_admin_cannot_access_admin_users( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /admin/users — customer_admin gets 403.""" + resp = await rbac_client.get( + "/api/portal/admin/users", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# Tests: Impersonation endpoint +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestImpersonationEndpoint: + async def test_platform_admin_can_impersonate( + self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any] + ) -> None: + """POST /admin/impersonate — platform_admin gets 200 and creates AuditEvent.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + "/api/portal/admin/impersonate", + json={"tenant_id": str(tid)}, + headers=platform_admin_headers(rbac_setup["platform_admin"].id), + ) + assert resp.status_code == 200 + data = resp.json() + assert data["tenant_id"] == str(tid) + + # Verify AuditEvent was logged + result = await db_session.execute( + text( + "SELECT * FROM audit_events WHERE action_type = 'impersonation' " + "AND tenant_id = :tenant_id ORDER BY created_at DESC LIMIT 1" + ), + {"tenant_id": str(tid)}, + ) + row = result.mappings().first() + assert row is not None + assert row["action_type"] == "impersonation" + + async def test_customer_admin_cannot_impersonate( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /admin/impersonate — customer_admin gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + "/api/portal/admin/impersonate", + json={"tenant_id": str(tid)}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_cannot_impersonate( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /admin/impersonate — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + "/api/portal/admin/impersonate", + json={"tenant_id": str(tid)}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + +# --------------------------------------------------------------------------- +# Tests: Billing/channels/llm_keys/usage RBAC representative tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +class TestOtherRouterRBAC: + async def test_operator_cannot_checkout_billing( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /billing/checkout — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + "/api/portal/billing/checkout", + json={"tenant_id": str(tid), "agent_count": 1}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_customer_admin_can_access_billing( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /billing/checkout — customer_admin gets past RBAC (may fail for Stripe config, not 403).""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + "/api/portal/billing/checkout", + json={"tenant_id": str(tid), "agent_count": 1}, + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + # Should not be 403 — may be 500 (Stripe not configured) but RBAC passes + assert resp.status_code != 403 + + async def test_operator_cannot_create_llm_key( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """POST /tenants/{tid}/llm-keys — customer_operator gets 403.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.post( + f"/api/portal/tenants/{tid}/llm-keys", + json={"provider": "openai", "label": "My Key", "api_key": "sk-test"}, + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 403 + + async def test_operator_can_view_usage( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /usage/{tid}/summary — customer_operator gets 200 (read-only allowed).""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/usage/{tid}/summary", + headers=customer_operator_headers( + rbac_setup["operator"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200 + + async def test_customer_admin_can_view_usage( + self, rbac_client: AsyncClient, rbac_setup: dict[str, Any] + ) -> None: + """GET /usage/{tid}/summary — customer_admin gets 200.""" + tid = rbac_setup["tenant"].id + resp = await rbac_client.get( + f"/api/portal/usage/{tid}/summary", + headers=customer_admin_headers( + rbac_setup["customer_admin"].id, rbac_setup["tenant"].id + ), + ) + assert resp.status_code == 200