feat: Add refresh functionality for files (#4053)
This commit is contained in:
@@ -154,6 +154,7 @@ class DuplicateFileHandling(str, Enum):
|
||||
SKIP = "skip" # skip files with duplicate names
|
||||
ERROR = "error" # error when duplicate names are encountered
|
||||
SUFFIX = "suffix" # add numeric suffix to make names unique (default behavior)
|
||||
REPLACE = "replace" # replace the file with the duplicate name
|
||||
|
||||
|
||||
class SandboxType(str, Enum):
|
||||
|
||||
@@ -270,18 +270,21 @@ async def upload_file_to_folder(
|
||||
)
|
||||
elif duplicate_handling == DuplicateFileHandling.SKIP:
|
||||
# Return existing file metadata with custom header to indicate it was skipped
|
||||
from fastapi import Response
|
||||
|
||||
response = Response(
|
||||
content=existing_file.model_dump_json(), media_type="application/json", headers={"X-Upload-Result": "skipped"}
|
||||
)
|
||||
return response
|
||||
# For SUFFIX, continue to generate unique filename
|
||||
elif duplicate_handling == DuplicateFileHandling.REPLACE:
|
||||
# delete the file
|
||||
deleted_file = await server.file_manager.delete_file(file_id=existing_file.id, actor=actor)
|
||||
unique_filename = original_filename
|
||||
|
||||
# Generate unique filename (adds suffix if needed)
|
||||
unique_filename = await server.file_manager.generate_unique_filename(
|
||||
original_filename=original_filename, source=folder, organization_id=actor.organization_id
|
||||
)
|
||||
if not unique_filename:
|
||||
# For SUFFIX, continue to generate unique filename
|
||||
# Generate unique filename (adds suffix if needed)
|
||||
unique_filename = await server.file_manager.generate_unique_filename(
|
||||
original_filename=original_filename, source=folder, organization_id=actor.organization_id
|
||||
)
|
||||
|
||||
# create file metadata
|
||||
file_metadata = FileMetadata(
|
||||
|
||||
@@ -263,6 +263,7 @@ async def upload_file_to_source(
|
||||
original_filename=original_filename, source_id=source_id, actor=actor
|
||||
)
|
||||
|
||||
unique_filename = None
|
||||
if existing_file:
|
||||
# Duplicate found, handle based on strategy
|
||||
if duplicate_handling == DuplicateFileHandling.ERROR:
|
||||
@@ -271,18 +272,21 @@ async def upload_file_to_source(
|
||||
)
|
||||
elif duplicate_handling == DuplicateFileHandling.SKIP:
|
||||
# Return existing file metadata with custom header to indicate it was skipped
|
||||
from fastapi import Response
|
||||
|
||||
response = Response(
|
||||
content=existing_file.model_dump_json(), media_type="application/json", headers={"X-Upload-Result": "skipped"}
|
||||
)
|
||||
return response
|
||||
# For SUFFIX, continue to generate unique filename
|
||||
elif duplicate_handling == DuplicateFileHandling.REPLACE:
|
||||
# delete the file
|
||||
deleted_file = await server.file_manager.delete_file(file_id=existing_file.id, actor=actor)
|
||||
unique_filename = original_filename
|
||||
|
||||
# Generate unique filename (adds suffix if needed)
|
||||
unique_filename = await server.file_manager.generate_unique_filename(
|
||||
original_filename=original_filename, source=source, organization_id=actor.organization_id
|
||||
)
|
||||
if not unique_filename:
|
||||
# For SUFFIX, continue to generate unique filename
|
||||
# Generate unique filename (adds suffix if needed)
|
||||
unique_filename = await server.file_manager.generate_unique_filename(
|
||||
original_filename=original_filename, source=source, organization_id=actor.organization_id
|
||||
)
|
||||
|
||||
# create file metadata
|
||||
file_metadata = FileMetadata(
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pytest
|
||||
from dotenv import load_dotenv
|
||||
from letta_client import CreateBlock
|
||||
from letta_client import CreateBlock, DuplicateFileHandling
|
||||
from letta_client import Letta as LettaSDKClient
|
||||
from letta_client import LettaRequest
|
||||
from letta_client import MessageCreate as ClientMessageCreate
|
||||
@@ -70,10 +71,15 @@ def client() -> LettaSDKClient:
|
||||
yield client
|
||||
|
||||
|
||||
def upload_file_and_wait(client: LettaSDKClient, source_id: str, file_path: str, max_wait: int = 60):
|
||||
def upload_file_and_wait(
|
||||
client: LettaSDKClient, source_id: str, file_path: str, max_wait: int = 60, duplicate_handling: DuplicateFileHandling = None
|
||||
):
|
||||
"""Helper function to upload a file and wait for processing to complete"""
|
||||
with open(file_path, "rb") as f:
|
||||
file_metadata = client.sources.files.upload(source_id=source_id, file=f)
|
||||
if duplicate_handling:
|
||||
file_metadata = client.sources.files.upload(source_id=source_id, file=f, duplicate_handling=duplicate_handling)
|
||||
else:
|
||||
file_metadata = client.sources.files.upload(source_id=source_id, file=f)
|
||||
|
||||
# Wait for the file to be processed
|
||||
start_time = time.time()
|
||||
@@ -763,6 +769,88 @@ def test_duplicate_file_renaming(disable_pinecone, client: LettaSDKClient):
|
||||
print(f" File {i+1}: original='{file.original_file_name}' → renamed='{file.file_name}'")
|
||||
|
||||
|
||||
def test_duplicate_file_handling_replace(disable_pinecone, client: LettaSDKClient):
|
||||
"""Test that DuplicateFileHandling.REPLACE replaces existing files with same name"""
|
||||
# Create a new source
|
||||
source = client.sources.create(name="test_replace_source", embedding="openai/text-embedding-3-small")
|
||||
|
||||
# Create agent and attach source to test memory blocks
|
||||
agent_state = client.agents.create(
|
||||
name="test_replace_agent",
|
||||
memory_blocks=[
|
||||
CreateBlock(label="human", value="username: sarah"),
|
||||
],
|
||||
model="openai/gpt-4o-mini",
|
||||
embedding="openai/text-embedding-3-small",
|
||||
source_ids=[source.id],
|
||||
)
|
||||
|
||||
# Create a temporary file with original content
|
||||
original_content = "original file content for testing"
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f:
|
||||
f.write(original_content)
|
||||
temp_file_path = f.name
|
||||
temp_filename = os.path.basename(f.name)
|
||||
|
||||
try:
|
||||
# Wait for the file to be processed
|
||||
upload_file_and_wait(client, source.id, temp_file_path)
|
||||
|
||||
# Verify original file was uploaded
|
||||
files = client.sources.files.list(source_id=source.id, limit=10)
|
||||
assert len(files) == 1, f"Expected 1 file, got {len(files)}"
|
||||
original_file = files[0]
|
||||
assert original_file.original_file_name == temp_filename
|
||||
|
||||
# Get agent state and verify original content is in memory blocks
|
||||
agent_state = client.agents.retrieve(agent_id=agent_state.id)
|
||||
file_blocks = agent_state.memory.file_blocks
|
||||
assert len(file_blocks) == 1, f"Expected 1 file block, got {len(file_blocks)}"
|
||||
original_block_content = file_blocks[0].value
|
||||
assert original_content in original_block_content
|
||||
|
||||
# Create replacement content
|
||||
replacement_content = "this is the replacement content that should overwrite the original"
|
||||
with open(temp_file_path, "w") as f:
|
||||
f.write(replacement_content)
|
||||
|
||||
# Upload replacement file with REPLACE duplicate handling
|
||||
from letta.schemas.enums import DuplicateFileHandling # TODO: Temporary pre-client compliation, good to remove
|
||||
|
||||
replacement_file = upload_file_and_wait(client, source.id, temp_file_path, duplicate_handling=DuplicateFileHandling.REPLACE)
|
||||
|
||||
# Verify we still have only 1 file (replacement, not addition)
|
||||
files_after_replace = client.sources.files.list(source_id=source.id, limit=10)
|
||||
assert len(files_after_replace) == 1, f"Expected 1 file after replacement, got {len(files_after_replace)}"
|
||||
|
||||
replaced_file = files_after_replace[0]
|
||||
|
||||
# Verify file metadata shows replacement
|
||||
assert replaced_file.original_file_name == temp_filename, "Original filename should be preserved"
|
||||
assert replaced_file.file_name == temp_filename, "File name should match original"
|
||||
|
||||
# Verify the file ID is different (new file replaced the old one)
|
||||
assert replaced_file.id != original_file.id, "Replacement file should have different ID"
|
||||
|
||||
# Verify agent memory blocks contain replacement content
|
||||
agent_state = client.agents.retrieve(agent_id=agent_state.id)
|
||||
updated_file_blocks = agent_state.memory.file_blocks
|
||||
assert len(updated_file_blocks) == 1, f"Expected 1 file block after replacement, got {len(updated_file_blocks)}"
|
||||
|
||||
replacement_block_content = updated_file_blocks[0].value
|
||||
assert replacement_content in replacement_block_content, f"Expected replacement content in block, got: {replacement_block_content}"
|
||||
assert (
|
||||
original_content not in replacement_block_content
|
||||
), f"Original content should not be present after replacement: {replacement_block_content}"
|
||||
|
||||
print("✓ Successfully tested DuplicateFileHandling.REPLACE functionality")
|
||||
|
||||
finally:
|
||||
# Clean up temporary file
|
||||
if os.path.exists(temp_file_path):
|
||||
os.unlink(temp_file_path)
|
||||
|
||||
|
||||
def test_open_files_schema_descriptions(disable_pinecone, client: LettaSDKClient):
|
||||
"""Test that open_files tool schema contains correct descriptions from docstring"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user