feat(02-04): implement escalation handler (rule evaluator, transcript, DM delivery)
- check_escalation_rules: condition parser for 'keyword AND count > N' and NL phrases - build_transcript: formats messages as Slack mrkdwn, truncates at 3000 chars - escalate_to_human: opens DM, posts transcript, sets Redis key, logs audit event
This commit is contained in:
@@ -0,0 +1,8 @@
|
|||||||
|
"""
|
||||||
|
Escalation package for the Konstruct Agent Orchestrator.
|
||||||
|
|
||||||
|
Provides rule-based and natural language escalation logic:
|
||||||
|
- check_escalation_rules: evaluates configured rules and NL triggers
|
||||||
|
- build_transcript: formats conversation history for DM delivery
|
||||||
|
- escalate_to_human: full handoff pipeline (Slack DM + Redis + audit)
|
||||||
|
"""
|
||||||
342
packages/orchestrator/orchestrator/escalation/handler.py
Normal file
342
packages/orchestrator/orchestrator/escalation/handler.py
Normal file
@@ -0,0 +1,342 @@
|
|||||||
|
"""
|
||||||
|
Human escalation/handoff handler for the Konstruct Agent Orchestrator.
|
||||||
|
|
||||||
|
This module implements the "employee asks for manager" escalation pattern:
|
||||||
|
when a configured rule triggers or the user explicitly requests a human,
|
||||||
|
the agent hands off gracefully by:
|
||||||
|
1. Evaluating escalation rules against conversation metadata / message text
|
||||||
|
2. Packaging the conversation transcript in Slack mrkdwn format
|
||||||
|
3. Opening a DM with the assigned human and posting the transcript
|
||||||
|
4. Setting an escalation status flag in Redis so future messages route correctly
|
||||||
|
5. Logging the event to the audit trail
|
||||||
|
|
||||||
|
PUBLIC INTERFACE:
|
||||||
|
check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled) -> dict | None
|
||||||
|
build_transcript(recent_messages) -> str
|
||||||
|
escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages,
|
||||||
|
assignee_slack_user_id, bot_token, redis, audit_logger) -> str
|
||||||
|
|
||||||
|
CONDITION LANGUAGE (v1 — simple keyword parser):
|
||||||
|
- "keyword AND count_field > N"
|
||||||
|
e.g. "billing_dispute AND attempts > 2"
|
||||||
|
Checks that conversation_metadata[keyword] is truthy AND
|
||||||
|
conversation_metadata[count_field] > N (integer comparison).
|
||||||
|
- "natural_language_escalation"
|
||||||
|
Triggers when message_text contains a common escalation phrase
|
||||||
|
AND natural_lang_enabled is True.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
pass
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Natural language escalation phrases (case-insensitive substring match)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
_ESCALATION_PHRASES: tuple[str, ...] = (
|
||||||
|
"talk to a human",
|
||||||
|
"speak to someone",
|
||||||
|
"get a person",
|
||||||
|
"human agent",
|
||||||
|
"real person",
|
||||||
|
"manager",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Maximum Slack message block size (characters)
|
||||||
|
_TRANSCRIPT_MAX_CHARS = 3000
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# check_escalation_rules
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def check_escalation_rules(
|
||||||
|
agent: Any,
|
||||||
|
message_text: str,
|
||||||
|
conversation_metadata: dict[str, Any],
|
||||||
|
natural_lang_enabled: bool = False,
|
||||||
|
) -> dict[str, Any] | None:
|
||||||
|
"""
|
||||||
|
Evaluate the agent's configured escalation rules against the current message.
|
||||||
|
|
||||||
|
Iterates agent.escalation_rules in order and returns the first matching rule.
|
||||||
|
Returns None when no rule matches.
|
||||||
|
|
||||||
|
Condition types:
|
||||||
|
- "keyword AND count_field > N": boolean flag AND integer threshold check
|
||||||
|
- "natural_language_escalation": user is explicitly asking for a human
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: Agent ORM/mock object with escalation_rules list and name.
|
||||||
|
message_text: The user's current message text.
|
||||||
|
conversation_metadata: Dict of runtime metadata (e.g. {"billing_dispute": True, "attempts": 3}).
|
||||||
|
natural_lang_enabled: Whether the tenant has natural language escalation enabled.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The first matching rule dict (e.g. {"condition": "...", "action": "handoff_human"}),
|
||||||
|
or None if no rule matches.
|
||||||
|
"""
|
||||||
|
rules: list[dict[str, Any]] = getattr(agent, "escalation_rules", []) or []
|
||||||
|
|
||||||
|
for rule in rules:
|
||||||
|
condition: str = rule.get("condition", "")
|
||||||
|
|
||||||
|
if condition == "natural_language_escalation":
|
||||||
|
if _matches_natural_language(message_text, natural_lang_enabled):
|
||||||
|
logger.debug("Natural language escalation triggered: %r", message_text[:80])
|
||||||
|
return rule
|
||||||
|
|
||||||
|
elif _matches_condition(condition, conversation_metadata):
|
||||||
|
logger.debug("Escalation rule matched: %r metadata=%s", condition, conversation_metadata)
|
||||||
|
return rule
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _matches_natural_language(message_text: str, enabled: bool) -> bool:
|
||||||
|
"""Return True when the message contains an escalation phrase and NL is enabled."""
|
||||||
|
if not enabled:
|
||||||
|
return False
|
||||||
|
normalized = message_text.lower()
|
||||||
|
return any(phrase in normalized for phrase in _ESCALATION_PHRASES)
|
||||||
|
|
||||||
|
|
||||||
|
def _matches_condition(condition: str, metadata: dict[str, Any]) -> bool:
|
||||||
|
"""
|
||||||
|
Simple condition evaluator for "keyword AND count_field > N" format.
|
||||||
|
|
||||||
|
Supports only the AND combinator. Left operand is a boolean flag key,
|
||||||
|
right operand is an integer comparison (>, >=, <, <=, ==, !=).
|
||||||
|
|
||||||
|
Returns False on any parsing error or missing metadata key.
|
||||||
|
"""
|
||||||
|
# Strip whitespace
|
||||||
|
condition = condition.strip()
|
||||||
|
|
||||||
|
# Only handle "X AND Y op Z" format
|
||||||
|
and_match = re.match(
|
||||||
|
r"^(?P<flag>\w+)\s+AND\s+(?P<count_field>\w+)\s*(?P<op>[><!]=?|==)\s*(?P<value>\d+)$",
|
||||||
|
condition,
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
if not and_match:
|
||||||
|
# Unknown condition format — do not match
|
||||||
|
return False
|
||||||
|
|
||||||
|
flag_key = and_match.group("flag")
|
||||||
|
count_field = and_match.group("count_field")
|
||||||
|
op = and_match.group("op")
|
||||||
|
threshold = int(and_match.group("value"))
|
||||||
|
|
||||||
|
# Check boolean flag
|
||||||
|
flag_value = metadata.get(flag_key)
|
||||||
|
if not flag_value:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Check integer comparison
|
||||||
|
count_value = metadata.get(count_field)
|
||||||
|
if count_value is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
count_int = int(count_value)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return _compare(count_int, op, threshold)
|
||||||
|
|
||||||
|
|
||||||
|
def _compare(actual: int, op: str, threshold: int) -> bool:
|
||||||
|
"""Evaluate an integer comparison operator."""
|
||||||
|
if op == ">":
|
||||||
|
return actual > threshold
|
||||||
|
if op == ">=":
|
||||||
|
return actual >= threshold
|
||||||
|
if op == "<":
|
||||||
|
return actual < threshold
|
||||||
|
if op == "<=":
|
||||||
|
return actual <= threshold
|
||||||
|
if op in ("==", "="):
|
||||||
|
return actual == threshold
|
||||||
|
if op == "!=":
|
||||||
|
return actual != threshold
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# build_transcript
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def build_transcript(recent_messages: list[dict[str, str]]) -> str:
|
||||||
|
"""
|
||||||
|
Format conversation history as Slack mrkdwn for DM delivery.
|
||||||
|
|
||||||
|
Each message is formatted as "*Role:* content" and joined with newlines.
|
||||||
|
Output is truncated to _TRANSCRIPT_MAX_CHARS if necessary, with a
|
||||||
|
truncation indicator appended.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
recent_messages: List of {"role": ..., "content": ...} dicts, oldest first.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted transcript string, at most 3000 characters.
|
||||||
|
"""
|
||||||
|
if not recent_messages:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
lines: list[str] = []
|
||||||
|
for msg in recent_messages:
|
||||||
|
role = msg.get("role", "unknown").capitalize()
|
||||||
|
content = msg.get("content", "")
|
||||||
|
lines.append(f"*{role}:* {content}")
|
||||||
|
|
||||||
|
transcript = "\n".join(lines)
|
||||||
|
|
||||||
|
if len(transcript) > _TRANSCRIPT_MAX_CHARS:
|
||||||
|
# Truncate and indicate how many chars were cut
|
||||||
|
truncation_suffix = "...[truncated]"
|
||||||
|
transcript = transcript[: _TRANSCRIPT_MAX_CHARS - len(truncation_suffix)] + truncation_suffix
|
||||||
|
|
||||||
|
return transcript
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# escalate_to_human
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
async def escalate_to_human(
|
||||||
|
tenant_id: str,
|
||||||
|
agent: Any,
|
||||||
|
thread_id: str,
|
||||||
|
trigger_reason: str,
|
||||||
|
recent_messages: list[dict[str, str]],
|
||||||
|
assignee_slack_user_id: str,
|
||||||
|
bot_token: str,
|
||||||
|
redis: Any,
|
||||||
|
audit_logger: Any,
|
||||||
|
user_id: str = "",
|
||||||
|
agent_id: str = "",
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Perform a full human escalation handoff.
|
||||||
|
|
||||||
|
Steps:
|
||||||
|
1. Build the conversation transcript
|
||||||
|
2. Compose the DM message (employee metaphor)
|
||||||
|
3. Open a Slack DM with the assigned human via conversations.open
|
||||||
|
4. Post the transcript to the DM via chat.postMessage
|
||||||
|
5. Set the escalation status key in Redis (no TTL — stays until resolved)
|
||||||
|
6. Log the escalation event to the audit trail
|
||||||
|
7. Return the user-facing message to replace the LLM response
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tenant_id: Konstruct tenant identifier.
|
||||||
|
agent: Agent with .name and .escalation_assignee attributes.
|
||||||
|
thread_id: Thread identifier for the escalated conversation.
|
||||||
|
trigger_reason: Human-readable description of what triggered escalation.
|
||||||
|
recent_messages: Conversation history to include in DM.
|
||||||
|
assignee_slack_user_id: Slack user ID of the human to DM.
|
||||||
|
bot_token: Bot token (xoxb-...) for Slack API calls.
|
||||||
|
redis: Redis async client for escalation status storage.
|
||||||
|
audit_logger: AuditLogger instance (or compatible no-op).
|
||||||
|
user_id: End-user identifier (optional, for audit logging).
|
||||||
|
agent_id: Agent identifier string (optional, for audit logging).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
User-facing message confirming escalation (replaces LLM response).
|
||||||
|
"""
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from shared.redis_keys import escalation_status_key
|
||||||
|
|
||||||
|
agent_name: str = getattr(agent, "name", "Your assistant")
|
||||||
|
transcript = build_transcript(recent_messages)
|
||||||
|
|
||||||
|
dm_text = (
|
||||||
|
f"*{agent_name} needs human assistance*\n"
|
||||||
|
f"Reason: {trigger_reason}\n"
|
||||||
|
f"Tenant: {tenant_id}\n"
|
||||||
|
f"\nConversation transcript:\n{transcript}\n"
|
||||||
|
f"\nThe agent will stay in the thread. You can reply directly to the user."
|
||||||
|
)
|
||||||
|
|
||||||
|
dm_channel_id: str = ""
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
|
||||||
|
# Step 1: Open DM channel with the assigned human
|
||||||
|
open_resp = await client.post(
|
||||||
|
"https://slack.com/api/conversations.open",
|
||||||
|
headers={"Authorization": f"Bearer {bot_token}"},
|
||||||
|
json={"users": assignee_slack_user_id},
|
||||||
|
)
|
||||||
|
open_data = open_resp.json()
|
||||||
|
if open_data.get("ok"):
|
||||||
|
dm_channel_id = open_data.get("channel", {}).get("id", "")
|
||||||
|
else:
|
||||||
|
logger.error(
|
||||||
|
"conversations.open failed for assignee=%s: %r",
|
||||||
|
assignee_slack_user_id,
|
||||||
|
open_data.get("error"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 2: Post transcript to DM
|
||||||
|
if dm_channel_id:
|
||||||
|
post_resp = await client.post(
|
||||||
|
"https://slack.com/api/chat.postMessage",
|
||||||
|
headers={"Authorization": f"Bearer {bot_token}"},
|
||||||
|
json={
|
||||||
|
"channel": dm_channel_id,
|
||||||
|
"text": dm_text,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
post_data = post_resp.json()
|
||||||
|
if not post_data.get("ok"):
|
||||||
|
logger.error(
|
||||||
|
"chat.postMessage to DM failed: channel=%s error=%r",
|
||||||
|
dm_channel_id,
|
||||||
|
post_data.get("error"),
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"Escalation Slack API call failed for tenant=%s thread=%s",
|
||||||
|
tenant_id,
|
||||||
|
thread_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 3: Set escalation status in Redis (no TTL — cleared by human after resolution)
|
||||||
|
esc_key = escalation_status_key(tenant_id, thread_id)
|
||||||
|
await redis.set(esc_key, "escalated")
|
||||||
|
|
||||||
|
# Step 4: Log audit event
|
||||||
|
try:
|
||||||
|
await audit_logger.log_escalation(
|
||||||
|
tenant_id=tenant_id,
|
||||||
|
agent_id=agent_id,
|
||||||
|
user_id=user_id,
|
||||||
|
trigger_reason=trigger_reason,
|
||||||
|
metadata={
|
||||||
|
"thread_id": thread_id,
|
||||||
|
"assignee_slack_user_id": assignee_slack_user_id,
|
||||||
|
"dm_channel_id": dm_channel_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"Failed to log escalation audit event for tenant=%s thread=%s",
|
||||||
|
tenant_id,
|
||||||
|
thread_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 5: Return user-facing confirmation message
|
||||||
|
return (
|
||||||
|
"I've brought in a team member to help with this. They'll be with you shortly."
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user