#!/usr/bin/env python3
"""
Portainer Edge MCP Server

Provides edge computing functionality through Portainer's API.
Manages edge environments, edge stacks, edge groups, and edge jobs.

This module implements comprehensive edge computing management including:
- Edge environment registration and monitoring
- Edge agent deployment and configuration
- Edge groups for organizing edge devices
- Edge stacks for distributed application deployment
- Edge jobs for remote command execution
- Edge environment status tracking and diagnostics

The server supports various edge computing scenarios:
- IoT device management at scale
- Distributed application deployment to edge locations
- Remote configuration and updates
- Centralized monitoring of edge infrastructure
- Batch operations across edge device groups

Complexity: O(n) for list operations where n is number of edge environments
Dependencies: aiohttp for async HTTP, mcp for server protocol
Call Flow: MCP client -> handle_call_tool() -> make_request() -> Portainer API

Environment Variables:
    PORTAINER_URL: Base URL of Portainer instance (required)
    PORTAINER_API_KEY: API key for authentication (required)
    MCP_MODE: Set to "true" to suppress logging (default: true)

Edge Concepts:
    - Edge Environment: Remote Docker/K8s instance with edge agent
    - Edge Group: Logical grouping of edge environments
    - Edge Stack: Application deployed to multiple edge environments
    - Edge Job: Script/command executed on edge environments
"""

import os
import sys
import json
import asyncio
import aiohttp
import logging
from typing import Any, Awaitable, Callable, Optional
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types

# Set up logging
MCP_MODE = os.getenv("MCP_MODE", "true").lower() == "true"
if MCP_MODE:
    # In MCP mode, suppress all logs to stdout/stderr
    logging.basicConfig(level=logging.CRITICAL + 1)
else:
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )

logger = logging.getLogger(__name__)

# Environment variables
PORTAINER_URL = os.getenv("PORTAINER_URL", "").rstrip("/")
PORTAINER_API_KEY = os.getenv("PORTAINER_API_KEY", "")

# Validate environment
if not PORTAINER_URL or not PORTAINER_API_KEY:
    if not MCP_MODE:
        logger.error("PORTAINER_URL and PORTAINER_API_KEY must be set")
    sys.exit(1)

# Portainer endpoint "Type" values that denote an edge agent:
# 4 = edge agent on Docker, 7 = edge agent on Kubernetes.
# (5 is a local Kubernetes environment and is NOT an edge environment.)
_EDGE_ENDPOINT_TYPES = {
    4: "Edge Agent",
    7: "Edge Agent (Kubernetes)",
}

# Maps the "status" tool argument to the endpoint Status code to keep.
# Any other value (including "all" or a missing argument) means "no filter".
_STATUS_FILTERS = {"connected": 1, "disconnected": 2}


# Helper functions
def _describe_http_error(status: int, body: str) -> str:
    """Build the error message for an HTTP >= 400 response.

    Prefers the ``message`` (then ``details``) field of a JSON object body;
    otherwise falls back to appending the raw body text when there is one.
    """
    message = f"API request failed: {status}"
    try:
        payload = json.loads(body)
    except ValueError:
        # Not JSON (json.JSONDecodeError is a ValueError); use the raw text.
        payload = None

    if isinstance(payload, dict):
        if "message" in payload:
            return f"{message} - {payload['message']}"
        if "details" in payload:
            return f"{message} - {payload['details']}"
        return message
    if body:
        return f"{message} - {body}"
    return message


