feat(01-foundation-01): monorepo scaffolding, Docker Compose, and shared data models

- pyproject.toml: uv workspace with 5 member packages (shared, gateway, router, orchestrator, llm-pool)
- docker-compose.yml: PostgreSQL 16 + Redis 7 + Ollama services on konstruct-net
- .env.example: all required env vars documented, konstruct_app role (not superuser)
- scripts/init-db.sh: creates konstruct_app role at DB init time
- packages/shared/shared/config.py: Pydantic Settings loading all env vars
- packages/shared/shared/models/message.py: KonstructMessage, ChannelType, SenderInfo, MessageContent
- packages/shared/shared/models/tenant.py: Tenant, Agent, ChannelConnection SQLAlchemy 2.0 models
- packages/shared/shared/models/auth.py: PortalUser model for admin portal auth
- packages/shared/shared/db.py: async SQLAlchemy engine, session factory, get_session dependency
- packages/shared/shared/rls.py: current_tenant_id ContextVar and configure_rls_hook with parameterized SET LOCAL
- packages/shared/shared/redis_keys.py: tenant-namespaced key constructors (rate_limit, idempotency, session, engaged_thread)
This commit is contained in:
2026-03-23 09:49:28 -06:00
parent d611a07cc2
commit 5714acf741
19 changed files with 3935 additions and 0 deletions

View File

@@ -0,0 +1,22 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "konstruct-gateway"
version = "0.1.0"
description = "Channel Gateway — unified ingress for all messaging platforms"
requires-python = ">=3.12"
dependencies = [
"konstruct-shared",
"fastapi[standard]>=0.115.0",
"slack-bolt>=1.22.0",
"python-telegram-bot>=21.0",
"httpx>=0.28.0",
]
[tool.uv.sources]
konstruct-shared = { workspace = true }
[tool.hatch.build.targets.wheel]
packages = ["gateway"]

View File

@@ -0,0 +1,20 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "konstruct-llm-pool"
version = "0.1.0"
description = "LLM Backend Pool — LiteLLM router for Ollama, vLLM, OpenAI, Anthropic, and BYO endpoints"
requires-python = ">=3.12"
dependencies = [
"konstruct-shared",
"litellm>=1.54.0",
"httpx>=0.28.0",
]
[tool.uv.sources]
konstruct-shared = { workspace = true }
[tool.hatch.build.targets.wheel]
packages = ["llm_pool"]

View File

@@ -0,0 +1,20 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "konstruct-orchestrator"
version = "0.1.0"
description = "Agent Orchestrator — agent selection, tool dispatch, memory, handoffs"
requires-python = ">=3.12"
dependencies = [
"konstruct-shared",
"fastapi[standard]>=0.115.0",
"httpx>=0.28.0",
]
[tool.uv.sources]
konstruct-shared = { workspace = true }
[tool.hatch.build.targets.wheel]
packages = ["orchestrator"]

View File

@@ -0,0 +1,20 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "konstruct-router"
version = "0.1.0"
description = "Message Router — tenant resolution, rate limiting, context loading"
requires-python = ">=3.12"
dependencies = [
"konstruct-shared",
"fastapi[standard]>=0.115.0",
"httpx>=0.28.0",
]
[tool.uv.sources]
konstruct-shared = { workspace = true }
[tool.hatch.build.targets.wheel]
packages = ["router"]

View File

@@ -0,0 +1,24 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "konstruct-shared"
version = "0.1.0"
description = "Shared Pydantic models, SQLAlchemy ORM, and utilities for Konstruct"
requires-python = ">=3.12"
dependencies = [
"fastapi[standard]>=0.115.0",
"pydantic[email]>=2.12.0",
"pydantic-settings>=2.8.0",
"sqlalchemy[asyncio]>=2.0.36",
"asyncpg>=0.31.0",
"alembic>=1.14.0",
"redis>=5.2.0",
"celery[redis]>=5.4.0",
"httpx>=0.28.0",
"slowapi>=0.1.9",
]
[tool.hatch.build.targets.wheel]
packages = ["shared"]

View File

@@ -0,0 +1,6 @@
"""
Konstruct shared library.
Provides shared Pydantic models, SQLAlchemy ORM models, database utilities,
RLS integration, and Redis key namespacing used across all Konstruct services.
"""

View File

