From e5cb964d90b007f49fffeb76b83547322557f685 Mon Sep 17 00:00:00 2001
From: Matthew Zhou
Date: Mon, 7 Jul 2025 11:05:01 -0700
Subject: [PATCH] feat: Make chunking more robust and file type specific (#3188)

---
 letta/server/rest_api/routers/v1/sources.py   |   6 +-
 .../chunker/llama_index_chunker.py            | 206 ++++++++++--------
 .../services/file_processor/file_processor.py |  74 +++++--
 3 files changed, 178 insertions(+), 108 deletions(-)

diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py
index bb6694eb..3e715e66 100644
--- a/letta/server/rest_api/routers/v1/sources.py
+++ b/letta/server/rest_api/routers/v1/sources.py
@@ -26,7 +26,6 @@ from letta.schemas.source import Source, SourceCreate, SourceUpdate
 from letta.schemas.user import User
 from letta.server.rest_api.utils import get_letta_server
 from letta.server.server import SyncServer
-from letta.services.file_processor.chunker.llama_index_chunker import LlamaIndexChunker
 from letta.services.file_processor.embedder.openai_embedder import OpenAIEmbedder
 from letta.services.file_processor.embedder.pinecone_embedder import PineconeEmbedder
 from letta.services.file_processor.file_processor import FileProcessor
@@ -424,15 +423,12 @@ async def load_file_to_source_cloud(
     file_metadata: FileMetadata,
 ):
     file_processor = MistralFileParser()
-    text_chunker = LlamaIndexChunker(chunk_size=embedding_config.embedding_chunk_size)
     using_pinecone = should_use_pinecone()
     if using_pinecone:
         embedder = PineconeEmbedder()
     else:
         embedder = OpenAIEmbedder(embedding_config=embedding_config)
-    file_processor = FileProcessor(
-        file_parser=file_processor, text_chunker=text_chunker, embedder=embedder, actor=actor, using_pinecone=using_pinecone
-    )
+    file_processor = FileProcessor(file_parser=file_processor, embedder=embedder, actor=actor, using_pinecone=using_pinecone)
     await file_processor.process(
         server=server, agent_states=agent_states, source_id=source_id, content=content, file_metadata=file_metadata
     )
diff --git a/letta/services/file_processor/chunker/llama_index_chunker.py b/letta/services/file_processor/chunker/llama_index_chunker.py
index e62b9ceb..d2c3604a 100644
--- a/letta/services/file_processor/chunker/llama_index_chunker.py
+++ b/letta/services/file_processor/chunker/llama_index_chunker.py
@@ -1,119 +1,149 @@
-from typing import List, Tuple
+from typing import List, Optional, Union
 
 from mistralai import OCRPageObject
 
 from letta.log import get_logger
 from letta.otel.tracing import trace_method
+from letta.services.file_processor.file_types import ChunkingStrategy, file_type_registry
 
 logger = get_logger(__name__)
 
 
 class LlamaIndexChunker:
-    """LlamaIndex-based text chunking"""
+    """LlamaIndex-based text chunking with automatic splitter selection"""
 
-    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
+    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50, file_type: Optional[str] = None):
         self.chunk_size = chunk_size
         self.chunk_overlap = chunk_overlap
+        self.file_type = file_type
 
-        from llama_index.core.node_parser import SentenceSplitter
+        # Create appropriate parser based on file type
+        self.parser = self._create_parser_for_file_type(file_type, chunk_size, chunk_overlap)
 
-        self.parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+        # Log which parser was selected
+        parser_name = type(self.parser).__name__
+        logger.info(f"LlamaIndexChunker initialized with {parser_name} for file type: {file_type}")
+
+    def _create_parser_for_file_type(self, file_type: Optional[str], chunk_size: int, chunk_overlap: int):
+        """Create appropriate parser based on file type"""
+        if not file_type:
+            # Default fallback
+            from llama_index.core.node_parser import SentenceSplitter
+
+            return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+
+        try:
+            # Get chunking strategy from file type registry
+            chunking_strategy = file_type_registry.get_chunking_strategy_by_mime_type(file_type)
+            logger.debug(f"Chunking strategy for {file_type}: {chunking_strategy}")
+
+            if chunking_strategy == ChunkingStrategy.CODE:
+                from llama_index.core.node_parser import CodeSplitter
+
+                return CodeSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+
+            elif chunking_strategy == ChunkingStrategy.DOCUMENTATION:
+                if file_type in ["text/markdown", "text/x-markdown"]:
+                    from llama_index.core.node_parser import MarkdownNodeParser
+
+                    return MarkdownNodeParser()
+                elif file_type in ["text/html"]:
+                    from llama_index.core.node_parser import HTMLNodeParser
+
+                    return HTMLNodeParser()
+                else:
+                    # Fall back to sentence splitter for other documentation
+                    from llama_index.core.node_parser import SentenceSplitter
+
+                    return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+
+            elif chunking_strategy == ChunkingStrategy.STRUCTURED_DATA:
+                if file_type in ["application/json", "application/jsonl"]:
+                    from llama_index.core.node_parser import JSONNodeParser
+
+                    return JSONNodeParser()
+                else:
+                    # Fall back to sentence splitter for other structured data
+                    from llama_index.core.node_parser import SentenceSplitter
+
+                    return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+
+            else:
+                # Default to sentence splitter for PROSE and LINE_BASED
+                from llama_index.core.node_parser import SentenceSplitter
+
+                return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+
+        except Exception as e:
+            logger.warning(f"Failed to create specialized parser for {file_type}: {str(e)}. Using default SentenceSplitter.")
+            from llama_index.core.node_parser import SentenceSplitter
+
+            return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
 
