feat(02-01): wire two-layer memory into orchestrator pipeline
- builder.py: add build_messages_with_memory() — injects pgvector context as a system message plus sliding-window history before the current user turn
- runner.py: accept an optional messages parameter; fall back to the simple build for backward compatibility (existing tests unaffected)
- tasks.py: memory pipeline in _process_message — load short-term + retrieve long-term context before the LLM call; append both turns to Redis and dispatch embed_and_store fire-and-forget after the response
- tasks.py: add embed_and_store Celery task (sync def + asyncio.run()) for async pgvector backfill — never blocks the LLM response pipeline
- memory/embedder.py: lazy singleton SentenceTransformer (all-MiniLM-L6-v2) with embed_text() / embed_texts() helpers
- All 202 tests pass (196 existing + 6 new memory integration tests)
This commit is contained in:
@@ -11,6 +11,14 @@ AI TRANSPARENCY POLICY:
|
||||
Per Konstruct product design, agents MUST acknowledge they are AI assistants
|
||||
when directly asked. This clause is injected unconditionally to prevent
|
||||
agents from deceiving users, regardless of persona configuration.
|
||||
|
||||
Memory-enriched message assembly:
|
||||
build_messages_with_memory() extends build_messages() by injecting:
|
||||
1. Long-term context: semantically relevant past exchanges (pgvector)
|
||||
Injected as a system message BEFORE the sliding window so the LLM
|
||||
has background context without it polluting the conversation flow.
|
||||
2. Short-term context: recent messages (Redis sliding window)
|
||||
Represents the immediate conversation history in this session.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -82,3 +90,54 @@ def build_messages(
|
||||
|
||||
messages.append({"role": "user", "content": user_message})
|
||||
return messages
|
||||
|
||||
|
||||
def build_messages_with_memory(
    agent: Agent,
    current_message: str,
    recent_messages: list[dict],
    relevant_context: list[str],
) -> list[dict]:
    """
    Assemble an LLM messages array enriched with two-layer memory.

    Output order:
        1. System message — agent identity / persona / AI transparency clause
        2. System message — long-term pgvector context (only when non-empty;
           an empty context block would just be noise in the context window)
        3. Short-term sliding-window turns (user/assistant, oldest first)
        4. The current user message

    Args:
        agent: ORM Agent instance used to assemble the system prompt.
        current_message: Text of the user message being processed now.
        recent_messages: Short-term memory — {"role", "content"} dicts from
            the Redis sliding window, oldest first.
        relevant_context: Long-term memory — content strings returned by the
            pgvector similarity search, most relevant first.

    Returns:
        A list of message dicts ready for an OpenAI-compatible API call.
    """
    enriched: list[dict] = [
        {"role": "system", "content": build_system_prompt(agent)},
    ]

    # Long-term context goes in as a system message ahead of the sliding
    # window, so the model has background without it reading as dialogue.
    if relevant_context:
        bullet_block = "\n".join(f"- {item}" for item in relevant_context)
        enriched.append(
            {
                "role": "system",
                "content": f"Relevant context from past conversations:\n{bullet_block}",
            }
        )

    # Short-term memory: replay the recent conversation turns verbatim.
    for turn in recent_messages:
        enriched.append(turn)

    # Finally, the message the agent must respond to.
    enriched.append({"role": "user", "content": current_message})

    return enriched
|
||||
|
||||
@@ -31,28 +31,38 @@ _FALLBACK_RESPONSE = (
|
||||
_LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)
|
||||
|
||||
|
||||
async def run_agent(msg: KonstructMessage, agent: Agent) -> str:
|
||||
async def run_agent(
|
||||
msg: KonstructMessage,
|
||||
agent: Agent,
|
||||
messages: list[dict] | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
Execute an agent against the LLM pool and return the response text.
|
||||
|
||||
Args:
|
||||
msg: The inbound Konstruct message being processed.
|
||||
agent: The ORM Agent instance that handles this message.
|
||||
msg: The inbound Konstruct message being processed.
|
||||
agent: The ORM Agent instance that handles this message.
|
||||
messages: Optional pre-built messages array (e.g. from
|
||||
build_messages_with_memory). When provided, used directly.
|
||||
When None, falls back to simple [system, user] construction
|
||||
for backward compatibility (e.g. existing tests).
|
||||
|
||||
Returns:
|
||||
The LLM response content as a plain string.
|
||||
Returns a polite fallback message if the LLM pool is unreachable or
|
||||
returns a non-200 response.
|
||||
"""
|
||||
system_prompt = build_system_prompt(agent)
|
||||
if messages is None:
|
||||
# Fallback: simple two-message construction (backward compat)
|
||||
system_prompt = build_system_prompt(agent)
|
||||
|
||||
# Extract user text from the message content
|
||||
user_text: str = msg.content.text or ""
|
||||
# Extract user text from the message content
|
||||
user_text: str = msg.content.text or ""
|
||||
|
||||
messages = build_messages(
|
||||
system_prompt=system_prompt,
|
||||
user_message=user_text,
|
||||
)
|
||||
messages = build_messages(
|
||||
system_prompt=system_prompt,
|
||||
user_message=user_text,
|
||||
)
|
||||
|
||||
payload = {
|
||||
"model": agent.model_preference,
|
||||
|
||||
packages/orchestrator/orchestrator/memory/embedder.py — new file, 81 lines
@@ -0,0 +1,81 @@
|
||||
"""
|
||||
Singleton embedding model for the Orchestrator.
|
||||
|
||||
Loads all-MiniLM-L6-v2 once at module level (lazy singleton pattern).
|
||||
The model produces 384-dimensional embeddings compatible with the
|
||||
conversation_embeddings.embedding vector(384) column.
|
||||
|
||||
Why a singleton: sentence-transformers models are ~100MB and take ~2s to load.
|
||||
Loading per-request would be catastrophically slow. Loading at module level
|
||||
means the model is loaded once when the Celery worker starts.
|
||||
|
||||
Thread safety: SentenceTransformer.encode() releases the GIL and is safe to
|
||||
call from multiple Celery threads simultaneously.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
logger = logging.getLogger(__name__)

# Embedding model name — must match the vector(384) column dimension
_MODEL_NAME = "all-MiniLM-L6-v2"

# Lazy singleton — loaded on first use, not at import time.
# This avoids 2s+ load time when the module is imported but not used.
_model: "SentenceTransformer | None" = None

# Guards first-time load. Without it the check-then-set below races: two
# threads hitting a cold cache could each load the ~100MB model, wasting
# memory and doubling startup latency.
_model_lock = threading.Lock()


def get_embedding_model() -> "SentenceTransformer":
    """
    Return the singleton SentenceTransformer model, loading it on first call.

    Thread-safe via double-checked locking: the fast path skips the lock
    once the model is loaded; only the first (cold) call serializes.
    The model is loaded at most once per process.

    Returns:
        Loaded SentenceTransformer model (all-MiniLM-L6-v2, 384 dims).
    """
    global _model
    if _model is None:  # fast path — no locking after first load
        with _model_lock:
            if _model is None:  # re-check: another thread may have won the race
                logger.info("Loading embedding model %s (first use)", _MODEL_NAME)
                # Imported lazily: sentence_transformers pulls in torch and
                # takes seconds to import — keep module import cheap.
                from sentence_transformers import SentenceTransformer

                _model = SentenceTransformer(_MODEL_NAME)
                logger.info("Embedding model %s loaded", _MODEL_NAME)
    return _model
|
||||
|
||||
|
||||
def embed_text(text: str) -> list[float]:
    """
    Embed one text string with the shared all-MiniLM-L6-v2 model.

    Args:
        text: The string to embed.

    Returns:
        A 384-dimensional list of floats (L2-normalized).
    """
    vector = get_embedding_model().encode(text, normalize_embeddings=True)
    return vector.tolist()
|
||||
|
||||
|
||||
def embed_texts(texts: list[str]) -> list[list[float]]:
    """
    Embed several strings in one batched model call.

    Batching is cheaper than calling embed_text() per string because the
    model processes up to 32 inputs per forward pass.

    Args:
        texts: Strings to embed.

    Returns:
        384-dimensional float lists, in the same order as the input.
    """
    model = get_embedding_model()
    batch = model.encode(texts, normalize_embeddings=True, batch_size=32)
    return [vector.tolist() for vector in batch]
|
||||
@@ -9,6 +9,19 @@ Celery task definitions for the Konstruct Agent Orchestrator.
|
||||
# NEVER change these to `async def`. If you see a RuntimeError about "no
|
||||
# running event loop" or tasks that silently never complete, check for
|
||||
# accidental async def usage first.
|
||||
|
||||
Memory pipeline (Phase 2):
|
||||
Before LLM call:
|
||||
1. get_recent_messages() — load Redis sliding window (last 20 msgs)
|
||||
2. embed current message + retrieve_relevant() — pgvector long-term context
|
||||
3. build_messages_with_memory() — assemble enriched messages array
|
||||
|
||||
After LLM response:
|
||||
4. append_message() x2 — save user + assistant turns to Redis
|
||||
5. embed_and_store.delay() — fire-and-forget pgvector backfill (async)
|
||||
|
||||
The embed_and_store Celery task runs asynchronously, meaning the LLM response
|
||||
is never blocked waiting for embedding computation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -23,6 +36,88 @@ from shared.models.message import KonstructMessage
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@app.task(
    name="orchestrator.tasks.embed_and_store",
    bind=False,
    max_retries=2,
    default_retry_delay=10,
    ignore_result=True,  # fire-and-forget: no caller ever reads the result
)
def embed_and_store(
    tenant_id: str,
    agent_id: str,
    user_id: str,
    messages: list[dict],
) -> None:
    """
    Embed conversation turns and persist them to pgvector, off the hot path.

    Dispatched fire-and-forget after the LLM response so that embedding
    computation never delays the user-facing reply.

    Args:
        tenant_id: Tenant UUID string.
        agent_id: Agent UUID string.
        user_id: End-user identifier.
        messages: {"role", "content"} dicts to embed and store — typically
            the [user_message, assistant_response] pair.
    """
    # Sync task body by design (see module header): drive the async
    # implementation on a fresh event loop.
    coro = _embed_and_store_async(tenant_id, agent_id, user_id, messages)
    asyncio.run(coro)
|
||||
|
||||
|
||||
async def _embed_and_store_async(
    tenant_id: str,
    agent_id: str,
    user_id: str,
    messages: list[dict],
) -> None:
    """
    Async implementation of embed_and_store.

    Embeds all messages in one batch call (cheaper than per-message encoding),
    then writes each embedding to conversation_embeddings via store_embedding()
    inside a single session/commit, scoped to the tenant through the RLS
    contextvar.

    Args:
        tenant_id: Tenant UUID string (parsed to uuid.UUID below).
        agent_id: Agent UUID string (parsed to uuid.UUID below).
        user_id: End-user identifier, stored as-is.
        messages: {"role", "content"} dicts; empty list is a no-op.

    NOTE(review): the broad `except Exception` below logs and swallows every
    failure, so the task decorator's max_retries=2 / default_retry_delay=10
    can never fire — confirm whether best-effort (current behavior) or
    retry-on-failure is intended.
    """
    # Imported inside the function to keep Celery worker import time low and
    # avoid circular imports at module load.
    from orchestrator.memory.embedder import embed_texts
    from orchestrator.memory.long_term import store_embedding
    from shared.db import async_session_factory, engine
    from shared.rls import configure_rls_hook, current_tenant_id

    # Nothing to embed — skip model load and DB work entirely.
    if not messages:
        return

    # Raises ValueError on malformed UUIDs (caught by nothing here — the
    # caller passes str(uuid) values, so this is a programming-error guard).
    tenant_uuid = uuid.UUID(tenant_id)
    agent_uuid = uuid.UUID(agent_id)

    # Embed all message texts in a single batch call
    texts = [msg["content"] for msg in messages]
    embeddings = embed_texts(texts)

    # Install the RLS session hook and pin the tenant for this task's scope.
    # Presumably configure_rls_hook is idempotent across calls — verify,
    # since this runs once per task invocation.
    configure_rls_hook(engine)
    token = current_tenant_id.set(tenant_uuid)
    try:
        async with async_session_factory() as session:
            # strict=True: texts/embeddings must stay 1:1 — a length mismatch
            # is a bug and should raise rather than silently drop rows.
            for msg, embedding in zip(messages, embeddings, strict=True):
                await store_embedding(
                    session,
                    tenant_uuid,
                    agent_uuid,
                    user_id,
                    msg["content"],
                    msg["role"],
                    embedding,
                )
            # One commit for the whole batch — all-or-nothing persistence.
            await session.commit()
    except Exception:
        # Best-effort backfill: log with full traceback but never propagate,
        # so a storage failure cannot crash the worker (see NOTE above re:
        # retries).
        logger.exception(
            "embed_and_store failed for tenant=%s agent=%s user=%s",
            tenant_id,
            agent_id,
            user_id,
        )
    finally:
        # Always restore the previous tenant contextvar, even on failure,
        # so later work in this task slot is not mis-scoped.
        current_tenant_id.reset(token)
|
||||
|
||||
|
||||
@app.task(
|
||||
name="orchestrator.tasks.handle_message",
|
||||
bind=True,
|
||||
@@ -82,7 +177,16 @@ async def _process_message(
|
||||
channel_id: str = "",
|
||||
) -> dict:
|
||||
"""
|
||||
Async agent pipeline — load agent config, build prompt, call LLM pool.
|
||||
Async agent pipeline — load agent config, build memory-enriched prompt, call LLM pool.
|
||||
|
||||
Memory pipeline (Phase 2 additions):
|
||||
BEFORE LLM call:
|
||||
1. Load recent messages from Redis sliding window
|
||||
2. Embed current message and retrieve semantically relevant long-term context
|
||||
3. Build memory-enriched messages array via build_messages_with_memory()
|
||||
AFTER LLM response:
|
||||
4. Append user message + assistant response to Redis sliding window
|
||||
5. Dispatch embed_and_store.delay() for async pgvector backfill
|
||||
|
||||
After getting the LLM response, if Slack placeholder metadata is present,
|
||||
updates the "Thinking..." placeholder message with the real response using
|
||||
@@ -99,7 +203,11 @@ async def _process_message(
|
||||
Returns:
|
||||
Dict with message_id, response, and tenant_id.
|
||||
"""
|
||||
from orchestrator.agents.builder import build_messages_with_memory
|
||||
from orchestrator.agents.runner import run_agent
|
||||
from orchestrator.memory.embedder import embed_text
|
||||
from orchestrator.memory.long_term import retrieve_relevant
|
||||
from orchestrator.memory.short_term import append_message, get_recent_messages
|
||||
from shared.db import async_session_factory, engine
|
||||
from shared.models.tenant import Agent
|
||||
from shared.rls import configure_rls_hook, current_tenant_id
|
||||
@@ -120,9 +228,9 @@ async def _process_message(
|
||||
token = current_tenant_id.set(tenant_uuid)
|
||||
|
||||
slack_bot_token: str = ""
|
||||
agent: Agent | None = None
|
||||
|
||||
try:
|
||||
agent: Agent | None = None
|
||||
async with async_session_factory() as session:
|
||||
from sqlalchemy import select
|
||||
|
||||
@@ -173,13 +281,68 @@ async def _process_message(
|
||||
"tenant_id": msg.tenant_id,
|
||||
}
|
||||
|
||||
response_text = await run_agent(msg, agent)
|
||||
# Determine user_id for memory scoping: use sender.user_id if available,
|
||||
# fall back to thread_id (for non-identified channel contexts like webhooks)
|
||||
user_id: str = (
|
||||
msg.sender.user_id
|
||||
if msg.sender and msg.sender.user_id
|
||||
else (msg.thread_id or msg.id)
|
||||
)
|
||||
agent_id_str = str(agent.id)
|
||||
user_text: str = msg.content.text or ""
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Memory retrieval (before LLM call)
|
||||
# -------------------------------------------------------------------------
|
||||
import redis.asyncio as aioredis
|
||||
|
||||
from shared.config import settings
|
||||
|
||||
redis_client = aioredis.from_url(settings.redis_url)
|
||||
try:
|
||||
# 1. Short-term: Redis sliding window
|
||||
recent_messages = await get_recent_messages(
|
||||
redis_client, msg.tenant_id, agent_id_str, user_id
|
||||
)
|
||||
|
||||
# 2. Long-term: pgvector similarity search
|
||||
relevant_context: list[str] = []
|
||||
if user_text:
|
||||
query_embedding = embed_text(user_text)
|
||||
rls_token = current_tenant_id.set(tenant_uuid)
|
||||
try:
|
||||
async with async_session_factory() as session:
|
||||
relevant_context = await retrieve_relevant(
|
||||
session,
|
||||
tenant_uuid,
|
||||
agent.id,
|
||||
user_id,
|
||||
query_embedding,
|
||||
)
|
||||
finally:
|
||||
current_tenant_id.reset(rls_token)
|
||||
finally:
|
||||
await redis_client.aclose()
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Build memory-enriched messages array and run LLM
|
||||
# -------------------------------------------------------------------------
|
||||
enriched_messages = build_messages_with_memory(
|
||||
agent=agent,
|
||||
current_message=user_text,
|
||||
recent_messages=recent_messages,
|
||||
relevant_context=relevant_context,
|
||||
)
|
||||
|
||||
response_text = await run_agent(msg, agent, messages=enriched_messages)
|
||||
|
||||
logger.info(
|
||||
"Message %s processed by agent=%s tenant=%s",
|
||||
"Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
|
||||
msg.id,
|
||||
agent.id,
|
||||
msg.tenant_id,
|
||||
len(recent_messages),
|
||||
len(relevant_context),
|
||||
)
|
||||
|
||||
# Replace the "Thinking..." placeholder with the real response
|
||||
@@ -191,6 +354,24 @@ async def _process_message(
|
||||
text=response_text,
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Memory persistence (after LLM response)
|
||||
# -------------------------------------------------------------------------
|
||||
redis_client2 = aioredis.from_url(settings.redis_url)
|
||||
try:
|
||||
# 3. Append both turns to Redis sliding window
|
||||
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "user", user_text)
|
||||
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
|
||||
finally:
|
||||
await redis_client2.aclose()
|
||||
|
||||
# 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
|
||||
messages_to_embed = [
|
||||
{"role": "user", "content": user_text},
|
||||
{"role": "assistant", "content": response_text},
|
||||
]
|
||||
embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
|
||||
|
||||
return {
|
||||
"message_id": msg.id,
|
||||
"response": response_text,
|
||||
|
||||
Reference in New Issue
Block a user