portainer-mcp/portainer_core_server.py
Adolfo Delorenzo d5f8ae5794 refactor: clean up codebase and rename core server
- Remove test files and demos (test_*.py, create_nginx_stack.py)
- Remove build artifacts (egg-info directory)
- Rename merged_mcp_server.py to portainer_core_server.py for consistency
- Update documentation to reflect new naming
- Add comprehensive docstrings to all Python files
- Maintain all essential functionality

This cleanup improves code organization while preserving all production servers:
- portainer_core_server.py (formerly merged_mcp_server.py)
- portainer_docker_server.py
- portainer_edge_server.py
- portainer_environments_server.py
- portainer_gitops_server.py
- portainer_kubernetes_server.py
- portainer_stacks_server.py

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 00:43:23 -03:00

640 lines
26 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Portainer Core MCP Server
This module provides the core MCP server for Portainer Business Edition,
combining essential user management and teams/RBAC functionality into a
single unified server.
The server implements comprehensive management capabilities including:
- User authentication and session management
- User CRUD operations with role assignments
- Teams creation and membership management
- Role-based access control (RBAC) configuration
- Resource access controls for fine-grained permissions
- System settings management
This core server serves as the foundation for Portainer management,
providing the essential identity and access management features that
other specialized servers (Docker, Kubernetes, Edge, etc.) depend on.
Complexity: O(1) for all operations (simple HTTP requests)
Dependencies: httpx for async HTTP, mcp for server protocol
Call Flow: MCP client -> handle_call_tool() -> make_request() -> Portainer API
Environment Variables:
PORTAINER_URL: Base URL of Portainer instance
PORTAINER_API_KEY: API key for authentication
MCP_MODE: Always overwritten with "true" by this module at import time to suppress logging
"""
import os
import sys
import json
import asyncio
from typing import Any, Dict, List, Optional
# Suppress all logging to stderr
os.environ["MCP_MODE"] = "true"
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Create server
# Module-level MCP server instance; the tool handlers in this file register
# on it through the @server.list_tools() / @server.call_tool() decorators.
server = Server("portainer-core")
# Store for our state
# Both values are read from the environment once, at import time.
# PORTAINER_URL falls back to the hard-coded host below when it is unset.
# PORTAINER_API_KEY falls back to an empty string, in which case
# make_request() sends no X-API-Key header at all.
portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live")
api_key = os.getenv("PORTAINER_API_KEY", "")
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Return the catalogue of tools this server exposes.

    Every tool takes a JSON object as input. The small builders below only
    exist to avoid repeating the same schema boilerplate for each entry;
    the resulting definitions are grouped as: core, users, teams, RBAC,
    settings.
    """

    def role_choices():
        # Fresh list per use so no two schemas share a mutable object.
        return ["Administrator", "StandardUser", "ReadOnlyUser"]

    def field(kind, description, **extra):
        # Scalar property: type, description, then any extra keywords
        # (enum/default) in the order they were given.
        return {"type": kind, "description": description, **extra}

    def id_list(description):
        # Property holding an array of integer IDs.
        return {
            "type": "array",
            "items": {"type": "integer"},
            "description": description,
        }

    def access_list(id_key, description):
        # Property holding an array of {<id_key>, access_level} objects.
        return {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    id_key: {"type": "integer"},
                    "access_level": {"type": "string", "enum": ["read", "write"]},
                },
            },
            "description": description,
        }

    def tool(name, description, properties=None, required=()):
        # Assemble a Tool whose input schema is always a JSON object.
        return types.Tool(
            name=name,
            description=description,
            inputSchema={
                "type": "object",
                "properties": properties if properties is not None else {},
                "required": list(required),
            },
        )

    core_tools = [
        tool("test_connection", "Test connection to Portainer"),
    ]

    user_tools = [
        tool("get_users", "Get list of Portainer users"),
        tool(
            "create_user",
            "Create a new Portainer user",
            {
                "username": field("string", "Username for the new user"),
                "password": field("string", "Password for the new user"),
                "role": field(
                    "string",
                    "User role",
                    enum=role_choices(),
                    default="StandardUser",
                ),
            },
            ["username", "password"],
        ),
        tool(
            "update_user",
            "Update an existing user",
            {
                "user_id": field("integer", "ID of the user to update"),
                "password": field("string", "New password (optional)"),
                "role": field("string", "New role (optional)", enum=role_choices()),
            },
            ["user_id"],
        ),
        tool(
            "delete_user",
            "Delete a user",
            {"user_id": field("integer", "ID of the user to delete")},
            ["user_id"],
        ),
    ]

    team_tools = [
        tool("get_teams", "Get list of teams"),
        tool(
            "create_team",
            "Create a new team",
            {
                "name": field("string", "Name of the team"),
                "leaders": id_list("Array of user IDs to be team leaders (optional)"),
            },
            ["name"],
        ),
        tool(
            "add_team_members",
            "Add members to a team",
            {
                "team_id": field("integer", "ID of the team"),
                "user_ids": id_list("Array of user IDs to add to the team"),
            },
            ["team_id", "user_ids"],
        ),
        tool(
            "remove_team_members",
            "Remove members from a team",
            {
                "team_id": field("integer", "ID of the team"),
                "user_ids": id_list("Array of user IDs to remove from the team"),
            },
            ["team_id", "user_ids"],
        ),
        tool(
            "delete_team",
            "Delete a team",
            {"team_id": field("integer", "ID of the team to delete")},
            ["team_id"],
        ),
    ]

    rbac_tools = [
        tool("get_roles", "Get available roles"),
        tool("get_resource_controls", "Get resource access controls"),
        tool(
            "create_resource_control",
            "Create resource access control",
            {
                "resource_id": field("string", "ID of the resource"),
                "resource_type": field(
                    "string",
                    "Type of resource (container, service, volume, etc.)",
                ),
                "public": field(
                    "boolean",
                    "Whether the resource is public",
                    default=False,
                ),
                "administrators_only": field(
                    "boolean",
                    "Whether only administrators can access",
                    default=False,
                ),
                "users": access_list("user_id", "User access list"),
                "teams": access_list("team_id", "Team access list"),
            },
            ["resource_id", "resource_type"],
        ),
    ]

    settings_tools = [
        tool("get_settings", "Get Portainer settings"),
        tool(
            "update_settings",
            "Update Portainer settings",
            {
                "allow_volume_browser": field(
                    "boolean", "Allow users to browse volumes"
                ),
                "allow_bind_mounts": field(
                    "boolean", "Allow regular users to use bind mounts"
                ),
                "allow_privileged_mode": field(
                    "boolean", "Allow regular users to use privileged mode"
                ),
                "allow_stack_management": field(
                    "boolean", "Allow regular users to manage stacks"
                ),
            },
        ),
    ]

    # Same listing order as the grouping above.
    return [*core_tools, *user_tools, *team_tools, *rbac_tools, *settings_tools]
