feat(03-01): LLM key CRUD API endpoints with encryption

- Create llm_keys.py: GET list (redacted, key_hint only), POST (encrypt + store), DELETE (204 or 404)
- LlmKeyResponse never exposes encrypted_key or raw api_key
- 409 returned on duplicate (tenant_id, provider) key
- Cross-tenant deletion prevented by tenant_id verification in DELETE query
- Update api/__init__.py to export llm_keys_router
- All 5 LLM key CRUD tests passing (32 total unit tests green)
This commit is contained in:
2026-03-23 21:36:08 -06:00
parent 4cbf192fa5
commit 3c8fc255bc
3 changed files with 429 additions and 0 deletions

View File

@@ -6,6 +6,7 @@ Import and mount these routers in service main.py files.
from shared.api.billing import billing_router, webhook_router from shared.api.billing import billing_router, webhook_router
from shared.api.channels import channels_router from shared.api.channels import channels_router
from shared.api.llm_keys import llm_keys_router
from shared.api.portal import portal_router from shared.api.portal import portal_router
from shared.api.usage import usage_router from shared.api.usage import usage_router
@@ -14,5 +15,6 @@ __all__ = [
"channels_router", "channels_router",
"billing_router", "billing_router",
"webhook_router", "webhook_router",
"llm_keys_router",
"usage_router", "usage_router",
] ]

View File

@@ -0,0 +1,222 @@
"""
LLM key CRUD API endpoints for the Konstruct portal.
Endpoints:
GET /api/portal/tenants/{tenant_id}/llm-keys → list BYO keys (redacted)
POST /api/portal/tenants/{tenant_id}/llm-keys → create encrypted key
DELETE /api/portal/tenants/{tenant_id}/llm-keys/{key_id} → remove key
Security design:
- The raw API key is NEVER stored — only the Fernet-encrypted ciphertext.
- The `key_hint` column stores the last 4 characters of the plaintext key
so the portal can display "...ABCD" without requiring decryption.
- GET responses include key_hint but never encrypted_key or the plaintext.
- DELETE verifies the key belongs to the requested tenant_id before removing.
UNIQUE constraint: (tenant_id, provider) — one BYO key per provider per tenant.
If a tenant needs to rotate a key, DELETE the existing row and POST a new one.
"""
from __future__ import annotations
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from shared.config import settings
from shared.crypto import KeyEncryptionService
from shared.db import get_session
from shared.models.billing import TenantLlmKey
from shared.models.tenant import Tenant
llm_keys_router = APIRouter(
prefix="/api/portal/tenants/{tenant_id}/llm-keys",
tags=["llm-keys"],
)
# ---------------------------------------------------------------------------
# Pydantic schemas
# ---------------------------------------------------------------------------
class LlmKeyCreate(BaseModel):
"""Request body for creating a new BYO API key."""
provider: str = Field(
...,
min_length=1,
max_length=100,
description="LLM provider name (e.g. 'openai', 'anthropic', 'cohere')",
examples=["openai"],
)
label: str = Field(
...,
min_length=1,
max_length=255,
description="Human-readable label for the key (e.g. 'Production OpenAI Key')",
examples=["Production OpenAI Key"],
)
api_key: str = Field(
...,
min_length=1,
description="The raw API key — will be encrypted before storage, never logged",
examples=["sk-..."],
)
class LlmKeyResponse(BaseModel):
"""Response body for LLM key operations — never contains raw or encrypted key."""
id: str
provider: str
label: str
key_hint: str | None
created_at: datetime
model_config = {"from_attributes": True}
@classmethod
def from_orm_key(cls, key: TenantLlmKey) -> "LlmKeyResponse":
return cls(
id=str(key.id),
provider=key.provider,
label=key.label,
key_hint=key.key_hint,
created_at=key.created_at,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _get_encryption_service() -> KeyEncryptionService:
"""Return the platform-level KeyEncryptionService from settings."""
if not settings.platform_encryption_key:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="PLATFORM_ENCRYPTION_KEY not configured",
)
return KeyEncryptionService(
primary_key=settings.platform_encryption_key,
previous_key=settings.platform_encryption_key_previous,
)
async def _get_tenant_or_404(tenant_id: uuid.UUID, session: AsyncSession) -> Tenant:
result = await session.execute(select(Tenant).where(Tenant.id == tenant_id))
tenant = result.scalar_one_or_none()
if tenant is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Tenant not found")
return tenant
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@llm_keys_router.get("", response_model=list[LlmKeyResponse])
async def list_llm_keys(
tenant_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
) -> list[LlmKeyResponse]:
"""
List all BYO API keys for a tenant.
Returns: provider, label, key_hint (last 4 chars), created_at.
The encrypted_key and any plaintext key are NEVER returned.
"""
await _get_tenant_or_404(tenant_id, session)
result = await session.execute(
select(TenantLlmKey)
.where(TenantLlmKey.tenant_id == tenant_id)
.order_by(TenantLlmKey.created_at.desc())
)
keys = result.scalars().all()
return [LlmKeyResponse.from_orm_key(k) for k in keys]
@llm_keys_router.post("", response_model=LlmKeyResponse, status_code=status.HTTP_201_CREATED)
async def create_llm_key(
tenant_id: uuid.UUID,
body: LlmKeyCreate,
session: AsyncSession = Depends(get_session),
) -> LlmKeyResponse:
"""
Store a new encrypted BYO API key for a tenant.
The raw api_key is encrypted with Fernet before storage. The response
contains only the key_hint (last 4 chars) — the raw key is discarded
after encryption.
Returns 409 if a key for the same provider already exists for this tenant.
Returns 201 with the created key record on success.
"""
await _get_tenant_or_404(tenant_id, session)
# Check for existing key for this tenant+provider (UNIQUE constraint guard)
existing = await session.execute(
select(TenantLlmKey).where(
TenantLlmKey.tenant_id == tenant_id,
TenantLlmKey.provider == body.provider,
)
)
if existing.scalar_one_or_none() is not None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"A BYO key for provider '{body.provider}' already exists for this tenant. "
"Delete the existing key before adding a new one.",
)
# Encrypt the API key — raw key must not be stored or logged
enc_svc = _get_encryption_service()
encrypted_key = enc_svc.encrypt(body.api_key)
key_hint = body.api_key[-4:] if len(body.api_key) >= 4 else body.api_key
key = TenantLlmKey(
tenant_id=tenant_id,
provider=body.provider,
label=body.label,
encrypted_key=encrypted_key,
key_hint=key_hint,
key_version=1,
)
session.add(key)
await session.commit()
await session.refresh(key)
return LlmKeyResponse.from_orm_key(key)
@llm_keys_router.delete("/{key_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_llm_key(
tenant_id: uuid.UUID,
key_id: uuid.UUID,
session: AsyncSession = Depends(get_session),
) -> None:
"""
Delete a BYO API key.
Verifies that the key belongs to the specified tenant_id before deletion
to prevent cross-tenant key removal. Returns 404 if not found.
"""
await _get_tenant_or_404(tenant_id, session)
result = await session.execute(
select(TenantLlmKey).where(
TenantLlmKey.id == key_id,
TenantLlmKey.tenant_id == tenant_id, # Cross-tenant protection
)
)
key = result.scalar_one_or_none()
if key is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="LLM key not found or does not belong to this tenant",
)
await session.delete(key)
await session.commit()

