Files
konstruct/.planning/phases/03-operator-experience/03-01-PLAN.md

19 KiB

phase, plan, type, wave, depends_on, files_modified, autonomous, requirements, must_haves
phase plan type wave depends_on files_modified autonomous requirements must_haves
03-operator-experience 01 execute 1
packages/shared/pyproject.toml
packages/portal/package.json
packages/shared/shared/config.py
packages/shared/shared/models/billing.py
packages/shared/shared/models/tenant.py
packages/shared/shared/api/billing.py
packages/shared/shared/api/channels.py
packages/shared/shared/api/usage.py
packages/shared/shared/crypto.py
packages/orchestrator/orchestrator/agents/runner.py
packages/orchestrator/orchestrator/audit/logger.py
migrations/versions/005_billing_and_usage.py
tests/unit/test_key_encryption.py
tests/unit/test_slack_oauth.py
tests/unit/test_stripe_webhooks.py
tests/unit/test_usage_aggregation.py
tests/unit/test_budget_alerts.py
true
AGNT-07
LLM-03
PRTA-03
PRTA-05
PRTA-06
truths artifacts key_links
Audit events for LLM calls include prompt_tokens, completion_tokens, cost_usd, and provider in metadata
BYO API keys can be encrypted and decrypted without data loss using Fernet
Slack OAuth state can be HMAC-signed and verified for CSRF protection
Stripe webhook events can be processed idempotently
Token usage can be aggregated per agent and per provider from audit events
Budget alerts trigger at 80% and 100% thresholds
path provides
packages/shared/shared/models/billing.py TenantLlmKey model, StripeEvent model, billing field mixins
path provides
packages/shared/shared/crypto.py KeyEncryptionService with Fernet encrypt/decrypt/rotate
path provides
packages/shared/shared/api/channels.py Slack OAuth state generation/verification, OAuth callback endpoint
path provides
packages/shared/shared/api/billing.py Stripe webhook handler, checkout session creation, billing portal session
path provides
packages/shared/shared/api/usage.py Usage aggregation endpoints (per-agent tokens, per-provider cost, budget alerts)
path provides
migrations/versions/005_billing_and_usage.py DB migration for billing fields, tenant_llm_keys, stripe_events, audit index, agent budget_limit_usd
from to via pattern
packages/orchestrator/orchestrator/agents/runner.py packages/shared/shared/models/audit.py log_llm_call metadata includes token counts and cost prompt_tokens.*completion_tokens.*cost_usd
from to via pattern
packages/shared/shared/api/usage.py audit_events table JSONB aggregate queries on metadata fields metadata.*prompt_tokens.*cost_usd
from to via pattern
packages/shared/shared/crypto.py PLATFORM_ENCRYPTION_KEY env var Fernet key loaded at init MultiFernet
Backend foundation for Phase 3: database migrations, dependency installs, audit trail token metadata, encryption service, and all backend API endpoints for billing, channel connection, and usage aggregation.

Purpose: Every portal UI feature in Phase 3 depends on backend APIs and database schema. This plan ships all backend infrastructure so Plans 02-04 can focus on frontend. Output: New DB tables/fields, billing + channel + usage API endpoints, encryption service, enhanced audit logger, and comprehensive test scaffolds.

<execution_context> @/home/adelorenzo/.claude/get-shit-done/workflows/execute-plan.md @/home/adelorenzo/.claude/get-shit-done/templates/summary.md </execution_context>

@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/03-operator-experience/03-CONTEXT.md @.planning/phases/03-operator-experience/03-RESEARCH.md

From packages/shared/shared/models/tenant.py:

class Tenant(Base):
    __tablename__ = "tenants"
    id: Mapped[uuid.UUID]
    name: Mapped[str]
    slug: Mapped[str]
    settings: Mapped[dict[str, Any]]
    created_at: Mapped[datetime]
    updated_at: Mapped[datetime]
    agents: Mapped[list[Agent]] = relationship(...)
    channel_connections: Mapped[list[ChannelConnection]] = relationship(...)

class Agent(Base):
    __tablename__ = "agents"
    id: Mapped[uuid.UUID]
    tenant_id: Mapped[uuid.UUID]
    name: Mapped[str]
    role: Mapped[str]
    is_active: Mapped[bool]
    # ... other fields

class ChannelConnection(Base):
    __tablename__ = "channel_connections"
    # channel_type, workspace_id, tenant_id, config (JSON), is_active

From packages/shared/shared/models/audit.py:

class AuditEvent(AuditBase):
    __tablename__ = "audit_events"
    id: Mapped[uuid.UUID]
    tenant_id: Mapped[uuid.UUID]
    agent_id: Mapped[uuid.UUID | None]
    action_type: Mapped[str]  # "llm_call" | "tool_invocation" | "escalation"
    metadata: Mapped[dict[str, Any]]  # JSONB
    created_at: Mapped[datetime]

