From 8afe3edff087182efd59825c7d570534fe22d936 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Tue, 1 Apr 2025 10:05:40 -0700 Subject: [PATCH] feat: Rework to make robust to gaps (#1498) --- letta/services/block_manager.py | 61 ++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 28 deletions(-) diff --git a/letta/services/block_manager.py b/letta/services/block_manager.py index 01b23d3b..01d3de26 100644 --- a/letta/services/block_manager.py +++ b/letta/services/block_manager.py @@ -252,17 +252,18 @@ class BlockManager: @enforce_types def undo_checkpoint_block(self, block_id: str, actor: PydanticUser, use_preloaded_block: Optional[BlockModel] = None) -> PydanticBlock: """ - Move the block to the previous checkpoint (sequence_number - 1). + Move the block to the immediately previous checkpoint in BlockHistory. + If older sequences have been pruned, we jump to the largest sequence + number that is still < current_seq. """ with self.session_maker() as session: - # 1) Load current block + # 1) Load the current block block = ( session.merge(use_preloaded_block) if use_preloaded_block else BlockModel.read(db_session=session, identifier=block_id, actor=actor) ) - # 2) Ensure there's a current checkpoint to undo from if not block.current_history_entry_id: raise ValueError(f"Block {block_id} has no history entry - cannot undo.") @@ -271,37 +272,39 @@ class BlockManager: raise NoResultFound(f"BlockHistory row not found for id={block.current_history_entry_id}") current_seq = current_entry.sequence_number - if current_seq <= 1: - raise ValueError(f"Block {block_id} is at the first checkpoint (seq=1). Cannot undo further.") - # 3) Move to the previous sequence - previous_seq = current_seq - 1 - block = self._move_block_to_sequence(session, block, previous_seq, actor) + # 2) Find the largest sequence < current_seq + previous_entry = ( + session.query(BlockHistory) + .filter(BlockHistory.block_id == block.id, BlockHistory.sequence_number < current_seq) + .order_by(BlockHistory.sequence_number.desc()) + .first() + ) + if not previous_entry: + # No earlier checkpoint available + raise ValueError(f"Block {block_id} is already at the earliest checkpoint (seq={current_seq}). Cannot undo further.") - # 4) Commit once at the end + # 3) Move to that sequence + block = self._move_block_to_sequence(session, block, previous_entry.sequence_number, actor) + + # 4) Commit session.commit() - return block.to_pydantic() # type: ignore + return block.to_pydantic() @enforce_types def redo_checkpoint_block(self, block_id: str, actor: PydanticUser, use_preloaded_block: Optional[BlockModel] = None) -> PydanticBlock: """ - Move the block to the next checkpoint (sequence_number + 1). - - Raises: - ValueError: if the block is not pointing to a known checkpoint, - or if there's no higher sequence_number to move to. - NoResultFound: if the relevant BlockHistory row doesn't exist. - StaleDataError: on concurrency conflicts. + Move the block to the next checkpoint if it exists. + If some middle checkpoints have been pruned, we jump to the smallest + sequence > current_seq that remains. """ with self.session_maker() as session: - # 1) Load current block block = ( session.merge(use_preloaded_block) if use_preloaded_block else BlockModel.read(db_session=session, identifier=block_id, actor=actor) ) - # 2) If no current_history_entry_id, can't redo if not block.current_history_entry_id: raise ValueError(f"Block {block_id} has no history entry - cannot redo.") @@ -311,15 +314,17 @@ class BlockManager: current_seq = current_entry.sequence_number - # We'll move to next_seq = current_seq + 1 - next_seq = current_seq + 1 - - # 3) Move to the next sequence using our helper - try: - block = self._move_block_to_sequence(session, block, next_seq, actor) - except NoResultFound: + # Find the smallest sequence that is > current_seq + next_entry = ( + session.query(BlockHistory) + .filter(BlockHistory.block_id == block.id, BlockHistory.sequence_number > current_seq) + .order_by(BlockHistory.sequence_number.asc()) + .first() + ) + if not next_entry: raise ValueError(f"Block {block_id} is at the highest checkpoint (seq={current_seq}). Cannot redo further.") - # 4) Commit once + block = self._move_block_to_sequence(session, block, next_entry.sequence_number, actor) + session.commit() - return block.to_pydantic() # type: ignore + return block.to_pydantic()