""" Integration tests for the escalation handler. Tests cover: - escalate_to_human: Slack API calls (conversations.open + chat.postMessage) - escalate_to_human: Redis escalation key is set - escalate_to_human: Audit event is logged - escalate_to_human: Returns user-facing message Uses fakeredis and mocked httpx to avoid real network/DB connections. """ from __future__ import annotations import json from unittest.mock import AsyncMock, MagicMock, patch import fakeredis.aioredis import pytest from orchestrator.escalation.handler import escalate_to_human from shared.redis_keys import escalation_status_key TENANT_ID = "tenant-test-123" THREAD_ID = "thread-T12345-123456.789" BOT_TOKEN = "xoxb-test-bot-token" ASSIGNEE_SLACK_ID = "U0HUMANID" def make_agent(name: str = "Mara", assignee_id: str = ASSIGNEE_SLACK_ID) -> MagicMock: """Create a mock Agent for escalation tests.""" agent = MagicMock() agent.name = name agent.escalation_assignee = assignee_id return agent def make_audit_logger() -> MagicMock: """Create a mock AuditLogger with async log_escalation.""" logger = MagicMock() logger.log_escalation = AsyncMock() return logger RECENT_MESSAGES = [ {"role": "user", "content": "I have a billing problem"}, {"role": "assistant", "content": "Let me look into that"}, {"role": "user", "content": "This is the third time!"}, ] @pytest.fixture async def redis_client(): """fakeredis async client — no real Redis needed.""" client = fakeredis.aioredis.FakeRedis() yield client await client.aclose() class TestEscalateToHuman: async def test_conversations_open_called(self, redis_client) -> None: """escalate_to_human opens a DM channel via conversations.open.""" agent = make_agent() audit_logger = make_audit_logger() open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}} post_response = {"ok": True, "ts": "9999.0001"} with patch("httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) mock_client.post = AsyncMock(side_effect=[ _make_httpx_response(open_response), _make_httpx_response(post_response), ]) await escalate_to_human( tenant_id=TENANT_ID, agent=agent, thread_id=THREAD_ID, trigger_reason="billing_dispute AND attempts > 2", recent_messages=RECENT_MESSAGES, assignee_slack_user_id=ASSIGNEE_SLACK_ID, bot_token=BOT_TOKEN, redis=redis_client, audit_logger=audit_logger, ) # First call should be conversations.open first_call = mock_client.post.call_args_list[0] assert "conversations.open" in first_call.args[0] # Should include the user ID call_json = first_call.kwargs.get("json") or first_call.args[1] if len(first_call.args) > 1 else {} assert ASSIGNEE_SLACK_ID in str(first_call) async def test_chat_post_message_called_with_transcript(self, redis_client) -> None: """escalate_to_human posts transcript to DM via chat.postMessage.""" agent = make_agent() audit_logger = make_audit_logger() open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}} post_response = {"ok": True, "ts": "9999.0001"} with patch("httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) mock_client.post = AsyncMock(side_effect=[ _make_httpx_response(open_response), _make_httpx_response(post_response), ]) await escalate_to_human( tenant_id=TENANT_ID, agent=agent, thread_id=THREAD_ID, trigger_reason="billing_dispute AND attempts > 2", recent_messages=RECENT_MESSAGES, assignee_slack_user_id=ASSIGNEE_SLACK_ID, bot_token=BOT_TOKEN, redis=redis_client, audit_logger=audit_logger, ) # Second call should be chat.postMessage second_call = mock_client.post.call_args_list[1] assert "chat.postMessage" in second_call.args[0] # DM channel ID from conversations.open response should be used call_str = str(second_call) assert "D0DMCHANNELID" in call_str async def test_escalation_key_set_in_redis(self, redis_client) -> None: """escalate_to_human sets escalation status key in Redis.""" agent = make_agent() audit_logger = make_audit_logger() open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}} post_response = {"ok": True, "ts": "9999.0001"} with patch("httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) mock_client.post = AsyncMock(side_effect=[ _make_httpx_response(open_response), _make_httpx_response(post_response), ]) await escalate_to_human( tenant_id=TENANT_ID, agent=agent, thread_id=THREAD_ID, trigger_reason="billing_dispute AND attempts > 2", recent_messages=RECENT_MESSAGES, assignee_slack_user_id=ASSIGNEE_SLACK_ID, bot_token=BOT_TOKEN, redis=redis_client, audit_logger=audit_logger, ) key = escalation_status_key(TENANT_ID, THREAD_ID) value = await redis_client.get(key) assert value is not None assert value.decode() == "escalated" async def test_audit_log_called_with_escalation(self, redis_client) -> None: """escalate_to_human logs escalation event to audit trail.""" agent = make_agent() audit_logger = make_audit_logger() open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}} post_response = {"ok": True, "ts": "9999.0001"} with patch("httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) mock_client.post = AsyncMock(side_effect=[ _make_httpx_response(open_response), _make_httpx_response(post_response), ]) await escalate_to_human( tenant_id=TENANT_ID, agent=agent, thread_id=THREAD_ID, trigger_reason="billing_dispute AND attempts > 2", recent_messages=RECENT_MESSAGES, assignee_slack_user_id=ASSIGNEE_SLACK_ID, bot_token=BOT_TOKEN, redis=redis_client, audit_logger=audit_logger, ) audit_logger.log_escalation.assert_called_once() call_kwargs = audit_logger.log_escalation.call_args # Should be called with trigger_reason assert "billing_dispute" in str(call_kwargs) async def test_returns_user_facing_message(self, redis_client) -> None: """escalate_to_human returns a user-facing escalation message.""" agent = make_agent() audit_logger = make_audit_logger() open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}} post_response = {"ok": True, "ts": "9999.0001"} with patch("httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) mock_client.post = AsyncMock(side_effect=[ _make_httpx_response(open_response), _make_httpx_response(post_response), ]) result = await escalate_to_human( tenant_id=TENANT_ID, agent=agent, thread_id=THREAD_ID, trigger_reason="billing_dispute AND attempts > 2", recent_messages=RECENT_MESSAGES, assignee_slack_user_id=ASSIGNEE_SLACK_ID, bot_token=BOT_TOKEN, redis=redis_client, audit_logger=audit_logger, ) assert isinstance(result, str) assert len(result) > 0 # Should mention a team member or human result_lower = result.lower() assert "team member" in result_lower or "colleague" in result_lower or "shortly" in result_lower async def test_dm_text_contains_agent_name_and_reason(self, redis_client) -> None: """DM posted to human contains agent name and escalation reason.""" agent = make_agent(name="TestBot") audit_logger = make_audit_logger() captured_posts: list[dict] = [] open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}} post_response = {"ok": True, "ts": "9999.0001"} with patch("httpx.AsyncClient") as mock_client_cls: mock_client = AsyncMock() mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None) async def capture_post(url: str, **kwargs) -> MagicMock: captured_posts.append({"url": url, "kwargs": kwargs}) if "conversations.open" in url: return _make_httpx_response(open_response) return _make_httpx_response(post_response) mock_client.post = AsyncMock(side_effect=capture_post) await escalate_to_human( tenant_id=TENANT_ID, agent=agent, thread_id=THREAD_ID, trigger_reason="test escalation reason", recent_messages=RECENT_MESSAGES, assignee_slack_user_id=ASSIGNEE_SLACK_ID, bot_token=BOT_TOKEN, redis=redis_client, audit_logger=audit_logger, ) # Find the postMessage call post_call = next(p for p in captured_posts if "postMessage" in p["url"]) post_text = post_call["kwargs"].get("json", {}).get("text", "") assert "TestBot" in post_text assert "test escalation reason" in post_text # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_httpx_response(data: dict) -> MagicMock: """Create a mock httpx Response that returns data from .json().""" resp = MagicMock() resp.json = MagicMock(return_value=data) resp.status_code = 200 return resp