feat(03-01): DB migrations, models, encryption service, and test scaffolds
- Add stripe and cryptography to shared pyproject.toml - Add recharts, @stripe/stripe-js, stripe to portal package.json (submodule) - Add billing fields to Tenant model (stripe_customer_id, subscription_status, agent_quota, trial_ends_at) - Add budget_limit_usd to Agent model - Create TenantLlmKey and StripeEvent models in billing.py (AuditBase and Base respectively) - Create KeyEncryptionService (MultiFernet encrypt/decrypt/rotate) in crypto.py - Create compute_budget_status helper in usage.py (threshold logic: ok/warning/exceeded) - Add platform_encryption_key, stripe_, slack_oauth settings to config.py - Create Alembic migration 005 with all schema changes, RLS, grants, and composite index - All 12 tests passing (key encryption roundtrip, rotation, budget thresholds)
This commit is contained in:
88
tests/unit/test_key_encryption.py
Normal file
88
tests/unit/test_key_encryption.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""
|
||||
Unit tests for KeyEncryptionService (Fernet-based encryption of BYO API keys).
|
||||
|
||||
Tests:
|
||||
- encrypt/decrypt roundtrip
|
||||
- different ciphertexts produced from same plaintext (Fernet random IV)
|
||||
- invalid ciphertext raises InvalidToken
|
||||
- MultiFernet rotation produces new ciphertext decryptable by current key
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
from shared.crypto import KeyEncryptionService
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def primary_key() -> str:
|
||||
"""Generate a fresh Fernet key for each test."""
|
||||
return Fernet.generate_key().decode()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def secondary_key() -> str:
|
||||
"""Second Fernet key for rotation tests."""
|
||||
return Fernet.generate_key().decode()
|
||||
|
||||
|
||||
def test_encrypt_decrypt_roundtrip(primary_key: str) -> None:
|
||||
"""Encrypt then decrypt returns the original plaintext."""
|
||||
svc = KeyEncryptionService(primary_key=primary_key)
|
||||
plaintext = "sk-my-secret-api-key-12345"
|
||||
|
||||
ciphertext = svc.encrypt(plaintext)
|
||||
result = svc.decrypt(ciphertext)
|
||||
|
||||
assert result == plaintext
|
||||
|
||||
|
||||
def test_encrypt_produces_different_ciphertext(primary_key: str) -> None:
|
||||
"""Same plaintext encrypted twice produces different ciphertexts (Fernet random IV)."""
|
||||
svc = KeyEncryptionService(primary_key=primary_key)
|
||||
plaintext = "sk-same-plaintext"
|
||||
|
||||
ct1 = svc.encrypt(plaintext)
|
||||
ct2 = svc.encrypt(plaintext)
|
||||
|
||||
assert ct1 != ct2
|
||||
# Both must still decrypt to the same value
|
||||
assert svc.decrypt(ct1) == plaintext
|
||||
assert svc.decrypt(ct2) == plaintext
|
||||
|
||||
|
||||
def test_decrypt_invalid_raises(primary_key: str) -> None:
|
||||
"""Decrypting garbage raises InvalidToken (or ValueError wrapping it)."""
|
||||
svc = KeyEncryptionService(primary_key=primary_key)
|
||||
|
||||
with pytest.raises((InvalidToken, ValueError)):
|
||||
svc.decrypt("this-is-not-valid-fernet-ciphertext")
|
||||
|
||||
|
||||
def test_multifernet_rotation(primary_key: str, secondary_key: str) -> None:
|
||||
"""
|
||||
Rotation scenario:
|
||||
1. Encrypt with 'old' key (secondary_key as primary, no previous)
|
||||
2. Create new service with primary_key=new and previous=old
|
||||
3. rotate(old_ciphertext) produces a new ciphertext decryptable by primary_key
|
||||
"""
|
||||
# Step 1: encrypt with the old (secondary) key
|
||||
old_svc = KeyEncryptionService(primary_key=secondary_key)
|
||||
plaintext = "sk-rotate-me"
|
||||
old_ciphertext = old_svc.encrypt(plaintext)
|
||||
|
||||
# Step 2: new service knows both keys — primary=new, previous=old
|
||||
new_svc = KeyEncryptionService(primary_key=primary_key, previous_key=secondary_key)
|
||||
|
||||
# Verify old ciphertext is still decryptable via the new service (previous key fallback)
|
||||
assert new_svc.decrypt(old_ciphertext) == plaintext
|
||||
|
||||
# Step 3: rotate — re-encrypt with the primary key
|
||||
rotated_ciphertext = new_svc.rotate(old_ciphertext)
|
||||
assert rotated_ciphertext != old_ciphertext
|
||||
|
||||
# Rotated ciphertext must be decryptable by a service with only the new primary key
|
||||
only_new_svc = KeyEncryptionService(primary_key=primary_key)
|
||||
assert only_new_svc.decrypt(rotated_ciphertext) == plaintext
|
||||
Reference in New Issue
Block a user