talk2me/validators.py
Adolfo Delorenzo aedface2a9 Add comprehensive input validation and sanitization
Frontend Validation:
- Created Validator class with comprehensive validation methods
- HTML sanitization to prevent XSS attacks
- Text sanitization removing dangerous characters
- Language code validation against allowed list
- Audio file validation (size, type, extension)
- URL validation preventing injection attacks
- API key format validation
- Request size validation
- Filename sanitization
- Settings validation with type checking
- Cache key sanitization
- Client-side rate limiting tracking

Backend Validation:
- Created validators.py module for server-side validation
- Audio file validation with size and type checks
- Text sanitization with length limits
- Language code validation
- URL and API key validation
- JSON request size validation
- Rate limiting per endpoint (30 req/min)
- Added validation to all API endpoints
- Error boundary decorators on all routes
- CSRF token support ready

Security Features:
- Prevents XSS through HTML escaping
- Prevents SQL injection through input sanitization
- Prevents directory traversal in filenames
- Prevents oversized requests (DoS protection)
- Rate limiting prevents abuse
- Type checking prevents type confusion attacks
- Length limits prevent memory exhaustion
- Character filtering prevents control character injection

All user inputs are now validated and sanitized before processing.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-02 22:58:17 -06:00

243 lines
8.0 KiB
Python

