diff --git a/letta/constants.py b/letta/constants.py index bd8d2c49..5b7725bb 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -371,3 +371,4 @@ PINECONE_TEXT_FIELD_NAME = "chunk_text" PINECONE_METRIC = "cosine" PINECONE_CLOUD = "aws" PINECONE_REGION = "us-east-1" +PINECONE_MAX_BATCH_SIZE = 96 diff --git a/letta/helpers/pinecone_utils.py b/letta/helpers/pinecone_utils.py index d7cd1d4a..d51b071d 100644 --- a/letta/helpers/pinecone_utils.py +++ b/letta/helpers/pinecone_utils.py @@ -2,7 +2,14 @@ from typing import Any, Dict, List from pinecone import PineconeAsyncio -from letta.constants import PINECONE_CLOUD, PINECONE_EMBEDDING_MODEL, PINECONE_METRIC, PINECONE_REGION, PINECONE_TEXT_FIELD_NAME +from letta.constants import ( + PINECONE_CLOUD, + PINECONE_EMBEDDING_MODEL, + PINECONE_MAX_BATCH_SIZE, + PINECONE_METRIC, + PINECONE_REGION, + PINECONE_TEXT_FIELD_NAME, +) from letta.log import get_logger from letta.schemas.user import User from letta.settings import settings @@ -90,7 +97,10 @@ async def upsert_records_to_pinecone_index(records: List[dict], actor: User): async with PineconeAsyncio(api_key=settings.pinecone_api_key) as pc: description = await pc.describe_index(name=settings.pinecone_source_index) async with pc.IndexAsyncio(host=description.index.host) as dense_index: - await dense_index.upsert_records(actor.organization_id, records) + # Process records in batches to avoid exceeding Pinecone limits + for i in range(0, len(records), PINECONE_MAX_BATCH_SIZE): + batch = records[i : i + PINECONE_MAX_BATCH_SIZE] + await dense_index.upsert_records(actor.organization_id, batch) async def search_pinecone_index(query: str, limit: int, filter: Dict[str, Any], actor: User) -> Dict[str, Any]: diff --git a/letta/jobs/scheduler.py b/letta/jobs/scheduler.py index b453aea9..056167fa 100644 --- a/letta/jobs/scheduler.py +++ b/letta/jobs/scheduler.py @@ -4,10 +4,11 @@ from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from 
apscheduler.triggers.interval import IntervalTrigger +from sqlalchemy import text from letta.jobs.llm_batch_job_polling import poll_running_llm_batches from letta.log import get_logger -from letta.server.db import db_context +from letta.server.db import db_registry from letta.server.server import SyncServer from letta.settings import settings @@ -16,68 +17,54 @@ scheduler = AsyncIOScheduler() logger = get_logger(__name__) ADVISORY_LOCK_KEY = 0x12345678ABCDEF00 -_advisory_lock_conn = None # Holds the raw DB connection if leader -_advisory_lock_cur = None # Holds the cursor for the lock connection if leader +_advisory_lock_session = None # Holds the async session if leader _lock_retry_task: Optional[asyncio.Task] = None # Background task handle for non-leaders _is_scheduler_leader = False # Flag indicating if this instance runs the scheduler async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: """Attempts to acquire lock, starts scheduler if successful.""" - global _advisory_lock_conn, _advisory_lock_cur, _is_scheduler_leader, scheduler + global _advisory_lock_session, _is_scheduler_leader, scheduler if _is_scheduler_leader: return True # Already leading - raw_conn = None - cur = None + lock_session = None acquired_lock = False try: - # Use a temporary connection context for the attempt initially - with db_context() as session: + async with db_registry.async_session() as session: engine = session.get_bind() engine_name = engine.name logger.info(f"Database engine type: {engine_name}") - if engine_name != "postgresql": logger.warning(f"Advisory locks not supported for {engine_name} database. 
Starting scheduler without leader election.") - acquired_lock = True # For SQLite, assume we can start the scheduler + acquired_lock = True else: - # Get raw connection - MUST be kept open if lock is acquired - raw_conn = engine.raw_connection() - cur = raw_conn.cursor() - - cur.execute("SELECT pg_try_advisory_lock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,)) - acquired_lock = cur.fetchone()[0] + lock_session = db_registry.get_async_session_factory()() + result = await lock_session.execute( + text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY} + ) + acquired_lock = result.scalar() if not acquired_lock: - if cur: - cur.close() - if raw_conn: - raw_conn.close() + if lock_session: + await lock_session.close() logger.info("Scheduler lock held by another instance.") return False - # --- Lock Acquired --- if engine_name == "postgresql": logger.info("Acquired PostgreSQL advisory lock.") - _advisory_lock_conn = raw_conn # Keep connection for lock duration - _advisory_lock_cur = cur # Keep cursor for lock duration - raw_conn = None # Prevent closing in finally block - cur = None # Prevent closing in finally block + _advisory_lock_session = lock_session + lock_session = None else: logger.info("Starting scheduler for non-PostgreSQL database.") - # For SQLite, we don't need to keep the connection open - if cur: - cur.close() - if raw_conn: - raw_conn.close() - raw_conn = None - cur = None + if lock_session: + await lock_session.close() + lock_session = None trigger = IntervalTrigger( seconds=settings.poll_running_llm_batches_interval_seconds, - jitter=10, # Jitter for the job execution + jitter=10, ) scheduler.add_job( poll_running_llm_batches, @@ -91,7 +78,7 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: if not scheduler.running: scheduler.start() - elif scheduler.state == 2: # PAUSED + elif scheduler.state == 2: scheduler.resume() _is_scheduler_leader = True @@ -99,38 +86,27 @@ async def 
_try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: except Exception as e: logger.error(f"Error during lock acquisition/scheduler start: {e}", exc_info=True) - if acquired_lock: # If lock was acquired before error, try to release + if acquired_lock: logger.warning("Attempting to release lock due to error during startup.") try: - # Use the cursor/connection we were about to store - _advisory_lock_cur = cur - _advisory_lock_conn = raw_conn - await _release_advisory_lock() # Attempt cleanup + _advisory_lock_session = lock_session + await _release_advisory_lock() except Exception as unlock_err: logger.error(f"Failed to release lock during error handling: {unlock_err}", exc_info=True) finally: - # Ensure globals are cleared after failed attempt - _advisory_lock_cur = None - _advisory_lock_conn = None + _advisory_lock_session = None _is_scheduler_leader = False - # Ensure scheduler is stopped if we failed partially if scheduler.running: try: scheduler.shutdown(wait=False) except: - pass # Best effort + pass return False finally: - # Clean up temporary resources if lock wasn't acquired or error occurred before storing - if cur: + if lock_session: try: - cur.close() - except: - pass - if raw_conn: - try: - raw_conn.close() + await lock_session.close() except: pass @@ -141,63 +117,50 @@ async def _background_lock_retry_loop(server: SyncServer): logger.info("Starting background task to periodically check for scheduler lock.") while True: - if _is_scheduler_leader: # Should be cancelled first, but safety check + if _is_scheduler_leader: break try: wait_time = settings.poll_lock_retry_interval_seconds await asyncio.sleep(wait_time) - # Re-check state before attempting lock if _is_scheduler_leader or _lock_retry_task is None: - break # Stop if became leader or task was cancelled + break acquired = await _try_acquire_lock_and_start_scheduler(server) if acquired: logger.info("Background task acquired lock and started scheduler.") - _lock_retry_task = None # Clear 
self handle - break # Exit loop, we are now the leader + _lock_retry_task = None + break except asyncio.CancelledError: logger.info("Background lock retry task cancelled.") break except Exception as e: logger.error(f"Error in background lock retry loop: {e}", exc_info=True) - # Avoid tight loop on persistent errors await asyncio.sleep(settings.poll_lock_retry_interval_seconds) async def _release_advisory_lock(): - """Releases the advisory lock using the stored connection.""" - global _advisory_lock_conn, _advisory_lock_cur + """Releases the advisory lock using the stored session.""" + global _advisory_lock_session - lock_cur = _advisory_lock_cur - lock_conn = _advisory_lock_conn - _advisory_lock_cur = None # Clear global immediately - _advisory_lock_conn = None # Clear global immediately + lock_session = _advisory_lock_session + _advisory_lock_session = None - if lock_cur is not None and lock_conn is not None: + if lock_session is not None: logger.info(f"Attempting to release PostgreSQL advisory lock {ADVISORY_LOCK_KEY}") try: - # Try to execute unlock - connection/cursor validity is checked by attempting the operation - lock_cur.execute("SELECT pg_advisory_unlock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,)) - lock_cur.fetchone() # Consume result - lock_conn.commit() + await lock_session.execute(text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY}) logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}") except Exception as e: logger.error(f"Error executing pg_advisory_unlock: {e}", exc_info=True) finally: - # Ensure resources are closed regardless of unlock success try: - if lock_cur: - lock_cur.close() + if lock_session: + await lock_session.close() + logger.info("Closed database session that held advisory lock.") except Exception as e: - logger.error(f"Error closing advisory lock cursor: {e}", exc_info=True) - try: - if lock_conn: - lock_conn.close() - logger.info("Closed database connection that held 
advisory lock.") - except Exception as e: - logger.error(f"Error closing advisory lock connection: {e}", exc_info=True) + logger.error(f"Error closing advisory lock session: {e}", exc_info=True) else: logger.info("No PostgreSQL advisory lock to release (likely using SQLite or non-PostgreSQL database).") @@ -220,7 +183,6 @@ async def start_scheduler_with_leader_election(server: SyncServer): acquired_immediately = await _try_acquire_lock_and_start_scheduler(server) if not acquired_immediately and _lock_retry_task is None: - # Failed initial attempt, start background retry task loop = asyncio.get_running_loop() _lock_retry_task = loop.create_task(_background_lock_retry_loop(server)) @@ -232,48 +194,40 @@ async def shutdown_scheduler_and_release_lock(): """ global _is_scheduler_leader, _lock_retry_task, scheduler - # 1. Cancel retry task if running (for non-leaders) if _lock_retry_task is not None: logger.info("Shutting down: Cancelling background lock retry task.") current_task = _lock_retry_task - _lock_retry_task = None # Clear handle first + _lock_retry_task = None current_task.cancel() try: - await current_task # Wait for cancellation + await current_task except asyncio.CancelledError: logger.info("Background lock retry task successfully cancelled.") except Exception as e: logger.warning(f"Exception waiting for cancelled retry task: {e}", exc_info=True) - # 2. 
Shutdown scheduler and release lock if we were the leader if _is_scheduler_leader: logger.info("Shutting down: Leader instance stopping scheduler and releasing lock.") if scheduler.running: try: - # Force synchronous shutdown to prevent callback scheduling scheduler.shutdown(wait=True) - # wait for any internal cleanup to complete await asyncio.sleep(0.1) logger.info("APScheduler shut down.") except Exception as e: - # Handle SchedulerNotRunningError and other shutdown exceptions logger.warning(f"Exception during APScheduler shutdown: {e}") if "not running" not in str(e).lower(): logger.error(f"Unexpected error shutting down APScheduler: {e}", exc_info=True) await _release_advisory_lock() - _is_scheduler_leader = False # Update state after cleanup + _is_scheduler_leader = False else: logger.info("Shutting down: Non-leader instance.") - # Final cleanup check for scheduler state (belt and suspenders) - # This should rarely be needed if shutdown logic above worked correctly try: if scheduler.running: logger.warning("Scheduler still running after shutdown logic completed? 
Forcing shutdown.") scheduler.shutdown(wait=False) except Exception as e: - # Catch SchedulerNotRunningError and other shutdown exceptions logger.debug(f"Expected exception during final scheduler cleanup: {e}") diff --git a/letta/schemas/source_metadata.py b/letta/schemas/source_metadata.py new file mode 100644 index 00000000..d395e188 --- /dev/null +++ b/letta/schemas/source_metadata.py @@ -0,0 +1,32 @@ +from typing import List, Optional + +from pydantic import Field + +from letta.schemas.letta_base import LettaBase + + +class FileStats(LettaBase): + """File statistics for metadata endpoint""" + + file_id: str = Field(..., description="Unique identifier of the file") + file_name: str = Field(..., description="Name of the file") + file_size: Optional[int] = Field(None, description="Size of the file in bytes") + + +class SourceStats(LettaBase): + """Aggregated metadata for a source""" + + source_id: str = Field(..., description="Unique identifier of the source") + source_name: str = Field(..., description="Name of the source") + file_count: int = Field(0, description="Number of files in the source") + total_size: int = Field(0, description="Total size of all files in bytes") + files: List[FileStats] = Field(default_factory=list, description="List of file statistics") + + +class OrganizationSourcesStats(LettaBase): + """Complete metadata response for organization sources""" + + total_sources: int = Field(0, description="Total number of sources") + total_files: int = Field(0, description="Total number of files across all sources") + total_size: int = Field(0, description="Total size of all files in bytes") + sources: List[SourceStats] = Field(default_factory=list, description="List of source metadata") diff --git a/letta/server/db.py b/letta/server/db.py index cf423c59..ff76a000 100644 --- a/letta/server/db.py +++ b/letta/server/db.py @@ -226,6 +226,32 @@ class DatabaseRegistry: @contextmanager def session(self, name: str = "default") -> Generator[Any, None, None]: 
"""Context manager for database sessions.""" + caller_info = "unknown caller" + try: + import inspect + + frame = inspect.currentframe() + stack = inspect.getouterframes(frame) + + for i, frame_info in enumerate(stack): + module = inspect.getmodule(frame_info.frame) + module_name = module.__name__ if module else "unknown" + + if module_name != "contextlib" and "db.py" not in frame_info.filename: + caller_module = module_name + caller_function = frame_info.function + caller_lineno = frame_info.lineno + caller_file = frame_info.filename.split("/")[-1] + + caller_info = f"{caller_module}.{caller_function}:{caller_lineno} ({caller_file})" + break + except: + pass + finally: + del frame + + self.session_caller_trace(caller_info) + session_factory = self.get_session_factory(name) if not session_factory: raise ValueError(f"No session factory found for '{name}'") @@ -250,6 +276,11 @@ class DatabaseRegistry: finally: await session.close() + @trace_method + def session_caller_trace(self, caller_info: str): + """Trace sync db caller information for debugging purposes.""" + pass # wrapper used for otel tracing only + # Create a singleton instance db_registry = DatabaseRegistry() diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index bb6694eb..48049d73 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -23,10 +23,10 @@ from letta.schemas.enums import FileProcessingStatus from letta.schemas.file import FileMetadata from letta.schemas.passage import Passage from letta.schemas.source import Source, SourceCreate, SourceUpdate +from letta.schemas.source_metadata import OrganizationSourcesStats from letta.schemas.user import User from letta.server.rest_api.utils import get_letta_server from letta.server.server import SyncServer -from letta.services.file_processor.chunker.llama_index_chunker import LlamaIndexChunker from 
letta.services.file_processor.embedder.openai_embedder import OpenAIEmbedder from letta.services.file_processor.embedder.pinecone_embedder import PineconeEmbedder from letta.services.file_processor.file_processor import FileProcessor @@ -95,6 +95,24 @@ async def get_source_id_by_name( return source.id +@router.get("/metadata", response_model=OrganizationSourcesStats, operation_id="get_sources_metadata") +async def get_sources_metadata( + server: "SyncServer" = Depends(get_letta_server), + actor_id: Optional[str] = Header(None, alias="user_id"), +): + """ + Get aggregated metadata for all sources in an organization. + + Returns structured metadata including: + - Total number of sources + - Total number of files across all sources + - Total size of all files + - Per-source breakdown with file details (file_name, file_size per file) + """ + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + return await server.file_manager.get_organization_sources_metadata(actor=actor) + + @router.get("/", response_model=List[Source], operation_id="list_sources") async def list_sources( server: "SyncServer" = Depends(get_letta_server), @@ -344,7 +362,9 @@ async def get_file_metadata( if should_use_pinecone() and not file_metadata.is_processing_terminal(): ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor, limit=file_metadata.total_chunks) - logger.info(f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} in organization {actor.organization_id}") + logger.info( + f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} ({file_metadata.file_name}) in organization {actor.organization_id}" + ) if len(ids) != file_metadata.chunks_embedded or len(ids) == file_metadata.total_chunks: if len(ids) != file_metadata.total_chunks: @@ -424,15 +444,12 @@ async def load_file_to_source_cloud( file_metadata: FileMetadata, ): file_processor = MistralFileParser() - text_chunker = 
LlamaIndexChunker(chunk_size=embedding_config.embedding_chunk_size) using_pinecone = should_use_pinecone() if using_pinecone: embedder = PineconeEmbedder() else: embedder = OpenAIEmbedder(embedding_config=embedding_config) - file_processor = FileProcessor( - file_parser=file_processor, text_chunker=text_chunker, embedder=embedder, actor=actor, using_pinecone=using_pinecone - ) + file_processor = FileProcessor(file_parser=file_processor, embedder=embedder, actor=actor, using_pinecone=using_pinecone) await file_processor.process( server=server, agent_states=agent_states, source_id=source_id, content=content, file_metadata=file_metadata ) diff --git a/letta/services/file_manager.py b/letta/services/file_manager.py index ed1ee7bd..f4a84fb3 100644 --- a/letta/services/file_manager.py +++ b/letta/services/file_manager.py @@ -16,6 +16,7 @@ from letta.otel.tracing import trace_method from letta.schemas.enums import FileProcessingStatus from letta.schemas.file import FileMetadata as PydanticFileMetadata from letta.schemas.source import Source as PydanticSource +from letta.schemas.source_metadata import FileStats, OrganizationSourcesStats, SourceStats from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.utils import enforce_types @@ -272,3 +273,72 @@ class FileManager: else: # Add numeric suffix return f"{source.name}/{base}_({count}){ext}" + + @enforce_types + @trace_method + async def get_organization_sources_metadata(self, actor: PydanticUser) -> OrganizationSourcesStats: + """ + Get aggregated metadata for all sources in an organization with optimized queries. 
+ + Returns structured metadata including: + - Total number of sources + - Total number of files across all sources + - Total size of all files + - Per-source breakdown with file details + """ + async with db_registry.async_session() as session: + # Import here to avoid circular imports + from letta.orm.source import Source as SourceModel + + # Single optimized query to get all sources with their file aggregations + query = ( + select( + SourceModel.id, + SourceModel.name, + func.count(FileMetadataModel.id).label("file_count"), + func.coalesce(func.sum(FileMetadataModel.file_size), 0).label("total_size"), + ) + .outerjoin(FileMetadataModel, (FileMetadataModel.source_id == SourceModel.id) & (FileMetadataModel.is_deleted == False)) + .where(SourceModel.organization_id == actor.organization_id) + .where(SourceModel.is_deleted == False) + .group_by(SourceModel.id, SourceModel.name) + .order_by(SourceModel.name) + ) + + result = await session.execute(query) + source_aggregations = result.fetchall() + + # Build response + metadata = OrganizationSourcesStats() + + for row in source_aggregations: + source_id, source_name, file_count, total_size = row + + # Get individual file details for this source + files_query = ( + select(FileMetadataModel.id, FileMetadataModel.file_name, FileMetadataModel.file_size) + .where( + FileMetadataModel.source_id == source_id, + FileMetadataModel.organization_id == actor.organization_id, + FileMetadataModel.is_deleted == False, + ) + .order_by(FileMetadataModel.file_name) + ) + + files_result = await session.execute(files_query) + files_rows = files_result.fetchall() + + # Build file stats + files = [FileStats(file_id=file_row[0], file_name=file_row[1], file_size=file_row[2]) for file_row in files_rows] + + # Build source metadata + source_metadata = SourceStats( + source_id=source_id, source_name=source_name, file_count=file_count, total_size=total_size, files=files + ) + + metadata.sources.append(source_metadata) + metadata.total_files += 
file_count + metadata.total_size += total_size + + metadata.total_sources = len(metadata.sources) + return metadata diff --git a/letta/services/file_processor/chunker/line_chunker.py b/letta/services/file_processor/chunker/line_chunker.py index 3b49a43f..c06f024b 100644 --- a/letta/services/file_processor/chunker/line_chunker.py +++ b/letta/services/file_processor/chunker/line_chunker.py @@ -99,7 +99,12 @@ class LineChunker: return [line for line in lines if line.strip()] def chunk_text( - self, file_metadata: FileMetadata, start: Optional[int] = None, end: Optional[int] = None, add_metadata: bool = True + self, + file_metadata: FileMetadata, + start: Optional[int] = None, + end: Optional[int] = None, + add_metadata: bool = True, + validate_range: bool = False, ) -> List[str]: """Content-aware text chunking based on file type""" strategy = self._determine_chunking_strategy(file_metadata) @@ -116,11 +121,31 @@ class LineChunker: content_lines = self._chunk_by_lines(text, preserve_indentation=False) total_chunks = len(content_lines) + chunk_type = ( + "sentences" if strategy == ChunkingStrategy.DOCUMENTATION else "chunks" if strategy == ChunkingStrategy.PROSE else "lines" + ) + + # Validate range if requested + if validate_range and (start is not None or end is not None): + if start is not None and start >= total_chunks: + # Convert to 1-indexed for user-friendly error message + start_display = start + 1 + raise ValueError( + f"File {file_metadata.file_name} has only {total_chunks} lines, but requested offset {start_display} is out of range" + ) + + if start is not None and end is not None and end > total_chunks: + # Convert to 1-indexed for user-friendly error message + start_display = start + 1 + end_display = end + raise ValueError( + f"File {file_metadata.file_name} has only {total_chunks} lines, but requested range {start_display} to {end_display} extends beyond file bounds" + ) # Handle start/end slicing - if start is not None and end is not None: + if start is 
not None or end is not None: content_lines = content_lines[start:end] - line_offset = start + line_offset = start if start is not None else 0 else: line_offset = 0 @@ -129,14 +154,15 @@ class LineChunker: # Add metadata about total chunks if add_metadata: - chunk_type = ( - "sentences" if strategy == ChunkingStrategy.DOCUMENTATION else "chunks" if strategy == ChunkingStrategy.PROSE else "lines" - ) if start is not None and end is not None: # Display 1-indexed ranges for users start_display = start + 1 end_display = end content_lines.insert(0, f"[Viewing {chunk_type} {start_display} to {end_display} (out of {total_chunks} {chunk_type})]") + elif start is not None: + # Only start specified - viewing from start to end + start_display = start + 1 + content_lines.insert(0, f"[Viewing {chunk_type} {start_display} to end (out of {total_chunks} {chunk_type})]") else: content_lines.insert(0, f"[Viewing file start (out of {total_chunks} {chunk_type})]") diff --git a/letta/services/file_processor/chunker/llama_index_chunker.py b/letta/services/file_processor/chunker/llama_index_chunker.py index e62b9ceb..ab6ea4a6 100644 --- a/letta/services/file_processor/chunker/llama_index_chunker.py +++ b/letta/services/file_processor/chunker/llama_index_chunker.py @@ -1,119 +1,169 @@ -from typing import List, Tuple +from typing import List, Optional, Union from mistralai import OCRPageObject from letta.log import get_logger from letta.otel.tracing import trace_method +from letta.services.file_processor.file_types import ChunkingStrategy, file_type_registry logger = get_logger(__name__) class LlamaIndexChunker: - """LlamaIndex-based text chunking""" + """LlamaIndex-based text chunking with automatic splitter selection""" - def __init__(self, chunk_size: int = 512, chunk_overlap: int = 50): + # Conservative default chunk sizes for fallback scenarios + DEFAULT_CONSERVATIVE_CHUNK_SIZE = 384 + DEFAULT_CONSERVATIVE_CHUNK_OVERLAP = 25 + + def __init__(self, chunk_size: int = 512, chunk_overlap: 
int = 50, file_type: Optional[str] = None): self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap + self.file_type = file_type - from llama_index.core.node_parser import SentenceSplitter + # Create appropriate parser based on file type + self.parser = self._create_parser_for_file_type(file_type, chunk_size, chunk_overlap) - self.parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) + # Log which parser was selected + parser_name = type(self.parser).__name__ + logger.info(f"LlamaIndexChunker initialized with {parser_name} for file type: {file_type}") + + def _create_parser_for_file_type(self, file_type: Optional[str], chunk_size: int, chunk_overlap: int): + """Create appropriate parser based on file type""" + if not file_type: + # Default fallback + from llama_index.core.node_parser import SentenceSplitter + + return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) + + try: + # Get chunking strategy from file type registry + chunking_strategy = file_type_registry.get_chunking_strategy_by_mime_type(file_type) + logger.debug(f"Chunking strategy for {file_type}: {chunking_strategy}") + + if chunking_strategy == ChunkingStrategy.CODE: + from llama_index.core.node_parser import CodeSplitter + + return CodeSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) + + elif chunking_strategy == ChunkingStrategy.DOCUMENTATION: + if file_type in ["text/markdown", "text/x-markdown"]: + from llama_index.core.node_parser import MarkdownNodeParser + + return MarkdownNodeParser() + elif file_type in ["text/html"]: + from llama_index.core.node_parser import HTMLNodeParser + + return HTMLNodeParser() + else: + # Fall back to sentence splitter for other documentation + from llama_index.core.node_parser import SentenceSplitter + + return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) + + elif chunking_strategy == ChunkingStrategy.STRUCTURED_DATA: + if file_type in ["application/json", "application/jsonl"]: 
+ from llama_index.core.node_parser import JSONNodeParser + + return JSONNodeParser() + else: + # Fall back to sentence splitter for other structured data + from llama_index.core.node_parser import SentenceSplitter + + return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) + + else: + # Default to sentence splitter for PROSE and LINE_BASED + from llama_index.core.node_parser import SentenceSplitter + + return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) + + except Exception as e: + logger.warning(f"Failed to create specialized parser for {file_type}: {str(e)}. Using default SentenceSplitter.") + from llama_index.core.node_parser import SentenceSplitter + + return SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) - # TODO: Make this more general beyond Mistral @trace_method - def chunk_text(self, page: OCRPageObject) -> List[str]: + def chunk_text(self, content: Union[OCRPageObject, str]) -> List[str]: """Chunk text using LlamaIndex splitter""" try: - return self.parser.split_text(page.markdown) + # Handle different input types + if isinstance(content, OCRPageObject): + # Extract markdown from OCR page object + text_content = content.markdown + else: + # Assume it's a string + text_content = content + + # Use the selected parser + if hasattr(self.parser, "split_text"): + # Most parsers have split_text method + return self.parser.split_text(text_content) + elif hasattr(self.parser, "get_nodes_from_documents"): + # Some parsers need Document objects + from llama_index.core import Document + from llama_index.core.node_parser import SentenceSplitter + + document = Document(text=text_content) + nodes = self.parser.get_nodes_from_documents([document]) + + # Further split nodes that exceed chunk_size using SentenceSplitter + final_chunks = [] + sentence_splitter = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap) + + for node in nodes: + if len(node.text) > self.chunk_size: + # 
Split oversized nodes with sentence splitter + sub_chunks = sentence_splitter.split_text(node.text) + final_chunks.extend(sub_chunks) + else: + final_chunks.append(node.text) + + return final_chunks + else: + # Fallback - try to call the parser directly + return self.parser(text_content) except Exception as e: - logger.error(f"Chunking failed: {str(e)}") - raise + logger.error(f"Chunking failed with {type(self.parser).__name__}: {str(e)}") + # Try fallback with SentenceSplitter + try: + logger.info("Attempting fallback to SentenceSplitter") + from llama_index.core.node_parser import SentenceSplitter + fallback_parser = SentenceSplitter(chunk_size=self.chunk_size, chunk_overlap=self.chunk_overlap) -class MarkdownChunker: - """Markdown-specific chunker that preserves line numbers for citation purposes""" + # Extract text content if needed + if isinstance(content, OCRPageObject): + text_content = content.markdown + else: + text_content = content - def __init__(self, chunk_size: int = 2048): - self.chunk_size = chunk_size - # No overlap for line-based citations to avoid ambiguity + return fallback_parser.split_text(text_content) + except Exception as fallback_error: + logger.error(f"Fallback chunking also failed: {str(fallback_error)}") + raise e # Raise the original error - from llama_index.core.node_parser import MarkdownNodeParser - - self.parser = MarkdownNodeParser() - - def chunk_markdown_with_line_numbers(self, markdown_content: str) -> List[Tuple[str, int, int]]: - """ - Chunk markdown content while preserving line number mappings. 
- - Returns: - List of tuples: (chunk_text, start_line, end_line) - """ + @trace_method + def default_chunk_text(self, content: Union[OCRPageObject, str], chunk_size: int = None, chunk_overlap: int = None) -> List[str]: + """Chunk text using default SentenceSplitter regardless of file type with conservative defaults""" try: - # Split content into lines for line number tracking - lines = markdown_content.split("\n") + from llama_index.core.node_parser import SentenceSplitter - # Create nodes using MarkdownNodeParser - from llama_index.core import Document + # Use provided defaults or fallback to conservative values + chunk_size = chunk_size if chunk_size is not None else self.DEFAULT_CONSERVATIVE_CHUNK_SIZE + chunk_overlap = chunk_overlap if chunk_overlap is not None else self.DEFAULT_CONSERVATIVE_CHUNK_OVERLAP + default_parser = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) - document = Document(text=markdown_content) - nodes = self.parser.get_nodes_from_documents([document]) + # Handle different input types + if isinstance(content, OCRPageObject): + text_content = content.markdown + else: + text_content = content - chunks_with_line_numbers = [] - - for node in nodes: - chunk_text = node.text - - # Find the line numbers for this chunk - start_line, end_line = self._find_line_numbers(chunk_text, lines) - - chunks_with_line_numbers.append((chunk_text, start_line, end_line)) - - return chunks_with_line_numbers + return default_parser.split_text(text_content) except Exception as e: - logger.error(f"Markdown chunking failed: {str(e)}") - # Fallback to simple line-based chunking - return self._fallback_line_chunking(markdown_content) - - def _find_line_numbers(self, chunk_text: str, lines: List[str]) -> Tuple[int, int]: - """Find the start and end line numbers for a given chunk of text.""" - chunk_lines = chunk_text.split("\n") - - # Find the first line of the chunk in the original document - start_line = 1 - for i, line in enumerate(lines): - if 
chunk_lines[0].strip() in line.strip() and len(chunk_lines[0].strip()) > 10: # Avoid matching short lines - start_line = i + 1 - break - - # Calculate end line - end_line = start_line + len(chunk_lines) - 1 - - return start_line, min(end_line, len(lines)) - - def _fallback_line_chunking(self, markdown_content: str) -> List[Tuple[str, int, int]]: - """Fallback chunking method that simply splits by lines with no overlap.""" - lines = markdown_content.split("\n") - chunks = [] - - i = 0 - while i < len(lines): - chunk_lines = [] - start_line = i + 1 - char_count = 0 - - # Build chunk until we hit size limit - while i < len(lines) and char_count < self.chunk_size: - line = lines[i] - chunk_lines.append(line) - char_count += len(line) + 1 # +1 for newline - i += 1 - - end_line = i - chunk_text = "\n".join(chunk_lines) - chunks.append((chunk_text, start_line, end_line)) - - # No overlap - continue from where we left off - - return chunks + logger.error(f"Default chunking failed: {str(e)}") + raise diff --git a/letta/services/file_processor/embedder/openai_embedder.py b/letta/services/file_processor/embedder/openai_embedder.py index ce43a72d..5a888549 100644 --- a/letta/services/file_processor/embedder/openai_embedder.py +++ b/letta/services/file_processor/embedder/openai_embedder.py @@ -91,7 +91,7 @@ class OpenAIEmbedder(BaseEmbedder): try: return await self._embed_batch(batch, indices) except Exception as e: - logger.error(f"Failed to embed batch of size {len(batch)}: {str(e)}") + logger.error("Failed to embed batch of size %s: %s", len(batch), e) log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__}) raise diff --git a/letta/services/file_processor/file_processor.py b/letta/services/file_processor/file_processor.py index cbf70b1c..07b324c9 100644 --- a/letta/services/file_processor/file_processor.py +++ b/letta/services/file_processor/file_processor.py @@ -26,14 +26,12 @@ class FileProcessor: def __init__( self, 
file_parser: MistralFileParser,
-        text_chunker: LlamaIndexChunker,
         embedder: BaseEmbedder,
         actor: User,
         using_pinecone: bool,
         max_file_size: int = 50 * 1024 * 1024,  # 50MB default
     ):
         self.file_parser = file_parser
-        self.text_chunker = text_chunker
         self.line_chunker = LineChunker()
         self.embedder = embedder
         self.max_file_size = max_file_size
@@ -44,6 +42,61 @@ class FileProcessor:
         self.actor = actor
         self.using_pinecone = using_pinecone
 
+    async def _chunk_and_embed_with_fallback(self, file_metadata: FileMetadata, ocr_response, source_id: str) -> List:
+        """Chunk text and generate embeddings with fallback to default chunker if needed"""
+        filename = file_metadata.file_name
+
+        # Create file-type-specific chunker
+        text_chunker = LlamaIndexChunker(file_type=file_metadata.file_type)
+
+        # First attempt with file-specific chunker
+        try:
+            all_chunks = []
+            for page in ocr_response.pages:
+                chunks = text_chunker.chunk_text(page)
+                if not chunks:
+                    log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
+                    raise ValueError("No chunks created from text")
+                all_chunks.extend(chunks)
+
+            all_passages = await self.embedder.generate_embedded_passages(
+                file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
+            )
+            return all_passages
+
+        except Exception as e:
+            logger.warning(f"Failed to chunk/embed with file-specific chunker for {filename}: {str(e)}. 
Retrying with default chunker.")
+            log_event("file_processor.embedding_failed_retrying", {"filename": filename, "error": str(e), "error_type": type(e).__name__})
+
+            # Retry with default chunker
+            try:
+                logger.info(f"Retrying chunking with default SentenceSplitter for {filename}")
+                all_chunks = []
+
+                for page in ocr_response.pages:
+                    chunks = text_chunker.default_chunk_text(page)
+                    if not chunks:
+                        log_event(
+                            "file_processor.default_chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)}
+                        )
+                        raise ValueError("No chunks created from text with default chunker")
+                    all_chunks.extend(chunks)
+
+                all_passages = await self.embedder.generate_embedded_passages(
+                    file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
+                )
+                logger.info(f"Successfully generated passages with default chunker for {filename}")
+                log_event("file_processor.default_chunking_success", {"filename": filename, "total_chunks": len(all_chunks)})
+                return all_passages
+
+            except Exception as fallback_error:
+                logger.error("Default chunking also failed for %s: %s", filename, fallback_error)
+                log_event(
+                    "file_processor.default_chunking_also_failed",
+                    {"filename": filename, "fallback_error": str(fallback_error), "fallback_error_type": type(fallback_error).__name__},
+                )
+                raise fallback_error
+
     # TODO: Factor this function out of SyncServer
     @trace_method
     async def process(
@@ -111,19 +164,10 @@ class FileProcessor:
             logger.info("Chunking extracted text")
             log_event("file_processor.chunking_started", {"filename": filename, "pages_to_process": len(ocr_response.pages)})
 
-            all_chunks = []
-            for page in ocr_response.pages:
-                chunks = self.text_chunker.chunk_text(page)
-
-                if not chunks:
-                    log_event("file_processor.chunking_failed", {"filename": filename, "page_index": ocr_response.pages.index(page)})
-                    raise ValueError("No chunks created from text")
-
-                all_chunks.extend(self.text_chunker.chunk_text(page))
-
-            all_passages = await 
self.embedder.generate_embedded_passages(
-                file_id=file_metadata.id, source_id=source_id, chunks=all_chunks, actor=self.actor
+            # Chunk and embed with fallback logic
+            all_passages = await self._chunk_and_embed_with_fallback(
+                file_metadata=file_metadata, ocr_response=ocr_response, source_id=source_id
             )
 
             if not self.using_pinecone:
@@ -156,7 +200,7 @@ class FileProcessor:
             return all_passages
 
         except Exception as e:
-            logger.error(f"File processing failed for {filename}: {str(e)}")
+            logger.error("File processing failed for %s: %s", filename, e)
             log_event(
                 "file_processor.processing_failed",
                 {
diff --git a/letta/services/tool_executor/files_tool_executor.py b/letta/services/tool_executor/files_tool_executor.py
index 1de53724..4815243a 100644
--- a/letta/services/tool_executor/files_tool_executor.py
+++ b/letta/services/tool_executor/files_tool_executor.py
@@ -175,7 +175,7 @@ class LettaFileToolExecutor(ToolExecutor):
         file = await self.file_manager.get_file_by_id(file_id=file_id, actor=self.actor, include_content=True)
 
         # Process file content
-        content_lines = LineChunker().chunk_text(file_metadata=file, start=start, end=end)
+        content_lines = LineChunker().chunk_text(file_metadata=file, start=start, end=end, validate_range=True)
         visible_content = "\n".join(content_lines)
 
         # Handle LRU eviction and file opening
diff --git a/letta/settings.py b/letta/settings.py
index fb2751c3..7a86605e 100644
--- a/letta/settings.py
+++ b/letta/settings.py
@@ -247,7 +247,7 @@ class Settings(BaseSettings):
     # cron job parameters
     enable_batch_job_polling: bool = False
     poll_running_llm_batches_interval_seconds: int = 5 * 60
-    poll_lock_retry_interval_seconds: int = 5 * 60
+    poll_lock_retry_interval_seconds: int = 8 * 60
     batch_job_polling_lookback_weeks: int = 2
     batch_job_polling_batch_size: Optional[int] = None
 
diff --git a/tests/test_managers.py b/tests/test_managers.py
index 9edc8af3..8b88000a 100644
--- a/tests/test_managers.py
+++ b/tests/test_managers.py
@@ -5299,6 +5299,95 @@ async 
def test_upsert_file_content_basic(server: SyncServer, default_user, defau assert orm_file.updated_at > orm_file.created_at +@pytest.mark.asyncio +async def test_get_organization_sources_metadata(server, default_user): + """Test getting organization sources metadata with aggregated file information.""" + # Create test sources + source1 = await server.source_manager.create_source( + source=PydanticSource( + name="test_source_1", + embedding_config=DEFAULT_EMBEDDING_CONFIG, + ), + actor=default_user, + ) + + source2 = await server.source_manager.create_source( + source=PydanticSource( + name="test_source_2", + embedding_config=DEFAULT_EMBEDDING_CONFIG, + ), + actor=default_user, + ) + + # Create test files for source1 + file1_meta = PydanticFileMetadata( + source_id=source1.id, + file_name="file1.txt", + file_type="text/plain", + file_size=1024, + ) + file1 = await server.file_manager.create_file(file_metadata=file1_meta, actor=default_user) + + file2_meta = PydanticFileMetadata( + source_id=source1.id, + file_name="file2.txt", + file_type="text/plain", + file_size=2048, + ) + file2 = await server.file_manager.create_file(file_metadata=file2_meta, actor=default_user) + + # Create test file for source2 + file3_meta = PydanticFileMetadata( + source_id=source2.id, + file_name="file3.txt", + file_type="text/plain", + file_size=512, + ) + file3 = await server.file_manager.create_file(file_metadata=file3_meta, actor=default_user) + + # Get organization metadata + metadata = await server.file_manager.get_organization_sources_metadata(actor=default_user) + + # Verify top-level aggregations + assert metadata.total_sources >= 2 # May have other sources from other tests + assert metadata.total_files >= 3 + assert metadata.total_size >= 3584 + + # Find our test sources in the results + source1_meta = next((s for s in metadata.sources if s.source_id == source1.id), None) + source2_meta = next((s for s in metadata.sources if s.source_id == source2.id), None) + + assert 
source1_meta is not None + assert source1_meta.source_name == "test_source_1" + assert source1_meta.file_count == 2 + assert source1_meta.total_size == 3072 # 1024 + 2048 + assert len(source1_meta.files) == 2 + + # Verify file details in source1 + file1_stats = next((f for f in source1_meta.files if f.file_id == file1.id), None) + file2_stats = next((f for f in source1_meta.files if f.file_id == file2.id), None) + + assert file1_stats is not None + assert file1_stats.file_name == "file1.txt" + assert file1_stats.file_size == 1024 + + assert file2_stats is not None + assert file2_stats.file_name == "file2.txt" + assert file2_stats.file_size == 2048 + + assert source2_meta is not None + assert source2_meta.source_name == "test_source_2" + assert source2_meta.file_count == 1 + assert source2_meta.total_size == 512 + assert len(source2_meta.files) == 1 + + # Verify file details in source2 + file3_stats = source2_meta.files[0] + assert file3_stats.file_id == file3.id + assert file3_stats.file_name == "file3.txt" + assert file3_stats.file_size == 512 + + # ====================================================================================================================== # SandboxConfigManager Tests - Sandbox Configs # ====================================================================================================================== diff --git a/tests/test_utils.py b/tests/test_utils.py index 1af23e62..5b0e724c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,6 +2,8 @@ import pytest from letta.constants import MAX_FILENAME_LENGTH from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_function_annotations_from_source +from letta.schemas.file import FileMetadata +from letta.services.file_processor.chunker.line_chunker import LineChunker from letta.services.helpers.agent_manager_helper import safe_format from letta.utils import sanitize_filename @@ -407,3 +409,103 @@ def test_formatter(): """ assert UNUSED_AND_EMPRY_VAR_SOL == 
safe_format(UNUSED_AND_EMPRY_VAR, VARS_DICT) + + +# ---------------------- LineChunker TESTS ---------------------- # + + +def test_line_chunker_valid_range(): + """Test that LineChunker works correctly with valid ranges""" + file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3\nline4") + chunker = LineChunker() + + # Test valid range with validation + result = chunker.chunk_text(file, start=1, end=3, validate_range=True) + # Should return lines 2 and 3 (0-indexed 1:3) + assert "[Viewing lines 2 to 3 (out of 4 lines)]" in result[0] + assert "2: line2" in result[1] + assert "3: line3" in result[2] + + +def test_line_chunker_valid_range_no_validation(): + """Test that LineChunker works the same without validation for valid ranges""" + file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3\nline4") + chunker = LineChunker() + + # Test same range without validation + result = chunker.chunk_text(file, start=1, end=3, validate_range=False) + assert "[Viewing lines 2 to 3 (out of 4 lines)]" in result[0] + assert "2: line2" in result[1] + assert "3: line3" in result[2] + + +def test_line_chunker_out_of_range_start(): + """Test that LineChunker throws error when start is out of range""" + file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3") + chunker = LineChunker() + + # Test with start beyond file length (3 lines, requesting start=5 which is 0-indexed 4) + with pytest.raises(ValueError, match="File test.py has only 3 lines, but requested offset 6 is out of range"): + chunker.chunk_text(file, start=5, end=6, validate_range=True) + + +def test_line_chunker_out_of_range_end(): + """Test that LineChunker throws error when end extends beyond file bounds""" + file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3") + chunker = LineChunker() + + # Test with end beyond file length (3 lines, requesting 1 to 10) + with 
pytest.raises(ValueError, match="File test.py has only 3 lines, but requested range 1 to 10 extends beyond file bounds"): + chunker.chunk_text(file, start=0, end=10, validate_range=True) + + +def test_line_chunker_edge_case_empty_file(): + """Test that LineChunker handles empty files correctly""" + file = FileMetadata(file_name="empty.py", source_id="test_source", content="") + chunker = LineChunker() + + # Test requesting lines from empty file + with pytest.raises(ValueError, match="File empty.py has only 0 lines, but requested offset 1 is out of range"): + chunker.chunk_text(file, start=0, end=1, validate_range=True) + + +def test_line_chunker_edge_case_single_line(): + """Test that LineChunker handles single line files correctly""" + file = FileMetadata(file_name="single.py", source_id="test_source", content="only line") + chunker = LineChunker() + + # Test valid single line access + result = chunker.chunk_text(file, start=0, end=1, validate_range=True) + assert "1: only line" in result[1] + + # Test out of range for single line file + with pytest.raises(ValueError, match="File single.py has only 1 lines, but requested offset 2 is out of range"): + chunker.chunk_text(file, start=1, end=2, validate_range=True) + + +def test_line_chunker_validation_disabled_allows_out_of_range(): + """Test that when validation is disabled, out of range silently returns partial results""" + file = FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3") + chunker = LineChunker() + + # Test with validation disabled - should not raise error + result = chunker.chunk_text(file, start=5, end=10, validate_range=False) + # Should return empty content (except metadata header) since slice is out of bounds + assert len(result) == 1 # Only metadata header + assert "[Viewing lines 6 to 10 (out of 3 lines)]" in result[0] + + +def test_line_chunker_only_start_parameter(): + """Test validation with only start parameter specified""" + file = 
FileMetadata(file_name="test.py", source_id="test_source", content="line1\nline2\nline3") + chunker = LineChunker() + + # Test valid start only + result = chunker.chunk_text(file, start=1, validate_range=True) + assert "[Viewing lines 2 to end (out of 3 lines)]" in result[0] + assert "2: line2" in result[1] + assert "3: line3" in result[2] + + # Test invalid start only + with pytest.raises(ValueError, match="File test.py has only 3 lines, but requested offset 4 is out of range"): + chunker.chunk_text(file, start=3, validate_range=True)