portainer-mcp/portainer_environments_server.py
Adolfo Delorenzo e27251b922 feat: add three new Portainer MCP servers
- Add portainer-environments server for environment/endpoint management
- Add portainer-docker server for Docker and Swarm container operations
- Add merged portainer server combining core + teams functionality
- Fix JSON schema issues and API compatibility
- Add comprehensive documentation for each server
- Add .gitignore and .env.example for security

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-18 13:00:05 -03:00

861 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
"""MCP server for Portainer Environments management."""
import os
import sys
import json
import asyncio
from typing import Any, Dict, List, Optional
from enum import Enum
# Suppress all logging to stderr
os.environ["MCP_MODE"] = "true"
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Create the MCP server instance; the tool handlers defined below register
# themselves on it through its decorators.
server = Server("portainer-environments")
# Connection settings, read once from the process environment at import time.
# PORTAINER_URL is the base URL; make_request() appends "/api" plus the endpoint.
portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live")
# Sent as the X-API-Key header; when empty, requests carry no auth header.
api_key = os.getenv("PORTAINER_API_KEY", "")
# Environment types enum
class EnvironmentType(Enum):
    """Numeric environment type codes placed in the ``Type`` field of create requests.

    NOTE(review): these values should be confirmed against the Portainer API's
    own endpoint type numbering; they are kept exactly as the module defines them.
    """

    DOCKER = 1        # plain Docker environment
    DOCKER_SWARM = 2  # Docker Swarm environment
    KUBERNETES = 3    # Kubernetes environment
    ACI = 4           # Azure Container Instances
    EDGE_AGENT = 5    # Edge agent environment
def _schema_field(kind: str, description: str, **extra: Any) -> Dict[str, Any]:
    """Build one JSON-schema property: type, description, then extras such as ``default``."""
    return {"type": kind, "description": description, **extra}


def _string_list_field(description: str) -> Dict[str, Any]:
    """Build a JSON-schema property describing an array of strings."""
    return {"type": "array", "items": {"type": "string"}, "description": description}


