| phase | plan | type | wave | depends_on | files_modified | autonomous | gap_closure | requirements | must_haves |
|---|---|---|---|---|---|---|---|---|---|
| 02-agent-features | 06 | execute | 1 | | | true | true | | |
Purpose: Plans 02-02 and 02-05 rewrote tasks.py and dropped integrations from earlier plans. The escalation handler is orphaned (never called) and WhatsApp replies are silently lost (all responses go to Slack's chat.update). Tier-2 system prompt scoping for WhatsApp was never implemented.
Output: tasks.py calls escalation pre/post-checks and uses _send_response for all outbound; builder.py appends business-function constraint for WhatsApp channel; tests verify both wirings.
<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/02-agent-features/02-VERIFICATION.md
From packages/orchestrator/orchestrator/escalation/handler.py:
def check_escalation_rules(
    agent: Any,
    message_text: str,
    conversation_metadata: dict[str, Any],
    natural_lang_enabled: bool = False,
) -> dict[str, Any] | None:
    """Returns first matching rule dict or None."""

async def escalate_to_human(
    tenant_id: str,
    agent: Any,
    thread_id: str,
    trigger_reason: str,
    recent_messages: list[dict[str, str]],
    assignee_slack_user_id: str,
    bot_token: str,
    redis: Any,
    audit_logger: Any,
    user_id: str = "",
    agent_id: str = "",
) -> str:
    """Returns user-facing escalation confirmation message."""
From packages/orchestrator/orchestrator/tasks.py (current state):
async def _process_message(
    msg: KonstructMessage,
    placeholder_ts: str = "",
    channel_id: str = "",
) -> dict:
    """Lines 194-489. Three _update_slack_placeholder calls at lines 294, 366, 458."""

async def _send_response(channel: str, text: str, extras: dict) -> None:
    """Line 528. Defined but never called. Routes to Slack or WhatsApp."""

def handle_message(self, message_data: dict) -> dict:
    """Line 147. Pops placeholder_ts and channel_id before model_validate.
    WhatsApp gateway injects phone_number_id and bot_token into task_payload
    but handle_message does NOT pop them, so they are lost during model_validate."""
From packages/shared/shared/redis_keys.py:
def escalation_status_key(tenant_id: str, thread_id: str) -> str:
From packages/shared/shared/models/tenant.py:
class Agent(Base):
    escalation_rules: Mapped[list[Any]]  # JSON list of rule dicts
    escalation_assignee: Mapped[str | None]  # Slack user ID
    natural_language_escalation: Mapped[bool]
    tool_assignments: Mapped[list[Any]]  # JSON list, used as allowed_functions proxy
From packages/gateway/gateway/channels/whatsapp.py:
# Line 604: task_payload = msg.model_dump() | {"phone_number_id": phone_number_id, "bot_token": access_token or ""}
# WhatsApp gateway injects phone_number_id and bot_token as extra keys in the Celery payload
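In handle_message, pop `phone_number_id` and `bot_token` from message_data before model_validate (alongside the existing `placeholder_ts` and `channel_id` pops) and pass them to _process_message. Change the _process_message signature to accept a single `extras: dict` parameter instead of the individual `placeholder_ts` and `channel_id` params. The extras dict holds all channel-specific metadata: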
- For Slack: `{"bot_token": slack_bot_token, "channel_id": channel_id, "placeholder_ts": placeholder_ts}`
- For WhatsApp: `{"phone_number_id": phone_number_id, "bot_token": bot_token, "wa_id": wa_id}`
In handle_message, build the extras dict from popped values:
```python
extras = {
    "placeholder_ts": placeholder_ts,
    "channel_id": channel_id,
    "phone_number_id": phone_number_id,
    "bot_token": bot_token,
}
```
Extract `wa_id` from `msg.sender.user_id` after model_validate (since the WhatsApp normalizer sets sender.user_id to the wa_id) and add to extras.
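The pop-then-validate step can be sketched as follows. This is a minimal illustration that assumes `message_data` is a plain dict; `split_channel_extras` and `CHANNEL_KEYS` are hypothetical names and do not exist in tasks.py.

```python
from typing import Any

# Channel-specific keys that the gateways add to the Celery payload (per this plan).
CHANNEL_KEYS = ("placeholder_ts", "channel_id", "phone_number_id", "bot_token")


def split_channel_extras(
    message_data: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, str]]:
    """Separate channel metadata from the fields that belong to the message model.

    Returns (payload, extras). The input dict is copied, not mutated.
    """
    payload = dict(message_data)
    # Missing keys default to "" so the Slack and WhatsApp paths share one shape.
    extras = {key: str(payload.pop(key, "") or "") for key in CHANNEL_KEYS}
    return payload, extras


payload, extras = split_channel_extras(
    {"id": "m1", "channel": "whatsapp", "phone_number_id": "123", "bot_token": "tok"}
)
```

Here `payload` is what would be handed to model_validate, and `extras` keeps the values that would otherwise be dropped.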
**In _process_message:**
1. Change signature: `async def _process_message(msg, extras: dict | None = None) -> dict`
2. Extract channel-specific values from extras at the top.
3. Replace all three `_update_slack_placeholder(...)` calls (lines 294, 366, 458) with `_send_response(msg.channel, text, response_extras)`, where `response_extras` is the incoming extras merged with the Slack bot_token loaded from the DB (the escalation pre-check snippet in step 4 uses the same name).
- For the Slack path: the DB-loaded `slack_bot_token` should be added to extras if `msg.channel == "slack"`.
- For WhatsApp: extras already contain `phone_number_id` and `bot_token` from handle_message; add `wa_id` from extras or `msg.sender.user_id`.
4. **Escalation pre-check** (add BEFORE the pending tool confirmation block, after agent is loaded):
```python
# Escalation pre-check: if conversation is already escalated, respond in assistant mode
from shared.redis_keys import escalation_status_key

esc_key = escalation_status_key(msg.tenant_id, msg.thread_id or user_id)
esc_status = await redis_client.get(esc_key)
if esc_status == b"escalated":
    # Plain string: no placeholders, so no f-prefix is needed
    assistant_reply = "I've already connected you with a team member. They'll continue assisting you."
    await _send_response(msg.channel, assistant_reply, response_extras)
    return {"message_id": msg.id, "response": assistant_reply, "tenant_id": msg.tenant_id}
```
Use a single Redis client created before this block (reuse the one already created for pending_confirm_key). Close it in the finally block.
5. **Escalation post-check** (add AFTER the run_agent call and BEFORE the is_confirmation_request check):
```python
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human

# Build conversation metadata from sliding window for rule evaluation
conversation_metadata = _build_conversation_metadata(recent_messages, user_text)
triggered_rule = check_escalation_rules(
    agent=agent,
    message_text=user_text,
    conversation_metadata=conversation_metadata,
    natural_lang_enabled=getattr(agent, "natural_language_escalation", False),
)
if triggered_rule and getattr(agent, "escalation_assignee", None):
    escalation_redis = aioredis.from_url(settings.redis_url)
    try:
        response_text = await escalate_to_human(
            tenant_id=msg.tenant_id,
            agent=agent,
            thread_id=msg.thread_id or user_id,
            trigger_reason=triggered_rule.get("condition", "rule triggered"),
            recent_messages=recent_messages,
            assignee_slack_user_id=agent.escalation_assignee,
            bot_token=slack_bot_token,
            redis=escalation_redis,
            audit_logger=audit_logger,
            user_id=user_id,
            agent_id=agent_id_str,
        )
    finally:
        await escalation_redis.aclose()
```
6. **Add _build_conversation_metadata helper** (new function):
```python
def _build_conversation_metadata(recent_messages: list[dict], current_text: str) -> dict[str, Any]:
    """Build conversation metadata dict for escalation rule evaluation.

    Scans recent messages for billing keywords and counts attempts.
    """
    billing_keywords = {"billing", "invoice", "charge", "refund", "payment", "subscription"}
    all_texts = [m.get("content", "") for m in recent_messages] + [current_text]
    billing_count = sum(1 for t in all_texts if any(kw in t.lower() for kw in billing_keywords))
    return {
        "billing_dispute": billing_count > 0,
        "attempts": billing_count,
    }
```
This matches the v1 keyword-based metadata detection described in STATE.md decisions.
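A worked example may help when writing the unit tests for this helper. The sketch below repeats the same keyword logic under a public name so it runs on its own; the sample conversation is invented for illustration.

```python
from typing import Any

BILLING_KEYWORDS = {"billing", "invoice", "charge", "refund", "payment", "subscription"}


def build_conversation_metadata(recent_messages: list[dict], current_text: str) -> dict[str, Any]:
    """Same keyword scan as the plan's helper, repeated so the example is self-contained."""
    all_texts = [m.get("content", "") for m in recent_messages] + [current_text]
    # Count messages (not keyword occurrences) that mention any billing keyword.
    billing_count = sum(
        1 for t in all_texts if any(kw in t.lower() for kw in BILLING_KEYWORDS)
    )
    return {"billing_dispute": billing_count > 0, "attempts": billing_count}


history = [
    {"role": "user", "content": "I was charged twice this month"},
    {"role": "assistant", "content": "Let me look into that for you."},
    {"role": "user", "content": "I still need a refund"},
]
metadata = build_conversation_metadata(history, "Where is my Refund?")

# Substring matching also fires on unrelated words that contain a keyword.
false_positive = build_conversation_metadata([], "How do I recharge the battery?")
```

With this input, `metadata` is `{"billing_dispute": True, "attempts": 3}`. Two properties are worth covering in tests: assistant messages are counted as well as user messages, and because matching is by substring, "recharge" counts as a billing mention.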
**Important constraints:**
- Celery tasks MUST remain sync def with asyncio.run() — never async def
- Import escalation functions inside _process_message (local import, matching existing pattern)
- Use `aioredis.from_url(settings.redis_url)` for new Redis clients (matching existing pattern in tasks.py)
- The Slack DB bot_token loading (lines 269-281) must be preserved — it's needed for escalation DM delivery even on WhatsApp messages
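The sync-wrapper constraint can be illustrated with a small sketch. The Celery decorator is deliberately left out so the example has no third-party imports, and `_process` is a hypothetical stand-in for the real async pipeline.

```python
import asyncio


async def _process(message_data: dict) -> dict:
    """Stand-in for the async pipeline; the real one awaits Redis, the DB, and the LLM."""
    await asyncio.sleep(0)  # placeholder for real awaits
    return {"message_id": message_data["id"], "response": "ok"}


def handle_message(message_data: dict) -> dict:
    """Sync entry point, shaped like the Celery task body.

    asyncio.run creates a fresh event loop for each call and closes it on return.
    """
    return asyncio.run(_process(message_data))


result = handle_message({"id": "m1"})
```

Because each call gets its own event loop, async clients such as the Redis connection are probably best created and closed inside the coroutine rather than shared across task invocations, which is consistent with the `aioredis.from_url(...)` plus `aclose()` pattern used in this plan.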
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v
- handle_message pops phone_number_id, bot_token from message_data before model_validate
- _process_message accepts extras dict and uses _send_response for ALL outbound delivery
- Escalation pre-check: already-escalated conversations get assistant-mode reply without LLM call
- Escalation post-check: check_escalation_rules called after LLM response; escalate_to_human called when rule matches and assignee is configured
- _build_conversation_metadata extracts billing keywords from sliding window
- All existing functionality preserved (memory pipeline, tool confirmation, audit logging)
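The pre-check criterion above can be exercised without a live Redis. The following is a sketch only: `FakeRedis`, `precheck`, and the key format returned by this local `escalation_status_key` are hypothetical stand-ins, and the `sent` list takes the place of `_send_response`.

```python
import asyncio
from typing import Optional


class FakeRedis:
    """In-memory stand-in exposing only the async get() used by the pre-check."""

    def __init__(self, data: dict[str, bytes]) -> None:
        self._data = data

    async def get(self, key: str) -> Optional[bytes]:
        return self._data.get(key)


def escalation_status_key(tenant_id: str, thread_id: str) -> str:
    # Hypothetical key format; the real one lives in shared/redis_keys.py.
    return f"{tenant_id}:escalation:{thread_id}"


async def precheck(redis, tenant_id: str, thread_id: str, sent: list[str]) -> Optional[str]:
    """Return the canned reply if the thread is escalated, else None (continue to the LLM)."""
    status = await redis.get(escalation_status_key(tenant_id, thread_id))
    if status == b"escalated":
        reply = "I've already connected you with a team member. They'll continue assisting you."
        sent.append(reply)  # stands in for _send_response
        return reply
    return None


sent: list[str] = []
redis = FakeRedis({escalation_status_key("t1", "thread-1"): b"escalated"})
escalated = asyncio.run(precheck(redis, "t1", "thread-1", sent))
normal = asyncio.run(precheck(redis, "t1", "thread-2", sent))
```

The comparison against `b"escalated"` assumes the client returns bytes. If the Redis client were created with `decode_responses=True`, `get()` would return a `str` and the check would never match, so the test fixture should mirror whichever mode tasks.py actually uses.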
Task 2: Add tier-2 WhatsApp business-function scoping to system prompt builder
packages/orchestrator/orchestrator/agents/builder.py
tests/unit/test_pipeline_wiring.py
- Test: build_system_prompt(agent, channel="whatsapp") appends "You only handle: {topics}" when agent.tool_assignments is non-empty
- Test: build_system_prompt(agent, channel="slack") does NOT append business-function scoping
- Test: build_system_prompt(agent, channel="whatsapp") with empty tool_assignments does NOT append scoping
- Test: build_messages_with_memory passes channel through to build_system_prompt
**In builder.py build_system_prompt:**
Add an optional `channel: str = ""` parameter:
```python
def build_system_prompt(agent: Agent, channel: str = "") -> str:
```
After the AI transparency clause (step 4), add step 5 — WhatsApp business-function scoping:
```python
# 5. WhatsApp tier-2 scoping — constrain LLM to declared business functions
if channel == "whatsapp":
    functions: list[str] = getattr(agent, "tool_assignments", []) or []
    if functions:
        topics = ", ".join(functions)
        parts.append(
            f"You are responding on WhatsApp. You only handle: {topics}. "
            "If the user asks about something outside these topics, "
            "politely redirect them to the allowed topics."
        )
```
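The three scoping cases from the test list can be sketched against a reduced prompt builder. This is an illustration under assumptions: the `system_prompt` attribute, the `"\n\n"` join, and the `SimpleNamespace` agents are hypothetical, and the real builder has earlier steps that are omitted here.

```python
from types import SimpleNamespace


def build_system_prompt(agent, channel: str = "") -> str:
    """Reduced builder: base prompt plus the WhatsApp scoping step only."""
    parts = [agent.system_prompt]
    if channel == "whatsapp":
        functions = getattr(agent, "tool_assignments", []) or []
        if functions:
            # str() guards against non-string entries, since the column is list[Any].
            topics = ", ".join(str(f) for f in functions)
            parts.append(
                f"You are responding on WhatsApp. You only handle: {topics}. "
                "If the user asks about something outside these topics, "
                "politely redirect them to the allowed topics."
            )
    return "\n\n".join(parts)


agent = SimpleNamespace(system_prompt="You are Mara.", tool_assignments=["orders", "returns"])
whatsapp_prompt = build_system_prompt(agent, channel="whatsapp")
slack_prompt = build_system_prompt(agent, channel="slack")
empty_prompt = build_system_prompt(
    SimpleNamespace(system_prompt="You are Mara.", tool_assignments=[]),
    channel="whatsapp",
)
```

Only `whatsapp_prompt` gains the scoping sentence; the Slack prompt and the empty-assignments prompt are returned unchanged. The `str()` conversion is an optional hardening, since `tool_assignments` is typed as `list[Any]` and a non-string entry would otherwise make `", ".join(...)` raise `TypeError`.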
**In builder.py build_messages_with_memory:**
Add optional `channel: str = ""` parameter and pass through:
```python
def build_messages_with_memory(agent, current_message, recent_messages, relevant_context, channel: str = "") -> list[dict]:
    system_prompt = build_system_prompt(agent, channel=channel)
    ...
```
**In builder.py build_messages_with_media:**
Same change — add `channel: str = ""` parameter and pass to build_messages_with_memory.
**In tasks.py _process_message:**
Pass `msg.channel` to `build_messages_with_memory`:
```python
enriched_messages = build_messages_with_memory(
    agent=agent,
    current_message=user_text,
    recent_messages=recent_messages,
    relevant_context=relevant_context,
    channel=msg.channel,
)
```
And similarly for the build_messages_with_media call if present.
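The pass-through chain across the builder functions can be checked end to end with stubs. This is a sketch, not the real builder: the actual signature of `build_messages_with_media` is not shown in this plan, so its `media` parameter is hypothetical, and the stub `build_system_prompt` only records the channel it receives.

```python
from types import SimpleNamespace


def build_system_prompt(agent, channel: str = "") -> str:
    # Stub: embeds the received channel so the pass-through is observable.
    return f"{agent.system_prompt} [channel={channel or 'none'}]"


def build_messages_with_memory(
    agent, current_message, recent_messages, relevant_context, channel: str = ""
) -> list[dict]:
    system_prompt = build_system_prompt(agent, channel=channel)
    return [
        {"role": "system", "content": system_prompt},
        *recent_messages,
        {"role": "user", "content": current_message},
    ]


def build_messages_with_media(
    agent, current_message, recent_messages, relevant_context, media=None, channel: str = ""
) -> list[dict]:
    # `media` is a hypothetical parameter; only the channel hand-off matters here.
    return build_messages_with_memory(
        agent, current_message, recent_messages, relevant_context, channel=channel
    )


agent = SimpleNamespace(system_prompt="You are Mara.")
scoped = build_messages_with_media(agent, "hi", [], [], channel="whatsapp")
default = build_messages_with_memory(agent, "hi", [], [])
```

Keeping `channel` as a keyword argument with an empty-string default means existing callers that do not pass it keep their current behaviour.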
Add tests for tier-2 scoping to the same test_pipeline_wiring.py file created in Task 1.
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v
- build_system_prompt appends business-function scoping when channel == "whatsapp" and tool_assignments is non-empty
- build_system_prompt does NOT append scoping for Slack or when tool_assignments is empty
- build_messages_with_memory and build_messages_with_media pass channel through
- _process_message passes msg.channel to builder functions
After both tasks complete, run the full verification:
# Unit tests for new wiring
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v
# Existing escalation tests still pass
python -m pytest tests/unit/test_escalation.py -x -v
# Existing WhatsApp tests still pass
python -m pytest tests/unit/test_whatsapp_scoping.py tests/unit/test_whatsapp_normalize.py tests/unit/test_whatsapp_verify.py -x -v
# Grep verification: escalation is wired
grep -n "check_escalation_rules\|escalate_to_human" packages/orchestrator/orchestrator/tasks.py
# Grep verification: _send_response is called (not _update_slack_placeholder directly in _process_message)
grep -n "_send_response\|_update_slack_placeholder" packages/orchestrator/orchestrator/tasks.py
# Grep verification: tier-2 scoping exists
grep -n "You only handle" packages/orchestrator/orchestrator/agents/builder.py
<success_criteria>
- `check_escalation_rules` and `escalate_to_human` are imported and called in `_process_message`
- `_send_response` is called at all response delivery points in `_process_message` (no direct `_update_slack_placeholder` calls remain in that function)
- `build_system_prompt` appends business-function scoping for WhatsApp channel
- All existing unit tests pass
- New wiring tests pass </success_criteria>