@@ -0,0 +1,113 @@
"""
Konstruct shared configuration.
Loads all environment variables via Pydantic Settings with sensible defaults
for local development. All services import from this module.
"""
from __future__ import annotations
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
# -------------------------------------------------------------------------
# Database
# -------------------------------------------------------------------------
database_url: str = Field(
default="postgresql+asyncpg://konstruct_app:konstruct_dev@localhost:5432/konstruct",
description="Async database URL — must use konstruct_app role, not superuser",
)
database_admin_url: str = Field(
default="postgresql+asyncpg://postgres:postgres_dev@localhost:5432/konstruct",
description="Admin database URL for Alembic migrations (superuser)",
)
# -------------------------------------------------------------------------
# Redis
# -------------------------------------------------------------------------
redis_url: str = Field(
default="redis://localhost:6379/0",
description="Redis connection URL",
)
celery_broker_url: str = Field(
default="redis://localhost:6379/1",
description="Celery broker URL",
)
celery_result_backend: str = Field(
default="redis://localhost:6379/2",
description="Celery result backend URL",
)
# -------------------------------------------------------------------------
# Slack
# -------------------------------------------------------------------------
slack_bot_token: str = Field(
default="",
description="Slack bot token (xoxb-...)",
)
slack_signing_secret: str = Field(
default="",
description="Slack signing secret for webhook verification",
)
slack_app_token: str = Field(
default="",
description="Slack app-level token for Socket Mode (xapp-...)",
)
# -------------------------------------------------------------------------
# LLM Providers
# -------------------------------------------------------------------------
anthropic_api_key: str = Field(
default="",
description="Anthropic API key",
)
openai_api_key: str = Field(
default="",
description="OpenAI API key",
)
ollama_base_url: str = Field(
default="http://localhost:11434",
description="Ollama inference server base URL",
)
# -------------------------------------------------------------------------
# Auth / Security
# -------------------------------------------------------------------------
auth_secret: str = Field(
default="insecure-dev-secret-change-in-production",
description="Secret key for signing JWT tokens",
)
# -------------------------------------------------------------------------
# Service URLs
# -------------------------------------------------------------------------
gateway_url: str = Field(default="http://localhost:8001")
router_url: str = Field(default="http://localhost:8002")
orchestrator_url: str = Field(default="http://localhost:8003")
llm_pool_url: str = Field(default="http://localhost:8004")
# -------------------------------------------------------------------------
# Application
# -------------------------------------------------------------------------
environment: str = Field(default="development")
log_level: str = Field(default="INFO")
debug: bool = Field(default=False)
default_rate_limit_rpm: int = Field(
default=60,
description="Default requests per minute per tenant",
)
# Module-level singleton — imported by all services
settings = Settings()

View File

