---
phase: 01-foundation
plan: 03
type: execute
wave: 3
depends_on: ["01-01", "01-02"]
files_modified:
  - packages/gateway/__init__.py
  - packages/gateway/main.py
  - packages/gateway/channels/__init__.py
  - packages/gateway/channels/slack.py
  - packages/gateway/normalize.py
  - packages/gateway/verify.py
  - packages/router/__init__.py
  - packages/router/main.py
  - packages/router/tenant.py
  - packages/router/ratelimit.py
  - packages/router/idempotency.py
  - packages/router/context.py
  - docker-compose.yml
  - tests/unit/test_ratelimit.py
  - tests/integration/test_slack_flow.py
  - tests/integration/test_agent_persona.py
  - tests/integration/test_ratelimit.py
autonomous: true
requirements:
  - CHAN-02
  - CHAN-05
  - AGNT-01
must_haves:
  truths:
    - "A Slack @mention or DM to the AI employee triggers an LLM-generated response posted back in the same Slack thread"
    - "The Slack event handler returns HTTP 200 within 3 seconds — LLM work is dispatched to Celery, not done inline"
    - "A request exceeding the per-tenant or per-channel rate limit is rejected with an informative Slack message rather than silently dropped"
    - "The agent's response reflects the configured name, role, and persona from the Agent table"
    - "A typing indicator (placeholder message) appears while the LLM is generating"
  artifacts:
    - path: "packages/gateway/main.py"
      provides: "FastAPI app with /slack/events endpoint mounting slack-bolt AsyncApp"
      exports: ["app"]
    - path: "packages/gateway/channels/slack.py"
      provides: "Slack event handlers for @mentions and DMs, dispatches to Celery"
      exports: ["register_slack_handlers"]
    - path: "packages/gateway/normalize.py"
      provides: "Slack event -> KonstructMessage normalization"
      exports: ["normalize_slack_event"]
    - path: "packages/router/tenant.py"
      provides: "Workspace ID -> tenant_id resolution from DB"
      exports: ["resolve_tenant"]
    - path: "packages/router/ratelimit.py"
      provides: "Redis token bucket rate limiter per tenant per channel"
      exports: ["check_rate_limit", "RateLimitExceeded"]
    - path: "packages/router/idempotency.py"
      provides: "Redis-based message deduplication"
      exports: ["is_duplicate", "mark_processed"]
  key_links:
    - from: "packages/gateway/channels/slack.py"
      to: "packages/orchestrator/tasks.py"
      via: "handle_message_task.delay(msg.model_dump())"
      pattern: "handle_message.*delay"
    - from: "packages/gateway/channels/slack.py"
      to: "packages/router/tenant.py"
      via: "resolve_tenant(workspace_id, channel_type)"
      pattern: "resolve_tenant"
    - from: "packages/gateway/channels/slack.py"
      to: "packages/router/ratelimit.py"
      via: "check_rate_limit(tenant_id, channel) before dispatch"
      pattern: "check_rate_limit"
    - from: "packages/orchestrator/agents/runner.py"
      to: "Slack API"
      via: "chat.update to replace placeholder with real response"
      pattern: "chat_update|chat\\.update"
---

Build the Channel Gateway (Slack adapter with slack-bolt AsyncApp), the Message Router (tenant resolution, rate limiting, idempotency), and wire them to the Celery orchestrator from Plan 02 to complete the end-to-end Slack message -> LLM response flow.

**Purpose:** Close the vertical loop — a Slack user @mentions the AI employee, and a response appears in-thread. This is the core value demonstration of the entire platform.

**Output:** A working Slack integration where @mentions and DMs trigger LLM responses in-thread, with rate limiting, tenant resolution, deduplication, and a typing indicator.
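The key_links above pin down the handler's dispatch order: resolve tenant, check rate limit, deduplicate, post the placeholder, then hand off to Celery. A rough illustrative sketch of that ordering, with every collaborator stubbed as a parameter (the real handler uses slack-bolt, Redis, and Celery per Task 1; all names here are stand-ins):

```python
# Illustrative sketch of the Slack handler's dispatch order.
# Collaborators are injected stubs; the real code uses slack-bolt, Redis, and Celery.

def handle_app_mention(event, workspace_id, *, resolve_tenant, check_rate_limit,
                       is_duplicate, post_placeholder, dispatch):
    tenant_id = resolve_tenant(workspace_id)
    if tenant_id is None:
        return "ignored"          # unknown workspace: log a warning and drop silently
    if not check_rate_limit(tenant_id, "slack"):
        return "rate_limited"     # ephemeral "too many requests" reply, no Celery dispatch
    if is_duplicate(tenant_id, event["ts"]):
        return "duplicate"        # Slack retry of an already-processed event
    placeholder = post_placeholder(event["channel"], event.get("thread_ts", event["ts"]))
    # Hand off to Celery; no LLM work happens inline, so HTTP 200 returns fast.
    dispatch({"text": event["text"], "placeholder_ts": placeholder["ts"],
              "channel_id": event["channel"]})
    return "dispatched"
```

The point of the shape is that every early return happens before any slow work, which is what keeps the handler inside Slack's 3-second retry window.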
@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/01-foundation/01-CONTEXT.md
@.planning/phases/01-foundation/01-RESEARCH.md
@.planning/phases/01-foundation/01-01-SUMMARY.md
@.planning/phases/01-foundation/01-02-SUMMARY.md

From packages/shared/models/message.py:

```python
class KonstructMessage(BaseModel):
    id: str
    tenant_id: str | None = None
    channel: ChannelType
    channel_metadata: dict
    sender: SenderInfo
    content: MessageContent
    timestamp: datetime
    thread_id: str | None = None
```

From packages/shared/redis_keys.py:

```python
def rate_limit_key(tenant_id: str, channel: str) -> str: ...
def idempotency_key(tenant_id: str, message_id: str) -> str: ...
def engaged_thread_key(tenant_id: str, thread_id: str) -> str: ...
```

From packages/shared/rls.py:

```python
current_tenant_id: ContextVar[str | None]
```

From packages/orchestrator/tasks.py:

```python
@app.task
def handle_message(message_data: dict) -> dict: ...
```

From packages/orchestrator/agents/builder.py:

```python
def build_system_prompt(agent: Agent) -> str: ...
def build_messages(system_prompt: str, user_message: str, history: list[dict] | None = None) -> list[dict]: ...
```

**Task 1: Channel Gateway (Slack adapter) and Message Router**

Files: packages/gateway/__init__.py, packages/gateway/main.py, packages/gateway/channels/__init__.py, packages/gateway/channels/slack.py, packages/gateway/normalize.py, packages/gateway/verify.py, packages/router/__init__.py, packages/router/main.py, packages/router/tenant.py, packages/router/ratelimit.py, packages/router/idempotency.py, packages/router/context.py, docker-compose.yml

1. Create `packages/gateway/normalize.py`:
   - `normalize_slack_event(event: dict, workspace_id: str) -> KonstructMessage`: Converts a Slack Events API payload into a KonstructMessage.
     Extracts: user ID, text, thread_ts -> thread_id, channel ID, and workspace_id into channel_metadata. Sets channel=ChannelType.SLACK.
   - Handle both @mention events (strip the `<@BOT_ID>` prefix from text) and DM events.
2. Create `packages/gateway/channels/slack.py`:
   - `register_slack_handlers(slack_app: AsyncApp)`: Registers event handlers on the slack-bolt AsyncApp.
   - Handle the `app_mention` event: normalize the message, resolve the tenant, check the rate limit, check idempotency, post a placeholder "Thinking..." message in the thread (typing indicator per user decision), then dispatch to Celery with `handle_message_task.delay(msg.model_dump() | {"placeholder_ts": placeholder_msg["ts"], "channel_id": event["channel"]})`.
   - Handle the `message` event (DMs only — filter `channel_type == "im"`): same flow as `app_mention`.
   - Thread follow-up behavior (Claude's discretion): implement auto-follow for engaged threads. After the first @mention in a thread, store `engaged_thread_key(tenant_id, thread_id)` in Redis with a 30-minute TTL. Subsequent messages in that thread (even without an @mention) trigger a response. Per research recommendation.
   - If the rate limit is exceeded: post an ephemeral message to the user ("I'm receiving too many requests right now. Please try again in a moment.") and do NOT dispatch to Celery.
   - If tenant resolution fails (unknown workspace): log a warning and ignore the event silently.
   - CRITICAL: Return HTTP 200 immediately. NO LLM work inside the handler. Slack retries after 3 seconds.
3. Create `packages/gateway/main.py`:
   - FastAPI app mounting the slack-bolt AsyncApp via `AsyncSlackRequestHandler`
   - `POST /slack/events` endpoint handled by the Slack handler
   - `GET /health` endpoint
   - Port 8001
4. Create `packages/router/tenant.py`:
   - `async def resolve_tenant(workspace_id: str, channel_type: ChannelType, session: AsyncSession) -> str | None`: Queries the `channel_connections` table for a matching workspace_id + channel_type and returns the tenant_id, or None.
     Uses an RLS-free query (tenant resolution must work across all tenants — this is the one pre-RLS operation).
5. Create `packages/router/ratelimit.py`:
   - `async def check_rate_limit(tenant_id: str, channel: str, redis: Redis) -> bool`: Implements a token bucket using Redis. Uses `rate_limit_key(tenant_id, channel)` from shared redis_keys. Default: 30 requests per minute per tenant per channel (configurable). Returns True if allowed, raises `RateLimitExceeded` if not.
   - `class RateLimitExceeded(Exception)`: Custom exception with a remaining_seconds attribute.
6. Create `packages/router/idempotency.py`:
   - `async def is_duplicate(tenant_id: str, message_id: str, redis: Redis) -> bool`: Checks Redis for `idempotency_key(tenant_id, message_id)`. If it exists, return True (duplicate). Otherwise, set it with a 24-hour TTL and return False.
7. Create `packages/router/context.py`:
   - `async def load_agent_for_tenant(tenant_id: str, session: AsyncSession) -> Agent | None`: Loads the active agent for the tenant (Phase 1 = single agent per tenant). Sets the `current_tenant_id` ContextVar before querying.
8. Update the Celery `handle_message` task in `packages/orchestrator/tasks.py` (or instruct the Plan 02 SUMMARY reader to expect this):
   - After generating the LLM response, use `slack_bolt` or `httpx` to call `chat.update` on the placeholder message (replacing "Thinking..." with the real response). Use `placeholder_ts` and `channel_id` from the task payload.
   - Per user decision: always reply in threads.
9. Update `docker-compose.yml` to add a `gateway` service on port 8001, depending on redis, postgres, and celery-worker.
10. Create `__init__.py` files for the gateway, gateway/channels, and router packages.
Verify:

```bash
cd /home/adelorenzo/repos/konstruct && python -c "from packages.gateway.normalize import normalize_slack_event; from packages.router.tenant import resolve_tenant; from packages.router.ratelimit import check_rate_limit; print('Gateway + Router imports OK')"
```

Done when:
- Slack @mentions and DMs are handled by the slack-bolt AsyncApp in HTTP mode
- Messages are normalized to the KonstructMessage format
- Tenant resolution maps workspace_id to tenant_id
- Rate limiting enforces per-tenant per-channel limits with a Redis token bucket
- Idempotency deduplication prevents double-processing of Slack retries
- A placeholder "Thinking..." message is posted immediately, then replaced with the LLM response
- Engaged threads are auto-followed with a 30-minute idle timeout
- HTTP 200 is returned within 3 seconds, with all LLM work dispatched to Celery

**Task 2: End-to-end integration tests for Slack flow, rate limiting, and agent persona**

Files: tests/unit/test_ratelimit.py, tests/integration/test_slack_flow.py, tests/integration/test_agent_persona.py, tests/integration/test_ratelimit.py

1. Create `tests/unit/test_ratelimit.py` (CHAN-05 unit):
   - Test that the token bucket allows requests under the limit (29 of 30)
   - Test that the token bucket rejects the 31st request in a 1-minute window
   - Test that rate limit keys are namespaced per tenant (tenant_a's limit is independent of tenant_b's)
   - Test that the rate limit resets after the window expires
   - Use fakeredis or the real Redis from Docker Compose
2. Create `tests/integration/test_ratelimit.py` (CHAN-05 integration):
   - Test the full flow: send a Slack event that exceeds the rate limit and verify that an ephemeral "too many requests" message is sent back via the Slack API (mock Slack client)
   - Verify the event is NOT dispatched to Celery when rate-limited
3. Create `tests/integration/test_slack_flow.py` (CHAN-02):
   - Mock the Slack client (no real Slack workspace needed)
   - Mock the LLM pool response (no real LLM call needed)
   - Test the full flow: Slack app_mention event -> normalize -> resolve tenant -> dispatch to Celery -> LLM call -> chat.update with the response
   - Verify the response is posted in-thread (thread_ts is set)
   - Verify the placeholder "Thinking..." message is posted before the Celery dispatch
   - Verify the placeholder is replaced with the real response
   - Test the DM flow: a message event with channel_type="im" triggers the same pipeline
   - Test that bot messages are ignored (no infinite loop)
   - Test that unknown workspace_id events are silently ignored
4. Create `tests/integration/test_agent_persona.py` (AGNT-01):
   - Create a tenant with an agent configured with name="Mara", role="Customer Support", persona="Professional and empathetic"
   - Mock the LLM pool to capture the messages array sent to /complete
   - Trigger a message through the pipeline
   - Verify the system prompt contains: "Your name is Mara", "Your role is Customer Support", "Professional and empathetic", and the AI transparency clause
   - Verify that model_preference from the agent config is passed to the LLM pool

Verify:

```bash
cd /home/adelorenzo/repos/konstruct && pytest tests/unit/test_ratelimit.py tests/integration/test_slack_flow.py tests/integration/test_agent_persona.py tests/integration/test_ratelimit.py -x -q
```

Done when:
- Rate limiting unit tests verify token bucket behavior per tenant per channel
- The Slack flow integration test proves the end-to-end path: event -> normalize -> tenant resolve -> Celery -> LLM -> thread reply
- The agent persona test proves the system prompt reflects name, role, persona, and the AI transparency clause
- The rate limit integration test proves over-limit requests get an informative rejection
- All tests pass with a mocked Slack client and a mocked LLM pool

Verification:
- `pytest tests/unit/test_ratelimit.py -x` verifies the token bucket logic
- `pytest tests/integration/test_slack_flow.py -x` proves end-to-end Slack -> LLM -> reply
- `pytest tests/integration/test_agent_persona.py -x` proves the persona is reflected in the system prompt
- `pytest tests/integration/test_ratelimit.py -x` proves the rate limit produces an informative rejection
- `grep -r "async def.*@app.task" packages/orchestrator/` returns NO matches (no async Celery tasks)

Must-haves confirmed:
- A Slack @mention or DM triggers an LLM response in the same thread (mocked end-to-end)
- Rate-limited requests are rejected with an informative message, not silently dropped
- The agent persona (name, role, persona) is reflected in the LLM system prompt
- A typing indicator (placeholder message) appears before the LLM response
- All tests green, no async Celery tasks

After completion, create `.planning/phases/01-foundation/01-03-SUMMARY.md`
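For reference, the deduplication check in Task 1 step 6 maps naturally onto Redis's SET-if-not-exists pattern: redis-py's `set(key, value, nx=True, ex=ttl)` returns None when the key already existed, which doubles as the duplicate signal in a single round trip. A minimal sketch with an in-memory stand-in for the Redis client (the `FakeRedis` class and the key shape are illustrative; the real code uses redis-py asyncio and `idempotency_key` from shared redis_keys):

```python
import asyncio

# Sketch of is_duplicate (Task 1, step 6) built on SET NX EX: "check and mark"
# happens atomically in one call. FakeRedis is an in-memory stand-in so the
# sketch runs without a server; TTL expiry is omitted for brevity.

class FakeRedis:
    def __init__(self):
        self._data = {}
    async def set(self, key, value, nx=False, ex=None):
        if nx and key in self._data:
            return None            # key exists, so SET NX does nothing
        self._data[key] = value
        return True

def idempotency_key(tenant_id: str, message_id: str) -> str:
    return f"idem:{tenant_id}:{message_id}"   # illustrative key shape

async def is_duplicate(tenant_id: str, message_id: str, redis) -> bool:
    was_set = await redis.set(idempotency_key(tenant_id, message_id), "1",
                              nx=True, ex=86400)   # 24-hour TTL
    return was_set is None
```

The atomicity matters because Slack retries can arrive concurrently; a separate GET-then-SET would let two retries both see "not processed" and dispatch twice.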