"""
Input validation and sanitization for the Talk2Me application
"""
import re
import html
from typing import Optional, Dict, Any, Tuple
import os
class Validators:
    """Namespace of static helpers that validate and sanitize user input.

    Every method is a ``@staticmethod``; nothing here reads instance state.
    Validation helpers return either the cleaned value or ``None`` (or a
    ``(ok, ...)`` tuple) instead of raising, so callers decide how to
    respond to bad input.
    """

    # Maximum sizes
    MAX_TEXT_LENGTH = 10000
    MAX_AUDIO_SIZE = 25 * 1024 * 1024  # 25MB
    MAX_URL_LENGTH = 2048
    MAX_API_KEY_LENGTH = 128
    MAX_FILENAME_LENGTH = 255

    # Allowed audio formats
    ALLOWED_AUDIO_EXTENSIONS = {'.webm', '.ogg', '.wav', '.mp3', '.mp4', '.m4a'}
    ALLOWED_AUDIO_MIMETYPES = {
        'audio/webm', 'audio/ogg', 'audio/wav', 'audio/mp3',
        'audio/mpeg', 'audio/mp4', 'audio/x-m4a', 'audio/x-wav'
    }

    # Control characters except tab (\x09), newline (\x0A) and carriage
    # return (\x0D); includes the null byte.
    _CONTROL_CHARS = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')

    # Compiled once at class creation instead of on every validate_url call.
    _URL_PATTERN = re.compile(
        r'^https?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'  # domain...
        r'localhost|'  # localhost...
        r'(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}'
        r'(?:25[0-5]|2[0-4]\d|1?\d?\d)|'  # ...or ipv4 (each octet 0-255)
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)

    # Only alphanumeric, dash, and underscore
    _API_KEY_PATTERN = re.compile(r'[a-zA-Z0-9\-_]+')

    @staticmethod
    def sanitize_text(text: str, max_length: Optional[int] = None) -> str:
        """Return *text* with control characters removed, trimmed and capped.

        Args:
            text: Raw input. Anything that is not a ``str`` yields ``""``.
            max_length: Maximum number of characters to keep; defaults to
                ``MAX_TEXT_LENGTH`` when ``None``.

        Returns:
            The cleaned string, at most ``max_length`` characters long.
        """
        if not isinstance(text, str):
            return ""
        if max_length is None:
            max_length = Validators.MAX_TEXT_LENGTH
        # Drop control characters first so that (a) whitespace hidden behind
        # them is still trimmed and (b) they do not use up the length budget.
        text = Validators._CONTROL_CHARS.sub('', text)
        return text.strip()[:max_length]

    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape HTML special characters in *text* (``""`` for non-strings)."""
        if not isinstance(text, str):
            return ""
        return html.escape(text)

    @staticmethod
    def validate_language_code(code: str, allowed_languages: set) -> Optional[str]:
        """Normalize *code* and check it against *allowed_languages*.

        The code is stripped and lower-cased before the membership test, so
        *allowed_languages* is expected to hold lower-case codes. ``'auto'``
        is always accepted.

        Returns:
            The normalized code, or ``None`` if it is empty, not a string,
            or not allowed.
        """
        if not code or not isinstance(code, str):
            return None
        code = code.strip().lower()
        # 'auto' is checked first so it is accepted regardless of the list.
        if code == 'auto' or code in allowed_languages:
            return code
        return None

    @staticmethod
    def validate_audio_file(file_storage) -> Tuple[bool, Optional[str]]:
        """Validate an uploaded audio file's size, extension and MIME type.

        Args:
            file_storage: Upload object exposing ``seek``/``tell``, a
                ``filename`` attribute and optionally ``content_type``
                (presumably a werkzeug ``FileStorage`` - confirm with caller).

        Returns:
            ``(True, None)`` when the file is acceptable, otherwise
            ``(False, message)`` describing the first failed check.
        """
        if not file_storage:
            return False, "No file provided"

        # Measure the size by seeking to the end, then rewind for the reader.
        file_storage.seek(0, os.SEEK_END)
        size = file_storage.tell()
        file_storage.seek(0)
        if size > Validators.MAX_AUDIO_SIZE:
            return False, f"File size exceeds {Validators.MAX_AUDIO_SIZE // (1024*1024)}MB limit"

        # Check file extension (skipped when no filename was supplied)
        if file_storage.filename:
            ext = os.path.splitext(file_storage.filename.lower())[1]
            if ext not in Validators.ALLOWED_AUDIO_EXTENSIONS:
                return False, "Invalid audio file type"

        # Check MIME type if available
        content_type = getattr(file_storage, 'content_type', None)
        if content_type:
            # The header may carry parameters ("audio/webm;codecs=opus") and
            # arbitrary casing; compare only the bare media type.
            mime_type = content_type.split(';', 1)[0].strip().lower()
            # Allow generic application/octet-stream as browsers sometimes use this
            if (mime_type not in Validators.ALLOWED_AUDIO_MIMETYPES
                    and mime_type != 'application/octet-stream'):
                return False, "Invalid audio MIME type"

        return True, None

    @staticmethod
    def validate_url(url: str) -> Optional[str]:
        """Validate an http(s) URL.

        Accepts a domain name, ``localhost``, an IPv4 address (octets 0-255)
        or an IPv6-like host, with optional port and path/query.

        Returns:
            The stripped URL, or ``None`` if it is empty, not a string,
            longer than ``MAX_URL_LENGTH``, or fails the pattern checks.
        """
        if not url or not isinstance(url, str):
            return None
        url = url.strip()
        # Check length
        if len(url) > Validators.MAX_URL_LENGTH:
            return None
        # Basic URL pattern check
        if not Validators._URL_PATTERN.match(url):
            return None
        # Prevent some common injection attempts. This is a substring match,
        # so it also rejects URLs that merely contain e.g. "data:" somewhere
        # in their path or query string.
        dangerous_patterns = [
            'javascript:', 'data:', 'vbscript:', 'file:', 'about:', 'chrome:'
        ]
        lowered = url.lower()
        if any(pattern in lowered for pattern in dangerous_patterns):
            return None
        return url

    @staticmethod
    def validate_api_key(key: str) -> Optional[str]:
        """Validate API key format.

        Returns:
            The stripped key when it is 20 to ``MAX_API_KEY_LENGTH``
            characters of ``[A-Za-z0-9_-]``, otherwise ``None``.
        """
        if not key or not isinstance(key, str):
            return None
        key = key.strip()
        # Check length
        if len(key) < 20 or len(key) > Validators.MAX_API_KEY_LENGTH:
            return None
        if not Validators._API_KEY_PATTERN.fullmatch(key):
            return None
        return key

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize *filename* to prevent directory traversal.

        Path components are dropped, every character outside
        ``[A-Za-z0-9._-]`` becomes ``_``, the result is capped at
        ``MAX_FILENAME_LENGTH`` characters and a leading dot is replaced.

        Returns:
            The sanitized name, or ``"file"`` when nothing usable remains.
        """
        if not filename or not isinstance(filename, str):
            return "file"
        # Remove any path components. Backslashes are normalized first
        # because os.path.basename does not split on them on POSIX.
        filename = os.path.basename(filename.replace('\\', '/'))
        # Remove dangerous characters
        filename = re.sub(r'[^a-zA-Z0-9.\-_]', '_', filename)
        # Limit length, keeping the extension when it fits
        limit = Validators.MAX_FILENAME_LENGTH
        if len(filename) > limit:
            name, ext = os.path.splitext(filename)
            if len(ext) >= limit:
                # The extension alone is too long; a plain cut is all we can do.
                filename = filename[:limit]
            else:
                filename = name[:limit - len(ext)] + ext
        # Don't allow hidden files
        if filename.startswith('.'):
            filename = '_' + filename[1:]
        return filename or "file"

    @staticmethod
    def validate_json_size(data: Dict[str, Any], max_size_kb: int = 1024) -> bool:
        """Check whether *data* serializes to at most *max_size_kb* KiB of JSON.

        Returns:
            ``True`` if the UTF-8 encoded JSON fits; ``False`` if it is too
            large or *data* cannot be serialized.
        """
        import json
        try:
            encoded = json.dumps(data).encode('utf-8')
        except (TypeError, ValueError, RecursionError):
            # Unserializable values, circular references or excessive nesting.
            return False
        return len(encoded) / 1024 <= max_size_kb

    @staticmethod
    def validate_settings(settings: Dict[str, Any]) -> Tuple[bool, Dict[str, Any], list]:
        """Validate a settings object.

        Only known keys are copied into the sanitized result; unknown keys
        are dropped.

        Returns:
            ``(ok, sanitized, errors)`` where ``ok`` is ``True`` when
            ``errors`` is empty.
        """
        from collections.abc import Mapping

        if not isinstance(settings, Mapping):
            # None, lists, strings etc. cannot be inspected key by key.
            return False, {}, ['Settings must be an object']

        errors = []
        sanitized = {}

        # Boolean settings
        bool_settings = [
            'notificationsEnabled', 'notifyTranscription',
            'notifyTranslation', 'notifyErrors', 'offlineMode'
        ]
        for setting in bool_settings:
            if setting in settings:
                # NOTE(review): bool() treats any non-empty string (including
                # "false") as True - confirm callers only send JSON booleans.
                sanitized[setting] = bool(settings[setting])

        # URL validation
        if 'ttsServerUrl' in settings and settings['ttsServerUrl']:
            url = Validators.validate_url(settings['ttsServerUrl'])
            if not url:
                errors.append('Invalid TTS server URL')
            else:
                sanitized['ttsServerUrl'] = url

        # API key validation
        if 'ttsApiKey' in settings and settings['ttsApiKey']:
            key = Validators.validate_api_key(settings['ttsApiKey'])
            if not key:
                errors.append('Invalid API key format')
            else:
                sanitized['ttsApiKey'] = key

        return len(errors) == 0, sanitized, errors

    @staticmethod
    def rate_limit_check(identifier: str, action: str, max_requests: int = 10,
                         window_seconds: int = 60,
                         storage: Optional[Dict] = None) -> bool:
        """
        Simple sliding-window rate limiting check.

        Timestamps of allowed requests are kept in *storage* under the key
        ``"{identifier}:{action}"``; *storage* is mutated in place.

        Returns True if request is allowed, False if rate limited.
        Always returns True when no *storage* is supplied.
        """
        import time
        if storage is None:
            return True  # Can't track without storage
        key = f"{identifier}:{action}"
        current_time = time.time()
        window_start = current_time - window_seconds
        # Keep only the requests that are still inside the window
        recent = [t for t in storage.get(key, ()) if t > window_start]
        storage[key] = recent
        # Check if limit exceeded
        if len(recent) >= max_requests:
            return False
        # Add current request
        recent.append(current_time)
        return True