Files
konstruct/tests/integration/test_audit.py
Adolfo Delorenzo df7a5a922f test(02-02): add failing audit integration tests
- Tests for AuditLogger.log_llm_call, log_tool_call, log_escalation
- Tests for audit_events immutability (UPDATE/DELETE rejection)
- Tests for RLS tenant isolation
2026-03-23 14:48:52 -06:00

282 lines
9.9 KiB
Python

"""
Integration tests for the audit logging system.
Tests:
- AuditLogger writes correct rows to audit_events
- audit_events is immutable (UPDATE/DELETE rejected by DB)
- RLS isolates audit events per tenant
- KBChunk and KBDocument models exist and map to migration tables
"""
from __future__ import annotations
import uuid
import pytest
import pytest_asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest_asyncio.fixture
async def audit_tenant(db_session: AsyncSession) -> dict:
"""Create a tenant for audit tests."""
tenant_id = uuid.uuid4()
suffix = uuid.uuid4().hex[:6]
await db_session.execute(
text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"),
{
"id": str(tenant_id),
"name": f"Audit Tenant {suffix}",
"slug": f"audit-tenant-{suffix}",
"settings": "{}",
},
)
await db_session.commit()
return {"id": tenant_id}
@pytest_asyncio.fixture
async def audit_logger_fixture(db_engine):
"""Return an AuditLogger wired to the test engine."""
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from orchestrator.audit.logger import AuditLogger
session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
return AuditLogger(session_factory=session_factory)
# ---------------------------------------------------------------------------
# AuditLogger tests
# ---------------------------------------------------------------------------
class TestAuditLoggerLlmCall:
"""AuditLogger.log_llm_call writes correct audit_events rows."""
async def test_llm_call_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
from shared.rls import configure_rls_hook, current_tenant_id
tenant_id = audit_tenant["id"]
configure_rls_hook(db_engine)
token = current_tenant_id.set(tenant_id)
try:
await audit_logger_fixture.log_llm_call(
tenant_id=tenant_id,
agent_id=uuid.uuid4(),
user_id="test-user",
input_summary="What is the weather?",
output_summary="It is sunny.",
latency_ms=350,
metadata={"model": "quality"},
)
finally:
current_tenant_id.reset(token)
# Verify row was written
token2 = current_tenant_id.set(tenant_id)
try:
result = await db_session.execute(
text(
"SELECT action_type, input_summary, output_summary, latency_ms "
"FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1"
),
{"tid": str(tenant_id)},
)
finally:
current_tenant_id.reset(token2)
row = result.fetchone()
assert row is not None
assert row.action_type == "llm_call"
assert row.input_summary == "What is the weather?"
assert row.output_summary == "It is sunny."
assert row.latency_ms == 350
async def test_tool_call_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
from shared.rls import configure_rls_hook, current_tenant_id
tenant_id = audit_tenant["id"]
configure_rls_hook(db_engine)
token = current_tenant_id.set(tenant_id)
try:
await audit_logger_fixture.log_tool_call(
tool_name="web_search",
args={"query": "latest AI news"},
result="Top AI news: ...",
tenant_id=tenant_id,
agent_id=uuid.uuid4(),
latency_ms=1200,
)
finally:
current_tenant_id.reset(token)
token2 = current_tenant_id.set(tenant_id)
try:
result = await db_session.execute(
text(
"SELECT action_type, input_summary, latency_ms "
"FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1"
),
{"tid": str(tenant_id)},
)
finally:
current_tenant_id.reset(token2)
row = result.fetchone()
assert row is not None
assert row.action_type == "tool_invocation"
assert "web_search" in row.input_summary
assert row.latency_ms == 1200
async def test_escalation_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
from shared.rls import configure_rls_hook, current_tenant_id
tenant_id = audit_tenant["id"]
configure_rls_hook(db_engine)
token = current_tenant_id.set(tenant_id)
try:
await audit_logger_fixture.log_escalation(
tenant_id=tenant_id,
agent_id=uuid.uuid4(),
user_id="test-user",
trigger_reason="sentiment < -0.7",
metadata={"sentiment_score": -0.85},
)
finally:
current_tenant_id.reset(token)
token2 = current_tenant_id.set(tenant_id)
try:
result = await db_session.execute(
text(
"SELECT action_type, input_summary "
"FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1"
),
{"tid": str(tenant_id)},
)
finally:
current_tenant_id.reset(token2)
row = result.fetchone()
assert row is not None
assert row.action_type == "escalation"
assert "sentiment < -0.7" in row.input_summary
class TestAuditImmutability:
"""audit_events rows cannot be updated or deleted by konstruct_app."""
async def test_update_rejected(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
"""UPDATE on audit_events must raise a permission error."""
from sqlalchemy.exc import ProgrammingError
from shared.rls import configure_rls_hook, current_tenant_id
tenant_id = audit_tenant["id"]
configure_rls_hook(db_engine)
token = current_tenant_id.set(tenant_id)
try:
await audit_logger_fixture.log_llm_call(
tenant_id=tenant_id,
agent_id=uuid.uuid4(),
user_id="test-user",
input_summary="original",
output_summary="original output",
latency_ms=100,
)
finally:
current_tenant_id.reset(token)
token3 = current_tenant_id.set(tenant_id)
try:
with pytest.raises(ProgrammingError):
await db_session.execute(
text("UPDATE audit_events SET input_summary = 'tampered' WHERE tenant_id = :tid"),
{"tid": str(tenant_id)},
)
await db_session.commit()
finally:
current_tenant_id.reset(token3)
await db_session.rollback()
async def test_delete_rejected(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
"""DELETE on audit_events must raise a permission error."""
from sqlalchemy.exc import ProgrammingError
from shared.rls import configure_rls_hook, current_tenant_id
tenant_id = audit_tenant["id"]
configure_rls_hook(db_engine)
token = current_tenant_id.set(tenant_id)
try:
await audit_logger_fixture.log_llm_call(
tenant_id=tenant_id,
agent_id=uuid.uuid4(),
user_id="test-user",
input_summary="to be deleted",
output_summary="output",
latency_ms=50,
)
finally:
current_tenant_id.reset(token)
token3 = current_tenant_id.set(tenant_id)
try:
with pytest.raises(ProgrammingError):
await db_session.execute(
text("DELETE FROM audit_events WHERE tenant_id = :tid"),
{"tid": str(tenant_id)},
)
await db_session.commit()
finally:
current_tenant_id.reset(token3)
await db_session.rollback()
class TestAuditRLS:
"""Tenant A cannot see Tenant B's audit events via RLS."""
async def test_rls_isolation(self, db_session, db_engine, tenant_a, tenant_b):
"""Tenant A cannot read Tenant B's audit rows."""
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from orchestrator.audit.logger import AuditLogger
from shared.rls import configure_rls_hook, current_tenant_id
configure_rls_hook(db_engine)
session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
audit_logger = AuditLogger(session_factory=session_factory)
# Write an event for tenant_b
token_b = current_tenant_id.set(tenant_b["id"])
try:
await audit_logger.log_llm_call(
tenant_id=tenant_b["id"],
agent_id=uuid.uuid4(),
user_id="user-b",
input_summary="tenant b message",
output_summary="tenant b response",
latency_ms=200,
)
finally:
current_tenant_id.reset(token_b)
# Query from tenant_a's context — should return no rows
token_a = current_tenant_id.set(tenant_a["id"])
try:
result = await db_session.execute(
text("SELECT COUNT(*) FROM audit_events WHERE tenant_id = :tid"),
{"tid": str(tenant_b["id"])},
)
count = result.scalar()
finally:
current_tenant_id.reset(token_a)
assert count == 0, "Tenant A should not see Tenant B's audit events"