portainer-mcp/portainer_kubernetes_server.py
Adolfo Delorenzo d5f8ae5794 refactor: clean up codebase and rename core server
- Remove test files and demos (test_*.py, create_nginx_stack.py)
- Remove build artifacts (egg-info directory)
- Rename merged_mcp_server.py to portainer_core_server.py for consistency
- Update documentation to reflect new naming
- Add comprehensive docstrings to all Python files
- Maintain all essential functionality

This cleanup improves code organization while preserving all production servers:
- portainer_core_server.py (formerly merged_mcp_server.py)
- portainer_docker_server.py
- portainer_edge_server.py
- portainer_environments_server.py
- portainer_gitops_server.py
- portainer_kubernetes_server.py
- portainer_stacks_server.py

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-19 00:43:23 -03:00

1876 lines
83 KiB
Python
Executable File

#!/usr/bin/env python3
"""MCP server for Portainer Kubernetes management."""
import os
import sys
import json
import asyncio
from typing import Any, Dict, List, Optional
from enum import Enum
import base64
# Suppress all logging to stderr
os.environ["MCP_MODE"] = "true"
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Create the MCP server instance; the handlers defined below register
# themselves on it through the @server.list_tools() / @server.call_tool()
# decorators.
server = Server("portainer-kubernetes")
# Connection settings, read once from the environment at import time.
# PORTAINER_URL falls back to the hard-coded default below. PORTAINER_API_KEY
# falls back to an empty string, in which case make_request() sends no
# X-API-Key header at all.
portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live")
api_key = os.getenv("PORTAINER_API_KEY", "")
def _string_prop(description: str) -> Dict[str, Any]:
    """Return a fresh schema fragment for a described string property."""
    return {"type": "string", "description": description}


def _integer_prop(description: str) -> Dict[str, Any]:
    """Return a fresh schema fragment for a described integer property."""
    return {"type": "integer", "description": description}


def _object_prop(description: str) -> Dict[str, Any]:
    """Return a fresh schema fragment for a described free-form object property."""
    return {"type": "object", "description": description}


def _with_default(prop: Dict[str, Any], default: Any) -> Dict[str, Any]:
    """Append a ``default`` entry to a schema fragment and return it."""
    prop["default"] = default
    return prop


def _enum_prop(choices: List[str], default: str) -> Dict[str, Any]:
    """Return a fresh schema fragment for an undescribed string enum with a default."""
    return {"type": "string", "enum": list(choices), "default": default}


def _object_array_prop(item_properties: Dict[str, Any], description: str) -> Dict[str, Any]:
    """Return a fresh schema fragment for a described array of objects."""
    return {
        "type": "array",
        "items": {"type": "object", "properties": item_properties},
        "description": description,
    }


