#!/usr/bin/env python3
"""
Portainer Edge MCP Server

Provides edge computing functionality through Portainer's API.
Manages edge environments, edge stacks, edge groups, and edge jobs.
"""

import asyncio
import json
import logging
import os
import sys
from typing import Any, Awaitable, Callable, Optional
from urllib.parse import quote

import aiohttp
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions

# Set up logging
MCP_MODE = os.getenv("MCP_MODE", "true").lower() == "true"
if MCP_MODE:
    # In MCP mode, suppress all logs to stdout/stderr
    logging.basicConfig(level=logging.CRITICAL + 1)
else:
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)

# Environment variables
PORTAINER_URL = os.getenv("PORTAINER_URL", "").rstrip("/")
PORTAINER_API_KEY = os.getenv("PORTAINER_API_KEY", "")

# Validate environment
if not PORTAINER_URL or not PORTAINER_API_KEY:
    if not MCP_MODE:
        logger.error("PORTAINER_URL and PORTAINER_API_KEY must be set")
    # NOTE(review): reconstructed as exiting in both modes (only the log line
    # is gated on MCP_MODE) -- confirm against the intended behavior.
    sys.exit(1)

# Content list type returned by every tool handler.
_ToolResult = list[types.TextContent | types.ImageContent | types.EmbeddedResource]


# Helper functions
def _describe_http_error(status: int, body: str) -> str:
    """Build the error message for an HTTP response with status >= 400.

    A JSON object body contributes its ``message`` (preferred) or ``details``
    field; any other non-empty body is appended verbatim.
    """
    error_msg = f"API request failed: {status}"
    try:
        error_data = json.loads(body)
    except ValueError:
        # Not JSON (json.JSONDecodeError is a ValueError): use the raw body.
        error_data = None

    if isinstance(error_data, dict):
        for key in ("message", "details"):
            if key in error_data:
                return f"{error_msg} - {error_data[key]}"
        return error_msg
    return f"{error_msg} - {body}" if body else error_msg


async def make_request(
    method: str,
    endpoint: str,
    json_data: Optional[dict] = None,
    params: Optional[dict] = None,
    data: Optional[Any] = None,
    headers: Optional[dict] = None
) -> Any:
    """Make an authenticated request to Portainer API.

    Args:
        method: HTTP method name.
        endpoint: Path appended to ``PORTAINER_URL``.
        json_data: Optional JSON request body.
        params: Optional query-string parameters.
        data: Optional raw request body.
        headers: Extra headers merged over the ``X-API-Key`` header.

    Returns:
        The decoded JSON body (a dict or a list), ``{}`` for an empty body,
        or ``{"error": "<message>"}`` on HTTP status >= 400, timeout, or any
        other failure. This function does not raise for request errors.
    """
    url = f"{PORTAINER_URL}{endpoint}"
    request_headers = {"X-API-Key": PORTAINER_API_KEY}
    if headers:
        request_headers.update(headers)

    timeout = aiohttp.ClientTimeout(total=30)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.request(
                method,
                url,
                json=json_data,
                params=params,
                data=data,
                headers=request_headers,
            ) as response:
                response_text = await response.text()
                if response.status >= 400:
                    return {"error": _describe_http_error(response.status, response_text)}
                return json.loads(response_text) if response_text else {}
    except asyncio.TimeoutError:
        return {"error": "Request timeout"}
    except Exception as e:
        return {"error": f"Request failed: {str(e)}"}


def format_edge_status(status: int) -> str:
    """Format edge environment status with emoji (1=connected, 2=disconnected)."""
    if status == 1:
        return "🟢 Connected"
    if status == 2:
        return "🔴 Disconnected"
    return "⚪ Unknown"


def format_edge_group_type(group_type: int) -> str:
    """Format edge group type (1=Static, 2=Dynamic)."""
    if group_type == 1:
        return "Static"
    if group_type == 2:
        return "Dynamic"
    return "Unknown"


def _text(message: str) -> _ToolResult:
    """Wrap *message* in the single-item text content list tools return."""
    return [types.TextContent(type="text", text=message)]


