fix(core): preserve git-memory formatting and enforce lock conflicts (#9451)
* test(core): strengthen git-memory system prompt stability integration coverage Switch git-memory HTTP integration tests to OpenAI model handles and add assertions that system prompt content remains stable after normal turns and direct block value updates until explicit recompilation or reset. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): preserve git-memory formatting and enforce lock conflicts Preserve existing markdown frontmatter formatting on block updates while still ensuring required metadata fields exist, and make post-push git sync propagate memory-repo lock conflicts as 409 responses. Also enable slash-containing core-memory block labels in route params and add regression coverage. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(memfs): fail closed on memory repo lock contention Make memfs git commits fail closed when the per-agent Redis lock cannot be acquired, return 409 MEMORY_REPO_BUSY from the memfs files write API, and map that 409 back to core MemoryRepoBusyError so API callers receive consistent busy conflicts. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * chore(core): minimize git-memory fix scope to memfs lock and frontmatter paths 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * chore: drop unrelated changes and keep memfs-focused scope Revert branch-only changes that are not required for the memfs lock contention and frontmatter-preservation fix so the PR contains only issue-relevant files. 
👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(memfs): lock push sync path and improve nested sync diagnostics Serialize memfs push-to-GCS sync with the same per-agent Redis lock key used by API commits, and add targeted post-push nested-block diagnostics plus a focused nested-label sync regression test for _sync_after_push. 👾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> --------- Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
committed by
Caren Thomas
parent
d7793a4474
commit
05073ba837
@@ -21,8 +21,6 @@ import orjson
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.exceptions import RequestValidationError
|
||||
from fastapi.responses import JSONResponse, ORJSONResponse
|
||||
|
||||
from letta.helpers.json_helpers import sanitize_unicode_surrogates
|
||||
from marshmallow import ValidationError
|
||||
from sqlalchemy.exc import DBAPIError, IntegrityError, OperationalError
|
||||
from starlette.middleware.cors import CORSMiddleware
|
||||
@@ -59,9 +57,11 @@ from letta.errors import (
|
||||
LLMProviderOverloaded,
|
||||
LLMRateLimitError,
|
||||
LLMTimeoutError,
|
||||
MemoryRepoBusyError,
|
||||
NoActiveRunsToCancelError,
|
||||
PendingApprovalError,
|
||||
)
|
||||
from letta.helpers.json_helpers import sanitize_unicode_surrogates
|
||||
from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pinecone, upsert_pinecone_indices
|
||||
from letta.jobs.scheduler import start_scheduler_with_leader_election
|
||||
from letta.log import get_logger
|
||||
@@ -574,6 +574,7 @@ def create_application() -> "FastAPI":
|
||||
app.add_exception_handler(IntegrityError, _error_handler_409)
|
||||
app.add_exception_handler(ConcurrentUpdateError, _error_handler_409)
|
||||
app.add_exception_handler(ConversationBusyError, _error_handler_409)
|
||||
app.add_exception_handler(MemoryRepoBusyError, _error_handler_409)
|
||||
app.add_exception_handler(PendingApprovalError, _error_handler_409)
|
||||
app.add_exception_handler(NoActiveRunsToCancelError, _error_handler_409)
|
||||
|
||||
|
||||
@@ -496,6 +496,17 @@ async def _sync_after_push(actor_id: str, agent_id: str) -> None:
|
||||
expected_labels = set()
|
||||
from letta.services.memory_repo.block_markdown import parse_block_markdown
|
||||
|
||||
md_file_paths = sorted([file_path for file_path in files if file_path.endswith(".md")])
|
||||
nested_md_file_paths = [file_path for file_path in md_file_paths if "/" in file_path[:-3]]
|
||||
logger.info(
|
||||
"Post-push sync file scan: agent=%s total_files=%d md_files=%d nested_md_files=%d sample_md_paths=%s",
|
||||
agent_id,
|
||||
len(files),
|
||||
len(md_file_paths),
|
||||
len(nested_md_file_paths),
|
||||
md_file_paths[:10],
|
||||
)
|
||||
|
||||
synced = 0
|
||||
for file_path, content in files.items():
|
||||
if not file_path.endswith(".md"):
|
||||
@@ -521,7 +532,13 @@ async def _sync_after_push(actor_id: str, agent_id: str) -> None:
|
||||
synced += 1
|
||||
logger.info("Synced block %s to PostgreSQL", label)
|
||||
except Exception:
|
||||
logger.exception("Failed to sync block %s to PostgreSQL (agent=%s)", label, agent_id)
|
||||
logger.exception(
|
||||
"Failed to sync block %s to PostgreSQL (agent=%s) [path=%s nested=%s]",
|
||||
label,
|
||||
agent_id,
|
||||
file_path,
|
||||
"/" in label,
|
||||
)
|
||||
|
||||
if synced == 0:
|
||||
logger.warning("No *.md files found in repo HEAD during post-push sync (agent=%s)", agent_id)
|
||||
|
||||
@@ -34,8 +34,8 @@ def serialize_block(
|
||||
) -> str:
|
||||
"""Serialize a block to Markdown with optional YAML frontmatter.
|
||||
|
||||
Only non-default fields are included in the frontmatter.
|
||||
If all fields are at their defaults, no frontmatter is emitted.
|
||||
This is used for initial file creation. For updates to existing files,
|
||||
prefer `merge_frontmatter_with_body` to preserve user formatting.
|
||||
"""
|
||||
# description and limit are always included in frontmatter.
|
||||
# read_only and metadata are only included when non-default.
|
||||
@@ -54,6 +54,102 @@ def serialize_block(
|
||||
return f"---\n{yaml_str}\n---\n{value}"
|
||||
|
||||
|
||||
def _extract_frontmatter(content: str) -> tuple[Optional[str], str]:
|
||||
"""Return (frontmatter_yaml, body).
|
||||
|
||||
If no valid opening/closing frontmatter delimiters are found, returns
|
||||
(None, original_content).
|
||||
"""
|
||||
if not content.startswith("---\n"):
|
||||
return None, content
|
||||
|
||||
end_idx = content.find("\n---\n", 4)
|
||||
if end_idx == -1:
|
||||
return None, content
|
||||
|
||||
yaml_str = content[4:end_idx]
|
||||
body = content[end_idx + 5 :]
|
||||
return yaml_str, body
|
||||
|
||||
|
||||
def merge_frontmatter_with_body(
    existing_content: str,
    *,
    value: str,
    description: Optional[str],
    limit: Optional[int],
    read_only: bool,
    metadata: Optional[dict],
) -> str:
    """Rewrite a block file's body while keeping its frontmatter formatting.

    When the existing content carries YAML frontmatter whose semantic values
    already match the desired fields, the original YAML text is spliced back
    in verbatim so the author's formatting survives the update. When any
    field differs or a required field is missing, a normalized frontmatter is
    emitted instead, with the body preserved exactly as provided. Content
    without frontmatter is serialized from scratch via `serialize_block`.
    """
    fm_text, _old_body = _extract_frontmatter(existing_content)

    # No frontmatter to preserve -- fall back to fresh serialization.
    if fm_text is None:
        return serialize_block(
            value=value,
            description=description,
            limit=limit,
            read_only=read_only,
            metadata=metadata,
        )

    # Parse defensively: malformed or non-mapping YAML degrades to an empty
    # mapping, which forces a normalized rewrite further down.
    try:
        fm = yaml.safe_load(fm_text) or {}
    except yaml.YAMLError:
        fm = {}
    if not isinstance(fm, dict):
        fm = {}

    # Resolve target values, substituting schema defaults where the caller
    # passed None.
    want_limit = _get_field_default("limit") if limit is None else limit
    want_metadata = _get_field_default("metadata") if metadata is None else metadata
    default_read_only = _get_field_default("read_only")
    default_metadata = _get_field_default("metadata")

    # Tracks whether the frontmatter changed semantically at all.
    dirty = False

    # description and limit are mandatory keys: add them when missing and
    # overwrite them when their value differs.
    for key, wanted in (("description", description), ("limit", want_limit)):
        if key not in fm or fm.get(key) != wanted:
            fm[key] = wanted
            dirty = True

    # read_only is kept in frontmatter only while it differs from the
    # schema default; otherwise any stale key is dropped.
    if read_only != default_read_only:
        if fm.get("read_only") != read_only:
            fm["read_only"] = read_only
            dirty = True
    elif "read_only" in fm:
        del fm["read_only"]
        dirty = True

    # metadata is kept only when truthy and non-default; otherwise any
    # stale key is dropped.
    if want_metadata and want_metadata != default_metadata:
        if fm.get("metadata") != want_metadata:
            fm["metadata"] = want_metadata
            dirty = True
    elif "metadata" in fm:
        del fm["metadata"]
        dirty = True

    # Nothing semantically changed: keep the author's YAML byte-for-byte.
    if not dirty:
        return f"---\n{fm_text}\n---\n{value}"

    rendered = yaml.dump(fm, default_flow_style=False, sort_keys=False, allow_unicode=True).rstrip("\n")
    return f"---\n{rendered}\n---\n{value}"
|
||||
|
||||
|
||||
def parse_block_markdown(content: str) -> Dict[str, Any]:
|
||||
"""Parse a Markdown file into block fields.
|
||||
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
import letta.server.rest_api.routers.v1.git_http as git_http_router
|
||||
from letta.log_context import get_log_context
|
||||
from letta.server.rest_api.middleware import LoggingMiddleware
|
||||
|
||||
@@ -35,6 +38,64 @@ def client(app):
|
||||
|
||||
|
||||
class TestLogContextMiddleware:
|
||||
@pytest.mark.asyncio
async def test_sync_after_push_syncs_nested_block_labels_to_postgres(self, monkeypatch):
    """Regression test: nested labels (e.g., system/human) are synced from git files."""

    # Every kwargs dict passed to the block-manager sync hook is recorded
    # here so the labels can be asserted afterwards.
    recorded_syncs = []

    class FakeActor:
        id = "user-123"
        organization_id = "org-123"

    class FakeGit:
        # Serves two nested-label markdown files from the repo HEAD.
        async def get_files(self, agent_id, org_id, ref):
            assert ref == "HEAD"
            return {
                "system/human.md": "---\ndescription: human\nlimit: 20000\n---\nname: sarah",
                "system/persona.md": "---\ndescription: persona\nlimit: 20000\n---\nbe helpful",
            }

    class FakeMemoryRepoManager:
        git = FakeGit()

    class FakeBlockManager:
        async def _sync_block_to_postgres(self, **kwargs):
            recorded_syncs.append(kwargs)

    class FakeAgentManager:
        async def list_agent_blocks_async(self, **kwargs):
            return []

    class FakeUserManager:
        async def get_actor_by_id_async(self, actor_id):
            return FakeActor()

    class FakeGitEnabledBlockManager(FakeBlockManager):
        pass

    class FakeServer:
        user_manager = FakeUserManager()
        memory_repo_manager = FakeMemoryRepoManager()
        block_manager = FakeBlockManager()
        agent_manager = FakeAgentManager()

    # The server must expose the git-enabled manager subclass so the sync
    # path recognizes it.
    stub_server = FakeServer()
    stub_server.block_manager = FakeGitEnabledBlockManager()
    monkeypatch.setattr(git_http_router, "_server_instance", stub_server)

    from letta.settings import settings as core_settings

    monkeypatch.setattr(core_settings, "memfs_service_url", "http://memfs.test")

    with patch("letta.services.block_manager_git.GitEnabledBlockManager", FakeGitEnabledBlockManager):
        await git_http_router._sync_after_push(actor_id="user-123", agent_id="agent-123")

    # Both slash-containing labels must have reached the Postgres sync hook.
    observed_labels = {call["label"] for call in recorded_syncs}
    assert "system/human" in observed_labels
    assert "system/persona" in observed_labels
|
||||
|
||||
def test_extracts_actor_id_from_headers(self, client):
    """A request carrying the actor id in the user_id header is accepted."""
    request_headers = {"user_id": "user-abc123"}
    result = client.get(
        "/v1/agents/agent-123e4567-e89b-42d3-8456-426614174000",
        headers=request_headers,
    )
    assert result.status_code == 200
|
||||
|
||||
Reference in New Issue
Block a user