docs(02-agent-features): create gap closure plan for escalation and WhatsApp outbound wiring

This commit is contained in:
2026-03-23 19:03:24 -06:00
parent d921ed776a
commit 48d9ef0c29
2 changed files with 372 additions and 3 deletions

View File

@@ -13,7 +13,7 @@ Konstruct ships in three coarse phases ordered by dependency: first build the se
Decimal phases appear between their surrounding integers in numeric order. Decimal phases appear between their surrounding integers in numeric order.
- [x] **Phase 1: Foundation** - Secure multi-tenant pipeline with Slack end-to-end and basic agent response (completed 2026-03-23) - [x] **Phase 1: Foundation** - Secure multi-tenant pipeline with Slack end-to-end and basic agent response (completed 2026-03-23)
- [x] **Phase 2: Agent Features** - Persistent memory, tool framework, WhatsApp integration, and human escalation (completed 2026-03-23) - [ ] **Phase 2: Agent Features** - Persistent memory, tool framework, WhatsApp integration, and human escalation (gap closure in progress)
- [ ] **Phase 3: Operator Experience** - Admin portal, tenant onboarding, and Stripe billing - [ ] **Phase 3: Operator Experience** - Admin portal, tenant onboarding, and Stripe billing
## Phase Details ## Phase Details
@@ -47,7 +47,7 @@ Plans:
3. The agent can invoke a registered tool (e.g., knowledge base search) and incorporate the result into its response 3. The agent can invoke a registered tool (e.g., knowledge base search) and incorporate the result into its response
4. When a configured escalation rule triggers (e.g., failed resolution attempts), the conversation and full context are handed off to a human with no information lost 4. When a configured escalation rule triggers (e.g., failed resolution attempts), the conversation and full context are handed off to a human with no information lost
5. Every LLM call, tool invocation, and handoff event is recorded in an immutable audit trail queryable by tenant 5. Every LLM call, tool invocation, and handoff event is recorded in an immutable audit trail queryable by tenant
**Plans**: 5 plans **Plans**: 6 plans
Plans: Plans:
- [ ] 02-01: Conversational memory layer (Redis sliding window + pgvector long-term storage with HNSW index) - [ ] 02-01: Conversational memory layer (Redis sliding window + pgvector long-term storage with HNSW index)
@@ -55,6 +55,7 @@ Plans:
- [ ] 02-03: WhatsApp adapter (Business Cloud API, per-tenant phone numbers, media download, Meta policy compliance) - [ ] 02-03: WhatsApp adapter (Business Cloud API, per-tenant phone numbers, media download, Meta policy compliance)
- [ ] 02-04: Human escalation/handoff with full context transfer and audit trail - [ ] 02-04: Human escalation/handoff with full context transfer and audit trail
- [ ] 02-05: Cross-channel media support and multimodal LLM interpretation (Slack file_share, image_url content blocks, channel-aware outbound routing) - [ ] 02-05: Cross-channel media support and multimodal LLM interpretation (Slack file_share, image_url content blocks, channel-aware outbound routing)
- [ ] 02-06: Gap closure — re-wire escalation handler and WhatsApp outbound routing into pipeline, add tier-2 system prompt scoping
### Phase 3: Operator Experience ### Phase 3: Operator Experience
**Goal**: An operator can sign up, onboard their tenant through a web UI, connect their messaging channels, configure their AI employee, and manage their subscription — without touching config files or the command line **Goal**: An operator can sign up, onboard their tenant through a web UI, connect their messaging channels, configure their AI employee, and manage their subscription — without touching config files or the command line
@@ -79,7 +80,7 @@ Phases execute in numeric order: 1 → 2 → 3
| Phase | Plans Complete | Status | Completed | | Phase | Plans Complete | Status | Completed |
|-------|----------------|--------|-----------| |-------|----------------|--------|-----------|
| 1. Foundation | 4/4 | Complete | 2026-03-23 | | 1. Foundation | 4/4 | Complete | 2026-03-23 |
| 2. Agent Features | 5/5 | Complete | 2026-03-23 | | 2. Agent Features | 5/6 | Gap closure | - |
| 3. Operator Experience | 0/2 | Not started | - | | 3. Operator Experience | 0/2 | Not started | - |
--- ---

View File

