From d4895511306b9ef46daaf7eb67ed876dc571ac03 Mon Sep 17 00:00:00 2001
From: Adolfo Delorenzo
Date: Mon, 23 Mar 2026 14:49:54 -0600
Subject: [PATCH] test(02-04): add failing tests for escalation handler

- Unit tests: rule matching, natural language escalation, transcript formatting
- Integration tests: Slack API calls, Redis key, audit log, return value
---
 tests/integration/test_escalation.py | 289 +++++++++++++++++++++++++++
 tests/unit/test_escalation.py        | 236 ++++++++++++++++++++++
 2 files changed, 525 insertions(+)
 create mode 100644 tests/integration/test_escalation.py
 create mode 100644 tests/unit/test_escalation.py

diff --git a/tests/integration/test_escalation.py b/tests/integration/test_escalation.py
new file mode 100644
index 0000000..847f88b
--- /dev/null
+++ b/tests/integration/test_escalation.py
@@ -0,0 +1,289 @@
+"""
+Integration tests for the escalation handler.
+
+Tests cover:
+- escalate_to_human: Slack API calls (conversations.open + chat.postMessage)
+- escalate_to_human: Redis escalation key is set
+- escalate_to_human: Audit event is logged
+- escalate_to_human: Returns user-facing message
+
+Uses fakeredis and mocked httpx to avoid real network/DB connections.
+"""
+
+from __future__ import annotations
+
+import json
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import fakeredis.aioredis
+import pytest
+
+from orchestrator.escalation.handler import escalate_to_human
+from shared.redis_keys import escalation_status_key
+
+
+TENANT_ID = "tenant-test-123"
+THREAD_ID = "thread-T12345-123456.789"
+BOT_TOKEN = "xoxb-test-bot-token"
+ASSIGNEE_SLACK_ID = "U0HUMANID"
+
+
+def make_agent(name: str = "Mara", assignee_id: str = ASSIGNEE_SLACK_ID) -> MagicMock:
+    """Create a mock Agent for escalation tests."""
+    agent = MagicMock()
+    agent.name = name
+    agent.escalation_assignee = assignee_id
+    return agent
+
+
+def make_audit_logger() -> MagicMock:
+    """Create a mock AuditLogger with async log_escalation."""
+    logger = MagicMock()
+    logger.log_escalation = AsyncMock()
+    return logger
+
+
+RECENT_MESSAGES = [
+    {"role": "user", "content": "I have a billing problem"},
+    {"role": "assistant", "content": "Let me look into that"},
+    {"role": "user", "content": "This is the third time!"},
+]
+
+
+@pytest.fixture
+async def redis_client():
+    """fakeredis async client — no real Redis needed."""
+    client = fakeredis.aioredis.FakeRedis()
+    yield client
+    await client.aclose()
+
+
+class TestEscalateToHuman:
+    async def test_conversations_open_called(self, redis_client) -> None:
+        """escalate_to_human opens a DM channel via conversations.open."""
+        agent = make_agent()
+        audit_logger = make_audit_logger()
+
+        open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
+        post_response = {"ok": True, "ts": "9999.0001"}
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+            mock_client.post = AsyncMock(side_effect=[
+                _make_httpx_response(open_response),
+                _make_httpx_response(post_response),
+            ])
+
+            await escalate_to_human(
+                tenant_id=TENANT_ID,
+                agent=agent,
+                thread_id=THREAD_ID,
+                trigger_reason="billing_dispute AND attempts > 2",
+                recent_messages=RECENT_MESSAGES,
+                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
+                bot_token=BOT_TOKEN,
+                redis=redis_client,
+                audit_logger=audit_logger,
+            )
+
+        # First call should be conversations.open
+        first_call = mock_client.post.call_args_list[0]
+        assert "conversations.open" in first_call.args[0]
+        # Should include the user ID. The whole recorded call is searched so the
+        # check holds whether the ID travels in json=, data= or a positional payload.
+        assert ASSIGNEE_SLACK_ID in str(first_call)
+
+    async def test_chat_post_message_called_with_transcript(self, redis_client) -> None:
+        """escalate_to_human posts transcript to DM via chat.postMessage."""
+        agent = make_agent()
+        audit_logger = make_audit_logger()
+
+        open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
+        post_response = {"ok": True, "ts": "9999.0001"}
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+            mock_client.post = AsyncMock(side_effect=[
+                _make_httpx_response(open_response),
+                _make_httpx_response(post_response),
+            ])
+
+            await escalate_to_human(
+                tenant_id=TENANT_ID,
+                agent=agent,
+                thread_id=THREAD_ID,
+                trigger_reason="billing_dispute AND attempts > 2",
+                recent_messages=RECENT_MESSAGES,
+                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
+                bot_token=BOT_TOKEN,
+                redis=redis_client,
+                audit_logger=audit_logger,
+            )
+
+        # Second call should be chat.postMessage
+        second_call = mock_client.post.call_args_list[1]
+        assert "chat.postMessage" in second_call.args[0]
+
+        # DM channel ID from conversations.open response should be used
+        call_str = str(second_call)
+        assert "D0DMCHANNELID" in call_str
+
+    async def test_escalation_key_set_in_redis(self, redis_client) -> None:
+        """escalate_to_human sets escalation status key in Redis."""
+        agent = make_agent()
+        audit_logger = make_audit_logger()
+
+        open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
+        post_response = {"ok": True, "ts": "9999.0001"}
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+            mock_client.post = AsyncMock(side_effect=[
+                _make_httpx_response(open_response),
+                _make_httpx_response(post_response),
+            ])
+
+            await escalate_to_human(
+                tenant_id=TENANT_ID,
+                agent=agent,
+                thread_id=THREAD_ID,
+                trigger_reason="billing_dispute AND attempts > 2",
+                recent_messages=RECENT_MESSAGES,
+                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
+                bot_token=BOT_TOKEN,
+                redis=redis_client,
+                audit_logger=audit_logger,
+            )
+
+        key = escalation_status_key(TENANT_ID, THREAD_ID)
+        value = await redis_client.get(key)
+        assert value is not None
+        assert value.decode() == "escalated"
+
+    async def test_audit_log_called_with_escalation(self, redis_client) -> None:
+        """escalate_to_human logs escalation event to audit trail."""
+        agent = make_agent()
+        audit_logger = make_audit_logger()
+
+        open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
+        post_response = {"ok": True, "ts": "9999.0001"}
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+            mock_client.post = AsyncMock(side_effect=[
+                _make_httpx_response(open_response),
+                _make_httpx_response(post_response),
+            ])
+
+            await escalate_to_human(
+                tenant_id=TENANT_ID,
+                agent=agent,
+                thread_id=THREAD_ID,
+                trigger_reason="billing_dispute AND attempts > 2",
+                recent_messages=RECENT_MESSAGES,
+                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
+                bot_token=BOT_TOKEN,
+                redis=redis_client,
+                audit_logger=audit_logger,
+            )
+
+        audit_logger.log_escalation.assert_called_once()
+        call_kwargs = audit_logger.log_escalation.call_args
+        # Should be called with trigger_reason
+        assert "billing_dispute" in str(call_kwargs)
+
+    async def test_returns_user_facing_message(self, redis_client) -> None:
+        """escalate_to_human returns a user-facing escalation message."""
+        agent = make_agent()
+        audit_logger = make_audit_logger()
+
+        open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
+        post_response = {"ok": True, "ts": "9999.0001"}
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+            mock_client.post = AsyncMock(side_effect=[
+                _make_httpx_response(open_response),
+                _make_httpx_response(post_response),
+            ])
+
+            result = await escalate_to_human(
+                tenant_id=TENANT_ID,
+                agent=agent,
+                thread_id=THREAD_ID,
+                trigger_reason="billing_dispute AND attempts > 2",
+                recent_messages=RECENT_MESSAGES,
+                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
+                bot_token=BOT_TOKEN,
+                redis=redis_client,
+                audit_logger=audit_logger,
+            )
+
+        assert isinstance(result, str)
+        assert len(result) > 0
+        # Should mention a team member or human
+        result_lower = result.lower()
+        assert "team member" in result_lower or "colleague" in result_lower or "shortly" in result_lower
+
+    async def test_dm_text_contains_agent_name_and_reason(self, redis_client) -> None:
+        """DM posted to human contains agent name and escalation reason."""
+        agent = make_agent(name="TestBot")
+        audit_logger = make_audit_logger()
+
+        captured_posts: list[dict] = []
+
+        open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
+        post_response = {"ok": True, "ts": "9999.0001"}
+
+        with patch("httpx.AsyncClient") as mock_client_cls:
+            mock_client = AsyncMock()
+            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
+            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
+
+            async def capture_post(url: str, **kwargs) -> MagicMock:
+                captured_posts.append({"url": url, "kwargs": kwargs})
+                if "conversations.open" in url:
+                    return _make_httpx_response(open_response)
+                return _make_httpx_response(post_response)
+
+            mock_client.post = AsyncMock(side_effect=capture_post)
+
+            await escalate_to_human(
+                tenant_id=TENANT_ID,
+                agent=agent,
+                thread_id=THREAD_ID,
+                trigger_reason="test escalation reason",
+                recent_messages=RECENT_MESSAGES,
+                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
+                bot_token=BOT_TOKEN,
+                redis=redis_client,
+                audit_logger=audit_logger,
+            )
+
+        # Find the postMessage call
+        post_call = next(p for p in captured_posts if "postMessage" in p["url"])
+        post_text = post_call["kwargs"].get("json", {}).get("text", "")
+        assert "TestBot" in post_text
+        assert "test escalation reason" in post_text
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_httpx_response(data: dict) -> MagicMock:
+    """Create a mock httpx Response that returns data from .json()."""
+    resp = MagicMock()
+    resp.json = MagicMock(return_value=data)
+    resp.status_code = 200
+    return resp
diff --git a/tests/unit/test_escalation.py b/tests/unit/test_escalation.py
new file mode 100644
index 0000000..251b673
--- /dev/null
+++ b/tests/unit/test_escalation.py
@@ -0,0 +1,236 @@
+"""
+Unit tests for the escalation handler.
+
+Tests cover:
+- check_escalation_rules: rule matching logic with conversation metadata
+- check_escalation_rules: natural language escalation trigger
+- build_transcript: message formatting and truncation
+"""
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from orchestrator.escalation.handler import build_transcript, check_escalation_rules
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def make_agent(rules: list[dict], name: str = "Mara") -> MagicMock:
+    """Create a mock Agent with escalation_rules and name."""
+    agent = MagicMock()
+    agent.name = name
+    agent.escalation_rules = rules
+    return agent
+
+
+# ---------------------------------------------------------------------------
+# check_escalation_rules — rule-based matching
+# ---------------------------------------------------------------------------
+
+class TestCheckEscalationRules:
+    def test_no_rules_returns_none(self) -> None:
+        """Agent with no escalation rules always returns None."""
+        agent = make_agent([])
+        result = check_escalation_rules(agent, "hello", {})
+        assert result is None
+
+    def test_billing_dispute_and_attempts_matches(self) -> None:
+        """Condition 'billing_dispute AND attempts > 2' matches when metadata has both."""
+        agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "I want to cancel",
+            {"billing_dispute": True, "attempts": 3},
+        )
+        assert result is not None
+        assert result["action"] == "handoff_human"
+
+    def test_billing_dispute_insufficient_attempts_no_match(self) -> None:
+        """Condition 'billing_dispute AND attempts > 2' does NOT match when attempts <= 2."""
+        agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "help me",
+            {"billing_dispute": True, "attempts": 2},
+        )
+        assert result is None
+
+    def test_billing_dispute_false_no_match(self) -> None:
+        """Condition 'billing_dispute AND attempts > 2' does NOT match when billing_dispute=False."""
+        agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "help me",
+            {"billing_dispute": False, "attempts": 5},
+        )
+        assert result is None
+
+    def test_first_matching_rule_returned(self) -> None:
+        """When multiple rules exist, the first matching rule is returned."""
+        agent = make_agent([
+            {"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"},
+            {"condition": "billing_dispute AND attempts > 1", "action": "other_action"},
+        ])
+        result = check_escalation_rules(
+            agent,
+            "help",
+            {"billing_dispute": True, "attempts": 3},
+        )
+        assert result is not None
+        assert result["action"] == "handoff_human"
+
+    def test_missing_metadata_field_no_match(self) -> None:
+        """Condition with field missing from metadata returns None (not an error)."""
+        agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "help",
+            {},  # no metadata at all
+        )
+        assert result is None
+
+
+# ---------------------------------------------------------------------------
+# check_escalation_rules — natural language escalation
+# ---------------------------------------------------------------------------
+
+class TestNaturalLanguageEscalation:
+    def test_talk_to_human_phrase_matches_when_enabled(self) -> None:
+        """'can I talk to a human?' triggers escalation when natural_lang_enabled=True."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "can I talk to a human?",
+            {},
+            natural_lang_enabled=True,
+        )
+        assert result is not None
+        assert result["action"] == "handoff_human"
+
+    def test_talk_to_human_phrase_no_match_when_disabled(self) -> None:
+        """Natural language escalation returns None when natural_lang_enabled=False."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "can I talk to a human?",
+            {},
+            natural_lang_enabled=False,
+        )
+        assert result is None
+
+    def test_speak_to_someone_phrase(self) -> None:
+        """'speak to someone' triggers natural language escalation when enabled."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(agent, "I need to speak to someone", {}, natural_lang_enabled=True)
+        assert result is not None
+
+    def test_get_a_person_phrase(self) -> None:
+        """'get a person' triggers natural language escalation when enabled."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(agent, "can I get a person please", {}, natural_lang_enabled=True)
+        assert result is not None
+
+    def test_human_agent_phrase(self) -> None:
+        """'human agent' triggers natural language escalation when enabled."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(agent, "I want a human agent", {}, natural_lang_enabled=True)
+        assert result is not None
+
+    def test_real_person_phrase(self) -> None:
+        """'real person' triggers natural language escalation when enabled."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(agent, "let me talk to a real person", {}, natural_lang_enabled=True)
+        assert result is not None
+
+    def test_manager_phrase(self) -> None:
+        """'manager' triggers natural language escalation when enabled."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(agent, "get me your manager", {}, natural_lang_enabled=True)
+        assert result is not None
+
+    def test_normal_message_does_not_trigger(self) -> None:
+        """Normal message does not trigger natural language escalation even when enabled."""
+        agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
+        result = check_escalation_rules(agent, "what is my account balance?", {}, natural_lang_enabled=True)
+        assert result is None
+
+    def test_natural_lang_only_when_no_other_rules_match(self) -> None:
+        """Agent with no natural_language_escalation condition does not trigger on phrases."""
+        agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
+        result = check_escalation_rules(
+            agent,
+            "can I talk to a human?",
+            {},
+            natural_lang_enabled=True,
+        )
+        assert result is None
+
+
+# ---------------------------------------------------------------------------
+# build_transcript — formatting
+# ---------------------------------------------------------------------------
+
+class TestBuildTranscript:
+    def test_empty_messages(self) -> None:
+        """build_transcript with empty list returns empty string."""
+        result = build_transcript([])
+        assert result == ""
+
+    def test_single_user_message(self) -> None:
+        """Single user message is formatted correctly."""
+        messages = [{"role": "user", "content": "Hello!"}]
+        result = build_transcript(messages)
+        assert result == "*User:* Hello!"
+
+    def test_single_assistant_message(self) -> None:
+        """Single assistant message is formatted correctly."""
+        messages = [{"role": "assistant", "content": "How can I help?"}]
+        result = build_transcript(messages)
+        assert result == "*Assistant:* How can I help?"
+
+    def test_conversation_turn(self) -> None:
+        """User and assistant messages are joined with newlines."""
+        messages = [
+            {"role": "user", "content": "I have a billing issue"},
+            {"role": "assistant", "content": "I can help with that"},
+        ]
+        result = build_transcript(messages)
+        assert result == "*User:* I have a billing issue\n*Assistant:* I can help with that"
+
+    def test_multiple_turns(self) -> None:
+        """Multiple conversation turns maintain order."""
+        messages = [
+            {"role": "user", "content": "First"},
+            {"role": "assistant", "content": "Second"},
+            {"role": "user", "content": "Third"},
+        ]
+        result = build_transcript(messages)
+        lines = result.split("\n")
+        assert len(lines) == 3
+        assert lines[0] == "*User:* First"
+        assert lines[1] == "*Assistant:* Second"
+        assert lines[2] == "*User:* Third"
+
+    def test_transcript_truncated_at_3000_chars(self) -> None:
+        """build_transcript truncates output to 3000 characters max."""
+        # Create a message that will produce output well over 3000 chars
+        long_content = "x" * 200
+        messages = [{"role": "user", "content": long_content} for _ in range(20)]
+        result = build_transcript(messages)
+        assert len(result) <= 3000
+
+    def test_transcript_under_limit_not_truncated(self) -> None:
+        """build_transcript does not truncate output under 3000 chars."""
+        messages = [
+            {"role": "user", "content": "Short message"},
+            {"role": "assistant", "content": "Short reply"},
+        ]
+        result = build_transcript(messages)
+        assert result == "*User:* Short message\n*Assistant:* Short reply"  # exact match: nothing cut or appended
+        assert "*User:* Short message" in result
+        assert "*Assistant:* Short reply" in result