diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index e737ee32..4643437f 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -23,7 +23,6 @@ from letta.server.rest_api.streaming_response import ( cancellation_aware_stream_wrapper, ) from letta.server.server import SyncServer -from letta.services.lettuce import LettuceClient from letta.services.run_manager import RunManager from letta.settings import settings @@ -150,26 +149,7 @@ async def retrieve_run( """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) runs_manager = RunManager() - - run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor) - - use_lettuce = run.metadata and run.metadata.get("lettuce") - if use_lettuce and run.status not in [RunStatus.completed, RunStatus.failed, RunStatus.cancelled]: - lettuce_client = await LettuceClient.create() - status = await lettuce_client.get_status(run_id=run_id) - - # Map the status to our enum - run_status = run.status - if status == "RUNNING": - run_status = RunStatus.running - elif status == "COMPLETED": - run_status = RunStatus.completed - elif status == "FAILED": - run_status = RunStatus.failed - elif status == "CANCELLED": - run_status = RunStatus.cancelled - run.status = run_status - return run + return await runs_manager.get_run_with_status(run_id=run_id, actor=actor) RunMessagesResponse = Annotated[ diff --git a/letta/services/mcp/base_client.py b/letta/services/mcp/base_client.py index 0438c34a..3e5008c9 100644 --- a/letta/services/mcp/base_client.py +++ b/letta/services/mcp/base_client.py @@ -38,8 +38,8 @@ class AsyncBaseMCPClient: raise e except Exception as e: # MCP connection failures are often due to user misconfiguration, not system errors - # Log at info level to help with debugging without triggering Sentry alerts - logger.info( + # Log as warning for visibility in monitoring + logger.warning( f"Connecting to MCP server failed. Please review your server config: {self.server_config.model_dump_json(indent=4)}. Error: {str(e)}" ) if hasattr(self.server_config, "server_url") and self.server_config.server_url: @@ -78,7 +78,13 @@ class AsyncBaseMCPClient: async def execute_tool(self, tool_name: str, tool_args: dict) -> Tuple[str, bool]: self._check_initialized() - result = await self.session.call_tool(tool_name, tool_args) + try: + result = await self.session.call_tool(tool_name, tool_args) + except Exception as e: + if e.__class__.__name__ == "McpError": + logger.warning(f"MCP tool '{tool_name}' execution failed: {str(e)}") + raise + parsed_content = [] for content_piece in result.content: if isinstance(content_piece, TextContent): diff --git a/letta/services/run_manager.py b/letta/services/run_manager.py index 5691d1ad..8da9fd93 100644 --- a/letta/services/run_manager.py +++ b/letta/services/run_manager.py @@ -97,6 +97,34 @@ class RunManager: raise NoResultFound(f"Run with id {run_id} not found") return run.to_pydantic() + @enforce_types + async def get_run_with_status(self, run_id: str, actor: PydanticUser) -> PydanticRun: + """Get a run by its ID and update status from Lettuce if applicable.""" + run = await self.get_run_by_id(run_id=run_id, actor=actor) + + use_lettuce = run.metadata and run.metadata.get("lettuce") + if use_lettuce and run.status not in [RunStatus.completed, RunStatus.failed, RunStatus.cancelled]: + try: + from letta.services.lettuce_client import LettuceClient + + lettuce_client = await LettuceClient.create() + status = await lettuce_client.get_status(run_id=run_id) + + # Map the status to our enum + if status == "RUNNING": + run.status = RunStatus.running + elif status == "COMPLETED": + run.status = RunStatus.completed + elif status == "FAILED": + run.status = RunStatus.failed + elif status == "CANCELLED": + run.status = RunStatus.cancelled + except Exception as e: + logger.error(f"Failed to get status from Lettuce for run {run_id}: {str(e)}") + # Return run with current status from DB if Lettuce fails + + return run + @enforce_types async def list_runs( self, diff --git a/otel/otel-collector-config-clickhouse-prod.yaml b/otel/otel-collector-config-clickhouse-prod.yaml index 4a09d0e4..07b16d18 100644 --- a/otel/otel-collector-config-clickhouse-prod.yaml +++ b/otel/otel-collector-config-clickhouse-prod.yaml @@ -8,6 +8,8 @@ receivers: filelog: include: - /root/.letta/logs/Letta.log + multiline: + line_start_pattern: '^[\{\[]|^[0-9]{4}-[0-9]{2}-[0-9]{2}' operators: # Parse JSON logs (skip non-JSON lines) - type: json_parser @@ -19,8 +21,14 @@ receivers: layout_type: gotime layout: '2006-01-02T15:04:05.999999Z07:00' on_error: send + if: 'attributes.timestamp != nil' processors: + resource: + attributes: + - key: environment + value: ${env:LETTA_ENVIRONMENT} + action: upsert memory_limiter: check_interval: 1s limit_mib: 1024 @@ -64,7 +72,7 @@ service: exporters: [clickhouse] logs: receivers: [filelog] - processors: [memory_limiter, batch] + processors: [resource, memory_limiter, batch] exporters: [clickhouse] metrics: receivers: [otlp] diff --git a/otel/otel-collector-config-clickhouse.yaml b/otel/otel-collector-config-clickhouse.yaml index e0479d56..2fd18e67 100644 --- a/otel/otel-collector-config-clickhouse.yaml +++ b/otel/otel-collector-config-clickhouse.yaml @@ -8,6 +8,8 @@ receivers: filelog: include: - /root/.letta/logs/Letta.log + multiline: + line_start_pattern: '^[\{\[]|^[0-9]{4}-[0-9]{2}-[0-9]{2}' operators: # Parse JSON logs (skip non-JSON lines) - type: json_parser @@ -19,8 +21,14 @@ receivers: layout_type: gotime layout: '2006-01-02T15:04:05.999999Z07:00' on_error: send + if: 'attributes.timestamp != nil' processors: + resource: + attributes: + - key: environment + value: ${env:LETTA_ENVIRONMENT} + action: upsert memory_limiter: check_interval: 1s limit_mib: 1024 @@ -65,7 +73,7 @@ service: exporters: [clickhouse] logs: receivers: [filelog] - processors: [memory_limiter, batch] + processors: [resource, memory_limiter, batch] exporters: [clickhouse] metrics: receivers: [otlp]