commit 84ca8aee99b2cb863434426cad74a97d9fa5537f Author: Adolfo Delorenzo Date: Fri Jul 18 07:33:27 2025 -0600 first commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ab1ea6f --- /dev/null +++ b/.env.example @@ -0,0 +1,36 @@ +# Portainer Core MCP Server Configuration + +# Portainer connection settings +PORTAINER_URL=https://your-portainer-instance.com + +# Authentication settings (choose one method) +# Method 1: API Key authentication (recommended) +PORTAINER_API_KEY=your-api-key-here + +# Method 2: Username/Password authentication +# PORTAINER_USERNAME=admin +# PORTAINER_PASSWORD=your-password-here + +# HTTP client settings +HTTP_TIMEOUT=30 +MAX_RETRIES=3 +RETRY_DELAY=1.0 + +# Circuit breaker settings +CIRCUIT_BREAKER_FAILURE_THRESHOLD=5 +CIRCUIT_BREAKER_RECOVERY_TIMEOUT=60 + +# Token management settings +TOKEN_CACHE_TTL=3600 +TOKEN_REFRESH_THRESHOLD=300 + +# Logging settings +LOG_LEVEL=INFO +LOG_FORMAT=json + +# Development settings +DEBUG=false + +# Server settings +SERVER_HOST=localhost +SERVER_PORT=8000 \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..d9efcc3 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,181 @@ +# Portainer Core MCP Server + +This MCP server provides authentication and core management functionality for Portainer Business Edition. + +## Server Purpose + +**Primary Functions:** +- User authentication and session management +- API token generation and validation +- User profile and settings management +- Basic system information retrieval + +**Part of Portainer MCP Suite:** +- `portainer-core` - Authentication and user management (this server) +- `portainer-teams` - Teams and RBAC management +- `portainer-environments` - Environment and endpoint management +- `portainer-docker` - Docker container operations +- `portainer-kubernetes` - Kubernetes cluster management +- `portainer-stacks` - Stack deployment and management +- `portainer-edge` - Edge computing and device management + +## Authentication Flow + +**Required for all operations:** +1. Initialize admin user (first time setup) +2. Authenticate to get JWT token +3. Use token in `X-API-Key` header for all requests + +**Token Management:** +- Tokens expire based on server configuration +- Generate new tokens before expiration +- Store tokens securely in environment variables + +## Base Configuration + +**Environment Variables:** +```bash +PORTAINER_URL=https://your-portainer-instance.com +PORTAINER_API_KEY=your-api-token +``` + +**API Base URLs:** +- Business Edition: `{PORTAINER_URL}/api` +- All endpoints require authentication except initial admin setup + +## Common Response Patterns + +**Success Response Structure:** +```json +{ + "Id": 1, + "Username": "admin", + "Role": 1, + "CreationDate": 1631852794 +} +``` + +**Error Response Structure:** +```json +{ + "message": "Invalid credentials", + "details": "Authentication failed" +} +``` + +## Core Endpoint Categories + +**Authentication Endpoints:** +- `POST /api/users/admin/init` - Initialize admin user +- `POST /api/auth` - Authenticate user +- `POST /api/users/{id}/tokens` - Generate API token +- `DELETE /api/auth` - Logout/invalidate session + +**User Management:** +- `GET /api/users` - List users +- `POST /api/users` - Create user +- `GET /api/users/{id}` - Get user details +- `PUT /api/users/{id}` - Update user +- `DELETE /api/users/{id}` - Delete user + +**Settings:** +- `GET /api/settings` - Get Portainer settings +- `PUT /api/settings` - Update settings +- `GET /api/settings/public` - Get public settings + +## Error Handling + +**Common HTTP Status Codes:** +- `200` - Success +- `201` - Created +- `400` - Bad Request (invalid parameters) +- `401` - Unauthorized (invalid/missing token) +- `403` - Forbidden (insufficient permissions) +- `404` - Not Found +- `409` - Conflict (resource already exists) +- `500` - Internal Server Error + +**Retry Logic:** +- Implement exponential backoff for 5xx errors +- Refresh token on 401 responses +- Maximum 3 retry attempts + +## Security Considerations + +**Best Practices:** +- Always use HTTPS in production +- Rotate API tokens regularly +- Implement proper token storage +- Log authentication events +- Rate limit API calls + +**RBAC Integration:** +- Check user permissions before operations +- Respect environment-level access controls +- Honor team-based restrictions + +## Development Workflow + +**Testing Authentication:** +```bash +# Test admin initialization +curl -X POST "${PORTAINER_URL}/api/users/admin/init" \ + -H "Content-Type: application/json" \ + -d '{"Username":"admin","Password":"yourpassword"}' + +# Test login +curl -X POST "${PORTAINER_URL}/api/auth" \ + -H "Content-Type: application/json" \ + -d '{"Username":"admin","Password":"yourpassword"}' +``` + +**Local Development:** +- Use Docker Compose with Portainer for testing +- Mock authentication for unit tests +- Validate all endpoints with proper error handling + +## Integration Notes + +**With Other MCP Servers:** +- Share authentication state across Portainer MCP servers +- Use consistent error handling patterns +- Maintain session context for user operations + +**Rate Limits:** +- Portainer has built-in rate limiting +- Implement client-side throttling +- Monitor for 429 responses + +## Troubleshooting + +**Common Issues:** +- **401 Unauthorized**: Check token validity and format +- **403 Forbidden**: Verify user permissions and RBAC settings +- **Connection Refused**: Confirm Portainer URL and network access +- **SSL Errors**: Validate certificate configuration + +**Debug Commands:** +```bash +# Verify Portainer connectivity +curl -I "${PORTAINER_URL}/api/status" + +# Check current user context +curl -H "X-API-Key: ${PORTAINER_API_KEY}" \ + "${PORTAINER_URL}/api/users/me" +``` + +**Logging:** +- Enable verbose logging for authentication flows +- Log all API calls with timestamps +- Mask sensitive data in logs (passwords, tokens) + +## Version Compatibility + +**Supported Versions:** +- Portainer Business Edition 2.30.x+ +- API Version: 2.31.3 + +**Breaking Changes:** +- Monitor Portainer release notes for API changes +- Test against new versions before upgrading +- Maintain backward compatibility where possible diff --git a/README.md b/README.md new file mode 100644 index 0000000..0565a42 --- /dev/null +++ b/README.md @@ -0,0 +1,134 @@ +# Portainer Core MCP Server + +A Model Context Protocol (MCP) server for Portainer Business Edition authentication and user management. + +## Features + +- **Authentication & Session Management**: JWT token handling and user authentication +- **User Management**: Create, read, update, and delete users +- **Settings Management**: Retrieve and update Portainer settings +- **Secure Token Handling**: Automatic token refresh and secure storage +- **Error Handling**: Comprehensive error handling with retry logic +- **Circuit Breaker**: Fault tolerance for external API calls + +## Installation + +```bash +pip install portainer-core-mcp +``` + +## Configuration + +Set the following environment variables: + +```bash +PORTAINER_URL=https://your-portainer-instance.com +PORTAINER_API_KEY=your-api-token # Optional, for API key authentication +PORTAINER_USERNAME=admin # For username/password authentication +PORTAINER_PASSWORD=your-password # For username/password authentication +``` + +## Usage + +### As MCP Server + +```bash +portainer-core-mcp +``` + +### Programmatic Usage + +```python +from portainer_core.server import PortainerCoreMCPServer + +server = PortainerCoreMCPServer() +# Use server instance +``` + +## Available MCP Tools + +- `authenticate` - Login with username/password +- `generate_token` - Generate API token +- `get_current_user` - Get authenticated user info +- `list_users` - List all users +- `create_user` - Create new user +- `update_user` - Update user details +- `delete_user` - Delete user +- `get_settings` - Get Portainer settings +- `update_settings` - Update settings + +## Development + +### Setup + +```bash +# Clone the repository +git clone https://github.com/yourusername/portainer-core-mcp.git +cd portainer-core-mcp + +# Install development dependencies +pip install -e ".[dev]" + +# Install pre-commit hooks +pre-commit install +``` + +### Testing + +```bash +# Run all tests +pytest + +# Run with coverage +pytest --cov=src/portainer_core --cov-report=html + +# Run only unit tests +pytest -m unit + +# Run only integration tests +pytest -m integration +``` + +### Code Quality + +```bash +# Format code +black src tests +isort src tests + +# Lint code +flake8 src tests + +# Type checking +mypy src +``` + +## Architecture + +The server follows a layered architecture: + +- **MCP Server Layer**: Handles MCP protocol communication +- **Service Layer**: Abstracts Portainer API interactions +- **Models Layer**: Defines data structures and validation +- **Utils Layer**: Provides utility functions and helpers + +## Security + +- All API communications use HTTPS +- JWT tokens are handled securely and never logged +- Input validation on all parameters +- Rate limiting to prevent abuse +- Circuit breaker pattern for fault tolerance + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests for new functionality +5. Ensure all tests pass +6. Submit a pull request + +## License + +MIT License - see LICENSE file for details. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9bfdbf0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,141 @@ +[build-system] +requires = ["setuptools>=64", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "portainer-core-mcp" +version = "0.1.0" +description = "Portainer Core MCP Server - Authentication and User Management" +authors = [ + {name = "Your Name", email = "your.email@example.com"} +] +readme = "README.md" +license = {text = "MIT"} +requires-python = ">=3.8" +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +dependencies = [ + "mcp>=1.0.0", + "httpx>=0.25.0", + "pydantic>=2.0.0", + "pydantic-settings>=2.0.0", + "structlog>=23.0.0", + "PyJWT>=2.8.0", + "python-dotenv>=1.0.0", + "tenacity>=8.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-asyncio>=0.21.0", + "pytest-cov>=4.0.0", + "pytest-mock>=3.10.0", + "httpx-mock>=0.10.0", + "black>=23.0.0", + "isort>=5.12.0", + "flake8>=6.0.0", + "mypy>=1.0.0", + "pre-commit>=3.0.0", +] + +[project.urls] +Homepage = "https://github.com/yourusername/portainer-core-mcp" +Documentation = "https://github.com/yourusername/portainer-core-mcp#readme" +Repository = "https://github.com/yourusername/portainer-core-mcp" +Issues = "https://github.com/yourusername/portainer-core-mcp/issues" + +[project.scripts] +portainer-core-mcp = "portainer_core.server:main" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.package-dir] +"" = "src" + +[tool.black] +line-length = 88 +target-version = ['py38'] +include = '\.pyi?$' +extend-exclude = ''' +/( + # directories + \.eggs + | \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | build + | dist +)/ +''' + +[tool.isort] +profile = "black" +multi_line_output = 3 +line_length = 88 +known_first_party = ["portainer_core"] + +[tool.mypy] +python_version = "3.8" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +disallow_incomplete_defs = true +check_untyped_defs = true +disallow_untyped_decorators = true +no_implicit_optional = true +warn_redundant_casts = true +warn_unused_ignores = true +warn_no_return = true +warn_unreachable = true +strict_equality = true + +[tool.pytest.ini_options] +minversion = "7.0" +addopts = "-ra -q --strict-markers --strict-config" +testpaths = [ + "tests", +] +python_files = [ + "test_*.py", + "*_test.py", +] +markers = [ + "slow: marks tests as slow (deselect with '-m \"not slow\"')", + "integration: marks tests as integration tests", + "unit: marks tests as unit tests", +] + +[tool.coverage.run] +source = ["src"] +omit = [ + "*/tests/*", + "*/test_*", + "*/conftest.py", +] + +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", + "def __repr__", + "if self.debug:", + "if settings.DEBUG", + "raise AssertionError", + "raise NotImplementedError", + "if 0:", + "if __name__ == .__main__.:", + "class .*\\bProtocol\\):", + "@(abc\\.)?abstractmethod", +] \ No newline at end of file diff --git a/run_server.py b/run_server.py new file mode 100644 index 0000000..5e45f06 --- /dev/null +++ b/run_server.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +Portainer Core MCP Server - Entry Point Script. + +This script serves as the main entry point for the Portainer Core MCP Server, +providing authentication and user management functionality for Portainer Business Edition. +It handles environment setup, configuration validation, and server startup. + +Usage: + python run_server.py + +Environment Variables: + PORTAINER_URL: The base URL of your Portainer instance (required) + PORTAINER_API_KEY: API key for authentication (option 1) + PORTAINER_USERNAME: Username for authentication (option 2) + PORTAINER_PASSWORD: Password for authentication (option 2) + +Example: + export PORTAINER_URL=https://portainer.example.com + export PORTAINER_API_KEY=your-api-key-here + python run_server.py + +Architecture: + - Environment setup and validation + - Configuration loading from environment variables + - Async server initialization and startup + - Graceful error handling and shutdown +""" + +import os +import sys +import asyncio +from portainer_core.server import main + + +def setup_environment(): + """ + Set up environment variables with fallback values for demo/testing. + + This function ensures the required environment variables are set for the server + to start properly. It provides fallback values for demo purposes when specific + configuration is not provided. + + Environment Variables Handled: + PORTAINER_URL: Sets default demo URL if not provided + PORTAINER_API_KEY: Sets placeholder if no authentication is configured + PORTAINER_USERNAME/PORTAINER_PASSWORD: Alternative authentication method + + Raises: + None: This function doesn't raise exceptions but prints warnings for + missing configuration. + + Side Effects: + - Modifies os.environ with default values + - Prints configuration messages to stdout + - Ensures minimum viable configuration for server startup + + Note: + In production environments, always provide proper authentication + credentials rather than relying on demo/placeholder values. + """ + # Configure Portainer URL with fallback to demo instance + if not os.environ.get('PORTAINER_URL'): + os.environ['PORTAINER_URL'] = 'https://demo.portainer.io' + print("šŸ”§ Using demo Portainer URL: https://demo.portainer.io") + + # Configure authentication - requires either API key or username/password + has_api_key = os.environ.get('PORTAINER_API_KEY') + has_credentials = (os.environ.get('PORTAINER_USERNAME') and + os.environ.get('PORTAINER_PASSWORD')) + + if not has_api_key and not has_credentials: + print("āš ļø No authentication configured. Set PORTAINER_API_KEY or PORTAINER_USERNAME/PORTAINER_PASSWORD") + print(" For demo purposes, using placeholder API key") + os.environ['PORTAINER_API_KEY'] = 'demo-api-key' + +if __name__ == "__main__": + """ + Main execution block for the Portainer Core MCP Server. + + This block handles the complete server lifecycle: + 1. Environment setup and configuration validation + 2. Async server initialization + 3. Graceful error handling and shutdown + 4. User-friendly status messages + + Exit Codes: + 0: Successful shutdown (user interrupt) + 1: Server failure or configuration error + + Flow: + 1. Print startup messages and configuration requirements + 2. Setup environment variables with fallbacks + 3. Start async server using asyncio.run() + 4. Handle interrupts and exceptions gracefully + """ + print("šŸš€ Starting Portainer Core MCP Server...") + print(" Configuration will be loaded from environment variables") + print(" Set PORTAINER_URL and PORTAINER_API_KEY (or username/password) before running") + print("") + + # Initialize environment with fallback values for demo/testing + setup_environment() + + try: + # Start the async MCP server - this blocks until shutdown + asyncio.run(main()) + except KeyboardInterrupt: + # Handle graceful shutdown on Ctrl+C + print("\nšŸ‘‹ Server stopped by user") + sys.exit(0) + except Exception as e: + # Handle any startup or runtime errors + print(f"\nāŒ Server failed: {e}") + sys.exit(1) \ No newline at end of file diff --git a/src/portainer_core/__init__.py b/src/portainer_core/__init__.py new file mode 100644 index 0000000..ef411f0 --- /dev/null +++ b/src/portainer_core/__init__.py @@ -0,0 +1,57 @@ +""" +Portainer Core MCP Server - Main Package + +This package provides Model Context Protocol (MCP) server functionality for Portainer +Business Edition, focusing on authentication and user management operations. + +The package implements a complete MCP server that integrates with Portainer's REST API +to provide secure, reliable access to user management, authentication, and settings +configuration through the MCP protocol. + +Key Features: + - Authentication service with JWT token management + - User management with full CRUD operations + - Settings management for Portainer configuration + - Comprehensive error handling and logging + - Circuit breaker pattern for fault tolerance + - Health monitoring and service status reporting + +Package Structure: + - server: Main MCP server implementation + - config: Configuration management with environment variable support + - models: Pydantic data models for API requests/responses + - services: Business logic services (auth, users, settings) + - utils: Utility modules (errors, logging, tokens) + +Usage: + from portainer_core.server import create_server + + # Create and run the MCP server + server = create_server() + await server.run() + +Dependencies: + - mcp: Model Context Protocol implementation + - pydantic: Data validation and serialization + - httpx: HTTP client for API requests + - structlog: Structured logging + +Version: 0.1.0 +License: MIT +Compatibility: Python 3.8+ +""" + +__version__ = "0.1.0" +__author__ = "Portainer MCP Team" +__email__ = "support@portainer.io" +__license__ = "MIT" +__description__ = "MCP server for Portainer Business Edition authentication and user management" + +# Package metadata for introspection +__all__ = [ + "__version__", + "__author__", + "__email__", + "__license__", + "__description__", +] \ No newline at end of file diff --git a/src/portainer_core/__pycache__/__init__.cpython-312.pyc b/src/portainer_core/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..05d0b4c Binary files /dev/null and b/src/portainer_core/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/portainer_core/__pycache__/config.cpython-312.pyc b/src/portainer_core/__pycache__/config.cpython-312.pyc new file mode 100644 index 0000000..de87e17 Binary files /dev/null and b/src/portainer_core/__pycache__/config.cpython-312.pyc differ diff --git a/src/portainer_core/__pycache__/server.cpython-312.pyc b/src/portainer_core/__pycache__/server.cpython-312.pyc new file mode 100644 index 0000000..7397189 Binary files /dev/null and b/src/portainer_core/__pycache__/server.cpython-312.pyc differ diff --git a/src/portainer_core/config.py b/src/portainer_core/config.py new file mode 100644 index 0000000..6346c48 --- /dev/null +++ b/src/portainer_core/config.py @@ -0,0 +1,229 @@ +""" +Configuration management for Portainer Core MCP Server. + +This module handles environment variable validation and configuration settings. +""" + +import os +from typing import Optional +from urllib.parse import urlparse + +from pydantic import BaseModel, Field, field_validator, ConfigDict +from pydantic_settings import BaseSettings + + +class PortainerConfig(BaseSettings): + """Configuration settings for Portainer Core MCP Server.""" + + # Portainer connection settings + portainer_url: str = Field( + ..., + description="Base URL of the Portainer instance" + ) + + # Authentication settings (either API key or username/password) + portainer_api_key: Optional[str] = Field( + default=None, + description="Portainer API key for authentication" + ) + + portainer_username: Optional[str] = Field( + default=None, + description="Portainer username for authentication" + ) + + portainer_password: Optional[str] = Field( + default=None, + description="Portainer password for authentication" + ) + + # HTTP client settings + http_timeout: int = Field( + default=30, + description="HTTP request timeout in seconds" + ) + + max_retries: int = Field( + default=3, + description="Maximum number of retry attempts" + ) + + retry_delay: float = Field( + default=1.0, + description="Base delay between retries in seconds" + ) + + # Circuit breaker settings + circuit_breaker_failure_threshold: int = Field( + default=5, + description="Number of failures before circuit breaker opens" + ) + + circuit_breaker_recovery_timeout: int = Field( + default=60, + description="Time in seconds before attempting recovery" + ) + + # Token management settings + token_cache_ttl: int = Field( + default=3600, + description="Token cache TTL in seconds" + ) + + token_refresh_threshold: int = Field( + default=300, + description="Refresh token when expiring within this many seconds" + ) + + # Logging settings + log_level: str = Field( + default="INFO", + description="Logging level" + ) + + log_format: str = Field( + default="json", + description="Logging format (json or text)" + ) + + # Development settings + debug: bool = Field( + default=False, + description="Enable debug mode" + ) + + model_config = ConfigDict( + env_file=".env", + env_file_encoding="utf-8", + case_sensitive=False + ) + + @field_validator("portainer_url") + @classmethod + def validate_portainer_url(cls, v): + """Validate Portainer URL format.""" + if not v: + raise ValueError("Portainer URL is required") + + try: + parsed = urlparse(v) + if not parsed.scheme or not parsed.netloc: + raise ValueError("Invalid URL format") + + if parsed.scheme not in ["http", "https"]: + raise ValueError("URL must use http or https scheme") + + # Remove trailing slash for consistency + return v.rstrip("/") + except Exception as e: + raise ValueError(f"Invalid Portainer URL: {e}") + + @field_validator("log_level") + @classmethod + def validate_log_level(cls, v): + """Validate logging level.""" + valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + if v.upper() not in valid_levels: + raise ValueError(f"Log level must be one of {valid_levels}") + return v.upper() + + @field_validator("log_format") + @classmethod + def validate_log_format(cls, v): + """Validate logging format.""" + valid_formats = ["json", "text"] + if v.lower() not in valid_formats: + raise ValueError(f"Log format must be one of {valid_formats}") + return v.lower() + + def validate_auth_config(self) -> None: + """Validate authentication configuration.""" + has_api_key = self.portainer_api_key is not None + has_credentials = ( + self.portainer_username is not None + and self.portainer_password is not None + ) + + if not has_api_key and not has_credentials: + raise ValueError( + "Either PORTAINER_API_KEY or both PORTAINER_USERNAME and " + "PORTAINER_PASSWORD must be provided" + ) + + if has_api_key and has_credentials: + # Prefer API key if both are provided + self.portainer_username = None + self.portainer_password = None + + @property + def api_base_url(self) -> str: + """Get the base API URL.""" + return f"{self.portainer_url}/api" + + @property + def use_api_key_auth(self) -> bool: + """Check if API key authentication should be used.""" + return self.portainer_api_key is not None + + @property + def use_credentials_auth(self) -> bool: + """Check if username/password authentication should be used.""" + return ( + self.portainer_username is not None + and self.portainer_password is not None + ) + + +class ServerConfig(BaseModel): + """Server-specific configuration settings.""" + + host: str = Field( + default="localhost", + description="Server host" + ) + + port: int = Field( + default=8000, + description="Server port" + ) + + server_name: str = Field( + default="portainer-core-mcp", + description="Server name for identification" + ) + + version: str = Field( + default="0.1.0", + description="Server version" + ) + + +def get_config() -> PortainerConfig: + """Get validated configuration instance.""" + config = PortainerConfig() + config.validate_auth_config() + return config + + +def get_server_config() -> ServerConfig: + """Get server configuration instance.""" + return ServerConfig() + + +# Global configuration instances - lazy initialization +_config = None +_server_config = None + +def get_global_config() -> PortainerConfig: + """Get global configuration instance with lazy initialization.""" + global _config + if _config is None: + _config = get_config() + return _config + +def get_global_server_config() -> ServerConfig: + """Get global server configuration instance with lazy initialization.""" + global _server_config + if _server_config is None: + _server_config = get_server_config() + return _server_config \ No newline at end of file diff --git a/src/portainer_core/models/__init__.py b/src/portainer_core/models/__init__.py new file mode 100644 index 0000000..fb6da54 --- /dev/null +++ b/src/portainer_core/models/__init__.py @@ -0,0 +1,5 @@ +""" +Data models for Portainer Core MCP Server. + +This module contains Pydantic models for request/response data structures. +""" \ No newline at end of file diff --git a/src/portainer_core/models/__pycache__/__init__.cpython-312.pyc b/src/portainer_core/models/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..4966039 Binary files /dev/null and b/src/portainer_core/models/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/portainer_core/models/__pycache__/auth.cpython-312.pyc b/src/portainer_core/models/__pycache__/auth.cpython-312.pyc new file mode 100644 index 0000000..314ccad Binary files /dev/null and b/src/portainer_core/models/__pycache__/auth.cpython-312.pyc differ diff --git a/src/portainer_core/models/__pycache__/settings.cpython-312.pyc b/src/portainer_core/models/__pycache__/settings.cpython-312.pyc new file mode 100644 index 0000000..47345d1 Binary files /dev/null and b/src/portainer_core/models/__pycache__/settings.cpython-312.pyc differ diff --git a/src/portainer_core/models/__pycache__/users.cpython-312.pyc b/src/portainer_core/models/__pycache__/users.cpython-312.pyc new file mode 100644 index 0000000..0734c66 Binary files /dev/null and b/src/portainer_core/models/__pycache__/users.cpython-312.pyc differ diff --git a/src/portainer_core/models/auth.py b/src/portainer_core/models/auth.py new file mode 100644 index 0000000..570c586 --- /dev/null +++ b/src/portainer_core/models/auth.py @@ -0,0 +1,50 @@ +""" +Authentication data models for Portainer Core MCP Server. + +This module defines Pydantic models for authentication-related requests and responses. +""" + +from typing import Optional +from pydantic import BaseModel, Field + + +class LoginRequest(BaseModel): + """Request model for user authentication.""" + username: str = Field(..., description="Username for authentication") + password: str = Field(..., description="Password for authentication") + + +class LoginResponse(BaseModel): + """Response model for successful authentication.""" + jwt: str = Field(..., description="JWT authentication token") + + +class TokenRequest(BaseModel): + """Request model for API token generation.""" + description: str = Field( + default="API Token", + description="Description for the API token" + ) + + +class TokenResponse(BaseModel): + """Response model for API token generation.""" + raw_token: str = Field(..., description="The generated API token") + description: str = Field(..., description="Token description") + + +class InitializeAdminRequest(BaseModel): + """Request model for admin user initialization.""" + username: str = Field(..., description="Admin username") + password: str = Field(..., description="Admin password") + + +class UserContextResponse(BaseModel): + """Response model for current user context.""" + id: int = Field(..., description="User ID") + username: str = Field(..., description="Username") + role: int = Field(..., description="User role") + portainer_authorizations: Optional[dict] = Field( + None, + description="User authorization settings" + ) \ No newline at end of file diff --git a/src/portainer_core/models/settings.py b/src/portainer_core/models/settings.py new file mode 100644 index 0000000..d68454e --- /dev/null +++ b/src/portainer_core/models/settings.py @@ -0,0 +1,152 @@ +""" +Settings models for Portainer Core MCP Server. + +This module defines Pydantic models for settings-related data structures. +""" + +from typing import Any, Dict, Optional, Union + +from pydantic import BaseModel, Field, field_validator, ConfigDict + + +class SettingsBase(BaseModel): + """Base settings model.""" + pass + + +class UpdateSettingsRequest(BaseModel): + """Settings update request model.""" + settings: Dict[str, Any] = Field(..., description="Settings to update") + + @field_validator("settings") + @classmethod + def validate_settings(cls, v): + if not isinstance(v, dict): + raise ValueError("Settings must be a dictionary") + return v + + +class SettingsResponse(BaseModel): + """Settings response model.""" + authentication_method: Optional[int] = Field(None, description="Authentication method") + enable_registry_management: Optional[bool] = Field(None, description="Registry management enabled") + edge_agent_checkin_interval: Optional[int] = Field(None, description="Edge Agent checkin interval") + snapshot_interval: Optional[int] = Field(None, description="Snapshot interval") + templates_url: Optional[str] = Field(None, description="Templates URL") + logo_url: Optional[str] = Field(None, description="Logo URL") + + model_config = ConfigDict(populate_by_name=True) + + +class LogoUpdate(BaseModel): + """Logo update model.""" + logo: str = Field(..., description="Base64 encoded logo data") + + @field_validator("logo") + @classmethod + def validate_logo(cls, v): + if not v or not v.strip(): + raise ValueError("Logo data cannot be empty") + return v.strip() + + +class AuthenticationSettings(BaseModel): + """Authentication settings model.""" + method: int = Field(..., description="Authentication method (0=disabled, 1=enabled)") + + @field_validator("method") + @classmethod + def validate_method(cls, v): + if v not in [0, 1]: + raise ValueError("Authentication method must be 0 (disabled) or 1 (enabled)") + return v + + +class RegistrySettings(BaseModel): + """Registry management settings model.""" + enabled: bool = Field(..., description="Whether registry management is enabled") + + +class EdgeAgentSettings(BaseModel): + """Edge Agent settings model.""" + checkin_interval: int = Field(..., description="Checkin interval in seconds") + + @field_validator("checkin_interval") + @classmethod + def validate_interval(cls, v): + if v <= 0: + raise ValueError("Checkin interval must be positive") + return v + + +class SnapshotSettings(BaseModel): + """Snapshot settings model.""" + interval: int = Field(..., description="Snapshot interval in seconds") + + @field_validator("interval") + @classmethod + def validate_interval(cls, v): + if v <= 0: + raise ValueError("Snapshot interval must be positive") + return v + + +class TemplateSettings(BaseModel): + """Template settings model.""" + url: str = Field(..., description="Template file URL") + + @field_validator("url") + @classmethod + def validate_url(cls, v): + if not v or not v.strip(): + raise ValueError("Template URL cannot be empty") + if not v.startswith(("http://", "https://")): + raise ValueError("Template URL must start with http:// or https://") + return v.strip() + + +class PortainerSettings(BaseModel): + """Portainer settings model.""" + authentication_method: Optional[int] = Field(None, description="Authentication method") + enable_registry_management: Optional[bool] = Field(None, description="Registry management enabled") + edge_agent_checkin_interval: Optional[int] = Field(None, description="Edge Agent checkin interval") + snapshot_interval: Optional[int] = Field(None, description="Snapshot interval") + templates_url: Optional[str] = Field(None, description="Templates URL") + logo_url: Optional[str] = Field(None, description="Logo URL") + + model_config = ConfigDict(populate_by_name=True) + + +class SystemInfo(BaseModel): + """System information model.""" + version: str = Field(..., description="Portainer version") + database_version: str = Field(..., description="Database version") + file_service_version: str = Field(..., description="File service version") + + model_config = ConfigDict(populate_by_name=True) + + +class SystemVersion(BaseModel): + """System version model.""" + server_version: str = Field(..., description="Server version") + database_version: str = Field(..., description="Database version") + build: str = Field(..., description="Build information") + + model_config = ConfigDict(populate_by_name=True) + + +class PortainerStatus(BaseModel): + """Portainer status model.""" + version: str = Field(..., description="Portainer version") + instance_id: str = Field(..., description="Instance ID") + + model_config = ConfigDict(populate_by_name=True) + + +class PublicSettings(BaseModel): + """Public settings model (no authentication required).""" + logo_url: Optional[str] = Field(None, description="Logo URL") + external_templates_url: Optional[str] = Field(None, description="External templates URL") + authentication_method: Optional[int] = Field(None, description="Authentication method") + + model_config = ConfigDict(populate_by_name=True) \ No newline at end of file diff --git a/src/portainer_core/models/users.py b/src/portainer_core/models/users.py new file mode 100644 index 0000000..63af693 --- /dev/null +++ b/src/portainer_core/models/users.py @@ -0,0 +1,147 @@ +""" +User models for Portainer Core MCP Server. + +This module defines Pydantic models for user-related data structures. +""" + +from datetime import datetime +from typing import List, Optional + +from pydantic import BaseModel, Field, field_validator, ConfigDict + + +class UserBase(BaseModel): + """Base user model.""" + username: str = Field(..., description="Username") + role: int = Field(..., description="User role (1=Admin, 2=User)") + + @field_validator("username") + @classmethod + def validate_username(cls, v): + if not v or not v.strip(): + raise ValueError("Username cannot be empty") + return v.strip() + + @field_validator("role") + @classmethod + def validate_role(cls, v): + if v not in [1, 2]: + raise ValueError("Role must be 1 (Admin) or 2 (User)") + return v + + +class CreateUserRequest(UserBase): + """User creation request model.""" + password: str = Field(..., description="Password for the new user") + + @field_validator("password") + @classmethod + def validate_password(cls, v): + if not v: + raise ValueError("Password cannot be empty") + if len(v) < 8: + raise ValueError("Password must be at least 8 characters long") + return v + + +class UpdateUserRequest(BaseModel): + """User update request model.""" + username: Optional[str] = Field(None, description="New username") + password: Optional[str] = Field(None, description="New password") + role: Optional[int] = Field(None, description="New role (1=Admin, 2=User)") + + @field_validator("username") + @classmethod + def validate_username(cls, v): + if v is not None and (not v or not v.strip()): + raise ValueError("Username cannot be empty") + return v.strip() if v else v + + @field_validator("password") + @classmethod + def validate_password(cls, v): + if v is not None and len(v) < 8: + raise ValueError("Password must be at least 8 characters long") + return v + + @field_validator("role") + @classmethod + def validate_role(cls, v): + if v is not None and v not in [1, 2]: + raise ValueError("Role must be 1 (Admin) or 2 (User)") + return v + + +class UserResponse(UserBase): + """User response model.""" + id: int = Field(..., description="User ID", alias="Id") + username: str = Field(..., description="Username", alias="Username") + role: int = Field(..., description="User role", alias="Role") + created_at: Optional[datetime] = Field(None, description="User creation timestamp") + + model_config = ConfigDict(populate_by_name=True) + + +class ChangePasswordRequest(BaseModel): + """Password change request model.""" + password: str = Field(..., description="New password") + + @field_validator("password") + @classmethod + def validate_password(cls, v): + if not v: + raise ValueError("Password cannot be empty") + if len(v) < 8: + raise ValueError("Password must be at least 8 characters long") + return v + + +class UserList(BaseModel): + """User list model.""" + users: List[UserResponse] = Field(..., description="List of users") + total: int = Field(..., description="Total number of users") + + +class UserMembership(BaseModel): + """User team membership model.""" + id: int = Field(..., description="Membership ID", alias="Id") + user_id: int = Field(..., description="User ID", alias="UserId") + team_id: int = Field(..., description="Team ID", alias="TeamId") + role: int = Field(..., description="Role in team", alias="Role") + + model_config = ConfigDict(populate_by_name=True) + + +class UserToken(BaseModel): + """User API token model.""" + id: int = Field(..., description="Token ID", alias="Id") + description: str = Field(..., description="Token description", alias="Description") + created_at: datetime = Field(..., description="Token creation timestamp", alias="CreationDate") + last_used: Optional[datetime] = Field(None, description="Last usage timestamp", alias="LastUsed") + + model_config = ConfigDict(populate_by_name=True) + + +class UserTokenList(BaseModel): + """User token list model.""" + tokens: List[UserToken] = Field(..., description="List of user tokens") + total: int = Field(..., description="Total number of tokens") + + +class UserSearch(BaseModel): + """User search model.""" + query: str = Field(..., description="Search query") + + @field_validator("query") + @classmethod + def validate_query(cls, v): + if not v or not v.strip(): + raise ValueError("Search query cannot be empty") + return v.strip() + + +class UserSearchResult(BaseModel): + """User search result model.""" + users: List[UserResponse] = Field(..., description="Matching users") + query: str = Field(..., description="Search query used") + total: int = Field(..., description="Total number of matches") \ No newline at end of file diff --git a/src/portainer_core/server.py b/src/portainer_core/server.py new file mode 100644 index 0000000..c3c60a5 --- /dev/null +++ b/src/portainer_core/server.py @@ -0,0 +1,549 @@ +""" +Main MCP server implementation for Portainer Core. + +This module implements the MCP server that provides authentication and user +management functionality for Portainer Business Edition. +""" + +import asyncio +from typing import Any, Dict, List, Optional, Sequence + +from mcp.server import Server +from mcp.server.models import InitializationOptions +from mcp.server.stdio import stdio_server +from mcp.types import ( + Resource, + Tool, + TextContent, + ImageContent, + EmbeddedResource, +) + +from .config import get_global_config, get_global_server_config +from .utils.logging import get_logger, LogContext, set_correlation_id +from .utils.errors import ( + PortainerError, + PortainerConfigurationError, + PortainerNetworkError, +) +from .services.auth import AuthService +from .services.users import UserService +from .services.settings import SettingsService + +logger = get_logger(__name__) + + +class PortainerCoreMCPServer: + """Main MCP server class for Portainer Core functionality.""" + + def __init__(self): + config = get_global_config() + server_config = get_global_server_config() + + self.server = Server(server_config.server_name) + self.setup_handlers() + + # Service instances will be initialized later + self.auth_service = None + self.user_service = None + self.settings_service = None + + logger.info( + "Portainer Core MCP Server initialized", + server_name=server_config.server_name, + version=server_config.version, + portainer_url=config.portainer_url + ) + + def setup_handlers(self) -> None: + """Set up MCP server handlers.""" + + @self.server.list_resources() + async def handle_list_resources() -> List[Resource]: + """List available resources.""" + return [ + Resource( + uri="portainer://users", + name="Users", + description="Portainer users and their details", + mimeType="application/json", + ), + Resource( + uri="portainer://settings", + name="Settings", + description="Portainer instance settings", + mimeType="application/json", + ), + Resource( + uri="portainer://health", + name="Health", + description="Server health status", + mimeType="application/json", + ), + ] + + @self.server.read_resource() + async def handle_read_resource(uri: str) -> str: + """Read a specific resource.""" + with LogContext(): + logger.info("Reading resource", uri=uri) + + try: + if uri == "portainer://health": + return await self._get_health_status() + elif uri == "portainer://users": + return await self._get_users_resource() + elif uri == "portainer://settings": + return await self._get_settings_resource() + else: + raise PortainerError(f"Unknown resource: {uri}") + + except Exception as e: + logger.error("Error reading resource", uri=uri, error=str(e)) + raise + + @self.server.list_tools() + async def handle_list_tools() -> List[Tool]: + """List available tools.""" + return [ + Tool( + name="authenticate", + description="Authenticate with Portainer using username/password", + inputSchema={ + "type": "object", + "properties": { + "username": { + "type": "string", + "description": "Username for authentication" + }, + "password": { + "type": "string", + "description": "Password for authentication" + } + }, + "required": ["username", "password"] + } + ), + Tool( + name="generate_token", + description="Generate API token for a user", + inputSchema={ + "type": "object", + "properties": { + "user_id": { + "type": "integer", + "description": "User ID to generate token for" + }, + "description": { + "type": "string", + "description": "Token description" + } + }, + "required": ["user_id"] + } + ), + Tool( + name="get_current_user", + description="Get current authenticated user information", + inputSchema={ + "type": "object", + "properties": {}, + "additionalProperties": False + } + ), + Tool( + name="list_users", + description="List all users", + inputSchema={ + "type": "object", + "properties": {}, + "additionalProperties": False + } + ), + Tool( + name="create_user", + description="Create a new user", + inputSchema={ + "type": "object", + "properties": { + "username": { + "type": "string", + "description": "Username for the new user" + }, + "password": { + "type": "string", + "description": "Password for the new user" + }, + "role": { + "type": "integer", + "description": "Role ID for the user (1=Admin, 2=User)" + } + }, + "required": ["username", "password", "role"] + } + ), + Tool( + name="update_user", + description="Update user information", + inputSchema={ + "type": "object", + "properties": { + "user_id": { + "type": "integer", + "description": "User ID to update" + }, + "username": { + "type": "string", + "description": "New username" + }, + "password": { + "type": "string", + "description": "New password" + }, + "role": { + "type": "integer", + "description": "New role ID" + } + }, + "required": ["user_id"] + } + ), + Tool( + name="delete_user", + description="Delete a user", + inputSchema={ + "type": "object", + "properties": { + "user_id": { + "type": "integer", + "description": "User ID to delete" + } + }, + "required": ["user_id"] + } + ), + Tool( + name="get_settings", + description="Get Portainer settings", + inputSchema={ + "type": "object", + "properties": {}, + "additionalProperties": False + } + ), + Tool( + name="update_settings", + description="Update Portainer settings", + inputSchema={ + "type": "object", + "properties": { + "settings": { + "type": "object", + "description": "Settings to update" + } + }, + "required": ["settings"] + } + ), + Tool( + name="health_check", + description="Check server health status", + inputSchema={ + "type": "object", + "properties": {}, + "additionalProperties": False + } + ), + ] + + @self.server.call_tool() + async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: + """Handle tool calls.""" + correlation_id = set_correlation_id() + + with LogContext(correlation_id): + logger.info("Tool called", tool_name=name, arguments=arguments) + + try: + if name == "health_check": + result = await self._handle_health_check() + elif name == "authenticate": + result = await self._handle_authenticate(arguments) + elif name == "generate_token": + result = await self._handle_generate_token(arguments) + elif name == "get_current_user": + result = await self._handle_get_current_user(arguments) + elif name == "list_users": + result = await self._handle_list_users(arguments) + elif name == "create_user": + result = await self._handle_create_user(arguments) + elif name == "update_user": + result = await self._handle_update_user(arguments) + elif name == "delete_user": + result = await self._handle_delete_user(arguments) + elif name == "get_settings": + result = await self._handle_get_settings(arguments) + elif name == "update_settings": + result = await self._handle_update_settings(arguments) + else: + raise PortainerError(f"Unknown tool: {name}") + + logger.info("Tool executed successfully", tool_name=name) + return [TextContent(type="text", text=result)] + + except Exception as e: + logger.error("Tool execution failed", tool_name=name, error=str(e)) + error_message = f"Error executing {name}: {str(e)}" + return [TextContent(type="text", text=error_message)] + + async def _get_health_status(self) -> str: + """Get server health status.""" + try: + config = get_global_config() + server_config = get_global_server_config() + + # Initialize services if not already done + await self._ensure_services_initialized() + + # Check service health + service_statuses = {} + + if self.auth_service: + service_statuses["auth"] = "healthy" if await self.auth_service.health_check() else "unhealthy" + else: + service_statuses["auth"] = "not_initialized" + + if self.user_service: + service_statuses["users"] = "healthy" if await self.user_service.health_check() else "unhealthy" + else: + service_statuses["users"] = "not_initialized" + + if self.settings_service: + service_statuses["settings"] = "healthy" if await self.settings_service.health_check() else "unhealthy" + else: + service_statuses["settings"] = "not_initialized" + + # Overall status + overall_status = "healthy" if all(s == "healthy" for s in service_statuses.values()) else "degraded" + + status = { + "status": overall_status, + "server": server_config.server_name, + "version": server_config.version, + "portainer_url": config.portainer_url, + "services": service_statuses + } + + return str(status) + except Exception as e: + logger.error("Health check failed", error=str(e)) + return f"Health check failed: {str(e)}" + + async def _ensure_services_initialized(self) -> None: + """Ensure all services are initialized.""" + if self.auth_service is None: + self.auth_service = AuthService() + await self.auth_service.initialize() + + if self.user_service is None: + self.user_service = UserService() + await self.user_service.initialize() + + if self.settings_service is None: + self.settings_service = SettingsService() + await self.settings_service.initialize() + + async def _get_users_resource(self) -> str: + """Get users resource.""" + try: + await self._ensure_services_initialized() + users = await self.user_service.list_users() + return str(users) + except Exception as e: + logger.error("Failed to get users resource", error=str(e)) + return f"Failed to get users: {str(e)}" + + async def _get_settings_resource(self) -> str: + """Get settings resource.""" + try: + await self._ensure_services_initialized() + settings = await self.settings_service.get_settings() + return str(settings) + except Exception as e: + logger.error("Failed to get settings resource", error=str(e)) + return f"Failed to get settings: {str(e)}" + + async def _handle_health_check(self) -> str: + """Handle health check tool call.""" + return await self._get_health_status() + + async def _handle_authenticate(self, arguments: Dict[str, Any]) -> str: + """Handle authentication tool call.""" + try: + await self._ensure_services_initialized() + username = arguments.get("username") + password = arguments.get("password") + + result = await self.auth_service.login(username, password) + logger.info("Authentication successful", username=username) + return str(result) + except Exception as e: + logger.error("Authentication failed", error=str(e)) + return f"Authentication failed: {str(e)}" + + async def _handle_generate_token(self, arguments: Dict[str, Any]) -> str: + """Handle token generation tool call.""" + try: + await self._ensure_services_initialized() + user_id = arguments.get("user_id") + description = arguments.get("description", "MCP Server Token") + + result = await self.auth_service.generate_api_token(user_id, description) + logger.info("Token generation successful", user_id=user_id) + return str(result) + except Exception as e: + logger.error("Token generation failed", error=str(e)) + return f"Token generation failed: {str(e)}" + + async def _handle_get_current_user(self, arguments: Dict[str, Any]) -> str: + """Handle get current user tool call.""" + try: + await self._ensure_services_initialized() + result = await self.auth_service.get_current_user() + return str(result) + except Exception as e: + logger.error("Get current user failed", error=str(e)) + return f"Get current user failed: {str(e)}" + + async def _handle_list_users(self, arguments: Dict[str, Any]) -> str: + """Handle list users tool call.""" + try: + await self._ensure_services_initialized() + result = await self.user_service.list_users() + return str(result) + except Exception as e: + logger.error("List users failed", error=str(e)) + return f"List users failed: {str(e)}" + + async def _handle_create_user(self, arguments: Dict[str, Any]) -> str: + """Handle create user tool call.""" + try: + await self._ensure_services_initialized() + username = arguments.get("username") + password = arguments.get("password") + role = arguments.get("role") + + result = await self.user_service.create_user(username, password, role) + logger.info("User creation successful", username=username) + return str(result) + except Exception as e: + logger.error("User creation failed", error=str(e)) + return f"User creation failed: {str(e)}" + + async def _handle_update_user(self, arguments: Dict[str, Any]) -> str: + """Handle update user tool call.""" + try: + await self._ensure_services_initialized() + user_id = arguments.get("user_id") + username = arguments.get("username") + password = arguments.get("password") + role = arguments.get("role") + + result = await self.user_service.update_user(user_id, username, password, role) + logger.info("User update successful", user_id=user_id) + return str(result) + except Exception as e: + logger.error("User update failed", error=str(e)) + return f"User update failed: {str(e)}" + + async def _handle_delete_user(self, arguments: Dict[str, Any]) -> str: + """Handle delete user tool call.""" + try: + await self._ensure_services_initialized() + user_id = arguments.get("user_id") + + result = await self.user_service.delete_user(user_id) + logger.info("User deletion successful", user_id=user_id) + return f"User {user_id} deleted successfully" + except Exception as e: + logger.error("User deletion failed", error=str(e)) + return f"User deletion failed: {str(e)}" + + async def _handle_get_settings(self, arguments: Dict[str, Any]) -> str: + """Handle get settings tool call.""" + try: + await self._ensure_services_initialized() + result = await self.settings_service.get_settings() + return str(result) + except Exception as e: + logger.error("Get settings failed", error=str(e)) + return f"Get settings failed: {str(e)}" + + async def _handle_update_settings(self, arguments: Dict[str, Any]) -> str: + """Handle update settings tool call.""" + try: + await self._ensure_services_initialized() + settings = arguments.get("settings") + + result = await self.settings_service.update_settings(settings) + logger.info("Settings update successful") + return str(result) + except Exception as e: + logger.error("Settings update failed", error=str(e)) + return f"Settings update failed: {str(e)}" + + async def run(self) -> None: + """Run the MCP server.""" + logger.info("Starting Portainer Core MCP Server") + + try: + server_config = get_global_server_config() + + # Initialize services + await self._ensure_services_initialized() + + async with stdio_server() as (read_stream, write_stream): + await self.server.run( + read_stream, + write_stream, + InitializationOptions( + server_name=server_config.server_name, + server_version=server_config.version, + ) + ) + except Exception as e: + logger.error("Server failed to start", error=str(e)) + raise + finally: + # Clean up services + await self._cleanup_services() + + async def _cleanup_services(self) -> None: + """Clean up service resources.""" + if self.auth_service: + await self.auth_service.cleanup() + if self.user_service: + await self.user_service.cleanup() + if self.settings_service: + await self.settings_service.cleanup() + + +def create_server() -> PortainerCoreMCPServer: + """Create and return a configured MCP server instance.""" + return PortainerCoreMCPServer() + + +async def main() -> None: + """Main entry point for the MCP server.""" + try: + server = create_server() + await server.run() + except KeyboardInterrupt: + logger.info("Server stopped by user") + except Exception as e: + logger.error("Server failed", error=str(e)) + raise + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/portainer_core/services/__init__.py b/src/portainer_core/services/__init__.py new file mode 100644 index 0000000..9091dcc --- /dev/null +++ b/src/portainer_core/services/__init__.py @@ -0,0 +1,5 @@ +""" +Service layer for Portainer Core MCP Server. + +This module contains service classes for interacting with the Portainer API. +""" \ No newline at end of file diff --git a/src/portainer_core/services/__pycache__/__init__.cpython-312.pyc b/src/portainer_core/services/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..3e5d91a Binary files /dev/null and b/src/portainer_core/services/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/portainer_core/services/__pycache__/auth.cpython-312.pyc b/src/portainer_core/services/__pycache__/auth.cpython-312.pyc new file mode 100644 index 0000000..12ae685 Binary files /dev/null and b/src/portainer_core/services/__pycache__/auth.cpython-312.pyc differ diff --git a/src/portainer_core/services/__pycache__/base.cpython-312.pyc b/src/portainer_core/services/__pycache__/base.cpython-312.pyc new file mode 100644 index 0000000..3858a72 Binary files /dev/null and b/src/portainer_core/services/__pycache__/base.cpython-312.pyc differ diff --git a/src/portainer_core/services/__pycache__/settings.cpython-312.pyc b/src/portainer_core/services/__pycache__/settings.cpython-312.pyc new file mode 100644 index 0000000..fd72d24 Binary files /dev/null and b/src/portainer_core/services/__pycache__/settings.cpython-312.pyc differ diff --git a/src/portainer_core/services/__pycache__/users.cpython-312.pyc b/src/portainer_core/services/__pycache__/users.cpython-312.pyc new file mode 100644 index 0000000..1345960 Binary files /dev/null and b/src/portainer_core/services/__pycache__/users.cpython-312.pyc differ diff --git a/src/portainer_core/services/auth.py b/src/portainer_core/services/auth.py new file mode 100644 index 0000000..425b260 --- /dev/null +++ b/src/portainer_core/services/auth.py @@ -0,0 +1,230 @@ +""" +Authentication service for Portainer Core MCP Server. + +This module provides authentication functionality including login, token management, +and session handling for Portainer Business Edition. +""" + +import asyncio +from typing import Dict, Any, Optional +from datetime import datetime, timedelta + +from ..services.base import BaseService +from ..models.auth import LoginRequest, LoginResponse, TokenRequest, TokenResponse +from ..utils.errors import ( + PortainerAuthenticationError, + PortainerValidationError, + PortainerTokenExpiredError, +) +from ..utils.tokens import decode_jwt_token, is_token_expired +from ..utils.logging import get_logger + + +class AuthService(BaseService): + """Service for authentication operations.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.logger = get_logger(__name__) + self._cached_token = None + self._token_expires_at = None + + async def login(self, username: str, password: str) -> LoginResponse: + """ + Authenticate user with username and password. + + Args: + username: User's username + password: User's password + + Returns: + LoginResponse containing authentication token + + Raises: + PortainerAuthenticationError: If authentication fails + """ + request_data = LoginRequest(username=username, password=password) + + try: + response = await self._make_request( + method="POST", + endpoint="/auth", + json_data=request_data.model_dump(), + require_auth=False + ) + + # Store token for future use + self._cached_token = response.get("jwt") + if self._cached_token: + # Calculate expiration time (tokens typically last 8 hours) + self._token_expires_at = datetime.utcnow() + timedelta(hours=8) + + return LoginResponse(**response) + + except Exception as e: + self.logger.error("Authentication failed", username=username, error=str(e)) + raise PortainerAuthenticationError(f"Authentication failed: {str(e)}") + + async def logout(self) -> bool: + """ + Logout the current user session. + + Returns: + True if logout successful + """ + try: + await self._make_request( + method="DELETE", + endpoint="/auth" + ) + + # Clear cached token + self._cached_token = None + self._token_expires_at = None + + return True + + except Exception as e: + self.logger.error("Logout failed", error=str(e)) + return False + + async def generate_api_token(self, user_id: int, description: str = "API Token") -> TokenResponse: + """ + Generate an API token for a user. + + Args: + user_id: ID of the user + description: Token description + + Returns: + TokenResponse containing the new API token + """ + request_data = TokenRequest(description=description) + + try: + response = await self._make_request( + method="POST", + endpoint=f"/users/{user_id}/tokens", + json_data=request_data.model_dump() + ) + + return TokenResponse(**response) + + except Exception as e: + self.logger.error("Token generation failed", user_id=user_id, error=str(e)) + raise PortainerValidationError(f"Token generation failed: {str(e)}") + + async def validate_token(self, token: str) -> bool: + """ + Validate an authentication token. + + Args: + token: JWT token to validate + + Returns: + True if token is valid + """ + try: + if is_token_expired(token): + return False + + # Try to use the token for a simple API call + headers = {"Authorization": f"Bearer {token}"} + await self._make_request( + method="GET", + endpoint="/status", + headers=headers, + require_auth=False + ) + + return True + + except Exception: + return False + + async def refresh_token_if_needed(self) -> Optional[str]: + """ + Refresh the cached token if it's expired or about to expire. + + Returns: + New token if refreshed, None if no refresh needed + """ + if not self._cached_token or not self._token_expires_at: + return None + + # Check if token expires within 1 hour + expires_soon = datetime.utcnow() + timedelta(hours=1) + if self._token_expires_at > expires_soon: + return None + + # Re-authenticate if using credentials + if self.config.use_credentials_auth: + try: + response = await self.login( + self.config.portainer_username, + self.config.portainer_password + ) + return response.jwt + + except Exception as e: + self.logger.error("Token refresh failed", error=str(e)) + raise PortainerTokenExpiredError("Failed to refresh expired token") + + return None + + async def get_current_user(self) -> Dict[str, Any]: + """ + Get information about the currently authenticated user. + + Returns: + User information dictionary + """ + try: + response = await self._make_request( + method="GET", + endpoint="/users/me" + ) + + return response + + except Exception as e: + self.logger.error("Failed to get current user", error=str(e)) + raise PortainerAuthenticationError(f"Failed to get current user: {str(e)}") + + def get_cached_token(self) -> Optional[str]: + """ + Get the currently cached authentication token. + + Returns: + Cached token if available and not expired + """ + if not self._cached_token or not self._token_expires_at: + return None + + if datetime.utcnow() >= self._token_expires_at: + # Token expired, clear cache + self._cached_token = None + self._token_expires_at = None + return None + + return self._cached_token + + async def health_check(self) -> bool: + """ + Check if the authentication service is healthy. + + Returns: + True if service is healthy + """ + try: + # Try to access the status endpoint + await self._make_request( + method="GET", + endpoint="/status", + require_auth=False + ) + return True + + except Exception as e: + self.logger.error("Auth service health check failed", error=str(e)) + return False \ No newline at end of file diff --git a/src/portainer_core/services/base.py b/src/portainer_core/services/base.py new file mode 100644 index 0000000..e3864e9 --- /dev/null +++ b/src/portainer_core/services/base.py @@ -0,0 +1,333 @@ +""" +Base service class for Portainer API interactions. + +This module provides a base service class with HTTP client, retry logic, +circuit breaker pattern, and error handling. +""" + +import asyncio +import time +from abc import ABC, abstractmethod +from enum import Enum +from typing import Any, Dict, Optional, Union + +import httpx +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type, + before_sleep_log, +) + +from ..config import get_global_config +from ..utils.errors import ( + PortainerError, + PortainerNetworkError, + PortainerTimeoutError, + PortainerCircuitBreakerError, + map_http_error, + is_retryable_error, + should_refresh_token, +) +from ..utils.logging import get_logger + +logger = get_logger(__name__) + + +class CircuitBreakerState(Enum): + """Circuit breaker states.""" + CLOSED = "closed" + OPEN = "open" + HALF_OPEN = "half_open" + + +class CircuitBreaker: + """Circuit breaker implementation for fault tolerance.""" + + def __init__( + self, + failure_threshold: int = 5, + recovery_timeout: int = 60, + name: str = "default" + ): + self.failure_threshold = failure_threshold + self.recovery_timeout = recovery_timeout + self.name = name + self.failure_count = 0 + self.last_failure_time = None + self.state = CircuitBreakerState.CLOSED + + def can_execute(self) -> bool: + """Check if the circuit breaker allows execution.""" + if self.state == CircuitBreakerState.CLOSED: + return True + + if self.state == CircuitBreakerState.OPEN: + if self.last_failure_time and \ + time.time() - self.last_failure_time > self.recovery_timeout: + self.state = CircuitBreakerState.HALF_OPEN + logger.info( + "Circuit breaker transitioning to half-open", + name=self.name, + failure_count=self.failure_count + ) + return True + return False + + # HALF_OPEN state + return True + + def record_success(self) -> None: + """Record a successful operation.""" + self.failure_count = 0 + self.last_failure_time = None + + if self.state == CircuitBreakerState.HALF_OPEN: + self.state = CircuitBreakerState.CLOSED + logger.info( + "Circuit breaker closed after successful operation", + name=self.name + ) + + def record_failure(self) -> None: + """Record a failed operation.""" + self.failure_count += 1 + self.last_failure_time = time.time() + + if self.failure_count >= self.failure_threshold: + self.state = CircuitBreakerState.OPEN + logger.warning( + "Circuit breaker opened due to failures", + name=self.name, + failure_count=self.failure_count + ) + + +class BaseService(ABC): + """Base class for Portainer API services.""" + + def __init__(self, name: str): + self.name = name + self.client = None + self.config = get_global_config() + self.circuit_breaker = CircuitBreaker( + failure_threshold=self.config.circuit_breaker_failure_threshold, + recovery_timeout=self.config.circuit_breaker_recovery_timeout, + name=name + ) + self.logger = get_logger(f"{__name__}.{name}") + self._auth_token = None + self._token_expiry = None + + async def __aenter__(self): + """Async context manager entry.""" + await self.initialize() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit.""" + await self.cleanup() + + async def initialize(self) -> None: + """Initialize the service and HTTP client.""" + if self.client is None: + self.client = httpx.AsyncClient( + timeout=httpx.Timeout(self.config.http_timeout), + limits=httpx.Limits(max_keepalive_connections=5, max_connections=10), + verify=True, # Always verify SSL certificates + ) + self.logger.info("HTTP client initialized", service=self.name) + + async def cleanup(self) -> None: + """Clean up resources.""" + if self.client: + await self.client.aclose() + self.client = None + self.logger.info("HTTP client closed", service=self.name) + + def _get_base_headers(self) -> Dict[str, str]: + """Get base headers for API requests.""" + headers = { + "Content-Type": "application/json", + "User-Agent": f"portainer-core-mcp/{self.config.portainer_url}", + } + + if self._auth_token: + headers["Authorization"] = f"Bearer {self._auth_token}" + elif self.config.portainer_api_key: + headers["X-API-Key"] = self.config.portainer_api_key + + return headers + + def _build_url(self, endpoint: str) -> str: + """Build full URL for API endpoint.""" + base_url = self.config.api_base_url + if endpoint.startswith("/"): + endpoint = endpoint[1:] + return f"{base_url}/{endpoint}" + + async def _execute_request( + self, + method: str, + endpoint: str, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + retry_on_auth_failure: bool = True, + ) -> httpx.Response: + """Execute HTTP request with circuit breaker and error handling.""" + + # Check circuit breaker + if not self.circuit_breaker.can_execute(): + raise PortainerCircuitBreakerError( + f"Circuit breaker is open for {self.name} service" + ) + + # Ensure client is initialized + if self.client is None: + await self.initialize() + + # Build request + url = self._build_url(endpoint) + request_headers = self._get_base_headers() + if headers: + request_headers.update(headers) + + self.logger.debug( + "Making HTTP request", + method=method, + url=url, + params=params, + service=self.name + ) + + try: + response = await self.client.request( + method=method, + url=url, + json=data, + params=params, + headers=request_headers, + ) + + # Check for authentication errors + if response.status_code == 401 and retry_on_auth_failure: + self.logger.warning("Authentication failed, attempting token refresh") + if await self._refresh_token(): + # Retry once with new token + return await self._execute_request( + method, endpoint, data, params, headers, retry_on_auth_failure=False + ) + + # Handle HTTP errors + if response.status_code >= 400: + error_message = f"HTTP {response.status_code}" + try: + error_data = response.json() + if isinstance(error_data, dict): + error_message = error_data.get("message", error_message) + except: + error_message = response.text or error_message + + error = map_http_error(response.status_code, error_message) + self.circuit_breaker.record_failure() + raise error + + # Record success + self.circuit_breaker.record_success() + + self.logger.debug( + "HTTP request successful", + method=method, + url=url, + status_code=response.status_code, + service=self.name + ) + + return response + + except httpx.TimeoutException as e: + self.circuit_breaker.record_failure() + raise PortainerTimeoutError(f"Request timeout: {str(e)}") + except httpx.NetworkError as e: + self.circuit_breaker.record_failure() + raise PortainerNetworkError(f"Network error: {str(e)}") + except PortainerError: + # Re-raise Portainer errors as-is + raise + except Exception as e: + self.circuit_breaker.record_failure() + raise PortainerError(f"Unexpected error: {str(e)}") + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type(( + PortainerNetworkError, + PortainerTimeoutError, + httpx.TimeoutException, + httpx.NetworkError, + )), + before_sleep=before_sleep_log(logger, "WARNING"), + ) + async def _make_request( + self, + method: str, + endpoint: str, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> httpx.Response: + """Make HTTP request with retry logic.""" + return await self._execute_request(method, endpoint, data, params, headers) + + async def get( + self, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> httpx.Response: + """Make GET request.""" + return await self._make_request("GET", endpoint, params=params, headers=headers) + + async def post( + self, + endpoint: str, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> httpx.Response: + """Make POST request.""" + return await self._make_request("POST", endpoint, data=data, params=params, headers=headers) + + async def put( + self, + endpoint: str, + data: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> httpx.Response: + """Make PUT request.""" + return await self._make_request("PUT", endpoint, data=data, params=params, headers=headers) + + async def delete( + self, + endpoint: str, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> httpx.Response: + """Make DELETE request.""" + return await self._make_request("DELETE", endpoint, params=params, headers=headers) + + async def _refresh_token(self) -> bool: + """Refresh authentication token if needed.""" + # TODO: Implement token refresh logic + # This will be implemented in the auth service + self.logger.warning("Token refresh not implemented yet") + return False + + @abstractmethod + async def health_check(self) -> bool: + """Check if the service is healthy.""" + pass \ No newline at end of file diff --git a/src/portainer_core/services/settings.py b/src/portainer_core/services/settings.py new file mode 100644 index 0000000..39c5959 --- /dev/null +++ b/src/portainer_core/services/settings.py @@ -0,0 +1,251 @@ +""" +Settings management service for Portainer Core MCP Server. + +This module provides settings management functionality for Portainer Business Edition +configuration and system settings. +""" + +from typing import Dict, Any, Optional + +from ..services.base import BaseService +from ..models.settings import SettingsResponse, UpdateSettingsRequest +from ..utils.errors import ( + PortainerValidationError, + PortainerAuthorizationError, +) +from ..utils.logging import get_logger + + +class SettingsService(BaseService): + """Service for settings management operations.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.logger = get_logger(__name__) + + async def get_settings(self) -> SettingsResponse: + """ + Get current Portainer settings. + + Returns: + SettingsResponse object with current settings + """ + try: + response = await self._make_request( + method="GET", + endpoint="/settings" + ) + + return SettingsResponse(**response) + + except Exception as e: + self.logger.error("Failed to get settings", error=str(e)) + raise PortainerValidationError(f"Failed to get settings: {str(e)}") + + async def get_public_settings(self) -> Dict[str, Any]: + """ + Get public settings (accessible without authentication). + + Returns: + Dictionary of public settings + """ + try: + response = await self._make_request( + method="GET", + endpoint="/settings/public", + require_auth=False + ) + + return response + + except Exception as e: + self.logger.error("Failed to get public settings", error=str(e)) + raise PortainerValidationError(f"Failed to get public settings: {str(e)}") + + async def update_settings(self, settings_data: UpdateSettingsRequest) -> SettingsResponse: + """ + Update Portainer settings. + + Args: + settings_data: Settings update data + + Returns: + SettingsResponse object with updated settings + """ + try: + response = await self._make_request( + method="PUT", + endpoint="/settings", + json_data=settings_data.model_dump(exclude_unset=True) + ) + + self.logger.info("Settings updated successfully") + return SettingsResponse(**response) + + except Exception as e: + self.logger.error("Failed to update settings", error=str(e)) + raise PortainerAuthorizationError(f"Failed to update settings: {str(e)}") + + async def get_ldap_settings(self) -> Dict[str, Any]: + """ + Get LDAP authentication settings. + + Returns: + Dictionary of LDAP settings + """ + try: + response = await self._make_request( + method="GET", + endpoint="/settings/authentication/ldap" + ) + + return response + + except Exception as e: + self.logger.error("Failed to get LDAP settings", error=str(e)) + raise PortainerValidationError(f"Failed to get LDAP settings: {str(e)}") + + async def update_ldap_settings(self, ldap_data: Dict[str, Any]) -> bool: + """ + Update LDAP authentication settings. + + Args: + ldap_data: LDAP configuration data + + Returns: + True if update successful + """ + try: + await self._make_request( + method="PUT", + endpoint="/settings/authentication/ldap", + json_data=ldap_data + ) + + self.logger.info("LDAP settings updated successfully") + return True + + except Exception as e: + self.logger.error("Failed to update LDAP settings", error=str(e)) + raise PortainerAuthorizationError(f"Failed to update LDAP settings: {str(e)}") + + async def test_ldap_connectivity(self) -> Dict[str, Any]: + """ + Test LDAP server connectivity. + + Returns: + Dictionary with connectivity test results + """ + try: + response = await self._make_request( + method="POST", + endpoint="/settings/authentication/ldap/test" + ) + + return response + + except Exception as e: + self.logger.error("LDAP connectivity test failed", error=str(e)) + raise PortainerValidationError(f"LDAP connectivity test failed: {str(e)}") + + async def get_edge_settings(self) -> Dict[str, Any]: + """ + Get Edge computing settings. + + Returns: + Dictionary of Edge settings + """ + try: + response = await self._make_request( + method="GET", + endpoint="/settings/edge" + ) + + return response + + except Exception as e: + self.logger.error("Failed to get Edge settings", error=str(e)) + raise PortainerValidationError(f"Failed to get Edge settings: {str(e)}") + + async def update_edge_settings(self, edge_data: Dict[str, Any]) -> bool: + """ + Update Edge computing settings. + + Args: + edge_data: Edge configuration data + + Returns: + True if update successful + """ + try: + await self._make_request( + method="PUT", + endpoint="/settings/edge", + json_data=edge_data + ) + + self.logger.info("Edge settings updated successfully") + return True + + except Exception as e: + self.logger.error("Failed to update Edge settings", error=str(e)) + raise PortainerAuthorizationError(f"Failed to update Edge settings: {str(e)}") + + async def get_ssl_settings(self) -> Dict[str, Any]: + """ + Get SSL/TLS settings. + + Returns: + Dictionary of SSL settings + """ + try: + response = await self._make_request( + method="GET", + endpoint="/settings/ssl" + ) + + return response + + except Exception as e: + self.logger.error("Failed to get SSL settings", error=str(e)) + raise PortainerValidationError(f"Failed to get SSL settings: {str(e)}") + + async def update_ssl_settings(self, ssl_data: Dict[str, Any]) -> bool: + """ + Update SSL/TLS settings. + + Args: + ssl_data: SSL configuration data + + Returns: + True if update successful + """ + try: + await self._make_request( + method="PUT", + endpoint="/settings/ssl", + json_data=ssl_data + ) + + self.logger.info("SSL settings updated successfully") + return True + + except Exception as e: + self.logger.error("Failed to update SSL settings", error=str(e)) + raise PortainerAuthorizationError(f"Failed to update SSL settings: {str(e)}") + + async def health_check(self) -> bool: + """ + Check if the settings service is healthy. + + Returns: + True if service is healthy + """ + try: + # Try to get public settings (no auth required) + await self.get_public_settings() + return True + + except Exception as e: + self.logger.error("Settings service health check failed", error=str(e)) + return False \ No newline at end of file diff --git a/src/portainer_core/services/users.py b/src/portainer_core/services/users.py new file mode 100644 index 0000000..3012447 --- /dev/null +++ b/src/portainer_core/services/users.py @@ -0,0 +1,270 @@ +""" +User management service for Portainer Core MCP Server. + +This module provides user management functionality including creating, updating, +and managing users in Portainer Business Edition. +""" + +from typing import Dict, List, Any, Optional + +from ..services.base import BaseService +from ..models.users import ( + CreateUserRequest, + UpdateUserRequest, + UserResponse, + ChangePasswordRequest +) +from ..utils.errors import ( + PortainerValidationError, + PortainerNotFoundError, + PortainerAuthorizationError, +) +from ..utils.logging import get_logger + + +class UserService(BaseService): + """Service for user management operations.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.logger = get_logger(__name__) + + async def list_users(self) -> List[UserResponse]: + """ + List all users in the Portainer instance. + + Returns: + List of UserResponse objects + """ + try: + response = await self._make_request( + method="GET", + endpoint="/users" + ) + + return [UserResponse(**user) for user in response] + + except Exception as e: + self.logger.error("Failed to list users", error=str(e)) + raise PortainerValidationError(f"Failed to list users: {str(e)}") + + async def get_user(self, user_id: int) -> UserResponse: + """ + Get details of a specific user. + + Args: + user_id: ID of the user to retrieve + + Returns: + UserResponse object + + Raises: + PortainerNotFoundError: If user doesn't exist + """ + try: + response = await self._make_request( + method="GET", + endpoint=f"/users/{user_id}" + ) + + return UserResponse(**response) + + except Exception as e: + self.logger.error("Failed to get user", user_id=user_id, error=str(e)) + if "404" in str(e): + raise PortainerNotFoundError(f"User {user_id} not found") + raise PortainerValidationError(f"Failed to get user: {str(e)}") + + async def create_user(self, user_data: CreateUserRequest) -> UserResponse: + """ + Create a new user. + + Args: + user_data: User creation data + + Returns: + UserResponse object for the created user + """ + try: + response = await self._make_request( + method="POST", + endpoint="/users", + json_data=user_data.model_dump() + ) + + self.logger.info("User created successfully", username=user_data.username) + return UserResponse(**response) + + except Exception as e: + self.logger.error("Failed to create user", username=user_data.username, error=str(e)) + raise PortainerValidationError(f"Failed to create user: {str(e)}") + + async def update_user(self, user_id: int, user_data: UpdateUserRequest) -> UserResponse: + """ + Update an existing user. + + Args: + user_id: ID of the user to update + user_data: User update data + + Returns: + UserResponse object for the updated user + """ + try: + response = await self._make_request( + method="PUT", + endpoint=f"/users/{user_id}", + json_data=user_data.model_dump(exclude_unset=True) + ) + + self.logger.info("User updated successfully", user_id=user_id) + return UserResponse(**response) + + except Exception as e: + self.logger.error("Failed to update user", user_id=user_id, error=str(e)) + if "404" in str(e): + raise PortainerNotFoundError(f"User {user_id} not found") + raise PortainerValidationError(f"Failed to update user: {str(e)}") + + async def delete_user(self, user_id: int) -> bool: + """ + Delete a user. + + Args: + user_id: ID of the user to delete + + Returns: + True if deletion successful + """ + try: + await self._make_request( + method="DELETE", + endpoint=f"/users/{user_id}" + ) + + self.logger.info("User deleted successfully", user_id=user_id) + return True + + except Exception as e: + self.logger.error("Failed to delete user", user_id=user_id, error=str(e)) + if "404" in str(e): + raise PortainerNotFoundError(f"User {user_id} not found") + raise PortainerValidationError(f"Failed to delete user: {str(e)}") + + async def change_password(self, user_id: int, password_data: ChangePasswordRequest) -> bool: + """ + Change a user's password. + + Args: + user_id: ID of the user + password_data: Password change data + + Returns: + True if password change successful + """ + try: + await self._make_request( + method="PUT", + endpoint=f"/users/{user_id}/passwd", + json_data=password_data.model_dump() + ) + + self.logger.info("Password changed successfully", user_id=user_id) + return True + + except Exception as e: + self.logger.error("Failed to change password", user_id=user_id, error=str(e)) + if "404" in str(e): + raise PortainerNotFoundError(f"User {user_id} not found") + raise PortainerAuthorizationError(f"Failed to change password: {str(e)}") + + async def get_user_memberships(self, user_id: int) -> List[Dict[str, Any]]: + """ + Get team memberships for a user. + + Args: + user_id: ID of the user + + Returns: + List of team membership data + """ + try: + response = await self._make_request( + method="GET", + endpoint=f"/users/{user_id}/memberships" + ) + + return response + + except Exception as e: + self.logger.error("Failed to get user memberships", user_id=user_id, error=str(e)) + if "404" in str(e): + raise PortainerNotFoundError(f"User {user_id} not found") + raise PortainerValidationError(f"Failed to get user memberships: {str(e)}") + + async def check_admin_user_exists(self) -> bool: + """ + Check if an admin user exists in the system. + + Returns: + True if admin user exists + """ + try: + response = await self._make_request( + method="GET", + endpoint="/users/admin/check", + require_auth=False + ) + + return response.get("exists", False) + + except Exception as e: + self.logger.error("Failed to check admin user", error=str(e)) + return False + + async def initialize_admin_user(self, username: str, password: str) -> UserResponse: + """ + Initialize the admin user (first-time setup). + + Args: + username: Admin username + password: Admin password + + Returns: + UserResponse for the created admin user + """ + try: + data = { + "Username": username, + "Password": password + } + + response = await self._make_request( + method="POST", + endpoint="/users/admin/init", + json_data=data, + require_auth=False + ) + + self.logger.info("Admin user initialized", username=username) + return UserResponse(**response) + + except Exception as e: + self.logger.error("Failed to initialize admin user", username=username, error=str(e)) + raise PortainerValidationError(f"Failed to initialize admin user: {str(e)}") + + async def health_check(self) -> bool: + """ + Check if the user service is healthy. + + Returns: + True if service is healthy + """ + try: + # Try to list users (requires authentication) + await self.list_users() + return True + + except Exception as e: + self.logger.error("User service health check failed", error=str(e)) + return False \ No newline at end of file diff --git a/src/portainer_core/utils/__init__.py b/src/portainer_core/utils/__init__.py new file mode 100644 index 0000000..fdbf972 --- /dev/null +++ b/src/portainer_core/utils/__init__.py @@ -0,0 +1,5 @@ +""" +Utility modules for Portainer Core MCP Server. + +This module contains utility functions and helper classes. +""" \ No newline at end of file diff --git a/src/portainer_core/utils/__pycache__/__init__.cpython-312.pyc b/src/portainer_core/utils/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..4eb41dc Binary files /dev/null and b/src/portainer_core/utils/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/portainer_core/utils/__pycache__/errors.cpython-312.pyc b/src/portainer_core/utils/__pycache__/errors.cpython-312.pyc new file mode 100644 index 0000000..ea68d33 Binary files /dev/null and b/src/portainer_core/utils/__pycache__/errors.cpython-312.pyc differ diff --git a/src/portainer_core/utils/__pycache__/logging.cpython-312.pyc b/src/portainer_core/utils/__pycache__/logging.cpython-312.pyc new file mode 100644 index 0000000..61961f8 Binary files /dev/null and b/src/portainer_core/utils/__pycache__/logging.cpython-312.pyc differ diff --git a/src/portainer_core/utils/__pycache__/tokens.cpython-312.pyc b/src/portainer_core/utils/__pycache__/tokens.cpython-312.pyc new file mode 100644 index 0000000..c92e5cc Binary files /dev/null and b/src/portainer_core/utils/__pycache__/tokens.cpython-312.pyc differ diff --git a/src/portainer_core/utils/errors.py b/src/portainer_core/utils/errors.py new file mode 100644 index 0000000..39cbbad --- /dev/null +++ b/src/portainer_core/utils/errors.py @@ -0,0 +1,174 @@ +""" +Error handling utilities for Portainer Core MCP Server. + +This module provides custom exceptions and error handling utilities. +""" + +from typing import Any, Dict, Optional + + +class PortainerError(Exception): + """Base exception for Portainer-related errors.""" + + def __init__( + self, + message: str, + status_code: Optional[int] = None, + details: Optional[Dict[str, Any]] = None + ): + super().__init__(message) + self.message = message + self.status_code = status_code + self.details = details or {} + + def __str__(self) -> str: + if self.status_code: + return f"[{self.status_code}] {self.message}" + return self.message + + +class PortainerAuthenticationError(PortainerError): + """Authentication-related errors.""" + + def __init__(self, message: str = "Authentication failed", **kwargs): + super().__init__(message, status_code=401, **kwargs) + + +class PortainerAuthorizationError(PortainerError): + """Authorization-related errors.""" + + def __init__(self, message: str = "Insufficient permissions", **kwargs): + super().__init__(message, status_code=403, **kwargs) + + +class PortainerNotFoundError(PortainerError): + """Resource not found errors.""" + + def __init__(self, message: str = "Resource not found", **kwargs): + super().__init__(message, status_code=404, **kwargs) + + +class PortainerValidationError(PortainerError): + """Request validation errors.""" + + def __init__(self, message: str = "Invalid request data", **kwargs): + super().__init__(message, status_code=400, **kwargs) + + +class PortainerConflictError(PortainerError): + """Resource conflict errors.""" + + def __init__(self, message: str = "Resource conflict", **kwargs): + super().__init__(message, status_code=409, **kwargs) + + +class PortainerServerError(PortainerError): + """Server-side errors.""" + + def __init__(self, message: str = "Internal server error", **kwargs): + super().__init__(message, status_code=500, **kwargs) + + +class PortainerNetworkError(PortainerError): + """Network-related errors.""" + + def __init__(self, message: str = "Network error", **kwargs): + super().__init__(message, **kwargs) + + +class PortainerTimeoutError(PortainerError): + """Request timeout errors.""" + + def __init__(self, message: str = "Request timeout", **kwargs): + super().__init__(message, **kwargs) + + +class PortainerRateLimitError(PortainerError): + """Rate limiting errors.""" + + def __init__(self, message: str = "Rate limit exceeded", **kwargs): + super().__init__(message, status_code=429, **kwargs) + + +class PortainerCircuitBreakerError(PortainerError): + """Circuit breaker errors.""" + + def __init__(self, message: str = "Circuit breaker is open", **kwargs): + super().__init__(message, **kwargs) + + +class PortainerTokenExpiredError(PortainerAuthenticationError): + """Token expiration errors.""" + + def __init__(self, message: str = "Token has expired", **kwargs): + super().__init__(message, **kwargs) + + +class PortainerConfigurationError(PortainerError): + """Configuration-related errors.""" + + def __init__(self, message: str = "Configuration error", **kwargs): + super().__init__(message, **kwargs) + + +def map_http_error(status_code: int, message: str, details: Optional[Dict[str, Any]] = None) -> PortainerError: + """Map HTTP status codes to appropriate exception types.""" + error_map = { + 400: PortainerValidationError, + 401: PortainerAuthenticationError, + 403: PortainerAuthorizationError, + 404: PortainerNotFoundError, + 409: PortainerConflictError, + 429: PortainerRateLimitError, + 500: PortainerServerError, + 502: PortainerServerError, + 503: PortainerServerError, + 504: PortainerTimeoutError, + } + + error_class = error_map.get(status_code, PortainerError) + + # For known error classes, pass only message and details to avoid status_code conflicts + if error_class in error_map.values(): + return error_class(message, details=details) + else: + # For generic PortainerError, include status_code + return error_class(message, status_code=status_code, details=details) + + +def is_retryable_error(error: Exception) -> bool: + """Check if an error is retryable.""" + if isinstance(error, PortainerError): + # Don't retry authentication, authorization, validation, or not found errors + if isinstance(error, ( + PortainerAuthenticationError, + PortainerAuthorizationError, + PortainerValidationError, + PortainerNotFoundError, + PortainerConflictError + )): + return False + + # Don't retry client errors (4xx) except for rate limiting + if error.status_code and 400 <= error.status_code < 500: + return isinstance(error, PortainerRateLimitError) + + # Retry server errors (5xx) and network errors + return True + + # Retry network-related exceptions + if isinstance(error, (ConnectionError, TimeoutError)): + return True + + return False + + +def should_refresh_token(error: Exception) -> bool: + """Check if an error indicates token refresh is needed.""" + if isinstance(error, PortainerAuthenticationError): + return True + + if isinstance(error, PortainerError) and error.status_code == 401: + return True + + return False \ No newline at end of file diff --git a/src/portainer_core/utils/logging.py b/src/portainer_core/utils/logging.py new file mode 100644 index 0000000..b0545ab --- /dev/null +++ b/src/portainer_core/utils/logging.py @@ -0,0 +1,164 @@ +""" +Logging utilities for Portainer Core MCP Server. + +This module provides structured logging configuration and utilities. +""" + +import logging +import sys +import uuid +from contextvars import ContextVar +from typing import Any, Dict, Optional + +import structlog +from structlog.typing import EventDict + +from ..config import get_global_config + +# Context variable for correlation IDs +correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="") + + +def add_correlation_id(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: + """Add correlation ID to log entries.""" + correlation_id = correlation_id_var.get() + if correlation_id: + event_dict["correlation_id"] = correlation_id + return event_dict + + +def add_service_info(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: + """Add service information to log entries.""" + event_dict["service"] = "portainer-core-mcp" + event_dict["version"] = "0.1.0" + return event_dict + + +def mask_sensitive_data(logger: Any, method_name: str, event_dict: EventDict) -> EventDict: + """Mask sensitive data in log entries.""" + sensitive_keys = { + "password", "token", "api_key", "secret", "auth", "authorization", + "x-api-key", "bearer", "jwt", "credential", "credentials" + } + + def mask_dict(data: Dict[str, Any]) -> Dict[str, Any]: + """Recursively mask sensitive data in dictionaries.""" + masked = {} + for key, value in data.items(): + key_lower = key.lower() + if any(sensitive in key_lower for sensitive in sensitive_keys): + masked[key] = "***MASKED***" + elif isinstance(value, dict): + masked[key] = mask_dict(value) + elif isinstance(value, list): + masked[key] = [ + mask_dict(item) if isinstance(item, dict) else item + for item in value + ] + else: + masked[key] = value + return masked + + # Mask sensitive data in the event dict + for key, value in event_dict.items(): + if isinstance(value, dict): + event_dict[key] = mask_dict(value) + elif isinstance(value, str): + key_lower = key.lower() + if any(sensitive in key_lower for sensitive in sensitive_keys): + event_dict[key] = "***MASKED***" + + return event_dict + + +def configure_logging() -> None: + """Configure structured logging.""" + try: + config = get_global_config() + log_level = config.log_level + log_format = config.log_format + except Exception: + # Fallback to defaults if config is not available + log_level = "INFO" + log_format = "json" + + processors = [ + structlog.contextvars.merge_contextvars, + add_correlation_id, + add_service_info, + mask_sensitive_data, + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + ] + + if log_format == "json": + processors.append(structlog.processors.JSONRenderer()) + else: + processors.append(structlog.dev.ConsoleRenderer()) + + structlog.configure( + processors=processors, + wrapper_class=structlog.make_filtering_bound_logger( + getattr(logging, log_level) + ), + logger_factory=structlog.PrintLoggerFactory(), + cache_logger_on_first_use=True, + ) + + # Configure standard library logging + logging.basicConfig( + format="%(message)s", + stream=sys.stdout, + level=getattr(logging, log_level), + ) + + +def get_logger(name: Optional[str] = None) -> structlog.BoundLogger: + """Get a configured logger instance.""" + return structlog.get_logger(name) + + +def set_correlation_id(correlation_id: Optional[str] = None) -> str: + """Set correlation ID for request tracing.""" + if correlation_id is None: + correlation_id = str(uuid.uuid4()) + + correlation_id_var.set(correlation_id) + return correlation_id + + +def get_correlation_id() -> str: + """Get current correlation ID.""" + return correlation_id_var.get() + + +def clear_correlation_id() -> None: + """Clear correlation ID.""" + correlation_id_var.set("") + + +class LogContext: + """Context manager for setting correlation ID.""" + + def __init__(self, correlation_id: Optional[str] = None): + self.correlation_id = correlation_id + self.previous_id = None + + def __enter__(self) -> str: + self.previous_id = get_correlation_id() + return set_correlation_id(self.correlation_id) + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + if self.previous_id: + correlation_id_var.set(self.previous_id) + else: + clear_correlation_id() + + +# Initialize logging configuration +configure_logging() + +# Create module logger +logger = get_logger(__name__) \ No newline at end of file diff --git a/src/portainer_core/utils/tokens.py b/src/portainer_core/utils/tokens.py new file mode 100644 index 0000000..2a31401 --- /dev/null +++ b/src/portainer_core/utils/tokens.py @@ -0,0 +1,231 @@ +""" +JWT token utilities for Portainer Core MCP Server. + +This module provides utilities for handling JWT tokens including validation, +expiration checking, and token management. +""" + +import json +import base64 +from datetime import datetime, timezone +from typing import Dict, Any, Optional + +from ..utils.logging import get_logger + + +logger = get_logger(__name__) + + +def decode_jwt_token(token: str) -> Optional[Dict[str, Any]]: + """ + Decode a JWT token without verification. + + Note: This function only decodes the token payload for inspection. + It does not verify the token signature. + + Args: + token: JWT token to decode + + Returns: + Decoded token payload or None if invalid + """ + try: + # Remove Bearer prefix if present + if token.startswith("Bearer "): + token = token[7:] + + # JWT tokens have 3 parts separated by dots + parts = token.split(".") + if len(parts) != 3: + logger.warning("Invalid JWT format: token does not have 3 parts") + return None + + # Decode the payload (second part) + payload = parts[1] + + # Add padding if needed (JWT base64 encoding may not include padding) + missing_padding = len(payload) % 4 + if missing_padding: + payload += "=" * (4 - missing_padding) + + # Decode from base64 + decoded_bytes = base64.urlsafe_b64decode(payload) + decoded_payload = json.loads(decoded_bytes.decode("utf-8")) + + return decoded_payload + + except Exception as e: + logger.error("Failed to decode JWT token", error=str(e)) + return None + + +def is_token_expired(token: str) -> bool: + """ + Check if a JWT token is expired. + + Args: + token: JWT token to check + + Returns: + True if token is expired or invalid + """ + try: + payload = decode_jwt_token(token) + if not payload: + return True + + # Check expiration time (exp claim) + exp = payload.get("exp") + if not exp: + # No expiration claim, assume token is valid + logger.warning("JWT token has no expiration claim") + return False + + # Convert exp (timestamp) to datetime + exp_datetime = datetime.fromtimestamp(exp, tz=timezone.utc) + current_datetime = datetime.now(tz=timezone.utc) + + is_expired = current_datetime >= exp_datetime + + if is_expired: + logger.info("JWT token is expired", + exp=exp_datetime.isoformat(), + now=current_datetime.isoformat()) + + return is_expired + + except Exception as e: + logger.error("Failed to check token expiration", error=str(e)) + return True + + +def get_token_claims(token: str) -> Dict[str, Any]: + """ + Get all claims from a JWT token. + + Args: + token: JWT token to parse + + Returns: + Dictionary of token claims + """ + payload = decode_jwt_token(token) + return payload or {} + + +def get_token_expiration(token: str) -> Optional[datetime]: + """ + Get the expiration time of a JWT token. + + Args: + token: JWT token to check + + Returns: + Expiration datetime or None if not available + """ + try: + payload = decode_jwt_token(token) + if not payload: + return None + + exp = payload.get("exp") + if not exp: + return None + + return datetime.fromtimestamp(exp, tz=timezone.utc) + + except Exception as e: + logger.error("Failed to get token expiration", error=str(e)) + return None + + +def get_token_subject(token: str) -> Optional[str]: + """ + Get the subject (user ID) from a JWT token. + + Args: + token: JWT token to parse + + Returns: + Subject claim value or None + """ + payload = decode_jwt_token(token) + if payload: + return payload.get("sub") + return None + + +def get_token_issuer(token: str) -> Optional[str]: + """ + Get the issuer from a JWT token. + + Args: + token: JWT token to parse + + Returns: + Issuer claim value or None + """ + payload = decode_jwt_token(token) + if payload: + return payload.get("iss") + return None + + +def is_token_valid_for_duration(token: str, min_duration_seconds: int = 300) -> bool: + """ + Check if a token is valid for at least the specified duration. + + Args: + token: JWT token to check + min_duration_seconds: Minimum remaining validity in seconds (default: 5 minutes) + + Returns: + True if token is valid for at least the specified duration + """ + try: + exp_time = get_token_expiration(token) + if not exp_time: + # No expiration time, assume it's valid + return True + + current_time = datetime.now(tz=timezone.utc) + remaining_seconds = (exp_time - current_time).total_seconds() + + return remaining_seconds >= min_duration_seconds + + except Exception as e: + logger.error("Failed to check token validity duration", error=str(e)) + return False + + +def format_token_info(token: str) -> str: + """ + Format token information for logging/debugging. + + Args: + token: JWT token to format + + Returns: + Formatted string with token information + """ + try: + payload = decode_jwt_token(token) + if not payload: + return "Invalid token" + + exp_time = get_token_expiration(token) + subject = get_token_subject(token) + issuer = get_token_issuer(token) + + info_parts = [] + if subject: + info_parts.append(f"sub: {subject}") + if issuer: + info_parts.append(f"iss: {issuer}") + if exp_time: + info_parts.append(f"exp: {exp_time.isoformat()}") + + return " | ".join(info_parts) if info_parts else "No standard claims found" + + except Exception as e: + return f"Error formatting token info: {str(e)}" \ No newline at end of file diff --git a/src/portainer_core_mcp.egg-info/PKG-INFO b/src/portainer_core_mcp.egg-info/PKG-INFO new file mode 100644 index 0000000..de5c578 --- /dev/null +++ b/src/portainer_core_mcp.egg-info/PKG-INFO @@ -0,0 +1,175 @@ +Metadata-Version: 2.4 +Name: portainer-core-mcp +Version: 0.1.0 +Summary: Portainer Core MCP Server - Authentication and User Management +Author-email: Your Name +License: MIT +Project-URL: Homepage, https://github.com/yourusername/portainer-core-mcp +Project-URL: Documentation, https://github.com/yourusername/portainer-core-mcp#readme +Project-URL: Repository, https://github.com/yourusername/portainer-core-mcp +Project-URL: Issues, https://github.com/yourusername/portainer-core-mcp/issues +Classifier: Development Status :: 3 - Alpha +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: MIT License +Classifier: Programming Language :: Python :: 3 +Classifier: Programming Language :: Python :: 3.8 +Classifier: Programming Language :: Python :: 3.9 +Classifier: Programming Language :: Python :: 3.10 +Classifier: Programming Language :: Python :: 3.11 +Classifier: Programming Language :: Python :: 3.12 +Requires-Python: >=3.8 +Description-Content-Type: text/markdown +Requires-Dist: mcp>=1.0.0 +Requires-Dist: httpx>=0.25.0 +Requires-Dist: pydantic>=2.0.0 +Requires-Dist: pydantic-settings>=2.0.0 +Requires-Dist: structlog>=23.0.0 +Requires-Dist: PyJWT>=2.8.0 +Requires-Dist: python-dotenv>=1.0.0 +Requires-Dist: tenacity>=8.0.0 +Provides-Extra: dev +Requires-Dist: pytest>=7.0.0; extra == "dev" +Requires-Dist: pytest-asyncio>=0.21.0; extra == "dev" +Requires-Dist: pytest-cov>=4.0.0; extra == "dev" +Requires-Dist: pytest-mock>=3.10.0; extra == "dev" +Requires-Dist: httpx-mock>=0.10.0; extra == "dev" +Requires-Dist: black>=23.0.0; extra == "dev" +Requires-Dist: isort>=5.12.0; extra == "dev" +Requires-Dist: flake8>=6.0.0; extra == "dev" +Requires-Dist: mypy>=1.0.0; extra == "dev" +Requires-Dist: pre-commit>=3.0.0; extra == "dev" + +# Portainer Core MCP Server + +A Model Context Protocol (MCP) server for Portainer Business Edition authentication and user management. + +## Features + +- **Authentication & Session Management**: JWT token handling and user authentication +- **User Management**: Create, read, update, and delete users +- **Settings Management**: Retrieve and update Portainer settings +- **Secure Token Handling**: Automatic token refresh and secure storage +- **Error Handling**: Comprehensive error handling with retry logic +- **Circuit Breaker**: Fault tolerance for external API calls + +## Installation + +```bash +pip install portainer-core-mcp +``` + +## Configuration + +Set the following environment variables: + +```bash +PORTAINER_URL=https://your-portainer-instance.com +PORTAINER_API_KEY=your-api-token # Optional, for API key authentication +PORTAINER_USERNAME=admin # For username/password authentication +PORTAINER_PASSWORD=your-password # For username/password authentication +``` + +## Usage + +### As MCP Server + +```bash +portainer-core-mcp +``` + +### Programmatic Usage + +```python +from portainer_core.server import PortainerCoreMCPServer + +server = PortainerCoreMCPServer() +# Use server instance +``` + +## Available MCP Tools + +- `authenticate` - Login with username/password +- `generate_token` - Generate API token +- `get_current_user` - Get authenticated user info +- `list_users` - List all users +- `create_user` - Create new user +- `update_user` - Update user details +- `delete_user` - Delete user +- `get_settings` - Get Portainer settings +- `update_settings` - Update settings + +## Development + +### Setup + +```bash +# Clone the repository +git clone https://github.com/yourusername/portainer-core-mcp.git +cd portainer-core-mcp + +# Install development dependencies +pip install -e ".[dev]" + +# Install pre-commit hooks +pre-commit install +``` + +### Testing + +```bash +# Run all tests +pytest + +# Run with coverage +pytest --cov=src/portainer_core --cov-report=html + +# Run only unit tests +pytest -m unit + +# Run only integration tests +pytest -m integration +``` + +### Code Quality + +```bash +# Format code +black src tests +isort src tests + +# Lint code +flake8 src tests + +# Type checking +mypy src +``` + +## Architecture + +The server follows a layered architecture: + +- **MCP Server Layer**: Handles MCP protocol communication +- **Service Layer**: Abstracts Portainer API interactions +- **Models Layer**: Defines data structures and validation +- **Utils Layer**: Provides utility functions and helpers + +## Security + +- All API communications use HTTPS +- JWT tokens are handled securely and never logged +- Input validation on all parameters +- Rate limiting to prevent abuse +- Circuit breaker pattern for fault tolerance + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Add tests for new functionality +5. Ensure all tests pass +6. Submit a pull request + +## License + +MIT License - see LICENSE file for details. diff --git a/src/portainer_core_mcp.egg-info/SOURCES.txt b/src/portainer_core_mcp.egg-info/SOURCES.txt new file mode 100644 index 0000000..dcad2f4 --- /dev/null +++ b/src/portainer_core_mcp.egg-info/SOURCES.txt @@ -0,0 +1,18 @@ +README.md +pyproject.toml +src/portainer_core/__init__.py +src/portainer_core/config.py +src/portainer_core/server.py +src/portainer_core/models/__init__.py +src/portainer_core/services/__init__.py +src/portainer_core/services/base.py +src/portainer_core/utils/__init__.py +src/portainer_core/utils/errors.py +src/portainer_core/utils/logging.py +src/portainer_core_mcp.egg-info/PKG-INFO +src/portainer_core_mcp.egg-info/SOURCES.txt +src/portainer_core_mcp.egg-info/dependency_links.txt +src/portainer_core_mcp.egg-info/entry_points.txt +src/portainer_core_mcp.egg-info/requires.txt +src/portainer_core_mcp.egg-info/top_level.txt +tests/test_basic.py \ No newline at end of file diff --git a/src/portainer_core_mcp.egg-info/dependency_links.txt b/src/portainer_core_mcp.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/portainer_core_mcp.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/portainer_core_mcp.egg-info/entry_points.txt b/src/portainer_core_mcp.egg-info/entry_points.txt new file mode 100644 index 0000000..1d8eacc --- /dev/null +++ b/src/portainer_core_mcp.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +portainer-core-mcp = portainer_core.server:main diff --git a/src/portainer_core_mcp.egg-info/requires.txt b/src/portainer_core_mcp.egg-info/requires.txt new file mode 100644 index 0000000..312cdcb --- /dev/null +++ b/src/portainer_core_mcp.egg-info/requires.txt @@ -0,0 +1,20 @@ +mcp>=1.0.0 +httpx>=0.25.0 +pydantic>=2.0.0 +pydantic-settings>=2.0.0 +structlog>=23.0.0 +PyJWT>=2.8.0 +python-dotenv>=1.0.0 +tenacity>=8.0.0 + +[dev] +pytest>=7.0.0 +pytest-asyncio>=0.21.0 +pytest-cov>=4.0.0 +pytest-mock>=3.10.0 +httpx-mock>=0.10.0 +black>=23.0.0 +isort>=5.12.0 +flake8>=6.0.0 +mypy>=1.0.0 +pre-commit>=3.0.0 diff --git a/src/portainer_core_mcp.egg-info/top_level.txt b/src/portainer_core_mcp.egg-info/top_level.txt new file mode 100644 index 0000000..661bb10 --- /dev/null +++ b/src/portainer_core_mcp.egg-info/top_level.txt @@ -0,0 +1 @@ +portainer_core diff --git a/tests/__pycache__/test_basic.cpython-312-pytest-7.4.3.pyc b/tests/__pycache__/test_basic.cpython-312-pytest-7.4.3.pyc new file mode 100644 index 0000000..bdac776 Binary files /dev/null and b/tests/__pycache__/test_basic.cpython-312-pytest-7.4.3.pyc differ diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..55623b8 --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,184 @@ +""" +Basic tests for Portainer Core MCP Server. + +This module contains basic tests to verify the setup and configuration. +""" + +import pytest +import os +from unittest.mock import patch + +from portainer_core.config import PortainerConfig, get_config +from portainer_core.utils.errors import PortainerError, PortainerAuthenticationError +from portainer_core.utils.logging import get_logger, set_correlation_id + + +class TestConfiguration: + """Test configuration management.""" + + def test_config_validation_with_api_key(self): + """Test configuration validation with API key.""" + with patch.dict(os.environ, { + 'PORTAINER_URL': 'https://test.example.com', + 'PORTAINER_API_KEY': 'test-api-key' + }): + config = PortainerConfig() + config.validate_auth_config() + assert config.portainer_url == 'https://test.example.com' + assert config.portainer_api_key == 'test-api-key' + assert config.use_api_key_auth is True + assert config.use_credentials_auth is False + + def test_config_validation_with_credentials(self): + """Test configuration validation with username/password.""" + with patch.dict(os.environ, { + 'PORTAINER_URL': 'https://test.example.com', + 'PORTAINER_USERNAME': 'admin', + 'PORTAINER_PASSWORD': 'password' + }): + config = PortainerConfig() + config.validate_auth_config() + assert config.portainer_url == 'https://test.example.com' + assert config.portainer_username == 'admin' + assert config.portainer_password == 'password' + assert config.use_api_key_auth is False + assert config.use_credentials_auth is True + + def test_config_validation_no_auth(self): + """Test configuration validation without authentication.""" + with patch.dict(os.environ, { + 'PORTAINER_URL': 'https://test.example.com' + }, clear=True): + config = PortainerConfig() + with pytest.raises(ValueError, match="Either PORTAINER_API_KEY or both"): + config.validate_auth_config() + + def test_invalid_url(self): + """Test invalid URL validation.""" + with patch.dict(os.environ, { + 'PORTAINER_URL': 'invalid-url', + 'PORTAINER_API_KEY': 'test-key' + }): + with pytest.raises(ValueError, match="Invalid URL format"): + PortainerConfig() + + def test_url_trailing_slash_removal(self): + """Test URL trailing slash removal.""" + with patch.dict(os.environ, { + 'PORTAINER_URL': 'https://test.example.com/', + 'PORTAINER_API_KEY': 'test-key' + }): + config = PortainerConfig() + assert config.portainer_url == 'https://test.example.com' + + def test_api_base_url(self): + """Test API base URL construction.""" + with patch.dict(os.environ, { + 'PORTAINER_URL': 'https://test.example.com', + 'PORTAINER_API_KEY': 'test-key' + }): + config = PortainerConfig() + assert config.api_base_url == 'https://test.example.com/api' + + +class TestErrors: + """Test error handling utilities.""" + + def test_portainer_error_basic(self): + """Test basic PortainerError.""" + error = PortainerError("Test error") + assert str(error) == "Test error" + assert error.message == "Test error" + assert error.status_code is None + assert error.details == {} + + def test_portainer_error_with_status_code(self): + """Test PortainerError with status code.""" + error = PortainerError("Test error", status_code=400) + assert str(error) == "[400] Test error" + assert error.status_code == 400 + + def test_portainer_authentication_error(self): + """Test PortainerAuthenticationError.""" + error = PortainerAuthenticationError() + assert error.status_code == 401 + assert "Authentication failed" in str(error) + + def test_error_mapping(self): + """Test HTTP error mapping.""" + from portainer_core.utils.errors import map_http_error + + error = map_http_error(404, "Not found") + assert error.__class__.__name__ == "PortainerNotFoundError" + assert error.status_code == 404 + + error = map_http_error(500, "Server error") + assert error.__class__.__name__ == "PortainerServerError" + assert error.status_code == 500 + + +class TestLogging: + """Test logging utilities.""" + + def test_get_logger(self): + """Test logger creation.""" + logger = get_logger("test") + assert logger is not None + + def test_correlation_id(self): + """Test correlation ID functionality.""" + correlation_id = set_correlation_id("test-id") + assert correlation_id == "test-id" + + from portainer_core.utils.logging import get_correlation_id + assert get_correlation_id() == "test-id" + + def test_correlation_id_auto_generation(self): + """Test automatic correlation ID generation.""" + correlation_id = set_correlation_id() + assert correlation_id is not None + assert len(correlation_id) > 0 + + +class TestCircuitBreaker: + """Test circuit breaker functionality.""" + + def test_circuit_breaker_initial_state(self): + """Test circuit breaker initial state.""" + from portainer_core.services.base import CircuitBreaker, CircuitBreakerState + + cb = CircuitBreaker() + assert cb.state == CircuitBreakerState.CLOSED + assert cb.can_execute() is True + assert cb.failure_count == 0 + + def test_circuit_breaker_failure_threshold(self): + """Test circuit breaker failure threshold.""" + from portainer_core.services.base import CircuitBreaker, CircuitBreakerState + + cb = CircuitBreaker(failure_threshold=2) + + # First failure + cb.record_failure() + assert cb.state == CircuitBreakerState.CLOSED + assert cb.can_execute() is True + + # Second failure - should open + cb.record_failure() + assert cb.state == CircuitBreakerState.OPEN + assert cb.can_execute() is False + + def test_circuit_breaker_success_reset(self): + """Test circuit breaker success reset.""" + from portainer_core.services.base import CircuitBreaker, CircuitBreakerState + + cb = CircuitBreaker() + cb.record_failure() + cb.record_success() + + assert cb.failure_count == 0 + assert cb.last_failure_time is None + + +if __name__ == "__main__": + pytest.main([__file__]) \ No newline at end of file