#!/usr/bin/env python3
"""
IMAP IDLE Email Poller for RAG Integration

Replaces polling with push-based notifications: the script keeps an IMAP
connection in IDLE mode and, when the server announces new mail, uploads
attachments (or the plain-text body) of each unseen message to the RAG
service.
"""

import email
import imaplib
import json
import os
import socket
import time
from datetime import datetime
from pathlib import Path

import requests

# Configuration from environment
IMAP_HOST = os.environ.get("IMAP_HOST", "mail.oe74.net")
IMAP_PORT = int(os.environ.get("IMAP_PORT", "993"))
IMAP_USER = os.environ.get("IMAP_USER")
IMAP_PASS = os.environ.get("IMAP_PASS")
RAG_URL = os.environ.get("RAG_URL", "http://moxie-rag:8899")
RAG_COLLECTION = os.environ.get("RAG_COLLECTION")
# Comma-separated allowlist, normalised to lowercase. Empty entries are
# dropped: an empty string is a substring of every sender, so a stray comma
# would otherwise disable the allowlist entirely.
ALLOWED_SENDERS = [
    entry.strip().lower()
    for entry in os.environ.get("ALLOWED_SENDERS", "").split(",")
    if entry.strip()
]
STATE_FILE = os.environ.get("STATE_FILE", "/app/data/idle_state.json")
LOG_DIR = os.environ.get("LOG_DIR", "/app/logs")

# IDLE timeout (reconnect after this many seconds)
IDLE_TIMEOUT = 1800  # 30 minutes


def log(msg):
    """Print *msg* with a timestamp and append it to the collection log file."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_msg = f"[{timestamp}] {msg}"
    print(log_msg)

    # Also write to log file
    log_file = Path(LOG_DIR) / f"{RAG_COLLECTION}_idle.log"
    log_file.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8: messages contain emoji, which a non-UTF-8 locale
    # default encoding cannot represent.
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(log_msg + "\n")


def load_state():
    """Load processed message IDs.

    Returns:
        A set of the IDs stored in STATE_FILE, or an empty set when the file
        is missing or does not contain valid JSON.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            return set(json.load(f))
    except (FileNotFoundError, json.JSONDecodeError):
        return set()


