---
phase: 06-web-chat
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
  - packages/shared/shared/models/message.py
  - packages/shared/shared/redis_keys.py
  - packages/shared/shared/models/chat.py
  - packages/shared/shared/api/chat.py
  - packages/shared/shared/api/__init__.py
  - packages/gateway/gateway/channels/web.py
  - packages/gateway/gateway/main.py
  - packages/orchestrator/orchestrator/tasks.py
  - migrations/versions/008_web_chat.py
  - tests/unit/test_web_channel.py
  - tests/unit/test_chat_api.py
autonomous: true
requirements:
  - CHAT-01
  - CHAT-02
  - CHAT-03
  - CHAT-04
  - CHAT-05

must_haves:
  truths:
    - "Web channel messages normalize into valid KonstructMessage with channel='web'"
    - "Celery _send_response publishes web channel responses to Redis pub-sub"
    - "WebSocket endpoint accepts connections and dispatches messages to Celery pipeline"
    - "Typing indicator event is sent immediately after receiving a user message"
    - "Chat REST API enforces RBAC — non-members get 403"
    - "Platform admin can access conversations for any tenant"
    - "Conversation history persists in DB and is loadable via REST"
  artifacts:
    - path: "packages/shared/shared/models/chat.py"
      provides: "WebConversation and WebConversationMessage ORM models"
      contains: "class WebConversation"
    - path: "packages/gateway/gateway/channels/web.py"
      provides: "WebSocket endpoint and web channel normalizer"
      contains: "async def chat_websocket"
    - path: "packages/shared/shared/api/chat.py"
      provides: "REST API for conversation CRUD"
      exports: ["chat_router"]
    - path: "migrations/versions/008_web_chat.py"
      provides: "DB migration for web_conversations and web_conversation_messages tables"
      contains: "web_conversations"
    - path: "tests/unit/test_web_channel.py"
      provides: "Unit tests for web channel adapter"
      contains: "test_normalize_web_event"
    - path: "tests/unit/test_chat_api.py"
      provides: "Unit tests for chat REST API with RBAC"
      contains: "test_chat_rbac_enforcement"
  key_links:
    - from: "packages/gateway/gateway/channels/web.py"
      to: "packages/orchestrator/orchestrator/tasks.py"
      via: "handle_message.delay() Celery dispatch"
      pattern: "handle_message\\.delay"
    - from: "packages/orchestrator/orchestrator/tasks.py"
      to: "packages/shared/shared/redis_keys.py"
      via: "Redis pub-sub publish for web channel"
      pattern: "webchat_response_key"
    - from: "packages/gateway/gateway/channels/web.py"
      to: "packages/shared/shared/redis_keys.py"
      via: "Redis pub-sub subscribe for response delivery"
      pattern: "webchat_response_key"
    - from: "packages/shared/shared/api/chat.py"
      to: "packages/shared/shared/api/rbac.py"
      via: "require_tenant_member RBAC guard"
      pattern: "require_tenant_member"

user_setup: []
---

<objective>
Build the complete backend infrastructure for web chat: DB schema, ORM models, web channel adapter with WebSocket endpoint, Redis pub-sub response bridge, chat REST API with RBAC, and orchestrator integration. After this plan, the portal can send messages via WebSocket and receive responses through the full agent pipeline.

Purpose: Enables the portal to use the same agent pipeline as Slack/WhatsApp via a new "web" channel. This is the foundational plumbing that the frontend chat UI (Plan 02) connects to.
Output: Working WebSocket endpoint, conversation persistence, RBAC-enforced REST API, and unit tests.
</objective>

<execution_context>
@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
</execution_context>

<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/06-web-chat/06-CONTEXT.md
@.planning/phases/06-web-chat/06-RESEARCH.md

<interfaces>
<!-- Key types and contracts the executor needs. Extracted from codebase. -->

From packages/shared/shared/models/message.py:
```python
class ChannelType(StrEnum):
    SLACK = "slack"
    WHATSAPP = "whatsapp"
    MATTERMOST = "mattermost"
    ROCKETCHAT = "rocketchat"
    TEAMS = "teams"
    TELEGRAM = "telegram"
    SIGNAL = "signal"
    # WEB = "web"  <-- ADD THIS

class KonstructMessage(BaseModel):
    id: str
    tenant_id: str | None
    channel: ChannelType
    channel_metadata: dict[str, Any]
    sender: SenderInfo
    content: MessageContent
    timestamp: datetime
    thread_id: str | None
    reply_to: str | None
    context: dict[str, Any]
```

From packages/shared/shared/redis_keys.py:
```python
# All keys follow: {tenant_id}:{key_type}:{discriminator}
def memory_short_key(tenant_id: str, agent_id: str, user_id: str) -> str
def escalation_status_key(tenant_id: str, thread_id: str) -> str
# ADD: webchat_response_key(tenant_id, conversation_id)
```
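
A minimal sketch of the key helper to add, assuming both arguments are passed as strings like the existing helpers above (the `str` type hints are an assumption; the key format is the one given in Task 1, step 2):

```python
def webchat_response_key(tenant_id: str, conversation_id: str) -> str:
    """Redis pub-sub channel name for responses to one web conversation."""
    # Tenant ID comes first so the key stays namespaced per tenant.
    return f"{tenant_id}:webchat:response:{conversation_id}"
```

Both the orchestrator (publish side) and the gateway (subscribe side) would call this same helper, so the channel name cannot drift between the two.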

From packages/shared/shared/api/rbac.py:
```python
@dataclass
class PortalCaller:
    user_id: uuid.UUID
    role: str
    tenant_id: uuid.UUID | None = None

async def get_portal_caller(...) -> PortalCaller
async def require_tenant_member(tenant_id: UUID, caller: PortalCaller, session: AsyncSession) -> None
async def require_tenant_admin(tenant_id: UUID, caller: PortalCaller, session: AsyncSession) -> None
```

From packages/orchestrator/orchestrator/tasks.py:
```python
# handle_message pops extras before model_validate:
#   placeholder_ts, channel_id, phone_number_id, bot_token
# ADD: conversation_id, portal_user_id, tenant_id (for web)

# _send_response routes by channel_str:
#   "slack" -> _update_slack_placeholder
#   "whatsapp" -> send_whatsapp_message
# ADD: "web" -> Redis pub-sub publish

# _build_response_extras builds channel-specific extras dict
# ADD: "web" case returning conversation_id + tenant_id
```

From packages/shared/shared/api/__init__.py:
```python
# Current routers mounted on gateway:
#   portal_router, billing_router, channels_router, llm_keys_router,
#   usage_router, webhook_router, invitations_router, templates_router
# ADD: chat_router
```

From packages/gateway/gateway/main.py:
```python
# CORS allows: localhost:3000, 127.0.0.1:3000, 100.64.0.10:3000
# WebSocket handshakes are not subject to CORS (browsers don't enforce it), but the same allowed-origin rules still apply
# Include chat_router and WebSocket router here
```
</interfaces>
</context>

<tasks>

<task type="auto" tdd="true">
<name>Task 1: Backend models, migration, channel type, Redis key, and unit tests</name>
<files>
packages/shared/shared/models/message.py,
packages/shared/shared/redis_keys.py,
packages/shared/shared/models/chat.py,
migrations/versions/008_web_chat.py,
tests/unit/test_web_channel.py,
tests/unit/test_chat_api.py
</files>
<behavior>
- test_normalize_web_event: normalize_web_event({text, tenant_id, agent_id, user_id, conversation_id}) -> KonstructMessage with channel=WEB, thread_id=conversation_id, sender.user_id=portal_user_id
- test_send_response_web_publishes_to_redis: _send_response("web", "hello", {conversation_id, tenant_id}) publishes JSON to Redis channel matching webchat_response_key(tenant_id, conversation_id)
- test_typing_indicator_sent: WebSocket handler sends {"type": "typing"} immediately after receiving user message, before Celery dispatch
- test_chat_rbac_enforcement: GET /api/portal/chat/conversations?tenant_id=X returns 403 when caller is not a member of tenant X
- test_platform_admin_cross_tenant: GET /api/portal/chat/conversations?tenant_id=X returns 200 when caller is platform_admin (bypasses membership)
- test_list_conversation_history: GET /api/portal/chat/conversations/{id}/messages returns paginated messages ordered by created_at
- test_create_conversation: POST /api/portal/chat/conversations with {tenant_id, agent_id} creates or returns existing conversation for user+agent pair
</behavior>
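
The ordering requirement in test_typing_indicator_sent can be checked with a recording fake. The sketch below is illustrative only: `FakeWebSocket` and `handle_user_message` are hypothetical stand-ins, not the real handler in gateway/channels/web.py, and the Celery dispatch is replaced by a plain callable.

```python
import asyncio


class FakeWebSocket:
    """Records every JSON payload the handler sends, in order."""

    def __init__(self, events: list) -> None:
        self.events = events

    async def send_json(self, payload: dict) -> None:
        self.events.append(("send", payload))


async def handle_user_message(ws, text: str, dispatch) -> None:
    # Hypothetical stand-in for the per-message part of the WebSocket handler.
    await ws.send_json({"type": "typing"})  # typing indicator goes out first
    dispatch({"text": text})  # only then hand off to the pipeline


def test_typing_indicator_sent() -> None:
    events: list = []
    ws = FakeWebSocket(events)
    # The dispatch callable appends to the same list so ordering is observable.
    asyncio.run(
        handle_user_message(ws, "hi", lambda p: events.append(("dispatch", p)))
    )
    assert events[0] == ("send", {"type": "typing"})
    assert events[1] == ("dispatch", {"text": "hi"})
```

Sharing one event list between the fake socket and the fake dispatcher is what makes "typing before dispatch" assertable; two separate mocks would only prove that both calls happened.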
<action>
1. Add WEB = "web" to ChannelType in packages/shared/shared/models/message.py

2. Add webchat_response_key(tenant_id, conversation_id) to packages/shared/shared/redis_keys.py following existing pattern: return f"{tenant_id}:webchat:response:{conversation_id}"

3. Create packages/shared/shared/models/chat.py with ORM models:
   - WebConversation: id (UUID PK), tenant_id (UUID, FK tenants.id), agent_id (UUID, FK agents.id), user_id (UUID, FK portal_users.id), created_at, updated_at. UniqueConstraint on (tenant_id, agent_id, user_id). RLS via tenant_id.
   - WebConversationMessage: id (UUID PK), conversation_id (UUID, FK web_conversations.id ON DELETE CASCADE), tenant_id (UUID), role (TEXT, CHECK "user"/"assistant"), content (TEXT), created_at. RLS via tenant_id.
   Use mapped_column() + Mapped[] (SQLAlchemy 2.0 pattern, not Column()).

4. Create migration 008_web_chat.py:
   - Create web_conversations table with columns matching ORM model
   - Create web_conversation_messages table with FK to web_conversations
   - Enable RLS on both tables (FORCE ROW LEVEL SECURITY)
   - Create RLS policies matching existing pattern (current_setting('app.current_tenant')::uuid)
   - ALTER CHECK constraint on channel_connections.channel_type to include 'web' (see Pitfall 5 in RESEARCH.md: the existing CHECK must be replaced, not just added to)
   - Add index on web_conversation_messages(conversation_id, created_at)

5. Write test files FIRST (RED phase):
   - tests/unit/test_web_channel.py: test normalize_web_event, test _send_response web publishes to Redis (mock aioredis), test typing indicator
   - tests/unit/test_chat_api.py: test RBAC enforcement (403 for non-member), platform admin cross-tenant (200), list history (paginated), create conversation (get-or-create)
   Use httpx AsyncClient with app fixture pattern from existing tests. Mock DB sessions and Redis.

IMPORTANT: Celery tasks MUST be sync def with asyncio.run(), never async def (hard architectural constraint).
IMPORTANT: Use TEXT+CHECK for role column (not sa.Enum) per Phase 1 convention.
IMPORTANT: _send_response "web" case must use try/finally around aioredis.from_url() to avoid connection leaks (Pitfall 2 from RESEARCH.md).
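
The sync-def constraint above can be sketched as follows. This is a stdlib-only illustration: `_process_message` and `handle_message_sketch` are hypothetical names, and the Celery task decorator is left out so the block runs on its own.

```python
import asyncio


async def _process_message(payload: dict) -> str:
    # Hypothetical async pipeline step; real code would await the agent, DB, Redis.
    await asyncio.sleep(0)
    return f"processed:{payload['text']}"


def handle_message_sketch(payload: dict) -> str:
    """Sync entry point, shaped like a Celery task body: plain def, not async def."""
    # asyncio.run creates a fresh event loop, runs the coroutine to completion,
    # and closes the loop, so nothing async leaks out of the task.
    return asyncio.run(_process_message(payload))
```

Keeping the task itself synchronous means all async work happens inside one `asyncio.run()` call per task invocation.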
</action>
<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x -v</automated>
</verify>
<done>
ChannelType.WEB exists. webchat_response_key function exists. ORM models define web_conversations and web_conversation_messages. Migration 008 creates both tables with RLS and updates channel_type CHECK constraint. All test assertions pass (RED then GREEN).
</done>
</task>

<task type="auto">
<name>Task 2: WebSocket endpoint, web channel adapter, REST API, orchestrator wiring</name>
<files>
packages/gateway/gateway/channels/web.py,
packages/shared/shared/api/chat.py,
packages/shared/shared/api/__init__.py,
packages/gateway/gateway/main.py,
packages/orchestrator/orchestrator/tasks.py
</files>
<action>
1. Create packages/gateway/gateway/channels/web.py with:
   a. normalize_web_event() function: takes dict with {text, tenant_id, agent_id, user_id, display_name, conversation_id} and returns KonstructMessage with channel=ChannelType.WEB, thread_id=conversation_id, sender.user_id=user_id (portal user UUID string), channel_metadata={portal_user_id, tenant_id, conversation_id}
   b. WebSocket endpoint at /chat/ws/{conversation_id}:
      - Accept connection
      - Wait for first JSON message with type="auth" containing {userId, role, tenantId} (browsers cannot set custom headers on WebSocket connections; see Pitfall 1 from RESEARCH.md)
      - Validate auth: userId must be non-empty UUID string, role must be valid
      - For each subsequent message (type="message"):
        * Immediately send {"type": "typing"} back to client (CHAT-05)
        * Normalize message to KonstructMessage via normalize_web_event
        * Save user message to web_conversation_messages table
        * Build extras dict: conversation_id, portal_user_id, tenant_id
        * Dispatch handle_message.delay(msg.model_dump() | extras)
        * Subscribe to Redis pub-sub channel webchat_response_key(tenant_id, conversation_id) with 60s timeout
        * When response arrives: save assistant message to web_conversation_messages, send {"type": "response", "text": ..., "conversation_id": ...} to WebSocket
      - On disconnect: unsubscribe and close Redis connections
   c. Create an APIRouter with the WebSocket route for mounting

2. Create packages/shared/shared/api/chat.py with REST endpoints:
   a. GET /api/portal/chat/conversations?tenant_id={id}: list conversations for the authenticated user within a tenant. For platform_admin: returns conversations across all tenants if no tenant_id. Uses require_tenant_member for RBAC. Returns [{id, agent_id, agent_name, updated_at, last_message_preview}] sorted by updated_at DESC.
   b. GET /api/portal/chat/conversations/{id}/messages?limit=50&before={cursor}: paginated message history. Verify caller owns the conversation (same user_id) OR is platform_admin. Returns [{id, role, content, created_at}] ordered by created_at ASC.
   c. POST /api/portal/chat/conversations: get-or-create a conversation. Body: {tenant_id, agent_id}. Uses require_tenant_member. Returns conversation object with id.
   d. DELETE /api/portal/chat/conversations/{id}: reset conversation (delete messages, keep row). Updates updated_at. Verify ownership.
   All endpoints use Depends(get_portal_caller) and Depends(get_session). Set RLS context var (configure_rls_hook + current_tenant_id.set) before DB queries.

3. Update packages/shared/shared/api/__init__.py: add chat_router to imports and __all__

4. Update packages/gateway/gateway/main.py:
   - Import chat_router from shared.api and web channel router from gateway.channels.web
   - app.include_router(chat_router) for REST endpoints
   - app.include_router(web_chat_router) for WebSocket endpoint
   - Add comment block "Phase 6 Web Chat routers"

5. Update packages/orchestrator/orchestrator/tasks.py:
   a. In handle_message: pop "conversation_id" and "portal_user_id" before model_validate (same pattern as placeholder_ts, channel_id). Add to extras dict.
   b. In _build_response_extras: add "web" case returning {"conversation_id": extras.get("conversation_id"), "tenant_id": extras.get("tenant_id")}. Note: tenant_id for web comes from extras, not from channel_metadata like Slack.
   c. In _send_response: add "web" case that publishes to Redis pub-sub:
   ```python
   elif channel_str == "web":
       conversation_id = extras.get("conversation_id", "")
       tenant_id = extras.get("tenant_id", "")
       # Both IDs are needed to build the pub-sub channel name; bail out if either is missing.
       if not conversation_id or not tenant_id:
           logger.warning("_send_response: web channel missing conversation_id or tenant_id")
           return
       response_channel = webchat_response_key(tenant_id, conversation_id)
       publish_redis = aioredis.from_url(settings.redis_url)
       try:
           await publish_redis.publish(response_channel, json.dumps({
               "type": "response", "text": text, "conversation_id": conversation_id,
           }))
       finally:
           # Always close the client so the connection is not leaked (Pitfall 2).
           await publish_redis.aclose()
   ```
   d. Import webchat_response_key from shared.redis_keys at module level (matches existing import pattern for other keys)

IMPORTANT: WebSocket auth via JSON message after connection (not URL params or headers; browsers cannot set custom WebSocket headers).
IMPORTANT: Redis pub-sub subscribe in WebSocket handler must use try/finally for cleanup (Pitfall 2).
IMPORTANT: The web normalizer must set thread_id = conversation_id (Pitfall 3: conversation ID scopes memory correctly).
IMPORTANT: For DB access in WebSocket handler, use configure_rls_hook + current_tenant_id context var per existing pattern.
</action>
<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x -v</automated>
</verify>
<done>
WebSocket endpoint at /chat/ws/{conversation_id} accepts connections, authenticates via JSON message, dispatches to Celery, subscribes to Redis for response. REST API provides conversation CRUD with RBAC. Orchestrator _send_response handles "web" channel via Redis pub-sub publish. All unit tests pass. Gateway mounts both routers.
</done>
</task>

</tasks>

<verification>
1. All unit tests pass: `pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x`
2. Migration 008 applies cleanly: `cd /home/adelorenzo/repos/konstruct && alembic upgrade head`
3. Gateway starts without errors: `cd /home/adelorenzo/repos/konstruct/packages/gateway && python -c "from gateway.main import app; print('OK')"`
4. Full test suite still green: `pytest tests/unit -x`
</verification>

<success_criteria>
- ChannelType includes WEB
- WebSocket endpoint exists at /chat/ws/{conversation_id}
- REST API at /api/portal/chat/* provides conversation CRUD with RBAC
- _send_response in tasks.py handles "web" channel via Redis pub-sub
- web_conversations and web_conversation_messages tables created with RLS
- All 7+ unit tests pass covering CHAT-01 through CHAT-05
</success_criteria>

<output>
After completion, create `.planning/phases/06-web-chat/06-01-SUMMARY.md`
</output>