""" Input validation and sanitization for the Talk2Me application """ import re import html from typing import Optional, Dict, Any, Tuple import os class Validators: # Maximum sizes MAX_TEXT_LENGTH = 10000 MAX_AUDIO_SIZE = 25 * 1024 * 1024 # 25MB MAX_URL_LENGTH = 2048 MAX_API_KEY_LENGTH = 128 # Allowed audio formats ALLOWED_AUDIO_EXTENSIONS = {'.webm', '.ogg', '.wav', '.mp3', '.mp4', '.m4a'} ALLOWED_AUDIO_MIMETYPES = { 'audio/webm', 'audio/ogg', 'audio/wav', 'audio/mp3', 'audio/mpeg', 'audio/mp4', 'audio/x-m4a', 'audio/x-wav' } @staticmethod def sanitize_text(text: str, max_length: int = None) -> str: """Sanitize text input by removing dangerous characters""" if not isinstance(text, str): return "" if max_length is None: max_length = Validators.MAX_TEXT_LENGTH # Trim and limit length text = text.strip()[:max_length] # Remove null bytes text = text.replace('\x00', '') # Remove control characters except newlines and tabs text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text) return text @staticmethod def sanitize_html(text: str) -> str: """Escape HTML to prevent XSS""" if not isinstance(text, str): return "" return html.escape(text) @staticmethod def validate_language_code(code: str, allowed_languages: set) -> Optional[str]: """Validate language code against allowed list""" if not code or not isinstance(code, str): return None code = code.strip().lower() # Check if it's in the allowed list or is 'auto' if code in allowed_languages or code == 'auto': return code return None @staticmethod def validate_audio_file(file_storage) -> Tuple[bool, Optional[str]]: """Validate uploaded audio file""" if not file_storage: return False, "No file provided" # Check file size file_storage.seek(0, os.SEEK_END) size = file_storage.tell() file_storage.seek(0) if size > Validators.MAX_AUDIO_SIZE: return False, f"File size exceeds {Validators.MAX_AUDIO_SIZE // (1024*1024)}MB limit" # Check file extension if file_storage.filename: ext = os.path.splitext(file_storage.filename.lower())[1] if ext not in Validators.ALLOWED_AUDIO_EXTENSIONS: return False, "Invalid audio file type" # Check MIME type if available if hasattr(file_storage, 'content_type') and file_storage.content_type: if file_storage.content_type not in Validators.ALLOWED_AUDIO_MIMETYPES: # Allow generic application/octet-stream as browsers sometimes use this if file_storage.content_type != 'application/octet-stream': return False, "Invalid audio MIME type" return True, None @staticmethod def validate_url(url: str) -> Optional[str]: """Validate and sanitize URL""" if not url or not isinstance(url, str): return None url = url.strip() # Check length if len(url) > Validators.MAX_URL_LENGTH: return None # Basic URL pattern check url_pattern = re.compile( r'^https?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|' # ...or ipv4 r'\[?[A-F0-9]*:[A-F0-9:]+\]?)' # ...or ipv6 r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) if not url_pattern.match(url): return None # Prevent some common injection attempts dangerous_patterns = [ 'javascript:', 'data:', 'vbscript:', 'file:', 'about:', 'chrome:' ] if any(pattern in url.lower() for pattern in dangerous_patterns): return None return url @staticmethod def validate_api_key(key: str) -> Optional[str]: """Validate API key format""" if not key or not isinstance(key, str): return None key = key.strip() # Check length if len(key) < 20 or len(key) > Validators.MAX_API_KEY_LENGTH: return None # Only allow alphanumeric, dash, and underscore if not re.match(r'^[a-zA-Z0-9\-_]+$', key): return None return key @staticmethod def sanitize_filename(filename: str) -> str: """Sanitize filename to prevent directory traversal""" if not filename or not isinstance(filename, str): return "file" # Remove any path components filename = os.path.basename(filename) # Remove dangerous characters filename = re.sub(r'[^a-zA-Z0-9.\-_]', '_', filename) # Limit length if len(filename) > 255: name, ext = os.path.splitext(filename) max_name_length = 255 - len(ext) filename = name[:max_name_length] + ext # Don't allow hidden files if filename.startswith('.'): filename = '_' + filename[1:] return filename or "file" @staticmethod def validate_json_size(data: Dict[str, Any], max_size_kb: int = 1024) -> bool: """Check if JSON data size is within limits""" try: import json json_str = json.dumps(data) size_kb = len(json_str.encode('utf-8')) / 1024 return size_kb <= max_size_kb except: return False @staticmethod def validate_settings(settings: Dict[str, Any]) -> Tuple[bool, Dict[str, Any], list]: """Validate settings object""" errors = [] sanitized = {} # Boolean settings bool_settings = [ 'notificationsEnabled', 'notifyTranscription', 'notifyTranslation', 'notifyErrors', 'offlineMode' ] for setting in bool_settings: if setting in settings: sanitized[setting] = bool(settings[setting]) # URL validation if 'ttsServerUrl' in settings and settings['ttsServerUrl']: url = Validators.validate_url(settings['ttsServerUrl']) if not url: errors.append('Invalid TTS server URL') else: sanitized['ttsServerUrl'] = url # API key validation if 'ttsApiKey' in settings and settings['ttsApiKey']: key = Validators.validate_api_key(settings['ttsApiKey']) if not key: errors.append('Invalid API key format') else: sanitized['ttsApiKey'] = key return len(errors) == 0, sanitized, errors @staticmethod def rate_limit_check(identifier: str, action: str, max_requests: int = 10, window_seconds: int = 60, storage: Dict = None) -> bool: """ Simple rate limiting check Returns True if request is allowed, False if rate limited """ import time if storage is None: return True # Can't track without storage key = f"{identifier}:{action}" current_time = time.time() window_start = current_time - window_seconds # Get or create request list if key not in storage: storage[key] = [] # Remove old requests outside the window storage[key] = [t for t in storage[key] if t > window_start] # Check if limit exceeded if len(storage[key]) >= max_requests: return False # Add current request storage[key].append(current_time) return True