From 857c289ed22ca45bd7a9348677f164ec09deb0f7 Mon Sep 17 00:00:00 2001 From: cthomas Date: Fri, 20 Feb 2026 12:18:41 -0800 Subject: [PATCH] fix: handle compact edge case in idempotency check (#9588) --- letta/agents/helpers.py | 21 +++- tests/integration_test_human_in_the_loop.py | 113 ++++++++++++++++++++ 2 files changed, 133 insertions(+), 1 deletion(-) diff --git a/letta/agents/helpers.py b/letta/agents/helpers.py index 70fdd856..28f5d304 100644 --- a/letta/agents/helpers.py +++ b/letta/agents/helpers.py @@ -269,12 +269,31 @@ async def _prepare_in_context_messages_no_persist_async( for msg in reversed(recent_messages): if msg.role == "tool" and validate_persisted_tool_call_ids(msg, input_messages[0]): logger.info( - f"Idempotency check: Found matching tool return in recent history. " + f"Idempotency check: Found matching tool return in recent in-context history. " f"tool_returns={msg.tool_returns}, approval_response.approvals={input_messages[0].approvals}" ) approval_already_processed = True break + # If not found in context and summarization just happened, check full history + non_system_summary_messages = [ + m for m in current_in_context_messages if m.role not in (MessageRole.system, MessageRole.summary) + ] + if not approval_already_processed and len(non_system_summary_messages) == 0: + last_tool_messages = await message_manager.list_messages( + actor=actor, + agent_id=agent_state.id, + roles=[MessageRole.tool], + limit=1, + ascending=False, # Most recent first + ) + if len(last_tool_messages) == 1 and validate_persisted_tool_call_ids(last_tool_messages[0], input_messages[0]): + logger.info( + f"Idempotency check: Found matching tool return in full history (post-compaction). " + f"tool_returns={last_tool_messages[0].tool_returns}, approval_response.approvals={input_messages[0].approvals}" + ) + approval_already_processed = True + if approval_already_processed: # Approval already handled, just process follow-up messages if any or manually inject keep-alive message keep_alive_messages = input_messages[1:] or [ diff --git a/tests/integration_test_human_in_the_loop.py b/tests/integration_test_human_in_the_loop.py index aa3484a4..fab4cc14 100644 --- a/tests/integration_test_human_in_the_loop.py +++ b/tests/integration_test_human_in_the_loop.py @@ -1445,3 +1445,116 @@ def test_approve_with_cancellation( assert len(messages) > 0, "Should have persisted new messages" assert messages[0].message_type == "user_message", "First message should be a user message" assert "keep-alive" in messages[0].content, f"Expected keep-alive message, got '{messages[0].content}'" + + +def test_retry_with_summarization( + client: Letta, + agent: AgentState, +) -> None: + """ + Test that approval retry works correctly after summarization evicts messages from context. + + Scenario: + 1. Send message that triggers approval request + 2. Send approval response, but cancel during LLM processing + 3. Call summarize with mode='all' to evict all messages from context + 4. Verify only system and summary messages remain in context + 5. Retry the original approval response - should succeed via idempotency check + """ + import threading + import time + + # Step 1: Send message that triggers approval request + response = client.agents.messages.create( + agent_id=agent.id, + messages=USER_MESSAGE_TEST_APPROVAL, + ) + tool_call_id = response.messages[-1].tool_call.tool_call_id + + # Step 2: Start cancellation in background thread + def cancel_after_delay(): + time.sleep(0.3) # Wait for stream to start + client.agents.messages.cancel(agent_id=agent.id) + + cancel_thread = threading.Thread(target=cancel_after_delay, daemon=True) + cancel_thread.start() + + # Step 3: Start approval stream (will be cancelled during processing) + response = client.agents.messages.stream( + agent_id=agent.id, + messages=[ + { + "type": "approval", + "approvals": [ + { + "type": "tool", + "tool_call_id": tool_call_id, + "tool_return": SECRET_CODE, + "status": "success", + }, + ], + }, + ], + streaming=True, + stream_tokens=True, + ) + + # Step 4: Accumulate chunks (stream will be cancelled) + messages = accumulate_chunks(response) + + # Step 5: Verify we got cancelled + stop_reasons = [msg for msg in messages if hasattr(msg, "message_type") and msg.message_type == "stop_reason"] + assert len(stop_reasons) == 1, f"Expected exactly 1 stop_reason, got {len(stop_reasons)}" + assert stop_reasons[0].stop_reason == "cancelled", f"Expected stop_reason 'cancelled', got '{stop_reasons[0].stop_reason}'" + + cancel_thread.join(timeout=1.0) + + # Step 6: Verify tool return message is persisted + all_messages = client.agents.messages.list(agent_id=agent.id, limit=100).items + tool_return_messages = [m for m in all_messages if m.message_type == "tool_return_message"] + assert len(tool_return_messages) > 0, "Tool return message should be persisted" + + # Step 7: Call compact with mode='all' to evict all messages from context + compaction_response = client.agents.messages.compact( + agent_id=agent.id, + compaction_settings={"mode": "all"}, + ) + + # Step 8: Verify only system and summary messages remain in context (should be 2) + assert compaction_response.num_messages_after == 2, ( + f"Expected 2 messages (system + summary) after compaction, but got {compaction_response.num_messages_after}" + ) + + logger.info(f"✅ After compaction: {compaction_response.num_messages_before} -> {compaction_response.num_messages_after} messages") + + # Step 9: Retry the original approval response - should succeed via idempotency check + response = client.agents.messages.stream( + agent_id=agent.id, + messages=[ + { + "type": "approval", + "approvals": [ + { + "type": "tool", + "tool_call_id": tool_call_id, + "tool_return": SECRET_CODE, + "status": "success", + }, + ], + }, + ], + streaming=True, + stream_tokens=True, + ) + + # Step 10: Accumulate chunks + messages = accumulate_chunks(response) + + # Step 11: Verify we got chunks AND an end_turn stop reason (not an error) + assert len(messages) > 1, "Should receive at least some chunks" + + stop_reasons = [msg for msg in messages if hasattr(msg, "message_type") and msg.message_type == "stop_reason"] + assert len(stop_reasons) == 1, f"Expected exactly 1 stop_reason, got {len(stop_reasons)}" + assert stop_reasons[0].stop_reason == "end_turn", f"Expected stop_reason 'end_turn', got '{stop_reasons[0].stop_reason}'" + + logger.info("✅ Test passed: approval retry after summarization handled correctly via idempotency check")