This comprehensive error logging system provides structured logging, automatic rotation, and detailed tracking for production debugging.

Key features:
- Structured JSON logging for easy parsing and analysis
- Multiple log streams: app, errors, access, security, performance
- Automatic log rotation to prevent disk space exhaustion
- Request tracing with unique IDs for debugging
- Performance metrics collection with slow request tracking
- Security event logging for suspicious activities
- Error deduplication and frequency tracking
- Full exception details with stack traces

Implementation details:
- StructuredFormatter outputs JSON-formatted logs
- ErrorLogger manages multiple specialized loggers
- Rotating file handlers prevent disk space issues
- Request context automatically included in logs
- Performance decorator tracks execution times
- Security events logged for audit trails
- Admin endpoints for log analysis

Admin endpoints:
- GET /admin/logs/errors - View recent errors and frequencies
- GET /admin/logs/performance - View performance metrics
- GET /admin/logs/security - View security events

Log types:
- talk2me.log - General application logs
- errors.log - Dedicated error logging with stack traces
- access.log - HTTP request/response logs
- security.log - Security events and suspicious activities
- performance.log - Performance metrics and timing

This provides production-grade observability critical for debugging issues, monitoring performance, and maintaining security in production environments.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
564 lines
19 KiB
Python
564 lines
19 KiB
Python
# Comprehensive error logging system for production debugging
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import sys
|
|
import json
|
|
import traceback
|
|
import time
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional, Union
|
|
from functools import wraps
|
|
import socket
|
|
import threading
|
|
from flask import request, g
|
|
from contextlib import contextmanager
|
|
import hashlib
|
|
|
|
# Directory that receives every log file written by this module. It can be
# overridden through the LOGS_DIR environment variable and is created at import
# time so that the file handlers below can open their files.
LOGS_DIR = os.getenv('LOGS_DIR', 'logs')
os.makedirs(LOGS_DIR, exist_ok=True)
|
|
|
|
class StructuredFormatter(logging.Formatter):
    """
    Custom formatter that outputs structured JSON logs

    Each record is rendered as a single JSON object holding the base fields
    (timestamp, level, logger, message, app, environment, hostname, thread,
    process) plus, when present on the record, exception details, the
    ``extra_fields`` dict and a handful of well-known optional attributes.
    """

    # (record attribute, output key) pairs copied into the JSON when the
    # attribute exists on the record. They are applied after ``extra_fields``
    # so the explicit record attribute wins on a key clash.
    _OPTIONAL_FIELDS = (
        ('request_id', 'request_id'),
        ('user_id', 'user_id'),
        ('session_id', 'session_id'),
        ('duration', 'duration_ms'),
        ('memory_usage', 'memory_usage_mb'),
    )

    def __init__(self, app_name='talk2me', environment='development'):
        """Store the static fields that are stamped onto every log line."""
        super().__init__()
        self.app_name = app_name
        self.environment = environment
        self.hostname = socket.gethostname()

    def format(self, record):
        """Serialize ``record`` to a JSON string.

        Values that ``json`` cannot encode natively are converted with
        ``str`` (``default=str``).
        """
        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timezone

        # Use the record's creation time rather than "now", so every handler
        # that formats the same record reports the same timestamp.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)

        # Base log structure
        log_data = {
            'timestamp': created.isoformat().replace('+00:00', 'Z'),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'app': self.app_name,
            'environment': self.environment,
            'hostname': self.hostname,
            'thread': threading.current_thread().name,
            'process': os.getpid(),
        }

        # Add exception info if present. ``sys.exc_info()`` called outside an
        # ``except`` block yields the truthy tuple (None, None, None), so the
        # exception type must be checked explicitly before dereferencing it.
        exc_info = record.exc_info
        if exc_info and exc_info[0] is not None:
            log_data['exception'] = {
                'type': exc_info[0].__name__,
                'message': str(exc_info[1]),
                'traceback': traceback.format_exception(*exc_info),
            }

        # Add extra fields (ignored unless they really are a mapping)
        extra_fields = getattr(record, 'extra_fields', None)
        if isinstance(extra_fields, dict):
            log_data.update(extra_fields)

        # Add request context and performance metrics if available
        for attribute, key in self._OPTIONAL_FIELDS:
            if hasattr(record, attribute):
                log_data[key] = getattr(record, attribute)

        return json.dumps(log_data, default=str)
