feat: Make chunking more robust and file type specific (#3188)

This commit is contained in:
Matthew Zhou
2025-07-07 11:05:01 -07:00
committed by GitHub
parent 961d14f975
commit e5cb964d90
3 changed files with 178 additions and 108 deletions

View File

@@ -26,7 +26,6 @@ from letta.schemas.source import Source, SourceCreate, SourceUpdate
from letta.schemas.user import User
from letta.server.rest_api.utils import get_letta_server
from letta.server.server import SyncServer
from letta.services.file_processor.chunker.llama_index_chunker import LlamaIndexChunker
from letta.services.file_processor.embedder.openai_embedder import OpenAIEmbedder
from letta.services.file_processor.embedder.pinecone_embedder import PineconeEmbedder
from letta.services.file_processor.file_processor import FileProcessor
@@ -424,15 +423,12 @@ async def load_file_to_source_cloud(
file_metadata: FileMetadata,
):
file_processor = MistralFileParser()
text_chunker = LlamaIndexChunker(chunk_size=embedding_config.embedding_chunk_size)
using_pinecone = should_use_pinecone()
if using_pinecone:
embedder = PineconeEmbedder()
else:
embedder = OpenAIEmbedder(embedding_config=embedding_config)
file_processor = FileProcessor(
file_parser=file_processor, text_chunker=text_chunker, embedder=embedder, actor=actor, using_pinecone=using_pinecone
)
file_processor = FileProcessor(file_parser=file_processor, embedder=embedder, actor=actor, using_pinecone=using_pinecone)
await file_processor.process(
server=server, agent_states=agent_states, source_id=source_id, content=content, file_metadata=file_metadata
)

View File

@@ -1,119 +1,149 @@
from typing import List, Tuple
from typing import List, Optional, Union
from mistralai import OCRPageObject
from letta.log import get_logger
from letta.otel.tracing import trace_method
from letta.services.file_processor.file_types import ChunkingStrategy, file_type_registry
logger = get_logger(__name__)
class LlamaIndexChunker:
"""LlamaIndex-based text chunking"""
"""LlamaIndex-based text chunking with automatic splitter selection"""
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50, file_type: Optional[str] = None):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.file_type = file_type
from llama_index.core.node_parser import SentenceSplitter
# Create appropriate parser based on file type
self.parser = self._create_parser_for_file_type(file_type, chunk_size, chunk_overlap)
self.parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
# Log which parser was selected
parser_name = type(self.parser).__name__
logger.info(f"LlamaIndexChunker initialized with {parser_name} for file type: {file_type}")
def _create_parser_for_file_type(self, file_type: Optional[str], chunk_size: int, chunk_overlap: int):
"""Create appropriate parser based on file type"""
if not file_type:
# Default fallback
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
try:
# Get chunking strategy from file type registry
chunking_strategy = file_type_registry.get_chunking_strategy_by_mime_type(file_type)
logger.debug(f"Chunking strategy for {file_type}: {chunking_strategy}")
if chunking_strategy == ChunkingStrategy.CODE:
from llama_index.core.node_parser import CodeSplitter
return CodeSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
elif chunking_strategy == ChunkingStrategy.DOCUMENTATION:
if file_type in ["text/markdown", "text/x-markdown"]:
from llama_index.core.node_parser import MarkdownNodeParser
return MarkdownNodeParser()
elif file_type in ["text/html"]:
from llama_index.core.node_parser import HTMLNodeParser
return HTMLNodeParser()
else:
# Fall back to sentence splitter for other documentation
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
elif chunking_strategy == ChunkingStrategy.STRUCTURED_DATA:
if file_type in ["application/json", "application/jsonl"]:
from llama_index.core.node_parser import JSONNodeParser
return JSONNodeParser()
else:
# Fall back to sentence splitter for other structured data
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
else:
# Default to sentence splitter for PROSE and LINE_BASED
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
except Exception as e:
logger.warning(f"Failed to create specialized parser for {file_type}: {str(e)}. Using default SentenceSplitter.")
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
# TODO: Make this more general beyond Mistral
@trace_method
def chunk_text(self, page: OCRPageObject) -> List[str]:
def chunk_text(self, content: Union[OCRPageObject, str]) -> List[str]:
"""Chunk text using LlamaIndex splitter"""
try:
return self.parser.split_text(page.markdown)
# Handle different input types
if isinstance(content, OCRPageObject):
# Extract markdown from OCR page object
text_content = content.markdown
else:
# Assume it's a string
text_content = content
# Use the selected parser
if hasattr(self.parser, "split_text"):
# Most parsers have split_text method
return self.parser.split_text(text_content)
elif hasattr(self.parser, "get_nodes_from_documents"):
# Some parsers need Document objects
from llama_index.core import Document
document = Document(text=text_content)
nodes = self.parser.get_nodes_from_documents([document])
return [node.text for node in nodes]
else:
# Fallback - try to call the parser directly
return self.parser(text_content)
except Exception as e:
logger.error(f"Chunking failed: {str(e)}")
raise
logger.error(f"Chunking failed with {type(self.parser).__name__}: {str(e)}")
# Try fallback with SentenceSplitter
try:
logger.info("Attempting fallback to SentenceSplitter")
from llama_index.core.node_parser import SentenceSplitter
fallback_parser = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
class MarkdownChunker:
"""Markdown-specific chunker that preserves line numbers for citation purposes"""
# Extract text content if needed
if isinstance(content, OCRPageObject):
text_content = content.markdown
else:
text_content = content
def __init__(self, chunk_size: int = 2048):
self.chunk_size = chunk_size
# No overlap for line-based citations to avoid ambiguity
return fallback_parser.split_text(text_content)
except Exception as fallback_error:
logger.error(f"Fallback chunking also failed: {str(fallback_error)}")
raise e # Raise the original error
from llama_index.core.node_parser import MarkdownNodeParser
self.parser = MarkdownNodeParser()
def chunk_markdown_with_line_numbers(self, markdown_content: str) -> List[Tuple[str, int, int]]:
"""
Chunk markdown content while preserving line number mappings.
Returns:
List of tuples: (chunk_text, start_line, end_line)
"""
@trace_method
def default_chunk_text(self, content: Union[OCRPageObject, str], chunk_size: int = 384, chunk_overlap: int = 25) -> List[str]:
"""Chunk text using default SentenceSplitter regardless of file type with conservative defaults"""
try:
# Split content into lines for line number tracking
lines = markdown_content.split("\n")
from llama_index.core.node_parser import SentenceSplitter
# Create nodes using MarkdownNodeParser
from llama_index.core import Document
# Use provided defaults or fallback to conservative values
default_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
document = Document(text=markdown_content)
nodes = self.parser.get_nodes_from_documents([document])
# Handle different input types
if isinstance(content, OCRPageObject):
text_content = content.markdown
else:
text_content = content
chunks_with_line_numbers = []
for node in nodes:
chunk_text = node.text
# Find the line numbers for this chunk
start_line, end_line = self._find_line_numbers(chunk_text, lines)
chunks_with_line_numbers.append((chunk_text, start_line, end_line))
return chunks_with_line_numbers
return default_parser.split_text(text_content)
except Exception as e:
logger.error(f"Markdown chunking failed: {str(e)}")
# Fallback to simple line-based chunking
return self._fallback_line_chunking(markdown_content)
def _find_line_numbers(self, chunk_text: str, lines: List[str]) -> Tuple[int, int]:
"""Find the start and end line numbers for a given chunk of text."""
chunk_lines = chunk_text.split("\n")
# Find the first line of the chunk in the original document
start_line = 1
for i, line in enumerate(lines):
if chunk_lines[0].strip() in line.strip() and len(chunk_lines[0].strip()) > 10: # Avoid matching short lines
start_line = i + 1
break
# Calculate end line
end_line = start_line + len(chunk_lines) - 1
return start_line, min(end_line, len(lines))
def _fallback_line_chunking(self, markdown_content: str) -> List[Tuple[str, int, int]]:
"""Fallback chunking method that simply splits by lines with no overlap."""
lines = markdown_content.split("\n")
chunks = []
i = 0
while i < len(lines):
chunk_lines = []
start_line = i + 1
char_count = 0
# Build chunk until we hit size limit
while i < len(lines) and char_count < self.chunk_size:
line = lines[i]
chunk_lines.append(line)
char_count += len(line) + 1 # +1 for newline
i += 1
end_line = i
chunk_text = "\n".join(chunk_lines)
chunks.append((chunk_text, start_line, end_line))
# No overlap - continue from where we left off
return chunks
logger.error(f"Default chunking failed: {str(e)}")
raise

