feat: use Letta XML memory format in Anthropic proxy (#6514)
- Update memory injection to use Letta XML format with memory_blocks tags
- Extract memory formatting into format_memory_blocks() helper function

🐙 Generated with [Letta Code](https://letta.com)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -1,12 +1,5 @@
|
||||
"""
|
||||
Anthropic API proxy router.
|
||||
|
||||
This router proxies requests to the Anthropic API, allowing Claude Code CLI
|
||||
to use Letta as an intermediary by setting anthropic_base_url in settings.json.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import json
|
||||
|
||||
import httpx
|
||||
from fastapi import APIRouter, Depends, Request
|
||||
@@ -25,19 +18,11 @@ logger = get_logger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/anthropic", tags=["anthropic"])
|
||||
|
||||
# Anthropic API base URL
|
||||
ANTHROPIC_API_BASE = "https://api.anthropic.com"
|
||||
|
||||
|
||||
def extract_user_messages(body: bytes) -> list[str]:
|
||||
"""
|
||||
Extract user messages from the request body.
|
||||
|
||||
Returns a list of user message content strings.
|
||||
"""
|
||||
try:
|
||||
import json
|
||||
|
||||
request_data = json.loads(body)
|
||||
messages = request_data.get("messages", [])
|
||||
|
||||
@@ -45,27 +30,20 @@ def extract_user_messages(body: bytes) -> list[str]:
|
||||
for msg in messages:
|
||||
if msg.get("role") == "user":
|
||||
content = msg.get("content", "")
|
||||
# Content can be a string or a list of content blocks
|
||||
if isinstance(content, str):
|
||||
user_messages.append(content)
|
||||
elif isinstance(content, list):
|
||||
# Extract text from content blocks
|
||||
for block in content:
|
||||
if isinstance(block, dict) and block.get("type") == "text":
|
||||
user_messages.append(block.get("text", ""))
|
||||
|
||||
return user_messages
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to extract user messages: {e}")
|
||||
logger.warning(f"[Anthropic Proxy] Failed to extract user messages: {e}")
|
||||
return []
|
||||
|
||||
|
||||
def extract_assistant_message(response_data: dict) -> str:
|
||||
"""
|
||||
Extract assistant message text from Anthropic API response.
|
||||
|
||||
Returns the concatenated text content from the assistant's response.
|
||||
"""
|
||||
try:
|
||||
content_blocks = response_data.get("content", [])
|
||||
text_parts = []
|
||||
@@ -76,28 +54,54 @@ def extract_assistant_message(response_data: dict) -> str:
|
||||
|
||||
return "\n".join(text_parts)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to extract assistant message: {e}")
|
||||
logger.warning(f"[Anthropic Proxy] Failed to extract assistant message: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def format_memory_blocks(blocks) -> str:
    """
    Render memory blocks in the Letta XML memory format.

    Only blocks with a truthy ``value`` are rendered. Each rendered block
    becomes a ``<label>...</label>`` section holding its description,
    character-count metadata and value; sections are separated by a blank
    line and wrapped in a single ``<memory_blocks>`` element.

    Args:
        blocks: Iterable of objects exposing ``label``, ``value``,
            ``description`` and ``limit`` attributes. ``None`` is treated
            as "no blocks".

    Returns:
        The formatted memory context, or an empty string when no block has
        any content.
    """
    # `blocks or ()` lets a missing/None block collection fall through to the
    # "nothing to inject" path instead of raising TypeError.
    blocks_with_content = [block for block in (blocks or ()) if block.value]

    if not blocks_with_content:
        return ""

    sections = []
    for block in blocks_with_content:
        # The label is interpolated verbatim as the tag name (no escaping);
        # blocks without a label fall back to the generic "block" tag.
        label = block.label or "block"
        value = block.value  # guaranteed truthy by the filter above
        desc = block.description or ""
        limit = block.limit if block.limit is not None else 0

        sections.append(
            f"<{label}>\n"
            "<description>\n"
            f"{desc}\n"
            "</description>\n"
            "<metadata>\n"
            f"- chars_current={len(value)}\n"
            f"- chars_limit={limit}\n"
            "</metadata>\n"
            "<value>\n"
            f"{value}\n"
            "</value>\n"
            f"</{label}>\n"
        )

    header = "<memory_blocks>\nThe following memory blocks are currently engaged in your core memory unit:\n\n"
    # Joining with "\n" puts exactly one blank line between consecutive
    # sections and none after the last one.
    return header + "\n".join(sections) + "\n</memory_blocks>"
|
||||
|
||||
|
||||
def _build_response_from_chunks(chunks: list[bytes]) -> str:
|
||||
"""
|
||||
Build assistant message from streaming response chunks.
|
||||
|
||||
Parses SSE (Server-Sent Events) format and extracts text deltas.
|
||||
"""
|
||||
try:
|
||||
import json
|
||||
|
||||
text_parts = []
|
||||
full_data = b"".join(chunks).decode("utf-8")
|
||||
|
||||
# Parse SSE format: "data: {json}\n\n"
|
||||
for line in full_data.split("\n"):
|
||||
if line.startswith("data: "):
|
||||
data_str = line[6:] # Remove "data: " prefix
|
||||
|
||||
# Skip special messages
|
||||
if data_str.strip() in ["[DONE]", ""]:
|
||||
continue
|
||||
|
||||
@@ -105,7 +109,6 @@ def _build_response_from_chunks(chunks: list[bytes]) -> str:
|
||||
event_data = json.loads(data_str)
|
||||
event_type = event_data.get("type")
|
||||
|
||||
# Extract text from content_block_delta events
|
||||
if event_type == "content_block_delta":
|
||||
delta = event_data.get("delta", {})
|
||||
if delta.get("type") == "text_delta":
|
||||
@@ -115,7 +118,7 @@ def _build_response_from_chunks(chunks: list[bytes]) -> str:
|
||||
|
||||
return "".join(text_parts)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to build response from chunks: {e}")
|
||||
logger.warning(f"[Anthropic Proxy] Failed to build response from chunks: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
@@ -125,42 +128,19 @@ async def _inject_memory_context(
|
||||
actor,
|
||||
request_data: dict,
|
||||
) -> dict:
|
||||
"""
|
||||
Inject relevant memory context into the request.
|
||||
|
||||
Searches agent's memory and prepends relevant context to the system prompt.
|
||||
|
||||
Args:
|
||||
server: SyncServer instance
|
||||
agent_id: Agent ID to search memory
|
||||
actor: Actor performing the operation
|
||||
request_data: Original request data dict
|
||||
|
||||
Returns:
|
||||
Modified request data with memory context injected
|
||||
"""
|
||||
try:
|
||||
# Extract user messages to use as search query
|
||||
messages = request_data.get("messages", [])
|
||||
if not messages:
|
||||
return request_data
|
||||
|
||||
memory_context = "Memory context from prior conversation:\n\n"
|
||||
found = False
|
||||
block_count = 0
|
||||
for block in agent.blocks:
|
||||
if block.value:
|
||||
memory_context += f"{block.label.upper()}: {block.value}\n\n"
|
||||
found = True
|
||||
block_count += 1
|
||||
memory_context = format_memory_blocks(agent.blocks)
|
||||
|
||||
if not found:
|
||||
logger.debug("No memory blocks found, skipping memory injection")
|
||||
if not memory_context:
|
||||
logger.debug("[Anthropic Proxy] No memory blocks found, skipping memory injection")
|
||||
return request_data
|
||||
|
||||
memory_context = memory_context.rstrip()
|
||||
|
||||
logger.info(f"💭 Injecting {block_count} memory block(s) into request")
|
||||
block_count = len([b for b in agent.blocks if b.value])
|
||||
logger.info(f"[Anthropic Proxy] Injecting {block_count} memory block(s) into request")
|
||||
|
||||
# Inject into system prompt
|
||||
modified_data = request_data.copy()
|
||||
@@ -191,13 +171,13 @@ async def _inject_memory_context(
|
||||
# Claude Code typically uses budget_tokens around 10000-20000
|
||||
modified_data["max_tokens"] = budget_tokens + 4096
|
||||
logger.info(
|
||||
f"⚠️ Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})"
|
||||
f"[Anthropic Proxy] Adjusted max_tokens from {current_max_tokens} to {modified_data['max_tokens']} (thinking.budget_tokens={budget_tokens})"
|
||||
)
|
||||
|
||||
return modified_data
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Failed to inject memory context: {e}")
|
||||
logger.exception(f"[Anthropic Proxy] Failed to inject memory context: {e}")
|
||||
return request_data
|
||||
|
||||
|
||||
@@ -223,9 +203,9 @@ async def _persist_messages_background(
|
||||
assistant_message=assistant_message,
|
||||
model=model_name,
|
||||
)
|
||||
logger.info(f"✅ Persisted messages: {result['messages_created']} messages saved")
|
||||
logger.info(f"[Anthropic Proxy] Persisted messages: {result['messages_created']} messages saved")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to persist messages in background: {e}")
|
||||
logger.error(f"[Anthropic Proxy] Failed to persist messages in background: {e}")
|
||||
|
||||
|
||||
@router.api_route("/v1/messages", methods=["POST"], operation_id="anthropic_messages_proxy", include_in_schema=False)
|
||||
@@ -243,15 +223,15 @@ async def anthropic_messages_proxy(
|
||||
Usage in Claude Code CLI settings.json:
|
||||
{
|
||||
"env": {
|
||||
"ANTHROPIC_BASE_URL": "http://localhost:8283/v1/anthropic"
|
||||
"ANTHROPIC_BASE_URL": "http://localhost:3000/v1/anthropic"
|
||||
}
|
||||
}
|
||||
"""
|
||||
# Get the request body
|
||||
body = await request.body()
|
||||
|
||||
logger.info(f"Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages")
|
||||
logger.debug(f"Request body preview: {body[:200]}...")
|
||||
logger.info(f"[Anthropic Proxy] Proxying request to Anthropic Messages API: {ANTHROPIC_API_BASE}/v1/messages")
|
||||
logger.debug(f"[Anthropic Proxy] Request body preview: {body[:200]}...")
|
||||
|
||||
actor = await server.user_manager.get_actor_or_default_async(headers.actor_id)
|
||||
|
||||
@@ -265,7 +245,7 @@ async def anthropic_messages_proxy(
|
||||
first_message = user_messages[0] if len(user_messages) > 0 else ""
|
||||
if first_message.startswith("<system-reminder>"):
|
||||
is_system_request = True
|
||||
logger.debug("Skipping capture/memory for system request")
|
||||
logger.debug("[Anthropic Proxy] Skipping capture/memory for system request")
|
||||
|
||||
if user_messages and not is_system_request:
|
||||
logger.info("=" * 70)
|
||||
@@ -281,15 +261,13 @@ async def anthropic_messages_proxy(
|
||||
anthropic_api_key = model_settings.anthropic_api_key
|
||||
|
||||
if not anthropic_api_key:
|
||||
logger.error("No Anthropic API key found in headers or settings")
|
||||
logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings")
|
||||
return Response(
|
||||
content='{"error": {"type": "authentication_error", "message": "Anthropic API key required. Pass via anthropic-api-key or x-api-key header."}}',
|
||||
status_code=401,
|
||||
media_type="application/json",
|
||||
)
|
||||
|
||||
logger.debug(f"Using Anthropic API key: {anthropic_api_key[:10]}...")
|
||||
|
||||
# Prepare headers for Anthropic API
|
||||
anthropic_headers = {
|
||||
"x-api-key": anthropic_api_key,
|
||||
@@ -304,10 +282,10 @@ async def anthropic_messages_proxy(
|
||||
request_data = json.loads(body)
|
||||
is_streaming = request_data.get("stream", False)
|
||||
model_name = request_data.get("model")
|
||||
logger.debug(f"Request is streaming: {is_streaming}")
|
||||
logger.debug(f"Model: {model_name}")
|
||||
logger.debug(f"[Anthropic Proxy] Request is streaming: {is_streaming}")
|
||||
logger.debug(f"[Anthropic Proxy] Model: {model_name}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to parse request body: {e}")
|
||||
logger.warning(f"[Anthropic Proxy] Failed to parse request body: {e}")
|
||||
is_streaming = False
|
||||
model_name = None
|
||||
|
||||
@@ -321,9 +299,9 @@ async def anthropic_messages_proxy(
|
||||
server=server,
|
||||
actor=actor,
|
||||
)
|
||||
logger.debug(f"Using agent ID: {agent.id}")
|
||||
logger.debug(f"[Anthropic Proxy] Using agent ID: {agent.id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get/create agent: {e}")
|
||||
logger.error(f"[Anthropic Proxy] Failed to get/create agent: {e}")
|
||||
|
||||
# Inject memory context into request (skip for system requests)
|
||||
# TODO: Optimize - skip memory injection on subsequent messages in same session
|
||||
@@ -428,7 +406,7 @@ async def anthropic_messages_proxy(
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to extract assistant response for logging: {e}")
|
||||
logger.warning(f"[Anthropic Proxy] Failed to extract assistant response for logging: {e}")
|
||||
|
||||
return Response(
|
||||
content=response.content,
|
||||
@@ -442,7 +420,7 @@ async def anthropic_messages_proxy(
|
||||
)
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"Error proxying request to Anthropic API: {e}")
|
||||
logger.error(f"[Anthropic Proxy] Error proxying request to Anthropic API: {e}")
|
||||
return Response(
|
||||
content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}',
|
||||
status_code=500,
|
||||
@@ -491,7 +469,7 @@ async def anthropic_catchall_proxy(
|
||||
# anthropic_api_key = request.headers.get("x-api-key") or model_settings.anthropic_api_key
|
||||
anthropic_api_key = model_settings.anthropic_api_key
|
||||
if not anthropic_api_key:
|
||||
logger.error("No Anthropic API key found in headers or settings")
|
||||
logger.error("[Anthropic Proxy] No Anthropic API key found in headers or settings")
|
||||
return Response(
|
||||
content='{"error": {"type": "authentication_error", "message": "Anthropic API key required"}}',
|
||||
status_code=401,
|
||||
@@ -527,7 +505,7 @@ async def anthropic_catchall_proxy(
|
||||
)
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
logger.error(f"Error proxying catch-all request to Anthropic API: {e}")
|
||||
logger.error(f"[Anthropic Proxy] Error proxying catch-all request to Anthropic API: {e}")
|
||||
return Response(
|
||||
content=f'{{"error": {{"type": "api_error", "message": "Failed to proxy request to Anthropic API: {str(e)}"}}}}',
|
||||
status_code=500,
|
||||
|
||||
Reference in New Issue
Block a user