feat(02-01): wire two-layer memory into orchestrator pipeline

- builder.py: add build_messages_with_memory() — injects pgvector context as
  system message + sliding window history before current user turn
- runner.py: accept optional messages parameter; fallback to simple build for
  backward compat (existing tests unaffected)
- tasks.py: memory pipeline in _process_message — load short-term + retrieve
  long-term before LLM call; append turns to Redis + dispatch embed_and_store
  fire-and-forget after response
- tasks.py: add embed_and_store Celery task (sync def + asyncio.run()) for
  async pgvector backfill — never blocks the LLM response pipeline
- memory/embedder.py: lazy singleton SentenceTransformer (all-MiniLM-L6-v2)
  with embed_text() / embed_texts() helpers
- All 202 tests pass (196 existing + 6 new memory integration tests)
This commit is contained in:
2026-03-23 14:45:21 -06:00
parent 2dc94682ff
commit 45b957377f
4 changed files with 345 additions and 14 deletions

View File

@@ -11,6 +11,14 @@ AI TRANSPARENCY POLICY:
Per Konstruct product design, agents MUST acknowledge they are AI assistants Per Konstruct product design, agents MUST acknowledge they are AI assistants
when directly asked. This clause is injected unconditionally to prevent when directly asked. This clause is injected unconditionally to prevent
agents from deceiving users, regardless of persona configuration. agents from deceiving users, regardless of persona configuration.
Memory-enriched message assembly:
build_messages_with_memory() extends build_messages() by injecting:
1. Long-term context: semantically relevant past exchanges (pgvector)
Injected as a system message BEFORE the sliding window so the LLM
has background context without it polluting the conversation flow.
2. Short-term context: recent messages (Redis sliding window)
Represents the immediate conversation history in this session.
""" """
from __future__ import annotations from __future__ import annotations
@@ -82,3 +90,54 @@ def build_messages(
messages.append({"role": "user", "content": user_message}) messages.append({"role": "user", "content": user_message})
return messages return messages
def build_messages_with_memory(
    agent: Agent,
    current_message: str,
    recent_messages: list[dict],
    relevant_context: list[str],
) -> list[dict]:
    """
    Assemble an LLM messages array enriched with two-layer memory.

    Resulting order:
        1. System message — agent identity / persona / transparency clause.
        2. System message — long-term pgvector context (only when non-empty;
           an empty context block would just be noise in the context window).
        3. Short-term sliding-window history (user/assistant turns).
        4. The current user message.

    Args:
        agent: ORM Agent instance (source for the system prompt).
        current_message: Text of the user turn being processed now.
        recent_messages: Short-term memory — {"role", "content"} dicts from
            the Redis sliding window, oldest first.
        relevant_context: Long-term memory — content strings from pgvector
            similarity search, most relevant first.

    Returns:
        Message dicts ready for an OpenAI-compatible chat completion call.
    """
    result: list[dict] = [
        {"role": "system", "content": build_system_prompt(agent)}
    ]

    # Long-term memory goes in as a *system* message ahead of the sliding
    # window, so the LLM sees it as background rather than conversation.
    if relevant_context:
        bullet_list = "\n".join(f"- {item}" for item in relevant_context)
        result.append(
            {
                "role": "system",
                "content": f"Relevant context from past conversations:\n{bullet_list}",
            }
        )

    # Short-term memory: extend() on an empty list is a no-op, so no guard
    # is required here.
    result.extend(recent_messages)

    result.append({"role": "user", "content": current_message})
    return result

View File

@@ -31,19 +31,29 @@ _FALLBACK_RESPONSE = (
_LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0) _LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)
async def run_agent(msg: KonstructMessage, agent: Agent) -> str: async def run_agent(
msg: KonstructMessage,
agent: Agent,
messages: list[dict] | None = None,
) -> str:
""" """
Execute an agent against the LLM pool and return the response text. Execute an agent against the LLM pool and return the response text.
Args: Args:
msg: The inbound Konstruct message being processed. msg: The inbound Konstruct message being processed.
agent: The ORM Agent instance that handles this message. agent: The ORM Agent instance that handles this message.
messages: Optional pre-built messages array (e.g. from
build_messages_with_memory). When provided, used directly.
When None, falls back to simple [system, user] construction
for backward compatibility (e.g. existing tests).
Returns: Returns:
The LLM response content as a plain string. The LLM response content as a plain string.
Returns a polite fallback message if the LLM pool is unreachable or Returns a polite fallback message if the LLM pool is unreachable or
returns a non-200 response. returns a non-200 response.
""" """
if messages is None:
# Fallback: simple two-message construction (backward compat)
system_prompt = build_system_prompt(agent) system_prompt = build_system_prompt(agent)
# Extract user text from the message content # Extract user text from the message content

