From 847d3bd384193d8f949381f4d128c5b43d372201 Mon Sep 17 00:00:00 2001 From: Adolfo Delorenzo Date: Mon, 16 Mar 2026 16:08:57 -0300 Subject: [PATCH] fixes --- .dockerignore | 6 + Dockerfile | 10 +- app/email_poller.py | 14 +- app/email_poller_idle.py | 279 +++++++++++++++++++++++++ app/email_poller_with_notifications.py | 262 +++++++++++++++++++++++ docker-compose.yml | 34 ++- 6 files changed, 590 insertions(+), 15 deletions(-) create mode 100644 .dockerignore create mode 100644 app/email_poller_idle.py create mode 100644 app/email_poller_with_notifications.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1d64063 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +data/ +logs/ +__pycache__/ +*.pyc +.git/ +.env diff --git a/Dockerfile b/Dockerfile index 13febe3..3a05808 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,10 +10,10 @@ RUN apt-get update && \ WORKDIR /app -# Install PyTorch with ROCm support first (big layer, cache it) +# Install PyTorch (CPU-only for Docker build - will use GPU at runtime if available) RUN pip install --no-cache-dir \ torch torchvision torchaudio \ - --index-url https://download.pytorch.org/whl/nightly/rocm7.2/ + --index-url https://download.pytorch.org/whl/cpu # Install remaining Python dependencies COPY app/requirements.txt . @@ -22,12 +22,6 @@ RUN pip install --no-cache-dir -r requirements.txt # Copy application code COPY app/ . -# Pre-download the embedding model at build time so startup is fast -RUN python -c "\ -from sentence_transformers import SentenceTransformer; \ -m = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'); \ -print('Model cached:', m.encode(['test']).shape)" - EXPOSE 8899 VOLUME ["/app/data", "/app/logs"] diff --git a/app/email_poller.py b/app/email_poller.py index 0e00cdb..4a9897b 100644 --- a/app/email_poller.py +++ b/app/email_poller.py @@ -65,8 +65,18 @@ logger = logging.getLogger("zeus-email-poller") # --------------------------------------------------------------------------- def load_state() -> dict: if os.path.exists(STATE_FILE): - with open(STATE_FILE) as f: - return json.load(f) + try: + with open(STATE_FILE) as f: + content = f.read().strip() + if not content: + return {"processed_uids": [], "last_check": None} + data = json.loads(content) + if isinstance(data, dict): + return data + else: + logger.warning(f"State file corrupted (not a dict), resetting") + except (json.JSONDecodeError, IOError) as e: + logger.warning(f"State file corrupted: {e}, resetting") return {"processed_uids": [], "last_check": None} diff --git a/app/email_poller_idle.py b/app/email_poller_idle.py new file mode 100644 index 0000000..0e5ead0 --- /dev/null +++ b/app/email_poller_idle.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +""" +IMAP IDLE Email Poller for RAG Integration +Replaces polling with push-based notifications. +""" + +import imaplib +import email +import os +import json +import time +import requests +from datetime import datetime +from pathlib import Path + +# Configuration from environment +IMAP_HOST = os.environ.get("IMAP_HOST", "mail.oe74.net") +IMAP_PORT = int(os.environ.get("IMAP_PORT", "993")) +IMAP_USER = os.environ.get("IMAP_USER") +IMAP_PASS = os.environ.get("IMAP_PASS") +RAG_URL = os.environ.get("RAG_URL", "http://moxie-rag:8899") +RAG_COLLECTION = os.environ.get("RAG_COLLECTION") +ALLOWED_SENDERS = os.environ.get("ALLOWED_SENDERS", "").split(",") +STATE_FILE = os.environ.get("STATE_FILE", "/app/data/idle_state.json") +LOG_DIR = os.environ.get("LOG_DIR", "/app/logs") + +# IDLE timeout (reconnect after this many seconds) +IDLE_TIMEOUT = 1800 # 30 minutes + +def log(msg): + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_msg = f"[{timestamp}] {msg}" + print(log_msg) + + # Also write to log file + log_file = Path(LOG_DIR) / f"{RAG_COLLECTION}_idle.log" + log_file.parent.mkdir(parents=True, exist_ok=True) + with open(log_file, "a") as f: + f.write(log_msg + "\n") + +def load_state(): + """Load processed message IDs""" + try: + with open(STATE_FILE, "r") as f: + return set(json.load(f)) + except (FileNotFoundError, json.JSONDecodeError): + return set() + +def save_state(processed_ids): + """Save processed message IDs""" + Path(STATE_FILE).parent.mkdir(parents=True, exist_ok=True) + with open(STATE_FILE, "w") as f: + json.dump(list(processed_ids), f) + +def process_email(imap, msg_id): + """Download and process an email""" + try: + typ, data = imap.fetch(msg_id, '(RFC822)') + if not data or not data[0]: + return False + + raw_email = data[0][1] + msg = email.message_from_bytes(raw_email) + + sender = msg.get('From', '') + subject = msg.get('Subject', '') + + log(f"Processing email {msg_id}: {subject[:50]}... from {sender}") + + # Check allowed senders + if ALLOWED_SENDERS and ALLOWED_SENDERS[0]: + allowed = False + for allowed_sender in ALLOWED_SENDERS: + if allowed_sender.strip().lower() in sender.lower(): + allowed = True + break + if not allowed: + log(f"Skipping - sender not in allowlist") + return False + + # Process attachments + attachments = [] + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_maintype() == 'multipart': + continue + if part.get('Content-Disposition') is None: + continue + + filename = part.get_filename() + if filename: + content = part.get_payload(decode=True) + attachments.append({ + 'filename': filename, + 'content': content, + 'type': part.get_content_type() + }) + log(f"Found attachment: {filename}") + + # Send to RAG + if attachments: + for att in attachments: + try: + files = {'file': (att['filename'], att['content'])} + data = {'collection': RAG_COLLECTION} + + response = requests.post( + f"{RAG_URL}/ingest", + files=files, + data=data, + timeout=60 + ) + + if response.status_code == 200: + log(f"✅ Uploaded {att['filename']} to {RAG_COLLECTION}") + else: + log(f"❌ Failed to upload {att['filename']}: {response.status_code}") + + except Exception as e: + log(f"❌ Error uploading {att['filename']}: {e}") + else: + # Process email body text + body = "" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + body = part.get_payload(decode=True).decode('utf-8', errors='ignore') + break + else: + body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') + + if body: + try: + response = requests.post( + f"{RAG_URL}/ingest_text", + json={ + 'collection': RAG_COLLECTION, + 'text': body, + 'metadata': { + 'source': f'email:{sender}', + 'subject': subject, + 'date': msg.get('Date') + } + }, + timeout=60 + ) + + if response.status_code == 200: + log(f"✅ Uploaded email body to {RAG_COLLECTION}") + else: + log(f"❌ Failed to upload email body: {response.status_code}") + + except Exception as e: + log(f"❌ Error uploading email body: {e}") + + return True + + except Exception as e: + log(f"❌ Error processing email {msg_id}: {e}") + return False + +def idle_loop(): + """Main IDLE loop""" + processed_ids = load_state() + reconnect_delay = 5 + + while True: + try: + log(f"Connecting to {IMAP_HOST}:{IMAP_PORT}...") + imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT) + + log(f"Logging in as {IMAP_USER}...") + imap.login(IMAP_USER, IMAP_PASS) + + # Check IDLE support + typ, data = imap.capability() + if b'IDLE' not in data[0]: + log("❌ Server does not support IDLE!") + time.sleep(60) + continue + + imap.select('INBOX') + log("Entering IDLE mode (waiting for new mail)...") + reconnect_delay = 5 # Reset on success + + while True: + # Send IDLE command + tag = imap._new_tag().decode() + imap.send(f"{tag} IDLE\r\n".encode()) + + # Wait for IDLE confirmation + idle_confirmed = False + while not idle_confirmed: + line = imap.readline().decode() + if line.startswith("+"): + idle_confirmed = True + elif line.startswith("*"): + log(f"Server: {line.strip()}") + + # Wait for notifications with timeout + idle_start = time.time() + notification_received = False + + while time.time() - idle_start < IDLE_TIMEOUT: + # Set socket timeout for periodic checks + imap.socket().settimeout(1.0) + + try: + line = imap.readline().decode() + + if "EXISTS" in line or "RECENT" in line: + log(f"🔔 New mail notification: {line.strip()}") + notification_received = True + break + elif line.startswith("b") or "BYE" in line: + log(f"Server closed connection: {line.strip()}") + raise ConnectionError("Server disconnected") + + except socket.timeout: + continue + except Exception as e: + log(f"Read error: {e}") + break + + # Exit IDLE mode + imap.send(b"DONE\r\n") + + # Wait for DONE response + done_confirmed = False + while not done_confirmed: + line = imap.readline().decode() + if "OK" in line or line.startswith(tag): + done_confirmed = True + + if notification_received: + # Process new emails + typ, data = imap.search(None, 'UNSEEN') + if data[0]: + msg_ids = data[0].decode().split() + log(f"Processing {len(msg_ids)} new messages...") + + for msg_id in msg_ids: + if msg_id not in processed_ids: + if process_email(imap, msg_id): + processed_ids.add(msg_id) + save_state(processed_ids) + else: + log(f"Skipping already processed message {msg_id}") + else: + log("IDLE timeout - reconnecting...") + break # Will reconnect and re-enter IDLE + + except KeyboardInterrupt: + log("Interrupted by user - exiting") + break + except Exception as e: + log(f"❌ Connection error: {e}") + log(f"Reconnecting in {reconnect_delay} seconds...") + time.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 300) # Max 5 min delay + +if __name__ == "__main__": + import socket + + if not IMAP_USER or not IMAP_PASS: + log("❌ Error: IMAP_USER and IMAP_PASS environment variables required") + exit(1) + + if not RAG_COLLECTION: + log("❌ Error: RAG_COLLECTION environment variable required") + exit(1) + + log("=" * 60) + log(f"IMAP IDLE Poller starting") + log(f"Collection: {RAG_COLLECTION}") + log(f"RAG URL: {RAG_URL}") + log("=" * 60) + + idle_loop() diff --git a/app/email_poller_with_notifications.py b/app/email_poller_with_notifications.py new file mode 100644 index 0000000..cc85683 --- /dev/null +++ b/app/email_poller_with_notifications.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +""" +Email Poller with WhatsApp Notifications for RAG Integration +Sends WhatsApp message after successful document upload. +""" + +import imaplib +import email +import os +import json +import time +import requests +from datetime import datetime +from pathlib import Path + +# Configuration from environment +IMAP_HOST = os.environ.get("IMAP_HOST", "mail.oe74.net") +IMAP_PORT = int(os.environ.get("IMAP_PORT", "993")) +IMAP_USER = os.environ.get("IMAP_USER") +IMAP_PASS = os.environ.get("IMAP_PASS") +RAG_URL = os.environ.get("RAG_URL", "http://moxie-rag:8899") +RAG_COLLECTION = os.environ.get("RAG_COLLECTION") +ALLOWED_SENDERS = os.environ.get("ALLOWED_SENDERS", "").split(",") +STATE_FILE = os.environ.get("STATE_FILE", "/app/data/email_state.json") +LOG_DIR = os.environ.get("LOG_DIR", "/app/logs") +NOTIFY_NUMBER = os.environ.get("NOTIFY_NUMBER", "+50660151288") # WhatsApp number to notify + +def log(msg): + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_msg = f"[{timestamp}] {msg}" + print(log_msg) + + log_file = Path(LOG_DIR) / f"{RAG_COLLECTION}_poller.log" + log_file.parent.mkdir(parents=True, exist_ok=True) + with open(log_file, "a") as f: + f.write(log_msg + "\n") + +def send_whatsapp_notification(filename, sender, collection): + """Send WhatsApp notification via OpenClaw gateway""" + try: + # Use the OpenClaw gateway to send WhatsApp + gateway_url = os.environ.get("GATEWAY_URL", "http://host.docker.internal:18789") + gateway_token = os.environ.get("GATEWAY_TOKEN", "") + + message = f"📧 *Library Update*\n\n" \ + f"*Document:* `{filename}`\n" \ + f"*From:* {sender}\n" \ + f"*Collection:* {collection}\n" \ + f"*Time:* {datetime.now().strftime('%H:%M:%S')}\n\n" \ + f"Added to your Library successfully ✅" + + headers = {} + if gateway_token: + headers["Authorization"] = f"Bearer {gateway_token}" + + response = requests.post( + f"{gateway_url}/message/send", + headers=headers, + json={ + "channel": "whatsapp", + "target": NOTIFY_NUMBER, + "message": message + }, + timeout=10 + ) + + if response.status_code == 200: + log(f"✅ WhatsApp notification sent to {NOTIFY_NUMBER}") + else: + log(f"⚠️ WhatsApp notification failed: {response.status_code}") + + except Exception as e: + log(f"⚠️ Failed to send WhatsApp: {e}") + +def load_state(): + try: + with open(STATE_FILE, "r") as f: + return set(json.load(f)) + except (FileNotFoundError, json.JSONDecodeError): + return set() + +def save_state(processed_ids): + Path(STATE_FILE).parent.mkdir(parents=True, exist_ok=True) + with open(STATE_FILE, "w") as f: + json.dump(list(processed_ids), f) + +def process_email(imap, msg_id, processed_ids): + """Download and process an email""" + try: + typ, data = imap.fetch(msg_id, '(RFC822)') + if not data or not data[0]: + return False + + raw_email = data[0][1] + msg = email.message_from_bytes(raw_email) + + sender = msg.get('From', '') + subject = msg.get('Subject', '') + + log(f"Processing email {msg_id}: {subject[:50]}... from {sender}") + + # Check allowed senders + if ALLOWED_SENDERS and ALLOWED_SENDERS[0]: + allowed = any(a.strip().lower() in sender.lower() for a in ALLOWED_SENDERS) + if not allowed: + log(f"Skipping - sender not in allowlist") + return False + + # Process attachments + attachments = [] + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_maintype() == 'multipart': + continue + if part.get('Content-Disposition') is None: + continue + + filename = part.get_filename() + if filename: + content = part.get_payload(decode=True) + attachments.append({ + 'filename': filename, + 'content': content, + 'type': part.get_content_type() + }) + log(f"Found attachment: {filename}") + + # Send to RAG + uploaded_files = [] + if attachments: + for att in attachments: + try: + files = {'file': (att['filename'], att['content'])} + data = {'collection': RAG_COLLECTION} + + response = requests.post( + f"{RAG_URL}/ingest", + files=files, + data=data, + timeout=60 + ) + + if response.status_code == 200: + log(f"✅ Uploaded {att['filename']} to {RAG_COLLECTION}") + uploaded_files.append(att['filename']) + else: + log(f"❌ Failed to upload {att['filename']}: {response.status_code}") + + except Exception as e: + log(f"❌ Error uploading {att['filename']}: {e}") + + # Send WhatsApp notification for successful uploads + if uploaded_files: + for filename in uploaded_files: + send_whatsapp_notification(filename, sender, RAG_COLLECTION) + else: + # Process email body text + body = "" + if msg.is_multipart(): + for part in msg.walk(): + if part.get_content_type() == "text/plain": + body = part.get_payload(decode=True).decode('utf-8', errors='ignore') + break + else: + body = msg.get_payload(decode=True).decode('utf-8', errors='ignore') + + if body: + try: + response = requests.post( + f"{RAG_URL}/ingest_text", + json={ + 'collection': RAG_COLLECTION, + 'text': body, + 'metadata': { + 'source': f'email:{sender}', + 'subject': subject, + 'date': msg.get('Date') + } + }, + timeout=60 + ) + + if response.status_code == 200: + log(f"✅ Uploaded email body to {RAG_COLLECTION}") + send_whatsapp_notification("Email body text", sender, RAG_COLLECTION) + else: + log(f"❌ Failed to upload email body: {response.status_code}") + + except Exception as e: + log(f"❌ Error uploading email body: {e}") + + return True + + except Exception as e: + log(f"❌ Error processing email {msg_id}: {e}") + return False + +def poll_loop(): + """Main polling loop""" + processed_ids = load_state() + poll_interval = int(os.environ.get("POLL_INTERVAL", "300")) + reconnect_delay = 5 + + while True: + try: + log(f"Connecting to {IMAP_HOST}:{IMAP_PORT}...") + imap = imaplib.IMAP4_SSL(IMAP_HOST, IMAP_PORT) + + log(f"Logging in as {IMAP_USER}...") + imap.login(IMAP_USER, IMAP_PASS) + + imap.select('INBOX') + log(f"Polling every {poll_interval} seconds...") + reconnect_delay = 5 + + while True: + # Search for unseen messages + typ, data = imap.search(None, 'UNSEEN') + + if data[0]: + msg_ids = data[0].decode().split() + log(f"Found {len(msg_ids)} new messages") + + for msg_id in msg_ids: + if msg_id not in processed_ids: + if process_email(imap, msg_id, processed_ids): + processed_ids.add(msg_id) + save_state(processed_ids) + else: + log(f"Skipping already processed message {msg_id}") + + # Wait before next poll + time.sleep(poll_interval) + + # Keep connection alive + imap.noop() + + except KeyboardInterrupt: + log("Interrupted by user - exiting") + break + except Exception as e: + log(f"❌ Connection error: {e}") + log(f"Reconnecting in {reconnect_delay} seconds...") + time.sleep(reconnect_delay) + reconnect_delay = min(reconnect_delay * 2, 300) + +if __name__ == "__main__": + if not IMAP_USER or not IMAP_PASS: + log("❌ Error: IMAP_USER and IMAP_PASS required") + exit(1) + + if not RAG_COLLECTION: + log("❌ Error: RAG_COLLECTION required") + exit(1) + + log("=" * 60) + log(f"Email Poller with WhatsApp notifications starting") + log(f"Collection: {RAG_COLLECTION}") + log(f"Notify: {NOTIFY_NUMBER}") + log("=" * 60) + + poll_loop() diff --git a/docker-compose.yml b/docker-compose.yml index b5b8d09..d5cb03c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ services: rag: build: . container_name: moxie-rag - restart: unless-stopped + restart: always ports: - "8899:8899" volumes: @@ -13,6 +13,8 @@ services: - CHROMA_DIR=/app/data/chromadb - UPLOAD_DIR=/app/data/uploads - LOG_DIR=/app/logs + - TORCH_BLAS_PREFER_HIPBLASLT=0 + - TORCH_ROCM_AOTRITON_ENABLE_EXPERIMENTAL=1 devices: - /dev/kfd:/dev/kfd - /dev/dri:/dev/dri @@ -28,7 +30,7 @@ services: poller-zeus: build: . container_name: zeus-email-poller - restart: unless-stopped + restart: always command: python email_poller.py volumes: - ./data:/app/data @@ -41,7 +43,7 @@ services: - RAG_URL=http://moxie-rag:8899 - RAG_COLLECTION=zeus_docs - ALLOWED_SENDERS=isabella.isg@gmail.com - - POLL_INTERVAL=60 + - POLL_INTERVAL=300 - STATE_FILE=/app/data/zeus_email_state.json - LOG_DIR=/app/logs depends_on: @@ -50,7 +52,7 @@ services: poller-moxie: build: . container_name: moxie-email-poller - restart: unless-stopped + restart: always command: python email_poller.py volumes: - ./data:/app/data @@ -62,8 +64,30 @@ services: - IMAP_PASS=Xn1R#JThrcn0k - RAG_URL=http://moxie-rag:8899 - RAG_COLLECTION=adolfo_docs - - POLL_INTERVAL=60 + - POLL_INTERVAL=300 - STATE_FILE=/app/data/moxie_email_state.json - LOG_DIR=/app/logs depends_on: - rag + + poller-bruno: + build: . + container_name: bruno-email-poller + restart: always + command: python email_poller.py + volumes: + - ./data:/app/data + - ./logs:/app/logs + environment: + - IMAP_HOST=mail.oe74.net + - IMAP_PORT=993 + - IMAP_USER=moxie-ai@zz11.net + - IMAP_PASS=mistouente2019# + - RAG_URL=http://moxie-rag:8899 + - RAG_COLLECTION=bruno_docs + - ALLOWED_SENDERS=bruno@delorenzo.net + - POLL_INTERVAL=300 + - STATE_FILE=/app/data/bruno_email_state.json + - LOG_DIR=/app/logs + depends_on: + - rag