async def make_request(
    method: str,
    endpoint: str,
    json_data: Optional[dict] = None,
    params: Optional[dict] = None,
    data: Optional[Any] = None,
    headers: Optional[dict] = None
) -> Any:
    """
    Make an authenticated request to Portainer API.

    Centralized HTTP request handler for all Portainer Edge API interactions.
    Handles authentication, error responses, and timeout management.

    Args:
        method: HTTP method (GET, POST, PUT, DELETE)
        endpoint: API endpoint path (e.g., /api/edge_groups)
        json_data: Optional JSON payload for request body
        params: Optional query parameters
        data: Optional form data or raw body content
        headers: Optional additional headers to merge with defaults

    Returns:
        The decoded JSON response body (a dict for single resources, a list
        for collection endpoints), or ``{}`` when the body is empty.
        Failures never raise; they return a dict with an ``error`` key
        holding a descriptive message.

    Error Handling:
        - HTTP 4xx/5xx errors return structured error dict
        - Timeout errors (30s) return timeout error
        - Any other failure (network error, invalid JSON in a success
          response) returns a "Request failed" error
        - Parses Portainer error details from response
    """
    url = f"{PORTAINER_URL}{endpoint}"

    request_headers = {"X-API-Key": PORTAINER_API_KEY}
    if headers:
        request_headers.update(headers)

    timeout = aiohttp.ClientTimeout(total=30)

    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.request(
                method,
                url,
                json=json_data,
                params=params,
                data=data,
                headers=request_headers
            ) as response:
                response_text = await response.text()

                if response.status >= 400:
                    return {
                        "error": _describe_http_error(response.status, response_text)
                    }

                if response_text:
                    return json.loads(response_text)
                return {}

    except asyncio.TimeoutError:
        return {"error": "Request timeout"}
    except Exception as e:
        # Top-level boundary: callers expect an error dict, never an exception.
        return {"error": f"Request failed: {str(e)}"}


def format_edge_status(status: int) -> str:
    """
    Format edge environment status with emoji.

    Args:
        status: Edge environment status code from API

    Returns:
        Formatted status string with emoji indicator

    Status Codes:
        - 1: Connected (🟢 green)
        - 2: Disconnected (🔴 red)
        - Other: Unknown (⚪)
    """
    if status == 1:
        return "🟢 Connected"
    elif status == 2:
        return "🔴 Disconnected"
    else:
        return "⚪ Unknown"


def format_edge_group_type(group_type: int) -> str:
    """Format edge group type (1 = Static, 2 = Dynamic, anything else = Unknown)."""
    if group_type == 1:
        return "Static"
    elif group_type == 2:
        return "Dynamic"
    else:
        return "Unknown"


class _PortainerError(Exception):
    """Raised by ``_call_api`` when ``make_request`` returns an error payload."""


async def _call_api(method: str, endpoint: str, **kwargs: Any) -> Any:
    """Call ``make_request`` and raise ``_PortainerError`` on an error payload."""
    result = await make_request(method, endpoint, **kwargs)
    if isinstance(result, dict) and "error" in result:
        raise _PortainerError(result["error"])
    return result


def _text(message: str) -> list[types.TextContent]:
    """Wrap a plain string in the single-item content list MCP expects."""
    return [types.TextContent(type="text", text=message)]


def _group_type_label(group: dict) -> str:
    """Return "Dynamic" or "Static" for an edge group payload."""
    return format_edge_group_type(2 if group.get("Dynamic") else 1)


# --- JSON-schema helpers used by handle_list_tools -------------------------

def _prop(kind: str, description: str, **extra: Any) -> dict:
    """Build one JSON-schema property of the given type."""
    return {"type": kind, "description": description, **extra}


def _string_array(description: str) -> dict:
    """Build a JSON-schema property describing an array of strings."""
    return _prop("array", description, items={"type": "string"})


def _tool(
    name: str,
    description: str,
    properties: Optional[dict] = None,
    required: Optional[list] = None,
) -> types.Tool:
    """Build a tool definition; ``required`` is omitted from the schema when empty."""
    schema: dict = {"type": "object", "properties": properties or {}}
    if required:
        schema["required"] = list(required)
    return types.Tool(name=name, description=description, inputSchema=schema)


