Compare commits
5 Commits
b6c8da8cca
...
5e4d9ce144
| Author | SHA1 | Date | |
|---|---|---|---|
| 5e4d9ce144 | |||
| 61b8762bac | |||
| 5fb79beb76 | |||
| 9090b54f43 | |||
| f3e358b418 |
@@ -245,15 +245,22 @@ async def _handle_websocket_connection(
|
|||||||
handle_message.delay(task_payload)
|
handle_message.delay(task_payload)
|
||||||
|
|
||||||
# -------------------------------------------------------------------
|
# -------------------------------------------------------------------
|
||||||
# d. Subscribe to Redis pub-sub and wait for agent response
|
# d. Subscribe to Redis pub-sub and forward streaming chunks to client
|
||||||
|
#
|
||||||
|
# The orchestrator publishes two message types to the response channel:
|
||||||
|
# {"type": "chunk", "text": "<token>"} — zero or more times (streaming)
|
||||||
|
# {"type": "done", "text": "<full>", "conversation_id": "..."} — final marker
|
||||||
|
#
|
||||||
|
# We forward each "chunk" immediately to the browser so text appears
|
||||||
|
# word-by-word. On "done" we save the full response to the DB.
|
||||||
# -------------------------------------------------------------------
|
# -------------------------------------------------------------------
|
||||||
response_channel = webchat_response_key(tenant_id_str, saved_conversation_id)
|
response_channel = webchat_response_key(tenant_id_str, saved_conversation_id)
|
||||||
subscribe_redis = aioredis.from_url(settings.redis_url)
|
subscribe_redis = aioredis.from_url(settings.redis_url)
|
||||||
|
response_text: str = ""
|
||||||
try:
|
try:
|
||||||
pubsub = subscribe_redis.pubsub()
|
pubsub = subscribe_redis.pubsub()
|
||||||
await pubsub.subscribe(response_channel)
|
await pubsub.subscribe(response_channel)
|
||||||
|
|
||||||
response_text: str = ""
|
|
||||||
deadline = asyncio.get_event_loop().time() + _RESPONSE_TIMEOUT_SECONDS
|
deadline = asyncio.get_event_loop().time() + _RESPONSE_TIMEOUT_SECONDS
|
||||||
|
|
||||||
while asyncio.get_event_loop().time() < deadline:
|
while asyncio.get_event_loop().time() < deadline:
|
||||||
@@ -261,10 +268,31 @@ async def _handle_websocket_connection(
|
|||||||
if message and message.get("type") == "message":
|
if message and message.get("type") == "message":
|
||||||
try:
|
try:
|
||||||
payload = json.loads(message["data"])
|
payload = json.loads(message["data"])
|
||||||
response_text = payload.get("text", "")
|
|
||||||
except (json.JSONDecodeError, KeyError):
|
except (json.JSONDecodeError, KeyError):
|
||||||
pass
|
await asyncio.sleep(0.01)
|
||||||
|
continue
|
||||||
|
|
||||||
|
msg_type = payload.get("type")
|
||||||
|
|
||||||
|
if msg_type == "chunk":
|
||||||
|
# Forward token immediately — do not break the loop
|
||||||
|
token = payload.get("text", "")
|
||||||
|
if token:
|
||||||
|
try:
|
||||||
|
await websocket.send_json({
|
||||||
|
"type": "chunk",
|
||||||
|
"text": token,
|
||||||
|
})
|
||||||
|
except Exception:
|
||||||
|
# Client disconnected mid-stream — exit cleanly
|
||||||
break
|
break
|
||||||
|
|
||||||
|
elif msg_type == "done":
|
||||||
|
# Final marker — full text for DB persistence
|
||||||
|
response_text = payload.get("text", "")
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
await asyncio.sleep(0.05)
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
await pubsub.unsubscribe(response_channel)
|
await pubsub.unsubscribe(response_channel)
|
||||||
@@ -272,7 +300,7 @@ async def _handle_websocket_connection(
|
|||||||
await subscribe_redis.aclose()
|
await subscribe_redis.aclose()
|
||||||
|
|
||||||
# -------------------------------------------------------------------
|
# -------------------------------------------------------------------
|
||||||
# e. Save assistant message and send response to client
|
# e. Save assistant message and send final "done" to client
|
||||||
# -------------------------------------------------------------------
|
# -------------------------------------------------------------------
|
||||||
if response_text:
|
if response_text:
|
||||||
rls_token2 = current_tenant_id.set(tenant_uuid)
|
rls_token2 = current_tenant_id.set(tenant_uuid)
|
||||||
@@ -299,8 +327,9 @@ async def _handle_websocket_connection(
|
|||||||
finally:
|
finally:
|
||||||
current_tenant_id.reset(rls_token2)
|
current_tenant_id.reset(rls_token2)
|
||||||
|
|
||||||
|
# Signal stream completion to the client
|
||||||
await websocket.send_json({
|
await websocket.send_json({
|
||||||
"type": "response",
|
"type": "done",
|
||||||
"text": response_text,
|
"text": response_text,
|
||||||
"conversation_id": saved_conversation_id,
|
"conversation_id": saved_conversation_id,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -3,18 +3,22 @@ LLM Backend Pool — FastAPI service on port 8004.
|
|||||||
|
|
||||||
Endpoints:
|
Endpoints:
|
||||||
POST /complete — route a completion request through the LiteLLM Router.
|
POST /complete — route a completion request through the LiteLLM Router.
|
||||||
|
POST /complete/stream — streaming variant; returns NDJSON token chunks.
|
||||||
GET /health — liveness probe.
|
GET /health — liveness probe.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from llm_pool.router import complete as router_complete
|
from llm_pool.router import complete as router_complete
|
||||||
|
from llm_pool.router import complete_stream as router_complete_stream
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -69,6 +73,19 @@ class HealthResponse(BaseModel):
|
|||||||
status: str
|
status: str
|
||||||
|
|
||||||
|
|
||||||
|
class StreamCompleteRequest(BaseModel):
|
||||||
|
"""Body for POST /complete/stream."""
|
||||||
|
|
||||||
|
model: str
|
||||||
|
"""Model group name: "quality" or "fast"."""
|
||||||
|
|
||||||
|
messages: list[dict]
|
||||||
|
"""OpenAI-format message list."""
|
||||||
|
|
||||||
|
tenant_id: str
|
||||||
|
"""Konstruct tenant UUID for cost tracking."""
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Routes
|
# Routes
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -123,3 +140,44 @@ async def complete_endpoint(request: CompleteRequest) -> CompleteResponse:
|
|||||||
status_code=503,
|
status_code=503,
|
||||||
content={"error": "All providers unavailable"},
|
content={"error": "All providers unavailable"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@app.post("/complete/stream")
|
||||||
|
async def complete_stream_endpoint(request: StreamCompleteRequest) -> StreamingResponse:
|
||||||
|
"""
|
||||||
|
Stream a completion through the LiteLLM Router using NDJSON.
|
||||||
|
|
||||||
|
Each line of the response body is a JSON object:
|
||||||
|
{"type": "chunk", "text": "<token>"} — zero or more times
|
||||||
|
{"type": "done"} — final line, signals end of stream
|
||||||
|
|
||||||
|
On provider failure, yields:
|
||||||
|
{"type": "error", "message": "All providers unavailable"}
|
||||||
|
|
||||||
|
The caller (orchestrator runner) reads line-by-line and forwards chunks
|
||||||
|
to Redis pub-sub for the web WebSocket handler.
|
||||||
|
|
||||||
|
NOTE: Tool calls are NOT supported in this endpoint — only plain text
|
||||||
|
streaming. Use POST /complete for tool-call responses.
|
||||||
|
"""
|
||||||
|
async def _generate() -> Any:
|
||||||
|
try:
|
||||||
|
async for token in router_complete_stream(
|
||||||
|
model_group=request.model,
|
||||||
|
messages=request.messages,
|
||||||
|
tenant_id=request.tenant_id,
|
||||||
|
):
|
||||||
|
yield json.dumps({"type": "chunk", "text": token}) + "\n"
|
||||||
|
yield json.dumps({"type": "done"}) + "\n"
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"Streaming LLM failed for tenant=%s model=%s",
|
||||||
|
request.tenant_id,
|
||||||
|
request.model,
|
||||||
|
)
|
||||||
|
yield json.dumps({"type": "error", "message": "All providers unavailable"}) + "\n"
|
||||||
|
|
||||||
|
return StreamingResponse(
|
||||||
|
_generate(),
|
||||||
|
media_type="application/x-ndjson",
|
||||||
|
)
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ NOTE: LiteLLM is pinned to ==1.82.5 in pyproject.toml.
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from collections.abc import AsyncGenerator
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from litellm import Router
|
from litellm import Router
|
||||||
@@ -173,3 +174,48 @@ async def complete(
|
|||||||
|
|
||||||
content: str = message.content or ""
|
content: str = message.content or ""
|
||||||
return LLMResponse(content=content, tool_calls=tool_calls)
|
return LLMResponse(content=content, tool_calls=tool_calls)
|
||||||
|
|
||||||
|
|
||||||
|
async def complete_stream(
|
||||||
|
model_group: str,
|
||||||
|
messages: list[dict],
|
||||||
|
tenant_id: str,
|
||||||
|
) -> AsyncGenerator[str, None]:
|
||||||
|
"""
|
||||||
|
Stream a completion from the LiteLLM Router, yielding token strings.
|
||||||
|
|
||||||
|
Only used for the web channel streaming path — does NOT support tool calls
|
||||||
|
(tool-call responses are not streamed). The caller is responsible for
|
||||||
|
assembling the full response from the yielded chunks.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
model_group: "quality", "fast", etc. — selects the provider group.
|
||||||
|
messages: OpenAI-format message list.
|
||||||
|
tenant_id: Konstruct tenant UUID for cost tracking metadata.
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
Token strings as they are generated by the LLM.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: Propagated if all providers (and fallbacks) fail.
|
||||||
|
"""
|
||||||
|
logger.info(
|
||||||
|
"LLM stream request",
|
||||||
|
extra={"model_group": model_group, "tenant_id": tenant_id},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = await llm_router.acompletion(
|
||||||
|
model=model_group,
|
||||||
|
messages=messages,
|
||||||
|
metadata={"tenant_id": tenant_id},
|
||||||
|
stream=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
async for chunk in response:
|
||||||
|
try:
|
||||||
|
delta = chunk.choices[0].delta
|
||||||
|
token = getattr(delta, "content", None)
|
||||||
|
if token:
|
||||||
|
yield token
|
||||||
|
except (IndexError, AttributeError):
|
||||||
|
continue
|
||||||
|
|||||||
@@ -16,13 +16,20 @@ Tool-call loop (Phase 2):
|
|||||||
- Otherwise: append tool result as 'tool' role message, re-call LLM
|
- Otherwise: append tool result as 'tool' role message, re-call LLM
|
||||||
Loop until LLM returns plain text (no tool_calls) or max_iterations reached.
|
Loop until LLM returns plain text (no tool_calls) or max_iterations reached.
|
||||||
Max iterations = 5 (prevents runaway tool chains).
|
Max iterations = 5 (prevents runaway tool chains).
|
||||||
|
|
||||||
|
Streaming (web channel only):
|
||||||
|
run_agent_streaming() calls POST /complete/stream and yields token strings.
|
||||||
|
Tool calls are NOT supported in the streaming path — only used for the final
|
||||||
|
text response after the tool-call loop has resolved (or when no tools needed).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
from collections.abc import AsyncGenerator
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
@@ -268,6 +275,99 @@ async def run_agent(
|
|||||||
return _FALLBACK_RESPONSE
|
return _FALLBACK_RESPONSE
|
||||||
|
|
||||||
|
|
||||||
|
async def run_agent_streaming(
|
||||||
|
msg: KonstructMessage,
|
||||||
|
agent: Agent,
|
||||||
|
messages: list[dict] | None = None,
|
||||||
|
) -> AsyncGenerator[str, None]:
|
||||||
|
"""
|
||||||
|
Stream the final LLM response for an agent, yielding token strings.
|
||||||
|
|
||||||
|
This is the web-channel-only streaming path. It does NOT support tool calls —
|
||||||
|
tool resolution must be completed BEFORE calling this function (use run_agent()
|
||||||
|
for the tool-call loop, then only call this for streaming plain text responses).
|
||||||
|
|
||||||
|
The caller (tasks._process_message) is responsible for:
|
||||||
|
- Running the tool-call loop via run_agent() first if tools are registered
|
||||||
|
- Only falling into this path when the response will be plain text
|
||||||
|
- Publishing each yielded token to Redis pub-sub
|
||||||
|
|
||||||
|
For simple conversations (no tools), this streams the response directly.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg: The inbound KonstructMessage.
|
||||||
|
agent: The ORM Agent instance.
|
||||||
|
messages: Memory-enriched messages array (from build_messages_with_memory).
|
||||||
|
When None, falls back to simple [system, user] construction.
|
||||||
|
|
||||||
|
Yields:
|
||||||
|
Token strings as generated by the LLM.
|
||||||
|
On error, yields the fallback response string as a single chunk.
|
||||||
|
"""
|
||||||
|
if messages is None:
|
||||||
|
from orchestrator.agents.builder import build_messages, build_system_prompt
|
||||||
|
system_prompt = build_system_prompt(agent)
|
||||||
|
user_text: str = msg.content.text or ""
|
||||||
|
messages = build_messages(
|
||||||
|
system_prompt=system_prompt,
|
||||||
|
user_message=user_text,
|
||||||
|
)
|
||||||
|
|
||||||
|
llm_stream_url = f"{settings.llm_pool_url}/complete/stream"
|
||||||
|
payload: dict[str, Any] = {
|
||||||
|
"model": agent.model_preference,
|
||||||
|
"messages": messages,
|
||||||
|
"tenant_id": str(msg.tenant_id) if msg.tenant_id else "",
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=_LLM_TIMEOUT) as client:
|
||||||
|
async with client.stream("POST", llm_stream_url, json=payload) as response:
|
||||||
|
if response.status_code != 200:
|
||||||
|
logger.error(
|
||||||
|
"LLM pool stream returned %d for tenant=%s agent=%s",
|
||||||
|
response.status_code,
|
||||||
|
msg.tenant_id,
|
||||||
|
agent.id,
|
||||||
|
)
|
||||||
|
yield _FALLBACK_RESPONSE
|
||||||
|
return
|
||||||
|
|
||||||
|
async for line in response.aiter_lines():
|
||||||
|
if not line.strip():
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
event = json.loads(line)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
event_type = event.get("type")
|
||||||
|
if event_type == "chunk":
|
||||||
|
token = event.get("text", "")
|
||||||
|
if token:
|
||||||
|
yield token
|
||||||
|
elif event_type == "done":
|
||||||
|
return
|
||||||
|
elif event_type == "error":
|
||||||
|
logger.error(
|
||||||
|
"LLM pool stream error for tenant=%s agent=%s: %s",
|
||||||
|
msg.tenant_id,
|
||||||
|
agent.id,
|
||||||
|
event.get("message", "unknown"),
|
||||||
|
)
|
||||||
|
yield _FALLBACK_RESPONSE
|
||||||
|
return
|
||||||
|
|
||||||
|
except httpx.RequestError:
|
||||||
|
logger.exception(
|
||||||
|
"LLM pool stream unreachable for tenant=%s agent=%s url=%s",
|
||||||
|
msg.tenant_id,
|
||||||
|
agent.id,
|
||||||
|
llm_stream_url,
|
||||||
|
)
|
||||||
|
yield _FALLBACK_RESPONSE
|
||||||
|
|
||||||
|
|
||||||
def _get_last_user_message(messages: list[dict[str, Any]]) -> str:
|
def _get_last_user_message(messages: list[dict[str, Any]]) -> str:
|
||||||
"""Extract the content of the last user message for audit summary."""
|
"""Extract the content of the last user message for audit summary."""
|
||||||
for msg in reversed(messages):
|
for msg in reversed(messages):
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ import redis.asyncio as aioredis
|
|||||||
|
|
||||||
from gateway.channels.whatsapp import send_whatsapp_message
|
from gateway.channels.whatsapp import send_whatsapp_message
|
||||||
from orchestrator.agents.builder import build_messages_with_memory
|
from orchestrator.agents.builder import build_messages_with_memory
|
||||||
from orchestrator.agents.runner import run_agent
|
from orchestrator.agents.runner import run_agent, run_agent_streaming
|
||||||
from orchestrator.audit.logger import AuditLogger
|
from orchestrator.audit.logger import AuditLogger
|
||||||
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
|
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
|
||||||
from orchestrator.main import app
|
from orchestrator.main import app
|
||||||
@@ -526,8 +526,32 @@ async def _process_message(
|
|||||||
tool_registry = get_tools_for_agent(agent)
|
tool_registry = get_tools_for_agent(agent)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
# Run agent with tool loop
|
# Run agent — streaming for web channel (no tools), non-streaming otherwise
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
|
is_web_channel = str(msg.channel) == "web"
|
||||||
|
has_tools = bool(tool_registry)
|
||||||
|
|
||||||
|
# Track whether the streaming path already published to Redis so we can
|
||||||
|
# skip the _send_response() call below (avoids a duplicate publish).
|
||||||
|
streaming_delivered = False
|
||||||
|
|
||||||
|
if is_web_channel and not has_tools:
|
||||||
|
# Streaming path: yield tokens directly to Redis pub-sub so the
|
||||||
|
# WebSocket handler can forward them to the browser immediately.
|
||||||
|
# The tool-call loop is skipped because there are no tools registered.
|
||||||
|
web_conversation_id: str = extras.get("conversation_id", "") or ""
|
||||||
|
web_tenant_id: str = extras.get("tenant_id", "") or str(msg.tenant_id or "")
|
||||||
|
response_text = await _stream_agent_response_to_redis(
|
||||||
|
msg=msg,
|
||||||
|
agent=agent,
|
||||||
|
messages=enriched_messages,
|
||||||
|
conversation_id=web_conversation_id,
|
||||||
|
tenant_id=web_tenant_id,
|
||||||
|
)
|
||||||
|
streaming_delivered = True
|
||||||
|
else:
|
||||||
|
# Non-streaming path: tool-call loop + full response (Slack, WhatsApp,
|
||||||
|
# and web channel when tools are registered).
|
||||||
response_text = await run_agent(
|
response_text = await run_agent(
|
||||||
msg,
|
msg,
|
||||||
agent,
|
agent,
|
||||||
@@ -592,7 +616,11 @@ async def _process_message(
|
|||||||
len(relevant_context),
|
len(relevant_context),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Send response via channel-aware routing
|
# Send response via channel-aware routing.
|
||||||
|
# Skip for the streaming path — _stream_agent_response_to_redis already
|
||||||
|
# published chunk + done messages to Redis; calling _send_response here
|
||||||
|
# would publish a duplicate done message.
|
||||||
|
if not streaming_delivered:
|
||||||
await _send_response(msg.channel, response_text, response_extras)
|
await _send_response(msg.channel, response_text, response_extras)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
@@ -733,6 +761,87 @@ def _extract_tool_name_from_confirmation(confirmation_message: str) -> str:
|
|||||||
return "unknown_tool"
|
return "unknown_tool"
|
||||||
|
|
||||||
|
|
||||||
|
# Fallback text for streaming errors (mirrors runner._FALLBACK_RESPONSE)
|
||||||
|
_FALLBACK_RESPONSE_TEXT = (
|
||||||
|
"I'm having trouble processing your request right now. "
|
||||||
|
"Please try again in a moment."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _stream_agent_response_to_redis(
|
||||||
|
msg: "KonstructMessage",
|
||||||
|
agent: Any,
|
||||||
|
messages: list[dict],
|
||||||
|
conversation_id: str,
|
||||||
|
tenant_id: str,
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Stream LLM token chunks to Redis pub-sub for web channel delivery.
|
||||||
|
|
||||||
|
Calls run_agent_streaming() and publishes each token as a
|
||||||
|
``{"type": "chunk", "text": "<token>"}`` message to the webchat response
|
||||||
|
channel. Publishes a final ``{"type": "done", "text": "<full_response>"}``
|
||||||
|
when the stream completes.
|
||||||
|
|
||||||
|
The WebSocket handler (web.py) listens on this channel and forwards
|
||||||
|
chunk/done messages directly to the browser, enabling word-by-word display.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg: The inbound KonstructMessage.
|
||||||
|
agent: The ORM Agent instance.
|
||||||
|
messages: Memory-enriched messages array.
|
||||||
|
conversation_id: Web conversation UUID string.
|
||||||
|
tenant_id: Konstruct tenant UUID string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The full assembled response text (for memory persistence and audit).
|
||||||
|
"""
|
||||||
|
response_channel = webchat_response_key(tenant_id, conversation_id)
|
||||||
|
chunks: list[str] = []
|
||||||
|
full_response = _FALLBACK_RESPONSE_TEXT
|
||||||
|
|
||||||
|
publish_redis = aioredis.from_url(settings.redis_url)
|
||||||
|
try:
|
||||||
|
async for token in run_agent_streaming(msg, agent, messages=messages):
|
||||||
|
chunks.append(token)
|
||||||
|
await publish_redis.publish(
|
||||||
|
response_channel,
|
||||||
|
json.dumps({"type": "chunk", "text": token}),
|
||||||
|
)
|
||||||
|
|
||||||
|
full_response = "".join(chunks) or _FALLBACK_RESPONSE_TEXT
|
||||||
|
await publish_redis.publish(
|
||||||
|
response_channel,
|
||||||
|
json.dumps({
|
||||||
|
"type": "done",
|
||||||
|
"text": full_response,
|
||||||
|
"conversation_id": conversation_id,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
logger.exception(
|
||||||
|
"Streaming agent response failed for conversation=%s tenant=%s",
|
||||||
|
conversation_id,
|
||||||
|
tenant_id,
|
||||||
|
)
|
||||||
|
# Publish a done marker with fallback so the browser doesn't hang
|
||||||
|
try:
|
||||||
|
await publish_redis.publish(
|
||||||
|
response_channel,
|
||||||
|
json.dumps({
|
||||||
|
"type": "done",
|
||||||
|
"text": full_response,
|
||||||
|
"conversation_id": conversation_id,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
await publish_redis.aclose()
|
||||||
|
|
||||||
|
return full_response
|
||||||
|
|
||||||
|
|
||||||
async def _send_response(
|
async def _send_response(
|
||||||
channel: Any,
|
channel: Any,
|
||||||
text: str,
|
text: str,
|
||||||
@@ -792,7 +901,9 @@ async def _send_response(
|
|||||||
)
|
)
|
||||||
|
|
||||||
elif channel_str == "web":
|
elif channel_str == "web":
|
||||||
# Publish agent response to Redis pub-sub so the WebSocket handler can deliver it
|
# Publish agent response to Redis pub-sub so the WebSocket handler can deliver it.
|
||||||
|
# Uses "done" type (consistent with streaming path) so the WebSocket handler
|
||||||
|
# processes it identically whether or not streaming was used.
|
||||||
web_conversation_id: str = extras.get("conversation_id", "") or ""
|
web_conversation_id: str = extras.get("conversation_id", "") or ""
|
||||||
web_tenant_id: str = extras.get("tenant_id", "") or ""
|
web_tenant_id: str = extras.get("tenant_id", "") or ""
|
||||||
|
|
||||||
@@ -808,7 +919,7 @@ async def _send_response(
|
|||||||
await publish_redis.publish(
|
await publish_redis.publish(
|
||||||
response_channel,
|
response_channel,
|
||||||
json.dumps({
|
json.dumps({
|
||||||
"type": "response",
|
"type": "done",
|
||||||
"text": text,
|
"text": text,
|
||||||
"conversation_id": web_conversation_id,
|
"conversation_id": web_conversation_id,
|
||||||
}),
|
}),
|
||||||
|
|||||||
Submodule packages/portal updated: c2586a2260...cd8899f070
Reference in New Issue
Block a user