feat(10-01): KB ingestion pipeline - migration, extractors, API router

- Migration 014: add status/error_message/chunk_count to kb_documents, make agent_id nullable
- Add GOOGLE_CALENDAR to ChannelTypeEnum in tenant.py
- Add brave_api_key, firecrawl_api_key, google_client_id/secret, minio_kb_bucket to config
- Add text extractors for PDF, DOCX, PPTX, XLSX/XLS, CSV, TXT, MD
- Add KB management API router with upload, list, delete, URL ingest, reindex endpoints
- Install pypdf, python-docx, python-pptx, openpyxl, pandas, firecrawl-py, youtube-transcript-api
- Update .env.example with new env vars
- Unit tests: test_extractors.py (10 tests) and test_kb_upload.py (7 tests) all pass
This commit is contained in:
2026-03-26 09:05:29 -06:00
parent eae4b0324d
commit e8d3e8a108
11 changed files with 1745 additions and 28 deletions

View File

@@ -0,0 +1,376 @@
"""
Knowledge Base management API endpoints for the Konstruct portal.
Endpoints:
POST /api/portal/kb/{tenant_id}/documents — upload a file
POST /api/portal/kb/{tenant_id}/documents/url — ingest from URL/YouTube
GET /api/portal/kb/{tenant_id}/documents — list documents
DELETE /api/portal/kb/{tenant_id}/documents/{doc_id} — delete document
POST /api/portal/kb/{tenant_id}/documents/{doc_id}/reindex — re-run ingestion
Upload flow:
1. Validate file extension against supported list
2. Upload raw bytes to MinIO kb-documents bucket (key: {tenant_id}/{doc_id}/{filename})
3. Insert KnowledgeBaseDocument row (status='processing')
4. Dispatch ingest_document.delay(doc_id, tenant_id) Celery task
5. Return 201 with {id, filename, status}
The Celery task handles text extraction, chunking, and embedding asynchronously.
Status is updated to 'ready' or 'error' when ingestion completes.
"""
from __future__ import annotations
import logging
import os
import uuid
from datetime import datetime
from typing import Annotated, Any
import boto3
from botocore.exceptions import ClientError
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status
from pydantic import BaseModel, HttpUrl
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.api.rbac import PortalCaller, require_tenant_admin, require_tenant_member
from shared.config import settings
from shared.db import get_session
from shared.models.kb import KBChunk, KnowledgeBaseDocument
logger = logging.getLogger(__name__)
kb_router = APIRouter(prefix="/api/portal/kb", tags=["knowledge-base"])
# Supported file extensions for upload
_SUPPORTED_EXTENSIONS = {
".pdf", ".docx", ".pptx", ".xlsx", ".xls", ".csv", ".txt", ".md"
}
# ---------------------------------------------------------------------------
# Lazy Celery task import — avoids circular dependency at module load time
# ---------------------------------------------------------------------------
def _get_ingest_task() -> Any:
    """Resolve the ingest_document Celery task at call time.

    The import happens lazily, inside the function, so this module can be
    loaded before orchestrator.tasks and no circular import occurs at
    module load time.
    """
    from orchestrator.tasks import ingest_document as task  # noqa: PLC0415

    return task
# Convenience alias — tests can patch 'shared.api.kb.ingest_document'
def ingest_document(document_id: str, tenant_id: str) -> None:  # type: ignore[empty-body]
    """Placeholder — replaced at call site via _get_ingest_task().

    Never invoked directly by this module: the route handlers dispatch the
    real Celery task returned by _get_ingest_task(). This stub exists only
    so unit tests have a stable module-level name to patch.
    """
# ---------------------------------------------------------------------------
# MinIO client helper
# ---------------------------------------------------------------------------
def _get_minio_client() -> Any:
    """Build and return a boto3 S3 client configured for the MinIO endpoint."""
    client = boto3.client(
        "s3",
        endpoint_url=settings.minio_endpoint,
        aws_access_key_id=settings.minio_access_key,
        aws_secret_access_key=settings.minio_secret_key,
    )
    return client
