- Add ChannelType.WEB = 'web' to shared/models/message.py
- Add webchat_response_key() to shared/redis_keys.py
- Create WebConversation and WebConversationMessage ORM models (SQLAlchemy 2.0)
- Create migration 008_web_chat.py with RLS, indexes, and channel_type CHECK update
- Pop conversation_id/portal_user_id extras in handle_message before model_validate
- Add web case to _build_response_extras and _send_response (Redis pub-sub publish)
- Import webchat_response_key in orchestrator/tasks.py
- Write 19 unit tests covering CHAT-01 through CHAT-05 (all pass)
169 lines
5.5 KiB
Python
169 lines
5.5 KiB
Python
"""
|
|
Namespaced Redis key constructors.

DESIGN PRINCIPLE: It must be impossible to construct a Redis key without
a tenant_id. Every function in this module requires tenant_id as its first
argument and prepends `{tenant_id}:` to every key.

This ensures strict per-tenant namespace isolation — Tenant A's rate limit
counters, session state, and deduplication keys are entirely separate from
Tenant B's, even though they share the same Redis instance.

Key format: {tenant_id}:{key_type}:{discriminator}

Examples:
    rate_limit_key("acme", "slack")       → "acme:ratelimit:slack"
    idempotency_key("acme", "msg-123")    → "acme:dedup:msg-123"
    session_key("acme", "thread-456")     → "acme:session:thread-456"
    engaged_thread_key("acme", "T12345")  → "acme:engaged:T12345"
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
def rate_limit_key(tenant_id: str, channel: str) -> str:
    """
    Build the Redis key for a tenant's rate limit counter on one channel.

    The token bucket rate limiter in the Message Router uses this key.

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        channel: Channel type string (e.g. "slack", "whatsapp").

    Returns:
        Namespaced Redis key: "{tenant_id}:ratelimit:{channel}"

    Example:
        >>> rate_limit_key("acme", "slack")
        'acme:ratelimit:slack'
    """
    return f"{tenant_id}:ratelimit:{channel}"
|
|
|
|
|
|
def idempotency_key(tenant_id: str, message_id: str) -> str:
    """
    Build the Redis key used to deduplicate an inbound message.

    Guards against processing the same event twice when a channel delivers
    it more than once (e.g. Slack retry behaviour on gateway timeout).

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        message_id: Unique message identifier from the channel.

    Returns:
        Namespaced Redis key: "{tenant_id}:dedup:{message_id}"

    Example:
        >>> idempotency_key("acme", "msg-123")
        'acme:dedup:msg-123'
    """
    return f"{tenant_id}:dedup:{message_id}"
|
|
|
|
|
|
def session_key(tenant_id: str, thread_id: str) -> str:
    """
    Build the Redis key for a thread's conversation session state.

    The Agent Orchestrator keeps a sliding window of conversation history
    under this key so context carries over between messages in a thread.

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        thread_id: Thread identifier (e.g. Slack thread_ts or DM channel ID).

    Returns:
        Namespaced Redis key: "{tenant_id}:session:{thread_id}"

    Example:
        >>> session_key("acme", "thread-456")
        'acme:session:thread-456'
    """
    return f"{tenant_id}:session:{thread_id}"
|
|
|
|
|
|
def engaged_thread_key(tenant_id: str, thread_id: str) -> str:
    """
    Build the Redis key marking a thread as actively engaged by an agent.

    A thread counts as "engaged" once the agent has been @mentioned or has
    responded in it; later messages in that thread then no longer need a
    fresh @mention.

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        thread_id: Thread identifier.

    Returns:
        Namespaced Redis key: "{tenant_id}:engaged:{thread_id}"

    Example:
        >>> engaged_thread_key("acme", "T12345")
        'acme:engaged:T12345'
    """
    return f"{tenant_id}:engaged:{thread_id}"
|
|
|
|
|
|
def memory_short_key(tenant_id: str, agent_id: str, user_id: str) -> str:
    """
    Build the Redis key for the short-term conversational memory window.

    The key addresses the last N messages (serialized as JSON) for one
    tenant + agent + user combination. The Agent Orchestrator reads it to
    inject recent conversation history into every LLM prompt.

    All three discriminators are part of the key so that:
        - Two users talking to the same agent have separate histories
        - The same user talking to two different agents has separate histories
        - Two tenants with the same agent/user IDs are fully isolated

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        agent_id: Agent identifier (UUID string).
        user_id: End-user identifier (channel-native, e.g. Slack user ID).

    Returns:
        Namespaced Redis key: "{tenant_id}:memory:short:{agent_id}:{user_id}"

    Example:
        >>> memory_short_key("acme", "agent-1", "U42")
        'acme:memory:short:agent-1:U42'
    """
    return f"{tenant_id}:memory:short:{agent_id}:{user_id}"
|
|
|
|
|
|
def escalation_status_key(tenant_id: str, thread_id: str) -> str:
    """
    Build the Redis key tracking a thread's escalation status.

    The value under this key records the current escalation state of a
    conversation thread: whether it has been escalated to a human or to
    another agent.

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        thread_id: Thread identifier.

    Returns:
        Namespaced Redis key: "{tenant_id}:escalation:{thread_id}"

    Example:
        >>> escalation_status_key("acme", "T12345")
        'acme:escalation:T12345'
    """
    return f"{tenant_id}:escalation:{thread_id}"
|
|
|
|
|
|
def pending_tool_confirm_key(tenant_id: str, thread_id: str) -> str:
    """
    Build the Redis key for a thread's pending tool confirmation request.

    The value under this key is the tool invocation waiting on explicit
    user confirmation before it may execute (e.g. destructive operations).

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        thread_id: Thread identifier.

    Returns:
        Namespaced Redis key: "{tenant_id}:tool_confirm:{thread_id}"

    Example:
        >>> pending_tool_confirm_key("acme", "T12345")
        'acme:tool_confirm:T12345'
    """
    return f"{tenant_id}:tool_confirm:{thread_id}"
|
|
|
|
|
|
def webchat_response_key(tenant_id: str, conversation_id: str) -> str:
    """
    Build the Redis pub-sub channel key for web chat response delivery.

    The WebSocket handler subscribes to this channel after dispatching a
    message to Celery; the orchestrator publishes the agent response to it
    once processing completes.

    Both tenant_id and conversation_id are part of the key so that:
        - Two conversations in the same tenant get separate channels
        - Two tenants with the same conversation_id are fully isolated

    Args:
        tenant_id: Konstruct tenant identifier; forms the namespace prefix.
        conversation_id: Web conversation UUID string.

    Returns:
        Namespaced Redis key: "{tenant_id}:webchat:response:{conversation_id}"

    Example:
        >>> webchat_response_key("acme", "conv-1")
        'acme:webchat:response:conv-1'
    """
    return f"{tenant_id}:webchat:response:{conversation_id}"
|