From packages/shared/shared/config.py:

class Settings(BaseSettings):
    # existing: database_url, redis_url, slack_bot_token, slack_signing_secret, ...

From packages/orchestrator/orchestrator/audit/logger.py:

class AuditLogger:
    async def log_llm_call(self, tenant_id, agent_id, user_id, input_summary, output_summary, latency_ms, metadata=None)
    async def log_tool_call(self, tool_name, args, result, tenant_id, agent_id, latency_ms, error=None)
    async def log_escalation(self, ...)

From packages/shared/shared/api/portal.py:

portal_router = APIRouter(prefix="/api/portal")
# Existing: /auth/verify, /auth/register, /tenants CRUD, /tenants/{id}/agents CRUD
Task 1: Database migrations, models, encryption service, and test scaffolds packages/shared/pyproject.toml, packages/portal/package.json, packages/shared/shared/config.py, packages/shared/shared/models/billing.py, packages/shared/shared/models/tenant.py, packages/shared/shared/crypto.py, migrations/versions/005_billing_and_usage.py, tests/unit/test_key_encryption.py, tests/unit/test_budget_alerts.py - test_encrypt_decrypt_roundtrip: KeyEncryptionService.encrypt(plaintext) -> decrypt -> returns original plaintext - test_encrypt_produces_different_ciphertext: encrypt("key") != encrypt("key") (Fernet uses random IV) - test_decrypt_invalid_raises: decrypt("garbage") raises InvalidToken - test_multifernet_rotation: rotate(old_ciphertext) produces new ciphertext decryptable by current key - test_budget_alert_no_limit: agent with budget_limit_usd=None -> no alert - test_budget_alert_under_threshold: usage at 50% -> status "ok" - test_budget_alert_warning: usage at 80% -> status "warning" - test_budget_alert_exceeded: usage at 100%+ -> status "exceeded" 1. Install Python dependencies: `uv add stripe cryptography` in packages/shared/pyproject.toml 2. Install Node dependencies: `npm install recharts @stripe/stripe-js stripe` in packages/portal/
3. Create `packages/shared/shared/models/billing.py`:
   - `TenantLlmKey` model: id (UUID PK), tenant_id (FK tenants.id CASCADE), provider (TEXT NOT NULL), label (TEXT NOT NULL), encrypted_key (TEXT NOT NULL), key_version (INT DEFAULT 1), created_at. UNIQUE(tenant_id, provider). Use AuditBase (same as audit_events — separate declarative base).
   - `StripeEvent` model: event_id (TEXT PK), processed_at (TIMESTAMPTZ DEFAULT now()). Use Base from tenant.py.
   - Note: tenant_llm_keys needs RLS enabled (same pattern as agents table).

4. Add billing fields to `Tenant` model in tenant.py:
   - stripe_customer_id: Mapped[str | None] (String(255), nullable=True)
   - stripe_subscription_id: Mapped[str | None] (String(255), nullable=True)
   - stripe_subscription_item_id: Mapped[str | None] (String(255), nullable=True)
   - subscription_status: Mapped[str] (String(50), default="none") — values: none, trialing, active, past_due, canceled, unpaid
   - trial_ends_at: Mapped[datetime | None] (DateTime(timezone=True), nullable=True)
   - agent_quota: Mapped[int] (Integer, default=0)

5. Add budget field to `Agent` model in tenant.py:
   - budget_limit_usd: Mapped[float | None] (Float, nullable=True, default=None) — NULL means no limit

6. Create `packages/shared/shared/crypto.py` — KeyEncryptionService:
   - Uses MultiFernet with PLATFORM_ENCRYPTION_KEY (required) and PLATFORM_ENCRYPTION_KEY_PREVIOUS (optional)
   - Methods: encrypt(plaintext: str) -> str, decrypt(ciphertext: str) -> str, rotate(ciphertext: str) -> str
   - See research Pattern 4 for exact implementation

7. Add to `packages/shared/shared/config.py`:
   - platform_encryption_key: str = Field(default="", description="Fernet key for BYO API key encryption")
   - platform_encryption_key_previous: str = Field(default="", description="Previous Fernet key for rotation")
   - stripe_secret_key: str = Field(default="", description="Stripe secret API key")
   - stripe_webhook_secret: str = Field(default="", description="Stripe webhook endpoint secret")
   - stripe_per_agent_price_id: str = Field(default="", description="Stripe Price ID for per-agent monthly plan")
   - portal_url: str = Field(default="http://localhost:3000", description="Portal base URL for Stripe redirects")
   - slack_client_id: str = Field(default="", description="Slack OAuth app client ID")
   - slack_client_secret: str = Field(default="", description="Slack OAuth app client secret")
   - slack_oauth_redirect_uri: str = Field(default="http://localhost:3000/api/slack/callback", description="Slack OAuth redirect URI")
   - oauth_state_secret: str = Field(default="", description="HMAC secret for OAuth state parameter signing")

