From 48d9ef0c29961d6a40790ef0ae4b9c6197c1ef80 Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Mon, 23 Mar 2026 19:03:24 -0600 Subject: [PATCH] docs(02-agent-features): create gap closure plan for escalation and WhatsApp outbound wiring --- .planning/ROADMAP.md | 7 +- .../phases/02-agent-features/02-06-PLAN.md | 368 ++++++++++++++++++ 2 files changed, 372 insertions(+), 3 deletions(-) create mode 100644 .planning/phases/02-agent-features/02-06-PLAN.md diff --git a/.planning/ROADMAP.md b/.planning/ROADMAP.md index e284531..7aeae49 100644 --- a/.planning/ROADMAP.md +++ b/.planning/ROADMAP.md @@ -13,7 +13,7 @@ Konstruct ships in three coarse phases ordered by dependency: first build the se Decimal phases appear between their surrounding integers in numeric order. - [x] **Phase 1: Foundation** - Secure multi-tenant pipeline with Slack end-to-end and basic agent response (completed 2026-03-23) -- [x] **Phase 2: Agent Features** - Persistent memory, tool framework, WhatsApp integration, and human escalation (completed 2026-03-23) +- [ ] **Phase 2: Agent Features** - Persistent memory, tool framework, WhatsApp integration, and human escalation (gap closure in progress) - [ ] **Phase 3: Operator Experience** - Admin portal, tenant onboarding, and Stripe billing ## Phase Details @@ -47,7 +47,7 @@ Plans: 3. The agent can invoke a registered tool (e.g., knowledge base search) and incorporate the result into its response 4. When a configured escalation rule triggers (e.g., failed resolution attempts), the conversation and full context are handed off to a human with no information lost 5. Every LLM call, tool invocation, and handoff event is recorded in an immutable audit trail queryable by tenant -**Plans**: 5 plans +**Plans**: 6 plans Plans: - [ ] 02-01: Conversational memory layer (Redis sliding window + pgvector long-term storage with HNSW index) @@ -55,6 +55,7 @@ Plans: - [ ] 02-03: WhatsApp adapter (Business Cloud API, per-tenant phone numbers, media download, Meta policy compliance) - [ ] 02-04: Human escalation/handoff with full context transfer and audit trail - [ ] 02-05: Cross-channel media support and multimodal LLM interpretation (Slack file_share, image_url content blocks, channel-aware outbound routing) +- [ ] 02-06: Gap closure — re-wire escalation handler and WhatsApp outbound routing into pipeline, add tier-2 system prompt scoping ### Phase 3: Operator Experience **Goal**: An operator can sign up, onboard their tenant through a web UI, connect their messaging channels, configure their AI employee, and manage their subscription — without touching config files or the command line @@ -79,7 +80,7 @@ Phases execute in numeric order: 1 → 2 → 3 | Phase | Plans Complete | Status | Completed | |-------|----------------|--------|-----------| | 1. Foundation | 4/4 | Complete | 2026-03-23 | -| 2. Agent Features | 5/5 | Complete | 2026-03-23 | +| 2. Agent Features | 5/6 | Gap closure | - | | 3. Operator Experience | 0/2 | Not started | - | --- diff --git a/.planning/phases/02-agent-features/02-06-PLAN.md b/.planning/phases/02-agent-features/02-06-PLAN.md new file mode 100644 index 0000000..03f071d --- /dev/null +++ b/.planning/phases/02-agent-features/02-06-PLAN.md @@ -0,0 +1,368 @@ +--- +phase: 02-agent-features +plan: 06 +type: execute +wave: 1 +depends_on: [] +files_modified: + - packages/orchestrator/orchestrator/tasks.py + - packages/orchestrator/orchestrator/agents/builder.py + - tests/unit/test_pipeline_wiring.py +autonomous: true +gap_closure: true +requirements: [AGNT-05, AGNT-06, CHAN-03, CHAN-04] + +must_haves: + truths: + - "When a configured escalation rule triggers, the conversation is handed off to a human" + - "A user can send a WhatsApp message and receive a reply (outbound routing works)" + - "WhatsApp messages get business-function scoping in the LLM system prompt (tier 2)" + artifacts: + - path: "packages/orchestrator/orchestrator/tasks.py" + provides: "Escalation wiring and channel-aware outbound routing" + contains: "check_escalation_rules" + - path: "packages/orchestrator/orchestrator/agents/builder.py" + provides: "Tier-2 WhatsApp system prompt scoping" + contains: "allowed_functions" + - path: "tests/unit/test_pipeline_wiring.py" + provides: "Tests verifying escalation and outbound routing in _process_message" + key_links: + - from: "packages/orchestrator/orchestrator/tasks.py" + to: "packages/orchestrator/orchestrator/escalation/handler.py" + via: "import and call check_escalation_rules + escalate_to_human in _process_message" + pattern: "check_escalation_rules|escalate_to_human" + - from: "packages/orchestrator/orchestrator/tasks.py" + to: "_send_response" + via: "Replace direct _update_slack_placeholder calls with _send_response" + pattern: "_send_response\\(" + - from: "packages/orchestrator/orchestrator/agents/builder.py" + to: "Agent.tool_assignments" + via: "Append allowed_functions constraint to system prompt when channel is whatsapp" + pattern: "You only handle" +--- + + +Re-wire escalation handler and WhatsApp outbound routing into the orchestrator pipeline, and add tier-2 business-function scoping to the system prompt builder. + +Purpose: Plans 02-02 and 02-05 rewrote tasks.py and dropped integrations from earlier plans. The escalation handler is orphaned (never called) and WhatsApp replies are silently lost (all responses go to Slack's chat.update). Tier-2 system prompt scoping for WhatsApp was never implemented. + +Output: tasks.py calls escalation pre/post-checks and uses _send_response for all outbound; builder.py appends business-function constraint for WhatsApp channel; tests verify both wirings. + + + +@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md +@/home/adelorenzo/.claude/get-shit-done/templates/summary.md + + + +@.planning/PROJECT.md +@.planning/ROADMAP.md +@.planning/STATE.md +@.planning/phases/02-agent-features/02-VERIFICATION.md + + + + +From packages/orchestrator/orchestrator/escalation/handler.py: +```python +def check_escalation_rules( + agent: Any, + message_text: str, + conversation_metadata: dict[str, Any], + natural_lang_enabled: bool = False, +) -> dict[str, Any] | None: + """Returns first matching rule dict or None.""" + +async def escalate_to_human( + tenant_id: str, + agent: Any, + thread_id: str, + trigger_reason: str, + recent_messages: list[dict[str, str]], + assignee_slack_user_id: str, + bot_token: str, + redis: Any, + audit_logger: Any, + user_id: str = "", + agent_id: str = "", +) -> str: + """Returns user-facing escalation confirmation message.""" +``` + +From packages/orchestrator/orchestrator/tasks.py (current state): +```python +async def _process_message( + msg: KonstructMessage, + placeholder_ts: str = "", + channel_id: str = "", +) -> dict: + """Lines 194-489. Three _update_slack_placeholder calls at lines 294, 366, 458.""" + +async def _send_response(channel: str, text: str, extras: dict) -> None: + """Line 528. Defined but never called. Routes to Slack or WhatsApp.""" + +def handle_message(self, message_data: dict) -> dict: + """Line 147. Pops placeholder_ts and channel_id before model_validate. + WhatsApp gateway injects phone_number_id and bot_token into task_payload + but handle_message does NOT pop them — they are lost during model_validate.""" +``` + +From packages/shared/shared/redis_keys.py: +```python +def escalation_status_key(tenant_id: str, thread_id: str) -> str: +``` + +From packages/shared/shared/models/tenant.py: +```python +class Agent(Base): + escalation_rules: Mapped[list[Any]] # JSON list of rule dicts + escalation_assignee: Mapped[str | None] # Slack user ID + natural_language_escalation: Mapped[bool] + tool_assignments: Mapped[list[Any]] # JSON list — used as allowed_functions proxy +``` + +From packages/gateway/gateway/channels/whatsapp.py: +```python +# Line 604: task_payload = msg.model_dump() | {"phone_number_id": phone_number_id, "bot_token": access_token or ""} +# WhatsApp gateway injects phone_number_id and bot_token as extra keys in the Celery payload +``` + + + + + + + Task 1: Re-wire escalation and outbound routing in tasks.py + + packages/orchestrator/orchestrator/tasks.py + tests/unit/test_pipeline_wiring.py + + + - Test: _process_message calls check_escalation_rules after LLM response and before memory persistence + - Test: When check_escalation_rules returns a matching rule AND agent.escalation_assignee is set, escalate_to_human is called and its return value replaces the LLM response + - Test: When escalation status is "escalated" in Redis (pre-check), _process_message returns assistant-mode reply without calling run_agent + - Test: _process_message uses _send_response (not _update_slack_placeholder directly) for all three response delivery points + - Test: For WhatsApp messages, _send_response receives extras with phone_number_id, bot_token, wa_id + - Test: handle_message pops phone_number_id, bot_token (WhatsApp extras) and wa_id before model_validate and passes them through to _process_message + + + **In handle_message (line ~179):** + Add extraction of WhatsApp extras alongside the existing Slack extras: + ```python + phone_number_id: str = message_data.pop("phone_number_id", "") or "" + bot_token: str = message_data.pop("bot_token", "") or "" + ``` + Note: `channel_id` is already popped for Slack. `bot_token` here is the WhatsApp access_token injected by the gateway. + + Pass these to _process_message. Change _process_message signature to accept an `extras: dict` parameter instead of individual `placeholder_ts` and `channel_id` params. The extras dict holds all channel-specific metadata: + - For Slack: `{"bot_token": slack_bot_token, "channel_id": channel_id, "placeholder_ts": placeholder_ts}` + - For WhatsApp: `{"phone_number_id": phone_number_id, "bot_token": bot_token, "wa_id": wa_id}` + + In handle_message, build the extras dict from popped values: + ```python + extras = { + "placeholder_ts": placeholder_ts, + "channel_id": channel_id, + "phone_number_id": phone_number_id, + "bot_token": bot_token, + } + ``` + Extract `wa_id` from `msg.sender.user_id` after model_validate (since the WhatsApp normalizer sets sender.user_id to the wa_id) and add to extras. + + **In _process_message:** + 1. Change signature: `async def _process_message(msg, extras: dict | None = None) -> dict` + 2. Extract channel-specific values from extras at the top. + 3. Replace all three `_update_slack_placeholder(...)` calls (lines 294, 366, 458) with `_send_response(msg.channel, text, extras_with_bot_token)` where extras_with_bot_token merges the Slack bot_token loaded from DB with the incoming extras. + - For the Slack path: the DB-loaded `slack_bot_token` should be added to extras if `msg.channel == "slack"`. + - For WhatsApp: extras already contain `phone_number_id` and `bot_token` from handle_message; add `wa_id` from extras or `msg.sender.user_id`. + + 4. **Escalation pre-check** (add BEFORE the pending tool confirmation block, after agent is loaded): + ```python + # Escalation pre-check: if conversation is already escalated, respond in assistant mode + from shared.redis_keys import escalation_status_key + esc_key = escalation_status_key(msg.tenant_id, msg.thread_id or user_id) + esc_status = await redis_client.get(esc_key) + if esc_status == b"escalated": + assistant_reply = f"I've already connected you with a team member. They'll continue assisting you." + await _send_response(msg.channel, assistant_reply, response_extras) + return {"message_id": msg.id, "response": assistant_reply, "tenant_id": msg.tenant_id} + ``` + Use a single Redis client created before this block (reuse the one already created for pending_confirm_key). Close it in the finally block. + + 5. **Escalation post-check** (add AFTER the run_agent call and BEFORE the is_confirmation_request check): + ```python + from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human + + # Build conversation metadata from sliding window for rule evaluation + conversation_metadata = _build_conversation_metadata(recent_messages, user_text) + + triggered_rule = check_escalation_rules( + agent=agent, + message_text=user_text, + conversation_metadata=conversation_metadata, + natural_lang_enabled=getattr(agent, "natural_language_escalation", False), + ) + + if triggered_rule and getattr(agent, "escalation_assignee", None): + escalation_redis = aioredis.from_url(settings.redis_url) + try: + response_text = await escalate_to_human( + tenant_id=msg.tenant_id, + agent=agent, + thread_id=msg.thread_id or user_id, + trigger_reason=triggered_rule.get("condition", "rule triggered"), + recent_messages=recent_messages, + assignee_slack_user_id=agent.escalation_assignee, + bot_token=slack_bot_token, + redis=escalation_redis, + audit_logger=audit_logger, + user_id=user_id, + agent_id=agent_id_str, + ) + finally: + await escalation_redis.aclose() + ``` + + 6. **Add _build_conversation_metadata helper** (new function): + ```python + def _build_conversation_metadata(recent_messages: list[dict], current_text: str) -> dict[str, Any]: + """Build conversation metadata dict for escalation rule evaluation. + + Scans recent messages for billing keywords and counts attempts. + """ + billing_keywords = {"billing", "invoice", "charge", "refund", "payment", "subscription"} + all_texts = [m.get("content", "") for m in recent_messages] + [current_text] + billing_count = sum(1 for t in all_texts if any(kw in t.lower() for kw in billing_keywords)) + return { + "billing_dispute": billing_count > 0, + "attempts": billing_count, + } + ``` + This matches the v1 keyword-based metadata detection described in STATE.md decisions. + + **Important constraints:** + - Celery tasks MUST remain sync def with asyncio.run() — never async def + - Import escalation functions inside _process_message (local import, matching existing pattern) + - Use `aioredis.from_url(settings.redis_url)` for new Redis clients (matching existing pattern in tasks.py) + - The Slack DB bot_token loading (lines 269-281) must be preserved — it's needed for escalation DM delivery even on WhatsApp messages + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v + + + - handle_message pops phone_number_id, bot_token from message_data before model_validate + - _process_message accepts extras dict and uses _send_response for ALL outbound delivery + - Escalation pre-check: already-escalated conversations get assistant-mode reply without LLM call + - Escalation post-check: check_escalation_rules called after LLM response; escalate_to_human called when rule matches and assignee is configured + - _build_conversation_metadata extracts billing keywords from sliding window + - All existing functionality preserved (memory pipeline, tool confirmation, audit logging) + + + + + Task 2: Add tier-2 WhatsApp business-function scoping to system prompt builder + + packages/orchestrator/orchestrator/agents/builder.py + tests/unit/test_pipeline_wiring.py + + + - Test: build_system_prompt(agent, channel="whatsapp") appends "You only handle: {topics}" when agent.tool_assignments is non-empty + - Test: build_system_prompt(agent, channel="slack") does NOT append business-function scoping + - Test: build_system_prompt(agent, channel="whatsapp") with empty tool_assignments does NOT append scoping + - Test: build_messages_with_memory passes channel through to build_system_prompt + + + **In builder.py build_system_prompt:** + Add an optional `channel: str = ""` parameter: + ```python + def build_system_prompt(agent: Agent, channel: str = "") -> str: + ``` + + After the AI transparency clause (step 4), add step 5 — WhatsApp business-function scoping: + ```python + # 5. WhatsApp tier-2 scoping — constrain LLM to declared business functions + if channel == "whatsapp": + functions: list[str] = getattr(agent, "tool_assignments", []) or [] + if functions: + topics = ", ".join(functions) + parts.append( + f"You are responding on WhatsApp. You only handle: {topics}. " + f"If the user asks about something outside these topics, " + f"politely redirect them to the allowed topics." + ) + ``` + + **In builder.py build_messages_with_memory:** + Add optional `channel: str = ""` parameter and pass through: + ```python + def build_messages_with_memory(agent, current_message, recent_messages, relevant_context, channel: str = "") -> list[dict]: + system_prompt = build_system_prompt(agent, channel=channel) + ... + ``` + + **In builder.py build_messages_with_media:** + Same change — add `channel: str = ""` parameter and pass to build_messages_with_memory. + + **In tasks.py _process_message:** + Pass `msg.channel` to `build_messages_with_memory`: + ```python + enriched_messages = build_messages_with_memory( + agent=agent, + current_message=user_text, + recent_messages=recent_messages, + relevant_context=relevant_context, + channel=msg.channel, + ) + ``` + And similarly for the build_messages_with_media call if present. + + Add tests for tier-2 scoping to the same test_pipeline_wiring.py file created in Task 1. + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v + + + - build_system_prompt appends business-function scoping when channel == "whatsapp" and tool_assignments is non-empty + - build_system_prompt does NOT append scoping for Slack or when tool_assignments is empty + - build_messages_with_memory and build_messages_with_media pass channel through + - _process_message passes msg.channel to builder functions + + + + + + +After both tasks complete, run the full verification: + +```bash +# Unit tests for new wiring +cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_pipeline_wiring.py -x -v + +# Existing escalation tests still pass +python -m pytest tests/unit/test_escalation.py -x -v + +# Existing WhatsApp tests still pass +python -m pytest tests/unit/test_whatsapp_scoping.py tests/unit/test_whatsapp_normalize.py tests/unit/test_whatsapp_verify.py -x -v + +# Grep verification: escalation is wired +grep -n "check_escalation_rules\|escalate_to_human" packages/orchestrator/orchestrator/tasks.py + +# Grep verification: _send_response is called (not _update_slack_placeholder directly in _process_message) +grep -n "_send_response\|_update_slack_placeholder" packages/orchestrator/orchestrator/tasks.py + +# Grep verification: tier-2 scoping exists +grep -n "You only handle" packages/orchestrator/orchestrator/agents/builder.py +``` + + + +1. `check_escalation_rules` and `escalate_to_human` are imported and called in `_process_message` +2. `_send_response` is called at all response delivery points in `_process_message` (no direct `_update_slack_placeholder` calls remain in that function) +3. `build_system_prompt` appends business-function scoping for WhatsApp channel +4. All existing unit tests pass +5. New wiring tests pass + + + +After completion, create `.planning/phases/02-agent-features/02-06-SUMMARY.md` +