From 9090b54f43c17a2f7d6347cc75c33ea7c465e9c7 Mon Sep 17 00:00:00 2001
From: Adolfo Delorenzo
Date: Wed, 25 Mar 2026 17:57:00 -0600
Subject: [PATCH] feat(streaming): add run_agent_streaming() to orchestrator
 runner

- run_agent_streaming() calls POST /complete/stream and yields token strings
- Reads NDJSON lines from the streaming response, yields on 'chunk' events
- On 'error' or connection failure, yields the fallback response string
- Tool calls are not supported in the streaming path
- Existing run_agent() (non-streaming, tool-call loop) is unchanged
---
 .../orchestrator/agents/runner.py             | 100 ++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git a/packages/orchestrator/orchestrator/agents/runner.py b/packages/orchestrator/orchestrator/agents/runner.py
index 1d2796c..c53f881 100644
--- a/packages/orchestrator/orchestrator/agents/runner.py
+++ b/packages/orchestrator/orchestrator/agents/runner.py
@@ -16,13 +16,20 @@ Tool-call loop (Phase 2):
 - Otherwise: append tool result as 'tool' role message, re-call LLM
 Loop until LLM returns plain text (no tool_calls) or max_iterations reached.
 Max iterations = 5 (prevents runaway tool chains).
+
+Streaming (web channel only):
+    run_agent_streaming() calls POST /complete/stream and yields token strings.
+    Tool calls are NOT supported in the streaming path — only used for the final
+    text response after the tool-call loop has resolved (or when no tools needed).
 """
 
 from __future__ import annotations
 
+import json
 import logging
 import time
 import uuid
+from collections.abc import AsyncGenerator
 from typing import TYPE_CHECKING, Any
 
 import httpx
@@ -268,6 +275,99 @@ async def run_agent(
     return _FALLBACK_RESPONSE
 
 
+async def run_agent_streaming(
+    msg: KonstructMessage,
+    agent: Agent,
+    messages: list[dict] | None = None,
+) -> AsyncGenerator[str, None]:
+    """
+    Stream the final LLM response for an agent, yielding token strings.
+
+    This is the web-channel-only streaming path. It does NOT support tool calls —
+    tool resolution must be completed BEFORE calling this function (use run_agent()
+    for the tool-call loop, then only call this for streaming plain text responses).
+
+    The caller (tasks._process_message) is responsible for:
+    - Running the tool-call loop via run_agent() first if tools are registered
+    - Only falling into this path when the response will be plain text
+    - Publishing each yielded token to Redis pub-sub
+
+    For simple conversations (no tools), this streams the response directly.
+
+    Args:
+        msg: The inbound KonstructMessage.
+        agent: The ORM Agent instance.
+        messages: Memory-enriched messages array (from build_messages_with_memory).
+            When None, falls back to simple [system, user] construction.
+
+    Yields:
+        Token strings as generated by the LLM.
+        On error, yields the fallback response string as a single chunk.
+    """
+    if messages is None:
+        from orchestrator.agents.builder import build_messages, build_system_prompt
+        system_prompt = build_system_prompt(agent)
+        user_text: str = msg.content.text or ""
+        messages = build_messages(
+            system_prompt=system_prompt,
+            user_message=user_text,
+        )
+
+    llm_stream_url = f"{settings.llm_pool_url}/complete/stream"
+    payload: dict[str, Any] = {
+        "model": agent.model_preference,
+        "messages": messages,
+        "tenant_id": str(msg.tenant_id) if msg.tenant_id else "",
+    }
+
+    try:
+        async with httpx.AsyncClient(timeout=_LLM_TIMEOUT) as client:
+            async with client.stream("POST", llm_stream_url, json=payload) as response:
+                if response.status_code != 200:
+                    logger.error(
+                        "LLM pool stream returned %d for tenant=%s agent=%s",
+                        response.status_code,
+                        msg.tenant_id,
+                        agent.id,
+                    )
+                    yield _FALLBACK_RESPONSE
+                    return
+
+                async for line in response.aiter_lines():
+                    if not line.strip():
+                        continue
+                    try:
+                        event = json.loads(line)
+                    except json.JSONDecodeError:
+                        continue
+
+                    event_type = event.get("type")
+                    if event_type == "chunk":
+                        token = event.get("text", "")
+                        if token:
+                            yield token
+                    elif event_type == "done":
+                        return
+                    elif event_type == "error":
+                        logger.error(
+                            "LLM pool stream error for tenant=%s agent=%s: %s",
+                            msg.tenant_id,
+                            agent.id,
+                            event.get("message", "unknown"),
+                        )
+                        yield _FALLBACK_RESPONSE
+                        return
+
+    except httpx.RequestError:
+        logger.exception(
+            "LLM pool stream unreachable for tenant=%s agent=%s url=%s",
+            msg.tenant_id,
+            agent.id,
+            llm_stream_url,
+        )
+        yield _FALLBACK_RESPONSE
+
+
 def _get_last_user_message(messages: list[dict[str, Any]]) -> str:
     """Extract the content of the last user message for audit summary."""
     for msg in reversed(messages):