feat(02-02): audit model, KB model, migration, and audit logger

- AuditEvent ORM model with tenant_id, action_type, latency_ms, metadata
- KnowledgeBaseDocument and KBChunk ORM models for vector KB
- Migration 003: audit_events (immutable via REVOKE), kb_documents, kb_chunks
  with HNSW index and RLS on all tables
- AuditLogger with log_llm_call, log_tool_call, log_escalation methods
- audit_events immutability enforced at DB level (UPDATE/DELETE rejected)
- [Rule 1 - Bug] Fixed CAST(:metadata AS jsonb) for asyncpg compatibility
This commit is contained in:
2026-03-23 14:50:51 -06:00
parent d489551130
commit 30b9f60668
5 changed files with 696 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Audit logging subsystem for the Konstruct Agent Orchestrator."""

View File

@@ -0,0 +1,224 @@
"""
Immutable audit event writer for the Konstruct Agent Orchestrator.
AuditLogger writes to the audit_events table. The table is protected at the DB
level: the konstruct_app role can only SELECT and INSERT — UPDATE and DELETE are
revoked. This ensures the audit trail is tamper-proof even if application code
contains a bug that would otherwise modify audit records.
Usage:
from shared.db import async_session_factory
from orchestrator.audit.logger import AuditLogger
audit_logger = AuditLogger(session_factory=async_session_factory)
await audit_logger.log_llm_call(
tenant_id=tenant_uuid,
agent_id=agent_uuid,
user_id="U12345",
input_summary="What is the weather?",
output_summary="It is sunny today.",
latency_ms=350,
)
"""
from __future__ import annotations
import json
import logging
import uuid
from typing import Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from shared.rls import current_tenant_id
logger = logging.getLogger(__name__)
# Maximum length for input/output summaries stored in audit trail
_SUMMARY_MAX_LEN = 500
def _truncate(value: str, max_len: int = _SUMMARY_MAX_LEN) -> str:
"""Truncate a string to max_len characters with ellipsis indicator."""
if len(value) <= max_len:
return value
return value[:max_len] + ""
class AuditLogger:
    """
    Append-only writer for immutable audit events.

    Rows land in the audit_events table, which the database itself protects:
    the konstruct_app role is granted SELECT and INSERT only, so UPDATE and
    DELETE statements are rejected at the DB level. The audit trail therefore
    stays tamper-proof even if application code contains a bug that would
    otherwise modify audit records.

    The RLS context (current_tenant_id ContextVar) is set per write by this
    class; each insert is scoped to the tenant passed into the log method.

    Args:
        session_factory: An async_sessionmaker bound to the application
            engine. Every write opens its own short-lived session so audit
            inserts never entangle with a caller's transaction.
    """

    def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
        self._session_factory = session_factory

    async def _write_event(
        self,
        tenant_id: uuid.UUID,
        agent_id: uuid.UUID | None,
        user_id: str | None,
        action_type: str,
        input_summary: str | None,
        output_summary: str | None,
        latency_ms: int | None,
        metadata: dict[str, Any],
    ) -> None:
        """
        Insert a single audit event row.

        A raw SQL INSERT is used deliberately: ORM-tracked instances could be
        queued for update/delete by a buggy caller, while text() statements
        leave nothing in the session's identity map.
        """
        row = {
            "tenant_id": str(tenant_id),
            "agent_id": str(agent_id) if agent_id else None,
            "user_id": user_id,
            "action_type": action_type,
            "input_summary": input_summary,
            "output_summary": output_summary,
            "latency_ms": latency_ms,
            "metadata": json.dumps(metadata),
        }
        # Pin the RLS tenant context for the duration of this write only.
        token = current_tenant_id.set(tenant_id)
        try:
            async with self._session_factory() as session:
                stmt = text("""
                    INSERT INTO audit_events
                        (tenant_id, agent_id, user_id, action_type,
                         input_summary, output_summary, latency_ms, metadata)
                    VALUES
                        (:tenant_id, :agent_id, :user_id, :action_type,
                         :input_summary, :output_summary, :latency_ms, CAST(:metadata AS jsonb))
                """)
                await session.execute(stmt, row)
                await session.commit()
        except Exception:
            logger.exception(
                "Failed to write audit event: action=%s tenant=%s",
                action_type,
                tenant_id,
            )
            raise
        finally:
            current_tenant_id.reset(token)

    async def log_llm_call(
        self,
        tenant_id: uuid.UUID,
        agent_id: uuid.UUID | None,
        user_id: str | None,
        input_summary: str,
        output_summary: str,
        latency_ms: int,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Record one LLM completion round-trip.

        Args:
            tenant_id: Tenant the call belongs to.
            agent_id: Agent that issued the LLM call, if any.
            user_id: Channel-native end-user identifier.
            input_summary: Representation of the messages sent to the LLM
                (truncated on write).
            output_summary: LLM response content (truncated on write).
            latency_ms: Round-trip duration in milliseconds.
            metadata: Optional structured context (model name, token counts, etc.).
        """
        await self._write_event(
            tenant_id=tenant_id,
            agent_id=agent_id,
            user_id=user_id,
            action_type="llm_call",
            input_summary=_truncate(input_summary),
            output_summary=_truncate(output_summary),
            latency_ms=latency_ms,
            metadata=metadata or {},
        )

    async def log_tool_call(
        self,
        tool_name: str,
        args: dict[str, Any],
        result: str | None,
        tenant_id: uuid.UUID,
        agent_id: uuid.UUID | None,
        latency_ms: int,
        error: str | None = None,
    ) -> None:
        """
        Record a tool invocation, successful or failed.

        Args:
            tool_name: Name of the invoked tool (e.g. "web_search").
            args: Arguments passed to the tool (JSON-serialized for the trail).
            result: Tool result string, or None if an error occurred.
            tenant_id: Tenant the invocation belongs to.
            agent_id: Agent that invoked the tool.
            latency_ms: Tool execution duration in milliseconds.
            error: Error message if the tool failed, else None.
        """
        try:
            rendered_args = json.dumps(args, ensure_ascii=False)
        except Exception:
            # Non-serializable args still deserve an audit entry.
            rendered_args = repr(args)

        event_meta: dict[str, Any] = {"tool_name": tool_name}
        if error:
            event_meta["error"] = error

        await self._write_event(
            tenant_id=tenant_id,
            agent_id=agent_id,
            user_id=None,
            action_type="tool_invocation",
            input_summary=_truncate(f"{tool_name}({rendered_args})"),
            output_summary=_truncate(error or result or ""),
            latency_ms=latency_ms,
            metadata=event_meta,
        )

    async def log_escalation(
        self,
        tenant_id: uuid.UUID,
        agent_id: uuid.UUID | None,
        user_id: str | None,
        trigger_reason: str,
        metadata: dict[str, Any] | None = None,
    ) -> None:
        """
        Record an agent escalation (handoff to a human or another agent).

        Args:
            tenant_id: Tenant the escalation belongs to.
            agent_id: Agent that triggered the escalation.
            user_id: Channel-native end-user identifier.
            trigger_reason: Human-readable description of why escalation occurred.
            metadata: Optional additional context.
        """
        await self._write_event(
            tenant_id=tenant_id,
            agent_id=agent_id,
            user_id=user_id,
            action_type="escalation",
            input_summary=_truncate(trigger_reason),
            output_summary=None,
            latency_ms=None,
            metadata=metadata or {},
        )

