chore: bump letta version 0.8.11 (#2711)

Co-authored-by: Kian Jones <11655409+kianjones9@users.noreply.github.com>
Co-authored-by: Sarah Wooders <sarahwooders@gmail.com>
Co-authored-by: Matthew Zhou <mattzh1314@gmail.com>
Co-authored-by: Andy Li <55300002+cliandy@users.noreply.github.com>
This commit is contained in:
cthomas
2025-07-07 17:03:26 -07:00
committed by GitHub
parent 5abbe44c30
commit ca6424470f
15 changed files with 637 additions and 211 deletions

View File

@@ -371,3 +371,4 @@ PINECONE_TEXT_FIELD_NAME = "chunk_text"
PINECONE_METRIC = "cosine"
PINECONE_CLOUD = "aws"
PINECONE_REGION = "us-east-1"
PINECONE_MAX_BATCH_SIZE = 96

View File

@@ -2,7 +2,14 @@ from typing import Any, Dict, List
from pinecone import PineconeAsyncio
from letta.constants import PINECONE_CLOUD, PINECONE_EMBEDDING_MODEL, PINECONE_METRIC, PINECONE_REGION, PINECONE_TEXT_FIELD_NAME
from letta.constants import (
PINECONE_CLOUD,
PINECONE_EMBEDDING_MODEL,
PINECONE_MAX_BATCH_SIZE,
PINECONE_METRIC,
PINECONE_REGION,
PINECONE_TEXT_FIELD_NAME,
)
from letta.log import get_logger
from letta.schemas.user import User
from letta.settings import settings
@@ -90,7 +97,10 @@ async def upsert_records_to_pinecone_index(records: List[dict], actor: User):
async with PineconeAsyncio(api_key=settings.pinecone_api_key) as pc:
description = await pc.describe_index(name=settings.pinecone_source_index)
async with pc.IndexAsyncio(host=description.index.host) as dense_index:
await dense_index.upsert_records(actor.organization_id, records)
# Process records in batches to avoid exceeding Pinecone limits
for i in range(0, len(records), PINECONE_MAX_BATCH_SIZE):
batch = records[i : i + PINECONE_MAX_BATCH_SIZE]
await dense_index.upsert_records(actor.organization_id, batch)
async def search_pinecone_index(query: str, limit: int, filter: Dict[str, Any], actor: User) -> Dict[str, Any]:

View File

@@ -4,10 +4,11 @@ from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import text
from letta.jobs.llm_batch_job_polling import poll_running_llm_batches
from letta.log import get_logger
from letta.server.db import db_context
from letta.server.db import db_registry
from letta.server.server import SyncServer
from letta.settings import settings
@@ -16,68 +17,54 @@ scheduler = AsyncIOScheduler()
logger = get_logger(__name__)
ADVISORY_LOCK_KEY = 0x12345678ABCDEF00
_advisory_lock_conn = None # Holds the raw DB connection if leader
_advisory_lock_cur = None # Holds the cursor for the lock connection if leader
_advisory_lock_session = None # Holds the async session if leader
_lock_retry_task: Optional[asyncio.Task] = None # Background task handle for non-leaders
_is_scheduler_leader = False # Flag indicating if this instance runs the scheduler
async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
"""Attempts to acquire lock, starts scheduler if successful."""
global _advisory_lock_conn, _advisory_lock_cur, _is_scheduler_leader, scheduler
global _advisory_lock_session, _is_scheduler_leader, scheduler
if _is_scheduler_leader:
return True # Already leading
raw_conn = None
cur = None
lock_session = None
acquired_lock = False
try:
# Use a temporary connection context for the attempt initially
with db_context() as session:
async with db_registry.async_session() as session:
engine = session.get_bind()
engine_name = engine.name
logger.info(f"Database engine type: {engine_name}")
if engine_name != "postgresql":
logger.warning(f"Advisory locks not supported for {engine_name} database. Starting scheduler without leader election.")
acquired_lock = True # For SQLite, assume we can start the scheduler
acquired_lock = True
else:
# Get raw connection - MUST be kept open if lock is acquired
raw_conn = engine.raw_connection()
cur = raw_conn.cursor()
cur.execute("SELECT pg_try_advisory_lock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,))
acquired_lock = cur.fetchone()[0]
lock_session = db_registry.get_async_session_factory()()
result = await lock_session.execute(
text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY}
)
acquired_lock = result.scalar()
if not acquired_lock:
if cur:
cur.close()
if raw_conn:
raw_conn.close()
if lock_session:
await lock_session.close()
logger.info("Scheduler lock held by another instance.")
return False
# --- Lock Acquired ---
if engine_name == "postgresql":
logger.info("Acquired PostgreSQL advisory lock.")
_advisory_lock_conn = raw_conn # Keep connection for lock duration
_advisory_lock_cur = cur # Keep cursor for lock duration
raw_conn = None # Prevent closing in finally block
cur = None # Prevent closing in finally block
_advisory_lock_session = lock_session
lock_session = None
else:
logger.info("Starting scheduler for non-PostgreSQL database.")
# For SQLite, we don't need to keep the connection open
if cur:
cur.close()
if raw_conn:
raw_conn.close()
raw_conn = None
cur = None
if lock_session:
await lock_session.close()
lock_session = None
trigger = IntervalTrigger(
seconds=settings.poll_running_llm_batches_interval_seconds,
jitter=10, # Jitter for the job execution
jitter=10,
)
scheduler.add_job(
poll_running_llm_batches,
@@ -91,7 +78,7 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
if not scheduler.running:
scheduler.start()
elif scheduler.state == 2: # PAUSED
elif scheduler.state == 2:
scheduler.resume()
_is_scheduler_leader = True
@@ -99,38 +86,27 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
except Exception as e:
logger.error(f"Error during lock acquisition/scheduler start: {e}", exc_info=True)
if acquired_lock: # If lock was acquired before error, try to release
if acquired_lock:
logger.warning("Attempting to release lock due to error during startup.")
try:
# Use the cursor/connection we were about to store
_advisory_lock_cur = cur
_advisory_lock_conn = raw_conn
await _release_advisory_lock() # Attempt cleanup
_advisory_lock_session = lock_session
await _release_advisory_lock()
except Exception as unlock_err:
logger.error(f"Failed to release lock during error handling: {unlock_err}", exc_info=True)
finally:
# Ensure globals are cleared after failed attempt
_advisory_lock_cur = None
_advisory_lock_conn = None
_advisory_lock_session = None
_is_scheduler_leader = False
# Ensure scheduler is stopped if we failed partially
if scheduler.running:
try:
scheduler.shutdown(wait=False)
except:
pass # Best effort
pass
return False
finally:
# Clean up temporary resources if lock wasn't acquired or error occurred before storing
if cur:
if lock_session:
try:
cur.close()
except:
pass
if raw_conn:
try:
raw_conn.close()
await lock_session.close()
except:
pass
@@ -141,63 +117,50 @@ async def _background_lock_retry_loop(server: SyncServer):
logger.info("Starting background task to periodically check for scheduler lock.")
while True:
if _is_scheduler_leader: # Should be cancelled first, but safety check
if _is_scheduler_leader:
break
try:
wait_time = settings.poll_lock_retry_interval_seconds
await asyncio.sleep(wait_time)
# Re-check state before attempting lock
if _is_scheduler_leader or _lock_retry_task is None:
break # Stop if became leader or task was cancelled
break
acquired = await _try_acquire_lock_and_start_scheduler(server)
if acquired:
logger.info("Background task acquired lock and started scheduler.")
_lock_retry_task = None # Clear self handle
break # Exit loop, we are now the leader
_lock_retry_task = None
break
except asyncio.CancelledError:
logger.info("Background lock retry task cancelled.")
break
except Exception as e:
logger.error(f"Error in background lock retry loop: {e}", exc_info=True)
# Avoid tight loop on persistent errors
await asyncio.sleep(settings.poll_lock_retry_interval_seconds)
async def _release_advisory_lock():
"""Releases the advisory lock using the stored connection."""
global _advisory_lock_conn, _advisory_lock_cur
"""Releases the advisory lock using the stored session."""
global _advisory_lock_session
lock_cur = _advisory_lock_cur
lock_conn = _advisory_lock_conn
_advisory_lock_cur = None # Clear global immediately
_advisory_lock_conn = None # Clear global immediately
lock_session = _advisory_lock_session
_advisory_lock_session = None
if lock_cur is not None and lock_conn is not None:
if lock_session is not None:
logger.info(f"Attempting to release PostgreSQL advisory lock {ADVISORY_LOCK_KEY}")
try:
# Try to execute unlock - connection/cursor validity is checked by attempting the operation
lock_cur.execute("SELECT pg_advisory_unlock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,))
lock_cur.fetchone() # Consume result
lock_conn.commit()
await lock_session.execute(text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY})
logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}")
except Exception as e:
logger.error(f"Error executing pg_advisory_unlock: {e}", exc_info=True)
finally:
# Ensure resources are closed regardless of unlock success
try:
if lock_cur:
lock_cur.close()
if lock_session:
await lock_session.close()
logger.info("Closed database session that held advisory lock.")
except Exception as e:
logger.error(f"Error closing advisory lock cursor: {e}", exc_info=True)
try:
if lock_conn:
lock_conn.close()
logger.info("Closed database connection that held advisory lock.")
except Exception as e:
logger.error(f"Error closing advisory lock connection: {e}", exc_info=True)
logger.error(f"Error closing advisory lock session: {e}", exc_info=True)
else:
logger.info("No PostgreSQL advisory lock to release (likely using SQLite or non-PostgreSQL database).")
@@ -220,7 +183,6 @@ async def start_scheduler_with_leader_election(server: SyncServer):
acquired_immediately = await _try_acquire_lock_and_start_scheduler(server)
if not acquired_immediately and _lock_retry_task is None:
# Failed initial attempt, start background retry task
loop = asyncio.get_running_loop()
_lock_retry_task = loop.create_task(_background_lock_retry_loop(server))
@@ -232,48 +194,40 @@ async def shutdown_scheduler_and_release_lock():
"""
global _is_scheduler_leader, _lock_retry_task, scheduler
# 1. Cancel retry task if running (for non-leaders)
if _lock_retry_task is not None:
logger.info("Shutting down: Cancelling background lock retry task.")
current_task = _lock_retry_task
_lock_retry_task = None # Clear handle first
_lock_retry_task = None
current_task.cancel()
try:
await current_task # Wait for cancellation
await current_task
except asyncio.CancelledError:
logger.info("Background lock retry task successfully cancelled.")
except Exception as e:
logger.warning(f"Exception waiting for cancelled retry task: {e}", exc_info=True)
# 2. Shutdown scheduler and release lock if we were the leader
if _is_scheduler_leader:
logger.info("Shutting down: Leader instance stopping scheduler and releasing lock.")
if scheduler.running:
try:
# Force synchronous shutdown to prevent callback scheduling
scheduler.shutdown(wait=True)
# wait for any internal cleanup to complete
await asyncio.sleep(0.1)
logger.info("APScheduler shut down.")
except Exception as e:
# Handle SchedulerNotRunningError and other shutdown exceptions
logger.warning(f"Exception during APScheduler shutdown: {e}")
if "not running" not in str(e).lower():
logger.error(f"Unexpected error shutting down APScheduler: {e}", exc_info=True)
await _release_advisory_lock()
_is_scheduler_leader = False # Update state after cleanup
_is_scheduler_leader = False
else:
logger.info("Shutting down: Non-leader instance.")
# Final cleanup check for scheduler state (belt and suspenders)
# This should rarely be needed if shutdown logic above worked correctly
try:
if scheduler.running:
logger.warning("Scheduler still running after shutdown logic completed? Forcing shutdown.")
scheduler.shutdown(wait=False)
except Exception as e:
# Catch SchedulerNotRunningError and other shutdown exceptions
logger.debug(f"Expected exception during final scheduler cleanup: {e}")

View File

@@ -0,0 +1,32 @@
from typing import List, Optional
from pydantic import Field
from letta.schemas.letta_base import LettaBase
class FileStats(LettaBase):
"""File statistics for metadata endpoint"""
file_id: str = Field(..., description="Unique identifier of the file")
file_name: str = Field(..., description="Name of the file")
file_size: Optional[int] = Field(None, description="Size of the file in bytes")
class SourceStats(LettaBase):
"""Aggregated metadata for a source"""
source_id: str = Field(..., description="Unique identifier of the source")
source_name: str = Field(..., description="Name of the source")
file_count: int = Field(0, description="Number of files in the source")
total_size: int = Field(0, description="Total size of all files in bytes")
files: List[FileStats] = Field(default_factory=list, description="List of file statistics")
class OrganizationSourcesStats(LettaBase):
"""Complete metadata response for organization sources"""
total_sources: int = Field(0, description="Total number of sources")
total_files: int = Field(0, description="Total number of files across all sources")
total_size: int = Field(0, description="Total size of all files in bytes")
sources: List[SourceStats] = Field(default_factory=list, description="List of source metadata")

View File

@@ -226,6 +226,32 @@ class DatabaseRegistry:
@contextmanager
def session(self, name: str = "default") -> Generator[Any, None, None]:
"""Context manager for database sessions."""
caller_info = "unknown caller"
try:
import inspect
frame = inspect.currentframe()
stack = inspect.getouterframes(frame)
for i, frame_info in enumerate(stack):
module = inspect.getmodule(frame_info.frame)
module_name = module.__name__ if module else "unknown"
if module_name != "contextlib" and "db.py" not in frame_info.filename:
caller_module = module_name
caller_function = frame_info.function
caller_lineno = frame_info.lineno
caller_file = frame_info.filename.split("/")[-1]
caller_info = f"{caller_module}.{caller_function}:{caller_lineno} ({caller_file})"
break
except:
pass
finally:
del frame
self.session_caller_trace(caller_info)
session_factory = self.get_session_factory(name)
if not session_factory:
raise ValueError(f"No session factory found for '{name}'")
@@ -250,6 +276,11 @@ class DatabaseRegistry:
finally:
await session.close()
@trace_method
def session_caller_trace(self, caller_info: str):
"""Trace sync db caller information for debugging purposes."""
pass # wrapper used for otel tracing only
# Create a singleton instance
db_registry = DatabaseRegistry()

View File

@@ -23,10 +23,10 @@ from letta.schemas.enums import FileProcessingStatus
from letta.schemas.file import FileMetadata
from letta.schemas.passage import Passage
from letta.schemas.source import Source, SourceCreate, SourceUpdate
from letta.schemas.source_metadata import OrganizationSourcesStats
from letta.schemas.user import User
from letta.server.rest_api.utils import get_letta_server
from letta.server.server import SyncServer
from letta.services.file_processor.chunker.llama_index_chunker import LlamaIndexChunker
from letta.services.file_processor.embedder.openai_embedder import OpenAIEmbedder
from letta.services.file_processor.embedder.pinecone_embedder import PineconeEmbedder
from letta.services.file_processor.file_processor import FileProcessor
@@ -95,6 +95,24 @@ async def get_source_id_by_name(
return source.id
@router.get("/metadata", response_model=OrganizationSourcesStats, operation_id="get_sources_metadata")
async def get_sources_metadata(
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"),
):
"""
Get aggregated metadata for all sources in an organization.
Returns structured metadata including:
- Total number of sources
- Total number of files across all sources
- Total size of all files
- Per-source breakdown with file details (file_name, file_size per file)
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
return await server.file_manager.get_organization_sources_metadata(actor=actor)
@router.get("/", response_model=List[Source], operation_id="list_sources")
async def list_sources(
server: "SyncServer" = Depends(get_letta_server),
@@ -344,7 +362,9 @@ async def get_file_metadata(
if should_use_pinecone() and not file_metadata.is_processing_terminal():
ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor, limit=file_metadata.total_chunks)
logger.info(f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} in organization {actor.organization_id}")
logger.info(
f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} ({file_metadata.file_name}) in organization {actor.organization_id}"
)
if len(ids) != file_metadata.chunks_embedded or len(ids) == file_metadata.total_chunks:
if len(ids) != file_metadata.total_chunks:
@@ -424,15 +444,12 @@ async def load_file_to_source_cloud(
file_metadata: FileMetadata,
):
file_processor = MistralFileParser()
text_chunker = LlamaIndexChunker(chunk_size=embedding_config.embedding_chunk_size)
using_pinecone = should_use_pinecone()
if using_pinecone:
embedder = PineconeEmbedder()
else:
embedder = OpenAIEmbedder(embedding_config=embedding_config)
file_processor = FileProcessor(
file_parser=file_processor, text_chunker=text_chunker, embedder=embedder, actor=actor, using_pinecone=using_pinecone
)
file_processor = FileProcessor(file_parser=file_processor, embedder=embedder, actor=actor, using_pinecone=using_pinecone)
await file_processor.process(
server=server, agent_states=agent_states, source_id=source_id, content=content, file_metadata=file_metadata
)

View File

@@ -16,6 +16,7 @@ from letta.otel.tracing import trace_method
from letta.schemas.enums import FileProcessingStatus
from letta.schemas.file import FileMetadata as PydanticFileMetadata
from letta.schemas.source import Source as PydanticSource
from letta.schemas.source_metadata import FileStats, OrganizationSourcesStats, SourceStats
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.utils import enforce_types
@@ -272,3 +273,72 @@ class FileManager:
else:
# Add numeric suffix
return f"{source.name}/{base}_({count}){ext}"
@enforce_types
@trace_method
async def get_organization_sources_metadata(self, actor: PydanticUser) -> OrganizationSourcesStats:
"""
Get aggregated metadata for all sources in an organization with optimized queries.
Returns structured metadata including:
- Total number of sources
- Total number of files across all sources
- Total size of all files
- Per-source breakdown with file details
"""
async with db_registry.async_session() as session:
# Import here to avoid circular imports
from letta.orm.source import Source as SourceModel
# Single optimized query to get all sources with their file aggregations
query = (
select(
SourceModel.id,
SourceModel.name,
func.count(FileMetadataModel.id).label("file_count"),
func.coalesce(func.sum(FileMetadataModel.file_size), 0).label("total_size"),
)
.outerjoin(FileMetadataModel, (FileMetadataModel.source_id == SourceModel.id) & (FileMetadataModel.is_deleted == False))
.where(SourceModel.organization_id == actor.organization_id)
.where(SourceModel.is_deleted == False)
.group_by(SourceModel.id, SourceModel.name)
.order_by(SourceModel.name)
)
result = await session.execute(query)
source_aggregations = result.fetchall()
# Build response
metadata = OrganizationSourcesStats()
for row in source_aggregations:
source_id, source_name, file_count, total_size = row
# Get individual file details for this source
files_query = (
select(FileMetadataModel.id, FileMetadataModel.file_name, FileMetadataModel.file_size)
.where(
FileMetadataModel.source_id == source_id,
FileMetadataModel.organization_id == actor.organization_id,
FileMetadataModel.is_deleted == False,
)
.order_by(FileMetadataModel.file_name)
)
files_result = await session.execute(files_query)
files_rows = files_result.fetchall()
# Build file stats
files = [FileStats(file_id=file_row[0], file_name=file_row[1], file_size=file_row[2]) for file_row in files_rows]
# Build source metadata
source_metadata = SourceStats(
source_id=source_id, source_name=source_name, file_count=file_count, total_size=total_size, files=files
)
metadata.sources.append(source_metadata)
metadata.total_files += file_count
metadata.total_size += total_size
metadata.total_sources = len(metadata.sources)
return metadata

View File

@@ -99,7 +99,12 @@ class LineChunker:
return [line for line in lines if line.strip()]
def chunk_text(
self, file_metadata: FileMetadata, start: Optional[int] = None, end: Optional[int] = None, add_metadata: bool = True
self,
file_metadata: FileMetadata,
start: Optional[int] = None,
end: Optional[int] = None,
add_metadata: bool = True,
validate_range: bool = False,
) -> List[str]:
"""Content-aware text chunking based on file type"""
strategy = self._determine_chunking_strategy(file_metadata)
@@ -116,11 +121,31 @@ class LineChunker:
content_lines = self._chunk_by_lines(text, preserve_indentation=False)
total_chunks = len(content_lines)
chunk_type = (
"sentences" if strategy == ChunkingStrategy.DOCUMENTATION else "chunks" if strategy == ChunkingStrategy.PROSE else "lines"
)
# Validate range if requested
if validate_range and (start is not None or end is not None):
if start is not None and start >= total_chunks:
# Convert to 1-indexed for user-friendly error message
start_display = start + 1
raise ValueError(
f"File {file_metadata.file_name} has only {total_chunks} lines, but requested offset {start_display} is out of range"
)
if start is not None and end is not None and end > total_chunks:
# Convert to 1-indexed for user-friendly error message
start_display = start + 1
end_display = end
raise ValueError(
f"File {file_metadata.file_name} has only {total_chunks} lines, but requested range {start_display} to {end_display} extends beyond file bounds"
)
# Handle start/end slicing
if start is not None and end is not None:
if start is not None or end is not None:
content_lines = content_lines[start:end]
line_offset = start
line_offset = start if start is not None else 0
else:
line_offset = 0
@@ -129,14 +154,15 @@ class LineChunker:
# Add metadata about total chunks
if add_metadata:
chunk_type = (
"sentences" if strategy == ChunkingStrategy.DOCUMENTATION else "chunks" if strategy == ChunkingStrategy.PROSE else "lines"
)
if start is not None and end is not None:
# Display 1-indexed ranges for users
start_display = start + 1
end_display = end
content_lines.insert(0, f"[Viewing {chunk_type} {start_display} to {end_display} (out of {total_chunks} {chunk_type})]")
elif start is not None:
# Only start specified - viewing from start to end
start_display = start + 1
content_lines.insert(0, f"[Viewing {chunk_type} {start_display} to end (out of {total_chunks} {chunk_type})]")
else:
content_lines.insert(0, f"[Viewing file start (out of {total_chunks} {chunk_type})]")

View File

@@ -1,119 +1,169 @@
from typing import List, Tuple
from typing import List, Optional, Union
from mistralai import OCRPageObject
from letta.log import get_logger
from letta.otel.tracing import trace_method
from letta.services.file_processor.file_types import ChunkingStrategy, file_type_registry
logger = get_logger(__name__)
class LlamaIndexChunker:
"""LlamaIndex-based text chunking"""
"""LlamaIndex-based text chunking with automatic splitter selection"""
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50):
# Conservative default chunk sizes for fallback scenarios
DEFAULT_CONSERVATIVE_CHUNK_SIZE = 384
DEFAULT_CONSERVATIVE_CHUNK_OVERLAP = 25
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50, file_type: Optional[str] = None):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.file_type = file_type
from llama_index.core.node_parser import SentenceSplitter
# Create appropriate parser based on file type
self.parser = self._create_parser_for_file_type(file_type, chunk_size, chunk_overlap)
self.parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
# Log which parser was selected
parser_name = type(self.parser).__name__
logger.info(f"LlamaIndexChunker initialized with {parser_name} for file type: {file_type}")
def _create_parser_for_file_type(self, file_type: Optional[str], chunk_size: int, chunk_overlap: int):
"""Create appropriate parser based on file type"""
if not file_type:
# Default fallback
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
try:
# Get chunking strategy from file type registry
chunking_strategy = file_type_registry.get_chunking_strategy_by_mime_type(file_type)
logger.debug(f"Chunking strategy for {file_type}: {chunking_strategy}")
if chunking_strategy == ChunkingStrategy.CODE:
from llama_index.core.node_parser import CodeSplitter
return CodeSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
elif chunking_strategy == ChunkingStrategy.DOCUMENTATION:
if file_type in ["text/markdown", "text/x-markdown"]:
from llama_index.core.node_parser import MarkdownNodeParser
return MarkdownNodeParser()
elif file_type in ["text/html"]:
from llama_index.core.node_parser import HTMLNodeParser
return HTMLNodeParser()
else:
# Fall back to sentence splitter for other documentation
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
elif chunking_strategy == ChunkingStrategy.STRUCTURED_DATA:
if file_type in ["application/json", "application/jsonl"]:
from llama_index.core.node_parser import JSONNodeParser
return JSONNodeParser()
else:
# Fall back to sentence splitter for other structured data
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
else:
# Default to sentence splitter for PROSE and LINE_BASED
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
except Exception as e:
logger.warning(f"Failed to create specialized parser for {file_type}: {str(e)}. Using default SentenceSplitter.")
from llama_index.core.node_parser import SentenceSplitter
return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
# TODO: Make this more general beyond Mistral
@trace_method
def chunk_text(self, page: OCRPageObject) -> List[str]:
def chunk_text(self, content: Union[OCRPageObject, str]) -> List[str]:
"""Chunk text using LlamaIndex splitter"""
try:
return self.parser.split_text(page.markdown)
# Handle different input types
if isinstance(content, OCRPageObject):
# Extract markdown from OCR page object
text_content = content.markdown
else:
# Assume it's a string
text_content = content
# Use the selected parser
if hasattr(self.parser, "split_text"):
# Most parsers have split_text method
return self.parser.split_text(text_content)
elif hasattr(self.parser, "get_nodes_from_documents"):
# Some parsers need Document objects
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
document = Document(text=text_content)
nodes = self.parser.get_nodes_from_documents([document])
# Further split nodes that exceed chunk_size using SentenceSplitter
final_chunks = []
sentence_splitter = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
for node in nodes:
if len(node.text) > self.chunk_size:
# Split oversized nodes with sentence splitter
sub_chunks = sentence_splitter.split_text(node.text)
final_chunks.extend(sub_chunks)
else:
final_chunks.append(node.text)
return final_chunks
else:
# Fallback - try to call the parser directly
return self.parser(text_content)
except Exception as e:
logger.error(f"Chunking failed: {str(e)}")
raise
logger.error(f"Chunking failed with {type(self.parser).__name__}: {str(e)}")
# Try fallback with SentenceSplitter
try:
logger.info("Attempting fallback to SentenceSplitter")
from llama_index.core.node_parser import SentenceSplitter
fallback_parser = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap)
class MarkdownChunker:
"""Markdown-specific chunker that preserves line numbers for citation purposes"""
# Extract text content if needed
if isinstance(content, OCRPageObject):
text_content = content.markdown
else:
text_content = content
def __init__(self, chunk_size: int = 2048):
self.chunk_size = chunk_size
# No overlap for line-based citations to avoid ambiguity
return fallback_parser.split_text(text_content)
except Exception as fallback_error:
logger.error(f"Fallback chunking also failed: {str(fallback_error)}")
raise e # Raise the original error
from llama_index.core.node_parser import MarkdownNodeParser
self.parser = MarkdownNodeParser()
def chunk_markdown_with_line_numbers(self, markdown_content: str) -> List[Tuple[str, int, int]]:
"""
Chunk markdown content while preserving line number mappings.
Returns:
List of tuples: (chunk_text, start_line, end_line)
"""
@trace_method
def default_chunk_text(self, content: Union[OCRPageObject, str], chunk_size: int = None, chunk_overlap: int = None) -> List[str]:
"""Chunk text using default SentenceSplitter regardless of file type with conservative defaults"""
try:
# Split content into lines for line number tracking
lines = markdown_content.split("\n")
from llama_index.core.node_parser import SentenceSplitter
# Create nodes using MarkdownNodeParser
from llama_index.core import Document
# Use provided defaults or fallback to conservative values
chunk_size = chunk_size if chunk_size is not None else self.DEFAULT_CONSERVATIVE_CHUNK_SIZE
chunk_overlap = chunk_overlap if chunk_overlap is not None else self.DEFAULT_CONSERVATIVE_CHUNK_OVERLAP
default_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
document = Document(text=markdown_content)
nodes = self.parser.get_nodes_from_documents([document])
# Handle different input types
if isinstance(content, OCRPageObject):
text_content = content.markdown
else:
text_content = content
chunks_with_line_numbers = []
for node in nodes:
chunk_text = node.text
# Find the line numbers for this chunk
start_line, end_line = self._find_line_numbers(chunk_text, lines)
chunks_with_line_numbers.append((chunk_text, start_line, end_line))
return chunks_with_line_numbers
return default_parser.split_text(text_content)
except Exception as e:
logger.error(f"Markdown chunking failed: {str(e)}")
# Fallback to simple line-based chunking
return self._fallback_line_chunking(markdown_content)
def _find_line_numbers(self, chunk_text: str, lines: List[str]) -> Tuple[int, int]:
"""Find the start and end line numbers for a given chunk of text."""
chunk_lines = chunk_text.split("\n")
# Find the first line of the chunk in the original document
start_line = 1
for i, line in enumerate(lines):
if chunk_lines[0].strip() in line.strip() and len(chunk_lines[0].strip()) > 10: # Avoid matching short lines
start_line = i + 1
break
# Calculate end line
end_line = start_line + len(chunk_lines) - 1
return start_line, min(end_line, len(lines))
def _fallback_line_chunking(self, markdown_content: str) -> List[Tuple[str, int, int]]:
"""Fallback chunking method that simply splits by lines with no overlap."""
lines = markdown_content.split("\n")
chunks = []
i = 0
while i < len(lines):
chunk_lines = []
start_line = i + 1
char_count = 0
# Build chunk until we hit size limit
while i < len(lines) and char_count < self.chunk_size:
line = lines[i]
chunk_lines.append(line)
char_count += len(line) + 1 # +1 for newline
i += 1
end_line = i
chunk_text = "\n".join(chunk_lines)
chunks.append((chunk_text, start_line, end_line))
# No overlap - continue from where we left off
return chunks
logger.error(f"Default chunking failed: {str(e)}")
raise

