feat(02-01): wire two-layer memory into orchestrator pipeline

- builder.py: add build_messages_with_memory() — injects pgvector context as
  system message + sliding window history before current user turn
- runner.py: accept optional messages parameter; fallback to simple build for
  backward compat (existing tests unaffected)
- tasks.py: memory pipeline in _process_message — load short-term + retrieve
  long-term before LLM call; append turns to Redis + dispatch embed_and_store
  fire-and-forget after response
- tasks.py: add embed_and_store Celery task (sync def + asyncio.run()) for
  async pgvector backfill — never blocks the LLM response pipeline
- memory/embedder.py: lazy singleton SentenceTransformer (all-MiniLM-L6-v2)
  with embed_text() / embed_texts() helpers
- All 202 tests pass (196 existing + 6 new memory integration tests)
This commit is contained in:
2026-03-23 14:45:21 -06:00
parent 2dc94682ff
commit 45b957377f
4 changed files with 345 additions and 14 deletions

View File

@@ -11,6 +11,14 @@ AI TRANSPARENCY POLICY:
Per Konstruct product design, agents MUST acknowledge they are AI assistants
when directly asked. This clause is injected unconditionally to prevent
agents from deceiving users, regardless of persona configuration.
Memory-enriched message assembly:
build_messages_with_memory() extends build_messages() by injecting:
1. Long-term context: semantically relevant past exchanges (pgvector)
Injected as a system message BEFORE the sliding window so the LLM
has background context without it polluting the conversation flow.
2. Short-term context: recent messages (Redis sliding window)
Represents the immediate conversation history in this session.
"""
from __future__ import annotations
@@ -82,3 +90,54 @@ def build_messages(
messages.append({"role": "user", "content": user_message})
return messages
def build_messages_with_memory(
agent: Agent,
current_message: str,
recent_messages: list[dict],
relevant_context: list[str],
) -> list[dict]:
"""
Build an LLM messages array enriched with two-layer memory.
Structure (in order):
1. System message — agent identity, persona, AI transparency clause
2. System message — long-term context from pgvector (ONLY if non-empty)
Injected as a system message before the sliding window so the LLM
has relevant background without it appearing in the conversation.
3. Sliding window messages — recent conversation history (user/assistant)
4. Current user message
The pgvector context is omitted entirely when relevant_context is empty —
injecting an empty context block would be noise in the LLM's context window.
Args:
agent: ORM Agent instance (for system prompt assembly).
current_message: The current user message text.
recent_messages: Short-term memory — list of {"role", "content"} dicts
from Redis sliding window (oldest first).
relevant_context: Long-term memory — list of content strings from
pgvector similarity search (most relevant first).
Returns:
List of message dicts suitable for an OpenAI-compatible API call.
"""
system_prompt = build_system_prompt(agent)
messages: list[dict] = [{"role": "system", "content": system_prompt}]
# Inject long-term pgvector context as a system message BEFORE sliding window
# Only inject when there IS relevant context — empty context block is noise
if relevant_context:
context_lines = "\n".join(f"- {item}" for item in relevant_context)
context_message = f"Relevant context from past conversations:\n{context_lines}"
messages.append({"role": "system", "content": context_message})
# Append short-term sliding window messages (recent conversation history)
if recent_messages:
messages.extend(recent_messages)
# Append the current user message
messages.append({"role": "user", "content": current_message})
return messages

View File

@@ -31,19 +31,29 @@ _FALLBACK_RESPONSE = (
_LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)
async def run_agent(msg: KonstructMessage, agent: Agent) -> str:
async def run_agent(
msg: KonstructMessage,
agent: Agent,
messages: list[dict] | None = None,
) -> str:
"""
Execute an agent against the LLM pool and return the response text.
Args:
msg: The inbound Konstruct message being processed.
agent: The ORM Agent instance that handles this message.
messages: Optional pre-built messages array (e.g. from
build_messages_with_memory). When provided, used directly.
When None, falls back to simple [system, user] construction
for backward compatibility (e.g. existing tests).
Returns:
The LLM response content as a plain string.
Returns a polite fallback message if the LLM pool is unreachable or
returns a non-200 response.
"""
if messages is None:
# Fallback: simple two-message construction (backward compat)
system_prompt = build_system_prompt(agent)
# Extract user text from the message content

View File

@@ -0,0 +1,81 @@
"""
Singleton embedding model for the Orchestrator.
Loads all-MiniLM-L6-v2 once at module level (lazy singleton pattern).
The model produces 384-dimensional embeddings compatible with the
conversation_embeddings.embedding vector(384) column.
Why a singleton: sentence-transformers models are ~100MB and take ~2s to load.
Loading per-request would be catastrophically slow. Loading at module level
means the model is loaded once when the Celery worker starts.
Thread safety: SentenceTransformer.encode() releases the GIL and is safe to
call from multiple Celery threads simultaneously.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from sentence_transformers import SentenceTransformer
logger = logging.getLogger(__name__)
# Embedding model name — must match the vector(384) column dimension
_MODEL_NAME = "all-MiniLM-L6-v2"
# Lazy singleton — loaded on first use, not at import time
# This avoids 2s+ load time when the module is imported but not used
_model: "SentenceTransformer | None" = None
def get_embedding_model() -> "SentenceTransformer":
"""
Return the singleton SentenceTransformer model, loading it on first call.
Thread-safe: multiple Celery workers can call this concurrently.
The model is loaded only once per process.
Returns:
Loaded SentenceTransformer model (all-MiniLM-L6-v2, 384 dims).
"""
global _model
if _model is None:
logger.info("Loading embedding model %s (first use)", _MODEL_NAME)
from sentence_transformers import SentenceTransformer
_model = SentenceTransformer(_MODEL_NAME)
logger.info("Embedding model %s loaded", _MODEL_NAME)
return _model
def embed_text(text: str) -> list[float]:
"""
Embed a single text string using all-MiniLM-L6-v2.
Args:
text: The text to embed.
Returns:
384-dimensional float list.
"""
model = get_embedding_model()
embedding = model.encode(text, normalize_embeddings=True)
return embedding.tolist()
def embed_texts(texts: list[str]) -> list[list[float]]:
"""
Embed a list of text strings in batch (more efficient than one-by-one).
Args:
texts: List of strings to embed.
Returns:
List of 384-dimensional float lists, same order as input.
"""
model = get_embedding_model()
embeddings = model.encode(texts, normalize_embeddings=True, batch_size=32)
return [e.tolist() for e in embeddings]