View File

@@ -0,0 +1,99 @@
"""
SQLAlchemy 2.0 ORM model for the immutable audit_events table.
Design:
- Append-only: konstruct_app role has SELECT + INSERT only (enforced via REVOKE in migration)
- Tenant-scoped via RLS — every query sees only the current tenant's rows
- action_type discriminates between llm_call, tool_invocation, and escalation events
Important: The DB-level REVOKE UPDATE/DELETE on audit_events means that even if
application code accidentally attempts an UPDATE or DELETE, the database will reject it.
This is a hard compliance guarantee.
"""
from __future__ import annotations
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import DateTime, Integer, Text, func
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class AuditBase(DeclarativeBase):
    """Separate declarative base for audit models to avoid conflicts with tenant Base.

    Subclassing DeclarativeBase gives audit models their own registry and
    MetaData, so the audit tables are managed independently of the tenant
    models' base class.
    """

    pass
class AuditEvent(AuditBase):
    """
    Immutable record of every LLM call, tool invocation, and escalation event.

    RLS is enabled — rows are scoped to the current tenant via app.current_tenant.
    The konstruct_app role has SELECT + INSERT only — UPDATE and DELETE are revoked.

    action_type values:
        'llm_call'        — LLM completion request/response
        'tool_invocation' — Tool execution (success or failure)
        'escalation'      — Agent handoff to human or another agent
    """

    __tablename__ = "audit_events"

    # Server-generated UUID primary key; inserts need not supply an id.
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        server_default=func.gen_random_uuid(),
    )
    # Tenant scoping column consulted by the RLS policy; indexed for
    # per-tenant audit queries.
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        nullable=False,
        index=True,
    )
    # Nullable: some events (e.g. system-level actions) have no agent.
    agent_id: Mapped[uuid.UUID | None] = mapped_column(
        UUID(as_uuid=True),
        nullable=True,
    )
    user_id: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="Channel-native user identifier",
    )
    action_type: Mapped[str] = mapped_column(
        Text,
        nullable=False,
        comment="llm_call | tool_invocation | escalation",
    )
    input_summary: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="Truncated input for audit readability (not full content)",
    )
    output_summary: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="Truncated output for audit readability",
    )
    latency_ms: Mapped[int | None] = mapped_column(
        Integer,
        nullable=True,
        comment="Duration of the operation in milliseconds",
    )
    # [Bug fix] "metadata" is a reserved attribute name on SQLAlchemy
    # declarative classes (DeclarativeBase exposes `metadata` as the MetaData
    # registry), so a mapped attribute literally named `metadata` raises
    # InvalidRequestError at class-definition time. The Python attribute is
    # therefore `event_metadata`, explicitly mapped onto the existing DB
    # column "metadata" so the schema/migration is unchanged.
    event_metadata: Mapped[dict[str, Any]] = mapped_column(
        "metadata",
        JSONB,
        nullable=False,
        server_default="{}",
        default=dict,
        comment="Additional structured context (model name, tool args hash, etc.)",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    def __repr__(self) -> str:
        return f"<AuditEvent id={self.id} action={self.action_type} tenant={self.tenant_id}>"

View File

@@ -0,0 +1,134 @@
"""
SQLAlchemy 2.0 ORM models for the Knowledge Base tables.
Tables:
kb_documents — uploaded documents belonging to a tenant/agent
kb_chunks — text chunks with vector embeddings for semantic search
The embedding column uses pgvector's vector(384) type, matching the
all-MiniLM-L6-v2 model used for embeddings (same as conversation_embeddings).
RLS is applied to both tables — each tenant's KB is completely isolated.
"""
from __future__ import annotations
import uuid
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, Integer, Text, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class KBBase(DeclarativeBase):
    """Separate declarative base for KB models.

    Subclassing DeclarativeBase gives the knowledge-base models their own
    registry and MetaData, keeping them independent of other model bases
    in the application.
    """

    pass
class KnowledgeBaseDocument(KBBase):
    """
    A document uploaded to a tenant's knowledge base.

    Documents are chunked into KBChunk rows for vector search.
    RLS ensures tenant isolation — each tenant sees only their documents.

    Exactly one of filename / source_url is typically populated depending on
    ingestion path (file upload vs. web) — not enforced at the DB level here.
    """

    __tablename__ = "kb_documents"

    # Server-generated UUID primary key; inserts need not supply an id.
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        server_default=func.gen_random_uuid(),
    )
    # Tenant scoping column for RLS; indexed for per-tenant listing.
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        nullable=False,
        index=True,
    )
    # Required: every KB document belongs to exactly one agent.
    agent_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        nullable=False,
        index=True,
        comment="Agent this document is associated with",
    )
    filename: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="Original filename if uploaded as a file",
    )
    source_url: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="Source URL if ingested from the web",
    )
    content_type: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="MIME type: text/plain, application/pdf, etc.",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # One-to-many: deleting a document deletes its chunks at the ORM level
    # (delete-orphan); the FK on kb_chunks also cascades at the DB level.
    chunks: Mapped[list[KBChunk]] = relationship("KBChunk", back_populates="document", cascade="all, delete-orphan")

    def __repr__(self) -> str:
        return f"<KnowledgeBaseDocument id={self.id} tenant={self.tenant_id}>"