View File

@@ -91,7 +91,7 @@ class OpenAIEmbedder(BaseEmbedder):
try:
return await self._embed_batch(batch, indices)
except Exception as e:
logger.error(f"Failed to embed batch of size {len(batch)}: {str(e)}")
logger.error("Failed to embed batch of size %s: %s", len(batch), e)
log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__})
raise

View File

@@ -26,14 +26,12 @@ class FileProcessor:
def __init__(
self,
file_parser: MistralFileParser,
text_chunker: LlamaIndexChunker,
embedder: BaseEmbedder,
actor: User,
using_pinecone: bool,
max_file_size: int = 50 * 1024 * 1024, # 50MB default
):
self.file_parser = file_parser
self.text_chunker = text_chunker
self.line_chunker = LineChunker()
self.embedder = embedder
self.max_file_size = max_file_size
@@ -44,6 +42,61 @@ class FileProcessor:
self.actor = actor
self.using_pinecone = using_pinecone
async def _chunk_and_embed_with_fallback(self, file_metadata: FileMetadata, ocr_response, source_id: str) -> List:
"""Chunk text and generate embeddings with fallback to default chunker if needed"""
filename = file_metadata.file_name
# Create file-type-specific chunker
text_chunker = LlamaIndexChunker(file_type=file_metadata.file_type)
# First attempt with file-specific chunker
try:
all_chunks = []
for page in ocr_response.pages:
chunks = text_chunker.chunk_text(page)
if not chunks:
log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
raise ValueError("No chunks created from text")
all_chunks.extend(chunks)
all_passages = await self.embedder.generate_embedded_passages(
file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
)
return all_passages
except Exception as e:
logger.warning(f"Failed to chunk/embed with file-specific chunker for {filename}: {str(e)}. Retrying with default chunker.")
log_event("file_processor.embedding_failed_retrying", {"filename": filename, "error": str(e), "error_type": type(e).__name__})
# Retry with default chunker
try:
logger.info(f"Retrying chunking with default SentenceSplitter for {filename}")
all_chunks = []
for page in ocr_response.pages:
chunks = text_chunker.default_chunk_text(page)
if not chunks:
log_event(
"file_processor.default_chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)}
)
raise ValueError("No chunks created from text with default chunker")
all_chunks.extend(chunks)
all_passages = await self.embedder.generate_embedded_passages(
file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
)
logger.info(f"Successfully generated passages with default chunker for {filename}")
log_event("file_processor.default_chunking_success", {"filename": filename, "total_chunks": len(all_chunks)})
return all_passages
except Exception as fallback_error:
logger.error("Default chunking also failed for %s: %s", filename, fallback_error)
log_event(
"file_processor.default_chunking_also_failed",
{"filename": filename, "fallback_error": str(fallback_error), "fallback_error_type": type(fallback_error).__name__},
)
raise fallback_error
# TODO: Factor this function out of SyncServer
@trace_method
async def process(
@@ -111,19 +164,10 @@ class FileProcessor:
logger.info("Chunking extracted text")
log_event("file_processor.chunking_started", {"filename": filename, "pages_to_process": len(ocr_response.pages)})
all_chunks = []
for page in ocr_response.pages:
chunks = self.text_chunker.chunk_text(page)
if not chunks:
log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
raise ValueError("No chunks created from text")
all_chunks.extend(self.text_chunker.chunk_text(page))
all_passages = await self.embedder.generate_embedded_passages(
file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
# Chunk and embed with fallback logic
all_passages = await self._chunk_and_embed_with_fallback(
file_metadata=file_metadata, ocr_response=ocr_response, source_id=source_id
)
if not self.using_pinecone:
@@ -156,7 +200,7 @@ class FileProcessor:
return all_passages
except Exception as e:
logger.error(f"File processing failed for {filename}: {str(e)}")
logger.error("File processing failed for %s: %s", filename, e)
log_event(
"file_processor.processing_failed",
{

View File

@@ -175,7 +175,7 @@ class LettaFileToolExecutor(ToolExecutor):
file = await self.file_manager.get_file_by_id(file_id=file_id, actor=self.actor, include_content=True)
# Process file content
content_lines = LineChunker().chunk_text(file_metadata=file, start=start, end=end)
content_lines = LineChunker().chunk_text(file_metadata=file, start=start, end=end, validate_range=True)
visible_content = "\n".join(content_lines)
# Handle LRU eviction and file opening

View File

@@ -247,7 +247,7 @@ class Settings(BaseSettings):
# cron job parameters
enable_batch_job_polling: bool = False
poll_running_llm_batches_interval_seconds: int = 5 * 60
poll_lock_retry_interval_seconds: int = 5 * 60
poll_lock_retry_interval_seconds: int = 8 * 60
batch_job_polling_lookback_weeks: int = 2
batch_job_polling_batch_size: Optional[int] = None

View File

@@ -5299,6 +5299,95 @@ async def test_upsert_file_content_basic(server: SyncServer, default_user, defau
assert orm_file.updated_at > orm_file.created_at
@pytest.mark.asyncio
async def test_get_organization_sources_metadata(server, default_user):
"""Test getting organization sources metadata with aggregated file information."""
# Create test sources
source1 = await server.source_manager.create_source(
source=PydanticSource(
name="test_source_1",
embedding_config=DEFAULT_EMBEDDING_CONFIG,
),
actor=default_user,
)
source2 = await server.source_manager.create_source(
source=PydanticSource(
name="test_source_2",
embedding_config=DEFAULT_EMBEDDING_CONFIG,
),
actor=default_user,
)
# Create test files for source1
file1_meta = PydanticFileMetadata(
source_id=source1.id,
file_name="file1.txt",
file_type="text/plain",
file_size=1024,
)
file1 = await server.file_manager.create_file(file_metadata=file1_meta, actor=default_user)
file2_meta = PydanticFileMetadata(
source_id=source1.id,
file_name="file2.txt",
file_type="text/plain",
file_size=2048,
)
file2 = await server.file_manager.create_file(file_metadata=file2_meta, actor=default_user)
# Create test file for source2
file3_meta = PydanticFileMetadata(
source_id=source2.id,
file_name="file3.txt",
file_type="text/plain",
file_size=512,
)
file3 = await server.file_manager.create_file(file_metadata=file3_meta, actor=default_user)
# Get organization metadata
metadata = await server.file_manager.get_organization_sources_metadata(actor=default_user)
# Verify top-level aggregations
assert metadata.total_sources >= 2 # May have other sources from other tests
assert metadata.total_files >= 3
assert metadata.total_size >= 3584
# Find our test sources in the results
source1_meta = next((s for s in metadata.sources if s.source_id == source1.id), None)
source2_meta = next((s for s in metadata.sources if s.source_id == source2.id), None)
assert source1_meta is not None
assert source1_meta.source_name == "test_source_1"
assert source1_meta.file_count == 2
assert source1_meta.total_size == 3072 # 1024 + 2048
assert len(source1_meta.files) == 2
# Verify file details in source1
file1_stats = next((f for f in source1_meta.files if f.file_id == file1.id), None)
file2_stats = next((f for f in source1_meta.files if f.file_id == file2.id), None)
assert file1_stats is not None
assert file1_stats.file_name == "file1.txt"
assert file1_stats.file_size == 1024
assert file2_stats is not None
assert file2_stats.file_name == "file2.txt"
assert file2_stats.file_size == 2048
assert source2_meta is not None
assert source2_meta.source_name == "test_source_2"
assert source2_meta.file_count == 1
assert source2_meta.total_size == 512
assert len(source2_meta.files) == 1
# Verify file details in source2
file3_stats = source2_meta.files[0]
assert file3_stats.file_id == file3.id
assert file3_stats.file_name == "file3.txt"
assert file3_stats.file_size == 512
# ======================================================================================================================
# SandboxConfigManager Tests - Sandbox Configs
# ======================================================================================================================

View File

@@ -2,6 +2,8 @@ import pytest
from letta.constants import MAX_FILENAME_LENGTH
from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_function_annotations_from_source
from letta.schemas.file import FileMetadata
from letta.services.file_processor.chunker.line_chunker import LineChunker
from letta.services.helpers.agent_manager_helper import safe_format
from letta.utils import sanitize_filename
@@ -407,3 +409,103 @@ def test_formatter():
"""
assert UNUSED_AND_EMPRY_VAR_SOL == safe_format(UNUSED_AND_EMPRY_VAR, VARS_DICT)
# ---------------------- LineChunker TESTS ---------------------- #
def test_line_chunker_valid_range():
"""Test that LineChunker works correctly with valid ranges"""
file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3\nline4")
chunker = LineChunker()
# Test valid range with validation
result = chunker.chunk_text(file, start=1, end=3, validate_range=True)
# Should return lines 2 and 3 (0-indexed 1:3)
assert "[Viewing lines 2 to 3 (out of 4 lines)]" in result[0]
assert "2: line2" in result[1]
assert "3: line3" in result[2]
def test_line_chunker_valid_range_no_validation():
"""Test that LineChunker works the same without validation for valid ranges"""
file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3\nline4")
chunker = LineChunker()
# Test same range without validation
result = chunker.chunk_text(file, start=1, end=3, validate_range=False)
assert "[Viewing lines 2 to 3 (out of 4 lines)]" in result[0]
assert "2: line2" in result[1]
assert "3: line3" in result[2]
def test_line_chunker_out_of_range_start():
"""Test that LineChunker throws error when start is out of range"""
file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3")
chunker = LineChunker()
# Test with start beyond file length (3 lines, requesting start=5 which is 0-indexed 4)
with pytest.raises(ValueError, match="File test.py has only 3 lines, but requested offset 6 is out of range"):
chunker.chunk_text(file, start=5, end=6, validate_range=True)
def test_line_chunker_out_of_range_end():
"""Test that LineChunker throws error when end extends beyond file bounds"""
file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3")
chunker = LineChunker()
# Test with end beyond file length (3 lines, requesting 1 to 10)
with pytest.raises(ValueError, match="File test.py has only 3 lines, but requested range 1 to 10 extends beyond file bounds"):
chunker.chunk_text(file, start=0, end=10, validate_range=True)
def test_line_chunker_edge_case_empty_file():
"""Test that LineChunker handles empty files correctly"""
file = FileMetadata(file_name="empty.py", source_id="test_source", content="")
chunker = LineChunker()
# Test requesting lines from empty file
with pytest.raises(ValueError, match="File empty.py has only 0 lines, but requested offset 1 is out of range"):
chunker.chunk_text(file, start=0, end=1, validate_range=True)
def test_line_chunker_edge_case_single_line():
"""Test that LineChunker handles single line files correctly"""
file = FileMetadata(file_name="single.py", source_id="test_source", content="only line")
chunker = LineChunker()
# Test valid single line access
result = chunker.chunk_text(file, start=0, end=1, validate_range=True)
assert "1: only line" in result[1]
# Test out of range for single line file
with pytest.raises(ValueError, match="File single.py has only 1 lines, but requested offset 2 is out of range"):
chunker.chunk_text(file, start=1, end=2, validate_range=True)
def test_line_chunker_validation_disabled_allows_out_of_range():
"""Test that when validation is disabled, out of range silently returns partial results"""
file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3")
chunker = LineChunker()
# Test with validation disabled - should not raise error
result = chunker.chunk_text(file, start=5, end=10, validate_range=False)
# Should return empty content (except metadata header) since slice is out of bounds
assert len(result) == 1 # Only metadata header
assert "[Viewing lines 6 to 10 (out of 3 lines)]" in result[0]
def test_line_chunker_only_start_parameter():
"""Test validation with only start parameter specified"""
file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3")
chunker = LineChunker()
# Test valid start only
result = chunker.chunk_text(file, start=1, validate_range=True)
assert "[Viewing lines 2 to end (out of 3 lines)]" in result[0]
assert "2: line2" in result[1]
assert "3: line3" in result[2]
# Test invalid start only
with pytest.raises(ValueError, match="File test.py has only 3 lines, but requested offset 4 is out of range"):
chunker.chunk_text(file, start=3, validate_range=True)