import os
import time
import tempfile
import requests
import json
import logging
from dotenv import load_dotenv
from flask import Flask, render_template, request, jsonify, Response, send_file, send_from_directory, stream_with_context, g
from flask_cors import CORS, cross_origin
import whisper
import torch
import ollama
from whisper_config import MODEL_SIZE, GPU_OPTIMIZATIONS, TRANSCRIBE_OPTIONS
from pywebpush import webpush, WebPushException
import base64
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import gc  # For garbage collection
from functools import wraps
import traceback
from validators import Validators
import atexit
import threading
from datetime import datetime, timedelta
from rate_limiter import rate_limit, rate_limiter, cleanup_rate_limiter, ip_filter_check

# Load environment variables from .env file
load_dotenv()

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Import configuration and secrets management
from config import init_app as init_config
from secrets_manager import init_app as init_secrets
from session_manager import init_app as init_session_manager, track_resource
from request_size_limiter import RequestSizeLimiter, limit_request_size
from error_logger import ErrorLogger, log_errors, log_performance, log_exception, get_logger
from memory_manager import MemoryManager, AudioProcessingContext, with_memory_management


# Error boundary decorator for Flask routes
def with_error_boundary(func):
    """Turn any unhandled exception raised by a Flask view into a JSON 500 response.

    The exception is logged with request context via ``log_exception``. If its message
    contains a suspicious keyword, it is also reported through
    ``app.error_logger.log_security``. The raw error text reaches the client only when
    ``app.debug`` is enabled.

    Args:
        func: The view function to wrap.

    Returns:
        The wrapped view. It returns ``func``'s result, or a ``(json, 500)`` tuple
        on error.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Log the error with full context
            log_exception(
                e,
                message=f"Error in {func.__name__}",
                endpoint=request.endpoint,
                method=request.method,
                path=request.path,
                ip=request.remote_addr,
                function=func.__name__,
                module=func.__module__
            )

            # Log security event for suspicious errors. ``app`` is resolved at call
            # time. Guard the attribute so a missing error logger cannot raise a second
            # exception from inside this handler.
            security_logger = getattr(app, 'error_logger', None)
            if security_logger is not None and any(
                keyword in str(e).lower()
                for keyword in ['inject', 'attack', 'malicious', 'unauthorized']
            ):
                security_logger.log_security(
                    'suspicious_error',
                    severity='warning',
                    error_type=type(e).__name__,
                    error_message=str(e),
                    endpoint=request.endpoint,
                    ip=request.remote_addr
                )

            # Return appropriate error response
            error_message = str(e) if app.debug else "An internal error occurred"
            return jsonify({
                'success': False,
                'error': error_message,
                'component': func.__name__,
                'request_id': getattr(g, 'request_id', None)
            }), 500

    return wrapper


app = Flask(__name__)

# Initialize configuration and secrets management
init_config(app)
init_secrets(app)

# Configure CORS with security best practices.
# NOTE(review): the fallback origins ['*'] combined with supports_credentials=True
# lets any site make credentialed requests once flask-cors reflects the caller's
# Origin. Set CORS_ORIGINS explicitly in production.
cors_config = {
    "origins": app.config.get('CORS_ORIGINS', ['*']),
    "methods": ["GET", "POST", "OPTIONS"],
    "allow_headers": ["Content-Type", "Authorization", "X-Requested-With", "X-Admin-Token"],
    "expose_headers": ["Content-Range", "X-Content-Range"],
    "supports_credentials": True,
    "max_age": 3600  # Cache preflight requests for 1 hour
}

# Apply CORS configuration
CORS(app, resources={
    r"/api/*": cors_config,
    r"/transcribe": cors_config,
    r"/translate": cors_config,
    r"/translate/stream": cors_config,
    r"/speak": cors_config,
    r"/get_audio/*": cors_config,
    r"/check_tts_server": cors_config,
    r"/update_tts_config": cors_config,
    r"/health/*": cors_config,
    r"/admin/*": {
        **cors_config,
        "origins": app.config.get('ADMIN_CORS_ORIGINS', ['http://localhost:*'])
    }
})

# Configure upload folder
upload_folder = app.config.get('UPLOAD_FOLDER')
if not upload_folder:
    upload_folder = os.path.join(tempfile.gettempdir(), 'talk2me_uploads')

# Ensure upload folder exists with proper permissions
try:
    os.makedirs(upload_folder, mode=0o755, exist_ok=True)
    logger.info(f"Using upload folder: {upload_folder}")
except Exception as e:
    logger.error(f"Failed to create upload folder {upload_folder}: {str(e)}")
    # Fall back to system temp
    upload_folder = tempfile.mkdtemp(prefix='talk2me_')
    logger.warning(f"Falling back to temporary folder: {upload_folder}")

app.config['UPLOAD_FOLDER'] = upload_folder

# Initialize session management after upload folder is configured
init_session_manager(app)

# Initialize request size limiter
request_size_limiter = RequestSizeLimiter(app, {
    'max_content_length': app.config.get('MAX_CONTENT_LENGTH', 50 * 1024 * 1024),  # 50MB default
    'max_audio_size': app.config.get('MAX_AUDIO_SIZE', 25 * 1024 * 1024),  # 25MB for audio
    'max_json_size': app.config.get('MAX_JSON_SIZE', 1 * 1024 * 1024),  # 1MB for JSON
    'max_image_size': app.config.get('MAX_IMAGE_SIZE', 10 * 1024 * 1024),  # 10MB for images
})

# Initialize error logging system
error_logger = ErrorLogger(app, {
    'log_level': app.config.get('LOG_LEVEL', 'INFO'),
    'log_file': app.config.get('LOG_FILE', 'logs/talk2me.log'),
    'error_log_file': app.config.get('ERROR_LOG_FILE', 'logs/errors.log'),
    'max_bytes': app.config.get('LOG_MAX_BYTES', 50 * 1024 * 1024),  # 50MB
    'backup_count': app.config.get('LOG_BACKUP_COUNT', 10)
})

# Update logger to use the new system
logger = get_logger(__name__)

# Initialize memory management
memory_manager = MemoryManager(app, {
    'memory_threshold_mb': app.config.get('MEMORY_THRESHOLD_MB', 4096),
    'gpu_memory_threshold_mb': app.config.get('GPU_MEMORY_THRESHOLD_MB', 2048),
    'cleanup_interval': app.config.get('MEMORY_CLEANUP_INTERVAL', 30)
})

# TTS configuration is already loaded from config.py
# Warn if TTS API key is not set
if not app.config.get('TTS_API_KEY'):
    logger.warning("TTS_API_KEY not set. TTS functionality may not work.")

# Rate limiting storage
rate_limit_storage = {}

# Temporary file cleanup configuration
TEMP_FILE_MAX_AGE = 300  # 5 minutes
CLEANUP_INTERVAL = 60  # Run cleanup every minute
temp_file_registry = {}  # Track temporary files and their creation times


def cleanup_temp_files():
    """Delete temporary files older than TEMP_FILE_MAX_AGE seconds.

    Two sweeps run on each call:
      1. Files recorded in ``temp_file_registry`` are deleted, then dropped from the
         registry.
      2. Any other regular file in UPLOAD_FOLDER whose mtime is older than the limit
         is deleted ("orphans").

    Each file is handled independently. A file that disappears mid-sweep (for
    example, removed by the request that created it) is skipped without aborting
    the pass.
    """
    try:
        current_time = datetime.now()
        max_age = timedelta(seconds=TEMP_FILE_MAX_AGE)

        # Check registered temporary files (snapshot: requests mutate the registry)
        files_to_remove = [
            filepath
            for filepath, created_time in list(temp_file_registry.items())
            if current_time - created_time > max_age
        ]

        # Remove old files
        for filepath in files_to_remove:
            try:
                os.remove(filepath)
                logger.info(f"Cleaned up temporary file: {filepath}")
            except FileNotFoundError:
                pass  # Already gone; still forget it below
            except Exception as e:
                logger.error(f"Failed to remove temporary file {filepath}: {str(e)}")
                continue  # Keep it registered so a later pass retries
            temp_file_registry.pop(filepath, None)

        # Also clean any orphaned files in upload folder
        upload_dir = app.config['UPLOAD_FOLDER']
        if os.path.exists(upload_dir):
            for filename in os.listdir(upload_dir):
                filepath = os.path.join(upload_dir, filename)
                try:
                    if not os.path.isfile(filepath):
                        continue
                    # Check file age
                    file_age = current_time - datetime.fromtimestamp(os.path.getmtime(filepath))
                    if file_age > max_age:
                        os.remove(filepath)
                        logger.info(f"Cleaned up orphaned file: {filepath}")
                except FileNotFoundError:
                    continue  # Deleted concurrently between listdir() and here
                except Exception as e:
                    logger.error(f"Failed to remove orphaned file {filepath}: {str(e)}")

        logger.debug(f"Cleanup completed. Files in registry: {len(temp_file_registry)}")
    except Exception as e:
        logger.error(f"Error during temp file cleanup: {str(e)}")


def register_temp_file(filepath):
    """Register a temporary file for tracking and cleanup"""
    temp_file_registry[filepath] = datetime.now()


# Schedule periodic cleanup
def run_cleanup_loop():
    """Call cleanup_temp_files every CLEANUP_INTERVAL seconds, forever (daemon thread)."""
    while True:
        time.sleep(CLEANUP_INTERVAL)
        cleanup_temp_files()  # Catches its own exceptions, so the loop survives


# Start cleanup thread
cleanup_thread = threading.Thread(target=run_cleanup_loop, daemon=True)
cleanup_thread.start()


# Rate limiter cleanup thread
def run_rate_limiter_cleanup():
    """Run ``cleanup_rate_limiter`` once an hour for the life of the process.

    Errors are logged and the loop continues. An uncaught exception here would end
    the daemon thread, and the rate-limiter state would never be pruned again.
    """
    while True:
        time.sleep(3600)  # Run every hour
        try:
            cleanup_rate_limiter()
            logger.info("Rate limiter cleanup completed")
        except Exception as e:
            logger.error(f"Rate limiter cleanup failed: {str(e)}")


rate_limiter_thread = threading.Thread(target=run_rate_limiter_cleanup, daemon=True)
rate_limiter_thread.start()


# Cleanup on app shutdown
@atexit.register
def cleanup_on_exit():
    """Delete registered temp files, then the whole upload folder, at interpreter exit.

    NOTE(review): this removes UPLOAD_FOLDER recursively even when it was configured
    explicitly rather than created as a temp dir. Make sure it never points at a
    shared or persistent directory.
    """
    logger.info("Cleaning up temporary files on shutdown...")
    for filepath in list(temp_file_registry.keys()):
        try:
            if os.path.exists(filepath):
                os.remove(filepath)
        except Exception as e:
            logger.error(f"Failed to remove {filepath} on shutdown: {str(e)}")

    # Clean entire upload folder
    try:
        import shutil
        if os.path.exists(app.config['UPLOAD_FOLDER']):
            shutil.rmtree(app.config['UPLOAD_FOLDER'])
            logger.info("Removed temporary upload folder")
    except Exception as e:
        logger.error(f"Failed to remove upload folder on shutdown: {str(e)}")


def _write_owner_only_file(path, data):
    """Write *data* (bytes) to *path*, creating it with mode 0o600 (owner read/write only)."""
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as f:
        f.write(data)


def _public_pem(private_key):
    """Return the SubjectPublicKeyInfo PEM encoding of *private_key*'s public half."""
    return private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )


# Generate VAPID keys for push notifications (paths are relative to the working dir)
if not os.path.exists('vapid_private.pem'):
    # Generate new VAPID keys
    private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())

    # Save private key without world/group read access: it signs every push request
    _write_owner_only_file('vapid_private.pem', private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ))

    # Save public key (always rewritten so it matches the new private key)
    with open('vapid_public.pem', 'wb') as f:
        f.write(_public_pem(private_key))
elif not os.path.exists('vapid_public.pem'):
    # The private key survived but the public half is missing: derive it rather than
    # crash below while loading a file that does not exist.
    with open('vapid_private.pem', 'rb') as f:
        existing_private_key = serialization.load_pem_private_key(
            f.read(), password=None, backend=default_backend()
        )
    with open('vapid_public.pem', 'wb') as f:
        f.write(_public_pem(existing_private_key))

# Load VAPID keys
with open('vapid_private.pem', 'rb') as f:
    vapid_private_key = f.read()
with open('vapid_public.pem', 'rb') as f:
    vapid_public_pem = f.read()
    vapid_public_key = serialization.load_pem_public_key(
        vapid_public_pem,
        backend=default_backend()
    )

# Convert public key to base64 for client: uncompressed EC point (0x04 || X || Y),
# base64url without padding, as the Push API's applicationServerKey expects.
public_numbers = vapid_public_key.public_numbers()
x = public_numbers.x.to_bytes(32, byteorder='big')
y = public_numbers.y.to_bytes(32, byteorder='big')
vapid_public_key_base64 = base64.urlsafe_b64encode(b'\x04' + x + y).decode('utf-8').rstrip('=')

# Store subscriptions in memory (in production, use a database)
push_subscriptions = []


@app.route('/<filename>')
def root_files(filename):
    """Serve well-known root-level icon names from ``static/icons``.

    Browsers and iOS probe these names at the site root. Any other single-segment
    path returns a plain-text 404. The dedicated icon routes for the same names
    match first, because Werkzeug ranks fully static rules ahead of rules with
    converters.
    """
    # Map to appropriate icon in static/icons
    icon_mapping = {
        'favicon.ico': 'favicon.ico',
        'apple-touch-icon.png': 'apple-icon-180x180.png',
        'apple-touch-icon-precomposed.png': 'apple-icon-180x180.png',
        'apple-touch-icon-120x120.png': 'apple-icon-120x120.png',
        'apple-touch-icon-120x120-precomposed.png': 'apple-icon-120x120.png'
    }
    icon_file = icon_mapping.get(filename)
    if icon_file is None:
        # If not an icon, return 404
        return "File not found", 404
    return send_from_directory('static/icons', icon_file)


@app.route('/favicon.ico')
def favicon():
    return send_from_directory('static/icons', 'favicon.ico')


