messages [LET-4636] (#5460)
messages Co-authored-by: Ari Webb <ari@letta.com>
This commit is contained in:
@@ -1,12 +1,11 @@
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
from fastapi import APIRouter, Body, Depends, Query
|
||||
from fastapi.exceptions import HTTPException
|
||||
from starlette.requests import Request
|
||||
|
||||
from letta.agents.letta_agent_batch import LettaAgentBatch
|
||||
from letta.errors import LettaInvalidArgumentError
|
||||
from letta.log import get_logger
|
||||
from letta.orm.errors import NoResultFound
|
||||
from letta.schemas.job import BatchJob, JobStatus, JobType, JobUpdate
|
||||
from letta.schemas.letta_request import CreateBatch
|
||||
from letta.schemas.letta_response import LettaBatchMessages
|
||||
@@ -42,7 +41,9 @@ async def create_batch(
|
||||
if content_length:
|
||||
length = int(content_length)
|
||||
if length > max_bytes:
|
||||
raise HTTPException(status_code=413, detail=f"Request too large ({length} bytes). Max is {max_bytes} bytes.")
|
||||
raise LettaInvalidArgumentError(
|
||||
message=f"Request too large ({length} bytes). Max is {max_bytes} bytes.", argument_name="content-length"
|
||||
)
|
||||
|
||||
if not settings.enable_batch_job_polling:
|
||||
logger.warning("Batch job polling is disabled. Enable batch processing by setting LETTA_ENABLE_BATCH_JOB_POLLING to True.")
|
||||
@@ -93,12 +94,8 @@ async def retrieve_batch(
|
||||
Retrieve the status and details of a batch run.
|
||||
"""
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
try:
|
||||
job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
|
||||
return BatchJob.from_job(job)
|
||||
except NoResultFound:
|
||||
raise HTTPException(status_code=404, detail="Batch not found")
|
||||
job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
|
||||
return BatchJob.from_job(job)
|
||||
|
||||
|
||||
@router.get("/batches", response_model=List[BatchJob], operation_id="list_batches")
|
||||
@@ -162,11 +159,8 @@ async def list_messages_for_batch(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
# Verify the batch job exists and the user has access to it
|
||||
try:
|
||||
job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
|
||||
BatchJob.from_job(job)
|
||||
except NoResultFound:
|
||||
raise HTTPException(status_code=404, detail="Batch not found")
|
||||
job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
|
||||
BatchJob.from_job(job)
|
||||
|
||||
# Get messages directly using our efficient method
|
||||
messages = await server.batch_manager.get_messages_for_letta_batch_async(
|
||||
@@ -187,23 +181,18 @@ async def cancel_batch(
|
||||
"""
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
try:
|
||||
job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
|
||||
job = await server.job_manager.update_job_by_id_async(job_id=job.id, job_update=JobUpdate(status=JobStatus.cancelled), actor=actor)
|
||||
job = await server.job_manager.get_job_by_id_async(job_id=batch_id, actor=actor)
|
||||
job = await server.job_manager.update_job_by_id_async(job_id=job.id, job_update=JobUpdate(status=JobStatus.cancelled), actor=actor)
|
||||
|
||||
# Get related llm batch jobs
|
||||
llm_batch_jobs = await server.batch_manager.list_llm_batch_jobs_async(letta_batch_id=job.id, actor=actor)
|
||||
for llm_batch_job in llm_batch_jobs:
|
||||
if llm_batch_job.status in {JobStatus.running, JobStatus.created}:
|
||||
# TODO: Extend to providers beyond anthropic
|
||||
# TODO: For now, we only support anthropic
|
||||
# Cancel the job
|
||||
anthropic_batch_id = llm_batch_job.create_batch_response.id
|
||||
await server.anthropic_async_client.messages.batches.cancel(anthropic_batch_id)
|
||||
# Get related llm batch jobs
|
||||
llm_batch_jobs = await server.batch_manager.list_llm_batch_jobs_async(letta_batch_id=job.id, actor=actor)
|
||||
for llm_batch_job in llm_batch_jobs:
|
||||
if llm_batch_job.status in {JobStatus.running, JobStatus.created}:
|
||||
# TODO: Extend to providers beyond anthropic
|
||||
# TODO: For now, we only support anthropic
|
||||
# Cancel the job
|
||||
anthropic_batch_id = llm_batch_job.create_batch_response.id
|
||||
await server.anthropic_async_client.messages.batches.cancel(anthropic_batch_id)
|
||||
|
||||
# Update all the batch_job statuses
|
||||
await server.batch_manager.update_llm_batch_status_async(
|
||||
llm_batch_id=llm_batch_job.id, status=JobStatus.cancelled, actor=actor
|
||||
)
|
||||
except NoResultFound:
|
||||
raise HTTPException(status_code=404, detail="Run not found")
|
||||
# Update all the batch_job statuses
|
||||
await server.batch_manager.update_llm_batch_status_async(llm_batch_id=llm_batch_job.id, status=JobStatus.cancelled, actor=actor)
|
||||
|
||||
Reference in New Issue
Block a user