Files

245 lines
13 KiB
Markdown

---
phase: 02-agent-features
plan: 04
type: execute
wave: 2
depends_on: ["02-01"]
files_modified:
- packages/orchestrator/orchestrator/escalation/__init__.py
- packages/orchestrator/orchestrator/escalation/handler.py
- packages/shared/shared/redis_keys.py
- packages/orchestrator/orchestrator/tasks.py
- tests/unit/test_escalation.py
- tests/integration/test_escalation.py
autonomous: true
requirements:
- AGNT-05
must_haves:
truths:
- "When a configured escalation rule triggers, the conversation is handed off to a human"
- "The human receives a DM with the full conversation transcript and escalation reason"
- "The agent stays in the thread as assistant after escalation — defers to human for end-user responses"
- "Natural language escalation ('can I talk to a human?') works when enabled per tenant"
- "Escalation events are logged in the audit trail"
artifacts:
- path: "packages/orchestrator/orchestrator/escalation/handler.py"
provides: "Escalation rule evaluation, transcript packaging, DM delivery"
exports: ["check_escalation_rules", "escalate_to_human"]
- path: "tests/unit/test_escalation.py"
provides: "Unit tests for rule evaluation and transcript packaging"
- path: "tests/integration/test_escalation.py"
provides: "Integration tests for escalation DM delivery"
key_links:
- from: "packages/orchestrator/orchestrator/tasks.py"
to: "orchestrator/escalation/handler.py"
via: "check_escalation_rules called after LLM response in handle_message"
pattern: "check_escalation_rules|escalate_to_human"
- from: "packages/orchestrator/orchestrator/escalation/handler.py"
to: "Slack API conversations.open + chat.postMessage"
via: "httpx POST for DM delivery to assigned human"
pattern: "conversations\\.open|chat\\.postMessage"
- from: "packages/orchestrator/orchestrator/escalation/handler.py"
to: "orchestrator/audit/logger.py"
via: "log_escalation on every handoff"
pattern: "audit_logger\\.log_escalation"
---
<objective>
Build the human escalation/handoff system: rule-based trigger evaluation, full conversation transcript packaging, DM delivery to assigned human, and post-escalation assistant mode where the agent defers to the human.
Purpose: Ensures the AI employee knows its limits and gracefully hands off to a human when configured rules trigger or the user explicitly requests it — maintaining the "employee" metaphor ("let me get my manager").
Output: Escalation handler, updated orchestrator pipeline, passing tests.
</objective>
<execution_context>
@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-agent-features/02-CONTEXT.md
@.planning/phases/02-agent-features/02-RESEARCH.md
@.planning/phases/02-agent-features/02-01-SUMMARY.md
@packages/orchestrator/orchestrator/tasks.py
@packages/orchestrator/orchestrator/agents/runner.py
@packages/shared/shared/models/tenant.py
@packages/shared/shared/redis_keys.py
<interfaces>
<!-- From Plan 01 (memory) — needed for transcript assembly -->
From packages/orchestrator/orchestrator/memory/short_term.py:
- async get_recent_messages(redis, tenant_id, agent_id, user_id, n=20) -> list[dict]
<!-- From Plan 02 (audit) — needed for logging escalation events -->
From packages/orchestrator/orchestrator/audit/logger.py:
- AuditLogger.log_escalation(tenant_id, agent_id, user_id, trigger_reason, metadata={})
<!-- From shared/models/tenant.py — Agent model with escalation_rules -->
From packages/shared/shared/models/tenant.py:
- Agent.escalation_rules: list[dict] (e.g., [{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}])
- Agent.name: str (used in escalation DM message)
<!-- From shared/redis_keys.py — extended in Plan 01 -->
- escalation_status_key(tenant_id, thread_id) -> "{tenant_id}:escalation:{thread_id}"
</interfaces>
</context>
<tasks>
<task type="auto" tdd="true">
<name>Task 1: Escalation rule evaluator, transcript packager, and DM delivery with tests</name>
<files>
packages/orchestrator/orchestrator/escalation/__init__.py,
packages/orchestrator/orchestrator/escalation/handler.py,
tests/unit/test_escalation.py,
tests/integration/test_escalation.py
</files>
<behavior>
- check_escalation_rules returns the matching rule when a condition is met, None otherwise
- check_escalation_rules with "billing_dispute AND attempts > 2" matches when conversation metadata has billing_dispute=True and attempts=3
- check_escalation_rules with natural language trigger ("can I talk to a human?") matches when natural_language_escalation is enabled for the tenant
- check_escalation_rules with natural language trigger returns None when natural_language_escalation is disabled
- build_transcript formats recent messages as "*User:* message\n*Assistant:* response" with Slack mrkdwn
- escalate_to_human opens a Slack DM with the assigned human and posts the transcript
- escalate_to_human sets the escalation status key in Redis
- After escalation, agent responses to end-user messages include "A team member is looking into this"
- Escalation event is logged to audit trail with trigger_reason
</behavior>
<action>
1. Create `packages/orchestrator/orchestrator/escalation/handler.py`:
**check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled=False) -> dict | None:**
- Iterates agent.escalation_rules list
- Each rule has: condition (str), action (str — 'handoff_human')
- Simple condition parser: supports "keyword AND count_check" format
- Check if conversation_metadata matches the condition fields
- For natural language: check if message_text matches common escalation phrases ("talk to a human", "speak to someone", "get a person", "human agent", "real person", "manager") AND natural_lang_enabled is True
- Returns the first matching rule dict, or None
**build_transcript(recent_messages: list[dict]) -> str:**
- Formats each message as "*{role.capitalize()}:* {content}"
- Joins with newlines
- Truncates to 3000 chars if needed (Slack message limit)
**async escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages, assignee_slack_user_id, bot_token, redis, audit_logger) -> str:**
- Build formatted transcript via build_transcript()
- Compose DM text following the "employee" metaphor:
"{agent.name} needs human assistance\nReason: {trigger_reason}\nTenant: {tenant_id}\n\nConversation transcript:\n{transcript}\n\nThe agent will stay in the thread. You can reply directly to the user."
- Open DM channel via httpx POST to https://slack.com/api/conversations.open
- Post transcript to DM via httpx POST to https://slack.com/api/chat.postMessage
- Set escalation status in Redis: escalation_status_key(tenant_id, thread_id) = "escalated" with no TTL (stays until manually resolved)
- Log escalation event via audit_logger.log_escalation()
- Return a message for the end user: "I've brought in {assignee_name or 'a team member'} to help with this. They'll be with you shortly."
2. Write unit tests (test_escalation.py):
- Rule matching: condition with billing_dispute matches, non-matching condition returns None
- Natural language: "can I talk to a human?" matches when enabled, returns None when disabled
- Transcript formatting: messages formatted correctly, truncated at limit
- Various escalation phrases tested
3. Write integration tests (test_escalation.py):
- Mock httpx calls to Slack API
- Verify conversations.open is called with correct user ID
- Verify chat.postMessage is called with transcript
- Verify Redis escalation key is set
- Verify audit event is logged with action_type='escalation'
</action>
<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v</automated>
</verify>
<done>
- Escalation rules evaluate correctly against conversation metadata
- Natural language escalation triggers on common phrases when enabled
- Transcript is formatted in Slack mrkdwn and truncated if needed
- DM delivered to assigned human via Slack API
- Escalation status tracked in Redis
- Audit event logged for every escalation
</done>
</task>
<task type="auto">
<name>Task 2: Wire escalation into orchestrator pipeline with post-escalation assistant mode</name>
<files>
packages/orchestrator/orchestrator/tasks.py,
packages/shared/shared/redis_keys.py
</files>
<action>
1. Update `tasks.py` — add escalation checks to handle_message:
**At the START of handle_message (before LLM call):**
- Check Redis escalation_status_key(tenant_id, thread_id)
- If escalated: enter assistant mode
- If the sender is the assigned human: process normally (the human might ask the agent for info)
- If the sender is the end user: respond with "A team member is looking into this. They'll respond shortly." Do NOT call the LLM. This prevents the agent from overriding the human's response.
- This check must happen AFTER message normalization but BEFORE the LLM call
**AFTER the LLM response (before sending reply):**
- Load agent's escalation_rules and tenant config (natural_language_escalation setting)
- Call check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled)
- If a rule matches:
a. Load recent messages from Redis sliding window (already loaded for memory)
b. Get assignee_slack_user_id from agent configuration (add escalation_assignee field to Agent model or read from escalation_rules config)
c. Get bot_token from channel_connections config (already available in task extras)
d. Call escalate_to_human()
e. Replace the LLM response with the escalation message returned by escalate_to_human()
f. Send escalation message to user instead of original LLM response
2. Add escalation_assignee field to Agent model if not already present:
- In packages/shared/shared/models/tenant.py, add: escalation_assignee: Mapped[str | None] = mapped_column(Text, nullable=True)
- This is the Slack user ID of the human to DM on escalation
- Also add: natural_language_escalation: Mapped[bool] = mapped_column(Boolean, default=False)
3. Conversation metadata tracking:
- For rule-based escalation (e.g., "billing_dispute AND attempts > 2"), the orchestrator needs to track conversation metadata
- Store conversation metadata in Redis: {tenant_id}:conv_meta:{thread_id} as a JSON dict
- The LLM can populate this via a system prompt instruction: "If the user mentions billing, set billing_dispute=true in your response metadata"
- Or simpler: use keyword detection on the conversation history to populate metadata
- Use Claude's discretion on the simplest approach that works. Keyword detection on the sliding window is probably sufficient for v1.
4. If Agent model is modified, create a small Alembic migration for the new columns.
CRITICAL constraints:
- Celery task is sync def with asyncio.run()
- httpx calls to Slack API follow the same pattern as existing chat.update in tasks.py
- Redis operations use existing async pattern
- Audit logging uses AuditLogger from Plan 02 (if Plan 02 not yet executed, use a no-op logger that can be replaced)
</action>
<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v</automated>
</verify>
<done>
- Escalated conversations route end-user messages to "team member is handling this" auto-reply
- Human messages in escalated threads are processed normally by the agent
- Escalation rules checked after every LLM response
- Natural language escalation works when enabled per tenant
- Escalation triggers DM to assigned human with full transcript
- Agent model has escalation_assignee and natural_language_escalation fields
- Full pipeline: message in -> memory -> LLM -> escalation check -> response/handoff
</done>
</task>
</tasks>
<verification>
- All tests pass: `pytest tests/ -x`
- Escalation tests pass: `pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x`
- Agent model migration applies cleanly: `alembic upgrade head`
</verification>
<success_criteria>
- Configured escalation rules trigger handoff to human with full conversation context
- Natural language escalation ("can I talk to a human?") works when enabled per tenant
- Escalated conversations enter assistant mode — agent defers to human
- Human receives DM with complete transcript and escalation reason
- Every escalation event is recorded in the audit trail
</success_criteria>
<output>
After completion, create `.planning/phases/02-agent-features/02-04-SUMMARY.md`
</output>