# Create server instance
server = Server("portainer-edge")


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List all available tools."""
    return [
        # Edge Environment Tools
        _tool(
            "list_edge_environments",
            "List all edge environments",
            {
                "group_id": _prop("string", "Filter by edge group ID (optional)"),
                "status": _prop(
                    "string",
                    "Filter by connection status",
                    enum=["connected", "disconnected", "all"],
                    default="all",
                ),
            },
        ),
        _tool(
            "get_edge_environment",
            "Get detailed information about an edge environment",
            {"environment_id": _prop("string", "Edge environment ID")},
            ["environment_id"],
        ),
        _tool(
            "get_edge_status",
            "Get the connection status of an edge environment",
            {"environment_id": _prop("string", "Edge environment ID")},
            ["environment_id"],
        ),
        _tool(
            "generate_edge_key",
            "Generate an edge agent deployment script",
            {
                "name": _prop("string", "Name for the edge environment"),
                "group_id": _prop("string", "Edge group ID to assign to (optional)"),
            },
            ["name"],
        ),
        # Edge Stack Tools
        _tool("list_edge_stacks", "List all edge stacks"),
        _tool(
            "get_edge_stack",
            "Get detailed information about an edge stack",
            {"edge_stack_id": _prop("string", "Edge stack ID")},
            ["edge_stack_id"],
        ),
        _tool(
            "create_edge_stack",
            "Create a new edge stack",
            {
                "name": _prop("string", "Edge stack name"),
                "stack_file_content": _prop(
                    "string",
                    "Stack file content (Docker Compose or Kubernetes manifest)",
                ),
                "edge_groups": _string_array("List of edge group IDs to deploy to"),
                "deploy_type": _prop(
                    "integer",
                    "Deployment type (0=compose, 1=kubernetes)",
                    default=0,
                ),
            },
            ["name", "stack_file_content", "edge_groups"],
        ),
        _tool(
            "update_edge_stack",
            "Update an existing edge stack",
            {
                "edge_stack_id": _prop("string", "Edge stack ID"),
                "stack_file_content": _prop("string", "Updated stack file content"),
                "edge_groups": _string_array("Updated list of edge group IDs"),
                "update_version": _prop(
                    "boolean", "Update the stack version", default=True
                ),
            },
            ["edge_stack_id"],
        ),
        _tool(
            "delete_edge_stack",
            "Delete an edge stack",
            {"edge_stack_id": _prop("string", "Edge stack ID")},
            ["edge_stack_id"],
        ),
        # Edge Group Tools
        _tool("list_edge_groups", "List all edge groups"),
        _tool(
            "get_edge_group",
            "Get detailed information about an edge group",
            {"edge_group_id": _prop("string", "Edge group ID")},
            ["edge_group_id"],
        ),
        _tool(
            "create_edge_group",
            "Create a new edge group",
            {
                "name": _prop("string", "Edge group name"),
                "dynamic": _prop(
                    "boolean", "Whether this is a dynamic group", default=False
                ),
                "tag_ids": _string_array("Tag IDs for dynamic groups"),
                "endpoints": _string_array("Endpoint IDs for static groups"),
            },
            ["name"],
        ),
        _tool(
            "update_edge_group",
            "Update an edge group",
            {
                "edge_group_id": _prop("string", "Edge group ID"),
                "name": _prop("string", "Updated group name"),
                "tag_ids": _string_array("Updated tag IDs for dynamic groups"),
                "endpoints": _string_array("Updated endpoint IDs for static groups"),
            },
            ["edge_group_id"],
        ),
        _tool(
            "delete_edge_group",
            "Delete an edge group",
            {"edge_group_id": _prop("string", "Edge group ID")},
            ["edge_group_id"],
        ),
        # Edge Job Tools
        _tool("list_edge_jobs", "List all edge jobs"),
        _tool(
            "get_edge_job",
            "Get detailed information about an edge job",
            {"edge_job_id": _prop("string", "Edge job ID")},
            ["edge_job_id"],
        ),
        _tool(
            "create_edge_job",
            "Create a new edge job",
            {
                "name": _prop("string", "Edge job name"),
                "script_content": _prop("string", "Script content to execute"),
                "edge_groups": _string_array("Edge group IDs to run the job on"),
                "cron_expression": _prop(
                    "string", "Cron expression for scheduling (optional)"
                ),
                "recurring": _prop(
                    "boolean", "Whether this is a recurring job", default=False
                ),
            },
            ["name", "script_content", "edge_groups"],
        ),
        _tool(
            "delete_edge_job",
            "Delete an edge job",
            {"edge_job_id": _prop("string", "Edge job ID")},
            ["edge_job_id"],
        ),
        # Edge Settings Tools
        _tool("get_edge_settings", "Get global edge settings"),
        _tool(
            "update_edge_settings",
            "Update global edge settings",
            {
                "check_in_interval": _prop(
                    "integer", "Edge agent check-in interval in seconds"
                ),
                "command_interval": _prop(
                    "integer", "Command execution interval in seconds"
                ),
                "ping_interval": _prop("integer", "Ping interval in seconds"),
                "snapshot_interval": _prop("integer", "Snapshot interval in seconds"),
                "tunnel_server_address": _prop("string", "Tunnel server address"),
            },
        ),
    ]


# --- Edge Environment Tools -------------------------------------------------

async def _list_edge_environments(arguments: dict) -> str:
    """List edge environments, optionally filtered by group and status."""
    endpoints = await _call_api("GET", "/api/endpoints")
    edge_envs = [env for env in endpoints if env.get("Type") in _EDGE_ENDPOINT_TYPES]

    if arguments.get("group_id"):
        group_id = int(arguments["group_id"])  # parse once, not per element
        edge_envs = [env for env in edge_envs if env.get("GroupId") == group_id]

    wanted_status = _STATUS_FILTERS.get(arguments.get("status"))
    if wanted_status is not None:
        edge_envs = [env for env in edge_envs if env.get("Status") == wanted_status]

    if not edge_envs:
        return "No edge environments found"

    parts = ["🌐 Edge Environments:\n\n"]
    for env in edge_envs:
        parts.append(f"• {env['Name']} (ID: {env['Id']})\n")
        parts.append(f" Status: {format_edge_status(env.get('Status', 0))}\n")
        parts.append(f" URL: {env.get('URL', 'N/A')}\n")
        if env.get("EdgeID"):
            parts.append(f" Edge ID: {env['EdgeID']}\n")
        if env.get("EdgeKey"):
            # Only a short prefix is shown; the key is a deployment secret.
            parts.append(f" Edge Key: {env['EdgeKey'][:8]}...\n")
        if env.get("GroupId"):
            parts.append(f" Group ID: {env['GroupId']}\n")
        parts.append(f" Last Check-in: {env.get('LastCheckInDate', 'Never')}\n")
        parts.append("\n")
    return "".join(parts)


async def _get_edge_environment(arguments: dict) -> str:
    """Describe a single environment by ID."""
    env_id = arguments["environment_id"]
    env = await _call_api("GET", f"/api/endpoints/{env_id}")

    env_type = env.get("Type")
    type_label = _EDGE_ENDPOINT_TYPES.get(
        env_type, f"Not an edge environment (type {env_type})"
    )

    parts = [
        f"🌐 Edge Environment: {env['Name']}\n\n",
        f"ID: {env['Id']}\n",
        f"Status: {format_edge_status(env.get('Status', 0))}\n",
        f"Type: {type_label}\n",
        f"URL: {env.get('URL', 'N/A')}\n",
    ]

    if env.get("EdgeID"):
        # "or" also covers a key that is present but null/empty.
        edge_key = env.get("EdgeKey") or "N/A"
        parts.append("\n🔑 Edge Configuration:\n")
        parts.append(f" Edge ID: {env['EdgeID']}\n")
        parts.append(f" Edge Key: {edge_key[:8]}...\n")
        parts.append(
            f" Check-in Interval: {env.get('EdgeCheckinInterval', 5)} seconds\n"
        )

    if env.get("GroupId"):
        parts.append(f"\n👥 Group: {env['GroupId']}\n")

    if env.get("TagIds"):
        parts.append(f"\n🏷️ Tags: {', '.join(map(str, env['TagIds']))}\n")

    parts.append("\n📅 Timestamps:\n")
    parts.append(f" Created: {env.get('CreatedAt', 'Unknown')}\n")
    parts.append(f" Last Check-in: {env.get('LastCheckInDate', 'Never')}\n")
    return "".join(parts)


async def _get_edge_status(arguments: dict) -> str:
    """Report check-in time, status and deployed stacks for an environment."""
    env_id = arguments["environment_id"]
    status = await _call_api("GET", f"/api/endpoints/{env_id}/edge/status")

    parts = [
        "📊 Edge Status Information:\n\n",
        f"Check-in Time: {status.get('CheckinTime', 'Unknown')}\n",
        f"Status: {status.get('Status', 'Unknown')}\n",
    ]

    stacks = status.get("Stacks")
    if stacks:
        parts.append(f"\n📚 Deployed Stacks: {len(stacks)}\n")
        for stack in stacks:
            parts.append(
                f" • {stack.get('Name', 'Unknown')} (v{stack.get('Version', '?')})\n"
            )
    return "".join(parts)


async def _generate_edge_key(arguments: dict) -> str:
    """Create an edge environment and return a ``docker run`` deployment script."""
    edge_name = arguments["name"]

    # NOTE(review): the payload shape (JSON body, Type=4) is kept as originally
    # written; confirm it against the Portainer version in use.
    edge_data = {
        "Name": edge_name,
        "Type": 4,  # Edge Agent
        "URL": "",
        "PublicURL": "",
        "TLS": False
    }
    if arguments.get("group_id"):
        edge_data["GroupId"] = int(arguments["group_id"])

    created = await _call_api("POST", "/api/endpoints", json_data=edge_data)

    edge_id = created.get("Id")
    edge_key = created.get("EdgeKey", "")

    return "".join([
        f"✅ Edge environment '{edge_name}' created!\n\n",
        f"ID: {edge_id}\n",
        f"Edge Key: {edge_key}\n\n",
        "🚀 Deploy Edge Agent with:\n\n",
        "```bash\n",
        "docker run -d \\\n",
        " --name portainer_edge_agent \\\n",
        " --restart always \\\n",
        # CAP_HOST_MANAGEMENT is a Portainer agent environment variable, not a
        # Linux capability, so it must be passed with -e rather than --cap-add.
        " -e CAP_HOST_MANAGEMENT=1 \\\n",
        " -v /var/run/docker.sock:/var/run/docker.sock \\\n",
        " -v /var/lib/docker/volumes:/var/lib/docker/volumes \\\n",
        " -v /:/host \\\n",
        " -v portainer_agent_data:/data \\\n",
        " -e EDGE=1 \\\n",
        f" -e EDGE_ID={edge_id} \\\n",
        f" -e EDGE_KEY={edge_key} \\\n",
        " -e EDGE_INSECURE_POLL=1 \\\n",
        " -e EDGE_ASYNC=1 \\\n",
        " portainer/agent:latest\n",
        "```\n",
    ])


# --- Edge Stack Tools -------------------------------------------------------

def _deployment_type_label(stack: dict) -> str:
    """Return "Compose" for DeploymentType 0 and "Kubernetes" otherwise."""
    return "Compose" if stack.get("DeploymentType") == 0 else "Kubernetes"


async def _list_edge_stacks(arguments: dict) -> str:
    """List all edge stacks with version, type and deployment counts."""
    stacks = await _call_api("GET", "/api/edge_stacks")
    if not stacks:
        return "No edge stacks found"

    parts = ["📚 Edge Stacks:\n\n"]
    for stack in stacks:
        parts.append(f"• {stack['Name']} (ID: {stack['Id']})\n")
        parts.append(f" Version: {stack.get('Version', 1)}\n")
        parts.append(f" Type: {_deployment_type_label(stack)}\n")
        parts.append(f" Edge Groups: {len(stack.get('EdgeGroups', []))}\n")

        statuses = stack.get("Status")
        if statuses:
            deployed = sum(1 for s in statuses.values() if s.get("Type") == "OK")
            parts.append(f" Deployed: {deployed}/{len(statuses)}\n")
        parts.append("\n")
    return "".join(parts)


async def _get_edge_stack(arguments: dict) -> str:
    """Describe a single edge stack, including per-environment status."""
    edge_stack_id = arguments["edge_stack_id"]
    stack = await _call_api("GET", f"/api/edge_stacks/{edge_stack_id}")

    parts = [
        f"📚 Edge Stack: {stack['Name']}\n\n",
        f"ID: {stack['Id']}\n",
        f"Version: {stack.get('Version', 1)}\n",
        f"Type: {_deployment_type_label(stack)}\n",
        f"Created: {stack.get('CreationDate', 'Unknown')}\n",
    ]

    groups = stack.get("EdgeGroups")
    if groups:
        parts.append(f"\n👥 Edge Groups ({len(groups)}):\n")
        parts.extend(f" • Group {group_id}\n" for group_id in groups)

    statuses = stack.get("Status")
    if statuses:
        parts.append("\n📊 Deployment Status:\n")
        for env_id, status in statuses.items():
            parts.append(
                f" • Environment {env_id}: {status.get('Type', 'Unknown')}\n"
            )
            if status.get("Error"):
                parts.append(f" Error: {status['Error']}\n")
    return "".join(parts)


async def _create_edge_stack(arguments: dict) -> str:
    """Create an edge stack from inline stack-file content."""
    payload = {
        "Name": arguments["name"],
        "StackFileContent": arguments["stack_file_content"],
        "EdgeGroups": [int(g) for g in arguments["edge_groups"]],
        "DeploymentType": arguments.get("deploy_type", 0)
    }
    stack = await _call_api("POST", "/api/edge_stacks", json_data=payload)

    return "".join([
        "✅ Edge stack created successfully!\n\n",
        f"Name: {stack['Name']}\n",
        f"ID: {stack['Id']}\n",
        f"Version: {stack.get('Version', 1)}\n",
        f"Edge Groups: {len(stack.get('EdgeGroups', []))}\n",
    ])


async def _update_edge_stack(arguments: dict) -> str:
    """Update an edge stack's content and/or groups, optionally bumping its version."""
    edge_stack_id = arguments["edge_stack_id"]

    # The current stack is needed for its name and version number.
    current = await _call_api("GET", f"/api/edge_stacks/{edge_stack_id}")

    payload: dict = {}
    if arguments.get("stack_file_content"):
        payload["StackFileContent"] = arguments["stack_file_content"]
    if arguments.get("edge_groups"):
        payload["EdgeGroups"] = [int(g) for g in arguments["edge_groups"]]
    if arguments.get("update_version", True):
        payload["Version"] = current.get("Version", 1) + 1

    await _call_api("PUT", f"/api/edge_stacks/{edge_stack_id}", json_data=payload)
    return f"✅ Edge stack '{current['Name']}' updated successfully!"


