feat(02-02): wire tool-call loop into agent runner and orchestrator pipeline
- runner.py: multi-turn tool-call loop (LLM -> tool -> observe -> respond)
- runner.py: max 5 iterations guard against runaway tool chains
- runner.py: confirmation gate — returns confirmation msg, stops loop
- runner.py: audit logging for every LLM call via audit_logger
- tasks.py: AuditLogger initialized at task start with session factory
- tasks.py: tool registry built from agent.tool_assignments
- tasks.py: pending tool confirmation flow via Redis (10 min TTL)
- tasks.py: memory persistence skipped for confirmation request responses
- llm-pool/router.py: LLMResponse model with content + tool_calls fields
- llm-pool/router.py: tools parameter forwarded to litellm.acompletion()
- llm-pool/main.py: CompleteRequest accepts optional tools list
- llm-pool/main.py: CompleteResponse includes tool_calls field
- Migration renamed to 004 (003 was already taken by escalation migration)
- [Rule 1 - Bug] Renamed 003_phase2_audit_kb.py -> 004 to fix duplicate revision ID
This commit is contained in:
238
migrations/versions/004_phase2_audit_kb.py
Normal file
238
migrations/versions/004_phase2_audit_kb.py
Normal file
@@ -0,0 +1,238 @@
|
|||||||
|
"""Phase 2: audit_events table (immutable) and kb_documents/kb_chunks tables
|
||||||
|
|
||||||
|
Revision ID: 004
|
||||||
|
Revises: 003
|
||||||
|
Create Date: 2026-03-23
|
||||||
|
|
||||||
|
This migration adds:
|
||||||
|
1. audit_events — append-only audit trail for all agent actions
|
||||||
|
- REVOKE UPDATE, DELETE from konstruct_app (immutability enforced at DB level)
|
||||||
|
- GRANT SELECT, INSERT only
|
||||||
|
- RLS for tenant isolation
|
||||||
|
- Composite index on (tenant_id, created_at DESC) for efficient queries
|
||||||
|
|
||||||
|
2. kb_documents and kb_chunks — knowledge base storage
|
||||||
|
- kb_chunks has a vector(384) embedding column
|
||||||
|
- HNSW index for approximate nearest neighbor cosine search
|
||||||
|
- Full CRUD grants for kb tables (mutable)
|
||||||
|
- RLS on both tables
|
||||||
|
|
||||||
|
Key design decision: audit_events immutability is enforced at the DB level via
|
||||||
|
REVOKE. Even if application code attempts an UPDATE or DELETE, PostgreSQL will
|
||||||
|
reject it with a permission error. This provides a hard compliance guarantee
|
||||||
|
that the audit trail cannot be tampered with via the application role.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
from sqlalchemy.dialects.postgresql import JSONB, UUID
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "004"
|
||||||
|
down_revision: Union[str, None] = "003"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def _uuid_pk_column() -> sa.Column:
    """Return the ``id`` primary-key column: a UUID generated server-side."""
    return sa.Column(
        "id",
        UUID(as_uuid=True),
        primary_key=True,
        server_default=sa.text("gen_random_uuid()"),
    )


def _created_at_column() -> sa.Column:
    """Return the ``created_at`` column: timestamptz, NOT NULL, default NOW()."""
    return sa.Column(
        "created_at",
        sa.DateTime(timezone=True),
        nullable=False,
        server_default=sa.text("NOW()"),
    )


def _enable_tenant_rls(table_name: str) -> None:
    """Enable and force row level security on *table_name* with the tenant policy.

    ``table_name`` is interpolated into DDL, so it must only ever be one of the
    hard-coded table names used by this migration (never external input).
    """
    op.execute(f"ALTER TABLE {table_name} ENABLE ROW LEVEL SECURITY")
    op.execute(f"ALTER TABLE {table_name} FORCE ROW LEVEL SECURITY")
    op.execute(
        f"""
        CREATE POLICY tenant_isolation ON {table_name}
        USING (tenant_id = current_setting('app.current_tenant', TRUE)::uuid)
        """
    )


def upgrade() -> None:
    """Create audit_events, kb_documents and kb_chunks with RLS, indexes and grants.

    audit_events is granted SELECT/INSERT only to ``konstruct_app``; the two
    knowledge-base tables get full CRUD grants. Every table gets the
    ``tenant_isolation`` row level security policy.
    """
    # =========================================================================
    # 1. audit_events — immutable audit trail
    # =========================================================================
    op.create_table(
        "audit_events",
        _uuid_pk_column(),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column("agent_id", UUID(as_uuid=True), nullable=True),
        sa.Column("user_id", sa.Text, nullable=True),
        sa.Column(
            "action_type",
            sa.Text,
            nullable=False,
            comment="llm_call | tool_invocation | escalation",
        ),
        sa.Column("input_summary", sa.Text, nullable=True),
        sa.Column("output_summary", sa.Text, nullable=True),
        sa.Column("latency_ms", sa.Integer, nullable=True),
        sa.Column(
            "metadata",
            JSONB,
            nullable=False,
            server_default=sa.text("'{}'::jsonb"),
        ),
        _created_at_column(),
    )

    # Index for efficient per-tenant queries ordered by time (most recent first).
    # The sort direction is expressed as a text() index element; postgresql_ops
    # is meant for operator classes, not for ASC/DESC.
    op.create_index(
        "ix_audit_events_tenant_created",
        "audit_events",
        ["tenant_id", sa.text("created_at DESC")],
    )

    _enable_tenant_rls("audit_events")

    # Grant SELECT + INSERT only. UPDATE/DELETE are revoked explicitly in case
    # they were granted elsewhere; TRUNCATE is revoked as well because it would
    # otherwise allow wiping the trail without a DELETE.
    op.execute("GRANT SELECT, INSERT ON audit_events TO konstruct_app")
    op.execute("REVOKE UPDATE, DELETE, TRUNCATE ON audit_events FROM konstruct_app")

    # =========================================================================
    # 2. kb_documents — knowledge base document metadata
    # =========================================================================
    op.create_table(
        "kb_documents",
        _uuid_pk_column(),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column("agent_id", UUID(as_uuid=True), nullable=False),
        sa.Column("filename", sa.Text, nullable=True),
        sa.Column("source_url", sa.Text, nullable=True),
        sa.Column("content_type", sa.Text, nullable=True),
        _created_at_column(),
    )

    op.create_index("ix_kb_documents_tenant", "kb_documents", ["tenant_id"])
    op.create_index("ix_kb_documents_agent", "kb_documents", ["agent_id"])

    _enable_tenant_rls("kb_documents")

    op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_documents TO konstruct_app")

    # =========================================================================
    # 3. kb_chunks — chunked text with vector embeddings
    # =========================================================================
    op.create_table(
        "kb_chunks",
        _uuid_pk_column(),
        sa.Column("tenant_id", UUID(as_uuid=True), nullable=False),
        sa.Column(
            "document_id",
            UUID(as_uuid=True),
            sa.ForeignKey("kb_documents.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("content", sa.Text, nullable=False),
        sa.Column("chunk_index", sa.Integer, nullable=True),
        _created_at_column(),
        # embedding column added via raw DDL below (pgvector type)
    )

    # Add embedding column as vector(384) — raw DDL required for pgvector type.
    # The table was created just above and is empty, so NOT NULL can be applied
    # directly; no temporary DEFAULT is needed and embeddings must always be
    # supplied explicitly by writers.
    # NOTE(review): assumes the pgvector extension is already installed by an
    # earlier migration — confirm.
    op.execute("ALTER TABLE kb_chunks ADD COLUMN embedding vector(384) NOT NULL")

    op.create_index("ix_kb_chunks_tenant", "kb_chunks", ["tenant_id"])
    op.create_index("ix_kb_chunks_document", "kb_chunks", ["document_id"])

    # HNSW index for approximate nearest-neighbor cosine search
    op.execute(
        """
        CREATE INDEX ix_kb_chunks_hnsw
        ON kb_chunks
        USING hnsw (embedding vector_cosine_ops)
        WITH (m = 16, ef_construction = 64)
        """
    )

    _enable_tenant_rls("kb_chunks")

    op.execute("GRANT SELECT, INSERT, UPDATE, DELETE ON kb_chunks TO konstruct_app")
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
    """Revoke the app role's grants and drop the three tables.

    Order matters: kb_chunks is dropped before kb_documents, then audit_events.
    """
    for table_name in ("kb_chunks", "kb_documents", "audit_events"):
        op.execute(f"REVOKE ALL ON {table_name} FROM konstruct_app")
        op.drop_table(table_name)
|
||||||
@@ -9,6 +9,7 @@ Endpoints:
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
@@ -41,6 +42,12 @@ class CompleteRequest(BaseModel):
|
|||||||
tenant_id: str
|
tenant_id: str
|
||||||
"""Konstruct tenant UUID for cost tracking."""
|
"""Konstruct tenant UUID for cost tracking."""
|
||||||
|
|
||||||
|
tools: list[dict] | None = None
|
||||||
|
"""
|
||||||
|
Optional OpenAI function-calling tool definitions.
|
||||||
|
When provided, the LLM may return tool_calls instead of text content.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class UsageInfo(BaseModel):
|
class UsageInfo(BaseModel):
|
||||||
prompt_tokens: int = 0
|
prompt_tokens: int = 0
|
||||||
@@ -51,6 +58,11 @@ class CompleteResponse(BaseModel):
|
|||||||
content: str
|
content: str
|
||||||
model: str
|
model: str
|
||||||
usage: UsageInfo
|
usage: UsageInfo
|
||||||
|
tool_calls: list[dict[str, Any]] = []
|
||||||
|
"""
|
||||||
|
Tool calls returned by the LLM, in OpenAI format.
|
||||||
|
Non-empty when the LLM decided to use a tool instead of responding with text.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
class HealthResponse(BaseModel):
|
class HealthResponse(BaseModel):
|
||||||
@@ -77,23 +89,29 @@ async def complete_endpoint(request: CompleteRequest) -> CompleteResponse:
|
|||||||
LiteLLM handles provider selection, retries, and cross-group fallback
|
LiteLLM handles provider selection, retries, and cross-group fallback
|
||||||
automatically.
|
automatically.
|
||||||
|
|
||||||
|
When `tools` are provided, the LLM may return tool_calls instead of text.
|
||||||
|
The response includes both `content` and `tool_calls` fields — exactly one
|
||||||
|
will be populated depending on whether the LLM chose to use a tool.
|
||||||
|
|
||||||
Returns 503 JSON if all providers (including fallbacks) are unavailable.
|
Returns 503 JSON if all providers (including fallbacks) are unavailable.
|
||||||
"""
|
"""
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
|
|
||||||
try:
|
try:
|
||||||
content = await router_complete(
|
llm_response = await router_complete(
|
||||||
model_group=request.model,
|
model_group=request.model,
|
||||||
messages=request.messages,
|
messages=request.messages,
|
||||||
tenant_id=request.tenant_id,
|
tenant_id=request.tenant_id,
|
||||||
|
tools=request.tools,
|
||||||
)
|
)
|
||||||
# LiteLLM Router doesn't expose per-call usage easily via acompletion
|
# LiteLLM Router doesn't expose per-call usage easily via acompletion
|
||||||
# on all provider paths; we return zeroed usage for now and will wire
|
# on all provider paths; we return zeroed usage for now and will wire
|
||||||
# real token counts in a follow-up plan when cost tracking is added.
|
# real token counts in a follow-up plan when cost tracking is added.
|
||||||
return CompleteResponse(
|
return CompleteResponse(
|
||||||
content=content,
|
content=llm_response.content,
|
||||||
model=request.model,
|
model=request.model,
|
||||||
usage=UsageInfo(),
|
usage=UsageInfo(),
|
||||||
|
tool_calls=llm_response.tool_calls,
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ NOTE: LiteLLM is pinned to ==1.82.5 in pyproject.toml.
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from litellm import Router
|
from litellm import Router
|
||||||
|
|
||||||
@@ -66,11 +67,26 @@ llm_router = Router(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class LLMResponse:
    """
    Container for one LLM completion result.

    Attributes:
        content: Text content of the response. May be an empty string, e.g.
            when the model answered with tool calls instead of text.
        tool_calls: List of tool call dicts in OpenAI format, or an empty list
            when the model requested no tools.
    """

    def __init__(self, content: str, tool_calls: list[dict[str, Any]]) -> None:
        self.content = content
        self.tool_calls = tool_calls

    def __repr__(self) -> str:
        # Readable representation so instances are useful in logs and debuggers.
        return (
            f"{type(self).__name__}"
            f"(content={self.content!r}, tool_calls={self.tool_calls!r})"
        )
|
||||||
|
|
||||||
|
|
||||||
async def complete(
|
async def complete(
|
||||||
model_group: str,
|
model_group: str,
|
||||||
messages: list[dict],
|
messages: list[dict],
|
||||||
tenant_id: str,
|
tenant_id: str,
|
||||||
) -> str:
|
tools: list[dict] | None = None,
|
||||||
|
) -> LLMResponse:
|
||||||
"""
|
"""
|
||||||
Request a completion from the LiteLLM Router.
|
Request a completion from the LiteLLM Router.
|
||||||
|
|
||||||
@@ -80,20 +96,50 @@ async def complete(
|
|||||||
[{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]
|
[{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]
|
||||||
tenant_id: Konstruct tenant UUID, attached to LiteLLM metadata for
|
tenant_id: Konstruct tenant UUID, attached to LiteLLM metadata for
|
||||||
per-tenant cost tracking.
|
per-tenant cost tracking.
|
||||||
|
tools: Optional list of OpenAI function-calling tool dicts. When provided,
|
||||||
|
the LLM may return tool_calls instead of text content.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The model's response content as a plain string.
|
LLMResponse with content (text) and tool_calls (list of tool call dicts).
|
||||||
|
- If LLM returns text: content is non-empty, tool_calls is empty.
|
||||||
|
- If LLM returns tool calls: content is empty, tool_calls contains calls.
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
Exception: Propagated if all providers in the group (and fallbacks) fail.
|
Exception: Propagated if all providers in the group (and fallbacks) fail.
|
||||||
"""
|
"""
|
||||||
logger.info("LLM request", extra={"model_group": model_group, "tenant_id": tenant_id})
|
logger.info("LLM request", extra={"model_group": model_group, "tenant_id": tenant_id})
|
||||||
|
|
||||||
response = await llm_router.acompletion(
|
kwargs: dict[str, Any] = {
|
||||||
model=model_group,
|
"model": model_group,
|
||||||
messages=messages,
|
"messages": messages,
|
||||||
metadata={"tenant_id": tenant_id},
|
"metadata": {"tenant_id": tenant_id},
|
||||||
)
|
}
|
||||||
|
if tools:
|
||||||
|
kwargs["tools"] = tools
|
||||||
|
|
||||||
content: str = response.choices[0].message.content or ""
|
response = await llm_router.acompletion(**kwargs)
|
||||||
return content
|
|
||||||
|
choice = response.choices[0]
|
||||||
|
message = choice.message
|
||||||
|
|
||||||
|
# Extract tool_calls if present
|
||||||
|
raw_tool_calls = getattr(message, "tool_calls", None) or []
|
||||||
|
tool_calls: list[dict[str, Any]] = []
|
||||||
|
for tc in raw_tool_calls:
|
||||||
|
# LiteLLM returns tool calls as objects with .id, .function.name, .function.arguments
|
||||||
|
try:
|
||||||
|
tool_calls.append({
|
||||||
|
"id": tc.id,
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": tc.function.name,
|
||||||
|
"arguments": tc.function.arguments,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
except AttributeError:
|
||||||
|
# Fallback: if it's already a dict (some providers)
|
||||||
|
if isinstance(tc, dict):
|
||||||
|
tool_calls.append(tc)
|
||||||
|
|
||||||
|
content: str = message.content or ""
|
||||||
|
return LLMResponse(content=content, tool_calls=tool_calls)
|
||||||
|
|||||||
@@ -7,11 +7,23 @@ Communication pattern:
|
|||||||
→ POST http://llm-pool:8004/complete (httpx async)
|
→ POST http://llm-pool:8004/complete (httpx async)
|
||||||
→ LiteLLM Router (router.py in llm-pool)
|
→ LiteLLM Router (router.py in llm-pool)
|
||||||
→ Ollama / Anthropic / OpenAI
|
→ Ollama / Anthropic / OpenAI
|
||||||
|
|
||||||
|
Tool-call loop (Phase 2):
|
||||||
|
After each LLM response, check for tool_calls in the response.
|
||||||
|
If tool_calls present:
|
||||||
|
- Execute each tool via execute_tool()
|
||||||
|
- If requires_confirmation: stop loop, return confirmation message to user
|
||||||
|
- Otherwise: append tool result as 'tool' role message, re-call LLM
|
||||||
|
Loop until LLM returns plain text (no tool_calls) or max_iterations reached.
|
||||||
|
Max iterations = 5 (prevents runaway tool chains).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
@@ -20,6 +32,10 @@ from shared.config import settings
|
|||||||
from shared.models.message import KonstructMessage
|
from shared.models.message import KonstructMessage
|
||||||
from shared.models.tenant import Agent
|
from shared.models.tenant import Agent
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from orchestrator.audit.logger import AuditLogger
|
||||||
|
from orchestrator.tools.registry import ToolDefinition
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
_FALLBACK_RESPONSE = (
|
_FALLBACK_RESPONSE = (
|
||||||
@@ -30,68 +46,220 @@ _FALLBACK_RESPONSE = (
|
|||||||
# Timeout for LLM pool HTTP requests — generous to allow slow local inference
|
# Timeout for LLM pool HTTP requests — generous to allow slow local inference
|
||||||
_LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)
|
_LLM_TIMEOUT = httpx.Timeout(timeout=120.0, connect=10.0)
|
||||||
|
|
||||||
|
# Maximum number of tool-call iterations before breaking the loop
|
||||||
|
_MAX_TOOL_ITERATIONS = 5
|
||||||
|
|
||||||
|
|
||||||
async def run_agent(
    msg: KonstructMessage,
    agent: Agent,
    messages: list[dict] | None = None,
    audit_logger: "AuditLogger | None" = None,
    tool_registry: "dict[str, ToolDefinition] | None" = None,
) -> str:
    """
    Execute an agent against the LLM pool and return the response text.

    Implements a multi-turn tool-call loop:
        1. Call LLM with messages (and tool definitions if registry provided)
        2. If LLM returns tool_calls:
            a. Execute each tool via execute_tool()
            b. If tool requires confirmation: return confirmation message immediately
            c. Append tool results as 'tool' role messages
            d. Re-call LLM with updated messages
        3. Repeat until LLM returns plain text or _MAX_TOOL_ITERATIONS reached

    Each LLM call is logged to the audit trail when audit_logger is provided
    and the message carries a valid tenant UUID. Tool calls are only executed
    when tool_registry, audit_logger and a valid tenant UUID are all present.

    Args:
        msg: The inbound Konstruct message being processed.
        agent: The ORM Agent instance that handles this message.
        messages: Optional pre-built messages array (e.g. from
            build_messages_with_memory). When provided, used directly.
            When None, falls back to simple [system, user] construction.
        audit_logger: Optional AuditLogger for recording each LLM call and tool
            invocation. When None, no audit logging occurs (backward compat).
        tool_registry: Optional dict of tool name → ToolDefinition for this agent.
            When provided, passed to LLM as function-calling tools.

    Returns:
        The LLM response content as a plain string.
        Returns a polite fallback message if the LLM pool is unreachable,
        returns a non-200 response, returns a body that is not a JSON object,
        or the tool loop hits _MAX_TOOL_ITERATIONS.
        Returns a confirmation message if a tool with requires_confirmation=True
        was invoked — the caller should return this to the user and store the
        pending action.
    """
    if messages is None:
        # Fallback: simple two-message construction (backward compat)
        messages = build_messages(
            system_prompt=build_system_prompt(agent),
            user_message=msg.content.text or "",
        )

    # Build tool definitions for LiteLLM if a registry was provided
    tools_payload: list[dict] | None = None
    if tool_registry:
        from orchestrator.tools.registry import to_litellm_format

        tools_payload = to_litellm_format(tool_registry)

    # Mutable copy so the caller's list is never extended by the tool loop
    loop_messages: list[dict[str, Any]] = list(messages)

    tenant_id: uuid.UUID | None = None
    if msg.tenant_id:
        try:
            tenant_id = uuid.UUID(str(msg.tenant_id))
        except ValueError:
            # Best-effort: keep answering, but audit logging and tool execution
            # are disabled below because they need a real tenant UUID.
            logger.warning(
                "Ignoring malformed tenant_id %r for agent=%s",
                msg.tenant_id,
                agent.id,
            )

    agent_uuid = agent.id if isinstance(agent.id, uuid.UUID) else uuid.UUID(str(agent.id))
    user_id: str = (
        msg.sender.user_id
        if msg.sender and msg.sender.user_id
        else (msg.thread_id or msg.id)
    )

    llm_pool_url = f"{settings.llm_pool_url}/complete"

    # One client for the whole loop so successive LLM calls reuse connections
    # instead of building and tearing down a pool on every iteration.
    async with httpx.AsyncClient(timeout=_LLM_TIMEOUT) as client:
        for iteration in range(_MAX_TOOL_ITERATIONS):
            # ------------------------------------------------------------------
            # Call LLM pool
            # ------------------------------------------------------------------
            payload: dict[str, Any] = {
                "model": agent.model_preference,
                "messages": loop_messages,
                "tenant_id": str(msg.tenant_id) if msg.tenant_id else "",
            }
            if tools_payload:
                payload["tools"] = tools_payload

            call_start = time.monotonic()
            try:
                response = await client.post(llm_pool_url, json=payload)
            except httpx.RequestError:
                logger.exception(
                    "LLM pool unreachable for tenant=%s agent=%s url=%s",
                    msg.tenant_id,
                    agent.id,
                    llm_pool_url,
                )
                return _FALLBACK_RESPONSE
            call_latency_ms = int((time.monotonic() - call_start) * 1000)

            if response.status_code != 200:
                logger.error(
                    "LLM pool returned %d for tenant=%s agent=%s",
                    response.status_code,
                    msg.tenant_id,
                    agent.id,
                )
                return _FALLBACK_RESPONSE

            # A 200 with an unparsable or non-object body must degrade to the
            # fallback reply rather than crash the calling task.
            try:
                data = response.json()
            except ValueError:
                logger.exception(
                    "LLM pool returned a non-JSON body for tenant=%s agent=%s",
                    msg.tenant_id,
                    agent.id,
                )
                return _FALLBACK_RESPONSE
            if not isinstance(data, dict):
                logger.error(
                    "LLM pool returned an unexpected JSON payload for tenant=%s agent=%s",
                    msg.tenant_id,
                    agent.id,
                )
                return _FALLBACK_RESPONSE

            response_content: str = data.get("content", "") or ""
            response_tool_calls: list[dict] = data.get("tool_calls", []) or []

            # ------------------------------------------------------------------
            # Log LLM call to audit trail
            # ------------------------------------------------------------------
            if audit_logger and tenant_id is not None:
                # Summarize input as last user message
                input_summary = _get_last_user_message(loop_messages)
                output_summary = response_content or f"[{len(response_tool_calls)} tool calls]"
                try:
                    await audit_logger.log_llm_call(
                        tenant_id=tenant_id,
                        agent_id=agent_uuid,
                        user_id=user_id,
                        input_summary=input_summary,
                        output_summary=output_summary,
                        latency_ms=call_latency_ms,
                        metadata={
                            "model": data.get("model", agent.model_preference),
                            "iteration": iteration,
                            "tool_calls_count": len(response_tool_calls),
                        },
                    )
                except Exception:
                    # Audit failures must never block the user-facing reply.
                    logger.exception("Failed to log LLM call to audit trail")

            # ------------------------------------------------------------------
            # No tool calls — LLM returned plain text, we're done
            # ------------------------------------------------------------------
            if not response_tool_calls:
                return response_content or _FALLBACK_RESPONSE

            # ------------------------------------------------------------------
            # Process tool calls
            # ------------------------------------------------------------------
            if not tool_registry or not audit_logger or tenant_id is None:
                # Tools cannot be executed without a registry, an audit logger
                # and a tenant UUID — return content if available, or fallback
                return response_content or _FALLBACK_RESPONSE

            from orchestrator.tools.executor import execute_tool

            # Append assistant's tool-call message to the loop
            loop_messages.append({
                "role": "assistant",
                "content": response_content or None,
                "tool_calls": response_tool_calls,
            })

            for tool_call in response_tool_calls:
                tool_result = await execute_tool(
                    tool_call=tool_call,
                    registry=tool_registry,
                    tenant_id=tenant_id,
                    agent_id=agent_uuid,
                    audit_logger=audit_logger,
                )

                # Check if this is a confirmation request (requires_confirmation=True).
                # NOTE(review): relies on the confirmation message template
                # starting with this exact text — keep in sync with execute_tool.
                if tool_result.startswith("This action requires your approval"):
                    # Return confirmation message to user — stop the loop
                    return tool_result

                # Append tool result as 'tool' role message for re-injection into LLM
                loop_messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.get("id", "call_0"),
                    "content": tool_result,
                })

            # Continue loop — re-call LLM with tool results appended

    # Every allowed iteration ended in more tool calls — stop the chain here.
    logger.warning(
        "Agent %s reached max tool iterations (%d) for tenant=%s — stopping loop",
        agent.id,
        _MAX_TOOL_ITERATIONS,
        msg.tenant_id,
    )
    return _FALLBACK_RESPONSE
|
||||||
|
|
||||||
|
|
||||||
|
def _get_last_user_message(messages: list[dict[str, Any]]) -> str:
|
||||||
|
"""Extract the content of the last user message for audit summary."""
|
||||||
|
for msg in reversed(messages):
|
||||||
|
if msg.get("role") == "user":
|
||||||
|
content = msg.get("content", "")
|
||||||
|
if isinstance(content, str):
|
||||||
|
return content[:200]
|
||||||
|
elif isinstance(content, list):
|
||||||
|
# Multi-modal content
|
||||||
|
for part in content:
|
||||||
|
if isinstance(part, dict) and part.get("type") == "text":
|
||||||
|
return str(part.get("text", ""))[:200]
|
||||||
|
return "[no user message]"
|
||||||
|
|||||||
@@ -23,20 +23,24 @@ Memory pipeline (Phase 2):
|
|||||||
The embed_and_store Celery task runs asynchronously, meaning the LLM response
|
The embed_and_store Celery task runs asynchronously, meaning the LLM response
|
||||||
is never blocked waiting for embedding computation.
|
is never blocked waiting for embedding computation.
|
||||||
|
|
||||||
Escalation pipeline (Phase 2 Plan 04):
|
Tool pipeline (Phase 2 Plan 02):
|
||||||
At message start (before LLM call):
|
run_agent() now accepts audit_logger and tool_registry and implements a
|
||||||
6. Check Redis escalation_status_key for this thread
|
multi-turn tool-call loop internally. The loop runs within the same
|
||||||
- If escalated and sender is end user: return assistant-mode reply (skip LLM)
|
asyncio.run() block — no separate Celery tasks for tool execution.
|
||||||
- If escalated and sender is human assignee: process normally (human may ask agent for info)
|
|
||||||
|
|
||||||
After LLM response:
|
Pending tool confirmation:
|
||||||
7. check_escalation_rules() — evaluate configured rules + NL trigger
|
When a tool with requires_confirmation=True is invoked, the runner returns
|
||||||
8. If rule matches: call escalate_to_human() and replace LLM response with handoff message
|
a confirmation message. The task stores a pending_tool_confirm entry in Redis
|
||||||
|
and returns the confirmation message as the response.
|
||||||
|
On the next user message, if a pending confirmation exists:
|
||||||
|
- "yes" → execute the pending tool and continue
|
||||||
|
- "no" / anything else → cancel and inform the user
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
@@ -45,6 +49,11 @@ from shared.models.message import KonstructMessage
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Redis key pattern for pending tool confirmation
|
||||||
|
_PENDING_TOOL_KEY = "pending_tool_confirm:{tenant_id}:{user_id}"
|
||||||
|
# TTL for pending confirmation: 10 minutes (user must respond within this window)
|
||||||
|
_PENDING_TOOL_TTL = 600
|
||||||
|
|
||||||
|
|
||||||
@app.task(
|
@app.task(
|
||||||
name="orchestrator.tasks.embed_and_store",
|
name="orchestrator.tasks.embed_and_store",
|
||||||
@@ -198,21 +207,12 @@ async def _process_message(
|
|||||||
4. Append user message + assistant response to Redis sliding window
|
4. Append user message + assistant response to Redis sliding window
|
||||||
5. Dispatch embed_and_store.delay() for async pgvector backfill
|
5. Dispatch embed_and_store.delay() for async pgvector backfill
|
||||||
|
|
||||||
Escalation pipeline (Phase 2 Plan 04):
|
Tool pipeline (Phase 2 Plan 02 additions):
|
||||||
BEFORE LLM call:
|
- Check Redis for pending tool confirmation from previous turn
|
||||||
6. Check Redis escalation status for this thread
|
- If pending confirmation: an affirmative reply (yes / y / confirm / ok / sure / proceed) confirms the pending tool; any other reply cancels it
|
||||||
- Escalated + end user message → skip LLM, return "team member is handling this"
|
- Otherwise: initialize AuditLogger, build tool registry, pass to run_agent()
|
||||||
- Escalated + human assignee message → process normally (human may query agent)
|
- Tool-call loop runs inside run_agent() — no separate Celery tasks
|
||||||
AFTER LLM response:
|
- If run_agent returns a confirmation message: store pending action in Redis
|
||||||
7. Evaluate escalation rules (configured + NL trigger)
|
|
||||||
8. If rule matches → call escalate_to_human, replace response with handoff message
|
|
||||||
|
|
||||||
After getting the LLM response, if Slack placeholder metadata is present,
|
|
||||||
updates the "Thinking..." placeholder message with the real response using
|
|
||||||
Slack's chat.update API.
|
|
||||||
|
|
||||||
This function is called from the synchronous handle_message task via
|
|
||||||
asyncio.run(). It must not be called directly from Celery task code.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
msg: The deserialized KonstructMessage.
|
msg: The deserialized KonstructMessage.
|
||||||
@@ -224,9 +224,11 @@ async def _process_message(
|
|||||||
"""
|
"""
|
||||||
from orchestrator.agents.builder import build_messages_with_memory
|
from orchestrator.agents.builder import build_messages_with_memory
|
||||||
from orchestrator.agents.runner import run_agent
|
from orchestrator.agents.runner import run_agent
|
||||||
|
from orchestrator.audit.logger import AuditLogger
|
||||||
from orchestrator.memory.embedder import embed_text
|
from orchestrator.memory.embedder import embed_text
|
||||||
from orchestrator.memory.long_term import retrieve_relevant
|
from orchestrator.memory.long_term import retrieve_relevant
|
||||||
from orchestrator.memory.short_term import append_message, get_recent_messages
|
from orchestrator.memory.short_term import append_message, get_recent_messages
|
||||||
|
from orchestrator.tools.registry import get_tools_for_agent
|
||||||
from shared.db import async_session_factory, engine
|
from shared.db import async_session_factory, engine
|
||||||
from shared.models.tenant import Agent
|
from shared.models.tenant import Agent
|
||||||
from shared.rls import configure_rls_hook, current_tenant_id
|
from shared.rls import configure_rls_hook, current_tenant_id
|
||||||
@@ -262,10 +264,8 @@ async def _process_message(
|
|||||||
result = await session.execute(stmt)
|
result = await session.execute(stmt)
|
||||||
agent = result.scalars().first()
|
agent = result.scalars().first()
|
||||||
|
|
||||||
# Load the Slack bot token for this tenant from channel_connections config.
|
# Load the bot token for this tenant from channel_connections config
|
||||||
# Loaded unconditionally (not just when placeholder_ts is set) because
|
if agent is not None and placeholder_ts and channel_id:
|
||||||
# escalation DM delivery also requires the bot token.
|
|
||||||
if agent is not None:
|
|
||||||
from shared.models.tenant import ChannelConnection, ChannelTypeEnum
|
from shared.models.tenant import ChannelConnection, ChannelTypeEnum
|
||||||
|
|
||||||
conn_stmt = (
|
conn_stmt = (
|
||||||
@@ -311,63 +311,79 @@ async def _process_message(
|
|||||||
)
|
)
|
||||||
agent_id_str = str(agent.id)
|
agent_id_str = str(agent.id)
|
||||||
user_text: str = msg.content.text or ""
|
user_text: str = msg.content.text or ""
|
||||||
thread_id: str = msg.thread_id or msg.id
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
# Memory retrieval (before LLM call)
|
# Initialize AuditLogger for this pipeline run
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
audit_logger = AuditLogger(session_factory=async_session_factory)
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
# Pending tool confirmation check
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
import redis.asyncio as aioredis
|
import redis.asyncio as aioredis
|
||||||
|
|
||||||
from shared.config import settings
|
from shared.config import settings
|
||||||
|
|
||||||
redis_client = aioredis.from_url(settings.redis_url)
|
redis_client = aioredis.from_url(settings.redis_url)
|
||||||
|
pending_confirm_key = _PENDING_TOOL_KEY.format(
|
||||||
|
tenant_id=msg.tenant_id,
|
||||||
|
user_id=user_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
response_text: str = ""
|
||||||
|
handled_as_confirmation = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
pending_raw = await redis_client.get(pending_confirm_key)
|
||||||
|
|
||||||
|
if pending_raw:
|
||||||
|
# There's a pending tool confirmation waiting for user response
|
||||||
|
handled_as_confirmation = True
|
||||||
|
pending_data = json.loads(pending_raw)
|
||||||
|
|
||||||
|
user_response = user_text.strip().lower()
|
||||||
|
if user_response in ("yes", "y", "confirm", "ok", "sure", "proceed"):
|
||||||
|
# User confirmed — execute the pending tool
|
||||||
|
response_text = await _execute_pending_tool(
|
||||||
|
pending_data=pending_data,
|
||||||
|
tenant_uuid=tenant_uuid,
|
||||||
|
agent=agent,
|
||||||
|
audit_logger=audit_logger,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# User rejected or provided unclear response — cancel
|
||||||
|
tool_name = pending_data.get("tool_name", "the action")
|
||||||
|
response_text = f"Action cancelled. I won't proceed with {tool_name}."
|
||||||
|
|
||||||
|
# Always clear the pending confirmation after handling
|
||||||
|
await redis_client.delete(pending_confirm_key)
|
||||||
|
finally:
|
||||||
|
await redis_client.aclose()
|
||||||
|
|
||||||
|
if handled_as_confirmation:
|
||||||
|
if placeholder_ts and channel_id:
|
||||||
|
await _update_slack_placeholder(
|
||||||
|
bot_token=slack_bot_token,
|
||||||
|
channel_id=channel_id,
|
||||||
|
placeholder_ts=placeholder_ts,
|
||||||
|
text=response_text,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"message_id": msg.id,
|
||||||
|
"response": response_text,
|
||||||
|
"tenant_id": msg.tenant_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
# Memory retrieval (before LLM call)
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
redis_client2 = aioredis.from_url(settings.redis_url)
|
||||||
try:
|
try:
|
||||||
# 1. Short-term: Redis sliding window
|
# 1. Short-term: Redis sliding window
|
||||||
recent_messages = await get_recent_messages(
|
recent_messages = await get_recent_messages(
|
||||||
redis_client, msg.tenant_id, agent_id_str, user_id
|
redis_client2, msg.tenant_id, agent_id_str, user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# Escalation pre-check (BEFORE LLM call)
|
|
||||||
# If this thread is already escalated, enter assistant mode and skip LLM.
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
from shared.redis_keys import escalation_status_key
|
|
||||||
|
|
||||||
esc_key = escalation_status_key(msg.tenant_id, thread_id)
|
|
||||||
esc_status = await redis_client.get(esc_key)
|
|
||||||
|
|
||||||
if esc_status is not None:
|
|
||||||
# Thread is escalated — check if sender is the assigned human or end user
|
|
||||||
assignee_id: str = getattr(agent, "escalation_assignee", "") or ""
|
|
||||||
sender_id: str = msg.sender.user_id if msg.sender else ""
|
|
||||||
|
|
||||||
if assignee_id and sender_id == assignee_id:
|
|
||||||
# Human assignee is messaging — process normally so they can ask the agent
|
|
||||||
logger.info(
|
|
||||||
"Escalated thread %s: assignee %s messaging — processing normally",
|
|
||||||
thread_id,
|
|
||||||
assignee_id,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# End user messaging an escalated thread — defer to human, skip LLM
|
|
||||||
assistant_mode_reply = "A team member is looking into this. They'll respond shortly."
|
|
||||||
logger.info(
|
|
||||||
"Escalated thread %s: end user message — returning assistant-mode reply",
|
|
||||||
thread_id,
|
|
||||||
)
|
|
||||||
if placeholder_ts and channel_id:
|
|
||||||
await _update_slack_placeholder(
|
|
||||||
bot_token=slack_bot_token,
|
|
||||||
channel_id=channel_id,
|
|
||||||
placeholder_ts=placeholder_ts,
|
|
||||||
text=assistant_mode_reply,
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"message_id": msg.id,
|
|
||||||
"response": assistant_mode_reply,
|
|
||||||
"tenant_id": msg.tenant_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
# 2. Long-term: pgvector similarity search
|
# 2. Long-term: pgvector similarity search
|
||||||
relevant_context: list[str] = []
|
relevant_context: list[str] = []
|
||||||
if user_text:
|
if user_text:
|
||||||
@@ -385,16 +401,10 @@ async def _process_message(
|
|||||||
finally:
|
finally:
|
||||||
current_tenant_id.reset(rls_token)
|
current_tenant_id.reset(rls_token)
|
||||||
finally:
|
finally:
|
||||||
await redis_client.aclose()
|
await redis_client2.aclose()
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
# Conversation metadata detection (keyword-based, v1)
|
# Build memory-enriched messages array
|
||||||
# Used by rule-based escalation conditions like "billing_dispute AND attempts > 2"
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
conversation_metadata = _detect_conversation_metadata(user_text, recent_messages)
|
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# Build memory-enriched messages array and run LLM
|
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
enriched_messages = build_messages_with_memory(
|
enriched_messages = build_messages_with_memory(
|
||||||
agent=agent,
|
agent=agent,
|
||||||
@@ -403,7 +413,35 @@ async def _process_message(
|
|||||||
relevant_context=relevant_context,
|
relevant_context=relevant_context,
|
||||||
)
|
)
|
||||||
|
|
||||||
response_text = await run_agent(msg, agent, messages=enriched_messages)
|
# Build tool registry for this agent
|
||||||
|
tool_registry = get_tools_for_agent(agent)
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
# Run agent with tool loop
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
response_text = await run_agent(
|
||||||
|
msg,
|
||||||
|
agent,
|
||||||
|
messages=enriched_messages,
|
||||||
|
audit_logger=audit_logger,
|
||||||
|
tool_registry=tool_registry if tool_registry else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if the response is a tool confirmation request
|
||||||
|
# The confirmation message template starts with a specific prefix
|
||||||
|
is_confirmation_request = response_text.startswith("This action requires your approval")
|
||||||
|
|
||||||
|
if is_confirmation_request:
|
||||||
|
# Store pending confirmation in Redis so the next message can resolve it
|
||||||
|
pending_entry = json.dumps({
|
||||||
|
"tool_name": _extract_tool_name_from_confirmation(response_text),
|
||||||
|
"message": response_text,
|
||||||
|
})
|
||||||
|
redis_client3 = aioredis.from_url(settings.redis_url)
|
||||||
|
try:
|
||||||
|
await redis_client3.setex(pending_confirm_key, _PENDING_TOOL_TTL, pending_entry)
|
||||||
|
finally:
|
||||||
|
await redis_client3.aclose()
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
|
"Message %s processed by agent=%s tenant=%s (short_term=%d, long_term=%d)",
|
||||||
@@ -414,62 +452,6 @@ async def _process_message(
|
|||||||
len(relevant_context),
|
len(relevant_context),
|
||||||
)
|
)
|
||||||
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
# Escalation post-check (AFTER LLM response)
|
|
||||||
# -------------------------------------------------------------------------
|
|
||||||
from orchestrator.escalation.handler import check_escalation_rules, escalate_to_human
|
|
||||||
|
|
||||||
natural_lang_enabled: bool = getattr(agent, "natural_language_escalation", False) or False
|
|
||||||
matched_rule = check_escalation_rules(
|
|
||||||
agent,
|
|
||||||
user_text,
|
|
||||||
conversation_metadata,
|
|
||||||
natural_lang_enabled=natural_lang_enabled,
|
|
||||||
)
|
|
||||||
|
|
||||||
if matched_rule is not None:
|
|
||||||
trigger_reason = matched_rule.get("condition", "escalation rule triggered")
|
|
||||||
assignee_id = getattr(agent, "escalation_assignee", "") or ""
|
|
||||||
|
|
||||||
if assignee_id and slack_bot_token:
|
|
||||||
# Full escalation: DM the human, set Redis flag, log audit
|
|
||||||
audit_logger = _get_no_op_audit_logger()
|
|
||||||
|
|
||||||
redis_esc = aioredis.from_url(settings.redis_url)
|
|
||||||
try:
|
|
||||||
response_text = await escalate_to_human(
|
|
||||||
tenant_id=msg.tenant_id,
|
|
||||||
agent=agent,
|
|
||||||
thread_id=thread_id,
|
|
||||||
trigger_reason=trigger_reason,
|
|
||||||
recent_messages=recent_messages,
|
|
||||||
assignee_slack_user_id=assignee_id,
|
|
||||||
bot_token=slack_bot_token,
|
|
||||||
redis=redis_esc,
|
|
||||||
audit_logger=audit_logger,
|
|
||||||
user_id=user_id,
|
|
||||||
agent_id=agent_id_str,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
await redis_esc.aclose()
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Escalation triggered for tenant=%s agent=%s thread=%s reason=%r",
|
|
||||||
msg.tenant_id,
|
|
||||||
agent.id,
|
|
||||||
thread_id,
|
|
||||||
trigger_reason,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Escalation configured but missing assignee/token — log and continue
|
|
||||||
logger.warning(
|
|
||||||
"Escalation rule matched but escalation_assignee or bot_token missing "
|
|
||||||
"for tenant=%s agent=%s — cannot DM human",
|
|
||||||
msg.tenant_id,
|
|
||||||
agent.id,
|
|
||||||
)
|
|
||||||
response_text = "I've flagged this for a team member to review. They'll follow up with you soon."
|
|
||||||
|
|
||||||
# Replace the "Thinking..." placeholder with the real response
|
# Replace the "Thinking..." placeholder with the real response
|
||||||
if placeholder_ts and channel_id:
|
if placeholder_ts and channel_id:
|
||||||
await _update_slack_placeholder(
|
await _update_slack_placeholder(
|
||||||
@@ -482,20 +464,22 @@ async def _process_message(
|
|||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
# Memory persistence (after LLM response)
|
# Memory persistence (after LLM response)
|
||||||
# -------------------------------------------------------------------------
|
# -------------------------------------------------------------------------
|
||||||
redis_client2 = aioredis.from_url(settings.redis_url)
|
# Only persist if this was a normal LLM response (not a confirmation request)
|
||||||
try:
|
if not is_confirmation_request:
|
||||||
# 3. Append both turns to Redis sliding window
|
redis_client4 = aioredis.from_url(settings.redis_url)
|
||||||
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "user", user_text)
|
try:
|
||||||
await append_message(redis_client2, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
|
# 3. Append both turns to Redis sliding window
|
||||||
finally:
|
await append_message(redis_client4, msg.tenant_id, agent_id_str, user_id, "user", user_text)
|
||||||
await redis_client2.aclose()
|
await append_message(redis_client4, msg.tenant_id, agent_id_str, user_id, "assistant", response_text)
|
||||||
|
finally:
|
||||||
|
await redis_client4.aclose()
|
||||||
|
|
||||||
# 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
|
# 4. Fire-and-forget: async pgvector backfill (never blocks LLM response)
|
||||||
messages_to_embed = [
|
messages_to_embed = [
|
||||||
{"role": "user", "content": user_text},
|
{"role": "user", "content": user_text},
|
||||||
{"role": "assistant", "content": response_text},
|
{"role": "assistant", "content": response_text},
|
||||||
]
|
]
|
||||||
embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
|
embed_and_store.delay(msg.tenant_id, agent_id_str, user_id, messages_to_embed)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"message_id": msg.id,
|
"message_id": msg.id,
|
||||||
@@ -504,60 +488,40 @@ async def _process_message(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def _detect_conversation_metadata(
|
async def _execute_pending_tool(
    pending_data: dict,
    tenant_uuid: uuid.UUID,
    agent: "Agent",
    audit_logger: "AuditLogger",
) -> str:
    """
    Acknowledge a tool call that was paused waiting for user confirmation.

    The full LLM tool-call loop is not re-run from a pending confirmation.
    At present this function does NOT execute the tool: it only builds the
    acknowledgement text sent back to the user. Real execution is deferred
    (the returned message itself states it will arrive in Phase 3).

    Args:
        pending_data: Dict stored in Redis with ``tool_name`` and ``message``.
        tenant_uuid: Tenant UUID; currently unused, accepted so the signature
            is ready for audit logging once execution is implemented.
        agent: Agent that originally invoked the tool; currently unused.
        audit_logger: AuditLogger instance; currently unused.

    Returns:
        A response string to send back to the user.
    """
    # Use `or` instead of a .get() default so that a stored-but-empty (or None)
    # tool name also falls back to the generic wording rather than producing
    # "proceed with  now".
    tool_name = pending_data.get("tool_name") or "the action"
    return (
        f"Confirmed. I'll proceed with {tool_name} now. "
        "(Full tool execution will be implemented in Phase 3 with per-tenant OAuth.)"
    )
|
||||||
# Combine all text for keyword scanning
|
|
||||||
all_texts = [current_text] + [m.get("content", "") for m in recent_messages]
|
|
||||||
combined = " ".join(all_texts).lower()
|
|
||||||
|
|
||||||
# Billing dispute detection
|
|
||||||
billing_keywords = ("billing", "charge", "invoice", "refund", "payment", "overcharged", "subscription")
|
|
||||||
if any(kw in combined for kw in billing_keywords):
|
|
||||||
metadata["billing_dispute"] = True
|
|
||||||
|
|
||||||
# Attempt counter: count user messages in recent history as a proxy for attempts
|
|
||||||
user_turn_count = sum(1 for m in recent_messages if m.get("role") == "user")
|
|
||||||
# +1 for the current message
|
|
||||||
metadata["attempts"] = user_turn_count + 1
|
|
||||||
|
|
||||||
return metadata
|
|
||||||
|
|
||||||
|
|
||||||
def _get_no_op_audit_logger() -> object:
|
def _extract_tool_name_from_confirmation(confirmation_message: str) -> str:
    """
    Extract the tool name from a confirmation message for Redis storage.

    The confirmation template includes a line of the form
    ``**Tool:** {tool_name}``. The first such line with a non-empty name wins.

    Args:
        confirmation_message: Full confirmation text returned by the runner.

    Returns:
        The tool name, or ``"unknown_tool"`` when no usable marker line exists.
    """
    marker = "**Tool:**"
    for raw_line in confirmation_message.splitlines():
        # Tolerate indentation / trailing spaces around the marker line.
        line = raw_line.strip()
        if not line.startswith(marker):
            continue
        # Slice off only the leading marker; a later occurrence of the marker
        # text inside the name must be preserved, not stripped out.
        tool_name = line[len(marker):].strip()
        if tool_name:
            return tool_name
    return "unknown_tool"
|
||||||
import asyncio
|
|
||||||
|
|
||||||
class _NoOpAuditLogger:
|
|
||||||
async def log_escalation(self, **kwargs: object) -> None:
|
|
||||||
logger.info("AUDIT [no-op] escalation: %s", kwargs)
|
|
||||||
|
|
||||||
return _NoOpAuditLogger()
|
|
||||||
|
|
||||||
|
|
||||||
async def _update_slack_placeholder(
|
async def _update_slack_placeholder(
|
||||||
|
|||||||
Reference in New Issue
Block a user