View File

@@ -0,0 +1,81 @@
"""
Singleton embedding model for the Orchestrator.
Loads all-MiniLM-L6-v2 once at module level (lazy singleton pattern).
The model produces 384-dimensional embeddings compatible with the
conversation_embeddings.embedding vector(384) column.
Why a singleton: sentence-transformers models are ~100MB and take ~2s to load.
Loading per-request would be catastrophically slow. Loading at module level
means the model is loaded once when the Celery worker starts.
Thread safety: SentenceTransformer.encode() releases the GIL and is safe to
call from multiple Celery threads simultaneously.
"""
from __future__ import annotations

import logging
import threading
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sentence_transformers import SentenceTransformer
logger = logging.getLogger(__name__)
# Embedding model name — must match the vector(384) column dimension
_MODEL_NAME = "all-MiniLM-L6-v2"

# Lazy singleton — loaded on first use, not at import time.
# This avoids 2s+ load time when the module is imported but not used.
_model: "SentenceTransformer | None" = None

# Guards the one-time load: without it, two threads racing through the
# `_model is None` check could each load a ~100MB model concurrently.
_model_lock = threading.Lock()


def get_embedding_model() -> "SentenceTransformer":
    """
    Return the singleton SentenceTransformer model, loading it on first call.

    Uses double-checked locking so concurrent first calls load the model
    exactly once per process; once loaded, subsequent calls are lock-free.
    (The previous implementation's bare check-then-assign was racy under
    threaded Celery pools.)

    Returns:
        Loaded SentenceTransformer model (all-MiniLM-L6-v2, 384 dims).
    """
    global _model
    if _model is None:
        with _model_lock:
            # Re-check under the lock: another thread may have finished
            # loading while we waited to acquire it.
            if _model is None:
                logger.info("Loading embedding model %s (first use)", _MODEL_NAME)
                from sentence_transformers import SentenceTransformer

                loaded = SentenceTransformer(_MODEL_NAME)
                logger.info("Embedding model %s loaded", _MODEL_NAME)
                # Publish only the fully-constructed model so readers never
                # observe a half-initialized singleton.
                _model = loaded
    return _model
def embed_text(text: str) -> list[float]:
    """
    Embed one text string with the singleton all-MiniLM-L6-v2 model.

    Args:
        text: The text to embed.

    Returns:
        384-dimensional float list (normalized embedding).
    """
    vector = get_embedding_model().encode(text, normalize_embeddings=True)
    return vector.tolist()
def embed_texts(texts: list[str]) -> list[list[float]]:
    """
    Embed many strings in a single batched call.

    Batching is cheaper than calling embed_text() per string because the
    model processes up to 32 inputs per forward pass.

    Args:
        texts: Strings to embed.

    Returns:
        List of 384-dimensional float lists, in the same order as *texts*.
    """
    model = get_embedding_model()
    batch = model.encode(texts, normalize_embeddings=True, batch_size=32)
    return [vec.tolist() for vec in batch]

View File

