diff --git a/alembic/versions/82feb220a9b8_add_source_column_to_provider_traces.py b/alembic/versions/82feb220a9b8_add_source_column_to_provider_traces.py new file mode 100644 index 00000000..5baad52d --- /dev/null +++ b/alembic/versions/82feb220a9b8_add_source_column_to_provider_traces.py @@ -0,0 +1,27 @@ +"""add source column to provider_traces + +Revision ID: 82feb220a9b8 +Revises: 539afa667cff +Create Date: 2026-01-18 21:09:59.529688 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "82feb220a9b8" +down_revision: Union[str, None] = "539afa667cff" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("provider_traces", sa.Column("source", sa.String(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("provider_traces", "source") diff --git a/letta/orm/provider_trace.py b/letta/orm/provider_trace.py index 29e8a63f..b0cbb181 100644 --- a/letta/orm/provider_trace.py +++ b/letta/orm/provider_trace.py @@ -28,6 +28,9 @@ class ProviderTrace(SqlalchemyBase, OrganizationMixin): agent_tags: Mapped[Optional[list]] = mapped_column(JSON, nullable=True, doc="Tags associated with the agent for filtering") call_type: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="Type of call (agent_step, summarization, etc.)") run_id: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="ID of the run this trace is associated with") + source: Mapped[Optional[str]] = mapped_column( + String, nullable=True, doc="Source service that generated this trace (memgpt-server, lettuce-py)" + ) # Relationships organization: Mapped["Organization"] = relationship("Organization", lazy="selectin") diff --git a/letta/schemas/provider_trace.py b/letta/schemas/provider_trace.py index a7493a58..10ca5c3a 100644 --- a/letta/schemas/provider_trace.py +++ b/letta/schemas/provider_trace.py @@ -27,6 +27,7 @@ class ProviderTrace(BaseProviderTrace): agent_tags (list[str]): Tags associated with the agent for filtering. call_type (str): Type of call (agent_step, summarization, etc.). run_id (str): ID of the run this trace is associated with. + source (str): Source service that generated this trace (memgpt-server, lettuce-py). organization_id (str): The unique identifier of the organization. created_at (datetime): The timestamp when the object was created. """ @@ -41,5 +42,6 @@ class ProviderTrace(BaseProviderTrace): agent_tags: Optional[list[str]] = Field(None, description="Tags associated with the agent for filtering") call_type: Optional[str] = Field(None, description="Type of call (agent_step, summarization, etc.)") run_id: Optional[str] = Field(None, description="ID of the run this trace is associated with") + source: Optional[str] = Field(None, description="Source service that generated this trace (memgpt-server, lettuce-py)") created_at: datetime = Field(default_factory=get_utc_time, description="The timestamp when the object was created.") diff --git a/letta/services/provider_trace_backends/socket.py b/letta/services/provider_trace_backends/socket.py index 3450acbc..dfb4ef8e 100644 --- a/letta/services/provider_trace_backends/socket.py +++ b/letta/services/provider_trace_backends/socket.py @@ -88,6 +88,7 @@ class SocketProviderTraceBackend(ProviderTraceBackendClient): "step_id": provider_trace.step_id, "tags": provider_trace.agent_tags or [], "type": provider_trace.call_type or "agent_step", + "source": provider_trace.source, "request": request, "response": response if not error else None, "error": error, diff --git a/letta/services/telemetry_manager.py b/letta/services/telemetry_manager.py index 0cfa2003..deddb20b 100644 --- a/letta/services/telemetry_manager.py +++ b/letta/services/telemetry_manager.py @@ -1,4 +1,5 @@ import asyncio +import os from letta.helpers.singleton import singleton from letta.log import get_logger @@ -6,6 +7,7 @@ from letta.otel.tracing import trace_method from letta.schemas.provider_trace import ProviderTrace from letta.schemas.user import User as PydanticUser from letta.services.provider_trace_backends import get_provider_trace_backend, get_provider_trace_backends +from letta.settings import telemetry_settings from letta.utils import enforce_types logger = get_logger(__name__) @@ -51,6 +53,12 @@ class TelemetryManager: actor: PydanticUser, provider_trace: ProviderTrace, ) -> ProviderTrace: + # Set source if not already set (use LETTA_TELEMETRY_SOURCE, fallback to DD_SERVICE) + if provider_trace.source is None: + source = telemetry_settings.source or os.environ.get("DD_SERVICE") + if source: + provider_trace = provider_trace.model_copy(update={"source": source}) + # Write to all backends concurrently tasks = [self._safe_create_async(backend, actor, provider_trace) for backend in self._backends] results = await asyncio.gather(*tasks) @@ -77,6 +85,12 @@ class TelemetryManager: provider_trace: ProviderTrace, ) -> ProviderTrace | None: """Synchronous version - writes to all backends.""" + # Set source if not already set (use LETTA_TELEMETRY_SOURCE, fallback to DD_SERVICE) + if provider_trace.source is None: + source = telemetry_settings.source or os.environ.get("DD_SERVICE") + if source: + provider_trace = provider_trace.model_copy(update={"source": source}) + result = None for backend in self._backends: try: diff --git a/letta/settings.py b/letta/settings.py index a4d084ca..3caead22 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -499,6 +499,10 @@ class TelemetrySettings(BaseSettings): validation_alias=AliasChoices("TELEMETRY_SOCKET", "socket_path"), description="Unix socket path for socket backend.", ) + source: str | None = Field( + default=None, + description="Source identifier for telemetry (memgpt-server, lettuce-py, etc.).", + ) @property def provider_trace_backends(self) -> list[str]: