diff --git a/packages/orchestrator/orchestrator/agents/builder.py b/packages/orchestrator/orchestrator/agents/builder.py
index 3cec6e6..27f57b0 100644
--- a/packages/orchestrator/orchestrator/agents/builder.py
+++ b/packages/orchestrator/orchestrator/agents/builder.py
@@ -11,6 +11,14 @@ AI TRANSPARENCY POLICY:
     Per Konstruct product design, agents MUST acknowledge they are AI
     assistants when directly asked. This clause is injected unconditionally
     to prevent agents from deceiving users, regardless of persona configuration.
+
+Memory-enriched message assembly:
+    build_messages_with_memory() extends build_messages() by injecting:
+      1. Long-term context: semantically relevant past exchanges (pgvector)
+         Injected as a system message BEFORE the sliding window so the LLM
+         has background context without it polluting the conversation flow.
+      2. Short-term context: recent messages (Redis sliding window)
+         Represents the immediate conversation history in this session.
 """
 
 from __future__ import annotations
@@ -82,3 +90,54 @@ def build_messages(
     messages.append({"role": "user", "content": user_message})
 
     return messages
+
+
+def build_messages_with_memory(
+    agent: Agent,
+    current_message: str,
+    recent_messages: list[dict],
+    relevant_context: list[str],
+) -> list[dict]:
+    """
+    Build an LLM messages array enriched with two-layer memory.
+
+    Structure (in order):
+      1. System message — agent identity, persona, AI transparency clause
+      2. System message — long-term context from pgvector (ONLY if non-empty)
+         Injected as a system message before the sliding window so the LLM
+         has relevant background without it appearing in the conversation.
+      3. Sliding window messages — recent conversation history (user/assistant)
+      4. Current user message
+
+    The pgvector context is omitted entirely when relevant_context is empty —
+    injecting an empty context block would be noise in the LLM's context window.
+
+    Args:
+        agent: ORM Agent instance (for system prompt assembly).
+        current_message: The current user message text.
+        recent_messages: Short-term memory — list of {"role", "content"} dicts
+            from Redis sliding window (oldest first).
+        relevant_context: Long-term memory — list of content strings from
+            pgvector similarity search (most relevant first).
+
+    Returns:
+        List of message dicts suitable for an OpenAI-compatible API call.
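+
+    Example (illustrative, not a doctest — the system prompt and context
+    strings shown here are hypothetical placeholders):
+
+        >>> build_messages_with_memory(
+        ...     agent=agent,
+        ...     current_message="What did we decide on pricing?",
+        ...     recent_messages=[{"role": "user", "content": "hi"},
+        ...                      {"role": "assistant", "content": "Hello!"}],
+        ...     relevant_context=["Team chose usage-based pricing tiers"],
+        ... )
+        [{"role": "system", "content": "<persona + AI transparency clause>"},
+         {"role": "system", "content": "Relevant context from past conversations: ..."},
+         {"role": "user", "content": "hi"},
+         {"role": "assistant", "content": "Hello!"},
+         {"role": "user", "content": "What did we decide on pricing?"}]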
+ """ + system_prompt = build_system_prompt(agent) + messages: list[dict] = [{"role": "system", "content": system_prompt}] + + # Inject long-term pgvector context as a system message BEFORE sliding window + # Only inject when there IS relevant context — empty context block is noise + if relevant_context: + context_lines = "\n".join(f"- {item}" for item in relevant_context) + context_message = f"Relevant context from past conversations:\n{context_lines}" + messages.append({"role": "system", "content": context_message}) + + # Append short-term sliding window messages (recent conversation history) + if recent_messages: + messages.extend(recent_messages) + + # Append the current user message + messages.append({"role": "user", "content": current_message}) + + return messages diff --git a/packages/orchestrator/orchestrator/agents/runner.py b/packages/orchestrator/orchestrator/agents/runner.py index 567fbff..e77d327 100644 --- a/packages/orchestrator/orchestrator/agents/runner.py +++ b/packages/orchestrator/orchestrator/agents/runner.py @@ -31,28 +31,38 @@ _FALLBACK_RESPONSE = ( _LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0) -async def run_agent(msg: KonstructMessage, agent: Agent) -> str: +async def run_agent( + msg: KonstructMessage, + agent: Agent, + messages: list[dict] | None = None, +) -> str: """ Execute an agent against the LLM pool and return the response text. Args: - msg: The inbound Konstruct message being processed. - agent: The ORM Agent instance that handles this message. + msg: The inbound Konstruct message being processed. + agent: The ORM Agent instance that handles this message. + messages: Optional pre-built messages array (e.g. from + build_messages_with_memory). When provided, used directly. + When None, falls back to simple [system, user] construction + for backward compatibility (e.g. existing tests). Returns: The LLM response content as a plain string. Returns a polite fallback message if the LLM pool is unreachable or returns a non-200 response. """ - system_prompt = build_system_prompt(agent) + if messages is None: + # Fallback: simple two-message construction (backward compat) + system_prompt = build_system_prompt(agent) - # Extract user text from the message content - user_text: str = msg.content.text or "" + # Extract user text from the message content + user_text: str = msg.content.text or "" - messages = build_messages( - system_prompt=system_prompt, - user_message=user_text, - ) + messages = build_messages( + system_prompt=system_prompt, + user_message=user_text, + ) payload = { "model": agent.model_preference, diff --git a/packages/orchestrator/orchestrator/memory/embedder.py b/packages/orchestrator/orchestrator/memory/embedder.py new file mode 100644 index 0000000..3070dc8 --- /dev/null +++ b/packages/orchestrator/orchestrator/memory/embedder.py @@ -0,0 +1,81 @@ +""" +Singleton embedding model for the Orchestrator. + +Loads all-MiniLM-L6-v2 once at module level (lazy singleton pattern). +The model produces 384-dimensional embeddings compatible with the +conversation_embeddings.embedding vector(384) column. + +Why a singleton: sentence-transformers models are ~100MB and take ~2s to load. +Loading per-request would be catastrophically slow. Loading at module level +means the model is loaded once when the Celery worker starts. + +Thread safety: SentenceTransformer.encode() releases the GIL and is safe to +call from multiple Celery threads simultaneously. 
+""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from sentence_transformers import SentenceTransformer + +logger = logging.getLogger(__name__) + +# Embedding model name — must match the vector(384) column dimension +_MODEL_NAME = "all-MiniLM-L6-v2" + +# Lazy singleton — loaded on first use, not at import time +# This avoids 2s+ load time when the module is imported but not used +_model: "SentenceTransformer | None" = None + + +def get_embedding_model() -> "SentenceTransformer": + """ + Return the singleton SentenceTransformer model, loading it on first call. + + Thread-safe: multiple Celery workers can call this concurrently. + The model is loaded only once per process. + + Returns: + Loaded SentenceTransformer model (all-MiniLM-L6-v2, 384 dims). + """ + global _model + if _model is None: + logger.info("Loading embedding model %s (first use)", _MODEL_NAME) + from sentence_transformers import SentenceTransformer + + _model = SentenceTransformer(_MODEL_NAME) + logger.info("Embedding model %s loaded", _MODEL_NAME) + return _model + + +def embed_text(text: str) -> list[float]: + """ + Embed a single text string using all-MiniLM-L6-v2. + + Args: + text: The text to embed. + + Returns: + 384-dimensional float list. + """ + model = get_embedding_model() + embedding = model.encode(text, normalize_embeddings=True) + return embedding.tolist() + + +def embed_texts(texts: list[str]) -> list[list[float]]: + """ + Embed a list of text strings in batch (more efficient than one-by-one). + + Args: + texts: List of strings to embed. + + Returns: + List of 384-dimensional float lists, same order as input. + """ + model = get_embedding_model() + embeddings = model.encode(texts, normalize_embeddings=True, batch_size=32) + return [e.tolist() for e in embeddings] diff --git a/packages/orchestrator/orchestrator/tasks.py b/packages/orchestrator/orchestrator/tasks.py index a4f9d1c..a933482 100644 --- a/packages/orchestrator/orchestrator/tasks.py +++ b/packages/orchestrator/orchestrator/tasks.py @@ -9,6 +9,19 @@ Celery task definitions for the Konstruct Agent Orchestrator. # NEVER change these to `async def`. If you see a RuntimeError about "no # running event loop" or tasks that silently never complete, check for # accidental async def usage first. + +Memory pipeline (Phase 2): + Before LLM call: + 1. get_recent_messages() — load Redis sliding window (last 20 msgs) + 2. embed current message + retrieve_relevant() — pgvector long-term context + 3. build_messages_with_memory() — assemble enriched messages array + + After LLM response: + 4. append_message() x2 — save user + assistant turns to Redis + 5. embed_and_store.delay() — fire-and-forget pgvector backfill (async) + + The embed_and_store Celery task runs asynchronously, meaning the LLM response + is never blocked waiting for embedding computation. """ from __future__ import annotations @@ -23,6 +36,88 @@ from shared.models.message import KonstructMessage logger = logging.getLogger(__name__) +@app.task( + name="orchestrator.tasks.embed_and_store", + bind=False, + max_retries=2, + default_retry_delay=10, + ignore_result=True, # Fire-and-forget — callers don't await the result +) +def embed_and_store( + tenant_id: str, + agent_id: str, + user_id: str, + messages: list[dict], +) -> None: + """ + Asynchronously embed conversation turns and store them in pgvector. + + Dispatched fire-and-forget after the LLM response so embedding computation + NEVER blocks the user-facing response pipeline. 
+
+    Args:
+        tenant_id: Tenant UUID string.
+        agent_id: Agent UUID string.
+        user_id: End-user identifier.
+        messages: List of {"role", "content"} dicts to embed and store.
+            Typically [user_message, assistant_response].
+    """
+    asyncio.run(_embed_and_store_async(tenant_id, agent_id, user_id, messages))
+
+
+async def _embed_and_store_async(
+    tenant_id: str,
+    agent_id: str,
+    user_id: str,
+    messages: list[dict],
+) -> None:
+    """
+    Async implementation of embed_and_store.
+
+    Embeds all messages in a single batch (more efficient than one-by-one),
+    then stores each embedding in conversation_embeddings via store_embedding().
+    """
+    from orchestrator.memory.embedder import embed_texts
+    from orchestrator.memory.long_term import store_embedding
+    from shared.db import async_session_factory, engine
+    from shared.rls import configure_rls_hook, current_tenant_id
+
+    if not messages:
+        return
+
+    tenant_uuid = uuid.UUID(tenant_id)
+    agent_uuid = uuid.UUID(agent_id)
+
+    # Embed all message texts in a single batch call
+    texts = [msg["content"] for msg in messages]
+    embeddings = embed_texts(texts)
+
+    configure_rls_hook(engine)
+    token = current_tenant_id.set(tenant_uuid)
+    try:
+        async with async_session_factory() as session:
+            for msg, embedding in zip(messages, embeddings, strict=True):
+                await store_embedding(
+                    session,
+                    tenant_uuid,
+                    agent_uuid,
+                    user_id,
+                    msg["content"],
+                    msg["role"],
+                    embedding,
+                )
+            await session.commit()
+    except Exception:
+        logger.exception(
+            "embed_and_store failed for tenant=%s agent=%s user=%s",
+            tenant_id,
+            agent_id,
+            user_id,
+        )
+    finally:
+        current_tenant_id.reset(token)
+
+
 @app.task(
     name="orchestrator.tasks.handle_message",
     bind=True,
@@ -82,7 +177,16 @@ async def _process_message(
     channel_id: str = "",
 ) -> dict:
     """
-    Async agent pipeline — load agent config, build prompt, call LLM pool.
+    Async agent pipeline — load agent config, build memory-enriched prompt, call LLM pool.
+
+    Memory pipeline (Phase 2 additions):
+      BEFORE LLM call:
+        1. Load recent messages from Redis sliding window
+        2. Embed current message and retrieve semantically relevant long-term context
+        3. Build memory-enriched messages array via build_messages_with_memory()
+      AFTER LLM response:
+        4. Append user message + assistant response to Redis sliding window
+        5. Dispatch embed_and_store.delay() for async pgvector backfill
 
     After getting the LLM response, if Slack placeholder metadata is present,
     updates the "Thinking..." placeholder message with the real response using
@@ -99,7 +203,11 @@
     Returns:
         Dict with message_id, response, and tenant_id.
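+
+        Example shape (illustrative values):
+            {"message_id": "abc123", "response": "Hi!", "tenant_id": "t-1"}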
""" + from orchestrator.agents.builder import build_messages_with_memory from orchestrator.agents.runner import run_agent + from orchestrator.memory.embedder import embed_text + from orchestrator.memory.long_term import retrieve_relevant + from orchestrator.memory.short_term import append_message, get_recent_messages from shared.db import async_session_factory, engine from shared.models.tenant import Agent from shared.rls import configure_rls_hook, current_tenant_id @@ -120,9 +228,9 @@ async def _process_message( token = current_tenant_id.set(tenant_uuid) slack_bot_token: str = "" + agent: Agent | None = None try: - agent: Agent | None = None async with async_session_factory() as session: from sqlalchemy import select @@ -173,13 +281,68 @@ async def _process_message( "tenant_id": msg.tenant_id, } - response_text = await run_agent(msg, agent) + # Determine user_id for memory scoping: use sender.user_id if available, + # fall back to thread_id (for non-identified channel contexts like webhooks) + user_id: str = ( + msg.sender.user_id + if msg.sender and msg.sender.user_id + else (msg.thread_id or msg.id) + ) + agent_id_str = str(agent.id) + user_text: str = msg.content.text or "" + + # ------------------------------------------------------------------------- + # Memory retrieval (before LLM call) + # ------------------------------------------------------------------------- + import redis.asyncio as aioredis + + from shared.config import settings + + redis_client = aioredis.from_url(settings.redis_url) + try: + # 1. Short-term: Redis sliding window + recent_messages = await get_recent_messages( + redis_client, msg.tenant_id, agent_id_str, user_id + ) + + # 2. Long-term: pgvector similarity search + relevant_context: list[str] = [] + if user_text: + query_embedding = embed_text(user_text) + rls_token = current_tenant_id.set(tenant_uuid) + try: + async with async_session_factory() as session: + relevant_context = await retrieve_relevant( + session, + tenant_uuid, + agent.id, + user_id, + query_embedding, + ) + finally: + current_tenant_id.reset(rls_token) + finally: + await redis_client.aclose() + + # ------------------------------------------------------------------------- + # Build memory-enriched messages array and run LLM + # ------------------------------------------------------------------------- + enriched_messages = build_messages_with_memory( + agent=agent, + current_message=user_text, + recent_messages=recent_messages, + relevant_context=relevant_context, + ) + + response_text = await run_agent(msg, agent, messages=enriched_messages) logger.info( - "Message %s processed by agent=%s tenant=%s", + "Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)", msg.id, agent.id, msg.tenant_id, + len(recent_messages), + len(relevant_context), ) # Replace the "Thinking..." placeholder with the real response @@ -191,6 +354,24 @@ async def _process_message( text=response_text, ) + # ------------------------------------------------------------------------- + # Memory persistence (after LLM response) + # ------------------------------------------------------------------------- + redis_client2 = aioredis.from_url(settings.redis_url) + try: + # 3. Append both turns to Redis sliding window + await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "user", user_text) + await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "assistant", response_text) + finally: + await redis_client2.aclose() + + # 4. 
+        messages_to_embed = [
+            {"role": "user", "content": user_text},
+            {"role": "assistant", "content": response_text},
+        ]
+        embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
+
         return {
             "message_id": msg.id,
             "response": response_text,