View File

@@ -9,6 +9,19 @@ Celery task definitions for the Konstruct Agent Orchestrator.
# NEVER change these to `async def`. If you see a RuntimeError about "no
# running event loop" or tasks that silently never complete, check for
# accidental async def usage first.
Memory pipeline (Phase 2):
Before LLM call:
1. get_recent_messages() — load Redis sliding window (last 20 msgs)
2. embed current message + retrieve_relevant() — pgvector long-term context
3. build_messages_with_memory() — assemble enriched messages array
After LLM response:
4. append_message() x2 — save user + assistant turns to Redis
5. embed_and_store.delay() — fire-and-forget pgvector backfill (async)
The embed_and_store Celery task runs asynchronously, meaning the LLM response
is never blocked waiting for embedding computation.
"""
from __future__ import annotations
@@ -23,6 +36,88 @@ from shared.models.message import KonstructMessage
logger = logging.getLogger(__name__)
@app.task(
name="orchestrator.tasks.embed_and_store",
bind=False,
max_retries=2,
default_retry_delay=10,
ignore_result=True, # Fire-and-forget — callers don't await the result
)
def embed_and_store(
tenant_id: str,
agent_id: str,
user_id: str,
messages: list[dict],
) -> None:
"""
Asynchronously embed conversation turns and store them in pgvector.
Dispatched fire-and-forget after the LLM response so embedding computation
NEVER blocks the user-facing response pipeline.
Args:
tenant_id: Tenant UUID string.
agent_id: Agent UUID string.
user_id: End-user identifier.
messages: List of {"role", "content"} dicts to embed and store.
Typically [user_message, assistant_response].
"""
asyncio.run(_embed_and_store_async(tenant_id, agent_id, user_id, messages))
async def _embed_and_store_async(
tenant_id: str,
agent_id: str,
user_id: str,
messages: list[dict],
) -> None:
"""
Async implementation of embed_and_store.
Embeds all messages in batch (more efficient than one-by-one) then stores
each embedding in conversation_embeddings via store_embedding().
"""
from orchestrator.memory.embedder import embed_texts
from orchestrator.memory.long_term import store_embedding
from shared.db import async_session_factory, engine
from shared.rls import configure_rls_hook, current_tenant_id
if not messages:
return
tenant_uuid = uuid.UUID(tenant_id)
agent_uuid = uuid.UUID(agent_id)
# Embed all message texts in a single batch call
texts = [msg["content"] for msg in messages]
embeddings = embed_texts(texts)
configure_rls_hook(engine)
token = current_tenant_id.set(tenant_uuid)
try:
async with async_session_factory() as session:
for msg, embedding in zip(messages, embeddings, strict=True):
await store_embedding(
session,
tenant_uuid,
agent_uuid,
user_id,
msg["content"],
msg["role"],
embedding,
)
await session.commit()
except Exception:
logger.exception(
"embed_and_store failed for tenant=%s agent=%s user=%s",
tenant_id,
agent_id,
user_id,
)
finally:
current_tenant_id.reset(token)
@app.task(
name="orchestrator.tasks.handle_message",
bind=True,
@@ -82,7 +177,16 @@ async def _process_message(
channel_id: str = "",
) -> dict:
"""
Async agent pipeline — load agent config, build prompt, call LLM pool.
Async agent pipeline — load agent config, build memory-enriched prompt, call LLM pool.
Memory pipeline (Phase 2 additions):
BEFORE LLM call:
1. Load recent messages from Redis sliding window
2. Embed current message and retrieve semantically relevant long-term context
3. Build memory-enriched messages array via build_messages_with_memory()
AFTER LLM response:
4. Append user message + assistant response to Redis sliding window
5. Dispatch embed_and_store.delay() for async pgvector backfill
After getting the LLM response, if Slack placeholder metadata is present,
updates the "Thinking..." placeholder message with the real response using
@@ -99,7 +203,11 @@ async def _process_message(
Returns:
Dict with message_id, response, and tenant_id.
"""
from orchestrator.agents.builder import build_messages_with_memory
from orchestrator.agents.runner import run_agent
from orchestrator.memory.embedder import embed_text
from orchestrator.memory.long_term import retrieve_relevant
from orchestrator.memory.short_term import append_message, get_recent_messages
from shared.db import async_session_factory, engine
from shared.models.tenant import Agent
from shared.rls import configure_rls_hook, current_tenant_id
@@ -120,9 +228,9 @@ async def _process_message(
token = current_tenant_id.set(tenant_uuid)
slack_bot_token: str = ""
agent: Agent | None = None
try:
agent: Agent | None = None
async with async_session_factory() as session:
from sqlalchemy import select
@@ -173,13 +281,68 @@ async def _process_message(
"tenant_id": msg.tenant_id,
}
response_text = await run_agent(msg, agent)
# Determine user_id for memory scoping: use sender.user_id if available,
# fall back to thread_id (for non-identified channel contexts like webhooks)
user_id: str = (
msg.sender.user_id
if msg.sender and msg.sender.user_id
else (msg.thread_id or msg.id)
)
agent_id_str = str(agent.id)
user_text: str = msg.content.text or ""
# -------------------------------------------------------------------------
# Memory retrieval (before LLM call)
# -------------------------------------------------------------------------
import redis.asyncio as aioredis
from shared.config import settings
redis_client = aioredis.from_url(settings.redis_url)
try:
# 1. Short-term: Redis sliding window
recent_messages = await get_recent_messages(
redis_client, msg.tenant_id, agent_id_str, user_id
)
# 2. Long-term: pgvector similarity search
relevant_context: list[str] = []
if user_text:
query_embedding = embed_text(user_text)
rls_token = current_tenant_id.set(tenant_uuid)
try:
async with async_session_factory() as session:
relevant_context = await retrieve_relevant(
session,
tenant_uuid,
agent.id,
user_id,
query_embedding,
)
finally:
current_tenant_id.reset(rls_token)
finally:
await redis_client.aclose()
# -------------------------------------------------------------------------
# Build memory-enriched messages array and run LLM
# -------------------------------------------------------------------------
enriched_messages = build_messages_with_memory(
agent=agent,
current_message=user_text,
recent_messages=recent_messages,
relevant_context=relevant_context,
)
response_text = await run_agent(msg, agent, messages=enriched_messages)
logger.info(
"Message %s processed by agent=%s tenant=%s",
"Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
msg.id,
agent.id,
msg.tenant_id,
len(recent_messages),
len(relevant_context),
)
# Replace the "Thinking..." placeholder with the real response
@@ -191,6 +354,24 @@ async def _process_message(
text=response_text,
)
# -------------------------------------------------------------------------
# Memory persistence (after LLM response)
# -------------------------------------------------------------------------
redis_client2 = aioredis.from_url(settings.redis_url)
try:
# 3. Append both turns to Redis sliding window
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "user", user_text)
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
finally:
await redis_client2.aclose()
# 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
messages_to_embed = [
{"role": "user", "content": user_text},
{"role": "assistant", "content": response_text},
]
embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
return {
"message_id": msg.id,
"response": response_text,