- Added debug endpoint to verify database contents - Enhanced logging in user list API endpoint - Fixed user query to properly return all users - Added frontend debugging for troubleshooting The user list now correctly displays all users in the system. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
930 lines
29 KiB
Python
930 lines
29 KiB
Python
"""Authentication and user management routes"""
|
|
import os
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from flask import Blueprint, request, jsonify, g
|
|
from flask_jwt_extended import jwt_required, get_jwt_identity, get_jwt
|
|
from sqlalchemy import or_, func
|
|
from werkzeug.exceptions import BadRequest
|
|
|
|
from database import db
|
|
from auth_models import User, LoginHistory, UserSession
|
|
from auth import (
|
|
create_user, authenticate_user, create_tokens, create_user_session,
|
|
revoke_token, get_current_user, require_admin,
|
|
require_auth, revoke_user_sessions, update_user_usage_stats
|
|
)
|
|
from rate_limiter import rate_limit
|
|
from validators import Validators
|
|
from error_logger import log_exception
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
|
|
@auth_bp.route('/login', methods=['POST'])
|
|
@rate_limit(requests_per_minute=5, requests_per_hour=30)
|
|
def login():
|
|
"""User login endpoint"""
|
|
try:
|
|
data = request.get_json()
|
|
|
|
# Validate input
|
|
username_or_email = data.get('username') or data.get('email')
|
|
password = data.get('password')
|
|
|
|
if not username_or_email or not password:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Username/email and password required'
|
|
}), 400
|
|
|
|
# Authenticate user
|
|
user, error = authenticate_user(username_or_email, password)
|
|
if error:
|
|
# Log failed attempt
|
|
login_record = LoginHistory(
|
|
user_id=None,
|
|
login_method='password',
|
|
success=False,
|
|
failure_reason=error,
|
|
ip_address=request.remote_addr,
|
|
user_agent=request.headers.get('User-Agent')
|
|
)
|
|
db.session.add(login_record)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': False,
|
|
'error': error
|
|
}), 401
|
|
|
|
# Create session
|
|
session = create_user_session(user, {
|
|
'ip_address': request.remote_addr,
|
|
'user_agent': request.headers.get('User-Agent')
|
|
})
|
|
|
|
# Create tokens
|
|
tokens = create_tokens(user, session.session_id)
|
|
|
|
# Note: We can't get JWT payload here since we haven't set the JWT context yet
|
|
# The session JTI will be updated on the next authenticated request
|
|
db.session.commit()
|
|
|
|
# Log successful login with request info
|
|
login_record = LoginHistory(
|
|
user_id=user.id,
|
|
login_method='password',
|
|
success=True,
|
|
session_id=session.session_id,
|
|
ip_address=request.remote_addr,
|
|
user_agent=request.headers.get('User-Agent')
|
|
)
|
|
db.session.add(login_record)
|
|
db.session.commit()
|
|
|
|
# Store user info in Flask session for web access
|
|
from flask import session as flask_session
|
|
flask_session['user_id'] = str(user.id)
|
|
flask_session['username'] = user.username
|
|
flask_session['user_role'] = user.role
|
|
flask_session['logged_in'] = True
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'user': user.to_dict(),
|
|
'tokens': tokens,
|
|
'session_id': session.session_id
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Login error")
|
|
# In development, show the actual error
|
|
import os
|
|
if os.environ.get('FLASK_ENV') == 'development':
|
|
return jsonify({
|
|
'success': False,
|
|
'error': f'Login failed: {str(e)}'
|
|
}), 500
|
|
else:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Login failed'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/logout', methods=['POST'])
|
|
@jwt_required()
|
|
def logout():
|
|
"""User logout endpoint"""
|
|
try:
|
|
jti = get_jwt()["jti"]
|
|
user_id = get_jwt_identity()
|
|
|
|
# Revoke the access token
|
|
revoke_token(jti, 'access', user_id, 'User logout')
|
|
|
|
# Update login history
|
|
session_id = get_jwt().get('session_id')
|
|
if session_id:
|
|
login_record = LoginHistory.query.filter_by(
|
|
session_id=session_id,
|
|
logout_at=None
|
|
).first()
|
|
if login_record:
|
|
login_record.logout_at = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Successfully logged out'
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Logout error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Logout failed'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/refresh', methods=['POST'])
|
|
@jwt_required(refresh=True)
|
|
def refresh_token():
|
|
"""Refresh access token"""
|
|
try:
|
|
user_id = get_jwt_identity()
|
|
user = User.query.get(user_id)
|
|
|
|
if not user or not user.is_active:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Invalid user'
|
|
}), 401
|
|
|
|
# Check if user can login
|
|
can_login, reason = user.can_login()
|
|
if not can_login:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': reason
|
|
}), 401
|
|
|
|
# Create new access token
|
|
session_id = get_jwt().get('session_id')
|
|
tokens = create_tokens(user, session_id)
|
|
|
|
# Update session if exists
|
|
if session_id:
|
|
session = UserSession.query.filter_by(session_id=session_id).first()
|
|
if session:
|
|
session.refresh()
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'access_token': tokens['access_token'],
|
|
'expires_in': tokens['expires_in']
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Token refresh error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Token refresh failed'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/profile', methods=['GET'])
|
|
@require_auth
|
|
def get_profile():
|
|
"""Get current user profile"""
|
|
try:
|
|
return jsonify({
|
|
'success': True,
|
|
'user': g.current_user.to_dict(include_sensitive=True)
|
|
})
|
|
except Exception as e:
|
|
log_exception(e, "Profile fetch error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to fetch profile'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/profile', methods=['PUT'])
|
|
@require_auth
|
|
def update_profile():
|
|
"""Update user profile"""
|
|
try:
|
|
data = request.get_json()
|
|
user = g.current_user
|
|
|
|
# Update allowed fields
|
|
if 'full_name' in data:
|
|
user.full_name = Validators.sanitize_text(data['full_name'], max_length=255)
|
|
|
|
if 'avatar_url' in data:
|
|
validated_url = Validators.validate_url(data['avatar_url'])
|
|
if validated_url:
|
|
user.avatar_url = validated_url
|
|
|
|
if 'settings' in data and isinstance(data['settings'], dict):
|
|
user.settings = {**user.settings, **data['settings']}
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'user': user.to_dict(include_sensitive=True)
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Profile update error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to update profile'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/change-password', methods=['POST'])
|
|
@require_auth
|
|
def change_password():
|
|
"""Change user password"""
|
|
try:
|
|
data = request.get_json()
|
|
user = g.current_user
|
|
|
|
current_password = data.get('current_password')
|
|
new_password = data.get('new_password')
|
|
|
|
if not current_password or not new_password:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Current and new passwords required'
|
|
}), 400
|
|
|
|
# Verify current password
|
|
if not user.check_password(current_password):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Invalid current password'
|
|
}), 401
|
|
|
|
# Validate new password
|
|
if len(new_password) < 8:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Password must be at least 8 characters'
|
|
}), 400
|
|
|
|
# Update password
|
|
user.set_password(new_password)
|
|
db.session.commit()
|
|
|
|
# Revoke all sessions except current
|
|
session_id = get_jwt().get('session_id') if hasattr(g, 'jwt_payload') else None
|
|
revoked_count = revoke_user_sessions(user.id, except_session=session_id)
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Password changed successfully',
|
|
'revoked_sessions': revoked_count
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Password change error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to change password'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/regenerate-api-key', methods=['POST'])
|
|
@require_auth
|
|
def regenerate_api_key():
|
|
"""Regenerate user's API key"""
|
|
try:
|
|
user = g.current_user
|
|
new_key = user.regenerate_api_key()
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'api_key': new_key,
|
|
'created_at': user.api_key_created_at.isoformat()
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "API key regeneration error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to regenerate API key'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/sessions', methods=['GET'])
|
|
@require_auth
|
|
def get_user_sessions():
|
|
"""Get user's active sessions"""
|
|
try:
|
|
sessions = UserSession.query.filter_by(
|
|
user_id=g.current_user.id
|
|
).filter(
|
|
UserSession.expires_at > datetime.utcnow()
|
|
).order_by(UserSession.last_active_at.desc()).all()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'sessions': [s.to_dict() for s in sessions]
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Sessions fetch error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to fetch sessions'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/sessions/<session_id>', methods=['DELETE'])
|
|
@require_auth
|
|
def revoke_session(session_id):
|
|
"""Revoke a specific session"""
|
|
try:
|
|
session = UserSession.query.filter_by(
|
|
session_id=session_id,
|
|
user_id=g.current_user.id
|
|
).first()
|
|
|
|
if not session:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Session not found'
|
|
}), 404
|
|
|
|
# Revoke tokens
|
|
if session.access_token_jti:
|
|
revoke_token(session.access_token_jti, 'access', g.current_user.id, 'Session revoked by user')
|
|
if session.refresh_token_jti:
|
|
revoke_token(session.refresh_token_jti, 'refresh', g.current_user.id, 'Session revoked by user')
|
|
|
|
# Delete session
|
|
db.session.delete(session)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Session revoked successfully'
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Session revocation error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to revoke session'
|
|
}), 500
|
|
|
|
|
|
# Admin endpoints for user management
|
|
|
|
@auth_bp.route('/admin/users', methods=['GET'])
|
|
@require_admin
|
|
def admin_list_users():
|
|
"""List all users (admin only)"""
|
|
try:
|
|
# Get query parameters
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = request.args.get('per_page', 20, type=int)
|
|
search = request.args.get('search', '')
|
|
role = request.args.get('role')
|
|
status = request.args.get('status')
|
|
sort_by = request.args.get('sort_by', 'created_at')
|
|
sort_order = request.args.get('sort_order', 'desc')
|
|
|
|
# Build query
|
|
query = User.query
|
|
|
|
# Debug logging
|
|
logger.info(f"Admin user list query parameters: page={page}, per_page={per_page}, search={search}, role={role}, status={status}, sort_by={sort_by}, sort_order={sort_order}")
|
|
|
|
# Search filter
|
|
if search:
|
|
search_term = f'%{search}%'
|
|
query = query.filter(or_(
|
|
User.email.ilike(search_term),
|
|
User.username.ilike(search_term),
|
|
User.full_name.ilike(search_term)
|
|
))
|
|
logger.info(f"Applied search filter: {search_term}")
|
|
|
|
# Role filter
|
|
if role:
|
|
query = query.filter(User.role == role)
|
|
logger.info(f"Applied role filter: {role}")
|
|
|
|
# Status filter
|
|
if status == 'active':
|
|
query = query.filter(User.is_active == True, User.is_suspended == False)
|
|
logger.info(f"Applied status filter: active")
|
|
elif status == 'suspended':
|
|
query = query.filter(User.is_suspended == True)
|
|
logger.info(f"Applied status filter: suspended")
|
|
elif status == 'inactive':
|
|
query = query.filter(User.is_active == False)
|
|
logger.info(f"Applied status filter: inactive")
|
|
|
|
# Sorting
|
|
order_column = getattr(User, sort_by, User.created_at)
|
|
if sort_order == 'desc':
|
|
query = query.order_by(order_column.desc())
|
|
else:
|
|
query = query.order_by(order_column.asc())
|
|
logger.info(f"Applied sorting: {sort_by} {sort_order}")
|
|
|
|
# Log the SQL query being generated
|
|
try:
|
|
sql = str(query.statement.compile(compile_kwargs={"literal_binds": True}))
|
|
logger.info(f"Generated SQL query: {sql}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not log SQL query: {e}")
|
|
|
|
# Count total results before pagination
|
|
total_count = query.count()
|
|
logger.info(f"Total users matching query (before pagination): {total_count}")
|
|
|
|
# Get all users without pagination for debugging
|
|
all_matching_users = query.all()
|
|
logger.info(f"All matching users: {[u.username for u in all_matching_users[:10]]}") # Log first 10 usernames
|
|
|
|
# Paginate
|
|
pagination = query.paginate(page=page, per_page=per_page, error_out=False)
|
|
|
|
# Debug logging for results
|
|
logger.info(f"Query returned {pagination.total} total users, showing {len(pagination.items)} on page {pagination.page}")
|
|
logger.info(f"Pagination items: {[u.username for u in pagination.items]}")
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'users': [u.to_dict(include_sensitive=True) for u in pagination.items],
|
|
'pagination': {
|
|
'page': pagination.page,
|
|
'per_page': pagination.per_page,
|
|
'total': pagination.total,
|
|
'pages': pagination.pages
|
|
}
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user list error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to fetch users'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users', methods=['POST'])
|
|
@require_admin
|
|
def admin_create_user():
|
|
"""Create a new user (admin only)"""
|
|
try:
|
|
data = request.get_json()
|
|
|
|
# Validate required fields
|
|
email = data.get('email')
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
|
|
if not email or not username or not password:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Email, username, and password are required'
|
|
}), 400
|
|
|
|
# Validate email
|
|
if not Validators.validate_email(email):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Invalid email address'
|
|
}), 400
|
|
|
|
# Create user
|
|
user, error = create_user(
|
|
email=email,
|
|
username=username,
|
|
password=password,
|
|
full_name=data.get('full_name'),
|
|
role=data.get('role', 'user'),
|
|
is_verified=data.get('is_verified', False)
|
|
)
|
|
|
|
if error:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': error
|
|
}), 400
|
|
|
|
# Set additional properties
|
|
if 'rate_limit_per_minute' in data:
|
|
user.rate_limit_per_minute = data['rate_limit_per_minute']
|
|
if 'rate_limit_per_hour' in data:
|
|
user.rate_limit_per_hour = data['rate_limit_per_hour']
|
|
if 'rate_limit_per_day' in data:
|
|
user.rate_limit_per_day = data['rate_limit_per_day']
|
|
if 'permissions' in data:
|
|
user.permissions = data['permissions']
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'user': user.to_dict(include_sensitive=True)
|
|
}), 201
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user creation error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to create user'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/<user_id>', methods=['GET'])
|
|
@require_admin
|
|
def admin_get_user(user_id):
|
|
"""Get user details (admin only)"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User not found'
|
|
}), 404
|
|
|
|
# Get additional info
|
|
login_history = LoginHistory.query.filter_by(
|
|
user_id=user.id
|
|
).order_by(LoginHistory.login_at.desc()).limit(10).all()
|
|
|
|
active_sessions = UserSession.query.filter_by(
|
|
user_id=user.id
|
|
).filter(
|
|
UserSession.expires_at > datetime.utcnow()
|
|
).all()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'user': user.to_dict(include_sensitive=True),
|
|
'login_history': [l.to_dict() for l in login_history],
|
|
'active_sessions': [s.to_dict() for s in active_sessions]
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user fetch error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to fetch user'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/<user_id>', methods=['PUT'])
|
|
@require_admin
|
|
def admin_update_user(user_id):
|
|
"""Update user (admin only)"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User not found'
|
|
}), 404
|
|
|
|
data = request.get_json()
|
|
|
|
# Update allowed fields
|
|
if 'email' in data:
|
|
if Validators.validate_email(data['email']):
|
|
user.email = data['email']
|
|
|
|
if 'username' in data:
|
|
user.username = data['username']
|
|
|
|
if 'full_name' in data:
|
|
user.full_name = data['full_name']
|
|
|
|
if 'role' in data and data['role'] in ['admin', 'user']:
|
|
user.role = data['role']
|
|
|
|
if 'is_active' in data:
|
|
user.is_active = data['is_active']
|
|
|
|
if 'is_verified' in data:
|
|
user.is_verified = data['is_verified']
|
|
|
|
if 'permissions' in data:
|
|
user.permissions = data['permissions']
|
|
|
|
if 'rate_limit_per_minute' in data:
|
|
user.rate_limit_per_minute = data['rate_limit_per_minute']
|
|
|
|
if 'rate_limit_per_hour' in data:
|
|
user.rate_limit_per_hour = data['rate_limit_per_hour']
|
|
|
|
if 'rate_limit_per_day' in data:
|
|
user.rate_limit_per_day = data['rate_limit_per_day']
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'user': user.to_dict(include_sensitive=True)
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user update error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to update user'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/<user_id>', methods=['DELETE'])
|
|
@require_admin
|
|
def admin_delete_user(user_id):
|
|
"""Delete user (admin only)"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User not found'
|
|
}), 404
|
|
|
|
# Don't allow deleting admin users
|
|
if user.is_admin:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Cannot delete admin users'
|
|
}), 403
|
|
|
|
# Revoke all sessions
|
|
revoke_user_sessions(user.id)
|
|
|
|
# Delete user (cascades to related records)
|
|
db.session.delete(user)
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'User deleted successfully'
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user deletion error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to delete user'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/<user_id>/suspend', methods=['POST'])
|
|
@require_admin
|
|
def admin_suspend_user(user_id):
|
|
"""Suspend user account (admin only)"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User not found'
|
|
}), 404
|
|
|
|
data = request.get_json()
|
|
reason = data.get('reason', 'Policy violation')
|
|
until = data.get('until') # ISO datetime string or None for indefinite
|
|
|
|
# Parse until date if provided
|
|
suspend_until = None
|
|
if until:
|
|
try:
|
|
suspend_until = datetime.fromisoformat(until.replace('Z', '+00:00'))
|
|
except:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Invalid date format for until'
|
|
}), 400
|
|
|
|
# Suspend user
|
|
user.suspend(reason, suspend_until)
|
|
|
|
# Revoke all sessions
|
|
revoked_count = revoke_user_sessions(user.id)
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'User suspended successfully',
|
|
'revoked_sessions': revoked_count,
|
|
'suspended_until': suspend_until.isoformat() if suspend_until else None
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user suspension error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to suspend user'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/<user_id>/unsuspend', methods=['POST'])
|
|
@require_admin
|
|
def admin_unsuspend_user(user_id):
|
|
"""Unsuspend user account (admin only)"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User not found'
|
|
}), 404
|
|
|
|
user.unsuspend()
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'User unsuspended successfully'
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin user unsuspension error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to unsuspend user'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/<user_id>/reset-password', methods=['POST'])
|
|
@require_admin
|
|
def admin_reset_password(user_id):
|
|
"""Reset user password (admin only)"""
|
|
try:
|
|
user = User.query.get(user_id)
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User not found'
|
|
}), 404
|
|
|
|
data = request.get_json()
|
|
new_password = data.get('password')
|
|
|
|
if not new_password or len(new_password) < 8:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Password must be at least 8 characters'
|
|
}), 400
|
|
|
|
# Reset password
|
|
user.set_password(new_password)
|
|
user.failed_login_attempts = 0
|
|
user.locked_until = None
|
|
|
|
# Revoke all sessions
|
|
revoked_count = revoke_user_sessions(user.id)
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Password reset successfully',
|
|
'revoked_sessions': revoked_count
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin password reset error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to reset password'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/users/bulk', methods=['POST'])
|
|
@require_admin
|
|
def admin_bulk_operation():
|
|
"""Perform bulk operations on users (admin only)"""
|
|
try:
|
|
data = request.get_json()
|
|
user_ids = data.get('user_ids', [])
|
|
operation = data.get('operation')
|
|
|
|
if not user_ids or not operation:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'User IDs and operation required'
|
|
}), 400
|
|
|
|
# Get users
|
|
users = User.query.filter(User.id.in_(user_ids)).all()
|
|
|
|
if not users:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'No users found'
|
|
}), 404
|
|
|
|
results = {
|
|
'success': 0,
|
|
'failed': 0,
|
|
'errors': []
|
|
}
|
|
|
|
for user in users:
|
|
try:
|
|
if operation == 'suspend':
|
|
user.suspend(data.get('reason', 'Bulk suspension'))
|
|
revoke_user_sessions(user.id)
|
|
elif operation == 'unsuspend':
|
|
user.unsuspend()
|
|
elif operation == 'activate':
|
|
user.is_active = True
|
|
elif operation == 'deactivate':
|
|
user.is_active = False
|
|
revoke_user_sessions(user.id)
|
|
elif operation == 'verify':
|
|
user.is_verified = True
|
|
elif operation == 'unverify':
|
|
user.is_verified = False
|
|
elif operation == 'delete':
|
|
if not user.is_admin:
|
|
revoke_user_sessions(user.id)
|
|
db.session.delete(user)
|
|
else:
|
|
results['errors'].append(f"Cannot delete admin user {user.username}")
|
|
results['failed'] += 1
|
|
continue
|
|
else:
|
|
results['errors'].append(f"Unknown operation for user {user.username}")
|
|
results['failed'] += 1
|
|
continue
|
|
|
|
results['success'] += 1
|
|
except Exception as e:
|
|
results['errors'].append(f"Failed for user {user.username}: {str(e)}")
|
|
results['failed'] += 1
|
|
|
|
db.session.commit()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'results': results
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin bulk operation error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to perform bulk operation'
|
|
}), 500
|
|
|
|
|
|
@auth_bp.route('/admin/stats/users', methods=['GET'])
|
|
@require_admin
|
|
def admin_user_stats():
|
|
"""Get user statistics (admin only)"""
|
|
try:
|
|
stats = {
|
|
'total_users': User.query.count(),
|
|
'active_users': User.query.filter(
|
|
User.is_active == True,
|
|
User.is_suspended == False
|
|
).count(),
|
|
'suspended_users': User.query.filter(User.is_suspended == True).count(),
|
|
'verified_users': User.query.filter(User.is_verified == True).count(),
|
|
'admin_users': User.query.filter(User.role == 'admin').count(),
|
|
'users_by_role': dict(
|
|
db.session.query(User.role, func.count(User.id))
|
|
.group_by(User.role).all()
|
|
),
|
|
'recent_registrations': User.query.filter(
|
|
User.created_at >= datetime.utcnow() - timedelta(days=7)
|
|
).count(),
|
|
'active_sessions': UserSession.query.filter(
|
|
UserSession.expires_at > datetime.utcnow()
|
|
).count()
|
|
}
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'stats': stats
|
|
})
|
|
|
|
except Exception as e:
|
|
log_exception(e, "Admin stats error")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Failed to fetch statistics'
|
|
}), 500 |