""" Unit tests for Stripe webhook handler logic. Tests: - process_stripe_event idempotency: same event_id processed twice returns "already_processed" on the second call - customer.subscription.updated: updates tenant subscription_status - customer.subscription.deleted: sets status=canceled and deactivates agents """ from __future__ import annotations import uuid from unittest.mock import AsyncMock, MagicMock, patch import pytest from shared.api.billing import process_stripe_event @pytest.fixture() def mock_session() -> AsyncMock: """Mock AsyncSession that tracks execute calls.""" session = AsyncMock() # Default: no existing StripeEvent found (not yet processed) session.execute.return_value = MagicMock(scalar_one_or_none=MagicMock(return_value=None)) return session @pytest.fixture() def tenant_id() -> str: return str(uuid.uuid4()) @pytest.fixture() def agent_id() -> str: return str(uuid.uuid4()) class _MockTenant: def __init__(self, tenant_id: str) -> None: self.id = uuid.UUID(tenant_id) self.stripe_customer_id: str | None = None self.stripe_subscription_id: str | None = None self.stripe_subscription_item_id: str | None = None self.subscription_status: str = "trialing" self.trial_ends_at = None self.agent_quota: int = 0 class _MockAgent: def __init__(self, agent_id: str, tenant_id: str) -> None: self.id = uuid.UUID(agent_id) self.tenant_id = uuid.UUID(tenant_id) self.is_active: bool = True @pytest.mark.asyncio async def test_stripe_webhook_idempotency(mock_session: AsyncMock, tenant_id: str) -> None: """ Processing the same event_id twice returns 'already_processed' on second call. The second call should detect the existing StripeEvent and skip processing. """ event_id = "evt_test_idempotent_001" event_data = { "id": event_id, "type": "customer.subscription.updated", "data": { "object": { "id": "sub_test123", "status": "active", "customer": "cus_test123", "trial_end": None, "items": {"data": [{"id": "si_test123", "quantity": 2}]}, } }, } # First call: no existing event in DB first_session = AsyncMock() first_session.execute.return_value = MagicMock( scalar_one_or_none=MagicMock(return_value=None), scalars=MagicMock(return_value=MagicMock(all=MagicMock(return_value=[]))), ) mock_tenant = _MockTenant(tenant_id) with patch("shared.api.billing._get_tenant_by_stripe_customer", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_tenant result1 = await process_stripe_event(event_data, first_session) assert result1 != "already_processed" # Second call: event already in DB (simulating idempotent duplicate) existing_event = MagicMock() # Non-None means already processed second_session = AsyncMock() second_session.execute.return_value = MagicMock( scalar_one_or_none=MagicMock(return_value=existing_event) ) result2 = await process_stripe_event(event_data, second_session) assert result2 == "already_processed" @pytest.mark.asyncio async def test_stripe_subscription_updated(mock_session: AsyncMock, tenant_id: str) -> None: """ customer.subscription.updated event updates tenant subscription_status. """ event_data = { "id": "evt_test_sub_updated", "type": "customer.subscription.updated", "data": { "object": { "id": "sub_updated123", "status": "active", "customer": "cus_tenant123", "trial_end": None, "items": {"data": [{"id": "si_updated123", "quantity": 3}]}, } }, } mock_tenant = _MockTenant(tenant_id) assert mock_tenant.subscription_status == "trialing" with patch("shared.api.billing._get_tenant_by_stripe_customer", new_callable=AsyncMock) as mock_get: mock_get.return_value = mock_tenant result = await process_stripe_event(event_data, mock_session) assert result != "already_processed" assert mock_tenant.subscription_status == "active" assert mock_tenant.stripe_subscription_id == "sub_updated123" assert mock_tenant.agent_quota == 3 @pytest.mark.asyncio async def test_stripe_cancellation(mock_session: AsyncMock, tenant_id: str, agent_id: str) -> None: """ customer.subscription.deleted sets status=canceled and deactivates all tenant agents. """ event_data = { "id": "evt_test_canceled", "type": "customer.subscription.deleted", "data": { "object": { "id": "sub_canceled123", "status": "canceled", "customer": "cus_tenant_cancel", "trial_end": None, "items": {"data": []}, } }, } mock_tenant = _MockTenant(tenant_id) mock_tenant.subscription_status = "active" mock_agent = _MockAgent(agent_id, tenant_id) assert mock_agent.is_active is True with ( patch("shared.api.billing._get_tenant_by_stripe_customer", new_callable=AsyncMock) as mock_get_tenant, patch("shared.api.billing._deactivate_all_agents", new_callable=AsyncMock) as mock_deactivate, ): mock_get_tenant.return_value = mock_tenant # Simulate deactivation side effect async def _do_deactivate(session, tenant_id_arg): # type: ignore[no-untyped-def] mock_agent.is_active = False mock_deactivate.side_effect = _do_deactivate result = await process_stripe_event(event_data, mock_session) assert result != "already_processed" assert mock_tenant.subscription_status == "canceled" assert mock_agent.is_active is False mock_deactivate.assert_called_once()