From 208d6fefa9133666dfc233902dd4c6ae4a1fca6a Mon Sep 17 00:00:00 2001 From: cthomas Date: Thu, 10 Jul 2025 15:43:39 -0700 Subject: [PATCH] feat: make dispatch callback maintain own db connection (#3279) --- letta/services/job_manager.py | 46 ++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/letta/services/job_manager.py b/letta/services/job_manager.py index d9c4f7c3..dbba4a91 100644 --- a/letta/services/job_manager.py +++ b/letta/services/job_manager.py @@ -96,6 +96,8 @@ class JobManager: @trace_method async def update_job_by_id_async(self, job_id: str, job_update: JobUpdate, actor: PydanticUser) -> PydanticJob: """Update a job by its ID with the given JobUpdate object asynchronously.""" + callback_func = None + async with db_registry.async_session() as session: # Fetch the job by ID job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"]) @@ -115,11 +117,23 @@ class JobManager: logger.info(f"Current job completed at: {job.completed_at}") job.completed_at = get_utc_time().replace(tzinfo=None) if job.callback_url: - await self._dispatch_callback_async(job) + callback_func = self._dispatch_callback_async( + callback_url=job.callback_url, + payload={ + "job_id": job.id, + "status": job.status, + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "metadata": job.metadata_, + }, + actor=actor, + ) # Save the updated job to the database await job.update_async(db_session=session, actor=actor) + if callback_func: + return await callback_func + return job.to_pydantic() @enforce_types @@ -699,29 +713,33 @@ class JobManager: # Continue silently - callback failures should not affect job completion @trace_method - async def _dispatch_callback_async(self, job: JobModel) -> None: + async def _dispatch_callback_async(self, callback_url: str, payload: dict, actor: PydanticUser) -> PydanticJob: """ POST a standard JSON payload to job.callback_url and record timestamp + HTTP status asynchronously. """ - payload = { - "job_id": job.id, - "status": job.status, - "completed_at": job.completed_at.isoformat() if job.completed_at else None, - "metadata": job.metadata_, - } + job_id = payload["job_id"] + callback_sent_at, callback_status_code, callback_error = None, None, None try: async with AsyncClient() as client: log_event("POST callback dispatched", payload) - resp = await client.post(job.callback_url, json=payload, timeout=5.0) + resp = await client.post(callback_url, json=payload, timeout=5.0) log_event("POST callback finished") # Ensure timestamp is timezone-naive for DB compatibility - job.callback_sent_at = get_utc_time().replace(tzinfo=None) - job.callback_status_code = resp.status_code + callback_sent_at = get_utc_time().replace(tzinfo=None) + callback_status_code = resp.status_code except Exception as e: - error_message = f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {e!s}" + error_message = f"Failed to dispatch callback for job {job_id} to {callback_url}: {e!s}" logger.error(error_message) # Record the failed attempt - job.callback_sent_at = get_utc_time().replace(tzinfo=None) - job.callback_error = error_message + callback_sent_at = get_utc_time().replace(tzinfo=None) + callback_error = error_message # Continue silently - callback failures should not affect job completion + + async with db_registry.async_session() as session: + job = await JobModel.read_async(db_session=session, identifier=job_id, actor=actor, access_type=AccessType.USER) + job.callback_sent_at = callback_sent_at + job.callback_status_code = callback_status_code + job.callback_error = callback_error + await job.update_async(db_session=session, actor=actor) + return job.to_pydantic()