#!/usr/bin/env python3
"""
Email poller for Zeus RAG — checks zeus@zz11.net via IMAP, downloads
attachments, and ingests them into the RAG service. Also ingests email
body text.
"""
import email
import email.header
import imaplib
import json
import logging
import os
import re
import sys
import tempfile
import time
from datetime import datetime
from email.message import Message
from pathlib import Path

import httpx

# ---------------------------------------------------------------------------
# Config — every setting is overridable via environment variables
# ---------------------------------------------------------------------------
IMAP_HOST = os.environ.get("IMAP_HOST", "mail.oe74.net")
IMAP_PORT = int(os.environ.get("IMAP_PORT", "993"))
IMAP_USER = os.environ.get("IMAP_USER", "zeus@zz11.net")
IMAP_PASS = os.environ.get("IMAP_PASS", "")
RAG_URL = os.environ.get("RAG_URL", "http://moxie-rag:8899")
RAG_COLLECTION = os.environ.get("RAG_COLLECTION", "")  # empty = default collection
POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "60"))  # seconds
STATE_FILE = os.environ.get("STATE_FILE", "/app/data/email_state.json")

# Whitelist of allowed senders (comma-separated email addresses).
# An empty list disables the whitelist check entirely.
ALLOWED_SENDERS = os.environ.get("ALLOWED_SENDERS", "")
ALLOWED_SENDERS_LIST = [s.strip().lower() for s in ALLOWED_SENDERS.split(",") if s.strip()]

# Document attachments routed to the /ingest-file endpoint
SUPPORTED_EXTENSIONS = {
    ".pdf", ".docx", ".doc", ".txt", ".md", ".csv", ".json",
    ".xlsx", ".xls", ".html", ".xml",
}

# Audio/video attachments routed to the /transcribe endpoint
MEDIA_EXTENSIONS = {
    ".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv",
    ".mp3", ".wav", ".ogg", ".m4a", ".flac", ".aac",
}

LOG_DIR = Path(os.environ.get("LOG_DIR", "/app/logs"))
LOG_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.FileHandler(LOG_DIR / "email_poller.log"),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger("zeus-email-poller")


# ---------------------------------------------------------------------------
# State management (track processed emails)
# ---------------------------------------------------------------------------
def load_state() -> dict:
    """Load poller state (processed UIDs, last-check timestamp) from STATE_FILE.

    Returns a fresh default state when the file is missing, unreadable, or
    contains invalid JSON — a corrupt state file must not wedge the poller
    permanently.
    """
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE) as f:
                return json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            logger.warning(f"Could not read state file {STATE_FILE}: {e}; starting fresh")
    return {"processed_uids": [], "last_check": None}


def save_state(state: dict):
    """Persist poller state as pretty-printed JSON, creating parent dirs."""
    Path(STATE_FILE).parent.mkdir(parents=True, exist_ok=True)
    with open(STATE_FILE, "w") as f:
        json.dump(state, f, indent=2)


# ---------------------------------------------------------------------------
# Email processing
# ---------------------------------------------------------------------------
def _decode_bytes(payload: bytes, charset) -> str:
    """Decode message bytes, falling back to UTF-8 when the declared charset
    is unknown to Python (e.g. 'unknown-8bit'), which would raise LookupError."""
    try:
        return payload.decode(charset or "utf-8", errors="replace")
    except LookupError:
        return payload.decode("utf-8", errors="replace")


def decode_header_value(value: str) -> str:
    """Decode a MIME (RFC 2047) encoded header value into readable text."""
    if not value:
        return ""
    parts = email.header.decode_header(value)
    decoded = []
    for part, charset in parts:
        if isinstance(part, bytes):
            decoded.append(_decode_bytes(part, charset))
        else:
            decoded.append(part)
    return " ".join(decoded)


def get_email_body(msg: Message) -> str:
    """Extract the plain-text body from an email message.

    For multipart messages, text/plain parts are preferred; raw text/html is
    used only as a fallback when no plain-text part has been seen yet.
    """
    body_parts = []
    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            if ctype == "text/plain":
                payload = part.get_payload(decode=True)
                if payload:
                    body_parts.append(_decode_bytes(payload, part.get_content_charset()))
            elif ctype == "text/html" and not body_parts:
                # Fallback to HTML if no plain text
                payload = part.get_payload(decode=True)
                if payload:
                    body_parts.append(_decode_bytes(payload, part.get_content_charset()))
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            body_parts.append(_decode_bytes(payload, msg.get_content_charset()))
    return "\n".join(body_parts).strip()


def get_attachments(msg: Message) -> list:
    """Collect attachments as {"filename": str, "data": bytes} dicts."""
    attachments = []
    for part in msg.walk():
        if part.get_content_maintype() == "multipart":
            continue
        filename = part.get_filename()
        if filename:
            filename = decode_header_value(filename)
            payload = part.get_payload(decode=True)
            if payload:
                attachments.append({"filename": filename, "data": payload})
    return attachments