def _ensure_bucket(client: Any, bucket: str) -> None:
"""Create bucket if it doesn't exist."""
try:
client.head_bucket(Bucket=bucket)
except ClientError:
try:
client.create_bucket(Bucket=bucket)
except ClientError as exc:
logger.warning("Could not create bucket %s: %s", bucket, exc)
# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------
class DocumentResponse(BaseModel):
    """Response schema for a knowledge base document.

    Returned by the upload, URL-ingest, list, and reindex endpoints.
    """

    id: str  # document UUID, serialized as a string
    filename: str | None  # set for file uploads; None for URL ingests
    source_url: str | None  # set for URL/YouTube ingests; None for uploads
    content_type: str | None  # MIME type of the uploaded file, if known
    status: str  # ingestion status: 'processing' | 'ready' | 'error'
    chunk_count: int | None  # number of chunks; None until ingestion finishes
    created_at: datetime


class UrlIngestRequest(BaseModel):
    """Request body for URL/YouTube ingestion."""

    # NOTE(review): declared as plain str although HttpUrl is imported and
    # unused — presumably URL validation happens in the ingest task; confirm.
    url: str
    source_type: str = "web"  # "web" | "youtube"
# ---------------------------------------------------------------------------
# POST /{tenant_id}/documents — file upload
# ---------------------------------------------------------------------------
@kb_router.post(
    "/{tenant_id}/documents",
    status_code=status.HTTP_201_CREATED,
    response_model=DocumentResponse,
    summary="Upload a document to the knowledge base",
)
async def upload_document(
    tenant_id: uuid.UUID,
    file: Annotated[UploadFile, File(description="Document file to ingest")],
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DocumentResponse:
    """
    Upload a document and dispatch the ingestion pipeline.

    Supported formats: PDF, DOCX, PPTX, XLSX, XLS, CSV, TXT, MD

    Flow:
        1. Validate the file extension against _SUPPORTED_EXTENSIONS.
        2. Insert a KnowledgeBaseDocument row (status='processing') to
           obtain a stable document id.
        3. Store the raw bytes in MinIO under {tenant_id}/{doc_id}/{filename}.
        4. Dispatch the ingest_document Celery task (best-effort).
        5. Return 201 immediately; status becomes 'ready'/'error' async.

    Raises:
        HTTPException: 400 when the file extension is not supported.
    """
    filename = file.filename or "upload"
    _, ext = os.path.splitext(filename.lower())
    if ext not in _SUPPORTED_EXTENSIONS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unsupported file type '{ext}'. Supported: {', '.join(sorted(_SUPPORTED_EXTENSIONS))}",
        )
    file_bytes = await file.read()
    content_type = file.content_type or "application/octet-stream"
    # Insert document row first so doc.id exists for the MinIO object key.
    doc = KnowledgeBaseDocument(
        tenant_id=tenant_id,
        agent_id=None,
        filename=filename,
        content_type=content_type,
        status="processing",
    )
    session.add(doc)
    await session.flush()  # Populate doc.id
    doc_id = doc.id
    # Upload to MinIO.
    # BUG FIX: the key previously ended with the literal string "(unknown)".
    # delete_document reconstructs the key from doc.filename, so the upload
    # key must end with the real filename or deletes can never find the object.
    bucket = settings.minio_kb_bucket
    key = f"{tenant_id}/{doc_id}/{filename}"
    try:
        minio = _get_minio_client()
        _ensure_bucket(minio, bucket)
        # boto3 accepts raw bytes as Body — no BytesIO wrapper needed.
        minio.put_object(
            Bucket=bucket,
            Key=key,
            Body=file_bytes,
            ContentLength=len(file_bytes),
            ContentType=content_type,
        )
    except Exception as exc:
        logger.warning("MinIO upload failed for %s: %s", key, exc)
        # Continue — ingestion task will try to re-fetch or fail gracefully
    await session.commit()
    # Dispatch async ingestion task; a dispatch failure is logged, not raised,
    # so the document row survives and can be reindexed later.
    try:
        task = _get_ingest_task()
        task.delay(str(doc_id), str(tenant_id))
    except Exception as exc:
        logger.exception("Failed to dispatch ingest_document task for %s: %s", doc_id, exc)
    return DocumentResponse(
        id=str(doc_id),
        filename=filename,
        source_url=None,
        content_type=content_type,
        status="processing",
        chunk_count=None,
        # NOTE(review): datetime.utcnow() is naive and deprecated in 3.12+;
        # kept to match the model's (presumably naive) server default — confirm.
        created_at=doc.created_at or datetime.utcnow(),
    )