@@ -0,0 +1,56 @@
"""
Async SQLAlchemy engine and session factory.
Usage in FastAPI:
async def route(session: AsyncSession = Depends(get_session)):
...
Usage in tests:
async with async_session_factory() as session:
...
"""
from __future__ import annotations
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from shared.config import settings
# ---------------------------------------------------------------------------
# Engine — one per process; shared across all requests
# ---------------------------------------------------------------------------
engine: AsyncEngine = create_async_engine(
settings.database_url,
echo=settings.debug,
pool_pre_ping=True,
pool_size=10,
max_overflow=20,
)
# ---------------------------------------------------------------------------
# Session factory
# ---------------------------------------------------------------------------
async_session_factory: async_sessionmaker[AsyncSession] = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""
FastAPI dependency that yields an async database session.
The session is automatically closed (and the connection returned to the
pool) when the request context exits, even if an exception is raised.
Example:
@router.get("/agents")
async def list_agents(session: AsyncSession = Depends(get_session)):
result = await session.execute(select(Agent))
return result.scalars().all()
"""
async with async_session_factory() as session:
yield session

View File

@@ -0,0 +1,26 @@
"""
Shared data models for Konstruct.
Re-exports all public model classes for convenient importing:
from shared.models import KonstructMessage, Tenant, Agent, ChannelConnection
"""
from shared.models.auth import PortalUser
from shared.models.message import ChannelType, KonstructMessage, MessageContent, SenderInfo
from shared.models.tenant import Agent, Base, ChannelConnection, ChannelTypeEnum, Tenant
__all__ = [
# Pydantic models (message format)
"ChannelType",
"KonstructMessage",
"MessageContent",
"SenderInfo",
# SQLAlchemy models
"Base",
"Tenant",
"Agent",
"ChannelConnection",
"ChannelTypeEnum",
"PortalUser",
]

View File

@@ -0,0 +1,62 @@
"""
Portal user model for admin dashboard authentication.
Auth.js v5 validates credentials against this model via a FastAPI endpoint.
Passwords are stored as bcrypt hashes — never plaintext.
"""
from __future__ import annotations
import uuid
from datetime import datetime
from sqlalchemy import Boolean, DateTime, String, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from shared.models.tenant import Base
class PortalUser(Base):
"""
An operator with access to the Konstruct admin portal.
RLS is NOT applied to this table — users are authenticated before
tenant context is established. Authorization is handled at the
application layer (is_admin flag + JWT claims).
"""
__tablename__ = "portal_users"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
hashed_password: Mapped[str] = mapped_column(
String(255),
nullable=False,
comment="bcrypt hash — never store plaintext",
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
is_admin: Mapped[bool] = mapped_column(
Boolean,
nullable=False,
default=False,
comment="True for platform-level admin; tenant managers use RBAC",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
def __repr__(self) -> str:
return f"<PortalUser id={self.id} email={self.email!r} is_admin={self.is_admin}>"

View File

@@ -0,0 +1,93 @@
"""
KonstructMessage — the unified internal message format.
All channel adapters (Slack, WhatsApp, Mattermost, etc.) normalize inbound
events into this format before passing them to the Message Router. The core
business logic never depends on which messaging platform a message came from.
"""
from __future__ import annotations
import uuid
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field
class ChannelType(StrEnum):
"""Supported messaging channels."""
SLACK = "slack"
WHATSAPP = "whatsapp"
MATTERMOST = "mattermost"
ROCKETCHAT = "rocketchat"
TEAMS = "teams"
TELEGRAM = "telegram"
SIGNAL = "signal"
class SenderInfo(BaseModel):
"""Information about the message sender."""
user_id: str = Field(description="Channel-native user ID (e.g. Slack user ID U12345)")
display_name: str = Field(description="Human-readable display name")
email: str | None = Field(default=None, description="Sender email if available")
is_bot: bool = Field(default=False, description="True if sender is a bot/automation")
class MessageContent(BaseModel):
"""The content of a message — text and optional attachments."""
text: str = Field(description="Plain text content of the message")
html: str | None = Field(default=None, description="HTML-formatted content if available")
attachments: list[dict[str, Any]] = Field(
default_factory=list,
description="File attachments, images, or structured payloads",
)
mentions: list[str] = Field(
default_factory=list,
description="List of user/bot IDs mentioned in the message",
)
class KonstructMessage(BaseModel):
"""
Unified internal message format for Konstruct.
All channel adapters normalize events into this format. Downstream services
(Router, Orchestrator) operate exclusively on KonstructMessage — they never
inspect channel-specific fields directly.
`tenant_id` is None immediately after normalization. The Message Router
populates it via channel_connections lookup before forwarding.
"""
id: str = Field(
default_factory=lambda: str(uuid.uuid4()),
description="Unique message ID (UUID)",
)
tenant_id: str | None = Field(
default=None,
description="Konstruct tenant ID — populated by Message Router after resolution",
)
channel: ChannelType = Field(description="Source messaging channel")
channel_metadata: dict[str, Any] = Field(
description="Channel-specific identifiers: workspace_id, channel_id, bot_user_id, etc."
)
sender: SenderInfo = Field(description="Message sender information")
content: MessageContent = Field(description="Message content")
timestamp: datetime = Field(description="Message timestamp (UTC)")
thread_id: str | None = Field(
default=None,
description="Thread identifier for threaded conversations (e.g. Slack thread_ts)",
)
reply_to: str | None = Field(
default=None,
description="Parent message ID if this is a reply",
)
context: dict[str, Any] = Field(
default_factory=dict,
description="Extracted intent, entities, sentiment — populated by downstream processors",
)

View File

@@ -0,0 +1,188 @@
"""
SQLAlchemy 2.0 ORM models for multi-tenant data.
IMPORTANT: All models here use SQLAlchemy 2.0 `Mapped[]` and `mapped_column()`
style. Never use the legacy 1.x `Column()` style.
RLS is applied to tenant-scoped tables (agents, channel_connections) via
Alembic migration. Application connections MUST use the `konstruct_app` role
(not the postgres superuser) for RLS to be enforced.
"""
from __future__ import annotations
import enum
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import JSON, Boolean, DateTime, Enum, ForeignKey, String, Text, UniqueConstraint, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
"""Shared declarative base for all Konstruct ORM models."""
pass
class ChannelTypeEnum(str, enum.Enum):
"""Matches ChannelType StrEnum in message.py — kept in sync."""
SLACK = "slack"
WHATSAPP = "whatsapp"
MATTERMOST = "mattermost"
ROCKETCHAT = "rocketchat"
TEAMS = "teams"
TELEGRAM = "telegram"
SIGNAL = "signal"
class Tenant(Base):
"""
Top-level tenant. Represents one Konstruct customer / workspace.
RLS is NOT applied to this table — platform admin needs to list all tenants.
The konstruct_app role has SELECT/INSERT/UPDATE/DELETE on tenants.
"""
__tablename__ = "tenants"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
name: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
settings: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default=dict)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
# Relationships
agents: Mapped[list[Agent]] = relationship("Agent", back_populates="tenant", cascade="all, delete-orphan")
channel_connections: Mapped[list[ChannelConnection]] = relationship(
"ChannelConnection", back_populates="tenant", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<Tenant id={self.id} slug={self.slug!r}>"
class Agent(Base):
"""
An AI employee belonging to a specific tenant.
RLS is ENABLED on this table. Rows are visible only when
`app.current_tenant` session variable matches the row's `tenant_id`.
FORCE ROW LEVEL SECURITY ensures even the table owner cannot bypass RLS.
"""
__tablename__ = "agents"
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
tenant_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("tenants.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
name: Mapped[str] = mapped_column(String(255), nullable=False)
role: Mapped[str] = mapped_column(String(255), nullable=False)
persona: Mapped[str] = mapped_column(Text, nullable=False, default="")
system_prompt: Mapped[str] = mapped_column(Text, nullable=False, default="")
model_preference: Mapped[str] = mapped_column(
String(50),
nullable=False,
default="quality",
comment="quality | balanced | economy | local",
)
tool_assignments: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list)
escalation_rules: Mapped[list[Any]] = mapped_column(JSON, nullable=False, default=list)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
# Relationships
tenant: Mapped[Tenant] = relationship("Tenant", back_populates="agents")
def __repr__(self) -> str:
return f"<Agent id={self.id} name={self.name!r} tenant_id={self.tenant_id}>"
class ChannelConnection(Base):
"""
Links a messaging platform workspace to a Konstruct tenant.
Example: Slack workspace T12345 → Tenant UUID abc-123.
The Message Router queries this table to resolve incoming messages to the
correct tenant. RLS is ENABLED — tenant agents can only see their own
channel connections.
"""
__tablename__ = "channel_connections"
__table_args__ = (
UniqueConstraint("channel_type", "workspace_id", name="uq_channel_workspace"),
)
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4,
)
tenant_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
ForeignKey("tenants.id", ondelete="CASCADE"),
nullable=False,
index=True,
)
channel_type: Mapped[ChannelTypeEnum] = mapped_column(
Enum(ChannelTypeEnum, name="channel_type_enum"),
nullable=False,
)
workspace_id: Mapped[str] = mapped_column(
String(255),
nullable=False,
comment="Channel-native workspace/org ID (e.g. Slack workspace ID T12345)",
)
config: Mapped[dict[str, Any]] = mapped_column(
JSON,
nullable=False,
default=dict,
comment="Encrypted bot tokens, channel IDs, and other per-tenant channel config",
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
)
# Relationships
tenant: Mapped[Tenant] = relationship("Tenant", back_populates="channel_connections")
def __repr__(self) -> str:
return f"<ChannelConnection channel={self.channel_type} workspace={self.workspace_id!r}>"

View File

@@ -0,0 +1,88 @@
"""
Namespaced Redis key constructors.
DESIGN PRINCIPLE: It must be impossible to construct a Redis key without
a tenant_id. Every function in this module requires tenant_id as its first
argument and prepends `{tenant_id}:` to every key.
This ensures strict per-tenant namespace isolation — Tenant A's rate limit
counters, session state, and deduplication keys are entirely separate from
Tenant B's, even though they share the same Redis instance.
Key format: {tenant_id}:{key_type}:{discriminator}
Examples:
rate_limit_key("acme", "slack") → "acme:ratelimit:slack"
idempotency_key("acme", "msg-123") → "acme:dedup:msg-123"
session_key("acme", "thread-456") → "acme:session:thread-456"
engaged_thread_key("acme", "T12345") → "acme:engaged:T12345"
"""
from __future__ import annotations
def rate_limit_key(tenant_id: str, channel: str) -> str:
"""
Redis key for per-tenant, per-channel rate limit counters.
Used by the token bucket rate limiter in the Message Router.
Args:
tenant_id: Konstruct tenant identifier.
channel: Channel type string (e.g. "slack", "whatsapp").
Returns:
Namespaced Redis key: "{tenant_id}:ratelimit:{channel}"
"""
return f"{tenant_id}:ratelimit:{channel}"
def idempotency_key(tenant_id: str, message_id: str) -> str:
"""
Redis key for message deduplication (idempotency).
Prevents duplicate processing when channels deliver events more than once
(e.g. Slack retry behaviour on gateway timeout).
Args:
tenant_id: Konstruct tenant identifier.
message_id: Unique message identifier from the channel.
Returns:
Namespaced Redis key: "{tenant_id}:dedup:{message_id}"
"""
return f"{tenant_id}:dedup:{message_id}"
def session_key(tenant_id: str, thread_id: str) -> str:
"""
Redis key for conversation session state.
Stores sliding window conversation history for a thread, used by the
Agent Orchestrator to maintain context between messages.
Args:
tenant_id: Konstruct tenant identifier.
thread_id: Thread identifier (e.g. Slack thread_ts or DM channel ID).
Returns:
Namespaced Redis key: "{tenant_id}:session:{thread_id}"
"""
return f"{tenant_id}:session:{thread_id}"
def engaged_thread_key(tenant_id: str, thread_id: str) -> str:
"""
Redis key tracking whether an agent is actively engaged in a thread.
An "engaged" thread means the agent has been @mentioned or responded in
this thread — subsequent messages in the thread don't require a new @mention.
Args:
tenant_id: Konstruct tenant identifier.
thread_id: Thread identifier.
Returns:
Namespaced Redis key: "{tenant_id}:engaged:{thread_id}"
"""
return f"{tenant_id}:engaged:{thread_id}"

View File

@@ -0,0 +1,67 @@
"""
PostgreSQL Row Level Security (RLS) integration.
How it works:
1. `current_tenant_id` is a ContextVar — set once per request/task.
2. `configure_rls_hook(engine)` registers a SQLAlchemy event listener that
fires before every cursor execute.
3. When `current_tenant_id` is set, the listener injects:
SET LOCAL app.current_tenant = '<tenant_id>'
into the current transaction.
4. PostgreSQL evaluates this setting in every RLS policy via:
current_setting('app.current_tenant')::uuid
CRITICAL: The application MUST connect as `konstruct_app` (not postgres
superuser). Superuser connections bypass RLS entirely — isolation tests
would pass trivially but provide zero real protection.
IMPORTANT: SET LOCAL is transaction-scoped. The tenant context resets
automatically when each transaction ends — no manual cleanup required.
"""
from __future__ import annotations
from contextvars import ContextVar
from typing import Any
from uuid import UUID
from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine
# ---------------------------------------------------------------------------
# ContextVar — set in middleware or request context
# ---------------------------------------------------------------------------
current_tenant_id: ContextVar[UUID | None] = ContextVar("current_tenant_id", default=None)
def configure_rls_hook(engine: AsyncEngine) -> None:
"""
Register the before_cursor_execute event on the given engine.
Call once at application startup, after the engine is created.
Example:
from shared.db import engine
from shared.rls import configure_rls_hook
configure_rls_hook(engine)
"""
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def _set_rls_tenant(
conn: Any,
cursor: Any,
statement: str,
parameters: Any,
context: Any,
executemany: bool,
) -> None:
"""
Inject SET LOCAL app.current_tenant before every statement.
Uses parameterized query to prevent SQL injection.
SET LOCAL is transaction-scoped and resets on commit/rollback.
"""
tenant_id = current_tenant_id.get()
if tenant_id is not None:
# Parameterized to prevent SQL injection — never use f-string here
cursor.execute("SET LOCAL app.current_tenant = %s", (str(tenant_id),))