diff --git a/packages/orchestrator/orchestrator/tasks.py b/packages/orchestrator/orchestrator/tasks.py
index d74d501..dce7ba4 100644
--- a/packages/orchestrator/orchestrator/tasks.py
+++ b/packages/orchestrator/orchestrator/tasks.py
@@ -493,21 +493,27 @@ async def _process_message(
         )
 
         # 2. Long-term: pgvector similarity search
+        # Skip if no conversation history exists yet (first message optimization —
+        # embedding + pgvector query adds ~3s before the first token appears)
         relevant_context: list[str] = []
-        if user_text:
-            query_embedding = embed_text(user_text)
-            rls_token = current_tenant_id.set(tenant_uuid)
+        if user_text and recent_messages:
             try:
-                async with async_session_factory() as session:
-                    relevant_context = await retrieve_relevant(
-                        session,
-                        tenant_uuid,
-                        agent.id,
-                        user_id,
-                        query_embedding,
-                    )
-            finally:
-                current_tenant_id.reset(rls_token)
+                query_embedding = embed_text(user_text)
+                rls_token = current_tenant_id.set(tenant_uuid)
+                try:
+                    async with async_session_factory() as session:
+                        relevant_context = await retrieve_relevant(
+                            session,
+                            tenant_uuid,
+                            agent.id,
+                            user_id,
+                            query_embedding,
+                        )
+                finally:
+                    current_tenant_id.reset(rls_token)
+            except Exception:
+                # Best-effort: long-term memory is optional; keep the traceback
+                # in the log (exc_info) so the failure is still diagnosable.
+                logger.warning("pgvector retrieval failed — continuing without long-term memory", exc_info=True)
 
     finally:
         await redis_client2.aclose()
diff --git a/packages/shared/shared/db.py b/packages/shared/shared/db.py
index cb41e69..89d3c8d 100644
--- a/packages/shared/shared/db.py
+++ b/packages/shared/shared/db.py
@@ -14,19 +14,34 @@
 from __future__ import annotations
 
 from collections.abc import AsyncGenerator
+import os
+import sys
+
 from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
+from sqlalchemy.pool import NullPool
 
 from shared.config import settings
 
 # ---------------------------------------------------------------------------
 # Engine — one per process; shared across all requests
+#
+# Celery workers use asyncio.run() per task, creating a new event loop each
+# time. Connection pools hold connections bound to the previous (closed) loop,
+# causing "Future attached to a different loop" errors. NullPool avoids this
+# by never reusing connections. FastAPI (single event loop) can safely use a
+# regular pool, but NullPool works fine there too with minimal overhead.
 # ---------------------------------------------------------------------------
+# NOTE(review): the "_" env var is set by interactive shells, not by
+# supervisors (systemd, containers) — verify detection holds in deployment.
+_is_celery_worker = "celery" in os.environ.get("_", "") or "celery" in " ".join(sys.argv)
+
 engine: AsyncEngine = create_async_engine(
     settings.database_url,
     echo=settings.debug,
-    pool_pre_ping=True,
-    pool_size=10,
-    max_overflow=20,
+    **({"poolclass": NullPool} if _is_celery_worker else {
+        "pool_pre_ping": True,
+        "pool_size": 10,
+        "max_overflow": 20,
+    }),
 )
 
 # ---------------------------------------------------------------------------