feat: add memfs-py service (#9315)

* feat: add memfs-py service

* add tf for bucket access and secrets v2 access

* feat(memfs): add helm charts, deploy workflow, and bug fixes

- Add dev helm chart (helm/dev/memfs-py/) with CSI secrets pattern
- Update prod helm chart with CSI secrets and correct service account
- Add GitHub Actions deploy workflow
- Change port from 8284 to 8285 to avoid conflict with core's dulwich sidecar
- Fix chunked transfer encoding issue (strip HTTP_TRANSFER_ENCODING header)
- Fix timestamp parsing to handle both ISO and HTTP date formats
- Fix get_head_sha to raise FileNotFoundError on 404

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

---------

Co-authored-by: Kian Jones <kian@letta.com>
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
cthomas
2026-02-05 18:33:17 -08:00
committed by Caren Thomas
parent 21e880907f
commit 0bdd555f33
7 changed files with 691 additions and 52 deletions

View File

@@ -394,7 +394,17 @@ def _prune_broken_refs(repo: Repo) -> int:
async def _sync_after_push(actor_id: str, agent_id: str) -> None:
"""Sync repo back to GCS and PostgreSQL after a successful push."""
"""Sync repo back to GCS and PostgreSQL after a successful push.
When using memfs service:
- GCS sync is handled by memfs (skipped here)
- We still sync blocks to Postgres
When using local dulwich:
- Upload repo to GCS
- Sync blocks to Postgres
"""
from letta.settings import settings
if _server_instance is None:
logger.warning("Server instance not set; cannot sync after push")
@@ -407,45 +417,51 @@ async def _sync_after_push(actor_id: str, agent_id: str) -> None:
return
org_id = actor.organization_id
cache_key = f"{org_id}/{agent_id}"
using_memfs = bool(settings.memfs_service_url)
repo_path = _repo_cache.get(cache_key)
if not repo_path:
# Cross-worker fallback: read marker file written by the dulwich process.
try:
with open(_dulwich_repo_path_marker_file(cache_key), "r") as f:
repo_path = f.read().strip() or None
except FileNotFoundError:
repo_path = None
# When using local dulwich, we need to upload to GCS
if not using_memfs:
cache_key = f"{org_id}/{agent_id}"
if not repo_path:
logger.warning("No cached repo for %s after push", cache_key)
return
repo_path = _repo_cache.get(cache_key)
if not repo_path:
# Cross-worker fallback: read marker file written by the dulwich process.
try:
with open(_dulwich_repo_path_marker_file(cache_key), "r") as f:
repo_path = f.read().strip() or None
except FileNotFoundError:
repo_path = None
if not os.path.exists(repo_path):
logger.warning("Repo path %s does not exist after push", repo_path)
return
if not repo_path:
logger.warning("No cached repo for %s after push", cache_key)
return
logger.info("Syncing repo after push: org=%s agent=%s", org_id, agent_id)
if not os.path.exists(repo_path):
logger.warning("Repo path %s does not exist after push", repo_path)
return
storage = _server_instance.memory_repo_manager.git.storage
storage_prefix = f"{org_id}/{agent_id}/repo.git"
git_dir = os.path.join(repo_path, ".git")
logger.info("Syncing repo after push: org=%s agent=%s", org_id, agent_id)
upload_tasks = []
for root, _dirs, files in os.walk(git_dir):
for filename in files:
local_file = os.path.join(root, filename)
rel_path = os.path.relpath(local_file, git_dir)
storage_path = f"{storage_prefix}/{rel_path}"
storage = _server_instance.memory_repo_manager.git.storage
storage_prefix = f"{org_id}/{agent_id}/repo.git"
git_dir = os.path.join(repo_path, ".git")
with open(local_file, "rb") as f:
content = f.read()
upload_tasks = []
for root, _dirs, files in os.walk(git_dir):
for filename in files:
local_file = os.path.join(root, filename)
rel_path = os.path.relpath(local_file, git_dir)
storage_path = f"{storage_prefix}/{rel_path}"
upload_tasks.append(storage.upload_bytes(storage_path, content))
with open(local_file, "rb") as f:
content = f.read()
await asyncio.gather(*upload_tasks)
logger.info("Uploaded %s files to GCS", len(upload_tasks))
upload_tasks.append(storage.upload_bytes(storage_path, content))
await asyncio.gather(*upload_tasks)
logger.info("Uploaded %s files to GCS", len(upload_tasks))
else:
logger.info("Using memfs service; GCS sync handled by memfs (agent=%s)", agent_id)
# Sync blocks to Postgres (if using GitEnabledBlockManager).
#
@@ -518,13 +534,15 @@ async def _sync_after_push(actor_id: str, agent_id: str) -> None:
except Exception:
logger.exception("Failed detaching removed blocks during post-push sync (agent=%s)", agent_id)
# Cleanup local cache
_repo_cache.pop(cache_key, None)
try:
os.unlink(_dulwich_repo_path_marker_file(cache_key))
except FileNotFoundError:
pass
shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True)
# Cleanup local cache (only relevant when using local dulwich)
if not using_memfs:
cache_key = f"{org_id}/{agent_id}"
_repo_cache.pop(cache_key, None)
try:
os.unlink(_dulwich_repo_path_marker_file(cache_key))
except FileNotFoundError:
pass
shutil.rmtree(os.path.dirname(repo_path), ignore_errors=True)
def _parse_agent_id_from_repo_path(path: str) -> Optional[str]:
@@ -566,6 +584,13 @@ def _filter_out_hop_by_hop_headers(headers: Iterable[tuple[str, str]]) -> Dict[s
return out
def _get_memfs_service_url() -> Optional[str]:
"""Get the memfs service URL from settings, if configured."""
from letta.settings import settings
return settings.memfs_service_url
@router.api_route("/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) # pragma: no cover
async def proxy_git_http(
path: str,
@@ -573,21 +598,33 @@ async def proxy_git_http(
server=Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""Proxy `/v1/git/*` requests to the local dulwich WSGI server."""
"""Proxy `/v1/git/*` requests to the git HTTP backend.
if not _DULWICH_AVAILABLE:
return JSONResponse(
status_code=501,
content={
"detail": "git smart HTTP is disabled (dulwich not installed)",
},
)
If LETTA_MEMFS_SERVICE_URL is set, proxies to the external memfs service.
Otherwise, proxies to the local dulwich WSGI server.
"""
# Ensure server is running (best-effort). We also start it during lifespan.
start_dulwich_server()
memfs_url = _get_memfs_service_url()
port = _get_dulwich_port()
url = f"http://127.0.0.1:{port}/{path}"
if memfs_url:
# Proxy to external memfs service
url = f"{memfs_url.rstrip('/')}/git/{path}"
logger.info("proxy_git_http: using memfs service at %s", memfs_url)
else:
# Proxy to local dulwich server
if not _DULWICH_AVAILABLE:
return JSONResponse(
status_code=501,
content={
"detail": "git smart HTTP is disabled (dulwich not installed)",
},
)
# Ensure server is running (best-effort). We also start it during lifespan.
start_dulwich_server()
port = _get_dulwich_port()
url = f"http://127.0.0.1:{port}/{path}"
req_headers = _filter_out_hop_by_hop_headers(request.headers.items())
# Avoid sending FastAPI host/length; httpx will compute

View File

@@ -433,7 +433,8 @@ class SyncServer(object):
def _init_memory_repo_manager(self) -> Optional[MemoryRepoManager]:
"""Initialize the memory repository manager if configured.
Configure the object store via settings (recommended):
If LETTA_MEMFS_SERVICE_URL is set, uses the external memfs service.
Otherwise, configure the object store via settings (recommended):
LETTA_OBJECT_STORE_URI="gs://my-bucket/repository?project=my-gcp-project"
@@ -441,7 +442,7 @@ class SyncServer(object):
- gs:// (or gcs://) -> Google Cloud Storage
Returns:
MemoryRepoManager if configured, None otherwise
MemoryRepoManager (or MemfsClient) if configured, None otherwise
"""
# Keep import local to avoid import/circular issues during server bootstrap.
@@ -449,6 +450,13 @@ class SyncServer(object):
from letta.settings import settings
# Check if memfs service is configured (takes priority over local object store)
if settings.memfs_service_url:
from letta.services.memory_repo import MemfsClient
logger.info("Memory repo manager using memfs service: %s", settings.memfs_service_url)
return MemfsClient(base_url=settings.memfs_service_url)
uri = settings.object_store_uri
if not uri:
logger.debug("Memory repo manager not configured (object_store_uri not set)")

View File

@@ -3,9 +3,18 @@
from letta.services.memory_repo.manager import MemoryRepoManager
from letta.services.memory_repo.storage.base import StorageBackend
from letta.services.memory_repo.storage.gcs import GCSStorageBackend
from letta.services.memory_repo.storage.local import LocalStorageBackend
# MemfsClient: try cloud implementation first, fall back to local filesystem
try:
from letta.services.memory_repo.memfs_client import MemfsClient
except ImportError:
from letta.services.memory_repo.memfs_client_base import MemfsClient
__all__ = [
"MemoryRepoManager",
"MemfsClient",
"StorageBackend",
"GCSStorageBackend",
"LocalStorageBackend",
]

View File

@@ -0,0 +1,425 @@
"""Local filesystem-based client for git memory operations.
This is the open-source implementation that stores git repositories
on the local filesystem (~/.letta/memfs/ by default). This enables
git-backed memory for self-hosted deployments without external dependencies.
The cloud/enterprise version (memfs_client.py) connects to the memfs
HTTP service instead.
"""
import hashlib
import json
import os
import uuid
from typing import Dict, List, Optional
from letta.log import get_logger
from letta.otel.tracing import trace_method
from letta.schemas.block import Block as PydanticBlock
from letta.schemas.memory_repo import MemoryCommit
from letta.schemas.user import User as PydanticUser
from letta.services.memory_repo.git_operations import GitOperations
from letta.services.memory_repo.storage.local import LocalStorageBackend
from letta.utils import enforce_types
logger = get_logger(__name__)
# File paths within the memory repository
BLOCKS_DIR = "blocks"
METADATA_FILE = "metadata/blocks.json"
# Default local storage path
DEFAULT_LOCAL_PATH = os.path.expanduser("~/.letta/memfs")
class MemfsClient:
    """Local filesystem-based client for git memory operations.

    Provides the same interface as the cloud MemfsClient but stores
    repositories on the local filesystem using LocalStorageBackend.
    This enables git-backed memory for self-hosted OSS deployments.
    """

    def __init__(self, base_url: Optional[str] = None, local_path: Optional[str] = None, timeout: float = 120.0):
        """Initialize the local memfs client.

        Args:
            base_url: Ignored (for interface compatibility with cloud client)
            local_path: Path for local storage (default: ~/.letta/memfs)
            timeout: Ignored (for interface compatibility)
        """
        self.local_path = local_path or DEFAULT_LOCAL_PATH
        self.storage = LocalStorageBackend(base_path=self.local_path)
        # No redis client: local single-process usage needs no cross-process locking.
        self.git = GitOperations(storage=self.storage, redis_client=None)
        logger.info(f"MemfsClient initialized with local storage at {self.local_path}")

    async def close(self):
        """Close the client (no-op for local storage)."""
        pass

    # =========================================================================
    # Internal helpers
    # =========================================================================

    @staticmethod
    def _author(actor: PydanticUser) -> "tuple[str, str]":
        """Return the (name, email) pair used to attribute git commits to *actor*."""
        return f"User {actor.id}", f"{actor.id}@letta.ai"

    @staticmethod
    def _read_metadata(files: Dict[str, str], agent_id: str) -> dict:
        """Parse METADATA_FILE out of *files*, tolerating absence or corruption.

        Always returns a dict of shape {"blocks": {label: {...}}} so callers
        can index ["blocks"] unconditionally.
        """
        metadata: dict = {"blocks": {}}
        raw = files.get(METADATA_FILE)
        if raw is not None:
            try:
                parsed = json.loads(raw)
                # Require the expected shape before trusting the contents.
                if isinstance(parsed, dict) and isinstance(parsed.get("blocks"), dict):
                    metadata = parsed
            except json.JSONDecodeError:
                logger.warning(f"Failed to parse metadata for agent {agent_id}")
        return metadata

    async def _ensure_repo_exists(self, agent_id: str, actor: PydanticUser) -> str:
        """Ensure the repository exists, creating an empty one if needed.

        Returns:
            The current HEAD SHA (existing or freshly created).
        """
        author_name, author_email = self._author(actor)
        try:
            return await self.git.get_head_sha(agent_id, actor.organization_id)
        except FileNotFoundError:
            return await self.git.create_repo(
                agent_id=agent_id,
                org_id=actor.organization_id,
                initial_files={},
                author_name=author_name,
                author_email=author_email,
            )

    # =========================================================================
    # Repository Operations
    # =========================================================================

    @enforce_types
    @trace_method
    async def create_repo_async(
        self,
        agent_id: str,
        actor: PydanticUser,
        initial_blocks: Optional[List[PydanticBlock]] = None,
    ) -> str:
        """Create a new repository for an agent with optional initial blocks.

        Args:
            agent_id: Agent ID
            actor: User performing the operation
            initial_blocks: Optional list of blocks to commit as initial state

        Returns:
            The HEAD SHA of the created repository
        """
        initial_blocks = initial_blocks or []
        org_id = actor.organization_id

        # Build the initial file tree: one markdown file per block, plus a
        # metadata file recording per-block description/limit.
        initial_files: Dict[str, str] = {}
        metadata: dict = {"blocks": {}}
        for block in initial_blocks:
            initial_files[f"{BLOCKS_DIR}/{block.label}.md"] = block.value or ""
            metadata["blocks"][block.label] = {
                "description": block.description,
                "limit": block.limit,
            }
        if metadata["blocks"]:
            initial_files[METADATA_FILE] = json.dumps(metadata, indent=2)

        author_name, author_email = self._author(actor)
        return await self.git.create_repo(
            agent_id=agent_id,
            org_id=org_id,
            initial_files=initial_files,
            author_name=author_name,
            author_email=author_email,
        )

    # =========================================================================
    # Block Operations (Read)
    # =========================================================================

    @enforce_types
    @trace_method
    async def get_blocks_async(
        self,
        agent_id: str,
        actor: PydanticUser,
        ref: str = "HEAD",
    ) -> List[PydanticBlock]:
        """Get all memory blocks at a specific ref.

        Args:
            agent_id: Agent ID
            actor: User performing the operation
            ref: Git ref (commit SHA, branch name, or 'HEAD')

        Returns:
            List of memory blocks (empty if the repo does not exist)
        """
        org_id = actor.organization_id
        try:
            files = await self.git.get_files(agent_id, org_id, ref)
        except FileNotFoundError:
            return []

        block_meta_by_label = self._read_metadata(files, agent_id)["blocks"]

        blocks: List[PydanticBlock] = []
        block_prefix = f"{BLOCKS_DIR}/"
        for file_path, content in files.items():
            if not (file_path.startswith(block_prefix) and file_path.endswith(".md")):
                continue
            label = file_path[len(block_prefix) : -3]  # strip "blocks/" prefix and ".md" suffix
            block_meta = block_meta_by_label.get(label, {})
            # Deterministic synthetic ID: the same agent+label always maps to
            # the same UUID, so block IDs are stable across reads.
            synthetic_uuid = uuid.UUID(hashlib.md5(f"{agent_id}:{label}".encode()).hexdigest())
            blocks.append(
                PydanticBlock(
                    id=f"block-{synthetic_uuid}",
                    label=label,
                    value=content,
                    description=block_meta.get("description"),
                    limit=block_meta.get("limit", 5000),
                    metadata=block_meta.get("metadata", {}),
                )
            )
        return blocks

    @enforce_types
    @trace_method
    async def get_block_async(
        self,
        agent_id: str,
        label: str,
        actor: PydanticUser,
        ref: str = "HEAD",
    ) -> Optional[PydanticBlock]:
        """Get a specific memory block.

        Args:
            agent_id: Agent ID
            label: Block label
            actor: User performing the operation
            ref: Git ref

        Returns:
            Memory block or None if no block has that label
        """
        blocks = await self.get_blocks_async(agent_id, actor, ref)
        for block in blocks:
            if block.label == label:
                return block
        return None

    # =========================================================================
    # Block Operations (Write)
    # =========================================================================

    @enforce_types
    @trace_method
    async def update_block_async(
        self,
        agent_id: str,
        label: str,
        value: str,
        actor: PydanticUser,
        message: Optional[str] = None,
    ) -> MemoryCommit:
        """Update a memory block.

        Args:
            agent_id: Agent ID
            label: Block label
            value: New block value
            actor: User performing the operation
            message: Optional commit message

        Returns:
            Commit details
        """
        # Local import to avoid import cycles during server bootstrap.
        from letta.schemas.memory_repo import FileChange

        await self._ensure_repo_exists(agent_id, actor)
        author_name, author_email = self._author(actor)
        return await self.git.commit(
            agent_id=agent_id,
            org_id=actor.organization_id,
            changes=[FileChange(path=f"{BLOCKS_DIR}/{label}.md", content=value, change_type="modify")],
            message=message or f"Update {label}",
            author_name=author_name,
            author_email=author_email,
        )

    @enforce_types
    @trace_method
    async def create_block_async(
        self,
        agent_id: str,
        block: PydanticBlock,
        actor: PydanticUser,
        message: Optional[str] = None,
    ) -> MemoryCommit:
        """Create a new memory block.

        Args:
            agent_id: Agent ID
            block: Block to create
            actor: User performing the operation
            message: Optional commit message

        Returns:
            Commit details
        """
        # Local import to avoid import cycles during server bootstrap.
        from letta.schemas.memory_repo import FileChange

        await self._ensure_repo_exists(agent_id, actor)
        org_id = actor.organization_id

        # Merge the new block into the existing metadata (if readable).
        try:
            files = await self.git.get_files(agent_id, org_id)
        except FileNotFoundError:
            files = {}
        metadata = self._read_metadata(files, agent_id)
        metadata["blocks"][block.label] = {
            "description": block.description,
            "limit": block.limit,
            "metadata": block.metadata or {},
        }

        changes = [
            FileChange(
                path=f"{BLOCKS_DIR}/{block.label}.md",
                content=block.value,
                change_type="add",
            ),
            FileChange(
                path=METADATA_FILE,
                content=json.dumps(metadata, indent=2),
                change_type="modify",
            ),
        ]
        author_name, author_email = self._author(actor)
        return await self.git.commit(
            agent_id=agent_id,
            org_id=org_id,
            changes=changes,
            message=message or f"Create block {block.label}",
            author_name=author_name,
            author_email=author_email,
        )

    @enforce_types
    @trace_method
    async def delete_block_async(
        self,
        agent_id: str,
        label: str,
        actor: PydanticUser,
        message: Optional[str] = None,
    ) -> MemoryCommit:
        """Delete a memory block.

        Args:
            agent_id: Agent ID
            label: Block label to delete
            actor: User performing the operation
            message: Optional commit message

        Returns:
            Commit details
        """
        # Local import to avoid import cycles during server bootstrap.
        from letta.schemas.memory_repo import FileChange

        await self._ensure_repo_exists(agent_id, actor)
        org_id = actor.organization_id

        # Drop the block from metadata (no-op if it was never recorded).
        try:
            files = await self.git.get_files(agent_id, org_id)
        except FileNotFoundError:
            files = {}
        metadata = self._read_metadata(files, agent_id)
        metadata["blocks"].pop(label, None)

        changes = [
            FileChange(
                path=f"{BLOCKS_DIR}/{label}.md",
                content=None,
                change_type="delete",
            ),
            FileChange(
                path=METADATA_FILE,
                content=json.dumps(metadata, indent=2),
                change_type="modify",
            ),
        ]
        author_name, author_email = self._author(actor)
        return await self.git.commit(
            agent_id=agent_id,
            org_id=org_id,
            changes=changes,
            message=message or f"Delete block {label}",
            author_name=author_name,
            author_email=author_email,
        )

    # =========================================================================
    # History Operations
    # =========================================================================

    @enforce_types
    @trace_method
    async def get_history_async(
        self,
        agent_id: str,
        actor: PydanticUser,
        path: Optional[str] = None,
        limit: int = 50,
    ) -> List[MemoryCommit]:
        """Get commit history.

        Args:
            agent_id: Agent ID
            actor: User performing the operation
            path: Optional file path to filter by
            limit: Maximum commits to return

        Returns:
            List of commits, newest first (empty if the repo does not exist)
        """
        try:
            return await self.git.get_history(
                agent_id=agent_id,
                org_id=actor.organization_id,
                path=path,
                limit=limit,
            )
        except FileNotFoundError:
            return []

View File

@@ -2,8 +2,10 @@
from letta.services.memory_repo.storage.base import StorageBackend
from letta.services.memory_repo.storage.gcs import GCSStorageBackend
from letta.services.memory_repo.storage.local import LocalStorageBackend
__all__ = [
"StorageBackend",
"GCSStorageBackend",
"LocalStorageBackend",
]

View File

@@ -0,0 +1,149 @@
"""Local filesystem storage backend for memory repositories.
This backend stores git repository data on the local filesystem,
making git-backed memory available without external dependencies.
Ideal for self-hosted OSS deployments.
"""
import os
import shutil
from pathlib import Path
from typing import List, Optional
from letta.log import get_logger
from letta.services.memory_repo.storage.base import StorageBackend
logger = get_logger(__name__)
class LocalStorageBackend(StorageBackend):
    """Local filesystem storage backend for memory repositories.

    Stores repository data under a configurable base path, defaulting to
    ~/.letta/memfs/. This enables git-backed memory for self-hosted
    deployments without requiring cloud storage.

    Directory structure:
        {base_path}/{prefix}/{org_id}/{agent_id}/repo.git/
    """

    def __init__(
        self,
        base_path: Optional[str] = None,
        prefix: str = "repository",
    ):
        """Initialize local storage backend.

        Args:
            base_path: Base directory for storage (default: ~/.letta/memfs)
            prefix: Prefix for all paths in this backend (default: "repository")
        """
        if base_path is None:
            base_path = os.path.expanduser("~/.letta/memfs")
        self._base_path = Path(base_path)
        self._prefix = prefix.rstrip("/")
        # Fixed placeholder; kept only for interface compatibility with
        # bucket-backed storage backends.
        self._bucket_name = "local"
        # Create the base directory up front so later writes can assume it.
        self._base_path.mkdir(parents=True, exist_ok=True)
        logger.debug(f"LocalStorageBackend initialized at {self._base_path}")

    @property
    def _root(self) -> Path:
        """Directory all stored paths are resolved against (base path + optional prefix)."""
        return self._base_path / self._prefix if self._prefix else self._base_path

    def _full_path(self, path: str) -> Path:
        """Map a storage path to its absolute filesystem location under the root."""
        return self._root / path.lstrip("/")

    @property
    def bucket_name(self) -> str:
        """Return the bucket name (for interface compatibility)."""
        return self._bucket_name

    async def upload_bytes(self, path: str, content: bytes) -> None:
        """Write bytes to a local file, creating parent directories as needed."""
        full_path = self._full_path(path)
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_bytes(content)
        logger.debug(f"Wrote {len(content)} bytes to {full_path}")

    async def download_bytes(self, path: str) -> bytes:
        """Read bytes from a local file.

        Raises:
            FileNotFoundError: If the path does not exist.
        """
        full_path = self._full_path(path)
        if not full_path.exists():
            raise FileNotFoundError(f"{full_path} not found")
        return full_path.read_bytes()

    async def exists(self, path: str) -> bool:
        """Check if a path exists."""
        return self._full_path(path).exists()

    async def delete(self, path: str) -> None:
        """Delete a single file.

        Raises:
            FileNotFoundError: If the path does not exist.
        """
        full_path = self._full_path(path)
        if not full_path.exists():
            raise FileNotFoundError(f"{full_path} not found")
        full_path.unlink()
        logger.debug(f"Deleted {full_path}")

    async def list_files(self, prefix: str) -> List[str]:
        """List all files with the given prefix, as paths relative to the backend root."""
        full_prefix = self._full_path(prefix)
        if not full_prefix.exists():
            return []
        root = self._root
        if full_prefix.is_file():
            # The prefix names a file directly; return just that file.
            return [str(full_prefix.relative_to(root))]
        return [str(p.relative_to(root)) for p in full_prefix.rglob("*") if p.is_file()]

    async def delete_prefix(self, prefix: str) -> int:
        """Delete all files with the given prefix.

        Returns:
            The number of files removed (0 if the prefix does not exist).
        """
        full_prefix = self._full_path(prefix)
        if not full_prefix.exists():
            return 0
        # Branch on kind *before* counting: rglob on a plain file is fragile,
        # and counting would be wasted work when the prefix is a single file.
        if full_prefix.is_file():
            full_prefix.unlink()
            count = 1
        else:
            count = sum(1 for p in full_prefix.rglob("*") if p.is_file())
            shutil.rmtree(full_prefix, ignore_errors=True)
        logger.debug(f"Deleted {count} files with prefix {prefix}")
        return count

    async def copy(self, source_path: str, dest_path: str) -> None:
        """Copy a file (metadata-preserving), creating destination directories as needed.

        Raises:
            FileNotFoundError: If the source does not exist.
        """
        source_full = self._full_path(source_path)
        dest_full = self._full_path(dest_path)
        if not source_full.exists():
            raise FileNotFoundError(f"{source_full} not found")
        dest_full.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(source_full, dest_full)
        logger.debug(f"Copied {source_full} to {dest_full}")

View File

@@ -324,6 +324,15 @@ class Settings(BaseSettings):
validation_alias=AliasChoices("LETTA_OBJECT_STORE_PROJECT"),
description="Optional project override for object store clients (e.g., GCS project).",
)
# memfs service URL - when set, git memory operations are proxied to the memfs service
# instead of running locally. This enables separating git/GCS operations into a dedicated service.
memfs_service_url: str | None = Field(
default=None,
validation_alias=AliasChoices("LETTA_MEMFS_SERVICE_URL"),
description="URL of the memfs service (e.g., http://memfs:8284). When set, git memory operations use this service.",
)
# multi agent settings
multi_agent_send_message_max_retries: int = 3
multi_agent_send_message_timeout: int = 20 * 60