@@ -0,0 +1,368 @@
---
phase: 02-agent-features
plan: 06
type: execute
wave: 1
depends_on: []
files_modified:
- packages/orchestrator/orchestrator/tasks.py
- packages/orchestrator/orchestrator/agents/builder.py
- tests/unit/test_pipeline_wiring.py
autonomous: true
gap_closure: true
requirements: [AGNT-05, AGNT-06, CHAN-03, CHAN-04]
must_haves:
truths:
- "When a configured escalation rule triggers, the conversation is handed off to a human"
- "A user can send a WhatsApp message and receive a reply (outbound routing works)"
- "WhatsApp messages get business-function scoping in the LLM system prompt (tier 2)"
artifacts:
- path: "packages/orchestrator/orchestrator/tasks.py"
provides: "Escalation wiring and channel-aware outbound routing"
contains: "check_escalation_rules"
- path: "packages/orchestrator/orchestrator/agents/builder.py"
provides: "Tier-2 WhatsApp system prompt scoping"
contains: "allowed_functions"
- path: "tests/unit/test_pipeline_wiring.py"
provides: "Tests verifying escalation and outbound routing in _process_message"
key_links:
- from: "packages/orchestrator/orchestrator/tasks.py"
to: "packages/orchestrator/orchestrator/escalation/handler.py"
via: "import and call check_escalation_rules + escalate_to_human in _process_message"
pattern: "check_escalation_rules|escalate_to_human"
- from: "packages/orchestrator/orchestrator/tasks.py"
to: "_send_response"
via: "Replace direct _update_slack_placeholder calls with _send_response"
pattern: "_send_response\\("
- from: "packages/orchestrator/orchestrator/agents/builder.py"
to: "Agent.tool_assignments"
via: "Append allowed_functions constraint to system prompt when channel is whatsapp"
pattern: "You only handle"
---
<objective>
Re-wire escalation handler and WhatsApp outbound routing into the orchestrator pipeline, and add tier-2 business-function scoping to the system prompt builder.
Purpose: Plans 02-02 and 02-05 rewrote tasks.py and dropped integrations from earlier plans. The escalation handler is orphaned (never called) and WhatsApp replies are silently lost (all responses go to Slack's chat.update). Tier-2 system prompt scoping for WhatsApp was never implemented.
Output: tasks.py calls escalation pre/post-checks and uses _send_response for all outbound; builder.py appends business-function constraint for WhatsApp channel; tests verify both wirings.
</objective>
<execution_context>
@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-agent-features/02-VERIFICATION.md
<interfaces>
<!-- Key types and contracts the executor needs. Extracted from codebase. -->
From packages/orchestrator/orchestrator/escalation/handler.py:
```python
def check_escalation_rules(
agent: Any,
message_text: str,
conversation_metadata: dict[str, Any],
natural_lang_enabled: bool = False,
) -> dict[str, Any] | None:
"""Returns first matching rule dict or None."""
async def escalate_to_human(
tenant_id: str,
agent: Any,
thread_id: str,
trigger_reason: str,
recent_messages: list[dict[str, str]],
assignee_slack_user_id: str,
bot_token: str,
redis: Any,
audit_logger: Any,
user_id: str = "",
agent_id: str = "",
) -> str:
"""Returns user-facing escalation confirmation message."""
```
From packages/orchestrator/orchestrator/tasks.py (current state):
```python
async def _process_message(
msg: KonstructMessage,
placeholder_ts: str = "",
channel_id: str = "",
) -> dict:
"""Lines 194-489. Three _update_slack_placeholder calls at lines 294, 366, 458."""
async def _send_response(channel: str, text: str, extras: dict) -> None:
"""Line 528. Defined but never called. Routes to Slack or WhatsApp."""
def handle_message(self, message_data: dict) -> dict:
"""Line 147. Pops placeholder_ts and channel_id before model_validate.
WhatsApp gateway injects phone_number_id and bot_token into task_payload
but handle_message does NOT pop them — they are lost during model_validate."""
```
From packages/shared/shared/redis_keys.py:
```python
def escalation_status_key(tenant_id: str, thread_id: str) -> str:
```
From packages/shared/shared/models/tenant.py:
```python
class Agent(Base):
escalation_rules: Mapped[list[Any]] # JSON list of rule dicts
escalation_assignee: Mapped[str | None] # Slack user ID
natural_language_escalation: Mapped[bool]
tool_assignments: Mapped[list[Any]] # JSON list — used as allowed_functions proxy
```
From packages/gateway/gateway/channels/whatsapp.py:
```python
# Line 604: task_payload = msg.model_dump() | {"phone_number_id": phone_number_id, "bot_token": access_token or ""}
# WhatsApp gateway injects phone_number_id and bot_token as extra keys in the Celery payload
```
</interfaces>
</context>
<tasks>
<task type="auto" tdd="true">
<name>Task 1: Re-wire escalation and outbound routing in tasks.py</name>
<files>
packages/orchestrator/orchestrator/tasks.py
tests/unit/test_pipeline_wiring.py
</files>
<behavior>
- Test: _process_message calls check_escalation_rules after LLM response and before memory persistence
- Test: When check_escalation_rules returns a matching rule AND agent.escalation_assignee is set, escalate_to_human is called and its return value replaces the LLM response
- Test: When escalation status is "escalated" in Redis (pre-check), _process_message returns assistant-mode reply without calling run_agent
- Test: _process_message uses _send_response (not _update_slack_placeholder directly) for all three response delivery points
- Test: For WhatsApp messages, _send_response receives extras with phone_number_id, bot_token, wa_id
- Test: handle_message pops phone_number_id, bot_token (WhatsApp extras) and wa_id before model_validate and passes them through to _process_message
</behavior>
<action>
**In handle_message (line ~179):**
Add extraction of WhatsApp extras alongside the existing Slack extras:
```python
phone_number_id: str = message_data.pop("phone_number_id", "") or ""
bot_token: str = message_data.pop("bot_token", "") or ""
```
Note: `channel_id` is already popped for Slack. `bot_token` here is the WhatsApp access_token injected by the gateway.
Pass these to _process_message. Change _process_message signature to accept an `extras: dict` parameter instead of individual `placeholder_ts` and `channel_id` params. The extras dict holds all channel-specific metadata:
- For Slack: `{"bot_token": slack_bot_token, "channel_id": channel_id, "placeholder_ts": placeholder_ts}`
- For WhatsApp: `{"phone_number_id": phone_number_id, "bot_token": bot_token, "wa_id": wa_id}`
In handle_message, build the extras dict from popped values:
```python
extras = {
"placeholder_ts": placeholder_ts,
"channel_id": channel_id,
"phone_number_id": phone_number_id,
"bot_token": bot_token,
}
```
Extract `wa_id` from `msg.sender.user_id` after model_validate (since the WhatsApp normalizer sets sender.user_id to the wa_id) and add to extras.
**In _process_message:**
1. Change signature: `async def _process_message(msg, extras: dict | None = None) -> dict`
2. Extract channel-specific values from extras at the top.
3. Replace all three `_update_slack_placeholder(...)` calls (lines 294, 366, 458) with `_send_response(msg.channel, text, extras_with_bot_token)` where extras_with_bot_token merges the Slack bot_token loaded from DB with the incoming extras.
- For the Slack path: the DB-loaded `slack_bot_token` should be added to extras if `msg.channel == "slack"`.
- For WhatsApp: extras already contain `phone_number_id` and `bot_token` from handle_message; add `wa_id` from extras or `msg.sender.user_id`.
4. **Escalation pre-check** (add BEFORE the pending tool confirmation block, after agent is loaded):
```python
# Escalation pre-check: if conversation is already escalated, respond in assistant mode
from shared.redis_keys import escalation_status_key
esc_key = escalation_status_key(msg.tenant_id, msg.thread_id or user_id)
esc_status = await redis_client.get(esc_key)
if esc_status == b"escalated":
assistant_reply = f"I've already connected you with a team member. They'll continue assisting you."
await _send_response(msg.channel, assistant_reply, response_extras)
return {"message_id": msg.id, "response": assistant_reply, "tenant_id": msg.tenant_id}
```
Use a single Redis client created before this block (reuse the one already created for pending_confirm_key). Close it in the finally block.
5. **Escalation post-check** (add AFTER the run_agent call and BEFORE the is_confirmation_request check):
```python
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
# Build conversation metadata from sliding window for rule evaluation
conversation_metadata = _build_conversation_metadata(recent_messages, user_text)
triggered_rule = check_escalation_rules(
agent=agent,
message_text=user_text,
conversation_metadata=conversation_metadata,
natural_lang_enabled=getattr(agent, "natural_language_escalation", False),
)
if triggered_rule and getattr(agent, "escalation_assignee", None):
escalation_redis = aioredis.from_url(settings.redis_url)
try:
response_text = await escalate_to_human(
tenant_id=msg.tenant_id,
agent=agent,
thread_id=msg.thread_id or user_id,
trigger_reason=triggered_rule.get("condition", "rule triggered"),
recent_messages=recent_messages,
assignee_slack_user_id=agent.escalation_assignee,
bot_token=slack_bot_token,
redis=escalation_redis,
audit_logger=audit_logger,
user_id=user_id,
agent_id=agent_id_str,
)
finally:
await escalation_redis.aclose()
```
6. **Add _build_conversation_metadata helper** (new function):
```python
def _build_conversation_metadata(recent_messages: list[dict], current_text: str) -> dict[str, Any]:
"""Build conversation metadata dict for escalation rule evaluation.
Scans recent messages for billing keywords and counts attempts.
"""
billing_keywords = {"billing", "invoice", "charge", "refund", "payment", "subscription"}
all_texts = [m.get("content", "") for m in recent_messages] + [current_text]
billing_count = sum(1 for t in all_texts if any(kw in t.lower() for kw in billing_keywords))
return {
"billing_dispute": billing_count > 0,
"attempts": billing_count,
}
```
This matches the v1 keyword-based metadata detection described in STATE.md decisions.
**Important constraints:**
- Celery tasks MUST remain sync def with asyncio.run() — never async def
- Import escalation functions inside _process_message (local import, matching existing pattern)
- Use `aioredis.from_url(settings.redis_url)` for new Redis clients (matching existing pattern in tasks.py)
- The Slack DB bot_token loading (lines 269-281) must be preserved — it's needed for escalation DM delivery even on WhatsApp messages
</action>
<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v</automated>
</verify>
<done>
- handle_message pops phone_number_id, bot_token from message_data before model_validate
- _process_message accepts extras dict and uses _send_response for ALL outbound delivery
- Escalation pre-check: already-escalated conversations get assistant-mode reply without LLM call
- Escalation post-check: check_escalation_rules called after LLM response; escalate_to_human called when rule matches and assignee is configured
- _build_conversation_metadata extracts billing keywords from sliding window
- All existing functionality preserved (memory pipeline, tool confirmation, audit logging)
</done>
</task>
<task type="auto" tdd="true">
<name>Task 2: Add tier-2 WhatsApp business-function scoping to system prompt builder</name>
<files>
packages/orchestrator/orchestrator/agents/builder.py
tests/unit/test_pipeline_wiring.py
</files>
<behavior>
- Test: build_system_prompt(agent, channel="whatsapp") appends "You only handle: {topics}" when agent.tool_assignments is non-empty
- Test: build_system_prompt(agent, channel="slack") does NOT append business-function scoping
- Test: build_system_prompt(agent, channel="whatsapp") with empty tool_assignments does NOT append scoping
- Test: build_messages_with_memory passes channel through to build_system_prompt
</behavior>
<action>
**In builder.py build_system_prompt:**
Add an optional `channel: str = ""` parameter:
```python
def build_system_prompt(agent: Agent, channel: str = "") -> str:
```
After the AI transparency clause (step 4), add step 5 — WhatsApp business-function scoping:
```python
# 5. WhatsApp tier-2 scoping — constrain LLM to declared business functions
if channel == "whatsapp":
functions: list[str] = getattr(agent, "tool_assignments", []) or []
if functions:
topics = ", ".join(functions)
parts.append(
f"You are responding on WhatsApp. You only handle: {topics}. "
f"If the user asks about something outside these topics, "
f"politely redirect them to the allowed topics."
)
```
**In builder.py build_messages_with_memory:**
Add optional `channel: str = ""` parameter and pass through:
```python
def build_messages_with_memory(agent, current_message, recent_messages, relevant_context, channel: str = "") -> list[dict]:
system_prompt = build_system_prompt(agent, channel=channel)
...
```
**In builder.py build_messages_with_media:**
Same change — add `channel: str = ""` parameter and pass to build_messages_with_memory.
**In tasks.py _process_message:**
Pass `msg.channel` to `build_messages_with_memory`:
```python
enriched_messages = build_messages_with_memory(
agent=agent,
current_message=user_text,
recent_messages=recent_messages,
relevant_context=relevant_context,
channel=msg.channel,
)
```
And similarly for the build_messages_with_media call if present.
Add tests for tier-2 scoping to the same test_pipeline_wiring.py file created in Task 1.
</action>
<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v</automated>
</verify>
<done>
- build_system_prompt appends business-function scoping when channel == "whatsapp" and tool_assignments is non-empty
- build_system_prompt does NOT append scoping for Slack or when tool_assignments is empty
- build_messages_with_memory and build_messages_with_media pass channel through
- _process_message passes msg.channel to builder functions
</done>
</task>
</tasks>
<verification>
After both tasks complete, run the full verification:
```bash
# Unit tests for new wiring
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v
# Existing escalation tests still pass
python -m pytest tests/unit/test_escalation.py -x -v
# Existing WhatsApp tests still pass
python -m pytest tests/unit/test_whatsapp_scoping.py tests/unit/test_whatsapp_normalize.py tests/unit/test_whatsapp_verify.py -x -v
# Grep verification: escalation is wired
grep -n "check_escalation_rules\|escalate_to_human" packages/orchestrator/orchestrator/tasks.py
# Grep verification: _send_response is called (not _update_slack_placeholder directly in _process_message)
grep -n "_send_response\|_update_slack_placeholder" packages/orchestrator/orchestrator/tasks.py
# Grep verification: tier-2 scoping exists
grep -n "You only handle" packages/orchestrator/orchestrator/agents/builder.py
```
</verification>
<success_criteria>
1. `check_escalation_rules` and `escalate_to_human` are imported and called in `_process_message`
2. `_send_response` is called at all response delivery points in `_process_message` (no direct `_update_slack_placeholder` calls remain in that function)
3. `build_system_prompt` appends business-function scoping for WhatsApp channel
4. All existing unit tests pass
5. New wiring tests pass
</success_criteria>
<output>
After completion, create `.planning/phases/02-agent-features/02-06-SUMMARY.md`
</output>