async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
    """Send one HTTP request to the Portainer API and summarise the response.

    Args:
        method: One of "GET", "POST", "PUT" or "DELETE" (case-sensitive).
        endpoint: Path below ``/api``, starting with a slash (e.g. "/users").
        json_data: Optional JSON body. Sent for POST, PUT and DELETE;
            ignored for GET.

    Returns:
        A dict with three keys:
            "status_code": the HTTP status code,
            "data": the decoded JSON body, or None when the body is empty
                or is not valid JSON,
            "text": the raw response body as text.

    Raises:
        ValueError: If ``method`` is not one of the supported verbs.
        httpx exceptions: Transport-level failures are not caught here.
    """
    # Reject bad verbs before doing any network setup.
    if method not in ("GET", "POST", "PUT", "DELETE"):
        raise ValueError(f"Unsupported method: {method}")

    import httpx

    # Tolerate a trailing slash in PORTAINER_URL so we never emit "//api/...".
    base_url = portainer_url.rstrip("/")
    url = f"{base_url}/api{endpoint}"
    headers = {"X-API-Key": api_key} if api_key else {}
    # GET requests never carried a body in this client; keep it that way.
    body = None if method == "GET" else json_data

    # NOTE(review): verify=False disables TLS certificate verification for
    # every request, including the one carrying the API key. Kept for
    # backward compatibility; consider making it configurable.
    async with httpx.AsyncClient(verify=False) as client:
        # client.request() is used for every verb because the
        # client.delete() shortcut cannot carry a JSON body.
        response = await client.request(method, url, headers=headers, json=body)

        raw_text = response.text
        data = None
        if raw_text:
            try:
                data = response.json()
            except ValueError:
                # Non-JSON body (e.g. an HTML error page): still report the
                # status code and raw text instead of raising.
                data = None

        return {"status_code": response.status_code, "data": data, "text": raw_text}
