---
phase: 06-web-chat
plan: 01
type: execute
wave: 1
depends_on:
files_modified:
  - packages/shared/shared/models/message.py
  - packages/shared/shared/redis_keys.py
  - packages/shared/shared/models/chat.py
  - packages/shared/shared/api/chat.py
  - packages/shared/shared/api/__init__.py
  - packages/gateway/gateway/channels/web.py
  - packages/gateway/gateway/main.py
  - packages/orchestrator/orchestrator/tasks.py
  - migrations/versions/008_web_chat.py
  - tests/unit/test_web_channel.py
  - tests/unit/test_chat_api.py
autonomous: true
requirements:
  - CHAT-01
  - CHAT-02
  - CHAT-03
  - CHAT-04
  - CHAT-05
must_haves:
  truths:
    - "Web channel messages normalize into valid KonstructMessage with channel='web'"
    - "Celery _send_response publishes web channel responses to Redis pub-sub"
    - "WebSocket endpoint accepts connections and dispatches messages to Celery pipeline"
    - "Typing indicator event is sent immediately after receiving a user message"
    - "Chat REST API enforces RBAC: non-members get 403"
    - "Platform admin can access conversations for any tenant"
    - "Conversation history persists in DB and is loadable via REST"
  artifacts:
    - path: "packages/shared/shared/models/chat.py"
      provides: "WebConversation and WebConversationMessage ORM models"
      contains: "class WebConversation"
    - path: "packages/gateway/gateway/channels/web.py"
      provides: "WebSocket endpoint and web channel normalizer"
      contains: "async def chat_websocket"
    - path: "packages/shared/shared/api/chat.py"
      provides: "REST API for conversation CRUD"
      exports:
        - chat_router
    - path: "migrations/versions/008_web_chat.py"
      provides: "DB migration for web_conversations and web_conversation_messages tables"
      contains: "web_conversations"
    - path: "tests/unit/test_web_channel.py"
      provides: "Unit tests for web channel adapter"
      contains: "test_normalize_web_event"
    - path: "tests/unit/test_chat_api.py"
      provides: "Unit tests for chat REST API with RBAC"
      contains: "test_chat_rbac_enforcement"
  key_links:
    - from: "packages/gateway/gateway/channels/web.py"
      to: "packages/orchestrator/orchestrator/tasks.py"
      via: "handle_message.delay() Celery dispatch"
      pattern: "handle_message.delay"
    - from: "packages/orchestrator/orchestrator/tasks.py"
      to: "packages/shared/shared/redis_keys.py"
      via: "Redis pub-sub publish for web channel"
      pattern: "webchat_response_key"
    - from: "packages/gateway/gateway/channels/web.py"
      to: "packages/shared/shared/redis_keys.py"
      via: "Redis pub-sub subscribe for response delivery"
      pattern: "webchat_response_key"
    - from: "packages/shared/shared/api/chat.py"
      to: "packages/shared/shared/api/rbac.py"
      via: "require_tenant_member RBAC guard"
      pattern: "require_tenant_member"
user_setup:
---

Build the complete backend infrastructure for web chat: DB schema, ORM models, web channel adapter with WebSocket endpoint, Redis pub-sub response bridge, chat REST API with RBAC, and orchestrator integration. After this plan, the portal can send messages via WebSocket and receive responses through the full agent pipeline.

Purpose: Enables the portal to use the same agent pipeline as Slack/WhatsApp via a new "web" channel. This is the foundational plumbing that the frontend chat UI (Plan 02) connects to.

Output: Working WebSocket endpoint, conversation persistence, RBAC-enforced REST API, and unit tests.

<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/06-web-chat/06-CONTEXT.md @.planning/phases/06-web-chat/06-RESEARCH.md

From packages/shared/shared/models/message.py:

class ChannelType(StrEnum):
    SLACK = "slack"
    WHATSAPP = "whatsapp"
    MATTERMOST = "mattermost"
    ROCKETCHAT = "rocketchat"
    TEAMS = "teams"
    TELEGRAM = "telegram"
    SIGNAL = "signal"
    # WEB = "web"  <-- ADD THIS

class KonstructMessage(BaseModel):
    id: str
    tenant_id: str | None
    channel: ChannelType
    channel_metadata: dict[str, Any]
    sender: SenderInfo
    content: MessageContent
    timestamp: datetime
    thread_id: str | None
    reply_to: str | None
    context: dict[str, Any]
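
To make the planned WEB addition concrete, here is a minimal sketch of the payload a web normalizer might produce. It uses a plain dict in place of KonstructMessage so it runs without pydantic. The top-level field names mirror the excerpt above; the helper name `normalize_web_event_dict` and the inner shapes of `sender` and `content` are assumptions, since SenderInfo and MessageContent are not shown in this plan.

```python
import uuid
from datetime import datetime, timezone
from typing import Any


def normalize_web_event_dict(event: dict[str, Any]) -> dict[str, Any]:
    """Build a KonstructMessage-shaped dict for a portal chat event (illustrative only)."""
    conversation_id = event["conversation_id"]
    return {
        "id": str(uuid.uuid4()),
        "tenant_id": event["tenant_id"],
        "channel": "web",  # would be ChannelType.WEB once the enum member exists
        "channel_metadata": {
            "portal_user_id": event["user_id"],
            "tenant_id": event["tenant_id"],
            "conversation_id": conversation_id,
        },
        # Assumed SenderInfo / MessageContent shapes; the real models are not shown here.
        "sender": {
            "user_id": event["user_id"],
            "display_name": event.get("display_name", ""),
        },
        "content": {"text": event["text"]},
        "timestamp": datetime.now(timezone.utc),
        "thread_id": conversation_id,  # the conversation ID scopes agent memory
        "reply_to": None,
        "context": {},
    }
```

The point of the sketch is the mapping `thread_id = conversation_id`, which the later tasks rely on for memory scoping.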

From packages/shared/shared/redis_keys.py:

# All keys follow: {tenant_id}:{key_type}:{discriminator}
def memory_short_key(tenant_id: str, agent_id: str, user_id: str) -> str
def escalation_status_key(tenant_id: str, thread_id: str) -> str
# ADD: webchat_response_key(tenant_id, conversation_id)
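
A small sketch of the key helper to be added. The format string is the one given in the Task 1 action of this plan; the docstring wording is illustrative.

```python
def webchat_response_key(tenant_id: str, conversation_id: str) -> str:
    """Redis pub-sub channel name for responses to one web conversation."""
    return f"{tenant_id}:webchat:response:{conversation_id}"
```

Because the tenant ID leads the key, the channel name stays consistent with the tenant-prefixed convention used by the other helpers in this module.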

From packages/shared/shared/api/rbac.py:

@dataclass
class PortalCaller:
    user_id: uuid.UUID
    role: str
    tenant_id: uuid.UUID | None = None

async def get_portal_caller(...) -> PortalCaller
async def require_tenant_member(tenant_id: UUID, caller: PortalCaller, session: AsyncSession) -> None
async def require_tenant_admin(tenant_id: UUID, caller: PortalCaller, session: AsyncSession) -> None

From packages/orchestrator/orchestrator/tasks.py:

# handle_message pops extras before model_validate:
# placeholder_ts, channel_id, phone_number_id, bot_token
# ADD: conversation_id, portal_user_id, tenant_id (for web)

# _send_response routes by channel_str:
# "slack" -> _update_slack_placeholder
# "whatsapp" -> send_whatsapp_message
# ADD: "web" -> Redis pub-sub publish

# _build_response_extras builds channel-specific extras dict
# ADD: "web" case returning conversation_id + tenant_id
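
The extras handling described in these comments might look roughly like the sketch below. The function names and the `WEB_EXTRA_KEYS` constant are hypothetical, and copying `tenant_id` rather than popping it is an assumption made because `tenant_id` is also a KonstructMessage field.

```python
from typing import Any

WEB_EXTRA_KEYS = ("conversation_id", "portal_user_id")  # hypothetical constant


def split_web_extras(payload: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Pop web-only keys out of the task payload so the rest can be validated as a message."""
    message_data = dict(payload)  # copy, so the caller's dict is not mutated
    extras = {key: message_data.pop(key, None) for key in WEB_EXTRA_KEYS}
    # Assumption: tenant_id stays in the message data and is only copied into extras.
    extras["tenant_id"] = message_data.get("tenant_id")
    return message_data, extras


def build_web_response_extras(extras: dict[str, Any]) -> dict[str, Any]:
    """The 'web' case: only what _send_response needs to find the pub-sub channel."""
    return {
        "conversation_id": extras.get("conversation_id"),
        "tenant_id": extras.get("tenant_id"),
    }
```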

From packages/shared/shared/api/__init__.py:

# Current routers mounted on gateway:
# portal_router, billing_router, channels_router, llm_keys_router,
# usage_router, webhook_router, invitations_router, templates_router
# ADD: chat_router

From packages/gateway/gateway/main.py:

# CORS allows: localhost:3000, 127.0.0.1:3000, 100.64.0.10:3000
# WebSocket handshakes are not subject to CORS: the browser sends an Origin header but does not block cross-origin connections, so origin restrictions must be enforced server-side
# Include chat_router and WebSocket router here

Task 1: Backend models, migration, channel type, Redis key, and unit tests

Files: packages/shared/shared/models/message.py, packages/shared/shared/redis_keys.py, packages/shared/shared/models/chat.py, migrations/versions/008_web_chat.py, tests/unit/test_web_channel.py, tests/unit/test_chat_api.py

Behavior:
- test_normalize_web_event: normalize_web_event({text, tenant_id, agent_id, user_id, conversation_id}) -> KonstructMessage with channel=WEB, thread_id=conversation_id, sender.user_id=portal_user_id
- test_send_response_web_publishes_to_redis: _send_response("web", "hello", {conversation_id, tenant_id}) publishes JSON to Redis channel matching webchat_response_key(tenant_id, conversation_id)
- test_typing_indicator_sent: WebSocket handler sends {"type": "typing"} immediately after receiving user message, before Celery dispatch
- test_chat_rbac_enforcement: GET /api/portal/chat/conversations?tenant_id=X returns 403 when caller is not a member of tenant X
- test_platform_admin_cross_tenant: GET /api/portal/chat/conversations?tenant_id=X returns 200 when caller is platform_admin (bypasses membership)
- test_list_conversation_history: GET /api/portal/chat/conversations/{id}/messages returns paginated messages ordered by created_at
- test_create_conversation: POST /api/portal/chat/conversations with {tenant_id, agent_id} creates or returns existing conversation for user+agent pair

Action:
1. Add WEB = "web" to ChannelType in packages/shared/shared/models/message.py
2. Add webchat_response_key(tenant_id, conversation_id) to packages/shared/shared/redis_keys.py following existing pattern: return f"{tenant_id}:webchat:response:{conversation_id}"

3. Create packages/shared/shared/models/chat.py with ORM models:
   - WebConversation: id (UUID PK), tenant_id (UUID, FK tenants.id), agent_id (UUID, FK agents.id), user_id (UUID, FK portal_users.id), created_at, updated_at. UniqueConstraint on (tenant_id, agent_id, user_id). RLS via tenant_id.
   - WebConversationMessage: id (UUID PK), conversation_id (UUID, FK web_conversations.id ON DELETE CASCADE), tenant_id (UUID), role (TEXT, CHECK "user"/"assistant"), content (TEXT), created_at. RLS via tenant_id.
   Use mapped_column() + Mapped[] (SQLAlchemy 2.0 pattern, not Column()).

4. Create migration 008_web_chat.py:
   - Create web_conversations table with columns matching ORM model
   - Create web_conversation_messages table with FK to web_conversations
   - Enable RLS on both tables (FORCE ROW LEVEL SECURITY)
   - Create RLS policies matching existing pattern (current_setting('app.current_tenant')::uuid)
   - Replace the CHECK constraint on channel_connections.channel_type so that it includes 'web' (see Pitfall 5 in RESEARCH.md: the existing CHECK must be dropped and re-created with the full value list, since a CHECK constraint cannot be extended in place)
   - Add index on web_conversation_messages(conversation_id, created_at)

5. Write test files FIRST (RED phase):
   - tests/unit/test_web_channel.py: test normalize_web_event, test _send_response web publishes to Redis (mock aioredis), test typing indicator
   - tests/unit/test_chat_api.py: test RBAC enforcement (403 for non-member), platform admin cross-tenant (200), list history (paginated), create conversation (get-or-create)
   Use httpx AsyncClient with app fixture pattern from existing tests. Mock DB sessions and Redis.

IMPORTANT: Celery tasks MUST be sync def with asyncio.run() — never async def (hard architectural constraint).
IMPORTANT: Use TEXT+CHECK for role column (not sa.Enum) per Phase 1 convention.
IMPORTANT: _send_response "web" case must use try/finally around aioredis.from_url() to avoid connection leaks (Pitfall 2 from RESEARCH.md).
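
The two constraints above (sync task entry point, try/finally around the Redis client) can be illustrated together. This is a sketch only: `FakeRedis` is a hypothetical in-memory stand-in so the example runs without Redis or Celery, and the function names are not the ones in tasks.py.

```python
import asyncio
import json


class FakeRedis:
    """In-memory stand-in for an async Redis client (hypothetical test double)."""

    def __init__(self) -> None:
        self.published: list[tuple[str, str]] = []
        self.closed = False

    async def publish(self, channel: str, payload: str) -> None:
        self.published.append((channel, payload))

    async def aclose(self) -> None:
        self.closed = True


async def _publish_web_response(client: FakeRedis, channel: str, text: str) -> None:
    try:
        await client.publish(channel, json.dumps({"type": "response", "text": text}))
    finally:
        await client.aclose()  # always runs, even if publish raises


def handle_message_sketch(client: FakeRedis, channel: str, text: str) -> None:
    """Sync entry point, as a Celery task must be; async work goes through asyncio.run()."""
    asyncio.run(_publish_web_response(client, channel, text))
```

Keeping the task body a plain `def` and confining the event loop to a single `asyncio.run()` call means each task invocation gets a fresh loop and a client that is closed before the task returns.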
Verify: `cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x -v`

Done: ChannelType.WEB exists. webchat_response_key function exists. ORM models define web_conversations and web_conversation_messages. Migration 008 creates both tables with RLS and updates channel_type CHECK constraint. All test assertions pass (RED then GREEN).

Task 2: WebSocket endpoint, web channel adapter, REST API, orchestrator wiring

Files: packages/gateway/gateway/channels/web.py, packages/shared/shared/api/chat.py, packages/shared/shared/api/__init__.py, packages/gateway/gateway/main.py, packages/orchestrator/orchestrator/tasks.py

Action:
1. Create packages/gateway/gateway/channels/web.py with:
   a. normalize_web_event() function: takes dict with {text, tenant_id, agent_id, user_id, display_name, conversation_id} and returns KonstructMessage with channel=ChannelType.WEB, thread_id=conversation_id, sender.user_id=user_id (portal user UUID string), channel_metadata={portal_user_id, tenant_id, conversation_id}
   b. WebSocket endpoint at /chat/ws/{conversation_id}:
      - Accept connection
      - Wait for first JSON message with type="auth" containing {userId, role, tenantId} (browser cannot send custom headers; see Pitfall 1 from RESEARCH.md)
      - Validate auth: userId must be non-empty UUID string, role must be valid
      - For each subsequent message (type="message"):
        * Immediately send {"type": "typing"} back to client (CHAT-05)
        * Normalize message to KonstructMessage via normalize_web_event
        * Save user message to web_conversation_messages table
        * Build extras dict: conversation_id, portal_user_id, tenant_id
        * Dispatch handle_message.delay(msg.model_dump() | extras)
        * Subscribe to Redis pub-sub channel webchat_response_key(tenant_id, conversation_id) with 60s timeout
        * When response arrives: save assistant message to web_conversation_messages, send {"type": "response", "text": ..., "conversation_id": ...} to WebSocket
      - On disconnect: unsubscribe and close Redis connections
   c. Create an APIRouter with the WebSocket route for mounting
2. Create packages/shared/shared/api/chat.py with REST endpoints:
   a. GET /api/portal/chat/conversations?tenant_id={id}: list conversations for the authenticated user within a tenant. For platform_admin: returns conversations across all tenants if no tenant_id. Uses require_tenant_member for RBAC. Returns [{id, agent_id, agent_name, updated_at, last_message_preview}] sorted by updated_at DESC.
   b. GET /api/portal/chat/conversations/{id}/messages?limit=50&before={cursor}: paginated message history. Verify caller owns the conversation (same user_id) OR is platform_admin. Returns [{id, role, content, created_at}] ordered by created_at ASC.
   c. POST /api/portal/chat/conversations: get-or-create the conversation for the user+agent pair (returns the existing row if one exists, otherwise creates it). Body: {tenant_id, agent_id}. Uses require_tenant_member. Returns conversation object with id.
   d. DELETE /api/portal/chat/conversations/{id}: reset conversation (delete messages, keep row). Updates updated_at. Verify ownership.
   All endpoints use Depends(get_portal_caller) and Depends(get_session). Set RLS context var (configure_rls_hook + current_tenant_id.set) before DB queries.

3. Update packages/shared/shared/api/__init__.py: add chat_router to imports and __all__

4. Update packages/gateway/gateway/main.py:
   - Import chat_router from shared.api and web channel router from gateway.channels.web
   - app.include_router(chat_router) for REST endpoints
   - app.include_router(web_chat_router) for WebSocket endpoint
   - Add comment block "Phase 6 Web Chat routers"

5. Update packages/orchestrator/orchestrator/tasks.py:
   a. In handle_message: pop "conversation_id" and "portal_user_id" before model_validate (same pattern as placeholder_ts, channel_id). Add to extras dict.
   b. In _build_response_extras: add "web" case returning {"conversation_id": extras.get("conversation_id"), "tenant_id": extras.get("tenant_id")}. Note: tenant_id for web comes from extras, not from channel_metadata like Slack.
   c. In _send_response: add "web" case that publishes to Redis pub-sub:
      ```python
      elif channel_str == "web":
          conversation_id = extras.get("conversation_id", "")
          tenant_id = extras.get("tenant_id", "")
          if not conversation_id or not tenant_id:
              logger.warning("_send_response: web channel missing conversation_id or tenant_id")
              return
          response_channel = webchat_response_key(tenant_id, conversation_id)
          publish_redis = aioredis.from_url(settings.redis_url)
          try:
              await publish_redis.publish(response_channel, json.dumps({
                  "type": "response", "text": text, "conversation_id": conversation_id,
              }))
          finally:
              await publish_redis.aclose()
      ```
   d. Import webchat_response_key from shared.redis_keys at module level (matches existing import pattern for other keys)

IMPORTANT: WebSocket auth via JSON message after connection (NOT URL params or headers — browser limitation).
IMPORTANT: Redis pub-sub subscribe in WebSocket handler must use try/finally for cleanup (Pitfall 2).
IMPORTANT: The web normalizer must set thread_id = conversation_id (Pitfall 3 — conversation ID scopes memory correctly).
IMPORTANT: For DB access in WebSocket handler, use configure_rls_hook + current_tenant_id context var per existing pattern.
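
The handshake and ordering rules above (auth as the first JSON frame, typing indicator before dispatch) can be sketched without a web framework. Everything here is illustrative: `FakeWebSocket` is a hypothetical scripted double, the error frame shape is an assumption, and `dispatch` stands in for normalization plus `handle_message.delay`.

```python
import asyncio
import uuid
from typing import Any, Callable


class FakeWebSocket:
    """Scripted stand-in for a WebSocket connection (hypothetical test double)."""

    def __init__(self, incoming: list[dict[str, Any]]) -> None:
        self._incoming = list(incoming)
        self.sent: list[dict[str, Any]] = []

    async def receive_json(self) -> dict[str, Any]:
        if not self._incoming:
            raise ConnectionError("client disconnected")
        return self._incoming.pop(0)

    async def send_json(self, data: dict[str, Any]) -> None:
        self.sent.append(data)


def is_valid_auth(msg: dict[str, Any], valid_roles: set[str]) -> bool:
    """The first frame must be type="auth" with a UUID userId and a known role."""
    if msg.get("type") != "auth" or msg.get("role") not in valid_roles:
        return False
    try:
        uuid.UUID(str(msg.get("userId", "")))
    except ValueError:
        return False
    return True


async def chat_loop(
    ws: FakeWebSocket, valid_roles: set[str], dispatch: Callable[[str], None]
) -> None:
    if not is_valid_auth(await ws.receive_json(), valid_roles):
        await ws.send_json({"type": "error", "detail": "unauthorized"})  # assumed frame shape
        return
    try:
        while True:
            msg = await ws.receive_json()
            if msg.get("type") != "message":
                continue
            await ws.send_json({"type": "typing"})  # sent before any dispatch work
            dispatch(msg["text"])
    except ConnectionError:
        pass  # a real handler would unsubscribe and close Redis connections here
```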
Verify: `cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x -v`

Done: WebSocket endpoint at /chat/ws/{conversation_id} accepts connections, authenticates via JSON message, dispatches to Celery, subscribes to Redis for response. REST API provides conversation CRUD with RBAC. Orchestrator _send_response handles "web" channel via Redis pub-sub publish. All unit tests pass. Gateway mounts both routers.

Verification:
1. All unit tests pass: `pytest tests/unit/test_web_channel.py tests/unit/test_chat_api.py -x`
2. Migration 008 applies cleanly: `cd /home/adelorenzo/repos/konstruct && alembic upgrade head`
3. Gateway starts without errors: `cd /home/adelorenzo/repos/konstruct/packages/gateway && python -c "from gateway.main import app; print('OK')"`
4. Full test suite still green: `pytest tests/unit -x`

<success_criteria>

  • ChannelType includes WEB
  • WebSocket endpoint exists at /chat/ws/{conversation_id}
  • REST API at /api/portal/chat/* provides conversation CRUD with RBAC
  • _send_response in tasks.py handles "web" channel via Redis pub-sub
  • web_conversations and web_conversation_messages tables created with RLS
  • All 7+ unit tests pass covering CHAT-01 through CHAT-05

</success_criteria>

After completion, create `.planning/phases/06-web-chat/06-01-SUMMARY.md`