|
|
|
|
class ErrorLogger:
    """
    Comprehensive error logging system with multiple handlers and features

    The instance owns five loggers, stored in ``self.loggers`` under the keys
    ``app``, ``error``, ``access``, ``security`` and ``performance``. When
    bound to a Flask app it registers a catch-all error handler plus
    before/after request hooks, and it keeps an in-memory list of timestamps
    per error signature (``self.error_counts``) for frequency tracking.
    """

    # Sliding window, in seconds, used when counting recent errors.
    ERROR_WINDOW_SECONDS = 3600
    # Occurrences of one signature inside the window above which a
    # ``high_error_rate`` security event is logged.
    ERROR_RATE_THRESHOLD = 10
    # Requests taking longer than this many milliseconds are logged as slow.
    SLOW_REQUEST_MS = 1000

    def __init__(self, app=None, config=None):
        """Create the logger manager and optionally bind it to ``app``.

        Args:
            app: Flask application; when given, ``init_app`` runs immediately.
            config: Optional dict with any of ``log_level``, ``log_file``,
                ``error_log_file``, ``max_bytes`` and ``backup_count``.
        """
        self.config = config or {}
        self.loggers = {}
        self.error_counts = {}
        self.error_signatures = {}

        if app is not None:
            self.init_app(app)

    def _setting(self, key, default):
        """Return ``self.config[key]``, treating a missing or None value as unset."""
        value = self.config.get(key)
        return default if value is None else value

    def init_app(self, app):
        """Initialize error logging for Flask app"""
        self.app = app

        # Get configuration. A value of None counts as "not configured": the
        # module's own ``configure_logging`` passes None for unset options,
        # and a None path would crash the file handlers.
        self.log_level = self._setting(
            'log_level', app.config.get('LOG_LEVEL', 'INFO'))
        self.log_file = self._setting(
            'log_file',
            app.config.get('LOG_FILE') or os.path.join(LOGS_DIR, 'talk2me.log'))
        self.error_log_file = self._setting(
            'error_log_file', os.path.join(LOGS_DIR, 'errors.log'))
        self.max_bytes = self._setting('max_bytes', 50 * 1024 * 1024)  # 50MB
        self.backup_count = self._setting('backup_count', 10)
        self.environment = app.config.get('FLASK_ENV', 'development')

        # Set up loggers
        self._setup_app_logger()
        self._setup_error_logger()
        self._setup_access_logger()
        self._setup_security_logger()
        self._setup_performance_logger()

        # Add Flask error handlers
        self._setup_flask_handlers(app)

        # Add request logging
        app.before_request(self._before_request)
        app.after_request(self._after_request)

        # Store logger in app
        app.error_logger = self

        # Use the application logger: calling ``logging.info`` on an
        # unconfigured root logger implicitly runs ``basicConfig()``, which
        # attaches a stderr handler and duplicates every propagated record.
        self.loggers['app'].info("Error logging system initialized")

    @staticmethod
    def _reset_handlers(logger):
        """Detach and close every handler attached to ``logger``.

        This keeps a repeated ``init_app`` from stacking duplicate handlers
        and from leaking the file descriptors of the old ones.
        """
        for handler in list(logger.handlers):
            logger.removeHandler(handler)
            handler.close()

    def _json_formatter(self):
        """Return a fresh JSON formatter for the current environment."""
        return StructuredFormatter('talk2me', self.environment)

    def _resolve_level(self):
        """Translate ``self.log_level`` (a level name or an int) to a level number."""
        if isinstance(self.log_level, int):
            return self.log_level
        return getattr(logging, str(self.log_level).upper())

    def _console_formatter(self):
        """Pick the console formatter: colored on a TTY, JSON otherwise."""
        if not sys.stdout.isatty():
            return self._json_formatter()
        try:
            # colorlog is optional; fall back to plain text when it is missing
            from colorlog import ColoredFormatter
        except ImportError:
            return logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        return ColoredFormatter(
            '%(log_color)s%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            log_colors={
                'DEBUG': 'cyan',
                'INFO': 'green',
                'WARNING': 'yellow',
                'ERROR': 'red',
                'CRITICAL': 'red,bg_white',
            }
        )

    def _setup_app_logger(self):
        """Set up main application logger"""
        app_logger = logging.getLogger('talk2me')
        app_logger.setLevel(self._resolve_level())

        # Remove existing handlers
        self._reset_handlers(app_logger)

        # Console handler with color support
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(self._console_formatter())
        app_logger.addHandler(console_handler)

        # Rotating file handler
        file_handler = logging.handlers.RotatingFileHandler(
            self.log_file,
            maxBytes=self.max_bytes,
            backupCount=self.backup_count
        )
        file_handler.setFormatter(self._json_formatter())
        app_logger.addHandler(file_handler)

        self.loggers['app'] = app_logger

    def _setup_error_logger(self):
        """Set up dedicated error logger"""
        error_logger = logging.getLogger('talk2me.errors')
        error_logger.setLevel(logging.ERROR)
        self._reset_handlers(error_logger)

        # Error file handler
        error_handler = logging.handlers.RotatingFileHandler(
            self.error_log_file,
            maxBytes=self.max_bytes,
            backupCount=self.backup_count
        )
        error_handler.setFormatter(self._json_formatter())
        error_logger.addHandler(error_handler)

        # Also send errors to syslog if available (best effort)
        try:
            syslog_handler = logging.handlers.SysLogHandler(
                address='/dev/log' if os.path.exists('/dev/log') else ('localhost', 514)
            )
        except OSError:
            syslog_handler = None  # Syslog not available
        if syslog_handler is not None:
            syslog_handler.setFormatter(
                logging.Formatter('talk2me[%(process)d]: %(levelname)s %(message)s')
            )
            error_logger.addHandler(syslog_handler)

        self.loggers['error'] = error_logger

    def _setup_access_logger(self):
        """Set up access logger for HTTP requests"""
        access_logger = logging.getLogger('talk2me.access')
        access_logger.setLevel(logging.INFO)
        self._reset_handlers(access_logger)

        # Access log file, rotated at midnight with 30 backups kept
        access_handler = logging.handlers.TimedRotatingFileHandler(
            os.path.join(LOGS_DIR, 'access.log'),
            when='midnight',
            interval=1,
            backupCount=30
        )
        access_handler.setFormatter(self._json_formatter())
        access_logger.addHandler(access_handler)

        self.loggers['access'] = access_logger

    def _setup_security_logger(self):
        """Set up security event logger"""
        security_logger = logging.getLogger('talk2me.security')
        security_logger.setLevel(logging.WARNING)
        self._reset_handlers(security_logger)

        # Security log file
        security_handler = logging.handlers.RotatingFileHandler(
            os.path.join(LOGS_DIR, 'security.log'),
            maxBytes=self.max_bytes,
            backupCount=self.backup_count
        )
        security_handler.setFormatter(self._json_formatter())
        security_logger.addHandler(security_handler)

        self.loggers['security'] = security_logger

    def _setup_performance_logger(self):
        """Set up performance metrics logger"""
        perf_logger = logging.getLogger('talk2me.performance')
        perf_logger.setLevel(logging.INFO)
        self._reset_handlers(perf_logger)

        # Performance log file
        perf_handler = logging.handlers.TimedRotatingFileHandler(
            os.path.join(LOGS_DIR, 'performance.log'),
            when='H',  # Hourly rotation
            interval=1,
            backupCount=168  # 7 days
        )
        perf_handler.setFormatter(self._json_formatter())
        perf_logger.addHandler(perf_handler)

        self.loggers['performance'] = perf_logger

    def _setup_flask_handlers(self, app):
        """Set up Flask error handlers"""
        @app.errorhandler(Exception)
        def handle_exception(error):
            # Get request ID
            request_id = getattr(g, 'request_id', 'unknown')

            # Create error signature for deduplication
            error_signature = self._get_error_signature(error)

            # Log the error
            self.log_error(
                error,
                request_id=request_id,
                endpoint=request.endpoint,
                method=request.method,
                path=request.path,
                ip=request.remote_addr,
                user_agent=request.headers.get('User-Agent'),
                signature=error_signature
            )

            # Track error frequency
            self._track_error(error_signature)

            # Return appropriate response. Only trust ``error.code`` when it
            # is a real HTTP status: other exception types may carry a
            # ``code`` attribute that is None or not a status at all.
            status = getattr(error, 'code', None)
            if (isinstance(status, int) and not isinstance(status, bool)
                    and 100 <= status <= 599):
                return {'error': str(error)}, status
            return {'error': 'Internal server error'}, 500

    def _before_request(self):
        """Log request start"""
        # Generate request ID
        g.request_id = self._generate_request_id()
        g.request_start_time = time.time()

        # Log access
        self.log_access(
            'request_start',
            request_id=g.request_id,
            method=request.method,
            path=request.path,
            ip=request.remote_addr,
            user_agent=request.headers.get('User-Agent')
        )

    def _after_request(self, response):
        """Log request completion and return ``response`` unchanged"""
        request_id = getattr(g, 'request_id', 'unknown')

        # Calculate duration (None when the before-request hook did not run)
        duration = None
        if hasattr(g, 'request_start_time'):
            duration = int((time.time() - g.request_start_time) * 1000)

        # Log access
        self.log_access(
            'request_complete',
            request_id=request_id,
            method=request.method,
            path=request.path,
            status=response.status_code,
            duration_ms=duration,
            content_length=response.content_length
        )

        # Log performance metrics for slow requests
        if duration and duration > self.SLOW_REQUEST_MS:
            self.log_performance(
                'slow_request',
                request_id=request_id,
                endpoint=request.endpoint,
                duration_ms=duration
            )

        return response

    def log_error(self, error: Exception, **kwargs):
        """Log an error with context

        Args:
            error: The exception to record.
            **kwargs: Context stored under ``extra_fields``; ``request_id``,
                ``user_id`` and ``session_id`` are also set as record
                attributes, and ``endpoint`` is used in the message.
        """
        error_logger = self.loggers.get('error', logging.getLogger())

        # Create log record with extra fields
        extra = {
            'extra_fields': kwargs,
            'request_id': kwargs.get('request_id'),
            'user_id': kwargs.get('user_id'),
            'session_id': kwargs.get('session_id')
        }

        # Log with the traceback of the exception that was passed in, not
        # whatever exception (possibly none) is being handled by the caller.
        exc_info = None
        if isinstance(error, BaseException):
            exc_info = (type(error), error, error.__traceback__)

        error_logger.error(
            f"Error in {kwargs.get('endpoint', 'unknown')}: {str(error)}",
            exc_info=exc_info,
            extra=extra
        )

    def log_access(self, message: str, **kwargs):
        """Log access event"""
        access_logger = self.loggers.get('access', logging.getLogger())

        extra = {
            'extra_fields': kwargs,
            'request_id': kwargs.get('request_id')
        }

        access_logger.info(message, extra=extra)

    def log_security(self, event: str, severity: str = 'warning', **kwargs):
        """Log security event

        ``severity`` is a logging level name (case-insensitive). Anything that
        is not a known level name falls back to WARNING.
        """
        security_logger = self.loggers.get('security', logging.getLogger())

        extra = {
            'extra_fields': {
                'event': event,
                'severity': severity,
                **kwargs
            },
            'request_id': kwargs.get('request_id')
        }

        # Resolve the level by name instead of looking up an attribute on the
        # logger: ``getattr(logger, severity)`` can return a non-callable
        # attribute such as ``disabled`` or ``name``.
        level = logging.getLevelName(str(severity).upper())
        if not isinstance(level, int) or level == logging.NOTSET:
            level = logging.WARNING

        security_logger.log(level, f"Security event: {event}", extra=extra)

    def log_performance(self, metric: str,
                        value: Optional[Union[int, float]] = None, **kwargs):
        """Log performance metric"""
        perf_logger = self.loggers.get('performance', logging.getLogger())

        extra = {
            'extra_fields': {
                'metric': metric,
                'value': value,
                **kwargs
            },
            'request_id': kwargs.get('request_id')
        }

        perf_logger.info(f"Performance metric: {metric}", extra=extra)

    def _generate_request_id(self):
        """Generate unique request ID (millisecond timestamp + 8 random bytes)"""
        return f"{int(time.time() * 1000)}-{os.urandom(8).hex()}"

    def _get_error_signature(self, error: Exception):
        """Generate signature for error deduplication

        The signature is the MD5 hex digest of the exception type name and
        the first line produced by ``traceback.format_exception_only``. It is
        used purely as a grouping key.
        """
        tb_summary = traceback.format_exception_only(type(error), error)
        signature_data = f"{type(error).__name__}:{tb_summary[0] if tb_summary else ''}"
        return hashlib.md5(signature_data.encode()).hexdigest()

    def _track_error(self, signature: str):
        """Track error frequency and raise a security event on high rates"""
        now = time.time()

        # Add the current timestamp and drop entries outside the window
        recent = [
            ts for ts in self.error_counts.get(signature, [])
            if now - ts < self.ERROR_WINDOW_SECONDS
        ]
        recent.append(now)
        self.error_counts[signature] = recent

        # Alert if error rate is high
        error_count = len(recent)
        if error_count > self.ERROR_RATE_THRESHOLD:
            self.log_security(
                'high_error_rate',
                severity='error',
                signature=signature,
                count=error_count,
                message="High error rate detected"
            )

    def get_error_summary(self):
        """Get summary of recent errors

        Returns:
            Dict mapping each signature seen within the window to
            ``{'count_last_hour': int, 'last_seen': float}``.
        """
        summary = {}
        now = time.time()

        for signature, timestamps in self.error_counts.items():
            recent_count = sum(
                1 for ts in timestamps if now - ts < self.ERROR_WINDOW_SECONDS)
            if recent_count > 0:
                summary[signature] = {
                    'count_last_hour': recent_count,
                    'last_seen': max(timestamps)
                }

        return summary
