#!/usr/bin/env python3 """Merged MCP server for Portainer Core + Teams functionality.""" import os import sys import json import asyncio from typing import Any, Dict, List, Optional # Suppress all logging to stderr os.environ["MCP_MODE"] = "true" import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions # Create server server = Server("portainer-unified") # Store for our state portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live") api_key = os.getenv("PORTAINER_API_KEY", "") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools.""" return [ # Core tools types.Tool( name="test_connection", description="Test connection to Portainer", inputSchema={ "type": "object", "properties": {}, "required": [] } ), # User Management tools types.Tool( name="get_users", description="Get list of Portainer users", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="create_user", description="Create a new Portainer user", inputSchema={ "type": "object", "properties": { "username": { "type": "string", "description": "Username for the new user" }, "password": { "type": "string", "description": "Password for the new user" }, "role": { "type": "string", "description": "User role", "enum": ["Administrator", "StandardUser", "ReadOnlyUser"], "default": "StandardUser" } }, "required": ["username", "password"] } ), types.Tool( name="update_user", description="Update an existing user", inputSchema={ "type": "object", "properties": { "user_id": { "type": "integer", "description": "ID of the user to update" }, "password": { "type": "string", "description": "New password (optional)" }, "role": { "type": "string", "description": "New role (optional)", "enum": ["Administrator", "StandardUser", "ReadOnlyUser"] } }, "required": ["user_id"] } ), types.Tool( name="delete_user", description="Delete a user", inputSchema={ "type": "object", "properties": { "user_id": { "type": "integer", "description": "ID of the user to delete" } }, "required": ["user_id"] } ), # Teams Management tools types.Tool( name="get_teams", description="Get list of teams", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="create_team", description="Create a new team", inputSchema={ "type": "object", "properties": { "name": { "type": "string", "description": "Name of the team" }, "leaders": { "type": "array", "items": {"type": "integer"}, "description": "Array of user IDs to be team leaders (optional)" } }, "required": ["name"] } ), types.Tool( name="add_team_members", description="Add members to a team", inputSchema={ "type": "object", "properties": { "team_id": { "type": "integer", "description": "ID of the team" }, "user_ids": { "type": "array", "items": {"type": "integer"}, "description": "Array of user IDs to add to the team" } }, "required": ["team_id", "user_ids"] } ), types.Tool( name="remove_team_members", description="Remove members from a team", inputSchema={ "type": "object", "properties": { "team_id": { "type": "integer", "description": "ID of the team" }, "user_ids": { "type": "array", "items": {"type": "integer"}, "description": "Array of user IDs to remove from the team" } }, "required": ["team_id", "user_ids"] } ), types.Tool( name="delete_team", description="Delete a team", inputSchema={ "type": "object", "properties": { "team_id": { "type": "integer", "description": "ID of the team to delete" } }, "required": ["team_id"] } ), # RBAC tools types.Tool( name="get_roles", description="Get available roles", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="get_resource_controls", description="Get resource access controls", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="create_resource_control", description="Create resource access control", inputSchema={ "type": "object", "properties": { "resource_id": { "type": "string", "description": "ID of the resource" }, "resource_type": { "type": "string", "description": "Type of resource (container, service, volume, etc.)" }, "public": { "type": "boolean", "description": "Whether the resource is public", "default": False }, "administrators_only": { "type": "boolean", "description": "Whether only administrators can access", "default": False }, "users": { "type": "array", "items": { "type": "object", "properties": { "user_id": {"type": "integer"}, "access_level": {"type": "string", "enum": ["read", "write"]} } }, "description": "User access list" }, "teams": { "type": "array", "items": { "type": "object", "properties": { "team_id": {"type": "integer"}, "access_level": {"type": "string", "enum": ["read", "write"]} } }, "description": "Team access list" } }, "required": ["resource_id", "resource_type"] } ), # Settings tools types.Tool( name="get_settings", description="Get Portainer settings", inputSchema={ "type": "object", "properties": {}, "required": [] } ), types.Tool( name="update_settings", description="Update Portainer settings", inputSchema={ "type": "object", "properties": { "allow_volume_browser": { "type": "boolean", "description": "Allow users to browse volumes" }, "allow_bind_mounts": { "type": "boolean", "description": "Allow regular users to use bind mounts" }, "allow_privileged_mode": { "type": "boolean", "description": "Allow regular users to use privileged mode" }, "allow_stack_management": { "type": "boolean", "description": "Allow regular users to manage stacks" } }, "required": [] } ) ] async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]: """Make HTTP request to Portainer API.""" import httpx async with httpx.AsyncClient(verify=False) as client: headers = {"X-API-Key": api_key} if api_key else {} if method == "GET": response = await client.get(f"{portainer_url}/api{endpoint}", headers=headers) elif method == "POST": response = await client.post(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "PUT": response = await client.put(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "DELETE": response = await client.delete(f"{portainer_url}/api{endpoint}", headers=headers) else: raise ValueError(f"Unsupported method: {method}") return {"status_code": response.status_code, "data": response.json() if response.text else None, "text": response.text} def convert_role(role): """Convert between string and integer role representations.""" if isinstance(role, int): role_map = {1: "Administrator", 2: "StandardUser", 3: "ReadOnlyUser"} return role_map.get(role, f"Unknown({role})") else: role_map = {"Administrator": 1, "StandardUser": 2, "ReadOnlyUser": 3} return role_map.get(role, 2) @server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]: """Handle tool calls.""" try: # Core tools if name == "test_connection": result = await make_request("GET", "/status") if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ Connected to Portainer at {portainer_url}")] else: return [types.TextContent(type="text", text=f"✗ Failed to connect: HTTP {result['status_code']}")] # User Management elif name == "get_users": result = await make_request("GET", "/users") if result["status_code"] == 200: users = result["data"] output = f"Found {len(users)} users:\n" for user in users: role = convert_role(user.get("Role", "Unknown")) output += f"- ID: {user.get('Id')}, Username: {user.get('Username')}, Role: {role}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get users: HTTP {result['status_code']}")] elif name == "create_user": username = arguments.get("username") password = arguments.get("password") role = arguments.get("role", "StandardUser") if not username or not password: return [types.TextContent(type="text", text="Error: Username and password are required")] role_int = convert_role(role) # Try with integer role first result = await make_request("POST", "/users", { "Username": username, "Password": password, "Role": role_int }) if result["status_code"] == 409: return [types.TextContent(type="text", text=f"User '{username}' already exists")] elif result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ User '{username}' created successfully with role {role}")] else: # Try with string role result = await make_request("POST", "/users", { "Username": username, "Password": password, "Role": role }) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ User '{username}' created successfully with role {role}")] else: return [types.TextContent(type="text", text=f"Failed to create user: HTTP {result['status_code']} - {result['text']}")] elif name == "update_user": user_id = arguments.get("user_id") if not user_id: return [types.TextContent(type="text", text="Error: user_id is required")] update_data = {} if "password" in arguments: update_data["Password"] = arguments["password"] if "role" in arguments: update_data["Role"] = convert_role(arguments["role"]) result = await make_request("PUT", f"/users/{user_id}", update_data) if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ User {user_id} updated successfully")] else: return [types.TextContent(type="text", text=f"Failed to update user: HTTP {result['status_code']}")] elif name == "delete_user": user_id = arguments.get("user_id") if not user_id: return [types.TextContent(type="text", text="Error: user_id is required")] result = await make_request("DELETE", f"/users/{user_id}") if result["status_code"] in [200, 204]: return [types.TextContent(type="text", text=f"✓ User {user_id} deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete user: HTTP {result['status_code']}")] # Teams Management elif name == "get_teams": result = await make_request("GET", "/teams") if result["status_code"] == 200: teams = result["data"] output = f"Found {len(teams)} teams:\n" for team in teams: output += f"- ID: {team.get('Id')}, Name: {team.get('Name')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get teams: HTTP {result['status_code']}")] elif name == "create_team": team_name = arguments.get("name") if not team_name: return [types.TextContent(type="text", text="Error: Team name is required")] team_data = { "Name": team_name, "Leaders": arguments.get("leaders", []) } result = await make_request("POST", "/teams", team_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Team '{team_name}' created successfully")] else: return [types.TextContent(type="text", text=f"Failed to create team: HTTP {result['status_code']} - {result['text']}")] elif name == "add_team_members": team_id = arguments.get("team_id") user_ids = arguments.get("user_ids", []) if not team_id: return [types.TextContent(type="text", text="Error: team_id is required")] if not user_ids: return [types.TextContent(type="text", text="Error: user_ids array is required")] result = await make_request("POST", f"/teams/{team_id}/memberships", {"UserIds": user_ids}) if result["status_code"] in [200, 201, 204]: return [types.TextContent(type="text", text=f"✓ Added {len(user_ids)} users to team {team_id}")] else: return [types.TextContent(type="text", text=f"Failed to add team members: HTTP {result['status_code']}")] elif name == "remove_team_members": team_id = arguments.get("team_id") user_ids = arguments.get("user_ids", []) if not team_id: return [types.TextContent(type="text", text="Error: team_id is required")] if not user_ids: return [types.TextContent(type="text", text="Error: user_ids array is required")] result = await make_request("DELETE", f"/teams/{team_id}/memberships", {"UserIds": user_ids}) if result["status_code"] in [200, 204]: return [types.TextContent(type="text", text=f"✓ Removed {len(user_ids)} users from team {team_id}")] else: return [types.TextContent(type="text", text=f"Failed to remove team members: HTTP {result['status_code']}")] elif name == "delete_team": team_id = arguments.get("team_id") if not team_id: return [types.TextContent(type="text", text="Error: team_id is required")] result = await make_request("DELETE", f"/teams/{team_id}") if result["status_code"] in [200, 204]: return [types.TextContent(type="text", text=f"✓ Team {team_id} deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete team: HTTP {result['status_code']}")] # RBAC tools elif name == "get_roles": result = await make_request("GET", "/roles") if result["status_code"] == 200: roles = result["data"] output = "Available roles:\n" for role in roles: output += f"- ID: {role.get('Id')}, Name: {role.get('Name')}, Priority: {role.get('Priority')}\n" if role.get('Description'): output += f" Description: {role.get('Description')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get roles: HTTP {result['status_code']}")] elif name == "get_resource_controls": result = await make_request("GET", "/resource_controls") if result["status_code"] == 200: controls = result["data"] output = f"Found {len(controls)} resource controls:\n" for control in controls: output += f"- ID: {control.get('Id')}, Resource: {control.get('ResourceId')}, " output += f"Type: {control.get('Type')}, Public: {control.get('Public')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get resource controls: HTTP {result['status_code']}")] elif name == "create_resource_control": resource_id = arguments.get("resource_id") resource_type = arguments.get("resource_type") if not resource_id or not resource_type: return [types.TextContent(type="text", text="Error: resource_id and resource_type are required")] control_data = { "ResourceId": resource_id, "Type": resource_type, "Public": arguments.get("public", False), "AdministratorsOnly": arguments.get("administrators_only", False), "UserAccesses": arguments.get("users", []), "TeamAccesses": arguments.get("teams", []) } result = await make_request("POST", "/resource_controls", control_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Resource control created for {resource_id}")] else: return [types.TextContent(type="text", text=f"Failed to create resource control: HTTP {result['status_code']}")] # Settings tools elif name == "get_settings": result = await make_request("GET", "/settings") if result["status_code"] == 200: settings = result["data"] output = "Portainer Settings:\n" output += f"- Allow Volume Browser: {settings.get('AllowVolumeBrowser', False)}\n" output += f"- Allow Bind Mounts: {settings.get('AllowBindMountsForRegularUsers', False)}\n" output += f"- Allow Privileged Mode: {settings.get('AllowPrivilegedModeForRegularUsers', False)}\n" output += f"- Allow Stack Management: {settings.get('AllowStackManagementForRegularUsers', False)}\n" output += f"- Authentication Method: {settings.get('AuthenticationMethod', 'Unknown')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get settings: HTTP {result['status_code']}")] elif name == "update_settings": # Get current settings first current = await make_request("GET", "/settings") if current["status_code"] != 200: return [types.TextContent(type="text", text=f"Failed to get current settings: HTTP {current['status_code']}")] settings_data = current["data"] # Update only provided fields if "allow_volume_browser" in arguments: settings_data["AllowVolumeBrowser"] = arguments["allow_volume_browser"] if "allow_bind_mounts" in arguments: settings_data["AllowBindMountsForRegularUsers"] = arguments["allow_bind_mounts"] if "allow_privileged_mode" in arguments: settings_data["AllowPrivilegedModeForRegularUsers"] = arguments["allow_privileged_mode"] if "allow_stack_management" in arguments: settings_data["AllowStackManagementForRegularUsers"] = arguments["allow_stack_management"] result = await make_request("PUT", "/settings", settings_data) if result["status_code"] == 200: return [types.TextContent(type="text", text="✓ Settings updated successfully")] else: return [types.TextContent(type="text", text=f"Failed to update settings: HTTP {result['status_code']}")] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] except Exception as e: return [types.TextContent(type="text", text=f"Error: {str(e)}")] async def run(): """Run the MCP server.""" # Use stdio transport async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="portainer-unified", server_version="1.0.0", capabilities=server.get_capabilities( notification_options=NotificationOptions( prompts_changed=False, resources_changed=False, tools_changed=False, ), experimental_capabilities={}, ), ), ) def main(): """Main entry point.""" asyncio.run(run()) if __name__ == "__main__": main()