feat(02-04): wire escalation into orchestrator pipeline

- Add escalation pre-check in _process_message: assistant mode for escalated threads
- Add escalation post-check after LLM response: calls escalate_to_human on rule match
- Load Slack bot token unconditionally (needed for escalation DM, not just placeholders)
- Add keyword-based conversation metadata detector (billing keywords, attempt counter)
- Add no-op audit logger stub (replaced by real AuditLogger from Plan 02 when available)
- Add escalation_assignee and natural_language_escalation fields to Agent model
- Add Alembic migration 003 for new Agent columns
This commit is contained in:
2026-03-23 14:53:45 -06:00
parent 420294b8fe
commit a025cadc44
3 changed files with 250 additions and 2 deletions

View File

@@ -0,0 +1,56 @@
"""Phase 2 Plan 04: Add escalation_assignee and natural_language_escalation to agents
Revision ID: 003
Revises: 002
Create Date: 2026-03-23
These columns support the human escalation/handoff system:
- escalation_assignee: Slack user ID of the human to DM when escalation triggers
- natural_language_escalation: tenant-level flag enabling NL phrase detection
Both are nullable/defaulted so existing agent rows are unaffected.
"""
from __future__ import annotations
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "003"
down_revision: Union[str, None] = "002"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add the two escalation-related columns to the ``agents`` table.

    Both columns are safe for existing rows: one is nullable, the other
    carries a server-side default so the NOT NULL constraint backfills
    automatically.
    """
    new_columns = (
        # Nullable TEXT: Slack user ID of the human who receives the
        # escalation DM.
        sa.Column(
            "escalation_assignee",
            sa.Text,
            nullable=True,
            comment="Slack user ID of the human to DM on escalation (e.g. U0HUMANID)",
        ),
        # NOT NULL boolean, defaulted to FALSE on the server so existing
        # agent rows need no explicit backfill.
        sa.Column(
            "natural_language_escalation",
            sa.Boolean,
            nullable=False,
            server_default=sa.text("FALSE"),
            comment="Whether natural language escalation phrases trigger handoff",
        ),
    )
    for column in new_columns:
        op.add_column("agents", column)
def downgrade() -> None:
    """Drop the escalation columns in reverse order of their creation."""
    for column_name in ("natural_language_escalation", "escalation_assignee"):
        op.drop_column("agents", column_name)

View File

@@ -22,6 +22,16 @@ Memory pipeline (Phase 2):
The embed_and_store Celery task runs asynchronously, meaning the LLM response The embed_and_store Celery task runs asynchronously, meaning the LLM response
is never blocked waiting for embedding computation. is never blocked waiting for embedding computation.
Escalation pipeline (Phase 2 Plan 04):
At message start (before LLM call):
6. Check Redis escalation_status_key for this thread
- If escalated and sender is end user: return assistant-mode reply (skip LLM)
- If escalated and sender is human assignee: process normally (human may ask agent for info)
After LLM response:
7. check_escalation_rules() — evaluate configured rules + NL trigger
8. If rule matches: call escalate_to_human() and replace LLM response with handoff message
""" """
from __future__ import annotations from __future__ import annotations
@@ -188,6 +198,15 @@ async def _process_message(
4. Append user message + assistant response to Redis sliding window 4. Append user message + assistant response to Redis sliding window
5. Dispatch embed_and_store.delay() for async pgvector backfill 5. Dispatch embed_and_store.delay() for async pgvector backfill
Escalation pipeline (Phase 2 Plan 04):
BEFORE LLM call:
6. Check Redis escalation status for this thread
- Escalated + end user message → skip LLM, return "team member is handling this"
- Escalated + human assignee message → process normally (human may query agent)
AFTER LLM response:
7. Evaluate escalation rules (configured + NL trigger)
8. If rule matches → call escalate_to_human, replace response with handoff message
After getting the LLM response, if Slack placeholder metadata is present, After getting the LLM response, if Slack placeholder metadata is present,
updates the "Thinking..." placeholder message with the real response using updates the "Thinking..." placeholder message with the real response using
Slack's chat.update API. Slack's chat.update API.
@@ -243,8 +262,10 @@ async def _process_message(
result = await session.execute(stmt) result = await session.execute(stmt)
agent = result.scalars().first() agent = result.scalars().first()
# Load the bot token for this tenant from channel_connections config # Load the Slack bot token for this tenant from channel_connections config.
if agent is not None and placeholder_ts and channel_id: # Loaded unconditionally (not just when placeholder_ts is set) because
# escalation DM delivery also requires the bot token.
if agent is not None:
from shared.models.tenant import ChannelConnection, ChannelTypeEnum from shared.models.tenant import ChannelConnection, ChannelTypeEnum
conn_stmt = ( conn_stmt = (
@@ -290,6 +311,7 @@ async def _process_message(
) )
agent_id_str = str(agent.id) agent_id_str = str(agent.id)
user_text: str = msg.content.text or "" user_text: str = msg.content.text or ""
thread_id: str = msg.thread_id or msg.id
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Memory retrieval (before LLM call) # Memory retrieval (before LLM call)
@@ -305,6 +327,47 @@ async def _process_message(
redis_client, msg.tenant_id, agent_id_str, user_id redis_client, msg.tenant_id, agent_id_str, user_id
) )
# -------------------------------------------------------------------------
# Escalation pre-check (BEFORE LLM call)
# If this thread is already escalated, enter assistant mode and skip LLM.
# -------------------------------------------------------------------------
from shared.redis_keys import escalation_status_key
esc_key = escalation_status_key(msg.tenant_id, thread_id)
esc_status = await redis_client.get(esc_key)
if esc_status is not None:
# Thread is escalated — check if sender is the assigned human or end user
assignee_id: str = getattr(agent, "escalation_assignee", "") or ""
sender_id: str = msg.sender.user_id if msg.sender else ""
if assignee_id and sender_id == assignee_id:
# Human assignee is messaging — process normally so they can ask the agent
logger.info(
"Escalated thread %s: assignee %s messaging — processing normally",
thread_id,
assignee_id,
)
else:
# End user messaging an escalated thread — defer to human, skip LLM
assistant_mode_reply = "A team member is looking into this. They'll respond shortly."
logger.info(
"Escalated thread %s: end user message — returning assistant-mode reply",
thread_id,
)
if placeholder_ts and channel_id:
await _update_slack_placeholder(
bot_token=slack_bot_token,
channel_id=channel_id,
placeholder_ts=placeholder_ts,
text=assistant_mode_reply,
)
return {
"message_id": msg.id,
"response": assistant_mode_reply,
"tenant_id": msg.tenant_id,
}
# 2. Long-term: pgvector similarity search # 2. Long-term: pgvector similarity search
relevant_context: list[str] = [] relevant_context: list[str] = []
if user_text: if user_text:
@@ -324,6 +387,12 @@ async def _process_message(
finally: finally:
await redis_client.aclose() await redis_client.aclose()
# -------------------------------------------------------------------------
# Conversation metadata detection (keyword-based, v1)
# Used by rule-based escalation conditions like "billing_dispute AND attempts > 2"
# -------------------------------------------------------------------------
conversation_metadata = _detect_conversation_metadata(user_text, recent_messages)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Build memory-enriched messages array and run LLM # Build memory-enriched messages array and run LLM
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
@@ -345,6 +414,62 @@ async def _process_message(
len(relevant_context), len(relevant_context),
) )
# -------------------------------------------------------------------------
# Escalation post-check (AFTER LLM response)
# -------------------------------------------------------------------------
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
natural_lang_enabled: bool = getattr(agent, "natural_language_escalation", False) or False
matched_rule = check_escalation_rules(
agent,
user_text,
conversation_metadata,
natural_lang_enabled=natural_lang_enabled,
)
if matched_rule is not None:
trigger_reason = matched_rule.get("condition", "escalation rule triggered")
assignee_id = getattr(agent, "escalation_assignee", "") or ""
if assignee_id and slack_bot_token:
# Full escalation: DM the human, set Redis flag, log audit
audit_logger = _get_no_op_audit_logger()
redis_esc = aioredis.from_url(settings.redis_url)
try:
response_text = await escalate_to_human(
tenant_id=msg.tenant_id,
agent=agent,
thread_id=thread_id,
trigger_reason=trigger_reason,
recent_messages=recent_messages,
assignee_slack_user_id=assignee_id,
bot_token=slack_bot_token,
redis=redis_esc,
audit_logger=audit_logger,
user_id=user_id,
agent_id=agent_id_str,
)
finally:
await redis_esc.aclose()
logger.info(
"Escalation triggered for tenant=%s agent=%s thread=%s reason=%r",
msg.tenant_id,
agent.id,
thread_id,
trigger_reason,
)
else:
# Escalation configured but missing assignee/token — log and continue
logger.warning(
"Escalation rule matched but escalation_assignee or bot_token missing "
"for tenant=%s agent=%s — cannot DM human",
msg.tenant_id,
agent.id,
)
response_text = "I've flagged this for a team member to review. They'll follow up with you soon."
# Replace the "Thinking..." placeholder with the real response # Replace the "Thinking..." placeholder with the real response
if placeholder_ts and channel_id: if placeholder_ts and channel_id:
await _update_slack_placeholder( await _update_slack_placeholder(
@@ -379,6 +504,62 @@ async def _process_message(
} }
def _detect_conversation_metadata(
current_text: str,
recent_messages: list[dict[str, str]],
) -> dict[str, object]:
"""
Keyword-based conversation metadata detector (v1 implementation).
Scans the current message and recent conversation history for keywords
that map to escalation rule conditions. This is a simple v1 approach —
the LLM could populate this more accurately via structured output in v2.
Returns a dict with detected boolean flags and integer counters that
escalation rules can reference (e.g. {"billing_dispute": True, "attempts": 3}).
Args:
current_text: The current user message text.
recent_messages: Recent conversation history (role/content dicts).
Returns:
Dict of detected metadata fields.
"""
metadata: dict[str, object] = {}
# Combine all text for keyword scanning
all_texts = [current_text] + [m.get("content", "") for m in recent_messages]
combined = " ".join(all_texts).lower()
# Billing dispute detection
billing_keywords = ("billing", "charge", "invoice", "refund", "payment", "overcharged", "subscription")
if any(kw in combined for kw in billing_keywords):
metadata["billing_dispute"] = True
# Attempt counter: count user messages in recent history as a proxy for attempts
user_turn_count = sum(1 for m in recent_messages if m.get("role") == "user")
# +1 for the current message
metadata["attempts"] = user_turn_count + 1
return metadata
def _get_no_op_audit_logger() -> object:
"""
Return a no-op audit logger for use when the real AuditLogger is not available.
This allows the escalation system to function even if Plan 02 (audit) has
not been implemented yet. Replace this with the real AuditLogger when available.
"""
import asyncio
class _NoOpAuditLogger:
async def log_escalation(self, **kwargs: object) -> None:
logger.info("AUDIT [no-op] escalation: %s", kwargs)
return _NoOpAuditLogger()
async def _update_slack_placeholder( async def _update_slack_placeholder(
bot_token: str, bot_token: str,
channel_id: str, channel_id: str,

View File

@@ -113,6 +113,17 @@ class Agent(Base):
) )
tool_assignments: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list) tool_assignments: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list)
escalation_rules: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list) escalation_rules: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list)
escalation_assignee: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="Slack user ID of the human to DM on escalation (e.g. U0HUMANID)",
)
natural_language_escalation: Mapped[bool] = mapped_column(
Boolean,
nullable=False,
default=False,
comment="Whether natural language escalation phrases trigger handoff",
)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), DateTime(timezone=True),