|
|
|
|
# Decorators for easy logging
|
|
def log_errors(logger_name='talk2me'):
    """Decorator to log function errors

    Any ``Exception`` raised by the wrapped function is logged at ERROR level
    on the logger named ``logger_name``, with the traceback and the function's
    name and module attached as ``extra_fields``, and is then re-raised.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                origin = {
                    'function': func.__name__,
                    'module': func.__module__
                }
                logging.getLogger(logger_name).error(
                    f"Error in {func.__name__}: {str(exc)}",
                    exc_info=sys.exc_info(),
                    extra={'extra_fields': origin}
                )
                # Logging is a side effect only; the caller still sees the error
                raise
        return wrapper
    return decorator
|
|
|
|
def log_performance(metric_name=None):
    """Decorator to log function performance

    Every call of the wrapped function is timed and reported on the
    ``talk2me.performance`` logger: at INFO level when the call returns, and
    at WARNING level with ``status: 'failed'`` when it raises an
    ``Exception`` (which is then re-raised).

    Args:
        metric_name: Name reported as the metric; defaults to the wrapped
            function's ``__name__``.
    """
    def decorator(func):
        def report(started, failed):
            """Emit one performance record for a call that began at ``started``."""
            metric = metric_name or func.__name__
            fields = {
                'metric': metric,
                'duration_ms': int((time.perf_counter() - started) * 1000),
                'function': func.__name__,
                'module': func.__module__
            }
            if failed:
                fields['status'] = 'failed'
                level, prefix = logging.WARNING, "Performance (failed)"
            else:
                level, prefix = logging.INFO, "Performance"
            logging.getLogger('talk2me.performance').log(
                level, f"{prefix}: {metric}", extra={'extra_fields': fields}
            )

        @wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is monotonic, so a wall-clock adjustment during the
            # call cannot produce a negative or inflated duration.
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                report(started, failed=True)
                raise
            # Reported outside the try block so that a problem while logging
            # is never mislabelled as a failure of the wrapped function.
            report(started, failed=False)
            return result
        return wrapper
    return decorator
|
|
|
|
@contextmanager
def log_context(**kwargs):
    """Context manager to add context to logs

    Each keyword argument is set as an attribute on Flask's ``g`` for the
    duration of the ``with`` block. On exit, attributes that existed before
    are put back to their previous value and the others are deleted.
    """
    # Remember the values that are about to be shadowed
    previous = {
        name: getattr(g, name)
        for name in kwargs
        if hasattr(g, name)
    }
    for name, value in kwargs.items():
        setattr(g, name, value)

    try:
        yield
    finally:
        # Undo the overrides, whether or not the block raised
        for name in kwargs:
            if name not in previous:
                delattr(g, name)
            else:
                setattr(g, name, previous[name])
|
|
|
|
# Utility functions
|
|
def configure_logging(app, **kwargs):
    """Configure logging for the application

    Args:
        app: Flask application to attach the logging system to.
        **kwargs: Optional overrides: ``log_level``, ``log_file``,
            ``error_log_file``, ``max_bytes``, ``backup_count``.

    Returns:
        The ``ErrorLogger`` instance bound to ``app``.
    """
    requested = {
        'log_level': kwargs.get('log_level', app.config.get('LOG_LEVEL', 'INFO')),
        'log_file': kwargs.get('log_file', app.config.get('LOG_FILE')),
        'error_log_file': kwargs.get('error_log_file'),
        'max_bytes': kwargs.get('max_bytes', 50 * 1024 * 1024),
        'backup_count': kwargs.get('backup_count', 10)
    }

    # Leave out options that were not supplied. ErrorLogger looks settings up
    # with ``config.get(key, default)``, so a key that is present with the
    # value None would override its default and hand None to the file
    # handlers as a path.
    config = {key: value for key, value in requested.items() if value is not None}

    return ErrorLogger(app, config)
|
|
|
|
def get_logger(name='talk2me'):
    """Get a logger instance

    Thin wrapper around ``logging.getLogger``; ``name`` defaults to the
    application logger ``'talk2me'``.
    """
    logger = logging.getLogger(name)
    return logger
|
|
|
|
def log_exception(error, message=None, **kwargs):
    """Log an exception with context

    Args:
        error: The exception to record; its own traceback is attached.
        message: Optional log message; defaults to ``"Exception: <error>"``.
        **kwargs: Extra context stored under ``extra_fields``.
    """
    logger = logging.getLogger('talk2me.errors')

    # ``g`` is only usable inside a Flask application context; touching it
    # anywhere else raises RuntimeError, which ``getattr``'s default does not
    # absorb. A logging helper must not fail for that reason, so fall back to
    # no request ID.
    try:
        request_id = getattr(g, 'request_id', None)
    except RuntimeError:
        request_id = None

    extra = {
        'extra_fields': kwargs,
        'request_id': request_id
    }

    logger.error(
        message or f"Exception: {str(error)}",
        exc_info=(type(error), error, error.__traceback__),
        extra=extra
    )