feat: Add delete file from source endpoint (#1893)
Co-authored-by: Matt Zhou <mattzhou@Matts-MacBook-Pro.local>
This commit is contained in:
@@ -83,7 +83,7 @@ client = create_client()
|
||||
source = client.create_source(name="example_source")
|
||||
|
||||
# Add file data into a source
|
||||
client.load_file_into_source(filename=filename, source_id=source.id)
|
||||
client.load_file_to_source(filename=filename, source_id=source.id)
|
||||
```
|
||||
|
||||
### Loading with custom connectors
|
||||
|
||||
@@ -345,7 +345,7 @@ Load data into a source
|
||||
* **connector** (`DataConnector`) – Data connector
|
||||
* **source_name** (`str`) – Name of the source
|
||||
|
||||
#### load_file_into_source(filename: str, source_id: str, blocking=True) → Job
|
||||
#### load_file_to_source(filename: str, source_id: str, blocking=True) → Job
|
||||
|
||||
Load a file into a source
|
||||
|
||||
@@ -820,7 +820,7 @@ Load data into a source
|
||||
* **connector** (`DataConnector`) – Data connector
|
||||
* **source_name** (`str`) – Name of the source
|
||||
|
||||
#### load_file_into_source(filename: str, source_id: str, blocking=True)
|
||||
#### load_file_to_source(filename: str, source_id: str, blocking=True)
|
||||
|
||||
Load {filename} and insert into source
|
||||
|
||||
@@ -1243,7 +1243,7 @@ List available tools
|
||||
* **Returns:**
|
||||
*tools (List[Tool])* – List of tools
|
||||
|
||||
#### load_file_into_source(filename: str, source_id: str, blocking=True)
|
||||
#### load_file_to_source(filename: str, source_id: str, blocking=True)
|
||||
|
||||
Load {filename} and insert into source
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"job = client.load_file_into_source(filename=filename, source_id=letta_paper.id)\n",
|
||||
"job = client.load_file_to_source(filename=filename, source_id=letta_paper.id)\n",
|
||||
"job"
|
||||
]
|
||||
},
|
||||
|
||||
@@ -206,7 +206,10 @@ class AbstractClient(object):
|
||||
def load_data(self, connector: DataConnector, source_name: str):
|
||||
raise NotImplementedError
|
||||
|
||||
def load_file_into_source(self, filename: str, source_id: str, blocking=True) -> Job:
|
||||
def load_file_to_source(self, filename: str, source_id: str, blocking=True) -> Job:
|
||||
raise NotImplementedError
|
||||
|
||||
def delete_file_from_source(self, source_id: str, file_id: str) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def create_source(self, name: str) -> Source:
|
||||
@@ -1038,7 +1041,7 @@ class RESTClient(AbstractClient):
|
||||
def load_data(self, connector: DataConnector, source_name: str):
|
||||
raise NotImplementedError
|
||||
|
||||
def load_file_into_source(self, filename: str, source_id: str, blocking=True):
|
||||
def load_file_to_source(self, filename: str, source_id: str, blocking=True):
|
||||
"""
|
||||
Load a file into a source
|
||||
|
||||
@@ -1069,6 +1072,11 @@ class RESTClient(AbstractClient):
|
||||
time.sleep(1)
|
||||
return job
|
||||
|
||||
def delete_file_from_source(self, source_id: str, file_id: str) -> None:
|
||||
response = requests.delete(f"{self.base_url}/{self.api_prefix}/sources/{source_id}/{file_id}", headers=self.headers)
|
||||
if response.status_code not in [200, 204]:
|
||||
raise ValueError(f"Failed to delete tool: {response.text}")
|
||||
|
||||
def create_source(self, name: str) -> Source:
|
||||
"""
|
||||
Create a source
|
||||
@@ -2175,7 +2183,7 @@ class LocalClient(AbstractClient):
|
||||
"""
|
||||
self.server.load_data(user_id=self.user_id, connector=connector, source_name=source_name)
|
||||
|
||||
def load_file_into_source(self, filename: str, source_id: str, blocking=True):
|
||||
def load_file_to_source(self, filename: str, source_id: str, blocking=True):
|
||||
"""
|
||||
Load a file into a source
|
||||
|
||||
@@ -2194,6 +2202,9 @@ class LocalClient(AbstractClient):
|
||||
self.server.load_file_to_source(source_id=source_id, file_path=filename, job_id=job.id)
|
||||
return job
|
||||
|
||||
def delete_file_from_source(self, source_id: str, file_id: str):
|
||||
self.server.delete_file_from_source(source_id, file_id, user_id=self.user_id)
|
||||
|
||||
def get_job(self, job_id: str):
|
||||
return self.server.get_job(job_id=job_id)
|
||||
|
||||
|
||||
@@ -631,6 +631,21 @@ class MetadataStore:
|
||||
session.query(ToolModel).filter(ToolModel.id == tool_id).delete()
|
||||
session.commit()
|
||||
|
||||
@enforce_types
|
||||
def delete_file_from_source(self, source_id: str, file_id: str, user_id: Optional[str]):
|
||||
with self.session_maker() as session:
|
||||
file_metadata = (
|
||||
session.query(FileMetadataModel)
|
||||
.filter(FileMetadataModel.source_id == source_id, FileMetadataModel.id == file_id, FileMetadataModel.user_id == user_id)
|
||||
.first()
|
||||
)
|
||||
|
||||
if file_metadata:
|
||||
session.delete(file_metadata)
|
||||
session.commit()
|
||||
|
||||
return file_metadata
|
||||
|
||||
@enforce_types
|
||||
def delete_block(self, block_id: str):
|
||||
with self.session_maker() as session:
|
||||
|
||||
@@ -2,7 +2,15 @@ import os
|
||||
import tempfile
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, Depends, Header, Query, UploadFile
|
||||
from fastapi import (
|
||||
APIRouter,
|
||||
BackgroundTasks,
|
||||
Depends,
|
||||
Header,
|
||||
HTTPException,
|
||||
Query,
|
||||
UploadFile,
|
||||
)
|
||||
|
||||
from letta.schemas.file import FileMetadata
|
||||
from letta.schemas.job import Job
|
||||
@@ -199,6 +207,25 @@ def list_files_from_source(
|
||||
return server.list_files_from_source(source_id=source_id, limit=limit, cursor=cursor)
|
||||
|
||||
|
||||
# it's redundant to include /delete in the URL path. The HTTP verb DELETE already implies that action.
|
||||
# it's still good practice to return a status indicating the success or failure of the deletion
|
||||
@router.delete("/{source_id}/{file_id}", status_code=204, operation_id="delete_file_from_source")
|
||||
def delete_file_from_source(
|
||||
source_id: str,
|
||||
file_id: str,
|
||||
server: "SyncServer" = Depends(get_letta_server),
|
||||
user_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
|
||||
):
|
||||
"""
|
||||
Delete a data source.
|
||||
"""
|
||||
actor = server.get_user_or_default(user_id=user_id)
|
||||
|
||||
deleted_file = server.delete_file_from_source(source_id=source_id, file_id=file_id, user_id=actor.id)
|
||||
if deleted_file is None:
|
||||
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.")
|
||||
|
||||
|
||||
def load_file_to_source_async(server: SyncServer, source_id: str, job_id: str, file: UploadFile, bytes: bytes):
|
||||
# write the file to a temporary directory (deleted after the context manager exits)
|
||||
with tempfile.TemporaryDirectory() as tmpdirname:
|
||||
|
||||
@@ -1620,6 +1620,9 @@ class SyncServer(Server):
|
||||
|
||||
return job
|
||||
|
||||
def delete_file_from_source(self, source_id: str, file_id: str, user_id: Optional[str]) -> Optional[FileMetadata]:
|
||||
return self.ms.delete_file_from_source(source_id=source_id, file_id=file_id, user_id=user_id)
|
||||
|
||||
def load_data(
|
||||
self,
|
||||
user_id: str,
|
||||
|
||||
@@ -9,7 +9,7 @@ from letta.schemas.source import Source
|
||||
|
||||
def upload_file_using_client(client: Union[LocalClient, RESTClient], source: Source, filename: str) -> Job:
|
||||
# load a file into a source (non-blocking job)
|
||||
upload_job = client.load_file_into_source(filename=filename, source_id=source.id, blocking=False)
|
||||
upload_job = client.load_file_to_source(filename=filename, source_id=source.id, blocking=False)
|
||||
print("Upload job", upload_job, upload_job.status, upload_job.metadata_)
|
||||
|
||||
# view active jobs
|
||||
|
||||
@@ -335,6 +335,35 @@ def test_list_files_pagination(client: Union[LocalClient, RESTClient], agent: Ag
|
||||
assert len(files) == 0 # Should be empty
|
||||
|
||||
|
||||
def test_delete_file_from_source(client: Union[LocalClient, RESTClient], agent: AgentState):
|
||||
# clear sources
|
||||
for source in client.list_sources():
|
||||
client.delete_source(source.id)
|
||||
|
||||
# clear jobs
|
||||
for job in client.list_jobs():
|
||||
client.delete_job(job.id)
|
||||
|
||||
# create a source
|
||||
source = client.create_source(name="test_source")
|
||||
|
||||
# load files into sources
|
||||
file_a = "tests/data/test.txt"
|
||||
upload_file_using_client(client, source, file_a)
|
||||
|
||||
# Get the first file
|
||||
files_a = client.list_files_from_source(source.id, limit=1)
|
||||
assert len(files_a) == 1
|
||||
assert files_a[0].source_id == source.id
|
||||
|
||||
# Delete the file
|
||||
client.delete_file_from_source(source.id, files_a[0].id)
|
||||
|
||||
# Check that no files are attached to the source
|
||||
empty_files = client.list_files_from_source(source.id, limit=1)
|
||||
assert len(empty_files) == 0
|
||||
|
||||
|
||||
def test_load_file(client: Union[LocalClient, RESTClient], agent: AgentState):
|
||||
# _reset_config()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user