portainer-mcp/portainer_environments_server.py
Adolfo Delorenzo d5f8ae5794 refactor: clean up codebase and rename core server
- Remove test files and demos (test_*.py, create_nginx_stack.py)
- Remove build artifacts (egg-info directory)
- Rename merged_mcp_server.py to portainer_core_server.py for consistency
- Update documentation to reflect new naming
- Add comprehensive docstrings to all Python files
- Maintain all essential functionality

This cleanup improves code organization while preserving all production servers:
- portainer_core_server.py (formerly merged_mcp_server.py)
- portainer_docker_server.py
- portainer_edge_server.py
- portainer_environments_server.py
- portainer_gitops_server.py
- portainer_kubernetes_server.py
- portainer_stacks_server.py

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 00:43:23 -03:00

1043 lines
42 KiB
Python
Executable File

#!/usr/bin/env python3
"""
MCP server for Portainer Environments management.
This module provides comprehensive environment and endpoint management functionality
for Portainer Business Edition. It handles Docker, Kubernetes, and Edge environments
with support for groups, tags, and team associations.
The server implements tools for:
- Environment CRUD operations (Docker, Kubernetes, Edge)
- Environment status monitoring and health checks
- Environment groups for logical organization
- Tag management for categorization
- Team access control associations
- Edge agent deployment script generation
Complexity: O(1) for all operations (simple HTTP requests)
Dependencies: httpx for async HTTP, mcp for server protocol, enum for type constants
Call Flow: MCP client -> handle_call_tool() -> make_request() -> Portainer API
Environment Variables:
PORTAINER_URL: Base URL of Portainer instance
PORTAINER_API_KEY: API key for authentication
MCP_MODE: Set to "true" to suppress logging
API Compatibility:
Supports both new (/environments) and legacy (/endpoints) API endpoints
for backward compatibility with older Portainer versions.
"""
import os
import sys
import json
import asyncio
from typing import Any, Dict, List, Optional
from enum import Enum
# Suppress all logging to stderr
os.environ["MCP_MODE"] = "true"
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Create server
server = Server("portainer-environments")
# Store for our state
portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live")
api_key = os.getenv("PORTAINER_API_KEY", "")
# Environment types enum
class EnvironmentType(Enum):
"""
Enumeration of supported Portainer environment types.
Maps environment type names to their numeric API values.
Used when creating new environments to specify the type.
Values:
DOCKER: Standard Docker environment
DOCKER_SWARM: Docker Swarm cluster
KUBERNETES: Kubernetes cluster
ACI: Azure Container Instances
EDGE_AGENT: Edge computing environment
"""
DOCKER = 1
DOCKER_SWARM = 2
KUBERNETES = 3
ACI = 4
EDGE_AGENT = 5
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""
List available tools.
Returns a comprehensive list of environment management tools.
Each tool includes its name, description, and JSON schema for input validation.
Returns:
list[types.Tool]: List of available tools with their schemas
Complexity: O(1) - Returns static list
Call Flow:
- Called by: MCP protocol during initialization
- Calls: None (static return)
Tool Categories:
- Basic Operations: list, get, create, update, delete environments
- Status: get_environment_status for health monitoring
- Groups: CRUD operations for environment groups
- Tags: Create and manage environment tags
- Edge: Generate Edge agent deployment scripts
- Access: Team association management
"""
return [
# Basic environment operations
types.Tool(
name="list_environments",
description="List all environments/endpoints",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "Number of environments to return (default: all)",
"default": 100
},
"start": {
"type": "integer",
"description": "Starting index for pagination",
"default": 0
}
},
"required": []
}
),
types.Tool(
name="get_environment",
description="Get details of a specific environment",
inputSchema={
"type": "object",
"properties": {
"environment_id": {
"type": "integer",
"description": "ID of the environment"
}
},
"required": ["environment_id"]
}
),
types.Tool(
name="create_docker_environment",
description="Create a new Docker environment",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the environment"
},
"url": {
"type": "string",
"description": "Docker API URL (e.g., tcp://localhost:2375 or unix:///var/run/docker.sock)"
},
"public_url": {
"type": "string",
"description": "Public URL for accessing the environment (optional)"
},
"group_id": {
"type": "integer",
"description": "Environment group ID (optional)"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Array of tags for the environment (optional)"
},
"tls": {
"type": "boolean",
"description": "Enable TLS",
"default": False
},
"tls_skip_verify": {
"type": "boolean",
"description": "Skip TLS certificate verification",
"default": False
}
},
"required": ["name", "url"]
}
),
types.Tool(
name="create_kubernetes_environment",
description="Create a new Kubernetes environment",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the environment"
},
"url": {
"type": "string",
"description": "Kubernetes API server URL"
},
"bearer_token": {
"type": "string",
"description": "Bearer token for authentication"
},
"group_id": {
"type": "integer",
"description": "Environment group ID (optional)"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Array of tags for the environment (optional)"
},
"tls_skip_verify": {
"type": "boolean",
"description": "Skip TLS certificate verification",
"default": False
}
},
"required": ["name", "url"]
}
),
types.Tool(
name="update_environment",
description="Update an existing environment",
inputSchema={
"type": "object",
"properties": {
"environment_id": {
"type": "integer",
"description": "ID of the environment to update"
},
"name": {
"type": "string",
"description": "New name for the environment (optional)"
},
"url": {
"type": "string",
"description": "New URL for the environment (optional)"
},
"public_url": {
"type": "string",
"description": "New public URL (optional)"
},
"group_id": {
"type": "integer",
"description": "New group ID (optional)"
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "New tags array (optional)"
}
},
"required": ["environment_id"]
}
),
types.Tool(
name="delete_environment",
description="Delete an environment",
inputSchema={
"type": "object",
"properties": {
"environment_id": {
"type": "integer",
"description": "ID of the environment to delete"
}
},
"required": ["environment_id"]
}
),
# Environment status and management
types.Tool(
name="get_environment_status",
description="Get status and statistics of an environment",
inputSchema={
"type": "object",
"properties": {
"environment_id": {
"type": "integer",
"description": "ID of the environment"
}
},
"required": ["environment_id"]
}
),
types.Tool(
name="associate_environment",
description="Associate/disassociate environment with teams",
inputSchema={
"type": "object",
"properties": {
"environment_id": {
"type": "integer",
"description": "ID of the environment"
},
"teams": {
"type": "array",
"items": {
"type": "object",
"properties": {
"team_id": {"type": "integer"},
"access_level": {"type": "string", "enum": ["read", "write"]}
}
},
"description": "Array of team associations"
}
},
"required": ["environment_id", "teams"]
}
),
# Environment groups
types.Tool(
name="list_environment_groups",
description="List all environment groups",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
types.Tool(
name="create_environment_group",
description="Create a new environment group",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the environment group"
},
"description": {
"type": "string",
"description": "Description of the group (optional)"
}
},
"required": ["name"]
}
),
types.Tool(
name="update_environment_group",
description="Update an environment group",
inputSchema={
"type": "object",
"properties": {
"group_id": {
"type": "integer",
"description": "ID of the group to update"
},
"name": {
"type": "string",
"description": "New name for the group (optional)"
},
"description": {
"type": "string",
"description": "New description (optional)"
}
},
"required": ["group_id"]
}
),
types.Tool(
name="delete_environment_group",
description="Delete an environment group",
inputSchema={
"type": "object",
"properties": {
"group_id": {
"type": "integer",
"description": "ID of the group to delete"
}
},
"required": ["group_id"]
}
),
# Tags management
types.Tool(
name="list_tags",
description="List all available tags",
inputSchema={
"type": "object",
"properties": {},
"required": []
}
),
types.Tool(
name="create_tag",
description="Create a new tag",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the tag"
}
},
"required": ["name"]
}
),
types.Tool(
name="delete_tag",
description="Delete a tag",
inputSchema={
"type": "object",
"properties": {
"tag_id": {
"type": "integer",
"description": "ID of the tag to delete"
}
},
"required": ["tag_id"]
}
),
# Edge environments
types.Tool(
name="generate_edge_key",
description="Generate Edge agent deployment script",
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name for the Edge environment"
},
"group_id": {
"type": "integer",
"description": "Environment group ID (optional)"
}
},
"required": ["name"]
}
)
]
async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
"""
Make HTTP request to Portainer API.
Centralized HTTP request handler that manages all API communication with Portainer.
Handles authentication headers, timeout configuration, and safe JSON parsing.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint path (without /api prefix)
json_data: Optional JSON payload for POST/PUT requests
Returns:
Dict containing status_code, data (parsed JSON), and raw text
Complexity: O(1) - Single HTTP request
Call Flow:
- Called by: handle_call_tool() for all API operations
- Calls: httpx for async HTTP requests
Error Handling:
- 30 second timeout for long-running operations
- Safe JSON parsing with content-type checking
- SSL verification disabled for self-signed certificates
"""
import httpx
async with httpx.AsyncClient(verify=False, timeout=30.0) as client:
headers = {"X-API-Key": api_key} if api_key else {}
if method == "GET":
response = await client.get(f"{portainer_url}/api{endpoint}", headers=headers)
elif method == "POST":
response = await client.post(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data)
elif method == "PUT":
response = await client.put(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data)
elif method == "DELETE":
response = await client.delete(f"{portainer_url}/api{endpoint}", headers=headers)
else:
raise ValueError(f"Unsupported method: {method}")
# Parse JSON response safely
# Check content-type header to avoid parsing non-JSON responses
try:
data = response.json() if response.text and response.headers.get("content-type", "").startswith("application/json") else None
except Exception:
data = None
return {"status_code": response.status_code, "data": data, "text": response.text}
def format_environment_type(env_type: int) -> str:
"""
Convert environment type ID to readable string.
Maps numeric environment type values from the API to human-readable names.
Includes support for local environment types not in the enum.
Args:
env_type: Numeric environment type from API
Returns:
Human-readable environment type name
Complexity: O(1) - Dictionary lookup
Call Flow:
- Called by: list_environments, get_environment in handle_call_tool()
- Calls: None (pure function)
"""
type_map = {
1: "Docker",
2: "Docker Swarm",
3: "Kubernetes",
4: "Azure ACI",
5: "Edge Agent",
6: "Local Docker",
7: "Local Kubernetes"
}
return type_map.get(env_type, f"Unknown({env_type})")
def format_environment_status(status: int) -> str:
"""
Convert environment status to readable string.
Maps numeric status values to human-readable status descriptions.
Args:
status: Numeric status value from API
Returns:
Human-readable status ("up", "down", or "unknown")
Complexity: O(1) - Dictionary lookup
Call Flow:
- Called by: list_environments, get_environment in handle_call_tool()
- Calls: None (pure function)
"""
status_map = {
1: "up",
2: "down"
}
return status_map.get(status, f"unknown({status})")
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
"""
Handle tool calls.
Main request dispatcher that processes all incoming tool requests from MCP clients.
Routes requests to appropriate Portainer API endpoints and formats responses.
Handles both new (/environments) and legacy (/endpoints) API endpoints.
Args:
name: The name of the tool to execute
arguments: Optional dictionary of arguments for the tool
Returns:
list[types.TextContent]: Response messages as text content
Complexity: O(1) - Each tool performs single API request
Call Flow:
- Called by: MCP protocol when client invokes a tool
- Calls: make_request() for API operations, format_* functions for display
Error Handling:
- Validates required arguments before API calls
- Handles both new and legacy API endpoints with fallback
- Specific error messages for timeouts and connection failures
- Detailed traceback for debugging unexpected errors
"""
import httpx
try:
# Basic environment operations
if name == "list_environments":
limit = arguments.get("limit", 100) if arguments else 100
start = arguments.get("start", 0) if arguments else 0
# Try both /environments (new) and /endpoints (old) endpoints
# This dual-endpoint approach ensures compatibility across Portainer versions
result = await make_request("GET", f"/environments?limit={limit}&start={start}")
if result["status_code"] == 404:
# Legacy endpoint doesn't support pagination params in URL
# Fall back to fetching all and paginating manually
result = await make_request("GET", "/endpoints")
if result["status_code"] == 200 and result["data"] is not None:
environments = result["data"]
# Handle pagination manually for legacy endpoint
# Legacy API returns array, new API might return paginated object
if isinstance(environments, list):
total = len(environments)
environments = environments[start:start + limit]
output = f"Found {total} environments (showing {len(environments)}):\n"
else:
output = f"Found environments:\n"
for env in environments[:10]: # Limit output to first 10 to avoid huge responses
# Format each environment with essential details
env_type = format_environment_type(env.get("Type", 0))
status = format_environment_status(env.get("Status", 0))
output += f"\n- ID: {env.get('Id')}, Name: {env.get('Name')}"
output += f"\n Type: {env_type}, Status: {status}"
output += f"\n URL: {env.get('URL', 'N/A')}"
if env.get("GroupId"):
output += f"\n Group ID: {env.get('GroupId')}"
if env.get("TagIds") or env.get("Tags"):
tags = env.get("Tags", [])
if tags:
tag_names = [t.get('Name', '') for t in tags if isinstance(t, dict)]
if tag_names:
output += f"\n Tags: {', '.join(tag_names)}"
if len(environments) > 10:
output += f"\n\n... and {len(environments) - 10} more environments"
return [types.TextContent(type="text", text=output)]
else:
error_msg = f"Failed to list environments: HTTP {result['status_code']}"
if result.get("text"):
error_msg += f" - {result['text'][:200]}"
return [types.TextContent(type="text", text=error_msg)]
elif name == "get_environment":
env_id = arguments.get("environment_id")
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
# Try both endpoints
result = await make_request("GET", f"/environments/{env_id}")
if result["status_code"] == 404:
result = await make_request("GET", f"/endpoints/{env_id}")
if result["status_code"] == 200:
env = result["data"]
output = f"Environment Details:\n"
output += f"- ID: {env.get('Id')}\n"
output += f"- Name: {env.get('Name')}\n"
output += f"- Type: {format_environment_type(env.get('Type', 0))}\n"
output += f"- Status: {format_environment_status(env.get('Status', 0))}\n"
output += f"- URL: {env.get('URL', 'N/A')}\n"
output += f"- Public URL: {env.get('PublicURL', 'N/A')}\n"
if env.get("GroupId"):
output += f"- Group ID: {env.get('GroupId')}\n"
if env.get("Tags"):
output += f"- Tags: {', '.join([t.get('Name', '') for t in env.get('Tags', [])])}\n"
if env.get("TLSConfig"):
output += f"- TLS Enabled: {env['TLSConfig'].get('TLS', False)}\n"
output += f"- TLS Skip Verify: {env['TLSConfig'].get('TLSSkipVerify', False)}\n"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to get environment: HTTP {result['status_code']}")]
elif name == "create_docker_environment":
req_name = arguments.get("name")
url = arguments.get("url")
if not req_name or not url:
return [types.TextContent(type="text", text="Error: name and url are required")]
env_data = {
"Name": req_name,
"Type": EnvironmentType.DOCKER.value,
"URL": url,
"EndpointCreationType": 1 # Local environment
}
if arguments.get("public_url"):
env_data["PublicURL"] = arguments["public_url"]
if arguments.get("group_id"):
env_data["GroupId"] = arguments["group_id"]
if arguments.get("tags"):
env_data["TagIds"] = arguments["tags"]
# TLS configuration for secure connections
# Required when connecting to Docker daemon over TCP with TLS
if arguments.get("tls") or arguments.get("tls_skip_verify"):
env_data["TLSConfig"] = {
"TLS": arguments.get("tls", False),
"TLSSkipVerify": arguments.get("tls_skip_verify", False)
}
# Try both endpoints
result = await make_request("POST", "/environments", env_data)
if result["status_code"] == 404:
result = await make_request("POST", "/endpoints", env_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Docker environment '{req_name}' created successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to create Docker environment: HTTP {result['status_code']} - {result['text']}")]
elif name == "create_kubernetes_environment":
req_name = arguments.get("name")
url = arguments.get("url")
if not req_name or not url:
return [types.TextContent(type="text", text="Error: name and url are required")]
env_data = {
"Name": req_name,
"Type": EnvironmentType.KUBERNETES.value,
"URL": url,
"EndpointCreationType": 3 # Kubernetes environment
}
if arguments.get("bearer_token"):
env_data["Token"] = arguments["bearer_token"]
if arguments.get("group_id"):
env_data["GroupId"] = arguments["group_id"]
if arguments.get("tags"):
env_data["TagIds"] = arguments["tags"]
if arguments.get("tls_skip_verify"):
env_data["TLSConfig"] = {
"TLS": True,
"TLSSkipVerify": arguments["tls_skip_verify"]
}
# Try both endpoints
result = await make_request("POST", "/environments", env_data)
if result["status_code"] == 404:
result = await make_request("POST", "/endpoints", env_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Kubernetes environment '{req_name}' created successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to create Kubernetes environment: HTTP {result['status_code']} - {result['text']}")]
elif name == "update_environment":
env_id = arguments.get("environment_id")
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
# Get current environment first to preserve unchanged fields
# Portainer requires complete environment object for updates
result = await make_request("GET", f"/environments/{env_id}")
if result["status_code"] == 404:
result = await make_request("GET", f"/endpoints/{env_id}")
if result["status_code"] != 200:
return [types.TextContent(type="text", text=f"Failed to get current environment: HTTP {result['status_code']}")]
env_data = result["data"]
# Update only provided fields
if "name" in arguments:
env_data["Name"] = arguments["name"]
if "url" in arguments:
env_data["URL"] = arguments["url"]
if "public_url" in arguments:
env_data["PublicURL"] = arguments["public_url"]
if "group_id" in arguments:
env_data["GroupId"] = arguments["group_id"]
if "tags" in arguments:
env_data["TagIds"] = arguments["tags"]
# Try both endpoints
result = await make_request("PUT", f"/environments/{env_id}", env_data)
if result["status_code"] == 404:
result = await make_request("PUT", f"/endpoints/{env_id}", env_data)
if result["status_code"] == 200:
return [types.TextContent(type="text", text=f"✓ Environment {env_id} updated successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to update environment: HTTP {result['status_code']}")]
elif name == "delete_environment":
env_id = arguments.get("environment_id")
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
# Try both endpoints
result = await make_request("DELETE", f"/environments/{env_id}")
if result["status_code"] == 404:
result = await make_request("DELETE", f"/endpoints/{env_id}")
if result["status_code"] in [200, 204]:
return [types.TextContent(type="text", text=f"✓ Environment {env_id} deleted successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to delete environment: HTTP {result['status_code']}")]
# Environment status and management
elif name == "get_environment_status":
env_id = arguments.get("environment_id")
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
# Try to get docker info through Portainer proxy
# This endpoint proxies requests to the actual Docker/Kubernetes API
result = await make_request("GET", f"/environments/{env_id}/docker/info")
if result["status_code"] == 404:
result = await make_request("GET", f"/endpoints/{env_id}/docker/info")
if result["status_code"] == 200:
info = result["data"]
output = f"Environment Status:\n"
output += f"- Status: up\n"
output += f"- Docker Version: {info.get('ServerVersion', 'N/A')}\n"
output += f"- Containers: {info.get('Containers', 0)}\n"
output += f"- Running: {info.get('ContainersRunning', 0)}\n"
output += f"- Stopped: {info.get('ContainersStopped', 0)}\n"
output += f"- Images: {info.get('Images', 0)}\n"
output += f"- CPU Count: {info.get('NCPU', 0)}\n"
output += f"- Memory: {info.get('MemTotal', 0) / (1024**3):.2f} GB\n" # Convert bytes to GB
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text="Environment is down or inaccessible")]
elif name == "associate_environment":
env_id = arguments.get("environment_id")
teams = arguments.get("teams", [])
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
assoc_data = {
"TeamAccessPolicies": {}
}
for team in teams:
team_id = team.get("team_id")
access_level = team.get("access_level", "read")
if team_id:
assoc_data["TeamAccessPolicies"][str(team_id)] = {"AccessLevel": access_level}
# Try both endpoints
result = await make_request("PUT", f"/environments/{env_id}/association", assoc_data)
if result["status_code"] == 404:
result = await make_request("PUT", f"/endpoints/{env_id}/association", assoc_data)
if result["status_code"] == 200:
return [types.TextContent(type="text", text=f"✓ Environment {env_id} team associations updated")]
else:
return [types.TextContent(type="text", text=f"Failed to update associations: HTTP {result['status_code']}")]
# Environment groups
elif name == "list_environment_groups":
result = await make_request("GET", "/endpoint_groups")
if result["status_code"] == 200:
groups = result["data"]
output = f"Found {len(groups)} environment groups:\n"
for group in groups:
output += f"- ID: {group.get('Id')}, Name: {group.get('Name')}\n"
if group.get('Description'):
output += f" Description: {group.get('Description')}\n"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list groups: HTTP {result['status_code']}")]
elif name == "create_environment_group":
group_name = arguments.get("name")
if not group_name:
return [types.TextContent(type="text", text="Error: name is required")]
group_data = {
"Name": group_name,
"Description": arguments.get("description", "")
}
result = await make_request("POST", "/endpoint_groups", group_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Environment group '{group_name}' created successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to create group: HTTP {result['status_code']}")]
elif name == "update_environment_group":
group_id = arguments.get("group_id")
if not group_id:
return [types.TextContent(type="text", text="Error: group_id is required")]
update_data = {}
if "name" in arguments:
update_data["Name"] = arguments["name"]
if "description" in arguments:
update_data["Description"] = arguments["description"]
result = await make_request("PUT", f"/endpoint_groups/{group_id}", update_data)
if result["status_code"] == 200:
return [types.TextContent(type="text", text=f"✓ Environment group {group_id} updated successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to update group: HTTP {result['status_code']}")]
elif name == "delete_environment_group":
group_id = arguments.get("group_id")
if not group_id:
return [types.TextContent(type="text", text="Error: group_id is required")]
result = await make_request("DELETE", f"/endpoint_groups/{group_id}")
if result["status_code"] in [200, 204]:
return [types.TextContent(type="text", text=f"✓ Environment group {group_id} deleted successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to delete group: HTTP {result['status_code']}")]
# Tags management
elif name == "list_tags":
result = await make_request("GET", "/tags")
if result["status_code"] == 200:
tags = result["data"]
output = f"Found {len(tags)} tags:\n"
for tag in tags:
output += f"- ID: {tag.get('ID')}, Name: {tag.get('Name')}\n"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list tags: HTTP {result['status_code']}")]
elif name == "create_tag":
tag_name = arguments.get("name")
if not tag_name:
return [types.TextContent(type="text", text="Error: name is required")]
tag_data = {"Name": tag_name}
result = await make_request("POST", "/tags", tag_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Tag '{tag_name}' created successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to create tag: HTTP {result['status_code']}")]
elif name == "delete_tag":
tag_id = arguments.get("tag_id")
if not tag_id:
return [types.TextContent(type="text", text="Error: tag_id is required")]
result = await make_request("DELETE", f"/tags/{tag_id}")
if result["status_code"] in [200, 204]:
return [types.TextContent(type="text", text=f"✓ Tag {tag_id} deleted successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to delete tag: HTTP {result['status_code']}")]
# Edge environments
elif name == "generate_edge_key":
edge_name = arguments.get("name")
if not edge_name:
return [types.TextContent(type="text", text="Error: name is required")]
edge_data = {
"Name": edge_name,
"Type": EnvironmentType.EDGE_AGENT.value,
"EndpointCreationType": 4 # Edge agent
}
if arguments.get("group_id"):
edge_data["GroupId"] = arguments["group_id"]
# Try both endpoints
result = await make_request("POST", "/environments", edge_data)
if result["status_code"] == 404:
result = await make_request("POST", "/endpoints", edge_data)
if result["status_code"] in [200, 201]:
env_id = result["data"].get("Id")
edge_key = result["data"].get("EdgeKey", "")
output = f"✓ Edge environment '{edge_name}' created\n"
output += f"- Environment ID: {env_id}\n"
output += f"- Edge Key: {edge_key}\n"
output += f"\nDeployment command:\n" # Provide ready-to-use Docker command
output += f"docker run -d --name portainer_edge_agent --restart always \\\n"
output += f" -v /var/run/docker.sock:/var/run/docker.sock \\\n"
output += f" -v /var/lib/docker/volumes:/var/lib/docker/volumes \\\n"
output += f" -v /:/host \\\n"
output += f" -v portainer_agent_data:/data \\\n"
output += f" --env EDGE=1 \\\n"
output += f" --env EDGE_ID={env_id} \\\n"
output += f" --env EDGE_KEY={edge_key} \\\n"
output += f" --env EDGE_INSECURE_POLL=1 \\\n"
output += f" portainer/agent:latest"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to create Edge environment: HTTP {result['status_code']}")]
else:
return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
except httpx.TimeoutException:
return [types.TextContent(type="text", text="Error: Request timed out. The Portainer server may be slow or unresponsive.")]
except httpx.ConnectError:
return [types.TextContent(type="text", text="Error: Could not connect to Portainer server. Please check the URL and network connection.")]
except Exception as e:
import traceback
error_details = f"Error: {str(e)}\nType: {type(e).__name__}"
if hasattr(e, "__traceback__"):
error_details += f"\nTraceback: {traceback.format_exc()}"
return [types.TextContent(type="text", text=error_details)]
async def run():
"""
Run the MCP server.
Sets up the stdio transport and runs the server with configured capabilities.
This is the main async loop that handles all MCP protocol communication.
The server runs indefinitely until interrupted, processing tool requests
and responses through stdin/stdout streams.
Complexity: O(1) - Server initialization
Call Flow:
- Called by: main() via asyncio.run()
- Calls: server.run() with stdio streams
Configuration:
- Uses stdio transport for Claude Desktop compatibility
- Disables all change notifications (tools are static)
- Server version 1.0.0 indicates stable implementation
"""
# Use stdio transport
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="portainer-environments",
server_version="1.0.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(
prompts_changed=False,
resources_changed=False,
tools_changed=False,
),
experimental_capabilities={},
),
),
)
def main():
"""
Main entry point.
Synchronous wrapper that starts the async server loop.
This is the script's entry point when run directly.
Complexity: O(1)
Call Flow:
- Called by: __main__ block or external scripts
- Calls: asyncio.run(run())
"""
asyncio.run(run())
if __name__ == "__main__":
main()