class KBChunk(KBBase):
    """
    A text chunk from a knowledge base document, with a vector embedding.

    The embedding column is vector(384) — matches all-MiniLM-L6-v2 output dimensions.
    The HNSW index in the migration enables fast cosine similarity search.
    RLS ensures tenant isolation — each tenant's chunks are invisible to others.

    Note: the embedding column itself is NOT mapped on this class (see the
    comment below); reads/writes of embeddings must go through raw SQL.
    """

    __tablename__ = "kb_chunks"

    # Server-generated UUID primary key; inserts need not supply an id.
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        server_default=func.gen_random_uuid(),
    )
    # Tenant scoping column for RLS; indexed for per-tenant retrieval.
    tenant_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        nullable=False,
        index=True,
    )
    # FK to the parent document; DB-level ON DELETE CASCADE removes chunks
    # when the document row is deleted outside the ORM.
    document_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        ForeignKey("kb_documents.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    content: Mapped[str] = mapped_column(
        Text,
        nullable=False,
        comment="The text content of this chunk",
    )
    # embedding is vector(384) — raw DDL in migration, not mapped here
    # because SQLAlchemy doesn't natively know the pgvector type
    chunk_index: Mapped[int | None] = mapped_column(
        Integer,
        nullable=True,
        comment="Position of this chunk within its source document (0-indexed)",
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=func.now(),
    )

    # Many-to-one back-reference to the owning document.
    document: Mapped[KnowledgeBaseDocument] = relationship("KnowledgeBaseDocument", back_populates="chunks")

    def __repr__(self) -> str:
        return f"<KBChunk id={self.id} document={self.document_id} idx={self.chunk_index}>"