feat: Add endpoint to get source statistics (#3206)
This commit is contained in:
32
letta/schemas/source_metadata.py
Normal file
32
letta/schemas/source_metadata.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from typing import List, Optional
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
from letta.schemas.letta_base import LettaBase
|
||||
|
||||
|
||||
class FileStats(LettaBase):
    """File statistics for metadata endpoint.

    Lightweight per-file record embedded in ``SourceStats.files``; carries
    only identity, name, and size (no content or processing state).
    """

    file_id: str = Field(..., description="Unique identifier of the file")
    file_name: str = Field(..., description="Name of the file")
    # Optional: may be None when no size was recorded for the file.
    file_size: Optional[int] = Field(None, description="Size of the file in bytes")
|
||||
|
||||
|
||||
class SourceStats(LettaBase):
    """Aggregated metadata for a source.

    Per-source roll-up: identity plus file count, cumulative byte size, and
    the per-file breakdown as ``FileStats`` entries.
    """

    source_id: str = Field(..., description="Unique identifier of the source")
    source_name: str = Field(..., description="Name of the source")
    # Counters default to 0 so an empty source serializes cleanly.
    file_count: int = Field(0, description="Number of files in the source")
    total_size: int = Field(0, description="Total size of all files in bytes")
    # default_factory avoids a shared mutable default across instances.
    files: List[FileStats] = Field(default_factory=list, description="List of file statistics")
|
||||
|
||||
|
||||
class OrganizationSourcesStats(LettaBase):
    """Complete metadata response for organization sources.

    Top-level response model: organization-wide totals plus one
    ``SourceStats`` entry per source.
    """

    total_sources: int = Field(0, description="Total number of sources")
    total_files: int = Field(0, description="Total number of files across all sources")
    total_size: int = Field(0, description="Total size of all files in bytes")
    # default_factory avoids a shared mutable default across instances.
    sources: List[SourceStats] = Field(default_factory=list, description="List of source metadata")
|
||||
@@ -23,6 +23,7 @@ from letta.schemas.enums import FileProcessingStatus
|
||||
from letta.schemas.file import FileMetadata
|
||||
from letta.schemas.passage import Passage
|
||||
from letta.schemas.source import Source, SourceCreate, SourceUpdate
|
||||
from letta.schemas.source_metadata import OrganizationSourcesStats
|
||||
from letta.schemas.user import User
|
||||
from letta.server.rest_api.utils import get_letta_server
|
||||
from letta.server.server import SyncServer
|
||||
@@ -94,6 +95,24 @@ async def get_source_id_by_name(
|
||||
return source.id
|
||||
|
||||
|
||||
@router.get("/metadata", response_model=OrganizationSourcesStats, operation_id="get_sources_metadata")
async def get_sources_metadata(
    server: "SyncServer" = Depends(get_letta_server),
    actor_id: Optional[str] = Header(None, alias="user_id"),
):
    """
    Get aggregated metadata for all sources in an organization.

    Returns structured metadata including:
    - Total number of sources
    - Total number of files across all sources
    - Total size of all files
    - Per-source breakdown with file details (file_name, file_size per file)
    """
    # Resolve the acting user from the optional "user_id" header; falls back
    # to a default actor when the header is absent.
    actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
    # Thin delegation: all aggregation logic lives in the file manager layer.
    return await server.file_manager.get_organization_sources_metadata(actor=actor)
|
||||
|
||||
|
||||
@router.get("/", response_model=List[Source], operation_id="list_sources")
|
||||
async def list_sources(
|
||||
server: "SyncServer" = Depends(get_letta_server),
|
||||
|
||||
@@ -16,6 +16,7 @@ from letta.otel.tracing import trace_method
|
||||
from letta.schemas.enums import FileProcessingStatus
|
||||
from letta.schemas.file import FileMetadata as PydanticFileMetadata
|
||||
from letta.schemas.source import Source as PydanticSource
|
||||
from letta.schemas.source_metadata import FileStats, OrganizationSourcesStats, SourceStats
|
||||
from letta.schemas.user import User as PydanticUser
|
||||
from letta.server.db import db_registry
|
||||
from letta.utils import enforce_types
|
||||
@@ -272,3 +273,72 @@ class FileManager:
|
||||
else:
|
||||
# Add numeric suffix
|
||||
return f"{source.name}/{base}_({count}){ext}"
|
||||
|
||||
@enforce_types
@trace_method
async def get_organization_sources_metadata(self, actor: PydanticUser) -> OrganizationSourcesStats:
    """
    Get aggregated metadata for all sources in an organization.

    Issues exactly two queries regardless of the number of sources — one
    aggregate query over sources and one query for all file rows — instead
    of the previous one-query-per-source loop (an N+1 pattern).

    Args:
        actor: User whose organization scopes the lookup.

    Returns:
        OrganizationSourcesStats containing:
        - Total number of sources
        - Total number of files across all sources
        - Total size of all files
        - Per-source breakdown with file details
    """
    async with db_registry.async_session() as session:
        # Import here to avoid circular imports
        from letta.orm.source import Source as SourceModel

        # Query 1: every non-deleted source in the org with its file
        # count/size aggregations (outer join keeps file-less sources).
        query = (
            select(
                SourceModel.id,
                SourceModel.name,
                func.count(FileMetadataModel.id).label("file_count"),
                func.coalesce(func.sum(FileMetadataModel.file_size), 0).label("total_size"),
            )
            .outerjoin(FileMetadataModel, (FileMetadataModel.source_id == SourceModel.id) & (FileMetadataModel.is_deleted == False))
            .where(SourceModel.organization_id == actor.organization_id)
            .where(SourceModel.is_deleted == False)
            .group_by(SourceModel.id, SourceModel.name)
            .order_by(SourceModel.name)
        )

        result = await session.execute(query)
        source_aggregations = result.fetchall()

        metadata = OrganizationSourcesStats()
        if not source_aggregations:
            # No sources — totals stay at their zero defaults.
            return metadata

        # Query 2: file details for ALL sources in one shot (previously one
        # query per source). ORDER BY file_name preserves the per-source
        # file ordering the old per-source queries produced.
        source_ids = [row[0] for row in source_aggregations]
        files_query = (
            select(
                FileMetadataModel.source_id,
                FileMetadataModel.id,
                FileMetadataModel.file_name,
                FileMetadataModel.file_size,
            )
            .where(
                FileMetadataModel.source_id.in_(source_ids),
                FileMetadataModel.organization_id == actor.organization_id,
                FileMetadataModel.is_deleted == False,
            )
            .order_by(FileMetadataModel.file_name)
        )
        files_result = await session.execute(files_query)

        # Group file rows by source; insertion order keeps the file_name sort.
        files_by_source: dict = {}
        for source_id, file_id, file_name, file_size in files_result.fetchall():
            files_by_source.setdefault(source_id, []).append(
                FileStats(file_id=file_id, file_name=file_name, file_size=file_size)
            )

        # Assemble per-source stats and accumulate org-wide totals.
        for source_id, source_name, file_count, total_size in source_aggregations:
            metadata.sources.append(
                SourceStats(
                    source_id=source_id,
                    source_name=source_name,
                    file_count=file_count,
                    total_size=total_size,
                    files=files_by_source.get(source_id, []),
                )
            )
            metadata.total_files += file_count
            metadata.total_size += total_size

        metadata.total_sources = len(metadata.sources)
        return metadata
|
||||
|
||||
@@ -5299,6 +5299,95 @@ async def test_upsert_file_content_basic(server: SyncServer, default_user, defau
|
||||
assert orm_file.updated_at > orm_file.created_at
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_get_organization_sources_metadata(server, default_user):
    """Test getting organization sources metadata with aggregated file information."""

    async def _make_source(name):
        # Create a source owned by default_user with the default embedding config.
        return await server.source_manager.create_source(
            source=PydanticSource(name=name, embedding_config=DEFAULT_EMBEDDING_CONFIG),
            actor=default_user,
        )

    async def _make_file(source, name, size):
        # Attach a plain-text file of the given size to the source.
        meta = PydanticFileMetadata(
            source_id=source.id,
            file_name=name,
            file_type="text/plain",
            file_size=size,
        )
        return await server.file_manager.create_file(file_metadata=meta, actor=default_user)

    # Fixture data: two sources, two files on the first and one on the second.
    source1 = await _make_source("test_source_1")
    source2 = await _make_source("test_source_2")

    file1 = await _make_file(source1, "file1.txt", 1024)
    file2 = await _make_file(source1, "file2.txt", 2048)
    file3 = await _make_file(source2, "file3.txt", 512)

    # Exercise the aggregation under test.
    metadata = await server.file_manager.get_organization_sources_metadata(actor=default_user)

    # Org-wide totals use >= because other tests may have created sources too.
    assert metadata.total_sources >= 2
    assert metadata.total_files >= 3
    assert metadata.total_size >= 3584

    # Index the response by source id to locate our fixtures.
    sources_by_id = {s.source_id: s for s in metadata.sources}
    source1_meta = sources_by_id.get(source1.id)
    source2_meta = sources_by_id.get(source2.id)

    # Per-source aggregation for source1.
    assert source1_meta is not None
    assert source1_meta.source_name == "test_source_1"
    assert source1_meta.file_count == 2
    assert source1_meta.total_size == 3072  # 1024 + 2048
    assert len(source1_meta.files) == 2

    # Per-file details within source1, indexed by file id.
    stats_by_file_id = {f.file_id: f for f in source1_meta.files}
    file1_stats = stats_by_file_id.get(file1.id)
    file2_stats = stats_by_file_id.get(file2.id)

    assert file1_stats is not None
    assert file1_stats.file_name == "file1.txt"
    assert file1_stats.file_size == 1024

    assert file2_stats is not None
    assert file2_stats.file_name == "file2.txt"
    assert file2_stats.file_size == 2048

    # Per-source aggregation for source2.
    assert source2_meta is not None
    assert source2_meta.source_name == "test_source_2"
    assert source2_meta.file_count == 1
    assert source2_meta.total_size == 512
    assert len(source2_meta.files) == 1

    # Single file detail within source2.
    file3_stats = source2_meta.files[0]
    assert file3_stats.file_id == file3.id
    assert file3_stats.file_name == "file3.txt"
    assert file3_stats.file_size == 512
|
||||
|
||||
|
||||
# ======================================================================================================================
|
||||
# SandboxConfigManager Tests - Sandbox Configs
|
||||
# ======================================================================================================================
|
||||
|
||||
Reference in New Issue
Block a user