---
phase: 02-agent-features
plan: 06
type: execute
wave: 1
depends_on: []
files_modified:
  - packages/orchestrator/orchestrator/tasks.py
  - packages/orchestrator/orchestrator/agents/builder.py
  - tests/unit/test_pipeline_wiring.py
autonomous: true
gap_closure: true
requirements: [AGNT-05, AGNT-06, CHAN-03, CHAN-04]
must_haves:
  truths:
    - "When a configured escalation rule triggers, the conversation is handed off to a human"
    - "A user can send a WhatsApp message and receive a reply (outbound routing works)"
    - "WhatsApp messages get business-function scoping in the LLM system prompt (tier 2)"
  artifacts:
    - path: "packages/orchestrator/orchestrator/tasks.py"
      provides: "Escalation wiring and channel-aware outbound routing"
      contains: "check_escalation_rules"
    - path: "packages/orchestrator/orchestrator/agents/builder.py"
      provides: "Tier-2 WhatsApp system prompt scoping"
      contains: "allowed_functions"
    - path: "tests/unit/test_pipeline_wiring.py"
      provides: "Tests verifying escalation and outbound routing in _process_message"
  key_links:
    - from: "packages/orchestrator/orchestrator/tasks.py"
      to: "packages/orchestrator/orchestrator/escalation/handler.py"
      via: "import and call check_escalation_rules + escalate_to_human in _process_message"
      pattern: "check_escalation_rules|escalate_to_human"
    - from: "packages/orchestrator/orchestrator/tasks.py"
      to: "_send_response"
      via: "Replace direct _update_slack_placeholder calls with _send_response"
      pattern: "_send_response\\("
    - from: "packages/orchestrator/orchestrator/agents/builder.py"
      to: "Agent.tool_assignments"
      via: "Append allowed_functions constraint to system prompt when channel is whatsapp"
      pattern: "You only handle"
---

Re-wire escalation handler and WhatsApp outbound routing into the orchestrator pipeline, and add tier-2 business-function scoping to the system prompt builder.

