feat(asyncify): migrate get active jobs (#2227)

This commit is contained in:
cthomas
2025-05-16 15:18:06 -07:00
committed by GitHub
parent 89054accca
commit 8ddec60d07
2 changed files with 32 additions and 3 deletions

View File

@@ -33,16 +33,16 @@ def list_jobs(
@router.get("/active", response_model=List[Job], operation_id="list_active_jobs")
def list_active_jobs(
async def list_active_jobs(
server: "SyncServer" = Depends(get_letta_server),
actor_id: Optional[str] = Header(None, alias="user_id"), # Extract user_id from header, default to None if not present
):
"""
List all active jobs.
"""
actor = server.user_manager.get_user_or_default(user_id=actor_id)
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
return server.job_manager.list_jobs(actor=actor, statuses=[JobStatus.created, JobStatus.running])
return await server.job_manager.list_jobs_async(actor=actor, statuses=[JobStatus.created, JobStatus.running])
@router.get("/{job_id}", response_model=Job, operation_id="retrieve_job")

View File

@@ -150,6 +150,35 @@ class JobManager:
)
return [job.to_pydantic() for job in jobs]
@enforce_types
async def list_jobs_async(
self,
actor: PydanticUser,
before: Optional[str] = None,
after: Optional[str] = None,
limit: Optional[int] = 50,
statuses: Optional[List[JobStatus]] = None,
job_type: JobType = JobType.JOB,
ascending: bool = True,
) -> List[PydanticJob]:
"""List all jobs with optional pagination and status filter."""
async with db_registry.async_session() as session:
filter_kwargs = {"user_id": actor.id, "job_type": job_type}
# Add status filter if provided
if statuses:
filter_kwargs["status"] = statuses
jobs = await JobModel.list_async(
db_session=session,
before=before,
after=after,
limit=limit,
ascending=ascending,
**filter_kwargs,
)
return [job.to_pydantic() for job in jobs]
@enforce_types
def delete_job_by_id(self, job_id: str, actor: PydanticUser) -> PydanticJob:
"""Delete a job by its ID."""