From 0bdd555f337ffb7a42b5308a730c3e1ebc858d96 Mon Sep 17 00:00:00 2001 From: cthomas Date: Thu, 5 Feb 2026 18:33:17 -0800 Subject: [PATCH] feat: add memfs-py service (#9315) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add memfs-py service * add tf for bucket access and secrets v2 access * feat(memfs): add helm charts, deploy workflow, and bug fixes - Add dev helm chart (helm/dev/memfs-py/) with CSI secrets pattern - Update prod helm chart with CSI secrets and correct service account - Add GitHub Actions deploy workflow - Change port from 8284 to 8285 to avoid conflict with core's dulwich sidecar - Fix chunked transfer encoding issue (strip HTTP_TRANSFER_ENCODING header) - Fix timestamp parsing to handle both ISO and HTTP date formats - Fix get_head_sha to raise FileNotFoundError on 404 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta --------- Co-authored-by: Kian Jones Co-authored-by: Letta --- letta/server/rest_api/routers/v1/git_http.py | 137 +++--- letta/server/server.py | 12 +- letta/services/memory_repo/__init__.py | 9 + .../services/memory_repo/memfs_client_base.py | 425 ++++++++++++++++++ .../services/memory_repo/storage/__init__.py | 2 + letta/services/memory_repo/storage/local.py | 149 ++++++ letta/settings.py | 9 + 7 files changed, 691 insertions(+), 52 deletions(-) create mode 100644 letta/services/memory_repo/memfs_client_base.py create mode 100644 letta/services/memory_repo/storage/local.py diff --git a/letta/server/rest_api/routers/v1/git_http.py b/letta/server/rest_api/routers/v1/git_http.py index 52aa8e19..ab4e5b0b 100644 --- a/letta/server/rest_api/routers/v1/git_http.py +++ b/letta/server/rest_api/routers/v1/git_http.py @@ -394,7 +394,17 @@ def _prune_broken_refs(repo: Repo) -> int: async def _sync_after_push(actor_id: str, agent_id: str) -> None: - """Sync repo back to GCS and PostgreSQL after a successful push.""" + """Sync repo back to GCS and PostgreSQL after a 
successful push. + + When using memfs service: + - GCS sync is handled by memfs (skipped here) + - We still sync blocks to Postgres + + When using local dulwich: + - Upload repo to GCS + - Sync blocks to Postgres + """ + from letta.settings import settings if _server_instance is None: logger.warning("Server instance not set; cannot sync after push") @@ -407,45 +417,51 @@ async def _sync_after_push(actor_id: str, agent_id: str) -> None: return org_id = actor.organization_id - cache_key = f"{org_id}/{agent_id}" + using_memfs = bool(settings.memfs_service_url) - repo_path = _repo_cache.get(cache_key) - if not repo_path: - # Cross-worker fallback: read marker file written by the dulwich process. - try: - with open(_dulwich_repo_path_marker_file(cache_key), "r") as f: - repo_path = f.read().strip() or None - except FileNotFoundError: - repo_path = None + # When using local dulwich, we need to upload to GCS + if not using_memfs: + cache_key = f"{org_id}/{agent_id}" - if not repo_path: - logger.warning("No cached repo for %s after push", cache_key) - return + repo_path = _repo_cache.get(cache_key) + if not repo_path: + # Cross-worker fallback: read marker file written by the dulwich process. 
+ try: + with open(_dulwich_repo_path_marker_file(cache_key), "r") as f: + repo_path = f.read().strip() or None + except FileNotFoundError: + repo_path = None - if not os.path.exists(repo_path): - logger.warning("Repo path %s does not exist after push", repo_path) - return + if not repo_path: + logger.warning("No cached repo for %s after push", cache_key) + return - logger.info("Syncing repo after push: org=%s agent=%s", org_id, agent_id) + if not os.path.exists(repo_path): + logger.warning("Repo path %s does not exist after push", repo_path) + return - storage = _server_instance.memory_repo_manager.git.storage - storage_prefix = f"{org_id}/{agent_id}/repo.git" - git_dir = os.path.join(repo_path, ".git") + logger.info("Syncing repo after push: org=%s agent=%s", org_id, agent_id) - upload_tasks = [] - for root, _dirs, files in os.walk(git_dir): - for filename in files: - local_file = os.path.join(root, filename) - rel_path = os.path.relpath(local_file, git_dir) - storage_path = f"{storage_prefix}/{rel_path}" + storage = _server_instance.memory_repo_manager.git.storage + storage_prefix = f"{org_id}/{agent_id}/repo.git" + git_dir = os.path.join(repo_path, ".git") - with open(local_file, "rb") as f: - content = f.read() + upload_tasks = [] + for root, _dirs, files in os.walk(git_dir): + for filename in files: + local_file = os.path.join(root, filename) + rel_path = os.path.relpath(local_file, git_dir) + storage_path = f"{storage_prefix}/{rel_path}" - upload_tasks.append(storage.upload_bytes(storage_path, content)) + with open(local_file, "rb") as f: + content = f.read() - await asyncio.gather(*upload_tasks) - logger.info("Uploaded %s files to GCS", len(upload_tasks)) + upload_tasks.append(storage.upload_bytes(storage_path, content)) + + await asyncio.gather(*upload_tasks) + logger.info("Uploaded %s files to GCS", len(upload_tasks)) + else: + logger.info("Using memfs service; GCS sync handled by memfs (agent=%s)", agent_id) # Sync blocks to Postgres (if using 
GitEnabledBlockManager). # @@ -518,13 +534,15 @@ async def _sync_after_push(actor_id: str, agent_id: str) -> None: except Exception: logger.exception("Failed detaching removed blocks during post-push sync (agent=%s)", agent_id) - # Cleanup local cache - _repo_cache.pop(cache_key, None) - try: - os.unlink(_dulwich_repo_path_marker_file(cache_key)) - except FileNotFoundError: - pass - shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) + # Cleanup local cache (only relevant when using local dulwich) + if not using_memfs: + cache_key = f"{org_id}/{agent_id}" + _repo_cache.pop(cache_key, None) + try: + os.unlink(_dulwich_repo_path_marker_file(cache_key)) + except FileNotFoundError: + pass + shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True) def _parse_agent_id_from_repo_path(path: str) -> Optional[str]: @@ -566,6 +584,13 @@ def _filter_out_hop_by_hop_headers(headers: Iterable[tuple[str, str]]) -> Dict[s return out +def _get_memfs_service_url() -> Optional[str]: + """Get the memfs service URL from settings, if configured.""" + from letta.settings import settings + + return settings.memfs_service_url + + @router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) # pragma: no cover async def proxy_git_http( path: str, @@ -573,21 +598,33 @@ async def proxy_git_http( server=Depends(get_letta_server), headers: HeaderParams = Depends(get_headers), ): - """Proxy `/v1/git/*` requests to the local dulwich WSGI server.""" + """Proxy `/v1/git/*` requests to the git HTTP backend. - if not _DULWICH_AVAILABLE: - return JSONResponse( - status_code=501, - content={ - "detail": "git smart HTTP is disabled (dulwich not installed)", - }, - ) + If LETTA_MEMFS_SERVICE_URL is set, proxies to the external memfs service. + Otherwise, proxies to the local dulwich WSGI server. + """ - # Ensure server is running (best-effort). We also start it during lifespan. 
- start_dulwich_server() + memfs_url = _get_memfs_service_url() - port = _get_dulwich_port() - url = f"http://127.0.0.1:{port}/{path}" + if memfs_url: + # Proxy to external memfs service + url = f"{memfs_url.rstrip('/')}/git/{path}" + logger.info("proxy_git_http: using memfs service at %s", memfs_url) + else: + # Proxy to local dulwich server + if not _DULWICH_AVAILABLE: + return JSONResponse( + status_code=501, + content={ + "detail": "git smart HTTP is disabled (dulwich not installed)", + }, + ) + + # Ensure server is running (best-effort). We also start it during lifespan. + start_dulwich_server() + + port = _get_dulwich_port() + url = f"http://127.0.0.1:{port}/{path}" req_headers = _filter_out_hop_by_hop_headers(request.headers.items()) # Avoid sending FastAPI host/length; httpx will compute diff --git a/letta/server/server.py b/letta/server/server.py index c3731f96..ebe7f190 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -433,7 +433,8 @@ class SyncServer(object): def _init_memory_repo_manager(self) -> Optional[MemoryRepoManager]: """Initialize the memory repository manager if configured. - Configure the object store via settings (recommended): + If LETTA_MEMFS_SERVICE_URL is set, uses the external memfs service. + Otherwise, configure the object store via settings (recommended): LETTA_OBJECT_STORE_URI="gs://my-bucket/repository?project=my-gcp-project" @@ -441,7 +442,7 @@ class SyncServer(object): - gs:// (or gcs://) -> Google Cloud Storage Returns: - MemoryRepoManager if configured, None otherwise + MemoryRepoManager (or MemfsClient) if configured, None otherwise """ # Keep import local to avoid import/circular issues during server bootstrap. 
@@ -449,6 +450,13 @@ class SyncServer(object): from letta.settings import settings + # Check if memfs service is configured (takes priority over local object store) + if settings.memfs_service_url: + from letta.services.memory_repo import MemfsClient + + logger.info("Memory repo manager using memfs service: %s", settings.memfs_service_url) + return MemfsClient(base_url=settings.memfs_service_url) + uri = settings.object_store_uri if not uri: logger.debug("Memory repo manager not configured (object_store_uri not set)") diff --git a/letta/services/memory_repo/__init__.py b/letta/services/memory_repo/__init__.py index 0d88b161..fb7d5e97 100644 --- a/letta/services/memory_repo/__init__.py +++ b/letta/services/memory_repo/__init__.py @@ -3,9 +3,18 @@ from letta.services.memory_repo.manager import MemoryRepoManager from letta.services.memory_repo.storage.base import StorageBackend from letta.services.memory_repo.storage.gcs import GCSStorageBackend +from letta.services.memory_repo.storage.local import LocalStorageBackend + +# MemfsClient: try cloud implementation first, fall back to local filesystem +try: + from letta.services.memory_repo.memfs_client import MemfsClient +except ImportError: + from letta.services.memory_repo.memfs_client_base import MemfsClient __all__ = [ "MemoryRepoManager", + "MemfsClient", "StorageBackend", "GCSStorageBackend", + "LocalStorageBackend", ] diff --git a/letta/services/memory_repo/memfs_client_base.py b/letta/services/memory_repo/memfs_client_base.py new file mode 100644 index 00000000..ff6341ad --- /dev/null +++ b/letta/services/memory_repo/memfs_client_base.py @@ -0,0 +1,425 @@ +"""Local filesystem-based client for git memory operations. + +This is the open-source implementation that stores git repositories +on the local filesystem (~/.letta/memfs/ by default). This enables +git-backed memory for self-hosted deployments without external dependencies. 
+ +The cloud/enterprise version (memfs_client.py) connects to the memfs +HTTP service instead. +""" + +import hashlib +import json +import os +import uuid +from typing import Dict, List, Optional + +from letta.log import get_logger +from letta.otel.tracing import trace_method +from letta.schemas.block import Block as PydanticBlock +from letta.schemas.memory_repo import MemoryCommit +from letta.schemas.user import User as PydanticUser +from letta.services.memory_repo.git_operations import GitOperations +from letta.services.memory_repo.storage.local import LocalStorageBackend +from letta.utils import enforce_types + +logger = get_logger(__name__) + +# File paths within the memory repository +BLOCKS_DIR = "blocks" +METADATA_FILE = "metadata/blocks.json" + +# Default local storage path +DEFAULT_LOCAL_PATH = os.path.expanduser("~/.letta/memfs") + + +class MemfsClient: + """Local filesystem-based client for git memory operations. + + Provides the same interface as the cloud MemfsClient but stores + repositories on the local filesystem using LocalStorageBackend. + This enables git-backed memory for self-hosted OSS deployments. + """ + + def __init__(self, base_url: str = None, local_path: str = None, timeout: float = 120.0): + """Initialize the local memfs client. 
+ + Args: + base_url: Ignored (for interface compatibility with cloud client) + local_path: Path for local storage (default: ~/.letta/memfs) + timeout: Ignored (for interface compatibility) + """ + self.local_path = local_path or DEFAULT_LOCAL_PATH + self.storage = LocalStorageBackend(base_path=self.local_path) + self.git = GitOperations(storage=self.storage, redis_client=None) + + logger.info(f"MemfsClient initialized with local storage at {self.local_path}") + + async def close(self): + """Close the client (no-op for local storage).""" + pass + + # ========================================================================= + # Repository Operations + # ========================================================================= + + @enforce_types + @trace_method + async def create_repo_async( + self, + agent_id: str, + actor: PydanticUser, + initial_blocks: List[PydanticBlock] = None, + ) -> str: + """Create a new repository for an agent with optional initial blocks. + + Args: + agent_id: Agent ID + actor: User performing the operation + initial_blocks: Optional list of blocks to commit as initial state + + Returns: + The HEAD SHA of the created repository + """ + initial_blocks = initial_blocks or [] + org_id = actor.organization_id + + # Build initial files from blocks + initial_files = {} + metadata = {"blocks": {}} + + for block in initial_blocks: + file_path = f"{BLOCKS_DIR}/{block.label}.md" + initial_files[file_path] = block.value or "" + metadata["blocks"][block.label] = { + "description": block.description, + "limit": block.limit, + } + + if metadata["blocks"]: + initial_files[METADATA_FILE] = json.dumps(metadata, indent=2) + + return await self.git.create_repo( + agent_id=agent_id, + org_id=org_id, + initial_files=initial_files, + author_name=f"User {actor.id}", + author_email=f"{actor.id}@letta.ai", + ) + + # ========================================================================= + # Block Operations (Read) + # 
========================================================================= + + @enforce_types + @trace_method + async def get_blocks_async( + self, + agent_id: str, + actor: PydanticUser, + ref: str = "HEAD", + ) -> List[PydanticBlock]: + """Get all memory blocks at a specific ref. + + Args: + agent_id: Agent ID + actor: User performing the operation + ref: Git ref (commit SHA, branch name, or 'HEAD') + + Returns: + List of memory blocks + """ + org_id = actor.organization_id + + try: + files = await self.git.get_files(agent_id, org_id, ref) + except FileNotFoundError: + return [] + + # Parse metadata + metadata: dict = {} + if METADATA_FILE in files: + try: + metadata_json = json.loads(files[METADATA_FILE]) + if isinstance(metadata_json, dict): + metadata = metadata_json.get("blocks", {}) or {} + except json.JSONDecodeError: + logger.warning(f"Failed to parse metadata for agent {agent_id}") + + # Convert block files to PydanticBlock + blocks = [] + for file_path, content in files.items(): + if file_path.startswith(f"{BLOCKS_DIR}/") and file_path.endswith(".md"): + label = file_path[len(f"{BLOCKS_DIR}/") : -3] + block_meta = metadata.get(label, {}) + + # Generate deterministic UUID-style ID from agent_id + label + synthetic_uuid = uuid.UUID(hashlib.md5(f"{agent_id}:{label}".encode()).hexdigest()) + blocks.append( + PydanticBlock( + id=f"block-{synthetic_uuid}", + label=label, + value=content, + description=block_meta.get("description"), + limit=block_meta.get("limit", 5000), + metadata=block_meta.get("metadata", {}), + ) + ) + + return blocks + + @enforce_types + @trace_method + async def get_block_async( + self, + agent_id: str, + label: str, + actor: PydanticUser, + ref: str = "HEAD", + ) -> Optional[PydanticBlock]: + """Get a specific memory block. 
+ + Args: + agent_id: Agent ID + label: Block label + actor: User performing the operation + ref: Git ref + + Returns: + Memory block or None + """ + blocks = await self.get_blocks_async(agent_id, actor, ref) + for block in blocks: + if block.label == label: + return block + return None + + # ========================================================================= + # Block Operations (Write) + # ========================================================================= + + async def _ensure_repo_exists(self, agent_id: str, actor: PydanticUser) -> str: + """Ensure the repository exists, creating if needed.""" + try: + return await self.git.get_head_sha(agent_id, actor.organization_id) + except FileNotFoundError: + return await self.git.create_repo( + agent_id=agent_id, + org_id=actor.organization_id, + initial_files={}, + author_name=f"User {actor.id}", + author_email=f"{actor.id}@letta.ai", + ) + + @enforce_types + @trace_method + async def update_block_async( + self, + agent_id: str, + label: str, + value: str, + actor: PydanticUser, + message: Optional[str] = None, + ) -> MemoryCommit: + """Update a memory block. 
+ + Args: + agent_id: Agent ID + label: Block label + value: New block value + actor: User performing the operation + message: Optional commit message + + Returns: + Commit details + """ + from letta.schemas.memory_repo import FileChange + + await self._ensure_repo_exists(agent_id, actor) + + file_path = f"{BLOCKS_DIR}/{label}.md" + commit_message = message or f"Update {label}" + + return await self.git.commit( + agent_id=agent_id, + org_id=actor.organization_id, + changes=[FileChange(path=file_path, content=value, change_type="modify")], + message=commit_message, + author_name=f"User {actor.id}", + author_email=f"{actor.id}@letta.ai", + ) + + @enforce_types + @trace_method + async def create_block_async( + self, + agent_id: str, + block: PydanticBlock, + actor: PydanticUser, + message: Optional[str] = None, + ) -> MemoryCommit: + """Create a new memory block. + + Args: + agent_id: Agent ID + block: Block to create + actor: User performing the operation + message: Optional commit message + + Returns: + Commit details + """ + from letta.schemas.memory_repo import FileChange + + await self._ensure_repo_exists(agent_id, actor) + org_id = actor.organization_id + + # Get current metadata + try: + files = await self.git.get_files(agent_id, org_id) + except FileNotFoundError: + files = {} + + metadata = {"blocks": {}} + if METADATA_FILE in files: + try: + raw_metadata = json.loads(files[METADATA_FILE]) + if isinstance(raw_metadata, dict) and isinstance(raw_metadata.get("blocks"), dict): + metadata = raw_metadata + except json.JSONDecodeError: + pass + + # Add new block metadata + metadata["blocks"][block.label] = { + "description": block.description, + "limit": block.limit, + "metadata": block.metadata or {}, + } + + # Prepare changes + changes = [ + FileChange( + path=f"{BLOCKS_DIR}/{block.label}.md", + content=block.value, + change_type="add", + ), + FileChange( + path=METADATA_FILE, + content=json.dumps(metadata, indent=2), + change_type="modify", + ), + ] + + 
commit_message = message or f"Create block {block.label}" + + return await self.git.commit( + agent_id=agent_id, + org_id=org_id, + changes=changes, + message=commit_message, + author_name=f"User {actor.id}", + author_email=f"{actor.id}@letta.ai", + ) + + @enforce_types + @trace_method + async def delete_block_async( + self, + agent_id: str, + label: str, + actor: PydanticUser, + message: Optional[str] = None, + ) -> MemoryCommit: + """Delete a memory block. + + Args: + agent_id: Agent ID + label: Block label to delete + actor: User performing the operation + message: Optional commit message + + Returns: + Commit details + """ + from letta.schemas.memory_repo import FileChange + + await self._ensure_repo_exists(agent_id, actor) + org_id = actor.organization_id + + # Get current metadata + try: + files = await self.git.get_files(agent_id, org_id) + except FileNotFoundError: + files = {} + + metadata = {"blocks": {}} + if METADATA_FILE in files: + try: + raw_metadata = json.loads(files[METADATA_FILE]) + if isinstance(raw_metadata, dict) and isinstance(raw_metadata.get("blocks"), dict): + metadata = raw_metadata + except json.JSONDecodeError: + pass + + # Remove block from metadata + if label in metadata["blocks"]: + del metadata["blocks"][label] + + # Prepare changes + changes = [ + FileChange( + path=f"{BLOCKS_DIR}/{label}.md", + content=None, + change_type="delete", + ), + FileChange( + path=METADATA_FILE, + content=json.dumps(metadata, indent=2), + change_type="modify", + ), + ] + + commit_message = message or f"Delete block {label}" + + return await self.git.commit( + agent_id=agent_id, + org_id=org_id, + changes=changes, + message=commit_message, + author_name=f"User {actor.id}", + author_email=f"{actor.id}@letta.ai", + ) + + # ========================================================================= + # History Operations + # ========================================================================= + + @enforce_types + @trace_method + async def 
get_history_async( + self, + agent_id: str, + actor: PydanticUser, + path: Optional[str] = None, + limit: int = 50, + ) -> List[MemoryCommit]: + """Get commit history. + + Args: + agent_id: Agent ID + actor: User performing the operation + path: Optional file path to filter by + limit: Maximum commits to return + + Returns: + List of commits, newest first + """ + try: + return await self.git.get_history( + agent_id=agent_id, + org_id=actor.organization_id, + path=path, + limit=limit, + ) + except FileNotFoundError: + return [] diff --git a/letta/services/memory_repo/storage/__init__.py b/letta/services/memory_repo/storage/__init__.py index 2387b3d3..08db7a97 100644 --- a/letta/services/memory_repo/storage/__init__.py +++ b/letta/services/memory_repo/storage/__init__.py @@ -2,8 +2,10 @@ from letta.services.memory_repo.storage.base import StorageBackend from letta.services.memory_repo.storage.gcs import GCSStorageBackend +from letta.services.memory_repo.storage.local import LocalStorageBackend __all__ = [ "StorageBackend", "GCSStorageBackend", + "LocalStorageBackend", ] diff --git a/letta/services/memory_repo/storage/local.py b/letta/services/memory_repo/storage/local.py new file mode 100644 index 00000000..2bc6c631 --- /dev/null +++ b/letta/services/memory_repo/storage/local.py @@ -0,0 +1,149 @@ +"""Local filesystem storage backend for memory repositories. + +This backend stores git repository data on the local filesystem, +making git-backed memory available without external dependencies. +Ideal for self-hosted OSS deployments. +""" + +import os +import shutil +from pathlib import Path +from typing import List, Optional + +from letta.log import get_logger +from letta.services.memory_repo.storage.base import StorageBackend + +logger = get_logger(__name__) + + +class LocalStorageBackend(StorageBackend): + """Local filesystem storage backend for memory repositories. + + Stores repository data under a configurable base path, defaulting to + ~/.letta/memfs/. 
This enables git-backed memory for self-hosted + deployments without requiring cloud storage. + + Directory structure: + {base_path}/{prefix}/{org_id}/{agent_id}/repo.git/ + """ + + def __init__( + self, + base_path: Optional[str] = None, + prefix: str = "repository", + ): + """Initialize local storage backend. + + Args: + base_path: Base directory for storage (default: ~/.letta/memfs) + prefix: Prefix for all paths in this backend (default: "repository") + """ + if base_path is None: + base_path = os.path.expanduser("~/.letta/memfs") + + self._base_path = Path(base_path) + self._prefix = prefix.rstrip("/") + self._bucket_name = "local" # For interface compatibility + + # Ensure base directory exists + self._base_path.mkdir(parents=True, exist_ok=True) + logger.debug(f"LocalStorageBackend initialized at {self._base_path}") + + def _full_path(self, path: str) -> Path: + """Get full filesystem path including prefix.""" + path = path.lstrip("/") + if self._prefix: + return self._base_path / self._prefix / path + return self._base_path / path + + @property + def bucket_name(self) -> str: + """Return the bucket name (for interface compatibility).""" + return self._bucket_name + + async def upload_bytes(self, path: str, content: bytes) -> None: + """Write bytes to a local file.""" + full_path = self._full_path(path) + full_path.parent.mkdir(parents=True, exist_ok=True) + + with open(full_path, "wb") as f: + f.write(content) + + logger.debug(f"Wrote {len(content)} bytes to {full_path}") + + async def download_bytes(self, path: str) -> bytes: + """Read bytes from a local file.""" + full_path = self._full_path(path) + + if not full_path.exists(): + raise FileNotFoundError(f"{full_path} not found") + + with open(full_path, "rb") as f: + return f.read() + + async def exists(self, path: str) -> bool: + """Check if a path exists.""" + full_path = self._full_path(path) + return full_path.exists() + + async def delete(self, path: str) -> None: + """Delete a file.""" + full_path = 
self._full_path(path) + + if not full_path.exists(): + raise FileNotFoundError(f"{full_path} not found") + + full_path.unlink() + logger.debug(f"Deleted {full_path}") + + async def list_files(self, prefix: str) -> List[str]: + """List all files with the given prefix.""" + full_prefix = self._full_path(prefix) + + if not full_prefix.exists(): + return [] + + result = [] + if full_prefix.is_file(): + # Prefix is a file, return it + rel_path = str(full_prefix.relative_to(self._base_path / self._prefix)) + result.append(rel_path) + else: + # Walk directory + for file_path in full_prefix.rglob("*"): + if file_path.is_file(): + rel_path = str(file_path.relative_to(self._base_path / self._prefix)) + result.append(rel_path) + + return result + + async def delete_prefix(self, prefix: str) -> int: + """Delete all files with the given prefix.""" + full_prefix = self._full_path(prefix) + + if not full_prefix.exists(): + return 0 + + # Count files before deletion + count = sum(1 for _ in full_prefix.rglob("*") if _.is_file()) + + if full_prefix.is_file(): + full_prefix.unlink() + count = 1 + else: + shutil.rmtree(full_prefix, ignore_errors=True) + + logger.debug(f"Deleted {count} files with prefix {prefix}") + return count + + async def copy(self, source_path: str, dest_path: str) -> None: + """Copy a file.""" + source_full = self._full_path(source_path) + dest_full = self._full_path(dest_path) + + if not source_full.exists(): + raise FileNotFoundError(f"{source_full} not found") + + dest_full.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(source_full, dest_full) + logger.debug(f"Copied {source_full} to {dest_full}") diff --git a/letta/settings.py b/letta/settings.py index 05b87bce..308d9cbc 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -324,6 +324,15 @@ class Settings(BaseSettings): validation_alias=AliasChoices("LETTA_OBJECT_STORE_PROJECT"), description="Optional project override for object store clients (e.g., GCS project).", ) + + # memfs service URL - 
when set, git memory operations are proxied to the memfs service
+    # instead of running locally. This enables separating git/GCS operations into a dedicated service.
+    memfs_service_url: str | None = Field(
+        default=None,
+        validation_alias=AliasChoices("LETTA_MEMFS_SERVICE_URL"),
+        description="URL of the memfs service (e.g., http://memfs:8285). When set, git memory operations use this service.",
+    )
+
     # multi agent settings
     multi_agent_send_message_max_retries: int = 3
     multi_agent_send_message_timeout: int = 20 * 60