- complete_stream() in router.py yields token strings via acompletion(stream=True)
- POST /complete/stream returns NDJSON: chunk lines then a done line
- Streaming path does not support tool calls (plain text only)
- Non-streaming POST /complete endpoint unchanged
184 lines · 5.6 KiB · Python
"""
|
|
LLM Backend Pool — FastAPI service on port 8004.
|
|
|
|
Endpoints:
|
|
POST /complete — route a completion request through the LiteLLM Router.
|
|
POST /complete/stream — streaming variant; returns NDJSON token chunks.
|
|
GET /health — liveness probe.
|
|
"""
|
|
|
|
from __future__ import annotations

import json
import logging
from collections.abc import AsyncIterator
from typing import Any

from fastapi import FastAPI
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

from llm_pool.router import complete as router_complete
from llm_pool.router import complete_stream as router_complete_stream
|
|
|
|
# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)

# FastAPI application instance; served on port 8004 (see module docstring).
app = FastAPI(
    title="Konstruct LLM Pool",
    description="LiteLLM Router — Ollama + Anthropic + OpenAI with automatic fallback",
    version="0.1.0",
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Request / Response schemas
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class CompleteRequest(BaseModel):
    """Body for POST /complete."""

    # Model group name: "quality" or "fast".
    model: str

    # OpenAI-format message list.
    messages: list[dict]

    # Konstruct tenant UUID for cost tracking.
    tenant_id: str

    # Optional OpenAI function-calling tool definitions. When provided,
    # the LLM may return tool_calls instead of text content.
    tools: list[dict] | None = None
|
|
|
|
|
|
class UsageInfo(BaseModel):
    """Token usage for a single completion.

    Currently always zeroed — the /complete handler returns UsageInfo()
    until real token counts are wired in with cost tracking.
    """

    # Tokens consumed by the prompt.
    prompt_tokens: int = 0
    # Tokens generated in the completion.
    completion_tokens: int = 0
|
|
|
|
|
|
class CompleteResponse(BaseModel):
    """Response body for POST /complete."""

    # Text content of the completion (empty when the LLM chose a tool instead).
    content: str
    # Model group name echoed back from the request.
    model: str
    # Token usage; currently always zeroed (see /complete handler).
    usage: UsageInfo
    # Tool calls returned by the LLM, in OpenAI format. Non-empty when the
    # LLM decided to use a tool instead of responding with text.
    tool_calls: list[dict[str, Any]] = []
|
|
|
|
|
|
class HealthResponse(BaseModel):
    """Response body for GET /health."""

    # Health status string; the handler always reports "ok".
    status: str
|
|
|
|
|
|
class StreamCompleteRequest(BaseModel):
    """Body for POST /complete/stream.

    Identical to CompleteRequest minus `tools` — the streaming endpoint
    supports plain-text tokens only.
    """

    # Model group name: "quality" or "fast".
    model: str

    # OpenAI-format message list.
    messages: list[dict]

    # Konstruct tenant UUID for cost tracking.
    tenant_id: str
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@app.get("/health", response_model=HealthResponse)
|
|
async def health() -> HealthResponse:
|
|
"""Liveness probe — returns immediately."""
|
|
return HealthResponse(status="ok")
|
|
|
|
|
|
@app.post("/complete", response_model=CompleteResponse)
|
|
async def complete_endpoint(request: CompleteRequest) -> CompleteResponse:
|
|
"""
|
|
Route a completion request through the LiteLLM Router.
|
|
|
|
The `model` field selects the provider group ("quality" or "fast").
|
|
LiteLLM handles provider selection, retries, and cross-group fallback
|
|
automatically.
|
|
|
|
When `tools` are provided, the LLM may return tool_calls instead of text.
|
|
The response includes both `content` and `tool_calls` fields — exactly one
|
|
will be populated depending on whether the LLM chose to use a tool.
|
|
|
|
Returns 503 JSON if all providers (including fallbacks) are unavailable.
|
|
"""
|
|
from fastapi.responses import JSONResponse
|
|
|
|
try:
|
|
llm_response = await router_complete(
|
|
model_group=request.model,
|
|
messages=request.messages,
|
|
tenant_id=request.tenant_id,
|
|
tools=request.tools,
|
|
)
|
|
# LiteLLM Router doesn't expose per-call usage easily via acompletion
|
|
# on all provider paths; we return zeroed usage for now and will wire
|
|
# real token counts in a follow-up plan when cost tracking is added.
|
|
return CompleteResponse(
|
|
content=llm_response.content,
|
|
model=request.model,
|
|
usage=UsageInfo(),
|
|
tool_calls=llm_response.tool_calls,
|
|
)
|
|
except Exception:
|
|
logger.exception(
|
|
"All LLM providers unavailable for tenant=%s model=%s",
|
|
request.tenant_id,
|
|
request.model,
|
|
)
|
|
return JSONResponse( # type: ignore[return-value]
|
|
status_code=503,
|
|
content={"error": "All providers unavailable"},
|
|
)
|
|
|
|
|
|
@app.post("/complete/stream")
|
|
async def complete_stream_endpoint(request: StreamCompleteRequest) -> StreamingResponse:
|
|
"""
|
|
Stream a completion through the LiteLLM Router using NDJSON.
|
|
|
|
Each line of the response body is a JSON object:
|
|
{"type": "chunk", "text": "<token>"} — zero or more times
|
|
{"type": "done"} — final line, signals end of stream
|
|
|
|
On provider failure, yields:
|
|
{"type": "error", "message": "All providers unavailable"}
|
|
|
|
The caller (orchestrator runner) reads line-by-line and forwards chunks
|
|
to Redis pub-sub for the web WebSocket handler.
|
|
|
|
NOTE: Tool calls are NOT supported in this endpoint — only plain text
|
|
streaming. Use POST /complete for tool-call responses.
|
|
"""
|
|
async def _generate() -> Any:
|
|
try:
|
|
async for token in router_complete_stream(
|
|
model_group=request.model,
|
|
messages=request.messages,
|
|
tenant_id=request.tenant_id,
|
|
):
|
|
yield json.dumps({"type": "chunk", "text": token}) + "\n"
|
|
yield json.dumps({"type": "done"}) + "\n"
|
|
except Exception:
|
|
logger.exception(
|
|
"Streaming LLM failed for tenant=%s model=%s",
|
|
request.tenant_id,
|
|
request.model,
|
|
)
|
|
yield json.dumps({"type": "error", "message": "All providers unavailable"}) + "\n"
|
|
|
|
return StreamingResponse(
|
|
_generate(),
|
|
media_type="application/x-ndjson",
|
|
)
|