def _failed(result: Any) -> bool:
    """Return True when *result* is an error dict as produced by make_request."""
    return isinstance(result, dict) and "error" in result


def _error(result: dict) -> _ToolResult:
    """Render a make_request error dict as tool output."""
    return _text(f"Error: {result['error']}")


def _path_id(value: Any) -> str:
    """Percent-encode a caller-supplied ID so it stays a single URL path segment."""
    return quote(str(value), safe="")


def _tool(
    name: str,
    description: str,
    properties: dict,
    required: Optional[list] = None,
) -> types.Tool:
    """Build a Tool whose input schema is an object with the given properties."""
    schema: dict[str, Any] = {"type": "object", "properties": properties}
    if required:
        schema["required"] = required
    return types.Tool(name=name, description=description, inputSchema=schema)


# Create server instance
server = Server("portainer-edge")


@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
    """List all available tools."""
    return [
        # Edge Environment Tools
        _tool(
            "list_edge_environments",
            "List all edge environments",
            {
                "group_id": {
                    "type": "string",
                    "description": "Filter by edge group ID (optional)",
                },
                "status": {
                    "type": "string",
                    "enum": ["connected", "disconnected", "all"],
                    "description": "Filter by connection status",
                    "default": "all",
                },
            },
        ),
        _tool(
            "get_edge_environment",
            "Get detailed information about an edge environment",
            {"environment_id": {"type": "string", "description": "Edge environment ID"}},
            ["environment_id"],
        ),
        _tool(
            "get_edge_status",
            "Get the connection status of an edge environment",
            {"environment_id": {"type": "string", "description": "Edge environment ID"}},
            ["environment_id"],
        ),
        _tool(
            "generate_edge_key",
            "Generate an edge agent deployment script",
            {
                "name": {"type": "string", "description": "Name for the edge environment"},
                "group_id": {
                    "type": "string",
                    "description": "Edge group ID to assign to (optional)",
                },
            },
            ["name"],
        ),
        # Edge Stack Tools
        _tool("list_edge_stacks", "List all edge stacks", {}),
        _tool(
            "get_edge_stack",
            "Get detailed information about an edge stack",
            {"edge_stack_id": {"type": "string", "description": "Edge stack ID"}},
            ["edge_stack_id"],
        ),
        _tool(
            "create_edge_stack",
            "Create a new edge stack",
            {
                "name": {"type": "string", "description": "Edge stack name"},
                "stack_file_content": {
                    "type": "string",
                    "description": "Stack file content (Docker Compose or Kubernetes manifest)",
                },
                "edge_groups": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of edge group IDs to deploy to",
                },
                "deploy_type": {
                    "type": "integer",
                    "description": "Deployment type (0=compose, 1=kubernetes)",
                    "default": 0,
                },
            },
            ["name", "stack_file_content", "edge_groups"],
        ),
        _tool(
            "update_edge_stack",
            "Update an existing edge stack",
            {
                "edge_stack_id": {"type": "string", "description": "Edge stack ID"},
                "stack_file_content": {
                    "type": "string",
                    "description": "Updated stack file content",
                },
                "edge_groups": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Updated list of edge group IDs",
                },
                "update_version": {
                    "type": "boolean",
                    "description": "Update the stack version",
                    "default": True,
                },
            },
            ["edge_stack_id"],
        ),
        _tool(
            "delete_edge_stack",
            "Delete an edge stack",
            {"edge_stack_id": {"type": "string", "description": "Edge stack ID"}},
            ["edge_stack_id"],
        ),
        # Edge Group Tools
        _tool("list_edge_groups", "List all edge groups", {}),
        _tool(
            "get_edge_group",
            "Get detailed information about an edge group",
            {"edge_group_id": {"type": "string", "description": "Edge group ID"}},
            ["edge_group_id"],
        ),
        _tool(
            "create_edge_group",
            "Create a new edge group",
            {
                "name": {"type": "string", "description": "Edge group name"},
                "dynamic": {
                    "type": "boolean",
                    "description": "Whether this is a dynamic group",
                    "default": False,
                },
                "tag_ids": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Tag IDs for dynamic groups",
                },
                "endpoints": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Endpoint IDs for static groups",
                },
            },
            ["name"],
        ),
        _tool(
            "update_edge_group",
            "Update an edge group",
            {
                "edge_group_id": {"type": "string", "description": "Edge group ID"},
                "name": {"type": "string", "description": "Updated group name"},
                "tag_ids": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Updated tag IDs for dynamic groups",
                },
                "endpoints": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Updated endpoint IDs for static groups",
                },
            },
            ["edge_group_id"],
        ),
        _tool(
            "delete_edge_group",
            "Delete an edge group",
            {"edge_group_id": {"type": "string", "description": "Edge group ID"}},
            ["edge_group_id"],
        ),
        # Edge Job Tools
        _tool("list_edge_jobs", "List all edge jobs", {}),
        _tool(
            "get_edge_job",
            "Get detailed information about an edge job",
            {"edge_job_id": {"type": "string", "description": "Edge job ID"}},
            ["edge_job_id"],
        ),
        _tool(
            "create_edge_job",
            "Create a new edge job",
            {
                "name": {"type": "string", "description": "Edge job name"},
                "script_content": {
                    "type": "string",
                    "description": "Script content to execute",
                },
                "edge_groups": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Edge group IDs to run the job on",
                },
                "cron_expression": {
                    "type": "string",
                    "description": "Cron expression for scheduling (optional)",
                },
                "recurring": {
                    "type": "boolean",
                    "description": "Whether this is a recurring job",
                    "default": False,
                },
            },
            ["name", "script_content", "edge_groups"],
        ),
        _tool(
            "delete_edge_job",
            "Delete an edge job",
            {"edge_job_id": {"type": "string", "description": "Edge job ID"}},
            ["edge_job_id"],
        ),
        # Edge Settings Tools
        _tool("get_edge_settings", "Get global edge settings", {}),
        _tool(
            "update_edge_settings",
            "Update global edge settings",
            {
                "check_in_interval": {
                    "type": "integer",
                    "description": "Edge agent check-in interval in seconds",
                },
                "command_interval": {
                    "type": "integer",
                    "description": "Command execution interval in seconds",
                },
                "ping_interval": {
                    "type": "integer",
                    "description": "Ping interval in seconds",
                },
                "snapshot_interval": {
                    "type": "integer",
                    "description": "Snapshot interval in seconds",
                },
                "tunnel_server_address": {
                    "type": "string",
                    "description": "Tunnel server address",
                },
            },
        ),
    ]


