| phase | plan | type | wave | depends_on | files_modified | autonomous | requirements | must_haves |
|---|---|---|---|---|---|---|---|---|
| 10-agent-capabilities | 01 | execute | 1 | | | true | | |
Purpose: This is the core backend for CAP-02/CAP-03 -- the document upload, text extraction, chunking, embedding, and storage pipeline that makes the KB search tool functional with real data. Also fixes the tool executor to inject tenant context into tool handlers, activates web search via BRAVE_API_KEY config, and confirms HTTP request tool needs no changes (CAP-04).
Output: Working KB upload API, Celery ingestion task, text extractors for all formats, migration 013, executor tenant_id injection, updated config with new env vars.
<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/10-agent-capabilities/10-CONTEXT.md @.planning/phases/10-agent-capabilities/10-RESEARCH.md

From packages/shared/shared/models/kb.py:
class KnowledgeBaseDocument(KBBase):
    __tablename__ = "kb_documents"
    id: Mapped[uuid.UUID]
    tenant_id: Mapped[uuid.UUID]
    agent_id: Mapped[uuid.UUID]  # Currently NOT NULL — migration 013 makes nullable
    filename: Mapped[str | None]
    source_url: Mapped[str | None]
    content_type: Mapped[str | None]
    created_at: Mapped[datetime]
    chunks: Mapped[list[KBChunk]]

class KBChunk(KBBase):
    __tablename__ = "kb_chunks"
    id: Mapped[uuid.UUID]
    tenant_id: Mapped[uuid.UUID]
    document_id: Mapped[uuid.UUID]
    content: Mapped[str]
    chunk_index: Mapped[int | None]
    created_at: Mapped[datetime]
From packages/orchestrator/orchestrator/tools/executor.py:
async def execute_tool(
    tool_call: dict[str, Any],
    registry: dict[str, "ToolDefinition"],
    tenant_id: uuid.UUID,
    agent_id: uuid.UUID,
    audit_logger: "AuditLogger",
) -> str:
    # Line 126: result = await tool.handler(**args)
    # PROBLEM: only LLM-provided args are passed; tenant_id/agent_id NOT injected
From packages/orchestrator/orchestrator/memory/embedder.py:
def embed_text(text: str) -> list[float]: # Returns 384-dim vector
def embed_texts(texts: list[str]) -> list[list[float]]: # Batch embedding
From packages/shared/shared/config.py:
class Settings(BaseSettings):
    minio_endpoint: str
    minio_access_key: str
    minio_secret_key: str
    minio_media_bucket: str
From packages/shared/shared/api/channels.py:
channels_router = APIRouter(prefix="/api/portal/channels", tags=["channels"])
# Uses: require_tenant_admin, get_session, KeyEncryptionService
# OAuth state: generate_oauth_state() / verify_oauth_state() with HMAC-SHA256
From packages/shared/shared/api/rbac.py:
class PortalCaller: ...
async def require_tenant_admin(...) -> PortalCaller: ...
async def require_tenant_member(...) -> PortalCaller: ...
1. **Migration 013** (alembic): Add status (server_default='processing'), error_message, and chunk_count columns to kb_documents; make agent_id nullable; extend the channel_connections CHECK constraint to include 'google_calendar'.
2. **ORM updates**:
- `packages/shared/shared/models/kb.py`: Add status (str, server_default='processing'), error_message (str | None), chunk_count (int | None) mapped columns to KnowledgeBaseDocument. Change agent_id to nullable=True.
- `packages/shared/shared/models/tenant.py`: Add GOOGLE_CALENDAR = "google_calendar" to ChannelTypeEnum
3. **Config** (`packages/shared/shared/config.py`):
- Add brave_api_key: str = Field(default="", description="Brave Search API key")
- Add firecrawl_api_key: str = Field(default="", description="Firecrawl API key for URL scraping")
- Add google_client_id: str = Field(default="", description="Google OAuth client ID")
- Add google_client_secret: str = Field(default="", description="Google OAuth client secret")
- Add minio_kb_bucket: str = Field(default="kb-documents", description="MinIO bucket for KB documents")
- Update .env.example with all new env vars
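A minimal sketch of the new Settings fields, assuming the existing class lives in `packages/shared/shared/config.py` and uses pydantic v2's `pydantic_settings`:

```python
from pydantic import Field
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # existing MinIO fields elided for brevity
    brave_api_key: str = Field(default="", description="Brave Search API key")
    firecrawl_api_key: str = Field(default="", description="Firecrawl API key for URL scraping")
    google_client_id: str = Field(default="", description="Google OAuth client ID")
    google_client_secret: str = Field(default="", description="Google OAuth client secret")
    minio_kb_bucket: str = Field(default="kb-documents", description="MinIO bucket for KB documents")
```

Empty-string defaults keep the platform bootable when a capability's key is unset; handlers that need a key can fail gracefully at call time instead of at startup.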
4. **Install dependencies** on orchestrator:
```bash
uv add --project packages/orchestrator pypdf python-docx python-pptx openpyxl pandas firecrawl-py youtube-transcript-api google-api-python-client google-auth-oauthlib
```
5. **Text extractors** (`packages/orchestrator/orchestrator/tools/extractors.py`):
- Create extract_text(filename: str, file_bytes: bytes) -> str function
- PDF: pypdf PdfReader on BytesIO, join page text with newlines
- DOCX: python-docx Document on BytesIO, join paragraph text
- PPTX: python-pptx Presentation on BytesIO, iterate slides/shapes for text
- XLSX/XLS: pandas read_excel on BytesIO, to_csv(index=False)
- CSV: decode UTF-8 with errors="replace"
- TXT/MD: decode UTF-8 with errors="replace"
- Raise ValueError for unsupported extensions
- After extraction, check if len(text.strip()) < 100 chars for PDF — return error message about OCR not supported
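The dispatcher described above can be sketched as follows. Third-party imports are deferred into their branches so the module loads even when a format library is absent; the short-text PDF check is surfaced here as a ValueError so it flows into the same error path as unsupported extensions (the plan alternatively allows returning an error message):

```python
import io
import os

def extract_text(filename: str, file_bytes: bytes) -> str:
    """Extract plain text from an uploaded document, dispatching on extension."""
    ext = os.path.splitext(filename)[1].lower()
    if ext == ".pdf":
        from pypdf import PdfReader
        reader = PdfReader(io.BytesIO(file_bytes))
        text = "\n".join(page.extract_text() or "" for page in reader.pages)
        if len(text.strip()) < 100:
            # Likely a scanned PDF with no text layer
            raise ValueError("PDF contains little or no extractable text; OCR is not supported")
        return text
    if ext == ".docx":
        from docx import Document
        doc = Document(io.BytesIO(file_bytes))
        return "\n".join(p.text for p in doc.paragraphs)
    if ext == ".pptx":
        from pptx import Presentation
        prs = Presentation(io.BytesIO(file_bytes))
        return "\n".join(
            shape.text
            for slide in prs.slides
            for shape in slide.shapes
            if shape.has_text_frame
        )
    if ext in (".xlsx", ".xls"):
        import pandas as pd
        return pd.read_excel(io.BytesIO(file_bytes)).to_csv(index=False)
    if ext in (".csv", ".txt", ".md"):
        return file_bytes.decode("utf-8", errors="replace")
    raise ValueError(f"Unsupported file extension: {ext}")
```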
6. **KB API router** (`packages/shared/shared/api/kb.py`):
- kb_router = APIRouter(prefix="/api/portal/kb", tags=["knowledge-base"])
- POST /{tenant_id}/documents — multipart file upload (UploadFile + File)
- Validate file extension against supported list
- Read file bytes, upload to MinIO kb-documents bucket with key: {tenant_id}/{doc_id}/{filename}
- Insert KnowledgeBaseDocument(tenant_id, filename, content_type, status='processing', agent_id=None)
- Call ingest_document.delay(str(doc.id), str(tenant_id)) — import from orchestrator.tasks
- Return 201 with {"id": str(doc.id), "filename": filename, "status": "processing"}
- Guard with require_tenant_admin
- POST /{tenant_id}/documents/url — JSON body {url: str, source_type: "web" | "youtube"}
- Insert KnowledgeBaseDocument(tenant_id, source_url=url, status='processing', agent_id=None)
- Call ingest_document.delay(str(doc.id), str(tenant_id))
- Return 201
- Guard with require_tenant_admin
- GET /{tenant_id}/documents — list KnowledgeBaseDocument rows for tenant with status, chunk_count, created_at
- Guard with require_tenant_member (operators can view)
- DELETE /{tenant_id}/documents/{document_id} — delete document (CASCADE deletes chunks)
- Also delete file from MinIO if filename present
- Guard with require_tenant_admin
- POST /{tenant_id}/documents/{document_id}/reindex — delete existing chunks, re-dispatch ingest_document.delay
- Guard with require_tenant_admin
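The upload endpoint's extension check and object-key layout can be sketched standalone; `SUPPORTED_EXTENSIONS`, `validate_extension`, and `build_object_key` are illustrative names rather than existing helpers, and the key's final segment is assumed to be the original filename:

```python
import os
import uuid

# Formats the extractors handle (illustrative constant)
SUPPORTED_EXTENSIONS = {".pdf", ".docx", ".pptx", ".xlsx", ".xls", ".csv", ".txt", ".md"}

def validate_extension(filename: str) -> str:
    """Reject uploads the extraction pipeline cannot process."""
    ext = os.path.splitext(filename)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"Unsupported file type: {ext}")
    return ext

def build_object_key(tenant_id: uuid.UUID, doc_id: uuid.UUID, filename: str) -> str:
    """Mirror the {tenant_id}/{doc_id}/... layout used in the kb-documents bucket."""
    return f"{tenant_id}/{doc_id}/{filename}"
```

Keeping tenant_id as the key prefix means a tenant's objects can be listed or bulk-deleted with a single prefix query.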
7. **Tests** (write BEFORE implementation per tdd=true):
- test_extractors.py: test each format extraction with minimal valid files (create in-memory test fixtures using the libraries)
- test_kb_upload.py: test upload endpoint with mocked MinIO and mocked Celery task dispatch
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_extractors.py tests/unit/test_kb_upload.py -x -q
Migration 013 exists with all schema changes. Text extractors handle all 7 format families. KB API router has upload, list, delete, URL ingest, and reindex endpoints. All unit tests pass.
Task 2: Celery ingestion task, executor tenant_id injection, KB search wiring
packages/orchestrator/orchestrator/tasks.py,
packages/orchestrator/orchestrator/tools/ingest.py,
packages/orchestrator/orchestrator/tools/executor.py,
packages/orchestrator/orchestrator/tools/builtins/kb_search.py,
packages/orchestrator/orchestrator/tools/builtins/web_search.py,
tests/unit/test_ingestion.py,
tests/unit/test_executor_injection.py
- chunk_text("hello world " * 100, chunk_size=500, overlap=50) returns overlapping chunks of correct size
- ingest_document_pipeline fetches file from MinIO, extracts text, chunks, embeds, inserts kb_chunks rows, updates status to 'ready'
- ingest_document_pipeline sets status='error' with error_message on failure
- execute_tool injects tenant_id and agent_id into handler kwargs before calling handler
- web_search reads BRAVE_API_KEY from settings (not os.getenv) for consistency
- kb_search receives injected tenant_id from executor
1. **Chunking + ingestion logic** (`packages/orchestrator/orchestrator/tools/ingest.py`):
- chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]
- Simple sliding window chunker, strip empty chunks
- async ingest_document_pipeline(document_id: str, tenant_id: str) -> None:
- Load KnowledgeBaseDocument from DB by ID (use RLS with tenant_id)
- If filename: download file bytes from MinIO (boto3 client, kb-documents bucket, key: {tenant_id}/{document_id}/{filename})
- If source_url and source_url contains "youtube.com" or "youtu.be": use youtube_transcript_api to fetch transcript
- If source_url and not YouTube: use firecrawl-py to scrape URL to markdown (graceful error if FIRECRAWL_API_KEY not set)
- Call extract_text(filename, file_bytes) for file uploads
- Call chunk_text(text) on extracted text
- Batch embed chunks using embed_texts() from embedder.py
- INSERT kb_chunks rows with embedding vectors (use raw SQL text() with CAST(:embedding AS vector) pattern from kb_search.py)
- UPDATE kb_documents SET status='ready', chunk_count=len(chunks)
- On any error: UPDATE kb_documents SET status='error', error_message=str(exc)
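The sliding-window chunker from step 1 can be sketched as a pure function (character-based windows, as the 500/50 defaults suggest):

```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into fixed-size windows that advance by chunk_size - overlap,
    so consecutive chunks share `overlap` characters of context."""
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be greater than overlap")
    step = chunk_size - overlap
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunk = text[start:start + chunk_size].strip()
        if chunk:  # drop empty / whitespace-only windows
            chunks.append(chunk)
    return chunks
```

The overlap preserves sentences that would otherwise be cut at a window boundary, at the cost of some duplicated embedding work.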
2. **Celery task** in `packages/orchestrator/orchestrator/tasks.py`:
- Add ingest_document Celery task (sync def with asyncio.run per hard architectural constraint)
- @celery_app.task(bind=True, max_retries=2, ignore_result=True)
- def ingest_document(self, document_id: str, tenant_id: str) -> None
- Calls asyncio.run(ingest_document_pipeline(document_id, tenant_id))
- On exception: asyncio.run to mark document as error, then self.retry(countdown=60)
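The sync-wrapper pattern above can be sketched without the Celery machinery; the decorator shown in the comment is the real registration, while `_mark_document_error` stands in for the status-update query:

```python
import asyncio

async def ingest_document_pipeline(document_id: str, tenant_id: str) -> None:
    # Placeholder for the real pipeline in tools/ingest.py
    ...

async def _mark_document_error(document_id: str, message: str) -> None:
    # Placeholder: UPDATE kb_documents SET status='error', error_message=:message
    ...

# In tasks.py this carries:
# @celery_app.task(bind=True, max_retries=2, ignore_result=True)
def ingest_document(self, document_id: str, tenant_id: str) -> None:
    """Sync Celery entry point; asyncio.run bridges to the async pipeline
    per the hard architectural constraint (no async task bodies)."""
    try:
        asyncio.run(ingest_document_pipeline(document_id, tenant_id))
    except Exception as exc:
        # Record the failure before retrying, so the portal shows status='error'
        asyncio.run(_mark_document_error(document_id, str(exc)))
        self.retry(countdown=60)
```

With `bind=True`, Celery passes the task instance as `self`, giving the body access to `self.retry`.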
3. **Executor tenant_id injection** (`packages/orchestrator/orchestrator/tools/executor.py`):
- Before calling tool.handler(**args), inject tenant_id and agent_id as string kwargs:
args["tenant_id"] = str(tenant_id)
args["agent_id"] = str(agent_id)
- This makes kb_search, calendar_lookup, and future context-aware tools work without LLM needing to know tenant context
- Place injection AFTER schema validation (line ~126) so the injected keys don't fail validation
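The injection step can be sketched in isolation; `call_with_context` and `echo_handler` are illustrative stand-ins for the handler-invocation code around line 126 of executor.py, not existing functions:

```python
import asyncio
from typing import Any, Awaitable, Callable

# Minimal handler type; the real executor uses ToolDefinition.handler
Handler = Callable[..., Awaitable[str]]

async def call_with_context(
    handler: Handler,
    args: dict[str, Any],
    tenant_id: str,
    agent_id: str,
) -> str:
    # Inject AFTER schema validation so the extra keys never reach the validator;
    # the LLM never sees or supplies these values.
    args["tenant_id"] = tenant_id
    args["agent_id"] = agent_id
    return await handler(**args)

# A handler that proves the injected kwargs arrive alongside LLM-provided args
async def echo_handler(query: str, tenant_id: str, agent_id: str) -> str:
    return f"{query}|{tenant_id}|{agent_id}"

result = asyncio.run(call_with_context(echo_handler, {"query": "q"}, "t-1", "a-1"))
```

One consequence worth noting: every context-aware handler must accept `tenant_id` and `agent_id` (or `**kwargs`), since they are now always passed.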
4. **Update web_search.py**: Change `os.getenv("BRAVE_API_KEY", "")` to import settings from shared.config and use `settings.brave_api_key` for consistency with platform-wide config pattern.
5. **Tests** (write BEFORE implementation):
- test_ingestion.py: test chunk_text with various inputs, test ingest_document_pipeline with mocked MinIO/DB/embedder
- test_executor_injection.py: test that execute_tool injects tenant_id/agent_id into handler kwargs
cd /home/adelorenzo/repos/konstruct && python -m pytest tests/unit/test_ingestion.py tests/unit/test_executor_injection.py -x -q
Celery ingest_document task dispatches async ingestion pipeline. Pipeline downloads files from MinIO, extracts text, chunks, embeds, and stores in kb_chunks. Executor injects tenant_id/agent_id into all tool handlers. web_search uses shared config. All tests pass.
- Migration 013 applies cleanly: `cd /home/adelorenzo/repos/konstruct && alembic upgrade head`
- All unit tests pass: `pytest tests/unit/test_extractors.py tests/unit/test_kb_upload.py tests/unit/test_ingestion.py tests/unit/test_executor_injection.py -x -q`
- KB API router mounts and serves: import kb_router without errors
- Executor properly injects tenant context into tool handlers
<success_criteria>
- KnowledgeBaseDocument has status, error_message, chunk_count columns; agent_id is nullable
- channel_connections CHECK constraint includes 'google_calendar'
- Text extraction works for PDF, DOCX, PPTX, XLSX, CSV, TXT, MD
- KB upload endpoint accepts files and dispatches Celery task
- KB list/delete/reindex endpoints work
- URL and YouTube ingestion endpoints dispatch Celery tasks
- Celery ingestion pipeline: extract -> chunk -> embed -> store
- Tool executor injects tenant_id and agent_id into handler kwargs
- BRAVE_API_KEY and FIRECRAWL_API_KEY in shared config
- All unit tests pass
</success_criteria>