fix: NullPool for Celery workers + skip pgvector on first message
- Celery workers use NullPool to avoid "Future attached to a different loop" errors caused by stale pooled async connections surviving across asyncio.run() calls. FastAPI keeps the regular pool (single long-lived event loop, so connection reuse is safe).
- Skip the pgvector similarity search when no conversation history exists yet (first message) — saves ~3 s of embedding + query overhead before the first token appears.
- Wrap pgvector retrieval in try/except so a database error cannot block the LLM response.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -493,21 +493,26 @@ async def _process_message(
|
||||
)
|
||||
|
||||
# 2. Long-term: pgvector similarity search
|
||||
# Skip if no conversation history exists yet (first message optimization —
|
||||
# embedding + pgvector query adds ~3s before the first token appears)
|
||||
relevant_context: list[str] = []
|
||||
if user_text:
|
||||
query_embedding = embed_text(user_text)
|
||||
rls_token = current_tenant_id.set(tenant_uuid)
|
||||
if user_text and recent_messages:
|
||||
try:
|
||||
async with async_session_factory() as session:
|
||||
relevant_context = await retrieve_relevant(
|
||||
session,
|
||||
tenant_uuid,
|
||||
agent.id,
|
||||
user_id,
|
||||
query_embedding,
|
||||
)
|
||||
finally:
|
||||
current_tenant_id.reset(rls_token)
|
||||
query_embedding = embed_text(user_text)
|
||||
rls_token = current_tenant_id.set(tenant_uuid)
|
||||
try:
|
||||
async with async_session_factory() as session:
|
||||
relevant_context = await retrieve_relevant(
|
||||
session,
|
||||
tenant_uuid,
|
||||
agent.id,
|
||||
user_id,
|
||||
query_embedding,
|
||||
)
|
||||
finally:
|
||||
current_tenant_id.reset(rls_token)
|
||||
except Exception:
|
||||
logger.warning("pgvector retrieval failed — continuing without long-term memory")
|
||||
finally:
|
||||
await redis_client2.aclose()
|
||||
|
||||
|
||||
@@ -14,19 +14,32 @@ from __future__ import annotations
|
||||
|
||||
import os
import sys
from collections.abc import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

from shared.config import settings
# ---------------------------------------------------------------------------
# Engine — one per process; shared across all requests.
#
# Celery workers run asyncio.run() per task, creating a fresh event loop each
# time. A connection pool would hand back connections bound to the previous
# (now-closed) loop, raising "Future attached to a different loop". NullPool
# sidesteps this by never reusing connections. FastAPI runs a single
# long-lived event loop, so it keeps a regular pool for connection reuse.
# ---------------------------------------------------------------------------

# Heuristic worker detection: "$_" is set by POSIX shells to the last command
# executed (the celery binary when launched from a shell), and sys.argv
# contains "celery" when the worker is started via `python -m celery` or the
# celery console script. NOTE(review): best-effort — an explicit env flag set
# in the worker entry point would be more robust; confirm against deployment.
_is_celery_worker = "celery" in os.environ.get("_", "") or "celery" in " ".join(sys.argv)

# Pooling kwargs are mutually exclusive with NullPool: NullPool accepts no
# sizing options, so pool_pre_ping/pool_size/max_overflow are only passed
# when the regular pool is in use. Passing them unconditionally alongside
# the unpacked dict would raise TypeError (duplicate keyword arguments).
engine: AsyncEngine = create_async_engine(
    settings.database_url,
    echo=settings.debug,
    **(
        {"poolclass": NullPool}
        if _is_celery_worker
        else {"pool_pre_ping": True, "pool_size": 10, "max_overflow": 20}
    ),
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user