- Migration 014: add status/error_message/chunk_count to kb_documents, make agent_id nullable - Add GOOGLE_CALENDAR to ChannelTypeEnum in tenant.py - Add brave_api_key, firecrawl_api_key, google_client_id/secret, minio_kb_bucket to config - Add text extractors for PDF, DOCX, PPTX, XLSX/XLS, CSV, TXT, MD - Add KB management API router with upload, list, delete, URL ingest, reindex endpoints - Install pypdf, python-docx, python-pptx, openpyxl, pandas, firecrawl-py, youtube-transcript-api - Update .env.example with new env vars - Unit tests: test_extractors.py (10 tests) and test_kb_upload.py (7 tests) all pass
377 lines
12 KiB
Python
377 lines
12 KiB
Python
"""
|
|
Knowledge Base management API endpoints for the Konstruct portal.
|
|
|
|
Endpoints:
|
|
POST /api/portal/kb/{tenant_id}/documents — upload a file
|
|
POST /api/portal/kb/{tenant_id}/documents/url — ingest from URL/YouTube
|
|
GET /api/portal/kb/{tenant_id}/documents — list documents
|
|
DELETE /api/portal/kb/{tenant_id}/documents/{doc_id} — delete document
|
|
POST /api/portal/kb/{tenant_id}/documents/{doc_id}/reindex — re-run ingestion
|
|
|
|
Upload flow:
|
|
1. Validate file extension against supported list
|
|
2. Upload raw bytes to MinIO kb-documents bucket (key: {tenant_id}/{doc_id}/{filename})
|
|
3. Insert KnowledgeBaseDocument row (status='processing')
|
|
4. Dispatch ingest_document.delay(doc_id, tenant_id) Celery task
|
|
5. Return 201 with {id, filename, status}
|
|
|
|
The Celery task handles text extraction, chunking, and embedding asynchronously.
|
|
Status is updated to 'ready' or 'error' when ingestion completes.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Annotated, Any
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
|
|
from pydantic import BaseModel, HttpUrl
|
|
from sqlalchemy import delete, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.api.rbac import PortalCaller, require_tenant_admin, require_tenant_member
|
|
from shared.config import settings
|
|
from shared.db import get_session
|
|
from shared.models.kb import KBChunk, KnowledgeBaseDocument
|
|
|
|
# Module-level logger; handlers and level are configured by the application.
logger = logging.getLogger(__name__)

# Router mounted by the portal app; every route below is relative to this prefix.
kb_router = APIRouter(prefix="/api/portal/kb", tags=["knowledge-base"])

# Supported file extensions for upload
# (lower-cased; compared against os.path.splitext() of the uploaded filename).
_SUPPORTED_EXTENSIONS = {
    ".pdf", ".docx", ".pptx", ".xlsx", ".xls", ".csv", ".txt", ".md"
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lazy Celery task import — avoids circular dependency at module load time
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_ingest_task() -> Any:
    """Resolve the ingest_document Celery task at call time.

    Importing lazily keeps this module importable without pulling in the
    orchestrator package at load time, which would create a circular import.
    """
    from orchestrator.tasks import ingest_document as _task  # noqa: PLC0415

    return _task
|
|
|
|
|
|
# Convenience alias — tests can patch 'shared.api.kb.ingest_document'
def ingest_document(document_id: str, tenant_id: str) -> None:  # type: ignore[empty-body]
    """Placeholder — replaced at call site via _get_ingest_task().

    Never invoked by the code in this module: every endpoint resolves the
    real Celery task through _get_ingest_task() instead. NOTE(review):
    because of that, patching this module-level name does NOT change what
    the endpoints dispatch — confirm the tests patch _get_ingest_task (or
    orchestrator.tasks) rather than this attribute.
    """
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MinIO client helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _get_minio_client() -> Any:
    """Build a boto3 S3 client configured to talk to the MinIO endpoint."""
    s3_kwargs = {
        "endpoint_url": settings.minio_endpoint,
        "aws_access_key_id": settings.minio_access_key,
        "aws_secret_access_key": settings.minio_secret_key,
    }
    return boto3.client("s3", **s3_kwargs)
|
|
|
|
|
|
def _ensure_bucket(client: Any, bucket: str) -> None:
|
|
"""Create bucket if it doesn't exist."""
|
|
try:
|
|
client.head_bucket(Bucket=bucket)
|
|
except ClientError:
|
|
try:
|
|
client.create_bucket(Bucket=bucket)
|
|
except ClientError as exc:
|
|
logger.warning("Could not create bucket %s: %s", bucket, exc)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pydantic schemas
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class DocumentResponse(BaseModel):
    """Response schema for a knowledge base document."""

    # Stringified UUID of the kb_documents row.
    id: str
    # Original upload filename; None for URL/YouTube ingests.
    filename: str | None
    # Ingestion source URL; None for file uploads.
    source_url: str | None
    # MIME type captured at upload time; None when unknown (URL ingests).
    content_type: str | None
    # Ingestion state: 'processing' on create, then 'ready' or 'error'
    # once the Celery task finishes (see module docstring).
    status: str
    # Number of chunks produced by ingestion; None until it completes.
    chunk_count: int | None
    created_at: datetime