# Edge Environment Tools
async def _list_edge_environments(arguments: dict) -> _ToolResult:
    """List endpoints of type 4 or 5, optionally filtered by group and status."""
    result = await make_request("GET", "/api/endpoints")
    if _failed(result):
        return _error(result)

    # Filter for edge environments (Type = 4 or 5)
    edge_envs = [env for env in result if env.get("Type") in (4, 5)]

    # Apply additional filters
    if arguments.get("group_id"):
        group_id = int(arguments["group_id"])
        edge_envs = [env for env in edge_envs if env.get("GroupId") == group_id]

    # "all", a missing value, or an unrecognized value applies no status filter.
    wanted_status = {"connected": 1, "disconnected": 2}.get(arguments.get("status", "all"))
    if wanted_status is not None:
        edge_envs = [env for env in edge_envs if env.get("Status") == wanted_status]

    if not edge_envs:
        return _text("No edge environments found")

    parts = ["🌐 Edge Environments:\n\n"]
    for env in edge_envs:
        parts.append(f"• {env['Name']} (ID: {env['Id']})\n")
        parts.append(f" Status: {format_edge_status(env.get('Status', 0))}\n")
        parts.append(f" URL: {env.get('URL', 'N/A')}\n")
        if env.get("EdgeID"):
            parts.append(f" Edge ID: {env['EdgeID']}\n")
        if env.get("EdgeKey"):
            # Show only a short prefix of the key.
            parts.append(f" Edge Key: {env['EdgeKey'][:8]}...\n")
        if env.get("GroupId"):
            parts.append(f" Group ID: {env['GroupId']}\n")
        parts.append(f" Last Check-in: {env.get('LastCheckInDate', 'Never')}\n")
        parts.append("\n")
    return _text("".join(parts))


async def _get_edge_environment(arguments: dict) -> _ToolResult:
    """Show details of a single edge environment."""
    env_id = _path_id(arguments["environment_id"])
    result = await make_request("GET", f"/api/endpoints/{env_id}")
    if _failed(result):
        return _error(result)

    parts = [
        f"🌐 Edge Environment: {result['Name']}\n\n",
        f"ID: {result['Id']}\n",
        f"Status: {format_edge_status(result.get('Status', 0))}\n",
        f"Type: {'Edge Agent' if result.get('Type') == 4 else 'Edge Agent (async)'}\n",
        f"URL: {result.get('URL', 'N/A')}\n",
    ]
    if result.get("EdgeID"):
        parts += [
            "\n🔑 Edge Configuration:\n",
            f" Edge ID: {result['EdgeID']}\n",
            f" Edge Key: {result.get('EdgeKey', 'N/A')[:8]}...\n",
            f" Check-in Interval: {result.get('EdgeCheckinInterval', 5)} seconds\n",
        ]
    if result.get("GroupId"):
        parts.append(f"\n👥 Group: {result['GroupId']}\n")
    if result.get("TagIds"):
        parts.append(f"\n🏷️ Tags: {', '.join(map(str, result['TagIds']))}\n")
    parts += [
        "\n📅 Timestamps:\n",
        f" Created: {result.get('CreatedAt', 'Unknown')}\n",
        f" Last Check-in: {result.get('LastCheckInDate', 'Never')}\n",
    ]
    return _text("".join(parts))


async def _get_edge_status(arguments: dict) -> _ToolResult:
    """Show the edge status record of an environment, including its stacks."""
    env_id = _path_id(arguments["environment_id"])
    result = await make_request("GET", f"/api/endpoints/{env_id}/edge/status")
    if _failed(result):
        return _error(result)

    parts = [
        "📊 Edge Status Information:\n\n",
        f"Check-in Time: {result.get('CheckinTime', 'Unknown')}\n",
        f"Status: {result.get('Status', 'Unknown')}\n",
    ]
    if result.get("Stacks"):
        parts.append(f"\n📚 Deployed Stacks: {len(result['Stacks'])}\n")
        for stack in result["Stacks"]:
            parts.append(
                f" • {stack.get('Name', 'Unknown')} (v{stack.get('Version', '?')})\n"
            )
    return _text("".join(parts))


async def _generate_edge_key(arguments: dict) -> _ToolResult:
    """Create an edge environment and return a docker command to deploy its agent."""
    edge_name = arguments["name"]
    edge_data = {
        "Name": edge_name,
        "Type": 4,  # Edge Agent
        "URL": "",
        "PublicURL": "",
        "TLS": False,
    }
    if arguments.get("group_id"):
        edge_data["GroupId"] = int(arguments["group_id"])

    result = await make_request("POST", "/api/endpoints", json_data=edge_data)
    if _failed(result):
        return _error(result)

    edge_id = result.get("Id")
    edge_key = result.get("EdgeKey", "")

    parts = [
        f"✅ Edge environment '{edge_name}' created!\n\n",
        f"ID: {edge_id}\n",
        f"Edge Key: {edge_key}\n\n",
        "🚀 Deploy Edge Agent with:\n\n",
        "```bash\n",
        "docker run -d \\\n",
        " --name portainer_edge_agent \\\n",
        " --restart always \\\n",
        " --cap-add CAP_HOST_MANAGEMENT \\\n",
        " -v /var/run/docker.sock:/var/run/docker.sock \\\n",
        " -v /var/lib/docker/volumes:/var/lib/docker/volumes \\\n",
        " -v /:/host \\\n",
        " -v portainer_agent_data:/data \\\n",
        " -e EDGE=1 \\\n",
        f" -e EDGE_ID={edge_id} \\\n",
        f" -e EDGE_KEY={edge_key} \\\n",
        " -e EDGE_INSECURE_POLL=1 \\\n",
        " -e EDGE_ASYNC=1 \\\n",
        " portainer/agent:latest\n",
        "```\n",
    ]
    return _text("".join(parts))