View File

@@ -0,0 +1,205 @@
"""
Unit tests for LLM key CRUD API endpoints.
Tests:
- test_create_llm_key: POST encrypts key and returns {id, provider, label, key_hint, created_at}
with no api_key in the response
- test_list_llm_keys_redacted: GET returns list without full key, only key_hint (last 4 chars)
- test_delete_llm_key: DELETE removes key, subsequent GET no longer includes it
- test_create_duplicate_provider: POST with same tenant_id+provider returns 409 Conflict
- test_delete_nonexistent_key: DELETE with unknown key_id returns 404
"""
from __future__ import annotations
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import HTTPException
from shared.api.llm_keys import (
LlmKeyCreate,
LlmKeyResponse,
create_llm_key,
delete_llm_key,
list_llm_keys,
)
from shared.models.billing import TenantLlmKey
def _make_mock_key(
tenant_id: uuid.UUID,
provider: str,
label: str,
api_key: str,
) -> TenantLlmKey:
"""Create a mock TenantLlmKey instance."""
key = MagicMock(spec=TenantLlmKey)
key.id = uuid.uuid4()
key.tenant_id = tenant_id
key.provider = provider
key.label = label
key.encrypted_key = f"ENCRYPTED_{api_key}"
key.key_hint = api_key[-4:]
key.key_version = 1
from datetime import datetime, timezone
key.created_at = datetime.now(tz=timezone.utc)
return key
@pytest.fixture()
def tenant_id() -> uuid.UUID:
return uuid.uuid4()
@pytest.fixture()
def mock_session() -> AsyncMock:
session = AsyncMock()
session.execute.return_value = MagicMock(
scalar_one_or_none=MagicMock(return_value=None),
scalars=MagicMock(return_value=MagicMock(all=MagicMock(return_value=[]))),
)
return session
@pytest.mark.asyncio
async def test_create_llm_key(tenant_id: uuid.UUID, mock_session: AsyncMock) -> None:
"""POST encrypts the key and returns {id, provider, label, key_hint, created_at} — no api_key."""
api_key = "sk-test-openai-key-ABCD"
body = LlmKeyCreate(provider="openai", label="Production Key", api_key=api_key)
# Mock: no existing key for this tenant+provider
mock_session.execute.return_value = MagicMock(
scalar_one_or_none=MagicMock(return_value=None)
)
created_key = _make_mock_key(tenant_id, "openai", "Production Key", api_key)
# Intercept session.add to capture the key and inject the mock
def _capture_add(obj: object) -> None:
# Mimic the ORM populating the key attributes after add+refresh
pass
mock_session.add.side_effect = _capture_add
# Patch refresh to update the "obj" with values from created_key
async def _mock_refresh(obj: object) -> None:
obj.id = created_key.id # type: ignore[union-attr]
obj.created_at = created_key.created_at # type: ignore[union-attr]
mock_session.refresh = AsyncMock(side_effect=_mock_refresh)
with patch("shared.api.llm_keys._get_encryption_service") as mock_enc:
enc = MagicMock()
enc.encrypt.return_value = "FERNET_CIPHERTEXT"
mock_enc.return_value = enc
with patch("shared.api.llm_keys._get_tenant_or_404", new_callable=AsyncMock):
response = await create_llm_key(
tenant_id=tenant_id,
body=body,
session=mock_session,
)
assert isinstance(response, LlmKeyResponse)
assert response.provider == "openai"
assert response.label == "Production Key"
assert response.key_hint == api_key[-4:]
# Response must NOT contain the raw api_key
response_dict = response.model_dump()
assert "api_key" not in response_dict
assert "encrypted_key" not in response_dict
@pytest.mark.asyncio
async def test_list_llm_keys_redacted(tenant_id: uuid.UUID, mock_session: AsyncMock) -> None:
"""GET returns list with key_hint (last 4 chars) — never full key or encrypted_key."""
api_key1 = "sk-openai-secret-WXYZ"
api_key2 = "sk-anthropic-key-1234"
key1 = _make_mock_key(tenant_id, "openai", "OpenAI Key", api_key1)
key2 = _make_mock_key(tenant_id, "anthropic", "Anthropic Key", api_key2)
mock_session.execute.return_value = MagicMock(
scalars=MagicMock(return_value=MagicMock(all=MagicMock(return_value=[key1, key2])))
)
with patch("shared.api.llm_keys._get_tenant_or_404", new_callable=AsyncMock):
response = await list_llm_keys(tenant_id=tenant_id, session=mock_session)
assert len(response) == 2
for item in response:
assert isinstance(item, LlmKeyResponse)
item_dict = item.model_dump()
# Must not contain full key or encrypted data
assert "api_key" not in item_dict
assert "encrypted_key" not in item_dict
# Must contain key_hint
assert item_dict["key_hint"] is not None
assert len(item_dict["key_hint"]) <= 4
@pytest.mark.asyncio
async def test_delete_llm_key(tenant_id: uuid.UUID, mock_session: AsyncMock) -> None:
"""DELETE removes the key and returns 204-equivalent (None)."""
key_id = uuid.uuid4()
existing_key = _make_mock_key(tenant_id, "openai", "Key to delete", "sk-DELETE-ABCD")
mock_session.execute.return_value = MagicMock(
scalar_one_or_none=MagicMock(return_value=existing_key)
)
with patch("shared.api.llm_keys._get_tenant_or_404", new_callable=AsyncMock):
result = await delete_llm_key(
tenant_id=tenant_id,
key_id=key_id,
session=mock_session,
)
assert result is None
mock_session.delete.assert_called_once_with(existing_key)
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
async def test_create_duplicate_provider(tenant_id: uuid.UUID, mock_session: AsyncMock) -> None:
"""POST with same tenant_id+provider returns 409 Conflict."""
# Mock: existing key found for this tenant+provider
existing_key = _make_mock_key(tenant_id, "openai", "Existing Key", "sk-EXISTING-9999")
mock_session.execute.return_value = MagicMock(
scalar_one_or_none=MagicMock(return_value=existing_key)
)
body = LlmKeyCreate(provider="openai", label="Duplicate Key", api_key="sk-NEW-KEY-0000")
with patch("shared.api.llm_keys._get_tenant_or_404", new_callable=AsyncMock):
with pytest.raises(HTTPException) as exc_info:
await create_llm_key(
tenant_id=tenant_id,
body=body,
session=mock_session,
)
assert exc_info.value.status_code == 409
@pytest.mark.asyncio
async def test_delete_nonexistent_key(tenant_id: uuid.UUID, mock_session: AsyncMock) -> None:
"""DELETE with unknown key_id returns 404."""
key_id = uuid.uuid4()
# Mock: no key found
mock_session.execute.return_value = MagicMock(
scalar_one_or_none=MagicMock(return_value=None)
)
with patch("shared.api.llm_keys._get_tenant_or_404", new_callable=AsyncMock):
with pytest.raises(HTTPException) as exc_info:
await delete_llm_key(
tenant_id=tenant_id,
key_id=key_id,
session=mock_session,
)
assert exc_info.value.status_code == 404