feat(02-01): add two-layer memory system — Redis sliding window + pgvector long-term

- ConversationEmbedding ORM model with Vector(384) column (pgvector)
- memory_short_key, escalation_status_key, pending_tool_confirm_key in redis_keys.py
- orchestrator/memory/short_term.py: RPUSH/LTRIM sliding window (get_recent_messages, append_message)
- orchestrator/memory/long_term.py: pgvector HNSW cosine search (retrieve_relevant, store_embedding)
- Migration 002: conversation_embeddings table, HNSW index, RLS with FORCE, SELECT/INSERT only
- 10 unit tests (fakeredis), 6 integration tests (pgvector) — all passing
- Auto-fix [Rule 3]: postgres image updated to pgvector/pgvector:pg16 (extension required)
2026-03-23 14:41:57 -06:00
parent 370a860622
commit 28a5ee996e
11 changed files with 998 additions and 1 deletion

View File

@@ -0,0 +1,22 @@
"""
Konstruct Agent Memory Layer.
Two-layer conversational memory system:
1. Short-term (Redis sliding window):
- Stores the last N messages verbatim
- Low latency: a single in-memory Redis round-trip per read
- Provides immediate in-session context continuity
- See: short_term.py
2. Long-term (pgvector HNSW similarity search):
- Stores all messages as semantic embeddings
- Retrieves top-K semantically relevant past exchanges
- Provides cross-session recall (user preferences, past issues, etc.)
- Embedding model: all-MiniLM-L6-v2 (384 dimensions)
- See: long_term.py
Memory scoping: All operations are keyed by (tenant_id, agent_id, user_id).
This ensures complete isolation — no cross-tenant, cross-agent, or cross-user
contamination is possible.
"""

View File

@@ -0,0 +1,164 @@
"""
pgvector-backed long-term conversational memory.
Stores conversation turns as 384-dimensional embeddings (all-MiniLM-L6-v2)
and retrieves semantically relevant past exchanges using HNSW cosine similarity
search.
CRITICAL SECURITY CONSTRAINTS:
1. ALL queries MUST pre-filter by (tenant_id, agent_id, user_id) BEFORE the
ANN operator. This prevents cross-tenant, cross-agent, or cross-user data
leakage even in the face of embedding collisions.
2. Cosine similarity threshold filters out low-relevance results — only content
genuinely related to the query should be injected into the LLM prompt.
3. RLS (Row Level Security) is the DB-level backstop — the application-level
filters above are the primary guard; RLS is the safety net.
pgvector cosine operations:
- <=> operator: cosine DISTANCE (0 = identical, 2 = opposite)
- cosine similarity = 1 - cosine distance
- A threshold of 0.75 means: only return results where
1 - (embedding <=> query) >= 0.75 → distance <= 0.25
"""
from __future__ import annotations
import logging
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
logger = logging.getLogger(__name__)
async def retrieve_relevant(
session: AsyncSession,
tenant_id: uuid.UUID,
agent_id: uuid.UUID,
user_id: str,
query_embedding: list[float],
top_k: int = 3,
threshold: float = 0.75,
) -> list[str]:
"""
Retrieve semantically relevant past conversation content.
Performs an HNSW approximate nearest neighbor search scoped strictly to
(tenant_id, agent_id, user_id). Results below the cosine similarity
threshold are discarded.
IMPORTANT: The (tenant_id, agent_id, user_id) pre-filter is applied
BEFORE the ANN operator to guarantee isolation. This is not optional —
removing these WHERE clauses would allow cross-tenant data leakage.
Args:
session: Async SQLAlchemy session (must have RLS configured).
tenant_id: Tenant UUID — mandatory for isolation.
agent_id: Agent UUID — mandatory for isolation.
user_id: End-user identifier — mandatory for isolation.
query_embedding: 384-dimensional query vector (all-MiniLM-L6-v2).
top_k: Maximum number of results to return. Default 3.
threshold: Minimum cosine similarity (0.0 to 1.0). Default 0.75.
Set lower for broader recall, higher for precision.
Returns:
List of content strings (original message text), most relevant first.
Returns empty list if no results meet the threshold.
"""
    # Convert embedding list to pgvector text format: '[0.1,0.2,...]'
vec_str = "[" + ",".join(str(float(v)) for v in query_embedding) + "]"
# CRITICAL: pre-filter by all three isolation columns BEFORE ANN search.
# The ORDER BY uses <=> (cosine distance) — lower is more similar.
# We convert to similarity (1 - distance) to apply the threshold filter.
stmt = text("""
SELECT content, 1 - (embedding <=> CAST(:query AS vector)) AS similarity
FROM conversation_embeddings
WHERE tenant_id = :tenant_id
AND agent_id = :agent_id
AND user_id = :user_id
AND 1 - (embedding <=> CAST(:query AS vector)) >= :threshold
ORDER BY embedding <=> CAST(:query AS vector)
LIMIT :top_k
""")
try:
result = await session.execute(
stmt,
{
"query": vec_str,
"tenant_id": str(tenant_id),
"agent_id": str(agent_id),
"user_id": user_id,
"threshold": threshold,
"top_k": top_k,
},
)
rows = result.fetchall()
except Exception:
logger.exception(
"pgvector retrieve_relevant failed for tenant=%s agent=%s user=%s",
tenant_id,
agent_id,
user_id,
)
return []
return [row.content for row in rows]
async def store_embedding(
session: AsyncSession,
tenant_id: uuid.UUID,
agent_id: uuid.UUID,
user_id: str,
content: str,
role: str,
embedding: list[float],
) -> None:
"""
Store a conversation turn embedding in the database.
Inserts a new row into conversation_embeddings. Embeddings are immutable
once stored — there is no UPDATE path. This matches the audit-log-like
nature of conversation history.
Args:
session: Async SQLAlchemy session (must have RLS configured).
tenant_id: Tenant UUID for isolation.
agent_id: Agent UUID for isolation.
user_id: End-user identifier for isolation.
content: Original message text.
role: "user" or "assistant".
embedding: 384-dimensional float list (all-MiniLM-L6-v2).
"""
vec_str = "[" + ",".join(str(float(v)) for v in embedding) + "]"
stmt = text("""
INSERT INTO conversation_embeddings
(id, tenant_id, agent_id, user_id, content, role, embedding)
VALUES
(gen_random_uuid(), :tenant_id, :agent_id, :user_id, :content, :role, CAST(:embedding AS vector))
""")
try:
await session.execute(
stmt,
{
"tenant_id": str(tenant_id),
"agent_id": str(agent_id),
"user_id": user_id,
"content": content,
"role": role,
"embedding": vec_str,
},
)
except Exception:
logger.exception(
"pgvector store_embedding failed for tenant=%s agent=%s user=%s",
tenant_id,
agent_id,
user_id,
)
raise
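The distance/similarity arithmetic described in the module docstring (similarity = 1 - distance, threshold 0.75 ↔ distance ≤ 0.25) can be checked with plain Python. A toy example using 2-dimensional vectors in place of the real 384-dimensional embeddings:

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Mirror of pgvector's <=> operator: 1 - cos(angle); 0 = identical, 2 = opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


q = [1.0, 0.0]
close = [0.9, 0.1]  # nearly the same direction as q
far = [0.0, 1.0]    # orthogonal to q

d_close = cosine_distance(q, close)  # small distance, similarity ≈ 0.994 → kept
d_far = cosine_distance(q, far)      # distance exactly 1.0, similarity 0.0 → filtered

passes_close = (1.0 - d_close) >= 0.75  # same filter retrieve_relevant applies in SQL
passes_far = (1.0 - d_far) >= 0.75
```

This is only a sanity check of the arithmetic; the production path computes the same expression inside Postgres so the HNSW index can drive the ordering.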

View File

@@ -0,0 +1,111 @@
"""
Redis sliding window for short-term conversational memory.
Implements a RPUSH + LTRIM pattern:
- RPUSH appends new messages to the right (tail) of the list
- LTRIM trims the list to the last `window` entries
- LRANGE retrieves all current entries
This gives O(1) append + O(1) trim + O(N) read where N <= window size.
Key format: {tenant_id}:memory:short:{agent_id}:{user_id}
Messages are stored as JSON objects with "role" and "content" keys,
matching the OpenAI chat messages format for direct injection into
the LLM messages array.
Design decisions:
- No TTL: message retention is indefinite per user preference. If TTL-based
expiry is needed in the future, add it via a separate expiry policy.
- No compression: messages are stored as plain JSON. At 20 messages * ~200
bytes average, storage per user/agent is ~4KB — negligible.
- Parameterized window: callers control the window size, defaulting to 20.
This allows future policy changes without code modification.
"""
from __future__ import annotations
import json
import logging
from shared.redis_keys import memory_short_key
logger = logging.getLogger(__name__)
async def get_recent_messages(
redis: object,
tenant_id: str,
agent_id: str,
user_id: str,
n: int = 20,
) -> list[dict[str, str]]:
"""
Retrieve the most recent N messages from the sliding window.
Returns messages in insertion order (oldest first) — this matches the
expected LLM message array format where conversation flows chronologically.
Args:
redis: Redis async client (redis.asyncio.Redis or compatible).
tenant_id: Konstruct tenant identifier.
agent_id: Agent UUID string.
user_id: End-user identifier (channel-native).
n: Maximum number of messages to retrieve. Default 20.
Pass a larger value than the window size to get all messages.
Returns:
List of message dicts with "role" and "content" keys, oldest first.
Returns empty list if no messages exist for this key.
"""
key = memory_short_key(tenant_id, agent_id, user_id)
# LRANGE -n -1 returns the last n items in insertion order
raw_messages = await redis.lrange(key, -n, -1) # type: ignore[union-attr]
messages: list[dict[str, str]] = []
for raw in raw_messages:
try:
msg = json.loads(raw)
messages.append({"role": str(msg["role"]), "content": str(msg["content"])})
except (json.JSONDecodeError, KeyError):
logger.warning("Malformed message in Redis key %s — skipping", key)
return messages
async def append_message(
redis: object,
tenant_id: str,
agent_id: str,
user_id: str,
role: str,
content: str,
window: int = 20,
) -> None:
"""
Append a message to the sliding window and trim to window size.
Uses a pipeline to make RPUSH + LTRIM atomic — no race condition
between append and trim even under concurrent writes.
Args:
redis: Redis async client.
tenant_id: Konstruct tenant identifier.
agent_id: Agent UUID string.
user_id: End-user identifier (channel-native).
role: Message role: "user" or "assistant".
content: Message text content.
window: Maximum number of messages to retain. Default 20.
After this operation the list will contain at most `window`
entries (the most recent ones).
"""
key = memory_short_key(tenant_id, agent_id, user_id)
serialized = json.dumps({"role": role, "content": content})
# Pipeline ensures RPUSH + LTRIM are sent atomically
pipe = redis.pipeline() # type: ignore[union-attr]
pipe.rpush(key, serialized)
# LTRIM to last `window` entries: keep index -(window) through -1
pipe.ltrim(key, -window, -1)
await pipe.execute()
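The RPUSH + LTRIM windowing semantics can be modeled without Redis. A toy in-memory equivalent (not the Redis client, purely to illustrate the trim behavior):

```python
import json


class SlidingWindow:
    """Toy model of the RPUSH + LTRIM pattern on a single Redis list key."""

    def __init__(self, window: int = 20) -> None:
        self.window = window
        self.items: list[str] = []

    def append(self, role: str, content: str) -> None:
        self.items.append(json.dumps({"role": role, "content": content}))  # RPUSH
        self.items = self.items[-self.window:]                             # LTRIM -window -1

    def recent(self, n: int) -> list[dict[str, str]]:
        return [json.loads(raw) for raw in self.items[-n:]]                # LRANGE -n -1


w = SlidingWindow(window=20)
for i in range(25):
    w.append("user", f"message {i}")
# After 25 appends with window=20, only messages 5..24 remain, oldest first.
```

The production code gets the same guarantee from the RPUSH + LTRIM pipeline: the list never holds more than `window` entries, and reads come back in insertion order.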

View File

@@ -12,6 +12,7 @@ dependencies = [
"fastapi[standard]>=0.115.0",
"celery[redis]>=5.4.0",
"httpx>=0.28.0",
"sentence-transformers>=3.0.0",
]
[tool.uv.sources]

View File

@@ -19,6 +19,7 @@ dependencies = [
"httpx>=0.28.0",
"slowapi>=0.1.9",
"bcrypt>=4.0.0",
"pgvector>=0.3.0",
]
[tool.hatch.build.targets.wheel]

View File

@@ -0,0 +1,96 @@
"""
SQLAlchemy 2.0 ORM models for conversational memory.
ConversationEmbedding stores pgvector embeddings of past conversation turns
for long-term semantic retrieval across sessions. This is the persistence layer
for the long-term memory module in the Agent Orchestrator.
IMPORTANT:
- Embeddings are immutable (no UPDATE) — like audit records. We store and read
but never modify. This simplifies the data model and prevents mutation bugs.
- RLS is ENABLED with FORCE — tenant_id isolation is enforced at the DB level.
- The vector dimension (384) corresponds to all-MiniLM-L6-v2 output size.
"""
from __future__ import annotations
import uuid
from datetime import datetime
from pgvector.sqlalchemy import Vector
from sqlalchemy import DateTime, ForeignKey, Text, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from shared.models.tenant import Base
class ConversationEmbedding(Base):
"""
A single embedded conversation turn stored for long-term recall.
Each row represents one message (user or assistant) converted to a
384-dimensional embedding via all-MiniLM-L6-v2. The Agent Orchestrator
queries this table at prompt assembly time to inject relevant past context.
Scoped by:
- tenant_id: RLS enforced isolation between tenants
- agent_id: isolation between agents within a tenant
- user_id: isolation between end-users of the same agent
RLS policy enforces:
tenant_id = current_setting('app.current_tenant', TRUE)::uuid
FORCE ROW LEVEL SECURITY ensures even the table owner cannot bypass this.
"""
__tablename__ = "conversation_embeddings"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
tenant_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("tenants.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
agent_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
nullable=False,
index=True,
)
user_id: Mapped[str] = mapped_column(
Text,
nullable=False,
comment="Channel-native user identifier (e.g. Slack user ID U12345)",
)
content: Mapped[str] = mapped_column(
Text,
nullable=False,
comment="Original message text that was embedded",
)
role: Mapped[str] = mapped_column(
Text,
nullable=False,
comment="Message role: 'user' or 'assistant'",
)
embedding: Mapped[list[float]] = mapped_column(
Vector(384),
nullable=False,
comment="all-MiniLM-L6-v2 embedding (384 dimensions)",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
def __repr__(self) -> str:
return (
f"<ConversationEmbedding id={self.id} "
f"tenant_id={self.tenant_id} agent_id={self.agent_id} "
f"user_id={self.user_id!r} role={self.role!r}>"
)

View File

@@ -86,3 +86,61 @@ def engaged_thread_key(tenant_id: str, thread_id: str) -> str:
Namespaced Redis key: "{tenant_id}:engaged:{thread_id}"
"""
return f"{tenant_id}:engaged:{thread_id}"
def memory_short_key(tenant_id: str, agent_id: str, user_id: str) -> str:
"""
Redis key for the short-term conversational memory sliding window.
Stores the last N messages (serialized as JSON) for a specific
tenant + agent + user combination. Used by the Agent Orchestrator to
inject recent conversation history into every LLM prompt.
Key includes all three discriminators to ensure:
- Two users talking to the same agent have separate histories
- The same user talking to two different agents has separate histories
- Two tenants with the same agent/user IDs are fully isolated
Args:
tenant_id: Konstruct tenant identifier.
agent_id: Agent identifier (UUID string).
user_id: End-user identifier (channel-native, e.g. Slack user ID).
Returns:
Namespaced Redis key: "{tenant_id}:memory:short:{agent_id}:{user_id}"
"""
return f"{tenant_id}:memory:short:{agent_id}:{user_id}"
def escalation_status_key(tenant_id: str, thread_id: str) -> str:
"""
Redis key for tracking escalation status of a thread.
Stores the current escalation state for a conversation thread —
whether it has been escalated to a human or another agent.
Args:
tenant_id: Konstruct tenant identifier.
thread_id: Thread identifier.
Returns:
Namespaced Redis key: "{tenant_id}:escalation:{thread_id}"
"""
return f"{tenant_id}:escalation:{thread_id}"
def pending_tool_confirm_key(tenant_id: str, thread_id: str) -> str:
"""
Redis key for tracking pending tool confirmation requests.
Stores the pending tool invocation that requires explicit user
confirmation before execution (e.g. destructive operations).
Args:
tenant_id: Konstruct tenant identifier.
thread_id: Thread identifier.
Returns:
Namespaced Redis key: "{tenant_id}:tool_confirm:{thread_id}"
"""
return f"{tenant_id}:tool_confirm:{thread_id}"