def _k8s_tool(name: str, description: str, *, extra: Optional[Dict[str, Any]] = None,
              required: tuple = (), namespaced: bool = True) -> "types.Tool":
    """Build one tool definition.

    Every tool takes ``environment_id`` as its first, required property.
    When ``namespaced`` is true a ``namespace`` property defaulting to
    "default" follows it; ``extra`` properties come after, in the order given.
    ``required`` lists the required property names besides ``environment_id``.
    """
    properties: Dict[str, Any] = {
        "environment_id": _integer_prop("ID of the Kubernetes environment"),
    }
    if namespaced:
        properties["namespace"] = _with_default(_string_prop("Namespace name"), "default")
    if extra:
        properties.update(extra)
    return types.Tool(
        name=name,
        description=description,
        inputSchema={
            "type": "object",
            "properties": properties,
            "required": ["environment_id", *required],
        },
    )


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """Return the definitions of every tool this server exposes.

    Each entry carries the tool name, a short description, and the JSON
    schema of its arguments. Definitions are rebuilt on every call.
    """
    namespace_tools = [
        _k8s_tool("list_namespaces", "List all namespaces in a Kubernetes cluster",
                  namespaced=False),
        _k8s_tool("create_namespace", "Create a new namespace",
                  namespaced=False,
                  extra={
                      "name": _string_prop("Namespace name"),
                      "labels": _object_prop("Labels for the namespace (optional)"),
                  },
                  required=("name",)),
        # Here "namespace" is the deletion target, so it is required and has
        # no default, unlike the shared namespace property.
        _k8s_tool("delete_namespace", "Delete a namespace",
                  namespaced=False,
                  extra={"namespace": _string_prop("Namespace name")},
                  required=("namespace",)),
    ]

    pod_tools = [
        _k8s_tool("list_pods", "List pods in a namespace",
                  extra={
                      "label_selector": _string_prop("Label selector to filter pods (optional)"),
                  }),
        _k8s_tool("get_pod", "Get detailed information about a pod",
                  extra={"pod_name": _string_prop("Pod name")},
                  required=("pod_name",)),
        _k8s_tool("delete_pod", "Delete a pod",
                  extra={"pod_name": _string_prop("Pod name")},
                  required=("pod_name",)),
        _k8s_tool("get_pod_logs", "Get logs from a pod",
                  extra={
                      "pod_name": _string_prop("Pod name"),
                      "container": _string_prop(
                          "Container name (optional if pod has single container)"),
                      "tail": _with_default(_integer_prop("Number of lines from the end"), 100),
                  },
                  required=("pod_name",)),
    ]

    deployment_tools = [
        _k8s_tool("list_deployments", "List deployments in a namespace"),
        _k8s_tool("get_deployment", "Get detailed information about a deployment",
                  extra={"deployment_name": _string_prop("Deployment name")},
                  required=("deployment_name",)),
        _k8s_tool("create_deployment", "Create a new deployment",
                  extra={
                      "name": _string_prop("Deployment name"),
                      "image": _string_prop("Container image"),
                      "replicas": _with_default(_integer_prop("Number of replicas"), 1),
                      "port": _integer_prop("Container port (optional)"),
                      "env": _object_array_prop(
                          {
                              "name": {"type": "string"},
                              "value": {"type": "string"},
                          },
                          "Environment variables (optional)"),
                      "labels": _object_prop("Labels for the deployment (optional)"),
                  },
                  required=("name", "image")),
        _k8s_tool("scale_deployment", "Scale a deployment",
                  extra={
                      "deployment_name": _string_prop("Deployment name"),
                      "replicas": _integer_prop("Number of replicas"),
                  },
                  required=("deployment_name", "replicas")),
        _k8s_tool("update_deployment_image", "Update deployment container image",
                  extra={
                      "deployment_name": _string_prop("Deployment name"),
                      "container_name": _string_prop("Container name"),
                      "new_image": _string_prop("New container image"),
                  },
                  required=("deployment_name", "container_name", "new_image")),
        _k8s_tool("restart_deployment", "Restart a deployment by updating annotation",
                  extra={"deployment_name": _string_prop("Deployment name")},
                  required=("deployment_name",)),
        _k8s_tool("delete_deployment", "Delete a deployment",
                  extra={"deployment_name": _string_prop("Deployment name")},
                  required=("deployment_name",)),
    ]

    service_tools = [
        _k8s_tool("list_services", "List services in a namespace"),
        _k8s_tool("create_service", "Create a service for a deployment",
                  extra={
                      "name": _string_prop("Service name"),
                      "selector": _object_prop("Pod selector labels"),
                      "ports": _object_array_prop(
                          {
                              "port": {"type": "integer"},
                              "targetPort": {"type": "integer"},
                              "protocol": {"type": "string", "default": "TCP"},
                          },
                          "Service ports"),
                      "type": _enum_prop(["ClusterIP", "NodePort", "LoadBalancer"], "ClusterIP"),
                  },
                  required=("name", "selector", "ports")),
        _k8s_tool("delete_service", "Delete a service",
                  extra={"service_name": _string_prop("Service name")},
                  required=("service_name",)),
    ]

    ingress_tools = [
        _k8s_tool("list_ingresses", "List ingresses in a namespace"),
        _k8s_tool("create_ingress", "Create an ingress",
                  extra={
                      "name": _string_prop("Ingress name"),
                      "rules": _object_array_prop(
                          {
                              "host": {"type": "string"},
                              "paths": {
                                  "type": "array",
                                  "items": {
                                      "type": "object",
                                      "properties": {
                                          "path": {"type": "string"},
                                          "service": {"type": "string"},
                                          "port": {"type": "integer"},
                                      },
                                  },
                              },
                          },
                          "Ingress rules"),
                  },
                  required=("name", "rules")),
    ]

    config_tools = [
        _k8s_tool("list_configmaps", "List ConfigMaps in a namespace"),
        _k8s_tool("create_configmap", "Create a ConfigMap",
                  extra={
                      "name": _string_prop("ConfigMap name"),
                      "data": _object_prop("Key-value pairs for the ConfigMap"),
                  },
                  required=("name", "data")),
        _k8s_tool("list_secrets", "List Secrets in a namespace"),
        _k8s_tool("create_secret", "Create a Secret",
                  extra={
                      "name": _string_prop("Secret name"),
                      "type": _with_default(_string_prop("Secret type"), "Opaque"),
                      "data": _object_prop("Key-value pairs (values will be base64 encoded)"),
                  },
                  required=("name", "data")),
    ]

    volume_tools = [
        _k8s_tool("list_persistent_volumes", "List PersistentVolumes", namespaced=False),
        _k8s_tool("list_persistent_volume_claims", "List PersistentVolumeClaims in a namespace"),
        _k8s_tool("create_persistent_volume_claim", "Create a PersistentVolumeClaim",
                  extra={
                      "name": _string_prop("PVC name"),
                      "storage_class": _string_prop("Storage class name (optional)"),
                      "size": _string_prop("Storage size (e.g., '10Gi')"),
                      "access_mode": _enum_prop(
                          ["ReadWriteOnce", "ReadOnlyMany", "ReadWriteMany"], "ReadWriteOnce"),
                  },
                  required=("name", "size")),
    ]

    node_tools = [
        _k8s_tool("list_nodes", "List cluster nodes", namespaced=False),
        _k8s_tool("get_node", "Get detailed information about a node",
                  namespaced=False,
                  extra={"node_name": _string_prop("Node name")},
                  required=("node_name",)),
    ]

    # Concatenation order matches the order in which the tools are advertised.
    return [
        *namespace_tools,
        *pod_tools,
        *deployment_tools,
        *service_tools,
        *ingress_tools,
        *config_tools,
        *volume_tools,
        *node_tools,
    ]