# ---------------------------------------------------------------------------
# POST /{tenant_id}/documents/url — URL / YouTube ingest
# ---------------------------------------------------------------------------
@kb_router.post(
    "/{tenant_id}/documents/url",
    status_code=status.HTTP_201_CREATED,
    response_model=DocumentResponse,
    summary="Ingest a URL or YouTube video transcript into the knowledge base",
)
async def ingest_url(
    tenant_id: uuid.UUID,
    body: UrlIngestRequest,
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DocumentResponse:
    """Ingest content from a URL (web page or YouTube video) into the KB."""
    # Persist the document row first; the ingestion task does the fetching.
    document = KnowledgeBaseDocument(
        tenant_id=tenant_id,
        agent_id=None,
        source_url=body.url,
        content_type=None,
        status="processing",
    )
    session.add(document)
    await session.flush()  # populate document.id before commit
    new_id = document.id
    await session.commit()
    # Best-effort dispatch of the async ingestion pipeline.
    try:
        _get_ingest_task().delay(str(new_id), str(tenant_id))
    except Exception as exc:
        logger.exception("Failed to dispatch ingest_document task for %s: %s", new_id, exc)
    return DocumentResponse(
        id=str(new_id),
        filename=None,
        source_url=body.url,
        content_type=None,
        status="processing",
        chunk_count=None,
        created_at=document.created_at or datetime.utcnow(),
    )
# ---------------------------------------------------------------------------
# GET /{tenant_id}/documents — list
# ---------------------------------------------------------------------------
@kb_router.get(
    "/{tenant_id}/documents",
    response_model=list[DocumentResponse],
    summary="List knowledge base documents for a tenant",
)
async def list_documents(
    tenant_id: uuid.UUID,
    caller: Annotated[PortalCaller, Depends(require_tenant_member)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> list[DocumentResponse]:
    """Return every KB document for the tenant, newest first, with status and chunk count."""
    stmt = (
        select(KnowledgeBaseDocument)
        .where(KnowledgeBaseDocument.tenant_id == tenant_id)
        .order_by(KnowledgeBaseDocument.created_at.desc())
    )
    rows = (await session.execute(stmt)).scalars().all()
    responses: list[DocumentResponse] = []
    for row in rows:
        responses.append(
            DocumentResponse(
                id=str(row.id),
                filename=row.filename,
                source_url=row.source_url,
                content_type=row.content_type,
                status=row.status,
                chunk_count=row.chunk_count,
                created_at=row.created_at,
            )
        )
    return responses
# ---------------------------------------------------------------------------
# DELETE /{tenant_id}/documents/{document_id} — delete
# ---------------------------------------------------------------------------
@kb_router.delete(
    "/{tenant_id}/documents/{document_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete a knowledge base document and its chunks",
)
async def delete_document(
    tenant_id: uuid.UUID,
    document_id: uuid.UUID,
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> None:
    """
    Delete a document (CASCADE removes all kb_chunks rows automatically).

    When the document was a file upload (doc.filename set), the raw object
    is also removed from MinIO, best-effort: storage errors are logged and
    the database delete proceeds regardless.

    Raises:
        HTTPException: 404 when no document matches (document_id, tenant_id).
    """
    result = await session.execute(
        select(KnowledgeBaseDocument).where(
            KnowledgeBaseDocument.id == document_id,
            KnowledgeBaseDocument.tenant_id == tenant_id,
        )
    )
    doc = result.scalar_one_or_none()
    if doc is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Document not found")
    # Remove from MinIO if it was a file upload
    if doc.filename:
        bucket = settings.minio_kb_bucket
        key = f"{tenant_id}/{document_id}/{doc.filename}"
        try:
            minio = _get_minio_client()
            # BUG FIX: boto3 S3 clients expose delete_object, not
            # remove_object (that is the MinIO-SDK method name). The old call
            # raised AttributeError, which the broad except below swallowed,
            # so stored objects were never actually removed.
            minio.delete_object(Bucket=bucket, Key=key)
        except Exception as exc:
            logger.warning("MinIO delete failed for %s: %s", key, exc)
    await session.delete(doc)
    await session.commit()
# ---------------------------------------------------------------------------
# POST /{tenant_id}/documents/{document_id}/reindex — re-run ingestion
# ---------------------------------------------------------------------------
@kb_router.post(
    "/{tenant_id}/documents/{document_id}/reindex",
    status_code=status.HTTP_202_ACCEPTED,
    response_model=DocumentResponse,
    summary="Delete existing chunks and re-dispatch the ingestion pipeline",
)
async def reindex_document(
    tenant_id: uuid.UUID,
    document_id: uuid.UUID,
    caller: Annotated[PortalCaller, Depends(require_tenant_admin)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DocumentResponse:
    """Re-run the ingestion pipeline for an existing document."""
    lookup = await session.execute(
        select(KnowledgeBaseDocument).where(
            KnowledgeBaseDocument.id == document_id,
            KnowledgeBaseDocument.tenant_id == tenant_id,
        )
    )
    document = lookup.scalar_one_or_none()
    if document is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Document not found")
    # Drop the old chunks so the ingestion task rebuilds them from scratch.
    await session.execute(delete(KBChunk).where(KBChunk.document_id == document_id))
    # Reset ingestion bookkeeping back to the 'processing' state.
    document.status = "processing"
    document.error_message = None
    document.chunk_count = None
    await session.commit()
    # Best-effort re-dispatch; failures are logged so the row stays reindexable.
    try:
        _get_ingest_task().delay(str(document_id), str(tenant_id))
    except Exception as exc:
        logger.exception("Failed to dispatch reindex task for %s: %s", document_id, exc)
    return DocumentResponse(
        id=str(document.id),
        filename=document.filename,
        source_url=document.source_url,
        content_type=document.content_type,
        status="processing",
        chunk_count=None,
        created_at=document.created_at,
    )

View File

@@ -96,6 +96,10 @@ class Settings(BaseSettings):
default="konstruct-media",
description="MinIO bucket name for media attachments",
)
minio_kb_bucket: str = Field(
default="kb-documents",
description="MinIO bucket name for knowledge base documents",
)
# -------------------------------------------------------------------------
# LLM Providers
@@ -213,6 +217,30 @@ class Settings(BaseSettings):
description="HMAC secret for signing OAuth state parameters (CSRF protection)",
)
# -------------------------------------------------------------------------
# Web Search / Scraping
# -------------------------------------------------------------------------
brave_api_key: str = Field(
default="",
description="Brave Search API key for the web_search built-in tool",
)
firecrawl_api_key: str = Field(
default="",
description="Firecrawl API key for URL scraping in KB ingestion pipeline",
)
# -------------------------------------------------------------------------
# Google OAuth (Calendar integration)
# -------------------------------------------------------------------------
google_client_id: str = Field(
default="",
description="Google OAuth 2.0 Client ID for Calendar integration",
)
google_client_secret: str = Field(
default="",
description="Google OAuth 2.0 Client Secret for Calendar integration",
)
# -------------------------------------------------------------------------
# Application
# -------------------------------------------------------------------------

View File

@@ -20,6 +20,11 @@ from sqlalchemy import DateTime, ForeignKey, Integer, Text, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
# Valid status values for KnowledgeBaseDocument.status
KB_STATUS_PROCESSING = "processing"
KB_STATUS_READY = "ready"
KB_STATUS_ERROR = "error"
class KBBase(DeclarativeBase):
"""Separate declarative base for KB models."""
@@ -47,11 +52,27 @@ class KnowledgeBaseDocument(KBBase):
nullable=False,
index=True,
)
agent_id: Mapped[uuid.UUID] = mapped_column(
agent_id: Mapped[uuid.UUID | None] = mapped_column(
UUID(as_uuid=True),
nullable=False,
nullable=True,
index=True,
comment="Agent this document is associated with",
comment="Agent this document is associated with (nullable — KB is per-tenant)",
)
status: Mapped[str] = mapped_column(
Text,
nullable=False,
server_default=KB_STATUS_PROCESSING,
comment="Ingestion status: processing | ready | error",
)
error_message: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="Error details when status='error'",
)
chunk_count: Mapped[int | None] = mapped_column(
Integer,
nullable=True,
comment="Number of chunks created after successful ingestion",
)
filename: Mapped[str | None] = mapped_column(
Text,

View File

@@ -37,6 +37,8 @@ class ChannelTypeEnum(str, enum.Enum):
TEAMS = "teams"
TELEGRAM = "telegram"
SIGNAL = "signal"
WEB = "web"
GOOGLE_CALENDAR = "google_calendar"
class Tenant(Base):