def convert_role(role):
    """Translate a role between its numeric ID and its name.

    An int is mapped to the role name; an unrecognised int yields
    "Unknown(<value>)". Anything else is treated as a role name and mapped
    to its numeric ID, defaulting to 2 (StandardUser) when unrecognised.
    """
    if not isinstance(role, int):
        ids_by_name = dict(Administrator=1, StandardUser=2, ReadOnlyUser=3)
        return ids_by_name.get(role, 2)

    names_by_id = {1: "Administrator", 2: "StandardUser", 3: "ReadOnlyUser"}
    try:
        return names_by_id[role]
    except KeyError:
        return f"Unknown({role})"
def _text_reply(message: str) -> list[types.TextContent]:
    """Wrap a message in the single-item TextContent list every tool returns."""
    return [types.TextContent(type="text", text=message)]


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
    """Dispatch an MCP tool call to the matching Portainer API request.

    Args:
        name: Tool name as advertised by handle_list_tools().
        arguments: Tool arguments; None is treated as an empty dict.

    Returns:
        A one-element list holding a human-readable text result. Failures
        (validation errors, non-success HTTP codes, unknown tools and any
        raised exception) are reported as text rather than raised.
    """
    # The signature allows None; normalise so .get() / "in" below are safe.
    arguments = arguments or {}

    try:
        # ---- Core tools ----
        if name == "test_connection":
            result = await make_request("GET", "/status")
            if result["status_code"] == 200:
                return _text_reply(f"✓ Connected to Portainer at {portainer_url}")
            return _text_reply(f"✗ Failed to connect: HTTP {result['status_code']}")

        # ---- User management ----
        if name == "get_users":
            result = await make_request("GET", "/users")
            if result["status_code"] != 200:
                return _text_reply(f"Failed to get users: HTTP {result['status_code']}")
            users = result["data"]
            lines = [f"Found {len(users)} users:"]
            for user in users:
                raw_role = user.get("Role")
                if isinstance(raw_role, int):
                    role = convert_role(raw_role)
                else:
                    # Do not run a missing/non-numeric role through
                    # convert_role(): it would map it to the integer 2.
                    role = raw_role if raw_role else "Unknown"
                lines.append(
                    f"- ID: {user.get('Id')}, Username: {user.get('Username')}, Role: {role}"
                )
            return _text_reply("\n".join(lines) + "\n")

        if name == "create_user":
            username = arguments.get("username")
            password = arguments.get("password")
            role = arguments.get("role", "StandardUser")
            if not username or not password:
                return _text_reply("Error: Username and password are required")

            created = f"✓ User '{username}' created successfully with role {role}"

            # First attempt: numeric role.
            result = await make_request("POST", "/users", {
                "Username": username,
                "Password": password,
                "Role": convert_role(role),
            })
            if result["status_code"] == 409:
                return _text_reply(f"User '{username}' already exists")
            if result["status_code"] in [200, 201]:
                return _text_reply(created)

            # Fallback: retry once with the role sent as its string name.
            result = await make_request("POST", "/users", {
                "Username": username,
                "Password": password,
                "Role": role,
            })
            if result["status_code"] in [200, 201]:
                return _text_reply(created)
            return _text_reply(
                f"Failed to create user: HTTP {result['status_code']} - {result['text']}"
            )

        if name == "update_user":
            user_id = arguments.get("user_id")
            if not user_id:
                return _text_reply("Error: user_id is required")
            update_data = {}
            if "password" in arguments:
                update_data["Password"] = arguments["password"]
            if "role" in arguments:
                update_data["Role"] = convert_role(arguments["role"])
            result = await make_request("PUT", f"/users/{user_id}", update_data)
            if result["status_code"] == 200:
                return _text_reply(f"✓ User {user_id} updated successfully")
            return _text_reply(f"Failed to update user: HTTP {result['status_code']}")

        if name == "delete_user":
            user_id = arguments.get("user_id")
            if not user_id:
                return _text_reply("Error: user_id is required")
            result = await make_request("DELETE", f"/users/{user_id}")
            if result["status_code"] in [200, 204]:
                return _text_reply(f"✓ User {user_id} deleted successfully")
            return _text_reply(f"Failed to delete user: HTTP {result['status_code']}")

        # ---- Teams management ----
        if name == "get_teams":
            result = await make_request("GET", "/teams")
            if result["status_code"] != 200:
                return _text_reply(f"Failed to get teams: HTTP {result['status_code']}")
            teams = result["data"]
            lines = [f"Found {len(teams)} teams:"]
            lines.extend(
                f"- ID: {team.get('Id')}, Name: {team.get('Name')}" for team in teams
            )
            return _text_reply("\n".join(lines) + "\n")

        if name == "create_team":
            team_name = arguments.get("name")
            if not team_name:
                return _text_reply("Error: Team name is required")
            team_data = {
                "Name": team_name,
                "Leaders": arguments.get("leaders", []),
            }
            result = await make_request("POST", "/teams", team_data)
            if result["status_code"] in [200, 201]:
                return _text_reply(f"✓ Team '{team_name}' created successfully")
            return _text_reply(
                f"Failed to create team: HTTP {result['status_code']} - {result['text']}"
            )

        if name in ("add_team_members", "remove_team_members"):
            team_id = arguments.get("team_id")
            user_ids = arguments.get("user_ids", [])
            if not team_id:
                return _text_reply("Error: team_id is required")
            if not user_ids:
                return _text_reply("Error: user_ids array is required")

            # NOTE(review): confirm this endpoint and payload against the
            # Portainer API reference; membership changes may be exposed
            # through a different route.
            memberships = f"/teams/{team_id}/memberships"
            if name == "add_team_members":
                result = await make_request("POST", memberships, {"UserIds": user_ids})
                if result["status_code"] in [200, 201, 204]:
                    return _text_reply(f"✓ Added {len(user_ids)} users to team {team_id}")
                return _text_reply(f"Failed to add team members: HTTP {result['status_code']}")

            # NOTE(review): a JSON body is passed with DELETE here; confirm
            # make_request() actually transmits it.
            result = await make_request("DELETE", memberships, {"UserIds": user_ids})
            if result["status_code"] in [200, 204]:
                return _text_reply(f"✓ Removed {len(user_ids)} users from team {team_id}")
            return _text_reply(f"Failed to remove team members: HTTP {result['status_code']}")

        if name == "delete_team":
            team_id = arguments.get("team_id")
            if not team_id:
                return _text_reply("Error: team_id is required")
            result = await make_request("DELETE", f"/teams/{team_id}")
            if result["status_code"] in [200, 204]:
                return _text_reply(f"✓ Team {team_id} deleted successfully")
            return _text_reply(f"Failed to delete team: HTTP {result['status_code']}")

        # ---- RBAC tools ----
        if name == "get_roles":
            result = await make_request("GET", "/roles")
            if result["status_code"] != 200:
                return _text_reply(f"Failed to get roles: HTTP {result['status_code']}")
            lines = ["Available roles:"]
            for role in result["data"]:
                lines.append(
                    f"- ID: {role.get('Id')}, Name: {role.get('Name')}, Priority: {role.get('Priority')}"
                )
                if role.get('Description'):
                    lines.append(f" Description: {role.get('Description')}")
            return _text_reply("\n".join(lines) + "\n")

        if name == "get_resource_controls":
            result = await make_request("GET", "/resource_controls")
            if result["status_code"] != 200:
                return _text_reply(
                    f"Failed to get resource controls: HTTP {result['status_code']}"
                )
            controls = result["data"]
            lines = [f"Found {len(controls)} resource controls:"]
            lines.extend(
                f"- ID: {control.get('Id')}, Resource: {control.get('ResourceId')}, "
                f"Type: {control.get('Type')}, Public: {control.get('Public')}"
                for control in controls
            )
            return _text_reply("\n".join(lines) + "\n")

        if name == "create_resource_control":
            resource_id = arguments.get("resource_id")
            resource_type = arguments.get("resource_type")
            if not resource_id or not resource_type:
                return _text_reply("Error: resource_id and resource_type are required")
            # NOTE(review): users/teams are forwarded exactly as supplied by
            # the tool schema (objects with user_id/team_id + access_level);
            # confirm the field names and shapes Portainer expects.
            control_data = {
                "ResourceId": resource_id,
                "Type": resource_type,
                "Public": arguments.get("public", False),
                "AdministratorsOnly": arguments.get("administrators_only", False),
                "UserAccesses": arguments.get("users", []),
                "TeamAccesses": arguments.get("teams", []),
            }
            result = await make_request("POST", "/resource_controls", control_data)
            if result["status_code"] in [200, 201]:
                return _text_reply(f"✓ Resource control created for {resource_id}")
            return _text_reply(
                f"Failed to create resource control: HTTP {result['status_code']}"
            )

        # ---- Settings tools ----
        if name == "get_settings":
            result = await make_request("GET", "/settings")
            if result["status_code"] != 200:
                return _text_reply(f"Failed to get settings: HTTP {result['status_code']}")
            settings = result["data"]
            lines = [
                "Portainer Settings:",
                f"- Allow Volume Browser: {settings.get('AllowVolumeBrowser', False)}",
                f"- Allow Bind Mounts: {settings.get('AllowBindMountsForRegularUsers', False)}",
                f"- Allow Privileged Mode: {settings.get('AllowPrivilegedModeForRegularUsers', False)}",
                f"- Allow Stack Management: {settings.get('AllowStackManagementForRegularUsers', False)}",
                f"- Authentication Method: {settings.get('AuthenticationMethod', 'Unknown')}",
            ]
            return _text_reply("\n".join(lines) + "\n")

        if name == "update_settings":
            # Read-modify-write: fetch the current settings, overlay only the
            # fields the caller supplied, then PUT the whole document back.
            current = await make_request("GET", "/settings")
            if current["status_code"] != 200:
                return _text_reply(
                    f"Failed to get current settings: HTTP {current['status_code']}"
                )
            settings_data = current["data"]
            if not isinstance(settings_data, dict):
                # Without a settings object there is nothing safe to write back.
                return _text_reply("Failed to get current settings: unexpected response body")

            field_map = (
                ("allow_volume_browser", "AllowVolumeBrowser"),
                ("allow_bind_mounts", "AllowBindMountsForRegularUsers"),
                ("allow_privileged_mode", "AllowPrivilegedModeForRegularUsers"),
                ("allow_stack_management", "AllowStackManagementForRegularUsers"),
            )
            for argument_key, settings_key in field_map:
                if argument_key in arguments:
                    settings_data[settings_key] = arguments[argument_key]

            result = await make_request("PUT", "/settings", settings_data)
            if result["status_code"] == 200:
                return _text_reply("✓ Settings updated successfully")
            return _text_reply(f"Failed to update settings: HTTP {result['status_code']}")

        return _text_reply(f"Unknown tool: {name}")
    except Exception as e:
        # Top-level boundary: report any failure to the MCP client as text
        # instead of letting it propagate out of the handler.
        return _text_reply(f"Error: {str(e)}")
async def run():
    """Run the MCP server over the stdio transport."""
    async with mcp.server.stdio.stdio_server() as streams:
        incoming, outgoing = streams

        # This server does not announce list-change notifications.
        notifications = NotificationOptions(
            prompts_changed=False,
            resources_changed=False,
            tools_changed=False,
        )
        capabilities = server.get_capabilities(
            notification_options=notifications,
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="portainer-core",
            server_version="1.0.0",
            capabilities=capabilities,
        )

        await server.run(incoming, outgoing, init_options)
def main():
    """Synchronous entry point: drive run() to completion on a new event loop."""
    server_coroutine = run()
    asyncio.run(server_coroutine)


# Only start the server when executed as a script, not when imported.
if __name__ == "__main__":
    main()