"""
Document processing utilities for the RAG service.

Handles text chunking and text extraction from various file formats
(PDF, DOCX, Excel, plain-text formats) plus audio extraction from video.
"""

import logging
import subprocess
import tempfile
from pathlib import Path
from typing import List

logger = logging.getLogger("moxie-rag.processor")

# Approximate chars per token for multilingual text
CHARS_PER_TOKEN = 4

# Candidate break points for chunk boundaries, strongest first: paragraph
# break, sentence ends, line break, clause break, and finally any space.
_BREAK_SEPARATORS = ("\n\n", ".\n", ". ", "?\n", "? ", "!\n", "! ", "\n", ", ", " ")

# Extensions that are read verbatim as UTF-8 text.
_PLAIN_TEXT_EXTENSIONS = (".txt", ".md", ".csv", ".json", ".html", ".xml", ".rst")


def chunk_text(
    text: str,
    chunk_size: int = 500,
    overlap: int = 50,
) -> List[str]:
    """
    Split text into chunks of approximately chunk_size tokens with overlap.

    Token counts are converted to character counts via CHARS_PER_TOKEN.
    Each chunk boundary is moved back to the highest-priority separator
    (see _BREAK_SEPARATORS) whose last occurrence lies in the second half
    of the window; if none qualifies the window is cut at its hard limit.

    Args:
        text: Text to split. Leading/trailing whitespace is stripped.
        chunk_size: Target chunk size in tokens; must be positive.
        overlap: Tokens shared between consecutive chunks. Negative values
            are treated as 0. If the overlap would prevent forward progress
            the next chunk starts where the previous one ended.

    Returns:
        List of non-empty, stripped chunks; empty list for blank input.

    Raises:
        ValueError: If chunk_size is not positive.
    """
    if chunk_size <= 0:
        # A zero-width window can never advance and would loop forever.
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    char_size = chunk_size * CHARS_PER_TOKEN
    # A negative overlap would silently skip text between chunks.
    char_overlap = max(0, overlap) * CHARS_PER_TOKEN

    text = text.strip()
    if not text:
        return []
    if len(text) <= char_size:
        return [text]

    text_len = len(text)
    half_window = char_size // 2
    chunks = []
    start = 0
    while start < text_len:
        end = start + char_size

        if end >= text_len:
            # Final window: it already reaches the end of the text, so stop
            # here. Stepping back by the overlap again would only emit a
            # redundant chunk fully contained in this one.
            tail = text[start:].strip()
            if tail:
                chunks.append(tail)
            break

        window = text[start:end]
        for separator in _BREAK_SEPARATORS:
            pos = window.rfind(separator)
            if pos > half_window:
                # Cut just after the separator so it stays with this chunk.
                end = start + pos + len(separator)
                break

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        next_start = end - char_overlap
        # Guarantee forward progress when the overlap is >= the chunk length.
        start = next_start if next_start > start else end

    return chunks