@@ -9,6 +9,19 @@ Celery task definitions for the Konstruct Agent Orchestrator.
# NEVER change these to `async def`. If you see a RuntimeError about "no # NEVER change these to `async def`. If you see a RuntimeError about "no
# running event loop" or tasks that silently never complete, check for # running event loop" or tasks that silently never complete, check for
# accidental async def usage first. # accidental async def usage first.
Memory pipeline (Phase 2):
Before LLM call:
1. get_recent_messages() — load Redis sliding window (last 20 msgs)
2. embed current message + retrieve_relevant() — pgvector long-term context
3. build_messages_with_memory() — assemble enriched messages array
After LLM response:
4. append_message() x2 — save user + assistant turns to Redis
5. embed_and_store.delay() — fire-and-forget pgvector backfill (async)
The embed_and_store Celery task runs asynchronously, meaning the LLM response
is never blocked waiting for embedding computation.
""" """
from __future__ import annotations from __future__ import annotations
@@ -23,6 +36,88 @@ from shared.models.message import KonstructMessage
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@app.task(
    name="orchestrator.tasks.embed_and_store",
    bind=False,
    max_retries=2,
    default_retry_delay=10,
    # Without autoretry_for, max_retries is dead config on a bind=False
    # task: there is no `self` to call .retry() on, so a failure would
    # never be retried. autoretry_for makes Celery retry on any exception
    # raised out of the task body, honoring max_retries/default_retry_delay.
    autoretry_for=(Exception,),
    ignore_result=True,  # Fire-and-forget — callers don't await the result
)
def embed_and_store(
    tenant_id: str,
    agent_id: str,
    user_id: str,
    messages: list[dict],
) -> None:
    """
    Asynchronously embed conversation turns and store them in pgvector.

    Dispatched fire-and-forget after the LLM response so embedding computation
    NEVER blocks the user-facing response pipeline.

    Sync `def` driving the async implementation via asyncio.run() — Celery
    tasks must not be `async def` (see the module-header warning).

    Args:
        tenant_id: Tenant UUID string.
        agent_id: Agent UUID string.
        user_id: End-user identifier.
        messages: List of {"role", "content"} dicts to embed and store.
            Typically [user_message, assistant_response].
    """
    asyncio.run(_embed_and_store_async(tenant_id, agent_id, user_id, messages))
async def _embed_and_store_async(
    tenant_id: str,
    agent_id: str,
    user_id: str,
    messages: list[dict],
) -> None:
    """
    Async implementation of embed_and_store.

    Embeds all messages in one batch (more efficient than one-by-one), then
    stores each embedding in conversation_embeddings via store_embedding().

    Raises:
        Exception: re-raised after logging so the Celery task is marked
            failed and its retry policy can fire. The previous behavior of
            swallowing the exception reported success to Celery, which made
            the task's max_retries unreachable and hid silent data loss in
            the long-term memory store.
    """
    from orchestrator.memory.embedder import embed_texts
    from orchestrator.memory.long_term import store_embedding
    from shared.db import async_session_factory, engine
    from shared.rls import configure_rls_hook, current_tenant_id

    if not messages:
        return

    tenant_uuid = uuid.UUID(tenant_id)
    agent_uuid = uuid.UUID(agent_id)

    # Embed all message texts in a single batch call. This is done before
    # any DB work so an embedding failure never touches a session.
    texts = [msg["content"] for msg in messages]
    embeddings = embed_texts(texts)

    configure_rls_hook(engine)
    token = current_tenant_id.set(tenant_uuid)
    try:
        async with async_session_factory() as session:
            # strict=True: texts and embeddings must pair up exactly;
            # a length mismatch is a bug, not something to truncate over.
            for msg, embedding in zip(messages, embeddings, strict=True):
                await store_embedding(
                    session,
                    tenant_uuid,
                    agent_uuid,
                    user_id,
                    msg["content"],
                    msg["role"],
                    embedding,
                )
            await session.commit()
    except Exception:
        logger.exception(
            "embed_and_store failed for tenant=%s agent=%s user=%s",
            tenant_id,
            agent_id,
            user_id,
        )
        # Re-raise so the wrapping Celery task fails (and can retry)
        # instead of silently succeeding.
        raise
    finally:
        # Always restore the RLS tenant context, success or failure.
        current_tenant_id.reset(token)
