#!/usr/bin/env python3 """ Portainer Core MCP Server This module provides the core MCP server for Portainer Business Edition, combining essential user management and teams/RBAC functionality into a single unified server. The server implements comprehensive management capabilities including: - User authentication and session management - User CRUD operations with role assignments - Teams creation and membership management - Role-based access control (RBAC) configuration - Resource access controls for fine-grained permissions - System settings management This core server serves as the foundation for Portainer management, providing the essential identity and access management features that other specialized servers (Docker, Kubernetes, Edge, etc.) depend on. Complexity: O(1) for all operations (simple HTTP requests) Dependencies: httpx for async HTTP, mcp for server protocol Call Flow: MCP client -> handle_call_tool() -> make_request() -> Portainer API Environment Variables: PORTAINER_URL: Base URL of Portainer instance PORTAINER_API_KEY: API key for authentication MCP_MODE: Set to "true" to suppress logging """ import os import sys import json import asyncio from typing import Any, Dict, List, Optional # Suppress all logging to stderr os.environ["MCP_MODE"] = "true" import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions # Create server server = Server("portainer-core") # Store for our state portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live") api_key = os.getenv("PORTAINER_API_KEY", "") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools.""" return [ # Core tools types.Tool( name="test_connection", description="Test connection to Portainer", inputSchema={ "type": "object", "properties": {}, "required": [] } ), # User Management tools types.Tool( name="get_users", description="Get list of Portainer users", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="create_user", description="Create a new Portainer user", inputSchema={ "type": "object", "properties": { "username": { "type": "string", "description": "Username for the new user" }, "password": { "type": "string", "description": "Password for the new user" }, "role": { "type": "string", "description": "User role", "enum": ["Administrator", "StandardUser", "ReadOnlyUser"], "default": "StandardUser" } }, "required": ["username", "password"] } ), types.Tool( name="update_user", description="Update an existing user", inputSchema={ "type": "object", "properties": { "user_id": { "type": "integer", "description": "ID of the user to update" }, "password": { "type": "string", "description": "New password (optional)" }, "role": { "type": "string", "description": "New role (optional)", "enum": ["Administrator", "StandardUser", "ReadOnlyUser"] } }, "required": ["user_id"] } ), types.Tool( name="delete_user", description="Delete a user", inputSchema={ "type": "object", "properties": { "user_id": { "type": "integer", "description": "ID of the user to delete" } }, "required": ["user_id"] } ), # Teams Management tools types.Tool( name="get_teams", description="Get list of teams", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="create_team", description="Create a new team", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Name of the team" }, "leaders": { "type": "array", "items": {"type": "integer"}, "description": "Array of user IDs to be team leaders (optional)" } }, "required": ["name"] } ), types.Tool( name="add_team_members", description="Add members to a team", inputSchema={ "type": "object", "properties": { "team_id": { "type": "integer", "description": "ID of the team" }, "user_ids": { "type": "array", "items": {"type": "integer"}, "description": "Array of user IDs to add to the team" } }, "required": ["team_id", "user_ids"] } ), types.Tool( name="remove_team_members", description="Remove members from a team", inputSchema={ "type": "object", "properties": { "team_id": { "type": "integer", "description": "ID of the team" }, "user_ids": { "type": "array", "items": {"type": "integer"}, "description": "Array of user IDs to remove from the team" } }, "required": ["team_id", "user_ids"] } ), types.Tool( name="delete_team", description="Delete a team", inputSchema={ "type": "object", "properties": { "team_id": { "type": "integer", "description": "ID of the team to delete" } }, "required": ["team_id"] } ), # RBAC tools types.Tool( name="get_roles", description="Get available roles", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="get_resource_controls", description="Get resource access controls", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="create_resource_control", description="Create resource access control", inputSchema={ "type": "object", "properties": { "resource_id": { "type": "string", "description": "ID of the resource" }, "resource_type": { "type": "string", "description": "Type of resource (container, service, volume, etc.)" }, "public": { "type": "boolean", "description": "Whether the resource is public", "default": False }, "administrators_only": { "type": "boolean", "description": "Whether only administrators can access", "default": False }, "users": { "type": "array", "items": { "type": "object", "properties": { "user_id": {"type": "integer"}, "access_level": {"type": "string", "enum": ["read", "write"]} } }, "description": "User access list" }, "teams": { "type": "array", "items": { "type": "object", "properties": { "team_id": {"type": "integer"}, "access_level": {"type": "string", "enum": ["read", "write"]} } }, "description": "Team access list" } }, "required": ["resource_id", "resource_type"] } ), # Settings tools types.Tool( name="get_settings", description="Get Portainer settings", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="update_settings", description="Update Portainer settings", inputSchema={ "type": "object", "properties": { "allow_volume_browser": { "type": "boolean", "description": "Allow users to browse volumes" }, "allow_bind_mounts": { "type": "boolean", "description": "Allow regular users to use bind mounts" }, "allow_privileged_mode": { "type": "boolean", "description": "Allow regular users to use privileged mode" }, "allow_stack_management": { "type": "boolean", "description": "Allow regular users to manage stacks" } }, "required": [] } ) ] async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]: """Make HTTP request to Portainer API.""" import httpx async with httpx.AsyncClient(verify=False) as client: headers = {"X-API-Key": api_key} if api_key else {} if method == "GET": response = await client.get(f"{portainer_url}/api{endpoint}", headers=headers) elif method == "POST": response = await client.post(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "PUT": response = await client.put(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "DELETE": response = await client.delete(f"{portainer_url}/api{endpoint}", headers=headers) else: raise ValueError(f"Unsupported method: {method}") return {"status_code": response.status_code, "data": response.json() if response.text else None, "text": response.text} def convert_role(role): """Convert between string and integer role representations.""" if isinstance(role, int): role_map = {1: "Administrator", 2: "StandardUser", 3: "ReadOnlyUser"} return role_map.get(role, f"Unknown({role})") else: role_map = {"Administrator": 1, "StandardUser": 2, "ReadOnlyUser": 3} return role_map.get(role, 2) @server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]: """Handle tool calls.""" try: # Core tools if name == "test_connection": result = await make_request("GET", "/status") if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ Connected to Portainer at {portainer_url}")] else: return [types.TextContent(type="text", text=f"✗ Failed to connect: HTTP {result['status_code']}")] # User Management elif name == "get_users": result = await make_request("GET", "/users") if result["status_code"] == 200: users = result["data"] output = f"Found {len(users)} users:\n" for user in users: role = convert_role(user.get("Role", "Unknown")) output += f"- ID: {user.get('Id')}, Username: {user.get('Username')}, Role: {role}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get users: HTTP {result['status_code']}")] elif name == "create_user": username = arguments.get("username") password = arguments.get("password") role = arguments.get("role", "StandardUser") if not username or not password: return [types.TextContent(type="text", text="Error: Username and password are required")] role_int = convert_role(role) # Try with integer role first result = await make_request("POST", "/users", { "Username": username, "Password": password, "Role": role_int }) if result["status_code"] == 409: return [types.TextContent(type="text", text=f"User '{username}' already exists")] elif result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ User '{username}' created successfully with role {role}")] else: # Try with string role result = await make_request("POST", "/users", { "Username": username, "Password": password, "Role": role }) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ User '{username}' created successfully with role {role}")] else: return [types.TextContent(type="text", text=f"Failed to create user: HTTP {result['status_code']} - {result['text']}")] elif name == "update_user": user_id = arguments.get("user_id") if not user_id: return [types.TextContent(type="text", text="Error: user_id is required")] update_data = {} if "password" in arguments: update_data["Password"] = arguments["password"] if "role" in arguments: update_data["Role"] = convert_role(arguments["role"]) result = await make_request("PUT", f"/users/{user_id}", update_data) if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ User {user_id} updated successfully")] else: return [types.TextContent(type="text", text=f"Failed to update user: HTTP {result['status_code']}")] elif name == "delete_user": user_id = arguments.get("user_id") if not user_id: return [types.TextContent(type="text", text="Error: user_id is required")] result = await make_request("DELETE", f"/users/{user_id}") if result["status_code"] in [200, 204]: return [types.TextContent(type="text", text=f"✓ User {user_id} deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete user: HTTP {result['status_code']}")] # Teams Management elif name == "get_teams": result = await make_request("GET", "/teams") if result["status_code"] == 200: teams = result["data"] output = f"Found {len(teams)} teams:\n" for team in teams: output += f"- ID: {team.get('Id')}, Name: {team.get('Name')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get teams: HTTP {result['status_code']}")] elif name == "create_team": team_name = arguments.get("name") if not team_name: return [types.TextContent(type="text", text="Error: Team name is required")] team_data = { "Name": team_name, "Leaders": arguments.get("leaders", []) } result = await make_request("POST", "/teams", team_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Team '{team_name}' created successfully")] else: return [types.TextContent(type="text", text=f"Failed to create team: HTTP {result['status_code']} - {result['text']}")] elif name == "add_team_members": team_id = arguments.get("team_id") user_ids = arguments.get("user_ids", []) if not team_id: return [types.TextContent(type="text", text="Error: team_id is required")] if not user_ids: return [types.TextContent(type="text", text="Error: user_ids array is required")] result = await make_request("POST", f"/teams/{team_id}/memberships", {"UserIds": user_ids}) if result["status_code"] in [200, 201, 204]: return [types.TextContent(type="text", text=f"✓ Added {len(user_ids)} users to team {team_id}")] else: return [types.TextContent(type="text", text=f"Failed to add team members: HTTP {result['status_code']}")] elif name == "remove_team_members": team_id = arguments.get("team_id") user_ids = arguments.get("user_ids", []) if not team_id: return [types.TextContent(type="text", text="Error: team_id is required")] if not user_ids: return [types.TextContent(type="text", text="Error: user_ids array is required")] result = await make_request("DELETE", f"/teams/{team_id}/memberships", {"UserIds": user_ids}) if result["status_code"] in [200, 204]: return [types.TextContent(type="text", text=f"✓ Removed {len(user_ids)} users from team {team_id}")] else: return [types.TextContent(type="text", text=f"Failed to remove team members: HTTP {result['status_code']}")] elif name == "delete_team": team_id = arguments.get("team_id") if not team_id: return [types.TextContent(type="text", text="Error: team_id is required")] result = await make_request("DELETE", f"/teams/{team_id}") if result["status_code"] in [200, 204]: return [types.TextContent(type="text", text=f"✓ Team {team_id} deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete team: HTTP {result['status_code']}")] # RBAC tools elif name == "get_roles": result = await make_request("GET", "/roles") if result["status_code"] == 200: roles = result["data"] output = "Available roles:\n" for role in roles: output += f"- ID: {role.get('Id')}, Name: {role.get('Name')}, Priority: {role.get('Priority')}\n" if role.get('Description'): output += f" Description: {role.get('Description')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get roles: HTTP {result['status_code']}")] elif name == "get_resource_controls": result = await make_request("GET", "/resource_controls") if result["status_code"] == 200: controls = result["data"] output = f"Found {len(controls)} resource controls:\n" for control in controls: output += f"- ID: {control.get('Id')}, Resource: {control.get('ResourceId')}, " output += f"Type: {control.get('Type')}, Public: {control.get('Public')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get resource controls: HTTP {result['status_code']}")] elif name == "create_resource_control": resource_id = arguments.get("resource_id") resource_type = arguments.get("resource_type") if not resource_id or not resource_type: return [types.TextContent(type="text", text="Error: resource_id and resource_type are required")] control_data = { "ResourceId": resource_id, "Type": resource_type, "Public": arguments.get("public", False), "AdministratorsOnly": arguments.get("administrators_only", False), "UserAccesses": arguments.get("users", []), "TeamAccesses": arguments.get("teams", []) } result = await make_request("POST", "/resource_controls", control_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Resource control created for {resource_id}")] else: return [types.TextContent(type="text", text=f"Failed to create resource control: HTTP {result['status_code']}")] # Settings tools elif name == "get_settings": result = await make_request("GET", "/settings") if result["status_code"] == 200: settings = result["data"] output = "Portainer Settings:\n" output += f"- Allow Volume Browser: {settings.get('AllowVolumeBrowser', False)}\n" output += f"- Allow Bind Mounts: {settings.get('AllowBindMountsForRegularUsers', False)}\n" output += f"- Allow Privileged Mode: {settings.get('AllowPrivilegedModeForRegularUsers', False)}\n" output += f"- Allow Stack Management: {settings.get('AllowStackManagementForRegularUsers', False)}\n" output += f"- Authentication Method: {settings.get('AuthenticationMethod', 'Unknown')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get settings: HTTP {result['status_code']}")] elif name == "update_settings": # Get current settings first current = await make_request("GET", "/settings") if current["status_code"] != 200: return [types.TextContent(type="text", text=f"Failed to get current settings: HTTP {current['status_code']}")] settings_data = current["data"] # Update only provided fields if "allow_volume_browser" in arguments: settings_data["AllowVolumeBrowser"] = arguments["allow_volume_browser"] if "allow_bind_mounts" in arguments: settings_data["AllowBindMountsForRegularUsers"] = arguments["allow_bind_mounts"] if "allow_privileged_mode" in arguments: settings_data["AllowPrivilegedModeForRegularUsers"] = arguments["allow_privileged_mode"] if "allow_stack_management" in arguments: settings_data["AllowStackManagementForRegularUsers"] = arguments["allow_stack_management"] result = await make_request("PUT", "/settings", settings_data) if result["status_code"] == 200: return [types.TextContent(type="text", text="✓ Settings updated successfully")] else: return [types.TextContent(type="text", text=f"Failed to update settings: HTTP {result['status_code']}")] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] async def run(): """Run the MCP server.""" # Use stdio transport async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="portainer-core", server_version="1.0.0", capabilities=server.get_capabilities( notification_options=NotificationOptions( prompts_changed=False, resources_changed=False, tools_changed=False, ), experimental_capabilities={}, ), ), ) def main(): """Main entry point.""" asyncio.run(run()) if __name__ == "__main__": main()