- ConversationEmbedding ORM model with Vector(384) column (pgvector)
- memory_short_key, escalation_status_key, pending_tool_confirm_key in redis_keys.py
- orchestrator/memory/short_term.py: RPUSH/LTRIM sliding window (get_recent_messages, append_message)
- orchestrator/memory/long_term.py: pgvector HNSW cosine search (retrieve_relevant, store_embedding)
- Migration 002: conversation_embeddings table, HNSW index, RLS with FORCE, SELECT/INSERT only
- 10 unit tests (fakeredis), 6 integration tests (pgvector) — all passing
- Auto-fix [Rule 3]: postgres image updated to pgvector/pgvector:pg16 (extension required)
147 lines
5.6 KiB
Python
147 lines
5.6 KiB
Python
"""Phase 2: conversation_embeddings table with HNSW index and RLS
|
|
|
|
Revision ID: 002
|
|
Revises: 001
|
|
Create Date: 2026-03-23
|
|
|
|
This migration adds the conversation_embeddings table for the long-term
|
|
conversational memory system. It stores pgvector embeddings of past
|
|
conversation turns for semantic similarity retrieval.
|
|
|
|
Key design decisions:
|
|
1. pgvector extension is enabled (CREATE EXTENSION IF NOT EXISTS vector)
|
|
2. HNSW index with m=16, ef_construction=64 for approximate nearest neighbor
|
|
search — cosine distance operator (vector_cosine_ops)
|
|
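3. Composite B-tree index on (tenant_id, agent_id, user_id, created_at DESC)
   for scoping queries to a single tenant/agent/user conversation. This gives
   the planner an alternative access path to the ANN index; it is not a
   pre-filter applied inside the HNSW scan.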
|
|
4. RLS with FORCE — tenant_id isolation enforced at DB level
|
|
5. GRANT SELECT, INSERT only — embeddings are immutable (no UPDATE/DELETE)
|
|
This models conversation history as an append-only audit log
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Sequence, Union
|
|
|
|
import sqlalchemy as sa
|
|
from alembic import op
|
|
from sqlalchemy.dialects.postgresql import UUID
|
|
|
|
|
|
# Alembic revision-graph identifiers.
# This revision ("002") is applied directly after revision "001".
revision: str = "002"
down_revision: Union[str, None] = "001"

# No branch label and no additional cross-branch dependencies.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
|
|
|
|
|
|
def upgrade() -> None:
    """Create the conversation_embeddings table with indexes, RLS, and grants.

    Steps, in order:

    1. Enable the pgvector extension (``CREATE EXTENSION IF NOT EXISTS``).
    2. Create ``conversation_embeddings`` and add its ``vector(384)``
       ``embedding`` column via raw DDL.
    3. Create a composite B-tree index on
       ``(tenant_id, agent_id, user_id, created_at DESC)``.
    4. Create an HNSW index on ``embedding`` using cosine distance.
    5. Enable and FORCE row level security with a tenant isolation policy.
    6. Grant only SELECT and INSERT to the ``konstruct_app`` role.
    """
    # -------------------------------------------------------------------------
    # 1. Enable pgvector extension (idempotent)
    # -------------------------------------------------------------------------
    op.execute("CREATE EXTENSION IF NOT EXISTS vector")

    # -------------------------------------------------------------------------
    # 2. Create conversation_embeddings table
    # -------------------------------------------------------------------------
    op.create_table(
        "conversation_embeddings",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column(
            "tenant_id",
            UUID(as_uuid=True),
            sa.ForeignKey("tenants.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column(
            "agent_id",
            UUID(as_uuid=True),
            nullable=False,
        ),
        sa.Column(
            "user_id",
            sa.Text,
            nullable=False,
            comment="Channel-native user identifier (e.g. Slack user ID U12345)",
        ),
        sa.Column(
            "content",
            sa.Text,
            nullable=False,
            comment="Original message text that was embedded",
        ),
        sa.Column(
            "role",
            sa.Text,
            nullable=False,
            comment="Message role: 'user' or 'assistant'",
        ),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
        # The embedding column uses the pgvector type and is added with raw
        # DDL below, because this migration has no SQLAlchemy type for it.
    )

    # The table was created just above and is still empty, so adding a
    # NOT NULL column without a default cannot fail on existing rows.
    op.execute(
        "ALTER TABLE conversation_embeddings "
        "ADD COLUMN embedding vector(384) NOT NULL"
    )

    # -------------------------------------------------------------------------
    # 3. Composite B-tree index (tenant + agent + user + recency)
    # Lets the planner narrow a query to one tenant/agent/user conversation.
    # NOTE: pgvector applies WHERE filters *after* an HNSW index scan, so
    # this index is an alternative access path, not a pre-filter inside
    # the ANN search.
    # The DESC sort order is given as a text() index element.
    # `postgresql_ops` is meant for operator classes (e.g. text_pattern_ops),
    # not for sort direction.
    # -------------------------------------------------------------------------
    op.create_index(
        "ix_conv_embed_tenant_agent_user_time",
        "conversation_embeddings",
        ["tenant_id", "agent_id", "user_id", sa.text("created_at DESC")],
    )

    # -------------------------------------------------------------------------
    # 4. Create HNSW index for approximate nearest neighbor cosine search
    # m=16: number of bidirectional links per node (quality vs. memory tradeoff)
    # ef_construction=64: search width during build (quality vs. speed)
    # vector_cosine_ops: uses cosine distance (compatible with <=> operator)
    # -------------------------------------------------------------------------
    op.execute("""
        CREATE INDEX ix_conv_embed_hnsw
        ON conversation_embeddings
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """)

    # -------------------------------------------------------------------------
    # 5. Apply Row Level Security
    # FORCE ensures even the table owner cannot bypass tenant isolation.
    # The policy has no FOR clause (so it applies to ALL commands) and no
    # WITH CHECK. PostgreSQL therefore also uses the USING expression to
    # check new rows, so INSERTs are limited to the session's own tenant.
    # With missing_ok=TRUE, current_setting() returns NULL when the setting
    # is absent. The comparison is then NULL and no rows match.
    # NOTE(review): if the setting exists but is '' (e.g. after a
    # transaction-scoped SET LOCAL ends on a pooled connection), the ::uuid
    # cast raises an error instead of hiding rows. Confirm this fail-closed
    # behavior is intended.
    # -------------------------------------------------------------------------
    op.execute("ALTER TABLE conversation_embeddings ENABLE ROW LEVEL SECURITY")
    op.execute("ALTER TABLE conversation_embeddings FORCE ROW LEVEL SECURITY")
    op.execute("""
        CREATE POLICY tenant_isolation ON conversation_embeddings
        USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid)
    """)

    # -------------------------------------------------------------------------
    # 6. Grant permissions to konstruct_app role
    # SELECT + INSERT only: the application role gets no UPDATE or DELETE,
    # so stored embeddings are treated as an append-only history.
    # -------------------------------------------------------------------------
    op.execute("GRANT SELECT, INSERT ON conversation_embeddings TO konstruct_app")
|
|
|
|
|
|
def downgrade() -> None:
    """Reverse revision 002 by removing the conversation_embeddings table.

    First the konstruct_app role's privileges on the table are revoked, then
    the table itself is dropped. PostgreSQL removes the table's indexes and
    its RLS policy together with the table, so they need no separate
    statements.

    The pgvector extension enabled by upgrade() is deliberately left
    installed, because other tables may depend on it.
    """
    table = "conversation_embeddings"

    op.execute("REVOKE ALL ON conversation_embeddings FROM konstruct_app")
    op.drop_table(table)
|