def _object_schema(properties: Dict[str, Any], required: tuple[str, ...] = ()) -> Dict[str, Any]:
    """Build the top-level ``inputSchema`` object for a tool."""
    return {"type": "object", "properties": properties, "required": list(required)}


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Return the catalogue of tools this server exposes.

    Each entry carries the tool name, a human-readable description and the
    JSON schema of its arguments. The names must match the ones dispatched
    in ``handle_call_tool``.

    Returns:
        The tool definitions, grouped as: environment CRUD, status and team
        association, environment groups, tags, and Edge agent creation.
    """
    return [
        # Basic environment operations
        types.Tool(
            name="list_environments",
            description="List all environments/endpoints",
            inputSchema=_object_schema(
                {
                    # The description used to claim "default: all" although the
                    # schema default (and the handler's fallback) is 100.
                    "limit": _schema_field(
                        "integer",
                        "Number of environments to return (default: 100)",
                        default=100,
                    ),
                    "start": _schema_field(
                        "integer", "Starting index for pagination", default=0
                    ),
                }
            ),
        ),
        types.Tool(
            name="get_environment",
            description="Get details of a specific environment",
            inputSchema=_object_schema(
                {"environment_id": _schema_field("integer", "ID of the environment")},
                ("environment_id",),
            ),
        ),
        types.Tool(
            name="create_docker_environment",
            description="Create a new Docker environment",
            inputSchema=_object_schema(
                {
                    "name": _schema_field("string", "Name of the environment"),
                    "url": _schema_field(
                        "string",
                        "Docker API URL (e.g., tcp://localhost:2375 or unix:///var/run/docker.sock)",
                    ),
                    "public_url": _schema_field(
                        "string", "Public URL for accessing the environment (optional)"
                    ),
                    "group_id": _schema_field("integer", "Environment group ID (optional)"),
                    "tags": _string_list_field("Array of tags for the environment (optional)"),
                    "tls": _schema_field("boolean", "Enable TLS", default=False),
                    "tls_skip_verify": _schema_field(
                        "boolean", "Skip TLS certificate verification", default=False
                    ),
                },
                ("name", "url"),
            ),
        ),
        types.Tool(
            name="create_kubernetes_environment",
            description="Create a new Kubernetes environment",
            inputSchema=_object_schema(
                {
                    "name": _schema_field("string", "Name of the environment"),
                    "url": _schema_field("string", "Kubernetes API server URL"),
                    "bearer_token": _schema_field("string", "Bearer token for authentication"),
                    "group_id": _schema_field("integer", "Environment group ID (optional)"),
                    "tags": _string_list_field("Array of tags for the environment (optional)"),
                    "tls_skip_verify": _schema_field(
                        "boolean", "Skip TLS certificate verification", default=False
                    ),
                },
                ("name", "url"),
            ),
        ),
        types.Tool(
            name="update_environment",
            description="Update an existing environment",
            inputSchema=_object_schema(
                {
                    "environment_id": _schema_field("integer", "ID of the environment to update"),
                    "name": _schema_field("string", "New name for the environment (optional)"),
                    "url": _schema_field("string", "New URL for the environment (optional)"),
                    "public_url": _schema_field("string", "New public URL (optional)"),
                    "group_id": _schema_field("integer", "New group ID (optional)"),
                    "tags": _string_list_field("New tags array (optional)"),
                },
                ("environment_id",),
            ),
        ),
        types.Tool(
            name="delete_environment",
            description="Delete an environment",
            inputSchema=_object_schema(
                {"environment_id": _schema_field("integer", "ID of the environment to delete")},
                ("environment_id",),
            ),
        ),
        # Environment status and management
        types.Tool(
            name="get_environment_status",
            description="Get status and statistics of an environment",
            inputSchema=_object_schema(
                {"environment_id": _schema_field("integer", "ID of the environment")},
                ("environment_id",),
            ),
        ),
        types.Tool(
            name="associate_environment",
            description="Associate/disassociate environment with teams",
            inputSchema=_object_schema(
                {
                    "environment_id": _schema_field("integer", "ID of the environment"),
                    "teams": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "team_id": {"type": "integer"},
                                "access_level": {"type": "string", "enum": ["read", "write"]},
                            },
                        },
                        "description": "Array of team associations",
                    },
                },
                ("environment_id", "teams"),
            ),
        ),
        # Environment groups
        types.Tool(
            name="list_environment_groups",
            description="List all environment groups",
            inputSchema=_object_schema({}),
        ),
        types.Tool(
            name="create_environment_group",
            description="Create a new environment group",
            inputSchema=_object_schema(
                {
                    "name": _schema_field("string", "Name of the environment group"),
                    "description": _schema_field("string", "Description of the group (optional)"),
                },
                ("name",),
            ),
        ),
        types.Tool(
            name="update_environment_group",
            description="Update an environment group",
            inputSchema=_object_schema(
                {
                    "group_id": _schema_field("integer", "ID of the group to update"),
                    "name": _schema_field("string", "New name for the group (optional)"),
                    "description": _schema_field("string", "New description (optional)"),
                },
                ("group_id",),
            ),
        ),
        types.Tool(
            name="delete_environment_group",
            description="Delete an environment group",
            inputSchema=_object_schema(
                {"group_id": _schema_field("integer", "ID of the group to delete")},
                ("group_id",),
            ),
        ),
        # Tags management
        types.Tool(
            name="list_tags",
            description="List all available tags",
            inputSchema=_object_schema({}),
        ),
        types.Tool(
            name="create_tag",
            description="Create a new tag",
            inputSchema=_object_schema(
                {"name": _schema_field("string", "Name of the tag")},
                ("name",),
            ),
        ),
        types.Tool(
            name="delete_tag",
            description="Delete a tag",
            inputSchema=_object_schema(
                {"tag_id": _schema_field("integer", "ID of the tag to delete")},
                ("tag_id",),
            ),
        ),
        # Edge environments
        types.Tool(
            name="generate_edge_key",
            description="Generate Edge agent deployment script",
            inputSchema=_object_schema(
                {
                    "name": _schema_field("string", "Name for the Edge environment"),
                    "group_id": _schema_field("integer", "Environment group ID (optional)"),
                },
                ("name",),
            ),
        ),
    ]
async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
    """Send one HTTP request to the Portainer API and summarise the response.

    Args:
        method: HTTP verb; GET, POST, PUT and DELETE are supported
            (case-insensitive).
        endpoint: Path appended to ``{portainer_url}/api``, including the
            leading slash and any query string.
        json_data: JSON body, sent only for POST and PUT requests.

    Returns:
        A dict with ``status_code`` (int), ``data`` (the decoded JSON body, or
        ``None`` when the response is empty, not declared as JSON, or not
        decodable) and ``text`` (the raw response body).

    Raises:
        ValueError: If ``method`` is not one of the supported verbs. This is
            checked before any network resource is created.
    """
    verb = method.upper()
    if verb not in ("GET", "POST", "PUT", "DELETE"):
        raise ValueError(f"Unsupported method: {method}")

    import httpx

    request_kwargs: Dict[str, Any] = {"headers": {"X-API-Key": api_key} if api_key else {}}
    if verb in ("POST", "PUT"):
        # Only body-carrying verbs forward the payload; GET/DELETE ignore it.
        request_kwargs["json"] = json_data

    # NOTE(review): verify=False disables TLS certificate verification for every
    # request. Kept for compatibility with self-signed Portainer installs, but
    # it should be made configurable.
    async with httpx.AsyncClient(verify=False, timeout=30.0) as client:
        response = await client.request(verb, f"{portainer_url}/api{endpoint}", **request_kwargs)

        # Decode JSON only when the server says the body is JSON.
        is_json = response.headers.get("content-type", "").startswith("application/json")
        data = None
        if response.text and is_json:
            try:
                data = response.json()
            except ValueError:
                # Malformed JSON despite the content type: fall back to raw text only.
                data = None
        return {"status_code": response.status_code, "data": data, "text": response.text}
def format_environment_type(env_type: int) -> str:
    """Return the display label for a numeric environment type.

    Codes without a known label are rendered as ``Unknown(<code>)``.

    NOTE(review): labels 1-5 mirror this module's ``EnvironmentType`` enum;
    confirm the numbering against the Portainer API before relying on it.
    """
    labels = {
        1: "Docker",
        2: "Docker Swarm",
        3: "Kubernetes",
        4: "Azure ACI",
        5: "Edge Agent",
        6: "Local Docker",
        7: "Local Kubernetes",
    }
    if env_type in labels:
        return labels[env_type]
    return f"Unknown({env_type})"
def format_environment_status(status: int) -> str:
    """Return ``"up"`` or ``"down"`` for status codes 1 and 2.

    Any other code is rendered as ``unknown(<code>)``.
    """
    known_states = {1: "up", 2: "down"}
    try:
        return known_states[status]
    except KeyError:
        return f"unknown({status})"
def _text(message: str) -> list[types.TextContent]:
    """Wrap *message* in the single-item text payload every tool returns."""
    return [types.TextContent(type="text", text=message)]


async def _environment_request(
    method: str, suffix: str = "", json_data: Optional[Dict] = None
) -> Dict[str, Any]:
    """Call ``/environments{suffix}``, retrying on ``/endpoints{suffix}`` after a 404."""
    result = await make_request(method, f"/environments{suffix}", json_data)
    if result["status_code"] == 404:
        result = await make_request(method, f"/endpoints{suffix}", json_data)
    return result


async def _tool_list_environments(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """List environments, printing at most the first ten of the selected page."""
    limit = arguments.get("limit", 100)
    start = arguments.get("start", 0)

    # The first attempt carries pagination in the query string; the legacy
    # fallback is called without parameters and is paginated locally below.
    result = await make_request("GET", f"/environments?limit={limit}&start={start}")
    used_legacy = result["status_code"] == 404
    if used_legacy:
        result = await make_request("GET", "/endpoints")

    environments = result["data"]
    if result["status_code"] != 200 or environments is None:
        error_msg = f"Failed to list environments: HTTP {result['status_code']}"
        if result.get("text"):
            error_msg += f" - {result['text'][:200]}"
        return _text(error_msg)
    if not isinstance(environments, list):
        # Slicing a non-list payload used to raise; report it instead.
        return _text("Failed to list environments: unexpected response format (expected a JSON array)")

    total = len(environments)
    if used_legacy:
        # Only the unpaginated legacy response needs slicing; re-slicing a page
        # the server already offset by ``start`` would drop entries.
        environments = environments[start:start + limit]

    parts = [f"Found {total} environments (showing {len(environments)}):\n"]
    for env in environments[:10]:  # Limit output to first 10 to avoid huge responses
        env_type = format_environment_type(env.get("Type", 0))
        status = format_environment_status(env.get("Status", 0))
        parts.append(f"\n- ID: {env.get('Id')}, Name: {env.get('Name')}")
        parts.append(f"\n Type: {env_type}, Status: {status}")
        parts.append(f"\n URL: {env.get('URL', 'N/A')}")
        if env.get("GroupId"):
            parts.append(f"\n Group ID: {env.get('GroupId')}")
        tag_names = [t.get("Name", "") for t in env.get("Tags") or [] if isinstance(t, dict)]
        if tag_names:
            parts.append(f"\n Tags: {', '.join(tag_names)}")
    if len(environments) > 10:
        parts.append(f"\n\n... and {len(environments) - 10} more environments")
    return _text("".join(parts))


async def _tool_get_environment(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Show the main attributes of one environment."""
    env_id = arguments.get("environment_id")
    if not env_id:
        return _text("Error: environment_id is required")

    result = await _environment_request("GET", f"/{env_id}")
    if result["status_code"] != 200:
        return _text(f"Failed to get environment: HTTP {result['status_code']}")

    env = result["data"] or {}  # a 200 without a JSON body must not crash
    lines = [
        "Environment Details:\n",
        f"- ID: {env.get('Id')}\n",
        f"- Name: {env.get('Name')}\n",
        f"- Type: {format_environment_type(env.get('Type', 0))}\n",
        f"- Status: {format_environment_status(env.get('Status', 0))}\n",
        f"- URL: {env.get('URL', 'N/A')}\n",
        f"- Public URL: {env.get('PublicURL', 'N/A')}\n",
    ]
    if env.get("GroupId"):
        lines.append(f"- Group ID: {env.get('GroupId')}\n")
    if env.get("Tags"):
        lines.append(f"- Tags: {', '.join([t.get('Name', '') for t in env.get('Tags', [])])}\n")
    if env.get("TLSConfig"):
        lines.append(f"- TLS Enabled: {env['TLSConfig'].get('TLS', False)}\n")
        lines.append(f"- TLS Skip Verify: {env['TLSConfig'].get('TLSSkipVerify', False)}\n")
    return _text("".join(lines))


