This commit introduces major enhancements to Talk2Me:

## Database Integration
- PostgreSQL support with SQLAlchemy ORM
- Redis integration for caching and real-time analytics
- Automated database initialization scripts
- Migration support infrastructure

## User Authentication System
- JWT-based API authentication
- Session-based web authentication
- API key authentication for programmatic access
- User roles and permissions (admin/user)
- Login history and session tracking
- Rate limiting per user with customizable limits

## Admin Dashboard
- Real-time analytics and monitoring
- User management interface (create, edit, delete users)
- System health monitoring
- Request/error tracking
- Language pair usage statistics
- Performance metrics visualization

## Key Features
- Dual authentication support (token + user accounts)
- Graceful fallback for missing services
- Non-blocking analytics middleware
- Comprehensive error handling
- Session management with security features

## Bug Fixes
- Fixed rate limiting bypass for admin routes
- Added missing email validation method
- Improved error handling for missing database tables
- Fixed session-based authentication for API endpoints

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
256 lines
8.4 KiB
Python
256 lines
8.4 KiB
Python
"""
|
|
Input validation and sanitization for the Talk2Me application
|
|
"""
|
|
import re
|
|
import html
|
|
from typing import Optional, Dict, Any, Tuple
|
|
import os
|
|
|
|
class Validators:
    """Stateless helpers for validating and sanitizing untrusted input.

    Every public method is a ``@staticmethod``; the class only serves as a
    namespace for the helpers and the limits they share.
    """

    # Maximum sizes
    MAX_TEXT_LENGTH = 10000
    MAX_AUDIO_SIZE = 25 * 1024 * 1024  # 25MB
    MAX_URL_LENGTH = 2048
    MIN_API_KEY_LENGTH = 20
    MAX_API_KEY_LENGTH = 128
    MAX_FILENAME_LENGTH = 255

    # Allowed audio formats
    ALLOWED_AUDIO_EXTENSIONS = {'.webm', '.ogg', '.wav', '.mp3', '.mp4', '.m4a'}
    ALLOWED_AUDIO_MIMETYPES = {
        'audio/webm', 'audio/ogg', 'audio/wav', 'audio/mp3',
        'audio/mpeg', 'audio/mp4', 'audio/x-m4a', 'audio/x-wav'
    }

    # Patterns are compiled once at class creation instead of on every call.
    # Control characters except tab (\x09), newline (\x0A) and CR (\x0D).
    _CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _API_KEY_RE = re.compile(r'[a-zA-Z0-9\-_]+')
    _UNSAFE_FILENAME_CHARS_RE = re.compile(r'[^a-zA-Z0-9.\-_]')
    _URL_RE = re.compile(
        r'^https?://'  # http:// or https://
        # domain; a DNS label (and therefore a TLD) may be up to 63 chars
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)

    # Substrings that cause a URL to be rejected wherever they appear.
    _DANGEROUS_URL_PATTERNS = (
        'javascript:', 'data:', 'vbscript:', 'file:', 'about:', 'chrome:'
    )

    # String spellings that must not be coerced to True by bool().
    _FALSE_STRINGS = frozenset({'false', '0', 'no', 'off', ''})

    @staticmethod
    def sanitize_text(text: str, max_length: Optional[int] = None) -> str:
        """Sanitize text input by removing dangerous characters.

        Control characters (including NUL) are removed first, then the text
        is stripped and finally truncated, so removed characters neither
        shield surrounding whitespace from stripping nor count against the
        length budget.

        Args:
            text: Raw input. Anything that is not a ``str`` yields ``""``.
            max_length: Maximum length of the result. Defaults to
                ``MAX_TEXT_LENGTH``; negative values are treated as 0.

        Returns:
            The cleaned text, at most ``max_length`` characters long.
        """
        if not isinstance(text, str):
            return ""

        if max_length is None:
            max_length = Validators.MAX_TEXT_LENGTH

        cleaned = Validators._CONTROL_CHARS_RE.sub('', text).strip()
        return cleaned[:max(0, max_length)]

    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape HTML special characters to prevent XSS.

        Returns ``""`` for non-string input.
        """
        if not isinstance(text, str):
            return ""
        return html.escape(text)

    @staticmethod
    def validate_language_code(code: str, allowed_languages: set) -> Optional[str]:
        """Validate a language code against an allowed collection.

        Args:
            code: Candidate code; surrounding whitespace and case are ignored.
            allowed_languages: Collection of accepted (lower-case) codes.
                ``None`` is treated as an empty collection.

        Returns:
            The normalized (stripped, lower-cased) code if it is ``'auto'``
            or a member of ``allowed_languages``, otherwise ``None``.
        """
        if not code or not isinstance(code, str):
            return None

        normalized = code.strip().lower()

        if normalized == 'auto':
            return normalized
        if allowed_languages is not None and normalized in allowed_languages:
            return normalized
        return None

    @staticmethod
    def validate_audio_file(file_storage) -> Tuple[bool, Optional[str]]:
        """Validate an uploaded audio file by size, extension and MIME type.

        Args:
            file_storage: Seekable file-like upload object with optional
                ``filename`` and ``content_type`` attributes.

        Returns:
            ``(True, None)`` when the file is acceptable, otherwise
            ``(False, message)`` describing the first failed check.
        """
        if not file_storage:
            return False, "No file provided"

        # Measure the size by seeking to the end, then rewind for the caller.
        file_storage.seek(0, os.SEEK_END)
        size = file_storage.tell()
        file_storage.seek(0)

        if size > Validators.MAX_AUDIO_SIZE:
            return False, f"File size exceeds {Validators.MAX_AUDIO_SIZE // (1024*1024)}MB limit"

        # Check file extension (skipped when no filename is available)
        filename = getattr(file_storage, 'filename', None)
        if filename:
            ext = os.path.splitext(filename.lower())[1]
            if ext not in Validators.ALLOWED_AUDIO_EXTENSIONS:
                return False, "Invalid audio file type"

        # Check MIME type if available
        content_type = getattr(file_storage, 'content_type', None)
        if content_type:
            # Drop parameters such as ";codecs=opus" and compare
            # case-insensitively; only the bare media type is whitelisted.
            mimetype = content_type.split(';', 1)[0].strip().lower()
            # Allow generic application/octet-stream as browsers sometimes use this
            if (mimetype not in Validators.ALLOWED_AUDIO_MIMETYPES
                    and mimetype != 'application/octet-stream'):
                return False, "Invalid audio MIME type"

        return True, None

    @staticmethod
    def validate_email(email: str) -> bool:
        """Validate email address format.

        The whole string must match the pattern; ``fullmatch`` is used so a
        trailing newline (which ``$`` would tolerate) is rejected.
        """
        if not email or not isinstance(email, str):
            return False
        return Validators._EMAIL_RE.fullmatch(email) is not None

    @staticmethod
    def validate_url(url: str) -> Optional[str]:
        """Validate and normalize an http(s) URL.

        Args:
            url: Candidate URL; surrounding whitespace is ignored.

        Returns:
            The stripped URL when it is at most ``MAX_URL_LENGTH`` characters,
            matches the http/https URL pattern and contains none of the
            blocked scheme-like substrings; otherwise ``None``.
        """
        if not url or not isinstance(url, str):
            return None

        url = url.strip()

        if len(url) > Validators.MAX_URL_LENGTH:
            return None

        if not Validators._URL_RE.match(url):
            return None

        # Prevent some common injection attempts
        lowered = url.lower()
        if any(marker in lowered for marker in Validators._DANGEROUS_URL_PATTERNS):
            return None

        return url

    @staticmethod
    def validate_api_key(key: str) -> Optional[str]:
        """Validate API key format.

        Returns:
            The stripped key when its length is within
            ``MIN_API_KEY_LENGTH``..``MAX_API_KEY_LENGTH`` and it consists
            only of ASCII letters, digits, dashes and underscores; otherwise
            ``None``.
        """
        if not key or not isinstance(key, str):
            return None

        key = key.strip()

        if not (Validators.MIN_API_KEY_LENGTH <= len(key) <= Validators.MAX_API_KEY_LENGTH):
            return None

        if Validators._API_KEY_RE.fullmatch(key) is None:
            return None

        return key

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize a filename to prevent directory traversal.

        Path components (with either ``/`` or ``\\`` separators) are
        dropped, every character outside ``[a-zA-Z0-9._-]`` becomes ``_``,
        the result is capped at ``MAX_FILENAME_LENGTH`` characters and a
        leading dot is replaced so hidden files cannot be created.

        Returns:
            The sanitized name, or ``"file"`` when nothing usable remains.
        """
        if not filename or not isinstance(filename, str):
            return "file"

        # os.path.basename only knows the host platform's separator, so
        # normalize backslashes to strip Windows-style path components too.
        filename = os.path.basename(filename.replace('\\', '/'))

        # Remove dangerous characters
        filename = Validators._UNSAFE_FILENAME_CHARS_RE.sub('_', filename)

        # Limit length, preserving the extension when it fits
        limit = Validators.MAX_FILENAME_LENGTH
        if len(filename) > limit:
            name, ext = os.path.splitext(filename)
            if len(ext) >= limit:
                # The extension alone exceeds the cap: hard-truncate.
                filename = filename[:limit]
            else:
                filename = name[:limit - len(ext)] + ext

        # Don't allow hidden files
        if filename.startswith('.'):
            filename = '_' + filename[1:]

        return filename or "file"

    @staticmethod
    def validate_json_size(data: Dict[str, Any], max_size_kb: int = 1024) -> bool:
        """Check whether ``data`` serializes to at most ``max_size_kb`` KiB.

        Returns:
            ``True`` when the JSON encoding fits the limit; ``False`` when it
            is larger or when ``data`` cannot be serialized/compared.
        """
        import json

        try:
            payload = json.dumps(data)
            return len(payload.encode('utf-8')) / 1024 <= max_size_kb
        except (TypeError, ValueError, OverflowError, RecursionError):
            # Unserializable, circular or too deeply nested data, or a
            # non-numeric limit: treat as invalid rather than crash.
            return False

    @staticmethod
    def _to_bool(value: Any) -> bool:
        """Coerce a setting to bool, honoring string spellings of false."""
        if isinstance(value, str):
            return value.strip().lower() not in Validators._FALSE_STRINGS
        return bool(value)

    @staticmethod
    def validate_settings(settings: Dict[str, Any]) -> Tuple[bool, Dict[str, Any], list]:
        """Validate a settings object.

        Only known keys are copied into the sanitized result; unknown keys
        are ignored.

        Args:
            settings: Mapping of setting names to raw values.

        Returns:
            ``(is_valid, sanitized, errors)`` where ``sanitized`` holds the
            accepted values and ``errors`` lists human-readable problems.
        """
        if not isinstance(settings, dict):
            return False, {}, ['Settings must be an object']

        errors = []
        sanitized = {}

        # Boolean settings
        bool_settings = (
            'notificationsEnabled', 'notifyTranscription',
            'notifyTranslation', 'notifyErrors', 'offlineMode'
        )
        for name in bool_settings:
            if name in settings:
                sanitized[name] = Validators._to_bool(settings[name])

        # URL validation (empty values are treated as "not provided")
        raw_url = settings.get('ttsServerUrl')
        if raw_url:
            url = Validators.validate_url(raw_url)
            if url:
                sanitized['ttsServerUrl'] = url
            else:
                errors.append('Invalid TTS server URL')

        # API key validation (empty values are treated as "not provided")
        raw_key = settings.get('ttsApiKey')
        if raw_key:
            key = Validators.validate_api_key(raw_key)
            if key:
                sanitized['ttsApiKey'] = key
            else:
                errors.append('Invalid API key format')

        return not errors, sanitized, errors

    @staticmethod
    def rate_limit_check(identifier: str, action: str, max_requests: int = 10,
                         window_seconds: int = 60, storage: Optional[Dict] = None) -> bool:
        """Simple sliding-window rate limiting check.

        Args:
            identifier: Who is making the request (e.g. a client id).
            action: What is being rate limited.
            max_requests: Requests allowed per window.
            window_seconds: Window length in seconds.
            storage: Mutable mapping, updated in place, that holds the
                request timestamps under the key ``"identifier:action"``.
                When ``None`` nothing can be tracked and the request is
                allowed.

        Returns:
            True if request is allowed, False if rate limited.
        """
        import time

        if storage is None:
            return True  # Can't track without storage

        key = f"{identifier}:{action}"
        now = time.time()
        window_start = now - window_seconds

        # Keep only the requests that are still inside the window
        recent = [stamp for stamp in storage.get(key, ()) if stamp > window_start]
        storage[key] = recent

        if len(recent) >= max_requests:
            return False

        # Record the current request
        recent.append(now)
        return True