View File

@@ -26,14 +26,12 @@ class FileProcessor:
def __init__(
self,
file_parser: MistralFileParser,
text_chunker: LlamaIndexChunker,
embedder: BaseEmbedder,
actor: User,
using_pinecone: bool,
max_file_size: int = 50 * 1024 * 1024, # 50MB default
):
self.file_parser = file_parser
self.text_chunker = text_chunker
self.line_chunker = LineChunker()
self.embedder = embedder
self.max_file_size = max_file_size
@@ -44,6 +42,61 @@ class FileProcessor:
self.actor = actor
self.using_pinecone = using_pinecone
async def _chunk_and_embed_with_fallback(self, file_metadata: FileMetadata, ocr_response, source_id: str) -> List:
"""Chunk text and generate embeddings with fallback to default chunker if needed"""
filename = file_metadata.file_name
# Create file-type-specific chunker
text_chunker = LlamaIndexChunker(file_type=file_metadata.file_type)
# First attempt with file-specific chunker
try:
all_chunks = []
for page in ocr_response.pages:
chunks = text_chunker.chunk_text(page)
if not chunks:
log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
raise ValueError("No chunks created from text")
all_chunks.extend(chunks)
all_passages = await self.embedder.generate_embedded_passages(
file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
)
return all_passages
except Exception as e:
logger.warning(f"Failed to chunk/embed with file-specific chunker for {filename}: {str(e)}. Retrying with default chunker.")
log_event("file_processor.embedding_failed_retrying", {"filename": filename, "error": str(e), "error_type": type(e).__name__})
# Retry with default chunker
try:
logger.info(f"Retrying chunking with default SentenceSplitter for {filename}")
all_chunks = []
for page in ocr_response.pages:
chunks = text_chunker.default_chunk_text(page)
if not chunks:
log_event(
"file_processor.default_chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)}
)
raise ValueError("No chunks created from text with default chunker")
all_chunks.extend(chunks)
all_passages = await self.embedder.generate_embedded_passages(
file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
)
logger.info(f"Successfully generated passages with default chunker for {filename}")
log_event("file_processor.default_chunking_success", {"filename": filename, "total_chunks": len(all_chunks)})
return all_passages
except Exception as fallback_error:
logger.error(f"Default chunking also failed for {filename}: {str(fallback_error)}")
log_event(
"file_processor.default_chunking_also_failed",
{"filename": filename, "fallback_error": str(fallback_error), "fallback_error_type": type(fallback_error).__name__},
)
raise fallback_error
# TODO: Factor this function out of SyncServer
@trace_method
async def process(
@@ -111,19 +164,10 @@ class FileProcessor:
logger.info("Chunking extracted text")
log_event("file_processor.chunking_started", {"filename": filename, "pages_to_process": len(ocr_response.pages)})
all_chunks = []
for page in ocr_response.pages:
chunks = self.text_chunker.chunk_text(page)
if not chunks:
log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
raise ValueError("No chunks created from text")
all_chunks.extend(self.text_chunker.chunk_text(page))
all_passages = await self.embedder.generate_embedded_passages(
file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
# Chunk and embed with fallback logic
all_passages = await self._chunk_and_embed_with_fallback(
file_metadata=file_metadata, ocr_response=ocr_response, source_id=source_id
)
if not self.using_pinecone:
@@ -156,7 +200,7 @@ class FileProcessor:
return all_passages
except Exception as e:
logger.debug(f"File processing failed for {filename}: {str(e)}")
logger.error(f"File processing failed for {filename}: {str(e)}")
log_event(
"file_processor.processing_failed",
{