@app.route('/apple-touch-icon.png')
def apple_touch_icon():
    return send_from_directory('static/icons', 'apple-icon-180x180.png')


@app.route('/apple-touch-icon-precomposed.png')
def apple_touch_icon_precomposed():
    return send_from_directory('static/icons', 'apple-icon-180x180.png')
@app.route('/apple-touch-icon-120x120.png') def apple_touch_icon_120(): return send_from_directory('static/icons', 'apple-icon-120x120.png') @app.route('/apple-touch-icon-120x120-precomposed.png') def apple_touch_icon_120_precomposed(): return send_from_directory('static/icons', 'apple-icon-120x120.png') # Add this route to your Flask app @app.route('/service-worker.js') def service_worker(): return app.send_static_file('service-worker.js') # Make sure static files are served properly app.static_folder = 'static' @app.route('/static/icons/') def serve_icon(filename): return send_from_directory('static/icons', filename) @app.route('/api/push-public-key', methods=['GET']) @rate_limit(requests_per_minute=30) def push_public_key(): return jsonify({'publicKey': vapid_public_key_base64}) @app.route('/api/push-subscribe', methods=['POST']) @rate_limit(requests_per_minute=10, requests_per_hour=50) def push_subscribe(): try: subscription = request.json # Store subscription (in production, use a database) if subscription not in push_subscriptions: push_subscriptions.append(subscription) logger.info(f"New push subscription registered. Total subscriptions: {len(push_subscriptions)}") return jsonify({'success': True}) except Exception as e: logger.error(f"Failed to register push subscription: {str(e)}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/push-unsubscribe', methods=['POST']) def push_unsubscribe(): try: subscription = request.json # Remove subscription if subscription in push_subscriptions: push_subscriptions.remove(subscription) logger.info(f"Push subscription removed. 
Total subscriptions: {len(push_subscriptions)}") return jsonify({'success': True}) except Exception as e: logger.error(f"Failed to unsubscribe: {str(e)}") return jsonify({'success': False, 'error': str(e)}), 500 def send_push_notification(title, body, icon='/static/icons/icon-192x192.png', badge='/static/icons/icon-192x192.png', tag=None, data=None): """Send push notification to all subscribed clients""" claims = { "sub": "mailto:admin@talk2me.app", "exp": int(time.time()) + 86400 # 24 hours } notification_sent = 0 for subscription in push_subscriptions[:]: # Create a copy to iterate try: webpush( subscription_info=subscription, data=json.dumps({ 'title': title, 'body': body, 'icon': icon, 'badge': badge, 'tag': tag or 'talk2me-notification', 'data': data or {} }), vapid_private_key=vapid_private_key, vapid_claims=claims ) notification_sent += 1 except WebPushException as e: logger.error(f"Failed to send push notification: {str(e)}") # Remove invalid subscription if e.response and e.response.status_code == 410: push_subscriptions.remove(subscription) logger.info(f"Sent {notification_sent} push notifications") return notification_sent # Add a route to check TTS server status @app.route('/check_tts_server', methods=['GET']) def check_tts_server(): try: # Get current TTS server configuration tts_server_url = app.config.get('TTS_SERVER_URL', 'http://localhost:5050/v1/audio/speech') tts_api_key = app.config.get('TTS_API_KEY', '') # Try a simple request to the TTS server with a minimal payload headers = { "Content-Type": "application/json", "Authorization": f"Bearer {tts_api_key}" } # For status check, we'll just check if the server responds to a HEAD request # or a minimal POST with a very short text to minimize bandwidth usage try: response = requests.head( tts_server_url.split('/v1/audio/speech')[0] + '/v1/models', headers=headers, timeout=5 ) status_code = response.status_code except: # If HEAD request fails, try minimal POST response = requests.post( tts_server_url, 
headers=headers, json={ "input": "Test", "voice": "echo", "response_format": "mp3", "speed": 1.0 }, timeout=5 ) status_code = response.status_code if status_code in [200, 401, 403]: # Even auth errors mean server is running logger.info(f"TTS server is reachable at {tts_server_url}") return jsonify({ 'status': 'online' if status_code == 200 else 'auth_error', 'message': 'TTS server is online' if status_code == 200 else 'Authentication error. Check API key.', 'url': tts_server_url, 'code': status_code }) else: logger.warning(f"TTS server returned status code {status_code}") return jsonify({ 'status': 'error', 'message': f'TTS server returned status code {status_code}', 'url': tts_server_url, 'code': status_code }) except requests.exceptions.RequestException as e: logger.error(f"Cannot connect to TTS server: {str(e)}") return jsonify({ 'status': 'error', 'message': f'Cannot connect to TTS server: {str(e)}', 'url': app.config.get('TTS_SERVER_URL', 'http://localhost:5050/v1/audio/speech') }) @app.route('/update_tts_config', methods=['POST']) @with_error_boundary def update_tts_config(): try: data = request.json # Validate and sanitize URL tts_server_url = data.get('server_url') if tts_server_url: validated_url = Validators.validate_url(tts_server_url) if not validated_url: return jsonify({ 'success': False, 'error': 'Invalid server URL format' }), 400 app.config['TTS_SERVER_URL'] = validated_url logger.info(f"Updated TTS server URL to {validated_url}") # Validate and sanitize API key tts_api_key = data.get('api_key') if tts_api_key: validated_key = Validators.validate_api_key(tts_api_key) if not validated_key: return jsonify({ 'success': False, 'error': 'Invalid API key format' }), 400 app.config['TTS_API_KEY'] = validated_key logger.info("Updated TTS API key") return jsonify({ 'success': True, 'message': 'TTS configuration updated', 'url': app.config.get('TTS_SERVER_URL') }) except Exception as e: logger.error(f"Failed to update TTS config: {str(e)}") return jsonify({ 
'success': False, 'error': f'Failed to update TTS config: {str(e)}' }), 500 # Initialize Whisper model with GPU optimization logger.info("Initializing Whisper model with GPU optimization...") # Detect available acceleration if torch.cuda.is_available(): device = torch.device("cuda") # Check if it's AMD or NVIDIA try: gpu_name = torch.cuda.get_device_name(0) if 'AMD' in gpu_name or 'Radeon' in gpu_name: logger.info(f"AMD GPU detected via ROCm: {gpu_name}") logger.info("Using ROCm acceleration (limited optimizations)") else: logger.info(f"NVIDIA GPU detected: {gpu_name}") logger.info("Using CUDA acceleration with full optimizations") except: logger.info("GPU detected - using CUDA/ROCm acceleration") elif hasattr(torch.backends, 'mps') and torch.backends.mps.is_available(): device = torch.device("mps") logger.info("Apple Silicon detected - using Metal Performance Shaders") else: device = torch.device("cpu") logger.info("No GPU acceleration available - using CPU") logger.info(f"Using device: {device}") # Load model with optimizations whisper_model = whisper.load_model(MODEL_SIZE, device=device) # Enable GPU optimizations based on device type if device.type == 'cuda': # NVIDIA GPU optimizations try: # Enable TensorFloat-32 for faster computation on Ampere GPUs torch.backends.cuda.matmul.allow_tf32 = True torch.backends.cudnn.allow_tf32 = True # Enable cudnn autotuner for optimized convolution algorithms torch.backends.cudnn.benchmark = True # Set model to evaluation mode and enable half precision for faster inference whisper_model.eval() whisper_model = whisper_model.half() # FP16 for faster GPU inference # Pre-allocate GPU memory to avoid fragmentation torch.cuda.empty_cache() # Warm up the model with a dummy input to cache CUDA kernels logger.info("Warming up GPU with dummy inference...") with torch.no_grad(): # Create a dummy audio tensor (30 seconds at 16kHz) dummy_audio = torch.randn(1, 16000 * 30).to(device).half() _ = 
whisper_model.encode(whisper.pad_or_trim(dummy_audio)) logger.info(f"GPU memory allocated: {torch.cuda.memory_allocated() / 1024**2:.2f} MB") logger.info("Whisper model loaded and optimized for NVIDIA GPU") except Exception as e: logger.warning(f"Some NVIDIA optimizations failed: {e}") elif device.type == 'mps': # Apple Silicon optimizations whisper_model.eval() # MPS doesn't support half precision well yet logger.info("Whisper model loaded and optimized for Apple Silicon") else: # CPU mode whisper_model.eval() logger.info("Whisper model loaded (CPU mode)") # Register model with memory manager memory_manager.set_whisper_model(whisper_model) app.whisper_model = whisper_model # Supported languages SUPPORTED_LANGUAGES = { "ar": "Arabic", "hy": "Armenian", "az": "Azerbaijani", "en": "English", "fr": "French", "ka": "Georgian", "kk": "Kazakh", "zh": "Mandarin", "fa": "Farsi", "pt": "Portuguese", "ru": "Russian", "es": "Spanish", "tr": "Turkish", "uz": "Uzbek" } # Map language names to language codes LANGUAGE_TO_CODE = {v: k for k, v in SUPPORTED_LANGUAGES.items()} # Map language names to OpenAI TTS voice options LANGUAGE_TO_VOICE = { "Arabic": "ar-EG-ShakirNeural", # Using OpenAI general voices "Armenian": "echo", # as OpenAI doesn't have specific voices "Azerbaijani": "az-AZ-BanuNeural", # for all these languages "English": "en-GB-RyanNeural", # We'll use the available voices "French": "fr-FR-DeniseNeural", # and rely on the translation being "Georgian": "ka-GE-GiorgiNeural", # in the correct language text "Kazakh": "kk-KZ-DauletNeural", "Mandarin": "zh-CN-YunjianNeural", "Farsi": "fa-IR-FaridNeural", "Portuguese": "pt-BR-ThalitaNeural", "Russian": "ru-RU-SvetlanaNeural", "Spanish": "es-CR-MariaNeural", "Turkish": "tr-TR-EmelNeural", "Uzbek": "uz-UZ-SardorNeural" } @app.route('/') def index(): return render_template('index.html', languages=sorted(SUPPORTED_LANGUAGES.values())) @app.route('/transcribe', methods=['POST']) @rate_limit(requests_per_minute=10, 
requests_per_hour=100, check_size=True) @limit_request_size(max_audio_size=25 * 1024 * 1024) # 25MB limit for audio @with_error_boundary @track_resource('audio_file') @log_performance('transcribe_audio') @with_memory_management def transcribe(): # Use memory management context with AudioProcessingContext(app.memory_manager, name='transcribe') as ctx: if 'audio' not in request.files: return jsonify({'error': 'No audio file provided'}), 400 audio_file = request.files['audio'] # Validate audio file valid, error_msg = Validators.validate_audio_file(audio_file) if not valid: return jsonify({'error': error_msg}), 400 # Validate and sanitize language code source_lang = request.form.get('source_lang', '') allowed_languages = set(SUPPORTED_LANGUAGES.values()) source_lang = Validators.validate_language_code(source_lang, allowed_languages) or '' # Save the audio file temporarily with unique name temp_filename = f'input_audio_{int(time.time() * 1000)}.wav' temp_path = os.path.join(app.config['UPLOAD_FOLDER'], temp_filename) # Ensure file handle is properly closed with open(temp_path, 'wb') as f: audio_file.save(f) register_temp_file(temp_path) ctx.add_temp_file(temp_path) # Register with context for cleanup # Add to session resources if hasattr(g, 'session_manager') and hasattr(g, 'user_session'): file_size = os.path.getsize(temp_path) g.session_manager.add_resource( session_id=g.user_session.session_id, resource_type='audio_file', path=temp_path, size_bytes=file_size, metadata={'filename': temp_filename, 'purpose': 'transcription'} ) try: # Check if we should auto-detect language auto_detect = source_lang == 'auto' or source_lang == '' # Use Whisper for transcription with GPU optimizations transcribe_options = { "task": "transcribe", "temperature": 0, # Disable temperature sampling for faster inference "best_of": 1, # Disable beam search for faster inference "beam_size": 1, # Disable beam search "fp16": device.type == 'cuda', # Use FP16 on GPU "condition_on_previous_text": 
False, # Faster inference "compression_ratio_threshold": 2.4, "logprob_threshold": -1.0, "no_speech_threshold": 0.6 } # Only set language if not auto-detecting if not auto_detect: transcribe_options["language"] = LANGUAGE_TO_CODE.get(source_lang, None) # Clear GPU cache before transcription if device.type == 'cuda': torch.cuda.empty_cache() # Transcribe with optimized settings with torch.no_grad(): # Disable gradient computation result = whisper_model.transcribe( temp_path, **transcribe_options ) transcribed_text = result["text"] # Get detected language if auto-detection was used detected_language = None if auto_detect and 'language' in result: # Convert language code back to full name detected_code = result['language'] for lang_name, lang_code in LANGUAGE_TO_CODE.items(): if lang_code == detected_code: detected_language = lang_name break # Log detected language logger.info(f"Auto-detected language: {detected_language} ({detected_code})") # Send notification if push is enabled if len(push_subscriptions) > 0: send_push_notification( title="Transcription Complete", body=f"Successfully transcribed: {transcribed_text[:50]}...", tag="transcription-complete" ) response = { 'success': True, 'text': transcribed_text } # Include detected language if auto-detection was used if detected_language: response['detected_language'] = detected_language return jsonify(response) except Exception as e: logger.error(f"Transcription error: {str(e)}") return jsonify({'error': f'Transcription failed: {str(e)}'}), 500 finally: # Clean up the temporary file try: if 'temp_path' in locals() and os.path.exists(temp_path): os.remove(temp_path) temp_file_registry.pop(temp_path, None) except Exception as e: logger.error(f"Failed to clean up temp file: {e}") # Force garbage collection to free memory if device.type == 'cuda': torch.cuda.empty_cache() torch.cuda.synchronize() # Ensure all CUDA operations are complete gc.collect() @app.route('/translate', methods=['POST']) 
@rate_limit(requests_per_minute=20, requests_per_hour=300, check_size=True) @limit_request_size(max_size=1 * 1024 * 1024) # 1MB limit for JSON @with_error_boundary @log_performance('translate_text') def translate(): try: # Validate request size if not Validators.validate_json_size(request.json, max_size_kb=100): return jsonify({'error': 'Request too large'}), 413 data = request.json # Sanitize and validate text text = data.get('text', '') text = Validators.sanitize_text(text) if not text: return jsonify({'error': 'No text provided'}), 400 # Validate language codes allowed_languages = set(SUPPORTED_LANGUAGES.values()) source_lang = Validators.validate_language_code( data.get('source_lang', ''), allowed_languages ) or 'auto' target_lang = Validators.validate_language_code( data.get('target_lang', ''), allowed_languages ) if not target_lang: return jsonify({'error': 'Invalid target language'}), 400 # Create a prompt for Gemma 3 translation prompt = f""" Translate the following text from {source_lang} to {target_lang}: "{text}" Provide only the translation without any additional text. 
""" # Use Ollama to interact with Gemma 3 response = ollama.chat( model="gemma3:27b", messages=[ { "role": "user", "content": prompt } ] ) translated_text = response['message']['content'].strip() # Send notification if push is enabled if len(push_subscriptions) > 0: send_push_notification( title="Translation Complete", body=f"Translated from {source_lang} to {target_lang}", tag="translation-complete", data={'translation': translated_text[:100]} ) return jsonify({ 'success': True, 'translation': translated_text }) except Exception as e: logger.error(f"Translation error: {str(e)}") return jsonify({'error': f'Translation failed: {str(e)}'}), 500 @app.route('/translate/stream', methods=['POST']) @rate_limit(requests_per_minute=10, requests_per_hour=150, check_size=True) @limit_request_size(max_size=1 * 1024 * 1024) # 1MB limit for JSON @with_error_boundary def translate_stream(): """Streaming translation endpoint for reduced latency""" try: # Validate request size if not Validators.validate_json_size(request.json, max_size_kb=100): return jsonify({'error': 'Request too large'}), 413 data = request.json # Sanitize and validate text text = data.get('text', '') text = Validators.sanitize_text(text) if not text: return jsonify({'error': 'No text provided'}), 400 # Validate language codes allowed_languages = set(SUPPORTED_LANGUAGES.values()) source_lang = Validators.validate_language_code( data.get('source_lang', ''), allowed_languages ) or 'auto' target_lang = Validators.validate_language_code( data.get('target_lang', ''), allowed_languages ) if not target_lang: return jsonify({'error': 'Invalid target language'}), 400 # Create prompt for streaming translation prompt = f""" Translate the following text from {source_lang} to {target_lang}: {text} Provide only the translation, no explanations. 
""" def generate(): """Generator function for streaming response""" try: # Send initial connection yield f"data: {json.dumps({'type': 'start', 'source_lang': source_lang, 'target_lang': target_lang})}\n\n" # Stream translation from Ollama stream = ollama.generate( model='gemma2:9b', prompt=prompt, stream=True, options={ 'temperature': 0.5, 'top_p': 0.9, 'max_tokens': 2048 } ) accumulated_text = "" word_buffer = "" for chunk in stream: if 'response' in chunk: chunk_text = chunk['response'] word_buffer += chunk_text # Send complete words/phrases for better UX if ' ' in word_buffer or '\n' in word_buffer or '.' in word_buffer or ',' in word_buffer: accumulated_text += word_buffer yield f"data: {json.dumps({'type': 'chunk', 'text': word_buffer})}\n\n" word_buffer = "" # Send any remaining text if word_buffer: accumulated_text += word_buffer yield f"data: {json.dumps({'type': 'chunk', 'text': word_buffer})}\n\n" # Send completion signal yield f"data: {json.dumps({'type': 'complete', 'full_text': accumulated_text.strip()})}\n\n" except Exception as e: logger.error(f"Streaming translation error: {str(e)}") yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n" return Response( stream_with_context(generate()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no', # Disable Nginx buffering 'Connection': 'keep-alive' } ) except Exception as e: logger.error(f"Translation stream error: {str(e)}") return jsonify({'error': f'Translation failed: {str(e)}'}), 500 @app.route('/speak', methods=['POST']) @rate_limit(requests_per_minute=15, requests_per_hour=200, check_size=True) @limit_request_size(max_size=1 * 1024 * 1024) # 1MB limit for JSON @with_error_boundary @track_resource('audio_file') def speak(): try: # Validate request size if not Validators.validate_json_size(request.json, max_size_kb=100): return jsonify({'error': 'Request too large'}), 413 data = request.json # Sanitize and validate text text = data.get('text', '') 
text = Validators.sanitize_text(text, max_length=5000) # Shorter limit for TTS if not text: return jsonify({'error': 'No text provided'}), 400 # Validate language code allowed_languages = set(SUPPORTED_LANGUAGES.values()) language = Validators.validate_language_code( data.get('language', ''), allowed_languages ) if not language: return jsonify({'error': 'Invalid language'}), 400 voice = LANGUAGE_TO_VOICE.get(language, 'echo') # Default to echo if language not found # Get TTS server URL and API key from config tts_server_url = app.config.get('TTS_SERVER_URL', 'http://localhost:5050/v1/audio/speech') tts_api_key = app.config.get('TTS_API_KEY', '') try: # Request TTS from the OpenAI Edge TTS server logger.info(f"Sending TTS request to {tts_server_url}") headers = { "Content-Type": "application/json", "Authorization": f"Bearer {tts_api_key}" } # Log request details for debugging logger.info(f"Text for TTS: {text}") logger.info(f"Selected voice: {voice}") # Proper OpenAI TTS payload payload = { "input": text, "voice": voice, "response_format": "mp3", "speed": 1.0 } logger.debug(f"Full TTS request payload: {payload}") # Dump the payload to ensure proper JSON formatting payload_json = json.dumps(payload) logger.debug(f"Serialized payload: {payload_json}") tts_response = requests.post( tts_server_url, headers=headers, json=payload, # Use json parameter to ensure proper serialization timeout=15 # Longer timeout for audio generation ) logger.info(f"TTS response status: {tts_response.status_code}") if tts_response.status_code != 200: error_msg = f'TTS request failed with status {tts_response.status_code}' logger.error(error_msg) # Try to get error details from response if possible try: error_details = tts_response.json() logger.error(f"Error details: {error_details}") error_msg = f"{error_msg}: {error_details.get('error', {}).get('message', 'Unknown error')}" except Exception as e: logger.error(f"Could not parse error response: {str(e)}") # Log the raw response content 
logger.error(f"Raw response: {tts_response.text[:200]}") return jsonify({'error': error_msg}), 500 # The response contains the audio data directly temp_audio_filename = f'output_{int(time.time() * 1000)}.mp3' temp_audio_path = os.path.join(app.config['UPLOAD_FOLDER'], temp_audio_filename) with open(temp_audio_path, 'wb') as f: f.write(tts_response.content) # Register for cleanup register_temp_file(temp_audio_path) # Add to session resources if hasattr(g, 'session_manager') and hasattr(g, 'user_session'): file_size = os.path.getsize(temp_audio_path) g.session_manager.add_resource( session_id=g.user_session.session_id, resource_type='audio_file', path=temp_audio_path, size_bytes=file_size, metadata={'filename': temp_audio_filename, 'purpose': 'tts_output'} ) return jsonify({ 'success': True, 'audio_url': f'/get_audio/{temp_audio_filename}' }) except requests.exceptions.RequestException as e: error_msg = f'Failed to connect to TTS server: {str(e)}' logger.error(error_msg) return jsonify({'error': error_msg}), 500 except Exception as e: logger.error(f"TTS error: {str(e)}") return jsonify({'error': f'TTS failed: {str(e)}'}), 500 @app.route('/get_audio/') def get_audio(filename): try: # Validate filename to prevent directory traversal safe_filename = Validators.sanitize_filename(filename) file_path = os.path.join(app.config['UPLOAD_FOLDER'], safe_filename) # Check if file exists if not os.path.exists(file_path): return jsonify({'error': 'Audio file not found'}), 404 # Register file for cleanup if not already registered if file_path not in temp_file_registry: register_temp_file(file_path) # Serve the file with appropriate headers response = send_file( file_path, mimetype='audio/mpeg', as_attachment=False, download_name=safe_filename ) # Add cache control headers to prevent repeated downloads response.headers['Cache-Control'] = 'public, max-age=300' # Cache for 5 minutes return response except Exception as e: logger.error(f"Audio retrieval error: {str(e)}") return 
jsonify({'error': f'Audio retrieval failed: {str(e)}'}), 500 # Error logging endpoint for frontend error reporting @app.route('/api/log-error', methods=['POST']) @rate_limit(requests_per_minute=10, requests_per_hour=100) def log_error(): """Log frontend errors for monitoring""" try: error_data = request.json error_info = error_data.get('errorInfo', {}) error_details = error_data.get('error', {}) # Log the error logger.error(f"Frontend error in {error_info.get('component', 'unknown')}: {error_details.get('message', 'No message')}") logger.error(f"Stack trace: {error_details.get('stack', 'No stack trace')}") logger.error(f"User agent: {error_info.get('userAgent', 'Unknown')}") logger.error(f"URL: {error_info.get('url', 'Unknown')}") # In production, you might want to send this to a monitoring service # like Sentry, LogRocket, or your own analytics return jsonify({'success': True}) except Exception as e: logger.error(f"Failed to log frontend error: {str(e)}") return jsonify({'success': False, 'error': str(e)}), 500 # Health check endpoints for monitoring @app.route('/health', methods=['GET']) def health_check(): """Basic health check endpoint""" return jsonify({ 'status': 'healthy', 'timestamp': time.time(), 'service': 'voice-translator' }) @app.route('/health/detailed', methods=['GET']) def detailed_health_check(): """Detailed health check with component status""" health_status = { 'status': 'healthy', 'timestamp': time.time(), 'components': { 'whisper': {'status': 'unknown'}, 'ollama': {'status': 'unknown'}, 'tts': {'status': 'unknown'}, 'gpu': {'status': 'unknown'} }, 'metrics': {} } # Check Whisper model try: if whisper_model is not None: health_status['components']['whisper']['status'] = 'healthy' health_status['components']['whisper']['model_size'] = MODEL_SIZE else: health_status['components']['whisper']['status'] = 'unhealthy' health_status['status'] = 'degraded' except Exception as e: health_status['components']['whisper']['status'] = 'unhealthy' 
health_status['components']['whisper']['error'] = str(e) health_status['status'] = 'unhealthy' # Check GPU availability try: if torch.cuda.is_available(): health_status['components']['gpu']['status'] = 'healthy' health_status['components']['gpu']['device'] = torch.cuda.get_device_name(0) health_status['components']['gpu']['memory_allocated'] = f"{torch.cuda.memory_allocated(0) / 1024**2:.2f} MB" health_status['components']['gpu']['memory_reserved'] = f"{torch.cuda.memory_reserved(0) / 1024**2:.2f} MB" elif torch.backends.mps.is_available(): health_status['components']['gpu']['status'] = 'healthy' health_status['components']['gpu']['device'] = 'Apple Silicon GPU' else: health_status['components']['gpu']['status'] = 'not_available' health_status['components']['gpu']['device'] = 'CPU' except Exception as e: health_status['components']['gpu']['status'] = 'error' health_status['components']['gpu']['error'] = str(e) # Check Ollama connection try: ollama_models = ollama.list() health_status['components']['ollama']['status'] = 'healthy' health_status['components']['ollama']['available_models'] = len(ollama_models.get('models', [])) except Exception as e: health_status['components']['ollama']['status'] = 'unhealthy' health_status['components']['ollama']['error'] = str(e) health_status['status'] = 'degraded' # Check TTS server try: tts_server_url = app.config.get('TTS_SERVER_URL', 'http://localhost:5050/v1/audio/speech') tts_response = requests.get(tts_server_url.replace('/v1/audio/speech', '/health'), timeout=5) if tts_response.status_code == 200: health_status['components']['tts']['status'] = 'healthy' health_status['components']['tts']['server_url'] = tts_server_url else: health_status['components']['tts']['status'] = 'unhealthy' health_status['components']['tts']['http_status'] = tts_response.status_code health_status['status'] = 'degraded' except Exception as e: health_status['components']['tts']['status'] = 'unhealthy' health_status['components']['tts']['error'] = 
str(e) health_status['status'] = 'degraded' # Add system metrics health_status['metrics']['uptime'] = time.time() - app.start_time if hasattr(app, 'start_time') else 0 health_status['metrics']['request_count'] = getattr(app, 'request_count', 0) # Add size limits info if hasattr(app, 'request_size_limiter'): health_status['metrics']['size_limits'] = { k: f"{v / 1024 / 1024:.1f}MB" if v > 1024 * 1024 else f"{v / 1024:.1f}KB" for k, v in app.request_size_limiter.limits.items() } # Set appropriate HTTP status code http_status = 200 if health_status['status'] == 'healthy' else 503 if health_status['status'] == 'unhealthy' else 200 return jsonify(health_status), http_status @app.route('/health/ready', methods=['GET']) def readiness_check(): """Readiness probe - checks if service is ready to accept traffic""" try: # Check if all critical components are loaded if whisper_model is None: return jsonify({'status': 'not_ready', 'reason': 'Whisper model not loaded'}), 503 # Check Ollama connection ollama.list() return jsonify({'status': 'ready', 'timestamp': time.time()}) except Exception as e: return jsonify({'status': 'not_ready', 'reason': str(e)}), 503 @app.route('/health/live', methods=['GET']) def liveness_check(): """Liveness probe - basic check to see if process is alive""" return jsonify({'status': 'alive', 'timestamp': time.time()}) @app.route('/health/storage', methods=['GET']) def storage_health(): """Check temporary file storage health""" try: upload_folder = app.config['UPLOAD_FOLDER'] # Count files and calculate total size file_count = 0 total_size = 0 oldest_file_age = 0 if os.path.exists(upload_folder): current_time = datetime.now() for filename in os.listdir(upload_folder): filepath = os.path.join(upload_folder, filename) if os.path.isfile(filepath): file_count += 1 total_size += os.path.getsize(filepath) file_age = (current_time - datetime.fromtimestamp(os.path.getmtime(filepath))).total_seconds() oldest_file_age = max(oldest_file_age, file_age) # Get disk 
usage for the upload folder try: import shutil disk_usage = shutil.disk_usage(upload_folder if os.path.exists(upload_folder) else '/') disk_free_percent = (disk_usage.free / disk_usage.total) * 100 except: disk_free_percent = -1 return jsonify({ 'status': 'healthy' if file_count < 100 and total_size < 100 * 1024 * 1024 else 'warning', 'temp_files': { 'count': file_count, 'total_size_mb': round(total_size / (1024 * 1024), 2), 'oldest_file_age_seconds': round(oldest_file_age), 'registry_size': len(temp_file_registry), 'max_age_seconds': TEMP_FILE_MAX_AGE }, 'disk': { 'free_percent': round(disk_free_percent, 2) }, 'cleanup': { 'interval_seconds': CLEANUP_INTERVAL, 'last_run': 'running' } }) except Exception as e: logger.error(f"Storage health check error: {str(e)}") return jsonify({ 'status': 'error', 'error': str(e) }), 500 @app.route('/admin/cleanup', methods=['POST']) def manual_cleanup(): """Manual cleanup endpoint for emergency situations""" try: # Simple authentication check (in production, use proper auth) auth_token = request.headers.get('X-Admin-Token') expected_token = os.environ.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 # Run cleanup logger.info("Manual cleanup triggered") cleanup_temp_files() # Get current status upload_folder = app.config['UPLOAD_FOLDER'] file_count = 0 total_size = 0 if os.path.exists(upload_folder): for filename in os.listdir(upload_folder): filepath = os.path.join(upload_folder, filename) if os.path.isfile(filepath): file_count += 1 total_size += os.path.getsize(filepath) return jsonify({ 'success': True, 'message': 'Cleanup completed', 'remaining_files': file_count, 'remaining_size_mb': round(total_size / (1024 * 1024), 2) }) except Exception as e: logger.error(f"Manual cleanup error: {str(e)}") return jsonify({'error': str(e)}), 500 # Initialize app start time for metrics app.start_time = time.time() app.request_count = 0 # Middleware to count requests 
and check IP filtering @app.before_request def before_request(): app.request_count = getattr(app, 'request_count', 0) + 1 # Check IP filtering response = ip_filter_check() if response: return response # Global error handlers @app.errorhandler(404) def not_found_error(error): logger.warning(f"404 error: {request.url}") return jsonify({ 'success': False, 'error': 'Resource not found', 'status': 404 }), 404 @app.errorhandler(500) def internal_error(error): logger.error(f"500 error: {str(error)}") logger.error(traceback.format_exc()) return jsonify({ 'success': False, 'error': 'Internal server error', 'status': 500 }), 500 @app.errorhandler(Exception) def handle_exception(error): # Log the error logger.error(f"Unhandled exception: {str(error)}") logger.error(traceback.format_exc()) # Return JSON instead of HTML for HTTP errors if hasattr(error, 'code'): return jsonify({ 'success': False, 'error': str(error), 'status': error.code }), error.code # Non-HTTP exceptions return jsonify({ 'success': False, 'error': 'An unexpected error occurred', 'status': 500 }), 500 @app.route('/admin/rate-limits', methods=['GET']) @rate_limit(requests_per_minute=10) def get_rate_limits(): """Get current rate limit configuration""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 return jsonify({ 'default_limits': rate_limiter.default_limits, 'endpoint_limits': rate_limiter.endpoint_limits, 'global_limits': rate_limiter.global_limits }) except Exception as e: logger.error(f"Failed to get rate limits: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/rate-limits/stats', methods=['GET']) @rate_limit(requests_per_minute=10) def get_rate_limit_stats(): """Get rate limiting statistics""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = 
app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 # Get client ID from query param or header client_id = request.args.get('client_id') if client_id: stats = rate_limiter.get_client_stats(client_id) return jsonify({'client_stats': stats}) # Return global stats return jsonify({ 'total_buckets': len(rate_limiter.buckets), 'concurrent_requests': rate_limiter.concurrent_requests, 'blocked_ips': list(rate_limiter.blocked_ips), 'temp_blocked_ips': len(rate_limiter.temp_blocked_ips) }) except Exception as e: logger.error(f"Failed to get rate limit stats: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/block-ip', methods=['POST']) @rate_limit(requests_per_minute=5) def block_ip(): """Block an IP address""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 data = request.json ip = data.get('ip') duration = data.get('duration', 3600) # Default 1 hour permanent = data.get('permanent', False) if not ip: return jsonify({'error': 'IP address required'}), 400 if permanent: rate_limiter.blocked_ips.add(ip) logger.warning(f"IP {ip} permanently blocked by admin") else: rate_limiter.block_ip_temporarily(ip, duration) return jsonify({ 'success': True, 'ip': ip, 'permanent': permanent, 'duration': duration if not permanent else None }) except Exception as e: logger.error(f"Failed to block IP: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/sessions', methods=['GET']) @rate_limit(requests_per_minute=10) def get_sessions(): """Get information about all active sessions""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 
'Unauthorized'}), 401 if hasattr(app, 'session_manager'): sessions = app.session_manager.get_all_sessions_info() stats = app.session_manager.get_stats() return jsonify({ 'sessions': sessions, 'stats': stats }) else: return jsonify({'error': 'Session manager not initialized'}), 500 except Exception as e: logger.error(f"Failed to get sessions: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/sessions/', methods=['GET']) @rate_limit(requests_per_minute=20) def get_session_details(session_id): """Get detailed information about a specific session""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 if hasattr(app, 'session_manager'): session_info = app.session_manager.get_session_info(session_id) if session_info: return jsonify(session_info) else: return jsonify({'error': 'Session not found'}), 404 else: return jsonify({'error': 'Session manager not initialized'}), 500 except Exception as e: logger.error(f"Failed to get session details: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/sessions//cleanup', methods=['POST']) @rate_limit(requests_per_minute=5) def cleanup_session(session_id): """Manually cleanup a specific session""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 if hasattr(app, 'session_manager'): success = app.session_manager.cleanup_session(session_id) if success: return jsonify({ 'success': True, 'message': f'Session {session_id} cleaned up successfully' }) else: return jsonify({'error': 'Session not found'}), 404 else: return jsonify({'error': 'Session manager not initialized'}), 500 except Exception as e: logger.error(f"Failed to cleanup session: 
{str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/sessions/metrics', methods=['GET']) @rate_limit(requests_per_minute=30) def get_session_metrics(): """Get session management metrics for monitoring""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 if hasattr(app, 'session_manager'): metrics = app.session_manager.export_metrics() return jsonify(metrics) else: return jsonify({'error': 'Session manager not initialized'}), 500 except Exception as e: logger.error(f"Failed to get session metrics: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/size-limits', methods=['GET']) @rate_limit(requests_per_minute=10) def get_size_limits(): """Get current request size limits""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 if hasattr(app, 'request_size_limiter'): return jsonify({ 'limits': app.request_size_limiter.limits, 'limits_human': { k: f"{v / 1024 / 1024:.1f}MB" if v > 1024 * 1024 else f"{v / 1024:.1f}KB" for k, v in app.request_size_limiter.limits.items() } }) else: return jsonify({'error': 'Size limiter not initialized'}), 500 except Exception as e: logger.error(f"Failed to get size limits: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/size-limits', methods=['POST']) @rate_limit(requests_per_minute=5) def update_size_limits(): """Update request size limits""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 data = request.json if not data: return 
jsonify({'error': 'No data provided'}), 400 # Validate limits valid_keys = {'max_content_length', 'max_audio_size', 'max_json_size', 'max_image_size'} updates = {} for key, value in data.items(): if key in valid_keys: try: # Accept values in MB and convert to bytes if isinstance(value, str) and value.endswith('MB'): value = float(value[:-2]) * 1024 * 1024 elif isinstance(value, str) and value.endswith('KB'): value = float(value[:-2]) * 1024 else: value = int(value) # Enforce reasonable limits if value < 1024: # Minimum 1KB return jsonify({'error': f'{key} too small (min 1KB)'}), 400 if value > 500 * 1024 * 1024: # Maximum 500MB return jsonify({'error': f'{key} too large (max 500MB)'}), 400 updates[key] = value except ValueError: return jsonify({'error': f'Invalid value for {key}'}), 400 if not updates: return jsonify({'error': 'No valid limits provided'}), 400 # Update limits old_limits = app.request_size_limiter.update_limits(**updates) logger.info(f"Size limits updated by admin: {updates}") return jsonify({ 'success': True, 'old_limits': old_limits, 'new_limits': app.request_size_limiter.limits, 'new_limits_human': { k: f"{v / 1024 / 1024:.1f}MB" if v > 1024 * 1024 else f"{v / 1024:.1f}KB" for k, v in app.request_size_limiter.limits.items() } }) except Exception as e: logger.error(f"Failed to update size limits: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/logs/errors', methods=['GET']) @rate_limit(requests_per_minute=10) def get_error_logs(): """Get recent error log entries""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 # Get error summary if hasattr(app, 'error_logger'): error_summary = app.error_logger.get_error_summary() # Read last 100 lines from error log error_log_path = app.error_logger.error_log_file recent_errors = [] if 
os.path.exists(error_log_path): try: with open(error_log_path, 'r') as f: lines = f.readlines() # Get last 100 lines for line in lines[-100:]: try: error_entry = json.loads(line) recent_errors.append(error_entry) except json.JSONDecodeError: pass except Exception as e: logger.error(f"Failed to read error log: {e}") return jsonify({ 'error_summary': error_summary, 'recent_errors': recent_errors[-50:], # Last 50 errors 'total_errors_logged': len(recent_errors) }) else: return jsonify({'error': 'Error logger not initialized'}), 500 except Exception as e: logger.error(f"Failed to get error logs: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/logs/performance', methods=['GET']) @rate_limit(requests_per_minute=10) def get_performance_logs(): """Get performance metrics""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 # Read performance log perf_log_path = 'logs/performance.log' metrics = { 'endpoints': {}, 'slow_requests': [], 'average_response_times': {} } if os.path.exists(perf_log_path): try: with open(perf_log_path, 'r') as f: for line in f.readlines()[-1000:]: # Last 1000 entries try: entry = json.loads(line) if 'extra_fields' in entry: metric = entry['extra_fields'].get('metric', '') duration = entry['extra_fields'].get('duration_ms', 0) # Track endpoint metrics if metric not in metrics['endpoints']: metrics['endpoints'][metric] = { 'count': 0, 'total_duration': 0, 'max_duration': 0, 'min_duration': float('inf') } metrics['endpoints'][metric]['count'] += 1 metrics['endpoints'][metric]['total_duration'] += duration metrics['endpoints'][metric]['max_duration'] = max( metrics['endpoints'][metric]['max_duration'], duration ) metrics['endpoints'][metric]['min_duration'] = min( metrics['endpoints'][metric]['min_duration'], duration ) # Track slow requests if 
duration > 1000: # Over 1 second metrics['slow_requests'].append({ 'metric': metric, 'duration_ms': duration, 'timestamp': entry.get('timestamp') }) except json.JSONDecodeError: pass except Exception as e: logger.error(f"Failed to read performance log: {e}") # Calculate averages for endpoint, data in metrics['endpoints'].items(): if data['count'] > 0: metrics['average_response_times'][endpoint] = { 'avg_ms': data['total_duration'] / data['count'], 'max_ms': data['max_duration'], 'min_ms': data['min_duration'] if data['min_duration'] != float('inf') else 0, 'count': data['count'] } # Sort slow requests by duration metrics['slow_requests'].sort(key=lambda x: x['duration_ms'], reverse=True) metrics['slow_requests'] = metrics['slow_requests'][:20] # Top 20 slowest return jsonify({ 'performance_metrics': metrics['average_response_times'], 'slow_requests': metrics['slow_requests'] }) except Exception as e: logger.error(f"Failed to get performance logs: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/logs/security', methods=['GET']) @rate_limit(requests_per_minute=10) def get_security_logs(): """Get security event logs""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 # Read security log security_log_path = 'logs/security.log' security_events = [] if os.path.exists(security_log_path): try: with open(security_log_path, 'r') as f: for line in f.readlines()[-200:]: # Last 200 entries try: event = json.loads(line) security_events.append(event) except json.JSONDecodeError: pass except Exception as e: logger.error(f"Failed to read security log: {e}") # Group by event type event_summary = {} for event in security_events: if 'extra_fields' in event: event_type = event['extra_fields'].get('event', 'unknown') if event_type not in event_summary: event_summary[event_type] = 0 
event_summary[event_type] += 1 return jsonify({ 'security_events': security_events[-50:], # Last 50 events 'event_summary': event_summary, 'total_events': len(security_events) }) except Exception as e: logger.error(f"Failed to get security logs: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/memory', methods=['GET']) @rate_limit(requests_per_minute=10) def get_memory_stats(): """Get memory usage statistics""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 if hasattr(app, 'memory_manager'): metrics = app.memory_manager.get_metrics() return jsonify(metrics) else: return jsonify({'error': 'Memory manager not initialized'}), 500 except Exception as e: logger.error(f"Failed to get memory stats: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/admin/memory/cleanup', methods=['POST']) @rate_limit(requests_per_minute=5) def trigger_memory_cleanup(): """Manually trigger memory cleanup""" try: # Simple authentication check auth_token = request.headers.get('X-Admin-Token') expected_token = app.config.get('ADMIN_TOKEN', 'default-admin-token') if auth_token != expected_token: return jsonify({'error': 'Unauthorized'}), 401 if hasattr(app, 'memory_manager'): # Get stats before cleanup before_stats = app.memory_manager.get_memory_stats() # Perform aggressive cleanup app.memory_manager.cleanup_memory(aggressive=True) # Get stats after cleanup after_stats = app.memory_manager.get_memory_stats() return jsonify({ 'success': True, 'before': { 'process_mb': before_stats.process_memory_mb, 'gpu_mb': before_stats.gpu_memory_mb }, 'after': { 'process_mb': after_stats.process_memory_mb, 'gpu_mb': after_stats.gpu_memory_mb }, 'freed': { 'process_mb': before_stats.process_memory_mb - after_stats.process_memory_mb, 'gpu_mb': before_stats.gpu_memory_mb - 
after_stats.gpu_memory_mb } }) else: return jsonify({'error': 'Memory manager not initialized'}), 500 except Exception as e: logger.error(f"Failed to trigger memory cleanup: {str(e)}") return jsonify({'error': str(e)}), 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5005, debug=True)