"""Phase 2: conversation_embeddings table with HNSW index and RLS Revision ID: 002 Revises: 001 Create Date: 2026-03-23 This migration adds the conversation_embeddings table for the long-term conversational memory system. It stores pgvector embeddings of past conversation turns for semantic similarity retrieval. Key design decisions: 1. pgvector extension is enabled (CREATE EXTENSION IF NOT EXISTS vector) 2. HNSW index with m=16, ef_construction=64 for approximate nearest neighbor search — cosine distance operator (vector_cosine_ops) 3. Covering index on (tenant_id, agent_id, user_id, created_at DESC) for pre-filtering before ANN search 4. RLS with FORCE — tenant_id isolation enforced at DB level 5. GRANT SELECT, INSERT only — embeddings are immutable (no UPDATE/DELETE) This models conversation history as an append-only audit log """ from __future__ import annotations from typing import Sequence, Union import sqlalchemy as sa from alembic import op from sqlalchemy.dialects.postgresql import UUID # revision identifiers, used by Alembic. revision: str = "002" down_revision: Union[str, None] = "001" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # ------------------------------------------------------------------------- # 1. Enable pgvector extension (idempotent) # ------------------------------------------------------------------------- op.execute("CREATE EXTENSION IF NOT EXISTS vector") # ------------------------------------------------------------------------- # 2. Create conversation_embeddings table # ------------------------------------------------------------------------- op.create_table( "conversation_embeddings", sa.Column( "id", UUID(as_uuid=True), primary_key=True, server_default=sa.text("gen_random_uuid()"), ), sa.Column( "tenant_id", UUID(as_uuid=True), sa.ForeignKey("tenants.id", ondelete="CASCADE"), nullable=False, ), sa.Column( "agent_id", UUID(as_uuid=True), nullable=False, ), sa.Column( "user_id", sa.Text, nullable=False, comment="Channel-native user identifier (e.g. Slack user ID U12345)", ), sa.Column( "content", sa.Text, nullable=False, comment="Original message text that was embedded", ), sa.Column( "role", sa.Text, nullable=False, comment="Message role: 'user' or 'assistant'", ), sa.Column( "created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.text("NOW()"), ), # The embedding column uses pgvector type — created via raw DDL below # because SQLAlchemy doesn't know the 'vector' type without pgvector extension ) # Add embedding column as vector(384) — must be raw DDL for pgvector type op.execute( "ALTER TABLE conversation_embeddings " "ADD COLUMN embedding vector(384) NOT NULL" ) # ------------------------------------------------------------------------- # 3. Create covering index for pre-filter (tenant + agent + user + time) # Used to scope queries before the ANN operator for isolation + performance # ------------------------------------------------------------------------- op.create_index( "ix_conv_embed_tenant_agent_user_time", "conversation_embeddings", ["tenant_id", "agent_id", "user_id", "created_at"], postgresql_ops={"created_at": "DESC"}, ) # ------------------------------------------------------------------------- # 4. Create HNSW index for approximate nearest neighbor cosine search # m=16: number of bidirectional links per node (quality vs. memory tradeoff) # ef_construction=64: search width during build (quality vs. speed) # vector_cosine_ops: uses cosine distance (compatible with <=> operator) # ------------------------------------------------------------------------- op.execute(""" CREATE INDEX ix_conv_embed_hnsw ON conversation_embeddings USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64) """) # ------------------------------------------------------------------------- # 5. Apply Row Level Security # FORCE ensures even the table owner cannot bypass tenant isolation # ------------------------------------------------------------------------- op.execute("ALTER TABLE conversation_embeddings ENABLE ROW LEVEL SECURITY") op.execute("ALTER TABLE conversation_embeddings FORCE ROW LEVEL SECURITY") op.execute(""" CREATE POLICY tenant_isolation ON conversation_embeddings USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid) """) # ------------------------------------------------------------------------- # 6. Grant permissions to konstruct_app role # SELECT + INSERT only — embeddings are immutable (no UPDATE or DELETE) # This models conversation history as an append-only audit log # ------------------------------------------------------------------------- op.execute("GRANT SELECT, INSERT ON conversation_embeddings TO konstruct_app") def downgrade() -> None: op.execute("REVOKE ALL ON conversation_embeddings FROM konstruct_app") op.drop_table("conversation_embeddings") # Note: We do NOT drop the vector extension — other tables may use it