test(04-rbac-03): add failing integration tests for RBAC enforcement and invite flow

RED phase — tests are written, will pass when connected to live DB.
Tests cover:
- Full RBAC matrix: platform_admin/customer_admin/operator on all endpoints
- Operator can POST /test but not POST /agents (create)
- Missing headers return 422
- Impersonation creates AuditEvent row
- Full invite flow: create -> accept -> login with correct role
- Expired invite rejection
- Resend generates new token and extends expiry
- Double-accept prevention
This commit is contained in:
2026-03-24 17:16:13 -06:00
parent 43b73aa6c5
commit 9515c5374a
2 changed files with 1433 additions and 0 deletions

View File

@@ -0,0 +1,949 @@
"""
Integration tests for RBAC enforcement on all portal API endpoints.
Tests every endpoint against the full role matrix:
- platform_admin: unrestricted access to all endpoints
- customer_admin (own tenant): access to own-tenant endpoints, 403 on others
- customer_admin (other tenant): 403 on all tenant-scoped endpoints
- customer_operator: 200 on GET and test-message, 403 on POST/PUT/DELETE
- Missing role headers: 422 (FastAPI Header() validation)
Also tests:
- Impersonation endpoint creates AuditEvent row
- Billing/channels/llm_keys/usage RBAC representative tests
"""
from __future__ import annotations
import uuid
from typing import Any
import pytest
import pytest_asyncio
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from shared.api.billing import billing_router
from shared.api.channels import channels_router
from shared.api.llm_keys import llm_keys_router
from shared.api.portal import portal_router
from shared.api.usage import usage_router
from shared.db import get_session
from shared.models.auth import PortalUser, UserTenantRole
from shared.models.tenant import Agent, Tenant
# ---------------------------------------------------------------------------
# App factory
# ---------------------------------------------------------------------------
def make_app(session: AsyncSession) -> FastAPI:
"""Build a FastAPI test app with all portal routers and session override."""
app = FastAPI()
app.include_router(portal_router)
app.include_router(billing_router)
app.include_router(channels_router)
app.include_router(llm_keys_router)
app.include_router(usage_router)
async def override_get_session(): # type: ignore[return]
yield session
app.dependency_overrides[get_session] = override_get_session
return app
# ---------------------------------------------------------------------------
# RBAC header helpers
# ---------------------------------------------------------------------------
def platform_admin_headers(user_id: uuid.UUID) -> dict[str, str]:
"""Headers for a platform_admin caller."""
return {
"X-Portal-User-Id": str(user_id),
"X-Portal-User-Role": "platform_admin",
}
def customer_admin_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]:
"""Headers for a customer_admin caller with an active tenant."""
return {
"X-Portal-User-Id": str(user_id),
"X-Portal-User-Role": "customer_admin",
"X-Portal-Tenant-Id": str(tenant_id),
}
def customer_operator_headers(user_id: uuid.UUID, tenant_id: uuid.UUID) -> dict[str, str]:
"""Headers for a customer_operator caller with an active tenant."""
return {
"X-Portal-User-Id": str(user_id),
"X-Portal-User-Role": "customer_operator",
"X-Portal-Tenant-Id": str(tenant_id),
}
# ---------------------------------------------------------------------------
# DB setup helpers
# ---------------------------------------------------------------------------
async def _create_tenant(session: AsyncSession, name: str | None = None) -> Tenant:
"""Create a tenant directly in DB and return it."""
suffix = uuid.uuid4().hex[:8]
tenant = Tenant(
id=uuid.uuid4(),
name=name or f"RBAC Test Tenant {suffix}",
slug=f"rbac-test-{suffix}",
settings={},
)
session.add(tenant)
await session.flush()
return tenant
async def _create_user(
session: AsyncSession, role: str = "customer_admin"
) -> PortalUser:
"""Create a portal user directly in DB and return it."""
import bcrypt
suffix = uuid.uuid4().hex[:8]
hashed = bcrypt.hashpw(b"testpassword123", bcrypt.gensalt()).decode()
user = PortalUser(
id=uuid.uuid4(),
email=f"test-{suffix}@example.com",
hashed_password=hashed,
name=f"Test User {suffix}",
role=role,
)
session.add(user)
await session.flush()
return user
async def _grant_membership(
session: AsyncSession,
user: PortalUser,
tenant: Tenant,
role: str,
) -> UserTenantRole:
"""Grant a user membership in a tenant."""
membership = UserTenantRole(
id=uuid.uuid4(),
user_id=user.id,
tenant_id=tenant.id,
role=role,
)
session.add(membership)
await session.flush()
return membership
async def _create_agent(session: AsyncSession, tenant: Tenant) -> Agent:
"""Create an agent for a tenant."""
agent = Agent(
id=uuid.uuid4(),
tenant_id=tenant.id,
name="Test Agent",
role="Support",
persona="",
system_prompt="",
model_preference="quality",
tool_assignments=[],
escalation_rules=[],
is_active=True,
)
session.add(agent)
await session.flush()
return agent
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture
async def rbac_setup(db_session: AsyncSession) -> dict[str, Any]:
"""
Set up test tenants, users, and memberships for RBAC tests.
Returns a dict with:
- tenant: primary test tenant
- other_tenant: second tenant for cross-tenant tests
- platform_admin: platform_admin user (no tenant membership needed)
- customer_admin: customer_admin user with membership in tenant
- operator: customer_operator user with membership in tenant
- agent: an agent belonging to tenant
"""
tenant = await _create_tenant(db_session, "Primary RBAC Tenant")
other_tenant = await _create_tenant(db_session, "Other RBAC Tenant")
platform_admin = await _create_user(db_session, role="platform_admin")
customer_admin = await _create_user(db_session, role="customer_admin")
operator = await _create_user(db_session, role="customer_operator")
# Grant memberships only in primary tenant (not other_tenant)
await _grant_membership(db_session, customer_admin, tenant, "customer_admin")
await _grant_membership(db_session, operator, tenant, "customer_operator")
agent = await _create_agent(db_session, tenant)
await db_session.commit()
return {
"tenant": tenant,
"other_tenant": other_tenant,
"platform_admin": platform_admin,
"customer_admin": customer_admin,
"operator": operator,
"agent": agent,
}
@pytest_asyncio.fixture
async def rbac_client(db_session: AsyncSession) -> AsyncClient:
"""HTTP client with all portal routers mounted."""
app = make_app(db_session)
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
yield c
# ---------------------------------------------------------------------------
# Tests: Tenant CRUD endpoint RBAC
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestTenantEndpointRBAC:
async def test_platform_admin_can_list_tenants(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants — platform_admin gets 200."""
resp = await rbac_client.get(
"/api/portal/tenants",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
async def test_customer_admin_cannot_list_tenants(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants — customer_admin gets 403 (not platform admin)."""
resp = await rbac_client.get(
"/api/portal/tenants",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_cannot_list_tenants(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants — customer_operator gets 403."""
resp = await rbac_client.get(
"/api/portal/tenants",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_platform_admin_can_create_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants — platform_admin gets 201."""
suffix = uuid.uuid4().hex[:8]
resp = await rbac_client.post(
"/api/portal/tenants",
json={"name": f"New Tenant {suffix}", "slug": f"new-tenant-{suffix}"},
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 201
async def test_customer_admin_cannot_create_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants — customer_admin gets 403."""
suffix = uuid.uuid4().hex[:8]
resp = await rbac_client.post(
"/api/portal/tenants",
json={"name": f"Sneaky Tenant {suffix}", "slug": f"sneaky-{suffix}"},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_cannot_create_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants — customer_operator gets 403."""
suffix = uuid.uuid4().hex[:8]
resp = await rbac_client.post(
"/api/portal/tenants",
json={"name": f"Op Tenant {suffix}", "slug": f"op-tenant-{suffix}"},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_platform_admin_can_get_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{id} — platform_admin gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
async def test_customer_admin_can_get_own_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{id} — customer_admin with membership gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_customer_admin_cannot_get_other_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{id} — customer_admin without membership in that tenant gets 403."""
other_tid = rbac_setup["other_tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{other_tid}",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_can_get_own_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{id} — operator with membership gets 200 (read-only)."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_platform_admin_can_update_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""PUT /tenants/{id} — platform_admin gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.put(
f"/api/portal/tenants/{tid}",
json={"settings": {"tier": "team"}},
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
async def test_customer_admin_cannot_update_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""PUT /tenants/{id} — customer_admin gets 403 (only platform admin can update tenant)."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.put(
f"/api/portal/tenants/{tid}",
json={"settings": {"tier": "enterprise"}},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_cannot_update_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""PUT /tenants/{id} — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.put(
f"/api/portal/tenants/{tid}",
json={"settings": {}},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_platform_admin_can_delete_tenant(
self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any]
) -> None:
"""DELETE /tenants/{id} — platform_admin gets 204."""
# Create a disposable tenant
disposable = await _create_tenant(db_session, "Disposable Tenant")
await db_session.commit()
resp = await rbac_client.delete(
f"/api/portal/tenants/{disposable.id}",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 204
async def test_customer_admin_cannot_delete_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""DELETE /tenants/{id} — customer_admin gets 403."""
tid = rbac_setup["other_tenant"].id
resp = await rbac_client.delete(
f"/api/portal/tenants/{tid}",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_cannot_delete_tenant(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""DELETE /tenants/{id} — customer_operator gets 403."""
tid = rbac_setup["other_tenant"].id
resp = await rbac_client.delete(
f"/api/portal/tenants/{tid}",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_missing_headers_returns_422(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""Requests without RBAC headers get 422 from FastAPI header validation."""
resp = await rbac_client.get("/api/portal/tenants")
assert resp.status_code == 422
# ---------------------------------------------------------------------------
# Tests: Agent CRUD endpoint RBAC
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestAgentEndpointRBAC:
async def test_platform_admin_can_list_agents(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/agents — platform_admin gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}/agents",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
async def test_customer_admin_can_list_own_tenant_agents(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/agents — customer_admin with membership gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}/agents",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_customer_admin_cannot_list_other_tenant_agents(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/agents — customer_admin without membership gets 403."""
other_tid = rbac_setup["other_tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{other_tid}/agents",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_can_list_own_tenant_agents(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/agents — customer_operator with membership gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}/agents",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_platform_admin_can_create_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/agents — platform_admin gets 201."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents",
json={"name": "New Agent", "role": "Support"},
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 201
async def test_customer_admin_can_create_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/agents — customer_admin with membership gets 201."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents",
json={"name": "Admin Created Agent", "role": "Support"},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 201
async def test_operator_cannot_create_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/agents — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents",
json={"name": "Op Agent", "role": "Support"},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_platform_admin_can_update_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""PUT /tenants/{tid}/agents/{aid} — platform_admin gets 200."""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.put(
f"/api/portal/tenants/{tid}/agents/{aid}",
json={"persona": "Updated persona"},
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
async def test_customer_admin_can_update_own_tenant_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""PUT /tenants/{tid}/agents/{aid} — customer_admin with membership gets 200."""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.put(
f"/api/portal/tenants/{tid}/agents/{aid}",
json={"persona": "Admin updated persona"},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_operator_cannot_update_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""PUT /tenants/{tid}/agents/{aid} — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.put(
f"/api/portal/tenants/{tid}/agents/{aid}",
json={"persona": "Op trying to update"},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_customer_admin_cannot_update_other_tenant_agent(
self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any]
) -> None:
"""PUT — customer_admin without membership in other tenant gets 403."""
other_tenant = rbac_setup["other_tenant"]
other_agent = await _create_agent(db_session, other_tenant)
await db_session.commit()
resp = await rbac_client.put(
f"/api/portal/tenants/{other_tenant.id}/agents/{other_agent.id}",
json={"persona": "Cross-tenant attack"},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_platform_admin_can_delete_agent(
self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any]
) -> None:
"""DELETE /tenants/{tid}/agents/{aid} — platform_admin gets 204."""
tid = rbac_setup["tenant"].id
disposable_agent = await _create_agent(db_session, rbac_setup["tenant"])
await db_session.commit()
resp = await rbac_client.delete(
f"/api/portal/tenants/{tid}/agents/{disposable_agent.id}",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 204
async def test_operator_cannot_delete_agent(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""DELETE /tenants/{tid}/agents/{aid} — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.delete(
f"/api/portal/tenants/{tid}/agents/{aid}",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
# ---------------------------------------------------------------------------
# Tests: Test-message endpoint — operators CAN send test messages
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestAgentTestMessageEndpoint:
async def test_platform_admin_can_send_test_message(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/agents/{aid}/test — platform_admin gets 200."""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents/{aid}/test",
json={"message": "Hello agent!"},
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
data = resp.json()
assert data["agent_id"] == str(aid)
assert data["message"] == "Hello agent!"
assert "response" in data
async def test_customer_admin_can_send_test_message(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/agents/{aid}/test — customer_admin with membership gets 200."""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents/{aid}/test",
json={"message": "Test from admin"},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_operator_can_send_test_message(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/agents/{aid}/test — customer_operator CAN send test messages.
This is the key locked decision: operators can send test messages
even though they cannot modify agents.
"""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents/{aid}/test",
json={"message": "Operator test message"},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_operator_cannot_send_test_message_to_other_tenant(
self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any]
) -> None:
"""POST /test — operator without membership in other tenant gets 403."""
other_tenant = rbac_setup["other_tenant"]
other_agent = await _create_agent(db_session, other_tenant)
await db_session.commit()
resp = await rbac_client.post(
f"/api/portal/tenants/{other_tenant.id}/agents/{other_agent.id}/test",
json={"message": "Cross-tenant attempt"},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_cannot_create_agent_but_can_send_test_message(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""
Critical test: operator is blocked from POST /agents (create)
but allowed on POST /agents/{id}/test (test message).
"""
tid = rbac_setup["tenant"].id
aid = rbac_setup["agent"].id
op_headers = customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
)
# Cannot create agent
create_resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents",
json={"name": "Sneaky Agent", "role": "Hacker"},
headers=op_headers,
)
assert create_resp.status_code == 403
# CAN send test message to existing agent
test_resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/agents/{aid}/test",
json={"message": "Legit test message"},
headers=op_headers,
)
assert test_resp.status_code == 200
# ---------------------------------------------------------------------------
# Tests: User listing endpoints
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestUserListingEndpoints:
async def test_platform_admin_can_list_tenant_users(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/users — platform_admin gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}/users",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
data = resp.json()
assert "users" in data
assert "pending_invitations" in data
# Customer admin and operator should be in the user list
user_ids = [u["id"] for u in data["users"]]
assert str(rbac_setup["customer_admin"].id) in user_ids
async def test_customer_admin_can_list_own_tenant_users(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/users — customer_admin with membership gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}/users",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_operator_cannot_list_tenant_users(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/users — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{tid}/users",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_customer_admin_cannot_list_other_tenant_users(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /tenants/{tid}/users — customer_admin without membership in other tenant gets 403."""
other_tid = rbac_setup["other_tenant"].id
resp = await rbac_client.get(
f"/api/portal/tenants/{other_tid}/users",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_platform_admin_can_list_all_users(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /admin/users — platform_admin gets 200."""
resp = await rbac_client.get(
"/api/portal/admin/users",
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
async def test_customer_admin_cannot_access_admin_users(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /admin/users — customer_admin gets 403."""
resp = await rbac_client.get(
"/api/portal/admin/users",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
# ---------------------------------------------------------------------------
# Tests: Impersonation endpoint
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestImpersonationEndpoint:
async def test_platform_admin_can_impersonate(
self, rbac_client: AsyncClient, db_session: AsyncSession, rbac_setup: dict[str, Any]
) -> None:
"""POST /admin/impersonate — platform_admin gets 200 and creates AuditEvent."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
"/api/portal/admin/impersonate",
json={"tenant_id": str(tid)},
headers=platform_admin_headers(rbac_setup["platform_admin"].id),
)
assert resp.status_code == 200
data = resp.json()
assert data["tenant_id"] == str(tid)
# Verify AuditEvent was logged
result = await db_session.execute(
text(
"SELECT * FROM audit_events WHERE action_type = 'impersonation' "
"AND tenant_id = :tenant_id ORDER BY created_at DESC LIMIT 1"
),
{"tenant_id": str(tid)},
)
row = result.mappings().first()
assert row is not None
assert row["action_type"] == "impersonation"
async def test_customer_admin_cannot_impersonate(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /admin/impersonate — customer_admin gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
"/api/portal/admin/impersonate",
json={"tenant_id": str(tid)},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_cannot_impersonate(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /admin/impersonate — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
"/api/portal/admin/impersonate",
json={"tenant_id": str(tid)},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
# ---------------------------------------------------------------------------
# Tests: Billing/channels/llm_keys/usage RBAC representative tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
class TestOtherRouterRBAC:
async def test_operator_cannot_checkout_billing(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /billing/checkout — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
"/api/portal/billing/checkout",
json={"tenant_id": str(tid), "agent_count": 1},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_customer_admin_can_access_billing(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /billing/checkout — customer_admin gets past RBAC (may fail for Stripe config, not 403)."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
"/api/portal/billing/checkout",
json={"tenant_id": str(tid), "agent_count": 1},
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
# Should not be 403 — may be 500 (Stripe not configured) but RBAC passes
assert resp.status_code != 403
async def test_operator_cannot_create_llm_key(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""POST /tenants/{tid}/llm-keys — customer_operator gets 403."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.post(
f"/api/portal/tenants/{tid}/llm-keys",
json={"provider": "openai", "label": "My Key", "api_key": "sk-test"},
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 403
async def test_operator_can_view_usage(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /usage/{tid}/summary — customer_operator gets 200 (read-only allowed)."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/usage/{tid}/summary",
headers=customer_operator_headers(
rbac_setup["operator"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200
async def test_customer_admin_can_view_usage(
self, rbac_client: AsyncClient, rbac_setup: dict[str, Any]
) -> None:
"""GET /usage/{tid}/summary — customer_admin gets 200."""
tid = rbac_setup["tenant"].id
resp = await rbac_client.get(
f"/api/portal/usage/{tid}/summary",
headers=customer_admin_headers(
rbac_setup["customer_admin"].id, rbac_setup["tenant"].id
),
)
assert resp.status_code == 200