test: migrate sources test to 1.0 sdk [LET-6317] (#6367)

* test: migrate sources test to 1.0 sdk

* fix dict attribute access
This commit is contained in:
cthomas
2025-11-24 19:27:12 -08:00
committed by Caren Thomas
parent 7216d35418
commit 0ecf72567a
2 changed files with 224 additions and 176 deletions

View File

@@ -1,7 +1,7 @@
import functools
import os
import time
from typing import Optional, Union
from typing import Any, Optional, Union
from letta_client import AsyncLetta, Letta
@@ -294,22 +294,32 @@ def upload_file_and_wait(
"""Helper function to upload a file and wait for processing to complete"""
with open(file_path, "rb") as f:
if duplicate_handling:
file_metadata = client.sources.files.upload(source_id=source_id, file=f, duplicate_handling=duplicate_handling, name=name)
file_metadata = client.folders.files.upload(folder_id=source_id, file=f, duplicate_handling=duplicate_handling, name=name)
else:
file_metadata = client.sources.files.upload(source_id=source_id, file=f, name=name)
file_metadata = client.folders.files.upload(folder_id=source_id, file=f, name=name)
# wait for the file to be processed
start_time = time.time()
while file_metadata.processing_status != "completed" and file_metadata.processing_status != "error":
file_metadata_id = file_metadata.id
processing_status = file_metadata.processing_status
while processing_status != "completed" and processing_status != "error":
if time.time() - start_time > max_wait:
raise TimeoutError(f"File processing timed out after {max_wait} seconds")
time.sleep(1)
file_metadata = client.sources.get_file_metadata(source_id=source_id, file_id=file_metadata.id)
print("Waiting for file processing to complete...", file_metadata.processing_status)
file_metadata = client.get(
path=f"/v1/sources/{source_id}/files/{file_metadata_id}",
cast_to=dict[str, Any],
)
print("Waiting for file processing to complete...", file_metadata["processing_status"])
processing_status = file_metadata["processing_status"]
if file_metadata.processing_status == "error":
if isinstance(file_metadata, dict) and file_metadata["processing_status"] == "error":
raise RuntimeError(f"File processing failed: {file_metadata['error_message']}")
elif hasattr(file_metadata, "processing_status") and file_metadata.processing_status == "error":
raise RuntimeError(f"File processing failed: {file_metadata.error_message}")
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
return file_metadata

View File

@@ -5,11 +5,13 @@ import tempfile
import threading
import time
from datetime import datetime, timedelta
from typing import Any
import pytest
from dotenv import load_dotenv
from letta_client import CreateBlock, Letta as LettaSDKClient, LettaRequest, MessageCreate as ClientMessageCreate
from letta_client.types import AgentState
from letta_client import Letta as LettaSDKClient
from letta_client.types import CreateBlockParam
from letta_client.types.agent_state import AgentState
from letta.constants import DEFAULT_ORG_ID, FILES_TOOLS
from letta.helpers.pinecone_utils import should_use_pinecone
@@ -27,16 +29,17 @@ SERVER_PORT = 8283
def get_raw_system_message(client: LettaSDKClient, agent_id: str) -> str:
"""Helper function to get the raw system message from an agent's preview payload."""
raw_payload = client.agents.messages.preview_raw_payload(
agent_id=agent_id,
request=LettaRequest(
messages=[
ClientMessageCreate(
role="user",
content="Testing",
)
raw_payload = client.post(
f"/v1/agents/{agent_id}/messages/preview-raw-payload",
cast_to=dict[str, Any],
body={
"messages": [
{
"role": "user",
"content": "Testing",
}
],
),
},
)
return raw_payload["messages"][0]["content"]
@@ -44,8 +47,8 @@ def get_raw_system_message(client: LettaSDKClient, agent_id: str) -> str:
@pytest.fixture(autouse=True)
def clear_sources(client: LettaSDKClient):
# Clear existing sources
for source in client.sources.list():
client.sources.delete(source_id=source.id)
for source in list(client.folders.list()):
client.folders.delete(folder_id=source.id)
def run_server():
@@ -67,21 +70,21 @@ def client() -> LettaSDKClient:
thread.start()
wait_for_server(server_url)
print("Running client tests with server:", server_url)
client = LettaSDKClient(base_url=server_url, token=None)
client = LettaSDKClient(base_url=server_url)
client.tools.upsert_base_tools()
yield client
@pytest.fixture
def agent_state(disable_pinecone, client: LettaSDKClient):
open_file_tool = client.tools.list(name="open_files")[0]
search_files_tool = client.tools.list(name="semantic_search_files")[0]
grep_tool = client.tools.list(name="grep_files")[0]
open_file_tool = list(client.tools.list(name="open_files"))[0]
search_files_tool = list(client.tools.list(name="semantic_search_files"))[0]
grep_tool = list(client.tools.list(name="grep_files"))[0]
agent_state = client.agents.create(
name="test_sources_agent",
memory_blocks=[
CreateBlock(
CreateBlockParam(
label="human",
value="username: sarah",
),
@@ -101,7 +104,7 @@ def test_auto_attach_detach_files_tools(disable_pinecone, disable_turbopuffer, c
# Create agent with basic configuration
agent = client.agents.create(
memory_blocks=[
CreateBlock(label="human", value="username: sarah"),
CreateBlockParam(label="human", value="username: sarah"),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
@@ -125,28 +128,32 @@ def test_auto_attach_detach_files_tools(disable_pinecone, disable_turbopuffer, c
assert_no_file_tools(agent)
# Create and attach first source
source_1 = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(client.sources.list()) == 1
source_1 = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(list(client.folders.list())) == 1
agent = client.agents.sources.attach(source_id=source_1.id, agent_id=agent.id)
assert len(client.agents.retrieve(agent_id=agent.id).sources) == 1
client.agents.folders.attach(folder_id=source_1.id, agent_id=agent.id)
agent = client.agents.retrieve(agent_id=agent.id, include=["agent.sources", "agent.tools"])
assert len(agent.sources) == 1
assert_file_tools_present(agent, set(FILES_TOOLS))
# Create and attach second source
source_2 = client.sources.create(name="another_test_source", embedding="openai/text-embedding-3-small")
assert len(client.sources.list()) == 2
source_2 = client.folders.create(name="another_test_source", embedding="openai/text-embedding-3-small")
assert len(list(client.folders.list())) == 2
agent = client.agents.sources.attach(source_id=source_2.id, agent_id=agent.id)
assert len(client.agents.retrieve(agent_id=agent.id).sources) == 2
client.agents.folders.attach(folder_id=source_2.id, agent_id=agent.id)
agent = client.agents.retrieve(agent_id=agent.id, include=["agent.sources", "agent.tools"])
assert len(agent.sources) == 2
# File tools should remain after attaching second source
assert_file_tools_present(agent, set(FILES_TOOLS))
# Detach second source - tools should remain (first source still attached)
agent = client.agents.sources.detach(source_id=source_2.id, agent_id=agent.id)
client.agents.folders.detach(folder_id=source_2.id, agent_id=agent.id)
agent = client.agents.retrieve(agent_id=agent.id, include=["agent.sources", "agent.tools"])
assert_file_tools_present(agent, set(FILES_TOOLS))
# Detach first source - all file tools should be removed
agent = client.agents.sources.detach(source_id=source_1.id, agent_id=agent.id)
client.agents.folders.detach(folder_id=source_1.id, agent_id=agent.id)
agent = client.agents.retrieve(agent_id=agent.id, include=["agent.sources", "agent.tools"])
assert_no_file_tools(agent)
@@ -185,22 +192,22 @@ def test_file_upload_creates_source_blocks_correctly(
settings.mistral_api_key = None
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(client.sources.list()) == 1
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(list(client.folders.list())) == 1
# Attach
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Upload the file
upload_file_and_wait(client, source.id, file_path)
# Get uploaded files
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
# Check that blocks were created
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 1
assert any(expected_value in b.value for b in blocks)
@@ -217,10 +224,10 @@ def test_file_upload_creates_source_blocks_correctly(
assert 'status="open"' in raw_system_message
# Remove file from source
client.sources.files.delete(source_id=source.id, file_id=files[0].id)
client.folders.files.delete(folder_id=source.id, file_id=files[0].id)
# Confirm blocks were removed
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 0
assert not any(expected_value in b.value for b in blocks)
@@ -243,8 +250,8 @@ def test_attach_existing_files_creates_source_blocks_correctly(
disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState
):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(client.sources.list()) == 1
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(list(client.folders.list())) == 1
# Load files into the source
file_path = "tests/data/test.txt"
@@ -253,12 +260,12 @@ def test_attach_existing_files_creates_source_blocks_correctly(
upload_file_and_wait(client, source.id, file_path)
# Get the first file with pagination
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
# Attach after uploading the file
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
raw_system_message = get_raw_system_message(client, agent_state.id)
# Assert that the expected chunk is in the raw system message
@@ -284,17 +291,17 @@ def test_attach_existing_files_creates_source_blocks_correctly(
assert expected_chunk in raw_system_message
# Get the agent state, check blocks exist
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 1
assert any("test" in b.value for b in blocks)
assert any(b.value.startswith("[Viewing file start") for b in blocks)
# Detach the source
client.agents.sources.detach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.detach(folder_id=source.id, agent_id=agent_state.id)
# Get the agent state, check blocks do NOT exist
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 0
assert not any("test" in b.value for b in blocks)
@@ -310,10 +317,10 @@ def test_delete_source_removes_source_blocks_correctly(
disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState
):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(client.sources.list()) == 1
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(list(client.folders.list())) == 1
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
raw_system_message = get_raw_system_message(client, agent_state.id)
assert "test_source" in raw_system_message
assert "<directories>" in raw_system_message
@@ -347,20 +354,20 @@ def test_delete_source_removes_source_blocks_correctly(
assert expected_chunk in raw_system_message
# Get the agent state, check blocks exist
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 1
assert any("test" in b.value for b in blocks)
# Remove file from source
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
raw_system_message_after_detach = get_raw_system_message(client, agent_state.id)
assert expected_chunk not in raw_system_message_after_detach
assert "test_source" not in raw_system_message_after_detach
assert "<directories>" not in raw_system_message_after_detach
# Get the agent state, check blocks do NOT exist
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 0
assert not any("test" in b.value for b in blocks)
@@ -368,13 +375,13 @@ def test_delete_source_removes_source_blocks_correctly(
def test_agent_uses_open_close_file_correctly(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
sources_list = client.sources.list()
sources_list = list(client.folders.list())
assert len(sources_list) == 1
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Load files into the source
file_path = "tests/data/long_test.txt"
@@ -383,13 +390,13 @@ def test_agent_uses_open_close_file_correctly(disable_pinecone, disable_turbopuf
upload_file_and_wait(client, source.id, file_path)
# Get uploaded files
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
file = files[0]
# Check that file is opened initially
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
print(f"Agent has {len(blocks)} file block(s)")
if blocks:
@@ -414,8 +421,9 @@ def test_agent_uses_open_close_file_correctly(disable_pinecone, disable_turbopuf
print(open_response1.messages)
# Check that file is opened
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 1
old_value = blocks[0].value
old_content_length = len(old_value)
print(f"File content length after first open: {old_content_length} characters")
@@ -443,7 +451,7 @@ def test_agent_uses_open_close_file_correctly(disable_pinecone, disable_turbopuf
# Check that file is opened, but for different range
print("Verifying file is opened with second range...")
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
new_value = blocks[0].value
new_content_length = len(new_value)
@@ -471,13 +479,13 @@ def test_agent_uses_open_close_file_correctly(disable_pinecone, disable_turbopuf
def test_agent_uses_search_files_correctly(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
sources_list = client.sources.list()
sources_list = list(client.folders.list())
assert len(sources_list) == 1
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Load files into the source
file_path = "tests/data/long_test.txt"
@@ -485,10 +493,10 @@ def test_agent_uses_search_files_correctly(disable_pinecone, disable_turbopuffer
# Upload the files
file_metadata = upload_file_and_wait(client, source.id, file_path)
print(f"File uploaded and processed: {file_metadata.file_name}")
print(f"File uploaded and processed: {file_metadata['file_name']}")
# Get uploaded files
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
@@ -517,13 +525,13 @@ def test_agent_uses_search_files_correctly(disable_pinecone, disable_turbopuffer
def test_agent_uses_grep_correctly_basic(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
sources_list = client.sources.list()
sources_list = list(client.folders.list())
assert len(sources_list) == 1
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Load files into the source
file_path = "tests/data/long_test.txt"
@@ -531,10 +539,12 @@ def test_agent_uses_grep_correctly_basic(disable_pinecone, disable_turbopuffer,
# Upload the files
file_metadata = upload_file_and_wait(client, source.id, file_path)
print(f"File uploaded and processed: {file_metadata.file_name}")
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
print(f"File uploaded and processed: {file_metadata['file_name']}")
# Get uploaded files
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
@@ -559,13 +569,13 @@ def test_agent_uses_grep_correctly_basic(disable_pinecone, disable_turbopuffer,
def test_agent_uses_grep_correctly_advanced(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
sources_list = client.sources.list()
sources_list = list(client.folders.list())
assert len(sources_list) == 1
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Load files into the source
file_path = "tests/data/list_tools.json"
@@ -573,10 +583,12 @@ def test_agent_uses_grep_correctly_advanced(disable_pinecone, disable_turbopuffe
# Upload the files
file_metadata = upload_file_and_wait(client, source.id, file_path)
print(f"File uploaded and processed: {file_metadata.file_name}")
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
print(f"File uploaded and processed: {file_metadata['file_name']}")
# Get uploaded files
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
@@ -608,15 +620,15 @@ def test_agent_uses_grep_correctly_advanced(disable_pinecone, disable_turbopuffe
def test_create_agent_with_source_ids_creates_source_blocks_correctly(disable_pinecone, disable_turbopuffer, client: LettaSDKClient):
"""Test that creating an agent with source_ids parameter correctly creates source blocks."""
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(client.sources.list()) == 1
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
assert len(list(client.folders.list())) == 1
# Upload a file to the source before attaching
file_path = "tests/data/long_test.txt"
upload_file_and_wait(client, source.id, file_path)
# Get uploaded files to verify
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
@@ -624,7 +636,7 @@ def test_create_agent_with_source_ids_creates_source_blocks_correctly(disable_pi
temp_agent_state = client.agents.create(
name="test_agent_with_sources",
memory_blocks=[
CreateBlock(
CreateBlockParam(
label="human",
value="username: sarah",
),
@@ -650,13 +662,13 @@ def test_create_agent_with_source_ids_creates_source_blocks_correctly(disable_pi
def test_view_ranges_have_metadata(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
sources_list = client.sources.list()
sources_list = list(client.folders.list())
assert len(sources_list) == 1
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Load files into the source
file_path = "tests/data/1_to_100.py"
@@ -665,13 +677,13 @@ def test_view_ranges_have_metadata(disable_pinecone, disable_turbopuffer, client
upload_file_and_wait(client, source.id, file_path)
# Get uploaded files
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].source_id == source.id
file = files[0]
# Check that file is opened initially
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 1
block = blocks[0]
@@ -693,7 +705,7 @@ def test_view_ranges_have_metadata(disable_pinecone, disable_turbopuffer, client
print(open_response.messages)
# Check that file is opened correctly
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
blocks = agent_state.memory.file_blocks
assert len(blocks) == 1
block = blocks[0]
@@ -714,22 +726,22 @@ def test_view_ranges_have_metadata(disable_pinecone, disable_turbopuffer, client
def test_duplicate_file_renaming(disable_pinecone, disable_turbopuffer, client: LettaSDKClient):
"""Test that duplicate files are renamed with count-based suffixes (e.g., file.txt, file (1).txt, file (2).txt)"""
# Create a new source
source = client.sources.create(name="test_duplicate_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_duplicate_source", embedding="openai/text-embedding-3-small")
# Upload the same file three times
file_path = "tests/data/test.txt"
with open(file_path, "rb") as f:
first_file = client.sources.files.upload(source_id=source.id, file=f)
first_file = client.folders.files.upload(folder_id=source.id, file=f)
with open(file_path, "rb") as f:
second_file = client.sources.files.upload(source_id=source.id, file=f)
second_file = client.folders.files.upload(folder_id=source.id, file=f)
with open(file_path, "rb") as f:
third_file = client.sources.files.upload(source_id=source.id, file=f)
third_file = client.folders.files.upload(folder_id=source.id, file=f)
# Get all uploaded files
files = client.sources.files.list(source_id=source.id, limit=10)
files = list(client.folders.files.list(folder_id=source.id, limit=10))
assert len(files) == 3, f"Expected 3 files, got {len(files)}"
# Sort files by creation time to ensure predictable order
@@ -753,13 +765,13 @@ def test_duplicate_file_renaming(disable_pinecone, disable_turbopuffer, client:
def test_duplicate_file_handling_replace(disable_pinecone, disable_turbopuffer, client: LettaSDKClient):
"""Test that DuplicateFileHandling.REPLACE replaces existing files with same name"""
# Create a new source
source = client.sources.create(name="test_replace_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_replace_source", embedding="openai/text-embedding-3-small")
# Create agent and attach source to test memory blocks
agent_state = client.agents.create(
name="test_replace_agent",
memory_blocks=[
CreateBlock(label="human", value="username: sarah"),
CreateBlockParam(label="human", value="username: sarah"),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
@@ -778,13 +790,13 @@ def test_duplicate_file_handling_replace(disable_pinecone, disable_turbopuffer,
upload_file_and_wait(client, source.id, temp_file_path)
# Verify original file was uploaded
files = client.sources.files.list(source_id=source.id, limit=10)
files = list(client.folders.files.list(folder_id=source.id, limit=10))
assert len(files) == 1, f"Expected 1 file, got {len(files)}"
original_file = files[0]
assert original_file.original_file_name == temp_filename
# Get agent state and verify original content is in memory blocks
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
file_blocks = agent_state.memory.file_blocks
assert len(file_blocks) == 1, f"Expected 1 file block, got {len(file_blocks)}"
original_block_content = file_blocks[0].value
@@ -801,7 +813,7 @@ def test_duplicate_file_handling_replace(disable_pinecone, disable_turbopuffer,
replacement_file = upload_file_and_wait(client, source.id, temp_file_path, duplicate_handling=DuplicateFileHandling.REPLACE)
# Verify we still have only 1 file (replacement, not addition)
files_after_replace = client.sources.files.list(source_id=source.id, limit=10)
files_after_replace = list(client.folders.files.list(folder_id=source.id, limit=10))
assert len(files_after_replace) == 1, f"Expected 1 file after replacement, got {len(files_after_replace)}"
replaced_file = files_after_replace[0]
@@ -814,7 +826,7 @@ def test_duplicate_file_handling_replace(disable_pinecone, disable_turbopuffer,
assert replaced_file.id != original_file.id, "Replacement file should have different ID"
# Verify agent memory blocks contain replacement content
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
updated_file_blocks = agent_state.memory.file_blocks
assert len(updated_file_blocks) == 1, f"Expected 1 file block after replacement, got {len(updated_file_blocks)}"
@@ -838,11 +850,11 @@ def test_upload_file_with_custom_name(disable_pinecone, disable_turbopuffer, cli
agent_state = client.agents.create(
name="test_agent_custom_name",
memory_blocks=[
CreateBlock(
CreateBlockParam(
label="persona",
value="I am a helpful assistant",
),
CreateBlock(
CreateBlockParam(
label="human",
value="The user is a developer",
),
@@ -852,10 +864,10 @@ def test_upload_file_with_custom_name(disable_pinecone, disable_turbopuffer, cli
)
# Create source
source = client.sources.create(name="test_source_custom_name", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source_custom_name", embedding="openai/text-embedding-3-small")
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Create a temporary file with specific content
import tempfile
@@ -869,19 +881,21 @@ def test_upload_file_with_custom_name(disable_pinecone, disable_turbopuffer, cli
# Upload file with custom name
custom_name = "my_custom_file_name.txt"
file_metadata = upload_file_and_wait(client, source.id, temp_file_path, name=custom_name)
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
# Verify the file uses the custom name
assert file_metadata.file_name == custom_name
assert file_metadata.original_file_name == custom_name
assert file_metadata["file_name"] == custom_name
assert file_metadata["original_file_name"] == custom_name
# Verify file appears in source files list with custom name
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].file_name == custom_name
assert files[0].original_file_name == custom_name
# Verify the custom name is used in file blocks
agent_state = client.agents.retrieve(agent_id=agent_state.id)
agent_state = client.agents.retrieve(agent_id=agent_state.id, include=["agent.blocks"])
file_blocks = agent_state.memory.file_blocks
assert len(file_blocks) == 1
# Check that the custom name appears in the block label
@@ -897,11 +911,13 @@ def test_upload_file_with_custom_name(disable_pinecone, disable_turbopuffer, cli
# Upload same file with different custom name should succeed
different_custom_name = "folder_a/folder_b/another_custom_name.txt"
file_metadata2 = upload_file_and_wait(client, source.id, temp_file_path, name=different_custom_name)
assert file_metadata2.file_name == different_custom_name
assert file_metadata2.original_file_name == different_custom_name
if not isinstance(file_metadata2, dict):
file_metadata2 = file_metadata2.model_dump()
assert file_metadata2["file_name"] == different_custom_name
assert file_metadata2["original_file_name"] == different_custom_name
# Verify both files exist
files = client.sources.files.list(source_id=source.id, limit=10)
files = list(client.folders.files.list(folder_id=source.id, limit=10))
assert len(files) == 2
file_names = {f.file_name for f in files}
assert custom_name in file_names
@@ -917,7 +933,7 @@ def test_open_files_schema_descriptions(disable_pinecone, disable_turbopuffer, c
"""Test that open_files tool schema contains correct descriptions from docstring"""
# Get the open_files tool
tools = client.tools.list(name="open_files")
tools = list(client.tools.list(name="open_files"))
assert len(tools) == 1, "Expected exactly one open_files tool"
open_files_tool = tools[0]
@@ -1000,7 +1016,7 @@ def test_grep_files_schema_descriptions(disable_pinecone, disable_turbopuffer, c
"""Test that grep_files tool schema contains correct descriptions from docstring"""
# Get the grep_files tool
tools = client.tools.list(name="grep_files")
tools = list(client.tools.list(name="grep_files"))
assert len(tools) == 1, "Expected exactly one grep_files tool"
grep_files_tool = tools[0]
@@ -1085,17 +1101,19 @@ def test_grep_files_schema_descriptions(disable_pinecone, disable_turbopuffer, c
def test_agent_open_file(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
"""Test client.agents.open_file() function"""
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Upload a file
file_path = "tests/data/test.txt"
file_metadata = upload_file_and_wait(client, source.id, file_path)
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
# Basic test open_file function
closed_files = client.agents.files.open(agent_id=agent_state.id, file_id=file_metadata.id)
closed_files = client.agents.files.open(agent_id=agent_state.id, file_id=file_metadata["id"])
assert len(closed_files) == 0
system = get_raw_system_message(client, agent_state.id)
@@ -1106,20 +1124,22 @@ def test_agent_open_file(disable_pinecone, disable_turbopuffer, client: LettaSDK
def test_agent_close_file(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
"""Test client.agents.close_file() function"""
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Upload a file
file_path = "tests/data/test.txt"
file_metadata = upload_file_and_wait(client, source.id, file_path)
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
# First open the file
client.agents.files.open(agent_id=agent_state.id, file_id=file_metadata.id)
client.agents.files.open(agent_id=agent_state.id, file_id=file_metadata["id"])
# Test close_file function
client.agents.files.close(agent_id=agent_state.id, file_id=file_metadata.id)
client.agents.files.close(agent_id=agent_state.id, file_id=file_metadata["id"])
system = get_raw_system_message(client, agent_state.id)
assert '<file status="closed" name="test_source/test.txt">' in system
@@ -1128,19 +1148,21 @@ def test_agent_close_file(disable_pinecone, disable_turbopuffer, client: LettaSD
def test_agent_close_all_open_files(disable_pinecone, disable_turbopuffer, client: LettaSDKClient, agent_state: AgentState):
"""Test client.agents.close_all_open_files() function"""
# Create a new source
source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_source", embedding="openai/text-embedding-3-small")
# Attach source to agent
client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id)
client.agents.folders.attach(folder_id=source.id, agent_id=agent_state.id)
# Upload multiple files
file_paths = ["tests/data/test.txt", "tests/data/test.md"]
file_metadatas = []
for file_path in file_paths:
file_metadata = upload_file_and_wait(client, source.id, file_path)
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
file_metadatas.append(file_metadata)
# Open each file
client.agents.files.open(agent_id=agent_state.id, file_id=file_metadata.id)
client.agents.files.open(agent_id=agent_state.id, file_id=file_metadata["id"])
system = get_raw_system_message(client, agent_state.id)
assert '<file status="open"' in system
@@ -1159,12 +1181,12 @@ def test_agent_close_all_open_files(disable_pinecone, disable_turbopuffer, clien
def test_file_processing_timeout(disable_pinecone, disable_turbopuffer, client: LettaSDKClient):
"""Test that files in non-terminal states are moved to error after timeout"""
# Create a source
source = client.sources.create(name="test_timeout_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_timeout_source", embedding="openai/text-embedding-3-small")
# Upload a file
file_path = "tests/data/test.txt"
with open(file_path, "rb") as f:
file_metadata = client.sources.files.upload(source_id=source.id, file=f)
file_metadata = client.folders.files.upload(folder_id=source.id, file=f)
# Get the file ID
file_id = file_metadata.id
@@ -1177,20 +1199,26 @@ def test_file_processing_timeout(disable_pinecone, disable_turbopuffer, client:
assert FileProcessingStatus.PENDING.is_terminal_state() == False
# For testing the actual timeout logic, we can check the current file status
current_file = client.sources.get_file_metadata(source_id=source.id, file_id=file_id)
current_file = client.get(
path=f"/v1/sources/{source.id}/files/{file_id}",
cast_to=dict[str, Any],
)
# Convert string status to enum for testing
status_enum = FileProcessingStatus(current_file.processing_status)
if not isinstance(current_file, dict):
current_file = current_file.model_dump()
processing_status = current_file["processing_status"]
status_enum = FileProcessingStatus(processing_status)
# Verify that files in terminal states are not affected by timeout checks
if status_enum.is_terminal_state():
# This is the expected behavior - files that completed processing shouldn't timeout
print(f"File {file_id} is in terminal state: {current_file.processing_status}")
print(f"File {file_id} is in terminal state: {processing_status}")
assert status_enum in [FileProcessingStatus.COMPLETED, FileProcessingStatus.ERROR]
else:
# If file is still processing, it should eventually complete or timeout
# In a real scenario, we'd wait and check, but for unit tests we just verify the logic exists
print(f"File {file_id} is still processing: {current_file.processing_status}")
print(f"File {file_id} is still processing: {processing_status}")
assert status_enum in [FileProcessingStatus.PENDING, FileProcessingStatus.PARSING, FileProcessingStatus.EMBEDDING]
@@ -1222,7 +1250,7 @@ def test_file_processing_timeout_logic():
def test_letta_free_embedding(disable_pinecone, disable_turbopuffer, client: LettaSDKClient):
"""Test creating a source with letta/letta-free embedding and uploading a file"""
# create a source with letta-free embedding
source = client.sources.create(name="test_letta_free_source", embedding="letta/letta-free")
source = client.folders.create(name="test_letta_free_source", embedding="letta/letta-free")
# verify source was created with correct embedding
assert source.name == "test_letta_free_source"
@@ -1233,17 +1261,19 @@ def test_letta_free_embedding(disable_pinecone, disable_turbopuffer, client: Let
file_metadata = upload_file_and_wait(client, source.id, file_path)
# verify file was uploaded successfully
assert file_metadata.processing_status == "completed"
assert file_metadata.source_id == source.id
assert file_metadata.file_name == "test.txt"
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
assert file_metadata["processing_status"] == "completed"
assert file_metadata["source_id"] == source.id
assert file_metadata["file_name"] == "test.txt"
# verify file appears in source files list
files = client.sources.files.list(source_id=source.id, limit=1)
files = list(client.folders.files.list(folder_id=source.id, limit=1))
assert len(files) == 1
assert files[0].id == file_metadata.id
assert files[0].id == file_metadata["id"]
# cleanup
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
# --- Pinecone Tests ---
@@ -1260,15 +1290,15 @@ def test_pinecone_search_files_tool(disable_turbopuffer, client: LettaSDKClient)
agent = client.agents.create(
name="test_pinecone_agent",
memory_blocks=[
CreateBlock(label="human", value="username: testuser"),
CreateBlockParam(label="human", value="username: testuser"),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
# Create source and attach to agent
source = client.sources.create(name="test_pinecone_source", embedding="openai/text-embedding-3-small")
client.agents.sources.attach(source_id=source.id, agent_id=agent.id)
source = client.folders.create(name="test_pinecone_source", embedding="openai/text-embedding-3-small")
client.agents.folders.attach(folder_id=source.id, agent_id=agent.id)
# Upload a file with searchable content
file_path = "tests/data/long_test.txt"
@@ -1304,7 +1334,7 @@ def test_pinecone_list_files_status(disable_turbopuffer, client: LettaSDKClient)
pytest.skip("Pinecone not configured (missing API key or disabled), skipping Pinecone-specific tests")
# create source
source = client.sources.create(name="test_list_files_status", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_list_files_status", embedding="openai/text-embedding-3-small")
file_paths = ["tests/data/long_test.txt"]
uploaded_files = []
@@ -1312,25 +1342,29 @@ def test_pinecone_list_files_status(disable_turbopuffer, client: LettaSDKClient)
# use the new helper that polls via list_files
file_metadata = upload_file_and_wait_list_files(client, source.id, file_path)
uploaded_files.append(file_metadata)
assert file_metadata.processing_status == "completed", f"File {file_path} should be completed"
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
assert file_metadata["processing_status"] == "completed", f"File {file_path} should be completed"
# now get files using list_source_files to verify status checking works
files_list = client.sources.files.list(source_id=source.id, limit=100)
files_list = client.folders.files.list(folder_id=source.id, limit=100)
# verify all files show completed status and have proper embedding counts
assert len(files_list) == len(uploaded_files), f"Expected {len(uploaded_files)} files, got {len(files_list)}"
for file_metadata in files_list:
assert file_metadata.processing_status == "completed", f"File {file_metadata.file_name} should show completed status"
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
assert file_metadata["processing_status"] == "completed", f"File {file_metadata['file_name']} should show completed status"
# verify embedding counts for files that have chunks
if file_metadata.total_chunks and file_metadata.total_chunks > 0:
assert file_metadata.chunks_embedded == file_metadata.total_chunks, (
f"File {file_metadata.file_name} should have all chunks embedded: {file_metadata.chunks_embedded}/{file_metadata.total_chunks}"
if file_metadata["total_chunks"] and file_metadata["total_chunks"] > 0:
assert file_metadata["chunks_embedded"] == file_metadata["total_chunks"], (
f"File {file_metadata['file_name']} should have all chunks embedded: {file_metadata['chunks_embedded']}/{file_metadata['total_chunks']}"
)
# cleanup
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
def test_pinecone_lifecycle_file_and_source_deletion(disable_turbopuffer, client: LettaSDKClient):
@@ -1343,7 +1377,7 @@ def test_pinecone_lifecycle_file_and_source_deletion(disable_turbopuffer, client
print("Testing Pinecone file and source deletion lifecycle")
# Create source
source = client.sources.create(name="test_lifecycle_source", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_lifecycle_source", embedding="openai/text-embedding-3-small")
# Upload multiple files and wait for processing
file_paths = ["tests/data/test.txt", "tests/data/test.md"]
@@ -1364,7 +1398,7 @@ def test_pinecone_lifecycle_file_and_source_deletion(disable_turbopuffer, client
print(f"Found {len(records_before)} records for file before deletion")
# Delete the file
client.sources.files.delete(source_id=source.id, file_id=file_to_delete.id)
client.folders.files.delete(folder_id=source.id, file_id=file_to_delete.id)
# Allow time for deletion to propagate
time.sleep(2)
@@ -1386,7 +1420,7 @@ def test_pinecone_lifecycle_file_and_source_deletion(disable_turbopuffer, client
print(f"Found {records_before} records for remaining files before source deletion")
# Delete the entire source
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
# Allow time for deletion to propagate
time.sleep(3)
@@ -1413,14 +1447,14 @@ def test_turbopuffer_search_files_tool(disable_pinecone, client: LettaSDKClient)
agent = client.agents.create(
name="test_turbopuffer_agent",
memory_blocks=[
CreateBlock(label="human", value="username: testuser"),
CreateBlockParam(label="human", value="username: testuser"),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
source = client.sources.create(name="test_turbopuffer_source", embedding="openai/text-embedding-3-small")
client.agents.sources.attach(source_id=source.id, agent_id=agent.id)
source = client.folders.create(name="test_turbopuffer_source", embedding="openai/text-embedding-3-small")
client.agents.folders.attach(folder_id=source.id, agent_id=agent.id)
file_path = "tests/data/long_test.txt"
upload_file_and_wait(client, source.id, file_path)
@@ -1445,40 +1479,44 @@ def test_turbopuffer_search_files_tool(disable_pinecone, client: LettaSDKClient)
)
client.agents.delete(agent_id=agent.id)
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
def test_turbopuffer_file_processing_status(disable_pinecone, client: LettaSDKClient):
"""Test that file processing completes successfully with Turbopuffer"""
print("Testing Turbopuffer file processing status")
source = client.sources.create(name="test_tpuf_file_status", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_tpuf_file_status", embedding="openai/text-embedding-3-small")
file_paths = ["tests/data/long_test.txt", "tests/data/test.md"]
uploaded_files = []
for file_path in file_paths:
file_metadata = upload_file_and_wait(client, source.id, file_path)
uploaded_files.append(file_metadata)
assert file_metadata.processing_status == "completed", f"File {file_path} should be completed"
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
assert file_metadata["processing_status"] == "completed", f"File {file_path} should be completed"
files_list = client.sources.files.list(source_id=source.id, limit=100)
files_list = client.folders.files.list(folder_id=source.id, limit=100).items
assert len(files_list) == len(uploaded_files), f"Expected {len(uploaded_files)} files, got {len(files_list)}"
for file_metadata in files_list:
assert file_metadata.processing_status == "completed", f"File {file_metadata.file_name} should show completed status"
if not isinstance(file_metadata, dict):
file_metadata = file_metadata.model_dump()
assert file_metadata["processing_status"] == "completed", f"File {file_metadata['file_name']} should show completed status"
if file_metadata.total_chunks and file_metadata.total_chunks > 0:
assert file_metadata.chunks_embedded == file_metadata.total_chunks, (
f"File {file_metadata.file_name} should have all chunks embedded: {file_metadata.chunks_embedded}/{file_metadata.total_chunks}"
if file_metadata["total_chunks"] and file_metadata["total_chunks"] > 0:
assert file_metadata["chunks_embedded"] == file_metadata["total_chunks"], (
f"File {file_metadata['file_name']} should have all chunks embedded: {file_metadata['chunks_embedded']}/{file_metadata['total_chunks']}"
)
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
def test_turbopuffer_lifecycle_file_and_source_deletion(disable_pinecone, client: LettaSDKClient):
"""Test that file and source deletion removes records from Turbopuffer"""
source = client.sources.create(name="test_tpuf_lifecycle", embedding="openai/text-embedding-3-small")
source = client.folders.create(name="test_tpuf_lifecycle", embedding="openai/text-embedding-3-small")
file_paths = ["tests/data/test.txt", "tests/data/test.md"]
uploaded_files = []
@@ -1495,19 +1533,19 @@ def test_turbopuffer_lifecycle_file_and_source_deletion(disable_pinecone, client
passages_before = asyncio.run(
tpuf_client.query_file_passages(
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_to_delete.id, top_k=100
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_to_delete["id"], top_k=100
)
)
print(f"Found {len(passages_before)} passages for file before deletion")
assert len(passages_before) > 0, "Should have passages before deletion"
client.sources.files.delete(source_id=source.id, file_id=file_to_delete.id)
client.folders.files.delete(folder_id=source.id, file_id=file_to_delete["id"])
time.sleep(2)
passages_after = asyncio.run(
tpuf_client.query_file_passages(
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_to_delete.id, top_k=100
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_to_delete["id"], top_k=100
)
)
print(f"Found {len(passages_after)} passages for file after deletion")
@@ -1518,7 +1556,7 @@ def test_turbopuffer_lifecycle_file_and_source_deletion(disable_pinecone, client
for file_metadata in uploaded_files[1:]:
passages = asyncio.run(
tpuf_client.query_file_passages(
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_metadata.id, top_k=100
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_metadata["id"], top_k=100
)
)
remaining_passages_before.extend(passages)
@@ -1526,7 +1564,7 @@ def test_turbopuffer_lifecycle_file_and_source_deletion(disable_pinecone, client
print(f"Found {len(remaining_passages_before)} passages for remaining files before source deletion")
assert len(remaining_passages_before) > 0, "Should have passages for remaining files"
client.sources.delete(source_id=source.id)
client.folders.delete(folder_id=source.id)
time.sleep(3)
@@ -1535,7 +1573,7 @@ def test_turbopuffer_lifecycle_file_and_source_deletion(disable_pinecone, client
try:
passages = asyncio.run(
tpuf_client.query_file_passages(
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_metadata.id, top_k=100
source_ids=[source.id], organization_id=user.organization_id, actor=user, file_id=file_metadata["id"], top_k=100
)
)
remaining_passages_after.extend(passages)
@@ -1550,8 +1588,8 @@ def test_turbopuffer_lifecycle_file_and_source_deletion(disable_pinecone, client
def test_turbopuffer_multiple_sources(disable_pinecone, client: LettaSDKClient):
"""Test that Turbopuffer correctly isolates passages by source in org-scoped namespace"""
source1 = client.sources.create(name="test_tpuf_source1", embedding="openai/text-embedding-3-small")
source2 = client.sources.create(name="test_tpuf_source2", embedding="openai/text-embedding-3-small")
source1 = client.folders.create(name="test_tpuf_source1", embedding="openai/text-embedding-3-small")
source2 = client.folders.create(name="test_tpuf_source2", embedding="openai/text-embedding-3-small")
file1_metadata = upload_file_and_wait(client, source1.id, "tests/data/test.txt")
file2_metadata = upload_file_and_wait(client, source2.id, "tests/data/test.md")
@@ -1574,15 +1612,15 @@ def test_turbopuffer_multiple_sources(disable_pinecone, client: LettaSDKClient):
assert len(source2_passages) > 0, "Source2 should have passages"
for passage, _, _ in source1_passages:
assert passage.source_id == source1.id, f"Passage should belong to source1, but has source_id={passage.source_id}"
assert passage.file_id == file1_metadata.id, f"Passage should belong to file1, but has file_id={passage.file_id}"
assert passage.source_id == source1.id, f"Passage should belong to source1, but has folder_id={passage.source_id}"
assert passage.file_id == file1_metadata["id"], f"Passage should belong to file1, but has file_id={passage.file_id}"
for passage, _, _ in source2_passages:
assert passage.source_id == source2.id, f"Passage should belong to source2, but has source_id={passage.source_id}"
assert passage.file_id == file2_metadata.id, f"Passage should belong to file2, but has file_id={passage.file_id}"
assert passage.source_id == source2.id, f"Passage should belong to source2, but has folder_id={passage.source_id}"
assert passage.file_id == file2_metadata["id"], f"Passage should belong to file2, but has file_id={passage.file_id}"
# delete source1 and verify source2 is unaffected
client.sources.delete(source_id=source1.id)
client.folders.delete(folder_id=source1.id)
time.sleep(2)
source2_passages_after = asyncio.run(
@@ -1593,7 +1631,7 @@ def test_turbopuffer_multiple_sources(disable_pinecone, client: LettaSDKClient):
f"Source2 should still have all passages after source1 deletion: {len(source2_passages_after)} vs {len(source2_passages)}"
)
client.sources.delete(source_id=source2.id)
client.folders.delete(folder_id=source2.id)
# --- End Turbopuffer Tests ---