async def _delete_edge_stack(arguments: dict) -> str:
    """Delete an edge stack by ID."""
    edge_stack_id = arguments["edge_stack_id"]
    await _call_api("DELETE", f"/api/edge_stacks/{edge_stack_id}")
    return "🗑️ Edge stack deleted successfully!"


# --- Edge Group Tools -------------------------------------------------------

async def _list_edge_groups(arguments: dict) -> str:
    """List all edge groups with their type and member counts."""
    groups = await _call_api("GET", "/api/edge_groups")
    if not groups:
        return "No edge groups found"

    parts = ["👥 Edge Groups:\n\n"]
    for group in groups:
        parts.append(f"• {group['Name']} (ID: {group['Id']})\n")
        parts.append(f" Type: {_group_type_label(group)}\n")
        if group.get("Dynamic"):
            if group.get("TagIds"):
                parts.append(f" Tags: {len(group['TagIds'])} tags\n")
        elif group.get("Endpoints"):
            parts.append(f" Endpoints: {len(group['Endpoints'])} environments\n")
        parts.append("\n")
    return "".join(parts)


async def _get_edge_group(arguments: dict) -> str:
    """Describe a single edge group and its tags or endpoints."""
    edge_group_id = arguments["edge_group_id"]
    group = await _call_api("GET", f"/api/edge_groups/{edge_group_id}")

    parts = [
        f"👥 Edge Group: {group['Name']}\n\n",
        f"ID: {group['Id']}\n",
        f"Type: {_group_type_label(group)}\n",
    ]

    if group.get("Dynamic"):
        parts.append("\n🏷️ Dynamic Tags:\n")
        parts.extend(f" • Tag {tag_id}\n" for tag_id in group.get("TagIds") or [])
    else:
        parts.append("\n🖥️ Static Endpoints:\n")
        parts.extend(
            f" • Environment {endpoint_id}\n"
            for endpoint_id in group.get("Endpoints") or []
        )
    return "".join(parts)


