Files
konstruct/tests/unit/test_kb_upload.py
Adolfo Delorenzo e8d3e8a108 feat(10-01): KB ingestion pipeline - migration, extractors, API router
- Migration 014: add status/error_message/chunk_count to kb_documents, make agent_id nullable
- Add GOOGLE_CALENDAR to ChannelTypeEnum in tenant.py
- Add brave_api_key, firecrawl_api_key, google_client_id/secret, minio_kb_bucket to config
- Add text extractors for PDF, DOCX, PPTX, XLSX/XLS, CSV, TXT, MD
- Add KB management API router with upload, list, delete, URL ingest, reindex endpoints
- Install pypdf, python-docx, python-pptx, openpyxl, pandas, firecrawl-py, youtube-transcript-api
- Update .env.example with new env vars
- Unit tests: test_extractors.py (10 tests) and test_kb_upload.py (7 tests) all pass
2026-03-26 09:05:29 -06:00

279 lines
9.4 KiB
Python

"""
Unit tests for the KB upload API router.
Tests:
- POST /{tenant_id}/documents — file upload returns 201 with document_id
- GET /{tenant_id}/documents — list returns documents with status field
- DELETE /{tenant_id}/documents/{doc_id} — removes document
- POST /{tenant_id}/documents/url — URL ingest dispatches Celery task
- POST /{tenant_id}/documents/{doc_id}/reindex — re-dispatches Celery task
All external dependencies (MinIO, DB, Celery) are mocked.
Auth dependencies are overridden via FastAPI app.dependency_overrides.
"""
from __future__ import annotations

import uuid
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient

from shared.api.rbac import require_tenant_admin, require_tenant_member
from shared.db import get_session
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Random identifiers generated once per test run and shared by every test.
# DOC_ID is kept as a UUID object; TENANT_ID is kept as a string because it is
# interpolated into request paths and converted back with uuid.UUID() on demand.
DOC_ID: uuid.UUID = uuid.uuid4()
TENANT_ID: str = str(uuid.uuid4())
def _make_mock_caller() -> MagicMock:
    """Build the mock caller handed out by the overridden auth dependencies.

    Returns:
        A ``MagicMock`` whose ``tenant_id`` is the module's ``TENANT_ID`` parsed
        into a ``uuid.UUID`` and whose ``role`` is ``"admin"``.
    """
    return MagicMock(tenant_id=uuid.UUID(TENANT_ID), role="admin")
def _make_test_app(mock_session: AsyncMock) -> FastAPI:
    """Create a minimal FastAPI app mounting the kb_router with overridden deps.

    Args:
        mock_session: Object yielded in place of the real DB session whenever
            a route depends on ``get_session``.

    Returns:
        A fresh ``FastAPI`` instance with ``kb_router`` included and the
        ``require_tenant_admin``, ``require_tenant_member`` and ``get_session``
        dependencies replaced by test doubles.
    """
    from shared.api.kb import kb_router

    test_app = FastAPI()
    test_app.include_router(kb_router)

    # Override auth dependencies so no real JWT validation happens. Both the
    # admin and the member check resolve to the same mock caller.
    mock_caller = _make_mock_caller()
    test_app.dependency_overrides[require_tenant_admin] = lambda: mock_caller
    test_app.dependency_overrides[require_tenant_member] = lambda: mock_caller

    # Override the DB session. This is an async generator, so its return type
    # is an async iterator of the session rather than the session itself.
    async def _override_session() -> AsyncIterator[AsyncMock]:
        yield mock_session

    test_app.dependency_overrides[get_session] = _override_session
    return test_app
@pytest.fixture
def mock_session() -> AsyncMock:
    """Provide an ``AsyncMock`` used in place of the DB session.

    ``add`` is a plain ``MagicMock`` so it can be called without awaiting;
    ``flush``, ``commit`` and ``delete`` are ``AsyncMock`` instances.
    """
    return AsyncMock(
        add=MagicMock(),
        flush=AsyncMock(),
        commit=AsyncMock(),
        delete=AsyncMock(),
    )
@pytest.fixture
def mock_doc() -> MagicMock:
    """Provide a ``MagicMock`` carrying the document attributes the tests use.

    The mock belongs to the module's test tenant, is named ``test.txt``, has
    no source URL or chunk count, and is in the ``"processing"`` status.
    """
    fields = {
        "id": DOC_ID,
        "tenant_id": uuid.UUID(TENANT_ID),
        "filename": "test.txt",
        "source_url": None,
        "content_type": "text/plain",
        "status": "processing",
        "chunk_count": None,
        "created_at": datetime(2026, 1, 1, 12, 0, 0),
    }
    return MagicMock(**fields)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestKbUploadEndpoint:
    """Tests for the file-upload route, POST .../{tenant_id}/documents."""

    @pytest.mark.asyncio
    async def test_upload_file_returns_201(self, mock_session: AsyncMock) -> None:
        """Uploading a file should return 201 with document_id."""

        def _stamp_record(record: Any) -> None:
            # Runs when the route calls session.add(): gives the added object
            # the id and created_at values the response assertions rely on.
            record.id = DOC_ID
            record.created_at = datetime(2026, 1, 1, 12, 0, 0)

        mock_session.add.side_effect = _stamp_record
        app = _make_test_app(mock_session)

        storage = MagicMock(put_object=MagicMock(), head_bucket=MagicMock())
        ingest_task = MagicMock(delay=MagicMock())
        upload = {"file": ("hello.txt", b"Hello world content", "text/plain")}

        with (
            patch("shared.api.kb._get_minio_client", return_value=storage),
            patch("shared.api.kb._get_ingest_task", return_value=ingest_task),
        ):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(
                    f"/api/portal/kb/{TENANT_ID}/documents",
                    files=upload,
                )

        assert response.status_code == 201
        body = response.json()
        assert "id" in body
        assert body["filename"] == "hello.txt"
        assert body["status"] == "processing"
        ingest_task.delay.assert_called_once()

    @pytest.mark.asyncio
    async def test_upload_unsupported_extension_returns_400(
        self, mock_session: AsyncMock
    ) -> None:
        """Uploading an unsupported file type should return 400."""
        app = _make_test_app(mock_session)
        rejected = {"file": ("malware.exe", b"bad bytes", "application/octet-stream")}

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.post(
                f"/api/portal/kb/{TENANT_ID}/documents",
                files=rejected,
            )

        assert response.status_code == 400
        assert "Unsupported" in response.json()["detail"]
