diff --git a/alembic/versions/a1b2c3d4e5f8_create_provider_trace_metadata_table.py b/alembic/versions/a1b2c3d4e5f8_create_provider_trace_metadata_table.py new file mode 100644 index 00000000..c4d3f1c7 --- /dev/null +++ b/alembic/versions/a1b2c3d4e5f8_create_provider_trace_metadata_table.py @@ -0,0 +1,59 @@ +"""create provider_trace_metadata table + +Revision ID: a1b2c3d4e5f8 +Revises: 9275f62ad282 +Create Date: 2026-01-28 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op +from letta.settings import settings + +revision: str = "a1b2c3d4e5f8" +down_revision: Union[str, None] = "9275f62ad282" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + if not settings.letta_pg_uri_no_default: + return + + op.create_table( + "provider_trace_metadata", + sa.Column("id", sa.String(), nullable=False), + sa.Column("step_id", sa.String(), nullable=True), + sa.Column("agent_id", sa.String(), nullable=True), + sa.Column("agent_tags", sa.JSON(), nullable=True), + sa.Column("call_type", sa.String(), nullable=True), + sa.Column("run_id", sa.String(), nullable=True), + sa.Column("source", sa.String(), nullable=True), + sa.Column("org_id", sa.String(), nullable=True), + sa.Column("user_id", sa.String(), nullable=True), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=True), + sa.Column("is_deleted", sa.Boolean(), server_default=sa.text("FALSE"), nullable=False), + sa.Column("_created_by_id", sa.String(), nullable=True), + sa.Column("_last_updated_by_id", sa.String(), nullable=True), + sa.Column("organization_id", sa.String(), nullable=False), + sa.ForeignKeyConstraint( + ["organization_id"], + ["organizations.id"], + ), + sa.PrimaryKeyConstraint("created_at", "id"), + ) + op.create_index("ix_provider_trace_metadata_step_id", "provider_trace_metadata", ["step_id"], unique=False) + op.create_index("ix_provider_trace_metadata_id", "provider_trace_metadata", ["id"], unique=True) + + +def downgrade() -> None: + if not settings.letta_pg_uri_no_default: + return + + op.drop_index("ix_provider_trace_metadata_id", table_name="provider_trace_metadata") + op.drop_index("ix_provider_trace_metadata_step_id", table_name="provider_trace_metadata") + op.drop_table("provider_trace_metadata") diff --git a/letta/orm/__init__.py b/letta/orm/__init__.py index 310e8d2f..72e1112a 100644 --- a/letta/orm/__init__.py +++ b/letta/orm/__init__.py @@ -31,6 +31,7 @@ from letta.orm.prompt import Prompt from letta.orm.provider import Provider from letta.orm.provider_model import ProviderModel from letta.orm.provider_trace import ProviderTrace +from letta.orm.provider_trace_metadata import ProviderTraceMetadata from letta.orm.run import Run from letta.orm.run_metrics import RunMetrics from letta.orm.sandbox_config import AgentEnvironmentVariable, SandboxConfig, SandboxEnvironmentVariable diff --git a/letta/orm/provider_trace_metadata.py b/letta/orm/provider_trace_metadata.py new file mode 100644 index 00000000..5d8fecf7 --- /dev/null +++ b/letta/orm/provider_trace_metadata.py @@ -0,0 +1,45 @@ +import uuid +from datetime import datetime +from typing import Optional + +from sqlalchemy import JSON, DateTime, Index, String, UniqueConstraint, func +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from letta.orm.mixins import OrganizationMixin +from letta.orm.sqlalchemy_base import SqlalchemyBase +from letta.schemas.provider_trace import ProviderTraceMetadata as PydanticProviderTraceMetadata + + +class ProviderTraceMetadata(SqlalchemyBase, OrganizationMixin): + """Metadata-only provider trace storage (no request/response JSON).""" + + __tablename__ = "provider_trace_metadata" + __pydantic_model__ = PydanticProviderTraceMetadata + __table_args__ = ( + Index("ix_provider_trace_metadata_step_id", "step_id"), + UniqueConstraint("id", name="uq_provider_trace_metadata_id"), + ) + + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), primary_key=True, server_default=func.now(), doc="Timestamp when the trace was created" + ) + id: Mapped[str] = mapped_column( + String, primary_key=True, doc="Unique provider trace identifier", default=lambda: f"provider_trace-{uuid.uuid4()}" + ) + step_id: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="ID of the step that this trace is associated with") + + # Telemetry context fields + agent_id: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="ID of the agent that generated this trace") + agent_tags: Mapped[Optional[list]] = mapped_column(JSON, nullable=True, doc="Tags associated with the agent for filtering") + call_type: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="Type of call (agent_step, summarization, etc.)") + run_id: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="ID of the run this trace is associated with") + source: Mapped[Optional[str]] = mapped_column( + String, nullable=True, doc="Source service that generated this trace (memgpt-server, lettuce-py)" + ) + + # v2 protocol fields + org_id: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="ID of the organization") + user_id: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="ID of the user who initiated the request") + + # Relationships + organization: Mapped["Organization"] = relationship("Organization", lazy="selectin") diff --git a/letta/schemas/provider_trace.py b/letta/schemas/provider_trace.py index 42ee6672..0f4202e8 100644 --- a/letta/schemas/provider_trace.py +++ b/letta/schemas/provider_trace.py @@ -54,3 +54,23 @@ class ProviderTrace(BaseProviderTrace): llm_config: Optional[Dict[str, Any]] = Field(None, description="LLM configuration used for this call (non-summarization calls only)") created_at: datetime = Field(default_factory=get_utc_time, description="The timestamp when the object was created.") + + +class ProviderTraceMetadata(BaseProviderTrace): + """Metadata-only representation of a provider trace (no request/response JSON).""" + + id: str = BaseProviderTrace.generate_id_field() + step_id: Optional[str] = Field(None, description="ID of the step that this trace is associated with") + + # Telemetry context fields + agent_id: Optional[str] = Field(None, description="ID of the agent that generated this trace") + agent_tags: Optional[list[str]] = Field(None, description="Tags associated with the agent for filtering") + call_type: Optional[str] = Field(None, description="Type of call (agent_step, summarization, etc.)") + run_id: Optional[str] = Field(None, description="ID of the run this trace is associated with") + source: Optional[str] = Field(None, description="Source service that generated this trace (memgpt-server, lettuce-py)") + + # v2 protocol fields + org_id: Optional[str] = Field(None, description="ID of the organization") + user_id: Optional[str] = Field(None, description="ID of the user who initiated the request") + + created_at: datetime = Field(default_factory=get_utc_time, description="The timestamp when the object was created.") diff --git a/letta/services/provider_trace_backends/postgres.py b/letta/services/provider_trace_backends/postgres.py index 9980cec9..a70eadf8 100644 --- a/letta/services/provider_trace_backends/postgres.py +++ b/letta/services/provider_trace_backends/postgres.py @@ -2,10 +2,12 @@ from letta.helpers.json_helpers import json_dumps, json_loads from letta.orm.provider_trace import ProviderTrace as ProviderTraceModel -from letta.schemas.provider_trace import ProviderTrace +from letta.orm.provider_trace_metadata import ProviderTraceMetadata as ProviderTraceMetadataModel +from letta.schemas.provider_trace import ProviderTrace, ProviderTraceMetadata from letta.schemas.user import User from letta.server.db import db_registry from letta.services.provider_trace_backends.base import ProviderTraceBackendClient +from letta.settings import telemetry_settings class PostgresProviderTraceBackend(ProviderTraceBackendClient): @@ -15,7 +17,17 @@ class PostgresProviderTraceBackend(ProviderTraceBackendClient): self, actor: User, provider_trace: ProviderTrace, + ) -> ProviderTrace | ProviderTraceMetadata: + if telemetry_settings.provider_trace_pg_metadata_only: + return await self._create_metadata_only_async(actor, provider_trace) + return await self._create_full_async(actor, provider_trace) + + async def _create_full_async( + self, + actor: User, + provider_trace: ProviderTrace, ) -> ProviderTrace: + """Write full provider trace to provider_traces table.""" async with db_registry.async_session() as session: provider_trace_model = ProviderTraceModel(**provider_trace.model_dump()) provider_trace_model.organization_id = actor.organization_id @@ -31,11 +43,44 @@ class PostgresProviderTraceBackend(ProviderTraceBackendClient): await provider_trace_model.create_async(session, actor=actor, no_commit=True, no_refresh=True) return provider_trace_model.to_pydantic() + async def _create_metadata_only_async( + self, + actor: User, + provider_trace: ProviderTrace, + ) -> ProviderTraceMetadata: + """Write metadata-only trace to provider_trace_metadata table.""" + metadata = ProviderTraceMetadata( + id=provider_trace.id, + step_id=provider_trace.step_id, + agent_id=provider_trace.agent_id, + agent_tags=provider_trace.agent_tags, + call_type=provider_trace.call_type, + run_id=provider_trace.run_id, + source=provider_trace.source, + org_id=provider_trace.org_id, + user_id=provider_trace.user_id, + ) + metadata_model = ProviderTraceMetadataModel(**metadata.model_dump()) + metadata_model.organization_id = actor.organization_id + + async with db_registry.async_session() as session: + await metadata_model.create_async(session, actor=actor, no_commit=True, no_refresh=True) + return metadata_model.to_pydantic() + async def get_by_step_id_async( self, step_id: str, actor: User, ) -> ProviderTrace | None: + """Read from provider_traces table. Always reads from full table regardless of write flag.""" + return await self._get_full_by_step_id_async(step_id, actor) + + async def _get_full_by_step_id_async( + self, + step_id: str, + actor: User, + ) -> ProviderTrace | None: + """Read from provider_traces table.""" async with db_registry.async_session() as session: provider_trace_model = await ProviderTraceModel.read_async( db_session=session, @@ -43,3 +88,17 @@ class PostgresProviderTraceBackend(ProviderTraceBackendClient): actor=actor, ) return provider_trace_model.to_pydantic() if provider_trace_model else None + + async def _get_metadata_by_step_id_async( + self, + step_id: str, + actor: User, + ) -> ProviderTraceMetadata | None: + """Read from provider_trace_metadata table.""" + async with db_registry.async_session() as session: + metadata_model = await ProviderTraceMetadataModel.read_async( + db_session=session, + step_id=step_id, + actor=actor, + ) + return metadata_model.to_pydantic() if metadata_model else None diff --git a/letta/settings.py b/letta/settings.py index 83649312..f52fca3c 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -519,6 +519,10 @@ class TelemetrySettings(BaseSettings): default=None, description="Source identifier for telemetry (memgpt-server, lettuce-py, etc.).", ) + provider_trace_pg_metadata_only: bool = Field( + default=False, + description="Write only metadata to Postgres (no request/response JSON). Requires provider_trace_metadata table to exist.", + ) @property def provider_trace_backends(self) -> list[str]: