---
phase: 06-web-chat
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
- packages/shared/shared/models/message.py
- packages/shared/shared/redis_keys.py
- packages/shared/shared/models/chat.py
- packages/shared/shared/api/chat.py
- packages/shared/shared/api/__init__.py
- packages/gateway/gateway/channels/web.py
- packages/gateway/gateway/main.py
- packages/orchestrator/orchestrator/tasks.py
- migrations/versions/008_web_chat.py
- tests/unit/test_web_channel.py
- tests/unit/test_chat_api.py
autonomous: true
requirements:
- CHAT-01
- CHAT-02
- CHAT-03
- CHAT-04
- CHAT-05
must_haves:
  truths:
    - "Web channel messages normalize into valid KonstructMessage with channel='web'"
    - "Celery _send_response publishes web channel responses to Redis pub-sub"
    - "WebSocket endpoint accepts connections and dispatches messages to Celery pipeline"
    - "Typing indicator event is sent immediately after receiving a user message"
    - "Chat REST API enforces RBAC — non-members get 403"
    - "Platform admin can access conversations for any tenant"
    - "Conversation history persists in DB and is loadable via REST"
  artifacts:
    - path: "packages/shared/shared/models/chat.py"
      provides: "WebConversation and WebConversationMessage ORM models"
      contains: "class WebConversation"
    - path: "packages/gateway/gateway/channels/web.py"
      provides: "WebSocket endpoint and web channel normalizer"
      contains: "async def chat_websocket"
    - path: "packages/shared/shared/api/chat.py"
      provides: "REST API for conversation CRUD"
      exports: ["chat_router"]
    - path: "migrations/versions/008_web_chat.py"
      provides: "DB migration for web_conversations and web_conversation_messages tables"
      contains: "web_conversations"
    - path: "tests/unit/test_web_channel.py"
      provides: "Unit tests for web channel adapter"
      contains: "test_normalize_web_event"
    - path: "tests/unit/test_chat_api.py"
      provides: "Unit tests for chat REST API with RBAC"
      contains: "test_chat_rbac_enforcement"
  key_links:
    - from: "packages/gateway/gateway/channels/web.py"
      to: "packages/orchestrator/orchestrator/tasks.py"
      via: "handle_message.delay() Celery dispatch"
      pattern: "handle_message\\.delay"
    - from: "packages/orchestrator/orchestrator/tasks.py"
      to: "packages/shared/shared/redis_keys.py"
      via: "Redis pub-sub publish for web channel"
      pattern: "webchat_response_key"
    - from: "packages/gateway/gateway/channels/web.py"
      to: "packages/shared/shared/redis_keys.py"
      via: "Redis pub-sub subscribe for response delivery"
      pattern: "webchat_response_key"
    - from: "packages/shared/shared/api/chat.py"
      to: "packages/shared/shared/api/rbac.py"
      via: "require_tenant_member RBAC guard"
      pattern: "require_tenant_member"
user_setup: []
---
Build the complete backend infrastructure for web chat: DB schema, ORM models, web channel adapter with WebSocket endpoint, Redis pub-sub response bridge, chat REST API with RBAC, and orchestrator integration. After this plan, the portal can send messages via WebSocket and receive responses through the full agent pipeline.
Purpose: Enables the portal to use the same agent pipeline as Slack/WhatsApp via a new "web" channel — the foundational plumbing that the frontend chat UI (Plan 02) connects to.
Output: Working WebSocket endpoint, conversation persistence, RBAC-enforced REST API, and unit tests.
@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/06-web-chat/06-CONTEXT.md
@.planning/phases/06-web-chat/06-RESEARCH.md
From packages/shared/shared/models/message.py:
```python
class ChannelType(StrEnum):
    SLACK = "slack"
    WHATSAPP = "whatsapp"
    MATTERMOST = "mattermost"
    ROCKETCHAT = "rocketchat"
    TEAMS = "teams"
    TELEGRAM = "telegram"
    SIGNAL = "signal"
    # WEB = "web" <-- ADD THIS


class KonstructMessage(BaseModel):
    id: str
    tenant_id: str | None
    channel: ChannelType
    channel_metadata: dict[str, Any]
    sender: SenderInfo
    content: MessageContent
    timestamp: datetime
    thread_id: str | None
    reply_to: str | None
    context: dict[str, Any]
```
From packages/shared/shared/redis_keys.py:
```python
# All keys follow: {tenant_id}:{key_type}:{discriminator}
def memory_short_key(tenant_id: str, agent_id: str, user_id: str) -> str
def escalation_status_key(tenant_id: str, thread_id: str) -> str
# ADD: webchat_response_key(tenant_id, conversation_id)
```
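A minimal sketch of the helper flagged by the ADD comment, using the key format that Task 1 step 2 spells out. The `str` parameter types are an assumption carried over from the signatures of the existing helpers.

```python
def webchat_response_key(tenant_id: str, conversation_id: str) -> str:
    """Redis pub-sub channel name for one web conversation's responses.

    Follows the {tenant_id}:{key_type}:{discriminator} convention noted above.
    """
    return f"{tenant_id}:webchat:response:{conversation_id}"
```

Because both the publisher in tasks.py and the subscriber in web.py call the same function, the channel name cannot drift between the two sides.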
From packages/shared/shared/api/rbac.py:
```python
@dataclass
class PortalCaller:
    user_id: uuid.UUID
    role: str
    tenant_id: uuid.UUID | None = None

async def get_portal_caller(...) -> PortalCaller
async def require_tenant_member(tenant_id: UUID, caller: PortalCaller, session: AsyncSession) -> None
async def require_tenant_admin(tenant_id: UUID, caller: PortalCaller, session: AsyncSession) -> None
```
From packages/orchestrator/orchestrator/tasks.py:
```python
# handle_message pops extras before model_validate:
# placeholder_ts, channel_id, phone_number_id, bot_token
# ADD: conversation_id, portal_user_id, tenant_id (for web)
# _send_response routes by channel_str:
# "slack" -> _update_slack_placeholder
# "whatsapp" -> send_whatsapp_message
# ADD: "web" -> Redis pub-sub publish
# _build_response_extras builds channel-specific extras dict
# ADD: "web" case returning conversation_id + tenant_id
```
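The ADD notes above could be pictured roughly as follows. This is a sketch with plain dicts, not the real task code: `split_web_extras` and `build_web_response_extras` are hypothetical names, and reading `tenant_id` without removing it is an assumption made because KonstructMessage also declares a `tenant_id` field.

```python
from typing import Any

# Web-only keys that travel alongside the serialized message (from the ADD notes).
WEB_EXTRA_KEYS = ("conversation_id", "portal_user_id")


def split_web_extras(payload: dict[str, Any]) -> dict[str, Any]:
    """Pop web-only keys from payload (in place) so model validation sees only model fields."""
    extras = {key: payload.pop(key) for key in WEB_EXTRA_KEYS if key in payload}
    # Assumption: tenant_id is copied, not popped, because the model needs it too.
    extras["tenant_id"] = payload.get("tenant_id")
    return extras


def build_web_response_extras(extras: dict[str, Any]) -> dict[str, Any]:
    """The 'web' case: only what the Redis publish step needs."""
    return {
        "conversation_id": extras.get("conversation_id"),
        "tenant_id": extras.get("tenant_id"),
    }
```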
From packages/shared/shared/api/__init__.py:
```python
# Current routers mounted on gateway:
# portal_router, billing_router, channels_router, llm_keys_router,
# usage_router, webhook_router, invitations_router, templates_router
# ADD: chat_router
```
From packages/gateway/gateway/main.py:
```python
# CORS allows: localhost:3000, 127.0.0.1:3000, 100.64.0.10:3000
# Browsers do not enforce CORS on WebSocket handshakes; to apply the same origin rules, the endpoint has to check the Origin header itself
# Include chat_router and WebSocket router here
```
Task 1: Backend models, migration, channel type, Redis key, and unit tests
packages/shared/shared/models/message.py,
packages/shared/shared/redis_keys.py,
packages/shared/shared/models/chat.py,
migrations/versions/008_web_chat.py,
tests/unit/test_web_channel.py,
tests/unit/test_chat_api.py
- test_normalize_web_event: normalize_web_event({text, tenant_id, agent_id, user_id, conversation_id}) -> KonstructMessage with channel=WEB, thread_id=conversation_id, sender.user_id=user_id (the portal user's ID)
- test_send_response_web_publishes_to_redis: _send_response("web", "hello", {conversation_id, tenant_id}) publishes JSON to Redis channel matching webchat_response_key(tenant_id, conversation_id)
- test_typing_indicator_sent: WebSocket handler sends {"type": "typing"} immediately after receiving user message, before Celery dispatch
- test_chat_rbac_enforcement: GET /api/portal/chat/conversations?tenant_id=X returns 403 when caller is not a member of tenant X
- test_platform_admin_cross_tenant: GET /api/portal/chat/conversations?tenant_id=X returns 200 when caller is platform_admin (bypasses membership)
- test_list_conversation_history: GET /api/portal/chat/conversations/{id}/messages returns paginated messages ordered by created_at
- test_create_conversation: POST /api/portal/chat/conversations with {tenant_id, agent_id} creates or returns existing conversation for user+agent pair
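The first behavior in the list (test_normalize_web_event) can be illustrated with a stand-in type. This is only a sketch: `StandInMessage` is a hypothetical dataclass that flattens the fields the test checks, whereas the real normalizer would return the Pydantic KonstructMessage with nested sender and content objects.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class StandInMessage:
    """Hypothetical stand-in for KonstructMessage (flattened sender/content)."""
    id: str
    tenant_id: Optional[str]
    channel: str
    sender_user_id: str
    text: str
    thread_id: Optional[str]
    channel_metadata: dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def normalize_web_event(event: dict[str, Any]) -> StandInMessage:
    """Map a portal chat event onto the shared message shape."""
    return StandInMessage(
        id=str(uuid.uuid4()),
        tenant_id=event["tenant_id"],
        channel="web",
        sender_user_id=event["user_id"],
        text=event["text"],
        # thread_id carries the conversation ID so memory is scoped per conversation.
        thread_id=event["conversation_id"],
        channel_metadata={
            "portal_user_id": event["user_id"],
            "tenant_id": event["tenant_id"],
            "conversation_id": event["conversation_id"],
        },
    )
```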
1. Add WEB = "web" to ChannelType in packages/shared/shared/models/message.py
2. Add webchat_response_key(tenant_id, conversation_id) to packages/shared/shared/redis_keys.py following existing pattern: return f"{tenant_id}:webchat:response:{conversation_id}"
3. Create packages/shared/shared/models/chat.py with ORM models:
- WebConversation: id (UUID PK), tenant_id (UUID, FK tenants.id), agent_id (UUID, FK agents.id), user_id (UUID, FK portal_users.id), created_at, updated_at. UniqueConstraint on (tenant_id, agent_id, user_id). RLS via tenant_id.
- WebConversationMessage: id (UUID PK), conversation_id (UUID, FK web_conversations.id ON DELETE CASCADE), tenant_id (UUID), role (TEXT, CHECK "user"/"assistant"), content (TEXT), created_at. RLS via tenant_id.
Use mapped_column() + Mapped[] (SQLAlchemy 2.0 pattern, not Column()).
4. Create migration 008_web_chat.py:
- Create web_conversations table with columns matching ORM model
- Create web_conversation_messages table with FK to web_conversations
- Enable RLS on both tables (FORCE ROW LEVEL SECURITY)
- Create RLS policies matching existing pattern (current_setting('app.current_tenant')::uuid)
- Replace the CHECK constraint on channel_connections.channel_type so that it includes 'web' (see Pitfall 5 in RESEARCH.md: the existing CHECK must be dropped and re-created, it cannot simply be extended in place)
- Add index on web_conversation_messages(conversation_id, created_at)
5. Write test files FIRST (RED phase):
- tests/unit/test_web_channel.py: test normalize_web_event, test _send_response web publishes to Redis (mock aioredis), test typing indicator
- tests/unit/test_chat_api.py: test RBAC enforcement (403 for non-member), platform admin cross-tenant (200), list history (paginated), create conversation (get-or-create)
Use httpx AsyncClient with app fixture pattern from existing tests. Mock DB sessions and Redis.
IMPORTANT: Celery tasks MUST be sync def with asyncio.run() — never async def (hard architectural constraint).
IMPORTANT: Use TEXT+CHECK for role column (not sa.Enum) per Phase 1 convention.
IMPORTANT: _send_response "web" case must use try/finally around aioredis.from_url() to avoid connection leaks (Pitfall 2 from RESEARCH.md).
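The sync-task constraint above can be sketched as a plain function that drives a coroutine with asyncio.run(). The Celery decorator is left out so the block runs with only the standard library, and `_process_message` is a hypothetical placeholder for the async pipeline body.

```python
import asyncio
from typing import Any


async def _process_message(payload: dict[str, Any]) -> str:
    """Hypothetical async body; the real one would run the agent pipeline."""
    await asyncio.sleep(0)  # placeholder for awaited I/O (DB, Redis, LLM calls)
    return f"processed:{payload['id']}"


def handle_message(payload: dict[str, Any]) -> str:
    """Sync entry point: the shape a Celery task must have under this constraint."""
    # asyncio.run creates a fresh event loop, runs the coroutine, then closes the loop.
    return asyncio.run(_process_message(payload))
```

Keeping the entry point synchronous means each task invocation owns its event loop for the duration of the call, which is also why Redis clients created inside it need explicit cleanup.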
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x -v
ChannelType.WEB exists. webchat_response_key function exists. ORM models define web_conversations and web_conversation_messages. Migration 008 creates both tables with RLS and updates channel_type CHECK constraint. All test assertions pass (RED then GREEN).
Task 2: WebSocket endpoint, web channel adapter, REST API, orchestrator wiring
packages/gateway/gateway/channels/web.py,
packages/shared/shared/api/chat.py,
packages/shared/shared/api/__init__.py,
packages/gateway/gateway/main.py,
packages/orchestrator/orchestrator/tasks.py
1. Create packages/gateway/gateway/channels/web.py with:
a. normalize_web_event() function: takes dict with {text, tenant_id, agent_id, user_id, display_name, conversation_id} and returns KonstructMessage with channel=ChannelType.WEB, thread_id=conversation_id, sender.user_id=user_id (portal user UUID string), channel_metadata={portal_user_id, tenant_id, conversation_id}
b. WebSocket endpoint at /chat/ws/{conversation_id}:
- Accept connection
- Wait for first JSON message with type="auth" containing {userId, role, tenantId} (browser cannot send custom headers — Pitfall 1 from RESEARCH.md)
- Validate auth: userId must be non-empty UUID string, role must be valid
- For each subsequent message (type="message"):
* Immediately send {"type": "typing"} back to client (CHAT-05)
* Normalize message to KonstructMessage via normalize_web_event
* Save user message to web_conversation_messages table
* Build extras dict: conversation_id, portal_user_id, tenant_id
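* Subscribe to Redis pub-sub channel webchat_response_key(tenant_id, conversation_id) before dispatching (pub-sub does not buffer, so a response published before the subscription exists would be lost)
* Dispatch handle_message.delay(msg.model_dump() | extras)
* Wait up to 60s for the response on the subscribed channel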
* When response arrives: save assistant message to web_conversation_messages, send {"type": "response", "text": ..., "conversation_id": ...} to WebSocket
- On disconnect: unsubscribe and close Redis connections
c. Create an APIRouter with the WebSocket route for mounting
2. Create packages/shared/shared/api/chat.py with REST endpoints:
a. GET /api/portal/chat/conversations?tenant_id={id} — list conversations for the authenticated user within a tenant. For platform_admin: returns conversations across all tenants if no tenant_id. Uses require_tenant_member for RBAC. Returns [{id, agent_id, agent_name, updated_at, last_message_preview}] sorted by updated_at DESC.
b. GET /api/portal/chat/conversations/{id}/messages?limit=50&before={cursor} — paginated message history. Verify caller owns the conversation (same user_id) OR is platform_admin. Returns [{id, role, content, created_at}] ordered by created_at ASC.
c. POST /api/portal/chat/conversations — get-or-create the conversation for the caller and agent. Body: {tenant_id, agent_id}. Uses require_tenant_member. Returns conversation object with id.
d. DELETE /api/portal/chat/conversations/{id} — reset conversation (delete messages, keep row). Updates updated_at. Verify ownership.
All endpoints use Depends(get_portal_caller) and Depends(get_session). Set RLS context var (configure_rls_hook + current_tenant_id.set) before DB queries.
3. Update packages/shared/shared/api/__init__.py: add chat_router to imports and __all__
4. Update packages/gateway/gateway/main.py:
- Import chat_router from shared.api and web channel router from gateway.channels.web
- app.include_router(chat_router) for REST endpoints
- app.include_router(web_chat_router) for WebSocket endpoint
- Add comment block "Phase 6 Web Chat routers"
5. Update packages/orchestrator/orchestrator/tasks.py:
a. In handle_message: pop "conversation_id" and "portal_user_id" before model_validate (same pattern as placeholder_ts, channel_id). Add to extras dict.
b. In _build_response_extras: add "web" case returning {"conversation_id": extras.get("conversation_id"), "tenant_id": extras.get("tenant_id")}. Note: tenant_id for web comes from extras, not from channel_metadata like Slack.
c. In _send_response: add "web" case that publishes to Redis pub-sub:
```python
elif channel_str == "web":
    conversation_id = extras.get("conversation_id", "")
    tenant_id = extras.get("tenant_id", "")
    if not conversation_id or not tenant_id:
        logger.warning("_send_response: web channel missing conversation_id or tenant_id")
        return
    response_channel = webchat_response_key(tenant_id, conversation_id)
    publish_redis = aioredis.from_url(settings.redis_url)
    try:
        await publish_redis.publish(response_channel, json.dumps({
            "type": "response", "text": text, "conversation_id": conversation_id,
        }))
    finally:
        await publish_redis.aclose()
```
d. Import webchat_response_key from shared.redis_keys at module level (matches existing import pattern for other keys)
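The auth handshake in step 1b could be validated along these lines. This is a sketch under assumptions: `parse_auth_message` is a hypothetical helper, and the set of valid roles is passed in because the plan names only platform_admin and does not list the others.

```python
import uuid
from typing import Any, Optional


def parse_auth_message(raw: Any, valid_roles: frozenset[str]) -> Optional[dict[str, Optional[str]]]:
    """Return validated auth fields, or None if the first frame is not acceptable."""
    if not isinstance(raw, dict) or raw.get("type") != "auth":
        return None
    user_id = raw.get("userId")
    role = raw.get("role")
    if not isinstance(user_id, str) or not isinstance(role, str):
        return None
    if role not in valid_roles:
        return None
    try:
        uuid.UUID(user_id)  # rejects empty and malformed strings
    except ValueError:
        return None
    return {"user_id": user_id, "role": role, "tenant_id": raw.get("tenantId")}
```

A handler would typically close the socket when this returns None, before processing any type="message" frame.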
IMPORTANT: WebSocket auth via JSON message after connection (NOT URL params or headers — browser limitation).
IMPORTANT: Redis pub-sub subscribe in WebSocket handler must use try/finally for cleanup (Pitfall 2).
IMPORTANT: The web normalizer must set thread_id = conversation_id (Pitfall 3 — conversation ID scopes memory correctly).
IMPORTANT: For DB access in WebSocket handler, use configure_rls_hook + current_tenant_id context var per existing pattern.
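The timeout and cleanup requirements for the response wait can be sketched without Redis. `FakeSubscription` is an in-memory stand-in invented for this example (it is not the redis client API); the point is the shape of the try/finally around a bounded wait.

```python
import asyncio
import json
from typing import Optional


class FakeSubscription:
    """In-memory stand-in for a pub-sub subscription; not the redis API."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.closed = False

    async def next_message(self) -> str:
        return await self.queue.get()

    async def close(self) -> None:
        self.closed = True


async def wait_for_response(sub: FakeSubscription, timeout: float = 60.0) -> Optional[dict]:
    """Wait for one response frame; always release the subscription."""
    try:
        raw = await asyncio.wait_for(sub.next_message(), timeout=timeout)
        return json.loads(raw)
    except asyncio.TimeoutError:
        return None  # caller decides how to report the timeout to the client
    finally:
        await sub.close()  # runs on success, timeout, and disconnect alike
```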
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x -v
WebSocket endpoint at /chat/ws/{conversation_id} accepts connections, authenticates via JSON message, dispatches to Celery, subscribes to Redis for response. REST API provides conversation CRUD with RBAC. Orchestrator _send_response handles "web" channel via Redis pub-sub publish. All unit tests pass. Gateway mounts both routers.
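For the message-history endpoint, the limit and before parameters might behave as in the sketch below. It works on an in-memory list instead of a DB query, `page_messages` is a hypothetical name, and treating the cursor as a created_at timestamp is an assumption, since the plan does not define the cursor format.

```python
from datetime import datetime
from typing import Any, Optional


def page_messages(
    messages: list[dict[str, Any]],
    limit: int = 50,
    before: Optional[datetime] = None,
) -> list[dict[str, Any]]:
    """Return up to `limit` messages older than `before`, ordered oldest first."""
    if limit <= 0:
        return []
    older = [m for m in messages if before is None or m["created_at"] < before]
    older.sort(key=lambda m: m["created_at"])
    # Keep the newest `limit` entries of the older set, still in ascending order.
    return older[-limit:]
```

A client would load the latest page first, then pass the created_at of the oldest message it holds as the next before value to scroll back.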
1. All unit tests pass: `pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x`
2. Migration 008 applies cleanly: `cd /home/adelorenzo/repos/konstruct && alembic upgrade head`
3. Gateway starts without errors: `cd /home/adelorenzo/repos/konstruct/packages/gateway && python -c "from gateway.main import app; print('OK')"`
4. Full test suite still green: `pytest tests/unit -x`
- ChannelType includes WEB
- WebSocket endpoint exists at /chat/ws/{conversation_id}
- REST API at /api/portal/chat/* provides conversation CRUD with RBAC
- _send_response in tasks.py handles "web" channel via Redis pub-sub
- web_conversations and web_conversation_messages tables created with RLS
- All 7+ unit tests pass covering CHAT-01 through CHAT-05