---
phase: 02-agent-features
plan: 04
type: execute
wave: 2
depends_on: ["02-01"]
files_modified:
  - packages/orchestrator/orchestrator/escalation/__init__.py
  - packages/orchestrator/orchestrator/escalation/handler.py
  - packages/shared/shared/redis_keys.py
  - packages/orchestrator/orchestrator/tasks.py
  - tests/unit/test_escalation.py
  - tests/integration/test_escalation.py
autonomous: true
requirements:
  - AGNT-05
must_haves:
  truths:
    - "When a configured escalation rule triggers, the conversation is handed off to a human"
    - "The human receives a DM with the full conversation transcript and escalation reason"
    - "The agent stays in the thread as assistant after escalation — defers to human for end-user responses"
    - "Natural language escalation ('can I talk to a human?') works when enabled per tenant"
    - "Escalation events are logged in the audit trail"
  artifacts:
    - path: "packages/orchestrator/orchestrator/escalation/handler.py"
      provides: "Escalation rule evaluation, transcript packaging, DM delivery"
      exports: ["check_escalation_rules", "escalate_to_human"]
    - path: "tests/unit/test_escalation.py"
      provides: "Unit tests for rule evaluation and transcript packaging"
    - path: "tests/integration/test_escalation.py"
      provides: "Integration tests for escalation DM delivery"
  key_links:
    - from: "packages/orchestrator/orchestrator/tasks.py"
      to: "orchestrator/escalation/handler.py"
      via: "check_escalation_rules called after LLM response in handle_message"
      pattern: "check_escalation_rules|escalate_to_human"
    - from: "packages/orchestrator/orchestrator/escalation/handler.py"
      to: "Slack API conversations.open + chat.postMessage"
      via: "httpx POST for DM delivery to assigned human"
      pattern: "conversations\\.open|chat\\.postMessage"
    - from: "packages/orchestrator/orchestrator/escalation/handler.py"
      to: "orchestrator/audit/logger.py"
      via: "log_escalation on every handoff"
      pattern: "audit_logger\\.log_escalation"
---

Build the human escalation/handoff system: rule-based trigger evaluation, full
conversation transcript packaging, DM delivery to the assigned human, and post-escalation assistant mode where the agent defers to the human.

Purpose: Ensures the AI employee knows its limits and gracefully hands off to a human when configured rules trigger or the user explicitly requests it — maintaining the "employee" metaphor ("let me get my manager").

Output: Escalation handler, updated orchestrator pipeline, passing tests.

@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-agent-features/02-CONTEXT.md
@.planning/phases/02-agent-features/02-RESEARCH.md
@.planning/phases/02-agent-features/02-01-SUMMARY.md
@packages/orchestrator/orchestrator/tasks.py
@packages/orchestrator/orchestrator/agents/runner.py
@packages/shared/shared/models/tenant.py
@packages/shared/shared/redis_keys.py

From packages/orchestrator/orchestrator/memory/short_term.py:
- async get_recent_messages(redis, tenant_id, agent_id, user_id, n=20) -> list[dict]

From packages/orchestrator/orchestrator/audit/logger.py:
- AuditLogger.log_escalation(tenant_id, agent_id, user_id, trigger_reason, metadata={})

From packages/shared/shared/models/tenant.py:
- Agent.escalation_rules: list[dict] (e.g., [{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
- Agent.name: str (used in escalation DM message)

From packages/shared/shared/redis_keys.py:
- escalation_status_key(tenant_id, thread_id) -> "{tenant_id}:escalation:{thread_id}"

Task 1: Escalation rule evaluator, transcript packager, and DM delivery with tests

Files: packages/orchestrator/orchestrator/escalation/__init__.py, packages/orchestrator/orchestrator/escalation/handler.py, tests/unit/test_escalation.py, tests/integration/test_escalation.py

- check_escalation_rules returns the matching rule when a condition is met, None otherwise
- check_escalation_rules with "billing_dispute AND attempts > 2" matches when conversation
  metadata has billing_dispute=True and attempts=3
- check_escalation_rules with natural language trigger ("can I talk to a human?") matches when natural_language_escalation is enabled for the tenant
- check_escalation_rules with natural language trigger returns None when natural_language_escalation is disabled
- build_transcript formats recent messages as "*User:* message\n*Assistant:* response" with Slack mrkdwn
- escalate_to_human opens a Slack DM with the assigned human and posts the transcript
- escalate_to_human sets the escalation status key in Redis
- After escalation, agent responses to end-user messages include "A team member is looking into this"
- Escalation event is logged to audit trail with trigger_reason

1. Create `packages/orchestrator/orchestrator/escalation/handler.py`:

   **check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled=False) -> dict | None:**
   - Iterates the agent.escalation_rules list
   - Each rule has: condition (str), action (str — 'handoff_human')
   - Simple condition parser: supports "keyword AND count_check" format
   - Check whether conversation_metadata matches the condition fields
   - For natural language: check whether message_text matches common escalation phrases ("talk to a human", "speak to someone", "get a person", "human agent", "real person", "manager") AND natural_lang_enabled is True
   - Returns the first matching rule dict, or None

   **build_transcript(recent_messages: list[dict]) -> str:**
   - Formats each message as "*{role.capitalize()}:* {content}"
   - Joins with newlines
   - Truncates to 3000 chars if needed (Slack message limit)

   **async escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages, assignee_slack_user_id, bot_token, redis, audit_logger) -> str:**
   - Build the formatted transcript via build_transcript()
   - Compose the DM text following the "employee" metaphor: "{agent.name} needs human assistance\nReason: {trigger_reason}\nTenant: {tenant_id}\n\nConversation transcript:\n{transcript}\n\nThe
     agent will stay in the thread. You can reply directly to the user."
   - Open a DM channel via httpx POST to https://slack.com/api/conversations.open
   - Post the transcript to the DM via httpx POST to https://slack.com/api/chat.postMessage
   - Set the escalation status in Redis: escalation_status_key(tenant_id, thread_id) = "escalated" with no TTL (stays until manually resolved)
   - Log the escalation event via audit_logger.log_escalation()
   - Return a message for the end user: "I've brought in {assignee_name or 'a team member'} to help with this. They'll be with you shortly."

2. Write unit tests (tests/unit/test_escalation.py):
   - Rule matching: condition with billing_dispute matches, non-matching condition returns None
   - Natural language: "can I talk to a human?" matches when enabled, returns None when disabled
   - Transcript formatting: messages formatted correctly, truncated at the limit
   - Various escalation phrases tested

3. Write integration tests (tests/integration/test_escalation.py):
   - Mock httpx calls to the Slack API
   - Verify conversations.open is called with the correct user ID
   - Verify chat.postMessage is called with the transcript
   - Verify the Redis escalation key is set
   - Verify the audit event is logged with action_type='escalation'

Verify: cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v

Done when:
- Escalation rules evaluate correctly against conversation metadata
- Natural language escalation triggers on common phrases when enabled
- Transcript is formatted in Slack mrkdwn and truncated if needed
- DM delivered to the assigned human via the Slack API
- Escalation status tracked in Redis
- Audit event logged for every escalation

Task 2: Wire escalation into orchestrator pipeline with post-escalation assistant mode

Files: packages/orchestrator/orchestrator/tasks.py, packages/shared/shared/redis_keys.py

1.
Update `tasks.py` — add escalation checks to handle_message:

   **At the START of handle_message (before the LLM call):**
   - Check Redis escalation_status_key(tenant_id, thread_id)
   - If escalated: enter assistant mode
     - If the sender is the assigned human: process normally (the human might ask the agent for info)
     - If the sender is the end user: respond with "A team member is looking into this. They'll respond shortly." Do NOT call the LLM. This prevents the agent from overriding the human's response.
   - This check must happen AFTER message normalization but BEFORE the LLM call

   **AFTER the LLM response (before sending the reply):**
   - Load the agent's escalation_rules and tenant config (natural_language_escalation setting)
   - Call check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled)
   - If a rule matches:
     a. Load recent messages from the Redis sliding window (already loaded for memory)
     b. Get assignee_slack_user_id from the agent configuration (add an escalation_assignee field to the Agent model or read it from the escalation_rules config)
     c. Get bot_token from the channel_connections config (already available in task extras)
     d. Call escalate_to_human()
     e. Replace the LLM response with the escalation message returned by escalate_to_human() and send that to the user instead of the original LLM response

2. Add an escalation_assignee field to the Agent model if not already present:
   - In packages/shared/shared/models/tenant.py, add: escalation_assignee: Mapped[str | None] = mapped_column(Text, nullable=True)
   - This is the Slack user ID of the human to DM on escalation
   - Also add: natural_language_escalation: Mapped[bool] = mapped_column(Boolean, default=False)

3.
Conversation metadata tracking:
   - For rule-based escalation (e.g., "billing_dispute AND attempts > 2"), the orchestrator needs to track conversation metadata
   - Store conversation metadata in Redis: {tenant_id}:conv_meta:{thread_id} as a JSON dict
   - The LLM can populate this via a system prompt instruction: "If the user mentions billing, set billing_dispute=true in your response metadata"
   - Or simpler: use keyword detection on the conversation history to populate metadata
   - Use Claude's discretion on the simplest approach that works. Keyword detection on the sliding window is probably sufficient for v1.

4. If the Agent model is modified, create a small Alembic migration for the new columns.

CRITICAL constraints:
- The Celery task is a sync def that wraps async work in asyncio.run()
- httpx calls to the Slack API follow the same pattern as the existing chat.update in tasks.py
- Redis operations use the existing async pattern
- Audit logging uses AuditLogger from Plan 02 (if Plan 02 has not yet been executed, use a no-op logger that can be replaced)

Verify: cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v

Done when:
- Escalated conversations route end-user messages to the "team member is handling this" auto-reply
- Human messages in escalated threads are processed normally by the agent
- Escalation rules are checked after every LLM response
- Natural language escalation works when enabled per tenant
- Escalation triggers a DM to the assigned human with the full transcript
- The Agent model has escalation_assignee and natural_language_escalation fields
- Full pipeline: message in -> memory -> LLM -> escalation check -> response/handoff

Verification:
- All tests pass: `pytest tests/ -x`
- Escalation tests pass: `pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x`
- Agent model migration applies cleanly: `alembic upgrade head`

Success criteria:
- Configured escalation rules trigger handoff to a human with full conversation context
- Natural language escalation ("can I talk to a human?")
  works when enabled per tenant
- Escalated conversations enter assistant mode — agent defers to human
- Human receives DM with complete transcript and escalation reason
- Every escalation event is recorded in the audit trail

After completion, create `.planning/phases/02-agent-features/02-04-SUMMARY.md`
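The Task 1 rule evaluator could be sketched as follows. This is a minimal, hedged sketch, not the implementation: the condition grammar ("keyword AND count_check") and the phrase list come from the plan, but the clause parser, the `_clause_matches` helper, and the dict stand-in for the Agent object are assumptions for illustration.

```python
import re

# Phrase list taken from the plan's natural-language escalation triggers.
ESCALATION_PHRASES = (
    "talk to a human", "speak to someone", "get a person",
    "human agent", "real person", "manager",
)

def _clause_matches(clause: str, metadata: dict) -> bool:
    """Evaluate one clause: either a bare flag name or 'field > N'."""
    m = re.fullmatch(r"(\w+)\s*>\s*(\d+)", clause.strip())
    if m:
        return int(metadata.get(m.group(1), 0)) > int(m.group(2))
    return bool(metadata.get(clause.strip(), False))

def check_escalation_rules(agent, message_text, conversation_metadata,
                           natural_lang_enabled=False):
    """Return the first matching rule dict, or None.

    `agent` is stood in for by a plain dict here (hypothetical shape).
    """
    for rule in agent.get("escalation_rules", []):
        clauses = rule["condition"].split(" AND ")
        if all(_clause_matches(c, conversation_metadata) for c in clauses):
            return rule
    if natural_lang_enabled:
        lowered = message_text.lower()
        if any(p in lowered for p in ESCALATION_PHRASES):
            # Synthesized rule for natural-language requests (assumed shape).
            return {"condition": "natural_language", "action": "handoff_human"}
    return None
```

With this shape, `{"billing_dispute": True, "attempts": 3}` matches `"billing_dispute AND attempts > 2"`, and "can I talk to a human?" triggers only when the tenant flag is enabled.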
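The transcript packager from Task 1 is small enough to sketch in full. The "*Role:* content" mrkdwn shape and the 3000-char cap come from the plan; the trailing "..." truncation marker is an assumption.

```python
SLACK_MSG_LIMIT = 3000  # per-message text limit the plan targets

def build_transcript(recent_messages: list[dict]) -> str:
    """Format messages as Slack mrkdwn, truncated to the message limit."""
    lines = [f"*{m['role'].capitalize()}:* {m['content']}" for m in recent_messages]
    transcript = "\n".join(lines)
    if len(transcript) > SLACK_MSG_LIMIT:
        # Assumed policy: hard-truncate with an ellipsis marker.
        transcript = transcript[: SLACK_MSG_LIMIT - 3] + "..."
    return transcript
```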
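The escalate_to_human flow might look like the sketch below. The DM text follows the plan's template and the two Slack endpoints (conversations.open, chat.postMessage) are the ones the plan names; the parameter list, the `compose_escalation_dm` helper, and the audit/Redis call shapes are simplified assumptions, not the actual handler.py code.

```python
def compose_escalation_dm(agent_name, trigger_reason, tenant_id, transcript):
    """Build the DM body sent to the assigned human (template from the plan)."""
    return (
        f"{agent_name} needs human assistance\n"
        f"Reason: {trigger_reason}\n"
        f"Tenant: {tenant_id}\n\n"
        f"Conversation transcript:\n{transcript}\n\n"
        "The agent will stay in the thread. You can reply directly to the user."
    )

async def escalate_to_human(tenant_id, agent_name, thread_id, trigger_reason,
                            transcript, assignee_slack_user_id, bot_token,
                            redis, audit_logger):
    import httpx  # assumed available, as in the existing tasks.py Slack calls
    headers = {"Authorization": f"Bearer {bot_token}"}
    async with httpx.AsyncClient() as client:
        # 1. Open (or reuse) a DM channel with the assigned human.
        resp = await client.post("https://slack.com/api/conversations.open",
                                 headers=headers,
                                 json={"users": assignee_slack_user_id})
        channel_id = resp.json()["channel"]["id"]
        # 2. Post the transcript into that DM.
        await client.post("https://slack.com/api/chat.postMessage",
                          headers=headers,
                          json={"channel": channel_id,
                                "text": compose_escalation_dm(
                                    agent_name, trigger_reason,
                                    tenant_id, transcript)})
    # 3. Mark the thread as escalated; no TTL, per the plan.
    await redis.set(f"{tenant_id}:escalation:{thread_id}", "escalated")
    # 4. Audit entry (simplified call shape; see AuditLogger.log_escalation).
    await audit_logger.log_escalation(tenant_id, agent_name,
                                      assignee_slack_user_id, trigger_reason)
    return "I've brought in a team member to help with this. They'll be with you shortly."
```

The integration tests would mock both httpx POSTs and assert on the payloads rather than hitting Slack.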
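The assistant-mode gate that Task 2 adds at the start of handle_message reduces to a small routing decision. The sketch below isolates that decision as a pure function so the logic is visible; the function name and return values are illustrative, not the actual tasks.py code.

```python
# Canned reply from the plan for end users in an escalated thread.
HOLD_REPLY = "A team member is looking into this. They'll respond shortly."

def route_message(escalation_status, sender_id, assignee_slack_user_id):
    """Decide what handle_message should do before calling the LLM.

    Returns "llm" to proceed normally, or "hold" to send HOLD_REPLY
    without invoking the LLM.
    """
    if escalation_status != "escalated":
        return "llm"    # thread not escalated: normal flow
    if sender_id == assignee_slack_user_id:
        return "llm"    # the assigned human may still query the agent
    return "hold"       # end user: defer to the human, never call the LLM
```

This keeps the "do NOT override the human" rule in one place that unit tests can exercise without Redis or Celery.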
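For the v1 keyword-detection option in Task 2 step 3, one possible shape is below. The keyword map, the attempt-counting rule (one "attempt" per user turn), and the function name are all assumptions; the plan leaves the exact approach to the implementer's discretion.

```python
# Hypothetical flag -> trigger-keyword map; tenants would configure this.
FLAG_KEYWORDS = {
    "billing_dispute": ("refund", "overcharged", "billing", "invoice"),
}

def extract_conversation_metadata(recent_messages: list[dict]) -> dict:
    """Derive rule-evaluation metadata from the recent message window."""
    user_texts = [m["content"].lower() for m in recent_messages
                  if m.get("role") == "user"]
    meta = {"attempts": len(user_texts)}  # assumed: one attempt per user turn
    for flag, keywords in FLAG_KEYWORDS.items():
        meta[flag] = any(k in t for t in user_texts for k in keywords)
    return meta
```

The resulting dict plugs straight into check_escalation_rules as conversation_metadata; it could also be cached in the {tenant_id}:conv_meta:{thread_id} Redis key the plan describes.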