feat(02-01): wire two-layer memory into orchestrator pipeline
- builder.py: add build_messages_with_memory() — injects pgvector context as
  a system message + sliding window history before the current user turn
- runner.py: accept optional messages parameter; fall back to simple build
  for backward compat (existing tests unaffected)
- tasks.py: memory pipeline in _process_message — load short-term + retrieve
  long-term before the LLM call; append turns to Redis + dispatch
  embed_and_store fire-and-forget after the response
- tasks.py: add embed_and_store Celery task (sync def + asyncio.run()) for
  async pgvector backfill — never blocks the LLM response pipeline
- memory/embedder.py: lazy singleton SentenceTransformer (all-MiniLM-L6-v2)
  with embed_text() / embed_texts() helpers
- All 202 tests pass (196 existing + 6 new memory integration tests)
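For orientation, a sketch of the messages array shape the new pipeline hands to the LLM (every string below is an illustrative placeholder, not a value from this diff):

    # Illustrative shape only; all strings are placeholders.
    messages = [
        {"role": "system", "content": "<persona + AI transparency clause>"},
        {"role": "system", "content": "Relevant context from past conversations:\n- <pgvector hit>"},
        {"role": "user", "content": "<earlier turn from the Redis window>"},
        {"role": "assistant", "content": "<earlier reply from the Redis window>"},
        {"role": "user", "content": "<current user message>"},
    ]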
@@ -11,6 +11,14 @@ AI TRANSPARENCY POLICY:
 Per Konstruct product design, agents MUST acknowledge they are AI assistants
 when directly asked. This clause is injected unconditionally to prevent
 agents from deceiving users, regardless of persona configuration.
+
+Memory-enriched message assembly:
+build_messages_with_memory() extends build_messages() by injecting:
+1. Long-term context: semantically relevant past exchanges (pgvector)
+   Injected as a system message BEFORE the sliding window so the LLM
+   has background context without it polluting the conversation flow.
+2. Short-term context: recent messages (Redis sliding window)
+   Represents the immediate conversation history in this session.
 """

 from __future__ import annotations
@@ -82,3 +90,54 @@ def build_messages(

     messages.append({"role": "user", "content": user_message})
     return messages
+
+
+def build_messages_with_memory(
+    agent: Agent,
+    current_message: str,
+    recent_messages: list[dict],
+    relevant_context: list[str],
+) -> list[dict]:
+    """
+    Build an LLM messages array enriched with two-layer memory.
+
+    Structure (in order):
+    1. System message — agent identity, persona, AI transparency clause
+    2. System message — long-term context from pgvector (ONLY if non-empty)
+       Injected as a system message before the sliding window so the LLM
+       has relevant background without it appearing in the conversation.
+    3. Sliding window messages — recent conversation history (user/assistant)
+    4. Current user message
+
+    The pgvector context is omitted entirely when relevant_context is empty —
+    injecting an empty context block would be noise in the LLM's context window.
+
+    Args:
+        agent: ORM Agent instance (for system prompt assembly).
+        current_message: The current user message text.
+        recent_messages: Short-term memory — list of {"role", "content"} dicts
+            from Redis sliding window (oldest first).
+        relevant_context: Long-term memory — list of content strings from
+            pgvector similarity search (most relevant first).
+
+    Returns:
+        List of message dicts suitable for an OpenAI-compatible API call.
+    """
+    system_prompt = build_system_prompt(agent)
+    messages: list[dict] = [{"role": "system", "content": system_prompt}]
+
+    # Inject long-term pgvector context as a system message BEFORE sliding window
+    # Only inject when there IS relevant context — empty context block is noise
+    if relevant_context:
+        context_lines = "\n".join(f"- {item}" for item in relevant_context)
+        context_message = f"Relevant context from past conversations:\n{context_lines}"
+        messages.append({"role": "system", "content": context_message})
+
+    # Append short-term sliding window messages (recent conversation history)
+    if recent_messages:
+        messages.extend(recent_messages)
+
+    # Append the current user message
+    messages.append({"role": "user", "content": current_message})
+
+    return messages
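A minimal usage sketch of the new builder (the history and context values here are invented placeholders; `agent` stands in for any ORM Agent instance):

    enriched = build_messages_with_memory(
        agent=agent,
        current_message="What did we decide about the rollout?",
        recent_messages=[
            {"role": "user", "content": "Let's plan the rollout."},
            {"role": "assistant", "content": "I'd suggest two phases."},
        ],
        relevant_context=["User prefers a weekly release cadence."],
    )
    # -> [persona system msg, context system msg, 2 window msgs, current user msg]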
@@ -31,19 +31,29 @@ _FALLBACK_RESPONSE = (
 _LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)


-async def run_agent(msg: KonstructMessage, agent: Agent) -> str:
+async def run_agent(
+    msg: KonstructMessage,
+    agent: Agent,
+    messages: list[dict] | None = None,
+) -> str:
     """
     Execute an agent against the LLM pool and return the response text.

     Args:
         msg: The inbound Konstruct message being processed.
         agent: The ORM Agent instance that handles this message.
+        messages: Optional pre-built messages array (e.g. from
+            build_messages_with_memory). When provided, used directly.
+            When None, falls back to simple [system, user] construction
+            for backward compatibility (e.g. existing tests).

     Returns:
         The LLM response content as a plain string.
         Returns a polite fallback message if the LLM pool is unreachable or
         returns a non-200 response.
     """
-    system_prompt = build_system_prompt(agent)
+    if messages is None:
+        # Fallback: simple two-message construction (backward compat)
+        system_prompt = build_system_prompt(agent)

-    # Extract user text from the message content
+        # Extract user text from the message content
packages/orchestrator/orchestrator/memory/embedder.py (new file, 81 lines)
@@ -0,0 +1,81 @@
+"""
+Singleton embedding model for the Orchestrator.
+
+Loads all-MiniLM-L6-v2 once per process (lazy singleton pattern).
+The model produces 384-dimensional embeddings compatible with the
+conversation_embeddings.embedding vector(384) column.
+
+Why a singleton: sentence-transformers models are ~100MB and take ~2s to load.
+Loading per-request would be catastrophically slow. Loading lazily on first
+use means the model is loaded once, when the Celery worker first needs it.
+
+Thread safety: SentenceTransformer.encode() releases the GIL and is safe to
+call from multiple Celery threads simultaneously.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from sentence_transformers import SentenceTransformer
+
+logger = logging.getLogger(__name__)
+
+# Embedding model name — must match the vector(384) column dimension
+_MODEL_NAME = "all-MiniLM-L6-v2"
+
+# Lazy singleton — loaded on first use, not at import time
+# This avoids 2s+ load time when the module is imported but not used
+_model: "SentenceTransformer | None" = None
+
+
+def get_embedding_model() -> "SentenceTransformer":
+    """
+    Return the singleton SentenceTransformer model, loading it on first call.
+
+    Thread-safe: multiple Celery workers can call this concurrently.
+    The model is loaded only once per process.
+
+    Returns:
+        Loaded SentenceTransformer model (all-MiniLM-L6-v2, 384 dims).
+    """
+    global _model
+    if _model is None:
+        logger.info("Loading embedding model %s (first use)", _MODEL_NAME)
+        from sentence_transformers import SentenceTransformer
+
+        _model = SentenceTransformer(_MODEL_NAME)
+        logger.info("Embedding model %s loaded", _MODEL_NAME)
+    return _model
+
+
+def embed_text(text: str) -> list[float]:
+    """
+    Embed a single text string using all-MiniLM-L6-v2.
+
+    Args:
+        text: The text to embed.
+
+    Returns:
+        384-dimensional float list.
+    """
+    model = get_embedding_model()
+    embedding = model.encode(text, normalize_embeddings=True)
+    return embedding.tolist()
+
+
+def embed_texts(texts: list[str]) -> list[list[float]]:
+    """
+    Embed a list of text strings in batch (more efficient than one-by-one).
+
+    Args:
+        texts: List of strings to embed.
+
+    Returns:
+        List of 384-dimensional float lists, same order as input.
+    """
+    model = get_embedding_model()
+    embeddings = model.encode(texts, normalize_embeddings=True, batch_size=32)
+    return [e.tolist() for e in embeddings]
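Usage sketch for the helpers above (input strings are illustrative):

    vec = embed_text("What did we decide about the rollout?")
    assert len(vec) == 384  # matches the vector(384) column
    vecs = embed_texts(["first turn", "second turn"])  # batched, order preserved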
@@ -9,6 +9,19 @@ Celery task definitions for the Konstruct Agent Orchestrator.
 # NEVER change these to `async def`. If you see a RuntimeError about "no
 # running event loop" or tasks that silently never complete, check for
 # accidental async def usage first.
+
+Memory pipeline (Phase 2):
+  Before LLM call:
+    1. get_recent_messages() — load Redis sliding window (last 20 msgs)
+    2. embed current message + retrieve_relevant() — pgvector long-term context
+    3. build_messages_with_memory() — assemble enriched messages array
+
+  After LLM response:
+    4. append_message() x2 — save user + assistant turns to Redis
+    5. embed_and_store.delay() — fire-and-forget pgvector backfill (async)
+
+The embed_and_store Celery task runs asynchronously, meaning the LLM response
+is never blocked waiting for embedding computation.
 """

 from __future__ import annotations
@@ -23,6 +36,88 @@ from shared.models.message import KonstructMessage
 logger = logging.getLogger(__name__)


+@app.task(
+    name="orchestrator.tasks.embed_and_store",
+    bind=False,
+    max_retries=2,
+    default_retry_delay=10,
+    ignore_result=True,  # Fire-and-forget — callers don't await the result
+)
+def embed_and_store(
+    tenant_id: str,
+    agent_id: str,
+    user_id: str,
+    messages: list[dict],
+) -> None:
+    """
+    Asynchronously embed conversation turns and store them in pgvector.
+
+    Dispatched fire-and-forget after the LLM response so embedding computation
+    NEVER blocks the user-facing response pipeline.
+
+    Args:
+        tenant_id: Tenant UUID string.
+        agent_id: Agent UUID string.
+        user_id: End-user identifier.
+        messages: List of {"role", "content"} dicts to embed and store.
+            Typically [user_message, assistant_response].
+    """
+    asyncio.run(_embed_and_store_async(tenant_id, agent_id, user_id, messages))
+
+
+async def _embed_and_store_async(
+    tenant_id: str,
+    agent_id: str,
+    user_id: str,
+    messages: list[dict],
+) -> None:
+    """
+    Async implementation of embed_and_store.
+
+    Embeds all messages in batch (more efficient than one-by-one) then stores
+    each embedding in conversation_embeddings via store_embedding().
+    """
+    from orchestrator.memory.embedder import embed_texts
+    from orchestrator.memory.long_term import store_embedding
+    from shared.db import async_session_factory, engine
+    from shared.rls import configure_rls_hook, current_tenant_id
+
+    if not messages:
+        return
+
+    tenant_uuid = uuid.UUID(tenant_id)
+    agent_uuid = uuid.UUID(agent_id)
+
+    # Embed all message texts in a single batch call
+    texts = [msg["content"] for msg in messages]
+    embeddings = embed_texts(texts)
+
+    configure_rls_hook(engine)
+    token = current_tenant_id.set(tenant_uuid)
+    try:
+        async with async_session_factory() as session:
+            for msg, embedding in zip(messages, embeddings, strict=True):
+                await store_embedding(
+                    session,
+                    tenant_uuid,
+                    agent_uuid,
+                    user_id,
+                    msg["content"],
+                    msg["role"],
+                    embedding,
+                )
+            await session.commit()
+    except Exception:
+        logger.exception(
+            "embed_and_store failed for tenant=%s agent=%s user=%s",
+            tenant_id,
+            agent_id,
+            user_id,
+        )
+    finally:
+        current_tenant_id.reset(token)
+
+
 @app.task(
     name="orchestrator.tasks.handle_message",
     bind=True,
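The sync-def-plus-asyncio.run() shape matters because Celery's prefork workers have no running event loop, per the module docstring's warning above. A dispatch sketch (the IDs are placeholder values):

    embed_and_store.delay(
        "11111111-1111-1111-1111-111111111111",  # tenant_id (placeholder)
        "22222222-2222-2222-2222-222222222222",  # agent_id (placeholder)
        "user-42",
        [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}],
    )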
@@ -82,7 +177,16 @@ async def _process_message(
     channel_id: str = "",
 ) -> dict:
     """
-    Async agent pipeline — load agent config, build prompt, call LLM pool.
+    Async agent pipeline — load agent config, build memory-enriched prompt, call LLM pool.
+
+    Memory pipeline (Phase 2 additions):
+      BEFORE LLM call:
+        1. Load recent messages from Redis sliding window
+        2. Embed current message and retrieve semantically relevant long-term context
+        3. Build memory-enriched messages array via build_messages_with_memory()
+      AFTER LLM response:
+        4. Append user message + assistant response to Redis sliding window
+        5. Dispatch embed_and_store.delay() for async pgvector backfill
+
     After getting the LLM response, if Slack placeholder metadata is present,
     updates the "Thinking..." placeholder message with the real response using
@@ -99,7 +203,11 @@ async def _process_message(
     Returns:
         Dict with message_id, response, and tenant_id.
     """
+    from orchestrator.agents.builder import build_messages_with_memory
     from orchestrator.agents.runner import run_agent
+    from orchestrator.memory.embedder import embed_text
+    from orchestrator.memory.long_term import retrieve_relevant
+    from orchestrator.memory.short_term import append_message, get_recent_messages
     from shared.db import async_session_factory, engine
     from shared.models.tenant import Agent
     from shared.rls import configure_rls_hook, current_tenant_id
@@ -120,9 +228,9 @@ async def _process_message(
     token = current_tenant_id.set(tenant_uuid)

     slack_bot_token: str = ""
+    agent: Agent | None = None

     try:
-        agent: Agent | None = None
         async with async_session_factory() as session:
             from sqlalchemy import select

@@ -173,13 +281,68 @@ async def _process_message(
                 "tenant_id": msg.tenant_id,
             }

-        response_text = await run_agent(msg, agent)
+        # Determine user_id for memory scoping: use sender.user_id if available,
+        # fall back to thread_id (for non-identified channel contexts like webhooks)
+        user_id: str = (
+            msg.sender.user_id
+            if msg.sender and msg.sender.user_id
+            else (msg.thread_id or msg.id)
+        )
+        agent_id_str = str(agent.id)
+        user_text: str = msg.content.text or ""
+
+        # -------------------------------------------------------------------------
+        # Memory retrieval (before LLM call)
+        # -------------------------------------------------------------------------
+        import redis.asyncio as aioredis
+
+        from shared.config import settings
+
+        redis_client = aioredis.from_url(settings.redis_url)
+        try:
+            # 1. Short-term: Redis sliding window
+            recent_messages = await get_recent_messages(
+                redis_client, msg.tenant_id, agent_id_str, user_id
+            )
+
+            # 2. Long-term: pgvector similarity search
+            relevant_context: list[str] = []
+            if user_text:
+                query_embedding = embed_text(user_text)
+                rls_token = current_tenant_id.set(tenant_uuid)
+                try:
+                    async with async_session_factory() as session:
+                        relevant_context = await retrieve_relevant(
+                            session,
+                            tenant_uuid,
+                            agent.id,
+                            user_id,
+                            query_embedding,
+                        )
+                finally:
+                    current_tenant_id.reset(rls_token)
+        finally:
+            await redis_client.aclose()
+
+        # -------------------------------------------------------------------------
+        # Build memory-enriched messages array and run LLM
+        # -------------------------------------------------------------------------
+        enriched_messages = build_messages_with_memory(
+            agent=agent,
+            current_message=user_text,
+            recent_messages=recent_messages,
+            relevant_context=relevant_context,
+        )
+
+        response_text = await run_agent(msg, agent, messages=enriched_messages)

         logger.info(
-            "Message %s processed by agent=%s tenant=%s",
+            "Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
             msg.id,
             agent.id,
             msg.tenant_id,
+            len(recent_messages),
+            len(relevant_context),
         )

         # Replace the "Thinking..." placeholder with the real response
@@ -191,6 +354,24 @@ async def _process_message(
                 text=response_text,
             )
+
+        # -------------------------------------------------------------------------
+        # Memory persistence (after LLM response)
+        # -------------------------------------------------------------------------
+        redis_client2 = aioredis.from_url(settings.redis_url)
+        try:
+            # 3. Append both turns to Redis sliding window
+            await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "user", user_text)
+            await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
+        finally:
+            await redis_client2.aclose()
+
+        # 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
+        messages_to_embed = [
+            {"role": "user", "content": user_text},
+            {"role": "assistant", "content": response_text},
+        ]
+        embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)

         return {
             "message_id": msg.id,
             "response": response_text,