8. Create Alembic migration `005_billing_and_usage.py`:
   - ADD COLUMNS to tenants: stripe_customer_id, stripe_subscription_id, stripe_subscription_item_id, subscription_status, trial_ends_at, agent_quota
   - ADD COLUMN to agents: budget_limit_usd
   - CREATE TABLE tenant_llm_keys with RLS enabled (same FORCE ROW LEVEL SECURITY pattern as agents)
   - CREATE TABLE stripe_events (event_id TEXT PK, processed_at TIMESTAMPTZ DEFAULT now())
   - CREATE INDEX idx_audit_events_tenant_type_created ON audit_events (tenant_id, action_type, created_at DESC)
   - GRANT SELECT, INSERT on tenant_llm_keys to konstruct_app
   - GRANT SELECT, INSERT on stripe_events to konstruct_app

9. Write test scaffolds:
   - tests/unit/test_key_encryption.py — test encrypt/decrypt roundtrip, rotation, invalid ciphertext
   - tests/unit/test_budget_alerts.py — test threshold logic (no limit, under 80%, at 80%, at 100%+)
cd /home/adelorenzo/repos/konstruct && pytest tests/unit/test_key_encryption.py tests/unit/test_budget_alerts.py -x -v - stripe and cryptography in shared pyproject.toml, recharts and @stripe/stripe-js in portal package.json - Tenant model has billing fields, Agent model has budget_limit_usd - TenantLlmKey and StripeEvent models exist in billing.py - KeyEncryptionService passes encrypt/decrypt/rotate tests - Budget alert threshold logic passes at all levels - Alembic migration 005 exists with all schema changes - Config has all new settings fields Task 2: Backend API endpoints — channels, billing, usage aggregation, and audit logger enhancement packages/shared/shared/api/channels.py, packages/shared/shared/api/billing.py, packages/shared/shared/api/usage.py, packages/shared/shared/api/portal.py, packages/orchestrator/orchestrator/agents/runner.py, tests/unit/test_slack_oauth.py, tests/unit/test_stripe_webhooks.py, tests/unit/test_usage_aggregation.py - test_generate_oauth_state: generate_oauth_state(tenant_id, secret) produces base64-encoded string containing tenant_id - test_verify_oauth_state_valid: verify_oauth_state(valid_state, secret) returns correct tenant_id - test_verify_oauth_state_tampered: verify_oauth_state(tampered_state, secret) raises ValueError - test_stripe_webhook_idempotency: processing same event_id twice returns "already_processed" on second call - test_stripe_subscription_updated: customer.subscription.updated event updates tenant subscription_status - test_stripe_cancellation: customer.subscription.deleted event sets status=canceled and deactivates agents - test_usage_group_by_agent: aggregation query groups prompt_tokens, completion_tokens, cost_usd by agent_id - test_usage_group_by_provider: aggregation query groups cost_usd by provider 1. Create `packages/shared/shared/api/channels.py`: - `generate_oauth_state(tenant_id: str, secret: str) -> str` — HMAC-SHA256 signed state with nonce (see research Pattern 1) - `verify_oauth_state(state: str, secret: str) -> str` — returns tenant_id or raises ValueError - `GET /api/portal/channels/slack/install?tenant_id={id}` — generates state, returns Slack OAuth authorize URL with scopes: app_mentions:read,channels:read,channels:history,chat:write,im:read,im:write,im:history - `GET /api/portal/channels/slack/callback?code={code}&state={state}` — verifies state, exchanges code via POST to https://slack.com/api/oauth.v2.access, encrypts bot_token with KeyEncryptionService, stores in channel_connections (channel_type="slack", workspace_id=team.id, config={bot_token, bot_user_id, team_name}) - `POST /api/portal/channels/whatsapp/connect` — accepts {tenant_id, phone_number_id, waba_id, system_user_token}, validates by calling GET https://graph.facebook.com/v22.0/{phone_number_id} with token, encrypts token, stores in channel_connections - `POST /api/portal/channels/{tenant_id}/test` — accepts {channel_type}, loads channel_connection for tenant, sends test message ("Konstruct connected successfully") via the appropriate channel SDK, returns success/failure
2. Create `packages/shared/shared/api/billing.py`:
   - `POST /api/portal/billing/checkout` — accepts {tenant_id, agent_count}, creates Stripe Customer if none exists (lazy creation per research recommendation), creates Checkout Session with mode="subscription", trial_period_days=14, quantity=agent_count, returns session.url
   - `POST /api/portal/billing/portal` — accepts {tenant_id}, creates Stripe Billing Portal session, returns portal_session.url
   - `POST /api/webhooks/stripe` — Stripe webhook handler: reads raw body with request.body(), verifies signature with stripe.Webhook.construct_event(), checks idempotency via StripeEvent table (INSERT ON CONFLICT DO NOTHING), dispatches to handler per event type:
     - checkout.session.completed: store subscription_id, subscription_item_id, set status
     - customer.subscription.updated: update subscription_status, agent_quota, trial_ends_at
     - customer.subscription.deleted: set status=canceled, set Agent.is_active=False for all tenant agents
     - invoice.paid: set status=active, re-enable agents
     - invoice.payment_failed: set status=past_due
   - Use StripeClient pattern (not legacy stripe.api_key): `client = stripe.StripeClient(api_key=settings.stripe_secret_key)`