async def _tool_create_docker_environment(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Create a Docker environment from name, URL and optional settings."""
    req_name = arguments.get("name")
    url = arguments.get("url")
    if not req_name or not url:
        return _text("Error: name and url are required")

    env_data = {
        "Name": req_name,
        "Type": EnvironmentType.DOCKER.value,
        "URL": url,
        "EndpointCreationType": 1,  # Local environment
    }
    if arguments.get("public_url"):
        env_data["PublicURL"] = arguments["public_url"]
    if arguments.get("group_id"):
        env_data["GroupId"] = arguments["group_id"]
    if arguments.get("tags"):
        env_data["TagIds"] = arguments["tags"]
    # TLS configuration is only sent when at least one TLS flag is set.
    if arguments.get("tls") or arguments.get("tls_skip_verify"):
        env_data["TLSConfig"] = {
            "TLS": arguments.get("tls", False),
            "TLSSkipVerify": arguments.get("tls_skip_verify", False),
        }

    result = await _environment_request("POST", "", env_data)
    if result["status_code"] in [200, 201]:
        return _text(f"✓ Docker environment '{req_name}' created successfully")
    return _text(f"Failed to create Docker environment: HTTP {result['status_code']} - {result['text']}")


async def _tool_create_kubernetes_environment(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Create a Kubernetes environment from name, URL and optional settings."""
    req_name = arguments.get("name")
    url = arguments.get("url")
    if not req_name or not url:
        return _text("Error: name and url are required")

    env_data = {
        "Name": req_name,
        "Type": EnvironmentType.KUBERNETES.value,
        "URL": url,
        "EndpointCreationType": 3,  # Kubernetes environment
    }
    if arguments.get("bearer_token"):
        env_data["Token"] = arguments["bearer_token"]
    if arguments.get("group_id"):
        env_data["GroupId"] = arguments["group_id"]
    if arguments.get("tags"):
        env_data["TagIds"] = arguments["tags"]
    if arguments.get("tls_skip_verify"):
        env_data["TLSConfig"] = {
            "TLS": True,
            "TLSSkipVerify": arguments["tls_skip_verify"],
        }

    result = await _environment_request("POST", "", env_data)
    if result["status_code"] in [200, 201]:
        return _text(f"✓ Kubernetes environment '{req_name}' created successfully")
    return _text(f"Failed to create Kubernetes environment: HTTP {result['status_code']} - {result['text']}")


async def _tool_update_environment(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Fetch an environment, overlay the provided fields and write it back."""
    env_id = arguments.get("environment_id")
    if not env_id:
        return _text("Error: environment_id is required")

    result = await _environment_request("GET", f"/{env_id}")
    env_data = result["data"]
    # Without the current object there is nothing safe to overlay changes on.
    if result["status_code"] != 200 or not isinstance(env_data, dict):
        return _text(f"Failed to get current environment: HTTP {result['status_code']}")

    # Update only provided fields
    field_map = (
        ("name", "Name"),
        ("url", "URL"),
        ("public_url", "PublicURL"),
        ("group_id", "GroupId"),
        ("tags", "TagIds"),
    )
    for arg_key, api_key_name in field_map:
        if arg_key in arguments:
            env_data[api_key_name] = arguments[arg_key]

    result = await _environment_request("PUT", f"/{env_id}", env_data)
    if result["status_code"] == 200:
        return _text(f"✓ Environment {env_id} updated successfully")
    return _text(f"Failed to update environment: HTTP {result['status_code']}")


async def _tool_delete_environment(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Delete one environment by ID."""
    env_id = arguments.get("environment_id")
    if not env_id:
        return _text("Error: environment_id is required")

    result = await _environment_request("DELETE", f"/{env_id}")
    if result["status_code"] in [200, 204]:
        return _text(f"✓ Environment {env_id} deleted successfully")
    return _text(f"Failed to delete environment: HTTP {result['status_code']}")


async def _tool_get_environment_status(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Report Docker engine statistics fetched through the ``docker/info`` path."""
    env_id = arguments.get("environment_id")
    if not env_id:
        return _text("Error: environment_id is required")

    result = await _environment_request("GET", f"/{env_id}/docker/info")
    if result["status_code"] != 200:
        return _text("Environment is down or inaccessible")

    info = result["data"] or {}  # a 200 without a JSON body must not crash
    lines = [
        "Environment Status:\n",
        "- Status: up\n",
        f"- Docker Version: {info.get('ServerVersion', 'N/A')}\n",
        f"- Containers: {info.get('Containers', 0)}\n",
        f"- Running: {info.get('ContainersRunning', 0)}\n",
        f"- Stopped: {info.get('ContainersStopped', 0)}\n",
        f"- Images: {info.get('Images', 0)}\n",
        f"- CPU Count: {info.get('NCPU', 0)}\n",
        f"- Memory: {info.get('MemTotal', 0) / (1024**3):.2f} GB\n",
    ]
    return _text("".join(lines))


async def _tool_associate_environment(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Replace the team access policies of an environment."""
    env_id = arguments.get("environment_id")
    if not env_id:
        return _text("Error: environment_id is required")

    policies = {}
    for team in arguments.get("teams") or []:
        team_id = team.get("team_id")
        if team_id:  # entries without a team ID are skipped
            policies[str(team_id)] = {"AccessLevel": team.get("access_level", "read")}
    assoc_data = {"TeamAccessPolicies": policies}

    result = await _environment_request("PUT", f"/{env_id}/association", assoc_data)
    if result["status_code"] == 200:
        return _text(f"✓ Environment {env_id} team associations updated")
    return _text(f"Failed to update associations: HTTP {result['status_code']}")


async def _tool_list_environment_groups(_arguments: Dict[str, Any]) -> list[types.TextContent]:
    """List every environment group with its optional description."""
    result = await make_request("GET", "/endpoint_groups")
    if result["status_code"] != 200:
        return _text(f"Failed to list groups: HTTP {result['status_code']}")

    groups = result["data"] or []
    lines = [f"Found {len(groups)} environment groups:\n"]
    for group in groups:
        lines.append(f"- ID: {group.get('Id')}, Name: {group.get('Name')}\n")
        if group.get("Description"):
            lines.append(f" Description: {group.get('Description')}\n")
    return _text("".join(lines))


async def _tool_create_environment_group(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Create an environment group."""
    group_name = arguments.get("name")
    if not group_name:
        return _text("Error: name is required")

    group_data = {
        "Name": group_name,
        "Description": arguments.get("description", ""),
    }
    result = await make_request("POST", "/endpoint_groups", group_data)
    if result["status_code"] in [200, 201]:
        return _text(f"✓ Environment group '{group_name}' created successfully")
    return _text(f"Failed to create group: HTTP {result['status_code']}")


async def _tool_update_environment_group(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Update the name and/or description of an environment group."""
    group_id = arguments.get("group_id")
    if not group_id:
        return _text("Error: group_id is required")

    update_data = {}
    if "name" in arguments:
        update_data["Name"] = arguments["name"]
    if "description" in arguments:
        update_data["Description"] = arguments["description"]

    result = await make_request("PUT", f"/endpoint_groups/{group_id}", update_data)
    if result["status_code"] == 200:
        return _text(f"✓ Environment group {group_id} updated successfully")
    return _text(f"Failed to update group: HTTP {result['status_code']}")


async def _tool_delete_environment_group(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Delete one environment group by ID."""
    group_id = arguments.get("group_id")
    if not group_id:
        return _text("Error: group_id is required")

    result = await make_request("DELETE", f"/endpoint_groups/{group_id}")
    if result["status_code"] in [200, 204]:
        return _text(f"✓ Environment group {group_id} deleted successfully")
    return _text(f"Failed to delete group: HTTP {result['status_code']}")


async def _tool_list_tags(_arguments: Dict[str, Any]) -> list[types.TextContent]:
    """List every tag."""
    result = await make_request("GET", "/tags")
    if result["status_code"] != 200:
        return _text(f"Failed to list tags: HTTP {result['status_code']}")

    tags = result["data"] or []
    lines = [f"Found {len(tags)} tags:\n"]
    lines.extend(f"- ID: {tag.get('ID')}, Name: {tag.get('Name')}\n" for tag in tags)
    return _text("".join(lines))


async def _tool_create_tag(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Create a tag."""
    tag_name = arguments.get("name")
    if not tag_name:
        return _text("Error: name is required")

    result = await make_request("POST", "/tags", {"Name": tag_name})
    if result["status_code"] in [200, 201]:
        return _text(f"✓ Tag '{tag_name}' created successfully")
    return _text(f"Failed to create tag: HTTP {result['status_code']}")


async def _tool_delete_tag(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Delete one tag by ID."""
    tag_id = arguments.get("tag_id")
    if not tag_id:
        return _text("Error: tag_id is required")

    result = await make_request("DELETE", f"/tags/{tag_id}")
    if result["status_code"] in [200, 204]:
        return _text(f"✓ Tag {tag_id} deleted successfully")
    return _text(f"Failed to delete tag: HTTP {result['status_code']}")


async def _tool_generate_edge_key(arguments: Dict[str, Any]) -> list[types.TextContent]:
    """Create an Edge environment and print a ``docker run`` command for its agent."""
    edge_name = arguments.get("name")
    if not edge_name:
        return _text("Error: name is required")

    edge_data = {
        "Name": edge_name,
        "Type": EnvironmentType.EDGE_AGENT.value,
        "EndpointCreationType": 4,  # Edge agent
    }
    if arguments.get("group_id"):
        edge_data["GroupId"] = arguments["group_id"]

    result = await _environment_request("POST", "", edge_data)
    if result["status_code"] not in [200, 201]:
        return _text(f"Failed to create Edge environment: HTTP {result['status_code']}")

    created = result["data"] or {}  # creation succeeded; tolerate a missing body
    env_id = created.get("Id")
    edge_key = created.get("EdgeKey", "")
    lines = [
        f"✓ Edge environment '{edge_name}' created\n",
        f"- Environment ID: {env_id}\n",
        f"- Edge Key: {edge_key}\n",
        "\nDeployment command:\n",
        "docker run -d --name portainer_edge_agent --restart always \\\n",
        " -v /var/run/docker.sock:/var/run/docker.sock \\\n",
        " -v /var/lib/docker/volumes:/var/lib/docker/volumes \\\n",
        " -v /:/host \\\n",
        " -v portainer_agent_data:/data \\\n",
        " --env EDGE=1 \\\n",
        f" --env EDGE_ID={env_id} \\\n",
        f" --env EDGE_KEY={edge_key} \\\n",
        " --env EDGE_INSECURE_POLL=1 \\\n",
        " portainer/agent:latest",
    ]
    return _text("".join(lines))


# Tool name -> coroutine implementing it. Keys must match handle_list_tools().
_TOOL_HANDLERS = {
    "list_environments": _tool_list_environments,
    "get_environment": _tool_get_environment,
    "create_docker_environment": _tool_create_docker_environment,
    "create_kubernetes_environment": _tool_create_kubernetes_environment,
    "update_environment": _tool_update_environment,
    "delete_environment": _tool_delete_environment,
    "get_environment_status": _tool_get_environment_status,
    "associate_environment": _tool_associate_environment,
    "list_environment_groups": _tool_list_environment_groups,
    "create_environment_group": _tool_create_environment_group,
    "update_environment_group": _tool_update_environment_group,
    "delete_environment_group": _tool_delete_environment_group,
    "list_tags": _tool_list_tags,
    "create_tag": _tool_create_tag,
    "delete_tag": _tool_delete_tag,
    "generate_edge_key": _tool_generate_edge_key,
}


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
    """Dispatch a tool call to its implementation and report the outcome as text.

    Args:
        name: Tool name as advertised by ``handle_list_tools``.
        arguments: Tool arguments; ``None`` is treated as an empty mapping.

    Returns:
        A single-item list with a text result. Failures (HTTP errors, missing
        arguments, timeouts, connection errors and unexpected exceptions) are
        reported in that text instead of being raised.
    """
    import httpx

    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        return _text(f"Unknown tool: {name}")

    try:
        # Clients may omit arguments entirely; handlers always get a dict.
        return await handler(arguments or {})
    except httpx.TimeoutException:
        return _text("Error: Request timed out. The Portainer server may be slow or unresponsive.")
    except httpx.ConnectError:
        return _text("Error: Could not connect to Portainer server. Please check the URL and network connection.")
    except Exception as e:
        # Last-resort boundary: surface the failure to the caller as text.
        import traceback
        error_details = f"Error: {str(e)}\nType: {type(e).__name__}"
        error_details += f"\nTraceback: {traceback.format_exc()}"
        return _text(error_details)
async def run():
    """Serve MCP requests over stdin/stdout until the transport closes."""
    # Use stdio transport
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        # No change notifications are advertised for prompts, resources or tools.
        notifications = NotificationOptions(
            prompts_changed=False,
            resources_changed=False,
            tools_changed=False,
        )
        capabilities = server.get_capabilities(
            notification_options=notifications,
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="portainer-environments",
            server_version="1.0.0",
            capabilities=capabilities,
        )
        await server.run(read_stream, write_stream, init_options)
def main():
    """Synchronous entry point: drive the async server until it exits."""
    server_coroutine = run()
    asyncio.run(server_coroutine)


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()