talk2me/admin/__init__.py
Adolfo Delorenzo e5a274d191 Fix TTS server status errors and startup warnings
- Fixed 'app is not defined' errors by using current_app
- Improved TTS health check to handle missing /health endpoint
- Fixed database trigger creation to be idempotent
- Added .env.example with all configuration options
- Updated README with security configuration instructions
2025-06-03 20:08:19 -06:00

667 lines
28 KiB
Python

from flask import Blueprint, request, jsonify, render_template, redirect, url_for, session, current_app
from functools import wraps
import os
import logging
import json
from datetime import datetime, timedelta
import redis
import psycopg2
from psycopg2.extras import RealDictCursor
import time
logger = logging.getLogger(__name__)
# Create admin blueprint
admin_bp = Blueprint('admin', __name__,
template_folder='templates',
static_folder='static',
static_url_path='/admin/static')
# Initialize Redis and PostgreSQL connections
redis_client = None
pg_conn = None
def init_admin(app):
"""Initialize admin module with app configuration"""
global redis_client, pg_conn
try:
# Initialize Redis
redis_client = redis.from_url(
app.config.get('REDIS_URL', 'redis://localhost:6379/0'),
decode_responses=True
)
redis_client.ping()
logger.info("Redis connection established for admin dashboard")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
redis_client = None
try:
# Initialize PostgreSQL
pg_conn = psycopg2.connect(
app.config.get('DATABASE_URL', 'postgresql://localhost/talk2me')
)
logger.info("PostgreSQL connection established for admin dashboard")
except Exception as e:
logger.error(f"Failed to connect to PostgreSQL: {e}")
pg_conn = None
def admin_required(f):
"""Decorator to require admin authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Check if user is logged in with admin role (from unified login)
user_role = session.get('user_role')
if user_role == 'admin':
return f(*args, **kwargs)
# Also support the old admin token for backward compatibility
auth_token = request.headers.get('X-Admin-Token')
session_token = session.get('admin_token')
expected_token = os.environ.get('ADMIN_TOKEN', 'default-admin-token')
if auth_token == expected_token or session_token == expected_token:
if auth_token == expected_token:
session['admin_token'] = expected_token
return f(*args, **kwargs)
# For API endpoints, return JSON error
if request.path.startswith('/admin/api/'):
return jsonify({'error': 'Unauthorized'}), 401
# For web pages, redirect to unified login
return redirect(url_for('login', next=request.url))
return decorated_function
@admin_bp.route('/login', methods=['GET', 'POST'])
def login():
"""Admin login - redirect to main login page"""
# Redirect to the unified login page
next_url = request.args.get('next', url_for('admin.dashboard'))
return redirect(url_for('login', next=next_url))
@admin_bp.route('/logout')
def logout():
"""Admin logout - redirect to main logout"""
# Clear all session data
session.clear()
return redirect(url_for('index'))
@admin_bp.route('/')
@admin_bp.route('/dashboard')
@admin_required
def dashboard():
"""Main admin dashboard"""
return render_template('dashboard.html')
@admin_bp.route('/users')
@admin_required
def users():
"""User management page"""
# The template is in the main templates folder, not admin/templates
return render_template('admin_users.html')
# Analytics API endpoints
@admin_bp.route('/api/stats/overview')
@admin_required
def get_overview_stats():
"""Get overview statistics"""
try:
stats = {
'requests': {'total': 0, 'today': 0, 'hour': 0},
'translations': {'total': 0, 'today': 0},
'transcriptions': {'total': 0, 'today': 0},
'active_sessions': 0,
'error_rate': 0,
'cache_hit_rate': 0,
'system_health': check_system_health()
}
# Get data from Redis
if redis_client:
try:
# Request counts
stats['requests']['total'] = int(redis_client.get('stats:requests:total') or 0)
stats['requests']['today'] = int(redis_client.get(f'stats:requests:daily:{datetime.now().strftime("%Y-%m-%d")}') or 0)
stats['requests']['hour'] = int(redis_client.get(f'stats:requests:hourly:{datetime.now().strftime("%Y-%m-%d-%H")}') or 0)
# Operation counts
stats['translations']['total'] = int(redis_client.get('stats:translations:total') or 0)
stats['translations']['today'] = int(redis_client.get(f'stats:translations:daily:{datetime.now().strftime("%Y-%m-%d")}') or 0)
stats['transcriptions']['total'] = int(redis_client.get('stats:transcriptions:total') or 0)
stats['transcriptions']['today'] = int(redis_client.get(f'stats:transcriptions:daily:{datetime.now().strftime("%Y-%m-%d")}') or 0)
# Active sessions
stats['active_sessions'] = len(redis_client.keys('session:*'))
# Cache stats
cache_hits = int(redis_client.get('stats:cache:hits') or 0)
cache_misses = int(redis_client.get('stats:cache:misses') or 0)
if cache_hits + cache_misses > 0:
stats['cache_hit_rate'] = round((cache_hits / (cache_hits + cache_misses)) * 100, 2)
# Error rate
total_requests = stats['requests']['today']
errors_today = int(redis_client.get(f'stats:errors:daily:{datetime.now().strftime("%Y-%m-%d")}') or 0)
if total_requests > 0:
stats['error_rate'] = round((errors_today / total_requests) * 100, 2)
except Exception as e:
logger.error(f"Error fetching Redis stats: {e}")
return jsonify(stats)
except Exception as e:
logger.error(f"Error in get_overview_stats: {e}")
return jsonify({'error': str(e)}), 500
@admin_bp.route('/api/stats/requests/<timeframe>')
@admin_required
def get_request_stats(timeframe):
"""Get request statistics for different timeframes"""
try:
if timeframe not in ['minute', 'hour', 'day']:
return jsonify({'error': 'Invalid timeframe'}), 400
data = []
labels = []
if redis_client:
now = datetime.now()
if timeframe == 'minute':
# Last 60 minutes
for i in range(59, -1, -1):
time_key = (now - timedelta(minutes=i)).strftime('%Y-%m-%d-%H-%M')
count = int(redis_client.get(f'stats:requests:minute:{time_key}') or 0)
data.append(count)
labels.append((now - timedelta(minutes=i)).strftime('%H:%M'))
elif timeframe == 'hour':
# Last 24 hours
for i in range(23, -1, -1):
time_key = (now - timedelta(hours=i)).strftime('%Y-%m-%d-%H')
count = int(redis_client.get(f'stats:requests:hourly:{time_key}') or 0)
data.append(count)
labels.append((now - timedelta(hours=i)).strftime('%H:00'))
elif timeframe == 'day':
# Last 30 days
for i in range(29, -1, -1):
time_key = (now - timedelta(days=i)).strftime('%Y-%m-%d')
count = int(redis_client.get(f'stats:requests:daily:{time_key}') or 0)
data.append(count)
labels.append((now - timedelta(days=i)).strftime('%m/%d'))
return jsonify({
'labels': labels,
'data': data,
'timeframe': timeframe
})
except Exception as e:
logger.error(f"Error in get_request_stats: {e}")
return jsonify({'error': str(e)}), 500
@admin_bp.route('/api/stats/operations')
@admin_required
def get_operation_stats():
"""Get translation and transcription statistics"""
try:
stats = {
'translations': {'data': [], 'labels': []},
'transcriptions': {'data': [], 'labels': []},
'language_pairs': {},
'response_times': {'translation': [], 'transcription': []}
}
if redis_client:
now = datetime.now()
# Get daily stats for last 7 days
for i in range(6, -1, -1):
date_key = (now - timedelta(days=i)).strftime('%Y-%m-%d')
date_label = (now - timedelta(days=i)).strftime('%m/%d')
# Translation counts
trans_count = int(redis_client.get(f'stats:translations:daily:{date_key}') or 0)
stats['translations']['data'].append(trans_count)
stats['translations']['labels'].append(date_label)
# Transcription counts
transcr_count = int(redis_client.get(f'stats:transcriptions:daily:{date_key}') or 0)
stats['transcriptions']['data'].append(transcr_count)
stats['transcriptions']['labels'].append(date_label)
# Get language pair statistics
lang_pairs = redis_client.hgetall('stats:language_pairs') or {}
stats['language_pairs'] = {k: int(v) for k, v in lang_pairs.items()}
# Get response times (last 100 operations)
trans_times = redis_client.lrange('stats:response_times:translation', 0, 99)
transcr_times = redis_client.lrange('stats:response_times:transcription', 0, 99)
stats['response_times']['translation'] = [float(t) for t in trans_times[:20]]
stats['response_times']['transcription'] = [float(t) for t in transcr_times[:20]]
return jsonify(stats)
except Exception as e:
logger.error(f"Error in get_operation_stats: {e}")
return jsonify({'error': str(e)}), 500
@admin_bp.route('/api/stats/errors')
@admin_required
def get_error_stats():
"""Get error statistics"""
try:
stats = {
'error_types': {},
'error_timeline': {'data': [], 'labels': []},
'recent_errors': []
}
if pg_conn:
try:
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
# Get error types distribution
cursor.execute("""
SELECT error_type, COUNT(*) as count
FROM error_logs
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY error_type
ORDER BY count DESC
LIMIT 10
""")
error_types = cursor.fetchall()
stats['error_types'] = {row['error_type']: row['count'] for row in error_types}
# Get error timeline (hourly for last 24 hours)
cursor.execute("""
SELECT
DATE_TRUNC('hour', created_at) as hour,
COUNT(*) as count
FROM error_logs
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY hour
ORDER BY hour
""")
timeline = cursor.fetchall()
for row in timeline:
stats['error_timeline']['labels'].append(row['hour'].strftime('%H:00'))
stats['error_timeline']['data'].append(row['count'])
# Get recent errors
cursor.execute("""
SELECT
error_type,
error_message,
endpoint,
created_at
FROM error_logs
ORDER BY created_at DESC
LIMIT 10
""")
recent = cursor.fetchall()
stats['recent_errors'] = [
{
'type': row['error_type'],
'message': row['error_message'][:100],
'endpoint': row['endpoint'],
'time': row['created_at'].isoformat()
}
for row in recent
]
except Exception as e:
logger.error(f"Error querying PostgreSQL: {e}")
# Fallback to Redis if PostgreSQL fails
if not stats['error_types'] and redis_client:
error_types = redis_client.hgetall('stats:error_types') or {}
stats['error_types'] = {k: int(v) for k, v in error_types.items()}
# Get hourly error counts
now = datetime.now()
for i in range(23, -1, -1):
hour_key = (now - timedelta(hours=i)).strftime('%Y-%m-%d-%H')
count = int(redis_client.get(f'stats:errors:hourly:{hour_key}') or 0)
stats['error_timeline']['data'].append(count)
stats['error_timeline']['labels'].append((now - timedelta(hours=i)).strftime('%H:00'))
return jsonify(stats)
except Exception as e:
logger.error(f"Error in get_error_stats: {e}")
return jsonify({'error': str(e)}), 500
@admin_bp.route('/api/stats/performance')
@admin_required
def get_performance_stats():
"""Get performance metrics"""
try:
stats = {
'response_times': {
'translation': {'avg': 0, 'p95': 0, 'p99': 0},
'transcription': {'avg': 0, 'p95': 0, 'p99': 0},
'tts': {'avg': 0, 'p95': 0, 'p99': 0}
},
'throughput': {'data': [], 'labels': []},
'slow_requests': []
}
if redis_client:
# Calculate response time percentiles
for operation in ['translation', 'transcription', 'tts']:
times = redis_client.lrange(f'stats:response_times:{operation}', 0, -1)
if times:
times = sorted([float(t) for t in times])
stats['response_times'][operation]['avg'] = round(sum(times) / len(times), 2)
stats['response_times'][operation]['p95'] = round(times[int(len(times) * 0.95)], 2)
stats['response_times'][operation]['p99'] = round(times[int(len(times) * 0.99)], 2)
# Get throughput (requests per minute for last hour)
now = datetime.now()
for i in range(59, -1, -1):
time_key = (now - timedelta(minutes=i)).strftime('%Y-%m-%d-%H-%M')
count = int(redis_client.get(f'stats:requests:minute:{time_key}') or 0)
stats['throughput']['data'].append(count)
stats['throughput']['labels'].append((now - timedelta(minutes=i)).strftime('%H:%M'))
# Get slow requests
slow_requests = redis_client.lrange('stats:slow_requests', 0, 9)
stats['slow_requests'] = [json.loads(req) for req in slow_requests if req]
return jsonify(stats)
except Exception as e:
logger.error(f"Error in get_performance_stats: {e}")
return jsonify({'error': str(e)}), 500
@admin_bp.route('/api/export/<data_type>')
@admin_required
def export_data(data_type):
"""Export analytics data"""
try:
if data_type not in ['requests', 'errors', 'performance', 'all']:
return jsonify({'error': 'Invalid data type'}), 400
export_data = {
'export_time': datetime.now().isoformat(),
'data_type': data_type
}
if data_type in ['requests', 'all']:
# Export request data
request_data = []
if redis_client:
# Get daily stats for last 30 days
now = datetime.now()
for i in range(29, -1, -1):
date_key = (now - timedelta(days=i)).strftime('%Y-%m-%d')
request_data.append({
'date': date_key,
'requests': int(redis_client.get(f'stats:requests:daily:{date_key}') or 0),
'translations': int(redis_client.get(f'stats:translations:daily:{date_key}') or 0),
'transcriptions': int(redis_client.get(f'stats:transcriptions:daily:{date_key}') or 0),
'errors': int(redis_client.get(f'stats:errors:daily:{date_key}') or 0)
})
export_data['requests'] = request_data
if data_type in ['errors', 'all']:
# Export error data from PostgreSQL
error_data = []
if pg_conn:
try:
with pg_conn.cursor(cursor_factory=RealDictCursor) as cursor:
cursor.execute("""
SELECT * FROM error_logs
WHERE created_at > NOW() - INTERVAL '7 days'
ORDER BY created_at DESC
""")
errors = cursor.fetchall()
error_data = [dict(row) for row in errors]
except Exception as e:
logger.error(f"Error exporting from PostgreSQL: {e}")
export_data['errors'] = error_data
if data_type in ['performance', 'all']:
# Export performance data
perf_data = {
'response_times': {},
'slow_requests': []
}
if redis_client:
for op in ['translation', 'transcription', 'tts']:
times = redis_client.lrange(f'stats:response_times:{op}', 0, -1)
perf_data['response_times'][op] = [float(t) for t in times]
slow_reqs = redis_client.lrange('stats:slow_requests', 0, -1)
perf_data['slow_requests'] = [json.loads(req) for req in slow_reqs if req]
export_data['performance'] = perf_data
# Return as downloadable JSON
response = jsonify(export_data)
response.headers['Content-Disposition'] = f'attachment; filename=talk2me_analytics_{data_type}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
return response
except Exception as e:
logger.error(f"Error in export_data: {e}")
return jsonify({'error': str(e)}), 500
def check_system_health():
"""Check health of system components"""
health = {
'redis': 'unknown',
'postgresql': 'unknown',
'tts': 'unknown',
'overall': 'healthy'
}
# Check Redis
if redis_client:
try:
redis_client.ping()
health['redis'] = 'healthy'
except:
health['redis'] = 'unhealthy'
health['overall'] = 'degraded'
else:
health['redis'] = 'not_configured'
health['overall'] = 'degraded'
# Check PostgreSQL
if pg_conn:
try:
with pg_conn.cursor() as cursor:
cursor.execute("SELECT 1")
cursor.fetchone()
health['postgresql'] = 'healthy'
except:
health['postgresql'] = 'unhealthy'
health['overall'] = 'degraded'
else:
health['postgresql'] = 'not_configured'
health['overall'] = 'degraded'
# Check TTS Server
tts_server_url = current_app.config.get('TTS_SERVER_URL')
if tts_server_url:
try:
import requests
# Extract base URL from the speech endpoint
base_url = tts_server_url.rsplit('/v1/audio/speech', 1)[0] if '/v1/audio/speech' in tts_server_url else tts_server_url
health_url = f"{base_url}/health" if not tts_server_url.endswith('/health') else tts_server_url
response = requests.get(health_url, timeout=2)
if response.status_code == 200:
health['tts'] = 'healthy'
health['tts_details'] = response.json() if response.headers.get('content-type') == 'application/json' else {}
elif response.status_code == 404:
# Try voices endpoint as fallback
voices_url = f"{base_url}/voices" if base_url else f"{tts_server_url.rsplit('/speech', 1)[0]}/voices"
voices_response = requests.get(voices_url, timeout=2)
if voices_response.status_code == 200:
health['tts'] = 'healthy'
else:
health['tts'] = 'unhealthy'
health['overall'] = 'degraded'
else:
health['tts'] = 'unhealthy'
health['overall'] = 'degraded'
except requests.exceptions.RequestException:
health['tts'] = 'unreachable'
health['overall'] = 'degraded'
except Exception as e:
health['tts'] = 'error'
health['overall'] = 'degraded'
logger.error(f"TTS health check error: {e}")
else:
health['tts'] = 'not_configured'
# TTS is optional, so don't degrade overall health
return health
# TTS Server Status endpoint
@admin_bp.route('/api/tts/status')
@admin_required
def get_tts_status():
"""Get detailed TTS server status"""
try:
tts_info = {
'configured': False,
'status': 'not_configured',
'server_url': None,
'api_key_configured': False,
'details': {}
}
# Check configuration
tts_server_url = current_app.config.get('TTS_SERVER_URL')
tts_api_key = current_app.config.get('TTS_API_KEY')
if tts_server_url:
tts_info['configured'] = True
tts_info['server_url'] = tts_server_url
tts_info['api_key_configured'] = bool(tts_api_key)
# Try to get detailed status
try:
import requests
headers = {}
if tts_api_key:
headers['Authorization'] = f'Bearer {tts_api_key}'
# Check health endpoint
# Extract base URL from the speech endpoint
base_url = tts_server_url.rsplit('/v1/audio/speech', 1)[0] if '/v1/audio/speech' in tts_server_url else tts_server_url
health_url = f"{base_url}/health" if not tts_server_url.endswith('/health') else tts_server_url
response = requests.get(health_url, headers=headers, timeout=3)
if response.status_code == 200:
tts_info['status'] = 'healthy'
if response.headers.get('content-type') == 'application/json':
tts_info['details'] = response.json()
else:
tts_info['status'] = 'unhealthy'
tts_info['details']['error'] = f'Health check returned status {response.status_code}'
# Try to get voice list
try:
voices_url = f"{base_url}/voices" if base_url else f"{tts_server_url.rsplit('/speech', 1)[0]}/voices"
voices_response = requests.get(voices_url, headers=headers, timeout=3)
if voices_response.status_code == 200 and voices_response.headers.get('content-type') == 'application/json':
voices_data = voices_response.json()
tts_info['details']['available_voices'] = voices_data.get('voices', [])
tts_info['details']['voice_count'] = len(voices_data.get('voices', []))
# If we can get voices, consider the server healthy even if health endpoint doesn't exist
if tts_info['status'] == 'unhealthy' and response.status_code == 404:
tts_info['status'] = 'healthy'
tts_info['details'].pop('error', None)
except:
pass
except requests.exceptions.ConnectionError:
tts_info['status'] = 'unreachable'
tts_info['details']['error'] = 'Cannot connect to TTS server'
except requests.exceptions.Timeout:
tts_info['status'] = 'timeout'
tts_info['details']['error'] = 'TTS server request timed out'
except Exception as e:
tts_info['status'] = 'error'
tts_info['details']['error'] = str(e)
# Get recent TTS usage stats from Redis
if redis_client:
try:
now = datetime.now()
tts_info['usage'] = {
'total': int(redis_client.get('stats:tts:total') or 0),
'today': int(redis_client.get(f'stats:tts:daily:{now.strftime("%Y-%m-%d")}') or 0),
'this_hour': int(redis_client.get(f'stats:tts:hourly:{now.strftime("%Y-%m-%d-%H")}') or 0)
}
# Get recent response times
response_times = redis_client.lrange('stats:response_times:tts', -100, -1)
if response_times:
times = [float(t) for t in response_times]
tts_info['performance'] = {
'avg_response_time': round(sum(times) / len(times), 2),
'min_response_time': round(min(times), 2),
'max_response_time': round(max(times), 2)
}
except Exception as e:
logger.error(f"Error getting TTS stats from Redis: {e}")
return jsonify(tts_info)
except Exception as e:
logger.error(f"Error in get_tts_status: {e}")
return jsonify({'error': str(e)}), 500
# WebSocket support for real-time updates (using Server-Sent Events as fallback)
@admin_bp.route('/api/stream/updates')
@admin_required
def stream_updates():
"""Stream real-time updates using Server-Sent Events"""
def generate():
last_update = time.time()
while True:
# Send update every 5 seconds
if time.time() - last_update > 5:
try:
# Get current stats
stats = {
'timestamp': datetime.now().isoformat(),
'requests_per_minute': 0,
'active_sessions': 0,
'recent_errors': 0
}
if redis_client:
# Current requests per minute
current_minute = datetime.now().strftime('%Y-%m-%d-%H-%M')
stats['requests_per_minute'] = int(redis_client.get(f'stats:requests:minute:{current_minute}') or 0)
# Active sessions
stats['active_sessions'] = len(redis_client.keys('session:*'))
# Recent errors
current_hour = datetime.now().strftime('%Y-%m-%d-%H')
stats['recent_errors'] = int(redis_client.get(f'stats:errors:hourly:{current_hour}') or 0)
yield f"data: {json.dumps(stats)}\n\n"
last_update = time.time()
except Exception as e:
logger.error(f"Error in stream_updates: {e}")
yield f"data: {json.dumps({'error': str(e)})}\n\n"
time.sleep(1)
return current_app.response_class(
generate(),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)