Files
konstruct/packages/shared/shared/db.py
Adolfo Delorenzo 2116059157 fix: NullPool for Celery workers + skip pgvector on first message
- Celery workers use NullPool to avoid "Future attached to a different
  loop" errors from stale pooled async connections across asyncio.run()
  calls. FastAPI keeps regular pool (single event loop, safe to reuse).
- Skip pgvector similarity search when no conversation history exists
  (first message) — saves ~3s embedding + query overhead.
- Wrap pgvector retrieval in try/except to prevent DB errors from
  blocking the LLM response.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 18:21:19 -06:00

70 lines
2.3 KiB
Python

"""
Async SQLAlchemy engine and session factory.
Usage in FastAPI:
async def route(session: AsyncSession = Depends(get_session)):
...
Usage in tests:
async with async_session_factory() as session:
...
"""
from __future__ import annotations

import os
import sys
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

from shared.config import settings
# ---------------------------------------------------------------------------
# Engine — one per process; shared across all requests
#
# Celery workers call asyncio.run() once per task, so every task gets a brand
# new event loop. A pooled connection stays bound to the loop that opened it;
# reusing it from a later loop fails with "Future attached to a different
# loop". NullPool never keeps a connection around after it is released, which
# sidesteps the problem. FastAPI runs on a single event loop, so it keeps a
# regular pool (NullPool would also work there, at the cost of reconnecting).
# ---------------------------------------------------------------------------

# Heuristic worker detection: look for "celery" in the "_" environment
# variable (set by some shells, e.g. bash, to the launched executable's path)
# or anywhere in the command line. Uses sys.argv directly rather than the
# undocumented os.sys alias.
_is_celery_worker = "celery" in os.environ.get("_", "") or "celery" in " ".join(sys.argv)

# Pool configuration is the only thing that differs between the two
# process types; everything else is shared.
_engine_options = (
    {"poolclass": NullPool}
    if _is_celery_worker
    else {"pool_pre_ping": True, "pool_size": 10, "max_overflow": 20}
)

engine: AsyncEngine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    **_engine_options,
)
# ---------------------------------------------------------------------------
# Session factory
#
# Calling async_session_factory() produces a new AsyncSession bound to the
# module-level engine. expire_on_commit=False keeps loaded attributes usable
# after commit instead of expiring them.
# ---------------------------------------------------------------------------
async_session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Yield one async database session, for use as a FastAPI dependency.

    A session is created from ``async_session_factory`` and handed to the
    caller inside an ``async with`` block, so it is closed (releasing its
    connection) once the caller is done with it, including when the caller
    raises.

    Example:

        @router.get("/agents")
        async def list_agents(session: AsyncSession = Depends(get_session)):
            result = await session.execute(select(Agent))
            return result.scalars().all()
    """
    db_session = async_session_factory()
    # Entering the session returns the session itself; exiting closes it.
    async with db_session:
        yield db_session