""" Integration tests for pgvector long-term memory. Requires a live PostgreSQL instance with pgvector extension installed. Tests are automatically skipped if the database is not available (fixture from conftest.py handles that via pytest.skip). Key scenarios tested: - store_embedding inserts with correct scoping - retrieve_relevant returns matching content above threshold - Cross-tenant isolation: tenant A's embeddings never returned for tenant B - High threshold returns empty list for dissimilar queries """ from __future__ import annotations import uuid import pytest import pytest_asyncio from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from orchestrator.memory.long_term import retrieve_relevant, store_embedding # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture async def agent_a_id() -> uuid.UUID: """Return a stable agent UUID for tenant A tests.""" return uuid.UUID("aaaaaaaa-0000-0000-0000-000000000001") @pytest_asyncio.fixture async def agent_b_id() -> uuid.UUID: """Return a stable agent UUID for tenant B tests.""" return uuid.UUID("bbbbbbbb-0000-0000-0000-000000000002") # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- async def test_store_embedding_inserts_row( db_session: AsyncSession, tenant_a: dict, agent_a_id: uuid.UUID, ): """store_embedding inserts a row into conversation_embeddings.""" from shared.rls import current_tenant_id tenant_id = tenant_a["id"] user_id = "user-store-test" embedding = [0.1] * 384 content = "I prefer concise answers." token = current_tenant_id.set(tenant_id) try: await store_embedding(db_session, tenant_id, agent_a_id, user_id, content, "user", embedding) await db_session.commit() result = await db_session.execute( text("SELECT content, role FROM conversation_embeddings WHERE tenant_id = :tid AND user_id = :uid"), {"tid": str(tenant_id), "uid": user_id}, ) rows = result.fetchall() finally: current_tenant_id.reset(token) assert len(rows) == 1 assert rows[0].content == content assert rows[0].role == "user" async def test_retrieve_relevant_returns_similar_content( db_session: AsyncSession, tenant_a: dict, agent_a_id: uuid.UUID, ): """retrieve_relevant returns content above cosine similarity threshold.""" from shared.rls import current_tenant_id tenant_id = tenant_a["id"] user_id = "user-retrieve-test" # Store two embeddings: one very similar to the query, one dissimilar # We simulate similarity by using identical embeddings similar_embedding = [1.0] + [0.0] * 383 dissimilar_embedding = [0.0] * 383 + [1.0] query_embedding = [1.0] + [0.0] * 383 # identical to similar_embedding token = current_tenant_id.set(tenant_id) try: await store_embedding( db_session, tenant_id, agent_a_id, user_id, "The user likes Python programming.", "user", similar_embedding ) await store_embedding( db_session, tenant_id, agent_a_id, user_id, "This is completely unrelated content.", "user", dissimilar_embedding ) await db_session.commit() results = await retrieve_relevant( db_session, tenant_id, agent_a_id, user_id, query_embedding, top_k=3, threshold=0.5 ) finally: current_tenant_id.reset(token) # Should return the similar content assert len(results) >= 1 assert any("Python" in r for r in results) async def test_retrieve_relevant_high_threshold_returns_empty( db_session: AsyncSession, tenant_a: dict, agent_a_id: uuid.UUID, ): """retrieve_relevant with threshold=0.99 and dissimilar query returns empty list.""" from shared.rls import current_tenant_id tenant_id = tenant_a["id"] user_id = "user-threshold-test" # Store an embedding pointing in one direction stored_embedding = [1.0] + [0.0] * 383 # Query pointing in orthogonal direction — cosine distance ~= 1.0, similarity ~= 0.0 query_embedding = [0.0] + [1.0] + [0.0] * 382 token = current_tenant_id.set(tenant_id) try: await store_embedding( db_session, tenant_id, agent_a_id, user_id, "Some stored content.", "user", stored_embedding ) await db_session.commit() results = await retrieve_relevant( db_session, tenant_id, agent_a_id, user_id, query_embedding, top_k=3, threshold=0.99 ) finally: current_tenant_id.reset(token) assert results == [] async def test_cross_tenant_isolation( db_session: AsyncSession, tenant_a: dict, tenant_b: dict, agent_a_id: uuid.UUID, agent_b_id: uuid.UUID, ): """ retrieve_relevant with tenant_id=A NEVER returns tenant_id=B embeddings. This is the critical security test — cross-tenant contamination would be a catastrophic data leak in a multi-tenant system. """ from shared.rls import current_tenant_id user_id = "shared-user-id" tenant_a_id = tenant_a["id"] tenant_b_id = tenant_b["id"] # Same query embedding for both tenants embedding = [1.0] + [0.0] * 383 # Store embedding for tenant B token = current_tenant_id.set(tenant_b_id) try: await store_embedding( db_session, tenant_b_id, agent_b_id, user_id, "Tenant B secret information.", "user", embedding ) await db_session.commit() finally: current_tenant_id.reset(token) # Query as tenant A — should NOT see tenant B's data token = current_tenant_id.set(tenant_a_id) try: results = await retrieve_relevant( db_session, tenant_a_id, agent_a_id, user_id, embedding, top_k=10, threshold=0.0 ) finally: current_tenant_id.reset(token) # Tenant A should get nothing — it has no embeddings of its own # and it MUST NOT see tenant B's embeddings for result in results: assert "Tenant B" not in result, "Cross-tenant data leakage detected!" async def test_retrieve_relevant_user_isolation( db_session: AsyncSession, tenant_a: dict, agent_a_id: uuid.UUID, ): """retrieve_relevant for user A never returns user B embeddings.""" from shared.rls import current_tenant_id tenant_id = tenant_a["id"] embedding = [1.0] + [0.0] * 383 token = current_tenant_id.set(tenant_id) try: await store_embedding( db_session, tenant_id, agent_a_id, "user-A", "User A private information.", "user", embedding ) await db_session.commit() # Query as user B — should not see user A's data results = await retrieve_relevant( db_session, tenant_id, agent_a_id, "user-B", embedding, top_k=10, threshold=0.0 ) finally: current_tenant_id.reset(token) for result in results: assert "User A private" not in result async def test_retrieve_relevant_top_k_limits_results( db_session: AsyncSession, tenant_a: dict, agent_a_id: uuid.UUID, ): """retrieve_relevant respects top_k limit.""" from shared.rls import current_tenant_id tenant_id = tenant_a["id"] user_id = "user-topk-test" embedding = [1.0] + [0.0] * 383 token = current_tenant_id.set(tenant_id) try: # Store 5 very similar embeddings for i in range(5): await store_embedding( db_session, tenant_id, agent_a_id, user_id, f"Content item {i}", "user", embedding ) await db_session.commit() results = await retrieve_relevant( db_session, tenant_id, agent_a_id, user_id, embedding, top_k=2, threshold=0.0 ) finally: current_tenant_id.reset(token) assert len(results) <= 2