feat(streaming): add run_agent_streaming() to orchestrator runner

- run_agent_streaming() calls POST /complete/stream and yields token strings
- Reads NDJSON lines from the streaming response, yields on 'chunk' events
- On 'error' or connection failure, yields the fallback response string
- Tool calls are not supported in the streaming path
- Existing run_agent() (non-streaming, tool-call loop) is unchanged
This commit is contained in:
2026-03-25 17:57:00 -06:00
parent f3e358b418
commit 9090b54f43

View File

@@ -16,13 +16,20 @@ Tool-call loop (Phase 2):
- Otherwise: append tool result as 'tool' role message, re-call LLM
Loop until LLM returns plain text (no tool_calls) or max_iterations reached.
Max iterations = 5 (prevents runaway tool chains).
Streaming (web channel only):
run_agent_streaming() calls POST /complete/stream and yields token strings.
Tool calls are NOT supported in the streaming path — only used for the final
text response after the tool-call loop has resolved (or when no tools needed).
"""
from __future__ import annotations
import json
import logging
import time
import uuid
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any
import httpx
@@ -268,6 +275,99 @@ async def run_agent(
    return _FALLBACK_RESPONSE
async def run_agent_streaming(
    msg: KonstructMessage,
    agent: Agent,
    messages: list[dict] | None = None,
) -> AsyncGenerator[str, None]:
    """
    Stream the final LLM response for an agent, yielding token strings.

    This is the web-channel-only streaming path. It does NOT support tool calls —
    tool resolution must be completed BEFORE calling this function (use run_agent()
    for the tool-call loop, then only call this for streaming plain text responses).

    The caller (tasks._process_message) is responsible for:
    - Running the tool-call loop via run_agent() first if tools are registered
    - Only falling into this path when the response will be plain text
    - Publishing each yielded token to Redis pub-sub

    Args:
        msg: The inbound KonstructMessage.
        agent: The ORM Agent instance.
        messages: Memory-enriched messages array (from build_messages_with_memory).
            When None, falls back to simple [system, user] construction.

    Yields:
        Token strings as generated by the LLM.
        On an error that occurs BEFORE any token has been yielded, yields the
        fallback response string as a single chunk. If tokens were already
        streamed when the error occurs, the fallback is suppressed — appending
        a canned message after a partial answer would corrupt the visible
        response in the UI.
    """
    if messages is None:
        # Local import avoids a circular dependency at module import time.
        from orchestrator.agents.builder import build_messages, build_system_prompt

        system_prompt = build_system_prompt(agent)
        user_text: str = msg.content.text or ""
        messages = build_messages(
            system_prompt=system_prompt,
            user_message=user_text,
        )

    llm_stream_url = f"{settings.llm_pool_url}/complete/stream"
    payload: dict[str, Any] = {
        "model": agent.model_preference,
        "messages": messages,
        "tenant_id": str(msg.tenant_id) if msg.tenant_id else "",
    }

    # Tracks whether any real token reached the caller. Error paths use this
    # to choose between "emit fallback" (nothing streamed yet) and "stop
    # quietly" (a partial answer is already on screen).
    yielded_any = False
    try:
        async with httpx.AsyncClient(timeout=_LLM_TIMEOUT) as client:
            async with client.stream("POST", llm_stream_url, json=payload) as response:
                if response.status_code != 200:
                    logger.error(
                        "LLM pool stream returned %d for tenant=%s agent=%s",
                        response.status_code,
                        msg.tenant_id,
                        agent.id,
                    )
                    yield _FALLBACK_RESPONSE
                    return
                # The pool emits NDJSON: one JSON event object per line.
                async for line in response.aiter_lines():
                    if not line.strip():
                        continue
                    try:
                        event = json.loads(line)
                    except json.JSONDecodeError:
                        # Skip malformed lines rather than aborting the stream.
                        continue
                    event_type = event.get("type")
                    if event_type == "chunk":
                        token = event.get("text", "")
                        if token:
                            yielded_any = True
                            yield token
                    elif event_type == "done":
                        return
                    elif event_type == "error":
                        logger.error(
                            "LLM pool stream error for tenant=%s agent=%s: %s",
                            msg.tenant_id,
                            agent.id,
                            event.get("message", "unknown"),
                        )
                        if not yielded_any:
                            yield _FALLBACK_RESPONSE
                        return
    except httpx.RequestError:
        logger.exception(
            "LLM pool stream unreachable for tenant=%s agent=%s url=%s",
            msg.tenant_id,
            agent.id,
            llm_stream_url,
        )
        if not yielded_any:
            yield _FALLBACK_RESPONSE
def _get_last_user_message(messages: list[dict[str, Any]]) -> str:
    """Extract the content of the last user message for audit summary."""
    for msg in reversed(messages):