diff --git a/packages/orchestrator/orchestrator/tasks.py b/packages/orchestrator/orchestrator/tasks.py
index 9dd8896..d74d501 100644
--- a/packages/orchestrator/orchestrator/tasks.py
+++ b/packages/orchestrator/orchestrator/tasks.py
@@ -66,7 +66,7 @@
 import redis.asyncio as aioredis
 from gateway.channels.whatsapp import send_whatsapp_message
 from orchestrator.agents.builder import build_messages_with_memory
-from orchestrator.agents.runner import run_agent
+from orchestrator.agents.runner import run_agent, run_agent_streaming
 from orchestrator.audit.logger import AuditLogger
 from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
 from orchestrator.main import app
@@ -526,15 +526,41 @@ async def _process_message(
     tool_registry = get_tools_for_agent(agent)
 
     # -------------------------------------------------------------------------
-    # Run agent with tool loop
+    # Run agent — streaming for web channel (no tools), non-streaming otherwise
     # -------------------------------------------------------------------------
-    response_text = await run_agent(
-        msg,
-        agent,
-        messages=enriched_messages,
-        audit_logger=audit_logger,
-        tool_registry=tool_registry if tool_registry else None,
-    )
+    is_web_channel = str(msg.channel) == "web"
+    has_tools = bool(tool_registry)
+
+    # Track whether the streaming path already published to Redis so we can
+    # skip the _send_response() call below (avoids a duplicate publish).
+    streaming_delivered = False
+
+    if is_web_channel and not has_tools:
+        # Streaming path: yield tokens directly to Redis pub-sub so the
+        # WebSocket handler can forward them to the browser immediately.
+        # The tool-call loop is skipped because there are no tools registered.
+        # NOTE(review): confirm "extras" is defined in _process_message — the
+        # send below uses "response_extras"; a stale copy-paste would NameError.
+        web_conversation_id: str = extras.get("conversation_id", "") or ""
+        web_tenant_id: str = extras.get("tenant_id", "") or str(msg.tenant_id or "")
+        response_text = await _stream_agent_response_to_redis(
+            msg=msg,
+            agent=agent,
+            messages=enriched_messages,
+            conversation_id=web_conversation_id,
+            tenant_id=web_tenant_id,
+        )
+        streaming_delivered = True
+    else:
+        # Non-streaming path: tool-call loop + full response (Slack, WhatsApp,
+        # and web channel when tools are registered).
+        response_text = await run_agent(
+            msg,
+            agent,
+            messages=enriched_messages,
+            audit_logger=audit_logger,
+            tool_registry=tool_registry if tool_registry else None,
+        )
 
     # -------------------------------------------------------------------------
     # Escalation post-check — evaluate rules against conversation metadata
@@ -592,8 +618,12 @@ async def _process_message(
         len(relevant_context),
     )
 
-    # Send response via channel-aware routing
-    await _send_response(msg.channel, response_text, response_extras)
+    # Send response via channel-aware routing.
+    # Skip for the streaming path — _stream_agent_response_to_redis already
+    # published chunk + done messages to Redis; calling _send_response here
+    # would publish a duplicate done message.
+    if not streaming_delivered:
+        await _send_response(msg.channel, response_text, response_extras)
 
     # -------------------------------------------------------------------------
     # Memory persistence (after LLM response)
@@ -733,6 +763,90 @@ def _extract_tool_name_from_confirmation(confirmation_message: str) -> str:
     return "unknown_tool"
 
 
+# Fallback text for streaming errors (mirrors runner._FALLBACK_RESPONSE)
+_FALLBACK_RESPONSE_TEXT = (
+    "I'm having trouble processing your request right now. "
+    "Please try again in a moment."
+)
+
+
+async def _stream_agent_response_to_redis(
+    msg: "KonstructMessage",
+    agent: Any,
+    messages: list[dict],
+    conversation_id: str,
+    tenant_id: str,
+) -> str:
+    """
+    Stream LLM token chunks to Redis pub-sub for web channel delivery.
+
+    Calls run_agent_streaming() and publishes each token as a
+    ``{"type": "chunk", "text": token}`` message to the webchat response
+    channel. Publishes a final ``{"type": "done", "text": full_response}``
+    when the stream completes.
+
+    The WebSocket handler (web.py) listens on this channel and forwards
+    chunk/done messages directly to the browser, enabling word-by-word display.
+
+    Args:
+        msg: The inbound KonstructMessage.
+        agent: The ORM Agent instance.
+        messages: Memory-enriched messages array.
+        conversation_id: Web conversation UUID string.
+        tenant_id: Konstruct tenant UUID string.
+
+    Returns:
+        The full assembled response text (for memory persistence and audit).
+    """
+    response_channel = webchat_response_key(tenant_id, conversation_id)
+    chunks: list[str] = []
+    full_response = _FALLBACK_RESPONSE_TEXT
+
+    publish_redis = aioredis.from_url(settings.redis_url)
+    try:
+        async for token in run_agent_streaming(msg, agent, messages=messages):
+            chunks.append(token)
+            await publish_redis.publish(
+                response_channel,
+                json.dumps({"type": "chunk", "text": token}),
+            )
+
+        full_response = "".join(chunks) or _FALLBACK_RESPONSE_TEXT
+        await publish_redis.publish(
+            response_channel,
+            json.dumps({
+                "type": "done",
+                "text": full_response,
+                "conversation_id": conversation_id,
+            }),
+        )
+    except Exception:
+        logger.exception(
+            "Streaming agent response failed for conversation=%s tenant=%s",
+            conversation_id,
+            tenant_id,
+        )
+        # Preserve whatever was already streamed to the browser; fall back to
+        # the generic message only when no tokens arrived. Publishing a done
+        # marker either way keeps the browser from hanging on an open stream.
+        full_response = "".join(chunks) or _FALLBACK_RESPONSE_TEXT
+        try:
+            await publish_redis.publish(
+                response_channel,
+                json.dumps({
+                    "type": "done",
+                    "text": full_response,
+                    "conversation_id": conversation_id,
+                }),
+            )
+        except Exception:
+            pass
+    finally:
+        await publish_redis.aclose()
+
+    return full_response
+
+
 async def _send_response(
     channel: Any,
     text: str,
@@ -792,7 +906,9 @@ async def _send_response(
         )
 
     elif channel_str == "web":
-        # Publish agent response to Redis pub-sub so the WebSocket handler can deliver it
+        # Publish agent response to Redis pub-sub so the WebSocket handler can deliver it.
+        # Uses "done" type (consistent with streaming path) so the WebSocket handler
+        # processes it identically whether or not streaming was used.
         web_conversation_id: str = extras.get("conversation_id", "") or ""
         web_tenant_id: str = extras.get("tenant_id", "") or ""
 
@@ -808,7 +924,7 @@ async def _send_response(
         await publish_redis.publish(
             response_channel,
             json.dumps({
-                "type": "response",
+                "type": "done",
                 "text": text,
                 "conversation_id": web_conversation_id,
             }),