Purpose: Plans 02-02 and 02-05 rewrote tasks.py and dropped integrations from earlier plans.
The escalation handler is orphaned (never called) and WhatsApp replies are silently lost (all responses go to Slack's chat.update). Tier-2 system prompt scoping for WhatsApp was never implemented.

Output: tasks.py calls escalation pre/post-checks and uses _send_response for all outbound; builder.py appends business-function constraint for WhatsApp channel; tests verify both wirings.

@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md

@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/02-agent-features/02-VERIFICATION.md

From packages/orchestrator/orchestrator/escalation/handler.py:
```python
def check_escalation_rules(
    agent: Any,
    message_text: str,
    conversation_metadata: dict[str, Any],
    natural_lang_enabled: bool = False,
) -> dict[str, Any] | None:
    """Returns first matching rule dict or None."""

async def escalate_to_human(
    tenant_id: str,
    agent: Any,
    thread_id: str,
    trigger_reason: str,
    recent_messages: list[dict[str, str]],
    assignee_slack_user_id: str,
    bot_token: str,
    redis: Any,
    audit_logger: Any,
    user_id: str = "",
    agent_id: str = "",
) -> str:
    """Returns user-facing escalation confirmation message."""
```

From packages/orchestrator/orchestrator/tasks.py (current state):
```python
async def _process_message(
    msg: KonstructMessage,
    placeholder_ts: str = "",
    channel_id: str = "",
) -> dict:
    """Lines 194-489. Three _update_slack_placeholder calls at lines 294, 366, 458."""

async def _send_response(channel: str, text: str, extras: dict) -> None:
    """Line 528. Defined but never called. Routes to Slack or WhatsApp."""

def handle_message(self, message_data: dict) -> dict:
    """Line 147. Pops placeholder_ts and channel_id before model_validate.
    WhatsApp gateway injects phone_number_id and bot_token into task_payload but
    handle_message does NOT pop them; they are lost during model_validate."""
```

From packages/shared/shared/redis_keys.py:
```python
def escalation_status_key(tenant_id: str, thread_id: str) -> str:
    ...
```

From packages/shared/shared/models/tenant.py:
```python
class Agent(Base):
    escalation_rules: Mapped[list[Any]]  # JSON list of rule dicts
    escalation_assignee: Mapped[str | None]  # Slack user ID
    natural_language_escalation: Mapped[bool]
    tool_assignments: Mapped[list[Any]]  # JSON list, used as allowed_functions proxy
```

From packages/gateway/gateway/channels/whatsapp.py:
```python
# Line 604:
task_payload = msg.model_dump() | {"phone_number_id": phone_number_id, "bot_token": access_token or ""}
# WhatsApp gateway injects phone_number_id and bot_token as extra keys in the Celery payload
```

Task 1: Re-wire escalation and outbound routing in tasks.py

packages/orchestrator/orchestrator/tasks.py
tests/unit/test_pipeline_wiring.py

- Test: _process_message calls check_escalation_rules after LLM response and before memory persistence
- Test: When check_escalation_rules returns a matching rule AND agent.escalation_assignee is set, escalate_to_human is called and its return value replaces the LLM response
- Test: When escalation status is "escalated" in Redis (pre-check), _process_message returns assistant-mode reply without calling run_agent
- Test: _process_message uses _send_response (not _update_slack_placeholder directly) for all three response delivery points
- Test: For WhatsApp messages, _send_response receives extras with phone_number_id, bot_token, wa_id
- Test: handle_message pops phone_number_id and bot_token (WhatsApp extras) before model_validate, derives wa_id from msg.sender.user_id after model_validate, and passes all three through to _process_message

**In handle_message (line ~179):**

Add extraction of WhatsApp extras alongside the existing Slack extras:
```python
phone_number_id: str = message_data.pop("phone_number_id", "") or ""
bot_token: str = message_data.pop("bot_token", "") or ""
```

Note: `channel_id` is already popped for Slack. `bot_token` here is the WhatsApp access_token injected by the gateway. Pass these to _process_message.

Change _process_message signature to accept an `extras: dict` parameter instead of individual `placeholder_ts` and `channel_id` params. The extras dict holds all channel-specific metadata:
- For Slack: `{"bot_token": slack_bot_token, "channel_id": channel_id, "placeholder_ts": placeholder_ts}`
- For WhatsApp: `{"phone_number_id": phone_number_id, "bot_token": bot_token, "wa_id": wa_id}`

In handle_message, build the extras dict from popped values:
```python
extras = {
    "placeholder_ts": placeholder_ts,
    "channel_id": channel_id,
    "phone_number_id": phone_number_id,
    "bot_token": bot_token,
}
```

Extract `wa_id` from `msg.sender.user_id` after model_validate (since the WhatsApp normalizer sets sender.user_id to the wa_id) and add to extras.

**In _process_message:**

1. Change signature: `async def _process_message(msg, extras: dict | None = None) -> dict`
2. Extract channel-specific values from extras at the top.
3. Replace all three `_update_slack_placeholder(...)` calls (lines 294, 366, 458) with `_send_response(msg.channel, text, response_extras)` where `response_extras` merges the Slack bot_token loaded from DB with the incoming extras.
   - For the Slack path: the DB-loaded `slack_bot_token` should be added to extras if `msg.channel == "slack"`.
   - For WhatsApp: extras already contain `phone_number_id` and `bot_token` from handle_message; add `wa_id` from extras or `msg.sender.user_id`.
4. 
**Escalation pre-check** (add BEFORE the pending tool confirmation block, after agent is loaded):
```python
# Escalation pre-check: if conversation is already escalated, respond in assistant mode
from shared.redis_keys import escalation_status_key

esc_key = escalation_status_key(msg.tenant_id, msg.thread_id or user_id)
esc_status = await redis_client.get(esc_key)
# Compared as bytes: the client is created without decode_responses
if esc_status == b"escalated":
    assistant_reply = "I've already connected you with a team member. They'll continue assisting you."
    await _send_response(msg.channel, assistant_reply, response_extras)
    return {"message_id": msg.id, "response": assistant_reply, "tenant_id": msg.tenant_id}
```

Use a single Redis client created before this block (reuse the one already created for pending_confirm_key). Close it in the finally block.

5. **Escalation post-check** (add AFTER the run_agent call and BEFORE the is_confirmation_request check):
```python
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human

# Build conversation metadata from sliding window for rule evaluation
conversation_metadata = _build_conversation_metadata(recent_messages, user_text)
triggered_rule = check_escalation_rules(
    agent=agent,
    message_text=user_text,
    conversation_metadata=conversation_metadata,
    natural_lang_enabled=getattr(agent, "natural_language_escalation", False),
)
# Escalate only when a rule matched AND an assignee is configured
if triggered_rule and getattr(agent, "escalation_assignee", None):
    escalation_redis = aioredis.from_url(settings.redis_url)
    try:
        # The escalation confirmation replaces the LLM response
        response_text = await escalate_to_human(
            tenant_id=msg.tenant_id,
            agent=agent,
            thread_id=msg.thread_id or user_id,
            trigger_reason=triggered_rule.get("condition", "rule triggered"),
            recent_messages=recent_messages,
            assignee_slack_user_id=agent.escalation_assignee,
            bot_token=slack_bot_token,
            redis=escalation_redis,
            audit_logger=audit_logger,
            user_id=user_id,
            agent_id=agent_id_str,
        )
    finally:
        await escalation_redis.aclose()
```

6. 
**Add _build_conversation_metadata helper** (new function):
```python
def _build_conversation_metadata(recent_messages: list[dict], current_text: str) -> dict[str, Any]:
    """Build conversation metadata dict for escalation rule evaluation.

    Scans recent messages for billing keywords and counts attempts.
    """
    billing_keywords = {"billing", "invoice", "charge", "refund", "payment", "subscription"}
    all_texts = [m.get("content", "") for m in recent_messages] + [current_text]
    billing_count = sum(1 for t in all_texts if any(kw in t.lower() for kw in billing_keywords))
    return {
        "billing_dispute": billing_count > 0,
        "attempts": billing_count,
    }
```

This matches the v1 keyword-based metadata detection described in STATE.md decisions.

**Important constraints:**
- Celery tasks MUST remain sync def with asyncio.run(); never async def
- Import escalation functions inside _process_message (local import, matching existing pattern)
- Use `aioredis.from_url(settings.redis_url)` for new Redis clients (matching existing pattern in tasks.py)
- The Slack DB bot_token loading (lines 269-281) must be preserved; it is needed for escalation DM delivery even on WhatsApp messages

cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v

- handle_message pops phone_number_id, bot_token from message_data before model_validate
- _process_message accepts extras dict and uses _send_response for ALL outbound delivery
- Escalation pre-check: already-escalated conversations get assistant-mode reply without LLM call
- Escalation post-check: check_escalation_rules called after LLM response; escalate_to_human called when rule matches and assignee is configured
- _build_conversation_metadata extracts billing keywords from sliding window
- All existing functionality preserved (memory pipeline, tool confirmation, audit logging)

Task 2: Add tier-2 WhatsApp business-function scoping to system prompt builder

packages/orchestrator/orchestrator/agents/builder.py
tests/unit/test_pipeline_wiring.py

- Test: build_system_prompt(agent, channel="whatsapp") appends "You only handle: {topics}" when agent.tool_assignments is non-empty
- Test: build_system_prompt(agent, channel="slack") does NOT append business-function scoping
- Test: build_system_prompt(agent, channel="whatsapp") with empty tool_assignments does NOT append scoping
- Test: build_messages_with_memory passes channel through to build_system_prompt

**In builder.py build_system_prompt:**

Add an optional `channel: str = ""` parameter:
```python
def build_system_prompt(agent: Agent, channel: str = "") -> str:
```

After the AI transparency clause (step 4), add step 5, WhatsApp business-function scoping:
```python
# 5. WhatsApp tier-2 scoping: constrain LLM to declared business functions
if channel == "whatsapp":
    functions: list[str] = getattr(agent, "tool_assignments", []) or []
    if functions:
        topics = ", ".join(functions)
        parts.append(
            f"You are responding on WhatsApp. You only handle: {topics}. "
            "If the user asks about something outside these topics, "
            "politely redirect them to the allowed topics."
        )
```

**In builder.py build_messages_with_memory:**

Add optional `channel: str = ""` parameter and pass through:
```python
def build_messages_with_memory(agent, current_message, recent_messages, relevant_context, channel: str = "") -> list[dict]:
    system_prompt = build_system_prompt(agent, channel=channel)
    ...
```

**In builder.py build_messages_with_media:**

Same change: add `channel: str = ""` parameter and pass to build_messages_with_memory.

**In tasks.py _process_message:**

Pass `msg.channel` to `build_messages_with_memory`:
```python
enriched_messages = build_messages_with_memory(
    agent=agent,
    current_message=user_text,
    recent_messages=recent_messages,
    relevant_context=relevant_context,
    channel=msg.channel,
)
```

And similarly for the build_messages_with_media call if present.

Add tests for tier-2 scoping to the same test_pipeline_wiring.py file created in Task 1.
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v

- build_system_prompt appends business-function scoping when channel == "whatsapp" and tool_assignments is non-empty
- build_system_prompt does NOT append scoping for Slack or when tool_assignments is empty
- build_messages_with_memory and build_messages_with_media pass channel through
- _process_message passes msg.channel to builder functions

After both tasks complete, run the full verification:

```bash
# Unit tests for new wiring
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v

# Existing escalation tests still pass
python -m pytest tests/unit/test_escalation.py -x -v

# Existing WhatsApp tests still pass
python -m pytest tests/unit/test_whatsapp_scoping.py tests/unit/test_whatsapp_normalize.py tests/unit/test_whatsapp_verify.py -x -v

# Grep verification: escalation is wired
grep -n "check_escalation_rules\|escalate_to_human" packages/orchestrator/orchestrator/tasks.py

# Grep verification: _send_response is called (not _update_slack_placeholder directly in _process_message)
grep -n "_send_response\|_update_slack_placeholder" packages/orchestrator/orchestrator/tasks.py

# Grep verification: tier-2 scoping exists
grep -n "You only handle" packages/orchestrator/orchestrator/agents/builder.py
```

1. `check_escalation_rules` and `escalate_to_human` are imported and called in `_process_message`
2. `_send_response` is called at all response delivery points in `_process_message` (no direct `_update_slack_placeholder` calls remain in that function)
3. `build_system_prompt` appends business-function scoping for WhatsApp channel
4. All existing unit tests pass
5. New wiring tests pass

After completion, create `.planning/phases/02-agent-features/02-06-SUMMARY.md`