test(02-04): add failing tests for escalation handler

- Unit tests: rule matching, natural language escalation, transcript formatting
- Integration tests: Slack API calls, Redis key, audit log, return value
This commit is contained in:
2026-03-23 14:49:54 -06:00
parent df7a5a922f
commit d489551130
2 changed files with 525 additions and 0 deletions

View File

@@ -0,0 +1,289 @@
"""
Integration tests for the escalation handler.
Tests cover:
- escalate_to_human: Slack API calls (conversations.open + chat.postMessage)
- escalate_to_human: Redis escalation key is set
- escalate_to_human: Audit event is logged
- escalate_to_human: Returns user-facing message
Uses fakeredis and mocked httpx to avoid real network/DB connections.
"""
from __future__ import annotations
import json
from unittest.mock import AsyncMock, MagicMock, patch
import fakeredis.aioredis
import pytest
from orchestrator.escalation.handler import escalate_to_human
from shared.redis_keys import escalation_status_key
TENANT_ID = "tenant-test-123"
THREAD_ID = "thread-T12345-123456.789"
BOT_TOKEN = "xoxb-test-bot-token"
ASSIGNEE_SLACK_ID = "U0HUMANID"
def make_agent(name: str = "Mara", assignee_id: str = ASSIGNEE_SLACK_ID) -> MagicMock:
"""Create a mock Agent for escalation tests."""
agent = MagicMock()
agent.name = name
agent.escalation_assignee = assignee_id
return agent
def make_audit_logger() -> MagicMock:
"""Create a mock AuditLogger with async log_escalation."""
logger = MagicMock()
logger.log_escalation = AsyncMock()
return logger
RECENT_MESSAGES = [
{"role": "user", "content": "I have a billing problem"},
{"role": "assistant", "content": "Let me look into that"},
{"role": "user", "content": "This is the third time!"},
]
@pytest.fixture
async def redis_client():
"""fakeredis async client — no real Redis needed."""
client = fakeredis.aioredis.FakeRedis()
yield client
await client.aclose()
class TestEscalateToHuman:
async def test_conversations_open_called(self, redis_client) -> None:
"""escalate_to_human opens a DM channel via conversations.open."""
agent = make_agent()
audit_logger = make_audit_logger()
open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
post_response = {"ok": True, "ts": "9999.0001"}
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
mock_client.post = AsyncMock(side_effect=[
_make_httpx_response(open_response),
_make_httpx_response(post_response),
])
await escalate_to_human(
tenant_id=TENANT_ID,
agent=agent,
thread_id=THREAD_ID,
trigger_reason="billing_dispute AND attempts > 2",
recent_messages=RECENT_MESSAGES,
assignee_slack_user_id=ASSIGNEE_SLACK_ID,
bot_token=BOT_TOKEN,
redis=redis_client,
audit_logger=audit_logger,
)
# First call should be conversations.open
first_call = mock_client.post.call_args_list[0]
assert "conversations.open" in first_call.args[0]
# Should include the user ID
call_json = first_call.kwargs.get("json") or first_call.args[1] if len(first_call.args) > 1 else {}
assert ASSIGNEE_SLACK_ID in str(first_call)
async def test_chat_post_message_called_with_transcript(self, redis_client) -> None:
"""escalate_to_human posts transcript to DM via chat.postMessage."""
agent = make_agent()
audit_logger = make_audit_logger()
open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
post_response = {"ok": True, "ts": "9999.0001"}
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
mock_client.post = AsyncMock(side_effect=[
_make_httpx_response(open_response),
_make_httpx_response(post_response),
])
await escalate_to_human(
tenant_id=TENANT_ID,
agent=agent,
thread_id=THREAD_ID,
trigger_reason="billing_dispute AND attempts > 2",
recent_messages=RECENT_MESSAGES,
assignee_slack_user_id=ASSIGNEE_SLACK_ID,
bot_token=BOT_TOKEN,
redis=redis_client,
audit_logger=audit_logger,
)
# Second call should be chat.postMessage
second_call = mock_client.post.call_args_list[1]
assert "chat.postMessage" in second_call.args[0]
# DM channel ID from conversations.open response should be used
call_str = str(second_call)
assert "D0DMCHANNELID" in call_str
async def test_escalation_key_set_in_redis(self, redis_client) -> None:
"""escalate_to_human sets escalation status key in Redis."""
agent = make_agent()
audit_logger = make_audit_logger()
open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
post_response = {"ok": True, "ts": "9999.0001"}
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
mock_client.post = AsyncMock(side_effect=[
_make_httpx_response(open_response),
_make_httpx_response(post_response),
])
await escalate_to_human(
tenant_id=TENANT_ID,
agent=agent,
thread_id=THREAD_ID,
trigger_reason="billing_dispute AND attempts > 2",
recent_messages=RECENT_MESSAGES,
assignee_slack_user_id=ASSIGNEE_SLACK_ID,
bot_token=BOT_TOKEN,
redis=redis_client,
audit_logger=audit_logger,
)
key = escalation_status_key(TENANT_ID, THREAD_ID)
value = await redis_client.get(key)
assert value is not None
assert value.decode() == "escalated"
async def test_audit_log_called_with_escalation(self, redis_client) -> None:
"""escalate_to_human logs escalation event to audit trail."""
agent = make_agent()
audit_logger = make_audit_logger()
open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
post_response = {"ok": True, "ts": "9999.0001"}
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
mock_client.post = AsyncMock(side_effect=[
_make_httpx_response(open_response),
_make_httpx_response(post_response),
])
await escalate_to_human(
tenant_id=TENANT_ID,
agent=agent,
thread_id=THREAD_ID,
trigger_reason="billing_dispute AND attempts > 2",
recent_messages=RECENT_MESSAGES,
assignee_slack_user_id=ASSIGNEE_SLACK_ID,
bot_token=BOT_TOKEN,
redis=redis_client,
audit_logger=audit_logger,
)
audit_logger.log_escalation.assert_called_once()
call_kwargs = audit_logger.log_escalation.call_args
# Should be called with trigger_reason
assert "billing_dispute" in str(call_kwargs)
async def test_returns_user_facing_message(self, redis_client) -> None:
"""escalate_to_human returns a user-facing escalation message."""
agent = make_agent()
audit_logger = make_audit_logger()
open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
post_response = {"ok": True, "ts": "9999.0001"}
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
mock_client.post = AsyncMock(side_effect=[
_make_httpx_response(open_response),
_make_httpx_response(post_response),
])
result = await escalate_to_human(
tenant_id=TENANT_ID,
agent=agent,
thread_id=THREAD_ID,
trigger_reason="billing_dispute AND attempts > 2",
recent_messages=RECENT_MESSAGES,
assignee_slack_user_id=ASSIGNEE_SLACK_ID,
bot_token=BOT_TOKEN,
redis=redis_client,
audit_logger=audit_logger,
)
assert isinstance(result, str)
assert len(result) > 0
# Should mention a team member or human
result_lower = result.lower()
assert "team member" in result_lower or "colleague" in result_lower or "shortly" in result_lower
async def test_dm_text_contains_agent_name_and_reason(self, redis_client) -> None:
"""DM posted to human contains agent name and escalation reason."""
agent = make_agent(name="TestBot")
audit_logger = make_audit_logger()
captured_posts: list[dict] = []
open_response = {"ok": True, "channel": {"id": "D0DMCHANNELID"}}
post_response = {"ok": True, "ts": "9999.0001"}
with patch("httpx.AsyncClient") as mock_client_cls:
mock_client = AsyncMock()
mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
async def capture_post(url: str, **kwargs) -> MagicMock:
captured_posts.append({"url": url, "kwargs": kwargs})
if "conversations.open" in url:
return _make_httpx_response(open_response)
return _make_httpx_response(post_response)
mock_client.post = AsyncMock(side_effect=capture_post)
await escalate_to_human(
tenant_id=TENANT_ID,
agent=agent,
thread_id=THREAD_ID,
trigger_reason="test escalation reason",
recent_messages=RECENT_MESSAGES,
assignee_slack_user_id=ASSIGNEE_SLACK_ID,
bot_token=BOT_TOKEN,
redis=redis_client,
audit_logger=audit_logger,
)
# Find the postMessage call
post_call = next(p for p in captured_posts if "postMessage" in p["url"])
post_text = post_call["kwargs"].get("json", {}).get("text", "")
assert "TestBot" in post_text
assert "test escalation reason" in post_text
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_httpx_response(data: dict) -> MagicMock:
"""Create a mock httpx Response that returns data from .json()."""
resp = MagicMock()
resp.json = MagicMock(return_value=data)
resp.status_code = 200
return resp

