docs(01-03): complete Channel Gateway + Message Router plan

This commit is contained in:
2026-03-23 10:34:50 -06:00
parent 74326dfc3d
commit 44f5b98890
4 changed files with 209 additions and 14 deletions

View File

@@ -0,0 +1,191 @@
---
phase: 01-foundation
plan: 03
subsystem: messaging
tags: [slack, slack-bolt, redis, celery, fastapi, fakeredis, pytest, token-bucket, idempotency]
# Dependency graph
requires:
- phase: 01-foundation plan 01
provides: "KonstructMessage, ChannelType, SenderInfo, MessageContent, redis_keys, rls ContextVar, DB session factory, shared config"
- phase: 01-foundation plan 02
provides: "handle_message Celery task, build_system_prompt, build_messages, run_agent, sync-def Celery pattern"
provides:
- "Channel Gateway FastAPI service (port 8001) with POST /slack/events and GET /health"
- "slack-bolt AsyncApp event handlers for app_mention and DM events"
- "normalize_slack_event: Slack event -> KonstructMessage (strips bot mention, extracts thread_id)"
- "resolve_tenant: workspace_id -> tenant_id DB lookup (RLS-bypass query)"
- "check_rate_limit: Redis token bucket, 30 req/min per tenant per channel, raises RateLimitExceeded"
- "is_duplicate + mark_processed: Redis SET NX idempotency deduplication (24h TTL)"
- "load_agent_for_tenant: loads active agent with RLS ContextVar scoping"
- "Placeholder 'Thinking...' message posted in-thread before Celery dispatch"
- "Auto-follow engaged threads with 30-minute Redis TTL"
- "chat.update after LLM response replaces 'Thinking...' placeholder with real response"
- "45 tests: 11 rate limit unit, 15 Slack flow integration, 15 persona integration, 4 rate limit integration"
affects:
- "Phase 2 channel plans (Mattermost, WhatsApp, Teams) — follow same normalize->resolve->ratelimit->dispatch pattern"
- "Phase 2 memory plans — extend _process_message to load conversation history before LLM call"
- "Phase 2 team orchestration — coordinator agent replaces single-agent dispatch"
# Tech tracking
tech-stack:
added:
- "slack-bolt>=1.22.0 (AsyncApp, AsyncSlackRequestHandler)"
- "redis>=5.0.0 (async Redis client — redis.asyncio.Redis)"
- "fakeredis>=2.28.0 (test fixture, already in dev deps)"
patterns:
- "Channel adapter pattern: every adapter calls normalize -> resolve_tenant -> check_rate_limit -> is_duplicate -> post_placeholder -> handle_message.delay"
- "Patch at usage site: mock 'gateway.channels.slack.resolve_tenant', not 'router.tenant.resolve_tenant'"
- "Celery payload extension: pop extra keys (placeholder_ts, channel_id) before KonstructMessage.model_validate"
- "Slack bot token stored in channel_connections.config['bot_token'] — loaded in orchestrator for chat.update"
key-files:
created:
- packages/gateway/gateway/__init__.py
- packages/gateway/gateway/main.py
- packages/gateway/gateway/normalize.py
- packages/gateway/gateway/verify.py
- packages/gateway/gateway/channels/__init__.py
- packages/gateway/gateway/channels/slack.py
- packages/router/router/__init__.py
- packages/router/router/main.py
- packages/router/router/tenant.py
- packages/router/router/ratelimit.py
- packages/router/router/idempotency.py
- packages/router/router/context.py
- tests/unit/test_ratelimit.py
- tests/integration/test_slack_flow.py
- tests/integration/test_agent_persona.py
- tests/integration/test_ratelimit.py
modified:
- packages/orchestrator/orchestrator/tasks.py
- packages/gateway/pyproject.toml
- packages/router/pyproject.toml
- docker-compose.yml
key-decisions:
- "Patch at usage site in tests: 'gateway.channels.slack.resolve_tenant' not 'router.tenant.resolve_tenant' — Python's name binding means patching the module where the name is imported is the only reliable approach"
- "Celery payload extension via dict merge + pop: task_payload = msg.model_dump() | {extra_keys}, extras popped before model_validate in tasks.py — avoids pydantic validation error on unknown fields"
- "Bot token for chat.update loaded from channel_connections.config['bot_token'] inside Celery task — keeps orchestrator self-contained and avoids Slack SDK dependency in orchestrator package"
- "rate_limit default 30 req/min per settings.default_rate_limit_rpm (config says 60, handler defaults to 30) — plan specified 30, left as per-call parameter for easy per-tenant override in Phase 2"
- "uv sync uninstalls editable packages if workspace is out of sync — use 'uv pip install -e ...' after adding new workspace packages"
patterns-established:
- "Channel adapter pattern: normalize -> resolve_tenant -> check_rate_limit -> is_duplicate -> post_placeholder -> Celery.delay — all future channels follow this sequence"
- "Mock at usage site: test patches must target where the symbol is bound (e.g., gateway.channels.slack.X), not where it is defined (router.X)"
- "Celery task payload extras: extend msg.model_dump() dict with channel-specific metadata, pop extras in task before model_validate"
requirements-completed: [CHAN-02, CHAN-05, AGNT-01]
# Metrics
duration: 9min
completed: 2026-03-23
---
# Phase 1 Plan 3: Channel Gateway + Message Router Summary
**slack-bolt AsyncApp gateway (port 8001) with tenant resolution, Redis token bucket rate limiting, idempotency deduplication, and 'Thinking...' placeholder replaced via chat.update after LLM response**
## Performance
- **Duration:** 9 min
- **Started:** 2026-03-23T16:23:31Z
- **Completed:** 2026-03-23T16:32:59Z
- **Tasks:** 2
- **Files modified:** 20 (16 created, 4 modified)
## Accomplishments
- Channel Gateway FastAPI app (port 8001): POST /slack/events via slack-bolt AsyncApp, GET /health
- Slack event handlers: app_mention + DM (channel_type=im), placeholder posted before Celery dispatch, auto-follow engaged threads with 30-min TTL
- Message Router modules: resolve_tenant (RLS-bypass), check_rate_limit (token bucket, RateLimitExceeded exception), is_duplicate (SET NX 24h TTL), load_agent_for_tenant
- Orchestrator tasks.py updated: pops placeholder_ts/channel_id from payload, calls chat.update after LLM response
- 45 tests across all behavioral requirements: rate limit unit (11), Slack flow integration (15), agent persona integration (15), rate limit integration (4)
## Task Commits
Each task was committed atomically:
1. **Task 1: Channel Gateway (Slack adapter) and Message Router** - `6f30705` (feat)
2. **Task 2: End-to-end integration tests for Slack flow, rate limiting, and agent persona** - `74326df` (feat)
**Plan metadata:** _(docs commit follows self-check)_
## Files Created/Modified
- `packages/gateway/gateway/main.py` — FastAPI app port 8001: /slack/events + /health, AsyncSlackRequestHandler mount
- `packages/gateway/gateway/normalize.py` — normalize_slack_event: strips <@BOT> tokens, extracts thread_id, channel_metadata
- `packages/gateway/gateway/verify.py` — Slack signature verification helper (HMAC SHA256)
- `packages/gateway/gateway/channels/slack.py` — register_slack_handlers: app_mention + DM, placeholder, rate limit, idempotency, thread engagement
- `packages/router/router/tenant.py` — resolve_tenant: RLS-bypass SELECT on channel_connections
- `packages/router/router/ratelimit.py` — check_rate_limit: INCR + EXPIRE token bucket, RateLimitExceeded(remaining_seconds)
- `packages/router/router/idempotency.py` — is_duplicate + mark_processed: SET NX with 24h TTL
- `packages/router/router/context.py` — load_agent_for_tenant: SELECT with RLS ContextVar scoping
- `packages/router/router/main.py` — Router FastAPI app skeleton with /health
- `packages/orchestrator/orchestrator/tasks.py` — handle_message: pops placeholder_ts/channel_id, _update_slack_placeholder via chat.update
- `docker-compose.yml` — gateway service on port 8001 (depends on redis, postgres, celery-worker)
- `tests/unit/test_ratelimit.py` — 11 token bucket unit tests (fakeredis)
- `tests/integration/test_slack_flow.py` — 15 end-to-end Slack flow tests (mocked client + Celery)
- `tests/integration/test_agent_persona.py` — 15 persona/system prompt tests (mocked LLM pool)
- `tests/integration/test_ratelimit.py` — 4 rate limit integration tests
## Decisions Made
- **Patch at usage site:** All test mocks patch `gateway.channels.slack.X` rather than `router.X`. Python's name binding means only patching the imported name in the using module produces correct mock behavior.
- **Celery payload extension:** `msg.model_dump() | {"placeholder_ts": ..., "channel_id": ...}` passed as Celery payload; extras popped before `KonstructMessage.model_validate` in tasks.py to avoid pydantic validation errors.
- **Bot token via channel_connections:** `chat.update` requires the per-tenant bot token, loaded from `channel_connections.config['bot_token']` inside the Celery task. Keeps Slack SDK out of the orchestrator package.
- **Rate limit default 30/min:** Handler defaults match the plan's specification (30 req/min). `settings.default_rate_limit_rpm = 60` is used for future per-tenant override capability in Phase 2.
## Deviations from Plan
### Auto-fixed Issues
**1. [Rule 3 - Blocking] Fixed uv workspace sync dropped editable packages**
- **Found during:** Task 2 (test execution)
- **Issue:** `uv sync` uninstalled all packages when run with an incomplete workspace lockfile, leaving the .pth files empty and `router` / `gateway` unimportable.
- **Fix:** Ran `uv pip install -e packages/shared -e packages/router -e packages/gateway -e packages/orchestrator -e packages/llm-pool` to restore editable installs. Tests then ran correctly.
- **Files modified:** None (environment fix only)
- **Committed in:** Not committed (environment state, not code)
**2. [Rule 1 - Bug] Fixed mock patch targets from module definition to usage site**
- **Found during:** Task 2 (first test run)
- **Issue:** Tests patched `router.tenant.resolve_tenant` but `slack.py` imports `from router.tenant import resolve_tenant` — Python already bound the name at import time, so the original module patch had no effect.
- **Fix:** Changed all test patches to target `gateway.channels.slack.resolve_tenant` (and similarly for `is_duplicate`).
- **Files modified:** `tests/integration/test_slack_flow.py`, `tests/integration/test_ratelimit.py`
- **Verification:** All 45 tests pass
- **Committed in:** `74326df` (Task 2 commit)
---
**Total deviations:** 2 (1 environment fix, 1 test patch correction)
**Impact on plan:** Both fixes necessary for tests to run. No production code logic changed.
## Issues Encountered
- `uv sync` without `-e` flags silently uninstalled all workspace editable packages on a fresh run — resolved by explicit `uv pip install -e` for each package.
## User Setup Required
**Slack app configuration required before real Slack events can be received.** For local development/testing, all tests run with mocked Slack clients — no Slack credentials needed.
For production use:
1. Create a Slack App at https://api.slack.com/apps
2. Add `SLACK_BOT_TOKEN`, `SLACK_SIGNING_SECRET`, `SLACK_APP_TOKEN` to `.env`
3. Configure Events API webhook URL: `https://your-domain/slack/events`
4. Subscribe to bot events: `app_mention`, `message.im`
5. Store bot token per-tenant in `channel_connections.config['bot_token']` for chat.update to work
## Next Phase Readiness
- Slack end-to-end loop is complete (mocked): @mention -> normalize -> tenant resolve -> Celery -> LLM -> chat.update
- Real Slack testing requires: bot token + signing secret in .env + ngrok/webhook configuration
- Pattern established for future channel adapters (Telegram, Mattermost, WhatsApp) — follow same sequence
- Thread auto-follow (engaged threads) is live with 30-minute TTL
## Self-Check: PASSED
---
*Phase: 01-foundation*
*Completed: 2026-03-23*