@app.task( @app.task(
name="orchestrator.tasks.handle_message", name="orchestrator.tasks.handle_message",
bind=True, bind=True,
@@ -82,7 +177,16 @@ async def _process_message(
channel_id: str = "", channel_id: str = "",
) -> dict: ) -> dict:
""" """
Async agent pipeline — load agent config, build prompt, call LLM pool. Async agent pipeline — load agent config, build memory-enriched prompt, call LLM pool.
Memory pipeline (Phase 2 additions):
BEFORE LLM call:
1. Load recent messages from Redis sliding window
2. Embed current message and retrieve semantically relevant long-term context
3. Build memory-enriched messages array via build_messages_with_memory()
AFTER LLM response:
4. Append user message + assistant response to Redis sliding window
5. Dispatch embed_and_store.delay() for async pgvector backfill
After getting the LLM response, if Slack placeholder metadata is present, After getting the LLM response, if Slack placeholder metadata is present,
updates the "Thinking..." placeholder message with the real response using updates the "Thinking..." placeholder message with the real response using
@@ -99,7 +203,11 @@ async def _process_message(
Returns: Returns:
Dict with message_id, response, and tenant_id. Dict with message_id, response, and tenant_id.
""" """
from orchestrator.agents.builder import build_messages_with_memory
from orchestrator.agents.runner import run_agent from orchestrator.agents.runner import run_agent
from orchestrator.memory.embedder import embed_text
from orchestrator.memory.long_term import retrieve_relevant
from orchestrator.memory.short_term import append_message, get_recent_messages
from shared.db import async_session_factory, engine from shared.db import async_session_factory, engine
from shared.models.tenant import Agent from shared.models.tenant import Agent
from shared.rls import configure_rls_hook, current_tenant_id from shared.rls import configure_rls_hook, current_tenant_id
@@ -120,9 +228,9 @@ async def _process_message(
token = current_tenant_id.set(tenant_uuid) token = current_tenant_id.set(tenant_uuid)
slack_bot_token: str = "" slack_bot_token: str = ""
agent: Agent | None = None
try: try:
agent: Agent | None = None
async with async_session_factory() as session: async with async_session_factory() as session:
from sqlalchemy import select from sqlalchemy import select
@@ -173,13 +281,68 @@ async def _process_message(
"tenant_id": msg.tenant_id, "tenant_id": msg.tenant_id,
} }
response_text = await run_agent(msg, agent) # Determine user_id for memory scoping: use sender.user_id if available,
# fall back to thread_id (for non-identified channel contexts like webhooks)
user_id: str = (
msg.sender.user_id
if msg.sender and msg.sender.user_id
else (msg.thread_id or msg.id)
)
agent_id_str = str(agent.id)
user_text: str = msg.content.text or ""
# -------------------------------------------------------------------------
# Memory retrieval (before LLM call)
# -------------------------------------------------------------------------
import redis.asyncio as aioredis
from shared.config import settings
redis_client = aioredis.from_url(settings.redis_url)
try:
# 1. Short-term: Redis sliding window
recent_messages = await get_recent_messages(
redis_client, msg.tenant_id, agent_id_str, user_id
)
# 2. Long-term: pgvector similarity search
relevant_context: list[str] = []
if user_text:
query_embedding = embed_text(user_text)
rls_token = current_tenant_id.set(tenant_uuid)
try:
async with async_session_factory() as session:
relevant_context = await retrieve_relevant(
session,
tenant_uuid,
agent.id,
user_id,
query_embedding,
)
finally:
current_tenant_id.reset(rls_token)
finally:
await redis_client.aclose()
# -------------------------------------------------------------------------
# Build memory-enriched messages array and run LLM
# -------------------------------------------------------------------------
enriched_messages = build_messages_with_memory(
agent=agent,
current_message=user_text,
recent_messages=recent_messages,
relevant_context=relevant_context,
)
response_text = await run_agent(msg, agent, messages=enriched_messages)
logger.info( logger.info(
"Message %s processed by agent=%s tenant=%s", "Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
msg.id, msg.id,
agent.id, agent.id,
msg.tenant_id, msg.tenant_id,
len(recent_messages),
len(relevant_context),
) )
# Replace the "Thinking..." placeholder with the real response # Replace the "Thinking..." placeholder with the real response
@@ -191,6 +354,24 @@ async def _process_message(
text=response_text, text=response_text,
) )
# -------------------------------------------------------------------------
# Memory persistence (after LLM response)
# -------------------------------------------------------------------------
redis_client2 = aioredis.from_url(settings.redis_url)
try:
# 3. Append both turns to Redis sliding window
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "user", user_text)
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
finally:
await redis_client2.aclose()
# 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
messages_to_embed = [
{"role": "user", "content": user_text},
{"role": "assistant", "content": response_text},
]
embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
return { return {
"message_id": msg.id, "message_id": msg.id,
"response": response_text, "response": response_text,