---
phase: 10-agent-capabilities
plan: 01
type: execute
wave: 1
depends_on: []
files_modified:
  - migrations/versions/013_kb_status_and_calendar.py
  - packages/shared/shared/models/kb.py
  - packages/shared/shared/models/tenant.py
  - packages/shared/shared/config.py
  - packages/shared/shared/api/kb.py
  - packages/orchestrator/orchestrator/tools/ingest.py
  - packages/orchestrator/orchestrator/tools/extractors.py
  - packages/orchestrator/orchestrator/tasks.py
  - packages/orchestrator/orchestrator/tools/executor.py
  - packages/orchestrator/orchestrator/tools/builtins/kb_search.py
  - packages/orchestrator/orchestrator/tools/builtins/web_search.py
  - packages/orchestrator/pyproject.toml
  - .env.example
  - tests/unit/test_extractors.py
  - tests/unit/test_kb_upload.py
  - tests/unit/test_ingestion.py
  - tests/unit/test_executor_injection.py
autonomous: true
requirements:
  - CAP-01
  - CAP-02
  - CAP-03
  - CAP-04
  - CAP-07

must_haves:
  truths:
    - "Documents uploaded via API are saved to MinIO and a KnowledgeBaseDocument row is created with status=processing"
    - "The Celery ingestion task extracts text from PDF, DOCX, PPTX, XLSX, CSV, TXT, and MD files"
    - "Extracted text is chunked (500 chars, 50 overlap) and embedded via all-MiniLM-L6-v2 into kb_chunks with tenant_id"
    - "kb_search tool receives tenant_id injection from executor and returns matching chunks"
    - "BRAVE_API_KEY and FIRECRAWL_API_KEY are platform-wide settings in shared config"
    - "Tool executor injects tenant_id and agent_id into tool handler kwargs for context-aware tools"
  artifacts:
    - path: "migrations/versions/013_kb_status_and_calendar.py"
      provides: "DB migration: kb_documents status/error_message/chunk_count columns, agent_id nullable, channel_type CHECK update for google_calendar"
      contains: "status"
    - path: "packages/orchestrator/orchestrator/tools/extractors.py"
      provides: "Text extraction functions for all supported document formats"
      exports: ["extract_text"]
    - path: "packages/orchestrator/orchestrator/tools/ingest.py"
      provides: "Document chunking and ingestion pipeline logic"
      exports: ["chunk_text", "ingest_document_pipeline"]
    - path: "packages/shared/shared/api/kb.py"
      provides: "KB management API router (upload, list, delete, re-index)"
      exports: ["kb_router"]
    - path: "tests/unit/test_extractors.py"
      provides: "Unit tests for text extraction functions"
  key_links:
    - from: "packages/shared/shared/api/kb.py"
      to: "packages/orchestrator/orchestrator/tasks.py"
      via: "ingest_document.delay(document_id, tenant_id)"
      pattern: "ingest_document\\.delay"
    - from: "packages/orchestrator/orchestrator/tools/executor.py"
      to: "tool.handler"
      via: "tenant_id/agent_id injection into kwargs"
      pattern: "tenant_id.*agent_id.*handler"
---
<objective>
Build the knowledge base document ingestion pipeline backend and activate web search/HTTP tools.

Purpose: This is the core backend for CAP-02/CAP-03 -- the document upload, text extraction, chunking, embedding, and storage pipeline that makes the KB search tool functional with real data. Also fixes the tool executor to inject tenant context into tool handlers, activates web search via BRAVE_API_KEY config, and confirms the HTTP request tool needs no changes (CAP-04).

Output: Working KB upload API, Celery ingestion task, text extractors for all formats, migration 013, executor tenant_id injection, updated config with new env vars.
</objective>

<execution_context>
@/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md
@/home/adelorenzo/.claude/get-shit-done/templates/summary.md
</execution_context>
<context>
@.planning/PROJECT.md
@.planning/ROADMAP.md
@.planning/STATE.md
@.planning/phases/10-agent-capabilities/10-CONTEXT.md
@.planning/phases/10-agent-capabilities/10-RESEARCH.md

<interfaces>
<!-- Key types and contracts the executor needs -->

From packages/shared/shared/models/kb.py:
```python
class KnowledgeBaseDocument(KBBase):
    __tablename__ = "kb_documents"
    id: Mapped[uuid.UUID]
    tenant_id: Mapped[uuid.UUID]
    agent_id: Mapped[uuid.UUID]  # Currently NOT NULL — migration 013 makes nullable
    filename: Mapped[str | None]
    source_url: Mapped[str | None]
    content_type: Mapped[str | None]
    created_at: Mapped[datetime]
    chunks: Mapped[list[KBChunk]]

class KBChunk(KBBase):
    __tablename__ = "kb_chunks"
    id: Mapped[uuid.UUID]
    tenant_id: Mapped[uuid.UUID]
    document_id: Mapped[uuid.UUID]
    content: Mapped[str]
    chunk_index: Mapped[int | None]
    created_at: Mapped[datetime]
```

From packages/orchestrator/orchestrator/tools/executor.py:
```python
async def execute_tool(
    tool_call: dict[str, Any],
    registry: dict[str, "ToolDefinition"],
    tenant_id: uuid.UUID,
    agent_id: uuid.UUID,
    audit_logger: "AuditLogger",
) -> str:
    # Line 126: result = await tool.handler(**args)
    # PROBLEM: only LLM-provided args are passed, tenant_id/agent_id NOT injected
```

From packages/orchestrator/orchestrator/memory/embedder.py:
```python
def embed_text(text: str) -> list[float]:  # Returns 384-dim vector
def embed_texts(texts: list[str]) -> list[list[float]]:  # Batch embedding
```

From packages/shared/shared/config.py:
```python
class Settings(BaseSettings):
    minio_endpoint: str
    minio_access_key: str
    minio_secret_key: str
    minio_media_bucket: str
```

From packages/shared/shared/api/channels.py:
```python
channels_router = APIRouter(prefix="/api/portal/channels", tags=["channels"])
# Uses: require_tenant_admin, get_session, KeyEncryptionService
# OAuth state: generate_oauth_state() / verify_oauth_state() with HMAC-SHA256
```

From packages/shared/shared/api/rbac.py:
```python
class PortalCaller: ...
async def require_tenant_admin(...) -> PortalCaller: ...
async def require_tenant_member(...) -> PortalCaller: ...
```
</interfaces>
</context>
<tasks>

<task type="auto" tdd="true">
<name>Task 1: Migration 013, ORM updates, config settings, text extractors, KB API router</name>
<files>
migrations/versions/013_kb_status_and_calendar.py,
packages/shared/shared/models/kb.py,
packages/shared/shared/models/tenant.py,
packages/shared/shared/config.py,
packages/shared/shared/api/kb.py,
packages/orchestrator/orchestrator/tools/extractors.py,
packages/orchestrator/pyproject.toml,
.env.example,
tests/unit/test_extractors.py,
tests/unit/test_kb_upload.py
</files>
<behavior>
- extract_text("hello.pdf", pdf_bytes) returns extracted text from PDF pages
- extract_text("doc.docx", docx_bytes) returns paragraph text from DOCX
- extract_text("slides.pptx", pptx_bytes) returns slide text from PPTX
- extract_text("data.xlsx", xlsx_bytes) returns CSV-formatted cell data
- extract_text("data.csv", csv_bytes) returns decoded UTF-8 text
- extract_text("notes.txt", txt_bytes) returns decoded text
- extract_text("notes.md", md_bytes) returns decoded text
- extract_text("file.exe", bytes) raises ValueError("Unsupported file extension")
- KB upload endpoint returns 201 with document_id for a valid file
- KB list endpoint returns documents with status field
- KB delete endpoint removes document and chunks
</behavior>
<action>

1. **Migration 013** (`migrations/versions/013_kb_status_and_calendar.py`):
   - ALTER TABLE kb_documents ADD COLUMN status TEXT NOT NULL DEFAULT 'processing'
   - ALTER TABLE kb_documents ADD COLUMN error_message TEXT
   - ALTER TABLE kb_documents ADD COLUMN chunk_count INTEGER
   - ALTER TABLE kb_documents ALTER COLUMN agent_id DROP NOT NULL (KB is per-tenant per locked decision)
   - DROP + re-ADD the channel_connections CHECK constraint to include 'google_calendar' (same pattern as migration 008)
   - New channel types tuple: slack, whatsapp, mattermost, rocketchat, teams, telegram, signal, web, google_calendar
   - Add CHECK constraint on kb_documents.status: CHECK (status IN ('processing', 'ready', 'error'))
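   The schema changes above could be sketched roughly as follows; this is illustrative only — the revision IDs and constraint names are assumptions, not taken from the repo:

   ```python
   """Sketch of migration 013 (revision IDs and constraint names are placeholders)."""
   import sqlalchemy as sa
   from alembic import op

   revision = "013"          # placeholder
   down_revision = "012"     # placeholder

   CHANNEL_TYPES = (
       "slack", "whatsapp", "mattermost", "rocketchat", "teams",
       "telegram", "signal", "web", "google_calendar",
   )

   def upgrade() -> None:
       # New status tracking columns on kb_documents
       op.add_column("kb_documents", sa.Column("status", sa.Text(), nullable=False, server_default="processing"))
       op.add_column("kb_documents", sa.Column("error_message", sa.Text(), nullable=True))
       op.add_column("kb_documents", sa.Column("chunk_count", sa.Integer(), nullable=True))
       # KB becomes per-tenant: agent_id no longer required
       op.alter_column("kb_documents", "agent_id", existing_type=sa.UUID(), nullable=True)
       op.create_check_constraint(
           "ck_kb_documents_status", "kb_documents",
           "status IN ('processing', 'ready', 'error')",
       )
       # Re-create the channel_type CHECK to include google_calendar (migration 008 pattern)
       op.drop_constraint("ck_channel_connections_channel_type", "channel_connections", type_="check")
       op.create_check_constraint(
           "ck_channel_connections_channel_type", "channel_connections",
           sa.column("channel_type").in_(CHANNEL_TYPES),
       )
   ```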
2. **ORM updates**:
   - `packages/shared/shared/models/kb.py`: Add status (str, server_default='processing'), error_message (str | None), chunk_count (int | None) mapped columns to KnowledgeBaseDocument. Change agent_id to nullable=True.
   - `packages/shared/shared/models/tenant.py`: Add GOOGLE_CALENDAR = "google_calendar" to ChannelTypeEnum
3. **Config** (`packages/shared/shared/config.py`):
   - Add brave_api_key: str = Field(default="", description="Brave Search API key")
   - Add firecrawl_api_key: str = Field(default="", description="Firecrawl API key for URL scraping")
   - Add google_client_id: str = Field(default="", description="Google OAuth client ID")
   - Add google_client_secret: str = Field(default="", description="Google OAuth client secret")
   - Add minio_kb_bucket: str = Field(default="kb-documents", description="MinIO bucket for KB documents")
   - Update .env.example with all new env vars
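   Taken together, the new fields would slot into the existing Settings class roughly like this (a sketch; the exact import path depends on whether the project is on pydantic v1 or pydantic-settings v2):

   ```python
   from pydantic import Field
   from pydantic_settings import BaseSettings  # assumption: pydantic-settings v2 layout

   class Settings(BaseSettings):
       # ...existing MinIO settings (minio_endpoint, minio_access_key, ...) stay as-is...
       brave_api_key: str = Field(default="", description="Brave Search API key")
       firecrawl_api_key: str = Field(default="", description="Firecrawl API key for URL scraping")
       google_client_id: str = Field(default="", description="Google OAuth client ID")
       google_client_secret: str = Field(default="", description="Google OAuth client secret")
       minio_kb_bucket: str = Field(default="kb-documents", description="MinIO bucket for KB documents")
   ```

   Empty-string defaults keep the platform bootable without the optional keys; tools that need them should check for a blank value and degrade gracefully.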
4. **Install dependencies** on orchestrator:

   ```bash
   uv add --project packages/orchestrator pypdf python-docx python-pptx openpyxl pandas firecrawl-py youtube-transcript-api google-api-python-client google-auth-oauthlib
   ```
5. **Text extractors** (`packages/orchestrator/orchestrator/tools/extractors.py`):
   - Create extract_text(filename: str, file_bytes: bytes) -> str function
   - PDF: pypdf PdfReader on BytesIO, join page text with newlines
   - DOCX: python-docx Document on BytesIO, join paragraph text
   - PPTX: python-pptx Presentation on BytesIO, iterate slides/shapes for text
   - XLSX/XLS: pandas read_excel on BytesIO, to_csv(index=False)
   - CSV: decode UTF-8 with errors="replace"
   - TXT/MD: decode UTF-8 with errors="replace"
   - Raise ValueError for unsupported extensions
   - After PDF extraction, if len(text.strip()) < 100 the file is likely a scanned/image-only PDF: surface a clear error stating that OCR is not supported
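   A minimal sketch of the dispatcher shape, with only the plain-text branches implemented inline; the binary formats are stubbed since they need the optional libraries listed above:

   ```python
   from pathlib import Path

   TEXT_EXTENSIONS = {".csv", ".txt", ".md"}

   def extract_text(filename: str, file_bytes: bytes) -> str:
       """Dispatch on file extension; raise ValueError for unsupported types."""
       ext = Path(filename).suffix.lower()
       if ext in TEXT_EXTENSIONS:
           # CSV/TXT/MD: decode UTF-8, replacing undecodable bytes
           return file_bytes.decode("utf-8", errors="replace")
       if ext == ".pdf":
           # pypdf: PdfReader(BytesIO(file_bytes)); join page.extract_text() with
           # newlines, then flag < 100 surviving chars as a likely scanned PDF
           raise NotImplementedError("sketch: requires pypdf")
       if ext == ".docx":
           raise NotImplementedError("sketch: requires python-docx")
       if ext == ".pptx":
           raise NotImplementedError("sketch: requires python-pptx")
       if ext in {".xlsx", ".xls"}:
           raise NotImplementedError("sketch: requires pandas + openpyxl")
       raise ValueError(f"Unsupported file extension: {ext}")
   ```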
6. **KB API router** (`packages/shared/shared/api/kb.py`):
   - kb_router = APIRouter(prefix="/api/portal/kb", tags=["knowledge-base"])
   - POST /{tenant_id}/documents — multipart file upload (UploadFile + File)
     - Validate file extension against the supported list
     - Read file bytes, upload to MinIO kb-documents bucket with key: {tenant_id}/{doc_id}/(unknown)
     - Insert KnowledgeBaseDocument(tenant_id, filename, content_type, status='processing', agent_id=None)
     - Call ingest_document.delay(str(doc.id), str(tenant_id)) — import from orchestrator.tasks
     - Return 201 with {"id": str(doc.id), "filename": filename, "status": "processing"}
     - Guard with require_tenant_admin
   - POST /{tenant_id}/documents/url — JSON body {url: str, source_type: "web" | "youtube"}
     - Insert KnowledgeBaseDocument(tenant_id, source_url=url, status='processing', agent_id=None)
     - Call ingest_document.delay(str(doc.id), str(tenant_id))
     - Return 201
     - Guard with require_tenant_admin
   - GET /{tenant_id}/documents — list KnowledgeBaseDocument rows for the tenant with status, chunk_count, created_at
     - Guard with require_tenant_member (operators can view)
   - DELETE /{tenant_id}/documents/{document_id} — delete document (CASCADE deletes chunks)
     - Also delete the file from MinIO if filename is present
     - Guard with require_tenant_admin
   - POST /{tenant_id}/documents/{document_id}/reindex — delete existing chunks, re-dispatch ingest_document.delay
     - Guard with require_tenant_admin
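   The upload endpoint's pure logic (extension check and object-key layout) might look like this; the FastAPI wiring, MinIO client, and DB insert are omitted, and the final key segment being the original filename is an assumption:

   ```python
   import uuid

   SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".pptx", ".xlsx", ".xls", ".csv", ".txt", ".md"}

   def validate_upload(filename: str) -> str:
       """Return the lowercased extension, or raise for unsupported uploads."""
       ext = ("." + filename.rsplit(".", 1)[-1].lower()) if "." in filename else ""
       if ext not in SUPPORTED_EXTENSIONS:
           raise ValueError(f"Unsupported file extension: {ext or '(none)'}")
       return ext

   def object_key(tenant_id: uuid.UUID, doc_id: uuid.UUID, filename: str) -> str:
       # Assumed layout: {tenant_id}/{doc_id}/<original filename>
       return f"{tenant_id}/{doc_id}/{filename}"
   ```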
7. **Tests** (write BEFORE implementation per tdd=true):
   - test_extractors.py: test each format extraction with minimal valid files (create in-memory test fixtures using the libraries)
   - test_kb_upload.py: test the upload endpoint with mocked MinIO and mocked Celery task dispatch
</action>

<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_extractors.py tests/unit/test_kb_upload.py -x -q</automated>
</verify>

<done>Migration 013 exists with all schema changes. Text extractors handle all 7 format families. KB API router has upload, list, delete, URL ingest, and reindex endpoints. All unit tests pass.</done>
</task>
<task type="auto" tdd="true">
<name>Task 2: Celery ingestion task, executor tenant_id injection, KB search wiring</name>
<files>
packages/orchestrator/orchestrator/tasks.py,
packages/orchestrator/orchestrator/tools/ingest.py,
packages/orchestrator/orchestrator/tools/executor.py,
packages/orchestrator/orchestrator/tools/builtins/kb_search.py,
packages/orchestrator/orchestrator/tools/builtins/web_search.py,
tests/unit/test_ingestion.py,
tests/unit/test_executor_injection.py
</files>
<behavior>
- chunk_text("hello world " * 100, chunk_size=500, overlap=50) returns overlapping chunks of the correct size
- ingest_document_pipeline fetches the file from MinIO, extracts text, chunks, embeds, inserts kb_chunks rows, and updates status to 'ready'
- ingest_document_pipeline sets status='error' with error_message on failure
- execute_tool injects tenant_id and agent_id into handler kwargs before calling the handler
- web_search reads BRAVE_API_KEY from settings (not os.getenv) for consistency
- kb_search receives the injected tenant_id from the executor
</behavior>
<action>

1. **Chunking + ingestion logic** (`packages/orchestrator/orchestrator/tools/ingest.py`):
   - chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]
     - Simple sliding-window chunker; strip empty chunks
   - async ingest_document_pipeline(document_id: str, tenant_id: str) -> None:
     - Load KnowledgeBaseDocument from DB by ID (use RLS with tenant_id)
     - If filename: download file bytes from MinIO (boto3 client, kb-documents bucket, key: {tenant_id}/{document_id}/(unknown))
     - If source_url contains "youtube.com" or "youtu.be": use youtube_transcript_api to fetch the transcript
     - If source_url and not YouTube: use firecrawl-py to scrape the URL to markdown (fail gracefully if FIRECRAWL_API_KEY is not set)
     - Call extract_text(filename, file_bytes) for file uploads
     - Call chunk_text(text) on the extracted text
     - Batch-embed chunks using embed_texts() from embedder.py
     - INSERT kb_chunks rows with embedding vectors (use the raw SQL text() with CAST(:embedding AS vector) pattern from kb_search.py)
     - UPDATE kb_documents SET status='ready', chunk_count=len(chunks)
     - On any error: UPDATE kb_documents SET status='error', error_message=str(exc)
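   The sliding-window chunker can be as simple as this sketch (sizes per the locked 500/50 decision):

   ```python
   def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
       """Sliding window over raw characters; advances chunk_size - overlap per step."""
       if chunk_size <= overlap:
           raise ValueError("chunk_size must be greater than overlap")
       step = chunk_size - overlap
       chunks = []
       for start in range(0, len(text), step):
           piece = text[start:start + chunk_size]
           if piece.strip():  # drop whitespace-only chunks
               chunks.append(piece)
       return chunks
   ```

   Character-based windows are crude but predictable; a later refinement could snap boundaries to sentence or paragraph breaks without changing this signature.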
2. **Celery task** in `packages/orchestrator/orchestrator/tasks.py`:
   - Add an ingest_document Celery task (sync def with asyncio.run, per the hard architectural constraint)
   - @celery_app.task(bind=True, max_retries=2, ignore_result=True)
   - def ingest_document(self, document_id: str, tenant_id: str) -> None
   - Calls asyncio.run(ingest_document_pipeline(document_id, tenant_id))
   - On exception: use asyncio.run(...) to mark the document as errored, then self.retry(countdown=60)
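   The sync-wrapper-around-async pattern can be sketched as below; the Celery decorator is shown as a comment so the shape reads without a Celery app, and both async helpers are placeholders for the real pipeline and error-marking queries:

   ```python
   import asyncio

   async def ingest_document_pipeline(document_id: str, tenant_id: str) -> None:
       # placeholder: download -> extract -> chunk -> embed -> insert kb_chunks
       ...

   async def mark_document_error(document_id: str, tenant_id: str, message: str) -> None:
       # placeholder: UPDATE kb_documents SET status='error', error_message=:message
       ...

   # @celery_app.task(bind=True, max_retries=2, ignore_result=True)  # real registration
   def ingest_document(self, document_id: str, tenant_id: str) -> None:
       """Sync Celery entry point; all async work runs inside asyncio.run()."""
       try:
           asyncio.run(ingest_document_pipeline(document_id, tenant_id))
       except Exception as exc:
           # best-effort: mark the document failed before handing back to retry
           asyncio.run(mark_document_error(document_id, tenant_id, str(exc)))
           raise self.retry(countdown=60, exc=exc)
   ```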
3. **Executor tenant_id injection** (`packages/orchestrator/orchestrator/tools/executor.py`):
   - Before calling tool.handler(**args), inject tenant_id and agent_id as string kwargs:
     - args["tenant_id"] = str(tenant_id)
     - args["agent_id"] = str(agent_id)
   - This makes kb_search, calendar_lookup, and future context-aware tools work without the LLM needing to know tenant context
   - Place the injection AFTER schema validation (line ~126) so the injected keys don't fail validation
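   The injection itself is two lines just before the handler call. A stripped-down sketch of the call site (the real executor also does schema validation and audit logging first):

   ```python
   import asyncio
   import uuid
   from typing import Any, Awaitable, Callable

   async def call_tool_handler(
       handler: Callable[..., Awaitable[Any]],
       args: dict[str, Any],
       tenant_id: uuid.UUID,
       agent_id: uuid.UUID,
   ) -> Any:
       # Inject AFTER schema validation so the extra keys never reach the validator
       args["tenant_id"] = str(tenant_id)
       args["agent_id"] = str(agent_id)
       return await handler(**args)
   ```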
4. **Update web_search.py**: Change `os.getenv("BRAVE_API_KEY", "")` to import settings from shared.config and use `settings.brave_api_key`, consistent with the platform-wide config pattern.

5. **Tests** (write BEFORE implementation):
   - test_ingestion.py: test chunk_text with various inputs; test ingest_document_pipeline with mocked MinIO/DB/embedder
   - test_executor_injection.py: test that execute_tool injects tenant_id/agent_id into handler kwargs
</action>

<verify>
<automated>cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_ingestion.py tests/unit/test_executor_injection.py -x -q</automated>
</verify>

<done>Celery ingest_document task dispatches the async ingestion pipeline. The pipeline downloads files from MinIO, extracts text, chunks, embeds, and stores results in kb_chunks. The executor injects tenant_id/agent_id into all tool handlers. web_search uses shared config. All tests pass.</done>
</task>
</tasks>

<verification>
- Migration 013 applies cleanly: `cd /home/adelorenzo/repos/konstruct && alembic upgrade head`
- All unit tests pass: `pytest tests/unit/test_extractors.py tests/unit/test_kb_upload.py tests/unit/test_ingestion.py tests/unit/test_executor_injection.py -x -q`
- KB API router mounts and serves: import kb_router without errors
- Executor properly injects tenant context into tool handlers
</verification>
<success_criteria>
- KnowledgeBaseDocument has status, error_message, chunk_count columns; agent_id is nullable
- channel_connections CHECK constraint includes 'google_calendar'
- Text extraction works for PDF, DOCX, PPTX, XLSX, CSV, TXT, MD
- KB upload endpoint accepts files and dispatches a Celery task
- KB list/delete/reindex endpoints work
- URL and YouTube ingestion endpoints dispatch Celery tasks
- Celery ingestion pipeline: extract -> chunk -> embed -> store
- Tool executor injects tenant_id and agent_id into handler kwargs
- BRAVE_API_KEY and FIRECRAWL_API_KEY in shared config
- All unit tests pass
</success_criteria>
<output>
After completion, create `.planning/phases/10-agent-capabilities/10-01-SUMMARY.md`
</output>