async def _create_edge_group(arguments: dict) -> str:
    """Create a static (endpoint-based) or dynamic (tag-based) edge group."""
    payload = {
        "Name": arguments["name"],
        "Dynamic": arguments.get("dynamic", False)
    }

    # Tags only apply to dynamic groups, endpoints only to static ones.
    if payload["Dynamic"]:
        if arguments.get("tag_ids"):
            payload["TagIds"] = [int(t) for t in arguments["tag_ids"]]
    elif arguments.get("endpoints"):
        payload["Endpoints"] = [int(e) for e in arguments["endpoints"]]

    group = await _call_api("POST", "/api/edge_groups", json_data=payload)

    return "".join([
        "✅ Edge group created successfully!\n\n",
        f"Name: {group['Name']}\n",
        f"ID: {group['Id']}\n",
        f"Type: {_group_type_label(group)}\n",
    ])


async def _update_edge_group(arguments: dict) -> str:
    """Update an edge group's name and its tags (dynamic) or endpoints (static)."""
    edge_group_id = arguments["edge_group_id"]

    current = await _call_api("GET", f"/api/edge_groups/{edge_group_id}")

    # Start from the current group so unspecified fields are sent back unchanged.
    payload = current.copy()
    if arguments.get("name"):
        payload["Name"] = arguments["name"]

    if current.get("Dynamic"):
        if arguments.get("tag_ids"):
            payload["TagIds"] = [int(t) for t in arguments["tag_ids"]]
    elif arguments.get("endpoints"):
        payload["Endpoints"] = [int(e) for e in arguments["endpoints"]]

    await _call_api("PUT", f"/api/edge_groups/{edge_group_id}", json_data=payload)
    return f"✅ Edge group '{current['Name']}' updated successfully!"


