feat: Rework to make robust to gaps (#1498)

This commit is contained in:
Matthew Zhou
2025-04-01 10:05:40 -07:00
committed by GitHub
parent 9d63ccbeb2
commit 8afe3edff0

View File

@@ -252,17 +252,18 @@ class BlockManager:
@enforce_types
def undo_checkpoint_block(self, block_id: str, actor: PydanticUser, use_preloaded_block: Optional[BlockModel] = None) -> PydanticBlock:
"""
Move the block to the previous checkpoint (sequence_number - 1).
Move the block to the immediately previous checkpoint in BlockHistory.
If older sequences have been pruned, we jump to the largest sequence
number that is still < current_seq.
"""
with self.session_maker() as session:
# 1) Load current block
# 1) Load the current block
block = (
session.merge(use_preloaded_block)
if use_preloaded_block
else BlockModel.read(db_session=session, identifier=block_id, actor=actor)
)
# 2) Ensure there's a current checkpoint to undo from
if not block.current_history_entry_id:
raise ValueError(f"Block {block_id} has no history entry - cannot undo.")
@@ -271,37 +272,39 @@ class BlockManager:
raise NoResultFound(f"BlockHistory row not found for id={block.current_history_entry_id}")
current_seq = current_entry.sequence_number
if current_seq <= 1:
raise ValueError(f"Block {block_id} is at the first checkpoint (seq=1). Cannot undo further.")
# 3) Move to the previous sequence
previous_seq = current_seq - 1
block = self._move_block_to_sequence(session, block, previous_seq, actor)
# 2) Find the largest sequence < current_seq
previous_entry = (
session.query(BlockHistory)
.filter(BlockHistory.block_id == block.id, BlockHistory.sequence_number < current_seq)
.order_by(BlockHistory.sequence_number.desc())
.first()
)
if not previous_entry:
# No earlier checkpoint available
raise ValueError(f"Block {block_id} is already at the earliest checkpoint (seq={current_seq}). Cannot undo further.")
# 4) Commit once at the end
# 3) Move to that sequence
block = self._move_block_to_sequence(session, block, previous_entry.sequence_number, actor)
# 4) Commit
session.commit()
return block.to_pydantic() # type: ignore
return block.to_pydantic()
@enforce_types
def redo_checkpoint_block(self, block_id: str, actor: PydanticUser, use_preloaded_block: Optional[BlockModel] = None) -> PydanticBlock:
"""
Move the block to the next checkpoint (sequence_number + 1).
Raises:
ValueError: if the block is not pointing to a known checkpoint,
or if there's no higher sequence_number to move to.
NoResultFound: if the relevant BlockHistory row doesn't exist.
StaleDataError: on concurrency conflicts.
Move the block to the next checkpoint if it exists.
If some middle checkpoints have been pruned, we jump to the smallest
sequence > current_seq that remains.
"""
with self.session_maker() as session:
# 1) Load current block
block = (
session.merge(use_preloaded_block)
if use_preloaded_block
else BlockModel.read(db_session=session, identifier=block_id, actor=actor)
)
# 2) If no current_history_entry_id, can't redo
if not block.current_history_entry_id:
raise ValueError(f"Block {block_id} has no history entry - cannot redo.")
@@ -311,15 +314,17 @@ class BlockManager:
current_seq = current_entry.sequence_number
# We'll move to next_seq = current_seq + 1
next_seq = current_seq + 1
# 3) Move to the next sequence using our helper
try:
block = self._move_block_to_sequence(session, block, next_seq, actor)
except NoResultFound:
# Find the smallest sequence that is > current_seq
next_entry = (
session.query(BlockHistory)
.filter(BlockHistory.block_id == block.id, BlockHistory.sequence_number > current_seq)
.order_by(BlockHistory.sequence_number.asc())
.first()
)
if not next_entry:
raise ValueError(f"Block {block_id} is at the highest checkpoint (seq={current_seq}). Cannot redo further.")
# 4) Commit once
block = self._move_block_to_sequence(session, block, next_entry.sequence_number, actor)
session.commit()
return block.to_pydantic() # type: ignore
return block.to_pydantic()