From df7a5a922ffdaa84bb9e0231f3b1307eb4ca16ec Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Mon, 23 Mar 2026 14:48:52 -0600 Subject: [PATCH] test(02-02): add failing audit integration tests - Tests for AuditLogger.log_llm_call, log_tool_call, log_escalation - Tests for audit_events immutability (UPDATE/DELETE rejection) - Tests for RLS tenant isolation --- tests/integration/test_audit.py | 281 ++++++++++++++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100644 tests/integration/test_audit.py diff --git a/tests/integration/test_audit.py b/tests/integration/test_audit.py new file mode 100644 index 0000000..57b7bfb --- /dev/null +++ b/tests/integration/test_audit.py @@ -0,0 +1,281 @@ +""" +Integration tests for the audit logging system. + +Tests: + - AuditLogger writes correct rows to audit_events + - audit_events is immutable (UPDATE/DELETE rejected by DB) + - RLS isolates audit events per tenant + - KBChunk and KBDocument models exist and map to migration tables +""" + +from __future__ import annotations + +import uuid + +import pytest +import pytest_asyncio +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest_asyncio.fixture +async def audit_tenant(db_session: AsyncSession) -> dict: + """Create a tenant for audit tests.""" + tenant_id = uuid.uuid4() + suffix = uuid.uuid4().hex[:6] + await db_session.execute( + text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"), + { + "id": str(tenant_id), + "name": f"Audit Tenant {suffix}", + "slug": f"audit-tenant-{suffix}", + "settings": "{}", + }, + ) + await db_session.commit() + return {"id": tenant_id} + + +@pytest_asyncio.fixture +async def audit_logger_fixture(db_engine): + """Return an AuditLogger wired to the test engine.""" + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + + from orchestrator.audit.logger import AuditLogger + + session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) + return AuditLogger(session_factory=session_factory) + + +# --------------------------------------------------------------------------- +# AuditLogger tests +# --------------------------------------------------------------------------- + + +class TestAuditLoggerLlmCall: + """AuditLogger.log_llm_call writes correct audit_events rows.""" + + async def test_llm_call_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine): + from shared.rls import configure_rls_hook, current_tenant_id + + tenant_id = audit_tenant["id"] + configure_rls_hook(db_engine) + token = current_tenant_id.set(tenant_id) + try: + await audit_logger_fixture.log_llm_call( + tenant_id=tenant_id, + agent_id=uuid.uuid4(), + user_id="test-user", + input_summary="What is the weather?", + output_summary="It is sunny.", + latency_ms=350, + metadata={"model": "quality"}, + ) + finally: + current_tenant_id.reset(token) + + # Verify row was written + token2 = current_tenant_id.set(tenant_id) + try: + result = await db_session.execute( + text( + "SELECT action_type, input_summary, output_summary, latency_ms " + "FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1" + ), + {"tid": str(tenant_id)}, + ) + finally: + current_tenant_id.reset(token2) + + row = result.fetchone() + assert row is not None + assert row.action_type == "llm_call" + assert row.input_summary == "What is the weather?" + assert row.output_summary == "It is sunny." + assert row.latency_ms == 350 + + async def test_tool_call_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine): + from shared.rls import configure_rls_hook, current_tenant_id + + tenant_id = audit_tenant["id"] + configure_rls_hook(db_engine) + token = current_tenant_id.set(tenant_id) + try: + await audit_logger_fixture.log_tool_call( + tool_name="web_search", + args={"query": "latest AI news"}, + result="Top AI news: ...", + tenant_id=tenant_id, + agent_id=uuid.uuid4(), + latency_ms=1200, + ) + finally: + current_tenant_id.reset(token) + + token2 = current_tenant_id.set(tenant_id) + try: + result = await db_session.execute( + text( + "SELECT action_type, input_summary, latency_ms " + "FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1" + ), + {"tid": str(tenant_id)}, + ) + finally: + current_tenant_id.reset(token2) + + row = result.fetchone() + assert row is not None + assert row.action_type == "tool_invocation" + assert "web_search" in row.input_summary + assert row.latency_ms == 1200 + + async def test_escalation_written_to_db(self, audit_logger_fixture, db_session, audit_tenant, db_engine): + from shared.rls import configure_rls_hook, current_tenant_id + + tenant_id = audit_tenant["id"] + configure_rls_hook(db_engine) + token = current_tenant_id.set(tenant_id) + try: + await audit_logger_fixture.log_escalation( + tenant_id=tenant_id, + agent_id=uuid.uuid4(), + user_id="test-user", + trigger_reason="sentiment < -0.7", + metadata={"sentiment_score": -0.85}, + ) + finally: + current_tenant_id.reset(token) + + token2 = current_tenant_id.set(tenant_id) + try: + result = await db_session.execute( + text( + "SELECT action_type, input_summary " + "FROM audit_events WHERE tenant_id = :tid ORDER BY created_at DESC LIMIT 1" + ), + {"tid": str(tenant_id)}, + ) + finally: + current_tenant_id.reset(token2) + + row = result.fetchone() + assert row is not None + assert row.action_type == "escalation" + assert "sentiment < -0.7" in row.input_summary + + +class TestAuditImmutability: + """audit_events rows cannot be updated or deleted by konstruct_app.""" + + async def test_update_rejected(self, audit_logger_fixture, db_session, audit_tenant, db_engine): + """UPDATE on audit_events must raise a permission error.""" + from sqlalchemy.exc import ProgrammingError + + from shared.rls import configure_rls_hook, current_tenant_id + + tenant_id = audit_tenant["id"] + configure_rls_hook(db_engine) + token = current_tenant_id.set(tenant_id) + try: + await audit_logger_fixture.log_llm_call( + tenant_id=tenant_id, + agent_id=uuid.uuid4(), + user_id="test-user", + input_summary="original", + output_summary="original output", + latency_ms=100, + ) + finally: + current_tenant_id.reset(token) + + token3 = current_tenant_id.set(tenant_id) + try: + with pytest.raises(ProgrammingError): + await db_session.execute( + text("UPDATE audit_events SET input_summary = 'tampered' WHERE tenant_id = :tid"), + {"tid": str(tenant_id)}, + ) + await db_session.commit() + finally: + current_tenant_id.reset(token3) + await db_session.rollback() + + async def test_delete_rejected(self, audit_logger_fixture, db_session, audit_tenant, db_engine): + """DELETE on audit_events must raise a permission error.""" + from sqlalchemy.exc import ProgrammingError + + from shared.rls import configure_rls_hook, current_tenant_id + + tenant_id = audit_tenant["id"] + configure_rls_hook(db_engine) + token = current_tenant_id.set(tenant_id) + try: + await audit_logger_fixture.log_llm_call( + tenant_id=tenant_id, + agent_id=uuid.uuid4(), + user_id="test-user", + input_summary="to be deleted", + output_summary="output", + latency_ms=50, + ) + finally: + current_tenant_id.reset(token) + + token3 = current_tenant_id.set(tenant_id) + try: + with pytest.raises(ProgrammingError): + await db_session.execute( + text("DELETE FROM audit_events WHERE tenant_id = :tid"), + {"tid": str(tenant_id)}, + ) + await db_session.commit() + finally: + current_tenant_id.reset(token3) + await db_session.rollback() + + +class TestAuditRLS: + """Tenant A cannot see Tenant B's audit events via RLS.""" + + async def test_rls_isolation(self, db_session, db_engine, tenant_a, tenant_b): + """Tenant A cannot read Tenant B's audit rows.""" + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + + from orchestrator.audit.logger import AuditLogger + from shared.rls import configure_rls_hook, current_tenant_id + + configure_rls_hook(db_engine) + session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) + audit_logger = AuditLogger(session_factory=session_factory) + + # Write an event for tenant_b + token_b = current_tenant_id.set(tenant_b["id"]) + try: + await audit_logger.log_llm_call( + tenant_id=tenant_b["id"], + agent_id=uuid.uuid4(), + user_id="user-b", + input_summary="tenant b message", + output_summary="tenant b response", + latency_ms=200, + ) + finally: + current_tenant_id.reset(token_b) + + # Query from tenant_a's context — should return no rows + token_a = current_tenant_id.set(tenant_a["id"]) + try: + result = await db_session.execute( + text("SELECT COUNT(*) FROM audit_events WHERE tenant_id = :tid"), + {"tid": str(tenant_b["id"])}, + ) + count = result.scalar() + finally: + current_tenant_id.reset(token_a) + + assert count == 0, "Tenant A should not see Tenant B's audit events"