From 609bcf84f2737decead0cc0a5e9ca6f3cbfc5f04 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Wed, 6 Aug 2025 20:19:29 -0700 Subject: [PATCH] feat: Add comprehensive error tracking to steps table (#3765) --- ...14d20_add_error_tracking_to_steps_table.py | 43 +++ letta/agent.py | 3 +- letta/agents/letta_agent.py | 358 +++++++++++------- letta/orm/step.py | 8 + letta/schemas/enums.py | 9 + letta/schemas/step.py | 6 + letta/services/step_manager.py | 180 ++++++++- .../tool_executor/builtin_tool_executor.py | 5 +- tests/integration_test_multi_agent.py | 6 +- tests/integration_test_sleeptime_agent.py | 1 + tests/test_managers.py | 304 +++++++++++++++ 11 files changed, 774 insertions(+), 149 deletions(-) create mode 100644 alembic/versions/f7f757414d20_add_error_tracking_to_steps_table.py diff --git a/alembic/versions/f7f757414d20_add_error_tracking_to_steps_table.py b/alembic/versions/f7f757414d20_add_error_tracking_to_steps_table.py new file mode 100644 index 00000000..41014c93 --- /dev/null +++ b/alembic/versions/f7f757414d20_add_error_tracking_to_steps_table.py @@ -0,0 +1,43 @@ +"""Add error tracking to steps table + +Revision ID: f7f757414d20 +Revises: 05c3bc564286 +Create Date: 2025-08-05 18:17:06.026153 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "f7f757414d20" +down_revision: Union[str, None] = "05c3bc564286" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + # Create the enum type first + stepstatus = sa.Enum("PENDING", "SUCCESS", "FAILED", "CANCELLED", name="stepstatus") + stepstatus.create(op.get_bind(), checkfirst=True) + + op.add_column("steps", sa.Column("error_type", sa.String(), nullable=True)) + op.add_column("steps", sa.Column("error_data", sa.JSON(), nullable=True)) + op.add_column("steps", sa.Column("status", stepstatus, nullable=True)) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("steps", "status") + op.drop_column("steps", "error_data") + op.drop_column("steps", "error_type") + + # Drop the enum type + stepstatus = sa.Enum("PENDING", "SUCCESS", "FAILED", "CANCELLED", name="stepstatus") + stepstatus.drop(op.get_bind(), checkfirst=True) + # ### end Alembic commands ### diff --git a/letta/agent.py b/letta/agent.py index ead504e1..7aec2469 100644 --- a/letta/agent.py +++ b/letta/agent.py @@ -45,7 +45,7 @@ from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState, AgentStepResponse, UpdateAgent, get_prompt_template_for_agent_type from letta.schemas.block import BlockUpdate from letta.schemas.embedding_config import EmbeddingConfig -from letta.schemas.enums import MessageRole, ProviderType, ToolType +from letta.schemas.enums import MessageRole, ProviderType, StepStatus, ToolType from letta.schemas.letta_message_content import ImageContent, TextContent from letta.schemas.memory import ContextWindowOverview, Memory from letta.schemas.message import Message, MessageCreate, ToolReturn @@ -991,6 +991,7 @@ class Agent(BaseAgent): job_id=job_id, step_id=step_id, project_id=self.agent_state.project_id, + status=StepStatus.SUCCESS, # Set to SUCCESS since we're logging after successful completion ) for message in all_new_messages: message.step_id = step.id diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index c3b95194..6db9907f 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -34,7 +34,7 @@ from letta.otel.context import get_ctx_attributes from letta.otel.metric_registry import MetricRegistry from letta.otel.tracing import log_event, trace_method, tracer from letta.schemas.agent import AgentState, UpdateAgent -from letta.schemas.enums import JobStatus, MessageRole, ProviderType, ToolType +from letta.schemas.enums import JobStatus, MessageRole, ProviderType, StepStatus, ToolType from letta.schemas.letta_message import MessageType from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent from letta.schemas.letta_response import LettaResponse @@ -241,6 +241,26 @@ class LettaAgent(BaseAgent): step_progression = StepProgression.START should_continue = False + + # Create step early with PENDING status + logged_step = await self.step_manager.log_step_async( + actor=self.actor, + agent_id=agent_state.id, + provider_name=agent_state.llm_config.model_endpoint_type, + provider_category=agent_state.llm_config.provider_category or "base", + model=agent_state.llm_config.model, + model_endpoint=agent_state.llm_config.model_endpoint, + context_window_limit=agent_state.llm_config.context_window, + usage=UsageStatistics(completion_tokens=0, prompt_tokens=0, total_tokens=0), + provider_id=None, + job_id=self.current_run_id if self.current_run_id else None, + step_id=step_id, + project_id=agent_state.project_id, + status=StepStatus.PENDING, + ) + # Only use step_id in messages if step was actually created + effective_step_id = step_id if logged_step else None + try: request_data, response_data, current_in_context_messages, new_in_context_messages, valid_tool_names = ( await self._build_and_request_from_llm( @@ -295,13 +315,17 @@ class LettaAgent(BaseAgent): tool_rules_solver, response.usage, reasoning_content=reasoning, - step_id=step_id, + step_id=effective_step_id, initial_messages=initial_messages, agent_step_span=agent_step_span, is_final_step=(i == max_steps - 1), ) step_progression = StepProgression.STEP_LOGGED + # Update step with actual usage now that we have it (if step was created) + if logged_step: + await self.step_manager.update_step_success_async(self.actor, step_id, response.usage, stop_reason) + # TODO (cliandy): handle message contexts with larger refactor and dedupe logic new_message_idx = len(initial_messages) if initial_messages else 0 self.response_messages.extend(persisted_messages[new_message_idx:]) @@ -321,7 +345,7 @@ class LettaAgent(BaseAgent): provider_trace_create=ProviderTraceCreate( request_json=request_data, response_json=response_data, - step_id=step_id, + step_id=step_id, # Use original step_id for telemetry organization_id=self.actor.organization_id, ), ) @@ -358,54 +382,57 @@ class LettaAgent(BaseAgent): # Update step if it needs to be updated finally: - if settings.track_stop_reason: - if step_progression == StepProgression.FINISHED and should_continue: - continue + if step_progression == StepProgression.FINISHED and should_continue: + continue - self.logger.debug("Running cleanup for agent loop run: %s", self.current_run_id) - self.logger.info("Running final update. Step Progression: %s", step_progression) - try: - if step_progression == StepProgression.FINISHED and not should_continue: - if stop_reason is None: - stop_reason = LettaStopReason(stop_reason=StopReasonType.end_turn.value) + self.logger.debug("Running cleanup for agent loop run: %s", self.current_run_id) + self.logger.info("Running final update. Step Progression: %s", step_progression) + try: + if step_progression == StepProgression.FINISHED and not should_continue: + # Successfully completed - update with final usage and stop reason + if stop_reason is None: + stop_reason = LettaStopReason(stop_reason=StopReasonType.end_turn.value) + # Note: step already updated with success status after _handle_ai_response + if logged_step: await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) - break + break - if step_progression < StepProgression.STEP_LOGGED: - await self.step_manager.log_step_async( + # Handle error cases + if step_progression < StepProgression.STEP_LOGGED: + # Error occurred before step was fully logged + import traceback + + if logged_step: + await self.step_manager.update_step_error_async( actor=self.actor, - agent_id=agent_state.id, - provider_name=agent_state.llm_config.model_endpoint_type, - provider_category=agent_state.llm_config.provider_category or "base", - model=agent_state.llm_config.model, - model_endpoint=agent_state.llm_config.model_endpoint, - context_window_limit=agent_state.llm_config.context_window, - usage=UsageStatistics(completion_tokens=0, prompt_tokens=0, total_tokens=0), - provider_id=None, - job_id=self.current_run_id if self.current_run_id else None, - step_id=step_id, - project_id=agent_state.project_id, + step_id=step_id, # Use original step_id for telemetry + error_type=type(e).__name__ if "e" in locals() else "Unknown", + error_message=str(e) if "e" in locals() else "Unknown error", + error_traceback=traceback.format_exc(), stop_reason=stop_reason, ) - if step_progression <= StepProgression.RESPONSE_RECEIVED: - # TODO (cliandy): persist response if we get it back - if settings.track_errored_messages: - for message in initial_messages: - message.is_err = True - message.step_id = step_id - await self.message_manager.create_many_messages_async(initial_messages, actor=self.actor) - elif step_progression <= StepProgression.LOGGED_TRACE: - if stop_reason is None: - self.logger.error("Error in step after logging step") - stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) - await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) - else: - self.logger.error("Invalid StepProgression value") + if step_progression <= StepProgression.RESPONSE_RECEIVED: + # TODO (cliandy): persist response if we get it back + if settings.track_errored_messages: + for message in initial_messages: + message.is_err = True + message.step_id = effective_step_id + await self.message_manager.create_many_messages_async(initial_messages, actor=self.actor) + elif step_progression <= StepProgression.LOGGED_TRACE: + if stop_reason is None: + self.logger.error("Error in step after logging step") + stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) + if logged_step: + await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) + else: + self.logger.error("Invalid StepProgression value") + + if settings.track_stop_reason: await self._log_request(request_start_timestamp_ns, request_span) - except Exception as e: - self.logger.error("Failed to update step: %s", e) + except Exception as e: + self.logger.error("Failed to update step: %s", e) if not should_continue: break @@ -484,6 +511,25 @@ class LettaAgent(BaseAgent): step_progression = StepProgression.START should_continue = False + # Create step early with PENDING status + logged_step = await self.step_manager.log_step_async( + actor=self.actor, + agent_id=agent_state.id, + provider_name=agent_state.llm_config.model_endpoint_type, + provider_category=agent_state.llm_config.provider_category or "base", + model=agent_state.llm_config.model, + model_endpoint=agent_state.llm_config.model_endpoint, + context_window_limit=agent_state.llm_config.context_window, + usage=UsageStatistics(completion_tokens=0, prompt_tokens=0, total_tokens=0), + provider_id=None, + job_id=run_id if run_id else self.current_run_id, + step_id=step_id, + project_id=agent_state.project_id, + status=StepStatus.PENDING, + ) + # Only use step_id in messages if step was actually created + effective_step_id = step_id if logged_step else None + try: request_data, response_data, current_in_context_messages, new_in_context_messages, valid_tool_names = ( await self._build_and_request_from_llm( @@ -533,7 +579,7 @@ class LettaAgent(BaseAgent): tool_rules_solver, response.usage, reasoning_content=reasoning, - step_id=step_id, + step_id=effective_step_id, initial_messages=initial_messages, agent_step_span=agent_step_span, is_final_step=(i == max_steps - 1), @@ -541,6 +587,10 @@ class LettaAgent(BaseAgent): ) step_progression = StepProgression.STEP_LOGGED + # Update step with actual usage now that we have it (if step was created) + if logged_step: + await self.step_manager.update_step_success_async(self.actor, step_id, response.usage, stop_reason) + new_message_idx = len(initial_messages) if initial_messages else 0 self.response_messages.extend(persisted_messages[new_message_idx:]) new_in_context_messages.extend(persisted_messages[new_message_idx:]) @@ -560,7 +610,7 @@ class LettaAgent(BaseAgent): provider_trace_create=ProviderTraceCreate( request_json=request_data, response_json=response_data, - step_id=step_id, + step_id=step_id, # Use original step_id for telemetry organization_id=self.actor.organization_id, ), ) @@ -584,54 +634,56 @@ class LettaAgent(BaseAgent): # Update step if it needs to be updated finally: - if settings.track_stop_reason: - if step_progression == StepProgression.FINISHED and should_continue: - continue + if step_progression == StepProgression.FINISHED and should_continue: + continue - self.logger.debug("Running cleanup for agent loop run: %s", self.current_run_id) - self.logger.info("Running final update. Step Progression: %s", step_progression) - try: - if step_progression == StepProgression.FINISHED and not should_continue: - if stop_reason is None: - stop_reason = LettaStopReason(stop_reason=StopReasonType.end_turn.value) - await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) - break + self.logger.debug("Running cleanup for agent loop run: %s", self.current_run_id) + self.logger.info("Running final update. Step Progression: %s", step_progression) + try: + if step_progression == StepProgression.FINISHED and not should_continue: + # Successfully completed - update with final usage and stop reason + if stop_reason is None: + stop_reason = LettaStopReason(stop_reason=StopReasonType.end_turn.value) + if logged_step: + await self.step_manager.update_step_success_async(self.actor, step_id, usage, stop_reason) + break - if step_progression < StepProgression.STEP_LOGGED: - await self.step_manager.log_step_async( + # Handle error cases + if step_progression < StepProgression.STEP_LOGGED: + # Error occurred before step was fully logged + import traceback + + if logged_step: + await self.step_manager.update_step_error_async( actor=self.actor, - agent_id=agent_state.id, - provider_name=agent_state.llm_config.model_endpoint_type, - provider_category=agent_state.llm_config.provider_category or "base", - model=agent_state.llm_config.model, - model_endpoint=agent_state.llm_config.model_endpoint, - context_window_limit=agent_state.llm_config.context_window, - usage=UsageStatistics(completion_tokens=0, prompt_tokens=0, total_tokens=0), - provider_id=None, - job_id=self.current_run_id if self.current_run_id else None, - step_id=step_id, - project_id=agent_state.project_id, + step_id=step_id, # Use original step_id for telemetry + error_type=type(e).__name__ if "e" in locals() else "Unknown", + error_message=str(e) if "e" in locals() else "Unknown error", + error_traceback=traceback.format_exc(), stop_reason=stop_reason, ) - if step_progression <= StepProgression.RESPONSE_RECEIVED: - # TODO (cliandy): persist response if we get it back - if settings.track_errored_messages: - for message in initial_messages: - message.is_err = True - message.step_id = step_id - await self.message_manager.create_many_messages_async(initial_messages, actor=self.actor) - elif step_progression <= StepProgression.LOGGED_TRACE: - if stop_reason is None: - self.logger.error("Error in step after logging step") - stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) - await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) - else: - self.logger.error("Invalid StepProgression value") + if step_progression <= StepProgression.RESPONSE_RECEIVED: + # TODO (cliandy): persist response if we get it back + if settings.track_errored_messages: + for message in initial_messages: + message.is_err = True + message.step_id = effective_step_id + await self.message_manager.create_many_messages_async(initial_messages, actor=self.actor) + elif step_progression <= StepProgression.LOGGED_TRACE: + if stop_reason is None: + self.logger.error("Error in step after logging step") + stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) + if logged_step: + await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) + else: + self.logger.error("Invalid StepProgression value") + + if settings.track_stop_reason: await self._log_request(request_start_timestamp_ns, request_span) - except Exception as e: - self.logger.error("Failed to update step: %s", e) + except Exception as e: + self.logger.error("Failed to update step: %s", e) if not should_continue: break @@ -717,6 +769,26 @@ class LettaAgent(BaseAgent): step_progression = StepProgression.START should_continue = False + + # Create step early with PENDING status + logged_step = await self.step_manager.log_step_async( + actor=self.actor, + agent_id=agent_state.id, + provider_name=agent_state.llm_config.model_endpoint_type, + provider_category=agent_state.llm_config.provider_category or "base", + model=agent_state.llm_config.model, + model_endpoint=agent_state.llm_config.model_endpoint, + context_window_limit=agent_state.llm_config.context_window, + usage=UsageStatistics(completion_tokens=0, prompt_tokens=0, total_tokens=0), + provider_id=None, + job_id=self.current_run_id if self.current_run_id else None, + step_id=step_id, + project_id=agent_state.project_id, + status=StepStatus.PENDING, + ) + # Only use step_id in messages if step was actually created + effective_step_id = step_id if logged_step else None + try: ( request_data, @@ -827,13 +899,26 @@ class LettaAgent(BaseAgent): ), reasoning_content=reasoning_content, pre_computed_assistant_message_id=interface.letta_message_id, - step_id=step_id, + step_id=effective_step_id, initial_messages=initial_messages, agent_step_span=agent_step_span, is_final_step=(i == max_steps - 1), ) step_progression = StepProgression.STEP_LOGGED + # Update step with actual usage now that we have it (if step was created) + if logged_step: + await self.step_manager.update_step_success_async( + self.actor, + step_id, + UsageStatistics( + completion_tokens=usage.completion_tokens, + prompt_tokens=usage.prompt_tokens, + total_tokens=usage.total_tokens, + ), + stop_reason, + ) + new_message_idx = len(initial_messages) if initial_messages else 0 self.response_messages.extend(persisted_messages[new_message_idx:]) new_in_context_messages.extend(persisted_messages[new_message_idx:]) @@ -872,7 +957,7 @@ class LettaAgent(BaseAgent): "output_tokens": usage.completion_tokens, }, }, - step_id=step_id, + step_id=step_id, # Use original step_id for telemetry organization_id=self.actor.organization_id, ), ) @@ -907,54 +992,57 @@ class LettaAgent(BaseAgent): # Update step if it needs to be updated finally: - if settings.track_stop_reason: - if step_progression == StepProgression.FINISHED and should_continue: - continue + if step_progression == StepProgression.FINISHED and should_continue: + continue - self.logger.debug("Running cleanup for agent loop run: %s", self.current_run_id) - self.logger.info("Running final update. Step Progression: %s", step_progression) - try: - if step_progression == StepProgression.FINISHED and not should_continue: - if stop_reason is None: - stop_reason = LettaStopReason(stop_reason=StopReasonType.end_turn.value) + self.logger.debug("Running cleanup for agent loop run: %s", self.current_run_id) + self.logger.info("Running final update. Step Progression: %s", step_progression) + try: + if step_progression == StepProgression.FINISHED and not should_continue: + # Successfully completed - update with final usage and stop reason + if stop_reason is None: + stop_reason = LettaStopReason(stop_reason=StopReasonType.end_turn.value) + # Note: step already updated with success status after _handle_ai_response + if logged_step: await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) - break + break - if step_progression < StepProgression.STEP_LOGGED: - await self.step_manager.log_step_async( + # Handle error cases + if step_progression < StepProgression.STEP_LOGGED: + # Error occurred before step was fully logged + import traceback + + if logged_step: + await self.step_manager.update_step_error_async( actor=self.actor, - agent_id=agent_state.id, - provider_name=agent_state.llm_config.model_endpoint_type, - provider_category=agent_state.llm_config.provider_category or "base", - model=agent_state.llm_config.model, - model_endpoint=agent_state.llm_config.model_endpoint, - context_window_limit=agent_state.llm_config.context_window, - usage=UsageStatistics(completion_tokens=0, prompt_tokens=0, total_tokens=0), - provider_id=None, - job_id=self.current_run_id if self.current_run_id else None, - step_id=step_id, - project_id=agent_state.project_id, + step_id=step_id, # Use original step_id for telemetry + error_type=type(e).__name__ if "e" in locals() else "Unknown", + error_message=str(e) if "e" in locals() else "Unknown error", + error_traceback=traceback.format_exc(), stop_reason=stop_reason, ) - if step_progression <= StepProgression.STREAM_RECEIVED: - if first_chunk and settings.track_errored_messages: - for message in initial_messages: - message.is_err = True - message.step_id = step_id - await self.message_manager.create_many_messages_async(initial_messages, actor=self.actor) - elif step_progression <= StepProgression.LOGGED_TRACE: - if stop_reason is None: - self.logger.error("Error in step after logging step") - stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) - await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) - else: - self.logger.error("Invalid StepProgression value") - # Do tracking for failure cases. Can consolidate with success conditions later. + if step_progression <= StepProgression.STREAM_RECEIVED: + if first_chunk and settings.track_errored_messages: + for message in initial_messages: + message.is_err = True + message.step_id = effective_step_id + await self.message_manager.create_many_messages_async(initial_messages, actor=self.actor) + elif step_progression <= StepProgression.LOGGED_TRACE: + if stop_reason is None: + self.logger.error("Error in step after logging step") + stop_reason = LettaStopReason(stop_reason=StopReasonType.error.value) + if logged_step: + await self.step_manager.update_step_stop_reason(self.actor, step_id, stop_reason.stop_reason) + else: + self.logger.error("Invalid StepProgression value") + + # Do tracking for failure cases. Can consolidate with success conditions later. + if settings.track_stop_reason: await self._log_request(request_start_timestamp_ns, request_span) - except Exception as e: - self.logger.error("Failed to update step: %s", e) + except Exception as e: + self.logger.error("Failed to update step: %s", e) if not should_continue: break @@ -1315,23 +1403,7 @@ class LettaAgent(BaseAgent): is_final_step=is_final_step, ) - # 5. Persist step + messages and propagate to jobs - logged_step = await self.step_manager.log_step_async( - actor=self.actor, - agent_id=agent_state.id, - provider_name=agent_state.llm_config.model_endpoint_type, - provider_category=agent_state.llm_config.provider_category or "base", - model=agent_state.llm_config.model, - model_endpoint=agent_state.llm_config.model_endpoint, - context_window_limit=agent_state.llm_config.context_window, - usage=usage, - provider_id=None, - job_id=run_id if run_id else self.current_run_id, - step_id=step_id, - project_id=agent_state.project_id, - stop_reason=stop_reason, - ) - + # 5. Create messages (step was already created at the beginning) tool_call_messages = create_letta_messages_from_llm_response( agent_id=agent_state.id, model=agent_state.llm_config.model, @@ -1347,7 +1419,7 @@ class LettaAgent(BaseAgent): heartbeat_reason=heartbeat_reason, reasoning_content=reasoning_content, pre_computed_assistant_message_id=pre_computed_assistant_message_id, - step_id=logged_step.id if logged_step else None, + step_id=step_id, ) persisted_messages = await self.message_manager.create_many_messages_async( diff --git a/letta/orm/step.py b/letta/orm/step.py index e35aa135..d616b85b 100644 --- a/letta/orm/step.py +++ b/letta/orm/step.py @@ -6,6 +6,7 @@ from sqlalchemy.orm import Mapped, mapped_column, relationship from letta.orm.mixins import ProjectMixin from letta.orm.sqlalchemy_base import SqlalchemyBase +from letta.schemas.enums import StepStatus from letta.schemas.letta_stop_reason import StopReasonType from letta.schemas.step import Step as PydanticStep @@ -55,6 +56,13 @@ class Step(SqlalchemyBase, ProjectMixin): None, nullable=True, doc="The feedback for this step. Must be either 'positive' or 'negative'." ) + # error handling + error_type: Mapped[Optional[str]] = mapped_column(None, nullable=True, doc="The type/class of the error that occurred") + error_data: Mapped[Optional[Dict]] = mapped_column( + JSON, nullable=True, doc="Error details including message, traceback, and additional context" + ) + status: Mapped[Optional[StepStatus]] = mapped_column(None, nullable=True, doc="Step status: pending, success, or failed") + # Relationships (foreign keys) organization: Mapped[Optional["Organization"]] = relationship("Organization") provider: Mapped[Optional["Provider"]] = relationship("Provider") diff --git a/letta/schemas/enums.py b/letta/schemas/enums.py index 340836c6..fe443aed 100644 --- a/letta/schemas/enums.py +++ b/letta/schemas/enums.py @@ -160,3 +160,12 @@ class SandboxType(str, Enum): E2B = "e2b" MODAL = "modal" LOCAL = "local" + + +class StepStatus(str, Enum): + """Status of a step execution""" + + PENDING = "pending" + SUCCESS = "success" + FAILED = "failed" + CANCELLED = "cancelled" diff --git a/letta/schemas/step.py b/letta/schemas/step.py index bc5cd204..a29bfb8e 100644 --- a/letta/schemas/step.py +++ b/letta/schemas/step.py @@ -3,6 +3,7 @@ from typing import Dict, List, Literal, Optional from pydantic import Field +from letta.schemas.enums import StepStatus from letta.schemas.letta_base import LettaBase from letta.schemas.letta_stop_reason import StopReasonType from letta.schemas.message import Message @@ -40,6 +41,11 @@ class Step(StepBase): ) project_id: Optional[str] = Field(None, description="The project that the agent that executed this step belongs to (cloud only).") + # error tracking fields + error_type: Optional[str] = Field(None, description="The type/class of the error that occurred") + error_data: Optional[Dict] = Field(None, description="Error details including message, traceback, and additional context") + status: Optional[StepStatus] = Field(StepStatus.PENDING, description="Step status: pending, success, or failed") + class StepProgression(int, Enum): START = auto() diff --git a/letta/services/step_manager.py b/letta/services/step_manager.py index 32be8275..26257179 100644 --- a/letta/services/step_manager.py +++ b/letta/services/step_manager.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum -from typing import List, Literal, Optional +from typing import Dict, List, Literal, Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -12,6 +12,7 @@ from letta.orm.job import Job as JobModel from letta.orm.sqlalchemy_base import AccessType from letta.orm.step import Step as StepModel from letta.otel.tracing import get_trace_id, trace_method +from letta.schemas.enums import StepStatus from letta.schemas.letta_stop_reason import LettaStopReason, StopReasonType from letta.schemas.openai.chat_completion_response import UsageStatistics from letta.schemas.step import Step as PydanticStep @@ -87,6 +88,10 @@ class StepManager: job_id: Optional[str] = None, step_id: Optional[str] = None, project_id: Optional[str] = None, + stop_reason: Optional[LettaStopReason] = None, + status: Optional[StepStatus] = None, + error_type: Optional[str] = None, + error_data: Optional[Dict] = None, ) -> PydanticStep: step_data = { "origin": None, @@ -106,9 +111,14 @@ class StepManager: "tid": None, "trace_id": get_trace_id(), # Get the current trace ID "project_id": project_id, + "status": status if status else StepStatus.PENDING, + "error_type": error_type, + "error_data": error_data, } if step_id: step_data["id"] = step_id + if stop_reason: + step_data["stop_reason"] = stop_reason.stop_reason with db_registry.session() as session: if job_id: self._verify_job_access(session, job_id, actor, access=["write"]) @@ -133,6 +143,9 @@ class StepManager: step_id: Optional[str] = None, project_id: Optional[str] = None, stop_reason: Optional[LettaStopReason] = None, + status: Optional[StepStatus] = None, + error_type: Optional[str] = None, + error_data: Optional[Dict] = None, ) -> PydanticStep: step_data = { "origin": None, @@ -152,6 +165,9 @@ class StepManager: "tid": None, "trace_id": get_trace_id(), # Get the current trace ID "project_id": project_id, + "status": status if status else StepStatus.PENDING, + "error_type": error_type, + "error_data": error_data, } if step_id: step_data["id"] = step_id @@ -236,6 +252,126 @@ class StepManager: await session.commit() return step + @enforce_types + @trace_method + async def update_step_error_async( + self, + actor: PydanticUser, + step_id: str, + error_type: str, + error_message: str, + error_traceback: str, + error_details: Optional[Dict] = None, + stop_reason: Optional[LettaStopReason] = None, + ) -> PydanticStep: + """Update a step with error information. + + Args: + actor: The user making the request + step_id: The ID of the step to update + error_type: The type/class of the error + error_message: The error message + error_traceback: Full error traceback + error_details: Additional error context + stop_reason: The stop reason to set + + Returns: + The updated step + + Raises: + NoResultFound: If the step does not exist + """ + async with db_registry.async_session() as session: + step = await session.get(StepModel, step_id) + if not step: + raise NoResultFound(f"Step with id {step_id} does not exist") + if step.organization_id != actor.organization_id: + raise Exception("Unauthorized") + + step.status = StepStatus.FAILED + step.error_type = error_type + step.error_data = {"message": error_message, "traceback": error_traceback, "details": error_details} + if stop_reason: + step.stop_reason = stop_reason.stop_reason + + await session.commit() + return step.to_pydantic() + + @enforce_types + @trace_method + async def update_step_success_async( + self, + actor: PydanticUser, + step_id: str, + usage: UsageStatistics, + stop_reason: Optional[LettaStopReason] = None, + ) -> PydanticStep: + """Update a step with success status and final usage statistics. + + Args: + actor: The user making the request + step_id: The ID of the step to update + usage: Final usage statistics + stop_reason: The stop reason to set + + Returns: + The updated step + + Raises: + NoResultFound: If the step does not exist + """ + async with db_registry.async_session() as session: + step = await session.get(StepModel, step_id) + if not step: + raise NoResultFound(f"Step with id {step_id} does not exist") + if step.organization_id != actor.organization_id: + raise Exception("Unauthorized") + + step.status = StepStatus.SUCCESS + step.completion_tokens = usage.completion_tokens + step.prompt_tokens = usage.prompt_tokens + step.total_tokens = usage.total_tokens + if stop_reason: + step.stop_reason = stop_reason.stop_reason + + await session.commit() + return step.to_pydantic() + + @enforce_types + @trace_method + async def update_step_cancelled_async( + self, + actor: PydanticUser, + step_id: str, + stop_reason: Optional[LettaStopReason] = None, + ) -> PydanticStep: + """Update a step with cancelled status. + + Args: + actor: The user making the request + step_id: The ID of the step to update + stop_reason: The stop reason to set + + Returns: + The updated step + + Raises: + NoResultFound: If the step does not exist + """ + async with db_registry.async_session() as session: + step = await session.get(StepModel, step_id) + if not step: + raise NoResultFound(f"Step with id {step_id} does not exist") + if step.organization_id != actor.organization_id: + raise Exception("Unauthorized") + + step.status = StepStatus.CANCELLED + if stop_reason: + step.stop_reason = stop_reason.stop_reason + + await session.commit() + return step.to_pydantic() + def _verify_job_access( self, session: Session, @@ -319,6 +455,10 @@ class NoopStepManager(StepManager): job_id: Optional[str] = None, step_id: Optional[str] = None, project_id: Optional[str] = None, + stop_reason: Optional[LettaStopReason] = None, + status: Optional[StepStatus] = None, + error_type: Optional[str] = None, + error_data: Optional[Dict] = None, ) -> PydanticStep: return @@ -339,5 +479,43 @@ class NoopStepManager(StepManager): step_id: Optional[str] = None, project_id: Optional[str] = None, stop_reason: Optional[LettaStopReason] = None, + status: Optional[StepStatus] = None, + error_type: Optional[str] = None, + error_data: Optional[Dict] = None, + ) -> PydanticStep: + return + + @enforce_types + @trace_method + async def update_step_error_async( + self, + actor: PydanticUser, + step_id: str, + error_type: str, + error_message: str, + error_traceback: str, + error_details: Optional[Dict] = None, + stop_reason: Optional[LettaStopReason] = None, + ) -> PydanticStep: + return + + @enforce_types + @trace_method + async def update_step_success_async( + self, + actor: PydanticUser, + step_id: str, + usage: UsageStatistics, + stop_reason: Optional[LettaStopReason] = None, + ) -> PydanticStep: + return + + @enforce_types + @trace_method + async def update_step_cancelled_async( + self, + actor: PydanticUser, + step_id: str, + stop_reason: Optional[LettaStopReason] = None, ) -> PydanticStep: return diff --git a/letta/services/tool_executor/builtin_tool_executor.py b/letta/services/tool_executor/builtin_tool_executor.py index 77810bc4..a4146320 100644 --- a/letta/services/tool_executor/builtin_tool_executor.py +++ b/letta/services/tool_executor/builtin_tool_executor.py @@ -209,7 +209,10 @@ class LettaBuiltinToolExecutor(ToolExecutor): logger.info(f"[DEBUG] Starting Firecrawl search for query: '{task.query}' with limit={limit}") # Perform the search for this task - search_result = await app.search(task.query, limit=limit, scrape_options=ScrapeOptions(formats=["markdown"])) + scrape_options = ScrapeOptions( + formats=["markdown"], excludeTags=["#ad", "#footer"], onlyMainContent=True, parsePDF=True, removeBase64Images=True + ) + search_result = await app.search(task.query, limit=limit, scrape_options=scrape_options) logger.info( f"[DEBUG] Firecrawl search completed for '{task.query}': {len(search_result.get('data', [])) if search_result else 0} results" diff --git a/tests/integration_test_multi_agent.py b/tests/integration_test_multi_agent.py index ad1139e8..36f25508 100644 --- a/tests/integration_test_multi_agent.py +++ b/tests/integration_test_multi_agent.py @@ -95,7 +95,7 @@ def agent_obj(client): ) yield agent_state_instance - client.agents.delete(agent_state_instance.id) + # client.agents.delete(agent_state_instance.id) @pytest.fixture(scope="function") @@ -111,7 +111,7 @@ def other_agent_obj(client): yield agent_state_instance - client.agents.delete(agent_state_instance.id) + # client.agents.delete(agent_state_instance.id) @pytest.fixture @@ -150,7 +150,7 @@ def test_send_message_to_agent(client, server, agent_obj, other_agent_obj): actor = server.user_manager.get_user_or_default() # Encourage the agent to send a message to the other agent_obj with the secret string - client.agents.messages.create( + response = client.agents.messages.create( agent_id=agent_obj.id, messages=[ { diff --git a/tests/integration_test_sleeptime_agent.py b/tests/integration_test_sleeptime_agent.py index e2190633..5f08452b 100644 --- a/tests/integration_test_sleeptime_agent.py +++ b/tests/integration_test_sleeptime_agent.py @@ -257,6 +257,7 @@ async def test_sleeptime_group_chat_v2(server, actor): job_manager=server.job_manager, actor=actor, group=main_agent.multi_agent_group, + step_manager=server.step_manager, ) response = await agent.step( diff --git a/tests/test_managers.py b/tests/test_managers.py index cc6ba588..a6d6e4b9 100644 --- a/tests/test_managers.py +++ b/tests/test_managers.py @@ -62,6 +62,7 @@ from letta.schemas.enums import ( MessageRole, ProviderType, SandboxType, + StepStatus, ToolType, ) from letta.schemas.environment_variables import SandboxEnvironmentVariableCreate, SandboxEnvironmentVariableUpdate @@ -74,6 +75,7 @@ from letta.schemas.job import Job as PydanticJob from letta.schemas.job import JobUpdate, LettaRequestConfig from letta.schemas.letta_message import UpdateAssistantMessage, UpdateReasoningMessage, UpdateSystemMessage, UpdateUserMessage from letta.schemas.letta_message_content import TextContent +from letta.schemas.letta_stop_reason import LettaStopReason, StopReasonType from letta.schemas.llm_batch_job import AgentStepState, LLMBatchItem from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage @@ -8036,6 +8038,308 @@ async def test_job_usage_stats_add_multiple(server: SyncServer, sarah_agent, def assert len(steps_without_feedback) == 2 +@pytest.mark.asyncio +async def test_step_manager_error_tracking(server: SyncServer, sarah_agent, default_job, default_user, event_loop): + """Test step manager error tracking functionality.""" + step_manager = server.step_manager + + # Create a step with pending status + step = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=0, + prompt_tokens=0, + total_tokens=0, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=StepStatus.PENDING, + ) + + assert step.status == StepStatus.PENDING + assert step.error_type is None + assert step.error_data is None + + # Test update_step_error_async + error_details = {"step_progression": "RESPONSE_RECEIVED", "context": "Test error context"} + + updated_step = await step_manager.update_step_error_async( + actor=default_user, + step_id=step.id, + error_type="ValueError", + error_message="Test error message", + error_traceback="Traceback (most recent call last):\n File test.py, line 1\n raise ValueError('Test error')", + error_details=error_details, + stop_reason=LettaStopReason(stop_reason=StopReasonType.error.value), + ) + + assert updated_step.status == StepStatus.FAILED + assert updated_step.error_type == "ValueError" + assert updated_step.error_data["message"] == "Test error message" + assert updated_step.error_data["traceback"].startswith("Traceback") + assert updated_step.error_data["details"] == error_details + assert updated_step.stop_reason == StopReasonType.error + + # Create another step to test success update + success_step = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=0, + prompt_tokens=0, + total_tokens=0, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=StepStatus.PENDING, + ) + + # Test update_step_success_async + final_usage = UsageStatistics( + completion_tokens=150, + prompt_tokens=100, + total_tokens=250, + ) + + updated_success_step = await step_manager.update_step_success_async( + actor=default_user, + step_id=success_step.id, + usage=final_usage, + stop_reason=LettaStopReason(stop_reason=StopReasonType.end_turn.value), + ) + + assert updated_success_step.status == StepStatus.SUCCESS + assert updated_success_step.completion_tokens == 150 + assert updated_success_step.prompt_tokens == 100 + assert updated_success_step.total_tokens == 250 + assert updated_success_step.stop_reason == StopReasonType.end_turn + assert updated_success_step.error_type is None + assert updated_success_step.error_data is None + + # Create a step to test cancellation + cancelled_step = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=0, + prompt_tokens=0, + total_tokens=0, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=StepStatus.PENDING, + ) + + # Test update_step_cancelled_async + updated_cancelled_step = await step_manager.update_step_cancelled_async( + actor=default_user, + step_id=cancelled_step.id, + stop_reason=LettaStopReason(stop_reason=StopReasonType.cancelled.value), + ) + + assert updated_cancelled_step.status == StepStatus.CANCELLED + assert updated_cancelled_step.stop_reason == StopReasonType.cancelled + assert updated_cancelled_step.error_type is None + assert updated_cancelled_step.error_data is None + + +@pytest.mark.asyncio +async def test_step_manager_error_tracking_edge_cases(server: SyncServer, sarah_agent, default_job, default_user, event_loop): + """Test edge cases for step manager error tracking.""" + step_manager = server.step_manager + + # Test 1: Attempt to update non-existent step + with pytest.raises(NoResultFound): + await step_manager.update_step_error_async( + actor=default_user, + step_id="non-existent-step-id", + error_type="TestError", + error_message="Test", + error_traceback="Test traceback", + ) + + # Test 2: Create step with initial error information + step_with_error = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=0, + prompt_tokens=0, + total_tokens=0, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=StepStatus.FAILED, + error_type="InitialError", + error_data={"message": "Step failed at creation", "traceback": "Initial traceback", "details": {"initial": True}}, + ) + + assert step_with_error.status == StepStatus.FAILED + assert step_with_error.error_type == "InitialError" + assert step_with_error.error_data["message"] == "Step failed at creation" + assert step_with_error.error_data["details"] == {"initial": True} + + # Test 3: Update from failed to success (recovery scenario) + recovered_step = await step_manager.update_step_success_async( + actor=default_user, + step_id=step_with_error.id, + usage=UsageStatistics( + completion_tokens=50, + prompt_tokens=30, + total_tokens=80, + ), + ) + + # Verify error fields are still present but status changed + assert recovered_step.status == StepStatus.SUCCESS + assert recovered_step.error_type == "InitialError" # Should retain error info + assert recovered_step.completion_tokens == 50 + + # Test 4: Very long error messages and tracebacks + long_error_step = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=0, + prompt_tokens=0, + total_tokens=0, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=StepStatus.PENDING, + ) + + very_long_traceback = "Traceback (most recent call last):\n" + "\n".join([f" File 'test{i}.py', line {i}" for i in range(100)]) + complex_error_details = { + "nested": {"data": {"arrays": [1, 2, 3, 4, 5], "strings": ["error1", "error2", "error3"], "booleans": [True, False, True]}}, + "timestamp": "2024-01-01T00:00:00Z", + "context": "Complex nested error details", + } + + updated_long_error = await step_manager.update_step_error_async( + actor=default_user, + step_id=long_error_step.id, + error_type="VeryLongError", + error_message="A" * 500, # Very long error message + error_traceback=very_long_traceback, + error_details=complex_error_details, + ) + + assert updated_long_error.status == StepStatus.FAILED + assert len(updated_long_error.error_data["message"]) == 500 + assert "test99.py" in updated_long_error.error_data["traceback"] + assert updated_long_error.error_data["details"]["nested"]["data"]["arrays"] == [1, 2, 3, 4, 5] + + # Test 5: Multiple status updates on same step + multi_update_step = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=0, + prompt_tokens=0, + total_tokens=0, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=StepStatus.PENDING, + ) + + # First update to cancelled + step1 = await step_manager.update_step_cancelled_async( + actor=default_user, + step_id=multi_update_step.id, + ) + assert step1.status == StepStatus.CANCELLED + + # Then update to error (simulating race condition or retry) + step2 = await step_manager.update_step_error_async( + actor=default_user, + step_id=multi_update_step.id, + error_type="PostCancellationError", + error_message="Error after cancellation", + error_traceback="Traceback after cancel", + ) + assert step2.status == StepStatus.FAILED + assert step2.error_type == "PostCancellationError" + + +@pytest.mark.asyncio +async def test_step_manager_list_steps_with_status_filter(server: SyncServer, sarah_agent, default_job, default_user, event_loop): + """Test listing steps with status filters.""" + step_manager = server.step_manager + + # Create steps with different statuses + statuses = [StepStatus.PENDING, StepStatus.SUCCESS, StepStatus.FAILED, StepStatus.CANCELLED] + created_steps = [] + + for status in statuses: + step = await step_manager.log_step_async( + agent_id=sarah_agent.id, + provider_name="openai", + provider_category="base", + model="gpt-4o-mini", + model_endpoint="https://api.openai.com/v1", + context_window_limit=8192, + job_id=default_job.id, + usage=UsageStatistics( + completion_tokens=10, + prompt_tokens=20, + total_tokens=30, + ), + actor=default_user, + project_id=sarah_agent.project_id, + status=status, + ) + created_steps.append(step) + + # List all steps for the agent + all_steps = await step_manager.list_steps_async( + agent_id=sarah_agent.id, + actor=default_user, + ) + + # Verify we can find steps with each status + status_counts = {status: 0 for status in statuses} + for step in all_steps: + if step.status in status_counts: + status_counts[step.status] += 1 + + # Each status should have at least one step + for status in statuses: + assert status_counts[status] >= 1, f"No steps found with status {status}" + + def test_job_usage_stats_get_nonexistent_job(server: SyncServer, default_user): """Test getting usage statistics for a nonexistent job.""" job_manager = server.job_manager