Compare commits

..

5 Commits

Author SHA1 Message Date
5e4d9ce144 chore: update portal submodule ref to include streaming changes 2026-03-25 17:57:23 -06:00
61b8762bac feat(streaming): update WebSocket handler to forward streaming chunks to browser
- Pub-sub loop now handles 'chunk' and 'done' message types (not just 'response')
- 'chunk' messages are forwarded immediately via websocket.send_json
- 'done' message breaks the loop and triggers DB persistence of full response
- Sends final 'done' JSON to browser to signal stream completion
- Legacy 'response' type no longer emitted from orchestrator (now unified as 'done')
2026-03-25 17:57:08 -06:00
5fb79beb76 feat(streaming): wire streaming path through orchestrator task pipeline
- _stream_agent_response_to_redis() publishes chunk/done messages to Redis pub-sub
- _process_message() uses streaming path for web channel with no tools registered
- Non-web channels (Slack, WhatsApp) and tool-enabled agents use non-streaming run_agent()
- streaming_delivered flag prevents double-publish when streaming path is active
- _send_response() web branch changed from 'response' to 'done' message type for consistency
2026-03-25 17:57:04 -06:00
9090b54f43 feat(streaming): add run_agent_streaming() to orchestrator runner
- run_agent_streaming() calls POST /complete/stream and yields token strings
- Reads NDJSON lines from the streaming response, yields on 'chunk' events
- On 'error' or connection failure, yields the fallback response string
- Tool calls are not supported in the streaming path
- Existing run_agent() (non-streaming, tool-call loop) is unchanged
2026-03-25 17:57:00 -06:00
f3e358b418 feat(streaming): add complete_stream() generator and POST /complete/stream NDJSON endpoint to llm-pool
- complete_stream() in router.py yields token strings via acompletion(stream=True)
- POST /complete/stream returns NDJSON: chunk lines then a done line
- Streaming path does not support tool calls (plain text only)
- Non-streaming POST /complete endpoint unchanged
2026-03-25 17:56:56 -06:00
6 changed files with 368 additions and 24 deletions

View File

