talk2me/validators.py
Adolfo Delorenzo fa951c3141 Add comprehensive database integration, authentication, and admin dashboard
This commit introduces major enhancements to Talk2Me:

## Database Integration
- PostgreSQL support with SQLAlchemy ORM
- Redis integration for caching and real-time analytics
- Automated database initialization scripts
- Migration support infrastructure

## User Authentication System
- JWT-based API authentication
- Session-based web authentication
- API key authentication for programmatic access
- User roles and permissions (admin/user)
- Login history and session tracking
- Rate limiting per user with customizable limits

## Admin Dashboard
- Real-time analytics and monitoring
- User management interface (create, edit, delete users)
- System health monitoring
- Request/error tracking
- Language pair usage statistics
- Performance metrics visualization

## Key Features
- Dual authentication support (token + user accounts)
- Graceful fallback for missing services
- Non-blocking analytics middleware
- Comprehensive error handling
- Session management with security features

## Bug Fixes
- Fixed rate limiting bypass for admin routes
- Added missing email validation method
- Improved error handling for missing database tables
- Fixed session-based authentication for API endpoints

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-03 18:21:56 -06:00

256 lines
8.4 KiB
Python

"""
Input validation and sanitization for the Talk2Me application
"""
import re
import html
from typing import Optional, Dict, Any, Tuple
import os
class Validators:
    """Static helpers that validate and sanitize untrusted request input.

    Every method is a ``@staticmethod``; the class only serves as a namespace
    for the helpers and the size/format limits they share.
    """

    # Maximum sizes
    MAX_TEXT_LENGTH = 10000
    MAX_AUDIO_SIZE = 25 * 1024 * 1024  # 25MB
    MAX_URL_LENGTH = 2048
    MAX_API_KEY_LENGTH = 128
    _MAX_FILENAME_LENGTH = 255

    # Allowed audio formats
    ALLOWED_AUDIO_EXTENSIONS = {'.webm', '.ogg', '.wav', '.mp3', '.mp4', '.m4a'}
    ALLOWED_AUDIO_MIMETYPES = {
        'audio/webm', 'audio/ogg', 'audio/wav', 'audio/mp3',
        'audio/mpeg', 'audio/mp4', 'audio/x-m4a', 'audio/x-wav'
    }

    # Control characters except tab (\x09), newline (\x0A) and CR (\x0D).
    _CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
    # Used with fullmatch(), so no ^/$ anchors (``$`` would tolerate a
    # trailing newline).
    _EMAIL_RE = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
    _API_KEY_RE = re.compile(r'[a-zA-Z0-9\-_]+')
    _UNSAFE_FILENAME_CHARS_RE = re.compile(r'[^a-zA-Z0-9.\-_]')
    _URL_RE = re.compile(
        r'^https?://'  # http:// or https://
        # domain; the TLD may be up to 63 letters (DNS label limit) so that
        # long TLDs such as ".photography" are accepted
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,63}\.?|'
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)
    _DANGEROUS_URL_PATTERNS = (
        'javascript:', 'data:', 'vbscript:', 'file:', 'about:', 'chrome:'
    )

    @staticmethod
    def sanitize_text(text: str, max_length: Optional[int] = None) -> str:
        """Return *text* with control characters removed, trimmed and truncated.

        Args:
            text: Raw input. Anything that is not a ``str`` yields ``""``.
            max_length: Maximum length of the result; defaults to
                ``MAX_TEXT_LENGTH``.

        Returns:
            The cleaned text. Tabs, newlines and carriage returns inside the
            text are preserved.
        """
        if not isinstance(text, str):
            return ""
        if max_length is None:
            max_length = Validators.MAX_TEXT_LENGTH
        # Drop control characters first so that whitespace they were hiding
        # at either end is trimmed too, and so removed characters do not
        # count against the length budget.
        cleaned = Validators._CONTROL_CHARS_RE.sub('', text)
        return cleaned.strip()[:max_length]

    @staticmethod
    def sanitize_html(text: str) -> str:
        """Escape HTML special characters in *text* to prevent XSS.

        Returns ``""`` when *text* is not a string.
        """
        if not isinstance(text, str):
            return ""
        return html.escape(text)

    @staticmethod
    def validate_language_code(code: str, allowed_languages: set) -> Optional[str]:
        """Normalize *code* and check it against *allowed_languages*.

        Returns:
            The trimmed, lower-cased code when it is in *allowed_languages*
            or equals ``'auto'``; otherwise ``None``.
        """
        if not code or not isinstance(code, str):
            return None
        normalized = code.strip().lower()
        if normalized == 'auto' or normalized in allowed_languages:
            return normalized
        return None

    @staticmethod
    def validate_audio_file(file_storage) -> Tuple[bool, Optional[str]]:
        """Validate an uploaded audio file's size, extension and MIME type.

        Args:
            file_storage: A seekable upload object exposing ``seek``/``tell``
                and, optionally, ``filename`` and ``content_type``
                (presumably a Werkzeug ``FileStorage`` - confirm with caller).

        Returns:
            ``(True, None)`` when the file is acceptable, otherwise
            ``(False, message)``. The stream is rewound to position 0 once
            its size has been measured.
        """
        if not file_storage:
            return False, "No file provided"

        # Measure the size by seeking to the end, then rewind for the reader.
        file_storage.seek(0, os.SEEK_END)
        size = file_storage.tell()
        file_storage.seek(0)
        if size > Validators.MAX_AUDIO_SIZE:
            limit_mb = Validators.MAX_AUDIO_SIZE // (1024 * 1024)
            return False, f"File size exceeds {limit_mb}MB limit"

        # Check file extension (skipped when no filename was supplied)
        if file_storage.filename:
            ext = os.path.splitext(file_storage.filename.lower())[1]
            if ext not in Validators.ALLOWED_AUDIO_EXTENSIONS:
                return False, "Invalid audio file type"

        # Check MIME type if available
        content_type = getattr(file_storage, 'content_type', None)
        if content_type:
            # Content-Type may carry parameters ("audio/webm;codecs=opus")
            # and is case-insensitive; compare only the bare media type.
            mimetype = content_type.split(';', 1)[0].strip().lower()
            # Allow generic application/octet-stream as browsers sometimes
            # use this
            if (mimetype not in Validators.ALLOWED_AUDIO_MIMETYPES
                    and mimetype != 'application/octet-stream'):
                return False, "Invalid audio MIME type"

        return True, None

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return ``True`` when *email* matches a basic ``local@domain.tld`` shape.

        The whole string must match; surrounding whitespace or a trailing
        newline makes the address invalid.
        """
        if not email or not isinstance(email, str):
            return False
        return Validators._EMAIL_RE.fullmatch(email) is not None

    @staticmethod
    def validate_url(url: str) -> Optional[str]:
        """Validate an http(s) URL and return it stripped of whitespace.

        Returns:
            The stripped URL, or ``None`` when it is empty, not a string,
            longer than ``MAX_URL_LENGTH``, does not look like an http(s)
            URL, or contains a dangerous scheme marker anywhere in it.

        Note:
            ``localhost`` and literal IP hosts are accepted; this method
            performs no SSRF filtering.
        """
        if not url or not isinstance(url, str):
            return None
        url = url.strip()
        if len(url) > Validators.MAX_URL_LENGTH:
            return None
        if not Validators._URL_RE.match(url):
            return None
        # Prevent some common injection attempts (e.g. a script URL smuggled
        # into a redirect parameter). Deliberately conservative.
        lowered = url.lower()
        if any(marker in lowered for marker in Validators._DANGEROUS_URL_PATTERNS):
            return None
        return url

    @staticmethod
    def validate_api_key(key: str) -> Optional[str]:
        """Validate API key format.

        Returns:
            The stripped key when it is 20 to ``MAX_API_KEY_LENGTH``
            characters long and contains only letters, digits, dashes and
            underscores; otherwise ``None``.
        """
        if not key or not isinstance(key, str):
            return None
        key = key.strip()
        if not 20 <= len(key) <= Validators.MAX_API_KEY_LENGTH:
            return None
        if Validators._API_KEY_RE.fullmatch(key) is None:
            return None
        return key

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Reduce *filename* to a safe base name to prevent directory traversal.

        Path components (both ``/`` and ``\\`` separated) are dropped,
        characters outside ``[a-zA-Z0-9.-_]`` become ``_``, the result is
        capped at 255 characters (keeping the extension when possible) and a
        leading dot is replaced so the file is not hidden.

        Returns:
            The sanitized name, or ``"file"`` when nothing usable remains.
        """
        if not filename or not isinstance(filename, str):
            return "file"

        # os.path.basename only understands the host separator; normalise
        # Windows-style separators so "C:\\dir\\a.wav" also reduces to "a.wav".
        filename = os.path.basename(filename.replace('\\', '/'))
        filename = Validators._UNSAFE_FILENAME_CHARS_RE.sub('_', filename)

        limit = Validators._MAX_FILENAME_LENGTH
        if len(filename) > limit:
            name, ext = os.path.splitext(filename)
            if len(ext) >= limit:
                # The "extension" alone exceeds the budget; a plain cut is
                # the only way to honour the limit.
                filename = filename[:limit]
            else:
                filename = name[:limit - len(ext)] + ext

        # Don't allow hidden files
        if filename.startswith('.'):
            filename = '_' + filename[1:]
        return filename or "file"

    @staticmethod
    def validate_json_size(data: Dict[str, Any], max_size_kb: int = 1024) -> bool:
        """Check whether *data*, serialized as JSON, fits in *max_size_kb*.

        Returns:
            ``True`` when the UTF-8 encoded JSON is at most *max_size_kb*
            kilobytes; ``False`` when it is larger or cannot be serialized.
        """
        import json
        try:
            encoded = json.dumps(data).encode('utf-8')
        except (TypeError, ValueError, OverflowError, RecursionError):
            # Unserializable, circular or excessively nested payloads are
            # treated as invalid rather than raised to the caller.
            return False
        return len(encoded) / 1024 <= max_size_kb

    @staticmethod
    def validate_settings(settings: Dict[str, Any]) -> Tuple[bool, Dict[str, Any], list]:
        """Validate a client settings object.

        Known boolean flags are coerced with ``bool()``; ``ttsServerUrl`` and
        ``ttsApiKey`` are validated when present and non-empty. Unknown keys
        are dropped.

        Returns:
            ``(is_valid, sanitized, errors)`` where *sanitized* holds only the
            accepted values and *errors* lists a message per rejected field.
        """
        if not isinstance(settings, dict):
            # e.g. a missing/invalid JSON body; report instead of crashing.
            return False, {}, ['Settings must be an object']

        errors = []
        sanitized = {}

        # Boolean settings
        bool_settings = (
            'notificationsEnabled', 'notifyTranscription',
            'notifyTranslation', 'notifyErrors', 'offlineMode'
        )
        for setting in bool_settings:
            if setting in settings:
                sanitized[setting] = bool(settings[setting])

        # URL validation
        if settings.get('ttsServerUrl'):
            url = Validators.validate_url(settings['ttsServerUrl'])
            if url:
                sanitized['ttsServerUrl'] = url
            else:
                errors.append('Invalid TTS server URL')

        # API key validation
        if settings.get('ttsApiKey'):
            key = Validators.validate_api_key(settings['ttsApiKey'])
            if key:
                sanitized['ttsApiKey'] = key
            else:
                errors.append('Invalid API key format')

        return not errors, sanitized, errors

    @staticmethod
    def rate_limit_check(identifier: str, action: str, max_requests: int = 10,
                         window_seconds: int = 60,
                         storage: Optional[Dict] = None) -> bool:
        """Simple sliding-window rate limiting check.

        Args:
            identifier: Who is making the request (combined with *action*
                into the storage key ``"identifier:action"``).
            action: The action being limited.
            max_requests: Requests allowed per window.
            window_seconds: Window length in seconds.
            storage: Mutable mapping of key -> list of request timestamps;
                it is updated in place. With ``None`` nothing can be tracked
                and every request is allowed.

        Returns:
            ``True`` if the request is allowed, ``False`` if rate limited.
        """
        import time
        if storage is None:
            return True  # Can't track without storage

        key = f"{identifier}:{action}"
        now = time.time()
        window_start = now - window_seconds

        # Keep only the requests that still fall inside the window.
        recent = [stamp for stamp in storage.get(key, []) if stamp > window_start]
        allowed = len(recent) < max_requests
        if allowed:
            recent.append(now)
        storage[key] = recent
        return allowed