feat(streaming): wire streaming path through orchestrator task pipeline
- _stream_agent_response_to_redis() publishes chunk/done messages to Redis pub-sub
- _process_message() uses the streaming path for the web channel when no tools are registered
- Non-web channels (Slack, WhatsApp) and tool-enabled agents use the non-streaming run_agent()
- streaming_delivered flag prevents a double publish when the streaming path is active
- _send_response() web branch changed from 'response' to 'done' message type for consistency
This commit is contained in:
@@ -66,7 +66,7 @@ import redis.asyncio as aioredis
|
|||||||
|
|
||||||
from gateway.channels.whatsapp import send_whatsapp_message
|
from gateway.channels.whatsapp import send_whatsapp_message
|
||||||
from orchestrator.agents.builder import build_messages_with_memory
|
from orchestrator.agents.builder import build_messages_with_memory
|
||||||
from orchestrator.agents.runner import run_agent
|
from orchestrator.agents.runner import run_agent, run_agent_streaming
|
||||||
from orchestrator.audit.logger import AuditLogger
|
from orchestrator.audit.logger import AuditLogger
|
||||||
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
|
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
|
||||||
from orchestrator.main import app
|
from orchestrator.main import app
|
||||||
@@ -526,8 +526,32 @@ async def _process_message(
|
|||||||
tool_registry = get_tools_for_agent(agent)
|
tool_registry = get_tools_for_agent(agent)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
# Run agent with tool loop
|
# Run agent — streaming for web channel (no tools), non-streaming otherwise
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
|
is_web_channel = str(msg.channel) == "web"
|
||||||
|
has_tools = bool(tool_registry)
|
||||||
|
|
||||||
|
# Track whether the streaming path already published to Redis so we can
|
||||||
|
# skip the _send_response() call below (avoids a duplicate publish).
|
||||||
|
streaming_delivered = False
|
||||||
|
|
||||||
|
if is_web_channel and not has_tools:
|
||||||
|
# Streaming path: yield tokens directly to Redis pub-sub so the
|
||||||
|
# WebSocket handler can forward them to the browser immediately.
|
||||||
|
# The tool-call loop is skipped because there are no tools registered.
|
||||||
|
web_conversation_id: str = extras.get("conversation_id", "") or ""
|
||||||
|
web_tenant_id: str = extras.get("tenant_id", "") or str(msg.tenant_id or "")
|
||||||
|
response_text = await _stream_agent_response_to_redis(
|
||||||
|
msg=msg,
|
||||||
|
agent=agent,
|
||||||
|
messages=enriched_messages,
|
||||||
|
conversation_id=web_conversation_id,
|
||||||
|
tenant_id=web_tenant_id,
|
||||||
|
)
|
||||||
|
streaming_delivered = True
|
||||||
|
else:
|
||||||
|
# Non-streaming path: tool-call loop + full response (Slack, WhatsApp,
|
||||||
|
# and web channel when tools are registered).
|
||||||
response_text = await run_agent(
|
response_text = await run_agent(
|
||||||
msg,
|
msg,
|
||||||
agent,
|
agent,
|
||||||
@@ -592,7 +616,11 @@ async def _process_message(
|
|||||||
len(relevant_context),
|
len(relevant_context),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Send response via channel-aware routing
|
# Send response via channel-aware routing.
|
||||||
|
# Skip for the streaming path — _stream_agent_response_to_redis already
|
||||||
|
# published chunk + done messages to Redis; calling _send_response here
|
||||||
|
# would publish a duplicate done message.
|
||||||
|
if not streaming_delivered:
|
||||||
await _send_response(msg.channel, response_text, response_extras)
|
await _send_response(msg.channel, response_text, response_extras)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
@@ -733,6 +761,87 @@ def _extract_tool_name_from_confirmation(confirmation_message: str) -> str:
|
|||||||
return "unknown_tool"
|
return "unknown_tool"
|
||||||
|
|
||||||
|
|
||||||
|
# Fallback text for streaming errors (mirrors runner._FALLBACK_RESPONSE)
_FALLBACK_RESPONSE_TEXT = (
    "I'm having trouble processing your request right now. "
    "Please try again in a moment."
)


async def _stream_agent_response_to_redis(
    msg: "KonstructMessage",
    agent: Any,
    messages: list[dict],
    conversation_id: str,
    tenant_id: str,
) -> str:
    """
    Stream LLM token chunks to Redis pub-sub for web channel delivery.

    Calls run_agent_streaming() and publishes each token as a
    ``{"type": "chunk", "text": "<token>"}`` message to the webchat response
    channel. Publishes a final
    ``{"type": "done", "text": "<full_response>", "conversation_id": ...}``
    message when the stream completes.

    The WebSocket handler (web.py) listens on this channel and forwards
    chunk/done messages directly to the browser, enabling word-by-word display.

    Failure handling is best-effort: any exception raised while streaming or
    publishing is logged, and a ``done`` message carrying the current value of
    the response text (the fallback text unless the stream had already
    completed) is published so the browser does not wait forever. If that
    publish fails as well, the failure is logged and otherwise ignored.

    Args:
        msg: The inbound KonstructMessage.
        agent: The ORM Agent instance.
        messages: Memory-enriched messages array.
        conversation_id: Web conversation UUID string.
        tenant_id: Konstruct tenant UUID string.

    Returns:
        The full assembled response text (for memory persistence and audit),
        or the fallback text when the stream produced nothing or failed
        before completing.
    """
    response_channel = webchat_response_key(tenant_id, conversation_id)
    chunks: list[str] = []
    full_response = _FALLBACK_RESPONSE_TEXT

    def _done_payload(text: str) -> str:
        # Single place that defines the "done" wire format so the success and
        # fallback paths cannot drift apart.
        return json.dumps({
            "type": "done",
            "text": text,
            "conversation_id": conversation_id,
        })

    publish_redis = aioredis.from_url(settings.redis_url)
    try:
        async for token in run_agent_streaming(msg, agent, messages=messages):
            chunks.append(token)
            await publish_redis.publish(
                response_channel,
                json.dumps({"type": "chunk", "text": token}),
            )

        # An empty stream still yields a user-visible answer.
        full_response = "".join(chunks) or _FALLBACK_RESPONSE_TEXT
        await publish_redis.publish(response_channel, _done_payload(full_response))
    except Exception:
        logger.exception(
            "Streaming agent response failed for conversation=%s tenant=%s",
            conversation_id,
            tenant_id,
        )
        # Publish a done marker with fallback so the browser doesn't hang
        try:
            await publish_redis.publish(
                response_channel,
                _done_payload(full_response),
            )
        except Exception:
            # Still best-effort, but leave a trace: if this publish fails the
            # browser never receives a terminal message for the conversation.
            logger.warning(
                "Failed to publish fallback done marker for conversation=%s tenant=%s",
                conversation_id,
                tenant_id,
                exc_info=True,
            )
    finally:
        await publish_redis.aclose()

    return full_response
|
||||||
|
|
||||||
|
|
||||||
async def _send_response(
|
async def _send_response(
|
||||||
channel: Any,
|
channel: Any,
|
||||||
text: str,
|
text: str,
|
||||||
@@ -792,7 +901,9 @@ async def _send_response(
|
|||||||
)
|
)
|
||||||
|
|
||||||
elif channel_str == "web":
|
elif channel_str == "web":
|
||||||
# Publish agent response to Redis pub-sub so the WebSocket handler can deliver it
|
# Publish agent response to Redis pub-sub so the WebSocket handler can deliver it.
|
||||||
|
# Uses "done" type (consistent with streaming path) so the WebSocket handler
|
||||||
|
# processes it identically whether or not streaming was used.
|
||||||
web_conversation_id: str = extras.get("conversation_id", "") or ""
|
web_conversation_id: str = extras.get("conversation_id", "") or ""
|
||||||
web_tenant_id: str = extras.get("tenant_id", "") or ""
|
web_tenant_id: str = extras.get("tenant_id", "") or ""
|
||||||
|
|
||||||
@@ -808,7 +919,7 @@ async def _send_response(
|
|||||||
await publish_redis.publish(
|
await publish_redis.publish(
|
||||||
response_channel,
|
response_channel,
|
||||||
json.dumps({
|
json.dumps({
|
||||||
"type": "response",
|
"type": "done",
|
||||||
"text": text,
|
"text": text,
|
||||||
"conversation_id": web_conversation_id,
|
"conversation_id": web_conversation_id,
|
||||||
}),
|
}),
|
||||||
|
|||||||
Reference in New Issue
Block a user