# Edge Stack Tools
async def _list_edge_stacks(arguments: dict) -> _ToolResult:
    """List all edge stacks with version, type and deployment counts."""
    result = await make_request("GET", "/api/edge_stacks")
    if _failed(result):
        return _error(result)
    if not result:
        return _text("No edge stacks found")

    parts = ["📚 Edge Stacks:\n\n"]
    for stack in result:
        parts.append(f"• {stack['Name']} (ID: {stack['Id']})\n")
        parts.append(f" Version: {stack.get('Version', 1)}\n")
        parts.append(
            f" Type: {'Compose' if stack.get('DeploymentType') == 0 else 'Kubernetes'}\n"
        )
        parts.append(f" Edge Groups: {len(stack.get('EdgeGroups', []))}\n")
        if stack.get("Status"):
            # Count the per-environment entries whose status type is "OK".
            deployed = sum(1 for s in stack["Status"].values() if s.get("Type") == "OK")
            parts.append(f" Deployed: {deployed}/{len(stack['Status'])}\n")
        parts.append("\n")
    return _text("".join(parts))


async def _get_edge_stack(arguments: dict) -> _ToolResult:
    """Show details and per-environment deployment status of an edge stack."""
    edge_stack_id = _path_id(arguments["edge_stack_id"])
    result = await make_request("GET", f"/api/edge_stacks/{edge_stack_id}")
    if _failed(result):
        return _error(result)

    parts = [
        f"📚 Edge Stack: {result['Name']}\n\n",
        f"ID: {result['Id']}\n",
        f"Version: {result.get('Version', 1)}\n",
        f"Type: {'Compose' if result.get('DeploymentType') == 0 else 'Kubernetes'}\n",
        f"Created: {result.get('CreationDate', 'Unknown')}\n",
    ]
    if result.get("EdgeGroups"):
        parts.append(f"\n👥 Edge Groups ({len(result['EdgeGroups'])}):\n")
        for group_id in result["EdgeGroups"]:
            parts.append(f" • Group {group_id}\n")
    if result.get("Status"):
        parts.append("\n📊 Deployment Status:\n")
        for env_id, status in result["Status"].items():
            parts.append(f" • Environment {env_id}: {status.get('Type', 'Unknown')}\n")
            if status.get("Error"):
                parts.append(f" Error: {status['Error']}\n")
    return _text("".join(parts))


async def _create_edge_stack(arguments: dict) -> _ToolResult:
    """Create an edge stack from inline stack file content."""
    data = {
        "Name": arguments["name"],
        "StackFileContent": arguments["stack_file_content"],
        "EdgeGroups": [int(g) for g in arguments["edge_groups"]],
        "DeploymentType": arguments.get("deploy_type", 0),
    }
    result = await make_request("POST", "/api/edge_stacks", json_data=data)
    if _failed(result):
        return _error(result)

    parts = [
        "✅ Edge stack created successfully!\n\n",
        f"Name: {result['Name']}\n",
        f"ID: {result['Id']}\n",
        f"Version: {result.get('Version', 1)}\n",
        f"Edge Groups: {len(result.get('EdgeGroups', []))}\n",
    ]
    return _text("".join(parts))


async def _update_edge_stack(arguments: dict) -> _ToolResult:
    """Update an edge stack's content and/or groups, optionally bumping its version."""
    edge_stack_id = _path_id(arguments["edge_stack_id"])

    # Get current stack first (needed for the version bump and the stack name)
    current = await make_request("GET", f"/api/edge_stacks/{edge_stack_id}")
    if _failed(current):
        return _error(current)

    data: dict[str, Any] = {}
    if arguments.get("stack_file_content"):
        data["StackFileContent"] = arguments["stack_file_content"]
    if arguments.get("edge_groups"):
        data["EdgeGroups"] = [int(g) for g in arguments["edge_groups"]]
    if arguments.get("update_version", True):
        data["Version"] = current.get("Version", 1) + 1

    result = await make_request("PUT", f"/api/edge_stacks/{edge_stack_id}", json_data=data)
    if _failed(result):
        return _error(result)
    return _text(f"✅ Edge stack '{current['Name']}' updated successfully!")


async def _delete_edge_stack(arguments: dict) -> _ToolResult:
    """Delete an edge stack."""
    edge_stack_id = _path_id(arguments["edge_stack_id"])
    result = await make_request("DELETE", f"/api/edge_stacks/{edge_stack_id}")
    if _failed(result):
        return _error(result)
    return _text("🗑️ Edge stack deleted successfully!")


# Edge Group Tools
async def _list_edge_groups(arguments: dict) -> _ToolResult:
    """List all edge groups with their type and member counts."""
    result = await make_request("GET", "/api/edge_groups")
    if _failed(result):
        return _error(result)
    if not result:
        return _text("No edge groups found")

    parts = ["👥 Edge Groups:\n\n"]
    for group in result:
        is_dynamic = group.get("Dynamic")
        parts.append(f"• {group['Name']} (ID: {group['Id']})\n")
        parts.append(f" Type: {format_edge_group_type(2 if is_dynamic else 1)}\n")
        if is_dynamic and group.get("TagIds"):
            parts.append(f" Tags: {len(group['TagIds'])} tags\n")
        elif not is_dynamic and group.get("Endpoints"):
            parts.append(f" Endpoints: {len(group['Endpoints'])} environments\n")
        parts.append("\n")
    return _text("".join(parts))


async def _get_edge_group(arguments: dict) -> _ToolResult:
    """Show an edge group's type and its tags (dynamic) or endpoints (static)."""
    edge_group_id = _path_id(arguments["edge_group_id"])
    result = await make_request("GET", f"/api/edge_groups/{edge_group_id}")
    if _failed(result):
        return _error(result)

    parts = [
        f"👥 Edge Group: {result['Name']}\n\n",
        f"ID: {result['Id']}\n",
        f"Type: {format_edge_group_type(2 if result.get('Dynamic') else 1)}\n",
    ]
    if result.get("Dynamic"):
        parts.append("\n🏷️ Dynamic Tags:\n")
        for tag_id in result.get("TagIds") or []:
            parts.append(f" • Tag {tag_id}\n")
    else:
        parts.append("\n🖥️ Static Endpoints:\n")
        for endpoint_id in result.get("Endpoints") or []:
            parts.append(f" • Environment {endpoint_id}\n")
    return _text("".join(parts))


async def _create_edge_group(arguments: dict) -> _ToolResult:
    """Create a static (endpoint list) or dynamic (tag based) edge group."""
    data = {
        "Name": arguments["name"],
        "Dynamic": arguments.get("dynamic", False),
    }
    # Only the member list matching the group kind is sent.
    if data["Dynamic"] and arguments.get("tag_ids"):
        data["TagIds"] = [int(t) for t in arguments["tag_ids"]]
    elif not data["Dynamic"] and arguments.get("endpoints"):
        data["Endpoints"] = [int(e) for e in arguments["endpoints"]]

    result = await make_request("POST", "/api/edge_groups", json_data=data)
    if _failed(result):
        return _error(result)

    parts = [
        "✅ Edge group created successfully!\n\n",
        f"Name: {result['Name']}\n",
        f"ID: {result['Id']}\n",
        f"Type: {format_edge_group_type(2 if result.get('Dynamic') else 1)}\n",
    ]
    return _text("".join(parts))


async def _update_edge_group(arguments: dict) -> _ToolResult:
    """Update an edge group's name and the member list matching its kind."""
    edge_group_id = _path_id(arguments["edge_group_id"])

    # Get current group; the update sends the full record with changes applied
    current = await make_request("GET", f"/api/edge_groups/{edge_group_id}")
    if _failed(current):
        return _error(current)

    data = current.copy()
    if arguments.get("name"):
        data["Name"] = arguments["name"]
    if current.get("Dynamic") and arguments.get("tag_ids"):
        data["TagIds"] = [int(t) for t in arguments["tag_ids"]]
    elif not current.get("Dynamic") and arguments.get("endpoints"):
        data["Endpoints"] = [int(e) for e in arguments["endpoints"]]

    result = await make_request("PUT", f"/api/edge_groups/{edge_group_id}", json_data=data)
    if _failed(result):
        return _error(result)
    return _text(f"✅ Edge group '{current['Name']}' updated successfully!")


async def _delete_edge_group(arguments: dict) -> _ToolResult:
    """Delete an edge group."""
    edge_group_id = _path_id(arguments["edge_group_id"])
    result = await make_request("DELETE", f"/api/edge_groups/{edge_group_id}")
    if _failed(result):
        return _error(result)
    return _text("🗑️ Edge group deleted successfully!")


# Edge Job Tools
async def _list_edge_jobs(arguments: dict) -> _ToolResult:
    """List all edge jobs with their schedule and target group count."""
    result = await make_request("GET", "/api/edge_jobs")
    if _failed(result):
        return _error(result)
    if not result:
        return _text("No edge jobs found")

    parts = ["⚙️ Edge Jobs:\n\n"]
    for job in result:
        parts.append(f"• {job['Name']} (ID: {job['Id']})\n")
        parts.append(f" Created: {job.get('Created', 'Unknown')}\n")
        if job.get("CronExpression"):
            parts.append(f" Schedule: {job['CronExpression']}\n")
            parts.append(" Recurring: Yes\n")
        else:
            parts.append(" Recurring: No\n")
        if job.get("EdgeGroups"):
            parts.append(f" Edge Groups: {len(job['EdgeGroups'])}\n")
        parts.append("\n")
    return _text("".join(parts))


async def _get_edge_job(arguments: dict) -> _ToolResult:
    """Show details of a single edge job."""
    edge_job_id = _path_id(arguments["edge_job_id"])
    result = await make_request("GET", f"/api/edge_jobs/{edge_job_id}")
    if _failed(result):
        return _error(result)

    parts = [
        f"⚙️ Edge Job: {result['Name']}\n\n",
        f"ID: {result['Id']}\n",
        f"Created: {result.get('Created', 'Unknown')}\n",
        f"Recurring: {'Yes' if result.get('Recurring') else 'No'}\n",
    ]
    if result.get("CronExpression"):
        parts.append(f"Schedule: {result['CronExpression']}\n")
    if result.get("EdgeGroups"):
        parts.append("\n👥 Target Edge Groups:\n")
        for group_id in result["EdgeGroups"]:
            parts.append(f" • Group {group_id}\n")
    if result.get("ScriptPath"):
        parts.append(f"\n📜 Script: {result['ScriptPath']}\n")
    return _text("".join(parts))


async def _create_edge_job(arguments: dict) -> _ToolResult:
    """Create an edge job; supplying a cron expression makes it recurring."""
    data = {
        "Name": arguments["name"],
        "ScriptContent": arguments["script_content"],
        "EdgeGroups": [int(g) for g in arguments["edge_groups"]],
        "Recurring": arguments.get("recurring", False),
    }
    if arguments.get("cron_expression"):
        data["CronExpression"] = arguments["cron_expression"]
        data["Recurring"] = True

    result = await make_request("POST", "/api/edge_jobs", json_data=data)
    if _failed(result):
        return _error(result)

    parts = [
        "✅ Edge job created successfully!\n\n",
        f"Name: {result['Name']}\n",
        f"ID: {result['Id']}\n",
        f"Recurring: {'Yes' if result.get('Recurring') else 'No'}\n",
    ]
    return _text("".join(parts))


async def _delete_edge_job(arguments: dict) -> _ToolResult:
    """Delete an edge job."""
    edge_job_id = _path_id(arguments["edge_job_id"])
    result = await make_request("DELETE", f"/api/edge_jobs/{edge_job_id}")
    if _failed(result):
        return _error(result)
    return _text("🗑️ Edge job deleted successfully!")


# Edge Settings Tools
# Tool argument name -> key inside the "Edge" section of /api/settings.
_EDGE_SETTING_KEYS = {
    "check_in_interval": "CheckinInterval",
    "command_interval": "CommandInterval",
    "ping_interval": "PingInterval",
    "snapshot_interval": "SnapshotInterval",
    "tunnel_server_address": "TunnelServerAddress",
}


async def _get_edge_settings(arguments: dict) -> _ToolResult:
    """Show the "Edge" section of the global settings."""
    result = await make_request("GET", "/api/settings")
    if _failed(result):
        return _error(result)

    edge_settings = result.get("Edge", {})
    parts = [
        "⚙️ Edge Settings:\n\n",
        f"Check-in Interval: {edge_settings.get('CheckinInterval', 5)} seconds\n",
        f"Command Interval: {edge_settings.get('CommandInterval', 5)} seconds\n",
        f"Ping Interval: {edge_settings.get('PingInterval', 5)} seconds\n",
        f"Snapshot Interval: {edge_settings.get('SnapshotInterval', 5)} seconds\n",
        "Tunnel Server Address: "
        f"{edge_settings.get('TunnelServerAddress', 'Not configured')}\n",
    ]
    return _text("".join(parts))


async def _update_edge_settings(arguments: dict) -> _ToolResult:
    """Merge the provided values into the current "Edge" settings and save them."""
    # Get current settings first
    current = await make_request("GET", "/api/settings")
    if _failed(current):
        return _error(current)

    edge_settings = current.get("Edge", {})

    # Update only provided settings (falsy values such as 0 or "" are ignored)
    for argument_name, setting_key in _EDGE_SETTING_KEYS.items():
        if arguments.get(argument_name):
            edge_settings[setting_key] = arguments[argument_name]

    result = await make_request("PUT", "/api/settings", json_data={"Edge": edge_settings})
    if _failed(result):
        return _error(result)
    return _text("✅ Edge settings updated successfully!")


# Tool name -> coroutine implementing it.
_TOOL_HANDLERS: dict[str, Callable[[dict], Awaitable[_ToolResult]]] = {
    "list_edge_environments": _list_edge_environments,
    "get_edge_environment": _get_edge_environment,
    "get_edge_status": _get_edge_status,
    "generate_edge_key": _generate_edge_key,
    "list_edge_stacks": _list_edge_stacks,
    "get_edge_stack": _get_edge_stack,
    "create_edge_stack": _create_edge_stack,
    "update_edge_stack": _update_edge_stack,
    "delete_edge_stack": _delete_edge_stack,
    "list_edge_groups": _list_edge_groups,
    "get_edge_group": _get_edge_group,
    "create_edge_group": _create_edge_group,
    "update_edge_group": _update_edge_group,
    "delete_edge_group": _delete_edge_group,
    "list_edge_jobs": _list_edge_jobs,
    "get_edge_job": _get_edge_job,
    "create_edge_job": _create_edge_job,
    "delete_edge_job": _delete_edge_job,
    "get_edge_settings": _get_edge_settings,
    "update_edge_settings": _update_edge_settings,
}


@server.call_tool()
async def handle_call_tool(
    name: str,
    arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """Handle tool execution.

    Dispatches *name* to its handler. Unknown tool names and any exception
    raised by a handler (including a missing required argument) are reported
    as a text result rather than raised.
    """
    if not arguments:
        arguments = {}

    handler = _TOOL_HANDLERS.get(name)
    if handler is None:
        return _text(f"Unknown tool: {name}")

    try:
        return await handler(arguments)
    except Exception as e:
        # Top-level boundary: log with traceback and surface the message.
        logger.exception("Error in %s: %s", name, e)
        return _text(f"Error: {str(e)}")


async def main():
    """Run the MCP server over stdin/stdout until the streams close."""
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="portainer-edge",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())