def save_state(processed_ids):
    """Save processed message IDs to STATE_FILE as a JSON list."""
    state_path = Path(STATE_FILE)
    state_path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a sibling file and rename, so an interrupted write cannot
    # leave a truncated state file behind.
    tmp_path = state_path.with_name(state_path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(list(processed_ids), f)
    os.replace(tmp_path, state_path)


def _sender_allowed(sender):
    """Return True when no allowlist is configured or *sender* matches it."""
    if not ALLOWED_SENDERS:
        return True
    sender_lower = sender.lower()
    # NOTE(review): this is a substring match against the whole From header
    # (display name included), as in the original implementation.
    return any(allowed in sender_lower for allowed in ALLOWED_SENDERS)


def _decode_text(part):
    """Decode a message part's payload to str, honouring its declared charset."""
    payload = part.get_payload(decode=True) or b""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Unknown charset name in the header: fall back to UTF-8.
        return payload.decode("utf-8", errors="ignore")


def _extract_attachments(msg):
    """Collect named parts that carry a Content-Disposition header."""
    attachments = []
    if not msg.is_multipart():
        return attachments
    for part in msg.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get('Content-Disposition') is None:
            continue
        filename = part.get_filename()
        if not filename:
            continue
        content = part.get_payload(decode=True)
        if content is None:
            # get_payload(decode=True) yields None for container parts;
            # there is nothing to upload for those.
            log(f"Skipping attachment without decodable content: {filename}")
            continue
        attachments.append({
            'filename': filename,
            'content': content,
            'type': part.get_content_type(),
        })
        log(f"Found attachment: {filename}")
    return attachments


def _extract_body(msg):
    """Return the first text/plain part (or the whole payload if not multipart)."""
    if not msg.is_multipart():
        return _decode_text(msg)
    for part in msg.walk():
        if part.get_content_type() == "text/plain":
            return _decode_text(part)
    return ""


def _upload_attachment(att):
    """POST one attachment to the RAG /ingest endpoint; failures are logged."""
    try:
        response = requests.post(
            f"{RAG_URL}/ingest",
            files={'file': (att['filename'], att['content'])},
            data={'collection': RAG_COLLECTION},
            timeout=60,
        )
        if response.status_code == 200:
            log(f"✅ Uploaded {att['filename']} to {RAG_COLLECTION}")
        else:
            log(f"❌ Failed to upload {att['filename']}: {response.status_code}")
    except Exception as e:
        log(f"❌ Error uploading {att['filename']}: {e}")


def _upload_body(msg, sender, subject, body):
    """POST the email body to the RAG /ingest_text endpoint; failures are logged."""
    try:
        response = requests.post(
            f"{RAG_URL}/ingest_text",
            json={
                'collection': RAG_COLLECTION,
                'text': body,
                'metadata': {
                    'source': f'email:{sender}',
                    'subject': subject,
                    'date': msg.get('Date'),
                },
            },
            timeout=60,
        )
        if response.status_code == 200:
            log(f"✅ Uploaded email body to {RAG_COLLECTION}")
        else:
            log(f"❌ Failed to upload email body: {response.status_code}")
    except Exception as e:
        log(f"❌ Error uploading email body: {e}")


def process_email(imap, msg_id, *, use_uid=False):
    """Download and process an email.

    Args:
        imap: A logged-in imaplib connection with a mailbox selected.
        msg_id: Message sequence number, or a UID when *use_uid* is true.
        use_uid: Fetch with ``UID FETCH`` instead of ``FETCH``.

    Returns:
        True when the message was fetched and handed to the upload step
        (upload failures are logged but do not change the result); False
        when the fetch returned nothing, the sender is not allowed, or an
        error occurred.
    """
    try:
        if use_uid:
            typ, data = imap.uid('FETCH', msg_id, '(RFC822)')
        else:
            typ, data = imap.fetch(msg_id, '(RFC822)')
        # A successful fetch yields a (header, raw_bytes) tuple first.
        if not data or not isinstance(data[0], tuple):
            return False

        msg = email.message_from_bytes(data[0][1])

        # str(): header values are not guaranteed to be plain strings.
        sender = str(msg.get('From', ''))
        subject = str(msg.get('Subject', ''))

        log(f"Processing email {msg_id}: {subject[:50]}... from {sender}")

        # Check allowed senders
        if not _sender_allowed(sender):
            log("Skipping - sender not in allowlist")
            return False

        # Attachments take precedence; the body is ingested only when the
        # message has none.
        attachments = _extract_attachments(msg)
        if attachments:
            for att in attachments:
                _upload_attachment(att)
        else:
            body = _extract_body(msg)
            if body:
                _upload_body(msg, sender, subject, body)

        return True

    except Exception as e:
        log(f"❌ Error processing email {msg_id}: {e}")
        return False


def _read_line(imap):
    """Read one server line as text; raise ConnectionError on end of stream."""
    raw = imap.readline()
    if not raw:
        # An empty read means the peer closed the connection; without this
        # check the wait loops would spin forever on empty strings.
        raise ConnectionError("Server disconnected")
    return raw.decode("utf-8", errors="replace")


def _supports_idle(capability_data):
    """Return True when the CAPABILITY response lists IDLE."""
    if not capability_data or not capability_data[0]:
        return False
    return b'IDLE' in capability_data[0].upper().split()


def _select_inbox(imap):
    """Select INBOX and return its UIDVALIDITY as a string ("0" if unknown)."""
    typ, _ = imap.select('INBOX')
    if typ != 'OK':
        raise ConnectionError("Could not select INBOX")
    _, values = imap.response('UIDVALIDITY')
    if values and values[0]:
        return values[0].decode()
    return "0"


def _process_unseen(imap, uidvalidity, processed_ids):
    """Process every UNSEEN message that is not yet recorded in the state.

    State keys are "<uidvalidity>:<uid>". Sequence numbers are reassigned
    when messages are expunged, so they cannot identify a message across
    sessions; UIDs can. Keys written by older versions (bare sequence
    numbers) contain no colon and therefore never collide with these.
    """
    typ, data = imap.uid('SEARCH', None, 'UNSEEN')
    if not data or not data[0]:
        return

    uids = data[0].decode().split()
    log(f"Processing {len(uids)} new messages...")
    for uid in uids:
        key = f"{uidvalidity}:{uid}"
        if key in processed_ids:
            log(f"Skipping already processed message {uid}")
            continue
        if process_email(imap, uid, use_uid=True):
            processed_ids.add(key)
            save_state(processed_ids)


def _wait_for_mail(imap):
    """Run one IDLE cycle.

    Returns:
        True when the server announced new mail and IDLE was terminated
        cleanly (the connection is ready for further commands); False when
        IDLE_TIMEOUT elapsed, in which case the caller must reconnect.

    Raises:
        ConnectionError: the server closed the connection or rejected IDLE.
    """
    # imaplib exposes no public IDLE API in the versions this script
    # targets, so the command is issued by hand with a freshly allocated tag.
    tag = imap._new_tag().decode()
    imap.send(f"{tag} IDLE\r\n".encode())

    # Wait for IDLE confirmation ("+" continuation line)
    while True:
        line = _read_line(imap)
        if line.startswith("+"):
            break
        if line.startswith("*"):
            log(f"Server: {line.strip()}")
        elif line.startswith(tag):
            raise ConnectionError(f"IDLE rejected: {line.strip()}")

    sock = imap.socket()
    previous_timeout = sock.gettimeout()
    deadline = time.monotonic() + IDLE_TIMEOUT
    notification_received = False
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # One timeout covering the rest of the IDLE window. A timeout
            # on the buffered stream can leave it unusable, so it is only
            # allowed to fire when the connection is about to be dropped.
            sock.settimeout(remaining)
            try:
                line = _read_line(imap)
            except socket.timeout:
                break
            if "EXISTS" in line or "RECENT" in line:
                log(f"🔔 New mail notification: {line.strip()}")
                notification_received = True
                break
            if "BYE" in line:
                log(f"Server closed connection: {line.strip()}")
                raise ConnectionError("Server disconnected")
    finally:
        # Do not let the IDLE timeout leak into later commands.
        sock.settimeout(previous_timeout)

    # Exit IDLE mode
    imap.send(b"DONE\r\n")
    if not notification_received:
        return False

    # Only the tagged response ends the IDLE command; untagged "* OK ..."
    # lines may still arrive before it.
    while not _read_line(imap).startswith(tag):
        pass
    return True


def _close_connection(imap):
    """Best-effort logout; fall back to closing the socket."""
    if imap is None:
        return
    try:
        # Bound the logout so a dead peer cannot block the reconnect.
        imap.socket().settimeout(10)
        imap.logout()
    except (imaplib.IMAP4.error, OSError):
        try:
            imap.shutdown()
        except (imaplib.IMAP4.error, OSError):
            # Nothing more can be done; the connection is discarded anyway.
            pass


def idle_loop():
    """Main IDLE loop.

    Connects, processes any backlog of unseen mail, then alternates between
    IDLE and processing until IDLE_TIMEOUT elapses, at which point the
    connection is re-established. Errors trigger a reconnect with
    exponential backoff (5 s doubling up to 300 s). Returns on Ctrl-C.
    """
    processed_ids = load_state()
    reconnect_delay = 5

    try:
        while True:
            imap = None
            try:
                log(f"Connecting to {IMAP_HOST}:{IMAP_PORT}...")
                imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT)

                log(f"Logging in as {IMAP_USER}...")
                imap.login(IMAP_USER, IMAP_PASS)

                # Check IDLE support
                typ, data = imap.capability()
                if not _supports_idle(data):
                    log("❌ Server does not support IDLE!")
                    time.sleep(60)
                    continue

                uidvalidity = _select_inbox(imap)
                reconnect_delay = 5  # Reset on success

                # Catch up on mail that arrived while disconnected; the
                # server only notifies about changes during IDLE.
                _process_unseen(imap, uidvalidity, processed_ids)

                log("Entering IDLE mode (waiting for new mail)...")
                while _wait_for_mail(imap):
                    _process_unseen(imap, uidvalidity, processed_ids)

                log("IDLE timeout - reconnecting...")

            except Exception as e:
                log(f"❌ Connection error: {e}")
                log(f"Reconnecting in {reconnect_delay} seconds...")
                time.sleep(reconnect_delay)
                reconnect_delay = min(reconnect_delay * 2, 300)  # Max 5 min delay
            finally:
                _close_connection(imap)
    except KeyboardInterrupt:
        log("Interrupted by user - exiting")


def main():
    """Validate required configuration and start the IDLE loop."""
    if not IMAP_USER or not IMAP_PASS:
        log("❌ Error: IMAP_USER and IMAP_PASS environment variables required")
        raise SystemExit(1)

    if not RAG_COLLECTION:
        log("❌ Error: RAG_COLLECTION environment variable required")
        raise SystemExit(1)

    log("=" * 60)
    log("IMAP IDLE Poller starting")
    log(f"Collection: {RAG_COLLECTION}")
    log(f"RAG URL: {RAG_URL}")
    log("=" * 60)

    idle_loop()


if __name__ == "__main__":
    main()