feat: Implement resume step after request in new batch agent loop (#1676)

This commit is contained in:
Matthew Zhou
2025-04-15 13:56:22 -07:00
committed by GitHub
parent 68915e2356
commit cead849f19
20 changed files with 1059 additions and 183 deletions

View File

@@ -3,7 +3,7 @@ import datetime
from typing import List
from letta.jobs.helpers import map_anthropic_batch_job_status_to_job_status, map_anthropic_individual_batch_item_status_to_job_status
from letta.jobs.types import BatchId, BatchPollingResult, ItemUpdateInfo
from letta.jobs.types import BatchPollingResult, ItemUpdateInfo
from letta.log import get_logger
from letta.schemas.enums import JobStatus, ProviderType
from letta.schemas.llm_batch_job import LLMBatchJob
@@ -49,14 +49,14 @@ async def fetch_batch_status(server: SyncServer, batch_job: LLMBatchJob) -> Batc
response = await server.anthropic_async_client.beta.messages.batches.retrieve(batch_id_str)
new_status = map_anthropic_batch_job_status_to_job_status(response.processing_status)
logger.debug(f"[Poll BatchJob] Batch {batch_job.id}: provider={response.processing_status} → internal={new_status}")
return (batch_job.id, new_status, response)
return BatchPollingResult(batch_job.id, new_status, response)
except Exception as e:
logger.warning(f"[Poll BatchJob] Batch {batch_job.id}: failed to retrieve {batch_id_str}: {e}")
logger.error(f"[Poll BatchJob] Batch {batch_job.id}: failed to retrieve {batch_id_str}: {e}")
# We treat a retrieval error as still running to try again next cycle
return (batch_job.id, JobStatus.running, None)
return BatchPollingResult(batch_job.id, JobStatus.running, None)
async def fetch_batch_items(server: SyncServer, batch_id: BatchId, batch_resp_id: str) -> List[ItemUpdateInfo]:
async def fetch_batch_items(server: SyncServer, batch_id: str, batch_resp_id: str) -> List[ItemUpdateInfo]:
"""
Fetch individual item results for a completed batch.
@@ -73,7 +73,7 @@ async def fetch_batch_items(server: SyncServer, batch_id: BatchId, batch_resp_id
async for item_result in server.anthropic_async_client.beta.messages.batches.results(batch_resp_id):
# Here, custom_id should be the agent_id
item_status = map_anthropic_individual_batch_item_status_to_job_status(item_result)
updates.append((batch_id, item_result.custom_id, item_status, item_result))
updates.append(ItemUpdateInfo(batch_id, item_result.custom_id, item_status, item_result))
logger.info(f"[Poll BatchJob] Fetched {len(updates)} item updates for batch {batch_id}.")
except Exception as e:
logger.error(f"[Poll BatchJob] Error fetching item updates for batch {batch_id}: {e}")
@@ -193,7 +193,7 @@ async def poll_running_llm_batches(server: "SyncServer") -> None:
# 6. Bulk update all items for newly completed batch(es)
if item_updates:
metrics.updated_items_count = len(item_updates)
server.batch_manager.bulk_update_batch_items_by_agent(item_updates)
server.batch_manager.bulk_update_batch_items_results_by_agent(item_updates)
else:
logger.info("[Poll BatchJob] No item-level updates needed.")

View File

@@ -1,10 +1,30 @@
from typing import Optional, Tuple
from typing import NamedTuple, Optional
from anthropic.types.beta.messages import BetaMessageBatch, BetaMessageBatchIndividualResponse
from letta.schemas.enums import JobStatus
from letta.schemas.enums import AgentStepStatus, JobStatus
BatchId = str
AgentId = str
BatchPollingResult = Tuple[BatchId, JobStatus, Optional[BetaMessageBatch]]
ItemUpdateInfo = Tuple[BatchId, AgentId, JobStatus, BetaMessageBatchIndividualResponse]
class BatchPollingResult(NamedTuple):
    """Result of polling a single batch job for its current status.

    Being a NamedTuple, it still unpacks positionally as
    ``(batch_id, request_status, batch_response)``.
    """

    # Identifier of the batch job that was polled.
    batch_id: str
    # Job status recorded for the batch as of this poll.
    request_status: JobStatus
    # Provider batch object obtained by the poll; None when no response is available.
    batch_response: Optional[BetaMessageBatch]
class ItemUpdateInfo(NamedTuple):
    """Per-item update record for one entry of a batch.

    Being a NamedTuple, it still unpacks positionally as
    ``(batch_id, agent_id, request_status, batch_request_result)``.
    """

    # Identifier of the batch the item belongs to.
    batch_id: str
    # Identifier of the agent the item belongs to.
    agent_id: str
    # Job status recorded for this individual item.
    request_status: JobStatus
    # Provider result object for the individual item; may be None.
    batch_request_result: Optional[BetaMessageBatchIndividualResponse]
class StepStatusUpdateInfo(NamedTuple):
    """Associates a (batch, agent) pair with an agent step status.

    NOTE(review): presumably consumed by a bulk step-status update of batch
    items; the consumer is not visible from this module, so confirm.
    """

    # Identifier of the batch the item belongs to.
    batch_id: str
    # Identifier of the agent the item belongs to.
    agent_id: str
    # Step status value carried for that (batch, agent) pair.
    step_status: AgentStepStatus
class RequestStatusUpdateInfo(NamedTuple):
    """Associates a (batch, agent) pair with a request-level job status.

    NOTE(review): presumably consumed by a bulk request-status update of batch
    items; the consumer is not visible from this module, so confirm.
    """

    # Identifier of the batch the item belongs to.
    batch_id: str
    # Identifier of the agent the item belongs to.
    agent_id: str
    # Job status value carried for that (batch, agent) pair.
    request_status: JobStatus