Files
Adolfo Delorenzo f3e358b418 feat(streaming): add complete_stream() generator and POST /complete/stream NDJSON endpoint to llm-pool
- complete_stream() in router.py yields token strings via acompletion(stream=True)
- POST /complete/stream returns NDJSON: chunk lines then a done line
- Streaming path does not support tool calls (plain text only)
- Non-streaming POST /complete endpoint unchanged
2026-03-25 17:56:56 -06:00

184 lines
5.6 KiB
Python

"""
LLM Backend Pool — FastAPI service on port 8004.
Endpoints:
POST /complete — route a completion request through the LiteLLM Router.
POST /complete/stream — streaming variant; returns NDJSON token chunks.
GET /health — liveness probe.
"""
from __future__ import annotations
import json
import logging
from typing import Any
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from llm_pool.router import complete as router_complete
from llm_pool.router import complete_stream as router_complete_stream
# Module-level logger; the route handlers below use it to record failures.
logger = logging.getLogger(__name__)

# Application object that the route decorators in this module register on.
# The keyword arguments are descriptive service metadata only.
app = FastAPI(
    version="0.1.0",
    title="Konstruct LLM Pool",
    description="LiteLLM Router — Ollama + Anthropic + OpenAI with automatic fallback",
)
# ---------------------------------------------------------------------------
# Request / Response schemas
# ---------------------------------------------------------------------------
class CompleteRequest(BaseModel):
    """Body for POST /complete."""

    # Model group name: "quality" or "fast".
    model: str

    # OpenAI-format message list.
    messages: list[dict]

    # Konstruct tenant UUID for cost tracking.
    tenant_id: str

    # Optional OpenAI function-calling tool definitions.
    # When provided, the LLM may return tool_calls instead of text content.
    tools: list[dict] | None = None
# Token accounting attached to a completion response.  Both counters default
# to zero, so a bare UsageInfo() reports no usage.
class UsageInfo(BaseModel):
    # Tokens counted for the prompt side of the call.
    prompt_tokens: int = 0

    # Tokens counted for the generated completion.
    completion_tokens: int = 0
# Response body for POST /complete.
class CompleteResponse(BaseModel):
    # Text content of the completion.
    content: str

    # Model group name; the /complete handler fills this from the request.
    model: str

    # Token counts for the call.
    usage: UsageInfo

    # Tool calls returned by the LLM, in OpenAI format.
    # Non-empty when the LLM decided to use a tool instead of responding
    # with text.  Pydantic copies field defaults per instance, so the literal
    # list below is not shared between responses.
    tool_calls: list[dict[str, Any]] = []
# Response body for GET /health.
class HealthResponse(BaseModel):
    # Liveness indicator; the /health route sets it to "ok".
    status: str
class StreamCompleteRequest(BaseModel):
    """Body for POST /complete/stream."""

    # Model group name: "quality" or "fast".
    model: str

    # OpenAI-format message list.
    messages: list[dict]

    # Konstruct tenant UUID for cost tracking.
    tenant_id: str
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    """Liveness probe — returns immediately."""
    # No dependency is consulted here: the payload is a constant.
    liveness = HealthResponse(status="ok")
    return liveness
@app.post("/complete", response_model=CompleteResponse)
async def complete_endpoint(request: CompleteRequest) -> CompleteResponse:
    """
    Route a completion request through the LiteLLM Router.

    The `model` field selects the provider group ("quality" or "fast").
    LiteLLM handles provider selection, retries, and cross-group fallback
    automatically.

    When `tools` are provided, the LLM may return tool_calls instead of text.
    The response includes both `content` and `tool_calls` fields — exactly one
    will be populated depending on whether the LLM chose to use a tool.

    Returns 503 JSON if all providers (including fallbacks) are unavailable.
    """
    # Imported locally, as in the original, so this handler does not depend on
    # the module-level import list.
    from fastapi.responses import JSONResponse

    try:
        llm_response = await router_complete(
            model_group=request.model,
            messages=request.messages,
            tenant_id=request.tenant_id,
            tools=request.tools,
        )
        # LiteLLM Router doesn't expose per-call usage easily via acompletion
        # on all provider paths; we return zeroed usage for now and will wire
        # real token counts in a follow-up plan when cost tracking is added.
        #
        # In the OpenAI response format `content` is null when the model
        # answers with tool calls (and `tool_calls` is absent for plain text).
        # Coerce both to their empty values so a successful completion cannot
        # fail CompleteResponse validation and be misreported below as a 503.
        # NOTE(review): the router may already normalise these — confirm.
        return CompleteResponse(
            content=llm_response.content or "",
            model=request.model,
            usage=UsageInfo(),
            tool_calls=llm_response.tool_calls or [],
        )
    except Exception:
        # Boundary handler: log the full traceback, then answer with a JSON
        # error body instead of letting the exception propagate.
        logger.exception(
            "All LLM providers unavailable for tenant=%s model=%s",
            request.tenant_id,
            request.model,
        )
        return JSONResponse(  # type: ignore[return-value]
            status_code=503,
            content={"error": "All providers unavailable"},
        )
@app.post("/complete/stream")
async def complete_stream_endpoint(request: StreamCompleteRequest) -> StreamingResponse:
    """
    Stream a completion through the LiteLLM Router using NDJSON.

    Each line of the response body is a JSON object:
        {"type": "chunk", "text": "<token>"} — zero or more times
        {"type": "done"} — final line, signals end of stream

    On provider failure, yields:
        {"type": "error", "message": "All providers unavailable"}

    The caller (orchestrator runner) reads line-by-line and forwards chunks
    to Redis pub-sub for the web WebSocket handler.

    NOTE: Tool calls are NOT supported in this endpoint — only plain text
    streaming. Use POST /complete for tool-call responses.
    """

    def _ndjson_line(event: dict[str, Any]) -> str:
        # One NDJSON record: the JSON-encoded event followed by a newline.
        return f"{json.dumps(event)}\n"

    async def _event_lines() -> Any:
        try:
            # Relay every token from the router as its own "chunk" record.
            async for piece in router_complete_stream(
                model_group=request.model,
                messages=request.messages,
                tenant_id=request.tenant_id,
            ):
                yield _ndjson_line({"type": "chunk", "text": piece})
            # Reached only when the token stream finished without raising.
            yield _ndjson_line({"type": "done"})
        except Exception:
            # A failure ends the body with a single "error" record; no "done"
            # record follows it.
            logger.exception(
                "Streaming LLM failed for tenant=%s model=%s",
                request.tenant_id,
                request.model,
            )
            yield _ndjson_line({"type": "error", "message": "All providers unavailable"})

    body = _event_lines()
    return StreamingResponse(body, media_type="application/x-ndjson")