Files
konstruct/migrations/versions/003_phase2_audit_kb.py
Adolfo Delorenzo 30b9f60668 feat(02-02): audit model, KB model, migration, and audit logger
- AuditEvent ORM model with tenant_id, action_type, latency_ms, metadata
- KnowledgeBaseDocument and KBChunk ORM models for vector KB
- Migration 003: audit_events (immutable via REVOKE), kb_documents, kb_chunks
  with HNSW index and RLS on all tables
- AuditLogger with log_llm_call, log_tool_call, log_escalation methods
- audit_events immutability enforced at DB level (UPDATE/DELETE rejected)
- [Rule 1 - Bug] Fixed CAST(:metadata AS jsonb) for asyncpg compatibility
2026-03-23 14:50:51 -06:00

239 lines
7.8 KiB
Python

"""Phase 2: audit_events table (immutable) and kb_documents/kb_chunks tables

Revision ID: 003
Revises: 002
Create Date: 2026-03-23

This migration adds:

1. audit_events — append-only audit trail for all agent actions
   - REVOKE UPDATE, DELETE from konstruct_app (immutability enforced at DB level)
   - GRANT SELECT, INSERT only
   - RLS for tenant isolation
   - Composite index on (tenant_id, created_at DESC) for efficient queries

2. kb_documents and kb_chunks — knowledge base storage
   - kb_chunks has a vector(384) embedding column
   - HNSW index for approximate nearest-neighbor cosine search
   - Full CRUD grants for kb tables (mutable)
   - RLS on both tables

Key design decision: audit_events immutability is enforced at the DB level via
REVOKE. Even if application code attempts an UPDATE or DELETE, PostgreSQL will
reject it with a permission error. This provides a hard compliance guarantee
that the audit trail cannot be tampered with via the application role.
"""
from __future__ import annotations
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB, UUID
# Revision identifiers, read by Alembic to place this script in the migration graph.
revision: str = "003"
# Parent revision: this migration applies on top of "002".
down_revision: Union[str, None] = "002"
# Neither a branch label nor a cross-revision dependency is set for this script.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _enable_tenant_rls(table_name: str) -> None:
    """Enable forced row-level security on *table_name* with a tenant policy.

    Issues three statements: ENABLE ROW LEVEL SECURITY, FORCE ROW LEVEL
    SECURITY, and a ``tenant_isolation`` policy that compares the row's
    ``tenant_id`` with the ``app.current_tenant`` session setting.

    ``table_name`` is interpolated directly into DDL, so it must be a trusted,
    hard-coded identifier (it is only ever called with literals below).
    """
    op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
    op.execute(f"ALTER TABLE {table_name} FORCE ROW LEVEL SECURITY")
    # current_setting(..., TRUE) yields NULL when the setting is missing, so
    # the comparison matches no rows instead of raising.
    # NOTE(review): an empty-string setting would still fail the ::uuid cast —
    # confirm callers always set a valid UUID before querying.
    op.execute(
        f"""
        CREATE POLICY tenant_isolation ON {table_name}
        USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid)
        """
    )


def upgrade() -> None:
    """Create audit_events, kb_documents and kb_chunks with RLS and grants.

    Each table gets a UUID primary key defaulting to ``gen_random_uuid()``, a
    non-null ``tenant_id``, a ``created_at`` timestamp defaulting to ``NOW()``,
    and a forced row-level-security tenant policy. ``audit_events`` is granted
    SELECT/INSERT only to ``konstruct_app``; the two kb tables get full CRUD.
    """
    # =========================================================================
    # 1. audit_events — immutable audit trail
    # =========================================================================
    op.create_table(
        "audit_events",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column("agent_id", UUID(as_uuid=True), nullable=True),
        sa.Column("user_id", sa.Text, nullable=True),
        sa.Column(
            "action_type",
            sa.Text,
            nullable=False,
            comment="llm_call | tool_invocation | escalation",
        ),
        sa.Column("input_summary", sa.Text, nullable=True),
        sa.Column("output_summary", sa.Text, nullable=True),
        sa.Column("latency_ms", sa.Integer, nullable=True),
        sa.Column(
            "metadata",
            JSONB,
            nullable=False,
            server_default=sa.text("'{}'::jsonb"),
        ),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
    )

    # Index for efficient per-tenant queries ordered by time (most recent
    # first). The sort direction is expressed as a text() index element;
    # postgresql_ops is meant for operator classes, not ASC/DESC.
    op.create_index(
        "ix_audit_events_tenant_created",
        "audit_events",
        ["tenant_id", sa.text("created_at DESC")],
    )

    _enable_tenant_rls("audit_events")

    # Grant SELECT + INSERT only — immutability enforced by revoking
    # UPDATE/DELETE from the application role.
    # NOTE(review): TRUNCATE is not revoked here; confirm konstruct_app never
    # receives it through another grant.
    op.execute("GRANT SELECT, INSERT ON audit_events TO konstruct_app")
    op.execute("REVOKE UPDATE, DELETE ON audit_events FROM konstruct_app")

    # =========================================================================
    # 2. kb_documents — knowledge base document metadata
    # =========================================================================
    op.create_table(
        "kb_documents",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column("agent_id", UUID(as_uuid=True), nullable=False),
        sa.Column("filename", sa.Text, nullable=True),
        sa.Column("source_url", sa.Text, nullable=True),
        sa.Column("content_type", sa.Text, nullable=True),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
    )
    op.create_index("ix_kb_documents_tenant", "kb_documents", ["tenant_id"])
    op.create_index("ix_kb_documents_agent", "kb_documents", ["agent_id"])

    _enable_tenant_rls("kb_documents")
    op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_documents TO konstruct_app")

    # =========================================================================
    # 3. kb_chunks — chunked text with vector embeddings
    # =========================================================================
    op.create_table(
        "kb_chunks",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column(
            "document_id",
            UUID(as_uuid=True),
            # Deleting a document removes its chunks.
            sa.ForeignKey("kb_documents.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("content", sa.Text, nullable=False),
        sa.Column("chunk_index", sa.Integer, nullable=True),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
        # embedding column added via raw DDL below (pgvector type)
    )

    # Add the embedding column as vector(384) — raw DDL because no pgvector
    # SQLAlchemy type is imported in this file. The table was created just
    # above and is empty, so NOT NULL needs no temporary default; embeddings
    # must always be supplied explicitly on insert.
    # NOTE(review): assumes the pgvector extension is already installed by an
    # earlier migration or by provisioning — TODO confirm.
    op.execute("ALTER TABLE kb_chunks ADD COLUMN embedding vector(384) NOT NULL")

    op.create_index("ix_kb_chunks_tenant", "kb_chunks", ["tenant_id"])
    op.create_index("ix_kb_chunks_document", "kb_chunks", ["document_id"])

    # HNSW index for approximate nearest-neighbor cosine search
    op.execute(
        """
        CREATE INDEX ix_kb_chunks_hnsw
        ON kb_chunks
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
        """
    )

    _enable_tenant_rls("kb_chunks")
    op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_chunks TO konstruct_app")
def downgrade() -> None:
    """Undo migration 003: revoke the app role's grants and drop the tables.

    For each table, every privilege held by ``konstruct_app`` is revoked and
    the table is then dropped. Tables are processed child-first because
    kb_chunks.document_id has a foreign key to kb_documents.
    """
    # Order matters: kb_chunks must be dropped before kb_documents.
    for table_name in ("kb_chunks", "kb_documents", "audit_events"):
        op.execute(f"REVOKE ALL ON {table_name} FROM konstruct_app")
        op.drop_table(table_name)