- Tests for AuditLogger.log_llm_call, log_tool_call, log_escalation - Tests for audit_events immutability (UPDATE/DELETE rejection) - Tests for RLS tenant isolation
282 lines
9.9 KiB
Python
282 lines
9.9 KiB
Python
"""
|
|
Integration tests for the audit logging system.
|
|
|
|
Tests:
|
|
- AuditLogger writes correct rows to audit_events
|
|
- audit_events is immutable (UPDATE/DELETE rejected by DB)
|
|
- RLS isolates audit events per tenant
|
|
- KBChunk and KBDocument models exist and map to migration tables
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def audit_tenant(db_session: AsyncSession) -> dict:
|
|
"""Create a tenant for audit tests."""
|
|
tenant_id = uuid.uuid4()
|
|
suffix = uuid.uuid4().hex[:6]
|
|
await db_session.execute(
|
|
text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"),
|
|
{
|
|
"id": str(tenant_id),
|
|
"name": f"Audit Tenant {suffix}",
|
|
"slug": f"audit-tenant-{suffix}",
|
|
"settings": "{}",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
return {"id": tenant_id}
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def audit_logger_fixture(db_engine):
|
|
"""Return an AuditLogger wired to the test engine."""
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
from orchestrator.audit.logger import AuditLogger
|
|
|
|
session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
return AuditLogger(session_factory=session_factory)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# AuditLogger tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAuditLoggerLlmCall:
|
|
"""AuditLogger.log_llm_call writes correct audit_events rows."""
|
|
|
|
async def test_llm_call_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
|
|
from shared.rls import configure_rls_hook, current_tenant_id
|
|
|
|
tenant_id = audit_tenant["id"]
|
|
configure_rls_hook(db_engine)
|
|
token = current_tenant_id.set(tenant_id)
|
|
try:
|
|
await audit_logger_fixture.log_llm_call(
|
|
tenant_id=tenant_id,
|
|
agent_id=uuid.uuid4(),
|
|
user_id="test-user",
|
|
input_summary="What is the weather?",
|
|
output_summary="It is sunny.",
|
|
latency_ms=350,
|
|
metadata={"model": "quality"},
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
# Verify row was written
|
|
token2 = current_tenant_id.set(tenant_id)
|
|
try:
|
|
result = await db_session.execute(
|
|
text(
|
|
"SELECT action_type, input_summary, output_summary, latency_ms "
|
|
"FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1"
|
|
),
|
|
{"tid": str(tenant_id)},
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token2)
|
|
|
|
row = result.fetchone()
|
|
assert row is not None
|
|
assert row.action_type == "llm_call"
|
|
assert row.input_summary == "What is the weather?"
|
|
assert row.output_summary == "It is sunny."
|
|
assert row.latency_ms == 350
|
|
|
|
async def test_tool_call_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
|
|
from shared.rls import configure_rls_hook, current_tenant_id
|
|
|
|
tenant_id = audit_tenant["id"]
|
|
configure_rls_hook(db_engine)
|
|
token = current_tenant_id.set(tenant_id)
|
|
try:
|
|
await audit_logger_fixture.log_tool_call(
|
|
tool_name="web_search",
|
|
args={"query": "latest AI news"},
|
|
result="Top AI news: ...",
|
|
tenant_id=tenant_id,
|
|
agent_id=uuid.uuid4(),
|
|
latency_ms=1200,
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
token2 = current_tenant_id.set(tenant_id)
|
|
try:
|
|
result = await db_session.execute(
|
|
text(
|
|
"SELECT action_type, input_summary, latency_ms "
|
|
"FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1"
|
|
),
|
|
{"tid": str(tenant_id)},
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token2)
|
|
|
|
row = result.fetchone()
|
|
assert row is not None
|
|
assert row.action_type == "tool_invocation"
|
|
assert "web_search" in row.input_summary
|
|
assert row.latency_ms == 1200
|
|
|
|
async def test_escalation_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
|
|
from shared.rls import configure_rls_hook, current_tenant_id
|
|
|
|
tenant_id = audit_tenant["id"]
|
|
configure_rls_hook(db_engine)
|
|
token = current_tenant_id.set(tenant_id)
|
|
try:
|
|
await audit_logger_fixture.log_escalation(
|
|
tenant_id=tenant_id,
|
|
agent_id=uuid.uuid4(),
|
|
user_id="test-user",
|
|
trigger_reason="sentiment < -0.7",
|
|
metadata={"sentiment_score": -0.85},
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
token2 = current_tenant_id.set(tenant_id)
|
|
try:
|
|
result = await db_session.execute(
|
|
text(
|
|
"SELECT action_type, input_summary "
|
|
"FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1"
|
|
),
|
|
{"tid": str(tenant_id)},
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token2)
|
|
|
|
row = result.fetchone()
|
|
assert row is not None
|
|
assert row.action_type == "escalation"
|
|
assert "sentiment < -0.7" in row.input_summary
|
|
|
|
|
|
class TestAuditImmutability:
|
|
"""audit_events rows cannot be updated or deleted by konstruct_app."""
|
|
|
|
async def test_update_rejected(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
|
|
"""UPDATE on audit_events must raise a permission error."""
|
|
from sqlalchemy.exc import ProgrammingError
|
|
|
|
from shared.rls import configure_rls_hook, current_tenant_id
|
|
|
|
tenant_id = audit_tenant["id"]
|
|
configure_rls_hook(db_engine)
|
|
token = current_tenant_id.set(tenant_id)
|
|
try:
|
|
await audit_logger_fixture.log_llm_call(
|
|
tenant_id=tenant_id,
|
|
agent_id=uuid.uuid4(),
|
|
user_id="test-user",
|
|
input_summary="original",
|
|
output_summary="original output",
|
|
latency_ms=100,
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
token3 = current_tenant_id.set(tenant_id)
|
|
try:
|
|
with pytest.raises(ProgrammingError):
|
|
await db_session.execute(
|
|
text("UPDATE audit_events SET input_summary = 'tampered' WHERE tenant_id = :tid"),
|
|
{"tid": str(tenant_id)},
|
|
)
|
|
await db_session.commit()
|
|
finally:
|
|
current_tenant_id.reset(token3)
|
|
await db_session.rollback()
|
|
|
|
async def test_delete_rejected(self, audit_logger_fixture, db_session, audit_tenant, db_engine):
|
|
"""DELETE on audit_events must raise a permission error."""
|
|
from sqlalchemy.exc import ProgrammingError
|
|
|
|
from shared.rls import configure_rls_hook, current_tenant_id
|
|
|
|
tenant_id = audit_tenant["id"]
|
|
configure_rls_hook(db_engine)
|
|
token = current_tenant_id.set(tenant_id)
|
|
try:
|
|
await audit_logger_fixture.log_llm_call(
|
|
tenant_id=tenant_id,
|
|
agent_id=uuid.uuid4(),
|
|
user_id="test-user",
|
|
input_summary="to be deleted",
|
|
output_summary="output",
|
|
latency_ms=50,
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token)
|
|
|
|
token3 = current_tenant_id.set(tenant_id)
|
|
try:
|
|
with pytest.raises(ProgrammingError):
|
|
await db_session.execute(
|
|
text("DELETE FROM audit_events WHERE tenant_id = :tid"),
|
|
{"tid": str(tenant_id)},
|
|
)
|
|
await db_session.commit()
|
|
finally:
|
|
current_tenant_id.reset(token3)
|
|
await db_session.rollback()
|
|
|
|
|
|
class TestAuditRLS:
|
|
"""Tenant A cannot see Tenant B's audit events via RLS."""
|
|
|
|
async def test_rls_isolation(self, db_session, db_engine, tenant_a, tenant_b):
|
|
"""Tenant A cannot read Tenant B's audit rows."""
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
from orchestrator.audit.logger import AuditLogger
|
|
from shared.rls import configure_rls_hook, current_tenant_id
|
|
|
|
configure_rls_hook(db_engine)
|
|
session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
audit_logger = AuditLogger(session_factory=session_factory)
|
|
|
|
# Write an event for tenant_b
|
|
token_b = current_tenant_id.set(tenant_b["id"])
|
|
try:
|
|
await audit_logger.log_llm_call(
|
|
tenant_id=tenant_b["id"],
|
|
agent_id=uuid.uuid4(),
|
|
user_id="user-b",
|
|
input_summary="tenant b message",
|
|
output_summary="tenant b response",
|
|
latency_ms=200,
|
|
)
|
|
finally:
|
|
current_tenant_id.reset(token_b)
|
|
|
|
# Query from tenant_a's context — should return no rows
|
|
token_a = current_tenant_id.set(tenant_a["id"])
|
|
try:
|
|
result = await db_session.execute(
|
|
text("SELECT COUNT(*) FROM audit_events WHERE tenant_id = :tid"),
|
|
{"tid": str(tenant_b["id"])},
|
|
)
|
|
count = result.scalar()
|
|
finally:
|
|
current_tenant_id.reset(token_a)
|
|
|
|
assert count == 0, "Tenant A should not see Tenant B's audit events"
|