#!/usr/bin/env python3 """MCP server for Portainer Kubernetes management.""" import os import sys import json import asyncio from typing import Any, Dict, List, Optional from enum import Enum import base64 # Suppress all logging to stderr os.environ["MCP_MODE"] = "true" import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions # Create server server = Server("portainer-kubernetes") # Store for our state portainer_url = os.getenv("PORTAINER_URL", "https://partner.portainer.live") api_key = os.getenv("PORTAINER_API_KEY", "") @server.list_tools() async def handle_list_tools() -> list[types.Tool]: """List available tools.""" return [ # Namespace Management types.Tool( name="list_namespaces", description="List all namespaces in a Kubernetes cluster", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" } }, "required": ["environment_id"] } ), types.Tool( name="create_namespace", description="Create a new namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "name": { "type": "string", "description": "Namespace name" }, "labels": { "type": "object", "description": "Labels for the namespace (optional)" } }, "required": ["environment_id", "name"] } ), types.Tool( name="delete_namespace", description="Delete a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name" } }, "required": ["environment_id", "namespace"] } ), # Pod Management types.Tool( name="list_pods", description="List pods in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "label_selector": { "type": "string", "description": "Label selector to filter pods (optional)" } }, "required": ["environment_id"] } ), types.Tool( name="get_pod", description="Get detailed information about a pod", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "pod_name": { "type": "string", "description": "Pod name" } }, "required": ["environment_id", "pod_name"] } ), types.Tool( name="delete_pod", description="Delete a pod", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "pod_name": { "type": "string", "description": "Pod name" } }, "required": ["environment_id", "pod_name"] } ), types.Tool( name="get_pod_logs", description="Get logs from a pod", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "pod_name": { "type": "string", "description": "Pod name" }, "container": { "type": "string", "description": "Container name (optional if pod has single container)" }, "tail": { "type": "integer", "description": "Number of lines from the end", "default": 100 } }, "required": ["environment_id", "pod_name"] } ), # Deployment Management types.Tool( name="list_deployments", description="List deployments in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" } }, "required": ["environment_id"] } ), types.Tool( name="get_deployment", description="Get detailed information about a deployment", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "deployment_name": { "type": "string", "description": "Deployment name" } }, "required": ["environment_id", "deployment_name"] } ), types.Tool( name="create_deployment", description="Create a new deployment", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "name": { "type": "string", "description": "Deployment name" }, "image": { "type": "string", "description": "Container image" }, "replicas": { "type": "integer", "description": "Number of replicas", "default": 1 }, "port": { "type": "integer", "description": "Container port (optional)" }, "env": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "value": {"type": "string"} } }, "description": "Environment variables (optional)" }, "labels": { "type": "object", "description": "Labels for the deployment (optional)" } }, "required": ["environment_id", "name", "image"] } ), types.Tool( name="scale_deployment", description="Scale a deployment", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "deployment_name": { "type": "string", "description": "Deployment name" }, "replicas": { "type": "integer", "description": "Number of replicas" } }, "required": ["environment_id", "deployment_name", "replicas"] } ), types.Tool( name="update_deployment_image", description="Update deployment container image", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "deployment_name": { "type": "string", "description": "Deployment name" }, "container_name": { "type": "string", "description": "Container name" }, "new_image": { "type": "string", "description": "New container image" } }, "required": ["environment_id", "deployment_name", "container_name", "new_image"] } ), types.Tool( name="restart_deployment", description="Restart a deployment by updating annotation", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "deployment_name": { "type": "string", "description": "Deployment name" } }, "required": ["environment_id", "deployment_name"] } ), types.Tool( name="delete_deployment", description="Delete a deployment", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "deployment_name": { "type": "string", "description": "Deployment name" } }, "required": ["environment_id", "deployment_name"] } ), # Service Management types.Tool( name="list_services", description="List services in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" } }, "required": ["environment_id"] } ), types.Tool( name="create_service", description="Create a service for a deployment", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "name": { "type": "string", "description": "Service name" }, "selector": { "type": "object", "description": "Pod selector labels" }, "ports": { "type": "array", "items": { "type": "object", "properties": { "port": {"type": "integer"}, "targetPort": {"type": "integer"}, "protocol": {"type": "string", "default": "TCP"} } }, "description": "Service ports" }, "type": { "type": "string", "enum": ["ClusterIP", "NodePort", "LoadBalancer"], "default": "ClusterIP" } }, "required": ["environment_id", "name", "selector", "ports"] } ), types.Tool( name="delete_service", description="Delete a service", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "service_name": { "type": "string", "description": "Service name" } }, "required": ["environment_id", "service_name"] } ), # Ingress Management types.Tool( name="list_ingresses", description="List ingresses in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" } }, "required": ["environment_id"] } ), types.Tool( name="create_ingress", description="Create an ingress", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "name": { "type": "string", "description": "Ingress name" }, "rules": { "type": "array", "items": { "type": "object", "properties": { "host": {"type": "string"}, "paths": { "type": "array", "items": { "type": "object", "properties": { "path": {"type": "string"}, "service": {"type": "string"}, "port": {"type": "integer"} } } } } }, "description": "Ingress rules" } }, "required": ["environment_id", "name", "rules"] } ), # ConfigMap and Secret Management types.Tool( name="list_configmaps", description="List ConfigMaps in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" } }, "required": ["environment_id"] } ), types.Tool( name="create_configmap", description="Create a ConfigMap", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "name": { "type": "string", "description": "ConfigMap name" }, "data": { "type": "object", "description": "Key-value pairs for the ConfigMap" } }, "required": ["environment_id", "name", "data"] } ), types.Tool( name="list_secrets", description="List Secrets in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" } }, "required": ["environment_id"] } ), types.Tool( name="create_secret", description="Create a Secret", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "name": { "type": "string", "description": "Secret name" }, "type": { "type": "string", "description": "Secret type", "default": "Opaque" }, "data": { "type": "object", "description": "Key-value pairs (values will be base64 encoded)" } }, "required": ["environment_id", "name", "data"] } ), # Persistent Volume Management types.Tool( name="list_persistent_volumes", description="List PersistentVolumes", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" } }, "required": ["environment_id"] } ), types.Tool( name="list_persistent_volume_claims", description="List PersistentVolumeClaims in a namespace", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" } }, "required": ["environment_id"] } ), types.Tool( name="create_persistent_volume_claim", description="Create a PersistentVolumeClaim", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "namespace": { "type": "string", "description": "Namespace name", "default": "default" }, "name": { "type": "string", "description": "PVC name" }, "storage_class": { "type": "string", "description": "Storage class name (optional)" }, "size": { "type": "string", "description": "Storage size (e.g., '10Gi')" }, "access_mode": { "type": "string", "enum": ["ReadWriteOnce", "ReadOnlyMany", "ReadWriteMany"], "default": "ReadWriteOnce" } }, "required": ["environment_id", "name", "size"] } ), # Node Information types.Tool( name="list_nodes", description="List cluster nodes", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" } }, "required": ["environment_id"] } ), types.Tool( name="get_node", description="Get detailed information about a node", inputSchema={ "type": "object", "properties": { "environment_id": { "type": "integer", "description": "ID of the Kubernetes environment" }, "node_name": { "type": "string", "description": "Node name" } }, "required": ["environment_id", "node_name"] } ) ] async def make_request(method: str, endpoint: str, json_data: Optional[Dict] = None, params: Optional[Dict] = None, text_response: bool = False) -> Dict[str, Any]: """ Make HTTP request to Portainer Kubernetes API. Centralized HTTP request handler that manages all API communication with Portainer's Kubernetes proxy endpoints. Handles authentication and returns structured response data. Args: method: HTTP method (GET, POST, PUT, DELETE, PATCH) endpoint: API endpoint path (e.g., /endpoints/{id}/kubernetes/...) json_data: Optional JSON payload for POST/PUT/PATCH requests params: Optional query parameters for the request text_response: If True, return raw text instead of parsing JSON Returns: Dict containing status_code, data (parsed JSON or text), and raw text Complexity: O(1) - Single HTTP request Call Flow: - Called by: handle_call_tool() for all Kubernetes API operations - Calls: httpx for async HTTP requests Error Handling: - Returns status code for caller to handle - Preserves raw response text for debugging - SSL verification disabled for self-signed certificates - Handles both JSON and text responses """ import httpx async with httpx.AsyncClient(verify=False, timeout=30.0) as client: headers = {"X-API-Key": api_key} if api_key else {} # Add JSON content type for POST/PUT requests if method in ["POST", "PUT", "PATCH"] and json_data is not None: headers["Content-Type"] = "application/json" if method == "GET": response = await client.get(f"{portainer_url}/api{endpoint}", headers=headers, params=params) elif method == "POST": response = await client.post(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "PUT": response = await client.put(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "PATCH": response = await client.patch(f"{portainer_url}/api{endpoint}", headers=headers, json=json_data) elif method == "DELETE": response = await client.delete(f"{portainer_url}/api{endpoint}", headers=headers) else: raise ValueError(f"Unsupported method: {method}") # Handle text responses (like logs) if text_response: return {"status_code": response.status_code, "data": None, "text": response.text} # Parse JSON response safely try: data = response.json() if response.text and response.headers.get("content-type", "").startswith("application/json") else None except Exception: data = None return {"status_code": response.status_code, "data": data, "text": response.text} def format_pod_phase(phase: str) -> str: """Format pod phase for display.""" phase_map = { "Pending": "⏳ Pending", "Running": "🟢 Running", "Succeeded": "✅ Succeeded", "Failed": "❌ Failed", "Unknown": "❓ Unknown" } return phase_map.get(phase, phase) def format_resource_quantity(quantity: str) -> str: """Format Kubernetes resource quantities.""" if quantity.endswith("Ki"): kb = int(quantity[:-2]) return f"{kb / 1024:.1f} Mi" elif quantity.endswith("Mi"): return quantity elif quantity.endswith("Gi"): return quantity elif quantity.endswith("m"): # millicores cores = int(quantity[:-1]) / 1000 return f"{cores:.2f} cores" return quantity @server.call_tool() async def handle_call_tool(name: str, arguments: dict[str, Any] | None) -> list[types.TextContent]: """Handle tool calls.""" import httpx try: env_id = arguments.get("environment_id") if arguments else None namespace = arguments.get("namespace", "default") if arguments else "default" # Namespace Management if name == "list_namespaces": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} namespace(s):\n" for ns in items: metadata = ns.get("metadata", {}) status = ns.get("status", {}) output += f"\n- {metadata.get('name')}" output += f"\n Status: {status.get('phase', 'Unknown')}" output += f"\n Created: {metadata.get('creationTimestamp', 'N/A')}" labels = metadata.get("labels", {}) if labels: label_str = ", ".join([f"{k}={v}" for k, v in labels.items()]) output += f"\n Labels: {label_str}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list namespaces: HTTP {result['status_code']}")] elif name == "create_namespace": ns_name = arguments.get("name") if not env_id or not ns_name: return [types.TextContent(type="text", text="Error: environment_id and name are required")] ns_data = { "apiVersion": "v1", "kind": "Namespace", "metadata": { "name": ns_name } } if arguments.get("labels"): ns_data["metadata"]["labels"] = arguments["labels"] result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces", json_data=ns_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Namespace '{ns_name}' created successfully")] else: error_msg = f"Failed to create namespace: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] elif name == "delete_namespace": ns_name = arguments.get("namespace") if not env_id or not ns_name: return [types.TextContent(type="text", text="Error: environment_id and namespace are required")] result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{ns_name}") if result["status_code"] in [200, 202]: return [types.TextContent(type="text", text=f"✓ Namespace '{ns_name}' deletion initiated")] else: return [types.TextContent(type="text", text=f"Failed to delete namespace: HTTP {result['status_code']}")] # Pod Management elif name == "list_pods": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] params = {} if arguments.get("label_selector"): params["labelSelector"] = arguments["label_selector"] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods", params=params) if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} pod(s) in namespace '{namespace}':\n" for pod in items[:20]: # Limit to 20 pods metadata = pod.get("metadata", {}) spec = pod.get("spec", {}) status = pod.get("status", {}) pod_name = metadata.get("name", "unknown") phase = status.get("phase", "Unknown") node = spec.get("nodeName", "unscheduled") # Count ready containers container_statuses = status.get("containerStatuses", []) ready_count = sum(1 for c in container_statuses if c.get("ready", False)) total_count = len(container_statuses) output += f"\n- {pod_name}" output += f"\n Status: {format_pod_phase(phase)} ({ready_count}/{total_count} ready)" output += f"\n Node: {node}" # Show restart count if any restart_count = sum(c.get("restartCount", 0) for c in container_statuses) if restart_count > 0: output += f"\n Restarts: {restart_count}" # Show IP if available pod_ip = status.get("podIP") if pod_ip: output += f"\n IP: {pod_ip}" if len(items) > 20: output += f"\n\n... and {len(items) - 20} more pods" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list pods: HTTP {result['status_code']}")] elif name == "get_pod": pod_name = arguments.get("pod_name") if not env_id or not pod_name: return [types.TextContent(type="text", text="Error: environment_id and pod_name are required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods/{pod_name}") if result["status_code"] == 200 and result["data"]: pod = result["data"] metadata = pod.get("metadata", {}) spec = pod.get("spec", {}) status = pod.get("status", {}) output = f"Pod Details:\n" output += f"- Name: {metadata.get('name')}\n" output += f"- Namespace: {metadata.get('namespace')}\n" output += f"- Phase: {format_pod_phase(status.get('phase'))}\n" output += f"- Node: {spec.get('nodeName', 'unscheduled')}\n" output += f"- IP: {status.get('podIP', 'N/A')}\n" output += f"- Created: {metadata.get('creationTimestamp')}\n" # Container details containers = spec.get("containers", []) if containers: output += "\nContainers:\n" for i, container in enumerate(containers): container_status = status.get("containerStatuses", [])[i] if i < len(status.get("containerStatuses", [])) else {} output += f"- {container.get('name')}\n" output += f" Image: {container.get('image')}\n" output += f" Ready: {container_status.get('ready', False)}\n" output += f" Restarts: {container_status.get('restartCount', 0)}\n" # Resources resources = container.get("resources", {}) if resources.get("requests") or resources.get("limits"): output += " Resources:\n" if resources.get("requests"): output += f" Requests: CPU={resources['requests'].get('cpu', 'N/A')}, Memory={resources['requests'].get('memory', 'N/A')}\n" if resources.get("limits"): output += f" Limits: CPU={resources['limits'].get('cpu', 'N/A')}, Memory={resources['limits'].get('memory', 'N/A')}\n" # Events (recent) events_result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/events?fieldSelector=involvedObject.name={pod_name}") if events_result["status_code"] == 200 and events_result["data"]: events = events_result["data"].get("items", []) if events: output += "\nRecent Events:\n" for event in events[-5:]: # Last 5 events output += f"- {event.get('type')}: {event.get('reason')} - {event.get('message')}\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get pod: HTTP {result['status_code']}")] elif name == "delete_pod": pod_name = arguments.get("pod_name") if not env_id or not pod_name: return [types.TextContent(type="text", text="Error: environment_id and pod_name are required")] result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods/{pod_name}") if result["status_code"] in [200, 202]: return [types.TextContent(type="text", text=f"✓ Pod '{pod_name}' deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete pod: HTTP {result['status_code']}")] elif name == "get_pod_logs": pod_name = arguments.get("pod_name") if not env_id or not pod_name: return [types.TextContent(type="text", text="Error: environment_id and pod_name are required")] params = { "tailLines": arguments.get("tail", 100) } if arguments.get("container"): params["container"] = arguments["container"] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/pods/{pod_name}/log", params=params, text_response=True) if result["status_code"] == 200: logs = result.get("text", "") if logs: output = f"Pod logs (last {arguments.get('tail', 100)} lines):\n" output += "-" * 50 + "\n" output += logs return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text="No logs available")] else: return [types.TextContent(type="text", text=f"Failed to get logs: HTTP {result['status_code']}")] # Deployment Management elif name == "list_deployments": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} deployment(s) in namespace '{namespace}':\n" for deployment in items: metadata = deployment.get("metadata", {}) spec = deployment.get("spec", {}) status = deployment.get("status", {}) dep_name = metadata.get("name") replicas = spec.get("replicas", 0) ready_replicas = status.get("readyReplicas", 0) available_replicas = status.get("availableReplicas", 0) output += f"\n- {dep_name}" output += f"\n Replicas: {ready_replicas}/{replicas} ready, {available_replicas} available" # Container images containers = spec.get("template", {}).get("spec", {}).get("containers", []) if containers: images = [c.get("image") for c in containers] output += f"\n Images: {', '.join(images)}" # Conditions conditions = status.get("conditions", []) for condition in conditions: if condition.get("type") == "Progressing" and condition.get("status") != "True": output += f"\n ⚠️ {condition.get('reason', 'Progressing issue')}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list deployments: HTTP {result['status_code']}")] elif name == "get_deployment": deployment_name = arguments.get("deployment_name") if not env_id or not deployment_name: return [types.TextContent(type="text", text="Error: environment_id and deployment_name are required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}") if result["status_code"] == 200 and result["data"]: deployment = result["data"] metadata = deployment.get("metadata", {}) spec = deployment.get("spec", {}) status = deployment.get("status", {}) output = f"Deployment Details:\n" output += f"- Name: {metadata.get('name')}\n" output += f"- Namespace: {metadata.get('namespace')}\n" output += f"- Created: {metadata.get('creationTimestamp')}\n" output += f"- Replicas: {spec.get('replicas', 0)} desired\n" output += f"- Ready: {status.get('readyReplicas', 0)}/{spec.get('replicas', 0)}\n" output += f"- Available: {status.get('availableReplicas', 0)}\n" output += f"- Up-to-date: {status.get('updatedReplicas', 0)}\n" # Deployment strategy strategy = spec.get("strategy", {}) output += f"\nStrategy: {strategy.get('type', 'Unknown')}\n" # Container details containers = spec.get("template", {}).get("spec", {}).get("containers", []) if containers: output += "\nContainers:\n" for container in containers: output += f"- {container.get('name')}\n" output += f" Image: {container.get('image')}\n" # Ports ports = container.get("ports", []) if ports: port_str = ", ".join([f"{p.get('containerPort')}/{p.get('protocol', 'TCP')}" for p in ports]) output += f" Ports: {port_str}\n" # Resources resources = container.get("resources", {}) if resources.get("requests") or resources.get("limits"): output += " Resources:\n" if resources.get("requests"): output += f" Requests: CPU={resources['requests'].get('cpu', 'N/A')}, Memory={resources['requests'].get('memory', 'N/A')}\n" if resources.get("limits"): output += f" Limits: CPU={resources['limits'].get('cpu', 'N/A')}, Memory={resources['limits'].get('memory', 'N/A')}\n" # Conditions conditions = status.get("conditions", []) if conditions: output += "\nConditions:\n" for condition in conditions: output += f"- {condition.get('type')}: {condition.get('status')} ({condition.get('reason', 'N/A')})\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")] elif name == "create_deployment": dep_name = arguments.get("name") image = arguments.get("image") if not env_id or not dep_name or not image: return [types.TextContent(type="text", text="Error: environment_id, name, and image are required")] # Build deployment spec deployment_data = { "apiVersion": "apps/v1", "kind": "Deployment", "metadata": { "name": dep_name, "namespace": namespace }, "spec": { "replicas": arguments.get("replicas", 1), "selector": { "matchLabels": { "app": dep_name } }, "template": { "metadata": { "labels": { "app": dep_name } }, "spec": { "containers": [{ "name": dep_name, "image": image }] } } } } # Add custom labels if provided if arguments.get("labels"): deployment_data["metadata"]["labels"] = arguments["labels"] deployment_data["spec"]["template"]["metadata"]["labels"].update(arguments["labels"]) # Add port if provided if arguments.get("port"): deployment_data["spec"]["template"]["spec"]["containers"][0]["ports"] = [{ "containerPort": arguments["port"] }] # Add environment variables if provided if arguments.get("env"): deployment_data["spec"]["template"]["spec"]["containers"][0]["env"] = arguments["env"] result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments", json_data=deployment_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Deployment '{dep_name}' created successfully")] else: error_msg = f"Failed to create deployment: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] elif name == "scale_deployment": deployment_name = arguments.get("deployment_name") replicas = arguments.get("replicas") if not env_id or not deployment_name or replicas is None: return [types.TextContent(type="text", text="Error: environment_id, deployment_name, and replicas are required")] # Get current deployment first result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}") if result["status_code"] != 200: return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")] deployment = result["data"] deployment["spec"]["replicas"] = replicas # Update deployment result = await make_request("PUT", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}", json_data=deployment) if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ Deployment '{deployment_name}' scaled to {replicas} replicas")] else: return [types.TextContent(type="text", text=f"Failed to scale deployment: HTTP {result['status_code']}")] elif name == "update_deployment_image": deployment_name = arguments.get("deployment_name") container_name = arguments.get("container_name") new_image = arguments.get("new_image") if not env_id or not deployment_name or not container_name or not new_image: return [types.TextContent(type="text", text="Error: environment_id, deployment_name, container_name, and new_image are required")] # Get current deployment result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}") if result["status_code"] != 200: return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")] deployment = result["data"] containers = deployment["spec"]["template"]["spec"]["containers"] # Find and update container image updated = False for container in containers: if container["name"] == container_name: container["image"] = new_image updated = True break if not updated: return [types.TextContent(type="text", text=f"Container '{container_name}' not found in deployment")] # Update deployment result = await make_request("PUT", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}", json_data=deployment) if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ Updated container '{container_name}' to image '{new_image}'")] else: return [types.TextContent(type="text", text=f"Failed to update deployment: HTTP {result['status_code']}")] elif name == "restart_deployment": deployment_name = arguments.get("deployment_name") if not env_id or not deployment_name: return [types.TextContent(type="text", text="Error: environment_id and deployment_name are required")] # Get current deployment result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}") if result["status_code"] != 200: return [types.TextContent(type="text", text=f"Failed to get deployment: HTTP {result['status_code']}")] deployment = result["data"] # Add/update restart annotation import datetime if "annotations" not in deployment["spec"]["template"]["metadata"]: deployment["spec"]["template"]["metadata"]["annotations"] = {} deployment["spec"]["template"]["metadata"]["annotations"]["kubectl.kubernetes.io/restartedAt"] = datetime.datetime.utcnow().isoformat() # Update deployment result = await make_request("PUT", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}", json_data=deployment) if result["status_code"] == 200: return [types.TextContent(type="text", text=f"✓ Deployment '{deployment_name}' restart initiated")] else: return [types.TextContent(type="text", text=f"Failed to restart deployment: HTTP {result['status_code']}")] elif name == "delete_deployment": deployment_name = arguments.get("deployment_name") if not env_id or not deployment_name: return [types.TextContent(type="text", text="Error: environment_id and deployment_name are required")] result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/apis/apps/v1/namespaces/{namespace}/deployments/{deployment_name}") if result["status_code"] in [200, 202]: return [types.TextContent(type="text", text=f"✓ Deployment '{deployment_name}' deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete deployment: HTTP {result['status_code']}")] # Service Management elif name == "list_services": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/services") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} service(s) in namespace '{namespace}':\n" for service in items: metadata = service.get("metadata", {}) spec = service.get("spec", {}) svc_name = metadata.get("name") svc_type = spec.get("type", "ClusterIP") cluster_ip = spec.get("clusterIP", "None") output += f"\n- {svc_name}" output += f"\n Type: {svc_type}" output += f"\n Cluster IP: {cluster_ip}" # Ports ports = spec.get("ports", []) if ports: port_info = [] for port in ports: port_str = f"{port.get('port')}" if port.get("targetPort") and str(port.get("targetPort")) != str(port.get("port")): # Display target port mapping using arrow notation port_str += f"→{port.get('targetPort')}" if port.get("nodePort"): port_str += f" (NodePort: {port.get('nodePort')})" port_info.append(port_str) output += f"\n Ports: {', '.join(port_info)}" # External IPs or LoadBalancer IP if svc_type == "LoadBalancer": status = service.get("status", {}) lb = status.get("loadBalancer", {}) ingresses = lb.get("ingress", []) if ingresses: ips = [ing.get("ip") or ing.get("hostname") for ing in ingresses] output += f"\n External: {', '.join(ips)}" elif spec.get("externalIPs"): output += f"\n External IPs: {', '.join(spec['externalIPs'])}" # Selector selector = spec.get("selector", {}) if selector: selector_str = ", ".join([f"{k}={v}" for k, v in selector.items()]) output += f"\n Selector: {selector_str}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list services: HTTP {result['status_code']}")] elif name == "create_service": svc_name = arguments.get("name") selector = arguments.get("selector") ports = arguments.get("ports") if not env_id or not svc_name or not selector or not ports: return [types.TextContent(type="text", text="Error: environment_id, name, selector, and ports are required")] service_data = { "apiVersion": "v1", "kind": "Service", "metadata": { "name": svc_name, "namespace": namespace }, "spec": { "selector": selector, "ports": ports, "type": arguments.get("type", "ClusterIP") } } result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/services", json_data=service_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Service '{svc_name}' created successfully")] else: error_msg = f"Failed to create service: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] elif name == "delete_service": service_name = arguments.get("service_name") if not env_id or not service_name: return [types.TextContent(type="text", text="Error: environment_id and service_name are required")] result = await make_request("DELETE", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/services/{service_name}") if result["status_code"] in [200, 202]: return [types.TextContent(type="text", text=f"✓ Service '{service_name}' deleted successfully")] else: return [types.TextContent(type="text", text=f"Failed to delete service: HTTP {result['status_code']}")] # Ingress Management elif name == "list_ingresses": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/apis/networking.k8s.io/v1/namespaces/{namespace}/ingresses") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} ingress(es) in namespace '{namespace}':\n" for ingress in items: metadata = ingress.get("metadata", {}) spec = ingress.get("spec", {}) status = ingress.get("status", {}) ing_name = metadata.get("name") output += f"\n- {ing_name}" # Ingress class ing_class = spec.get("ingressClassName", "default") output += f"\n Class: {ing_class}" # Rules rules = spec.get("rules", []) for rule in rules: host = rule.get("host", "*") output += f"\n Host: {host}" http = rule.get("http", {}) paths = http.get("paths", []) for path in paths: path_val = path.get("path", "/") backend = path.get("backend", {}) service = backend.get("service", {}) svc_name = service.get("name", "unknown") svc_port = service.get("port", {}).get("number", "unknown") output += f"\n {path_val} → {svc_name}:{svc_port}" # Load balancer info lb = status.get("loadBalancer", {}) ingresses = lb.get("ingress", []) if ingresses: ips = [ing.get("ip") or ing.get("hostname") for ing in ingresses] output += f"\n Address: {', '.join(ips)}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list ingresses: HTTP {result['status_code']}")] elif name == "create_ingress": ing_name = arguments.get("name") rules = arguments.get("rules") if not env_id or not ing_name or not rules: return [types.TextContent(type="text", text="Error: environment_id, name, and rules are required")] # Build ingress spec ingress_data = { "apiVersion": "networking.k8s.io/v1", "kind": "Ingress", "metadata": { "name": ing_name, "namespace": namespace }, "spec": { "rules": [] } } # Process rules for rule in rules: rule_spec = {} if rule.get("host"): rule_spec["host"] = rule["host"] paths = [] for path in rule.get("paths", []): path_spec = { "path": path.get("path", "/"), "pathType": "Prefix", "backend": { "service": { "name": path["service"], "port": { "number": path["port"] } } } } paths.append(path_spec) rule_spec["http"] = {"paths": paths} ingress_data["spec"]["rules"].append(rule_spec) result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/apis/networking.k8s.io/v1/namespaces/{namespace}/ingresses", json_data=ingress_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Ingress '{ing_name}' created successfully")] else: error_msg = f"Failed to create ingress: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] # ConfigMap and Secret Management elif name == "list_configmaps": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/configmaps") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} ConfigMap(s) in namespace '{namespace}':\n" for cm in items: metadata = cm.get("metadata", {}) data = cm.get("data", {}) output += f"\n- {metadata.get('name')}" output += f"\n Keys: {len(data)}" if data: output += f" ({', '.join(list(data.keys())[:5])}" if len(data) > 5: output += f", ... +{len(data) - 5} more" output += ")" output += f"\n Created: {metadata.get('creationTimestamp')}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list ConfigMaps: HTTP {result['status_code']}")] elif name == "create_configmap": cm_name = arguments.get("name") data = arguments.get("data") if not env_id or not cm_name or not data: return [types.TextContent(type="text", text="Error: environment_id, name, and data are required")] configmap_data = { "apiVersion": "v1", "kind": "ConfigMap", "metadata": { "name": cm_name, "namespace": namespace }, "data": data } result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/configmaps", json_data=configmap_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ ConfigMap '{cm_name}' created successfully")] else: error_msg = f"Failed to create ConfigMap: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] elif name == "list_secrets": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/secrets") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} Secret(s) in namespace '{namespace}':\n" for secret in items: metadata = secret.get("metadata", {}) data = secret.get("data", {}) secret_type = secret.get("type", "Opaque") output += f"\n- {metadata.get('name')}" output += f"\n Type: {secret_type}" output += f"\n Keys: {len(data)}" if data: output += f" ({', '.join(list(data.keys())[:3])}" if len(data) > 3: output += f", ... +{len(data) - 3} more" output += ")" output += f"\n Created: {metadata.get('creationTimestamp')}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list Secrets: HTTP {result['status_code']}")] elif name == "create_secret": secret_name = arguments.get("name") data = arguments.get("data") if not env_id or not secret_name or not data: return [types.TextContent(type="text", text="Error: environment_id, name, and data are required")] # Base64 encode the data values encoded_data = {} for key, value in data.items(): encoded_data[key] = base64.b64encode(value.encode()).decode() secret_data = { "apiVersion": "v1", "kind": "Secret", "metadata": { "name": secret_name, "namespace": namespace }, "type": arguments.get("type", "Opaque"), "data": encoded_data } result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/secrets", json_data=secret_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ Secret '{secret_name}' created successfully")] else: error_msg = f"Failed to create Secret: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] # Persistent Volume Management elif name == "list_persistent_volumes": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/persistentvolumes") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} PersistentVolume(s):\n" for pv in items: metadata = pv.get("metadata", {}) spec = pv.get("spec", {}) status = pv.get("status", {}) output += f"\n- {metadata.get('name')}" output += f"\n Capacity: {spec.get('capacity', {}).get('storage', 'N/A')}" output += f"\n Access Modes: {', '.join(spec.get('accessModes', []))}" output += f"\n Storage Class: {spec.get('storageClassName', 'N/A')}" output += f"\n Status: {status.get('phase', 'Unknown')}" # Claim reference claim_ref = spec.get("claimRef") if claim_ref: output += f"\n Bound to: {claim_ref.get('namespace')}/{claim_ref.get('name')}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list PersistentVolumes: HTTP {result['status_code']}")] elif name == "list_persistent_volume_claims": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/persistentvolumeclaims") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} PersistentVolumeClaim(s) in namespace '{namespace}':\n" for pvc in items: metadata = pvc.get("metadata", {}) spec = pvc.get("spec", {}) status = pvc.get("status", {}) output += f"\n- {metadata.get('name')}" output += f"\n Status: {status.get('phase', 'Unknown')}" output += f"\n Capacity: {status.get('capacity', {}).get('storage', spec.get('resources', {}).get('requests', {}).get('storage', 'N/A'))}" output += f"\n Access Modes: {', '.join(spec.get('accessModes', []))}" output += f"\n Storage Class: {spec.get('storageClassName', 'N/A')}" output += f"\n Volume: {spec.get('volumeName', 'N/A')}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list PersistentVolumeClaims: HTTP {result['status_code']}")] elif name == "create_persistent_volume_claim": pvc_name = arguments.get("name") size = arguments.get("size") if not env_id or not pvc_name or not size: return [types.TextContent(type="text", text="Error: environment_id, name, and size are required")] pvc_data = { "apiVersion": "v1", "kind": "PersistentVolumeClaim", "metadata": { "name": pvc_name, "namespace": namespace }, "spec": { "accessModes": [arguments.get("access_mode", "ReadWriteOnce")], "resources": { "requests": { "storage": size } } } } if arguments.get("storage_class"): pvc_data["spec"]["storageClassName"] = arguments["storage_class"] result = await make_request("POST", f"/endpoints/{env_id}/kubernetes/api/v1/namespaces/{namespace}/persistentvolumeclaims", json_data=pvc_data) if result["status_code"] in [200, 201]: return [types.TextContent(type="text", text=f"✓ PersistentVolumeClaim '{pvc_name}' created successfully")] else: error_msg = f"Failed to create PVC: HTTP {result['status_code']}" if result.get("data") and result["data"].get("message"): error_msg += f"\n{result['data']['message']}" return [types.TextContent(type="text", text=error_msg)] # Node Information elif name == "list_nodes": if not env_id: return [types.TextContent(type="text", text="Error: environment_id is required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/nodes") if result["status_code"] == 200 and result["data"]: items = result["data"].get("items", []) output = f"Found {len(items)} node(s):\n" for node in items: metadata = node.get("metadata", {}) status = node.get("status", {}) node_name = metadata.get("name") # Node conditions ready = "Unknown" for condition in status.get("conditions", []): if condition.get("type") == "Ready": ready = "Ready" if condition.get("status") == "True" else "NotReady" break output += f"\n- {node_name}" output += f"\n Status: {ready}" # Node info node_info = status.get("nodeInfo", {}) output += f"\n Version: {node_info.get('kubeletVersion', 'N/A')}" output += f"\n OS: {node_info.get('operatingSystem', 'N/A')} ({node_info.get('architecture', 'N/A')})" # Resources capacity = status.get("capacity", {}) allocatable = status.get("allocatable", {}) if capacity: output += f"\n CPU: {capacity.get('cpu', 'N/A')} cores" output += f"\n Memory: {format_resource_quantity(capacity.get('memory', 'N/A'))}" output += f"\n Pods: {allocatable.get('pods', 'N/A')} allocatable" # Taints taints = node.get("spec", {}).get("taints", []) if taints: taint_strs = [f"{t.get('key')}={t.get('value')}:{t.get('effect')}" for t in taints] output += f"\n Taints: {', '.join(taint_strs)}" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to list nodes: HTTP {result['status_code']}")] elif name == "get_node": node_name = arguments.get("node_name") if not env_id or not node_name: return [types.TextContent(type="text", text="Error: environment_id and node_name are required")] result = await make_request("GET", f"/endpoints/{env_id}/kubernetes/api/v1/nodes/{node_name}") if result["status_code"] == 200 and result["data"]: node = result["data"] metadata = node.get("metadata", {}) spec = node.get("spec", {}) status = node.get("status", {}) output = f"Node Details:\n" output += f"- Name: {metadata.get('name')}\n" output += f"- Created: {metadata.get('creationTimestamp')}\n" # Status for condition in status.get("conditions", []): output += f"- {condition.get('type')}: {condition.get('status')} ({condition.get('reason', 'N/A')})\n" # Node info node_info = status.get("nodeInfo", {}) output += f"\nSystem Info:\n" output += f"- Kubernetes: {node_info.get('kubeletVersion', 'N/A')}\n" output += f"- Container Runtime: {node_info.get('containerRuntimeVersion', 'N/A')}\n" output += f"- OS: {node_info.get('osImage', 'N/A')}\n" output += f"- Kernel: {node_info.get('kernelVersion', 'N/A')}\n" output += f"- Architecture: {node_info.get('architecture', 'N/A')}\n" # Capacity and Allocatable capacity = status.get("capacity", {}) allocatable = status.get("allocatable", {}) output += f"\nResources:\n" output += f"- CPU: {capacity.get('cpu', 'N/A')} capacity, {allocatable.get('cpu', 'N/A')} allocatable\n" output += f"- Memory: {format_resource_quantity(capacity.get('memory', 'N/A'))} capacity, " output += f"{format_resource_quantity(allocatable.get('memory', 'N/A'))} allocatable\n" output += f"- Pods: {capacity.get('pods', 'N/A')} capacity, {allocatable.get('pods', 'N/A')} allocatable\n" # Addresses addresses = status.get("addresses", []) if addresses: output += "\nAddresses:\n" for addr in addresses: output += f"- {addr.get('type')}: {addr.get('address')}\n" # Images images = status.get("images", []) if images: output += f"\nImages: {len(images)} cached\n" return [types.TextContent(type="text", text=output)] else: return [types.TextContent(type="text", text=f"Failed to get node: HTTP {result['status_code']}")] else: return [types.TextContent(type="text", text=f"Unknown tool: {name}")] except httpx.TimeoutException: return [types.TextContent(type="text", text="Error: Request timed out. The operation may take longer than expected.")] except httpx.ConnectError: return [types.TextContent(type="text", text="Error: Could not connect to Portainer server. Please check the URL and network connection.")] except Exception as e: import traceback error_details = f"Error: {str(e)}\nType: {type(e).__name__}" return [types.TextContent(type="text", text=error_details)] async def run(): """Run the MCP server.""" # Use stdio transport async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="portainer-kubernetes", server_version="1.0.0", capabilities=server.get_capabilities( notification_options=NotificationOptions( prompts_changed=False, resources_changed=False, tools_changed=False, ), experimental_capabilities={}, ), ), ) def main(): """Main entry point.""" asyncio.run(run()) if __name__ == "__main__": main()