feat(02-04): implement escalation handler (rule evaluator, transcript, DM delivery)

- check_escalation_rules: condition parser for 'keyword AND count > N' and NL phrases
- build_transcript: formats messages as Slack mrkdwn, truncates at 3000 chars
- escalate_to_human: opens DM, posts transcript, sets Redis key, logs audit event
This commit is contained in:
2026-03-23 14:50:56 -06:00
parent 30b9f60668
commit 4047b552a7
2 changed files with 350 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
"""
Escalation package for the Konstruct Agent Orchestrator.
Provides rule-based and natural language escalation logic:
- check_escalation_rules: evaluates configured rules and NL triggers
- build_transcript: formats conversation history for DM delivery
- escalate_to_human: full handoff pipeline (Slack DM + Redis + audit)
"""

View File

@@ -0,0 +1,342 @@
"""
Human escalation/handoff handler for the Konstruct Agent Orchestrator.
This module implements the "employee asks for manager" escalation pattern:
when a configured rule triggers or the user explicitly requests a human,
the agent hands off gracefully by:
1. Evaluating escalation rules against conversation metadata / message text
2. Packaging the conversation transcript in Slack mrkdwn format
3. Opening a DM with the assigned human and posting the transcript
4. Setting an escalation status flag in Redis so future messages route correctly
5. Logging the event to the audit trail
PUBLIC INTERFACE:
check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled) -> dict | None
build_transcript(recent_messages) -> str
escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages,
assignee_slack_user_id, bot_token, redis, audit_logger) -> str
CONDITION LANGUAGE (v1 — simple keyword parser):
- "keyword AND count_field > N"
e.g. "billing_dispute AND attempts > 2"
Checks that conversation_metadata[keyword] is truthy AND
conversation_metadata[count_field] > N (integer comparison).
- "natural_language_escalation"
Triggers when message_text contains a common escalation phrase
AND natural_lang_enabled is True.
"""
from __future__ import annotations
import logging
import re
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Natural language escalation phrases (case-insensitive substring match)
# ---------------------------------------------------------------------------
_ESCALATION_PHRASES: tuple[str, ...] = (
"talk to a human",
"speak to someone",
"get a person",
"human agent",
"real person",
"manager",
)
# Maximum Slack message block size (characters)
_TRANSCRIPT_MAX_CHARS = 3000
# ---------------------------------------------------------------------------
# check_escalation_rules
# ---------------------------------------------------------------------------
def check_escalation_rules(
agent: Any,
message_text: str,
conversation_metadata: dict[str, Any],
natural_lang_enabled: bool = False,
) -> dict[str, Any] | None:
"""
Evaluate the agent's configured escalation rules against the current message.
Iterates agent.escalation_rules in order and returns the first matching rule.
Returns None when no rule matches.
Condition types:
- "keyword AND count_field > N": boolean flag AND integer threshold check
- "natural_language_escalation": user is explicitly asking for a human
Args:
agent: Agent ORM/mock object with escalation_rules list and name.
message_text: The user's current message text.
conversation_metadata: Dict of runtime metadata (e.g. {"billing_dispute": True, "attempts": 3}).
natural_lang_enabled: Whether the tenant has natural language escalation enabled.
Returns:
The first matching rule dict (e.g. {"condition": "...", "action": "handoff_human"}),
or None if no rule matches.
"""
rules: list[dict[str, Any]] = getattr(agent, "escalation_rules", []) or []
for rule in rules:
condition: str = rule.get("condition", "")
if condition == "natural_language_escalation":
if _matches_natural_language(message_text, natural_lang_enabled):
logger.debug("Natural language escalation triggered: %r", message_text[:80])
return rule
elif _matches_condition(condition, conversation_metadata):
logger.debug("Escalation rule matched: %r metadata=%s", condition, conversation_metadata)
return rule
return None
def _matches_natural_language(message_text: str, enabled: bool) -> bool:
"""Return True when the message contains an escalation phrase and NL is enabled."""
if not enabled:
return False
normalized = message_text.lower()
return any(phrase in normalized for phrase in _ESCALATION_PHRASES)
def _matches_condition(condition: str, metadata: dict[str, Any]) -> bool:
"""
Simple condition evaluator for "keyword AND count_field > N" format.
Supports only the AND combinator. Left operand is a boolean flag key,
right operand is an integer comparison (>, >=, <, <=, ==, !=).
Returns False on any parsing error or missing metadata key.
"""
# Strip whitespace
condition = condition.strip()
# Only handle "X AND Y op Z" format
and_match = re.match(
r"^(?P<flag>\w+)\s+AND\s+(?P<count_field>\w+)\s*(?P<op>[><!]=?|==)\s*(?P<value>\d+)$",
condition,
re.IGNORECASE,
)
if not and_match:
# Unknown condition format — do not match
return False
flag_key = and_match.group("flag")
count_field = and_match.group("count_field")
op = and_match.group("op")
threshold = int(and_match.group("value"))
# Check boolean flag
flag_value = metadata.get(flag_key)
if not flag_value:
return False
# Check integer comparison
count_value = metadata.get(count_field)
if count_value is None:
return False
try:
count_int = int(count_value)
except (TypeError, ValueError):
return False
return _compare(count_int, op, threshold)
def _compare(actual: int, op: str, threshold: int) -> bool:
"""Evaluate an integer comparison operator."""
if op == ">":
return actual > threshold
if op == ">=":
return actual >= threshold
if op == "<":
return actual < threshold
if op == "<=":
return actual <= threshold
if op in ("==", "="):
return actual == threshold
if op == "!=":
return actual != threshold
return False
# ---------------------------------------------------------------------------
# build_transcript
# ---------------------------------------------------------------------------
def build_transcript(recent_messages: list[dict[str, str]]) -> str:
"""
Format conversation history as Slack mrkdwn for DM delivery.
Each message is formatted as "*Role:* content" and joined with newlines.
Output is truncated to _TRANSCRIPT_MAX_CHARS if necessary, with a
truncation indicator appended.
Args:
recent_messages: List of {"role": ..., "content": ...} dicts, oldest first.
Returns:
Formatted transcript string, at most 3000 characters.
"""
if not recent_messages:
return ""
lines: list[str] = []
for msg in recent_messages:
role = msg.get("role", "unknown").capitalize()
content = msg.get("content", "")
lines.append(f"*{role}:* {content}")
transcript = "\n".join(lines)
if len(transcript) > _TRANSCRIPT_MAX_CHARS:
# Truncate and indicate how many chars were cut
truncation_suffix = "...[truncated]"
transcript = transcript[: _TRANSCRIPT_MAX_CHARS - len(truncation_suffix)] + truncation_suffix
return transcript
# ---------------------------------------------------------------------------
# escalate_to_human
# ---------------------------------------------------------------------------
async def escalate_to_human(
tenant_id: str,
agent: Any,
thread_id: str,
trigger_reason: str,
recent_messages: list[dict[str, str]],
assignee_slack_user_id: str,
bot_token: str,
redis: Any,
audit_logger: Any,
user_id: str = "",
agent_id: str = "",
) -> str:
"""
Perform a full human escalation handoff.
Steps:
1. Build the conversation transcript
2. Compose the DM message (employee metaphor)
3. Open a Slack DM with the assigned human via conversations.open
4. Post the transcript to the DM via chat.postMessage
5. Set the escalation status key in Redis (no TTL — stays until resolved)
6. Log the escalation event to the audit trail
7. Return the user-facing message to replace the LLM response
Args:
tenant_id: Konstruct tenant identifier.
agent: Agent with .name and .escalation_assignee attributes.
thread_id: Thread identifier for the escalated conversation.
trigger_reason: Human-readable description of what triggered escalation.
recent_messages: Conversation history to include in DM.
assignee_slack_user_id: Slack user ID of the human to DM.
bot_token: Bot token (xoxb-...) for Slack API calls.
redis: Redis async client for escalation status storage.
audit_logger: AuditLogger instance (or compatible no-op).
user_id: End-user identifier (optional, for audit logging).
agent_id: Agent identifier string (optional, for audit logging).
Returns:
User-facing message confirming escalation (replaces LLM response).
"""
import httpx
from shared.redis_keys import escalation_status_key
agent_name: str = getattr(agent, "name", "Your assistant")
transcript = build_transcript(recent_messages)
dm_text = (
f"*{agent_name} needs human assistance*\n"
f"Reason: {trigger_reason}\n"
f"Tenant: {tenant_id}\n"
f"\nConversation transcript:\n{transcript}\n"
f"\nThe agent will stay in the thread. You can reply directly to the user."
)
dm_channel_id: str = ""
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
# Step 1: Open DM channel with the assigned human
open_resp = await client.post(
"https://slack.com/api/conversations.open",
headers={"Authorization": f"Bearer {bot_token}"},
json={"users": assignee_slack_user_id},
)
open_data = open_resp.json()
if open_data.get("ok"):
dm_channel_id = open_data.get("channel", {}).get("id", "")
else:
logger.error(
"conversations.open failed for assignee=%s: %r",
assignee_slack_user_id,
open_data.get("error"),
)
# Step 2: Post transcript to DM
if dm_channel_id:
post_resp = await client.post(
"https://slack.com/api/chat.postMessage",
headers={"Authorization": f"Bearer {bot_token}"},
json={
"channel": dm_channel_id,
"text": dm_text,
},
)
post_data = post_resp.json()
if not post_data.get("ok"):
logger.error(
"chat.postMessage to DM failed: channel=%s error=%r",
dm_channel_id,
post_data.get("error"),
)
except Exception:
logger.exception(
"Escalation Slack API call failed for tenant=%s thread=%s",
tenant_id,
thread_id,
)
# Step 3: Set escalation status in Redis (no TTL — cleared by human after resolution)
esc_key = escalation_status_key(tenant_id, thread_id)
await redis.set(esc_key, "escalated")
# Step 4: Log audit event
try:
await audit_logger.log_escalation(
tenant_id=tenant_id,
agent_id=agent_id,
user_id=user_id,
trigger_reason=trigger_reason,
metadata={
"thread_id": thread_id,
"assignee_slack_user_id": assignee_slack_user_id,
"dm_channel_id": dm_channel_id,
},
)
except Exception:
logger.exception(
"Failed to log escalation audit event for tenant=%s thread=%s",
tenant_id,
thread_id,
)
# Step 5: Return user-facing confirmation message
return (
"I've brought in a team member to help with this. They'll be with you shortly."
)