async def _delete_edge_group(arguments: dict) -> str:
    """Delete an edge group by ID."""
    edge_group_id = arguments["edge_group_id"]
    await _call_api("DELETE", f"/api/edge_groups/{edge_group_id}")
    return "🗑️ Edge group deleted successfully!"


# --- Edge Job Tools ---------------------------------------------------------

async def _list_edge_jobs(arguments: dict) -> str:
    """List all edge jobs with their schedule and target group count."""
    jobs = await _call_api("GET", "/api/edge_jobs")
    if not jobs:
        return "No edge jobs found"

    parts = ["⚙️ Edge Jobs:\n\n"]
    for job in jobs:
        parts.append(f"• {job['Name']} (ID: {job['Id']})\n")
        parts.append(f" Created: {job.get('Created', 'Unknown')}\n")
        if job.get("CronExpression"):
            parts.append(f" Schedule: {job['CronExpression']}\n")
            parts.append(" Recurring: Yes\n")
        else:
            parts.append(" Recurring: No\n")
        if job.get("EdgeGroups"):
            parts.append(f" Edge Groups: {len(job['EdgeGroups'])}\n")
        parts.append("\n")
    return "".join(parts)


async def _get_edge_job(arguments: dict) -> str:
    """Describe a single edge job."""
    edge_job_id = arguments["edge_job_id"]
    job = await _call_api("GET", f"/api/edge_jobs/{edge_job_id}")

    parts = [
        f"⚙️ Edge Job: {job['Name']}\n\n",
        f"ID: {job['Id']}\n",
        f"Created: {job.get('Created', 'Unknown')}\n",
        f"Recurring: {'Yes' if job.get('Recurring') else 'No'}\n",
    ]

    if job.get("CronExpression"):
        parts.append(f"Schedule: {job['CronExpression']}\n")

    if job.get("EdgeGroups"):
        parts.append("\n👥 Target Edge Groups:\n")
        parts.extend(f" • Group {group_id}\n" for group_id in job["EdgeGroups"])

    if job.get("ScriptPath"):
        parts.append(f"\n📜 Script: {job['ScriptPath']}\n")
    return "".join(parts)


