This commit introduces major enhancements to Talk2Me:

## Database Integration
- PostgreSQL support with SQLAlchemy ORM
- Redis integration for caching and real-time analytics
- Automated database initialization scripts
- Migration support infrastructure

## User Authentication System
- JWT-based API authentication
- Session-based web authentication
- API key authentication for programmatic access
- User roles and permissions (admin/user)
- Login history and session tracking
- Rate limiting per user with customizable limits

## Admin Dashboard
- Real-time analytics and monitoring
- User management interface (create, edit, delete users)
- System health monitoring
- Request/error tracking
- Language pair usage statistics
- Performance metrics visualization

## Key Features
- Dual authentication support (token + user accounts)
- Graceful fallback for missing services
- Non-blocking analytics middleware
- Comprehensive error handling
- Session management with security features

## Bug Fixes
- Fixed rate limiting bypass for admin routes
- Added missing email validation method
- Improved error handling for missing database tables
- Fixed session-based authentication for API endpoints

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
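The analytics middleware below is the core of the non-blocking tracking. A minimal wiring sketch (hedged: the module name `analytics` is an assumption; the tracker, its `init_app()` hook, and the `REDIS_URL`/`DATABASE_URL` config keys are taken from the file itself):

```python
# Sketch only: the module name `analytics` is an assumption, not confirmed
# by this commit. Config values shown are the file's own defaults.
from flask import Flask
from analytics import analytics_tracker

app = Flask(__name__)
app.config['REDIS_URL'] = 'redis://localhost:6379/0'
app.config['DATABASE_URL'] = 'postgresql://localhost/talk2me'

# Registers the before/after-request hooks and starts the background writer
# thread; an unreachable Redis or PostgreSQL is logged, not fatal.
analytics_tracker.init_app(app)
```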
426 lines · 18 KiB · Python
"""Analytics middleware for tracking requests and operations"""
|
|
|
|
import time
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from flask import request, g
|
|
import redis
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
import threading
|
|
from queue import Queue
|
|
from functools import wraps
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AnalyticsTracker:
    """Track and store analytics data"""

    def __init__(self, app=None):
        self.app = app
        self.enabled = True  # Set to False when analytics tables are missing
        self.redis_client = None
        self.pg_conn = None
        self.write_queue = Queue()
        self.writer_thread = None

        if app:
            self.init_app(app)

    def init_app(self, app):
        """Initialize analytics with Flask app"""
        self.app = app

        # Initialize Redis connection
        try:
            self.redis_client = redis.from_url(
                app.config.get('REDIS_URL', 'redis://localhost:6379/0'),
                decode_responses=True
            )
            self.redis_client.ping()
            logger.info("Analytics Redis connection established")
        except Exception as e:
            logger.error(f"Failed to connect to Redis for analytics: {e}")
            self.redis_client = None

        # Initialize PostgreSQL connection
        try:
            self.pg_conn = psycopg2.connect(
                app.config.get('DATABASE_URL', 'postgresql://localhost/talk2me')
            )
            self.pg_conn.autocommit = True
            logger.info("Analytics PostgreSQL connection established")
        except Exception as e:
            logger.error(f"Failed to connect to PostgreSQL for analytics: {e}")
            self.pg_conn = None

        # Start background writer thread
        self.writer_thread = threading.Thread(target=self._write_worker, daemon=True)
        self.writer_thread.start()

        # Register before/after request handlers
        app.before_request(self.before_request)
        app.after_request(self.after_request)

    def before_request(self):
        """Track request start time"""
        g.start_time = time.time()
        g.request_size = request.content_length or 0

    def after_request(self, response):
        """Track request completion and metrics"""
        try:
            # Skip if analytics is disabled
            if not self.enabled:
                return response

            # Calculate response time
            response_time = int((time.time() - g.start_time) * 1000)  # in ms

            # Track in Redis for real-time stats
            if self.redis_client:
                self._track_redis_stats(request, response, response_time)

            # Queue for PostgreSQL logging
            if self.pg_conn and request.endpoint not in ['static', 'admin.static']:
                self._queue_request_log(request, response, response_time)

        except Exception as e:
            logger.error(f"Error in analytics after_request: {e}")

        return response

    def _track_redis_stats(self, request, response, response_time):
        """Track statistics in Redis"""
        try:
            now = datetime.now()

            # Increment request counters
            pipe = self.redis_client.pipeline()

            # Total requests
            pipe.incr('stats:requests:total')

            # Time-based counters
            pipe.incr(f'stats:requests:minute:{now.strftime("%Y-%m-%d-%H-%M")}')
            pipe.expire(f'stats:requests:minute:{now.strftime("%Y-%m-%d-%H-%M")}', 3600)  # 1 hour

            pipe.incr(f'stats:requests:hourly:{now.strftime("%Y-%m-%d-%H")}')
            pipe.expire(f'stats:requests:hourly:{now.strftime("%Y-%m-%d-%H")}', 86400)  # 24 hours

            pipe.incr(f'stats:requests:daily:{now.strftime("%Y-%m-%d")}')
            pipe.expire(f'stats:requests:daily:{now.strftime("%Y-%m-%d")}', 604800)  # 7 days

            # Track errors
            if response.status_code >= 400:
                pipe.incr(f'stats:errors:daily:{now.strftime("%Y-%m-%d")}')
                pipe.incr(f'stats:errors:hourly:{now.strftime("%Y-%m-%d-%H")}')
                pipe.expire(f'stats:errors:hourly:{now.strftime("%Y-%m-%d-%H")}', 86400)

            # Track response times
            endpoint_key = request.endpoint or 'unknown'
            pipe.lpush(f'stats:response_times:{endpoint_key}', response_time)
            pipe.ltrim(f'stats:response_times:{endpoint_key}', 0, 999)  # Keep last 1000

            # Track slow requests
            if response_time > 1000:  # Over 1 second
                slow_request = {
                    'endpoint': request.endpoint,
                    'method': request.method,
                    'response_time': response_time,
                    'timestamp': now.isoformat()
                }
                pipe.lpush('stats:slow_requests', json.dumps(slow_request))
                pipe.ltrim('stats:slow_requests', 0, 99)  # Keep last 100

            pipe.execute()

        except Exception as e:
            logger.error(f"Error tracking Redis stats: {e}")

    def _queue_request_log(self, request, response, response_time):
        """Queue request log for PostgreSQL"""
        try:
            log_entry = {
                'endpoint': request.endpoint,
                'method': request.method,
                'status_code': response.status_code,
                'response_time_ms': response_time,
                'ip_address': request.remote_addr,
                'user_agent': request.headers.get('User-Agent', '')[:500],
                'request_size_bytes': g.get('request_size', 0),
                'response_size_bytes': len(response.get_data()),
                'session_id': g.get('session_id'),
                'created_at': datetime.now()
            }

            self.write_queue.put(('request_log', log_entry))

        except Exception as e:
            logger.error(f"Error queuing request log: {e}")

    def track_operation(self, operation_type, **kwargs):
        """Track specific operations (translation, transcription, etc.)"""
        def decorator(f):
            @wraps(f)
            def wrapped(*args, **inner_kwargs):
                start_time = time.time()
                success = True
                error_message = None
                result = None

                try:
                    result = f(*args, **inner_kwargs)
                    return result
                except Exception as e:
                    success = False
                    error_message = str(e)
                    raise
                finally:
                    # Track operation
                    response_time = int((time.time() - start_time) * 1000)
                    self._track_operation_complete(
                        operation_type, response_time, success,
                        error_message, kwargs, result
                    )

            return wrapped
        return decorator

    def _track_operation_complete(self, operation_type, response_time, success,
                                  error_message, metadata, result):
        """Track operation completion"""
        try:
            now = datetime.now()

            # Update Redis counters
            if self.redis_client:
                pipe = self.redis_client.pipeline()

                # Operation counters
                pipe.incr(f'stats:{operation_type}:total')
                pipe.incr(f'stats:{operation_type}:daily:{now.strftime("%Y-%m-%d")}')
                pipe.expire(f'stats:{operation_type}:daily:{now.strftime("%Y-%m-%d")}', 604800)

                # Response times
                pipe.lpush(f'stats:response_times:{operation_type}', response_time)
                pipe.ltrim(f'stats:response_times:{operation_type}', 0, 999)

                # Language pairs for translations
                if operation_type == 'translations' and 'source_lang' in metadata:
                    lang_pair = f"{metadata.get('source_lang')} -> {metadata.get('target_lang')}"
                    pipe.hincrby('stats:language_pairs', lang_pair, 1)

                # Error tracking
                if not success:
                    pipe.hincrby('stats:error_types', error_message[:100], 1)

                pipe.execute()

            # Queue for PostgreSQL
            if self.pg_conn:
                log_entry = {
                    'operation_type': operation_type,
                    'response_time_ms': response_time,
                    'success': success,
                    'error_message': error_message,
                    'metadata': metadata,
                    'result': result,
                    'session_id': g.get('session_id'),
                    'created_at': now
                }

                self.write_queue.put((operation_type, log_entry))

        except Exception as e:
            logger.error(f"Error tracking operation: {e}")

    def _write_worker(self):
        """Background worker to write logs to PostgreSQL"""
        while True:
            try:
                # Get items from queue (blocking)
                operation_type, log_entry = self.write_queue.get()

                if operation_type == 'request_log':
                    self._write_request_log(log_entry)
                elif operation_type == 'translations':
                    self._write_translation_log(log_entry)
                elif operation_type == 'transcriptions':
                    self._write_transcription_log(log_entry)
                elif operation_type == 'tts':
                    self._write_tts_log(log_entry)

            except Exception as e:
                logger.error(f"Error in analytics write worker: {e}")

    def _write_request_log(self, log_entry):
        """Write request log to PostgreSQL"""
        try:
            with self.pg_conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO request_logs
                    (endpoint, method, status_code, response_time_ms,
                     ip_address, user_agent, request_size_bytes,
                     response_size_bytes, session_id, created_at)
                    VALUES (%(endpoint)s, %(method)s, %(status_code)s,
                            %(response_time_ms)s, %(ip_address)s, %(user_agent)s,
                            %(request_size_bytes)s, %(response_size_bytes)s,
                            %(session_id)s, %(created_at)s)
                """, log_entry)
        except Exception as e:
            error_msg = str(e)
            if 'relation "request_logs" does not exist' in error_msg:
                logger.warning("Analytics tables not found. Run init_analytics_db.py to create them.")
                # Disable analytics to prevent repeated errors
                self.enabled = False
            else:
                logger.error(f"Error writing request log: {e}")
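    # The INSERT above targets a `request_logs` table that init_analytics_db.py
    # is expected to create. A hedged DDL sketch (column types are assumptions
    # inferred from the values written here, not the actual schema):
    #
    #   CREATE TABLE request_logs (
    #       id                  BIGSERIAL PRIMARY KEY,
    #       endpoint            TEXT,
    #       method              TEXT,
    #       status_code         INTEGER,
    #       response_time_ms    INTEGER,
    #       ip_address          INET,
    #       user_agent          TEXT,
    #       request_size_bytes  BIGINT,
    #       response_size_bytes BIGINT,
    #       session_id          TEXT,
    #       created_at          TIMESTAMPTZ
    #   );
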
    def _write_translation_log(self, log_entry):
        """Write translation log to PostgreSQL"""
        try:
            metadata = log_entry.get('metadata', {})

            with self.pg_conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO translation_logs
                    (source_language, target_language, text_length,
                     response_time_ms, success, error_message,
                     session_id, created_at)
                    VALUES (%(source_language)s, %(target_language)s,
                            %(text_length)s, %(response_time_ms)s,
                            %(success)s, %(error_message)s,
                            %(session_id)s, %(created_at)s)
                """, {
                    'source_language': metadata.get('source_lang'),
                    'target_language': metadata.get('target_lang'),
                    'text_length': metadata.get('text_length', 0),
                    'response_time_ms': log_entry['response_time_ms'],
                    'success': log_entry['success'],
                    'error_message': log_entry['error_message'],
                    'session_id': log_entry['session_id'],
                    'created_at': log_entry['created_at']
                })
        except Exception as e:
            logger.error(f"Error writing translation log: {e}")

    def _write_transcription_log(self, log_entry):
        """Write transcription log to PostgreSQL"""
        try:
            metadata = log_entry.get('metadata', {})
            result = log_entry.get('result', {})

            with self.pg_conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO transcription_logs
                    (detected_language, audio_duration_seconds,
                     file_size_bytes, response_time_ms, success,
                     error_message, session_id, created_at)
                    VALUES (%(detected_language)s, %(audio_duration_seconds)s,
                            %(file_size_bytes)s, %(response_time_ms)s,
                            %(success)s, %(error_message)s,
                            %(session_id)s, %(created_at)s)
                """, {
                    'detected_language': result.get('detected_language') if isinstance(result, dict) else None,
                    'audio_duration_seconds': metadata.get('audio_duration', 0),
                    'file_size_bytes': metadata.get('file_size', 0),
                    'response_time_ms': log_entry['response_time_ms'],
                    'success': log_entry['success'],
                    'error_message': log_entry['error_message'],
                    'session_id': log_entry['session_id'],
                    'created_at': log_entry['created_at']
                })
        except Exception as e:
            logger.error(f"Error writing transcription log: {e}")

    def _write_tts_log(self, log_entry):
        """Write TTS log to PostgreSQL"""
        try:
            metadata = log_entry.get('metadata', {})

            with self.pg_conn.cursor() as cursor:
                cursor.execute("""
                    INSERT INTO tts_logs
                    (language, text_length, voice, response_time_ms,
                     success, error_message, session_id, created_at)
                    VALUES (%(language)s, %(text_length)s, %(voice)s,
                            %(response_time_ms)s, %(success)s,
                            %(error_message)s, %(session_id)s, %(created_at)s)
                """, {
                    'language': metadata.get('language'),
                    'text_length': metadata.get('text_length', 0),
                    'voice': metadata.get('voice'),
                    'response_time_ms': log_entry['response_time_ms'],
                    'success': log_entry['success'],
                    'error_message': log_entry['error_message'],
                    'session_id': log_entry['session_id'],
                    'created_at': log_entry['created_at']
                })
        except Exception as e:
            logger.error(f"Error writing TTS log: {e}")

    def log_error(self, error_type, error_message, **kwargs):
        """Log error to analytics"""
        try:
            # Track in Redis
            if self.redis_client:
                pipe = self.redis_client.pipeline()
                pipe.hincrby('stats:error_types', error_type, 1)
                pipe.incr(f'stats:errors:daily:{datetime.now().strftime("%Y-%m-%d")}')
                pipe.execute()

            # Log to PostgreSQL
            if self.pg_conn:
                with self.pg_conn.cursor() as cursor:
                    cursor.execute("""
                        INSERT INTO error_logs
                        (error_type, error_message, endpoint, method,
                         status_code, ip_address, user_agent, request_id,
                         stack_trace, created_at)
                        VALUES (%(error_type)s, %(error_message)s,
                                %(endpoint)s, %(method)s, %(status_code)s,
                                %(ip_address)s, %(user_agent)s,
                                %(request_id)s, %(stack_trace)s,
                                %(created_at)s)
                    """, {
                        'error_type': error_type,
                        'error_message': error_message[:1000],
                        'endpoint': kwargs.get('endpoint'),
                        'method': kwargs.get('method'),
                        'status_code': kwargs.get('status_code'),
                        'ip_address': kwargs.get('ip_address'),
                        # Guard against callers passing user_agent/stack_trace as None,
                        # which would otherwise break the slice
                        'user_agent': (kwargs.get('user_agent') or '')[:500],
                        'request_id': kwargs.get('request_id'),
                        'stack_trace': (kwargs.get('stack_trace') or '')[:5000],
                        'created_at': datetime.now()
                    })
        except Exception as e:
            logger.error(f"Error logging analytics error: {e}")

    def update_cache_stats(self, hit=True):
        """Update cache hit/miss statistics"""
        try:
            if self.redis_client:
                if hit:
                    self.redis_client.incr('stats:cache:hits')
                else:
                    self.redis_client.incr('stats:cache:misses')
        except Exception as e:
            logger.error(f"Error updating cache stats: {e}")

# Create global instance
analytics_tracker = AnalyticsTracker()


# Convenience decorators
def track_translation(**kwargs):
    """Decorator to track translation operations"""
    return analytics_tracker.track_operation('translations', **kwargs)


def track_transcription(**kwargs):
    """Decorator to track transcription operations"""
    return analytics_tracker.track_operation('transcriptions', **kwargs)


def track_tts(**kwargs):
    """Decorator to track TTS operations"""
    return analytics_tracker.track_operation('tts', **kwargs)
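
# Usage sketch for the convenience decorators (hedged: the route and handler
# below are illustrative, not part of this module). Note that decorator kwargs
# become static metadata captured once at decoration time, not per request:
#
#   @app.route('/api/translate', methods=['POST'])
#   @track_translation(source_lang='en', target_lang='es')
#   def translate_endpoint():
#       ...  # success/failure counters and timings land in Redis and PostgreSQL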