Files
konstruct/packages/orchestrator/orchestrator/tasks.py
Adolfo Delorenzo d59f85cd87 feat(04-rbac-01): RBAC guards + invite token + email + invitation API
- rbac.py: PortalCaller dataclass + get_portal_caller dependency (header-based)
- rbac.py: require_platform_admin (403 for non-platform_admin)
- rbac.py: require_tenant_admin (platform_admin bypasses; customer_admin
  checks UserTenantRole; operator always rejected)
- rbac.py: require_tenant_member (platform_admin bypasses; all roles
  checked against UserTenantRole)
- invite_token.py: generate_invite_token (HMAC-SHA256, base64url, 48h TTL)
- invite_token.py: validate_invite_token (timing-safe compare_digest, TTL check)
- invite_token.py: token_to_hash (SHA-256 for DB storage)
- email.py: send_invite_email (sync smtplib, skips if smtp_host empty)
- invitations.py: POST /api/portal/invitations (create, requires tenant admin)
- invitations.py: POST /api/portal/invitations/accept (accept invitation)
- invitations.py: POST /api/portal/invitations/{id}/resend (regenerate token)
- invitations.py: GET /api/portal/invitations (list pending)
- portal.py: AuthVerifyResponse now returns role+tenant_ids+active_tenant_id
- portal.py: auth/register gated behind require_platform_admin
- tasks.py: send_invite_email_task Celery task (fire-and-forget)
- gateway/main.py: invitations_router mounted
2026-03-24 13:52:45 -06:00

842 lines
32 KiB
Python

