- alembic.ini + migrations/env.py: async SQLAlchemy migration setup using asyncpg
- migrations/versions/001_initial_schema.py: creates tenants, agents, channel_connections, portal_users
- ENABLE + FORCE ROW LEVEL SECURITY on agents and channel_connections
- RLS policy: tenant_id = current_setting('app.current_tenant', TRUE)::uuid
- konstruct_app role created with SELECT/INSERT/UPDATE/DELETE on all tables
- packages/shared/shared/rls.py: idempotent configure_rls_hook, UUID-sanitized SET LOCAL
- tests/conftest.py: test_db_name (session-scoped), db_engine + db_session as konstruct_app
- tests/unit/test_normalize.py: 11 tests for KonstructMessage Slack normalization (CHAN-01)
- tests/unit/test_tenant_resolution.py: 7 tests for workspace_id → tenant resolution (TNNT-02)
- tests/unit/test_redis_namespacing.py: 15 tests for Redis key namespace isolation (TNNT-03)
- tests/integration/test_tenant_isolation.py: 7 tests proving RLS tenant isolation (TNNT-01)
- tenant_b cannot see tenant_a's agents or channel_connections
- FORCE ROW LEVEL SECURITY verified via pg_class.relforcerowsecurity
190 lines
6.7 KiB
Python
190 lines
6.7 KiB
Python
"""
|
|
Shared test fixtures for Konstruct.
|
|
|
|
IMPORTANT: The `db_session` fixture connects as `konstruct_app` (not postgres
|
|
superuser). This is mandatory — RLS is bypassed for superuser connections, so
|
|
tests using superuser would pass trivially while providing zero real protection.
|
|
|
|
Integration tests requiring a live PostgreSQL container are skipped if the
|
|
database is not available. Unit tests never require a live DB.
|
|
|
|
Event loop design: All async fixtures use function scope to avoid pytest-asyncio
|
|
cross-loop-scope issues. The test database is created once (at session scope, via
|
|
a synchronous fixture) and reused across tests within the session.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
import uuid
|
|
from collections.abc import AsyncGenerator
|
|
from typing import Any
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Database URLs
|
|
# ---------------------------------------------------------------------------
|
|
_ADMIN_URL = os.environ.get(
|
|
"DATABASE_ADMIN_URL",
|
|
"postgresql+asyncpg://postgres:postgres_dev@localhost:5432/konstruct",
|
|
)
|
|
_APP_URL = os.environ.get(
|
|
"DATABASE_URL",
|
|
"postgresql+asyncpg://konstruct_app:konstruct_dev@localhost:5432/konstruct",
|
|
)
|
|
|
|
|
|
def _replace_db_name(url: str, new_db: str) -> str:
|
|
"""Replace database name in a SQLAlchemy URL string."""
|
|
parts = url.rsplit("/", 1)
|
|
return f"{parts[0]}/{new_db}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Session-scoped synchronous setup — creates and migrates the test DB once
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(scope="session")
|
|
def test_db_name() -> str:
|
|
"""Create a fresh test database, run migrations, return DB name."""
|
|
db_name = f"konstruct_test_{uuid.uuid4().hex[:8]}"
|
|
admin_postgres_url = _replace_db_name(_ADMIN_URL, "postgres")
|
|
|
|
# Check PostgreSQL reachability using synchronous driver
|
|
try:
|
|
import asyncio as _asyncio
|
|
|
|
async def _check() -> None:
|
|
eng = create_async_engine(admin_postgres_url)
|
|
async with eng.connect() as conn:
|
|
await conn.execute(text("SELECT 1"))
|
|
await eng.dispose()
|
|
|
|
_asyncio.run(_check())
|
|
except Exception as exc:
|
|
pytest.skip(f"PostgreSQL not available: {exc}")
|
|
|
|
# Create test database
|
|
async def _create_db() -> None:
|
|
eng = create_async_engine(admin_postgres_url, isolation_level="AUTOCOMMIT")
|
|
async with eng.connect() as conn:
|
|
await conn.execute(text(f'CREATE DATABASE "{db_name}"'))
|
|
await eng.dispose()
|
|
|
|
asyncio.run(_create_db())
|
|
|
|
# Run Alembic migrations against test DB (subprocess — avoids loop conflicts)
|
|
admin_test_url = _replace_db_name(_ADMIN_URL, db_name)
|
|
result = subprocess.run(
|
|
["uv", "run", "alembic", "upgrade", "head"],
|
|
env={**os.environ, "DATABASE_ADMIN_URL": admin_test_url},
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=os.path.join(os.path.dirname(__file__), ".."),
|
|
)
|
|
if result.returncode != 0:
|
|
# Clean up on failure
|
|
async def _drop_db() -> None:
|
|
eng = create_async_engine(admin_postgres_url, isolation_level="AUTOCOMMIT")
|
|
async with eng.connect() as conn:
|
|
await conn.execute(text(f'DROP DATABASE IF EXISTS "{db_name}"'))
|
|
await eng.dispose()
|
|
|
|
asyncio.run(_drop_db())
|
|
pytest.fail(f"Alembic migration failed:\n{result.stdout}\n{result.stderr}")
|
|
|
|
yield db_name
|
|
|
|
# Teardown: drop test database
|
|
async def _cleanup() -> None:
|
|
eng = create_async_engine(admin_postgres_url, isolation_level="AUTOCOMMIT")
|
|
async with eng.connect() as conn:
|
|
await conn.execute(
|
|
text(
|
|
"SELECT pg_terminate_backend(pid) FROM pg_stat_activity "
|
|
"WHERE datname = :dbname AND pid <> pg_backend_pid()"
|
|
),
|
|
{"dbname": db_name},
|
|
)
|
|
await conn.execute(text(f'DROP DATABASE IF EXISTS "{db_name}"'))
|
|
await eng.dispose()
|
|
|
|
asyncio.run(_cleanup())
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def db_engine(test_db_name: str) -> AsyncGenerator[AsyncEngine, None]:
|
|
"""
|
|
Function-scoped async engine connected as konstruct_app.
|
|
|
|
Using konstruct_app role is critical — it enforces RLS. The postgres
|
|
superuser would bypass RLS and make isolation tests worthless.
|
|
"""
|
|
app_test_url = _replace_db_name(_APP_URL, test_db_name)
|
|
engine = create_async_engine(app_test_url, echo=False)
|
|
yield engine
|
|
await engine.dispose()
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def db_session(db_engine: AsyncEngine) -> AsyncGenerator[AsyncSession, None]:
|
|
"""
|
|
Function-scoped async session connected as konstruct_app.
|
|
|
|
The RLS hook is configured on this engine so SET LOCAL statements are
|
|
injected before each query when current_tenant_id is set.
|
|
"""
|
|
from shared.rls import configure_rls_hook
|
|
|
|
# Always configure — SQLAlchemy event.listens_for is idempotent per listener function
|
|
# when the same function object is registered; but since configure_rls_hook creates
|
|
# a new closure each call, wrap with a set to avoid duplicate listeners.
|
|
configure_rls_hook(db_engine)
|
|
|
|
session_factory = async_sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
|
|
async with session_factory() as session:
|
|
yield session
|
|
await session.rollback()
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def tenant_a(db_session: AsyncSession) -> dict[str, Any]:
|
|
"""Create Tenant A and return its data dict."""
|
|
tenant_id = uuid.uuid4()
|
|
suffix = uuid.uuid4().hex[:6]
|
|
await db_session.execute(
|
|
text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"),
|
|
{
|
|
"id": str(tenant_id),
|
|
"name": f"Tenant Alpha {suffix}",
|
|
"slug": f"tenant-alpha-{suffix}",
|
|
"settings": "{}",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
return {"id": tenant_id}
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def tenant_b(db_session: AsyncSession) -> dict[str, Any]:
|
|
"""Create Tenant B and return its data dict."""
|
|
tenant_id = uuid.uuid4()
|
|
suffix = uuid.uuid4().hex[:6]
|
|
await db_session.execute(
|
|
text("INSERT INTO tenants (id, name, slug, settings) VALUES (:id, :name, :slug, :settings)"),
|
|
{
|
|
"id": str(tenant_id),
|
|
"name": f"Tenant Beta {suffix}",
|
|
"slug": f"tenant-beta-{suffix}",
|
|
"settings": "{}",
|
|
},
|
|
)
|
|
await db_session.commit()
|
|
return {"id": tenant_id}
|