---
phase: 01-foundation
plan: "03"
type: execute
wave: 3
depends_on: ["01-01", "01-02"]
files_modified:
  - packages/gateway/__init__.py
  - packages/gateway/main.py
  - packages/gateway/channels/__init__.py
  - packages/gateway/channels/slack.py
  - packages/gateway/normalize.py
  - packages/gateway/verify.py
  - packages/router/__init__.py
  - packages/router/main.py
  - packages/router/tenant.py
  - packages/router/ratelimit.py
  - packages/router/idempotency.py
  - packages/router/context.py
  - docker-compose.yml
  - tests/unit/test_ratelimit.py
  - tests/integration/test_slack_flow.py
  - tests/integration/test_agent_persona.py
  - tests/integration/test_ratelimit.py
autonomous: true
requirements: [CHAN-02, CHAN-05, AGNT-01]
must_haves:
  truths:
    - A Slack @mention or DM to the AI employee triggers an LLM-generated response posted back in the same Slack thread
    - The Slack event handler returns HTTP 200 within 3 seconds — LLM work is dispatched to Celery, not done inline
    - A request exceeding the per-tenant or per-channel rate limit is rejected with an informative Slack message rather than silently dropped
    - The agent's response reflects the configured name, role, and persona from the Agent table
    - A typing indicator (placeholder message) appears while the LLM is generating
  artifacts:
    - path: packages/gateway/main.py
      provides: FastAPI app with /slack/events endpoint mounting slack-bolt AsyncApp
      exports: [app]
    - path: packages/gateway/channels/slack.py
      provides: Slack event handlers for @mentions and DMs, dispatches to Celery
      exports: [register_slack_handlers]
    - path: packages/gateway/normalize.py
      provides: Slack event -> KonstructMessage normalization
      exports: [normalize_slack_event]
    - path: packages/router/tenant.py
      provides: Workspace ID -> tenant_id resolution from DB
      exports: [resolve_tenant]
    - path: packages/router/ratelimit.py
      provides: Redis token bucket rate limiter per tenant per channel
      exports: [check_rate_limit, RateLimitExceeded]
    - path: packages/router/idempotency.py
      provides: Redis-based message deduplication
      exports: [is_duplicate, mark_processed]
  key_links:
    - from: packages/gateway/channels/slack.py
      to: packages/orchestrator/tasks.py
      via: handle_message_task.delay(msg.model_dump())
      pattern: handle_message.*delay
    - from: packages/gateway/channels/slack.py
      to: packages/router/tenant.py
      via: resolve_tenant(workspace_id, channel_type)
      pattern: resolve_tenant
    - from: packages/gateway/channels/slack.py
      to: packages/router/ratelimit.py
      via: check_rate_limit(tenant_id, channel) before dispatch
      pattern: check_rate_limit
    - from: packages/orchestrator/agents/runner.py
      via: Slack API chat.update to replace placeholder with real response
      pattern: chat_update|chat.update
---
Build the Channel Gateway (Slack adapter with slack-bolt AsyncApp), Message Router (tenant resolution, rate limiting, idempotency), and wire them to the Celery orchestrator from Plan 02 to complete the end-to-end Slack message -> LLM response flow.

Purpose: Close the vertical loop — a Slack user @mentions the AI employee, a response appears in-thread. This is the core value demonstration of the entire platform.

Output: Working Slack integration where @mentions and DMs trigger LLM responses in-thread, with rate limiting, tenant resolution, deduplication, and typing indicator.

<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/01-foundation/01-CONTEXT.md @.planning/phases/01-foundation/01-RESEARCH.md @.planning/phases/01-foundation/01-01-SUMMARY.md @.planning/phases/01-foundation/01-02-SUMMARY.md

From `packages/shared/models/message.py`:

```python
class KonstructMessage(BaseModel):
    id: str
    tenant_id: str | None = None
    channel: ChannelType
    channel_metadata: dict
    sender: SenderInfo
    content: MessageContent
    timestamp: datetime
    thread_id: str | None = None
```

From `packages/shared/redis_keys.py`:

```python
def rate_limit_key(tenant_id: str, channel: str) -> str: ...
def idempotency_key(tenant_id: str, message_id: str) -> str: ...
def engaged_thread_key(tenant_id: str, thread_id: str) -> str: ...
```

From `packages/shared/rls.py`:

```python
current_tenant_id: ContextVar[str | None]
```

From `packages/orchestrator/tasks.py`:

```python
@app.task
def handle_message(message_data: dict) -> dict: ...
```

From `packages/orchestrator/agents/builder.py`:

```python
def build_system_prompt(agent: Agent) -> str: ...
def build_messages(system_prompt: str, user_message: str, history: list[dict] | None = None) -> list[dict]: ...
```
Task 1: Channel Gateway (Slack adapter) and Message Router

Files: packages/gateway/__init__.py, packages/gateway/main.py, packages/gateway/channels/__init__.py, packages/gateway/channels/slack.py, packages/gateway/normalize.py, packages/gateway/verify.py, packages/router/__init__.py, packages/router/main.py, packages/router/tenant.py, packages/router/ratelimit.py, packages/router/idempotency.py, packages/router/context.py, docker-compose.yml

1. Create `packages/gateway/normalize.py`:
   - `normalize_slack_event(event: dict, workspace_id: str) -> KonstructMessage`: Converts a Slack Events API payload into a KonstructMessage. Extracts the user ID, text, and thread_ts -> thread_id; puts the channel ID and workspace_id into channel_metadata. Sets channel=ChannelType.SLACK.
   - Handle both @mention events (strip the `<@BOT_ID>` prefix from text) and DM events.
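The normalization step can be sketched as below. This is a hedged illustration, not the real module: a plain dict stands in for the KonstructMessage model so the sketch is self-contained, and the `id` format plus the fallback of `thread_id` to the message's own `ts` are assumptions (the plan only specifies thread_ts -> thread_id).

```python
import re
from datetime import datetime, timezone

def normalize_slack_event(event: dict, workspace_id: str) -> dict:
    """Convert a Slack Events API event into the KonstructMessage shape.

    Returns a plain dict here; the real implementation constructs the
    shared KonstructMessage(BaseModel). Field names mirror that model.
    """
    # Strip a leading <@BOT_ID> mention, if present (app_mention events).
    text = re.sub(r"^<@[A-Z0-9]+>\s*", "", event.get("text", ""))
    # Slack's ts is "seconds.microseconds" as a string; it also serves as
    # the per-channel message identifier.
    ts = float(event["ts"])
    return {
        "id": f"{workspace_id}:{event['channel']}:{event['ts']}",  # assumed format
        "tenant_id": None,  # resolved later by the router
        "channel": "slack",
        "channel_metadata": {
            "workspace_id": workspace_id,
            "channel_id": event["channel"],
        },
        "sender": {"user_id": event["user"]},
        "content": {"text": text},
        "timestamp": datetime.fromtimestamp(ts, tz=timezone.utc),
        # thread_ts exists only for replies; falling back to ts lets every
        # response thread under the triggering message (assumption).
        "thread_id": event.get("thread_ts") or event["ts"],
    }

msg = normalize_slack_event(
    {"user": "U123", "text": "<@B999> hello there",
     "ts": "1700000000.000100", "channel": "C456"},
    workspace_id="T0KONSTRUCT",
)
```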
2. Create `packages/gateway/channels/slack.py`:
   - `register_slack_handlers(slack_app: AsyncApp)`: Registers event handlers on the slack-bolt AsyncApp.
   - Handle `app_mention` event: Normalize message, resolve tenant, check rate limit, check idempotency, post placeholder "Thinking..." message in thread (typing indicator per user decision), dispatch to Celery with `handle_message_task.delay(msg.model_dump() | {"placeholder_ts": placeholder_msg["ts"], "channel_id": event["channel"]})`.
   - Handle `message` event (DMs only — filter `channel_type == "im"`): Same flow as app_mention.
   - Thread follow-up behavior (Claude's discretion): Implement auto-follow for engaged threads. After the first @mention in a thread, store `engaged_thread_key(tenant_id, thread_id)` in Redis with 30-minute TTL. Subsequent messages in that thread (even without @mention) trigger a response. Per research recommendation.
   - If rate limit exceeded: Post an ephemeral message to the user: "I'm receiving too many requests right now. Please try again in a moment." Do NOT dispatch to Celery.
   - If tenant resolution fails (unknown workspace): Log warning and ignore the event silently.
   - CRITICAL: Return HTTP 200 immediately. NO LLM work inside the handler. Slack retries after 3 seconds.
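The required ordering (tenant -> rate limit -> idempotency -> placeholder -> dispatch) can be sketched with the collaborators injected as plain callables. All stub names below are illustrative stand-ins, not the real module APIs; the real handler is registered on the slack-bolt AsyncApp and the local RateLimitExceeded mimics the one in packages/router/ratelimit.py.

```python
import asyncio

class RateLimitExceeded(Exception):
    """Stand-in for packages.router.ratelimit.RateLimitExceeded."""

async def handle_mention(event: dict, *, resolve_tenant, check_rate_limit,
                         is_duplicate, post_placeholder, dispatch,
                         say_ephemeral):
    """Orchestration only: each step is a fast DB/Redis call or one Slack
    API post, so the handler returns well inside Slack's 3-second window."""
    tenant_id = await resolve_tenant(event["team"], "slack")
    if tenant_id is None:
        return  # unknown workspace: ignore silently
    try:
        await check_rate_limit(tenant_id, "slack")
    except RateLimitExceeded:
        await say_ephemeral("I'm receiving too many requests right now. "
                            "Please try again in a moment.")
        return  # do NOT dispatch to Celery
    if await is_duplicate(tenant_id, event["ts"]):
        return  # Slack retry of an event we already accepted
    placeholder_ts = await post_placeholder(
        event["channel"], event.get("thread_ts") or event["ts"])
    await dispatch({"event": event, "tenant_id": tenant_id,
                    "placeholder_ts": placeholder_ts,
                    "channel_id": event["channel"]})

# Exercise the happy path with stub collaborators.
calls = []
async def _tenant(ws, ch): return "tenant-a"
async def _rate(t, c): return True
async def _dup(t, mid): return False
async def _placeholder(ch, thread):
    calls.append("placeholder")
    return "123.456"
async def _dispatch(payload): calls.append(("dispatch", payload["placeholder_ts"]))
async def _ephemeral(text): calls.append("ephemeral")

asyncio.run(handle_mention(
    {"team": "T0", "ts": "1.0", "channel": "C1", "text": "<@B1> hi"},
    resolve_tenant=_tenant, check_rate_limit=_rate, is_duplicate=_dup,
    post_placeholder=_placeholder, dispatch=_dispatch,
    say_ephemeral=_ephemeral))
```

Note the placeholder is posted before dispatch so the task payload can carry its `ts` for the later chat.update.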

3. Create `packages/gateway/main.py`:
   - FastAPI app mounting slack-bolt AsyncApp via `AsyncSlackRequestHandler`
   - `POST /slack/events` endpoint handled by the slack handler
   - `GET /health` endpoint
   - Port 8001

4. Create `packages/router/tenant.py`:
   - `async def resolve_tenant(workspace_id: str, channel_type: ChannelType, session: AsyncSession) -> str | None`: Queries `channel_connections` table for matching workspace_id + channel_type, returns tenant_id or None. Uses RLS-free query (tenant resolution must work across all tenants — this is the one pre-RLS operation).

5. Create `packages/router/ratelimit.py`:
   - `async def check_rate_limit(tenant_id: str, channel: str, redis: Redis) -> bool`: Implements token bucket using Redis. Uses `rate_limit_key(tenant_id, channel)` from shared redis_keys. Default: 30 requests per minute per tenant per channel (configurable). Returns True if allowed, raises `RateLimitExceeded` if not.
   - `class RateLimitExceeded(Exception)`: Custom exception with remaining_seconds attribute.
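The bucket arithmetic itself is small and worth pinning down. The sketch below keeps state in a local dataclass and takes the clock as a parameter; in the real module the (tokens, last_refill) pair would live in Redis under `rate_limit_key(tenant_id, channel)` and be read-modify-written atomically (e.g. via a Lua script), which this sketch does not attempt.

```python
from dataclasses import dataclass

@dataclass
class Bucket:
    tokens: float
    last_refill: float  # seconds, same clock as `now`

def try_consume(bucket: Bucket, now: float, *, capacity: int = 30,
                refill_per_sec: float = 30 / 60) -> bool:
    """Refill based on elapsed time, then spend one token if available.

    capacity=30 with refill_per_sec=0.5 gives the plan's default of
    30 requests per minute. Returns True if the request is allowed.
    """
    elapsed = now - bucket.last_refill
    bucket.tokens = min(capacity, bucket.tokens + elapsed * refill_per_sec)
    bucket.last_refill = now
    if bucket.tokens >= 1:
        bucket.tokens -= 1
        return True
    return False

b = Bucket(tokens=30.0, last_refill=0.0)
allowed = [try_consume(b, now=0.0) for _ in range(31)]  # burst of 31 at t=0
recovered = try_consume(b, now=2.0)  # 2s later: one token has refilled
```

Unlike a fixed window, the bucket permits a full burst but then refills smoothly, so a tenant recovers one request every two seconds rather than all at once at the minute boundary.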

6. Create `packages/router/idempotency.py`:
   - `async def is_duplicate(tenant_id: str, message_id: str, redis: Redis) -> bool`: Checks Redis for `idempotency_key(tenant_id, message_id)`. If exists, return True (duplicate). Otherwise, set with 24-hour TTL and return False.
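The check-and-set can be a single atomic Redis command: SET with NX and a 24-hour EX, where losing the SET means the message was already seen. The sketch uses a dict-backed stand-in that models only that call (redis-py returns None when NX fails); the real code would use redis.asyncio and the shared `idempotency_key()` helper rather than the inline key format shown.

```python
import asyncio

async def is_duplicate(tenant_id: str, message_id: str, redis) -> bool:
    """Atomically claim the message ID; a failed SET NX means a duplicate."""
    key = f"idem:{tenant_id}:{message_id}"  # real code uses idempotency_key()
    claimed = await redis.set(key, "1", nx=True, ex=86_400)  # 24h TTL
    return not claimed

class _FakeRedis:
    """Minimal stand-in: models only SET NX semantics (TTL ignored)."""
    def __init__(self):
        self._data = {}
    async def set(self, key, value, nx=False, ex=None):
        if nx and key in self._data:
            return None  # redis-py returns None when NX prevents the write
        self._data[key] = value
        return True

r = _FakeRedis()
first = asyncio.run(is_duplicate("tenant-a", "1700000000.000100", r))
retry = asyncio.run(is_duplicate("tenant-a", "1700000000.000100", r))
```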

7. Create `packages/router/context.py`:
   - `async def load_agent_for_tenant(tenant_id: str, session: AsyncSession) -> Agent | None`: Loads the active agent for the tenant (Phase 1 = single agent per tenant). Sets `current_tenant_id` ContextVar before querying.

8. Update the Celery `handle_message` task in `packages/orchestrator/tasks.py` (or instruct Plan 02 SUMMARY reader to expect this):
   - After generating LLM response, use `slack_bolt` or `httpx` to call `chat.update` on the placeholder message (replace "Thinking..." with the real response). Use `placeholder_ts` and `channel_id` from the task payload.
   - Per user decision: Always reply in threads.
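The tail of the task can be sketched as below. `finish_reply` is a hypothetical helper name; the capture stub stands in for slack_sdk's WebClient, whose real `chat_update(channel=..., ts=..., text=...)` method has this shape. It is deliberately synchronous, matching the no-async-Celery-tasks rule.

```python
def finish_reply(client, payload: dict, response_text: str) -> None:
    """Replace the 'Thinking...' placeholder with the LLM response.

    Runs inside the synchronous Celery task; `client` is a slack_sdk
    WebClient in production, a capture stub here.
    """
    client.chat_update(
        channel=payload["channel_id"],
        ts=payload["placeholder_ts"],  # ts identifies the message to edit
        text=response_text,
    )

class _CaptureClient:
    """Records chat_update calls instead of hitting the Slack API."""
    def __init__(self):
        self.calls = []
    def chat_update(self, **kwargs):
        self.calls.append(kwargs)

client = _CaptureClient()
finish_reply(client, {"channel_id": "C456", "placeholder_ts": "123.456"},
             "Here's the answer.")
```

Because chat.update edits the placeholder in place, the reply stays exactly where the placeholder was posted, i.e. in the thread.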

9. Update `docker-compose.yml` to add `gateway` service on port 8001, depending on redis, postgres, celery-worker.

10. Create `__init__.py` files for gateway, gateway/channels, router packages.
Verify:

cd /home/adelorenzo/repos/konstruct && python -c "from packages.gateway.normalize import normalize_slack_event; from packages.router.tenant import resolve_tenant; from packages.router.ratelimit import check_rate_limit; print('Gateway + Router imports OK')"

Done when:
- Slack @mentions and DMs are handled by slack-bolt AsyncApp in HTTP mode
- Messages are normalized to KonstructMessage format
- Tenant resolution maps workspace_id to tenant_id
- Rate limiting enforces per-tenant per-channel limits with Redis token bucket
- Idempotency deduplication prevents double-processing of Slack retries
- Placeholder "Thinking..." message posted immediately, replaced with LLM response
- Auto-follow engaged threads with 30-minute idle timeout
- HTTP 200 returned within 3 seconds, all LLM work dispatched to Celery

Task 2: End-to-end integration tests for Slack flow, rate limiting, and agent persona

Files: tests/unit/test_ratelimit.py, tests/integration/test_slack_flow.py, tests/integration/test_agent_persona.py, tests/integration/test_ratelimit.py

1. Create `tests/unit/test_ratelimit.py` (CHAN-05 unit):
   - Test the token bucket allows requests under the limit (29 of 30)
   - Test the token bucket rejects the 31st request in a 1-minute window
   - Test that rate limit keys are namespaced per tenant (tenant_a's limit is independent of tenant_b's)
   - Test that the rate limit resets after the window expires
   - Use fakeredis or the real Redis from Docker Compose
2. Create `tests/integration/test_ratelimit.py` (CHAN-05 integration):
   - Test the full flow: send a Slack event that exceeds rate limit, verify that an ephemeral "too many requests" message is sent back via Slack API (mock Slack client)
   - Verify the event is NOT dispatched to Celery when rate-limited

3. Create `tests/integration/test_slack_flow.py` (CHAN-02):
   - Mock Slack client (no real Slack workspace needed)
   - Mock LLM pool response (no real LLM call needed)
   - Test full flow: Slack app_mention event -> normalize -> resolve tenant -> dispatch Celery -> LLM call -> chat.update with response
   - Verify the response is posted in-thread (thread_ts is set)
   - Verify placeholder "Thinking..." message is posted before Celery dispatch
   - Verify the placeholder is replaced with the real response
   - Test DM flow: message event with channel_type="im" triggers the same pipeline
   - Test that bot messages are ignored (no infinite loop)
   - Test that unknown workspace_id events are silently ignored

4. Create `tests/integration/test_agent_persona.py` (AGNT-01):
   - Create a tenant with an agent configured with name="Mara", role="Customer Support", persona="Professional and empathetic"
   - Mock the LLM pool to capture the messages array sent to /complete
   - Trigger a message through the pipeline
   - Verify the system prompt contains: "Your name is Mara", "Your role is Customer Support", "Professional and empathetic", and the AI transparency clause
   - Verify model_preference from the agent config is passed to the LLM pool
Verify:

cd /home/adelorenzo/repos/konstruct && pytest tests/unit/test_ratelimit.py tests/integration/test_slack_flow.py tests/integration/test_agent_persona.py tests/integration/test_ratelimit.py -x -q

Done when:
- Rate limiting unit tests verify token bucket behavior per tenant per channel
- Slack flow integration test proves end-to-end: event -> normalize -> tenant resolve -> Celery -> LLM -> thread reply
- Agent persona test proves the system prompt reflects name, role, persona, and the AI transparency clause
- Rate limit integration test proves over-limit requests get an informative rejection
- All tests pass with a mocked Slack client and a mocked LLM pool

Checks:
- `pytest tests/unit/test_ratelimit.py -x` verifies token bucket logic
- `pytest tests/integration/test_slack_flow.py -x` proves end-to-end Slack -> LLM -> reply
- `pytest tests/integration/test_agent_persona.py -x` proves persona is reflected in the system prompt
- `pytest tests/integration/test_ratelimit.py -x` proves rate limiting produces an informative rejection
- `grep -r "async def.*@app.task" packages/orchestrator/` returns NO matches (no async Celery tasks)

<success_criteria>

  • A Slack @mention or DM triggers an LLM response in the same thread (mocked end-to-end)
  • Rate-limited requests are rejected with informative message, not silently dropped
  • Agent persona (name, role, persona) is reflected in the LLM system prompt
  • Typing indicator (placeholder message) appears before LLM response
  • All tests green, no async Celery tasks
</success_criteria>
After completion, create `.planning/phases/01-foundation/01-03-SUMMARY.md`