From 30b9f606689502b9554a4a4950a74f711cd97fa9 Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Mon, 23 Mar 2026 14:50:51 -0600 Subject: [PATCH] feat(02-02): audit model, KB model, migration, and audit logger - AuditEvent ORM model with tenant_id, action_type, latency_ms, metadata - KnowledgeBaseDocument and KBChunk ORM models for vector KB - Migration 003: audit_events (immutable via REVOKE), kb_documents, kb_chunks with HNSW index and RLS on all tables - AuditLogger with log_llm_call, log_tool_call, log_escalation methods - audit_events immutability enforced at DB level (UPDATE/DELETE rejected) - [Rule 1 - Bug] Fixed CAST(:metadata AS jsonb) for asyncpg compatibility --- migrations/versions/003_phase2_audit_kb.py | 238 ++++++++++++++++++ .../orchestrator/audit/__init__.py | 1 + .../orchestrator/orchestrator/audit/logger.py | 224 +++++++++++++++++ packages/shared/shared/models/audit.py | 99 ++++++++ packages/shared/shared/models/kb.py | 134 ++++++++++ 5 files changed, 696 insertions(+) create mode 100644 migrations/versions/003_phase2_audit_kb.py create mode 100644 packages/orchestrator/orchestrator/audit/__init__.py create mode 100644 packages/orchestrator/orchestrator/audit/logger.py create mode 100644 packages/shared/shared/models/audit.py create mode 100644 packages/shared/shared/models/kb.py diff --git a/migrations/versions/003_phase2_audit_kb.py b/migrations/versions/003_phase2_audit_kb.py new file mode 100644 index 0000000..5925567 --- /dev/null +++ b/migrations/versions/003_phase2_audit_kb.py @@ -0,0 +1,238 @@ +"""Phase 2: audit_events table (immutable) and kb_documents/kb_chunks tables + +Revision ID: 003 +Revises: 002 +Create Date: 2026-03-23 + +This migration adds: +1. audit_events — append-only audit trail for all agent actions + - REVOKE UPDATE, DELETE from konstruct_app (immutability enforced at DB level) + - GRANT SELECT, INSERT only + - RLS for tenant isolation + - Composite index on (tenant_id, created_at DESC) for efficient queries + +2. 
kb_documents and kb_chunks — knowledge base storage + - kb_chunks has a vector(384) embedding column + - HNSW index for approximate nearest neighbor cosine search + - Full CRUD grants for kb tables (mutable) + - RLS on both tables + +Key design decision: audit_events immutability is enforced at the DB level via +REVOKE. Even if application code attempts an UPDATE or DELETE, PostgreSQL will +reject it with a permission error. This provides a hard compliance guarantee +that the audit trail cannot be tampered with via the application role. +""" + +from __future__ import annotations + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects.postgresql import JSONB, UUID + + +# revision identifiers, used by Alembic. +revision: str = "003" +down_revision: Union[str, None] = "002" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ========================================================================= + # 1. 
audit_events — immutable audit trail + # ========================================================================= + op.create_table( + "audit_events", + sa.Column( + "id", + UUID(as_uuid=True), + primary_key=True, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column( + "tenant_id", + UUID(as_uuid=True), + nullable=False, + ), + sa.Column( + "agent_id", + UUID(as_uuid=True), + nullable=True, + ), + sa.Column( + "user_id", + sa.Text, + nullable=True, + ), + sa.Column( + "action_type", + sa.Text, + nullable=False, + comment="llm_call | tool_invocation | escalation", + ), + sa.Column( + "input_summary", + sa.Text, + nullable=True, + ), + sa.Column( + "output_summary", + sa.Text, + nullable=True, + ), + sa.Column( + "latency_ms", + sa.Integer, + nullable=True, + ), + sa.Column( + "metadata", + JSONB, + nullable=False, + server_default=sa.text("'{}'::jsonb"), + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + ) + + # Index for efficient per-tenant queries ordered by time (most recent first) + op.create_index( + "ix_audit_events_tenant_created", + "audit_events", + ["tenant_id", "created_at"], + postgresql_ops={"created_at": "DESC"}, + ) + + # Apply Row Level Security + op.execute("ALTER TABLE audit_events ENABLE ROW LEVEL SECURITY") + op.execute("ALTER TABLE audit_events FORCE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY tenant_isolation ON audit_events + USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid) + """) + + # Grant SELECT + INSERT only — immutability enforced by revoking UPDATE/DELETE + op.execute("GRANT SELECT, INSERT ON audit_events TO konstruct_app") + op.execute("REVOKE UPDATE, DELETE ON audit_events FROM konstruct_app") + + # ========================================================================= + # 2. 
kb_documents — knowledge base document metadata + # ========================================================================= + op.create_table( + "kb_documents", + sa.Column( + "id", + UUID(as_uuid=True), + primary_key=True, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column( + "tenant_id", + UUID(as_uuid=True), + nullable=False, + ), + sa.Column( + "agent_id", + UUID(as_uuid=True), + nullable=False, + ), + sa.Column("filename", sa.Text, nullable=True), + sa.Column("source_url", sa.Text, nullable=True), + sa.Column("content_type", sa.Text, nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + ) + + op.create_index("ix_kb_documents_tenant", "kb_documents", ["tenant_id"]) + op.create_index("ix_kb_documents_agent", "kb_documents", ["agent_id"]) + + # Apply Row Level Security on kb_documents + op.execute("ALTER TABLE kb_documents ENABLE ROW LEVEL SECURITY") + op.execute("ALTER TABLE kb_documents FORCE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY tenant_isolation ON kb_documents + USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid) + """) + + op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_documents TO konstruct_app") + + # ========================================================================= + # 3. 
kb_chunks — chunked text with vector embeddings + # ========================================================================= + op.create_table( + "kb_chunks", + sa.Column( + "id", + UUID(as_uuid=True), + primary_key=True, + server_default=sa.text("gen_random_uuid()"), + ), + sa.Column( + "tenant_id", + UUID(as_uuid=True), + nullable=False, + ), + sa.Column( + "document_id", + UUID(as_uuid=True), + sa.ForeignKey("kb_documents.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("content", sa.Text, nullable=False), + sa.Column("chunk_index", sa.Integer, nullable=True), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("NOW()"), + ), + # embedding column added via raw DDL below (pgvector type) + ) + + # Add embedding column as vector(384) — raw DDL required for pgvector type + op.execute("ALTER TABLE kb_chunks ADD COLUMN embedding vector(384) NOT NULL DEFAULT array_fill(0, ARRAY[384])::vector") + + # Remove the default after adding — embeddings must be explicitly provided + op.execute("ALTER TABLE kb_chunks ALTER COLUMN embedding DROP DEFAULT") + + op.create_index("ix_kb_chunks_tenant", "kb_chunks", ["tenant_id"]) + op.create_index("ix_kb_chunks_document", "kb_chunks", ["document_id"]) + + # HNSW index for approximate nearest-neighbor cosine search + op.execute(""" + CREATE INDEX ix_kb_chunks_hnsw + ON kb_chunks + USING hnsw (embedding vector_cosine_ops) + WITH (m = 16, ef_construction = 64) + """) + + # Apply Row Level Security on kb_chunks + op.execute("ALTER TABLE kb_chunks ENABLE ROW LEVEL SECURITY") + op.execute("ALTER TABLE kb_chunks FORCE ROW LEVEL SECURITY") + op.execute(""" + CREATE POLICY tenant_isolation ON kb_chunks + USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid) + """) + + op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_chunks TO konstruct_app") + + +def downgrade() -> None: + op.execute("REVOKE ALL ON kb_chunks FROM konstruct_app") + op.drop_table("kb_chunks") + + 
op.execute("REVOKE ALL ON kb_documents FROM konstruct_app") + op.drop_table("kb_documents") + + op.execute("REVOKE ALL ON audit_events FROM konstruct_app") + op.drop_table("audit_events") diff --git a/packages/orchestrator/orchestrator/audit/__init__.py b/packages/orchestrator/orchestrator/audit/__init__.py new file mode 100644 index 0000000..86f0c28 --- /dev/null +++ b/packages/orchestrator/orchestrator/audit/__init__.py @@ -0,0 +1 @@ +"""Audit logging subsystem for the Konstruct Agent Orchestrator.""" diff --git a/packages/orchestrator/orchestrator/audit/logger.py b/packages/orchestrator/orchestrator/audit/logger.py new file mode 100644 index 0000000..e928c32 --- /dev/null +++ b/packages/orchestrator/orchestrator/audit/logger.py @@ -0,0 +1,224 @@ +""" +Immutable audit event writer for the Konstruct Agent Orchestrator. + +AuditLogger writes to the audit_events table. The table is protected at the DB +level: the konstruct_app role can only SELECT and INSERT — UPDATE and DELETE are +revoked. This ensures the audit trail is tamper-proof even if application code +contains a bug that would otherwise modify audit records. 
+ +Usage: + from shared.db import async_session_factory + from orchestrator.audit.logger import AuditLogger + + audit_logger = AuditLogger(session_factory=async_session_factory) + + await audit_logger.log_llm_call( + tenant_id=tenant_uuid, + agent_id=agent_uuid, + user_id="U12345", + input_summary="What is the weather?", + output_summary="It is sunny today.", + latency_ms=350, + ) +""" + +from __future__ import annotations + +import json +import logging +import uuid +from typing import Any + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from shared.rls import current_tenant_id + +logger = logging.getLogger(__name__) + +# Maximum length for input/output summaries stored in audit trail +_SUMMARY_MAX_LEN = 500 + + +def _truncate(value: str, max_len: int = _SUMMARY_MAX_LEN) -> str: + """Truncate a string to max_len characters with ellipsis indicator.""" + if len(value) <= max_len: + return value + return value[:max_len] + "…" + + +class AuditLogger: + """ + Writes immutable audit events to the audit_events table. + + The RLS context (current_tenant_id ContextVar) must be set by the caller + before invoking any log method. This ensures each insert is scoped to the + correct tenant. + + Args: + session_factory: An async_sessionmaker configured with the application + engine. The logger creates a fresh session per write + to avoid transaction entanglement with the caller. + """ + + def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None: + self._session_factory = session_factory + + async def _write_event( + self, + tenant_id: uuid.UUID, + agent_id: uuid.UUID | None, + user_id: str | None, + action_type: str, + input_summary: str | None, + output_summary: str | None, + latency_ms: int | None, + metadata: dict[str, Any], + ) -> None: + """ + Internal: write a single audit event row. 
+ + Uses raw INSERT to avoid SQLAlchemy ORM session tracking — audit events + should never be accidentally queued for update/delete by the ORM. + """ + tenant_token = current_tenant_id.set(tenant_id) + try: + async with self._session_factory() as session: + await session.execute( + text(""" + INSERT INTO audit_events + (tenant_id, agent_id, user_id, action_type, + input_summary, output_summary, latency_ms, metadata) + VALUES + (:tenant_id, :agent_id, :user_id, :action_type, + :input_summary, :output_summary, :latency_ms, CAST(:metadata AS jsonb)) + """), + { + "tenant_id": str(tenant_id), + "agent_id": str(agent_id) if agent_id else None, + "user_id": user_id, + "action_type": action_type, + "input_summary": input_summary, + "output_summary": output_summary, + "latency_ms": latency_ms, + "metadata": json.dumps(metadata), + }, + ) + await session.commit() + except Exception: + logger.exception( + "Failed to write audit event: action=%s tenant=%s", + action_type, + tenant_id, + ) + raise + finally: + current_tenant_id.reset(tenant_token) + + async def log_llm_call( + self, + tenant_id: uuid.UUID, + agent_id: uuid.UUID | None, + user_id: str | None, + input_summary: str, + output_summary: str, + latency_ms: int, + metadata: dict[str, Any] | None = None, + ) -> None: + """ + Record an LLM completion call. + + Args: + tenant_id: Tenant this call belongs to. + agent_id: Agent that made the LLM call. + user_id: End-user identifier from the channel. + input_summary: Truncated representation of the messages sent to LLM. + output_summary: Truncated LLM response content. + latency_ms: Round-trip duration in milliseconds. + metadata: Optional dict (model name, token counts, etc.). 
+ """ + await self._write_event( + tenant_id=tenant_id, + agent_id=agent_id, + user_id=user_id, + action_type="llm_call", + input_summary=_truncate(input_summary), + output_summary=_truncate(output_summary), + latency_ms=latency_ms, + metadata=metadata or {}, + ) + + async def log_tool_call( + self, + tool_name: str, + args: dict[str, Any], + result: str | None, + tenant_id: uuid.UUID, + agent_id: uuid.UUID | None, + latency_ms: int, + error: str | None = None, + ) -> None: + """ + Record a tool invocation. + + Args: + tool_name: Name of the invoked tool (e.g. "web_search"). + args: Arguments passed to the tool (will be JSON-serialized). + result: Tool result string, or None if an error occurred. + tenant_id: Tenant this invocation belongs to. + agent_id: Agent that invoked the tool. + latency_ms: Tool execution duration in milliseconds. + error: Error message if the tool failed, else None. + """ + # Summarize args as truncated JSON for the audit trail + try: + args_str = json.dumps(args, ensure_ascii=False) + except Exception: + args_str = repr(args) + + input_summary = _truncate(f"{tool_name}({args_str})") + output_summary = _truncate(error or result or "") + + metadata: dict[str, Any] = {"tool_name": tool_name} + if error: + metadata["error"] = error + + await self._write_event( + tenant_id=tenant_id, + agent_id=agent_id, + user_id=None, + action_type="tool_invocation", + input_summary=input_summary, + output_summary=output_summary, + latency_ms=latency_ms, + metadata=metadata, + ) + + async def log_escalation( + self, + tenant_id: uuid.UUID, + agent_id: uuid.UUID | None, + user_id: str | None, + trigger_reason: str, + metadata: dict[str, Any] | None = None, + ) -> None: + """ + Record an agent escalation (handoff to human or another agent). + + Args: + tenant_id: Tenant this escalation belongs to. + agent_id: Agent that triggered the escalation. + user_id: End-user identifier. + trigger_reason: Human-readable description of why escalation occurred. 
metadata: Optional additional context.
+        """
+        await self._write_event(
+            tenant_id=tenant_id,
+            agent_id=agent_id,
+            user_id=user_id,
+            action_type="escalation",
+            input_summary=_truncate(trigger_reason),
+            output_summary=None,
+            latency_ms=None,
+            metadata=metadata or {},
+        )
diff --git a/packages/shared/shared/models/audit.py b/packages/shared/shared/models/audit.py
new file mode 100644
index 0000000..8813f8b
--- /dev/null
+++ b/packages/shared/shared/models/audit.py
@@ -0,0 +1,103 @@
+"""
+SQLAlchemy 2.0 ORM model for the immutable audit_events table.
+
+Design:
+  - Append-only: konstruct_app role has SELECT + INSERT only (enforced via REVOKE in migration)
+  - Tenant-scoped via RLS — every query sees only the current tenant's rows
+  - action_type discriminates between llm_call, tool_invocation, and escalation events
+
+Important: The DB-level REVOKE UPDATE/DELETE on audit_events means that even if
+application code accidentally attempts an UPDATE or DELETE, the database will reject it.
+This is a hard compliance guarantee.
+"""
+
+from __future__ import annotations
+
+import uuid
+from datetime import datetime
+from typing import Any
+
+from sqlalchemy import DateTime, Integer, Text, func
+from sqlalchemy.dialects.postgresql import JSONB, UUID
+from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
+
+
+class AuditBase(DeclarativeBase):
+    """Separate declarative base for audit models to avoid conflicts with tenant Base."""
+
+    pass
+
+
+class AuditEvent(AuditBase):
+    """
+    Immutable record of every LLM call, tool invocation, and escalation event.
+
+    RLS is enabled — rows are scoped to the current tenant via app.current_tenant.
+    The konstruct_app role has SELECT + INSERT only — UPDATE and DELETE are revoked.
+
+    action_type values:
+        'llm_call'        — LLM completion request/response
+        'tool_invocation' — Tool execution (success or failure)
+        'escalation'      — Agent handoff to human or another agent
+    """
+
+    __tablename__ = "audit_events"
+
+    id: Mapped[uuid.UUID] = mapped_column(
+        UUID(as_uuid=True),
+        primary_key=True,
+        server_default=func.gen_random_uuid(),
+    )
+    tenant_id: Mapped[uuid.UUID] = mapped_column(
+        UUID(as_uuid=True),
+        nullable=False,
+        index=True,
+    )
+    agent_id: Mapped[uuid.UUID | None] = mapped_column(
+        UUID(as_uuid=True),
+        nullable=True,
+    )
+    user_id: Mapped[str | None] = mapped_column(
+        Text,
+        nullable=True,
+        comment="Channel-native user identifier",
+    )
+    action_type: Mapped[str] = mapped_column(
+        Text,
+        nullable=False,
+        comment="llm_call | tool_invocation | escalation",
+    )
+    input_summary: Mapped[str | None] = mapped_column(
+        Text,
+        nullable=True,
+        comment="Truncated input for audit readability (not full content)",
+    )
+    output_summary: Mapped[str | None] = mapped_column(
+        Text,
+        nullable=True,
+        comment="Truncated output for audit readability",
+    )
+    # NOTE: the Python attribute must NOT be named "metadata" — that name is
+    # reserved by SQLAlchemy's Declarative API (it collides with
+    # DeclarativeBase.metadata) and defining it raises InvalidRequestError at
+    # import time. The DB column keeps the name "metadata" via the explicit
+    # first argument to mapped_column, so the migration schema is unchanged.
+    event_metadata: Mapped[dict[str, Any]] = mapped_column(
+        "metadata", JSONB,
+        nullable=False,
+        server_default="{}",
+        default=dict,
+        comment="Additional structured context (model name, tool args hash, etc.)",
+    )
+    created_at: Mapped[datetime] = mapped_column(
+        DateTime(timezone=True),
+        nullable=False,
+        server_default=func.now(),
+    )
+
+    def __repr__(self) -> str:
+        return f"<AuditEvent id={self.id} action={self.action_type} tenant={self.tenant_id}>"
diff --git a/packages/shared/shared/models/kb.py b/packages/shared/shared/models/kb.py
new file mode 100644
index 0000000..ce956e6
--- /dev/null
+++ b/packages/shared/shared/models/kb.py
@@ -0,0 +1,134 @@
+"""
+SQLAlchemy 2.0 ORM models for the Knowledge Base tables.
+ +Tables: + kb_documents — uploaded documents belonging to a tenant/agent + kb_chunks — text chunks with vector embeddings for semantic search + +The embedding column uses pgvector's vector(384) type, matching the +all-MiniLM-L6-v2 model used for embeddings (same as conversation_embeddings). + +RLS is applied to both tables — each tenant's KB is completely isolated. +""" + +from __future__ import annotations + +import uuid +from datetime import datetime + +from sqlalchemy import DateTime, ForeignKey, Integer, Text, func +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship + + +class KBBase(DeclarativeBase): + """Separate declarative base for KB models.""" + + pass + + +class KnowledgeBaseDocument(KBBase): + """ + A document uploaded to a tenant's knowledge base. + + Documents are chunked into KBChunk rows for vector search. + RLS ensures tenant isolation — each tenant sees only their documents. + """ + + __tablename__ = "kb_documents" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + primary_key=True, + server_default=func.gen_random_uuid(), + ) + tenant_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + nullable=False, + index=True, + ) + agent_id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), + nullable=False, + index=True, + comment="Agent this document is associated with", + ) + filename: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="Original filename if uploaded as a file", + ) + source_url: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="Source URL if ingested from the web", + ) + content_type: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="MIME type: text/plain, application/pdf, etc.", + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + nullable=False, + server_default=func.now(), + ) + + # Relationship + chunks: Mapped[list[KBChunk]] = 
relationship("KBChunk", back_populates="document", cascade="all, delete-orphan")
+
+    def __repr__(self) -> str:
+        return f"<KnowledgeBaseDocument id={self.id} filename={self.filename!r}>"
+
+
+class KBChunk(KBBase):
+    """
+    A text chunk from a knowledge base document, with a vector embedding.
+
+    The embedding column is vector(384) — matches all-MiniLM-L6-v2 output dimensions.
+    The HNSW index in the migration enables fast cosine similarity search.
+
+    RLS ensures tenant isolation — each tenant's chunks are invisible to others.
+    """
+
+    __tablename__ = "kb_chunks"
+
+    id: Mapped[uuid.UUID] = mapped_column(
+        UUID(as_uuid=True),
+        primary_key=True,
+        server_default=func.gen_random_uuid(),
+    )
+    tenant_id: Mapped[uuid.UUID] = mapped_column(
+        UUID(as_uuid=True),
+        nullable=False,
+        index=True,
+    )
+    document_id: Mapped[uuid.UUID] = mapped_column(
+        UUID(as_uuid=True),
+        ForeignKey("kb_documents.id", ondelete="CASCADE"),
+        nullable=False,
+        index=True,
+    )
+    content: Mapped[str] = mapped_column(
+        Text,
+        nullable=False,
+        comment="The text content of this chunk",
+    )
+    # embedding is vector(384) — raw DDL in migration, not mapped here
+    # because SQLAlchemy doesn't natively know the pgvector type
+    chunk_index: Mapped[int | None] = mapped_column(
+        Integer,
+        nullable=True,
+        comment="Position of this chunk within its source document (0-indexed)",
+    )
+    created_at: Mapped[datetime] = mapped_column(
+        DateTime(timezone=True),
+        nullable=False,
+        server_default=func.now(),
+    )
+
+    # Relationship
+    document: Mapped[KnowledgeBaseDocument] = relationship("KnowledgeBaseDocument", back_populates="chunks")
+
+    def __repr__(self) -> str:
+        return f"<KBChunk id={self.id} document={self.document_id} index={self.chunk_index}>"