diff --git a/letta/schemas/enums.py b/letta/schemas/enums.py index ec65f52f..899da24b 100644 --- a/letta/schemas/enums.py +++ b/letta/schemas/enums.py @@ -154,6 +154,7 @@ class DuplicateFileHandling(str, Enum): SKIP = "skip" # skip files with duplicate names ERROR = "error" # error when duplicate names are encountered SUFFIX = "suffix" # add numeric suffix to make names unique (default behavior) + REPLACE = "replace" # replace the file with the duplicate name class SandboxType(str, Enum): diff --git a/letta/server/rest_api/routers/v1/folders.py b/letta/server/rest_api/routers/v1/folders.py index 10b555ea..fffdc018 100644 --- a/letta/server/rest_api/routers/v1/folders.py +++ b/letta/server/rest_api/routers/v1/folders.py @@ -270,18 +270,21 @@ async def upload_file_to_folder( ) elif duplicate_handling == DuplicateFileHandling.SKIP: # Return existing file metadata with custom header to indicate it was skipped - from fastapi import Response - response = Response( content=existing_file.model_dump_json(), media_type="application/json", headers={"X-Upload-Result": "skipped"} ) return response - # For SUFFIX, continue to generate unique filename + elif duplicate_handling == DuplicateFileHandling.REPLACE: + # delete the file + deleted_file = await server.file_manager.delete_file(file_id=existing_file.id, actor=actor) + unique_filename = original_filename - # Generate unique filename (adds suffix if needed) - unique_filename = await server.file_manager.generate_unique_filename( - original_filename=original_filename, source=folder, organization_id=actor.organization_id - ) + if not unique_filename: + # For SUFFIX, continue to generate unique filename + # Generate unique filename (adds suffix if needed) + unique_filename = await server.file_manager.generate_unique_filename( + original_filename=original_filename, source=folder, organization_id=actor.organization_id + ) # create file metadata file_metadata = FileMetadata( diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index 992eb064..16d2fe33 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -263,6 +263,7 @@ async def upload_file_to_source( original_filename=original_filename, source_id=source_id, actor=actor ) + unique_filename = None if existing_file: # Duplicate found, handle based on strategy if duplicate_handling == DuplicateFileHandling.ERROR: @@ -271,18 +272,21 @@ async def upload_file_to_source( ) elif duplicate_handling == DuplicateFileHandling.SKIP: # Return existing file metadata with custom header to indicate it was skipped - from fastapi import Response - response = Response( content=existing_file.model_dump_json(), media_type="application/json", headers={"X-Upload-Result": "skipped"} ) return response - # For SUFFIX, continue to generate unique filename + elif duplicate_handling == DuplicateFileHandling.REPLACE: + # delete the file + deleted_file = await server.file_manager.delete_file(file_id=existing_file.id, actor=actor) + unique_filename = original_filename - # Generate unique filename (adds suffix if needed) - unique_filename = await server.file_manager.generate_unique_filename( - original_filename=original_filename, source=source, organization_id=actor.organization_id - ) + if not unique_filename: + # For SUFFIX, continue to generate unique filename + # Generate unique filename (adds suffix if needed) + unique_filename = await server.file_manager.generate_unique_filename( + original_filename=original_filename, source=source, organization_id=actor.organization_id + ) # create file metadata file_metadata = FileMetadata( diff --git a/tests/test_sources.py b/tests/test_sources.py index efe2e73a..9f96c01d 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -1,12 +1,13 @@ import os import re +import tempfile import threading import time from datetime import datetime, timedelta import pytest from dotenv import load_dotenv -from letta_client import CreateBlock +from letta_client import CreateBlock, DuplicateFileHandling from letta_client import Letta as LettaSDKClient from letta_client import LettaRequest from letta_client import MessageCreate as ClientMessageCreate @@ -70,10 +71,15 @@ def client() -> LettaSDKClient: yield client -def upload_file_and_wait(client: LettaSDKClient, source_id: str, file_path: str, max_wait: int = 60): +def upload_file_and_wait( + client: LettaSDKClient, source_id: str, file_path: str, max_wait: int = 60, duplicate_handling: DuplicateFileHandling = None +): """Helper function to upload a file and wait for processing to complete""" with open(file_path, "rb") as f: - file_metadata = client.sources.files.upload(source_id=source_id, file=f) + if duplicate_handling: + file_metadata = client.sources.files.upload(source_id=source_id, file=f, duplicate_handling=duplicate_handling) + else: + file_metadata = client.sources.files.upload(source_id=source_id, file=f) # Wait for the file to be processed start_time = time.time() @@ -763,6 +769,88 @@ def test_duplicate_file_renaming(disable_pinecone, client: LettaSDKClient): print(f" File {i+1}: original='{file.original_file_name}' → renamed='{file.file_name}'") +def test_duplicate_file_handling_replace(disable_pinecone, client: LettaSDKClient): + """Test that DuplicateFileHandling.REPLACE replaces existing files with same name""" + # Create a new source + source = client.sources.create(name="test_replace_source", embedding="openai/text-embedding-3-small") + + # Create agent and attach source to test memory blocks + agent_state = client.agents.create( + name="test_replace_agent", + memory_blocks=[ + CreateBlock(label="human", value="username: sarah"), + ], + model="openai/gpt-4o-mini", + embedding="openai/text-embedding-3-small", + source_ids=[source.id], + ) + + # Create a temporary file with original content + original_content = "original file content for testing" + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + f.write(original_content) + temp_file_path = f.name + temp_filename = os.path.basename(f.name) + + try: + # Wait for the file to be processed + upload_file_and_wait(client, source.id, temp_file_path) + + # Verify original file was uploaded + files = client.sources.files.list(source_id=source.id, limit=10) + assert len(files) == 1, f"Expected 1 file, got {len(files)}" + original_file = files[0] + assert original_file.original_file_name == temp_filename + + # Get agent state and verify original content is in memory blocks + agent_state = client.agents.retrieve(agent_id=agent_state.id) + file_blocks = agent_state.memory.file_blocks + assert len(file_blocks) == 1, f"Expected 1 file block, got {len(file_blocks)}" + original_block_content = file_blocks[0].value + assert original_content in original_block_content + + # Create replacement content + replacement_content = "this is the replacement content that should overwrite the original" + with open(temp_file_path, "w") as f: + f.write(replacement_content) + + # Upload replacement file with REPLACE duplicate handling + from letta.schemas.enums import DuplicateFileHandling # TODO: Temporary pre-client compliation, good to remove + + replacement_file = upload_file_and_wait(client, source.id, temp_file_path, duplicate_handling=DuplicateFileHandling.REPLACE) + + # Verify we still have only 1 file (replacement, not addition) + files_after_replace = client.sources.files.list(source_id=source.id, limit=10) + assert len(files_after_replace) == 1, f"Expected 1 file after replacement, got {len(files_after_replace)}" + + replaced_file = files_after_replace[0] + + # Verify file metadata shows replacement + assert replaced_file.original_file_name == temp_filename, "Original filename should be preserved" + assert replaced_file.file_name == temp_filename, "File name should match original" + + # Verify the file ID is different (new file replaced the old one) + assert replaced_file.id != original_file.id, "Replacement file should have different ID" + + # Verify agent memory blocks contain replacement content + agent_state = client.agents.retrieve(agent_id=agent_state.id) + updated_file_blocks = agent_state.memory.file_blocks + assert len(updated_file_blocks) == 1, f"Expected 1 file block after replacement, got {len(updated_file_blocks)}" + + replacement_block_content = updated_file_blocks[0].value + assert replacement_content in replacement_block_content, f"Expected replacement content in block, got: {replacement_block_content}" + assert ( + original_content not in replacement_block_content + ), f"Original content should not be present after replacement: {replacement_block_content}" + + print("✓ Successfully tested DuplicateFileHandling.REPLACE functionality") + + finally: + # Clean up temporary file + if os.path.exists(temp_file_path): + os.unlink(temp_file_path) + + def test_open_files_schema_descriptions(disable_pinecone, client: LettaSDKClient): """Test that open_files tool schema contains correct descriptions from docstring"""