async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None,
                       params: Optional[Dict] = None, text_response: bool = False) -> Dict[str, Any]:
    """
    Send one HTTP request to the Portainer API and return a summary of the response.

    The URL is built as ``{portainer_url}/api{endpoint}``. When ``api_key`` is
    non-empty it is sent in the ``X-API-Key`` header.

    Args:
        method: One of "GET", "POST", "PUT", "PATCH", "DELETE" (case-sensitive).
        endpoint: Path appended after ``/api``; it may already carry a query string.
        json_data: JSON payload. Only sent for POST, PUT and PATCH.
        params: Query parameters. Forwarded for every method.
        text_response: If True, skip JSON parsing and return only the raw text
            (used for log output).

    Returns:
        A dict with three keys: ``status_code`` (int), ``data`` (the parsed
        JSON body, or None when ``text_response`` is set, the body is empty,
        the content type is not JSON, or parsing fails) and ``text`` (the raw
        response body).

    Raises:
        ValueError: If ``method`` is not one of the supported verbs. Raised
            before any connection is opened.

    Notes:
        - HTTP error statuses are not raised; callers inspect ``status_code``.
        - TLS certificate verification is disabled (``verify=False``).
          NOTE(review): presumably to tolerate self-signed Portainer
          certificates; this also removes protection against
          man-in-the-middle attacks, so confirm it is acceptable.
        - Each call opens and closes its own client with a 30 second timeout.
    """
    import httpx

    body_methods = ("POST", "PUT", "PATCH")
    if method not in ("GET", "DELETE", *body_methods):
        raise ValueError(f"Unsupported method: {method}")

    headers = {"X-API-Key": api_key} if api_key else {}
    request_kwargs: Dict[str, Any] = {"headers": headers, "params": params}
    if method in body_methods:
        # GET and DELETE never carry a body, even if json_data was supplied.
        request_kwargs["json"] = json_data
        if json_data is not None:
            headers["Content-Type"] = "application/json"

    async with httpx.AsyncClient(verify=False, timeout=30.0) as client:
        response = await client.request(
            method, f"{portainer_url}/api{endpoint}", **request_kwargs
        )

    # Text responses (like logs) are returned verbatim.
    if text_response:
        return {"status_code": response.status_code, "data": None, "text": response.text}

    # Only attempt JSON decoding when the server says the body is JSON.
    data = None
    is_json = response.headers.get("content-type", "").startswith("application/json")
    if response.text and is_json:
        try:
            data = response.json()
        except ValueError:
            # Malformed JSON (JSONDecodeError / UnicodeDecodeError): fall back
            # to data=None and let the caller use the raw text.
            data = None
    return {"status_code": response.status_code, "data": data, "text": response.text}
def format_pod_phase(phase: Optional[str]) -> str:
    """Return a display label for a pod phase.

    Known phases (Pending, Running, Succeeded, Failed, Unknown) get an emoji
    prefix. Any other non-empty value is returned unchanged. A missing phase
    (None or an empty string) is shown as Unknown, so a pod status without a
    ``phase`` field never renders as the literal text "None".

    Args:
        phase: The pod's ``status.phase`` value, or None when absent.

    Returns:
        The label to display.
    """
    phase_map = {
        "Pending": "⏳ Pending",
        "Running": "🟢 Running",
        "Succeeded": "✅ Succeeded",
        "Failed": "❌ Failed",
        "Unknown": "❓ Unknown"
    }
    if not phase:
        return phase_map["Unknown"]
    return phase_map.get(phase, phase)
def format_resource_quantity(quantity: str) -> str:
    """Format a Kubernetes resource quantity for display.

    Conversions applied:
        - "<n>Ki" becomes mebibytes with one decimal, e.g. "2048Ki" -> "2.0 Mi".
        - "<n>m" (millicores) becomes cores with two decimals,
          e.g. "500m" -> "0.50 cores".

    Everything else is returned unchanged. That includes "Mi"/"Gi" values,
    plain numbers, and any string whose numeric part cannot be parsed, so
    formatting never raises on unexpected input.

    Args:
        quantity: The quantity string as reported by the API.

    Returns:
        The formatted quantity, or the input itself when no conversion applies.
    """
    try:
        if quantity.endswith("Ki"):
            # float() rather than int() so fractional quantities are accepted.
            return f"{float(quantity[:-2]) / 1024:.1f} Mi"
        if quantity.endswith("m"):  # millicores
            return f"{float(quantity[:-1]) / 1000:.2f} cores"
    except ValueError:
        # Not a number in front of the suffix; show the raw value instead.
        pass
    return quantity