def ingest_text(content: str, title: str, source: str, doc_type: str = "email"):
    """POST raw text to the RAG /ingest endpoint.

    Returns the parsed JSON response dict on HTTP 200, None on any failure
    (non-200 status or transport error); failures are logged, never raised.
    """
    try:
        payload = {
            "content": content,
            "title": title,
            "source": source,
            "doc_type": doc_type,
            "date": datetime.now().isoformat(),
        }
        if RAG_COLLECTION:
            payload["collection"] = RAG_COLLECTION
        resp = httpx.post(
            f"{RAG_URL}/ingest",
            json=payload,
            timeout=120.0,
        )
        if resp.status_code == 200:
            result = resp.json()
            logger.info(f"Ingested text '{title}': {result.get('chunks_created', 0)} chunks")
            return result
        else:
            logger.error(f"Ingest failed ({resp.status_code}): {resp.text}")
    except Exception as e:
        logger.error(f"Error ingesting text: {e}")
    return None


def ingest_file(filepath: str, filename: str, source: str, doc_type: str = None):
    """POST a document file to the RAG /ingest-file endpoint.

    doc_type defaults to the file extension (without the dot). Returns the
    parsed JSON response on HTTP 200, None on any failure.
    """
    ext = Path(filename).suffix.lower()
    try:
        form_data = {
            "title": filename,
            "source": source,
            "doc_type": doc_type or ext.lstrip("."),
        }
        if RAG_COLLECTION:
            form_data["collection"] = RAG_COLLECTION
        with open(filepath, "rb") as f:
            resp = httpx.post(
                f"{RAG_URL}/ingest-file",
                files={"file": (filename, f)},
                data=form_data,
                timeout=300.0,
            )
        if resp.status_code == 200:
            result = resp.json()
            logger.info(f"Ingested file '{filename}': {result.get('chunks_created', 0)} chunks")
            return result
        else:
            logger.error(f"File ingest failed ({resp.status_code}): {resp.text}")
    except Exception as e:
        logger.error(f"Error ingesting file '{filename}': {e}")
    return None


def transcribe_and_ingest(filepath: str, filename: str, source: str):
    """POST audio/video to the /transcribe endpoint with auto_ingest=true so
    the resulting transcript is stored in the RAG collection.

    Returns the parsed JSON response on HTTP 200, None on any failure.
    """
    try:
        form_data = {
            "auto_ingest": "true",
            "title": f"Transcription: {filename}",
            "source": source,
        }
        if RAG_COLLECTION:
            form_data["collection"] = RAG_COLLECTION
        with open(filepath, "rb") as f:
            resp = httpx.post(
                f"{RAG_URL}/transcribe",
                files={"file": (filename, f)},
                data=form_data,
                timeout=600.0,
            )
        if resp.status_code == 200:
            result = resp.json()
            logger.info(
                f"Transcribed+ingested '{filename}': "
                f"{result.get('word_count', 0)} words, "
                f"{result.get('chunks_created', 0)} chunks"
            )
            return result
        else:
            logger.error(f"Transcribe failed ({resp.status_code}): {resp.text}")
    except Exception as e:
        logger.error(f"Error transcribing '{filename}': {e}")
    return None
def process_email(uid: str, msg: Message) -> dict:
    """Process a single email: whitelist check, then ingest body and attachments.

    Returns a summary dict containing uid/subject/sender plus either an
    "ingested" list of what was stored, or rejected=True with a reason when
    the sender is not on the whitelist.
    """
    subject = decode_header_value(msg.get("Subject", "No Subject"))
    sender = decode_header_value(msg.get("From", "Unknown"))
    date_str = msg.get("Date", datetime.now().isoformat())
    source = f"email:{sender}"
    logger.info(f"Processing email UID={uid}: '{subject}' from {sender}")

    # Check sender whitelist (empty ALLOWED_SENDERS_LIST disables the check)
    if ALLOWED_SENDERS_LIST:
        sender_email = sender.lower()
        # Extract bare address from a "Name <addr>" style header
        email_match = re.search(r'<([^>]+)>', sender_email)
        if email_match:
            sender_email = email_match.group(1)
        if sender_email not in ALLOWED_SENDERS_LIST:
            logger.warning(f"Rejecting email from {sender}: not in whitelist")
            return {"uid": uid, "subject": subject, "sender": sender,
                    "rejected": True, "reason": "sender_not_allowed"}

    results = {"uid": uid, "subject": subject, "sender": sender, "ingested": []}

    # 1. Ingest email body (skip trivially short bodies)
    body = get_email_body(msg)
    if body and len(body.strip()) > 20:
        title = f"Email: {subject}"
        content = f"From: {sender}\nDate: {date_str}\nSubject: {subject}\n\n{body}"
        r = ingest_text(content, title, source, doc_type="email")
        if r:
            results["ingested"].append({"type": "body", "title": title, **r})

    # 2. Process attachments: documents go to ingest, media to transcription.
    # Each attachment is spooled to a temp file and always cleaned up.
    attachments = get_attachments(msg)
    for att in attachments:
        filename = att["filename"]
        ext = Path(filename).suffix.lower()
        with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as tmp:
            tmp.write(att["data"])
            tmp_path = tmp.name
        try:
            att_source = f"email-attachment:{sender}:{filename}"
            if ext in SUPPORTED_EXTENSIONS:
                r = ingest_file(tmp_path, filename, att_source)
                if r:
                    results["ingested"].append({"type": "file", "filename": filename, **r})
            elif ext in MEDIA_EXTENSIONS:
                r = transcribe_and_ingest(tmp_path, filename, att_source)
                if r:
                    results["ingested"].append({"type": "media", "filename": filename, **r})
            else:
                logger.warning(f"Skipping unsupported attachment: {filename} ({ext})")
        finally:
            os.unlink(tmp_path)

    return results