async def _create_edge_job(arguments: dict) -> str:
    """Create an edge job; supplying a cron expression forces it to be recurring."""
    payload = {
        "Name": arguments["name"],
        "ScriptContent": arguments["script_content"],
        "EdgeGroups": [int(g) for g in arguments["edge_groups"]],
        "Recurring": arguments.get("recurring", False)
    }

    if arguments.get("cron_expression"):
        payload["CronExpression"] = arguments["cron_expression"]
        payload["Recurring"] = True

    job = await _call_api("POST", "/api/edge_jobs", json_data=payload)

    return "".join([
        "✅ Edge job created successfully!\n\n",
        f"Name: {job['Name']}\n",
        f"ID: {job['Id']}\n",
        f"Recurring: {'Yes' if job.get('Recurring') else 'No'}\n",
    ])


async def _delete_edge_job(arguments: dict) -> str:
    """Delete an edge job by ID."""
    edge_job_id = arguments["edge_job_id"]
    await _call_api("DELETE", f"/api/edge_jobs/{edge_job_id}")
    return "🗑️ Edge job deleted successfully!"


# --- Edge Settings Tools ----------------------------------------------------

# Tool argument name -> key inside the "Edge" object of /api/settings.
_EDGE_SETTING_KEYS = {
    "check_in_interval": "CheckinInterval",
    "command_interval": "CommandInterval",
    "ping_interval": "PingInterval",
    "snapshot_interval": "SnapshotInterval",
    "tunnel_server_address": "TunnelServerAddress",
}