-    # TODO: Make this more general beyond Mistral
     @trace_method
-    def chunk_text(self, page: OCRPageObject) -> List[str]:
+    def chunk_text(self, content: Union[OCRPageObject, str]) -> List[str]:
         """Chunk text using LlamaIndex splitter"""
         try:
-            return self.parser.split_text(page.markdown)
+            # Handle different input types
+            if isinstance(content, OCRPageObject):
+                # Extract markdown from OCR page object
+                text_content = content.markdown
+            else:
+                # Assume it's a string
+                text_content = content
+
+            # Use the selected parser
+            if hasattr(self.parser, "split_text"):
+                # Most parsers have split_text method
+                return self.parser.split_text(text_content)
+            elif hasattr(self.parser, "get_nodes_from_documents"):
+                # Some parsers need Document objects
+                from llama_index.core import Document
+
+                document = Document(text=text_content)
+                nodes = self.parser.get_nodes_from_documents([document])
+                return [node.text for node in nodes]
+            else:
+                # Fallback - try to call the parser directly
+                return self.parser(text_content)
         except Exception as e:
-            logger.error(f"Chunking failed: {str(e)}")
-            raise
+            logger.error(f"Chunking failed with {type(self.parser).__name__}: {str(e)}")
+            # Try fallback with SentenceSplitter
+            try:
+                logger.info("Attempting fallback to SentenceSplitter")
+                from llama_index.core.node_parser import SentenceSplitter
 
+                fallback_parser = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
 
-class MarkdownChunker:
-    """Markdown-specific chunker that preserves line numbers for citation purposes"""
+                # Extract text content if needed
+                if isinstance(content, OCRPageObject):
+                    text_content = content.markdown
+                else:
+                    text_content = content
 
-    def __init__(self, chunk_size: int = 2048):
-        self.chunk_size = chunk_size
-        # No overlap for line-based citations to avoid ambiguity
+                return fallback_parser.split_text(text_content)
+            except Exception as fallback_error:
+                logger.error(f"Fallback chunking also failed: {str(fallback_error)}")
+                raise e  # Raise the original error
 
-        from llama_index.core.node_parser import MarkdownNodeParser
-
-        self.parser = MarkdownNodeParser()
-
-    def chunk_markdown_with_line_numbers(self, markdown_content: str) -> List[Tuple[str, int, int]]:
-        """
-        Chunk markdown content while preserving line number mappings.
-
-        Returns:
-            List of tuples: (chunk_text, start_line, end_line)
-        """
+    @trace_method
+    def default_chunk_text(self, content: Union[OCRPageObject, str], chunk_size: int = 384, chunk_overlap: int = 25) -> List[str]:
+        """Chunk text using default SentenceSplitter regardless of file type with conservative defaults"""
         try:
-            # Split content into lines for line number tracking
-            lines = markdown_content.split("\n")
+            from llama_index.core.node_parser import SentenceSplitter
 
-            # Create nodes using MarkdownNodeParser
-            from llama_index.core import Document
+            # Use provided defaults or fallback to conservative values
+            default_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
 
-            document = Document(text=markdown_content)
-            nodes = self.parser.get_nodes_from_documents([document])
+            # Handle different input types
+            if isinstance(content, OCRPageObject):
+                text_content = content.markdown
+            else:
+                text_content = content
 
-            chunks_with_line_numbers = []
-
-            for node in nodes:
-                chunk_text = node.text
-
-                # Find the line numbers for this chunk
-                start_line, end_line = self._find_line_numbers(chunk_text, lines)
-
-                chunks_with_line_numbers.append((chunk_text, start_line, end_line))
-
-            return chunks_with_line_numbers
+            return default_parser.split_text(text_content)
         except Exception as e:
