first commit

This commit is contained in:
Adolfo Delorenzo 2025-07-18 07:33:27 -06:00
commit 84ca8aee99
45 changed files with 3860 additions and 0 deletions

36
.env.example Normal file
View File

@ -0,0 +1,36 @@
# Portainer Core MCP Server Configuration
# Portainer connection settings
PORTAINER_URL=https://your-portainer-instance.com
# Authentication settings (choose one method)
# Method 1: API Key authentication (recommended)
PORTAINER_API_KEY=your-api-key-here
# Method 2: Username/Password authentication
# PORTAINER_USERNAME=admin
# PORTAINER_PASSWORD=your-password-here
# HTTP client settings
HTTP_TIMEOUT=30
MAX_RETRIES=3
RETRY_DELAY=1.0
# Circuit breaker settings
CIRCUIT_BREAKER_FAILURE_THRESHOLD=5
CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60
# Token management settings
TOKEN_CACHE_TTL=3600
TOKEN_REFRESH_THRESHOLD=300
# Logging settings
LOG_LEVEL=INFO
LOG_FORMAT=json
# Development settings
DEBUG=false
# Server settings
SERVER_HOST=localhost
SERVER_PORT=8000

181
CLAUDE.md Normal file
View File

@ -0,0 +1,181 @@
# Portainer Core MCP Server
This MCP server provides authentication and core management functionality for Portainer Business Edition.
## Server Purpose
**Primary Functions:**
- User authentication and session management
- API token generation and validation
- User profile and settings management
- Basic system information retrieval
**Part of Portainer MCP Suite:**
- `portainer-core` - Authentication and user management (this server)
- `portainer-teams` - Teams and RBAC management
- `portainer-environments` - Environment and endpoint management
- `portainer-docker` - Docker container operations
- `portainer-kubernetes` - Kubernetes cluster management
- `portainer-stacks` - Stack deployment and management
- `portainer-edge` - Edge computing and device management
## Authentication Flow
**Required for all operations:**
1. Initialize admin user (first time setup)
2. Authenticate to get JWT token
3. Use token in `X-API-Key` header for all requests
**Token Management:**
- Tokens expire based on server configuration
- Generate new tokens before expiration
- Store tokens securely in environment variables
## Base Configuration
**Environment Variables:**
```bash
PORTAINER_URL=https://your-portainer-instance.com
PORTAINER_API_KEY=your-api-token
```
**API Base URLs:**
- Business Edition: `{PORTAINER_URL}/api`
- All endpoints require authentication except initial admin setup
## Common Response Patterns
**Success Response Structure:**
```json
{
"Id": 1,
"Username": "admin",
"Role": 1,
"CreationDate": 1631852794
}
```
**Error Response Structure:**
```json
{
"message": "Invalid credentials",
"details": "Authentication failed"
}
```
## Core Endpoint Categories
**Authentication Endpoints:**
- `POST /api/users/admin/init` - Initialize admin user
- `POST /api/auth` - Authenticate user
- `POST /api/users/{id}/tokens` - Generate API token
- `DELETE /api/auth` - Logout/invalidate session
**User Management:**
- `GET /api/users` - List users
- `POST /api/users` - Create user
- `GET /api/users/{id}` - Get user details
- `PUT /api/users/{id}` - Update user
- `DELETE /api/users/{id}` - Delete user
**Settings:**
- `GET /api/settings` - Get Portainer settings
- `PUT /api/settings` - Update settings
- `GET /api/settings/public` - Get public settings
## Error Handling
**Common HTTP Status Codes:**
- `200` - Success
- `201` - Created
- `400` - Bad Request (invalid parameters)
- `401` - Unauthorized (invalid/missing token)
- `403` - Forbidden (insufficient permissions)
- `404` - Not Found
- `409` - Conflict (resource already exists)
- `500` - Internal Server Error
**Retry Logic:**
- Implement exponential backoff for 5xx errors
- Refresh token on 401 responses
- Maximum 3 retry attempts
## Security Considerations
**Best Practices:**
- Always use HTTPS in production
- Rotate API tokens regularly
- Implement proper token storage
- Log authentication events
- Rate limit API calls
**RBAC Integration:**
- Check user permissions before operations
- Respect environment-level access controls
- Honor team-based restrictions
## Development Workflow
**Testing Authentication:**
```bash
# Test admin initialization
curl -X POST "${PORTAINER_URL}/api/users/admin/init" \
-H "Content-Type: application/json" \
-d '{"Username":"admin","Password":"yourpassword"}'
# Test login
curl -X POST "${PORTAINER_URL}/api/auth" \
-H "Content-Type: application/json" \
-d '{"Username":"admin","Password":"yourpassword"}'
```
**Local Development:**
- Use Docker Compose with Portainer for testing
- Mock authentication for unit tests
- Validate all endpoints with proper error handling
## Integration Notes
**With Other MCP Servers:**
- Share authentication state across Portainer MCP servers
- Use consistent error handling patterns
- Maintain session context for user operations
**Rate Limits:**
- Portainer has built-in rate limiting
- Implement client-side throttling
- Monitor for 429 responses
## Troubleshooting
**Common Issues:**
- **401 Unauthorized**: Check token validity and format
- **403 Forbidden**: Verify user permissions and RBAC settings
- **Connection Refused**: Confirm Portainer URL and network access
- **SSL Errors**: Validate certificate configuration
**Debug Commands:**
```bash
# Verify Portainer connectivity
curl -I "${PORTAINER_URL}/api/status"
# Check current user context
curl -H "X-API-Key: ${PORTAINER_API_KEY}" \
"${PORTAINER_URL}/api/users/me"
```
**Logging:**
- Enable verbose logging for authentication flows
- Log all API calls with timestamps
- Mask sensitive data in logs (passwords, tokens)
## Version Compatibility
**Supported Versions:**
- Portainer Business Edition 2.30.x+
- API Version: 2.31.3
**Breaking Changes:**
- Monitor Portainer release notes for API changes
- Test against new versions before upgrading
- Maintain backward compatibility where possible

134
README.md Normal file
View File

@ -0,0 +1,134 @@
# Portainer Core MCP Server
A Model Context Protocol (MCP) server for Portainer Business Edition authentication and user management.
## Features
- **Authentication & Session Management**: JWT token handling and user authentication
- **User Management**: Create, read, update, and delete users
- **Settings Management**: Retrieve and update Portainer settings
- **Secure Token Handling**: Automatic token refresh and secure storage
- **Error Handling**: Comprehensive error handling with retry logic
- **Circuit Breaker**: Fault tolerance for external API calls
## Installation
```bash
pip install portainer-core-mcp
```
## Configuration
Set the following environment variables:
```bash
PORTAINER_URL=https://your-portainer-instance.com
PORTAINER_API_KEY=your-api-token # Optional, for API key authentication
PORTAINER_USERNAME=admin # For username/password authentication
PORTAINER_PASSWORD=your-password # For username/password authentication
```
## Usage
### As MCP Server
```bash
portainer-core-mcp
```
### Programmatic Usage
```python
from portainer_core.server import PortainerCoreMCPServer
server = PortainerCoreMCPServer()
# Use server instance
```
## Available MCP Tools
- `authenticate` - Login with username/password
- `generate_token` - Generate API token
- `get_current_user` - Get authenticated user info
- `list_users` - List all users
- `create_user` - Create new user
- `update_user` - Update user details
- `delete_user` - Delete user
- `get_settings` - Get Portainer settings
- `update_settings` - Update settings
## Development
### Setup
```bash
# Clone the repository
git clone https://github.com/yourusername/portainer-core-mcp.git
cd portainer-core-mcp
# Install development dependencies
pip install -e ".[dev]"
# Install pre-commit hooks
pre-commit install
```
### Testing
```bash
# Run all tests
pytest
# Run with coverage
pytest --cov=src/portainer_core --cov-report=html
# Run only unit tests
pytest -m unit
# Run only integration tests
pytest -m integration
```
### Code Quality
```bash
# Format code
black src tests
isort src tests
# Lint code
flake8 src tests
# Type checking
mypy src
```
## Architecture
The server follows a layered architecture:
- **MCP Server Layer**: Handles MCP protocol communication
- **Service Layer**: Abstracts Portainer API interactions
- **Models Layer**: Defines data structures and validation
- **Utils Layer**: Provides utility functions and helpers
## Security
- All API communications use HTTPS
- JWT tokens are handled securely and never logged
- Input validation on all parameters
- Rate limiting to prevent abuse
- Circuit breaker pattern for fault tolerance
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Ensure all tests pass
6. Submit a pull request
## License
MIT License - see LICENSE file for details.

141
pyproject.toml Normal file
View File

@ -0,0 +1,141 @@
[build-system]
requires = ["setuptools>=64", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "portainer-core-mcp"
version = "0.1.0"
description = "Portainer Core MCP Server - Authentication and User Management"
authors = [
{name = "Your Name", email = "your.email@example.com"}
]
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.8"
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = [
"mcp>=1.0.0",
"httpx>=0.25.0",
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
"structlog>=23.0.0",
"PyJWT>=2.8.0",
"python-dotenv>=1.0.0",
"tenacity>=8.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.0.0",
"pytest-asyncio>=0.21.0",
"pytest-cov>=4.0.0",
"pytest-mock>=3.10.0",
"httpx-mock>=0.10.0",
"black>=23.0.0",
"isort>=5.12.0",
"flake8>=6.0.0",
"mypy>=1.0.0",
"pre-commit>=3.0.0",
]
[project.urls]
Homepage = "https://github.com/yourusername/portainer-core-mcp"
Documentation = "https://github.com/yourusername/portainer-core-mcp#readme"
Repository = "https://github.com/yourusername/portainer-core-mcp"
Issues = "https://github.com/yourusername/portainer-core-mcp/issues"
[project.scripts]
portainer-core-mcp = "portainer_core.server:main"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-dir]
"" = "src"
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
# directories
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| build
| dist
)/
'''
[tool.isort]
profile = "black"
multi_line_output = 3
line_length = 88
known_first_party = ["portainer_core"]
[tool.mypy]
python_version = "3.8"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
strict_equality = true
[tool.pytest.ini_options]
minversion = "7.0"
addopts = "-ra -q --strict-markers --strict-config"
testpaths = [
"tests",
]
python_files = [
"test_*.py",
"*_test.py",
]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
"unit: marks tests as unit tests",
]
[tool.coverage.run]
source = ["src"]
omit = [
"*/tests/*",
"*/test_*",
"*/conftest.py",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]

115
run_server.py Normal file
View File

@ -0,0 +1,115 @@
#!/usr/bin/env python3
"""
Portainer Core MCP Server - Entry Point Script.
This script serves as the main entry point for the Portainer Core MCP Server,
providing authentication and user management functionality for Portainer Business Edition.
It handles environment setup, configuration validation, and server startup.
Usage:
python run_server.py
Environment Variables:
PORTAINER_URL: The base URL of your Portainer instance (required)
PORTAINER_API_KEY: API key for authentication (option 1)
PORTAINER_USERNAME: Username for authentication (option 2)
PORTAINER_PASSWORD: Password for authentication (option 2)
Example:
export PORTAINER_URL=https://portainer.example.com
export PORTAINER_API_KEY=your-api-key-here
python run_server.py
Architecture:
- Environment setup and validation
- Configuration loading from environment variables
- Async server initialization and startup
- Graceful error handling and shutdown
"""
import os
import sys
import asyncio
from portainer_core.server import main
def setup_environment():
"""
Set up environment variables with fallback values for demo/testing.
This function ensures the required environment variables are set for the server
to start properly. It provides fallback values for demo purposes when specific
configuration is not provided.
Environment Variables Handled:
PORTAINER_URL: Sets default demo URL if not provided
PORTAINER_API_KEY: Sets placeholder if no authentication is configured
PORTAINER_USERNAME/PORTAINER_PASSWORD: Alternative authentication method
Raises:
None: This function doesn't raise exceptions but prints warnings for
missing configuration.
Side Effects:
- Modifies os.environ with default values
- Prints configuration messages to stdout
- Ensures minimum viable configuration for server startup
Note:
In production environments, always provide proper authentication
credentials rather than relying on demo/placeholder values.
"""
# Configure Portainer URL with fallback to demo instance
if not os.environ.get('PORTAINER_URL'):
os.environ['PORTAINER_URL'] = 'https://demo.portainer.io'
print("🔧 Using demo Portainer URL: https://demo.portainer.io")
# Configure authentication - requires either API key or username/password
has_api_key = os.environ.get('PORTAINER_API_KEY')
has_credentials = (os.environ.get('PORTAINER_USERNAME') and
os.environ.get('PORTAINER_PASSWORD'))
if not has_api_key and not has_credentials:
print("⚠️ No authentication configured. Set PORTAINER_API_KEY or PORTAINER_USERNAME/PORTAINER_PASSWORD")
print(" For demo purposes, using placeholder API key")
os.environ['PORTAINER_API_KEY'] = 'demo-api-key'
if __name__ == "__main__":
"""
Main execution block for the Portainer Core MCP Server.
This block handles the complete server lifecycle:
1. Environment setup and configuration validation
2. Async server initialization
3. Graceful error handling and shutdown
4. User-friendly status messages
Exit Codes:
0: Successful shutdown (user interrupt)
1: Server failure or configuration error
Flow:
1. Print startup messages and configuration requirements
2. Setup environment variables with fallbacks
3. Start async server using asyncio.run()
4. Handle interrupts and exceptions gracefully
"""
print("🚀 Starting Portainer Core MCP Server...")
print(" Configuration will be loaded from environment variables")
print(" Set PORTAINER_URL and PORTAINER_API_KEY (or username/password) before running")
print("")
# Initialize environment with fallback values for demo/testing
setup_environment()
try:
# Start the async MCP server - this blocks until shutdown
asyncio.run(main())
except KeyboardInterrupt:
# Handle graceful shutdown on Ctrl+C
print("\n👋 Server stopped by user")
sys.exit(0)
except Exception as e:
# Handle any startup or runtime errors
print(f"\n❌ Server failed: {e}")
sys.exit(1)

View File

@ -0,0 +1,57 @@
"""
Portainer Core MCP Server - Main Package
This package provides Model Context Protocol (MCP) server functionality for Portainer
Business Edition, focusing on authentication and user management operations.
The package implements a complete MCP server that integrates with Portainer's REST API
to provide secure, reliable access to user management, authentication, and settings
configuration through the MCP protocol.
Key Features:
- Authentication service with JWT token management
- User management with full CRUD operations
- Settings management for Portainer configuration
- Comprehensive error handling and logging
- Circuit breaker pattern for fault tolerance
- Health monitoring and service status reporting
Package Structure:
- server: Main MCP server implementation
- config: Configuration management with environment variable support
- models: Pydantic data models for API requests/responses
- services: Business logic services (auth, users, settings)
- utils: Utility modules (errors, logging, tokens)
Usage:
from portainer_core.server import create_server
# Create and run the MCP server
server = create_server()
await server.run()
Dependencies:
- mcp: Model Context Protocol implementation
- pydantic: Data validation and serialization
- httpx: HTTP client for API requests
- structlog: Structured logging
Version: 0.1.0
License: MIT
Compatibility: Python 3.8+
"""
__version__ = "0.1.0"
__author__ = "Portainer MCP Team"
__email__ = "support@portainer.io"
__license__ = "MIT"
__description__ = "MCP server for Portainer Business Edition authentication and user management"
# Package metadata for introspection
__all__ = [
"__version__",
"__author__",
"__email__",
"__license__",
"__description__",
]

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,229 @@
"""
Configuration management for Portainer Core MCP Server.
This module handles environment variable validation and configuration settings.
"""
import os
from typing import Optional
from urllib.parse import urlparse
from pydantic import BaseModel, Field, field_validator, ConfigDict
from pydantic_settings import BaseSettings
class PortainerConfig(BaseSettings):
"""Configuration settings for Portainer Core MCP Server."""
# Portainer connection settings
portainer_url: str = Field(
...,
description="Base URL of the Portainer instance"
)
# Authentication settings (either API key or username/password)
portainer_api_key: Optional[str] = Field(
default=None,
description="Portainer API key for authentication"
)
portainer_username: Optional[str] = Field(
default=None,
description="Portainer username for authentication"
)
portainer_password: Optional[str] = Field(
default=None,
description="Portainer password for authentication"
)
# HTTP client settings
http_timeout: int = Field(
default=30,
description="HTTP request timeout in seconds"
)
max_retries: int = Field(
default=3,
description="Maximum number of retry attempts"
)
retry_delay: float = Field(
default=1.0,
description="Base delay between retries in seconds"
)
# Circuit breaker settings
circuit_breaker_failure_threshold: int = Field(
default=5,
description="Number of failures before circuit breaker opens"
)
circuit_breaker_recovery_timeout: int = Field(
default=60,
description="Time in seconds before attempting recovery"
)
# Token management settings
token_cache_ttl: int = Field(
default=3600,
description="Token cache TTL in seconds"
)
token_refresh_threshold: int = Field(
default=300,
description="Refresh token when expiring within this many seconds"
)
# Logging settings
log_level: str = Field(
default="INFO",
description="Logging level"
)
log_format: str = Field(
default="json",
description="Logging format (json or text)"
)
# Development settings
debug: bool = Field(
default=False,
description="Enable debug mode"
)
model_config = ConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False
)
@field_validator("portainer_url")
@classmethod
def validate_portainer_url(cls, v):
"""Validate Portainer URL format."""
if not v:
raise ValueError("Portainer URL is required")
try:
parsed = urlparse(v)
if not parsed.scheme or not parsed.netloc:
raise ValueError("Invalid URL format")
if parsed.scheme not in ["http", "https"]:
raise ValueError("URL must use http or https scheme")
# Remove trailing slash for consistency
return v.rstrip("/")
except Exception as e:
raise ValueError(f"Invalid Portainer URL: {e}")
@field_validator("log_level")
@classmethod
def validate_log_level(cls, v):
"""Validate logging level."""
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if v.upper() not in valid_levels:
raise ValueError(f"Log level must be one of {valid_levels}")
return v.upper()
@field_validator("log_format")
@classmethod
def validate_log_format(cls, v):
"""Validate logging format."""
valid_formats = ["json", "text"]
if v.lower() not in valid_formats:
raise ValueError(f"Log format must be one of {valid_formats}")
return v.lower()
def validate_auth_config(self) -> None:
"""Validate authentication configuration."""
has_api_key = self.portainer_api_key is not None
has_credentials = (
self.portainer_username is not None
and self.portainer_password is not None
)
if not has_api_key and not has_credentials:
raise ValueError(
"Either PORTAINER_API_KEY or both PORTAINER_USERNAME and "
"PORTAINER_PASSWORD must be provided"
)
if has_api_key and has_credentials:
# Prefer API key if both are provided
self.portainer_username = None
self.portainer_password = None
@property
def api_base_url(self) -> str:
"""Get the base API URL."""
return f"{self.portainer_url}/api"
@property
def use_api_key_auth(self) -> bool:
"""Check if API key authentication should be used."""
return self.portainer_api_key is not None
@property
def use_credentials_auth(self) -> bool:
"""Check if username/password authentication should be used."""
return (
self.portainer_username is not None
and self.portainer_password is not None
)
class ServerConfig(BaseModel):
"""Server-specific configuration settings."""
host: str = Field(
default="localhost",
description="Server host"
)
port: int = Field(
default=8000,
description="Server port"
)
server_name: str = Field(
default="portainer-core-mcp",
description="Server name for identification"
)
version: str = Field(
default="0.1.0",
description="Server version"
)
def get_config() -> PortainerConfig:
"""Get validated configuration instance."""
config = PortainerConfig()
config.validate_auth_config()
return config
def get_server_config() -> ServerConfig:
"""Get server configuration instance."""
return ServerConfig()
# Global configuration instances - lazy initialization
_config = None
_server_config = None
def get_global_config() -> PortainerConfig:
"""Get global configuration instance with lazy initialization."""
global _config
if _config is None:
_config = get_config()
return _config
def get_global_server_config() -> ServerConfig:
"""Get global server configuration instance with lazy initialization."""
global _server_config
if _server_config is None:
_server_config = get_server_config()
return _server_config

View File

@ -0,0 +1,5 @@
"""
Data models for Portainer Core MCP Server.
This module contains Pydantic models for request/response data structures.
"""

View File

@ -0,0 +1,50 @@
"""
Authentication data models for Portainer Core MCP Server.
This module defines Pydantic models for authentication-related requests and responses.
"""
from typing import Optional
from pydantic import BaseModel, Field
class LoginRequest(BaseModel):
"""Request model for user authentication."""
username: str = Field(..., description="Username for authentication")
password: str = Field(..., description="Password for authentication")
class LoginResponse(BaseModel):
"""Response model for successful authentication."""
jwt: str = Field(..., description="JWT authentication token")
class TokenRequest(BaseModel):
"""Request model for API token generation."""
description: str = Field(
default="API Token",
description="Description for the API token"
)
class TokenResponse(BaseModel):
"""Response model for API token generation."""
raw_token: str = Field(..., description="The generated API token")
description: str = Field(..., description="Token description")
class InitializeAdminRequest(BaseModel):
"""Request model for admin user initialization."""
username: str = Field(..., description="Admin username")
password: str = Field(..., description="Admin password")
class UserContextResponse(BaseModel):
"""Response model for current user context."""
id: int = Field(..., description="User ID")
username: str = Field(..., description="Username")
role: int = Field(..., description="User role")
portainer_authorizations: Optional[dict] = Field(
None,
description="User authorization settings"
)

View File

@ -0,0 +1,152 @@
"""
Settings models for Portainer Core MCP Server.
This module defines Pydantic models for settings-related data structures.
"""
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, Field, field_validator, ConfigDict
class SettingsBase(BaseModel):
"""Base settings model."""
pass
class UpdateSettingsRequest(BaseModel):
"""Settings update request model."""
settings: Dict[str, Any] = Field(..., description="Settings to update")
@field_validator("settings")
@classmethod
def validate_settings(cls, v):
if not isinstance(v, dict):
raise ValueError("Settings must be a dictionary")
return v
class SettingsResponse(BaseModel):
"""Settings response model."""
authentication_method: Optional[int] = Field(None, description="Authentication method")
enable_registry_management: Optional[bool] = Field(None, description="Registry management enabled")
edge_agent_checkin_interval: Optional[int] = Field(None, description="Edge Agent checkin interval")
snapshot_interval: Optional[int] = Field(None, description="Snapshot interval")
templates_url: Optional[str] = Field(None, description="Templates URL")
logo_url: Optional[str] = Field(None, description="Logo URL")
model_config = ConfigDict(populate_by_name=True)
class LogoUpdate(BaseModel):
"""Logo update model."""
logo: str = Field(..., description="Base64 encoded logo data")
@field_validator("logo")
@classmethod
def validate_logo(cls, v):
if not v or not v.strip():
raise ValueError("Logo data cannot be empty")
return v.strip()
class AuthenticationSettings(BaseModel):
"""Authentication settings model."""
method: int = Field(..., description="Authentication method (0=disabled, 1=enabled)")
@field_validator("method")
@classmethod
def validate_method(cls, v):
if v not in [0, 1]:
raise ValueError("Authentication method must be 0 (disabled) or 1 (enabled)")
return v
class RegistrySettings(BaseModel):
"""Registry management settings model."""
enabled: bool = Field(..., description="Whether registry management is enabled")
class EdgeAgentSettings(BaseModel):
"""Edge Agent settings model."""
checkin_interval: int = Field(..., description="Checkin interval in seconds")
@field_validator("checkin_interval")
@classmethod
def validate_interval(cls, v):
if v <= 0:
raise ValueError("Checkin interval must be positive")
return v
class SnapshotSettings(BaseModel):
"""Snapshot settings model."""
interval: int = Field(..., description="Snapshot interval in seconds")
@field_validator("interval")
@classmethod
def validate_interval(cls, v):
if v <= 0:
raise ValueError("Snapshot interval must be positive")
return v
class TemplateSettings(BaseModel):
"""Template settings model."""
url: str = Field(..., description="Template file URL")
@field_validator("url")
@classmethod
def validate_url(cls, v):
if not v or not v.strip():
raise ValueError("Template URL cannot be empty")
if not v.startswith(("http://", "https://")):
raise ValueError("Template URL must start with http:// or https://")
return v.strip()
class PortainerSettings(BaseModel):
"""Portainer settings model."""
authentication_method: Optional[int] = Field(None, description="Authentication method")
enable_registry_management: Optional[bool] = Field(None, description="Registry management enabled")
edge_agent_checkin_interval: Optional[int] = Field(None, description="Edge Agent checkin interval")
snapshot_interval: Optional[int] = Field(None, description="Snapshot interval")
templates_url: Optional[str] = Field(None, description="Templates URL")
logo_url: Optional[str] = Field(None, description="Logo URL")
model_config = ConfigDict(populate_by_name=True)
class SystemInfo(BaseModel):
"""System information model."""
version: str = Field(..., description="Portainer version")
database_version: str = Field(..., description="Database version")
file_service_version: str = Field(..., description="File service version")
model_config = ConfigDict(populate_by_name=True)
class SystemVersion(BaseModel):
"""System version model."""
server_version: str = Field(..., description="Server version")
database_version: str = Field(..., description="Database version")
build: str = Field(..., description="Build information")
model_config = ConfigDict(populate_by_name=True)
class PortainerStatus(BaseModel):
"""Portainer status model."""
version: str = Field(..., description="Portainer version")
instance_id: str = Field(..., description="Instance ID")
model_config = ConfigDict(populate_by_name=True)
class PublicSettings(BaseModel):
"""Public settings model (no authentication required)."""
logo_url: Optional[str] = Field(None, description="Logo URL")
external_templates_url: Optional[str] = Field(None, description="External templates URL")
authentication_method: Optional[int] = Field(None, description="Authentication method")
model_config = ConfigDict(populate_by_name=True)

View File

@ -0,0 +1,147 @@
"""
User models for Portainer Core MCP Server.
This module defines Pydantic models for user-related data structures.
"""
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field, field_validator, ConfigDict
class UserBase(BaseModel):
"""Base user model."""
username: str = Field(..., description="Username")
role: int = Field(..., description="User role (1=Admin, 2=User)")
@field_validator("username")
@classmethod
def validate_username(cls, v):
if not v or not v.strip():
raise ValueError("Username cannot be empty")
return v.strip()
@field_validator("role")
@classmethod
def validate_role(cls, v):
if v not in [1, 2]:
raise ValueError("Role must be 1 (Admin) or 2 (User)")
return v
class CreateUserRequest(UserBase):
"""User creation request model."""
password: str = Field(..., description="Password for the new user")
@field_validator("password")
@classmethod
def validate_password(cls, v):
if not v:
raise ValueError("Password cannot be empty")
if len(v) < 8:
raise ValueError("Password must be at least 8 characters long")
return v
class UpdateUserRequest(BaseModel):
"""User update request model."""
username: Optional[str] = Field(None, description="New username")
password: Optional[str] = Field(None, description="New password")
role: Optional[int] = Field(None, description="New role (1=Admin, 2=User)")
@field_validator("username")
@classmethod
def validate_username(cls, v):
if v is not None and (not v or not v.strip()):
raise ValueError("Username cannot be empty")
return v.strip() if v else v
@field_validator("password")
@classmethod
def validate_password(cls, v):
if v is not None and len(v) < 8:
raise ValueError("Password must be at least 8 characters long")
return v
@field_validator("role")
@classmethod
def validate_role(cls, v):
if v is not None and v not in [1, 2]:
raise ValueError("Role must be 1 (Admin) or 2 (User)")
return v
class UserResponse(UserBase):
"""User response model."""
id: int = Field(..., description="User ID", alias="Id")
username: str = Field(..., description="Username", alias="Username")
role: int = Field(..., description="User role", alias="Role")
created_at: Optional[datetime] = Field(None, description="User creation timestamp")
model_config = ConfigDict(populate_by_name=True)
class ChangePasswordRequest(BaseModel):
"""Password change request model."""
password: str = Field(..., description="New password")
@field_validator("password")
@classmethod
def validate_password(cls, v):
if not v:
raise ValueError("Password cannot be empty")
if len(v) < 8:
raise ValueError("Password must be at least 8 characters long")
return v
class UserList(BaseModel):
"""User list model."""
users: List[UserResponse] = Field(..., description="List of users")
total: int = Field(..., description="Total number of users")
class UserMembership(BaseModel):
"""User team membership model."""
id: int = Field(..., description="Membership ID", alias="Id")
user_id: int = Field(..., description="User ID", alias="UserId")
team_id: int = Field(..., description="Team ID", alias="TeamId")
role: int = Field(..., description="Role in team", alias="Role")
model_config = ConfigDict(populate_by_name=True)
class UserToken(BaseModel):
"""User API token model."""
id: int = Field(..., description="Token ID", alias="Id")
description: str = Field(..., description="Token description", alias="Description")
created_at: datetime = Field(..., description="Token creation timestamp", alias="CreationDate")
last_used: Optional[datetime] = Field(None, description="Last usage timestamp", alias="LastUsed")
model_config = ConfigDict(populate_by_name=True)
class UserTokenList(BaseModel):
"""User token list model."""
tokens: List[UserToken] = Field(..., description="List of user tokens")
total: int = Field(..., description="Total number of tokens")
class UserSearch(BaseModel):
"""User search model."""
query: str = Field(..., description="Search query")
@field_validator("query")
@classmethod
def validate_query(cls, v):
if not v or not v.strip():
raise ValueError("Search query cannot be empty")
return v.strip()
class UserSearchResult(BaseModel):
"""User search result model."""
users: List[UserResponse] = Field(..., description="Matching users")
query: str = Field(..., description="Search query used")
total: int = Field(..., description="Total number of matches")

View File

@ -0,0 +1,549 @@
"""
Main MCP server implementation for Portainer Core.
This module implements the MCP server that provides authentication and user
management functionality for Portainer Business Edition.
"""
import asyncio
from typing import Any, Dict, List, Optional, Sequence
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import (
Resource,
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
from .config import get_global_config, get_global_server_config
from .utils.logging import get_logger, LogContext, set_correlation_id
from .utils.errors import (
PortainerError,
PortainerConfigurationError,
PortainerNetworkError,
)
from .services.auth import AuthService
from .services.users import UserService
from .services.settings import SettingsService
logger = get_logger(__name__)
class PortainerCoreMCPServer:
"""Main MCP server class for Portainer Core functionality."""
def __init__(self):
config = get_global_config()
server_config = get_global_server_config()
self.server = Server(server_config.server_name)
self.setup_handlers()
# Service instances will be initialized later
self.auth_service = None
self.user_service = None
self.settings_service = None
logger.info(
"Portainer Core MCP Server initialized",
server_name=server_config.server_name,
version=server_config.version,
portainer_url=config.portainer_url
)
def setup_handlers(self) -> None:
"""Set up MCP server handlers."""
@self.server.list_resources()
async def handle_list_resources() -> List[Resource]:
"""List available resources."""
return [
Resource(
uri="portainer://users",
name="Users",
description="Portainer users and their details",
mimeType="application/json",
),
Resource(
uri="portainer://settings",
name="Settings",
description="Portainer instance settings",
mimeType="application/json",
),
Resource(
uri="portainer://health",
name="Health",
description="Server health status",
mimeType="application/json",
),
]
@self.server.read_resource()
async def handle_read_resource(uri: str) -> str:
"""Read a specific resource."""
with LogContext():
logger.info("Reading resource", uri=uri)
try:
if uri == "portainer://health":
return await self._get_health_status()
elif uri == "portainer://users":
return await self._get_users_resource()
elif uri == "portainer://settings":
return await self._get_settings_resource()
else:
raise PortainerError(f"Unknown resource: {uri}")
except Exception as e:
logger.error("Error reading resource", uri=uri, error=str(e))
raise
@self.server.list_tools()
async def handle_list_tools() -> List[Tool]:
"""List available tools."""
return [
Tool(
name="authenticate",
description="Authenticate with Portainer using username/password",
inputSchema={
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "Username for authentication"
},
"password": {
"type": "string",
"description": "Password for authentication"
}
},
"required": ["username", "password"]
}
),
Tool(
name="generate_token",
description="Generate API token for a user",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "integer",
"description": "User ID to generate token for"
},
"description": {
"type": "string",
"description": "Token description"
}
},
"required": ["user_id"]
}
),
Tool(
name="get_current_user",
description="Get current authenticated user information",
inputSchema={
"type": "object",
"properties": {},
"additionalProperties": False
}
),
Tool(
name="list_users",
description="List all users",
inputSchema={
"type": "object",
"properties": {},
"additionalProperties": False
}
),
Tool(
name="create_user",
description="Create a new user",
inputSchema={
"type": "object",
"properties": {
"username": {
"type": "string",
"description": "Username for the new user"
},
"password": {
"type": "string",
"description": "Password for the new user"
},
"role": {
"type": "integer",
"description": "Role ID for the user (1=Admin, 2=User)"
}
},
"required": ["username", "password", "role"]
}
),
Tool(
name="update_user",
description="Update user information",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "integer",
"description": "User ID to update"
},
"username": {
"type": "string",
"description": "New username"
},
"password": {
"type": "string",
"description": "New password"
},
"role": {
"type": "integer",
"description": "New role ID"
}
},
"required": ["user_id"]
}
),
Tool(
name="delete_user",
description="Delete a user",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "integer",
"description": "User ID to delete"
}
},
"required": ["user_id"]
}
),
Tool(
name="get_settings",
description="Get Portainer settings",
inputSchema={
"type": "object",
"properties": {},
"additionalProperties": False
}
),
Tool(
name="update_settings",
description="Update Portainer settings",
inputSchema={
"type": "object",
"properties": {
"settings": {
"type": "object",
"description": "Settings to update"
}
},
"required": ["settings"]
}
),
Tool(
name="health_check",
description="Check server health status",
inputSchema={
"type": "object",
"properties": {},
"additionalProperties": False
}
),
]
@self.server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]:
"""Handle tool calls."""
correlation_id = set_correlation_id()
with LogContext(correlation_id):
logger.info("Tool called", tool_name=name, arguments=arguments)
try:
if name == "health_check":
result = await self._handle_health_check()
elif name == "authenticate":
result = await self._handle_authenticate(arguments)
elif name == "generate_token":
result = await self._handle_generate_token(arguments)
elif name == "get_current_user":
result = await self._handle_get_current_user(arguments)
elif name == "list_users":
result = await self._handle_list_users(arguments)
elif name == "create_user":
result = await self._handle_create_user(arguments)
elif name == "update_user":
result = await self._handle_update_user(arguments)
elif name == "delete_user":
result = await self._handle_delete_user(arguments)
elif name == "get_settings":
result = await self._handle_get_settings(arguments)
elif name == "update_settings":
result = await self._handle_update_settings(arguments)
else:
raise PortainerError(f"Unknown tool: {name}")
logger.info("Tool executed successfully", tool_name=name)
return [TextContent(type="text", text=result)]
except Exception as e:
logger.error("Tool execution failed", tool_name=name, error=str(e))
error_message = f"Error executing {name}: {str(e)}"
return [TextContent(type="text", text=error_message)]
async def _get_health_status(self) -> str:
"""Get server health status."""
try:
config = get_global_config()
server_config = get_global_server_config()
# Initialize services if not already done
await self._ensure_services_initialized()
# Check service health
service_statuses = {}
if self.auth_service:
service_statuses["auth"] = "healthy" if await self.auth_service.health_check() else "unhealthy"
else:
service_statuses["auth"] = "not_initialized"
if self.user_service:
service_statuses["users"] = "healthy" if await self.user_service.health_check() else "unhealthy"
else:
service_statuses["users"] = "not_initialized"
if self.settings_service:
service_statuses["settings"] = "healthy" if await self.settings_service.health_check() else "unhealthy"
else:
service_statuses["settings"] = "not_initialized"
# Overall status
overall_status = "healthy" if all(s == "healthy" for s in service_statuses.values()) else "degraded"
status = {
"status": overall_status,
"server": server_config.server_name,
"version": server_config.version,
"portainer_url": config.portainer_url,
"services": service_statuses
}
return str(status)
except Exception as e:
logger.error("Health check failed", error=str(e))
return f"Health check failed: {str(e)}"
async def _ensure_services_initialized(self) -> None:
"""Ensure all services are initialized."""
if self.auth_service is None:
self.auth_service = AuthService()
await self.auth_service.initialize()
if self.user_service is None:
self.user_service = UserService()
await self.user_service.initialize()
if self.settings_service is None:
self.settings_service = SettingsService()
await self.settings_service.initialize()
async def _get_users_resource(self) -> str:
"""Get users resource."""
try:
await self._ensure_services_initialized()
users = await self.user_service.list_users()
return str(users)
except Exception as e:
logger.error("Failed to get users resource", error=str(e))
return f"Failed to get users: {str(e)}"
async def _get_settings_resource(self) -> str:
"""Get settings resource."""
try:
await self._ensure_services_initialized()
settings = await self.settings_service.get_settings()
return str(settings)
except Exception as e:
logger.error("Failed to get settings resource", error=str(e))
return f"Failed to get settings: {str(e)}"
async def _handle_health_check(self) -> str:
"""Handle health check tool call."""
return await self._get_health_status()
async def _handle_authenticate(self, arguments: Dict[str, Any]) -> str:
"""Handle authentication tool call."""
try:
await self._ensure_services_initialized()
username = arguments.get("username")
password = arguments.get("password")
result = await self.auth_service.login(username, password)
logger.info("Authentication successful", username=username)
return str(result)
except Exception as e:
logger.error("Authentication failed", error=str(e))
return f"Authentication failed: {str(e)}"
async def _handle_generate_token(self, arguments: Dict[str, Any]) -> str:
"""Handle token generation tool call."""
try:
await self._ensure_services_initialized()
user_id = arguments.get("user_id")
description = arguments.get("description", "MCP Server Token")
result = await self.auth_service.generate_api_token(user_id, description)
logger.info("Token generation successful", user_id=user_id)
return str(result)
except Exception as e:
logger.error("Token generation failed", error=str(e))
return f"Token generation failed: {str(e)}"
async def _handle_get_current_user(self, arguments: Dict[str, Any]) -> str:
"""Handle get current user tool call."""
try:
await self._ensure_services_initialized()
result = await self.auth_service.get_current_user()
return str(result)
except Exception as e:
logger.error("Get current user failed", error=str(e))
return f"Get current user failed: {str(e)}"
async def _handle_list_users(self, arguments: Dict[str, Any]) -> str:
"""Handle list users tool call."""
try:
await self._ensure_services_initialized()
result = await self.user_service.list_users()
return str(result)
except Exception as e:
logger.error("List users failed", error=str(e))
return f"List users failed: {str(e)}"
async def _handle_create_user(self, arguments: Dict[str, Any]) -> str:
"""Handle create user tool call."""
try:
await self._ensure_services_initialized()
username = arguments.get("username")
password = arguments.get("password")
role = arguments.get("role")
result = await self.user_service.create_user(username, password, role)
logger.info("User creation successful", username=username)
return str(result)
except Exception as e:
logger.error("User creation failed", error=str(e))
return f"User creation failed: {str(e)}"
async def _handle_update_user(self, arguments: Dict[str, Any]) -> str:
"""Handle update user tool call."""
try:
await self._ensure_services_initialized()
user_id = arguments.get("user_id")
username = arguments.get("username")
password = arguments.get("password")
role = arguments.get("role")
result = await self.user_service.update_user(user_id, username, password, role)
logger.info("User update successful", user_id=user_id)
return str(result)
except Exception as e:
logger.error("User update failed", error=str(e))
return f"User update failed: {str(e)}"
async def _handle_delete_user(self, arguments: Dict[str, Any]) -> str:
"""Handle delete user tool call."""
try:
await self._ensure_services_initialized()
user_id = arguments.get("user_id")
result = await self.user_service.delete_user(user_id)
logger.info("User deletion successful", user_id=user_id)
return f"User {user_id} deleted successfully"
except Exception as e:
logger.error("User deletion failed", error=str(e))
return f"User deletion failed: {str(e)}"
async def _handle_get_settings(self, arguments: Dict[str, Any]) -> str:
"""Handle get settings tool call."""
try:
await self._ensure_services_initialized()
result = await self.settings_service.get_settings()
return str(result)
except Exception as e:
logger.error("Get settings failed", error=str(e))
return f"Get settings failed: {str(e)}"
async def _handle_update_settings(self, arguments: Dict[str, Any]) -> str:
"""Handle update settings tool call."""
try:
await self._ensure_services_initialized()
settings = arguments.get("settings")
result = await self.settings_service.update_settings(settings)
logger.info("Settings update successful")
return str(result)
except Exception as e:
logger.error("Settings update failed", error=str(e))
return f"Settings update failed: {str(e)}"
async def run(self) -> None:
"""Run the MCP server."""
logger.info("Starting Portainer Core MCP Server")
try:
server_config = get_global_server_config()
# Initialize services
await self._ensure_services_initialized()
async with stdio_server() as (read_stream, write_stream):
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=server_config.server_name,
server_version=server_config.version,
)
)
except Exception as e:
logger.error("Server failed to start", error=str(e))
raise
finally:
# Clean up services
await self._cleanup_services()
async def _cleanup_services(self) -> None:
"""Clean up service resources."""
if self.auth_service:
await self.auth_service.cleanup()
if self.user_service:
await self.user_service.cleanup()
if self.settings_service:
await self.settings_service.cleanup()
def create_server() -> PortainerCoreMCPServer:
"""Create and return a configured MCP server instance."""
return PortainerCoreMCPServer()
async def main() -> None:
"""Main entry point for the MCP server."""
try:
server = create_server()
await server.run()
except KeyboardInterrupt:
logger.info("Server stopped by user")
except Exception as e:
logger.error("Server failed", error=str(e))
raise
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,5 @@
"""
Service layer for Portainer Core MCP Server.
This module contains service classes for interacting with the Portainer API.
"""

View File

@ -0,0 +1,230 @@
"""
Authentication service for Portainer Core MCP Server.
This module provides authentication functionality including login, token management,
and session handling for Portainer Business Edition.
"""
import asyncio
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from ..services.base import BaseService
from ..models.auth import LoginRequest, LoginResponse, TokenRequest, TokenResponse
from ..utils.errors import (
PortainerAuthenticationError,
PortainerValidationError,
PortainerTokenExpiredError,
)
from ..utils.tokens import decode_jwt_token, is_token_expired
from ..utils.logging import get_logger
class AuthService(BaseService):
"""Service for authentication operations."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = get_logger(__name__)
self._cached_token = None
self._token_expires_at = None
async def login(self, username: str, password: str) -> LoginResponse:
"""
Authenticate user with username and password.
Args:
username: User's username
password: User's password
Returns:
LoginResponse containing authentication token
Raises:
PortainerAuthenticationError: If authentication fails
"""
request_data = LoginRequest(username=username, password=password)
try:
response = await self._make_request(
method="POST",
endpoint="/auth",
json_data=request_data.model_dump(),
require_auth=False
)
# Store token for future use
self._cached_token = response.get("jwt")
if self._cached_token:
# Calculate expiration time (tokens typically last 8 hours)
self._token_expires_at = datetime.utcnow() + timedelta(hours=8)
return LoginResponse(**response)
except Exception as e:
self.logger.error("Authentication failed", username=username, error=str(e))
raise PortainerAuthenticationError(f"Authentication failed: {str(e)}")
async def logout(self) -> bool:
"""
Logout the current user session.
Returns:
True if logout successful
"""
try:
await self._make_request(
method="DELETE",
endpoint="/auth"
)
# Clear cached token
self._cached_token = None
self._token_expires_at = None
return True
except Exception as e:
self.logger.error("Logout failed", error=str(e))
return False
async def generate_api_token(self, user_id: int, description: str = "API Token") -> TokenResponse:
"""
Generate an API token for a user.
Args:
user_id: ID of the user
description: Token description
Returns:
TokenResponse containing the new API token
"""
request_data = TokenRequest(description=description)
try:
response = await self._make_request(
method="POST",
endpoint=f"/users/{user_id}/tokens",
json_data=request_data.model_dump()
)
return TokenResponse(**response)
except Exception as e:
self.logger.error("Token generation failed", user_id=user_id, error=str(e))
raise PortainerValidationError(f"Token generation failed: {str(e)}")
async def validate_token(self, token: str) -> bool:
"""
Validate an authentication token.
Args:
token: JWT token to validate
Returns:
True if token is valid
"""
try:
if is_token_expired(token):
return False
# Try to use the token for a simple API call
headers = {"Authorization": f"Bearer {token}"}
await self._make_request(
method="GET",
endpoint="/status",
headers=headers,
require_auth=False
)
return True
except Exception:
return False
async def refresh_token_if_needed(self) -> Optional[str]:
"""
Refresh the cached token if it's expired or about to expire.
Returns:
New token if refreshed, None if no refresh needed
"""
if not self._cached_token or not self._token_expires_at:
return None
# Check if token expires within 1 hour
expires_soon = datetime.utcnow() + timedelta(hours=1)
if self._token_expires_at > expires_soon:
return None
# Re-authenticate if using credentials
if self.config.use_credentials_auth:
try:
response = await self.login(
self.config.portainer_username,
self.config.portainer_password
)
return response.jwt
except Exception as e:
self.logger.error("Token refresh failed", error=str(e))
raise PortainerTokenExpiredError("Failed to refresh expired token")
return None
async def get_current_user(self) -> Dict[str, Any]:
"""
Get information about the currently authenticated user.
Returns:
User information dictionary
"""
try:
response = await self._make_request(
method="GET",
endpoint="/users/me"
)
return response
except Exception as e:
self.logger.error("Failed to get current user", error=str(e))
raise PortainerAuthenticationError(f"Failed to get current user: {str(e)}")
def get_cached_token(self) -> Optional[str]:
"""
Get the currently cached authentication token.
Returns:
Cached token if available and not expired
"""
if not self._cached_token or not self._token_expires_at:
return None
if datetime.utcnow() >= self._token_expires_at:
# Token expired, clear cache
self._cached_token = None
self._token_expires_at = None
return None
return self._cached_token
async def health_check(self) -> bool:
"""
Check if the authentication service is healthy.
Returns:
True if service is healthy
"""
try:
# Try to access the status endpoint
await self._make_request(
method="GET",
endpoint="/status",
require_auth=False
)
return True
except Exception as e:
self.logger.error("Auth service health check failed", error=str(e))
return False

View File

@ -0,0 +1,333 @@
"""
Base service class for Portainer API interactions.
This module provides a base service class with HTTP client, retry logic,
circuit breaker pattern, and error handling.
"""
import asyncio
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional, Union
import httpx
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log,
)
from ..config import get_global_config
from ..utils.errors import (
PortainerError,
PortainerNetworkError,
PortainerTimeoutError,
PortainerCircuitBreakerError,
map_http_error,
is_retryable_error,
should_refresh_token,
)
from ..utils.logging import get_logger
logger = get_logger(__name__)
class CircuitBreakerState(Enum):
"""Circuit breaker states."""
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker implementation for fault tolerance."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
name: str = "default"
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.name = name
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitBreakerState.CLOSED
def can_execute(self) -> bool:
"""Check if the circuit breaker allows execution."""
if self.state == CircuitBreakerState.CLOSED:
return True
if self.state == CircuitBreakerState.OPEN:
if self.last_failure_time and \
time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitBreakerState.HALF_OPEN
logger.info(
"Circuit breaker transitioning to half-open",
name=self.name,
failure_count=self.failure_count
)
return True
return False
# HALF_OPEN state
return True
def record_success(self) -> None:
"""Record a successful operation."""
self.failure_count = 0
self.last_failure_time = None
if self.state == CircuitBreakerState.HALF_OPEN:
self.state = CircuitBreakerState.CLOSED
logger.info(
"Circuit breaker closed after successful operation",
name=self.name
)
def record_failure(self) -> None:
"""Record a failed operation."""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitBreakerState.OPEN
logger.warning(
"Circuit breaker opened due to failures",
name=self.name,
failure_count=self.failure_count
)
class BaseService(ABC):
"""Base class for Portainer API services."""
def __init__(self, name: str):
self.name = name
self.client = None
self.config = get_global_config()
self.circuit_breaker = CircuitBreaker(
failure_threshold=self.config.circuit_breaker_failure_threshold,
recovery_timeout=self.config.circuit_breaker_recovery_timeout,
name=name
)
self.logger = get_logger(f"{__name__}.{name}")
self._auth_token = None
self._token_expiry = None
async def __aenter__(self):
"""Async context manager entry."""
await self.initialize()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.cleanup()
async def initialize(self) -> None:
"""Initialize the service and HTTP client."""
if self.client is None:
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(self.config.http_timeout),
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10),
verify=True, # Always verify SSL certificates
)
self.logger.info("HTTP client initialized", service=self.name)
async def cleanup(self) -> None:
"""Clean up resources."""
if self.client:
await self.client.aclose()
self.client = None
self.logger.info("HTTP client closed", service=self.name)
def _get_base_headers(self) -> Dict[str, str]:
"""Get base headers for API requests."""
headers = {
"Content-Type": "application/json",
"User-Agent": f"portainer-core-mcp/{self.config.portainer_url}",
}
if self._auth_token:
headers["Authorization"] = f"Bearer {self._auth_token}"
elif self.config.portainer_api_key:
headers["X-API-Key"] = self.config.portainer_api_key
return headers
def _build_url(self, endpoint: str) -> str:
"""Build full URL for API endpoint."""
base_url = self.config.api_base_url
if endpoint.startswith("/"):
endpoint = endpoint[1:]
return f"{base_url}/{endpoint}"
async def _execute_request(
self,
method: str,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
retry_on_auth_failure: bool = True,
) -> httpx.Response:
"""Execute HTTP request with circuit breaker and error handling."""
# Check circuit breaker
if not self.circuit_breaker.can_execute():
raise PortainerCircuitBreakerError(
f"Circuit breaker is open for {self.name} service"
)
# Ensure client is initialized
if self.client is None:
await self.initialize()
# Build request
url = self._build_url(endpoint)
request_headers = self._get_base_headers()
if headers:
request_headers.update(headers)
self.logger.debug(
"Making HTTP request",
method=method,
url=url,
params=params,
service=self.name
)
try:
response = await self.client.request(
method=method,
url=url,
json=data,
params=params,
headers=request_headers,
)
# Check for authentication errors
if response.status_code == 401 and retry_on_auth_failure:
self.logger.warning("Authentication failed, attempting token refresh")
if await self._refresh_token():
# Retry once with new token
return await self._execute_request(
method, endpoint, data, params, headers, retry_on_auth_failure=False
)
# Handle HTTP errors
if response.status_code >= 400:
error_message = f"HTTP {response.status_code}"
try:
error_data = response.json()
if isinstance(error_data, dict):
error_message = error_data.get("message", error_message)
except:
error_message = response.text or error_message
error = map_http_error(response.status_code, error_message)
self.circuit_breaker.record_failure()
raise error
# Record success
self.circuit_breaker.record_success()
self.logger.debug(
"HTTP request successful",
method=method,
url=url,
status_code=response.status_code,
service=self.name
)
return response
except httpx.TimeoutException as e:
self.circuit_breaker.record_failure()
raise PortainerTimeoutError(f"Request timeout: {str(e)}")
except httpx.NetworkError as e:
self.circuit_breaker.record_failure()
raise PortainerNetworkError(f"Network error: {str(e)}")
except PortainerError:
# Re-raise Portainer errors as-is
raise
except Exception as e:
self.circuit_breaker.record_failure()
raise PortainerError(f"Unexpected error: {str(e)}")
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type((
PortainerNetworkError,
PortainerTimeoutError,
httpx.TimeoutException,
httpx.NetworkError,
)),
before_sleep=before_sleep_log(logger, "WARNING"),
)
async def _make_request(
self,
method: str,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""Make HTTP request with retry logic."""
return await self._execute_request(method, endpoint, data, params, headers)
async def get(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""Make GET request."""
return await self._make_request("GET", endpoint, params=params, headers=headers)
async def post(
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""Make POST request."""
return await self._make_request("POST", endpoint, data=data, params=params, headers=headers)
async def put(
self,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""Make PUT request."""
return await self._make_request("PUT", endpoint, data=data, params=params, headers=headers)
async def delete(
self,
endpoint: str,
params: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, str]] = None,
) -> httpx.Response:
"""Make DELETE request."""
return await self._make_request("DELETE", endpoint, params=params, headers=headers)
async def _refresh_token(self) -> bool:
"""Refresh authentication token if needed."""
# TODO: Implement token refresh logic
# This will be implemented in the auth service
self.logger.warning("Token refresh not implemented yet")
return False
@abstractmethod
async def health_check(self) -> bool:
"""Check if the service is healthy."""
pass

View File

@ -0,0 +1,251 @@
"""
Settings management service for Portainer Core MCP Server.
This module provides settings management functionality for Portainer Business Edition
configuration and system settings.
"""
from typing import Dict, Any, Optional
from ..services.base import BaseService
from ..models.settings import SettingsResponse, UpdateSettingsRequest
from ..utils.errors import (
PortainerValidationError,
PortainerAuthorizationError,
)
from ..utils.logging import get_logger
class SettingsService(BaseService):
"""Service for settings management operations."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = get_logger(__name__)
async def get_settings(self) -> SettingsResponse:
"""
Get current Portainer settings.
Returns:
SettingsResponse object with current settings
"""
try:
response = await self._make_request(
method="GET",
endpoint="/settings"
)
return SettingsResponse(**response)
except Exception as e:
self.logger.error("Failed to get settings", error=str(e))
raise PortainerValidationError(f"Failed to get settings: {str(e)}")
async def get_public_settings(self) -> Dict[str, Any]:
"""
Get public settings (accessible without authentication).
Returns:
Dictionary of public settings
"""
try:
response = await self._make_request(
method="GET",
endpoint="/settings/public",
require_auth=False
)
return response
except Exception as e:
self.logger.error("Failed to get public settings", error=str(e))
raise PortainerValidationError(f"Failed to get public settings: {str(e)}")
async def update_settings(self, settings_data: UpdateSettingsRequest) -> SettingsResponse:
"""
Update Portainer settings.
Args:
settings_data: Settings update data
Returns:
SettingsResponse object with updated settings
"""
try:
response = await self._make_request(
method="PUT",
endpoint="/settings",
json_data=settings_data.model_dump(exclude_unset=True)
)
self.logger.info("Settings updated successfully")
return SettingsResponse(**response)
except Exception as e:
self.logger.error("Failed to update settings", error=str(e))
raise PortainerAuthorizationError(f"Failed to update settings: {str(e)}")
async def get_ldap_settings(self) -> Dict[str, Any]:
"""
Get LDAP authentication settings.
Returns:
Dictionary of LDAP settings
"""
try:
response = await self._make_request(
method="GET",
endpoint="/settings/authentication/ldap"
)
return response
except Exception as e:
self.logger.error("Failed to get LDAP settings", error=str(e))
raise PortainerValidationError(f"Failed to get LDAP settings: {str(e)}")
async def update_ldap_settings(self, ldap_data: Dict[str, Any]) -> bool:
"""
Update LDAP authentication settings.
Args:
ldap_data: LDAP configuration data
Returns:
True if update successful
"""
try:
await self._make_request(
method="PUT",
endpoint="/settings/authentication/ldap",
json_data=ldap_data
)
self.logger.info("LDAP settings updated successfully")
return True
except Exception as e:
self.logger.error("Failed to update LDAP settings", error=str(e))
raise PortainerAuthorizationError(f"Failed to update LDAP settings: {str(e)}")
async def test_ldap_connectivity(self) -> Dict[str, Any]:
"""
Test LDAP server connectivity.
Returns:
Dictionary with connectivity test results
"""
try:
response = await self._make_request(
method="POST",
endpoint="/settings/authentication/ldap/test"
)
return response
except Exception as e:
self.logger.error("LDAP connectivity test failed", error=str(e))
raise PortainerValidationError(f"LDAP connectivity test failed: {str(e)}")
async def get_edge_settings(self) -> Dict[str, Any]:
"""
Get Edge computing settings.
Returns:
Dictionary of Edge settings
"""
try:
response = await self._make_request(
method="GET",
endpoint="/settings/edge"
)
return response
except Exception as e:
self.logger.error("Failed to get Edge settings", error=str(e))
raise PortainerValidationError(f"Failed to get Edge settings: {str(e)}")
async def update_edge_settings(self, edge_data: Dict[str, Any]) -> bool:
"""
Update Edge computing settings.
Args:
edge_data: Edge configuration data
Returns:
True if update successful
"""
try:
await self._make_request(
method="PUT",
endpoint="/settings/edge",
json_data=edge_data
)
self.logger.info("Edge settings updated successfully")
return True
except Exception as e:
self.logger.error("Failed to update Edge settings", error=str(e))
raise PortainerAuthorizationError(f"Failed to update Edge settings: {str(e)}")
async def get_ssl_settings(self) -> Dict[str, Any]:
"""
Get SSL/TLS settings.
Returns:
Dictionary of SSL settings
"""
try:
response = await self._make_request(
method="GET",
endpoint="/settings/ssl"
)
return response
except Exception as e:
self.logger.error("Failed to get SSL settings", error=str(e))
raise PortainerValidationError(f"Failed to get SSL settings: {str(e)}")
async def update_ssl_settings(self, ssl_data: Dict[str, Any]) -> bool:
"""
Update SSL/TLS settings.
Args:
ssl_data: SSL configuration data
Returns:
True if update successful
"""
try:
await self._make_request(
method="PUT",
endpoint="/settings/ssl",
json_data=ssl_data
)
self.logger.info("SSL settings updated successfully")
return True
except Exception as e:
self.logger.error("Failed to update SSL settings", error=str(e))
raise PortainerAuthorizationError(f"Failed to update SSL settings: {str(e)}")
async def health_check(self) -> bool:
"""
Check if the settings service is healthy.
Returns:
True if service is healthy
"""
try:
# Try to get public settings (no auth required)
await self.get_public_settings()
return True
except Exception as e:
self.logger.error("Settings service health check failed", error=str(e))
return False

View File

@ -0,0 +1,270 @@
"""
User management service for Portainer Core MCP Server.
This module provides user management functionality including creating, updating,
and managing users in Portainer Business Edition.
"""
from typing import Dict, List, Any, Optional
from ..services.base import BaseService
from ..models.users import (
CreateUserRequest,
UpdateUserRequest,
UserResponse,
ChangePasswordRequest
)
from ..utils.errors import (
PortainerValidationError,
PortainerNotFoundError,
PortainerAuthorizationError,
)
from ..utils.logging import get_logger
class UserService(BaseService):
"""Service for user management operations."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = get_logger(__name__)
async def list_users(self) -> List[UserResponse]:
"""
List all users in the Portainer instance.
Returns:
List of UserResponse objects
"""
try:
response = await self._make_request(
method="GET",
endpoint="/users"
)
return [UserResponse(**user) for user in response]
except Exception as e:
self.logger.error("Failed to list users", error=str(e))
raise PortainerValidationError(f"Failed to list users: {str(e)}")
async def get_user(self, user_id: int) -> UserResponse:
"""
Get details of a specific user.
Args:
user_id: ID of the user to retrieve
Returns:
UserResponse object
Raises:
PortainerNotFoundError: If user doesn't exist
"""
try:
response = await self._make_request(
method="GET",
endpoint=f"/users/{user_id}"
)
return UserResponse(**response)
except Exception as e:
self.logger.error("Failed to get user", user_id=user_id, error=str(e))
if "404" in str(e):
raise PortainerNotFoundError(f"User {user_id} not found")
raise PortainerValidationError(f"Failed to get user: {str(e)}")
async def create_user(self, user_data: CreateUserRequest) -> UserResponse:
"""
Create a new user.
Args:
user_data: User creation data
Returns:
UserResponse object for the created user
"""
try:
response = await self._make_request(
method="POST",
endpoint="/users",
json_data=user_data.model_dump()
)
self.logger.info("User created successfully", username=user_data.username)
return UserResponse(**response)
except Exception as e:
self.logger.error("Failed to create user", username=user_data.username, error=str(e))
raise PortainerValidationError(f"Failed to create user: {str(e)}")
async def update_user(self, user_id: int, user_data: UpdateUserRequest) -> UserResponse:
"""
Update an existing user.
Args:
user_id: ID of the user to update
user_data: User update data
Returns:
UserResponse object for the updated user
"""
try:
response = await self._make_request(
method="PUT",
endpoint=f"/users/{user_id}",
json_data=user_data.model_dump(exclude_unset=True)
)
self.logger.info("User updated successfully", user_id=user_id)
return UserResponse(**response)
except Exception as e:
self.logger.error("Failed to update user", user_id=user_id, error=str(e))
if "404" in str(e):
raise PortainerNotFoundError(f"User {user_id} not found")
raise PortainerValidationError(f"Failed to update user: {str(e)}")
async def delete_user(self, user_id: int) -> bool:
"""
Delete a user.
Args:
user_id: ID of the user to delete
Returns:
True if deletion successful
"""
try:
await self._make_request(
method="DELETE",
endpoint=f"/users/{user_id}"
)
self.logger.info("User deleted successfully", user_id=user_id)
return True
except Exception as e:
self.logger.error("Failed to delete user", user_id=user_id, error=str(e))
if "404" in str(e):
raise PortainerNotFoundError(f"User {user_id} not found")
raise PortainerValidationError(f"Failed to delete user: {str(e)}")
async def change_password(self, user_id: int, password_data: ChangePasswordRequest) -> bool:
"""
Change a user's password.
Args:
user_id: ID of the user
password_data: Password change data
Returns:
True if password change successful
"""
try:
await self._make_request(
method="PUT",
endpoint=f"/users/{user_id}/passwd",
json_data=password_data.model_dump()
)
self.logger.info("Password changed successfully", user_id=user_id)
return True
except Exception as e:
self.logger.error("Failed to change password", user_id=user_id, error=str(e))
if "404" in str(e):
raise PortainerNotFoundError(f"User {user_id} not found")
raise PortainerAuthorizationError(f"Failed to change password: {str(e)}")
async def get_user_memberships(self, user_id: int) -> List[Dict[str, Any]]:
"""
Get team memberships for a user.
Args:
user_id: ID of the user
Returns:
List of team membership data
"""
try:
response = await self._make_request(
method="GET",
endpoint=f"/users/{user_id}/memberships"
)
return response
except Exception as e:
self.logger.error("Failed to get user memberships", user_id=user_id, error=str(e))
if "404" in str(e):
raise PortainerNotFoundError(f"User {user_id} not found")
raise PortainerValidationError(f"Failed to get user memberships: {str(e)}")
async def check_admin_user_exists(self) -> bool:
"""
Check if an admin user exists in the system.
Returns:
True if admin user exists
"""
try:
response = await self._make_request(
method="GET",
endpoint="/users/admin/check",
require_auth=False
)
return response.get("exists", False)
except Exception as e:
self.logger.error("Failed to check admin user", error=str(e))
return False
async def initialize_admin_user(self, username: str, password: str) -> UserResponse:
"""
Initialize the admin user (first-time setup).
Args:
username: Admin username
password: Admin password
Returns:
UserResponse for the created admin user
"""
try:
data = {
"Username": username,
"Password": password
}
response = await self._make_request(
method="POST",
endpoint="/users/admin/init",
json_data=data,
require_auth=False
)
self.logger.info("Admin user initialized", username=username)
return UserResponse(**response)
except Exception as e:
self.logger.error("Failed to initialize admin user", username=username, error=str(e))
raise PortainerValidationError(f"Failed to initialize admin user: {str(e)}")
async def health_check(self) -> bool:
"""
Check if the user service is healthy.
Returns:
True if service is healthy
"""
try:
# Try to list users (requires authentication)
await self.list_users()
return True
except Exception as e:
self.logger.error("User service health check failed", error=str(e))
return False

View File

@ -0,0 +1,5 @@
"""
Utility modules for Portainer Core MCP Server.
This module contains utility functions and helper classes.
"""

View File

@ -0,0 +1,174 @@
"""
Error handling utilities for Portainer Core MCP Server.
This module provides custom exceptions and error handling utilities.
"""
from typing import Any, Dict, Optional
class PortainerError(Exception):
"""Base exception for Portainer-related errors."""
def __init__(
self,
message: str,
status_code: Optional[int] = None,
details: Optional[Dict[str, Any]] = None
):
super().__init__(message)
self.message = message
self.status_code = status_code
self.details = details or {}
def __str__(self) -> str:
if self.status_code:
return f"[{self.status_code}] {self.message}"
return self.message
class PortainerAuthenticationError(PortainerError):
"""Authentication-related errors."""
def __init__(self, message: str = "Authentication failed", **kwargs):
super().__init__(message, status_code=401, **kwargs)
class PortainerAuthorizationError(PortainerError):
"""Authorization-related errors."""
def __init__(self, message: str = "Insufficient permissions", **kwargs):
super().__init__(message, status_code=403, **kwargs)
class PortainerNotFoundError(PortainerError):
"""Resource not found errors."""
def __init__(self, message: str = "Resource not found", **kwargs):
super().__init__(message, status_code=404, **kwargs)
class PortainerValidationError(PortainerError):
"""Request validation errors."""
def __init__(self, message: str = "Invalid request data", **kwargs):
super().__init__(message, status_code=400, **kwargs)
class PortainerConflictError(PortainerError):
"""Resource conflict errors."""
def __init__(self, message: str = "Resource conflict", **kwargs):
super().__init__(message, status_code=409, **kwargs)
class PortainerServerError(PortainerError):
"""Server-side errors."""
def __init__(self, message: str = "Internal server error", **kwargs):
super().__init__(message, status_code=500, **kwargs)
class PortainerNetworkError(PortainerError):
"""Network-related errors."""
def __init__(self, message: str = "Network error", **kwargs):
super().__init__(message, **kwargs)
class PortainerTimeoutError(PortainerError):
"""Request timeout errors."""
def __init__(self, message: str = "Request timeout", **kwargs):
super().__init__(message, **kwargs)
class PortainerRateLimitError(PortainerError):
"""Rate limiting errors."""
def __init__(self, message: str = "Rate limit exceeded", **kwargs):
super().__init__(message, status_code=429, **kwargs)
class PortainerCircuitBreakerError(PortainerError):
"""Circuit breaker errors."""
def __init__(self, message: str = "Circuit breaker is open", **kwargs):
super().__init__(message, **kwargs)
class PortainerTokenExpiredError(PortainerAuthenticationError):
"""Token expiration errors."""
def __init__(self, message: str = "Token has expired", **kwargs):
super().__init__(message, **kwargs)
class PortainerConfigurationError(PortainerError):
"""Configuration-related errors."""
def __init__(self, message: str = "Configuration error", **kwargs):
super().__init__(message, **kwargs)
def map_http_error(status_code: int, message: str, details: Optional[Dict[str, Any]] = None) -> PortainerError:
"""Map HTTP status codes to appropriate exception types."""
error_map = {
400: PortainerValidationError,
401: PortainerAuthenticationError,
403: PortainerAuthorizationError,
404: PortainerNotFoundError,
409: PortainerConflictError,
429: PortainerRateLimitError,
500: PortainerServerError,
502: PortainerServerError,
503: PortainerServerError,
504: PortainerTimeoutError,
}
error_class = error_map.get(status_code, PortainerError)
# For known error classes, pass only message and details to avoid status_code conflicts
if error_class in error_map.values():
return error_class(message, details=details)
else:
# For generic PortainerError, include status_code
return error_class(message, status_code=status_code, details=details)
def is_retryable_error(error: Exception) -> bool:
"""Check if an error is retryable."""
if isinstance(error, PortainerError):
# Don't retry authentication, authorization, validation, or not found errors
if isinstance(error, (
PortainerAuthenticationError,
PortainerAuthorizationError,
PortainerValidationError,
PortainerNotFoundError,
PortainerConflictError
)):
return False
# Don't retry client errors (4xx) except for rate limiting
if error.status_code and 400 <= error.status_code < 500:
return isinstance(error, PortainerRateLimitError)
# Retry server errors (5xx) and network errors
return True
# Retry network-related exceptions
if isinstance(error, (ConnectionError, TimeoutError)):
return True
return False
def should_refresh_token(error: Exception) -> bool:
"""Check if an error indicates token refresh is needed."""
if isinstance(error, PortainerAuthenticationError):
return True
if isinstance(error, PortainerError) and error.status_code == 401:
return True
return False

View File

@ -0,0 +1,164 @@
"""
Logging utilities for Portainer Core MCP Server.
This module provides structured logging configuration and utilities.
"""
import logging
import sys
import uuid
from contextvars import ContextVar
from typing import Any, Dict, Optional
import structlog
from structlog.typing import EventDict
from ..config import get_global_config
# Context variable for correlation IDs
correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
def add_correlation_id(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
"""Add correlation ID to log entries."""
correlation_id = correlation_id_var.get()
if correlation_id:
event_dict["correlation_id"] = correlation_id
return event_dict
def add_service_info(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
"""Add service information to log entries."""
event_dict["service"] = "portainer-core-mcp"
event_dict["version"] = "0.1.0"
return event_dict
def mask_sensitive_data(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
"""Mask sensitive data in log entries."""
sensitive_keys = {
"password", "token", "api_key", "secret", "auth", "authorization",
"x-api-key", "bearer", "jwt", "credential", "credentials"
}
def mask_dict(data: Dict[str, Any]) -> Dict[str, Any]:
"""Recursively mask sensitive data in dictionaries."""
masked = {}
for key, value in data.items():
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_keys):
masked[key] = "***MASKED***"
elif isinstance(value, dict):
masked[key] = mask_dict(value)
elif isinstance(value, list):
masked[key] = [
mask_dict(item) if isinstance(item, dict) else item
for item in value
]
else:
masked[key] = value
return masked
# Mask sensitive data in the event dict
for key, value in event_dict.items():
if isinstance(value, dict):
event_dict[key] = mask_dict(value)
elif isinstance(value, str):
key_lower = key.lower()
if any(sensitive in key_lower for sensitive in sensitive_keys):
event_dict[key] = "***MASKED***"
return event_dict
def configure_logging() -> None:
"""Configure structured logging."""
try:
config = get_global_config()
log_level = config.log_level
log_format = config.log_format
except Exception:
# Fallback to defaults if config is not available
log_level = "INFO"
log_format = "json"
processors = [
structlog.contextvars.merge_contextvars,
add_correlation_id,
add_service_info,
mask_sensitive_data,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
]
if log_format == "json":
processors.append(structlog.processors.JSONRenderer())
else:
processors.append(structlog.dev.ConsoleRenderer())
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level)
),
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Configure standard library logging
logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=getattr(logging, log_level),
)
def get_logger(name: Optional[str] = None) -> structlog.BoundLogger:
"""Get a configured logger instance."""
return structlog.get_logger(name)
def set_correlation_id(correlation_id: Optional[str] = None) -> str:
"""Set correlation ID for request tracing."""
if correlation_id is None:
correlation_id = str(uuid.uuid4())
correlation_id_var.set(correlation_id)
return correlation_id
def get_correlation_id() -> str:
"""Get current correlation ID."""
return correlation_id_var.get()
def clear_correlation_id() -> None:
"""Clear correlation ID."""
correlation_id_var.set("")
class LogContext:
"""Context manager for setting correlation ID."""
def __init__(self, correlation_id: Optional[str] = None):
self.correlation_id = correlation_id
self.previous_id = None
def __enter__(self) -> str:
self.previous_id = get_correlation_id()
return set_correlation_id(self.correlation_id)
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
if self.previous_id:
correlation_id_var.set(self.previous_id)
else:
clear_correlation_id()
# Initialize logging configuration
configure_logging()
# Create module logger
logger = get_logger(__name__)

View File

@ -0,0 +1,231 @@
"""
JWT token utilities for Portainer Core MCP Server.
This module provides utilities for handling JWT tokens including validation,
expiration checking, and token management.
"""
import json
import base64
from datetime import datetime, timezone
from typing import Dict, Any, Optional
from ..utils.logging import get_logger
logger = get_logger(__name__)
def decode_jwt_token(token: str) -> Optional[Dict[str, Any]]:
"""
Decode a JWT token without verification.
Note: This function only decodes the token payload for inspection.
It does not verify the token signature.
Args:
token: JWT token to decode
Returns:
Decoded token payload or None if invalid
"""
try:
# Remove Bearer prefix if present
if token.startswith("Bearer "):
token = token[7:]
# JWT tokens have 3 parts separated by dots
parts = token.split(".")
if len(parts) != 3:
logger.warning("Invalid JWT format: token does not have 3 parts")
return None
# Decode the payload (second part)
payload = parts[1]
# Add padding if needed (JWT base64 encoding may not include padding)
missing_padding = len(payload) % 4
if missing_padding:
payload += "=" * (4 - missing_padding)
# Decode from base64
decoded_bytes = base64.urlsafe_b64decode(payload)
decoded_payload = json.loads(decoded_bytes.decode("utf-8"))
return decoded_payload
except Exception as e:
logger.error("Failed to decode JWT token", error=str(e))
return None
def is_token_expired(token: str) -> bool:
"""
Check if a JWT token is expired.
Args:
token: JWT token to check
Returns:
True if token is expired or invalid
"""
try:
payload = decode_jwt_token(token)
if not payload:
return True
# Check expiration time (exp claim)
exp = payload.get("exp")
if not exp:
# No expiration claim, assume token is valid
logger.warning("JWT token has no expiration claim")
return False
# Convert exp (timestamp) to datetime
exp_datetime = datetime.fromtimestamp(exp, tz=timezone.utc)
current_datetime = datetime.now(tz=timezone.utc)
is_expired = current_datetime >= exp_datetime
if is_expired:
logger.info("JWT token is expired",
exp=exp_datetime.isoformat(),
now=current_datetime.isoformat())
return is_expired
except Exception as e:
logger.error("Failed to check token expiration", error=str(e))
return True
def get_token_claims(token: str) -> Dict[str, Any]:
"""
Get all claims from a JWT token.
Args:
token: JWT token to parse
Returns:
Dictionary of token claims
"""
payload = decode_jwt_token(token)
return payload or {}
def get_token_expiration(token: str) -> Optional[datetime]:
"""
Get the expiration time of a JWT token.
Args:
token: JWT token to check
Returns:
Expiration datetime or None if not available
"""
try:
payload = decode_jwt_token(token)
if not payload:
return None
exp = payload.get("exp")
if not exp:
return None
return datetime.fromtimestamp(exp, tz=timezone.utc)
except Exception as e:
logger.error("Failed to get token expiration", error=str(e))
return None
def get_token_subject(token: str) -> Optional[str]:
"""
Get the subject (user ID) from a JWT token.
Args:
token: JWT token to parse
Returns:
Subject claim value or None
"""
payload = decode_jwt_token(token)
if payload:
return payload.get("sub")
return None
def get_token_issuer(token: str) -> Optional[str]:
"""
Get the issuer from a JWT token.
Args:
token: JWT token to parse
Returns:
Issuer claim value or None
"""
payload = decode_jwt_token(token)
if payload:
return payload.get("iss")
return None
def is_token_valid_for_duration(token: str, min_duration_seconds: int = 300) -> bool:
"""
Check if a token is valid for at least the specified duration.
Args:
token: JWT token to check
min_duration_seconds: Minimum remaining validity in seconds (default: 5 minutes)
Returns:
True if token is valid for at least the specified duration
"""
try:
exp_time = get_token_expiration(token)
if not exp_time:
# No expiration time, assume it's valid
return True
current_time = datetime.now(tz=timezone.utc)
remaining_seconds = (exp_time - current_time).total_seconds()
return remaining_seconds >= min_duration_seconds
except Exception as e:
logger.error("Failed to check token validity duration", error=str(e))
return False
def format_token_info(token: str) -> str:
"""
Format token information for logging/debugging.
Args:
token: JWT token to format
Returns:
Formatted string with token information
"""
try:
payload = decode_jwt_token(token)
if not payload:
return "Invalid token"
exp_time = get_token_expiration(token)
subject = get_token_subject(token)
issuer = get_token_issuer(token)
info_parts = []
if subject:
info_parts.append(f"sub: {subject}")
if issuer:
info_parts.append(f"iss: {issuer}")
if exp_time:
info_parts.append(f"exp: {exp_time.isoformat()}")
return " | ".join(info_parts) if info_parts else "No standard claims found"
except Exception as e:
return f"Error formatting token info: {str(e)}"

View File

@ -0,0 +1,175 @@
Metadata-Version: 2.4
Name: portainer-core-mcp
Version: 0.1.0
Summary: Portainer Core MCP Server - Authentication and User Management
Author-email: Your Name <your.email@example.com>
License: MIT
Project-URL: Homepage, https://github.com/yourusername/portainer-core-mcp
Project-URL: Documentation, https://github.com/yourusername/portainer-core-mcp#readme
Project-URL: Repository, https://github.com/yourusername/portainer-core-mcp
Project-URL: Issues, https://github.com/yourusername/portainer-core-mcp/issues
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Requires-Python: >=3.8
Description-Content-Type: text/markdown
Requires-Dist: mcp>=1.0.0
Requires-Dist: httpx>=0.25.0
Requires-Dist: pydantic>=2.0.0
Requires-Dist: pydantic-settings>=2.0.0
Requires-Dist: structlog>=23.0.0
Requires-Dist: PyJWT>=2.8.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: tenacity>=8.0.0
Provides-Extra: dev
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev"
Requires-Dist: pytest-cov>=4.0.0; extra == "dev"
Requires-Dist: pytest-mock>=3.10.0; extra == "dev"
Requires-Dist: httpx-mock>=0.10.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: isort>=5.12.0; extra == "dev"
Requires-Dist: flake8>=6.0.0; extra == "dev"
Requires-Dist: mypy>=1.0.0; extra == "dev"
Requires-Dist: pre-commit>=3.0.0; extra == "dev"
# Portainer Core MCP Server
A Model Context Protocol (MCP) server for Portainer Business Edition authentication and user management.
## Features
- **Authentication & Session Management**: JWT token handling and user authentication
- **User Management**: Create, read, update, and delete users
- **Settings Management**: Retrieve and update Portainer settings
- **Secure Token Handling**: Automatic token refresh and secure storage
- **Error Handling**: Comprehensive error handling with retry logic
- **Circuit Breaker**: Fault tolerance for external API calls
## Installation
```bash
pip install portainer-core-mcp
```
## Configuration
Set the following environment variables:
```bash
PORTAINER_URL=https://your-portainer-instance.com
PORTAINER_API_KEY=your-api-token # Optional, for API key authentication
PORTAINER_USERNAME=admin # For username/password authentication
PORTAINER_PASSWORD=your-password # For username/password authentication
```
## Usage
### As MCP Server
```bash
portainer-core-mcp
```
### Programmatic Usage
```python
from portainer_core.server import PortainerCoreMCPServer
server = PortainerCoreMCPServer()
# Use server instance
```
## Available MCP Tools
- `authenticate` - Login with username/password
- `generate_token` - Generate API token
- `get_current_user` - Get authenticated user info
- `list_users` - List all users
- `create_user` - Create new user
- `update_user` - Update user details
- `delete_user` - Delete user
- `get_settings` - Get Portainer settings
- `update_settings` - Update settings
## Development
### Setup
```bash
# Clone the repository
git clone https://github.com/yourusername/portainer-core-mcp.git
cd portainer-core-mcp
# Install development dependencies
pip install -e ".[dev]"
# Install pre-commit hooks
pre-commit install
```
### Testing
```bash
# Run all tests
pytest
# Run with coverage
pytest --cov=src/portainer_core --cov-report=html
# Run only unit tests
pytest -m unit
# Run only integration tests
pytest -m integration
```
### Code Quality
```bash
# Format code
black src tests
isort src tests
# Lint code
flake8 src tests
# Type checking
mypy src
```
## Architecture
The server follows a layered architecture:
- **MCP Server Layer**: Handles MCP protocol communication
- **Service Layer**: Abstracts Portainer API interactions
- **Models Layer**: Defines data structures and validation
- **Utils Layer**: Provides utility functions and helpers
## Security
- All API communications use HTTPS
- JWT tokens are handled securely and never logged
- Input validation on all parameters
- Rate limiting to prevent abuse
- Circuit breaker pattern for fault tolerance
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Ensure all tests pass
6. Submit a pull request
## License
MIT License - see LICENSE file for details.

View File

@ -0,0 +1,18 @@
README.md
pyproject.toml
src/portainer_core/__init__.py
src/portainer_core/config.py
src/portainer_core/server.py
src/portainer_core/models/__init__.py
src/portainer_core/services/__init__.py
src/portainer_core/services/base.py
src/portainer_core/utils/__init__.py
src/portainer_core/utils/errors.py
src/portainer_core/utils/logging.py
src/portainer_core_mcp.egg-info/PKG-INFO
src/portainer_core_mcp.egg-info/SOURCES.txt
src/portainer_core_mcp.egg-info/dependency_links.txt
src/portainer_core_mcp.egg-info/entry_points.txt
src/portainer_core_mcp.egg-info/requires.txt
src/portainer_core_mcp.egg-info/top_level.txt
tests/test_basic.py

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,2 @@
[console_scripts]
portainer-core-mcp = portainer_core.server:main

View File

@ -0,0 +1,20 @@
mcp>=1.0.0
httpx>=0.25.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
structlog>=23.0.0
PyJWT>=2.8.0
python-dotenv>=1.0.0
tenacity>=8.0.0
[dev]
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-cov>=4.0.0
pytest-mock>=3.10.0
httpx-mock>=0.10.0
black>=23.0.0
isort>=5.12.0
flake8>=6.0.0
mypy>=1.0.0
pre-commit>=3.0.0

View File

@ -0,0 +1 @@
portainer_core

184
tests/test_basic.py Normal file
View File

@ -0,0 +1,184 @@
"""
Basic tests for Portainer Core MCP Server.
This module contains basic tests to verify the setup and configuration.
"""
import pytest
import os
from unittest.mock import patch
from portainer_core.config import PortainerConfig, get_config
from portainer_core.utils.errors import PortainerError, PortainerAuthenticationError
from portainer_core.utils.logging import get_logger, set_correlation_id
class TestConfiguration:
"""Test configuration management."""
def test_config_validation_with_api_key(self):
"""Test configuration validation with API key."""
with patch.dict(os.environ, {
'PORTAINER_URL': 'https://test.example.com',
'PORTAINER_API_KEY': 'test-api-key'
}):
config = PortainerConfig()
config.validate_auth_config()
assert config.portainer_url == 'https://test.example.com'
assert config.portainer_api_key == 'test-api-key'
assert config.use_api_key_auth is True
assert config.use_credentials_auth is False
def test_config_validation_with_credentials(self):
"""Test configuration validation with username/password."""
with patch.dict(os.environ, {
'PORTAINER_URL': 'https://test.example.com',
'PORTAINER_USERNAME': 'admin',
'PORTAINER_PASSWORD': 'password'
}):
config = PortainerConfig()
config.validate_auth_config()
assert config.portainer_url == 'https://test.example.com'
assert config.portainer_username == 'admin'
assert config.portainer_password == 'password'
assert config.use_api_key_auth is False
assert config.use_credentials_auth is True
def test_config_validation_no_auth(self):
"""Test configuration validation without authentication."""
with patch.dict(os.environ, {
'PORTAINER_URL': 'https://test.example.com'
}, clear=True):
config = PortainerConfig()
with pytest.raises(ValueError, match="Either PORTAINER_API_KEY or both"):
config.validate_auth_config()
def test_invalid_url(self):
"""Test invalid URL validation."""
with patch.dict(os.environ, {
'PORTAINER_URL': 'invalid-url',
'PORTAINER_API_KEY': 'test-key'
}):
with pytest.raises(ValueError, match="Invalid URL format"):
PortainerConfig()
def test_url_trailing_slash_removal(self):
"""Test URL trailing slash removal."""
with patch.dict(os.environ, {
'PORTAINER_URL': 'https://test.example.com/',
'PORTAINER_API_KEY': 'test-key'
}):
config = PortainerConfig()
assert config.portainer_url == 'https://test.example.com'
def test_api_base_url(self):
"""Test API base URL construction."""
with patch.dict(os.environ, {
'PORTAINER_URL': 'https://test.example.com',
'PORTAINER_API_KEY': 'test-key'
}):
config = PortainerConfig()
assert config.api_base_url == 'https://test.example.com/api'
class TestErrors:
"""Test error handling utilities."""
def test_portainer_error_basic(self):
"""Test basic PortainerError."""
error = PortainerError("Test error")
assert str(error) == "Test error"
assert error.message == "Test error"
assert error.status_code is None
assert error.details == {}
def test_portainer_error_with_status_code(self):
"""Test PortainerError with status code."""
error = PortainerError("Test error", status_code=400)
assert str(error) == "[400] Test error"
assert error.status_code == 400
def test_portainer_authentication_error(self):
"""Test PortainerAuthenticationError."""
error = PortainerAuthenticationError()
assert error.status_code == 401
assert "Authentication failed" in str(error)
def test_error_mapping(self):
"""Test HTTP error mapping."""
from portainer_core.utils.errors import map_http_error
error = map_http_error(404, "Not found")
assert error.__class__.__name__ == "PortainerNotFoundError"
assert error.status_code == 404
error = map_http_error(500, "Server error")
assert error.__class__.__name__ == "PortainerServerError"
assert error.status_code == 500
class TestLogging:
"""Test logging utilities."""
def test_get_logger(self):
"""Test logger creation."""
logger = get_logger("test")
assert logger is not None
def test_correlation_id(self):
"""Test correlation ID functionality."""
correlation_id = set_correlation_id("test-id")
assert correlation_id == "test-id"
from portainer_core.utils.logging import get_correlation_id
assert get_correlation_id() == "test-id"
def test_correlation_id_auto_generation(self):
"""Test automatic correlation ID generation."""
correlation_id = set_correlation_id()
assert correlation_id is not None
assert len(correlation_id) > 0
class TestCircuitBreaker:
"""Test circuit breaker functionality."""
def test_circuit_breaker_initial_state(self):
"""Test circuit breaker initial state."""
from portainer_core.services.base import CircuitBreaker, CircuitBreakerState
cb = CircuitBreaker()
assert cb.state == CircuitBreakerState.CLOSED
assert cb.can_execute() is True
assert cb.failure_count == 0
def test_circuit_breaker_failure_threshold(self):
"""Test circuit breaker failure threshold."""
from portainer_core.services.base import CircuitBreaker, CircuitBreakerState
cb = CircuitBreaker(failure_threshold=2)
# First failure
cb.record_failure()
assert cb.state == CircuitBreakerState.CLOSED
assert cb.can_execute() is True
# Second failure - should open
cb.record_failure()
assert cb.state == CircuitBreakerState.OPEN
assert cb.can_execute() is False
def test_circuit_breaker_success_reset(self):
"""Test circuit breaker success reset."""
from portainer_core.services.base import CircuitBreaker, CircuitBreakerState
cb = CircuitBreaker()
cb.record_failure()
cb.record_success()
assert cb.failure_count == 0
assert cb.last_failure_time is None
if __name__ == "__main__":
pytest.main([__file__])