- rbac.py: PortalCaller dataclass + get_portal_caller dependency (header-based)
- rbac.py: require_platform_admin (403 for non-platform_admin)
- rbac.py: require_tenant_admin (platform_admin bypasses; customer_admin
checks UserTenantRole; operator always rejected)
- rbac.py: require_tenant_member (platform_admin bypasses; all roles
checked against UserTenantRole)
- invite_token.py: generate_invite_token (HMAC-SHA256, base64url, 48h TTL)
- invite_token.py: validate_invite_token (timing-safe compare_digest, TTL check)
- invite_token.py: token_to_hash (SHA-256 for DB storage)
- email.py: send_invite_email (sync smtplib, skips if smtp_host empty)
- invitations.py: POST /api/portal/invitations (create, requires tenant admin)
- invitations.py: POST /api/portal/invitations/accept (accept invitation)
- invitations.py: POST /api/portal/invitations/{id}/resend (regenerate token)
- invitations.py: GET /api/portal/invitations (list pending)
- portal.py: AuthVerifyResponse now returns role+tenant_ids+active_tenant_id
- portal.py: auth/register gated behind require_platform_admin
- tasks.py: send_invite_email_task Celery task (fire-and-forget)
- gateway/main.py: invitations_router mounted
107 lines
2.7 KiB
Python
107 lines
2.7 KiB
Python
"""
|
|
HMAC-signed invite token generation and validation.

Tokens encode `{invitation_id}:{timestamp}` signed with HMAC-SHA256
using settings.invite_secret. The raw token is base64url-encoded so
it's safe to include in URLs and emails.

Token format (before base64url encoding):
    {invitation_id}:{timestamp_int}:{hmac_hex}

TTL: 48 hours. Tokens are single-use — the caller must mark the
invitation as 'accepted' or 'revoked' after use.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import time
|
|
|
|
from shared.config import settings
|
|
|
|
# Maximum token age in seconds; validate_invite_token rejects tokens whose
# embedded timestamp is older than this.
_TTL_SECONDS = 48 * 3600 # 48 hours
|
|
|
|
|
|
def generate_invite_token(invitation_id: str) -> str:
    """
    Build a signed, URL-safe invite token for an invitation.

    The token text is ``{invitation_id}:{issue_time}:{signature}``, where
    the signature is computed by ``_sign`` over the first two fields, and
    the whole text is then base64url-encoded.

    Args:
        invitation_id: UUID string of the PortalInvitation row.

    Returns:
        A URL-safe base64-encoded token string.
    """
    # Issue time as whole seconds since the epoch; validation uses it
    # for the TTL check.
    issued_at = int(time.time())
    signed_part = f"{invitation_id}:{issued_at}"
    token_text = ":".join((signed_part, _sign(signed_part)))
    encoded = base64.urlsafe_b64encode(token_text.encode())
    return encoded.decode()
|
|
|
|
|
|
def validate_invite_token(token: str) -> str:
    """
    Validate an invite token and return the invitation_id.

    The token is base64url-decoded, split into
    ``{invitation_id}:{timestamp}:{signature}``, its signature is checked
    with a constant-time comparison, and finally its age is compared
    against ``_TTL_SECONDS``.

    Args:
        token: The base64url-encoded token from generate_invite_token.

    Returns:
        The invitation_id embedded in the token.

    Raises:
        ValueError: If the token is tampered, malformed, or expired.
    """
    # Any failure while decoding (bad padding, invalid UTF-8, ...) is
    # reported uniformly as a ValueError, per the documented contract.
    try:
        raw = base64.urlsafe_b64decode(token.encode()).decode()
    except Exception as exc:
        raise ValueError("Invalid token encoding") from exc

    parts = raw.split(":")
    if len(parts) != 3:
        raise ValueError("Malformed token: expected 3 parts")

    invitation_id, ts_str, sig = parts

    try:
        ts = int(ts_str)
    except ValueError as exc:
        raise ValueError("Malformed token: invalid timestamp") from exc

    # hmac.compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters. The signature field is attacker-controlled, so
    # reject such input here as a ValueError instead of letting a TypeError
    # escape. A genuine signature is a hex digest and therefore pure ASCII.
    try:
        sig_bytes = sig.encode("ascii")
    except UnicodeEncodeError as exc:
        raise ValueError("Invalid token signature") from exc

    # Timing-safe signature verification (bytes on both sides)
    expected_payload = f"{invitation_id}:{ts_str}"
    expected_sig = _sign(expected_payload)
    if not hmac.compare_digest(sig_bytes, expected_sig.encode("ascii")):
        raise ValueError("Invalid token signature")

    # TTL check — performed only after the signature is trusted
    now = int(time.time())
    if now - ts > _TTL_SECONDS:
        raise ValueError("Token expired")

    return invitation_id
|
|
|
|
|
|
def token_to_hash(token: str) -> str:
    """
    Return the SHA-256 fingerprint of a raw invite token.

    Args:
        token: The raw base64url-encoded token.

    Returns:
        Hex-encoded SHA-256 digest of the token's encoded bytes.
    """
    hasher = hashlib.sha256()
    hasher.update(token.encode())
    return hasher.hexdigest()
|
|
|
|
|
|
def _sign(payload: str) -> str:
    """Compute the HMAC-SHA256 of ``payload`` keyed with settings.invite_secret.

    Returns:
        The MAC as a hex digest string.
    """
    key_bytes = settings.invite_secret.encode()
    message_bytes = payload.encode()
    mac = hmac.new(key=key_bytes, msg=message_bytes, digestmod=hashlib.sha256)
    return mac.hexdigest()
|