"""
Celery task definitions for the Konstruct Agent Orchestrator.
# CELERY TASKS MUST BE SYNC def — async def causes RuntimeError or silent hang.
# Use asyncio.run() for async work. This is a fundamental Celery constraint:
# Celery workers are NOT async-native. The handle_message task bridges the
# sync Celery world to the async agent pipeline via asyncio.run().
#
# NEVER change these to `async def`. If you see a RuntimeError about "no
# running event loop" or tasks that silently never complete, check for
# accidental async def usage first.
Memory pipeline (Phase 2):
Before LLM call:
1. get_recent_messages() — load Redis sliding window (last 20 msgs)
2. embed current message + retrieve_relevant() — pgvector long-term context
3. build_messages_with_memory() — assemble enriched messages array
After LLM response:
4. append_message() x2 — save user + assistant turns to Redis
5. embed_and_store.delay() — fire-and-forget pgvector backfill (async)
The embed_and_store Celery task runs asynchronously, meaning the LLM response
is never blocked waiting for embedding computation.
Tool pipeline (Phase 2 Plan 02):
run_agent() now accepts audit_logger and tool_registry and implements a
multi-turn tool-call loop internally. The loop runs within the same
asyncio.run() block — no separate Celery tasks for tool execution.
Pending tool confirmation:
When a tool with requires_confirmation=True is invoked, the runner returns
a confirmation message. The task stores a pending_tool_confirm entry in Redis
and returns the confirmation message as the response.
On the next user message, if a pending confirmation exists:
- "yes" → execute the pending tool and continue
- "no" / anything else → cancel and inform the user
Escalation pipeline (Phase 2 Plan 06):
Pre-check (before LLM call):
If Redis escalation_status_key == "escalated", return early assistant-mode reply.
This prevents the LLM from being called when a human has already taken over.
Post-check (after LLM response):
check_escalation_rules() evaluates configured rules against conversation metadata.
If a rule matches AND agent.escalation_assignee is set, escalate_to_human() is
called and its return value replaces the LLM response.
Outbound routing (Phase 2 Plan 06):
All response delivery goes through _send_response() which routes to:
- Slack: _update_slack_placeholder() via chat.update
- WhatsApp: send_whatsapp_message() via Meta Cloud API
handle_message now pops WhatsApp extras (phone_number_id, bot_token) and
passes them through to _process_message via the extras dict.
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from typing import Any
import redis.asyncio as aioredis
from gateway.channels.whatsapp import send_whatsapp_message
from orchestrator.agents.builder import build_messages_with_memory
from orchestrator.agents.runner import run_agent
from orchestrator.audit.logger import AuditLogger
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
from orchestrator.main import app
from orchestrator.memory.embedder import embed_text
from orchestrator.memory.long_term import retrieve_relevant
from orchestrator.memory.short_term import append_message, get_recent_messages
from orchestrator.tools.registry import get_tools_for_agent
from shared.config import settings
from shared.db import async_session_factory, engine
from shared.models.message import KonstructMessage
from shared.redis_keys import escalation_status_key
from shared.rls import configure_rls_hook, current_tenant_id
logger = logging.getLogger(__name__)
# Redis key pattern for pending tool confirmation
_PENDING_TOOL_KEY = "pending_tool_confirm:{tenant_id}:{user_id}"
# TTL for pending confirmation: 10 minutes (user must respond within this window)
_PENDING_TOOL_TTL = 600
@app.task(
    name="orchestrator.tasks.embed_and_store",
    bind=False,
    max_retries=2,
    default_retry_delay=10,
    ignore_result=True,  # Fire-and-forget — callers don't await the result
)
def embed_and_store(
    tenant_id: str,
    agent_id: str,
    user_id: str,
    messages: list[dict],
) -> None:
    """
    Embed conversation turns and persist them to pgvector, off the hot path.

    Dispatched fire-and-forget after the LLM response so embedding work never
    delays the user-facing reply.

    Args:
        tenant_id: Tenant UUID string.
        agent_id: Agent UUID string.
        user_id: End-user identifier.
        messages: {"role", "content"} dicts to embed and store — typically
            the [user_message, assistant_response] pair.
    """
    # Celery tasks must stay sync def; bridge to the async impl via asyncio.run().
    pipeline = _embed_and_store_async(tenant_id, agent_id, user_id, messages)
    asyncio.run(pipeline)
async def _embed_and_store_async(
    tenant_id: str,
    agent_id: str,
    user_id: str,
    messages: list[dict],
) -> None:
    """
    Async implementation of embed_and_store.

    Embeds all messages in one batch call (cheaper than per-message embedding)
    then stores each vector in conversation_embeddings via store_embedding(),
    under the tenant's RLS context.

    Failures are logged and swallowed deliberately: this runs fire-and-forget
    after the user-facing response, so a failed backfill must never surface.
    NOTE(review): because the exception never propagates, the enclosing task's
    max_retries=2 can never trigger from here — confirm that is intended.

    Args:
        tenant_id: Tenant UUID string.
        agent_id: Agent UUID string.
        user_id: End-user identifier scoping the stored memory.
        messages: {"role", "content"} dicts to embed and store.
    """
    # Local imports avoid paying embedder/model import cost at module load.
    # (async_session_factory / engine / RLS helpers are already module-level
    # imports; the previous duplicate local imports were redundant.)
    from orchestrator.memory.embedder import embed_texts
    from orchestrator.memory.long_term import store_embedding

    if not messages:
        return

    tenant_uuid = uuid.UUID(tenant_id)
    agent_uuid = uuid.UUID(agent_id)

    # Embed all message texts in a single batch call.
    texts = [msg["content"] for msg in messages]
    embeddings = embed_texts(texts)

    # Idempotent RLS setup; the context var scopes DB access to this tenant.
    configure_rls_hook(engine)
    token = current_tenant_id.set(tenant_uuid)
    try:
        async with async_session_factory() as session:
            # strict=True guards against the embedder returning a short batch.
            for msg, embedding in zip(messages, embeddings, strict=True):
                await store_embedding(
                    session,
                    tenant_uuid,
                    agent_uuid,
                    user_id,
                    msg["content"],
                    msg["role"],
                    embedding,
                )
            await session.commit()
    except Exception:
        logger.exception(
            "embed_and_store failed for tenant=%s agent=%s user=%s",
            tenant_id,
            agent_id,
            user_id,
        )
    finally:
        # Always restore the previous RLS context, even on failure.
        current_tenant_id.reset(token)
@app.task(
    name="orchestrator.tasks.send_invite_email_task",
    bind=False,
    max_retries=2,
    default_retry_delay=30,
    ignore_result=True,  # Fire-and-forget — callers don't await the result
)
def send_invite_email_task(
    to_email: str,
    invitee_name: str,
    tenant_name: str,
    invite_url: str,
) -> None:
    """
    Send an invitation email via SMTP, off the request path.

    Dispatched fire-and-forget by the invitation API right after an invitation
    is created. When SMTP is not configured, the underlying helper logs a
    warning and returns without sending.

    Args:
        to_email: Recipient email address.
        invitee_name: Recipient display name.
        tenant_name: Name of the tenant being joined.
        invite_url: Full invitation acceptance URL.
    """
    # Imported lazily so the worker doesn't touch SMTP config at import time.
    from shared import email as shared_email

    shared_email.send_invite_email(to_email, invitee_name, tenant_name, invite_url)
@app.task(
    name="orchestrator.tasks.handle_message",
    bind=True,
    max_retries=3,
    default_retry_delay=5,
)
def handle_message(self, message_data: dict) -> dict:  # type: ignore[no-untyped-def]
    """
    Process one inbound Konstruct message through the agent pipeline.

    Primary Celery entry point, dispatched by the Channel Gateway once tenant
    resolution has completed. ``message_data`` may carry channel extras on top
    of the KonstructMessage fields:

    - Slack handler injects:
        ``placeholder_ts``: ts of the "Thinking..." placeholder message
        ``channel_id``: Slack channel ID where the response should be posted
    - WhatsApp gateway injects:
        ``phone_number_id``: WhatsApp phone number ID for outbound messaging
        ``bot_token``: WhatsApp access_token (injected as bot_token by gateway)

    These extras are stripped off BEFORE model validation (KonstructMessage
    doesn't know those fields) and handed to _process_message, which routes
    outbound responses via _send_response().

    Pipeline:
        1. Strip channel reply metadata (Slack + WhatsApp extras) if present
        2. Deserialize message_data -> KonstructMessage
        3. Extract wa_id from sender.user_id for WhatsApp messages
        4. Build extras dict for channel-aware outbound routing
        5. Run the async agent pipeline via asyncio.run()
        6. Return the response dict

    Args:
        message_data: JSON-serializable dict of KonstructMessage fields plus
            optional channel-specific extras.

    Returns:
        Dict with keys:
        - message_id (str): Original message ID
        - response (str): Agent's response text
        - tenant_id (str | None): Tenant that handled the message
    """
    # Pop channel extras before validation; each value normalizes to "".
    raw_extras = {
        key: (message_data.pop(key, "") or "")
        for key in ("placeholder_ts", "channel_id", "phone_number_id", "bot_token")
    }

    try:
        msg = KonstructMessage.model_validate(message_data)
    except Exception as exc:
        logger.exception("Failed to deserialize KonstructMessage: %s", message_data)
        raise self.retry(exc=exc)

    # The WhatsApp normalizer stores the wa_id (recipient phone number) in
    # sender.user_id — only extractable AFTER model validation.
    wa_id = ""
    if msg.channel == "whatsapp" and msg.sender and msg.sender.user_id:
        wa_id = msg.sender.user_id

    # Unified extras dict for channel-aware outbound routing.
    extras: dict[str, Any] = {**raw_extras, "wa_id": wa_id}
    return asyncio.run(_process_message(msg, extras=extras))
async def _process_message(
    msg: KonstructMessage,
    extras: dict[str, Any] | None = None,
) -> dict:
    """
    Async agent pipeline — load agent config, build memory-enriched prompt, call LLM pool.

    Memory pipeline (Phase 2 additions):
        BEFORE LLM call:
        1. Load recent messages from Redis sliding window
        2. Embed current message and retrieve semantically relevant long-term context
        3. Build memory-enriched messages array via build_messages_with_memory()
        AFTER LLM response:
        4. Append user message + assistant response to Redis sliding window
        5. Dispatch embed_and_store.delay() for async pgvector backfill

    Tool pipeline (Phase 2 Plan 02 additions):
        - Check Redis for pending tool confirmation from previous turn
        - If pending confirmation: handle yes/no, execute or cancel
        - Otherwise: initialize AuditLogger, build tool registry, pass to run_agent()
        - Tool-call loop runs inside run_agent() — no separate Celery tasks
        - If run_agent returns a confirmation message: store pending action in Redis

    Escalation pipeline (Phase 2 Plan 06 additions):
        Pre-check: if Redis escalation_status_key == "escalated", return assistant-mode reply
        Post-check: check_escalation_rules after LLM response; if triggered, escalate_to_human

    Outbound routing (Phase 2 Plan 06 additions):
        All response delivery goes through _send_response() — never direct
        _update_slack_placeholder. The extras dict is passed through to
        _send_response for channel-aware routing.

    Args:
        msg: The deserialized KonstructMessage.
        extras: Channel-specific routing metadata. For Slack: placeholder_ts,
            channel_id, bot_token. For WhatsApp: phone_number_id, bot_token, wa_id.

    Returns:
        Dict with message_id, response, and tenant_id.
    """
    # Local import avoids a module-load-time cycle with the models package.
    from shared.models.tenant import Agent

    if extras is None:
        extras = {}
    if msg.tenant_id is None:
        # Without a tenant we cannot scope RLS, memory, or agent lookup.
        logger.warning("Message %s has no tenant_id — cannot process", msg.id)
        return {
            "message_id": msg.id,
            "response": "Unable to process: tenant not identified.",
            "tenant_id": None,
        }
    # Set up RLS engine hook (idempotent — safe to call on every task)
    configure_rls_hook(engine)
    # Set the RLS context variable for this async task's context
    tenant_uuid = uuid.UUID(msg.tenant_id)
    token = current_tenant_id.set(tenant_uuid)
    slack_bot_token: str = ""
    agent: Agent | None = None
    try:
        async with async_session_factory() as session:
            from sqlalchemy import select

            # First active agent for the tenant handles all messages.
            stmt = (
                select(Agent)
                .where(Agent.tenant_id == tenant_uuid)
                .where(Agent.is_active.is_(True))
                .limit(1)
            )
            result = await session.execute(stmt)
            agent = result.scalars().first()
            # Load the Slack bot token for this tenant from channel_connections config.
            # This is needed for escalation DM delivery even on WhatsApp messages —
            # the escalation handler always sends via Slack DM to the assignee.
            # NOTE(review): the guard below only fires when Slack extras
            # (placeholder_ts AND channel_id) are present, so a WhatsApp-origin
            # message reaches escalation with an empty slack_bot_token — this
            # contradicts the comment above; confirm which is intended.
            if agent is not None and (extras.get("placeholder_ts") and extras.get("channel_id")):
                from shared.models.tenant import ChannelConnection, ChannelTypeEnum

                conn_stmt = (
                    select(ChannelConnection)
                    .where(ChannelConnection.tenant_id == tenant_uuid)
                    .where(ChannelConnection.channel_type == ChannelTypeEnum.SLACK)
                    .limit(1)
                )
                conn_result = await session.execute(conn_stmt)
                conn = conn_result.scalars().first()
                if conn and conn.config:
                    slack_bot_token = conn.config.get("bot_token", "")
    finally:
        # Always reset the RLS context var after DB work is done
        current_tenant_id.reset(token)
    if agent is None:
        logger.warning(
            "No active agent found for tenant=%s message=%s",
            msg.tenant_id,
            msg.id,
        )
        no_agent_response = "No active agent is configured for your workspace. Please contact your administrator."
        # Build response_extras for channel-aware delivery
        response_extras = _build_response_extras(msg.channel, extras, slack_bot_token)
        await _send_response(msg.channel, no_agent_response, response_extras)
        return {
            "message_id": msg.id,
            "response": no_agent_response,
            "tenant_id": msg.tenant_id,
        }
    # Determine user_id for memory scoping: use sender.user_id if available,
    # fall back to thread_id (for non-identified channel contexts like webhooks)
    user_id: str = (
        msg.sender.user_id
        if msg.sender and msg.sender.user_id
        else (msg.thread_id or msg.id)
    )
    agent_id_str = str(agent.id)
    user_text: str = msg.content.text or ""
    # -------------------------------------------------------------------------
    # Initialize AuditLogger for this pipeline run
    # -------------------------------------------------------------------------
    audit_logger = AuditLogger(session_factory=async_session_factory)
    # Build response_extras dict used for all outbound delivery in this pipeline run.
    # For Slack: merges DB-loaded slack_bot_token with incoming extras.
    # For WhatsApp: extras already contain phone_number_id, bot_token, wa_id.
    response_extras = _build_response_extras(msg.channel, extras, slack_bot_token)
    # -------------------------------------------------------------------------
    # Escalation pre-check — if conversation already escalated, reply in assistant mode
    # -------------------------------------------------------------------------
    thread_key = msg.thread_id or user_id
    esc_key = escalation_status_key(msg.tenant_id, thread_key)
    # Short-lived Redis client; default client returns bytes, hence b"escalated".
    pre_check_redis = aioredis.from_url(settings.redis_url)
    try:
        esc_status = await pre_check_redis.get(esc_key)
    finally:
        await pre_check_redis.aclose()
    if esc_status == b"escalated":
        # A human has taken over — skip the LLM entirely.
        assistant_reply = "I've already connected you with a team member. They'll continue assisting you."
        await _send_response(msg.channel, assistant_reply, response_extras)
        return {"message_id": msg.id, "response": assistant_reply, "tenant_id": msg.tenant_id}
    # -------------------------------------------------------------------------
    # Pending tool confirmation check
    # -------------------------------------------------------------------------
    pending_confirm_key = _PENDING_TOOL_KEY.format(
        tenant_id=msg.tenant_id,
        user_id=user_id,
    )
    response_text: str = ""
    handled_as_confirmation = False
    redis_client = aioredis.from_url(settings.redis_url)
    try:
        pending_raw = await redis_client.get(pending_confirm_key)
        if pending_raw:
            # There's a pending tool confirmation waiting for user response
            handled_as_confirmation = True
            pending_data = json.loads(pending_raw)
            user_response = user_text.strip().lower()
            if user_response in ("yes", "y", "confirm", "ok", "sure", "proceed"):
                # User confirmed — execute the pending tool
                response_text = await _execute_pending_tool(
                    pending_data=pending_data,
                    tenant_uuid=tenant_uuid,
                    agent=agent,
                    audit_logger=audit_logger,
                )
            else:
                # User rejected or provided unclear response — cancel
                tool_name = pending_data.get("tool_name", "the action")
                response_text = f"Action cancelled. I won't proceed with {tool_name}."
            # Always clear the pending confirmation after handling
            await redis_client.delete(pending_confirm_key)
    finally:
        await redis_client.aclose()
    if handled_as_confirmation:
        # Confirmation turns short-circuit the pipeline: no memory write,
        # no LLM call — just deliver the outcome and return.
        await _send_response(msg.channel, response_text, response_extras)
        return {
            "message_id": msg.id,
            "response": response_text,
            "tenant_id": msg.tenant_id,
        }
    # -------------------------------------------------------------------------
    # Memory retrieval (before LLM call)
    # -------------------------------------------------------------------------
    redis_client2 = aioredis.from_url(settings.redis_url)
    try:
        # 1. Short-term: Redis sliding window
        recent_messages = await get_recent_messages(
            redis_client2, msg.tenant_id, agent_id_str, user_id
        )
        # 2. Long-term: pgvector similarity search
        relevant_context: list[str] = []
        if user_text:
            query_embedding = embed_text(user_text)
            # Re-enter the RLS tenant context for the pgvector query.
            rls_token = current_tenant_id.set(tenant_uuid)
            try:
                async with async_session_factory() as session:
                    relevant_context = await retrieve_relevant(
                        session,
                        tenant_uuid,
                        agent.id,
                        user_id,
                        query_embedding,
                    )
            finally:
                current_tenant_id.reset(rls_token)
    finally:
        await redis_client2.aclose()
    # -------------------------------------------------------------------------
    # Build memory-enriched messages array
    # -------------------------------------------------------------------------
    enriched_messages = build_messages_with_memory(
        agent=agent,
        current_message=user_text,
        recent_messages=recent_messages,
        relevant_context=relevant_context,
        channel=str(msg.channel) if msg.channel else "",
    )
    # Build tool registry for this agent
    tool_registry = get_tools_for_agent(agent)
    # -------------------------------------------------------------------------
    # Run agent with tool loop
    # -------------------------------------------------------------------------
    response_text = await run_agent(
        msg,
        agent,
        messages=enriched_messages,
        audit_logger=audit_logger,
        tool_registry=tool_registry if tool_registry else None,
    )
    # -------------------------------------------------------------------------
    # Escalation post-check — evaluate rules against conversation metadata
    # -------------------------------------------------------------------------
    conversation_metadata = _build_conversation_metadata(recent_messages, user_text)
    triggered_rule = check_escalation_rules(
        agent=agent,
        message_text=user_text,
        conversation_metadata=conversation_metadata,
        natural_lang_enabled=getattr(agent, "natural_language_escalation", False),
    )
    # Escalate only when a rule fired AND an assignee is configured;
    # escalate_to_human's return value replaces the LLM response.
    if triggered_rule and getattr(agent, "escalation_assignee", None):
        escalation_redis = aioredis.from_url(settings.redis_url)
        try:
            response_text = await escalate_to_human(
                tenant_id=msg.tenant_id,
                agent=agent,
                thread_id=thread_key,
                trigger_reason=triggered_rule.get("condition", "rule triggered"),
                recent_messages=recent_messages,
                assignee_slack_user_id=agent.escalation_assignee,
                bot_token=slack_bot_token,
                redis=escalation_redis,
                audit_logger=audit_logger,
                user_id=user_id,
                agent_id=agent_id_str,
            )
        finally:
            await escalation_redis.aclose()
    # Check if the response is a tool confirmation request
    # The confirmation message template starts with a specific prefix
    is_confirmation_request = response_text.startswith("This action requires your approval")
    if is_confirmation_request:
        # Store pending confirmation in Redis so the next message can resolve it
        pending_entry = json.dumps({
            "tool_name": _extract_tool_name_from_confirmation(response_text),
            "message": response_text,
        })
        redis_client3 = aioredis.from_url(settings.redis_url)
        try:
            # TTL bounds how long a user has to answer yes/no.
            await redis_client3.setex(pending_confirm_key, _PENDING_TOOL_TTL, pending_entry)
        finally:
            await redis_client3.aclose()
    logger.info(
        "Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
        msg.id,
        agent.id,
        msg.tenant_id,
        len(recent_messages),
        len(relevant_context),
    )
    # Send response via channel-aware routing
    await _send_response(msg.channel, response_text, response_extras)
    # -------------------------------------------------------------------------
    # Memory persistence (after LLM response)
    # -------------------------------------------------------------------------
    # Only persist if this was a normal LLM response (not a confirmation request)
    if not is_confirmation_request:
        redis_client4 = aioredis.from_url(settings.redis_url)
        try:
            # 3. Append both turns to Redis sliding window
            await append_message(redis_client4, msg.tenant_id, agent_id_str, user_id, "user", user_text)
            await append_message(redis_client4, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
        finally:
            await redis_client4.aclose()
        # 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
        messages_to_embed = [
            {"role": "user", "content": user_text},
            {"role": "assistant", "content": response_text},
        ]
        embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
    return {
        "message_id": msg.id,
        "response": response_text,
        "tenant_id": msg.tenant_id,
    }
def _build_response_extras(
channel: Any,
extras: dict[str, Any],
slack_bot_token: str,
) -> dict[str, Any]:
"""
Build the response_extras dict for channel-aware outbound delivery.
For Slack: injects slack_bot_token into extras["bot_token"] so _send_response
can use it for chat.update calls.
For WhatsApp: extras already contain phone_number_id, bot_token (access_token),
and wa_id — no transformation needed.
Args:
channel: Channel name from KonstructMessage.channel.
extras: Incoming extras from handle_message.
slack_bot_token: Bot token loaded from DB channel_connections.
Returns:
Dict suitable for passing to _send_response.
"""
channel_str = str(channel) if channel else ""
if channel_str == "slack":
return {
"bot_token": slack_bot_token,
"channel_id": extras.get("channel_id", "") or "",
"placeholder_ts": extras.get("placeholder_ts", "") or "",
}
elif channel_str == "whatsapp":
return {
"phone_number_id": extras.get("phone_number_id", "") or "",
"bot_token": extras.get("bot_token", "") or "",
"wa_id": extras.get("wa_id", "") or "",
}
else:
return dict(extras)
def _build_conversation_metadata(
recent_messages: list[dict],
current_text: str,
) -> dict[str, Any]:
"""
Build conversation metadata dict for escalation rule evaluation.
Scans recent messages and current_text for billing keywords and counts occurrences.
Returns a dict with:
- "billing_dispute" (bool): True if any billing keyword found in any message
- "attempts" (int): count of messages containing billing keywords
This is the v1 keyword-based metadata detection (per STATE.md decisions).
Args:
recent_messages: List of {"role", "content"} dicts from Redis sliding window.
current_text: The user's current message text.
Returns:
Dict with billing_dispute and attempts keys.
"""
billing_keywords = {"billing", "invoice", "charge", "refund", "payment", "subscription"}
all_texts = [m.get("content", "") for m in recent_messages] + [current_text]
billing_count = sum(1 for t in all_texts if any(kw in t.lower() for kw in billing_keywords))
return {
"billing_dispute": billing_count > 0,
"attempts": billing_count,
}
async def _execute_pending_tool(
pending_data: dict,
tenant_uuid: uuid.UUID,
agent: "Agent",
audit_logger: "AuditLogger",
) -> str:
"""
Execute a tool that was previously paused waiting for user confirmation.
Since we don't re-execute the full LLM tool-call loop from a pending
confirmation (the agent already ran its reasoning), we simply inform the
user that the action was confirmed. The actual tool execution with the
stored tool_call is handled here.
Args:
pending_data: Dict stored in Redis with tool_name and message.
tenant_uuid: Tenant UUID for audit logging.
agent: Agent that originally invoked the tool.
audit_logger: AuditLogger instance.
Returns:
A response string to send back to the user.
"""
tool_name = pending_data.get("tool_name", "the action")
return f"Confirmed. I'll proceed with {tool_name} now. (Full tool execution will be implemented in Phase 3 with per-tenant OAuth.)"
def _extract_tool_name_from_confirmation(confirmation_message: str) -> str:
"""Extract tool name from a confirmation message for Redis storage."""
# The confirmation template includes: "**Tool:** {tool_name}"
for line in confirmation_message.splitlines():
if line.startswith("**Tool:**"):
return line.replace("**Tool:**", "").strip()
return "unknown_tool"
async def _send_response(
    channel: Any,
    text: str,
    extras: dict,
) -> None:
    """
    Channel-aware outbound routing — deliver a response to the right channel.

    - ``slack``: replaces the "Thinking..." placeholder via _update_slack_placeholder
    - ``whatsapp``: delivers via send_whatsapp_message (Meta Cloud API)
    - anything else: logs a warning and drops the response (no-op)

    Args:
        channel: Channel name from KonstructMessage.channel (e.g. "slack", "whatsapp").
        text: Response text to send.
        extras: Channel-specific metadata dict.
            For Slack: ``bot_token``, ``channel_id``, ``placeholder_ts``
            For WhatsApp: ``phone_number_id``, ``bot_token`` (access_token), ``wa_id``
    """
    name = str(channel) if channel else ""

    if name == "slack":
        token = extras.get("bot_token", "") or ""
        target_channel = extras.get("channel_id", "") or ""
        ts = extras.get("placeholder_ts", "") or ""
        # Both routing fields are required to address the placeholder.
        if not (target_channel and ts):
            logger.warning(
                "_send_response: Slack channel missing channel_id or placeholder_ts in extras"
            )
            return
        await _update_slack_placeholder(
            bot_token=token,
            channel_id=target_channel,
            placeholder_ts=ts,
            text=text,
        )
        return

    if name == "whatsapp":
        sender_phone_id = extras.get("phone_number_id", "") or ""
        access_token = extras.get("bot_token", "") or ""
        recipient = extras.get("wa_id", "") or ""
        if not (sender_phone_id and recipient):
            logger.warning(
                "_send_response: WhatsApp channel missing phone_number_id or wa_id in extras"
            )
            return
        await send_whatsapp_message(
            phone_number_id=sender_phone_id,
            access_token=access_token,
            recipient_wa_id=recipient,
            text=text,
        )
        return

    logger.warning(
        "_send_response: unsupported channel=%r — response not delivered", channel
    )
async def _update_slack_placeholder(
    bot_token: str,
    channel_id: str,
    placeholder_ts: str,
    text: str,
) -> None:
    """
    Swap the "Thinking..." placeholder for the real agent response.

    Calls Slack's chat.update endpoint directly through httpx — the
    orchestrator deliberately carries no slack-bolt dependency, keeping the
    service boundary clean. Per user decision, responses stay in threads: the
    placeholder was posted in-thread, and updating it in place keeps it there.

    Args:
        bot_token: Slack bot token (xoxb-...) for this tenant.
        channel_id: Slack channel ID where the placeholder was posted.
        placeholder_ts: Slack message timestamp of the placeholder to replace.
        text: The real LLM response to replace the placeholder with.
    """
    import httpx

    if not bot_token:
        # Without a token we cannot call the Slack API, so the placeholder
        # remains as "Thinking...". This happens when channel_connections
        # has no bot_token in its config. Log and bail out.
        logger.warning(
            "No Slack bot token for channel=%s placeholder_ts=%s — cannot update placeholder",
            channel_id,
            placeholder_ts,
        )
        return

    payload = {
        "channel": channel_id,
        "ts": placeholder_ts,
        "text": text,
    }
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client:
            api_response = await client.post(
                "https://slack.com/api/chat.update",
                headers={"Authorization": f"Bearer {bot_token}"},
                json=payload,
            )
            body = api_response.json()
            if not body.get("ok"):
                logger.error(
                    "chat.update failed: channel=%s ts=%s error=%r",
                    channel_id,
                    placeholder_ts,
                    body.get("error"),
                )
    except Exception:
        logger.exception(
            "Failed to update Slack placeholder: channel=%s ts=%s",
            channel_id,
            placeholder_ts,
        )