-            logger.error(f"Markdown chunking failed: {str(e)}")
-            # Fallback to simple line-based chunking
-            return self._fallback_line_chunking(markdown_content)
-
-    def _find_line_numbers(self, chunk_text: str, lines: List[str]) -> Tuple[int, int]:
-        """Find the start and end line numbers for a given chunk of text."""
-        chunk_lines = chunk_text.split("\n")
-
-        # Find the first line of the chunk in the original document
-        start_line = 1
-        for i, line in enumerate(lines):
-            if chunk_lines[0].strip() in line.strip() and len(chunk_lines[0].strip()) > 10:  # Avoid matching short lines
-                start_line = i + 1
-                break
-
-        # Calculate end line
-        end_line = start_line + len(chunk_lines) - 1
-
-        return start_line, min(end_line, len(lines))
-
-    def _fallback_line_chunking(self, markdown_content: str) -> List[Tuple[str, int, int]]:
-        """Fallback chunking method that simply splits by lines with no overlap."""
-        lines = markdown_content.split("\n")
-        chunks = []
-
-        i = 0
-        while i < len(lines):
-            chunk_lines = []
-            start_line = i + 1
-            char_count = 0
-
-            # Build chunk until we hit size limit
-            while i < len(lines) and char_count < self.chunk_size:
-                line = lines[i]
-                chunk_lines.append(line)
-                char_count += len(line) + 1  # +1 for newline
-                i += 1
-
-            end_line = i
-            chunk_text = "\n".join(chunk_lines)
-            chunks.append((chunk_text, start_line, end_line))
-
-            # No overlap - continue from where we left off
-
-        return chunks
+            logger.error(f"Default chunking failed: {str(e)}")
+            raise
diff --git a/letta/services/file_processor/file_processor.py b/letta/services/file_processor/file_processor.py
index 2b356e3c..09cf8c0b 100644
--- a/letta/services/file_processor/file_processor.py
+++ b/letta/services/file_processor/file_processor.py
@@ -26,14 +26,12 @@ class FileProcessor:
     def __init__(
         self,
         file_parser: MistralFileParser,
-        text_chunker: LlamaIndexChunker,
         embedder: BaseEmbedder,
         actor: User,
         using_pinecone: bool,
         max_file_size: int = 50 * 1024 * 1024,  # 50MB default
     ):
         self.file_parser = file_parser
-        self.text_chunker = text_chunker
         self.line_chunker = LineChunker()
         self.embedder = embedder
         self.max_file_size = max_file_size
@@ -44,6 +42,61 @@ class FileProcessor:
         self.actor = actor
         self.using_pinecone = using_pinecone
 
+    async def _chunk_and_embed_with_fallback(self, file_metadata: FileMetadata, ocr_response, source_id: str) -> List:
+        """Chunk text and generate embeddings with fallback to default chunker if needed"""
+        filename = file_metadata.file_name
+
+        # Create file-type-specific chunker
+        text_chunker = LlamaIndexChunker(file_type=file_metadata.file_type)
+
+        # First attempt with file-specific chunker
+        try:
+            all_chunks = []
+            for page in ocr_response.pages:
+                chunks = text_chunker.chunk_text(page)
+                if not chunks:
+                    log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
+                    raise ValueError("No chunks created from text")
+                all_chunks.extend(chunks)
+
+            all_passages = await self.embedder.generate_embedded_passages(
+                file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
+            )
+            return all_passages
+
+        except Exception as e:
+            logger.warning(f"Failed to chunk/embed with file-specific chunker for {filename}: {str(e)}. Retrying with default chunker.")
+            log_event("file_processor.embedding_failed_retrying", {"filename": filename, "error": str(e), "error_type": type(e).__name__})
+
+            # Retry with default chunker
+            try:
+                logger.info(f"Retrying chunking with default SentenceSplitter for {filename}")
+                all_chunks = []
+
+                for page in ocr_response.pages:
+                    chunks = text_chunker.default_chunk_text(page)
+                    if not chunks:
+                        log_event(
+                            "file_processor.default_chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)}
+                        )
+                        raise ValueError("No chunks created from text with default chunker")
+                    all_chunks.extend(chunks)
+
+                all_passages = await self.embedder.generate_embedded_passages(
+                    file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
+                )
+                logger.info(f"Successfully generated passages with default chunker for {filename}")
+                log_event("file_processor.default_chunking_success", {"filename": filename, "total_chunks": len(all_chunks)})
+                return all_passages
+
+            except Exception as fallback_error:
+                logger.error(f"Default chunking also failed for {filename}: {str(fallback_error)}")
+                log_event(
+                    "file_processor.default_chunking_also_failed",
+                    {"filename": filename, "fallback_error": str(fallback_error), "fallback_error_type": type(fallback_error).__name__},
+                )
+                raise fallback_error
+
     # TODO: Factor this function out of SyncServer
     @trace_method
     async def process(
@@ -111,19 +164,10 @@ class FileProcessor:
         logger.info("Chunking extracted text")
         log_event("file_processor.chunking_started", {"filename": filename, "pages_to_process": len(ocr_response.pages)})
 
-        all_chunks = []
-        for page in ocr_response.pages:
-            chunks = self.text_chunker.chunk_text(page)
-
-            if not chunks:
-                log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
-                raise ValueError("No chunks created from text")
-
-            all_chunks.extend(self.text_chunker.chunk_text(page))
-
-        all_passages = await self.embedder.generate_embedded_passages(
-            file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
+        # Chunk and embed with fallback logic
+        all_passages = await self._chunk_and_embed_with_fallback(
+            file_metadata=file_metadata, ocr_response=ocr_response, source_id=source_id
         )
 
         if not self.using_pinecone:
 
@@ -156,7 +200,7 @@
             return all_passages
 
         except Exception as e:
-            logger.debug(f"File processing failed for {filename}: {str(e)}")
+            logger.error(f"File processing failed for {filename}: {str(e)}")
             log_event(
                 "file_processor.processing_failed", {