View File

@@ -0,0 +1,236 @@
"""
Unit tests for the escalation handler.
Tests cover:
- check_escalation_rules: rule matching logic with conversation metadata
- check_escalation_rules: natural language escalation trigger
- build_transcript: message formatting and truncation
"""
from __future__ import annotations
from unittest.mock import MagicMock
import pytest
from orchestrator.escalation.handler import build_transcript, check_escalation_rules
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_agent(rules: list[dict], name: str = "Mara") -> MagicMock:
"""Create a mock Agent with escalation_rules and name."""
agent = MagicMock()
agent.name = name
agent.escalation_rules = rules
return agent
# ---------------------------------------------------------------------------
# check_escalation_rules — rule-based matching
# ---------------------------------------------------------------------------
class TestCheckEscalationRules:
def test_no_rules_returns_none(self) -> None:
"""Agent with no escalation rules always returns None."""
agent = make_agent([])
result = check_escalation_rules(agent, "hello", {})
assert result is None
def test_billing_dispute_and_attempts_matches(self) -> None:
"""Condition 'billing_dispute AND attempts > 2' matches when metadata has both."""
agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"I want to cancel",
{"billing_dispute": True, "attempts": 3},
)
assert result is not None
assert result["action"] == "handoff_human"
def test_billing_dispute_insufficient_attempts_no_match(self) -> None:
"""Condition 'billing_dispute AND attempts > 2' does NOT match when attempts <= 2."""
agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"help me",
{"billing_dispute": True, "attempts": 2},
)
assert result is None
def test_billing_dispute_false_no_match(self) -> None:
"""Condition 'billing_dispute AND attempts > 2' does NOT match when billing_dispute=False."""
agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"help me",
{"billing_dispute": False, "attempts": 5},
)
assert result is None
def test_first_matching_rule_returned(self) -> None:
"""When multiple rules exist, the first matching rule is returned."""
agent = make_agent([
{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"},
{"condition": "billing_dispute AND attempts > 1", "action": "other_action"},
])
result = check_escalation_rules(
agent,
"help",
{"billing_dispute": True, "attempts": 3},
)
assert result is not None
assert result["action"] == "handoff_human"
def test_missing_metadata_field_no_match(self) -> None:
"""Condition with field missing from metadata returns None (not an error)."""
agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"help",
{}, # no metadata at all
)
assert result is None
# ---------------------------------------------------------------------------
# check_escalation_rules — natural language escalation
# ---------------------------------------------------------------------------
class TestNaturalLanguageEscalation:
def test_talk_to_human_phrase_matches_when_enabled(self) -> None:
"""'can I talk to a human?' triggers escalation when natural_lang_enabled=True."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"can I talk to a human?",
{},
natural_lang_enabled=True,
)
assert result is not None
assert result["action"] == "handoff_human"
def test_talk_to_human_phrase_no_match_when_disabled(self) -> None:
"""Natural language escalation returns None when natural_lang_enabled=False."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"can I talk to a human?",
{},
natural_lang_enabled=False,
)
assert result is None
def test_speak_to_someone_phrase(self) -> None:
"""'speak to someone' triggers natural language escalation when enabled."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(agent, "I need to speak to someone", {}, natural_lang_enabled=True)
assert result is not None
def test_get_a_person_phrase(self) -> None:
"""'get a person' triggers natural language escalation when enabled."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(agent, "can I get a person please", {}, natural_lang_enabled=True)
assert result is not None
def test_human_agent_phrase(self) -> None:
"""'human agent' triggers natural language escalation when enabled."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(agent, "I want a human agent", {}, natural_lang_enabled=True)
assert result is not None
def test_real_person_phrase(self) -> None:
"""'real person' triggers natural language escalation when enabled."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(agent, "let me talk to a real person", {}, natural_lang_enabled=True)
assert result is not None
def test_manager_phrase(self) -> None:
"""'manager' triggers natural language escalation when enabled."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(agent, "get me your manager", {}, natural_lang_enabled=True)
assert result is not None
def test_normal_message_does_not_trigger(self) -> None:
"""Normal message does not trigger natural language escalation even when enabled."""
agent = make_agent([{"condition": "natural_language_escalation", "action": "handoff_human"}])
result = check_escalation_rules(agent, "what is my account balance?", {}, natural_lang_enabled=True)
assert result is None
def test_natural_lang_only_when_no_other_rules_match(self) -> None:
"""Agent with no natural_language_escalation condition does not trigger on phrases."""
agent = make_agent([{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
result = check_escalation_rules(
agent,
"can I talk to a human?",
{},
natural_lang_enabled=True,
)
assert result is None
# ---------------------------------------------------------------------------
# build_transcript — formatting
# ---------------------------------------------------------------------------
class TestBuildTranscript:
def test_empty_messages(self) -> None:
"""build_transcript with empty list returns empty string."""
result = build_transcript([])
assert result == ""
def test_single_user_message(self) -> None:
"""Single user message is formatted correctly."""
messages = [{"role": "user", "content": "Hello!"}]
result = build_transcript(messages)
assert result == "*User:* Hello!"
def test_single_assistant_message(self) -> None:
"""Single assistant message is formatted correctly."""
messages = [{"role": "assistant", "content": "How can I help?"}]
result = build_transcript(messages)
assert result == "*Assistant:* How can I help?"
def test_conversation_turn(self) -> None:
"""User and assistant messages are joined with newlines."""
messages = [
{"role": "user", "content": "I have a billing issue"},
{"role": "assistant", "content": "I can help with that"},
]
result = build_transcript(messages)
assert result == "*User:* I have a billing issue\n*Assistant:* I can help with that"
def test_multiple_turns(self) -> None:
"""Multiple conversation turns maintain order."""
messages = [
{"role": "user", "content": "First"},
{"role": "assistant", "content": "Second"},
{"role": "user", "content": "Third"},
]
result = build_transcript(messages)
lines = result.split("\n")
assert len(lines) == 3
assert lines[0] == "*User:* First"
assert lines[1] == "*Assistant:* Second"
assert lines[2] == "*User:* Third"
def test_transcript_truncated_at_3000_chars(self) -> None:
"""build_transcript truncates output to 3000 characters max."""
# Create a message that will produce output well over 3000 chars
long_content = "x" * 200
messages = [{"role": "user", "content": long_content} for _ in range(20)]
result = build_transcript(messages)
assert len(result) <= 3000
def test_transcript_under_limit_not_truncated(self) -> None:
"""build_transcript does not truncate output under 3000 chars."""
messages = [
{"role": "user", "content": "Short message"},
{"role": "assistant", "content": "Short reply"},
]
result = build_transcript(messages)
assert "..." not in result or len(result) <= 3000
assert "*User:* Short message" in result
assert "*Assistant:* Short reply" in result