From a025cadc44a12dca71a60f792aa9bcb09a944db7 Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Mon, 23 Mar 2026 14:53:45 -0600 Subject: [PATCH] feat(02-04): wire escalation into orchestrator pipeline - Add escalation pre-check in _process_message: assistant mode for escalated threads - Add escalation post-check after LLM response: calls escalate_to_human on rule match - Load Slack bot token unconditionally (needed for escalation DM, not just placeholders) - Add keyword-based conversation metadata detector (billing keywords, attempt counter) - Add no-op audit logger stub (replaced by real AuditLogger from Plan 02 when available) - Add escalation_assignee and natural_language_escalation fields to Agent model - Add Alembic migration 003 for new Agent columns --- migrations/versions/003_escalation_fields.py | 56 ++++++ packages/orchestrator/orchestrator/tasks.py | 185 ++++++++++++++++++- packages/shared/shared/models/tenant.py | 11 ++ 3 files changed, 250 insertions(+), 2 deletions(-) create mode 100644 migrations/versions/003_escalation_fields.py diff --git a/migrations/versions/003_escalation_fields.py b/migrations/versions/003_escalation_fields.py new file mode 100644 index 0000000..9d4bb03 --- /dev/null +++ b/migrations/versions/003_escalation_fields.py @@ -0,0 +1,56 @@ +"""Phase 2 Plan 04: Add escalation_assignee and natural_language_escalation to agents + +Revision ID: 003 +Revises: 002 +Create Date: 2026-03-23 + +These columns support the human escalation/handoff system: +- escalation_assignee: Slack user ID of the human to DM when escalation triggers +- natural_language_escalation: tenant-level flag enabling NL phrase detection + +Both are nullable/defaulted so existing agent rows are unaffected. +""" + +from __future__ import annotations + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + + +# revision identifiers, used by Alembic. +revision: str = "003" +down_revision: Union[str, None] = "002" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Add escalation_assignee column — nullable TEXT + op.add_column( + "agents", + sa.Column( + "escalation_assignee", + sa.Text, + nullable=True, + comment="Slack user ID of the human to DM on escalation (e.g. U0HUMANID)", + ), + ) + + # Add natural_language_escalation column — NOT NULL with default False + op.add_column( + "agents", + sa.Column( + "natural_language_escalation", + sa.Boolean, + nullable=False, + server_default=sa.text("FALSE"), + comment="Whether natural language escalation phrases trigger handoff", + ), + ) + + +def downgrade() -> None: + op.drop_column("agents", "natural_language_escalation") + op.drop_column("agents", "escalation_assignee") diff --git a/packages/orchestrator/orchestrator/tasks.py b/packages/orchestrator/orchestrator/tasks.py index a933482..c71e6cf 100644 --- a/packages/orchestrator/orchestrator/tasks.py +++ b/packages/orchestrator/orchestrator/tasks.py @@ -22,6 +22,16 @@ Memory pipeline (Phase 2): The embed_and_store Celery task runs asynchronously, meaning the LLM response is never blocked waiting for embedding computation. + +Escalation pipeline (Phase 2 Plan 04): + At message start (before LLM call): + 6. Check Redis escalation_status_key for this thread + - If escalated and sender is end user: return assistant-mode reply (skip LLM) + - If escalated and sender is human assignee: process normally (human may ask agent for info) + + After LLM response: + 7. check_escalation_rules() — evaluate configured rules + NL trigger + 8. If rule matches: call escalate_to_human() and replace LLM response with handoff message """ from __future__ import annotations @@ -188,6 +198,15 @@ async def _process_message( 4. Append user message + assistant response to Redis sliding window 5. Dispatch embed_and_store.delay() for async pgvector backfill + Escalation pipeline (Phase 2 Plan 04): + BEFORE LLM call: + 6. Check Redis escalation status for this thread + - Escalated + end user message → skip LLM, return "team member is handling this" + - Escalated + human assignee message → process normally (human may query agent) + AFTER LLM response: + 7. Evaluate escalation rules (configured + NL trigger) + 8. If rule matches → call escalate_to_human, replace response with handoff message + After getting the LLM response, if Slack placeholder metadata is present, updates the "Thinking..." placeholder message with the real response using Slack's chat.update API. @@ -243,8 +262,10 @@ async def _process_message( result = await session.execute(stmt) agent = result.scalars().first() - # Load the bot token for this tenant from channel_connections config - if agent is not None and placeholder_ts and channel_id: + # Load the Slack bot token for this tenant from channel_connections config. + # Loaded unconditionally (not just when placeholder_ts is set) because + # escalation DM delivery also requires the bot token. + if agent is not None: from shared.models.tenant import ChannelConnection, ChannelTypeEnum conn_stmt = ( @@ -290,6 +311,7 @@ async def _process_message( ) agent_id_str = str(agent.id) user_text: str = msg.content.text or "" + thread_id: str = msg.thread_id or msg.id # ------------------------------------------------------------------------- # Memory retrieval (before LLM call) @@ -305,6 +327,47 @@ async def _process_message( redis_client, msg.tenant_id, agent_id_str, user_id ) + # ------------------------------------------------------------------------- + # Escalation pre-check (BEFORE LLM call) + # If this thread is already escalated, enter assistant mode and skip LLM. + # ------------------------------------------------------------------------- + from shared.redis_keys import escalation_status_key + + esc_key = escalation_status_key(msg.tenant_id, thread_id) + esc_status = await redis_client.get(esc_key) + + if esc_status is not None: + # Thread is escalated — check if sender is the assigned human or end user + assignee_id: str = getattr(agent, "escalation_assignee", "") or "" + sender_id: str = msg.sender.user_id if msg.sender else "" + + if assignee_id and sender_id == assignee_id: + # Human assignee is messaging — process normally so they can ask the agent + logger.info( + "Escalated thread %s: assignee %s messaging — processing normally", + thread_id, + assignee_id, + ) + else: + # End user messaging an escalated thread — defer to human, skip LLM + assistant_mode_reply = "A team member is looking into this. They'll respond shortly." + logger.info( + "Escalated thread %s: end user message — returning assistant-mode reply", + thread_id, + ) + if placeholder_ts and channel_id: + await _update_slack_placeholder( + bot_token=slack_bot_token, + channel_id=channel_id, + placeholder_ts=placeholder_ts, + text=assistant_mode_reply, + ) + return { + "message_id": msg.id, + "response": assistant_mode_reply, + "tenant_id": msg.tenant_id, + } + # 2. Long-term: pgvector similarity search relevant_context: list[str] = [] if user_text: @@ -324,6 +387,12 @@ async def _process_message( finally: await redis_client.aclose() + # ------------------------------------------------------------------------- + # Conversation metadata detection (keyword-based, v1) + # Used by rule-based escalation conditions like "billing_dispute AND attempts > 2" + # ------------------------------------------------------------------------- + conversation_metadata = _detect_conversation_metadata(user_text, recent_messages) + # ------------------------------------------------------------------------- # Build memory-enriched messages array and run LLM # ------------------------------------------------------------------------- @@ -345,6 +414,62 @@ async def _process_message( len(relevant_context), ) + # ------------------------------------------------------------------------- + # Escalation post-check (AFTER LLM response) + # ------------------------------------------------------------------------- + from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human + + natural_lang_enabled: bool = getattr(agent, "natural_language_escalation", False) or False + matched_rule = check_escalation_rules( + agent, + user_text, + conversation_metadata, + natural_lang_enabled=natural_lang_enabled, + ) + + if matched_rule is not None: + trigger_reason = matched_rule.get("condition", "escalation rule triggered") + assignee_id = getattr(agent, "escalation_assignee", "") or "" + + if assignee_id and slack_bot_token: + # Full escalation: DM the human, set Redis flag, log audit + audit_logger = _get_no_op_audit_logger() + + redis_esc = aioredis.from_url(settings.redis_url) + try: + response_text = await escalate_to_human( + tenant_id=msg.tenant_id, + agent=agent, + thread_id=thread_id, + trigger_reason=trigger_reason, + recent_messages=recent_messages, + assignee_slack_user_id=assignee_id, + bot_token=slack_bot_token, + redis=redis_esc, + audit_logger=audit_logger, + user_id=user_id, + agent_id=agent_id_str, + ) + finally: + await redis_esc.aclose() + + logger.info( + "Escalation triggered for tenant=%s agent=%s thread=%s reason=%r", + msg.tenant_id, + agent.id, + thread_id, + trigger_reason, + ) + else: + # Escalation configured but missing assignee/token — log and continue + logger.warning( + "Escalation rule matched but escalation_assignee or bot_token missing " + "for tenant=%s agent=%s — cannot DM human", + msg.tenant_id, + agent.id, + ) + response_text = "I've flagged this for a team member to review. They'll follow up with you soon." + # Replace the "Thinking..." placeholder with the real response if placeholder_ts and channel_id: await _update_slack_placeholder( @@ -379,6 +504,62 @@ async def _process_message( } +def _detect_conversation_metadata( + current_text: str, + recent_messages: list[dict[str, str]], +) -> dict[str, object]: + """ + Keyword-based conversation metadata detector (v1 implementation). + + Scans the current message and recent conversation history for keywords + that map to escalation rule conditions. This is a simple v1 approach — + the LLM could populate this more accurately via structured output in v2. + + Returns a dict with detected boolean flags and integer counters that + escalation rules can reference (e.g. {"billing_dispute": True, "attempts": 3}). + + Args: + current_text: The current user message text. + recent_messages: Recent conversation history (role/content dicts). + + Returns: + Dict of detected metadata fields. + """ + metadata: dict[str, object] = {} + + # Combine all text for keyword scanning + all_texts = [current_text] + [m.get("content", "") for m in recent_messages] + combined = " ".join(all_texts).lower() + + # Billing dispute detection + billing_keywords = ("billing", "charge", "invoice", "refund", "payment", "overcharged", "subscription") + if any(kw in combined for kw in billing_keywords): + metadata["billing_dispute"] = True + + # Attempt counter: count user messages in recent history as a proxy for attempts + user_turn_count = sum(1 for m in recent_messages if m.get("role") == "user") + # +1 for the current message + metadata["attempts"] = user_turn_count + 1 + + return metadata + + +def _get_no_op_audit_logger() -> object: + """ + Return a no-op audit logger for use when the real AuditLogger is not available. + + This allows the escalation system to function even if Plan 02 (audit) has + not been implemented yet. Replace this with the real AuditLogger when available. + """ + import asyncio + + class _NoOpAuditLogger: + async def log_escalation(self, **kwargs: object) -> None: + logger.info("AUDIT [no-op] escalation: %s", kwargs) + + return _NoOpAuditLogger() + + async def _update_slack_placeholder( bot_token: str, channel_id: str, diff --git a/packages/shared/shared/models/tenant.py b/packages/shared/shared/models/tenant.py index d597ccf..322481f 100644 --- a/packages/shared/shared/models/tenant.py +++ b/packages/shared/shared/models/tenant.py @@ -113,6 +113,17 @@ class Agent(Base): ) tool_assignments: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list) escalation_rules: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list) + escalation_assignee: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="Slack user ID of the human to DM on escalation (e.g. U0HUMANID)", + ) + natural_language_escalation: Mapped[bool] = mapped_column( + Boolean, + nullable=False, + default=False, + comment="Whether natural language escalation phrases trigger handoff", + ) is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True),