Files
konstruct/tests/integration/test_escalation.py
Adolfo Delorenzo d489551130 test(02-04): add failing tests for escalation handler
- Unit tests: rule matching, natural language escalation, transcript formatting
- Integration tests: Slack API calls, Redis key, audit log, return value
2026-03-23 14:49:54 -06:00

290 lines
11 KiB
Python

"""
Integration tests for the escalation handler.
Tests cover:
- escalate_to_human: Slack API calls (conversations.open + chat.postMessage)
- escalate_to_human: Redis escalation key is set
- escalate_to_human: Audit event is logged
- escalate_to_human: Returns user-facing message
Uses fakeredis and mocked httpx to avoid real network/DB connections.
"""
from __future__ import annotations
import json
from unittest.mock import AsyncMock, MagicMock, patch
import fakeredis.aioredis
import pytest
from orchestrator.escalation.handler import escalate_to_human
from shared.redis_keys import escalation_status_key
# Fixed identifiers that the tests below pass to escalate_to_human.
TENANT_ID = "tenant-test-123"
THREAD_ID = "thread-T12345-123456.789"
BOT_TOKEN = "xoxb-test-bot-token"
# Slack user ID of the human assignee; also the default assignee for make_agent().
ASSIGNEE_SLACK_ID = "U0HUMANID"
def make_agent(name: str = "Mara", assignee_id: str = ASSIGNEE_SLACK_ID) -> MagicMock:
    """Create a mock Agent for escalation tests.

    Args:
        name: Value exposed as ``agent.name``.
        assignee_id: Value exposed as ``agent.escalation_assignee``.

    Returns:
        A ``MagicMock`` with ``name`` and ``escalation_assignee`` set.
    """
    agent = MagicMock()
    # Assign after construction: MagicMock(name=...) would set the mock's
    # repr name instead of creating a ``name`` attribute.
    agent.name = name
    agent.escalation_assignee = assignee_id
    return agent
def make_audit_logger() -> MagicMock:
    """Create a mock AuditLogger with an async ``log_escalation``.

    Returns:
        A ``MagicMock`` whose ``log_escalation`` attribute is an ``AsyncMock``,
        so it can be awaited and its calls inspected afterwards.
    """
    logger = MagicMock()
    logger.log_escalation = AsyncMock()
    return logger
# Sample conversation passed as ``recent_messages`` to escalate_to_human.
RECENT_MESSAGES = [
{"role": "user", "content": "I have a billing problem"},
{"role": "assistant", "content": "Let me look into that"},
{"role": "user", "content": "This is the third time!"},
]
@pytest.fixture
async def redis_client():
    """Yield a fakeredis async client so no real Redis is needed.

    The client is closed once the test using the fixture has finished.
    """
    client = fakeredis.aioredis.FakeRedis()
    yield client
    await client.aclose()
