"""Analytics middleware for tracking requests and operations""" import time import json import logging from datetime import datetime from flask import request, g import redis import psycopg2 from psycopg2.extras import RealDictCursor import threading from queue import Queue from functools import wraps logger = logging.getLogger(__name__) class AnalyticsTracker: """Track and store analytics data""" def __init__(self, app=None): self.app = app self.redis_client = None self.pg_conn = None self.write_queue = Queue() self.writer_thread = None if app: self.init_app(app) def init_app(self, app): """Initialize analytics with Flask app""" self.app = app # Initialize Redis connection try: self.redis_client = redis.from_url( app.config.get('REDIS_URL', 'redis://localhost:6379/0'), decode_responses=True ) self.redis_client.ping() logger.info("Analytics Redis connection established") except Exception as e: logger.error(f"Failed to connect to Redis for analytics: {e}") self.redis_client = None # Initialize PostgreSQL connection try: self.pg_conn = psycopg2.connect( app.config.get('DATABASE_URL', 'postgresql://localhost/talk2me') ) self.pg_conn.autocommit = True logger.info("Analytics PostgreSQL connection established") except Exception as e: logger.error(f"Failed to connect to PostgreSQL for analytics: {e}") self.pg_conn = None # Start background writer thread self.writer_thread = threading.Thread(target=self._write_worker, daemon=True) self.writer_thread.start() # Register before/after request handlers app.before_request(self.before_request) app.after_request(self.after_request) def before_request(self): """Track request start time""" g.start_time = time.time() g.request_size = request.content_length or 0 def after_request(self, response): """Track request completion and metrics""" try: # Skip if analytics is disabled if not self.enabled: return response # Calculate response time response_time = int((time.time() - g.start_time) * 1000) # in ms # Track in Redis for real-time stats if self.redis_client: self._track_redis_stats(request, response, response_time) # Queue for PostgreSQL logging if self.pg_conn and request.endpoint not in ['static', 'admin.static']: self._queue_request_log(request, response, response_time) except Exception as e: logger.error(f"Error in analytics after_request: {e}") return response def _track_redis_stats(self, request, response, response_time): """Track statistics in Redis""" try: now = datetime.now() # Increment request counters pipe = self.redis_client.pipeline() # Total requests pipe.incr('stats:requests:total') # Time-based counters pipe.incr(f'stats:requests:minute:{now.strftime("%Y-%m-%d-%H-%M")}') pipe.expire(f'stats:requests:minute:{now.strftime("%Y-%m-%d-%H-%M")}', 3600) # 1 hour pipe.incr(f'stats:requests:hourly:{now.strftime("%Y-%m-%d-%H")}') pipe.expire(f'stats:requests:hourly:{now.strftime("%Y-%m-%d-%H")}', 86400) # 24 hours pipe.incr(f'stats:requests:daily:{now.strftime("%Y-%m-%d")}') pipe.expire(f'stats:requests:daily:{now.strftime("%Y-%m-%d")}', 604800) # 7 days # Track errors if response.status_code >= 400: pipe.incr(f'stats:errors:daily:{now.strftime("%Y-%m-%d")}') pipe.incr(f'stats:errors:hourly:{now.strftime("%Y-%m-%d-%H")}') pipe.expire(f'stats:errors:hourly:{now.strftime("%Y-%m-%d-%H")}', 86400) # Track response times endpoint_key = request.endpoint or 'unknown' pipe.lpush(f'stats:response_times:{endpoint_key}', response_time) pipe.ltrim(f'stats:response_times:{endpoint_key}', 0, 999) # Keep last 1000 # Track slow requests if response_time > 1000: # Over 1 second slow_request = { 'endpoint': request.endpoint, 'method': request.method, 'response_time': response_time, 'timestamp': now.isoformat() } pipe.lpush('stats:slow_requests', json.dumps(slow_request)) pipe.ltrim('stats:slow_requests', 0, 99) # Keep last 100 pipe.execute() except Exception as e: logger.error(f"Error tracking Redis stats: {e}") def _queue_request_log(self, request, response, response_time): """Queue request log for PostgreSQL""" try: log_entry = { 'endpoint': request.endpoint, 'method': request.method, 'status_code': response.status_code, 'response_time_ms': response_time, 'ip_address': request.remote_addr, 'user_agent': request.headers.get('User-Agent', '')[:500], 'request_size_bytes': g.get('request_size', 0), 'response_size_bytes': len(response.get_data()), 'session_id': g.get('session_id'), 'created_at': datetime.now() } self.write_queue.put(('request_log', log_entry)) except Exception as e: logger.error(f"Error queuing request log: {e}") def track_operation(self, operation_type, **kwargs): """Track specific operations (translation, transcription, etc.)""" def decorator(f): @wraps(f) def wrapped(*args, **inner_kwargs): start_time = time.time() success = True error_message = None result = None try: result = f(*args, **inner_kwargs) return result except Exception as e: success = False error_message = str(e) raise finally: # Track operation response_time = int((time.time() - start_time) * 1000) self._track_operation_complete( operation_type, response_time, success, error_message, kwargs, result ) return wrapped return decorator def _track_operation_complete(self, operation_type, response_time, success, error_message, metadata, result): """Track operation completion""" try: now = datetime.now() # Update Redis counters if self.redis_client: pipe = self.redis_client.pipeline() # Operation counters pipe.incr(f'stats:{operation_type}:total') pipe.incr(f'stats:{operation_type}:daily:{now.strftime("%Y-%m-%d")}') pipe.expire(f'stats:{operation_type}:daily:{now.strftime("%Y-%m-%d")}', 604800) # Response times pipe.lpush(f'stats:response_times:{operation_type}', response_time) pipe.ltrim(f'stats:response_times:{operation_type}', 0, 999) # Language pairs for translations if operation_type == 'translations' and 'source_lang' in metadata: lang_pair = f"{metadata.get('source_lang')} -> {metadata.get('target_lang')}" pipe.hincrby('stats:language_pairs', lang_pair, 1) # Error tracking if not success: pipe.hincrby('stats:error_types', error_message[:100], 1) pipe.execute() # Queue for PostgreSQL if self.pg_conn: log_entry = { 'operation_type': operation_type, 'response_time_ms': response_time, 'success': success, 'error_message': error_message, 'metadata': metadata, 'result': result, 'session_id': g.get('session_id'), 'created_at': now } self.write_queue.put((operation_type, log_entry)) except Exception as e: logger.error(f"Error tracking operation: {e}") def _write_worker(self): """Background worker to write logs to PostgreSQL""" while True: try: # Get items from queue (blocking) operation_type, log_entry = self.write_queue.get() if operation_type == 'request_log': self._write_request_log(log_entry) elif operation_type == 'translations': self._write_translation_log(log_entry) elif operation_type == 'transcriptions': self._write_transcription_log(log_entry) elif operation_type == 'tts': self._write_tts_log(log_entry) except Exception as e: logger.error(f"Error in analytics write worker: {e}") def _write_request_log(self, log_entry): """Write request log to PostgreSQL""" try: with self.pg_conn.cursor() as cursor: cursor.execute(""" INSERT INTO request_logs (endpoint, method, status_code, response_time_ms, ip_address, user_agent, request_size_bytes, response_size_bytes, session_id, created_at) VALUES (%(endpoint)s, %(method)s, %(status_code)s, %(response_time_ms)s, %(ip_address)s, %(user_agent)s, %(request_size_bytes)s, %(response_size_bytes)s, %(session_id)s, %(created_at)s) """, log_entry) except Exception as e: error_msg = str(e) if 'relation "request_logs" does not exist' in error_msg: logger.warning("Analytics tables not found. Run init_analytics_db.py to create them.") # Disable analytics to prevent repeated errors self.enabled = False else: logger.error(f"Error writing request log: {e}") def _write_translation_log(self, log_entry): """Write translation log to PostgreSQL""" try: metadata = log_entry.get('metadata', {}) with self.pg_conn.cursor() as cursor: cursor.execute(""" INSERT INTO translation_logs (source_language, target_language, text_length, response_time_ms, success, error_message, session_id, created_at) VALUES (%(source_language)s, %(target_language)s, %(text_length)s, %(response_time_ms)s, %(success)s, %(error_message)s, %(session_id)s, %(created_at)s) """, { 'source_language': metadata.get('source_lang'), 'target_language': metadata.get('target_lang'), 'text_length': metadata.get('text_length', 0), 'response_time_ms': log_entry['response_time_ms'], 'success': log_entry['success'], 'error_message': log_entry['error_message'], 'session_id': log_entry['session_id'], 'created_at': log_entry['created_at'] }) except Exception as e: logger.error(f"Error writing translation log: {e}") def _write_transcription_log(self, log_entry): """Write transcription log to PostgreSQL""" try: metadata = log_entry.get('metadata', {}) result = log_entry.get('result', {}) with self.pg_conn.cursor() as cursor: cursor.execute(""" INSERT INTO transcription_logs (detected_language, audio_duration_seconds, file_size_bytes, response_time_ms, success, error_message, session_id, created_at) VALUES (%(detected_language)s, %(audio_duration_seconds)s, %(file_size_bytes)s, %(response_time_ms)s, %(success)s, %(error_message)s, %(session_id)s, %(created_at)s) """, { 'detected_language': result.get('detected_language') if isinstance(result, dict) else None, 'audio_duration_seconds': metadata.get('audio_duration', 0), 'file_size_bytes': metadata.get('file_size', 0), 'response_time_ms': log_entry['response_time_ms'], 'success': log_entry['success'], 'error_message': log_entry['error_message'], 'session_id': log_entry['session_id'], 'created_at': log_entry['created_at'] }) except Exception as e: logger.error(f"Error writing transcription log: {e}") def _write_tts_log(self, log_entry): """Write TTS log to PostgreSQL""" try: metadata = log_entry.get('metadata', {}) with self.pg_conn.cursor() as cursor: cursor.execute(""" INSERT INTO tts_logs (language, text_length, voice, response_time_ms, success, error_message, session_id, created_at) VALUES (%(language)s, %(text_length)s, %(voice)s, %(response_time_ms)s, %(success)s, %(error_message)s, %(session_id)s, %(created_at)s) """, { 'language': metadata.get('language'), 'text_length': metadata.get('text_length', 0), 'voice': metadata.get('voice'), 'response_time_ms': log_entry['response_time_ms'], 'success': log_entry['success'], 'error_message': log_entry['error_message'], 'session_id': log_entry['session_id'], 'created_at': log_entry['created_at'] }) except Exception as e: logger.error(f"Error writing TTS log: {e}") def log_error(self, error_type, error_message, **kwargs): """Log error to analytics""" try: # Track in Redis if self.redis_client: pipe = self.redis_client.pipeline() pipe.hincrby('stats:error_types', error_type, 1) pipe.incr(f'stats:errors:daily:{datetime.now().strftime("%Y-%m-%d")}') pipe.execute() # Log to PostgreSQL if self.pg_conn: with self.pg_conn.cursor() as cursor: cursor.execute(""" INSERT INTO error_logs (error_type, error_message, endpoint, method, status_code, ip_address, user_agent, request_id, stack_trace, created_at) VALUES (%(error_type)s, %(error_message)s, %(endpoint)s, %(method)s, %(status_code)s, %(ip_address)s, %(user_agent)s, %(request_id)s, %(stack_trace)s, %(created_at)s) """, { 'error_type': error_type, 'error_message': error_message[:1000], 'endpoint': kwargs.get('endpoint'), 'method': kwargs.get('method'), 'status_code': kwargs.get('status_code'), 'ip_address': kwargs.get('ip_address'), 'user_agent': kwargs.get('user_agent', '')[:500], 'request_id': kwargs.get('request_id'), 'stack_trace': kwargs.get('stack_trace', '')[:5000], 'created_at': datetime.now() }) except Exception as e: logger.error(f"Error logging analytics error: {e}") def update_cache_stats(self, hit=True): """Update cache hit/miss statistics""" try: if self.redis_client: if hit: self.redis_client.incr('stats:cache:hits') else: self.redis_client.incr('stats:cache:misses') except Exception as e: logger.error(f"Error updating cache stats: {e}") # Create global instance analytics_tracker = AnalyticsTracker() # Convenience decorators def track_translation(**kwargs): """Decorator to track translation operations""" return analytics_tracker.track_operation('translations', **kwargs) def track_transcription(**kwargs): """Decorator to track transcription operations""" return analytics_tracker.track_operation('transcriptions', **kwargs) def track_tts(**kwargs): """Decorator to track TTS operations""" return analytics_tracker.track_operation('tts', **kwargs)