@@ -245,15 +245,22 @@ async def _handle_websocket_connection(
handle_message.delay(task_payload) handle_message.delay(task_payload)
# ------------------------------------------------------------------- # -------------------------------------------------------------------
# d. Subscribe to Redis pub-sub and wait for agent response # d. Subscribe to Redis pub-sub and forward streaming chunks to client
#
# The orchestrator publishes two message types to the response channel:
# {"type": "chunk", "text": "<token>"} — zero or more times (streaming)
# {"type": "done", "text": "<full>", "conversation_id": "..."} — final marker
#
# We forward each "chunk" immediately to the browser so text appears
# word-by-word. On "done" we save the full response to the DB.
# ------------------------------------------------------------------- # -------------------------------------------------------------------
response_channel = webchat_response_key(tenant_id_str, saved_conversation_id) response_channel = webchat_response_key(tenant_id_str, saved_conversation_id)
subscribe_redis = aioredis.from_url(settings.redis_url) subscribe_redis = aioredis.from_url(settings.redis_url)
response_text: str = ""
try: try:
pubsub = subscribe_redis.pubsub() pubsub = subscribe_redis.pubsub()
await pubsub.subscribe(response_channel) await pubsub.subscribe(response_channel)
response_text: str = ""
deadline = asyncio.get_event_loop().time() + _RESPONSE_TIMEOUT_SECONDS deadline = asyncio.get_event_loop().time() + _RESPONSE_TIMEOUT_SECONDS
while asyncio.get_event_loop().time() < deadline: while asyncio.get_event_loop().time() < deadline:
@@ -261,10 +268,31 @@ async def _handle_websocket_connection(
if message and message.get("type") == "message": if message and message.get("type") == "message":
try: try:
payload = json.loads(message["data"]) payload = json.loads(message["data"])
response_text = payload.get("text", "")
except (json.JSONDecodeError, KeyError): except (json.JSONDecodeError, KeyError):
pass await asyncio.sleep(0.01)
continue
msg_type = payload.get("type")
if msg_type == "chunk":
# Forward token immediately — do not break the loop
token = payload.get("text", "")
if token:
try:
await websocket.send_json({
"type": "chunk",
"text": token,
})
except Exception:
# Client disconnected mid-stream — exit cleanly
break break
elif msg_type == "done":
# Final marker — full text for DB persistence
response_text = payload.get("text", "")
break
else:
await asyncio.sleep(0.05) await asyncio.sleep(0.05)
await pubsub.unsubscribe(response_channel) await pubsub.unsubscribe(response_channel)
@@ -272,7 +300,7 @@ async def _handle_websocket_connection(
await subscribe_redis.aclose() await subscribe_redis.aclose()
# ------------------------------------------------------------------- # -------------------------------------------------------------------
# e. Save assistant message and send response to client # e. Save assistant message and send final "done" to client
# ------------------------------------------------------------------- # -------------------------------------------------------------------
if response_text: if response_text:
rls_token2 = current_tenant_id.set(tenant_uuid) rls_token2 = current_tenant_id.set(tenant_uuid)
@@ -299,8 +327,9 @@ async def _handle_websocket_connection(
finally: finally:
current_tenant_id.reset(rls_token2) current_tenant_id.reset(rls_token2)
# Signal stream completion to the client
await websocket.send_json({ await websocket.send_json({
"type": "response", "type": "done",
"text": response_text, "text": response_text,
"conversation_id": saved_conversation_id, "conversation_id": saved_conversation_id,
}) })

View File

@@ -3,18 +3,22 @@ LLM Backend Pool — FastAPI service on port 8004.
Endpoints: Endpoints:
POST /complete — route a completion request through the LiteLLM Router. POST /complete — route a completion request through the LiteLLM Router.
POST /complete/stream — streaming variant; returns NDJSON token chunks.
GET /health — liveness probe. GET /health — liveness probe.
""" """
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
from typing import Any from typing import Any
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel from pydantic import BaseModel
from llm_pool.router import complete as router_complete from llm_pool.router import complete as router_complete
from llm_pool.router import complete_stream as router_complete_stream
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -69,6 +73,19 @@ class HealthResponse(BaseModel):
status: str status: str
class StreamCompleteRequest(BaseModel):
"""Body for POST /complete/stream."""
model: str
"""Model group name: "quality" or "fast"."""
messages: list[dict]
"""OpenAI-format message list."""
tenant_id: str
"""Konstruct tenant UUID for cost tracking."""
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Routes # Routes
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -123,3 +140,44 @@ async def complete_endpoint(request: CompleteRequest) -> CompleteResponse:
status_code=503, status_code=503,
content={"error": "All providers unavailable"}, content={"error": "All providers unavailable"},
) )
@app.post("/complete/stream")
async def complete_stream_endpoint(request: StreamCompleteRequest) -> StreamingResponse:
    """
    Stream a completion through the LiteLLM Router as NDJSON.

    Every line of the response body is a single JSON object:

        {"type": "chunk", "text": "<token>"}   -- zero or more times
        {"type": "done"}                        -- final line on success

    If the provider call fails, a single error line is emitted instead:

        {"type": "error", "message": "All providers unavailable"}

    The orchestrator runner consumes this stream line-by-line and republishes
    the chunks on Redis pub-sub for the web WebSocket handler.

    NOTE: tool calls are NOT supported on this endpoint -- plain-text
    streaming only. Use POST /complete when tool-call responses are needed.
    """

    async def _ndjson_lines() -> Any:
        # One JSON object per line; the trailing "\n" is the NDJSON record
        # separator. The "done" line stays inside the try so any failure
        # mid-stream is reported as an "error" line instead.
        try:
            async for piece in router_complete_stream(
                model_group=request.model,
                messages=request.messages,
                tenant_id=request.tenant_id,
            ):
                yield json.dumps({"type": "chunk", "text": piece}) + "\n"
            yield json.dumps({"type": "done"}) + "\n"
        except Exception:
            logger.exception(
                "Streaming LLM failed for tenant=%s model=%s",
                request.tenant_id,
                request.model,
            )
            yield json.dumps({"type": "error", "message": "All providers unavailable"}) + "\n"

    return StreamingResponse(
        _ndjson_lines(),
        media_type="application/x-ndjson",
    )

View File

@@ -16,6 +16,7 @@ NOTE: LiteLLM is pinned to ==1.82.5 in pyproject.toml.
from __future__ import annotations from __future__ import annotations
import logging import logging
from collections.abc import AsyncGenerator
from typing import Any from typing import Any
from litellm import Router from litellm import Router
@@ -173,3 +174,48 @@ async def complete(
content: str = message.content or "" content: str = message.content or ""
return LLMResponse(content=content, tool_calls=tool_calls) return LLMResponse(content=content, tool_calls=tool_calls)
async def complete_stream(
    model_group: str,
    messages: list[dict],
    tenant_id: str,
) -> AsyncGenerator[str, None]:
    """
    Stream a completion from the LiteLLM Router, yielding token strings.

    Web-channel streaming path only -- tool calls are NOT supported here
    (tool-call responses never stream). Callers assemble the full response
    text from the yielded chunks themselves.

    Args:
        model_group: Provider group selector ("quality", "fast", ...).
        messages: OpenAI-format message list.
        tenant_id: Konstruct tenant UUID used in cost-tracking metadata.

    Yields:
        Token strings as the LLM produces them.

    Raises:
        Exception: Propagated when every provider (and fallback) fails.
    """
    logger.info(
        "LLM stream request",
        extra={"model_group": model_group, "tenant_id": tenant_id},
    )

    stream = await llm_router.acompletion(
        model=model_group,
        messages=messages,
        metadata={"tenant_id": tenant_id},
        stream=True,
    )

    async for part in stream:
        try:
            text = getattr(part.choices[0].delta, "content", None)
        except (IndexError, AttributeError):
            # Malformed or empty chunk from the provider -- skip it.
            continue
        if text:
            yield text

View File

@@ -16,13 +16,20 @@ Tool-call loop (Phase 2):
- Otherwise: append tool result as 'tool' role message, re-call LLM - Otherwise: append tool result as 'tool' role message, re-call LLM
Loop until LLM returns plain text (no tool_calls) or max_iterations reached. Loop until LLM returns plain text (no tool_calls) or max_iterations reached.
Max iterations = 5 (prevents runaway tool chains). Max iterations = 5 (prevents runaway tool chains).
Streaming (web channel only):
run_agent_streaming() calls POST /complete/stream and yields token strings.
Tool calls are NOT supported in the streaming path — only used for the final
text response after the tool-call loop has resolved (or when no tools needed).
""" """
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
import time import time
import uuid import uuid
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
import httpx import httpx
@@ -268,6 +275,99 @@ async def run_agent(
return _FALLBACK_RESPONSE return _FALLBACK_RESPONSE
async def run_agent_streaming(
    msg: KonstructMessage,
    agent: Agent,
    messages: list[dict] | None = None,
) -> AsyncGenerator[str, None]:
    """
    Yield the agent's LLM response token-by-token (web channel only).

    Tool calls are not handled here: any tool resolution must already be
    complete (run_agent() owns the tool-call loop), so this path is only
    entered when the reply will be plain text. The caller
    (tasks._process_message) forwards each yielded token to Redis pub-sub.

    Args:
        msg: The inbound KonstructMessage being answered.
        agent: The ORM Agent instance.
        messages: Memory-enriched messages array (from
            build_messages_with_memory). When None, a minimal
            [system, user] array is built from the agent and message.

    Yields:
        Token strings as produced by the LLM. On any error the fallback
        response string is yielded once as a single chunk.
    """
    if messages is None:
        # Deferred import avoids a circular dependency at module load time.
        from orchestrator.agents.builder import build_messages, build_system_prompt

        messages = build_messages(
            system_prompt=build_system_prompt(agent),
            user_message=msg.content.text or "",
        )

    llm_stream_url = f"{settings.llm_pool_url}/complete/stream"
    request_body: dict[str, Any] = {
        "model": agent.model_preference,
        "messages": messages,
        "tenant_id": str(msg.tenant_id) if msg.tenant_id else "",
    }

    try:
        async with httpx.AsyncClient(timeout=_LLM_TIMEOUT) as client:
            async with client.stream("POST", llm_stream_url, json=request_body) as response:
                if response.status_code != 200:
                    logger.error(
                        "LLM pool stream returned %d for tenant=%s agent=%s",
                        response.status_code,
                        msg.tenant_id,
                        agent.id,
                    )
                    yield _FALLBACK_RESPONSE
                    return

                async for raw_line in response.aiter_lines():
                    if not raw_line.strip():
                        continue
                    try:
                        event = json.loads(raw_line)
                    except json.JSONDecodeError:
                        # Tolerate a malformed NDJSON line rather than abort.
                        continue

                    kind = event.get("type")
                    if kind == "chunk":
                        text = event.get("text", "")
                        if text:
                            yield text
                        continue
                    if kind == "done":
                        return
                    if kind == "error":
                        logger.error(
                            "LLM pool stream error for tenant=%s agent=%s: %s",
                            msg.tenant_id,
                            agent.id,
                            event.get("message", "unknown"),
                        )
                        yield _FALLBACK_RESPONSE
                        return
    except httpx.RequestError:
        logger.exception(
            "LLM pool stream unreachable for tenant=%s agent=%s url=%s",
            msg.tenant_id,
            agent.id,
            llm_stream_url,
        )
        yield _FALLBACK_RESPONSE
def _get_last_user_message(messages: list[dict[str, Any]]) -> str: def _get_last_user_message(messages: list[dict[str, Any]]) -> str:
"""Extract the content of the last user message for audit summary.""" """Extract the content of the last user message for audit summary."""
for msg in reversed(messages): for msg in reversed(messages):

View File

@@ -66,7 +66,7 @@ import redis.asyncio as aioredis
from gateway.channels.whatsapp import send_whatsapp_message from gateway.channels.whatsapp import send_whatsapp_message
from orchestrator.agents.builder import build_messages_with_memory from orchestrator.agents.builder import build_messages_with_memory
from orchestrator.agents.runner import run_agent from orchestrator.agents.runner import run_agent, run_agent_streaming
from orchestrator.audit.logger import AuditLogger from orchestrator.audit.logger import AuditLogger
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
from orchestrator.main import app from orchestrator.main import app
@@ -526,8 +526,32 @@ async def _process_message(
tool_registry = get_tools_for_agent(agent) tool_registry = get_tools_for_agent(agent)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Run agent with tool loop # Run agent — streaming for web channel (no tools), non-streaming otherwise
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
is_web_channel = str(msg.channel) == "web"
has_tools = bool(tool_registry)
# Track whether the streaming path already published to Redis so we can
# skip the _send_response() call below (avoids a duplicate publish).
streaming_delivered = False
if is_web_channel and not has_tools:
# Streaming path: yield tokens directly to Redis pub-sub so the
# WebSocket handler can forward them to the browser immediately.
# The tool-call loop is skipped because there are no tools registered.
web_conversation_id: str = extras.get("conversation_id", "") or ""
web_tenant_id: str = extras.get("tenant_id", "") or str(msg.tenant_id or "")
response_text = await _stream_agent_response_to_redis(
msg=msg,
agent=agent,
messages=enriched_messages,
conversation_id=web_conversation_id,
tenant_id=web_tenant_id,
)
streaming_delivered = True
else:
# Non-streaming path: tool-call loop + full response (Slack, WhatsApp,
# and web channel when tools are registered).
response_text = await run_agent( response_text = await run_agent(
msg, msg,
agent, agent,
@@ -592,7 +616,11 @@ async def _process_message(
len(relevant_context), len(relevant_context),
) )
# Send response via channel-aware routing # Send response via channel-aware routing.
# Skip for the streaming path — _stream_agent_response_to_redis already
# published chunk + done messages to Redis; calling _send_response here
# would publish a duplicate done message.
if not streaming_delivered:
await _send_response(msg.channel, response_text, response_extras) await _send_response(msg.channel, response_text, response_extras)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
@@ -733,6 +761,87 @@ def _extract_tool_name_from_confirmation(confirmation_message: str) -> str:
return "unknown_tool" return "unknown_tool"
# Fallback text for streaming errors (mirrors runner._FALLBACK_RESPONSE)
_FALLBACK_RESPONSE_TEXT = (
    "I'm having trouble processing your request right now. "
    "Please try again in a moment."
)


async def _stream_agent_response_to_redis(
    msg: "KonstructMessage",
    agent: Any,
    messages: list[dict],
    conversation_id: str,
    tenant_id: str,
) -> str:
    """
    Stream LLM token chunks to Redis pub-sub for web channel delivery.

    Calls run_agent_streaming() and publishes each token as a
    ``{"type": "chunk", "text": "<token>"}`` message to the webchat response
    channel. Publishes a final ``{"type": "done", "text": "<full_response>"}``
    marker when the stream completes (or fails).

    The WebSocket handler (web.py) listens on this channel and forwards
    chunk/done messages directly to the browser, enabling word-by-word display.

    Args:
        msg: The inbound KonstructMessage.
        agent: The ORM Agent instance.
        messages: Memory-enriched messages array.
        conversation_id: Web conversation UUID string.
        tenant_id: Konstruct tenant UUID string.

    Returns:
        The full assembled response text (for memory persistence and audit).
        On failure, any partial text that streamed before the error, or the
        fallback text if nothing streamed at all.
    """
    response_channel = webchat_response_key(tenant_id, conversation_id)
    chunks: list[str] = []
    full_response = _FALLBACK_RESPONSE_TEXT
    publish_redis = aioredis.from_url(settings.redis_url)
    try:
        async for token in run_agent_streaming(msg, agent, messages=messages):
            chunks.append(token)
            await publish_redis.publish(
                response_channel,
                json.dumps({"type": "chunk", "text": token}),
            )
        full_response = "".join(chunks) or _FALLBACK_RESPONSE_TEXT
        await publish_redis.publish(
            response_channel,
            json.dumps({
                "type": "done",
                "text": full_response,
                "conversation_id": conversation_id,
            }),
        )
    except Exception:
        logger.exception(
            "Streaming agent response failed for conversation=%s tenant=%s",
            conversation_id,
            tenant_id,
        )
        # BUGFIX: keep any tokens that already streamed to the browser so the
        # final "done" (and the DB/audit record) matches what the user saw;
        # only fall back to the canned text when nothing was produced at all.
        full_response = "".join(chunks) or _FALLBACK_RESPONSE_TEXT
        # Publish a done marker so the browser doesn't hang waiting.
        try:
            await publish_redis.publish(
                response_channel,
                json.dumps({
                    "type": "done",
                    "text": full_response,
                    "conversation_id": conversation_id,
                }),
            )
        except Exception:
            # Best-effort only: Redis itself may be the failing component.
            pass
    finally:
        await publish_redis.aclose()
    return full_response
async def _send_response( async def _send_response(
channel: Any, channel: Any,
text: str, text: str,
@@ -792,7 +901,9 @@ async def _send_response(
) )
elif channel_str == "web": elif channel_str == "web":
# Publish agent response to Redis pub-sub so the WebSocket handler can deliver it # Publish agent response to Redis pub-sub so the WebSocket handler can deliver it.
# Uses "done" type (consistent with streaming path) so the WebSocket handler
# processes it identically whether or not streaming was used.
web_conversation_id: str = extras.get("conversation_id", "") or "" web_conversation_id: str = extras.get("conversation_id", "") or ""
web_tenant_id: str = extras.get("tenant_id", "") or "" web_tenant_id: str = extras.get("tenant_id", "") or ""
@@ -808,7 +919,7 @@ async def _send_response(
await publish_redis.publish( await publish_redis.publish(
response_channel, response_channel,
json.dumps({ json.dumps({
"type": "response", "type": "done",
"text": text, "text": text,
"conversation_id": web_conversation_id, "conversation_id": web_conversation_id,
}), }),

Submodule packages/portal updated: c2586a2260...cd8899f070