From 4047b552a7fbc9293d48878ff6bed9912c2c2b78 Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Mon, 23 Mar 2026 14:50:56 -0600 Subject: [PATCH] feat(02-04): implement escalation handler (rule evaluator, transcript, DM delivery) - check_escalation_rules: condition parser for 'keyword AND count > N' and NL phrases - build_transcript: formats messages as Slack mrkdwn, truncates at 3000 chars - escalate_to_human: opens DM, posts transcript, sets Redis key, logs audit event --- .../orchestrator/escalation/__init__.py | 8 + .../orchestrator/escalation/handler.py | 342 ++++++++++++++++++ 2 files changed, 350 insertions(+) create mode 100644 packages/orchestrator/orchestrator/escalation/__init__.py create mode 100644 packages/orchestrator/orchestrator/escalation/handler.py diff --git a/packages/orchestrator/orchestrator/escalation/__init__.py b/packages/orchestrator/orchestrator/escalation/__init__.py new file mode 100644 index 0000000..791b091 --- /dev/null +++ b/packages/orchestrator/orchestrator/escalation/__init__.py @@ -0,0 +1,8 @@ +""" +Escalation package for the Konstruct Agent Orchestrator. + +Provides rule-based and natural language escalation logic: +- check_escalation_rules: evaluates configured rules and NL triggers +- build_transcript: formats conversation history for DM delivery +- escalate_to_human: full handoff pipeline (Slack DM + Redis + audit) +""" diff --git a/packages/orchestrator/orchestrator/escalation/handler.py b/packages/orchestrator/orchestrator/escalation/handler.py new file mode 100644 index 0000000..239b7b5 --- /dev/null +++ b/packages/orchestrator/orchestrator/escalation/handler.py @@ -0,0 +1,342 @@ +""" +Human escalation/handoff handler for the Konstruct Agent Orchestrator. + +This module implements the "employee asks for manager" escalation pattern: +when a configured rule triggers or the user explicitly requests a human, +the agent hands off gracefully by: + 1. Evaluating escalation rules against conversation metadata / message text + 2. Packaging the conversation transcript in Slack mrkdwn format + 3. Opening a DM with the assigned human and posting the transcript + 4. Setting an escalation status flag in Redis so future messages route correctly + 5. Logging the event to the audit trail + +PUBLIC INTERFACE: + check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled) -> dict | None + build_transcript(recent_messages) -> str + escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages, + assignee_slack_user_id, bot_token, redis, audit_logger) -> str + +CONDITION LANGUAGE (v1 — simple keyword parser): + - "keyword AND count_field > N" + e.g. "billing_dispute AND attempts > 2" + Checks that conversation_metadata[keyword] is truthy AND + conversation_metadata[count_field] > N (integer comparison). + - "natural_language_escalation" + Triggers when message_text contains a common escalation phrase + AND natural_lang_enabled is True. +""" + +from __future__ import annotations + +import logging +import re +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + pass + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Natural language escalation phrases (case-insensitive substring match) +# --------------------------------------------------------------------------- +_ESCALATION_PHRASES: tuple[str, ...] = ( + "talk to a human", + "speak to someone", + "get a person", + "human agent", + "real person", + "manager", +) + +# Maximum Slack message block size (characters) +_TRANSCRIPT_MAX_CHARS = 3000 + + +# --------------------------------------------------------------------------- +# check_escalation_rules +# --------------------------------------------------------------------------- + +def check_escalation_rules( + agent: Any, + message_text: str, + conversation_metadata: dict[str, Any], + natural_lang_enabled: bool = False, +) -> dict[str, Any] | None: + """ + Evaluate the agent's configured escalation rules against the current message. + + Iterates agent.escalation_rules in order and returns the first matching rule. + Returns None when no rule matches. + + Condition types: + - "keyword AND count_field > N": boolean flag AND integer threshold check + - "natural_language_escalation": user is explicitly asking for a human + + Args: + agent: Agent ORM/mock object with escalation_rules list and name. + message_text: The user's current message text. + conversation_metadata: Dict of runtime metadata (e.g. {"billing_dispute": True, "attempts": 3}). + natural_lang_enabled: Whether the tenant has natural language escalation enabled. + + Returns: + The first matching rule dict (e.g. {"condition": "...", "action": "handoff_human"}), + or None if no rule matches. + """ + rules: list[dict[str, Any]] = getattr(agent, "escalation_rules", []) or [] + + for rule in rules: + condition: str = rule.get("condition", "") + + if condition == "natural_language_escalation": + if _matches_natural_language(message_text, natural_lang_enabled): + logger.debug("Natural language escalation triggered: %r", message_text[:80]) + return rule + + elif _matches_condition(condition, conversation_metadata): + logger.debug("Escalation rule matched: %r metadata=%s", condition, conversation_metadata) + return rule + + return None + + +def _matches_natural_language(message_text: str, enabled: bool) -> bool: + """Return True when the message contains an escalation phrase and NL is enabled.""" + if not enabled: + return False + normalized = message_text.lower() + return any(phrase in normalized for phrase in _ESCALATION_PHRASES) + + +def _matches_condition(condition: str, metadata: dict[str, Any]) -> bool: + """ + Simple condition evaluator for "keyword AND count_field > N" format. + + Supports only the AND combinator. Left operand is a boolean flag key, + right operand is an integer comparison (>, >=, <, <=, ==, !=). + + Returns False on any parsing error or missing metadata key. + """ + # Strip whitespace + condition = condition.strip() + + # Only handle "X AND Y op Z" format + and_match = re.match( + r"^(?P\w+)\s+AND\s+(?P\w+)\s*(?P[>\d+)$", + condition, + re.IGNORECASE, + ) + if not and_match: + # Unknown condition format — do not match + return False + + flag_key = and_match.group("flag") + count_field = and_match.group("count_field") + op = and_match.group("op") + threshold = int(and_match.group("value")) + + # Check boolean flag + flag_value = metadata.get(flag_key) + if not flag_value: + return False + + # Check integer comparison + count_value = metadata.get(count_field) + if count_value is None: + return False + + try: + count_int = int(count_value) + except (TypeError, ValueError): + return False + + return _compare(count_int, op, threshold) + + +def _compare(actual: int, op: str, threshold: int) -> bool: + """Evaluate an integer comparison operator.""" + if op == ">": + return actual > threshold + if op == ">=": + return actual >= threshold + if op == "<": + return actual < threshold + if op == "<=": + return actual <= threshold + if op in ("==", "="): + return actual == threshold + if op == "!=": + return actual != threshold + return False + + +# --------------------------------------------------------------------------- +# build_transcript +# --------------------------------------------------------------------------- + +def build_transcript(recent_messages: list[dict[str, str]]) -> str: + """ + Format conversation history as Slack mrkdwn for DM delivery. + + Each message is formatted as "*Role:* content" and joined with newlines. + Output is truncated to _TRANSCRIPT_MAX_CHARS if necessary, with a + truncation indicator appended. + + Args: + recent_messages: List of {"role": ..., "content": ...} dicts, oldest first. + + Returns: + Formatted transcript string, at most 3000 characters. + """ + if not recent_messages: + return "" + + lines: list[str] = [] + for msg in recent_messages: + role = msg.get("role", "unknown").capitalize() + content = msg.get("content", "") + lines.append(f"*{role}:* {content}") + + transcript = "\n".join(lines) + + if len(transcript) > _TRANSCRIPT_MAX_CHARS: + # Truncate and indicate how many chars were cut + truncation_suffix = "...[truncated]" + transcript = transcript[: _TRANSCRIPT_MAX_CHARS - len(truncation_suffix)] + truncation_suffix + + return transcript + + +# --------------------------------------------------------------------------- +# escalate_to_human +# --------------------------------------------------------------------------- + +async def escalate_to_human( + tenant_id: str, + agent: Any, + thread_id: str, + trigger_reason: str, + recent_messages: list[dict[str, str]], + assignee_slack_user_id: str, + bot_token: str, + redis: Any, + audit_logger: Any, + user_id: str = "", + agent_id: str = "", +) -> str: + """ + Perform a full human escalation handoff. + + Steps: + 1. Build the conversation transcript + 2. Compose the DM message (employee metaphor) + 3. Open a Slack DM with the assigned human via conversations.open + 4. Post the transcript to the DM via chat.postMessage + 5. Set the escalation status key in Redis (no TTL — stays until resolved) + 6. Log the escalation event to the audit trail + 7. Return the user-facing message to replace the LLM response + + Args: + tenant_id: Konstruct tenant identifier. + agent: Agent with .name and .escalation_assignee attributes. + thread_id: Thread identifier for the escalated conversation. + trigger_reason: Human-readable description of what triggered escalation. + recent_messages: Conversation history to include in DM. + assignee_slack_user_id: Slack user ID of the human to DM. + bot_token: Bot token (xoxb-...) for Slack API calls. + redis: Redis async client for escalation status storage. + audit_logger: AuditLogger instance (or compatible no-op). + user_id: End-user identifier (optional, for audit logging). + agent_id: Agent identifier string (optional, for audit logging). + + Returns: + User-facing message confirming escalation (replaces LLM response). + """ + import httpx + + from shared.redis_keys import escalation_status_key + + agent_name: str = getattr(agent, "name", "Your assistant") + transcript = build_transcript(recent_messages) + + dm_text = ( + f"*{agent_name} needs human assistance*\n" + f"Reason: {trigger_reason}\n" + f"Tenant: {tenant_id}\n" + f"\nConversation transcript:\n{transcript}\n" + f"\nThe agent will stay in the thread. You can reply directly to the user." + ) + + dm_channel_id: str = "" + + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + # Step 1: Open DM channel with the assigned human + open_resp = await client.post( + "https://slack.com/api/conversations.open", + headers={"Authorization": f"Bearer {bot_token}"}, + json={"users": assignee_slack_user_id}, + ) + open_data = open_resp.json() + if open_data.get("ok"): + dm_channel_id = open_data.get("channel", {}).get("id", "") + else: + logger.error( + "conversations.open failed for assignee=%s: %r", + assignee_slack_user_id, + open_data.get("error"), + ) + + # Step 2: Post transcript to DM + if dm_channel_id: + post_resp = await client.post( + "https://slack.com/api/chat.postMessage", + headers={"Authorization": f"Bearer {bot_token}"}, + json={ + "channel": dm_channel_id, + "text": dm_text, + }, + ) + post_data = post_resp.json() + if not post_data.get("ok"): + logger.error( + "chat.postMessage to DM failed: channel=%s error=%r", + dm_channel_id, + post_data.get("error"), + ) + + except Exception: + logger.exception( + "Escalation Slack API call failed for tenant=%s thread=%s", + tenant_id, + thread_id, + ) + + # Step 3: Set escalation status in Redis (no TTL — cleared by human after resolution) + esc_key = escalation_status_key(tenant_id, thread_id) + await redis.set(esc_key, "escalated") + + # Step 4: Log audit event + try: + await audit_logger.log_escalation( + tenant_id=tenant_id, + agent_id=agent_id, + user_id=user_id, + trigger_reason=trigger_reason, + metadata={ + "thread_id": thread_id, + "assignee_slack_user_id": assignee_slack_user_id, + "dm_channel_id": dm_channel_id, + }, + ) + except Exception: + logger.exception( + "Failed to log escalation audit event for tenant=%s thread=%s", + tenant_id, + thread_id, + ) + + # Step 5: Return user-facing confirmation message + return ( + "I've brought in a team member to help with this. They'll be with you shortly." + )