@server.call_tool()
async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]:
"""Handle tool calls."""
import httpx
try:
env_id = arguments.get("environment_id") if arguments else None
namespace = arguments.get("namespace", "default") if arguments else "default"
# Namespace Management
if name == "list_namespaces":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} namespace(s):\n"
for ns in items:
metadata = ns.get("metadata", {})
status = ns.get("status", {})
output += f"\n- {metadata.get('name')}"
output += f"\n Status: {status.get('phase', 'Unknown')}"
output += f"\n Created: {metadata.get('creationTimestamp', 'N/A')}"
labels = metadata.get("labels", {})
if labels:
label_str = ", ".join([f"{k}={v}" for k, v in labels.items()])
output += f"\n Labels: {label_str}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list namespaces: HTTP {result['status_code']}")]
elif name == "create_namespace":
ns_name = arguments.get("name")
if not env_id or not ns_name:
return [types.TextContent(type="text", text="Error: environment_id and name are required")]
ns_data = {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": ns_name
}
}
if arguments.get("labels"):
ns_data["metadata"]["labels"] = arguments["labels"]
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces", json_data=ns_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Namespace '{ns_name}' created successfully")]
else:
error_msg = f"Failed to create namespace: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
elif name == "delete_namespace":
ns_name = arguments.get("namespace")
if not env_id or not ns_name:
return [types.TextContent(type="text", text="Error: environment_id and namespace are required")]
result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{ns_name}")
if result["status_code"] in [200, 202]:
return [types.TextContent(type="text", text=f"✓ Namespace '{ns_name}' deletion initiated")]
else:
return [types.TextContent(type="text", text=f"Failed to delete namespace: HTTP {result['status_code']}")]
# Pod Management
elif name == "list_pods":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
params = {}
if arguments.get("label_selector"):
params["labelSelector"] = arguments["label_selector"]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods", params=params)
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} pod(s) in namespace '{namespace}':\n"
for pod in items[:20]: # Limit to 20 pods
metadata = pod.get("metadata", {})
spec = pod.get("spec", {})
status = pod.get("status", {})
pod_name = metadata.get("name", "unknown")
phase = status.get("phase", "Unknown")
node = spec.get("nodeName", "unscheduled")
# Count ready containers
container_statuses = status.get("containerStatuses", [])
ready_count = sum(1 for c in container_statuses if c.get("ready", False))
total_count = len(container_statuses)
output += f"\n- {pod_name}"
output += f"\n Status: {format_pod_phase(phase)} ({ready_count}/{total_count} ready)"
output += f"\n Node: {node}"
# Show restart count if any
restart_count = sum(c.get("restartCount", 0) for c in container_statuses)
if restart_count > 0:
output += f"\n Restarts: {restart_count}"
# Show IP if available
pod_ip = status.get("podIP")
if pod_ip:
output += f"\n IP: {pod_ip}"
if len(items) > 20:
output += f"\n\n... and {len(items) - 20} more pods"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list pods: HTTP {result['status_code']}")]
elif name == "get_pod":
pod_name = arguments.get("pod_name")
if not env_id or not pod_name:
return [types.TextContent(type="text", text="Error: environment_id and pod_name are required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods/{pod_name}")
if result["status_code"] == 200 and result["data"]:
pod = result["data"]
metadata = pod.get("metadata", {})
spec = pod.get("spec", {})
status = pod.get("status", {})
output = f"Pod Details:\n"
output += f"- Name: {metadata.get('name')}\n"
output += f"- Namespace: {metadata.get('namespace')}\n"
output += f"- Phase: {format_pod_phase(status.get('phase'))}\n"
output += f"- Node: {spec.get('nodeName', 'unscheduled')}\n"
output += f"- IP: {status.get('podIP', 'N/A')}\n"
output += f"- Created: {metadata.get('creationTimestamp')}\n"
# Container details
containers = spec.get("containers", [])
if containers:
output += "\nContainers:\n"
for i, container in enumerate(containers):
container_status = status.get("containerStatuses", [])[i] if i < len(status.get("containerStatuses", [])) else {}
output += f"- {container.get('name')}\n"
output += f" Image: {container.get('image')}\n"
output += f" Ready: {container_status.get('ready', False)}\n"
output += f" Restarts: {container_status.get('restartCount', 0)}\n"
# Resources
resources = container.get("resources", {})
if resources.get("requests") or resources.get("limits"):
output += " Resources:\n"
if resources.get("requests"):
output += f" Requests: CPU={resources['requests'].get('cpu', 'N/A')}, Memory={resources['requests'].get('memory', 'N/A')}\n"
if resources.get("limits"):
output += f" Limits: CPU={resources['limits'].get('cpu', 'N/A')}, Memory={resources['limits'].get('memory', 'N/A')}\n"
# Events (recent)
events_result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/events?fieldSelector=involvedObject.name={pod_name}")
if events_result["status_code"] == 200 and events_result["data"]:
events = events_result["data"].get("items", [])
if events:
output += "\nRecent Events:\n"
for event in events[-5:]: # Last 5 events
output += f"- {event.get('type')}: {event.get('reason')} - {event.get('message')}\n"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to get pod: HTTP {result['status_code']}")]
elif name == "delete_pod":
pod_name = arguments.get("pod_name")
if not env_id or not pod_name:
return [types.TextContent(type="text", text="Error: environment_id and pod_name are required")]
result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods/{pod_name}")
if result["status_code"] in [200, 202]:
return [types.TextContent(type="text", text=f"✓ Pod '{pod_name}' deleted successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to delete pod: HTTP {result['status_code']}")]
elif name == "get_pod_logs":
pod_name = arguments.get("pod_name")
if not env_id or not pod_name:
return [types.TextContent(type="text", text="Error: environment_id and pod_name are required")]
params = {
"tailLines": arguments.get("tail", 100)
}
if arguments.get("container"):
params["container"] = arguments["container"]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods/{pod_name}/log",
params=params, text_response=True)
if result["status_code"] == 200:
logs = result.get("text", "")
if logs:
output = f"Pod logs (last {arguments.get('tail', 100)} lines):\n"
output += "-" * 50 + "\n"
output += logs
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text="No logs available")]
else:
return [types.TextContent(type="text", text=f"Failed to get logs: HTTP {result['status_code']}")]
# Deployment Management
elif name == "list_deployments":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} deployment(s) in namespace '{namespace}':\n"
for deployment in items:
metadata = deployment.get("metadata", {})
spec = deployment.get("spec", {})
status = deployment.get("status", {})
dep_name = metadata.get("name")
replicas = spec.get("replicas", 0)
ready_replicas = status.get("readyReplicas", 0)
available_replicas = status.get("availableReplicas", 0)
output += f"\n- {dep_name}"
output += f"\n Replicas: {ready_replicas}/{replicas} ready, {available_replicas} available"
# Container images
containers = spec.get("template", {}).get("spec", {}).get("containers", [])
if containers:
images = [c.get("image") for c in containers]
output += f"\n Images: {', '.join(images)}"
# Conditions
conditions = status.get("conditions", [])
for condition in conditions:
if condition.get("type") == "Progressing" and condition.get("status") != "True":
output += f"\n ⚠️ {condition.get('reason', 'Progressing issue')}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list deployments: HTTP {result['status_code']}")]
elif name == "get_deployment":
deployment_name = arguments.get("deployment_name")
if not env_id or not deployment_name:
return [types.TextContent(type="text", text="Error: environment_id and deployment_name are required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}")
if result["status_code"] == 200 and result["data"]:
deployment = result["data"]
metadata = deployment.get("metadata", {})
spec = deployment.get("spec", {})
status = deployment.get("status", {})
output = f"Deployment Details:\n"
output += f"- Name: {metadata.get('name')}\n"
output += f"- Namespace: {metadata.get('namespace')}\n"
output += f"- Created: {metadata.get('creationTimestamp')}\n"
output += f"- Replicas: {spec.get('replicas', 0)} desired\n"
output += f"- Ready: {status.get('readyReplicas', 0)}/{spec.get('replicas', 0)}\n"
output += f"- Available: {status.get('availableReplicas', 0)}\n"
output += f"- Up-to-date: {status.get('updatedReplicas', 0)}\n"
# Deployment strategy
strategy = spec.get("strategy", {})
output += f"\nStrategy: {strategy.get('type', 'Unknown')}\n"
# Container details
containers = spec.get("template", {}).get("spec", {}).get("containers", [])
if containers:
output += "\nContainers:\n"
for container in containers:
output += f"- {container.get('name')}\n"
output += f" Image: {container.get('image')}\n"
# Ports
ports = container.get("ports", [])
if ports:
port_str = ", ".join([f"{p.get('containerPort')}/{p.get('protocol', 'TCP')}" for p in ports])
output += f" Ports: {port_str}\n"
# Resources
resources = container.get("resources", {})
if resources.get("requests") or resources.get("limits"):
output += " Resources:\n"
if resources.get("requests"):
output += f" Requests: CPU={resources['requests'].get('cpu', 'N/A')}, Memory={resources['requests'].get('memory', 'N/A')}\n"
if resources.get("limits"):
output += f" Limits: CPU={resources['limits'].get('cpu', 'N/A')}, Memory={resources['limits'].get('memory', 'N/A')}\n"
# Conditions
conditions = status.get("conditions", [])
if conditions:
output += "\nConditions:\n"
for condition in conditions:
output += f"- {condition.get('type')}: {condition.get('status')} ({condition.get('reason', 'N/A')})\n"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")]
elif name == "create_deployment":
dep_name = arguments.get("name")
image = arguments.get("image")
if not env_id or not dep_name or not image:
return [types.TextContent(type="text", text="Error: environment_id, name, and image are required")]
# Build deployment spec
deployment_data = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {
"name": dep_name,
"namespace": namespace
},
"spec": {
"replicas": arguments.get("replicas", 1),
"selector": {
"matchLabels": {
"app": dep_name
}
},
"template": {
"metadata": {
"labels": {
"app": dep_name
}
},
"spec": {
"containers": [{
"name": dep_name,
"image": image
}]
}
}
}
}
# Add custom labels if provided
if arguments.get("labels"):
deployment_data["metadata"]["labels"] = arguments["labels"]
deployment_data["spec"]["template"]["metadata"]["labels"].update(arguments["labels"])
# Add port if provided
if arguments.get("port"):
deployment_data["spec"]["template"]["spec"]["containers"][0]["ports"] = [{
"containerPort": arguments["port"]
}]
# Add environment variables if provided
if arguments.get("env"):
deployment_data["spec"]["template"]["spec"]["containers"][0]["env"] = arguments["env"]
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments",
json_data=deployment_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Deployment '{dep_name}' created successfully")]
else:
error_msg = f"Failed to create deployment: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
elif name == "scale_deployment":
deployment_name = arguments.get("deployment_name")
replicas = arguments.get("replicas")
if not env_id or not deployment_name or replicas is None:
return [types.TextContent(type="text", text="Error: environment_id, deployment_name, and replicas are required")]
# Get current deployment first
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}")
if result["status_code"] != 200:
return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")]
deployment = result["data"]
deployment["spec"]["replicas"] = replicas
# Update deployment
result = await make_request("PUT", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}",
json_data=deployment)
if result["status_code"] == 200:
return [types.TextContent(type="text", text=f"✓ Deployment '{deployment_name}' scaled to {replicas} replicas")]
else:
return [types.TextContent(type="text", text=f"Failed to scale deployment: HTTP {result['status_code']}")]
elif name == "update_deployment_image":
deployment_name = arguments.get("deployment_name")
container_name = arguments.get("container_name")
new_image = arguments.get("new_image")
if not env_id or not deployment_name or not container_name or not new_image:
return [types.TextContent(type="text", text="Error: environment_id, deployment_name, container_name, and new_image are required")]
# Get current deployment
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}")
if result["status_code"] != 200:
return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")]
deployment = result["data"]
containers = deployment["spec"]["template"]["spec"]["containers"]
# Find and update container image
updated = False
for container in containers:
if container["name"] == container_name:
container["image"] = new_image
updated = True
break
if not updated:
return [types.TextContent(type="text", text=f"Container '{container_name}' not found in deployment")]
# Update deployment
result = await make_request("PUT", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}",
json_data=deployment)
if result["status_code"] == 200:
return [types.TextContent(type="text", text=f"✓ Updated container '{container_name}' to image '{new_image}'")]
else:
return [types.TextContent(type="text", text=f"Failed to update deployment: HTTP {result['status_code']}")]
elif name == "restart_deployment":
deployment_name = arguments.get("deployment_name")
if not env_id or not deployment_name:
return [types.TextContent(type="text", text="Error: environment_id and deployment_name are required")]
# Get current deployment
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}")
if result["status_code"] != 200:
return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")]
deployment = result["data"]
# Add/update restart annotation
import datetime
if "annotations" not in deployment["spec"]["template"]["metadata"]:
deployment["spec"]["template"]["metadata"]["annotations"] = {}
deployment["spec"]["template"]["metadata"]["annotations"]["kubectl.kubernetes.io/restartedAt"] = datetime.datetime.utcnow().isoformat()
# Update deployment
result = await make_request("PUT", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}",
json_data=deployment)
if result["status_code"] == 200:
return [types.TextContent(type="text", text=f"✓ Deployment '{deployment_name}' restart initiated")]
else:
return [types.TextContent(type="text", text=f"Failed to restart deployment: HTTP {result['status_code']}")]
elif name == "delete_deployment":
deployment_name = arguments.get("deployment_name")
if not env_id or not deployment_name:
return [types.TextContent(type="text", text="Error: environment_id and deployment_name are required")]
result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}")
if result["status_code"] in [200, 202]:
return [types.TextContent(type="text", text=f"✓ Deployment '{deployment_name}' deleted successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to delete deployment: HTTP {result['status_code']}")]
# Service Management
elif name == "list_services":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/services")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} service(s) in namespace '{namespace}':\n"
for service in items:
metadata = service.get("metadata", {})
spec = service.get("spec", {})
svc_name = metadata.get("name")
svc_type = spec.get("type", "ClusterIP")
cluster_ip = spec.get("clusterIP", "None")
output += f"\n- {svc_name}"
output += f"\n Type: {svc_type}"
output += f"\n Cluster IP: {cluster_ip}"
# Ports
ports = spec.get("ports", [])
if ports:
port_info = []
for port in ports:
port_str = f"{port.get('port')}"
if port.get("targetPort") and str(port.get("targetPort")) != str(port.get("port")):
# Display target port mapping using arrow notation
port_str += f"{port.get('targetPort')}"
if port.get("nodePort"):
port_str += f" (NodePort: {port.get('nodePort')})"
port_info.append(port_str)
output += f"\n Ports: {', '.join(port_info)}"
# External IPs or LoadBalancer IP
if svc_type == "LoadBalancer":
status = service.get("status", {})
lb = status.get("loadBalancer", {})
ingresses = lb.get("ingress", [])
if ingresses:
ips = [ing.get("ip") or ing.get("hostname") for ing in ingresses]
output += f"\n External: {', '.join(ips)}"
elif spec.get("externalIPs"):
output += f"\n External IPs: {', '.join(spec['externalIPs'])}"
# Selector
selector = spec.get("selector", {})
if selector:
selector_str = ", ".join([f"{k}={v}" for k, v in selector.items()])
output += f"\n Selector: {selector_str}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list services: HTTP {result['status_code']}")]
elif name == "create_service":
svc_name = arguments.get("name")
selector = arguments.get("selector")
ports = arguments.get("ports")
if not env_id or not svc_name or not selector or not ports:
return [types.TextContent(type="text", text="Error: environment_id, name, selector, and ports are required")]
service_data = {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": svc_name,
"namespace": namespace
},
"spec": {
"selector": selector,
"ports": ports,
"type": arguments.get("type", "ClusterIP")
}
}
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/services",
json_data=service_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Service '{svc_name}' created successfully")]
else:
error_msg = f"Failed to create service: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
elif name == "delete_service":
service_name = arguments.get("service_name")
if not env_id or not service_name:
return [types.TextContent(type="text", text="Error: environment_id and service_name are required")]
result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/services/{service_name}")
if result["status_code"] in [200, 202]:
return [types.TextContent(type="text", text=f"✓ Service '{service_name}' deleted successfully")]
else:
return [types.TextContent(type="text", text=f"Failed to delete service: HTTP {result['status_code']}")]
# Ingress Management
elif name == "list_ingresses":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/networking.k8s.io/v1/namespaces/{namespace}/ingresses")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} ingress(es) in namespace '{namespace}':\n"
for ingress in items:
metadata = ingress.get("metadata", {})
spec = ingress.get("spec", {})
status = ingress.get("status", {})
ing_name = metadata.get("name")
output += f"\n- {ing_name}"
# Ingress class
ing_class = spec.get("ingressClassName", "default")
output += f"\n Class: {ing_class}"
# Rules
rules = spec.get("rules", [])
for rule in rules:
host = rule.get("host", "*")
output += f"\n Host: {host}"
http = rule.get("http", {})
paths = http.get("paths", [])
for path in paths:
path_val = path.get("path", "/")
backend = path.get("backend", {})
service = backend.get("service", {})
svc_name = service.get("name", "unknown")
svc_port = service.get("port", {}).get("number", "unknown")
output += f"\n {path_val}{svc_name}:{svc_port}"
# Load balancer info
lb = status.get("loadBalancer", {})
ingresses = lb.get("ingress", [])
if ingresses:
ips = [ing.get("ip") or ing.get("hostname") for ing in ingresses]
output += f"\n Address: {', '.join(ips)}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list ingresses: HTTP {result['status_code']}")]
elif name == "create_ingress":
ing_name = arguments.get("name")
rules = arguments.get("rules")
if not env_id or not ing_name or not rules:
return [types.TextContent(type="text", text="Error: environment_id, name, and rules are required")]
# Build ingress spec
ingress_data = {
"apiVersion": "networking.k8s.io/v1",
"kind": "Ingress",
"metadata": {
"name": ing_name,
"namespace": namespace
},
"spec": {
"rules": []
}
}
# Process rules
for rule in rules:
rule_spec = {}
if rule.get("host"):
rule_spec["host"] = rule["host"]
paths = []
for path in rule.get("paths", []):
path_spec = {
"path": path.get("path", "/"),
"pathType": "Prefix",
"backend": {
"service": {
"name": path["service"],
"port": {
"number": path["port"]
}
}
}
}
paths.append(path_spec)
rule_spec["http"] = {"paths": paths}
ingress_data["spec"]["rules"].append(rule_spec)
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/apis/networking.k8s.io/v1/namespaces/{namespace}/ingresses",
json_data=ingress_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Ingress '{ing_name}' created successfully")]
else:
error_msg = f"Failed to create ingress: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
# ConfigMap and Secret Management
elif name == "list_configmaps":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/configmaps")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} ConfigMap(s) in namespace '{namespace}':\n"
for cm in items:
metadata = cm.get("metadata", {})
data = cm.get("data", {})
output += f"\n- {metadata.get('name')}"
output += f"\n Keys: {len(data)}"
if data:
output += f" ({', '.join(list(data.keys())[:5])}"
if len(data) > 5:
output += f", ... +{len(data) - 5} more"
output += ")"
output += f"\n Created: {metadata.get('creationTimestamp')}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list ConfigMaps: HTTP {result['status_code']}")]
elif name == "create_configmap":
cm_name = arguments.get("name")
data = arguments.get("data")
if not env_id or not cm_name or not data:
return [types.TextContent(type="text", text="Error: environment_id, name, and data are required")]
configmap_data = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"name": cm_name,
"namespace": namespace
},
"data": data
}
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/configmaps",
json_data=configmap_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ ConfigMap '{cm_name}' created successfully")]
else:
error_msg = f"Failed to create ConfigMap: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
elif name == "list_secrets":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/secrets")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} Secret(s) in namespace '{namespace}':\n"
for secret in items:
metadata = secret.get("metadata", {})
data = secret.get("data", {})
secret_type = secret.get("type", "Opaque")
output += f"\n- {metadata.get('name')}"
output += f"\n Type: {secret_type}"
output += f"\n Keys: {len(data)}"
if data:
output += f" ({', '.join(list(data.keys())[:3])}"
if len(data) > 3:
output += f", ... +{len(data) - 3} more"
output += ")"
output += f"\n Created: {metadata.get('creationTimestamp')}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list Secrets: HTTP {result['status_code']}")]
elif name == "create_secret":
secret_name = arguments.get("name")
data = arguments.get("data")
if not env_id or not secret_name or not data:
return [types.TextContent(type="text", text="Error: environment_id, name, and data are required")]
# Base64 encode the data values
encoded_data = {}
for key, value in data.items():
encoded_data[key] = base64.b64encode(value.encode()).decode()
secret_data = {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": secret_name,
"namespace": namespace
},
"type": arguments.get("type", "Opaque"),
"data": encoded_data
}
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/secrets",
json_data=secret_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ Secret '{secret_name}' created successfully")]
else:
error_msg = f"Failed to create Secret: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
# Persistent Volume Management
elif name == "list_persistent_volumes":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/persistentvolumes")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} PersistentVolume(s):\n"
for pv in items:
metadata = pv.get("metadata", {})
spec = pv.get("spec", {})
status = pv.get("status", {})
output += f"\n- {metadata.get('name')}"
output += f"\n Capacity: {spec.get('capacity', {}).get('storage', 'N/A')}"
output += f"\n Access Modes: {', '.join(spec.get('accessModes', []))}"
output += f"\n Storage Class: {spec.get('storageClassName', 'N/A')}"
output += f"\n Status: {status.get('phase', 'Unknown')}"
# Claim reference
claim_ref = spec.get("claimRef")
if claim_ref:
output += f"\n Bound to: {claim_ref.get('namespace')}/{claim_ref.get('name')}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list PersistentVolumes: HTTP {result['status_code']}")]
elif name == "list_persistent_volume_claims":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/persistentvolumeclaims")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} PersistentVolumeClaim(s) in namespace '{namespace}':\n"
for pvc in items:
metadata = pvc.get("metadata", {})
spec = pvc.get("spec", {})
status = pvc.get("status", {})
output += f"\n- {metadata.get('name')}"
output += f"\n Status: {status.get('phase', 'Unknown')}"
output += f"\n Capacity: {status.get('capacity', {}).get('storage', spec.get('resources', {}).get('requests', {}).get('storage', 'N/A'))}"
output += f"\n Access Modes: {', '.join(spec.get('accessModes', []))}"
output += f"\n Storage Class: {spec.get('storageClassName', 'N/A')}"
output += f"\n Volume: {spec.get('volumeName', 'N/A')}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list PersistentVolumeClaims: HTTP {result['status_code']}")]
elif name == "create_persistent_volume_claim":
pvc_name = arguments.get("name")
size = arguments.get("size")
if not env_id or not pvc_name or not size:
return [types.TextContent(type="text", text="Error: environment_id, name, and size are required")]
pvc_data = {
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"name": pvc_name,
"namespace": namespace
},
"spec": {
"accessModes": [arguments.get("access_mode", "ReadWriteOnce")],
"resources": {
"requests": {
"storage": size
}
}
}
}
if arguments.get("storage_class"):
pvc_data["spec"]["storageClassName"] = arguments["storage_class"]
result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/persistentvolumeclaims",
json_data=pvc_data)
if result["status_code"] in [200, 201]:
return [types.TextContent(type="text", text=f"✓ PersistentVolumeClaim '{pvc_name}' created successfully")]
else:
error_msg = f"Failed to create PVC: HTTP {result['status_code']}"
if result.get("data") and result["data"].get("message"):
error_msg += f"\n{result['data']['message']}"
return [types.TextContent(type="text", text=error_msg)]
# Node Information
elif name == "list_nodes":
if not env_id:
return [types.TextContent(type="text", text="Error: environment_id is required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/nodes")
if result["status_code"] == 200 and result["data"]:
items = result["data"].get("items", [])
output = f"Found {len(items)} node(s):\n"
for node in items:
metadata = node.get("metadata", {})
status = node.get("status", {})
node_name = metadata.get("name")
# Node conditions
ready = "Unknown"
for condition in status.get("conditions", []):
if condition.get("type") == "Ready":
ready = "Ready" if condition.get("status") == "True" else "NotReady"
break
output += f"\n- {node_name}"
output += f"\n Status: {ready}"
# Node info
node_info = status.get("nodeInfo", {})
output += f"\n Version: {node_info.get('kubeletVersion', 'N/A')}"
output += f"\n OS: {node_info.get('operatingSystem', 'N/A')} ({node_info.get('architecture', 'N/A')})"
# Resources
capacity = status.get("capacity", {})
allocatable = status.get("allocatable", {})
if capacity:
output += f"\n CPU: {capacity.get('cpu', 'N/A')} cores"
output += f"\n Memory: {format_resource_quantity(capacity.get('memory', 'N/A'))}"
output += f"\n Pods: {allocatable.get('pods', 'N/A')} allocatable"
# Taints
taints = node.get("spec", {}).get("taints", [])
if taints:
taint_strs = [f"{t.get('key')}={t.get('value')}:{t.get('effect')}" for t in taints]
output += f"\n Taints: {', '.join(taint_strs)}"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to list nodes: HTTP {result['status_code']}")]
elif name == "get_node":
node_name = arguments.get("node_name")
if not env_id or not node_name:
return [types.TextContent(type="text", text="Error: environment_id and node_name are required")]
result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/nodes/{node_name}")
if result["status_code"] == 200 and result["data"]:
node = result["data"]
metadata = node.get("metadata", {})
spec = node.get("spec", {})
status = node.get("status", {})
output = f"Node Details:\n"
output += f"- Name: {metadata.get('name')}\n"
output += f"- Created: {metadata.get('creationTimestamp')}\n"
# Status
for condition in status.get("conditions", []):
output += f"- {condition.get('type')}: {condition.get('status')} ({condition.get('reason', 'N/A')})\n"
# Node info
node_info = status.get("nodeInfo", {})
output += f"\nSystem Info:\n"
output += f"- Kubernetes: {node_info.get('kubeletVersion', 'N/A')}\n"
output += f"- Container Runtime: {node_info.get('containerRuntimeVersion', 'N/A')}\n"
output += f"- OS: {node_info.get('osImage', 'N/A')}\n"
output += f"- Kernel: {node_info.get('kernelVersion', 'N/A')}\n"
output += f"- Architecture: {node_info.get('architecture', 'N/A')}\n"
# Capacity and Allocatable
capacity = status.get("capacity", {})
allocatable = status.get("allocatable", {})
output += f"\nResources:\n"
output += f"- CPU: {capacity.get('cpu', 'N/A')} capacity, {allocatable.get('cpu', 'N/A')} allocatable\n"
output += f"- Memory: {format_resource_quantity(capacity.get('memory', 'N/A'))} capacity, "
output += f"{format_resource_quantity(allocatable.get('memory', 'N/A'))} allocatable\n"
output += f"- Pods: {capacity.get('pods', 'N/A')} capacity, {allocatable.get('pods', 'N/A')} allocatable\n"
# Addresses
addresses = status.get("addresses", [])
if addresses:
output += "\nAddresses:\n"
for addr in addresses:
output += f"- {addr.get('type')}: {addr.get('address')}\n"
# Images
images = status.get("images", [])
if images:
output += f"\nImages: {len(images)} cached\n"
return [types.TextContent(type="text", text=output)]
else:
return [types.TextContent(type="text", text=f"Failed to get node: HTTP {result['status_code']}")]
else:
return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
except httpx.TimeoutException:
return [types.TextContent(type="text", text="Error: Request timed out. The operation may take longer than expected.")]
except httpx.ConnectError:
return [types.TextContent(type="text", text="Error: Could not connect to Portainer server. Please check the URL and network connection.")]
except Exception as e:
import traceback
error_details = f"Error: {str(e)}\nType: {type(e).__name__}"
return [types.TextContent(type="text", text=error_details)]
async def run():
    """Start the MCP server on the stdio transport.

    Opens the stdio transport, builds the initialization options
    (server name, version, and advertised capabilities), and hands the
    transport streams to ``server.run``.
    """
    async with mcp.server.stdio.stdio_server() as streams:
        incoming, outgoing = streams

        # No change notifications are advertised for prompts, resources or tools.
        notifications = NotificationOptions(
            prompts_changed=False,
            resources_changed=False,
            tools_changed=False,
        )
        advertised = server.get_capabilities(
            notification_options=notifications,
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="portainer-kubernetes",
            server_version="1.0.0",
            capabilities=advertised,
        )

        await server.run(incoming, outgoing, init_options)
def main():
    """Synchronous entry point: execute the async ``run`` coroutine via asyncio."""
    server_coroutine = run()
    asyncio.run(server_coroutine)
# Start the server only when this file is executed as a script, so that
# importing the module does not launch it.
if __name__ == "__main__":
    main()