Files
konstruct/migrations/versions/004_phase2_audit_kb.py
Adolfo Delorenzo 44fa7e6845 feat(02-02): wire tool-call loop into agent runner and orchestrator pipeline
- runner.py: multi-turn tool-call loop (LLM -> tool -> observe -> respond)
- runner.py: max 5 iterations guard against runaway tool chains
- runner.py: confirmation gate — returns confirmation msg, stops loop
- runner.py: audit logging for every LLM call via audit_logger
- tasks.py: AuditLogger initialized at task start with session factory
- tasks.py: tool registry built from agent.tool_assignments
- tasks.py: pending tool confirmation flow via Redis (10 min TTL)
- tasks.py: memory persistence skipped for confirmation request responses
- llm-pool/router.py: LLMResponse model with content + tool_calls fields
- llm-pool/router.py: tools parameter forwarded to litellm.acompletion()
- llm-pool/main.py: CompleteRequest accepts optional tools list
- llm-pool/main.py: CompleteResponse includes tool_calls field
- Migration renamed to 004 (003 was already taken by escalation migration)
- [Rule 1 - Bug] Renamed 003_phase2_audit_kb.py -> 004 to fix duplicate revision ID
2026-03-23 15:00:17 -06:00

239 lines
7.8 KiB
Python

"""Phase 2: audit_events table (immutable) and kb_documents/kb_chunks tables
Revision ID: 004
Revises: 003
Create Date: 2026-03-23

This migration adds:

1. audit_events — append-only audit trail for all agent actions
   - REVOKE UPDATE, DELETE from konstruct_app (immutability enforced at DB level)
   - GRANT SELECT, INSERT only
   - RLS for tenant isolation
   - Composite index on (tenant_id, created_at DESC) for efficient queries

2. kb_documents and kb_chunks — knowledge base storage
   - kb_chunks has a vector(384) embedding column
   - HNSW index for approximate nearest neighbor cosine search
   - Full CRUD grants for kb tables (mutable)
   - RLS on both tables

Key design decision: audit_events immutability is enforced at the DB level via
REVOKE. Even if application code attempts an UPDATE or DELETE, PostgreSQL will
reject it with a permission error. This provides a hard compliance guarantee
that the audit trail cannot be tampered with via the application role.
"""
from __future__ import annotations
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB, UUID
# Revision identifiers, used by Alembic.
# `revision` is this migration's own ID; `down_revision` is the ID of the
# migration it applies on top of.
revision: str = "004"
down_revision: Union[str, None] = "003"
# No branch labels and no extra dependencies are declared for this revision.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _enable_tenant_rls(table_name: str) -> None:
    """Enable and force row level security on *table_name* with a tenant policy.

    The policy compares the row's ``tenant_id`` with the ``app.current_tenant``
    session setting. ``current_setting(..., TRUE)`` yields NULL instead of
    raising when the setting is absent, so the comparison is NULL and no rows
    match. FORCE makes the policy apply to the table owner as well.

    Args:
        table_name: Name of an existing table that has a ``tenant_id`` column.
            Only pass trusted, hard-coded identifiers: the name is interpolated
            directly into the DDL text.
    """
    op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
    op.execute(f"ALTER TABLE {table_name} FORCE ROW LEVEL SECURITY")
    op.execute(f"""
        CREATE POLICY tenant_isolation ON {table_name}
        USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid)
    """)


