""" Unit tests for RBAC guard FastAPI dependencies. Tests: - test_platform_admin_passes: platform_admin caller passes require_platform_admin - test_customer_admin_rejected: customer_admin gets 403 from require_platform_admin - test_customer_operator_rejected: customer_operator gets 403 from require_platform_admin - test_tenant_admin_own_tenant: customer_admin with membership passes require_tenant_admin - test_tenant_admin_no_membership: customer_admin without membership gets 403 - test_platform_admin_bypasses_tenant_check: platform_admin passes require_tenant_admin without a UserTenantRole row (no DB query for membership) - test_operator_rejected_from_admin: customer_operator gets 403 from require_tenant_admin - test_tenant_member_all_roles: customer_admin and customer_operator with membership pass - test_tenant_member_no_membership: user with no membership gets 403 - test_platform_admin_bypasses_tenant_member: platform_admin passes require_tenant_member """ from __future__ import annotations import uuid from unittest.mock import AsyncMock, MagicMock import pytest from fastapi import HTTPException from shared.api.rbac import ( PortalCaller, require_platform_admin, require_tenant_admin, require_tenant_member, ) from shared.models.auth import UserTenantRole def _make_caller(role: str, tenant_id: uuid.UUID | None = None) -> PortalCaller: return PortalCaller(user_id=uuid.uuid4(), role=role, tenant_id=tenant_id) def _make_membership(user_id: uuid.UUID, tenant_id: uuid.UUID, role: str) -> UserTenantRole: m = MagicMock(spec=UserTenantRole) m.user_id = user_id m.tenant_id = tenant_id m.role = role return m def _mock_session_with_membership(membership: UserTenantRole | None) -> AsyncMock: session = AsyncMock() session.execute.return_value = MagicMock( scalar_one_or_none=MagicMock(return_value=membership) ) return session # --------------------------------------------------------------------------- # require_platform_admin tests # 
--------------------------------------------------------------------------- def test_platform_admin_passes() -> None: """platform_admin caller should pass require_platform_admin and return the caller.""" caller = _make_caller("platform_admin") result = require_platform_admin(caller=caller) assert result is caller def test_customer_admin_rejected() -> None: """customer_admin should get 403 from require_platform_admin.""" caller = _make_caller("customer_admin") with pytest.raises(HTTPException) as exc_info: require_platform_admin(caller=caller) assert exc_info.value.status_code == 403 def test_customer_operator_rejected() -> None: """customer_operator should get 403 from require_platform_admin.""" caller = _make_caller("customer_operator") with pytest.raises(HTTPException) as exc_info: require_platform_admin(caller=caller) assert exc_info.value.status_code == 403 # --------------------------------------------------------------------------- # require_tenant_admin tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_tenant_admin_own_tenant() -> None: """customer_admin with UserTenantRole membership passes require_tenant_admin.""" tenant_id = uuid.uuid4() caller = _make_caller("customer_admin") membership = _make_membership(caller.user_id, tenant_id, "customer_admin") session = _mock_session_with_membership(membership) result = await require_tenant_admin(tenant_id=tenant_id, caller=caller, session=session) assert result is caller session.execute.assert_called_once() @pytest.mark.asyncio async def test_tenant_admin_no_membership() -> None: """customer_admin without UserTenantRole row gets 403 from require_tenant_admin.""" tenant_id = uuid.uuid4() caller = _make_caller("customer_admin") session = _mock_session_with_membership(None) with pytest.raises(HTTPException) as exc_info: await require_tenant_admin(tenant_id=tenant_id, caller=caller, session=session) assert exc_info.value.status_code == 403 
@pytest.mark.asyncio
async def test_platform_admin_bypasses_tenant_check() -> None:
    """require_tenant_admin returns a platform_admin caller and never calls session.execute."""
    db = AsyncMock()  # execute() is expected to stay untouched on this path
    admin = _make_caller("platform_admin")

    returned = await require_tenant_admin(tenant_id=uuid.uuid4(), caller=admin, session=db)

    assert returned is admin
    db.execute.assert_not_called()


@pytest.mark.asyncio
async def test_operator_rejected_from_admin() -> None:
    """require_tenant_admin raises 403 for customer_operator without calling session.execute."""
    db = AsyncMock()
    operator = _make_caller("customer_operator")
    target_tenant = uuid.uuid4()

    with pytest.raises(HTTPException) as raised:
        await require_tenant_admin(tenant_id=target_tenant, caller=operator, session=db)

    assert raised.value.status_code == 403
    db.execute.assert_not_called()


# ---------------------------------------------------------------------------
# require_tenant_member tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_tenant_member_customer_admin() -> None:
    """require_tenant_member returns a customer_admin caller whose membership lookup finds a row."""
    target_tenant = uuid.uuid4()
    admin = _make_caller("customer_admin")
    db = _mock_session_with_membership(
        _make_membership(admin.user_id, target_tenant, "customer_admin")
    )

    returned = await require_tenant_member(tenant_id=target_tenant, caller=admin, session=db)

    assert returned is admin


@pytest.mark.asyncio
async def test_tenant_member_customer_operator() -> None:
    """require_tenant_member returns a customer_operator caller whose membership lookup finds a row."""
    target_tenant = uuid.uuid4()
    operator = _make_caller("customer_operator")
    db = _mock_session_with_membership(
        _make_membership(operator.user_id, target_tenant, "customer_operator")
    )

    returned = await require_tenant_member(tenant_id=target_tenant, caller=operator, session=db)

    assert returned is operator


@pytest.mark.asyncio
async def test_tenant_member_no_membership() -> None:
    """require_tenant_member raises 403 when the membership lookup yields None."""
    outsider = _make_caller("customer_admin")
    db = _mock_session_with_membership(None)
    target_tenant = uuid.uuid4()

    with pytest.raises(HTTPException) as raised:
        await require_tenant_member(tenant_id=target_tenant, caller=outsider, session=db)

    assert raised.value.status_code == 403


@pytest.mark.asyncio
async def test_platform_admin_bypasses_tenant_member() -> None:
    """require_tenant_member returns a platform_admin caller and never calls session.execute."""
    db = AsyncMock()
    admin = _make_caller("platform_admin")

    returned = await require_tenant_member(tenant_id=uuid.uuid4(), caller=admin, session=db)

    assert returned is admin
    db.execute.assert_not_called()