---
phase: 02-agent-features
plan: "04"
type: execute
wave: 2
depends_on: ["02-01"]
files_modified:
  - packages/orchestrator/orchestrator/escalation/__init__.py
  - packages/orchestrator/orchestrator/escalation/handler.py
  - packages/shared/shared/redis_keys.py
  - packages/orchestrator/orchestrator/tasks.py
  - tests/unit/test_escalation.py
  - tests/integration/test_escalation.py
autonomous: true
requirements: [AGNT-05]
must_haves:
  truths:
    - When a configured escalation rule triggers, the conversation is handed off to a human
    - The human receives a DM with the full conversation transcript and escalation reason
    - The agent stays in the thread as assistant after escalation — defers to human for end-user responses
    - Natural language escalation ("can I talk to a human?") works when enabled per tenant
    - Escalation events are logged in the audit trail
  artifacts:
    - path: packages/orchestrator/orchestrator/escalation/handler.py
      provides: Escalation rule evaluation, transcript packaging, DM delivery
      exports: [check_escalation_rules, escalate_to_human]
    - path: tests/unit/test_escalation.py
      provides: Unit tests for rule evaluation and transcript packaging
    - path: tests/integration/test_escalation.py
      provides: Integration tests for escalation DM delivery
  key_links:
    - from: packages/orchestrator/orchestrator/tasks.py
      to: orchestrator/escalation/handler.py
      via: check_escalation_rules called after LLM response in handle_message
      pattern: "check_escalation_rules|escalate_to_human"
    - from: packages/orchestrator/orchestrator/escalation/handler.py
      to: Slack API conversations.open + chat.postMessage
      via: httpx POST for DM delivery to assigned human
      pattern: "conversations.open|chat.postMessage"
    - from: packages/orchestrator/orchestrator/escalation/handler.py
      to: orchestrator/audit/logger.py
      via: log_escalation on every handoff
      pattern: "audit_logger.log_escalation"
---
Build the human escalation/handoff system: rule-based trigger evaluation, full conversation transcript packaging, DM delivery to assigned human, and post-escalation assistant mode where the agent defers to the human.

Purpose: Ensures the AI employee knows its limits and gracefully hands off to a human when configured rules trigger or the user explicitly requests it — maintaining the "employee" metaphor ("let me get my manager"). Output: Escalation handler, updated orchestrator pipeline, passing tests.

<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/02-agent-features/02-CONTEXT.md @.planning/phases/02-agent-features/02-RESEARCH.md @.planning/phases/02-agent-features/02-01-SUMMARY.md

@packages/orchestrator/orchestrator/tasks.py @packages/orchestrator/orchestrator/agents/runner.py @packages/shared/shared/models/tenant.py @packages/shared/shared/redis_keys.py

From packages/orchestrator/orchestrator/memory/short_term.py:

  • async get_recent_messages(redis, tenant_id, agent_id, user_id, n=20) -> list[dict]

From packages/orchestrator/orchestrator/audit/logger.py:

  • AuditLogger.log_escalation(tenant_id, agent_id, user_id, trigger_reason, metadata={})

From packages/shared/shared/models/tenant.py:

  • Agent.escalation_rules: list[dict] (e.g., [{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
  • Agent.name: str (used in escalation DM message)

From packages/shared/shared/redis_keys.py:

  • escalation_status_key(tenant_id, thread_id) -> "{tenant_id}:escalation:{thread_id}"
Task 1: Escalation rule evaluator, transcript packager, and DM delivery with tests

Files: packages/orchestrator/orchestrator/escalation/__init__.py, packages/orchestrator/orchestrator/escalation/handler.py, tests/unit/test_escalation.py, tests/integration/test_escalation.py

Done when:
- check_escalation_rules returns the matching rule when a condition is met, None otherwise
- check_escalation_rules with "billing_dispute AND attempts > 2" matches when conversation metadata has billing_dispute=True and attempts=3
- check_escalation_rules with a natural language trigger ("can I talk to a human?") matches when natural_language_escalation is enabled for the tenant
- check_escalation_rules with a natural language trigger returns None when natural_language_escalation is disabled
- build_transcript formats recent messages as "*User:* message\n*Assistant:* response" with Slack mrkdwn
- escalate_to_human opens a Slack DM with the assigned human and posts the transcript
- escalate_to_human sets the escalation status key in Redis
- After escalation, agent responses to end-user messages include "A team member is looking into this"
- Escalation event is logged to audit trail with trigger_reason

1. Create `packages/orchestrator/orchestrator/escalation/handler.py`:
   **check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled=False) -> dict | None:**
   - Iterates agent.escalation_rules list
   - Each rule has: condition (str), action (str — 'handoff_human')
   - Simple condition parser: supports "keyword AND count_check" format
     - Check if conversation_metadata matches the condition fields
     - For natural language: check if message_text matches common escalation phrases ("talk to a human", "speak to someone", "get a person", "human agent", "real person", "manager") AND natural_lang_enabled is True
   - Returns the first matching rule dict, or None
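
   A minimal sketch of this evaluator, assuming the condition grammar and phrase list described above; treating a natural-language match as a synthetic rule dict is an illustrative choice, not a decided interface:

   ```python
   import re

   # Phrase list from the spec above; extend per tenant needs.
   ESCALATION_PHRASES = (
       "talk to a human", "speak to someone", "get a person",
       "human agent", "real person", "manager",
   )

   def _condition_matches(condition: str, metadata: dict) -> bool:
       """Evaluate a 'keyword AND count_check' condition against metadata.

       e.g. "billing_dispute AND attempts > 2" matches when
       metadata == {"billing_dispute": True, "attempts": 3}.
       """
       for clause in (c.strip() for c in condition.split(" AND ")):
           m = re.fullmatch(r"(\w+)\s*(>=|<=|==|>|<)\s*(\d+)", clause)
           if m:  # numeric comparison, e.g. "attempts > 2"
               field, op, limit = m.group(1), m.group(2), int(m.group(3))
               value = metadata.get(field)
               if value is None:
                   return False
               ok = {">": value > limit, ">=": value >= limit,
                     "<": value < limit, "<=": value <= limit,
                     "==": value == limit}[op]
               if not ok:
                   return False
           elif not metadata.get(clause):  # bare flag, e.g. "billing_dispute"
               return False
       return True

   def check_escalation_rules(agent, message_text, conversation_metadata,
                              natural_lang_enabled=False):
       """Return the first matching rule dict, or None."""
       if natural_lang_enabled:
           lowered = message_text.lower()
           if any(p in lowered for p in ESCALATION_PHRASES):
               return {"condition": "natural_language", "action": "handoff_human"}
       for rule in agent.escalation_rules:
           if _condition_matches(rule["condition"], conversation_metadata):
               return rule
       return None
   ```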

   **build_transcript(recent_messages: list[dict]) -> str:**
   - Formats each message as "*{role.capitalize()}:* {content}"
   - Joins with newlines
   - Truncates to 3000 chars if needed (Slack message limit)
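
   The formatter above can be sketched directly from the spec; the simple head-truncation is an assumption (the spec only says "truncate to 3000 chars"):

   ```python
   SLACK_TEXT_LIMIT = 3000  # per the Slack message limit noted above

   def build_transcript(recent_messages: list[dict]) -> str:
       """Format sliding-window messages as Slack mrkdwn, oldest first."""
       lines = [f"*{m['role'].capitalize()}:* {m['content']}"
                for m in recent_messages]
       return "\n".join(lines)[:SLACK_TEXT_LIMIT]
   ```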

   **async escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages, assignee_slack_user_id, bot_token, redis, audit_logger) -> str:**
   - Build formatted transcript via build_transcript()
   - Compose DM text following the "employee" metaphor:
     "{agent.name} needs human assistance\nReason: {trigger_reason}\nTenant: {tenant_id}\n\nConversation transcript:\n{transcript}\n\nThe agent will stay in the thread. You can reply directly to the user."
   - Open DM channel via httpx POST to https://slack.com/api/conversations.open
   - Post transcript to DM via httpx POST to https://slack.com/api/chat.postMessage
   - Set escalation status in Redis: escalation_status_key(tenant_id, thread_id) = "escalated" with no TTL (stays until manually resolved)
   - Log escalation event via audit_logger.log_escalation()
   - Return a message for the end user: "I've brought in {assignee_name or 'a team member'} to help with this. They'll be with you shortly."
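
   One way to keep the httpx and Redis side effects testable is to split out the pure message-building steps. This is a hypothetical decomposition, not an existing API; the wording follows the templates above:

   ```python
   # Hypothetical helpers: the pure parts of escalate_to_human, separated
   # from the conversations.open / chat.postMessage / Redis side effects.
   def compose_escalation_dm(agent_name: str, trigger_reason: str,
                             tenant_id: str, transcript: str) -> str:
       return (
           f"{agent_name} needs human assistance\n"
           f"Reason: {trigger_reason}\n"
           f"Tenant: {tenant_id}\n\n"
           f"Conversation transcript:\n{transcript}\n\n"
           "The agent will stay in the thread. "
           "You can reply directly to the user."
       )

   def compose_user_reply(assignee_name: str | None = None) -> str:
       who = assignee_name or "a team member"
       return (f"I've brought in {who} to help with this. "
               "They'll be with you shortly.")
   ```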

2. Write unit tests (test_escalation.py):
   - Rule matching: condition with billing_dispute matches, non-matching condition returns None
   - Natural language: "can I talk to a human?" matches when enabled, returns None when disabled
   - Transcript formatting: messages formatted correctly, truncated at limit
   - Various escalation phrases tested

3. Write integration tests (test_escalation.py):
   - Mock httpx calls to Slack API
   - Verify conversations.open is called with correct user ID
   - Verify chat.postMessage is called with transcript
   - Verify Redis escalation key is set
   - Verify audit event is logged with action_type='escalation'
Verify: cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v

Done when:
- Escalation rules evaluate correctly against conversation metadata
- Natural language escalation triggers on common phrases when enabled
- Transcript is formatted in Slack mrkdwn and truncated if needed
- DM delivered to assigned human via Slack API
- Escalation status tracked in Redis
- Audit event logged for every escalation

Task 2: Wire escalation into orchestrator pipeline with post-escalation assistant mode

Files: packages/orchestrator/orchestrator/tasks.py, packages/shared/shared/redis_keys.py

1. Update `tasks.py` — add escalation checks to handle_message:
   **At the START of handle_message (before LLM call):**
   - Check Redis escalation_status_key(tenant_id, thread_id)
   - If escalated: enter assistant mode
     - If the sender is the assigned human: process normally (the human might ask the agent for info)
     - If the sender is the end user: respond with "A team member is looking into this. They'll respond shortly." Do NOT call the LLM. This prevents the agent from overriding the human's response.
     - This check must happen AFTER message normalization but BEFORE the LLM call
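
   The gate above reduces to a small pure decision that can be unit-tested; the function name is illustrative and the hold message follows the spec:

   ```python
   HOLD_MESSAGE = "A team member is looking into this. They'll respond shortly."

   def route_escalated_message(escalation_status: str | None,
                               sender_id: str,
                               assignee_id: str | None) -> str | None:
       """Return a canned reply to send instead of calling the LLM,
       or None to process the message through the normal pipeline."""
       if escalation_status != "escalated":
           return None  # thread not escalated: normal pipeline
       if assignee_id is not None and sender_id == assignee_id:
           return None  # the assigned human may still query the agent
       return HOLD_MESSAGE  # end user: defer to the human, skip the LLM
   ```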

   **AFTER the LLM response (before sending reply):**
   - Load agent's escalation_rules and tenant config (natural_language_escalation setting)
   - Call check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled)
   - If a rule matches:
     a. Load recent messages from Redis sliding window (already loaded for memory)
     b. Get assignee_slack_user_id from agent configuration (add escalation_assignee field to Agent model or read from escalation_rules config)
     c. Get bot_token from channel_connections config (already available in task extras)
     d. Call escalate_to_human()
     e. Replace the LLM response with the escalation message returned by escalate_to_human()
     f. Send escalation message to user instead of original LLM response
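
   Steps a-f amount to "replace the LLM reply when a rule matches". A sketch with the checker and escalator injected as callables, so the flow is shown without the real handler module (names are illustrative):

   ```python
   import asyncio

   async def apply_escalation_check(llm_response: str, rule_checker, escalator):
       """Return the text to send: the LLM reply, or the handoff message."""
       rule = rule_checker()
       if rule is None:
           return llm_response  # no rule matched: send the LLM reply as-is
       # A rule matched: DM the human, then replace the LLM reply entirely.
       return await escalator(rule)

   async def _demo():
       async def escalator(rule):
           return "I've brought in a team member to help with this."
       kept = await apply_escalation_check("LLM says hi", lambda: None, escalator)
       swapped = await apply_escalation_check(
           "LLM says hi", lambda: {"action": "handoff_human"}, escalator)
       return kept, swapped
   ```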

2. Add escalation_assignee field to Agent model if not already present:
   - In packages/shared/shared/models/tenant.py, add: escalation_assignee: Mapped[str | None] = mapped_column(Text, nullable=True)
   - This is the Slack user ID of the human to DM on escalation
   - Also add: natural_language_escalation: Mapped[bool] = mapped_column(Boolean, default=False)

3. Conversation metadata tracking:
   - For rule-based escalation (e.g., "billing_dispute AND attempts > 2"), the orchestrator needs to track conversation metadata
   - Store conversation metadata in Redis: {tenant_id}:conv_meta:{thread_id} as a JSON dict
   - The LLM can populate this via a system prompt instruction: "If the user mentions billing, set billing_dispute=true in your response metadata"
   - Or simpler: use keyword detection on the conversation history to populate metadata
   - Use Claude's discretion on the simplest approach that works. Keyword detection on the sliding window is probably sufficient for v1.
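
   A v1 keyword detector along those lines could look like this; the keyword map and the choice to count each user turn as an "attempt" are assumptions:

   ```python
   # Hypothetical keyword map: flag -> trigger keywords.
   KEYWORD_FLAGS = {
       "billing_dispute": ("billing", "refund", "charge", "invoice"),
   }

   def detect_conversation_metadata(recent_messages: list[dict]) -> dict:
       """Scan the sliding window and populate rule-evaluation metadata."""
       meta: dict = {"attempts": 0}
       for msg in recent_messages:
           text = msg.get("content", "").lower()
           if msg.get("role") == "user":
               meta["attempts"] += 1  # each user turn counts as an attempt
           for flag, keywords in KEYWORD_FLAGS.items():
               if any(k in text for k in keywords):
                   meta[flag] = True
       return meta
   ```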

4. If Agent model is modified, create a small Alembic migration for the new columns.

CRITICAL constraints:
- Celery task is sync def with asyncio.run()
- httpx calls to Slack API follow the same pattern as existing chat.update in tasks.py
- Redis operations use existing async pattern
- Audit logging uses AuditLogger from Plan 02 (if Plan 02 not yet executed, use a no-op logger that can be replaced)
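
The no-op fallback mentioned in the last constraint only needs to mirror the log_escalation call shape from the interface section; this sketch assumes the sync signature shown there:

```python
class NoopAuditLogger:
    """Drop-in placeholder until Plan 02's AuditLogger is available."""
    def log_escalation(self, tenant_id, agent_id, user_id,
                       trigger_reason, metadata=None):
        pass  # intentionally does nothing; swap for the real logger later
```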
Verify: cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v

Done when:
- Escalated conversations route end-user messages to "team member is handling this" auto-reply
- Human messages in escalated threads are processed normally by the agent
- Escalation rules checked after every LLM response
- Natural language escalation works when enabled per tenant
- Escalation triggers DM to assigned human with full transcript
- Agent model has escalation_assignee and natural_language_escalation fields
- Full pipeline: message in -> memory -> LLM -> escalation check -> response/handoff
- All tests pass: `pytest tests/ -x`
- Escalation tests pass: `pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x`
- Agent model migration applies cleanly: `alembic upgrade head`

<success_criteria>

  • Configured escalation rules trigger handoff to human with full conversation context
  • Natural language escalation ("can I talk to a human?") works when enabled per tenant
  • Escalated conversations enter assistant mode — agent defers to human
  • Human receives DM with complete transcript and escalation reason
  • Every escalation event is recorded in the audit trail

</success_criteria>
After completion, create `.planning/phases/02-agent-features/02-04-SUMMARY.md`