class TestEscalateToHuman:
    """Integration tests for ``escalate_to_human`` with Slack HTTP calls mocked.

    Each test patches ``httpx.AsyncClient`` so that entering it as an async
    context manager yields an ``AsyncMock`` client whose ``post`` returns
    canned responses. The ``redis_client`` fixture and a mock audit logger are
    passed straight to the handler.
    """

    @staticmethod
    def _open_ok() -> MagicMock:
        """Return a mock response for a successful ``conversations.open`` call."""
        return _make_httpx_response({"ok": True, "channel": {"id": "D0DMCHANNELID"}})

    @staticmethod
    def _post_ok() -> MagicMock:
        """Return a mock response for a successful ``chat.postMessage`` call."""
        return _make_httpx_response({"ok": True, "ts": "9999.0001"})

    async def _run_escalation(
        self,
        redis_client,
        *,
        agent: MagicMock,
        audit_logger: MagicMock,
        trigger_reason: str = "billing_dispute AND attempts > 2",
        post_side_effect=None,
    ) -> tuple:
        """Call ``escalate_to_human`` with ``httpx.AsyncClient`` patched out.

        Args:
            redis_client: Redis client handed to the handler.
            agent: Mock agent handed to the handler.
            audit_logger: Mock audit logger handed to the handler.
            trigger_reason: Reason string forwarded to the handler.
            post_side_effect: ``side_effect`` for the mocked ``client.post``.
                Defaults to one ``conversations.open`` response followed by
                one ``chat.postMessage`` response.

        Returns:
            ``(result, mock_client)``: the handler's return value and the
            mocked HTTP client, so callers can inspect its ``post`` calls.
        """
        if post_side_effect is None:
            # Responses are consumed in call order: open the DM, then post.
            post_side_effect = [self._open_ok(), self._post_ok()]

        with patch("httpx.AsyncClient") as mock_client_cls:
            mock_client = AsyncMock()
            # ``async with httpx.AsyncClient(...) as client`` yields mock_client.
            mock_client_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client)
            mock_client_cls.return_value.__aexit__ = AsyncMock(return_value=None)
            mock_client.post = AsyncMock(side_effect=post_side_effect)

            result = await escalate_to_human(
                tenant_id=TENANT_ID,
                agent=agent,
                thread_id=THREAD_ID,
                trigger_reason=trigger_reason,
                recent_messages=RECENT_MESSAGES,
                assignee_slack_user_id=ASSIGNEE_SLACK_ID,
                bot_token=BOT_TOKEN,
                redis=redis_client,
                audit_logger=audit_logger,
            )
        return result, mock_client

    async def test_conversations_open_called(self, redis_client) -> None:
        """escalate_to_human opens a DM channel via conversations.open."""
        _, mock_client = await self._run_escalation(
            redis_client,
            agent=make_agent(),
            audit_logger=make_audit_logger(),
        )

        # First call should be conversations.open
        first_call = mock_client.post.call_args_list[0]
        assert "conversations.open" in first_call.args[0]
        # Should include the user ID. Search the whole recorded call so the
        # check does not depend on how the handler passes the payload.
        assert ASSIGNEE_SLACK_ID in str(first_call)

    async def test_chat_post_message_called_with_transcript(self, redis_client) -> None:
        """escalate_to_human posts transcript to DM via chat.postMessage."""
        _, mock_client = await self._run_escalation(
            redis_client,
            agent=make_agent(),
            audit_logger=make_audit_logger(),
        )

        # Second call should be chat.postMessage
        second_call = mock_client.post.call_args_list[1]
        assert "chat.postMessage" in second_call.args[0]
        # DM channel ID from conversations.open response should be used
        assert "D0DMCHANNELID" in str(second_call)

    async def test_escalation_key_set_in_redis(self, redis_client) -> None:
        """escalate_to_human sets escalation status key in Redis."""
        await self._run_escalation(
            redis_client,
            agent=make_agent(),
            audit_logger=make_audit_logger(),
        )

        key = escalation_status_key(TENANT_ID, THREAD_ID)
        value = await redis_client.get(key)
        assert value is not None
        # The fixture's client returns bytes, hence the decode.
        assert value.decode() == "escalated"

    async def test_audit_log_called_with_escalation(self, redis_client) -> None:
        """escalate_to_human logs escalation event to audit trail."""
        audit_logger = make_audit_logger()
        await self._run_escalation(
            redis_client,
            agent=make_agent(),
            audit_logger=audit_logger,
        )

        audit_logger.log_escalation.assert_called_once()
        logged_call = audit_logger.log_escalation.call_args
        # Should be called with trigger_reason
        assert "billing_dispute" in str(logged_call)

    async def test_returns_user_facing_message(self, redis_client) -> None:
        """escalate_to_human returns a user-facing escalation message."""
        result, _ = await self._run_escalation(
            redis_client,
            agent=make_agent(),
            audit_logger=make_audit_logger(),
        )

        assert isinstance(result, str)
        assert len(result) > 0
        # Should mention a team member or human
        result_lower = result.lower()
        assert (
            "team member" in result_lower
            or "colleague" in result_lower
            or "shortly" in result_lower
        )

    async def test_dm_text_contains_agent_name_and_reason(self, redis_client) -> None:
        """DM posted to human contains agent name and escalation reason."""
        captured_posts: list[dict] = []

        async def capture_post(url: str, **kwargs) -> MagicMock:
            # Record every POST and answer according to the endpoint hit.
            captured_posts.append({"url": url, "kwargs": kwargs})
            if "conversations.open" in url:
                return self._open_ok()
            return self._post_ok()

        await self._run_escalation(
            redis_client,
            agent=make_agent(name="TestBot"),
            audit_logger=make_audit_logger(),
            trigger_reason="test escalation reason",
            post_side_effect=capture_post,
        )

        # Find the postMessage call; fail with a clear message if absent
        # instead of letting next() raise a bare StopIteration.
        post_calls = [p for p in captured_posts if "postMessage" in p["url"]]
        assert post_calls, "chat.postMessage was never called"
        post_text = post_calls[0]["kwargs"].get("json", {}).get("text", "")
        assert "TestBot" in post_text
        assert "test escalation reason" in post_text
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_httpx_response(data: dict) -> MagicMock:
    """Create a mock httpx Response that returns *data* from ``.json()``.

    Args:
        data: Payload that the mock's ``json()`` method should return.

    Returns:
        A ``MagicMock`` with ``status_code`` 200 whose ``json()`` returns
        ``data`` (the same object, not a copy).
    """
    resp = MagicMock()
    resp.json = MagicMock(return_value=data)
    resp.status_code = 200
    return resp