From 7b5d03a714f93e65484a8ca5cb53da847f728bee Mon Sep 17 00:00:00 2001 From: cthomas Date: Thu, 4 Dec 2025 12:15:51 -0800 Subject: [PATCH] feat: use Letta XML memory format in Anthropic proxy (#6514) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update memory injection to use Letta XML format with memory_blocks tags - Extract memory formatting into format_memory_blocks() helper function 🐙 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta --- letta/server/rest_api/routers/v1/anthropic.py | 146 ++++++++---------- 1 file changed, 62 insertions(+), 84 deletions(-) diff --git a/letta/server/rest_api/routers/v1/anthropic.py b/letta/server/rest_api/routers/v1/anthropic.py index 9cfc7a3f..bd323b78 100644 --- a/letta/server/rest_api/routers/v1/anthropic.py +++ b/letta/server/rest_api/routers/v1/anthropic.py @@ -1,12 +1,5 @@ -""" -Anthropic API proxy router. - -This router proxies requests to the Anthropic API, allowing Claude Code CLI -to use Letta as an intermediary by setting anthropic_base_url in settings.json. -""" - import asyncio -import os +import json import httpx from fastapi import APIRouter, Depends, Request @@ -25,19 +18,11 @@ logger = get_logger(__name__) router = APIRouter(prefix="/anthropic", tags=["anthropic"]) -# Anthropic API base URL ANTHROPIC_API_BASE = "https://api.anthropic.com" def extract_user_messages(body: bytes) -> list[str]: - """ - Extract user messages from the request body. - - Returns a list of user message content strings. - """ try: - import json - request_data = json.loads(body) messages = request_data.get("messages", []) @@ -45,27 +30,20 @@ def extract_user_messages(body: bytes) -> list[str]: for msg in messages: if msg.get("role") == "user": content = msg.get("content", "") - # Content can be a string or a list of content blocks if isinstance(content, str): user_messages.append(content) elif isinstance(content, list): - # Extract text from content blocks for block in content: if isinstance(block, dict) and block.get("type") == "text": user_messages.append(block.get("text", "")) return user_messages except Exception as e: - logger.warning(f"Failed to extract user messages: {e}") + logger.warning(f"[Anthropic Proxy] Failed to extract user messages: {e}") return [] def extract_assistant_message(response_data: dict) -> str: - """ - Extract assistant message text from Anthropic API response. - - Returns the concatenated text content from the assistant's response. - """ try: content_blocks = response_data.get("content", []) text_parts = [] @@ -76,28 +54,54 @@ def extract_assistant_message(response_data: dict) -> str: return "\n".join(text_parts) except Exception as e: - logger.warning(f"Failed to extract assistant message: {e}") + logger.warning(f"[Anthropic Proxy] Failed to extract assistant message: {e}") return "" +def format_memory_blocks(blocks) -> str: + blocks_with_content = [block for block in blocks if block.value] + + if not blocks_with_content: + return "" + + memory_context = "\nThe following memory blocks are currently engaged in your core memory unit:\n\n" + + for idx, block in enumerate(blocks_with_content): + label = block.label or "block" + value = block.value or "" + desc = block.description or "" + chars_current = len(value) + limit = block.limit if block.limit is not None else 0 + + memory_context += f"<{label}>\n" + memory_context += "\n" + memory_context += f"{desc}\n" + memory_context += "\n" + memory_context += "\n" + memory_context += f"- chars_current={chars_current}\n" + memory_context += f"- chars_limit={limit}\n" + memory_context += "\n" + memory_context += "\n" + memory_context += f"{value}\n" + memory_context += "\n" + memory_context += f"\n" + + if idx != len(blocks_with_content) - 1: + memory_context += "\n" + + memory_context += "\n" + return memory_context + + def _build_response_from_chunks(chunks: list[bytes]) -> str: - """ - Build assistant message from streaming response chunks. - - Parses SSE (Server-Sent Events) format and extracts text deltas. - """ try: - import json - text_parts = [] full_data = b"".join(chunks).decode("utf-8") - # Parse SSE format: "data: {json}\n\n" for line in full_data.split("\n"): if line.startswith("data: "): data_str = line[6:] # Remove "data: " prefix - # Skip special messages if data_str.strip() in ["[DONE]", ""]: continue @@ -105,7 +109,6 @@ def _build_response_from_chunks(chunks: list[bytes]) -> str: event_data = json.loads(data_str) event_type = event_data.get("type") - # Extract text from content_block_delta events if event_type == "content_block_delta": delta = event_data.get("delta", {}) if delta.get("type") == "text_delta": @@ -115,7 +118,7 @@ def _build_response_from_chunks(chunks: list[bytes]) -> str: return "".join(text_parts) except Exception as e: - logger.warning(f"Failed to build response from chunks: {e}") + logger.warning(f"[Anthropic Proxy] Failed to build response from chunks: {e}") return "" @@ -125,42 +128,19 @@ async def _inject_memory_context( actor, request_data: dict, ) -> dict: - """ - Inject relevant memory context into the request. - - Searches agent's memory and prepends relevant context to the system prompt. - - Args: - server: SyncServer instance - agent_id: Agent ID to search memory - actor: Actor performing the operation - request_data: Original request data dict - - Returns: - Modified request data with memory context injected - """ try: - # Extract user messages to use as search query messages = request_data.get("messages", []) if not messages: return request_data - memory_context = "Memory context from prior conversation:\n\n" - found = False - block_count = 0 - for block in agent.blocks: - if block.value: - memory_context += f"{block.label.upper()}: {block.value}\n\n" - found = True - block_count += 1 + memory_context = format_memory_blocks(agent.blocks) - if not found: - logger.debug("No memory blocks found, skipping memory injection") + if not memory_context: + logger.debug("[Anthropic Proxy] No memory blocks found, skipping memory injection") return request_data - memory_context = memory_context.rstrip() - - logger.info(f"💭 Injecting {block_count} memory block(s) into request") + block_count = len([b for b in agent.blocks if b.value]) + logger.info(f"[Anthropic Proxy] Injecting {block_count} memory block(s) into request") # Inject into system prompt modified_data = request_data.copy() @@ -191,13 +171,13 @@ async def _inject_memory_context( # Claude Code typically uses budget_tokens around 10000-20000 modified_data["max_tokens"] = budget_tokens + 4096 logger.info( - f"⚠️ Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})" + f"[Anthropic Proxy] Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})" ) return modified_data except Exception as e: - logger.exception(f"Failed to inject memory context: {e}") + logger.exception(f"[Anthropic Proxy] Failed to inject memory context: {e}") return request_data @@ -223,9 +203,9 @@ async def _persist_messages_background( assistant_message=assistant_message, model=model_name, ) - logger.info(f"✅ Persisted messages: {result['messages_created']} messages saved") + logger.info(f"[Anthropic Proxy] Persisted messages: {result['messages_created']} messages saved") except Exception as e: - logger.error(f"Failed to persist messages in background: {e}") + logger.error(f"[Anthropic Proxy] Failed to persist messages in background: {e}") @router.api_route("/v1/messages", methods=["POST"], operation_id="anthropic_messages_proxy", include_in_schema=False) @@ -243,15 +223,15 @@ async def anthropic_messages_proxy( Usage in Claude Code CLI settings.json: { "env": { - "ANTHROPIC_BASE_URL": "http://localhost:8283/v1/anthropic" + "ANTHROPIC_BASE_URL": "http://localhost:3000/v1/anthropic" } } """ # Get the request body body = await request.body() - logger.info(f"Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages") - logger.debug(f"Request body preview: {body[:200]}...") + logger.info(f"[Anthropic Proxy] Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages") + logger.debug(f"[Anthropic Proxy] Request body preview: {body[:200]}...") actor = await server.user_manager.get_actor_or_default_async(headers.actor_id) @@ -265,7 +245,7 @@ async def anthropic_messages_proxy( first_message = user_messages[0] if len(user_messages) > 0 else "" if first_message.startswith(""): is_system_request = True - logger.debug("Skipping capture/memory for system request") + logger.debug("[Anthropic Proxy] Skipping capture/memory for system request") if user_messages and not is_system_request: logger.info("=" * 70) @@ -281,15 +261,13 @@ async def anthropic_messages_proxy( anthropic_api_key = model_settings.anthropic_api_key if not anthropic_api_key: - logger.error("No Anthropic API key found in headers or settings") + logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings") return Response( content='{"error": {"type": "authentication_error", "message": "Anthropic API key required. Pass via anthropic-api-key or x-api-key header."}}', status_code=401, media_type="application/json", ) - logger.debug(f"Using Anthropic API key: {anthropic_api_key[:10]}...") - # Prepare headers for Anthropic API anthropic_headers = { "x-api-key": anthropic_api_key, @@ -304,10 +282,10 @@ async def anthropic_messages_proxy( request_data = json.loads(body) is_streaming = request_data.get("stream", False) model_name = request_data.get("model") - logger.debug(f"Request is streaming: {is_streaming}") - logger.debug(f"Model: {model_name}") + logger.debug(f"[Anthropic Proxy] Request is streaming: {is_streaming}") + logger.debug(f"[Anthropic Proxy] Model: {model_name}") except Exception as e: - logger.warning(f"Failed to parse request body: {e}") + logger.warning(f"[Anthropic Proxy] Failed to parse request body: {e}") is_streaming = False model_name = None @@ -321,9 +299,9 @@ async def anthropic_messages_proxy( server=server, actor=actor, ) - logger.debug(f"Using agent ID: {agent.id}") + logger.debug(f"[Anthropic Proxy] Using agent ID: {agent.id}") except Exception as e: - logger.error(f"Failed to get/create agent: {e}") + logger.error(f"[Anthropic Proxy] Failed to get/create agent: {e}") # Inject memory context into request (skip for system requests) # TODO: Optimize - skip memory injection on subsequent messages in same session @@ -428,7 +406,7 @@ async def anthropic_messages_proxy( ) ) except Exception as e: - logger.warning(f"Failed to extract assistant response for logging: {e}") + logger.warning(f"[Anthropic Proxy] Failed to extract assistant response for logging: {e}") return Response( content=response.content, @@ -442,7 +420,7 @@ async def anthropic_messages_proxy( ) except httpx.HTTPError as e: - logger.error(f"Error proxying request to Anthropic API: {e}") + logger.error(f"[Anthropic Proxy] Error proxying request to Anthropic API: {e}") return Response( content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}', status_code=500, @@ -491,7 +469,7 @@ async def anthropic_catchall_proxy( # anthropic_api_key = request.headers.get("x-api-key") or model_settings.anthropic_api_key anthropic_api_key = model_settings.anthropic_api_key if not anthropic_api_key: - logger.error("No Anthropic API key found in headers or settings") + logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings") return Response( content='{"error": {"type": "authentication_error", "message": "Anthropic API key required"}}', status_code=401, @@ -527,7 +505,7 @@ async def anthropic_catchall_proxy( ) except httpx.HTTPError as e: - logger.error(f"Error proxying catch-all request to Anthropic API: {e}") + logger.error(f"[Anthropic Proxy] Error proxying catch-all request to Anthropic API: {e}") return Response( content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}', status_code=500,