test(04-rbac-01): unit tests for RBAC guards, invitation system, portal auth
- test_rbac_guards.py: 11 tests covering platform_admin pass-through, customer_admin/operator 403 rejection, tenant membership checks, and platform_admin bypass for tenant-scoped guards - test_invitations.py: 11 tests covering HMAC token roundtrip, tamper/expiry rejection, invitation create/accept/resend/list - test_portal_auth.py: 7 tests covering role field (not is_admin), tenant_ids list, active_tenant_id, platform_admin all-tenants, customer_admin own-tenants-only - All 27 tests pass
This commit is contained in:
279
tests/unit/test_portal_auth.py
Normal file
279
tests/unit/test_portal_auth.py
Normal file
@@ -0,0 +1,279 @@
|
||||
"""
|
||||
Unit tests for the updated auth/verify endpoint.
|
||||
|
||||
Tests verify that the response shape matches the new RBAC contract:
|
||||
- Returns `role` (not `is_admin`)
|
||||
- Returns `tenant_ids` as a list of UUID strings
|
||||
- Returns `active_tenant_id` as the first tenant ID (or None)
|
||||
- platform_admin returns all tenant IDs from the tenants table
|
||||
- customer_admin returns only their UserTenantRole tenant IDs
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import bcrypt
|
||||
import pytest
|
||||
|
||||
from shared.api.portal import AuthVerifyRequest, AuthVerifyResponse, verify_credentials
|
||||
from shared.models.auth import PortalUser, UserTenantRole
|
||||
from shared.models.tenant import Tenant
|
||||
|
||||
|
||||
def _make_user(role: str, email: str = "test@example.com") -> PortalUser:
|
||||
user = MagicMock(spec=PortalUser)
|
||||
user.id = uuid.uuid4()
|
||||
user.email = email
|
||||
user.name = "Test User"
|
||||
user.role = role
|
||||
# Real bcrypt hash for password "testpassword"
|
||||
user.hashed_password = bcrypt.hashpw(b"testpassword", bcrypt.gensalt()).decode()
|
||||
user.created_at = datetime.now(tz=timezone.utc)
|
||||
return user
|
||||
|
||||
|
||||
def _make_tenant_role(user_id: uuid.UUID, tenant_id: uuid.UUID, role: str) -> UserTenantRole:
|
||||
m = MagicMock(spec=UserTenantRole)
|
||||
m.user_id = user_id
|
||||
m.tenant_id = tenant_id
|
||||
m.role = role
|
||||
return m
|
||||
|
||||
|
||||
def _make_tenant(name: str = "Acme") -> Tenant:
|
||||
t = MagicMock(spec=Tenant)
|
||||
t.id = uuid.uuid4()
|
||||
t.name = name
|
||||
return t
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# test_auth_verify_returns_role
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_verify_returns_role() -> None:
|
||||
"""
|
||||
auth/verify response contains 'role' field (not 'is_admin').
|
||||
"""
|
||||
user = _make_user("customer_admin")
|
||||
session = AsyncMock()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def execute_side_effect(stmt):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
# User lookup
|
||||
result.scalar_one_or_none = MagicMock(return_value=user)
|
||||
else:
|
||||
# UserTenantRole lookup — empty
|
||||
result.scalars = MagicMock(return_value=MagicMock(all=MagicMock(return_value=[])))
|
||||
return result
|
||||
|
||||
session.execute = AsyncMock(side_effect=execute_side_effect)
|
||||
|
||||
body = AuthVerifyRequest(email=user.email, password="testpassword")
|
||||
response = await verify_credentials(body=body, session=session)
|
||||
|
||||
assert isinstance(response, AuthVerifyResponse)
|
||||
assert response.role == "customer_admin"
|
||||
# Ensure is_admin is NOT in the response model fields
|
||||
assert not hasattr(response, "is_admin") or not isinstance(getattr(response, "is_admin", None), bool)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# test_auth_verify_returns_tenant_ids
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_verify_returns_tenant_ids() -> None:
|
||||
"""
|
||||
auth/verify response contains tenant_ids as a list of UUID strings.
|
||||
"""
|
||||
user = _make_user("customer_admin")
|
||||
tenant_id_1 = uuid.uuid4()
|
||||
tenant_id_2 = uuid.uuid4()
|
||||
memberships = [
|
||||
_make_tenant_role(user.id, tenant_id_1, "customer_admin"),
|
||||
_make_tenant_role(user.id, tenant_id_2, "customer_admin"),
|
||||
]
|
||||
session = AsyncMock()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def execute_side_effect(stmt):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
result.scalar_one_or_none = MagicMock(return_value=user)
|
||||
else:
|
||||
result.scalars = MagicMock(return_value=MagicMock(all=MagicMock(return_value=memberships)))
|
||||
return result
|
||||
|
||||
session.execute = AsyncMock(side_effect=execute_side_effect)
|
||||
|
||||
body = AuthVerifyRequest(email=user.email, password="testpassword")
|
||||
response = await verify_credentials(body=body, session=session)
|
||||
|
||||
assert isinstance(response.tenant_ids, list)
|
||||
assert len(response.tenant_ids) == 2
|
||||
assert str(tenant_id_1) in response.tenant_ids
|
||||
assert str(tenant_id_2) in response.tenant_ids
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# test_auth_verify_returns_active_tenant
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_verify_returns_active_tenant_first() -> None:
|
||||
"""
|
||||
auth/verify response contains active_tenant_id as the first tenant ID.
|
||||
"""
|
||||
user = _make_user("customer_admin")
|
||||
tenant_id_1 = uuid.uuid4()
|
||||
memberships = [
|
||||
_make_tenant_role(user.id, tenant_id_1, "customer_admin"),
|
||||
]
|
||||
session = AsyncMock()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def execute_side_effect(stmt):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
result.scalar_one_or_none = MagicMock(return_value=user)
|
||||
else:
|
||||
result.scalars = MagicMock(return_value=MagicMock(all=MagicMock(return_value=memberships)))
|
||||
return result
|
||||
|
||||
session.execute = AsyncMock(side_effect=execute_side_effect)
|
||||
|
||||
body = AuthVerifyRequest(email=user.email, password="testpassword")
|
||||
response = await verify_credentials(body=body, session=session)
|
||||
|
||||
assert response.active_tenant_id == str(tenant_id_1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_verify_active_tenant_none_for_no_memberships() -> None:
|
||||
"""
|
||||
auth/verify response contains active_tenant_id=None for users with no tenant memberships.
|
||||
"""
|
||||
user = _make_user("customer_admin")
|
||||
session = AsyncMock()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def execute_side_effect(stmt):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
result.scalar_one_or_none = MagicMock(return_value=user)
|
||||
else:
|
||||
result.scalars = MagicMock(return_value=MagicMock(all=MagicMock(return_value=[])))
|
||||
return result
|
||||
|
||||
session.execute = AsyncMock(side_effect=execute_side_effect)
|
||||
|
||||
body = AuthVerifyRequest(email=user.email, password="testpassword")
|
||||
response = await verify_credentials(body=body, session=session)
|
||||
|
||||
assert response.active_tenant_id is None
|
||||
assert response.tenant_ids == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# test_auth_verify_platform_admin_returns_all_tenants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_verify_platform_admin_returns_all_tenants() -> None:
|
||||
"""
|
||||
platform_admin auth/verify returns all tenant IDs from the tenants table.
|
||||
"""
|
||||
user = _make_user("platform_admin")
|
||||
tenant_1 = _make_tenant("Acme")
|
||||
tenant_2 = _make_tenant("Globex")
|
||||
all_tenants = [tenant_1, tenant_2]
|
||||
|
||||
session = AsyncMock()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def execute_side_effect(stmt):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
# User lookup
|
||||
result.scalar_one_or_none = MagicMock(return_value=user)
|
||||
else:
|
||||
# All tenants query for platform_admin
|
||||
result.scalars = MagicMock(return_value=MagicMock(all=MagicMock(return_value=all_tenants)))
|
||||
return result
|
||||
|
||||
session.execute = AsyncMock(side_effect=execute_side_effect)
|
||||
|
||||
body = AuthVerifyRequest(email=user.email, password="testpassword")
|
||||
response = await verify_credentials(body=body, session=session)
|
||||
|
||||
assert response.role == "platform_admin"
|
||||
assert len(response.tenant_ids) == 2
|
||||
assert str(tenant_1.id) in response.tenant_ids
|
||||
assert str(tenant_2.id) in response.tenant_ids
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# test_auth_verify_customer_admin_only_own_tenants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auth_verify_customer_admin_only_own_tenants() -> None:
|
||||
"""
|
||||
customer_admin auth/verify returns only their UserTenantRole tenant IDs.
|
||||
Not the full tenant list.
|
||||
"""
|
||||
user = _make_user("customer_admin")
|
||||
own_tenant_id = uuid.uuid4()
|
||||
other_tenant_id = uuid.uuid4() # Should NOT appear in response
|
||||
|
||||
memberships = [_make_tenant_role(user.id, own_tenant_id, "customer_admin")]
|
||||
session = AsyncMock()
|
||||
|
||||
call_count = 0
|
||||
|
||||
def execute_side_effect(stmt):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
result.scalar_one_or_none = MagicMock(return_value=user)
|
||||
else:
|
||||
result.scalars = MagicMock(return_value=MagicMock(all=MagicMock(return_value=memberships)))
|
||||
return result
|
||||
|
||||
session.execute = AsyncMock(side_effect=execute_side_effect)
|
||||
|
||||
body = AuthVerifyRequest(email=user.email, password="testpassword")
|
||||
response = await verify_credentials(body=body, session=session)
|
||||
|
||||
assert response.role == "customer_admin"
|
||||
assert len(response.tenant_ids) == 1
|
||||
assert str(own_tenant_id) in response.tenant_ids
|
||||
assert str(other_tenant_id) not in response.tenant_ids
|
||||
Reference in New Issue
Block a user