def upgrade() -> None:
    """Create audit_events, kb_documents and kb_chunks with indexes, RLS and grants.

    audit_events is granted SELECT/INSERT only to ``konstruct_app``; the two
    knowledge-base tables get full CRUD grants. All three tables get the same
    ``tenant_isolation`` row level security policy.
    """
    # =========================================================================
    # 1. audit_events — immutable audit trail
    # =========================================================================
    op.create_table(
        "audit_events",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column("agent_id", UUID(as_uuid=True), nullable=True),
        sa.Column("user_id", sa.Text, nullable=True),
        sa.Column(
            "action_type",
            sa.Text,
            nullable=False,
            comment="llm_call | tool_invocation | escalation",
        ),
        sa.Column("input_summary", sa.Text, nullable=True),
        sa.Column("output_summary", sa.Text, nullable=True),
        sa.Column("latency_ms", sa.Integer, nullable=True),
        sa.Column(
            "metadata",
            JSONB,
            nullable=False,
            server_default=sa.text("'{}'::jsonb"),
        ),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
    )

    # Index for per-tenant queries ordered by time (most recent first).
    # The sort direction is given as a text() column expression;
    # ``postgresql_ops`` is meant for operator classes, not for ASC/DESC.
    op.create_index(
        "ix_audit_events_tenant_created",
        "audit_events",
        ["tenant_id", sa.text("created_at DESC")],
    )

    _enable_tenant_rls("audit_events")

    # Grant SELECT + INSERT only. UPDATE, DELETE and TRUNCATE are revoked
    # explicitly so that privileges obtained elsewhere (e.g. default
    # privileges) cannot be used to rewrite or wipe the audit trail.
    op.execute("GRANT SELECT, INSERT ON audit_events TO konstruct_app")
    op.execute("REVOKE UPDATE, DELETE, TRUNCATE ON audit_events FROM konstruct_app")

    # =========================================================================
    # 2. kb_documents — knowledge base document metadata
    # =========================================================================
    op.create_table(
        "kb_documents",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column("agent_id", UUID(as_uuid=True), nullable=False),
        sa.Column("filename", sa.Text, nullable=True),
        sa.Column("source_url", sa.Text, nullable=True),
        sa.Column("content_type", sa.Text, nullable=True),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
    )
    op.create_index("ix_kb_documents_tenant", "kb_documents", ["tenant_id"])
    op.create_index("ix_kb_documents_agent", "kb_documents", ["agent_id"])

    _enable_tenant_rls("kb_documents")
    op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_documents TO konstruct_app")

    # =========================================================================
    # 3. kb_chunks — chunked text with vector embeddings
    # =========================================================================
    op.create_table(
        "kb_chunks",
        sa.Column(
            "id",
            UUID(as_uuid=True),
            primary_key=True,
            server_default=sa.text("gen_random_uuid()"),
        ),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column(
            "document_id",
            UUID(as_uuid=True),
            # Deleting a document removes its chunks as well.
            sa.ForeignKey("kb_documents.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("content", sa.Text, nullable=False),
        sa.Column("chunk_index", sa.Integer, nullable=True),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("NOW()"),
        ),
        # embedding column added via raw DDL below (pgvector type)
    )

    # Add the embedding column as vector(384) via raw DDL (pgvector type).
    # The table was created just above and is empty, so NOT NULL can be
    # declared directly without a placeholder default; embeddings must always
    # be provided explicitly on insert.
    op.execute("ALTER TABLE kb_chunks ADD COLUMN embedding vector(384) NOT NULL")

    op.create_index("ix_kb_chunks_tenant", "kb_chunks", ["tenant_id"])
    op.create_index("ix_kb_chunks_document", "kb_chunks", ["document_id"])

    # HNSW index for approximate nearest-neighbor cosine search
    op.execute("""
        CREATE INDEX ix_kb_chunks_hnsw
        ON kb_chunks
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
    """)

    _enable_tenant_rls("kb_chunks")
    op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_chunks TO konstruct_app")
def downgrade() -> None:
    """Revoke konstruct_app's privileges on the Phase 2 tables and drop them.

    Tables are processed in the order kb_chunks, kb_documents, audit_events,
    so the table holding the foreign key to kb_documents is removed before
    kb_documents itself.
    """
    tables_in_drop_order = ("kb_chunks", "kb_documents", "audit_events")
    for table_name in tables_in_drop_order:
        # Privileges are revoked first, then the table is dropped.
        op.execute(f"REVOKE ALL ON {table_name} FROM konstruct_app")
        op.drop_table(table_name)