""" Shared test fixtures for Konstruct. IMPORTANT: The `db_session` fixture connects as `konstruct_app` (not postgres superuser). This is mandatory — RLS is bypassed for superuser connections, so tests using superuser would pass trivially while providing zero real protection. Integration tests requiring a live PostgreSQL container are skipped if the database is not available. Unit tests never require a live DB. Event loop design: All async fixtures use function scope to avoid pytest-asyncio cross-loop-scope issues. The test database is created once (at session scope, via a synchronous fixture) and reused across tests within the session. """ from __future__ import annotations import asyncio import os import subprocess import uuid from collections.abc import AsyncGenerator from typing import Any import pytest import pytest_asyncio from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine # --------------------------------------------------------------------------- # Database URLs # --------------------------------------------------------------------------- _ADMIN_URL = os.environ.get( "DATABASE_ADMIN_URL", "postgresql+asyncpg://postgres:postgres_dev@localhost:5432/konstruct", ) _APP_URL = os.environ.get( "DATABASE_URL", "postgresql+asyncpg://konstruct_app:konstruct_dev@localhost:5432/konstruct", ) def _replace_db_name(url: str, new_db: str) -> str: """Replace database name in a SQLAlchemy URL string.""" parts = url.rsplit("/", 1) return f"{parts[0]}/{new_db}" # --------------------------------------------------------------------------- # Session-scoped synchronous setup — creates and migrates the test DB once # --------------------------------------------------------------------------- @pytest.fixture(scope="session") def test_db_name() -> str: """Create a fresh test database, run migrations, return DB name.""" db_name = f"konstruct_test_{uuid.uuid4().hex[:8]}" admin_postgres_url = _replace_db_name(_ADMIN_URL, "postgres") # Check PostgreSQL reachability using synchronous driver try: import asyncio as _asyncio async def _check() -> None: eng = create_async_engine(admin_postgres_url) async with eng.connect() as conn: await conn.execute(text("SELECT 1")) await eng.dispose() _asyncio.run(_check()) except Exception as exc: pytest.skip(f"PostgreSQL not available: {exc}") # Create test database async def _create_db() -> None: eng = create_async_engine(admin_postgres_url, isolation_level="AUTOCOMMIT") async with eng.connect() as conn: await conn.execute(text(f'CREATE DATABASE "{db_name}"')) await eng.dispose() asyncio.run(_create_db()) # Run Alembic migrations against test DB (subprocess — avoids loop conflicts) admin_test_url = _replace_db_name(_ADMIN_URL, db_name) result = subprocess.run( ["uv", "run", "alembic", "upgrade", "head"], env={**os.environ, "DATABASE_ADMIN_URL": admin_test_url}, capture_output=True, text=True, cwd=os.path.join(os.path.dirname(__file__), ".."), ) if result.returncode != 0: # Clean up on failure async def _drop_db() -> None: eng = create_async_engine(admin_postgres_url, isolation_level="AUTOCOMMIT") async with eng.connect() as conn: await conn.execute(text(f'DROP DATABASE IF EXISTS "{db_name}"')) await eng.dispose() asyncio.run(_drop_db()) pytest.fail(f"Alembic migration failed:\n{result.stdout}\n{result.stderr}") yield db_name # Teardown: drop test database async def _cleanup() -> None: eng = create_async_engine(admin_postgres_url, isolation_level="AUTOCOMMIT") async with eng.connect() as conn: await conn.execute( text( "SELECT pg_terminate_backend(pid) FROM pg_stat_activity " "WHERE datname = :dbname AND pid <> pg_backend_pid()" ), {"dbname": db_name}, ) await conn.execute(text(f'DROP DATABASE IF EXISTS "{db_name}"')) await eng.dispose() asyncio.run(_cleanup()) @pytest_asyncio.fixture async def db_engine(test_db_name: str) -> AsyncGenerator[AsyncEngine, None]: """ Function-scoped async engine connected as konstruct_app. Using konstruct_app role is critical — it enforces RLS. The postgres superuser would bypass RLS and make isolation tests worthless. """ app_test_url = _replace_db_name(_APP_URL, test_db_name) engine = create_async_engine(app_test_url, echo=False) yield engine await engine.dispose() @pytest_asyncio.fixture async def db_session(db_engine: AsyncEngine) -> AsyncGenerator[AsyncSession, None]: """ Function-scoped async session connected as konstruct_app. The RLS hook is configured on this engine so SET LOCAL statements are injected before each query when current_tenant_id is set. """ from shared.rls import configure_rls_hook # Always configure — SQLAlchemy event.listens_for is idempotent per listener function # when the same function object is registered; but since configure_rls_hook creates # a new closure each call, wrap with a set to avoid duplicate listeners. configure_rls_hook(db_engine) session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False) async with session_factory() as session: yield session await session.rollback() @pytest_asyncio.fixture async def tenant_a(db_session: AsyncSession) -> dict[str, Any]: """Create Tenant A and return its data dict.""" tenant_id = uuid.uuid4() suffix = uuid.uuid4().hex[:6] await db_session.execute( text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"), { "id": str(tenant_id), "name": f"Tenant Alpha {suffix}", "slug": f"tenant-alpha-{suffix}", "settings": "{}", }, ) await db_session.commit() return {"id": tenant_id} @pytest_asyncio.fixture async def tenant_b(db_session: AsyncSession) -> dict[str, Any]: """Create Tenant B and return its data dict.""" tenant_id = uuid.uuid4() suffix = uuid.uuid4().hex[:6] await db_session.execute( text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"), { "id": str(tenant_id), "name": f"Tenant Beta {suffix}", "slug": f"tenant-beta-{suffix}", "settings": "{}", }, ) await db_session.commit() return {"id": tenant_id}