3. Create `packages/shared/shared/api/usage.py`:
   - `GET /api/portal/usage/{tenant_id}/summary?start_date={}&end_date={}` — returns per-agent token usage and cost (SQL aggregate on audit_events WHERE action_type='llm_call', GROUP BY agent_id). Use CAST(:metadata AS jsonb) pattern for asyncpg.
   - `GET /api/portal/usage/{tenant_id}/by-provider?start_date={}&end_date={}` — returns cost grouped by provider
   - `GET /api/portal/usage/{tenant_id}/message-volume?start_date={}&end_date={}` — returns message count grouped by channel
   - `GET /api/portal/usage/{tenant_id}/budget-alerts` — for each agent with budget_limit_usd, compare current month cost_usd sum against limit. Return status: "ok" (<80%), "warning" (80-99%), "exceeded" (>=100%).
   - Include the composite index from migration 005 for performance.

4. Register new routers in the appropriate main.py files. Add channels_router, billing_router, and usage_router to the FastAPI app. The stripe webhook route should be on a separate prefix (/api/webhooks/stripe) without auth.

5. Enhance audit logger — in `packages/orchestrator/orchestrator/agents/runner.py`, extend the metadata dict passed to `log_llm_call()` to include:
   - prompt_tokens: extracted from LiteLLM response usage object
   - completion_tokens: extracted from LiteLLM response usage object
   - total_tokens: prompt + completion
   - cost_usd: use litellm.completion_cost() if available, otherwise estimate from model pricing table
   - provider: extract from model string (e.g., "anthropic/claude-sonnet-4" -> "anthropic")
   These fields are CRITICAL — the cost dashboard (Plan 04) queries them from audit_events.metadata JSONB.

6. Write test files:
   - tests/unit/test_slack_oauth.py — test state generation, verification, and tampered state rejection
   - tests/unit/test_stripe_webhooks.py — test idempotency (duplicate event skipped), subscription update, cancellation with agent deactivation
   - tests/unit/test_usage_aggregation.py — test per-agent grouping and per-provider grouping with mock audit data
cd /home/adelorenzo/repos/konstruct && pytest tests/unit/test_slack_oauth.py tests/unit/test_stripe_webhooks.py tests/unit/test_usage_aggregation.py -x -v - Slack OAuth install URL generation and callback exchange work (state HMAC verified) - WhatsApp manual connect endpoint validates token and stores encrypted - Test message endpoint sends via appropriate channel - Stripe checkout session creation, billing portal session, and webhook handler all functional - Webhook idempotency prevents duplicate processing - Subscription cancellation deactivates all tenant agents - Usage aggregation returns per-agent and per-provider data from audit_events - Budget alerts return correct status for each threshold level - Audit logger now includes token counts and cost in LLM call metadata - All unit tests pass All Wave 0 test scaffolds created and passing: - `pytest tests/unit/test_key_encryption.py -x` — Fernet encrypt/decrypt/rotate - `pytest tests/unit/test_budget_alerts.py -x` — threshold logic - `pytest tests/unit/test_slack_oauth.py -x` — OAuth state HMAC - `pytest tests/unit/test_stripe_webhooks.py -x` — idempotency, status updates, cancellation - `pytest tests/unit/test_usage_aggregation.py -x` — SQL aggregates - `pytest tests/unit -x -q` — full unit suite still green

<success_criteria>

  • All 5 test files pass with 0 failures
  • Alembic migration 005 exists and is syntactically valid
  • New API routers registered and importable
  • KeyEncryptionService encrypt/decrypt roundtrip works
  • Audit logger metadata includes prompt_tokens, completion_tokens, cost_usd, provider
  • Existing test suite remains green </success_criteria>
After completion, create `.planning/phases/03-operator-experience/03-01-SUMMARY.md`