feat(02-04): implement escalation handler (rule evaluator, transcript, DM delivery)
- check_escalation_rules: condition parser for 'keyword AND count > N' and NL phrases - build_transcript: formats messages as Slack mrkdwn, truncates at 3000 chars - escalate_to_human: opens DM, posts transcript, sets Redis key, logs audit event
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
"""
|
||||
Escalation package for the Konstruct Agent Orchestrator.
|
||||
|
||||
Provides rule-based and natural language escalation logic:
|
||||
- check_escalation_rules: evaluates configured rules and NL triggers
|
||||
- build_transcript: formats conversation history for DM delivery
|
||||
- escalate_to_human: full handoff pipeline (Slack DM + Redis + audit)
|
||||
"""
|
||||
342
packages/orchestrator/orchestrator/escalation/handler.py
Normal file
342
packages/orchestrator/orchestrator/escalation/handler.py
Normal file
@@ -0,0 +1,342 @@
|
||||
"""
|
||||
Human escalation/handoff handler for the Konstruct Agent Orchestrator.
|
||||
|
||||
This module implements the "employee asks for manager" escalation pattern:
|
||||
when a configured rule triggers or the user explicitly requests a human,
|
||||
the agent hands off gracefully by:
|
||||
1. Evaluating escalation rules against conversation metadata / message text
|
||||
2. Packaging the conversation transcript in Slack mrkdwn format
|
||||
3. Opening a DM with the assigned human and posting the transcript
|
||||
4. Setting an escalation status flag in Redis so future messages route correctly
|
||||
5. Logging the event to the audit trail
|
||||
|
||||
PUBLIC INTERFACE:
|
||||
check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled) -> dict | None
|
||||
build_transcript(recent_messages) -> str
|
||||
escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages,
|
||||
assignee_slack_user_id, bot_token, redis, audit_logger) -> str
|
||||
|
||||
CONDITION LANGUAGE (v1 — simple keyword parser):
|
||||
- "keyword AND count_field > N"
|
||||
e.g. "billing_dispute AND attempts > 2"
|
||||
Checks that conversation_metadata[keyword] is truthy AND
|
||||
conversation_metadata[count_field] > N (integer comparison).
|
||||
- "natural_language_escalation"
|
||||
Triggers when message_text contains a common escalation phrase
|
||||
AND natural_lang_enabled is True.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Natural language escalation phrases (case-insensitive substring match)
|
||||
# ---------------------------------------------------------------------------
|
||||
_ESCALATION_PHRASES: tuple[str, ...] = (
|
||||
"talk to a human",
|
||||
"speak to someone",
|
||||
"get a person",
|
||||
"human agent",
|
||||
"real person",
|
||||
"manager",
|
||||
)
|
||||
|
||||
# Maximum Slack message block size (characters)
|
||||
_TRANSCRIPT_MAX_CHARS = 3000
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# check_escalation_rules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_escalation_rules(
|
||||
agent: Any,
|
||||
message_text: str,
|
||||
conversation_metadata: dict[str, Any],
|
||||
natural_lang_enabled: bool = False,
|
||||
) -> dict[str, Any] | None:
|
||||
"""
|
||||
Evaluate the agent's configured escalation rules against the current message.
|
||||
|
||||
Iterates agent.escalation_rules in order and returns the first matching rule.
|
||||
Returns None when no rule matches.
|
||||
|
||||
Condition types:
|
||||
- "keyword AND count_field > N": boolean flag AND integer threshold check
|
||||
- "natural_language_escalation": user is explicitly asking for a human
|
||||
|
||||
Args:
|
||||
agent: Agent ORM/mock object with escalation_rules list and name.
|
||||
message_text: The user's current message text.
|
||||
conversation_metadata: Dict of runtime metadata (e.g. {"billing_dispute": True, "attempts": 3}).
|
||||
natural_lang_enabled: Whether the tenant has natural language escalation enabled.
|
||||
|
||||
Returns:
|
||||
The first matching rule dict (e.g. {"condition": "...", "action": "handoff_human"}),
|
||||
or None if no rule matches.
|
||||
"""
|
||||
rules: list[dict[str, Any]] = getattr(agent, "escalation_rules", []) or []
|
||||
|
||||
for rule in rules:
|
||||
condition: str = rule.get("condition", "")
|
||||
|
||||
if condition == "natural_language_escalation":
|
||||
if _matches_natural_language(message_text, natural_lang_enabled):
|
||||
logger.debug("Natural language escalation triggered: %r", message_text[:80])
|
||||
return rule
|
||||
|
||||
elif _matches_condition(condition, conversation_metadata):
|
||||
logger.debug("Escalation rule matched: %r metadata=%s", condition, conversation_metadata)
|
||||
return rule
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _matches_natural_language(message_text: str, enabled: bool) -> bool:
|
||||
"""Return True when the message contains an escalation phrase and NL is enabled."""
|
||||
if not enabled:
|
||||
return False
|
||||
normalized = message_text.lower()
|
||||
return any(phrase in normalized for phrase in _ESCALATION_PHRASES)
|
||||
|
||||
|
||||
def _matches_condition(condition: str, metadata: dict[str, Any]) -> bool:
|
||||
"""
|
||||
Simple condition evaluator for "keyword AND count_field > N" format.
|
||||
|
||||
Supports only the AND combinator. Left operand is a boolean flag key,
|
||||
right operand is an integer comparison (>, >=, <, <=, ==, !=).
|
||||
|
||||
Returns False on any parsing error or missing metadata key.
|
||||
"""
|
||||
# Strip whitespace
|
||||
condition = condition.strip()
|
||||
|
||||
# Only handle "X AND Y op Z" format
|
||||
and_match = re.match(
|
||||
r"^(?P<flag>\w+)\s+AND\s+(?P<count_field>\w+)\s*(?P<op>[><!]=?|==)\s*(?P<value>\d+)$",
|
||||
condition,
|
||||
re.IGNORECASE,
|
||||
)
|
||||
if not and_match:
|
||||
# Unknown condition format — do not match
|
||||
return False
|
||||
|
||||
flag_key = and_match.group("flag")
|
||||
count_field = and_match.group("count_field")
|
||||
op = and_match.group("op")
|
||||
threshold = int(and_match.group("value"))
|
||||
|
||||
# Check boolean flag
|
||||
flag_value = metadata.get(flag_key)
|
||||
if not flag_value:
|
||||
return False
|
||||
|
||||
# Check integer comparison
|
||||
count_value = metadata.get(count_field)
|
||||
if count_value is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
count_int = int(count_value)
|
||||
except (TypeError, ValueError):
|
||||
return False
|
||||
|
||||
return _compare(count_int, op, threshold)
|
||||
|
||||
|
||||
def _compare(actual: int, op: str, threshold: int) -> bool:
|
||||
"""Evaluate an integer comparison operator."""
|
||||
if op == ">":
|
||||
return actual > threshold
|
||||
if op == ">=":
|
||||
return actual >= threshold
|
||||
if op == "<":
|
||||
return actual < threshold
|
||||
if op == "<=":
|
||||
return actual <= threshold
|
||||
if op in ("==", "="):
|
||||
return actual == threshold
|
||||
if op == "!=":
|
||||
return actual != threshold
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# build_transcript
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def build_transcript(recent_messages: list[dict[str, str]]) -> str:
|
||||
"""
|
||||
Format conversation history as Slack mrkdwn for DM delivery.
|
||||
|
||||
Each message is formatted as "*Role:* content" and joined with newlines.
|
||||
Output is truncated to _TRANSCRIPT_MAX_CHARS if necessary, with a
|
||||
truncation indicator appended.
|
||||
|
||||
Args:
|
||||
recent_messages: List of {"role": ..., "content": ...} dicts, oldest first.
|
||||
|
||||
Returns:
|
||||
Formatted transcript string, at most 3000 characters.
|
||||
"""
|
||||
if not recent_messages:
|
||||
return ""
|
||||
|
||||
lines: list[str] = []
|
||||
for msg in recent_messages:
|
||||
role = msg.get("role", "unknown").capitalize()
|
||||
content = msg.get("content", "")
|
||||
lines.append(f"*{role}:* {content}")
|
||||
|
||||
transcript = "\n".join(lines)
|
||||
|
||||
if len(transcript) > _TRANSCRIPT_MAX_CHARS:
|
||||
# Truncate and indicate how many chars were cut
|
||||
truncation_suffix = "...[truncated]"
|
||||
transcript = transcript[: _TRANSCRIPT_MAX_CHARS - len(truncation_suffix)] + truncation_suffix
|
||||
|
||||
return transcript
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# escalate_to_human
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def escalate_to_human(
|
||||
tenant_id: str,
|
||||
agent: Any,
|
||||
thread_id: str,
|
||||
trigger_reason: str,
|
||||
recent_messages: list[dict[str, str]],
|
||||
assignee_slack_user_id: str,
|
||||
bot_token: str,
|
||||
redis: Any,
|
||||
audit_logger: Any,
|
||||
user_id: str = "",
|
||||
agent_id: str = "",
|
||||
) -> str:
|
||||
"""
|
||||
Perform a full human escalation handoff.
|
||||
|
||||
Steps:
|
||||
1. Build the conversation transcript
|
||||
2. Compose the DM message (employee metaphor)
|
||||
3. Open a Slack DM with the assigned human via conversations.open
|
||||
4. Post the transcript to the DM via chat.postMessage
|
||||
5. Set the escalation status key in Redis (no TTL — stays until resolved)
|
||||
6. Log the escalation event to the audit trail
|
||||
7. Return the user-facing message to replace the LLM response
|
||||
|
||||
Args:
|
||||
tenant_id: Konstruct tenant identifier.
|
||||
agent: Agent with .name and .escalation_assignee attributes.
|
||||
thread_id: Thread identifier for the escalated conversation.
|
||||
trigger_reason: Human-readable description of what triggered escalation.
|
||||
recent_messages: Conversation history to include in DM.
|
||||
assignee_slack_user_id: Slack user ID of the human to DM.
|
||||
bot_token: Bot token (xoxb-...) for Slack API calls.
|
||||
redis: Redis async client for escalation status storage.
|
||||
audit_logger: AuditLogger instance (or compatible no-op).
|
||||
user_id: End-user identifier (optional, for audit logging).
|
||||
agent_id: Agent identifier string (optional, for audit logging).
|
||||
|
||||
Returns:
|
||||
User-facing message confirming escalation (replaces LLM response).
|
||||
"""
|
||||
import httpx
|
||||
|
||||
from shared.redis_keys import escalation_status_key
|
||||
|
||||
agent_name: str = getattr(agent, "name", "Your assistant")
|
||||
transcript = build_transcript(recent_messages)
|
||||
|
||||
dm_text = (
|
||||
f"*{agent_name} needs human assistance*\n"
|
||||
f"Reason: {trigger_reason}\n"
|
||||
f"Tenant: {tenant_id}\n"
|
||||
f"\nConversation transcript:\n{transcript}\n"
|
||||
f"\nThe agent will stay in the thread. You can reply directly to the user."
|
||||
)
|
||||
|
||||
dm_channel_id: str = ""
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
|
||||
# Step 1: Open DM channel with the assigned human
|
||||
open_resp = await client.post(
|
||||
"https://slack.com/api/conversations.open",
|
||||
headers={"Authorization": f"Bearer {bot_token}"},
|
||||
json={"users": assignee_slack_user_id},
|
||||
)
|
||||
open_data = open_resp.json()
|
||||
if open_data.get("ok"):
|
||||
dm_channel_id = open_data.get("channel", {}).get("id", "")
|
||||
else:
|
||||
logger.error(
|
||||
"conversations.open failed for assignee=%s: %r",
|
||||
assignee_slack_user_id,
|
||||
open_data.get("error"),
|
||||
)
|
||||
|
||||
# Step 2: Post transcript to DM
|
||||
if dm_channel_id:
|
||||
post_resp = await client.post(
|
||||
"https://slack.com/api/chat.postMessage",
|
||||
headers={"Authorization": f"Bearer {bot_token}"},
|
||||
json={
|
||||
"channel": dm_channel_id,
|
||||
"text": dm_text,
|
||||
},
|
||||
)
|
||||
post_data = post_resp.json()
|
||||
if not post_data.get("ok"):
|
||||
logger.error(
|
||||
"chat.postMessage to DM failed: channel=%s error=%r",
|
||||
dm_channel_id,
|
||||
post_data.get("error"),
|
||||
)
|
||||
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Escalation Slack API call failed for tenant=%s thread=%s",
|
||||
tenant_id,
|
||||
thread_id,
|
||||
)
|
||||
|
||||
# Step 3: Set escalation status in Redis (no TTL — cleared by human after resolution)
|
||||
esc_key = escalation_status_key(tenant_id, thread_id)
|
||||
await redis.set(esc_key, "escalated")
|
||||
|
||||
# Step 4: Log audit event
|
||||
try:
|
||||
await audit_logger.log_escalation(
|
||||
tenant_id=tenant_id,
|
||||
agent_id=agent_id,
|
||||
user_id=user_id,
|
||||
trigger_reason=trigger_reason,
|
||||
metadata={
|
||||
"thread_id": thread_id,
|
||||
"assignee_slack_user_id": assignee_slack_user_id,
|
||||
"dm_channel_id": dm_channel_id,
|
||||
},
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Failed to log escalation audit event for tenant=%s thread=%s",
|
||||
tenant_id,
|
||||
thread_id,
|
||||
)
|
||||
|
||||
# Step 5: Return user-facing confirmation message
|
||||
return (
|
||||
"I've brought in a team member to help with this. They'll be with you shortly."
|
||||
)
|
||||
Reference in New Issue
Block a user