talk2me/analytics_middleware.py
Adolfo Delorenzo · fa951c3141 · Add comprehensive database integration, authentication, and admin dashboard
This commit introduces major enhancements to Talk2Me:

## Database Integration
- PostgreSQL support with SQLAlchemy ORM
- Redis integration for caching and real-time analytics
- Automated database initialization scripts
- Migration support infrastructure

## User Authentication System
- JWT-based API authentication
- Session-based web authentication
- API key authentication for programmatic access
- User roles and permissions (admin/user)
- Login history and session tracking
- Rate limiting per user with customizable limits

## Admin Dashboard
- Real-time analytics and monitoring
- User management interface (create, edit, delete users)
- System health monitoring
- Request/error tracking
- Language pair usage statistics
- Performance metrics visualization

## Key Features
- Dual authentication support (token + user accounts)
- Graceful fallback for missing services
- Non-blocking analytics middleware
- Comprehensive error handling
- Session management with security features

## Bug Fixes
- Fixed rate limiting bypass for admin routes
- Added missing email validation method
- Improved error handling for missing database tables
- Fixed session-based authentication for API endpoints

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-06-03 18:21:56 -06:00

426 lines · 18 KiB · Python

"""Analytics middleware for tracking requests and operations"""
import time
import json
import logging
from datetime import datetime
from flask import request, g
import redis
import psycopg2
from psycopg2.extras import RealDictCursor
import threading
from queue import Queue
from functools import wraps
logger = logging.getLogger(__name__)
class AnalyticsTracker:
"""Track and store analytics data"""
def __init__(self, app=None):
self.app = app
        self.redis_client = None
        self.pg_conn = None
        # Analytics starts enabled and is switched off at runtime if the
        # analytics tables are missing (see _write_request_log below).
        self.enabled = True
self.write_queue = Queue()
self.writer_thread = None
if app:
self.init_app(app)
def init_app(self, app):
"""Initialize analytics with Flask app"""
self.app = app
# Initialize Redis connection
try:
self.redis_client = redis.from_url(
app.config.get('REDIS_URL', 'redis://localhost:6379/0'),
decode_responses=True
)
self.redis_client.ping()
logger.info("Analytics Redis connection established")
except Exception as e:
logger.error(f"Failed to connect to Redis for analytics: {e}")
self.redis_client = None
# Initialize PostgreSQL connection
try:
self.pg_conn = psycopg2.connect(
app.config.get('DATABASE_URL', 'postgresql://localhost/talk2me')
)
self.pg_conn.autocommit = True
logger.info("Analytics PostgreSQL connection established")
except Exception as e:
logger.error(f"Failed to connect to PostgreSQL for analytics: {e}")
self.pg_conn = None
# Start background writer thread
self.writer_thread = threading.Thread(target=self._write_worker, daemon=True)
self.writer_thread.start()
# Register before/after request handlers
app.before_request(self.before_request)
app.after_request(self.after_request)
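        # Configuration read above (illustrative; the values shown are simply the
        # defaults this module already falls back to when the keys are absent):
        #
        #     app.config['REDIS_URL'] = 'redis://localhost:6379/0'
        #     app.config['DATABASE_URL'] = 'postgresql://localhost/talk2me'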
def before_request(self):
"""Track request start time"""
g.start_time = time.time()
g.request_size = request.content_length or 0
def after_request(self, response):
"""Track request completion and metrics"""
try:
# Skip if analytics is disabled
if not self.enabled:
return response
            # Calculate response time; fall back gracefully if before_request was
            # skipped (e.g. another before_request handler short-circuited the request)
            response_time = int((time.time() - g.get('start_time', time.time())) * 1000)  # in ms
# Track in Redis for real-time stats
if self.redis_client:
self._track_redis_stats(request, response, response_time)
# Queue for PostgreSQL logging
if self.pg_conn and request.endpoint not in ['static', 'admin.static']:
self._queue_request_log(request, response, response_time)
except Exception as e:
logger.error(f"Error in analytics after_request: {e}")
return response
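    # Note: after_request wraps all tracking in try/except and always returns the
    # response unchanged, and PostgreSQL writes are handed off to a background
    # queue, so analytics failures or slow writes never block the request path.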
def _track_redis_stats(self, request, response, response_time):
"""Track statistics in Redis"""
try:
now = datetime.now()
# Increment request counters
pipe = self.redis_client.pipeline()
# Total requests
pipe.incr('stats:requests:total')
# Time-based counters
pipe.incr(f'stats:requests:minute:{now.strftime("%Y-%m-%d-%H-%M")}')
pipe.expire(f'stats:requests:minute:{now.strftime("%Y-%m-%d-%H-%M")}', 3600) # 1 hour
pipe.incr(f'stats:requests:hourly:{now.strftime("%Y-%m-%d-%H")}')
pipe.expire(f'stats:requests:hourly:{now.strftime("%Y-%m-%d-%H")}', 86400) # 24 hours
pipe.incr(f'stats:requests:daily:{now.strftime("%Y-%m-%d")}')
pipe.expire(f'stats:requests:daily:{now.strftime("%Y-%m-%d")}', 604800) # 7 days
# Track errors
if response.status_code >= 400:
                pipe.incr(f'stats:errors:daily:{now.strftime("%Y-%m-%d")}')
                pipe.expire(f'stats:errors:daily:{now.strftime("%Y-%m-%d")}', 604800)  # 7 days, matching the daily request counter
pipe.incr(f'stats:errors:hourly:{now.strftime("%Y-%m-%d-%H")}')
pipe.expire(f'stats:errors:hourly:{now.strftime("%Y-%m-%d-%H")}', 86400)
# Track response times
endpoint_key = request.endpoint or 'unknown'
pipe.lpush(f'stats:response_times:{endpoint_key}', response_time)
pipe.ltrim(f'stats:response_times:{endpoint_key}', 0, 999) # Keep last 1000
# Track slow requests
if response_time > 1000: # Over 1 second
slow_request = {
'endpoint': request.endpoint,
'method': request.method,
'response_time': response_time,
'timestamp': now.isoformat()
}
pipe.lpush('stats:slow_requests', json.dumps(slow_request))
pipe.ltrim('stats:slow_requests', 0, 99) # Keep last 100
pipe.execute()
except Exception as e:
logger.error(f"Error tracking Redis stats: {e}")
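    # Reading these keys back (a minimal sketch, not part of this module; the
    # 'index' endpoint name is hypothetical):
    #
    #     times = redis_client.lrange('stats:response_times:index', 0, -1)
    #     avg_ms = sum(int(t) for t in times) / len(times) if times else 0
    #     total = int(redis_client.get('stats:requests:total') or 0)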
def _queue_request_log(self, request, response, response_time):
"""Queue request log for PostgreSQL"""
try:
log_entry = {
'endpoint': request.endpoint,
'method': request.method,
'status_code': response.status_code,
'response_time_ms': response_time,
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent', '')[:500],
'request_size_bytes': g.get('request_size', 0),
'response_size_bytes': len(response.get_data()),
'session_id': g.get('session_id'),
'created_at': datetime.now()
}
self.write_queue.put(('request_log', log_entry))
except Exception as e:
logger.error(f"Error queuing request log: {e}")
def track_operation(self, operation_type, **kwargs):
"""Track specific operations (translation, transcription, etc.)"""
def decorator(f):
@wraps(f)
def wrapped(*args, **inner_kwargs):
start_time = time.time()
success = True
error_message = None
result = None
try:
result = f(*args, **inner_kwargs)
return result
except Exception as e:
success = False
error_message = str(e)
raise
finally:
# Track operation
response_time = int((time.time() - start_time) * 1000)
self._track_operation_complete(
operation_type, response_time, success,
error_message, kwargs, result
)
return wrapped
return decorator
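    # Example application (illustrative; translate_text is a hypothetical function).
    # Note that the **kwargs passed to track_operation are captured once at
    # decoration time and reused as static metadata for every call:
    #
    #     @analytics_tracker.track_operation('translations',
    #                                        source_lang='en', target_lang='es')
    #     def translate_text(text):
    #         ...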
def _track_operation_complete(self, operation_type, response_time, success,
error_message, metadata, result):
"""Track operation completion"""
try:
now = datetime.now()
# Update Redis counters
if self.redis_client:
pipe = self.redis_client.pipeline()
# Operation counters
pipe.incr(f'stats:{operation_type}:total')
pipe.incr(f'stats:{operation_type}:daily:{now.strftime("%Y-%m-%d")}')
pipe.expire(f'stats:{operation_type}:daily:{now.strftime("%Y-%m-%d")}', 604800)
# Response times
pipe.lpush(f'stats:response_times:{operation_type}', response_time)
pipe.ltrim(f'stats:response_times:{operation_type}', 0, 999)
# Language pairs for translations
if operation_type == 'translations' and 'source_lang' in metadata:
lang_pair = f"{metadata.get('source_lang')} -> {metadata.get('target_lang')}"
pipe.hincrby('stats:language_pairs', lang_pair, 1)
# Error tracking
if not success:
pipe.hincrby('stats:error_types', error_message[:100], 1)
pipe.execute()
# Queue for PostgreSQL
if self.pg_conn:
log_entry = {
'operation_type': operation_type,
'response_time_ms': response_time,
'success': success,
'error_message': error_message,
'metadata': metadata,
'result': result,
'session_id': g.get('session_id'),
'created_at': now
}
self.write_queue.put((operation_type, log_entry))
except Exception as e:
logger.error(f"Error tracking operation: {e}")
def _write_worker(self):
"""Background worker to write logs to PostgreSQL"""
while True:
try:
# Get items from queue (blocking)
operation_type, log_entry = self.write_queue.get()
if operation_type == 'request_log':
self._write_request_log(log_entry)
elif operation_type == 'translations':
self._write_translation_log(log_entry)
elif operation_type == 'transcriptions':
self._write_transcription_log(log_entry)
elif operation_type == 'tts':
self._write_tts_log(log_entry)
except Exception as e:
logger.error(f"Error in analytics write worker: {e}")
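    # The loop above blocks on write_queue.get() between items, so the worker idles
    # cheaply; because the thread is started as a daemon it dies with the process,
    # and entries still sitting in the queue at shutdown are not flushed.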
def _write_request_log(self, log_entry):
"""Write request log to PostgreSQL"""
try:
with self.pg_conn.cursor() as cursor:
cursor.execute("""
INSERT INTO request_logs
(endpoint, method, status_code, response_time_ms,
ip_address, user_agent, request_size_bytes,
response_size_bytes, session_id, created_at)
VALUES (%(endpoint)s, %(method)s, %(status_code)s,
%(response_time_ms)s, %(ip_address)s, %(user_agent)s,
%(request_size_bytes)s, %(response_size_bytes)s,
%(session_id)s, %(created_at)s)
""", log_entry)
except Exception as e:
error_msg = str(e)
if 'relation "request_logs" does not exist' in error_msg:
logger.warning("Analytics tables not found. Run init_analytics_db.py to create them.")
# Disable analytics to prevent repeated errors
self.enabled = False
else:
logger.error(f"Error writing request log: {e}")
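    # Disabling analytics here stops the request hooks (after_request checks
    # self.enabled), but the operation decorators and log_error() do not check the
    # flag, so they keep queueing entries even while the tables are missing.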
def _write_translation_log(self, log_entry):
"""Write translation log to PostgreSQL"""
try:
metadata = log_entry.get('metadata', {})
with self.pg_conn.cursor() as cursor:
cursor.execute("""
INSERT INTO translation_logs
(source_language, target_language, text_length,
response_time_ms, success, error_message,
session_id, created_at)
VALUES (%(source_language)s, %(target_language)s,
%(text_length)s, %(response_time_ms)s,
%(success)s, %(error_message)s,
%(session_id)s, %(created_at)s)
""", {
'source_language': metadata.get('source_lang'),
'target_language': metadata.get('target_lang'),
'text_length': metadata.get('text_length', 0),
'response_time_ms': log_entry['response_time_ms'],
'success': log_entry['success'],
'error_message': log_entry['error_message'],
'session_id': log_entry['session_id'],
'created_at': log_entry['created_at']
})
except Exception as e:
logger.error(f"Error writing translation log: {e}")
def _write_transcription_log(self, log_entry):
"""Write transcription log to PostgreSQL"""
try:
metadata = log_entry.get('metadata', {})
result = log_entry.get('result', {})
with self.pg_conn.cursor() as cursor:
cursor.execute("""
INSERT INTO transcription_logs
(detected_language, audio_duration_seconds,
file_size_bytes, response_time_ms, success,
error_message, session_id, created_at)
VALUES (%(detected_language)s, %(audio_duration_seconds)s,
%(file_size_bytes)s, %(response_time_ms)s,
%(success)s, %(error_message)s,
%(session_id)s, %(created_at)s)
""", {
'detected_language': result.get('detected_language') if isinstance(result, dict) else None,
'audio_duration_seconds': metadata.get('audio_duration', 0),
'file_size_bytes': metadata.get('file_size', 0),
'response_time_ms': log_entry['response_time_ms'],
'success': log_entry['success'],
'error_message': log_entry['error_message'],
'session_id': log_entry['session_id'],
'created_at': log_entry['created_at']
})
except Exception as e:
logger.error(f"Error writing transcription log: {e}")
def _write_tts_log(self, log_entry):
"""Write TTS log to PostgreSQL"""
try:
metadata = log_entry.get('metadata', {})
with self.pg_conn.cursor() as cursor:
cursor.execute("""
INSERT INTO tts_logs
(language, text_length, voice, response_time_ms,
success, error_message, session_id, created_at)
VALUES (%(language)s, %(text_length)s, %(voice)s,
%(response_time_ms)s, %(success)s,
%(error_message)s, %(session_id)s, %(created_at)s)
""", {
'language': metadata.get('language'),
'text_length': metadata.get('text_length', 0),
'voice': metadata.get('voice'),
'response_time_ms': log_entry['response_time_ms'],
'success': log_entry['success'],
'error_message': log_entry['error_message'],
'session_id': log_entry['session_id'],
'created_at': log_entry['created_at']
})
except Exception as e:
logger.error(f"Error writing TTS log: {e}")
def log_error(self, error_type, error_message, **kwargs):
"""Log error to analytics"""
try:
# Track in Redis
if self.redis_client:
pipe = self.redis_client.pipeline()
pipe.hincrby('stats:error_types', error_type, 1)
pipe.incr(f'stats:errors:daily:{datetime.now().strftime("%Y-%m-%d")}')
pipe.execute()
# Log to PostgreSQL
if self.pg_conn:
with self.pg_conn.cursor() as cursor:
cursor.execute("""
INSERT INTO error_logs
(error_type, error_message, endpoint, method,
status_code, ip_address, user_agent, request_id,
stack_trace, created_at)
VALUES (%(error_type)s, %(error_message)s,
%(endpoint)s, %(method)s, %(status_code)s,
%(ip_address)s, %(user_agent)s,
%(request_id)s, %(stack_trace)s,
%(created_at)s)
""", {
'error_type': error_type,
'error_message': error_message[:1000],
'endpoint': kwargs.get('endpoint'),
'method': kwargs.get('method'),
'status_code': kwargs.get('status_code'),
'ip_address': kwargs.get('ip_address'),
'user_agent': kwargs.get('user_agent', '')[:500],
'request_id': kwargs.get('request_id'),
'stack_trace': kwargs.get('stack_trace', '')[:5000],
'created_at': datetime.now()
})
except Exception as e:
logger.error(f"Error logging analytics error: {e}")
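    # Illustrative call site (the values shown are hypothetical, not from this module):
    #
    #     analytics_tracker.log_error(
    #         'TranslationError', str(exc),
    #         endpoint='/translate', method='POST', status_code=500,
    #         ip_address=request.remote_addr,
    #         user_agent=request.headers.get('User-Agent', ''),
    #     )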
def update_cache_stats(self, hit=True):
"""Update cache hit/miss statistics"""
try:
if self.redis_client:
if hit:
self.redis_client.incr('stats:cache:hits')
else:
self.redis_client.incr('stats:cache:misses')
except Exception as e:
logger.error(f"Error updating cache stats: {e}")
# Create global instance
analytics_tracker = AnalyticsTracker()
# Convenience decorators
def track_translation(**kwargs):
"""Decorator to track translation operations"""
return analytics_tracker.track_operation('translations', **kwargs)
def track_transcription(**kwargs):
"""Decorator to track transcription operations"""
return analytics_tracker.track_operation('transcriptions', **kwargs)
def track_tts(**kwargs):
"""Decorator to track TTS operations"""
return analytics_tracker.track_operation('tts', **kwargs)
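
# Example wiring (a minimal sketch; the Flask app and view below are hypothetical
# and assume the package layout implied by talk2me/analytics_middleware.py):
#
#     from flask import Flask
#     from talk2me.analytics_middleware import analytics_tracker, track_translation
#
#     app = Flask(__name__)
#     app.config['REDIS_URL'] = 'redis://localhost:6379/0'
#     app.config['DATABASE_URL'] = 'postgresql://localhost/talk2me'
#     analytics_tracker.init_app(app)
#
#     @app.route('/translate', methods=['POST'])
#     @track_translation(source_lang='en', target_lang='es')
#     def translate():
#         ...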