async def _get_edge_settings(arguments: dict) -> str:
    """Show the "Edge" section of the global Portainer settings."""
    settings = await _call_api("GET", "/api/settings")
    edge_settings = settings.get("Edge") or {}

    return "".join([
        "⚙️ Edge Settings:\n\n",
        f"Check-in Interval: {edge_settings.get('CheckinInterval', 5)} seconds\n",
        f"Command Interval: {edge_settings.get('CommandInterval', 5)} seconds\n",
        f"Ping Interval: {edge_settings.get('PingInterval', 5)} seconds\n",
        f"Snapshot Interval: {edge_settings.get('SnapshotInterval', 5)} seconds\n",
        "Tunnel Server Address: "
        f"{edge_settings.get('TunnelServerAddress', 'Not configured')}\n",
    ])


async def _update_edge_settings(arguments: dict) -> str:
    """Update only the edge settings that were explicitly provided."""
    current = await _call_api("GET", "/api/settings")
    edge_settings = current.get("Edge") or {}

    for argument, setting_key in _EDGE_SETTING_KEYS.items():
        # "is not None" rather than truthiness, so an explicit 0 or empty
        # string is forwarded to the API instead of being silently dropped.
        if arguments.get(argument) is not None:
            edge_settings[setting_key] = arguments[argument]

    await _call_api("PUT", "/api/settings", json_data={"Edge": edge_settings})
    return "✅ Edge settings updated successfully!"


# Tool name -> coroutine taking the arguments dict and returning the reply text.
_TOOL_HANDLERS: dict[str, Callable[[dict], Awaitable[str]]] = {
    "list_edge_environments": _list_edge_environments,
    "get_edge_environment": _get_edge_environment,
    "get_edge_status": _get_edge_status,
    "generate_edge_key": _generate_edge_key,
    "list_edge_stacks": _list_edge_stacks,
    "get_edge_stack": _get_edge_stack,
    "create_edge_stack": _create_edge_stack,
    "update_edge_stack": _update_edge_stack,
    "delete_edge_stack": _delete_edge_stack,
    "list_edge_groups": _list_edge_groups,
    "get_edge_group": _get_edge_group,
    "create_edge_group": _create_edge_group,
    "update_edge_group": _update_edge_group,
    "delete_edge_group": _delete_edge_group,
    "list_edge_jobs": _list_edge_jobs,
    "get_edge_job": _get_edge_job,
    "create_edge_job": _create_edge_job,
    "delete_edge_job": _delete_edge_job,
    "get_edge_settings": _get_edge_settings,
    "update_edge_settings": _update_edge_settings,
}


@server.call_tool()
async def handle_call_tool(
    name: str,
    arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Handle tool execution.

    Looks up the handler for ``name`` and returns its text as a single
    ``TextContent`` item. Failures are reported as text rather than raised:
    an unknown tool yields ``"Unknown tool: <name>"``; an API error payload
    or any exception from the handler (for example a missing required
    argument) yields ``"Error: <message>"``.

    Args:
        name: Tool name as advertised by ``handle_list_tools``.
        arguments: Tool arguments; ``None`` is treated as an empty dict.
    """
    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        return _text(f"Unknown tool: {name}")

    try:
        return _text(await handler(arguments or {}))
    except _PortainerError as e:
        # Expected API failure: report it without a stack trace in the log.
        return _text(f"Error: {e}")
    except Exception as e:
        logger.exception("Error in %s: %s", name, e)
        return _text(f"Error: {str(e)}")


async def main():
    """Run the MCP server over stdin/stdout until the streams close."""
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="portainer-edge",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())