class TestKbListEndpoint:
    """Tests for the listing route, GET .../{tenant_id}/documents."""

    @pytest.mark.asyncio
    async def test_list_returns_documents_with_status(
        self, mock_session: AsyncMock, mock_doc: MagicMock
    ) -> None:
        """GET /{tenant_id}/documents should return list with status field."""
        # session.execute(...) resolves to an object whose .scalars().all()
        # yields the single mocked document.
        query_result = MagicMock()
        query_result.scalars.return_value.all.return_value = [mock_doc]
        mock_session.execute = AsyncMock(return_value=query_result)
        app = _make_test_app(mock_session)

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.get(f"/api/portal/kb/{TENANT_ID}/documents")

        assert response.status_code == 200
        listing = response.json()
        assert isinstance(listing, list)
        assert len(listing) == 1
        first = listing[0]
        assert first["status"] == "processing"
        assert "id" in first
class TestKbDeleteEndpoint:
    """Tests for the delete route, DELETE .../{tenant_id}/documents/{doc_id}."""

    @pytest.mark.asyncio
    async def test_delete_document_returns_204(
        self, mock_session: AsyncMock, mock_doc: MagicMock
    ) -> None:
        """DELETE /{tenant_id}/documents/{doc_id} should remove document."""
        # The lookup performed through session.execute() finds the document.
        lookup = MagicMock()
        lookup.scalar_one_or_none.return_value = mock_doc
        mock_session.execute = AsyncMock(return_value=lookup)
        app = _make_test_app(mock_session)

        storage = MagicMock(remove_object=MagicMock())
        with patch("shared.api.kb._get_minio_client", return_value=storage):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.delete(
                    f"/api/portal/kb/{TENANT_ID}/documents/{DOC_ID}"
                )

        assert response.status_code == 204

    @pytest.mark.asyncio
    async def test_delete_nonexistent_returns_404(self, mock_session: AsyncMock) -> None:
        """DELETE on a document that doesn't exist should return 404."""
        # The lookup performed through session.execute() finds nothing.
        lookup = MagicMock()
        lookup.scalar_one_or_none.return_value = None
        mock_session.execute = AsyncMock(return_value=lookup)
        app = _make_test_app(mock_session)

        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            response = await client.delete(
                f"/api/portal/kb/{TENANT_ID}/documents/{DOC_ID}"
            )

        assert response.status_code == 404
class TestKbUrlIngestEndpoint:
    """Tests for the URL-ingest route, POST .../{tenant_id}/documents/url."""

    @pytest.mark.asyncio
    async def test_url_ingest_dispatches_celery(self, mock_session: AsyncMock) -> None:
        """POST /{tenant_id}/documents/url should dispatch ingest_document task."""

        def _stamp_record(record: Any) -> None:
            # Runs when the route calls session.add(): assigns a fixed id and
            # created_at to the added object.
            record.id = DOC_ID
            record.created_at = datetime(2026, 1, 1, 12, 0, 0)

        mock_session.add.side_effect = _stamp_record
        app = _make_test_app(mock_session)

        ingest_task = MagicMock(delay=MagicMock())
        request_body = {"url": "https://example.com/page", "source_type": "web"}

        with patch("shared.api.kb._get_ingest_task", return_value=ingest_task):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(
                    f"/api/portal/kb/{TENANT_ID}/documents/url",
                    json=request_body,
                )

        assert response.status_code == 201
        ingest_task.delay.assert_called_once()
class TestKbReindexEndpoint:
    """Tests for the reindex route, POST .../documents/{doc_id}/reindex."""

    @pytest.mark.asyncio
    async def test_reindex_dispatches_celery(
        self, mock_session: AsyncMock, mock_doc: MagicMock
    ) -> None:
        """POST /{tenant_id}/documents/{doc_id}/reindex should dispatch ingest task."""
        # The lookup performed through session.execute() finds the document.
        lookup = MagicMock()
        lookup.scalar_one_or_none.return_value = mock_doc
        mock_session.execute = AsyncMock(return_value=lookup)
        app = _make_test_app(mock_session)

        ingest_task = MagicMock(delay=MagicMock())
        reindex_path = f"/api/portal/kb/{TENANT_ID}/documents/{DOC_ID}/reindex"

        with patch("shared.api.kb._get_ingest_task", return_value=ingest_task):
            transport = ASGITransport(app=app)
            async with AsyncClient(transport=transport, base_url="http://test") as client:
                response = await client.post(reindex_path)

        assert response.status_code == 202
        ingest_task.delay.assert_called_once()