def check_emails():
    """Connect to IMAP, fetch UNSEEN messages, and process each new one.

    Processed UIDs are persisted (bounded to the most recent 500) so a
    message is never ingested twice even if the server fails to flag it
    as seen. The IMAP connection is always logged out, including on early
    returns and mid-loop errors (the original leaked it on those paths).
    """
    state = load_state()
    processed = set(state.get("processed_uids", []))

    logger.info(f"Connecting to {IMAP_HOST}:{IMAP_PORT} as {IMAP_USER}...")
    try:
        imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)
        try:
            imap.login(IMAP_USER, IMAP_PASS)
            imap.select("INBOX")

            # Search for UNSEEN messages
            status, data = imap.search(None, "UNSEEN")
            if status != "OK":
                logger.error(f"IMAP search failed: {status}")
                return

            message_nums = data[0].split()
            if not message_nums:
                logger.info("No new emails.")
                return

            logger.info(f"Found {len(message_nums)} unread email(s)")
            for num in message_nums:
                # Resolve the stable UID for dedup (sequence numbers shift)
                status, uid_data = imap.fetch(num, "(UID)")
                if status != "OK":
                    continue
                uid = uid_data[0].decode().split("UID ")[1].split(")")[0].strip()
                if uid in processed:
                    logger.info(f"Skipping already-processed UID={uid}")
                    continue

                # Fetch full message
                status, msg_data = imap.fetch(num, "(RFC822)")
                if status != "OK":
                    continue
                raw_email = msg_data[0][1]
                msg = email.message_from_bytes(raw_email)

                try:
                    result = process_email(uid, msg)
                    processed.add(uid)
                    total_ingested = len(result.get("ingested", []))
                    logger.info(
                        f"Email UID={uid} processed: "
                        f"{total_ingested} item(s) ingested"
                    )
                except Exception as e:
                    logger.error(f"Error processing UID={uid}: {e}", exc_info=True)
        finally:
            # Always release the connection; ignore errors during teardown.
            try:
                imap.logout()
            except Exception:
                pass
    except imaplib.IMAP4.error as e:
        logger.error(f"IMAP error: {e}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}", exc_info=True)

    # Save state
    state["processed_uids"] = list(processed)[-500:]  # Keep last 500
    state["last_check"] = datetime.now().isoformat()
    save_state(state)
# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def main():
    """Entry point: validate config, wait for the RAG service, poll forever."""
    if not IMAP_PASS:
        logger.error("IMAP_PASS not set! Cannot connect to email.")
        sys.exit(1)

    logger.info(f"Email Poller starting — checking {IMAP_USER} every {POLL_INTERVAL}s")
    logger.info(f"RAG endpoint: {RAG_URL}")
    if RAG_COLLECTION:
        logger.info(f"Target collection: {RAG_COLLECTION}")
    else:
        logger.info("Target collection: default")

    # Give the RAG service up to 30 attempts (5s apart) to report healthy.
    # The for/else fires only when no attempt ever succeeded.
    for attempt in range(30):
        try:
            resp = httpx.get(f"{RAG_URL}/health", timeout=5.0)
            if resp.status_code == 200:
                logger.info("RAG service is ready!")
                break
        except Exception:
            pass
        logger.info(f"Waiting for RAG service... (attempt {attempt + 1}/30)")
        time.sleep(5)
    else:
        logger.error("RAG service not available after 150s, starting anyway")

    # Poll indefinitely; a failed cycle is logged and retried next interval.
    while True:
        try:
            check_emails()
        except Exception as e:
            logger.error(f"Poll cycle error: {e}", exc_info=True)
        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()