diff --git a/.planning/phases/02-agent-features/02-01-PLAN.md b/.planning/phases/02-agent-features/02-01-PLAN.md new file mode 100644 index 0000000..bf28074 --- /dev/null +++ b/.planning/phases/02-agent-features/02-01-PLAN.md @@ -0,0 +1,257 @@ +--- +phase: 02-agent-features +plan: 01 +type: execute +wave: 1 +depends_on: [] +files_modified: + - packages/shared/shared/models/memory.py + - packages/shared/shared/redis_keys.py + - packages/orchestrator/orchestrator/memory/__init__.py + - packages/orchestrator/orchestrator/memory/short_term.py + - packages/orchestrator/orchestrator/memory/long_term.py + - packages/orchestrator/orchestrator/agents/builder.py + - packages/orchestrator/orchestrator/tasks.py + - migrations/versions/002_phase2_memory.py + - tests/unit/test_memory_short_term.py + - tests/integration/test_memory_long_term.py + - tests/conftest.py +autonomous: true +requirements: + - AGNT-02 + - AGNT-03 + +must_haves: + truths: + - "Agent includes the last 20 messages verbatim in the LLM prompt for immediate context" + - "Agent retrieves up to 3 semantically relevant past exchanges from pgvector when assembling the prompt" + - "Memory is keyed per-user per-agent — different users talking to the same agent have isolated memory" + - "Cross-tenant vector search is impossible — tenant_id pre-filter enforced on every pgvector query" + - "Embedding backfill runs asynchronously via Celery task — never blocks the LLM response" + artifacts: + - path: "packages/orchestrator/orchestrator/memory/short_term.py" + provides: "Redis sliding window (RPUSH/LTRIM/LRANGE)" + exports: ["get_recent_messages", "append_message"] + - path: "packages/orchestrator/orchestrator/memory/long_term.py" + provides: "pgvector embedding store + HNSW similarity search" + exports: ["retrieve_relevant", "store_embedding"] + - path: "packages/shared/shared/models/memory.py" + provides: "ConversationMessage and ConversationEmbedding ORM models" + contains: "class ConversationEmbedding" + - path: 
"migrations/versions/002_phase2_memory.py" + provides: "Alembic migration for conversation_embeddings table with HNSW index" + key_links: + - from: "packages/orchestrator/orchestrator/tasks.py" + to: "orchestrator/memory/short_term.py" + via: "get_recent_messages + append_message called in handle_message" + pattern: "get_recent_messages|append_message" + - from: "packages/orchestrator/orchestrator/agents/builder.py" + to: "orchestrator/memory/long_term.py" + via: "retrieve_relevant called during prompt assembly" + pattern: "retrieve_relevant" + - from: "packages/orchestrator/orchestrator/tasks.py" + to: "embed_and_store Celery task" + via: "fire-and-forget delay() after response" + pattern: "embed_and_store\\.delay" +--- + + +Build the two-layer conversational memory system: Redis sliding window for immediate context (last 20 messages) and pgvector long-term storage with HNSW similarity search for cross-conversation recall. + +Purpose: Transforms the stateless single-turn agent from Phase 1 into one that remembers conversations and user preferences across sessions and channels. +Output: Memory modules, DB migration, updated orchestrator pipeline, passing tests. 
+ + + +@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md +@/home/adelorenzo/.claude/get-shit-done/templates/summary.md + + + +@.planning/PROJECT.md +@.planning/ROADMAP.md +@.planning/STATE.md +@.planning/phases/02-agent-features/02-CONTEXT.md +@.planning/phases/02-agent-features/02-RESEARCH.md + +@packages/shared/shared/redis_keys.py +@packages/shared/shared/models/tenant.py +@packages/shared/shared/models/message.py +@packages/shared/shared/rls.py +@packages/shared/shared/db.py +@packages/orchestrator/orchestrator/tasks.py +@packages/orchestrator/orchestrator/agents/builder.py +@packages/orchestrator/orchestrator/agents/runner.py +@migrations/versions/001_initial_schema.py +@tests/conftest.py + + + + +From packages/shared/shared/redis_keys.py: +- Existing key constructors: rate_limit_key(), idempotency_key(), session_key(), engaged_thread_key() +- Pattern: def key_name(tenant_id: str, ...) -> str returning "{tenant_id}:namespace:..." + +From packages/orchestrator/orchestrator/tasks.py: +- handle_message Celery task (sync def with asyncio.run()) +- Receives msg dict from Celery, reconstructs KonstructMessage via model_validate +- Loads agent via load_agent_for_tenant +- Calls run_agent to get LLM response +- Posts response via Slack chat.update + +From packages/orchestrator/orchestrator/agents/builder.py: +- build_system_prompt(agent: Agent) -> str +- Assembles system_prompt + identity + persona + AI transparency clause + +From packages/orchestrator/orchestrator/agents/runner.py: +- run_agent(msg: KonstructMessage, agent: Agent) -> str +- httpx POST to llm-pool /complete with messages array + + + + + + + Task 1: DB models, migration, and memory modules with tests + + packages/shared/shared/models/memory.py, + packages/shared/shared/redis_keys.py, + packages/orchestrator/orchestrator/memory/__init__.py, + packages/orchestrator/orchestrator/memory/short_term.py, + packages/orchestrator/orchestrator/memory/long_term.py, + 
migrations/versions/002_phase2_memory.py, + tests/unit/test_memory_short_term.py, + tests/integration/test_memory_long_term.py, + tests/conftest.py + + + - get_recent_messages returns last N messages from Redis as list[dict] with role/content keys + - append_message adds a message and trims the list to window size (20) + - append_message with window=5 keeps only last 5 messages (parameterized trim) + - get_recent_messages on empty key returns empty list + - Memory keys are namespaced: {tenant_id}:memory:short:{agent_id}:{user_id} + - retrieve_relevant returns top-K content strings above cosine similarity threshold + - retrieve_relevant with tenant_id=A never returns tenant_id=B embeddings (cross-tenant isolation) + - retrieve_relevant with threshold=0.99 and dissimilar query returns empty list + - store_embedding inserts a row into conversation_embeddings with correct tenant/agent/user scoping + - ConversationEmbedding model has: id, tenant_id, agent_id, user_id, content, role, embedding (vector 384), created_at + - Migration creates conversation_embeddings table with HNSW index and RLS policy + + + 1. Create `packages/shared/shared/models/memory.py` with SQLAlchemy 2.0 `Mapped[]` style: + - ConversationEmbedding: id (UUID PK), tenant_id (UUID NOT NULL), agent_id (UUID NOT NULL), user_id (TEXT NOT NULL), content (TEXT NOT NULL), role (TEXT NOT NULL), embedding (Vector(384) NOT NULL), created_at (TIMESTAMPTZ, server_default=now()) + - Use `from pgvector.sqlalchemy import Vector` for the embedding column + - RLS policy follows existing pattern from tenant.py + + 2. Extend `packages/shared/shared/redis_keys.py` with: + - memory_short_key(tenant_id, agent_id, user_id) -> "{tenant_id}:memory:short:{agent_id}:{user_id}" + - escalation_status_key(tenant_id, thread_id) -> "{tenant_id}:escalation:{thread_id}" + - pending_tool_confirm_key(tenant_id, thread_id) -> "{tenant_id}:tool_confirm:{thread_id}" + + 3. 
Create `packages/orchestrator/orchestrator/memory/short_term.py`: + - async get_recent_messages(redis, tenant_id, agent_id, user_id, n=20) -> list[dict] + - async append_message(redis, tenant_id, agent_id, user_id, role, content, window=20) -> None + - Uses RPUSH + LTRIM pattern. No TTL (indefinite retention per user decision). + + 4. Create `packages/orchestrator/orchestrator/memory/long_term.py`: + - async retrieve_relevant(session, tenant_id, agent_id, user_id, query_embedding, top_k=3, threshold=0.75) -> list[str] + - async store_embedding(session, tenant_id, agent_id, user_id, content, role, embedding) -> None + - CRITICAL: All queries MUST include WHERE tenant_id = :tenant_id AND agent_id = :agent_id AND user_id = :user_id BEFORE the ANN operator + - Uses raw SQL text() for pgvector operations (cosine distance operator <=>) + + 5. Create Alembic migration `002_phase2_memory.py`: + - conversation_embeddings table with all columns from the model + - HNSW index: CREATE INDEX ... USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64) + - Covering index on (tenant_id, agent_id, user_id, created_at DESC) + - RLS: ENABLE ROW LEVEL SECURITY, FORCE ROW LEVEL SECURITY + - RLS policy: tenant_id = current_setting('app.current_tenant')::uuid + - GRANT SELECT, INSERT on conversation_embeddings TO konstruct_app (no UPDATE/DELETE — embeddings are immutable like audit) + + 6. Extend tests/conftest.py with pgvector fixtures (ensure pgvector extension created in test DB). + + 7. Write unit tests (test_memory_short_term.py) using fakeredis for sliding window operations. + + 8. Write integration tests (test_memory_long_term.py) using real PostgreSQL with pgvector for embedding storage and retrieval, including a two-tenant cross-contamination test. 
+ + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_memory_short_term.py tests/integration/test_memory_long_term.py -x -v + + + - ConversationEmbedding ORM model exists with Vector(384) column + - Redis sliding window stores/retrieves messages correctly with tenant+agent+user namespacing + - pgvector similarity search returns relevant content above threshold + - Cross-tenant isolation verified: tenant A's embeddings never returned for tenant B queries + - Alembic migration runs cleanly and creates HNSW index + + + + + Task 2: Wire memory into orchestrator pipeline + + packages/orchestrator/orchestrator/agents/builder.py, + packages/orchestrator/orchestrator/agents/runner.py, + packages/orchestrator/orchestrator/tasks.py + + + 1. Update `builder.py` — add `build_messages_with_memory()` function: + - Takes: agent, current_message, recent_messages (from Redis), relevant_context (from pgvector) + - Returns: list[dict] formatted as LLM messages array + - Structure: [system_prompt] + [pgvector context as system message: "Relevant context from past conversations: ..."] + [sliding window messages as user/assistant alternation] + [current user message] + - pgvector context injected as a system message BEFORE the sliding window — gives the LLM background without polluting the conversation flow + - If no relevant context found, omit the context system message entirely (don't inject empty context) + + 2. Update `runner.py` — modify `run_agent()` to accept pre-built messages array: + - Current: builds simple [system, user] messages internally + - New: accept optional `messages` parameter. If provided, use it directly. If not, fall back to existing behavior (backward compat for tests). + - This lets the pipeline pass the memory-enriched messages array + + 3. 
Update `tasks.py` — modify `handle_message` Celery task: + - BEFORE LLM call: load recent messages from Redis via get_recent_messages() + - BEFORE LLM call: embed current message text, call retrieve_relevant() for long-term context + - For embedding the query: use sentence-transformers `SentenceTransformer('all-MiniLM-L6-v2').encode()` — load model once at module level (lazy singleton) + - Build messages array via build_messages_with_memory() + - Pass messages to run_agent() + - AFTER LLM response: append both user message and assistant response to Redis sliding window via append_message() + - AFTER LLM response: dispatch embed_and_store.delay() Celery task for async pgvector backfill (fire-and-forget) + - Create embed_and_store Celery task (sync def with asyncio.run()): takes tenant_id, agent_id, user_id, messages list, embeds each, stores via store_embedding() + - The embed_and_store task must use sentence-transformers for embedding (same model as query embedding) + + Note: sentence-transformers must be installed. Run `uv add sentence-transformers` in the orchestrator package. If sentence-transformers is too heavy, use the Ollama embedding endpoint via httpx POST to llm-pool (add an /embed endpoint to llm-pool). Use Claude's discretion on which approach is simpler — but the embedding model MUST be all-MiniLM-L6-v2 (384 dimensions) to match the pgvector column width. 
+ + CRITICAL constraints: + - All Celery tasks MUST be sync def with asyncio.run() — never async def + - Redis operations use the existing redis.asyncio.Redis client pattern + - DB operations use the existing async SQLAlchemy session pattern with RLS context + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_memory_short_term.py tests/integration/test_memory_long_term.py -x -v + + + - handle_message loads sliding window + pgvector context before every LLM call + - LLM prompt includes recent conversation history and relevant past context + - User and assistant messages are appended to Redis after each turn + - Embedding backfill dispatched asynchronously via embed_and_store.delay() + - Existing Slack flow still works end-to-end (backward compatible) + + + + + + +- All existing Phase 1 tests still pass: `pytest tests/ -x` +- Memory unit tests pass: `pytest tests/unit/test_memory_short_term.py -x` +- Memory integration tests pass: `pytest tests/integration/test_memory_long_term.py -x` +- Cross-tenant isolation verified in integration tests +- Migration applies cleanly: `alembic upgrade head` + + + +- Agent maintains conversational context within a session via Redis sliding window +- Agent recalls relevant past context across conversations via pgvector retrieval +- Memory is isolated per-user per-agent per-tenant +- Embedding backfill is asynchronous and never blocks the response pipeline + + + +After completion, create `.planning/phases/02-agent-features/02-01-SUMMARY.md` + diff --git a/.planning/phases/02-agent-features/02-02-PLAN.md b/.planning/phases/02-agent-features/02-02-PLAN.md new file mode 100644 index 0000000..76ccfd4 --- /dev/null +++ b/.planning/phases/02-agent-features/02-02-PLAN.md @@ -0,0 +1,263 @@ +--- +phase: 02-agent-features +plan: 02 +type: execute +wave: 2 +depends_on: ["02-01"] +files_modified: + - packages/shared/shared/models/audit.py + - packages/shared/shared/models/kb.py + - 
packages/orchestrator/orchestrator/audit/__init__.py + - packages/orchestrator/orchestrator/audit/logger.py + - packages/orchestrator/orchestrator/tools/__init__.py + - packages/orchestrator/orchestrator/tools/registry.py + - packages/orchestrator/orchestrator/tools/executor.py + - packages/orchestrator/orchestrator/tools/builtins/__init__.py + - packages/orchestrator/orchestrator/tools/builtins/web_search.py + - packages/orchestrator/orchestrator/tools/builtins/kb_search.py + - packages/orchestrator/orchestrator/tools/builtins/http_request.py + - packages/orchestrator/orchestrator/tools/builtins/calendar_lookup.py + - packages/orchestrator/orchestrator/agents/runner.py + - packages/orchestrator/orchestrator/tasks.py + - migrations/versions/003_phase2_audit_kb.py + - tests/unit/test_tool_registry.py + - tests/unit/test_tool_executor.py + - tests/integration/test_audit.py +autonomous: true +requirements: + - AGNT-04 + - AGNT-06 + +must_haves: + truths: + - "Agent can invoke a registered tool and incorporate the result into its response" + - "Tool arguments are schema-validated before execution — invalid args rejected with error message" + - "Tools requiring confirmation pause the loop and ask the user before executing" + - "Every LLM call, tool invocation, and handoff event is recorded in an immutable audit trail" + - "Audit entries cannot be updated or deleted by the application role" + - "Audit trail is queryable by tenant via RLS" + artifacts: + - path: "packages/orchestrator/orchestrator/tools/registry.py" + provides: "ToolDefinition model + BUILTIN_TOOLS mapping" + exports: ["ToolDefinition", "BUILTIN_TOOLS", "get_tools_for_agent"] + - path: "packages/orchestrator/orchestrator/tools/executor.py" + provides: "Schema-validated tool execution with audit logging" + exports: ["execute_tool"] + - path: "packages/orchestrator/orchestrator/audit/logger.py" + provides: "Immutable audit event writer" + exports: ["AuditLogger"] + - path: 
"packages/shared/shared/models/audit.py" + provides: "AuditEvent ORM model" + contains: "class AuditEvent" + - path: "migrations/versions/003_phase2_audit_kb.py" + provides: "Migration for audit_events and kb tables with REVOKE UPDATE/DELETE" + key_links: + - from: "packages/orchestrator/orchestrator/agents/runner.py" + to: "orchestrator/tools/executor.py" + via: "tool-call loop: LLM returns tool_calls -> execute -> re-call LLM" + pattern: "execute_tool|tool_calls" + - from: "packages/orchestrator/orchestrator/tools/executor.py" + to: "orchestrator/audit/logger.py" + via: "log_tool_call after every tool execution" + pattern: "audit_logger\\.log" + - from: "packages/orchestrator/orchestrator/tasks.py" + to: "orchestrator/audit/logger.py" + via: "log_llm_call after every LLM invocation" + pattern: "audit_logger\\.log" +--- + + +Build the tool framework (registry, schema-validated executor, 4 built-in tools) and immutable audit logging system. Wire the tool-call loop into the agent runner so the LLM can reason, call tools, observe results, and respond. + +Purpose: Gives the AI employee the ability to take actions (search, look up info, make requests) and creates the compliance-ready audit trail for all agent activity. +Output: Tool registry + executor, 4 builtin tools, audit logger, DB migration, updated runner with tool loop, passing tests. 
+ + + +@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md +@/home/adelorenzo/.claude/get-shit-done/templates/summary.md + + + +@.planning/PROJECT.md +@.planning/ROADMAP.md +@.planning/STATE.md +@.planning/phases/02-agent-features/02-CONTEXT.md +@.planning/phases/02-agent-features/02-RESEARCH.md +@.planning/phases/02-agent-features/02-01-SUMMARY.md + +@packages/orchestrator/orchestrator/agents/runner.py +@packages/orchestrator/orchestrator/tasks.py +@packages/shared/shared/models/tenant.py +@packages/shared/shared/rls.py +@packages/shared/shared/db.py +@migrations/versions/002_phase2_memory.py + + + + + + Task 1: Audit logger, tool registry, executor, and built-in tools with tests + + packages/shared/shared/models/audit.py, + packages/shared/shared/models/kb.py, + packages/orchestrator/orchestrator/audit/__init__.py, + packages/orchestrator/orchestrator/audit/logger.py, + packages/orchestrator/orchestrator/tools/__init__.py, + packages/orchestrator/orchestrator/tools/registry.py, + packages/orchestrator/orchestrator/tools/executor.py, + packages/orchestrator/orchestrator/tools/builtins/__init__.py, + packages/orchestrator/orchestrator/tools/builtins/web_search.py, + packages/orchestrator/orchestrator/tools/builtins/kb_search.py, + packages/orchestrator/orchestrator/tools/builtins/http_request.py, + packages/orchestrator/orchestrator/tools/builtins/calendar_lookup.py, + migrations/versions/003_phase2_audit_kb.py, + tests/unit/test_tool_registry.py, + tests/unit/test_tool_executor.py, + tests/integration/test_audit.py + + + - ToolDefinition has name, description, parameters (JSON Schema), requires_confirmation, handler + - BUILTIN_TOOLS contains 4 tools: web_search, kb_search, http_request, calendar_lookup + - get_tools_for_agent filters BUILTIN_TOOLS by agent's configured tool list + - execute_tool validates args against tool's JSON schema before calling handler + - execute_tool with invalid args returns error string and logs the failure + - 
execute_tool with unknown tool name raises ValueError + - execute_tool with requires_confirmation=True returns a confirmation request instead of executing + - AuditLogger.log_tool_call writes a row to audit_events with action_type='tool_invocation' + - AuditLogger.log_llm_call writes a row with action_type='llm_call' including latency_ms + - audit_events table rejects UPDATE and DELETE from konstruct_app role + - audit_events are tenant-scoped via RLS + - web_search tool calls Brave Search API and returns structured results + - kb_search tool queries pgvector knowledge base (conversation_embeddings or dedicated kb_chunks table) + - http_request tool makes outbound HTTP with timeout (30s), size cap (1MB), allowed methods (GET/POST/PUT/DELETE) + - calendar_lookup tool queries Google Calendar events.list for availability + + + 1. Create `packages/shared/shared/models/audit.py`: + - AuditEvent: id (UUID PK), tenant_id (UUID NOT NULL), agent_id (UUID), user_id (TEXT), action_type (TEXT NOT NULL — 'llm_call' | 'tool_invocation' | 'escalation'), input_summary (TEXT), output_summary (TEXT), latency_ms (INTEGER), metadata (JSONB, default={}), created_at (TIMESTAMPTZ, server_default=now()) + - RLS enabled + forced, same pattern as other tenant-scoped tables + + 2. Create `packages/shared/shared/models/kb.py`: + - KnowledgeBaseDocument: id (UUID PK), tenant_id (UUID NOT NULL), agent_id (UUID NOT NULL), filename (TEXT), source_url (TEXT), content_type (TEXT), created_at + - KBChunk: id (UUID PK), tenant_id (UUID NOT NULL), document_id (UUID FK), content (TEXT NOT NULL), embedding (Vector(384) NOT NULL), chunk_index (INTEGER), created_at + - RLS on both tables + + 3. 
Create Alembic migration `003_phase2_audit_kb.py`: + - audit_events table with all columns, index on (tenant_id, created_at DESC), RLS + - REVOKE UPDATE, DELETE ON audit_events FROM konstruct_app — immutability enforced at DB level + - kb_documents and kb_chunks tables, HNSW index on kb_chunks embedding, RLS + - GRANT SELECT, INSERT on audit_events TO konstruct_app + - GRANT SELECT, INSERT, UPDATE, DELETE on kb_documents and kb_chunks TO konstruct_app + + 4. Create `packages/orchestrator/orchestrator/audit/logger.py`: + - AuditLogger class initialized with async session factory + - async log_llm_call(tenant_id, agent_id, user_id, input_summary, output_summary, latency_ms, metadata={}) + - async log_tool_call(tool_name, args, result, tenant_id, agent_id, latency_ms, error=None) + - async log_escalation(tenant_id, agent_id, user_id, trigger_reason, metadata={}) + - All methods write to audit_events table with RLS context set + + 5. Create `packages/orchestrator/orchestrator/tools/registry.py`: + - ToolDefinition Pydantic model: name, description, parameters (dict — JSON Schema), requires_confirmation (bool, default False), handler (Any, excluded from serialization) + - BUILTIN_TOOLS: dict[str, ToolDefinition] with 4 tools + - get_tools_for_agent(agent: Agent) -> dict[str, ToolDefinition]: filters by agent.tools list + - to_litellm_format(tools: dict) -> list[dict]: converts to OpenAI function-calling schema for LiteLLM + + 6. Create `packages/orchestrator/orchestrator/tools/executor.py`: + - async execute_tool(tool_call: dict, registry: dict, tenant_id, agent_id, audit_logger) -> str + - Validates args via jsonschema.validate() BEFORE calling handler (LLM output is untrusted) + - If requires_confirmation is True, return a confirmation message string instead of executing + - Logs every invocation (success or failure) to audit trail + - Install jsonschema: `uv add jsonschema` in orchestrator package + + 7. 
Create 4 built-in tool handlers in `tools/builtins/`: + - web_search.py: async web_search(query: str) -> str. Uses Brave Search API via httpx. Env var: BRAVE_API_KEY. Returns top 3 results formatted as text. Install: `uv add brave-search` or use raw httpx to https://api.search.brave.com/res/v1/web/search + - kb_search.py: async kb_search(query: str, tenant_id: str, agent_id: str) -> str. Embeds query, searches kb_chunks via pgvector. Returns top 3 matching chunks as text. + - http_request.py: async http_request(url: str, method: str = "GET", body: str | None = None) -> str. Timeout 30s, response size cap 1MB, allowed methods GET/POST/PUT/DELETE. requires_confirmation=True. + - calendar_lookup.py: async calendar_lookup(date: str, calendar_id: str = "primary") -> str. Uses google-api-python-client events.list(). Requires GOOGLE_SERVICE_ACCOUNT_KEY env var or per-tenant OAuth. Returns formatted availability. requires_confirmation=False (read-only). + + 8. Write unit tests: + - test_tool_registry.py: test tool lookup, filtering by agent, LiteLLM format conversion + - test_tool_executor.py: test schema validation (valid args pass, invalid rejected), confirmation flow, unknown tool error, audit logging called + + 9. 
Write integration tests: + - test_audit.py: test that audit events are written to DB, test that UPDATE/DELETE is rejected (expect error), test RLS isolation between tenants + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_tool_registry.py tests/unit/test_tool_executor.py tests/integration/test_audit.py -x -v + + + - 4 built-in tools registered with JSON Schema definitions + - Tool executor validates args and rejects invalid input + - Confirmation-required tools return confirmation message instead of executing + - Audit events written to DB for every tool call and LLM call + - audit_events immutability enforced (UPDATE/DELETE rejected) + - RLS isolates audit data per tenant + + + + + Task 2: Wire tool-call loop into agent runner and orchestrator pipeline + + packages/orchestrator/orchestrator/agents/runner.py, + packages/orchestrator/orchestrator/tasks.py + + + 1. Update `runner.py` — implement tool-call loop: + - After LLM response, check if response contains `tool_calls` array (LiteLLM returns this in OpenAI format) + - If tool_calls present: for each tool call, dispatch to execute_tool() + - If tool requires confirmation: stop the loop, return the confirmation message to the user, store pending action in Redis (pending_tool_confirm_key) + - If tool executed: append tool result as a `tool` role message, re-call LLM with updated messages + - Loop until LLM returns plain text (no tool_calls) or max iterations reached (default: 5) + - Max iteration guard prevents runaway tool chains + - Pass AuditLogger instance through the loop for logging each LLM call and tool call + + 2. Update `tasks.py`: + - Initialize AuditLogger at task start with session factory + - Pass audit_logger and tool registry to run_agent + - Log initial LLM call and final response via audit_logger.log_llm_call() + - Handle pending tool confirmation: check pending_tool_confirm_key in Redis at start of handle_message. 
If pending, check if current message is a confirmation (yes/no). If yes, execute the pending tool and continue. If no, cancel and respond. + - The tool definitions are passed to LiteLLM via the `tools` parameter in the /complete request to llm-pool. Update the llm-pool /complete endpoint to forward `tools` parameter to litellm.acompletion() if present. + + 3. Update llm-pool /complete endpoint: + - Accept optional `tools` parameter in request body + - Forward to litellm.acompletion(tools=tools) when present + - Return tool_calls in response when LLM produces them + + CRITICAL: The tool loop happens inside the Celery task (sync context with asyncio.run). Each iteration of the loop is an async function call within the same asyncio.run() block. Do NOT dispatch separate Celery tasks for tool execution — it all happens in one task invocation. + + Seamless tool usage per user decision: The agent's system prompt should NOT include instructions like "announce when using tools." The tool results are injected as context and the LLM naturally incorporates them. The confirmation flow is the only user-visible tool interaction. 
+ + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_tool_registry.py tests/unit/test_tool_executor.py tests/integration/test_audit.py -x -v + + + - Agent runner supports multi-turn tool-call loop (reason -> tool -> observe -> respond) + - Tool calls are bounded at 5 iterations maximum + - Confirmation-required tools pause and await user response + - Every LLM call and tool invocation logged to audit trail + - llm-pool forwards tools parameter to LiteLLM + - Existing memory pipeline from Plan 01 still works (no regression) + + + + + + +- All Phase 1 + Plan 01 tests still pass: `pytest tests/ -x` +- Tool tests pass: `pytest tests/unit/test_tool_registry.py tests/unit/test_tool_executor.py -x` +- Audit integration tests pass: `pytest tests/integration/test_audit.py -x` +- Migration applies cleanly: `alembic upgrade head` + + + +- Agent can invoke tools during conversation and incorporate results naturally +- Tool arguments are validated against JSON Schema before execution +- Confirmation-required tools pause for user approval +- Every agent action is recorded in immutable, tenant-scoped audit trail +- Audit entries cannot be modified or deleted at the database level + + + +After completion, create `.planning/phases/02-agent-features/02-02-SUMMARY.md` + diff --git a/.planning/phases/02-agent-features/02-03-PLAN.md b/.planning/phases/02-agent-features/02-03-PLAN.md new file mode 100644 index 0000000..70e97bd --- /dev/null +++ b/.planning/phases/02-agent-features/02-03-PLAN.md @@ -0,0 +1,265 @@ +--- +phase: 02-agent-features +plan: 03 +type: execute +wave: 1 +depends_on: [] +files_modified: + - packages/gateway/gateway/channels/whatsapp.py + - packages/gateway/gateway/normalize.py + - packages/gateway/gateway/main.py + - packages/shared/shared/models/message.py + - packages/shared/shared/config.py + - packages/orchestrator/orchestrator/agents/runner.py + - packages/orchestrator/orchestrator/tasks.py + - migrations/versions/004_phase2_media.py + 
- tests/unit/test_whatsapp_verify.py + - tests/unit/test_whatsapp_normalize.py + - tests/unit/test_whatsapp_scoping.py +autonomous: true +requirements: + - CHAN-03 + - CHAN-04 + +must_haves: + truths: + - "A WhatsApp message to the AI employee produces a reply in the same WhatsApp conversation" + - "Webhook signature is verified via HMAC-SHA256 on raw body bytes before any JSON parsing" + - "Per-tenant phone number isolation — each tenant's WhatsApp connection uses its own phone_number_id" + - "Clearly off-topic messages get a canned rejection without an LLM call (tier 1 allowlist gate)" + - "Borderline messages are handled by the LLM with business-function scoping in the system prompt (tier 2)" + - "Media attachments (images, documents) are downloaded, stored in MinIO, and passed to the orchestrator" + artifacts: + - path: "packages/gateway/gateway/channels/whatsapp.py" + provides: "WhatsApp webhook handler, signature verification, message sending" + exports: ["whatsapp_router"] + - path: "packages/gateway/gateway/normalize.py" + provides: "normalize_whatsapp_event function alongside existing Slack normalizer" + exports: ["normalize_whatsapp_event"] + - path: "packages/shared/shared/models/message.py" + provides: "MediaAttachment model added to MessageContent" + contains: "class MediaAttachment" + key_links: + - from: "packages/gateway/gateway/channels/whatsapp.py" + to: "gateway/normalize.py" + via: "normalize_whatsapp_event called after signature verification" + pattern: "normalize_whatsapp_event" + - from: "packages/gateway/gateway/channels/whatsapp.py" + to: "router/tenant.py" + via: "resolve_tenant with phone_number_id as workspace_id" + pattern: "resolve_tenant" + - from: "packages/gateway/gateway/channels/whatsapp.py" + to: "Celery handle_message task" + via: "handle_message.delay() after normalization" + pattern: "handle_message\\.delay" +--- + + +Build the WhatsApp Business Cloud API adapter in the Channel Gateway: webhook verification, signature 
checking, message normalization to KonstructMessage, business-function scoping gate, media handling, and outbound message delivery. Extend the shared message model with typed media attachments. + +Purpose: Adds the second messaging channel, enabling SMBs to deploy their AI employee on WhatsApp — the most common business communication channel globally. +Output: WhatsApp adapter, media model extension, business-function scoping, passing tests. + + + +@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md +@/home/adelorenzo/.claude/get-shit-done/templates/summary.md + + + +@.planning/PROJECT.md +@.planning/ROADMAP.md +@.planning/STATE.md +@.planning/phases/02-agent-features/02-CONTEXT.md +@.planning/phases/02-agent-features/02-RESEARCH.md + +@packages/gateway/gateway/channels/slack.py +@packages/gateway/gateway/normalize.py +@packages/gateway/gateway/main.py +@packages/shared/shared/models/message.py +@packages/shared/shared/config.py +@packages/router/router/tenant.py +@packages/router/router/ratelimit.py +@packages/router/router/idempotency.py + + + + +Channel adapter sequence (from Phase 1): +1. Receive webhook -> verify signature +2. Normalize to KonstructMessage +3. resolve_tenant(workspace_id) -> tenant_id +4. check_rate_limit(tenant_id, channel) +5. is_duplicate(message_id) -> skip if True +6. Post placeholder / typing indicator +7. 
handle_message.delay(msg_payload) + +From packages/shared/shared/models/message.py: +- KonstructMessage: id, tenant_id, channel, channel_metadata, sender, content, timestamp, thread_id, reply_to, context +- MessageContent: text, html, attachments (list[dict]), mentions (list[str]) +- SenderInfo: user_id, display_name, role +- ChannelType: slack (needs 'whatsapp' added) + +From packages/router/router/tenant.py: +- resolve_tenant(workspace_id: str, session) -> str (tenant_id) +- workspace_id for WhatsApp = phone_number_id from channel_connections + +From packages/shared/shared/config.py: +- Settings class with pydantic-settings +- Need to add: whatsapp_app_secret, whatsapp_verify_token + + + + + + + Task 1: Media model extension, WhatsApp normalizer, and signature verification with tests + + packages/shared/shared/models/message.py, + packages/gateway/gateway/normalize.py, + packages/shared/shared/config.py, + tests/unit/test_whatsapp_verify.py, + tests/unit/test_whatsapp_normalize.py + + + - MediaAttachment has media_type (image|document|audio|video), url, storage_key, mime_type, filename, size_bytes + - MessageContent.media is a list[MediaAttachment] (new field, defaults to []) + - ChannelType enum includes 'whatsapp' + - normalize_whatsapp_event converts Meta webhook payload to KonstructMessage with correct field mapping + - normalize_whatsapp_event extracts media attachments (image/document) into MediaAttachment objects + - normalize_whatsapp_event sets channel='whatsapp', sender.user_id=wa_id, thread_id=wa_id (WhatsApp has no threads) + - verify_whatsapp_signature raises HTTPException(403) when signature is invalid + - verify_whatsapp_signature returns raw body bytes when signature is valid + - verify_whatsapp_signature uses hmac.compare_digest for timing-safe comparison + - WhatsApp webhook GET verification returns hub.challenge when token matches + - WhatsApp webhook GET verification returns 403 when token doesn't match + + + 1. 
Extend `packages/shared/shared/models/message.py`: + - Add MediaType(StrEnum): IMAGE, DOCUMENT, AUDIO, VIDEO + - Add MediaAttachment(BaseModel): media_type, url (str|None), storage_key (str|None), mime_type (str|None), filename (str|None), size_bytes (int|None) + - Add `media: list[MediaAttachment] = []` field to MessageContent + - Add 'whatsapp' to ChannelType enum + + 2. Extend `packages/shared/shared/config.py`: + - Add whatsapp_app_secret: str = "" and whatsapp_verify_token: str = "" to Settings + + 3. Create `normalize_whatsapp_event()` in normalize.py: + - Takes: parsed webhook JSON body (dict) + - Extracts: entry[0].changes[0].value — this is the Meta Cloud API v20.0 structure + - Maps: messages[0].from -> sender.user_id, messages[0].text.body -> content.text + - For media messages (type=image/document): extract media_id, set MediaAttachment with media_type and a placeholder URL (actual download happens in the adapter) + - Sets channel='whatsapp', thread_id=sender_wa_id (WhatsApp conversations are per-phone-number, not threaded) + - Sets channel_metadata with phone_number_id, message_id from webhook + + 4. Write tests: + - test_whatsapp_verify.py: Valid signature passes, invalid signature raises 403, timing-safe comparison used + - test_whatsapp_normalize.py: Text message normalizes correctly, image message normalizes with MediaAttachment, correct field mapping for sender/channel/metadata + + Note: Use `hmac.new()` not `hmac.HMAC()` for signature verification. Read raw body via `await request.body()` BEFORE any JSON parsing (Pitfall 5 from research). 
+ + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_whatsapp_verify.py tests/unit/test_whatsapp_normalize.py -x -v + + + - MediaAttachment model exists with typed media fields + - MessageContent has media list field + - WhatsApp events normalize to KonstructMessage with correct mapping + - Signature verification is timing-safe and works on raw bytes + - ChannelType includes 'whatsapp' + + + + + Task 2: WhatsApp adapter with business-function scoping, media handling, and outbound delivery + + packages/gateway/gateway/channels/whatsapp.py, + packages/gateway/gateway/main.py, + tests/unit/test_whatsapp_scoping.py + + + - is_clearly_off_topic returns True for messages with zero keyword overlap with allowed_functions + - is_clearly_off_topic returns False for messages containing keywords from allowed_functions + - Off-topic messages receive a canned redirect response mentioning the agent name and allowed topics + - Borderline messages (not clearly off-topic) pass through to the LLM with scoping in system prompt + - WhatsApp webhook POST processes messages through the full adapter sequence (verify -> normalize -> resolve_tenant -> rate_limit -> dedup -> scoping -> dispatch) + - send_whatsapp_message sends text via Meta Cloud API POST to /v20.0/{phone_number_id}/messages + - send_whatsapp_media sends image/document via Meta Cloud API with media_id or URL + - Media download fetches from Meta API (GET /media/{media_id}) and stores to MinIO with tenant-prefixed key + + + 1. Create `packages/gateway/gateway/channels/whatsapp.py`: + - whatsapp_router = APIRouter() + - GET /whatsapp/webhook — verification handshake: check hub.mode=="subscribe" and hub.verify_token matches settings, return hub.challenge as PlainTextResponse + - POST /whatsapp/webhook — inbound message handler: + a. Read raw body via request.body() BEFORE parsing + b. Verify HMAC-SHA256 signature (X-Hub-Signature-256 header) + c. Parse JSON from raw body + d. 
Skip non-message events (status updates, read receipts — check for messages key) + e. Normalize via normalize_whatsapp_event() + f. Resolve tenant via phone_number_id as workspace_id (same resolve_tenant function as Slack) + g. Check rate limit (reuse existing check_rate_limit) + h. Check idempotency (reuse is_duplicate/mark_processed) + i. Business-function scoping check (see below) + j. If media: download from Meta API, upload to MinIO with key {tenant_id}/{agent_id}/{message_id}/{filename}, update MediaAttachment.storage_key and .url (presigned URL) + k. Dispatch handle_message.delay() with msg payload + extras (bot_token from channel_connections.config['access_token'], phone_number_id) + - Always return 200 OK to Meta (even on errors — Meta retries on non-200) + + 2. Business-function scoping (two-tier gate per user decision): + - Tier 1: is_clearly_off_topic(text, allowed_functions) — simple keyword overlap check. If zero overlap with any allowed function keywords, return True. Agent's allowed_functions come from Agent model (add `allowed_functions: list[str] = []` field if not present, or use agent.tools as proxy). + - If clearly off-topic: send canned redirect via send_whatsapp_message: "{agent.name} is here to help with {', '.join(allowed_functions)}. How can I assist you with one of those?" + - Tier 2: Borderline messages pass to the LLM. The scoping is enforced via the system prompt (which already contains the agent's role and persona). Add to system prompt builder: if channel == 'whatsapp', append "You only handle: {allowed_functions}. If a request is outside these areas, politely redirect the user." + + 3. 
Outbound message delivery: + - async send_whatsapp_message(phone_number_id, access_token, recipient_wa_id, text) -> None + - POST to https://graph.facebook.com/v20.0/{phone_number_id}/messages with messaging_product="whatsapp", to=recipient_wa_id, type="text", text={"body": text} + - async send_whatsapp_media(phone_number_id, access_token, recipient_wa_id, media_url, media_type) for outbound media + + 4. Wire into orchestrator tasks.py: + - In handle_message task, after getting LLM response, check channel type + - If 'whatsapp': call send_whatsapp_message via httpx (same pattern as Slack chat.update — bot token from extras) + - If 'slack': existing chat.update flow + + 5. Install boto3 for MinIO: `uv add boto3` in gateway package. Use endpoint_url=settings.minio_endpoint for S3-compatible MinIO access. + + 6. Register whatsapp_router in gateway main.py: `app.include_router(whatsapp_router)` + + 7. Write test_whatsapp_scoping.py: + - Test is_clearly_off_topic with matching keywords -> False + - Test is_clearly_off_topic with zero overlap -> True + - Test canned redirect message format includes agent name and allowed functions + - Test borderline message passes through (not rejected by tier 1) + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_whatsapp_verify.py tests/unit/test_whatsapp_normalize.py tests/unit/test_whatsapp_scoping.py -x -v + + + - WhatsApp webhook handler processes inbound messages through full adapter sequence + - Signature verification on raw body bytes before JSON parsing + - Business-function scoping: tier 1 rejects clearly off-topic, tier 2 scopes via system prompt + - Media downloaded from Meta API and stored in MinIO with tenant-prefixed keys + - Outbound messages sent via Meta Cloud API + - Gateway routes registered and running + - Canned redirect includes agent name and allowed topics + + + + + + +- All Phase 1 tests still pass: `pytest tests/ -x` +- WhatsApp tests pass: `pytest tests/unit/test_whatsapp_verify.py 
tests/unit/test_whatsapp_normalize.py tests/unit/test_whatsapp_scoping.py -x` +- Gateway starts without errors with new routes registered + + + +- WhatsApp messages are normalized to KonstructMessage and dispatched through the existing pipeline +- Webhook signature verification prevents unauthorized requests +- Business-function scoping enforces Meta 2026 policy (tier 1 keyword gate + tier 2 LLM scoping) +- Media attachments are downloaded, stored in MinIO, and available for multimodal LLM processing +- Per-tenant phone number isolation via phone_number_id in channel_connections + + + +After completion, create `.planning/phases/02-agent-features/02-03-SUMMARY.md` + diff --git a/.planning/phases/02-agent-features/02-04-PLAN.md b/.planning/phases/02-agent-features/02-04-PLAN.md new file mode 100644 index 0000000..24ce952 --- /dev/null +++ b/.planning/phases/02-agent-features/02-04-PLAN.md @@ -0,0 +1,244 @@ +--- +phase: 02-agent-features +plan: 04 +type: execute +wave: 2 +depends_on: ["02-01"] +files_modified: + - packages/orchestrator/orchestrator/escalation/__init__.py + - packages/orchestrator/orchestrator/escalation/handler.py + - packages/shared/shared/redis_keys.py + - packages/orchestrator/orchestrator/tasks.py + - tests/unit/test_escalation.py + - tests/integration/test_escalation.py +autonomous: true +requirements: + - AGNT-05 + +must_haves: + truths: + - "When a configured escalation rule triggers, the conversation is handed off to a human" + - "The human receives a DM with the full conversation transcript and escalation reason" + - "The agent stays in the thread as assistant after escalation — defers to human for end-user responses" + - "Natural language escalation ('can I talk to a human?') works when enabled per tenant" + - "Escalation events are logged in the audit trail" + artifacts: + - path: "packages/orchestrator/orchestrator/escalation/handler.py" + provides: "Escalation rule evaluation, transcript packaging, DM delivery" + exports: 
["check_escalation_rules", "escalate_to_human"] + - path: "tests/unit/test_escalation.py" + provides: "Unit tests for rule evaluation and transcript packaging" + - path: "tests/integration/test_escalation.py" + provides: "Integration tests for escalation DM delivery" + key_links: + - from: "packages/orchestrator/orchestrator/tasks.py" + to: "orchestrator/escalation/handler.py" + via: "check_escalation_rules called after LLM response in handle_message" + pattern: "check_escalation_rules|escalate_to_human" + - from: "packages/orchestrator/orchestrator/escalation/handler.py" + to: "Slack API conversations.open + chat.postMessage" + via: "httpx POST for DM delivery to assigned human" + pattern: "conversations\\.open|chat\\.postMessage" + - from: "packages/orchestrator/orchestrator/escalation/handler.py" + to: "orchestrator/audit/logger.py" + via: "log_escalation on every handoff" + pattern: "audit_logger\\.log_escalation" +--- + + +Build the human escalation/handoff system: rule-based trigger evaluation, full conversation transcript packaging, DM delivery to assigned human, and post-escalation assistant mode where the agent defers to the human. + +Purpose: Ensures the AI employee knows its limits and gracefully hands off to a human when configured rules trigger or the user explicitly requests it — maintaining the "employee" metaphor ("let me get my manager"). +Output: Escalation handler, updated orchestrator pipeline, passing tests. 
+ + + +@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md +@/home/adelorenzo/.claude/get-shit-done/templates/summary.md + + + +@.planning/PROJECT.md +@.planning/ROADMAP.md +@.planning/STATE.md +@.planning/phases/02-agent-features/02-CONTEXT.md +@.planning/phases/02-agent-features/02-RESEARCH.md +@.planning/phases/02-agent-features/02-01-SUMMARY.md + +@packages/orchestrator/orchestrator/tasks.py +@packages/orchestrator/orchestrator/agents/runner.py +@packages/shared/shared/models/tenant.py +@packages/shared/shared/redis_keys.py + + + +From packages/orchestrator/orchestrator/memory/short_term.py: +- async get_recent_messages(redis, tenant_id, agent_id, user_id, n=20) -> list[dict] + + +From packages/orchestrator/orchestrator/audit/logger.py: +- AuditLogger.log_escalation(tenant_id, agent_id, user_id, trigger_reason, metadata={}) + + +From packages/shared/shared/models/tenant.py: +- Agent.escalation_rules: list[dict] (e.g., [{"condition": "billing_dispute AND attempts > 2", "action": "handoff_human"}]) +- Agent.name: str (used in escalation DM message) + + +- escalation_status_key(tenant_id, thread_id) -> "{tenant_id}:escalation:{thread_id}" + + + + + + + Task 1: Escalation rule evaluator, transcript packager, and DM delivery with tests + + packages/orchestrator/orchestrator/escalation/__init__.py, + packages/orchestrator/orchestrator/escalation/handler.py, + tests/unit/test_escalation.py, + tests/integration/test_escalation.py + + + - check_escalation_rules returns the matching rule when a condition is met, None otherwise + - check_escalation_rules with "billing_dispute AND attempts > 2" matches when conversation metadata has billing_dispute=True and attempts=3 + - check_escalation_rules with natural language trigger ("can I talk to a human?") matches when natural_language_escalation is enabled for the tenant + - check_escalation_rules with natural language trigger returns None when natural_language_escalation is disabled + - build_transcript formats 
recent messages as "*User:* message\n*Assistant:* response" with Slack mrkdwn + - escalate_to_human opens a Slack DM with the assigned human and posts the transcript + - escalate_to_human sets the escalation status key in Redis + - After escalation, agent responses to end-user messages include "A team member is looking into this" + - Escalation event is logged to audit trail with trigger_reason + + + 1. Create `packages/orchestrator/orchestrator/escalation/handler.py`: + + **check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled=False) -> dict | None:** + - Iterates agent.escalation_rules list + - Each rule has: condition (str), action (str — 'handoff_human') + - Simple condition parser: supports "keyword AND count_check" format + - Check if conversation_metadata matches the condition fields + - For natural language: check if message_text matches common escalation phrases ("talk to a human", "speak to someone", "get a person", "human agent", "real person", "manager") AND natural_lang_enabled is True + - Returns the first matching rule dict, or None + + **build_transcript(recent_messages: list[dict]) -> str:** + - Formats each message as "*{role.capitalize()}:* {content}" + - Joins with newlines + - Truncates to 3000 chars if needed (Slack message limit) + + **async escalate_to_human(tenant_id, agent, thread_id, trigger_reason, recent_messages, assignee_slack_user_id, bot_token, redis, audit_logger) -> str:** + - Build formatted transcript via build_transcript() + - Compose DM text following the "employee" metaphor: + "{agent.name} needs human assistance\nReason: {trigger_reason}\nTenant: {tenant_id}\n\nConversation transcript:\n{transcript}\n\nThe agent will stay in the thread. You can reply directly to the user." 
+ - Open DM channel via httpx POST to https://slack.com/api/conversations.open + - Post transcript to DM via httpx POST to https://slack.com/api/chat.postMessage + - Set escalation status in Redis: escalation_status_key(tenant_id, thread_id) = "escalated" with no TTL (stays until manually resolved) + - Log escalation event via audit_logger.log_escalation() + - Return a message for the end user: "I've brought in {assignee_name or 'a team member'} to help with this. They'll be with you shortly." + + 2. Write unit tests (test_escalation.py): + - Rule matching: condition with billing_dispute matches, non-matching condition returns None + - Natural language: "can I talk to a human?" matches when enabled, returns None when disabled + - Transcript formatting: messages formatted correctly, truncated at limit + - Various escalation phrases tested + + 3. Write integration tests (test_escalation.py): + - Mock httpx calls to Slack API + - Verify conversations.open is called with correct user ID + - Verify chat.postMessage is called with transcript + - Verify Redis escalation key is set + - Verify audit event is logged with action_type='escalation' + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v + + + - Escalation rules evaluate correctly against conversation metadata + - Natural language escalation triggers on common phrases when enabled + - Transcript is formatted in Slack mrkdwn and truncated if needed + - DM delivered to assigned human via Slack API + - Escalation status tracked in Redis + - Audit event logged for every escalation + + + + + Task 2: Wire escalation into orchestrator pipeline with post-escalation assistant mode + + packages/orchestrator/orchestrator/tasks.py, + packages/shared/shared/redis_keys.py + + + 1. 
Update `tasks.py` — add escalation checks to handle_message: + + **At the START of handle_message (before LLM call):** + - Check Redis escalation_status_key(tenant_id, thread_id) + - If escalated: enter assistant mode + - If the sender is the assigned human: process normally (the human might ask the agent for info) + - If the sender is the end user: respond with "A team member is looking into this. They'll respond shortly." Do NOT call the LLM. This prevents the agent from overriding the human's response. + - This check must happen AFTER message normalization but BEFORE the LLM call + + **AFTER the LLM response (before sending reply):** + - Load agent's escalation_rules and tenant config (natural_language_escalation setting) + - Call check_escalation_rules(agent, message_text, conversation_metadata, natural_lang_enabled) + - If a rule matches: + a. Load recent messages from Redis sliding window (already loaded for memory) + b. Get assignee_slack_user_id from agent configuration (add escalation_assignee field to Agent model or read from escalation_rules config) + c. Get bot_token from channel_connections config (already available in task extras) + d. Call escalate_to_human() + e. Replace the LLM response with the escalation message returned by escalate_to_human() + f. Send escalation message to user instead of original LLM response + + 2. Add escalation_assignee field to Agent model if not already present: + - In packages/shared/shared/models/tenant.py, add: escalation_assignee: Mapped[str | None] = mapped_column(Text, nullable=True) + - This is the Slack user ID of the human to DM on escalation + - Also add: natural_language_escalation: Mapped[bool] = mapped_column(Boolean, default=False) + + 3. 
Conversation metadata tracking: + - For rule-based escalation (e.g., "billing_dispute AND attempts > 2"), the orchestrator needs to track conversation metadata + - Store conversation metadata in Redis: {tenant_id}:conv_meta:{thread_id} as a JSON dict + - The LLM can populate this via a system prompt instruction: "If the user mentions billing, set billing_dispute=true in your response metadata" + - Or simpler: use keyword detection on the conversation history to populate metadata + - Use Claude's discretion on the simplest approach that works. Keyword detection on the sliding window is probably sufficient for v1. + + 4. If Agent model is modified, create a small Alembic migration for the new columns. + + CRITICAL constraints: + - Celery task is sync def with asyncio.run() + - httpx calls to Slack API follow the same pattern as existing chat.update in tasks.py + - Redis operations use existing async pattern + - Audit logging uses AuditLogger from Plan 02 (if Plan 02 not yet executed, use a no-op logger that can be replaced) + + + cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x -v + + + - Escalated conversations route end-user messages to "team member is handling this" auto-reply + - Human messages in escalated threads are processed normally by the agent + - Escalation rules checked after every LLM response + - Natural language escalation works when enabled per tenant + - Escalation triggers DM to assigned human with full transcript + - Agent model has escalation_assignee and natural_language_escalation fields + - Full pipeline: message in -> memory -> LLM -> escalation check -> response/handoff + + + + + + +- All tests pass: `pytest tests/ -x` +- Escalation tests pass: `pytest tests/unit/test_escalation.py tests/integration/test_escalation.py -x` +- Agent model migration applies cleanly: `alembic upgrade head` + + + +- Configured escalation rules trigger handoff to human with full 
conversation context +- Natural language escalation ("can I talk to a human?") works when enabled per tenant +- Escalated conversations enter assistant mode — agent defers to human +- Human receives DM with complete transcript and escalation reason +- Every escalation event is recorded in the audit trail + + + +After completion, create `.planning/phases/02-agent-features/02-04-SUMMARY.md` +