diff --git a/letta/server/rest_api/routers/v1/messages.py b/letta/server/rest_api/routers/v1/messages.py
index e610df28..1b4a2ca9 100644
--- a/letta/server/rest_api/routers/v1/messages.py
+++ b/letta/server/rest_api/routers/v1/messages.py
@@ -1,12 +1,11 @@
 from typing import List, Literal, Optional

 from fastapi import APIRouter, Body, Depends, Query
-from fastapi.exceptions import HTTPException
 from starlette.requests import Request

 from letta.agents.letta_agent_batch import LettaAgentBatch
+from letta.errors import LettaInvalidArgumentError
 from letta.log import get_logger
-from letta.orm.errors import NoResultFound
 from letta.schemas.job import BatchJob, JobStatus, JobType, JobUpdate
 from letta.schemas.letta_request import CreateBatch
 from letta.schemas.letta_response import LettaBatchMessages
@@ -42,7 +41,9 @@ async def create_batch(
     if content_length:
         length = int(content_length)
         if length > max_bytes:
-            raise HTTPException(status_code=413, detail=f"Request too large ({length} bytes). Max is {max_bytes} bytes.")
+            raise LettaInvalidArgumentError(
+                message=f"Request too large ({length} bytes). Max is {max_bytes} bytes.", argument_name="content-length"
+            )

     if not settings.enable_batch_job_polling:
         logger.warning("Batch job polling is disabled. Enable batch processing by setting LETTA_ENABLE_BATCH_JOB_POLLING to True.")
@@ -93,12 +94,8 @@ async def retrieve_batch(
     Retrieve the status and details of a batch run.
     """
     actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
-
-    try:
-        job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
-        return BatchJob.from_job(job)
-    except NoResultFound:
-        raise HTTPException(status_code=404, detail="Batch not found")
+    job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
+    return BatchJob.from_job(job)


 @router.get("/batches", response_model=List[BatchJob], operation_id="list_batches")
@@ -162,11 +159,8 @@ async def list_messages_for_batch(
     actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)

     # Verify the batch job exists and the user has access to it
-    try:
-        job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
-        BatchJob.from_job(job)
-    except NoResultFound:
-        raise HTTPException(status_code=404, detail="Batch not found")
+    job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
+    BatchJob.from_job(job)

     # Get messages directly using our efficient method
     messages = await server.batch_manager.get_messages_for_letta_batch_async(
@@ -187,23 +181,18 @@ async def cancel_batch(
     """
     actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)

-    try:
-        job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
-        job = await server.job_manager.update_job_by_id_async(job_id=job.id, job_update=JobUpdate(status=JobStatus.cancelled), actor=actor)
+    job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
+    job = await server.job_manager.update_job_by_id_async(job_id=job.id, job_update=JobUpdate(status=JobStatus.cancelled), actor=actor)

-        # Get related llm batch jobs
-        llm_batch_jobs = await server.batch_manager.list_llm_batch_jobs_async(letta_batch_id=job.id, actor=actor)
-        for llm_batch_job in llm_batch_jobs:
-            if llm_batch_job.status in {JobStatus.running, JobStatus.created}:
-                # TODO: Extend to providers beyond anthropic
-                # TODO: For now, we only support anthropic
-                # Cancel the job
-                anthropic_batch_id = llm_batch_job.create_batch_response.id
-                await server.anthropic_async_client.messages.batches.cancel(anthropic_batch_id)
+    # Get related llm batch jobs
+    llm_batch_jobs = await server.batch_manager.list_llm_batch_jobs_async(letta_batch_id=job.id, actor=actor)
+    for llm_batch_job in llm_batch_jobs:
+        if llm_batch_job.status in {JobStatus.running, JobStatus.created}:
+            # TODO: Extend to providers beyond anthropic
+            # TODO: For now, we only support anthropic
+            # Cancel the job
+            anthropic_batch_id = llm_batch_job.create_batch_response.id
+            await server.anthropic_async_client.messages.batches.cancel(anthropic_batch_id)

-            # Update all the batch_job statuses
-            await server.batch_manager.update_llm_batch_status_async(
-                llm_batch_id=llm_batch_job.id, status=JobStatus.cancelled, actor=actor
-            )
-    except NoResultFound:
-        raise HTTPException(status_code=404, detail="Run not found")
+    # Update all the batch_job statuses
+    await server.batch_manager.update_llm_batch_status_async(llm_batch_id=llm_batch_job.id, status=JobStatus.cancelled, actor=actor)