|
|
|
|
|
|
class UrlIngestRequest(BaseModel):
    """Request body for URL/YouTube ingestion."""

    # NOTE(review): declared as plain str even though HttpUrl is imported at
    # the top of this file — URL-format validation is presumably deferred to
    # the ingestion task; confirm whether HttpUrl was intended here.
    url: str
    source_type: str = "web"  # "web" | "youtube"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /{tenant_id}/documents — file upload
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@kb_router.post(
    "/{tenant_id}/documents",
    status_code=status.HTTP_201_CREATED,
    response_model=DocumentResponse,
    summary="Upload a document to the knowledge base",
)
async def upload_document(
    tenant_id: uuid.UUID,
    file: Annotated[UploadFile, File(description="Document file to ingest")],
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DocumentResponse:
    """
    Upload a document and dispatch the ingestion pipeline.

    Supported formats: PDF, DOCX, PPTX, XLSX, XLS, CSV, TXT, MD

    Flow: validate extension -> insert row (status='processing') -> store
    raw bytes in MinIO -> commit -> dispatch the Celery ingest task.

    Raises:
        HTTPException: 400 when the file extension is unsupported.
    """
    # basename() strips any client-supplied path components so the stored
    # filename (and therefore the MinIO object key) cannot escape the
    # {tenant_id}/{doc_id}/ prefix.
    filename = os.path.basename(file.filename or "upload")
    _, ext = os.path.splitext(filename.lower())

    if ext not in _SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported file type '{ext}'. Supported: {', '.join(sorted(_SUPPORTED_EXTENSIONS))}",
        )

    file_bytes = await file.read()
    content_type = file.content_type or "application/octet-stream"

    # Insert document row first to get the ID
    doc = KnowledgeBaseDocument(
        tenant_id=tenant_id,
        agent_id=None,
        filename=filename,
        content_type=content_type,
        status="processing",
    )
    session.add(doc)
    await session.flush()  # Populate doc.id

    doc_id = doc.id

    # Upload to MinIO. The key must match what delete_document later removes:
    # {tenant_id}/{doc_id}/{filename}. (Previously the key ended in a literal
    # placeholder, so uploaded objects could never be found for deletion.)
    bucket = settings.minio_kb_bucket
    key = f"{tenant_id}/{doc_id}/{filename}"
    try:
        minio = _get_minio_client()
        _ensure_bucket(minio, bucket)
        import io  # noqa: PLC0415

        minio.put_object(
            Bucket=bucket,
            Key=key,
            Body=io.BytesIO(file_bytes),
            ContentLength=len(file_bytes),
            ContentType=content_type,
        )
    except Exception as exc:
        logger.warning("MinIO upload failed for %s: %s", key, exc)
        # Continue — ingestion task will try to re-fetch or fail gracefully

    await session.commit()

    # Dispatch async ingestion task; failures are logged, not surfaced,
    # so the row stays in 'processing' for a later manual reindex.
    try:
        task = _get_ingest_task()
        task.delay(str(doc_id), str(tenant_id))
    except Exception as exc:
        logger.exception("Failed to dispatch ingest_document task for %s: %s", doc_id, exc)

    return DocumentResponse(
        id=str(doc_id),
        filename=filename,
        source_url=None,
        content_type=content_type,
        status="processing",
        chunk_count=None,
        created_at=doc.created_at or datetime.utcnow(),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /{tenant_id}/documents/url — URL / YouTube ingest
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@kb_router.post(
    "/{tenant_id}/documents/url",
    status_code=status.HTTP_201_CREATED,
    response_model=DocumentResponse,
    summary="Ingest a URL or YouTube video transcript into the knowledge base",
)
async def ingest_url(
    tenant_id: uuid.UUID,
    body: UrlIngestRequest,
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DocumentResponse:
    """Ingest content from a URL (web page or YouTube video) into the KB."""
    # Persist the row first so the Celery task has a document ID to work on.
    record = KnowledgeBaseDocument(
        tenant_id=tenant_id,
        agent_id=None,
        source_url=body.url,
        content_type=None,
        status="processing",
    )
    session.add(record)
    await session.flush()

    record_id = record.id
    await session.commit()

    # Fire-and-forget dispatch — a broker failure is logged, not surfaced.
    try:
        _get_ingest_task().delay(str(record_id), str(tenant_id))
    except Exception as exc:
        logger.exception("Failed to dispatch ingest_document task for %s: %s", record_id, exc)

    return DocumentResponse(
        id=str(record_id),
        filename=None,
        source_url=body.url,
        content_type=None,
        status="processing",
        chunk_count=None,
        created_at=record.created_at or datetime.utcnow(),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GET /{tenant_id}/documents — list
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@kb_router.get(
    "/{tenant_id}/documents",
    response_model=list[DocumentResponse],
    summary="List knowledge base documents for a tenant",
)
async def list_documents(
    tenant_id: uuid.UUID,
    caller: Annotated[PortalCaller, Depends(require_tenant_member)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> list[DocumentResponse]:
    """List all KB documents for the given tenant with status and chunk count."""
    # Newest documents first.
    stmt = (
        select(KnowledgeBaseDocument)
        .where(KnowledgeBaseDocument.tenant_id == tenant_id)
        .order_by(KnowledgeBaseDocument.created_at.desc())
    )
    rows = (await session.execute(stmt)).scalars().all()

    responses: list[DocumentResponse] = []
    for row in rows:
        responses.append(
            DocumentResponse(
                id=str(row.id),
                filename=row.filename,
                source_url=row.source_url,
                content_type=row.content_type,
                status=row.status,
                chunk_count=row.chunk_count,
                created_at=row.created_at,
            )
        )
    return responses
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DELETE /{tenant_id}/documents/{document_id} — delete
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@kb_router.delete(
    "/{tenant_id}/documents/{document_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete a knowledge base document and its chunks",
)
async def delete_document(
    tenant_id: uuid.UUID,
    document_id: uuid.UUID,
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> None:
    """
    Delete a document (CASCADE removes all kb_chunks rows automatically).

    Raises:
        HTTPException: 404 when no document with this ID exists for the tenant.
    """
    result = await session.execute(
        select(KnowledgeBaseDocument).where(
            KnowledgeBaseDocument.id == document_id,
            KnowledgeBaseDocument.tenant_id == tenant_id,
        )
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Document not found")

    # Best-effort removal of the raw object from MinIO (only file uploads
    # have one; URL ingests store no object).
    if doc.filename:
        bucket = settings.minio_kb_bucket
        key = f"{tenant_id}/{document_id}/{doc.filename}"
        try:
            minio = _get_minio_client()
            # boto3 S3 clients expose delete_object; 'remove_object' is the
            # incompatible MinIO-SDK spelling and raised AttributeError here,
            # silently orphaning the stored object.
            minio.delete_object(Bucket=bucket, Key=key)
        except Exception as exc:
            logger.warning("MinIO delete failed for %s: %s", key, exc)

    await session.delete(doc)
    await session.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# POST /{tenant_id}/documents/{document_id}/reindex — re-run ingestion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@kb_router.post(
    "/{tenant_id}/documents/{document_id}/reindex",
    status_code=status.HTTP_202_ACCEPTED,
    response_model=DocumentResponse,
    summary="Delete existing chunks and re-dispatch the ingestion pipeline",
)
async def reindex_document(
    tenant_id: uuid.UUID,
    document_id: uuid.UUID,
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DocumentResponse:
    """Re-run the ingestion pipeline for an existing document."""
    lookup = await session.execute(
        select(KnowledgeBaseDocument).where(
            KnowledgeBaseDocument.id == document_id,
            KnowledgeBaseDocument.tenant_id == tenant_id,
        )
    )
    doc = lookup.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Document not found")

    # Drop stale chunks so ingestion rebuilds them from scratch.
    await session.execute(delete(KBChunk).where(KBChunk.document_id == document_id))

    # Return the row to its initial state: processing, no error, no count.
    doc.status = "processing"
    doc.error_message = None
    doc.chunk_count = None
    await session.commit()

    # Re-dispatch ingestion (after commit); dispatch failures are only logged.
    try:
        _get_ingest_task().delay(str(document_id), str(tenant_id))
    except Exception as exc:
        logger.exception("Failed to dispatch reindex task for %s: %s", document_id, exc)

    return DocumentResponse(
        id=str(doc.id),
        filename=doc.filename,
        source_url=doc.source_url,
        content_type=doc.content_type,
        status="processing",
        chunk_count=None,
        created_at=doc.created_at,
    )
|