- Add stripe and cryptography to shared pyproject.toml - Add recharts, @stripe/stripe-js, stripe to portal package.json (submodule) - Add billing fields to Tenant model (stripe_customer_id, subscription_status, agent_quota, trial_ends_at) - Add budget_limit_usd to Agent model - Create TenantLlmKey and StripeEvent models in billing.py (AuditBase and Base respectively) - Create KeyEncryptionService (MultiFernet encrypt/decrypt/rotate) in crypto.py - Create compute_budget_status helper in usage.py (threshold logic: ok/warning/exceeded) - Add platform_encryption_key, stripe_, slack_oauth settings to config.py - Create Alembic migration 005 with all schema changes, RLS, grants, and composite index - All 12 tests passing (key encryption roundtrip, rotation, budget thresholds)
89 lines
3.0 KiB
Python
89 lines
3.0 KiB
Python
"""
|
|
Unit tests for KeyEncryptionService (Fernet-based encryption of BYO API keys).
|
|
|
|
Tests:
|
|
- encrypt/decrypt roundtrip
|
|
- different ciphertexts produced from same plaintext (Fernet random IV)
|
|
- invalid ciphertext raises InvalidToken
|
|
- MultiFernet rotation produces new ciphertext decryptable by current key
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
|
|
from shared.crypto import KeyEncryptionService
|
|
|
|
|
|
@pytest.fixture()
|
|
def primary_key() -> str:
|
|
"""Generate a fresh Fernet key for each test."""
|
|
return Fernet.generate_key().decode()
|
|
|
|
|
|
@pytest.fixture()
|
|
def secondary_key() -> str:
|
|
"""Second Fernet key for rotation tests."""
|
|
return Fernet.generate_key().decode()
|
|
|
|
|
|
def test_encrypt_decrypt_roundtrip(primary_key: str) -> None:
|
|
"""Encrypt then decrypt returns the original plaintext."""
|
|
svc = KeyEncryptionService(primary_key=primary_key)
|
|
plaintext = "sk-my-secret-api-key-12345"
|
|
|
|
ciphertext = svc.encrypt(plaintext)
|
|
result = svc.decrypt(ciphertext)
|
|
|
|
assert result == plaintext
|
|
|
|
|
|
def test_encrypt_produces_different_ciphertext(primary_key: str) -> None:
|
|
"""Same plaintext encrypted twice produces different ciphertexts (Fernet random IV)."""
|
|
svc = KeyEncryptionService(primary_key=primary_key)
|
|
plaintext = "sk-same-plaintext"
|
|
|
|
ct1 = svc.encrypt(plaintext)
|
|
ct2 = svc.encrypt(plaintext)
|
|
|
|
assert ct1 != ct2
|
|
# Both must still decrypt to the same value
|
|
assert svc.decrypt(ct1) == plaintext
|
|
assert svc.decrypt(ct2) == plaintext
|
|
|
|
|
|
def test_decrypt_invalid_raises(primary_key: str) -> None:
|
|
"""Decrypting garbage raises InvalidToken (or ValueError wrapping it)."""
|
|
svc = KeyEncryptionService(primary_key=primary_key)
|
|
|
|
with pytest.raises((InvalidToken, ValueError)):
|
|
svc.decrypt("this-is-not-valid-fernet-ciphertext")
|
|
|
|
|
|
def test_multifernet_rotation(primary_key: str, secondary_key: str) -> None:
|
|
"""
|
|
Rotation scenario:
|
|
1. Encrypt with 'old' key (secondary_key as primary, no previous)
|
|
2. Create new service with primary_key=new and previous=old
|
|
3. rotate(old_ciphertext) produces a new ciphertext decryptable by primary_key
|
|
"""
|
|
# Step 1: encrypt with the old (secondary) key
|
|
old_svc = KeyEncryptionService(primary_key=secondary_key)
|
|
plaintext = "sk-rotate-me"
|
|
old_ciphertext = old_svc.encrypt(plaintext)
|
|
|
|
# Step 2: new service knows both keys — primary=new, previous=old
|
|
new_svc = KeyEncryptionService(primary_key=primary_key, previous_key=secondary_key)
|
|
|
|
# Verify old ciphertext is still decryptable via the new service (previous key fallback)
|
|
assert new_svc.decrypt(old_ciphertext) == plaintext
|
|
|
|
# Step 3: rotate — re-encrypt with the primary key
|
|
rotated_ciphertext = new_svc.rotate(old_ciphertext)
|
|
assert rotated_ciphertext != old_ciphertext
|
|
|
|
# Rotated ciphertext must be decryptable by a service with only the new primary key
|
|
only_new_svc = KeyEncryptionService(primary_key=primary_key)
|
|
assert only_new_svc.decrypt(rotated_ciphertext) == plaintext
|