def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract text from a PDF file using pdfplumber.

    Pages that yield no text are skipped (logged at debug level); the text
    of the remaining pages is joined with blank lines.
    """
    import pdfplumber

    text_parts = []
    with pdfplumber.open(file_path) as pdf:
        for page_number, page in enumerate(pdf.pages, start=1):
            page_text = page.extract_text()
            if page_text:
                text_parts.append(page_text)
            else:
                logger.debug("Page %d: no text extracted", page_number)

    result = "\n\n".join(text_parts)
    logger.info(
        "Extracted %d chars from PDF (%d pages)", len(result), len(text_parts)
    )
    return result


def extract_text_from_docx(file_path: str) -> str:
    """
    Extract text from a DOCX file using python-docx.

    Only non-blank body paragraphs (doc.paragraphs) are collected; they are
    joined with blank lines.

    NOTE(review): extract_text_from_file also routes ".doc" here. python-docx
    targets the .docx format, so legacy binary .doc files will presumably
    fail to open -- confirm and convert such files upstream if needed.
    """
    from docx import Document

    doc = Document(file_path)
    paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
    result = "\n\n".join(paragraphs)
    logger.info(
        "Extracted %d chars from DOCX (%d paragraphs)", len(result), len(paragraphs)
    )
    return result


def extract_text_from_excel(file_path: str) -> str:
    """
    Extract text from Excel files (.xlsx, .xls) using pandas.

    For every non-empty sheet the output contains a sheet marker line, a
    "Columns: ..." header line, and one "col: value | col: value" line per
    row, omitting cells that are NaN or blank.
    """
    import pandas as pd

    text_parts = []
    # Context manager closes the workbook handle; sheets are read from the
    # already-open workbook instead of re-opening the file for each sheet.
    with pd.ExcelFile(file_path) as xls:
        sheet_names = list(xls.sheet_names)
        for sheet_name in sheet_names:
            df = pd.read_excel(xls, sheet_name=sheet_name)
            if df.empty:
                continue

            text_parts.append(f"--- Sheet: {sheet_name} ---")

            # Include column headers
            headers = " | ".join(str(c) for c in df.columns)
            text_parts.append(f"Columns: {headers}")

            # Convert rows to readable text
            for _, row in df.iterrows():
                row_text = " | ".join(
                    f"{col}: {val}"
                    for col, val in row.items()
                    if pd.notna(val) and str(val).strip()
                )
                if row_text:
                    text_parts.append(row_text)

    result = "\n".join(text_parts)
    logger.info(
        "Extracted %d chars from Excel (%d sheets)", len(result), len(sheet_names)
    )
    return result


def _remove_quietly(path: str) -> None:
    """Delete a temporary file, ignoring the case where it cannot be removed."""
    try:
        Path(path).unlink()
    except OSError:
        # Best-effort cleanup: the caller is already reporting the real error.
        pass


def extract_audio_from_video(video_path: str) -> str:
    """
    Extract audio track from video file using ffmpeg.

    The audio is written as 16 kHz mono 16-bit PCM WAV to a new temporary
    file. On success the caller owns that file and is responsible for
    deleting it; on failure the temporary file is removed here.

    Returns:
        Path to the generated .wav file.

    Raises:
        ValueError: If ffmpeg exits with a non-zero status.
        subprocess.TimeoutExpired: If ffmpeg runs longer than 600 seconds.
        OSError: If ffmpeg cannot be launched (e.g. it is not installed).
    """
    # Create the file atomically instead of using the deprecated, race-prone
    # tempfile.mktemp(); ffmpeg's "-y" flag overwrites the empty placeholder.
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        audio_path = tmp.name

    command = [
        "ffmpeg", "-i", video_path,
        "-vn", "-acodec", "pcm_s16le",
        "-ar", "16000", "-ac", "1",
        "-y", audio_path,
    ]
    try:
        subprocess.run(command, capture_output=True, check=True, timeout=600)
    except subprocess.CalledProcessError as e:
        _remove_quietly(audio_path)
        # ffmpeg output is not guaranteed to be valid UTF-8.
        stderr = (e.stderr or b"").decode(errors="replace")
        logger.error("ffmpeg failed: %s", stderr)
        raise ValueError(
            f"Could not extract audio from video: {stderr[:200]}"
        ) from e
    except (subprocess.TimeoutExpired, OSError):
        # Do not leak the placeholder file when ffmpeg hangs or is missing.
        _remove_quietly(audio_path)
        raise

    logger.info("Extracted audio from video to %s", audio_path)
    return audio_path


def extract_text_from_file(file_path: str, filename: str) -> str:
    """
    Extract text from a file based on its extension.

    The extension is taken from filename (case-insensitive), while the
    content is read from file_path.

    Supported: .pdf, .docx, .doc, .xlsx, .xls, .txt, .md, .csv, .json,
    .html, .xml, .rst

    Raises:
        ValueError: If the extension is not supported.
    """
    ext = Path(filename).suffix.lower()

    if ext == ".pdf":
        return extract_text_from_pdf(file_path)
    if ext in (".docx", ".doc"):
        return extract_text_from_docx(file_path)
    if ext in (".xlsx", ".xls"):
        return extract_text_from_excel(file_path)
    if ext in _PLAIN_TEXT_EXTENSIONS:
        # Undecodable bytes are dropped rather than failing the whole file.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            content = f.read()
        logger.info("Read %d chars from %s file", len(content), ext)
        return content

    raise ValueError(f"Unsupported file type: {ext}")