From e0c688dc32e0db44c981e0834b38bb7423bf65c4 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Tue, 1 Jul 2025 16:19:58 -0700 Subject: [PATCH] feat: Add telemetry for file uploads (#3128) --- letta/orm/file.py | 2 +- letta/server/rest_api/routers/v1/sources.py | 2 + .../chunker/llama_index_chunker.py | 2 + .../embedder/openai_embedder.py | 37 ++++++++++++ .../services/file_processor/file_processor.py | 59 +++++++++++++++++++ .../file_processor/parser/mistral_parser.py | 2 + 6 files changed, 103 insertions(+), 1 deletion(-) diff --git a/letta/orm/file.py b/letta/orm/file.py index 0552763d..35184f67 100644 --- a/letta/orm/file.py +++ b/letta/orm/file.py @@ -96,7 +96,7 @@ class FileMetadata(SqlalchemyBase, OrganizationMixin, SourceMixin, AsyncAttrs): content_text = None file_name = self.file_name - if strip_directory_prefix: + if strip_directory_prefix and "/" in file_name: file_name = "/".join(file_name.split("/")[1:]) return PydanticFileMetadata( diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index b6e54b6b..a3e32df2 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -10,6 +10,7 @@ from starlette import status import letta.constants as constants from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import FileProcessingStatus @@ -389,6 +390,7 @@ async def sleeptime_document_ingest_async(server: SyncServer, source_id: str, ac await server.sleeptime_document_ingest_async(agent, source, actor, clear_history) +@trace_method async def load_file_to_source_cloud( server: SyncServer, agent_states: List[AgentState], diff --git a/letta/services/file_processor/chunker/llama_index_chunker.py b/letta/services/file_processor/chunker/llama_index_chunker.py index dbb290e3..e62b9ceb 100644 --- 
a/letta/services/file_processor/chunker/llama_index_chunker.py +++ b/letta/services/file_processor/chunker/llama_index_chunker.py @@ -3,6 +3,7 @@ from typing import List, Tuple from mistralai import OCRPageObject from letta.log import get_logger +from letta.otel.tracing import trace_method logger = get_logger(__name__) @@ -19,6 +20,7 @@ class LlamaIndexChunker: self.parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) # TODO: Make this more general beyond Mistral + @trace_method def chunk_text(self, page: OCRPageObject) -> List[str]: """Chunk text using LlamaIndex splitter""" try: diff --git a/letta/services/file_processor/embedder/openai_embedder.py b/letta/services/file_processor/embedder/openai_embedder.py index 925cf1f3..7e6fa55e 100644 --- a/letta/services/file_processor/embedder/openai_embedder.py +++ b/letta/services/file_processor/embedder/openai_embedder.py @@ -4,6 +4,7 @@ from typing import List, Optional, Tuple, cast from letta.llm_api.llm_client import LLMClient from letta.llm_api.openai_client import OpenAIClient from letta.log import get_logger +from letta.otel.tracing import log_event, trace_method from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import ProviderType from letta.schemas.passage import Passage @@ -35,17 +36,39 @@ class OpenAIEmbedder: ) self.max_concurrent_requests = 20 + @trace_method async def _embed_batch(self, batch: List[str], batch_indices: List[int]) -> List[Tuple[int, List[float]]]: """Embed a single batch and return embeddings with their original indices""" + log_event( + "embedder.batch_started", + { + "batch_size": len(batch), + "model": self.embedding_config.embedding_model, + "provider": self.embedding_config.embedding_provider, + }, + ) embeddings = await self.client.request_embeddings(inputs=batch, embedding_config=self.embedding_config) + log_event("embedder.batch_completed", {"batch_size": len(batch), "embeddings_generated": len(embeddings)}) return [(idx, e) 
for idx, e in zip(batch_indices, embeddings)] + @trace_method async def generate_embedded_passages(self, file_id: str, source_id: str, chunks: List[str], actor: User) -> List[Passage]: """Generate embeddings for chunks with batching and concurrent processing""" if not chunks: return [] logger.info(f"Generating embeddings for {len(chunks)} chunks using {self.embedding_config.embedding_model}") + log_event( + "embedder.generation_started", + { + "total_chunks": len(chunks), + "model": self.embedding_config.embedding_model, + "provider": self.embedding_config.embedding_provider, + "batch_size": self.embedding_config.batch_size, + "file_id": file_id, + "source_id": source_id, + }, + ) # Create batches with their original indices batches = [] @@ -58,18 +81,28 @@ class OpenAIEmbedder: batch_indices.append(indices) logger.info(f"Processing {len(batches)} batches") + log_event( + "embedder.batching_completed", + {"total_batches": len(batches), "batch_size": self.embedding_config.batch_size, "total_chunks": len(chunks)}, + ) async def process(batch: List[str], indices: List[int]): try: return await self._embed_batch(batch, indices) except Exception as e: logger.error(f"Failed to embed batch of size {len(batch)}: {str(e)}") + log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__}) raise # Execute all batches concurrently with semaphore control tasks = [process(batch, indices) for batch, indices in zip(batches, batch_indices)] + log_event( + "embedder.concurrent_processing_started", + {"concurrent_tasks": len(tasks), "max_concurrent_requests": self.max_concurrent_requests}, + ) results = await asyncio.gather(*tasks) + log_event("embedder.concurrent_processing_completed", {"batches_processed": len(results)}) # Flatten results and sort by original index indexed_embeddings = [] @@ -93,4 +126,8 @@ class OpenAIEmbedder: passages.append(passage) logger.info(f"Successfully generated {len(passages)} embeddings") + log_event( + 
"embedder.generation_completed", + {"passages_created": len(passages), "total_chunks_processed": len(chunks), "file_id": file_id, "source_id": source_id}, + ) return passages diff --git a/letta/services/file_processor/file_processor.py b/letta/services/file_processor/file_processor.py index 16fd027e..67c086ed 100644 --- a/letta/services/file_processor/file_processor.py +++ b/letta/services/file_processor/file_processor.py @@ -1,6 +1,7 @@ from typing import List from letta.log import get_logger +from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState from letta.schemas.enums import FileProcessingStatus from letta.schemas.file import FileMetadata @@ -42,6 +43,7 @@ class FileProcessor: self.actor = actor # TODO: Factor this function out of SyncServer + @trace_method async def process( self, server: SyncServer, agent_states: List[AgentState], source_id: str, content: bytes, file_metadata: FileMetadata ) -> List[Passage]: @@ -50,6 +52,15 @@ class FileProcessor: # Create file as early as possible with no content file_metadata.processing_status = FileProcessingStatus.PARSING # Parsing now file_metadata = await self.file_manager.create_file(file_metadata, self.actor) + log_event( + "file_processor.file_created", + { + "file_id": str(file_metadata.id), + "filename": filename, + "file_type": file_metadata.file_type, + "status": FileProcessingStatus.PARSING.value, + }, + ) try: # Ensure we're working with bytes @@ -57,13 +68,22 @@ class FileProcessor: content = content.encode("utf-8") if len(content) > self.max_file_size: + log_event( + "file_processor.size_limit_exceeded", + {"filename": filename, "file_size": len(content), "max_file_size": self.max_file_size}, + ) raise ValueError(f"PDF size exceeds maximum allowed size of {self.max_file_size} bytes") logger.info(f"Starting OCR extraction for {filename}") + log_event("file_processor.ocr_started", {"filename": filename, "file_size": len(content), "mime_type": 
file_metadata.file_type}) ocr_response = await self.file_parser.extract_text(content, mime_type=file_metadata.file_type) # update file with raw text raw_markdown_text = "".join([page.markdown for page in ocr_response.pages]) + log_event( + "file_processor.ocr_completed", + {"filename": filename, "pages_extracted": len(ocr_response.pages), "text_length": len(raw_markdown_text)}, + ) file_metadata = await self.file_manager.update_file_status( file_id=file_metadata.id, actor=self.actor, processing_status=FileProcessingStatus.EMBEDDING ) @@ -77,27 +97,56 @@ class FileProcessor: ) if not ocr_response or len(ocr_response.pages) == 0: + log_event( + "file_processor.ocr_no_text", + { + "filename": filename, + "ocr_response_empty": not ocr_response, + "pages_count": len(ocr_response.pages) if ocr_response else 0, + }, + ) raise ValueError("No text extracted from PDF") logger.info("Chunking extracted text") + log_event("file_processor.chunking_started", {"filename": filename, "pages_to_process": len(ocr_response.pages)}) all_passages = [] for page in ocr_response.pages: chunks = self.text_chunker.chunk_text(page) if not chunks: + log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)}) raise ValueError("No chunks created from text") passages = await self.embedder.generate_embedded_passages( file_id=file_metadata.id, source_id=source_id, chunks=chunks, actor=self.actor ) + log_event( + "file_processor.page_processed", + { + "filename": filename, + "page_index": ocr_response.pages.index(page), + "chunks_created": len(chunks), + "passages_generated": len(passages), + }, + ) all_passages.extend(passages) all_passages = await self.passage_manager.create_many_source_passages_async( passages=all_passages, file_metadata=file_metadata, actor=self.actor ) + log_event("file_processor.passages_created", {"filename": filename, "total_passages": len(all_passages)}) logger.info(f"Successfully processed {filename}: {len(all_passages)} 
passages") + log_event( + "file_processor.processing_completed", + { + "filename": filename, + "file_id": str(file_metadata.id), + "total_passages": len(all_passages), + "status": FileProcessingStatus.COMPLETED.value, + }, + ) # update job status await self.file_manager.update_file_status( @@ -108,6 +157,16 @@ class FileProcessor: except Exception as e: logger.error(f"File processing failed for {filename}: {str(e)}") + log_event( + "file_processor.processing_failed", + { + "filename": filename, + "file_id": str(file_metadata.id), + "error": str(e), + "error_type": type(e).__name__, + "status": FileProcessingStatus.ERROR.value, + }, + ) await self.file_manager.update_file_status( file_id=file_metadata.id, actor=self.actor, processing_status=FileProcessingStatus.ERROR, error_message=str(e) ) diff --git a/letta/services/file_processor/parser/mistral_parser.py b/letta/services/file_processor/parser/mistral_parser.py index 6b2b13f6..d40999a9 100644 --- a/letta/services/file_processor/parser/mistral_parser.py +++ b/letta/services/file_processor/parser/mistral_parser.py @@ -3,6 +3,7 @@ import base64 from mistralai import Mistral, OCRPageObject, OCRResponse, OCRUsageInfo from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.services.file_processor.file_types import is_simple_text_mime_type from letta.services.file_processor.parser.base_parser import FileParser from letta.settings import settings @@ -17,6 +18,7 @@ class MistralFileParser(FileParser): self.model = model # TODO: Make this return something general if we add more file parsers + @trace_method async def extract_text(self, content: bytes, mime_type: str) -> OCRResponse: """Extract text using Mistral OCR or shortcut for plain text.""" try: