def is_topic_detection_response(message: str) -> bool:
    """
    Report whether an assistant message is a topic detection payload.

    A message qualifies when, after surrounding whitespace is removed, it is
    wrapped in curly braces, parses as JSON, and carries an ``isNewTopic`` key.
    Such payloads are Claude Code metadata and should not be persisted as
    conversation.

    Args:
        message: Assistant message text to inspect.

    Returns:
        True if the message is a JSON object with an ``isNewTopic`` key,
        False otherwise (including when the text is not valid JSON or the
        input has no ``strip`` method, e.g. ``None``).
    """
    try:
        candidate = message.strip()
        # Cheap shape check first so ordinary prose never reaches the JSON parser.
        if not (candidate.startswith("{") and candidate.endswith("}")):
            return False
        # The presence of the key alone marks topic detection; its value is irrelevant.
        return "isNewTopic" in json.loads(candidate)
    except (json.JSONDecodeError, AttributeError):
        # Malformed JSON or a non-string input is simply "not a topic detection response".
        return False
is_streaming: # Handle streaming response collected_chunks = [] @@ -372,7 +396,7 @@ async def anthropic_messages_proxy( async with httpx.AsyncClient(timeout=300.0) as client: async with client.stream( "POST", - f"{ANTHROPIC_API_BASE}/v1/messages", + anthropic_url, headers=anthropic_headers, content=modified_body, ) as response: @@ -385,21 +409,53 @@ async def anthropic_messages_proxy( if user_messages and assistant_message: logger.info("=" * 70) logger.info("📨 CAPTURED USER MESSAGE:") - for user_message in user_messages: - logger.info(f" {user_message[:200]}{'...' if len(user_message) > 200 else ''}") + for i, user_message in enumerate(user_messages): + logger.info(f" {i}: {user_message[:200]}{'...' if len(user_message) > 200 else ''}") logger.info("=" * 70) logger.info("🤖 CAPTURED ASSISTANT RESPONSE (streaming):") logger.info(f" {assistant_message[:200]}{'...' if len(assistant_message) > 200 else ''}") logger.info("=" * 70) + # Skip persisting topic detection responses (metadata, not conversation) + if is_topic_detection_response(assistant_message): + logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response") # Persist messages to database (non-blocking, skip for system requests) - if agent and user_messages: + elif agent: + # Check for duplicate user messages before creating background task + # This prevents race conditions where multiple requests persist the same message + user_messages_to_persist = user_messages.copy() if user_messages else [] + if user_messages_to_persist: + try: + from letta.schemas.enums import MessageRole + + recent_messages = await server.message_manager.list_messages( + agent_id=agent.id, + actor=actor, + limit=5, + roles=[MessageRole.user], + ascending=False, + ) + if recent_messages: + last_user_msg = recent_messages[0] + last_message_text = "" + if last_user_msg.content: + for content_block in last_user_msg.content: + if hasattr(content_block, "text"): + last_message_text += content_block.text + + incoming_msg = 
user_messages_to_persist[-1] + if last_message_text and last_message_text == incoming_msg: + logger.info(f"[Anthropic Proxy] Skipping duplicate user message: {incoming_msg[:100]}...") + user_messages_to_persist = [] + except Exception as e: + logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}") + asyncio.create_task( _persist_messages_background( server=server, agent=agent, actor=actor, - user_messages=user_messages, + user_messages=user_messages_to_persist, assistant_message=assistant_message, model_name=model_name, ) @@ -419,7 +475,7 @@ async def anthropic_messages_proxy( try: # Handle non-streaming response response = await client.post( - f"{ANTHROPIC_API_BASE}/v1/messages", + anthropic_url, headers=anthropic_headers, content=modified_body, ) @@ -439,14 +495,41 @@ async def anthropic_messages_proxy( logger.info(f" {assistant_message[:500]}{'...' if len(assistant_message) > 500 else ''}") logger.info("=" * 70) + # Skip persisting topic detection responses (metadata, not conversation) + if is_topic_detection_response(assistant_message): + logger.debug("[Anthropic Proxy] Skipping persistence - topic detection response") # Persist messages to database (non-blocking) - if agent and user_messages: + elif agent: + # Check for duplicate user messages before creating background task + user_messages_to_persist = user_messages.copy() if user_messages else [] + if user_messages_to_persist: + try: + from letta.schemas.enums import MessageRole + + recent_messages = await server.message_manager.list_messages( + agent_id=agent.id, actor=actor, limit=5, roles=[MessageRole.user] + ) + if recent_messages: + last_user_msg = recent_messages[0] + last_message_text = "" + if last_user_msg.content: + for content_block in last_user_msg.content: + if hasattr(content_block, "text"): + last_message_text += content_block.text + + incoming_msg = user_messages_to_persist[-1] + if last_message_text and last_message_text == incoming_msg: + logger.info(f"[Anthropic 
Proxy] Skipping duplicate user message: {incoming_msg[:100]}...") + user_messages_to_persist = [] + except Exception as e: + logger.warning(f"[Anthropic Proxy] Failed to check for duplicate messages: {e}") + asyncio.create_task( _persist_messages_background( server=server, agent=agent, actor=actor, - user_messages=user_messages, + user_messages=user_messages_to_persist, assistant_message=assistant_message, model_name=model_name, )