diff --git a/fern/openapi.json b/fern/openapi.json index 29580dd3..855976dc 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -553,6 +553,67 @@ } } }, + "/v1/archives/{archive_id}/passages/batch": { + "post": { + "tags": ["archives"], + "summary": "Create Passages In Archive", + "description": "Create multiple passages in an archive.\n\nThis adds passages to the archive and creates embeddings for vector storage.", + "operationId": "create_passages_in_archive", + "parameters": [ + { + "name": "archive_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "minLength": 44, + "maxLength": 44, + "pattern": "^archive-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", + "description": "The ID of the archive in the format 'archive-'", + "examples": ["archive-123e4567-e89b-42d3-8456-426614174000"], + "title": "Archive Id" + }, + "description": "The ID of the archive in the format 'archive-'" + } + ], + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PassageBatchCreateRequest" + } + } + } + }, + "responses": { + "200": { + "description": "Successful Response", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Passage" + }, + "title": "Response Create Passages In Archive" + } + } + } + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } + }, "/v1/archives/{archive_id}/passages/{passage_id}": { "delete": { "tags": ["archives"], @@ -38651,8 +38712,15 @@ "description": "The id of the user that made this object." }, "created_at": { - "type": "string", - "format": "date-time", + "anyOf": [ + { + "type": "string", + "format": "date-time" + }, + { + "type": "null" + } + ], "title": "Created At", "description": "The creation date of the passage." }, @@ -38798,6 +38866,22 @@ "title": "Passage", "description": "Representation of a passage, which is stored in archival memory." }, + "PassageBatchCreateRequest": { + "properties": { + "passages": { + "items": { + "$ref": "#/components/schemas/PassageCreateRequest" + }, + "type": "array", + "title": "Passages", + "description": "Passages to create in the archive" + } + }, + "type": "object", + "required": ["passages"], + "title": "PassageBatchCreateRequest", + "description": "Request model for creating multiple passages in an archive." + }, "PassageCreateRequest": { "properties": { "text": { @@ -38832,6 +38916,18 @@ ], "title": "Tags", "description": "Optional tags for categorizing the passage" + }, + "created_at": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Created At", + "description": "Optional creation datetime for the passage (ISO 8601 format)" } }, "type": "object", @@ -38842,7 +38938,14 @@ "PassageSearchRequest": { "properties": { "query": { - "type": "string", + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], "title": "Query", "description": "Text query for semantic search" }, @@ -38928,7 +39031,6 @@ } }, "type": "object", - "required": ["query"], "title": "PassageSearchRequest", "description": "Request model for searching passages across archives." }, diff --git a/letta/schemas/passage.py b/letta/schemas/passage.py index 39dae6b0..6cb7423b 100644 --- a/letta/schemas/passage.py +++ b/letta/schemas/passage.py @@ -44,7 +44,7 @@ class Passage(PassageBase): embedding: Optional[List[float]] = Field(..., description="The embedding of the passage.") embedding_config: Optional[EmbeddingConfig] = Field(..., description="The embedding configuration used by the passage.") - created_at: datetime = Field(default_factory=get_utc_time, description="The creation date of the passage.") + created_at: Optional[datetime] = Field(default_factory=get_utc_time, description="The creation date of the passage.") @field_validator("embedding", mode="before") @classmethod @@ -83,6 +83,7 @@ class PassageCreate(PassageBase): # optionally provide embeddings embedding: Optional[List[float]] = Field(None, description="The embedding of the passage.") embedding_config: Optional[EmbeddingConfig] = Field(None, description="The embedding configuration used by the passage.") + created_at: Optional[datetime] = Field(None, description="Optional creation datetime for the passage.") class PassageUpdate(PassageCreate): diff --git a/letta/server/rest_api/routers/v1/archives.py b/letta/server/rest_api/routers/v1/archives.py index 34ac92ac..1313bf8d 100644 --- a/letta/server/rest_api/routers/v1/archives.py +++ b/letta/server/rest_api/routers/v1/archives.py @@ -48,6 +48,13 @@ class PassageCreateRequest(BaseModel): text: str = Field(..., description="The text content of the passage") metadata: Optional[Dict] = Field(default=None, description="Optional metadata for the passage") tags: Optional[List[str]] = Field(default=None, description="Optional tags for categorizing the passage") + created_at: Optional[str] = Field(default=None, description="Optional creation datetime for the passage (ISO 8601 format)") + + +class PassageBatchCreateRequest(BaseModel): + """Request model for creating multiple passages in an archive.""" + + passages: List[PassageCreateRequest] = Field(..., description="Passages to create in the archive") @router.post("/", response_model=PydanticArchive, operation_id="create_archive") @@ -225,6 +232,27 @@ async def create_passage_in_archive( text=passage.text, metadata=passage.metadata, tags=passage.tags, + created_at=passage.created_at, + actor=actor, + ) + + +@router.post("/{archive_id}/passages/batch", response_model=List[Passage], operation_id="create_passages_in_archive") +async def create_passages_in_archive( + archive_id: ArchiveId, + payload: PassageBatchCreateRequest = Body(...), + server: "SyncServer" = Depends(get_letta_server), + headers: HeaderParams = Depends(get_headers), +): + """ + Create multiple passages in an archive. + + This adds passages to the archive and creates embeddings for vector storage. + """ + actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + return await server.archive_manager.create_passages_in_archive_async( + archive_id=archive_id, + passages=[passage.model_dump() for passage in payload.passages], actor=actor, ) diff --git a/letta/server/rest_api/routers/v1/passages.py b/letta/server/rest_api/routers/v1/passages.py index cdb1010c..7e7f59c3 100644 --- a/letta/server/rest_api/routers/v1/passages.py +++ b/letta/server/rest_api/routers/v1/passages.py @@ -4,18 +4,62 @@ from typing import List, Literal, Optional from fastapi import APIRouter, Body, Depends from pydantic import BaseModel, Field +from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import TagMatchMode from letta.schemas.passage import Passage +from letta.schemas.user import User as PydanticUser from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server from letta.server.server import SyncServer router = APIRouter(prefix="/passages", tags=["passages"]) +async def _get_embedding_config_for_search( + server: SyncServer, + actor: PydanticUser, + agent_id: Optional[str], + archive_id: Optional[str], +) -> Optional[EmbeddingConfig]: + """Determine which embedding config to use for a passage search. + + Args: + server: The SyncServer instance + actor: The user making the request + agent_id: Optional agent ID to get embedding config from + archive_id: Optional archive ID to get embedding config from + + Returns: + The embedding config to use, or None if not found + + Priority: + 1. If agent_id is provided, use that agent's embedding config + 2. If archive_id is provided, use that archive's embedding config + 3. Otherwise, try to get embedding config from any existing agent + 4. Fall back to server default if no agents exist + """ + if agent_id: + agent_state = await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor) + return agent_state.embedding_config + + if archive_id: + archive = await server.archive_manager.get_archive_by_id_async(archive_id=archive_id, actor=actor) + return archive.embedding_config + + # Search across all passages - try to get embedding config from any agent + agent_count = await server.agent_manager.size_async(actor=actor) + if agent_count > 0: + agents = await server.agent_manager.list_agents_async(actor=actor, limit=1) + if agents: + return agents[0].embedding_config + + # Fall back to server default + return server.default_embedding_config + + class PassageSearchRequest(BaseModel): """Request model for searching passages across archives.""" - query: str = Field(..., description="Text query for semantic search") + query: Optional[str] = Field(None, description="Text query for semantic search") agent_id: Optional[str] = Field(None, description="Filter passages by agent ID") archive_id: Optional[str] = Field(None, description="Filter passages by archive ID") tags: Optional[List[str]] = Field(None, description="Optional list of tags to filter search results") @@ -56,29 +100,16 @@ async def search_passages( # Convert tag_match_mode to enum tag_mode = TagMatchMode.ANY if request.tag_match_mode == "any" else TagMatchMode.ALL - # Determine which embedding config to use + # Determine embedding config (only needed when query text is provided) + embed_query = bool(request.query) embedding_config = None - if request.agent_id: - # Search by agent - agent_state = await server.agent_manager.get_agent_by_id_async(agent_id=request.agent_id, actor=actor) - embedding_config = agent_state.embedding_config - elif request.archive_id: - # Search by archive_id - archive = await server.archive_manager.get_archive_by_id_async(archive_id=request.archive_id, actor=actor) - embedding_config = archive.embedding_config - else: - # Search across all passages in the organization - # Get default embedding config from any agent or use server default - agent_count = await server.agent_manager.size_async(actor=actor) - if agent_count > 0: - # Get first agent to derive embedding config - agents = await server.agent_manager.list_agents_async(actor=actor, limit=1) - if agents: - embedding_config = agents[0].embedding_config - - if not embedding_config: - # Fall back to server default - embedding_config = server.default_embedding_config + if embed_query: + embedding_config = await _get_embedding_config_for_search( + server=server, + actor=actor, + agent_id=request.agent_id, + archive_id=request.archive_id, + ) # Search passages passages_with_metadata = await server.agent_manager.query_agent_passages_async( @@ -88,7 +119,7 @@ async def search_passages( query_text=request.query, limit=request.limit, embedding_config=embedding_config, - embed_query=True, + embed_query=embed_query, tags=request.tags, tag_match_mode=tag_mode, start_date=request.start_date, diff --git a/letta/services/archive_manager.py b/letta/services/archive_manager.py index fd98a56a..28c3322a 100644 --- a/letta/services/archive_manager.py +++ b/letta/services/archive_manager.py @@ -288,6 +288,7 @@ class ArchiveManager: text: str, metadata: Optional[Dict] = None, tags: Optional[List[str]] = None, + created_at: Optional[str] = None, actor: PydanticUser = None, ) -> PydanticPassage: """Create a passage in an archive. @@ -297,6 +298,7 @@ class ArchiveManager: text: The text content of the passage metadata: Optional metadata for the passage tags: Optional tags for categorizing the passage + created_at: Optional creation datetime in ISO 8601 format actor: User performing the operation Returns: @@ -321,7 +323,12 @@ class ArchiveManager: embeddings = await embedding_client.request_embeddings([text], archive.embedding_config) embedding = embeddings[0] if embeddings else None - # Create the passage object (with or without embedding) + # Parse created_at from ISO string if provided + parsed_created_at = None + if created_at: + parsed_created_at = datetime.fromisoformat(created_at) + + # Create the passage object with embedding passage = PydanticPassage( text=text, archive_id=archive_id, @@ -330,6 +337,7 @@ class ArchiveManager: tags=tags, embedding_config=archive.embedding_config, embedding=embedding, + created_at=parsed_created_at, ) # Use PassageManager to create the passage @@ -364,6 +372,92 @@ class ArchiveManager: logger.info(f"Created passage {created_passage.id} in archive {archive_id}") return created_passage + @enforce_types + @raise_on_invalid_id(param_name="archive_id", expected_prefix=PrimitiveType.ARCHIVE) + @trace_method + async def create_passages_in_archive_async( + self, + archive_id: str, + passages: List[Dict], + actor: PydanticUser = None, + ) -> List[PydanticPassage]: + """Create multiple passages in an archive. + + Args: + archive_id: ID of the archive to add the passages to + passages: Passage create payloads + actor: User performing the operation + + Returns: + The created passages + + Raises: + NoResultFound: If archive not found + """ + if not passages: + return [] + + from letta.llm_api.llm_client import LLMClient + from letta.services.passage_manager import PassageManager + + archive = await self.get_archive_by_id_async(archive_id=archive_id, actor=actor) + + texts = [passage["text"] for passage in passages] + embedding_client = LLMClient.create( + provider_type=archive.embedding_config.embedding_endpoint_type, + actor=actor, + ) + embeddings = await embedding_client.request_embeddings(texts, archive.embedding_config) + + if len(embeddings) != len(passages): + raise ValueError("Embedding response count does not match passages count") + + # Build PydanticPassage objects for batch creation + pydantic_passages: List[PydanticPassage] = [] + for passage_payload, embedding in zip(passages, embeddings): + # Parse created_at from ISO string if provided + created_at = passage_payload.get("created_at") + if created_at and isinstance(created_at, str): + created_at = datetime.fromisoformat(created_at) + + passage = PydanticPassage( + text=passage_payload["text"], + archive_id=archive_id, + organization_id=actor.organization_id, + metadata=passage_payload.get("metadata") or {}, + tags=passage_payload.get("tags"), + embedding_config=archive.embedding_config, + embedding=embedding, + created_at=created_at, + ) + pydantic_passages.append(passage) + + # Use batch create for efficient single-transaction insert + passage_manager = PassageManager() + created_passages = await passage_manager.create_agent_passages_async( + pydantic_passages=pydantic_passages, + actor=actor, + ) + + if archive.vector_db_provider == VectorDBProvider.TPUF: + try: + from letta.helpers.tpuf_client import TurbopufferClient + + tpuf_client = TurbopufferClient() + await tpuf_client.insert_archival_memories( + archive_id=archive.id, + text_chunks=[passage.text for passage in created_passages], + passage_ids=[passage.id for passage in created_passages], + organization_id=actor.organization_id, + actor=actor, + ) + logger.info(f"Uploaded {len(created_passages)} passages to Turbopuffer for archive {archive_id}") + except Exception as e: + logger.error(f"Failed to upload passages to Turbopuffer: {e}") + + logger.info(f"Created {len(created_passages)} passages in archive {archive_id}") + return created_passages + @enforce_types @raise_on_invalid_id(param_name="archive_id", expected_prefix=PrimitiveType.ARCHIVE) @raise_on_invalid_id(param_name="passage_id", expected_prefix=PrimitiveType.PASSAGE) diff --git a/letta/services/passage_manager.py b/letta/services/passage_manager.py index c1e6bad3..0a69e70e 100644 --- a/letta/services/passage_manager.py +++ b/letta/services/passage_manager.py @@ -192,6 +192,93 @@ class PassageManager: return passage.to_pydantic() + @enforce_types + @trace_method + async def create_agent_passages_async(self, pydantic_passages: List[PydanticPassage], actor: PydanticUser) -> List[PydanticPassage]: + """Create multiple agent passages in a single database transaction. + + Args: + pydantic_passages: List of passages to create + actor: User performing the operation + + Returns: + List of created passages + """ + if not pydantic_passages: + return [] + + import numpy as np + + from letta.helpers.tpuf_client import should_use_tpuf + + use_tpuf = should_use_tpuf() + passage_objects: List[ArchivalPassage] = [] + all_tags_data: List[tuple] = [] # (passage_index, tags) for creating tags after passages are created + + for idx, pydantic_passage in enumerate(pydantic_passages): + if not pydantic_passage.archive_id: + raise ValueError("Agent passage must have archive_id") + if pydantic_passage.source_id: + raise ValueError("Agent passage cannot have source_id") + + data = pydantic_passage.model_dump(to_orm=True) + + # Deduplicate tags if provided (for dual storage consistency) + tags = data.get("tags") + if tags: + tags = list(set(tags)) + all_tags_data.append((idx, tags)) + + # Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB) + embedding = data["embedding"] + if embedding and not use_tpuf: + np_embedding = np.array(embedding) + if np_embedding.shape[0] != MAX_EMBEDDING_DIM: + embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist() + + # Sanitize text to remove null bytes which PostgreSQL rejects + text = data["text"] + if text and "\x00" in text: + text = text.replace("\x00", "") + logger.warning(f"Removed null bytes from passage text (length: {len(data['text'])} -> {len(text)})") + + common_fields = { + "id": data.get("id"), + "text": text, + "embedding": embedding, + "embedding_config": data["embedding_config"], + "organization_id": data["organization_id"], + "metadata_": data.get("metadata_", {}), + "tags": tags, + "is_deleted": data.get("is_deleted", False), + "created_at": data.get("created_at", datetime.now(timezone.utc)), + } + agent_fields = {"archive_id": data["archive_id"]} + passage = ArchivalPassage(**common_fields, **agent_fields) + passage_objects.append(passage) + + async with db_registry.async_session() as session: + # Batch create all passages in a single transaction + created_passages = await ArchivalPassage.batch_create_async( + items=passage_objects, + db_session=session, + actor=actor, + ) + + # Create tags for passages that have them + for idx, tags in all_tags_data: + created_passage = created_passages[idx] + await self._create_tags_for_passage( + session=session, + passage_id=created_passage.id, + archive_id=created_passage.archive_id, + organization_id=created_passage.organization_id, + tags=tags, + actor=actor, + ) + + return [p.to_pydantic() for p in created_passages] + @enforce_types @trace_method async def create_source_passage_async( diff --git a/tests/managers/test_passage_manager.py b/tests/managers/test_passage_manager.py index 5adb69a4..e5020dea 100644 --- a/tests/managers/test_passage_manager.py +++ b/tests/managers/test_passage_manager.py @@ -222,6 +222,52 @@ async def test_agent_list_passages_filtering(server, default_user, sarah_agent, assert len(date_filtered) == 5 +@pytest.mark.asyncio +async def test_agent_query_passages_time_only(server, default_user, default_archive, disable_turbopuffer): + """Test querying passages with date filters and no query text.""" + now = datetime.now(timezone.utc) + older_date = now - timedelta(days=2) + newer_date = now - timedelta(hours=2) + + older_passage = await server.passage_manager.create_agent_passage_async( + PydanticPassage( + organization_id=default_user.organization_id, + archive_id=default_archive.id, + text="Older passage", + embedding=[0.1], + embedding_config=DEFAULT_EMBEDDING_CONFIG, + created_at=older_date, + ), + actor=default_user, + ) + + newer_passage = await server.passage_manager.create_agent_passage_async( + PydanticPassage( + organization_id=default_user.organization_id, + archive_id=default_archive.id, + text="Newer passage", + embedding=[0.1], + embedding_config=DEFAULT_EMBEDDING_CONFIG, + created_at=newer_date, + ), + actor=default_user, + ) + + results = await server.agent_manager.query_agent_passages_async( + actor=default_user, + archive_id=default_archive.id, + start_date=now - timedelta(days=1), + end_date=now + timedelta(minutes=1), + ) + + assert len(results) == 1 + passage, _, _ = results[0] + assert passage.id == newer_passage.id + assert passage.id != older_passage.id + assert passage.created_at >= now - timedelta(days=1) + assert passage.created_at <= now + timedelta(minutes=1) + + @pytest.fixture def mock_embeddings(): """Load mock embeddings from JSON file"""