| phase | plan | type | wave | depends_on | files_modified | autonomous | requirements | must_haves |
|---|---|---|---|---|---|---|---|---|
| 04-rbac | 01 | execute | 1 | | | true | | |
Purpose: All backend authorization primitives must exist before the portal can implement role-based UI and before endpoint guards can be wired to existing routes.
Output: Migration 006, RBAC models, guard dependencies, invitation API, SMTP email utility, updated auth/verify response, unit tests.
<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/04-rbac/04-CONTEXT.md @.planning/phases/04-rbac/04-RESEARCH.md
From packages/shared/shared/models/auth.py:
class PortalUser(Base):
    __tablename__ = "portal_users"
    id: Mapped[uuid.UUID]
    email: Mapped[str]  # unique, indexed
    hashed_password: Mapped[str]
    name: Mapped[str]
    is_admin: Mapped[bool]  # TO BE REPLACED by role enum
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]
From packages/shared/shared/api/portal.py:
class AuthVerifyResponse(BaseModel):
    id: str
    email: str
    name: str
    is_admin: bool  # TO BE REPLACED by role + tenant_ids + active_tenant_id
portal_router = APIRouter(prefix="/api/portal", tags=["portal"])
From packages/shared/shared/config.py:
class Settings(BaseSettings):
    # Auth / Security section: add invite_secret and SMTP settings here
    auth_secret: str = Field(default="insecure-dev-secret-change-in-production")
From packages/shared/shared/db.py:
async def get_session() -> AsyncGenerator[AsyncSession, None]: ...
From packages/shared/shared/models/tenant.py:
class Base(DeclarativeBase): ...  # All ORM models inherit from this
class Tenant(Base):
    __tablename__ = "tenants"
    id: Mapped[uuid.UUID]
    name: Mapped[str]
From packages/shared/shared/models/audit.py:
class AuditEvent(Base): ...  # Reuse for impersonation logging in Plan 03
Update `packages/shared/shared/models/auth.py`:
- Add `UserRole(str, enum.Enum)` with PLATFORM_ADMIN, CUSTOMER_ADMIN, CUSTOMER_OPERATOR
- Replace `is_admin: Mapped[bool]` with `role: Mapped[str] = mapped_column(String(50), nullable=False, default="customer_admin")`
- Add `UserTenantRole(Base)` model with __tablename__ = "user_tenant_roles", UniqueConstraint("user_id", "tenant_id"), ForeignKey refs
- Add `PortalInvitation(Base)` model with __tablename__ = "portal_invitations", all fields matching migration
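As a minimal sketch of the `UserRole` enum described above: the member values are assumed to be the lowercase strings that appear elsewhere in this plan (`"platform_admin"`, `"customer_admin"`, `"customer_operator"`). The `str` mixin lets a member compare equal to the plain string stored in the `role` column.

```python
import enum


class UserRole(str, enum.Enum):
    """Portal roles; values are assumed to match the strings stored in portal_users.role."""

    PLATFORM_ADMIN = "platform_admin"
    CUSTOMER_ADMIN = "customer_admin"
    CUSTOMER_OPERATOR = "customer_operator"


# Because of the str mixin, a member compares equal to its raw string value.
is_default_role = UserRole.CUSTOMER_ADMIN == "customer_admin"

# Lookup by value turns a stored string back into an enum member.
stored_role = UserRole("platform_admin")
```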
Update `packages/shared/shared/config.py` — add to Settings:
- `invite_secret: str = Field(default="insecure-invite-secret-change-in-production")`
- `smtp_host: str = Field(default="localhost")`
- `smtp_port: int = Field(default=587)`
- `smtp_username: str = Field(default="")`
- `smtp_password: str = Field(default="")`
- `smtp_from_email: str = Field(default="noreply@konstruct.dev")`
- Do not add a `portal_base_url` field. `portal_url` already exists in Settings (used for Stripe redirects); reuse it for invite link URL construction.
cd /home/adelorenzo/repos/konstruct && python -c "from shared.models.auth import PortalUser, UserTenantRole, PortalInvitation, UserRole; print('Models OK'); assert hasattr(PortalUser, 'role'); assert not hasattr(PortalUser, 'is_admin')"
Migration 006 exists with upgrade/downgrade. PortalUser has role (not is_admin). UserTenantRole and PortalInvitation models exist. Settings has invite_secret and SMTP fields.
Task 2: RBAC guard dependencies + invite token + email utility + invitation API
packages/shared/shared/api/rbac.py,
packages/shared/shared/invite_token.py,
packages/shared/shared/email.py,
packages/shared/shared/api/invitations.py,
packages/shared/shared/api/portal.py,
packages/gateway/gateway/main.py
- require_platform_admin raises 403 for non-platform_admin callers
- require_platform_admin returns PortalCaller for platform_admin
- require_tenant_admin raises 403 for operators and non-members
- require_tenant_admin allows platform_admin to bypass tenant membership check
- require_tenant_member allows all three roles if they have tenant membership (or are platform_admin)
- require_tenant_member raises 403 for users with no membership in the target tenant
- generate_invite_token produces a base64url-encoded HMAC-signed token
- validate_invite_token rejects tampered signatures (ValueError)
- validate_invite_token rejects expired tokens (ValueError)
- validate_invite_token returns invitation_id for valid tokens
- POST /api/portal/invitations creates invitation, returns 201
- POST /api/portal/invitations/accept accepts invitation, creates user, returns 200
- POST /api/portal/invitations/{id}/resend resets expiry and returns new token
- AuthVerifyResponse returns role, tenant_ids, active_tenant_id instead of is_admin
Create `packages/shared/shared/api/rbac.py`:
- `PortalCaller` dataclass with user_id (UUID), role (str), tenant_id (UUID | None)
- `get_portal_caller()` — reads X-Portal-User-Id, X-Portal-User-Role, X-Portal-Tenant-Id headers. Returns PortalCaller. Raises 401 on invalid user_id format.
- `require_platform_admin(caller: PortalCaller)` — raises 403 if role != "platform_admin"
- `require_tenant_admin(tenant_id: UUID, caller: PortalCaller, session: AsyncSession)` — platform_admin bypasses (return immediately). customer_admin checks UserTenantRole membership. Others get 403.
- `require_tenant_member(tenant_id: UUID, caller: PortalCaller, session: AsyncSession)` — platform_admin bypasses. customer_admin or customer_operator checks UserTenantRole membership. Returns PortalCaller.
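The role logic of the three guards can be sketched without FastAPI or a database. This is an illustration under stated assumptions, not the planned implementation: `Forbidden` is a hypothetical stand-in for `HTTPException(status_code=403)`, and the `memberships` set replaces the `UserTenantRole` query that the real guards would run through the `AsyncSession`.

```python
from dataclasses import dataclass
from typing import Optional, Set
from uuid import UUID, uuid4


class Forbidden(Exception):
    """Hypothetical stand-in for fastapi.HTTPException(status_code=403)."""

    status_code = 403


@dataclass(frozen=True)
class PortalCaller:
    user_id: UUID
    role: str
    tenant_id: Optional[UUID] = None


def require_platform_admin(caller: PortalCaller) -> PortalCaller:
    if caller.role != "platform_admin":
        raise Forbidden("platform admin required")
    return caller


def require_tenant_admin(tenant_id: UUID, caller: PortalCaller, memberships: Set[UUID]) -> PortalCaller:
    # Assumption: `memberships` holds the tenant IDs the caller has a UserTenantRole row for.
    if caller.role == "platform_admin":
        return caller  # bypasses the membership check entirely
    if caller.role == "customer_admin" and tenant_id in memberships:
        return caller
    raise Forbidden("tenant admin required")


def require_tenant_member(tenant_id: UUID, caller: PortalCaller, memberships: Set[UUID]) -> PortalCaller:
    if caller.role == "platform_admin":
        return caller
    if caller.role in {"customer_admin", "customer_operator"} and tenant_id in memberships:
        return caller
    raise Forbidden("tenant membership required")


# Usage: an operator is a member of the tenant but not an admin of it.
tenant = uuid4()
operator = PortalCaller(user_id=uuid4(), role="customer_operator")
member_ok = require_tenant_member(tenant, operator, {tenant}) is operator
```

Returning the caller from each guard keeps the sketch usable as a dependency whose result is injected into the route handler.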
Create `packages/shared/shared/invite_token.py`:
- `generate_invite_token(invitation_id: str) -> str` — HMAC-SHA256 with settings.invite_secret, embeds invitation_id:timestamp, base64url-encodes result
- `validate_invite_token(token: str) -> str` — decodes, verifies HMAC with hmac.compare_digest (timing-safe), checks 48h TTL, returns invitation_id. Raises ValueError on tamper or expiry.
- Uses `from shared.config import settings` for invite_secret
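The token scheme above might look roughly like the following. Several details are assumptions made for illustration: the payload layout `invitation_id:timestamp:signature`, the hex-encoded signature, and the module-level `INVITE_SECRET` constant, which stands in for `settings.invite_secret`.

```python
import base64
import hashlib
import hmac
import time

# Assumption: stands in for settings.invite_secret from shared.config.
INVITE_SECRET = "insecure-invite-secret-change-in-production"
INVITE_TTL_SECONDS = 48 * 3600  # 48h TTL from the plan


def _sign(payload: str) -> str:
    """Return the hex HMAC-SHA256 of the payload under the invite secret."""
    return hmac.new(INVITE_SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest()


def generate_invite_token(invitation_id: str) -> str:
    payload = f"{invitation_id}:{int(time.time())}"
    raw = f"{payload}:{_sign(payload)}"
    return base64.urlsafe_b64encode(raw.encode()).decode()


def validate_invite_token(token: str) -> str:
    try:
        raw = base64.urlsafe_b64decode(token.encode("ascii")).decode("utf-8")
        invitation_id, issued_at_text, signature = raw.rsplit(":", 2)
        issued_at = int(issued_at_text)
    except ValueError as exc:  # covers bad base64, bad UTF-8, and a wrong field count
        raise ValueError("malformed invite token") from exc

    expected = _sign(f"{invitation_id}:{issued_at_text}")
    # compare_digest runs in constant time, so the check leaks no timing information.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        raise ValueError("invalid invite token signature")
    if time.time() - issued_at > INVITE_TTL_SECONDS:
        raise ValueError("invite token expired")
    return invitation_id
```

The signature is checked before the expiry, so a forged timestamp cannot be used to probe the TTL logic.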
Create `packages/shared/shared/email.py`:
- `send_invite_email(to_email, invitee_name, tenant_name, invite_url)` — sync function using smtplib + email.mime. Subject: "You've been invited to join {tenant_name} on Konstruct". Reads SMTP config from settings. Called from Celery task.
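A sketch of the sync email utility using only `smtplib` and `email.mime`. It differs from the plan in ways chosen purely for illustration: `SmtpSettings` is a hypothetical stand-in for the SMTP fields on `settings`, it is passed in as an extra `cfg` argument, and the body text, the `build_invite_message` helper, and the use of STARTTLS on port 587 are all assumptions.

```python
import logging
import smtplib
from dataclasses import dataclass
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

logger = logging.getLogger(__name__)


@dataclass
class SmtpSettings:
    """Hypothetical stand-in for the SMTP fields on shared.config.settings."""

    smtp_host: str = "localhost"
    smtp_port: int = 587
    smtp_username: str = ""
    smtp_password: str = ""
    smtp_from_email: str = "noreply@konstruct.dev"


def build_invite_message(to_email: str, invitee_name: str, tenant_name: str,
                         invite_url: str, cfg: SmtpSettings) -> MIMEMultipart:
    msg = MIMEMultipart("alternative")
    msg["Subject"] = f"You've been invited to join {tenant_name} on Konstruct"
    msg["From"] = cfg.smtp_from_email
    msg["To"] = to_email
    body = f"Hi {invitee_name},\n\nAccept your invitation here: {invite_url}\n"
    msg.attach(MIMEText(body, "plain", "utf-8"))
    return msg


def send_invite_email(to_email: str, invitee_name: str, tenant_name: str,
                      invite_url: str, cfg: SmtpSettings) -> bool:
    """Send the invite synchronously; return False (without raising) if SMTP is unconfigured."""
    if not cfg.smtp_host:
        logger.warning("SMTP not configured; skipping invite email to %s", to_email)
        return False
    msg = build_invite_message(to_email, invitee_name, tenant_name, invite_url, cfg)
    with smtplib.SMTP(cfg.smtp_host, cfg.smtp_port, timeout=10) as server:
        server.starttls()  # assumption: the server on port 587 supports STARTTLS
        if cfg.smtp_username:
            server.login(cfg.smtp_username, cfg.smtp_password)
        server.send_message(msg)
    return True
```

Separating message construction from sending lets unit tests check the subject and body without a running SMTP server.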
Create `packages/shared/shared/api/invitations.py`:
- `invitations_router = APIRouter(prefix="/api/portal/invitations", tags=["invitations"])`
- POST `/` — accepts {email, name, role, tenant_id}. Requires tenant admin (use require_tenant_admin). Creates PortalInvitation row, generates token, stores SHA-256 hash of token in token_hash column, dispatches Celery email task (fire-and-forget), returns invitation details + raw token in response (for display/copy in UI). Returns 201.
- POST `/accept` — accepts {token, password}. Validates token via validate_invite_token. Looks up invitation by id, checks status='pending' and not expired. Creates PortalUser with bcrypt password + role from invitation. Creates UserTenantRole row. Updates invitation status='accepted'. All in one transaction. Returns 200 with user details.
- POST `/{invitation_id}/resend` — requires tenant admin. Generates new token, updates token_hash and expires_at (+48h from now), re-dispatches email Celery task. Returns 200.
- GET `/` — requires tenant admin. Lists pending invitations for the caller's active tenant. Returns list.
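The token hashing, the accept preconditions, and the resend update can be illustrated without the ORM. `InvitationRow`, `check_acceptable`, and `apply_resend` are hypothetical names used only for this sketch; the real endpoints would operate on `PortalInvitation` rows inside a transaction.

```python
import hashlib
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


def hash_token(token: str) -> str:
    """SHA-256 hex digest of the raw token, as stored in the token_hash column."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


@dataclass
class InvitationRow:
    """Hypothetical stand-in for a PortalInvitation ORM row."""

    status: str
    expires_at: datetime
    token_hash: str


def check_acceptable(invitation: InvitationRow, now: datetime) -> None:
    """Raise ValueError unless the invitation is pending and not yet expired."""
    if invitation.status != "pending":
        raise ValueError("invitation is not pending")
    if now >= invitation.expires_at:
        raise ValueError("invitation has expired")


def apply_resend(invitation: InvitationRow, new_token: str, now: datetime) -> None:
    """Replace the stored hash and push the expiry 48h out from now."""
    invitation.token_hash = hash_token(new_token)
    invitation.expires_at = now + timedelta(hours=48)


# Usage: an expired invitation becomes acceptable again after a resend.
now = datetime.now(timezone.utc)
invitation = InvitationRow("pending", now - timedelta(hours=1), hash_token("old-token"))
apply_resend(invitation, "new-token", now)
check_acceptable(invitation, now)  # no exception after the resend
```

Storing only the hash means a leaked database row cannot be replayed as a valid invite link.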
Add Celery task in orchestrator or define inline:
- Add `send_invite_email_task` to an appropriate location. Per codebase pattern, Celery tasks are sync def. The task calls `send_invite_email()` directly. If SMTP is not configured (empty smtp_host), log a warning and skip — do not crash.
Update `packages/shared/shared/api/portal.py`:
- Change `AuthVerifyResponse` to return `role: str`, `tenant_ids: list[str]`, `active_tenant_id: str | None` instead of `is_admin: bool`
- Update `verify_credentials` endpoint: after finding user, query `user_tenant_roles` for all tenant_ids where user has membership. For platform_admin, return all tenant IDs (query tenants table). Return role + tenant_ids + active_tenant_id (first tenant or None).
- Update `AuthRegisterResponse` similarly (replace is_admin with role)
- Gate `/auth/register` behind platform_admin only (add Depends(require_platform_admin)) with a deprecation comment noting invite-only is the standard flow.
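The tenant scoping rule for the verify response can be sketched as a pure function. `resolve_tenant_scope` is a hypothetical helper name; in the endpoint, the two ID lists would come from queries against `user_tenant_roles` and `tenants`.

```python
from typing import List, Optional, Sequence, Tuple


def resolve_tenant_scope(
    role: str,
    member_tenant_ids: Sequence[str],
    all_tenant_ids: Sequence[str],
) -> Tuple[List[str], Optional[str]]:
    """Return (tenant_ids, active_tenant_id) for the auth/verify response.

    Platform admins see every tenant; other roles see only their memberships.
    The active tenant defaults to the first visible tenant, or None if there is none.
    """
    tenant_ids = list(all_tenant_ids) if role == "platform_admin" else list(member_tenant_ids)
    active_tenant_id = tenant_ids[0] if tenant_ids else None
    return tenant_ids, active_tenant_id


# Usage: a customer admin who belongs to one of two tenants.
scope = resolve_tenant_scope("customer_admin", ["t-2"], ["t-1", "t-2"])
```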
Mount invitations_router in `packages/gateway/gateway/main.py`:
- `from shared.api.invitations import invitations_router`
- `app.include_router(invitations_router)`
cd /home/adelorenzo/repos/konstruct && python -c "from shared.api.rbac import PortalCaller, require_platform_admin, require_tenant_admin, require_tenant_member; from shared.invite_token import generate_invite_token, validate_invite_token; from shared.api.invitations import invitations_router; print('All imports OK')"
RBAC guards raise 403 for unauthorized callers. Invite tokens are HMAC-signed with 48h TTL. Invitation API supports create/accept/resend/list. Auth verify returns role+tenant_ids. Invitations router mounted on gateway.
Task 3: Unit tests for RBAC guards and invitation system
tests/unit/test_rbac_guards.py,
tests/unit/test_invitations.py
- test_platform_admin_passes: platform_admin caller gets through require_platform_admin
- test_non_admin_rejected: customer_admin and customer_operator get 403 from require_platform_admin
- test_tenant_admin_own_tenant: customer_admin with membership passes require_tenant_admin
- test_tenant_admin_other_tenant: customer_admin without membership gets 403
- test_platform_admin_bypasses_tenant_check: platform_admin passes require_tenant_admin without membership
- test_operator_rejected_from_admin: customer_operator gets 403 from require_tenant_admin
- test_tenant_member_all_roles: all three roles with membership pass require_tenant_member
- test_token_roundtrip: generate then validate returns same invitation_id
- test_token_tamper_rejected: modified token raises ValueError
- test_token_expired_rejected: token older than 48h raises ValueError
- test_invite_accept_creates_user: accepting invite creates PortalUser + UserTenantRole
- test_invite_accept_rejects_expired: expired invitation returns error
- test_invite_resend_updates_token: resend generates new token_hash and extends expires_at
Create `tests/unit/test_rbac_guards.py`:
- Test require_platform_admin with PortalCaller(role="platform_admin") — should return caller
- Test require_platform_admin with PortalCaller(role="customer_admin") — should raise HTTPException 403
- Test require_platform_admin with PortalCaller(role="customer_operator") — should raise HTTPException 403
- Test require_tenant_admin: mock session with UserTenantRole row for customer_admin — should pass
- Test require_tenant_admin: mock session with no membership — should raise 403
- Test require_tenant_admin: platform_admin caller — should bypass membership check entirely
- Test require_tenant_member: customer_operator with membership — should pass
- Test require_tenant_member: customer_admin with membership — should pass
- Test require_tenant_member: no membership — should raise 403
For tenant membership tests, mock the AsyncSession.execute() to return appropriate results.
Use `pytest.raises(HTTPException)` and assert `.status_code == 403`.
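One way to mock `AsyncSession.execute()` for the membership tests is with `unittest.mock.AsyncMock`. This sketch assumes the guard reads the membership row through `scalar_one_or_none()` on the query result; `has_membership` is a hypothetical stand-in for that lookup, and the statement string is a placeholder.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_session(membership_row):
    """Build a fake AsyncSession whose execute() yields the given membership row (or None)."""
    result = MagicMock()
    result.scalar_one_or_none.return_value = membership_row
    session = MagicMock()
    session.execute = AsyncMock(return_value=result)
    return session


async def has_membership(session) -> bool:
    """Hypothetical stand-in for the membership lookup inside a tenant guard."""
    result = await session.execute("membership query placeholder")
    return result.scalar_one_or_none() is not None


# Usage: one session that finds a row, one that finds nothing.
found = asyncio.run(has_membership(make_session(object())))
missing = asyncio.run(has_membership(make_session(None)))
```

The same `make_session` helper can back both the "should pass" and "should raise 403" cases by switching the row between an object and `None`.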
Create `tests/unit/test_invitations.py`:
- Test generate_invite_token + validate_invite_token roundtrip with a UUID string
- Test validate_invite_token with a manually tampered signature — should raise ValueError
- Test validate_invite_token with an artificially old timestamp (patch time.time to simulate expiry) — should raise ValueError
- Test invitation creation via `httpx.AsyncClient` hitting POST /api/portal/invitations (mock DB session, mock Celery task)
- Test invitation acceptance via POST /api/portal/invitations/accept (mock DB session with pending invitation)
- Test resend updates token_hash and extends expires_at
Follow existing test patterns: use `make_app(session)` factory from tests/ or direct httpx.AsyncClient with app.
cd /home/adelorenzo/repos/konstruct && pytest tests/unit/test_rbac_guards.py tests/unit/test_invitations.py -x -v
All RBAC guard unit tests pass. All invitation token and API unit tests pass. Coverage includes platform_admin bypass, tenant membership checks, token tampering, and expiry validation.
- `pytest tests/unit/test_rbac_guards.py tests/unit/test_invitations.py -x -v` — all pass
- `python -c "from shared.models.auth import UserRole; assert len(UserRole) == 3"` — enum has 3 values
- `python -c "from shared.api.rbac import require_platform_admin"` — guard imports clean
- `python -c "from shared.invite_token import generate_invite_token, validate_invite_token"` — token utils import clean
<success_criteria>
- Migration 006 exists and handles is_admin -> role backfill
- PortalUser model has role field, no is_admin
- UserTenantRole and PortalInvitation models exist
- RBAC guards (require_platform_admin, require_tenant_admin, require_tenant_member) implemented
- Invitation API (create, accept, resend, list) endpoints exist
- HMAC invite tokens generate and validate with 48h TTL
- SMTP email utility exists (sync, for Celery)
- Auth/verify returns role + tenant_ids
- All unit tests pass </success_criteria>